Commit graph

843 commits

Author SHA1 Message Date
yangjie01 900908b9be [SPARK-36410][CORE][SQL][STRUCTURED STREAMING][EXAMPLES] Replace anonymous classes with lambda expressions
### What changes were proposed in this pull request?
The main change of this pr is replace anonymous classes with lambda expressions in Java code

**Before**
```java
 new Thread(new Runnable() {
    Override
    public void run() {
      // run thread
    }
  });
```

**After**

```java
new Thread(() -> {
    // run thread
  });
```

### Why are the changes needed?
Code Simpilefications.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

- Pass the Jenkins or GitHub Action
- Manually test `JavaUserDefinedScalar` with command
   `bin/spark-submit run-example org.apache.spark.examples.sql.JavaUserDefinedScalar` passed

Closes #33635 from LuciferYang/lambda.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-08-09 19:28:31 +09:00
Angerszhuuuu f3e079b09b [SPARK-36271][SQL] Unify V1 insert check field name before prepare writter
### What changes were proposed in this pull request?
Unify DataSource V1 insert schema check field name before prepare writer.
And in this PR we add check for avro V1 insert too.

### Why are the changes needed?
Unify code and add check for avro V1 insert too.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added UT

Closes #33566 from AngersZhuuuu/SPARK-36271.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-08-09 17:18:06 +08:00
Max Gekk 67cbc93263 [SPARK-36349][SQL] Disallow ANSI intervals in file-based datasources
### What changes were proposed in this pull request?
In the PR, I propose to ban `YearMonthIntervalType` and `DayTimeIntervalType` at the analysis phase while creating a table using a built-in filed-based datasource or writing a dataset to such datasource. In particular, add the following case:
```scala
case _: DayTimeIntervalType | _: YearMonthIntervalType => false
```
to all methods that override either:
- V2 `FileTable.supportsDataType()`
- V1 `FileFormat.supportDataType()`

### Why are the changes needed?
To improve user experience with Spark SQL, and output a proper error message at the analysis phase.

### Does this PR introduce _any_ user-facing change?
Yes but ANSI interval types haven't released yet. So, for users this is new behavior.

### How was this patch tested?
By running the affected test suites:
```
$ build/sbt -Phive-2.3 "test:testOnly *HiveOrcSourceSuite"
```

Closes #33580 from MaxGekk/interval-ban-in-ds.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-08-03 20:30:20 +03:00
gengjiaan 1deb386727 [SPARK-36175][SQL][FOLLOWUP] Improve the comments for AvroDeserializer/AvroSerializer
### What changes were proposed in this pull request?
This PR follows up https://github.com/apache/spark/pull/33413 and just improve the comments for `AvroDeserializer`/`AvroSerializer`.

### Why are the changes needed?
Make the comment more correctly.

### Does this PR introduce _any_ user-facing change?
'No'.
Just change the comments.

### How was this patch tested?
No need.

Closes #33607 from beliefer/SPARK-36175-followup.

Lead-authored-by: gengjiaan <gengjiaan@360.cn>
Co-authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
2021-08-03 15:52:18 +08:00
Erik Krogen be06e4156e [SPARK-35918][AVRO] Unify schema mismatch handling for read/write and enhance error messages
### What changes were proposed in this pull request?
This unifies struct schema mismatch-handling logic between `AvroSerializer` and `AvroDeserializer`, pushing it into `AvroUtils` which is used by both. The newly unified exception-handling logic is updated to provide more contextual information in error messages. When a schema mismatch is found, previously we would only report the first missing field that is found, but there may be any others as well, which can make it less clear what exactly is going wrong. Now, we will report on all missing fields.

### Why are the changes needed?
While working on #31490, we discussed that there is room for improvement in how schema mismatch errors are reported ([comment1](https://github.com/apache/spark/pull/31490#discussion_r659970793), [comment2](https://github.com/apache/spark/pull/31490#issuecomment-869866848)). Additionally, the logic between `AvroSerializer` and `AvroDeserializer` was quite similar for handling these issues, but didn't share common code, causing duplication and making it harder to see exactly what differences existed between the two.

### Does this PR introduce _any_ user-facing change?
Some error messages when matching Catalyst struct schemas against Avro record schemas now include more information.

### How was this patch tested?
New unit tests added.

Closes #33308 from xkrogen/xkrogen-SPARK-35918-avroserde-unify-better-error-messages.

Authored-by: Erik Krogen <xkrogen@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
2021-08-02 20:45:23 +08:00
Sean Owen 72615bc551 [SPARK-36362][CORE][SQL][TESTS] Omnibus Java code static analyzer warning fixes
### What changes were proposed in this pull request?

Fix up some minor Java issues:

- Some int*int multiplications that widen to long maybe could overflow
- Unnecessarily non-static inner classes
- Some tests "catch (AssertionError)" and do nothing
- Manual array iteration vs very slightly faster/simpler foreach
- Incorrect generic types that just happen to not cause a runtime error
- Missed opportunities for try-close
- Mutable enums
- .. and a few other minor things

### Why are the changes needed?

Some are minor but clear fixes; some may have a marginal perf impact or avoid a bug later. Also: maybe avoid future PRs to address these one by one.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests

Closes #33594 from srowen/SPARK-36362.

Authored-by: Sean Owen <srowen@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-07-31 22:35:57 -07:00
gengjiaan 69c454374f [SPARK-36175][SQL] Support TimestampNTZ in Avro data source
### What changes were proposed in this pull request?
Spark support Timestamp in Avro data source now. We should support TimestampNTZ in Avro data source too.
As per the Avro spec https://avro.apache.org/docs/1.10.2/spec.html#Local+timestamp+%28microsecond+precision%29, Spark can convert TimestampNTZ type from/to Avro's Local timestamp type.

### Why are the changes needed?
Docking TimestampNTZ type in Spark and Local timestamp in Avro.

### Does this PR introduce _any_ user-facing change?
'Yes'.
Spark will read/write the TimestampNTZ/Local timestamp in Avro.

### How was this patch tested?
New tests.

Closes #33413 from beliefer/SPARK-36175.

Authored-by: gengjiaan <gengjiaan@360.cn>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
2021-07-29 20:34:38 +08:00
Angerszhuuuu 86f44578e5 [SPARK-33865][SPARK-36202][SQL] When HiveDDL, we need check avro schema too
### What changes were proposed in this pull request?
Unify schema check code of FileFormat and check avro schema filed name when CREATE TABLE DDL too

### Why are the changes needed?
Refactor code

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Not need

Closes #33441 from AngersZhuuuu/SPARK-36202.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-28 14:04:24 +08:00
Liang-Chi Hsieh 201566cdd5 [SPARK-36109][SS][TEST] Check data after adding data to topic in KafkaSourceStressSuite
### What changes were proposed in this pull request?

This patch proposes to check data after adding data to topic in `KafkaSourceStressSuite`.

### Why are the changes needed?

The test logic in `KafkaSourceStressSuite` is not stable. For example, https://github.com/apache/spark/runs/3049244904.

Once we add data to a topic and then delete the topic before checking data, the expected answer is different to retrieved data from the sink.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.

Closes #33311 from viirya/stream-assert.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-07-13 01:21:32 -07:00
Wenchen Fan 8b46e26fc6 [SPARK-34302][SQL][FOLLOWUP] More code cleanup
### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/33113, to do some code cleanup:
1. `UnresolvedFieldPosition` doesn't need to include the field name. We can get it through "context" (`AlterTableAlterColumn.column.name`).
2. Run `ResolveAlterTableCommands` in the main resolution batch, so that the column/field resolution is also unified between v1 and v2 commands (same error message).
3. Fail immediately in `ResolveAlterTableCommands` if we can't resolve the field, instead of waiting until `CheckAnalysis`. We don't expect other rules to resolve fields in ALTER  TABLE commands, so failing immediately is simpler and we can remove duplicated code in `CheckAnalysis`.

### Why are the changes needed?

code simplification.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests

Closes #33213 from cloud-fan/follow.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-06 03:43:42 +08:00
Dongjoon Hyun f9f95686cb [SPARK-35996][BUILD] Setting version to 3.3.0-SNAPSHOT
### What changes were proposed in this pull request?

This PR aims to update `master` branch version to 3.3.0-SNAPSHOT.

### Why are the changes needed?

Start to prepare Apache Spark 3.3.0 and the published snapshot version should not conflict with `branch-3.2`.

### Does this PR introduce _any_ user-facing change?

N/A.

### How was this patch tested?

Pass the CIs.

Closes #33196 from dongjoon-hyun/SPARK-35996.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-07-02 13:47:36 -07:00
Anton Okolnychyi fceabe2372 [SPARK-35779][SQL] Dynamic filtering for Data Source V2
### What changes were proposed in this pull request?

