Commit graph

2919 commits

Author SHA1 Message Date
Wenchen Fan f70f46d1e5 [SPARK-23877][SQL][FOLLOWUP] use PhysicalOperation to simplify the handling of Project and Filter over partitioned relation
## What changes were proposed in this pull request?

A followup of https://github.com/apache/spark/pull/20988

`PhysicalOperation` can collect Project and Filters over a certain plan and substitute the alias with the original attributes in the bottom plan. We can use it in `OptimizeMetadataOnlyQuery` rule to handle the Project and Filter over partitioned relation.

## How was this patch tested?

existing test

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21111 from cloud-fan/refactor.
2018-04-23 20:18:50 +08:00
Mykhailo Shtelma c48085aa91 [SPARK-23799][SQL] FilterEstimation.evaluateInSet produces devision by zero in a case of empty table with analyzed statistics
>What changes were proposed in this pull request?

During evaluation of IN conditions, if the source data frame, is represented by a plan, that uses hive table with columns, which were previously analysed, and the plan has conditions for these fields, that cannot be satisfied (which leads us to an empty data frame), FilterEstimation.evaluateInSet method produces NumberFormatException and ClassCastException.
In order to fix this bug, method FilterEstimation.evaluateInSet at first checks, if distinct count is not zero, and also checks if colStat.min and colStat.max  are defined, and only in this case proceeds with the calculation. If at least one of the conditions is not satisfied, zero is returned.

>How was this patch tested?

In order to test the PR two tests were implemented: one in FilterEstimationSuite, that tests the plan with the statistics that violates the conditions mentioned above,  and another one in StatisticsCollectionSuite, that test the whole process of analysis/optimisation of the query, that leads to the problems, mentioned in the first section.

Author: Mykhailo Shtelma <mykhailo.shtelma@bearingpoint.com>
Author: smikesh <mshtelma@gmail.com>

Closes #21052 from mshtelma/filter_estimation_evaluateInSet_Bugs.
2018-04-21 23:33:57 -07:00
gatorsmile 7bc853d089 [SPARK-24033][SQL] Fix Mismatched of Window Frame specifiedwindowframe(RowFrame, -1, -1)
## What changes were proposed in this pull request?

When the OffsetWindowFunction's frame is `UnaryMinus(Literal(1))` but the specified window frame has been simplified to `Literal(-1)` by some optimizer rules e.g., `ConstantFolding`. Thus, they do not match and cause the following error:
```
org.apache.spark.sql.AnalysisException: Window Frame specifiedwindowframe(RowFrame, -1, -1) must match the required frame specifiedwindowframe(RowFrame, -1, -1);
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:41)
at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:91)
at
```
## How was this patch tested?
Added a test

Author: gatorsmile <gatorsmile@gmail.com>

Closes #21115 from gatorsmile/fixLag.
2018-04-21 10:45:12 -07:00
Takeshi Yamamuro 0dd97f6ea4 [SPARK-23595][SQL] ValidateExternalType should support interpreted execution
## What changes were proposed in this pull request?
This pr supported interpreted mode for `ValidateExternalType`.

