Commit graph

832 commits

Author SHA1 Message Date
Anton Okolnychyi fceabe2372 [SPARK-35779][SQL] Dynamic filtering for Data Source V2
### What changes were proposed in this pull request?

This PR implemented the proposal per [design doc](https://docs.google.com/document/d/1RfFn2e9o_1uHJ8jFGsSakp-BZMizX1uRrJSybMe2a6M) for SPARK-35779.

### Why are the changes needed?

Spark supports dynamic partition filtering that enables reusing parts of the query to skip unnecessary partitions in the larger table during joins. This optimization has proven to be beneficial for star-schema queries which are common in the industry. Unfortunately, dynamic pruning is currently limited to partition pruning during joins and is only supported for built-in v1 sources. As more and more Spark users migrate to Data Source V2, it is important to generalize dynamic filtering and expose it to all v2 connectors.

Please, see the design doc for more information on this effort.

### Does this PR introduce _any_ user-facing change?

Yes, this PR adds a new optional mix-in interface for `Scan` in Data Source V2.

### How was this patch tested?

This PR comes with tests.

Closes #32921 from aokolnychyi/dynamic-filtering-wip.

Authored-by: Anton Okolnychyi <aokolnychyi@apple.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2021-07-01 17:00:12 -07:00
Erik Krogen 4dd41b9678 [SPARK-34365][AVRO] Add support for positional Catalyst-to-Avro schema matching
### What changes were proposed in this pull request?
Provide the (configurable) ability to perform Avro-to-Catalyst schema field matching using the position of the fields instead of their names. A new `option` is added for the Avro datasource, `positionalFieldMatching`, which instructs `AvroSerializer`/`AvroDeserializer` to perform positional field matching instead of matching by name.

### Why are the changes needed?
This by-name matching is somewhat recent; prior to PR #24635, at least on the write path, schemas were matched by positionally ("structural" comparison). While by-name is better behavior as a default, it will be better to make this configurable by a user. Even at the time that PR #24635 was handled, there was [interest in making this behavior configurable](https://github.com/apache/spark/pull/24635#issuecomment-494205251), but it appears it went unaddressed.

There is precedence for configurability of this behavior as seen in PR #29737, which added this support for ORC. Besides this precedence, the behavior of Hive is to perform matching positionally ([ref](https://cwiki.apache.org/confluence/display/Hive/AvroSerDe#AvroSerDe-WritingtablestoAvrofiles)), so this is behavior that Hadoop/Hive ecosystem users are familiar with.

### Does this PR introduce _any_ user-facing change?
Yes, a new option is provided for the Avro datasource, `positionalFieldMatching`, which provides compatibility with Hive and pre-3.0.0 Spark behavior.

### How was this patch tested?
New unit tests are added within `AvroSuite`, `AvroSchemaHelperSuite`, and `AvroSerdeSuite`; and most of the existing tests within `AvroSerdeSuite` are adapted to perform the same test using by-name and positional matching to ensure feature parity.

Closes #31490 from xkrogen/xkrogen-SPARK-34365-avro-positional-field-matching.

Authored-by: Erik Krogen <xkrogen@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
2021-06-30 16:20:45 +08:00
Dongjoon Hyun 16e50356ee [SPARK-34302][FOLLOWUP][SQL][TESTS] Update jdbc.v2.*IntegrationSuite
### What changes were proposed in this pull request?

This PR aims to update JDBC v2 integration suite by adding `catalogName`.

### Why are the changes needed?

To recover the integration test suite.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the GitHub Action.

Closes #33124 from dongjoon-hyun/SPARK-34302.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-06-28 23:01:54 -07:00
Kousuke Saruta c562c1674e [SPARK-34320][SQL][FOLLOWUP] Modify V2JDBCTest to follow the change of the error message
### What changes were proposed in this pull request?

This is a followup PR for SPARK-34320 (#32854).
That PR changed the error message of `ALTER TABLE` but `V2JDBCTest` didn't comply with the change.

### Why are the changes needed?

To fix`v2.*JDBCSuite` failure.
```
[info] - SPARK-33034: ALTER TABLE ... add new columns (173 milliseconds)
[info] - SPARK-33034: ALTER TABLE ... drop column *** FAILED *** (126 milliseconds)
[info]   "Cannot delete missing field bad_column in postgresql.alt_table schema: root
[info]    |-- C2: string (nullable = true)
[info]   ; line 1 pos 0;
[info]   'AlterTableDropColumns [unresolvedfieldname(bad_column)]
[info]   +- ResolvedTable org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog7f4b7516, alt_table, JDBCTable(alt_table,StructType(StructField(C2,StringType,true)),org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions5842301d), [C2#1879]
[info]   " did not contain "Cannot delete missing field bad_column in alt_table schema" (V2JDBCTest.scala:106)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info]   at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info]   at org.apache.spark.sql.jdbc.v2.V2JDBCTest.$anonfun$$init$$6(V2JDBCTest.scala:106)
[info]   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
[info]   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1461)
[info]   at org.apache.spark.sql.test.SQLTestUtilsBase.withTable(SQLTestUtils.scala:305)
[info]   at org.apache.spark.sql.test.SQLTestUtilsBase.withTable$(SQLTestUtils.scala:303)
[info]   at org.apache.spark.sql.jdbc.DockerJDBCIntegrationSuite.withTable(DockerJDBCIntegrationSuite.scala:95)
[info]   at org.apache.spark.sql.jdbc.v2.V2JDBCTest.$anonfun$$init$$5(V2JDBCTest.scala:95)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:190)
[info]   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:190)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:188)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:200)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:200)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:182)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:62)
[info]   at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
[info]   at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
[info]   at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:62)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:233)
[info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[info]   at scala.collection.immutable.List.foreach(List.scala:431)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:233)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:232)
[info]   at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1563)
[info]   at org.scalatest.Suite.run(Suite.scala:1112)
[info]   at org.scalatest.Suite.run$(Suite.scala:1094)
[info]   at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1563)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:237)
[info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:237)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:236)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:62)
[info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info]   at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:62)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:318)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:513)
[info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:413)
[info]   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[info]   at java.lang.Thread.run(Thread.java:748)
[info] - SPARK-33034: ALTER TABLE ... update column type (122 milliseconds)
[info] - SPARK-33034: ALTER TABLE ... rename column (93 milliseconds)
[info] - SPARK-33034: ALTER TABLE ... update column nullability (92 milliseconds)
[info] - CREATE TABLE with table comment (38 milliseconds)
[info] - CREATE TABLE with table property (52 milliseconds)
[info] MySQLIntegrationSuite:
[info] - Basic test (61 milliseconds)
[info] - Numeric types (67 milliseconds)
[info] - Date types (59 milliseconds)
[info] - String types (50 milliseconds)
[info] - Basic write test (216 milliseconds)
[info] - query JDBC option (64 milliseconds)
[info] Run completed in 19 minutes, 43 seconds.
[info] Total number of tests run: 89
[info] Suites: completed 14, aborted 0
[info] Tests: succeeded 84, failed 5, canceled 0, ignored 0, pending 0
[info] *** 5 TESTS FAILED ***
[error] Failed tests:
[error] 	org.apache.spark.sql.jdbc.v2.OracleIntegrationSuite
[error] 	org.apache.spark.sql.jdbc.v2.MsSqlServerIntegrationSuite
[error] 	org.apache.spark.sql.jdbc.v2.DB2IntegrationSuite
[error] 	org.apache.spark.sql.jdbc.v2.MySQLIntegrationSuite
[error] 	org.apache.spark.sql.jdbc.v2.PostgresIntegrationSuite
[error] (docker-integration-tests / Test / test) sbt.TestsFailedException: Tests unsuccessful
[error] Total time: 1223 s (20:23), completed Jun 25, 2021 1:31:04 AM
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

docker-integration-tests on GA.

Closes #33074 from sarutak/followup-SPARK-34320.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Kousuke Saruta <sarutak@oss.nttdata.com>
2021-06-25 12:58:38 +09:00
Vinod KC 4dabba8f76 [SPARK-35747][CORE] Avoid printing full Exception stack trace, if Hbase/Kafka/Hive services are not running in a secure cluster
### What changes were proposed in this pull request?
In a secure Yarn cluster, even though HBase or Kafka, or Hive services are not used in the user application, yarn client unnecessarily trying to generate  Delegations token from these services. This will add additional delays while submitting spark application in a yarn cluster

 Also during HBase delegation token generation step in the application submit stage,  HBaseDelegationTokenProvider prints a full Exception Stack trace and it causes a noisy warning.
 Apart from printing exception stack trace, Application submission taking more time as it retries connection to HBase master multiple times before it gives up. So, if HBase is not used in the user Applications, it is better to suggest User disable HBase Delegation Token generation.

 This PR aims to avoid printing full Exception Stack by just printing just Exception name and also add a suggestion message to disable `Delegation Token generation` if service is not used in the Spark Application.

 eg: `If HBase is not used, set spark.security.credentials.hbase.enabled to false`

### Why are the changes needed?

To avoid printing full Exception stack trace in WARN log
#### Before the fix
----------------
```
spark-shell --master yarn
.......
.......
21/06/12 14:29:41 WARN security.HBaseDelegationTokenProvider: Failed to get token from service hbase
java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.security.HBaseDelegationTokenProvider.obtainDelegationTokensWithHBaseConn(HBaseDelegationT
okenProvider.scala:93)
        at org.apache.spark.deploy.security.HBaseDelegationTokenProvider.obtainDelegationTokens(HBaseDelegationTokenProvider.
scala:60)
        at org.apache.spark.deploy.security.HadoopDelegationTokenManager$$anonfun$6.apply(HadoopDelegationTokenManager.scala:
166)
        at org.apache.spark.deploy.security.HadoopDelegationTokenManager$$anonfun$6.apply(HadoopDelegationTokenManager.scala:
164)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
        at org.apache.spark.deploy.security.HadoopDelegationTokenManager.obtainDelegationTokens(HadoopDelegationTokenManager.
scala:164)
```

#### After  the fix
------------
```
 spark-shell --master yarn

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/06/13 02:10:02 WARN security.HBaseDelegationTokenProvider: Failed to get token from service hbase due to  java.lang.reflect.InvocationTargetException Retrying to fetch HBase security token with hbase connection parameter.
21/06/13 02:10:40 WARN security.HBaseDelegationTokenProvider: Failed to get token from service hbase java.lang.reflect.InvocationTargetException. If HBase is not used, set spark.security.credentials.hbase.enabled to false
21/06/13 02:10:47 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!
```
### Does this PR introduce _any_ user-facing change?

Yes, in the log, it avoids printing full Exception stack trace.
Instread prints this.
**WARN security.HBaseDelegationTokenProvider: Failed to get token from service hbase java.lang.reflect.InvocationTargetException. If HBase is not used, set spark.security.credentials.hbase.enabled to false**

### How was this patch tested?

Tested manually as it can be verified only in a secure cluster

Closes #32894 from vinodkc/br_fix_Hbase_DT_Exception_stack_printing.

Authored-by: Vinod KC <vinod.kc.in@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-06-23 23:12:02 -07:00
Bruce Robbins 66d5a0049a [SPARK-35817][SQL] Restore performance of queries against wide Avro tables
### What changes were proposed in this pull request?

When creating a record writer in an AvroDeserializer, or creating a struct converter in an AvroSerializer, look up Avro fields using a map rather than scanning the entire list of Avro fields.

### Why are the changes needed?

A query against an Avro table can be quite slow when all are true:

* There are many columns in the Avro file
* The query contains a wide projection
* There are many splits in the input
* Some of the splits are read serially (e.g., less executors than there are tasks)

A write to an Avro table can be quite slow when all are true:

* There are many columns in the new rows
* The operation is creating many files

For example, a single-threaded query against a 6000 column Avro data set with 50K rows and 20 files takes less than a minute with Spark 3.0.1 but over 7 minutes with Spark 3.2.0-SNAPSHOT.

This PR restores the faster time.

For the 1000 column read benchmark:
Before patch: 108447 ms
After patch: 35925 ms
percent improvement: 66%

For the 1000 column write benchmark:
Before patch: 123307
After patch: 42313
percent improvement: 65%

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

* Ran existing unit tests
* Added new unit tests
* Added new benchmarks

Closes #32969 from bersprockets/SPARK-35817.

Authored-by: Bruce Robbins <bersprockets@gmail.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
2021-06-23 22:36:56 +08:00
YangJie 6c05459600 [SPARK-35838][BUILD][TESTS] Ensure all modules can be maven test independently in Scala 2.13
### What changes were proposed in this pull request?
Similar to SPARK-35532, the main change of this pr is add `scala-2.13` profile to external/kafka-0-10-sql/pom.xml, external/avro/pom.xml and sql/hive-thriftserver/pom.xml,  the `scala-2.13` profile include dependency on `scala-parallel-collections_2.13`, then all(34) spark modules can maven test independently.

### Why are the changes needed?
Ensure alll(34) spark modules can be maven test independently in Scala 2.13

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
- Pass the GitHub Action Scala 2.13 job
- Manual test:

1. Execute
```
dev/change-scala-version.sh 2.13

mvn clean install -DskipTests -Phadoop-3.2 -Phive-2.3 -Phadoop-cloud -Pmesos -Pyarn -Pkinesis-asl -Phive-thriftserver -Pspark-ganglia-lgpl -Pkubernetes -Phive -Pscala-2.13
```

2. maven test `external/kafka-0-10-sql` module
```
mvn test -Phadoop-3.2 -Phive-2.3 -Phadoop-cloud -Pmesos -Pyarn -Pkinesis-asl -Phive-thriftserver -Pspark-ganglia-lgpl -Pkubernetes -Phive -Pscala-2.13 -pl external/kafka-0-10-sql
```

**before**

```
Discovery starting.
Discovery completed in 857 milliseconds.
Run starting. Expected test count is: 464
...
KafkaRelationSuiteV2:
- explicit earliest to latest offsets
- default starting and ending offsets
- explicit offsets
- default starting and ending offsets with headers
- timestamp provided for starting and ending
- timestamp provided for starting, offset provided for ending
- timestamp provided for ending, offset provided for starting
- timestamp provided for starting, ending not provided
- timestamp provided for ending, starting not provided
- global timestamp provided for starting and ending
- no matched offset for timestamp - startingOffsets
- preferences on offset related options
- no matched offset for timestamp - endingOffsets
*** RUN ABORTED ***
  java.lang.NoClassDefFoundError: scala/collection/parallel/TaskSupport
  at org.apache.spark.SparkContext.$anonfun$union$1(SparkContext.scala:1411)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.SparkContext.withScope(SparkContext.scala:788)
  at org.apache.spark.SparkContext.union(SparkContext.scala:1405)
  at org.apache.spark.sql.execution.UnionExec.doExecute(basicPhysicalOperators.scala:697)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:182)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:220)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:217)
  ...
  Cause: java.lang.ClassNotFoundException: scala.collection.parallel.TaskSupport
  at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
  at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
  at org.apache.spark.SparkContext.$anonfun$union$1(SparkContext.scala:1411)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.SparkContext.withScope(SparkContext.scala:788)
  at org.apache.spark.SparkContext.union(SparkContext.scala:1405)
  at org.apache.spark.sql.execution.UnionExec.doExecute(basicPhysicalOperators.scala:697)
  ...