This PR implemented the proposal per [design doc](https://docs.google.com/document/d/1RfFn2e9o_1uHJ8jFGsSakp-BZMizX1uRrJSybMe2a6M) for SPARK-35779.

### Why are the changes needed?

Spark supports dynamic partition filtering that enables reusing parts of the query to skip unnecessary partitions in the larger table during joins. This optimization has proven to be beneficial for star-schema queries which are common in the industry. Unfortunately, dynamic pruning is currently limited to partition pruning during joins and is only supported for built-in v1 sources. As more and more Spark users migrate to Data Source V2, it is important to generalize dynamic filtering and expose it to all v2 connectors.

Please, see the design doc for more information on this effort.

### Does this PR introduce _any_ user-facing change?

Yes, this PR adds a new optional mix-in interface for `Scan` in Data Source V2.

### How was this patch tested?

This PR comes with tests.

Closes #32921 from aokolnychyi/dynamic-filtering-wip.

Authored-by: Anton Okolnychyi <aokolnychyi@apple.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2021-07-01 17:00:12 -07:00
Erik Krogen 4dd41b9678 [SPARK-34365][AVRO] Add support for positional Catalyst-to-Avro schema matching
### What changes were proposed in this pull request?
Provide the (configurable) ability to perform Avro-to-Catalyst schema field matching using the position of the fields instead of their names. A new `option` is added for the Avro datasource, `positionalFieldMatching`, which instructs `AvroSerializer`/`AvroDeserializer` to perform positional field matching instead of matching by name.

### Why are the changes needed?
This by-name matching is somewhat recent; prior to PR #24635, at least on the write path, schemas were matched by positionally ("structural" comparison). While by-name is better behavior as a default, it will be better to make this configurable by a user. Even at the time that PR #24635 was handled, there was [interest in making this behavior configurable](https://github.com/apache/spark/pull/24635#issuecomment-494205251), but it appears it went unaddressed.

There is precedence for configurability of this behavior as seen in PR #29737, which added this support for ORC. Besides this precedence, the behavior of Hive is to perform matching positionally ([ref](https://cwiki.apache.org/confluence/display/Hive/AvroSerDe#AvroSerDe-WritingtablestoAvrofiles)), so this is behavior that Hadoop/Hive ecosystem users are familiar with.

### Does this PR introduce _any_ user-facing change?
Yes, a new option is provided for the Avro datasource, `positionalFieldMatching`, which provides compatibility with Hive and pre-3.0.0 Spark behavior.

### How was this patch tested?
New unit tests are added within `AvroSuite`, `AvroSchemaHelperSuite`, and `AvroSerdeSuite`; and most of the existing tests within `AvroSerdeSuite` are adapted to perform the same test using by-name and positional matching to ensure feature parity.

Closes #31490 from xkrogen/xkrogen-SPARK-34365-avro-positional-field-matching.

Authored-by: Erik Krogen <xkrogen@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
2021-06-30 16:20:45 +08:00
Dongjoon Hyun 16e50356ee [SPARK-34302][FOLLOWUP][SQL][TESTS] Update jdbc.v2.*IntegrationSuite
### What changes were proposed in this pull request?

This PR aims to update JDBC v2 integration suite by adding `catalogName`.

### Why are the changes needed?

To recover the integration test suite.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the GitHub Action.

Closes #33124 from dongjoon-hyun/SPARK-34302.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-06-28 23:01:54 -07:00
Kousuke Saruta c562c1674e [SPARK-34320][SQL][FOLLOWUP] Modify V2JDBCTest to follow the change of the error message
### What changes were proposed in this pull request?

This is a followup PR for SPARK-34320 (#32854).
That PR changed the error message of `ALTER TABLE` but `V2JDBCTest` didn't comply with the change.

### Why are the changes needed?

To fix`v2.*JDBCSuite` failure.
```
[info] - SPARK-33034: ALTER TABLE ... add new columns (173 milliseconds)
[info] - SPARK-33034: ALTER TABLE ... drop column *** FAILED *** (126 milliseconds)
[info]   "Cannot delete missing field bad_column in postgresql.alt_table schema: root
[info]    |-- C2: string (nullable = true)
[info]   ; line 1 pos 0;
[info]   'AlterTableDropColumns [unresolvedfieldname(bad_column)]
[info]   +- ResolvedTable org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog7f4b7516, alt_table, JDBCTable(alt_table,StructType(StructField(C2,StringType,true)),org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions5842301d), [C2#1879]
[info]   " did not contain "Cannot delete missing field bad_column in alt_table schema" (V2JDBCTest.scala:106)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info]   at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info]   at org.apache.spark.sql.jdbc.v2.V2JDBCTest.$anonfun$$init$$6(V2JDBCTest.scala:106)
[info]   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
[info]   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1461)
[info]   at org.apache.spark.sql.test.SQLTestUtilsBase.withTable(SQLTestUtils.scala:305)
[info]   at org.apache.spark.sql.test.SQLTestUtilsBase.withTable$(SQLTestUtils.scala:303)
[info]   at org.apache.spark.sql.jdbc.DockerJDBCIntegrationSuite.withTable(DockerJDBCIntegrationSuite.scala:95)
[info]   at org.apache.spark.sql.jdbc.v2.V2JDBCTest.$anonfun$$init$$5(V2JDBCTest.scala:95)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:190)
[info]   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:190)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:188)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:200)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:200)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:182)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:62)
[info]   at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
[info]   at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
[info]   at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:62)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:233)
[info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[info]   at scala.collection.immutable.List.foreach(List.scala:431)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:233)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:232)
[info]   at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1563)
[info]   at org.scalatest.Suite.run(Suite.scala:1112)
[info]   at org.scalatest.Suite.run$(Suite.scala:1094)
[info]   at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1563)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:237)
[info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:237)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:236)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:62)
[info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info]   at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:62)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:318)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:513)
[info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:413)
[info]   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[info]   at java.lang.Thread.run(Thread.java:748)
[info] - SPARK-33034: ALTER TABLE ... update column type (122 milliseconds)
[info] - SPARK-33034: ALTER TABLE ... rename column (93 milliseconds)
[info] - SPARK-33034: ALTER TABLE ... update column nullability (92 milliseconds)
[info] - CREATE TABLE with table comment (38 milliseconds)
[info] - CREATE TABLE with table property (52 milliseconds)
[info] MySQLIntegrationSuite:
[info] - Basic test (61 milliseconds)
[info] - Numeric types (67 milliseconds)
[info] - Date types (59 milliseconds)
[info] - String types (50 milliseconds)
[info] - Basic write test (216 milliseconds)
[info] - query JDBC option (64 milliseconds)
[info] Run completed in 19 minutes, 43 seconds.
[info] Total number of tests run: 89
[info] Suites: completed 14, aborted 0
[info] Tests: succeeded 84, failed 5, canceled 0, ignored 0, pending 0
[info] *** 5 TESTS FAILED ***
[error] Failed tests:
[error] 	org.apache.spark.sql.jdbc.v2.OracleIntegrationSuite
[error] 	org.apache.spark.sql.jdbc.v2.MsSqlServerIntegrationSuite
[error] 	org.apache.spark.sql.jdbc.v2.DB2IntegrationSuite
[error] 	org.apache.spark.sql.jdbc.v2.MySQLIntegrationSuite
[error] 	org.apache.spark.sql.jdbc.v2.PostgresIntegrationSuite
[error] (docker-integration-tests / Test / test) sbt.TestsFailedException: Tests unsuccessful
[error] Total time: 1223 s (20:23), completed Jun 25, 2021 1:31:04 AM
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

docker-integration-tests on GA.

Closes #33074 from sarutak/followup-SPARK-34320.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Kousuke Saruta <sarutak@oss.nttdata.com>
2021-06-25 12:58:38 +09:00
Vinod KC 4dabba8f76 [SPARK-35747][CORE] Avoid printing full Exception stack trace, if Hbase/Kafka/Hive services are not running in a secure cluster
### What changes were proposed in this pull request?
In a secure Yarn cluster, even though HBase or Kafka, or Hive services are not used in the user application, yarn client unnecessarily trying to generate  Delegations token from these services. This will add additional delays while submitting spark application in a yarn cluster

 Also during HBase delegation token generation step in the application submit stage,  HBaseDelegationTokenProvider prints a full Exception Stack trace and it causes a noisy warning.
 Apart from printing exception stack trace, Application submission taking more time as it retries connection to HBase master multiple times before it gives up. So, if HBase is not used in the user Applications, it is better to suggest User disable HBase Delegation Token generation.

 This PR aims to avoid printing full Exception Stack by just printing just Exception name and also add a suggestion message to disable `Delegation Token generation` if service is not used in the Spark Application.

 eg: `If HBase is not used, set spark.security.credentials.hbase.enabled to false`

### Why are the changes needed?

To avoid printing full Exception stack trace in WARN log
#### Before the fix
----------------
```
spark-shell --master yarn
.......
.......
21/06/12 14:29:41 WARN security.HBaseDelegationTokenProvider: Failed to get token from service hbase
java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.security.HBaseDelegationTokenProvider.obtainDelegationTokensWithHBaseConn(HBaseDelegationT
okenProvider.scala:93)
        at org.apache.spark.deploy.security.HBaseDelegationTokenProvider.obtainDelegationTokens(HBaseDelegationTokenProvider.
scala:60)
        at org.apache.spark.deploy.security.HadoopDelegationTokenManager$$anonfun$6.apply(HadoopDelegationTokenManager.scala:
166)
        at org.apache.spark.deploy.security.HadoopDelegationTokenManager$$anonfun$6.apply(HadoopDelegationTokenManager.scala:
164)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
        at org.apache.spark.deploy.security.HadoopDelegationTokenManager.obtainDelegationTokens(HadoopDelegationTokenManager.
scala:164)
```

#### After  the fix
------------
```
 spark-shell --master yarn

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/06/13 02:10:02 WARN security.HBaseDelegationTokenProvider: Failed to get token from service hbase due to  java.lang.reflect.InvocationTargetException Retrying to fetch HBase security token with hbase connection parameter.
21/06/13 02:10:40 WARN security.HBaseDelegationTokenProvider: Failed to get token from service hbase java.lang.reflect.InvocationTargetException. If HBase is not used, set spark.security.credentials.hbase.enabled to false
21/06/13 02:10:47 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!
```
### Does this PR introduce _any_ user-facing change?

Yes, in the log, it avoids printing full Exception stack trace.
Instread prints this.
**WARN security.HBaseDelegationTokenProvider: Failed to get token from service hbase java.lang.reflect.InvocationTargetException. If HBase is not used, set spark.security.credentials.hbase.enabled to false**

### How was this patch tested?

Tested manually as it can be verified only in a secure cluster

Closes #32894 from vinodkc/br_fix_Hbase_DT_Exception_stack_printing.

Authored-by: Vinod KC <vinod.kc.in@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-06-23 23:12:02 -07:00
Bruce Robbins 66d5a0049a [SPARK-35817][SQL] Restore performance of queries against wide Avro tables
### What changes were proposed in this pull request?

When creating a record writer in an AvroDeserializer, or creating a struct converter in an AvroSerializer, look up Avro fields using a map rather than scanning the entire list of Avro fields.

### Why are the changes needed?

A query against an Avro table can be quite slow when all are true:

* There are many columns in the Avro file
* The query contains a wide projection
* There are many splits in the input
* Some of the splits are read serially (e.g., less executors than there are tasks)

A write to an Avro table can be quite slow when all are true:

* There are many columns in the new rows
* The operation is creating many files

For example, a single-threaded query against a 6000 column Avro data set with 50K rows and 20 files takes less than a minute with Spark 3.0.1 but over 7 minutes with Spark 3.2.0-SNAPSHOT.

This PR restores the faster time.

For the 1000 column read benchmark:
Before patch: 108447 ms
After patch: 35925 ms
percent improvement: 66%

For the 1000 column write benchmark:
Before patch: 123307
After patch: 42313
percent improvement: 65%

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

* Ran existing unit tests
* Added new unit tests
* Added new benchmarks

Closes #32969 from bersprockets/SPARK-35817.

Authored-by: Bruce Robbins <bersprockets@gmail.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
2021-06-23 22:36:56 +08:00
YangJie 6c05459600 [SPARK-35838][BUILD][TESTS] Ensure all modules can be maven test independently in Scala 2.13
### What changes were proposed in this pull request?
Similar to SPARK-35532, the main change of this pr is add `scala-2.13` profile to external/kafka-0-10-sql/pom.xml, external/avro/pom.xml and sql/hive-thriftserver/pom.xml,  the `scala-2.13` profile include dependency on `scala-parallel-collections_2.13`, then all(34) spark modules can maven test independently.

### Why are the changes needed?
Ensure alll(34) spark modules can be maven test independently in Scala 2.13

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
- Pass the GitHub Action Scala 2.13 job
- Manual test:

1. Execute
```
dev/change-scala-version.sh 2.13

mvn clean install -DskipTests -Phadoop-3.2 -Phive-2.3 -Phadoop-cloud -Pmesos -Pyarn -Pkinesis-asl -Phive-thriftserver -Pspark-ganglia-lgpl -Pkubernetes -Phive -Pscala-2.13
```

2. maven test `external/kafka-0-10-sql` module
```
mvn test -Phadoop-3.2 -Phive-2.3 -Phadoop-cloud -Pmesos -Pyarn -Pkinesis-asl -Phive-thriftserver -Pspark-ganglia-lgpl -Pkubernetes -Phive -Pscala-2.13 -pl external/kafka-0-10-sql
```

**before**

```
Discovery starting.
Discovery completed in 857 milliseconds.
Run starting. Expected test count is: 464
...
KafkaRelationSuiteV2:
- explicit earliest to latest offsets
- default starting and ending offsets
- explicit offsets
- default starting and ending offsets with headers
- timestamp provided for starting and ending
- timestamp provided for starting, offset provided for ending
- timestamp provided for ending, offset provided for starting
- timestamp provided for starting, ending not provided
- timestamp provided for ending, starting not provided
- global timestamp provided for starting and ending
- no matched offset for timestamp - startingOffsets
- preferences on offset related options
- no matched offset for timestamp - endingOffsets
*** RUN ABORTED ***
  java.lang.NoClassDefFoundError: scala/collection/parallel/TaskSupport
  at org.apache.spark.SparkContext.$anonfun$union$1(SparkContext.scala:1411)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.SparkContext.withScope(SparkContext.scala:788)
  at org.apache.spark.SparkContext.union(SparkContext.scala:1405)
  at org.apache.spark.sql.execution.UnionExec.doExecute(basicPhysicalOperators.scala:697)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:182)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:220)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:217)
  ...
  Cause: java.lang.ClassNotFoundException: scala.collection.parallel.TaskSupport
  at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
  at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
  at org.apache.spark.SparkContext.$anonfun$union$1(SparkContext.scala:1411)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.SparkContext.withScope(SparkContext.scala:788)
  at org.apache.spark.SparkContext.union(SparkContext.scala:1405)
  at org.apache.spark.sql.execution.UnionExec.doExecute(basicPhysicalOperators.scala:697)
  ...
