Commit graph

22017 commits

Author SHA1 Message Date
hyukjinkwon 2c9c8629b7 [MINOR][YARN] Add YARN-specific credential providers in debug logging message
This PR adds a debugging log for YARN-specific credential providers which is loaded by service loader mechanism.

It took me a while to debug if it's actually loaded or not. I had to explicitly set the deprecated configuration and check if that's actually being loaded.

The change scope is manually tested. Logs are like:

```
Using the following builtin delegation token providers: hadoopfs, hive, hbase.
Using the following YARN-specific credential providers: yarn-test.
```

Author: hyukjinkwon <gurwls223@apache.org>

Closes #21466 from HyukjinKwon/minor-log.

Change-Id: I18e2fb8eeb3289b148f24c47bb3130a560a881cf
2018-06-01 08:44:57 +08:00
Stavros Kontopoulos 21e1fc7d4a [SPARK-24232][K8S] Add support for secret env vars
## What changes were proposed in this pull request?

* Allows to refer a secret as an env var.
* Introduces new config properties in the form: spark.kubernetes{driver,executor}.secretKeyRef.ENV_NAME=name:key
  ENV_NAME is case sensitive.

* Updates docs.
* Adds required unit tests.

## How was this patch tested?
Manually tested and confirmed that the secrets exist in driver's and executor's container env.
Also job finished successfully.
First created a secret with the following yaml:
```
apiVersion: v1
kind: Secret
metadata:
  name: test-secret
data:
  username: c3RhdnJvcwo=
  password: Mzk1MjgkdmRnN0pi

-------

$ echo -n 'stavros' | base64
c3RhdnJvcw==
$ echo -n '39528$vdg7Jb' | base64
MWYyZDFlMmU2N2Rm
```
Run a job as follows:
```./bin/spark-submit \
      --master k8s://http://localhost:9000 \
      --deploy-mode cluster \
      --name spark-pi \
      --class org.apache.spark.examples.SparkPi \
      --conf spark.executor.instances=1 \
      --conf spark.kubernetes.container.image=skonto/spark:k8envs3 \
      --conf spark.kubernetes.driver.secretKeyRef.MY_USERNAME=test-secret:username \
      --conf spark.kubernetes.driver.secretKeyRef.My_password=test-secret:password \
      --conf spark.kubernetes.executor.secretKeyRef.MY_USERNAME=test-secret:username \
      --conf spark.kubernetes.executor.secretKeyRef.My_password=test-secret:password \
      local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0-SNAPSHOT.jar 10000
```