```

**After**

```
Run completed in 33 minutes, 51 seconds.
Total number of tests run: 464
Suites: completed 31, aborted 0
Tests: succeeded 464, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
```

3. maven test `external/avro` module

```
mvn test -Phadoop-3.2 -Phive-2.3 -Phadoop-cloud -Pmesos -Pyarn -Pkinesis-asl -Phive-thriftserver -Pspark-ganglia-lgpl -Pkubernetes -Phive -Pscala-2.13 -pl external/avro
```

**before**

```
Discovery starting.
Discovery completed in 2 seconds, 765 milliseconds.
Run starting. Expected test count is: 255
AvroReadSchemaSuite:
- append column at the end
- hide column at the end
- append column into middle
- hide column in the middle
- add a nested column at the end of the leaf struct column
- add a nested column in the middle of the leaf struct column
- add a nested column at the end of the middle struct column
- add a nested column in the middle of the middle struct column
- hide a nested column at the end of the leaf struct column
- hide a nested column in the middle of the leaf struct column
- hide a nested column at the end of the middle struct column
- hide a nested column in the middle of the middle struct column
*** RUN ABORTED ***
  java.lang.NoClassDefFoundError: scala/collection/parallel/TaskSupport
  at org.apache.spark.SparkContext.$anonfun$union$1(SparkContext.scala:1411)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.SparkContext.withScope(SparkContext.scala:788)
  at org.apache.spark.SparkContext.union(SparkContext.scala:1405)
  at org.apache.spark.sql.execution.UnionExec.doExecute(basicPhysicalOperators.scala:697)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:182)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:220)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:217)
  ...
  Cause: java.lang.ClassNotFoundException: scala.collection.parallel.TaskSupport
  at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
  at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
  at org.apache.spark.SparkContext.$anonfun$union$1(SparkContext.scala:1411)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.SparkContext.withScope(SparkContext.scala:788)
  at org.apache.spark.SparkContext.union(SparkContext.scala:1405)
  at org.apache.spark.sql.execution.UnionExec.doExecute(basicPhysicalOperators.scala:697)
  ...