```

**After**

```
Run completed in 33 minutes, 51 seconds.
Total number of tests run: 464
Suites: completed 31, aborted 0
Tests: succeeded 464, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
```

3. maven test `external/avro` module

```
mvn test -Phadoop-3.2 -Phive-2.3 -Phadoop-cloud -Pmesos -Pyarn -Pkinesis-asl -Phive-thriftserver -Pspark-ganglia-lgpl -Pkubernetes -Phive -Pscala-2.13 -pl external/avro
```

**before**

```
Discovery starting.
Discovery completed in 2 seconds, 765 milliseconds.
Run starting. Expected test count is: 255
AvroReadSchemaSuite:
- append column at the end
- hide column at the end
- append column into middle
- hide column in the middle
- add a nested column at the end of the leaf struct column
- add a nested column in the middle of the leaf struct column
- add a nested column at the end of the middle struct column
- add a nested column in the middle of the middle struct column
- hide a nested column at the end of the leaf struct column
- hide a nested column in the middle of the leaf struct column
- hide a nested column at the end of the middle struct column
- hide a nested column in the middle of the middle struct column
*** RUN ABORTED ***
  java.lang.NoClassDefFoundError: scala/collection/parallel/TaskSupport
  at org.apache.spark.SparkContext.$anonfun$union$1(SparkContext.scala:1411)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.SparkContext.withScope(SparkContext.scala:788)
  at org.apache.spark.SparkContext.union(SparkContext.scala:1405)
  at org.apache.spark.sql.execution.UnionExec.doExecute(basicPhysicalOperators.scala:697)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:182)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:220)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:217)
  ...
  Cause: java.lang.ClassNotFoundException: scala.collection.parallel.TaskSupport
  at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
  at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
  at org.apache.spark.SparkContext.$anonfun$union$1(SparkContext.scala:1411)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.SparkContext.withScope(SparkContext.scala:788)
  at org.apache.spark.SparkContext.union(SparkContext.scala:1405)
  at org.apache.spark.sql.execution.UnionExec.doExecute(basicPhysicalOperators.scala:697)
  ...
