Commit graph

22534 commits

Author SHA1 Message Date
Xingbo Jiang e3486e1b95 [SPARK-24795][CORE] Implement barrier execution mode
## What changes were proposed in this pull request?

Propose new APIs and modify job/task scheduling to support barrier execution mode, which requires all tasks in a same barrier stage start at the same time, and retry all tasks in case some tasks fail in the middle. The barrier execution mode is useful for some ML/DL workloads.

The proposed API changes include:

- `RDDBarrier` that marks an RDD as barrier (Spark must launch all the tasks together for the current stage).
- `BarrierTaskContext` that support global sync of all tasks in a barrier stage, and provide extra `BarrierTaskInfo`s.

In DAGScheduler, we retry all tasks of a barrier stage in case some tasks fail in the middle, this is achieved by unregistering map outputs for a shuffleId (for ShuffleMapStage) or clear the finished partitions in an active job (for ResultStage).

## How was this patch tested?

Add `RDDBarrierSuite` to ensure we convert RDDs correctly;
Add new test cases in `DAGSchedulerSuite` to ensure we do task scheduling correctly;
Add new test cases in `SparkContextSuite` to ensure the barrier execution mode actually works (both under local mode and local cluster mode).
Add new test cases in `TaskSchedulerImplSuite` to ensure we schedule tasks for barrier taskSet together.

Author: Xingbo Jiang <xingbo.jiang@databricks.com>

Closes #21758 from jiangxb1987/barrier-execution-mode.
2018-07-26 12:09:01 -07:00
maryannxue 5ed7660d14 [SPARK-24802][SQL][FOLLOW-UP] Add a new config for Optimization Rule Exclusion
## What changes were proposed in this pull request?

This is an extension to the original PR, in which rule exclusion did not work for classes derived from Optimizer, e.g., SparkOptimizer.
To solve this issue, Optimizer and its derived classes will define/override `defaultBatches` and `nonExcludableRules` in order to define its default rule set as well as rules that cannot be excluded by the SQL config. In the meantime, Optimizer's `batches` method is dedicated to the rule exclusion logic and is defined "final".

## How was this patch tested?

Added UT.

Author: maryannxue <maryannxue@apache.org>

Closes #21876 from maryannxue/rule-exclusion.
2018-07-26 11:06:23 -07:00
Dongjoon Hyun 58353d7f4b [SPARK-24924][SQL] Add mapping for built-in Avro data source
## What changes were proposed in this pull request?

This PR aims to the followings.
1. Like `com.databricks.spark.csv` mapping, we had better map `com.databricks.spark.avro` to built-in Avro data source.
2. Remove incorrect error message, `Please find an Avro package at ...`.

## How was this patch tested?

Pass the newly added tests.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #21878 from dongjoon-hyun/SPARK-24924.
2018-07-26 16:11:03 +08:00
Takuya UESHIN c9b233d414 [SPARK-24878][SQL] Fix reverse function for array type of primitive type containing null.
## What changes were proposed in this pull request?

If we use `reverse` function for array type of primitive type containing `null` and the child array is `UnsafeArrayData`, the function returns a wrong result because `UnsafeArrayData` doesn't define the behavior of re-assignment, especially we can't set a valid value after we set `null`.

## How was this patch tested?

Added some tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #21830 from ueshin/issues/SPARK-24878/fix_reverse.
2018-07-26 15:06:13 +08:00
Xiao Li d2e7deb59f [SPARK-24867][SQL] Add AnalysisBarrier to DataFrameWriter
## What changes were proposed in this pull request?
```Scala
      val udf1 = udf({(x: Int, y: Int) => x + y})
      val df = spark.range(0, 3).toDF("a")
        .withColumn("b", udf1($"a", udf1($"a", lit(10))))
      df.cache()
      df.write.saveAsTable("t")
```
Cache is not being used because the plans do not match with the cached plan. This is a regression caused by the changes we made in AnalysisBarrier, since not all the Analyzer rules are idempotent.

## How was this patch tested?
Added a test.

Also found a bug in the DSV1 write path. This is not a regression. Thus, opened a separate JIRA https://issues.apache.org/jira/browse/SPARK-24869

Author: Xiao Li <gatorsmile@gmail.com>

Closes #21821 from gatorsmile/testMaster22.
2018-07-25 17:22:37 -07:00
Koert Kuipers 17f469bc80 [SPARK-24860][SQL] Support setting of partitionOverWriteMode in output options for writing DataFrame
## What changes were proposed in this pull request?

Besides spark setting spark.sql.sources.partitionOverwriteMode also allow setting partitionOverWriteMode per write

## How was this patch tested?

Added unit test in InsertSuite

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Koert Kuipers <koert@tresata.com>

Closes #21818 from koertkuipers/feat-partition-overwrite-mode-per-write.
2018-07-25 13:06:03 -07:00
mcheah 0c83f718ee [SPARK-23146][K8S][TESTS] Enable client mode integration test.
## What changes were proposed in this pull request?

Enable client mode integration test after merging from master.

## How was this patch tested?

Check the integration test runs in the build.

Author: mcheah <mcheah@palantir.com>

Closes #21874 from mccheah/enable-client-mode-test.
2018-07-25 12:10:23 -07:00
Maxim Gekk 2f77616e1d [SPARK-24849][SPARK-24911][SQL] Converting a value of StructType to a DDL string
## What changes were proposed in this pull request?

In the PR, I propose to extend the `StructType`/`StructField` classes by new method `toDDL` which converts a value of the `StructType`/`StructField` type to a string formatted in DDL style. The resulted string can be used in a table creation.

The `toDDL` method of `StructField` is reused in `SHOW CREATE TABLE`. In this way the PR fixes the bug of unquoted names of nested fields.

## How was this patch tested?

I add a test for checking the new method and 2 round trip tests: `fromDDL` -> `toDDL` and `toDDL` -> `fromDDL`

Author: Maxim Gekk <maxim.gekk@databricks.com>

Closes #21803 from MaxGekk/to-ddl.
2018-07-25 11:09:12 -07:00
mcheah 571a6f0574 [SPARK-23146][K8S] Support client mode.
## What changes were proposed in this pull request?