```

**After**

```
Run completed in 1 minute, 42 seconds.
Total number of tests run: 255
Suites: completed 12, aborted 0
Tests: succeeded 255, failed 0, canceled 0, ignored 2, pending 0
All tests passed.
```

4.  maven test `sql/hive-thriftserver` module

```
mvn test -Phadoop-3.2 -Phive-2.3 -Phadoop-cloud -Pmesos -Pyarn -Pkinesis-asl -Phive-thriftserver -Pspark-ganglia-lgpl -Pkubernetes -Phive -Pscala-2.13 -pl sql/hive-thriftserver
```

**before**

```
- union.sql *** FAILED ***
  "1  a
  1 a
  2 b
  2 b" did not contain "Exception" Exception did not match for query #2
  SELECT *
  FROM   (SELECT * FROM t1
          UNION ALL
          SELECT * FROM t1), expected: 1  a
  1 a
  2 b
  2 b, but got: java.sql.SQLException
  org.apache.hive.service.cli.HiveSQLException: Error running query: java.lang.NoClassDefFoundError: scala/collection/parallel/TaskSupport
    at org.apache.spark.sql.hive.thriftserver.HiveThriftServerErrors$.runningQueryError(HiveThriftServerErrors.scala:38)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:324)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.$anonfun$run$2(SparkExecuteStatementOperation.scala:229)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
    at org.apache.spark.sql.hive.thriftserver.SparkOperation.withLocalProperties(SparkOperation.scala:79)
    at org.apache.spark.sql.hive.thriftserver.SparkOperation.withLocalProperties$(SparkOperation.scala:63)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.withLocalProperties(SparkExecuteStatementOperation.scala:43)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:229)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:224)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2.run(SparkExecuteStatementOperation.scala:238)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
  Caused by: java.lang.NoClassDefFoundError: scala/collection/parallel/TaskSupport
    at org.apache.spark.SparkContext.$anonfun$union$1(SparkContext.scala:1411)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:788)
    at org.apache.spark.SparkContext.union(SparkContext.scala:1405)
    at org.apache.spark.sql.execution.UnionExec.doExecute(basicPhysicalOperators.scala:697)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:182)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:220)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:217)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:178)
    at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:323)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:389)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3719)
    at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2987)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3710)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:774)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3708)
    at org.apache.spark.sql.Dataset.collect(Dataset.scala:2987)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:299)
    ... 16 more
  Caused by: java.lang.ClassNotFoundException: scala.collection.parallel.TaskSupport
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 40 more (ThriftServerQueryTestSuite.scala:209)
```

**After**

```
Run completed in 29 minutes, 17 seconds.
Total number of tests run: 535
Suites: completed 20, aborted 0
Tests: succeeded 535, failed 0, canceled 0, ignored 17, pending 0
All tests passed.
```

Closes #32994 from LuciferYang/SPARK-35838.

Authored-by: YangJie <yangjie01@baidu.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-06-22 06:31:24 -07:00
Jungtaek Lim 4a6d90e187 [SPARK-35611][SS] Introduce the strategy on mismatched offset for start offset timestamp on Kafka data source
### What changes were proposed in this pull request?

This PR proposes to introduce the strategy on mismatched offset for start offset timestamp on Kafka data source.

Please read the section `Why are the changes needed?` to understand the rationalization of the functionality.

This would be pretty much helpful for the case where there's a skew between partitions and some partitions have older records.

* AS-IS: Spark simply fails the query and end users have to deal with workarounds requiring manual steps.
* TO-BE: Spark will assign the latest offset for these partitions, so that Spark can read newer records from these partitions in further micro-batches.

To retain the existing behavior and also give some help for the proposed "TO-BE" behavior, we'd like to introduce the strategy on mismatched offset for start offset timestamp to let end users choose from them.

The strategy will be added as source option, to ensure end users set the behavior explicitly (otherwise simply "known" default value).

* New source option to be added: startingOffsetsByTimestampStrategy
* Available values: `error` (fail the query as referred as AS-IS), `latest` (set the offset to the latest as referred as TO-BE)

Doc changes are following:

![ES-106042-doc-screenshot-1](https://user-images.githubusercontent.com/1317309/120472697-2c1ba800-c3e1-11eb-884f-f28152168053.png)
![ES-106042-doc-screenshot-2](https://user-images.githubusercontent.com/1317309/120472719-33db4c80-c3e1-11eb-9851-939be8a3ddb7.png)

### Why are the changes needed?

We encountered a real-world case Spark fails the query if some of the partitions don't have matching offset by timestamp.

This is intended behavior to avoid bring unintended output for some cases like:

* timestamp 2 is presented as timestamp-offset, but the some of partitions don't have the record yet
* record with timestamp 1 comes "later" in the following micro-batch

which is possible since Kafka allows to specify the timestamp in record.

Here the unintended output we talked about was the risk of reading record with timestamp 1 in the next micro-batch despite the option specifying timestamp 2.

But for many cases end users just suppose timestamp is increasing monotonically with wall clocks are all in sync, and current behavior blocks these cases to make progress.

### Does this PR introduce _any_ user-facing change?

Yes, but not a breaking change. It's up to end users to choose the behavior which the default value is "error" (current behavior). And it's a source option (not config) so they need to explicitly set the behavior to let the functionality takes effect.

### How was this patch tested?

New UTs.

Closes #32747 from HeartSaVioR/SPARK-35611.

Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2021-06-21 00:37:42 -07:00
Satish Gopalani 2a331177ba [SPARK-35312][SS] Introduce new Option in Kafka source to specify minimum number of records to read per trigger
### What changes were proposed in this pull request?
This patch introduces a new option to specify the minimum number of offsets to read per trigger i.e. minOffsetsPerTrigger and maxTriggerDelay to avoid the infinite wait for the trigger.

This new option will allow skipping trigger/batch when the number of records available in Kafka is low. This is a very useful feature in cases where we have a sudden burst of data at certain intervals in a day and data volume is low for the rest of the day.
'maxTriggerDelay' option will help to avoid cases of infinite delay in scheduling trigger and the trigger will happen irrespective of records available if the maxTriggerDelay time exceeds the last trigger. It would be an optional parameter with a default value of 15 mins. This option will be only applicable if minOffsetsPerTrigger is set.

minOffsetsPerTrigger option would be optional of course, but once specified it would take precedence over maxOffestsPerTrigger which will be honored only after minOffsetsPerTrigger is satisfied.

### Why are the changes needed?
There are many scenarios where there is a sudden burst of data at certain intervals in a day and data volume is low for the rest of the day. Tunning such jobs is difficult as decreasing trigger processing time increasing the number of batches and hence cluster resource usage and adds to small file issues. Increasing trigger processing time adds consumer lag. This patch tries to address this issue.

### How was this patch tested?
This patch was tested by adding test cases as well as manually on a cluster where the job was running for a full one day with a data burst happening once a day.
Here is the picture of databurst and hence consumer lag:
<img width="1198" alt="Screenshot 2021-04-29 at 11 39 35 PM" src="https://user-images.githubusercontent.com/1044003/116997587-9b2ab180-acfa-11eb-91fd-524802ce3316.png">

This is how the job behaved at burst time running every 4.5 mins (which is the specified trigger time):
<img width="1154" alt="Burst Time" src="https://user-images.githubusercontent.com/1044003/116997919-12f8dc00-acfb-11eb-9b0a-98387fc67560.png">

This is job behavior during the non-burst time where it is skipping 2 to 3 triggers and running once every 9 to 13.5 mins
<img width="1154" alt="Non Burst Time" src="https://user-images.githubusercontent.com/1044003/116998244-8b5f9d00-acfb-11eb-8340-33d47149ef81.png">

Here are some more stats from the two-run i.e. one normal run and the other with minOffsetsperTrigger set:

| Run | Data Size | Number of Batch Runs | Number of Files |
| ------------- | ------------- |------------- |------------- |
| Normal Run | 54.2 GB | 320 | 21968 |
| Run with minOffsetsperTrigger | 54.2 GB | 120 | 12104 |

Closes #32653 from satishgopalani/SPARK-35312.

Authored-by: Satish Gopalani <satish.gopalani@pubmatic.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
2021-06-08 23:48:09 +09:00
Kousuke Saruta 08e6f633b5 [SPARK-35577][TESTS] Allow to log container output for docker integration tests
### What changes were proposed in this pull request?

This PR proposes to add a feature that logs container output for docker integration tests.
With this change, if we run test with SBT, we will have like the following log in `unit-tests.log`.
```
===== CONTAINER LOGS FOR container Id: 3360c98eb28337d8b217fb614e47bf49aafa18a6cb60ecadf3178aee0c663021 =====
21/05/31 20:54:56.433 pool-1-thread-1 INFO PostgresIntegrationSuite: The files belonging to this database system will be owned by user "postgres".
This user must also own the server process.

The database cluster will be initialized with locale "en_US.utf8".
The default database encoding has accordingly been set to "UTF8".
The default text search configuration will be set to "english".

Data page checksums are disabled.

fixing permissions on existing directory /var/lib/postgresql/data ... ok
creating subdirectories ... ok
selecting dynamic shared memory implementation ... posix
selecting default max_connections ... 100
selecting default shared_buffers ... 128MB
selecting default time zone ... UTC
creating configuration files ... ok
running bootstrap script ... ok
sh: locale: not found
2021-05-31 11:54:49.892 UTC [29] WARNING:  no usable system locales were found
performing post-bootstrap initialization ... ok
initdb: warning: enabling "trust" authentication for local connections
You can change this by editing pg_hba.conf or using the option -A, or
--auth-local and --auth-host, the next time you run initdb.
syncing data to disk ... ok

Success. You can now start the database server using:

    pg_ctl -D /var/lib/postgresql/data -l logfile start

waiting for server to start....2021-05-31 11:54:50.284 UTC [34] LOG:  starting PostgreSQL 13.0 on x86_64-pc-linux-musl, compiled by gcc (Alpine 9.3.0) 9.3.0, 64-bit
2021-05-31 11:54:50.287 UTC [34] LOG:  listening on Unix socket "/var/run/postgresql/.s.PGSQL.5432"
2021-05-31 11:54:50.296 UTC [35] LOG:  database system was shut down at 2021-05-31 11:54:50 UTC
2021-05-31 11:54:50.301 UTC [34] LOG:  database system is ready to accept connections
 done
server started