```

**After**

```
Run completed in 1 minute, 42 seconds.
Total number of tests run: 255
Suites: completed 12, aborted 0
Tests: succeeded 255, failed 0, canceled 0, ignored 2, pending 0
All tests passed.
```

4.  maven test `sql/hive-thriftserver` module

```
mvn test -Phadoop-3.2 -Phive-2.3 -Phadoop-cloud -Pmesos -Pyarn -Pkinesis-asl -Phive-thriftserver -Pspark-ganglia-lgpl -Pkubernetes -Phive -Pscala-2.13 -pl sql/hive-thriftserver
```

**before**

```
- union.sql *** FAILED ***
  "1  a
  1 a
  2 b
  2 b" did not contain "Exception" Exception did not match for query #2
  SELECT *
  FROM   (SELECT * FROM t1
          UNION ALL
          SELECT * FROM t1), expected: 1  a
  1 a
  2 b
  2 b, but got: java.sql.SQLException
  org.apache.hive.service.cli.HiveSQLException: Error running query: java.lang.NoClassDefFoundError: scala/collection/parallel/TaskSupport
    at org.apache.spark.sql.hive.thriftserver.HiveThriftServerErrors$.runningQueryError(HiveThriftServerErrors.scala:38)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:324)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.$anonfun$run$2(SparkExecuteStatementOperation.scala:229)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
    at org.apache.spark.sql.hive.thriftserver.SparkOperation.withLocalProperties(SparkOperation.scala:79)
    at org.apache.spark.sql.hive.thriftserver.SparkOperation.withLocalProperties$(SparkOperation.scala:63)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.withLocalProperties(SparkExecuteStatementOperation.scala:43)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:229)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:224)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2.run(SparkExecuteStatementOperation.scala:238)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
  Caused by: java.lang.NoClassDefFoundError: scala/collection/parallel/TaskSupport
    at org.apache.spark.SparkContext.$anonfun$union$1(SparkContext.scala:1411)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:788)
    at org.apache.spark.SparkContext.union(SparkContext.scala:1405)
    at org.apache.spark.sql.execution.UnionExec.doExecute(basicPhysicalOperators.scala:697)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:182)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:220)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:217)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:178)
    at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:323)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:389)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3719)
    at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2987)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3710)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:774)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3708)
    at org.apache.spark.sql.Dataset.collect(Dataset.scala:2987)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:299)
    ... 16 more
  Caused by: java.lang.ClassNotFoundException: scala.collection.parallel.TaskSupport
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 40 more (ThriftServerQueryTestSuite.scala:209)
```

**After**

```
Run completed in 29 minutes, 17 seconds.
Total number of tests run: 535
Suites: completed 20, aborted 0
Tests: succeeded 535, failed 0, canceled 0, ignored 17, pending 0
All tests passed.
```

Closes #32994 from LuciferYang/SPARK-35838.

Authored-by: YangJie <yangjie01@baidu.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-06-22 06:31:24 -07:00
Jungtaek Lim 4a6d90e187 [SPARK-35611][SS] Introduce the strategy on mismatched offset for start offset timestamp on Kafka data source
### What changes were proposed in this pull request?

This PR proposes to introduce the strategy on mismatched offset for start offset timestamp on Kafka data source.

Please read the section `Why are the changes needed?` to understand the rationalization of the functionality.

This would be pretty much helpful for the case where there's a skew between partitions and some partitions have older records.

* AS-IS: Spark simply fails the query and end users have to deal with workarounds requiring manual steps.
* TO-BE: Spark will assign the latest offset for these partitions, so that Spark can read newer records from these partitions in further micro-batches.

To retain the existing behavior and also give some help for the proposed "TO-BE" behavior, we'd like to introduce the strategy on mismatched offset for start offset timestamp to let end users choose from them.

The strategy will be added as source option, to ensure end users set the behavior explicitly (otherwise simply "known" default value).

* New source option to be added: startingOffsetsByTimestampStrategy
* Available values: `error` (fail the query as referred as AS-IS), `latest` (set the offset to the latest as referred as TO-BE)

Doc changes are following:

![ES-106042-doc-screenshot-1](https://user-images.githubusercontent.com/1317309/120472697-2c1ba800-c3e1-11eb-884f-f28152168053.png)
![ES-106042-doc-screenshot-2](https://user-images.githubusercontent.com/1317309/120472719-33db4c80-c3e1-11eb-9851-939be8a3ddb7.png)

### Why are the changes needed?

We encountered a real-world case Spark fails the query if some of the partitions don't have matching offset by timestamp.

This is intended behavior to avoid bring unintended output for some cases like:

* timestamp 2 is presented as timestamp-offset, but the some of partitions don't have the record yet
* record with timestamp 1 comes "later" in the following micro-batch

which is possible since Kafka allows to specify the timestamp in record.

Here the unintended output we talked about was the risk of reading record with timestamp 1 in the next micro-batch despite the option specifying timestamp 2.

But for many cases end users just suppose timestamp is increasing monotonically with wall clocks are all in sync, and current behavior blocks these cases to make progress.

### Does this PR introduce _any_ user-facing change?

Yes, but not a breaking change. It's up to end users to choose the behavior which the default value is "error" (current behavior). And it's a source option (not config) so they need to explicitly set the behavior to let the functionality takes effect.

### How was this patch tested?

New UTs.

Closes #32747 from HeartSaVioR/SPARK-35611.

Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2021-06-21 00:37:42 -07:00
Satish Gopalani 2a331177ba [SPARK-35312][SS] Introduce new Option in Kafka source to specify minimum number of records to read per trigger
### What changes were proposed in this pull request?
This patch introduces a new option to specify the minimum number of offsets to read per trigger i.e. minOffsetsPerTrigger and maxTriggerDelay to avoid the infinite wait for the trigger.

This new option will allow skipping trigger/batch when the number of records available in Kafka is low. This is a very useful feature in cases where we have a sudden burst of data at certain intervals in a day and data volume is low for the rest of the day.
'maxTriggerDelay' option will help to avoid cases of infinite delay in scheduling trigger and the trigger will happen irrespective of records available if the maxTriggerDelay time exceeds the last trigger. It would be an optional parameter with a default value of 15 mins. This option will be only applicable if minOffsetsPerTrigger is set.

minOffsetsPerTrigger option would be optional of course, but once specified it would take precedence over maxOffestsPerTrigger which will be honored only after minOffsetsPerTrigger is satisfied.

### Why are the changes needed?
There are many scenarios where there is a sudden burst of data at certain intervals in a day and data volume is low for the rest of the day. Tunning such jobs is difficult as decreasing trigger processing time increasing the number of batches and hence cluster resource usage and adds to small file issues. Increasing trigger processing time adds consumer lag. This patch tries to address this issue.

### How was this patch tested?
This patch was tested by adding test cases as well as manually on a cluster where the job was running for a full one day with a data burst happening once a day.
Here is the picture of databurst and hence consumer lag:
<img width="1198" alt="Screenshot 2021-04-29 at 11 39 35 PM" src="https://user-images.githubusercontent.com/1044003/116997587-9b2ab180-acfa-11eb-91fd-524802ce3316.png">

This is how the job behaved at burst time running every 4.5 mins (which is the specified trigger time):
<img width="1154" alt="Burst Time" src="https://user-images.githubusercontent.com/1044003/116997919-12f8dc00-acfb-11eb-9b0a-98387fc67560.png">

This is job behavior during the non-burst time where it is skipping 2 to 3 triggers and running once every 9 to 13.5 mins
<img width="1154" alt="Non Burst Time" src="https://user-images.githubusercontent.com/1044003/116998244-8b5f9d00-acfb-11eb-8340-33d47149ef81.png">

Here are some more stats from the two-run i.e. one normal run and the other with minOffsetsperTrigger set:

| Run | Data Size | Number of Batch Runs | Number of Files |
| ------------- | ------------- |------------- |------------- |
| Normal Run | 54.2 GB | 320 | 21968 |
| Run with minOffsetsperTrigger | 54.2 GB | 120 | 12104 |

Closes #32653 from satishgopalani/SPARK-35312.

Authored-by: Satish Gopalani <satish.gopalani@pubmatic.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
2021-06-08 23:48:09 +09:00
Kousuke Saruta 08e6f633b5 [SPARK-35577][TESTS] Allow to log container output for docker integration tests
### What changes were proposed in this pull request?

This PR proposes to add a feature that logs container output for docker integration tests.
With this change, if we run test with SBT, we will have like the following log in `unit-tests.log`.
```
===== CONTAINER LOGS FOR container Id: 3360c98eb28337d8b217fb614e47bf49aafa18a6cb60ecadf3178aee0c663021 =====
21/05/31 20:54:56.433 pool-1-thread-1 INFO PostgresIntegrationSuite: The files belonging to this database system will be owned by user "postgres".
This user must also own the server process.

The database cluster will be initialized with locale "en_US.utf8".
The default database encoding has accordingly been set to "UTF8".
The default text search configuration will be set to "english".

Data page checksums are disabled.

fixing permissions on existing directory /var/lib/postgresql/data ... ok
creating subdirectories ... ok
selecting dynamic shared memory implementation ... posix
selecting default max_connections ... 100
selecting default shared_buffers ... 128MB
selecting default time zone ... UTC
creating configuration files ... ok
running bootstrap script ... ok
sh: locale: not found
2021-05-31 11:54:49.892 UTC [29] WARNING:  no usable system locales were found
performing post-bootstrap initialization ... ok
initdb: warning: enabling "trust" authentication for local connections
You can change this by editing pg_hba.conf or using the option -A, or
--auth-local and --auth-host, the next time you run initdb.
syncing data to disk ... ok

Success. You can now start the database server using:

    pg_ctl -D /var/lib/postgresql/data -l logfile start

waiting for server to start....2021-05-31 11:54:50.284 UTC [34] LOG:  starting PostgreSQL 13.0 on x86_64-pc-linux-musl, compiled by gcc (Alpine 9.3.0) 9.3.0, 64-bit
2021-05-31 11:54:50.287 UTC [34] LOG:  listening on Unix socket "/var/run/postgresql/.s.PGSQL.5432"
2021-05-31 11:54:50.296 UTC [35] LOG:  database system was shut down at 2021-05-31 11:54:50 UTC
2021-05-31 11:54:50.301 UTC [34] LOG:  database system is ready to accept connections
 done
server started

