## What changes were proposed in this pull request?
Logging was made private in Spark 2.0. If we move it into an internal package, users can create a `Logging` trait themselves under the old name to avoid changing their own code.
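For illustration, such a user-side shim might look like the following sketch (hypothetical user code, not part of this patch; it assumes slf4j is on the classpath):
```scala
package org.apache.spark

import org.slf4j.{Logger, LoggerFactory}

// Hypothetical drop-in replacement for the old org.apache.spark.Logging
// trait, defined in user code so existing mix-ins keep compiling.
trait Logging {
  @transient private var _log: Logger = null

  protected def log: Logger = {
    if (_log == null) _log = LoggerFactory.getLogger(getClass.getName.stripSuffix("$"))
    _log
  }

  protected def logInfo(msg: => String): Unit = if (log.isInfoEnabled) log.info(msg)
  protected def logWarning(msg: => String): Unit = if (log.isWarnEnabled) log.warn(msg)
  protected def logError(msg: => String): Unit = if (log.isErrorEnabled) log.error(msg)
}
```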
## How was this patch tested?
existing tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes #11764 from cloud-fan/logger.
This commit updates the HiveContext so that sc.hadoopConfiguration is used to instantiate its internal instances of HiveConf.
I tested this by overriding the S3 FileSystem implementation from spark-defaults.conf as "spark.hadoop.fs.s3.impl" (to avoid [HADOOP-12810](https://issues.apache.org/jira/browse/HADOOP-12810)).
Author: Ryan Blue <blue@apache.org>
Closes #11273 from rdblue/SPARK-13403-new-hive-conf-from-hadoop-conf.
## What changes were proposed in this pull request?
Since the developer API of the pluggable parser was removed in #10801, the docs should be updated accordingly.
## How was this patch tested?
This patch will not affect the real code path.
Author: Daoyuan Wang <daoyuan.wang@intel.com>
Closes #11758 from adrian-wang/spark12855.
## What changes were proposed in this pull request?
As part of the effort to merge `SQLContext` and `HiveContext`, this patch implements an internal catalog called `SessionCatalog` that handles temporary functions and tables and delegates metastore operations to `ExternalCatalog`. Currently, this is still dead code, but in the future it will be part of `SessionState` and will replace `o.a.s.sql.catalyst.analysis.Catalog`.
A recent patch #11573 parses Hive commands ourselves in Spark, but still passes the entire query text to Hive. In a future patch, we will use `SessionCatalog` to implement the parsed commands.
## How was this patch tested?
800+ lines of tests in `SessionCatalogSuite`.
Author: Andrew Or <andrew@databricks.com>
Closes #11750 from andrewor14/temp-catalog.
#### What changes were proposed in this pull request?
This PR is to convert to SQL from analyzed logical plans containing operator `ScriptTransformation`.
For example, below is a SQL query containing `TRANSFORM`:
```
SELECT TRANSFORM (a, b, c, d) USING 'cat' FROM parquet_t2
```
Its logical plan looks like:
```
ScriptTransformation [a#210L,b#211L,c#212L,d#213L], cat, [key#208,value#209], HiveScriptIOSchema(List(),List(),Some(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe),Some(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe),List((field.delim, )),List((field.delim, )),Some(org.apache.hadoop.hive.ql.exec.TextRecordReader),Some(org.apache.hadoop.hive.ql.exec.TextRecordWriter),true)
+- SubqueryAlias parquet_t2
   +- Relation[a#210L,b#211L,c#212L,d#213L] ParquetRelation
```
The generated SQL will look like:
```
SELECT TRANSFORM (`parquet_t2`.`a`, `parquet_t2`.`b`, `parquet_t2`.`c`, `parquet_t2`.`d`) USING 'cat' AS (`key` string, `value` string) FROM `default`.`parquet_t2`
```
#### How was this patch tested?
Seven test cases are added to `LogicalPlanToSQLSuite`.
Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>
Closes #11503 from gatorsmile/transformToSQL.
## What changes were proposed in this pull request?
This PR tries to solve a fundamental issue in the `SQLBuilder`. When we want to turn a logical plan into a SQL string and put it after a FROM clause, we need to wrap it with a sub-query. However, a logical plan is allowed to have same-name outputs with different qualifiers (e.g. the `Join` operator), and this kind of plan can't be put under a subquery, as we will erase and assign a new qualifier to all outputs, making it impossible to distinguish the same-name outputs.
To solve this problem, this PR renames all attributes with globally unique names (using exprId), so that we don't need qualifiers to resolve ambiguity anymore.
For example, given `SELECT x.key, MAX(y.key) OVER () FROM t x JOIN t y`, we parse this SQL into a Window operator and a Project operator, and add a sub-query between them. The generated SQL looks like:
```
SELECT sq_1.key, sq_1.max
FROM (
  SELECT sq_0.key, sq_0.key, MAX(sq_0.key) OVER () AS max
  FROM (
    SELECT x.key, y.key FROM t1 AS x JOIN t2 AS y
  ) AS sq_0
) AS sq_1
```
As you can see, the `key` columns become ambiguous after `sq_0`.
After this PR, it will generate something like:
```
SELECT attr_30 AS key, attr_37 AS max
FROM (
  SELECT attr_30, attr_37
  FROM (
    SELECT attr_30, attr_35, MAX(attr_35) AS attr_37
    FROM (
      SELECT attr_30, attr_35 FROM
        (SELECT key AS attr_30 FROM t1) AS sq_0
      INNER JOIN
        (SELECT key AS attr_35 FROM t1) AS sq_1
    ) AS sq_2
  ) AS sq_3
) AS sq_4
```
The outermost SELECT turns the generated names back into real names, and the innermost SELECT aliases the real columns to our generated names. In between, there is no name ambiguity anymore.
## How was this patch tested?
Existing tests and new tests in `LogicalPlanToSQLSuite`.
Author: Wenchen Fan <wenchen@databricks.com>
Closes #11658 from cloud-fan/gensql.
## What changes were proposed in this pull request?
https://issues.apache.org/jira/browse/SPARK-13894
Change the return type of the `SQLContext.range` API from `DataFrame` to `Dataset`.
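A hedged usage sketch follows (assuming a `sqlContext` in scope; the element type shown, `java.lang.Long`, is my assumption about the final signature):
```scala
// After this change, range produces a typed Dataset rather than a DataFrame.
val ds: org.apache.spark.sql.Dataset[java.lang.Long] = sqlContext.range(0, 10)
ds.show()

// A DataFrame is still one method call away when needed.
val df: org.apache.spark.sql.DataFrame = ds.toDF()
```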
## How was this patch tested?
No additional unit test required.
Author: Cheng Hao <hao.cheng@intel.com>
Closes #11730 from chenghao-intel/range.
## What changes were proposed in this pull request?
There is a feature of Hive SQL called multi-insert. For example:
```
FROM src
INSERT OVERWRITE TABLE dest1
SELECT key + 1
INSERT OVERWRITE TABLE dest2
SELECT key WHERE key > 2
INSERT OVERWRITE TABLE dest3
SELECT col EXPLODE(arr) exp AS col
...
```
We partially support it currently, with some limitations: 1) WHERE can't reference columns produced by LATERAL VIEW. 2) It's not executed eagerly, i.e. `sql("...multi-insert clause...")` won't take place right away like other commands, e.g. CREATE TABLE.
This PR removes these limitations and makes multi-insert fully supported.
## How was this patch tested?
new tests in `SQLQuerySuite`
Author: Wenchen Fan <wenchen@databricks.com>
Closes #11754 from cloud-fan/lateral-view.
## What changes were proposed in this pull request?
Follow-up to https://github.com/apache/spark/pull/11657
- Also update `String.getBytes("UTF-8")` to use `StandardCharsets.UTF_8`
- And fix one last new Coverity warning that turned up (use of unguarded `wait()` replaced by simpler/more robust `java.util.concurrent` classes in tests)
- And while we're here cleaning up Coverity warnings, just fix about 15 more build warnings
## How was this patch tested?
Jenkins tests
Author: Sean Owen <sowen@cloudera.com>
Closes #11725 from srowen/SPARK-13823.2.
## What changes were proposed in this pull request?
The purpose of [SPARK-12653](https://issues.apache.org/jira/browse/SPARK-12653) is to re-enable a regression test.
Historically, the target regression test was added by [SPARK-8498](093c34838d), but was temporarily disabled by [SPARK-12615](8ce645d4ee) due to a binary compatibility error.
The following is the current error message when submitting a Spark job with the pre-built `test.jar` file in the target regression test.
```
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.SparkContext$.$lessinit$greater$default$6()Lscala/collection/Map;
```
Simply rebuilding `test.jar` cannot restore the purpose of the test case, since we need to support both Scala 2.10 and 2.11 for a while. For example, we will face the following Scala 2.11 error if we use a `test.jar` built by Scala 2.10.
```
Exception in thread "main" java.lang.NoSuchMethodError: scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaMirrors$JavaMirror;
```
This PR replaces the existing `test.jar` with `test-2.10.jar` and `test-2.11.jar`, and improves the regression test to use the suitable jar file.
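A hedged sketch of how the suite can pick the right jar (the path and selection logic here are illustrative, not the exact code in this PR):
```scala
// Derive the Scala binary version of the running JVM (e.g. "2.10" or
// "2.11") and choose the matching pre-built jar introduced by this PR.
val scalaBinaryVersion =
  scala.util.Properties.versionNumberString.split("\\.").take(2).mkString(".")

// Hypothetical resource directory; the real location may differ.
val testJar = s"src/test/resources/regression-test-SPARK-12653/test-$scalaBinaryVersion.jar"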
## How was this patch tested?
Pass the existing Jenkins test.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes #11744 from dongjoon-hyun/SPARK-12653.
## What changes were proposed in this pull request?
This PR brings codegen support for broadcast left-semi join.
## How was this patch tested?
Existing tests. Added a benchmark; the results show a 7X speedup.
Author: Davies Liu <davies@databricks.com>
Closes #11742 from davies/gen_semi.
## What changes were proposed in this pull request?
Change the return type of `toJSON` in the `Dataset` class.
## How was this patch tested?
No additional unit test required.
Author: Stavros Kontopoulos <stavros.kontopoulos@typesafe.com>
Closes #11732 from skonto/fix_toJson.
## What changes were proposed in this pull request?
Our internal code can go through SessionState.catalog and SessionState.analyzer. This brings two small benefits:
1. Reduces internal dependency on SQLContext.
2. Removes 2 public methods in Java (Java does not obey package private visibility).
More importantly, according to the design in SPARK-13485, we need to reserve the `catalog` function name for user-facing public functions, rather than having it as an internal field.
## How was this patch tested?
Existing unit/integration test code.
Author: Reynold Xin <rxin@databricks.com>
Closes #11716 from rxin/SPARK-13893.
## What changes were proposed in this pull request?
In general it is better for internal classes to not depend on the external class (in this case SQLContext) to reduce coupling between user-facing APIs and the internal implementations. This patch removes SQLContext dependency from some internal classes such as SparkPlanner, SparkOptimizer.
As part of this patch, I also removed the following internal methods from SQLContext:
```
protected[sql] def functionRegistry: FunctionRegistry
protected[sql] def optimizer: Optimizer
protected[sql] def sqlParser: ParserInterface
protected[sql] def planner: SparkPlanner
protected[sql] def continuousQueryManager
protected[sql] def prepareForExecution: RuleExecutor[SparkPlan]
```
## How was this patch tested?
Existing unit/integration tests.
Author: Reynold Xin <rxin@databricks.com>
Closes #11712 from rxin/sqlContext-planner.
## What changes were proposed in this pull request?
This patch removes DescribeCommand's dependency on LogicalPlan. After this patch, DescribeCommand simply accepts a TableIdentifier. It minimizes the dependency, and blocks my next patch (removes SQLContext dependency from SparkPlanner).
## How was this patch tested?
Should be covered by existing unit tests and Hive compatibility tests that run describe table.
Author: Reynold Xin <rxin@databricks.com>
Closes #11710 from rxin/SPARK-13884.
This PR adds a new strategy, `FileSourceStrategy`, that can be used for planning scans of collections of files that might be partitioned or bucketed.
Compared with the existing planning logic in `DataSourceStrategy` this version has the following desirable properties:
- It removes the need to have `RDD`, `broadcastedHadoopConf` and other distributed concerns in the public API of `org.apache.spark.sql.sources.FileFormat`
- Partition column appending is delegated to the format to avoid an extra copy / devectorization when appending partition columns
- It minimizes the amount of data that is shipped to each executor (i.e. it does not send the whole list of files to every worker in the form of a hadoop conf)
- It natively supports bucketing files into partitions, and thus does not require coalescing / creating a `UnionRDD` with the correct partitioning.
- Small files are automatically coalesced into fewer tasks using an approximate bin-packing algorithm.
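The bin-packing idea can be sketched as follows (an illustrative first-fit-decreasing packer, not the actual planner code; the names are assumptions):
```scala
import scala.collection.mutable.ArrayBuffer

case class FileSlice(path: String, length: Long)

// First-fit-decreasing: place each file into the first task whose
// accumulated byte count still fits under the per-task target; oversized
// files simply get their own task.
def packFiles(files: Seq[FileSlice], maxBytesPerTask: Long): Seq[Seq[FileSlice]] = {
  val bins = ArrayBuffer.empty[ArrayBuffer[FileSlice]]
  for (f <- files.sortBy(-_.length)) {
    bins.find(b => b.map(_.length).sum + f.length <= maxBytesPerTask) match {
      case Some(bin) => bin += f
      case None      => bins += ArrayBuffer(f)
    }
  }
  bins.map(_.toSeq).toSeq
}
```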
Currently only a testing source is planned / tested using this strategy. In follow-up PRs we will port the existing formats to this API.
A stub for `FileScanRDD` is also added, but most methods remain unimplemented.
Other minor cleanups:
- partition pruning is pushed into `FileCatalog` so both the new and old code paths can use this logic. This will also allow future implementations to use indexes or other tricks (i.e. a MySQL metastore)
- The partitions from the `FileCatalog` now propagate information about file sizes all the way up to the planner so we can intelligently spread files out.
- `Array` -> `Seq` in some internal APIs to avoid unnecessary `toArray` calls
- Rename `Partition` to `PartitionDirectory` to differentiate partitions used earlier in pruning from those where we have already enumerated the files and their sizes.
Author: Michael Armbrust <michael@databricks.com>
Closes #11646 from marmbrus/fileStrategy.
Three different things were needed to get rid of spurious warnings:
- silence deprecation warnings when cloning configuration
- change the way SparkHadoopUtil instantiates SparkConf to silence warnings
- avoid creating new SparkConf instances where it's not needed.
On top of that, I changed the way that Logging.scala detects the repl; now it uses a method that is overridden in the repl's Main class, and the hack in Utils.scala is not needed anymore. This makes the 2.11 repl behave like the 2.10 one and set the default log level to WARN, which is a lot better. Previously, this wasn't working because the 2.11 repl triggers log initialization earlier than the 2.10 one.
I also removed and simplified some other code in the 2.11 repl's Main to avoid replicating logic that already exists elsewhere in Spark.
Tested the 2.11 repl in local and yarn modes.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes #11510 from vanzin/SPARK-13626.
## What changes were proposed in this pull request?
This PR fixes 135 typos over 107 files:
* 121 typos in comments
* 11 typos in test case names
* 3 typos in log messages
## How was this patch tested?
Manual.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes #11689 from dongjoon-hyun/fix_more_typos.
## What changes were proposed in this pull request?
- Fixes calls to `new String(byte[])` or `String.getBytes()` that rely on platform default encoding, to use UTF-8
- Same for `InputStreamReader` and `OutputStreamWriter` constructors
- Standardizes on UTF-8 everywhere
- Standardizes specifying the encoding with `StandardCharsets.UTF_8`, not the Guava constant or "UTF-8" (which means handling `UnsupportedEncodingException`)
- (also addresses the other remaining Coverity scan issues, which are pretty trivial; these are separated into commit 1deecd8d9c)
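The before/after shape of the fix, as a sketch (shown in Scala for brevity; the real changes span Java and Scala call sites):
```scala
import java.nio.charset.StandardCharsets

val s = "spark"

// Before: a string charset name, which in Java forces callers to handle
// the checked UnsupportedEncodingException.
val before = s.getBytes("UTF-8")

// After: the constant Charset; no checked exception, no charset lookup.
val after = s.getBytes(StandardCharsets.UTF_8)
val roundTrip = new String(after, StandardCharsets.UTF_8)
```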
## How was this patch tested?
Jenkins tests
Author: Sean Owen <sowen@cloudera.com>
Closes #11657 from srowen/SPARK-13823.
## What changes were proposed in this pull request?
This PR splits PhysicalRDD into two classes: PhysicalRDD and PhysicalScan. PhysicalRDD is used for DataFrames that are created from existing RDDs. PhysicalScan is used for DataFrames that are created from data sources. This enables us to apply different optimizations to each of them.
Also fixes sameResult() on two DataSourceScans.
Also fixes the equality check for `In` by comparing toString. It would be better to use Seq there, but we can't break this public API (sad).
## How was this patch tested?
Existing tests. Manually tested with TPCDS queries Q59 and Q64; all those duplicated exchanges can be re-used now, and we also saw a 40+% performance improvement (saving half of the scan).
Author: Davies Liu <davies@databricks.com>
Closes #11514 from davies/existing_rdd.
## What changes were proposed in this pull request?
This patch is ported over from viirya's changes in #11048. Currently for most DDLs we just pass the query text directly to Hive. Instead, we should parse these commands ourselves and in the future (not part of this patch) use the `HiveCatalog` to process these DDLs. This is a precursor to merging `SQLContext` and `HiveContext`.
Note: As of this patch we still pass the query text to Hive. The difference is that we now parse the commands ourselves so in the future we can just use our own catalog.
## How was this patch tested?
Jenkins, and the new `DDLCommandSuite`, which comprises about 40% of the changes here.
Author: Andrew Or <andrew@databricks.com>
Closes #11573 from andrewor14/parser-plus-plus.
## What changes were proposed in this pull request?
PR #11443 temporarily disabled MiMA check, this PR re-enables it.
One extra change is that `object DataFrame` is also removed. The only purpose of introducing `object DataFrame` was to use it as an internal factory for creating `Dataset[Row]`. By replacing this internal factory with `Dataset.newDataFrame`, both `DataFrame` and `DataFrame$` are entirely removed from the API, so that we can simply put a `MissingClassProblem` filter in `MimaExcludes.scala` for most DataFrame API changes.
## How was this patch tested?
Tested by MiMA check triggered by Jenkins.
Author: Cheng Lian <lian@databricks.com>
Closes #11656 from liancheng/re-enable-mima.
Fix the compilation failure introduced by https://github.com/apache/spark/pull/11555 because of a merge conflict.
Author: Wenchen Fan <wenchen@databricks.com>
Closes #11648 from cloud-fan/hotbug.
## What changes were proposed in this pull request?
Add SQL generation support for window functions. The idea is simple, just treat `Window` operator like `Project`, i.e. add subquery to its child when necessary, generate a `SELECT ... FROM ...` SQL string, implement `sql` method for window related expressions, e.g. `WindowSpecDefinition`, `WindowFrame`, etc.
This PR also fixes SPARK-13720 by improving the process of adding extra `SubqueryAlias` (the `RecoverScopingInfo` rule). Before this PR, we updated the qualifiers in the project list while adding the subquery. However, this is incomplete, as we need to update qualifiers in all ancestors that reference attributes here. In this PR, we split `RecoverScopingInfo` into 2 rules: `AddSubQuery` and `UpdateQualifier`. `AddSubQuery` only adds a subquery if necessary, and `UpdateQualifier` re-propagates and updates qualifiers bottom-up.
Ideally we should put the bug fix part in an individual PR, but this bug also blocks the window stuff, so I put them together here.
Many thanks to gatorsmile for the initial discussion and test cases!
## How was this patch tested?
new tests in `LogicalPlanToSQLSuite`
Author: Wenchen Fan <wenchen@databricks.com>
Closes #11555 from cloud-fan/window.
## What changes were proposed in this pull request?
This PR unifies DataFrame and Dataset by migrating existing DataFrame operations to Dataset and make `DataFrame` a type alias of `Dataset[Row]`.
Most Scala code changes are source compatible, but the Java API is broken, as Java knows nothing about Scala type aliases (mostly replacing `DataFrame` with `Dataset<Row>`).
There are several noticeable API changes related to those returning arrays:
1. `collect`/`take`
- Old APIs in class `DataFrame`:
```scala
def collect(): Array[Row]
def take(n: Int): Array[Row]
```
- New APIs in class `Dataset[T]`:
```scala
def collect(): Array[T]
def take(n: Int): Array[T]
def collectRows(): Array[Row]
def takeRows(n: Int): Array[Row]
```
Two specialized methods `collectRows` and `takeRows` are added because Java doesn't support returning generic arrays. Thus, for example, `DataFrame.collect(): Array[T]` actually returns `Object` instead of `Array<T>` from the Java side.
Normally, Java users may fall back to `collectAsList` and `takeAsList`. The two new specialized versions are added to avoid performance regression in ML related code (but maybe I'm wrong and they are not necessary here).
1. `randomSplit`
- Old APIs in class `DataFrame`:
```scala
def randomSplit(weights: Array[Double], seed: Long): Array[DataFrame]
def randomSplit(weights: Array[Double]): Array[DataFrame]
```
- New APIs in class `Dataset[T]`:
```scala
def randomSplit(weights: Array[Double], seed: Long): Array[Dataset[T]]
def randomSplit(weights: Array[Double]): Array[Dataset[T]]
```
Similar problem as above, but hasn't been addressed for Java API yet. We can probably add `randomSplitAsList` to fix this one.
1. `groupBy`
Some original `DataFrame.groupBy` methods have conflicting signatures with the original `Dataset.groupBy` methods. To distinguish these two, the typed `Dataset.groupBy` methods are renamed to `groupByKey`.
Other noticeable changes:
1. Datasets always do eager analysis now
We used to support disabling DataFrame eager analysis to help report partially analyzed malformed logical plans on analysis failure. However, Dataset encoders require eager analysis during Dataset construction. To preserve the error reporting feature, `AnalysisException` now takes an extra `Option[LogicalPlan]` argument to hold the partially analyzed plan, so that we can check the plan tree when reporting test failures. This plan is passed by `QueryExecution.assertAnalyzed`.
## How was this patch tested?
Existing tests do the work.
## TODO
- [ ] Fix all tests
- [ ] Re-enable MiMA check
- [ ] Update ScalaDoc (`since`, `group`, and example code)
Author: Cheng Lian <lian@databricks.com>
Author: Yin Huai <yhuai@databricks.com>
Author: Wenchen Fan <wenchen@databricks.com>
Author: Cheng Lian <liancheng@users.noreply.github.com>
Closes #11443 from liancheng/ds-to-df.
## What changes were proposed in this pull request?
Since the opening curly brace, '{', has many usages as discussed in [SPARK-3854](https://issues.apache.org/jira/browse/SPARK-3854), this PR adds a ScalaStyle rule to prevent the `){` pattern, in favor of the following majority pattern, and fixes the code accordingly. If we enforce this in ScalaStyle from now on, it will improve Scala code quality and reduce review time.
```
// Correct:
if (true) {
  println("Wow!")
}

// Incorrect:
if (true){
  println("Wow!")
}
```
IntelliJ also shows new warnings based on this.
## How was this patch tested?
Pass the Jenkins ScalaStyle test.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes #11637 from dongjoon-hyun/SPARK-3854.
## What changes were proposed in this pull request?
This PR adds support for inferring `IsNotNull` constraints from expressions with an `!==`. More specifically, if an operator has a condition on `a !== b`, we know that both `a` and `b` in the operator output can no longer be null.
## How was this patch tested?
1. Modified a test in `ConstraintPropagationSuite` to test for expressions with an inequality.
2. Added a test in `NullFilteringSuite` for making sure an Inner join with a "non-equal" condition appropriately filters out nulls from its inputs.
cc nongli
Author: Sameer Agarwal <sameer@databricks.com>
Closes #11594 from sameeragarwal/isnotequal-constraints.
## What changes were proposed in this pull request?
https://issues.apache.org/jira/browse/SPARK-13766
This PR makes the file extensions (written by internal data sources) consistent.
**Before**
- TEXT, CSV and JSON
```
[.COMPRESSION_CODEC_NAME]
```
- Parquet
```
[.COMPRESSION_CODEC_NAME].parquet
```
- ORC
```
.orc
```
**After**
- TEXT, CSV and JSON
```
.txt[.COMPRESSION_CODEC_NAME]
.csv[.COMPRESSION_CODEC_NAME]
.json[.COMPRESSION_CODEC_NAME]
```
- Parquet
```
[.COMPRESSION_CODEC_NAME].parquet
```
- ORC
```
[.COMPRESSION_CODEC_NAME].orc
```
When the compression codec is set,
- For Parquet and ORC, each file still stays in Parquet or ORC format but just has compressed data internally. So, I think it is okay to put `.parquet` and `.orc` at the end.
- For Text, CSV and JSON, each file does not stay in its original format; the on-disk data format depends on the compression codec. So, each has the name `.json`, `.csv` or `.txt` before the compression extension.
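A sketch of the naming rule (the helper and its arguments are hypothetical; it just encodes the tables above):
```scala
// Compose an output file name: columnar formats keep their own extension
// last, while text-based formats put the codec extension last.
def outputFileName(base: String, formatExt: String, codecExt: Option[String]): String =
  formatExt match {
    case ".parquet" | ".orc" => base + codecExt.getOrElse("") + formatExt
    case _                   => base + formatExt + codecExt.getOrElse("")
  }

outputFileName("part-r-00000", ".json", Some(".gz"))        // part-r-00000.json.gz
outputFileName("part-r-00000", ".parquet", Some(".snappy")) // part-r-00000.snappy.parquet
```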
## How was this patch tested?
Unit tests, and `./dev/run_tests` for code style tests.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes #11604 from HyukjinKwon/SPARK-13766.
## What changes were proposed in this pull request?
https://issues.apache.org/jira/browse/SPARK-13728
https://github.com/apache/spark/pull/11509 makes the output only a single ORC file.
It used to be 10 files, but that PR writes only a single file, so ORC stripes could no longer be skipped by the pushed-down filters.
So, this PR simply repartitions the data into 10 partitions so that the test passes.
## How was this patch tested?
Unit tests and `./dev/run_tests` for code style tests.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes #11593 from HyukjinKwon/SPARK-13728.
## What changes were proposed in this pull request?
An analysis exception occurs while running the following query:
```
SELECT ints FROM nestedArray LATERAL VIEW explode(a.b) `a` AS `ints`
```
```
Failed to analyze query: org.apache.spark.sql.AnalysisException: cannot resolve '`ints`' given input columns: [a, `ints`]; line 1 pos 7
'Project ['ints]
+- Generate explode(a#0.b), true, false, Some(a), [`ints`#8]
   +- SubqueryAlias nestedarray
      +- LocalRelation [a#0], [[[[1,2,3]]]]
```
## How was this patch tested?
Added new unit tests in SQLQuerySuite and HiveQlSuite
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes #11538 from dilipbiswal/SPARK-13698.
## What changes were proposed in this pull request?
In order to make `docs/examples` (and other related code) simpler, more readable and user-friendly, this PR replaces existing code like the following with the diamond operator.
```
- final ArrayList<Product2<Object, Object>> dataToWrite =
- new ArrayList<Product2<Object, Object>>();
+ final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>();
```
Java 7 and higher support the **diamond** operator, which replaces the type arguments required to invoke the constructor of a generic class with an empty set of type parameters (`<>`). Currently, Spark Java code uses this inconsistently.
## How was this patch tested?
Manual.
Pass the existing tests.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes #11541 from dongjoon-hyun/SPARK-13702.
This PR replaces #9925 which had issues with CI. **Please see the original PR for any previous discussions.**
## What changes were proposed in this pull request?
Deprecate the Spark SQL column operator `!==` and use `=!=` as an alternative.
Fixes subtle issues related to operator precedence (basically, `!==` does not have the same precedence as its logical negation, `===`).
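A sketch of why the precedence differs: in Scala, an operator that ends in `=` but does not start with `=` (and is not `<=`, `>=`, or `!=`) is treated as an assignment-like operator with the lowest precedence, so `!==` binds more loosely than `===`, while `=!=` binds exactly like `===`:
```scala
import org.apache.spark.sql.functions.lit

// =!= has the same precedence as ===, so this groups as
// (lit(1) =!= lit(2)) || (lit(3) === lit(4)), as a reader would expect.
val ok = lit(1) =!= lit(2) || lit(3) === lit(4)

// With the deprecated !==, the assignment-operator rule kicks in and the
// same expression would group as lit(1) !== (lit(2) || (lit(3) === lit(4))).
```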
## How was this patch tested?
All currently existing tests.
Author: Jakob Odersky <jodersky@gmail.com>
Closes #11588 from jodersky/SPARK-7286.
Follow-up to #11509, that simply refactors the interface that we use when resolving a pluggable `DataSource`.
- Multiple functions share the same set of arguments so we make this a case class, called `DataSource`. Actual resolution is now done by calling a function on this class.
- Instead of having multiple methods named `apply` (some of which do writing, some of which do reading) we now explicitly have `resolveRelation()` and `write(mode, df)`.
- Get rid of `Array[String]` since this is an internal API and was forcing us to awkwardly call `toArray` in a bunch of places.
Author: Michael Armbrust <michael@databricks.com>
Closes #11572 from marmbrus/dataSourceResolution.
## What changes were proposed in this pull request?
This removes the remaining deprecated octal escape literals. The following are the warnings on those two lines.
```
LiteralExpressionSuite.scala:99: Octal escape literals are deprecated, use \u0000 instead.
HiveQlSuite.scala:74: Octal escape literals are deprecated, use \u002c instead.
```
## How was this patch tested?
Manual.
During the build, there should be no warnings about octal escape literals.
```
mvn -DskipTests clean install
```
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes #11584 from dongjoon-hyun/SPARK-13400.
## What changes were proposed in this pull request?
This PR adds SQL generation support for aggregates with multi-distinct, by simply moving the `DistinctAggregationRewriter` rule to the optimizer.
More discussion is needed, as this breaks an important contract: an analyzed plan should be able to run without optimization. However, the `ComputeCurrentTime` rule has kind of broken it already, and I think maybe we should add a new phase for this kind of rule, because strictly speaking they don't belong to analysis and are coupled with the physical plan implementation.
## How was this patch tested?
existing tests
Author: Wenchen Fan <wenchen@databricks.com>
Closes #11579 from cloud-fan/distinct.
## What changes were proposed in this pull request?
When we add more DDL parsing logic in the future, SparkQl will become very big. To keep it smaller, we'll introduce helper "parser objects", e.g. one to parse alter table commands. However, these parser objects will need to access some helper methods that exist in CatalystQl. The proposal is to move those methods to an isolated ParserUtils object.
This is based on viirya's changes in #11048. It prefaces the bigger fix for SPARK-13139 to make the diff of that patch smaller.
## How was this patch tested?
No change in functionality, so just Jenkins.
Author: Andrew Or <andrew@databricks.com>
Closes #11529 from andrewor14/parser-utils.
## What changes were proposed in this pull request?
Adding the hive-cli classes to the classloader
## How was this patch tested?
The Hive `VersionsSuite` tests were run.
This is my original work and I license the work to the project under the project's open source license.
Author: Tim Preece <tim.preece.in.oz@gmail.com>
Closes #11495 from preecet/master.
`HadoopFsRelation` is used for reading most files into Spark SQL. However today this class mixes the concerns of file management, schema reconciliation, scan building, bucketing, partitioning, and writing data. As a result, many data sources are forced to reimplement the same functionality and the various layers have accumulated a fair bit of inefficiency. This PR is a first cut at separating this into several components / interfaces that are each described below. Additionally, all implementations inside of Spark (parquet, csv, json, text, orc, svmlib) have been ported to the new API `FileFormat`. External libraries, such as spark-avro will also need to be ported to work with Spark 2.0.
### HadoopFsRelation
A simple `case class` that acts as a container for all of the metadata required to read from a datasource. All discovery, resolution and merging logic for schemas and partitions has been removed. This is an internal representation that no longer needs to be exposed to developers.
```scala
case class HadoopFsRelation(
    sqlContext: SQLContext,
    location: FileCatalog,
    partitionSchema: StructType,
    dataSchema: StructType,
    bucketSpec: Option[BucketSpec],
    fileFormat: FileFormat,
    options: Map[String, String]) extends BaseRelation
```
### FileFormat
The primary interface that will be implemented by each different format including external libraries. Implementors are responsible for reading a given format and converting it into `InternalRow` as well as writing out an `InternalRow`. A format can optionally return a schema that is inferred from a set of files.
```scala
trait FileFormat {
  def inferSchema(
      sqlContext: SQLContext,
      options: Map[String, String],
      files: Seq[FileStatus]): Option[StructType]

  def prepareWrite(
      sqlContext: SQLContext,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType): OutputWriterFactory

  def buildInternalScan(
      sqlContext: SQLContext,
      dataSchema: StructType,
      requiredColumns: Array[String],
      filters: Array[Filter],
      bucketSet: Option[BitSet],
      inputFiles: Array[FileStatus],
      broadcastedConf: Broadcast[SerializableConfiguration],
      options: Map[String, String]): RDD[InternalRow]
}
```
The current interface is based on what was required to get all the tests passing again, but still mixes a couple of concerns (i.e. `bucketSet` is passed down to the scan instead of being resolved by the planner). Additionally, scans are still returning `RDD`s instead of iterators for single files. In a future PR, bucketing should be removed from this interface and the scan should be isolated to a single file.
### FileCatalog
This interface is used to list the files that make up a given relation, as well as handle directory based partitioning.
```scala
trait FileCatalog {
  def paths: Seq[Path]
  def partitionSpec(schema: Option[StructType]): PartitionSpec
  def allFiles(): Seq[FileStatus]
  def getStatus(path: Path): Array[FileStatus]
  def refresh(): Unit
}
```
Currently there are two implementations:
- `HDFSFileCatalog` - based on code from the old `HadoopFsRelation`. Infers partitioning by recursive listing and caches this data for performance
- `HiveFileCatalog` - based on the above, but it uses the partition spec from the Hive Metastore.
### ResolvedDataSource
Produces a logical plan given the following description of a Data Source (which can come from DataFrameReader or a metastore):
- `paths: Seq[String] = Nil`
- `userSpecifiedSchema: Option[StructType] = None`
- `partitionColumns: Array[String] = Array.empty`
- `bucketSpec: Option[BucketSpec] = None`
- `provider: String`
- `options: Map[String, String]`
This class is responsible for deciding which of the Data Source APIs a given provider is using (including the non-file based ones). All reconciliation of partitions, buckets, schema from metastores or inference is done here.
### DataSourceAnalysis / DataSourceStrategy
Responsible for analyzing and planning reading/writing of data using any of the Data Source APIs, including:
- pruning the files from partitions that will be read based on filters.
- appending partition columns*
- applying additional filters when a data source can not evaluate them internally.
- constructing an RDD that is bucketed correctly when required*
- sanity checking schema match-up and other analysis when writing.
*In the future we should do the following:
- Break out file handling into its own Strategy as it's sufficiently complex / isolated.
- Push the appending of partition columns down in to `FileFormat` to avoid an extra copy / unvectorization.
- Use a custom RDD for scans instead of `SQLNewNewHadoopRDD2`
Author: Michael Armbrust <michael@databricks.com>
Author: Wenchen Fan <wenchen@databricks.com>
Closes #11509 from marmbrus/fileDataSource.
## What changes were proposed in this pull request?
This PR adds an optimizer rule to eliminate reading (unnecessary) NULL values if they are not required for correctness, by inserting `isNotNull` filters in the query plan. These filters are currently inserted beneath existing `Filter` and `Join` operators and are inferred based on their data constraints.
Note: While this optimization is applicable to all types of join, it primarily benefits `Inner` and `LeftSemi` joins.
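A toy model of the inference, deliberately independent of the real Catalyst classes:
```scala
// Toy expressions: a null-intolerant comparison rejects rows where either
// side is null, so every attribute it references yields an IsNotNull filter.
sealed trait Expr { def references: Set[String] }
case class Attr(name: String) extends Expr {
  def references: Set[String] = Set(name)
}
case class GreaterThan(left: Expr, right: Expr) extends Expr {
  def references: Set[String] = left.references ++ right.references
}

def inferIsNotNull(condition: Expr): Set[String] = condition.references

inferIsNotNull(GreaterThan(Attr("a"), Attr("b")))
// Set("a", "b") -> insert IsNotNull(a) and IsNotNull(b) beneath the Filter
```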
## How was this patch tested?
1. Added a new `NullFilteringSuite` that tests for `IsNotNull` filters in the query plan for joins and filters. Also, tests interaction with the `CombineFilters` optimizer rules.
2. Test generated ExpressionTrees via `OrcFilterSuite`
3. Test filter source pushdown logic via `SimpleTextHadoopFsRelationSuite`
cc yhuai nongli
Author: Sameer Agarwal <sameer@databricks.com>
Closes #11372 from sameeragarwal/gen-isnotnull.
## What changes were proposed in this pull request?
```
Seq(("id1", "value1")).toDF("key", "value").registerTempTable("src")
sqlContext.sql("SELECT t1.* FROM src LATERAL VIEW explode(map('key1', 100, 'key2', 200)) t1 AS key, value")
```
This results in the following logical plan:
```
Project [key#2,value#3]
+- Generate explode(HiveGenericUDF#org.apache.hadoop.hive.ql.udf.generic.GenericUDFMap(key1,100,key2,200)), true, false, Some(genoutput), [key#2,value#3]
   +- SubqueryAlias src
      +- Project [_1#0 AS key#2,_2#1 AS value#3]
         +- LocalRelation [_1#0,_2#1], [[id1,value1]]
```
The above query fails with the following runtime error:
```
java.lang.ClassCastException: java.lang.Integer cannot be cast to org.apache.spark.unsafe.types.UTF8String
at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getUTF8String(rows.scala:46)
at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:221)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(generated.java:42)
at org.apache.spark.sql.execution.Generate$$anonfun$doExecute$1$$anonfun$apply$9.apply(Generate.scala:98)
at org.apache.spark.sql.execution.Generate$$anonfun$doExecute$1$$anonfun$apply$9.apply(Generate.scala:96)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
at scala.collection.Iterator$class.foreach(Iterator.scala:742)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
<stack-trace omitted.....>
```
In this case the generated outputs are wrongly resolved from its child (LocalRelation) due to
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L537-L548
## How was this patch tested?
Added unit tests in hive/SQLQuerySuite and AnalysisSuite
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes #11497 from dilipbiswal/spark-13651.
## What changes were proposed in this pull request?
Today we have `analysis.Catalog` and `catalog.Catalog`. In the future the former will call the latter. When that happens, if both of them are still called `Catalog` it will be very confusing. This patch renames the latter `ExternalCatalog` because it is expected to talk to external systems.
## How was this patch tested?
Jenkins.
Author: Andrew Or <andrew@databricks.com>
Closes #11526 from andrewor14/rename-catalog.
This reverts commit f87ce0504e.
According to the discussion in #11466, let's revert PR #11466 to be safe.
Author: Cheng Lian <lian@databricks.com>
Closes #11539 from liancheng/revert-pr-11466.
#### What changes were proposed in this pull request?
This PR is for supporting SQL generation for cube, rollup and grouping sets.
For example, a query using rollup:
```SQL
SELECT count(*) as cnt, key % 5, grouping_id() FROM t1 GROUP BY key % 5 WITH ROLLUP
```
Original logical plan:
```
Aggregate [(key#17L % cast(5 as bigint))#47L,grouping__id#46],
          [(count(1),mode=Complete,isDistinct=false) AS cnt#43L,
           (key#17L % cast(5 as bigint))#47L AS _c1#45L,
           grouping__id#46 AS _c2#44]
+- Expand [List(key#17L, value#18, (key#17L % cast(5 as bigint))#47L, 0),
           List(key#17L, value#18, null, 1)],
          [key#17L,value#18,(key#17L % cast(5 as bigint))#47L,grouping__id#46]
   +- Project [key#17L,
               value#18,
               (key#17L % cast(5 as bigint)) AS (key#17L % cast(5 as bigint))#47L]
      +- Subquery t1
         +- Relation[key#17L,value#18] ParquetRelation
```
Converted SQL:
```SQL
SELECT count(1) AS `cnt`,
       (`t1`.`key` % CAST(5 AS BIGINT)),
       grouping_id() AS `_c2`
FROM `default`.`t1`
GROUP BY (`t1`.`key` % CAST(5 AS BIGINT))
GROUPING SETS (((`t1`.`key` % CAST(5 AS BIGINT))), ())
```
#### How was this patch tested?
Added eight test cases in `LogicalPlanToSQLSuite`.
Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>
Closes #11283 from gatorsmile/groupingSetsToSQL.
## What changes were proposed in this pull request?
This patch simply moves things to the existing package `o.a.s.sql.catalyst.parser` in an effort to reduce the size of the diff in #11048. This is conceptually the same as the recently merged patch #11482.
## How was this patch tested?
Jenkins.
Author: Andrew Or <andrew@databricks.com>
Closes #11506 from andrewor14/parser-package.
The earlier fix did not copy the bytes, and it is possible for a higher level to reuse the Text object. This was causing issues. The proposed fix copies the bytes from Text. This still avoids the expensive encoding/decoding.
Author: Rajesh Balamohan <rbalamohan@apache.org>
Closes #11477 from rajeshbalamohan/SPARK-12925.2.
## What changes were proposed in this pull request?
This adds SQL generation support for subquery expressions, which are recursively replaced with a `SubqueryHolder` inside `SQLBuilder`.
## How was this patch tested?
Added unit tests.
Author: Davies Liu <davies@databricks.com>
Closes #11453 from davies/sql_subquery.
## What changes were proposed in this pull request?
Fix race conditions when cleaning up files.
## How was this patch tested?
Existing tests.
Author: Davies Liu <davies@databricks.com>
Closes #11507 from davies/flaky.
## What changes were proposed in this pull request?
This patch simply moves things to a new package in an effort to reduce the size of the diff in #11048. Currently the new package only has one file, but in the future we'll add many new commands in SPARK-13139.
## How was this patch tested?
Jenkins.
Author: Andrew Or <andrew@databricks.com>
Closes #11482 from andrewor14/commands-package.
## What changes were proposed in this pull request?
This PR adds support for specifying compression codecs for both ORC and Parquet.
## How was this patch tested?
Unit tests within the IDE and code style tests with `dev/run_tests`.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes #11464 from HyukjinKwon/SPARK-13543.
## What changes were proposed in this pull request?
After SPARK-6990, `dev/lint-java` keeps Java code healthy and helps PR review by saving a lot of time.
This issue aims to remove unused imports from Java/Scala code and add an `UnusedImports` checkstyle rule to help developers.
## How was this patch tested?
```
./dev/lint-java
./build/sbt compile
```
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes #11438 from dongjoon-hyun/SPARK-13583.
## What changes were proposed in this pull request?
Make some cross-cutting code improvements according to static analysis. These are individually up for discussion since they exist in separate commits that can be reverted. The changes are broadly:
- Inner class should be static
- Mismatched hashCode/equals
- Overflow in compareTo
- Unchecked warnings
- Misuse of assert, vs junit.assert
- get(a) + getOrElse(b) -> getOrElse(a,b)
- Array/String .size -> .length (occasionally, -> .isEmpty / .nonEmpty) to avoid implicit conversions
- Dead code
- tailrec
- exists(_ == ) -> contains
- find + nonEmpty -> exists
- filter + size -> count
- reduce(_+_) -> sum
- map + flatten -> flatMap
The most controversial may be .size -> .length simply because of its size. It is intended to avoid implicits that might be expensive in some places.
## How was this patch tested?
Existing Jenkins unit tests.
Author: Sean Owen <sowen@cloudera.com>
Closes #11292 from srowen/SPARK-13423.
JIRA: https://issues.apache.org/jira/browse/SPARK-13616
## What changes were proposed in this pull request?
It is possible that a logical plan has had its top `Project` removed, or never had a top `Project` in the first place because it was not necessary. Currently the `SQLBuilder` can't convert such plans back to SQL. This change adds this feature.
## How was this patch tested?
A test is added to `LogicalPlanToSQLSuite`.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes #11466 from viirya/sqlbuilder-notopselect.
## What changes were proposed in this pull request?
In order to tell the OutputStream whether the task has failed, we should call the failure callbacks BEFORE calling writer.close().
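A minimal sketch of the intended ordering (`runFailureCallbacks` is a hypothetical stand-in for the task failure callbacks):
```scala
import java.io.OutputStream

def runFailureCallbacks(t: Throwable): Unit =
  println(s"task failed: ${t.getMessage}") // stand-in for TaskContext failure callbacks

def writeTask(writer: OutputStream, rows: Iterator[Array[Byte]]): Unit = {
  try {
    rows.foreach(r => writer.write(r))
  } catch {
    case t: Throwable =>
      runFailureCallbacks(t) // must run BEFORE close(), so the stream can abort cleanly
      throw t
  } finally {
    writer.close() // the finally block runs after the catch, i.e. after the callbacks
  }
}
```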
## How was this patch tested?
Added new unit tests.
Author: Davies Liu <davies@databricks.com>
Closes #11450 from davies/callback.
#### What changes were proposed in this pull request?
```SQL
FROM
(FROM test SELECT TRANSFORM(key, value) USING 'cat' AS (`thing1` int, thing2 string)) t
SELECT thing1 + 1
```
This query returns an analysis error, like:
```
Failed to analyze query: org.apache.spark.sql.AnalysisException: cannot resolve '`thing1`' given input columns: [`thing1`, thing2]; line 3 pos 7
'Project [unresolvedalias(('thing1 + 1), None)]
+- SubqueryAlias t
   +- ScriptTransformation [key#2,value#3], cat, [`thing1`#6,thing2#7], HiveScriptIOSchema(List(),List(),Some(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe),Some(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe),List((field.delim, )),List((field.delim, )),Some(org.apache.hadoop.hive.ql.exec.TextRecordReader),Some(org.apache.hadoop.hive.ql.exec.TextRecordWriter),false)
      +- SubqueryAlias test
         +- Project [_1#0 AS key#2,_2#1 AS value#3]
            +- LocalRelation [_1#0,_2#1], [[1,1],[2,2],[3,3],[4,4],[5,5]]
```
The backticks of \`thing1\` should be cleaned before entering the Parser/Analyzer. This PR fixes this issue.
#### How was this patch tested?
Added a test case and modified an existing test case
Author: gatorsmile <gatorsmile@gmail.com>
Closes #11415 from gatorsmile/scriptTransform.
JIRA: https://issues.apache.org/jira/browse/SPARK-13537
## What changes were proposed in this pull request?
In readBytes of VectorizedPlainValuesReader, we use buffer[offset] to access bytes in buffer. This is incorrect because Platform.BYTE_ARRAY_OFFSET is added to offset during initialization. We should fix it.
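A toy illustration of the off-by-base-offset bug (the constant value below is made up; `Platform.BYTE_ARRAY_OFFSET` is the real field):
```scala
val BYTE_ARRAY_OFFSET = 16L // illustrative stand-in for Platform.BYTE_ARRAY_OFFSET
val buffer = Array[Byte](10, 20, 30, 40)

// offset is maintained in "unsafe" terms, i.e. the base offset is already added.
val offset = BYTE_ARRAY_OFFSET + 1

// Buggy: indexing the array with the unsafe offset reads past the data.
// val wrong = buffer(offset.toInt)

// Fixed: subtract the base offset before plain array indexing.
val value = buffer((offset - BYTE_ARRAY_OFFSET).toInt) // 20
```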
## How was this patch tested?
`ParquetHadoopFsRelationSuite` sometimes (depending on the randomly generated data) will [fail](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/52136/consoleFull) because of this bug. After applying this patch, the test passes.
I added a test to `ParquetHadoopFsRelationSuite` with data that fails without this patch.
The error exception:
```
[info] ParquetHadoopFsRelationSuite:
[info] - test all data types - StringType (440 milliseconds)
[info] - test all data types - BinaryType (434 milliseconds)
[info] - test all data types - BooleanType (406 milliseconds)
20:59:38.618 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 2597.0 (TID 67966)
java.lang.ArrayIndexOutOfBoundsException: 46
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedPlainValuesReader.readBytes(VectorizedPlainValuesReader.java:88)
```
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes #11418 from viirya/fix-readbytes.
## What changes were proposed in this pull request?
This creates a `SessionState`, which groups a few fields that existed in `SQLContext`. Because `HiveContext` extends `SQLContext` we also need to make changes there. This is mainly a cleanup task that will soon pave the way for merging the two contexts.
## How was this patch tested?
Existing unit tests; this patch introduces no change in behavior.
Author: Andrew Or <andrew@databricks.com>
Closes #11405 from andrewor14/refactor-session.
## What changes were proposed in this pull request?
This is another try at PR #11323.
This PR removes DataFrame RDD operations except for `foreach` and `foreachPartitions` (they are actions rather than transformations). Original calls are now replaced by calls to methods of `DataFrame.rdd`.
PR #11323 was reverted because it introduced a regression: both `DataFrame.foreach` and `DataFrame.foreachPartitions` wrap underlying RDD operations with `withNewExecutionId` to track Spark jobs, but these wrappers were removed in #11323.
## How was this patch tested?
No extra tests are added. Existing tests should do the work.
Author: Cheng Lian <lian@databricks.com>
Closes #11388 from liancheng/remove-df-rdd-ops.
## Motivation
As a pre-requisite to off-heap caching of blocks, we need a mechanism to prevent pages / blocks from being evicted while they are being read. With on-heap objects, evicting a block while it is being read merely leads to memory-accounting problems (because we assume that an evicted block is a candidate for garbage-collection, which will not be true during a read), but with off-heap memory this will lead to either data corruption or segmentation faults.
## Changes
### BlockInfoManager and reader/writer locks
This patch adds block-level read/write locks to the BlockManager. It introduces a new `BlockInfoManager` component, which is contained within the `BlockManager`, holds the `BlockInfo` objects that the `BlockManager` uses for tracking block metadata, and exposes APIs for locking blocks in either shared read or exclusive write modes.
`BlockManager`'s `get*()` and `put*()` methods now implicitly acquire the necessary locks. After a `get()` call successfully retrieves a block, that block is locked in a shared read mode. A `put()` call will block until it acquires an exclusive write lock. If the write succeeds, the write lock will be downgraded to a shared read lock before returning to the caller. This `put()` locking behavior allows us to store a block and then immediately turn around and read it without having to worry about it having been evicted between the write and the read, which will allow us to significantly simplify `CacheManager` in the future (see #10748).
See `BlockInfoManagerSuite`'s test cases for a more detailed specification of the locking semantics.
### Auto-release of locks at the end of tasks
Our locking APIs support explicit release of locks (by calling `unlock()`), but it's not always possible to guarantee that locks will be released prior to the end of the task. One reason for this is our iterator interface: since our iterators don't support an explicit `close()` operator to signal that no more records will be consumed, operations like `take()` or `limit()` don't have a good means to release locks on their input iterators' blocks. Another example is broadcast variables, whose block locks can only be released at the end of the task.
To address this, `BlockInfoManager` uses a pair of maps to track the set of locks acquired by each task. Lock acquisitions automatically record the current task attempt id by obtaining it from `TaskContext`. When a task finishes, code in `Executor` calls `BlockInfoManager.unlockAllLocksForTask(taskAttemptId)` to free locks.
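A toy sketch of the per-task bookkeeping (not the real `BlockInfoManager`; names and types are simplified):
```scala
import scala.collection.mutable

class TaskLockRegistry {
  // taskAttemptId -> block ids whose read locks the task currently holds
  private val readLocksByTask = mutable.Map.empty[Long, mutable.ArrayBuffer[String]]

  def lockForReading(taskAttemptId: Long, blockId: String): Unit = synchronized {
    readLocksByTask.getOrElseUpdate(taskAttemptId, mutable.ArrayBuffer.empty) += blockId
  }

  // Called by the executor when a task finishes; frees locks the task could
  // not release itself (e.g. locks held by take()/limit() input iterators).
  def unlockAllLocksForTask(taskAttemptId: Long): Seq[String] = synchronized {
    readLocksByTask.remove(taskAttemptId).map(_.toSeq).getOrElse(Nil)
  }
}
```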
### Locking and the MemoryStore
In order to prevent in-memory blocks from being evicted while they are being read, the `MemoryStore`'s `evictBlocksToFreeSpace()` method acquires write locks on blocks which it is considering as candidates for eviction. These lock acquisitions are non-blocking, so a block which is being read will not be evicted. By holding write locks until the eviction is performed or skipped (in case evicting the blocks would not free enough memory), we avoid a race where a new reader starts to read a block after the block has been marked as an eviction candidate but before it has been removed.
### Locking and remote block transfer
This patch makes small changes to block transfer and network layer code so that locks acquired by the BlockTransferService are released as soon as block transfer messages are consumed and released by Netty. This builds on top of #11193, a bug fix related to freeing of network layer ManagedBuffers.
## FAQ
- **Why not use Java's built-in [`ReadWriteLock`](https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/ReadWriteLock.html)?**
Our locks operate on a per-task rather than per-thread level. Under certain circumstances a task may consist of multiple threads, so using `ReadWriteLock` would mean that we might call `unlock()` from a thread which didn't hold the lock in question, an operation which has undefined semantics. If we could rely on Java 8 classes, we might be able to use [`StampedLock`](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/StampedLock.html) to work around this issue.
- **Why not detect "leaked" locks in tests?**:
See above notes about `take()` and `limit`.
Author: Josh Rosen <joshrosen@databricks.com>
Closes #10705 from JoshRosen/pin-pages.
## What changes were proposed in this pull request?
This PR removes DataFrame RDD operations. Original calls are now replaced by calls to methods of `DataFrame.rdd`.
## How was this patch tested?
No extra tests are added. Existing tests should do the work.
Author: Cheng Lian <lian@databricks.com>
Closes #11323 from liancheng/remove-df-rdd-ops.
## What changes were proposed in this pull request?
This patch moves SQLConf into org.apache.spark.sql.internal package to make it very explicit that it is internal. Soon I will also submit more API work that creates implementations of interfaces in this internal package.
## How was this patch tested?
If it compiles, then the refactoring should work.
Author: Reynold Xin <rxin@databricks.com>
Closes #11363 from rxin/SPARK-13486.
Some parts of the engine rely on UnsafeRow, which the vectorized parquet scanner does not want to produce. This adds a conversion in PhysicalRDD. In the case where codegen is used (and the scan is the start of the pipeline), there is no requirement to use UnsafeRow. This patch updates PhysicalRDD to support codegen, which eliminates the need for the UnsafeRow conversion in all cases.
The result of these changes for TPCDS-Q19 at the 10gb scale factor reduces the query time from 9.5 seconds to 6.5 seconds.
Author: Nong Li <nong@databricks.com>
Closes #11141 from nongli/spark-13250.
## What changes were proposed in this pull request?
`HiveCompatibilitySuite` should still run in PR build even if a PR only changes sql/core. So, I am going to remove `ExtendedHiveTest` annotation from `HiveCompatibilitySuite`.
https://issues.apache.org/jira/browse/SPARK-13475
Author: Yin Huai <yhuai@databricks.com>
Closes #11351 from yhuai/SPARK-13475.
## What changes were proposed in this pull request?
Since "[SPARK-13321][SQL] Support nested UNION in parser" is reverted, we need to disable the test case that requires this PR. Thanks!
rxin yhuai marmbrus
## How was this patch tested?
N/A
Author: gatorsmile <gatorsmile@gmail.com>
Closes #11352 from gatorsmile/disableTestCase.
## What changes were proposed in this pull request?
Generates code for SortMergeJoin.
## How was this patch tested?
Unit tests, and manually tested with TPCDS Q72, which showed a 70% performance improvement (from 42s to 25s). A micro benchmark only shows minor improvements; it may depend on the distribution of the data and the number of columns.
Author: Davies Liu <davies@databricks.com>
Closes #11248 from davies/gen_smj.
This PR is to implement SQL generation for the following three set operations:
- Union Distinct
- Intersect
- Except
liancheng Thanks!
Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>
Closes #11195 from gatorsmile/setOpSQLGen.
#### What changes were proposed in this pull request?
Ensure that all built-in expressions can be mapped to their SQL representations if there is one (e.g. ScalaUDF doesn't have a SQL representation). The function lists are from the expression list in `FunctionRegistry`.
Window functions, grouping sets functions (`cube`, `rollup`, `grouping`, `grouping_id`) and generator functions (`explode` and `json_tuple`) are covered by separate JIRAs and PRs. Thus, this PR does not cover them. Except for these functions, all the built-in expressions are covered. For details, see the list in `ExpressionToSQLSuite`.
Fixed a few issues. For example, the `prettyName` of `approx_count_distinct` is not right. The `sql` of the `hash` function is not right, since the `hash` function does not accept a `seed`.
Additionally, also corrected the order of expressions in `FunctionRegistry` so that it is easier for people to find which functions are missing.
cc liancheng
#### How was this patch tested?
Added two test cases in LogicalPlanToSQLSuite for covering `not like` and `not in`.
Added a new test suite `ExpressionToSQLSuite` to cover the functions:
1. misc non-aggregate functions + complex type creators + null expressions
2. math functions
3. aggregate functions
4. string functions
5. date time functions + calendar interval
6. collection functions
7. misc functions
Author: gatorsmile <gatorsmile@gmail.com>
Closes #11314 from gatorsmile/expressionToSQL.
In SparkSQLCLI, we have created a `CliSessionState`, but then we call `SparkSQLEnv.init()`, which starts another `SessionState`. This leads to an exception because `processCmd` needs to get the `CliSessionState` instance by calling `SessionState.get()`, but the return value would be an instance of `SessionState`. See the exception below.
```
spark-sql> !echo "test";
Exception in thread "main" java.lang.ClassCastException: org.apache.hadoop.hive.ql.session.SessionState cannot be cast to org.apache.hadoop.hive.cli.CliSessionState
    at org.apache.hadoop.hive.cli.CliDriver.processCmd(CliDriver.java:112)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:301)
    at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:376)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:242)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:691)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
```
Author: Daoyuan Wang <daoyuan.wang@intel.com>
Closes #9589 from adrian-wang/clicommand.
## What changes were proposed in this pull request?
This is a step towards merging `SQLContext` and `HiveContext`. A new internal Catalog API was introduced in #10982 and extended in #11069. This patch introduces an implementation of this API using `HiveClient`, an existing interface to Hive. It also extends `HiveClient` with additional calls to Hive that are needed to complete the catalog implementation.
*Where should I start reviewing?* The new catalog introduced is `HiveCatalog`. This class is relatively simple because it just calls `HiveClientImpl`, where most of the new logic is. I would not start with `HiveClient`, `HiveQl`, or `HiveMetastoreCatalog`, which are modified mainly because of a refactor.
*Why is this patch so big?* I had to refactor `HiveClient` to remove an intermediate representation of databases, tables, partitions, etc. After this refactor, `CatalogTable` converts directly to and from `HiveTable` (etc.). Otherwise we would have to first convert `CatalogTable` to the intermediate representation and then convert that to `HiveTable`, which is messy.
The new class hierarchy is as follows:
```
org.apache.spark.sql.catalyst.catalog.Catalog
- org.apache.spark.sql.catalyst.catalog.InMemoryCatalog
- org.apache.spark.sql.hive.HiveCatalog
```
Note that, as of this patch, none of these classes are used anywhere yet. This will change before the Spark 2.0 release.
## How was this patch tested?
All existing unit tests, plus a new `HiveCatalogSuite` that extends `CatalogTestCases`.
Author: Andrew Or <andrew@databricks.com>
Author: Reynold Xin <rxin@databricks.com>
Closes#11293 from rxin/hive-catalog.
Quite a few Spark SQL join operators broadcast one side of the join to all nodes. There are a few problems with this:
- This conflates broadcasting (a data exchange) with joining. Data exchanges should be managed by a different operator.
- All these nodes implement their own (duplicate) broadcasting logic.
- Re-use of indices is quite hard.
This PR defines both a ```BroadcastDistribution``` and a ```BroadcastPartitioning```; these contain a `BroadcastMode`. The `BroadcastMode` defines the way in which we transform the array of `InternalRow`s into an index. We currently support the following `BroadcastMode`s:
- `IdentityBroadcastMode`: This broadcasts the rows in their original form.
- `HashSetBroadcastMode`: This applies a projection to the input rows, deduplicates them, and broadcasts the resulting `Set`.
- `HashedRelationBroadcastMode`: This transforms the input rows into a `HashedRelation`, and broadcasts this index.
To match this distribution we implement a ```BroadcastExchange``` operator which performs the broadcast for us, and have ```EnsureRequirements``` plan this operator. The old Exchange operator has been renamed to ShuffleExchange in order to clearly separate shuffled from broadcast exchanges. Finally, the classes in Exchange.scala have been moved to a dedicated package.
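As a rough illustration of the `BroadcastMode` contract described above, here is a minimal sketch with simplified stand-in types (the actual Catalyst classes operate on `InternalRow` and differ in detail):
```scala
// Simplified stand-ins; not Spark's actual Catalyst classes.
trait BroadcastMode {
  // Transforms the collected rows into the structure that is broadcast.
  def transform(rows: Array[Seq[Any]]): Any
}

object IdentityBroadcastMode extends BroadcastMode {
  def transform(rows: Array[Seq[Any]]): Any = rows // rows in their original form
}

case class HashSetBroadcastMode(project: Seq[Any] => Seq[Any]) extends BroadcastMode {
  // Project each row, deduplicate, and broadcast the resulting Set.
  def transform(rows: Array[Seq[Any]]): Any = rows.map(project).toSet
}
```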
cc rxin davies
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#11083 from hvanhovell/SPARK-13136.
## What changes were proposed in this pull request?
This patch renames logical.Subquery to logical.SubqueryAlias, which is a more appropriate name for this operator (versus subqueries as expressions).
## How was this patch tested?
Unit tests.
Author: Reynold Xin <rxin@databricks.com>
Closes#11288 from rxin/SPARK-13420.
This PR introduces several major changes:
1. Replacing `Expression.prettyString` with `Expression.sql`
The `prettyString` method is mostly an internal, developer-facing facility for debugging purposes, and shouldn't be exposed to users.
1. Using a SQL-like representation as column names for selected fields that are not named expressions (back-ticks and double quotes should be removed)
Before, we were using `prettyString` for column names when possible, and sometimes the resulting column names could be weird. Here are several examples:
Expression | `prettyString` | `sql` | Note
------------------ | -------------- | ---------- | ---------------
`a && b` | `a && b` | `a AND b` |
`a.getField("f")` | `a[f]` | `a.f` | `a` is a struct
1. Adding a trait `NonSQLExpression` extending `Expression` for expressions that don't have a SQL representation (e.g. Scala UDF/UDAF and Java/Scala object expressions used for encoders)
`NonSQLExpression.sql` may return an arbitrary user-facing string representation of the expression.
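For illustration, here is a minimal sketch of how such a split could look (simplified stand-ins, not Spark's actual `Expression` hierarchy):
```scala
trait Expression {
  def sql: String // user-facing SQL representation
}

case class ColumnRef(name: String) extends Expression {
  def sql: String = s"`$name`"
}

case class And(left: Expression, right: Expression) extends Expression {
  def sql: String = s"(${left.sql} AND ${right.sql})"
}

// Marker for expressions with no SQL form (e.g. a Scala UDF); `sql` may
// return an arbitrary user-facing string.
trait NonSQLExpression extends Expression {
  def sql: String = toString
}

// And(ColumnRef("a"), ColumnRef("b")).sql == "(`a` AND `b`)"
```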
Author: Cheng Lian <lian@databricks.com>
Closes#10757 from liancheng/spark-12799.simplify-expression-string-methods.
```scala
// case 1: missing sort columns are resolvable if join is true
sql("SELECT explode(a) AS val, b FROM data WHERE b < 2 order by val, c")
// case 2: missing sort columns are not resolvable if join is false. Thus, issue an error message in this case
sql("SELECT explode(a) AS val FROM data order by val, c")
```
When sort columns are not in `Generate`, we can resolve them when `join` is equal to `true`. Still trying to add more test cases for the other `UnaryNode` types.
Could you review the changes? davies cloud-fan Thanks!
Author: gatorsmile <gatorsmile@gmail.com>
Closes#11198 from gatorsmile/missingInSort.
Using grouping sets generates a wrong result when aggregate functions contain group-by columns.
This PR fixes it. Since the code changes are very small, maybe we can also merge it into 1.6.
For example, the following query returns a wrong result:
```scala
sql("select course, sum(earnings) as sum from courseSales group by course, earnings" +
" grouping sets((), (course), (course, earnings))" +
" order by course, sum").show()
```
Before the fix, the results are like
```
[null,null]
[Java,null]
[Java,20000.0]
[Java,30000.0]
[dotNET,null]
[dotNET,5000.0]
[dotNET,10000.0]
[dotNET,48000.0]
```
After the fix, the results become correct:
```
[null,113000.0]
[Java,20000.0]
[Java,30000.0]
[Java,50000.0]
[dotNET,5000.0]
[dotNET,10000.0]
[dotNET,48000.0]
[dotNET,63000.0]
```
UPDATE: This PR also deprecates the external column `GROUPING__ID`.
Author: gatorsmile <gatorsmile@gmail.com>
Closes#11100 from gatorsmile/groupingSets.
This patch adds a new optimizer rule for performing limit pushdown. Limits will now be pushed down in two cases:
- If a limit is on top of a `UNION ALL` operator, then a partition-local limit operator will be pushed to each of the union operator's children.
- If a limit is on top of an `OUTER JOIN` then a partition-local limit will be pushed to one side of the join. For `LEFT OUTER` and `RIGHT OUTER` joins, the limit will be pushed to the left and right side, respectively. For `FULL OUTER` join, we will only push limits when at most one of the inputs is already limited: if one input is limited we will push a smaller limit on top of it and if neither input is limited then we will limit the input which is estimated to be larger.
These optimizations were proposed previously by gatorsmile in #10451 and #10454, but those earlier PRs were closed and deferred for later because at that time Spark's physical `Limit` operator would trigger a full shuffle to perform global limits so there was a chance that pushdowns could actually harm performance by causing additional shuffles/stages. In #7334, we split the `Limit` operator into separate `LocalLimit` and `GlobalLimit` operators, so we can now push down only local limits (which don't require extra shuffles). This patch is based on both of gatorsmile's patches, with changes and simplifications due to partition-local-limiting.
When we push down the limit, we still keep the original limit in place, so we need a mechanism to ensure that the optimizer rule doesn't keep pattern-matching once the limit has been pushed down. In order to handle this, this patch adds a `maxRows` method to `SparkPlan` which returns the maximum number of rows that the plan can compute, then defines the pushdown rules to only push limits to children if the children's maxRows are greater than the limit's maxRows. This idea is carried over from #10451; see that patch for additional discussion.
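As a toy model of the UNION ALL case and the `maxRows` guard described above (simplified stand-ins, not Spark's Catalyst classes):
```scala
sealed trait Plan { def maxRows: Option[Long] }
case class Scan(name: String) extends Plan { val maxRows: Option[Long] = None }
case class LocalLimit(n: Long, child: Plan) extends Plan { val maxRows: Option[Long] = Some(n) }
case class GlobalLimit(n: Long, child: Plan) extends Plan { val maxRows: Option[Long] = Some(n) }
case class Union(children: Seq[Plan]) extends Plan { val maxRows: Option[Long] = None }

// Push a partition-local limit below UNION ALL, but only into children whose
// maxRows could still exceed the limit, so the rule doesn't fire repeatedly.
def pushLimitThroughUnion(plan: Plan): Plan = plan match {
  case GlobalLimit(n, Union(children)) =>
    GlobalLimit(n, Union(children.map {
      case c if c.maxRows.exists(_ <= n) => c // already limited enough
      case c                             => LocalLimit(n, c)
    }))
  case other => other
}
```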
Author: Josh Rosen <joshrosen@databricks.com>
Closes#11121 from JoshRosen/limit-pushdown-2.
This pull request has the following changes:
1. Moved UserDefinedFunction into expressions package. This is more consistent with how we structure the packages for window functions and UDAFs.
2. Moved UserDefinedPythonFunction into execution.python package, so we don't have a random private class in the top level sql package.
3. Move everything in execution/python.scala into the newly created execution.python package.
Most of the diffs are just straight copy-paste.
Author: Reynold Xin <rxin@databricks.com>
Closes#11181 from rxin/SPARK-13296.
Previously we were using `Option[String]` and `None` to indicate the case when Spark fails to generate SQL. It is easier to just use exceptions to propagate error cases, rather than having for-comprehensions everywhere. I also introduced a "build" function that simplifies string concatenation (i.e. no need to reason about whether we have an extra space or not).
Author: Reynold Xin <rxin@databricks.com>
Closes#11171 from rxin/SPARK-13282.
The current implementation of ResolveSortReferences can only push one missing attribute into its child. It failed to analyze TPC-DS Q98, because there are two missing attributes in that query (one from `Window`, another from `Aggregate`).
Author: Davies Liu <davies@databricks.com>
Closes#11153 from davies/resolve_sort.
For many SQL operators we have metrics for both input and output. Since the number of input rows is exactly the number of output rows of the child, we only need metrics for output rows.
After we improved performance using whole-stage codegen, the overhead of SQL metrics is no longer trivial, so we should avoid it when it's not necessary.
This PR removes all the SQL metrics for the number of input rows, adds a SQL metric for the number of output rows for every LeafNode, and also removes the SQL metrics from operators that pass through the same number of rows from input to output (for example, Projection may not need them).
The new SQL UI will look like:
![metrics](https://cloud.githubusercontent.com/assets/40902/12965227/63614e5e-d009-11e5-88b3-84fea04f9c20.png)
Author: Davies Liu <davies@databricks.com>
Closes#11163 from davies/remove_metrics.
grouping() returns whether a column is aggregated or not; grouping_id() returns the aggregation levels.
grouping()/grouping_id() can be used with window functions, but do not yet work in HAVING/ORDER BY clauses; that will be fixed by another PR.
The GROUPING__ID/grouping_id() in Hive is wrong (according to its docs), and we did it wrongly too; this PR changes that to match the behavior in most databases (and the Hive docs).
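For example (assuming a table like the `courseSales` one used elsewhere in this document, and a `sql` function in scope as in the other snippets):
```scala
// grouping(course) is 1 on the grand-total row where `course` is aggregated
// away; grouping_id(course) encodes the aggregation level of each row.
sql("select course, sum(earnings), grouping(course), grouping_id(course)" +
  " from courseSales group by course grouping sets((course), ())").show()
```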
Author: Davies Liu <davies@databricks.com>
Closes#10677 from davies/grouping.
This PR addresses two issues:
- Self join does not work in SQL Generation
- When creating new instances for `LogicalRelation`, `metastoreTableIdentifier` is lost.
liancheng Could you please review the code changes? Thank you!
Author: gatorsmile <gatorsmile@gmail.com>
Closes#11084 from gatorsmile/selfJoinInSQLGen.
Some analysis rules generate aliases or auxiliary attribute references with the same name but different expression IDs. For example, `ResolveAggregateFunctions` introduces `havingCondition` and `aggOrder`, and `DistinctAggregationRewriter` introduces `gid`.
This is OK for normal query execution since these attribute references are distinguished by their expression IDs. However, it's troublesome when converting resolved query plans back to SQL query strings, since expression IDs are erased.
Here's an example Spark 1.6.0 snippet for illustration:
```scala
sqlContext.range(10).select('id as 'a, 'id as 'b).registerTempTable("t")
sqlContext.sql("SELECT SUM(a) FROM t GROUP BY a, b ORDER BY COUNT(a), COUNT(b)").explain(true)
```
The above code produces the following resolved plan:
```
== Analyzed Logical Plan ==
_c0: bigint
Project [_c0#101L]
+- Sort [aggOrder#102L ASC,aggOrder#103L ASC], true
+- Aggregate [a#47L,b#48L], [(sum(a#47L),mode=Complete,isDistinct=false) AS _c0#101L,(count(a#47L),mode=Complete,isDistinct=false) AS aggOrder#102L,(count(b#48L),mode=Complete,isDistinct=false) AS aggOrder#103L]
+- Subquery t
+- Project [id#46L AS a#47L,id#46L AS b#48L]
+- LogicalRDD [id#46L], MapPartitionsRDD[44] at range at <console>:26
```
Here we can see that both aggregate expressions in `ORDER BY` are extracted into an `Aggregate` operator, and both of them are named `aggOrder` with different expression IDs.
The solution is to automatically add the expression IDs into the attribute names for the aliases and attribute references generated by the Analyzer during SQL generation.
This PR also resolves another issue: users could use the same names as the internally generated ones. The duplicate names should not cause name ambiguity; when resolving a column, Catalyst should not pick the column that is internally generated.
Could you review the solution? marmbrus liancheng
I did not set the newly added flag for all the aliases and attribute references generated by Analyzer rules. Please let me know if I should. Thank you!
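For illustration, the disambiguation could look roughly like this (the helper name is hypothetical, not Spark's exact code):
```scala
// Hypothetical helper: embed the expression id in the emitted name so that
// two generated attributes both named "aggOrder" no longer collide in SQL.
def disambiguatedName(name: String, exprId: Long): String = s"${name}_$exprId"

// disambiguatedName("aggOrder", 102) == "aggOrder_102"
```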
Author: gatorsmile <gatorsmile@gmail.com>
Closes#11050 from gatorsmile/namingConflicts.
The patch for SPARK-8964 ("use Exchange to perform shuffle in Limit" / #7334) inadvertently broke the planning of the TakeOrderedAndProject operator: because ReturnAnswer was the new root of the query plan, the TakeOrderedAndProject rule was unable to match before BasicOperators.
This patch fixes this by moving the `TakeOrderedAndCollect` and `CollectLimit` rules into the same strategy.
In addition, I made changes to the TakeOrderedAndProject operator in order to make its `doExecute()` method lazy and added a new TakeOrderedAndProjectSuite which tests the new code path.
/cc davies and marmbrus for review.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#11145 from JoshRosen/take-ordered-and-project-fix.
WIP: running tests. Code needs a bit of clean up.
This patch completes the vectorized decoding with the goal of passing the existing tests. There are still more patches needed to support the rest of the format spec, even just for flat schemas.
This patch adds a new flag to enable the vectorized decoding. Tests were updated
to try with both modes where applicable.
Once this is working well, we can remove the previous code path.
Author: Nong Li <nong@databricks.com>
Closes#11055 from nongli/spark-12992-2.
JIRA: https://issues.apache.org/jira/browse/SPARK-12850
This PR is to support bucket pruning when the predicates are `EqualTo`, `EqualNullSafe`, `IsNull`, `In`, and `InSet`.
Like Hive, the bucket pruning in this PR works only when the bucketing key consists of exactly one column.
So far, I have not found a way to verify how many buckets are actually scanned; however, I did verify it while debugging. Could you suggest how to do it properly? Thank you! cloud-fan yhuai rxin marmbrus
BTW, we can add more cases to support complex predicate including `Or` and `And`. Please let me know if I should do it in this PR.
Maybe we also need to add test cases to verify if bucket pruning works well for each data type.
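A toy sketch of the idea for an `EqualTo` predicate on the single bucketing column (the hash is illustrative, not Spark's actual bucketing hash):
```scala
// Map a literal value to its bucket; only that bucket's files need scanning.
def bucketIdFor(value: Any, numBuckets: Int): Int =
  ((value.hashCode % numBuckets) + numBuckets) % numBuckets // non-negative mod

// For the predicate `col = 42` on a table with 8 buckets:
val targetBucket = bucketIdFor(42, numBuckets = 8)
```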
Author: gatorsmile <gatorsmile@gmail.com>
Closes#10942 from gatorsmile/pruningBuckets.
Spark SQL should collapse adjacent `Repartition` operators and only keep the last one.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#11064 from JoshRosen/collapse-repartition.
As benchmarked and discussed here: https://github.com/apache/spark/pull/10786/files#r50038294, benefits from codegen, the declarative aggregate function could be much faster than imperative one.
Author: Davies Liu <davies@databricks.com>
Closes#10960 from davies/stddev.
It is not valid to call `toAttribute` on a `NamedExpression` unless we know for sure that the child produced that `NamedExpression`. The current code worked fine when the grouping expressions were simple, but when they were a derived value this blew up at execution time.
Author: Michael Armbrust <michael@databricks.com>
Closes#11013 from marmbrus/groupByFunction-master.
JIRA: https://issues.apache.org/jira/browse/SPARK-12705
**Scope:**
This PR is a general fix for sorting reference resolution when the child's `outputSet` does not have the order-by attributes (called *missing attributes*):
- UnaryNode support is limited to `Project`, `Window`, `Aggregate`, `Distinct`, `Filter`, `RepartitionByExpression`.
- We will not try to resolve the missing references inside a subquery, unless the outputSet of this subquery contains it.
**General Reference Resolution Rules:**
- Jump over the nodes with the following types: `Distinct`, `Filter`, `RepartitionByExpression`. Do not need to add missing attributes. The reason is their `outputSet` is decided by their `inputSet`, which is the `outputSet` of their children.
- Group-by expressions in `Aggregate`: missing order-by attributes are not allowed to be added to group-by expressions, since that would change the query result. Thus, RDBMSs do not allow it either.
- Aggregate expressions in `Aggregate`: if the group-by expressions in `Aggregate` contain the missing attributes but the aggregate expressions do not, we just add them to the aggregate expressions. This resolves the AnalysisExceptions thrown by the three TPC-DS queries.
- `Project` and `Window` are special. We just need to add the missing attributes to their `projectList`.
**Implementation:**
1. Traverse the whole tree in a pre-order manner to find all the resolvable missing order-by attributes.
2. Traverse the whole tree in a post-order manner to add the found missing order-by attributes to the node if their `inputSet` contains the attributes.
3. If the origins of the missing order-by attributes are different nodes, each pass only resolves the missing attributes that are from the same node.
**Risk:**
Low. This rule is triggered iff ```!s.resolved && child.resolved``` is true. Thus, very few cases are affected.
Author: gatorsmile <gatorsmile@gmail.com>
Closes#10678 from gatorsmile/sortWindows.
This patch changes Spark's build to make Scala 2.11 the default Scala version. To be clear, this does not mean that Spark will stop supporting Scala 2.10: users will still be able to compile Spark for Scala 2.10 by following the instructions on the "Building Spark" page; however, it does mean that Scala 2.11 will be the default Scala version used by our CI builds (including pull request builds).
The Scala 2.11 compiler is faster than 2.10, so I think we'll be able to look forward to a slight speedup in our CI builds (it looks like it's about 2X faster for the Maven compile-only builds, for instance).
After this patch is merged, I'll update Jenkins to add new compile-only jobs to ensure that Scala 2.10 compilation doesn't break.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#10608 from JoshRosen/SPARK-6363.
And ClientWrapper -> HiveClientImpl.
I have some follow-up pull requests to introduce a new internal catalog, and I think this new naming better reflects the functionality of the two classes.
Author: Reynold Xin <rxin@databricks.com>
Closes#10981 from rxin/SPARK-13076.
JIRA: https://issues.apache.org/jira/browse/SPARK-12968
Implement command to set current database.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#10916 from viirya/ddl-use-database.
This PR moves all the functionality provided by the SparkSQLParser/ExtendedHiveQlParser to the new Parser hierarchy (SparkQl/HiveQl). This also improves the current SET command parsing: the current implementation swallows ```set role ...``` and ```set autocommit ...``` commands; this PR respects these commands (and passes them on to Hive).
This PR and https://github.com/apache/spark/pull/10723 end the use of Parser-Combinator parsers for SQL parsing. As a result we can also remove the ```AbstractSQLParser``` in Catalyst.
The PR is marked WIP as long as it doesn't pass all tests.
cc rxin viirya winningsix (this touches https://github.com/apache/spark/pull/10144)
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#10905 from hvanhovell/SPARK-12866.
This PR is a follow-up of PR #10541. It integrates the newly introduced SQL generation feature with native view to make native view canonical.
In this PR, a new SQL option `spark.sql.nativeView.canonical` is added. When this option and `spark.sql.nativeView` are both `true`, Spark SQL tries to handle `CREATE VIEW` DDL statements using SQL query strings generated from view definition logical plans. If we failed to map the plan to SQL, we fallback to the original native view approach.
One important issue this PR fixes is that, now we can use CTE when defining a view. Originally, when native view is turned on, we wrap the view definition text with an extra `SELECT`. However, HiveQL parser doesn't allow CTE appearing as a subquery. Namely, something like this is disallowed:
```sql
SELECT n
FROM (
WITH w AS (SELECT 1 AS n)
SELECT * FROM w
) v
```
This PR fixes this issue because the extra `SELECT` is no longer needed (also, CTE expressions are inlined as subqueries during analysis phase, thus there won't be CTE expressions in the generated SQL query string).
Author: Cheng Lian <lian@databricks.com>
Author: Yin Huai <yhuai@databricks.com>
Closes#10733 from liancheng/spark-12728.integrate-sql-gen-with-native-view.
This patch adds support for complex types for ColumnarBatch. ColumnarBatch supports structs
and arrays. There is a simple mapping between the richer catalyst types to these two. Strings
are treated as an array of bytes.
ColumnarBatch will contain a column for each node of the schema. Non-complex schemas consist
of just leaf nodes. Structs represent an internal node with one child for each field. Arrays
are internal nodes with one child. Structs just contain nullability. Arrays contain offsets
and lengths into the child array. This structure is able to handle arbitrary nesting. It has
the key property that we maintain columnar throughout and that primitive types are only stored
in the leaf nodes and contiguous across rows. For example, if the schema is
```
array<array<int>>
```
There are three columns in the schema. The internal nodes each have one child. The leaf node contains all the int data stored consecutively.
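A toy model of this layout for two rows, `[[1,2]]` and `[[3],[4,5]]` (illustrative stand-ins, not the actual column vector classes):
```scala
// One offsets/lengths column per internal (array) node; ints live in the leaf.
case class ArrayColumn(offsets: Array[Int], lengths: Array[Int])

val leaf  = Array(1, 2, 3, 4, 5)                        // contiguous int data
val inner = ArrayColumn(Array(0, 2, 3), Array(2, 1, 2)) // [1,2], [3], [4,5]
val outer = ArrayColumn(Array(0, 1), Array(1, 2))       // row0: 1 array, row1: 2
```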
As part of this, this patch adds append APIs in addition to the Put APIs (e.g. putLong(rowid, v)
vs appendLong(v)). These APIs are necessary when the batch contains variable length elements.
The vectors are not fixed length and will grow as necessary. This should make the usage a lot
simpler for the writer.
Author: Nong Li <nong@databricks.com>
Closes#10820 from nongli/spark-12854.
This PR adds a new table option (`skip_hive_metadata`) that'd allow the user to skip storing the table metadata in hive metadata format. While this could be useful in general, the specific use-case for this change is that Hive doesn't handle wide schemas well (see https://issues.apache.org/jira/browse/SPARK-12682 and https://issues.apache.org/jira/browse/SPARK-6024) which in turn prevents such tables from being queried in SparkSQL.
Author: Sameer Agarwal <sameer@databricks.com>
Closes#10826 from sameeragarwal/skip-hive-metadata.
When users are using `partitionBy` and `bucketBy` at the same time, some bucketing columns might be part of partitioning columns. For example,
```
df.write
.format(source)
.partitionBy("i")
.bucketBy(8, "i", "k")
.saveAsTable("bucketed_table")
```
However, in the above case, adding column `i` to `bucketBy` is useless; it just wastes extra CPU when reading or writing bucketed tables. Thus, like Hive, we issue an exception and let users make the change.
Also added a test case checking that the information about `sortBy` and `bucketBy` columns is correctly saved in the metastore table.
Could you check if my understanding is correct? cloud-fan rxin marmbrus Thanks!
Author: gatorsmile <gatorsmile@gmail.com>
Closes#10891 from gatorsmile/commonKeysInPartitionByBucketBy.
ErrorPositionSuite and one of the HiveComparisonTest tests have been consistently failing on the Hadoop 2.3 SBT build (but on no other builds). I believe that this is due to test isolation issues (e.g. tests sharing state via the sets of temporary tables that are registered to TestHive).
This patch attempts to improve the isolation of these tests in order to address this issue.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#10884 from JoshRosen/fix-failing-hadoop-2.3-hive-tests.
When users turn off bucketing in SQLConf, we should issue a message telling them these operations will fall back to the normal (non-bucketed) write path.
Also added a test case for this scenario and fixed the helper function.
Do you think this PR is helpful when using bucket tables? cloud-fan Thank you!
Author: gatorsmile <gatorsmile@gmail.com>
Closes#10870 from gatorsmile/bucketTableWritingTestcases.
The current parser turns a decimal literal, for example ```12.1```, into a Double. The problem with this approach is that we convert an exact literal into a non-exact ```Double```. This PR changes that behavior: a decimal literal is now converted into an exact ```BigDecimal```.
The behavior for scientific decimals, for example ```12.1e01```, is unchanged. This will be converted into a Double.
This PR also replaces the ```BigDecimal``` literal with a ```Double``` literal where needed, because ```BigDecimal``` is now the default. You can still get a double literal by appending a 'D' to the value, for instance: ```3.141527D```
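Illustrative queries under the new behavior (assuming a `sql` function in scope, as in the other snippets in this document):
```scala
sql("SELECT 12.1")      // exact BigDecimal literal (the new default)
sql("SELECT 12.1e01")   // scientific notation: still a Double
sql("SELECT 3.141527D") // explicit 'D' suffix: a Double
```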
cc davies rxin
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#10796 from hvanhovell/SPARK-12848.
The existing `Union` logical operator only supports two children. This PR adds a new logical operator, `Unions`, which can have an arbitrary number of children, to replace the existing one.
The `Union` logical plan is a binary node; however, a typical use case for union is to union a very large number of input sources (DataFrames, RDDs, or files), and it is not uncommon to union hundreds of thousands of files. In this case, our optimizer can become very slow due to the large number of logical unions. We should change the Union logical plan to support an arbitrary number of children, and add a single rule in the optimizer to collapse all adjacent `Unions` into a single one. Note that this problem doesn't exist in the physical plan, because the physical `Unions` already supports an arbitrary number of children.
Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>
Closes#10577 from gatorsmile/unionAllMultiChildren.
Currently, HiveTableScan runs with getCallSite, which is really expensive and shows up when scanning large tables with many partitions (e.g. TPC-DS), slowing down the overall runtime of the job. It would be good to consider using dummyCallSite in HiveTableScan.
Author: Rajesh Balamohan <rbalamohan@apache.org>
Closes#10825 from rajeshbalamohan/SPARK-12898.
For a normal parquet file without buckets, the file name ends with a job UUID which may be all digits and thus mistakenly regarded as a bucket id. This PR improves the format of the bucket id in the file name by using a different separator, `_`, so that the regex is more robust.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#10799 from cloud-fan/fix-bucket.
Based on discussions in #10801, I'm submitting a pull request to rename ParserDialect to ParserInterface.
Author: Reynold Xin <rxin@databricks.com>
Closes#10817 from rxin/SPARK-12889.
Right now, the bucket tests are kind of hard to understand; this PR simplifies them and adds more comments.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#10813 from cloud-fan/bucket-comment.
This pull request removes the public developer parser API for external parsers. Given everything a parser depends on (e.g. logical plans and expressions) are internal and not stable, external parsers will break with every release of Spark. It is a bad idea to create the illusion that Spark actually supports pluggable parsers. In addition, this also reduces incentives for 3rd party projects to contribute parse improvements back to Spark.
Author: Reynold Xin <rxin@databricks.com>
Closes#10801 from rxin/SPARK-12855.
This is the initial work for whole-stage codegen; it supports Projection/Filter/Range, and we will continue working on this to support more physical operators.
A micro benchmark shows that a query with range, filter, and projection can be 3X faster than before.
It's turned on by default. For a tree that has at least two chained plans, a WholeStageCodegen will be inserted into it; for example, the following plan
```
Limit 10
+- Project [(id#5L + 1) AS (id + 1)#6L]
+- Filter ((id#5L & 1) = 1)
+- Range 0, 1, 4, 10, [id#5L]
```
will be translated into
```
Limit 10
+- WholeStageCodegen
+- Project [(id#1L + 1) AS (id + 1)#2L]
+- Filter ((id#1L & 1) = 1)
+- Range 0, 1, 4, 10, [id#1L]
```
Here is the call graph to generate Java source for A and B (A supports codegen, but B does not):
```
 WholeStageCodegen       Plan A             FakeInput         Plan B
=========================================================================

 -> execute()
     |
  doExecute() -------->  produce()
                            |
                         doProduce() ------->  produce()
                                                  |
                                               doProduce() ---> execute()
                                                  |
                                               consume()
                         doConsume() -------------|
                            |
  doConsume() <-----     consume()
```
A SparkPlan that supports codegen needs to implement doProduce() and doConsume():
```
def doProduce(ctx: CodegenContext): (RDD[InternalRow], String)
def doConsume(ctx: CodegenContext, child: SparkPlan, input: Seq[ExprCode]): String
```
Author: Davies Liu <davies@databricks.com>
Closes#10735 from davies/whole2.
This PR adds the support to read bucketed tables, and correctly populate `outputPartitioning`, so that we can avoid shuffle for some cases.
TODO(follow-up PRs):
* bucket pruning
* avoid shuffle for bucketed table joins when using any superset of the bucketing key.
(we should re-visit it after https://issues.apache.org/jira/browse/SPARK-12704 is fixed)
* recognize hive bucketed table
Author: Wenchen Fan <wenchen@databricks.com>
Closes#10604 from cloud-fan/bucket-read.
In this PR the new CatalystQl parser stack reaches grammar parity with the old Parser-Combinator based SQL Parser. This PR also replaces all uses of the old Parser, and removes it from the code base.
Although the existing Hive and SQL parser dialects were mostly the same, some kinks had to be worked out:
- The SQL Parser allowed syntax like ```APPROXIMATE(0.01) COUNT(DISTINCT a)```. In order to make this work we needed to hardcode approximate operators in the parser, or we would have to create an approximate expression. ```APPROXIMATE_COUNT_DISTINCT(a, 0.01)``` would also do the job and is much easier to maintain. So, this PR **removes** this keyword.
- The old SQL Parser supports ```LIMIT``` clauses in nested queries. This is **not supported** anymore. See https://github.com/apache/spark/pull/10689 for the rationale for this.
- Hive supports a charset name/char set literal combination; for instance the expression ```_ISO-8859-1 0x4341464562616265``` yields the string ```CAFEbabe```. Hive will only allow charset names starting with an underscore. This is quite annoying in Spark because as soon as you use a tuple, names will start with an underscore. In this PR we **remove** this feature from the parser. It would be quite easy to implement such a feature as an Expression later on.
- Hive and the SQL Parser treat decimal literals differently. Hive will turn any decimal into a ```Double``` whereas the SQL Parser would convert a non-scientific decimal into a ```BigDecimal```, and would turn a scientific decimal into a Double. We follow Hive's behavior here. The new parser supports a big decimal literal, for instance: ```81923801.42BD```, which can be used when a big decimal is needed.
cc rxin viirya marmbrus yhuai cloud-fan
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#10745 from hvanhovell/SPARK-12575-2.
This PR makes bucketing and exchange share one common hash algorithm, so that we can guarantee the data distribution is same between shuffle and bucketed data source, which enables us to only shuffle one side when join a bucketed table and a normal one.
This PR also fixes the tests that are broken by the new hash behaviour in shuffle.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#10703 from cloud-fan/use-hash-expr-in-shuffle.
Fix the style violation (space before , and :).
This PR is a followup for #10643 and rework of #10685 .
Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>
Closes#10732 from sarutak/SPARK-12692-followup-sql.
cloud-fan Can you please take a look ?
In this case, we are failing during check analysis while validating the aggregation expression. I have added a semanticEquals for HiveGenericUDF to fix this. Please let me know if this is the right way to address this issue.
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes#10520 from dilipbiswal/spark-12558.
This PR implements SQL generation support for persisted data source tables. A new field `metastoreTableIdentifier: Option[TableIdentifier]` is added to `LogicalRelation`. When a `LogicalRelation` representing a persisted data source relation is created, this field holds the database name and table name of the relation.
Author: Cheng Lian <lian@databricks.com>
Closes#10712 from liancheng/spark-12724-datasources-sql-gen.
Fix the style violation (space before , and :).
This PR is a followup for #10643.
Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>
Closes#10718 from sarutak/SPARK-12692-followup-sql.
```
[info] Exception encountered when attempting to run a suite with class name:
org.apache.spark.sql.hive.LogicalPlanToSQLSuite *** ABORTED *** (325 milliseconds)
[info] org.apache.spark.sql.AnalysisException: Table `t1` already exists.;
[info] at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:296)
[info] at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:285)
[info] at org.apache.spark.sql.hive.LogicalPlanToSQLSuite.beforeAll(LogicalPlanToSQLSuite.scala:33)
[info] at org.scalatest.BeforeAndAfterAll$class.beforeAll(BeforeAndAfterAll.scala:187)
[info] at org.apache.spark.sql.hive.LogicalPlanToSQLSuite.beforeAll(LogicalPlanToSQLSuite.scala:23)
[info] at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:253)
[info] at org.apache.spark.sql.hive.LogicalPlanToSQLSuite.run(LogicalPlanToSQLSuite.scala:23)
[info] at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:462)
[info] at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:671)
[info] at sbt.ForkMain$Run$2.call(ForkMain.java:296)
[info] at sbt.ForkMain$Run$2.call(ForkMain.java:286)
[info] at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[info] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
[info] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
[info] at java.lang.Thread.run(Thread.java:745)
```
/cc liancheng
Author: wangfei <wangfei_hello@126.com>
Closes#10682 from scwf/fix-test.
The PR allows us to use the new SQL parser to parse SQL expressions such as: ```1 + sin(x*x)```
We enable this functionality in this PR, but we will not start using this actively yet. This will be done as soon as we have reached grammar parity with the existing parser stack.
cc rxin
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#10649 from hvanhovell/SPARK-12576.
Turn import ordering violations into build errors, plus a few adjustments
to account for how the checker behaves. I'm a little on the fence about
whether the existing code is right, but it's easier to appease the checker
than to discuss what's the more correct order here.
Plus a few fixes to imports that cropped in since my recent cleanups.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#10612 from vanzin/SPARK-3873-enable.
This PR tries to enable Spark SQL to convert resolved logical plans back to SQL query strings. For now, the major use case is to canonicalize Spark SQL native view support. The major entry point is `SQLBuilder.toSQL`, which returns an `Option[String]` if the logical plan is recognized.
The current version is still in WIP status, and is quite limited. Known limitations include:
1. The logical plan must be analyzed but not optimized
The optimizer erases `Subquery` operators, which contain necessary scope information for SQL generation. Future versions should be able to recover erased scope information by inserting subqueries when necessary.
1. The logical plan must be created using HiveQL query string
Query plans generated by composing arbitrary DataFrame API combinations are not supported yet. Operators within these query plans need to be rearranged into a canonical form that is more suitable for direct SQL generation. For example, the following query plan
```
Filter (a#1 < 10)
+- MetastoreRelation default, src, None
```
needs to be canonicalized into the following form before SQL generation:
```
Project [a#1, b#2, c#3]
+- Filter (a#1 < 10)
+- MetastoreRelation default, src, None
```
Otherwise, the SQL generation process will have to handle a large number of special cases.
1. Only a fraction of expressions and basic logical plan operators are supported in this PR
Currently, 95.7% (1720 out of 1798) of the query plans in `HiveCompatibilitySuite` can be successfully converted to SQL query strings.
Known unsupported components are:
- Expressions
- Part of math expressions
- Part of string expressions (buggy?)
- Null expressions
- Calendar interval literal
- Part of date time expressions
- Complex type creators
- Special `NOT` expressions, e.g. `NOT LIKE` and `NOT IN`
- Logical plan operators/patterns
- Cube, rollup, and grouping set
- Script transformation
- Generator
- Distinct aggregation patterns that fit `DistinctAggregationRewriter` analysis rule
- Window functions
Support for window functions, generators, and cubes etc. will be added in follow-up PRs.
This PR leverages `HiveCompatibilitySuite` for testing SQL generation in a "round-trip" manner:
* For all select queries, we try to convert it back to SQL
* If the query plan is convertible, we parse the generated SQL into a new logical plan
* Run the new logical plan instead of the original one
If the query plan is inconvertible, the test case simply falls back to the original logic.
TODO
- [x] Fix failed test cases
- [x] Support for more basic expressions and logical plan operators (e.g. distinct aggregation etc.)
- [x] Comments and documentation
Author: Cheng Lian <lian@databricks.com>
Closes#10541 from liancheng/sql-generation.
This PR adds bucket write support to Spark SQL. User can specify bucketing columns, numBuckets and sorting columns with or without partition columns. For example:
```
df.write.partitionBy("year").bucketBy(8, "country").sortBy("amount").saveAsTable("sales")
```
When bucketing is used, we will calculate bucket id for each record, and group the records by bucket id. For each group, we will create a file with bucket id in its name, and write data into it. For each bucket file, if sorting columns are specified, the data will be sorted before write.
Note that there may be multiple files for one bucket, as the data is distributed.
Currently we store the bucket metadata in the Hive metastore in a non-Hive-compatible way. We use a different bucketing hash function than Hive, so we can't be compatible anyway.
Limitations:
* Can't write bucketed data without hive metastore.
* Can't insert bucketed data into existing hive tables.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#10498 from cloud-fan/bucket-write.
This PR moves a major part of the new SQL parser to Catalyst. This is a prelude to start using this parser for all of our SQL parsing. The following key changes have been made:
The ANTLR parser & supporting classes have been moved to the Catalyst project. They are now part of the ```org.apache.spark.sql.catalyst.parser``` package. These classes contained quite a bit of code that was originally from the Hive project; I have added acknowledgements wherever this applies. All Hive dependencies have been factored out. I have also taken this chance to clean up the ```ASTNode``` class and to improve the error handling.
The HiveQl object that provides the functionality to convert an AST into a LogicalPlan has been refactored into three different classes, one for every SQL sub-project:
- ```CatalystQl```: This implements Query and Expression parsing functionality.
- ```SparkQl```: This is a subclass of ```CatalystQl``` and provides SQL/Core-only functionality such as Explain and Describe.
- ```HiveQl```: This is a subclass of ```SparkQl``` and this adds Hive-only functionality to the parser such as Analyze, Drop, Views, CTAS & Transforms. This class still depends on Hive.
cc rxin
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#10583 from hvanhovell/SPARK-12575.
JIRA: https://issues.apache.org/jira/browse/SPARK-12578
Slight update to the Hive parser. We should keep the DISTINCT keyword when it is used in an aggregate function with an OVER clause, so that CheckAnalysis will detect it and throw an exception later.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#10557 from viirya/keep-distinct-hivesql.
I looked at each case individually and it looks like they can all be removed. The only one that I had to think twice about was toArray (I even thought about un-deprecating it, until I realized it was a problem in Java to have toArray returning java.util.List).
Author: Reynold Xin <rxin@databricks.com>
Closes#10569 from rxin/SPARK-12615.
Just write the arguments into an unsafe row and use Murmur3 to calculate the hash code.
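A toy version of this approach, packing arguments into a fixed byte layout (standing in for UnsafeRow) and hashing the bytes with Murmur3 (illustrative; Spark's actual implementation is code-generated):
```scala
import java.nio.ByteBuffer
import scala.util.hashing.MurmurHash3

def hashArgs(args: Seq[Long], seed: Int = 42): Int = {
  val buf = ByteBuffer.allocate(args.length * 8)
  args.foreach(v => buf.putLong(v)) // fixed-width layout, like an unsafe row
  MurmurHash3.bytesHash(buf.array(), seed)
}
```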
Author: Wenchen Fan <wenchen@databricks.com>
Closes#10435 from cloud-fan/hash-expr.
This PR enables cube/rollup as functions, so they can be used like this:
```
select a, b, sum(c) from t group by rollup(a, b)
```
Author: Davies Liu <davies@databricks.com>
Closes#10522 from davies/rollup.
This PR inlines the Hive SQL parser in Spark SQL.
The previous (merged) incarnation of this PR passed all tests, but had and still has problems with the build. These problems are caused by the fact that, for some reason, in some cases the ANTLR-generated code is not included in the compilation phase.
This PR is a WIP and should not be merged until we have sorted out the build issues.
Author: Herman van Hovell <hvanhovell@questtec.nl>
Author: Nong Li <nong@databricks.com>
Author: Nong Li <nongli@gmail.com>
Closes#10525 from hvanhovell/SPARK-12362.
It's confusing that some operators output UnsafeRow but some do not, which makes it easy to make mistakes.
This PR changes all operators (SparkPlan) to only output UnsafeRow, and removes the rule that inserted Unsafe/Safe conversions. For those that can't output UnsafeRow directly, an UnsafeProjection is added.
Closes#10330
cc JoshRosen rxin
Author: Davies Liu <davies@databricks.com>
Closes#10511 from davies/unsafe_row.
There's a hack in `TestHive.reset()` which was intended to mute noisy Hive loggers; however, it also mutes Spark's testing loggers.
Author: Cheng Lian <lian@databricks.com>
Closes#10540 from liancheng/spark-12592.dont-mute-spark-loggers.
This is a WIP. The PR has been taken over from nongli (see https://github.com/apache/spark/pull/10420). I have removed some additional dead code, and fixed a few issues which were caused by the fact that the inlined Hive parser is newer than the Hive parser we currently use in Spark.
I am submitting this PR in order to get some feedback and testing done. There is quite a bit of work to do:
- [ ] Get it to pass jenkins build/test.
- [ ] Acknowledge the Hive project for using their parser.
- [ ] Refactorings between HiveQl and the java classes.
- [ ] Create our own ASTNode and integrate the current implicit extensions.
- [ ] Move remaining ```SemanticAnalyzer``` and ```ParseUtils``` functionality to ```HiveQl```.
- [ ] Removing Hive dependencies from the parser. This will require some edits in the grammar files.
- [ ] Introduce our own context which needs to contain a ```TokenRewriteStream```.
- [ ] Add ```useSQL11ReservedKeywordsForIdentifier``` and ```allowQuotedId``` to the catalyst or sql configuration.
- [ ] Remove ```HiveConf``` from grammar files & HiveQl, and pass in our own configuration.
- [ ] Moving the parser into sql/core.
cc nongli rxin
Author: Herman van Hovell <hvanhovell@questtec.nl>
Author: Nong Li <nong@databricks.com>
Author: Nong Li <nongli@gmail.com>
Closes#10509 from hvanhovell/SPARK-12362.
Adding the missing documentation for the configurations listed below. We currently see the placeholder "TODO" when issuing the command "SET -V".
```
spark.sql.columnNameOfCorruptRecord
spark.sql.hive.verifyPartitionPath
spark.sql.sources.parallelPartitionDiscovery.threshold
spark.sql.hive.convertMetastoreParquet.mergeSchema
spark.sql.hive.convertCTAS
spark.sql.hive.thriftServer.async
```
Author: gatorsmile <gatorsmile@gmail.com>
Closes#10471 from gatorsmile/commandDesc.
When explaining any plan with `Generate`, we see an exclamation mark in the plan. Normally, this mark means the plan has an error. This PR corrects the `missingInput` in `Generate`.
For example,
```scala
val df = Seq((1, "a b c"), (2, "a b"), (3, "a")).toDF("number", "letters")
val df2 =
df.explode('letters) {
case Row(letters: String) => letters.split(" ").map(Tuple1(_)).toSeq
}
df2.explain(true)
```
Before the fix, the plan is like
```
== Parsed Logical Plan ==
'Generate UserDefinedGenerator('letters), true, false, None
+- Project [_1#0 AS number#2,_2#1 AS letters#3]
+- LocalRelation [_1#0,_2#1], [[1,a b c],[2,a b],[3,a]]
== Analyzed Logical Plan ==
number: int, letters: string, _1: string
Generate UserDefinedGenerator(letters#3), true, false, None, [_1#8]
+- Project [_1#0 AS number#2,_2#1 AS letters#3]
+- LocalRelation [_1#0,_2#1], [[1,a b c],[2,a b],[3,a]]
== Optimized Logical Plan ==
Generate UserDefinedGenerator(letters#3), true, false, None, [_1#8]
+- LocalRelation [number#2,letters#3], [[1,a b c],[2,a b],[3,a]]
== Physical Plan ==
!Generate UserDefinedGenerator(letters#3), true, false, [number#2,letters#3,_1#8]
+- LocalTableScan [number#2,letters#3], [[1,a b c],[2,a b],[3,a]]
```
**Updates**: The same issues are also found in the other four Dataset operators: `MapPartitions`/`AppendColumns`/`MapGroups`/`CoGroup`. Fixed all four of them.
Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>
Closes#10393 from gatorsmile/generateExplain.
This PR is a follow-up of PR #10362.
Two major changes:
1. The fix introduced in #10362 is OK for Parquet, but may disable ORC PPD in many cases
PR #10362 stops converting an `AND` predicate if any branch is inconvertible. On the other hand, `OrcFilters` combines all filters into a single big conjunction first and then tries to convert it into ORC `SearchArgument`. This means, if any filter is inconvertible, no filters can be pushed down. This PR fixes this issue by finding out all convertible filters first before doing the actual conversion.
The reason behind the current implementation is mostly due to the limitation of ORC `SearchArgument` builder, which is documented in this PR in detail.
1. Copied the `AND` predicate fix for ORC from #10362 to avoid merge conflict.
Same as #10362, this PR targets master (2.0.0-SNAPSHOT), branch-1.6, and branch-1.5.
Author: Cheng Lian <lian@databricks.com>
Closes#10377 from liancheng/spark-12218.fix-orc-conjunction-ppd.
https://issues.apache.org/jira/browse/SPARK-11677
Although it correctly checks the filters via the number of results when ORC filter push-down is enabled, the filters themselves are not being tested.
So, this PR adds tests similar to `ParquetFilterSuite`.
One difference from `ParquetFilterSuite` is that this `OrcFilterSuite` only checks whether the appropriate filters are created; it does not check the results, because those are already checked in `OrcQuerySuite`.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#10341 from HyukjinKwon/SPARK-11677-followup.
JIRA: https://issues.apache.org/jira/browse/SPARK-12218
When creating filters for Parquet/ORC, we should not push nested AND expressions partially.
Author: Yin Huai <yhuai@databricks.com>
Closes#10362 from yhuai/SPARK-12218.
Description of the problem from cloud-fan
Actually this line: https://github.com/apache/spark/blob/branch-1.5/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala#L689
When we use `selectExpr`, we pass an `UnresolvedFunction` to `DataFrame.select` and fall into the last case. A workaround is to do special handling for UDTFs like we did for `explode` (and `json_tuple` in 1.6), wrapping them with `MultiAlias`.
Another workaround is using `expr`, for example `df.select(expr("explode(a)").as(Nil))`. I think `selectExpr` is no longer needed now that we have the `expr` function.
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes#9981 from dilipbiswal/spark-11619.
This PR removes Hive windows functions from Spark and replaces them with (native) Spark ones. The PR is on par with Hive in terms of features.
This has the following advantages:
* Better memory management.
* The ability to use spark UDAFs in Window functions.
cc rxin / yhuai
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#9819 from hvanhovell/SPARK-8641-2.
Currently ORC filters are not tested properly: all the tests pass even if the filters are not pushed down or are disabled. In this PR, I add some logic for this.
Since ORC does not fully filter record by record, this checks the count of the result and whether it contains the expected values.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#9687 from HyukjinKwon/SPARK-11677.
Currently, we can generate different plans for a query with a single distinct aggregation (depending on spark.sql.specializeSingleDistinctAggPlanning): one works better on low-cardinality columns, the other works better on high-cardinality columns (the default one).
This PR changes it to generate a single plan (three aggregations and two exchanges), which works better in both cases, so we can safely remove the flag `spark.sql.specializeSingleDistinctAggPlanning` (introduced in 1.6).
For a query like `SELECT COUNT(DISTINCT a) FROM table`, the plan will be
```
AGG-4 (count distinct)
Shuffle to a single reducer
Partial-AGG-3 (count distinct, no grouping)
Partial-AGG-2 (grouping on a)
Shuffle by a
Partial-AGG-1 (grouping on a)
```
This PR also includes large refactor for aggregation (reduce 500+ lines of code)
cc yhuai nongli marmbrus
Author: Davies Liu <davies@databricks.com>
Closes#10228 from davies/single_distinct.
This PR makes execution Hive's Derby run in memory, since it is a fake metastore and every time we create a HiveContext we switch to a new one. It may reduce the flakiness of tests that need to create a HiveContext (e.g. HiveSparkSubmitSuite). I will test it more.
https://issues.apache.org/jira/browse/SPARK-12228
Author: Yin Huai <yhuai@databricks.com>
Closes#10204 from yhuai/derbyInMemory.
This PR adds a `private[sql]` method `metadata` to `SparkPlan`, which can be used to describe detail information about a physical plan during visualization. Specifically, this PR uses this method to provide details of `PhysicalRDD`s translated from a data source relation. For example, a `ParquetRelation` converted from Hive metastore table `default.psrc` is now shown as the following screenshot:
![image](https://cloud.githubusercontent.com/assets/230655/11526657/e10cb7e6-9916-11e5-9afa-f108932ec890.png)
And here is the screenshot for a regular `ParquetRelation` (not converted from Hive metastore table) loaded from a really long path:
![output](https://cloud.githubusercontent.com/assets/230655/11680582/37c66460-9e94-11e5-8f50-842db5309d5a.png)
Author: Cheng Lian <lian@databricks.com>
Closes#10004 from liancheng/spark-12012.physical-rdd-metadata.
This replaces https://github.com/apache/spark/pull/9696
Invoke Checkstyle and print any errors to the console, failing the step.
Use Google's style rules modified according to
https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide
Some important checks are disabled (see TODOs in `checkstyle.xml`) due to
multiple violations being present in the codebase.
Suggest fixing those TODOs in a separate PR(s).
More on Checkstyle can be found on the [official website](http://checkstyle.sourceforge.net/).
Sample output (from [build 46345](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/46345/consoleFull)) (duplicated because I ran the build twice with different profiles):
> Checkstyle checks failed at following occurrences:
> [ERROR] src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java:[217,7] (coding) MissingSwitchDefault: switch without "default" clause.
> [ERROR] src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java:[198,10] (modifier) ModifierOrder: 'protected' modifier out of order with the JLS suggestions.
> [ERROR] src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java:[217,7] (coding) MissingSwitchDefault: switch without "default" clause.
> [ERROR] src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java:[198,10] (modifier) ModifierOrder: 'protected' modifier out of order with the JLS suggestions.
> [error] running /home/jenkins/workspace/SparkPullRequestBuilder2/dev/lint-java ; received return code 1
Also fix some of the minor violations that didn't require sweeping changes.
Apologies for the previous botched PRs - I finally figured out the issue.
cr: JoshRosen, pwendell
> I state that the contribution is my original work, and I license the work to the project under the project's open source license.
Author: Dmitry Erastov <derastov@gmail.com>
Closes#9867 from dskrvk/master.
When profiling HiveCompatibilitySuite, I noticed that most of the time seems to be spent in expensive `TestHive.reset()` calls. This patch speeds up suites based on HiveComparisionTest, such as HiveCompatibilitySuite, with the following changes:
- Avoid `TestHive.reset()` whenever possible:
- Use a simple set of heuristics to guess whether we need to call `reset()` in between tests.
- As a safety-net, automatically re-run failed tests by calling `reset()` before the re-attempt.
- Speed up the expensive parts of `TestHive.reset()`: loading the `src` and `srcpart` tables took roughly 600ms per test, so we now avoid this by using a simple heuristic which only loads those tables by tests that reference them. This is based on simple string matching over the test queries which errs on the side of loading in more situations than might be strictly necessary.
After these changes, HiveCompatibilitySuite seems to run in about 10 minutes.
This PR is a revival of #6663, an earlier experimental PR from June, where I played around with several possible speedups for this suite.
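The string-matching heuristic could look roughly like this (helper and table names are illustrative):
```scala
// Only pre-load the tables that the test's queries actually mention.
val preloadableTables = Seq("src", "srcpart")

def tablesToLoad(testQueries: Seq[String]): Seq[String] =
  preloadableTables.filter(table => testQueries.exists(_.contains(table)))
```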
Author: Josh Rosen <joshrosen@databricks.com>
Closes#10055 from JoshRosen/speculative-testhive-reset.
https://issues.apache.org/jira/browse/SPARK-12039
Since it is pretty flaky in hadoop 1 tests, we can disable it while we are investigating the cause.
Author: Yin Huai <yhuai@databricks.com>
Closes#10035 from yhuai/SPARK-12039-ignore.
Fix regression test for SPARK-11778.
marmbrus
Could you please take a look?
Thank you very much!!
Author: Huaxin Gao <huaxing@oc0558782468.ibm.com>
Closes#9890 from huaxingao/spark-11778-regression-test.
If we need to download Hive/Hadoop artifacts, try to download a Hadoop that matches the Hadoop used by Spark. If the Hadoop artifact cannot be resolved (e.g. Hadoop version is a vendor specific version like 2.0.0-cdh4.1.1), we will use Hadoop 2.4.0 (we used to hard code this version as the hadoop that we will download from maven) and we will not share Hadoop classes.
I tested this match in my laptop with the following confs (these confs are used by our builds). All tests are good.
```
build/sbt -Phadoop-1 -Dhadoop.version=1.2.1 -Pkinesis-asl -Phive-thriftserver -Phive
build/sbt -Phadoop-1 -Dhadoop.version=2.0.0-mr1-cdh4.1.1 -Pkinesis-asl -Phive-thriftserver -Phive
build/sbt -Pyarn -Phadoop-2.2 -Pkinesis-asl -Phive-thriftserver -Phive
build/sbt -Pyarn -Phadoop-2.3 -Dhadoop.version=2.3.0 -Pkinesis-asl -Phive-thriftserver -Phive
```
Author: Yin Huai <yhuai@databricks.com>
Closes#9979 from yhuai/versionsSuite.
When using remote Hive metastore, `hive.metastore.uris` is set to the metastore URI. However, it overrides `javax.jdo.option.ConnectionURL` unexpectedly, thus the execution Hive client connects to the actual remote Hive metastore instead of the Derby metastore created in the temporary directory. Cleaning this configuration for the execution Hive client fixes this issue.
Author: Cheng Lian <lian@databricks.com>
Closes#9895 from liancheng/spark-11783.clean-remote-metastore-config.
This patch attempts to speed up VersionsSuite by storing fetched Hive JARs in an Ivy cache that persists across test runs. If `SPARK_VERSIONS_SUITE_IVY_PATH` is set, that path will be used for the cache; if it is not set, VersionsSuite will create a temporary Ivy cache which is deleted after the test completes.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#9624 from JoshRosen/SPARK-9866.
Can someone review my code to make sure I'm not missing anything? Thanks!
Author: Xiu Guo <xguo27@gmail.com>
Author: Xiu Guo <guoxi@us.ibm.com>
Closes#9612 from xguo27/SPARK-11628.
Hive has since changed this behavior as well. https://issues.apache.org/jira/browse/HIVE-3454
Author: Nong Li <nong@databricks.com>
Author: Nong Li <nongli@gmail.com>
Author: Yin Huai <yhuai@databricks.com>
Closes#9685 from nongli/spark-11724.
This patch fixes an issue where the `spark.sql.TungstenAggregate.testFallbackStartsAt` SQLConf setting was not properly reset / cleared at the end of `TungstenAggregationQueryWithControlledFallbackSuite`. This ended up causing test failures in HiveCompatibilitySuite in Maven builds by causing spilling to occur way too frequently.
This configuration leak was inadvertently introduced during test cleanup in #9618.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#9857 from JoshRosen/clear-fallback-prop-in-test-teardown.
In addition, tightened visibility of a lot of classes in the columnar package from private[sql] to private[columnar].
Author: Reynold Xin <rxin@databricks.com>
Closes#9842 from rxin/SPARK-11858.
Fix a bug in DataFrameReader.table: a table with a schema-qualified name such as "db_name.table" doesn't work.
Use SqlParser.parseTableIdentifier to parse the table name before calling lookupRelation.
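A minimal sketch of the fix, assuming the `SqlParser` and catalog APIs of that era (signatures approximate):
```scala
def table(tableName: String): DataFrame = {
  // Parse "db_name.table" into a TableIdentifier instead of treating the
  // whole string as a bare table name.
  val tableIdent = SqlParser.parseTableIdentifier(tableName)
  DataFrame(sqlContext, sqlContext.catalog.lookupRelation(tableIdent))
}
```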
Author: Huaxin Gao <huaxing@oc0558782468.ibm.com>
Closes#9773 from huaxingao/spark-11778.
See HIVE-7975 and HIVE-12373.
With the changed semantics of setters on Hive's Thrift objects, a setter should be called only after all parameters are set. This is not a problem in the current state, but it will become one eventually.
Author: navis.ryu <navis@apache.org>
Closes#9580 from navis/SPARK-11614.
This PR adds a new option `spark.sql.hive.thriftServer.singleSession` for disabling multi-session support in the Thrift server.
Note that this option is added as a Spark configuration (retrieved from `SparkConf`) rather than a Spark SQL configuration (retrieved from `SQLConf`). This is because all SQL configurations are session-ized. Since multi-session support is on by default, no JDBC connection can modify global configurations like the newly added one.
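A hedged example of reading the flag (the default value is an assumption):
```scala
// Read from SparkConf rather than SQLConf, so no JDBC session can flip it.
val singleSession =
  sparkContext.conf.getBoolean("spark.sql.hive.thriftServer.singleSession", false)
```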
Author: Cheng Lian <lian@databricks.com>
Closes#9740 from liancheng/spark-11089.single-session-option.
According to discussion in PR #9664, the anonymous `HiveFunctionRegistry` in `HiveContext` can be removed now.
Author: Cheng Lian <lian@databricks.com>
Closes#9737 from liancheng/spark-11191.follow-up.
When computing partitions for a non-Parquet relation, `HadoopRDD.compute` is used, but unlike `NewSqlHadoopRDD.compute`, it does not set the thread-local variable `inputFileName` in `NewSqlHadoopRDD`. As a result, reading `NewSqlHadoopRDD.inputFileName` returns an empty value.
Setting `inputFileName` in `HadoopRDD.compute` resolves this issue.
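A minimal sketch of the fix, assuming a thread-local setter like the one `NewSqlHadoopRDD.compute` uses (names approximate):
```scala
import org.apache.hadoop.mapred.FileSplit

// Mirror what NewSqlHadoopRDD.compute does when a task starts reading a split.
split.inputSplit.value match {
  case fs: FileSplit => NewSqlHadoopRDD.setInputFileName(fs.getPath.toString)
  case _             => NewSqlHadoopRDD.unsetInputFileName()
}
```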
Author: xin Wu <xinwu@us.ibm.com>
Closes#9542 from xwu0226/SPARK-11522.
On driver process start-up, `UserGroupInformation.loginUserFromKeytab` is called with the principal and keytab passed in, so the static var `UserGroupInformation.loginUser` is set to that principal, with the Kerberos credentials saved in its private credential set, and all threads within the driver process are supposed to see and use this login to authenticate with Hive and Hadoop. However, because of `IsolatedClientLoader`, the `UserGroupInformation` class is not shared with Hive metastore clients; it is loaded separately and therefore cannot see the Kerberos login credentials prepared in the main thread.
The first proposed fix would cause other classloader-conflict errors and is not an appropriate solution. This change instead performs the Kerberos login during Hive client initialization, which makes the credentials ready for that particular Hive client instance.
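A hedged sketch of the login step during client initialization, using the standard Hadoop security API:
```scala
import org.apache.hadoop.security.UserGroupInformation

// Runs inside the isolated classloader, so this copy of
// UserGroupInformation (not the driver's) gets the Kerberos credentials.
if (UserGroupInformation.isSecurityEnabled && principal != null && keytab != null) {
  UserGroupInformation.loginUserFromKeytab(principal, keytab)
}
```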
yhuai Please take a look and let me know. If you are not the right person to talk to, could you point me to someone responsible for this?
Author: Yu Gao <ygao@us.ibm.com>
Author: gaoyu <gaoyu@gaoyu-macbookpro.roam.corp.google.com>
Author: Yu Gao <crystalgaoyu@gmail.com>
Closes#9272 from yolandagao/master.
I didn't remove the old Sort operator, since we still use it in randomized tests. I moved it into the test module and renamed it ReferenceSort.
Author: Reynold Xin <rxin@databricks.com>
Closes#9700 from rxin/SPARK-11734.
https://issues.apache.org/jira/browse/SPARK-11678
The change in this PR is to pass the root paths of the table to the partition discovery logic, so that partition discovery stops at those root paths instead of walking all the way up to the root of the file system.
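A minimal sketch of the idea (the helper is hypothetical): walk up from a leaf directory collecting `key=value` segments and stop at a supplied root path instead of the filesystem root.
```scala
import org.apache.hadoop.fs.Path

def partitionSpec(leaf: Path, roots: Set[Path]): Seq[(String, String)] = {
  val spec = scala.collection.mutable.ArrayBuffer.empty[(String, String)]
  var current = leaf
  while (current != null && !roots.contains(current)) {
    current.getName.split("=", 2) match {
      case Array(k, v) => spec += (k -> v)
      case _ => // not a partition directory; keep walking up
    }
    current = current.getParent
  }
  spec.reverse.toSeq
}
```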
Author: Yin Huai <yhuai@databricks.com>
Closes#9651 from yhuai/SPARK-11678.
When looking up Hive temporary functions, we should always use the `SessionState` within the execution Hive client, since temporary functions are registered there.
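A hedged sketch, assuming the execution client exposes something like `withHiveState` to run a block under its own `SessionState`:
```scala
// Temporary functions are registered in the execution client's
// SessionState, so the lookup must happen there.
executionHive.withHiveState {
  FunctionRegistry.getFunctionInfo(functionName.toLowerCase)
}
```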
Author: Cheng Lian <lian@databricks.com>
Closes#9664 from liancheng/spark-11191.fix-temp-function.
This patch aims to reduce the test time and flakiness of HiveSparkSubmitSuite, SparkSubmitSuite, and CliSuite.
Key changes:
- Disable IO synchronization calls for Derby writes, since durability doesn't matter for tests. This was done for HiveCompatibilitySuite in #6651 and resulted in huge test speedups.
- Add a few missing `--conf`s to disable various Spark UIs. The CliSuite, in particular, never disabled these UIs, leaving it prone to port-contention-related flakiness.
- Fix two instances where tests defined `beforeAll()` methods that were never called because the appropriate traits were not mixed in. I updated these test suites to extend `BeforeAndAfterEach` so that they play nicely with our `ResetSystemProperties` trait.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#9623 from JoshRosen/SPARK-11647.
https://issues.apache.org/jira/browse/SPARK-11500
As filed in SPARK-11500, if schema merging is enabled, the order in which files are touched affects the ordering of the output columns.
This was mostly because of the use of `Set` and `Map`, so I replaced them with `LinkedHashSet` and `LinkedHashMap` to keep the insertion order.
Also, I changed `reduceOption` to `reduceLeftOption`, and reordered `filesToTouch` from `metadataStatuses ++ commonMetadataStatuses ++ needMerged` to `needMerged ++ metadataStatuses ++ commonMetadataStatuses` so that the part-files, which always have the schema in their footers, are touched first (the summary files might not exist).
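A small illustration of the ordering guarantee (file names are made up):
```scala
import scala.collection.mutable

val filesToTouch = mutable.LinkedHashSet.empty[String]
filesToTouch ++= Seq("part-00000.parquet", "part-00001.parquet") // needMerged first
filesToTouch ++= Seq("_metadata", "_common_metadata")
// Iteration follows insertion order, so the part-files (which always
// carry a schema in their footers) are touched first.
filesToTouch.toSeq
```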
One nit: if schema merging is not enabled but multiple files are given, there is no guarantee of the output order, since there might not be a summary file for the first file, which can put the columns of the other files ahead.
However, I thought this should be okay, since disabling schema merging assumes all the files have the same schema.
In addition, in the test code for this, I only checked the names of fields.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#9517 from HyukjinKwon/SPARK-11500.
This PR is a 2nd follow-up for [SPARK-9241](https://issues.apache.org/jira/browse/SPARK-9241). It contains the following improvements:
* Fix for a potential bug in distinct child expression and attribute alignment.
* Improved handling of duplicate distinct child expressions.
* Added test for distinct UDAF with multiple children.
cc yhuai
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#9566 from hvanhovell/SPARK-9241-followup-2.
https://issues.apache.org/jira/browse/SPARK-9830
This PR contains the following main changes.
* Removing `AggregateExpression1`.
* Removing `Aggregate` operator, which is used to evaluate `AggregateExpression1`.
* Removing planner rule used to plan `Aggregate`.
* Linking `MultipleDistinctRewriter` to analyzer.
* Renaming `AggregateExpression2` to `AggregateExpression` and `AggregateFunction2` to `AggregateFunction`.
* Updating the places where we create aggregate expressions. The way to create an aggregate expression is now `AggregateExpression(aggregateFunction, mode, isDistinct)`; see the example after this list.
* Changing `val`s in `DeclarativeAggregate`s that touch the children of the function to `lazy val`s (when we create an aggregate expression in the DataFrame API, the children of an aggregate function can be unresolved).
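A hedged example of the new construction pattern (the operand is illustrative, signatures approximate):
```scala
val sumExpr = AggregateExpression(
  Sum(inputColumn), // the AggregateFunction
  Complete,         // the evaluation mode
  isDistinct = false)
```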
Author: Yin Huai <yhuai@databricks.com>
Closes#9556 from yhuai/removeAgg1.
The DataFrame APIs that take a SQL expression always use `SQLParser`, so the `HiveFunctionRegistry` is called outside of Hive state, causing an NPE when there is no active `SessionState` for the current thread (as in PySpark).
cc rxin yhuai
Author: Davies Liu <davies@databricks.com>
Closes#9576 from davies/hive_udf.
For now they are thin wrappers around the corresponding Hive UDAFs.
One limitation with these in Hive 0.13.0 is that they only support aggregating primitive types.
I chose snake_case here instead of camelCase because it seems to be used in the majority of multi-word functions.
Do we also want to add these to `functions.py`?
This approach was recommended here: https://github.com/apache/spark/pull/8592#issuecomment-154247089
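A hypothetical usage example, assuming the wrappers in question are the `collect_list`/`collect_set` aggregates:
```scala
import org.apache.spark.sql.functions.{collect_list, collect_set}

// On Hive 0.13.0 these only support aggregating primitive types.
df.groupBy("key").agg(collect_list("value"), collect_set("value"))
```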
marmbrus rxin
Author: Nick Buroojy <nick.buroojy@civitaslearning.com>
Closes#9526 from nburoojy/nick/udaf-alias.
(cherry picked from commit a6ee4f989d)
Signed-off-by: Michael Armbrust <michael@databricks.com>
The reason is that:
1. For a partitioned Hive table, we move the partition columns after the data columns (e.g. `<a: Int, b: Int>` partitioned by `a` becomes `<b: Int, a: Int>`).
2. When appending data to a table, we use position to match input columns to the table's columns.
So when we append data to a partitioned table, we match the wrong columns between the input and the table. A solution is to reorder the input columns before matching by position, as we did for [`InsertIntoHadoopFsRelation`](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala#L101-L105); a sketch follows.
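A hedged sketch of the reordering, modeled on the `InsertIntoHadoopFsRelation` code linked above (names approximate):
```scala
// Split the input into data vs. partition columns, then put the partition
// columns last so positional matching lines up with the table layout
// <dataCols..., partitionCols...>.
val (partitionOutput, dataOutput) =
  query.output.partition(a => partitionColumnNames.contains(a.name))
val reordered = Project(dataOutput ++ partitionOutput, query)
```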
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9408 from cloud-fan/append.