Secret loaded correctly at the driver container:
![image](https://user-images.githubusercontent.com/7945591/40174346-7fee70c8-59dd-11e8-8705-995a5472716f.png)

Also if I log into the exec container:

kubectl exec -it spark-pi-1526555613156-exec-1 bash
bash-4.4# env

> SPARK_EXECUTOR_MEMORY=1g
> SPARK_EXECUTOR_CORES=1
> LANG=C.UTF-8
> HOSTNAME=spark-pi-1526555613156-exec-1
> SPARK_APPLICATION_ID=spark-application-1526555618626
> **MY_USERNAME=stavros**
>
> JAVA_HOME=/usr/lib/jvm/java-1.8-openjdk
> KUBERNETES_PORT_443_TCP_PROTO=tcp
> KUBERNETES_PORT_443_TCP_ADDR=10.100.0.1
> JAVA_VERSION=8u151
> KUBERNETES_PORT=tcp://10.100.0.1:443
> PWD=/opt/spark/work-dir
> HOME=/root
> SPARK_LOCAL_DIRS=/var/data/spark-b569b0ae-b7ef-4f91-bcd5-0f55535d3564
> KUBERNETES_SERVICE_PORT_HTTPS=443
> KUBERNETES_PORT_443_TCP_PORT=443
> SPARK_HOME=/opt/spark
> SPARK_DRIVER_URL=spark://CoarseGrainedSchedulerspark-pi-1526555613156-driver-svc.default.svc:7078
> KUBERNETES_PORT_443_TCP=tcp://10.100.0.1:443
> SPARK_EXECUTOR_POD_IP=9.0.9.77
> TERM=xterm
> SPARK_EXECUTOR_ID=1
> SHLVL=1
> KUBERNETES_SERVICE_PORT=443
> SPARK_CONF_DIR=/opt/spark/conf
> PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-1.8-openjdk/jre/bin:/usr/lib/jvm/java-1.8-openjdk/bin
> JAVA_ALPINE_VERSION=8.151.12-r0
> KUBERNETES_SERVICE_HOST=10.100.0.1
> **My_password=39528$vdg7Jb**
> _=/usr/bin/env
>

Author: Stavros Kontopoulos <stavros.kontopoulos@lightbend.com>

Closes #21317 from skonto/k8s-fix-env-secrets.
2018-05-31 14:28:33 -07:00
Yuming Wang cc976f6cb8 [SPARK-23900][SQL] format_number support user specifed format as argument
## What changes were proposed in this pull request?

`format_number` support user specifed format as argument. For example:
```sql
spark-sql> SELECT format_number(12332.123456, '##################.###');
12332.123
```

## How was this patch tested?

unit test

Author: Yuming Wang <yumwang@ebay.com>

Closes #21010 from wangyum/SPARK-23900.
2018-05-31 11:38:23 -07:00
Tathagata Das 223df5d9d4 [SPARK-24397][PYSPARK] Added TaskContext.getLocalProperty(key) in Python
## What changes were proposed in this pull request?

This adds a new API `TaskContext.getLocalProperty(key)` to the Python TaskContext. It mirrors the Java TaskContext API of returning a string value if the key exists, or None if the key does not exist.

## How was this patch tested?
New test added.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #21437 from tdas/SPARK-24397.
2018-05-31 11:23:57 -07:00
Marcelo Vanzin 7a82e93b34 [SPARK-24414][UI] Calculate the correct number of tasks for a stage.
This change takes into account all non-pending tasks when calculating
the number of tasks to be shown. This also means that when the stage
is pending, the task table (or, in fact, most of the data in the stage
page) will not be rendered.

I also fixed the label when the known number of tasks is larger than
the recorded number of tasks (it was inverted).

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #21457 from vanzin/SPARK-24414.
2018-05-31 10:05:20 -07:00
Sean Owen 698b9a0981 [WEBUI] Avoid possibility of script in query param keys
As discussed separately, this avoids the possibility of XSS on certain request param keys.

CC vanzin

Author: Sean Owen <srowen@gmail.com>

Closes #21464 from srowen/XSS2.
2018-05-31 09:34:39 -07:00
WeichenXu 90ae98d1ac [SPARK-24146][PYSPARK][ML] spark.ml parity for sequential pattern mining - PrefixSpan: Python API
## What changes were proposed in this pull request?

spark.ml parity for sequential pattern mining - PrefixSpan: Python API

## How was this patch tested?

doctests

Author: WeichenXu <weichen.xu@databricks.com>

Closes #21265 from WeichenXu123/prefix_span_py.
2018-05-31 06:53:10 -07:00
William Sheu 0053e153fa [SPARK-24337][CORE] Improve error messages for Spark conf values
## What changes were proposed in this pull request?

Improve the exception messages when retrieving Spark conf values to include the key name when the value is invalid.

## How was this patch tested?

Unit tests for all get* operations in SparkConf that require a specific value format

Author: William Sheu <william.sheu@databricks.com>

Closes #21454 from PenguinToast/SPARK-24337-spark-config-errors.
2018-05-30 22:37:27 -07:00
Marco Gaido 24ef7fbfa9 [SPARK-24276][SQL] Order of literals in IN should not affect semantic equality
## What changes were proposed in this pull request?

When two `In` operators are created with the same list of values, but different order, we are considering them as semantically different. This is wrong, since they have the same semantic meaning.

The PR adds a canonicalization rule which orders the literals in the `In` operator so the semantic equality works properly.

## How was this patch tested?

added UT

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21331 from mgaido91/SPARK-24276.
2018-05-30 15:31:40 -07:00
Marco Gaido 1b36f14889 [SPARK-23901][SQL] Add masking functions
## What changes were proposed in this pull request?

The PR adds the masking function as they are described in Hive's documentation: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-DataMaskingFunctions.
This means that only `string`s are accepted as parameter for the masking functions.

## How was this patch tested?

added UTs

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21246 from mgaido91/SPARK-23901.
2018-05-30 11:18:04 -07:00
Huaxin Gao ec6f971dc5 [SPARK-23161][PYSPARK][ML] Add missing APIs to Python GBTClassifier
## What changes were proposed in this pull request?

Add featureSubsetStrategy in GBTClassifier and GBTRegressor.  Also make GBTClassificationModel inherit from JavaClassificationModel instead of prediction model so it will have numClasses.

## How was this patch tested?

Add tests in doctest

Author: Huaxin Gao <huaxing@us.ibm.com>

Closes #21413 from huaxingao/spark-23161.
2018-05-30 11:04:09 -07:00
hyukjinkwon b142157dcc [SPARK-24384][PYTHON][SPARK SUBMIT] Add .py files correctly into PythonRunner in submit with client mode in spark-submit
## What changes were proposed in this pull request?

In client side before context initialization specifically,  .py file doesn't work in client side before context initialization when the application is a Python file. See below:

```
$ cat /home/spark/tmp.py
def testtest():
    return 1
```

This works:

```
$ cat app.py
import pyspark
pyspark.sql.SparkSession.builder.getOrCreate()
import tmp
print("************************%s" % tmp.testtest())

$ ./bin/spark-submit --master yarn --deploy-mode client --py-files /home/spark/tmp.py app.py
...
************************1
```

but this doesn't:

```
$ cat app.py
import pyspark
import tmp
pyspark.sql.SparkSession.builder.getOrCreate()
print("************************%s" % tmp.testtest())

$ ./bin/spark-submit --master yarn --deploy-mode client --py-files /home/spark/tmp.py app.py
Traceback (most recent call last):
  File "/home/spark/spark/app.py", line 2, in <module>
    import tmp
ImportError: No module named tmp
```

### How did it happen?

In client mode specifically, the paths are being added into PythonRunner as are:

628c7b5179/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala (L430)

628c7b5179/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala (L49-L88)

The problem here is, .py file shouldn't be added as are since `PYTHONPATH` expects a directory or an archive like zip or egg.

### How does this PR fix?

We shouldn't simply just add its parent directory because other files in the parent directory could also be added into the `PYTHONPATH` in client mode before context initialization.

Therefore, we copy .py files into a temp directory for .py files and add it to `PYTHONPATH`.

## How was this patch tested?

Unit tests are added and manually tested in both standalond and yarn client modes with submit.

Author: hyukjinkwon <gurwls223@apache.org>

Closes #21426 from HyukjinKwon/SPARK-24384.
2018-05-30 10:33:34 -07:00
Takeshi Yamamuro 1e46f92f95 [SPARK-24369][SQL] Correct handling for multiple distinct aggregations having the same argument set
## What changes were proposed in this pull request?
This pr fixed an issue when having multiple distinct aggregations having the same argument set, e.g.,
```
scala>: paste
val df = sql(
  s"""SELECT corr(DISTINCT x, y), corr(DISTINCT y, x), count(*)
     | FROM (VALUES (1, 1), (2, 2), (2, 2)) t(x, y)
   """.stripMargin)

java.lang.RuntimeException
You hit a query analyzer bug. Please report your query to Spark user mailing list.
```
The root cause is that `RewriteDistinctAggregates` can't detect multiple distinct aggregations if they have the same argument set. This pr modified code so that `RewriteDistinctAggregates` could count the number of aggregate expressions with `isDistinct=true`.

## How was this patch tested?
Added tests in `DataFrameAggregateSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #21443 from maropu/SPARK-24369.
2018-05-31 00:23:25 +08:00
DB Tsai 9e7bad0edd
[SPARK-24419][BUILD] Upgrade SBT to 0.13.17 with Scala 2.10.7 for JDK9+
## What changes were proposed in this pull request?

Upgrade SBT to 0.13.17 with Scala 2.10.7 for JDK9+

## How was this patch tested?

Existing tests

Author: DB Tsai <d_tsai@apple.com>

Closes #21458 from dbtsai/sbt.
2018-05-30 05:18:18 -07:00
e-dorigatti 0ebb0c0d4d [SPARK-23754][PYTHON] Re-raising StopIteration in client code
## What changes were proposed in this pull request?

Make sure that `StopIteration`s raised in users' code do not silently interrupt processing by spark, but are raised as exceptions to the users. The users' functions are wrapped in `safe_iter` (in `shuffle.py`), which re-raises `StopIteration`s as `RuntimeError`s

## How was this patch tested?

Unit tests, making sure that the exceptions are indeed raised. I am not sure how to check whether a `Py4JJavaError` contains my exception, so I simply looked for the exception message in the java exception's `toString`. Can you propose a better way?

## License

This is my original work, licensed in the same way as spark

Author: e-dorigatti <emilio.dorigatti@gmail.com>
Author: edorigatti <emilio.dorigatti@gmail.com>

Closes #21383 from e-dorigatti/fix_spark_23754.
2018-05-30 18:11:33 +08:00
Marek Novotny a4be981c04 [SPARK-24331][SPARKR][SQL] Adding arrays_overlap, array_repeat, map_entries to SparkR
## What changes were proposed in this pull request?

The PR adds functions `arrays_overlap`, `array_repeat`, `map_entries` to SparkR.

## How was this patch tested?

Tests added into R/pkg/tests/fulltests/test_sparkSQL.R

## Examples
### arrays_overlap
```
df <- createDataFrame(list(list(list(1L, 2L), list(3L, 1L)),
                           list(list(1L, 2L), list(3L, 4L)),
                           list(list(1L, NA), list(3L, 4L))))
collect(select(df, arrays_overlap(df[[1]], df[[2]])))
```
```
  arrays_overlap(_1, _2)
1                   TRUE
2                  FALSE
3                     NA
```
### array_repeat
```
df <- createDataFrame(list(list("a", 3L), list("b", 2L)))
collect(select(df, array_repeat(df[[1]], df[[2]])))
```
```
  array_repeat(_1, _2)
1              a, a, a
2                 b, b
```
```
collect(select(df, array_repeat(df[[1]], 2L)))
```
```
  array_repeat(_1, 2)
1                a, a
2                b, b
```
### map_entries
```
df <- createDataFrame(list(list(map = as.environment(list(x = 1, y = 2)))))
collect(select(df, map_entries(df$map)))
```
```
  map_entries(map)
1       x, 1, y, 2
```

Author: Marek Novotny <mn.mikke@gmail.com>

Closes #21434 from mn-mikke/SPARK-24331.
2018-05-29 23:26:39 -07:00
Gengliang Wang f48938800e [SPARK-24365][SQL] Add Data Source write benchmark
## What changes were proposed in this pull request?

Add Data Source write benchmark. So that it would be easier to measure the writer performance.

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21409 from gengliangwang/parquetWriteBenchmark.
2018-05-30 09:32:33 +08:00
DB Tsai 900bc1f7dc
[SPARK-24371][SQL] Added isInCollection in DataFrame API for Scala and Java.
## What changes were proposed in this pull request?

Implemented **`isInCollection `** in DataFrame API for both Scala and Java, so users can do

```scala
val profileDF = Seq(
  Some(1), Some(2), Some(3), Some(4),
  Some(5), Some(6), Some(7), None
).toDF("profileID")

val validUsers: Seq[Any] = Seq(6, 7.toShort, 8L, "3")

val result = profileDF.withColumn("isValid", $"profileID". isInCollection(validUsers))

result.show(10)
"""
+---------+-------+
|profileID|isValid|
+---------+-------+
|        1|  false|
|        2|  false|
|        3|   true|
|        4|  false|
|        5|  false|
|        6|   true|
|        7|   true|
|     null|   null|
+---------+-------+
 """.stripMargin
```
## How was this patch tested?

Several unit tests are added.

Author: DB Tsai <d_tsai@apple.com>

Closes #21416 from dbtsai/optimize-set.
2018-05-29 10:22:18 -07:00
Gabor Somogyi aca65c63cb [SPARK-23991][DSTREAMS] Fix data loss when WAL write fails in allocateBlocksToBatch
When blocks tried to get allocated to a batch and WAL write fails then the blocks will be removed from the received block queue. This fact simply produces data loss because the next allocation will not find the mentioned blocks in the queue.

In this PR blocks will be removed from the received queue only if WAL write succeded.

Additional unit test.

Author: Gabor Somogyi <gabor.g.somogyi@gmail.com>

Closes #21430 from gaborgsomogyi/SPARK-23991.

Change-Id: I5ead84f0233f0c95e6d9f2854ac2ff6906f6b341
2018-05-29 20:10:59 +08:00
Xiao Li 23db600c95 [SPARK-24250][SQL][FOLLOW-UP] support accessing SQLConf inside tasks
## What changes were proposed in this pull request?
We should not stop users from calling `getActiveSession` and `getDefaultSession` in executors. To not break the existing behaviors, we should simply return None.

## How was this patch tested?
N/A

Author: Xiao Li <gatorsmile@gmail.com>

Closes #21436 from gatorsmile/followUpSPARK-24250.
2018-05-28 23:23:22 -07:00
jerryshao 2ced6193b3 [SPARK-24377][SPARK SUBMIT] make --py-files work in non pyspark application
## What changes were proposed in this pull request?

For some Spark applications, though they're a java program, they require not only jar dependencies, but also python dependencies. One example is Livy remote SparkContext application, this application is actually an embedded REPL for Scala/Python/R, it will not only load in jar dependencies, but also python and R deps, so we should specify not only "--jars", but also "--py-files".

Currently for a Spark application, --py-files can only be worked for a pyspark application, so it will not be worked in the above case. So here propose to remove such restriction.

Also we tested that "spark.submit.pyFiles" only supports quite limited scenario (client mode with local deps), so here also expand the usage of "spark.submit.pyFiles" to be alternative of --py-files.

## How was this patch tested?

UT added.

Author: jerryshao <sshao@hortonworks.com>

Closes #21420 from jerryshao/SPARK-24377.
2018-05-29 10:48:48 +08:00
Dongjoon Hyun b31b587cd0 [SPARK-19613][SS][TEST] Random.nextString is not safe for directory namePrefix
## What changes were proposed in this pull request?

`Random.nextString` is good for generating random string data, but it's not proper for directory name prefix in `Utils.createDirectory(tempDir, Random.nextString(10))`. This PR uses more safe directory namePrefix.

```scala
scala> scala.util.Random.nextString(10)
res0: String = 馨쭔ᎰႻ穚䃈兩㻞藑並
```

```scala
StateStoreRDDSuite:
- versioning and immutability
- recovering from files
- usage with iterators - only gets and only puts
- preferred locations using StateStoreCoordinator *** FAILED ***
  java.io.IOException: Failed to create a temp directory (under /.../spark/sql/core/target/tmp/StateStoreRDDSuite8712796397908632676) after 10 attempts!
  at org.apache.spark.util.Utils$.createDirectory(Utils.scala:295)
  at org.apache.spark.sql.execution.streaming.state.StateStoreRDDSuite$$anonfun$13$$anonfun$apply$6.apply(StateStoreRDDSuite.scala:152)
  at org.apache.spark.sql.execution.streaming.state.StateStoreRDDSuite$$anonfun$13$$anonfun$apply$6.apply(StateStoreRDDSuite.scala:149)
  at org.apache.spark.sql.catalyst.util.package$.quietly(package.scala:42)
  at org.apache.spark.sql.execution.streaming.state.StateStoreRDDSuite$$anonfun$13.apply(StateStoreRDDSuite.scala:149)
  at org.apache.spark.sql.execution.streaming.state.StateStoreRDDSuite$$anonfun$13.apply(StateStoreRDDSuite.scala:149)
...
- distributed test *** FAILED ***
  java.io.IOException: Failed to create a temp directory (under /.../spark/sql/core/target/tmp/StateStoreRDDSuite8712796397908632676) after 10 attempts!
  at org.apache.spark.util.Utils$.createDirectory(Utils.scala:295)
```

## How was this patch tested?

Pass the existing tests.StateStoreRDDSuite:

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #21446 from dongjoon-hyun/SPARK-19613.
2018-05-29 10:35:30 +08:00
Bryan Cutler fa2ae9d201 [SPARK-24392][PYTHON] Label pandas_udf as Experimental
## What changes were proposed in this pull request?

The pandas_udf functionality was introduced in 2.3.0, but is not completely stable and still evolving.  This adds a label to indicate it is still an experimental API.

## How was this patch tested?

NA

Author: Bryan Cutler <cutlerb@gmail.com>

Closes #21435 from BryanCutler/arrow-pandas_udf-experimental-SPARK-24392.
2018-05-28 12:56:05 +08:00
Marco Gaido de01a8d50c [SPARK-24373][SQL] Add AnalysisBarrier to RelationalGroupedDataset's and KeyValueGroupedDataset's child
## What changes were proposed in this pull request?

When we create a `RelationalGroupedDataset` or a `KeyValueGroupedDataset` we set its child to the `logicalPlan` of the `DataFrame` we need to aggregate. Since the `logicalPlan` is already analyzed, we should not analyze it again. But this happens when the new plan of the aggregate is analyzed.

The current behavior in most of the cases is likely to produce no harm, but in other cases re-analyzing an analyzed plan can change it, since the analysis is not idempotent. This can cause issues like the one described in the JIRA (missing to find a cached plan).

The PR adds an `AnalysisBarrier` to the `logicalPlan` which is used as child of `RelationalGroupedDataset` or a `KeyValueGroupedDataset`.

## How was this patch tested?

added UT

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21432 from mgaido91/SPARK-24373.
2018-05-28 12:09:44 +08:00
Li Jin 672209f290 [SPARK-24334] Fix race condition in ArrowPythonRunner causes unclean shutdown of Arrow memory allocator
## What changes were proposed in this pull request?

There is a race condition of closing Arrow VectorSchemaRoot and Allocator in the writer thread of ArrowPythonRunner.

The race results in memory leak exception when closing the allocator. This patch removes the closing routine from the TaskCompletionListener and make the writer thread responsible for cleaning up the Arrow memory.

This issue be reproduced by this test:

```
def test_memory_leak(self):
    from pyspark.sql.functions import pandas_udf, col, PandasUDFType, array, lit, explode

   # Have all data in a single executor thread so it can trigger the race condition easier
    with self.sql_conf({'spark.sql.shuffle.partitions': 1}):
        df = self.spark.range(0, 1000)
        df = df.withColumn('id', array([lit(i) for i in range(0, 300)])) \
                   .withColumn('id', explode(col('id'))) \
                   .withColumn('v',  array([lit(i) for i in range(0, 1000)]))

       pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
       def foo(pdf):
           xxx
           return pdf

       result = df.groupby('id').apply(foo)

       with QuietTest(self.sc):
           with self.assertRaises(py4j.protocol.Py4JJavaError) as context:
               result.count()
           self.assertTrue('Memory leaked' not in str(context.exception))
```

Note: Because of the race condition, the test case cannot reproduce the issue reliably so it's not added to test cases.

## How was this patch tested?

Because of the race condition, the bug cannot be unit test easily. So far it has only happens on large amount of data. This is currently tested manually.

Author: Li Jin <ice.xelloss@gmail.com>

Closes #21397 from icexelloss/SPARK-24334-arrow-memory-leak.
2018-05-28 10:50:17 +08:00
Miles Yucht d440699192 [SPARK-24381][TESTING] Add unit tests for NOT IN subquery around null values
## What changes were proposed in this pull request?
This PR adds several unit tests along the `cols NOT IN (subquery)` pathway. There are a scattering of tests here and there which cover this codepath, but there doesn't seem to be a unified unit test of the correctness of null-aware anti joins anywhere. I have also added a brief explanation of how this expression behaves in SubquerySuite. Lastly, I made some clarifying changes in the NOT IN pathway in RewritePredicateSubquery.

## How was this patch tested?
Added unit tests! There should be no behavioral change in this PR.

Author: Miles Yucht <miles@databricks.com>

Closes #21425 from mgyucht/spark-24381.
2018-05-26 20:42:23 -07:00
Yuming Wang ed1a65448f [SPARK-19112][CORE][FOLLOW-UP] Add missing shortCompressionCodecNames to configuration.
## What changes were proposed in this pull request?

Spark provides four codecs: `lz4`, `lzf`, `snappy`, and `zstd`. This pr add missing shortCompressionCodecNames to configuration.

## How was this patch tested?

 manually tested

Author: Yuming Wang <yumwang@ebay.com>

Closes #21431 from wangyum/SPARK-19112.
2018-05-26 20:26:00 +08:00
Maxim Gekk 1b1528a504 [SPARK-24366][SQL] Improving of error messages for type converting
## What changes were proposed in this pull request?

Currently, users are getting the following error messages on type conversions:

```
scala.MatchError: test (of class java.lang.String)
```

The message doesn't give any clues to the users where in the schema the error happened. In this PR, I would like to improve the error message like:

```
The value (test) of the type (java.lang.String) cannot be converted to struct<f1:int>
```

## How was this patch tested?

Added tests for converting of wrong values to `struct`, `map`, `array`, `string` and `decimal`.

Author: Maxim Gekk <maxim.gekk@databricks.com>

Closes #21410 from MaxGekk/type-conv-error.
2018-05-25 15:42:46 -07:00
Marco Gaido fd315f5884 [MINOR] Add port SSL config in toString and scaladoc
## What changes were proposed in this pull request?

SPARK-17874 introduced a new configuration to set the port where SSL services bind to. We missed to update the scaladoc and the `toString` method, though. The PR adds it in the missing places

## How was this patch tested?

checked the `toString` output in the logs

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21429 from mgaido91/minor_ssl.
2018-05-25 12:49:06 -07:00
Maxim Gekk 64fad0b519 [SPARK-24244][SPARK-24368][SQL] Passing only required columns to the CSV parser
## What changes were proposed in this pull request?

uniVocity parser allows to specify only required column names or indexes for [parsing](https://www.univocity.com/pages/parsers-tutorial) like:

```
// Here we select only the columns by their indexes.
// The parser just skips the values in other columns
parserSettings.selectIndexes(4, 0, 1);
CsvParser parser = new CsvParser(parserSettings);
```
In this PR, I propose to extract indexes from required schema and pass them into the CSV parser. Benchmarks show the following improvements in parsing of 1000 columns:

```
Select 100 columns out of 1000: x1.76
Select 1 column out of 1000: x2
```

**Note**: Comparing to current implementation, the changes can return different result for malformed rows in the `DROPMALFORMED` and `FAILFAST` modes if only subset of all columns is requested. To have previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to `false`.

## How was this patch tested?

It was tested by new test which selects 3 columns out of 15, by existing tests and by new benchmarks.

Author: Maxim Gekk <maxim.gekk@databricks.com>
Author: Maxim Gekk <max.gekk@gmail.com>

Closes #21415 from MaxGekk/csv-column-pruning2.
2018-05-24 21:38:04 -07:00
Gengliang Wang 3b20b34ab7 [SPARK-24367][SQL] Parquet: use JOB_SUMMARY_LEVEL instead of deprecated flag ENABLE_JOB_SUMMARY
## What changes were proposed in this pull request?

In current parquet version,the conf ENABLE_JOB_SUMMARY is deprecated.

When writing to Parquet files, the warning message
```WARN org.apache.parquet.hadoop.ParquetOutputFormat: Setting parquet.enable.summary-metadata is deprecated, please use parquet.summary.metadata.level```
keeps showing up.

From https://github.com/apache/parquet-mr/blame/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java#L164 we can see that we should use JOB_SUMMARY_LEVEL.

## How was this patch tested?

Unit test

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21411 from gengliangwang/summaryLevel.
2018-05-25 11:16:35 +08:00
Jose Torres 0fd68cb727 [SPARK-24234][SS] Support multiple row writers in continuous processing shuffle reader.
## What changes were proposed in this pull request?

https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit#heading=h.8t3ci57f7uii

Support multiple different row writers in continuous processing shuffle reader.

Note that having multiple read-side buffers ended up being the natural way to do this. Otherwise it's hard to express the constraint of sending an epoch marker only when all writers have sent one.

## How was this patch tested?

new unit tests

Author: Jose Torres <torres.joseph.f+github@gmail.com>

Closes #21385 from jose-torres/multipleWrite.
2018-05-24 17:08:52 -07:00
Shixiong Zhu 53c06ddabb [SPARK-24332][SS][MESOS] Fix places reading 'spark.network.timeout' as milliseconds
## What changes were proposed in this pull request?

This PR replaces `getTimeAsMs` with `getTimeAsSeconds` to fix the issue that reading "spark.network.timeout" using a wrong time unit when the user doesn't specify a time out.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <zsxwing@gmail.com>

Closes #21382 from zsxwing/fix-network-timeout-conf.
2018-05-24 13:00:24 -07:00
Yuming Wang 0d89943449 [SPARK-24378][SQL] Fix date_trunc function incorrect examples
## What changes were proposed in this pull request?

Fix `date_trunc` function incorrect examples.

## How was this patch tested?

N/A

Author: Yuming Wang <yumwang@ebay.com>

Closes #21423 from wangyum/SPARK-24378.
2018-05-24 23:38:50 +08:00
Maxim Gekk 13bedc05c2 [SPARK-24329][SQL] Test for skipping multi-space lines
## What changes were proposed in this pull request?

The PR is a continue of https://github.com/apache/spark/pull/21380 . It checks cases that are handled by the code:
e3de6ab30d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala (L303-L304)

Basically the code skips lines with one or many whitespaces, and lines with comments (see [filterCommentAndEmpty](e3de6ab30d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala (L47)))

```scala
   iter.filter { line =>
      line.trim.nonEmpty && !line.startsWith(options.comment.toString)
    }
```

Closes #21380

## How was this patch tested?

Added a test for the case described above.

Author: Maxim Gekk <maxim.gekk@databricks.com>
Author: Maxim Gekk <max.gekk@gmail.com>

Closes #21394 from MaxGekk/test-for-multi-space-lines.
2018-05-24 22:18:58 +08:00
Ryan Blue 3469f5c989 [SPARK-24230][SQL] Fix SpecificParquetRecordReaderBase with dictionary filters.
## What changes were proposed in this pull request?

I missed this commit when preparing #21070.

When Parquet is able to filter blocks with dictionary filtering, the expected total value count to be too high in Spark, leading to an error when there were fewer than expected row groups to process. Spark should get the row groups from Parquet to pick up new filter schemes in Parquet like dictionary filtering.

## How was this patch tested?

Using in production at Netflix. Added test case for dictionary-filtered blocks.

Author: Ryan Blue <blue@apache.org>

Closes #21295 from rdblue/SPARK-24230-fix-parquet-block-tracking.
2018-05-24 20:55:26 +08:00
hyukjinkwon 4a14dc0aff [SPARK-22269][BUILD] Run Java linter via SBT for Jenkins
## What changes were proposed in this pull request?

This PR proposes to check Java lint via SBT for Jenkins. It uses the SBT wrapper for checkstyle.

I manually tested. If we build the codes once, running this script takes 2 mins at maximum in my local:

Test codes:

```
Checkstyle failed at following occurrences:
[error] Checkstyle error found in /.../spark/core/src/test/java/test/org/apache/spark/JavaAPISuite.java:82: Line is longer than 100 characters (found 103).
[error] 1 issue(s) found in Checkstyle report: /.../spark/core/target/checkstyle-test-report.xml
[error] Checkstyle error found in /.../spark/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java:84: Line is longer than 100 characters (found 115).
[error] 1 issue(s) found in Checkstyle report: /.../spark/sql/hive/target/checkstyle-test-report.xml
...
```

Main codes:

```
Checkstyle failed at following occurrences:
[error] Checkstyle error found in /.../spark/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java:39: Line is longer than 100 characters (found 104).
[error] Checkstyle error found in /.../spark/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartitionReader.java:26: Line is longer than 100 characters (found 110).
[error] Checkstyle error found in /.../spark/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartitionReader.java:30: Line is longer than 100 characters (found 104).
...
```

## How was this patch tested?

Manually tested. Jenkins build should test this.

Author: hyukjinkwon <gurwls223@apache.org>

Closes #21399 from HyukjinKwon/SPARK-22269.
2018-05-24 14:19:32 +08:00
hyukjinkwon 8a545822d0 [SPARK-24364][SS] Prevent InMemoryFileIndex from failing if file path doesn't exist
## What changes were proposed in this pull request?

This PR proposes to follow up https://github.com/apache/spark/pull/15153 and complete SPARK-17599.

`FileSystem` operation (`fs.getFileBlockLocations`) can still fail if the file path does not exist. For example see the exception message below:

```
Error occurred while processing: File does not exist: /rel/00171151/input/PJ/part-00136-b6403bac-a240-44f8-a792-fc2e174682b7-c000.csv
...
java.io.FileNotFoundException: File does not exist: /rel/00171151/input/PJ/part-00136-b6403bac-a240-44f8-a792-fc2e174682b7-c000.csv
...
org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:249)
	at org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:229)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$listLeafFiles$3.apply(InMemoryFileIndex.scala:314)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$listLeafFiles$3.apply(InMemoryFileIndex.scala:297)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$.org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$listLeafFiles(InMemoryFileIndex.scala:297)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$bulkListLeafFiles$1.apply(InMemoryFileIndex.scala:174)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$bulkListLeafFiles$1.apply(InMemoryFileIndex.scala:173)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$.org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$bulkListLeafFiles(InMemoryFileIndex.scala:173)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.listLeafFiles(InMemoryFileIndex.scala:126)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.refresh0(InMemoryFileIndex.scala:91)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(InMemoryFileIndex.scala:67)
	at org.apache.spark.sql.execution.datasources.DataSource.tempFileIndex$lzycompute$1(DataSource.scala:161)
	at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$tempFileIndex$1(DataSource.scala:152)
	at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:166)
	at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:261)
	at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:94)
	at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:94)
	at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:33)
	at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:196)
	at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:206)
	at com.hwx.StreamTest$.main(StreamTest.scala:97)
	at com.hwx.StreamTest.main(StreamTest.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:906)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /rel/00171151/input/PJ/part-00136-b6403bac-a240-44f8-a792-fc2e174682b7-c000.csv