/usr/local/bin/docker-entrypoint.sh: ignoring /docker-entrypoint-initdb.d/*

waiting for server to shut down....2021-05-31 11:54:50.363 UTC [34] LOG:  received fast shutdown request
2021-05-31 11:54:50.366 UTC [34] LOG:  aborting any active transactions
2021-05-31 11:54:50.368 UTC [34] LOG:  background worker "logical replication launcher" (PID 41) exited with exit code 1
2021-05-31 11:54:50.368 UTC [36] LOG:  shutting down
2021-05-31 11:54:50.402 UTC [34] LOG:  database system is shut down
 done
server stopped

PostgreSQL init process complete; ready for start up.

2021-05-31 11:54:50.510 UTC [1] LOG:  starting PostgreSQL 13.0 on x86_64-pc-linux-musl, compiled by gcc (Alpine 9.3.0) 9.3.0, 64-bit
2021-05-31 11:54:50.510 UTC [1] LOG:  listening on IPv4 address "0.0.0.0", port 5432
2021-05-31 11:54:50.510 UTC [1] LOG:  listening on IPv6 address "::", port 5432
2021-05-31 11:54:50.517 UTC [1] LOG:  listening on Unix socket "/var/run/postgresql/.s.PGSQL.5432"
2021-05-31 11:54:50.526 UTC [43] LOG:  database system was shut down at 2021-05-31 11:54:50 UTC
2021-05-31 11:54:50.531 UTC [1] LOG:  database system is ready to accept connections
2021-05-31 11:54:54.226 UTC [54] ERROR:  relation "public.barcopy" does not exist at character 15
2021-05-31 11:54:54.226 UTC [54] STATEMENT:  SELECT 1 FROM public.barcopy LIMIT 1
2021-05-31 11:54:54.610 UTC [59] ERROR:  relation "public.barcopy2" does not exist at character 15
2021-05-31 11:54:54.610 UTC [59] STATEMENT:  SELECT 1 FROM public.barcopy2 LIMIT 1
2021-05-31 11:54:54.934 UTC [63] ERROR:  relation "shortfloat" does not exist at character 15
2021-05-31 11:54:54.934 UTC [63] STATEMENT:  SELECT 1 FROM shortfloat LIMIT 1
2021-05-31 11:54:55.675 UTC [75] ERROR:  relation "byte_to_smallint_test" does not exist at character 15
2021-05-31 11:54:55.675 UTC [75] STATEMENT:  SELECT 1 FROM byte_to_smallint_test LIMIT 1

21/05/31 20:54:56.434 pool-1-thread-1 INFO PostgresIntegrationSuite:

===== END OF CONTAINER LOGS FOR container Id: 3360c98eb28337d8b217fb614e47bf49aafa18a6cb60ecadf3178aee0c663021 =====
```

### Why are the changes needed?

If we have container logs, it's useful to debug especially for GA.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

I run docker integration tests and got logs. The example shown above is for `PostgresIntegrationSuite`.

Closes #32715 from sarutak/log-docker-container.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2021-06-01 22:44:48 +09:00
yangjie01 16d9de815e [SPARK-35532][TESTS] Ensure mllib and kafka-0-10 module can be maven test independently in Scala 2.13
### What changes were proposed in this pull request?
Before this pr, when we execute maven test command to test `mllib` and `kafka-0-10` module independently, there are some Java UTs failed, the key error messages are as follows:

```
java.lang.NoClassDefFoundError: scala/collection/parallel/TaskSupport
```

and

```
java.lang.NoClassDefFoundError: scala/collection/parallel/immutable/ParVector
```

The UTs need `scala-parallel-collections_2.13`,  but it not in classpath when we run `mvn test -pl mllib -Pscala-2.13` and `mvn test -pl external/kafka-0-10 -Pscala-2.13`.

So the main change of this pr is add `scala-2.13` profile to `mllib/pom.xml` and `external/kafka-0-10/pom.xml`, the `scala-2.13` profile include dependency on `scala-parallel-collections_2.13`, then these two modules can maven test independently.

### Why are the changes needed?
Ensure mllib and kafka-0-10 module can be maven test independently in Scala 2.13

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

- Pass the GitHub Action Scala 2.13 job
- Manual test:

1. Execute
```
dev/change-scala-version.sh 2.13
mvn clean install -DskipTests -Phadoop-3.2 -Phive-2.3 -Phadoop-cloud -Pmesos -Pyarn -Pkinesis-asl -Phive-thriftserver -Pspark-ganglia-lgpl -Pkubernetes -Phive -Pscala-2.13
```

2. Execute

```
mvn test -pl mllib -Phadoop-3.2 -Phive-2.3 -Phadoop-cloud -Pmesos -Pyarn -Pkinesis-asl -Phive-thriftserver -Pspark-ganglia-lgpl -Pkubernetes -Phive -Pscala-2.13
```

**Before**

6 Java UTs failed:

```
[ERROR] Errors:
[ERROR]   JavaStreamingLogisticRegressionSuite.javaAPI:78 » TestFailed 20005 was not les...
[ERROR]   JavaStreamingKMeansSuite.javaAPI:78 » TestFailed 20040 was not less than 20000...
[ERROR]   JavaPrefixSpanSuite.runPrefixSpan:45 » NoClassDefFound scala/collection/parall...
[ERROR]   JavaPrefixSpanSuite.runPrefixSpanSaveLoad:67 » NoClassDefFound scala/collectio...
[ERROR]   JavaStreamingLinearRegressionSuite.javaAPI:77 » TestFailed 20014 was not less ...
[ERROR]   JavaStatisticsSuite.streamingTest:112 » TestFailed 20043 was not less than 200...
[INFO]
[ERROR] Tests run: 122, Failures: 0, Errors: 6, Skipped: 0
```

**After**

```
[INFO] Tests run: 122, Failures: 0, Errors: 0, Skipped: 0

Run completed in 28 minutes, 32 seconds.
Total number of tests run: 1654
Suites: completed 208, aborted 0
Tests: succeeded 1654, failed 0, canceled 0, ignored 7, pending 0
All tests passed.
```

3. Execute

```
mvn test -pl external/kafka-0-10 -Phadoop-3.2 -Phive-2.3 -Phadoop-cloud -Pmesos -Pyarn -Pkinesis-asl -Phive-thriftserver -Pspark-ganglia-lgpl -Pkubernetes -Phive -Pscala-2.13
```

**Before**

2 Java UTs failed:

```
[ERROR] Errors:
[ERROR] org.apache.spark.streaming.kafka010.JavaDirectKafkaStreamSuite.testKafkaStream
[ERROR]   Run 1: JavaDirectKafkaStreamSuite.testKafkaStream:170 expected:<[topic1-1, topic1-2, topic2-1, topic1-3, topic2-2, topic2-3]> but was:<[]>
[ERROR]   Run 2: JavaDirectKafkaStreamSuite.tearDown:57 » NoClassDefFound scala/collection/para...
[ERROR] Tests run: 4, Failures: 0, Errors: 1, Skipped: 0
```

**After**

```
[INFO] Tests run: 4, Failures: 0, Errors: 0, Skipped: 0

Run completed in 1 minute, 3 seconds.
Total number of tests run: 21
Suites: completed 4, aborted 0
Tests: succeeded 21, failed 0, canceled 0, ignored 0, pending 0
All tests passed.

```

Closes #32676 from LuciferYang/mllib-kafka-mvn-test.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-05-30 16:36:17 -07:00
Kousuke Saruta 2de19e460b [SPARK-35483][INFRA] Add docker-integration-tests to run-tests.py and GA
### What changes were proposed in this pull request?

This PR proposes to add `docker-integratin-tests` to `run-tests.py` and GA.
Once #32631 was merged but there was a lack of consideration.

Diff between this change and 692d95d145 merged in #32631 is as follows.

```
       if: github.repository != 'apache/spark'
       id: sync-branch
       run: |
+        apache_spark_ref=`git rev-parse HEAD`
         git fetch https://github.com/$GITHUB_REPOSITORY.git ${GITHUB_REF#refs/heads/}
         git -c user.name='Apache Spark Test Account' -c user.email='sparktestaccgmail.com' merge --no-commit --progress --squash FETCH_HEAD
         git -c user.name='Apache Spark Test Account' -c user.email='sparktestaccgmail.com' commit -m "Merged commit"
+        echo "::set-output name=APACHE_SPARK_REF::$apache_spark_ref"
     - name: Cache Scala, SBT and Maven
       uses: actions/cachev2
       with:
```

### Why are the changes needed?

CI for `docker-integration-tests` is absent for now.

### Does this PR introduce _any_ user-facing change?

GA.

### How was this patch tested?

Closes #32691 from sarutak/docker-integration-test-ga-take2.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-05-28 16:54:47 +09:00
Hyukjin Kwon d189cf75f9 Revert "[SPARK-35483][INFRA] Add docker-integration-tests to run-tests.py and GA"
This reverts commit 0a74ad66b3.
2021-05-28 14:29:12 +09:00
Kousuke Saruta 0a74ad66b3 [SPARK-35483][INFRA] Add docker-integration-tests to run-tests.py and GA
### What changes were proposed in this pull request?

This PR proposes to add `docker-integratin-tests` to `run-tests.py` and GA.
`doker-integration-tests` can't run if docker is not installed so it run only if `docker-integration-tests` is specified with `--module`.

### Why are the changes needed?

CI for `docker-integration-tests` is absent for now.

### Does this PR introduce _any_ user-facing change?

GA.

### How was this patch tested?

Closes #32631 from sarutak/docker-integration-test-ga.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-05-28 07:56:37 +09:00
Kousuke Saruta 116a97e153 [SPARK-35501][SQL][TESTS] Add a feature for removing pulled container image for docker integration tests
### What changes were proposed in this pull request?

This PR adds a feature for removing pulled container image after every docker integration test finish.
This feature is enabled by the new propoerty `spark.tes.docker.removePulledImage`.

### Why are the changes needed?

For idempotent.
I'm trying to add docker integration tests to GA in SPARK-35483 (#32631) but I noticed that `jdbc.OracleIntegrationSuite` consistently fails(https://github.com/sarutak/spark/runs/2646707235?check_suite_focus=true).
I investigated the reason and I found it's short of the storage capacity of the host on GA.
```
 ORACLE PASSWORD FOR SYS AND SYSTEM: oracle
The location '/opt/oracle' specified for database files has insufficient space.
Database creation needs at least '4.5GB' disk space.
Specify a different database file destination that has enough space in the configuration file '/etc/sysconfig/oracle-xe-18c.conf'.
mv: cannot stat '/opt/oracle/product/18c/dbhomeXE/dbs/spfileXE.ora': No such file or directory
mv: cannot stat '/opt/oracle/product/18c/dbhomeXE/dbs/orapwXE': No such file or directory
ORACLE_HOME = [/home/oracle] ? ORACLE_BASE environment variable is not being set since this
information is not available for the current user ID .
You can set ORACLE_BASE manually if it is required.
Resetting ORACLE_BASE to its previous value or ORACLE_HOME
The Oracle base remains unchanged with value /opt/oracle
#####################################
########### E R R O R ###############
DATABASE SETUP WAS NOT SUCCESSFUL!
Please check output for further info!
########### E R R O R ###############
#####################################
The following output is now a tail of the alert.log:
tail: cannot open '/opt/oracle/diag/rdbms/*/*/trace/alert*.log' for reading: No such file or directory
tail: no files remaining
```

With this feature, pulled container image is removed and keep the capacity for `jdbc.OracleIntegrationSuite` in GA.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

I confirmed the following things.

* A container image which is absent in the local repository is removed after test finished if `spark.test.container.removePulledImage` is `true`.
* A container image which is present in the local repository is not removed after the finished even if `spark.test.container.removePulledImage` is `true`.
* A container image is not removed regardless of presence of the container image in the local repository even if `spark.test.container.removePulledImage` is `true`.

Closes #32652 from sarutak/docker-image-rm.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-05-26 17:24:29 +09:00
Jungtaek Lim a57afd442c [SPARK-29223][SQL][SS] New option to specify timestamp on all subscribing topic-partitions in Kafka source
### What changes were proposed in this pull request?

This patch is a follow-up of SPARK-26848 (#23747). In SPARK-26848, we decided to open possibility to let end users set individual timestamp per partition. But in many cases, specifying timestamp represents the intention that we would want to go back to specific timestamp and reprocess records, which should be applied to all topics and partitions.

This patch proposes to provide a way to set a global timestamp across topic-partitions which the source is subscribing to, so that end users can set all offsets by specific timestamp easily. To provide the way to config the timestamp easier, the new options only receive "a" timestamp for start/end timestamp.

New options introduced in this PR:

* startingTimestamp
* endingTimestamp

All two options receive timestamp as string.

There're priorities for options regarding starting/ending offset as we will have three options for start offsets and another three options for end offsets. Priorities are following:

* starting offsets: startingTimestamp -> startingOffsetsByTimestamp -> startingOffsets
* ending offsets: startingTimestamp -> startingOffsetsByTimestamp -> startingOffsets

### Why are the changes needed?

Existing option to specify timestamp as offset is quite verbose if there're a lot of partitions across topics. Suppose there're 100s of partitions in a topic, the json should contain 100s of times of the same timestamp.

Also, the number of partitions can also change, which requires either:

* fixing the code if the json is statically created
* introducing the dependencies on Kafka client and deal with Kafka API on crafting json programmatically

Both approaches are even not "acceptable" if we're dealing with ad-hoc query; anyone doesn't want to write the code more complicated than the query itself. Flink [provides the option](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-consumers-start-position-configuration) to specify a timestamp for all topic-partitions like this PR, and even doesn't provide the option to specify the timestamp per topic-partition.

With this PR, end users are only required to provide a single timestamp value. No more complicated JSON format end users need to know about the structure.

### Does this PR introduce _any_ user-facing change?

Yes, this PR introduces two new options, described in above section.

Doc changes are following:

![스크린샷 2021-05-21 오후 12 01 02](https://user-images.githubusercontent.com/1317309/119076244-3034e680-ba2d-11eb-8323-0e227932d2e5.png)
![스크린샷 2021-05-21 오후 12 01 12](https://user-images.githubusercontent.com/1317309/119076255-35923100-ba2d-11eb-9d79-538a7f9ee738.png)
![스크린샷 2021-05-21 오후 12 01 24](https://user-images.githubusercontent.com/1317309/119076264-39be4e80-ba2d-11eb-8265-ac158f55c360.png)
![스크린샷 2021-05-21 오후 12 06 01](https://user-images.githubusercontent.com/1317309/119076271-3d51d580-ba2d-11eb-98ea-35fd72b1bbfc.png)

### How was this patch tested?

New UTs covering new functionalities. Also manually tested via simple batch & streaming queries.

Closes #32609 from HeartSaVioR/SPARK-29223-v2.

Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
2021-05-25 21:43:49 +09:00
Kousuke Saruta 1a43415d8d [SPARK-35226][SQL][FOLLOWUP] Fix test added in SPARK-35226 for DB2KrbIntegrationSuite
### What changes were proposed in this pull request?

This PR fixes an test added in SPARK-35226 (#32344).

### Why are the changes needed?

`SELECT 1` seems non-valid query for DB2.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

DB2KrbIntegrationSuite passes on my laptop.

I also confirmed all the KrbIntegrationSuites pass with the following command.
```
build/sbt -Phive -Phive-thriftserver -Pdocker-integration-tests "testOnly org.apache.spark.sql.jdbc.*KrbIntegrationSuite"
```

Closes #32632 from sarutak/followup-SPARK-35226.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-05-22 22:31:43 -07:00
Kousuke Saruta a59a214610 [MINOR][FOLLOWUP] Update SHA for the oracle docker image
### What changes were proposed in this pull request?

This PR updates SHA for the oracle docker image in the comment in `OracleIntegrationSuite` and `v2.OracleIntegrationSuite`.
The SHA for the latest image is `3f422c4a35b423dfcdbcc57a84f01db6c82eb6c1`

### Why are the changes needed?

The script name for creating the oracle docker image is changed in #32629, following the latest image so we also need to update the corresponding SHA.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

The value is from `git log`.

Closes #32630 from sarutak/followup-oracle-script-name.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-05-22 19:52:24 -07:00
Kousuke Saruta 0549caf07a [MINOR][SQL] Change the script name for creating oracle docker image
### What changes were proposed in this pull request?

This PR changes the name in the comment in `jdbc.OracleIntegrationSuite` and `v2.OracleIntegrationSuite`.
The script is for creating oracle docker image.

### Why are the changes needed?

The name of the script is `buildContainerImage`, not `buildDockerImage` now.
- d918f5a4c6 (diff-be303ab32e74192aca829e5ea259a0aec07aac23a6049120fb337ec4efa601b0)

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

I confirmed that I can build the image with `./buildContainerImage.sh -v 18.4.0 -x`.

Closes #32629 from sarutak/change-oracle-container-script-name.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-05-21 22:58:44 -07:00
Max Gekk 2b08070e79 [SPARK-35427][SQL][TESTS] Check the EXCEPTION rebase mode for Avro/Parquet
### What changes were proposed in this pull request?
Add tests to check the `EXCEPTION` rebase mode explicitly in the datasources:
- Parquet: `DATE` type and `TIMESTAMP`: `INT96`, `TIMESTAMP_MICROS`, `TIMESTAMP_MILLIS`
- Avro: `DATE` type and `TIMESTAMP`: `timestamp-millis` and `timestamp-micros`.

### Why are the changes needed?
1. To improve test coverage
2. The `EXCEPTION` rebase mode should be checked independently from the default settings.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By running the affected test suites:
```
$ build/sbt "test:testOnly *AvroV2Suite"
$ build/sbt "test:testOnly *ParquetRebaseDatetimeV1Suite"
```

Closes #32574 from MaxGekk/test-rebase-exception.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-05-21 06:18:06 +00:00
Max Gekk 2bd32548f5 [SPARK-35459][SQL][TESTS] Move AvroRowReaderSuite to a separate file
### What changes were proposed in this pull request?
Move `AvroRowReaderSuite` out from `AvroSuite.scala` and place it to `AvroRowReaderSuite.scala`.

### Why are the changes needed?
To improve code maintenance. Usually, independent test suites are placed to separate files.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By running the affected test suites:
```
$ build/sbt "test:testOnly *AvroRowReaderSuite"
$ build/sbt "test:testOnly *AvroV1Suite"
$ build/sbt "test:testOnly *AvroV2Suite"
```

Closes #32607 from MaxGekk/move-AvroRowReaderSuite.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-05-20 20:04:10 +09:00
Linhong Liu 6aa2594c6b [SPARK-35366][SQL] Avoid using deprecated buildForBatch and buildForStreaming
### What changes were proposed in this pull request?
Currently, in DSv2, we are still using the deprecated `buildForBatch` and `buildForStreaming`.
This PR implements the `build`, `toBatch`, `toStreaming` interfaces to replace the deprecated ones.

### Why are the changes needed?
Code refactor

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
exsting UT

Closes #32497 from linhongliu-db/dsv2-writer.

Lead-authored-by: Linhong Liu <linhong.liu@databricks.com>
Co-authored-by: Linhong Liu <67896261+linhongliu-db@users.noreply.github.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-05-13 17:23:08 +00:00
Yijia Cui bbdbe0f734 [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay
### What changes were proposed in this pull request?
This pull request proposes a new API for streaming sources to signal that they can report metrics, and adds a use case to support Kafka micro batch stream to report the stats of # of offsets for the current offset falling behind the latest.

A public interface is added.

`metrics`: returns the metrics reported by the streaming source with given offset.

### Why are the changes needed?
The new API can expose any custom metrics for the "current" offset for streaming sources. Different from #31398, this PR makes metrics available to user through progress report, not through spark UI. A use case is that people want to know how the current offset falls behind the latest offset.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Unit test for Kafka micro batch source v2 are added to test the Kafka use case.

Closes #31944 from yijiacui-db/SPARK-34297.

Authored-by: Yijia Cui <yijia.cui@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
2021-05-05 17:26:07 +09:00
Kousuke Saruta 529b875901 [SPARK-35226][SQL] Support refreshKrb5Config option in JDBC datasources
### What changes were proposed in this pull request?

This PR proposes to introduce a new JDBC option `refreshKrb5Config` which allows to reflect the change of `krb5.conf`.

### Why are the changes needed?

In the current master, JDBC datasources can't accept `refreshKrb5Config` which is defined in `Krb5LoginModule`.
So even if we change the `krb5.conf` after establishing a connection, the change will not be reflected.

The similar issue happens when we run multiple `*KrbIntegrationSuites` at the same time.
`MiniKDC` starts and stops every KerberosIntegrationSuite and different port number is recorded to `krb5.conf`.
Due to `SecureConnectionProvider.JDBCConfiguration` doesn't take `refreshKrb5Config`, KerberosIntegrationSuites except the first running one see the wrong port so those suites fail.
You can easily confirm with the following command.
```
build/sbt -Phive Phive-thriftserver -Pdocker-integration-tests "testOnly org.apache.spark.sql.jdbc.*KrbIntegrationSuite"
```
### Does this PR introduce _any_ user-facing change?

Yes. Users can set `refreshKrb5Config` to refresh krb5 relevant configuration.

### How was this patch tested?

New test.

Closes #32344 from sarutak/kerberos-refresh-issue.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Kousuke Saruta <sarutak@oss.nttdata.com>
2021-04-29 13:55:53 +09:00
Cheng Su 7f51106c0d [SPARK-26164][SQL] Allow concurrent writers for writing dynamic partitions and bucket table
### What changes were proposed in this pull request?

This is a re-proposal of https://github.com/apache/spark/pull/23163. Currently spark always requires a [local sort](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L188) before writing to output table with dynamic partition/bucket columns. The sort can be unnecessary if cardinality of partition/bucket values is small, and can be avoided by keeping multiple output writers concurrently.

This PR introduces a config `spark.sql.maxConcurrentOutputFileWriters` (which disables this feature by default), where user can tune the maximal number of concurrent writers. The config is needed here as we cannot keep arbitrary number of writers in task memory which can cause OOM (especially for Parquet/ORC vectorization writer).

The feature is to first use concurrent writers to write rows. If the number of writers exceeds the above config specified limit. Sort rest of rows and write rows one by one (See `DynamicPartitionDataConcurrentWriter.writeWithIterator()`).

In addition, interface `WriteTaskStatsTracker` and its implementation `BasicWriteTaskStatsTracker` are also changed because previously they are relying on the assumption that only one writer is active for writing dynamic partitions and bucketed table.

### Why are the changes needed?

Avoid the sort before writing output for dynamic partitioned query and bucketed table.
Help improve CPU and IO performance for these queries.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added unit test in `DataFrameReaderWriterSuite.scala`.

Closes #32198 from c21/writer.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-04-27 05:37:08 +00:00
Liang-Chi Hsieh bdac19184a [SPARK-35230][SQL] Move custom metric classes to proper package
### What changes were proposed in this pull request?

This patch moves DS v2 custom metric classes to `org.apache.spark.sql.connector.metric` package. Moving `CustomAvgMetric` and `CustomSumMetric` to above package and make them as public java abstract class too.

### Why are the changes needed?

`CustomAvgMetric` and `CustomSumMetric`  should be public APIs for developers to extend. As there are a few metric classes, we should put them together in one package.

### Does this PR introduce _any_ user-facing change?

No, dev only and they are not released yet.

### How was this patch tested?

Unit tests.

Closes #32348 from viirya/move-custom-metric-classes.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-04-26 07:19:36 -07:00
Dongjoon Hyun b108e7fff3 [SPARK-33913][SS] Upgrade Kafka to 2.8.0
### What changes were proposed in this pull request?

This PR aims to upgrade Kafka client to 2.8.0.
Note that Kafka 2.8.0 uses ZSTD JNI 1.4.9-1 like Apache Spark 3.2.0.

### Why are the changes needed?

This will bring the latest client-side improvement and bug fixes like the following examples.

- KAFKA-10631 ProducerFencedException is not Handled on Offest Commit
- KAFKA-10134 High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
- KAFKA-12193 Re-resolve IPs when a client is disconnected
- KAFKA-10090 Misleading warnings: The configuration was supplied but isn't a known config
- KAFKA-9263 The new hw is added to incorrect log when  ReplicaAlterLogDirsThread is replacing log
- KAFKA-10607 Ensure the error counts contains the NONE
- KAFKA-10458 Need a way to update quota for TokenBucket registered with Sensor
- KAFKA-10503 MockProducer doesn't throw ClassCastException when no partition for topic

**RELEASE NOTE**
- https://downloads.apache.org/kafka/2.8.0/RELEASE_NOTES.html
- https://downloads.apache.org/kafka/2.7.0/RELEASE_NOTES.html

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs with the existing tests because this is a dependency change.

Closes #32325 from dongjoon-hyun/SPARK-33913.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2021-04-25 16:20:22 +09:00
Liang-Chi Hsieh b2a2b5d820 [SPARK-34297][SQL][SS] Add metrics for data loss and offset out range for KafkaMicroBatchStream
### What changes were proposed in this pull request?

This patch proposes to add a couple of metrics in scan node for Kafka batch streaming query.

### Why are the changes needed?

When testing SS, I found it is hard to track data loss of SS reading from Kafka. The micro batch scan node has only one metric, number of output rows. Users have no idea how many offsets to fetch are out of Kafka, how many times data loss happens. These metrics are important for users to know the quality of SS query running.

### Does this PR introduce _any_ user-facing change?

Yes, adding two metrics to micro batch scan node for Kafka batch streaming.

### How was this patch tested?

Currently I tested on internal cluster with Kafka:

<img width="1193" alt="Screen Shot 2021-04-22 at 7 16 29 PM" src="https://user-images.githubusercontent.com/68855/115808460-61bf8100-a39f-11eb-99a9-65d22c3f5fb0.png">

I was trying to add unit test. But as our batch streaming query disallows to specify ending offsets. If I only specify an out-of-range starting offset, when we get offset range in `getRanges`,  any negative size range will be filtered out. So it cannot actually test the case of fetched non-existing offset.

Closes #31398 from viirya/micro-batch-metrics.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2021-04-23 13:56:53 -07:00
Kousuke Saruta ba92de0ae5 [SPARK-34843][SQL][FOLLOWUP] Fix a test failure in OracleIntegrationSuite
### What changes were proposed in this pull request?

This PR fixes a test failure in `OracleIntegrationSuite`.
After SPARK-34843 (#31965), the way to divide partitions is changed and `OracleIntegrationSuites` is affected.
```
[info] - SPARK-22814 support date/timestamp types in partitionColumn *** FAILED *** (230 milliseconds)
[info]   Set(""D" < '2018-07-11' or "D" is null", ""D" >= '2018-07-11' AND "D" < '2018-07-15'", ""D" >= '2018-07-15'") did not equal Set(""D" < '2018-07-10' or "D" is null", ""D" >= '2018-07-10' AND "D" < '2018-07-14'", ""D" >= '2018-07-14'") (OracleIntegrationSuite.scala:448)
[info]   Analysis:
[info]   Set(missingInLeft: ["D" < '2018-07-10' or "D" is null, "D" >= '2018-07-10' AND "D" < '2018-07-14', "D" >= '2018-07-14'], missingInRight: ["D" < '2018-07-11' or "D" is null, "D" >= '2018-07-11' AND "D" < '2018-07-15', "D" >= '2018-07-15'])
```

### Why are the changes needed?

To follow the previous change.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

The modified test.

Closes #32186 from sarutak/fix-oracle-date-error.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-04-15 07:07:34 -07:00
Ali Afroozeh 0945baf906 [SPARK-34989] Improve the performance of mapChildren and withNewChildren methods
### What changes were proposed in this pull request?
One of the main performance bottlenecks in query compilation is overly-generic tree transformation methods, namely `mapChildren` and `withNewChildren` (defined in `TreeNode`). These methods have an overly-generic implementation to iterate over the children and rely on reflection to create new instances. We have observed that, especially for queries with large query plans, a significant amount of CPU cycles are wasted in these methods. In this PR we make these methods more efficient, by delegating the iteration and instantiation to concrete node types. The benchmarks show that we can expect significant performance improvement in total query compilation time in queries with large query plans (from 30-80%) and about 20% on average.

#### Problem detail
The `mapChildren` method in `TreeNode` is overly generic and costly. To be more specific, this method:
- iterates over all the fields of a node using Scala’s product iterator. While the iteration is not reflection-based, thanks to the Scala compiler generating code for `Product`, we create many anonymous functions and visit many nested structures (recursive calls).
The anonymous functions (presumably compiled to Java anonymous inner classes) also show up quite high on the list in the object allocation profiles, so we are putting unnecessary pressure on GC here.
- does a lot of comparisons. Basically for each element returned from the product iterator, we check if it is a child (contained in the list of children) and then transform it. We can avoid that by just iterating over children, but in the current implementation, we need to gather all the fields (only transform the children) so that we can instantiate the object using the reflection.
- creates objects using reflection, by delegating to the `makeCopy` method, which is several orders of magnitude slower than using the constructor.

#### Solution
The proposed solution in this PR is rather straightforward: we rewrite the `mapChildren` method using the `children` and `withNewChildren` methods. The default `withNewChildren` method suffers from the same problems as `mapChildren` and we need to make it more efficient by specializing it in concrete classes.  Similar to how each concrete query plan node already defines its children, it should also define how they can be constructed given a new list of children. Actually, the implementation is quite simple in most cases and is a one-liner thanks to the copy method present in Scala case classes. Note that we cannot abstract over the copy method, it’s generated by the compiler for case classes if no other type higher in the hierarchy defines it. For most concrete nodes, the implementation of `withNewChildren` looks like this:
```
override def withNewChildren(newChildren: Seq[LogicalPlan]): LogicalPlan = copy(children = newChildren)
```
The current `withNewChildren` method has two properties that we should preserve:

- It returns the same instance if the provided children are the same as its children, i.e., it preserves referential equality.
- It copies tags and maintains the origin links when a new copy is created.

These properties are hard to enforce in the concrete node type implementation. Therefore, we propose a template method `withNewChildrenInternal` that should be rewritten by the concrete classes and let the `withNewChildren` method take care of referential equality and copying:
```
override def withNewChildren(newChildren: Seq[LogicalPlan]): LogicalPlan = {
 if (childrenFastEquals(children, newChildren)) {
   this
 } else {
   CurrentOrigin.withOrigin(origin) {
     val res = withNewChildrenInternal(newChildren)
     res.copyTagsFrom(this)
     res
   }
 }
}
```

With the refactoring done in a previous PR (https://github.com/apache/spark/pull/31932) most tree node types fall in one of the categories of `Leaf`, `Unary`, `Binary` or `Ternary`. These traits have a more efficient implementation for `mapChildren` and define a more specialized version of `withNewChildrenInternal` that avoids creating unnecessary lists. For example, the `mapChildren` method in `UnaryLike` is defined as follows:
```
  override final def mapChildren(f: T => T): T = {
    val newChild = f(child)
    if (newChild fastEquals child) {
      this.asInstanceOf[T]
    } else {
      CurrentOrigin.withOrigin(origin) {
        val res = withNewChildInternal(newChild)
        res.copyTagsFrom(this.asInstanceOf[T])
        res
      }
    }
  }
