9509 commits
Author | SHA1 | Message | Date | |
---|---|---|---|---|
Takeshi Yamamuro | b806fc4582 |
[SPARK-31854][SQL] Invoke in MapElementsExec should not propagate null
### What changes were proposed in this pull request?
This PR intends to fix a bug of `Dataset.map` below when the whole-stage codegen enabled;
```
scala> val ds = Seq(1.asInstanceOf[Integer], null.asInstanceOf[Integer]).toDS()
scala> sql("SET spark.sql.codegen.wholeStage=true")
scala> ds.map(v=>(v,v)).explain
== Physical Plan ==
*(1) SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true])._1.intValue AS _1#69, assertnotnull(input[0, scala.Tuple2, true])._2.intValue AS _2#70]
+- *(1) MapElements <function1>, obj#68: scala.Tuple2
+- *(1) DeserializeToObject staticinvoke(class java.lang.Integer, ObjectType(class java.lang.Integer), valueOf, value#1, true, false), obj#67: java.lang.Integer
+- LocalTableScan [value#1]
// `AssertNotNull` in `SerializeFromObject` will fail;
scala> ds.map(v => (v, v)).show()
java.lang.NullPointerException: Null value appeared in non-nullable fails:
top level Product input object
If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).
// When the whole-stage codegen disabled, the query works well;
scala> sql("SET spark.sql.codegen.wholeStage=false")
scala> ds.map(v=>(v,v)).show()
+----+----+
| _1| _2|
+----+----+
| 1| 1|
|null|null|
+----+----+
```
A root cause is that `Invoke` used in `MapElementsExec` propagates input null, and then [AssertNotNull](
|
||
HyukjinKwon | e69466056f |
[SPARK-31849][PYTHON][SQL] Make PySpark SQL exceptions more Pythonic
### What changes were proposed in this pull request? This PR proposes to make PySpark exception more Pythonic by hiding JVM stacktrace by default. It can be enabled by turning on `spark.sql.pyspark.jvmStacktrace.enabled` configuration. ``` Traceback (most recent call last): ... pyspark.sql.utils.PythonException: An exception was thrown from Python worker in the executor. The below is the Python worker stacktrace. Traceback (most recent call last): ... ``` If this `spark.sql.pyspark.jvmStacktrace.enabled` is enabled, it appends: ``` JVM stacktrace: org.apache.spark.Exception: ... ... ``` For example, the codes below: ```python from pyspark.sql.functions import udf udf def divide_by_zero(v): raise v / 0 spark.range(1).select(divide_by_zero("id")).show() ``` will show an error messages that looks like Python exception thrown from the local. <details> <summary>Python exception message when <code>spark.sql.pyspark.jvmStacktrace.enabled</code> is off (default)</summary> ``` Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/.../spark/python/pyspark/sql/dataframe.py", line 427, in show print(self._jdf.showString(n, 20, vertical)) File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__ File "/.../spark/python/pyspark/sql/utils.py", line 131, in deco raise_from(converted) File "<string>", line 3, in raise_from pyspark.sql.utils.PythonException: An exception was thrown from Python worker in the executor. The below is the Python worker stacktrace. Traceback (most recent call last): File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main process() File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process serializer.dump_stream(out_iter, outfile) File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 223, in dump_stream self.serializer.dump_stream(self._batched(iterator), stream) File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream for obj in iterator: File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 212, in _batched for item in iterator: File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in mapper result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs) File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in <genexpr> result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs) File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 90, in <lambda> return lambda *a: f(*a) File "/.../spark/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper return f(*args, **kwargs) File "<stdin>", line 3, in divide_by_zero ZeroDivisionError: division by zero ``` </details> <details> <summary>Python exception message when <code>spark.sql.pyspark.jvmStacktrace.enabled</code> is on</summary> ``` Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/.../spark/python/pyspark/sql/dataframe.py", line 427, in show print(self._jdf.showString(n, 20, vertical)) File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__ File "/.../spark/python/pyspark/sql/utils.py", line 137, in deco raise_from(converted) File "<string>", line 3, in raise_from pyspark.sql.utils.PythonException: An exception was thrown from Python worker in the executor. The below is the Python worker stacktrace. Traceback (most recent call last): File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main process() File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process serializer.dump_stream(out_iter, outfile) File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 223, in dump_stream self.serializer.dump_stream(self._batched(iterator), stream) File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream for obj in iterator: File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 212, in _batched for item in iterator: File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in mapper result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs) File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in <genexpr> result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs) File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 90, in <lambda> return lambda *a: f(*a) File "/.../spark/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper return f(*args, **kwargs) File "<stdin>", line 3, in divide_by_zero ZeroDivisionError: division by zero JVM stacktrace: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, 192.168.35.193, executor 0): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main process() File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process serializer.dump_stream(out_iter, outfile) File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 223, in dump_stream self.serializer.dump_stream(self._batched(iterator), stream) File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream for obj in iterator: File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 212, in _batched for item in iterator: File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in mapper result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs) File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in <genexpr> result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs) File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 90, in <lambda> return lambda *a: f(*a) File "/.../spark/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper return f(*args, **kwargs) File "<stdin>", line 3, in divide_by_zero ZeroDivisionError: division by zero at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:516) at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:81) at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:64) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:469) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:753) at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:127) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:469) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:472) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2117) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2066) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2065) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2065) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1021) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1021) at scala.Option.foreach(Option.scala:407) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1021) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2297) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2246) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2235) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:823) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2108) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2129) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2148) at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:467) at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:420) at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47) at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3653) at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2695) at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3644) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64) at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3642) at org.apache.spark.sql.Dataset.head(Dataset.scala:2695) at org.apache.spark.sql.Dataset.take(Dataset.scala:2902) at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300) at org.apache.spark.sql.Dataset.showString(Dataset.scala:337) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main process() File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process serializer.dump_stream(out_iter, outfile) File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 223, in dump_stream self.serializer.dump_stream(self._batched(iterator), stream) File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream for obj in iterator: File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 212, in _batched for item in iterator: File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in mapper result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs) File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in <genexpr> result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs) File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 90, in <lambda> return lambda *a: f(*a) File "/.../spark/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper return f(*args, **kwargs) File "<stdin>", line 3, in divide_by_zero ZeroDivisionError: division by zero at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:516) at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:81) at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:64) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:469) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:753) at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:127) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:469) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:472) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more ``` </details> <details> <summary>Python exception message without this change</summary> ``` Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/.../spark/python/pyspark/sql/dataframe.py", line 427, in show print(self._jdf.showString(n, 20, vertical)) File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__ File "/.../spark/python/pyspark/sql/utils.py", line 98, in deco return f(*a, **kw) File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o160.showString. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 10 in stage 5.0 failed 4 times, most recent failure: Lost task 10.3 in stage 5.0 (TID 37, 192.168.35.193, executor 3): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main process() File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process serializer.dump_stream(out_iter, outfile) File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 223, in dump_stream self.serializer.dump_stream(self._batched(iterator), stream) File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream for obj in iterator: File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 212, in _batched for item in iterator: File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in mapper result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs) File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in <genexpr> result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs) File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 90, in <lambda> return lambda *a: f(*a) File "/.../spark/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper return f(*args, **kwargs) File "<stdin>", line 3, in divide_by_zero ZeroDivisionError: division by zero at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:516) at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:81) at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:64) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:469) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:753) at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:127) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:469) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:472) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2117) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2066) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2065) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2065) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1021) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1021) at scala.Option.foreach(Option.scala:407) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1021) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2297) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2246) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2235) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:823) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2108) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2129) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2148) at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:467) at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:420) at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47) at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3653) at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2695) at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3644) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64) at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3642) at org.apache.spark.sql.Dataset.head(Dataset.scala:2695) at org.apache.spark.sql.Dataset.take(Dataset.scala:2902) at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300) at org.apache.spark.sql.Dataset.showString(Dataset.scala:337) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main process() File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process serializer.dump_stream(out_iter, outfile) File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 223, in dump_stream self.serializer.dump_stream(self._batched(iterator), stream) File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream for obj in iterator: File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 212, in _batched for item in iterator: File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in mapper result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs) File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in <genexpr> result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs) File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 90, in <lambda> return lambda *a: f(*a) File "/.../spark/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper return f(*args, **kwargs) File "<stdin>", line 3, in divide_by_zero ZeroDivisionError: division by zero at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:516) at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:81) at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:64) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:469) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:753) at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:127) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:469) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:472) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more ``` </details> <br/> Another example with Python 3.7: ```python sql("a") ``` <details> <summary>Python exception message when <code>spark.sql.pyspark.jvmStacktrace.enabled</code> is off (default)</summary> ``` Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/.../spark/python/pyspark/sql/session.py", line 646, in sql return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped) File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__ File "/.../spark/python/pyspark/sql/utils.py", line 131, in deco raise_from(converted) File "<string>", line 3, in raise_from pyspark.sql.utils.ParseException: mismatched input 'a' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 0) == SQL == a ^^^ ``` </details> <details> <summary>Python exception message when <code>spark.sql.pyspark.jvmStacktrace.enabled</code> is on</summary> ``` Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/.../spark/python/pyspark/sql/session.py", line 646, in sql return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped) File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__ File "/.../spark/python/pyspark/sql/utils.py", line 131, in deco raise_from(converted) File "<string>", line 3, in raise_from pyspark.sql.utils.ParseException: mismatched input 'a' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 0) == SQL == a ^^^ JVM stacktrace: org.apache.spark.sql.catalyst.parser.ParseException: mismatched input 'a' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 0) == SQL == a ^^^ at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:266) at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:133) at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:49) at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:81) at org.apache.spark.sql.SparkSession.$anonfun$sql$2(SparkSession.scala:604) at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111) at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:604) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763) at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748) ``` </details> <details> <summary>Python exception message without this change</summary> ``` Traceback (most recent call last): File "/.../spark/python/pyspark/sql/utils.py", line 98, in deco return f(*a, **kw) File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o26.sql. : org.apache.spark.sql.catalyst.parser.ParseException: mismatched input 'a' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 0) == SQL == a ^^^ at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:266) at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:133) at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:49) at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:81) at org.apache.spark.sql.SparkSession.$anonfun$sql$2(SparkSession.scala:604) at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111) at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:604) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763) at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748) During handling of the above exception, another exception occurred: Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/.../spark/python/pyspark/sql/session.py", line 646, in sql return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped) File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__ File "/.../spark/python/pyspark/sql/utils.py", line 102, in deco raise converted pyspark.sql.utils.ParseException: mismatched input 'a' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 0) == SQL == a ^^^ ``` </details> ### Why are the changes needed? Currently, PySpark exceptions are very unfriendly to Python users with causing a bunch of JVM stacktrace. See "Python exception message without this change" above. ### Does this PR introduce _any_ user-facing change? Yes, it will change the exception message. See the examples above. ### How was this patch tested? Manually tested by ```bash ./bin/pyspark --conf spark.sql.pyspark.jvmStacktrace.enabled=true ``` and running the examples above. Closes #28661 from HyukjinKwon/python-debug. Authored-by: HyukjinKwon <gurwls223@apache.org> Signed-off-by: HyukjinKwon <gurwls223@apache.org> |
||
Max Gekk | 47dc332258 |
[SPARK-31874][SQL] Use FastDateFormat as the legacy fractional formatter
### What changes were proposed in this pull request? 1. Replace `SimpleDateFormat` by `FastDateFormat` as the legacy formatter of `FractionTimestampFormatter`. 2. Optimise `LegacyFastTimestampFormatter` for `java.sql.Timestamp` w/o fractional part. ### Why are the changes needed? 1. By default `HiveResult`.`hiveResultString` retrieves timestamps values as instances of `java.sql.Timestamp`, and uses the legacy parser `SimpleDateFormat` to convert the timestamps to strings. After the fix https://github.com/apache/spark/pull/28024, the fractional formatter and its companion - legacy formatter `SimpleDateFormat` are created per every value. By switching from `LegacySimpleTimestampFormatter` to `LegacyFastTimestampFormatter`, we can utilize the internal cache of `FastDateFormat`, and avoid parsing the default pattern `yyyy-MM-dd HH:mm:ss`. 2. The second change in the method `def format(ts: Timestamp): String` of `LegacyFastTimestampFormatter` is needed to optimize the formatter for patterns without the fractional part and avoid conversions to microseconds. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? By existing tests in `TimestampFormatter`. Closes #28678 from MaxGekk/fastdateformat-as-legacy-frac-formatter. Authored-by: Max Gekk <max.gekk@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> |
||
Kent Yao | 547c5bf552 |
[SPARK-31867][SQL] Disable year type datetime patterns which are longer than 10
### What changes were proposed in this pull request? As mentioned in https://github.com/apache/spark/pull/28673 and suggested via cloud-fan at https://github.com/apache/spark/pull/28673#discussion_r432817075 In this PR, we disable datetime pattern in the form of `y..y` and `Y..Y` whose lengths are greater than 10 to avoid sort of JDK bug as described below he new datetime formatter introduces silent data change like, ```sql spark-sql> select from_unixtime(1, 'yyyyyyyyyyy-MM-dd'); NULL spark-sql> set spark.sql.legacy.timeParserPolicy=legacy; spark.sql.legacy.timeParserPolicy legacy spark-sql> select from_unixtime(1, 'yyyyyyyyyyy-MM-dd'); 00000001970-01-01 spark-sql> ``` For patterns that support `SignStyle.EXCEEDS_PAD`, e.g. `y..y`(len >=4), when using the `NumberPrinterParser` to format it ```java switch (signStyle) { case EXCEEDS_PAD: if (minWidth < 19 && value >= EXCEED_POINTS[minWidth]) { buf.append(decimalStyle.getPositiveSign()); } break; .... ``` the `minWidth` == `len(y..y)` the `EXCEED_POINTS` is ```java /** * Array of 10 to the power of n. */ static final long[] EXCEED_POINTS = new long[] { 0L, 10L, 100L, 1000L, 10000L, 100000L, 1000000L, 10000000L, 100000000L, 1000000000L, 10000000000L, }; ``` So when the `len(y..y)` is greater than 10, ` ArrayIndexOutOfBoundsException` will be raised. And at the caller side, for `from_unixtime`, the exception will be suppressed and silent data change occurs. for `date_format`, the `ArrayIndexOutOfBoundsException` will continue. ### Why are the changes needed? fix silent data change ### Does this PR introduce _any_ user-facing change? Yes, SparkUpgradeException will take place of `null` result when the pattern contains 10 or more continuous 'y' or 'Y' ### How was this patch tested? new tests Closes #28684 from yaooqinn/SPARK-31867-2. Authored-by: Kent Yao <yaooqinn@hotmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> |
||
Maryann Xue | b9737c3c22 |
[SPARK-31864][SQL] Adjust AQE skew join trigger condition
### What changes were proposed in this pull request? This PR makes a minor change in deciding whether a partition is skewed by comparing the partition size to the median size of coalesced partitions instead of median size of raw partitions before coalescing. ### Why are the changes needed? This change is line with target size criteria of splitting skew join partitions and can also cope with situations of extra empty partitions caused by over-partitioning. This PR has also improved skew join tests in AQE tests. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Updated UTs. Closes #28669 from maryannxue/spark-31864. Authored-by: Maryann Xue <maryann.xue@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> |
||
Juliusz Sompolski | af35691de4 |
[SPARK-31859][SPARK-31861][SPARK-31863] Fix Thriftserver session timezone issues
### What changes were proposed in this pull request? Timestamp literals in Spark are interpreted as timestamps in local timezone spark.sql.session.timeZone. If JDBC client is e.g. in TimeZone UTC-7, and sets spark.sql.session.timeZone to PST, and sends a query "SELECT timestamp '2020-05-20 12:00:00'", and the JVM timezone of the Spark cluster is e.g. UTC+2, then what currently happens is: * The timestamp literal in the query is interpreted as 12:00:00 UTC-7, i.e. 19:00:00 UTC. * When it's returned from the query, it is collected as a java.sql.Timestamp object with Dataset.collect(), and put into a Thriftserver RowSet. * Before sending it over the wire, the Timestamp is converted to String. This happens in explicitly in ColumnValue for RowBasedSet, and implicitly in ColumnBuffer for ColumnBasedSet (all non-primitive types are converted toString() there). The conversion toString uses JVM timezone, which results in a "21:00:00" (UTC+2) string representation. * The client JDBC application parses gets a "21:00:00" Timestamp back (in it's JVM timezone; if the JDBC application cares about the correct UTC internal value, it should set spark.sql.session.timeZone to be consistent with its JVM timezone) The problem is caused by the conversion happening in Thriftserver RowSet with the generic toString() function, instead of using HiveResults.toHiveString() that takes care of correct, timezone respecting conversions. This PR fixes it by converting the Timestamp values to String earlier, in SparkExecuteStatementOperation, using that function. This fixes SPARK-31861. Thriftserver also did not work spark.sql.datetime.java8API.enabled, because the conversions in RowSet expected an Timestamp object instead of Instant object. Using HiveResults.toHiveString() also fixes that. For this reason, we also convert Date values in SparkExecuteStatementOperation as well - so that HiveResults.toHiveString() handles LocalDate as well. This fixes SPARK-31859. Thriftserver also did not correctly set the active SparkSession. Because of that, configuration obtained using SQLConf.get was not the correct session configuration. This affected getting the correct spark.sql.session.timeZone. It is fixed by extending the use of SparkExecuteStatementOperation.withSchedulerPool to also set the correct active SparkSession. When the correct session is set, we also no longer need to maintain the pool mapping in a sessionToActivePool map. The scheduler pool can be just correctly retrieved from the session config. "withSchedulerPool" is renamed to "withLocalProperties" and moved into a mixin helper trait, because it should be applied with every operation. This fixes SPARK-31863. I used the opportunity to move some repetitive code from the operations to the mixin helper trait. Closes #28671 from juliuszsompolski/SPARK-31861. Authored-by: Juliusz Sompolski <julek@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> |
||
Jungtaek Lim (HeartSaVioR) | fe1d1e24bc |
[SPARK-31214][BUILD] Upgrade Janino to 3.1.2
### What changes were proposed in this pull request? This PR proposes to upgrade Janino to 3.1.2 which is released recently. Major changes were done for refactoring, as well as there're lots of "no commit message". Belows are the pairs of (commit title, commit) which seem to deal with some bugs or specific improvements (not purposed to refactor) after 3.0.15. * Issue #119: Guarantee executing popOperand() in popUninitializedVariableOperand() via moving popOperand() out of "assert" * Issue #116: replace operand to final target type if boxing conversion and widening reference conversion happen together * Merged pull request `#114` "Grow the code for relocatables, and do fixup, and relocate". * |
||
Yuming Wang | 91148f428b |
[SPARK-28481][SQL] More expressions should extend NullIntolerant
### What changes were proposed in this pull request? 1. Make more expressions extend `NullIntolerant`. 2. Add a checker(in `ExpressionInfoSuite`) to identify whether the expression is `NullIntolerant`. ### Why are the changes needed? Avoid skew join if the join column has many null values and can improve query performance. For examples: ```sql CREATE TABLE t1(c1 string, c2 string) USING parquet; CREATE TABLE t2(c1 string, c2 string) USING parquet; EXPLAIN SELECT t1.* FROM t1 JOIN t2 ON upper(t1.c1) = upper(t2.c1); ``` Before and after this PR: ```sql == Physical Plan == *(2) Project [c1#5, c2#6] +- *(2) BroadcastHashJoin [upper(c1#5)], [upper(c1#7)], Inner, BuildLeft :- BroadcastExchange HashedRelationBroadcastMode(List(upper(input[0, string, true]))), [id=#41] : +- *(1) ColumnarToRow : +- FileScan parquet default.t1[c1#5,c2#6] +- *(2) ColumnarToRow +- FileScan parquet default.t2[c1#7] == Physical Plan == *(2) Project [c1#5, c2#6] +- *(2) BroadcastHashJoin [upper(c1#5)], [upper(c1#7)], Inner, BuildRight :- *(2) Project [c1#5, c2#6] : +- *(2) Filter isnotnull(c1#5) : +- *(2) ColumnarToRow : +- FileScan parquet default.t1[c1#5,c2#6] +- BroadcastExchange HashedRelationBroadcastMode(List(upper(input[0, string, true]))), [id=#59] +- *(1) Project [c1#7] +- *(1) Filter isnotnull(c1#7) +- *(1) ColumnarToRow +- FileScan parquet default.t2[c1#7] ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit test. Closes #28626 from wangyum/SPARK-28481. Authored-by: Yuming Wang <yumwang@ebay.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> |
||
Maryann Xue | 45864faaf2 |
[SPARK-31862][SQL] Remove exception wrapping in AQE
### What changes were proposed in this pull request? This PR removes the excessive exception wrapping in AQE so that error messages are less verbose and mostly consistent with non-aqe execution. Exceptions from stage materialization are now only wrapped with `SparkException` if there are multiple stage failures. Also, stage cancelling errors will not be included as part the exception thrown, but rather just be error logged. ### Why are the changes needed? This will make the AQE error reporting more readable and debuggable. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Updated existing tests. Closes #28668 from maryannxue/spark-31862. Authored-by: Maryann Xue <maryann.xue@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> |
||
Maryann Xue | 90302309a3 |
[SPARK-31865][SQL] Fix complex AQE query stage not reused
### What changes were proposed in this pull request? This PR fixes the issue of complex query stages that contain sub stages not being reused at times due to dynamic plan changes. This PR synchronizes the "finished" flag between all reused stages so that the runtime replanning would always produce the same sub plan for their potentially reusable parent stages. ### Why are the changes needed? Without this change, complex query stages that contain sub stages will sometimes not be reused due to dynamic plan changes and the status of their child query stages not being synced. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manually tested TPC-DS q47 and q57. Before this PR, the reuse of the biggest stage would happen with a 50/50 chance; and after this PR, it will happen 100% of the time. Closes #28670 from maryannxue/fix-aqe-reuse. Authored-by: Maryann Xue <maryann.xue@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> |
||
Kent Yao | fe1da296da |
[SPARK-31833][SQL][TEST-HIVE1.2] Set HiveThriftServer2 with actual port while configured 0
### What changes were proposed in this pull request? When I was developing some stuff based on the `DeveloperAPI ` `org.apache.spark.sql.hive.thriftserver.HiveThriftServer2#startWithContext`, I need to use thrift port randomly to avoid race on ports. But the `org.apache.hive.service.cli.thrift.ThriftCLIService#getPortNumber` do not respond to me with the actual bound port but always 0. And the server log is not right too, after starting the server, it's hard to form to the right JDBC connection string. ``` INFO ThriftCLIService: Starting ThriftBinaryCLIService on port 0 with 5...500 worker threads ``` Indeed, the `53742` is the right port ```shell lsof -nP -p `cat ./pid/spark-kentyao-org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1.pid` | grep LISTEN java 18990 kentyao 288u IPv6 0x22858e3e60d6a0a7 0t0 TCP 10.242.189.214:53723 (LISTEN) java 18990 kentyao 290u IPv6 0x22858e3e60d68827 0t0 TCP *:4040 (LISTEN) java 18990 kentyao 366u IPv6 0x22858e3e60d66987 0t0 TCP 10.242.189.214:53724 (LISTEN) java 18990 kentyao 438u IPv6 0x22858e3e60d65d47 0t0 TCP *:53742 (LISTEN) ``` In the PR, when the port is configured 0, the `portNum` will be set to the real used port during the start process. Also use 0 in thrift related tests to avoid potential flakiness. ### Why are the changes needed? 1 fix API bug 2 reduce test flakiness ### Does this PR introduce _any_ user-facing change? yes, `org.apache.hive.service.cli.thrift.ThriftCLIService#getPortNumber` will always give you the actual port when it is configured 0. ### How was this patch tested? modified unit tests Closes #28651 from yaooqinn/SPARK-31833. Authored-by: Kent Yao <yaooqinn@hotmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> |
||
GuoPhilipse | dfbc5edf20 |
[SPARK-31839][TESTS] Delete duplicate code in castsuit
### What changes were proposed in this pull request? Delete duplicate code castsuit ### Why are the changes needed? keep spark code clean ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? no need Closes #28655 from GuoPhilipse/delete-duplicate-code-castsuit. Lead-authored-by: GuoPhilipse <46367746+GuoPhilipse@users.noreply.github.com> Co-authored-by: GuoPhilipse <guofei_ok@126.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org> |
||
Wenchen Fan | 1528fbced8 |
[SPARK-31827][SQL] fail datetime parsing/formatting if detect the Java 8 bug of stand-alone form
### What changes were proposed in this pull request? If `LLL`/`qqq` is used in the datetime pattern string, and the current JDK in use has a bug for the stand-alone form (see https://bugs.openjdk.java.net/browse/JDK-8114833), throw an exception with a clear error message. ### Why are the changes needed? to keep backward compatibility with Spark 2.4 ### Does this PR introduce _any_ user-facing change? Yes Spark 2.4 ``` scala> sql("select date_format('1990-1-1', 'LLL')").show +---------------------------------------------+ |date_format(CAST(1990-1-1 AS TIMESTAMP), LLL)| +---------------------------------------------+ | Jan| +---------------------------------------------+ ``` Spark 3.0 with Java 11 ``` scala> sql("select date_format('1990-1-1', 'LLL')").show +---------------------------------------------+ |date_format(CAST(1990-1-1 AS TIMESTAMP), LLL)| +---------------------------------------------+ | Jan| +---------------------------------------------+ ``` Spark 3.0 with Java 8 ``` // before this PR +---------------------------------------------+ |date_format(CAST(1990-1-1 AS TIMESTAMP), LLL)| +---------------------------------------------+ | 1| +---------------------------------------------+ // after this PR scala> sql("select date_format('1990-1-1', 'LLL')").show org.apache.spark.SparkUpgradeException ``` ### How was this patch tested? manual test with java 8 and 11 Closes #28646 from cloud-fan/format. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> |
||
Max Gekk | b5eb0933ac |
[SPARK-31762][SQL][FOLLOWUP] Avoid double formatting in legacy fractional formatter
### What changes were proposed in this pull request? Currently, the legacy fractional formatter is based on the implementation from Spark 2.4 which formats the input timestamp twice: ``` val timestampString = ts.toString val formatted = legacyFormatter.format(ts) ``` to strip trailing zeros. This PR proposes to avoid the first formatting by forming the second fraction directly. ### Why are the changes needed? It makes legacy fractional formatter faster. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? By existing test "format fraction of second" in `TimestampFormatterSuite` + added test for timestamps before 1970-01-01 00:00:00Z Closes #28643 from MaxGekk/optimize-legacy-fract-format. Authored-by: Max Gekk <max.gekk@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> |
||
Kent Yao | 311fe6a880 |
[SPARK-31835][SQL][TESTS] Add zoneId to codegen related tests in DateExpressionsSuite
### What changes were proposed in this pull request? This PR modifies some codegen related tests to test escape characters for datetime functions which are time zone aware. If the timezone is absent, the formatter could result in `null` caused by `java.util.NoSuchElementException: None.get` and bypassing the real intention of those test cases. ### Why are the changes needed? fix tests ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? passing the modified test cases. Closes #28653 from yaooqinn/SPARK-31835. Authored-by: Kent Yao <yaooqinn@hotmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> |
||
Ali Afroozeh | f6f1e51072 |
[SPARK-31719][SQL] Refactor JoinSelection
### What changes were proposed in this pull request? This PR extracts the logic for selecting the planned join type out of the `JoinSelection` rule and moves it to `JoinSelectionHelper` in Catalyst. ### Why are the changes needed? This change both cleans up the code in `JoinSelection` and allows the logic to be in one place and be used from other rules that need to make decision based on the join type before the planning time. ### Does this PR introduce _any_ user-facing change? `BuildSide`, `BuildLeft`, and `BuildRight` are moved from `org.apache.spark.sql.execution` to Catalyst in `org.apache.spark.sql.catalyst.optimizer`. ### How was this patch tested? This is a refactoring, passes existing tests. Closes #28540 from dbaliafroozeh/RefactorJoinSelection. Authored-by: Ali Afroozeh <ali.afroozeh@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> |
||
iRakson | 765105b6f1 |
[SPARK-31638][WEBUI] Clean Pagination code for all webUI pages
### What changes were proposed in this pull request? Pagination code across pages needs to be cleaned. I have tried to clear out these things : * Unused methods * Unused method arguments * remove redundant `if` expressions * fix indentation ### Why are the changes needed? This fix will make code more readable and remove unnecessary methods and variables. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Manually Closes #28448 from iRakson/refactorPagination. Authored-by: iRakson <raksonrakesh@gmail.com> Signed-off-by: Sean Owen <srowen@gmail.com> |
||
beliefer | 8f2b6f3a0b |
[SPARK-31393][SQL][FOLLOW-UP] Show the correct alias in schema for expression
### What changes were proposed in this pull request? Some alias of expression can not display correctly in schema. This PR will fix them. - `ln` - `rint` - `lcase` - `position` ### Why are the changes needed? Improve the implement of some expression. ### Does this PR introduce _any_ user-facing change? 'Yes'. This PR will let user see the correct alias in schema. ### How was this patch tested? Jenkins test. Closes #28551 from beliefer/show-correct-alias-in-schema. Lead-authored-by: beliefer <beliefer@163.com> Co-authored-by: gengjiaan <gengjiaan@360.cn> Signed-off-by: HyukjinKwon <gurwls223@apache.org> |
||
Max Gekk | 87d34e6b96 |
[SPARK-31820][SQL][TESTS] Fix flaky JavaBeanDeserializationSuite
### What changes were proposed in this pull request?
Modified formatting of expected timestamp strings in the test `JavaBeanDeserializationSuite`.`testSpark22000` to correctly format timestamps with **zero** seconds fraction. Current implementation outputs `.0` but must be empty string. From SPARK-31820 failure:
- should be `2020-05-25 12:39:17`
- but incorrect expected string is `2020-05-25 12:39:17.0`
### Why are the changes needed?
To make `JavaBeanDeserializationSuite` stable, and avoid test failures like https://github.com/apache/spark/pull/28630#issuecomment-633695723
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
I changed
|
||
Dilip Biswal | b44acee953 |
[SPARK-31673][SQL] QueryExection.debug.toFile() to take an addtional explain mode param
### What changes were proposed in this pull request? Currently QueryExecution.debug.toFile dumps the query plan information in a fixed format. This PR adds an additional explain mode parameter that writes the debug information as per the user supplied format. ``` df.queryExecution.debug.toFile("/tmp/plan.txt", explainMode = ExplainMode.fromString("formatted")) ``` ``` == Physical Plan == * Filter (2) +- Scan hive default.s1 (1) (1) Scan hive default.s1 Output [2]: [c1#15, c2#16] Arguments: [c1#15, c2#16], HiveTableRelation `default`.`s1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#15, c2#16] (2) Filter [codegen id : 1] Input [2]: [c1#15, c2#16] Condition : (isnotnull(c1#15) AND (c1#15 > 0)) == Whole Stage Codegen == Found 1 WholeStageCodegen subtrees. == Subtree 1 / 1 (maxMethodCodeSize:220; maxConstantPoolSize:105(0.16% used); numInnerClasses:0) == *(1) Filter (isnotnull(c1#15) AND (c1#15 > 0)) +- Scan hive default.s1 [c1#15, c2#16], HiveTableRelation `default`.`s1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#15, c2#16] Generated code: /* 001 */ public Object generate(Object[] references) { /* 002 */ return new GeneratedIteratorForCodegenStage1(references); /* 003 */ } /* 004 */ /* 005 */ // codegenStageId=1 /* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator { /* 007 */ private Object[] references; /* 008 */ private scala.collection.Iterator[] inputs; /* 009 */ private scala.collection.Iterator inputadapter_input_0; /* 010 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] filter_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1]; /* 011 */ /* 012 */ public GeneratedIteratorForCodegenStage1(Object[] references) { /* 013 */ this.references = references; /* 014 */ } /* 015 */ /* 016 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 017 */ partitionIndex = index; /* 018 */ this.inputs = inputs; /* 019 */ inputadapter_input_0 = inputs[0]; /* 020 */ filter_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 0); /* 021 */ /* 022 */ } /* 023 */ /* 024 */ protected void processNext() throws java.io.IOException { /* 025 */ while ( inputadapter_input_0.hasNext()) { /* 026 */ InternalRow inputadapter_row_0 = (InternalRow) inputadapter_input_0.next(); /* 027 */ /* 028 */ do { /* 029 */ boolean inputadapter_isNull_0 = inputadapter_row_0.isNullAt(0); /* 030 */ int inputadapter_value_0 = inputadapter_isNull_0 ? /* 031 */ -1 : (inputadapter_row_0.getInt(0)); /* 032 */ /* 033 */ boolean filter_value_2 = !inputadapter_isNull_0; /* 034 */ if (!filter_value_2) continue; /* 035 */ /* 036 */ boolean filter_value_3 = false; /* 037 */ filter_value_3 = inputadapter_value_0 > 0; /* 038 */ if (!filter_value_3) continue; /* 039 */ /* 040 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1); /* 041 */ /* 042 */ boolean inputadapter_isNull_1 = inputadapter_row_0.isNullAt(1); /* 043 */ int inputadapter_value_1 = inputadapter_isNull_1 ? /* 044 */ -1 : (inputadapter_row_0.getInt(1)); /* 045 */ filter_mutableStateArray_0[0].reset(); /* 046 */ /* 047 */ filter_mutableStateArray_0[0].zeroOutNullBytes(); /* 048 */ /* 049 */ filter_mutableStateArray_0[0].write(0, inputadapter_value_0); /* 050 */ /* 051 */ if (inputadapter_isNull_1) { /* 052 */ filter_mutableStateArray_0[0].setNullAt(1); /* 053 */ } else { /* 054 */ filter_mutableStateArray_0[0].write(1, inputadapter_value_1); /* 055 */ } /* 056 */ append((filter_mutableStateArray_0[0].getRow())); /* 057 */ /* 058 */ } while(false); /* 059 */ if (shouldStop()) return; /* 060 */ } /* 061 */ } /* 062 */ /* 063 */ } ``` ### Why are the changes needed? Hopefully enhances the usability of debug.toFile(..) ### Does this PR introduce any user-facing change? No ### How was this patch tested? Added a test in QueryExecutionSuite Closes #28493 from dilipbiswal/write_to_file. Authored-by: Dilip Biswal <dkbiswal@gmail.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org> |
||
Max Gekk | 7e4f5bbd8a |
[SPARK-31806][SQL][TESTS] Check reading date/timestamp from legacy parquet: dictionary encoding, w/o Spark version
### What changes were proposed in this pull request? 1. Add the following parquet files to the resource folder `sql/core/src/test/resources/test-data`: - Files saved by Spark 2.4.5 ( |
||
Prakhar Jain | 452594f5a4 |
[SPARK-31810][TEST] Fix AlterTableRecoverPartitions test using incorrect api to modify RDD_PARALLEL_LISTING_THRESHOLD
### What changes were proposed in this pull request? Use the correct API in AlterTableRecoverPartition tests to modify the `RDD_PARALLEL_LISTING_THRESHOLD` conf. ### Why are the changes needed? The existing AlterTableRecoverPartitions test modify the RDD_PARALLEL_LISTING_THRESHOLD as a SQLConf using the withSQLConf API. But since, this is not a SQLConf, it is not overridden and so the test doesn't end up testing the required behaviour. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? This is UT Fix. UTs are still passing after the fix. Closes #28634 from prakharjain09/SPARK-31810-fix-recover-partitions. Authored-by: Prakhar Jain <prakharjain09@gmail.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org> |
||
HyukjinKwon |
df2a1fe131
|
[SPARK-31808][SQL] Makes struct function's output name and class name pretty
### What changes were proposed in this pull request? This PR proposes to set the alias, and class name in its `ExpressionInfo` for `struct`. - Class name in `ExpressionInfo` - from: `org.apache.spark.sql.catalyst.expressions.NamedStruct` - to:`org.apache.spark.sql.catalyst.expressions.CreateNamedStruct` - Alias name: `named_struct(col1, v, ...)` -> `struct(v, ...)` This PR takes over https://github.com/apache/spark/pull/28631 ### Why are the changes needed? To show the correct output name and class names to users. ### Does this PR introduce _any_ user-facing change? Yes. **Before:** ```scala scala> sql("DESC FUNCTION struct").show(false) +------------------------------------------------------------------------------------+ |function_desc | +------------------------------------------------------------------------------------+ |Function: struct | |Class: org.apache.spark.sql.catalyst.expressions.NamedStruct | |Usage: struct(col1, col2, col3, ...) - Creates a struct with the given field values.| +------------------------------------------------------------------------------------+ ``` ```scala scala> sql("SELECT struct(1, 2)").show(false) +------------------------------+ |named_struct(col1, 1, col2, 2)| +------------------------------+ |[1, 2] | +------------------------------+ ``` **After:** ```scala scala> sql("DESC FUNCTION struct").show(false) +------------------------------------------------------------------------------------+ |function_desc | +------------------------------------------------------------------------------------+ |Function: struct | |Class: org.apache.spark.sql.catalyst.expressions.CreateNamedStruct | |Usage: struct(col1, col2, col3, ...) - Creates a struct with the given field values.| +------------------------------------------------------------------------------------+ ``` ```scala scala> sql("SELECT struct(1, 2)").show(false) +------------+ |struct(1, 2)| +------------+ |[1, 2] | +------------+ ``` ### How was this patch tested? Manually tested, and Jenkins tests. Closes #28633 from HyukjinKwon/SPARK-31808. Authored-by: HyukjinKwon <gurwls223@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> |
||
Max Gekk |
6c80ebbccb
|
[SPARK-31818][SQL] Fix pushing down filters with java.time.Instant values in ORC
### What changes were proposed in this pull request? Convert `java.time.Instant` to `java.sql.Timestamp` in pushed down filters to ORC datasource when Java 8 time API enabled. ### Why are the changes needed? The changes fix the exception raised while pushing date filters when `spark.sql.datetime.java8API.enabled` is set to `true`: ``` java.lang.IllegalArgumentException: Wrong value class java.time.Instant for TIMESTAMP.EQUALS leaf at org.apache.hadoop.hive.ql.io.sarg.SearchArgumentImpl$PredicateLeafImpl.checkLiteralType(SearchArgumentImpl.java:192) at org.apache.hadoop.hive.ql.io.sarg.SearchArgumentImpl$PredicateLeafImpl.<init>(SearchArgumentImpl.java:75) ``` ### Does this PR introduce any user-facing change? Yes ### How was this patch tested? Added tests to `OrcFilterSuite`. Closes #28636 from MaxGekk/orc-timestamp-filter-pushdown. Authored-by: Max Gekk <max.gekk@gmail.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> |
||
Kent Yao | 695cb617d4 |
[SPARK-31771][SQL] Disable Narrow TextStyle for datetime pattern 'G/M/L/E/u/Q/q'
### What changes were proposed in this pull request? Five continuous pattern characters with 'G/M/L/E/u/Q/q' means Narrow-Text Style while we turn to use `java.time.DateTimeFormatterBuilder` since 3.0.0, which output the leading single letter of the value, e.g. `December` would be `D`. In Spark 2.4 they mean Full-Text Style. In this PR, we explicitly disable Narrow-Text Style for these pattern characters. ### Why are the changes needed? Without this change, there will be a silent data change. ### Does this PR introduce _any_ user-facing change? Yes, queries with datetime operations using datetime patterns, e.g. `G/M/L/E/u` will fail if the pattern length is 5 and other patterns, e,g. 'k', 'm' also can accept a certain number of letters. 1. datetime patterns that are not supported by the new parser but the legacy will get SparkUpgradeException, e.g. "GGGGG", "MMMMM", "LLLLL", "EEEEE", "uuuuu", "aa", "aaa". 2 options are given to end-users, one is to use legacy mode, and the other is to follow the new online doc for correct datetime patterns 2, datetime patterns that are not supported by both the new parser and the legacy, e.g. "QQQQQ", "qqqqq", will get IllegalArgumentException which is captured by Spark internally and results NULL to end-users. ### How was this patch tested? add unit tests Closes #28592 from yaooqinn/SPARK-31771. Authored-by: Kent Yao <yaooqinn@hotmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> |
||
Max Gekk | 92685c0148 |
[SPARK-31755][SQL][FOLLOWUP] Update date-time, CSV and JSON benchmark results
### What changes were proposed in this pull request? Re-generate results of: - DateTimeBenchmark - CSVBenchmark - JsonBenchmark in the environment: | Item | Description | | ---- | ----| | Region | us-west-2 (Oregon) | | Instance | r3.xlarge | | AMI | ubuntu/images/hvm-ssd/ubuntu-bionic-18.04-amd64-server-20190722.1 (ami-06f2f779464715dc5) | | Java | OpenJDK 64-Bit Server VM 1.8.0_242 and OpenJDK 64-Bit Server VM 11.0.6+10 | ### Why are the changes needed? 1. The PR https://github.com/apache/spark/pull/28576 changed date-time parser. The `DateTimeBenchmark` should confirm that the PR didn't slow down date/timestamp parsing. 2. CSV/JSON datasources are affected by the above PR too. This PR updates the benchmark results in the same environment as other benchmarks to have a base line for future optimizations. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? By running benchmarks via the script: ```python #!/usr/bin/env python3 import os from sparktestsupport.shellutils import run_cmd benchmarks = [ ['sql/test', 'org.apache.spark.sql.execution.benchmark.DateTimeBenchmark'], ['sql/test', 'org.apache.spark.sql.execution.datasources.csv.CSVBenchmark'], ['sql/test', 'org.apache.spark.sql.execution.datasources.json.JsonBenchmark'] ] print('Set SPARK_GENERATE_BENCHMARK_FILES=1') os.environ['SPARK_GENERATE_BENCHMARK_FILES'] = '1' for b in benchmarks: print("Run benchmark: %s" % b[1]) run_cmd(['build/sbt', '%s:runMain %s' % (b[0], b[1])]) ``` Closes #28613 from MaxGekk/missing-hour-year-benchmarks. Authored-by: Max Gekk <max.gekk@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> |
||
Kent Yao | 0df8dd6073 |
[SPARK-30352][SQL] DataSourceV2: Add CURRENT_CATALOG function
### What changes were proposed in this pull request? As we support multiple catalogs with DataSourceV2, we may need the `CURRENT_CATALOG` value expression from the SQL standard. `CURRENT_CATALOG` is a general value specification in the SQL Standard, described as: > The value specified by CURRENT_CATALOG is the character string that represents the current default catalog name. ### Why are the changes needed? improve catalog v2 with ANSI SQL standard. ### Does this PR introduce any user-facing change? yes, add a new function `current_catalog()` to point the current active catalog ### How was this patch tested? add ut Closes #27006 from yaooqinn/SPARK-30352. Authored-by: Kent Yao <yaooqinn@hotmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> |
||
Max Gekk | 7f36310500 |
[SPARK-31802][SQL] Format Java date-time types in Row.jsonValue directly
### What changes were proposed in this pull request? Use `format()` methods for Java date-time types in `Row.jsonValue`. The PR https://github.com/apache/spark/pull/28582 added the methods to avoid conversions to days and microseconds. ### Why are the changes needed? To avoid unnecessary overhead of converting Java date-time types to micros/days before formatting. Also formatters have to convert input micros/days back to Java types to pass instances to standard library API. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? By existing tests in `RowJsonSuite`. Closes #28620 from MaxGekk/toJson-format-Java-datetime-types. Authored-by: Max Gekk <max.gekk@gmail.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org> |
||
rishi | b90e10c546 |
[SPARK-31377][SQL][TEST] Added unit tests to 'number of output rows metric' for some joins in SQLMetricSuite
### What changes were proposed in this pull request? Add unit tests to the 'number of output rows metric' for some join types in the SQLMetricSuite. A list of unit tests added are as follows. - ShuffledHashJoin: leftOuter, RightOuter, LeftAnti, LeftSemi - BroadcastNestedLoopJoin: RightOuter - BroadcastHashJoin: LeftAnti ### Why are the changes needed? For some combinations of JoinType and Join algorithm there is no test coverage for the 'number of output rows' metric. ### Does this PR introduce any user-facing change? No ### How was this patch tested? I added debug statements in the code to ensure the correct combination if JoinType and Join algorithms are triggered. I further used Intellij debugger to test the same. Closes #28330 from sririshindra/SPARK-31377. Authored-by: rishi <spothireddi@cloudera.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org> |
||
sandeep katta | cf7463f309 |
[SPARK-31761][SQL] cast integer to Long to avoid IntegerOverflow for IntegralDivide operator
### What changes were proposed in this pull request? `IntegralDivide` operator returns Long DataType, so integer overflow case should be handled. If the operands are of type Int it will be casted to Long ### Why are the changes needed? As `IntegralDivide` returns Long datatype, integer overflow should not happen ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added UT and also tested in the local cluster After fix ![image](https://user-images.githubusercontent.com/35216143/82603361-25eccc00-9bd0-11ea-9ca7-001c539e628b.png) SQL Test After fix ![image](https://user-images.githubusercontent.com/35216143/82637689-f0250300-9c22-11ea-85c3-886ab2c23471.png) Before Fix ![image](https://user-images.githubusercontent.com/35216143/82637984-878a5600-9c23-11ea-9e47-5ce2fb923c01.png) Closes #28600 from sandeep-katta/integerOverFlow. Authored-by: sandeep katta <sandeep.katta2007@gmail.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org> |
||
Gengliang Wang | 9fdc2a0801 |
[SPARK-31793][SQL] Reduce the memory usage in file scan location metadata
### What changes were proposed in this pull request? Currently, the data source scan node stores all the paths in its metadata. The metadata is kept when a SparkPlan is converted into SparkPlanInfo. SparkPlanInfo can be used to construct the Spark plan graph in UI. However, the paths can be very large (e.g. it can be many partitions after partition pruning), while UI pages only require up to 100 bytes for the location metadata. We can reduce the paths stored in metadata to reduce memory usage. ### Why are the changes needed? Reduce unnecessary memory cost. In the heap dump of a driver, the SparkPlanInfo instances are quite large and it should be avoided: ![image](https://user-images.githubusercontent.com/1097932/82642318-8f65de00-9bc2-11ea-9c9c-f05c2b0e1c49.png) ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit tests Closes #28610 from gengliangwang/improveLocationMetadata. Authored-by: Gengliang Wang <gengliang.wang@databricks.com> Signed-off-by: Gengliang Wang <gengliang.wang@databricks.com> |
||
iRakson | fbb3144a9c |
[SPARK-31642] Add Pagination Support for Structured Streaming Page
### What changes were proposed in this pull request? Add Pagination Support for structured streaming page. Now both tables `Active Queries` and `Completed Queries` will have pagination. To implement pagination, pagination framework from #7399 is used. * Also tables will only be shown if there is at least one entry in the table. ### Why are the changes needed? * This will help users in analysing their structured streaming queries in much better way. * Other Web UI pages support pagination in their table. So this will make web UI more consistent across pages. * This can prevent potential OOM errors. ### Does this PR introduce _any_ user-facing change? Yes. Both tables will support pagination. ### How was this patch tested? Manually. I will add snapshots soon. Closes #28485 from iRakson/SPARK-31642. Authored-by: iRakson <raksonrakesh@gmail.com> Signed-off-by: Kousuke Saruta <sarutak@oss.nttdata.com> |
||
Takeshi Yamamuro | 7ca73f03fb |
[SPARK-29854][SQL][TESTS] Add tests to check lpad/rpad throw an exception for invalid length input
### What changes were proposed in this pull request? This PR intends to add trivial tests to check https://github.com/apache/spark/pull/27024 has already been fixed in the master. Closes #27024 ### Why are the changes needed? For test coverage. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added tests. Closes #28604 from maropu/SPARK-29854. Authored-by: Takeshi Yamamuro <yamamuro@apache.org> Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org> |
||
Jungtaek Lim (HeartSaVioR) |
5a258b0b67
|
[SPARK-30915][SS] CompactibleFileStreamLog: Avoid reading the metadata log file when finding the latest batch ID
### What changes were proposed in this pull request? This patch adds the new method `getLatestBatchId()` in CompactibleFileStreamLog in complement of getLatest() which doesn't read the content of the latest batch metadata log file, and apply to both FileStreamSource and FileStreamSink to avoid unnecessary latency on reading log file. ### Why are the changes needed? Once compacted metadata log file becomes huge, writing outputs for the compact + 1 batch is also affected due to unnecessarily reading the compacted metadata log file. This unnecessary latency can be simply avoided. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? New UT. Also manually tested under query which has huge metadata log on file stream sink: > before applying the patch ![Screen Shot 2020-02-21 at 4 20 19 PM](https://user-images.githubusercontent.com/1317309/75016223-d3ffb180-54cd-11ea-9063-49405943049d.png) > after applying the patch ![Screen Shot 2020-02-21 at 4 06 18 PM](https://user-images.githubusercontent.com/1317309/75016220-d235ee00-54cd-11ea-81a7-7c03a43c4db4.png) Peaks are compact batches - please compare the next batch after compact batches, especially the area of "light brown". Closes #27664 from HeartSaVioR/SPARK-30915. Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com> Signed-off-by: Shixiong Zhu <zsxwing@gmail.com> |
||
TJX2014 | 2115c55efe |
[SPARK-31710][SQL] Adds TIMESTAMP_SECONDS, TIMESTAMP_MILLIS and TIMESTAMP_MICROS functions
### What changes were proposed in this pull request? Add and register three new functions: `TIMESTAMP_SECONDS`, `TIMESTAMP_MILLIS` and `TIMESTAMP_MICROS` A test is added. Reference: [BigQuery](https://cloud.google.com/bigquery/docs/reference/standard-sql/timestamp_functions?hl=en#timestamp_seconds) ### Why are the changes needed? People will have convenient way to get timestamps from seconds,milliseconds and microseconds. ### Does this PR introduce _any_ user-facing change? Yes, people will have the following ways to get timestamp: ```scala sql("select TIMESTAMP_SECONDS(t.a) as timestamp from values(1230219000),(-1230219000) as t(a)").show(false) ``` ``` +-------------------------+ |timestamp | +-------------------------+ |2008-12-25 23:30:00| |1931-01-07 16:30:00| +-------------------------+ ``` ```scala sql("select TIMESTAMP_MILLIS(t.a) as timestamp from values(1230219000123),(-1230219000123) as t(a)").show(false) ``` ``` +-------------------------------+ |timestamp | +-------------------------------+ |2008-12-25 23:30:00.123| |1931-01-07 16:29:59.877| +-------------------------------+ ``` ```scala sql("select TIMESTAMP_MICROS(t.a) as timestamp from values(1230219000123123),(-1230219000123123) as t(a)").show(false) ``` ``` +------------------------------------+ |timestamp | +------------------------------------+ |2008-12-25 23:30:00.123123| |1931-01-07 16:29:59.876877| +------------------------------------+ ``` ### How was this patch tested? Unit test. Closes #28534 from TJX2014/master-SPARK-31710. Authored-by: TJX2014 <xiaoxingstack@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> |
||
Wenchen Fan | ce4da29ec3 |
[SPARK-31755][SQL] allow missing year/hour when parsing date/timestamp string
### What changes were proposed in this pull request? This PR allows missing hour fields when parsing date/timestamp string, with 0 as the default value. If the year field is missing, this PR still fail the query by default, but provides a new legacy config to allow it and use 1970 as the default value. It's not a good default value, as it is not a leap year, which means that it would never parse Feb 29. We just pick it for backward compatibility. ### Why are the changes needed? To keep backward compatibility with Spark 2.4. ### Does this PR introduce _any_ user-facing change? Yes. Spark 2.4: ``` scala> sql("select to_timestamp('16', 'dd')").show +------------------------+ |to_timestamp('16', 'dd')| +------------------------+ | 1970-01-16 00:00:00| +------------------------+ scala> sql("select to_date('16', 'dd')").show +-------------------+ |to_date('16', 'dd')| +-------------------+ | 1970-01-16| +-------------------+ scala> sql("select to_timestamp('2019 40', 'yyyy mm')").show +----------------------------------+ |to_timestamp('2019 40', 'yyyy mm')| +----------------------------------+ | 2019-01-01 00:40:00| +----------------------------------+ scala> sql("select to_timestamp('2019 10:10:10', 'yyyy hh:mm:ss')").show +----------------------------------------------+ |to_timestamp('2019 10:10:10', 'yyyy hh:mm:ss')| +----------------------------------------------+ | 2019-01-01 10:10:10| +----------------------------------------------+ ``` in branch 3.0 ``` scala> sql("select to_timestamp('16', 'dd')").show +--------------------+ |to_timestamp(16, dd)| +--------------------+ | null| +--------------------+ scala> sql("select to_date('16', 'dd')").show +---------------+ |to_date(16, dd)| +---------------+ | null| +---------------+ scala> sql("select to_timestamp('2019 40', 'yyyy mm')").show +------------------------------+ |to_timestamp(2019 40, yyyy mm)| +------------------------------+ | 2019-01-01 00:00:00| +------------------------------+ scala> sql("select to_timestamp('2019 10:10:10', 'yyyy hh:mm:ss')").show +------------------------------------------+ |to_timestamp(2019 10:10:10, yyyy hh:mm:ss)| +------------------------------------------+ | 2019-01-01 00:00:00| +------------------------------------------+ ``` After this PR, the behavior becomes the same as 2.4, if the legacy config is enabled. ### How was this patch tested? new tests Closes #28576 from cloud-fan/bug. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org> |
||
Max Gekk | 60118a2426 |
[SPARK-31785][SQL][TESTS] Add a helper function to test all parquet readers
### What changes were proposed in this pull request? Add `withAllParquetReaders` to `ParquetTest`. The function allow to run a block of code for all available Parquet readers. ### Why are the changes needed? 1. It simplifies tests 2. Allow to test all parquet readers that could be available in projects based on Apache Spark. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? By running affected test suites. Closes #28598 from MaxGekk/add-withAllParquetReaders. Authored-by: Max Gekk <max.gekk@gmail.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org> |
||
Gengliang Wang | db5e5fce68 |
Revert "[SPARK-31765][WEBUI] Upgrade HtmlUnit >= 2.37.0"
This reverts commit
|
||
Kousuke Saruta | 92877c4ef2 |
[SPARK-31765][WEBUI] Upgrade HtmlUnit >= 2.37.0
### What changes were proposed in this pull request? This PR upgrades HtmlUnit. Selenium and Jetty also upgraded because of dependency. ### Why are the changes needed? Recently, a security issue which affects HtmlUnit is reported. https://nvd.nist.gov/vuln/detail/CVE-2020-5529 According to the report, arbitrary code can be run by malicious users. HtmlUnit is used for test so the impact might not be large but it's better to upgrade it just in case. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing testcases. Closes #28585 from sarutak/upgrade-htmlunit. Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com> Signed-off-by: Gengliang Wang <gengliang.wang@databricks.com> |
||
iRakson | f1495c5bc0 |
[SPARK-31688][WEBUI] Refactor Pagination framework
### What changes were proposed in this pull request? Currently while implementing pagination using the existing pagination framework, a lot of code is being copied as pointed out [here](https://github.com/apache/spark/pull/28485#pullrequestreview-408881656). I introduced some changes in `PagedTable` which is the main trait for implementing the pagination. * Added function for getting table parameters. * Added a function for table header row. This will help in maintaining consistency across the tables. All the header rows across tables will be consistent now. ### Why are the changes needed? * A lot of code is copied every time pagination is implemented for any table. * Code readability is not great as lot of HTML is embedded. * Paginating other tables will be a lot easier now. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manually. This is mainly refactoring work, no new functionality introduced. Existing test cases should pass. Closes #28512 from iRakson/refactorPaginationFramework. Authored-by: iRakson <raksonrakesh@gmail.com> Signed-off-by: Sean Owen <srowen@gmail.com> |
||
Vinoo Ganesh | dae79888dc |
[SPARK-31354] SparkContext only register one SparkSession ApplicationEnd listener
## What changes were proposed in this pull request? This change was made as a result of the conversation on https://issues.apache.org/jira/browse/SPARK-31354 and is intended to continue work from that ticket here. This change fixes a memory leak where SparkSession listeners are never cleared off of the SparkContext listener bus. Before running this PR, the following code: ``` SparkSession.builder().master("local").getOrCreate() SparkSession.clearActiveSession() SparkSession.clearDefaultSession() SparkSession.builder().master("local").getOrCreate() SparkSession.clearActiveSession() SparkSession.clearDefaultSession() ``` would result in a SparkContext with the following listeners on the listener bus: ``` [org.apache.spark.status.AppStatusListener5f610071, org.apache.spark.HeartbeatReceiverd400c17, org.apache.spark.sql.SparkSession$$anon$125849aeb, <-First instance org.apache.spark.sql.SparkSession$$anon$1fadb9a0] <- Second instance ``` After this PR, the execution of the same code above results in SparkContext with the following listeners on the listener bus: ``` [org.apache.spark.status.AppStatusListener5f610071, org.apache.spark.HeartbeatReceiverd400c17, org.apache.spark.sql.SparkSession$$anon$125849aeb] <-One instance ``` ## How was this patch tested? * Unit test included as a part of the PR Closes #28128 from vinooganesh/vinooganesh/SPARK-27958. Lead-authored-by: Vinoo Ganesh <vinoo.ganesh@gmail.com> Co-authored-by: Vinoo Ganesh <vganesh@palantir.com> Co-authored-by: Vinoo Ganesh <vinoo@safegraph.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> |
||
Max Gekk | 5d673319af |
[SPARK-31762][SQL] Fix perf regression of date/timestamp formatting in toHiveString
### What changes were proposed in this pull request? 1. Add new methods that accept date-time Java types to the DateFormatter and TimestampFormatter traits. The methods format input date-time instances to strings: - TimestampFormatter: - `def format(ts: Timestamp): String` - `def format(instant: Instant): String` - DateFormatter: - `def format(date: Date): String` - `def format(localDate: LocalDate): String` 2. Re-use the added methods from `HiveResult.toHiveString` 3. Borrow the code for formatting of `java.sql.Timestamp` from Spark 2.4 `DateTimeUtils.timestampToString` to `FractionTimestampFormatter` because legacy formatters don't support variable length patterns for seconds fractions. ### Why are the changes needed? To avoid unnecessary overhead of converting Java date-time types to micros/days before formatting. Also formatters have to convert input micros/days back to Java types to pass instances to standard library API. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? By existing tests for toHiveString and new tests in `TimestampFormatterSuite`. Closes #28582 from MaxGekk/opt-format-old-types. Authored-by: Max Gekk <max.gekk@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> |
||
Ali Smesseim |
d40ecfa3f7
|
[SPARK-31387][SQL] Handle unknown operation/session ID in HiveThriftServer2Listener
### What changes were proposed in this pull request? This is a recreation of #28155, which was reverted due to causing test failures. The update methods in HiveThriftServer2Listener now check if the parameter operation/session ID actually exist in the `sessionList` and `executionList` respectively. This prevents NullPointerExceptions if the operation or session ID is unknown. Instead, a warning is written to the log. To improve robustness, we also make the following changes in HiveSessionImpl.close(): - Catch any exception thrown by `operationManager.closeOperation`. If for any reason this throws an exception, other operations are not prevented from being closed. - Handle not being able to access the scratch directory. When closing, all `.pipeout` files are removed from the scratch directory, which would have resulted in an NPE if the directory does not exist. ### Why are the changes needed? The listener's update methods would throw an exception if the operation or session ID is unknown. In Spark 2, where the listener is called directly, this changes the caller's control flow. In Spark 3, the exception is caught by the ListenerBus but results in an uninformative NullPointerException. In HiveSessionImpl.close(), if an exception is thrown when closing an operation, all following operations are not closed. ### Does this PR introduce any user-facing change? No ### How was this patch tested? Unit tests Closes #28544 from alismess-db/hive-thriftserver-listener-update-safer-2. Authored-by: Ali Smesseim <ali.smesseim@databricks.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> |
||
Wenchen Fan | 34414acfa3 |
[SPARK-31706][SQL] add back the support of streaming update mode
### What changes were proposed in this pull request? This PR adds a private `WriteBuilder` mixin trait: `SupportsStreamingUpdate`, so that the builtin v2 streaming sinks can still support the update mode. Note: it's private because we don't have a proper design yet. I didn't take the proposal in https://github.com/apache/spark/pull/23702#discussion_r258593059 because we may want something more general, like updating by an expression `key1 = key2 + 10`. ### Why are the changes needed? In Spark 2.4, all builtin v2 streaming sinks support all streaming output modes, and v2 sinks are enabled by default, see https://issues.apache.org/jira/browse/SPARK-22911 It's too risky for 3.0 to go back to v1 sinks, so I propose to add a private trait to fix builtin v2 sinks, to keep backward compatibility. ### Does this PR introduce _any_ user-facing change? Yes, now all the builtin v2 streaming sinks support all streaming output modes, which is the same as 2.4 ### How was this patch tested? existing tests. Closes #28523 from cloud-fan/update. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> |
||
yi.wu | 0fd98abd85 |
[SPARK-31750][SQL] Eliminate UpCast if child's dataType is DecimalType
### What changes were proposed in this pull request? Eliminate the `UpCast` if it's child data type is already decimal type. ### Why are the changes needed? While deserializing internal `Decimal` value to external `BigDecimal`(Java/Scala) value, Spark should also respect `Decimal`'s precision and scale, otherwise it will cause precision lost and look weird in some cases, e.g.: ``` sql("select cast(11111111111111111111111111111111111111 as decimal(38, 0)) as d") .write.mode("overwrite") .parquet(f.getAbsolutePath) // can fail spark.read.parquet(f.getAbsolutePath).as[BigDecimal] ``` ``` [info] org.apache.spark.sql.AnalysisException: Cannot up cast `d` from decimal(38,0) to decimal(38,18). [info] The type path of the target object is: [info] - root class: "scala.math.BigDecimal" [info] You can either add an explicit cast to the input data or choose a higher precision type of the field in the target object; [info] at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveUpCast$$fail(Analyzer.scala:3060) [info] at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$33$$anonfun$applyOrElse$174.applyOrElse(Analyzer.scala:3087) [info] at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$33$$anonfun$applyOrElse$174.applyOrElse(Analyzer.scala:3071) [info] at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:309) [info] at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72) [info] at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:309) [info] at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:314) ``` ### Does this PR introduce _any_ user-facing change? Yes, for cases(cause precision lost) mentioned above will fail before this change but run successfully after this change. ### How was this patch tested? Added tests. Closes #28572 from Ngone51/fix_encoder. Authored-by: yi.wu <yi.wu@databricks.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org> |
||
Kent Yao | 1f29f1ba58 |
[SPARK-31684][SQL] Overwrite partition failed with 'WRONG FS' when the target partition is not belong to the filesystem as same as the table
### What changes were proposed in this pull request? With SPARK-18107, we will disable the underlying replace(overwrite) and instead do delete in spark side and only do copy in hive side to bypass the performance issue - [HIVE-11940](https://issues.apache.org/jira/browse/HIVE-11940) Conditionally, if the table location and partition location do not belong to the same `FileSystem`, We should not disable hive overwrite. Otherwise, hive will use the `FileSystem` instance belong to the table location to copy files, which will fail in `FileSystem#checkPath` https://github.com/apache/hive/blob/rel/release-2.3.7/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java#L1657 In this PR, for Hive 2.0.0 and onwards, as [HIVE-11940](https://issues.apache.org/jira/browse/HIVE-11940) has been fixed, and there is no performance issue anymore. We should leave the overwrite logic to hive to avoid failure in `FileSystem#checkPath` **NOTE THAT** For Hive 2.2.0 and earlier, if the table and partition locations do not belong together, we will still get the same error thrown by hive encryption check due to [HIVE-14380]( https://issues.apache.org/jira/browse/HIVE-14380) which need to fix in another ticket SPARK-31675. ### Why are the changes needed? bugfix. a logic table can be decoupled with the storage layer and may contain data from remote storage systems. ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? Currently verified manually. add benchmark tests ```sql -INSERT INTO DYNAMIC 7742 7918 248 0.0 756044.0 1.0X -INSERT INTO HYBRID 1289 1307 26 0.0 125866.3 6.0X -INSERT INTO STATIC 371 393 38 0.0 36219.4 20.9X -INSERT OVERWRITE DYNAMIC 8456 8554 138 0.0 825790.3 0.9X -INSERT OVERWRITE HYBRID 1303 1311 12 0.0 127198.4 5.9X -INSERT OVERWRITE STATIC 434 447 13 0.0 42373.8 17.8X +INSERT INTO DYNAMIC 7382 7456 105 0.0 720904.8 1.0X +INSERT INTO HYBRID 1128 1129 1 0.0 110169.4 6.5X +INSERT INTO STATIC 349 370 39 0.0 34095.4 21.1X +INSERT OVERWRITE DYNAMIC 8149 8362 301 0.0 795821.8 0.9X +INSERT OVERWRITE HYBRID 1317 1318 2 0.0 128616.7 5.6X +INSERT OVERWRITE STATIC 387 408 37 0.0 37804.1 19.1X ``` + for master - for this PR both using hive 2.3.7 Closes #28511 from yaooqinn/SPARK-31684. Authored-by: Kent Yao <yaooqinn@hotmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> |
||
Ali Afroozeh | b9cc31cd95 |
[SPARK-31721][SQL] Assert optimized is initialized before tracking the planning time
### What changes were proposed in this pull request? The QueryPlanningTracker in QueryExeuction reports the planning time that also includes the optimization time. This happens because the optimizedPlan in QueryExecution is lazy and only will initialize when first called. When df.queryExecution.executedPlan is called, the the tracker starts recording the planning time, and then calls the optimized plan. This causes the planning time to start before optimization and also include the planning time. This PR fixes this behavior by introducing a method assertOptimized, similar to assertAnalyzed that explicitly initializes the optimized plan. This method is called before measuring the time for sparkPlan and executedPlan. We call it before sparkPlan because that also counts as planning time. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit tests Closes #28543 from dbaliafroozeh/AddAssertOptimized. Authored-by: Ali Afroozeh <ali.afroozeh@databricks.com> Signed-off-by: herman <herman@databricks.com> |
||
Eren Avsarogullari | ab4cf49a1c |
[SPARK-31440][SQL] Improve SQL Rest API
### What changes were proposed in this pull request? SQL Rest API exposes query execution metrics as Public API. This PR aims to apply following improvements on SQL Rest API by aligning Spark-UI. **Proposed Improvements:** 1- Support Physical Operations and group metrics per physical operation by aligning Spark UI. 2- Support `wholeStageCodegenId` for Physical Operations 3- `nodeId` can be useful for grouping metrics and sorting physical operations (according to execution order) to differentiate same operators (if used multiple times during the same query execution) and their metrics. 4- Filter `empty` metrics by aligning with Spark UI - SQL Tab. Currently, Spark UI does not show empty metrics. 5- Remove line breakers(`\n`) from `metricValue`. 6- `planDescription` can be `optional` Http parameter to avoid network cost where there is specially complex jobs creating big-plans. 7- `metrics` attribute needs to be exposed at the bottom order as `nodes`. Specially, this can be useful for the user where `nodes` array size is high. 8- `edges` attribute is being exposed to show relationship between `nodes`. 9- Reverse order on `metricDetails` aims to match with Spark UI by supporting Physical Operators' execution order. ### Why are the changes needed? Proposed improvements provides more useful (e.g: physical operations and metrics correlation, grouping) and clear (e.g: filtering blank metrics, removing line breakers) result for the end-user. ### Does this PR introduce any user-facing change? Yes. Please find both current and improved versions of the results as attached for following SQL Rest Endpoint: ``` curl -X GET http://localhost:4040/api/v1/applications/$appId/sql/$executionId?details=true ``` **Current version:** https://issues.apache.org/jira/secure/attachment/12999821/current_version.json **Improved version:** https://issues.apache.org/jira/secure/attachment/13000621/improved_version.json ### Backward Compatibility SQL Rest API will be started to expose with `Spark 3.0` and `3.0.0-preview2` (released on 12/23/19) does not cover this API so if PR can catch 3.0 release, this will not have any backward compatibility issue. ### How was this patch tested? 1. New Unit tests are added. 2. Also, patch has been tested manually through both **Spark Core** and **History Server** Rest APIs. Closes #28208 from erenavsarogullari/SPARK-31440. Authored-by: Eren Avsarogullari <eren.avsarogullari@gmail.com> Signed-off-by: Gengliang Wang <gengliang.wang@databricks.com> |
||
Max Gekk | fc5b90243c |
[SPARK-31727][SQL] Fix error message of casting timestamp to int in ANSI non-codegen mode
### What changes were proposed in this pull request? Change timestamp casting to int in ANSI and non-codegen mode, and make the error message consistent to the error messages in the codegen mode. In particular, casting to int is implemented in the same way as casting to short and byte. ### Why are the changes needed? 1. The error message in the non-codegen mode is diversed from the error message in the codegen mode. 2. The error message contains intermediate results that could confuse. ### Does this PR introduce _any_ user-facing change? Yes. Before the changes, the error message of casting timestamp to int contains intermediate result but after the changes it contains the input values which causes arithmetic overflow. ### How was this patch tested? By running the modified test suite `AnsiCastSuite`. Closes #28549 from MaxGekk/fix-error-msg-cast-timestamp. Authored-by: Max Gekk <max.gekk@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> |
||
Jungtaek Lim (HeartSaVioR) | d2bec5e265 |
[SPARK-31707][SQL] Revert SPARK-30098 Use default datasource as provider for CREATE TABLE syntax
### What changes were proposed in this pull request? This patch effectively reverts SPARK-30098 via below changes: * Removed the config * Removed the changes done in parser rule * Removed the usage of config in tests * Removed tests which depend on the config * Rolled back some tests to before SPARK-30098 which were affected by SPARK-30098 * Reflect the change into docs (migration doc, create table syntax) ### Why are the changes needed? SPARK-30098 brought confusion and frustration on using create table DDL query, and we agreed about the bad effect on the change. Please go through the [discussion thread](http://apache-spark-developers-list.1001551.n3.nabble.com/DISCUSS-Resolve-ambiguous-parser-rule-between-two-quot-create-table-quot-s-td29051i20.html) to see the details. ### Does this PR introduce _any_ user-facing change? No, compared to Spark 2.4.x. End users tried to experiment with Spark 3.0.0 previews will see the change that the behavior is going back to Spark 2.4.x, but I believe we won't guarantee compatibility in preview releases. ### How was this patch tested? Existing UTs. Closes #28517 from HeartSaVioR/revert-SPARK-30098. Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> |