## How was this patch tested?
Added tests in `ObjectExpressionsSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #20757 from maropu/SPARK-23595.
2018-04-20 15:02:27 +02:00
Takeshi Yamamuro 074a7f9053 [SPARK-23588][SQL][FOLLOW-UP] Resolve a map builder method per execution in CatalystToExternalMap
## What changes were proposed in this pull request?
This pr is a follow-up pr of #20979 and fixes code to resolve a map builder method per execution instead of per row in `CatalystToExternalMap`.

## How was this patch tested?
Existing tests.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #21112 from maropu/SPARK-23588-FOLLOWUP.
2018-04-20 14:43:47 +02:00
mn-mikke e6b466084c [SPARK-23736][SQL] Extending the concat function to support array columns
## What changes were proposed in this pull request?
The PR adds a logic for easy concatenation of multiple array columns and covers:
- Concat expression has been extended to support array columns
- A Python wrapper

## How was this patch tested?
New tests added into:
- CollectionExpressionsSuite
- DataFrameFunctionsSuite
- typeCoercion/native/concat.sql

## Codegen examples
### Primitive-type elements
```
val df = Seq(
  (Seq(1 ,2), Seq(3, 4)),
  (Seq(1, 2, 3), null)
).toDF("a", "b")
df.filter('a.isNotNull).select(concat('a, 'b)).debugCodegen()
```
Result:
```
/* 033 */         boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 034 */         ArrayData inputadapter_value = inputadapter_isNull ?
/* 035 */         null : (inputadapter_row.getArray(0));
/* 036 */
/* 037 */         if (!(!inputadapter_isNull)) continue;
/* 038 */
/* 039 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 040 */
/* 041 */         ArrayData[] project_args = new ArrayData[2];
/* 042 */
/* 043 */         if (!false) {
/* 044 */           project_args[0] = inputadapter_value;
/* 045 */         }
/* 046 */
/* 047 */         boolean inputadapter_isNull1 = inputadapter_row.isNullAt(1);
/* 048 */         ArrayData inputadapter_value1 = inputadapter_isNull1 ?
/* 049 */         null : (inputadapter_row.getArray(1));
/* 050 */         if (!inputadapter_isNull1) {
/* 051 */           project_args[1] = inputadapter_value1;
/* 052 */         }
/* 053 */
/* 054 */         ArrayData project_value = new Object() {
/* 055 */           public ArrayData concat(ArrayData[] args) {
/* 056 */             for (int z = 0; z < 2; z++) {
/* 057 */               if (args[z] == null) return null;
/* 058 */             }
/* 059 */
/* 060 */             long project_numElements = 0L;
/* 061 */             for (int z = 0; z < 2; z++) {
/* 062 */               project_numElements += args[z].numElements();
/* 063 */             }
/* 064 */             if (project_numElements > 2147483632) {
/* 065 */               throw new RuntimeException("Unsuccessful try to concat arrays with " + project_numElements +
/* 066 */                 " elements due to exceeding the array size limit 2147483632.");
/* 067 */             }
/* 068 */
/* 069 */             long project_size = UnsafeArrayData.calculateSizeOfUnderlyingByteArray(
/* 070 */               project_numElements,
/* 071 */               4);
/* 072 */             if (project_size > 2147483632) {
/* 073 */               throw new RuntimeException("Unsuccessful try to concat arrays with " + project_size +
/* 074 */                 " bytes of data due to exceeding the limit 2147483632 bytes" +
/* 075 */                 " for UnsafeArrayData.");
/* 076 */             }
/* 077 */
/* 078 */             byte[] project_array = new byte[(int)project_size];
/* 079 */             UnsafeArrayData project_arrayData = new UnsafeArrayData();
/* 080 */             Platform.putLong(project_array, 16, project_numElements);
/* 081 */             project_arrayData.pointTo(project_array, 16, (int)project_size);
/* 082 */             int project_counter = 0;
/* 083 */             for (int y = 0; y < 2; y++) {
/* 084 */               for (int z = 0; z < args[y].numElements(); z++) {
/* 085 */                 if (args[y].isNullAt(z)) {
/* 086 */                   project_arrayData.setNullAt(project_counter);
/* 087 */                 } else {
/* 088 */                   project_arrayData.setInt(
/* 089 */                     project_counter,
/* 090 */                     args[y].getInt(z)
/* 091 */                   );
/* 092 */                 }
/* 093 */                 project_counter++;
/* 094 */               }
/* 095 */             }
/* 096 */             return project_arrayData;
/* 097 */           }
/* 098 */         }.concat(project_args);
/* 099 */         boolean project_isNull = project_value == null;
```

### Non-primitive-type elements
```
val df = Seq(
  (Seq("aa" ,"bb"), Seq("ccc", "ddd")),
  (Seq("x", "y"), null)
).toDF("a", "b")
df.filter('a.isNotNull).select(concat('a, 'b)).debugCodegen()
```
Result:
```
/* 033 */         boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 034 */         ArrayData inputadapter_value = inputadapter_isNull ?
/* 035 */         null : (inputadapter_row.getArray(0));
/* 036 */
/* 037 */         if (!(!inputadapter_isNull)) continue;
/* 038 */
/* 039 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 040 */
/* 041 */         ArrayData[] project_args = new ArrayData[2];
/* 042 */
/* 043 */         if (!false) {
/* 044 */           project_args[0] = inputadapter_value;
/* 045 */         }
/* 046 */
/* 047 */         boolean inputadapter_isNull1 = inputadapter_row.isNullAt(1);
/* 048 */         ArrayData inputadapter_value1 = inputadapter_isNull1 ?
/* 049 */         null : (inputadapter_row.getArray(1));
/* 050 */         if (!inputadapter_isNull1) {
/* 051 */           project_args[1] = inputadapter_value1;
/* 052 */         }
/* 053 */
/* 054 */         ArrayData project_value = new Object() {
/* 055 */           public ArrayData concat(ArrayData[] args) {
/* 056 */             for (int z = 0; z < 2; z++) {
/* 057 */               if (args[z] == null) return null;
/* 058 */             }
/* 059 */
/* 060 */             long project_numElements = 0L;
/* 061 */             for (int z = 0; z < 2; z++) {
/* 062 */               project_numElements += args[z].numElements();
/* 063 */             }
/* 064 */             if (project_numElements > 2147483632) {
/* 065 */               throw new RuntimeException("Unsuccessful try to concat arrays with " + project_numElements +
/* 066 */                 " elements due to exceeding the array size limit 2147483632.");
/* 067 */             }
/* 068 */
/* 069 */             Object[] project_arrayObjects = new Object[(int)project_numElements];
/* 070 */             int project_counter = 0;
/* 071 */             for (int y = 0; y < 2; y++) {
/* 072 */               for (int z = 0; z < args[y].numElements(); z++) {
/* 073 */                 project_arrayObjects[project_counter] = args[y].getUTF8String(z);
/* 074 */                 project_counter++;
/* 075 */               }
/* 076 */             }
/* 077 */             return new org.apache.spark.sql.catalyst.util.GenericArrayData(project_arrayObjects);
/* 078 */           }
/* 079 */         }.concat(project_args);
/* 080 */         boolean project_isNull = project_value == null;
```

Author: mn-mikke <mrkAha12346github>

Closes #20858 from mn-mikke/feature/array-api-concat_arrays-to-master.
2018-04-20 14:58:11 +09:00
Xingbo Jiang d96c3e33cc [SPARK-21811][SQL] Fix the inconsistency behavior when finding the widest common type
## What changes were proposed in this pull request?

Currently we find the wider common type by comparing the two types from left to right, this can be a problem when you have two data types which don't have a common type but each can be promoted to StringType.

For instance, if you have a table with the schema:
[c1: date, c2: string, c3: int]

The following succeeds:
SELECT coalesce(c1, c2, c3) FROM table

While the following produces an exception:
SELECT coalesce(c1, c3, c2) FROM table

This is only a issue when the seq of dataTypes contains `StringType` and all the types can do string promotion.

close #19033

## How was this patch tested?

Add test in `TypeCoercionSuite`

Author: Xingbo Jiang <xingbo.jiang@databricks.com>

Closes #21074 from jiangxb1987/typeCoercion.
2018-04-19 21:21:22 +08:00
Takeshi Yamamuro e13416502f [SPARK-23588][SQL] CatalystToExternalMap should support interpreted execution
## What changes were proposed in this pull request?
This pr supported interpreted mode for `CatalystToExternalMap`.

## How was this patch tested?
Added tests in `ObjectExpressionsSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #20979 from maropu/SPARK-23588.
2018-04-19 14:42:50 +02:00
Takeshi Yamamuro 1b08c4393c [SPARK-23584][SQL] NewInstance should support interpreted execution
## What changes were proposed in this pull request?
This pr supported interpreted mode for `NewInstance`.

## How was this patch tested?
Added tests in `ObjectExpressionsSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #20778 from maropu/SPARK-23584.
2018-04-19 14:38:26 +02:00
Kazuaki Ishizaki 46bb2b5129 [SPARK-23924][SQL] Add element_at function
## What changes were proposed in this pull request?

The PR adds the SQL function `element_at`. The behavior of the function is based on Presto's one.

This function returns element of array at given index in value if column is array, or returns value for the given key in value if column is map.

## How was this patch tested?

Added UTs

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #21053 from kiszk/SPARK-23924.
2018-04-19 21:00:10 +09:00
Kazuaki Ishizaki d5bec48b9c [SPARK-23919][SQL] Add array_position function
## What changes were proposed in this pull request?

The PR adds the SQL function `array_position`. The behavior of the function is based on Presto's one.

The function returns the position of the first occurrence of the element in array x (or 0 if not found) using 1-based index as BigInt.

## How was this patch tested?

Added UTs

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #21037 from kiszk/SPARK-23919.
2018-04-19 11:59:17 +09:00
Liang-Chi Hsieh a9066478f6 [SPARK-23875][SQL][FOLLOWUP] Add IndexedSeq wrapper for ArrayData
## What changes were proposed in this pull request?

Use specified accessor in `ArrayData.foreach` and `toArray`.

## How was this patch tested?

Existing tests.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #21099 from viirya/SPARK-23875-followup.
2018-04-19 00:05:47 +02:00
Takuya UESHIN f09a9e9418 [SPARK-24007][SQL] EqualNullSafe for FloatType and DoubleType might generate a wrong result by codegen.
## What changes were proposed in this pull request?

`EqualNullSafe` for `FloatType` and `DoubleType` might generate a wrong result by codegen.

```scala
scala> val df = Seq((Some(-1.0d), None), (None, Some(-1.0d))).toDF()
df: org.apache.spark.sql.DataFrame = [_1: double, _2: double]

scala> df.show()
+----+----+
|  _1|  _2|
+----+----+
|-1.0|null|
|null|-1.0|
+----+----+

scala> df.filter("_1 <=> _2").show()
+----+----+
|  _1|  _2|
+----+----+
|-1.0|null|
|null|-1.0|
+----+----+
```

The result should be empty but the result remains two rows.

## How was this patch tested?

Added a test.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #21094 from ueshin/issues/SPARK-24007/equalnullsafe.
2018-04-18 08:22:05 -07:00
mn-mikke f81fa478ff [SPARK-23926][SQL] Extending reverse function to support ArrayType arguments
## What changes were proposed in this pull request?

This PR extends `reverse` functions to be able to operate over array columns and covers:
- Introduction of `Reverse` expression that represents logic for reversing arrays and also strings
- Removal of `StringReverse` expression
- A wrapper for PySpark

## How was this patch tested?

New tests added into:
- CollectionExpressionsSuite
- DataFrameFunctionsSuite

## Codegen examples
### Primitive type
```
val df = Seq(
  Seq(1, 3, 4, 2),
  null
).toDF("i")
df.filter($"i".isNotNull || $"i".isNull).select(reverse($"i")).debugCodegen
```
Result:
```
/* 032 */         boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 033 */         ArrayData inputadapter_value = inputadapter_isNull ?
/* 034 */         null : (inputadapter_row.getArray(0));
/* 035 */
/* 036 */         boolean filter_value = true;
/* 037 */
/* 038 */         if (!(!inputadapter_isNull)) {
/* 039 */           filter_value = inputadapter_isNull;
/* 040 */         }
/* 041 */         if (!filter_value) continue;
/* 042 */
/* 043 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 044 */
/* 045 */         boolean project_isNull = inputadapter_isNull;
/* 046 */         ArrayData project_value = null;
/* 047 */
/* 048 */         if (!inputadapter_isNull) {
/* 049 */           final int project_length = inputadapter_value.numElements();
/* 050 */           project_value = inputadapter_value.copy();
/* 051 */           for(int k = 0; k < project_length / 2; k++) {
/* 052 */             int l = project_length - k - 1;
/* 053 */             boolean isNullAtK = project_value.isNullAt(k);
/* 054 */             boolean isNullAtL = project_value.isNullAt(l);
/* 055 */             if(!isNullAtK) {
/* 056 */               int el = project_value.getInt(k);
/* 057 */               if(!isNullAtL) {
/* 058 */                 project_value.setInt(k, project_value.getInt(l));
/* 059 */               } else {
/* 060 */                 project_value.setNullAt(k);
/* 061 */               }
/* 062 */               project_value.setInt(l, el);
/* 063 */             } else if (!isNullAtL) {
/* 064 */               project_value.setInt(k, project_value.getInt(l));
/* 065 */               project_value.setNullAt(l);
/* 066 */             }
/* 067 */           }
/* 068 */
/* 069 */         }
```
### Non-primitive type
```
val df = Seq(
  Seq("a", "c", "d", "b"),
  null
).toDF("s")
df.filter($"s".isNotNull || $"s".isNull).select(reverse($"s")).debugCodegen
```
Result:
```
/* 032 */         boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 033 */         ArrayData inputadapter_value = inputadapter_isNull ?
/* 034 */         null : (inputadapter_row.getArray(0));
/* 035 */
/* 036 */         boolean filter_value = true;
/* 037 */
/* 038 */         if (!(!inputadapter_isNull)) {
/* 039 */           filter_value = inputadapter_isNull;
/* 040 */         }
/* 041 */         if (!filter_value) continue;
/* 042 */
/* 043 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 044 */
/* 045 */         boolean project_isNull = inputadapter_isNull;
/* 046 */         ArrayData project_value = null;
/* 047 */
/* 048 */         if (!inputadapter_isNull) {
/* 049 */           final int project_length = inputadapter_value.numElements();
/* 050 */           project_value = new org.apache.spark.sql.catalyst.util.GenericArrayData(new Object[project_length]);
/* 051 */           for(int k = 0; k < project_length; k++) {
/* 052 */             int l = project_length - k - 1;
/* 053 */             project_value.update(k, inputadapter_value.getUTF8String(l));
/* 054 */           }
/* 055 */
/* 056 */         }
```

Author: mn-mikke <mrkAha12346github>

Closes #21034 from mn-mikke/feature/array-api-reverse-to-master.
2018-04-18 18:41:55 +09:00
maryannxue 1e3b8762a8 [SPARK-21479][SQL] Outer join filter pushdown in null supplying table when condition is on one of the joined columns
## What changes were proposed in this pull request?

Added `TransitPredicateInOuterJoin` optimization rule that transits constraints from the preserved side of an outer join to the null-supplying side. The constraints of the join operator will remain unchanged.

## How was this patch tested?

Added 3 tests in `InferFiltersFromConstraintsSuite`.

Author: maryannxue <maryann.xue@gmail.com>

Closes #20816 from maryannxue/spark-21479.
2018-04-18 10:36:41 +08:00
Marco Gaido f39e82ce15 [SPARK-23986][SQL] freshName can generate non-unique names
## What changes were proposed in this pull request?

We are using `CodegenContext.freshName` to get a unique name for any new variable we are adding. Unfortunately, this method currently fails to create a unique name when we request more than one instance of variables with starting name `name1` and an instance with starting name `name11`.

The PR changes the way a new name is generated by `CodegenContext.freshName` so that we generate unique names in this scenario too.

## How was this patch tested?

added UT

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21080 from mgaido91/SPARK-23986.
2018-04-18 00:35:44 +08:00
jinxing ed4101d29f [SPARK-22676] Avoid iterating all partition paths when spark.sql.hive.verifyPartitionPath=true
## What changes were proposed in this pull request?

In current code, it will scanning all partition paths when spark.sql.hive.verifyPartitionPath=true.
e.g. table like below:
```
CREATE TABLE `test`(
`id` int,
`age` int,
`name` string)
PARTITIONED BY (
`A` string,
`B` string)
load data local inpath '/tmp/data0' into table test partition(A='00', B='00')
load data local inpath '/tmp/data1' into table test partition(A='01', B='01')
load data local inpath '/tmp/data2' into table test partition(A='10', B='10')
load data local inpath '/tmp/data3' into table test partition(A='11', B='11')
```
If I query with SQL – "select * from test where A='00' and B='01'  ", current code will scan all partition paths including '/data/A=00/B=00', '/data/A=00/B=00', '/data/A=01/B=01', '/data/A=10/B=10', '/data/A=11/B=11'. It costs much time and memory cost.

This pr proposes to avoid iterating all partition paths. Add a config `spark.files.ignoreMissingFiles` and ignore the `file not found` when `getPartitions/compute`(for hive table scan). This is much like the logic brought by
`spark.sql.files.ignoreMissingFiles`(which is for datasource scan).

## How was this patch tested?
UT

Author: jinxing <jinxing6042@126.com>

Closes #19868 from jinxing64/SPARK-22676.
2018-04-17 21:52:33 +08:00
Marco Gaido 0a9172a05e [SPARK-23835][SQL] Add not-null check to Tuples' arguments deserialization
## What changes were proposed in this pull request?

There was no check on nullability for arguments of `Tuple`s. This could lead to have weird behavior when a null value had to be deserialized into a non-nullable Scala object: in those cases, the `null` got silently transformed in a valid value (like `-1` for `Int`), corresponding to the default value we are using in the SQL codebase. This situation was very likely to happen when deserializing to a Tuple of primitive Scala types (like Double, Int, ...).

The PR adds the `AssertNotNull` to arguments of tuples which have been asked to be converted to non-nullable types.

## How was this patch tested?

added UT

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #20976 from mgaido91/SPARK-23835.
2018-04-17 21:45:20 +08:00
Liang-Chi Hsieh 30ffb53cad [SPARK-23875][SQL] Add IndexedSeq wrapper for ArrayData
## What changes were proposed in this pull request?

We don't have a good way to sequentially access `UnsafeArrayData` with a common interface such as `Seq`. An example is `MapObject` where we need to access several sequence collection types together. But `UnsafeArrayData` doesn't implement `ArrayData.array`. Calling `toArray` will copy the entire array. We can provide an `IndexedSeq` wrapper for `ArrayData`, so we can avoid copying the entire array.

## How was this patch tested?

Added test.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #20984 from viirya/SPARK-23875.
2018-04-17 15:09:36 +02:00
Marco Gaido 14844a62c0 [SPARK-23918][SQL] Add array_min function
## What changes were proposed in this pull request?

The PR adds the SQL function `array_min`. It takes an array as argument and returns the minimum value in it.

## How was this patch tested?

added UTs

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21025 from mgaido91/SPARK-23918.
2018-04-17 17:55:35 +09:00
Liang-Chi Hsieh fd990a908b [SPARK-23873][SQL] Use accessors in interpreted LambdaVariable
## What changes were proposed in this pull request?

Currently, interpreted execution of `LambdaVariable` just uses `InternalRow.get` to access element. We should use specified accessors if possible.

## How was this patch tested?

Added test.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #20981 from viirya/SPARK-23873.
2018-04-16 22:45:57 +02:00
Marco Gaido 6931022031 [SPARK-23917][SQL] Add array_max function
## What changes were proposed in this pull request?

The PR adds the SQL function `array_max`. It takes an array as argument and returns the maximum value in it.

## How was this patch tested?

added UTs

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21024 from mgaido91/SPARK-23917.
2018-04-15 21:45:55 -07:00
Liang-Chi Hsieh 73f28530d6 [SPARK-23979][SQL] MultiAlias should not be a CodegenFallback
## What changes were proposed in this pull request?

Just found `MultiAlias` is a `CodegenFallback`. It should not be as looks like `MultiAlias` won't be evaluated.

## How was this patch tested?

Existing tests.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #21065 from viirya/multialias-without-codegenfallback.
2018-04-14 08:59:04 +08:00
Tathagata Das cbb41a0c5b [SPARK-23966][SS] Refactoring all checkpoint file writing logic in a common CheckpointFileManager interface
## What changes were proposed in this pull request?

Checkpoint files (offset log files, state store files) in Structured Streaming must be written atomically such that no partial files are generated (would break fault-tolerance guarantees). Currently, there are 3 locations which try to do this individually, and in some cases, incorrectly.

1. HDFSOffsetMetadataLog - This uses a FileManager interface to use any implementation of `FileSystem` or `FileContext` APIs. It preferably loads `FileContext` implementation as FileContext of HDFS has atomic renames.
1. HDFSBackedStateStore (aka in-memory state store)
  - Writing a version.delta file - This uses FileSystem APIs only to perform a rename. This is incorrect as rename is not atomic in HDFS FileSystem implementation.
  - Writing a snapshot file - Same as above.

#### Current problems:
1. State Store behavior is incorrect - HDFS FileSystem implementation does not have atomic rename.
1. Inflexible - Some file systems provide mechanisms other than write-to-temp-file-and-rename for writing atomically and more efficiently. For example, with S3 you can write directly to the final file and it will be made visible only when the entire file is written and closed correctly. Any failure can be made to terminate the writing without making any partial files visible in S3. The current code does not abstract out this mechanism enough that it can be customized.

#### Solution:

1. Introduce a common interface that all 3 cases above can use to write checkpoint files atomically.
2. This interface must provide the necessary interfaces that allow customization of the write-and-rename mechanism.

This PR does that by introducing the interface `CheckpointFileManager` and modifying `HDFSMetadataLog` and `HDFSBackedStateStore` to use the interface. Similar to earlier `FileManager`, there are implementations based on `FileSystem` and `FileContext` APIs, and the latter implementation is preferred to make it work correctly with HDFS.

The key method this interface has is `createAtomic(path, overwrite)` which returns a `CancellableFSDataOutputStream` that has the method `cancel()`. All users of this method need to either call `close()` to successfully write the file, or `cancel()` in case of an error.

## How was this patch tested?
New tests in `CheckpointFileManagerSuite` and slightly modified existing tests.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #21048 from tdas/SPARK-23966.
2018-04-13 16:31:39 -07:00
Marco Gaido 25892f3cc9 [SPARK-23375][SQL] Eliminate unneeded Sort in Optimizer
## What changes were proposed in this pull request?

Added a new rule to remove Sort operation when its child is already sorted.
For instance, this simple code:
```
spark.sparkContext.parallelize(Seq(("a", "b"))).toDF("a", "b").registerTempTable("table1")
val df = sql(s"""SELECT b
                | FROM (
                |     SELECT a, b
                |     FROM table1
                |     ORDER BY a
                | ) t
                | ORDER BY a""".stripMargin)
df.explain(true)
```
before the PR produces this plan:
```
== Parsed Logical Plan ==
'Sort ['a ASC NULLS FIRST], true
+- 'Project ['b]
   +- 'SubqueryAlias t
      +- 'Sort ['a ASC NULLS FIRST], true
         +- 'Project ['a, 'b]
            +- 'UnresolvedRelation `table1`