Support client mode for the Kubernetes scheduler.

Client mode works more or less identically to cluster mode. However, in client mode, the Spark Context needs to be manually bootstrapped with certain properties which would have otherwise been set up by spark-submit in cluster mode. Specifically:

- If the user doesn't provide a driver pod name, we don't add an owner reference. This is for usage when the driver is not running in a pod in the cluster. In such a case, the driver can only provide a best effort to clean up the executors when the driver exits, but cleaning up the resources is not guaranteed. The executor JVMs should exit if the driver JVM exits, but the pods will still remain in the cluster in a COMPLETED or FAILED state.
- The user must provide a host (spark.driver.host) and port (spark.driver.port) that the executors can connect to. When using spark-submit in cluster mode, spark-submit generates the headless service automatically; in client mode, the user is responsible for setting up their own connectivity.

We also change the authentication configuration prefixes for client mode.

## How was this patch tested?

Adding an integration test to exercise client mode support.

Author: mcheah <mcheah@palantir.com>

Closes #21748 from mccheah/k8s-client-mode.
2018-07-25 11:08:41 -07:00
Gengliang Wang c44eb561ec [SPARK-24768][FOLLOWUP][SQL] Avro migration followup: change artifactId to spark-avro
## What changes were proposed in this pull request?
After rethinking on the artifactId, I think it should be `spark-avro` instead of `spark-sql-avro`, which is simpler, and consistent with the previous artifactId. I think we need to change it before Spark 2.4 release.

Also a tiny change: use `spark.sessionState.newHadoopConf()` to get the hadoop configuration, thus the related hadoop configurations in SQLConf will come into effect.

## How was this patch tested?

Unit test

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21866 from gengliangwang/avro_followup.
2018-07-25 08:42:45 -07:00
Yuming Wang 7a5fd4a91e [SPARK-18874][SQL][FOLLOW-UP] Improvement type mismatched message
## What changes were proposed in this pull request?
Improvement `IN` predicate type mismatched message:
```sql
Mismatched columns:
[(, t, 4, ., `, t, 4, a, `, :, d, o, u, b, l, e, ,,  , t, 5, ., `, t, 5, a, `, :, d, e, c, i, m, a, l, (, 1, 8, ,, 0, ), ), (, t, 4, ., `, t, 4, c, `, :, s, t, r, i, n, g, ,,  , t, 5, ., `, t, 5, c, `, :, b, i, g, i, n, t, )]
```
After this patch:
```sql
Mismatched columns:
[(t4.`t4a`:double, t5.`t5a`:decimal(18,0)), (t4.`t4c`:string, t5.`t5c`:bigint)]
```

## How was this patch tested?

unit tests

Author: Yuming Wang <yumwang@ebay.com>

Closes #21863 from wangyum/SPARK-18874.
2018-07-24 23:59:13 -07:00
crafty-coder 78e0a725e0 [SPARK-19018][SQL] Add support for custom encoding on csv writer
## What changes were proposed in this pull request?

Add support for custom encoding on csv writer, see https://issues.apache.org/jira/browse/SPARK-19018

## How was this patch tested?

Added two unit tests in CSVSuite

Author: crafty-coder <carlospb86@gmail.com>
Author: Carlos <crafty-coder@users.noreply.github.com>

Closes #20949 from crafty-coder/master.
2018-07-25 14:17:20 +08:00
Dilip Biswal afb0627536 [SPARK-23957][SQL] Sorts in subqueries are redundant and can be removed
## What changes were proposed in this pull request?
Thanks to henryr for the original idea at https://github.com/apache/spark/pull/21049

Description from the original PR :
Subqueries (at least in SQL) have 'bag of tuples' semantics. Ordering
them is therefore redundant (unless combined with a limit).

This patch removes the top sort operators from the subquery plans.

This closes https://github.com/apache/spark/pull/21049.

## How was this patch tested?
Added test cases in SubquerySuite to cover in, exists and scalar subqueries.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #21853 from dilipbiswal/SPARK-23957.
2018-07-24 20:46:27 -07:00
DB Tsai d4c3415894 [SPARK-24890][SQL] Short circuiting the if condition when trueValue and falseValue are the same
## What changes were proposed in this pull request?

When `trueValue` and `falseValue` are semantic equivalence, the condition expression in `if` can be removed to avoid extra computation in runtime.

## How was this patch tested?

Test added.

Author: DB Tsai <d_tsai@apple.com>

Closes #21848 from dbtsai/short-circuit-if.
2018-07-24 20:21:11 -07:00
maryannxue c26b092169 [SPARK-24891][SQL] Fix HandleNullInputsForUDF rule
## What changes were proposed in this pull request?

The HandleNullInputsForUDF would always add a new `If` node every time it is applied. That would cause a difference between the same plan being analyzed once and being analyzed twice (or more), thus raising issues like plan not matched in the cache manager. The solution is to mark the arguments as null-checked, which is to add a "KnownNotNull" node above those arguments, when adding the UDF under an `If` node, because clearly the UDF will not be called when any of those arguments is null.

## How was this patch tested?

Add new tests under sql/UDFSuite and AnalysisSuite.

Author: maryannxue <maryannxue@apache.org>

Closes #21851 from maryannxue/spark-24891.
2018-07-24 19:35:34 -07:00
Imran Rashid 15fff79032 [SPARK-24297][CORE] Fetch-to-disk by default for > 2gb
Fetch-to-mem is guaranteed to fail if the message is bigger than 2 GB,
so we might as well use fetch-to-disk in that case.  The message includes
some metadata in addition to the block data itself (in particular
UploadBlock has a lot of metadata), so we leave a little room.

Author: Imran Rashid <irashid@cloudera.com>

Closes #21474 from squito/SPARK-24297.
2018-07-25 09:08:42 +08:00
shane knapp 3efdf35327
[SPARK-24908][R][STYLE] removing spaces to make lintr happy
## What changes were proposed in this pull request?

during my travails in porting spark builds to run on our centos worker, i managed to recreate (as best i could) the centos environment on our new ubuntu-testing machine.

while running my initial builds, lintr was crashing on some extraneous spaces in test_basic.R (see:  https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.6-ubuntu-test/862/console)