/usr/local/bin/docker-entrypoint.sh: ignoring /docker-entrypoint-initdb.d/*

waiting for server to shut down....2021-05-31 11:54:50.363 UTC [34] LOG:  received fast shutdown request
2021-05-31 11:54:50.366 UTC [34] LOG:  aborting any active transactions
2021-05-31 11:54:50.368 UTC [34] LOG:  background worker "logical replication launcher" (PID 41) exited with exit code 1
2021-05-31 11:54:50.368 UTC [36] LOG:  shutting down
2021-05-31 11:54:50.402 UTC [34] LOG:  database system is shut down
 done
server stopped

PostgreSQL init process complete; ready for start up.

2021-05-31 11:54:50.510 UTC [1] LOG:  starting PostgreSQL 13.0 on x86_64-pc-linux-musl, compiled by gcc (Alpine 9.3.0) 9.3.0, 64-bit
2021-05-31 11:54:50.510 UTC [1] LOG:  listening on IPv4 address "0.0.0.0", port 5432
2021-05-31 11:54:50.510 UTC [1] LOG:  listening on IPv6 address "::", port 5432
2021-05-31 11:54:50.517 UTC [1] LOG:  listening on Unix socket "/var/run/postgresql/.s.PGSQL.5432"
2021-05-31 11:54:50.526 UTC [43] LOG:  database system was shut down at 2021-05-31 11:54:50 UTC
2021-05-31 11:54:50.531 UTC [1] LOG:  database system is ready to accept connections
2021-05-31 11:54:54.226 UTC [54] ERROR:  relation "public.barcopy" does not exist at character 15
2021-05-31 11:54:54.226 UTC [54] STATEMENT:  SELECT 1 FROM public.barcopy LIMIT 1
2021-05-31 11:54:54.610 UTC [59] ERROR:  relation "public.barcopy2" does not exist at character 15
2021-05-31 11:54:54.610 UTC [59] STATEMENT:  SELECT 1 FROM public.barcopy2 LIMIT 1
2021-05-31 11:54:54.934 UTC [63] ERROR:  relation "shortfloat" does not exist at character 15
2021-05-31 11:54:54.934 UTC [63] STATEMENT:  SELECT 1 FROM shortfloat LIMIT 1
2021-05-31 11:54:55.675 UTC [75] ERROR:  relation "byte_to_smallint_test" does not exist at character 15
2021-05-31 11:54:55.675 UTC [75] STATEMENT:  SELECT 1 FROM byte_to_smallint_test LIMIT 1

21/05/31 20:54:56.434 pool-1-thread-1 INFO PostgresIntegrationSuite:

===== END OF CONTAINER LOGS FOR container Id: 3360c98eb28337d8b217fb614e47bf49aafa18a6cb60ecadf3178aee0c663021 =====
```

### Why are the changes needed?

If we have container logs, it's useful to debug especially for GA.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

I run docker integration tests and got logs. The example shown above is for `PostgresIntegrationSuite`.

Closes #32715 from sarutak/log-docker-container.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2021-06-01 22:44:48 +09:00
yangjie01 16d9de815e [SPARK-35532][TESTS] Ensure mllib and kafka-0-10 module can be maven test independently in Scala 2.13
### What changes were proposed in this pull request?
Before this pr, when we execute maven test command to test `mllib` and `kafka-0-10` module independently, there are some Java UTs failed, the key error messages are as follows:

```
java.lang.NoClassDefFoundError: scala/collection/parallel/TaskSupport
```

and

```
java.lang.NoClassDefFoundError: scala/collection/parallel/immutable/ParVector
```

The UTs need `scala-parallel-collections_2.13`,  but it not in classpath when we run `mvn test -pl mllib -Pscala-2.13` and `mvn test -pl external/kafka-0-10 -Pscala-2.13`.

So the main change of this pr is add `scala-2.13` profile to `mllib/pom.xml` and `external/kafka-0-10/pom.xml`, the `scala-2.13` profile include dependency on `scala-parallel-collections_2.13`, then these two modules can maven test independently.

### Why are the changes needed?
Ensure mllib and kafka-0-10 module can be maven test independently in Scala 2.13

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

- Pass the GitHub Action Scala 2.13 job
- Manual test:

1. Execute
```
dev/change-scala-version.sh 2.13
mvn clean install -DskipTests -Phadoop-3.2 -Phive-2.3 -Phadoop-cloud -Pmesos -Pyarn -Pkinesis-asl -Phive-thriftserver -Pspark-ganglia-lgpl -Pkubernetes -Phive -Pscala-2.13
```

2. Execute

```
mvn test -pl mllib -Phadoop-3.2 -Phive-2.3 -Phadoop-cloud -Pmesos -Pyarn -Pkinesis-asl -Phive-thriftserver -Pspark-ganglia-lgpl -Pkubernetes -Phive -Pscala-2.13
```

**Before**

6 Java UTs failed:

```
[ERROR] Errors:
[ERROR]   JavaStreamingLogisticRegressionSuite.javaAPI:78 » TestFailed 20005 was not les...
[ERROR]   JavaStreamingKMeansSuite.javaAPI:78 » TestFailed 20040 was not less than 20000...
[ERROR]   JavaPrefixSpanSuite.runPrefixSpan:45 » NoClassDefFound scala/collection/parall...
[ERROR]   JavaPrefixSpanSuite.runPrefixSpanSaveLoad:67 » NoClassDefFound scala/collectio...
[ERROR]   JavaStreamingLinearRegressionSuite.javaAPI:77 » TestFailed 20014 was not less ...
[ERROR]   JavaStatisticsSuite.streamingTest:112 » TestFailed 20043 was not less than 200...
[INFO]
[ERROR] Tests run: 122, Failures: 0, Errors: 6, Skipped: 0
```

**After**

```
[INFO] Tests run: 122, Failures: 0, Errors: 0, Skipped: 0

Run completed in 28 minutes, 32 seconds.
Total number of tests run: 1654
Suites: completed 208, aborted 0
Tests: succeeded 1654, failed 0, canceled 0, ignored 7, pending 0
All tests passed.
```

3. Execute

```
mvn test -pl external/kafka-0-10 -Phadoop-3.2 -Phive-2.3 -Phadoop-cloud -Pmesos -Pyarn -Pkinesis-asl -Phive-thriftserver -Pspark-ganglia-lgpl -Pkubernetes -Phive -Pscala-2.13
```

**Before**

2 Java UTs failed:

```
[ERROR] Errors:
[ERROR] org.apache.spark.streaming.kafka010.JavaDirectKafkaStreamSuite.testKafkaStream
[ERROR]   Run 1: JavaDirectKafkaStreamSuite.testKafkaStream:170 expected:<[topic1-1, topic1-2, topic2-1, topic1-3, topic2-2, topic2-3]> but was:<[]>
[ERROR]   Run 2: JavaDirectKafkaStreamSuite.tearDown:57 » NoClassDefFound scala/collection/para...
[ERROR] Tests run: 4, Failures: 0, Errors: 1, Skipped: 0
```

**After**

```
[INFO] Tests run: 4, Failures: 0, Errors: 0, Skipped: 0

Run completed in 1 minute, 3 seconds.
Total number of tests run: 21
Suites: completed 4, aborted 0
Tests: succeeded 21, failed 0, canceled 0, ignored 0, pending 0
All tests passed.

```

Closes #32676 from LuciferYang/mllib-kafka-mvn-test.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-05-30 16:36:17 -07:00
Kousuke Saruta 2de19e460b [SPARK-35483][INFRA] Add docker-integration-tests to run-tests.py and GA
### What changes were proposed in this pull request?

This PR proposes to add `docker-integratin-tests` to `run-tests.py` and GA.
Once #32631 was merged but there was a lack of consideration.

Diff between this change and 692d95d145 merged in #32631 is as follows.

```
       if: github.repository != 'apache/spark'
       id: sync-branch
       run: |
+        apache_spark_ref=`git rev-parse HEAD`
         git fetch https://github.com/$GITHUB_REPOSITORY.git ${GITHUB_REF#refs/heads/}
         git -c user.name='Apache Spark Test Account' -c user.email='sparktestaccgmail.com' merge --no-commit --progress --squash FETCH_HEAD
         git -c user.name='Apache Spark Test Account' -c user.email='sparktestaccgmail.com' commit -m "Merged commit"
+        echo "::set-output name=APACHE_SPARK_REF::$apache_spark_ref"
     - name: Cache Scala, SBT and Maven
       uses: actions/cachev2
       with:
```

### Why are the changes needed?

CI for `docker-integration-tests` is absent for now.

### Does this PR introduce _any_ user-facing change?

GA.

### How was this patch tested?

Closes #32691 from sarutak/docker-integration-test-ga-take2.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-05-28 16:54:47 +09:00
Hyukjin Kwon d189cf75f9 Revert "[SPARK-35483][INFRA] Add docker-integration-tests to run-tests.py and GA"
This reverts commit 0a74ad66b3.
2021-05-28 14:29:12 +09:00
Kousuke Saruta 0a74ad66b3 [SPARK-35483][INFRA] Add docker-integration-tests to run-tests.py and GA
### What changes were proposed in this pull request?

This PR proposes to add `docker-integratin-tests` to `run-tests.py` and GA.
`doker-integration-tests` can't run if docker is not installed so it run only if `docker-integration-tests` is specified with `--module`.

### Why are the changes needed?

CI for `docker-integration-tests` is absent for now.

### Does this PR introduce _any_ user-facing change?

GA.

### How was this patch tested?

Closes #32631 from sarutak/docker-integration-test-ga.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-05-28 07:56:37 +09:00
Kousuke Saruta 116a97e153 [SPARK-35501][SQL][TESTS] Add a feature for removing pulled container image for docker integration tests
### What changes were proposed in this pull request?

This PR adds a feature for removing pulled container image after every docker integration test finish.
This feature is enabled by the new propoerty `spark.tes.docker.removePulledImage`.

### Why are the changes needed?

For idempotent.
I'm trying to add docker integration tests to GA in SPARK-35483 (#32631) but I noticed that `jdbc.OracleIntegrationSuite` consistently fails(https://github.com/sarutak/spark/runs/2646707235?check_suite_focus=true).
I investigated the reason and I found it's short of the storage capacity of the host on GA.
```
 ORACLE PASSWORD FOR SYS AND SYSTEM: oracle
The location '/opt/oracle' specified for database files has insufficient space.
Database creation needs at least '4.5GB' disk space.
Specify a different database file destination that has enough space in the configuration file '/etc/sysconfig/oracle-xe-18c.conf'.
mv: cannot stat '/opt/oracle/product/18c/dbhomeXE/dbs/spfileXE.ora': No such file or directory
mv: cannot stat '/opt/oracle/product/18c/dbhomeXE/dbs/orapwXE': No such file or directory
ORACLE_HOME = [/home/oracle] ? ORACLE_BASE environment variable is not being set since this
information is not available for the current user ID .
You can set ORACLE_BASE manually if it is required.
Resetting ORACLE_BASE to its previous value or ORACLE_HOME
The Oracle base remains unchanged with value /opt/oracle
#####################################
########### E R R O R ###############
DATABASE SETUP WAS NOT SUCCESSFUL!
Please check output for further info!
########### E R R O R ###############
#####################################
The following output is now a tail of the alert.log:
tail: cannot open '/opt/oracle/diag/rdbms/*/*/trace/alert*.log' for reading: No such file or directory
tail: no files remaining
```

With this feature, pulled container image is removed and keep the capacity for `jdbc.OracleIntegrationSuite` in GA.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

I confirmed the following things.

* A container image which is absent in the local repository is removed after test finished if `spark.test.container.removePulledImage` is `true`.
* A container image which is present in the local repository is not removed after the finished even if `spark.test.container.removePulledImage` is `true`.
* A container image is not removed regardless of presence of the container image in the local repository even if `spark.test.container.removePulledImage` is `true`.

Closes #32652 from sarutak/docker-image-rm.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-05-26 17:24:29 +09:00
Jungtaek Lim a57afd442c [SPARK-29223][SQL][SS] New option to specify timestamp on all subscribing topic-partitions in Kafka source
### What changes were proposed in this pull request?

This patch is a follow-up of SPARK-26848 (#23747). In SPARK-26848, we decided to open possibility to let end users set individual timestamp per partition. But in many cases, specifying timestamp represents the intention that we would want to go back to specific timestamp and reprocess records, which should be applied to all topics and partitions.

This patch proposes to provide a way to set a global timestamp across topic-partitions which the source is subscribing to, so that end users can set all offsets by specific timestamp easily. To provide the way to config the timestamp easier, the new options only receive "a" timestamp for start/end timestamp.

New options introduced in this PR:

* startingTimestamp
* endingTimestamp

All two options receive timestamp as string.

There're priorities for options regarding starting/ending offset as we will have three options for start offsets and another three options for end offsets. Priorities are following:

* starting offsets: startingTimestamp -> startingOffsetsByTimestamp -> startingOffsets
* ending offsets: startingTimestamp -> startingOffsetsByTimestamp -> startingOffsets

### Why are the changes needed?

Existing option to specify timestamp as offset is quite verbose if there're a lot of partitions across topics. Suppose there're 100s of partitions in a topic, the json should contain 100s of times of the same timestamp.

Also, the number of partitions can also change, which requires either:

* fixing the code if the json is statically created
* introducing the dependencies on Kafka client and deal with Kafka API on crafting json programmatically

Both approaches are even not "acceptable" if we're dealing with ad-hoc query; anyone doesn't want to write the code more complicated than the query itself. Flink [provides the option](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-consumers-start-position-configuration) to specify a timestamp for all topic-partitions like this PR, and even doesn't provide the option to specify the timestamp per topic-partition.

With this PR, end users are only required to provide a single timestamp value. No more complicated JSON format end users need to know about the structure.

### Does this PR introduce _any_ user-facing change?

Yes, this PR introduces two new options, described in above section.

Doc changes are following:

![스크린샷 2021-05-21 오후 12 01 02](https://user-images.githubusercontent.com/1317309/119076244-3034e680-ba2d-11eb-8323-0e227932d2e5.png)
![스크린샷 2021-05-21 오후 12 01 12](https://user-images.githubusercontent.com/1317309/119076255-35923100-ba2d-11eb-9d79-538a7f9ee738.png)
![스크린샷 2021-05-21 오후 12 01 24](https://user-images.githubusercontent.com/1317309/119076264-39be4e80-ba2d-11eb-8265-ac158f55c360.png)
![스크린샷 2021-05-21 오후 12 06 01](https://user-images.githubusercontent.com/1317309/119076271-3d51d580-ba2d-11eb-98ea-35fd72b1bbfc.png)

### How was this patch tested?

New UTs covering new functionalities. Also manually tested via simple batch & streaming queries.

Closes #32609 from HeartSaVioR/SPARK-29223-v2.

Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
2021-05-25 21:43:49 +09:00
Kousuke Saruta 1a43415d8d [SPARK-35226][SQL][FOLLOWUP] Fix test added in SPARK-35226 for DB2KrbIntegrationSuite
### What changes were proposed in this pull request?

This PR fixes an test added in SPARK-35226 (#32344).

### Why are the changes needed?

`SELECT 1` seems non-valid query for DB2.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

DB2KrbIntegrationSuite passes on my laptop.

I also confirmed all the KrbIntegrationSuites pass with the following command.
```
build/sbt -Phive -Phive-thriftserver -Pdocker-integration-tests "testOnly org.apache.spark.sql.jdbc.*KrbIntegrationSuite"
```

Closes #32632 from sarutak/followup-SPARK-35226.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-05-22 22:31:43 -07:00
Kousuke Saruta a59a214610 [MINOR][FOLLOWUP] Update SHA for the oracle docker image
### What changes were proposed in this pull request?

This PR updates SHA for the oracle docker image in the comment in `OracleIntegrationSuite` and `v2.OracleIntegrationSuite`.
The SHA for the latest image is `3f422c4a35b423dfcdbcc57a84f01db6c82eb6c1`

### Why are the changes needed?

The script name for creating the oracle docker image is changed in #32629, following the latest image so we also need to update the corresponding SHA.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

The value is from `git log`.

Closes #32630 from sarutak/followup-oracle-script-name.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-05-22 19:52:24 -07:00
Kousuke Saruta 0549caf07a [MINOR][SQL] Change the script name for creating oracle docker image
### What changes were proposed in this pull request?

This PR changes the name in the comment in `jdbc.OracleIntegrationSuite` and `v2.OracleIntegrationSuite`.
The script is for creating oracle docker image.

### Why are the changes needed?

The name of the script is `buildContainerImage`, not `buildDockerImage` now.
- d918f5a4c6 (diff-be303ab32e74192aca829e5ea259a0aec07aac23a6049120fb337ec4efa601b0)

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

I confirmed that I can build the image with `./buildContainerImage.sh -v 18.4.0 -x`.

Closes #32629 from sarutak/change-oracle-container-script-name.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-05-21 22:58:44 -07:00
Max Gekk 2b08070e79 [SPARK-35427][SQL][TESTS] Check the EXCEPTION rebase mode for Avro/Parquet
### What changes were proposed in this pull request?
Add tests to check the `EXCEPTION` rebase mode explicitly in the datasources:
- Parquet: `DATE` type and `TIMESTAMP`: `INT96`, `TIMESTAMP_MICROS`, `TIMESTAMP_MILLIS`
- Avro: `DATE` type and `TIMESTAMP`: `timestamp-millis` and `timestamp-micros`.

### Why are the changes needed?
1. To improve test coverage
2. The `EXCEPTION` rebase mode should be checked independently from the default settings.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By running the affected test suites:
```
$ build/sbt "test:testOnly *AvroV2Suite"
$ build/sbt "test:testOnly *ParquetRebaseDatetimeV1Suite"
```

Closes #32574 from MaxGekk/test-rebase-exception.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-05-21 06:18:06 +00:00
Max Gekk 2bd32548f5 [SPARK-35459][SQL][TESTS] Move AvroRowReaderSuite to a separate file
### What changes were proposed in this pull request?
Move `AvroRowReaderSuite` out from `AvroSuite.scala` and place it to `AvroRowReaderSuite.scala`.

### Why are the changes needed?
To improve code maintenance. Usually, independent test suites are placed to separate files.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By running the affected test suites:
```
$ build/sbt "test:testOnly *AvroRowReaderSuite"
$ build/sbt "test:testOnly *AvroV1Suite"
$ build/sbt "test:testOnly *AvroV2Suite"
```

Closes #32607 from MaxGekk/move-AvroRowReaderSuite.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-05-20 20:04:10 +09:00
Linhong Liu 6aa2594c6b [SPARK-35366][SQL] Avoid using deprecated buildForBatch and buildForStreaming
### What changes were proposed in this pull request?
Currently, in DSv2, we are still using the deprecated `buildForBatch` and `buildForStreaming`.
This PR implements the `build`, `toBatch`, `toStreaming` interfaces to replace the deprecated ones.

### Why are the changes needed?
Code refactor

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
exsting UT

Closes #32497 from linhongliu-db/dsv2-writer.

Lead-authored-by: Linhong Liu <linhong.liu@databricks.com>
Co-authored-by: Linhong Liu <67896261+linhongliu-db@users.noreply.github.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-05-13 17:23:08 +00:00
Yijia Cui bbdbe0f734 [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay
### What changes were proposed in this pull request?
This pull request proposes a new API for streaming sources to signal that they can report metrics, and adds a use case to support Kafka micro batch stream to report the stats of # of offsets for the current offset falling behind the latest.

A public interface is added.

`metrics`: returns the metrics reported by the streaming source with given offset.

### Why are the changes needed?
The new API can expose any custom metrics for the "current" offset for streaming sources. Different from #31398, this PR makes metrics available to user through progress report, not through spark UI. A use case is that people want to know how the current offset falls behind the latest offset.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Unit test for Kafka micro batch source v2 are added to test the Kafka use case.

Closes #31944 from yijiacui-db/SPARK-34297.

Authored-by: Yijia Cui <yijia.cui@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
2021-05-05 17:26:07 +09:00
Kousuke Saruta 529b875901 [SPARK-35226][SQL] Support refreshKrb5Config option in JDBC datasources
### What changes were proposed in this pull request?

This PR proposes to introduce a new JDBC option `refreshKrb5Config` which allows to reflect the change of `krb5.conf`.

### Why are the changes needed?

In the current master, JDBC datasources can't accept `refreshKrb5Config` which is defined in `Krb5LoginModule`.
So even if we change the `krb5.conf` after establishing a connection, the change will not be reflected.

The similar issue happens when we run multiple `*KrbIntegrationSuites` at the same time.
`MiniKDC` starts and stops every KerberosIntegrationSuite and different port number is recorded to `krb5.conf`.
Due to `SecureConnectionProvider.JDBCConfiguration` doesn't take `refreshKrb5Config`, KerberosIntegrationSuites except the first running one see the wrong port so those suites fail.
You can easily confirm with the following command.
```
build/sbt -Phive Phive-thriftserver -Pdocker-integration-tests "testOnly org.apache.spark.sql.jdbc.*KrbIntegrationSuite"
```
### Does this PR introduce _any_ user-facing change?

Yes. Users can set `refreshKrb5Config` to refresh krb5 relevant configuration.

### How was this patch tested?

New test.

Closes #32344 from sarutak/kerberos-refresh-issue.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Kousuke Saruta <sarutak@oss.nttdata.com>
2021-04-29 13:55:53 +09:00
Cheng Su 7f51106c0d [SPARK-26164][SQL] Allow concurrent writers for writing dynamic partitions and bucket table
### What changes were proposed in this pull request?

This is a re-proposal of https://github.com/apache/spark/pull/23163. Currently spark always requires a [local sort](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L188) before writing to output table with dynamic partition/bucket columns. The sort can be unnecessary if cardinality of partition/bucket values is small, and can be avoided by keeping multiple output writers concurrently.

This PR introduces a config `spark.sql.maxConcurrentOutputFileWriters` (which disables this feature by default), where user can tune the maximal number of concurrent writers. The config is needed here as we cannot keep arbitrary number of writers in task memory which can cause OOM (especially for Parquet/ORC vectorization writer).

The feature is to first use concurrent writers to write rows. If the number of writers exceeds the above config specified limit. Sort rest of rows and write rows one by one (See `DynamicPartitionDataConcurrentWriter.writeWithIterator()`).

In addition, interface `WriteTaskStatsTracker` and its implementation `BasicWriteTaskStatsTracker` are also changed because previously they are relying on the assumption that only one writer is active for writing dynamic partitions and bucketed table.

### Why are the changes needed?

Avoid the sort before writing output for dynamic partitioned query and bucketed table.
Help improve CPU and IO performance for these queries.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added unit test in `DataFrameReaderWriterSuite.scala`.

Closes #32198 from c21/writer.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-04-27 05:37:08 +00:00
Liang-Chi Hsieh bdac19184a [SPARK-35230][SQL] Move custom metric classes to proper package
### What changes were proposed in this pull request?

This patch moves DS v2 custom metric classes to `org.apache.spark.sql.connector.metric` package. Moving `CustomAvgMetric` and `CustomSumMetric` to above package and make them as public java abstract class too.

### Why are the changes needed?

`CustomAvgMetric` and `CustomSumMetric`  should be public APIs for developers to extend. As there are a few metric classes, we should put them together in one package.

### Does this PR introduce _any_ user-facing change?

No, dev only and they are not released yet.

### How was this patch tested?

Unit tests.

Closes #32348 from viirya/move-custom-metric-classes.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-04-26 07:19:36 -07:00
Dongjoon Hyun b108e7fff3 [SPARK-33913][SS] Upgrade Kafka to 2.8.0
### What changes were proposed in this pull request?

This PR aims to upgrade Kafka client to 2.8.0.
Note that Kafka 2.8.0 uses ZSTD JNI 1.4.9-1 like Apache Spark 3.2.0.

### Why are the changes needed?

This will bring the latest client-side improvement and bug fixes like the following examples.

- KAFKA-10631 ProducerFencedException is not Handled on Offest Commit
- KAFKA-10134 High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
- KAFKA-12193 Re-resolve IPs when a client is disconnected
- KAFKA-10090 Misleading warnings: The configuration was supplied but isn't a known config
- KAFKA-9263 The new hw is added to incorrect log when  ReplicaAlterLogDirsThread is replacing log
- KAFKA-10607 Ensure the error counts contains the NONE
- KAFKA-10458 Need a way to update quota for TokenBucket registered with Sensor
- KAFKA-10503 MockProducer doesn't throw ClassCastException when no partition for topic

**RELEASE NOTE**
- https://downloads.apache.org/kafka/2.8.0/RELEASE_NOTES.html
- https://downloads.apache.org/kafka/2.7.0/RELEASE_NOTES.html

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs with the existing tests because this is a dependency change.

Closes #32325 from dongjoon-hyun/SPARK-33913.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2021-04-25 16:20:22 +09:00
Liang-Chi Hsieh b2a2b5d820 [SPARK-34297][SQL][SS] Add metrics for data loss and offset out range for KafkaMicroBatchStream
### What changes were proposed in this pull request?

This patch proposes to add a couple of metrics in scan node for Kafka batch streaming query.

### Why are the changes needed?

When testing SS, I found it is hard to track data loss of SS reading from Kafka. The micro batch scan node has only one metric, number of output rows. Users have no idea how many offsets to fetch are out of Kafka, how many times data loss happens. These metrics are important for users to know the quality of SS query running.

### Does this PR introduce _any_ user-facing change?

Yes, adding two metrics to micro batch scan node for Kafka batch streaming.

### How was this patch tested?

Currently I tested on internal cluster with Kafka:

<img width="1193" alt="Screen Shot 2021-04-22 at 7 16 29 PM" src="https://user-images.githubusercontent.com/68855/115808460-61bf8100-a39f-11eb-99a9-65d22c3f5fb0.png">

I was trying to add unit test. But as our batch streaming query disallows to specify ending offsets. If I only specify an out-of-range starting offset, when we get offset range in `getRanges`,  any negative size range will be filtered out. So it cannot actually test the case of fetched non-existing offset.

Closes #31398 from viirya/micro-batch-metrics.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2021-04-23 13:56:53 -07:00
Kousuke Saruta ba92de0ae5 [SPARK-34843][SQL][FOLLOWUP] Fix a test failure in OracleIntegrationSuite
### What changes were proposed in this pull request?

This PR fixes a test failure in `OracleIntegrationSuite`.
After SPARK-34843 (#31965), the way to divide partitions is changed and `OracleIntegrationSuites` is affected.
```
[info] - SPARK-22814 support date/timestamp types in partitionColumn *** FAILED *** (230 milliseconds)
[info]   Set(""D" < '2018-07-11' or "D" is null", ""D" >= '2018-07-11' AND "D" < '2018-07-15'", ""D" >= '2018-07-15'") did not equal Set(""D" < '2018-07-10' or "D" is null", ""D" >= '2018-07-10' AND "D" < '2018-07-14'", ""D" >= '2018-07-14'") (OracleIntegrationSuite.scala:448)
[info]   Analysis:
[info]   Set(missingInLeft: ["D" < '2018-07-10' or "D" is null, "D" >= '2018-07-10' AND "D" < '2018-07-14', "D" >= '2018-07-14'], missingInRight: ["D" < '2018-07-11' or "D" is null, "D" >= '2018-07-11' AND "D" < '2018-07-15', "D" >= '2018-07-15'])
```

### Why are the changes needed?

To follow the previous change.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

The modified test.

Closes #32186 from sarutak/fix-oracle-date-error.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-04-15 07:07:34 -07:00
Ali Afroozeh 0945baf906 [SPARK-34989] Improve the performance of mapChildren and withNewChildren methods
### What changes were proposed in this pull request?
One of the main performance bottlenecks in query compilation is overly-generic tree transformation methods, namely `mapChildren` and `withNewChildren` (defined in `TreeNode`). These methods have an overly-generic implementation to iterate over the children and rely on reflection to create new instances. We have observed that, especially for queries with large query plans, a significant amount of CPU cycles are wasted in these methods. In this PR we make these methods more efficient, by delegating the iteration and instantiation to concrete node types. The benchmarks show that we can expect significant performance improvement in total query compilation time in queries with large query plans (from 30-80%) and about 20% on average.

#### Problem detail
The `mapChildren` method in `TreeNode` is overly generic and costly. To be more specific, this method:
- iterates over all the fields of a node using Scala’s product iterator. While the iteration is not reflection-based, thanks to the Scala compiler generating code for `Product`, we create many anonymous functions and visit many nested structures (recursive calls).
The anonymous functions (presumably compiled to Java anonymous inner classes) also show up quite high on the list in the object allocation profiles, so we are putting unnecessary pressure on GC here.
- does a lot of comparisons. Basically for each element returned from the product iterator, we check if it is a child (contained in the list of children) and then transform it. We can avoid that by just iterating over children, but in the current implementation, we need to gather all the fields (only transform the children) so that we can instantiate the object using the reflection.
- creates objects using reflection, by delegating to the `makeCopy` method, which is several orders of magnitude slower than using the constructor.

#### Solution
The proposed solution in this PR is rather straightforward: we rewrite the `mapChildren` method using the `children` and `withNewChildren` methods. The default `withNewChildren` method suffers from the same problems as `mapChildren` and we need to make it more efficient by specializing it in concrete classes.  Similar to how each concrete query plan node already defines its children, it should also define how they can be constructed given a new list of children. Actually, the implementation is quite simple in most cases and is a one-liner thanks to the copy method present in Scala case classes. Note that we cannot abstract over the copy method, it’s generated by the compiler for case classes if no other type higher in the hierarchy defines it. For most concrete nodes, the implementation of `withNewChildren` looks like this:
```
override def withNewChildren(newChildren: Seq[LogicalPlan]): LogicalPlan = copy(children = newChildren)
```
The current `withNewChildren` method has two properties that we should preserve:

- It returns the same instance if the provided children are the same as its children, i.e., it preserves referential equality.
- It copies tags and maintains the origin links when a new copy is created.

These properties are hard to enforce in the concrete node type implementation. Therefore, we propose a template method `withNewChildrenInternal` that should be rewritten by the concrete classes and let the `withNewChildren` method take care of referential equality and copying:
```
override def withNewChildren(newChildren: Seq[LogicalPlan]): LogicalPlan = {
 if (childrenFastEquals(children, newChildren)) {
   this
 } else {
   CurrentOrigin.withOrigin(origin) {
     val res = withNewChildrenInternal(newChildren)
     res.copyTagsFrom(this)
     res
   }
 }
}
```

With the refactoring done in a previous PR (https://github.com/apache/spark/pull/31932) most tree node types fall in one of the categories of `Leaf`, `Unary`, `Binary` or `Ternary`. These traits have a more efficient implementation for `mapChildren` and define a more specialized version of `withNewChildrenInternal` that avoids creating unnecessary lists. For example, the `mapChildren` method in `UnaryLike` is defined as follows:
```
  override final def mapChildren(f: T => T): T = {
    val newChild = f(child)
    if (newChild fastEquals child) {
      this.asInstanceOf[T]
    } else {
      CurrentOrigin.withOrigin(origin) {
        val res = withNewChildInternal(newChild)
        res.copyTagsFrom(this.asInstanceOf[T])
        res
      }
    }
  }