== Analyzed Logical Plan ==
b: string
Project [b#7]
+- Sort [a#6 ASC NULLS FIRST], true
   +- Project [b#7, a#6]
      +- SubqueryAlias t
         +- Sort [a#6 ASC NULLS FIRST], true
            +- Project [a#6, b#7]
               +- SubqueryAlias table1
                  +- Project [_1#3 AS a#6, _2#4 AS b#7]
                     +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, true, false) AS _2#4]
                        +- ExternalRDD [obj#2]

== Optimized Logical Plan ==
Project [b#7]
+- Sort [a#6 ASC NULLS FIRST], true
   +- Project [b#7, a#6]
      +- Sort [a#6 ASC NULLS FIRST], true
         +- Project [_1#3 AS a#6, _2#4 AS b#7]
            +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#4]
               +- ExternalRDD [obj#2]

== Physical Plan ==
*(3) Project [b#7]
+- *(3) Sort [a#6 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(a#6 ASC NULLS FIRST, 200)
      +- *(2) Project [b#7, a#6]
         +- *(2) Sort [a#6 ASC NULLS FIRST], true, 0
            +- Exchange rangepartitioning(a#6 ASC NULLS FIRST, 200)
               +- *(1) Project [_1#3 AS a#6, _2#4 AS b#7]
                  +- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#4]
                     +- Scan ExternalRDDScan[obj#2]
```

while after the PR produces:

```
== Parsed Logical Plan ==
'Sort ['a ASC NULLS FIRST], true
+- 'Project ['b]
   +- 'SubqueryAlias t
      +- 'Sort ['a ASC NULLS FIRST], true
         +- 'Project ['a, 'b]
            +- 'UnresolvedRelation `table1`

== Analyzed Logical Plan ==
b: string
Project [b#7]
+- Sort [a#6 ASC NULLS FIRST], true
   +- Project [b#7, a#6]
      +- SubqueryAlias t
         +- Sort [a#6 ASC NULLS FIRST], true
            +- Project [a#6, b#7]
               +- SubqueryAlias table1
                  +- Project [_1#3 AS a#6, _2#4 AS b#7]
                     +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, true, false) AS _2#4]
                        +- ExternalRDD [obj#2]

== Optimized Logical Plan ==
Project [b#7]
+- Sort [a#6 ASC NULLS FIRST], true
   +- Project [_1#3 AS a#6, _2#4 AS b#7]
      +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#4]
         +- ExternalRDD [obj#2]

== Physical Plan ==
*(2) Project [b#7]
+- *(2) Sort [a#6 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(a#6 ASC NULLS FIRST, 5)
      +- *(1) Project [_1#3 AS a#6, _2#4 AS b#7]
         +- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#4]
            +- Scan ExternalRDDScan[obj#2]
```

this means that an unnecessary sort operation is not performed after the PR.

## How was this patch tested?

added UT

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #20560 from mgaido91/SPARK-23375.
2018-04-14 01:01:00 +08:00
yucai 0323e61465 [SPARK-23905][SQL] Add UDF weekday
## What changes were proposed in this pull request?

Add UDF weekday

## How was this patch tested?

A new test

Author: yucai <yyu1@ebay.com>

Closes #21009 from yucai/SPARK-23905.
2018-04-13 00:00:04 -07:00
jerryshao 14291b061b [SPARK-23748][SS] Fix SS continuous process doesn't support SubqueryAlias issue
## What changes were proposed in this pull request?

Current SS continuous doesn't support processing on temp table or `df.as("xxx")`, SS will throw an exception as LogicalPlan not supported, details described in [here](https://issues.apache.org/jira/browse/SPARK-23748).

So here propose to add this support.

## How was this patch tested?

new UT.

Author: jerryshao <sshao@hortonworks.com>

Closes #21017 from jerryshao/SPARK-23748.
2018-04-12 20:00:25 -07:00
Kazuaki Ishizaki 0b19122d43 [SPARK-23762][SQL] UTF8StringBuffer uses MemoryBlock
## What changes were proposed in this pull request?

This PR tries to use `MemoryBlock` in `UTF8StringBuffer`. In general, there are two advantages to use `MemoryBlock`.

1. Has clean API calls rather than using a Java array or `PlatformMemory`
2. Improve runtime performance of memory access instead of using `Object`.

## How was this patch tested?

Added `UTF8StringBufferSuite`

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #20871 from kiszk/SPARK-23762.
2018-04-12 22:21:30 +08:00
Herman van Hovell c604d659e1 [SPARK-23951][SQL] Use actual java class instead of string representation.
## What changes were proposed in this pull request?
This PR slightly refactors the newly added `ExprValue` API by quite a bit. The following changes are introduced:

1. `ExprValue` now uses the actual class instead of the class name as its type. This should give some more flexibility with generating code in the future.
2. Renamed `StatementValue` to `SimpleExprValue`. The statement concept is broader then an expression (untyped and it cannot be on the right hand side of an assignment), and this was not really what we were using it for. I have added a top level `JavaCode` trait that can be used in the future to reinstate (no pun intended) a statement a-like code fragment.
3. Added factory methods to the `JavaCode` companion object to make it slightly less verbose to create `JavaCode`/`ExprValue` objects. This is also what makes the diff quite large.
4. Added one more factory method to `ExprCode` to make it easier to create code-less expressions.

## How was this patch tested?
Existing tests.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #21026 from hvanhovell/SPARK-23951.
2018-04-11 20:11:03 +08:00
Gengliang Wang e179658914 [SPARK-19724][SQL][FOLLOW-UP] Check location of managed table when ignoreIfExists is true
## What changes were proposed in this pull request?

In the PR #20886, I mistakenly check the table location only when `ignoreIfExists` is false, which was following the original deprecated PR.
That was wrong. When `ignoreIfExists` is true and the target table doesn't exist, we should also check the table location. In other word, **`ignoreIfExists` has nothing to do with table location validation**.
This is a follow-up PR to fix the mistake.

## How was this patch tested?

Add one unit test.

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21001 from gengliangwang/SPARK-19724-followup.
2018-04-10 09:33:09 -07:00
Herman van Hovell 3323b156f9 [SPARK-23864][SQL] Add unsafe object writing to UnsafeWriter
## What changes were proposed in this pull request?
This PR moves writing of `UnsafeRow`, `UnsafeArrayData` & `UnsafeMapData` out of the `GenerateUnsafeProjection`/`InterpretedUnsafeProjection` classes into the `UnsafeWriter` interface. This cleans up the code a little bit, and it should also result in less byte code for the code generated path.

## How was this patch tested?
Existing tests

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #20986 from hvanhovell/SPARK-23864.
2018-04-10 17:32:00 +02:00
Herman van Hovell 6498884154 [SPARK-23898][SQL] Simplify add & subtract code generation
## What changes were proposed in this pull request?
Code generation for the `Add` and `Subtract` expressions was not done using the `BinaryArithmetic.doCodeGen` method because these expressions also support `CalendarInterval`. This leads to a bit of duplication.

This PR gets rid of that duplication by adding `calendarIntervalMethod` to `BinaryArithmetic` and doing the code generation for `CalendarInterval` in `BinaryArithmetic` instead.

## How was this patch tested?
Existing tests.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #21005 from hvanhovell/SPARK-23898.
2018-04-09 21:49:49 -07:00
Kris Mok f94f3624ea [SPARK-23947][SQL] Add hashUTF8String convenience method to hasher classes
## What changes were proposed in this pull request?

Add `hashUTF8String()` to the hasher classes to allow Spark SQL codegen to generate cleaner code for hashing `UTF8String`s. No change in behavior otherwise.

Although with the introduction of SPARK-10399, the code size for hashing `UTF8String` is already smaller, it's still good to extract a separate function in the hasher classes so that the generated code can stay clean.

## How was this patch tested?

Existing tests.

Author: Kris Mok <kris.mok@databricks.com>

Closes #21016 from rednaxelafx/hashutf8.
2018-04-09 21:07:28 -07:00
Liang-Chi Hsieh 7c1654e215 [SPARK-22856][SQL] Add wrappers for codegen output and nullability
## What changes were proposed in this pull request?

The codegen output of `Expression`, aka `ExprCode`, now encapsulates only strings of output value (`value`) and nullability (`isNull`). It makes difficulty for us to know what the output really is. I think it is better if we can add wrappers for the value and nullability that let us to easily know that.

## How was this patch tested?

Existing tests.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #20043 from viirya/SPARK-22856.
2018-04-09 11:54:35 -07:00
Kazuaki Ishizaki 8d40a79a07 [SPARK-23893][CORE][SQL] Avoid possible integer overflow in multiplication
## What changes were proposed in this pull request?

This PR avoids possible overflow at an operation `long = (long)(int * int)`. The multiplication of large positive integer values may set one to MSB. This leads to a negative value in long while we expected a positive value (e.g. `0111_0000_0000_0000 * 0000_0000_0000_0010`).

This PR performs long cast before the multiplication to avoid this situation.

## How was this patch tested?

Existing UTs

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #21002 from kiszk/SPARK-23893.
2018-04-08 20:40:27 +02:00
Kazuaki Ishizaki b6935ffb4d [SPARK-10399][SPARK-23879][HOTFIX] Fix Java lint errors
## What changes were proposed in this pull request?

This PR fixes the following errors in [Java lint](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Compile/job/spark-master-lint/7717/console) after #19222 has been merged. These errors were pointed by ueshin .

```
[ERROR] src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java:[57] (sizes) LineLength: Line is longer than 100 characters (found 106).
[ERROR] src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java:[26,8] (imports) UnusedImports: Unused import - org.apache.spark.unsafe.Platform.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OffHeapMemoryBlock.java:[23,10] (modifier) ModifierOrder: 'public' modifier out of order with the JLS suggestions.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[64,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[69,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[74,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[79,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[84,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[89,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[94,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[99,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[104,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[109,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[114,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[119,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[124,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[129,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[60,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[65,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[70,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[75,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[80,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[85,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[90,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[95,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[100,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[105,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[110,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[115,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[120,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[125,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java:[114,16] (modifier) ModifierOrder: 'static' modifier out of order with the JLS suggestions.
[ERROR] src/main/java/org/apache/spark/sql/catalyst/expressions/HiveHasher.java:[20,8] (imports) UnusedImports: Unused import - org.apache.spark.unsafe.Platform.
[ERROR] src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java:[30,8] (imports) UnusedImports: Unused import - org.apache.spark.unsafe.memory.MemoryBlock.
[ERROR] src/test/java/org/apache/spark/unsafe/memory/MemoryBlockSuite.java:[126,15] (naming) MethodName: Method name 'ByteArrayMemoryBlockTest' must match pattern '^[a-z][a-z0-9][a-zA-Z0-9_]*$'.
[ERROR] src/test/java/org/apache/spark/unsafe/memory/MemoryBlockSuite.java:[143,15] (naming) MethodName: Method name 'OnHeapMemoryBlockTest' must match pattern '^[a-z][a-z0-9][a-zA-Z0-9_]*$'.
[ERROR] src/test/java/org/apache/spark/unsafe/memory/MemoryBlockSuite.java:[160,15] (naming) MethodName: Method name 'OffHeapArrayMemoryBlockTest' must match pattern '^[a-z][a-z0-9][a-zA-Z0-9_]*$'.
[ERROR] src/main/java/org/apache/spark/sql/catalyst/expressions/XXH64.java:[19,8] (imports) UnusedImports: Unused import - com.google.common.primitives.Ints.
[ERROR] src/main/java/org/apache/spark/sql/catalyst/expressions/XXH64.java:[21,8] (imports) UnusedImports: Unused import - org.apache.spark.unsafe.Platform.
[ERROR] src/test/java/org/apache/spark/sql/catalyst/expressions/HiveHasherSuite.java:[20,8] (imports) UnusedImports: Unused import - org.apache.spark.unsafe.Platform.
```

## How was this patch tested?

Existing UTs

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #20991 from kiszk/SPARK-10399-jlint.
2018-04-06 10:23:26 -07:00
Gengliang Wang 249007e37f [SPARK-19724][SQL] create a managed table with an existed default table should throw an exception
## What changes were proposed in this pull request?
This PR is to finish https://github.com/apache/spark/pull/17272

This JIRA is a follow up work after SPARK-19583

As we discussed in that PR

The following DDL for a managed table with an existed default location should throw an exception:

CREATE TABLE ... (PARTITIONED BY ...) AS SELECT ...
CREATE TABLE ... (PARTITIONED BY ...)
Currently there are some situations which are not consist with above logic:

CREATE TABLE ... (PARTITIONED BY ...) succeed with an existed default location
situation: for both hive/datasource(with HiveExternalCatalog/InMemoryCatalog)

CREATE TABLE ... (PARTITIONED BY ...) AS SELECT ...
situation: hive table succeed with an existed default location

This PR is going to make above two situations consist with the logic that it should throw an exception
with an existed default location.
## How was this patch tested?

unit test added

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #20886 from gengliangwang/pr-17272.
2018-04-05 20:19:25 -07:00
JiahuiJiang d65e531b44 [SPARK-23823][SQL] Keep origin in transformExpression
Fixes https://issues.apache.org/jira/browse/SPARK-23823

Keep origin for all the methods using transformExpression

## What changes were proposed in this pull request?

Keep origin in transformExpression

## How was this patch tested?

Manually tested that this fixes https://issues.apache.org/jira/browse/SPARK-23823 and columns have correct origins after Analyzer.analyze

Author: JiahuiJiang <jjiang@palantir.com>
Author: Jiahui Jiang <jjiang@palantir.com>

Closes #20961 from JiahuiJiang/jj/keep-origin.
2018-04-05 20:06:08 -07:00
Kazuaki Ishizaki 4807d381bb [SPARK-10399][CORE][SQL] Introduce multiple MemoryBlocks to choose several types of memory block
## What changes were proposed in this pull request?

This PR allows us to use one of several types of `MemoryBlock`, such as byte array, int array, long array, or `java.nio.DirectByteBuffer`. To use `java.nio.DirectByteBuffer` allows to have off heap memory which is automatically deallocated by JVM. `MemoryBlock`  class has primitive accessors like `Platform.getInt()`, `Platform.putint()`, or `Platform.copyMemory()`.

This PR uses `MemoryBlock` for `OffHeapColumnVector`, `UTF8String`, and other places. This PR can improve performance of operations involving memory accesses (e.g. `UTF8String.trim`) by 1.8x.

For now, this PR does not use `MemoryBlock` for `BufferHolder` based on cloud-fan's [suggestion](https://github.com/apache/spark/pull/11494#issuecomment-309694290).

Since this PR is a successor of #11494, close #11494. Many codes were ported from #11494. Many efforts were put here. **I think this PR should credit to yzotov.**

This PR can achieve **1.1-1.4x performance improvements** for  operations in `UTF8String` or `Murmur3_x86_32`. Other operations are almost comparable performances.

Without this PR
```
OpenJDK 64-Bit Server VM 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 4.4.0-22-generic
Intel(R) Xeon(R) CPU E5-2667 v3  3.20GHz
OpenJDK 64-Bit Server VM 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 4.4.0-22-generic
Intel(R) Xeon(R) CPU E5-2667 v3  3.20GHz
Hash byte arrays with length 268435487:  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Murmur3_x86_32                                 526 /  536          0.0   131399881.5       1.0X

UTF8String benchmark:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
hashCode                                       525 /  552       1022.6           1.0       1.0X
substring                                      414 /  423       1298.0           0.8       1.3X
```

With this PR
```
OpenJDK 64-Bit Server VM 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 4.4.0-22-generic
Intel(R) Xeon(R) CPU E5-2667 v3  3.20GHz
Hash byte arrays with length 268435487:  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Murmur3_x86_32                                 474 /  488          0.0   118552232.0       1.0X

UTF8String benchmark:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
hashCode                                       476 /  480       1127.3           0.9       1.0X
substring                                      287 /  291       1869.9           0.5       1.7X
```

Benchmark program
```
test("benchmark Murmur3_x86_32") {
  val length = 8192 * 32768 + 31
  val seed = 42L
  val iters = 1 << 2
  val random = new Random(seed)
  val arrays = Array.fill[MemoryBlock](numArrays) {
    val bytes = new Array[Byte](length)
    random.nextBytes(bytes)
    new ByteArrayMemoryBlock(bytes, Platform.BYTE_ARRAY_OFFSET, length)
  }

  val benchmark = new Benchmark("Hash byte arrays with length " + length,
    iters * numArrays, minNumIters = 20)
  benchmark.addCase("HiveHasher") { _: Int =>
    var sum = 0L
    for (_ <- 0L until iters) {
      sum += HiveHasher.hashUnsafeBytesBlock(
        arrays(i), Platform.BYTE_ARRAY_OFFSET, length)
    }
  }
  benchmark.run()
}

test("benchmark UTF8String") {
  val N = 512 * 1024 * 1024
  val iters = 2
  val benchmark = new Benchmark("UTF8String benchmark", N, minNumIters = 20)
  val str0 = new java.io.StringWriter() { { for (i <- 0 until N) { write(" ") } } }.toString
  val s0 = UTF8String.fromString(str0)
  benchmark.addCase("hashCode") { _: Int =>
    var h: Int = 0
    for (_ <- 0L until iters) { h += s0.hashCode }
  }
  benchmark.addCase("substring") { _: Int =>
    var s: UTF8String = null
    for (_ <- 0L until iters) { s = s0.substring(N / 2 - 5, N / 2 + 5) }
  }
  benchmark.run()
}
```

I run [this benchmark program](https://gist.github.com/kiszk/94f75b506c93a663bbbc372ffe8f05de) using [the commit](ee5a79861c). I got the following results:

```
OpenJDK 64-Bit Server VM 1.8.0_151-8u151-b12-0ubuntu0.16.04.2-b12 on Linux 4.4.0-66-generic
Intel(R) Xeon(R) CPU E5-2667 v3  3.20GHz
Memory access benchmarks:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
ByteArrayMemoryBlock get/putInt()              220 /  221        609.3           1.6       1.0X
Platform get/putInt(byte[])                    220 /  236        610.9           1.6       1.0X
Platform get/putInt(Object)                    492 /  494        272.8           3.7       0.4X
OnHeapMemoryBlock get/putLong()                322 /  323        416.5           2.4       0.7X
long[]                                         221 /  221        608.0           1.6       1.0X
Platform get/putLong(long[])                   321 /  321        418.7           2.4       0.7X
Platform get/putLong(Object)                   561 /  563        239.2           4.2       0.4X
```

I also run [this benchmark program](https://gist.github.com/kiszk/5fdb4e03733a5d110421177e289d1fb5) for comparing performance of `Platform.copyMemory()`.
```
OpenJDK 64-Bit Server VM 1.8.0_151-8u151-b12-0ubuntu0.16.04.2-b12 on Linux 4.4.0-66-generic
Intel(R) Xeon(R) CPU E5-2667 v3  3.20GHz
Platform copyMemory:                     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Object to Object                              1961 / 1967          8.6         116.9       1.0X
System.arraycopy Object to Object             1917 / 1921          8.8         114.3       1.0X
byte array to byte array                      1961 / 1968          8.6         116.9       1.0X
System.arraycopy byte array to byte array      1909 / 1937          8.8         113.8       1.0X
int array to int array                        1921 / 1990          8.7         114.5       1.0X
double array to double array                  1918 / 1923          8.7         114.3       1.0X
Object to byte array                          1961 / 1967          8.6         116.9       1.0X
Object to short array                         1965 / 1972          8.5         117.1       1.0X
Object to int array                           1910 / 1915          8.8         113.9       1.0X
Object to float array                         1971 / 1978          8.5         117.5       1.0X
Object to double array                        1919 / 1944          8.7         114.4       1.0X
byte array to Object                          1959 / 1967          8.6         116.8       1.0X
int array to Object                           1961 / 1970          8.6         116.9       1.0X
double array to Object                        1917 / 1924          8.8         114.3       1.0X
```

These results show three facts:
1. According to the second/third or sixth/seventh results in the first experiment, if we use `Platform.get/putInt(Object)`, we achieve more than 2x worse performance than `Platform.get/putInt(byte[])` with concrete type (i.e. `byte[]`).
2. According to the second/third or fourth/fifth/sixth results in the first experiment, the fastest way to access an array element on Java heap is `array[]`. **Cons of `array[]` is that it is not possible to support unaligned-8byte access.**
3. According to the first/second/third or fourth/sixth/seventh results in the first experiment, `getInt()/putInt() or getLong()/putLong()` in subclasses of `MemoryBlock` can achieve comparable performance to `Platform.get/putInt()` or `Platform.get/putLong()` with concrete type (second or sixth result). There is no overhead regarding virtual call.
4. According to results in the second experiment, for `Platform.copy()`, to pass `Object` can achieve the same performance as to pass any type of primitive array as source or destination.
5. According to second/fourth results in the second experiment, `Platform.copy()` can achieve the same performance as `System.arrayCopy`. **It would be good to use `Platform.copy()` since `Platform.copy()` can take any types for src and dst.**

We are incrementally replace `Platform.get/putXXX` with `MemoryBlock.get/putXXX`. This is because we have two advantages.
1) Achieve better performance due to having a concrete type for an array.
2) Use simple OO design instead of passing `Object`
It is easy to use `MemoryBlock` in `InternalRow`, `BufferHolder`, `TaskMemoryManager`, and others that are already abstracted. It is not easy to use `MemoryBlock` in utility classes related to hashing or others.

Other candidates are
- UnsafeRow, UnsafeArrayData, UnsafeMapData, SpecificUnsafeRowJoiner
- UTF8StringBuffer
- BufferHolder
- TaskMemoryManager
- OnHeapColumnVector
- BytesToBytesMap
- CachedBatch
- classes for hash
- others.

## How was this patch tested?

Added `UnsafeMemoryAllocator`

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #19222 from kiszk/SPARK-10399.
2018-04-06 10:13:59 +08:00
Liang-Chi Hsieh d9ca1c906b [SPARK-23593][SQL] Add interpreted execution for InitializeJavaBean expression
## What changes were proposed in this pull request?

Add interpreted execution for `InitializeJavaBean` expression.

## How was this patch tested?

Added unit test.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #20985 from viirya/SPARK-23593-2.
2018-04-05 20:43:05 +02:00
Herman van Hovell b2329fb1fc Revert "[SPARK-23593][SQL] Add interpreted execution for InitializeJavaBean expression"
This reverts commit c5c8b54404.
2018-04-05 13:57:41 +02:00
Kazuaki Ishizaki 1822ecda51 [SPARK-23582][SQL] StaticInvoke should support interpreted execution
## What changes were proposed in this pull request?

This pr added interpreted execution for `StaticInvoke`.

## How was this patch tested?

Added tests in `ObjectExpressionsSuite`.

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #20753 from kiszk/SPARK-23582.
2018-04-05 13:47:06 +02:00
Liang-Chi Hsieh c5c8b54404 [SPARK-23593][SQL] Add interpreted execution for InitializeJavaBean expression
## What changes were proposed in this pull request?

Add interpreted execution for `InitializeJavaBean` expression.

## How was this patch tested?

Added unit test.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #20756 from viirya/SPARK-23593.
2018-04-05 13:39:45 +02:00
Kazuaki Ishizaki a35523653c [SPARK-23583][SQL] Invoke should support interpreted execution
## What changes were proposed in this pull request?

This pr added interpreted execution for `Invoke`.

## How was this patch tested?

Added tests in `ObjectExpressionsSuite`.

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #20797 from kiszk/SPARK-28583.
2018-04-04 18:36:15 +02:00
Takeshi Yamamuro 5197562afe [SPARK-21351][SQL] Update nullability based on children's output
## What changes were proposed in this pull request?
This pr added a new optimizer rule `UpdateNullabilityInAttributeReferences ` to update the nullability that `Filter` changes when having `IsNotNull`. In the master, optimized plans do not respect the nullability when `Filter` has `IsNotNull`. This wrongly generates unnecessary code. For example:

```
scala> val df = Seq((Some(1), Some(2))).toDF("a", "b")
scala> val bIsNotNull = df.where($"b" =!= 2).select($"b")
scala> val targetQuery = bIsNotNull.distinct
scala> val targetQuery.queryExecution.optimizedPlan.output(0).nullable
res5: Boolean = true

scala> targetQuery.debugCodegen
Found 2 WholeStageCodegen subtrees.
== Subtree 1 / 2 ==
*HashAggregate(keys=[b#19], functions=[], output=[b#19])
+- Exchange hashpartitioning(b#19, 200)
   +- *HashAggregate(keys=[b#19], functions=[], output=[b#19])
      +- *Project [_2#16 AS b#19]
         +- *Filter isnotnull(_2#16)
            +- LocalTableScan [_1#15, _2#16]

Generated code:
...
/* 124 */   protected void processNext() throws java.io.IOException {
...
/* 132 */     // output the result
/* 133 */
/* 134 */     while (agg_mapIter.next()) {
/* 135 */       wholestagecodegen_numOutputRows.add(1);
/* 136 */       UnsafeRow agg_aggKey = (UnsafeRow) agg_mapIter.getKey();
/* 137 */       UnsafeRow agg_aggBuffer = (UnsafeRow) agg_mapIter.getValue();
/* 138 */
/* 139 */       boolean agg_isNull4 = agg_aggKey.isNullAt(0);
/* 140 */       int agg_value4 = agg_isNull4 ? -1 : (agg_aggKey.getInt(0));
/* 141 */       agg_rowWriter1.zeroOutNullBytes();
/* 142 */
                // We don't need this NULL check because NULL is filtered out in `$"b" =!=2`
/* 143 */       if (agg_isNull4) {
/* 144 */         agg_rowWriter1.setNullAt(0);
/* 145 */       } else {
/* 146 */         agg_rowWriter1.write(0, agg_value4);
/* 147 */       }
/* 148 */       append(agg_result1);
/* 149 */
/* 150 */       if (shouldStop()) return;
/* 151 */     }
/* 152 */
/* 153 */     agg_mapIter.close();
/* 154 */     if (agg_sorter == null) {
/* 155 */       agg_hashMap.free();
/* 156 */     }
/* 157 */   }
/* 158 */
/* 159 */ }
```

In the line 143, we don't need this NULL check because NULL is filtered out in `$"b" =!=2`.
This pr could remove this NULL check;

```
scala> val targetQuery.queryExecution.optimizedPlan.output(0).nullable
res5: Boolean = false

scala> targetQuery.debugCodegen
...
Generated code:
...
/* 144 */   protected void processNext() throws java.io.IOException {
...
/* 152 */     // output the result
/* 153 */
/* 154 */     while (agg_mapIter.next()) {
/* 155 */       wholestagecodegen_numOutputRows.add(1);
/* 156 */       UnsafeRow agg_aggKey = (UnsafeRow) agg_mapIter.getKey();
/* 157 */       UnsafeRow agg_aggBuffer = (UnsafeRow) agg_mapIter.getValue();
/* 158 */
/* 159 */       int agg_value4 = agg_aggKey.getInt(0);
/* 160 */       agg_rowWriter1.write(0, agg_value4);
/* 161 */       append(agg_result1);
/* 162 */
/* 163 */       if (shouldStop()) return;
/* 164 */     }
/* 165 */
/* 166 */     agg_mapIter.close();
/* 167 */     if (agg_sorter == null) {
/* 168 */       agg_hashMap.free();
/* 169 */     }
/* 170 */   }
```

## How was this patch tested?
Added `UpdateNullabilityInAttributeReferencesSuite` for unit tests.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #18576 from maropu/SPARK-21351.
2018-04-04 14:39:19 +08:00
Robert Kruszewski 5cfd5fabcd [SPARK-23802][SQL] PropagateEmptyRelation can leave query plan in unresolved state
## What changes were proposed in this pull request?

Add cast to nulls introduced by PropagateEmptyRelation so in cases they're part of coalesce they will not break its type checking rules

## How was this patch tested?

Added unit test

Author: Robert Kruszewski <robertk@palantir.com>

Closes #20914 from robert3005/rk/propagate-empty-fix.
2018-04-03 17:25:54 -07:00
Liang-Chi Hsieh 1035aaa617 [SPARK-23587][SQL] Add interpreted execution for MapObjects expression
## What changes were proposed in this pull request?

Add interpreted execution for `MapObjects` expression.

## How was this patch tested?

Added unit test.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #20771 from viirya/SPARK-23587.
2018-04-04 01:36:58 +02:00
Kazuaki Ishizaki a7c19d9c21 [SPARK-23713][SQL] Cleanup UnsafeWriter and BufferHolder classes
## What changes were proposed in this pull request?

This PR implemented the following cleanups related to  `UnsafeWriter` class:
- Remove code duplication between `UnsafeRowWriter` and `UnsafeArrayWriter`
- Make `BufferHolder` class internal by delegating its accessor methods to `UnsafeWriter`
- Replace `UnsafeRow.setTotalSize(...)` with `UnsafeRowWriter.setTotalSize()`

## How was this patch tested?

Tested by existing UTs

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #20850 from kiszk/SPARK-23713.
2018-04-02 21:48:44 +02:00
gatorsmile bc8d093117 [SPARK-23500][SQL][FOLLOWUP] Fix complex type simplification rules to apply to entire plan
## What changes were proposed in this pull request?
This PR is to improve the test coverage of the original PR https://github.com/apache/spark/pull/20687

## How was this patch tested?
N/A

Author: gatorsmile <gatorsmile@gmail.com>

Closes #20911 from gatorsmile/addTests.
2018-03-30 23:21:07 +08:00
yucai b02e76cbff [SPARK-23727][SQL] Support for pushing down filters for DateType in parquet
## What changes were proposed in this pull request?

This PR supports for pushing down filters for DateType in parquet

## How was this patch tested?

Added UT and tested in local.

Author: yucai <yyu1@ebay.com>

Closes #20851 from yucai/SPARK-23727.
2018-03-30 15:07:38 +08:00
hyukjinkwon 34c4b9c57e [SPARK-23765][SQL] Supports custom line separator for json datasource
## What changes were proposed in this pull request?

This PR proposes to add lineSep option for a configurable line separator in text datasource.
It supports this option by using `LineRecordReader`'s functionality with passing it to the constructor.

The approach is similar with https://github.com/apache/spark/pull/20727; however, one main difference is, it uses text datasource's `lineSep` option to parse line by line in JSON's schema inference.

## How was this patch tested?

Manually tested and unit tests were added.

Author: hyukjinkwon <gurwls223@apache.org>
Author: hyukjinkwon <gurwls223@gmail.com>

Closes #20877 from HyukjinKwon/linesep-json.
2018-03-28 19:49:27 +08:00
Liang-Chi Hsieh 35997b59f3 [SPARK-23794][SQL] Make UUID as stateful expression
## What changes were proposed in this pull request?

The UUID() expression is stateful and should implement the `Stateful` trait instead of the `Nondeterministic` trait.

## How was this patch tested?

Added test.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #20912 from viirya/SPARK-23794.
2018-03-27 14:49:50 +02:00
Kazuaki Ishizaki e4bec7cb88 [SPARK-23549][SQL] Cast to timestamp when comparing timestamp with date
## What changes were proposed in this pull request?

This PR fixes an incorrect comparison in SQL between timestamp and date. This is because both of them are casted to `string` and then are compared lexicographically. This implementation shows `false` regarding this query `spark.sql("select cast('2017-03-01 00:00:00' as timestamp) between cast('2017-02-28' as date) and cast('2017-03-01' as date)").show`.

This PR shows `true` for this query by casting `date("2017-03-01")` to `timestamp("2017-03-01 00:00:00")`.

(Please fill in changes proposed in this fix)

## How was this patch tested?

Added new UTs to `TypeCoercionSuite`.

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #20774 from kiszk/SPARK-23549.
2018-03-25 16:38:49 -07:00
Liang-Chi Hsieh 4d37008c78 [SPARK-23599][SQL] Use RandomUUIDGenerator in Uuid expression
## What changes were proposed in this pull request?

As stated in Jira, there are problems with current `Uuid` expression which uses `java.util.UUID.randomUUID` for UUID generation.

This patch uses the newly added `RandomUUIDGenerator` for UUID generation. So we can make `Uuid` deterministic between retries.

## How was this patch tested?

Added unit tests.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #20861 from viirya/SPARK-23599-2.
2018-03-22 19:57:32 +01:00
Kris Mok 95e51ff849 [SPARK-23760][SQL] CodegenContext.withSubExprEliminationExprs should save/restore CSE state correctly
## What changes were proposed in this pull request?

Fixed `CodegenContext.withSubExprEliminationExprs()` so that it saves/restores CSE state correctly.

## How was this patch tested?

Added new unit test to verify that the old CSE state is indeed saved and restored around the `withSubExprEliminationExprs()` call. Manually verified that this test fails without this patch.

Author: Kris Mok <kris.mok@databricks.com>

Closes #20870 from rednaxelafx/codegen-subexpr-fix.
2018-03-21 21:21:36 -07:00
Takeshi Yamamuro 983e8d9d64 [SPARK-23666][SQL] Do not display exprIds of Alias in user-facing info.
## What changes were proposed in this pull request?
To drop `exprId`s for `Alias` in user-facing info., this pr added an entry for `Alias` in `NonSQLExpression.sql`

## How was this patch tested?
Added tests in `UDFSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #20827 from maropu/SPARK-23666.
2018-03-20 23:17:49 -07:00
Henry Robinson 477d6bd726 [SPARK-23500][SQL] Fix complex type simplification rules to apply to entire plan
## What changes were proposed in this pull request?

Complex type simplification optimizer rules were not applied to the
entire plan, just the expressions reachable from the root node. This
patch fixes the rules to transform the entire plan.

## How was this patch tested?

New unit test + ran sql / core tests.

Author: Henry Robinson <henry@apache.org>
Author: Henry Robinson <henry@cloudera.com>

Closes #20687 from henryr/spark-25000.
2018-03-20 13:27:50 -07:00
Liang-Chi Hsieh 4de638c197 [SPARK-23599][SQL] Add a UUID generator from Pseudo-Random Numbers
## What changes were proposed in this pull request?

This patch adds a UUID generator from Pseudo-Random Numbers. We can use it later to have deterministic `UUID()` expression.

## How was this patch tested?

Added unit tests.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #20817 from viirya/SPARK-23599.
2018-03-19 09:41:43 +01:00
Herman van Hovell 88d8de9260 [SPARK-23581][SQL] Add interpreted unsafe projection
## What changes were proposed in this pull request?
We currently can only create unsafe rows using code generation. This is a problem for situations in which code generation fails. There is no fallback, and as a result we cannot execute the query.

This PR adds an interpreted version of `UnsafeProjection`. The implementation is modeled after `InterpretedMutableProjection`. It stores the expression results in a `GenericInternalRow`, and it then uses a conversion function to convert the `GenericInternalRow` into an `UnsafeRow`.

This PR does not implement the actual code generated to interpreted fallback logic. This will be done in a follow-up.

## How was this patch tested?
I am piggybacking on exiting `UnsafeProjection` tests, and I have added an interpreted version for each of these.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #20750 from hvanhovell/SPARK-23581.
2018-03-16 18:28:16 +01:00
Kazuaki Ishizaki 23370554d0 [SPARK-23656][TEST] Perform assertions in XXH64Suite.testKnownByteArrayInputs() on big endian platform, too
## What changes were proposed in this pull request?

This PR enables assertions in `XXH64Suite.testKnownByteArrayInputs()` on big endian platform, too. The current implementation performs them only on little endian platform. This PR increase test coverage of big endian platform.

## How was this patch tested?

Updated `XXH64Suite`
Tested on big endian platform using JIT compiler or interpreter `-Xint`.

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #20804 from kiszk/SPARK-23656.
2018-03-13 15:20:09 +01:00
Xiayun Sun b304e07e06 [SPARK-23462][SQL] improve missing field error message in StructType
## What changes were proposed in this pull request?

The error message ```s"""Field "$name" does not exist."""``` is thrown when looking up an unknown field in StructType. In the error message, we should also contain the information about which columns/fields exist in this struct.

## How was this patch tested?

Added new unit tests.

Note: I created a new `StructTypeSuite.scala` as I couldn't find an existing suite that's suitable to place these tests. I may be missing something so feel free to propose new locations.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Xiayun Sun <xiayunsun@gmail.com>

Closes #20649 from xysun/SPARK-23462.
2018-03-12 22:13:28 +09:00
Michał Świtakowski 2ca9bb083c [SPARK-23173][SQL] Avoid creating corrupt parquet files when loading data from JSON
## What changes were proposed in this pull request?

The from_json() function accepts an additional parameter, where the user might specify the schema. The issue is that the specified schema might not be compatible with data. In particular, the JSON data might be missing data for fields declared as non-nullable in the schema. The from_json() function does not verify the data against such errors. When data with missing fields is sent to the parquet encoder, there is no verification either. The end results is a corrupt parquet file.

To avoid corruptions, make sure that all fields in the user-specified schema are set to be nullable.
Since this changes the behavior of a public function, we need to include it in release notes.
The behavior can be reverted by setting `spark.sql.fromJsonForceNullableSchema=false`

## How was this patch tested?

Added two new tests.

Author: Michał Świtakowski <michal.switakowski@databricks.com>

Closes #20694 from mswit-databricks/SPARK-23173.
2018-03-09 14:29:31 -08:00
Marco Gaido e7bbca8896 [SPARK-23602][SQL] PrintToStderr prints value also in interpreted mode
## What changes were proposed in this pull request?

`PrintToStderr` was doing what is it supposed to only when code generation is enabled.
The PR adds the same behavior in interpreted mode too.

## How was this patch tested?

added UT

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #20773 from mgaido91/SPARK-23602.
2018-03-08 22:02:28 +01:00
Marco Gaido ea480990e7 [SPARK-23628][SQL] calculateParamLength should not return 1 + num of epressions
## What changes were proposed in this pull request?

There was a bug in `calculateParamLength` which caused it to return always 1 + the number of expressions. This could lead to Exceptions especially with expressions of type long.

## How was this patch tested?

added UT + fixed previous UT

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #20772 from mgaido91/SPARK-23628.
2018-03-08 11:09:15 -08:00
Marco Gaido 92e7ecbbbd [SPARK-23592][SQL] Add interpreted execution to DecodeUsingSerializer
## What changes were proposed in this pull request?

The PR adds interpreted execution to DecodeUsingSerializer.

## How was this patch tested?
added UT

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #20760 from mgaido91/SPARK-23592.
2018-03-08 14:18:14 +01:00
hyukjinkwon d6632d185e [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in toPandas/createDataFrame with Pandas DataFrame
## What changes were proposed in this pull request?

This PR adds a configuration to control the fallback of Arrow optimization for `toPandas` and `createDataFrame` with Pandas DataFrame.

## How was this patch tested?

Manually tested and unit tests added.

You can test this by:

**`createDataFrame`**

```python
spark.conf.set("spark.sql.execution.arrow.enabled", False)
pdf = spark.createDataFrame([[{'a': 1}]]).toPandas()
spark.conf.set("spark.sql.execution.arrow.enabled", True)
spark.conf.set("spark.sql.execution.arrow.fallback.enabled", True)
spark.createDataFrame(pdf, "a: map<string, int>")
```

```python
spark.conf.set("spark.sql.execution.arrow.enabled", False)
pdf = spark.createDataFrame([[{'a': 1}]]).toPandas()
spark.conf.set("spark.sql.execution.arrow.enabled", True)
spark.conf.set("spark.sql.execution.arrow.fallback.enabled", False)
spark.createDataFrame(pdf, "a: map<string, int>")
```

**`toPandas`**

```python
spark.conf.set("spark.sql.execution.arrow.enabled", True)
spark.conf.set("spark.sql.execution.arrow.fallback.enabled", True)
spark.createDataFrame([[{'a': 1}]]).toPandas()
```

```python
spark.conf.set("spark.sql.execution.arrow.enabled", True)
spark.conf.set("spark.sql.execution.arrow.fallback.enabled", False)
spark.createDataFrame([[{'a': 1}]]).toPandas()
```

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #20678 from HyukjinKwon/SPARK-23380-conf.
2018-03-08 20:22:07 +09:00
Marco Gaido aff7d81cb7 [SPARK-23591][SQL] Add interpreted execution to EncodeUsingSerializer
## What changes were proposed in this pull request?

The PR adds interpreted execution to EncodeUsingSerializer.

## How was this patch tested?

added UT

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #20751 from mgaido91/SPARK-23591.
2018-03-07 18:31:59 +01:00
Takeshi Yamamuro 33c2cb22b3 [SPARK-23611][SQL] Add a helper function to check exception for expr evaluation
## What changes were proposed in this pull request?
This pr added a helper function in `ExpressionEvalHelper` to check exceptions in all the path of expression evaluation.

## How was this patch tested?
Modified the existing tests.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #20748 from maropu/SPARK-23611.
2018-03-07 13:10:51 +01:00
Marco Gaido 4c587eb488 [SPARK-23590][SQL] Add interpreted execution to CreateExternalRow
## What changes were proposed in this pull request?

The PR adds interpreted execution to CreateExternalRow

## How was this patch tested?

added UT

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #20749 from mgaido91/SPARK-23590.
2018-03-06 17:42:17 +01:00
Takeshi Yamamuro e8a259d66d [SPARK-23594][SQL] GetExternalRowField should support interpreted execution
## What changes were proposed in this pull request?
This pr added interpreted execution for `GetExternalRowField`.

## How was this patch tested?
Added tests in `ObjectExpressionsSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #20746 from maropu/SPARK-23594.
2018-03-06 13:55:13 +01:00
Marco Gaido f6b49f9d1b [SPARK-23586][SQL] Add interpreted execution to WrapOption
## What changes were proposed in this pull request?

The PR adds interpreted execution to WrapOption.

## How was this patch tested?

added UT

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #20741 from mgaido91/SPARK-23586_2.
2018-03-06 01:37:51 +01:00
Marco Gaido ba622f45ca [SPARK-23585][SQL] Add interpreted execution to UnwrapOption
## What changes were proposed in this pull request?

The PR adds interpreted execution to UnwrapOption.

## How was this patch tested?

added UT

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #20736 from mgaido91/SPARK-23586.
2018-03-05 20:43:03 +01:00
Mihaly Toth a366b950b9 [SPARK-23329][SQL] Fix documentation of trigonometric functions
## What changes were proposed in this pull request?

Provide more details in trigonometric function documentations. Referenced `java.lang.Math` for further details in the descriptions.
## How was this patch tested?

Ran full build, checked generated documentation manually

Author: Mihaly Toth <misutoth@gmail.com>

Closes #20618 from misutoth/trigonometric-doc.
2018-03-05 23:46:40 +09:00
Kazuaki Ishizaki 2ce37b50fc [SPARK-23546][SQL] Refactor stateless methods/values in CodegenContext
## What changes were proposed in this pull request?

A current `CodegenContext` class has immutable value or method without mutable state, too.
This refactoring moves them to `CodeGenerator` object class which can be accessed from anywhere without an instantiated `CodegenContext` in the program.

## How was this patch tested?

Existing tests

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #20700 from kiszk/SPARK-23546.
2018-03-05 11:39:01 +01:00
Eric Liang a89cdf55fa [SQL][MINOR] XPathDouble prettyPrint should say 'double' not 'float'
## What changes were proposed in this pull request?

It looks like this was incorrectly copied from `XPathFloat` in the class above.

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Eric Liang <ekhliang@gmail.com>

Closes #20730 from ericl/fix-typo-xpath.
2018-03-05 07:32:24 +09:00
Feng Liu 3a4d15e5d2 [SPARK-23518][SQL] Avoid metastore access when the users only want to read and write data frames
## What changes were proposed in this pull request?

https://github.com/apache/spark/pull/18944 added one patch, which allowed a spark session to be created when the hive metastore server is down. However, it did not allow running any commands with the spark session. This brings troubles to the user who only wants to read / write data frames without metastore setup.

## How was this patch tested?

Added some unit tests to read and write data frames based on the original HiveMetastoreLazyInitializationSuite.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Feng Liu <fengliu@databricks.com>

Closes #20681 from liufengdb/completely-lazy.
2018-03-02 10:38:50 -08:00
KaiXinXiaoLei cdcccd7b41 [SPARK-23405] Generate additional constraints for Join's children
## What changes were proposed in this pull request?

(Please fill in changes proposed in this fix)
I run a sql: `select ls.cs_order_number from ls left semi join catalog_sales cs on ls.cs_order_number = cs.cs_order_number`, The `ls` table is a small table ,and the number is one. The `catalog_sales` table is a big table,  and the number is 10 billion. The task will be hang up. And i find the many null values of `cs_order_number` in the `catalog_sales` table. I think the null value should be removed in the logical plan.

>== Optimized Logical Plan ==
>Join LeftSemi, (cs_order_number#1 = cs_order_number#22)
>:- Project cs_order_number#1
>   : +- Filter isnotnull(cs_order_number#1)
>      : +- MetastoreRelation 100t, ls
>+- Project cs_order_number#22
>   +- MetastoreRelation 100t, catalog_sales

Now, use this patch, the plan will be:
>== Optimized Logical Plan ==
>Join LeftSemi, (cs_order_number#1 = cs_order_number#22)
>:- Project cs_order_number#1
>   : +- Filter isnotnull(cs_order_number#1)
>      : +- MetastoreRelation 100t, ls
>+- Project cs_order_number#22
>   : **+- Filter isnotnull(cs_order_number#22)**
>     :+- MetastoreRelation 100t, catalog_sales

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: KaiXinXiaoLei <584620569@qq.com>
Author: hanghang <584620569@qq.com>

Closes #20670 from KaiXinXiaoLei/Spark-23405.
2018-03-02 00:09:44 +08:00
Liang-Chi Hsieh b14993e1fc [SPARK-23448][SQL] Clarify JSON and CSV parser behavior in document
## What changes were proposed in this pull request?

Clarify JSON and CSV reader behavior in document.

JSON doesn't support partial results for corrupted records.
CSV only supports partial results for the records with more or less tokens.

## How was this patch tested?

Pass existing tests.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #20666 from viirya/SPARK-23448-2.
2018-02-28 11:00:54 +09:00
gatorsmile 414ee867ba [SPARK-23523][SQL] Fix the incorrect result caused by the rule OptimizeMetadataOnlyQuery
## What changes were proposed in this pull request?
```Scala
val tablePath = new File(s"${path.getCanonicalPath}/cOl3=c/cOl1=a/cOl5=e")
 Seq(("a", "b", "c", "d", "e")).toDF("cOl1", "cOl2", "cOl3", "cOl4", "cOl5")
 .write.json(tablePath.getCanonicalPath)
 val df = spark.read.json(path.getCanonicalPath).select("CoL1", "CoL5", "CoL3").distinct()
 df.show()
```

It generates a wrong result.
```
[c,e,a]
```

We have a bug in the rule `OptimizeMetadataOnlyQuery `. We should respect the attribute order in the original leaf node. This PR is to fix it.

## How was this patch tested?
Added a test case

Author: gatorsmile <gatorsmile@gmail.com>

Closes #20684 from gatorsmile/optimizeMetadataOnly.
2018-02-27 08:44:25 -08:00
Juliusz Sompolski 8077bb04f3 [SPARK-23445] ColumnStat refactoring
## What changes were proposed in this pull request?

Refactor ColumnStat to be more flexible.

* Split `ColumnStat` and `CatalogColumnStat` just like `CatalogStatistics` is split from `Statistics`. This detaches how the statistics are stored from how they are processed in the query plan. `CatalogColumnStat` keeps `min` and `max` as `String`, making it not depend on dataType information.
* For `CatalogColumnStat`, parse column names from property names in the metastore (`KEY_VERSION` property), not from metastore schema. This means that `CatalogColumnStat`s can be created for columns even if the schema itself is not stored in the metastore.
* Make all fields optional. `min`, `max` and `histogram` for columns were optional already. Having them all optional is more consistent, and gives flexibility to e.g. drop some of the fields through transformations if they are difficult / impossible to calculate.

The added flexibility will make it possible to have alternative implementations for stats, and separates stats collection from stats and estimation processing in plans.

## How was this patch tested?

Refactored existing tests to work with refactored `ColumnStat` and `CatalogColumnStat`.
New tests added in `StatisticsSuite` checking that backwards / forwards compatibility is not broken.

Author: Juliusz Sompolski <julek@databricks.com>

Closes #20624 from juliuszsompolski/SPARK-23445.
2018-02-26 23:37:31 -08:00
Kazuaki Ishizaki 95e25ed1a8 [SPARK-23424][SQL] Add codegenStageId in comment
## What changes were proposed in this pull request?

This PR always adds `codegenStageId` in comment of the generated class. This is a replication of #20419  for post-Spark 2.3.
Closes #20419

```
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
...
```

## How was this patch tested?

Existing tests

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #20612 from kiszk/SPARK-23424.
2018-02-21 11:26:06 +08:00
Dongjoon Hyun 83c008762a [SPARK-23456][SPARK-21783] Turn on native ORC impl and PPD by default
## What changes were proposed in this pull request?

Apache Spark 2.3 introduced `native` ORC supports with vectorization and many fixes. However, it's shipped as a not-default option. This PR enables `native` ORC implementation and predicate-pushdown by default for Apache Spark 2.4. We will improve and stabilize ORC data source before Apache Spark 2.4. And, eventually, Apache Spark will drop old Hive-based ORC code.

## How was this patch tested?

Pass the Jenkins with existing tests.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #20634 from dongjoon-hyun/SPARK-23456.
2018-02-20 09:14:56 -08:00
Kris Mok 15ad4a7f10 [SPARK-23447][SQL] Cleanup codegen template for Literal
## What changes were proposed in this pull request?

Cleaned up the codegen templates for `Literal`s, to make sure that the `ExprCode` returned from `Literal.doGenCode()` has:
1. an empty `code` field;
2. an `isNull` field of either literal `true` or `false`;
3. a `value` field that is just a simple literal/constant.

Before this PR, there are a couple of paths that would return a non-trivial `code` and all of them are actually unnecessary. The `NaN` and `Infinity` constants for `double` and `float` can be accessed through constants directly available so there's no need to add a reference for them.

Also took the opportunity to add a new util method for ease of creating `ExprCode` for inline-able non-null values.

## How was this patch tested?

Existing tests.

Author: Kris Mok <kris.mok@databricks.com>

Closes #20626 from rednaxelafx/codegen-literal.
2018-02-17 10:54:14 +08:00
Tathagata Das 0a73aa31f4 [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
## What changes were proposed in this pull request?
Migrating KafkaSource (with data source v1) to KafkaMicroBatchReader (with data source v2).

Performance comparison:
In a unit test with in-process Kafka broker, I tested the read throughput of V1 and V2 using 20M records in a single partition. They were comparable.

## How was this patch tested?
Existing tests, few modified to be better tests than the existing ones.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #20554 from tdas/SPARK-23362.
2018-02-16 14:30:19 -08:00
Dongjoon Hyun 2f0498d1e8 [SPARK-23426][SQL] Use hive ORC impl and disable PPD for Spark 2.3.0
## What changes were proposed in this pull request?

To prevent any regressions, this PR changes ORC implementation to `hive` by default like Spark 2.2.X.
Users can enable `native` ORC. Also, ORC PPD is also restored to `false` like Spark 2.2.X.

![orc_section](https://user-images.githubusercontent.com/9700541/36221575-57a1d702-1173-11e8-89fe-dca5842f4ca7.png)

## How was this patch tested?

Pass all test cases.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #20610 from dongjoon-hyun/SPARK-ORC-DISABLE.
2018-02-15 08:55:39 -08:00
hyukjinkwon ed86476098 [SPARK-23359][SQL] Adds an alias 'names' of 'fieldNames' in Scala's StructType
## What changes were proposed in this pull request?

This PR proposes to add an alias 'names' of  'fieldNames' in Scala. Please see the discussion in [SPARK-20090](https://issues.apache.org/jira/browse/SPARK-20090).

## How was this patch tested?

Unit tests added in `DataTypeSuite.scala`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #20545 from HyukjinKwon/SPARK-23359.
2018-02-15 17:13:05 +08:00
gatorsmile 95e4b49160 [SPARK-23094] Revert [] Fix invalid character handling in JsonDataSource
## What changes were proposed in this pull request?
This PR is to revert the PR https://github.com/apache/spark/pull/20302, because it causes a regression.

## How was this patch tested?
N/A

Author: gatorsmile <gatorsmile@gmail.com>

Closes #20614 from gatorsmile/revertJsonFix.
2018-02-14 23:56:02 -08:00
gatorsmile 2ee76c22b6 [SPARK-23400][SQL] Add a constructors for ScalaUDF
## What changes were proposed in this pull request?

In this upcoming 2.3 release, we changed the interface of `ScalaUDF`. Unfortunately, some Spark packages (e.g., spark-deep-learning) are using our internal class `ScalaUDF`. In the release 2.3, we added new parameters into this class. The users hit the binary compatibility issues and got the exception:

```
> java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.expressions.ScalaUDF.&lt;init&gt;(Ljava/lang/Object;Lorg/apache/spark/sql/types/DataType;Lscala/collection/Seq;Lscala/collection/Seq;Lscala/Option;)V
```

This PR is to improve the backward compatibility. However, we definitely should not encourage the external packages to use our internal classes. This might make us hard to maintain/develop the codes in Spark.

## How was this patch tested?
N/A

Author: gatorsmile <gatorsmile@gmail.com>

Closes #20591 from gatorsmile/scalaUDF.
2018-02-13 11:56:49 -08:00
Bogdan Raducanu 05d051293f [SPARK-23316][SQL] AnalysisException after max iteration reached for IN query
## What changes were proposed in this pull request?
Added flag ignoreNullability to DataType.equalsStructurally.
The previous semantic is for ignoreNullability=false.
When ignoreNullability=true equalsStructurally ignores nullability of contained types (map key types, value types, array element types, structure field types).
In.checkInputTypes calls equalsStructurally to check if the children types match. They should match regardless of nullability (which is just a hint), so it is now called with ignoreNullability=true.

## How was this patch tested?
New test in SubquerySuite

Author: Bogdan Raducanu <bogdan@databricks.com>

Closes #20548 from bogdanrdc/SPARK-23316.
2018-02-13 09:49:52 -08:00
hyukjinkwon c338c8cf82 [SPARK-23352][PYTHON] Explicitly specify supported types in Pandas UDFs
## What changes were proposed in this pull request?

This PR targets to explicitly specify supported types in Pandas UDFs.
The main change here is to add a deduplicated and explicit type checking in `returnType` ahead with documenting this; however, it happened to fix multiple things.

1. Currently, we don't support `BinaryType` in Pandas UDFs, for example, see:

    ```python
    from pyspark.sql.functions import pandas_udf
    pudf = pandas_udf(lambda x: x, "binary")
    df = spark.createDataFrame([[bytearray(1)]])
    df.select(pudf("_1")).show()
    ```
    ```
    ...
    TypeError: Unsupported type in conversion to Arrow: BinaryType
    ```

    We can document this behaviour for its guide.

2. Also, the grouped aggregate Pandas UDF fails fast on `ArrayType` but seems we can support this case.

    ```python
    from pyspark.sql.functions import pandas_udf, PandasUDFType
    foo = pandas_udf(lambda v: v.mean(), 'array<double>', PandasUDFType.GROUPED_AGG)
    df = spark.range(100).selectExpr("id", "array(id) as value")
    df.groupBy("id").agg(foo("value")).show()
    ```

    ```
    ...
     NotImplementedError: ArrayType, StructType and MapType are not supported with PandasUDFType.GROUPED_AGG
    ```

3. Since we can check the return type ahead, we can fail fast before actual execution.

    ```python
    # we can fail fast at this stage because we know the schema ahead
    pandas_udf(lambda x: x, BinaryType())
    ```

## How was this patch tested?

Manually tested and unit tests for `BinaryType` and `ArrayType(...)` were added.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #20531 from HyukjinKwon/pudf-cleanup.
2018-02-12 20:49:36 +09:00
gatorsmile c36fecc3b4 [SPARK-23327][SQL] Update the description and tests of three external API or functions
## What changes were proposed in this pull request?
Update the description and tests of three external API or functions `createFunction `, `length` and `repartitionByRange `

## How was this patch tested?
N/A

Author: gatorsmile <gatorsmile@gmail.com>

Closes #20495 from gatorsmile/updateFunc.
2018-02-06 16:46:43 -08:00
caoxuewen 63b49fa2e5 [SPARK-23311][SQL][TEST] add FilterFunction test case for test CombineTypedFilters
## What changes were proposed in this pull request?

In the current test case for CombineTypedFilters, we lack the test of FilterFunction, so let's add it.
In addition, in TypedFilterOptimizationSuite's existing test cases, Let's extract a common LocalRelation.

## How was this patch tested?

add new test cases.

Author: caoxuewen <cao.xuewen@zte.com.cn>

Closes #20482 from heary-cao/TypedFilterOptimizationSuite.
2018-02-03 00:02:03 -08:00
Wenchen Fan b9503fcbb3 [SPARK-23312][SQL] add a config to turn off vectorized cache reader
## What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-23309 reported a performance regression about cached table in Spark 2.3. While the investigating is still going on, this PR adds a conf to turn off the vectorized cache reader, to unblock the 2.3 release.

## How was this patch tested?

a new test

Author: Wenchen Fan <wenchen@databricks.com>

Closes #20483 from cloud-fan/cache.
2018-02-02 22:43:28 +08:00
Xingbo Jiang b6b50efc85 [SQL][MINOR] Inline SpecifiedWindowFrame.defaultWindowFrame().
## What changes were proposed in this pull request?

SpecifiedWindowFrame.defaultWindowFrame(hasOrderSpecification, acceptWindowFrame) was designed to handle the cases when some Window functions don't support setting a window frame (e.g. rank). However this param is never used.

We may inline the whole of this function to simplify the code.

## How was this patch tested?

Existing tests.

Author: Xingbo Jiang <xingbo.jiang@databricks.com>

Closes #20463 from jiangxb1987/defaultWindowFrame.
2018-01-31 20:59:19 -08:00
Xingbo Jiang cc41245fa3 [SPARK-23188][SQL] Make vectorized columar reader batch size configurable
## What changes were proposed in this pull request?

This PR include the following changes:
- Make the capacity of `VectorizedParquetRecordReader` configurable;
- Make the capacity of `OrcColumnarBatchReader` configurable;
- Update the error message when required capacity in writable columnar vector cannot be fulfilled.

## How was this patch tested?

N/A

Author: Xingbo Jiang <xingbo.jiang@databricks.com>

Closes #20361 from jiangxb1987/vectorCapacity.
2018-02-01 12:56:07 +08:00
Dilip Biswal 9ff1d96f01 [SPARK-23281][SQL] Query produces results in incorrect order when a composite order by clause refers to both original columns and aliases
## What changes were proposed in this pull request?
Here is the test snippet.
``` SQL
scala> Seq[(Integer, Integer)](
     |         (1, 1),
     |         (1, 3),
     |         (2, 3),
     |         (3, 3),
     |         (4, null),
     |         (5, null)
     |       ).toDF("key", "value").createOrReplaceTempView("src")

scala> sql(
     |         """
     |           |SELECT MAX(value) as value, key as col2
     |           |FROM src
     |           |GROUP BY key
     |           |ORDER BY value desc, key
     |         """.stripMargin).show
+-----+----+
|value|col2|
+-----+----+
|    3|   3|
|    3|   2|
|    3|   1|
| null|   5|
| null|   4|
+-----+----+
```SQL
Here is the explain output :

```SQL
== Parsed Logical Plan ==
'Sort ['value DESC NULLS LAST, 'key ASC NULLS FIRST], true
+- 'Aggregate ['key], ['MAX('value) AS value#9, 'key AS col2#10]
   +- 'UnresolvedRelation `src`

== Analyzed Logical Plan ==
value: int, col2: int
Project [value#9, col2#10]
+- Sort [value#9 DESC NULLS LAST, col2#10 DESC NULLS LAST], true
   +- Aggregate [key#5], [max(value#6) AS value#9, key#5 AS col2#10]
      +- SubqueryAlias src
         +- Project [_1#2 AS key#5, _2#3 AS value#6]
            +- LocalRelation [_1#2, _2#3]
``` SQL
The sort direction is being wrongly changed from ASC to DSC while resolving ```Sort``` in
resolveAggregateFunctions.

The above testcase models TPCDS-Q71 and thus we have the same issue in Q71 as well.

## How was this patch tested?
A few tests are added in SQLQuerySuite.

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #20453 from dilipbiswal/local_spark.
2018-01-31 13:52:47 -08:00
gatorsmile ca04c3ff23 [SPARK-23274][SQL] Fix ReplaceExceptWithFilter when the right's Filter contains the references that are not in the left output
## What changes were proposed in this pull request?
This PR is to fix the `ReplaceExceptWithFilter` rule when the right's Filter contains the references that are not in the left output.

Before this PR, we got the error like
```
java.util.NoSuchElementException: key not found: a
  at scala.collection.MapLike$class.default(MapLike.scala:228)
  at scala.collection.AbstractMap.default(Map.scala:59)
  at scala.collection.MapLike$class.apply(MapLike.scala:141)
  at scala.collection.AbstractMap.apply(Map.scala:59)
```

After this PR, `ReplaceExceptWithFilter ` will not take an effect in this case.

## How was this patch tested?
Added tests

Author: gatorsmile <gatorsmile@gmail.com>

Closes #20444 from gatorsmile/fixReplaceExceptWithFilter.
2018-01-30 20:05:57 -08:00
gatorsmile 31c00ad8b0 [SPARK-23267][SQL] Increase spark.sql.codegen.hugeMethodLimit to 65535
## What changes were proposed in this pull request?
Still saw the performance regression introduced by `spark.sql.codegen.hugeMethodLimit` in our internal workloads. There are two major issues in the current solution.
- The size of the complied byte code is not identical to the bytecode size of the method. The detection is still not accurate.
- The bytecode size of a single operator (e.g., `SerializeFromObject`) could still exceed 8K limit. We saw the performance regression in such scenario.

Since it is close to the release of 2.3, we decide to increase it to 64K for avoiding the perf regression.

## How was this patch tested?
N/A

Author: gatorsmile <gatorsmile@gmail.com>

Closes #20434 from gatorsmile/revertConf.
2018-01-30 11:33:30 -08:00
gatorsmile 7a2ada223e [SPARK-23261][PYSPARK] Rename Pandas UDFs
## What changes were proposed in this pull request?
Rename the public APIs and names of pandas udfs.

- `PANDAS SCALAR UDF` -> `SCALAR PANDAS UDF`
- `PANDAS GROUP MAP UDF` -> `GROUPED MAP PANDAS UDF`
- `PANDAS GROUP AGG UDF` -> `GROUPED AGG PANDAS UDF`

## How was this patch tested?
The existing tests

Author: gatorsmile <gatorsmile@gmail.com>

Closes #20428 from gatorsmile/renamePandasUDFs.
2018-01-30 21:55:55 +09:00
Bryan Cutler f235df66a4 [SPARK-22221][SQL][FOLLOWUP] Externalize spark.sql.execution.arrow.maxRecordsPerBatch
## What changes were proposed in this pull request?

This is a followup to #19575 which added a section on setting max Arrow record batches and this will externalize the conf that was referenced in the docs.

## How was this patch tested?
NA

Author: Bryan Cutler <cutlerb@gmail.com>

Closes #20423 from BryanCutler/arrow-user-doc-externalize-maxRecordsPerBatch-SPARK-22221.
2018-01-29 17:37:55 -08:00