after removing those spaces, the ubuntu build happily passed the lintr tests.

## How was this patch tested?

i then tested this against a modified spark-master-test-sbt-hadoop-2.6 build (see https://amplab.cs.berkeley.edu/jenkins/view/RISELab%20Infra/job/testing-spark-master-test-with-updated-R-crap/4/), which scp'ed a copy of test_basic.R in to the repo after the git clone.  everything seems to be working happily.

Author: shane knapp <incomplete@gmail.com>

Closes #21864 from shaneknapp/fixing-R-lint-spacing.
2018-07-24 16:13:57 -07:00
Eric Chang fc21f192a3 [SPARK-24895] Remove spotbugs plugin
## What changes were proposed in this pull request?

Spotbugs maven plugin was a recently added plugin before 2.4.0 snapshot artifacts were broken.  To ensure it does not affect the maven deploy plugin, this change removes it.

## How was this patch tested?

Local build was ran, but this patch will be actually tested by monitoring the apache repo artifacts and making sure metadata is correctly uploaded after this job is ran: https://amplab.cs.berkeley.edu/jenkins/view/Spark%20Packaging/job/spark-master-maven-snapshots/

Author: Eric Chang <eric.chang@databricks.com>

Closes #21865 from ericfchang/SPARK-24895.
2018-07-24 15:53:50 -07:00
s71955 d4a277f0ce [SPARK-24812][SQL] Last Access Time in the table description is not valid
## What changes were proposed in this pull request?

Last Access Time will always displayed wrong date Thu Jan 01 05:30:00 IST 1970 when user run  DESC FORMATTED table command
In hive its displayed as "UNKNOWN" which makes more sense than displaying wrong date. seems to be a limitation as of now even from hive, better we can follow the hive behavior unless the limitation has been resolved from hive.

spark client output
![spark_desc table](https://user-images.githubusercontent.com/12999161/42753448-ddeea66a-88a5-11e8-94aa-ef8d017f94c5.png)

Hive client output
![hive_behaviour](https://user-images.githubusercontent.com/12999161/42753489-f4fd366e-88a5-11e8-83b0-0f3a53ce83dd.png)

## How was this patch tested?
UT has been added which makes sure that the wrong date "Thu Jan 01 05:30:00 IST 1970 "
shall not be added as value for the Last Access  property

Author: s71955 <sujithchacko.2010@gmail.com>

Closes #21775 from sujith71955/master_hive.
2018-07-24 11:31:27 -07:00
Ryan Blue 9d27541a85 [SPARK-23325] Use InternalRow when reading with DataSourceV2.
## What changes were proposed in this pull request?

This updates the DataSourceV2 API to use InternalRow instead of Row for the default case with no scan mix-ins.

Support for readers that produce Row is added through SupportsDeprecatedScanRow, which matches the previous API. Readers that used Row now implement this class and should be migrated to InternalRow.

Readers that previously implemented SupportsScanUnsafeRow have been migrated to use no SupportsScan mix-ins and produce InternalRow.

## How was this patch tested?

This uses existing tests.

Author: Ryan Blue <blue@apache.org>

Closes #21118 from rdblue/SPARK-23325-datasource-v2-internal-row.
2018-07-24 10:46:36 -07:00
hyukjinkwon 3d5c61e5fd [SPARK-22499][FOLLOWUP][SQL] Reduce input string expressions for Least and Greatest to reduce time in its test
## What changes were proposed in this pull request?

It's minor and trivial but looks 2000 input is good enough to reproduce and test in SPARK-22499.

## How was this patch tested?

Manually brought the change and tested.

Locally tested:

Before: 3m 21s 288ms
After: 1m 29s 134ms

Given the latest successful build took:

```
ArithmeticExpressionSuite:
- SPARK-22499: Least and greatest should not generate codes beyond 64KB (7 minutes, 49 seconds)
```

I expect it's going to save 4ish mins.

Author: hyukjinkwon <gurwls223@apache.org>

Closes #21855 from HyukjinKwon/minor-fix-suite.
2018-07-24 19:51:09 +08:00
10129659 13a67b070d [SPARK-24870][SQL] Cache can't work normally if there are case letters in SQL
## What changes were proposed in this pull request?
Modified the canonicalized to not case-insensitive.
Before the PR, cache can't work normally if there are case letters in SQL,
for example:
     sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")

    sql("select key, sum(case when Key > 0 then 1 else 0 end) as positiveNum " +
      "from src group by key").cache().createOrReplaceTempView("src_cache")
    sql(
      s"""select a.key
           from
           (select key from src_cache where positiveNum = 1)a
           left join
           (select key from src_cache )b
           on a.key=b.key
        """).explain

The physical plan of the sql is:
![image](https://user-images.githubusercontent.com/26834091/42979518-3decf0fa-8c05-11e8-9837-d5e4c334cb1f.png)

The subquery "select key from src_cache where positiveNum = 1" on the left of join can use the cache data, but the subquery "select key from src_cache" on the right of join cannot use the cache data.

## How was this patch tested?

new added test

Author: 10129659 <chen.yanshan@zte.com.cn>

Closes #21823 from eatoncys/canonicalized.
2018-07-23 23:05:08 -07:00
“attilapiros” d2436a8529 [SPARK-24594][YARN] Introducing metrics for YARN
## What changes were proposed in this pull request?

In this PR metrics are introduced for YARN.  As up to now there was no metrics in the YARN module a new metric system is created with the name "applicationMaster".
To support both client and cluster mode the metric system lifecycle is bound to the AM.

## How was this patch tested?

Both client and cluster mode was tested manually.
Before the test on one of the YARN node spark-core was removed to cause the allocation failure.
Spark was started as (in case of client mode):

```
spark2-submit \
  --class org.apache.spark.examples.SparkPi \
  --conf "spark.yarn.blacklist.executor.launch.blacklisting.enabled=true" --conf "spark.blacklist.application.maxFailedExecutorsPerNode=2" --conf "spark.dynamicAllocation.enabled=true" --conf "spark.metrics.conf.*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink" \
  --master yarn \
  --deploy-mode client \
  original-spark-examples_2.11-2.4.0-SNAPSHOT.jar \
  1000
```

In both cases the YARN logs contained the new metrics as:

```
$ yarn logs --applicationId application_1529926424933_0015
...
-- Gauges ----------------------------------------------------------------------
application_1531751594108_0046.applicationMaster.numContainersPendingAllocate
             value = 0
application_1531751594108_0046.applicationMaster.numExecutorsFailed
             value = 3
application_1531751594108_0046.applicationMaster.numExecutorsRunning
             value = 9
application_1531751594108_0046.applicationMaster.numLocalityAwareTasks
             value = 0
application_1531751594108_0046.applicationMaster.numReleasedContainers
             value = 0
...

```

Author: “attilapiros” <piros.attila.zsolt@gmail.com>
Author: Attila Zsolt Piros <2017933+attilapiros@users.noreply.github.com>

Closes #21635 from attilapiros/SPARK-24594.
2018-07-24 09:33:10 +08:00
Yuanjian Li cfc3e1aaa4 [SPARK-24339][SQL] Prunes the unused columns from child of ScriptTransformation
## What changes were proposed in this pull request?

Modify the strategy in ColumnPruning to add a Project between ScriptTransformation and its child, this strategy can reduce the scan time especially in the scenario of the table has many columns.

## How was this patch tested?

Add UT in ColumnPruningSuite and ScriptTransformationSuite.

Author: Yuanjian Li <xyliyuanjian@gmail.com>

Closes #21839 from xuanyuanking/SPARK-24339.
2018-07-23 13:04:39 -07:00
Tathagata Das 61f0ca4f1c [SPARK-24699][SS] Make watermarks work with Trigger.Once by saving updated watermark to commit log
## What changes were proposed in this pull request?

Streaming queries with watermarks do not work with Trigger.Once because of the following.
- Watermark is updated in the driver memory after a batch completes, but it is persisted to checkpoint (in the offset log) only when the next batch is planned
- In trigger.once, the query terminated as soon as one batch has completed. Hence, the updated watermark is never persisted anywhere.

The simple solution is to persist the updated watermark value in the commit log when a batch is marked as completed. Then the next batch, in the next trigger.once run can pick it up from the commit log.

## How was this patch tested?
new unit tests

Co-authored-by: Tathagata Das <tathagata.das1565gmail.com>
Co-authored-by: c-horn <chorn4033gmail.com>

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #21746 from tdas/SPARK-24699.
2018-07-23 13:03:32 -07:00
Onur Satici 2edf17effd [SPARK-24850][SQL] fix str representation of CachedRDDBuilder
## What changes were proposed in this pull request?
As of https://github.com/apache/spark/pull/21018, InMemoryRelation includes its cacheBuilder when logging query plans. This PR changes the string representation of the CachedRDDBuilder to not include the cached spark plan.

## How was this patch tested?

spark-shell, query:
```
var df_cached = spark.read.format("csv").option("header", "true").load("test.csv").cache()
0 to 1 foreach { _ =>
df_cached = df_cached.join(spark.read.format("csv").option("header", "true").load("test.csv"), "A").cache()
}
df_cached.explain
```
as of master results in:
```
== Physical Plan ==
InMemoryTableScan [A#10, B#11, B#35, B#87]
+- InMemoryRelation [A#10, B#11, B#35, B#87], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(2) Project [A#10, B#11, B#35, B#87]
+- *(2) BroadcastHashJoin [A#10], [A#86], Inner, BuildRight
:- *(2) Filter isnotnull(A#10)
: +- InMemoryTableScan [A#10, B#11, B#35], [isnotnull(A#10)]
: +- InMemoryRelation [A#10, B#11, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(2) Project [A#10, B#11, B#35]
+- *(2) BroadcastHashJoin [A#10], [A#34], Inner, BuildRight
:- *(2) Filter isnotnull(A#10)
: +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)]
: +- InMemoryRelation [A#10, B#11], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
: +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
+- *(1) Filter isnotnull(A#34)
+- InMemoryTableScan [A#34, B#35], [isnotnull(A#34)]
+- InMemoryRelation [A#34, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
+- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
: +- *(2) Project [A#10, B#11, B#35]
: +- *(2) BroadcastHashJoin [A#10], [A#34], Inner, BuildRight
: :- *(2) Filter isnotnull(A#10)
: : +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)]
: : +- InMemoryRelation [A#10, B#11], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
: : +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
: +- *(1) Filter isnotnull(A#34)
: +- InMemoryTableScan [A#34, B#35], [isnotnull(A#34)]
: +- InMemoryRelation [A#34, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
: +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
+- *(1) Filter isnotnull(A#86)
+- InMemoryTableScan [A#86, B#87], [isnotnull(A#86)]
+- InMemoryRelation [A#86, B#87], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
+- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
+- *(2) Project [A#10, B#11, B#35, B#87]
+- *(2) BroadcastHashJoin [A#10], [A#86], Inner, BuildRight
:- *(2) Filter isnotnull(A#10)
: +- InMemoryTableScan [A#10, B#11, B#35], [isnotnull(A#10)]
: +- InMemoryRelation [A#10, B#11, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(2) Project [A#10, B#11, B#35]
+- *(2) BroadcastHashJoin [A#10], [A#34], Inner, BuildRight
:- *(2) Filter isnotnull(A#10)
: +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)]
: +- InMemoryRelation [A#10, B#11], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
: +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
+- *(1) Filter isnotnull(A#34)
+- InMemoryTableScan [A#34, B#35], [isnotnull(A#34)]
+- InMemoryRelation [A#34, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
+- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
: +- *(2) Project [A#10, B#11, B#35]
: +- *(2) BroadcastHashJoin [A#10], [A#34], Inner, BuildRight
: :- *(2) Filter isnotnull(A#10)
: : +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)]
: : +- InMemoryRelation [A#10, B#11], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
: : +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
: +- *(1) Filter isnotnull(A#34)
: +- InMemoryTableScan [A#34, B#35], [isnotnull(A#34)]
: +- InMemoryRelation [A#34, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
: +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
+- *(1) Filter isnotnull(A#86)
+- InMemoryTableScan [A#86, B#87], [isnotnull(A#86)]
+- InMemoryRelation [A#86, B#87], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
+- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
```
with this patch results in:
```
== Physical Plan ==
InMemoryTableScan [A#10, B#11, B#35, B#87]
   +- InMemoryRelation [A#10, B#11, B#35, B#87], CachedRDDBuilder(true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas))
         +- *(2) Project [A#10, B#11, B#35, B#87]
            +- *(2) BroadcastHashJoin [A#10], [A#86], Inner, BuildRight
               :- *(2) Filter isnotnull(A#10)
               :  +- InMemoryTableScan [A#10, B#11, B#35], [isnotnull(A#10)]
               :        +- InMemoryRelation [A#10, B#11, B#35], CachedRDDBuilder(true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas))
               :              +- *(2) Project [A#10, B#11, B#35]
               :                 +- *(2) BroadcastHashJoin [A#10], [A#34], Inner, BuildRight
               :                    :- *(2) Filter isnotnull(A#10)
               :                    :  +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)]
               :                    :        +- InMemoryRelation [A#10, B#11], CachedRDDBuilder(true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas))
               :                    :              +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
               :                    +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
               :                       +- *(1) Filter isnotnull(A#34)
               :                          +- InMemoryTableScan [A#34, B#35], [isnotnull(A#34)]
               :                                +- InMemoryRelation [A#34, B#35], CachedRDDBuilder(true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas))
               :                                      +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
               +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
                  +- *(1) Filter isnotnull(A#86)
                     +- InMemoryTableScan [A#86, B#87], [isnotnull(A#86)]
                           +- InMemoryRelation [A#86, B#87], CachedRDDBuilder(true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas))
                                 +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
```

Author: Onur Satici <osatici@palantir.com>

Closes #21805 from onursatici/os/inmemoryrelation-str.
2018-07-23 09:52:28 -07:00
Gengliang Wang 08e315f633 [SPARK-24887][SQL] Avro: use SerializableConfiguration in Spark utils to deduplicate code
## What changes were proposed in this pull request?

To implement the method `buildReader` in `FileFormat`, it is required to serialize the hadoop configuration for executors.

Previous spark-avro uses its own class `SerializableConfiguration` for the serialization. As now it is part of Spark, we can use SerializableConfiguration in Spark util to deduplicate the code.

## How was this patch tested?

Unit test

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21846 from gengliangwang/removeSerializableConfiguration.
2018-07-23 08:31:48 -07:00
maryannxue 434319e73f [SPARK-24802][SQL] Add a new config for Optimization Rule Exclusion
## What changes were proposed in this pull request?

Since Spark has provided fairly clear interfaces for adding user-defined optimization rules, it would be nice to have an easy-to-use interface for excluding an optimization rule from the Spark query optimizer as well.

This would make customizing Spark optimizer easier and sometimes could debugging issues too.

- Add a new config spark.sql.optimizer.excludedRules, with the value being a list of rule names separated by comma.
- Modify the current batches method to remove the excluded rules from the default batches. Log the rules that have been excluded.
- Split the existing default batches into "post-analysis batches" and "optimization batches" so that only rules in the "optimization batches" can be excluded.

## How was this patch tested?

Add a new test suite: OptimizerRuleExclusionSuite

Author: maryannxue <maryannxue@apache.org>

Closes #21764 from maryannxue/rule-exclusion.
2018-07-23 08:25:24 -07:00
SongYadong ab18b02e66 [SQL][HIVE] Correct an assert message in function makeRDDForTable
## What changes were proposed in this pull request?
according to the context, "makeRDDForTablePartitions" in assert message should be "makeRDDForPartitionedTable", because "makeRDDForTablePartitions" does't exist in spark code.

## How was this patch tested?
unit tests

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: SongYadong <song.yadong1@zte.com.cn>

Closes #21836 from SongYadong/assert_info_modify.
2018-07-23 19:10:53 +08:00
Gengliang Wang f59de52a2a [SPARK-24883][SQL] Avro: remove implicit class AvroDataFrameWriter/AvroDataFrameReader
## What changes were proposed in this pull request?

As per Reynold's comment: https://github.com/apache/spark/pull/21742#discussion_r203496489

It makes sense to remove the implicit class AvroDataFrameWriter/AvroDataFrameReader, since the Avro package is external module.

## How was this patch tested?

Unit test

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21841 from gengliangwang/removeImplicit.
2018-07-23 15:27:33 +08:00
Gengliang Wang 8817c68f50 [SPARK-24811][SQL] Avro: add new function from_avro and to_avro
## What changes were proposed in this pull request?

1. Add a new function from_avro for parsing a binary column of avro format and converting it into its corresponding catalyst value.

2. Add a new function to_avro for converting a column into binary of avro format with the specified schema.

I created #21774 for this, but it failed the build https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Compile/job/spark-master-compile-maven-hadoop-2.6/7902/

Additional changes In this PR:
1. Add `scalacheck` dependency in pom.xml to resolve the failure.
2. Update the `log4j.properties` to make it consistent with other modules.

## How was this patch tested?

Unit test
Compile with different commands:
```
./build/mvn --force -DzincPort=3643 -DskipTests -Phadoop-2.6 -Phive-thriftserver -Pkinesis-asl -Pspark-ganglia-lgpl -Pmesos -Pyarn  compile test-compile
./build/mvn --force -DzincPort=3643 -DskipTests -Phadoop-2.7 -Phive-thriftserver -Pkinesis-asl -Pspark-ganglia-lgpl -Pmesos -Pyarn  compile test-compile
./build/mvn --force -DzincPort=3643 -DskipTests -Phadoop-3.1 -Phive-thriftserver -Pkinesis-asl -Pspark-ganglia-lgpl -Pmesos -Pyarn  compile test-compile
```

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21838 from gengliangwang/from_and_to_avro.
2018-07-22 17:36:57 -07:00
zhengruifeng3 81af88687f [SPARK-23231][ML][DOC] Add doc for string indexer ordering to user guide (also to RFormula guide)
## What changes were proposed in this pull request?
add doc for string indexer ordering

## How was this patch tested?
existing tests

Author: zhengruifeng3 <zhengruifeng@jd.com>
Author: zhengruifeng <ruifengz@foxmail.com>

Closes #21792 from zhengruifeng/doc_string_indexer_ordering.
2018-07-21 08:26:45 -05:00
Yuming Wang d7ae4247ea [SPARK-24873][YARN] Turn off spark-shell noisy log output
## What changes were proposed in this pull request?

[SPARK-24182](https://github.com/apache/spark/pull/21243) changed the `logApplicationReport` from `false` to `true`. This pr revert it to `false`. otherwise `spark-shell` will show noisy log output:
```java
...
18/07/16 04:46:25 INFO Client: Application report for application_1530676576026_54551 (state: RUNNING)
18/07/16 04:46:26 INFO Client: Application report for application_1530676576026_54551 (state: RUNNING)
...
```

Closes https://github.com/apache/spark/pull/21827

## How was this patch tested?

 manual tests

Author: Yuming Wang <yumwang@ebay.com>

Closes #21784 from wangyum/SPARK-24182.
2018-07-21 16:43:10 +08:00
Maxim Gekk 106880edcd [SPARK-24836][SQL] New option for Avro datasource - ignoreExtension
## What changes were proposed in this pull request?

I propose to add new option for AVRO datasource which should control ignoring of files without `.avro` extension in read. The option has name `ignoreExtension` with default value `true`. If both options `ignoreExtension` and `avro.mapred.ignore.inputs.without.extension` are set, `ignoreExtension` overrides the former one. Here is an example of usage:

```
spark
  .read
  .option("ignoreExtension", false)
  .avro("path to avro files")
```

## How was this patch tested?

I added a test which checks the option directly and a test for checking that new option overrides hadoop's config.

Author: Maxim Gekk <maxim.gekk@databricks.com>

Closes #21798 from MaxGekk/avro-ignore-extension.
2018-07-20 20:04:40 -07:00
William Sheu bbd6f0c25f [SPARK-24879][SQL] Fix NPE in Hive partition pruning filter pushdown
## What changes were proposed in this pull request?
We get a NPE when we have a filter on a partition column of the form `col in (x, null)`. This is due to the filter converter in HiveShim not handling `null`s correctly. This patch fixes this bug while still pushing down as much of the partition pruning predicates as possible, by filtering out `null`s from any `in` predicate. Since Hive only supports very simple partition pruning filters, this change should preserve correctness.

## How was this patch tested?
Unit tests, manual tests

Author: William Sheu <william.sheu@databricks.com>

Closes #21832 from PenguinToast/partition-pruning-npe.
2018-07-20 19:59:28 -07:00
William Sheu 96f3120760 [PYSPARK][TEST][MINOR] Fix UDFInitializationTests
## What changes were proposed in this pull request?

Fix a typo in pyspark sql tests

Author: William Sheu <william.sheu@databricks.com>

Closes #21833 from PenguinToast/fix-test-typo.
2018-07-20 19:48:32 -07:00
Brandon Krieger 597bdeff2d [SPARK-24488][SQL] Fix issue when generator is aliased multiple times
## What changes were proposed in this pull request?

Currently, the Analyzer throws an exception if your try to nest a generator. However, it special cases generators "nested" in an alias, and allows that. If you try to alias a generator twice, it is not caught by the special case, so an exception is thrown.

This PR trims the unnecessary, non-top-level aliases, so that the generator is allowed.

## How was this patch tested?

new tests in AnalysisSuite.

Author: Brandon Krieger <bkrieger@palantir.com>

Closes #21508 from bkrieger/bk/SPARK-24488.
2018-07-21 00:44:00 +02:00
zsxwing f765bb7823 [SPARK-24880][BUILD] Fix the group id for spark-kubernetes-integration-tests
## What changes were proposed in this pull request?

The correct group id should be `org.apache.spark`. This is causing the nightly build failure: https://amplab.cs.berkeley.edu/jenkins/job/spark-master-maven-snapshots/2295/console

`
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-deploy-plugin:2.8.2:deploy (default-deploy) on project spark-kubernetes-integration-tests_2.11: Failed to deploy artifacts: Could not transfer artifact spark-kubernetes-integration-tests:spark-kubernetes-integration-tests_2.11🫙2.4.0-20180720.101629-1 from/to apache.snapshots.https (https://repository.apache.org/content/repositories/snapshots): Access denied to: https://repository.apache.org/content/repositories/snapshots/spark-kubernetes-integration-tests/spark-kubernetes-integration-tests_2.11/2.4.0-SNAPSHOT/spark-kubernetes-integration-tests_2.11-2.4.0-20180720.101629-1.jar, ReasonPhrase: Forbidden. -> [Help 1]
[ERROR]
`

## How was this patch tested?

Jenkins.

Author: zsxwing <zsxwing@gmail.com>

Closes #21831 from zsxwing/fix-k8s-test.
2018-07-20 15:23:04 -07:00
Gengliang Wang 00b864aa70 [SPARK-24876][SQL] Avro: simplify schema serialization
## What changes were proposed in this pull request?

Previously in the refactoring of Avro Serializer and Deserializer, a new class SerializableSchema is created for serializing the Avro schema:
https://github.com/apache/spark/pull/21762/files#diff-01fea32e6ec6bcf6f34d06282e08705aR37

On second thought, we can use `toString` method for serialization. After that, parse the JSON format schema on executor. This makes the code much simpler.

## How was this patch tested?

Unit test

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21829 from gengliangwang/removeSerializableSchema.
2018-07-20 14:57:59 -07:00
Daniel van der Ende 2333a34d39 [SPARK-22880][SQL] Add cascadeTruncate option to JDBC datasource
This commit adds the `cascadeTruncate` option to the JDBC datasource
API, for databases that support this functionality (PostgreSQL and
Oracle at the moment). This allows for applying a cascading truncate
that affects tables that have foreign key constraints on the table
being truncated.

## What changes were proposed in this pull request?

Add `cascadeTruncate` option to JDBC datasource API. Allow this to affect the
`TRUNCATE` query for databases that support this option.

## How was this patch tested?
Existing tests for `truncateQuery` were updated. Also, an additional test was added
to ensure that the correct syntax was applied, and that enabling the config for databases
that do not support this option does not result in invalid queries.

Author: Daniel van der Ende <daniel.vanderende@gmail.com>

Closes #20057 from danielvdende/SPARK-22880.
2018-07-20 13:03:57 -07:00
Xiao Li 9ad77b3037 Revert "[SPARK-24811][SQL] Avro: add new function from_avro and to_avro"
This reverts commit 244bcff194.
2018-07-20 12:55:38 -07:00
Bago Amirbekian 3cb1b57809 [SPARK-24852][ML] Update spark.ml to use Instrumentation.instrumented.
## What changes were proposed in this pull request?

Followup for #21719.
Update spark.ml training code to fully wrap instrumented methods and remove old instrumentation APIs.

## How was this patch tested?

existing tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Bago Amirbekian <bago@databricks.com>

Closes #21799 from MrBago/new-instrumentation-apis2.
2018-07-20 12:13:15 -07:00
Gengliang Wang 244bcff194 [SPARK-24811][SQL] Avro: add new function from_avro and to_avro
## What changes were proposed in this pull request?

Add a new function from_avro for parsing a binary column of avro format and converting it into its corresponding catalyst value.

Add a new function to_avro for converting a column into binary of avro format with the specified schema.

This PR is in progress. Will add test cases.
## How was this patch tested?

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21774 from gengliangwang/from_and_to_avro.
2018-07-20 09:19:29 -07:00
Marco Gaido cc4d64bb16 [SPARK-23451][ML] Deprecate KMeans.computeCost
## What changes were proposed in this pull request?

Deprecate `KMeans.computeCost` which was introduced as a temp fix and now it is not needed anymore, since we introduced `ClusteringEvaluator`.

## How was this patch tested?

manual test (deprecation warning displayed)
Scala
```
...
scala> model.computeCost(dataset)
warning: there was one deprecation warning; re-run with -deprecation for details
res1: Double = 0.0
```

Python
```
>>> import warnings
>>> warnings.simplefilter('always', DeprecationWarning)
...
>>> model.computeCost(df)
/Users/mgaido/apache/spark/python/pyspark/ml/clustering.py:330: DeprecationWarning: Deprecated in 2.4.0. It will be removed in 3.0.0. Use ClusteringEvaluator instead.
  " instead.", DeprecationWarning)
```

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #20629 from mgaido91/SPARK-23451.
2018-07-20 09:18:57 -07:00
hyukjinkwon e0b6383218 [SPARK-23731][SQL] Make FileSourceScanExec canonicalizable after being (de)serialized
## What changes were proposed in this pull request?

### What's problem?

In some cases, sub scalar query could throw a NPE, which is caused in execution side.

```
java.lang.NullPointerException
	at org.apache.spark.sql.execution.FileSourceScanExec.<init>(DataSourceScanExec.scala:169)
	at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:526)
	at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:159)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:211)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:210)
	at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$3.apply(QueryPlan.scala:225)
	at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$3.apply(QueryPlan.scala:225)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.immutable.List.map(List.scala:296)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:225)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:211)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:210)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.sameResult(QueryPlan.scala:258)
	at org.apache.spark.sql.execution.ScalarSubquery.semanticEquals(subquery.scala:58)
	at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$Expr.equals(EquivalentExpressions.scala:36)
	at scala.collection.mutable.HashTable$class.elemEquals(HashTable.scala:364)
	at scala.collection.mutable.HashMap.elemEquals(HashMap.scala:40)
	at scala.collection.mutable.HashTable$class.scala$collection$mutable$HashTable$$findEntry0(HashTable.scala:139)
	at scala.collection.mutable.HashTable$class.findEntry(HashTable.scala:135)
	at scala.collection.mutable.HashMap.findEntry(HashMap.scala:40)
	at scala.collection.mutable.HashMap.get(HashMap.scala:70)
	at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExpr(EquivalentExpressions.scala:56)
	at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:97)
	at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$$anonfun$addExprTree$1.apply(EquivalentExpressions.scala:98)
	at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$$anonfun$addExprTree$1.apply(EquivalentExpressions.scala:98)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:98)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$subexpressionElimination$1.apply(CodeGenerator.scala:1102)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$subexpressionElimination$1.apply(CodeGenerator.scala:1102)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.subexpressionElimination(CodeGenerator.scala:1102)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.generateExpressions(CodeGenerator.scala:1154)
	at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.createCode(GenerateUnsafeProjection.scala:270)
	at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:319)
	at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.generate(GenerateUnsafeProjection.scala:308)
	at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:181)
	at org.apache.spark.sql.execution.ProjectExec$$anonfun$9.apply(basicPhysicalOperators.scala:71)
	at org.apache.spark.sql.execution.ProjectExec$$anonfun$9.apply(basicPhysicalOperators.scala:70)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:818)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:818)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:367)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
```

### How does this happen?

Here looks what happen now:

1. Sub scalar query was made (for instance `SELECT (SELECT id FROM foo)`).

2. Try to extract some common expressions (via `CodeGenerator.subexpressionElimination`) so that it can generates some common codes and can be reused.

3. During this, seems it extracts some expressions that can be reused (via `EquivalentExpressions.addExprTree`)

  b2deef64f6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala (L1102)

4. During this, if the hash (`EquivalentExpressions.Expr.hashCode`) happened to be the same at `EquivalentExpressions.addExpr` anyhow, `EquivalentExpressions.Expr.equals` is called to identify object in the same hash, which eventually calls `semanticEquals` in `ScalarSubquery`

  087879a77a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala (L54)

  087879a77a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala (L36)

5. `ScalarSubquery`'s `semanticEquals` needs `SubqueryExec`'s `sameResult`

  77a2fc5b52/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala (L58)

6. `SubqueryExec`'s `sameResult` requires a canonicalized plan which calls `FileSourceScanExec`'s `doCanonicalize`

  e008ad1752/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala (L258)

7. In `FileSourceScanExec`'s `doCanonicalize`, `FileSourceScanExec`'s `relation` is required but seems `transient` so it becomes `null`.

  e76b0124fb/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala (L527)

  e76b0124fb/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala (L160)

8. NPE is thrown.

\*1. driver side
\*2., 3., 4., 5., 6., 7., 8. executor side

Note that most of cases, it looks fine because we will usually call:

087879a77a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala (L40)

which make a canonicalized plan via:

b045315e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala (L192)

77a2fc5b52/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala (L52)

### How to reproduce?

This looks what happened now. I can reproduce this by a bit of messy way:

```diff
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
index 8d06804ce1e..d25fc9a7ba9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
 -37,7 +37,9  class EquivalentExpressions {
       case _ => false
     }

-    override def hashCode: Int = e.semanticHash()
+    override def hashCode: Int = {
+      1
+    }
   }
```

```scala
spark.range(1).write.mode("overwrite").parquet("/tmp/foo")
spark.read.parquet("/tmp/foo").createOrReplaceTempView("foo")
spark.conf.set("spark.sql.codegen.wholeStage", false)
sql("SELECT (SELECT id FROM foo) == (SELECT id FROM foo)").collect()
```

### How does this PR fix?

- Make all variables that access to `FileSourceScanExec`'s `relation` as `lazy val` so that we avoid NPE. This is a temporary fix.

- Allow `makeCopy` in `SparkPlan` without Spark session too. This looks still able to be accessed within executor side. For instance:

  ```
	at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:70)
	at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:47)
	at org.apache.spark.sql.catalyst.trees.TreeNode.withNewChildren(TreeNode.scala:233)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:243)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:211)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:210)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.sameResult(QueryPlan.scala:258)
	at org.apache.spark.sql.execution.ScalarSubquery.semanticEquals(subquery.scala:58)
	at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$Expr.equals(EquivalentExpressions.scala:36)
	at scala.collection.mutable.HashTable$class.elemEquals(HashTable.scala:364)
	at scala.collection.mutable.HashMap.elemEquals(HashMap.scala:40)
	at scala.collection.mutable.HashTable$class.scala$collection$mutable$HashTable$$findEntry0(HashTable.scala:139)
	at scala.collection.mutable.HashTable$class.findEntry(HashTable.scala:135)
	at scala.collection.mutable.HashMap.findEntry(HashMap.scala:40)
	at scala.collection.mutable.HashMap.get(HashMap.scala:70)
	at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExpr(EquivalentExpressions.scala:54)
	at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:95)
	at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$$anonfun$addExprTree$1.apply(EquivalentExpressions.scala:96)
	at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$$anonfun$addExprTree$1.apply(EquivalentExpressions.scala:96)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:96)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$subexpressionElimination$1.apply(CodeGenerator.scala:1102)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$subexpressionElimination$1.apply(CodeGenerator.scala:1102)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.subexpressionElimination(CodeGenerator.scala:1102)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.generateExpressions(CodeGenerator.scala:1154)
	at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.createCode(GenerateUnsafeProjection.scala:270)
	at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:319)
	at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.generate(GenerateUnsafeProjection.scala:308)
	at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:181)
	at org.apache.spark.sql.execution.ProjectExec$$anonfun$9.apply(basicPhysicalOperators.scala:71)
	at org.apache.spark.sql.execution.ProjectExec$$anonfun$9.apply(basicPhysicalOperators.scala:70)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:818)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:818)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:367)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
  ```

This PR takes over https://github.com/apache/spark/pull/20856.

## How was this patch tested?

Manually tested and unit test was added.

Closes #20856

Author: hyukjinkwon <gurwls223@apache.org>

Closes #21815 from HyukjinKwon/SPARK-23731.
2018-07-20 20:59:48 +08:00
Stavros Kontopoulos 20ce1a8f8b [SPARK-24551][K8S] Add integration tests for secrets
## What changes were proposed in this pull request?

- Adds integration tests for env and mount secrets.

## How was this patch tested?

Manually by checking that secrets were added to the containers and by tuning the tests.

![image](https://user-images.githubusercontent.com/7945591/42968472-fee3740a-8bab-11e8-9eac-573f67d861fc.png)

Author: Stavros Kontopoulos <stavros.kontopoulos@lightbend.com>

Closes #21652 from skonto/add-secret-its.
2018-07-20 07:55:58 -05:00
Takuya UESHIN 7b6d36bc9e [SPARK-24871][SQL] Refactor Concat and MapConcat to avoid creating concatenator object for each row.
## What changes were proposed in this pull request?

Refactor `Concat` and `MapConcat` to:

- avoid creating concatenator object for each row.
- make `Concat` handle `containsNull` properly.
- make `Concat` shortcut if `null` child is found.

## How was this patch tested?

Added some tests and existing tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #21824 from ueshin/issues/SPARK-24871/refactor_concat_mapconcat.
2018-07-20 20:08:42 +08:00
Huaxin Gao 0ab07b357b [SPARK-24868][PYTHON] add sequence function in Python
## What changes were proposed in this pull request?

Add ```sequence``` in functions.py

## How was this patch tested?

Add doctest.

Author: Huaxin Gao <huaxing@us.ibm.com>

Closes #21820 from huaxingao/spark-24868.
2018-07-20 17:53:14 +08:00
Dilip Biswal 2b91d9918c [SPARK-24424][SQL] Support ANSI-SQL compliant syntax for GROUPING SET
## What changes were proposed in this pull request?

Enhances the parser and analyzer to support ANSI compliant syntax for GROUPING SET. As part of this change we derive the grouping expressions from user supplied groupings in the grouping sets clause.

```SQL
SELECT c1, c2, max(c3)
FROM t1
GROUP BY GROUPING SETS ((c1), (c1, c2))
```

## How was this patch tested?
Added tests in SQLQueryTestSuite and ResolveGroupingAnalyticsSuite.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #21813 from dilipbiswal/spark-24424.
2018-07-19 23:52:53 -07:00
Marco Gaido a5925c1631 [SPARK-24268][SQL] Use datatype.catalogString in error messages
## What changes were proposed in this pull request?

As stated in https://github.com/apache/spark/pull/21321, in the error messages we should use `catalogString`. This is not the case, as SPARK-22893 used `simpleString` in order to have the same representation everywhere and it missed some places.

The PR unifies the messages using alway the `catalogString` representation of the dataTypes in the messages.

## How was this patch tested?

existing/modified UTs

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21804 from mgaido91/SPARK-24268_catalog.
2018-07-19 23:29:29 -07:00