```

#### Results
With this PR, we have observed significant performance improvements in query compilation time, more specifically in the analysis and optimization phases. The table below shows the TPC-DS queries that had more than 25% speedup in compilation times. Biggest speedups are observed in queries with large query plans.
| Query  | Speedup |
| ------------- | ------------- |
|q4    |29%|
|q9    |81%|
|q14a  |31%|
|q14b  |28%|
|q22   |33%|
|q33   |29%|
|q34   |25%|
|q39   |27%|
|q41   |27%|
|q44   |26%|
|q47   |28%|
|q48   |76%|
|q49   |46%|
|q56   |26%|
|q58   |43%|
|q59   |46%|
|q60   |50%|
|q65   |59%|
|q66   |46%|
|q67   |52%|
|q69   |31%|
|q70   |30%|
|q96   |26%|
|q98   |32%|

#### Binary incompatibility
Changing the `withNewChildren` in `TreeNode` breaks the binary compatibility of the code compiled against older versions of Spark because now it is expected that concrete `TreeNode` subclasses all implement the `withNewChildrenInternal` method. This is a problem, for example, when users write custom expressions. This change is the right choice, since it forces all newly added expressions to Catalyst implement it in an efficient manner and will prevent future regressions.
Please note that we have not completely removed the old implementation and renamed it to `legacyWithNewChildren`. This method will be removed in the future and for now helps the transition. There are expressions such as `UpdateFields` that have a complex way of defining children. Writing `withNewChildren` for them requires refactoring the expression. For now, these expressions use the old, slow method. In a future PR we address these expressions.

### Does this PR introduce _any_ user-facing change?

This PR does not introduce user facing changes but my break binary compatibility of the code compiled against older versions. See the binary compatibility section.

### How was this patch tested?

This PR is mainly a refactoring and passes existing tests.

Closes #32030 from dbaliafroozeh/ImprovedMapChildren.

Authored-by: Ali Afroozeh <ali.afroozeh@databricks.com>
Signed-off-by: herman <herman@databricks.com>
2021-04-09 15:06:26 +02:00
William Hyun ba32b200e4 [SPARK-34479][FOLLOWUP][DOC] Add zstandard codec to AvroOptions.compression comment
### What changes were proposed in this pull request?

This PR aims to add zstandard codec to the `AvroOptions.compression` comment.

### Why are the changes needed?

SPARK-34479 added zstandard codec.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
N/A

Closes #32050 from williamhyun/avro.

Authored-by: William Hyun <williamhyun3@gmail.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
2021-04-05 07:30:59 +08:00
HyukjinKwon ebf01ec3c1 [SPARK-34950][TESTS] Update benchmark results to the ones created by GitHub Actions machines
### What changes were proposed in this pull request?

https://github.com/apache/spark/pull/32015 added a way to run benchmarks much more easily in the same GitHub Actions build. This PR updates the benchmark results by using the way.

**NOTE** that looks like GitHub Actions use four types of CPU given my observations:

- Intel(R) Xeon(R) Platinum 8171M CPU  2.60GHz
- Intel(R) Xeon(R) CPU E5-2673 v4  2.30GHz
- Intel(R) Xeon(R) CPU E5-2673 v3  2.40GHz
- Intel(R) Xeon(R) Platinum 8272CL CPU  2.60GHz

Given my quick research, seems like they perform roughly similarly:

![Screen Shot 2021-04-03 at 9 31 23 PM](https://user-images.githubusercontent.com/6477701/113478478-f4b57b80-94c3-11eb-9047-f81ca8c59672.png)

I couldn't find enough information about Intel(R) Xeon(R) Platinum 8272CL CPU  2.60GHz but the performance seems roughly similar given the numbers.

So shouldn't be a big deal especially given that this way is much easier, encourages contributors to run more and guarantee the same number of cores and same memory with the same softwares.

### Why are the changes needed?

To have a base line of the benchmarks accordingly.

### Does this PR introduce _any_ user-facing change?

No, dev-only.

### How was this patch tested?

It was generated from:

- [Run benchmarks: * (JDK 11)](https://github.com/HyukjinKwon/spark/actions/runs/713575465)
- [Run benchmarks: * (JDK 8)](https://github.com/HyukjinKwon/spark/actions/runs/713154337)

Closes #32044 from HyukjinKwon/SPARK-34950.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-04-03 23:02:56 +03:00
yangjie01 7158e7f986 [SPARK-34900][TEST] Make sure benchmarks can run using spark-submit cmd described in the guide
### What changes were proposed in this pull request?
Some `spark-submit`  commands used to run benchmarks in the user's guide is wrong, we can't use these commands to run benchmarks successful.

So the major changes of this pr is correct these wrong commands, for example, run a benchmark which inherits from `SqlBasedBenchmark`, we must specify `--jars <spark core test jar>,<spark catalyst test jar>` because `SqlBasedBenchmark` based benchmark extends `BenchmarkBase(defined in spark core test jar)` and `SQLHelper(defined in spark catalyst test jar)`.

Another change of this pr is removed the `scalatest Assertions` dependency of Benchmarks because `scalatest-*.jar` are not in the distribution package, it will be troublesome to use.

### Why are the changes needed?
Make sure benchmarks can run using spark-submit cmd described in the guide

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Use the corrected `spark-submit` commands to run benchmarks successfully.

Closes #31995 from LuciferYang/fix-benchmark-guide.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2021-03-30 11:58:01 +09:00
Peter Toth 93a5d34f84 [SPARK-33482][SPARK-34756][SQL] Fix FileScan equality check
### What changes were proposed in this pull request?

This bug was introduced by SPARK-30428 at Apache Spark 3.0.0.
This PR fixes `FileScan.equals()`.

### Why are the changes needed?
- Without this fix `FileScan.equals` doesn't take `fileIndex` and `readSchema` into account.
- Partition filters and data filters added to `FileScan` (in #27112 and #27157) caused that canonicalized form of some `BatchScanExec` nodes don't match and this prevents some reuse possibilities.

### Does this PR introduce _any_ user-facing change?
Yes, before this fix incorrect reuse of `FileScan` and so `BatchScanExec` could have happed causing correctness issues.

### How was this patch tested?
Added new UTs.

Closes #31848 from peter-toth/SPARK-34756-fix-filescan-equality-check.

Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-03-23 17:01:16 +08:00
Ismaël Mejía 8a552bfc76 [SPARK-34778][BUILD] Upgrade to Avro 1.10.2
### What changes were proposed in this pull request?
Update the  Avro version to 1.10.2

### Why are the changes needed?
To stay up to date with upstream and catch compatibility issues with zstd

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit tests

Closes #31866 from iemejia/SPARK-27733-upgrade-avro-1.10.2.

Authored-by: Ismaël Mejía <iemejia@gmail.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
2021-03-22 19:30:14 +08:00
Josh Soref f4de93efb0 [MINOR][SQL] Spelling: filters - PushedFilers
### What changes were proposed in this pull request?
Consistently correct the spelling of `PushedFilters`

### Why are the changes needed?
bersprockets noted that it's wrong

### Does this PR introduce _any_ user-facing change?

Technically, I think it does. Practically, neither Google nor GitHub show anyone using `pushedFilers` outside of forks (or the discussion about fixing it started at https://github.com/apache/spark/pull/30323#issuecomment-725568719)

### How was this patch tested?
None beyond CI in the previous PR

Closes #30678 from jsoref/spelling-filters.

Authored-by: Josh Soref <jsoref@users.noreply.github.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-03-22 08:00:12 +03:00
William Hyun c799d049fc [SPARK-34810][TEST] Update PostgreSQL test with the latest results
### What changes were proposed in this pull request?

This PR aims to update `PostgresIntegrationSuite` with the latest results.

### Why are the changes needed?

The latest PostgreSQL jar version is 42.2.19. Since 42.2.9, the test is broken because it returns `0.0` instead of `0.00`.
- https://jdbc.postgresql.org/documentation/changelog.html#version_42.2.19

42.2.9 (2019-12-06)
42.2.10 (2020-01-30)
42.2.11 (2020-03-09)
42.2.12 (2020-03-31)
42.2.13 (2020-06-04)
42.2.14 (2020-06-10)
42.2.15 (2020-08-14)
42.2.16 (2020-08-20)
42.2.17 (2020-10-09)
42.2.18 (2020-10-15)
42.2.19 (2021-02-18)

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CI with the updated test cases.

```
build/sbt -Pdocker-integration-tests 'docker-integration-tests/testOnly org.apache.spark.sql.jdbc.PostgresIntegrationSuite'
```

Closes #31910 from williamhyun/pg.

Authored-by: William Hyun <williamhyun3@gmail.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
2021-03-21 13:36:45 +08:00
Dongjoon Hyun 631a85ed9b [SPARK-34650][BUILD][SS] Exclude zstd-jni transitive dependency from Kafka Client
### What changes were proposed in this pull request?

This PR aims to exclude `zstd-jni` transitive dependency from kafka-client.

### Why are the changes needed?

To prevent future conflicts, the followings are removed. We should use Spark's zstd-jni dependency consistently.

```
$ build/sbt "token-provider-kafka-0-10/dependencyTree" | grep zstd
[info]   | +-com.github.luben:zstd-jni:1.4.4-7