...
```

So, it fixes it to make a warning instead.

## How was this patch tested?

It's hard to write a test. Manually tested multiple times.

Author: hyukjinkwon <gurwls223@apache.org>

Closes #21408 from HyukjinKwon/missing-files.
2018-05-24 13:21:02 +08:00
Xingbo Jiang e108f84f5c [MINOR][CORE] Cleanup unused vals in DAGScheduler.handleTaskCompletion
## What changes were proposed in this pull request?

Cleanup unused vals in `DAGScheduler.handleTaskCompletion` to reduce the code complexity slightly.

## How was this patch tested?

Existing test cases.

Author: Xingbo Jiang <xingbo.jiang@databricks.com>

Closes #21406 from jiangxb1987/handleTaskCompletion.
2018-05-24 11:42:25 +08:00
Dongjoon Hyun 486ecc680e [SPARK-24322][BUILD] Upgrade Apache ORC to 1.4.4
## What changes were proposed in this pull request?

ORC 1.4.4 includes [nine fixes](https://issues.apache.org/jira/issues/?filter=12342568&jql=project%20%3D%20ORC%20AND%20resolution%20%3D%20Fixed%20AND%20fixVersion%20%3D%201.4.4). One of the issues is about `Timestamp` bug (ORC-306) which occurs when `native` ORC vectorized reader reads ORC column vector's sub-vector `times` and `nanos`. ORC-306 fixes this according to the [original definition](https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/TimestampColumnVector.java#L45-L46) and this PR includes the updated interpretation on ORC column vectors. Note that `hive` ORC reader and ORC MR reader is not affected.

```scala
scala> spark.version
res0: String = 2.3.0
scala> spark.sql("set spark.sql.orc.impl=native")
scala> Seq(java.sql.Timestamp.valueOf("1900-05-05 12:34:56.000789")).toDF().write.orc("/tmp/orc")
scala> spark.read.orc("/tmp/orc").show(false)
+--------------------------+
|value                     |
+--------------------------+
|1900-05-05 12:34:55.000789|
+--------------------------+
```

This PR aims to update Apache Spark to use it.

**FULL LIST**

ID | TITLE
-- | --
ORC-281 | Fix compiler warnings from clang 5.0
ORC-301 | `extractFileTail` should open a file in `try` statement
ORC-304 | Fix TestRecordReaderImpl to not fail with new storage-api
ORC-306 | Fix incorrect workaround for bug in java.sql.Timestamp
ORC-324 | Add support for ARM and PPC arch
ORC-330 | Remove unnecessary Hive artifacts from root pom
ORC-332 | Add syntax version to orc_proto.proto
ORC-336 | Remove avro and parquet dependency management entries
ORC-360 | Implement error checking on subtype fields in Java

## How was this patch tested?

Pass the Jenkins.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #21372 from dongjoon-hyun/SPARK_ORC144.
2018-05-24 11:34:13 +08:00
sychen 888340151f [SPARK-24257][SQL] LongToUnsafeRowMap calculate the new size may be wrong
LongToUnsafeRowMap has a mistake when growing its page array: it blindly grows to `oldSize * 2`, while the new record may be larger than `oldSize * 2`. Then we may have a malformed UnsafeRow when querying this map, whose actual data is smaller than its declared size, and the data is corrupted.

Author: sychen <sychen@ctrip.com>

Closes #21311 from cxzl25/fix_LongToUnsafeRowMap_page_size.
2018-05-24 11:18:07 +08:00
Vayda, Oleksandr: IT (PRG) 230f144197 [SPARK-24350][SQL] Fixes ClassCastException in the "array_position" function
## What changes were proposed in this pull request?

### Fixes `ClassCastException` in the `array_position` function - [SPARK-24350](https://issues.apache.org/jira/browse/SPARK-24350)
When calling `array_position` function with a wrong type of the 1st argument an `AnalysisException` should be thrown instead of `ClassCastException`

Example:

```sql
select array_position('foo', 'bar')
```

```
java.lang.ClassCastException: org.apache.spark.sql.types.StringType$ cannot be cast to org.apache.spark.sql.types.ArrayType
	at org.apache.spark.sql.catalyst.expressions.ArrayPosition.inputTypes(collectionOperations.scala:1398)
	at org.apache.spark.sql.catalyst.expressions.ExpectsInputTypes$class.checkInputDataTypes(ExpectsInputTypes.scala:44)
	at org.apache.spark.sql.catalyst.expressions.ArrayPosition.checkInputDataTypes(collectionOperations.scala:1401)
	at org.apache.spark.sql.catalyst.expressions.Expression.resolved$lzycompute(Expression.scala:168)
	at org.apache.spark.sql.catalyst.expressions.Expression.resolved(Expression.scala:168)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAliases$$anonfun$org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveAliases$$assignAliases$1$$anonfun$apply$3.applyOrElse(Analyzer.scala:256)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAliases$$anonfun$org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveAliases$$assignAliases$1$$anonfun$apply$3.applyOrElse(Analyzer.scala:252)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
