Commit graph

568 commits

Author SHA1 Message Date
Shixiong Zhu 2119e518d3 [SPARK-25336][SS]Revert SPARK-24863 and SPARK-24748
## What changes were proposed in this pull request?

Revert SPARK-24863 (#21819) and SPARK-24748 (#21721) as per discussion in #21721. We will revisit them when the data source v2 APIs are out.

## How was this patch tested?

Jenkins

Closes #22334 from zsxwing/revert-SPARK-24863-SPARK-24748.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2018-09-05 13:39:34 +08:00
Shixiong Zhu aa70a0a1a4
[SPARK-25288][TESTS] Fix flaky Kafka transaction tests
## What changes were proposed in this pull request?

Here are the failures:

http://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.kafka010.KafkaRelationSuite&test_name=read+Kafka+transactional+messages%3A+read_committed
http://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.kafka010.KafkaMicroBatchV1SourceSuite&test_name=read+Kafka+transactional+messages%3A+read_committed
http://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.kafka010.KafkaMicroBatchV2SourceSuite&test_name=read+Kafka+transactional+messages%3A+read_committed

I found the Kafka consumer may not see the committed messages for a short time. This PR just adds a new method `waitUntilOffsetAppears` and uses it to make sure the consumer can see a specified offset before checking the result.

## How was this patch tested?

Jenkins

Closes #22293 from zsxwing/SPARK-25288.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
2018-08-30 23:23:11 -07:00
Reza Safi 135ff16a35 [SPARK-25233][STREAMING] Give the user the option of specifying a minimum message per partition per batch when using kafka direct API with backpressure
After SPARK-18371, it is guaranteed that there would be at least one message per partition per batch using direct kafka API when new messages exist in the topics. This change will give the user the option of setting the minimum instead of just a hard coded 1 limit
The related unit test is updated and some internal tests verified that the topic partitions with new messages will be progressed by the specified minimum.

Author: Reza Safi <rezasafi@cloudera.com>

Closes #22223 from rezasafi/streaminglag.
2018-08-30 13:26:03 -05:00
Arun Mahadevan 68ec207a32 [SPARK-25260][SQL] Fix namespace handling in SchemaConverters.toAvroType
## What changes were proposed in this pull request?

`toAvroType` converts spark data type to avro schema. It always appends the record name to namespace so its impossible to have an Avro namespace independent of the record name.

When invoked with a spark data type like,

```java
val sparkSchema = StructType(Seq(
    StructField("name", StringType, nullable = false),
    StructField("address", StructType(Seq(
        StructField("city", StringType, nullable = false),
        StructField("state", StringType, nullable = false))),
    nullable = false)))

// map it to an avro schema with record name "employee" and top level namespace "foo.bar",
val avroSchema = SchemaConverters.toAvroType(sparkSchema,  false, "employee", "foo.bar")

// result is
// avroSchema.getName = employee
// avroSchema.getNamespace = foo.bar.employee
// avroSchema.getFullname = foo.bar.employee.employee
```
The patch proposes to fix this so that the result is

```
avroSchema.getName = employee
avroSchema.getNamespace = foo.bar
avroSchema.getFullname = foo.bar.employee
```
## How was this patch tested?

New and existing unit tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Closes #22251 from arunmahadevan/avro-fix.

Authored-by: Arun Mahadevan <arunm@apache.org>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2018-08-29 09:25:49 +08:00
Shixiong Zhu 1149c4efbc
[SPARK-25005][SS] Support non-consecutive offsets for Kafka
## What changes were proposed in this pull request?

As the user uses Kafka transactions to write data, the offsets in Kafka will be non-consecutive. It will contains some transaction (commit or abort) markers. In addition, if the consumer's `isolation.level` is `read_committed`, `poll` will not return aborted messages either. Hence, we will see non-consecutive offsets in the date returned by `poll`. However, as `seekToEnd` may move the offset point to these missing offsets, there are 4 possible corner cases we need to support:

- The whole batch contains no data messages
- The first offset in a batch is not a committed data message
- The last offset in a batch is not a committed data message
- There is a gap in the middle of a batch

They are all covered by the new unit tests.

## How was this patch tested?

The new unit tests.

Closes #22042 from zsxwing/kafka-transaction-read.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
2018-08-28 08:38:07 -07:00
Jose Torres 810d59ce44
[SPARK-24882][FOLLOWUP] Fix flaky synchronization in Kafka tests.
## What changes were proposed in this pull request?

Fix flaky synchronization in Kafka tests - we need to use the scan config that was persisted rather than reconstructing it to identify the stream's current configuration.

We caught most instances of this in the original PR, but this one slipped through.

## How was this patch tested?

n/a

Closes #22245 from jose-torres/fixflake.

Authored-by: Jose Torres <torres.joseph.f+github@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
2018-08-27 11:04:39 -07:00
Shixiong Zhu c17a8ff523
[SPARK-25214][SS][FOLLOWUP] Fix the issue that Kafka v2 source may return duplicated records when failOnDataLoss=false
## What changes were proposed in this pull request?

This is a follow up PR for #22207 to fix a potential flaky test. `processAllAvailable` doesn't work for continuous processing so we should not use it for a continuous query.

## How was this patch tested?

Jenkins.

Closes #22230 from zsxwing/SPARK-25214-2.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
2018-08-25 09:17:40 -07:00
Shixiong Zhu 8bb9414aaf
[SPARK-25214][SS] Fix the issue that Kafka v2 source may return duplicated records when failOnDataLoss=false
## What changes were proposed in this pull request?

When there are missing offsets, Kafka v2 source may return duplicated records when `failOnDataLoss=false` because it doesn't skip missing offsets.

This PR fixes the issue and also adds regression tests for all Kafka readers.

## How was this patch tested?

New tests.

Closes #22207 from zsxwing/SPARK-25214.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
2018-08-24 12:00:34 -07:00
Gengliang Wang e3b7bb4132 [SPARK-24811][FOLLOWUP][SQL] Revise package of AvroDataToCatalyst and CatalystDataToAvro
## What changes were proposed in this pull request?

In https://github.com/apache/spark/pull/21838, the class `AvroDataToCatalyst` and `CatalystDataToAvro` were put in package `org.apache.spark.sql`.
They should be moved to package  `org.apache.spark.sql.avro`.
Also optimize imports in Avro module.

## How was this patch tested?

Unit test

Closes #22196 from gengliangwang/avro_revise_package_name.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2018-08-23 15:08:46 +08:00
Takeshi Yamamuro 2a0a8f753b [SPARK-23034][SQL] Show RDD/relation names in RDD/Hive table scan nodes
## What changes were proposed in this pull request?
This pr proposed to show RDD/relation names in RDD/Hive table scan nodes.
This change made these names show up in the webUI and explain results.
For example;
```
scala> sql("CREATE TABLE t(c1 int) USING hive")
scala> sql("INSERT INTO t VALUES(1)")
scala> spark.table("t").explain()
== Physical Plan ==
Scan hive default.t [c1#8], HiveTableRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#8]
         ^^^^^^^^^^^
```
<img width="212" alt="spark-pr-hive" src="https://user-images.githubusercontent.com/692303/44501013-51264c80-a6c6-11e8-94f8-0704aee83bb6.png">

Closes #20226

## How was this patch tested?
Added tests in `DataFrameSuite`, `DatasetSuite`, and `HiveExplainSuite`

Closes #22153 from maropu/pr20226.

Lead-authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Co-authored-by: Tejas Patil <tejasp@fb.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2018-08-23 14:26:10 +08:00
Tathagata Das 3106324986 [SPARK-25184][SS] Fixed race condition in StreamExecution that caused flaky test in FlatMapGroupsWithState
## What changes were proposed in this pull request?

The race condition that caused test failure is between 2 threads.
- The MicrobatchExecution thread that processes inputs to produce answers and then generates progress events.
- The test thread that generates some input data, checked the answer and then verified the query generated progress event.

The synchronization structure between these threads is as follows
1. MicrobatchExecution thread, in every batch, does the following in order.
   a. Processes batch input to generate answer.
   b. Signals `awaitProgressLockCondition` to wake up threads waiting for progress using `awaitOffset`
   c. Generates progress event

2. Test execution thread
   a. Calls `awaitOffset` to wait for progress, which waits on `awaitProgressLockCondition`.
   b. As soon as `awaitProgressLockCondition` is signaled, it would move on the in the test to check answer.
  c. Finally, it would verify the last generated progress event.

What can happen is the following sequence of events: 2a -> 1a -> 1b -> 2b -> 2c -> 1c.
In other words, the progress event may be generated after the test tries to verify it.

The solution has two steps.
1. Signal the waiting thread after the progress event has been generated, that is, after `finishTrigger()`.
2. Increase the timeout of `awaitProgressLockCondition.await(100 ms)` to a large value.

This latter is to ensure that test thread for keeps waiting on `awaitProgressLockCondition`until the MicroBatchExecution thread explicitly signals it. With the existing small timeout of 100ms the following sequence can occur.
 - MicroBatchExecution thread updates committed offsets
 - Test thread waiting on `awaitProgressLockCondition` accidentally times out after 100 ms, finds that the committed offsets have been updated, therefore returns from `awaitOffset` and moves on to the progress event tests.
 - MicroBatchExecution thread then generates progress event and signals. But the test thread has already attempted to verify the event and failed.

By increasing the timeout to large (e.g., `streamingTimeoutMs = 60 seconds`, similar to `awaitInitialization`), this above type of race condition is also avoided.

## How was this patch tested?
Ran locally many times.

Closes #22182 from tdas/SPARK-25184.

Authored-by: Tathagata Das <tathagata.das1565@gmail.com>
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
2018-08-22 12:22:53 -07:00
Wenchen Fan e754887182 [SPARK-24882][SQL] improve data source v2 API
## What changes were proposed in this pull request?

Improve the data source v2 API according to the [design doc](https://docs.google.com/document/d/1DDXCTCrup4bKWByTalkXWgavcPdvur8a4eEu8x1BzPM/edit?usp=sharing)

summary of the changes
1. rename `ReadSupport` -> `DataSourceReader` -> `InputPartition` -> `InputPartitionReader` to `BatchReadSupportProvider` -> `BatchReadSupport` -> `InputPartition`/`PartitionReaderFactory` -> `PartitionReader`. Similar renaming also happens at streaming and write APIs.
2. create `ScanConfig` to store query specific information like operator pushdown result, streaming offsets, etc. This makes batch and streaming `ReadSupport`(previouslly named `DataSourceReader`) immutable. All other methods take `ScanConfig` as input, which implies applying operator pushdown and getting streaming offsets happen before all other things(get input partitions, report statistics, etc.).
3. separate `InputPartition` to `InputPartition` and `PartitionReaderFactory`. This is a natural separation, data splitting and reading are orthogonal and we should not mix them in one interfaces. This also makes the naming consistent between read and write API: `PartitionReaderFactory` vs `DataWriterFactory`.
4. separate the batch and streaming interfaces. Sometimes it's painful to force the streaming interface to extend batch interface, as we may need to override some batch methods to return false, or even leak the streaming concept to batch API(e.g. `DataWriterFactory#createWriter(partitionId, taskId, epochId)`)

Some follow-ups we should do after this PR (tracked by https://issues.apache.org/jira/browse/SPARK-25186 ):
1. Revisit the life cycle of `ReadSupport` instances. Currently I keep it same as the previous `DataSourceReader`, i.e. the life cycle is bound to the batch/stream query. This fits streaming very well but may not be perfect for batch source. We can also consider to let `ReadSupport.newScanConfigBuilder` take `DataSourceOptions` as parameter, if we decide to change the life cycle.
2. Add `WriteConfig`. This is similar to `ScanConfig` and makes the write API more flexible. But it's only needed when we add the `replaceWhere` support, and it needs to change the streaming execution engine for this new concept, which I think is better to be done in another PR.
3. Refine the document. This PR adds/changes a lot of document and it's very likely that some people may have better ideas.
4. Figure out the life cycle of `CustomMetrics`. It looks to me that it should be bound to a `ScanConfig`, but we need to change `ProgressReporter` to get the `ScanConfig`. Better to be done in another PR.
5. Better operator pushdown API. This PR keeps the pushdown API as it was, i.e. using the `SupportsPushdownXYZ` traits. We can design a better API using build pattern, but this is a complicated design and deserves an individual JIRA ticket and design doc.
6. Improve the continuous streaming engine to only create a new `ScanConfig` when re-configuring.
7. Remove `SupportsPushdownCatalystFilter`. This is actually not a must-have for file source, we can change the hive partition pruning to use the public `Filter`.

## How was this patch tested?

existing tests.

Closes #22009 from cloud-fan/redesign.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Xiao Li <gatorsmile@gmail.com>
2018-08-22 00:10:55 -07:00
Gengliang Wang ac0174e55a [SPARK-25129][SQL] Make the mapping of com.databricks.spark.avro to built-in module configurable
## What changes were proposed in this pull request?

In https://issues.apache.org/jira/browse/SPARK-24924, the data source provider com.databricks.spark.avro is mapped to the new package org.apache.spark.sql.avro .

As per the discussion in the [Jira](https://issues.apache.org/jira/browse/SPARK-24924) and PR #22119, we should make the mapping configurable.

This PR also improve the error message when data source of Avro/Kafka is not found.

## How was this patch tested?

Unit test

Closes #22133 from gengliangwang/configurable_avro_mapping.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Xiao Li <gatorsmile@gmail.com>
2018-08-21 15:26:24 -07:00
Gengliang Wang 60af2501e1 [SPARK-25160][SQL] Avro: remove sql configuration spark.sql.avro.outputTimestampType
## What changes were proposed in this pull request?

In the PR for supporting logical timestamp types https://github.com/apache/spark/pull/21935, a SQL configuration spark.sql.avro.outputTimestampType is added, so that user can specify the output timestamp precision they want.

With PR https://github.com/apache/spark/pull/21847,  the output file can be written with user specified types.

So there is no need to have such trivial configuration. Otherwise to make it consistent we need to add configuration for all the Catalyst types that can be converted into different Avro types.

This PR also add a test case for user specified output schema with different timestamp types.

## How was this patch tested?

Unit test

Closes #22151 from gengliangwang/removeOutputTimestampType.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2018-08-20 20:42:27 +08:00
Arun Mahadevan 14d7c1c3e9 [SPARK-24863][SS] Report Kafka offset lag as a custom metrics
## What changes were proposed in this pull request?

This builds on top of SPARK-24748 to report 'offset lag' as a custom metrics for Kafka structured streaming source.

This lag is the difference between the latest offsets in Kafka the time the metrics is reported (just after a micro-batch completes) and the latest offset Spark has processed. It can be 0 (or close to 0) if spark keeps up with the rate at which messages are ingested into Kafka topics in steady state. This measures how far behind the spark source has fallen behind (per partition) and can aid in tuning the application.

## How was this patch tested?

Existing and new unit tests

Please review http://spark.apache.org/contributing.html before opening a pull request.

Closes #21819 from arunmahadevan/SPARK-24863.

Authored-by: Arun Mahadevan <arunm@apache.org>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2018-08-18 17:31:52 +08:00
Shixiong Zhu da2dc69291
[SPARK-25116][TESTS] Fix the Kafka cluster leak and clean up cached producers
## What changes were proposed in this pull request?

KafkaContinuousSinkSuite leaks a Kafka cluster because both KafkaSourceTest and KafkaContinuousSinkSuite create a Kafka cluster but `afterAll` only shuts down one cluster. This leaks a Kafka cluster and causes that some Kafka thread crash and kill JVM when SBT is trying to clean up tests.

This PR fixes the leak and also adds a shut down hook to detect Kafka cluster leak.

In additions, it also fixes `AdminClient` leak and cleans up cached producers (When a record is writtn using a producer, the producer will keep refreshing the topic and I don't find an API to clear it except closing the producer) to eliminate the following annoying logs:
```
8/13 15:34:42.568 kafka-admin-client-thread | adminclient-4 WARN NetworkClient: [AdminClient clientId=adminclient-4] Connection to node 0 could not be established. Broker may not be available.
18/08/13 15:34:42.570 kafka-admin-client-thread | adminclient-6 WARN NetworkClient: [AdminClient clientId=adminclient-6] Connection to node 0 could not be established. Broker may not be available.
18/08/13 15:34:42.606 kafka-admin-client-thread | adminclient-8 WARN NetworkClient: [AdminClient clientId=adminclient-8] Connection to node -1 could not be established. Broker may not be available.
18/08/13 15:34:42.729 kafka-producer-network-thread | producer-797 WARN NetworkClient: [Producer clientId=producer-797] Connection to node -1 could not be established. Broker may not be available.
18/08/13 15:34:42.906 kafka-producer-network-thread | producer-1598 WARN NetworkClient: [Producer clientId=producer-1598] Connection to node 0 could not be established. Broker may not be available.
```

I also reverted b5eb54244e introduced by #22097 since it doesn't help.

## How was this patch tested?

Jenkins

Closes #22106 from zsxwing/SPARK-25116.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
2018-08-17 14:21:08 -07:00
Sean Owen b3e6fe7c46 [SPARK-23654][BUILD] remove jets3t as a dependency of spark
## What changes were proposed in this pull request?

Remove jets3t dependency, and bouncy castle which it brings in; update licenses and deps
Note this just takes over https://github.com/apache/spark/pull/21146

## How was this patch tested?

Existing tests.

Closes #22081 from srowen/SPARK-23654.

Authored-by: Sean Owen <srowen@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2018-08-16 12:34:23 -07:00
Shixiong Zhu 80784a1de8
[SPARK-18057][FOLLOW-UP] Use 127.0.0.1 to avoid zookeeper picking up an ipv6 address
## What changes were proposed in this pull request?

I'm still seeing the Kafka tests failed randomly due to `kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING`. I checked the test output and saw zookeeper picked up an ipv6 address. Most details can be found in https://issues.apache.org/jira/browse/KAFKA-7193

This PR just uses `127.0.0.1` rather than `localhost` to make sure zookeeper will never use an ipv6 address.

## How was this patch tested?

Jenkins

Closes #22097 from zsxwing/fix-zookeeper-connect.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
2018-08-14 09:57:01 -07:00
Gengliang Wang ab197308a7
[SPARK-25104][SQL] Avro: Validate user specified output schema
## What changes were proposed in this pull request?

With code changes in https://github.com/apache/spark/pull/21847 , Spark can write out to Avro file as per user provided output schema.

To make it more robust and user friendly, we should validate the Avro schema before tasks launched.

Also we should support output logical decimal type as BYTES (By default we output as FIXED)

## How was this patch tested?

Unit test

Closes #22094 from gengliangwang/AvroSerializerMatch.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
2018-08-14 04:43:14 +00:00
Gengliang Wang 26775e3c8e [SPARK-25099][SQL][TEST] Generate Avro Binary files in test suite
## What changes were proposed in this pull request?

In PR https://github.com/apache/spark/pull/21984 and https://github.com/apache/spark/pull/21935 , the related test cases are using binary files created by Python scripts.

Generate the binary files in test suite to make it more transparent.  Also we can

Also move the related test cases to a new file `AvroLogicalTypeSuite.scala`.

## How was this patch tested?

Unit test.

Closes #22091 from gengliangwang/logicalType_suite.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2018-08-13 20:50:28 +08:00
Gengliang Wang be2238fb50 [SPARK-24774][SQL] Avro: Support logical decimal type
## What changes were proposed in this pull request?

Support Avro logical date type:
https://avro.apache.org/docs/1.8.2/spec.html#Decimal

## How was this patch tested?
Unit test

Closes #22037 from gengliangwang/avro_decimal.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2018-08-13 08:29:07 +08:00
Brian Lindblom 0cea9e3cd0
[SPARK-24855][SQL][EXTERNAL] Built-in AVRO support should support specified schema on write
## What changes were proposed in this pull request?

Allows `avroSchema` option to be specified on write, allowing a user to specify a schema in cases where this is required.  A trivial use case is reading in an avro dataset, making some small adjustment to a column or columns and writing out using the same schema.  Implicit schema creation from SQL Struct results in a schema that while for the most part, is functionally similar, is not necessarily compatible.

Allows `fixed` Field type to be utilized for records of specified `avroSchema`

## How was this patch tested?

Unit tests in AvroSuite are extended to test this with enum and fixed types.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Closes #21847 from lindblombr/specify_schema_on_write.

Lead-authored-by: Brian Lindblom <blindblom@apple.com>
Co-authored-by: DB Tsai <d_tsai@apple.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
2018-08-10 03:35:29 +00:00
Kazuaki Ishizaki 56e9e97073 [MINOR][DOC] Fix typo
## What changes were proposed in this pull request?

This PR fixes typo regarding `auxiliary verb + verb[s]`. This is a follow-on of #21956.

## How was this patch tested?

N/A

Closes #22040 from kiszk/spellcheck1.

Authored-by: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2018-08-09 20:10:17 +08:00
Gengliang Wang 819c4de45a [SPARK-24772][SQL] Avro: support logical date type
## What changes were proposed in this pull request?

Support Avro logical date type:
https://avro.apache.org/docs/1.8.2/spec.html#Date

## How was this patch tested?

Unit test

Closes #21984 from gengliangwang/avro_date.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2018-08-07 17:24:25 +08:00
Wenchen Fan ac527b5205 [SPARK-24991][SQL] use InternalRow in DataSourceWriter
## What changes were proposed in this pull request?

A follow up of #21118

Since we use `InternalRow` in the read API of data source v2, we should do the same thing for the write API.

## How was this patch tested?

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21948 from cloud-fan/row-write.
2018-08-06 15:52:01 +08:00
Yuval Itzchakov b7fdf8eb20 [SPARK-24987][SS] - Fix Kafka consumer leak when no new offsets for TopicPartition
## What changes were proposed in this pull request?

This small fix adds a `consumer.release()` call to `KafkaSourceRDD` in the case where we've retrieved offsets from Kafka, but the `fromOffset` is equal to the `lastOffset`, meaning there is no new data to read for a particular topic partition. Up until now, we'd just return an empty iterator without closing the consumer which would cause a FD leak.

If accepted, this pull request should be merged into master as well.

## How was this patch tested?

Haven't ran any specific tests, would love help on how to test methods running inside `RDD.compute`.

Author: Yuval Itzchakov <yuval.itzchakov@clicktale.com>

Closes #21997 from YuvalItzchakov/master.
2018-08-04 14:44:10 -05:00
Sean Owen 4c27663cb2
[SPARK-18057][FOLLOW-UP][SS] Update Kafka client version from 0.10.0.1 to 2.0.0
## What changes were proposed in this pull request?

Increase ZK timeout and harmonize configs across Kafka tests to resol…ve potentially flaky test failure

## How was this patch tested?

Existing tests

Author: Sean Owen <srowen@gmail.com>

Closes #21995 from srowen/SPARK-18057.3.
2018-08-03 16:22:54 -07:00
Sean Owen c32dbd6bd5 [SPARK-18057][FOLLOW-UP][SS] Update Kafka client version from 0.10.0.1 to 2.0.0
## What changes were proposed in this pull request?

Update to kafka 2.0.0 in streaming-kafka module, and remove override for Scala 2.12. It won't compile for 2.12 otherwise.

## How was this patch tested?

Existing tests.

Author: Sean Owen <srowen@gmail.com>

Closes #21955 from srowen/SPARK-18057.2.
2018-08-03 08:17:18 -05:00
DB Tsai 273b28404c
[SPARK-24993][SQL] Make Avro Fast Again
## What changes were proposed in this pull request?

When lindblombr at apple developed [SPARK-24855](https://github.com/apache/spark/pull/21847) to support specified schema on write, we found a performance regression in Avro writer for our dataset.

With this PR, the performance is improved, but not as good as Spark 2.3 + the old avro writer. There must be something we miss which we need to investigate further.

Spark 2.4
```
spark git:(master) ./build/mvn -DskipTests clean package
spark git:(master) bin/spark-shell --jars external/avro/target/spark-avro_2.11-2.4.0-SNAPSHOT.jar
```

Spark 2.3 + databricks avro
```
spark git:(branch-2.3) ./build/mvn -DskipTests clean package
spark git:(branch-2.3) bin/spark-shell --packages com.databricks:spark-avro_2.11:4.0.0
```

Current master:
```
+-------+--------------------+
|summary|          writeTimes|
+-------+--------------------+
|  count|                 100|
|   mean|             2.95621|
| stddev|0.030895815479469294|
|    min|               2.915|
|    max|               3.049|
+-------+--------------------+

+-------+--------------------+
|summary|           readTimes|
+-------+--------------------+
|  count|                 100|
|   mean| 0.31072999999999995|
| stddev|0.054139709842390006|
|    min|               0.259|
|    max|               0.692|
+-------+--------------------+
```

Current master with this PR:
```
+-------+--------------------+
|summary|          writeTimes|
+-------+--------------------+
|  count|                 100|
|   mean|  2.5804300000000002|
| stddev|0.011175600225672079|
|    min|               2.558|
|    max|                2.62|
+-------+--------------------+

+-------+--------------------+
|summary|           readTimes|
+-------+--------------------+
|  count|                 100|
|   mean| 0.29922000000000004|
| stddev|0.058261961532514166|
|    min|               0.251|
|    max|               0.732|
+-------+--------------------+
```

Spark 2.3 + databricks avro:
```
+-------+--------------------+
|summary|          writeTimes|
+-------+--------------------+
|  count|                 100|
|   mean|  1.7730500000000005|
| stddev|0.025199156230863575|
|    min|               1.729|
|    max|               1.833|
+-------+--------------------+

+-------+-------------------+
|summary|          readTimes|
+-------+-------------------+
|  count|                100|
|   mean|            0.29715|
| stddev|0.05685643358850465|
|    min|              0.258|
|    max|              0.718|
+-------+-------------------+
```

The following is the test code to reproduce the result.
```scala
    spark.sqlContext.setConf("spark.sql.avro.compression.codec", "uncompressed")
    val sparkSession = spark
    import sparkSession.implicits._
    val df = spark.sparkContext.range(1, 3000).repartition(1).map { uid =>
      val features = Array.fill(16000)(scala.math.random)
      (uid, scala.math.random, java.util.UUID.randomUUID().toString, java.util.UUID.randomUUID().toString, features)
    }.toDF("uid", "random", "uuid1", "uuid2", "features").cache()
    val size = df.count()

    // Write into ramdisk to rule out the disk IO impact
    val tempSaveDir = s"/Volumes/ramdisk/${java.util.UUID.randomUUID()}/"
    val n = 150
    val writeTimes = new Array[Double](n)
    var i = 0
    while (i < n) {
      val t1 = System.currentTimeMillis()
      df.write
        .format("com.databricks.spark.avro")
        .mode("overwrite")
        .save(tempSaveDir)
      val t2 = System.currentTimeMillis()
      writeTimes(i) = (t2 - t1) / 1000.0
      i += 1
    }

    df.unpersist()

    // The first 50 runs are for warm-up
    val readTimes = new Array[Double](n)
    i = 0
    while (i < n) {
      val t1 = System.currentTimeMillis()
      val readDF = spark.read.format("com.databricks.spark.avro").load(tempSaveDir)
      assert(readDF.count() == size)
      val t2 = System.currentTimeMillis()
      readTimes(i) = (t2 - t1) / 1000.0
      i += 1
    }

    spark.sparkContext.parallelize(writeTimes.slice(50, 150)).toDF("writeTimes").describe("writeTimes").show()
    spark.sparkContext.parallelize(readTimes.slice(50, 150)).toDF("readTimes").describe("readTimes").show()
```

## How was this patch tested?

Existing tests.

Author: DB Tsai <d_tsai@apple.com>
Author: Brian Lindblom <blindblom@apple.com>

Closes #21952 from dbtsai/avro-performance-fix.
2018-08-03 07:43:54 +00:00
Gengliang Wang f45d60a5a1 [SPARK-25002][SQL] Avro: revise the output record namespace
## What changes were proposed in this pull request?

Currently the output namespace is starting with ".", e.g. `.topLevelRecord`

Although it is valid according to Avro spec, we should remove the starting dot in case of failures when the output Avro file is read by other lib:

https://github.com/linkedin/goavro/pull/96

## How was this patch tested?

Unit test

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21974 from gengliangwang/avro_namespace.
2018-08-03 13:28:44 +08:00
Gengliang Wang 7cf16a7fa4 [SPARK-24773] Avro: support logical timestamp type with different precisions
## What changes were proposed in this pull request?

Support reading/writing Avro logical timestamp type with different precisions
https://avro.apache.org/docs/1.8.2/spec.html#Timestamp+%28millisecond+precision%29

To specify the output timestamp type, use Dataframe option `outputTimestampType`  or SQL config `spark.sql.avro.outputTimestampType`.  The supported values are
* `TIMESTAMP_MICROS`
* `TIMESTAMP_MILLIS`

The default output type is `TIMESTAMP_MICROS`
## How was this patch tested?

Unit test

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21935 from gengliangwang/avro_timestamp.
2018-08-03 08:32:08 +08:00
Stavros Kontopoulos a65736996b [SPARK-14540][CORE] Fix remaining major issues for Scala 2.12 Support
## What changes were proposed in this pull request?
This PR addresses issues 2,3 in this [document](https://docs.google.com/document/d/1fbkjEL878witxVQpOCbjlvOvadHtVjYXeB-2mgzDTvk).

* We modified the closure cleaner to identify closures that are implemented via the LambdaMetaFactory mechanism (serializedLambdas) (issue2).

* We also fix the issue due to scala/bug#11016. There are two options for solving the Unit issue, either add () at the end of the closure or use the trick described in the doc. Otherwise overloading resolution does not work (we are not going to eliminate either of the methods) here. Compiler tries to adapt to Unit and makes these two methods candidates for overloading, when there is polymorphic overloading there is no ambiguity (that is the workaround implemented). This does not look that good but it serves its purpose as we need to support two different uses for method: `addTaskCompletionListener`. One that passes a TaskCompletionListener and one that passes a closure that is wrapped with a TaskCompletionListener later on (issue3).

Note: regarding issue 1 in the doc the plan is:

> Do Nothing. Don’t try to fix this as this is only a problem for Java users who would want to use 2.11 binaries. In that case they can cast to MapFunction to be able to utilize lambdas. In Spark 3.0.0 the API should be simplified so that this issue is removed.

## How was this patch tested?
This was manually tested:
```./dev/change-scala-version.sh 2.12
./build/mvn -DskipTests -Pscala-2.12 clean package
./build/mvn -Pscala-2.12 clean package -DwildcardSuites=org.apache.spark.serializer.ProactiveClosureSerializationSuite -Dtest=None
./build/mvn -Pscala-2.12 clean package -DwildcardSuites=org.apache.spark.util.ClosureCleanerSuite -Dtest=None
./build/mvn -Pscala-2.12 clean package -DwildcardSuites=org.apache.spark.streaming.DStreamClosureSuite -Dtest=None```

Author: Stavros Kontopoulos <stavros.kontopoulos@lightbend.com>

Closes #21930 from skonto/scala2.12-sup.
2018-08-02 09:17:09 -05:00
tedyu e82784d13f [SPARK-18057][SS] Update Kafka client version from 0.10.0.1 to 2.0.0
## What changes were proposed in this pull request?

This PR upgrades to the Kafka 2.0.0 release where KIP-266 is integrated.

## How was this patch tested?

This PR uses existing Kafka related unit tests

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: tedyu <yuzhihong@gmail.com>

Closes #21488 from tedyu/master.
2018-07-31 13:14:14 -07:00
Maxim Gekk d20c10fdf3 [SPARK-24952][SQL] Support LZMA2 compression by Avro datasource
## What changes were proposed in this pull request?

In the PR, I propose to support `LZMA2` (`XZ`) and `BZIP2` compressions by `AVRO` datasource  in write since the codecs may have better characteristics like compression ratio and speed comparing to already supported `snappy` and `deflate` codecs.

## How was this patch tested?

It was tested manually and by an existing test which was extended to check the `xz` and `bzip2` compressions.

Author: Maxim Gekk <maxim.gekk@databricks.com>

Closes #21902 from MaxGekk/avro-xz-bzip2.
2018-07-31 09:12:57 +08:00
Takeshi Yamamuro 47d84e4d0e [SPARK-22814][SQL] Support Date/Timestamp in a JDBC partition column
## What changes were proposed in this pull request?
This pr supported Date/Timestamp in a JDBC partition column (a numeric column is only supported in the master). This pr also modified code to verify a partition column type;
```
val jdbcTable = spark.read
 .option("partitionColumn", "text")
 .option("lowerBound", "aaa")
 .option("upperBound", "zzz")
 .option("numPartitions", 2)
 .jdbc("jdbc:postgresql:postgres", "t", options)

// with this pr
org.apache.spark.sql.AnalysisException: Partition column type should be numeric, date, or timestamp, but string found.;
  at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.verifyAndGetNormalizedPartitionColumn(JDBCRelation.scala:165)
  at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.columnPartition(JDBCRelation.scala:85)
  at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:36)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:317)

// without this pr
java.lang.NumberFormatException: For input string: "aaa"
  at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
  at java.lang.Long.parseLong(Long.java:589)
  at java.lang.Long.parseLong(Long.java:631)
  at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:277)
```

Closes #19999

## How was this patch tested?
Added tests in `JDBCSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #21834 from maropu/SPARK-22814.
2018-07-30 07:42:00 -07:00
hyukjinkwon fca0b8528e [SPARK-24967][SQL] Avro: Use internal.Logging instead for logging
## What changes were proposed in this pull request?

Looks Avro uses direct `getLogger` to create a SLF4J logger. Should better use `internal.Logging` instead.

## How was this patch tested?

Exiting tests.

Author: hyukjinkwon <gurwls223@apache.org>

Closes #21914 from HyukjinKwon/avro-log.
2018-07-30 21:13:08 +08:00
Xiao Li c6a3db2fb6 [SPARK-24924][SQL][FOLLOW-UP] Add mapping for built-in Avro data source
## What changes were proposed in this pull request?
Add one more test case for `com.databricks.spark.avro`.

## How was this patch tested?
N/A

Author: Xiao Li <gatorsmile@gmail.com>

Closes #21906 from gatorsmile/avro.
2018-07-28 13:43:32 +08:00
Maxim Gekk 0a0f68bae6 [SPARK-24881][SQL] New Avro option - compression
## What changes were proposed in this pull request?

In the PR, I added new option for Avro datasource - `compression`. The option allows to specify compression codec for saved Avro files. This option is similar to `compression` option in another datasources like `JSON` and `CSV`.

Also I added the SQL configs `spark.sql.avro.compression.codec` and `spark.sql.avro.deflate.level`. I put the configs into `SQLConf`. If the `compression` option is not specified by an user, the first SQL config is taken into account.

## How was this patch tested?

I added new test which read meta info from written avro files and checks `avro.codec` property.

Author: Maxim Gekk <maxim.gekk@databricks.com>

Closes #21837 from MaxGekk/avro-compression.
2018-07-28 00:11:32 +08:00
Gengliang Wang fa09d91925 [SPARK-24919][BUILD] New linter rule for sparkContext.hadoopConfiguration
## What changes were proposed in this pull request?

In most cases, we should use `spark.sessionState.newHadoopConf()` instead of `sparkContext.hadoopConfiguration`, so that the hadoop configurations specified in Spark session
configuration will come into effect.

Add a rule matching `spark.sparkContext.hadoopConfiguration` or `spark.sqlContext.sparkContext.hadoopConfiguration` to prevent the usage.
## How was this patch tested?

Unit test

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21873 from gengliangwang/linterRule.
2018-07-26 16:50:59 -07:00
Dongjoon Hyun 58353d7f4b [SPARK-24924][SQL] Add mapping for built-in Avro data source
## What changes were proposed in this pull request?

This PR aims to the followings.
1. Like `com.databricks.spark.csv` mapping, we had better map `com.databricks.spark.avro` to built-in Avro data source.
2. Remove incorrect error message, `Please find an Avro package at ...`.

## How was this patch tested?

Pass the newly added tests.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #21878 from dongjoon-hyun/SPARK-24924.
2018-07-26 16:11:03 +08:00
Gengliang Wang c44eb561ec [SPARK-24768][FOLLOWUP][SQL] Avro migration followup: change artifactId to spark-avro
## What changes were proposed in this pull request?
After rethinking on the artifactId, I think it should be `spark-avro` instead of `spark-sql-avro`, which is simpler, and consistent with the previous artifactId. I think we need to change it before Spark 2.4 release.

Also a tiny change: use `spark.sessionState.newHadoopConf()` to get the hadoop configuration, thus the related hadoop configurations in SQLConf will come into effect.

## How was this patch tested?

Unit test

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21866 from gengliangwang/avro_followup.
2018-07-25 08:42:45 -07:00
Ryan Blue 9d27541a85 [SPARK-23325] Use InternalRow when reading with DataSourceV2.
## What changes were proposed in this pull request?

This updates the DataSourceV2 API to use InternalRow instead of Row for the default case with no scan mix-ins.

Support for readers that produce Row is added through SupportsDeprecatedScanRow, which matches the previous API. Readers that used Row now implement this class and should be migrated to InternalRow.

Readers that previously implemented SupportsScanUnsafeRow have been migrated to use no SupportsScan mix-ins and produce InternalRow.

## How was this patch tested?

This uses existing tests.

Author: Ryan Blue <blue@apache.org>

Closes #21118 from rdblue/SPARK-23325-datasource-v2-internal-row.
2018-07-24 10:46:36 -07:00
Gengliang Wang 08e315f633 [SPARK-24887][SQL] Avro: use SerializableConfiguration in Spark utils to deduplicate code
## What changes were proposed in this pull request?

To implement the method `buildReader` in `FileFormat`, it is required to serialize the hadoop configuration for executors.

Previous spark-avro uses its own class `SerializableConfiguration` for the serialization. As now it is part of Spark, we can use SerializableConfiguration in Spark util to deduplicate the code.

## How was this patch tested?

Unit test

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21846 from gengliangwang/removeSerializableConfiguration.
2018-07-23 08:31:48 -07:00
Gengliang Wang f59de52a2a [SPARK-24883][SQL] Avro: remove implicit class AvroDataFrameWriter/AvroDataFrameReader
## What changes were proposed in this pull request?

As per Reynold's comment: https://github.com/apache/spark/pull/21742#discussion_r203496489

It makes sense to remove the implicit class AvroDataFrameWriter/AvroDataFrameReader, since the Avro package is external module.

## How was this patch tested?

Unit test

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21841 from gengliangwang/removeImplicit.
2018-07-23 15:27:33 +08:00
Gengliang Wang 8817c68f50 [SPARK-24811][SQL] Avro: add new function from_avro and to_avro
## What changes were proposed in this pull request?

1. Add a new function from_avro for parsing a binary column of avro format and converting it into its corresponding catalyst value.

2. Add a new function to_avro for converting a column into binary of avro format with the specified schema.

I created #21774 for this, but it failed the build https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Compile/job/spark-master-compile-maven-hadoop-2.6/7902/

Additional changes In this PR:
1. Add `scalacheck` dependency in pom.xml to resolve the failure.
2. Update the `log4j.properties` to make it consistent with other modules.

## How was this patch tested?

Unit test
Compile with different commands:
```
./build/mvn --force -DzincPort=3643 -DskipTests -Phadoop-2.6 -Phive-thriftserver -Pkinesis-asl -Pspark-ganglia-lgpl -Pmesos -Pyarn  compile test-compile
./build/mvn --force -DzincPort=3643 -DskipTests -Phadoop-2.7 -Phive-thriftserver -Pkinesis-asl -Pspark-ganglia-lgpl -Pmesos -Pyarn  compile test-compile
./build/mvn --force -DzincPort=3643 -DskipTests -Phadoop-3.1 -Phive-thriftserver -Pkinesis-asl -Pspark-ganglia-lgpl -Pmesos -Pyarn  compile test-compile
```

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21838 from gengliangwang/from_and_to_avro.
2018-07-22 17:36:57 -07:00
Maxim Gekk 106880edcd [SPARK-24836][SQL] New option for Avro datasource - ignoreExtension
## What changes were proposed in this pull request?

I propose to add new option for AVRO datasource which should control ignoring of files without `.avro` extension in read. The option has name `ignoreExtension` with default value `true`. If both options `ignoreExtension` and `avro.mapred.ignore.inputs.without.extension` are set, `ignoreExtension` overrides the former one. Here is an example of usage:

```
spark
  .read
  .option("ignoreExtension", false)
  .avro("path to avro files")
```

## How was this patch tested?

I added a test which checks the option directly and a test for checking that new option overrides hadoop's config.

Author: Maxim Gekk <maxim.gekk@databricks.com>

Closes #21798 from MaxGekk/avro-ignore-extension.
2018-07-20 20:04:40 -07:00
Gengliang Wang 00b864aa70 [SPARK-24876][SQL] Avro: simplify schema serialization
## What changes were proposed in this pull request?

Previously in the refactoring of Avro Serializer and Deserializer, a new class SerializableSchema is created for serializing the Avro schema:
https://github.com/apache/spark/pull/21762/files#diff-01fea32e6ec6bcf6f34d06282e08705aR37

On second thought, we can use `toString` method for serialization. After that, parse the JSON format schema on executor. This makes the code much simpler.

## How was this patch tested?

Unit test

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21829 from gengliangwang/removeSerializableSchema.
2018-07-20 14:57:59 -07:00
Xiao Li 9ad77b3037 Revert "[SPARK-24811][SQL] Avro: add new function from_avro and to_avro"
This reverts commit 244bcff194.
2018-07-20 12:55:38 -07:00
Gengliang Wang 244bcff194 [SPARK-24811][SQL] Avro: add new function from_avro and to_avro
## What changes were proposed in this pull request?

Add a new function from_avro for parsing a binary column of avro format and converting it into its corresponding catalyst value.

Add a new function to_avro for converting a column into binary of avro format with the specified schema.

This PR is in progress. Will add test cases.
## How was this patch tested?

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21774 from gengliangwang/from_and_to_avro.
2018-07-20 09:19:29 -07:00
Marco Gaido a5925c1631 [SPARK-24268][SQL] Use datatype.catalogString in error messages
## What changes were proposed in this pull request?

As stated in https://github.com/apache/spark/pull/21321, in the error messages we should use `catalogString`. This is not the case, as SPARK-22893 used `simpleString` in order to have the same representation everywhere and it missed some places.

The PR unifies the messages using alway the `catalogString` representation of the dataTypes in the messages.

## How was this patch tested?

existing/modified UTs

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21804 from mgaido91/SPARK-24268_catalog.
2018-07-19 23:29:29 -07:00
Maxim Gekk cd5d93c0e4 [SPARK-24854][SQL] Gathering all Avro options into the AvroOptions class
## What changes were proposed in this pull request?

In the PR, I propose to put all `Avro` options in new class `AvroOptions` in the same way as for other datasources `JSON` and `CSV`.

## How was this patch tested?

It was tested by `AvroSuite`

Author: Maxim Gekk <maxim.gekk@databricks.com>

Closes #21810 from MaxGekk/avro-options.
2018-07-19 09:16:16 +08:00
Takuya UESHIN 34cb3b54e9 [SPARK-24386][SPARK-24768][BUILD][FOLLOWUP] Fix lint-java and Scala 2.12 build.
## What changes were proposed in this pull request?

This pr fixes lint-java and Scala 2.12 build.

lint-java:

```
[ERROR] src/test/resources/log4j.properties:[0] (misc) NewlineAtEndOfFile: File does not end with a newline.
```

Scala 2.12 build:

```
[error] /.../sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala:121: overloaded method value addTaskCompletionListener with alternatives:
[error]   (f: org.apache.spark.TaskContext => Unit)org.apache.spark.TaskContext <and>
[error]   (listener: org.apache.spark.util.TaskCompletionListener)org.apache.spark.TaskContext
[error]  cannot be applied to (org.apache.spark.TaskContext => java.util.List[Runnable])
[error]       context.addTaskCompletionListener { ctx =>
[error]               ^
```

## How was this patch tested?

Manually executed lint-java and Scala 2.12 build in my local environment.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #21801 from ueshin/issues/SPARK-24386_24768/fix_build.
2018-07-18 19:17:18 +08:00
Maxim Gekk ba437fc5c7 [SPARK-24805][SQL] Do not ignore avro files without extensions by default
## What changes were proposed in this pull request?

In the PR, I propose to change default behaviour of AVRO datasource which currently ignores files without `.avro` extension in read by default. This PR sets the default value for `avro.mapred.ignore.inputs.without.extension` to `false` in the case if the parameter is not set by an user.

## How was this patch tested?

Added a test file without extension in AVRO format, and new test for reading the file with and wihout specified schema.

Author: Maxim Gekk <maxim.gekk@databricks.com>
Author: Maxim Gekk <max.gekk@gmail.com>

Closes #21769 from MaxGekk/avro-without-extension.
2018-07-16 14:35:44 -07:00
Maxim Gekk 9f929458fb [SPARK-24810][SQL] Fix paths to test files in AvroSuite
## What changes were proposed in this pull request?

In the PR, I propose to move `testFile()` to the common trait `SQLTestUtilsBase` and wrap test files in `AvroSuite` by the method `testFile()` which returns full paths to test files in the resource folder.

Author: Maxim Gekk <maxim.gekk@databricks.com>

Closes #21773 from MaxGekk/test-file.
2018-07-15 23:01:36 -07:00
Gengliang Wang 9603087638 [SPARK-24800][SQL] Refactor Avro Serializer and Deserializer
## What changes were proposed in this pull request?
Currently the Avro Deserializer converts input Avro format data to `Row`, and then convert the `Row` to `InternalRow`.
While the Avro Serializer converts `InternalRow` to `Row`, and then output Avro format data.
This PR allows direct conversion between `InternalRow` and Avro format data.

## How was this patch tested?

Unit test

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21762 from gengliangwang/avro_io.
2018-07-15 22:06:33 +08:00
Gengliang Wang 3e7dc82960 [SPARK-24776][SQL] Avro unit test: deduplicate code and replace deprecated methods
## What changes were proposed in this pull request?

Improve Avro unit test:
1. use QueryTest/SharedSQLContext/SQLTestUtils, instead of the duplicated test utils.
2. replace deprecated methods

This is a follow up PR for #21760, the PR passes pull request tests but failed in: https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Compile/job/spark-master-compile-maven-hadoop-2.6/7842/

This PR is to fix it.
## How was this patch tested?
Unit test.
Compile with different commands:

```
./build/mvn --force -DzincPort=3643 -DskipTests -Phadoop-2.6 -Phive-thriftserver -Pkinesis-asl -Pspark-ganglia-lgpl -Pmesos -Pyarn  compile test-compile
./build/mvn --force -DzincPort=3643 -DskipTests -Phadoop-2.7 -Phive-thriftserver -Pkinesis-asl -Pspark-ganglia-lgpl -Pmesos -Pyarn  compile test-compile
./build/mvn --force -DzincPort=3643 -DskipTests -Phadoop-3.1 -Phive-thriftserver -Pkinesis-asl -Pspark-ganglia-lgpl -Pmesos -Pyarn  compile test-compile

```

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21768 from gengliangwang/improve_avro_test.
2018-07-14 21:36:56 -07:00
Xiao Li 3bcb1b4814 Revert "[SPARK-24776][SQL] Avro unit test: use SQLTestUtils and replace deprecated methods"
This reverts commit c1b62e420a.
2018-07-13 10:06:26 -07:00
Gengliang Wang c1b62e420a [SPARK-24776][SQL] Avro unit test: use SQLTestUtils and replace deprecated methods
## What changes were proposed in this pull request?
Improve Avro unit test:
1. use QueryTest/SharedSQLContext/SQLTestUtils, instead of the duplicated test utils.
2. replace deprecated methods

## How was this patch tested?

Unit test

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21760 from gengliangwang/improve_avro_test.
2018-07-13 08:55:46 -07:00
Yuanbo Liu 0f24c6f8ab [SPARK-24713] AppMatser of spark streaming kafka OOM if there are hund…
We have hundreds of kafka topics need to be consumed in one application. The application master will throw OOM exception after hanging for nearly half of an hour.

OOM happens in the env with a lot of topics, and it's not convenient to set up such kind of env in the unit test. So I didn't change/add test case.

Author: Yuanbo Liu <yuanbo@Yuanbos-MacBook-Air.local>
Author: yuanbo <yuanbo@apache.org>

Closes #21690 from yuanboliu/master.
2018-07-13 07:37:24 -06:00
Gengliang Wang 395860a986 [SPARK-24768][SQL] Have a built-in AVRO data source implementation
## What changes were proposed in this pull request?

Apache Avro (https://avro.apache.org) is a popular data serialization format. It is widely used in the Spark and Hadoop ecosystem, especially for Kafka-based data pipelines.  Using the external package https://github.com/databricks/spark-avro, Spark SQL can read and write the avro data. Making spark-Avro built-in can provide a better experience for first-time users of Spark SQL and structured streaming. We expect the built-in Avro data source can further improve the adoption of structured streaming.
The proposal is to inline code from spark-avro package (https://github.com/databricks/spark-avro). The target release is Spark 2.4.

[Built-in AVRO Data Source In Spark 2.4.pdf](https://github.com/apache/spark/files/2181511/Built-in.AVRO.Data.Source.In.Spark.2.4.pdf)

## How was this patch tested?

Unit test

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21742 from gengliangwang/export_avro.
2018-07-12 13:55:25 -07:00
Yash Sharma 9fa4a1ed38 [SPARK-20168][STREAMING KINESIS] Setting the timestamp directly would cause exception on …
Setting the timestamp directly would cause exception on reading stream, it can be set directly only if the mode is not AT_TIMESTAMP

## What changes were proposed in this pull request?

The last patch in the kinesis streaming receiver sets the timestamp for the mode AT_TIMESTAMP, but this mode can only be set via the

`baseClientLibConfiguration.withTimestampAtInitialPositionInStream()
`
and can't be set directly using
`.withInitialPositionInStream()`

This patch fixes the issue.

## How was this patch tested?
Kinesis Receiver doesn't expose the internal state outside, so couldn't find the right way to test this change. Seeking for tips from other contributors here.

Author: Yash Sharma <ysharma@atlassian.com>

Closes #21541 from yashs360/ysharma/fix_kinesis_bug.
2018-07-12 10:04:47 -07:00
Xiao Li aec966b05e Revert "[SPARK-24268][SQL] Use datatype.simpleString in error messages"
This reverts commit 1bd3d61f41.
2018-07-09 14:24:23 -07:00
Marco Gaido 1bd3d61f41 [SPARK-24268][SQL] Use datatype.simpleString in error messages
## What changes were proposed in this pull request?

SPARK-22893 tried to unify error messages about dataTypes. Unfortunately, still many places were missing the `simpleString` method in other to have the same representation everywhere.

The PR unified the messages using alway the simpleString representation of the dataTypes in the messages.

## How was this patch tested?

existing/modified UTs

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21321 from mgaido91/SPARK-24268.
2018-07-09 22:59:05 +08:00
Marcelo Vanzin 6d16b9885d [SPARK-24552][CORE][SQL] Use task ID instead of attempt number for writes.
This passes the unique task attempt id instead of attempt number to v2 data sources because attempt number is reused when stages are retried. When attempt numbers are reused, sources that track data by partition id and attempt number may incorrectly clean up data because the same attempt number can be both committed and aborted.

For v1 / Hadoop writes, generate a unique ID based on available attempt numbers to avoid a similar problem.

Closes #21558

Author: Marcelo Vanzin <vanzin@cloudera.com>
Author: Ryan Blue <blue@apache.org>

Closes #21606 from vanzin/SPARK-24552.2.
2018-06-25 16:54:57 -07:00
Shixiong Zhu 53c06ddabb [SPARK-24332][SS][MESOS] Fix places reading 'spark.network.timeout' as milliseconds
## What changes were proposed in this pull request?

This PR replaces `getTimeAsMs` with `getTimeAsSeconds` to fix the issue that reading "spark.network.timeout" using a wrong time unit when the user doesn't specify a time out.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <zsxwing@gmail.com>

Closes #21382 from zsxwing/fix-network-timeout-conf.
2018-05-24 13:00:24 -07:00
Gabor Somogyi 79e06faa4e [SPARK-19185][DSTREAMS] Avoid concurrent use of cached consumers in CachedKafkaConsumer
## What changes were proposed in this pull request?

`CachedKafkaConsumer` in the project streaming-kafka-0-10 is designed to maintain a pool of KafkaConsumers that can be reused. However, it was built with the assumption there will be only one thread trying to read the same Kafka TopicPartition at the same time. This assumption is not true all the time and this can inadvertently lead to ConcurrentModificationException.

Here is a better way to design this. The consumer pool should be smart enough to avoid concurrent use of a cached consumer. If there is another request for the same TopicPartition as a currently in-use consumer, the pool should automatically return a fresh consumer.

- There are effectively two kinds of consumer that may be generated
  - Cached consumer - this should be returned to the pool at task end
  - Non-cached consumer - this should be closed at task end
- A trait called `KafkaDataConsumer` is introduced to hide this difference from the users of the consumer so that the client code does not have to reason about whether to stop and release. They simply call `val consumer = KafkaDataConsumer.acquire` and then `consumer.release`.
- If there is request for a consumer that is in-use, then a new consumer is generated.
- If there is request for a consumer which is a task reattempt, then already existing cached consumer will be invalidated and a new consumer is generated. This could fix potential issues if the source of the reattempt is a malfunctioning consumer.
- In addition, I renamed the `CachedKafkaConsumer` class to `KafkaDataConsumer` because is a misnomer given that what it returns may or may not be cached.

## How was this patch tested?

A new stress test that verifies it is safe to concurrently get consumers for the same TopicPartition from the consumer pool.

Author: Gabor Somogyi <gabor.g.somogyi@gmail.com>

Closes #20997 from gaborgsomogyi/SPARK-19185.
2018-05-22 13:43:45 -07:00
Arun Mahadevan 710e4e81a8 [SPARK-24308][SQL] Handle DataReaderFactory to InputPartition rename in left over classes
## What changes were proposed in this pull request?

SPARK-24073 renames DataReaderFactory -> InputPartition and DataReader -> InputPartitionReader. Some classes still reflects the old name and causes confusion. This patch renames the left over classes to reflect the new interface and fixes a few comments.

## How was this patch tested?

Existing unit tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Arun Mahadevan <arunm@apache.org>

Closes #21355 from arunmahadevan/SPARK-24308.
2018-05-18 14:37:01 -07:00
Ryan Blue 62d01391fe [SPARK-24073][SQL] Rename DataReaderFactory to InputPartition.
## What changes were proposed in this pull request?

Renames:
* `DataReaderFactory` to `InputPartition`
* `DataReader` to `InputPartitionReader`
* `createDataReaderFactories` to `planInputPartitions`
* `createUnsafeDataReaderFactories` to `planUnsafeInputPartitions`
* `createBatchDataReaderFactories` to `planBatchInputPartitions`

This fixes the changes in SPARK-23219, which renamed ReadTask to
DataReaderFactory. The intent of that change was to make the read and
write API match (write side uses DataWriterFactory), but the underlying
problem is that the two classes are not equivalent.

ReadTask/DataReader function as Iterable/Iterator. One InputPartition is
a specific partition of the data to be read, in contrast to
DataWriterFactory where the same factory instance is used in all write
tasks. InputPartition's purpose is to manage the lifecycle of the
associated reader, which is now called InputPartitionReader, with an
explicit create operation to mirror the close operation. This was no
longer clear from the API because DataReaderFactory appeared to be more
generic than it is and it isn't clear why a set of them is produced for
a read.

## How was this patch tested?

Existing tests, which have been updated to use the new name.

Author: Ryan Blue <blue@apache.org>

Closes #21145 from rdblue/SPARK-24073-revert-data-reader-factory-rename.
2018-05-09 21:48:54 -07:00
Tathagata Das d1eb8d3ddc [SPARK-24094][SS][MINOR] Change description strings of v2 streaming sources to reflect the change
## What changes were proposed in this pull request?

This makes it easy to understand at runtime which version is running. Great for debugging production issues.

## How was this patch tested?
Not necessary.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #21160 from tdas/SPARK-24094.
2018-04-25 23:24:05 -07:00
Tathagata Das 396938ef02 [SPARK-24050][SS] Calculate input / processing rates correctly for DataSourceV2 streaming sources
## What changes were proposed in this pull request?

In some streaming queries, the input and processing rates are not calculated at all (shows up as zero) because MicroBatchExecution fails to associated metrics from the executed plan of a trigger with the sources in the logical plan of the trigger. The way this executed-plan-leaf-to-logical-source attribution works is as follows. With V1 sources, there was no way to identify which execution plan leaves were generated by a streaming source. So did a best-effort attempt to match logical and execution plan leaves when the number of leaves were same. In cases where the number of leaves is different, we just give up and report zero rates. An example where this may happen is as follows.

```
val cachedStaticDF = someStaticDF.union(anotherStaticDF).cache()
val streamingInputDF = ...

val query = streamingInputDF.join(cachedStaticDF).writeStream....
```
In this case, the `cachedStaticDF` has multiple logical leaves, but in the trigger's execution plan it only has leaf because a cached subplan is represented as a single InMemoryTableScanExec leaf. This leads to a mismatch in the number of leaves causing the input rates to be computed as zero.

With DataSourceV2, all inputs are represented in the executed plan using `DataSourceV2ScanExec`, each of which has a reference to the associated logical `DataSource` and `DataSourceReader`. So its easy to associate the metrics to the original streaming sources.

In this PR, the solution is as follows. If all the streaming sources in a streaming query as v2 sources, then use a new code path where the execution-metrics-to-source mapping is done directly. Otherwise we fall back to existing mapping logic.

## How was this patch tested?
- New unit tests using V2 memory source
- Existing unit tests using V1 source

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #21126 from tdas/SPARK-24050.
2018-04-25 12:21:55 -07:00
Tathagata Das 7b1e6523af [SPARK-24056][SS] Make consumer creation lazy in Kafka source for Structured streaming
## What changes were proposed in this pull request?

Currently, the driver side of the Kafka source (i.e. KafkaMicroBatchReader) eagerly creates a consumer as soon as the Kafk aMicroBatchReader is created. However, we create dummy KafkaMicroBatchReader to get the schema and immediately stop it. Its better to make the consumer creation lazy, it will be created on the first attempt to fetch offsets using the KafkaOffsetReader.

## How was this patch tested?
Existing unit tests

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #21134 from tdas/SPARK-24056.
2018-04-24 14:33:33 -07:00
liuzhaokun 448d248f89 [SPARK-21168] KafkaRDD should always set kafka clientId.
[https://issues.apache.org/jira/browse/SPARK-21168](https://issues.apache.org/jira/browse/SPARK-21168)
There are no a number of other places that a client ID should be set,and I think we should use consumer.clientId in the clientId method,because the fetch request  will be used by the same consumer behind.

Author: liuzhaokun <liu.zhaokun@zte.com.cn>

Closes #19887 from liu-zhaokun/master1205.
2018-04-23 13:56:11 -05:00
jerryshao 5fccdae189 [SPARK-22968][DSTREAM] Throw an exception on partition revoking issue
## What changes were proposed in this pull request?

Kafka partitions can be revoked when new consumers joined in the consumer group to rebalance the partitions. But current Spark Kafka connector code makes sure there's no partition revoking scenarios, so trying to get latest offset from revoked partitions will throw exceptions as JIRA mentioned.

Partition revoking happens when new consumer joined the consumer group, which means different streaming apps are trying to use same group id. This is fundamentally not correct, different apps should use different consumer group. So instead of throwing an confused exception from Kafka, improve the exception message by identifying revoked partition and directly throw an meaningful exception when partition is revoked.

Besides, this PR also fixes bugs in `DirectKafkaWordCount`, this example simply cannot be worked without the fix.

```
8/01/05 09:48:27 INFO internals.ConsumerCoordinator: Revoking previously assigned partitions [kssh-7, kssh-4, kssh-3, kssh-6, kssh-5, kssh-0, kssh-2, kssh-1] for group use_a_separate_group_id_for_each_stream
18/01/05 09:48:27 INFO internals.AbstractCoordinator: (Re-)joining group use_a_separate_group_id_for_each_stream
18/01/05 09:48:27 INFO internals.AbstractCoordinator: Successfully joined group use_a_separate_group_id_for_each_stream with generation 4
18/01/05 09:48:27 INFO internals.ConsumerCoordinator: Setting newly assigned partitions [kssh-7, kssh-4, kssh-6, kssh-5] for group use_a_separate_group_id_for_each_stream
```

## How was this patch tested?

This is manually verified in local cluster, unfortunately I'm not sure how to simulate it in UT, so propose the PR without UT added.

Author: jerryshao <sshao@hortonworks.com>

Closes #21038 from jerryshao/SPARK-22968.
2018-04-17 21:08:42 -05:00
Marco Gaido 0a9172a05e [SPARK-23835][SQL] Add not-null check to Tuples' arguments deserialization
## What changes were proposed in this pull request?

There was no check on nullability for arguments of `Tuple`s. This could lead to have weird behavior when a null value had to be deserialized into a non-nullable Scala object: in those cases, the `null` got silently transformed in a valid value (like `-1` for `Int`), corresponding to the default value we are using in the SQL codebase. This situation was very likely to happen when deserializing to a Tuple of primitive Scala types (like Double, Int, ...).

The PR adds the `AssertNotNull` to arguments of tuples which have been asked to be converted to non-nullable types.

## How was this patch tested?

added UT

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #20976 from mgaido91/SPARK-23835.
2018-04-17 21:45:20 +08:00
Nolan Emirot 32471ba0af Fix typo in Python docstring kinesis example
## What changes were proposed in this pull request?

(Please fill in changes proposed in this fix)

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Nolan Emirot <nolan@turo.com>

Closes #20990 from emirot/kinesis_stream_example_typo.
2018-04-09 08:04:02 -05:00
Kazuaki Ishizaki a7c19d9c21 [SPARK-23713][SQL] Cleanup UnsafeWriter and BufferHolder classes
## What changes were proposed in this pull request?

This PR implemented the following cleanups related to  `UnsafeWriter` class:
- Remove code duplication between `UnsafeRowWriter` and `UnsafeArrayWriter`
- Make `BufferHolder` class internal by delegating its accessor methods to `UnsafeWriter`
- Replace `UnsafeRow.setTotalSize(...)` with `UnsafeRowWriter.setTotalSize()`

## How was this patch tested?

Tested by existing UTs

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #20850 from kiszk/SPARK-23713.
2018-04-02 21:48:44 +02:00
akonopko 2b89e4aa2e [SPARK-18580][DSTREAM][KAFKA] Add spark.streaming.backpressure.initialRate to direct Kafka streams
## What changes were proposed in this pull request?

Add `spark.streaming.backpressure.initialRate` to direct Kafka Streams for Kafka 0.8 and 0.10
This is required in order to be able to use backpressure with huge lags, which cannot be processed at once. Without this parameter `DirectKafkaInputDStream` with backpressure enabled would try to get all the possible data from Kafka before adjusting consumption rate

## How was this patch tested?

- Tests added to `org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala` and `org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala`
- Manual tests on YARN cluster

Author: akonopko <alex.konopko@teamaol.com>
Author: Alexander Konopko <alexander.konopko@gmail.com>

Closes #19431 from akonopko/SPARK-18580-initialrate.
2018-03-21 14:40:21 -05:00
Tathagata Das bd201bf61e [SPARK-23623][SS] Avoid concurrent use of cached consumers in CachedKafkaConsumer
## What changes were proposed in this pull request?

CacheKafkaConsumer in the project `kafka-0-10-sql` is designed to maintain a pool of KafkaConsumers that can be reused. However, it was built with the assumption there will be only one task using trying to read the same Kafka TopicPartition at the same time. Hence, the cache was keyed by the TopicPartition a consumer is supposed to read. And any cases where this assumption may not be true, we have SparkPlan flag to disable the use of a cache. So it was up to the planner to correctly identify when it was not safe to use the cache and set the flag accordingly.

Fundamentally, this is the wrong way to approach the problem. It is HARD for a high-level planner to reason about the low-level execution model, whether there will be multiple tasks in the same query trying to read the same partition. Case in point, 2.3.0 introduced stream-stream joins, and you can build a streaming self-join query on Kafka. It's pretty non-trivial to figure out how this leads to two tasks reading the same partition twice, possibly concurrently. And due to the non-triviality, it is hard to figure this out in the planner and set the flag to avoid the cache / consumer pool. And this can inadvertently lead to ConcurrentModificationException ,or worse, silent reading of incorrect data.

Here is a better way to design this. The planner shouldnt have to understand these low-level optimizations. Rather the consumer pool should be smart enough avoid concurrent use of a cached consumer. Currently, it tries to do so but incorrectly (the flag inuse is not checked when returning a cached consumer, see [this](https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala#L403)). If there is another request for the same partition as a currently in-use consumer, the pool should automatically return a fresh consumer that should be closed when the task is done. Then the planner does not have to have a flag to avoid reuses.

This PR is a step towards that goal. It does the following.
- There are effectively two kinds of consumer that may be generated
  - Cached consumer - this should be returned to the pool at task end
  - Non-cached consumer - this should be closed at task end
- A trait called KafkaConsumer is introduced to hide this difference from the users of the consumer so that the client code does not have to reason about whether to stop and release. They simply called `val consumer = KafkaConsumer.acquire` and then `consumer.release()`.
- If there is request for a consumer that is in-use, then a new consumer is generated.
- If there is a concurrent attempt of the same task, then a new consumer is generated, and the existing cached consumer is marked for close upon release.
- In addition, I renamed the classes because CachedKafkaConsumer is a misnomer given that what it returns may or may not be cached.

This PR does not remove the planner flag to avoid reuse to make this patch safe enough for merging in branch-2.3. This can be done later in master-only.

## How was this patch tested?
A new stress test that verifies it is safe to concurrently get consumers for the same partition from the consumer pool.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #20767 from tdas/SPARK-23623.
2018-03-16 11:11:07 -07:00
Sebastian Arzt dffeac3691 [SPARK-18371][STREAMING] Spark Streaming backpressure generates batch with large number of records
## What changes were proposed in this pull request?

Omit rounding of backpressure rate. Effects:
- no batch with large number of records is created when rate from PID estimator is one
- the number of records per batch and partition is more fine-grained improving backpressure accuracy

## How was this patch tested?

This was tested by running:
- `mvn test -pl external/kafka-0-8`
- `mvn test -pl external/kafka-0-10`
- a streaming application which was suffering from the issue

JasonMWhite

The contribution is my original work and I license the work to the project under the project’s open source license

Author: Sebastian Arzt <sebastian.arzt@plista.com>

Closes #17774 from arzt/kafka-back-pressure.
2018-03-16 12:25:58 -05:00
Yuanjian Li 7c3e8995f1 [SPARK-23533][SS] Add support for changing ContinuousDataReader's startOffset
## What changes were proposed in this pull request?

As discussion in #20675, we need add a new interface `ContinuousDataReaderFactory` to support the requirements of setting start offset in Continuous Processing.

## How was this patch tested?

Existing UT.

Author: Yuanjian Li <xyliyuanjian@gmail.com>

Closes #20689 from xuanyuanking/SPARK-23533.
2018-03-15 00:04:28 -07:00
Wenchen Fan ad640a5aff [SPARK-23303][SQL] improve the explain result for data source v2 relations
## What changes were proposed in this pull request?

The proposed explain format:
**[streaming header] [RelationV2/ScanV2] [data source name] [output] [pushed filters] [options]**

**streaming header**: if it's a streaming relation, put a "Streaming" at the beginning.
**RelationV2/ScanV2**: if it's a logical plan, put a "RelationV2", else, put a "ScanV2"
**data source name**: the simple class name of the data source implementation
**output**: a string of the plan output attributes
**pushed filters**: a string of all the filters that have been pushed to this data source
**options**: all the options to create the data source reader.

The current explain result for data source v2 relation is unreadable:
```
== Parsed Logical Plan ==
'Filter ('i > 6)
+- AnalysisBarrier
      +- Project [j#1]
         +- DataSourceV2Relation [i#0, j#1], org.apache.spark.sql.sources.v2.AdvancedDataSourceV2$Reader3b415940

== Analyzed Logical Plan ==
j: int
Project [j#1]
+- Filter (i#0 > 6)
   +- Project [j#1, i#0]
      +- DataSourceV2Relation [i#0, j#1], org.apache.spark.sql.sources.v2.AdvancedDataSourceV2$Reader3b415940

== Optimized Logical Plan ==
Project [j#1]
+- Filter isnotnull(i#0)
   +- DataSourceV2Relation [i#0, j#1], org.apache.spark.sql.sources.v2.AdvancedDataSourceV2$Reader3b415940

== Physical Plan ==
*(1) Project [j#1]
+- *(1) Filter isnotnull(i#0)
   +- *(1) DataSourceV2Scan [i#0, j#1], org.apache.spark.sql.sources.v2.AdvancedDataSourceV2$Reader3b415940
```

after this PR
```
== Parsed Logical Plan ==
'Project [unresolvedalias('j, None)]
+- AnalysisBarrier
      +- RelationV2 AdvancedDataSourceV2[i#0, j#1]

== Analyzed Logical Plan ==
j: int
Project [j#1]
+- RelationV2 AdvancedDataSourceV2[i#0, j#1]

== Optimized Logical Plan ==
RelationV2 AdvancedDataSourceV2[j#1]

== Physical Plan ==
*(1) ScanV2 AdvancedDataSourceV2[j#1]
```
-------
```
== Analyzed Logical Plan ==
i: int, j: int
Filter (i#88 > 3)
+- RelationV2 JavaAdvancedDataSourceV2[i#88, j#89]

== Optimized Logical Plan ==
Filter isnotnull(i#88)
+- RelationV2 JavaAdvancedDataSourceV2[i#88, j#89] (Pushed Filters: [GreaterThan(i,3)])

== Physical Plan ==
*(1) Filter isnotnull(i#88)
+- *(1) ScanV2 JavaAdvancedDataSourceV2[i#88, j#89] (Pushed Filters: [GreaterThan(i,3)])
```

an example for streaming query
```
== Parsed Logical Plan ==
Aggregate [value#6], [value#6, count(1) AS count(1)#11L]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
   +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#5: java.lang.String
      +- DeserializeToObject cast(value#25 as string).toString, obj#4: java.lang.String
         +- Streaming RelationV2 MemoryStreamDataSource[value#25]

== Analyzed Logical Plan ==
value: string, count(1): bigint
Aggregate [value#6], [value#6, count(1) AS count(1)#11L]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
   +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#5: java.lang.String
      +- DeserializeToObject cast(value#25 as string).toString, obj#4: java.lang.String
         +- Streaming RelationV2 MemoryStreamDataSource[value#25]

== Optimized Logical Plan ==
Aggregate [value#6], [value#6, count(1) AS count(1)#11L]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
   +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#5: java.lang.String
      +- DeserializeToObject value#25.toString, obj#4: java.lang.String
         +- Streaming RelationV2 MemoryStreamDataSource[value#25]

== Physical Plan ==
*(4) HashAggregate(keys=[value#6], functions=[count(1)], output=[value#6, count(1)#11L])
+- StateStoreSave [value#6], state info [ checkpoint = *********(redacted)/cloud/dev/spark/target/tmp/temporary-549f264b-2531-4fcb-a52f-433c77347c12/state, runId = f84d9da9-2f8c-45c1-9ea1-70791be684de, opId = 0, ver = 0, numPartitions = 5], Complete, 0
   +- *(3) HashAggregate(keys=[value#6], functions=[merge_count(1)], output=[value#6, count#16L])
      +- StateStoreRestore [value#6], state info [ checkpoint = *********(redacted)/cloud/dev/spark/target/tmp/temporary-549f264b-2531-4fcb-a52f-433c77347c12/state, runId = f84d9da9-2f8c-45c1-9ea1-70791be684de, opId = 0, ver = 0, numPartitions = 5]
         +- *(2) HashAggregate(keys=[value#6], functions=[merge_count(1)], output=[value#6, count#16L])
            +- Exchange hashpartitioning(value#6, 5)
               +- *(1) HashAggregate(keys=[value#6], functions=[partial_count(1)], output=[value#6, count#16L])
                  +- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
                     +- *(1) MapElements <function1>, obj#5: java.lang.String
                        +- *(1) DeserializeToObject value#25.toString, obj#4: java.lang.String
                           +- *(1) ScanV2 MemoryStreamDataSource[value#25]
```
## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #20647 from cloud-fan/explain.
2018-03-05 20:35:14 -08:00
Jose Torres b0f422c386 [SPARK-23559][SS] Add epoch ID to DataWriterFactory.
## What changes were proposed in this pull request?

Add an epoch ID argument to DataWriterFactory for use in streaming. As a side effect of passing in this value, DataWriter will now have a consistent lifecycle; commit() or abort() ends the lifecycle of a DataWriter instance in any execution mode.

I considered making a separate streaming interface and adding the epoch ID only to that one, but I think it requires a lot of extra work for no real gain. I think it makes sense to define epoch 0 as the one and only epoch of a non-streaming query.

## How was this patch tested?

existing unit tests

Author: Jose Torres <jose@databricks.com>

Closes #20710 from jose-torres/api2.
2018-03-05 13:23:01 -08:00
Tathagata Das 486f99eefe [SPARK-23541][SS] Allow Kafka source to read data with greater parallelism than the number of topic-partitions
## What changes were proposed in this pull request?

Currently, when the Kafka source reads from Kafka, it generates as many tasks as the number of partitions in the topic(s) to be read. In some case, it may be beneficial to read the data with greater parallelism, that is, with more number partitions/tasks. That means, offset ranges must be divided up into smaller ranges such the number of records in partition ~= total records in batch / desired partitions. This would also balance out any data skews between topic-partitions.

In this patch, I have added a new option called `minPartitions`, which allows the user to specify the desired level of parallelism.

## How was this patch tested?
New tests in KafkaMicroBatchV2SourceSuite.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #20698 from tdas/SPARK-23541.
2018-03-02 18:14:13 -08:00
cody koeninger eac0b06722 [SPARK-17147][STREAMING][KAFKA] Allow non-consecutive offsets
## What changes were proposed in this pull request?

Add a configuration spark.streaming.kafka.allowNonConsecutiveOffsets to allow streaming jobs to proceed on compacted topics (or other situations involving gaps between offsets in the log).

## How was this patch tested?

Added new unit test

justinrmiller has been testing this branch in production for a few weeks

Author: cody koeninger <cody@koeninger.org>

Closes #20572 from koeninger/SPARK-17147.
2018-02-27 08:21:11 -06:00
Tathagata Das 3fd0ccb13f [SPARK-23484][SS] Fix possible race condition in KafkaContinuousReader
## What changes were proposed in this pull request?

var `KafkaContinuousReader.knownPartitions` should be threadsafe as it is accessed from multiple threads - the query thread at the time of reader factory creation, and the epoch tracking thread at the time of `needsReconfiguration`.

## How was this patch tested?

Existing tests.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #20655 from tdas/SPARK-23484.
2018-02-21 14:56:13 -08:00
Ryan Blue aadf9535b4 [SPARK-23203][SQL] DataSourceV2: Use immutable logical plans.
## What changes were proposed in this pull request?

SPARK-23203: DataSourceV2 should use immutable catalyst trees instead of wrapping a mutable DataSourceV2Reader. This commit updates DataSourceV2Relation and consolidates much of the DataSourceV2 API requirements for the read path in it. Instead of wrapping a reader that changes, the relation lazily produces a reader from its configuration.

This commit also updates the predicate and projection push-down. Instead of the implementation from SPARK-22197, this reuses the rule matching from the Hive and DataSource read paths (using `PhysicalOperation`) and copies most of the implementation of `SparkPlanner.pruneFilterProject`, with updates for DataSourceV2. By reusing the implementation from other read paths, this should have fewer regressions from other read paths and is less code to maintain.

The new push-down rules also supports the following edge cases:

* The output of DataSourceV2Relation should be what is returned by the reader, in case the reader can only partially satisfy the requested schema projection
* The requested projection passed to the DataSourceV2Reader should include filter columns
* The push-down rule may be run more than once if filters are not pushed through projections

## How was this patch tested?

Existing push-down and read tests.

Author: Ryan Blue <blue@apache.org>

Closes #20387 from rdblue/SPARK-22386-push-down-immutable-trees.
2018-02-20 16:04:22 +08:00
Tathagata Das 0a73aa31f4 [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
## What changes were proposed in this pull request?
Migrating KafkaSource (with data source v1) to KafkaMicroBatchReader (with data source v2).

Performance comparison:
In a unit test with in-process Kafka broker, I tested the read throughput of V1 and V2 using 20M records in a single partition. They were comparable.

## How was this patch tested?
Existing tests, few modified to be better tests than the existing ones.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #20554 from tdas/SPARK-23362.
2018-02-16 14:30:19 -08:00
gatorsmile d6f5e172b4 Revert "[SPARK-23303][SQL] improve the explain result for data source v2 relations"
This reverts commit f17b936f0d.
2018-02-13 16:21:17 -08:00
Wenchen Fan f17b936f0d [SPARK-23303][SQL] improve the explain result for data source v2 relations
## What changes were proposed in this pull request?

The current explain result for data source v2 relation is unreadable:
```
== Parsed Logical Plan ==
'Filter ('i > 6)
+- AnalysisBarrier
      +- Project [j#1]
         +- DataSourceV2Relation [i#0, j#1], org.apache.spark.sql.sources.v2.AdvancedDataSourceV2$Reader3b415940

== Analyzed Logical Plan ==
j: int
Project [j#1]
+- Filter (i#0 > 6)
   +- Project [j#1, i#0]
      +- DataSourceV2Relation [i#0, j#1], org.apache.spark.sql.sources.v2.AdvancedDataSourceV2$Reader3b415940

== Optimized Logical Plan ==
Project [j#1]
+- Filter isnotnull(i#0)
   +- DataSourceV2Relation [i#0, j#1], org.apache.spark.sql.sources.v2.AdvancedDataSourceV2$Reader3b415940

== Physical Plan ==
*(1) Project [j#1]
+- *(1) Filter isnotnull(i#0)
   +- *(1) DataSourceV2Scan [i#0, j#1], org.apache.spark.sql.sources.v2.AdvancedDataSourceV2$Reader3b415940
```

after this PR
```
== Parsed Logical Plan ==
'Project [unresolvedalias('j, None)]
+- AnalysisBarrier
      +- Relation AdvancedDataSourceV2[i#0, j#1]

== Analyzed Logical Plan ==
j: int
Project [j#1]
+- Relation AdvancedDataSourceV2[i#0, j#1]

== Optimized Logical Plan ==
Relation AdvancedDataSourceV2[j#1]

== Physical Plan ==
*(1) Scan AdvancedDataSourceV2[j#1]
```
-------
```
== Analyzed Logical Plan ==
i: int, j: int
Filter (i#88 > 3)
+- Relation JavaAdvancedDataSourceV2[i#88, j#89]

== Optimized Logical Plan ==
Filter isnotnull(i#88)
+- Relation JavaAdvancedDataSourceV2[i#88, j#89] (PushedFilter: [GreaterThan(i,3)])

== Physical Plan ==
*(1) Filter isnotnull(i#88)
+- *(1) Scan JavaAdvancedDataSourceV2[i#88, j#89] (PushedFilter: [GreaterThan(i,3)])
```

an example for streaming query
```
== Parsed Logical Plan ==
Aggregate [value#6], [value#6, count(1) AS count(1)#11L]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
   +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#5: java.lang.String
      +- DeserializeToObject cast(value#25 as string).toString, obj#4: java.lang.String
         +- Streaming Relation FakeDataSourceV2$[value#25]

== Analyzed Logical Plan ==
value: string, count(1): bigint
Aggregate [value#6], [value#6, count(1) AS count(1)#11L]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
   +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#5: java.lang.String
      +- DeserializeToObject cast(value#25 as string).toString, obj#4: java.lang.String
         +- Streaming Relation FakeDataSourceV2$[value#25]

== Optimized Logical Plan ==
Aggregate [value#6], [value#6, count(1) AS count(1)#11L]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
   +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#5: java.lang.String
      +- DeserializeToObject value#25.toString, obj#4: java.lang.String
         +- Streaming Relation FakeDataSourceV2$[value#25]

== Physical Plan ==
*(4) HashAggregate(keys=[value#6], functions=[count(1)], output=[value#6, count(1)#11L])
+- StateStoreSave [value#6], state info [ checkpoint = *********(redacted)/cloud/dev/spark/target/tmp/temporary-549f264b-2531-4fcb-a52f-433c77347c12/state, runId = f84d9da9-2f8c-45c1-9ea1-70791be684de, opId = 0, ver = 0, numPartitions = 5], Complete, 0
   +- *(3) HashAggregate(keys=[value#6], functions=[merge_count(1)], output=[value#6, count#16L])
      +- StateStoreRestore [value#6], state info [ checkpoint = *********(redacted)/cloud/dev/spark/target/tmp/temporary-549f264b-2531-4fcb-a52f-433c77347c12/state, runId = f84d9da9-2f8c-45c1-9ea1-70791be684de, opId = 0, ver = 0, numPartitions = 5]
         +- *(2) HashAggregate(keys=[value#6], functions=[merge_count(1)], output=[value#6, count#16L])
            +- Exchange hashpartitioning(value#6, 5)
               +- *(1) HashAggregate(keys=[value#6], functions=[partial_count(1)], output=[value#6, count#16L])
                  +- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
                     +- *(1) MapElements <function1>, obj#5: java.lang.String
                        +- *(1) DeserializeToObject value#25.toString, obj#4: java.lang.String
                           +- *(1) Scan FakeDataSourceV2$[value#25]
```
## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #20477 from cloud-fan/explain.
2018-02-12 21:12:22 -08:00
Wenchen Fan a75f927173 [SPARK-23268][SQL][FOLLOWUP] Reorganize packages in data source V2
## What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/20435.

While reorganizing the packages for streaming data source v2, the top level stream read/write support interfaces should not be in the reader/writer package, but should be in the `sources.v2` package, to follow the `ReadSupport`, `WriteSupport`, etc.

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #20509 from cloud-fan/followup.
2018-02-08 19:20:11 +08:00
Wenchen Fan fe73cb4b43 [SPARK-23317][SQL] rename ContinuousReader.setOffset to setStartOffset
## What changes were proposed in this pull request?

In the document of `ContinuousReader.setOffset`, we say this method is used to specify the start offset. We also have a `ContinuousReader.getStartOffset` to get the value back. I think it makes more sense to rename `ContinuousReader.setOffset` to `setStartOffset`.

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #20486 from cloud-fan/rename.
2018-02-02 20:49:08 -08:00
Wang Gengliang 56ae32657e [SPARK-23268][SQL] Reorganize packages in data source V2
## What changes were proposed in this pull request?
1. create a new package for partitioning/distribution related classes.
    As Spark will add new concrete implementations of `Distribution` in new releases, it is good to
    have a new package for partitioning/distribution related classes.

2. move streaming related class to package `org.apache.spark.sql.sources.v2.reader/writer.streaming`, instead of `org.apache.spark.sql.sources.v2.streaming.reader/writer`.
So that the there won't be package reader/writer inside package streaming, which is quite confusing.
Before change:
```
v2
├── reader
├── streaming
│   ├── reader
│   └── writer
└── writer
```

After change:
```
v2
├── reader
│   └── streaming
└── writer
    └── streaming
```
## How was this patch tested?
Unit test.

Author: Wang Gengliang <ltnwgl@gmail.com>

Closes #20435 from gengliangwang/new_pkg.
2018-01-31 20:33:51 -08:00
Wenchen Fan 0a9ac0248b [SPARK-23260][SPARK-23262][SQL] several data source v2 naming cleanup
## What changes were proposed in this pull request?

All other classes in the reader/writer package doesn't have `V2` in their names, and the streaming reader/writer don't have `V2` either. It's more consistent to remove `V2` from `DataSourceV2Reader` and `DataSourceVWriter`.

Also rename `DataSourceV2Option` to remote the `V2`, we should only have `V2` in the root interface: `DataSourceV2`.

This PR also fixes some places that the mix-in interface doesn't extend the interface it aimed to mix in.

## How was this patch tested?

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #20427 from cloud-fan/ds-v2.
2018-01-30 19:43:17 +08:00
Wang Gengliang badf0d0e0d [SPARK-23219][SQL] Rename ReadTask to DataReaderFactory in data source v2
## What changes were proposed in this pull request?

Currently we have `ReadTask` in data source v2 reader, while in writer we have `DataWriterFactory`.
To make the naming consistent and better, renaming `ReadTask` to `DataReaderFactory`.

## How was this patch tested?

Unit test

Author: Wang Gengliang <ltnwgl@gmail.com>

Closes #20397 from gengliangwang/rename.
2018-01-30 00:50:49 +08:00
Jose Torres 49b0207dc9 [SPARK-23196] Unify continuous and microbatch V2 sinks
## What changes were proposed in this pull request?

Replace streaming V2 sinks with a unified StreamWriteSupport interface, with a shim to use it with microbatch execution.

Add a new SQL config to use for disabling V2 sinks, falling back to the V1 sink implementation.

## How was this patch tested?

Existing tests, which in the case of Kafka (the only existing continuous V2 sink) now use V2 for microbatch.

Author: Jose Torres <jose@databricks.com>

Closes #20369 from jose-torres/streaming-sink.
2018-01-29 13:10:38 +08:00
Shixiong Zhu 073744985f [SPARK-23242][SS][TESTS] Don't run tests in KafkaSourceSuiteBase twice
## What changes were proposed in this pull request?

KafkaSourceSuiteBase should be abstract class, otherwise KafkaSourceSuiteBase will also run.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <zsxwing@gmail.com>

Closes #20412 from zsxwing/SPARK-23242.
2018-01-26 16:09:57 -08:00
Dongjoon Hyun bc9641d902 [SPARK-23198][SS][TEST] Fix KafkaContinuousSourceStressForDontFailOnDataLossSuite to test ContinuousExecution
## What changes were proposed in this pull request?

Currently, `KafkaContinuousSourceStressForDontFailOnDataLossSuite` runs on `MicroBatchExecution`. It should test `ContinuousExecution`.

## How was this patch tested?

Pass the updated test suite.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #20374 from dongjoon-hyun/SPARK-23198.
2018-01-24 12:58:44 -08:00
Jacek Laskowski 76b8b840dd [MINOR] Typo fixes
## What changes were proposed in this pull request?

Typo fixes

## How was this patch tested?

Local build / Doc-only changes

Author: Jacek Laskowski <jacek@japila.pl>

Closes #20344 from jaceklaskowski/typo-fixes.
2018-01-22 13:55:14 -06:00
Jose Torres 86a8450318 [SPARK-23033][SS] Don't use task level retry for continuous processing
## What changes were proposed in this pull request?

Continuous processing tasks will fail on any attempt number greater than 0. ContinuousExecution will catch these failures and restart globally from the last recorded checkpoints.
## How was this patch tested?
unit test

Author: Jose Torres <jose@databricks.com>

Closes #20225 from jose-torres/no-retry.
2018-01-17 13:52:51 -08:00
Jose Torres a963980a6d Fix merge between 07ae39d0ec and 1667057851
## What changes were proposed in this pull request?

The first commit added a new test, and the second refactored the class the test was in. The automatic merge put the test in the wrong place.

## How was this patch tested?
-

Author: Jose Torres <jose@databricks.com>

Closes #20289 from jose-torres/fix.
2018-01-16 22:27:28 -08:00
Jose Torres 1667057851 [SPARK-22908][SS] Roll forward continuous processing Kafka support with fix to continuous Kafka data reader
## What changes were proposed in this pull request?

The Kafka reader is now interruptible and can close itself.
## How was this patch tested?

I locally ran one of the ContinuousKafkaSourceSuite tests in a tight loop. Before the fix, my machine ran out of open file descriptors a few iterations in; now it works fine.

Author: Jose Torres <jose@databricks.com>

Closes #20253 from jose-torres/fix-data-reader.
2018-01-16 18:11:27 -08:00
Yuanjian Li 07ae39d0ec [SPARK-22956][SS] Bug fix for 2 streams union failover scenario
## What changes were proposed in this pull request?

This problem reported by yanlin-Lynn ivoson and LiangchangZ. Thanks!

When we union 2 streams from kafka or other sources, while one of them have no continues data coming and in the same time task restart, this will cause an `IllegalStateException`. This mainly cause because the code in [MicroBatchExecution](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L190) , while one stream has no continues data, its comittedOffset same with availableOffset during `populateStartOffsets`, and `currentPartitionOffsets` not properly handled in KafkaSource. Also, maybe we should also consider this scenario in other Source.

## How was this patch tested?

Add a UT in KafkaSourceSuite.scala

Author: Yuanjian Li <xyliyuanjian@gmail.com>

Closes #20150 from xuanyuanking/SPARK-22956.
2018-01-15 22:01:14 -08:00
Dongjoon Hyun 7a3d0aad2b [SPARK-23038][TEST] Update docker/spark-test (JDK/OS)
## What changes were proposed in this pull request?

This PR aims to update the followings in `docker/spark-test`.

- JDK7 -> JDK8
Spark 2.2+ supports JDK8 only.

- Ubuntu 12.04.5 LTS(precise) -> Ubuntu 16.04.3 LTS(xeniel)
The end of life of `precise` was April 28, 2017.

## How was this patch tested?

Manual.

* Master
```
$ cd external/docker
$ ./build
$ export SPARK_HOME=...
$ docker run -v $SPARK_HOME:/opt/spark spark-test-master
CONTAINER_IP=172.17.0.3
...
18/01/11 06:50:25 INFO MasterWebUI: Bound MasterWebUI to 172.17.0.3, and started at http://172.17.0.3:8080
18/01/11 06:50:25 INFO Utils: Successfully started service on port 6066.
18/01/11 06:50:25 INFO StandaloneRestServer: Started REST server for submitting applications on port 6066
18/01/11 06:50:25 INFO Master: I have been elected leader! New state: ALIVE
```

* Slave
```
$ docker run -v $SPARK_HOME:/opt/spark spark-test-worker spark://172.17.0.3:7077
CONTAINER_IP=172.17.0.4
...
18/01/11 06:51:54 INFO Worker: Successfully registered with master spark://172.17.0.3:7077
```

After slave starts, master will show
```
18/01/11 06:51:54 INFO Master: Registering worker 172.17.0.4:8888 with 4 cores, 1024.0 MB RAM
```

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #20230 from dongjoon-hyun/SPARK-23038.
2018-01-13 23:26:12 -08:00
Sameer Agarwal 55dbfbca37 Revert "[SPARK-22908] Add kafka source and sink for continuous processing."
This reverts commit 6f7aaed805.
2018-01-12 15:00:00 -08:00
gatorsmile 651f76153f [SPARK-23028] Bump master branch version to 2.4.0-SNAPSHOT
## What changes were proposed in this pull request?
This patch bumps the master branch version to `2.4.0-SNAPSHOT`.

## How was this patch tested?
N/A

Author: gatorsmile <gatorsmile@gmail.com>

Closes #20222 from gatorsmile/bump24.
2018-01-13 00:37:59 +08:00
Jose Torres 6f7aaed805 [SPARK-22908] Add kafka source and sink for continuous processing.
## What changes were proposed in this pull request?

Add kafka source and sink for continuous processing. This involves two small changes to the execution engine:

* Bring data reader close() into the normal data reader thread to avoid thread safety issues.
* Fix up the semantics of the RECONFIGURING StreamExecution state. State updates are now atomic, and we don't have to deal with swallowing an exception.

## How was this patch tested?

new unit tests

Author: Jose Torres <jose@databricks.com>

Closes #20096 from jose-torres/continuous-kafka.
2018-01-11 10:52:12 -08:00
Sean Owen c284c4e1f6 [MINOR] Fix a bunch of typos 2018-01-02 07:10:19 +09:00
Dongjoon Hyun 5536f3181c [MINOR][BUILD] Fix Java linter errors
## What changes were proposed in this pull request?

This PR cleans up a few Java linter errors for Apache Spark 2.3 release.

## How was this patch tested?

```bash
$ dev/lint-java
Using `mvn` from path: /usr/local/bin/mvn
Checkstyle checks passed.
```

We can see the result from [Travis CI](https://travis-ci.org/dongjoon-hyun/spark/builds/322470787), too.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #20101 from dongjoon-hyun/fix-java-lint.
2017-12-28 09:43:50 -06:00
Yash Sharma 0e6833006d [SPARK-20168][DSTREAM] Add changes to use kinesis fetches from specific timestamp
## What changes were proposed in this pull request?

Kinesis client can resume from a specified timestamp while creating a stream. We should have option to pass a timestamp in config to allow kinesis to resume from the given timestamp.

The patch introduces a new `KinesisInitialPositionInStream` that takes the `InitialPositionInStream` with the `timestamp` information that can be used to resume kinesis fetches from the provided timestamp.

## How was this patch tested?

Unit Tests

cc : budde brkyvz

Author: Yash Sharma <ysharma@atlassian.com>

Closes #18029 from yssharma/ysharma/kcl_resume.
2017-12-26 09:50:39 +02:00
Jose Torres 7798c9e6ef [SPARK-22824] Restore old offset for binary compatibility
## What changes were proposed in this pull request?

Some users depend on source compatibility with the org.apache.spark.sql.execution.streaming.Offset class. Although this is not a stable interface, we can keep it in place for now to simplify upgrades to 2.3.

Author: Jose Torres <jose@databricks.com>

Closes #20012 from joseph-torres/binary-compat.
2017-12-20 10:43:10 -08:00
Jose Torres f8c7c1f21a [SPARK-22732] Add Structured Streaming APIs to DataSourceV2
## What changes were proposed in this pull request?

This PR provides DataSourceV2 API support for structured streaming, including new pieces needed to support continuous processing [SPARK-20928]. High level summary:

- DataSourceV2 includes new mixins to support micro-batch and continuous reads and writes. For reads, we accept an optional user specified schema rather than using the ReadSupportWithSchema model, because doing so would severely complicate the interface.

- DataSourceV2Reader includes new interfaces to read a specific microbatch or read continuously from a given offset. These follow the same setter pattern as the existing Supports* mixins so that they can work with SupportsScanUnsafeRow.

- DataReader (the per-partition reader) has a new subinterface ContinuousDataReader only for continuous processing. This reader has a special method to check progress, and next() blocks for new input rather than returning false.

- Offset, an abstract representation of position in a streaming query, is ported to the public API. (Each type of reader will define its own Offset implementation.)

- DataSourceV2Writer has a new subinterface ContinuousWriter only for continuous processing. Commits to this interface come tagged with an epoch number, as the execution engine will continue to produce new epoch commits as the task continues indefinitely.

Note that this PR does not propose to change the existing DataSourceV2 batch API, or deprecate the existing streaming source/sink internal APIs in spark.sql.execution.streaming.

## How was this patch tested?

Toy implementations of the new interfaces with unit tests.

Author: Jose Torres <jose@databricks.com>

Closes #19925 from joseph-torres/continuous-api.
2017-12-13 22:31:39 -08:00
gatorsmile a4002651a3 [SPARK-20557][SQL] Only support TIMESTAMP WITH TIME ZONE for Oracle Dialect
## What changes were proposed in this pull request?
In the previous PRs, https://github.com/apache/spark/pull/17832 and https://github.com/apache/spark/pull/17835 , we convert `TIMESTAMP WITH TIME ZONE` and `TIME WITH TIME ZONE` to `TIMESTAMP` for all the JDBC sources. However, this conversion could be risky since it does not respect our SQL configuration `spark.sql.session.timeZone`.

In addition, each vendor might have different semantics for these two types. For example, Postgres simply returns `TIMESTAMP` types for `TIMESTAMP WITH TIME ZONE`. For such supports, we should do it case by case. This PR reverts the general support of `TIMESTAMP WITH TIME ZONE` and `TIME WITH TIME ZONE` for JDBC sources, except ORACLE Dialect.

When supporting the ORACLE's `TIMESTAMP WITH TIME ZONE`, we only support it when the JVM default timezone is the same as the user-specified configuration `spark.sql.session.timeZone` (whose default is the JVM default timezone). Now, we still treat `TIMESTAMP WITH TIME ZONE` as `TIMESTAMP` when fetching the values via the Oracle JDBC connector, whose client converts the timestamp values with time zone to the timestamp values using the local JVM default timezone (a test case is added to `OracleIntegrationSuite.scala` in this PR for showing the behavior). Thus, to avoid any future behavior change, we will not support it if JVM default timezone is different from `spark.sql.session.timeZone`

No regression because the previous two PRs were just merged to be unreleased master branch.

## How was this patch tested?
Added the test cases

Author: gatorsmile <gatorsmile@gmail.com>

Closes #19939 from gatorsmile/timezoneUpdate.
2017-12-11 16:33:06 -08:00
kellyzly f41c0a93fd [SPARK-22660][BUILD] Use position() and limit() to fix ambiguity issue in scala-2.12
…a-2.12 and JDK9

## What changes were proposed in this pull request?
Some compile error after upgrading to scala-2.12
```javascript
spark_source/core/src/main/scala/org/apache/spark/executor/Executor.scala:455: ambiguous reference to overloaded definition, method limit in class ByteBuffer of type (x$1: Int)java.nio.ByteBuffer
method limit in class Buffer of type ()Int
match expected type ?
     val resultSize = serializedDirectResult.limit
error
```
The limit method was moved from ByteBuffer to the superclass Buffer and it can no longer be called without (). The same reason for position method.

```javascript
/home/zly/prj/oss/jdk9_HOS_SOURCE/spark_source/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala:427: ambiguous reference to overloaded definition, [error] both method putAll in class Properties of type (x$1: java.util.Map[_, _])Unit [error] and  method putAll in class Hashtable of type (x$1: java.util.Map[_ <: Object, _ <: Object])Unit [error] match argument types (java.util.Map[String,String])
 [error]       props.putAll(outputSerdeProps.toMap.asJava)
 [error]             ^
 ```
This is because the key type is Object instead of String which is unsafe.

## How was this patch tested?

running tests

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: kellyzly <kellyzly@126.com>

Closes #19854 from kellyzly/SPARK-22660.
2017-12-07 10:04:04 -06:00
Jen-Ming Chung bc7ca9786e [SPARK-22291][SQL] Conversion error when transforming array types of uuid, inet and cidr to StingType in PostgreSQL
## What changes were proposed in this pull request?

This PR fixes the conversion error when reads data from a PostgreSQL table that contains columns of `uuid[]`, `inet[]` and `cidr[]` data types.

For example, create a table with the uuid[] data type, and insert the test data.
```SQL
CREATE TABLE users
(
    id smallint NOT NULL,
    name character varying(50),
    user_ids uuid[],
    PRIMARY KEY (id)
)

INSERT INTO users ("id", "name","user_ids")
VALUES (1, 'foo', ARRAY
    ['7be8aaf8-650e-4dbb-8186-0a749840ecf2'
    ,'205f9bfc-018c-4452-a605-609c0cfad228']::UUID[]
)
```
Then it will throw the following exceptions when trying to load the data.
```
java.lang.ClassCastException: [Ljava.util.UUID; cannot be cast to [Ljava.lang.String;
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$14.apply(JdbcUtils.scala:459)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$14.apply(JdbcUtils.scala:458)
...
```

## How was this patch tested?

Added test in `PostgresIntegrationSuite`.

Author: Jen-Ming Chung <jenmingisme@gmail.com>

Closes #19567 from jmchung/SPARK-22291.
2017-10-29 18:11:48 +01:00
Kohki Nishio 5a5b6b7851 [SPARK-22303][SQL] Handle Oracle specific jdbc types in OracleDialect
TIMESTAMP (-101), BINARY_DOUBLE (101) and BINARY_FLOAT (100) are handled in OracleDialect

## What changes were proposed in this pull request?

When a oracle table contains columns whose type is BINARY_FLOAT or BINARY_DOUBLE, spark sql fails to load a table with SQLException

```
java.sql.SQLException: Unsupported type 101
 at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$getCatalystType(JdbcUtils.scala:235)
 at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$8.apply(JdbcUtils.scala:292)
 at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$8.apply(JdbcUtils.scala:292)
 at scala.Option.getOrElse(Option.scala:121)
 at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.getSchema(JdbcUtils.scala:291)
 at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:64)
 at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:113)
 at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:47)
 at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:306)
 at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
 at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)
```

## How was this patch tested?

I updated a UT which covers type conversion test for types (-101, 100, 101), on top of that I tested this change against actual table with those columns and it was able to read and write to the table.

Author: Kohki Nishio <taroplus@me.com>

Closes #19548 from taroplus/oracle_sql_types_101.
2017-10-23 09:55:46 -07:00
Sean Owen 0c03297bf0 [SPARK-22142][BUILD][STREAMING] Move Flume support behind a profile, take 2
## What changes were proposed in this pull request?

Move flume behind a profile, take 2. See https://github.com/apache/spark/pull/19365 for most of the back-story.

This change should fix the problem by removing the examples module dependency and moving Flume examples to the module itself. It also adds deprecation messages, per a discussion on dev about deprecating for 2.3.0.

## How was this patch tested?

Existing tests, which still enable flume integration.

Author: Sean Owen <sowen@cloudera.com>

Closes #19412 from srowen/SPARK-22142.2.
2017-10-06 15:08:28 +01:00
Sean Owen 576c43fb42 [SPARK-22087][SPARK-14650][WIP][BUILD][REPL][CORE] Compile Spark REPL for Scala 2.12 + other 2.12 fixes
## What changes were proposed in this pull request?

Enable Scala 2.12 REPL. Fix most remaining issues with 2.12 compilation and warnings, including:

- Selecting Kafka 0.10.1+ for Scala 2.12 and patching over a minor API difference
- Fixing lots of "eta expansion of zero arg method deprecated" warnings
- Resolving the SparkContext.sequenceFile implicits compile problem
- Fixing an odd but valid jetty-server missing dependency in hive-thriftserver

## How was this patch tested?

Existing tests

Author: Sean Owen <sowen@cloudera.com>

Closes #19307 from srowen/Scala212.
2017-09-24 09:40:13 +01:00
Yuming Wang 17edfec59d [SPARK-20427][SQL] Read JDBC table use custom schema
## What changes were proposed in this pull request?

Auto generated Oracle schema some times not we expect:

- `number(1)` auto mapped to BooleanType, some times it's not we expect, per [SPARK-20921](https://issues.apache.org/jira/browse/SPARK-20921).
-  `number` auto mapped to Decimal(38,10), It can't read big data, per [SPARK-20427](https://issues.apache.org/jira/browse/SPARK-20427).

This PR fix this issue by custom schema as follows:
```scala
val props = new Properties()
props.put("customSchema", "ID decimal(38, 0), N1 int, N2 boolean")
val dfRead = spark.read.schema(schema).jdbc(jdbcUrl, "tableWithCustomSchema", props)
dfRead.show()
```
or
```sql
CREATE TEMPORARY VIEW tableWithCustomSchema
USING org.apache.spark.sql.jdbc
OPTIONS (url '$jdbcUrl', dbTable 'tableWithCustomSchema', customSchema'ID decimal(38, 0), N1 int, N2 boolean')
```

## How was this patch tested?

unit tests

Author: Yuming Wang <wgyumg@gmail.com>

Closes #18266 from wangyum/SPARK-20427.
2017-09-13 16:34:17 -07:00
Sean Owen 4fbf748bf8 [SPARK-21893][BUILD][STREAMING][WIP] Put Kafka 0.8 behind a profile
## What changes were proposed in this pull request?

Put Kafka 0.8 support behind a kafka-0-8 profile.

## How was this patch tested?

Existing tests, but, until PR builder and Jenkins configs are updated the effect here is to not build or test Kafka 0.8 support at all.

Author: Sean Owen <sowen@cloudera.com>

Closes #19134 from srowen/SPARK-21893.
2017-09-13 10:10:40 +01:00
caoxuewen dc74c0e67d [MINOR][SQL] remove unuse import class
## What changes were proposed in this pull request?

this PR describe remove the import class that are unused.

## How was this patch tested?

N/A

Author: caoxuewen <cao.xuewen@zte.com.cn>

Closes #19131 from heary-cao/unuse_import.
2017-09-11 10:09:20 +01:00
Sean Owen 12ab7f7e89 [SPARK-14280][BUILD][WIP] Update change-version.sh and pom.xml to add Scala 2.12 profiles and enable 2.12 compilation
…build; fix some things that will be warnings or errors in 2.12; restore Scala 2.12 profile infrastructure

## What changes were proposed in this pull request?

This change adds back the infrastructure for a Scala 2.12 build, but does not enable it in the release or Python test scripts.

In order to make that meaningful, it also resolves compile errors that the code hits in 2.12 only, in a way that still works with 2.11.

It also updates dependencies to the earliest minor release of dependencies whose current version does not yet support Scala 2.12. This is in a sense covered by other JIRAs under the main umbrella, but implemented here. The versions below still work with 2.11, and are the _latest_ maintenance release in the _earliest_ viable minor release.

- Scalatest 2.x -> 3.0.3
- Chill 0.8.0 -> 0.8.4
- Clapper 1.0.x -> 1.1.2
- json4s 3.2.x -> 3.4.2
- Jackson 2.6.x -> 2.7.9 (required by json4s)

This change does _not_ fully enable a Scala 2.12 build:

- It will also require dropping support for Kafka before 0.10. Easy enough, just didn't do it yet here
- It will require recreating `SparkILoop` and `Main` for REPL 2.12, which is SPARK-14650. Possible to do here too.

What it does do is make changes that resolve much of the remaining gap without affecting the current 2.11 build.

## How was this patch tested?

Existing tests and build. Manually tested with `./dev/change-scala-version.sh 2.12` to verify it compiles, modulo the exceptions above.

Author: Sean Owen <sowen@cloudera.com>

Closes #18645 from srowen/SPARK-14280.
2017-09-01 19:21:21 +01:00
Yuval Itzchakov 8f0df6bc10 [SPARK-21873][SS] - Avoid using return inside CachedKafkaConsumer.get
During profiling of a structured streaming application with Kafka as the source, I came across this exception:

![Structured Streaming Kafka Exceptions](https://user-images.githubusercontent.com/3448320/29743366-4149ef78-8a99-11e7-94d6-f0cbb691134a.png)

This is a 1 minute sample, which caused 106K `NonLocalReturnControl` exceptions to be thrown.
This happens because `CachedKafkaConsumer.get` is ran inside:

`private def runUninterruptiblyIfPossible[T](body: => T): T`

Where `body: => T` is the `get` method. Turning the method into a function means that in order to escape the `while` loop defined in `get` the runtime has to do dirty tricks which involve throwing the above exception.

## What changes were proposed in this pull request?

Instead of using `return` (which is generally not recommended in Scala), we place the result of the `fetchData` method inside a local variable and use a boolean flag to indicate the status of fetching data, which we monitor as our predicate to the `while` loop.

## How was this patch tested?

I've ran the `KafkaSourceSuite` to make sure regression passes. Since the exception isn't visible from user code, there is no way (at least that I could think of) to add this as a test to the existing suite.

Author: Yuval Itzchakov <yuval.itzchakov@clicktale.com>

Closes #19059 from YuvalItzchakov/master.
2017-08-30 10:33:23 +01:00
Jose Torres 3c0c2d09ca [SPARK-21765] Set isStreaming on leaf nodes for streaming plans.
## What changes were proposed in this pull request?
All streaming logical plans will now have isStreaming set. This involved adding isStreaming as a case class arg in a few cases, since a node might be logically streaming depending on where it came from.

## How was this patch tested?

Existing unit tests - no functional change is intended in this PR.

Author: Jose Torres <joseph-torres@databricks.com>
Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #18973 from joseph-torres/SPARK-21765.
2017-08-22 19:07:43 -07:00
Yuming Wang ba843292e3 [SPARK-21790][TESTS][FOLLOW-UP] Add filter pushdown verification back.
## What changes were proposed in this pull request?

The previous PR(https://github.com/apache/spark/pull/19000) removed filter pushdown verification, This PR add them back.

## How was this patch tested?
manual tests

Author: Yuming Wang <wgyumg@gmail.com>

Closes #19002 from wangyum/SPARK-21790-follow-up.
2017-08-21 10:16:56 -07:00
Yuming Wang 72b738d8dc [SPARK-21790][TESTS] Fix Docker-based Integration Test errors.
## What changes were proposed in this pull request?
[SPARK-17701](https://github.com/apache/spark/pull/18600/files#diff-b9f96d092fb3fea76bcf75e016799678L77) removed `metadata` function, this PR removed the Docker-based Integration module that has been relevant to `SparkPlan.metadata`.

## How was this patch tested?
manual tests

Author: Yuming Wang <wgyumg@gmail.com>

Closes #19000 from wangyum/SPARK-21709.
2017-08-19 11:41:32 -07:00
Marcelo Vanzin 3f958a9992 [SPARK-21731][BUILD] Upgrade scalastyle to 0.9.
This version fixes a few issues in the import order checker; it provides
better error messages, and detects more improper ordering (thus the need
to change a lot of files in this patch). The main fix is that it correctly
complains about the order of packages vs. classes.

As part of the above, I moved some "SparkSession" import in ML examples
inside the "$example on$" blocks; that didn't seem consistent across
different source files to start with, and avoids having to add more on/off blocks
around specific imports.

The new scalastyle also seems to have a better header detector, so a few
license headers had to be updated to match the expected indentation.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #18943 from vanzin/SPARK-21731.
2017-08-15 13:59:00 -07:00
Takeshi Yamamuro b78cf13bf0 [SPARK-21276][CORE] Update lz4-java to the latest (v1.4.0)
## What changes were proposed in this pull request?
This pr updated `lz4-java` to the latest (v1.4.0) and removed custom `LZ4BlockInputStream`. We currently use custom `LZ4BlockInputStream` to read concatenated byte stream in shuffle. But, this functionality has been implemented in the latest lz4-java (https://github.com/lz4/lz4-java/pull/105). So, we might update the latest to remove the custom `LZ4BlockInputStream`.

Major diffs between the latest release and v1.3.0 in the master are as follows (62f7547abb...6d4693f562);
- fixed NPE in XXHashFactory similarly
- Don't place resources in default package to support shading
- Fixes ByteBuffer methods failing to apply arrayOffset() for array-backed
- Try to load lz4-java from java.library.path, then fallback to bundled
- Add ppc64le binary
- Add s390x JNI binding
- Add basic LZ4 Frame v1.5.0 support
- enable aarch64 support for lz4-java
- Allow unsafeInstance() for ppc64le archiecture
- Add unsafeInstance support for AArch64
- Support 64-bit JNI build on Solaris
- Avoid over-allocating a buffer
- Allow EndMark to be incompressible for LZ4FrameInputStream.
- Concat byte stream

## How was this patch tested?
Existing tests.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #18883 from maropu/SPARK-21276.
2017-08-09 17:31:52 +02:00
Yash Sharma 4f77c06238 [SPARK-20855][Docs][DStream] Update the Spark kinesis docs to use the KinesisInputDStream builder instead of deprecated KinesisUtils
## What changes were proposed in this pull request?

The examples and docs for Spark-Kinesis integrations use the deprecated KinesisUtils. We should update the docs to use the KinesisInputDStream builder to create DStreams.

## How was this patch tested?

The patch primarily updates the documents. The patch will also need to make changes to the Spark-Kinesis examples. The examples need to be tested.

Author: Yash Sharma <ysharma@atlassian.com>

Closes #18071 from yssharma/ysharma/kinesis_docs.
2017-07-25 08:27:03 +01:00
Tim Van Wassenhove 03367d7aa3 [SPARK-21142][SS] spark-streaming-kafka-0-10 should depend on kafka-clients instead of full blown kafka library
## What changes were proposed in this pull request?

Currently spark-streaming-kafka-0-10 has a dependency on the full kafka distribution (but only uses and requires the kafka-clients library).

The PR fixes that (the library only depends on kafka-clients), and the tests depend on the full kafka.

## How was this patch tested?

All existing tests still pass.

Author: Tim Van Wassenhove <github@timvw.be>

Closes #18353 from timvw/master.
2017-07-20 18:19:14 +01:00
Sean Owen e26dac5feb [SPARK-21415] Triage scapegoat warnings, part 1
## What changes were proposed in this pull request?

Address scapegoat warnings for:
- BigDecimal double constructor
- Catching NPE
- Finalizer without super
- List.size is O(n)
- Prefer Seq.empty
- Prefer Set.empty
- reverse.map instead of reverseMap
- Type shadowing
- Unnecessary if condition.
- Use .log1p
- Var could be val

In some instances like Seq.empty, I avoided making the change even where valid in test code to keep the scope of the change smaller. Those issues are concerned with performance and it won't matter for tests.

## How was this patch tested?

Existing tests

Author: Sean Owen <sowen@cloudera.com>

Closes #18635 from srowen/Scapegoat1.
2017-07-18 08:47:17 +01:00
Sean Owen 425c4ada4c [SPARK-19810][BUILD][CORE] Remove support for Scala 2.10
## What changes were proposed in this pull request?

- Remove Scala 2.10 build profiles and support
- Replace some 2.10 support in scripts with commented placeholders for 2.12 later
- Remove deprecated API calls from 2.10 support
- Remove usages of deprecated context bounds where possible
- Remove Scala 2.10 workarounds like ScalaReflectionLock
- Other minor Scala warning fixes

## How was this patch tested?

Existing tests

Author: Sean Owen <sowen@cloudera.com>

Closes #17150 from srowen/SPARK-19810.
2017-07-13 17:06:24 +08:00
Rui Zha d4107196d5 [SPARK-18004][SQL] Make sure the date or timestamp related predicate can be pushed down to Oracle correctly
## What changes were proposed in this pull request?

Move `compileValue` method in JDBCRDD to JdbcDialect, and override the `compileValue` method in OracleDialect to rewrite the Oracle-specific timestamp and date literals in where clause.

## How was this patch tested?

An integration test has been added.

Author: Rui Zha <zrdt713@gmail.com>
Author: Zharui <zrdt713@gmail.com>

Closes #18451 from SharpRay/extend-compileValue-to-dialects.
2017-07-02 17:37:47 -07:00
Gabor Feher b837bf9ae9 [SPARK-20555][SQL] Fix mapping of Oracle DECIMAL types to Spark types in read path
## What changes were proposed in this pull request?

This PR is to revert some code changes in the read path of https://github.com/apache/spark/pull/14377. The original fix is https://github.com/apache/spark/pull/17830

When merging this PR, please give the credit to gaborfeher

## How was this patch tested?

Added a test case to OracleIntegrationSuite.scala

Author: Gabor Feher <gabor.feher@lynxanalytics.com>
Author: gatorsmile <gatorsmile@gmail.com>

Closes #18408 from gatorsmile/OracleType.
2017-06-23 21:53:38 -07:00
sureshthalamati 9ce714dca2 [SPARK-10655][SQL] Adding additional data type mappings to jdbc DB2dialect.
This patch adds DB2 specific data type mappings for decfloat, real, xml , and timestamp with time zone (DB2Z specific type)  types on read and for byte, short data types  on write to the to jdbc data source DB2 dialect. Default mapping does not work for these types when reading/writing from DB2 database.

Added docker test, and a JDBC unit test case.

Author: sureshthalamati <suresh.thalamati@gmail.com>

Closes #9162 from sureshthalamati/db2dialect_enhancements-spark-10655.
2017-06-20 22:35:42 -07:00
Mark Grover 55b8cfe6e6 [SPARK-19185][DSTREAM] Make Kafka consumer cache configurable
## What changes were proposed in this pull request?

Add a new property `spark.streaming.kafka.consumer.cache.enabled` that allows users to enable or disable the cache for Kafka consumers. This property can be especially handy in cases where issues like SPARK-19185 get hit, for which there isn't a solution committed yet. By default, the cache is still on, so this change doesn't change any out-of-box behavior.

## How was this patch tested?
Running unit tests

Author: Mark Grover <mark@apache.org>
Author: Mark Grover <grover.markgrover@gmail.com>

Closes #18234 from markgrover/spark-19185.
2017-06-08 09:55:43 -07:00
Wenchen Fan 10e526e7e6 [SPARK-20213][SQL] Fix DataFrameWriter operations in SQL UI tab
## What changes were proposed in this pull request?

Currently the `DataFrameWriter` operations have several problems:

1. non-file-format data source writing action doesn't show up in the SQL tab in Spark UI
2. file-format data source writing action shows a scan node in the SQL tab, without saying anything about writing. (streaming also have this issue, but not fixed in this PR)
3. Spark SQL CLI actions don't show up in the SQL tab.

This PR fixes all of them, by refactoring the `ExecuteCommandExec` to make it have children.

 close https://github.com/apache/spark/pull/17540

## How was this patch tested?

existing tests.

Also test the UI manually. For a simple command: `Seq(1 -> "a").toDF("i", "j").write.parquet("/tmp/qwe")`

before this PR:
<img width="266" alt="qq20170523-035840 2x" src="https://cloud.githubusercontent.com/assets/3182036/26326050/24e18ba2-3f6c-11e7-8817-6dd275bf6ac5.png">
after this PR:
<img width="287" alt="qq20170523-035708 2x" src="https://cloud.githubusercontent.com/assets/3182036/26326054/2ad7f460-3f6c-11e7-8053-d68325beb28f.png">

Author: Wenchen Fan <wenchen@databricks.com>

Closes #18064 from cloud-fan/execution.
2017-05-30 20:12:32 -07:00
Prashant Sharma 96a4d1d082 [SPARK-19968][SS] Use a cached instance of KafkaProducer instead of creating one every batch.
## What changes were proposed in this pull request?

In summary, cost of recreating a KafkaProducer for writing every batch is high as it starts a lot threads and make connections and then closes them. A KafkaProducer instance is promised to be thread safe in Kafka docs. Reuse of KafkaProducer instance while writing via multiple threads is encouraged.

Furthermore, I have performance improvement of 10x in latency, with this patch.

### These are times that addBatch took in ms. Without applying this patch
![with-out_patch](https://cloud.githubusercontent.com/assets/992952/23994612/a9de4a42-0a6b-11e7-9d5b-7ae18775bee4.png)
### These are times that addBatch took in ms. After applying this patch
![with_patch](https://cloud.githubusercontent.com/assets/992952/23994616/ad8c11ec-0a6b-11e7-8634-2266ebb5033f.png)

## How was this patch tested?
Running distributed benchmarks comparing runs with this patch and without it.
Added relevant unit tests.

Author: Prashant Sharma <prashsh1@in.ibm.com>

Closes #17308 from ScrapCodes/cached-kafka-producer.
2017-05-29 18:12:01 -07:00
liuzhaokun dba2ca2c12 [SPARK-20759] SCALA_VERSION in _config.yml should be consistent with pom.xml
[https://issues.apache.org/jira/browse/SPARK-20759](https://issues.apache.org/jira/browse/SPARK-20759)
SCALA_VERSION in _config.yml is 2.11.7, but 2.11.8 in pom.xml. So I think SCALA_VERSION in _config.yml should be consistent with pom.xml.

Author: liuzhaokun <liu.zhaokun@zte.com.cn>

Closes #17992 from liu-zhaokun/new.
2017-05-19 15:26:39 +01:00
Shixiong Zhu 324a904d8e [SPARK-13747][CORE] Add ThreadUtils.awaitReady and disallow Await.ready
## What changes were proposed in this pull request?

Add `ThreadUtils.awaitReady` similar to `ThreadUtils.awaitResult` and disallow `Await.ready`.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #17763 from zsxwing/awaitready.
2017-05-17 17:21:46 -07:00
Yash Sharma 38f4e8692c [SPARK-20140][DSTREAM] Remove hardcoded kinesis retry wait and max retries
## What changes were proposed in this pull request?

The pull requests proposes to remove the hardcoded values for Amazon Kinesis - MIN_RETRY_WAIT_TIME_MS, MAX_RETRIES.

This change is critical for kinesis checkpoint recovery when the kinesis backed rdd is huge.
Following happens in a typical kinesis recovery :
- kinesis throttles large number of requests while recovering
- retries in case of throttling are not able to recover due to the small wait period
- kinesis throttles per second, the wait period should be configurable for recovery

The patch picks the spark kinesis configs from:
- spark.streaming.kinesis.retry.wait.time
- spark.streaming.kinesis.retry.max.attempts

Jira : https://issues.apache.org/jira/browse/SPARK-20140

## How was this patch tested?

Modified the KinesisBackedBlockRDDSuite.scala to run kinesis tests with the modified configurations. Wasn't able to test the patch with actual throttling.

Author: Yash Sharma <ysharma@atlassian.com>

Closes #17467 from yssharma/ysharma/spark-kinesis-retries.
2017-05-16 15:08:05 -07:00
Jacek Laskowski 7144b51809 [SPARK-20600][SS] KafkaRelation should be pretty printed in web UI
## What changes were proposed in this pull request?

User-friendly name of `KafkaRelation` in web UI (under Details for Query).

### Before

<img width="516" alt="spark-20600-before" src="https://cloud.githubusercontent.com/assets/62313/25841955/74479ac6-34a2-11e7-87fb-d9f62a1356a7.png">

### After

<img width="439" alt="spark-20600-after" src="https://cloud.githubusercontent.com/assets/62313/25841829/f5335630-34a1-11e7-85a4-afe9b66d73c8.png">

## How was this patch tested?

Local build

```
./bin/spark-shell --jars ~/.m2/repository/org/apache/spark/spark-sql-kafka-0-10_2.11/2.3.0-SNAPSHOT/spark-sql-kafka-0-10_2.11-2.3.0-SNAPSHOT.jar --packages org.apache.kafka:kafka-clients:0.10.0.1
```

Author: Jacek Laskowski <jacek@japila.pl>

Closes #17917 from jaceklaskowski/SPARK-20600-KafkaRelation-webUI.
2017-05-11 10:55:11 -07:00
Xianyang Liu fcb88f9211 [MINOR][BUILD] Fix lint-java breaks.
## What changes were proposed in this pull request?

This PR proposes to fix the lint-breaks as below:
```
[ERROR] src/main/java/org/apache/spark/unsafe/Platform.java:[51] (regexp) RegexpSingleline: No trailing whitespace allowed.
[ERROR] src/main/scala/org/apache/spark/sql/streaming/Trigger.java:[45,25] (naming) MethodName: Method name 'ProcessingTime' must match pattern '^[a-z][a-z0-9][a-zA-Z0-9_]*$'.
[ERROR] src/main/scala/org/apache/spark/sql/streaming/Trigger.java:[62,25] (naming) MethodName: Method name 'ProcessingTime' must match pattern '^[a-z][a-z0-9][a-zA-Z0-9_]*$'.
[ERROR] src/main/scala/org/apache/spark/sql/streaming/Trigger.java:[78,25] (naming) MethodName: Method name 'ProcessingTime' must match pattern '^[a-z][a-z0-9][a-zA-Z0-9_]*$'.
[ERROR] src/main/scala/org/apache/spark/sql/streaming/Trigger.java:[92,25] (naming) MethodName: Method name 'ProcessingTime' must match pattern '^[a-z][a-z0-9][a-zA-Z0-9_]*$'.
[ERROR] src/main/scala/org/apache/spark/sql/streaming/Trigger.java:[102,25] (naming) MethodName: Method name 'Once' must match pattern '^[a-z][a-z0-9][a-zA-Z0-9_]*$'.
[ERROR] src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisInputDStreamBuilderSuite.java:[28,8] (imports) UnusedImports: Unused import - org.apache.spark.streaming.api.java.JavaDStream.
```

after:
```
dev/lint-java
Checkstyle checks passed.
```
[Test Result](https://travis-ci.org/ConeyLiu/spark/jobs/229666169)

## How was this patch tested?

Travis CI

Author: Xianyang Liu <xianyang.liu@intel.com>

Closes #17890 from ConeyLiu/codestyle.
2017-05-10 13:56:34 +01:00
Xiao Li cafca54c0e [SPARK-20557][SQL] Support JDBC data type Time with Time Zone
### What changes were proposed in this pull request?

This PR is to support JDBC data type TIME WITH TIME ZONE. It can be converted to TIMESTAMP

In addition, before this PR, for unsupported data types, we simply output the type number instead of the type name.

```
java.sql.SQLException: Unsupported type 2014
```
After this PR, the message is like
```
java.sql.SQLException: Unsupported type TIMESTAMP_WITH_TIMEZONE
```

- Also upgrade the H2 version to `1.4.195` which has the type fix for "TIMESTAMP WITH TIMEZONE". However, it is not fully supported. Thus, we capture the exception, but we still need it to partially test the support of "TIMESTAMP WITH TIMEZONE", because Docker tests are not regularly run.

### How was this patch tested?
Added test cases.

Author: Xiao Li <gatorsmile@gmail.com>

Closes #17835 from gatorsmile/h2.
2017-05-06 22:21:19 -07:00
Jannik Arndt b31648c081 [SPARK-20557][SQL] Support for db column type TIMESTAMP WITH TIME ZONE
## What changes were proposed in this pull request?

SparkSQL can now read from a database table with column type [TIMESTAMP WITH TIME ZONE](https://docs.oracle.com/javase/8/docs/api/java/sql/Types.html#TIMESTAMP_WITH_TIMEZONE).

## How was this patch tested?

Tested against Oracle database.

JoshRosen, you seem to know the class, would you look at this? Thanks!

Author: Jannik Arndt <jannik@jannikarndt.de>

Closes #17832 from JannikArndt/spark-20557-timestamp-with-timezone.
2017-05-05 11:42:55 -07:00
Shixiong Zhu bd57882879 [SPARK-20603][SS][TEST] Set default number of topic partitions to 1 to reduce the load
## What changes were proposed in this pull request?

I checked the logs of https://amplab.cs.berkeley.edu/jenkins/job/spark-branch-2.2-test-maven-hadoop-2.7/47/ and found it took several seconds to create Kafka internal topic `__consumer_offsets`. As Kafka creates this topic lazily, the topic creation happens in the first test `deserialization of initial offset with Spark 2.1.0` and causes it timeout.

This PR changes `offsets.topic.num.partitions` from the default value 50 to 1 to make creating `__consumer_offsets` (50 partitions -> 1 partition) much faster.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #17863 from zsxwing/fix-kafka-flaky-test.
2017-05-05 11:08:26 -07:00
Bill Chambers 733b81b835 [SPARK-20496][SS] Bug in KafkaWriter Looks at Unanalyzed Plans
## What changes were proposed in this pull request?

We didn't enforce analyzed plans in Spark 2.1 when writing out to Kafka.

## How was this patch tested?

New unit test.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Bill Chambers <bill@databricks.com>

Closes #17804 from anabranch/SPARK-20496-2.
2017-04-28 10:18:31 -07:00
Shixiong Zhu 823baca2cb [SPARK-20452][SS][KAFKA] Fix a potential ConcurrentModificationException for batch Kafka DataFrame
## What changes were proposed in this pull request?

Cancel a batch Kafka query but one of task cannot be cancelled, and rerun the same DataFrame may cause ConcurrentModificationException because it may launch two tasks sharing the same group id.

This PR always create a new consumer when `reuseKafkaConsumer = false` to avoid ConcurrentModificationException. It also contains other minor fixes.

## How was this patch tested?

Jenkins.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #17752 from zsxwing/kafka-fix.
2017-04-27 13:58:44 -07:00
Shixiong Zhu 01c999e7f9 [SPARK-20461][CORE][SS] Use UninterruptibleThread for Executor and fix the potential hang in CachedKafkaConsumer
## What changes were proposed in this pull request?

This PR changes Executor's threads to `UninterruptibleThread` so that we can use `runUninterruptibly` in `CachedKafkaConsumer`. However, this is just best effort to avoid hanging forever. If the user uses`CachedKafkaConsumer` in another thread (e.g., create a new thread or Future), the potential hang may still happen.

## How was this patch tested?

The new added test.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #17761 from zsxwing/int.
2017-04-27 13:55:03 -07:00
Josh Rosen f44c8a843c [SPARK-20453] Bump master branch version to 2.3.0-SNAPSHOT
This patch bumps the master branch version to `2.3.0-SNAPSHOT`.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #17753 from JoshRosen/SPARK-20453.
2017-04-24 21:48:04 -07:00
Yash Sharma ec68d8f8cf [SPARK-20189][DSTREAM] Fix spark kinesis testcases to remove deprecated createStream and use Builders
## What changes were proposed in this pull request?

The spark-kinesis testcases use the KinesisUtils.createStream which are deprecated now. Modify the testcases to use the recommended KinesisInputDStream.builder instead.
This change will also enable the testcases to automatically use the session tokens automatically.

## How was this patch tested?

All the existing testcases work fine as expected with the changes.

https://issues.apache.org/jira/browse/SPARK-20189

Author: Yash Sharma <ysharma@atlassian.com>

Closes #17506 from yssharma/ysharma/cleanup_kinesis_testcases.
2017-04-13 08:49:19 +01:00