Commit graph

24535 commits

Author SHA1 Message Date
Dongjoon Hyun ea0e119f84 [SPARK-28111][BUILD] Upgrade xbean-asm7-shaded to 4.14
## What changes were proposed in this pull request?

This PR aims to update `xbean-asm7-shaded` to bring in [XBEAN-318](https://issues.apache.org/jira/browse/XBEAN-318), which helps log class definition reading failures.
- https://issues.apache.org/jira/projects/XBEAN/versions/12345220

## How was this patch tested?

Pass the Jenkins.

Closes #24914 from dongjoon-hyun/SPARK-28111.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-06-20 07:59:59 -07:00
sychen d9697fedf5 [SPARK-28012][SQL] Hive UDF supports struct type foldable expression
## What changes were proposed in this pull request?

Currently, when a Hive UDF is called with a struct-type foldable expression as a parameter, an exception is thrown:

No handler for Hive UDF 'xxxUDF': java.lang.RuntimeException: Hive doesn't support the constant type [StructType(StructField(name,StringType,true), StructField(value,DecimalType(3,1),true))]
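
A minimal reproduction sketch (the UDF class name, the `com.example` package, and the `spark` session are assumptions; the struct literal is a foldable expression matching the constant type in the error above):
```scala
// Hypothetical Hive UDF registration and call; requires a Hive-enabled SparkSession.
spark.sql("CREATE TEMPORARY FUNCTION xxxUDF AS 'com.example.XxxUDF'")
spark.sql(
  "SELECT xxxUDF(named_struct('name', 'foo', 'value', CAST(1.5 AS DECIMAL(3,1))))"
).show()
```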

## How was this patch tested?
added new UT

Closes #24846 from cxzl25/hive_udf_literal_struct_type.

Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-06-20 14:36:01 +09:00
Yuming Wang 4968f87168 [SPARK-23263][TEST] CTAS should update stat if autoUpdate statistics is enabled
## What changes were proposed in this pull request?
[SPARK-27403](https://issues.apache.org/jira/browse/SPARK-27403) fixed the issue that CTAS could not update statistics even if `spark.sql.statistics.size.autoUpdate.enabled` is enabled, as reported in [SPARK-23263](https://issues.apache.org/jira/browse/SPARK-23263).

This PR adds tests for that fix.
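
A hedged sketch of the scenario those tests cover (assuming a `spark` session; the table name is illustrative):
```scala
// Enable automatic size-statistics updates, then create a table via CTAS.
spark.conf.set("spark.sql.statistics.size.autoUpdate.enabled", "true")
spark.sql("CREATE TABLE ctas_stats_test USING parquet AS SELECT id FROM range(100)")
// With the fix, the table statistics should be updated right after the CTAS:
spark.sql("DESCRIBE TABLE EXTENDED ctas_stats_test").show(100, false)
```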

## How was this patch tested?

N/A

Closes #20430 from wangyum/SPARK-23263.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-06-20 14:19:10 +09:00
Gengliang Wang f5107614d6 [SPARK-28089][SQL] File source v2: support reading output of file streaming Sink
## What changes were proposed in this pull request?

File source V1 supports reading the output of FileStreamSink as a batch. https://github.com/apache/spark/pull/11897
We should support this in file source V2 as well. When reading with paths, we first check whether there is a metadata log of FileStreamSink. If there is, we use `MetadataLogFileIndex` for listing files; otherwise, we use `InMemoryFileIndex`.

## How was this patch tested?

Unit test

Closes #24900 from gengliangwang/FileStreamV2.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-06-20 12:57:13 +08:00
WeichenXu b276788d57 [SPARK-27990][SQL][ML] Provide a way to recursively load data from datasource
## What changes were proposed in this pull request?

This PR provides a way to recursively load data from a datasource by adding a `recursiveFileLookup` option.

When the `recursiveFileLookup` option is turned on, partition inference is turned off and all files under the directory are loaded recursively.

If a datasource explicitly specifies a partitionSpec and the user turns on the recursive option, an exception is thrown.
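
A usage sketch (the path is hypothetical and a `spark` session is assumed):
```scala
// With recursiveFileLookup enabled, partition inference is disabled and all files
// under the given directory are listed recursively.
val df = spark.read
  .option("recursiveFileLookup", "true")
  .json("/data/logs")
df.show()
```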

## How was this patch tested?

Unit tests.

Closes #24830 from WeichenXu123/recursive_ds.

Authored-by: WeichenXu <weichen.xu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-06-20 12:43:01 +08:00
Josh Rosen ec032cea4f [SPARK-28112][TEST] Fix Kryo exception perf. bottleneck in tests due to absence of ML/MLlib classes
## What changes were proposed in this pull request?
In a nutshell, it looks like the absence of ML / MLlib classes on the classpath causes code in KryoSerializer to throw and catch ClassNotFoundExceptions whenever instantiating a new serializer in newInstance(). This isn't a performance problem in production (since MLlib is on the classpath there) but it's a huge issue in tests and appears to account for an enormous amount of test time.

We can address this problem by reducing the total number of ClassNotFoundExceptions by performing the class existence checks once and storing the results in KryoSerializer instances rather than repeating the checks on each newInstance() call.
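
An illustrative sketch of the caching idea, not the actual KryoSerializer code (the class name checked below is just an example):
```scala
class SerializerSketch {
  // Perform the costly existence check once per serializer instance...
  private lazy val mlClassesPresent: Boolean =
    try {
      Class.forName("org.apache.spark.ml.linalg.VectorUDT")
      true
    } catch {
      case _: ClassNotFoundException => false
    }

  // ...and reuse the cached result on every newInstance() call instead of
  // throwing and catching ClassNotFoundException each time.
  def newInstanceSketch(): Unit = {
    if (mlClassesPresent) {
      // register the optional ML/MLlib classes with the Kryo instance here
    }
  }
}
```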

## How was this patch tested?
The existing tests.

Authored-by: Josh Rosen <joshrosen@databricks.com>

Closes #24916 from gatorsmile/kryoException.

Lead-authored-by: Josh Rosen <rosenville@gmail.com>
Co-authored-by: gatorsmile <gatorsmile@gmail.com>
Signed-off-by: Josh Rosen <rosenville@gmail.com>
2019-06-19 19:06:22 -07:00
Josh Rosen 6b27ad5ea1 [SPARK-28102][CORE] Avoid performance problems when lz4-java JNI libraries fail to initialize
## What changes were proposed in this pull request?

This PR fixes a performance problem in environments where `lz4-java`'s native JNI libraries fail to initialize.

Spark uses `lz4-java` for LZ4 compression. Under the hood, the `LZ4BlockInputStream` and `LZ4BlockOutputStream` constructors call `LZ4Factory.fastestInstance()`, which attempts to load JNI libraries and falls back on Java implementations in case the JNI library cannot be loaded or initialized.

If the LZ4 JNI libraries are present on the library load path (`Native.isLoaded()`) but cannot be initialized (e.g. due to breakage caused by shading) then an exception will be thrown and caught, triggering fallback to `fastestJavaInstance()` (a non-JNI implementation).

Unfortunately, the LZ4 library does not cache the fact that the JNI library failed during initialization, so every call to `LZ4Factory.fastestInstance()` re-attempts (and fails) to initialize the native code. These initialization attempts are performed in a `static synchronized` method, so exceptions from failures are thrown while holding shared monitors and this causes monitor-contention performance issues. Here's an example stack trace showing the problem:

```java

java.lang.Throwable.fillInStackTrace(Native Method)
java.lang.Throwable.fillInStackTrace(Throwable.java:783) => holding Monitor(java.lang.NoClassDefFoundError@441628568})
java.lang.Throwable.<init>(Throwable.java:265)
java.lang.Error.<init>(Error.java:70)
java.lang.LinkageError.<init>(LinkageError.java:55)
java.lang.NoClassDefFoundError.<init>(NoClassDefFoundError.java:59)
shaded.net.jpountz.lz4.LZ4JNICompressor.compress(LZ4JNICompressor.java:36)
shaded.net.jpountz.lz4.LZ4Factory.<init>(LZ4Factory.java:200)
shaded.net.jpountz.lz4.LZ4Factory.instance(LZ4Factory.java:51)
shaded.net.jpountz.lz4.LZ4Factory.nativeInstance(LZ4Factory.java:84) => holding Monitor(java.lang.Class@1475983836})
shaded.net.jpountz.lz4.LZ4Factory.fastestInstance(LZ4Factory.java:157)
shaded.net.jpountz.lz4.LZ4BlockOutputStream.<init>(LZ4BlockOutputStream.java:135)
org.apache.spark.io.LZ4CompressionCodec.compressedOutputStream(CompressionCodec.scala:122)
org.apache.spark.serializer.SerializerManager.wrapForCompression(SerializerManager.scala:156)
org.apache.spark.serializer.SerializerManager.wrapStream(SerializerManager.scala:131)
org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:120)
org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:249)
org.apache.spark.shuffle.sort.ShuffleExternalSorter.writeSortedFile(ShuffleExternalSorter.java:211)
[...]
```

To avoid this problem, this PR modifies Spark's `LZ4CompressionCodec` to call `fastestInstance()` itself and cache the result (which is safe because these factories [are thread-safe](https://github.com/lz4/lz4-java/issues/82)).
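
A minimal sketch of that caching idea (not the exact `LZ4CompressionCodec` code):
```scala
import net.jpountz.lz4.{LZ4Compressor, LZ4Factory}

// Resolve the factory once per codec instance instead of on every stream
// construction, so a failed JNI initialization is not retried under a shared
// monitor for every block that gets compressed.
class CachedLz4CodecSketch {
  private lazy val lz4Factory: LZ4Factory = LZ4Factory.fastestInstance()

  def compressor(): LZ4Compressor = lz4Factory.fastCompressor()
}
```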

## How was this patch tested?

Existing unit tests.

Closes #24905 from JoshRosen/lz4-factory-flags.

Lead-authored-by: Josh Rosen <rosenville@gmail.com>
Co-authored-by: Josh Rosen <joshrosen@stripe.com>
Signed-off-by: Josh Rosen <rosenville@gmail.com>
2019-06-19 15:26:26 -07:00
Josh Rosen fc65e0fe2c [SPARK-27839][SQL] Change UTF8String.replace() to operate on UTF8 bytes
## What changes were proposed in this pull request?

This PR significantly improves the performance of `UTF8String.replace()` by performing direct replacement over UTF8 bytes instead of decoding those bytes into Java Strings.

In cases where the search string is not found (i.e. no replacements are performed, a case which I expect to be common) this new implementation performs no object allocation or memory copying.

My implementation is modeled after `commons-lang3`'s `StringUtils.replace()` method. As part of my implementation, I needed a StringBuilder / resizable buffer, so I moved `UTF8StringBuilder` from the `catalyst` package to `unsafe`.

## How was this patch tested?

Copied tests from `StringExpressionSuite` to `UTF8StringSuite` and added a couple of new cases.

To evaluate performance, I did some quick local benchmarking by running the following code in `spark-shell` (with Java 1.8.0_191):

```scala
import org.apache.spark.unsafe.types.UTF8String

def benchmark(text: String, search: String, replace: String) {
  val utf8Text = UTF8String.fromString(text)
  val utf8Search = UTF8String.fromString(search)
  val utf8Replace = UTF8String.fromString(replace)

  val start = System.currentTimeMillis
  var i = 0
  while (i < 1000 * 1000 * 100) {
    utf8Text.replace(utf8Search, utf8Replace)
    i += 1
  }
  val end = System.currentTimeMillis

  println(end - start)
}

benchmark("ABCDEFGH", "DEF", "ZZZZ")  // replacement occurs
benchmark("ABCDEFGH", "Z", "")  // no replacement occurs
```

On my laptop this took ~54 / ~40 seconds before this patch's changes and ~6.5 / ~3.8 seconds afterwards.

Closes #24707 from JoshRosen/faster-string-replace.

Authored-by: Josh Rosen <rosenville@gmail.com>
Signed-off-by: Josh Rosen <rosenville@gmail.com>
2019-06-19 15:21:26 -07:00
Yuming Wang fe5145ede2 [SPARK-28109][SQL] Fix TRIM(type trimStr FROM str) returns incorrect value
## What changes were proposed in this pull request?

[SPARK-28093](https://issues.apache.org/jira/browse/SPARK-28093) fixed `TRIM/LTRIM/RTRIM('str', 'trimStr')` returning an incorrect value, but that fix introduced a new bug: `TRIM(type trimStr FROM str)` now returns an incorrect value. This PR fixes that issue.

## How was this patch tested?

unit tests and manual tests:
Before this PR:
```sql
spark-sql> SELECT trim('yxTomxx', 'xyz'), trim(BOTH 'xyz' FROM 'yxTomxx');
Tom	z
spark-sql> SELECT trim('xxxbarxxx', 'x'), trim(BOTH 'x' FROM 'xxxbarxxx');
bar
spark-sql> SELECT ltrim('zzzytest', 'xyz'), trim(LEADING 'xyz' FROM 'zzzytest');
test	xyz
spark-sql> SELECT ltrim('zzzytestxyz', 'xyz'), trim(LEADING 'xyz' FROM 'zzzytestxyz');
testxyz
spark-sql> SELECT ltrim('xyxXxyLAST WORD', 'xy'), trim(LEADING 'xy' FROM 'xyxXxyLAST WORD');
XxyLAST WORD
spark-sql> SELECT rtrim('testxxzx', 'xyz'), trim(TRAILING 'xyz' FROM 'testxxzx');
test	xy
spark-sql> SELECT rtrim('xyztestxxzx', 'xyz'), trim(TRAILING 'xyz' FROM 'xyztestxxzx');
xyztest
spark-sql> SELECT rtrim('TURNERyxXxy', 'xy'), trim(TRAILING 'xy' FROM 'TURNERyxXxy');
TURNERyxX
```
After this PR:
```sql
spark-sql> SELECT trim('yxTomxx', 'xyz'), trim(BOTH 'xyz' FROM 'yxTomxx');
Tom     Tom
spark-sql> SELECT trim('xxxbarxxx', 'x'), trim(BOTH 'x' FROM 'xxxbarxxx');
bar     bar
spark-sql> SELECT ltrim('zzzytest', 'xyz'), trim(LEADING 'xyz' FROM 'zzzytest');
test    test
spark-sql> SELECT ltrim('zzzytestxyz', 'xyz'), trim(LEADING 'xyz' FROM 'zzzytestxyz');
testxyz testxyz
spark-sql> SELECT ltrim('xyxXxyLAST WORD', 'xy'), trim(LEADING 'xy' FROM 'xyxXxyLAST WORD');
XxyLAST WORD    XxyLAST WORD
spark-sql> SELECT rtrim('testxxzx', 'xyz'), trim(TRAILING 'xyz' FROM 'testxxzx');
test    test
spark-sql> SELECT rtrim('xyztestxxzx', 'xyz'), trim(TRAILING 'xyz' FROM 'xyztestxxzx');
xyztest xyztest
spark-sql> SELECT rtrim('TURNERyxXxy', 'xy'), trim(TRAILING 'xy' FROM 'TURNERyxXxy');
TURNERyxX       TURNERyxX
```
And PostgreSQL:
```sql
postgres=# SELECT trim('yxTomxx', 'xyz'), trim(BOTH 'xyz' FROM 'yxTomxx');
 btrim | btrim
-------+-------
 Tom   | Tom
(1 row)

postgres=# SELECT trim('xxxbarxxx', 'x'), trim(BOTH 'x' FROM 'xxxbarxxx');
 btrim | btrim
-------+-------
 bar   | bar
(1 row)

postgres=# SELECT ltrim('zzzytest', 'xyz'), trim(LEADING 'xyz' FROM 'zzzytest');
 ltrim | ltrim
-------+-------
 test  | test
(1 row)

postgres=# SELECT ltrim('zzzytestxyz', 'xyz'), trim(LEADING 'xyz' FROM 'zzzytestxyz');
  ltrim  |  ltrim
---------+---------
 testxyz | testxyz
(1 row)

postgres=# SELECT ltrim('xyxXxyLAST WORD', 'xy'), trim(LEADING 'xy' FROM 'xyxXxyLAST WORD');
    ltrim     |    ltrim
--------------+--------------
 XxyLAST WORD | XxyLAST WORD
(1 row)

postgres=# SELECT rtrim('testxxzx', 'xyz'), trim(TRAILING 'xyz' FROM 'testxxzx');
 rtrim | rtrim
-------+-------
 test  | test
(1 row)

postgres=# SELECT rtrim('xyztestxxzx', 'xyz'), trim(TRAILING 'xyz' FROM 'xyztestxxzx');
  rtrim  |  rtrim
---------+---------
 xyztest | xyztest
(1 row)

postgres=# SELECT rtrim('TURNERyxXxy', 'xy'), trim(TRAILING 'xy' FROM 'TURNERyxXxy');
   rtrim   |   rtrim
-----------+-----------
 TURNERyxX | TURNERyxX
(1 row)
```

Closes #24911 from wangyum/SPARK-28109.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-06-19 12:47:18 -07:00
Dongjoon Hyun 630dfdf550 [SPARK-28101][DSTREAM][TEST] Fix Flaky Test: InputStreamsSuite.Modified files are correctly detected in JDK9+
## What changes were proposed in this pull request?

It seems that https://bugs.openjdk.java.net/browse/JDK-8068730 makes `InputStreamsSuite` very flaky.

<img width="903" alt="error" src="https://user-images.githubusercontent.com/9700541/59727067-017eb780-91e9-11e9-8bb0-ac5f4c1bc44d.png">

As we can see from the Jenkins result, this can be reproduced frequently with JDK9+.
```
$ build/sbt "streaming/testOnly *.InputStreamsSuite"
[info] - Modified files are correctly detected. *** FAILED *** (134 milliseconds)
[info]   Set("renamed") did not equal Set() (InputStreamsSuite.scala:312)
[info]   org.scalatest.exceptions.TestFailedException:
```

The reason is that `renamed.txt`'s modification time becomes greater than the clock in JDK9+, and Spark ignores it with a **not selected** message. In JDK8, the modification time generated by this test case doesn't have a milliseconds part.
```
Getting new files for time 1560896662000, ignoring files older than 1560896659679
file:/.../streaming/subdir/renamed.txt not selected as mod time 1560896662679 > current time 1560896662000
file:/.../streaming/subdir/existing ignored as mod time 1560896657679 <= ignore time 1560896659679
Finding new files took 0 ms
New files at time 1560896662000 ms:
```

## How was this patch tested?

Pass the Jenkins and manually repeat the following with JDK11 10 times.
```
$ build/sbt "streaming/testOnly *.InputStreamsSuite"
```

Closes #24904 from dongjoon-hyun/SPARK-28101.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-06-19 07:55:00 -07:00
Andrew-Crosby 36b327d479 [SPARK-28062][ML] Avoid unnecessary copy of coefficients vector in HuberAggregator
## What changes were proposed in this pull request?

Modifies the HuberAggregator class so that a copy of the coefficients vector isn't created every time an instance is added. Follows the approach of LeastSquaresAggregator and uses a transient lazy class variable to store the reused quantities. (See https://github.com/apache/spark/pull/14109 for an explanation of the use of transient lazy variables.)

On the test case in the linked JIRA, this change gives an order of magnitude performance improvement reducing the time taken to fit the model from 540 to 47 seconds.
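
An illustrative sketch of the pattern (field and method names are assumptions, not the actual HuberAggregator members):
```scala
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.linalg.Vector

// Materialize the broadcast coefficients once per executor, lazily, instead of
// copying the vector on every call to add().
class HuberAggregatorSketch(bcCoefficients: Broadcast[Vector]) extends Serializable {
  @transient private lazy val coefficientsArray: Array[Double] = bcCoefficients.value.toArray

  def add(features: Vector): this.type = {
    var margin = 0.0
    features.foreachActive { (i, v) => margin += coefficientsArray(i) * v }
    // ... update the gradient and loss using `margin` ...
    this
  }
}
```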

## How was this patch tested?

Existing unit tests.
See https://issues.apache.org/jira/browse/SPARK-28062 for results from running a benchmark script.

Closes #24880 from Andrew-Crosby/spark-28062.

Authored-by: Andrew-Crosby <andrew.crosby@autotrader.co.uk>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-06-19 08:57:12 -05:00
zhengruifeng 9ec049601a [SPARK-28044][ML][PYTHON] MulticlassClassificationEvaluator support more metrics
## What changes were proposed in this pull request?

Expose more metrics in the evaluator: weightedTruePositiveRate/weightedFalsePositiveRate/weightedFMeasure/truePositiveRateByLabel/falsePositiveRateByLabel/precisionByLabel/recallByLabel/fMeasureByLabel.
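
A usage sketch for one of the newly exposed metrics (this assumes the by-label metrics are selected via `setMetricName` together with a `metricLabel` param; `predictions` is a DataFrame produced elsewhere):
```scala
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

val evaluator = new MulticlassClassificationEvaluator()
  .setMetricName("truePositiveRateByLabel")
  .setMetricLabel(1.0)  // which class label the rate is reported for
// val tpr = evaluator.evaluate(predictions)
```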

## How was this patch tested?
Existing cases and newly added cases.

Closes #24868 from zhengruifeng/multi_class_support_bylabel.

Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-06-19 08:56:15 -05:00
Yesheng Ma 7b7f16f2a7 [SPARK-27890][SQL] Improve SQL parser error message for character-only identifier with hyphens except those in expressions
## What changes were proposed in this pull request?

The current SQL parser's error message for hyphen-connected identifiers without surrounding backquotes (e.g. `hyphen-table`) is confusing for end users. A possible approach to tackle this is to explicitly capture these wrong usages in the SQL parser. In this way, end users can fix these errors more quickly.

For example, for a simple query such as `SELECT * FROM test-table`, the original error message is
```
Error in SQL statement: ParseException:
mismatched input '-' expecting <EOF>(line 1, pos 18)
```
which can be confusing in a large query.

After the fix, the error message is:
```
Error in query:
Possibly unquoted identifier test-table detected. Please consider quoting it with back-quotes as `test-table`(line 1, pos 14)

== SQL ==
SELECT * FROM test-table
--------------^^^
```
which is easier for end users to identify the issue and fix.

We safely augmented the current grammar rule to explicitly capture these error cases. The error handling logic is implemented in the SQL parsing listener `PostProcessor`.

However, note that for cases such as `a - my-func(b)`, the parser can't actually tell whether this should be ``a -`my-func`(b) `` or `a - my - func(b)`. Therefore for these cases, we leave the parser as is. Also, in this patch we only provide better error messages for character-only identifiers.

## How was this patch tested?
Adding new unit tests.

Closes #24749 from yeshengm/hyphen-ident.

Authored-by: Yesheng Ma <kimi.ysma@gmail.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2019-06-18 21:51:15 -07:00
Yesheng Ma 15de6d0500 [SPARK-28096][SQL] Convert defs to lazy vals to avoid expensive reference computation in QueryPlan and Expression
## What changes were proposed in this pull request?

The original `references` and `validConstraints` implementations in a few `QueryPlan` and `Expression` classes are methods, which means unnecessary re-computation can happen at times. This PR resolves this problem by making these methods `lazy val`s.
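
A simplified illustration of the change (not the actual `QueryPlan`/`Expression` classes):
```scala
trait ExprLike {
  def children: Seq[ExprLike]
  def ownReferences: Set[String]

  // Before: a def, recomputed on every access.
  // def references: Set[String] = children.flatMap(_.references).toSet ++ ownReferences

  // After: a lazy val, computed once on first access and then cached.
  lazy val references: Set[String] = children.flatMap(_.references).toSet ++ ownReferences
}
```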

As shown in the following chart, the planning time(without cost-based optimization) was dramatically reduced after this optimization.
- The average planning time of TPC-DS queries was reduced by 19.63%.
- The planning time of the most time-consuming TPC-DS query (q64) was reduced by 43.03%.
- The running time of the rule-based join-reordering optimization (not cost-based join reordering), which is common in real-world OLAP queries, was largely reduced.

![chart](https://user-images.githubusercontent.com/12269969/59721493-536a1200-91d6-11e9-9bfb-d7cb1e841a86.png)

Detailed stats are listed in the following spreadsheet (we warmed up the queries 5 iterations and then took average of the next 5 iterations).
[Lazy val benchmark.xlsx](https://github.com/apache/spark/files/3303530/Lazy.val.benchmark.xlsx)

## How was this patch tested?

Existing UTs.

Closes #24866 from yeshengm/plannode-micro-opt.

Authored-by: Yesheng Ma <kimi.ysma@gmail.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2019-06-18 21:13:50 -07:00
Ivan Vergiliev a5dcb82b5a [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion
## What changes were proposed in this pull request?

`OrcFilters.createBuilder` has exponential complexity in the height of the filter tree due to the way the check-and-build pattern is implemented. We've hit this in production by passing a `Column` filter to Spark directly, with a job taking multiple hours for a simple set of ~30 filters. This PR changes the checking logic so that the conversion has linear complexity in the size of the tree instead of exponential in its height.

Right now, due to the way ORC `SearchArgument` works, the code is forced to do two separate phases when converting a given Spark filter to an ORC filter:
1. Check if the filter is convertible.
2. Only if the check in 1. succeeds, perform the actual conversion into the resulting ORC filter.

However, there's one detail which is the culprit in the exponential complexity: phases 1. and 2. are both done using the exact same method. The resulting exponential complexity is easiest to see in the `NOT` case - consider the following code:

```scala
val f1 = col("id") === lit(5)
val f2 = !f1
val f3 = !f2
val f4 = !f3
val f5 = !f4
```

Now, when we run `createBuilder` on `f5`, we get the following behaviour:
1. call `createBuilder(f4)` to check if the child `f4` is convertible
2. call `createBuilder(f4)` to actually convert it

This seems fine when looking at a single level, but what actually ends up happening is:
- `createBuilder(f3)` will then recursively be called 4 times - 2 times in step 1., and two times in step 2.
- `createBuilder(f2)` will be called 8 times - 4 times in each top-level step, 2 times in each sub-step.
- `createBuilder(f1)` will be called 16 times.

As a result, having a tree of height > 30 leads to billions of calls to `createBuilder`, heap allocations, and so on and can take multiple hours.

The way this PR solves this problem is by separating the `check` and `convert` functionalities into separate functions. This way, the call to `createBuilder` on `f5` above would look like this:
1. call `isConvertible(f4)` to check if the child `f4` is convertible - amortized constant complexity
2. call `createBuilder(f4)` to actually convert it - linear complexity in the size of the subtree.

This way, we get an overall complexity that's linear in the size of the filter tree, allowing us to convert tree with 10s of thousands of nodes in milliseconds.

The reason this split (`check` and `build`) is possible is that the checking never actually depends on the actual building of the filter. The `check` part of `createBuilder` depends mainly on:
- `isSearchableType` for leaf nodes, and
- `check`-ing the child filters for composite nodes like NOT, AND and OR.
Situations like the `SearchArgumentBuilder` throwing an exception while building the resulting ORC filter are not handled right now - they just get thrown out of the class, and this change preserves this behaviour.

This PR extracts this part of the code to a separate class which allows the conversion to make very efficient checks to confirm that a given child is convertible before actually converting it.
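
A conceptual sketch of the check/convert split over simplified filter types (not the real `OrcFilters` API): the check visits every node exactly once, and the conversion is only invoked on filters that already passed the check, so no work is duplicated per level.
```scala
sealed trait SimpleFilter
case class Eq(col: String, value: Int) extends SimpleFilter
case class Neg(child: SimpleFilter) extends SimpleFilter
case class Both(left: SimpleFilter, right: SimpleFilter) extends SimpleFilter

object FilterConverterSketch {
  // Linear-time convertibility check.
  def isConvertible(f: SimpleFilter): Boolean = f match {
    case Eq(_, _)   => true
    case Neg(child) => isConvertible(child)
    case Both(l, r) => isConvertible(l) && isConvertible(r)
  }

  // Linear-time conversion, called only after the check succeeded.
  def convert(f: SimpleFilter): String = f match {
    case Eq(c, v)   => s"($c = $v)"
    case Neg(child) => s"NOT ${convert(child)}"
    case Both(l, r) => s"(${convert(l)} AND ${convert(r)})"
  }

  def createFilter(f: SimpleFilter): Option[String] =
    if (isConvertible(f)) Some(convert(f)) else None
}
```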

Results:
Before:
- converting a skewed tree with a height of ~35 took about 6-7 hours.
- converting a skewed tree with hundreds or thousands of nodes would be completely impossible.

Now:
- filtering against a skewed tree with a height of 1500 in the benchmark suite finishes in less than 10 seconds.

## Steps to reproduce
```scala
// Note: OrcFilters and the data source Filter classes live in Spark-internal packages.
import org.apache.spark.sql.execution.datasources.orc.OrcFilters
import org.apache.spark.sql.sources.{EqualTo, Or}
import org.apache.spark.sql.types.StructType

val schema = StructType.fromDDL("col INT")
(20 to 30).foreach { width =>
  val whereFilter = (1 to width).map(i => EqualTo("col", i)).reduceLeft(Or)
  val start = System.currentTimeMillis()
  OrcFilters.createFilter(schema, Seq(whereFilter))
  println(s"With $width filters, conversion takes ${System.currentTimeMillis() - start} ms")
}
```

### Before this PR
```
With 20 filters, conversion takes 363 ms
With 21 filters, conversion takes 496 ms
With 22 filters, conversion takes 939 ms
With 23 filters, conversion takes 1871 ms
With 24 filters, conversion takes 3756 ms
With 25 filters, conversion takes 7452 ms
With 26 filters, conversion takes 14978 ms
With 27 filters, conversion takes 30519 ms
With 28 filters, conversion takes 60361 ms // 1 minute
With 29 filters, conversion takes 126575 ms // 2 minutes 6 seconds
With 30 filters, conversion takes 257369 ms // 4 minutes 17 seconds
```

### After this PR
```
With 20 filters, conversion takes 12 ms
With 21 filters, conversion takes 0 ms
With 22 filters, conversion takes 1 ms
With 23 filters, conversion takes 0 ms
With 24 filters, conversion takes 1 ms
With 25 filters, conversion takes 1 ms
With 26 filters, conversion takes 0 ms
With 27 filters, conversion takes 1 ms
With 28 filters, conversion takes 0 ms
With 29 filters, conversion takes 1 ms
With 30 filters, conversion takes 0 ms
```

## How was this patch tested?

There are no changes in behaviour, and the existing tests pass. Added new benchmarks that expose the problematic behaviour and they finish quickly with the changes applied.

Closes #24068 from IvanVergiliev/optimize-orc-filters.

Authored-by: Ivan Vergiliev <ivan.vergiliev@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-06-19 10:44:58 +08:00
Sean Owen e96dd82f12 [SPARK-28081][ML] Handle large vocab counts in word2vec
## What changes were proposed in this pull request?

The word2vec logic fails if a corpus has a word with count > 1e9. We should be able to handle very large counts better here by using longs to count.

This takes over https://github.com/apache/spark/pull/24814

## How was this patch tested?

Existing tests.

Closes #24893 from srowen/SPARK-28081.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-06-18 20:27:43 -05:00
Xiangrui Meng 7056e004ee [SPARK-27823][CORE] Refactor resource handling code
## What changes were proposed in this pull request?

Continue the work from https://github.com/apache/spark/pull/24821. Refactor resource handling code to make the code more readable. Major changes:

* Moved resource-related classes to `spark.resource` from `spark`.
* Added ResourceUtils and helper classes so we don't need to directly deal with Spark conf.
 * ResourceID: resource identifier and it provides conf keys
 * ResourceRequest/Allocation: abstraction for requested and allocated resources
* Added `TestResourceIDs` to reference commonly used resource IDs in tests like `spark.executor.resource.gpu`.

cc: tgravescs jiangxb1987 Ngone51

## How was this patch tested?

Unit tests for added utils and existing unit tests.

Closes #24856 from mengxr/SPARK-27823.

Lead-authored-by: Xiangrui Meng <meng@databricks.com>
Co-authored-by: Thomas Graves <tgraves@nvidia.com>
Signed-off-by: Xingbo Jiang <xingbo.jiang@databricks.com>
2019-06-18 17:18:17 -07:00
Yuming Wang 2e3ae97668 [SPARK-28039][SQL][TEST] Port float4.sql
## What changes were proposed in this pull request?

This PR is to port float4.sql from PostgreSQL regression tests. https://github.com/postgres/postgres/blob/REL_12_BETA1/src/test/regress/sql/float4.sql

The expected results can be found in the link: https://github.com/postgres/postgres/blob/REL_12_BETA1/src/test/regress/expected/float4.out

When porting the test cases, we found three PostgreSQL-specific features that do not exist in Spark SQL:
[SPARK-28060](https://issues.apache.org/jira/browse/SPARK-28060): Float type can not accept some special inputs
[SPARK-28027](https://issues.apache.org/jira/browse/SPARK-28027): Spark SQL does not support prefix operator ``
[SPARK-28061](https://issues.apache.org/jira/browse/SPARK-28061): Support for converting float to binary format

Also, we found a bug:
[SPARK-28024](https://issues.apache.org/jira/browse/SPARK-28024): Incorrect value when out of range

Also, we found three inconsistent behaviors:
[SPARK-27923](https://issues.apache.org/jira/browse/SPARK-27923): Spark SQL inserts these bad inputs as NULL
[SPARK-28028](https://issues.apache.org/jira/browse/SPARK-28028): Cast numeric to integral type need round
[SPARK-27923](https://issues.apache.org/jira/browse/SPARK-27923): Spark SQL returns NULL when dividing by zero

## How was this patch tested?

N/A

Closes #24887 from wangyum/SPARK-28039.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2019-06-18 16:22:30 -07:00
Yuming Wang c7f0301477 [SPARK-28088][SQL] Enhance LPAD/RPAD function
## What changes were proposed in this pull request?

This PR enhances the `LPAD`/`RPAD` functions to make the `pad` parameter optional.

PostgreSQL, Vertica, Teradata, Oracle and DB2 support an optional `pad` parameter; MySQL, Hive and Presto do not. SQL Server does not have `lpad`/`rpad` functions.
**PostgreSQL**:
```
postgres=# select substr(version(), 0, 16), lpad('hi', 5), rpad('hi', 5);
     substr      | lpad  | rpad
-----------------+-------+-------
 PostgreSQL 11.3 |    hi | hi
(1 row)
```
**Vertica**:
```
dbadmin=> select version(), lpad('hi', 5), rpad('hi', 5);
              version               | lpad  | rpad
------------------------------------+-------+-------
 Vertica Analytic Database v9.1.1-0 |    hi | hi
(1 row)
```
**Teradata**:
![image](https://user-images.githubusercontent.com/5399861/59656550-89a49300-91d0-11e9-9f26-ed554f49ea34.png)
**Oracle**:
![image](https://user-images.githubusercontent.com/5399861/59656591-a9d45200-91d0-11e9-8b0e-3e1f75983099.png)
**DB2**:
![image](https://user-images.githubusercontent.com/5399861/59656468-3e8a8000-91d0-11e9-8826-0d854ed7f397.png)

More details:
https://www.postgresql.org/docs/11/functions-string.html
https://docs.teradata.com/reader/kmuOwjp1zEYg98JsB8fu_A/e5w8LujIQDlVmRSww2E27A

## How was this patch tested?

unit tests

Closes #24899 from wangyum/SPARK-28088.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-06-18 14:08:18 -07:00
Yuming Wang bef5d9d6c3 [SPARK-28093][SQL] Fix TRIM/LTRIM/RTRIM function parameter order issue
## What changes were proposed in this pull request?

This PR fixes the `TRIM`/`LTRIM`/`RTRIM` function parameter order issue; otherwise:

```sql
spark-sql> SELECT trim('yxTomxx', 'xyz'), trim('xxxbarxxx', 'x');
z
spark-sql> SELECT ltrim('zzzytest', 'xyz'), ltrim('xyxXxyLAST WORD', 'xy');
xyz
spark-sql> SELECT rtrim('testxxzx', 'xyz'), rtrim('TURNERyxXxy', 'xy');
xy
spark-sql>
```

```sql
postgres=# SELECT trim('yxTomxx', 'xyz'), trim('xxxbarxxx', 'x');
 btrim | btrim
-------+-------
 Tom   | bar
(1 row)

postgres=# SELECT ltrim('zzzytest', 'xyz'), ltrim('xyxXxyLAST WORD', 'xy');
 ltrim |    ltrim
-------+--------------
 test  | XxyLAST WORD
(1 row)

postgres=# SELECT rtrim('testxxzx', 'xyz'), rtrim('TURNERyxXxy', 'xy');
 rtrim |   rtrim
-------+-----------
 test  | TURNERyxX
(1 row)
```

## How was this patch tested?

unit tests

Closes #24902 from wangyum/SPARK-28093.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-06-18 13:28:29 -07:00
maryannxue 1ada36b571 [SPARK-27783][SQL] Add customizable hint error handler
## What changes were proposed in this pull request?

Added an interface for handling hint errors, with a default implementation class that logs warnings in the callbacks.

## How was this patch tested?

Passed existing tests.

Closes #24653 from maryannxue/hint-handler.

Authored-by: maryannxue <maryannxue@apache.org>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2019-06-18 12:33:32 -07:00
Dongjoon Hyun ed280c23ca [SPARK-28072][SQL] Fix IncompatibleClassChangeError in FromUnixTime codegen on JDK9+
## What changes were proposed in this pull request?

With JDK9+, the generated **bytecode** of `FromUnixTime` raises `java.lang.IncompatibleClassChangeError` due to [JDK-8145148](https://bugs.openjdk.java.net/browse/JDK-8145148). This is a blocker in the [Apache Spark JDK11 Jenkins job](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-2.7-jdk-11-ubuntu-testing/). Locally, this is reproducible with the following unit test suite on JDK9+.
```
$ build/sbt "catalyst/testOnly *.DateExpressionsSuite"
...
[info] org.apache.spark.sql.catalyst.expressions.DateExpressionsSuite *** ABORTED *** (23 seconds, 75 milliseconds)
[info]   java.lang.IncompatibleClassChangeError: Method org.apache.spark.sql.catalyst.util.TimestampFormatter.apply(Ljava/lang/String;Ljava/time/ZoneId;Ljava/util/Locale;)Lorg/apache/spark/sql/catalyst/util/TimestampFormatter; must be InterfaceMeth
```

This bytecode issue is generated by `Janino`, so we replace `.apply` with `.MODULE$$.apply` and add test coverage for similar code.

## How was this patch tested?

Manually with the existing UTs by doing the following with JDK9+.
```
build/sbt "catalyst/testOnly *.DateExpressionsSuite"
```

Actually, this is the last JDK11 error in `catalyst` module. So, we can verify with the following, too.
```
$ build/sbt "project catalyst" test
...
[info] Total number of tests run: 3552
[info] Suites: completed 210, aborted 0
[info] Tests: succeeded 3552, failed 0, canceled 0, ignored 2, pending 0
[info] All tests passed.
[info] Passed: Total 3583, Failed 0, Errors 0, Passed 3583, Ignored 2
[success] Total time: 294 s, completed Jun 16, 2019, 10:15:08 PM
```

Closes #24889 from dongjoon-hyun/SPARK-28072.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-06-18 00:08:37 -07:00
Liang-Chi Hsieh b7bdc3111e [SPARK-28058][DOC] Add a note to doc of mode of CSV for column pruning
## What changes were proposed in this pull request?

When using `DROPMALFORMED` mode, corrupted records aren't dropped if the malformed columns aren't read. This behavior is due to CSV parser column pruning. The current doc of `DROPMALFORMED` doesn't mention the effect of column pruning, so users are confused when `DROPMALFORMED` mode doesn't work as expected.

Column pruning also affects other modes. This is a doc improvement to add a note to doc of `mode` to explain it.

## How was this patch tested?

N/A. This is just doc change.

Closes #24894 from viirya/SPARK-28058.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-06-18 13:48:32 +09:00
Yuming Wang ab6bb8fc1c [SPARK-28075][SQL] Enhance TRIM function
## What changes were proposed in this pull request?

The `TRIM` function accepts these patterns:
```sql
TRIM(str)
TRIM(trimStr, str)
TRIM(BOTH trimStr FROM str)
TRIM(LEADING trimStr FROM str)
TRIM(TRAILING trimStr FROM str)
```
This PR adds support for three other patterns:
```sql
TRIM(BOTH FROM str)
TRIM(LEADING FROM str)
TRIM(TRAILING FROM str)
```

PostgreSQL, Vertica, MySQL, Teradata, Oracle and DB2 support these patterns. Hive, Presto and SQL Server do not support this feature.

**PostgreSQL**:
```sql
postgres=# select substr(version(), 0, 16), trim(BOTH from '    SparkSQL   '), trim(LEADING FROM '    SparkSQL   '), trim(TRAILING FROM '    SparkSQL   ');
     substr      |  btrim   |    ltrim    |    rtrim
-----------------+----------+-------------+--------------
 PostgreSQL 11.3 | SparkSQL | SparkSQL    |     SparkSQL
(1 row)
```
**Vertica**:
```
dbadmin=> select version(), trim(BOTH from '    SparkSQL   '), trim(LEADING FROM '    SparkSQL   '), trim(TRAILING FROM '    SparkSQL   ');
              version               |  btrim   |    ltrim    |    rtrim
------------------------------------+----------+-------------+--------------
 Vertica Analytic Database v9.1.1-0 | SparkSQL | SparkSQL    |     SparkSQL
(1 row)
```
**MySQL**:
```
mysql> select version(), trim(BOTH from '    SparkSQL   '), trim(LEADING FROM '    SparkSQL   '), trim(TRAILING FROM '    SparkSQL   ');
+-----------+-----------------------------------+--------------------------------------+---------------------------------------+
| version() | trim(BOTH from '    SparkSQL   ') | trim(LEADING FROM '    SparkSQL   ') | trim(TRAILING FROM '    SparkSQL   ') |
+-----------+-----------------------------------+--------------------------------------+---------------------------------------+
| 5.7.26    | SparkSQL                          | SparkSQL                             |     SparkSQL                          |
+-----------+-----------------------------------+--------------------------------------+---------------------------------------+
1 row in set (0.01 sec)
```
**Teradata**:
![image](https://user-images.githubusercontent.com/5399861/59587081-070bcd00-9117-11e9-8534-df547860b585.png)
**Oracle**:
![image](https://user-images.githubusercontent.com/5399861/59587003-cf048a00-9116-11e9-839e-90da9e5183e0.png)
**DB2**:
![image](https://user-images.githubusercontent.com/5399861/59587801-af6e6100-9118-11e9-80be-ee1f6bbbeceb.png)

## How was this patch tested?

unit tests

Closes #24891 from wangyum/SPARK-28075.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-06-18 12:26:10 +08:00
Xiangrui Meng 1b2448bc10 [SPARK-28056][PYTHON] add doc for SCALAR_ITER Pandas UDF
## What changes were proposed in this pull request?

Add docs for `SCALAR_ITER` Pandas UDF.

cc: WeichenXu123 HyukjinKwon

## How was this patch tested?

Tested example code manually.

Closes #24897 from mengxr/SPARK-28056.

Authored-by: Xiangrui Meng <meng@databricks.com>
Signed-off-by: Xiangrui Meng <meng@databricks.com>
2019-06-17 20:51:36 -07:00
wuyi bb17aec916 [SPARK-27666][CORE] Do not release lock while TaskContext already completed
## What changes were proposed in this pull request?

PythonRunner executes tasks in an asynchronous way: elements are produced in a WriteThread but consumed in another thread. When a child operator such as take()/first() does not consume all the elements produced by the WriteThread, the task finishes before the WriteThread and releases all locks on blocks. However, the WriteThread continues to produce elements by pulling from the parent operator until it exhausts them all; at that point it tries to release the corresponding block, but hits an AssertionError since the task has already released that lock.

#24542 previously fixed this by catching the AssertionError, so that we won't fail our executor.

However, even without PySpark, the issue still exists when a user implements a custom RDD or task that spawns a separate child thread to consume an iterator from a cached parent RDD. Below is a demo that easily reproduces the issue.

```scala
    val rdd0 = sc.parallelize(Range(0, 10), 1).cache()
    rdd0.collect()
    rdd0.mapPartitions { iter =>
      val t = new Thread(new Runnable {
        override def run(): Unit = {
          while(iter.hasNext) {
            println(iter.next())
            Thread.sleep(1000)
          }
        }
      })
      t.setDaemon(false)
      t.start()
      Iterator(0)
    }.collect()
    Thread.sleep(100000)
```

So, if we prevent the separate thread from releasing the lock on a block when the TaskContext has already completed, we won't hit this issue again.

## How was this patch tested?

Added a new unit test in RDDSuite.

Closes #24699 from Ngone51/SPARK-27666.

Authored-by: wuyi <ngone_5451@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-06-18 10:15:44 +08:00
Bryan Cutler 90f80395af [SPARK-28041][PYTHON] Increase minimum supported Pandas to 0.23.2
## What changes were proposed in this pull request?

This increases the minimum supported version of Pandas to 0.23.2. Using a lower version will raise an error `Pandas >= 0.23.2 must be installed; however, your version was 0.XX`. Also, a workaround for using pyarrow with Pandas 0.19.2 was removed.

## How was this patch tested?

Existing Tests

Closes #24867 from BryanCutler/pyspark-increase-min-pandas-SPARK-28041.

Authored-by: Bryan Cutler <cutlerb@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-06-18 09:10:58 +09:00
Sean Owen 4576dfde19 [SPARK-28066][CORE] Optimize UTF8String.trim() for common case of no whitespace
## What changes were proposed in this pull request?

UTF8String.trim() allocates a new object even if the string has no whitespace, when it can just return itself. A simple check for this case makes the method about 3x faster in the common case.
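
A sketch of the fast path on a plain `String` for illustration (the real change is on `UTF8String`, which, I believe, trims only the ASCII space character):
```scala
// Return the receiver unchanged when neither end is a space, avoiding any allocation;
// fall back to the allocating path only when trimming is actually needed.
def fastTrim(s: String): String = {
  if (s.isEmpty || (s.charAt(0) != ' ' && s.charAt(s.length - 1) != ' ')) s
  else s.trim
}
```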

## How was this patch tested?

Existing tests.

A rough benchmark of 90% strings without whitespace (at ends), and 10% that do have whitespace, suggests the average runtime goes from 20 ns to 6 ns.

Closes #24884 from srowen/SPARK-28066.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-06-17 08:49:11 -07:00
Mellacheruvu Sandeep b7b4452553 [SPARK-24898][DOC] Adding spark.checkpoint.compress to the docs
## What changes were proposed in this pull request?

Adding spark.checkpoint.compress configuration parameter to the documentation

![](https://user-images.githubusercontent.com/3538013/59580409-a7013080-90ee-11e9-9b2c-3d29015f597e.png)

## How was this patch tested?

Checked locally with jekyll HTML docs. Also validated the HTML for any issues.

Closes #24883 from sandeepvja/SPARK-24898.

Authored-by: Mellacheruvu Sandeep <mellacheruvu.sandeep@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-06-16 22:54:08 -07:00
Dongjoon Hyun d6a479b1f8 [SPARK-28063][SQL] Replace deprecated .newInstance() in DSv2 Catalogs
## What changes were proposed in this pull request?

This PR aims to replace the deprecated `.newInstance()` in DSv2 `Catalogs` and to distinguish the plugin class errors better. According to the JDK11 build log, there is no other `.newInstance()` usage.
- https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-2.7-jdk-11-ubuntu-testing/978/consoleFull

SPARK-25984 removed all instances of the deprecated `.newInstance()` usage on Nov 10, 2018, but this one was added by SPARK-24252 on March 8, 2019.
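
The general replacement pattern for the deprecated call looks like this (illustrative; the plugin class name is hypothetical and the real `Catalogs` code adds more specific handling of the reflective failures):
```scala
val cls = Class.forName("com.example.MyCatalogPlugin")

// Deprecated since Java 9:
// val plugin = cls.newInstance()

// Preferred replacement:
val plugin = cls.getDeclaredConstructor().newInstance()
```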

## How was this patch tested?

Pass the Jenkins with the updated test case.

Closes #24882 from dongjoon-hyun/SPARK-28063.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-06-16 19:58:02 -07:00
Takuya UESHIN 5ae1a6bf0d [SPARK-28052][SQL] Make ArrayExists follow the three-valued boolean logic.
## What changes were proposed in this pull request?

Currently `ArrayExists` always returns boolean values (if the arguments are not `null`), but it should follow the three-valued boolean logic:

- `true` if the predicate holds at least one `true`
- otherwise, `null` if the predicate holds `null`
- otherwise, `false`

This behavior change is made to match Postgres' equivalent function `ANY/SOME (array)`'s behavior: https://www.postgresql.org/docs/9.6/functions-comparisons.html#AEN21174
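
A minimal sketch of the three-valued semantics described above, using `Option[Boolean]` to stand in for SQL's nullable BOOLEAN (`None` = NULL) and taking the per-element predicate results as input:
```scala
def existsThreeValued(predicateResults: Seq[Option[Boolean]]): Option[Boolean] = {
  if (predicateResults.contains(Some(true))) Some(true)   // at least one true
  else if (predicateResults.contains(None)) None          // otherwise null if any null
  else Some(false)                                         // otherwise false
}

// existsThreeValued(Seq(Some(false), Some(true))) => Some(true)
// existsThreeValued(Seq(Some(false), None))       => None (NULL)
// existsThreeValued(Seq(Some(false)))             => Some(false)
```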

## How was this patch tested?

Modified tests and existing tests.

Closes #24873 from ueshin/issues/SPARK-28052/fix_exists.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-06-15 10:48:06 -07:00
WeichenXu 6d441dcdc6 [SPARK-26412][PYSPARK][SQL] Allow Pandas UDF to take an iterator of pd.Series or an iterator of tuple of pd.Series
## What changes were proposed in this pull request?

Allow Pandas UDF to take an iterator of pd.Series or an iterator of tuples of pd.Series.
Note the UDF input args will always be one iterator:
* if the UDF takes only one column as input, the iterator's elements will be pd.Series (corresponding to the column value batches)
* if the UDF takes multiple columns as inputs, the iterator's elements will be tuples composed of multiple `pd.Series`, each one corresponding to one of the input columns (in the same order). For example:
```python
@pandas_udf("int", PandasUDFType.SCALAR_ITER)
def the_udf(iterator):
    for col1_batch, col2_batch in iterator:
        yield col1_batch + col2_batch

df.select(the_udf("col1", "col2"))
```
The udf above will add col1 and col2.

I haven't added unit tests, but manual tests show it works fine. So it is ready for a first-pass review.
We can test several typical cases:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.functions import udf
from pyspark.taskcontext import TaskContext

spark = SparkSession.builder.getOrCreate()  # create the session when not run in the pyspark shell
df = spark.createDataFrame([(1, 20), (3, 40)], ["a", "b"])

pandas_udf("int", PandasUDFType.SCALAR_ITER)
def fi1(it):
    pid = TaskContext.get().partitionId()
    print("DBG: fi1: do init stuff, partitionId=" + str(pid))
    for batch in it:
        yield batch + 100
    print("DBG: fi1: do close stuff, partitionId=" + str(pid))

pandas_udf("int", PandasUDFType.SCALAR_ITER)
def fi2(it):
    pid = TaskContext.get().partitionId()
    print("DBG: fi2: do init stuff, partitionId=" + str(pid))
    for batch in it:
        yield batch + 10000
    print("DBG: fi2: do close stuff, partitionId=" + str(pid))

pandas_udf("int", PandasUDFType.SCALAR_ITER)
def fi3(it):
    pid = TaskContext.get().partitionId()
    print("DBG: fi3: do init stuff, partitionId=" + str(pid))
    for x, y in it:
        yield x + y * 10 + 100000
    print("DBG: fi3: do close stuff, partitionId=" + str(pid))

pandas_udf("int", PandasUDFType.SCALAR)
def fp1(x):
    return x + 1000

udf("int")
def fu1(x):
    return x + 10

# test select "pandas iter udf/pandas udf/sql udf" expressions at the same time.
# Note this case the `fi1("a"), fi2("b"), fi3("a", "b")` will generate only one plan,
# and `fu1("a")`, `fp1("a")` will generate another two separate plans.
df.select(fi1("a"), fi2("b"), fi3("a", "b"), fu1("a"), fp1("a")).show()

# test chain two pandas iter udf together
# Note this case `fi2(fi1("a"))` will generate only one plan
# Also note the init stuff/close stuff call order will be like:
# (debug output following)
#     DBG: fi2: do init stuff, partitionId=0
#     DBG: fi1: do init stuff, partitionId=0
#     DBG: fi1: do close stuff, partitionId=0
#     DBG: fi2: do close stuff, partitionId=0
df.select(fi2(fi1("a"))).show()

# test more complex chain
# Note this case `fi1("a"), fi2("a")` will generate one plan,
# and `fi3(fi1_output, fi2_output)` will generate another plan
df.select(fi3(fi1("a"), fi2("a"))).show()
```

## How was this patch tested?

To be added.

Closes #24643 from WeichenXu123/pandas_udf_iter.

Lead-authored-by: WeichenXu <weichen.xu@databricks.com>
Co-authored-by: Xiangrui Meng <meng@databricks.com>
Signed-off-by: Xiangrui Meng <meng@databricks.com>
2019-06-15 08:29:20 -07:00
Terry Kim a950570f91 [MINOR][CORE] Remove unused variables, unused imports, etc.
## What changes were proposed in this pull request?

- Remove unused variables.
- Remove unused imports.
- Change var to val in few places.

## How was this patch tested?

Unit tests.

Closes #24857 from imback82/unused_variable.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-06-15 09:42:22 -05:00
HyukjinKwon 26998b86c1 [SPARK-27870][SQL][PYTHON] Add a runtime buffer size configuration for Pandas UDFs
## What changes were proposed in this pull request?

This PR is an alternative approach for #24734.

This PR fixes two things:

1. Respects `spark.buffer.size` in Python workers.
2. Adds a runtime buffer size configuration for Pandas UDFs, `spark.sql.pandas.udf.buffer.size` (which falls back to `spark.buffer.size`).

## How was this patch tested?

Manually tested:

```python
import time
from pyspark.sql.functions import *

spark.conf.set('spark.sql.execution.arrow.maxRecordsPerBatch', '1')
df = spark.range(1, 31, numPartitions=1).select(col('id').alias('a'))

pandas_udf("int", PandasUDFType.SCALAR)
def fp1(x):
    print("run fp1")
    time.sleep(1)
    return x + 100

pandas_udf("int", PandasUDFType.SCALAR)
def fp2(x, y):
    print("run fp2")
    time.sleep(1)
    return x + y

beg_time = time.time()
result = df.select(sum(fp2(fp1('a'), col('a')))).head()
print("result: " + str(result[0]))
print("consume time: " + str(time.time() - beg_time))
```

```
consume time: 62.68265891075134
```

```python
import time
from pyspark.sql.functions import *

spark.conf.set('spark.sql.execution.arrow.maxRecordsPerBatch', '1')
spark.conf.set('spark.sql.pandas.udf.buffer.size', '4')
df = spark.range(1, 31, numPartitions=1).select(col('id').alias('a'))

pandas_udf("int", PandasUDFType.SCALAR)
def fp1(x):
    print("run fp1")
    time.sleep(1)
    return x + 100

pandas_udf("int", PandasUDFType.SCALAR)
def fp2(x, y):
    print("run fp2")
    time.sleep(1)
    return x + y

beg_time = time.time()
result = df.select(sum(fp2(fp1('a'), col('a')))).head()
print("result: " + str(result[0]))
print("consume time: " + str(time.time() - beg_time))
```

```
consume time: 34.00594782829285
```

Closes #24826 from HyukjinKwon/SPARK-27870.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-06-15 20:56:22 +09:00
Gengliang Wang 23ebd389b5 [SPARK-27418][SQL] Migrate Parquet to File Data Source V2
## What changes were proposed in this pull request?

 Migrate Parquet to File Data Source V2

## How was this patch tested?

Unit test

Closes #24327 from gengliangwang/parquetV2.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-06-15 20:52:50 +09:00
maryannxue c79f471d04 [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL
## What changes were proposed in this pull request?

Implemented a new SparkPlan that executes the query adaptively. It splits the query plan into independent stages and executes them in order according to their dependencies. The query stage materializes its output at the end. When one stage completes, the data statistics of the materialized output will be used to optimize the remainder of the query.

The adaptive mode is off by default; when it is turned on, users can see "AdaptiveSparkPlan" as the top node of a query or sub-query. The inner plan of "AdaptiveSparkPlan" is subject to change during query execution but becomes final once the execution is complete. Whether the inner plan is final is included in the EXPLAIN string. Below is an example of the EXPLAIN plan before and after execution:

Query:
```
SELECT * FROM testData JOIN testData2 ON key = a WHERE value = '1'
```

Before execution:
```
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- SortMergeJoin [key#13], [a#23], Inner
   :- Sort [key#13 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(key#13, 5)
   :     +- Filter (isnotnull(value#14) AND (value#14 = 1))
   :        +- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false) AS value#14]
   :           +- Scan[obj#12]
   +- Sort [a#23 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(a#23, 5)
         +- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#23, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#24]
            +- Scan[obj#22]
```

After execution:
```
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=true)
+- *(1) BroadcastHashJoin [key#13], [a#23], Inner, BuildLeft
   :- BroadcastQueryStage 2
   :  +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
   :     +- ShuffleQueryStage 0
   :        +- Exchange hashpartitioning(key#13, 5)
   :           +- *(1) Filter (isnotnull(value#14) AND (value#14 = 1))
   :              +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false) AS value#14]
   :                 +- Scan[obj#12]
   +- ShuffleQueryStage 1
      +- Exchange hashpartitioning(a#23, 5)
         +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#23, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#24]
            +- Scan[obj#22]
```
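
A hedged sketch of turning the feature on for a session (`spark.sql.adaptive.enabled` is the existing config key; the table names come from the example query above and assume the test tables are registered):
```scala
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.sql("SELECT * FROM testData JOIN testData2 ON key = a WHERE value = '1'").explain()
```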

Credit also goes to carsonwang and cloud-fan

## How was this patch tested?

Added new UT.

Closes #24706 from maryannxue/aqe.

Authored-by: maryannxue <maryannxue@apache.org>
Signed-off-by: herman <herman@databricks.com>
2019-06-15 11:27:15 +02:00
Sean Owen 15462e1a8f [SPARK-28004][UI] Update jquery to 3.4.1
## What changes were proposed in this pull request?

We're using an old-ish jQuery, 1.12.4, and should probably update for Spark 3 to keep up in general, but also to keep up with CVEs. In fact, we know of at least one resolved in only 3.4.0+ (https://nvd.nist.gov/vuln/detail/CVE-2019-11358). They may not affect Spark, but, if the update isn't painful, maybe worthwhile in order to make future 3.x updates easier.

jQuery 1 -> 2 doesn't sound like a breaking change, as 2.0 is supposed to maintain compatibility with 1.9+ (https://blog.jquery.com/2013/04/18/jquery-2-0-released/)

2 -> 3 has breaking changes: https://jquery.com/upgrade-guide/3.0/. It's hard to evaluate each one, but the most likely area for problems is in ajax(). However, our usage of jQuery (and plugins) is pretty simple.

Update jquery to 3.4.1; update jquery blockUI and mustache to latest

## How was this patch tested?

Manual testing of docs build (except R docs), worker/master UI, spark application UI.
Note: this really doesn't guarantee it works, as our tests can't test javascript, and this is merely anecdotal testing, although I clicked about every link I could find. There's a risk this breaks a minor part of the UI; it does seem to work fine in the main.

Closes #24843 from srowen/SPARK-28004.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-06-14 22:19:20 -07:00
Peter Toth 9e6666bde1 [SPARK-28002][SQL] Support WITH clause column aliases
## What changes were proposed in this pull request?

This PR adds support of column aliasing in a CTE so this query becomes valid:
```
WITH t(x) AS (SELECT 1)
SELECT * FROM t WHERE x = 1
```
## How was this patch tested?

Added new UTs.

Closes #24842 from peter-toth/SPARK-28002.

Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-06-14 20:47:11 -07:00
Dongjoon Hyun fd8240d10c [SPARK-28051][INFRA] Exposing JIRA issue component types at GitHub PRs
## What changes were proposed in this pull request?

This PR aims to expose JIRA issue component types at GitHub PRs.

## How was this patch tested?

Manual.
```
$ export GITHUB_OAUTH_KEY=...
$ export JIRA_PASSWORD=...
$ export GITHUB_API_BASE='https://api.github.com/repos/your-id/spark'
$ dev/github_jira_sync.py
```

Please note that the existing script will raise the following exception if your repo has fewer than 100 open PRs. This will be handled at #24874.
```
Traceback (most recent call last):
  File "dev/github_jira_sync.py", line 139, in <module>
    jira_prs = get_jira_prs()
  File "dev/github_jira_sync.py", line 83, in get_jira_prs
    link_header = filter(lambda k: k.startswith("Link"), page.info().headers)[0]
IndexError: list index out of range
```
That is beyond the scope of this PR.

Closes #24871 from dongjoon-hyun/SPARK-28051.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-06-14 20:36:45 -07:00
Jungtaek Lim (HeartSaVioR) bd0a04baab [SPARK-26949][SS] Prevent 'purge' to remove needed batch files in CompactibleFileStreamLog
## What changes were proposed in this pull request?

This patch proposes making `purge` in `CompactibleFileStreamLog` throw `UnsupportedOperationException` to prevent purging necessary batch files, as well as adding javadoc to document the behavior. Strictly speaking, it would only break when the latest compaction batch is requested to be purged, but callers wouldn't be aware of this, so it is safer to just prevent it.
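
A sketch of the guard described above (simplified; the real class carries more context around batch ids and compaction intervals):
```scala
class CompactibleLogSketch {
  def purge(thresholdBatchId: Long): Unit =
    throw new UnsupportedOperationException(
      s"Cannot purge up to batch $thresholdBatchId: it could remove a compaction batch " +
        "that later reads still depend on")
}
```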

## How was this patch tested?

Added UT.

Closes #23850 from HeartSaVioR/SPARK-26949.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-06-14 20:34:18 -07:00
maryannxue d1951aa23b [SPARK-28057][SQL] Add method clone in catalyst TreeNode
## What changes were proposed in this pull request?

Implemented the `clone` method for `TreeNode` based on `mapChildren`.

## How was this patch tested?

Added new UT.

Closes #24876 from maryannxue/treenode-clone.

Authored-by: maryannxue <maryannxue@apache.org>
Signed-off-by: herman <herman@databricks.com>
2019-06-15 00:40:55 +02:00
Sean Owen b508eab985 [SPARK-21882][CORE] OutputMetrics doesn't count written bytes correctly in the saveAsHadoopDataset function
## What changes were proposed in this pull request?

(Continuation of https://github.com/apache/spark/pull/19118 ; see for details)

## How was this patch tested?

Existing tests.

Closes #24863 from srowen/SPARK-21882.2.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-06-14 12:44:43 -05:00
Dongjoon Hyun 7533cccc5d [SPARK-28053][INFRA] Handle a corner case where there is no Link header
## What changes were proposed in this pull request?

Currently, `github_jira_sync.py` assumes that a `Link` header is always present. However, it will fail when the number of open PRs is less than 100 (the default page size). This will not happen in Apache Spark, but we had better fix it because it happens during the review process for the `github_jira_sync.py` script.
```
Traceback (most recent call last):
  File "dev/github_jira_sync.py", line 139, in <module>
    jira_prs = get_jira_prs()
  File "dev/github_jira_sync.py", line 83, in get_jira_prs
    link_header = filter(lambda k: k.startswith("Link"), page.info().headers)[0]
IndexError: list index out of range
```

## How was this patch tested?

Manually checked with another repo which has a small number of open PRs (< 100).
```
$ export JIRA_PASSWORD=...
$ export GITHUB_API_BASE='https://api.github.com/repos/your-id/spark'
$ dev/github_jira_sync.py
```

Closes #24874 from dongjoon-hyun/SPARK-28053.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-06-14 16:33:34 +09:00
Liang-Chi Hsieh c0297dedd8 [MINOR][PYSPARK][SQL][DOC] Fix rowsBetween doc in Window
## What changes were proposed in this pull request?

I suspect that the docs of the `rowsBetween` methods in Scala and PySpark are wrong.
Because:

```scala
scala> val df = Seq((1, "a"), (2, "a"), (3, "a"), (4, "a"), (5, "a"), (6, "a")).toDF("id", "category")
df: org.apache.spark.sql.DataFrame = [id: int, category: string]

scala> val byCategoryOrderedById = Window.partitionBy('category).orderBy('id).rowsBetween(-1, 2)
byCategoryOrderedById: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@7f04de97

scala> df.withColumn("sum", sum('id) over byCategoryOrderedById).show()
+---+--------+---+
| id|category|sum|
+---+--------+---+
|  1|       a|  6|              # sum from index 0 to (0 + 2): 1 + 2 + 3 = 6
|  2|       a| 10|              # sum from index (1 - 1) to (1 + 2): 1 + 2 + 3 + 4 = 10
|  3|       a| 14|
|  4|       a| 18|
|  5|       a| 15|
|  6|       a| 11|
+---+--------+---+
```

So the frame (-1, 2) for row with index 5, as described in the doc, should range from index 4 to index 7.

## How was this patch tested?

N/A, just doc change.

Closes #24864 from viirya/window-spec-doc.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-06-14 09:56:37 +09:00
Zhu, Lipeng 5700c39c89 [SPARK-27578][SQL] Support INTERVAL ... HOUR TO SECOND syntax
## What changes were proposed in this pull request?

Currently, Spark SQL supports an interval format like this.
```sql
SELECT INTERVAL '0 23:59:59.155' DAY TO SECOND
```

Like Presto/Teradata, this PR aims to support grammar like below.
```sql
SELECT INTERVAL '23:59:59.155' HOUR TO SECOND
```

Although we could add a new function for this pattern, it is better to extend the existing code to handle the missing-day case. So, the following are also supported.
```sql
SELECT INTERVAL '23:59:59.155' DAY TO SECOND
SELECT INTERVAL '1 23:59:59.155' HOUR TO SECOND
```
Currently Vertica/Teradata/PostgreSQL/SQL Server fully support the interval forms below.
- interval ... year to month
- interval ... day to hour
- interval ... day to minute
- interval ... day to second
- interval ... hour to minute
- interval ... hour to second
- interval ... minute to second

https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/SQLReferenceManual/LanguageElements/Literals/interval-qualifier.htm
df1a699e5b/src/test/regress/sql/interval.sql (L180-L203)
https://docs.teradata.com/reader/S0Fw2AVH8ff3MDA0wDOHlQ/KdCtT3pYFo~_enc8~kGKVw
https://docs.microsoft.com/en-us/sql/odbc/reference/appendixes/interval-literals?view=sql-server-2017

## How was this patch tested?

Pass the Jenkins with the updated test cases.

Closes #24472 from lipzhu/SPARK-27578.

Lead-authored-by: Zhu, Lipeng <lipzhu@ebay.com>
Co-authored-by: Dongjoon Hyun <dhyun@apple.com>
Co-authored-by: Lipeng Zhu <lipzhu@icloud.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-06-13 10:12:55 -07:00
zhengruifeng 7281784883 [SPARK-16692][ML][PYTHON] add MultilabelClassificationEvaluator
## What changes were proposed in this pull request?
add MultilabelClassificationEvaluator
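
A usage sketch for illustration (the column types, metric name, and data are assumptions based on MLlib's `MultilabelMetrics`, not taken from this PR's test suites):

```scala
import org.apache.spark.ml.evaluation.MultilabelClassificationEvaluator
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Assumed input shape: prediction and label columns hold arrays of label indices.
val scoreAndLabels = Seq(
  (Array(0.0, 1.0), Array(0.0, 2.0)),
  (Array(0.0, 2.0), Array(0.0, 1.0)),
  (Array(2.0), Array(2.0))
).toDF("prediction", "label")

val evaluator = new MultilabelClassificationEvaluator()
  .setMetricName("f1Measure") // assumed metric name, mirroring MultilabelMetrics
println(evaluator.evaluate(scoreAndLabels))
```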

## How was this patch tested?
added test suites

Closes #24777 from zhengruifeng/multi_label_eval.

Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-06-13 07:58:22 -05:00
John Zhuge abe370f971 [SPARK-27322][SQL] DataSourceV2 table relation
## What changes were proposed in this pull request?

Support multi-catalog in the following SELECT code paths:

- SELECT * FROM catalog.db.tbl
- TABLE catalog.db.tbl
- JOIN or UNION tables from different catalogs
- SparkSession.table("catalog.db.tbl")
- CTE relation
- View text
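
For example (a sketch with an assumed catalog name and a hypothetical plugin class, not taken from the PR's tests):

```scala
// Hypothetical setup: a v2 catalog plugin registered under the name "testcat".
spark.conf.set("spark.sql.catalog.testcat", "org.example.MyCatalogPlugin")

spark.sql("SELECT * FROM testcat.db.tbl").show()  // SELECT * FROM catalog.db.tbl
spark.sql("TABLE testcat.db.tbl").show()          // TABLE catalog.db.tbl
spark.table("testcat.db.tbl").show()              // SparkSession.table("catalog.db.tbl")
```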

## How was this patch tested?

New unit tests.
All existing unit tests in catalyst and sql core.

Closes #24741 from jzhuge/SPARK-27322-pr.

Authored-by: John Zhuge <jzhuge@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-06-13 13:48:40 +08:00
Liang-Chi Hsieh ddf4a50312 [SPARK-28031][PYSPARK][TEST] Improve doctest on over function of Column
## What changes were proposed in this pull request?

Just found that the doctest on the `over` function of `Column` is commented out. The window spec there also does not match the window function used.

We should either remove the doctest, or improve it.

Because other functions of `Column` generally have doctests, this PR tries to improve it.
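
A Scala sketch of the kind of window example such a doctest demonstrates (the data and column names are assumed):

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._ // assumes an active SparkSession named `spark`

val df = Seq(("Alice", 2), ("Alice", 5), ("Bob", 7)).toDF("name", "age")
val w = Window.partitionBy("name").orderBy("age")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

// `over` ties an aggregate to a window spec that actually fits it.
df.select($"name", $"age", min("age").over(w).as("min_age")).show()
```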

## How was this patch tested?

Added doctest.

Closes #24854 from viirya/column-test-minor.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-06-13 11:04:41 +09:00
Xiangrui Meng 4f4829b4ae [SPARK-28030][SQL] convert filePath to URI in binary file data source
## What changes were proposed in this pull request?

Convert `PartitionedFile.filePath` to a URI first in the binary file data source. Otherwise Spark will throw a FileNotFoundException because we create the `Path` from a URL-encoded string instead of wrapping it in a URI.
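
An illustration of the distinction (assumed file path containing a URL-encoded space; not the actual Spark code):

```scala
import java.net.URI
import org.apache.hadoop.fs.Path

// The incoming path string is URL-encoded, e.g. a space encoded as %20.
val filePath = "file:/tmp/binary%20files/img.png"

// Built from the raw string, the %20 stays literal, so the directory name is wrong.
println(new Path(filePath).getParent.getName)          // binary%20files
// Wrapped in a URI first, the encoding is decoded and matches the file on disk.
println(new Path(new URI(filePath)).getParent.getName) // binary files
```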

## How was this patch tested?

Unit test.

Closes #24855 from mengxr/SPARK-28030.

Authored-by: Xiangrui Meng <meng@databricks.com>
Signed-off-by: Xiangrui Meng <meng@databricks.com>
2019-06-12 13:24:02 -07:00
Dongjoon Hyun 37ab43339d [SPARK-28013][BUILD][SS] Upgrade to Kafka 2.2.1
## What changes were proposed in this pull request?

For the Apache Spark 3.0.0 release, this PR aims to update the Kafka dependency to 2.2.1 to bring improvements and bug fixes like [KAFKA-8134](https://issues.apache.org/jira/browse/KAFKA-8134) (`'linger.ms' must be a long`).

https://issues.apache.org/jira/projects/KAFKA/versions/12345010

## How was this patch tested?

Pass the Jenkins.

Closes #24847 from dongjoon-hyun/SPARK-28013.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-06-12 07:34:42 -07:00