```

## How was this patch tested?

unit test

Author: Vayda, Oleksandr: IT (PRG) <Oleksandr.Vayda@barclayscapital.com>

Closes #21401 from wajda/SPARK-24350-array_position-error-fix.
2018-05-23 17:22:52 -07:00
Jose Torres f457933293 [SPARK-23416][SS] Add a specific stop method for ContinuousExecution.
## What changes were proposed in this pull request?

Add a specific stop method for ContinuousExecution. The previous StreamExecution.stop() method had a race condition as applied to continuous processing: if the cancellation was round-tripped to the driver too quickly, the generic SparkException it caused would be reported as the query death cause. We earlier decided that SparkException should not be added to the StreamExecution.isInterruptionException() whitelist, so we need to ensure this never happens instead.

## How was this patch tested?

Existing tests. I could consistently reproduce the previous flakiness by putting Thread.sleep(1000) between the first job cancellation and thread interruption in StreamExecution.stop().

Author: Jose Torres <torres.joseph.f+github@gmail.com>

Closes #21384 from jose-torres/fixKafka.
2018-05-23 17:21:29 -07:00
jinxing b7a036b75b [SPARK-24294] Throw SparkException when OOM in BroadcastExchangeExec
## What changes were proposed in this pull request?

When OutOfMemoryError thrown from BroadcastExchangeExec, scala.concurrent.Future will hit scala bug – https://github.com/scala/bug/issues/9554, and hang until future timeout:

We could wrap the OOM inside SparkException to resolve this issue.

## How was this patch tested?

Manually tested.

Author: jinxing <jinxing6042@126.com>

Closes #21342 from jinxing64/SPARK-24294.
2018-05-23 13:12:05 -07:00
Takeshi Yamamuro 84557bc9f8 [SPARK-24206][SQL] Improve DataSource read benchmark code
## What changes were proposed in this pull request?
This pr added benchmark code `DataSourceReadBenchmark` for `orc`, `paruqet`, `csv`, and `json` based on the existing `ParquetReadBenchmark` and `OrcReadBenchmark`.

## How was this patch tested?
N/A

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #21266 from maropu/DataSourceReadBenchmark.
2018-05-23 13:02:32 -07:00
Xiao Li 5a5a868dc4 Revert "[SPARK-24244][SQL] Passing only required columns to the CSV parser"
This reverts commit 8086acc2f6.
2018-05-23 11:51:13 -07:00
WeichenXu df125062c8 [SPARK-20114][ML][FOLLOW-UP] spark.ml parity for sequential pattern mining - PrefixSpan
## What changes were proposed in this pull request?

Change `PrefixSpan` into a class with param setter/getters.
This address issues mentioned here:
https://github.com/apache/spark/pull/20973#discussion_r186931806

## How was this patch tested?

UT.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: WeichenXu <weichen.xu@databricks.com>

Closes #21393 from WeichenXu123/fix_prefix_span.
2018-05-23 11:00:23 -07:00
Liang-Chi Hsieh a40ffc656d [SPARK-23711][SQL] Add fallback generator for UnsafeProjection
## What changes were proposed in this pull request?

Add fallback logic for `UnsafeProjection`. In production we can try to create unsafe projection using codegen implementation. Once any compile error happens, it fallbacks to interpreted implementation.

## How was this patch tested?

Added test.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #21106 from viirya/SPARK-23711.
2018-05-23 22:40:52 +08:00
Seth Fitzsimmons 00c13cfad7 Correct reference to Offset class
This is a documentation-only correction; `org.apache.spark.sql.sources.v2.reader.Offset` is actually `org.apache.spark.sql.sources.v2.reader.streaming.Offset`.

Author: Seth Fitzsimmons <seth@mojodna.net>

Closes #21387 from mojodna/patch-1.
2018-05-23 09:14:03 +08:00
Gabor Somogyi 79e06faa4e [SPARK-19185][DSTREAMS] Avoid concurrent use of cached consumers in CachedKafkaConsumer
## What changes were proposed in this pull request?

`CachedKafkaConsumer` in the project streaming-kafka-0-10 is designed to maintain a pool of KafkaConsumers that can be reused. However, it was built with the assumption there will be only one thread trying to read the same Kafka TopicPartition at the same time. This assumption is not true all the time and this can inadvertently lead to ConcurrentModificationException.

Here is a better way to design this. The consumer pool should be smart enough to avoid concurrent use of a cached consumer. If there is another request for the same TopicPartition as a currently in-use consumer, the pool should automatically return a fresh consumer.

- There are effectively two kinds of consumer that may be generated
  - Cached consumer - this should be returned to the pool at task end
  - Non-cached consumer - this should be closed at task end
- A trait called `KafkaDataConsumer` is introduced to hide this difference from the users of the consumer so that the client code does not have to reason about whether to stop and release. They simply call `val consumer = KafkaDataConsumer.acquire` and then `consumer.release`.
- If there is request for a consumer that is in-use, then a new consumer is generated.
- If there is request for a consumer which is a task reattempt, then already existing cached consumer will be invalidated and a new consumer is generated. This could fix potential issues if the source of the reattempt is a malfunctioning consumer.
- In addition, I renamed the `CachedKafkaConsumer` class to `KafkaDataConsumer` because is a misnomer given that what it returns may or may not be cached.

## How was this patch tested?

A new stress test that verifies it is safe to concurrently get consumers for the same TopicPartition from the consumer pool.

Author: Gabor Somogyi <gabor.g.somogyi@gmail.com>

Closes #20997 from gaborgsomogyi/SPARK-19185.
2018-05-22 13:43:45 -07:00