Commit graph

23897 commits

Author SHA1 Message Date
Hyukjin Kwon 88bc481b9e [SPARK-26830][SQL][R] Vectorized R dapply() implementation
## What changes were proposed in this pull request?

This PR targets to add vectorized `dapply()` in R, Arrow optimization.

This can be tested as below:

```bash
$ ./bin/sparkR --conf spark.sql.execution.arrow.enabled=true
```

```r
df <- createDataFrame(mtcars)
collect(dapply(df, function(rdf) { data.frame(rdf$gear + 1) }, structType("gear double")))
```

### Requirements
  - R 3.5.x
  - Arrow package 0.12+
    ```bash
    Rscript -e 'remotes::install_github("apache/arrow@apache-arrow-0.12.0", subdir = "r")'
    ```

**Note:** currently, the Arrow R package is not on CRAN. Please take a look at ARROW-3204.
**Note:** currently, the Arrow R package does not seem to support Windows. Please take a look at ARROW-3204.

### Benchmarks

**Shell**

```bash
sync && sudo purge
./bin/sparkR --conf spark.sql.execution.arrow.enabled=false --driver-memory 4g
```

```bash
sync && sudo purge
./bin/sparkR --conf spark.sql.execution.arrow.enabled=true --driver-memory 4g
```

**R code**

```r
rdf <- read.csv("500000.csv")
df <- cache(createDataFrame(rdf))
count(df)

test <- function() {
  options(digits.secs = 6) # milliseconds
  start.time <- Sys.time()
  count(cache(dapply(df, function(rdf) { rdf }, schema(df))))
  end.time <- Sys.time()
  time.taken <- end.time - start.time
  print(time.taken)
}

test()
```

**Data (350 MB):**

```r
object.size(read.csv("500000.csv"))
350379504 bytes
```

"500000 Records"  http://eforexcel.com/wp/downloads-16-sample-csv-files-data-sets-for-testing/

**Results**

Arrow optimization disabled (`spark.sql.execution.arrow.enabled=false`):

```
Time difference of 13.42037 mins
```

Arrow optimization enabled (`spark.sql.execution.arrow.enabled=true`):

```
Time difference of 30.64156 secs
```

The performance improvement was around **2627%** (roughly a 26x speedup).

### Limitations

- For now, Arrow optimization with R does not support cases where the data is `raw` or where the user explicitly gives a float type in the schema; these produce corrupt values.

- Due to ARROW-4512, it cannot send and receive data batch by batch. It has to send all batches in the Arrow stream format at once. This needs improvement later.

## How was this patch tested?

Unit tests were added, and manually tested.

Closes #23787 from HyukjinKwon/SPARK-26830-1.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2019-02-27 14:29:58 +09:00
Liang-Chi Hsieh 0f2c0b53e8 [SPARK-26837][SQL] Pruning nested fields from object serializers
## What changes were proposed in this pull request?

In SPARK-26619, we made a change to prune unnecessary individual serializers when serializing objects. This PR is an extension of that work: we can further prune nested fields from object serializers if they are not used.

For example, in the following query, we only use one field of a struct column:

```scala
val data = Seq((("a", 1), 1), (("b", 2), 2), (("c", 3), 3))
val df = data.toDS().map(t => (t._1, t._2 + 1)).select("_1._1")
```

So, instead of having the serializer create a two-field struct, we can prune the unnecessary field from it. This is what this PR proposes to do.

In order to make this change conservative and safer, a SQL config is added to control it. It is disabled by default.

TODO: Support to prune nested fields inside MapType's key and value.

## How was this patch tested?

Added tests.

Closes #23740 from viirya/nested-pruning-serializer-2.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-02-27 12:45:24 +08:00
Sean Owen 9c283662c6 [SPARK-26986][ML] Add JAXB reference impl to build for Java 9+
## What changes were proposed in this pull request?

Add the reference JAXB impl for Java 9+ from Glassfish. Right now it's apparently only necessary in MLlib, but this can be expanded later.

## How was this patch tested?

Existing tests particularly PMML-related ones, which use JAXB.
This works on Java 11.

Closes #23890 from srowen/SPARK-26986.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-02-26 18:26:49 -06:00
Hellsen83 387efe29b7 [SPARK-26449][PYTHON] Add transform method to DataFrame API
## What changes were proposed in this pull request?

Added a `.transform()` method to the Python DataFrame API to be in sync with the Scala API.

## How was this patch tested?

Addition has been tested manually.

Closes #23877 from Hellsen83/pyspark-dataframe-transform.

Authored-by: Hellsen83 <erik.christiansen83@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-02-26 18:22:36 -06:00
Jungtaek Lim (HeartSaVioR) c17150a5f5 [SPARK-22860][CORE][YARN] Redact command line arguments for running Driver and Executor before logging (standalone and YARN)
## What changes were proposed in this pull request?

This patch applies redaction to command line arguments before logging them. This applies to two resource managers: standalone cluster and YARN.

This patch only concerns arguments starting with `-D`, since Spark typically passes Spark configuration to the command line as `-Dspark.blabla=blabla`. More changes are necessary if we also want to handle the case of `--conf spark.blabla=blabla`.
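For illustration, a minimal sketch of this kind of redaction (not the code in this patch; the key-name pattern and the `*********(redacted)` placeholder below are assumptions, while Spark's own behaviour is driven by its redaction regex configuration):

```scala
// Illustrative only: hide the values of -Dkey=value arguments whose key looks sensitive.
val sensitive = "(?i)secret|password|token".r

def redactArgs(args: Seq[String]): Seq[String] = args.map { arg =>
  val key = arg.takeWhile(_ != '=')
  if (arg.startsWith("-D") && arg.contains("=") && sensitive.findFirstIn(key).isDefined) {
    s"$key=*********(redacted)"
  } else {
    arg
  }
}

// redactArgs(Seq("-Dspark.ssl.keyPassword=abc", "-Dspark.app.name=demo"))
// => Seq("-Dspark.ssl.keyPassword=*********(redacted)", "-Dspark.app.name=demo")
```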

## How was this patch tested?

Added a UT for the redaction logic. This patch only touches how arguments are logged, so it is not easy to add a UT for that part.

Closes #23820 from HeartSaVioR/MINOR-redact-command-line-args-for-running-driver-executor.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-02-26 14:49:46 -08:00
Marcelo Vanzin afbff6446f Revert "[SPARK-26742][K8S] Update Kubernetes-Client version to 4.1.2"
This reverts commit a3192d966a.
2019-02-26 13:42:07 -08:00
Maxim Gekk a2a41b7bf2 [SPARK-26978][CORE][SQL] Avoid magic time constants
## What changes were proposed in this pull request?

In the PR, I propose to refactor existing code related to date/time conversions, and replace constants like `1000` and `1000000` with `DateTimeUtils` constants and conversion functions from `java.util.concurrent.TimeUnit._`.
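A small illustration of the style of replacement (values and names here are illustrative, not code from the PR):

```scala
import java.util.concurrent.TimeUnit.{MICROSECONDS, MILLISECONDS, SECONDS}

// Named conversions instead of magic constants like 1000 and 1000000:
val microsFromMillis  = MILLISECONDS.toMicros(1234L)     // was: 1234L * 1000
val secondsFromMicros = MICROSECONDS.toSeconds(5000000L) // was: 5000000L / 1000000
val millisPerSecond   = SECONDS.toMillis(1L)             // was: the bare constant 1000
```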

## How was this patch tested?

The changes are tested by existing test suites.

Closes #23878 from MaxGekk/magic-time-constants.

Lead-authored-by: Maxim Gekk <max.gekk@gmail.com>
Co-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-02-26 09:08:12 -06:00
seancxmao 3d2e55abd0 [MINOR][DOCS] Remove Akka leftover
## What changes were proposed in this pull request?
Since Spark 2.0, Akka is no longer used and Akka-related code was removed. However, there is still some leftover. This PR aims to remove it.

* `/pom.xml` has a comment about Akka, which is not needed anymore.

## How was this patch tested?
Existing tests.

Closes #23885 from seancxmao/remove-akka-leftover.

Authored-by: seancxmao <seancxmao@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-02-26 08:31:02 -06:00
Xianyang Liu bc03c8b3fa [SPARK-26952][SQL] Row count statistics should respect the data reported by data source
## What changes were proposed in this pull request?

In data source v2, if the data source scan implements `SupportsReportStatistics`, `DataSourceV2Relation` should respect the row count reported by the data source.

## How was this patch tested?

New unit test.

Closes #23853 from ConeyLiu/report-row-count.

Authored-by: Xianyang Liu <xianyang.liu@intel.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-02-26 14:10:54 +08:00
liupengcheng 52a180f25f [SPARK-26674][CORE] Consolidate CompositeByteBuf when reading large frame
## What changes were proposed in this pull request?

Currently, TransportFrameDecoder does not consolidate the buffers read from the network, which may waste memory. In most cases a ByteBuf's writerIndex is far less than its capacity, so we can optimize this by consolidating buffers.

This PR will do this optimization.
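As an illustration of the idea only (this is not the TransportFrameDecoder code), Netty's `CompositeByteBuf` can merge its many small components into one contiguous buffer:

```scala
import io.netty.buffer.Unpooled
import io.netty.util.CharsetUtil

// A frame assembled from several small network buffers...
val frame = Unpooled.compositeBuffer()
Seq("spark", " ", "frame").foreach { chunk =>
  frame.addComponent(true, Unpooled.wrappedBuffer(chunk.getBytes(CharsetUtil.UTF_8)))
}

// ...can be consolidated so capacity is not wasted on mostly-empty components.
frame.consolidate()
println(frame.toString(CharsetUtil.UTF_8)) // "spark frame"
```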

Related codes:
9a30e23211/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java (L143)

## How was this patch tested?

UT

Closes #23602 from liupc/Reduce-memory-consumption-in-TransportFrameDecoder.

Lead-authored-by: liupengcheng <liupengcheng@xiaomi.com>
Co-authored-by: Liupengcheng <liupengcheng@xiaomi.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-02-25 16:40:46 -08:00
Gengliang Wang 4baa2d4449 [SPARK-26673][FOLLOWUP][SQL] File Source V2: check existence of output path before delete it
## What changes were proposed in this pull request?
This is a followup PR to resolve comment: https://github.com/apache/spark/pull/23601#pullrequestreview-207101115

When Spark writes a DataFrame with "overwrite" mode, it deletes the output path before the actual write. To safely handle the case where the output path doesn't exist, it is suggested to follow the V1 code and check for the path's existence first.
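A minimal sketch of the guarded delete described above (path and configuration are illustrative; this is not the actual file source V2 code):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val outputPath = new Path("/tmp/output")
val fs: FileSystem = outputPath.getFileSystem(new Configuration())

// Only delete the output path if it actually exists, mirroring the V1 behaviour.
if (fs.exists(outputPath)) {
  fs.delete(outputPath, /* recursive = */ true)
}
```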

## How was this patch tested?

Apply https://github.com/apache/spark/pull/23836 and run unit tests

Closes #23889 from gengliangwang/checkFileBeforeOverwrite.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2019-02-25 16:20:06 -08:00
Ilya Matiach b66be0e490 [SPARK-24103][ML][MLLIB] ML Evaluators should use weight column - added weight column for binary classification evaluator
## What changes were proposed in this pull request?

The evaluators BinaryClassificationEvaluator, RegressionEvaluator, and MulticlassClassificationEvaluator and the corresponding metrics classes BinaryClassificationMetrics, RegressionMetrics and MulticlassMetrics should use sample weight data.

I've closed the PR: https://github.com/apache/spark/pull/16557
as recommended in favor of creating three pull requests, one for each of the evaluators (binary/regression/multiclass) to make it easier to review/update.

## How was this patch tested?
I added tests to the metrics and evaluators classes.

Closes #17084 from imatiach-msft/ilmat/binary-evalute.

Authored-by: Ilya Matiach <ilmat@microsoft.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-02-25 17:16:51 -06:00
Marcelo Vanzin 4808393449 [SPARK-26788][YARN] Remove SchedulerExtensionService.
Since the yarn module is actually private to Spark, this interface was never
actually "public". Since it has no use inside of Spark, let's avoid adding
a yarn-specific extension that isn't public, and point any potential users
to more general solutions (like using a SparkListener).

Closes #23839 from vanzin/SPARK-26788.

Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-02-25 13:57:37 -06:00
“attilapiros” 0ac516bebd [SPARK-25035][CORE] Avoiding memory mapping at disk-stored blocks replication
Before this PR, the method `BlockManager#putBlockDataAsStream()` (which is used during block replication, where the block data is received as a stream) read the whole block content into memory, even at the DISK_ONLY storage level.

With this change the received block data (which was temporarily stored in a file) is simply moved into the right location backing the target block. This way a possible OOM error is avoided.

In this implementation, to avoid code duplication, the method `doPutBytes` is refactored into a template method called `BlockStoreUpdater`, which has separate implementations to handle byte-buffer-based and temporary-file-based block store updates.

With existing unit tests of `DistributedSuite` (the ones dealing with replication):
- caching on disk, replicated (encryption = off) (with replication as stream)
- caching on disk, replicated (encryption = on) (with replication as stream)
- caching in memory, serialized, replicated (encryption = on) (with replication as stream)
- caching in memory, serialized, replicated (encryption = off) (with replication as stream)
- etc.

And with new unit tests testing `putBlockDataAsStream` method directly:
- test putBlockDataAsStream with caching (encryption = off)
- test putBlockDataAsStream with caching (encryption = on)
- test putBlockDataAsStream with caching on disk (encryption = off)
- test putBlockDataAsStream with caching on disk (encryption = on)

Closes #23688 from attilapiros/SPARK-25035.

Authored-by: “attilapiros” <piros.attila.zsolt@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-02-25 11:43:39 -08:00
Jungtaek Lim (HeartSaVioR) c5de804093 [MINOR][BUILD] Update all checkstyle dtd to use "https://checkstyle.org"
## What changes were proposed in this pull request?

The build below failed the Java checkstyle test, but instead of a violation it shows a FileNotFound error for the dtd file.
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/102751/

It looks like the link to the dtd file is dead: `http://www.puppycrawl.com/dtds/configuration_1_3.dtd`.

This patch updates the dtd link to "https://checkstyle.org/dtds/", given that the checkstyle repository also updated the URL path.
https://github.com/checkstyle/checkstyle/issues/5601

## How was this patch tested?

Checked the new links.

Closes #23887 from HeartSaVioR/java-checkstyle-dtd-change-url.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-02-25 11:25:53 -08:00
Jiaxin Shan a3192d966a [SPARK-26742][K8S] Update Kubernetes-Client version to 4.1.2
## What changes were proposed in this pull request?
Changed the `kubernetes-client` version to 4.1.2. The latest version fixes an error with exec credentials (used by AWS EKS) and will be used to talk to the Kubernetes API server. With this patch, users can now submit Spark jobs to an EKS API endpoint.

## How was this patch tested?
unit tests and manual tests.

Closes #23814 from Jeffwan/update_k8s_sdk.

Authored-by: Jiaxin Shan <seedjeffwan@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-02-25 04:56:04 -06:00
Sean Owen d2529788ed [SPARK-26966][ML] Update to JPMML 1.4.8
## What changes were proposed in this pull request?

JPMML apparently only supports Java 9 in 1.4.2+. We are seeing test failures from JPMML relating to JAXB when running on Java 11. It's shaded and not a big change, so it should be safe.

## How was this patch tested?

Existing tests.

Closes #23868 from srowen/SPARK-26966.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-02-25 04:37:45 -06:00
Maxim Gekk 2d2fb34b93 [SPARK-26953][CORE][TEST] Test TimSort for ArrayIndexOutOfBoundsException
## What changes were proposed in this pull request?

In the PR, I propose to test the input shown at the end of the paper: https://arxiv.org/pdf/1805.08612.pdf . The difference between this test and the paper's test is the array type: this test allocates arrays of bytes instead of arrays of ints.

## How was this patch tested?

New test is added to `SorterSuite`.

Closes #23856 from MaxGekk/timsort-bug-fix.

Authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-02-24 17:37:32 -06:00
Douglas R Colkitt faa61980c4 [SPARK-26935][SQL] Skip DataFrameReader's CSV first line scan when not used
Prior to this patch, all DataFrameReader.csv() calls would collect the first
line from the CSV input iterator. This is done to allow schema inference from the
header row.

However, when the schema is already specified this is a wasteful operation. It results
in an unnecessary compute step on the first partition. This can be expensive if
the CSV itself is expensive to generate (e.g. it's the product of a long-running
external pipe()).

This patch short-circuits the first-line collection in DataFrameReader.csv() when
a schema is specified, thereby improving CSV read performance in certain cases.
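For example, a read like the following (path and schema are illustrative) supplies the schema explicitly, so no first-line scan for inference is needed:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("csv-read").getOrCreate()

// The user-specified schema means DataFrameReader.csv() no longer has to touch
// the first line of the input for schema inference.
val df = spark.read
  .schema("id INT, name STRING")
  .csv("/tmp/data.csv")
```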

## What changes were proposed in this pull request?

Short-circuiting DataFrameReader.csv() first-line read when schema is user-specified.

## How was this patch tested?

Compiled and tested against several CSV datasets.

Closes #23830 from Mister-Meeseeks/master.

Authored-by: Douglas R Colkitt <douglas.colkitt@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-02-23 14:00:57 -06:00
Maxim Gekk 75c48ac36d [SPARK-26908][SQL] Fix DateTimeUtils.toMillis and millisToDays
## What changes were proposed in this pull request?

`DateTimeUtils.toMillis` can produce inaccurate results for some negative values (timestamps before the epoch); the error can be around 1 ms. In the PR, I propose to use `Math.floorDiv` when casting microseconds to milliseconds, and milliseconds to days since the epoch.
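A worked example of why `Math.floorDiv` matters for pre-epoch values: plain integer division rounds toward zero, while `floorDiv` rounds toward negative infinity (the values below are illustrative).

```scala
val micros = -1001L                        // 1.001 ms before the epoch
val truncated = micros / 1000              // -1: rounds toward zero, off by about 1 ms
val floored = Math.floorDiv(micros, 1000L) // -2: the millisecond bucket that actually contains -1001 us
```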

## How was this patch tested?

Added new test to `DateTimeUtilsSuite`, and tested by `CastSuite` as well.

Closes #23815 from MaxGekk/micros-to-millis.

Lead-authored-by: Maxim Gekk <max.gekk@gmail.com>
Co-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-02-23 11:35:11 -06:00
seancxmao a07b07fd85 [MINOR][DOCS] Remove references to Shark
## What changes were proposed in this pull request?
This PR aims to remove references to "Shark", which was a precursor to Spark SQL. I searched the whole project for the text "Shark" (ignoring case) and found just a single match. Note that occurrences such as nicknames or test data are irrelevant.

## How was this patch tested?
N/A. Change comments only.

Closes #23876 from seancxmao/remove-Shark.

Authored-by: seancxmao <seancxmao@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-02-23 11:03:05 -06:00
Sean Owen ab4e83aca7 [SPARK-26963][MLLIB] SizeEstimator can't make some JDK fields accessible in Java 9+
## What changes were proposed in this pull request?

Don't use inaccessible fields in SizeEstimator, which comes up in Java 9+
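A rough sketch of the Java 9+ issue (illustrative helper, not the SizeEstimator code): `setAccessible` can throw for JDK-internal fields, so reflective reads have to tolerate failure instead of assuming every field can be made accessible.

```scala
import scala.util.control.NonFatal

def tryReadField(obj: AnyRef, name: String): Option[AnyRef] =
  try {
    val f = obj.getClass.getDeclaredField(name)
    f.setAccessible(true) // may throw InaccessibleObjectException on Java 9+
    Option(f.get(obj))
  } catch {
    case NonFatal(_) => None // skip fields that cannot be made accessible
  }
```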

## How was this patch tested?

Manually ran tests with Java 11; the tests that failed before now pass.
This ought to pass on Java 8 as well, as there's effectively no change for Java 8.

Closes #23866 from srowen/SPARK-26963.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-02-23 11:01:47 -06:00
seancxmao ce3a157f00 [SPARK-26939][CORE][DOC] Fix some outdated comments about task schedulers
## What changes were proposed in this pull request?
This PR aims to fix some outdated comments about task schedulers.

1. Change "ClusterScheduler" to "YarnScheduler" in comments of `YarnClusterScheduler`

According to [SPARK-1140 Remove references to ClusterScheduler](https://issues.apache.org/jira/browse/SPARK-1140), ClusterScheduler is not used anymore.

I also searched for "ClusterScheduler" within the whole project; no other occurrences were found in comments or test cases. Note that classes like `YarnClusterSchedulerBackend` or `MesosClusterScheduler` are not relevant.

2. Update comments about `statusUpdate` from `TaskSetManager`
`statusUpdate` has been moved to `TaskSchedulerImpl`. StatusUpdate event handling is delegated to `handleSuccessfulTask`/`handleFailedTask`.

## How was this patch tested?
N/A. Fix comments only.

Closes #23844 from seancxmao/taskscheduler-comments.

Authored-by: seancxmao <seancxmao@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-02-23 10:52:53 -06:00
Maxim Gekk d0f2fd05e1 [SPARK-26903][SQL] Remove the TimeZone cache
## What changes were proposed in this pull request?

In the PR, I propose to convert the time zone string to `TimeZone` by first converting it to `ZoneId`, which uses `ZoneOffset` internally. The `ZoneOffset` class of JDK 8 already has a cache: http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/687fd7c7986d/src/share/classes/java/time/ZoneOffset.java#l205 . In this way, there is no need for Spark to maintain its own cache of time zones.

The PR removes `computedTimeZones` from `DateTimeUtils`, and uses `ZoneId.of` to convert a time zone id string to a `ZoneId` and then to a `TimeZone`.
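The conversion path described above amounts to the following (the zone id is illustrative):

```scala
import java.time.ZoneId
import java.util.TimeZone

// ZoneId.of benefits from the JDK's internal ZoneOffset cache; the TimeZone is
// only materialized at the end.
val zoneId = ZoneId.of("America/Los_Angeles")
val tz: TimeZone = TimeZone.getTimeZone(zoneId)
```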

## How was this patch tested?

The changes were tested by

Closes #23812 from MaxGekk/timezone-cache.

Lead-authored-by: Maxim Gekk <max.gekk@gmail.com>
Co-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-02-23 09:44:22 -06:00
Hyukjin Kwon a56b3511fc [SPARK-26945][PYTHON][SS][TESTS] Fix flaky test_*_await_termination in PySpark SS tests
## What changes were proposed in this pull request?

This PR proposes to make sure all available data is processed before stopping the query and deleting the temp directory.

See https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/102518/console

```
ERROR: test_query_manager_await_termination (pyspark.sql.tests.test_streaming.StreamingTests)
----------------------------------------------------------------------
Traceback (most recent call last):
 File "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/sql/tests/test_streaming.py", line 259, in test_query_manager_await_termination
 shutil.rmtree(tmpPath)
 File "/home/anaconda/lib/python2.7/shutil.py", line 256, in rmtree
 onerror(os.rmdir, path, sys.exc_info())
 File "/home/anaconda/lib/python2.7/shutil.py", line 254, in rmtree
 os.rmdir(path)
OSError: [Errno 39] Directory not empty: '/home/jenkins/workspace/SparkPullRequestBuilder/python/target/072153bd-f981-47be-bda2-e2b657a16f65/tmp4WGp7n'
```

See https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/102311/console

```
ERROR: test_stream_await_termination (pyspark.sql.tests.test_streaming.StreamingTests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/jenkins/workspace/SparkPullRequestBuilder2/python/pyspark/sql/tests/test_streaming.py", line 202, in test_stream_await_termination
    shutil.rmtree(tmpPath)
  File "/usr/lib64/pypy-2.5.1/lib-python/2.7/shutil.py", line 256, in rmtree
    onerror(os.rmdir, path, sys.exc_info())
  File "/usr/lib64/pypy-2.5.1/lib-python/2.7/shutil.py", line 254, in rmtree
    os.rmdir(path)
OSError: [Errno 39] Directory not empty: '/home/jenkins/workspace/SparkPullRequestBuilder2/python/target/7244f4ff-6b60-4f6c-b787-de4f15922bf5/tmpQbMZSo'
```

## How was this patch tested?

Jenkins tests. I should run them multiple times to see if there are other flaky tests and whether this PR really fixes the issue.

Closes #23870 from HyukjinKwon/SPARK-26945.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2019-02-23 14:57:04 +08:00
Takeshi Yamamuro 967e4cb011 [SPARK-26215][SQL] Define reserved/non-reserved keywords based on the ANSI SQL standard
## What changes were proposed in this pull request?
This PR aims to define reserved/non-reserved keywords for Spark SQL based on the ANSI SQL standards and other database-like systems (e.g., PostgreSQL). We assume that they basically follow the ANSI SQL-2011 standard, but each differs slightly from the others. Therefore, this PR documents all the keywords in `docs/sql-reserved-and-non-reserved-key-words.md`.

NOTE: This PR only adds a small set of keywords as reserved ones, and these keywords are reserved in all the ANSI SQL standards (SQL-92, SQL-99, SQL-2003, SQL-2008, SQL-2011, and SQL-2016) and PostgreSQL. This is because there is room to discuss which keywords should be reserved or not; e.g., interval units (day, hour, minute, second, ...) are reserved in the ANSI SQL standards but not in PostgreSQL. Therefore, we need more research into other database-like systems (e.g., Oracle Database, DB2, SQL Server) in follow-up activities.

References:
 - The reserved/non-reserved SQL keywords in the ANSI SQL standards: https://developer.mimer.com/wp-content/uploads/2018/05/Standard-SQL-Reserved-Words-Summary.pdf
 - SQL Key Words in PostgreSQL: https://www.postgresql.org/docs/current/sql-keywords-appendix.html

## How was this patch tested?
Added tests in `TableIdentifierParserSuite`.

Closes #23259 from maropu/SPARK-26215-WIP.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2019-02-23 08:38:47 +09:00
Alessandro Bellina 79a650494f [SPARK-26895][CORE] prepareSubmitEnvironment should be called within doAs for proxy users
## What changes were proposed in this pull request?

`prepareSubmitEnvironment` performs globbing that will fail when a proxy user (`--proxy-user`) doesn't have permission to access the file. This is also a bug in 2.3, so we should backport the fix; currently you can't launch an application that, for instance, passes a file under `--archives` when that file is owned by the target user.

The solution is to call `prepareSubmitEnvironment` within a doAs context if proxying.
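An illustrative sketch of the doAs pattern (the helper name and structure are assumptions, not the actual SparkSubmit code): the environment preparation, including globbing, runs as the proxy user so it uses that user's permissions.

```scala
import java.security.PrivilegedExceptionAction
import org.apache.hadoop.security.UserGroupInformation

def runAsProxyUser[T](proxyUser: String)(body: => T): T = {
  val ugi = UserGroupInformation.createProxyUser(
    proxyUser, UserGroupInformation.getCurrentUser)
  ugi.doAs(new PrivilegedExceptionAction[T] {
    override def run(): T = body // e.g. the environment preparation step
  })
}
```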

## How was this patch tested?

Manual tests running with `--proxy-user` and `--archives`, before and after, showing that the globbing is successful when the resource is owned by the target user.

I've looked at writing unit tests, but I am not sure I can do that cleanly (perhaps with a custom FileSystem). Open to ideas.

Closes #23806 from abellina/SPARK-26895_prepareSubmitEnvironment_from_doAs.

Lead-authored-by: Alessandro Bellina <abellina@gmail.com>
Co-authored-by: Alessandro Bellina <abellina@yahoo-inc.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-02-22 11:15:20 -08:00
Sean Owen 95bb01282c [SPARK-26851][SQL][FOLLOWUP] Fix cachedColumnBuffers field for Scala 2.11 build
## What changes were proposed in this pull request?

Per https://github.com/apache/spark/pull/23768/files#r259083019 the last change to this line here caused the 2.11 build to fail. It's worked around by making `_cachedColumnBuffers` a field, as it was never set by callers to anything other than its default of null.

## How was this patch tested?

Existing tests.

Closes #23864 from srowen/SPARK-26851.2.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2019-02-22 15:22:52 +09:00
nandorKollar 066379783a [SPARK-26930][SQL] Tests in ParquetFilterSuite don't verify filter class
## What changes were proposed in this pull request?

Add assert to verify predicate class in ParquetFilterSuite

## How was this patch tested?

Ran ParquetFilterSuite, tests passed

Closes #23855 from nandorKollar/SPARK-26930.

Lead-authored-by: nandorKollar <nandorKollar@users.noreply.github.com>
Co-authored-by: Hyukjin Kwon <gurwls223@gmail.com>
Co-authored-by: Nandor Kollar <nkollar@cloudera.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2019-02-22 14:07:55 +08:00
Dongjoon Hyun ffef3d4074 [SPARK-26950][SQL][TEST] Make RandomDataGenerator use Float.NaN or Double.NaN for all NaN values
## What changes were proposed in this pull request?

Apache Spark uses the predefined `Float.NaN` and `Double.NaN` for NaN values, but there exist more NaN values with different binary representations.

```scala
scala> java.nio.ByteBuffer.allocate(4).putFloat(Float.NaN).array
res1: Array[Byte] = Array(127, -64, 0, 0)

scala> val x = java.lang.Float.intBitsToFloat(-6966608)
x: Float = NaN

scala> java.nio.ByteBuffer.allocate(4).putFloat(x).array
res2: Array[Byte] = Array(-1, -107, -78, -80)
```

Since users can have these values, `RandomDataGenerator` generates these NaN values. However, this causes `checkEvaluationWithUnsafeProjection` failures due to differences in the `UnsafeRow` binary representation. The following is a UT failure instance. This PR aims to fix this UT flakiness.

- https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/102528/testReport/

## How was this patch tested?

Pass the Jenkins with the newly added test cases.

Closes #23851 from dongjoon-hyun/SPARK-26950.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-02-22 12:25:26 +08:00
zhengruifeng 89d42dc6d3 [SPARK-25097][ML] Support prediction on single instance in KMeans/BiKMeans/GMM
## What changes were proposed in this pull request?
Expose the method `predict` in KMeans/BiKMeans/GMM.
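Assumed usage of the newly exposed single-instance `predict` (the exact signature below is an assumption based on this description; the data is illustrative):

```scala
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("kmeans-predict").getOrCreate()
import spark.implicits._

val training = Seq(Vectors.dense(0.0, 0.0), Vectors.dense(10.0, 10.0))
  .map(Tuple1.apply).toDF("features")
val model = new KMeans().setK(2).setSeed(1L).fit(training)

// Predict the cluster for a single vector without building a DataFrame.
val cluster: Int = model.predict(Vectors.dense(0.1, 0.1))
```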

## How was this patch tested?
Added test suites.

Closes #22087 from zhengruifeng/clu_pre_instance.

Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-02-21 22:21:28 -06:00
Maxim Gekk 1304974539 [SPARK-26955][CORE] Align Spark's TimSort to jdk11 implementation
## What changes were proposed in this pull request?

Spark's TimSort deviates from the JDK 11 TimSort in a couple of places:
- `stackLen` was increased in the JDK
- an additional case for breaking out of `mergeCollapse`: `n < 0`

In the PR, I propose to align Spark TimSort to jdk implementation.

## How was this patch tested?

By existing test suites, in particular, `SorterSuite`.

Closes #23858 from MaxGekk/timsort-java-alignment.

Authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-02-21 22:18:23 -06:00
gengjiaan f9776e3892 [MINOR][SQL] Fix typo in exception about set table properties.
## What changes were proposed in this pull request?

The documented behavior of the method `verifyTableProperties` is:

`If the given table properties contains datasource properties, throw an exception. We will do this check when create or alter a table, i.e. when we try to write table metadata to Hive metastore.`

But the AnalysisException message in verifyTableProperties contains one typo and one unsuitable word,
so I changed the exception from

`Cannot persistent ${table.qualifiedName} into hive metastore`

to

`Cannot persist ${table.qualifiedName} into Hive metastore`

## How was this patch tested?

Closes #23574 from beliefer/incorrect-analysis-exception.

Authored-by: gengjiaan <gengjiaan@360.cn>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-02-21 22:13:47 -06:00
Joseph K. Bradley be1cadf16d [SPARK-26960][ML] Wait for listener bus to clear in MLEventsSuite to reduce test flakiness
## What changes were proposed in this pull request?

This patch aims to address flakiness I've observed in MLEventsSuite in these tests:
*  test("pipeline read/write events")
*  test("pipeline model read/write events")

The issue is in the "read/write events" tests, which work as follows:
* write
* wait until we see at least 1 write-related SparkListenerEvent
* read
* wait until we see at least 1 read-related SparkListenerEvent

The problem is that the last step does NOT allow any write-related SparkListenerEvents, but some of those events may be delayed enough that they are seen in this last step. We should ideally add logic before "read" to wait until the listener events are cleared/complete. Looking into other SparkListener tests, we need to use `sc.listenerBus.waitUntilEmpty(TIMEOUT)`.

This patch adds the waitUntilEmpty() call.

## How was this patch tested?

It's a test!

Closes #23863 from jkbradley/SPARK-26960.

Authored-by: Joseph K. Bradley <joseph@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2019-02-22 10:08:16 +08:00
Dongjoon Hyun 6bd995b101 [SPARK-26958][SQL][TEST] Add NestedSchemaPruningBenchmark
## What changes were proposed in this pull request?

This adds `NestedSchemaPruningBenchmark` to show nested schema pruning performance clearly, to verify a new PR's performance benefit, and to prevent future performance regressions.

## How was this patch tested?

Manually run the benchmark.

```
SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain org.apache.spark.sql.execution.benchmark.NestedSchemaPruningBenchmark"
```

Closes #23862 from dongjoon-hyun/SPARK-NESTED-SCHEMA-PRUNING-BM.

Lead-authored-by: Dongjoon Hyun <dhyun@apple.com>
Co-authored-by: DB Tsai <d_tsai@apple.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
2019-02-21 23:39:36 +00:00
Liang-Chi Hsieh 91caf0bfce [DOCS] MINOR Complement the document of stringOrderType for StringIndexer in PySpark
## What changes were proposed in this pull request?

We revised the behavior of the param `stringOrderType` of `StringIndexer` for the case of equal frequencies under frequencyDesc/Asc. This isn't reflected in PySpark's documentation. We should do so.

## How was this patch tested?

Only document change.

Closes #23849 from viirya/py-stringindexer-doc.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Holden Karau <holden@pigscanfly.ca>
2019-02-21 08:36:48 -08:00
Dave DeCaprio 17d0cfcaa4 [SPARK-26917][SQL] Cache lock recache by condition
## What changes were proposed in this pull request?

Related to SPARK-26617 and SPARK-26548. We found another location where we were still seeing the locks and traced it to the recacheByCondition function. In this PR I have changed that function so that the writeLock is not held while the condition is being evaluated.

cloud-fan & gatorsmile This is a further tweak to the other cache PRs we have done (which have helped us tremendously).

## How was this patch tested?

Has been tested on a live system where the blocking was causing major issues and it is working well.
CacheManager has no explicit unit test but is used in many places internally as part of the SharedState.

Closes #23833 from DaveDeCaprio/cache-lock-recacheByCondition.

Lead-authored-by: Dave DeCaprio <daved@alum.mit.edu>
Co-authored-by: David DeCaprio <daved@alum.mit.edu>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-02-21 09:04:50 -06:00
Felix Cheung 927081dd95 [R] update package description
## What changes were proposed in this pull request?

update package description

Closes #23852 from felixcheung/rdesccran.

Authored-by: Felix Cheung <felixcheung_m@hotmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2019-02-21 19:00:36 +08:00
Shixiong Zhu 77b99af573 [SPARK-26824][SS] Fix the checkpoint location and _spark_metadata when it contains special chars
## What changes were proposed in this pull request?

When a user specifies a checkpoint location or a file sink output using a path containing special chars that need to be escaped, the streaming query will store the checkpoint and file sink metadata in the wrong place. In this PR, I uploaded a checkpoint generated by the following code using Spark 2.4.0 to show this issue:

```
implicit val s = spark.sqlContext
val input = org.apache.spark.sql.execution.streaming.MemoryStream[Int]
input.addData(1, 2, 3)
val q = input.toDF.writeStream.format("parquet").option("checkpointLocation", ".../chk %#chk").start(".../output %#output")
q.stop()
```
Here is the structure of the directory:
```
sql/core/src/test/resources/structured-streaming/escaped-path-2.4.0
├── chk%252520%252525%252523chk
│   ├── commits
│   │   └── 0
│   ├── metadata
│   └── offsets
│       └── 0
├── output %#output
│   └── part-00000-97f675a2-bb82-4201-8245-05f3dae4c372-c000.snappy.parquet
└── output%20%25%23output
    └── _spark_metadata
        └── 0
```

In this checkpoint, the user specified checkpoint location is `.../chk %#chk` but the real path to store the checkpoint is `.../chk%252520%252525%252523chk` (this is generated by escaping the original path three times). The user specified output path is `.../output %#output` but the path to store `_spark_metadata` is `.../output%20%25%23output/_spark_metadata` (this is generated by escaping the original path once). The data files are still in the correct path (such as `.../output %#output/part-00000-97f675a2-bb82-4201-8245-05f3dae4c372-c000.snappy.parquet`).

This checkpoint will be used in unit tests in this PR.

The fix is just simply removing improper `Path.toUri` calls to fix the issue.

However, as the user may not read the release notes and may not be aware of this checkpoint location change, if they upgrade Spark without moving the checkpoint to the new location, their query will just start from scratch. In order to not surprise the users, this PR also adds a check to **detect the impacted paths and throw an error** that includes the migration guide. This check can be turned off by an internal SQL conf `spark.sql.streaming.checkpoint.escapedPathCheck.enabled`. Here are examples of errors that will be reported:

- Streaming checkpoint error:
```
Error: we detected a possible problem with the location of your checkpoint and you
likely need to move it before restarting this query.

Earlier version of Spark incorrectly escaped paths when writing out checkpoints for
structured streaming. While this was corrected in Spark 3.0, it appears that your
query was started using an earlier version that incorrectly handled the checkpoint
path.

Correct Checkpoint Directory: /.../chk %#chk
Incorrect Checkpoint Directory: /.../chk%252520%252525%252523chk

Please move the data from the incorrect directory to the correct one, delete the
incorrect directory, and then restart this query. If you believe you are receiving
this message in error, you can disable it with the SQL conf
spark.sql.streaming.checkpoint.escapedPathCheck.enabled.
```

- File sink error (`_spark_metadata`):
```
Error: we detected a possible problem with the location of your "_spark_metadata"
directory and you likely need to move it before restarting this query.

Earlier version of Spark incorrectly escaped paths when writing out the
"_spark_metadata" directory for structured streaming. While this was corrected in
Spark 3.0, it appears that your query was started using an earlier version that
incorrectly handled the "_spark_metadata" path.

Correct "_spark_metadata" Directory: /.../output %#output/_spark_metadata
Incorrect "_spark_metadata" Directory: /.../output%20%25%23output/_spark_metadata

Please move the data from the incorrect directory to the correct one, delete the
incorrect directory, and then restart this query. If you believe you are receiving
this message in error, you can disable it with the SQL conf
spark.sql.streaming.checkpoint.escapedPathCheck.enabled.
```

## How was this patch tested?

The new unit tests.

Closes #23733 from zsxwing/path-fix.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
2019-02-20 15:44:20 -08:00
liupengcheng 2153b316bd [SPARK-26892][CORE] Fix saveAsTextFile throws NullPointerException when null row present
## What changes were proposed in this pull request?

Currently, RDD.saveAsTextFile may throw a NullPointerException when a null row is present.
```
scala> sc.parallelize(Seq(1,null),1).saveAsTextFile("/tmp/foobar.dat")
19/02/15 21:39:17 ERROR Utils: Aborting task
java.lang.NullPointerException
at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$3(RDD.scala:1510)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$executeTask$1(SparkHadoopWriter.scala:129)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1352)
at org.apache.spark.internal.io.SparkHadoopWriter$.executeTask(SparkHadoopWriter.scala:127)
at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$write$1(SparkHadoopWriter.scala:83)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:425)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1318)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:428)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```

This PR writes "Null" for null rows to avoid the NPE and fix the issue.

## How was this patch tested?

NA

Closes #23799 from liupc/Fix-saveAsTextFile-throws-NullPointerException-when-null-row-present.

Lead-authored-by: liupengcheng <liupengcheng@xiaomi.com>
Co-authored-by: Liupengcheng <liupengcheng@xiaomi.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-02-20 16:42:55 -06:00
Liupengcheng eb6fd7eab7 [SPARK-26877][YARN] Support user-level app staging directory in yarn mode when spark.yarn…
Currently, when running applications in yarn mode, the app staging directory is controlled by the `spark.yarn.stagingDir` config if specified, and this directory does not separate different users, which is sometimes inconvenient for file and quota management.

Sometimes there might be an unexpected increase in staging files; two possible reasons are:
1. The `spark.yarn.preserve.staging.files` setting can be misused by users.
2. A cron task constantly starting new applications on a non-existent yarn queue (wrong configuration).

But right now it is not easy to find out which user owns the most HDFS files or space.
What's more, it is impossible to set an HDFS name quota or space quota per user to limit the growth.

So I propose to add per-user subdirectories under this app staging directory, which is clearer.

existing UT

Closes #23786 from liupc/Support-user-level-app-staging-dir.

Authored-by: Liupengcheng <liupengcheng@xiaomi.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-02-20 11:45:17 -08:00
Maxim Gekk 331ac60f28 [SPARK-26900][SQL] Simplify truncation to quarter of year
## What changes were proposed in this pull request?

In the PR, I propose to simplify timestamp truncation to quarter of year by using the *java.time* API directly. A `LocalDate` instance can be truncated to the quarter by adjusting it with the chrono field `IsoFields.DAY_OF_QUARTER`.
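The java.time adjustment in question looks like this (the date is illustrative):

```scala
import java.time.LocalDate
import java.time.temporal.IsoFields

val date = LocalDate.of(2019, 2, 20)
// Setting DAY_OF_QUARTER to 1 truncates the date to the start of its quarter.
val quarterStart = date.`with`(IsoFields.DAY_OF_QUARTER, 1L) // 2019-01-01
```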

## How was this patch tested?

This was checked by existing test suite - `DateTimeUtilsSuite`.

Closes #23808 from MaxGekk/date-quarter-of-year.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-02-20 08:55:08 -06:00
Huaxin Gao 74e9e1c192 [SPARK-22798][PYTHON][ML] Add multiple column support to PySpark StringIndexer
## What changes were proposed in this pull request?

Add multiple column support to PySpark StringIndexer

## How was this patch tested?

Add doctest

Closes #23741 from huaxingao/spark-22798.

Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-02-20 08:52:46 -06:00
Ivan Vergiliev 096552ae4d [SPARK-26859][SQL] Fix field writer index bug in non-vectorized ORC deserializer
## What changes were proposed in this pull request?

This happens in a schema evolution use case only when a user specifies the schema manually and uses the non-vectorized ORC deserializer code path.

There is a bug in `OrcDeserializer.scala` that results in `null`s being set at the wrong column position, and for state from previous records to remain uncleared in next records. There are more details for when exactly the bug gets triggered and what the outcome is in the [JIRA issue](https://jira.apache.org/jira/browse/SPARK-26859).

The high-level summary is that this bug results in severe data correctness issues, but fortunately the set of conditions to expose the bug are complicated and make the surface area somewhat small.

This change fixes the problem and adds a respective test.

## How was this patch tested?

Pass the Jenkins with the newly added test cases.

Closes #23766 from IvanVergiliev/fix-orc-deserializer.

Lead-authored-by: Ivan Vergiliev <ivan.vergiliev@gmail.com>
Co-authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-02-20 21:49:38 +08:00
Dongjoon Hyun f87153a3ac [SPARK-26916][SS] Upgrade to Kafka 2.1.1
## What changes were proposed in this pull request?

As a part of preparing the official JDK 11 support ([SPARK-24417](https://issues.apache.org/jira/browse/SPARK-24417)), Spark 3.0.0 upgraded the Kafka version to 2.1.0. This PR updates the Kafka dependency to 2.1.1 to bring in the following 42 bug fixes.
- https://issues.apache.org/jira/projects/KAFKA/versions/12344250

## How was this patch tested?

Pass the Jenkins with the existing tests.

Closes #23831 from dongjoon-hyun/SPARK-26916.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-02-19 20:29:11 -08:00
Hyukjin Kwon 3c15d8b71c [SPARK-26762][SQL][R] Arrow optimization for conversion from Spark DataFrame to R DataFrame
## What changes were proposed in this pull request?

This PR aims to support Arrow optimization for conversion from a Spark DataFrame to an R DataFrame.
Like the PySpark side, it falls back to the non-optimized code path when it's unable to use Arrow optimization.

This can be tested as below:

```bash
$ ./bin/sparkR --conf spark.sql.execution.arrow.enabled=true
```

```r
collect(createDataFrame(mtcars))
```

### Requirements
  - R 3.5.x
  - Arrow package 0.12+
    ```bash
    Rscript -e 'remotes::install_github("apache/arrow@apache-arrow-0.12.0", subdir = "r")'
    ```

**Note:** currently, the Arrow R package is not on CRAN. Please take a look at ARROW-3204.
**Note:** currently, the Arrow R package does not seem to support Windows. Please take a look at ARROW-3204.

### Benchmarks

**Shell**

```bash
sync && sudo purge
./bin/sparkR --conf spark.sql.execution.arrow.enabled=false --driver-memory 4g
```

```bash
sync && sudo purge
./bin/sparkR --conf spark.sql.execution.arrow.enabled=true --driver-memory 4g
```

**R code**

```r
df <- cache(createDataFrame(read.csv("500000.csv")))
count(df)

test <- function() {
  options(digits.secs = 6) # milliseconds
  start.time <- Sys.time()
  collect(df)
  end.time <- Sys.time()
  time.taken <- end.time - start.time
  print(time.taken)
}

test()
```

**Data (350 MB):**

```r
object.size(read.csv("500000.csv"))
350379504 bytes
```

"500000 Records"  http://eforexcel.com/wp/downloads-16-sample-csv-files-data-sets-for-testing/

**Results**

Arrow optimization disabled (`spark.sql.execution.arrow.enabled=false`):

```
Time difference of 221.32014 secs
```

Arrow optimization enabled (`spark.sql.execution.arrow.enabled=true`):

```
Time difference of 15.51145 secs
```

The performance improvement was around **1426%** (roughly a 14x speedup).

### Limitations:

- For now, Arrow optimization with R does not support cases where the data is `raw` or where the user explicitly gives a float type in the schema; these produce corrupt values. In this case, we decided to fall back to the non-optimized code path.

- Due to ARROW-4512, it cannot send and receive data batch by batch. It has to send all batches in the Arrow stream format at once. This needs improvement later.

## How was this patch tested?

Existing tests related to Arrow optimization cover this change. Also manually tested.

Closes #23760 from HyukjinKwon/SPARK-26762.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2019-02-20 11:35:17 +08:00
Hyukjin Kwon ab850c02f7 [SPARK-26901][SQL][R] Adds child's output into references to avoid column-pruning for vectorized gapply()
## What changes were proposed in this pull request?

Currently, it looks like column pruning is applied to vectorized `gapply()`. Given that the R native function could use all referenced fields, they shouldn't be pruned. To avoid this, this PR adds the child's output into `references`, like `OutputConsumer`.

```
$ ./bin/sparkR --conf spark.sql.execution.arrow.enabled=true
```

```r
df <- createDataFrame(mtcars)
explain(count(groupBy(gapply(df,
                             "gear",
                             function(key, group) {
                               data.frame(gear = key[[1]], disp = mean(group$disp))
                             },
                             structType("gear double, disp double")))), TRUE)
```

**Before:**

```
== Optimized Logical Plan ==
Aggregate [count(1) AS count#41L]
+- Project
   +- FlatMapGroupsInRWithArrow [...]
      +- Project [gear#9]
         +- LogicalRDD [mpg#0, cyl#1, disp#2, hp#3, drat#4, wt#5, qsec#6, vs#7, am#8, gear#9, carb#10], false

== Physical Plan ==
*(4) HashAggregate(keys=[], functions=[count(1)], output=[count#41L])
+- Exchange SinglePartition
   +- *(3) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#44L])
      +- *(3) Project
         +- FlatMapGroupsInRWithArrow [...]
            +- *(2) Sort [gear#9 ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(gear#9, 200)
                  +- *(1) Project [gear#9]
                     +- *(1) Scan ExistingRDD arrow[mpg#0,cyl#1,disp#2,hp#3,drat#4,wt#5,qsec#6,vs#7,am#8,gear#9,carb#10]
```

**After:**

```
== Optimized Logical Plan ==
Aggregate [count(1) AS count#91L]
+- Project
   +- FlatMapGroupsInRWithArrow [...]
      +- LogicalRDD [mpg#0, cyl#1, disp#2, hp#3, drat#4, wt#5, qsec#6, vs#7, am#8, gear#9, carb#10], false

== Physical Plan ==
*(4) HashAggregate(keys=[], functions=[count(1)], output=[count#91L])
+- Exchange SinglePartition
   +- *(3) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#94L])
      +- *(3) Project
         +- FlatMapGroupsInRWithArrow [...]
            +- *(2) Sort [gear#9 ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(gear#9, 200)
                  +- *(1) Scan ExistingRDD arrow[mpg#0,cyl#1,disp#2,hp#3,drat#4,wt#5,qsec#6,vs#7,am#8,gear#9,carb#10]
```

Currently, it produces corrupt values for missing columns (by passing pruned columnar batches to Arrow writers that require the non-pruned columns), such as:

```r
...
  c(7.90505033345994e-323, 7.90505033345994e-323, 7.90505033345994e-323, 7.90505033345994e-323, 7.90505033345994e-323, 7.90505033345994e-323, 7.90505033345994e-323, 7.90505033345994e-323, 7.90505033345994e-323, 7.90505033345994e-323, 7.90505033345994e-323, 7.90505033345994e-323, 0, 0, 4.17777978645388e-314)
  c(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1.04669129845114e+219)
  c(3.4482690635875e-313, 3.4482690635875e-313, 3.4482690635875e-313,
  c(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2.47032822920623e-323)
...
```

which should be something like:

```r
...
  c(4, 4, 1, 2, 2, 4, 4, 1, 2, 1, 1, 2)
  c(26, 30.4, 15.8, 19.7, 15)
  c(4, 4, 8, 6, 8)
  c(120.3, 95.1, 351, 145, 301)
...
```

## How was this patch tested?

Manually tested, and unit tests were added.

The test code is basically:

```r
df <- createDataFrame(mtcars)
count(gapply(df,
             c("gear"),
             function(key, group) {
                stopifnot(all(group$hp > 50))
                group
             },
             schema(df)))
```

All of `mtcars`'s hp values are more than 50.

```r
> mtcars$hp > 50
 [1] TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE
[16] TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE
[31] TRUE TRUE
```

However, due to corrupt values (like 0 or 7.xxxxx), weird values were found, so it currently fails as below on master:

```
Error in handleErrors(returnStatus, conn) :
  org.apache.spark.SparkException: Job aborted due to stage failure: Task 82 in stage 1.0 failed 1 times, most recent failure: Lost task 82.0 in stage 1.0 (TID 198, localhost, executor driver): org.apache.spark.SparkException: R worker exited unexpectedly (crashed)
 Error in computeFunc(key, inputData) : all(group$hp > 50) is not TRUE
Error in computeFunc(key, inputData) : all(group$hp > 50) is not TRUE
Error in computeFunc(key, inputData) : all(group$hp > 50) is not TRUE
```

I also compared the total length while I was here. Regular `gapply` without Arrow has some holes, so I had to compare the results with the R data frame.

Closes #23810 from HyukjinKwon/SPARK-26901.

Lead-authored-by: Hyukjin Kwon <gurwls223@apache.org>
Co-authored-by: Hyukjin Kwon <gurwls223@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-02-20 10:24:40 +08:00
Marcelo Vanzin 61c3cdc706 [SPARK-24894][K8S] Make sure valid host names are created for executors.
Since the host name is derived from the app name, which can contain arbitrary
characters, it needs to be sanitized so that only valid characters are allowed.

On top of that, take extra care that truncation doesn't leave characters that
are valid except at the start of a host name.
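An illustrative sketch of that kind of sanitization (not the actual Kubernetes module code; the function name and limit below are assumptions): keep only characters valid in a DNS label, and make sure truncation does not leave an invalid leading or trailing character.

```scala
def sanitizeHostName(appName: String, maxLen: Int = 63): String = {
  val cleaned = appName.toLowerCase
    .replaceAll("[^a-z0-9-]", "-")  // only lowercase alphanumerics and '-'
    .replaceAll("^-+", "")          // must not start with '-'
  cleaned.take(maxLen).replaceAll("-+$", "") // must not end with '-' after truncation
}

// sanitizeHostName("My Spark_App!") == "my-spark-app"
```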

Closes #23781 from vanzin/SPARK-24894.

Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-02-19 15:19:59 -08:00
Holden Karau 6b3c832dac [SPARK-26882] Check the Kubernetes integration tests scalastyle
## What changes were proposed in this pull request?

Add the kubernetes integration tests to the scalastyle profiles.

## How was this patch tested?

Run ./dev/scalastyle with a bad change manually

## Follow on work

See SPARK-26898 to add scalastyle for k8s integration to the CI

Closes #23792 from holdenk/SPARK-26882-check-k8s-integration-tests-when-linting.

Authored-by: Holden Karau <holden@pigscanfly.ca>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-02-19 13:49:47 -08:00
“attilapiros” e4e4e2b842 [SPARK-26891][YARN] Fixing flaky test in YarnSchedulerBackendSuite
The test "RequestExecutors reflects node blacklist and is serializable" is flaky because of multi threaded access of the mock task scheduler. For details check [Mockito FAQ (occasional exceptions like: WrongTypeOfReturnValue)](https://github.com/mockito/mockito/wiki/FAQ#is-mockito-thread-safe). So instead of mocking the task scheduler in the test TaskSchedulerImpl is simply subclassed.

This multithreaded access of the `nodeBlacklist()` method is coming from:
1) the unit test thread via calling of the method `prepareRequestExecutors()`
2) the `DriverEndpoint.onStart` which runs a periodic task that ends up calling this method

Existing unit test.

Closes #23801 from attilapiros/SPARK-26891.

Authored-by: “attilapiros” <piros.attila.zsolt@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-02-19 13:29:42 -08:00