To keep the Parquet write path fully compatible with Spark 1.4, we should rename the innermost field of arrays that may contain null from "array_element" to "array".
Please refer to [SPARK-10434] [1] for more details.
[1]: https://issues.apache.org/jira/browse/SPARK-10434
Author: Cheng Lian <lian@databricks.com>
Closes #8586 from liancheng/spark-10434/fix-parquet-array-type.
Jenkins master builders are currently broken by a merge conflict between PR #8584 and PR #8155.
Author: Cheng Lian <lian@databricks.com>
Closes #8614 from liancheng/hotfix/fix-pr-8155-8584-conflict.
This PR takes over https://github.com/apache/spark/pull/8389.
This PR improves `checkAnswer` to print the partially analyzed plan in addition to the user-friendly error message, in order to aid in debugging failing tests.
In doing so, I ran into a conflict with the various ways that we bring a SQLContext into the tests. Depending on the trait, we refer to the current context as `sqlContext`, `_sqlContext`, `ctx` or `hiveContext`, with access modifiers `public`, `protected` and `private` depending on the defining class.
I propose we refactor as follows:
1. All tests should refer only to a `protected sqlContext` when testing general features, and to a `protected hiveContext` only when exercising functionality that exists solely on `HiveContext`.
2. All tests should only import `testImplicits._` (i.e., don't import `TestHive.implicits._`)
Author: Wenchen Fan <cloud0fan@outlook.com>
Closes #8584 from cloud-fan/cleanupTests.
For example, we can write `SELECT MAX(value) FROM src GROUP BY key + 1 ORDER BY key + 1` in PostgreSQL, and we should support this in Spark SQL.
Author: Wenchen Fan <cloud0fan@outlook.com>
Closes #8548 from cloud-fan/support-order-by-non-attribute.
Before #8371, there was a bug in `Sort` on `Aggregate`: we couldn't use an aggregate expression named `_aggOrdering`, and we couldn't use more than one ordering expression containing aggregate functions. The reason for this bug is that the aggregate expression in `SortOrder` never gets resolved; we alias it with `_aggOrdering` and call `toAttribute`, which gives us an `UnresolvedAttribute`. So we are actually referencing the aggregate expression by name, not by exprId as we thought. And if there is already an aggregate expression named `_aggOrdering`, or there is more than one ordering expression containing aggregate functions, the names conflict and the lookup by name fails.
However, after #8371 was merged, the `SortOrder`s are guaranteed to be resolved and we always reference aggregate expressions by exprId. The bug no longer exists, and this PR adds regression tests for it.
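For reference, a minimal sketch (assuming a Spark shell `sqlContext` and illustrative table/column names, not the actual test code) of the kind of query that used to hit the alias conflict and that the regression tests now cover:
```
// Two ordering expressions containing aggregate functions used to produce
// conflicting `_aggOrdering` aliases; after #8371 they resolve by exprId.
sqlContext.sql(
  """SELECT key
    |FROM src
    |GROUP BY key
    |ORDER BY max(value), min(value)
  """.stripMargin)
```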
Author: Wenchen Fan <cloud0fan@outlook.com>
Closes #8231 from cloud-fan/sort-agg.
This PR can be quite challenging to review. I'm trying to give a detailed description of the problem as well as its solution here.
When reading Parquet files, we need to specify a potentially nested Parquet schema (of type `MessageType`) as the requested schema for column pruning. This Parquet schema is translated from a Catalyst schema (of type `StructType`), which is generated by the query planner and represents all requested columns. However, this translation can be fairly complicated for several reasons:
1. The requested schema must conform to the real schema of the physical file to be read.
This means we have to tailor the actual file schema of every individual physical Parquet file to be read according to the given Catalyst schema. Fortunately, we are already doing this in Spark 1.5 by pushing requested schema conversion to the executor side in PR #7231.
1. Support for schema merging.
A single Parquet dataset may consist of multiple physical Parquet files with different but compatible schemas. This means we may request a column path that doesn't exist in a physical Parquet file. Requested column paths can also be nested. For example, for a Parquet file schema
```
message root {
  required group f0 {
    required group f00 {
      required int32 f000;
      required binary f001 (UTF8);
    }
  }
}
```
we may request for column paths defined in the following schema:
```
message root {
  required group f0 {
    required group f00 {
      required binary f001 (UTF8);
      required float f002;
    }
  }
  optional double f1;
}
```
Notice that we pruned column path `f0.f00.f000`, but added `f0.f00.f002` and `f1`.
The good news is that Parquet handles non-existing column paths properly and always returns null for them.
1. The map from `StructType` to `MessageType` is a one-to-many map.
This is the most unfortunate part.
Due to historical reasons (dark histories!), schemas of Parquet files generated by different libraries have different "flavors". For example, to handle a schema with a single non-nullable column, whose type is an array of non-nullable integers, parquet-protobuf generates the following Parquet schema:
```
message m0 {
  repeated int32 f;
}
```
while parquet-avro generates another version:
```
message m1 {
  required group f (LIST) {
    repeated int32 array;
  }
}
```
and parquet-thrift spills this:
```
message m2 {
  required group f (LIST) {
    repeated int32 f_tuple;
  }
}
```
All of them can be mapped to the following _unique_ Catalyst schema:
```
StructType(
  StructField(
    "f",
    ArrayType(IntegerType, containsNull = false),
    nullable = false))
```
This greatly complicates Parquet requested schema construction, since the path of a given column varies in different cases. To read the array elements from files with the above schemas, we must use `f` for `m0`, `f.array` for `m1`, and `f.f_tuple` for `m2`.
In earlier Spark versions, we didn't try to fix this issue properly. Spark 1.4 and prior versions simply translate the Catalyst schema in a way that is more or less compatible with parquet-hive and parquet-avro, but is broken in many other cases. Earlier revisions of Spark 1.5 only try to tailor the Parquet file schema at the first level and ignore nested ones. This caused [SPARK-10301] [spark-10301] as well as [SPARK-10005] [spark-10005]. In PR #8228, I tried to avoid the hard part of the problem and made a minimal change in `CatalystRowConverter` to fix SPARK-10005. However, when taking SPARK-10301 into consideration, continuing to hack `CatalystRowConverter` doesn't seem to be a good idea. So this PR is an attempt to fix the problem in a proper way.
For a given physical Parquet file with schema `ps` and a compatible Catalyst requested schema `cs`, we use the following algorithm to tailor `ps` to get the result Parquet requested schema `ps'`:
For a leaf column path `c` in `cs`:
- if `c` exists in `cs` and a corresponding Parquet column path `c'` can be found in `ps`, `c'` should be included in `ps'`;
- otherwise, we convert `c` to a Parquet column path `c"` using `CatalystSchemaConverter`, and include `c"` in `ps'`;
- no other column paths should exist in `ps'`.
Then comes the most tedious part:
> Given `cs`, `ps`, and `c`, how to locate `c'` in `ps`?
Unfortunately, there's no quick answer, and we have to enumerate all possible structures defined in parquet-format spec. They are:
1. the standard structure of nested types, and
1. cases defined in all backwards-compatibility rules for `LIST` and `MAP`.
The core part of this PR is `CatalystReadSupport.clipParquetType()`, which tailors a given Parquet file schema according to a requested schema in its Catalyst form. Backwards-compatibility rules of `LIST` and `MAP` are covered in `clipParquetListType()` and `clipParquetMapType()` respectively. The column path selection algorithm is implemented in `clipParquetGroupFields()`.
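For intuition, here is a heavily simplified sketch of the field-selection step; the helper name, signature, and the `toParquet` parameter are illustrative assumptions, and nested types as well as the `LIST`/`MAP` rules handled by `clipParquetListType()`/`clipParquetMapType()` are ignored:
```
import org.apache.parquet.schema.{GroupType, Type}
import org.apache.spark.sql.types.{StructField, StructType}

// For each requested Catalyst field, keep the physical Parquet column when the
// file already has it; otherwise convert the Catalyst field to a Parquet type so
// that Parquet simply returns nulls for the non-existing column path.
def clipFields(
    fileType: GroupType,
    requested: StructType,
    toParquet: StructField => Type): Seq[Type] = {
  requested.map { field =>
    if (fileType.containsField(field.name)) fileType.getType(field.name)
    else toParquet(field)
  }
}
```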
With this PR, we no longer need to do schema tailoring in `CatalystReadSupport` and `CatalystRowConverter`. Another benefit is that we can now also read Parquet datasets consisting of files with different physical Parquet schemas but sharing the same logical schema, for example, files generated by different Parquet libraries. This situation is illustrated by [this test case] [test-case].
[spark-10301]: https://issues.apache.org/jira/browse/SPARK-10301
[spark-10005]: https://issues.apache.org/jira/browse/SPARK-10005
[test-case]: 38644d8a45 (diff-a9b98e28ce3ae30641829dffd1173be2R26)
Author: Cheng Lian <lian@databricks.com>
Closes #8509 from liancheng/spark-10301/fix-parquet-requested-schema.
They don't bring much value since we now have better unit test coverage for hash joins. This will also help reduce the test time.
Author: Reynold Xin <rxin@databricks.com>
Closes #8542 from rxin/SPARK-10378.
DataFrame writes to a DB2 database fail because, by default, the JDBC data source implementation generates a table schema with data types DB2 does not support: TEXT for String and BIT1(1) for Boolean.
This patch registers a DB2 JDBC dialect that maps String and Boolean to valid DB2 data types.
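A minimal sketch of what such a dialect registration looks like; the exact target types (CLOB, CHAR(1)) are assumptions for illustration, not necessarily the mappings chosen in this patch:
```
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
import org.apache.spark.sql.types.{BooleanType, DataType, StringType}

// Map Catalyst types that DB2 rejects to DB2-friendly column definitions.
case object DB2Dialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:db2")

  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case StringType  => Some(JdbcType("CLOB", java.sql.Types.CLOB))
    case BooleanType => Some(JdbcType("CHAR(1)", java.sql.Types.CHAR))
    case _           => None
  }
}

JdbcDialects.registerDialect(DB2Dialect)
```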
Author: sureshthalamati <suresh.thalamati@gmail.com>
Closes #8393 from sureshthalamati/db2_dialect_spark-10170.
This PR includes the following changes:
- Add `LocalNodeTest` for local operator tests and add unit tests for FilterNode and ProjectNode.
- Add `LimitNode` and `UnionNode` and their unit tests to show how to use `LocalNodeTest`. (SPARK-9991, SPARK-9993)
Author: zsxwing <zsxwing@gmail.com>
Closes #8464 from zsxwing/local-execution.
This fixes the problem that scanning a partitioned table causes the driver to come under high memory pressure and takes down the cluster. Also, with this fix, we will be able to correctly show the query plan of a query consuming partitioned tables.
https://issues.apache.org/jira/browse/SPARK-10339
https://issues.apache.org/jira/browse/SPARK-10334
Finally, this PR squeezes in a "quick fix" for SPARK-10301. It is not a real fix; it just throws a better error message to let the user know what to do.
Author: Yin Huai <yhuai@databricks.com>
Closes #8515 from yhuai/partitionedTableScan.
SparkHadoopUtil contains methods that use reflection to work around TaskAttemptContext binary incompatibilities between Hadoop 1.x and 2.x. We should use these methods in more places.
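As a rough sketch of the pattern these helpers rely on (illustrative only, not the actual `SparkHadoopUtil` code): the method is looked up reflectively, so the same bytecode works whether `TaskAttemptContext` is a class (Hadoop 1.x) or an interface (Hadoop 2.x).
```
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.TaskAttemptContext

// Calling context.getConfiguration directly would bind to one Hadoop version at
// compile time; going through reflection keeps the call binary-compatible with both.
def getConfiguration(context: TaskAttemptContext): Configuration = {
  val method = context.getClass.getMethod("getConfiguration")
  method.invoke(context).asInstanceOf[Configuration]
}
```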
Author: Josh Rosen <joshrosen@databricks.com>
Closes #8499 from JoshRosen/use-hadoop-reflection-in-more-places.
When I tested the latest version of Spark with an exclamation mark, I got some errors. Then I reset the Spark version and found that commit id "a2409d1c8e8ddec04b529ac6f6a12b5993f0eeda" introduced the bug. With the jline version changing from 0.9.94 to 2.12 after this commit, the exclamation mark is treated as a special character in ConsoleReader.
Author: wangwei <wangwei82@huawei.com>
Closes #8420 from small-wang/jline-SPARK-10226.
Actually using this API requires access to a lot of classes that we might make private by accident. I've added some tests to prevent this.
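For context, the extension point being exercised looks roughly like this; a minimal sketch assuming a Spark shell `sqlContext`, with a made-up no-op strategy rather than one of the added tests:
```
import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

// A planner strategy that plans nothing; it only shows which types a third-party
// strategy needs to reference, which is why they must stay publicly accessible.
object DoNothingStrategy extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
}

sqlContext.experimental.extraStrategies = DoNothingStrategy :: Nil
```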
Author: Michael Armbrust <michael@databricks.com>
Closes #8516 from marmbrus/extraStrategiesTests.
This PR introduces a direct write API for testing Parquet. It's a DSL-flavored version of the [`writeDirect` method] [1] that comes with the parquet-avro testing code. With this API, it's much easier to construct arbitrary Parquet structures. It's especially useful when adding regression tests for various compatibility corner cases.
Sample usage of this API can be found in the new test case added in `ParquetThriftCompatibilitySuite`.
[1]: https://github.com/apache/parquet-mr/blob/apache-parquet-1.8.1/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java#L945-L972
Author: Cheng Lian <lian@databricks.com>
Closes #8454 from liancheng/spark-10289/parquet-testing-direct-write-api.
After this PR, In/InSet/ArrayContains will return null if the value is null, instead of false. They will also return null when there is a null in the set/array.
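A quick illustration of the new semantics, assuming a Spark shell `sqlContext` (the literal-only queries are just for brevity):
```
// Before this PR both of these evaluated to false; now they evaluate to NULL.
sqlContext.sql("SELECT null IN (1, 2)").show()
sqlContext.sql("SELECT 1 IN (2, null)").show()
```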
Author: Davies Liu <davies@databricks.com>
Closes #8492 from davies/fix_in.
This commit fixes an issue where the public SQL `Row` class did not override `hashCode`, causing it to violate the hashCode() + equals() contract. To fix this, I simply ported the `hashCode` implementation from the 1.4.x version of `Row`.
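A minimal sketch of the contract being restored (values are illustrative):
```
import org.apache.spark.sql.Row

val a = Row(1, "x")
val b = Row(1, "x")
assert(a == b)                    // equals already held
assert(a.hashCode == b.hashCode)  // violated before this fix
```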
Author: Josh Rosen <joshrosen@databricks.com>
Closes #8500 from JoshRosen/SPARK-10325 and squashes the following commits:
51ffea1 [Josh Rosen] Override hashCode() for public Row.
Add sizeInBytes to HadoopFsRelation to enable broadcast joins.
cc marmbrus
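A sketch of the intended effect, assuming a Spark shell `sqlContext`; the paths, threshold, and join key below are illustrative:
```
// With sizeInBytes reported by HadoopFsRelation, a relation smaller than the
// autoBroadcastJoinThreshold can be planned as a broadcast join automatically.
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (10L * 1024 * 1024).toString)

val small = sqlContext.read.parquet("/tmp/small_table")
val large = sqlContext.read.parquet("/tmp/large_table")
large.join(small, "id").explain()   // expect a broadcast join for `small`
```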
Author: Davies Liu <davies@databricks.com>
Closes #8490 from davies/sizeInByte.
https://issues.apache.org/jira/browse/SPARK-10287
After porting JSON to HadoopFsRelation, it seems hard to keep the behavior of picking up new files automatically for JSON. This PR removes this behavior, so JSON is consistent with the others (ORC and Parquet).
Author: Yin Huai <yhuai@databricks.com>
Closes #8469 from yhuai/jsonRefresh.
In BigDecimal or java.math.BigDecimal, the precision can be smaller than the scale; for example, BigDecimal("0.001") has precision = 1 and scale = 3. But DecimalType requires that the precision be at least as large as the scale, so we should use the maximum of precision and scale when inferring the schema from a decimal literal.
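A quick worked example of the corner case, using the Scala `BigDecimal` API directly:
```
val d = BigDecimal("0.001")
println(s"precision = ${d.precision}, scale = ${d.scale}")  // precision = 1, scale = 3
// A Catalyst type inferred for this literal should therefore use
// max(precision, scale) for its precision, i.e. DecimalType(3, 3).
```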
Author: Davies Liu <davies@databricks.com>
Closes #8428 from davies/smaller_decimal.
This PR:
1. supports transferring arbitrary nested arrays from the JVM to the R side in SerDe;
2. based on 1, the collect() implementation is improved. Now it can support collecting data of complex types from a DataFrame.
Author: Sun Rui <rui.sun@intel.com>
Closes #8276 from sun-rui/SPARK-10048.
Replace `JavaConversions` implicits with `JavaConverters`
Most occurrences I've seen so far are necessary conversions; a few have been avoidable. None are in critical code as far as I see, yet.
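A minimal sketch of the change in style (values are illustrative):
```
import scala.collection.JavaConverters._

// JavaConversions converted implicitly and silently; JavaConverters makes every
// conversion explicit via .asJava / .asScala.
val javaList: java.util.List[String] = Seq("a", "b").asJava
val scalaSeq: Seq[String]            = javaList.asScala
```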
Author: Sean Owen <sowen@cloudera.com>
Closes #8033 from srowen/SPARK-9613.
Spark SQL's data sources API exposes Catalyst's internal types through its Filter interfaces. This is a problem because types like UTF8String are not stable developer APIs and should not be exposed to third-parties.
This issue caused incompatibilities when upgrading our `spark-redshift` library to work against Spark 1.5.0. To avoid these issues in the future we should only expose public types through these Filter objects. This patch accomplishes this by using CatalystTypeConverters to add the appropriate conversions.
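For illustration (the attribute name and value are made up): a pushed-down filter handed to a data source should now carry the external representation of the value.
```
import org.apache.spark.sql.sources.EqualTo

// After this patch the value is an external type (a plain java.lang.String here)
// rather than Catalyst's internal UTF8String.
val pushed = EqualTo("name", "alice")
```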
Author: Josh Rosen <joshrosen@databricks.com>
Closes #8403 from JoshRosen/datasources-internal-vs-external-types.
We misunderstood the Julian days and nanoseconds-of-the-day stored in Parquet (as TimestampType) by Hive/Impala; the two overlap, so they can't be added together directly.
In order to avoid confusing rounding when doing the conversion, we use `2440588` as the Julian Day of the Unix epoch (which should be 2440587.5).
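A rough sketch of the conversion involved (the function name and layout are illustrative, not the actual `DateTimeUtils` code): Parquet's INT96 timestamps store a Julian day plus nanoseconds within that day, and the midnight-based constant 2440588 avoids the half-day offset of the astronomical value 2440587.5.
```
// 1970-01-01T00:00:00 UTC corresponds to Julian Day 2440587.5; using 2440588
// treats the day as starting at midnight, matching how nanos-of-day is counted.
val JULIAN_DAY_OF_EPOCH = 2440588L
val SECONDS_PER_DAY = 24L * 60 * 60

def julianToMicros(julianDay: Int, nanosOfDay: Long): Long = {
  val seconds = (julianDay - JULIAN_DAY_OF_EPOCH) * SECONDS_PER_DAY
  seconds * 1000000L + nanosOfDay / 1000L
}
```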
Author: Davies Liu <davies@databricks.com>
Author: Cheng Lian <lian@databricks.com>
Closes #8400 from davies/timestamp_parquet.
This patch adds an analyzer rule to ensure that set operations (union, intersect, and except) are only applied to tables with the same number of columns. Without this rule, there are scenarios where invalid queries can return incorrect results instead of failing with error messages; SPARK-9813 provides one example of this problem. In other cases, the invalid query can crash at runtime with extremely confusing exceptions.
I also performed a bit of cleanup to refactor some of those logical operators' code into a common `SetOperation` base class.
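An illustrative query that should now fail analysis instead of returning wrong results, assuming a Spark shell `sqlContext` and made-up table names:
```
// The two sides of the UNION have different arities (2 vs. 3 columns), so the
// analyzer should reject the query with a clear error message.
sqlContext.sql("SELECT a, b FROM t1 UNION ALL SELECT a, b, c FROM t2")
```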
Author: Josh Rosen <joshrosen@databricks.com>
Closes #7631 from JoshRosen/SPARK-9293.
PR #8341 is a valid fix for SPARK-10136, but it didn't catch the real root cause. The real problem can be rather tricky to explain, and requires the audience to be pretty familiar with the parquet-format spec, especially the details of the `LIST` backwards-compatibility rules. Let me try to give an explanation here.
The structure of the problematic Parquet schema generated by parquet-avro is something like this:
```
message m {
  <repetition> group f (LIST) {         // Level 1
    repeated group array (LIST) {       // Level 2
      repeated <primitive-type> array;  // Level 3
    }
  }
}
```
(The schema generated by parquet-thrift is structurally similar, just replace the `array` at level 2 with `f_tuple`, and the other one at level 3 with `f_tuple_tuple`.)
This structure consists of two nested legacy 2-level `LIST`-like structures:
1. The repeated group type at level 2 is the element type of the outer array defined at level 1
This group should map to a `CatalystArrayConverter.ElementConverter` when building converters.
2. The repeated primitive type at level 3 is the element type of the inner array defined at level 2
This type should also map to a `CatalystArrayConverter.ElementConverter`.
The root cause of SPARK-10136 is that the group at level 2 isn't properly recognized as the element type of level 1. Thus, according to the parquet-format spec, the repeated primitive at level 3 is left as a so-called "unannotated repeated primitive type" and is recognized as a required list of required primitive type, so a `RepeatedPrimitiveConverter` instead of a `CatalystArrayConverter.ElementConverter` is created for it.
According to the parquet-format spec, an unannotated repeated type shouldn't appear in a `LIST`- or `MAP`-annotated group. PR #8341 fixed this issue by allowing such an unannotated repeated type to appear in `LIST`-annotated groups, which is a non-standard, hacky, but valid fix. (I didn't realize this when authoring #8341 though.)
As for the reason why level 2 isn't recognized as a list element type, it's because of the following `LIST` backwards-compatibility rule defined in the parquet-format spec:
> If the repeated field is a group with one field and is named either `array` or uses the `LIST`-annotated group's name with `_tuple` appended then the repeated type is the element type and elements are required.
(The `array` part is for parquet-avro compatibility, while the `_tuple` part is for parquet-thrift.)
This rule is implemented in [`CatalystSchemaConverter.isElementType`] [1], but neglected in [`CatalystRowConverter.isElementType`] [2]. This PR delivers a more robust fix by adding this rule in the latter method.
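For reference, a simplified sketch of the check being added, condensed from the rules quoted above (not the exact code in `CatalystRowConverter`):
```
import org.apache.parquet.schema.Type

// A repeated field inside a LIST-annotated group is the element type itself
// (legacy 2-level layout) rather than a 3-level wrapper when any of these hold.
def isElementType(repeatedType: Type, parentName: String): Boolean = {
  repeatedType.isPrimitive ||                          // unannotated repeated primitive
    repeatedType.asGroupType().getFieldCount > 1 ||    // struct element with several fields
    repeatedType.getName == "array" ||                 // parquet-avro legacy layout
    repeatedType.getName == s"${parentName}_tuple"     // parquet-thrift legacy layout
}
```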
Note that parquet-avro 1.7.0 also suffers from this issue. Details can be found at [PARQUET-364] [3].
[1]: 85f9a61357/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala (L259-L305)
[2]: 85f9a61357/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala (L456-L463)
[3]: https://issues.apache.org/jira/browse/PARQUET-364
Author: Cheng Lian <lian@databricks.com>
Closes #8361 from liancheng/spark-10136/proper-version.
In `HiveComparisonTest`s it is possible to fail a query of the form `SELECT * FROM dest1`, where `dest1` is the query that is actually computing the incorrect results. To aid debugging, this patch improves the harness to also print these query plans and their results.
Author: Michael Armbrust <michael@databricks.com>
Closes #8388 from marmbrus/generatedTables.
https://issues.apache.org/jira/browse/SPARK-10121
Looks like the problem is that if we add a jar through another thread, the thread handling the JDBC session will not get the latest classloader.
Author: Yin Huai <yhuai@databricks.com>
Closes #8368 from yhuai/SPARK-10121.
* Makes `SQLImplicits.rddToDataFrameHolder` scaladoc consistent with `SQLContext.createDataFrame[A <: Product](rdd: RDD[A])` since the former is essentially a wrapper for the latter
* Clarifies `createDataFrame[A <: Product]` scaladoc to apply for any `RDD[Product]`, not just case classes
Author: Feynman Liang <fliang@databricks.com>
Closes #8406 from feynmanliang/sql-doc-fixes.
Currently, we eagerly attempt to resolve functions, even before their children are resolved. However, this is not valid in cases where we need to know the types of the input arguments (i.e. when resolving Hive UDFs).
As a fix, this PR delays function resolution until the function's children are resolved. This change also necessitates a change to the way we resolve aggregate expressions that are not in aggregate operators (e.g., in `HAVING` or `ORDER BY` clauses). Specifically, we can't assume that these misplaced functions will be resolved, allowing us to differentiate aggregate functions from normal functions. To compensate for this change, we now attempt to resolve these unresolved expressions in the context of the aggregate operator, before checking to see if any aggregate expressions are present.
Author: Michael Armbrust <michael@databricks.com>
Closes #8371 from marmbrus/hiveUDFResolution.
This adds a missing null check to the Decimal `toScala` converter in `CatalystTypeConverters`, fixing an NPE.
Author: Josh Rosen <joshrosen@databricks.com>
Closes #8401 from JoshRosen/SPARK-10190.
Move `test.org.apache.spark.sql.hive` package tests to the apparently intended `org.apache.spark.sql.hive`, as they don't intend to test behavior from outside org.apache.spark.*
Alternate take, per discussion at https://github.com/apache/spark/pull/8051
I think this is what vanzin and I had in mind but also CC rxin to cross-check, as this does indeed depend on whether these tests were accidentally in this package or not. Testing from a `test.org.apache.spark` package is legitimate but didn't seem to be the intent here.
Author: Sean Owen <sowen@cloudera.com>
Closes #8307 from srowen/SPARK-9758.
This PR refactors `ParquetHiveCompatibilitySuite` so that it's easier to add new test cases.
Hit two bugs, SPARK-10177 and HIVE-11625, while working on this, added test cases for them and marked as ignored for now. SPARK-10177 will be addressed in a separate PR.
Author: Cheng Lian <lian@databricks.com>
Closes #8392 from liancheng/spark-8580/parquet-hive-compat-tests.
This PR contains examples on how to use some of the Stat Functions available for DataFrames under `df.stat`.
rxin
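For context, the sort of snippets being documented; a self-contained sketch assuming a Spark shell `sqlContext`, with made-up column expressions:
```
val df = sqlContext.range(0, 100).selectExpr("id % 3 AS a", "id % 5 AS b")

df.stat.corr("a", "b")                 // Pearson correlation between two columns
df.stat.cov("a", "b")                  // sample covariance
df.stat.crosstab("a", "b")             // contingency table as a DataFrame
df.stat.freqItems(Seq("a", "b"), 0.4)  // frequent items per column
```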
Author: Burak Yavuz <brkyvz@gmail.com>
Closes #8378 from brkyvz/update-sql-docs.
https://issues.apache.org/jira/browse/SPARK-10143
With this PR, we will set the min split size to parquet's block size (row group size) set in the conf if the min split size is smaller. So, we can avoid having too many tasks, and even useless tasks, for reading parquet data.
I tested it locally. The table I have is 343MB and it is in my local FS. Because I did not set any min/max split size, the default split size was 32MB and the map stage had 11 tasks, but only three of those tasks actually read data. With my PR, there were only three tasks in the map stage. Here is the difference.
Without this PR:
![image](https://cloud.githubusercontent.com/assets/2072857/9399179/8587dba6-4765-11e5-9189-7ebba52a2b6d.png)
With this PR:
![image](https://cloud.githubusercontent.com/assets/2072857/9399185/a4735d74-4765-11e5-8848-1f1e361a6b4b.png)
Even if the block size setting does not match the actual block size of the parquet file, I think it is still generally good to use parquet's block size setting if the min split size is smaller than this block size.
Tested it on a cluster using
```
val count = sqlContext.table("""store_sales""").groupBy().count().queryExecution.executedPlan(3).execute().count
```
Basically, it reads 0 columns of table `store_sales`. My table has 1824 parquet files with sizes from 80MB to 280MB (1 to 3 row group sizes). Without this patch, on a 16-worker cluster, the job had 5023 tasks and took 102s. With this patch, the job had 2893 tasks and took 64s. It is still not as good as using one mapper per file (1824 tasks and 42s), but it is much better than our master.
Author: Yin Huai <yhuai@databricks.com>
Closes #8346 from yhuai/parquetMinSplit.
Type coercion for IF should have its children resolved first, or we could hit an unresolved exception.
Author: Daoyuan Wang <daoyuan.wang@intel.com>
Closes #8331 from adrian-wang/spark10130.
This is based on #7779, thanks to tarekauel. This fixes the conflict and nullability.
Closes #7779 and #8274.
Author: Tarek Auel <tarek.auel@googlemail.com>
Author: Davies Liu <davies@databricks.com>
Closes #8330 from davies/stringLocate.
This class is identical to `org.apache.spark.sql.execution.datasources.jdbc.DefaultSource` and is not needed.
Author: Wenchen Fan <cloud0fan@outlook.com>
Closes #8334 from cloud-fan/minor.