$ build/sbt "streaming-kafka-0-10/dependencyTree" | grep zstd
[info]   | +-com.github.luben:zstd-jni:1.4.4-7
[info]   | | +-com.github.luben:zstd-jni:1.4.4-7

$ build/sbt "sql-kafka-0-10/dependencyTree" | grep zstd
[info]   | +-com.github.luben:zstd-jni:1.4.4-7
[info]   | | +-com.github.luben:zstd-jni:1.4.4-7
```
### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

Closes #31767 from dongjoon-hyun/SPARK-34650.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2021-03-07 13:53:55 +09:00
HyukjinKwon 3d0ee9604e [SPARK-34520][CORE][FOLLOW-UP] Remove SecurityManager in GangliaSink
### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/31636. There was one place missed in `GangliaSink`, and we should also remove `SecurityManager`.

### Why are the changes needed?

To make `GangliaSink` work.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

It was found in the internal it tests in the company I work for.

Closes #31688 from HyukjinKwon/SPARK-34520-followup.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2021-03-01 11:18:57 +09:00
Yuming Wang 54c053afb0 [SPARK-34479][SQL] Add zstandard codec to Avro compression codec list
### What changes were proposed in this pull request?

Avro add zstandard codec since AVRO-2195. This pr add zstandard codec to Avro compression codec list.

### Why are the changes needed?

To make Avro support zstandard codec.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #31673 from wangyum/SPARK-34479.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-02-27 10:31:42 -08:00
Max Gekk 5957bc18a1 [SPARK-34451][SQL] Add alternatives for datetime rebasing SQL configs and deprecate legacy configs
### What changes were proposed in this pull request?
Move the datetime rebase SQL configs from the `legacy` namespace by:
1. Renaming of the existing rebase configs like `spark.sql.legacy.parquet.datetimeRebaseModeInRead` -> `spark.sql.parquet.datetimeRebaseModeInRead`.
2. Add the legacy configs as alternatives
3. Deprecate the legacy rebase configs.

### Why are the changes needed?
The rebasing SQL configs like `spark.sql.legacy.parquet.datetimeRebaseModeInRead` can be used not only for migration from previous Spark versions but also to read/write datatime columns saved by other systems/frameworks/libs. So, the configs shouldn't be considered as legacy configs.

### Does this PR introduce _any_ user-facing change?
Should not. Users will see a warning if they still use one of the legacy configs.

### How was this patch tested?
1. Manually checking new configs:
```scala
scala> spark.conf.get("spark.sql.parquet.datetimeRebaseModeInRead")
res0: String = EXCEPTION

