Commit graph

23594 commits

Nihar Sheth 055bf8ea1f [SPARK-24938][CORE] Prevent Netty from using onheap memory for headers without regard for configuration
## What changes were proposed in this pull request?

In MessageEncoder.java, the header was always allocated in on-heap memory, regardless of whether Netty was configured to use or prefer on-heap or off-heap memory. By default this made Netty allocate 16 MB of on-heap memory for a tiny header message. It would be more practical to use the preallocated buffers.
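A minimal Scala sketch of the idea (illustrative only, not Spark's actual `MessageEncoder` code): taking the header buffer from the channel's configured allocator lets Netty honour its on-heap/off-heap preference instead of always forcing a heap buffer.

```scala
import io.netty.buffer.{ByteBuf, ByteBufAllocator, PooledByteBufAllocator}

// Illustrative helper: allocate the small header from the configured allocator
// instead of unconditionally asking for a heap buffer.
def encodeHeader(alloc: ByteBufAllocator, headerLength: Int): ByteBuf = {
  // Before (always on-heap): alloc.heapBuffer(headerLength)
  // After (follows the allocator's preference): alloc.buffer(headerLength)
  alloc.buffer(headerLength)
}

// Example: a pooled allocator configured to prefer direct (off-heap) buffers.
val header = encodeHeader(new PooledByteBufAllocator(true /* preferDirect */), 16)
```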

Using a memory monitor tool on a simple spark application, the following services currently allocate 16 mb of onheap memory:
netty-rpc-client
netty-blockTransfer-client
netty-external-shuffle-client

With this change, the memory monitor tool reports all three of these services as using 0 B of on-heap memory. The off-heap memory allocation does not increase, but more of the already-allocated space is used.

## How was this patch tested?

Manually tested change using spark-memory-tool https://github.com/squito/spark-memory

Closes #22114 from NiharS/nettybuffer.

Lead-authored-by: Nihar Sheth <niharrsheth@gmail.com>
Co-authored-by: Nihar Sheth <nsheth@cloudera.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-01-22 08:41:42 -06:00
Kazuaki Ishizaki 7bf0794651 [SPARK-26463][CORE] Use ConfigEntry for hardcoded configs for scheduler categories.
## What changes were proposed in this pull request?

The PR makes the hardcoded `spark.dynamicAllocation`, `spark.scheduler`, `spark.rpc`, `spark.task`, `spark.speculation`, and `spark.cleaner` configs use `ConfigEntry`.
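For context, a rough Scala sketch of the `ConfigEntry` pattern (the entry and doc text below are only an example; `ConfigBuilder` is Spark-internal, so this compiles only inside the Spark build):

```scala
import org.apache.spark.internal.config.ConfigBuilder

object SchedulerConfigSketch {
  // A hardcoded string key such as "spark.dynamicAllocation.enabled" becomes a
  // typed ConfigEntry constant with documentation and a default value.
  val DYN_ALLOCATION_ENABLED =
    ConfigBuilder("spark.dynamicAllocation.enabled")
      .doc("Whether to use dynamic resource allocation.")
      .booleanConf
      .createWithDefault(false)
}

// Call sites then read conf.get(SchedulerConfigSketch.DYN_ALLOCATION_ENABLED)
// instead of conf.getBoolean("spark.dynamicAllocation.enabled", false).
```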

## How was this patch tested?

Existing tests

Closes #23416 from kiszk/SPARK-26463.

Authored-by: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-01-22 07:44:36 -06:00
Jatin Puri d2e86cb3cd [SPARK-26616][MLLIB] Expose document frequency in IDFModel
## What changes were proposed in this pull request?

This change exposes the `df` (document frequency) as a public val along with the number of documents (`m`) as part of the IDF model.

* The document frequency is returned as an `Array[Long]`
* If the minimum document frequency is set, it is considered in the df calculation: if a term's count is less than minDocFreq, its df is 0.
* numDocs is not strictly required, but it can be useful if we later allow users to supply their own idf function instead of the default (log((1+m)/(1+df))). In that case, the user can provide a function that takes `m` and `df` and returns the idf value (see the usage sketch after this list)
* Pyspark changes
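A usage sketch in Scala (assuming the new members surface on `ml.feature.IDFModel` as `docFreq` and `numDocs`, matching the description above; the data is made up):

```scala
import org.apache.spark.ml.feature.IDF
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("idf-df").getOrCreate()
import spark.implicits._

// Tiny term-frequency vectors; minDocFreq = 2 zeroes out the df of rare terms.
val tf = Seq(
  Vectors.dense(1.0, 0.0, 2.0),
  Vectors.dense(0.0, 1.0, 3.0),
  Vectors.dense(1.0, 1.0, 0.0)
).map(Tuple1.apply).toDF("features")

val model = new IDF()
  .setInputCol("features")
  .setOutputCol("tfidf")
  .setMinDocFreq(2)
  .fit(tf)

println(model.docFreq.mkString("[", ", ", "]")) // document frequency per term (Array[Long])
println(model.numDocs)                          // number of documents, m
```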

## How was this patch tested?

The existing test case was edited to also check for the document frequency values.

I am not very familiar with Python or PySpark. I have committed and run the tests based on my understanding. Kindly let me know if I have missed anything.

Reviewer request: mengxr  zjffdu yinxusen

Closes #23549 from purijatin/master.

Authored-by: Jatin Puri <purijatin@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-01-22 07:41:54 -06:00
Liang-Chi Hsieh f92d276653 [SPARK-25811][PYSPARK] Raise a proper error when unsafe cast is detected by PyArrow
## What changes were proposed in this pull request?

Since 0.11.0, PyArrow supports raising an error for unsafe casts ([PR](https://github.com/apache/arrow/pull/2504)). We should use it to raise a proper error for pandas UDF users when such a cast is detected.

Added a SQL config `spark.sql.execution.pandas.arrowSafeTypeConversion` to disable the Arrow safe type check.

## How was this patch tested?

Added a test and tested manually.

Closes #22807 from viirya/SPARK-25811.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2019-01-22 14:54:41 +08:00
Wenchen Fan 098a2c41fc [SPARK-26520][SQL] data source v2 API refactor (micro-batch read)
## What changes were proposed in this pull request?

Following https://github.com/apache/spark/pull/23086, this PR does the API refactor for micro-batch read, w.r.t. the [doc](https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?usp=sharing)

The major changes:
1. rename `XXXMicroBatchReadSupport` to `XXXMicroBatchReadStream`
2. implement `TableProvider`, `Table`, `ScanBuilder` and `Scan` for streaming sources
3. at the beginning of micro-batch streaming execution, convert `StreamingRelationV2` to `StreamingDataSourceV2Relation` directly, instead of `StreamingExecutionRelation`.

Follow-up: support operator pushdown for stream sources.

## How was this patch tested?

existing tests

Closes #23430 from cloud-fan/micro-batch.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2019-01-21 14:29:12 -08:00
Hyukjin Kwon 75d84498a4 [SPARK-26676][PYTHON] Make HiveContextSQLTests.test_unbounded_frames test compatible with Python 2 and PyPy
## What changes were proposed in this pull request?

This particular test is being skipped at PyPy and Python 2.

```
Skipped tests in pyspark.sql.tests.test_context with pypy:
    test_unbounded_frames (pyspark.sql.tests.test_context.HiveContextSQLTests) ... skipped "Unittest < 3.3 doesn't support mocking"

Skipped tests in pyspark.sql.tests.test_context with python2.7:
    test_unbounded_frames (pyspark.sql.tests.test_context.HiveContextSQLTests) ... skipped "Unittest < 3.3 doesn't support mocking"
```

We don't have to use the unittest 3.3 module to mock, and it looks like the test itself isn't compatible with Python 2.

This PR makes:
 - Manually monkey-patch `sys.maxsize` to get rid of the unittest 3.3 condition
 - Use the built-in `reload` in Python 2, and `importlib.reload` in Python 3

## How was this patch tested?

Manually tested, and unit test is fixed.

Closes #23604 from HyukjinKwon/test-window.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
2019-01-21 14:27:17 -08:00
Maxim Gekk 4c1cd809f8 [SPARK-26652][SQL] Remove fromJSON and fromString from Literal
## What changes were proposed in this pull request?

The `fromString` and `fromJSON` methods of the `Literal` object are removed because they are not used.

Closes #23596

Closes #23603 from MaxGekk/remove-literal-fromstring.

Authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2019-01-22 02:24:12 +08:00
Shahid 9a30e23211 [SPARK-26351][MLLIB] Update doc and minor correction in the mllib evaluation metrics
## What changes were proposed in this pull request?
Currently, there are some minor inconsistencies between the docs and the code. This PR corrects those inconsistencies:
1) Links related to the evaluation metrics in the docs are not working.
2) Minor corrections to the evaluation metric formulas in the docs.

## How was this patch tested?

NA

Closes #23589 from shahidki31/docCorrection.

Authored-by: Shahid <shahidki31@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-01-20 18:11:14 -06:00
Luca Canali 6c18d8d807 [SPARK-26642][K8S] Add --num-executors option to spark-submit for Spark on K8S.
## What changes were proposed in this pull request?

This PR proposes to extend the spark-submit option --num-executors to be applicable to Spark on K8S too. It is motivated by convenience, for example when migrating jobs written for YARN to run on K8S.

## How was this patch tested?

Manually tested on a K8S cluster.

Author: Luca Canali <luca.canali@cern.ch>

Closes #23573 from LucaCanali/addNumExecutorsToK8s.
2019-01-20 12:43:34 -08:00
Marco Gaido 6d9c54b62c [SPARK-26645][PYTHON] Support decimals with negative scale when parsing datatype
## What changes were proposed in this pull request?

When parsing datatypes from the JSON internal representation, PySpark doesn't support decimals with negative scales. Since they are allowed and can actually happen, PySpark should be able to parse them successfully.

## How was this patch tested?

added test

Closes #23575 from mgaido91/SPARK-26645.

Authored-by: Marco Gaido <marcogaido91@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2019-01-20 17:43:50 +08:00
liuxian ace2364296 [MINOR][TEST] Correct some unit test mistakes
## What changes were proposed in this pull request?

Correct some unit test mistakes.

## How was this patch tested?
N/A

Closes #23583 from 10110346/unused_symbol.

Authored-by: liuxian <liu.xian3@zte.com.cn>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-01-19 08:54:55 -06:00
Kazuaki Ishizaki 64cc9e572e [SPARK-26477][CORE] Use ConfigEntry for hardcoded configs for unsafe category
## What changes were proposed in this pull request?

The PR makes hardcoded `spark.unsafe` configs use `ConfigEntry` and puts them in the `config` package.

## How was this patch tested?

Existing UTs

Closes #23412 from kiszk/SPARK-26477.

Authored-by: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2019-01-18 23:57:04 -08:00
Liang-Chi Hsieh 8503aa3007 [SPARK-26646][TEST][PYSPARK] Fix flaky test: pyspark.mllib.tests.test_streaming_algorithms StreamingLogisticRegressionWithSGDTests.test_training_and_prediction
## What changes were proposed in this pull request?

The test pyspark.mllib.tests.test_streaming_algorithms StreamingLogisticRegressionWithSGDTests.test_training_and_prediction is sometimes flaky.

```
======================================================================
FAIL: test_training_and_prediction (pyspark.mllib.tests.test_streaming_algorithms.StreamingLogisticRegressionWithSGDTests)
Test that the model improves on toy data with no. of batches
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/mllib/tests/test_streaming_algorithms.py", line 367, in test_training_and_prediction
    self._eventually(condition, timeout=60.0)
  File "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/mllib/tests/test_streaming_algorithms.py", line 69, in _eventually
    lastValue = condition()
  File "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/mllib/tests/test_streaming_algorithms.py", line 362, in condition
    self.assertGreater(errors[1] - errors[-1], 0.3)
AssertionError: -0.070000000000000062 not greater than 0.3

----------------------------------------------------------------------
Ran 13 tests in 198.327s

FAILED (failures=1, skipped=1)

Had test failures in pyspark.mllib.tests.test_streaming_algorithms with python3.4; see logs
```

The prediction stream can possibly be consumed to the end before the input stream. When that happens, the model improvement is not as high as expected, which causes the test to fail. This patch increases the number of batches in the streams. This won't increase test time because there is a timeout.

## How was this patch tested?

Manually test.

Closes #23586 from viirya/SPARK-26646.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2019-01-18 23:53:11 +08:00
Maxim Gekk 34db5f5652 [SPARK-26618][SQL] Make typed Timestamp/Date literals consistent to casting
## What changes were proposed in this pull request?

In the PR, I propose to make the creation of the typed literals `TIMESTAMP` and `DATE` consistent with the `Cast` expression, more precisely by reusing the `Cast` expression in the type constructors. This allows:
- To use the same calendar in parsing methods
- To support the same set of timestamp/date patterns

For example, creating timestamp literal:
```sql
SELECT TIMESTAMP '2019-01-14 20:54:00.000'
```
behaves the same as casting the string literal:
```sql
SELECT CAST('2019-01-14 20:54:00.000' AS TIMESTAMP)
```

## How was this patch tested?

This was tested by `SQLQueryTestSuite` as well as `ExpressionParserSuite`.

Closes #23541 from MaxGekk/timestamp-date-constructors.

Lead-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Co-authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Herman van Hovell <hvanhovell@databricks.com>
2019-01-18 12:47:36 +01:00
Kris Mok e3418649dc [SPARK-26659][SQL] Fix duplicate cmd.nodeName in the explain output of DataWritingCommandExec
## What changes were proposed in this pull request?

`DataWritingCommandExec` generates `cmd.nodeName` twice in its explain output, e.g. when running this query `spark.sql("create table foo stored as parquet as select id, id % 10 as cat1, id % 20 as cat2 from range(10)")`,
```
Execute OptimizedCreateHiveTableAsSelectCommand OptimizedCreateHiveTableAsSelectCommand [Database:default, TableName: foo, InsertIntoHiveTable]
+- *(1) Project [id#2L, (id#2L % 10) AS cat1#0L, (id#2L % 20) AS cat2#1L]
   +- *(1) Range (0, 10, step=1, splits=8)
```
After the fix, it'll go back to normal:
```
Execute OptimizedCreateHiveTableAsSelectCommand [Database:default, TableName: foo, InsertIntoHiveTable]
+- *(1) Project [id#2L, (id#2L % 10) AS cat1#0L, (id#2L % 20) AS cat2#1L]
   +- *(1) Range (0, 10, step=1, splits=8)
```

This duplication is introduced when this specialized `DataWritingCommandExec` was created in place of `ExecutedCommandExec`.

The former is a `UnaryExecNode` whose `children` include the physical plan of the query, and the `cmd` is picked up via `TreeNode.stringArgs` into the argument string. The duplication comes from: `DataWritingCommandExec.nodeName` is `s"Execute ${cmd.nodeName}"` while the argument string is `cmd.simpleString()` which also includes `cmd.nodeName`.

The latter didn't have that problem because it's a `LeafExecNode` with no children, and it declares the `cmd` as being a part of the `innerChildren` which is excluded from the argument string.

## How was this patch tested?

Manual testing of running the example above in a local Spark Shell.
Also added a new test case in `ExplainSuite`.

Closes #23579 from rednaxelafx/fix-explain.

Authored-by: Kris Mok <kris.mok@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2019-01-17 22:43:39 -08:00
Sean Owen c2d0d700b5 [SPARK-26640][CORE][ML][SQL][STREAMING][PYSPARK] Code cleanup from lgtm.com analysis
## What changes were proposed in this pull request?

Misc code cleanup from lgtm.com analysis. See comments below for details.

## How was this patch tested?

Existing tests.

Closes #23571 from srowen/SPARK-26640.

Lead-authored-by: Sean Owen <sean.owen@databricks.com>
Co-authored-by: Hyukjin Kwon <gurwls223@apache.org>
Co-authored-by: Sean Owen <srowen@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-01-17 19:40:39 -06:00
Sean Owen 0b3abef195 [SPARK-26638][PYSPARK][ML] Pyspark vector classes always return error for unary negation
## What changes were proposed in this pull request?

Fix implementation of unary negation (`__neg__`) in Pyspark DenseVectors

## How was this patch tested?

Existing tests, plus new doctest

Closes #23570 from srowen/SPARK-26638.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-01-17 14:24:21 -06:00
Juliusz Sompolski ede35c88e0 [SPARK-26622][SQL] Revise SQL Metrics labels
## What changes were proposed in this pull request?

Try to make the labels more obvious:

| Old label | New label |
|---|---|
| "avg hash probe" | avg hash probe bucket iterations |
| "partition pruning time (ms)" | dynamic partition pruning time |
| "total number of files in the table" | file count |
| "number of files that would be returned by partition pruning alone" | file count after partition pruning |
| "total size of files in the table" | file size |
| "size of files that would be returned by partition pruning alone" | file size after partition pruning |
| "metadata time (ms)" | metadata time |
| "aggregate time" | time in aggregation build |
| "aggregate time" | time in aggregation build |
| "time to construct rdd bc" | time to build |
| "total time to remove rows" | time to remove |
| "total time to update rows" | time to update |

Add a proper metric type to some metrics:

- "bytes of written output" (now "written output") - createSizeMetric
- "metadata time" - createTimingMetric
- "dataSize" - createSizeMetric
- "collectTime" - createTimingMetric
- "buildTime" - createTimingMetric
- "broadcastTIme" - createTimingMetric

## How was this patch tested?

Existing tests.

Author: Stacy Kerkela <stacy.kerkela@databricks.com>
Signed-off-by: Juliusz Sompolski <julek@databricks.com>

Closes #23551 from juliuszsompolski/SPARK-26622.

Lead-authored-by: Juliusz Sompolski <julek@databricks.com>
Co-authored-by: Stacy Kerkela <stacy.kerkela@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2019-01-17 10:49:42 -08:00
liuxian 1b575ef5d1 [SPARK-26621][CORE] Use ConfigEntry for hardcoded configs for shuffle categories.
## What changes were proposed in this pull request?

The PR makes hardcoded `spark.shuffle` configs use `ConfigEntry` and puts them in the config package.

## How was this patch tested?
Existing unit tests

Closes #23550 from 10110346/ConfigEntry_shuffle.

Authored-by: liuxian <liu.xian3@zte.com.cn>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-01-17 12:29:17 -06:00
Maxim Gekk 6f8c0e5255 [SPARK-26593][SQL] Use Proleptic Gregorian calendar in casting UTF8String to Date/TimestampType
## What changes were proposed in this pull request?

In the PR, I propose to use *java.time* classes in `stringToDate` and `stringToTimestamp`. This switches the methods from the hybrid calendar (Gregorian+Julian) to the Proleptic Gregorian calendar, and it should make the casting consistent with other Spark classes that convert textual representations of dates/timestamps to `DateType`/`TimestampType`.
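A hedged illustration with plain JDK code (not Spark's internals) of why the calendar switch matters for old dates:

```scala
import java.time.LocalDate
import java.util.{GregorianCalendar, TimeZone}

// java.time uses the Proleptic Gregorian calendar, while java.util.GregorianCalendar
// is a hybrid Julian/Gregorian calendar, so pre-1582 dates land on different epoch days.
val prolepticDays = LocalDate.of(1000, 1, 1).toEpochDay

val cal = new GregorianCalendar(TimeZone.getTimeZone("UTC"))
cal.clear()
cal.set(1000, 0, 1) // month is 0-based: 1000-01-01 in the hybrid calendar
val hybridDays = cal.getTimeInMillis / (24L * 60 * 60 * 1000)

println(s"proleptic = $prolepticDays, hybrid = $hybridDays") // the two differ by several days
```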

## How was this patch tested?

The changes were tested by existing suites - `HashExpressionsSuite`, `CastSuite` and `DateTimeUtilsSuite`.

Closes #23512 from MaxGekk/utf8string-timestamp-parsing.

Lead-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Co-authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Herman van Hovell <hvanhovell@databricks.com>
2019-01-17 17:53:00 +01:00
Gengliang Wang c0632cec04 [SPARK-23817][SQL] Create file source V2 framework and migrate ORC read path
## What changes were proposed in this pull request?
Create a framework for file source V2 based on data source V2 API.
As a good example for demonstrating the framework, this PR also migrates the ORC source. This is because the ORC file source supports both row and columnar scans, and the implementation is simpler compared with Parquet.

Note: currently only the read path of the V2 API is done; this framework and the migration cover only the read path.
The following scans are supported:
- Scan ColumnarBatch
- Scan UnsafeRow
- Push down filters
- Push down required columns

Not supported (due to limitations of the data source V2 API):
- Stats metrics
- Catalog table
- Writes

## How was this patch tested?

Unit test

Closes #23383 from gengliangwang/latest_orcV2.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-01-17 23:33:29 +08:00
xiaodeshan 650b879de9 [SPARK-26457] Show hadoop configurations in HistoryServer environment tab
## What changes were proposed in this pull request?

I know that YARN provides all Hadoop configurations, but it may be fine for the History Server to unify all configurations in one place. It will be convenient for us to debug some problems.

## How was this patch tested?

![image](https://user-images.githubusercontent.com/42019462/50808610-4d742900-133a-11e9-868c-2976e856ed9a.png)

Closes #23486 from deshanxiao/spark-26457.

Lead-authored-by: xiaodeshan <xiaodeshan@xiaomi.com>
Co-authored-by: deshanxiao <42019462+deshanxiao@users.noreply.github.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-01-17 05:51:43 -06:00
wright 4915cb3adf [MINOR][BUILD] ensure call to translate_component has correct number of arguments
## What changes were proposed in this pull request?

The call to `translate_component` only supplied 2 out of the 3 required arguments. I added a default empty list for the missing argument to avoid a run-time error.

I work for Semmle, and noticed the bug with our LGTM code analyzer:
0655f1624f/files/dev/create-release/releaseutils.py?sort=name&dir=ASC&mode=heatmap#x1434915b6576fb40:1

## How was this patch tested?

I checked that `./dev/run-tests` passes OK.

Closes #23567 from ipwright/wrong-number-of-arguments-fix.

Authored-by: wright <wright@semmle.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-01-16 21:00:58 -06:00
Jungtaek Lim (HeartSaVioR) 38f030725c [SPARK-26466][CORE] Use ConfigEntry for hardcoded configs for submit categories.
## What changes were proposed in this pull request?

The PR makes hardcoded configs below to use `ConfigEntry`.

* spark.kryo
* spark.kryoserializer
* spark.serializer
* spark.jars
* spark.files
* spark.submit
* spark.deploy
* spark.worker

This patch doesn't change configs which are not relevant to SparkConf (e.g. system properties).

## How was this patch tested?

Existing tests.

Closes #23532 from HeartSaVioR/SPARK-26466-v2.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-01-16 20:57:21 -06:00
Luca Canali 272428db6f [SPARK-26600] Update spark-submit usage message
## What changes were proposed in this pull request?

The spark-submit usage message should be brought in sync with recent changes, in particular regarding K8S support. These are the proposed changes to the usage message:

--executor-cores NUM -> can be used for Spark on YARN and K8S

--principal PRINCIPAL and --keytab KEYTAB -> can be used for Spark on YARN and K8S

--total-executor-cores NUM -> can be used for Spark standalone, YARN and K8S

In addition, this PR proposes to remove certain implementation details from the --keytab argument description, as they vary between YARN and K8S, for example.

## How was this patch tested?

Manually tested

Closes #23518 from LucaCanali/updateSparkSubmitArguments.

Authored-by: Luca Canali <luca.canali@cern.ch>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-01-16 20:55:28 -06:00
Kris Mok dc3b35c5da [SPARK-26633][REPL] Add ExecutorClassLoader.getResourceAsStream
## What changes were proposed in this pull request?

Add `ExecutorClassLoader.getResourceAsStream`, so that classes dynamically generated by the REPL can be accessed by user code as `InputStream`s for non-class-loading purposes, such as reading the class file for extracting method/constructor parameter names.
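A hedged usage sketch (the resource name below is hypothetical; actual REPL-generated class names vary):

```scala
// Inside a Spark REPL session, ExecutorClassLoader is part of the context
// class loader, so user code can now read the bytes of a REPL-generated class.
val loader = Thread.currentThread().getContextClassLoader
val in = loader.getResourceAsStream("$line3/$read$$iw$$iw$MyCaseClass.class") // hypothetical name

if (in != null) {
  try {
    val bytes = Iterator.continually(in.read()).takeWhile(_ != -1).map(_.toByte).toArray
    println(s"Read ${bytes.length} bytes of the generated class file")
  } finally {
    in.close()
  }
}
```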

Caveat: The convention in Java's `ClassLoader` is that `ClassLoader.getResourceAsStream()` should be considered as a convenience method of `ClassLoader.getResource()`, where the latter provides a `URL` for the resource, and the former invokes `openStream()` on it to serve the resource as an `InputStream`. The former should also catch `IOException` from `openStream()` and convert it to `null`.

This PR breaks this convention by only overriding `ClassLoader.getResourceAsStream()` instead of also overriding `ClassLoader.getResource()`, so after this PR, it would be possible to get a non-null result from the former, but get a null result from the latter. This isn't ideal, but it's sufficient to cover the main use case and practically it shouldn't matter.
To implement the convention properly, we'd need to register a URL protocol handler with Java to allow it to properly handle the `spark://` protocol, etc., which sounds like overkill for the intent of this PR.

Credit goes to zsxwing for the initial investigation and fix suggestion.

## How was this patch tested?

Added new test case in `ExecutorClassLoaderSuite` and `ReplSuite`.

Closes #23558 from rednaxelafx/executorclassloader-getresourceasstream.

Authored-by: Kris Mok <kris.mok@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2019-01-16 15:21:11 -08:00
Vinoo Ganesh 01301d0972 [SPARK-26625] Add oauthToken to spark.redaction.regex
## What changes were proposed in this pull request?

The regex (spark.redaction.regex) that is used to decide which config properties or environment settings are sensitive should also include oauthToken, so that it matches spark.kubernetes.authenticate.submission.oauthToken.
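A small hedged check of the intent (the pattern below is only an assumed shape of the redaction regex after this change, not the exact default):

```scala
// Config keys matching spark.redaction.regex have their values replaced
// before they are shown in logs or the UI.
val redactionRegex = "(?i)secret|password|token".r // assumed pattern that now covers "token"
val key = "spark.kubernetes.authenticate.submission.oauthToken"

val shouldRedact = redactionRegex.findFirstIn(key).isDefined
println(s"$key redacted: $shouldRedact") // true
```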

## How was this patch tested?

Simple regex addition - happy to add a test if needed.

Author: Vinoo Ganesh <vganesh@palantir.com>

Closes #23555 from vinooganesh/vinooganesh/SPARK-26625.
2019-01-16 11:43:10 -08:00
Liang-Chi Hsieh 8f170787d2 [SPARK-26619][SQL] Prune the unused serializers from SerializeFromObject
## What changes were proposed in this pull request?

`SerializeFromObject` currently keeps all serializer expressions for the domain object even when only part of the output attributes are used by the top plan.

We should be able to prune unused serializers from `SerializeFromObject` in such cases.
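A small Scala example of the kind of query this targets (illustrative; the pruning itself happens inside the optimizer):

```scala
import org.apache.spark.sql.SparkSession

case class Event(id: Long, payload: String)

val spark = SparkSession.builder().master("local[*]").appName("prune-serializers").getOrCreate()
import spark.implicits._

val ds = Seq(Event(1L, "a"), Event(2L, "b")).toDS()

// The map produces domain objects, which SerializeFromObject turns back into rows.
// Only "id" is consumed downstream, so the serializer expression for "payload"
// can be pruned after this change.
val onlyIds = ds.map(e => e.copy(id = e.id + 1)).select("id")
onlyIds.explain(true)
```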

## How was this patch tested?

Added tests.

Closes #23562 from viirya/SPARK-26619.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
2019-01-16 19:16:37 +00:00
Maxim Gekk 190814e82e [SPARK-26550][SQL] New built-in datasource - noop
## What changes were proposed in this pull request?

In the PR, I propose a new built-in datasource named `noop`, which can be used for:
- benchmarking, to avoid the additional overhead of actions and unnecessary type conversions
- caching of datasets/dataframes
- producing other side effects as a consequence of row materialisation, like uploading data to IO caches (see the usage sketch after this list)
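A usage sketch of the new sink (handy for benchmarking the read/transform side of a query without any write overhead):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("noop-sink").getOrCreate()

// Materialise every row without writing it anywhere.
spark.range(0, 10000000L)
  .selectExpr("id", "id % 7 AS bucket")
  .write
  .format("noop")
  .mode("overwrite")
  .save()
```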

## How was this patch tested?

Added a test to check that datasource rows are materialised.

Closes #23471 from MaxGekk/none-datasource.

Lead-authored-by: Maxim Gekk <max.gekk@gmail.com>
Co-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Signed-off-by: Herman van Hovell <hvanhovell@databricks.com>
2019-01-16 19:01:58 +01:00
Tathagata Das 06d5b173b6 [SPARK-26629][SS] Fixed error with multiple file stream in a query + restart on a batch that has no data for one file stream
## What changes were proposed in this pull request?
When a streaming query has multiple file streams, and there is a batch where one of the file streams has no data, then if the query has to restart from that batch, it will throw the following error.
```
java.lang.IllegalStateException: batch 1 doesn't exist
	at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$.verifyBatchIds(HDFSMetadataLog.scala:300)
	at org.apache.spark.sql.execution.streaming.FileStreamSourceLog.get(FileStreamSourceLog.scala:120)
	at org.apache.spark.sql.execution.streaming.FileStreamSource.getBatch(FileStreamSource.scala:181)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets$2.apply(MicroBatchExecution.scala:294)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets$2.apply(MicroBatchExecution.scala:291)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets(MicroBatchExecution.scala:291)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:178)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:175)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:175)
	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:251)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:61)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:175)
	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:169)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:205)
```

The existing `HDFSMetadata.verifyBatchIds` threw an error whenever the `batchIds` list was empty. In the context of `FileStreamSource.getBatch` (where verify is called) and `FileStreamSourceLog` (a subclass of `HDFSMetadata`), this is usually okay because, in a streaming query with one file stream, the `batchIds` can never be empty:
- A batch is planned only when the `FileStreamSourceLog` has seen new offset (that is, there are new data files).
- So `FileStreamSource.getBatch` will be called on X to Y where X will always be > Y. This internally calls `HDFSMetadata.verifyBatchIds(X+1, Y)` with X+1-Y ids.

For example, `FileStreamSource.getBatch(4, 5)` will call `verify(batchIds = Seq(5), start = 5, end = 5)`. However, the invariant of X > Y is not true when there are two file stream sources, as a batch may be planned even when only one of the file streams has data. So one of the file streams may not have data, which can lead to `FileStreamSource.getBatch(X, X)` -> `verify(batchIds = Seq.empty, start = X+1, end = X)` -> failure.

Note that `FileStreamSource.getBatch(X, X)` gets called **only when restarting a query in a batch where a file source did not have data**. This is because in normal planning of batches, `MicroBatchExecution` avoids calling `FileStreamSource.getBatch(X, X)` when offset X has not changed. However, when restarting a stream at such a batch, `MicroBatchExecution.populateStartOffsets()` calls `FileStreamSource.getBatch(X, X)` (DataSource V1 hack to initialize the source with last known offsets) thus hitting this issue.

The minimum solution here is to skip verification when `FileStreamSource.getBatch(X, X)`.

## How was this patch tested?


Closes #23557 from tdas/SPARK-26629.

Authored-by: Tathagata Das <tathagata.das1565@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
2019-01-16 09:42:14 -08:00
Hyukjin Kwon 670bc55f8d [SPARK-25992][PYTHON] Document SparkContext cannot be shared for multiprocessing
## What changes were proposed in this pull request?

This PR proposes to explicitly document that SparkContext cannot be shared for multiprocessing, and that multiprocessing execution is not guaranteed in PySpark.

I have seen cases where users attempt to use multiple processes via the `multiprocessing` module from time to time. For instance, see the example in the JIRA (https://issues.apache.org/jira/browse/SPARK-25992).

Py4J itself does not support Python's multiprocessing out of the box (sharing the same JavaGateways for instance).

In general, such a pattern can cause errors with somewhat arbitrary symptoms that are difficult to diagnose. For instance, see the error message in the JIRA:

```
Traceback (most recent call last):
File "/Users/abdealijk/anaconda3/lib/python3.6/socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
File "/Users/abdealijk/anaconda3/lib/python3.6/socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
File "/Users/abdealijk/anaconda3/lib/python3.6/socketserver.py", line 361, in finish_request
    self.RequestHandlerClass(request, client_address, self)
File "/Users/abdealijk/anaconda3/lib/python3.6/socketserver.py", line 696, in __init__
    self.handle()
File "/usr/local/hadoop/spark2.3.1/python/pyspark/accumulators.py", line 238, in handle
    _accumulatorRegistry[aid] += update
KeyError: 0
```

The root cause was that the global `_accumulatorRegistry` is not shared across processes.

Using threads instead of processes is quite easy in Python. See `threading` vs `multiprocessing` in Python - they can usually be direct replacements for each other. For instance, Python also supports a thread pool (`multiprocessing.pool.ThreadPool`), which can be a direct replacement for the process-based pool (`multiprocessing.Pool`).

## How was this patch tested?

Manually tested, and manually built the doc.

Closes #23564 from HyukjinKwon/SPARK-25992.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2019-01-16 23:25:57 +08:00
Hyukjin Kwon e92088de4d [MINOR][PYTHON] Fix SQLContext to SparkSession in Python API main page
## What changes were proposed in this pull request?

This PR proposes to fix deprecated `SQLContext` to `SparkSession` in Python API main page.

**Before:**

![screen shot 2019-01-16 at 5 30 19 pm](https://user-images.githubusercontent.com/6477701/51239583-bac82f80-19b4-11e9-9129-8dae2c23ec79.png)

**After:**

![screen shot 2019-01-16 at 5 29 54 pm](https://user-images.githubusercontent.com/6477701/51239577-b734a880-19b4-11e9-8539-592cb772168d.png)

## How was this patch tested?

Manually checked the doc after building it.
I also checked with `grep -r "SQLContext"`, and it looks like this is the only instance left.

Closes #23565 from HyukjinKwon/minor-doc-change.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2019-01-16 23:23:36 +08:00
“attilapiros” 819e5ea7c2 [SPARK-26615][CORE] Fixing transport server/client resource leaks in the core unittests
## What changes were proposed in this pull request?

Fixing resource leaks where TransportClient/TransportServer instances are not closed properly.

In StandaloneSchedulerBackend the null check is added because, during the SparkContextSchedulerCreationSuite "local-cluster" test, it turned out that the client is not initialised, as org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend#start isn't called. This threw an NPE and left some resources open.

## How was this patch tested?

By executing the unit tests and using some extra temporary logging to count created and closed TransportClient/TransportServer instances.

Closes #23540 from attilapiros/leaks.

Authored-by: “attilapiros” <piros.attila.zsolt@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-01-16 09:00:21 -06:00
Liang-Chi Hsieh cf133e6110 [SPARK-26604][CORE] Clean up channel registration for StreamManager
## What changes were proposed in this pull request?

Currently in `TransportRequestHandler.processStreamRequest`, when a stream request is processed, the stream id is not registered with the current channel in the stream manager. It should do that so that, in case the channel gets terminated, we can also remove the streams associated with its stream requests.

This also cleans up channel registration in `StreamManager`. Since `StreamManager` doesn't register channels and only `OneForOneStreamManager` does, this removes `registerChannel` from `StreamManager`. When `OneForOneStreamManager` registers a stream, it will also register the channel for the stream.

## How was this patch tested?

Existing tests.

Closes #23521 from viirya/SPARK-26604.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-01-16 10:58:07 +08:00
Jungtaek Lim (HeartSaVioR) 2ebb79b2a6 [SPARK-26350][FOLLOWUP] Add actual verification on new UT introduced on SPARK-26350
## What changes were proposed in this pull request?

This patch adds a check to verify that the consumer group id is set correctly when a custom group id is provided via the Kafka parameter.

## How was this patch tested?

Modified UT.

Closes #23544 from HeartSaVioR/SPARK-26350-follow-up-actual-verification-on-UT.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
2019-01-15 14:21:51 -08:00
Wenchen Fan 954ef96c49 [SPARK-25530][SQL] data source v2 API refactor (batch write)
## What changes were proposed in this pull request?

Adjust the batch write API to match the read API refactor after https://github.com/apache/spark/pull/23086

The doc with high-level ideas:
https://docs.google.com/document/d/1vI26UEuDpVuOjWw4WPoH2T6y8WAekwtI7qoowhOFnI4/edit?usp=sharing

Basically it renames `BatchWriteSupportProvider` to `SupportsBatchWrite` and makes it extend `Table`, and renames `WriteSupport` to `Write`. It also cleans up some code as the batch API is completed.

This PR also removes the test from https://github.com/apache/spark/pull/22688 . Now data source must return a table for read/write.

A few notes about future changes:
1. We will create `SupportsStreamingWrite` later for streaming APIs
2. We will create `SupportsBatchReplaceWhere`, `SupportsBatchAppend`, etc. for the new end-user write APIs. I think the streaming APIs will keep using `OutputMode`, and the new end-user write APIs will apply to batch only, at least in the near future.
3. We will remove `SaveMode` from data source API: https://issues.apache.org/jira/browse/SPARK-26356

## How was this patch tested?

existing tests

Closes #23208 from cloud-fan/refactor-batch.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2019-01-15 13:53:48 -08:00
Devaraj K 1b75f3bcff [SPARK-17928][MESOS] No driver.memoryOverhead setting for mesos cluster mode
## What changes were proposed in this pull request?

Added a new configuration 'spark.mesos.driver.memoryOverhead' for providing the driver memory overhead in mesos cluster mode.

## How was this patch tested?
Verified manually: the resource scheduler allocates (driver memory + driver memoryOverhead) for the driver in Mesos cluster mode.

Closes #17726 from devaraj-kavali/SPARK-17928.

Authored-by: Devaraj K <devaraj@apache.org>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-01-15 15:45:20 -06:00
Marcelo Vanzin 8a54492149 [SPARK-25857][CORE] Add developer documentation regarding delegation tokens.
Closes #23348 from vanzin/SPARK-25857.

Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-01-15 11:23:38 -08:00
Pralabh Kumar 7296999c47 [SPARK-26462][CORE] Use ConfigEntry for hardcoded configs for execution categories
## What changes were proposed in this pull request?

Make the following hardcoded configs use ConfigEntry:
spark.memory
spark.storage
spark.io
spark.buffer
spark.rdd
spark.locality
spark.broadcast
spark.reducer

## How was this patch tested?

Existing tests.

Closes #23447 from pralabhkumar/execution_categories.

Authored-by: Pralabh Kumar <pkumar2@linkedin.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-01-15 12:50:07 -06:00
Gabor Somogyi 5ca45e8a3d [SPARK-26592][SS] Throw exception when kafka delegation token tried to obtain with proxy user
## What changes were proposed in this pull request?

Kafka does not yet support obtaining delegation tokens with a proxy user. It has to be turned off until https://issues.apache.org/jira/browse/KAFKA-6945 is implemented.

In this PR an exception will be thrown when this situation happens.

## How was this patch tested?

Additional unit test.

Closes #23511 from gaborgsomogyi/SPARK-26592.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-01-15 10:00:01 -08:00
Anton Okolnychyi b45ff02e77 [SPARK-26203][SQL][TEST] Benchmark performance of In and InSet expressions
## What changes were proposed in this pull request?

This PR contains benchmarks for `In` and `InSet` expressions. They cover literals of different data types and will help us to decide where to integrate the switch-based logic for bytes/shorts/ints.

As discussed in [PR-23171](https://github.com/apache/spark/pull/23171), one potential approach is to convert `In` to `InSet` if all elements are literals, independently of the data type and the number of elements. According to the results of this PR, we might want to keep the threshold on the number of elements. The if-else approach might be faster for some data types on a small number of elements (structs? arrays? small decimals?).
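For context, a hedged Scala sketch of where the two expressions come from in user code; the comment refers to the existing `spark.sql.optimizer.inSetConversionThreshold` knob, stated here as an assumption about which config governs the rewrite:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[*]").appName("in-vs-inset").getOrCreate()

val df = spark.range(0, 1000000L).toDF("id")

// A predicate like this starts life as an In expression; once the literal list
// grows past the conversion threshold (default 10), the optimizer rewrites it
// to InSet, which is what these benchmarks compare.
val fewValues  = df.filter(col("id").isin(1L, 2L, 3L))       // stays In
val manyValues = df.filter(col("id").isin((1L to 200L): _*)) // becomes InSet

manyValues.explain(true)
```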

### byte / short / int / long

Unless the number of items is really big, `InSet` is slower than `In` because of autoboxing.

Interestingly, `In` scales worse on bytes/shorts than on ints/longs. For example, `InSet` starts to match the performance on around 50 bytes/shorts while this does not happen on the same number of ints/longs. This is a bit strange as shorts/bytes (e.g., `(byte) 1`, `(short) 2`) are represented as ints in the bytecode.

### float / double

Use cases on floats/doubles also suffer from autoboxing. Therefore, `In` outperforms `InSet` on 10 elements.

Similarly to shorts/bytes, `In` scales worse on floats/doubles than on ints/longs because the equality condition is more complicated (e.g., `(java.lang.Float.isNaN(filter_valueArg_0) && java.lang.Float.isNaN(9.0F)) || filter_valueArg_0 == 9.0F`).

### decimal

The reason why we have separate benchmarks for small and large decimals is that Spark might use longs to represent decimals in some cases.

If this optimization happens, then `equals` will be nothing more than comparing longs. If it does not happen, Spark will create an instance of `scala.BigDecimal` and use it for comparisons. The latter is more expensive.

`Decimal$hashCode` will always use `scala.BigDecimal$hashCode` even if the number is small enough to fit into a long variable. As a consequence, we see that use cases on small decimals are faster with `In` as they are using long comparisons under the hood. Large decimal values are always faster with `InSet`.

### string

`UTF8String$equals` is not cheap. Therefore, `In` does not really outperform `InSet` as in previous use cases.

### timestamp / date

Under the hood, timestamp/date values will be represented as long/int values. So, `In` allows us to avoid autoboxing.

### array

Arrays are working as expected. `In` is faster on 5 elements while `InSet` is faster on 15 elements. The benchmarks are using `UnsafeArrayData`.

### struct

`InSet` is always faster than `In` for structs. These benchmarks use `GenericInternalRow`.

Closes #23291 from aokolnychyi/spark-26203.

Lead-authored-by: Anton Okolnychyi <aokolnychyi@apple.com>
Co-authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2019-01-15 07:25:50 -07:00
SongYadong a77505d4d3 [CORE][MINOR] Fix some typos about MemoryMode
## What changes were proposed in this pull request?

Fix typos in comments by replacing "in-heap" with "on-heap".

## How was this patch tested?

Existing Tests.

Closes #23533 from SongYadong/typos_inheap_to_onheap.

Authored-by: SongYadong <song.yadong1@zte.com.cn>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2019-01-15 14:40:00 +08:00
Maxim Gekk 33b5039cd3 [SPARK-25935][SQL] Allow null rows for bad records from JSON/CSV parsers
## What changes were proposed in this pull request?

This PR reverts #22938 per the discussion in #23325.

Closes #23325

Closes #23543 from MaxGekk/return-nulls-from-json-parser.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-01-15 13:02:55 +08:00
Takeshi Yamamuro abc937b247 [MINOR][BUILD] Remove binary license/notice files in a source release for branch-2.4+ only
## What changes were proposed in this pull request?
To skip the steps that remove binary license/notice files in a source release for branch-2.3 (these files only exist in master/branch-2.4 now), this PR checks the Spark release version in `dev/create-release/release-build.sh`.

## How was this patch tested?
Manually checked.

Closes #23538 from maropu/FixReleaseScript.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-01-14 19:17:39 -06:00
Shixiong Zhu bafc7ac025 [SPARK-26350][SS] Allow to override group id of the Kafka consumer
## What changes were proposed in this pull request?

This PR allows the user to override `kafka.group.id` for better monitoring or security. The user needs to make sure there are not multiple queries or sources using the same group id.

It also fixes a bug where the `groupIdPrefix` option could not be retrieved.
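A hedged usage sketch (the broker address and topic are placeholders):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("kafka-group-id").getOrCreate()

// Overriding the consumer group id; the caller must make sure no other query
// or source uses the same group id.
val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092") // placeholder
  .option("subscribe", "events")                     // placeholder topic
  .option("kafka.group.id", "my-monitoring-group")   // the override added by this PR
  .load()
```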

## How was this patch tested?

The newly added unit tests.

Closes #23301 from zsxwing/SPARK-26350.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
2019-01-14 13:37:24 -08:00
Maxim Gekk 115fecfd84 [SPARK-26456][SQL] Cast date/timestamp to string by Date/TimestampFormatter
## What changes were proposed in this pull request?

In the PR, I propose to switch to `TimestampFormatter`/`DateFormatter` for casting dates/timestamps to strings. The changes should make date/timestamp casting consistent with the JSON/CSV datasources and time-related functions like `to_date` and `to_unix_timestamp`/`from_unixtime`.

Local formatters are moved out of `DateTimeUtils` to where they are actually used. This avoids re-creating a new formatter instance on each call. Another reason is to have a separate parser for `PartitioningUtils`, because the default parsing pattern cannot be used there (an optional section `[.S]` is expected).

## How was this patch tested?

It was tested by `DateTimeUtilsSuite`, `CastSuite` and `JDBC*Suite`.

Closes #23391 from MaxGekk/thread-local-date-format.

Lead-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Co-authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-01-14 21:59:25 +08:00
John Zhuge 3f8007102a [SPARK-26576][SQL] Broadcast hint not applied to partitioned table
## What changes were proposed in this pull request?

Make sure the broadcast hint is applied to partitioned tables.
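A hedged sketch of the scenario (table names and the partition filter are placeholders):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder().master("local[*]").appName("broadcast-hint").getOrCreate()

// "dim" stands in for a small partitioned table; the hint should survive
// partition pruning and still yield a broadcast join.
val fact = spark.table("fact")
val dim  = spark.table("dim").where("dt = '2019-01-13'")

fact.join(broadcast(dim), "id").explain()
```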

## How was this patch tested?

- A new unit test in PruneFileSourcePartitionsSuite
- Unit test suites touched by SPARK-14581: JoinOptimizationSuite, FilterPushdownSuite, ColumnPruningSuite, and PruneFiltersSuite

Closes #23507 from jzhuge/SPARK-26576.

Closes #23530 from jzhuge/SPARK-26576-master.

Authored-by: John Zhuge <jzhuge@apache.org>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2019-01-13 15:36:40 -08:00
maryannxue 985f966b9c [SPARK-26065][FOLLOW-UP][SQL] Revert hint behavior in join reordering
## What changes were proposed in this pull request?

This is to fix a bug in #23036 that would cause a join hint to be applied, after join reordering, to a node it is not supposed to apply to. For example,
```
  val join = df.join(df, "id")
  val broadcasted = join.hint("broadcast")
  val join2 = join.join(broadcasted, "id").join(broadcasted, "id")
```
There should only be 2 broadcast hints on `join2`, but after join reordering there would be 4. This is because hint application in join reordering compares attribute sets to test relation equivalency.
Moreover, it could still be problematic even if the child relations were used to test relation equivalency, due to potential exprId conflicts in nested self-joins.

As a result, this PR simply reverts the join reorder hint behavior change introduced in #23036, which means if a join hint is present, the join node itself will not participate in the join reordering, while the sub-joins within its children still can.

## How was this patch tested?

Added new tests

Closes #23524 from maryannxue/query-hint-followup-2.

Authored-by: maryannxue <maryannxue@apache.org>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2019-01-13 15:30:45 -08:00
Bruce Robbins 09b05487b7 [SPARK-26450][SQL] Avoid rebuilding map of schema for every column in projection
## What changes were proposed in this pull request?

When creating some unsafe projections, Spark rebuilds the map of schema attributes once for each expression in the projection. Some file format readers create one unsafe projection per input file, others create one per task. ProjectExec also creates one unsafe projection per task. As a result, for wide queries on wide tables, Spark might build the map of schema attributes hundreds of thousands of times.

This PR changes two functions to reuse the same AttributeSeq instance when creating BoundReference objects for each expression in the projection. This avoids the repeated rebuilding of the map of schema attributes.
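A toy Scala sketch of the pattern (hypothetical stand-ins, not Spark's actual classes): build the name-to-ordinal lookup once and reuse it for every expression instead of rebuilding it per expression.

```scala
// Hypothetical stand-in for a schema attribute.
final case class Attr(name: String)

val schema: Seq[Attr]  = (0 until 6000).map(i => Attr(s"c$i"))
val exprs: Seq[String] = (0 until 6000).map(i => s"c$i")

// Old shape: the ordinal map is effectively rebuilt for every expression.
def bindAllSlow(): Seq[Int] =
  exprs.map(e => schema.map(_.name).zipWithIndex.toMap.apply(e))

// New shape: build the lookup once (analogous to sharing one AttributeSeq)
// and reuse it while binding each expression.
val ordinal: Map[String, Int] = schema.map(_.name).zipWithIndex.toMap
def bindAllFast(): Seq[Int] = exprs.map(ordinal)

println(bindAllFast().take(3)) // first three ordinals: 0, 1, 2
```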

### Benchmarks

The time saved by this PR depends on size of the schema, size of the projection, number of input files (or number of file splits), number of tasks, and file format. I chose a couple of example cases.

In the following tests, I ran the query
```sql
select * from table where id1 = 1
```

Matching rows are about 0.2% of the table.

#### Orc table 6000 columns, 500K rows, 34 input files

baseline | pr | improvement
----|----|----
1.772306 min | 1.487267 min | 16.082943%

#### Orc table 6000 columns, 500K rows, *17* input files

baseline | pr | improvement
----|----|----
 1.656400 min | 1.423550 min | 14.057595%

#### Orc table 60 columns, 50M rows, 34 input files

baseline | pr | improvement
----|----|----
0.299878 min | 0.290339 min | 3.180926%

#### Parquet table 6000 columns, 500K rows, 34 input files

baseline | pr | improvement
----|----|----
1.478306 min | 1.373728 min | 7.074165%

Note: The parquet reader does not create an unsafe projection. However, the filter operation in the query causes the planner to add a ProjectExec, which does create an unsafe projection for each task. So these results have nothing to do with Parquet itself.

#### Parquet table 60 columns, 50M rows, 34 input files

baseline | pr | improvement
----|----|----
0.245006 min | 0.242200 min | 1.145099%

#### CSV table 6000 columns, 500K rows, 34 input files

baseline | pr | improvement
----|----|----
2.390117 min | 2.182778 min | 8.674844%

#### CSV table 60 columns, 50M rows, 34 input files

baseline | pr | improvement
----|----|----
1.520911 min | 1.510211 min | 0.703526%

## How was this patch tested?

SQL unit tests
Python core and SQL test

Closes #23392 from bersprockets/norebuild.

Authored-by: Bruce Robbins <bersprockets@gmail.com>
Signed-off-by: Herman van Hovell <hvanhovell@databricks.com>
2019-01-13 23:54:19 +01:00
Petar Petrov c01152dd22 [SPARK-23182][CORE] Allow enabling TCP keep alive on the RPC connections
## What changes were proposed in this pull request?

Make it possible for the master to enable TCP keep alive on the RPC connections with clients.

## How was this patch tested?

Manually tested.

Added the following:
```
spark.rpc.io.enableTcpKeepAlive  true
```
to spark-defaults.conf.

Observed the following on the Spark master:
```
$ netstat -town | grep 7077
tcp6       0      0 10.240.3.134:7077       10.240.1.25:42851       ESTABLISHED keepalive (6736.50/0/0)
tcp6       0      0 10.240.3.134:44911      10.240.3.134:7077       ESTABLISHED keepalive (4098.68/0/0)
tcp6       0      0 10.240.3.134:7077       10.240.3.134:44911      ESTABLISHED keepalive (4098.68/0/0)
```

This shows that the keep-alive setting is taking effect.

It's currently possible to enable TCP keep-alive on the worker / executor, but it is not possible to configure it on other RPC connections. It's unclear to me why that is the case. Keep-alive is most important for the master, to protect it against suddenly departing workers / executors, so I think it's very important to have it. In particular, this makes the master resilient when using preemptible worker VMs in GCE. GCE has the concept of shutdown scripts, which it doesn't guarantee to execute, so workers often don't get shut down gracefully and the TCP connections on the master linger as there's nothing to close them. Hence the need to enable keep-alive.

This enables keep-alive on connections besides the master's connections, but that shouldn't cause harm.

Closes #20512 from peshopetrov/master.

Authored-by: Petar Petrov <petar.petrov@leanplum.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-01-13 13:39:12 -06:00