Commit graph

22195 commits

Author SHA1 Message Date
maryannxue e3201e165e [SPARK-24035][SQL] SQL syntax for Pivot
## What changes were proposed in this pull request?

Add SQL support for Pivot according to Pivot grammar defined by Oracle (https://docs.oracle.com/database/121/SQLRF/img_text/pivot_clause.htm) with some simplifications, based on our existing functionality and limitations for Pivot at the backend:
1. For pivot_for_clause (https://docs.oracle.com/database/121/SQLRF/img_text/pivot_for_clause.htm), the column list form is not supported, which means the pivot column can only be one single column.
2. For pivot_in_clause (https://docs.oracle.com/database/121/SQLRF/img_text/pivot_in_clause.htm), the sub-query form and "ANY" is not supported (this is only supported by Oracle for XML anyway).
3. For pivot_in_clause, aliases for the constant values are not supported.

The code changes are:
1. Add parser support for Pivot. Note that according to https://docs.oracle.com/database/121/SQLRF/statements_10002.htm#i2076542, Pivot cannot be used together with lateral views in the from clause. This restriction has been implemented in the Parser rule.
2. Infer group-by expressions: group-by expressions are not explicitly specified in SQL Pivot clause and need to be deduced based on this rule: https://docs.oracle.com/database/121/SQLRF/statements_10002.htm#CHDFAFIE, so we have to post-fix it at query analysis stage.
3. Override Pivot.resolved as "false": for the reason mentioned in [2] and the fact that output attributes change after Pivot being replaced by Project or Aggregate, we avoid resolving parent references until after Pivot has been resolved and replaced.
4. Verify aggregate expressions: only aggregate expressions with or without aliases can appear in the first part of the Pivot clause, and this check is performed as analysis stage.

## How was this patch tested?

A new test suite PivotSuite is added.

Author: maryannxue <maryann.xue@gmail.com>

Closes #21187 from maryannxue/spark-24035.
2018-05-03 17:05:02 -07:00
Imran Rashid 94641fe6cc [SPARK-23433][CORE] Late zombie task completions update all tasksets
Fetch failure lead to multiple tasksets which are active for a given
stage.  While there is only one "active" version of the taskset, the
earlier attempts can still have running tasks, which can complete
successfully.  So a task completion needs to update every taskset
so that it knows the partition is completed.  That way the final active
taskset does not try to submit another task for the same partition,
and so that it knows when it is completed and when it should be
marked as a "zombie".

Added a regression test.

Author: Imran Rashid <irashid@cloudera.com>

Closes #21131 from squito/SPARK-23433.
2018-05-03 10:59:18 -05:00
Wenchen Fan 96a50016bb [SPARK-24169][SQL] JsonToStructs should not access SQLConf at executor side
## What changes were proposed in this pull request?

This PR is extracted from #21190 , to make it easier to backport.

`JsonToStructs` can be serialized to executors and evaluate, we should not call `SQLConf.get.getConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA)` in the body.

## How was this patch tested?

tested in #21190

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21226 from cloud-fan/minor4.
2018-05-03 23:36:09 +08:00
Wenchen Fan 991b526992 [SPARK-24166][SQL] InMemoryTableScanExec should not access SQLConf at executor side
## What changes were proposed in this pull request?

This PR is extracted from https://github.com/apache/spark/pull/21190 , to make it easier to backport.

`InMemoryTableScanExec#createAndDecompressColumn` is executed inside `rdd.map`, we can't access `conf.offHeapColumnVectorEnabled` there.

## How was this patch tested?

it's tested in #21190

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21223 from cloud-fan/minor1.
2018-05-03 19:56:30 +08:00
Wenchen Fan 417ad92502 [SPARK-23715][SQL] the input of to/from_utc_timestamp can not have timezone
## What changes were proposed in this pull request?

`from_utc_timestamp` assumes its input is in UTC timezone and shifts it to the specified timezone. When the timestamp contains timezone(e.g. `2018-03-13T06:18:23+00:00`), Spark breaks the semantic and respect the timezone in the string. This is not what user expects and the result is different from Hive/Impala. `to_utc_timestamp` has the same problem.

More details please refer to the JIRA ticket.

This PR fixes this by returning null if the input timestamp contains timezone.

## How was this patch tested?

new tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21169 from cloud-fan/from_utc_timezone.
2018-05-03 19:27:01 +08:00
Dongjoon Hyun c9bfd1c6f8 [SPARK-23489][SQL][TEST] HiveExternalCatalogVersionsSuite should verify the downloaded file
## What changes were proposed in this pull request?

Although [SPARK-22654](https://issues.apache.org/jira/browse/SPARK-22654) made `HiveExternalCatalogVersionsSuite` download from Apache mirrors three times, it has been flaky because it didn't verify the downloaded file. Some Apache mirrors terminate the downloading abnormally, the *corrupted* file shows the following errors.

```
gzip: stdin: not in gzip format
tar: Child returned status 1
tar: Error is not recoverable: exiting now
22:46:32.700 WARN org.apache.spark.sql.hive.HiveExternalCatalogVersionsSuite:

===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.hive.HiveExternalCatalogVersionsSuite, thread names: Keep-Alive-Timer =====

*** RUN ABORTED ***
  java.io.IOException: Cannot run program "./bin/spark-submit" (in directory "/tmp/test-spark/spark-2.2.0"): error=2, No such file or directory
```

This has been reported weirdly in two ways. For example, the above case is reported as Case 2 `no failures`.

- Case 1. [Test Result (1 failure / +1)](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-2.7/4389/)
- Case 2. [Test Result (no failures)](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-2.6/4811/)

This PR aims to make `HiveExternalCatalogVersionsSuite` more robust by verifying the downloaded `tgz` file by extracting and checking the existence of `bin/spark-submit`. If it turns out that the file is empty or corrupted, `HiveExternalCatalogVersionsSuite` will do retry logic like the download failure.

## How was this patch tested?

Pass the Jenkins.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #21210 from dongjoon-hyun/SPARK-23489.
2018-05-03 15:15:05 +08:00
jerryshao bf4352ca6c [SPARK-24110][THRIFT-SERVER] Avoid UGI.loginUserFromKeytab in STS
## What changes were proposed in this pull request?

Spark ThriftServer will call UGI.loginUserFromKeytab twice in initialization. This is unnecessary and will cause various potential problems, like Hadoop IPC failure after 7 days, or RM failover issue and so on.

So here we need to remove all the unnecessary login logics and make sure UGI in the context never be created again.

Note this is actually a HS2 issue, If later on we upgrade supported Hive version, the issue may already be fixed in Hive side.

## How was this patch tested?

Local verification in secure cluster.

Author: jerryshao <sshao@hortonworks.com>

Closes #21178 from jerryshao/SPARK-24110.
2018-05-03 09:28:14 +08:00
Takeshi Yamamuro e4c91c089a [SPARK-24111][SQL] Add the TPCDS v2.7 (latest) queries in TPCDSQueryBenchmark
## What changes were proposed in this pull request?
This pr added  the TPCDS v2.7 (latest) queries in `TPCDSQueryBenchmark`.
These query files have been added in `SPARK-23167`.

## How was this patch tested?
Manually checked.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #21177 from maropu/AddTpcdsV2_7InBenchmark.
2018-05-02 16:12:21 -07:00
Kazuaki Ishizaki 5be8aab144 [SPARK-23923][SQL] Add cardinality function
## What changes were proposed in this pull request?

The PR adds the SQL function `cardinality`. The behavior of the function is based on Presto's one.

The function returns the length of the array or map stored in the column as `int` while the Presto version returns the value as `BigInt` (`long` in Spark). The discussions regarding the difference of return type are [here](https://github.com/apache/spark/pull/21031#issuecomment-381284638) and [there](https://github.com/apache/spark/pull/21031#discussion_r181622107).

## How was this patch tested?

Added UTs

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #21031 from kiszk/SPARK-23923.
2018-05-02 13:53:10 -07:00
Marco Gaido 504c9cfd21 [SPARK-24123][SQL] Fix precision issues in monthsBetween with more than 8 digits
## What changes were proposed in this pull request?

SPARK-23902 introduced the ability to retrieve more than 8 digits in `monthsBetween`. Unfortunately, current implementation can cause precision loss in such a case. This was causing also a flaky UT.

This PR mirrors Hive's implementation in order to avoid precision loss also when more than 8 digits are returned.

## How was this patch tested?

running 10000000 times the flaky UT

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21196 from mgaido91/SPARK-24123.
2018-05-02 13:49:15 -07:00
Ala Luszczak 8bd27025b7 [SPARK-24133][SQL] Check for integer overflows when resizing WritableColumnVectors
## What changes were proposed in this pull request?

`ColumnVector`s store string data in one big byte array. Since the array size is capped at just under Integer.MAX_VALUE, a single `ColumnVector` cannot store more than 2GB of string data.
But since the Parquet files commonly contain large blobs stored as strings, and `ColumnVector`s by default carry 4096 values, it's entirely possible to go past that limit. In such cases a negative capacity is requested from `WritableColumnVector.reserve()`. The call succeeds (requested capacity is smaller than already allocated capacity), and consequently `java.lang.ArrayIndexOutOfBoundsException` is thrown when the reader actually attempts to put the data into the array.

This change introduces a simple check for integer overflow to `WritableColumnVector.reserve()` which should help catch the error earlier and provide more informative exception. Additionally, the error message in `WritableColumnVector.throwUnsupportedException()` was corrected, as it previously encouraged users to increase rather than reduce the batch size.

## How was this patch tested?

New units tests were added.

Author: Ala Luszczak <ala@databricks.com>

Closes #21206 from ala/overflow-reserve.
2018-05-02 12:43:19 -07:00
Marco Gaido 8dbf56c055 [SPARK-24013][SQL] Remove unneeded compress in ApproximatePercentile
## What changes were proposed in this pull request?

`ApproximatePercentile` contains a workaround logic to compress the samples since at the beginning `QuantileSummaries` was ignoring the compression threshold. This problem was fixed in SPARK-17439, but the workaround logic was not removed. So we are compressing the samples many more times than needed: this could lead to critical performance degradation.

This can create serious performance issues in queries like:
```
select approx_percentile(id, array(0.1)) from range(10000000)
```

## How was this patch tested?

added UT

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21133 from mgaido91/SPARK-24013.
2018-05-02 11:58:55 -07:00
WangJinhai02 152eaf6ae6 [SPARK-24107][CORE] ChunkedByteBuffer.writeFully method has not reset the limit value
JIRA Issue: https://issues.apache.org/jira/browse/SPARK-24107?jql=text%20~%20%22ChunkedByteBuffer%22

ChunkedByteBuffer.writeFully method has not reset the limit value. When
chunks larger than bufferWriteChunkSize, such as 80 * 1024 * 1024 larger than
config.BUFFER_WRITE_CHUNK_SIZE(64 * 1024 * 1024),only while once, will lost 16 * 1024 * 1024 byte

Author: WangJinhai02 <jinhai.wang02@ele.me>

Closes #21175 from manbuyun/bugfix-ChunkedByteBuffer.
2018-05-02 22:40:14 +08:00
Kazuaki Ishizaki 9215ee7a16 [SPARK-23976][CORE] Detect length overflow in UTF8String.concat()/ByteArray.concat()
## What changes were proposed in this pull request?

This PR detects length overflow if total elements in inputs are not acceptable.

For example, when the three inputs has `0x7FFF_FF00`, `0x7FFF_FF00`, and `0xE00`, we should detect length overflow since we cannot allocate such a large structure on `byte[]`.
On the other hand, the current algorithm can allocate the result structure with `0x1000`-byte length due to integer sum overflow.

## How was this patch tested?

Existing UTs.
If we would create UTs, we need large heap (6-8GB). It may make test environment unstable.
If it is necessary to create UTs, I will create them.

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #21064 from kiszk/SPARK-23976.
2018-05-02 10:41:34 +02:00
Liang-Chi Hsieh e15850be6e [SPARK-24131][PYSPARK] Add majorMinorVersion API to PySpark for determining Spark versions
## What changes were proposed in this pull request?

We need to determine Spark major and minor versions in PySpark. We can add a `majorMinorVersion` API to PySpark which is similar to the Scala API in `VersionUtils.majorMinorVersion`.

## How was this patch tested?

Added tests.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #21203 from viirya/SPARK-24131.
2018-05-02 10:55:01 +08:00
Bounkong Khamphousone 6782359a04 [SPARK-23941][MESOS] Mesos task failed on specific spark app name
## What changes were proposed in this pull request?
Shell escaped the name passed to spark-submit and change how conf attributes are shell escaped.

## How was this patch tested?
This test has been tested manually with Hive-on-spark with mesos or with the use case described in the issue with the sparkPi application with a custom name which contains illegal shell characters.

With this PR, hive-on-spark on mesos works like a charm with hive 3.0.0-SNAPSHOT.

I state that this contribution is my original work and that I license the work to the project under the project’s open source license

Author: Bounkong Khamphousone <bounkong.khamphousone@ebiznext.com>

Closes #21014 from tiboun/fix/SPARK-23941.
2018-05-01 08:28:21 -07:00
wangyanlin01 7bbec0dced [SPARK-24061][SS] Add TypedFilter support for continuous processing
## What changes were proposed in this pull request?

Add TypedFilter support for continuous processing application.

## How was this patch tested?

unit tests

Author: wangyanlin01 <wangyanlin01@baidu.com>

Closes #21136 from yanlin-Lynn/SPARK-24061.
2018-05-01 16:22:52 +08:00
Dongjoon Hyun b857fb549f [SPARK-23853][PYSPARK][TEST] Run Hive-related PySpark tests only for -Phive
## What changes were proposed in this pull request?

When `PyArrow` or `Pandas` are not available, the corresponding PySpark tests are skipped automatically. Currently, PySpark tests fail when we are not using `-Phive`. This PR aims to skip Hive related PySpark tests when `-Phive` is not given.

**BEFORE**
```bash
$ build/mvn -DskipTests clean package
$ python/run-tests.py --python-executables python2.7 --modules pyspark-sql
File "/Users/dongjoon/spark/python/pyspark/sql/readwriter.py", line 295, in pyspark.sql.readwriter.DataFrameReader.table
...
IllegalArgumentException: u"Error while instantiating 'org.apache.spark.sql.hive.HiveExternalCatalog':"
**********************************************************************
   1 of   3 in pyspark.sql.readwriter.DataFrameReader.table
***Test Failed*** 1 failures.
```

**AFTER**
```bash
$ build/mvn -DskipTests clean package
$ python/run-tests.py --python-executables python2.7 --modules pyspark-sql
...
Tests passed in 138 seconds

Skipped tests in pyspark.sql.tests with python2.7:
...
    test_hivecontext (pyspark.sql.tests.HiveSparkSubmitTests) ... skipped 'Hive is not available.'
```

## How was this patch tested?

This is a test-only change. First, this should pass the Jenkins. Then, manually do the following.

```bash
build/mvn -DskipTests clean package
python/run-tests.py --python-executables python2.7 --modules pyspark-sql
```

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #21141 from dongjoon-hyun/SPARK-23853.
2018-05-01 09:06:23 +08:00
Devaraj K 007ae6878f [SPARK-24003][CORE] Add support to provide spark.executor.extraJavaOptions in terms of App Id and/or Executor Id's
## What changes were proposed in this pull request?

Added support to specify the 'spark.executor.extraJavaOptions' value in terms of the `{{APP_ID}}` and/or `{{EXECUTOR_ID}}`,  `{{APP_ID}}` will be replaced by Application Id and `{{EXECUTOR_ID}}` will be replaced by Executor Id while starting the executor.

## How was this patch tested?

I have verified this by checking the executor process command and gc logs. I verified the same in different deployment modes(Standalone, YARN, Mesos) client and cluster modes.

Author: Devaraj K <devaraj@apache.org>

Closes #21088 from devaraj-kavali/SPARK-24003.
2018-04-30 13:40:03 -07:00
Wenchen Fan b42ad165bb [SPARK-24072][SQL] clearly define pushed filters
## What changes were proposed in this pull request?

filters like parquet row group filter, which is actually pushed to the data source but still to be evaluated by Spark, should also count as `pushedFilters`.

## How was this patch tested?

existing tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21143 from cloud-fan/step1.
2018-04-30 09:13:32 -07:00
Maxim Gekk 3121b411f7 [SPARK-23846][SQL] The samplingRatio option for CSV datasource
## What changes were proposed in this pull request?

I propose to support the `samplingRatio` option for schema inferring of CSV datasource similar to the same option of JSON datasource:
b14993e1fc/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala (L49-L50)

## How was this patch tested?

Added 2 tests for json and 2 tests for csv datasources. The tests checks that only subset of input dataset is used for schema inferring.

Author: Maxim Gekk <maxim.gekk@databricks.com>
Author: Maxim Gekk <max.gekk@gmail.com>

Closes #20959 from MaxGekk/csv-sampling.
2018-04-30 09:45:22 +08:00
hyukjinkwon 56f501e1c0 [MINOR][DOCS] Fix a broken link for Arrow's supported types in the programming guide
## What changes were proposed in this pull request?

This PR fixes a broken link for Arrow's supported types in the programming guide.

## How was this patch tested?

Manually tested via `SKIP_API=1 jekyll watch`.

"Supported SQL Types" here in https://spark.apache.org/docs/latest/sql-programming-guide.html#enabling-for-conversion-tofrom-pandas is broken. It should be https://spark.apache.org/docs/latest/sql-programming-guide.html#supported-sql-types

Author: hyukjinkwon <gurwls223@apache.org>

Closes #21191 from HyukjinKwon/minor-arrow-link.
2018-04-30 09:40:46 +08:00
Maxim Gekk bd14da6fd5 [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Support custom encoding for json files
## What changes were proposed in this pull request?

I propose new option for JSON datasource which allows to specify encoding (charset) of input and output files. Here is an example of using of the option:

```
spark.read.schema(schema)
  .option("multiline", "true")
  .option("encoding", "UTF-16LE")
  .json(fileName)
```

If the option is not specified, charset auto-detection mechanism is used by default.

The option can be used for saving datasets to jsons. Currently Spark is able to save datasets into json files in `UTF-8` charset only. The changes allow to save data in any supported charset. Here is the approximate list of supported charsets by Oracle Java SE: https://docs.oracle.com/javase/8/docs/technotes/guides/intl/encoding.doc.html . An user can specify the charset of output jsons via the charset option like `.option("charset", "UTF-16BE")`. By default the output charset is still `UTF-8` to keep backward compatibility.

The solution has the following restrictions for per-line mode (`multiline = false`):

- If charset is different from UTF-8, the lineSep option must be specified. The option required because Hadoop LineReader cannot detect the line separator correctly. Here is the ticket for solving the issue: https://issues.apache.org/jira/browse/SPARK-23725

- Encoding with [BOM](https://en.wikipedia.org/wiki/Byte_order_mark) are not supported. For example, the `UTF-16` and `UTF-32` encodings are blacklisted. The problem can be solved by https://github.com/MaxGekk/spark-1/pull/2

## How was this patch tested?

I added the following tests:
- reads an json file in `UTF-16LE` encoding with BOM in `multiline` mode
- read json file by using charset auto detection (`UTF-32BE` with BOM)
- read json file using of user's charset (`UTF-16LE`)
- saving in `UTF-32BE` and read the result by standard library (not by Spark)
- checking that default charset is `UTF-8`
- handling wrong (unsupported) charset

Author: Maxim Gekk <maxim.gekk@databricks.com>
Author: Maxim Gekk <max.gekk@gmail.com>

Closes #20937 from MaxGekk/json-encoding-line-sep.
2018-04-29 11:25:31 +08:00
Yuming Wang 4df51361a5 [SPARK-22732][SS][FOLLOW-UP] Fix MemorySinkV2 toString error
## What changes were proposed in this pull request?

Fix `MemorySinkV2` toString() error

## How was this patch tested?

N/A

Author: Yuming Wang <yumwang@ebay.com>

Closes #21170 from wangyum/SPARK-22732.
2018-04-28 16:57:41 +08:00
Marco Gaido ad94e8592b [SPARK-23736][SQL][FOLLOWUP] Error message should contains SQL types
## What changes were proposed in this pull request?

In the error messages we should return the SQL types (like `string` rather than the internal types like `StringType`).

## How was this patch tested?

added UT

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21181 from mgaido91/SPARK-23736_followup.
2018-04-28 10:47:43 +08:00
Jungtaek Lim 1fb46f30f8 [SPARK-23688][SS] Refactor tests away from rate source
## What changes were proposed in this pull request?

Replace rate source with memory source in continuous mode test suite. Keep using "rate" source if the tests intend to put data periodically in background, or need to put short source name to load, since "memory" doesn't have provider for source.

## How was this patch tested?

Ran relevant test suite from IDE.

Author: Jungtaek Lim <kabhwan@gmail.com>

Closes #21152 from HeartSaVioR/SPARK-23688.
2018-04-28 09:55:56 +08:00
Juliusz Sompolski 8614edd445 [SPARK-24104] SQLAppStatusListener overwrites metrics onDriverAccumUpdates instead of updating them
## What changes were proposed in this pull request?

Event `SparkListenerDriverAccumUpdates` may happen multiple times in a query - e.g. every `FileSourceScanExec` and `BroadcastExchangeExec` call `postDriverMetricUpdates`.
In Spark 2.2 `SQLListener` updated the map with new values. `SQLAppStatusListener` overwrites it.
Unless `update` preserved it in the KV store (dependant on `exec.lastWriteTime`), only the metrics from the last operator that does `postDriverMetricUpdates` are preserved.

## How was this patch tested?

Unit test added.

Author: Juliusz Sompolski <julek@databricks.com>

Closes #21171 from juliuszsompolski/SPARK-24104.
2018-04-27 14:14:28 -07:00
Dilip Biswal 3fd297af6d [SPARK-24085][SQL] Query returns UnsupportedOperationException when scalar subquery is present in partitioning expression
## What changes were proposed in this pull request?
In this case, the partition pruning happens before the planning phase of scalar subquery expressions.
For scalar subquery expressions, the planning occurs late in the cycle (after the physical planning)  in "PlanSubqueries" just before execution. Currently we try to execute the scalar subquery expression as part of partition pruning and fail as it implements Unevaluable.

The fix attempts to ignore the Subquery expressions from partition pruning computation. Another option can be to somehow plan the subqueries before the partition pruning. Since this may not be a commonly occuring expression, i am opting for a simpler fix.

Repro
``` SQL
CREATE TABLE test_prc_bug (
id_value string
)
partitioned by (id_type string)
location '/tmp/test_prc_bug'
stored as parquet;

insert into test_prc_bug values ('1','a');
insert into test_prc_bug values ('2','a');
insert into test_prc_bug values ('3','b');
insert into test_prc_bug values ('4','b');

select * from test_prc_bug
where id_type = (select 'b');
```
## How was this patch tested?
Added test in SubquerySuite and hive/SQLQuerySuite

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #21174 from dilipbiswal/spark-24085.
2018-04-27 11:43:29 -07:00
Patrick McGloin 2824f12b8b [SPARK-23565][SS] New error message for structured streaming sources assertion
## What changes were proposed in this pull request?

A more informative message to tell you why a structured streaming query cannot continue if you have added more sources, than there are in the existing checkpoint offsets.

## How was this patch tested?

I added a Unit Test.

Author: Patrick McGloin <mcgloin.patrick@gmail.com>

Closes #20946 from patrickmcgloin/master.
2018-04-27 23:04:14 +08:00
eric-maynard 109935fc5d [SPARK-23830][YARN] added check to ensure main method is found
## What changes were proposed in this pull request?

When a user specifies the wrong class -- or, in fact, a class instead of an object -- Spark throws an NPE which is not useful for debugging. This was reported in [SPARK-23830](https://issues.apache.org/jira/browse/SPARK-23830). This PR adds a check to ensure the main method was found and logs a useful error in the event that it's null.

## How was this patch tested?

* Unit tests + Manual testing
* The scope of the changes is very limited

Author: eric-maynard <emaynard@cloudera.com>
Author: Eric Maynard <emaynard@cloudera.com>

Closes #21168 from eric-maynard/feature/SPARK-23830.
2018-04-27 15:25:07 +08:00
Dongjoon Hyun 8aa1d7b0ed [SPARK-23355][SQL] convertMetastore should not ignore table properties
## What changes were proposed in this pull request?

Previously, SPARK-22158 fixed for `USING hive` syntax. This PR aims to fix for `STORED AS` syntax. Although the test case covers ORC part, the patch considers both `convertMetastoreOrc` and `convertMetastoreParquet`.

## How was this patch tested?

Pass newly added test cases.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #20522 from dongjoon-hyun/SPARK-22158-2.
2018-04-27 11:00:41 +08:00
zhoukang 9ee9fcf522 [SPARK-24083][YARN] Log stacktrace for uncaught exception
## What changes were proposed in this pull request?

Log stacktrace for uncaught exception

## How was this patch tested?
UT and manually test

Author: zhoukang <zhoukang199191@gmail.com>

Closes #21151 from caneGuy/zhoukang/log-stacktrace.
2018-04-26 15:38:11 -07:00
hyukjinkwon f7435bec6a [SPARK-24044][PYTHON] Explicitly print out skipped tests from unittest module
## What changes were proposed in this pull request?

This PR proposes to remove duplicated dependency checking logics and also print out skipped tests from unittests.

For example, as below:

```
Skipped tests in pyspark.sql.tests with pypy:
    test_createDataFrame_column_name_encoding (pyspark.sql.tests.ArrowTests) ... skipped 'Pandas >= 0.19.2 must be installed; however, it was not found.'
    test_createDataFrame_does_not_modify_input (pyspark.sql.tests.ArrowTests) ... skipped 'Pandas >= 0.19.2 must be installed; however, it was not found.'
...

Skipped tests in pyspark.sql.tests with python3:
    test_createDataFrame_column_name_encoding (pyspark.sql.tests.ArrowTests) ... skipped 'PyArrow >= 0.8.0 must be installed; however, it was not found.'
    test_createDataFrame_does_not_modify_input (pyspark.sql.tests.ArrowTests) ... skipped 'PyArrow >= 0.8.0 must be installed; however, it was not found.'
...
```

Currently, it's not printed out in the console. I think we should better print out skipped tests in the console.

## How was this patch tested?

Manually tested. Also, fortunately, Jenkins has good environment to test the skipped output.

Author: hyukjinkwon <gurwls223@apache.org>

Closes #21107 from HyukjinKwon/skipped-tests-print.
2018-04-26 15:11:42 -07:00
Huaxin Gao 4f1e38649e [SPARK-24057][PYTHON] put the real data type in the AssertionError message
## What changes were proposed in this pull request?

Print out the data type in the AssertionError message to make it more meaningful.

## How was this patch tested?

I manually tested the changed code on my local, but didn't add any test.

Author: Huaxin Gao <huaxing@us.ibm.com>

Closes #21159 from huaxingao/spark-24057.
2018-04-26 14:21:22 -07:00
gatorsmile ce2f919f8d [SPARK-23799][SQL][FOLLOW-UP] FilterEstimation.evaluateInSet produces wrong stats for STRING
## What changes were proposed in this pull request?
`colStat.min` AND `colStat.max` are empty for string type. Thus, `evaluateInSet` should not return zero when either `colStat.min` or `colStat.max`.

## How was this patch tested?
Added a test case.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #21147 from gatorsmile/cached.
2018-04-26 19:07:13 +08:00
Tathagata Das d1eb8d3ddc [SPARK-24094][SS][MINOR] Change description strings of v2 streaming sources to reflect the change
## What changes were proposed in this pull request?

This makes it easy to understand at runtime which version is running. Great for debugging production issues.

## How was this patch tested?
Not necessary.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #21160 from tdas/SPARK-24094.
2018-04-25 23:24:05 -07:00
jerryshao ffaf0f9fd4 [SPARK-24062][THRIFT SERVER] Fix SASL encryption cannot enabled issue in thrift server
## What changes were proposed in this pull request?

For the details of the exception please see [SPARK-24062](https://issues.apache.org/jira/browse/SPARK-24062).

The issue is:

Spark on Yarn stores SASL secret in current UGI's credentials, this credentials will be distributed to AM and executors, so that executors and drive share the same secret to communicate. But STS/Hive library code will refresh the current UGI by UGI's loginFromKeytab() after Spark application is started, this will create a new UGI in the current driver's context with empty tokens and secret keys, so secret key is lost in the current context's UGI, that's why Spark driver throws secret key not found exception.

In Spark 2.2 code, Spark also stores this secret key in SecurityManager's class variable, so even UGI is refreshed, the secret is still existed in the object, so STS with SASL can still be worked in Spark 2.2. But in Spark 2.3, we always search key from current UGI, which makes it fail to work in Spark 2.3.

To fix this issue, there're two possible solutions:

1. Fix in STS/Hive library, when a new UGI is refreshed, copy the secret key from original UGI to the new one. The difficulty is that some codes to refresh the UGI is existed in Hive library, which makes us hard to change the code.
2. Roll back the logics in SecurityManager to match Spark 2.2, so that this issue can be fixed.

2nd solution seems a simple one. So I will propose a PR with 2nd solution.

## How was this patch tested?

Verified in local cluster.

CC vanzin  tgravescs  please help to review. Thanks!

Author: jerryshao <sshao@hortonworks.com>

Closes #21138 from jerryshao/SPARK-24062.
2018-04-26 13:27:33 +08:00
Marco Gaido cd10f9df82 [SPARK-23916][SQL] Add array_join function
## What changes were proposed in this pull request?

The PR adds the SQL function `array_join`. The behavior of the function is based on Presto's one.

The function accepts an `array` of `string` which is to be joined, a `string` which is the delimiter to use between the items of the first argument and optionally a `string` which is used to replace `null` values.

## How was this patch tested?

added UTs

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21011 from mgaido91/SPARK-23916.
2018-04-26 13:37:13 +09:00
Marco Gaido 58c55cb4a6 [SPARK-23902][SQL] Add roundOff flag to months_between
## What changes were proposed in this pull request?

HIVE-15511 introduced the `roundOff` flag in order to disable the rounding to 8 digits which is performed in `months_between`. Since this can be a computational intensive operation, skipping it may improve performances when the rounding is not needed.

## How was this patch tested?

modified existing UT

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21008 from mgaido91/SPARK-23902.
2018-04-26 12:19:20 +09:00
Maxim Gekk 3f1e999d3d [SPARK-23849][SQL] Tests for samplingRatio of json datasource
## What changes were proposed in this pull request?

Added the `samplingRatio` option to the `json()` method of PySpark DataFrame Reader. Improving existing tests for Scala API according to review of the PR: https://github.com/apache/spark/pull/20959

## How was this patch tested?

Added new test for PySpark, updated 2 existing tests according to reviews of https://github.com/apache/spark/pull/20959 and added new negative test

Author: Maxim Gekk <maxim.gekk@databricks.com>

Closes #21056 from MaxGekk/json-sampling.
2018-04-26 09:14:24 +08:00
hyukjinkwon 95a651339e [SPARK-24069][R] Add array_min / array_max functions
## What changes were proposed in this pull request?

This PR proposes to add array_max and array_min in R side too.

array_max:

```r
df <- createDataFrame(cbind(model = rownames(mtcars), mtcars))
mutated <- mutate(df, v1 = create_array(df$gear, df$am, df$carb))
head(select(mutated, array_max(mutated$v1)))
```

```
  array_max(v1)
1             4
2             4
3             4
4             3
5             3
6             3
```

array_min:

```r
df <- createDataFrame(cbind(model = rownames(mtcars), mtcars))
mutated <- mutate(df, v1 = create_array(df$mpg, df$cyl, df$hp))
head(select(mutated, array_min(mutated$v1)))
```

```
  array_min(v1)
1             6
2             6
3             4
4             6
5             8
6             6
```

## How was this patch tested?

Unit tests were added in `R/pkg/tests/fulltests/test_sparkSQL.R` and manually tested. Documentation was manually built and verified.

Author: hyukjinkwon <gurwls223@apache.org>

Closes #21142 from HyukjinKwon/sparkr_array_min_array_max.
2018-04-26 09:12:38 +08:00
Wenchen Fan ac4ca7c4dd [SPARK-24012][SQL][TEST][FOLLOWUP] add unit test
## What changes were proposed in this pull request?

a followup of https://github.com/apache/spark/pull/21100

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21154 from cloud-fan/test.
2018-04-25 13:42:44 -07:00
Tathagata Das 396938ef02 [SPARK-24050][SS] Calculate input / processing rates correctly for DataSourceV2 streaming sources
## What changes were proposed in this pull request?

In some streaming queries, the input and processing rates are not calculated at all (shows up as zero) because MicroBatchExecution fails to associated metrics from the executed plan of a trigger with the sources in the logical plan of the trigger. The way this executed-plan-leaf-to-logical-source attribution works is as follows. With V1 sources, there was no way to identify which execution plan leaves were generated by a streaming source. So did a best-effort attempt to match logical and execution plan leaves when the number of leaves were same. In cases where the number of leaves is different, we just give up and report zero rates. An example where this may happen is as follows.

```
val cachedStaticDF = someStaticDF.union(anotherStaticDF).cache()
val streamingInputDF = ...

val query = streamingInputDF.join(cachedStaticDF).writeStream....
```
In this case, the `cachedStaticDF` has multiple logical leaves, but in the trigger's execution plan it only has leaf because a cached subplan is represented as a single InMemoryTableScanExec leaf. This leads to a mismatch in the number of leaves causing the input rates to be computed as zero.

With DataSourceV2, all inputs are represented in the executed plan using `DataSourceV2ScanExec`, each of which has a reference to the associated logical `DataSource` and `DataSourceReader`. So its easy to associate the metrics to the original streaming sources.

In this PR, the solution is as follows. If all the streaming sources in a streaming query as v2 sources, then use a new code path where the execution-metrics-to-source mapping is done directly. Otherwise we fall back to existing mapping logic.

## How was this patch tested?
- New unit tests using V2 memory source
- Existing unit tests using V1 source

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #21126 from tdas/SPARK-24050.
2018-04-25 12:21:55 -07:00
Takeshi Yamamuro 20ca208bcd [SPARK-23880][SQL] Do not trigger any jobs for caching data
## What changes were proposed in this pull request?
This pr fixed code so that `cache` could prevent any jobs from being triggered.
For example, in the current master, an operation below triggers a actual job;
```
val df = spark.range(10000000000L)
  .filter('id > 1000)
  .orderBy('id.desc)
  .cache()
```
This triggers a job while the cache should be lazy. The problem is that, when creating `InMemoryRelation`, we build the RDD, which calls `SparkPlan.execute` and may trigger jobs, like sampling job for range partitioner, or broadcast job.

This pr removed the code to build a cached `RDD` in the constructor of `InMemoryRelation` and added `CachedRDDBuilder` to lazily build the `RDD` in `InMemoryRelation`. Then, the first call of `CachedRDDBuilder.cachedColumnBuffers` triggers a job to materialize the cache in  `InMemoryTableScanExec` .

## How was this patch tested?
Added tests in `CachedTableSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #21018 from maropu/SPARK-23880.
2018-04-25 19:06:18 +08:00
liutang123 64e8408e6f [SPARK-24012][SQL] Union of map and other compatible column
## What changes were proposed in this pull request?
Union of map and other compatible column result in unresolved operator 'Union; exception

Reproduction
`spark-sql>select map(1,2), 'str' union all select map(1,2,3,null), 1`
Output:
```
Error in query: unresolved operator 'Union;;
'Union
:- Project [map(1, 2) AS map(1, 2)#106, str AS str#107]
:  +- OneRowRelation$
+- Project [map(1, cast(2 as int), 3, cast(null as int)) AS map(1, CAST(2 AS INT), 3, CAST(NULL AS INT))#109, 1 AS 1#108]
   +- OneRowRelation$
```
So, we should cast part of columns to be compatible when appropriate.

## How was this patch tested?
Added a test (query union of map and other columns) to SQLQueryTestSuite's union.sql.

Author: liutang123 <liutang123@yeah.net>

Closes #21100 from liutang123/SPARK-24012.
2018-04-25 18:10:51 +08:00
mn-mikke 5fea17b3be [SPARK-23821][SQL] Collection function: flatten
## What changes were proposed in this pull request?

This PR adds a new collection function that transforms an array of arrays into a single array. The PR comprises:
- An expression for flattening array structure
- Flatten function
- A wrapper for PySpark

## How was this patch tested?

New tests added into:
- CollectionExpressionsSuite
- DataFrameFunctionsSuite

## Codegen examples
### Primitive type
```
val df = Seq(
  Seq(Seq(1, 2), Seq(4, 5)),
  Seq(null, Seq(1))
).toDF("i")
df.filter($"i".isNotNull || $"i".isNull).select(flatten($"i")).debugCodegen
```
Result:
```
/* 033 */         boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 034 */         ArrayData inputadapter_value = inputadapter_isNull ?
/* 035 */         null : (inputadapter_row.getArray(0));
/* 036 */
/* 037 */         boolean filter_value = true;
/* 038 */
/* 039 */         if (!(!inputadapter_isNull)) {
/* 040 */           filter_value = inputadapter_isNull;
/* 041 */         }
/* 042 */         if (!filter_value) continue;
/* 043 */
/* 044 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 045 */
/* 046 */         boolean project_isNull = inputadapter_isNull;
/* 047 */         ArrayData project_value = null;
/* 048 */
/* 049 */         if (!inputadapter_isNull) {
/* 050 */           for (int z = 0; !project_isNull && z < inputadapter_value.numElements(); z++) {
/* 051 */             project_isNull |= inputadapter_value.isNullAt(z);
/* 052 */           }
/* 053 */           if (!project_isNull) {
/* 054 */             long project_numElements = 0;
/* 055 */             for (int z = 0; z < inputadapter_value.numElements(); z++) {
/* 056 */               project_numElements += inputadapter_value.getArray(z).numElements();
/* 057 */             }
/* 058 */             if (project_numElements > 2147483632) {
/* 059 */               throw new RuntimeException("Unsuccessful try to flatten an array of arrays with " +
/* 060 */                 project_numElements + " elements due to exceeding the array size limit 2147483632.");
/* 061 */             }
/* 062 */
/* 063 */             long project_size = UnsafeArrayData.calculateSizeOfUnderlyingByteArray(
/* 064 */               project_numElements,
/* 065 */               4);
/* 066 */             if (project_size > 2147483632) {
/* 067 */               throw new RuntimeException("Unsuccessful try to flatten an array of arrays with " +
/* 068 */                 project_size + " bytes of data due to exceeding the limit 2147483632" +
/* 069 */                 " bytes for UnsafeArrayData.");
/* 070 */             }
/* 071 */
/* 072 */             byte[] project_array = new byte[(int)project_size];
/* 073 */             UnsafeArrayData project_tempArrayData = new UnsafeArrayData();
/* 074 */             Platform.putLong(project_array, 16, project_numElements);
/* 075 */             project_tempArrayData.pointTo(project_array, 16, (int)project_size);
/* 076 */             int project_counter = 0;
/* 077 */             for (int k = 0; k < inputadapter_value.numElements(); k++) {
/* 078 */               ArrayData arr = inputadapter_value.getArray(k);
/* 079 */               for (int l = 0; l < arr.numElements(); l++) {
/* 080 */                 if (arr.isNullAt(l)) {
/* 081 */                   project_tempArrayData.setNullAt(project_counter);
/* 082 */                 } else {
/* 083 */                   project_tempArrayData.setInt(
/* 084 */                     project_counter,
/* 085 */                     arr.getInt(l)
/* 086 */                   );
/* 087 */                 }
/* 088 */                 project_counter++;
/* 089 */               }
/* 090 */             }
/* 091 */             project_value = project_tempArrayData;
/* 092 */
/* 093 */           }
/* 094 */
/* 095 */         }
```
### Non-primitive type
```
val df = Seq(
  Seq(Seq("a", "b"), Seq(null, "d")),
  Seq(null, Seq("a"))
).toDF("s")
df.filter($"s".isNotNull || $"s".isNull).select(flatten($"s")).debugCodegen
```
Result:
```
/* 033 */         boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 034 */         ArrayData inputadapter_value = inputadapter_isNull ?
/* 035 */         null : (inputadapter_row.getArray(0));
/* 036 */
/* 037 */         boolean filter_value = true;
/* 038 */
/* 039 */         if (!(!inputadapter_isNull)) {
/* 040 */           filter_value = inputadapter_isNull;
/* 041 */         }
/* 042 */         if (!filter_value) continue;
/* 043 */
/* 044 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 045 */
/* 046 */         boolean project_isNull = inputadapter_isNull;
/* 047 */         ArrayData project_value = null;
/* 048 */
/* 049 */         if (!inputadapter_isNull) {
/* 050 */           for (int z = 0; !project_isNull && z < inputadapter_value.numElements(); z++) {
/* 051 */             project_isNull |= inputadapter_value.isNullAt(z);
/* 052 */           }
/* 053 */           if (!project_isNull) {
/* 054 */             long project_numElements = 0;
/* 055 */             for (int z = 0; z < inputadapter_value.numElements(); z++) {
/* 056 */               project_numElements += inputadapter_value.getArray(z).numElements();
/* 057 */             }
/* 058 */             if (project_numElements > 2147483632) {
/* 059 */               throw new RuntimeException("Unsuccessful try to flatten an array of arrays with " +
/* 060 */                 project_numElements + " elements due to exceeding the array size limit 2147483632.");
/* 061 */             }
/* 062 */
/* 063 */             Object[] project_arrayObject = new Object[(int)project_numElements];
/* 064 */             int project_counter = 0;
/* 065 */             for (int k = 0; k < inputadapter_value.numElements(); k++) {
/* 066 */               ArrayData arr = inputadapter_value.getArray(k);
/* 067 */               for (int l = 0; l < arr.numElements(); l++) {
/* 068 */                 project_arrayObject[project_counter] = arr.getUTF8String(l);
/* 069 */                 project_counter++;
/* 070 */               }
/* 071 */             }
/* 072 */             project_value = new org.apache.spark.sql.catalyst.util.GenericArrayData(project_arrayObject);
/* 073 */
/* 074 */           }
/* 075 */
/* 076 */         }
```

Author: mn-mikke <mrkAha12346github>

Closes #20938 from mn-mikke/feature/array-api-flatten-to-master.
2018-04-25 11:19:08 +09:00
Jose Torres d6c26d1c9a [SPARK-24038][SS] Refactor continuous writing to its own class
## What changes were proposed in this pull request?

Refactor continuous writing to its own class.

See WIP https://github.com/jose-torres/spark/pull/13 for the overall direction this is going, but I think this PR is very isolated and necessary anyway.

## How was this patch tested?

existing unit tests - refactoring only

Author: Jose Torres <torres.joseph.f+github@gmail.com>

Closes #21116 from jose-torres/SPARK-24038.
2018-04-24 17:06:03 -07:00
Tathagata Das 7b1e6523af [SPARK-24056][SS] Make consumer creation lazy in Kafka source for Structured streaming
## What changes were proposed in this pull request?

Currently, the driver side of the Kafka source (i.e. KafkaMicroBatchReader) eagerly creates a consumer as soon as the Kafk aMicroBatchReader is created. However, we create dummy KafkaMicroBatchReader to get the schema and immediately stop it. Its better to make the consumer creation lazy, it will be created on the first attempt to fetch offsets using the KafkaOffsetReader.

## How was this patch tested?
Existing unit tests

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #21134 from tdas/SPARK-24056.
2018-04-24 14:33:33 -07:00
WeichenXu 379bffa052 [SPARK-23990][ML] Instruments logging improvements - ML regression package
## What changes were proposed in this pull request?

Instruments logging improvements - ML regression package

I add an `OptionalInstrument` class which used in `WeightLeastSquares` and `IterativelyReweightedLeastSquares`.

## How was this patch tested?

N/A

Author: WeichenXu <weichen.xu@databricks.com>

Closes #21078 from WeichenXu123/inst_reg.
2018-04-24 11:02:22 -07:00
Liang-Chi Hsieh 83013752e3 [SPARK-23455][ML] Default Params in ML should be saved separately in metadata
## What changes were proposed in this pull request?

We save ML's user-supplied params and default params as one entity in metadata. During loading the saved models, we set all the loaded params into created ML model instances as user-supplied params.

It causes some problems, e.g., if we strictly disallow some params to be set at the same time, a default param can fail the param check because it is treated as user-supplied param after loading.

The loaded default params should not be set as user-supplied params. We should save ML default params separately in metadata.

For backward compatibility, when loading metadata, if it is a metadata file from previous Spark, we shouldn't raise error if we can't find the default param field.

## How was this patch tested?

Pass existing tests and added tests.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #20633 from viirya/save-ml-default-params.
2018-04-24 10:40:25 -07:00