Commit graph

667 commits

Author SHA1 Message Date
DB Tsai a12de29c1a [SPARK-27838][SQL] Support user provided non-nullable avro schema for nullable catalyst schema without any null record
## What changes were proposed in this pull request?

When data is read from a source, the catalyst schema is always nullable. Since Avro uses a Union type to represent a nullable field, whenever a non-nullable Avro file is read and then written out, the schema is always changed.

This PR provides a solution for users to keep the Avro schema without being forced to use Union type.
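
A hedged sketch of how a user might keep a non-nullable schema on write via the Avro data source's `avroSchema` option (the schema string and paths below are illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("avro-nullability-demo").getOrCreate()

// Illustrative Avro schema with a non-nullable field (no union with "null").
val avroSchema =
  """{
    |  "type": "record",
    |  "name": "User",
    |  "fields": [ {"name": "id", "type": "long"} ]
    |}""".stripMargin

val df = spark.read.format("avro").load("/tmp/users_in")   // catalyst schema becomes nullable
df.write
  .format("avro")
  .option("avroSchema", avroSchema)                         // keep the non-nullable schema on write
  .save("/tmp/users_out")
```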

## How was this patch tested?

One test is added.

Closes #24682 from dbtsai/avroNull.

Authored-by: DB Tsai <d_tsai@apple.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
2019-05-24 21:47:14 +00:00
Sean Owen 6c5827c723 [SPARK-27794][R][DOCS] Use https URL for CRAN repo
## What changes were proposed in this pull request?

Use https URL for CRAN repo (and for a Scala download in a Dockerfile)

## How was this patch tested?

Existing tests.

Closes #24664 from srowen/SPARK-27794.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-05-22 14:28:21 -07:00
Sean Owen 4d64ed8114 [SPARK-27796][MESOS] Remove obsolete spark-mesos Dockerfile example
## What changes were proposed in this pull request?

Remove obsolete spark-mesos Dockerfile example. This isn't tested and apparently hasn't been updated in 4 years.

## How was this patch tested?

N/A

Closes #24667 from srowen/SPARK-27796.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-05-21 10:53:55 -07:00
DB Tsai 808d9d05fc [SPARK-27762][SQL] Support user provided avro schema for writing fields with different ordering
## What changes were proposed in this pull request?

The Spark Avro reader supports reading Avro files with a user-provided schema whose fields are in a different order. However, the Avro writer doesn't support this. This PR adds the same support to the Avro writer.

## How was this patch tested?

New test is added.

Closes #24635 from dbtsai/avroFix.

Lead-authored-by: DB Tsai <d_tsai@apple.com>
Co-authored-by: Brian Lindblom <blindblom@apple.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
2019-05-21 17:34:19 +00:00
Gabor Somogyi efa303581a [SPARK-27687][SS] Rename Kafka consumer cache capacity conf and document caching
## What changes were proposed in this pull request?

Kafka-related Spark parameters have to start with `spark.kafka.` and not with `spark.sql.`, so I've renamed `spark.sql.kafkaConsumerCache.capacity`.

Since Kafka consumer caching was not documented, I've also added documentation for it.
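
A minimal sketch of setting the renamed parameter (assuming the new key is `spark.kafka.consumer.cache.capacity`; the value is illustrative):

```scala
import org.apache.spark.SparkConf

// Assumed renamed key, replacing the old spark.sql.kafkaConsumerCache.capacity.
val conf = new SparkConf()
  .set("spark.kafka.consumer.cache.capacity", "128")
```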

## How was this patch tested?

Existing + added unit test.

```
cd docs
SKIP_API=1 jekyll build
```
and manual webpage check.

Closes #24590 from gaborgsomogyi/SPARK-27687.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-05-15 10:42:09 -07:00
Sean Owen a10608cb82 [SPARK-27680][CORE][SQL][GRAPHX] Remove usage of Traversable
## What changes were proposed in this pull request?

This removes usage of `Traversable`, which is removed in Scala 2.13. This is mostly an internal change, except for the change in the `SparkConf.setAll` method. See additional comments below.
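
A small sketch of the only user-facing touch point, `SparkConf.setAll`, which now accepts an `Iterable` instead of a `Traversable` (call sites that pass a `Seq` or `Map`, as below, keep compiling):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
// A Seq is an Iterable, so this call is unaffected by the signature change.
conf.setAll(Seq(
  "spark.app.name" -> "setall-demo",
  "spark.master"   -> "local[2]"
))
```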

## How was this patch tested?

Existing tests.

Closes #24584 from srowen/SPARK-27680.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-05-14 09:14:56 -05:00
hehuiyuan 5a8aad01c2 [SPARK-27343][KAFKA][SS] Avoid hardcoded for spark-sql-kafka-0-10
## What changes were proposed in this pull request?

[SPARK-27343](https://issues.apache.org/jira/projects/SPARK/issues/SPARK-27343)

Based on the previous PR: https://github.com/apache/spark/pull/24270

This extracts the scattered parameters and builds `ConfigEntry` objects for them.

For example:
For the parameter `spark.kafka.producer.cache.timeout`, we build:
```
private[kafka010] val PRODUCER_CACHE_TIMEOUT =
    ConfigBuilder("spark.kafka.producer.cache.timeout")
      .doc("The expire time to remove the unused producers.")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("10m")
```

Closes #24574 from hehuiyuan/hehuiyuan-patch-9.

Authored-by: hehuiyuan <hehuiyuan@ZBMAC-C02WD3K5H.local>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-05-12 10:46:18 -05:00
Gengliang Wang 78a403fab9 [SPARK-27627][SQL] Make option "pathGlobFilter" as a general option for all file sources
## What changes were proposed in this pull request?

### Background:
The data source option `pathGlobFilter` was introduced for the binary file format: https://github.com/apache/spark/pull/24354 . It can be used for filtering file names, e.g. reading only `.png` files while there are `.json` files in the same directory.

### Proposal:
Make the option `pathGlobFilter` a general option for all file sources. The path filtering should happen during path globbing on the driver.

### Motivation:
Filtering the file path names in file scan tasks on executors is kind of ugly.

### Impact:
1. The splitting of file partitions will be more balanced.
2. The metrics of file scan will be more accurate.
3. Users can use the option for reading other file sources.
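
A hedged usage sketch of the option once it is general (the format, pattern and path are illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("path-glob-filter-demo").getOrCreate()

// Read only the .json files from a directory that also contains other file types;
// the filter is applied while globbing paths on the driver.
val df = spark.read
  .format("json")
  .option("pathGlobFilter", "*.json")
  .load("/data/mixed")
```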

## How was this patch tested?

Unit tests

Closes #24518 from gengliangwang/globFilter.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-05-09 08:41:43 +09:00
Wenchen Fan bae5baae52 [SPARK-27642][SS] make v1 offset extends v2 offset
## What changes were proposed in this pull request?

To move DS v2 to the catalyst module, we can't make v2 offset rely on v1 offset, as v1 offset is in sql/core.

## How was this patch tested?

existing tests

Closes #24538 from cloud-fan/offset.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2019-05-07 23:03:15 -07:00
gengjiaan 8329e7debd [SPARK-27649][SS] Unify the way use 'spark.network.timeout'
## What changes were proposed in this pull request?

For historical reasons, Structured Streaming still references `spark.network.timeout` directly in some places, even though `org.apache.spark.internal.config.Network.NETWORK_TIMEOUT` is now available.

## How was this patch tested?
Existing unit tests.

Closes #24545 from beliefer/unify-spark-network-timeout.

Authored-by: gengjiaan <gengjiaan@360.cn>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-05-08 11:43:03 +08:00
Gabor Somogyi 2f55809425 [SPARK-27294][SS] Add multi-cluster Kafka delegation token
## What changes were proposed in this pull request?

The current implementation doesn't support multi-cluster Kafka connections with delegation tokens. In this PR I've added this functionality.

What this PR contains:
* New way of configuration
* Multiple delegation token obtain/store/use functionality
* Documentation
* The change works on DStreams also

## How was this patch tested?

Existing + additional unit tests.
Additionally tested on cluster.

Test scenario:

* Two 4-node clusters
* The nodes of the two clusters are in different Kerberos realms
* Cross-Realm trust between the 2 realms
* Yarn
* Kafka broker version 2.1.0
* security.protocol = SASL_SSL
* sasl.mechanism = SCRAM-SHA-512
* Artificial exceptions during processing
* Source reads from realm1 sink writes to realm2

Kafka broker settings:

* delegation.token.expiry.time.ms=600000 (10 min)
* delegation.token.max.lifetime.ms=1200000 (20 min)
* delegation.token.expiry.check.interval.ms=300000 (5 min)

Closes #24305 from gaborgsomogyi/SPARK-27294.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-05-07 11:40:43 -07:00
Adi Muraru 8ef4da753d [SPARK-27610][YARN] Shade netty native libraries
## What changes were proposed in this pull request?

Fixed the `spark-<version>-yarn-shuffle.jar` artifact packaging to shade the native netty libraries:
- shade the `META-INF/native/libnetty_*` native libraries when packaging
the yarn shuffle service jar. This is required as the netty library loader
derives the library name from the shaded package name.
- updated the `org/spark_project` shade package prefix to `org/sparkproject`
(i.e. removed the underscore) as the former breaks netty native lib loading.

This was causing the YARN external shuffle service to fail when `spark.shuffle.io.mode=EPOLL`.

## How was this patch tested?
Manual tests

Closes #24502 from amuraru/SPARK-27610_master.

Authored-by: Adi Muraru <amuraru@adobe.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-05-07 10:47:36 -07:00
Wenchen Fan 6ef45301a4 [SPARK-27579][SQL] remove BaseStreamingSource and BaseStreamingSink
## What changes were proposed in this pull request?

`BaseStreamingSource` and `BaseStreamingSink` are used to unify the v1 and v2 streaming data source APIs in some code paths.

This PR removes these 2 interfaces, and let the v1 API extend v2 API to keep API compatibility.

The motivation is https://github.com/apache/spark/pull/24416 . We want to move data source v2 to catalyst module, but `BaseStreamingSource` and `BaseStreamingSink` are in sql/core.

## How was this patch tested?

existing tests

Closes #24471 from cloud-fan/streaming.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-05-06 20:41:57 +08:00
Dilip Biswal 6001d476ce [SPARK-27596][SQL] The JDBC 'query' option doesn't work for Oracle database
## What changes were proposed in this pull request?
**Description from JIRA**
For the JDBC option `query`, we generate an alias whose name starts with an underscore: s"(${subquery}) _SPARK_GEN_JDBC_SUBQUERY_NAME${curId.getAndIncrement()}". This is not supported by Oracle.
Oracle doesn't seem to support identifier names that start with a non-alphabetic character (unless quoted) and has length restrictions as well. [link](https://docs.oracle.com/cd/B19306_01/server.102/b14200/sql_elements008.htm)

In this PR, the generated alias name 'SPARK_GEN_JDBC_SUBQUERY_NAME<int value>' is fixed to drop the "_" prefix, and the alias is shortened so it does not exceed the identifier length limit.
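
A hedged sketch of the `query` option whose generated subquery alias this PR fixes (the connection URL, credentials and SQL are illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("jdbc-query-demo").getOrCreate()

// The data source wraps the query as "(SELECT ...) <generated alias>"; after this fix the
// alias no longer starts with "_" and stays within Oracle's identifier length limit.
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:oracle:thin:@//dbhost:1521/SERVICE")              // illustrative URL
  .option("query", "SELECT id, name FROM employees WHERE dept = 'ENG'")  // illustrative SQL
  .option("user", "scott")
  .option("password", "tiger")
  .load()
```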

## How was this patch tested?
Tests are added for MySQL, Postgres, Oracle and DB2 to ensure enough coverage.

Closes #24532 from dilipbiswal/SPARK-27596.

Authored-by: Dilip Biswal <dbiswal@us.ibm.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2019-05-05 21:52:23 -07:00
Gabor Somogyi fb6b19ab7c [SPARK-23014][SS] Fully remove V1 memory sink.
## What changes were proposed in this pull request?

There is a MemorySink v2 already so v1 can be removed. In this PR I've removed it completely.
What this PR contains:
* V1 memory sink removal
* V2 memory sink renamed to become the only implementation
* Since DSv2 sends exceptions in a chained format (linking them with the cause field), I've made the Python side compliant
* Adapted all the tests

## How was this patch tested?

Existing unit tests.

Closes #24403 from gaborgsomogyi/SPARK-23014.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-04-29 09:44:23 -07:00
Sean Owen 8a17d26784 [SPARK-27536][CORE][ML][SQL][STREAMING] Remove most use of scala.language.existentials
## What changes were proposed in this pull request?

I want to get rid of as much use of `scala.language.existentials` as possible for 3.0. It's a complicated language feature that generates warnings unless this value is imported. It might even be on the way out of Scala: https://contributors.scala-lang.org/t/proposal-to-remove-existential-types-from-the-language/2785

For Spark, it comes up mostly where the code plays fast and loose with generic types, not the advanced situations you'll often see referenced where this feature is explained. For example, it comes up in cases where a function returns something like `(String, Class[_])`. Scala doesn't like matching this to any other instance of `(String, Class[_])` because doing so requires inferring the existence of some type that satisfies both. It seems obvious if the generic type is a wildcard, but it's not technically something Scala likes to let you get away with.

This is a large PR, and it only gets rid of _most_ instances of `scala.language.existentials`. The change should be all compile-time and shouldn't affect APIs or logic.

Many of the changes simply touch up sloppiness about generic types, making the known correct value explicit in the code.

Some fixes involve being more explicit about the existence of generic types in methods. For instance, `def foo(arg: Class[_])` seems innocent enough but should really be declared `def foo[T](arg: Class[T])` to let Scala select and fix a single type when evaluating calls to `foo`.

For kind of surprising reasons, this comes up in places where code evaluates a tuple of things that involve a generic type, but is OK if the two parts of the tuple are evaluated separately.

One key change was altering `Utils.classForName(...): Class[_]` to the more correct `Utils.classForName[T](...): Class[T]`. This caused a number of small but positive changes to callers that otherwise had to cast the result.

In several tests, `Dataset[_]` was used where `DataFrame` seems to be the clear intent.

Finally, in a few cases in MLlib, the return type `this.type` was used where there are no subclasses of the class that uses it. This really isn't needed and causes issues for Scala reasoning about the return type. These are just changed to be concrete classes as return types.

After this change, we have only a few classes that still import `scala.language.existentials` (because modifying them would require extensive rewrites to fix) and no build warnings.
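
A minimal sketch of the declaration pattern described above (the method and type names are illustrative, not from the PR):

```scala
// Under-specified: callers receive a Class[_] and usually need to cast.
def loadClassLoose(name: String): Class[_] =
  Class.forName(name)

// Declaring the type parameter lets Scala fix a single type at each call site,
// so the single cast lives here instead of at every caller.
def loadClass[T](name: String): Class[T] =
  Class.forName(name).asInstanceOf[Class[T]]

val threadClass: Class[Thread] = loadClass[Thread]("java.lang.Thread")
```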

## How was this patch tested?

Existing tests.

Closes #24431 from srowen/SPARK-27536.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-04-29 11:02:01 -05:00
Koert Kuipers 7b367bfc86 [SPARK-27477][BUILD] Kafka token provider should have provided dependency on Spark
## What changes were proposed in this pull request?

Change spark-token-provider-kafka-0-10 dependency on spark-core to be provided

## How was this patch tested?

Ran existing unit tests

Closes #24384 from koertkuipers/feat-kafka-token-provider-fix-deps.

Authored-by: Koert Kuipers <koert@tresata.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-04-26 11:52:08 -07:00
Wenchen Fan 85fd552ed6 [SPARK-27190][SQL] add table capability for streaming
## What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/24012 , to add the corresponding capabilities for streaming.

## How was this patch tested?

existing tests

Closes #24129 from cloud-fan/capability.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-04-26 15:44:23 +08:00
uncleGen d2656aaecd [SPARK-27494][SS] Null values don't work in Kafka source v2
## What changes were proposed in this pull request?

Right now Kafka source v2 doesn't support null values. The issue is in `org.apache.spark.sql.kafka010.KafkaRecordToUnsafeRowConverter.toUnsafeRow`, which doesn't handle null values.

## How was this patch tested?

add new unit tests

Closes #24441 from uncleGen/SPARK-27494.

Authored-by: uncleGen <hustyugm@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-04-26 14:25:31 +08:00
Sean Owen 596a5ff273 [MINOR][BUILD] Update genjavadoc to 0.13
## What changes were proposed in this pull request?

Kind of related to https://github.com/gatorsmile/spark/pull/5 - let's update genjavadoc to see if it generates fewer spurious javadoc errors to begin with.

## How was this patch tested?

Existing docs build

Closes #24443 from srowen/genjavadoc013.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-04-24 13:44:48 +09:00
Gabor Somogyi 94adffa8b1 [SPARK-27270][SS] Add Kafka dynamic JAAS authentication debug possibility
## What changes were proposed in this pull request?

`Krb5LoginModule` supports a debug parameter which cannot yet be set from the Spark side. This configuration makes it easier to debug authentication issues against Kafka.

In this PR the `Krb5LoginModule` debug flag is controlled by either `sun.security.krb5.debug` or `com.ibm.security.krb5.Krb5Debug`.

Additionally, I found some hardcoded values like `ssl.truststore.location`, etc., which could be error-prone if Kafka ever changes them; in such cases the constants defined by Kafka are now used.

## How was this patch tested?

Existing + additional unit tests + on cluster.

Closes #24204 from gaborgsomogyi/SPARK-27270.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-04-11 16:39:40 -07:00
Sean Owen 4ec7f631aa [SPARK-27404][CORE][SQL][STREAMING][YARN] Fix build warnings for 3.0: postfixOps edition
## What changes were proposed in this pull request?

Fix build warnings -- see some details below.

But mostly, remove use of postfix syntax where it causes warnings without the `scala.language.postfixOps` import. This is mostly in expressions like "120000 milliseconds", which I'd like to simplify to things like "2.minutes" anyway.
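
A small sketch of the two styles (the dotted form below needs only `scala.concurrent.duration._`, not the `postfixOps` language import):

```scala
import scala.concurrent.duration._

// Postfix expressions such as `120000 milliseconds` warn without
// importing scala.language.postfixOps; the dotted form does not.
val timeout  = 2.minutes
val interval = 500.millis
```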

## How was this patch tested?

Existing tests.

Closes #24314 from srowen/SPARK-27404.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-04-11 13:43:44 -05:00
gengjiaan 53e31e2ca1 [SPARK-27399][STREAMING][KAFKA] Arrange scattered config and reduce hardcode for kafka 10.
## What changes were proposed in this pull request?

I found a lot of scattered config in Kafka streaming. I think these configs should be arranged in one unified place.

## How was this patch tested?

No new unit tests needed.

Closes #24267 from beliefer/arrange-scattered-streaming-kafka-config.

Authored-by: gengjiaan <gengjiaan@360.cn>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-04-06 18:05:15 -05:00
Sean Owen d4420b455a [SPARK-27323][CORE][SQL][STREAMING] Use Single-Abstract-Method support in Scala 2.12 to simplify code
## What changes were proposed in this pull request?

Use Single Abstract Method syntax where possible (and minor related cleanup). Comments below. No logic should change here.

## How was this patch tested?

Existing tests.

Closes #24241 from srowen/SPARK-27323.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-04-02 07:37:05 -07:00
Zhu, Lipeng 1f2564d0b0 [SPARK-27155][TEST] Parameterize Oracle docker image name
## What changes were proposed in this pull request?

Update Oracle docker image name.

## How was this patch tested?

./build/mvn test -Pdocker-integration-tests -pl :spark-docker-integration-tests_2.12

Closes #24086 from lipzhu/SPARK-27155.

Authored-by: Zhu, Lipeng <lipzhu@ebay.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-03-25 15:17:41 -05:00
Sean Owen 8bc304f97e [SPARK-26132][BUILD][CORE] Remove support for Scala 2.11 in Spark 3.0.0
## What changes were proposed in this pull request?

Remove Scala 2.11 support in build files and docs, and in various parts of code that accommodated 2.11. See some targeted comments below.

## How was this patch tested?

Existing tests.

Closes #23098 from srowen/SPARK-26132.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-03-25 10:46:42 -05:00
Dongjoon Hyun 6ef94e0f18 [SPARK-27260][SS] Upgrade to Kafka 2.2.0
## What changes were proposed in this pull request?

This PR aims to update Kafka dependency to 2.2.0 to bring the following improvement and bug fixes.
- https://issues.apache.org/jira/projects/KAFKA/versions/12344063

Due to [KAFKA-4453](https://issues.apache.org/jira/browse/KAFKA-4453), data plane API and controller plane API are separated. Apache Spark needs the following changes.
```scala
- servers.head.apis.metadataCache
+ servers.head.dataPlaneRequestProcessor.metadataCache
```

## How was this patch tested?

Pass the Jenkins with the existing tests.

Closes #24190 from dongjoon-hyun/SPARK-27260.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-03-24 17:39:57 -07:00
Dongjoon Hyun 4d5247778a [SPARK-27197][SQL][TEST] Add ReadNestedSchemaTest for file-based data sources
## What changes were proposed in this pull request?

The reader schema is said to be evolved (or projected) when it changes after the data has been written. Apache Spark file-based data sources have test coverage for that; e.g. [ReadSchemaSuite.scala](https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaSuite.scala). This PR aims to add test coverage for nested columns by adding and hiding nested columns.

## How was this patch tested?

Pass the Jenkins with newly added tests.

Closes #24139 from dongjoon-hyun/SPARK-27197.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
2019-03-20 00:22:05 +00:00
Zhu, Lipeng 99c427b1d3 [SPARK-27168][SQL][TEST] Add docker integration test for MsSql server
## What changes were proposed in this pull request?

This PR aims to add a JDBC integration test for MsSql server.

## How was this patch tested?

```
./build/mvn clean install -DskipTests
./build/mvn test -Pdocker-integration-tests -pl :spark-docker-integration-tests_2.12 \
-Dtest=none -DwildcardSuites=org.apache.spark.sql.jdbc.MsSqlServerIntegrationSuite
```

Closes #24099 from lipzhu/SPARK-27168.

Lead-authored-by: Zhu, Lipeng <lipzhu@ebay.com>
Co-authored-by: Dongjoon Hyun <dhyun@apple.com>
Co-authored-by: Lipeng Zhu <lipzhu@icloud.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-03-19 08:43:23 -07:00
Dongjoon Hyun 26e9849cb4 [SPARK-27195][SQL][TEST] Add AvroReadSchemaSuite
## What changes were proposed in this pull request?

The reader schema is said to be evolved (or projected) when it changes after the data has been written. Apache Spark file-based data sources have test coverage for that, [ReadSchemaSuite.scala](https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaSuite.scala). This PR aims to add `AvroReadSchemaSuite` to ensure minimal consistency among file-based data sources and prevent a future regression in the Avro data source.

## How was this patch tested?

Pass the Jenkins with the newly added test suite.

Closes #24135 from dongjoon-hyun/SPARK-27195.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-03-18 20:10:30 -07:00
Ryan Blue e348f14259 [SPARK-26811][SQL] Add capabilities to v2.Table
## What changes were proposed in this pull request?

This adds a new method, `capabilities` to `v2.Table` that returns a set of `TableCapability`. Capabilities are used to fail queries during analysis checks, `V2WriteSupportCheck`, when the table does not support operations, like truncation.

## How was this patch tested?

Existing tests for regressions, added new analysis suite, `V2WriteSupportCheckSuite`, for new capability checks.

Closes #24012 from rdblue/SPARK-26811-add-capabilities.

Authored-by: Ryan Blue <blue@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-03-18 18:25:11 +08:00
DylanGuedes 2fecc4a3fe [SPARK-27138][TESTS][KAFKA] Remove AdminUtils calls (fixes deprecation)
## What changes were proposed in this pull request?

This changes the calls to AdminUtils, currently used to create and delete topics in Kafka tests, to rely on AdminClient instead, which is the recommended way from now on.

## How was this patch tested?
I ran all unit tests and they are fine. Since this code is already well tested, I thought that the API changes wouldn't require new tests, as long as the current tests keep working.

Closes #24071 from DylanGuedes/spark-27138.

Authored-by: DylanGuedes <djmgguedes@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-03-14 09:20:30 -05:00
Wenchen Fan 2a80a4cd39 [SPARK-27106][SQL] merge CaseInsensitiveStringMap and DataSourceOptions
## What changes were proposed in this pull request?

It's a little awkward to have 2 different classes(`CaseInsensitiveStringMap` and `DataSourceOptions`) to present the options in data source and catalog API.

This PR merges these 2 classes, while keeping the name `CaseInsensitiveStringMap`, which is more precise.

## How was this patch tested?

existing tests

Closes #24025 from cloud-fan/option.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-03-14 01:23:27 +08:00
Wenchen Fan d3813d8b21 [SPARK-27064][SS] create StreamingWrite at the beginning of streaming execution
## What changes were proposed in this pull request?

According to the [design](https://docs.google.com/document/d/1vI26UEuDpVuOjWw4WPoH2T6y8WAekwtI7qoowhOFnI4/edit?usp=sharing), the life cycle of `StreamingWrite` should be the same as the read side `MicroBatch/ContinuousStream`, i.e. each run of the stream query, instead of each epoch.

This PR fixes it.

## How was this patch tested?

existing tests

Closes #23981 from cloud-fan/dsv2.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-03-13 19:47:54 +08:00
Gabor Somogyi 98a8725e66 [SPARK-27022][DSTREAMS] Add kafka delegation token support.
## What changes were proposed in this pull request?

It adds Kafka delegation token support for DStreams. Please be aware that, as a native Kafka sink is not available for DStreams, this PR contains delegation token usage only on the consumer side.

What this PR contains:
* Usage of token through dynamic JAAS configuration
* `KafkaConfigUpdater` moved to `kafka-0-10-token-provider`
* `KafkaSecurityHelper` functionality moved into `KafkaTokenUtil`
* Documentation

## How was this patch tested?

Existing unit tests + on cluster.

Long running Kafka to file tests on 4 node cluster with randomly thrown artificial exceptions.

Test scenario:

* 4 node cluster
* Yarn
* Kafka broker version 2.1.0
* security.protocol = SASL_SSL
* sasl.mechanism = SCRAM-SHA-512

Kafka broker settings:

* delegation.token.expiry.time.ms=600000 (10 min)
* delegation.token.max.lifetime.ms=1200000 (20 min)
* delegation.token.expiry.check.interval.ms=300000 (5 min)

A new delegation token is obtained from the Kafka broker every 7.5 minutes (10 min * 0.75).
When a token expires after 10 minutes (Spark obtains a new one and doesn't renew the old), the broker's expiry thread runs every 5 minutes (invalidating expired tokens) and an artificial exception is thrown inside the Spark application (in which case Spark closes the connection); the latest delegation token is then picked up correctly.

cd docs/
SKIP_API=1 jekyll build
Manual webpage check.

Closes #23929 from gaborgsomogyi/SPARK-27022.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-03-07 11:36:37 -08:00
Wenchen Fan 382d5a82b0 [SPARK-26956][SS] remove streaming output mode from data source v2 APIs
## What changes were proposed in this pull request?

Similar to `SaveMode`, we should remove streaming `OutputMode` from the data source v2 API, and use operations that have clear semantics.

The changes are:
1. append mode: create `StreamingWrite` directly. By default, the `WriteBuilder` will create `Write` to append data.
2. complete mode: call `SupportsTruncate#truncate`. Complete mode means truncating all the old data and appending new data of the current epoch. `SupportsTruncate` has exactly the same semantic.
3. update mode: fail. The current streaming framework can't propagate the update keys, so v2 sinks are not able to implement update mode. In the future we can introduce a `SupportsUpdate` trait.

The behavior changes:
1. all the v2 sinks (foreach, console, memory, kafka, noop) don't support update mode. The fact is, previously all the v2 sinks implemented update mode incorrectly; none of them could really support it.
2. the kafka sink doesn't support complete mode. The fact is, the kafka sink can only append data.

## How was this patch tested?

existing tests

Closes #23859 from cloud-fan/update.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2019-03-03 22:20:31 -08:00
liuxian 02bbe977ab [MINOR] Remove unnecessary gets when getting a value from map.
## What changes were proposed in this pull request?

Redundant `get`  when getting a value from `Map` given a key.

## How was this patch tested?

N/A

Closes #23901 from 10110346/removegetfrommap.

Authored-by: liuxian <liu.xian3@zte.com.cn>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-03-01 11:48:07 -06:00
Gabor Somogyi 76e0b6bafb [SPARK-27002][SS] Get kafka delegation tokens right before consumer/producer created
## What changes were proposed in this pull request?

Spark does not always pick up the latest Kafka delegation tokens, even if a new one was properly obtained.
In this PR I'm setting delegation tokens right before `KafkaConsumer` and `KafkaProducer` creation to be on the safe side.

## How was this patch tested?

Long running Kafka to Kafka tests on 4 node cluster with randomly thrown artificial exceptions.

Test scenario:
* 4 node cluster
* Yarn
* Kafka broker version 2.1.0
* security.protocol = SASL_SSL
* sasl.mechanism = SCRAM-SHA-512

Kafka broker settings:
* delegation.token.expiry.time.ms=600000 (10 min)
* delegation.token.max.lifetime.ms=1200000 (20 min)
* delegation.token.expiry.check.interval.ms=300000 (5 min)

A new delegation token is obtained from the Kafka broker every 7.5 minutes (10 min * 0.75).
But when a token expired after 10 minutes (Spark obtains a new one and doesn't renew the old), the broker's expiry thread runs every 5 minutes (invalidating expired tokens) and an artificial exception was thrown inside the Spark application (in which case Spark closes the connection); the latest delegation token was then not always picked up.

Closes #23906 from gaborgsomogyi/SPARK-27002.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-02-27 10:07:02 -08:00
liuxian 7912dbb88f [MINOR] Simplify boolean expression
## What changes were proposed in this pull request?

Comparing whether a Boolean expression is equal to `true` is redundant.
For example, if the datatype of `a` is boolean:
Before: `if (a == true)`
After: `if (a)`

## How was this patch tested?
N/A

Closes #23884 from 10110346/simplifyboolean.

Authored-by: liuxian <liu.xian3@zte.com.cn>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-02-27 08:38:00 -06:00
Wenchen Fan f85ed9a3e5 [SPARK-26785][SQL] data source v2 API refactor: streaming write
## What changes were proposed in this pull request?

Continue the API refactor for streaming write, according to the [doc](https://docs.google.com/document/d/1vI26UEuDpVuOjWw4WPoH2T6y8WAekwtI7qoowhOFnI4/edit?usp=sharing).

The major changes:
1. rename `StreamingWriteSupport` to `StreamingWrite`
2. add `WriteBuilder.buildForStreaming`
3. update existing sinks, to move the creation of `StreamingWrite` to `Table`

## How was this patch tested?

existing tests

Closes #23702 from cloud-fan/stream-write.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2019-02-18 16:17:24 -08:00
Gabor Somogyi 28ced387b9 [SPARK-26772][YARN] Delete ServiceCredentialProvider and make HadoopDelegationTokenProvider a developer API
## What changes were proposed in this pull request?

`HadoopDelegationTokenProvider` has basically the same functionality as `ServiceCredentialProvider`, so the interfaces can be merged.

`YARNHadoopDelegationTokenManager` now loads `ServiceCredentialProvider`s in one step. The drawback of this is that if one provider fails, all others are not loaded. `HadoopDelegationTokenManager` loads `HadoopDelegationTokenProvider`s independently, so it provides more robust behaviour.

In this PR I've made the following changes:
* Deleted `YARNHadoopDelegationTokenManager` and `ServiceCredentialProvider`
* Made `HadoopDelegationTokenProvider` a `DeveloperApi`

## How was this patch tested?

Existing unit tests.

Closes #23686 from gaborgsomogyi/SPARK-26772.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-02-15 14:43:13 -08:00
Hyukjin Kwon c406472970 [SPARK-26870][SQL] Move to_avro/from_avro into functions object due to Java compatibility
## What changes were proposed in this pull request?

Currently, to use `from_avro` and `to_avro` from the Java API side, it looks like this:

```java
import static org.apache.spark.sql.avro.package$.MODULE$;

MODULE$.to_avro
MODULE$.from_avro
```

This PR deprecates both functions in the `avro` package and moves them into a `functions` object, following the pattern of `org.apache.spark.sql.functions`.

Therefore, Java side can import:

```java
import static org.apache.spark.sql.avro.functions.*;
```

and Scala side can import:

```scala
import org.apache.spark.sql.avro.functions._
```
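
A hedged usage sketch of the relocated functions (the column name and Avro schema string are illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.functions._   // new location introduced by this PR

val spark = SparkSession.builder().appName("avro-functions-demo").getOrCreate()
import spark.implicits._

val jsonFormatSchema = """{"type": "long"}"""  // illustrative Avro schema

val df      = Seq(1L, 2L, 3L).toDF("value")
val encoded = df.select(to_avro($"value").as("avro"))
val decoded = encoded.select(from_avro($"avro", jsonFormatSchema).as("value"))
```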

## How was this patch tested?

Manually tested, and unit tests for Java APIs were added.

Closes #23784 from HyukjinKwon/SPARK-26870.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2019-02-15 10:24:35 +08:00
Maxim Gekk a829234df3 [SPARK-26817][CORE] Use System.nanoTime to measure time intervals
## What changes were proposed in this pull request?

In the PR, I propose to use `System.nanoTime()` instead of `System.currentTimeMillis()` in measurements of time intervals.

`System.currentTimeMillis()` returns current wallclock time and will follow changes to the system clock. Thus, negative wallclock adjustments can cause timeouts to "hang" for a long time (until wallclock time has caught up to its previous value again). This can happen when ntpd does a "step" after the network has been disconnected for some time. The most canonical example is during system bootup when DHCP takes longer than usual. This can lead to failures that are really hard to understand/reproduce. `System.nanoTime()` is guaranteed to be monotonically increasing irrespective of wallclock changes.
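
A minimal sketch of the measurement pattern this PR standardizes on (the measured operation is a placeholder):

```scala
// Placeholder for the operation being timed.
def doSomeWork(): Unit = Thread.sleep(10)

// System.nanoTime() is monotonic, so the elapsed time cannot go backwards even if
// the wall clock is adjusted (e.g. by ntpd) while the work is running.
val start     = System.nanoTime()
doSomeWork()
val elapsedMs = (System.nanoTime() - start) / 1000000L
```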

## How was this patch tested?

By existing test suites.

Closes #23727 from MaxGekk/system-nanotime.

Lead-authored-by: Maxim Gekk <max.gekk@gmail.com>
Co-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-02-13 13:12:16 -06:00
Gabor Somogyi d0443a74d1 [SPARK-26766][CORE] Remove the list of filesystems from HadoopDelegationTokenProvider.obtainDelegationTokens
## What changes were proposed in this pull request?

The delegation token provider interface now has a `fileSystems` parameter, but this is needed only for `HadoopFSDelegationTokenProvider`.

In this PR I've addressed this issue in the following way:
* Removed `fileSystems` parameter from `HadoopDelegationTokenProvider`
* Moved `YarnSparkHadoopUtil.hadoopFSsToAccess` into `HadoopFSDelegationTokenProvider`
* Moved `spark.yarn.stagingDir` into core
* Moved `spark.yarn.access.namenodes` into core and renamed to `spark.kerberos.access.namenodes`
* Moved `spark.yarn.access.hadoopFileSystems` into core and renamed to `spark.kerberos.access.hadoopFileSystems`

## How was this patch tested?

Existing unit tests.

Closes #23698 from gaborgsomogyi/SPARK-26766.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-02-08 13:41:52 -08:00
Gengliang Wang 308996bc72 [SPARK-26716][SPARK-26765][FOLLOWUP][SQL] Clean up schema validation methods and override toString method in Avro
## What changes were proposed in this pull request?

In #23639, the API `supportDataType` is refactored. We should also remove the methods `verifyWriteSchema` and `verifyReadSchema` in `DataSourceUtils`.

Since the error messages use `FileFormat.toString` to name the data source, this PR also overrides the `toString` method in `AvroFileFormat`.

## How was this patch tested?

Unit test.

Closes #23699 from gengliangwang/SPARK-26716-followup.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-01-31 15:44:44 +08:00
ryne.yang fbc3c5e8a3
[SPARK-26718][SS] Fixed integer overflow in SS kafka rateLimit calculation
## What changes were proposed in this pull request?

Fix the integer overflow issue in rateLimit.

## How was this patch tested?

Pass the Jenkins with a newly added UT for the case where the integer could overflow.

Closes #23666 from linehrr/master.

Authored-by: ryne.yang <ryne.yang@acuityads.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2019-01-29 10:58:10 -08:00
Gengliang Wang 1beed0d7c2 [SPARK-26765][SQL] Avro: Validate input and output schema
## What changes were proposed in this pull request?

The API `supportDataType` in `FileFormat` helps validate the output/input schema before execution starts, so that we can avoid some invalid data source IO and users can see clean error messages.

This PR is to override the validation API in Avro data source.
Also, as per the spec of Avro(https://avro.apache.org/docs/1.8.2/spec.html), `NullType` is supported. This PR fixes the handling of `NullType`.

## How was this patch tested?

Unit test

Closes #23684 from gengliangwang/avroSupportDataType.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-01-30 00:17:33 +08:00
Wenchen Fan e97ab1d980 [SPARK-26695][SQL] data source v2 API refactor - continuous read
## What changes were proposed in this pull request?

Following https://github.com/apache/spark/pull/23430, this PR does the API refactor for continuous read, w.r.t. the [doc](https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?usp=sharing)

The major changes:
1. rename `XXXContinuousReadSupport` to `XXXContinuousStream`
2. at the beginning of continuous streaming execution, convert `StreamingRelationV2` to `StreamingDataSourceV2Relation` directly, instead of `StreamingExecutionRelation`.
3. remove all the hacks as we have finished all the read side API refactor

## How was this patch tested?

existing tests

Closes #23619 from cloud-fan/continuous.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2019-01-29 00:07:27 -08:00
Gabor Somogyi 773efede20 [SPARK-26254][CORE] Extract Hive + Kafka dependencies from Core.
## What changes were proposed in this pull request?

There are ugly provided dependencies inside core for the following:
* Hive
* Kafka

In this PR I've extracted them out. This PR contains the following:
* Token providers are now loaded with service loader
* Hive token provider moved to hive project
* Kafka token provider extracted into a new project

## How was this patch tested?

Existing + newly added unit tests.
Additionally tested on cluster.

Closes #23499 from gaborgsomogyi/SPARK-26254.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-01-25 10:36:00 -08:00
Wenchen Fan 098a2c41fc [SPARK-26520][SQL] data source v2 API refactor (micro-batch read)
## What changes were proposed in this pull request?

Following https://github.com/apache/spark/pull/23086, this PR does the API refactor for micro-batch read, w.r.t. the [doc](https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?usp=sharing)

The major changes:
1. rename `XXXMicroBatchReadSupport` to `XXXMicroBatchReadStream`
2. implement `TableProvider`, `Table`, `ScanBuilder` and `Scan` for streaming sources
3. at the beginning of micro-batch streaming execution, convert `StreamingRelationV2` to `StreamingDataSourceV2Relation` directly, instead of `StreamingExecutionRelation`.

followup:
support operator pushdown for stream sources

## How was this patch tested?

existing tests

Closes #23430 from cloud-fan/micro-batch.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2019-01-21 14:29:12 -08:00
Jungtaek Lim (HeartSaVioR) 2ebb79b2a6
[SPARK-26350][FOLLOWUP] Add actual verification on new UT introduced on SPARK-26350
## What changes were proposed in this pull request?

This patch adds a check to verify that the consumer group id is set correctly when a custom group id is provided via the Kafka parameter.

## How was this patch tested?

Modified UT.

Closes #23544 from HeartSaVioR/SPARK-26350-follow-up-actual-verification-on-UT.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
2019-01-15 14:21:51 -08:00
Shixiong Zhu bafc7ac025
[SPARK-26350][SS] Allow to override group id of the Kafka consumer
## What changes were proposed in this pull request?

This PR allows the user to override `kafka.group.id` for better monitoring or security. The user needs to make sure there are not multiple queries or sources using the same group id.

It also fixes a bug that the `groupIdPrefix` option cannot be retrieved.
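
A hedged usage sketch of the new option (broker, topic and group id are illustrative; the user must ensure the group id is not shared by other queries or sources):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-group-id-demo").getOrCreate()

val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")  // illustrative broker
  .option("subscribe", "events")                      // illustrative topic
  .option("kafka.group.id", "billing-team-spark")     // overrides the generated consumer group id
  .load()
```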

## How was this patch tested?

The new added unit tests.

Closes #23301 from zsxwing/SPARK-26350.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
2019-01-14 13:37:24 -08:00
Oleksii Shkarupin 5b37092311
[SPARK-26538][SQL] Set default precision and scale for elements of postgres numeric array
## What changes were proposed in this pull request?

When determining the Catalyst type for Postgres columns with type `numeric[]`, set the type of the array elements to `DecimalType(38, 18)` instead of `DecimalType(0, 0)`.

## How was this patch tested?

Tested with modified `org.apache.spark.sql.jdbc.JDBCSuite`.
Ran the `PostgresIntegrationSuite` manually.

Closes #23456 from a-shkarupin/postgres_numeric_array.

Lead-authored-by: Oleksii Shkarupin <a.shkarupin@gmail.com>
Co-authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2019-01-12 11:06:39 -08:00
Dongjoon Hyun 5969b8a2ed
[SPARK-26541][BUILD] Add -Pdocker-integration-tests to dev/scalastyle
## What changes were proposed in this pull request?

This PR makes `scalastyle` check the `docker-integration-tests` module as well and fixes one error.

## How was this patch tested?

Pass the Jenkins with the updated Scalastyle.
```
========================================================================
Running Scala style checks
========================================================================
Scalastyle checks passed.
```

Closes #23459 from dongjoon-hyun/SPARK-26541.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2019-01-05 00:55:17 -08:00
Dongjoon Hyun e15a319ccd
[SPARK-26536][BUILD][TEST] Upgrade Mockito to 2.23.4
## What changes were proposed in this pull request?

This PR upgrades Mockito from 1.10.19 to 2.23.4. The following changes are required.

- Replace `org.mockito.Matchers` with `org.mockito.ArgumentMatchers`
- Replace `anyObject` with `any`
- Replace `getArgumentAt` with `getArgument` and add type annotation.
- Use `isNull` matcher in case of `null` is invoked.
```scala
     saslHandler.channelInactive(null);
-    verify(handler).channelInactive(any(TransportClient.class));
+    verify(handler).channelInactive(isNull());
```

- Make and use `doReturn` wrapper to avoid [SI-4775](https://issues.scala-lang.org/browse/SI-4775)
```scala
private def doReturn(value: Any) = org.mockito.Mockito.doReturn(value, Seq.empty: _*)
```
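
A hedged usage sketch of the wrapper (the mocked trait and method below are illustrative):

```scala
import org.mockito.Mockito.mock

// Wrapper from the description above, selecting the varargs overload to dodge SI-4775.
def doReturn(value: Any) = org.mockito.Mockito.doReturn(value, Seq.empty: _*)

trait Store { def size(): Int }          // illustrative trait

val store = mock(classOf[Store])
doReturn(42).when(store).size()          // stub without eagerly invoking store.size()
assert(store.size() == 42)
```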

## How was this patch tested?

Pass the Jenkins with the existing tests.

Closes #23452 from dongjoon-hyun/SPARK-26536.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2019-01-04 19:23:38 -08:00
Dongjoon Hyun ceff0c8450
[SPARK-26428][SS][TEST] Minimize deprecated ProcessingTime usage
## What changes were proposed in this pull request?

Use of `ProcessingTime` class was deprecated in favor of `Trigger.ProcessingTime` in Spark 2.2. And, [SPARK-21464](https://issues.apache.org/jira/browse/SPARK-21464) minimized it at 2.2.1. Recently, it grows again in test suites. This PR aims to clean up newly introduced deprecation warnings for Spark 3.0.
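
A small sketch of the non-deprecated form the tests are moved to (the source, sink and interval are illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("trigger-demo").getOrCreate()

val query = spark.readStream
  .format("rate")                                   // built-in testing source
  .load()
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))    // instead of the deprecated ProcessingTime(...)
  .start()
```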

## How was this patch tested?

Pass the Jenkins with existing tests and manually check the warnings.

Closes #23367 from dongjoon-hyun/SPARK-26428.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2018-12-22 00:43:59 -08:00
Shixiong Zhu 8e76d6621a
[SPARK-26267][SS] Retry when detecting incorrect offsets from Kafka
## What changes were proposed in this pull request?

Due to [KAFKA-7703](https://issues.apache.org/jira/browse/KAFKA-7703), Kafka may return an earliest offset when we request the latest offset. This will cause Spark to reprocess data.

As per suggestion in KAFKA-7703, we put a position call between poll and seekToEnd to block the fetch request triggered by `poll` before calling `seekToEnd`.

In addition, to avoid other unknown issues, we also use the previous known offsets to audit the latest offsets returned by Kafka. If we find some incorrect offsets (a latest offset is less than an offset in `knownOffsets`), we will retry at most `maxOffsetFetchAttempts` times.

## How was this patch tested?

Jenkins

Closes #23324 from zsxwing/SPARK-26267.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
2018-12-21 10:41:25 -08:00
Vaclav Kosar 81d377d772 [SPARK-24933][SS] Report numOutputRows in SinkProgress
## What changes were proposed in this pull request?

SinkProgress should report properties similar to SourceProgress, as long as they are available for a given Sink. The count of written rows is a metric available for all Sinks. Since the relevant progress information is with respect to committed rows, the ideal object to carry this info is WriterCommitMessage. For brevity, the implementation focuses only on Sinks with the V2 API and on micro-batch mode. An implementation for continuous mode will be provided at a later date.

### Before
```
{"description":"org.apache.spark.sql.kafka010.KafkaSourceProvider3c0bd317"}
```

### After
```
{"description":"org.apache.spark.sql.kafka010.KafkaSourceProvider3c0bd317","numOutputRows":5000}
```

### This PR is related to:
- https://issues.apache.org/jira/browse/SPARK-24647
- https://issues.apache.org/jira/browse/SPARK-21313

## How was this patch tested?

Existing and new unit tests.

Closes #21919 from vackosar/feature/SPARK-24933-numOutputRows.

Lead-authored-by: Vaclav Kosar <admin@vaclavkosar.com>
Co-authored-by: Kosar, Vaclav: Functions Transformation <Vaclav.Kosar@barclayscapital.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2018-12-17 11:50:24 -08:00
Gabor Somogyi 5a116e669c
[SPARK-26371][SS] Increase kafka ConfigUpdater test coverage.
## What changes were proposed in this pull request?

Since Kafka delegation token support added logic to ConfigUpdater, it would be good to test it.
This PR contains the following changes:
* ConfigUpdater extracted to a separate file and renamed to KafkaConfigUpdater
* mockito-core dependency added to kafka-0-10-sql
* Unit tests added

## How was this patch tested?

Existing + new unit tests + on cluster.

Closes #23321 from gaborgsomogyi/SPARK-26371.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2018-12-17 10:07:35 -08:00
jasonwayne 9c481c7a6b [SPARK-26360] remove redundant validateQuery call
## What changes were proposed in this pull request?
Remove a redundant `KafkaWriter.validateQuery` call in `KafkaSourceProvider`.

## How was this patch tested?
This just removes duplicate code, so I just built and ran the unit tests.

Closes #23309 from JasonWayne/SPARK-26360.

Authored-by: jasonwayne <wuwenjie0102@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2018-12-14 10:47:58 +08:00
Gabor Somogyi 6daa783094 [SPARK-26322][SS] Add spark.kafka.sasl.token.mechanism to ease delegation token configuration.
## What changes were proposed in this pull request?

When a Kafka delegation token is obtained, the SCRAM `sasl.mechanism` has to be configured for authentication. This can be configured on the related source/sink, which is inconvenient from the user's perspective. Such granularity is not required, and this configuration can be provided with one central parameter.

In this PR `spark.kafka.sasl.token.mechanism` is added to configure this centrally (default: `SCRAM-SHA-512`).

## How was this patch tested?

Existing unit tests + on cluster.

Closes #23274 from gaborgsomogyi/SPARK-26322.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2018-12-12 16:45:50 -08:00
Gabor Somogyi 9b1f6c8bab [SPARK-26304][SS] Add default value to spark.kafka.sasl.kerberos.service.name parameter
## What changes were proposed in this pull request?

spark.kafka.sasl.kerberos.service.name is an optional parameter, but most of the time the value `kafka` has to be set. As I've written in the JIRA, the reasoning is the following:
* Kafka's configuration guide suggests the same value: https://kafka.apache.org/documentation/#security_sasl_kerberos_brokerconfig
* It makes things easier for Spark users by requiring less configuration
* Other streaming engines do the same

In this PR I've changed the parameter from optional to `WithDefault` and set `kafka` as the default value.

## How was this patch tested?

Available unit tests + on cluster.

Closes #23254 from gaborgsomogyi/SPARK-26304.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2018-12-07 13:58:02 -08:00
Wenchen Fan 2b2c94a3ee [SPARK-25528][SQL] data source v2 API refactor (batch read)
## What changes were proposed in this pull request?

This is the first step of the data source v2 API refactor [proposal](https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?usp=sharing)

It adds the new API for batch read, without removing the old APIs, as they are still needed for streaming sources.

More concretely, it adds
1. `TableProvider`, works like an anonymous catalog
2. `Table`, represents a structured data set.
3. `ScanBuilder` and `Scan`, a logical representation of a data source scan
4. `Batch`, a physical representation of a data source batch scan.

## How was this patch tested?

existing tests

Closes #23086 from cloud-fan/refactor-batch.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2018-11-30 00:02:43 -08:00
Gabor Somogyi 0166c7373e [SPARK-25501][SS] Add kafka delegation token support.
## What changes were proposed in this pull request?

It adds kafka delegation token support for structured streaming. Please see the relevant [SPIP](https://docs.google.com/document/d/1ouRayzaJf_N5VQtGhVq9FURXVmRpXzEEWYHob0ne3NY/edit?usp=sharing)

What this PR contains:
* Configuration parameters for the feature
* Delegation token fetching from broker
* Usage of token through dynamic JAAS configuration
* Minor refactoring in the existing code

What this PR doesn't contain:
* Documentation changes because design can change

## How was this patch tested?

Existing tests + added small amount of additional unit tests.

Because it's an external service integration, it was mainly tested on a cluster.
* 4 node cluster
* Kafka broker version 1.1.0
* Topic with 4 partitions
* security.protocol = SASL_SSL
* sasl.mechanism = SCRAM-SHA-256

An example of obtaining a token:
```
18/10/01 01:07:49 INFO kafka010.TokenUtil: TOKENID         HMAC                           OWNER           RENEWERS                  ISSUEDATE       EXPIRYDATE      MAXDATE
18/10/01 01:07:49 INFO kafka010.TokenUtil: D1-v__Q5T_uHx55rW16Jwg [hidden] User:user    []                        2018-10-01T01:07 2018-10-02T01:07 2018-10-08T01:07
18/10/01 01:07:49 INFO security.KafkaDelegationTokenProvider: Get token from Kafka: Kind: KAFKA_DELEGATION_TOKEN, Service: kafka.server.delegation.token, Ident: 44 31 2d 76 5f 5f 51 35 54 5f 75 48 78 35 35 72 57 31 36 4a 77 67
```

An example token usage:
```
18/10/01 01:08:07 INFO kafka010.KafkaSecurityHelper: Scram JAAS params: org.apache.kafka.common.security.scram.ScramLoginModule required tokenauth=true serviceName="kafka" username="D1-v__Q5T_uHx55rW16Jwg" password="[hidden]";
18/10/01 01:08:07 INFO kafka010.KafkaSourceProvider: Delegation token detected, using it for login.
```

Closes #22598 from gaborgsomogyi/SPARK-25501.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2018-11-29 18:00:47 -08:00
Wenchen Fan fa0d4bf699 [SPARK-25829][SQL] remove duplicated map keys with last wins policy
## What changes were proposed in this pull request?

Currently duplicated map keys are not handled consistently. For example, map lookup respects the duplicated key that appears first, `Dataset.collect` only keeps the duplicated key that appears last, `MapKeys` returns duplicated keys, etc.

This PR proposes to remove duplicated map keys with last wins policy, to follow Java/Scala and Presto. It only applies to built-in functions, as users can create map with duplicated map keys via private APIs anyway.

updated functions: `CreateMap`, `MapFromArrays`, `MapFromEntries`, `StringToMap`, `MapConcat`, `TransformKeys`.
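
A minimal illustration of the last-wins policy on one of the updated built-in functions (assuming Spark SQL's `map` function; the values are illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("map-dedup-demo").getOrCreate()

// After this change the duplicated key 1 keeps the last value ('b') consistently
// across map lookup, Dataset.collect, map_keys, etc.
spark.sql("SELECT map(1, 'a', 1, 'b') AS m").show(truncate = false)
```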

For other places:
1. data source v1 doesn't have this problem, as users need to provide a java/scala map, which can't have duplicated keys.
2. data source v2 may have this problem. I've added a note to `ArrayBasedMapData` to ask the caller to take care of duplicated keys. In the future we should enforce it in the stable data APIs for data source v2.
3. UDF doesn't have this problem, as users need to provide a java/scala map. Same as data source v1.
4. file format. I checked all of them and only parquet does not enforce it. For backward compatibility reasons I change nothing but leave a note saying that the behavior will be undefined if users write map with duplicated keys to parquet files. Maybe we can add a config and fail by default if parquet files have map with duplicated keys. This can be done in followup.

## How was this patch tested?

updated tests and new tests

Closes #23124 from cloud-fan/map.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2018-11-28 23:42:13 +08:00
Anastasios Zouzias 2512a1d429 [SPARK-26121][STRUCTURED STREAMING] Allow users to define prefix of Kafka's consumer group (group.id)
## What changes were proposed in this pull request?

Allow the Spark Structured Streaming user to specify the prefix of the consumer group (group.id), instead of forcing consumer group ids of the form `spark-kafka-source-*`.
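
A hedged usage sketch (assuming the option is named `groupIdPrefix`; broker and topic are illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("group-id-prefix-demo").getOrCreate()

val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")  // illustrative broker
  .option("subscribe", "events")                      // illustrative topic
  .option("groupIdPrefix", "billing-team")            // consumer group ids start with this prefix
  .load()
```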

## How was this patch tested?

Unit tests provided by Spark (backwards compatible change, i.e., user can optionally use the functionality)

`mvn test -pl external/kafka-0-10`

Closes #23103 from zouzias/SPARK-26121.

Authored-by: Anastasios Zouzias <anastasios@sqooba.io>
Signed-off-by: cody koeninger <cody@koeninger.org>
2018-11-26 11:10:38 -06:00
Dongjoon Hyun 1d3dd58d21
[SPARK-25954][SS][FOLLOWUP][TEST-MAVEN] Add Zookeeper 3.4.7 test dependency to Kafka modules
## What changes were proposed in this pull request?

This is a followup of #23099 . After upgrading to Kafka 2.1.0, maven test fails due to Zookeeper test dependency while sbt test succeeds.

- [sbt test on master branch](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-2.7/5203/)
- [maven test on master branch](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-2.7/5653/)

The root cause is that the embedded Kafka server is using the [Zookeeper 3.4.7 API](https://zookeeper.apache.org/doc/r3.4.7/api/org/apache/zookeeper/AsyncCallback.MultiCallback.html) while Apache Spark provides Zookeeper 3.4.6. This PR adds a test dependency.

```
KafkaMicroBatchV2SourceSuite:
*** RUN ABORTED ***
...
org.apache.spark.sql.kafka010.KafkaTestUtils.setupEmbeddedKafkaServer(KafkaTestUtils.scala:123)
  ...
  Cause: java.lang.ClassNotFoundException: org.apache.zookeeper.AsyncCallback$MultiCallback
  at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1693)
  at kafka.server.KafkaServer.createZkClient$1(KafkaServer.scala:348)
  at kafka.server.KafkaServer.initZkClient(KafkaServer.scala:372)
  at kafka.server.KafkaServer.startup(KafkaServer.scala:202)
  at org.apache.spark.sql.kafka010.KafkaTestUtils.$anonfun$setupEmbeddedKafkaServer$2(KafkaTestUtils.scala:120)
  at org.apache.spark.sql.kafka010.KafkaTestUtils.$anonfun$setupEmbeddedKafkaServer$2$adapted(KafkaTestUtils.scala:116)
  ...
```

## How was this patch tested?

Pass the maven Jenkins test.

Closes #23119 from dongjoon-hyun/SPARK-25954-2.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2018-11-22 22:45:08 -08:00
Dongjoon Hyun a480a62563
[SPARK-25954][SS] Upgrade to Kafka 2.1.0
## What changes were proposed in this pull request?

[Kafka 2.1.0 vote](https://lists.apache.org/thread.html/9f487094491e512b556a1c9c3c6034ac642b088e3f797e3d192ebc9d%3Cdev.kafka.apache.org%3E) passed. Since Kafka 2.1.0 includes official JDK 11 support [KAFKA-7264](https://issues.apache.org/jira/browse/KAFKA-7264), we had better use that.

## How was this patch tested?

Pass the Jenkins.

Closes #23099 from dongjoon-hyun/SPARK-25954.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2018-11-21 00:24:34 -08:00
Sean Owen 32365f8177 [SPARK-26090][CORE][SQL][ML] Resolve most miscellaneous deprecation and build warnings for Spark 3
## What changes were proposed in this pull request?

The build has a lot of deprecation warnings. Some are new in Scala 2.12 and Java 11. We've fixed some, but I wanted to take a pass at fixing lots of easy miscellaneous ones here.

They're too numerous and small to list here; see the pull request. Some highlights:

- `BeanInfo` is deprecated in 2.12, and BeanInfo classes are pretty ancient in Java. Instead, case classes can explicitly declare getters (a sketch of these fixes appears after this list)
- Eta expansion of zero-arg methods; foo() becomes () => foo() in many cases
- Floating-point Range is inexact and deprecated, like 0.0 to 100.0 by 1.0
- finalize() is finally deprecated (just needs to be suppressed)
- StageInfo.attempId was deprecated and easiest to remove here
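
A minimal sketch of the first few kinds of fixes above; the names are illustrative only:

```scala
// Instead of the deprecated @BeanInfo annotation, declare bean-style getters explicitly:
case class Person(name: String, age: Int) {
  def getName: String = name
  def getAge: Int = age
}

// Eta expansion of a zero-arg method made explicit:
def compute(): Int = 42
val callback: () => Int = () => compute()

// Floating-point Range (0.0 to 100.0 by 1.0) is deprecated; derive it from an integer range:
val steps: Seq[Double] = (0 to 100).map(_.toDouble)
```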

I'm not now going to touch some chunks of deprecation warnings:

- Parquet deprecations
- Hive deprecations (particularly serde2 classes)
- Deprecations in generated code (mostly Thriftserver CLI)
- ProcessingTime deprecations (we may need to revive this class as internal)
- many MLlib deprecations because they concern methods that may be removed anyway
- a few Kinesis deprecations I couldn't figure out
- Mesos get/setRole, which I don't know well
- Kafka/ZK deprecations (e.g. poll())
- Kinesis
- a few other ones that will probably resolve by deleting a deprecated method

## How was this patch tested?

Existing tests, including manual testing with the 2.11 build and Java 11.

Closes #23065 from srowen/SPARK-26090.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2018-11-19 09:16:42 -06:00
Sean Owen 630e25e355 [SPARK-26026][BUILD] Published Scaladoc jars missing from Maven Central
## What changes were proposed in this pull request?

This restores scaladoc artifact generation, which got dropped with the Scala 2.12 update. The change looks large, but is almost all due to needing to make the InterfaceStability annotations top-level classes (i.e. `InterfaceStability.Stable` -> `Stable`), unfortunately. A few inner class references had to be qualified too.

Lots of scaladoc warnings now reappear. We can choose to disable generation by default and enable for releases, later.

## How was this patch tested?

N/A; build runs scaladoc now.

Closes #23069 from srowen/SPARK-26026.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2018-11-19 08:06:33 -06:00
DB Tsai ad853c5678
[SPARK-25956] Make Scala 2.12 as default Scala version in Spark 3.0
## What changes were proposed in this pull request?

This PR makes Scala 2.12 Spark's default Scala version, with Scala 2.11 as the alternative version. This implies that Scala 2.12 will be used by our CI builds including pull request builds.

We'll update Jenkins to include a new compile-only job for Scala 2.11 to ensure the code can still be compiled with Scala 2.11.

## How was this patch tested?

existing tests

Closes #22967 from dbtsai/scala2.12.

Authored-by: DB Tsai <d_tsai@apple.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2018-11-14 16:22:23 -08:00
Yuanjian Li 2977e2312d [SPARK-25986][BUILD] Add rules to ban throw Errors in application code
## What changes were proposed in this pull request?

Add Scala and Java lint check rules to ban the usage of `throw new xxxError` and fix up all existing instances, following https://github.com/apache/spark/pull/22989#issuecomment-437939830. See more details in https://github.com/apache/spark/pull/22969.
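
A minimal sketch of the kind of rewrite the new rule asks for; the method is hypothetical:

```scala
// Banned by the new lint rule: throwing java.lang.Error subclasses from application code
//   def fail(msg: String): Nothing = throw new InternalError(msg)

// Preferred: throw an exception instead
def fail(msg: String): Nothing = throw new IllegalStateException(msg)
```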

## How was this patch tested?

Local test with lint-scala and lint-java.

Closes #22989 from xuanyuanking/SPARK-25986.

Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2018-11-14 13:05:18 -08:00
Gengliang Wang 922dfe4865
[SPARK-25965][SQL][TEST] Add avro read benchmark
Add a read benchmark for Avro, which has been missing for a while.
The benchmark is similar to `DataSourceReadBenchmark` and `OrcReadBenchmark`.

Manually run the benchmark.

Closes #22966 from gengliangwang/avroReadBenchmark.

Lead-authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Co-authored-by: Gengliang Wang <ltnwgl@gmail.com>
Co-authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2018-11-14 11:26:26 -08:00
Sean Owen 2d085c13b7 [SPARK-25984][CORE][SQL][STREAMING] Remove deprecated .newInstance(), primitive box class constructor calls
## What changes were proposed in this pull request?

Deprecated in Java 11: replace `Class.newInstance` with `Class.getConstructor().newInstance()`, and primitive wrapper class constructors with `valueOf` or equivalent.

## How was this patch tested?

Existing tests.

Closes #22988 from srowen/SPARK-25984.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2018-11-10 09:52:14 -06:00
Gengliang Wang 57eddc7182 [SPARK-25886][SQL][MINOR] Improve error message of FailureSafeParser and from_avro in FAILFAST mode
## What changes were proposed in this pull request?

Currently in `FailureSafeParser` and `from_avro`, the exception is created with such code
```
throw new SparkException("Malformed records are detected in record parsing. " +
s"Parse Mode: ${FailFastMode.name}.", e.cause)
```

1. The cause part should be `e` instead of `e.cause`
2. If `e` contains non-null message, it should be shown in `from_json`/`from_csv`/`from_avro`, e.g.
```
com.fasterxml.jackson.core.JsonParseException: Unexpected character ('1' (code 49)): was expecting a colon to separate field name and value
at [Source: (InputStreamReader); line: 1, column: 7]
```
3. Kindly show a hint for trying PERMISSIVE mode in the error message.

## How was this patch tested?
Unit test.

Closes #22895 from gengliangwang/improve_error_msg.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2018-10-31 20:22:57 +08:00
yucai f8484e49ef
[SPARK-25663][SPARK-25661][SQL][TEST] Refactor BuiltInDataSourceWriteBenchmark, DataSourceWriteBenchmark and AvroWriteBenchmark to use main method
## What changes were proposed in this pull request?

Refactor BuiltInDataSourceWriteBenchmark, DataSourceWriteBenchmark and AvroWriteBenchmark to use main method.

```
SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain org.apache.spark.sql.execution.benchmark.BuiltInDataSourceWriteBenchmark"

SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "avro/test:runMain org.apache.spark.sql.execution.benchmark.AvroWriteBenchmark"
```
## How was this patch tested?

manual tests

Closes #22861 from yucai/BuiltInDataSourceWriteBenchmark.

Lead-authored-by: yucai <yyu1@ebay.com>
Co-authored-by: Yucai Yu <yucai.yu@foxmail.com>
Co-authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2018-10-31 03:03:42 -07:00
Dilip Biswal 34c3bc9f1e
[SPARK-25618][SQL][TEST] Reduce time taken to execute KafkaContinuousSourceStressForDontFailOnDataLossSuite
## What changes were proposed in this pull request?
In this test, I have reduced the test time to 20 secs from 1 minute while reducing the sleep time from 1 sec to 100 milliseconds.

With this change, I was able to run the test in 20+ seconds consistently on my laptop. I would like to see if it passes in Jenkins consistently.

## How was this patch tested?
Its a test fix.

Closes #22900 from dilipbiswal/SPARK-25618.

Authored-by: Dilip Biswal <dbiswal@us.ibm.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2018-10-31 02:57:39 -07:00
Gengliang Wang 0ad93b0931 [SPARK-25883][SQL][MINOR] Override method prettyName in from_avro/to_avro
## What changes were proposed in this pull request?

Previously in from_avro/to_avro, we override the methods `simpleString` and `sql` for the string output. However, the override only affects the alias naming:
```
Project [from_avro('col,
...
, (mode,PERMISSIVE)) AS from_avro(col, struct<col1:bigint,col2:double>, Map(mode -> PERMISSIVE))#11]
```
It only makes the alias name quite long: `from_avro(col, struct<col1:bigint,col2:double>, Map(mode -> PERMISSIVE))`.

We should follow `from_csv`/`from_json` here, to override the method prettyName only, and we will get a clean alias name

```
... AS from_avro(col)#11
```
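
A self-contained sketch of the pattern (these are not Spark's actual expression classes): derived strings pick up `prettyName`, so overriding only that method yields the short alias.

```scala
trait NamedExpr {
  def prettyName: String = getClass.getSimpleName.toLowerCase
  def sql(args: String*): String = s"$prettyName(${args.mkString(", ")})"
}

case class FromAvro() extends NamedExpr {
  override def prettyName: String = "from_avro"   // only prettyName is overridden
}

// FromAvro().sql("col") returns "from_avro(col)", matching the short alias shown above
```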

## How was this patch tested?

Manual check

Closes #22890 from gengliangwang/revise_from_to_avro.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2018-10-30 23:59:37 -07:00
Gengliang Wang 24e8c27dfe [SPARK-25819][SQL] Support parse mode option for the function from_avro
## What changes were proposed in this pull request?

Currently the function `from_avro` throws an exception on reading corrupt records.
In practice, there could be various reasons for data corruption. It would be good to support `PERMISSIVE` mode and allow the function from_avro to process all the input files/streams, which is consistent with from_json and from_csv. There is no obvious downside to supporting `PERMISSIVE` mode.
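
A sketch of passing the mode, assuming the options-map overload of `from_avro` described here; `df` (with a binary `value` column) and `jsonFormatSchema` are placeholders:

```scala
import org.apache.spark.sql.avro.from_avro
import scala.collection.JavaConverters._

// df and jsonFormatSchema are assumed to exist
val options = Map("mode" -> "PERMISSIVE").asJava
val parsed = df.select(from_avro(df("value"), jsonFormatSchema, options).as("record"))
```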

Different from `from_csv` and `from_json`, the default parse mode is `FAILFAST` for the following reasons:
1. Since Avro is a structured data format, input data can usually be parsed with a certain schema. In such a case, exposing the problems of input data to users is better than hiding them.
2. For `PERMISSIVE` mode, we have to force the data schema to be fully nullable. This seems quite unnecessary for Avro. Preserving the non-null schema might achieve more perf optimizations in Spark.
3. To be consistent with the behavior in Spark 2.4.

## How was this patch tested?

Unit test

Manual previewing generated html for the Avro data source doc:

![image](https://user-images.githubusercontent.com/1097932/47510100-02558880-d8aa-11e8-9d57-a43daee4c6b9.png)

Closes #22814 from gengliangwang/improve_from_avro.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2018-10-26 11:39:38 +08:00
Sean Owen f83fedc9f2 [SPARK-25737][CORE] Remove JavaSparkContextVarargsWorkaround
## What changes were proposed in this pull request?

Remove JavaSparkContextVarargsWorkaround

## How was this patch tested?

Existing tests.

Closes #22729 from srowen/SPARK-25737.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2018-10-24 14:43:51 -05:00
Sean Owen 734c6af0dd [SPARK-24601][FOLLOWUP] Update Jackson to 2.9.6 in Kinesis
## What changes were proposed in this pull request?

Also update Kinesis SDK's Jackson to match Spark's

## How was this patch tested?

Existing tests, including Kinesis ones, which ought to be hereby triggered.
This was uncovered, I believe, in https://github.com/apache/spark/pull/22729#issuecomment-430666080

Closes #22757 from srowen/SPARK-24601.2.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2018-10-18 07:00:00 -05:00
Takeshi Yamamuro a9f685bb70 [SPARK-25734][SQL] Literal should have a value corresponding to dataType
## What changes were proposed in this pull request?
`Literal.value` should have a value corresponding to `dataType`. This PR adds code to verify it and fixes the existing tests to do so.
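
A sketch of the invariant; whether the mismatched case is rejected is an assumption based on the description above:

```scala
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.types._

val ok = Literal(1L, LongType)   // the value's type matches dataType
// Literal(1, LongType)          // Int value with LongType: assumed to fail the added verification
```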

## How was this patch tested?
Modified the existing tests.

Closes #22724 from maropu/SPARK-25734.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2018-10-17 11:02:39 +08:00
Dilip Biswal 9d4dd7992b [SPARK-25631][SPARK-25632][SQL][TEST] Improve the test runtime of KafkaRDDSuite
## What changes were proposed in this pull request?
Set a reasonable poll timeout that is used while consuming topics/partitions from Kafka. In its
absence, a default of 2 minutes is used as the timeout value, and all the negative tests take a minimum of 2 minutes to execute.

After this change, we save about 4 minutes in this suite.

## How was this patch tested?
Test fix.

Closes #22670 from dilipbiswal/SPARK-25631.

Authored-by: Dilip Biswal <dbiswal@us.ibm.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2018-10-16 17:49:40 -05:00
Sean Owen 703e6da1ec [SPARK-25705][BUILD][STREAMING][TEST-MAVEN] Remove Kafka 0.8 integration
## What changes were proposed in this pull request?

Remove Kafka 0.8 integration

## How was this patch tested?

Existing tests, build scripts

Closes #22703 from srowen/SPARK-25705.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2018-10-16 09:10:24 -05:00
Gengliang Wang 2eaf058788 [SPARK-25718][SQL] Detect recursive reference in Avro schema and throw exception
## What changes were proposed in this pull request?

Avro schema allows recursive reference, e.g. the schema for linked-list in https://avro.apache.org/docs/1.8.2/spec.html#schema_record
```
{
  "type": "record",
  "name": "LongList",
  "aliases": ["LinkedLongs"],                      // old name for this
  "fields" : [
    {"name": "value", "type": "long"},             // each element has a long
    {"name": "next", "type": ["null", "LongList"]} // optional next element
  ]
}
```

In current Spark SQL, it is impossible to convert such a schema to a `StructType`. Running `SchemaConverters.toSqlType(avroSchema)` results in a stack overflow exception.

We should detect the recursive reference and throw an exception for it.
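
A sketch of triggering the conversion on the recursive schema above; the exception type thrown after this change is not shown here:

```scala
import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters

val recursive = new Schema.Parser().parse(
  """{"type":"record","name":"LongList","fields":[
    |  {"name":"value","type":"long"},
    |  {"name":"next","type":["null","LongList"]}
    |]}""".stripMargin)

// Previously: StackOverflowError; after this change: an exception reporting the recursive reference
SchemaConverters.toSqlType(recursive)
```
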
## How was this patch tested?

New unit test case.

Closes #22709 from gengliangwang/avroRecursiveRef.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2018-10-13 14:49:38 +08:00
Sean Owen a001814189 [SPARK-25598][STREAMING][BUILD][TEST-MAVEN] Remove flume connector in Spark 3
## What changes were proposed in this pull request?

Removes all vestiges of Flume in the build, for Spark 3.
I don't think this needs Jenkins config changes.

## How was this patch tested?

Existing tests.

Closes #22692 from srowen/SPARK-25598.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2018-10-11 14:28:06 -07:00
Dilip Biswal adf648b5be [SPARK-25615][SQL][TEST] Improve the test runtime of KafkaSinkSuite: streaming write to non-existing topic
## What changes were proposed in this pull request?
Set `kafka.max.block.ms` to 10 seconds while creating the Kafka writer. In the absence of this override, a default timeout of 60 seconds is used.

With this change the test completes in close to 10 seconds as opposed to 1 minute.
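
A sketch of the override in a streaming write; the broker, topic, and checkpoint values are placeholders:

```scala
// df is assumed to exist and have a value column
df.selectExpr("CAST(value AS STRING) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")   // placeholder
  .option("topic", "non-existing-topic")               // placeholder
  .option("kafka.max.block.ms", "10000")               // fail after ~10s instead of the 60s default
  .option("checkpointLocation", "/tmp/checkpoint")     // placeholder
  .start()
```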

## How was this patch tested?
This is a test fix.

Closes #22671 from dilipbiswal/SPARK-25615.

Authored-by: Dilip Biswal <dbiswal@us.ibm.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2018-10-11 14:10:07 -07:00
Dongjoon Hyun 9cbf105ab1
[SPARK-25644][SS][FOLLOWUP][BUILD] Fix Scala 2.12 build error due to foreachBatch
## What changes were proposed in this pull request?

This PR fixes the Scala-2.12 build error due to ambiguity in `foreachBatch` test cases.
- https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-2.7-ubuntu-scala-2.12/428/console
```scala
[error] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7-ubuntu-scala-2.12/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala:102: ambiguous reference to overloaded definition,
[error] both method foreachBatch in class DataStreamWriter of type (function: org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[Int],Long])org.apache.spark.sql.streaming.DataStreamWriter[Int]
[error] and  method foreachBatch in class DataStreamWriter of type (function: (org.apache.spark.sql.Dataset[Int], Long) => Unit)org.apache.spark.sql.streaming.DataStreamWriter[Int]
[error] match argument types ((org.apache.spark.sql.Dataset[Int], Any) => Unit)
[error]       ds.writeStream.foreachBatch((_, _) => {}).trigger(Trigger.Continuous("1 second")).start()
[error]                      ^
[error] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7-ubuntu-scala-2.12/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala:106: ambiguous reference to overloaded definition,
[error] both method foreachBatch in class DataStreamWriter of type (function: org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[Int],Long])org.apache.spark.sql.streaming.DataStreamWriter[Int]
[error] and  method foreachBatch in class DataStreamWriter of type (function: (org.apache.spark.sql.Dataset[Int], Long) => Unit)org.apache.spark.sql.streaming.DataStreamWriter[Int]
[error] match argument types ((org.apache.spark.sql.Dataset[Int], Any) => Unit)
[error]       ds.writeStream.foreachBatch((_, _) => {}).partitionBy("value").start()
[error]                      ^
```
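
One way to remove the ambiguity is to bind the lambda to a value with an explicit Scala function type, so SAM conversion to `VoidFunction2` no longer applies; this is a sketch, not necessarily the exact fix used in this PR:

```scala
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.Trigger

// ds: Dataset[Int] is assumed to exist
val handler: (Dataset[Int], Long) => Unit = (batch, batchId) => { /* process batch */ }
ds.writeStream.foreachBatch(handler).trigger(Trigger.Continuous("1 second")).start()
```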

## How was this patch tested?

Manual.

Since this failure occurs in Scala-2.12 profile and test cases, Jenkins will not test this. We need to build with Scala-2.12 and run the tests.

Closes #22649 from dongjoon-hyun/SPARK-SCALA212.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2018-10-06 09:40:42 -07:00
gatorsmile 7ef65c0537 [HOT-FIX] Fix compilation errors. 2018-10-06 08:50:50 -07:00
gatorsmile 44cf800c83 [SPARK-25655][BUILD] Add -Pspark-ganglia-lgpl to the scala style check.
## What changes were proposed in this pull request?
Our lint failed due to the following errors:
```
[INFO] --- scalastyle-maven-plugin:1.0.0:check (default)  spark-ganglia-lgpl_2.11 ---
error file=/home/jenkins/workspace/spark-master-maven-snapshots/spark/external/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala message=
      Are you sure that you want to use toUpperCase or toLowerCase without the root locale? In most cases, you
      should use toUpperCase(Locale.ROOT) or toLowerCase(Locale.ROOT) instead.
      If you must use toUpperCase or toLowerCase without the root locale, wrap the code block with
      // scalastyle:off caselocale
      .toUpperCase
      .toLowerCase
      // scalastyle:on caselocale
     line=67 column=49
error file=/home/jenkins/workspace/spark-master-maven-snapshots/spark/external/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala message=
      Are you sure that you want to use toUpperCase or toLowerCase without the root locale? In most cases, you
      should use toUpperCase(Locale.ROOT) or toLowerCase(Locale.ROOT) instead.
      If you must use toUpperCase or toLowerCase without the root locale, wrap the code block with
      // scalastyle:off caselocale
      .toUpperCase
      .toLowerCase
      // scalastyle:on caselocale
     line=71 column=32
Saving to outputFile=/home/jenkins/workspace/spark-master-maven-snapshots/spark/external/spark-ganglia-lgpl/target/scalastyle-output.xml
```

See https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Compile/job/spark-master-lint/8890/

## How was this patch tested?
N/A

Closes #22647 from gatorsmile/fixLint.

Authored-by: gatorsmile <gatorsmile@gmail.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2018-10-06 14:25:48 +08:00
Gengliang Wang 928d0739c4 [SPARK-25595] Ignore corrupt Avro files if flag IGNORE_CORRUPT_FILES enabled
## What changes were proposed in this pull request?

With the flag `IGNORE_CORRUPT_FILES` enabled, schema inference should ignore corrupt Avro files, which is consistent with the Parquet and ORC data sources.
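
A sketch, assuming `IGNORE_CORRUPT_FILES` is the SQL config `spark.sql.files.ignoreCorruptFiles` and using a placeholder path:

```scala
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

// Schema inference and reading now skip unreadable Avro files instead of failing
val df = spark.read.format("avro").load("/data/events-avro")   // placeholder path
```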

## How was this patch tested?

Unit test

Closes #22611 from gengliangwang/ignoreCorruptAvro.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2018-10-03 17:08:55 +08:00
gatorsmile 9bf397c0e4 [SPARK-25592] Setting version to 3.0.0-SNAPSHOT
## What changes were proposed in this pull request?

This patch is to bump the master branch version to 3.0.0-SNAPSHOT.

## How was this patch tested?
N/A

Closes #22606 from gatorsmile/bump3.0.

Authored-by: gatorsmile <gatorsmile@gmail.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2018-10-02 08:48:24 -07:00
seancxmao 21f0b73dbc [SPARK-25453][SQL][TEST][.FFFFFFFFF] OracleIntegrationSuite IllegalArgumentException: Timestamp format must be yyyy-mm-dd hh:mm:ss
## What changes were proposed in this pull request?
This PR aims to fix the failed test of `OracleIntegrationSuite`.

## How was this patch tested?
Existing integration tests.

Closes #22461 from seancxmao/SPARK-25453.

Authored-by: seancxmao <seancxmao@gmail.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2018-09-30 22:49:14 -07:00
Shixiong Zhu 66d29870c0
[SPARK-25495][SS] FetchedData.reset should reset all fields
## What changes were proposed in this pull request?

`FetchedData.reset` should reset `_nextOffsetInFetchedData` and `_offsetAfterPoll`. Otherwise it will cause inconsistent cached data and may make the Kafka connector return wrong results.

## How was this patch tested?

The new unit test.

Closes #22507 from zsxwing/fix-kafka-reset.

Lead-authored-by: Shixiong Zhu <zsxwing@gmail.com>
Co-authored-by: Shixiong Zhu <shixiong@databricks.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
2018-09-25 11:42:27 -07:00
Gengliang Wang 950ab79957 [SPARK-24777][SQL] Add write benchmark for AVRO
## What changes were proposed in this pull request?

Refactor `DataSourceWriteBenchmark` and add write benchmark for AVRO.

## How was this patch tested?

Build and run the benchmark.

Closes #22451 from gengliangwang/avroWriteBenchmark.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2018-09-20 17:41:24 -07:00
Wenchen Fan a71f6a1750 [SPARK-25414][SS][TEST] make it clear that the numRows metrics should be counted for each scan of the source
## What changes were proposed in this pull request?

For self-join/self-union, Spark will produce a physical plan which has multiple `DataSourceV2ScanExec` instances referring to the same `ReadSupport` instance. In this case, the streaming source is indeed scanned multiple times, and the `numInputRows` metrics should be counted for each scan.

Actually we already have 2 test cases to verify the behavior:
1. `StreamingQuerySuite.input row calculation with same V2 source used twice in self-join`
2. `KafkaMicroBatchSourceSuiteBase.ensure stream-stream self-join generates only one offset in log and correct metrics`.

However, in these 2 tests, the expected result is different, which is super confusing. It turns out that, the first test doesn't trigger exchange reuse, so the source is scanned twice. The second test triggers exchange reuse, and the source is scanned only once.

This PR proposes to improve these 2 tests, to test with/without exchange reuse.

## How was this patch tested?

test only change

Closes #22402 from cloud-fan/bug.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2018-09-20 00:29:48 +08:00
gatorsmile bb2f069cf2 [SPARK-25436] Bump master branch version to 2.5.0-SNAPSHOT
## What changes were proposed in this pull request?
In the dev list, we can still discuss whether the next version is 2.5.0 or 3.0.0. Let us first bump the master branch version to `2.5.0-SNAPSHOT`.

## How was this patch tested?
N/A

Closes #22426 from gatorsmile/bumpVersionMaster.

Authored-by: gatorsmile <gatorsmile@gmail.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2018-09-15 16:24:02 -07:00
Kazuaki Ishizaki f60cd7cc3c
[SPARK-25338][TEST] Ensure to call super.beforeAll() and super.afterAll() in test cases
## What changes were proposed in this pull request?

This PR ensures that `super.afterAll()` is called in overridden `afterAll()` methods in test suites.

* Some suites did not call `super.afterAll()`
* Some suites may call `super.afterAll()` only under certain condition
* Others never call `super.afterAll()`.

This PR also ensures that `super.beforeAll()` is called in overridden `beforeAll()` methods in test suites.
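
A minimal sketch of the pattern in a ScalaTest suite:

```scala
import org.scalatest.{BeforeAndAfterAll, FunSuite}

class ExampleSuite extends FunSuite with BeforeAndAfterAll {
  override def beforeAll(): Unit = {
    super.beforeAll()        // always call the parent first
    // suite-specific setup
  }

  override def afterAll(): Unit = {
    try {
      // suite-specific teardown
    } finally {
      super.afterAll()       // guaranteed even if teardown fails
    }
  }
}
```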

## How was this patch tested?

Existing UTs

Closes #22337 from kiszk/SPARK-25338.

Authored-by: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2018-09-13 11:34:22 -07:00
Lee Dongjin 458f5011bd [MINOR][SS] Fix kafka-0-10-sql trivials
## What changes were proposed in this pull request?

Fix unused imports & outdated comments on `kafka-0-10-sql` module. (Found while I was working on [SPARK-23539](https://github.com/apache/spark/pull/22282))

## How was this patch tested?

Existing unit tests.

Closes #22342 from dongjinleekr/feature/fix-kafka-sql-trivials.

Authored-by: Lee Dongjin <dongjin@apache.org>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2018-09-07 10:36:15 -07:00
Shixiong Zhu 2119e518d3 [SPARK-25336][SS]Revert SPARK-24863 and SPARK-24748
## What changes were proposed in this pull request?

Revert SPARK-24863 (#21819) and SPARK-24748 (#21721) as per discussion in #21721. We will revisit them when the data source v2 APIs are out.

## How was this patch tested?

Jenkins

Closes #22334 from zsxwing/revert-SPARK-24863-SPARK-24748.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2018-09-05 13:39:34 +08:00
Shixiong Zhu aa70a0a1a4
[SPARK-25288][TESTS] Fix flaky Kafka transaction tests
## What changes were proposed in this pull request?

Here are the failures:

http://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.kafka010.KafkaRelationSuite&test_name=read+Kafka+transactional+messages%3A+read_committed
http://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.kafka010.KafkaMicroBatchV1SourceSuite&test_name=read+Kafka+transactional+messages%3A+read_committed
http://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.kafka010.KafkaMicroBatchV2SourceSuite&test_name=read+Kafka+transactional+messages%3A+read_committed

I found the Kafka consumer may not see the committed messages for a short time. This PR just adds a new method `waitUntilOffsetAppears` and uses it to make sure the consumer can see a specified offset before checking the result.

## How was this patch tested?

Jenkins

Closes #22293 from zsxwing/SPARK-25288.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
2018-08-30 23:23:11 -07:00
Reza Safi 135ff16a35 [SPARK-25233][STREAMING] Give the user the option of specifying a minimum message per partition per batch when using kafka direct API with backpressure
After SPARK-18371, it is guaranteed that there would be at least one message per partition per batch when using the direct Kafka API and new messages exist in the topics. This change gives the user the option of setting the minimum instead of just a hard-coded limit of 1.
The related unit test is updated, and some internal tests verified that the topic partitions with new messages will progress by at least the specified minimum.
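
A sketch, assuming the new minimum mirrors the existing `spark.streaming.kafka.maxRatePerPartition` setting; the property name for the minimum is an assumption:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")   // existing backpressure cap
  .set("spark.streaming.kafka.minRatePerPartition", "10")     // assumed name of the new minimum
```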

Author: Reza Safi <rezasafi@cloudera.com>

Closes #22223 from rezasafi/streaminglag.
2018-08-30 13:26:03 -05:00
Arun Mahadevan 68ec207a32 [SPARK-25260][SQL] Fix namespace handling in SchemaConverters.toAvroType
## What changes were proposed in this pull request?

`toAvroType` converts a Spark data type to an Avro schema. It always appends the record name to the namespace, so it's impossible to have an Avro namespace independent of the record name.

When invoked with a Spark data type like:

```scala
val sparkSchema = StructType(Seq(
    StructField("name", StringType, nullable = false),
    StructField("address", StructType(Seq(
        StructField("city", StringType, nullable = false),
        StructField("state", StringType, nullable = false))),
    nullable = false)))

// map it to an avro schema with record name "employee" and top level namespace "foo.bar",
val avroSchema = SchemaConverters.toAvroType(sparkSchema,  false, "employee", "foo.bar")

// result is
// avroSchema.getName = employee
// avroSchema.getNamespace = foo.bar.employee
// avroSchema.getFullname = foo.bar.employee.employee
```
The patch proposes to fix this so that the result is

```
avroSchema.getName = employee
avroSchema.getNamespace = foo.bar
avroSchema.getFullname = foo.bar.employee
```
## How was this patch tested?

New and existing unit tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Closes #22251 from arunmahadevan/avro-fix.

Authored-by: Arun Mahadevan <arunm@apache.org>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2018-08-29 09:25:49 +08:00
Shixiong Zhu 1149c4efbc
[SPARK-25005][SS] Support non-consecutive offsets for Kafka
## What changes were proposed in this pull request?

As the user uses Kafka transactions to write data, the offsets in Kafka will be non-consecutive. They will contain some transaction (commit or abort) markers. In addition, if the consumer's `isolation.level` is `read_committed`, `poll` will not return aborted messages either. Hence, we will see non-consecutive offsets in the data returned by `poll`. However, as `seekToEnd` may move the offset to point at these missing offsets, there are 4 possible corner cases we need to support:

- The whole batch contains no data messages
- The first offset in a batch is not a committed data message
- The last offset in a batch is not a committed data message
- There is a gap in the middle of a batch

They are all covered by the new unit tests.
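
A sketch of a source configured for transactional reads; the broker and topic are placeholders, and the consumer property is passed through with the `kafka.` prefix:

```scala
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")    // placeholder
  .option("subscribe", "transactional-topic")           // placeholder
  .option("kafka.isolation.level", "read_committed")    // only committed messages are returned
  .option("failOnDataLoss", "false")
  .load()
```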

## How was this patch tested?

The new unit tests.

Closes #22042 from zsxwing/kafka-transaction-read.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
2018-08-28 08:38:07 -07:00
Jose Torres 810d59ce44
[SPARK-24882][FOLLOWUP] Fix flaky synchronization in Kafka tests.
## What changes were proposed in this pull request?

Fix flaky synchronization in Kafka tests - we need to use the scan config that was persisted rather than reconstructing it to identify the stream's current configuration.

We caught most instances of this in the original PR, but this one slipped through.

## How was this patch tested?

n/a

Closes #22245 from jose-torres/fixflake.

Authored-by: Jose Torres <torres.joseph.f+github@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
2018-08-27 11:04:39 -07:00
Shixiong Zhu c17a8ff523
[SPARK-25214][SS][FOLLOWUP] Fix the issue that Kafka v2 source may return duplicated records when failOnDataLoss=false
## What changes were proposed in this pull request?

This is a follow up PR for #22207 to fix a potential flaky test. `processAllAvailable` doesn't work for continuous processing so we should not use it for a continuous query.

## How was this patch tested?

Jenkins.

Closes #22230 from zsxwing/SPARK-25214-2.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
2018-08-25 09:17:40 -07:00
Shixiong Zhu 8bb9414aaf
[SPARK-25214][SS] Fix the issue that Kafka v2 source may return duplicated records when failOnDataLoss=false
## What changes were proposed in this pull request?

When there are missing offsets, Kafka v2 source may return duplicated records when `failOnDataLoss=false` because it doesn't skip missing offsets.

This PR fixes the issue and also adds regression tests for all Kafka readers.

## How was this patch tested?

New tests.

Closes #22207 from zsxwing/SPARK-25214.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
2018-08-24 12:00:34 -07:00
Gengliang Wang e3b7bb4132 [SPARK-24811][FOLLOWUP][SQL] Revise package of AvroDataToCatalyst and CatalystDataToAvro
## What changes were proposed in this pull request?

In https://github.com/apache/spark/pull/21838, the classes `AvroDataToCatalyst` and `CatalystDataToAvro` were put in the package `org.apache.spark.sql`.
They should be moved to the package `org.apache.spark.sql.avro`.
Also optimize imports in the Avro module.

## How was this patch tested?

Unit test

Closes #22196 from gengliangwang/avro_revise_package_name.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2018-08-23 15:08:46 +08:00
Takeshi Yamamuro 2a0a8f753b [SPARK-23034][SQL] Show RDD/relation names in RDD/Hive table scan nodes
## What changes were proposed in this pull request?
This pr proposed to show RDD/relation names in RDD/Hive table scan nodes.
This change made these names show up in the webUI and explain results.
For example;
```
scala> sql("CREATE TABLE t(c1 int) USING hive")
scala> sql("INSERT INTO t VALUES(1)")
scala> spark.table("t").explain()
== Physical Plan ==
Scan hive default.t [c1#8], HiveTableRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#8]
         ^^^^^^^^^^^
```
<img width="212" alt="spark-pr-hive" src="https://user-images.githubusercontent.com/692303/44501013-51264c80-a6c6-11e8-94f8-0704aee83bb6.png">

Closes #20226

## How was this patch tested?
Added tests in `DataFrameSuite`, `DatasetSuite`, and `HiveExplainSuite`

Closes #22153 from maropu/pr20226.

Lead-authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Co-authored-by: Tejas Patil <tejasp@fb.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2018-08-23 14:26:10 +08:00
Tathagata Das 3106324986 [SPARK-25184][SS] Fixed race condition in StreamExecution that caused flaky test in FlatMapGroupsWithState
## What changes were proposed in this pull request?

The race condition that caused test failure is between 2 threads.
- The MicrobatchExecution thread that processes inputs to produce answers and then generates progress events.
- The test thread that generates some input data, checked the answer and then verified the query generated progress event.

The synchronization structure between these threads is as follows
1. MicrobatchExecution thread, in every batch, does the following in order.
   a. Processes batch input to generate answer.
   b. Signals `awaitProgressLockCondition` to wake up threads waiting for progress using `awaitOffset`
   c. Generates progress event

2. Test execution thread
   a. Calls `awaitOffset` to wait for progress, which waits on `awaitProgressLockCondition`.
   b. As soon as `awaitProgressLockCondition` is signaled, it would move on in the test to check the answer.
  c. Finally, it would verify the last generated progress event.

What can happen is the following sequence of events: 2a -> 1a -> 1b -> 2b -> 2c -> 1c.
In other words, the progress event may be generated after the test tries to verify it.

The solution has two steps.
1. Signal the waiting thread after the progress event has been generated, that is, after `finishTrigger()`.
2. Increase the timeout of `awaitProgressLockCondition.await(100 ms)` to a large value.

The latter is to ensure that the test thread keeps waiting on `awaitProgressLockCondition` until the MicroBatchExecution thread explicitly signals it. With the existing small timeout of 100 ms the following sequence can occur.
 - MicroBatchExecution thread updates committed offsets
 - Test thread waiting on `awaitProgressLockCondition` accidentally times out after 100 ms, finds that the committed offsets have been updated, therefore returns from `awaitOffset` and moves on to the progress event tests.
 - MicroBatchExecution thread then generates progress event and signals. But the test thread has already attempted to verify the event and failed.

By increasing the timeout to a large value (e.g., `streamingTimeoutMs = 60 seconds`, similar to `awaitInitialization`), the above type of race condition is also avoided.

## How was this patch tested?
Ran locally many times.

Closes #22182 from tdas/SPARK-25184.

Authored-by: Tathagata Das <tathagata.das1565@gmail.com>
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
2018-08-22 12:22:53 -07:00
Wenchen Fan e754887182 [SPARK-24882][SQL] improve data source v2 API
## What changes were proposed in this pull request?

Improve the data source v2 API according to the [design doc](https://docs.google.com/document/d/1DDXCTCrup4bKWByTalkXWgavcPdvur8a4eEu8x1BzPM/edit?usp=sharing)

summary of the changes
1. rename `ReadSupport` -> `DataSourceReader` -> `InputPartition` -> `InputPartitionReader` to `BatchReadSupportProvider` -> `BatchReadSupport` -> `InputPartition`/`PartitionReaderFactory` -> `PartitionReader`. Similar renaming also happens at streaming and write APIs.
2. create `ScanConfig` to store query specific information like operator pushdown result, streaming offsets, etc. This makes batch and streaming `ReadSupport`(previouslly named `DataSourceReader`) immutable. All other methods take `ScanConfig` as input, which implies applying operator pushdown and getting streaming offsets happen before all other things(get input partitions, report statistics, etc.).
3. separate `InputPartition` to `InputPartition` and `PartitionReaderFactory`. This is a natural separation, data splitting and reading are orthogonal and we should not mix them in one interfaces. This also makes the naming consistent between read and write API: `PartitionReaderFactory` vs `DataWriterFactory`.
4. separate the batch and streaming interfaces. Sometimes it's painful to force the streaming interface to extend batch interface, as we may need to override some batch methods to return false, or even leak the streaming concept to batch API(e.g. `DataWriterFactory#createWriter(partitionId, taskId, epochId)`)

Some follow-ups we should do after this PR (tracked by https://issues.apache.org/jira/browse/SPARK-25186 ):
1. Revisit the life cycle of `ReadSupport` instances. Currently I keep it same as the previous `DataSourceReader`, i.e. the life cycle is bound to the batch/stream query. This fits streaming very well but may not be perfect for batch source. We can also consider to let `ReadSupport.newScanConfigBuilder` take `DataSourceOptions` as parameter, if we decide to change the life cycle.
2. Add `WriteConfig`. This is similar to `ScanConfig` and makes the write API more flexible. But it's only needed when we add the `replaceWhere` support, and it needs to change the streaming execution engine for this new concept, which I think is better to be done in another PR.
3. Refine the document. This PR adds/changes a lot of document and it's very likely that some people may have better ideas.
4. Figure out the life cycle of `CustomMetrics`. It looks to me that it should be bound to a `ScanConfig`, but we need to change `ProgressReporter` to get the `ScanConfig`. Better to be done in another PR.
5. Better operator pushdown API. This PR keeps the pushdown API as it was, i.e. using the `SupportsPushdownXYZ` traits. We can design a better API using build pattern, but this is a complicated design and deserves an individual JIRA ticket and design doc.
6. Improve the continuous streaming engine to only create a new `ScanConfig` when re-configuring.
7. Remove `SupportsPushdownCatalystFilter`. This is actually not a must-have for file source, we can change the hive partition pruning to use the public `Filter`.

## How was this patch tested?

existing tests.

Closes #22009 from cloud-fan/redesign.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Xiao Li <gatorsmile@gmail.com>
2018-08-22 00:10:55 -07:00
Gengliang Wang ac0174e55a [SPARK-25129][SQL] Make the mapping of com.databricks.spark.avro to built-in module configurable
## What changes were proposed in this pull request?

In https://issues.apache.org/jira/browse/SPARK-24924, the data source provider com.databricks.spark.avro is mapped to the new package org.apache.spark.sql.avro.

As per the discussion in the [Jira](https://issues.apache.org/jira/browse/SPARK-24924) and PR #22119, we should make the mapping configurable.

This PR also improves the error message when the Avro/Kafka data source is not found.
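
A sketch, assuming the mapping is controlled by a legacy SQL config named as below (the exact key is an assumption) and using a placeholder path:

```scala
// Keep mapping com.databricks.spark.avro to the built-in org.apache.spark.sql.avro source
spark.conf.set("spark.sql.legacy.replaceDatabricksSparkAvro.enabled", "true")        // assumed key
val df = spark.read.format("com.databricks.spark.avro").load("/data/events-avro")    // placeholder
```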

## How was this patch tested?

Unit test

Closes #22133 from gengliangwang/configurable_avro_mapping.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Xiao Li <gatorsmile@gmail.com>
2018-08-21 15:26:24 -07:00
Gengliang Wang 60af2501e1 [SPARK-25160][SQL] Avro: remove sql configuration spark.sql.avro.outputTimestampType
## What changes were proposed in this pull request?

In the PR for supporting logical timestamp types https://github.com/apache/spark/pull/21935, a SQL configuration spark.sql.avro.outputTimestampType is added, so that user can specify the output timestamp precision they want.

With PR https://github.com/apache/spark/pull/21847,  the output file can be written with user specified types.

So there is no need to have such trivial configuration. Otherwise to make it consistent we need to add configuration for all the Catalyst types that can be converted into different Avro types.

This PR also add a test case for user specified output schema with different timestamp types.

## How was this patch tested?

Unit test

Closes #22151 from gengliangwang/removeOutputTimestampType.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2018-08-20 20:42:27 +08:00
Arun Mahadevan 14d7c1c3e9 [SPARK-24863][SS] Report Kafka offset lag as a custom metrics
## What changes were proposed in this pull request?

This builds on top of SPARK-24748 to report 'offset lag' as a custom metrics for Kafka structured streaming source.

This lag is the difference between the latest offsets in Kafka at the time the metrics are reported (just after a micro-batch completes) and the latest offset Spark has processed. It can be 0 (or close to 0) if Spark keeps up with the rate at which messages are ingested into Kafka topics in steady state. This measures how far behind the Spark source has fallen (per partition) and can aid in tuning the application.

## How was this patch tested?

Existing and new unit tests

Please review http://spark.apache.org/contributing.html before opening a pull request.

Closes #21819 from arunmahadevan/SPARK-24863.

Authored-by: Arun Mahadevan <arunm@apache.org>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2018-08-18 17:31:52 +08:00
Shixiong Zhu da2dc69291
[SPARK-25116][TESTS] Fix the Kafka cluster leak and clean up cached producers
## What changes were proposed in this pull request?

KafkaContinuousSinkSuite leaks a Kafka cluster because both KafkaSourceTest and KafkaContinuousSinkSuite create a Kafka cluster but `afterAll` only shuts down one cluster. This leaks a Kafka cluster and causes some Kafka threads to crash and kill the JVM when SBT is trying to clean up tests.

This PR fixes the leak and also adds a shut down hook to detect Kafka cluster leak.

In addition, it also fixes an `AdminClient` leak and cleans up cached producers (when a record is written using a producer, the producer keeps refreshing the topic and I couldn't find an API to clear it except closing the producer) to eliminate the following annoying logs:
```
8/13 15:34:42.568 kafka-admin-client-thread | adminclient-4 WARN NetworkClient: [AdminClient clientId=adminclient-4] Connection to node 0 could not be established. Broker may not be available.
18/08/13 15:34:42.570 kafka-admin-client-thread | adminclient-6 WARN NetworkClient: [AdminClient clientId=adminclient-6] Connection to node 0 could not be established. Broker may not be available.
18/08/13 15:34:42.606 kafka-admin-client-thread | adminclient-8 WARN NetworkClient: [AdminClient clientId=adminclient-8] Connection to node -1 could not be established. Broker may not be available.
18/08/13 15:34:42.729 kafka-producer-network-thread | producer-797 WARN NetworkClient: [Producer clientId=producer-797] Connection to node -1 could not be established. Broker may not be available.
18/08/13 15:34:42.906 kafka-producer-network-thread | producer-1598 WARN NetworkClient: [Producer clientId=producer-1598] Connection to node 0 could not be established. Broker may not be available.
```

I also reverted b5eb54244e introduced by #22097 since it doesn't help.

## How was this patch tested?

Jenkins

Closes #22106 from zsxwing/SPARK-25116.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
2018-08-17 14:21:08 -07:00
Sean Owen b3e6fe7c46 [SPARK-23654][BUILD] remove jets3t as a dependency of spark
## What changes were proposed in this pull request?

Remove jets3t dependency, and bouncy castle which it brings in; update licenses and deps
Note this just takes over https://github.com/apache/spark/pull/21146

## How was this patch tested?

Existing tests.

Closes #22081 from srowen/SPARK-23654.

Authored-by: Sean Owen <srowen@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2018-08-16 12:34:23 -07:00
Shixiong Zhu 80784a1de8
[SPARK-18057][FOLLOW-UP] Use 127.0.0.1 to avoid zookeeper picking up an ipv6 address
## What changes were proposed in this pull request?

I'm still seeing the Kafka tests failed randomly due to `kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING`. I checked the test output and saw zookeeper picked up an ipv6 address. Most details can be found in https://issues.apache.org/jira/browse/KAFKA-7193

This PR just uses `127.0.0.1` rather than `localhost` to make sure zookeeper will never use an ipv6 address.

## How was this patch tested?

Jenkins

Closes #22097 from zsxwing/fix-zookeeper-connect.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
2018-08-14 09:57:01 -07:00
Gengliang Wang ab197308a7
[SPARK-25104][SQL] Avro: Validate user specified output schema
## What changes were proposed in this pull request?

With the code changes in https://github.com/apache/spark/pull/21847, Spark can write out to an Avro file as per a user-provided output schema.

To make it more robust and user friendly, we should validate the Avro schema before tasks are launched.

Also we should support outputting the logical decimal type as BYTES (by default we output it as FIXED).

## How was this patch tested?

Unit test

Closes #22094 from gengliangwang/AvroSerializerMatch.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
2018-08-14 04:43:14 +00:00
Gengliang Wang 26775e3c8e [SPARK-25099][SQL][TEST] Generate Avro Binary files in test suite
## What changes were proposed in this pull request?

In PR https://github.com/apache/spark/pull/21984 and https://github.com/apache/spark/pull/21935 , the related test cases are using binary files created by Python scripts.

Generate the binary files in the test suite to make it more transparent.

Also move the related test cases to a new file `AvroLogicalTypeSuite.scala`.

## How was this patch tested?

Unit test.

Closes #22091 from gengliangwang/logicalType_suite.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2018-08-13 20:50:28 +08:00
Gengliang Wang be2238fb50 [SPARK-24774][SQL] Avro: Support logical decimal type
## What changes were proposed in this pull request?

Support Avro logical decimal type:
https://avro.apache.org/docs/1.8.2/spec.html#Decimal

## How was this patch tested?
Unit test

Closes #22037 from gengliangwang/avro_decimal.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2018-08-13 08:29:07 +08:00
Brian Lindblom 0cea9e3cd0
[SPARK-24855][SQL][EXTERNAL] Built-in AVRO support should support specified schema on write
## What changes were proposed in this pull request?

Allows the `avroSchema` option to be specified on write, allowing a user to specify a schema in cases where this is required. A trivial use case is reading in an Avro dataset, making some small adjustment to one or more columns, and writing out using the same schema. Implicit schema creation from a SQL struct results in a schema that, while functionally similar for the most part, is not necessarily compatible.

Allows the `fixed` field type to be utilized for records of a specified `avroSchema`.
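
A sketch of the write path with a user-supplied schema; the schema, DataFrame, and path are placeholders:

```scala
val avroSchemaJson =
  """{"type":"record","name":"Rec","namespace":"example",
    |  "fields":[{"name":"id","type":"long"}]}""".stripMargin

df.write                                   // df is assumed to have a matching "id" column
  .format("avro")
  .option("avroSchema", avroSchemaJson)    // use this schema instead of the one derived from the struct
  .save("/tmp/out-avro")                   // placeholder path
```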

## How was this patch tested?

Unit tests in AvroSuite are extended to test this with enum and fixed types.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Closes #21847 from lindblombr/specify_schema_on_write.

Lead-authored-by: Brian Lindblom <blindblom@apple.com>
Co-authored-by: DB Tsai <d_tsai@apple.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
2018-08-10 03:35:29 +00:00
Kazuaki Ishizaki 56e9e97073 [MINOR][DOC] Fix typo
## What changes were proposed in this pull request?

This PR fixes typo regarding `auxiliary verb + verb[s]`. This is a follow-on of #21956.

## How was this patch tested?

N/A

Closes #22040 from kiszk/spellcheck1.

Authored-by: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2018-08-09 20:10:17 +08:00
Gengliang Wang 819c4de45a [SPARK-24772][SQL] Avro: support logical date type
## What changes were proposed in this pull request?

Support Avro logical date type:
https://avro.apache.org/docs/1.8.2/spec.html#Date

## How was this patch tested?

Unit test

Closes #21984 from gengliangwang/avro_date.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2018-08-07 17:24:25 +08:00
Wenchen Fan ac527b5205 [SPARK-24991][SQL] use InternalRow in DataSourceWriter
## What changes were proposed in this pull request?

A follow up of #21118

Since we use `InternalRow` in the read API of data source v2, we should do the same thing for the write API.

## How was this patch tested?

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21948 from cloud-fan/row-write.
2018-08-06 15:52:01 +08:00
Yuval Itzchakov b7fdf8eb20 [SPARK-24987][SS] - Fix Kafka consumer leak when no new offsets for TopicPartition
## What changes were proposed in this pull request?

This small fix adds a `consumer.release()` call to `KafkaSourceRDD` in the case where we've retrieved offsets from Kafka, but the `fromOffset` is equal to the `lastOffset`, meaning there is no new data to read for a particular topic partition. Up until now, we'd just return an empty iterator without closing the consumer which would cause a FD leak.

If accepted, this pull request should be merged into master as well.

## How was this patch tested?

Haven't run any specific tests; would love help on how to test methods running inside `RDD.compute`.

Author: Yuval Itzchakov <yuval.itzchakov@clicktale.com>

Closes #21997 from YuvalItzchakov/master.
2018-08-04 14:44:10 -05:00
Sean Owen 4c27663cb2
[SPARK-18057][FOLLOW-UP][SS] Update Kafka client version from 0.10.0.1 to 2.0.0
## What changes were proposed in this pull request?

Increase ZK timeout and harmonize configs across Kafka tests to resolve potentially flaky test failures.

## How was this patch tested?

Existing tests

Author: Sean Owen <srowen@gmail.com>

Closes #21995 from srowen/SPARK-18057.3.
2018-08-03 16:22:54 -07:00
Sean Owen c32dbd6bd5 [SPARK-18057][FOLLOW-UP][SS] Update Kafka client version from 0.10.0.1 to 2.0.0
## What changes were proposed in this pull request?

Update to kafka 2.0.0 in streaming-kafka module, and remove override for Scala 2.12. It won't compile for 2.12 otherwise.

## How was this patch tested?

Existing tests.

Author: Sean Owen <srowen@gmail.com>

Closes #21955 from srowen/SPARK-18057.2.
2018-08-03 08:17:18 -05:00
DB Tsai 273b28404c
[SPARK-24993][SQL] Make Avro Fast Again
## What changes were proposed in this pull request?

When lindblombr at apple developed [SPARK-24855](https://github.com/apache/spark/pull/21847) to support a specified schema on write, we found a performance regression in the Avro writer for our dataset.

With this PR, the performance is improved, but not as good as Spark 2.3 + the old avro writer. There must be something we missed which we need to investigate further.

Spark 2.4
```
spark git:(master) ./build/mvn -DskipTests clean package
spark git:(master) bin/spark-shell --jars external/avro/target/spark-avro_2.11-2.4.0-SNAPSHOT.jar
```

Spark 2.3 + databricks avro
```
spark git:(branch-2.3) ./build/mvn -DskipTests clean package
spark git:(branch-2.3) bin/spark-shell --packages com.databricks:spark-avro_2.11:4.0.0
```

Current master:
```
+-------+--------------------+
|summary|          writeTimes|
+-------+--------------------+
|  count|                 100|
|   mean|             2.95621|
| stddev|0.030895815479469294|
|    min|               2.915|
|    max|               3.049|
+-------+--------------------+

+-------+--------------------+
|summary|           readTimes|
+-------+--------------------+
|  count|                 100|
|   mean| 0.31072999999999995|
| stddev|0.054139709842390006|
|    min|               0.259|
|    max|               0.692|
+-------+--------------------+
```

Current master with this PR:
```
+-------+--------------------+
|summary|          writeTimes|
+-------+--------------------+
|  count|                 100|
|   mean|  2.5804300000000002|
| stddev|0.011175600225672079|
|    min|               2.558|
|    max|                2.62|
+-------+--------------------+

+-------+--------------------+
|summary|           readTimes|
+-------+--------------------+
|  count|                 100|
|   mean| 0.29922000000000004|
| stddev|0.058261961532514166|
|    min|               0.251|
|    max|               0.732|
+-------+--------------------+
```

Spark 2.3 + databricks avro:
```
+-------+--------------------+
|summary|          writeTimes|
+-------+--------------------+
|  count|                 100|
|   mean|  1.7730500000000005|
| stddev|0.025199156230863575|
|    min|               1.729|
|    max|               1.833|
+-------+--------------------+

+-------+-------------------+
|summary|          readTimes|
+-------+-------------------+
|  count|                100|
|   mean|            0.29715|
| stddev|0.05685643358850465|
|    min|              0.258|
|    max|              0.718|
+-------+-------------------+
```

The following is the test code to reproduce the result.
```scala
    spark.sqlContext.setConf("spark.sql.avro.compression.codec", "uncompressed")
    val sparkSession = spark
    import sparkSession.implicits._
    val df = spark.sparkContext.range(1, 3000).repartition(1).map { uid =>
      val features = Array.fill(16000)(scala.math.random)
      (uid, scala.math.random, java.util.UUID.randomUUID().toString, java.util.UUID.randomUUID().toString, features)
    }.toDF("uid", "random", "uuid1", "uuid2", "features").cache()
    val size = df.count()

    // Write into ramdisk to rule out the disk IO impact
    val tempSaveDir = s"/Volumes/ramdisk/${java.util.UUID.randomUUID()}/"
    val n = 150
    val writeTimes = new Array[Double](n)
    var i = 0
    while (i < n) {
      val t1 = System.currentTimeMillis()
      df.write
        .format("com.databricks.spark.avro")
        .mode("overwrite")
        .save(tempSaveDir)
      val t2 = System.currentTimeMillis()
      writeTimes(i) = (t2 - t1) / 1000.0
      i += 1
    }

    df.unpersist()

    // The first 50 runs are for warm-up
    val readTimes = new Array[Double](n)
    i = 0
    while (i < n) {
      val t1 = System.currentTimeMillis()
      val readDF = spark.read.format("com.databricks.spark.avro").load(tempSaveDir)
      assert(readDF.count() == size)
      val t2 = System.currentTimeMillis()
      readTimes(i) = (t2 - t1) / 1000.0
      i += 1
    }

    spark.sparkContext.parallelize(writeTimes.slice(50, 150)).toDF("writeTimes").describe("writeTimes").show()
    spark.sparkContext.parallelize(readTimes.slice(50, 150)).toDF("readTimes").describe("readTimes").show()
```

## How was this patch tested?

Existing tests.

Author: DB Tsai <d_tsai@apple.com>
Author: Brian Lindblom <blindblom@apple.com>

Closes #21952 from dbtsai/avro-performance-fix.
2018-08-03 07:43:54 +00:00
Gengliang Wang f45d60a5a1 [SPARK-25002][SQL] Avro: revise the output record namespace
## What changes were proposed in this pull request?

Currently the output namespace is starting with ".", e.g. `.topLevelRecord`

Although it is valid according to the Avro spec, we should remove the starting dot to avoid failures when the output Avro file is read by other libraries:

https://github.com/linkedin/goavro/pull/96

## How was this patch tested?

Unit test

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21974 from gengliangwang/avro_namespace.
2018-08-03 13:28:44 +08:00
Gengliang Wang 7cf16a7fa4 [SPARK-24773] Avro: support logical timestamp type with different precisions
## What changes were proposed in this pull request?

Support reading/writing Avro logical timestamp type with different precisions
https://avro.apache.org/docs/1.8.2/spec.html#Timestamp+%28millisecond+precision%29

To specify the output timestamp type, use the DataFrame option `outputTimestampType` or the SQL config `spark.sql.avro.outputTimestampType`. The supported values are
* `TIMESTAMP_MICROS`
* `TIMESTAMP_MILLIS`

The default output type is `TIMESTAMP_MICROS`.
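
A minimal usage sketch (the DataFrame, path, and `format("avro")` short name are illustrative; the option and config names are the ones introduced above):

```scala
// Per-write option: emit millisecond-precision timestamps for this write only
df.write
  .format("avro")
  .option("outputTimestampType", "TIMESTAMP_MILLIS")
  .save("/tmp/events_avro")

// Session-wide default via the SQL config
spark.conf.set("spark.sql.avro.outputTimestampType", "TIMESTAMP_MILLIS")
```
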
## How was this patch tested?

Unit test

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21935 from gengliangwang/avro_timestamp.
2018-08-03 08:32:08 +08:00
Stavros Kontopoulos a65736996b [SPARK-14540][CORE] Fix remaining major issues for Scala 2.12 Support
## What changes were proposed in this pull request?
This PR addresses issues 2,3 in this [document](https://docs.google.com/document/d/1fbkjEL878witxVQpOCbjlvOvadHtVjYXeB-2mgzDTvk).

* We modified the closure cleaner to identify closures that are implemented via the LambdaMetaFactory mechanism (serializedLambdas) (issue2).

* We also fix the issue due to scala/bug#11016. There are two options for solving the Unit issue: either add `()` at the end of the closure, or use the trick described in the doc, which is the workaround implemented here. Without it, overload resolution does not work: the compiler adapts the closure to `Unit` and makes both methods candidates, whereas with a polymorphic overload there is no ambiguity. We are not going to eliminate either of the methods, because we need to support two different uses of `addTaskCompletionListener`: one that passes a `TaskCompletionListener` and one that passes a closure that is wrapped in a `TaskCompletionListener` later on (issue 3). This does not look great, but it serves its purpose; see the sketch below.
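
A simplified sketch of the overloading workaround (the types here are illustrative stand-ins, not the real Spark signatures):

```scala
trait TaskCompletionListener {
  def onTaskCompletion(context: String): Unit
}

class TaskContext {
  // Overload taking a listener object
  def addTaskCompletionListener(listener: TaskCompletionListener): TaskContext = {
    // ... register the listener ...
    this
  }

  // Closure overload made polymorphic in its result type: a Unit-returning lambda no
  // longer has to be adapted by the compiler, so overload resolution stays unambiguous
  // under the Scala 2.12 lambda encoding
  def addTaskCompletionListener[U](f: String => U): TaskContext =
    addTaskCompletionListener(new TaskCompletionListener {
      override def onTaskCompletion(context: String): Unit = f(context)
    })
}
```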

Note: regarding issue 1 in the doc the plan is:

> Do Nothing. Don’t try to fix this as this is only a problem for Java users who would want to use 2.11 binaries. In that case they can cast to MapFunction to be able to utilize lambdas. In Spark 3.0.0 the API should be simplified so that this issue is removed.

## How was this patch tested?
This was manually tested:
```
./dev/change-scala-version.sh 2.12
./build/mvn -DskipTests -Pscala-2.12 clean package
./build/mvn -Pscala-2.12 clean package -DwildcardSuites=org.apache.spark.serializer.ProactiveClosureSerializationSuite -Dtest=None
./build/mvn -Pscala-2.12 clean package -DwildcardSuites=org.apache.spark.util.ClosureCleanerSuite -Dtest=None
./build/mvn -Pscala-2.12 clean package -DwildcardSuites=org.apache.spark.streaming.DStreamClosureSuite -Dtest=None
```

Author: Stavros Kontopoulos <stavros.kontopoulos@lightbend.com>

Closes #21930 from skonto/scala2.12-sup.
2018-08-02 09:17:09 -05:00
tedyu e82784d13f [SPARK-18057][SS] Update Kafka client version from 0.10.0.1 to 2.0.0
## What changes were proposed in this pull request?

This PR upgrades to the Kafka 2.0.0 release where KIP-266 is integrated.

## How was this patch tested?

This PR uses existing Kafka related unit tests

Author: tedyu <yuzhihong@gmail.com>

Closes #21488 from tedyu/master.
2018-07-31 13:14:14 -07:00
Maxim Gekk d20c10fdf3 [SPARK-24952][SQL] Support LZMA2 compression by Avro datasource
## What changes were proposed in this pull request?

In the PR, I propose to support the `LZMA2` (`XZ`) and `BZIP2` compression codecs in the `AVRO` datasource's write path, since these codecs may offer better characteristics, such as compression ratio and speed, compared to the already supported `snappy` and `deflate` codecs.
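
A usage sketch (paths are illustrative; the `compression` option and the `spark.sql.avro.compression.codec` config come from the related Avro compression option work, SPARK-24881):

```scala
// Pick the codec for a single write via the datasource option
df.write.format("avro").option("compression", "xz").save("/tmp/data_xz")
df.write.format("avro").option("compression", "bzip2").save("/tmp/data_bzip2")

// Or set a session-level default
spark.conf.set("spark.sql.avro.compression.codec", "xz")
```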

## How was this patch tested?

It was tested manually and by an existing test which was extended to check the `xz` and `bzip2` compressions.

Author: Maxim Gekk <maxim.gekk@databricks.com>

Closes #21902 from MaxGekk/avro-xz-bzip2.
2018-07-31 09:12:57 +08:00
Takeshi Yamamuro 47d84e4d0e [SPARK-22814][SQL] Support Date/Timestamp in a JDBC partition column
## What changes were proposed in this pull request?
This PR adds support for Date/Timestamp in a JDBC partition column (master currently supports only numeric columns). It also modifies the code to verify the partition column type:
```
val jdbcTable = spark.read
 .option("partitionColumn", "text")
 .option("lowerBound", "aaa")
 .option("upperBound", "zzz")
 .option("numPartitions", 2)
 .jdbc("jdbc:postgresql:postgres", "t", options)

// with this pr
org.apache.spark.sql.AnalysisException: Partition column type should be numeric, date, or timestamp, but string found.;
  at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.verifyAndGetNormalizedPartitionColumn(JDBCRelation.scala:165)
  at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.columnPartition(JDBCRelation.scala:85)
  at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:36)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:317)

// without this pr
java.lang.NumberFormatException: For input string: "aaa"
  at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
  at java.lang.Long.parseLong(Long.java:589)
  at java.lang.Long.parseLong(Long.java:631)
  at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:277)
```
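
With this change, a read along these lines (column name, bounds, and connection properties are illustrative) partitions on a date column instead of failing:

```scala
val options = new java.util.Properties()  // JDBC connection properties (user, password, ...)

val jdbcTable = spark.read
  .option("partitionColumn", "created_at")   // a DATE or TIMESTAMP column
  .option("lowerBound", "2018-01-01")
  .option("upperBound", "2018-12-31")
  .option("numPartitions", 4)
  .jdbc("jdbc:postgresql:postgres", "t", options)
```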

Closes #19999

## How was this patch tested?
Added tests in `JDBCSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #21834 from maropu/SPARK-22814.
2018-07-30 07:42:00 -07:00
hyukjinkwon fca0b8528e [SPARK-24967][SQL] Avro: Use internal.Logging instead for logging
## What changes were proposed in this pull request?

It looks like the Avro module uses `getLogger` directly to create an SLF4J logger. It should use `internal.Logging` instead.
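
A sketch of what this implies for a class in the module (class and method names are illustrative):

```scala
import org.apache.spark.internal.Logging

// Before (sketch): a hand-rolled SLF4J logger
//   private val log = org.slf4j.LoggerFactory.getLogger(classOf[SomeAvroClass])

// After (sketch): mix in Spark's internal Logging trait and use its helpers
class SomeAvroClass extends Logging {
  def warnOnce(): Unit = logWarning("Avro: something worth logging")
}
```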

## How was this patch tested?

Existing tests.

Author: hyukjinkwon <gurwls223@apache.org>

Closes #21914 from HyukjinKwon/avro-log.
2018-07-30 21:13:08 +08:00
Xiao Li c6a3db2fb6 [SPARK-24924][SQL][FOLLOW-UP] Add mapping for built-in Avro data source
## What changes were proposed in this pull request?
Add one more test case for `com.databricks.spark.avro`.

## How was this patch tested?
N/A

Author: Xiao Li <gatorsmile@gmail.com>

Closes #21906 from gatorsmile/avro.
2018-07-28 13:43:32 +08:00
Maxim Gekk 0a0f68bae6 [SPARK-24881][SQL] New Avro option - compression
## What changes were proposed in this pull request?

In the PR, I added a new option for the Avro datasource - `compression`. The option allows specifying the compression codec for saved Avro files, similar to the `compression` option in other datasources like `JSON` and `CSV`.

I also added the SQL configs `spark.sql.avro.compression.codec` and `spark.sql.avro.deflate.level`, placed in `SQLConf`. If the `compression` option is not specified by the user, the first SQL config is taken into account.
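
A usage sketch (paths are illustrative; option and config names are the ones added here):

```scala
// Session-level defaults: codec plus deflate level, used when no option is given
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
df.write.format("avro").save("/tmp/data_deflate")

// Explicit per-write option takes precedence over the SQL config
df.write.format("avro").option("compression", "snappy").save("/tmp/data_snappy")
```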

## How was this patch tested?

I added new test which read meta info from written avro files and checks `avro.codec` property.

Author: Maxim Gekk <maxim.gekk@databricks.com>

Closes #21837 from MaxGekk/avro-compression.
2018-07-28 00:11:32 +08:00
Gengliang Wang fa09d91925 [SPARK-24919][BUILD] New linter rule for sparkContext.hadoopConfiguration
## What changes were proposed in this pull request?

In most cases, we should use `spark.sessionState.newHadoopConf()` instead of `sparkContext.hadoopConfiguration`, so that the hadoop configurations specified in Spark session
configuration will come into effect.

Add a rule matching `spark.sparkContext.hadoopConfiguration` or `spark.sqlContext.sparkContext.hadoopConfiguration` to prevent the usage.
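
A sketch of the preferred pattern inside Spark's SQL code (variable names are illustrative):

```scala
// Preferred: picks up Hadoop settings made on the Spark session configuration
val hadoopConf = spark.sessionState.newHadoopConf()

// Flagged by the new linter rule: bypasses session-level overrides
// val hadoopConf = spark.sparkContext.hadoopConfiguration
```
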
## How was this patch tested?

Unit test

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21873 from gengliangwang/linterRule.
2018-07-26 16:50:59 -07:00
Dongjoon Hyun 58353d7f4b [SPARK-24924][SQL] Add mapping for built-in Avro data source
## What changes were proposed in this pull request?

This PR aims to do the following:
1. Like the `com.databricks.spark.csv` mapping, map `com.databricks.spark.avro` to the built-in Avro data source (see the sketch below).
2. Remove the incorrect error message, `Please find an Avro package at ...`.
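
A sketch of what the mapping means for callers (path is illustrative):

```scala
// Both of these now resolve to the built-in Avro data source
val legacy  = spark.read.format("com.databricks.spark.avro").load("/path/to/files")
val builtin = spark.read.format("avro").load("/path/to/files")
```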

## How was this patch tested?

Pass the newly added tests.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #21878 from dongjoon-hyun/SPARK-24924.
2018-07-26 16:11:03 +08:00
Gengliang Wang c44eb561ec [SPARK-24768][FOLLOWUP][SQL] Avro migration followup: change artifactId to spark-avro
## What changes were proposed in this pull request?
After rethinking the artifactId, I think it should be `spark-avro` instead of `spark-sql-avro`, which is simpler and consistent with the previous artifactId. We need to change it before the Spark 2.4 release.
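
For reference, the resulting dependency coordinate would look roughly like this (the version shown is illustrative):

```scala
// sbt (sketch)
libraryDependencies += "org.apache.spark" %% "spark-avro" % "2.4.0"
```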

Also a tiny change: use `spark.sessionState.newHadoopConf()` to get the Hadoop configuration, so that the related Hadoop configurations in SQLConf come into effect.

## How was this patch tested?

Unit test

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21866 from gengliangwang/avro_followup.
2018-07-25 08:42:45 -07:00
Ryan Blue 9d27541a85 [SPARK-23325] Use InternalRow when reading with DataSourceV2.
## What changes were proposed in this pull request?

This updates the DataSourceV2 API to use InternalRow instead of Row for the default case with no scan mix-ins.

Support for readers that produce Row is added through SupportsDeprecatedScanRow, which matches the previous API. Readers that used Row now implement this class and should be migrated to InternalRow.

Readers that previously implemented SupportsScanUnsafeRow have been migrated to use no SupportsScan mix-ins and produce InternalRow.

## How was this patch tested?

This uses existing tests.

Author: Ryan Blue <blue@apache.org>

Closes #21118 from rdblue/SPARK-23325-datasource-v2-internal-row.
2018-07-24 10:46:36 -07:00
Gengliang Wang 08e315f633 [SPARK-24887][SQL] Avro: use SerializableConfiguration in Spark utils to deduplicate code
## What changes were proposed in this pull request?

To implement the method `buildReader` in `FileFormat`, the Hadoop configuration has to be serialized for executors.

The previous spark-avro used its own `SerializableConfiguration` class for this. Now that the module is part of Spark, we can use the `SerializableConfiguration` in Spark's util package to deduplicate the code.
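
A sketch of the pattern inside Spark's own modules (`SerializableConfiguration` is `private[spark]`, so this only applies to internal code; variable names are illustrative):

```scala
import org.apache.spark.util.SerializableConfiguration

// Wrap the Hadoop conf so it can be shipped to executors
val serializableConf = new SerializableConfiguration(spark.sessionState.newHadoopConf())
val broadcastConf = spark.sparkContext.broadcast(serializableConf)

// On an executor, unwrap it to get the Hadoop Configuration back:
//   val hadoopConf = broadcastConf.value.value
```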

## How was this patch tested?

Unit test

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21846 from gengliangwang/removeSerializableConfiguration.
2018-07-23 08:31:48 -07:00
Gengliang Wang f59de52a2a [SPARK-24883][SQL] Avro: remove implicit class AvroDataFrameWriter/AvroDataFrameReader
## What changes were proposed in this pull request?

As per Reynold's comment: https://github.com/apache/spark/pull/21742#discussion_r203496489

It makes sense to remove the implicit class AvroDataFrameWriter/AvroDataFrameReader, since the Avro package is an external module.

## How was this patch tested?

Unit test

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21841 from gengliangwang/removeImplicit.
2018-07-23 15:27:33 +08:00
Gengliang Wang 8817c68f50 [SPARK-24811][SQL] Avro: add new function from_avro and to_avro
## What changes were proposed in this pull request?

1. Add a new function from_avro for parsing a binary column of avro format and converting it into its corresponding catalyst value.

2. Add a new function to_avro for converting a column into binary of avro format with the specified schema.
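
A usage sketch (the column name and the `jsonFormatSchema` value are illustrative):

```scala
import org.apache.spark.sql.avro._          // brings from_avro and to_avro into scope
import org.apache.spark.sql.functions.col

// Avro schema of the encoded column, as a JSON string
val jsonFormatSchema = """{"type": "string"}"""

// Encode a column to Avro binary, then decode it back using the schema
val encoded = df.select(to_avro(col("name")).as("avro_name"))
val decoded = encoded.select(from_avro(col("avro_name"), jsonFormatSchema).as("name"))
```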

I created #21774 for this, but it failed the build https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Compile/job/spark-master-compile-maven-hadoop-2.6/7902/

Additional changes In this PR:
1. Add `scalacheck` dependency in pom.xml to resolve the failure.
2. Update the `log4j.properties` to make it consistent with other modules.

## How was this patch tested?

Unit test
Compile with different commands:
```
./build/mvn --force -DzincPort=3643 -DskipTests -Phadoop-2.6 -Phive-thriftserver -Pkinesis-asl -Pspark-ganglia-lgpl -Pmesos -Pyarn  compile test-compile
./build/mvn --force -DzincPort=3643 -DskipTests -Phadoop-2.7 -Phive-thriftserver -Pkinesis-asl -Pspark-ganglia-lgpl -Pmesos -Pyarn  compile test-compile
./build/mvn --force -DzincPort=3643 -DskipTests -Phadoop-3.1 -Phive-thriftserver -Pkinesis-asl -Pspark-ganglia-lgpl -Pmesos -Pyarn  compile test-compile
```

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21838 from gengliangwang/from_and_to_avro.
2018-07-22 17:36:57 -07:00
Maxim Gekk 106880edcd [SPARK-24836][SQL] New option for Avro datasource - ignoreExtension
## What changes were proposed in this pull request?

I propose to add a new option for the AVRO datasource that controls whether files without the `.avro` extension are ignored on read. The option is named `ignoreExtension` and defaults to `true`. If both `ignoreExtension` and `avro.mapred.ignore.inputs.without.extension` are set, `ignoreExtension` takes precedence. Here is an example of usage:

```
spark
  .read
  .option("ignoreExtension", false)
  .avro("path to avro files")
```

## How was this patch tested?

I added a test that checks the option directly and a test that checks the new option overrides Hadoop's config.

Author: Maxim Gekk <maxim.gekk@databricks.com>

Closes #21798 from MaxGekk/avro-ignore-extension.
2018-07-20 20:04:40 -07:00
Gengliang Wang 00b864aa70 [SPARK-24876][SQL] Avro: simplify schema serialization
## What changes were proposed in this pull request?

Previously, in the refactoring of the Avro Serializer and Deserializer, a new class SerializableSchema was created for serializing the Avro schema:
https://github.com/apache/spark/pull/21762/files#diff-01fea32e6ec6bcf6f34d06282e08705aR37

On second thought, we can use the `toString` method for serialization and then parse the JSON-format schema on the executor. This makes the code much simpler.
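
A sketch of the round trip (`avroSchema` stands for an existing `org.apache.avro.Schema` instance):

```scala
import org.apache.avro.Schema

// Driver side: Schema#toString emits the schema as JSON, a plain serializable String
val schemaJson: String = avroSchema.toString

// Executor side: re-parse the JSON back into an Avro Schema
val parsedSchema: Schema = new Schema.Parser().parse(schemaJson)
```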

## How was this patch tested?

Unit test

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21829 from gengliangwang/removeSerializableSchema.
2018-07-20 14:57:59 -07:00
Xiao Li 9ad77b3037 Revert "[SPARK-24811][SQL] Avro: add new function from_avro and to_avro"
This reverts commit 244bcff194.
2018-07-20 12:55:38 -07:00
Gengliang Wang 244bcff194 [SPARK-24811][SQL] Avro: add new function from_avro and to_avro
## What changes were proposed in this pull request?

Add a new function from_avro for parsing a binary column of avro format and converting it into its corresponding catalyst value.

Add a new function to_avro for converting a column into binary of avro format with the specified schema.

This PR is in progress. Will add test cases.
## How was this patch tested?

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21774 from gengliangwang/from_and_to_avro.
2018-07-20 09:19:29 -07:00
Marco Gaido a5925c1631 [SPARK-24268][SQL] Use datatype.catalogString in error messages
## What changes were proposed in this pull request?

As stated in https://github.com/apache/spark/pull/21321, we should use `catalogString` in error messages. This is not always the case: SPARK-22893 used `simpleString` in order to have the same representation everywhere, but it missed some places.

This PR unifies the messages by always using the `catalogString` representation of the data types.
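
A quick illustration of the difference (output strings are approximate):

```scala
import org.apache.spark.sql.types._

// For a wide struct, simpleString truncates the field list for readability,
// while catalogString keeps the full type
val wide = StructType((1 to 100).map(i => StructField(s"c$i", IntegerType)))
wide.simpleString   // e.g. "struct<c1:int,c2:int,...,c25:int,... 75 more fields>"
wide.catalogString  // the full "struct<c1:int,...,c100:int>"
```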

## How was this patch tested?

existing/modified UTs

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21804 from mgaido91/SPARK-24268_catalog.
2018-07-19 23:29:29 -07:00
Maxim Gekk cd5d93c0e4 [SPARK-24854][SQL] Gathering all Avro options into the AvroOptions class
## What changes were proposed in this pull request?

In the PR, I propose to put all `Avro` options into a new class, `AvroOptions`, in the same way as the other datasources `JSON` and `CSV` do.
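
A rough sketch of the pattern (field set and defaults follow the options discussed in these commits; the real class lives in the spark-avro module):

```scala
// Small wrapper that centralizes option parsing for the Avro datasource (sketch)
class AvroOptions(parameters: Map[String, String]) extends Serializable {
  // Compression codec for written files, e.g. "snappy", "deflate", "bzip2", "xz"
  val compression: Option[String] = parameters.get("compression")

  // Whether to read files that lack the .avro extension (defaults to true)
  val ignoreExtension: Boolean = parameters.get("ignoreExtension").forall(_.toBoolean)
}
```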

## How was this patch tested?

It was tested by `AvroSuite`

Author: Maxim Gekk <maxim.gekk@databricks.com>

Closes #21810 from MaxGekk/avro-options.
2018-07-19 09:16:16 +08:00