## What changes were proposed in this pull request?
Union of map and other compatible column result in unresolved operator 'Union; exception
Reproduction
`spark-sql>select map(1,2), 'str' union all select map(1,2,3,null), 1`
Output:
```
Error in query: unresolved operator 'Union;;
'Union
:- Project [map(1, 2) AS map(1, 2)#106, str AS str#107]
: +- OneRowRelation$
+- Project [map(1, cast(2 as int), 3, cast(null as int)) AS map(1, CAST(2 AS INT), 3, CAST(NULL AS INT))#109, 1 AS 1#108]
+- OneRowRelation$
```
So, we should cast part of columns to be compatible when appropriate.
## How was this patch tested?
Added a test (query union of map and other columns) to SQLQueryTestSuite's union.sql.
Author: liutang123 <liutang123@yeah.net>
Closes#21100 from liutang123/SPARK-24012.
## What changes were proposed in this pull request?
Refactor continuous writing to its own class.
See WIP https://github.com/jose-torres/spark/pull/13 for the overall direction this is going, but I think this PR is very isolated and necessary anyway.
## How was this patch tested?
existing unit tests - refactoring only
Author: Jose Torres <torres.joseph.f+github@gmail.com>
Closes#21116 from jose-torres/SPARK-24038.
## What changes were proposed in this pull request?
A structured streaming query with a streaming aggregation can throw the following error in rare cases.
```
java.lang.IllegalStateException: Cannot commit after already committed or aborted
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$verify(HDFSBackedStateStoreProvider.scala:643)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:135)
at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3$$anon$2$$anonfun$hasNext$2.apply$mcV$sp(statefulOperators.scala:359)
at org.apache.spark.sql.execution.streaming.StateStoreWriter$class.timeTakenMs(statefulOperators.scala:102)
at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.timeTakenMs(statefulOperators.scala:251)
at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3$$anon$2.hasNext(statefulOperators.scala:359)
at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:188)
at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:78)
at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:114)
at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:105)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:830)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:830)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:42)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:336)
```
This can happen when the following conditions are accidentally hit.
- Streaming aggregation with aggregation function that is a subset of [`TypedImperativeAggregation`](76b8b840dd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala (L473)) (for example, `collect_set`, `collect_list`, `percentile`, etc.).
- Query running in `update}` mode
- After the shuffle, a partition has exactly 128 records.
This causes StateStore.commit to be called twice. See the [JIRA](https://issues.apache.org/jira/browse/SPARK-23004) for a more detailed explanation. The solution is to use `NextIterator` or `CompletionIterator`, each of which has a flag to prevent the "onCompletion" task from being called more than once. In this PR, I chose to implement using `NextIterator`.
## How was this patch tested?
Added unit test that I have confirm will fail without the fix.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#21124 from tdas/SPARK-23004.
## What changes were proposed in this pull request?
A followup of https://github.com/apache/spark/pull/20988
`PhysicalOperation` can collect Project and Filters over a certain plan and substitute the alias with the original attributes in the bottom plan. We can use it in `OptimizeMetadataOnlyQuery` rule to handle the Project and Filter over partitioned relation.
## How was this patch tested?
existing test
Author: Wenchen Fan <wenchen@databricks.com>
Closes#21111 from cloud-fan/refactor.
>What changes were proposed in this pull request?
During evaluation of IN conditions, if the source data frame, is represented by a plan, that uses hive table with columns, which were previously analysed, and the plan has conditions for these fields, that cannot be satisfied (which leads us to an empty data frame), FilterEstimation.evaluateInSet method produces NumberFormatException and ClassCastException.
In order to fix this bug, method FilterEstimation.evaluateInSet at first checks, if distinct count is not zero, and also checks if colStat.min and colStat.max are defined, and only in this case proceeds with the calculation. If at least one of the conditions is not satisfied, zero is returned.
>How was this patch tested?
In order to test the PR two tests were implemented: one in FilterEstimationSuite, that tests the plan with the statistics that violates the conditions mentioned above, and another one in StatisticsCollectionSuite, that test the whole process of analysis/optimisation of the query, that leads to the problems, mentioned in the first section.
Author: Mykhailo Shtelma <mykhailo.shtelma@bearingpoint.com>
Author: smikesh <mshtelma@gmail.com>
Closes#21052 from mshtelma/filter_estimation_evaluateInSet_Bugs.
## What changes were proposed in this pull request?
When the OffsetWindowFunction's frame is `UnaryMinus(Literal(1))` but the specified window frame has been simplified to `Literal(-1)` by some optimizer rules e.g., `ConstantFolding`. Thus, they do not match and cause the following error:
```
org.apache.spark.sql.AnalysisException: Window Frame specifiedwindowframe(RowFrame, -1, -1) must match the required frame specifiedwindowframe(RowFrame, -1, -1);
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:41)
at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:91)
at
```
## How was this patch tested?
Added a test
Author: gatorsmile <gatorsmile@gmail.com>
Closes#21115 from gatorsmile/fixLag.
## What changes were proposed in this pull request?
This updates the OptimizeMetadataOnlyQuery rule to use filter expressions when listing partitions, if there are filter nodes in the logical plan. This avoids listing all partitions for large tables on the driver.
This also fixes a minor bug where the partitions returned from fsRelation cannot be serialized without hitting a stack level too deep error. This is caused by serializing a stream to executors, where the stream is a recursive structure. If the stream is too long, the serialization stack reaches the maximum level of depth. The fix is to create a LocalRelation using an Array instead of the incoming Seq.
## How was this patch tested?
Existing tests for metadata-only queries.
Author: Ryan Blue <blue@apache.org>
Closes#20988 from rdblue/SPARK-23877-metadata-only-push-filters.
## What changes were proposed in this pull request?
Improving the test coverage of window functions focusing on missing test for window aggregate functions. No new UDAF test is added as it has been tested already.
## How was this patch tested?
Only new tests were added, automated tests were executed.
Author: “attilapiros” <piros.attila.zsolt@gmail.com>
Author: Attila Zsolt Piros <2017933+attilapiros@users.noreply.github.com>
Closes#20046 from attilapiros/SPARK-22362.
## What changes were proposed in this pull request?
In Spark SQL, we usually reuse the `UnsafeRow` instance and need to copy the data when a place buffers non-serialized objects.
Shuffle may buffer objects if we don't make it to the bypass merge shuffle or unsafe shuffle.
`ShuffleExchangeExec.needToCopyObjectsBeforeShuffle` misses the case that, if `spark.sql.shuffle.partitions` is large enough, we could fail to run unsafe shuffle and go with the non-serialized shuffle.
This bug is very hard to hit since users wouldn't set such a large number of partitions(16 million) for Spark SQL exchange.
TODO: test
## How was this patch tested?
todo.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#21101 from cloud-fan/shuffle.
## What changes were proposed in this pull request?
The PR adds the SQL function `element_at`. The behavior of the function is based on Presto's one.
This function returns element of array at given index in value if column is array, or returns value for the given key in value if column is map.
## How was this patch tested?
Added UTs
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes#21053 from kiszk/SPARK-23924.
## What changes were proposed in this pull request?
The PR adds the SQL function `array_position`. The behavior of the function is based on Presto's one.
The function returns the position of the first occurrence of the element in array x (or 0 if not found) using 1-based index as BigInt.
## How was this patch tested?
Added UTs
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes#21037 from kiszk/SPARK-23919.
## What changes were proposed in this pull request?
DataFrameRangeSuite.test("Cancelling stage in a query with Range.") stays sometimes in an infinite loop and times out the build.
There were multiple issues with the test:
1. The first valid stageId is zero when the test started alone and not in a suite and the following code waits until timeout:
```
eventually(timeout(10.seconds), interval(1.millis)) {
assert(DataFrameRangeSuite.stageToKill > 0)
}
```
2. The `DataFrameRangeSuite.stageToKill` was overwritten by the task's thread after the reset which ended up in canceling the same stage 2 times. This caused the infinite wait.
This PR solves this mentioned flakyness by removing the shared `DataFrameRangeSuite.stageToKill` and using `wait` and `CountDownLatch` for synhronization.
## How was this patch tested?
Existing unit test.
Author: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Closes#20888 from gaborgsomogyi/SPARK-23775.
## What changes were proposed in this pull request?
```
Py4JJavaError: An error occurred while calling o153.sql.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:223)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:189)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
at org.apache.spark.sql.Dataset$$anonfun$59.apply(Dataset.scala:3021)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:89)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:127)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3020)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:190)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:646)
at sun.reflect.GeneratedMethodAccessor153.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:293)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:226)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Exception thrown in Future.get:
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:190)
at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:267)
at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.doConsume(BroadcastNestedLoopJoinExec.scala:530)
at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:155)
at org.apache.spark.sql.execution.ProjectExec.consume(basicPhysicalOperators.scala:37)
at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:69)
at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:155)
at org.apache.spark.sql.execution.FilterExec.consume(basicPhysicalOperators.scala:144)
...
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
... 23 more
Caused by: java.util.concurrent.ExecutionException: org.apache.spark.SparkException: Task not serializable
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:206)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:179)
... 276 more
Caused by: org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:340)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:330)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2380)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:850)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:849)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:371)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:849)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:417)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:123)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:118)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$3.apply(SparkPlan.scala:152)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:149)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:118)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:89)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:125)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:116)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:116)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:123)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:118)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$3.apply(SparkPlan.scala:152)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:149)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:118)
at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:271)
at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:181)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:414)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:123)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:118)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$3.apply(SparkPlan.scala:152)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:149)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:118)
at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:61)
at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:70)
at org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:264)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anon$1$$anonfun$call$1.apply(BroadcastExchangeExec.scala:93)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anon$1$$anonfun$call$1.apply(BroadcastExchangeExec.scala:81)
at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:150)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anon$1.call(BroadcastExchangeExec.scala:80)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anon$1.call(BroadcastExchangeExec.scala:76)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: java.nio.BufferUnderflowException
at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:151)
at java.nio.ByteBuffer.get(ByteBuffer.java:715)
at org.apache.parquet.io.api.Binary$ByteBufferBackedBinary.getBytes(Binary.java:405)
at org.apache.parquet.io.api.Binary$ByteBufferBackedBinary.getBytesUnsafe(Binary.java:414)
at org.apache.parquet.io.api.Binary$ByteBufferBackedBinary.writeObject(Binary.java:484)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1128)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
```
The Parquet filters are serializable but not thread safe. SparkPlan.prepare() could be called in different threads (BroadcastExchange will call it in a thread pool). Thus, we could serialize the same Parquet filter at the same time. This is not easily reproduced. The fix is to avoid serializing these Parquet filters in the driver. This PR is to avoid serializing these Parquet filters by moving the parquet filter generation from the driver to executors.
## How was this patch tested?
Having two queries one is a 1000-line SQL query and a 3000-line SQL query. Need to run at least one hour with a heavy write workload to reproduce once.
Author: gatorsmile <gatorsmile@gmail.com>
Closes#21086 from gatorsmile/taskNotSerializable.
## What changes were proposed in this pull request?
Each data source implementation can define its own options and teach its users how to set them. Spark doesn't have any restrictions about what options a data source should or should not have. It's possible that some options are very common and many data sources use them. However different data sources may define the common options(key and meaning) differently, which is quite confusing to end users.
This PR defines some standard options that data sources can optionally adopt: path, table and database.
## How was this patch tested?
a new test case.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#20535 from cloud-fan/options.
## What changes were proposed in this pull request?
There was no check on nullability for arguments of `Tuple`s. This could lead to have weird behavior when a null value had to be deserialized into a non-nullable Scala object: in those cases, the `null` got silently transformed in a valid value (like `-1` for `Int`), corresponding to the default value we are using in the SQL codebase. This situation was very likely to happen when deserializing to a Tuple of primitive Scala types (like Double, Int, ...).
The PR adds the `AssertNotNull` to arguments of tuples which have been asked to be converted to non-nullable types.
## How was this patch tested?
added UT
Author: Marco Gaido <marcogaido91@gmail.com>
Closes#20976 from mgaido91/SPARK-23835.
## What changes were proposed in this pull request?
Unit tests for EpochCoordinator that test correct sequencing of committed epochs. Several tests are ignored since they test functionality implemented in SPARK-23503 which is not yet merged, otherwise they fail.
Author: Efim Poberezkin <efim@poberezkin.ru>
Closes#20983 from efimpoberezkin/pr/EpochCoordinator-tests.
## What changes were proposed in this pull request?
Add a memory source for continuous processing.
Note that only one of the ContinuousSuite tests is migrated to minimize the diff here. I'll submit a second PR for SPARK-23688 to change the rest and get rid of waitForRateSourceTriggers.
## How was this patch tested?
unit test
Author: Jose Torres <torres.joseph.f+github@gmail.com>
Closes#20828 from jose-torres/continuousMemory.
## What changes were proposed in this pull request?
The PR adds the SQL function `array_min`. It takes an array as argument and returns the minimum value in it.
## How was this patch tested?
added UTs
Author: Marco Gaido <marcogaido91@gmail.com>
Closes#21025 from mgaido91/SPARK-23918.
## What changes were proposed in this pull request?
The PR adds the SQL function `array_max`. It takes an array as argument and returns the maximum value in it.
## How was this patch tested?
added UTs
Author: Marco Gaido <marcogaido91@gmail.com>
Closes#21024 from mgaido91/SPARK-23917.
## What changes were proposed in this pull request?
Checkpoint files (offset log files, state store files) in Structured Streaming must be written atomically such that no partial files are generated (would break fault-tolerance guarantees). Currently, there are 3 locations which try to do this individually, and in some cases, incorrectly.
1. HDFSOffsetMetadataLog - This uses a FileManager interface to use any implementation of `FileSystem` or `FileContext` APIs. It preferably loads `FileContext` implementation as FileContext of HDFS has atomic renames.
1. HDFSBackedStateStore (aka in-memory state store)
- Writing a version.delta file - This uses FileSystem APIs only to perform a rename. This is incorrect as rename is not atomic in HDFS FileSystem implementation.
- Writing a snapshot file - Same as above.
#### Current problems:
1. State Store behavior is incorrect - HDFS FileSystem implementation does not have atomic rename.
1. Inflexible - Some file systems provide mechanisms other than write-to-temp-file-and-rename for writing atomically and more efficiently. For example, with S3 you can write directly to the final file and it will be made visible only when the entire file is written and closed correctly. Any failure can be made to terminate the writing without making any partial files visible in S3. The current code does not abstract out this mechanism enough that it can be customized.
#### Solution:
1. Introduce a common interface that all 3 cases above can use to write checkpoint files atomically.
2. This interface must provide the necessary interfaces that allow customization of the write-and-rename mechanism.
This PR does that by introducing the interface `CheckpointFileManager` and modifying `HDFSMetadataLog` and `HDFSBackedStateStore` to use the interface. Similar to earlier `FileManager`, there are implementations based on `FileSystem` and `FileContext` APIs, and the latter implementation is preferred to make it work correctly with HDFS.
The key method this interface has is `createAtomic(path, overwrite)` which returns a `CancellableFSDataOutputStream` that has the method `cancel()`. All users of this method need to either call `close()` to successfully write the file, or `cancel()` in case of an error.
## How was this patch tested?
New tests in `CheckpointFileManagerSuite` and slightly modified existing tests.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#21048 from tdas/SPARK-23966.
## What changes were proposed in this pull request?
Currently `PartitioningAwareFileIndex` accepts an optional parameter `userPartitionSchema`. If provided, it will combine the inferred partition schema with the parameter.
However,
1. to get `userPartitionSchema`, we need to combine inferred partition schema with `userSpecifiedSchema`
2. to get the inferred partition schema, we have to create a temporary file index.
Only after that, a final version of `PartitioningAwareFileIndex` can be created.
This can be improved by passing `userSpecifiedSchema` to `PartitioningAwareFileIndex`.
With the improvement, we can reduce redundant code and avoid parsing the file partition twice.
## How was this patch tested?
Unit test
Author: Gengliang Wang <gengliang.wang@databricks.com>
Closes#21004 from gengliangwang/PartitioningAwareFileIndex.
## What changes were proposed in this pull request?
Add UDF weekday
## How was this patch tested?
A new test
Author: yucai <yyu1@ebay.com>
Closes#21009 from yucai/SPARK-23905.
## What changes were proposed in this pull request?
Many suites currently leak Spark sessions (sometimes with stopped SparkContexts) via the thread-local active Spark session and default Spark session. We should attempt to clean these up and detect when this happens to improve the reproducibility of tests.
## How was this patch tested?
Existing tests
Author: Eric Liang <ekl@databricks.com>
Closes#21058 from ericl/clear-session.
## What changes were proposed in this pull request?
This PR proposes to add `collect` to a query executor as an action.
Seems `collect` / `collect` with Arrow are not recognised via `QueryExecutionListener` as an action. For example, if we have a custom listener as below:
```scala
package org.apache.spark.sql
import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener
class TestQueryExecutionListener extends QueryExecutionListener with Logging {
override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
logError("Look at me! I'm 'onSuccess'")
}
override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { }
}
```
and set `spark.sql.queryExecutionListeners` to `org.apache.spark.sql.TestQueryExecutionListener`
Other operations in PySpark or Scala side seems fine:
```python
>>> sql("SELECT * FROM range(1)").show()
```
```
18/04/09 17:02:04 ERROR TestQueryExecutionListener: Look at me! I'm 'onSuccess'
+---+
| id|
+---+
| 0|
+---+
```
```scala
scala> sql("SELECT * FROM range(1)").collect()
```
```
18/04/09 16:58:41 ERROR TestQueryExecutionListener: Look at me! I'm 'onSuccess'
res1: Array[org.apache.spark.sql.Row] = Array([0])
```
but ..
**Before**
```python
>>> sql("SELECT * FROM range(1)").collect()
```
```
[Row(id=0)]
```
```python
>>> spark.conf.set("spark.sql.execution.arrow.enabled", "true")
>>> sql("SELECT * FROM range(1)").toPandas()
```
```
id
0 0
```
**After**
```python
>>> sql("SELECT * FROM range(1)").collect()
```
```
18/04/09 16:57:58 ERROR TestQueryExecutionListener: Look at me! I'm 'onSuccess'
[Row(id=0)]
```
```python
>>> spark.conf.set("spark.sql.execution.arrow.enabled", "true")
>>> sql("SELECT * FROM range(1)").toPandas()
```
```
18/04/09 17:53:26 ERROR TestQueryExecutionListener: Look at me! I'm 'onSuccess'
id
0 0
```
## How was this patch tested?
I have manually tested as described above and unit test was added.
Author: hyukjinkwon <gurwls223@apache.org>
Closes#21007 from HyukjinKwon/SPARK-23942.
## What changes were proposed in this pull request?
Current SS continuous doesn't support processing on temp table or `df.as("xxx")`, SS will throw an exception as LogicalPlan not supported, details described in [here](https://issues.apache.org/jira/browse/SPARK-23748).
So here propose to add this support.
## How was this patch tested?
new UT.
Author: jerryshao <sshao@hortonworks.com>
Closes#21017 from jerryshao/SPARK-23748.
SQLMetricsTestUtils.currentExecutionIds() was racing with the listener
bus, which lead to some flaky tests. We should wait till the listener bus is
empty.
I tested by adding some Thread.sleep()s in SQLAppStatusListener, which
reproduced the exceptions I saw on Jenkins. With this change, they went
away.
Author: Imran Rashid <irashid@cloudera.com>
Closes#21041 from squito/SPARK-23962.
## What changes were proposed in this pull request?
Mark `HashAggregateExec.bufVars` as transient to avoid it from being serialized.
Also manually null out this field at the end of `doProduceWithoutKeys()` to shorten its lifecycle, because it'll no longer be used after that.
## How was this patch tested?
Existing tests.
Author: Kris Mok <kris.mok@databricks.com>
Closes#21039 from rednaxelafx/codegen-improve.
## What changes were proposed in this pull request?
This PR slightly refactors the newly added `ExprValue` API by quite a bit. The following changes are introduced:
1. `ExprValue` now uses the actual class instead of the class name as its type. This should give some more flexibility with generating code in the future.
2. Renamed `StatementValue` to `SimpleExprValue`. The statement concept is broader then an expression (untyped and it cannot be on the right hand side of an assignment), and this was not really what we were using it for. I have added a top level `JavaCode` trait that can be used in the future to reinstate (no pun intended) a statement a-like code fragment.
3. Added factory methods to the `JavaCode` companion object to make it slightly less verbose to create `JavaCode`/`ExprValue` objects. This is also what makes the diff quite large.
4. Added one more factory method to `ExprCode` to make it easier to create code-less expressions.
## How was this patch tested?
Existing tests.
Author: Herman van Hovell <hvanhovell@databricks.com>
Closes#21026 from hvanhovell/SPARK-23951.
## What changes were proposed in this pull request?
In the PR #20886, I mistakenly check the table location only when `ignoreIfExists` is false, which was following the original deprecated PR.
That was wrong. When `ignoreIfExists` is true and the target table doesn't exist, we should also check the table location. In other word, **`ignoreIfExists` has nothing to do with table location validation**.
This is a follow-up PR to fix the mistake.
## How was this patch tested?
Add one unit test.
Author: Gengliang Wang <gengliang.wang@databricks.com>
Closes#21001 from gengliangwang/SPARK-19724-followup.
## What changes were proposed in this pull request?
The codegen output of `Expression`, aka `ExprCode`, now encapsulates only strings of output value (`value`) and nullability (`isNull`). It makes difficulty for us to know what the output really is. I think it is better if we can add wrappers for the value and nullability that let us to easily know that.
## How was this patch tested?
Existing tests.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#20043 from viirya/SPARK-22856.
## What changes were proposed in this pull request?
This PR avoids possible overflow at an operation `long = (long)(int * int)`. The multiplication of large positive integer values may set one to MSB. This leads to a negative value in long while we expected a positive value (e.g. `0111_0000_0000_0000 * 0000_0000_0000_0010`).
This PR performs long cast before the multiplication to avoid this situation.
## How was this patch tested?
Existing UTs
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes#21002 from kiszk/SPARK-23893.
## What changes were proposed in this pull request?
Proposed tests checks that only subset of input dataset is touched during schema inferring.
Author: Maxim Gekk <maxim.gekk@databricks.com>
Closes#20963 from MaxGekk/json-sampling-tests.
## What changes were proposed in this pull request?
Column.scala and Functions.scala have asc_nulls_first, asc_nulls_last, desc_nulls_first and desc_nulls_last. Add the corresponding python APIs in column.py and functions.py
## How was this patch tested?
Add doctest
Author: Huaxin Gao <huaxing@us.ibm.com>
Closes#20962 from huaxingao/spark-23847.
## What changes were proposed in this pull request?
Add docstring to clarify default window frame boundaries with and without orderBy clause
## How was this patch tested?
Manually generate doc and check.
Author: Li Jin <ice.xelloss@gmail.com>
Closes#20978 from icexelloss/SPARK-23861-window-doc.
## What changes were proposed in this pull request?
This pull request tries to improve the error message for spark while reading parquet files with different schemas, e.g. One with a STRING column and the other with a INT column. A new ParquetSchemaColumnConvertNotSupportedException is added to replace the old UnsupportedOperationException. The Exception is again wrapped in FileScanRdd.scala to throw a more a general QueryExecutionException with the actual parquet file name which trigger the exception.
## How was this patch tested?
Unit tests added to check the new exception and verify the error messages.
Also manually tested with two parquet with different schema to check the error message.
<img width="1125" alt="screen shot 2018-03-30 at 4 03 04 pm" src="https://user-images.githubusercontent.com/37087310/38156580-dd58a140-3433-11e8-973a-b816d859fbe1.png">
Author: Yuchen Huo <yuchen.huo@databricks.com>
Closes#20953 from yuchenhuo/SPARK-23822.
## What changes were proposed in this pull request?
This PR is to finish https://github.com/apache/spark/pull/17272
This JIRA is a follow up work after SPARK-19583
As we discussed in that PR
The following DDL for a managed table with an existed default location should throw an exception:
CREATE TABLE ... (PARTITIONED BY ...) AS SELECT ...
CREATE TABLE ... (PARTITIONED BY ...)
Currently there are some situations which are not consist with above logic:
CREATE TABLE ... (PARTITIONED BY ...) succeed with an existed default location
situation: for both hive/datasource(with HiveExternalCatalog/InMemoryCatalog)
CREATE TABLE ... (PARTITIONED BY ...) AS SELECT ...
situation: hive table succeed with an existed default location
This PR is going to make above two situations consist with the logic that it should throw an exception
with an existed default location.
## How was this patch tested?
unit test added
Author: Gengliang Wang <gengliang.wang@databricks.com>
Closes#20886 from gengliangwang/pr-17272.
## What changes were proposed in this pull request?
This PR allows us to use one of several types of `MemoryBlock`, such as byte array, int array, long array, or `java.nio.DirectByteBuffer`. To use `java.nio.DirectByteBuffer` allows to have off heap memory which is automatically deallocated by JVM. `MemoryBlock` class has primitive accessors like `Platform.getInt()`, `Platform.putint()`, or `Platform.copyMemory()`.
This PR uses `MemoryBlock` for `OffHeapColumnVector`, `UTF8String`, and other places. This PR can improve performance of operations involving memory accesses (e.g. `UTF8String.trim`) by 1.8x.
For now, this PR does not use `MemoryBlock` for `BufferHolder` based on cloud-fan's [suggestion](https://github.com/apache/spark/pull/11494#issuecomment-309694290).
Since this PR is a successor of #11494, close#11494. Many codes were ported from #11494. Many efforts were put here. **I think this PR should credit to yzotov.**
This PR can achieve **1.1-1.4x performance improvements** for operations in `UTF8String` or `Murmur3_x86_32`. Other operations are almost comparable performances.
Without this PR
```
OpenJDK 64-Bit Server VM 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 4.4.0-22-generic
Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz
OpenJDK 64-Bit Server VM 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 4.4.0-22-generic
Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz
Hash byte arrays with length 268435487: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Murmur3_x86_32 526 / 536 0.0 131399881.5 1.0X
UTF8String benchmark: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
hashCode 525 / 552 1022.6 1.0 1.0X
substring 414 / 423 1298.0 0.8 1.3X
```
With this PR
```
OpenJDK 64-Bit Server VM 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 4.4.0-22-generic
Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz
Hash byte arrays with length 268435487: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Murmur3_x86_32 474 / 488 0.0 118552232.0 1.0X
UTF8String benchmark: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
hashCode 476 / 480 1127.3 0.9 1.0X
substring 287 / 291 1869.9 0.5 1.7X
```
Benchmark program
```
test("benchmark Murmur3_x86_32") {
val length = 8192 * 32768 + 31
val seed = 42L
val iters = 1 << 2
val random = new Random(seed)
val arrays = Array.fill[MemoryBlock](numArrays) {
val bytes = new Array[Byte](length)
random.nextBytes(bytes)
new ByteArrayMemoryBlock(bytes, Platform.BYTE_ARRAY_OFFSET, length)
}
val benchmark = new Benchmark("Hash byte arrays with length " + length,
iters * numArrays, minNumIters = 20)
benchmark.addCase("HiveHasher") { _: Int =>
var sum = 0L
for (_ <- 0L until iters) {
sum += HiveHasher.hashUnsafeBytesBlock(
arrays(i), Platform.BYTE_ARRAY_OFFSET, length)
}
}
benchmark.run()
}
test("benchmark UTF8String") {
val N = 512 * 1024 * 1024
val iters = 2
val benchmark = new Benchmark("UTF8String benchmark", N, minNumIters = 20)
val str0 = new java.io.StringWriter() { { for (i <- 0 until N) { write(" ") } } }.toString
val s0 = UTF8String.fromString(str0)
benchmark.addCase("hashCode") { _: Int =>
var h: Int = 0
for (_ <- 0L until iters) { h += s0.hashCode }
}
benchmark.addCase("substring") { _: Int =>
var s: UTF8String = null
for (_ <- 0L until iters) { s = s0.substring(N / 2 - 5, N / 2 + 5) }
}
benchmark.run()
}
```
I run [this benchmark program](https://gist.github.com/kiszk/94f75b506c93a663bbbc372ffe8f05de) using [the commit](ee5a79861c). I got the following results:
```
OpenJDK 64-Bit Server VM 1.8.0_151-8u151-b12-0ubuntu0.16.04.2-b12 on Linux 4.4.0-66-generic
Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz
Memory access benchmarks: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
ByteArrayMemoryBlock get/putInt() 220 / 221 609.3 1.6 1.0X
Platform get/putInt(byte[]) 220 / 236 610.9 1.6 1.0X
Platform get/putInt(Object) 492 / 494 272.8 3.7 0.4X
OnHeapMemoryBlock get/putLong() 322 / 323 416.5 2.4 0.7X
long[] 221 / 221 608.0 1.6 1.0X
Platform get/putLong(long[]) 321 / 321 418.7 2.4 0.7X
Platform get/putLong(Object) 561 / 563 239.2 4.2 0.4X
```
I also run [this benchmark program](https://gist.github.com/kiszk/5fdb4e03733a5d110421177e289d1fb5) for comparing performance of `Platform.copyMemory()`.
```
OpenJDK 64-Bit Server VM 1.8.0_151-8u151-b12-0ubuntu0.16.04.2-b12 on Linux 4.4.0-66-generic
Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz
Platform copyMemory: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Object to Object 1961 / 1967 8.6 116.9 1.0X
System.arraycopy Object to Object 1917 / 1921 8.8 114.3 1.0X
byte array to byte array 1961 / 1968 8.6 116.9 1.0X
System.arraycopy byte array to byte array 1909 / 1937 8.8 113.8 1.0X
int array to int array 1921 / 1990 8.7 114.5 1.0X
double array to double array 1918 / 1923 8.7 114.3 1.0X
Object to byte array 1961 / 1967 8.6 116.9 1.0X
Object to short array 1965 / 1972 8.5 117.1 1.0X
Object to int array 1910 / 1915 8.8 113.9 1.0X
Object to float array 1971 / 1978 8.5 117.5 1.0X
Object to double array 1919 / 1944 8.7 114.4 1.0X
byte array to Object 1959 / 1967 8.6 116.8 1.0X
int array to Object 1961 / 1970 8.6 116.9 1.0X
double array to Object 1917 / 1924 8.8 114.3 1.0X
```
These results show three facts:
1. According to the second/third or sixth/seventh results in the first experiment, if we use `Platform.get/putInt(Object)`, we achieve more than 2x worse performance than `Platform.get/putInt(byte[])` with concrete type (i.e. `byte[]`).
2. According to the second/third or fourth/fifth/sixth results in the first experiment, the fastest way to access an array element on Java heap is `array[]`. **Cons of `array[]` is that it is not possible to support unaligned-8byte access.**
3. According to the first/second/third or fourth/sixth/seventh results in the first experiment, `getInt()/putInt() or getLong()/putLong()` in subclasses of `MemoryBlock` can achieve comparable performance to `Platform.get/putInt()` or `Platform.get/putLong()` with concrete type (second or sixth result). There is no overhead regarding virtual call.
4. According to results in the second experiment, for `Platform.copy()`, to pass `Object` can achieve the same performance as to pass any type of primitive array as source or destination.
5. According to second/fourth results in the second experiment, `Platform.copy()` can achieve the same performance as `System.arrayCopy`. **It would be good to use `Platform.copy()` since `Platform.copy()` can take any types for src and dst.**
We are incrementally replace `Platform.get/putXXX` with `MemoryBlock.get/putXXX`. This is because we have two advantages.
1) Achieve better performance due to having a concrete type for an array.
2) Use simple OO design instead of passing `Object`
It is easy to use `MemoryBlock` in `InternalRow`, `BufferHolder`, `TaskMemoryManager`, and others that are already abstracted. It is not easy to use `MemoryBlock` in utility classes related to hashing or others.
Other candidates are
- UnsafeRow, UnsafeArrayData, UnsafeMapData, SpecificUnsafeRowJoiner
- UTF8StringBuffer
- BufferHolder
- TaskMemoryManager
- OnHeapColumnVector
- BytesToBytesMap
- CachedBatch
- classes for hash
- others.
## How was this patch tested?
Added `UnsafeMemoryAllocator`
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes#19222 from kiszk/SPARK-10399.
## What changes were proposed in this pull request?
Currently, the active spark session is set inconsistently (e.g., in createDataFrame, prior to query execution). Many places in spark also incorrectly query active session when they should be calling activeSession.getOrElse(defaultSession) and so might get None even if a Spark session exists.
The semantics here can be cleaned up if we also set the active session when the default session is set.
Related: https://github.com/apache/spark/pull/20926/files
## How was this patch tested?
Unit test, existing test. Note that if https://github.com/apache/spark/pull/20926 merges first we should also update the tests there.
Author: Eric Liang <ekl@databricks.com>
Closes#20927 from ericl/active-session-cleanup.
## What changes were proposed in this pull request?
Migrate foreach sink to DataSourceV2.
Since the previous attempt at this PR #20552, we've changed and strictly defined the lifecycle of writer components. This means we no longer need the complicated lifecycle shim from that PR; it just naturally works.
## How was this patch tested?
existing tests
Author: Jose Torres <torres.joseph.f+github@gmail.com>
Closes#20951 from jose-torres/foreach.
## What changes were proposed in this pull request?
This PR implemented the following cleanups related to `UnsafeWriter` class:
- Remove code duplication between `UnsafeRowWriter` and `UnsafeArrayWriter`
- Make `BufferHolder` class internal by delegating its accessor methods to `UnsafeWriter`
- Replace `UnsafeRow.setTotalSize(...)` with `UnsafeRowWriter.setTotalSize()`
## How was this patch tested?
Tested by existing UTs
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes#20850 from kiszk/SPARK-23713.
## What changes were proposed in this pull request?
Currently, the requiredChildDistribution does not specify the partitions. This can cause the weird corner cases where the child's distribution is `SinglePartition` which satisfies the required distribution of `ClusterDistribution(no-num-partition-requirement)`, thus eliminating the shuffle needed to repartition input data into the required number of partitions (i.e. same as state stores). That can lead to "file not found" errors on the state store delta files as the micro-batch-with-no-shuffle will not run certain tasks and therefore not generate the expected state store delta files.
This PR adds the required constraint on the number of partitions.
## How was this patch tested?
Modified test harness to always check that ANY stateful operator should have a constraint on the number of partitions. As part of that, the existing opt-in checks on child output partitioning were removed, as they are redundant.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#20941 from tdas/SPARK-23827.