```

#### Results
With this PR, we have observed significant performance improvements in query compilation time, more specifically in the analysis and optimization phases. The table below shows the TPC-DS queries that had more than 25% speedup in compilation times. Biggest speedups are observed in queries with large query plans.
| Query  | Speedup |
| ------------- | ------------- |
|q4    |29%|
|q9    |81%|
|q14a  |31%|
|q14b  |28%|
|q22   |33%|
|q33   |29%|
|q34   |25%|
|q39   |27%|
|q41   |27%|
|q44   |26%|
|q47   |28%|
|q48   |76%|
|q49   |46%|
|q56   |26%|
|q58   |43%|
|q59   |46%|
|q60   |50%|
|q65   |59%|
|q66   |46%|
|q67   |52%|
|q69   |31%|
|q70   |30%|
|q96   |26%|
|q98   |32%|

#### Binary incompatibility
Changing the `withNewChildren` in `TreeNode` breaks the binary compatibility of the code compiled against older versions of Spark because now it is expected that concrete `TreeNode` subclasses all implement the `withNewChildrenInternal` method. This is a problem, for example, when users write custom expressions. This change is the right choice, since it forces all newly added expressions to Catalyst implement it in an efficient manner and will prevent future regressions.
Please note that we have not completely removed the old implementation and renamed it to `legacyWithNewChildren`. This method will be removed in the future and for now helps the transition. There are expressions such as `UpdateFields` that have a complex way of defining children. Writing `withNewChildren` for them requires refactoring the expression. For now, these expressions use the old, slow method. In a future PR we address these expressions.

### Does this PR introduce _any_ user-facing change?

This PR does not introduce user facing changes but my break binary compatibility of the code compiled against older versions. See the binary compatibility section.

### How was this patch tested?

This PR is mainly a refactoring and passes existing tests.

Closes #32030 from dbaliafroozeh/ImprovedMapChildren.

Authored-by: Ali Afroozeh <ali.afroozeh@databricks.com>
Signed-off-by: herman <herman@databricks.com>
2021-04-09 15:06:26 +02:00
William Hyun ba32b200e4 [SPARK-34479][FOLLOWUP][DOC] Add zstandard codec to AvroOptions.compression comment
### What changes were proposed in this pull request?

This PR aims to add zstandard codec to the `AvroOptions.compression` comment.

### Why are the changes needed?

SPARK-34479 added zstandard codec.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
N/A

Closes #32050 from williamhyun/avro.

Authored-by: William Hyun <williamhyun3@gmail.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
2021-04-05 07:30:59 +08:00
HyukjinKwon ebf01ec3c1 [SPARK-34950][TESTS] Update benchmark results to the ones created by GitHub Actions machines
### What changes were proposed in this pull request?

https://github.com/apache/spark/pull/32015 added a way to run benchmarks much more easily in the same GitHub Actions build. This PR updates the benchmark results by using the way.

**NOTE** that looks like GitHub Actions use four types of CPU given my observations:

- Intel(R) Xeon(R) Platinum 8171M CPU  2.60GHz
- Intel(R) Xeon(R) CPU E5-2673 v4  2.30GHz
- Intel(R) Xeon(R) CPU E5-2673 v3  2.40GHz
- Intel(R) Xeon(R) Platinum 8272CL CPU  2.60GHz

Given my quick research, seems like they perform roughly similarly:

![Screen Shot 2021-04-03 at 9 31 23 PM](https://user-images.githubusercontent.com/6477701/113478478-f4b57b80-94c3-11eb-9047-f81ca8c59672.png)

I couldn't find enough information about Intel(R) Xeon(R) Platinum 8272CL CPU  2.60GHz but the performance seems roughly similar given the numbers.

So shouldn't be a big deal especially given that this way is much easier, encourages contributors to run more and guarantee the same number of cores and same memory with the same softwares.

### Why are the changes needed?

To have a base line of the benchmarks accordingly.

### Does this PR introduce _any_ user-facing change?

No, dev-only.

### How was this patch tested?

It was generated from:

- [Run benchmarks: * (JDK 11)](https://github.com/HyukjinKwon/spark/actions/runs/713575465)
- [Run benchmarks: * (JDK 8)](https://github.com/HyukjinKwon/spark/actions/runs/713154337)

Closes #32044 from HyukjinKwon/SPARK-34950.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-04-03 23:02:56 +03:00
yangjie01 7158e7f986 [SPARK-34900][TEST] Make sure benchmarks can run using spark-submit cmd described in the guide
### What changes were proposed in this pull request?
Some `spark-submit`  commands used to run benchmarks in the user's guide is wrong, we can't use these commands to run benchmarks successful.

So the major changes of this pr is correct these wrong commands, for example, run a benchmark which inherits from `SqlBasedBenchmark`, we must specify `--jars <spark core test jar>,<spark catalyst test jar>` because `SqlBasedBenchmark` based benchmark extends `BenchmarkBase(defined in spark core test jar)` and `SQLHelper(defined in spark catalyst test jar)`.

Another change of this pr is removed the `scalatest Assertions` dependency of Benchmarks because `scalatest-*.jar` are not in the distribution package, it will be troublesome to use.

### Why are the changes needed?
Make sure benchmarks can run using spark-submit cmd described in the guide

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Use the corrected `spark-submit` commands to run benchmarks successfully.

Closes #31995 from LuciferYang/fix-benchmark-guide.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2021-03-30 11:58:01 +09:00
Peter Toth 93a5d34f84 [SPARK-33482][SPARK-34756][SQL] Fix FileScan equality check
### What changes were proposed in this pull request?

This bug was introduced by SPARK-30428 at Apache Spark 3.0.0.
This PR fixes `FileScan.equals()`.

### Why are the changes needed?
- Without this fix `FileScan.equals` doesn't take `fileIndex` and `readSchema` into account.
- Partition filters and data filters added to `FileScan` (in #27112 and #27157) caused that canonicalized form of some `BatchScanExec` nodes don't match and this prevents some reuse possibilities.

### Does this PR introduce _any_ user-facing change?
Yes, before this fix incorrect reuse of `FileScan` and so `BatchScanExec` could have happed causing correctness issues.

### How was this patch tested?
Added new UTs.

Closes #31848 from peter-toth/SPARK-34756-fix-filescan-equality-check.

Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-03-23 17:01:16 +08:00
Ismaël Mejía 8a552bfc76 [SPARK-34778][BUILD] Upgrade to Avro 1.10.2
### What changes were proposed in this pull request?
Update the  Avro version to 1.10.2

### Why are the changes needed?
To stay up to date with upstream and catch compatibility issues with zstd

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit tests

Closes #31866 from iemejia/SPARK-27733-upgrade-avro-1.10.2.

Authored-by: Ismaël Mejía <iemejia@gmail.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
2021-03-22 19:30:14 +08:00
Josh Soref f4de93efb0 [MINOR][SQL] Spelling: filters - PushedFilers
### What changes were proposed in this pull request?
Consistently correct the spelling of `PushedFilters`

### Why are the changes needed?
bersprockets noted that it's wrong

### Does this PR introduce _any_ user-facing change?

Technically, I think it does. Practically, neither Google nor GitHub show anyone using `pushedFilers` outside of forks (or the discussion about fixing it started at https://github.com/apache/spark/pull/30323#issuecomment-725568719)

### How was this patch tested?
None beyond CI in the previous PR

Closes #30678 from jsoref/spelling-filters.

Authored-by: Josh Soref <jsoref@users.noreply.github.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-03-22 08:00:12 +03:00
William Hyun c799d049fc [SPARK-34810][TEST] Update PostgreSQL test with the latest results
### What changes were proposed in this pull request?

This PR aims to update `PostgresIntegrationSuite` with the latest results.

### Why are the changes needed?

The latest PostgreSQL jar version is 42.2.19. Since 42.2.9, the test is broken because it returns `0.0` instead of `0.00`.
- https://jdbc.postgresql.org/documentation/changelog.html#version_42.2.19

42.2.9 (2019-12-06)
42.2.10 (2020-01-30)
42.2.11 (2020-03-09)
42.2.12 (2020-03-31)
42.2.13 (2020-06-04)
42.2.14 (2020-06-10)
42.2.15 (2020-08-14)
42.2.16 (2020-08-20)
42.2.17 (2020-10-09)
42.2.18 (2020-10-15)
42.2.19 (2021-02-18)

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CI with the updated test cases.

```
build/sbt -Pdocker-integration-tests 'docker-integration-tests/testOnly org.apache.spark.sql.jdbc.PostgresIntegrationSuite'
```

Closes #31910 from williamhyun/pg.

Authored-by: William Hyun <williamhyun3@gmail.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
2021-03-21 13:36:45 +08:00
Dongjoon Hyun 631a85ed9b [SPARK-34650][BUILD][SS] Exclude zstd-jni transitive dependency from Kafka Client
### What changes were proposed in this pull request?

This PR aims to exclude `zstd-jni` transitive dependency from kafka-client.

### Why are the changes needed?

To prevent future conflicts, the followings are removed. We should use Spark's zstd-jni dependency consistently.

```
$ build/sbt "token-provider-kafka-0-10/dependencyTree" | grep zstd
[info]   | +-com.github.luben:zstd-jni:1.4.4-7

$ build/sbt "streaming-kafka-0-10/dependencyTree" | grep zstd
[info]   | +-com.github.luben:zstd-jni:1.4.4-7
[info]   | | +-com.github.luben:zstd-jni:1.4.4-7

$ build/sbt "sql-kafka-0-10/dependencyTree" | grep zstd
[info]   | +-com.github.luben:zstd-jni:1.4.4-7
[info]   | | +-com.github.luben:zstd-jni:1.4.4-7
```
### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

Closes #31767 from dongjoon-hyun/SPARK-34650.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2021-03-07 13:53:55 +09:00
HyukjinKwon 3d0ee9604e [SPARK-34520][CORE][FOLLOW-UP] Remove SecurityManager in GangliaSink
### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/31636. There was one place missed in `GangliaSink`, and we should also remove `SecurityManager`.

### Why are the changes needed?

To make `GangliaSink` work.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

It was found in the internal it tests in the company I work for.

Closes #31688 from HyukjinKwon/SPARK-34520-followup.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2021-03-01 11:18:57 +09:00