Commit graph

6742 commits

debugger87 c04cb2d1b7 [SPARK-21687][SQL] Spark SQL should set createTime for Hive partition
## What changes were proposed in this pull request?

Set createTime for every Hive partition created in Spark SQL, which could be used to manage the data lifecycle in the Hive warehouse. We found that almost every partition modified by Spark SQL has no createTime set.

```
mysql> select * from partitions where create_time=0 limit 1\G;
*************************** 1. row ***************************
         PART_ID: 1028584
     CREATE_TIME: 0
LAST_ACCESS_TIME: 1502203611
       PART_NAME: date=20170130
           SD_ID: 1543605
          TBL_ID: 211605
  LINK_TARGET_ID: NULL
1 row in set (0.27 sec)
```

## How was this patch tested?
 N/A

Author: debugger87 <yangchaozhong.2009@gmail.com>
Author: Chaozhong Yang <yangchaozhong.2009@gmail.com>

Closes #18900 from debugger87/fix/set-create-time-for-hive-partition.
2018-06-27 11:34:28 -07:00
Yuanjian Li 6a0b77a55d [SPARK-24215][PYSPARK][FOLLOW UP] Implement eager evaluation for DataFrame APIs in PySpark
## What changes were proposed in this pull request?

Address comments in #21370 and add more test.

## How was this patch tested?

Enhance test in pyspark/sql/test.py and DataFrameSuite

Author: Yuanjian Li <xyliyuanjian@gmail.com>

Closes #21553 from xuanyuanking/SPARK-24215-follow.
2018-06-27 10:43:06 -07:00
Takuya UESHIN 9a76f23c6a [SPARK-23927][SQL][FOLLOW-UP] Fix a build failure.
## What changes were proposed in this pull request?

This PR is a follow-up of #21155.
#21155 removed an unnecessary import at the time, but the import has since become necessary again in another PR.

## How was this patch tested?

Existing tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #21646 from ueshin/issues/SPARK-23927/fup1.
2018-06-27 11:52:48 +08:00
Vayda, Oleksandr: IT (PRG) 2669b4de3b [SPARK-23927][SQL] Add "sequence" expression
## What changes were proposed in this pull request?
The PR adds the SQL function ```sequence```.
https://issues.apache.org/jira/browse/SPARK-23927

The behavior of the function is based on Presto's one.
Ref: https://prestodb.io/docs/current/functions/array.html

- ```sequence(start, stop) → array<bigint>```
Generate a sequence of integers from ```start``` to ```stop```, incrementing by ```1``` if ```start``` is less than or equal to ```stop```, otherwise ```-1```.
- ```sequence(start, stop, step) → array<bigint>```
Generate a sequence of integers from ```start``` to ```stop```, incrementing by ```step```.
- ```sequence(start_date, stop_date) → array<date>```
Generate a sequence of dates from ```start_date``` to ```stop_date```, incrementing by ```interval 1 day``` if ```start_date``` is less than or equal to ```stop_date```, otherwise ```- interval 1 day```.
- ```sequence(start_date, stop_date, step_interval) → array<date>```
Generate a sequence of dates from ```start_date``` to ```stop_date```, incrementing by ```step_interval```. The type of ```step_interval``` is ```CalendarInterval```.
- ```sequence(start_timestamp, stop_timestamp) → array<timestamp>```
Generate a sequence of timestamps from ```start_timestamp``` to ```stop_timestamp```, incrementing by ```interval 1 day``` if ```start_timestamp``` is less than or equal to ```stop_timestamp```, otherwise ```- interval 1 day```.
- ```sequence(start_timestamp, stop_timestamp, step_interval) → array<timestamp>```
Generate a sequence of timestamps from ```start_timestamp``` to ```stop_timestamp```, incrementing by ```step_interval```. The type of ```step_interval``` is ```CalendarInterval```.
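For illustration, a minimal usage sketch (assuming a SparkSession `spark` with this change applied; results shown in comments):
```scala
spark.sql("SELECT sequence(1, 5)").show(false)       // [1, 2, 3, 4, 5]
spark.sql("SELECT sequence(5, 1)").show(false)       // [5, 4, 3, 2, 1]
spark.sql("SELECT sequence(1, 9, 2)").show(false)    // [1, 3, 5, 7, 9]
spark.sql("SELECT sequence(to_date('2018-01-01'), to_date('2018-01-04'))").show(false)
// [2018-01-01, 2018-01-02, 2018-01-03, 2018-01-04]
```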

## How was this patch tested?

Added unit tests.

Author: Vayda, Oleksandr: IT (PRG) <Oleksandr.Vayda@barclayscapital.com>

Closes #21155 from wajda/feature/array-api-sequence.
2018-06-27 11:52:31 +09:00
Maxim Gekk d08f53dc61 [SPARK-24605][SQL] size(null) returns null instead of -1
## What changes were proposed in this pull request?

In this PR, I propose a new behavior for `size(null)` under the config flag `spark.sql.legacy.sizeOfNull`. If the flag is disabled, the `size()` function returns `null` for `null` input. By default, `spark.sql.legacy.sizeOfNull` is enabled to keep backward compatibility with previous versions; in that case, `size(null)` returns `-1`.
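A minimal sketch of the two behaviors (assuming a SparkSession `spark` with this change applied):
```scala
spark.conf.set("spark.sql.legacy.sizeOfNull", "true")     // default: legacy behavior
spark.sql("SELECT size(cast(null AS array<int>))").show() // -1

spark.conf.set("spark.sql.legacy.sizeOfNull", "false")
spark.sql("SELECT size(cast(null AS array<int>))").show() // null
```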

## How was this patch tested?

Modified existing tests for the `size()` function to check new behavior (`null`) and old one (`-1`).

Author: Maxim Gekk <maxim.gekk@databricks.com>

Closes #21598 from MaxGekk/legacy-size-of-null.
2018-06-27 10:36:51 +08:00
Kris Mok 1b9368f7d4 [SPARK-24659][SQL] GenericArrayData.equals should respect element type differences
## What changes were proposed in this pull request?

Fix `GenericArrayData.equals`, so that it respects the actual types of the elements.
e.g. an instance that represents an `array<int>` and another instance that represents an `array<long>` should be considered incompatible, and thus should return false for `equals`.

`GenericArrayData` doesn't keep any schema information by itself, and rather relies on the Java objects referenced by its `array` field's elements to keep track of their own object types. So, the most straightforward way to respect their types is to call `equals` on the elements, instead of using Scala's `==` operator, which can have semantics that are not always desirable:
```
new java.lang.Integer(123) == new java.lang.Long(123L) // true in Scala
new java.lang.Integer(123).equals(new java.lang.Long(123L)) // false in Scala
```

## How was this patch tested?

Added unit test in `ComplexDataSuite`

Author: Kris Mok <kris.mok@databricks.com>

Closes #21643 from rednaxelafx/fix-genericarraydata-equals.
2018-06-27 10:27:40 +08:00
Dilip Biswal 02f8781fa2 [SPARK-24423][SQL] Add a new option for JDBC sources
## What changes were proposed in this pull request?
Here is the description in the JIRA -

Currently, our JDBC connector provides the option `dbtable` for users to specify the to-be-loaded JDBC source table.

 ```scala
 val jdbcDf = spark.read
   .format("jdbc")
   .option("dbtable", "dbName.tableName")
   .options(jdbcCredentials: Map)
   .load()
 ```

Normally, users do not fetch the whole JDBC table due to the poor performance/throughput of JDBC. Thus, they normally just fetch a small subset of the table. Advanced users can pass a subquery as the option.

 ```scala
 val query = """ (select * from tableName limit 10) as tmp """
 val jdbcDf = spark.read
   .format("jdbc")
   .option("dbtable", query)
   .options(jdbcCredentials: Map)
   .load()
 ```
However, constructing such a subquery is not straightforward for end users. We should simply allow users to specify the query via a new option `query`; we will handle the complexity for them.

 ```scala
 val query = """select * from tableName limit 10"""
 val jdbcDf = spark.read
   .format("jdbc")
   .option("query", query)
   .options(jdbcCredentials: Map)
   .load()
```

## How was this patch tested?
Added tests in JDBCSuite and JDBCWriterSuite.
Also tested against MySQL, Postgres, Oracle, DB2 (using docker infrastructure) to make sure there are no syntax issues.

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #21590 from dilipbiswal/SPARK-24423.
2018-06-26 15:17:00 -07:00
Yuming Wang dcaa49ff1e [SPARK-24658][SQL] Remove workaround for ANTLR bug
## What changes were proposed in this pull request?

Issue antlr/antlr4#781 has already been fixed, so the workaround of extracting the pattern into a separate rule is no longer needed. Presto has already removed it: https://github.com/prestodb/presto/pull/10744.

## How was this patch tested?

Existing tests

Author: Yuming Wang <yumwang@ebay.com>

Closes #21641 from wangyum/ANTLR-780.
2018-06-26 14:33:04 -07:00
Marek Novotny e07aee2165 [SPARK-24636][SQL] Type coercion of arrays for array_join function
## What changes were proposed in this pull request?
Presto's implementation accepts arbitrary arrays of primitive types as an input:

```
presto> SELECT array_join(ARRAY [1, 2, 3], ', ');
_col0
---------
1, 2, 3
(1 row)
```

This PR proposes to implement a type coercion rule for ```array_join``` function that converts arrays of primitive as well as non-primitive types to arrays of string.
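For illustration, a hedged sketch of the intended behavior once the coercion rule is in place (assuming a SparkSession `spark`):
```scala
// The integer array is coerced to array<string> before joining:
spark.sql("SELECT array_join(array(1, 2, 3), ', ')").show()   // 1, 2, 3
```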

## How was this patch tested?

New test cases added to:
- sql-tests/inputs/typeCoercion/native/arrayJoin.sql
- DataFrameFunctionsSuite.scala

Author: Marek Novotny <mn.mikke@gmail.com>

Closes #21620 from mn-mikke/SPARK-24636.
2018-06-26 09:51:55 +08:00
Bryan Cutler d48803bf64 [SPARK-24324][PYTHON][FOLLOWUP] Grouped Map positional conf should have deprecation note
## What changes were proposed in this pull request?

Followup to the discussion of the added conf in SPARK-24324 which allows assignment by column position only.  This conf is to preserve old behavior and will be removed in future releases, so it should have a note to indicate that.

## How was this patch tested?

NA

Author: Bryan Cutler <cutlerb@gmail.com>

Closes #21637 from BryanCutler/arrow-groupedMap-conf-deprecate-followup-SPARK-24324.
2018-06-25 17:08:23 -07:00
Marcelo Vanzin 6d16b9885d [SPARK-24552][CORE][SQL] Use task ID instead of attempt number for writes.
This passes the unique task attempt id instead of attempt number to v2 data sources because attempt number is reused when stages are retried. When attempt numbers are reused, sources that track data by partition id and attempt number may incorrectly clean up data because the same attempt number can be both committed and aborted.

For v1 / Hadoop writes, generate a unique ID based on available attempt numbers to avoid a similar problem.

Closes #21558

Author: Marcelo Vanzin <vanzin@cloudera.com>
Author: Ryan Blue <blue@apache.org>

Closes #21606 from vanzin/SPARK-24552.2.
2018-06-25 16:54:57 -07:00
Stacy Kerkela 5264164a67 [SPARK-24648][SQL] SqlMetrics should be threadsafe
Use LongAdder to make SQLMetrics thread safe.

## What changes were proposed in this pull request?
Replace += with LongAdder.add() for concurrent counting
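For illustration only (this is not the actual SQLMetric code), a minimal sketch of why a LongAdder-backed counter is safe under concurrent updates while a plain `var` with `+=` is not:
```scala
import java.util.concurrent.atomic.LongAdder

class ConcurrentCounter {
  private val adder = new LongAdder
  // Safe to call from many tasks/threads at once; updates are never lost.
  def add(v: Long): Unit = adder.add(v)
  def value: Long = adder.sum()
}

// In contrast, `var total: Long = 0; total += v` is a read-modify-write and can drop
// updates when several threads report metric values concurrently.
```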

## How was this patch tested?
Unit tests with local threads

Author: Stacy Kerkela <stacy.kerkela@databricks.com>

Closes #21634 from dbkerkela/sqlmetrics-concurrency-stacy.
2018-06-25 23:41:39 +02:00
Marco Gaido 594ac4f7b8 [SPARK-24633][SQL] Fix codegen when split is required for arrays_zip
## What changes were proposed in this pull request?

In the function arrays_zip, when code splitting is required because of a high number of arguments, a codegen error can happen.

The PR fixes codegen for cases when splitting the code is required.

## How was this patch tested?

added UT

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21621 from mgaido91/SPARK-24633.
2018-06-25 23:44:20 +08:00
Maryann Xue bac50aa371 [SPARK-24596][SQL] Non-cascading Cache Invalidation
## What changes were proposed in this pull request?

1. Add parameter 'cascade' in CacheManager.uncacheQuery(). Under 'cascade=false' mode, only invalidate the current cache, and for other dependent caches, rebuild execution plan and reuse cached buffer.
2. Pass true/false from callers in different uncache scenarios:
- Drop tables and regular (persistent) views: regular mode
- Drop temporary views: non-cascading mode
- Modify table contents (INSERT/UPDATE/MERGE/DELETE): regular mode
- Call `DataSet.unpersist()`: non-cascading mode
- Call `Catalog.uncacheTable()`: follow the same convention as drop tables/view, which is, use non-cascading mode for temporary views and regular mode for the rest

Note that a regular (persistent) view is a database object just like a table, so after dropping a regular view (whether cached or not), any query referring to that view should no longer be valid. Hence if a cached persistent view is dropped, we need to invalidate all the dependent caches so that exceptions will be thrown for any later reference. On the other hand, a temporary view is in fact equivalent to an unnamed DataSet, and dropping a temporary view should have no impact on queries referencing that view. Thus we should do non-cascading uncaching for temporary views, which also guarantees a consistent uncaching behavior between temporary views and unnamed DataSets.
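A hedged sketch of the intended non-cascading behavior for `unpersist()` (assuming a SparkSession `spark`):
```scala
import org.apache.spark.sql.functions.col

val df1 = spark.range(10).toDF("id").cache()
val df2 = df1.filter(col("id") > 5).cache()

df1.unpersist()   // non-cascading: df2's cache stays usable (its plan is rebuilt, buffer reused)
df2.count()       // still served from df2's cached data
```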

## How was this patch tested?

New tests in CachedTableSuite and DatasetCacheSuite.

Author: Maryann Xue <maryannxue@apache.org>

Closes #21594 from maryannxue/noncascading-cache.
2018-06-25 07:17:30 -07:00
Takuya UESHIN 6e0596e263 [SPARK-23931][SQL][FOLLOW-UP] Make arrays_zip in function.scala @scala.annotation.varargs.
## What changes were proposed in this pull request?

This is a follow-up pr of #21045 which added `arrays_zip`.
The `arrays_zip` in functions.scala should've been `scala.annotation.varargs`.

This pr makes it `scala.annotation.varargs`.

## How was this patch tested?

Existing tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #21630 from ueshin/issues/SPARK-23931/fup1.
2018-06-24 23:56:47 -07:00
Takeshi Yamamuro f596ebe4d3 [SPARK-24327][SQL] Verify and normalize a partition column name based on the JDBC resolved schema
## What changes were proposed in this pull request?
This pr modified JDBC datasource code to verify and normalize a partition column based on the JDBC resolved schema before building `JDBCRelation`.

Closes #20370

## How was this patch tested?
Added tests in `JDBCSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #21379 from maropu/SPARK-24327.
2018-06-24 23:14:42 -07:00
Bryan Cutler a5849ad9a3 [SPARK-24324][PYTHON] Pandas Grouped Map UDF should assign result columns by name
## What changes were proposed in this pull request?

Currently, a `pandas_udf` of type `PandasUDFType.GROUPED_MAP` will assign the resulting columns based on index of the return pandas.DataFrame.  If a new DataFrame is returned and constructed using a dict, then the order of the columns could be arbitrary and be different than the defined schema for the UDF.  If the schema types still match, then no error will be raised and the user will see column names and column data mixed up.

This change will first try to assign columns using the return type field names.  If a KeyError occurs, then the column index is checked if it is string based. If so, then the error is raised as it is most likely a naming mistake, else it will fallback to assign columns by position and raise a TypeError if the field types do not match.

## How was this patch tested?

Added a test that returns a new DataFrame with column order different than the schema.

Author: Bryan Cutler <cutlerb@gmail.com>

Closes #21427 from BryanCutler/arrow-grouped-map-mixesup-cols-SPARK-24324.
2018-06-24 09:28:46 +08:00
Takeshi Yamamuro 98f363b774 [SPARK-24206][SQL] Improve FilterPushdownBenchmark benchmark code
## What changes were proposed in this pull request?
This pr added benchmark code `FilterPushdownBenchmark` for string pushdown and updated performance results on the AWS `r3.xlarge`.

## How was this patch tested?
N/A

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #21288 from maropu/UpdateParquetBenchmark.
2018-06-23 17:51:18 -07:00
Maxim Gekk c7e2742f9b [SPARK-24190][SQL] Allow saving of JSON files in UTF-16 and UTF-32
## What changes were proposed in this pull request?

Currently, restrictions in JSONOptions for `encoding` and `lineSep` are the same for read and for write. For example, a requirement for `lineSep` in the code:

```
df.write.option("encoding", "UTF-32BE").json(file)
```
doesn't allow skipping `lineSep` and using its default value `\n`, because it throws the exception:
```
requirement failed: The lineSep option must be specified for the UTF-32BE encoding
java.lang.IllegalArgumentException: requirement failed: The lineSep option must be specified for the UTF-32BE encoding
```

In the PR, I propose to separate JSONOptions in read and write, and make JSONOptions in write less restrictive.

## How was this patch tested?

Added new test for blacklisted encodings in read. And the `lineSep` option was removed in write for some tests.

Author: Maxim Gekk <maxim.gekk@databricks.com>
Author: Maxim Gekk <max.gekk@gmail.com>

Closes #21247 from MaxGekk/json-options-in-write.
2018-06-23 17:40:20 -07:00
Marek Novotny 92c2f00bd2 [SPARK-23934][SQL] Adding map_from_entries function
## What changes were proposed in this pull request?
The PR adds the `map_from_entries` function that returns a map created from the given array of entries.
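A minimal usage sketch (assuming `spark.implicits._` is in scope):
```scala
import org.apache.spark.sql.functions.map_from_entries

Seq(Seq((1, "a"), (2, "b"))).toDF("entries")
  .select(map_from_entries($"entries"))
  .show(false)   // [1 -> a, 2 -> b]
```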

## How was this patch tested?
New tests added into:
- `CollectionExpressionSuite`
- `DataFrameFunctionSuite`

## CodeGen Examples
### Primitive-type Keys and Values
```
val idf = Seq(
  Seq((1, 10), (2, 20), (3, 10)),
  Seq((1, 10), null, (2, 20))
).toDF("a")
idf.filter('a.isNotNull).select(map_from_entries('a)).debugCodegen
```
Result:
```
/* 042 */         boolean project_isNull_0 = false;
/* 043 */         MapData project_value_0 = null;
/* 044 */
/* 045 */         for (int project_idx_2 = 0; !project_isNull_0 && project_idx_2 < inputadapter_value_0.numElements(); project_idx_2++) {
/* 046 */           project_isNull_0 |= inputadapter_value_0.isNullAt(project_idx_2);
/* 047 */         }
/* 048 */         if (!project_isNull_0) {
/* 049 */           final int project_numEntries_0 = inputadapter_value_0.numElements();
/* 050 */
/* 051 */           final long project_keySectionSize_0 = UnsafeArrayData.calculateSizeOfUnderlyingByteArray(project_numEntries_0, 4);
/* 052 */           final long project_valueSectionSize_0 = UnsafeArrayData.calculateSizeOfUnderlyingByteArray(project_numEntries_0, 4);
/* 053 */           final long project_byteArraySize_0 = 8 + project_keySectionSize_0 + project_valueSectionSize_0;
/* 054 */           if (project_byteArraySize_0 > 2147483632) {
/* 055 */             final Object[] project_keys_0 = new Object[project_numEntries_0];
/* 056 */             final Object[] project_values_0 = new Object[project_numEntries_0];
/* 057 */
/* 058 */             for (int project_idx_1 = 0; project_idx_1 < project_numEntries_0; project_idx_1++) {
/* 059 */               InternalRow project_entry_1 = inputadapter_value_0.getStruct(project_idx_1, 2);
/* 060 */
/* 061 */               project_keys_0[project_idx_1] = project_entry_1.getInt(0);
/* 062 */               project_values_0[project_idx_1] = project_entry_1.getInt(1);
/* 063 */             }
/* 064 */
/* 065 */             project_value_0 = org.apache.spark.sql.catalyst.util.ArrayBasedMapData.apply(project_keys_0, project_values_0);
/* 066 */
/* 067 */           } else {
/* 068 */             final byte[] project_byteArray_0 = new byte[(int)project_byteArraySize_0];
/* 069 */             UnsafeMapData project_unsafeMapData_0 = new UnsafeMapData();
/* 070 */             Platform.putLong(project_byteArray_0, 16, project_keySectionSize_0);
/* 071 */             Platform.putLong(project_byteArray_0, 24, project_numEntries_0);
/* 072 */             Platform.putLong(project_byteArray_0, 24 + project_keySectionSize_0, project_numEntries_0);
/* 073 */             project_unsafeMapData_0.pointTo(project_byteArray_0, 16, (int)project_byteArraySize_0);
/* 074 */             ArrayData project_keyArrayData_0 = project_unsafeMapData_0.keyArray();
/* 075 */             ArrayData project_valueArrayData_0 = project_unsafeMapData_0.valueArray();
/* 076 */
/* 077 */             for (int project_idx_0 = 0; project_idx_0 < project_numEntries_0; project_idx_0++) {
/* 078 */               InternalRow project_entry_0 = inputadapter_value_0.getStruct(project_idx_0, 2);
/* 079 */
/* 080 */               project_keyArrayData_0.setInt(project_idx_0, project_entry_0.getInt(0));
/* 081 */               project_valueArrayData_0.setInt(project_idx_0, project_entry_0.getInt(1));
/* 082 */             }
/* 083 */
/* 084 */             project_value_0 = project_unsafeMapData_0;
/* 085 */           }
/* 086 */
/* 087 */         }
```
### Non-primitive-type Keys and Values
```
val sdf = Seq(
  Seq(("a", null), ("b", "bb"), ("c", "aa")),
  Seq(("a", "aa"), null, (null, "bb"))
).toDF("a")
sdf.filter('a.isNotNull).select(map_from_entries('a)).debugCodegen
```
Result:
```
/* 042 */         boolean project_isNull_0 = false;
/* 043 */         MapData project_value_0 = null;
/* 044 */
/* 045 */         for (int project_idx_1 = 0; !project_isNull_0 && project_idx_1 < inputadapter_value_0.numElements(); project_idx_1++) {
/* 046 */           project_isNull_0 |= inputadapter_value_0.isNullAt(project_idx_1);
/* 047 */         }
/* 048 */         if (!project_isNull_0) {
/* 049 */           final int project_numEntries_0 = inputadapter_value_0.numElements();
/* 050 */
/* 051 */           final Object[] project_keys_0 = new Object[project_numEntries_0];
/* 052 */           final Object[] project_values_0 = new Object[project_numEntries_0];
/* 053 */
/* 054 */           for (int project_idx_0 = 0; project_idx_0 < project_numEntries_0; project_idx_0++) {
/* 055 */             InternalRow project_entry_0 = inputadapter_value_0.getStruct(project_idx_0, 2);
/* 056 */
/* 057 */             if (project_entry_0.isNullAt(0)) {
/* 058 */               throw new RuntimeException("The first field from a struct (key) can't be null.");
/* 059 */             }
/* 060 */
/* 061 */             project_keys_0[project_idx_0] = project_entry_0.getUTF8String(0);
/* 062 */             project_values_0[project_idx_0] = project_entry_0.getUTF8String(1);
/* 063 */           }
/* 064 */
/* 065 */           project_value_0 = org.apache.spark.sql.catalyst.util.ArrayBasedMapData.apply(project_keys_0, project_values_0);
/* 066 */
/* 067 */         }
```

Author: Marek Novotny <mn.mikke@gmail.com>

Closes #21282 from mn-mikke/feature/array-api-map_from_entries-to-master.
2018-06-22 16:18:22 +09:00
Wenchen Fan dc8a6befa5 [SPARK-24588][SS] streaming join should require HashClusteredPartitioning from children
## What changes were proposed in this pull request?

In https://github.com/apache/spark/pull/19080 we simplified the distribution/partitioning framework and made all the join-like operators require `HashClusteredDistribution` from their children. Unfortunately, the streaming join operator was missed.

This can cause wrong results. Consider:
```
val input1 = MemoryStream[Int]
val input2 = MemoryStream[Int]

val df1 = input1.toDF.select('value as 'a, 'value * 2 as 'b)
val df2 = input2.toDF.select('value as 'a, 'value * 2 as 'b).repartition('b)
val joined = df1.join(df2, Seq("a", "b")).select('a)
```

The physical plan is
```
*(3) Project [a#5]
+- StreamingSymmetricHashJoin [a#5, b#6], [a#10, b#11], Inner, condition = [ leftOnly = null, rightOnly = null, both = null, full = null ], state info [ checkpoint = <unknown>, runId = 54e31fce-f055-4686-b75d-fcd2b076f8d8, opId = 0, ver = 0, numPartitions = 5], 0, state cleanup [ left = null, right = null ]
   :- Exchange hashpartitioning(a#5, b#6, 5)
   :  +- *(1) Project [value#1 AS a#5, (value#1 * 2) AS b#6]
   :     +- StreamingRelation MemoryStream[value#1], [value#1]
   +- Exchange hashpartitioning(b#11, 5)
      +- *(2) Project [value#3 AS a#10, (value#3 * 2) AS b#11]
         +- StreamingRelation MemoryStream[value#3], [value#3]
```

The left table is hash partitioned by `a, b`, while the right table is hash partitioned by `b`. This means matching records may land in different partitions, so a record that should be in the output may be missing.

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21587 from cloud-fan/join.
2018-06-21 15:38:46 -07:00
Maryann Xue b9a6f7499a [SPARK-24613][SQL] Cache with UDF could not be matched with subsequent dependent caches
## What changes were proposed in this pull request?

Wrap the logical plan with an `AnalysisBarrier` for execution plan compilation in CacheManager, in order to avoid the plan being analyzed again.

## How was this patch tested?

Add one test in `DatasetCacheSuite`

Author: Maryann Xue <maryannxue@apache.org>

Closes #21602 from maryannxue/cache-mismatch.
2018-06-21 11:45:30 -07:00
Marcelo Vanzin c8e909cd49 [SPARK-24589][CORE] Correctly identify tasks in output commit coordinator.
When an output stage is retried, it's possible that tasks from the previous
attempt are still running. In that case, there would be a new task for the
same partition in the new attempt, and the coordinator would allow both
tasks to commit their output since it did not keep track of stage attempts.

The change adds more information to the stage state tracked by the coordinator,
so that only one task is allowed to commit the output in the above case.
The stage state in the coordinator is also maintained across stage retries,
so that a stray speculative task from a previous stage attempt is not allowed
to commit.

This also removes some code added in SPARK-18113 that allowed for duplicate
commit requests; with the RPC code used in Spark 2, that situation cannot
happen, so there is no need to handle it.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #21577 from vanzin/SPARK-24552.
2018-06-21 13:25:15 -05:00
Chongguang LIU 7236e759c9 [SPARK-24574][SQL] array_contains, array_position, array_remove and element_at functions deal with Column type
## What changes were proposed in this pull request?

For the function ```def array_contains(column: Column, value: Any): Column ``` , if we pass the `value` parameter as a Column type, it will yield a runtime exception.

This PR proposes pattern matching to detect whether `value` is of type Column. If so, it uses the column's .expr; otherwise it works as it used to.

The same applies to the ```array_position```, ```array_remove``` and ```element_at``` functions.
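A hedged sketch of the improved usage (assuming `spark.implicits._` is in scope and this change is applied):
```scala
import org.apache.spark.sql.functions.{array_contains, col}

val df = Seq((Seq(1, 2, 3), 2)).toDF("arr", "x")
// `value` can now be a Column referencing another column, not just a literal:
df.filter(array_contains(col("arr"), col("x"))).show()
```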

## How was this patch tested?

Unit test modified to cover this code change.

Ping ueshin

Author: Chongguang LIU <chong@Chongguangs-MacBook-Pro.local>

Closes #21581 from chongguang/SPARK-24574.
2018-06-21 14:58:57 +08:00
Maxim Gekk 54fcaafb09 [SPARK-24571][SQL] Support Char literals
## What changes were proposed in this pull request?

In the PR, I propose to automatically convert a `Literal` with `Char` type to a `Literal` of `String` type. Currently, the following code:
```scala
val df = Seq("Amsterdam", "San Francisco", "London").toDF("city")
df.where($"city".contains('o')).show(false)
```
fails with the exception:
```
Unsupported literal type class java.lang.Character o
java.lang.RuntimeException: Unsupported literal type class java.lang.Character o
at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:78)
```
The PR fixes this issue by converting `char` to `string` of length `1`. I believe it makes sense not to differentiate `char` and `string(1)` in _a unified, multi-language data platform_ like Spark, which supports languages like Python/R.

Author: Maxim Gekk <maxim.gekk@databricks.com>
Author: Maxim Gekk <max.gekk@gmail.com>

Closes #21578 from MaxGekk/support-char-literals.
2018-06-20 23:38:37 -07:00
Huaxin Gao 9de11d3f90 [SPARK-23912][SQL] add array_distinct
## What changes were proposed in this pull request?

Add array_distinct to remove duplicate values from an array.
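A minimal usage sketch (assuming a SparkSession `spark` with this change applied):
```scala
spark.sql("SELECT array_distinct(array(1, 2, 2, 3, 3, 3))").show(false)   // [1, 2, 3]
```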

## How was this patch tested?

Add unit tests

Author: Huaxin Gao <huaxing@us.ibm.com>

Closes #21050 from huaxingao/spark-23912.
2018-06-21 12:24:53 +09:00
aokolnychyi c5a0d1132a [SPARK-24575][SQL] Prohibit window expressions inside WHERE and HAVING clauses
## What changes were proposed in this pull request?

As discussed [before](https://github.com/apache/spark/pull/19193#issuecomment-393726964), this PR prohibits window expressions inside WHERE and HAVING clauses.
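For illustration, a hedged sketch of a query that is now rejected at analysis time (the table `t` is hypothetical):
```scala
// With this change, a window expression in WHERE fails with an analysis error:
spark.sql("SELECT a FROM t WHERE row_number() OVER (ORDER BY b) = 1")
```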

## How was this patch tested?

This PR comes with a dedicated unit test.

Author: aokolnychyi <anton.okolnychyi@sap.com>

Closes #21580 from aokolnychyi/spark-24575.
2018-06-20 18:57:13 +02:00
Jungtaek Lim c8ef9232cf [MINOR][SQL] Remove invalid comment from SparkStrategies
## What changes were proposed in this pull request?

This patch removes an invalid comment from SparkStrategies, given that such a TODO-like comment is no longer wanted, per the discussion in https://github.com/apache/spark/pull/21388#issuecomment-396856235.

Removing the invalid comment will prevent contributors from spending time on work that is not going to be merged.

## How was this patch tested?

N/A

Author: Jungtaek Lim <kabhwan@gmail.com>

Closes #21595 from HeartSaVioR/MINOR-remove-invalid-comment-on-spark-strategies.
2018-06-20 18:38:42 +02:00
Maryann Xue bc0498d582 [SPARK-24583][SQL] Wrong schema type in InsertIntoDataSourceCommand
## What changes were proposed in this pull request?

Change the insert input schema type from "insertRelationType" to "insertRelationType.asNullable", in order to avoid the nullability being overridden.

## How was this patch tested?

Added one test in InsertSuite.

Author: Maryann Xue <maryannxue@apache.org>

Closes #21585 from maryannxue/spark-24583.
2018-06-19 15:27:20 -07:00
Tathagata Das 2cb976355c [SPARK-24565][SS] Add API for in Structured Streaming for exposing output rows of each microbatch as a DataFrame
## What changes were proposed in this pull request?

Currently, the micro-batches in the MicroBatchExecution are not exposed to the user through any public API. This was because we did not want to expose the micro-batches, so that all the APIs we expose can eventually be supported in the Continuous engine. But now that we have a better sense of building a ContinuousExecution, I am considering adding APIs which will run only on the MicroBatchExecution. I have quite a few use cases where exposing the micro-batch output as a DataFrame is useful.
- Pass the output rows of each batch to a library that is designed only for batch jobs (for example, many ML libraries need to collect() while learning).
- Reuse batch data sources for output whose streaming version does not exist (e.g. the Redshift data source).
- Write the output rows to multiple places by writing twice for each batch. This is not the most elegant thing to do for multiple-output streaming queries, but is likely to be better than running two streaming queries processing the same data twice.

The proposal is to add a method `foreachBatch(f: Dataset[T] => Unit)` to Scala/Java/Python `DataStreamWriter`.
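A hedged sketch based on the signature described above (the sink and path are illustrative, and the final API shape may differ):
```scala
import org.apache.spark.sql.{Dataset, Row}

val streamingDF = spark.readStream.format("rate").load()

val query = streamingDF.writeStream
  .foreachBatch { batch: Dataset[Row] =>
    // The output rows of each micro-batch can be written with any batch sink.
    batch.write.format("parquet").mode("append").save("/tmp/foreach-batch-output")
  }
  .start()
```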

## How was this patch tested?
New unit tests.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #21571 from tdas/foreachBatch.
2018-06-19 13:56:51 -07:00
yucai 9dbe53eb6b [SPARK-24556][SQL] Always rewrite output partitioning in ReusedExchangeExec and InMemoryTableScanExec
## What changes were proposed in this pull request?

Currently, ReusedExchange and InMemoryTableScanExec only rewrite the output partitioning if the child's partitioning is HashPartitioning, and do nothing for other partitionings, e.g., RangePartitioning. We should always rewrite it; otherwise, an unnecessary shuffle could be introduced, as in https://issues.apache.org/jira/browse/SPARK-24556.

## How was this patch tested?

Add new tests.

Author: yucai <yyu1@ebay.com>

Closes #21564 from yucai/SPARK-24556.
2018-06-19 10:52:51 -07:00
Li Jin a78a904641 [SPARK-24521][SQL][TEST] Fix ineffective test in CachedTableSuite
## What changes were proposed in this pull request?

test("withColumn doesn't invalidate cached dataframe") in CachedTableSuite doesn't not work because:

The UDF is executed and test count incremented when "df.cache()" is called and the subsequent "df.collect()" has no effect on the test result.

This PR fixed this test and add another test for caching UDF.

## How was this patch tested?

Add new tests.

Author: Li Jin <ice.xelloss@gmail.com>

Closes #21531 from icexelloss/fix-cache-test.
2018-06-19 10:42:08 -07:00
Xiao Li 9a75c18290 [SPARK-24542][SQL] UDF series UDFXPathXXXX allow users to pass carefully crafted XML to access arbitrary files
## What changes were proposed in this pull request?

The UDFXPathXXXX UDF series allows users to pass carefully crafted XML to access arbitrary files. Spark does not have built-in access control, and when users rely on an external access control library, these UDFs might be used to bypass it and access file contents.

This PR basically patches the Hive fix to Apache Spark. https://issues.apache.org/jira/browse/HIVE-18879

## How was this patch tested?

A unit test case

Author: Xiao Li <gatorsmile@gmail.com>

Closes #21549 from gatorsmile/xpathSecurity.
2018-06-18 20:17:04 -07:00
Wenchen Fan 1737d45e08 [SPARK-24478][SQL][FOLLOWUP] Move projection and filter push down to physical conversion
## What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/21503, to completely move operator pushdown to the planner rule.

The code is mostly from https://github.com/apache/spark/pull/21319

## How was this patch tested?

existing tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21574 from cloud-fan/followup.
2018-06-18 20:15:01 -07:00
Liang-Chi Hsieh 8f225e055c [SPARK-24548][SQL] Fix incorrect schema of Dataset with tuple encoders
## What changes were proposed in this pull request?

When creating tuple expression encoders, we should give the serializer expressions of tuple items correct names, so we can have correct output schema when we use such tuple encoders.

## How was this patch tested?

Added test.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #21576 from viirya/SPARK-24548.
2018-06-18 11:01:17 -07:00
Takeshi Yamamuro e219e692ef [SPARK-23772][SQL] Provide an option to ignore column of all null values or empty array during JSON schema inference
## What changes were proposed in this pull request?
This PR adds a new JSON option `dropFieldIfAllNull` to ignore columns of all-null values or empty arrays/structs during JSON schema inference.
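A hedged sketch of how the option could be used (option name as described above; the input path is illustrative):
```scala
val df = spark.read
  .option("dropFieldIfAllNull", "true")
  .json("/path/to/input.json")
// Fields whose values are all null (or all empty arrays/structs) are dropped
// from the inferred schema.
```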

## How was this patch tested?
Added tests in `JsonSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>
Author: Xiangrui Meng <meng@databricks.com>

Closes #20929 from maropu/SPARK-23772.
2018-06-19 00:24:54 +08:00
James Yu c7c0b086a0 add one supported type missing from the javadoc
## What changes were proposed in this pull request?

The supported java.math.BigInteger type is not mentioned in the javadoc of Encoders.bean()

## How was this patch tested?

only Javadoc fix

Author: James Yu <james@ispot.tv>

Closes #21544 from yuj/master.
2018-06-15 21:04:04 -07:00
Mukul Murthy e4fee395ec [SPARK-24525][SS] Provide an option to limit number of rows in a MemorySink
## What changes were proposed in this pull request?

Provide an option to limit the number of rows in a MemorySink. Currently, MemorySink and MemorySinkV2 have unbounded size, meaning that if they're used on big data, they can OOM the stream. This change adds a maxMemorySinkRows option to limit how many rows MemorySink and MemorySinkV2 can hold. By default, they are still unbounded.

## How was this patch tested?

Added new unit tests.

Author: Mukul Murthy <mukul.murthy@databricks.com>

Closes #21559 from mukulmurthy/SPARK-24525.
2018-06-15 13:56:48 -07:00
Kazuaki Ishizaki 90da7dc241 [SPARK-24452][SQL][CORE] Avoid possible overflow in int add or multiple
## What changes were proposed in this pull request?

This PR fixes possible overflow in int addition or multiplication. In particular, the overflows in multiplication are detected by [Spotbugs](https://spotbugs.github.io/).

The following assignments may overflow on the right-hand side; as a result, the stored value may be negative.
```
long = int * int
long = int + int
```

To avoid this problem, this PR casts from int to long on the right-hand side.
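An illustrative sketch of the overflow and of the fix pattern (the variable names are hypothetical, not the patched code):
```scala
val rows: Int = 100000
val bytesPerRow: Int = 100000

val overflowed: Long = rows * bytesPerRow          // Int multiplication overflows before widening
val correct: Long    = rows.toLong * bytesPerRow   // widened first, computed as Long

// overflowed == 1410065408, correct == 10000000000L
```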

## How was this patch tested?

Existing UTs.

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #21481 from kiszk/SPARK-24452.
2018-06-15 13:47:48 -07:00
Tathagata Das b5ccf0d395 [SPARK-24396][SS][PYSPARK] Add Structured Streaming ForeachWriter for python
## What changes were proposed in this pull request?

This PR adds `foreach` for streaming queries in Python. Users will be able to specify their processing logic in two different ways.
- As a function that takes a row as input.
- As an object that has `open`, `process`, and `close` methods.

See the python docs in this PR for more details.

## How was this patch tested?
Added java and python unit tests

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #21477 from tdas/SPARK-24396.
2018-06-15 12:56:39 -07:00
Ryan Blue 22daeba59b [SPARK-24478][SQL] Move projection and filter push down to physical conversion
## What changes were proposed in this pull request?

This removes the v2 optimizer rule for push-down and instead pushes filters and required columns when converting to a physical plan, as suggested by marmbrus. This makes the v2 relation cleaner because the output and filters do not change in the logical plan.

A side-effect of this change is that the stats from the logical (optimized) plan no longer reflect pushed filters and projection. This is a temporary state, until the planner gathers stats from the physical plan instead. An alternative to this approach is 9d3a11e68b.

The first commit was proposed in #21262. This PR replaces #21262.

## How was this patch tested?

Existing tests.

Author: Ryan Blue <blue@apache.org>

Closes #21503 from rdblue/SPARK-24478-move-push-down-to-physical-conversion.
2018-06-14 20:59:42 -07:00
Maxim Gekk b8f27ae3b3 [SPARK-24543][SQL] Support any type as DDL string for from_json's schema
## What changes were proposed in this pull request?

In the PR, I propose to support any DataType represented as DDL string for the from_json function. After the changes, it will be possible to specify `MapType` in SQL like:
```sql
select from_json('{"a":1, "b":2}', 'map<string, int>')
```
and in Scala (similar in other languages)
```scala
val in = Seq("""{"a": {"b": 1}}""").toDS()
val schema = "map<string, map<string, int>>"
val out = in.select(from_json($"value", schema, Map.empty[String, String]))
```

## How was this patch tested?

Added a couple of SQL tests and modified existing tests for Python and Scala. The existing tests were modified because it is not important for them in which format the schema for `from_json` is provided.

Author: Maxim Gekk <maxim.gekk@databricks.com>

Closes #21550 from MaxGekk/from_json-ddl-schema.
2018-06-14 13:27:27 -07:00
Marco Gaido fdadc4be08 [SPARK-24495][SQL] EnsureRequirement returns wrong plan when reordering equal keys
## What changes were proposed in this pull request?

`EnsureRequirement` in its `reorder` method currently assumes that the same key appears only once in the join condition. This of course might not be the case, and when it is not satisfied, it returns a wrong plan which produces a wrong result of the query.

## How was this patch tested?

added UT

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21529 from mgaido91/SPARK-24495.
2018-06-14 09:20:41 -07:00
Marco Gaido 3bf76918fb [SPARK-24531][TESTS] Replace 2.3.0 version with 2.3.1
## What changes were proposed in this pull request?

The PR updates the 2.3 version tested to the new release 2.3.1.

## How was this patch tested?

existing UTs

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21543 from mgaido91/patch-1.
2018-06-13 15:18:19 -07:00
Jose Torres 1b46f41c55 [SPARK-24235][SS] Implement continuous shuffle writer for single reader partition.
## What changes were proposed in this pull request?

https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit

Implement continuous shuffle write RDD for a single reader partition. (I don't believe any implementation changes are actually required for multiple reader partitions, but this PR is already very large, so I want to exclude those for now to keep the size down.)

## How was this patch tested?

new unit tests

Author: Jose Torres <torres.joseph.f+github@gmail.com>

Closes #21428 from jose-torres/writerTask.
2018-06-13 13:13:01 -07:00
Herman van Hovell 299d297e25 [SPARK-24500][SQL] Make sure streams are materialized during Tree transforms.
## What changes were proposed in this pull request?
If you construct catalyst trees using `scala.collection.immutable.Stream`, you can run into situations where valid transformations do not seem to have any effect. There are two causes for this behavior:
- `Stream` is evaluated lazily. Note that the default implementation will generally only evaluate the function for the first element (this makes testing a bit tricky); see the sketch below.
- `TreeNode` and `QueryPlan` use side effects to detect whether a tree has changed. Mapping over a stream is lazy and does not trigger this side effect. If this happens, the node will incorrectly assume that it did not change and return itself instead of the newly created node (this is for GC reasons).
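An illustrative sketch of the laziness issue with side effects (plain Scala `Stream`, not the actual TreeNode code):
```scala
var evaluated = 0
val mapped = Stream(1, 2, 3).map { x => evaluated += 1; x + 1 }

println(evaluated)   // 1 -- only the head has been computed so far
mapped.force         // materializes the whole stream
println(evaluated)   // 3
```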

This PR fixes this issue by forcing materialization on streams in `TreeNode` and `QueryPlan`.

## How was this patch tested?
Unit tests were added to `TreeNodeSuite` and `LogicalPlanSuite`. An integration test was added to the `PlannerSuite`

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #21539 from hvanhovell/SPARK-24500.
2018-06-13 07:09:48 -07:00
Arun Mahadevan 7703b46d28 [SPARK-24479][SS] Added config for registering streamingQueryListeners
## What changes were proposed in this pull request?

Currently a "StreamingQueryListener" can only be registered programatically. We could have a new config "spark.sql.streamingQueryListeners" similar to  "spark.sql.queryExecutionListeners" and "spark.extraListeners" for users to register custom streaming listeners.

## How was this patch tested?

New unit test and running example programs.

Author: Arun Mahadevan <arunm@apache.org>

Closes #21504 from arunmahadevan/SPARK-24480.
2018-06-13 20:43:16 +08:00
Jungtaek Lim 4c388bccf1 [SPARK-24485][SS] Measure and log elapsed time for filesystem operations in HDFSBackedStateStoreProvider
## What changes were proposed in this pull request?

This patch measures and logs the elapsed time of each operation that communicates with the file system (mostly a remote HDFS in production) in HDFSBackedStateStoreProvider, to help investigate any latency issues.

## How was this patch tested?

Manually tested.

Author: Jungtaek Lim <kabhwan@gmail.com>

Closes #21506 from HeartSaVioR/SPARK-24485.
2018-06-13 12:36:20 +08:00
Jungtaek Lim 3352d6fe9a [SPARK-24466][SS] Fix TextSocketMicroBatchReader to be compatible with netcat again
## What changes were proposed in this pull request?

TextSocketMicroBatchReader was no longer compatible with netcat because it launched a temporary reader to read the schema, closed it, and then re-opened a reader. While a reliable socket server should be able to handle this without any issue, the nc command normally can't handle multiple connections and simply exits when the temporary reader is closed.

This patch fixes TextSocketMicroBatchReader to be compatible with netcat again by deferring the opening of the socket to the first call of planInputPartitions() instead of the constructor.

## How was this patch tested?

Added unit test which fails on current and succeeds with the patch. And also manually tested.

Author: Jungtaek Lim <kabhwan@gmail.com>

Closes #21497 from HeartSaVioR/SPARK-24466.
2018-06-13 12:34:46 +08:00
Li Jin 9786ce66c5 [SPARK-22239][SQL][PYTHON] Enable grouped aggregate pandas UDFs as window functions with unbounded window frames
## What changes were proposed in this pull request?
This PR enables using a grouped aggregate pandas UDFs as window functions. The semantics is the same as using SQL aggregation function as window functions.

```
       >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
       >>> from pyspark.sql import Window
       >>> df = spark.createDataFrame(
       ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
       ...     ("id", "v"))
       >>> pandas_udf("double", PandasUDFType.GROUPED_AGG)
       ... def mean_udf(v):
       ...     return v.mean()
       >>> w = Window.partitionBy('id')
       >>> df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
       +---+----+------+
       | id|   v|mean_v|
       +---+----+------+
       |  1| 1.0|   1.5|
       |  1| 2.0|   1.5|
       |  2| 3.0|   6.0|
       |  2| 5.0|   6.0|
       |  2|10.0|   6.0|
       +---+----+------+
```

The scope of this PR is somewhat limited in terms of:
(1) Only supports unbounded window, which acts essentially as group by.
(2) Only supports aggregation functions, not "transform" like window functions (n -> n mapping)

Both of these are left as future work. Especially, (1) needs careful thinking w.r.t. how to pass rolling window data to python efficiently. (2) is a bit easier but does require more changes therefore I think it's better to leave it as a separate PR.

## How was this patch tested?

WindowPandasUDFTests

Author: Li Jin <ice.xelloss@gmail.com>

Closes #21082 from icexelloss/SPARK-22239-window-udf.
2018-06-13 09:10:52 +08:00
Kazuaki Ishizaki ada28f2595 [SPARK-23933][SQL] Add map_from_arrays function
## What changes were proposed in this pull request?

The PR adds the SQL function `map_from_arrays`. The behavior of the function is based on Presto's `map`. Since Spark SQL already had a `map` function, we chose a different name for this behavior.

This function returns a map built from a pair of arrays of keys and values.
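A minimal usage sketch (assuming `spark.implicits._` is in scope):
```scala
import org.apache.spark.sql.functions.map_from_arrays

Seq((Seq(1, 2), Seq("a", "b"))).toDF("k", "v")
  .select(map_from_arrays($"k", $"v"))
  .show(false)   // [1 -> a, 2 -> b]
```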

## How was this patch tested?

Added UTs

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #21258 from kiszk/SPARK-23933.
2018-06-12 12:31:22 -07:00
Fangshi Li cc88d7fad1 [SPARK-24216][SQL] Spark TypedAggregateExpression uses getSimpleName that is not safe in scala
## What changes were proposed in this pull request?

When a user creates an aggregator object in Scala and passes the aggregator to Spark Dataset's agg() method, Spark initializes TypedAggregateExpression with the nodeName field set to aggregator.getClass.getSimpleName. However, getSimpleName is not safe in a Scala environment, depending on how the user creates the aggregator object. For example, if the aggregator class's fully qualified name is "com.my.company.MyUtils$myAgg$2$", getSimpleName will throw java.lang.InternalError "Malformed class name". This has been reported in scalatest https://github.com/scalatest/scalatest/pull/1044 and discussed in many Scala upstream JIRAs such as SI-8110 and SI-5425.

To fix this issue, we follow the solution in https://github.com/scalatest/scalatest/pull/1044 to add a safer version of getSimpleName as a util method, and TypedAggregateExpression invokes this util method rather than getClass.getSimpleName.

## How was this patch tested?
added unit test

Author: Fangshi Li <fli@linkedin.com>

Closes #21276 from fangshil/SPARK-24216.
2018-06-12 12:10:08 -07:00
DylanGuedes f0ef1b311d [SPARK-23931][SQL] Adds arrays_zip function to sparksql
Signed-off-by: DylanGuedes <djmgguedesgmail.com>

## What changes were proposed in this pull request?

Addition of the arrays_zip function to Spark SQL functions.
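A minimal usage sketch (assuming `spark.implicits._` is in scope):
```scala
import org.apache.spark.sql.functions.arrays_zip

Seq((Seq(1, 2), Seq("a", "b"))).toDF("x", "y")
  .select(arrays_zip($"x", $"y"))
  .show(false)   // [[1, a], [2, b]]
```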

## How was this patch tested?

Unit tests that check if the results are correct.

Author: DylanGuedes <djmgguedes@gmail.com>

Closes #21045 from DylanGuedes/SPARK-23931.
2018-06-12 11:57:25 -07:00
Marco Gaido 2824f1436b [SPARK-24531][TESTS] Remove version 2.2.0 from testing versions in HiveExternalCatalogVersionsSuite
## What changes were proposed in this pull request?

Removing version 2.2.0 from the testing versions in HiveExternalCatalogVersionsSuite, as it is no longer present in the mirrors and this is blocking all the open PRs.

## How was this patch tested?

running UTs

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21540 from mgaido91/SPARK-24531.
2018-06-12 09:56:35 -07:00
Tom Saleeba 1d7db65e96 docs: fix typo
no => no[t]

## What changes were proposed in this pull request?

Fixing a typo.

## How was this patch tested?

Visual check of the docs.

Author: Tom Saleeba <tom.saleeba@gmail.com>

Closes #21496 from tomsaleeba/patch-1.
2018-06-12 09:22:52 -05:00
Wenchen Fan 01452ea9c7 [SPARK-24502][SQL] flaky test: UnsafeRowSerializerSuite
## What changes were proposed in this pull request?

`UnsafeRowSerializerSuite` calls `UnsafeProjection.create`, which accesses `SQLConf.get`, while the currently active SparkSession may already be stopped, and we may hit an exception like this:

```
sbt.ForkMain$ForkError: java.lang.IllegalStateException: LiveListenerBus is stopped.
	at org.apache.spark.scheduler.LiveListenerBus.addToQueue(LiveListenerBus.scala:97)
	at org.apache.spark.scheduler.LiveListenerBus.addToStatusQueue(LiveListenerBus.scala:80)
	at org.apache.spark.sql.internal.SharedState.<init>(SharedState.scala:93)
	at org.apache.spark.sql.SparkSession$$anonfun$sharedState$1.apply(SparkSession.scala:120)
	at org.apache.spark.sql.SparkSession$$anonfun$sharedState$1.apply(SparkSession.scala:120)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.sql.SparkSession.sharedState$lzycompute(SparkSession.scala:120)
	at org.apache.spark.sql.SparkSession.sharedState(SparkSession.scala:119)
	at org.apache.spark.sql.internal.BaseSessionStateBuilder.build(BaseSessionStateBuilder.scala:286)
	at org.apache.spark.sql.test.TestSparkSession.sessionState$lzycompute(TestSQLContext.scala:42)
	at org.apache.spark.sql.test.TestSparkSession.sessionState(TestSQLContext.scala:41)
	at org.apache.spark.sql.SparkSession$$anonfun$1$$anonfun$apply$1.apply(SparkSession.scala:95)
	at org.apache.spark.sql.SparkSession$$anonfun$1$$anonfun$apply$1.apply(SparkSession.scala:95)
	at scala.Option.map(Option.scala:146)
	at org.apache.spark.sql.SparkSession$$anonfun$1.apply(SparkSession.scala:95)
	at org.apache.spark.sql.SparkSession$$anonfun$1.apply(SparkSession.scala:94)
	at org.apache.spark.sql.internal.SQLConf$.get(SQLConf.scala:126)
	at org.apache.spark.sql.catalyst.expressions.CodeGeneratorWithInterpretedFallback.createObject(CodeGeneratorWithInterpretedFallback.scala:54)
	at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:157)
	at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:150)
	at org.apache.spark.sql.execution.UnsafeRowSerializerSuite.org$apache$spark$sql$execution$UnsafeRowSerializerSuite$$unsafeRowConverter(UnsafeRowSerializerSuite.scala:54)
	at org.apache.spark.sql.execution.UnsafeRowSerializerSuite.org$apache$spark$sql$execution$UnsafeRowSerializerSuite$$toUnsafeRow(UnsafeRowSerializerSuite.scala:49)
	at org.apache.spark.sql.execution.UnsafeRowSerializerSuite$$anonfun$2.apply(UnsafeRowSerializerSuite.scala:63)
	at org.apache.spark.sql.execution.UnsafeRowSerializerSuite$$anonfun$2.apply(UnsafeRowSerializerSuite.scala:60)
...
```

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21518 from cloud-fan/test.
2018-06-11 22:08:44 -07:00
liutang123 048197749e [SPARK-22144][SQL] ExchangeCoordinator combine the partitions of an 0 sized pre-shuffle to 0
## What changes were proposed in this pull request?
When the length of the pre-shuffle's partitions is 0, the length of the post-shuffle's partitions should be 0 instead of spark.sql.shuffle.partitions.

## How was this patch tested?
ExchangeCoordinator now converts a pre-shuffle with 0 partitions into a post-shuffle with 0 partitions, instead of one with spark.sql.shuffle.partitions partitions.

Author: liutang123 <liutang123@yeah.net>

Closes #19364 from liutang123/SPARK-22144.
2018-06-11 17:48:07 -07:00
Marco Gaido f07c5064a3 [SPARK-24468][SQL] Handle negative scale when adjusting precision for decimal operations
## What changes were proposed in this pull request?

In SPARK-22036 we introduced the possibility to allow precision loss in arithmetic operations (according to the SQL standard). The implementation was drawn from Hive's one, where Decimals with a negative scale are not allowed in the operations.

The PR handles the case when the scale is negative, removing the assertion that it is not.

## How was this patch tested?

added UTs

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21499 from mgaido91/SPARK-24468.
2018-06-08 18:51:56 -07:00
Thiruvasakan Paramasivan 36a3409134 [SPARK-24412][SQL] Adding docs about automagical type casting in isin and isInCollection APIs
## What changes were proposed in this pull request?
Update documentation for the `isInCollection` API to clearly explain the "auto-casting" of elements if their types are different.

## How was this patch tested?
No-Op

Author: Thiruvasakan Paramasivan <thiru@apple.com>

Closes #21519 from trvskn/sql-doc-update.
2018-06-08 17:17:43 -07:00
Bruce Robbins 1462bba4fd [SPARK-24119][SQL] Add interpreted execution to SortPrefix expression
## What changes were proposed in this pull request?

Implemented eval in SortPrefix expression.

## How was this patch tested?

- ran existing sbt SQL tests
- added unit test
- ran existing Python SQL tests
- manual tests: disabling codegen -- patching code to disable beyond what spark.sql.codegen.wholeStage=false can do -- and running sbt SQL tests

Author: Bruce Robbins <bersprockets@gmail.com>

Closes #21231 from bersprockets/sortprefixeval.
2018-06-08 13:27:52 +02:00
Asher Saban e76b0124fb [SPARK-23803][SQL] Support bucket pruning
## What changes were proposed in this pull request?
Support bucket pruning when filtering on a single bucketed column with the following predicates:
EqualTo, EqualNullSafe, In, and And/Or predicates.
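An illustrative sketch of a query that can benefit (the table name and sizes are hypothetical):
```scala
// Write a table bucketed by `id` into 8 buckets, then filter on the bucketed column.
spark.range(0, 1000000).write.bucketBy(8, "id").saveAsTable("bucketed_t")

// With bucket pruning, only the bucket that can contain id = 42 needs to be scanned.
spark.table("bucketed_t").filter("id = 42").count()
```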

## How was this patch tested?
refactored unit tests to test the above.

based on gatorsmile work in e3c75c6398

Author: Asher Saban <asaban@palantir.com>
Author: asaban <asaban@palantir.com>

Closes #20915 from sabanas/filter-prune-buckets.
2018-06-06 07:14:08 -07:00
jinxing 93df3cd035 [SPARK-22384][SQL] Refine partition pruning when attribute is wrapped in Cast
## What changes were proposed in this pull request?

The SQL below will get all partitions from the metastore, which puts a heavy burden on the metastore:
```
CREATE TABLE `partition_test`(`col` int) PARTITIONED BY (`pt` byte)
SELECT * FROM partition_test WHERE CAST(pt AS INT)=1
```
The reason is that the analyzed attribute `pt` is wrapped in `Cast`, and `HiveShim` fails to generate a proper partition filter.
This PR proposes to take `Cast` into consideration when generating the partition filter.

## How was this patch tested?
Test added.
This pr proposes to use analyzed expressions in `HiveClientSuite`

Author: jinxing <jinxing6042@126.com>

Closes #19602 from jinxing64/SPARK-22384.
2018-06-05 11:32:42 -07:00
Tathagata Das 2c2a86b5d5 [SPARK-24453][SS] Fix error recovering from the failure in a no-data batch
## What changes were proposed in this pull request?

The error occurs when we are recovering from a failure in a no-data batch (say X) that has been planned (i.e. written to offset log) but not executed (i.e. not written to commit log). Upon recovery the following sequence of events happen.

1. `MicroBatchExecution.populateStartOffsets` sets `currentBatchId` to X. Since there was no data in the batch, the `availableOffsets` is same as `committedOffsets`, so `isNewDataAvailable` is `false`.
2. When `MicroBatchExecution.constructNextBatch` is called, ideally it should immediately return true because the next batch has already been constructed. However, the check of whether the batch has been constructed was `if (isNewDataAvailable) return true`. Since the planned batch is a no-data batch, it escaped this check and proceeded to plan the same batch X *once again*.

The solution is to have an explicit flag that signifies whether a batch has already been constructed or not. `populateStartOffsets` is going to set the flag appropriately.

## How was this patch tested?

new unit test

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #21491 from tdas/SPARK-24453.
2018-06-05 01:08:55 -07:00
Yuanjian Li dbb4d83829 [SPARK-24215][PYSPARK] Implement _repr_html_ for dataframes in PySpark
## What changes were proposed in this pull request?

Implement `_repr_html_` for PySpark DataFrames when running in a notebook, and add a config named "spark.sql.repl.eagerEval.enabled" to control this.

The dev list thread for context: http://apache-spark-developers-list.1001551.n3.nabble.com/eager-execution-and-debuggability-td23928.html

## How was this patch tested?

New ut in DataFrameSuite and manual test in jupyter. Some screenshot below.

**After:**
![image](https://user-images.githubusercontent.com/4833765/40268422-8db5bef0-5b9f-11e8-80f1-04bc654a4f2c.png)

**Before:**
![image](https://user-images.githubusercontent.com/4833765/40268431-9f92c1b8-5b9f-11e8-9db9-0611f0940b26.png)

Author: Yuanjian Li <xyliyuanjian@gmail.com>

Closes #21370 from xuanyuanking/SPARK-24215.
2018-06-05 08:23:08 +07:00
aokolnychyi 7297ae04d8 [SPARK-21896][SQL] Fix StackOverflow caused by window functions inside aggregate functions
## What changes were proposed in this pull request?

This PR explicitly prohibits window functions inside aggregates. Currently, this will cause StackOverflow during analysis. See PR #19193 for previous discussion.

## How was this patch tested?

This PR comes with a dedicated unit test.

Author: aokolnychyi <anton.okolnychyi@sap.com>

Closes #21473 from aokolnychyi/fix-stackoverflow-window-funcs.
2018-06-04 13:28:16 -07:00
Yuming Wang 0be5aa2746 [SPARK-23903][SQL] Add support for date extract
## What changes were proposed in this pull request?

Add support for date `extract` function:
```sql
spark-sql> SELECT EXTRACT(YEAR FROM TIMESTAMP '2000-12-16 12:21:13');
2000
```
Supported field same as [Hive](https://github.com/apache/hive/blob/rel/release-2.3.3/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g#L308-L316): `YEAR`, `QUARTER`, `MONTH`, `WEEK`, `DAY`, `DAYOFWEEK`, `HOUR`, `MINUTE`, `SECOND`.

## How was this patch tested?

unit tests

Author: Yuming Wang <yumwang@ebay.com>

Closes #21479 from wangyum/SPARK-23903.
2018-06-04 10:16:13 -07:00
Maxim Gekk 1d9338bb10 [SPARK-23786][SQL] Checking column names of csv headers
## What changes were proposed in this pull request?

Currently column names of headers in CSV files are not checked against provided schema of CSV data. It could cause errors like showed in the [SPARK-23786](https://issues.apache.org/jira/browse/SPARK-23786) and https://github.com/apache/spark/pull/20894#issuecomment-375957777. I introduced new CSV option - `enforceSchema`. If it is enabled (by default `true`), Spark forcibly applies provided or inferred schema to CSV files. In that case, CSV headers are ignored and not checked against the schema. If `enforceSchema` is set to `false`, additional checks can be performed. For example, if column in CSV header and in the schema have different ordering, the following exception is thrown:

```
java.lang.IllegalArgumentException: CSV file header does not contain the expected fields
 Header: depth, temperature
 Schema: temperature, depth
CSV file: marina.csv
```
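
A hedged usage sketch (file path and schema are hypothetical): with `enforceSchema` disabled, the header names are checked against the provided schema instead of the schema being applied blindly.

```scala
import org.apache.spark.sql.types._

// Hypothetical schema and file path, for illustration only.
val schema = new StructType()
  .add("temperature", DoubleType)
  .add("depth", DoubleType)

val df = spark.read
  .option("header", "true")
  .option("enforceSchema", "false") // validate CSV header names/ordering against the schema
  .schema(schema)
  .csv("/path/to/marina.csv")
```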

## How was this patch tested?

The changes were tested by existing tests of CSVSuite and by 2 new tests.

Author: Maxim Gekk <maxim.gekk@databricks.com>
Author: Maxim Gekk <max.gekk@gmail.com>

Closes #20894 from MaxGekk/check-column-names.
2018-06-03 22:02:21 -07:00
Wenchen Fan 416cd1fd96 [SPARK-24369][SQL] Correct handling for multiple distinct aggregations having the same argument set
## What changes were proposed in this pull request?

bring back https://github.com/apache/spark/pull/21443

This is a different approach: just change the check to count distinct columns with `toSet`
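
A schematic of the revised check, with illustrative types (not the actual `RewriteDistinctAggregates` code): grouping distinct aggregates by the *set* of their arguments lets `corr(DISTINCT x, y)` and `corr(DISTINCT y, x)` share one argument set.

```scala
// Illustrative only: count how many different argument *sets* the distinct aggregates use.
case class AggExpr(isDistinct: Boolean, children: Seq[String])

def distinctArgumentSets(aggs: Seq[AggExpr]): Int =
  aggs.filter(_.isDistinct).map(_.children.toSet).distinct.size

// distinctArgumentSets(Seq(AggExpr(true, Seq("x", "y")), AggExpr(true, Seq("y", "x")))) == 1
```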

## How was this patch tested?

a new test to verify the planner behavior.

Author: Wenchen Fan <wenchen@databricks.com>
Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #21487 from cloud-fan/back.
2018-06-03 21:57:42 -07:00
Xiao Li d2c3de7efc Revert "[SPARK-24369][SQL] Correct handling for multiple distinct aggregations having the same argument set"
This reverts commit 1e46f92f95.
2018-06-01 11:51:10 -07:00
Huang Tengfei 6039b13230 [SPARK-24351][SS] offsetLog/commitLog purge thresholdBatchId should be computed with current committed epoch but not currentBatchId in CP mode
## What changes were proposed in this pull request?
Compute the thresholdBatchId for purging metadata based on the current committed epoch instead of currentBatchId in continuous processing mode, to avoid cleaning all the committed metadata in some cases, as described in the JIRA [SPARK-24351](https://issues.apache.org/jira/browse/SPARK-24351).

## How was this patch tested?
Add new unit test.

Author: Huang Tengfei <tengfei.h@gmail.com>

Closes #21400 from ivoson/branch-cp-meta.
2018-06-01 10:47:53 -07:00
Huaxin Gao 98909c398d [SPARK-23920][SQL] add array_remove to remove all elements that equal element from array
## What changes were proposed in this pull request?

Add `array_remove` to remove all elements that equal a given element from an array.
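
For example (hypothetical literal data):

```scala
spark.sql("SELECT array_remove(array(1, 2, 3, 2), 2)").show()
// returns [1, 3] -- every element equal to 2 is removed
```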

## How was this patch tested?

add unit tests

Author: Huaxin Gao <huaxing@us.ibm.com>

Closes #21069 from huaxingao/spark-23920.
2018-05-31 22:04:26 -07:00
Gengliang Wang cbaa729132 [SPARK-24330][SQL] Refactor ExecuteWriteTask and Use while in writing files
## What changes were proposed in this pull request?
1. Refactor ExecuteWriteTask in FileFormatWriter to deduplicate common logic and improve readability.
After the change, callers only need to call `commit()` or `abort()` at the end of the task.
There is also less code in `SingleDirectoryWriteTask` and `DynamicPartitionWriteTask`.
Definitions of the related classes are moved to a new file, and `ExecuteWriteTask` is renamed to `FileFormatDataWriter`.

2. As per the code style guide https://github.com/databricks/scala-style-guide#traversal-and-zipwithindex, we avoid using `for` for looping in [FileFormatWriter](https://github.com/apache/spark/pull/21381/files#diff-3b69eb0963b68c65cfe8075f8a42e850L536), or `foreach` in [WriteToDataSourceV2Exec](https://github.com/apache/spark/pull/21381/files#diff-6fbe10db766049a395bae2e785e9d56eL119).
In such a critical code path, using `while` is better for performance.

## How was this patch tested?
Existing unit test.
I tried the microbenchmark in https://github.com/apache/spark/pull/21409

| Workload | Before changes(Best/Avg Time(ms)) | After changes(Best/Avg Time(ms)) |
| --- | --- | --- |
|Output Single Int Column|    2018 / 2043   |    2096 / 2236 |
|Output Single Double Column| 1978 / 2043 | 2013 / 2018 |
|Output Int and String Column| 6332 / 6706 | 6162 / 6298 |
|Output Partitions| 4458 / 5094 |  3792 / 4008  |
|Output Buckets|           5695 / 6102 |    5120 / 5154 |

Also a microbenchmark on my laptop for general comparison among while/foreach/for :
```
class Writer {
  var sum = 0L
  def write(l: Long): Unit = sum += l
}

def testWhile(iterator: Iterator[Long]): Long = {
  val w = new Writer
  while (iterator.hasNext) {
    w.write(iterator.next())
  }
  w.sum
}

def testForeach(iterator: Iterator[Long]): Long = {
  val w = new Writer
  iterator.foreach(w.write)
  w.sum
}

def testFor(iterator: Iterator[Long]): Long = {
  val w = new Writer
  for (x <- iterator) {
    w.write(x)
  }
  w.sum
}

val data = 0L to 100000000L
val start = System.nanoTime
(0 to 10).foreach(_ => testWhile(data.iterator))
println("benchmark while: " + (System.nanoTime - start)/1000000)

val start2 = System.nanoTime
(0 to 10).foreach(_ => testForeach(data.iterator))
println("benchmark foreach: " + (System.nanoTime - start2)/1000000)

val start3 = System.nanoTime
(0 to 10).foreach(_ => testFor(data.iterator))
println("benchmark for: " + (System.nanoTime - start3)/1000000)
```
Benchmark result:
`while`: 15401 ms
`foreach`: 43034 ms
`for`: 41279 ms

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21381 from gengliangwang/refactorExecuteWriteTask.
2018-06-01 10:01:15 +08:00
Yuming Wang cc976f6cb8 [SPARK-23900][SQL] format_number support user specified format as argument
## What changes were proposed in this pull request?

`format_number` supports a user-specified format as an argument. For example:
```sql
spark-sql> SELECT format_number(12332.123456, '##################.###');
12332.123
```

## How was this patch tested?

unit test

Author: Yuming Wang <yumwang@ebay.com>

Closes #21010 from wangyum/SPARK-23900.
2018-05-31 11:38:23 -07:00
Marco Gaido 24ef7fbfa9 [SPARK-24276][SQL] Order of literals in IN should not affect semantic equality
## What changes were proposed in this pull request?

When two `In` operators are created with the same list of values, but different order, we are considering them as semantically different. This is wrong, since they have the same semantic meaning.

The PR adds a canonicalization rule which orders the literals in the `In` operator so the semantic equality works properly.
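
A hedged illustration (hypothetical column `a`): after canonicalization, the two filters below should compare as semantically equal even though their literal order differs.

```scala
import spark.implicits._

// Hypothetical column name; semanticEquals relies on the canonicalized expressions.
val e1 = $"a".isin(1, 2, 3).expr
val e2 = $"a".isin(3, 1, 2).expr
assert(e1.semanticEquals(e2))
```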

## How was this patch tested?

added UT

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21331 from mgaido91/SPARK-24276.
2018-05-30 15:31:40 -07:00
Marco Gaido 1b36f14889 [SPARK-23901][SQL] Add masking functions
## What changes were proposed in this pull request?

The PR adds the masking functions as they are described in Hive's documentation: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-DataMaskingFunctions.
This means that only `string`s are accepted as parameters for the masking functions.
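
A hedged example, following the defaults in the linked Hive documentation (upper-case letters map to `X`, lower-case to `x`, digits to `n`); the input value is hypothetical and the output is shown as a comment:

```scala
// Example modeled on the Hive documentation defaults.
spark.sql("SELECT mask('abcd-EFGH-8765-4321')").show()
// expected: xxxx-XXXX-nnnn-nnnn
```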

## How was this patch tested?

added UTs

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21246 from mgaido91/SPARK-23901.
2018-05-30 11:18:04 -07:00
Takeshi Yamamuro 1e46f92f95 [SPARK-24369][SQL] Correct handling for multiple distinct aggregations having the same argument set
## What changes were proposed in this pull request?
This PR fixes an issue with multiple distinct aggregations that have the same argument set, e.g.,
```
scala> :paste
val df = sql(
  s"""SELECT corr(DISTINCT x, y), corr(DISTINCT y, x), count(*)
     | FROM (VALUES (1, 1), (2, 2), (2, 2)) t(x, y)
   """.stripMargin)

java.lang.RuntimeException
You hit a query analyzer bug. Please report your query to Spark user mailing list.
```
The root cause is that `RewriteDistinctAggregates` can't detect multiple distinct aggregations if they have the same argument set. This PR modifies the code so that `RewriteDistinctAggregates` counts the number of aggregate expressions with `isDistinct=true`.

## How was this patch tested?
Added tests in `DataFrameAggregateSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #21443 from maropu/SPARK-24369.
2018-05-31 00:23:25 +08:00
Gengliang Wang f48938800e [SPARK-24365][SQL] Add Data Source write benchmark
## What changes were proposed in this pull request?

Add a Data Source write benchmark, so that it is easier to measure writer performance.

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21409 from gengliangwang/parquetWriteBenchmark.
2018-05-30 09:32:33 +08:00
DB Tsai 900bc1f7dc
[SPARK-24371][SQL] Added isInCollection in DataFrame API for Scala and Java.
## What changes were proposed in this pull request?

Implemented **`isInCollection`** in the DataFrame API for both Scala and Java, so users can do

```scala
val profileDF = Seq(
  Some(1), Some(2), Some(3), Some(4),
  Some(5), Some(6), Some(7), None
).toDF("profileID")

val validUsers: Seq[Any] = Seq(6, 7.toShort, 8L, "3")

val result = profileDF.withColumn("isValid", $"profileID".isInCollection(validUsers))

result.show(10)
"""
+---------+-------+
|profileID|isValid|
+---------+-------+
|        1|  false|
|        2|  false|
|        3|   true|
|        4|  false|
|        5|  false|
|        6|   true|
|        7|   true|
|     null|   null|
+---------+-------+
 """.stripMargin
```
## How was this patch tested?

Several unit tests are added.

Author: DB Tsai <d_tsai@apple.com>

Closes #21416 from dbtsai/optimize-set.
2018-05-29 10:22:18 -07:00
Xiao Li 23db600c95 [SPARK-24250][SQL][FOLLOW-UP] support accessing SQLConf inside tasks
## What changes were proposed in this pull request?
We should not stop users from calling `getActiveSession` and `getDefaultSession` in executors. To not break the existing behaviors, we should simply return None.

## How was this patch tested?
N/A

Author: Xiao Li <gatorsmile@gmail.com>

Closes #21436 from gatorsmile/followUpSPARK-24250.
2018-05-28 23:23:22 -07:00
Dongjoon Hyun b31b587cd0 [SPARK-19613][SS][TEST] Random.nextString is not safe for directory namePrefix
## What changes were proposed in this pull request?

`Random.nextString` is good for generating random string data, but it's not appropriate as a directory name prefix in `Utils.createDirectory(tempDir, Random.nextString(10))`. This PR uses a safer directory namePrefix.

```scala
scala> scala.util.Random.nextString(10)
res0: String = 馨쭔ᎰႻ穚䃈兩㻞藑並
```
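
A hedged sketch of a safer prefix (the patch may use a different generator): alphanumeric characters avoid unprintable or filesystem-unsafe characters in the directory name.

```scala
// Illustrative alternative prefix; not necessarily what the patch itself uses.
val namePrefix = scala.util.Random.alphanumeric.take(10).mkString
// e.g. Utils.createDirectory(tempDir, namePrefix)
```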

```scala
StateStoreRDDSuite:
- versioning and immutability
- recovering from files
- usage with iterators - only gets and only puts
- preferred locations using StateStoreCoordinator *** FAILED ***
  java.io.IOException: Failed to create a temp directory (under /.../spark/sql/core/target/tmp/StateStoreRDDSuite8712796397908632676) after 10 attempts!
  at org.apache.spark.util.Utils$.createDirectory(Utils.scala:295)
  at org.apache.spark.sql.execution.streaming.state.StateStoreRDDSuite$$anonfun$13$$anonfun$apply$6.apply(StateStoreRDDSuite.scala:152)
  at org.apache.spark.sql.execution.streaming.state.StateStoreRDDSuite$$anonfun$13$$anonfun$apply$6.apply(StateStoreRDDSuite.scala:149)
  at org.apache.spark.sql.catalyst.util.package$.quietly(package.scala:42)
  at org.apache.spark.sql.execution.streaming.state.StateStoreRDDSuite$$anonfun$13.apply(StateStoreRDDSuite.scala:149)
  at org.apache.spark.sql.execution.streaming.state.StateStoreRDDSuite$$anonfun$13.apply(StateStoreRDDSuite.scala:149)
...
- distributed test *** FAILED ***
  java.io.IOException: Failed to create a temp directory (under /.../spark/sql/core/target/tmp/StateStoreRDDSuite8712796397908632676) after 10 attempts!
  at org.apache.spark.util.Utils$.createDirectory(Utils.scala:295)
```

## How was this patch tested?

Pass the existing tests in StateStoreRDDSuite.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #21446 from dongjoon-hyun/SPARK-19613.
2018-05-29 10:35:30 +08:00
Marco Gaido de01a8d50c [SPARK-24373][SQL] Add AnalysisBarrier to RelationalGroupedDataset's and KeyValueGroupedDataset's child
## What changes were proposed in this pull request?

When we create a `RelationalGroupedDataset` or a `KeyValueGroupedDataset` we set its child to the `logicalPlan` of the `DataFrame` we need to aggregate. Since the `logicalPlan` is already analyzed, we should not analyze it again. But this happens when the new plan of the aggregate is analyzed.

The current behavior in most of the cases is likely to produce no harm, but in other cases re-analyzing an analyzed plan can change it, since the analysis is not idempotent. This can cause issues like the one described in the JIRA (missing to find a cached plan).

The PR adds an `AnalysisBarrier` to the `logicalPlan` which is used as child of `RelationalGroupedDataset` or a `KeyValueGroupedDataset`.

## How was this patch tested?

added UT

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21432 from mgaido91/SPARK-24373.
2018-05-28 12:09:44 +08:00
Li Jin 672209f290 [SPARK-24334] Fix race condition in ArrowPythonRunner causes unclean shutdown of Arrow memory allocator
## What changes were proposed in this pull request?

There is a race condition when closing the Arrow VectorSchemaRoot and Allocator in the writer thread of ArrowPythonRunner.

The race results in a memory-leak exception when closing the allocator. This patch removes the closing routine from the TaskCompletionListener and makes the writer thread responsible for cleaning up the Arrow memory.

This issue can be reproduced by this test:

```
def test_memory_leak(self):
    from pyspark.sql.functions import pandas_udf, col, PandasUDFType, array, lit, explode

    # Have all data in a single executor thread so it can trigger the race condition easier
    with self.sql_conf({'spark.sql.shuffle.partitions': 1}):
        df = self.spark.range(0, 1000)
        df = df.withColumn('id', array([lit(i) for i in range(0, 300)])) \
               .withColumn('id', explode(col('id'))) \
               .withColumn('v', array([lit(i) for i in range(0, 1000)]))

        @pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
        def foo(pdf):
            xxx  # intentionally undefined so the grouped-map UDF fails on the executor
            return pdf

        result = df.groupby('id').apply(foo)

        with QuietTest(self.sc):
            with self.assertRaises(py4j.protocol.Py4JJavaError) as context:
                result.count()
            self.assertTrue('Memory leaked' not in str(context.exception))
```

Note: Because of the race condition, the test case cannot reproduce the issue reliably so it's not added to test cases.

## How was this patch tested?

Because of the race condition, the bug cannot be unit tested easily. So far it has only happened on large amounts of data. This is currently tested manually.

Author: Li Jin <ice.xelloss@gmail.com>

Closes #21397 from icexelloss/SPARK-24334-arrow-memory-leak.
2018-05-28 10:50:17 +08:00
Miles Yucht d440699192 [SPARK-24381][TESTING] Add unit tests for NOT IN subquery around null values
## What changes were proposed in this pull request?
This PR adds several unit tests along the `cols NOT IN (subquery)` pathway. There is a scattering of tests here and there which cover this codepath, but there doesn't seem to be a unified unit test of the correctness of null-aware anti joins anywhere. I have also added a brief explanation of how this expression behaves in SubquerySuite. Lastly, I made some clarifying changes in the NOT IN pathway in RewritePredicateSubquery.
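
A small illustration of the null-aware semantics these tests exercise (hypothetical data): when the subquery result contains a NULL, `a NOT IN (subquery)` can never evaluate to TRUE, so the outer query returns no rows.

```scala
import spark.implicits._

Seq(1, 2).toDF("a").createOrReplaceTempView("t1")
Seq(Some(1), None).toDF("b").createOrReplaceTempView("t2")

// The NULL in t2.b makes `a NOT IN (...)` evaluate to FALSE or NULL for every row of t1,
// so the result is empty.
spark.sql("SELECT * FROM t1 WHERE a NOT IN (SELECT b FROM t2)").show()
```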

## How was this patch tested?
Added unit tests! There should be no behavioral change in this PR.

Author: Miles Yucht <miles@databricks.com>

Closes #21425 from mgyucht/spark-24381.
2018-05-26 20:42:23 -07:00
Maxim Gekk 1b1528a504 [SPARK-24366][SQL] Improving of error messages for type converting
## What changes were proposed in this pull request?

Currently, users are getting the following error messages on type conversions:

```
scala.MatchError: test (of class java.lang.String)
```

The message doesn't give the user any clue about where in the schema the error happened. In this PR, I would like to improve the error message like:

```
The value (test) of the type (java.lang.String) cannot be converted to struct<f1:int>
```

## How was this patch tested?

Added tests for converting of wrong values to `struct`, `map`, `array`, `string` and `decimal`.

Author: Maxim Gekk <maxim.gekk@databricks.com>

Closes #21410 from MaxGekk/type-conv-error.
2018-05-25 15:42:46 -07:00
Maxim Gekk 64fad0b519 [SPARK-24244][SPARK-24368][SQL] Passing only required columns to the CSV parser
## What changes were proposed in this pull request?

The uniVocity parser allows specifying only the required column names or indexes for [parsing](https://www.univocity.com/pages/parsers-tutorial), like:

```
// Here we select only the columns by their indexes.
// The parser just skips the values in other columns
parserSettings.selectIndexes(4, 0, 1);
CsvParser parser = new CsvParser(parserSettings);
```
In this PR, I propose to extract indexes from the required schema and pass them to the CSV parser. Benchmarks show the following improvements in parsing of 1000 columns:

```
Select 100 columns out of 1000: x1.76
Select 1 column out of 1000: x2
```

**Note**: Compared to the current implementation, the changes can return different results for malformed rows in the `DROPMALFORMED` and `FAILFAST` modes if only a subset of all columns is requested. To keep the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to `false`.
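
For example, the previous behavior can be restored with (conf name taken from the note above):

```scala
spark.conf.set("spark.sql.csv.parser.columnPruning.enabled", false)
```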

## How was this patch tested?

It was tested by a new test which selects 3 columns out of 15, by existing tests, and by new benchmarks.

Author: Maxim Gekk <maxim.gekk@databricks.com>
Author: Maxim Gekk <max.gekk@gmail.com>

Closes #21415 from MaxGekk/csv-column-pruning2.
2018-05-24 21:38:04 -07:00
Gengliang Wang 3b20b34ab7 [SPARK-24367][SQL] Parquet: use JOB_SUMMARY_LEVEL instead of deprecated flag ENABLE_JOB_SUMMARY
## What changes were proposed in this pull request?

In the current Parquet version, the conf ENABLE_JOB_SUMMARY is deprecated.

When writing to Parquet files, the warning message
```WARN org.apache.parquet.hadoop.ParquetOutputFormat: Setting parquet.enable.summary-metadata is deprecated, please use parquet.summary.metadata.level```
keeps showing up.

From https://github.com/apache/parquet-mr/blame/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java#L164 we can see that we should use JOB_SUMMARY_LEVEL.
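
A hedged sketch of setting the non-deprecated property explicitly when writing (property name taken from the warning above; the accepted values in parquet-mr include `ALL`, `COMMON_ONLY`, and `NONE`):

```scala
// Hypothetical DataFrame `df` and output path; setting the level explicitly avoids
// relying on the deprecated parquet.enable.summary-metadata flag.
spark.sparkContext.hadoopConfiguration.set("parquet.summary.metadata.level", "NONE")
df.write.parquet("/tmp/parquet-out")
```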

## How was this patch tested?

Unit test

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21411 from gengliangwang/summaryLevel.
2018-05-25 11:16:35 +08:00
Jose Torres 0fd68cb727 [SPARK-24234][SS] Support multiple row writers in continuous processing shuffle reader.
## What changes were proposed in this pull request?

https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit#heading=h.8t3ci57f7uii

Support multiple different row writers in continuous processing shuffle reader.

Note that having multiple read-side buffers ended up being the natural way to do this. Otherwise it's hard to express the constraint of sending an epoch marker only when all writers have sent one.

## How was this patch tested?

new unit tests

Author: Jose Torres <torres.joseph.f+github@gmail.com>

Closes #21385 from jose-torres/multipleWrite.
2018-05-24 17:08:52 -07:00
Yuming Wang 0d89943449 [SPARK-24378][SQL] Fix date_trunc function incorrect examples
## What changes were proposed in this pull request?

Fix the incorrect examples for the `date_trunc` function.
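
The corrected usage shape is `date_trunc(fmt, timestamp)`, for example (hypothetical input value):

```scala
spark.sql("SELECT date_trunc('YEAR', '2015-03-05T09:32:05.359')").show()
// 2015-01-01 00:00:00
```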

## How was this patch tested?

N/A

Author: Yuming Wang <yumwang@ebay.com>

Closes #21423 from wangyum/SPARK-24378.
2018-05-24 23:38:50 +08:00
Maxim Gekk 13bedc05c2 [SPARK-24329][SQL] Test for skipping multi-space lines
## What changes were proposed in this pull request?

The PR is a continuation of https://github.com/apache/spark/pull/21380 . It checks cases that are handled by the code:
e3de6ab30d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala (L303-L304)

Basically the code skips lines consisting only of whitespace, as well as lines with comments (see [filterCommentAndEmpty](e3de6ab30d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala (L47)))

```scala
   iter.filter { line =>
      line.trim.nonEmpty && !line.startsWith(options.comment.toString)
    }
```

Closes #21380

## How was this patch tested?

Added a test for the case described above.

Author: Maxim Gekk <maxim.gekk@databricks.com>
Author: Maxim Gekk <max.gekk@gmail.com>

Closes #21394 from MaxGekk/test-for-multi-space-lines.
2018-05-24 22:18:58 +08:00
Ryan Blue 3469f5c989 [SPARK-24230][SQL] Fix SpecificParquetRecordReaderBase with dictionary filters.
## What changes were proposed in this pull request?

I missed this commit when preparing #21070.

When Parquet is able to filter blocks with dictionary filtering, the expected total value count in Spark was too high, leading to an error when there were fewer row groups to process than expected. Spark should get the row groups from Parquet so it picks up new filter schemes in Parquet, such as dictionary filtering.

## How was this patch tested?

Using in production at Netflix. Added test case for dictionary-filtered blocks.

Author: Ryan Blue <blue@apache.org>

Closes #21295 from rdblue/SPARK-24230-fix-parquet-block-tracking.
2018-05-24 20:55:26 +08:00
hyukjinkwon 8a545822d0 [SPARK-24364][SS] Prevent InMemoryFileIndex from failing if file path doesn't exist
## What changes were proposed in this pull request?

This PR proposes to follow up https://github.com/apache/spark/pull/15153 and complete SPARK-17599.

`FileSystem` operation (`fs.getFileBlockLocations`) can still fail if the file path does not exist. For example see the exception message below:

```
Error occurred while processing: File does not exist: /rel/00171151/input/PJ/part-00136-b6403bac-a240-44f8-a792-fc2e174682b7-c000.csv
...
java.io.FileNotFoundException: File does not exist: /rel/00171151/input/PJ/part-00136-b6403bac-a240-44f8-a792-fc2e174682b7-c000.csv
...
org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:249)
	at org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:229)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$listLeafFiles$3.apply(InMemoryFileIndex.scala:314)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$listLeafFiles$3.apply(InMemoryFileIndex.scala:297)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$.org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$listLeafFiles(InMemoryFileIndex.scala:297)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$bulkListLeafFiles$1.apply(InMemoryFileIndex.scala:174)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$bulkListLeafFiles$1.apply(InMemoryFileIndex.scala:173)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$.org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$bulkListLeafFiles(InMemoryFileIndex.scala:173)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.listLeafFiles(InMemoryFileIndex.scala:126)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.refresh0(InMemoryFileIndex.scala:91)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(InMemoryFileIndex.scala:67)
	at org.apache.spark.sql.execution.datasources.DataSource.tempFileIndex$lzycompute$1(DataSource.scala:161)
	at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$tempFileIndex$1(DataSource.scala:152)
	at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:166)
	at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:261)
	at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:94)
	at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:94)
	at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:33)
	at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:196)
	at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:206)
	at com.hwx.StreamTest$.main(StreamTest.scala:97)
	at com.hwx.StreamTest.main(StreamTest.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:906)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /rel/00171151/input/PJ/part-00136-b6403bac-a240-44f8-a792-fc2e174682b7-c000.csv
...
```

So, this PR changes it to log a warning instead.

## How was this patch tested?

It's hard to write a test. Manually tested multiple times.

Author: hyukjinkwon <gurwls223@apache.org>

Closes #21408 from HyukjinKwon/missing-files.
2018-05-24 13:21:02 +08:00
Dongjoon Hyun 486ecc680e [SPARK-24322][BUILD] Upgrade Apache ORC to 1.4.4
## What changes were proposed in this pull request?

ORC 1.4.4 includes [nine fixes](https://issues.apache.org/jira/issues/?filter=12342568&jql=project%20%3D%20ORC%20AND%20resolution%20%3D%20Fixed%20AND%20fixVersion%20%3D%201.4.4). One of the issues is about a `Timestamp` bug (ORC-306) which occurs when the `native` ORC vectorized reader reads the ORC column vector's sub-vectors `times` and `nanos`. ORC-306 fixes this according to the [original definition](https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/TimestampColumnVector.java#L45-L46), and this PR includes the updated interpretation of ORC column vectors. Note that the `hive` ORC reader and the ORC MR reader are not affected.

```scala
scala> spark.version
res0: String = 2.3.0
scala> spark.sql("set spark.sql.orc.impl=native")
scala> Seq(java.sql.Timestamp.valueOf("1900-05-05 12:34:56.000789")).toDF().write.orc("/tmp/orc")
scala> spark.read.orc("/tmp/orc").show(false)
+--------------------------+
|value                     |
+--------------------------+
|1900-05-05 12:34:55.000789|
+--------------------------+
```

This PR aims to update Apache Spark to use it.

**FULL LIST**

ID | TITLE
-- | --
ORC-281 | Fix compiler warnings from clang 5.0
ORC-301 | `extractFileTail` should open a file in `try` statement
ORC-304 | Fix TestRecordReaderImpl to not fail with new storage-api
ORC-306 | Fix incorrect workaround for bug in java.sql.Timestamp
ORC-324 | Add support for ARM and PPC arch
ORC-330 | Remove unnecessary Hive artifacts from root pom
ORC-332 | Add syntax version to orc_proto.proto
ORC-336 | Remove avro and parquet dependency management entries
ORC-360 | Implement error checking on subtype fields in Java

## How was this patch tested?

Pass the Jenkins.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #21372 from dongjoon-hyun/SPARK_ORC144.
2018-05-24 11:34:13 +08:00
sychen 888340151f [SPARK-24257][SQL] LongToUnsafeRowMap calculate the new size may be wrong
LongToUnsafeRowMap has a mistake when growing its page array: it blindly grows to `oldSize * 2`, while the new record may be larger than `oldSize * 2`. Then we may have a malformed UnsafeRow when querying this map, whose actual data is smaller than its declared size, and the data is corrupted.
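
A schematic of the fix, with illustrative names (not the actual `LongToUnsafeRowMap` code): grow the page to at least the size the new record needs, rather than blindly doubling once.

```scala
// Illustrative only: keep doubling until the new record actually fits.
def nextCapacity(oldSize: Long, used: Long, recordSize: Long): Long = {
  var newSize = oldSize * 2
  while (newSize < used + recordSize) newSize *= 2
  newSize
}
```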

Author: sychen <sychen@ctrip.com>

Closes #21311 from cxzl25/fix_LongToUnsafeRowMap_page_size.
2018-05-24 11:18:07 +08:00
Vayda, Oleksandr: IT (PRG) 230f144197 [SPARK-24350][SQL] Fixes ClassCastException in the "array_position" function
## What changes were proposed in this pull request?

### Fixes `ClassCastException` in the `array_position` function - [SPARK-24350](https://issues.apache.org/jira/browse/SPARK-24350)
When calling the `array_position` function with a wrong type for the 1st argument, an `AnalysisException` should be thrown instead of a `ClassCastException`.

Example:

```sql
select array_position('foo', 'bar')
```

```
java.lang.ClassCastException: org.apache.spark.sql.types.StringType$ cannot be cast to org.apache.spark.sql.types.ArrayType
	at org.apache.spark.sql.catalyst.expressions.ArrayPosition.inputTypes(collectionOperations.scala:1398)
	at org.apache.spark.sql.catalyst.expressions.ExpectsInputTypes$class.checkInputDataTypes(ExpectsInputTypes.scala:44)
	at org.apache.spark.sql.catalyst.expressions.ArrayPosition.checkInputDataTypes(collectionOperations.scala:1401)
	at org.apache.spark.sql.catalyst.expressions.Expression.resolved$lzycompute(Expression.scala:168)
	at org.apache.spark.sql.catalyst.expressions.Expression.resolved(Expression.scala:168)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAliases$$anonfun$org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveAliases$$assignAliases$1$$anonfun$apply$3.applyOrElse(Analyzer.scala:256)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAliases$$anonfun$org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveAliases$$assignAliases$1$$anonfun$apply$3.applyOrElse(Analyzer.scala:252)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
```

## How was this patch tested?

unit test

Author: Vayda, Oleksandr: IT (PRG) <Oleksandr.Vayda@barclayscapital.com>

Closes #21401 from wajda/SPARK-24350-array_position-error-fix.
2018-05-23 17:22:52 -07:00
Jose Torres f457933293 [SPARK-23416][SS] Add a specific stop method for ContinuousExecution.
## What changes were proposed in this pull request?

Add a specific stop method for ContinuousExecution. The previous StreamExecution.stop() method had a race condition as applied to continuous processing: if the cancellation was round-tripped to the driver too quickly, the generic SparkException it caused would be reported as the query death cause. We earlier decided that SparkException should not be added to the StreamExecution.isInterruptionException() whitelist, so we need to ensure this never happens instead.

## How was this patch tested?

Existing tests. I could consistently reproduce the previous flakiness by putting Thread.sleep(1000) between the first job cancellation and thread interruption in StreamExecution.stop().

Author: Jose Torres <torres.joseph.f+github@gmail.com>

Closes #21384 from jose-torres/fixKafka.
2018-05-23 17:21:29 -07:00
jinxing b7a036b75b [SPARK-24294] Throw SparkException when OOM in BroadcastExchangeExec
## What changes were proposed in this pull request?

When an OutOfMemoryError is thrown from BroadcastExchangeExec, scala.concurrent.Future hits the Scala bug https://github.com/scala/bug/issues/9554 and hangs until the future times out:

We could wrap the OOM inside a SparkException to resolve this issue.
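
A simplified sketch of the workaround (the helper name below is hypothetical): rethrow the `OutOfMemoryError` as a `SparkException` so the `Future` fails promptly instead of hanging on the Scala bug referenced above.

```scala
import org.apache.spark.SparkException

// Hypothetical helper standing in for the relation-building work that may OOM.
def buildAndBroadcastRelation(): AnyRef = throw new OutOfMemoryError("Java heap space")

try {
  buildAndBroadcastRelation()
} catch {
  case oom: OutOfMemoryError =>
    // Rethrow as SparkException so the enclosing Future completes with a failure.
    throw new SparkException("Not enough memory to build and broadcast the table", oom)
}
```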

## How was this patch tested?

Manually tested.

Author: jinxing <jinxing6042@126.com>

Closes #21342 from jinxing64/SPARK-24294.
2018-05-23 13:12:05 -07:00
Takeshi Yamamuro 84557bc9f8 [SPARK-24206][SQL] Improve DataSource read benchmark code
## What changes were proposed in this pull request?
This PR added benchmark code `DataSourceReadBenchmark` for `orc`, `parquet`, `csv`, and `json` based on the existing `ParquetReadBenchmark` and `OrcReadBenchmark`.

## How was this patch tested?
N/A

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #21266 from maropu/DataSourceReadBenchmark.
2018-05-23 13:02:32 -07:00
Xiao Li 5a5a868dc4 Revert "[SPARK-24244][SQL] Passing only required columns to the CSV parser"
This reverts commit 8086acc2f6.
2018-05-23 11:51:13 -07:00
Liang-Chi Hsieh a40ffc656d [SPARK-23711][SQL] Add fallback generator for UnsafeProjection
## What changes were proposed in this pull request?

Add fallback logic for `UnsafeProjection`. In production we first try to create the unsafe projection using the codegen implementation. If any compile error happens, it falls back to the interpreted implementation.
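
A simplified sketch of the fallback idea (generic, not the actual factory API): try the codegen path first and fall back to the interpreted implementation only when compilation fails.

```scala
// Illustrative only: `codegen` and `interpreted` are by-name factories for the two paths.
def createWithFallback[T](codegen: => T, interpreted: => T): T =
  try codegen catch { case _: Exception => interpreted }
```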

## How was this patch tested?

Added test.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #21106 from viirya/SPARK-23711.
2018-05-23 22:40:52 +08:00
Seth Fitzsimmons 00c13cfad7 Correct reference to Offset class
This is a documentation-only correction; `org.apache.spark.sql.sources.v2.reader.Offset` is actually `org.apache.spark.sql.sources.v2.reader.streaming.Offset`.

Author: Seth Fitzsimmons <seth@mojodna.net>

Closes #21387 from mojodna/patch-1.
2018-05-23 09:14:03 +08:00