scala> spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "LEGACY")
21/02/17 14:57:10 WARN SQLConf: The SQL config 'spark.sql.legacy.parquet.datetimeRebaseModeInRead' has been deprecated in Spark v3.2 and may be removed in the future. Use 'spark.sql.parquet.datetimeRebaseModeInRead' instead.

scala> spark.conf.get("spark.sql.parquet.datetimeRebaseModeInRead")
res2: String = LEGACY
```
2. By running a datetime rebasing test suite:
```
$ build/sbt "test:testOnly *ParquetRebaseDatetimeV1Suite"
```

Closes #31576 from MaxGekk/rebase-confs-alternatives.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-02-17 14:04:47 +00:00
Kousuke Saruta dd6383f0a3 [SPARK-34333][SQL] Fix PostgresDialect to handle money types properly
### What changes were proposed in this pull request?

This PR changes the type mapping for `money` and `money[]`  types for PostgreSQL.
Currently, those types are tried to convert to `DoubleType` and `ArrayType` of `double` respectively.
But the JDBC driver seems not to be able to handle those types properly.

https://github.com/pgjdbc/pgjdbc/issues/100
https://github.com/pgjdbc/pgjdbc/issues/1405

Due to these issue, we can get the error like as follows.

money type.
```
[info]   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (192.168.1.204 executor driver): org.postgresql.util.PSQLException: Bad value for type double : 1,000.00
[info] 	at org.postgresql.jdbc.PgResultSet.toDouble(PgResultSet.java:3104)
[info] 	at org.postgresql.jdbc.PgResultSet.getDouble(PgResultSet.java:2432)
[info] 	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$5(JdbcUtils.scala:418)
```

money[] type.
```
[info]   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (192.168.1.204 executor driver): org.postgresql.util.PSQLException: Bad value for type double : $2,000.00
[info] 	at org.postgresql.jdbc.PgResultSet.toDouble(PgResultSet.java:3104)
[info] 	at org.postgresql.jdbc.ArrayDecoding$5.parseValue(ArrayDecoding.java:235)
[info] 	at org.postgresql.jdbc.ArrayDecoding$AbstractObjectStringArrayDecoder.populateFromString(ArrayDecoding.java:122)
[info] 	at org.postgresql.jdbc.ArrayDecoding.readStringArray(ArrayDecoding.java:764)
[info] 	at org.postgresql.jdbc.PgArray.buildArray(PgArray.java:310)
[info] 	at org.postgresql.jdbc.PgArray.getArrayImpl(PgArray.java:171)
[info] 	at org.postgresql.jdbc.PgArray.getArray(PgArray.java:111)
```

For money type, a known workaround is to treat it as string so this PR do it.
For money[], however, there is no reasonable workaround so this PR remove the support.

### Why are the changes needed?

This is a bug.

### Does this PR introduce _any_ user-facing change?

Yes. As of this PR merged, money type is mapped to `StringType` rather than `DoubleType` and the support for money[] is stopped.
For money type, if the value is less than one thousand,  `$100.00` for instance, it works without this change so I also updated the migration guide because it's a behavior change for such small values.
On the other hand, money[] seems not to work with any value but mentioned in the migration guide just in case.

### How was this patch tested?

New test.

Closes #31442 from sarutak/fix-for-money-type.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Kousuke Saruta <sarutak@oss.nttdata.com>
2021-02-17 10:50:06 +09:00
oraviv 3d39dfa8c3 [SPARK-34416][SQL] Adding support for user provided schema url in Avro
### What changes were proposed in this pull request?

Added option to provide Avro schema by URL.

### Why are the changes needed?
(copied from Jira ticket)

We have a use case in which we read a huge table in Avro format. About 30k columns.

using the default Hive reader - `AvroGenericRecordReader` it is just hangs forever. after 4 hours not even one task has finished.

We tried instead to use `spark.read.format("com.databricks.spark.avro").load(..)` but we failed on:

```

org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the data schema

..

at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtils.scala:85)
at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtils.scala:67)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:421)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:174)
... 53 elided

```

because files schema contain duplicate column names (when considering case-insensitive).

So we wanted to provide a user schema with non-duplicated fields, but the schema is huge. a few MBs. it is not practical to provide it in json format.

So we patched spark-avro to be able to get also `avroSchemaUrl` in addition to `avroSchema` and it worked perfectly.

### How was this patch tested?
added a unitest to AvroSuite and tested locally with patched version

Closes #31543 from uzadude/avro_schema.

Authored-by: oraviv <oraviv@paypal.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-02-14 13:43:57 -08:00
Max Gekk c082c537de [SPARK-34404][SQL] Add new Avro datasource options to control datetime rebasing in read
### What changes were proposed in this pull request?
In the PR, I propose new option `datetimeRebaseMode` for the Avro datasource. The option influences on loading ancient dates and timestamps column values from avro files.

The option supports the same values as the SQL config `spark.sql.legacy.avro.datetimeRebaseModeInRead` namely;
- `"LEGACY"`, when an option is set to this value, Spark rebases dates/timestamps from the legacy hybrid calendar (Julian + Gregorian) to the Proleptic Gregorian calendar.
- `"CORRECTED"`, dates/timestamps are read AS IS from avro files.
- `"EXCEPTION"`, when it is set as an option value, Spark will fail the reading if it sees ancient dates/timestamps that are ambiguous between the two calendars.

### Why are the changes needed?
1. New options will allow to load avro files from at least two sources in different rebasing modes in the same query. For instance:
```scala
val df1 = spark.read.option("datetimeRebaseMode", "legacy").format("avro").load(folder1)
val df2 = spark.read.option("datetimeRebaseMode", "corrected").format("avro").load(folder2)
df1.join(df2, ...)
```
Before the changes, it is impossible because the SQL config `spark.sql.legacy.avro.datetimeRebaseModeInRead` influences on both reads.

2. Mixing of Dataset/DataFrame and RDD APIs should become possible. Since SQL configs are not propagated through RDDs, the following code fails on ancient timestamps:
```scala
spark.conf.set("spark.sql.legacy.avro.datetimeRebaseModeInRead", "legacy")
spark.read.format("avro").load(folder).distinct.rdd.collect()
```

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
By running the modified test suites:
```
$ build/sbt "test:testOnly *AvroV1Suite"
$ build/sbt "test:testOnly *AvroV2Suite"
```

Closes #31529 from MaxGekk/avro-rebase-options.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-02-10 06:23:10 +00:00
Kousuke Saruta f79305a402 [SPARK-34311][SQL] PostgresDialect can't treat arrays of some types
### What changes were proposed in this pull request?

This PR fixes the issue that `PostgresDialect` can't treat arrays of some types.
Though PostgreSQL supports wide range of types (https://www.postgresql.org/docs/13/datatype.html),  the current `PostgresDialect` can't treat arrays of the following types.

* xml
* tsvector
* tsquery
* macaddr
* macaddr8
* txid_snapshot
* pg_snapshot
* point
* line
* lseg
* box
* path
* polygon
* circle
* pg_lsn
* bit varying
* interval

NOTE: PostgreSQL doesn't implement arrays of serial types so this PR doesn't care about them.

### Why are the changes needed?

To provide better support with PostgreSQL.

### Does this PR introduce _any_ user-facing change?

Yes. PostgresDialect can handle arrays of types shown above.

### How was this patch tested?

New test.

Closes #31419 from sarutak/postgres-array-types.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2021-02-10 11:29:14 +09:00
Kousuke Saruta 1f4135c4bd [SPARK-34350][SQL][TESTS] replace withTimeZone defined in OracleIntegrationSuite with DateTimeTestUtils.withDefaultTimeZone
### What changes were proposed in this pull request?

This PR replaces `withTimeZone` defined and used in `OracleIntegrationSuite` with `DateTimeTestUtils.withDefaultTimeZone` which is defined as a utility method.

### Why are the changes needed?

Both methods are semantically the same so it might be better to use the utility one.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

`OracleIntegrationSuite` passes.

Closes #31465 from sarutak/oracle-timezone-util.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2021-02-05 23:26:22 +09:00
Kousuke Saruta 55399eb3b2 [SPARK-34343][SQL][TESTS] Add missing test for some non-array types in PostgreSQL
### What changes were proposed in this pull request?

This PR added tests for some non-array types in PostgreSQL.
PostgreSQL supports wide range of types (https://www.postgresql.org/docs/13/datatype.html) and `PostgresIntegrationSuite` contains tests for some types but ones for the following types are missing.

* bit varying
* point
* line
* lseg
* box
* path
* polygon
* circle
* pg_lsn
* macaddr
* macaddr8
* numeric
* pg_snapshot
* real
* time
* timestamp
* tsquery
* tsvector
* txid_snapshot
* xml

NOTE: Handling money types can be buggy so this PR doesn't add tests for those types.

### Why are the changes needed?

To ensure those types work with Spark well.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Extended `PostgresIntegrationSuite`.

Closes #31456 from sarutak/test-for-some-types-postgresql.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2021-02-05 14:27:14 +09:00
Jungtaek Lim (HeartSaVioR) fbe726f5b1 [SPARK-34339][CORE][SQL] Expose the number of total paths in Utils.buildLocationMetadata()
### What changes were proposed in this pull request?

This PR proposes to expose the number of total paths in Utils.buildLocationMetadata(), with relaxing space usage a bit (around 10+ chars).

Suppose the first 2 of 5 paths are only fit to the threshold, the outputs between the twos are below:

* before the change: `[path1, path2]`
* after the change: `(5 paths)[path1, path2, ...]`

### Why are the changes needed?

SPARK-31793 silently truncates the paths hence end users can't indicate how many paths are truncated, and even more, whether paths are truncated or not.

### Does this PR introduce _any_ user-facing change?

Yes, the location metadata will also show how many paths are truncated (not shown), instead of silently truncated.

### How was this patch tested?

Modified UTs

Closes #31464 from HeartSaVioR/SPARK-34339.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2021-02-05 09:37:38 +09:00
Hoa 7675582dab [SPARK-34357][SQL] Map JDBC SQL TIME type to TimestampType with time portion fixed regardless of timezone
### What changes were proposed in this pull request?

Due to user-experience (confusing to Spark users - java.sql.Time using milliseconds vs Spark using microseconds; and user losing useful functions like hour(), minute(), etc on the column), we have decided to revert back to use TimestampType but this time we will enforce the hour to be consistently across system timezone (via offset manipulation) and date part fixed to zero epoch.

Full Discussion with Wenchen Fan Wenchen Fan regarding this ticket is here https://github.com/apache/spark/pull/30902#discussion_r569186823

### Why are the changes needed?

Revert and improvement to sql.Time handling

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit tests and integration tests

Closes #31473 from saikocat/SPARK-34357.

Authored-by: Hoa <hoameomu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-02-04 17:16:39 +00:00
Erik Krogen 791de00ddf [SPARK-34182][AVRO] Improve error messages when matching Catalyst-to-Avro schemas
### What changes were proposed in this pull request?
Improve the error messages for incompatibilities between Avro and Catalyst schemas. First, make `AvroSerializer` more similar to `AvroDeserializer` in printing out contextual information such as hierarchical field names. Standardize exception messages in both serializer and deserializer to always include such contextual information, and include a top-level exception which shows the full schemas which were being parsed when the incompatibility was found. Both now print out the hierarchical name for both the Avro and Catalyst fields, since they may be different due to case sensitivity and Avro union handling.

### Why are the changes needed?
The error messages in this type of failure scenario are very lacking in information on the write path (`AvroSerializer`). Below are two examples of messages that provide insufficient information to determine what went wrong (lacking in field names, context about the overall schema structure, etc.).
```
org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Catalyst type IntegerType to Avro type "float".

org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Catalyst type StructType(StructField(bar,IntegerType,true)) to Avro type {"type":"record","name":"test","fields":[{"name":"NOTbar","type":["null","int"],"default":null}]}.
```
The error messages currently existing in `AvroDeserializer` are much better, but still not very internally consistent, and it would be better if they were consistent with the newly added exception messages in `AvroSerializer`.

### Does this PR introduce _any_ user-facing change?
Error messages when there are incompatibilities between Avro and Catalyst schemas will be greatly improved on when writing Avro data using the `avroSchema` option, a little bit improved when reading Avro data, and much more consistent between the two.

Below is an example of a new message. See `AvroSerdeSuite` for more examples.
```
org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Catalyst type STRUCT<`foo`: STRUCT<`bar`: INT>> to Avro type {"type":"record","name":"top","fields":[{"name":"foo","type":"int"}]}
	at org.apache.spark.sql.avro.AvroSerializer.liftedTree1$1(AvroSerializer.scala:83)
...
Caused by: org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Catalyst field 'foo' to Avro field 'foo' because schema is incompatible (sqlType = STRUCT<`bar`: INT>, avroType = "int")
	at org.apache.spark.sql.avro.AvroSerializer.newConverter(AvroSerializer.scala:230)
...
```

### How was this patch tested?
New unit test suite, `AvroSerdeSuite`, was added to test corner cases on `AvroSerializer` and `AvroDeserializer` and verify that the exception messages are as expected. Existing tests in `AvroSuite` also continue to pass, with modifications in places where assertions were made about the exceptions that would be thrown.

Closes #31333 from xkrogen/xkrogen-SPARK-34182-avro-serde-errormessages.

Authored-by: Erik Krogen <xkrogen@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-02-03 08:49:36 +00:00