## What changes were proposed in this pull request?
`UnsafeKVExternalSorter` uses `UnsafeInMemorySorter` to sort the records of `BytesToBytesMap` if it is given a map.
Currently we use the number of keys in `BytesToBytesMap` to determine whether the array used for sorting is large enough. We have an assert that is meant to ensure this: `map.numKeys() <= map.getArray().size() / 2`.
However, each record in the map takes two entries in the array, one for the record pointer and another for the key prefix. So the correct assert should be `map.numKeys() * 2 <= map.getArray().size() / 2`.
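As a worked illustration, the corrected check can be expressed as a standalone predicate (the helper name and parameters are illustrative, not the actual `UnsafeKVExternalSorter` code):
```scala
// Each record occupies two entries in the point array (record pointer + key prefix),
// so the number of keys must be doubled before comparing against the usable half of
// the array, as in the corrected assert above.
def pointArrayHasEnoughSpace(numKeys: Long, pointArraySize: Long): Boolean =
  numKeys * 2 <= pointArraySize / 2
```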
## How was this patch tested?
N/A
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#16232 from viirya/SPARK-18800-fix-UnsafeKVExternalSorter.
## What changes were proposed in this pull request?
Statistics in LogicalPlan should use attributes to refer to columns rather than column names, because two columns from two relations can have the same column name. However, CatalogTable has no concept of attributes, nor of the broadcast hint that Statistics carries. Therefore, putting Statistics in CatalogTable is confusing.
We define a separate statistics structure in CatalogTable, which is only responsible for interacting with the metastore and is converted to the Statistics used in LogicalPlan when needed.
## How was this patch tested?
add test cases
Author: wangzhenhua <wangzhenhua@huawei.com>
Author: Zhenhua Wang <wzh_zju@163.com>
Closes#16323 from wzhfy/nameToAttr.
## What changes were proposed in this pull request?
The time complexity of ConcurrentHashMap's `remove` is O(1), while removing from a `ConcurrentLinkedQueue` requires a linear scan. Changing the type of ContextCleaner.referenceBuffer from `ConcurrentLinkedQueue` to a collection backed by `ConcurrentHashMap` makes the removal much faster.
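A minimal sketch of the idea, using a Java `Set` view backed by `ConcurrentHashMap`; the `CleanupRef` type and field name are illustrative, not the actual ContextCleaner code:
```scala
import java.util.Collections
import java.util.concurrent.ConcurrentHashMap

// Illustrative stand-in for the weak-reference wrapper kept by ContextCleaner.
class CleanupRef

// A concurrent set backed by ConcurrentHashMap: add/remove are O(1), whereas
// ConcurrentLinkedQueue.remove must scan the whole queue.
val referenceBuffer: java.util.Set[CleanupRef] =
  Collections.newSetFromMap(new ConcurrentHashMap[CleanupRef, java.lang.Boolean])

val ref = new CleanupRef
referenceBuffer.add(ref)     // O(1)
referenceBuffer.remove(ref)  // O(1)
```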
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#16390 from zsxwing/SPARK-18991.
fails on big endian. Only change byte order on little endian
## What changes were proposed in this pull request?
Fix test to only change byte order on LE platforms
## How was this patch tested?
Test run on Big Endian and Little Endian platforms
Author: Pete Robbins <robbinspg@gmail.com>
Closes#16375 from robbinspg/SPARK-18963.
## What changes were proposed in this pull request?
It would make it easier to integrate with other components expecting a row-based JSON format.
This replaces the non-public toJSON RDD API.
## How was this patch tested?
manual, unit tests
Author: Felix Cheung <felixcheung_m@hotmail.com>
Closes#16368 from felixcheung/rJSON.
## What changes were proposed in this pull request?
Right now the names of the threads created by Netty for Spark RPC are `shuffle-client-**` and `shuffle-server-**`, which is pretty confusing.
This PR just uses the module name in TransportConf to set the thread name. In addition, it also includes the following minor fixes:
- TransportChannelHandler.channelActive and channelInactive should call the corresponding super methods.
- Make ShuffleBlockFetcherIterator throw NoSuchElementException if it has no more elements. Otherwise, if the caller calls `next` without `hasNext`, it will just hang.
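A minimal sketch of the iterator contract being fixed (illustrative only, not the actual `ShuffleBlockFetcherIterator` code):
```scala
// An iterator whose next() fails fast instead of blocking forever when the caller
// forgets to check hasNext first.
class SafeResultIterator[T](underlying: Iterator[T]) extends Iterator[T] {
  override def hasNext: Boolean = underlying.hasNext
  override def next(): T = {
    if (!hasNext) {
      // Fail fast rather than waiting on a result queue that will never be filled.
      throw new NoSuchElementException("No more fetched blocks")
    }
    underlying.next()
  }
}
```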
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#16380 from zsxwing/SPARK-18972.
## What changes were proposed in this pull request?
Add missing InterfaceStability.Evolving for Structured Streaming APIs
## How was this patch tested?
Compiling the codes.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#16385 from zsxwing/SPARK-18985.
## What changes were proposed in this pull request?
This PR continues the work of #16000 and completes #15904.
**Description**
- Augment the `org.apache.spark.status.api.v1` package for serving streaming information.
- Retrieve the streaming information through StreamingJobProgressListener.
> this api should cover exactly the same amount of information as you can get from the web interface
> the implementation is based on the current REST implementation of spark-core
> and will be available for running applications only
>
> https://issues.apache.org/jira/browse/SPARK-18537
## How was this patch tested?
Local test.
Author: saturday_s <shi.indetail@gmail.com>
Author: Chan Chor Pang <ChorPang.Chan@access-company.com>
Author: peterCPChan <universknight@gmail.com>
Closes#16253 from saturday-shi/SPARK-18537.
## What changes were proposed in this pull request?
Currently we can add a customized SparkListener through the `SparkContext#addListener` API, but there is no equivalent API to remove a registered listener. In our scenario, SparkListeners are added repeatedly as the environment changes; without the ability to remove listeners, the number of registered listeners keeps growing, which is unnecessary and can potentially affect performance. This PR therefore proposes adding an API to remove a registered listener.
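A minimal sketch of the intended usage, assuming the new method is exposed as `removeSparkListener` next to the existing `addSparkListener` on `SparkContext`:
```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}

val sc = new SparkContext(new SparkConf().setAppName("listener-demo").setMaster("local[*]"))

val listener = new SparkListener {
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    println(s"Job ${jobEnd.jobId} finished")
}

sc.addSparkListener(listener)     // register as before
sc.parallelize(1 to 10).count()
sc.removeSparkListener(listener)  // unregister once it is no longer needed
sc.stop()
```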
## How was this patch tested?
Added a unit test to verify it.
Author: jerryshao <sshao@hortonworks.com>
Closes#16382 from jerryshao/SPARK-18975.
## What changes were proposed in this pull request?
SortPartitions and RedistributeData logical operators are not actually used and can be removed. Note that we do have a Sort operator (with global flag false) that subsumed SortPartitions.
## How was this patch tested?
Also updated test cases to reflect the removal.
Author: Reynold Xin <rxin@databricks.com>
Closes#16381 from rxin/SPARK-18973.
## What changes were proposed in this pull request?
This PR cleans up duplicated file-path checks in the implemented data sources and prevents the ORC data source from attempting to list files twice.
https://github.com/apache/spark/pull/14585 handles the problem of partition column names containing `_`, and that issue itself is resolved correctly. However, it seems the data sources implementing `FileFormat` validate the paths redundantly. Judging from the comment in `CSVFileFormat`, `// TODO: Move filtering.`, I guess we don't have to perform this check twice.
Currently, this filtering seems to happen in `PartitioningAwareFileIndex.shouldFilterOut` and `PartitioningAwareFileIndex.isDataPath`, so `FileFormat.inferSchema` will always receive leaf files. For example, running the code below:
``` scala
spark.range(10).withColumn("_locality_code", $"id").write.partitionBy("_locality_code").save("/tmp/parquet")
spark.read.parquet("/tmp/parquet")
```
passes the paths below, which are only valid data files with no directories,
``` bash
/tmp/parquet/_col=0/part-r-00000-094a8efa-bece-4b50-b54c-7918d1f7b3f8.snappy.parquet
/tmp/parquet/_col=1/part-r-00000-094a8efa-bece-4b50-b54c-7918d1f7b3f8.snappy.parquet
/tmp/parquet/_col=2/part-r-00000-25de2b50-225a-4bcf-a2bc-9eb9ed407ef6.snappy.parquet
...
```
to `FileFormat.inferSchema`.
## How was this patch tested?
Unit test added in `HadoopFsRelationTest` and related existing tests.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#14627 from HyukjinKwon/SPARK-16975.
## What changes were proposed in this pull request?
There are several tests failing due to resource-closing-related and path-related problems on Windows as below.
- `SQLQuerySuite`:
```
- specifying database name for a temporary table is not allowed *** FAILED *** (125 milliseconds)
org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark arget mpspark-1f4471ab-aac0-4239-ae35-833d54b37e52;
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:382)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:370)
```
- `JsonSuite`:
```
- Loading a JSON dataset from a text file with SQL *** FAILED *** (94 milliseconds)
org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark arget mpspark-c918a8b7-fc09-433c-b9d0-36c0f78ae918;
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:382)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:370)
```
- `StateStoreSuite`:
```
- SPARK-18342: commit fails when rename fails *** FAILED *** (16 milliseconds)
java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: StateStoreSuite29777261fs://C:%5Cprojects%5Cspark%5Ctarget%5Ctmp%5Cspark-ef349862-7281-4963-aaf3-add0d670a4ad%5C?????-2218c2f8-2cf6-4f80-9cdf-96354e8246a77685899733421033312/0
at org.apache.hadoop.fs.Path.initialize(Path.java:206)
at org.apache.hadoop.fs.Path.<init>(Path.java:116)
at org.apache.hadoop.fs.Path.<init>(Path.java:89)
...
Cause: java.net.URISyntaxException: Relative path in absolute URI: StateStoreSuite29777261fs://C:%5Cprojects%5Cspark%5Ctarget%5Ctmp%5Cspark-ef349862-7281-4963-aaf3-add0d670a4ad%5C?????-2218c2f8-2cf6-4f80-9cdf-96354e8246a77685899733421033312/0
at java.net.URI.checkPath(URI.java:1823)
at java.net.URI.<init>(URI.java:745)
at org.apache.hadoop.fs.Path.initialize(Path.java:203)
```
- `HDFSMetadataLogSuite`:
```
- FileManager: FileContextManager *** FAILED *** (94 milliseconds)
java.io.IOException: Failed to delete: C:\projects\spark\target\tmp\spark-415bb0bd-396b-444d-be82-04599e025f21
at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1010)
at org.apache.spark.sql.test.SQLTestUtils$class.withTempDir(SQLTestUtils.scala:127)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLogSuite.withTempDir(HDFSMetadataLogSuite.scala:38)
- FileManager: FileSystemManager *** FAILED *** (78 milliseconds)
java.io.IOException: Failed to delete: C:\projects\spark\target\tmp\spark-ef8222cd-85aa-47c0-a396-bc7979e15088
at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1010)
at org.apache.spark.sql.test.SQLTestUtils$class.withTempDir(SQLTestUtils.scala:127)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLogSuite.withTempDir(HDFSMetadataLogSuite.scala:38)
```
In addition, some tests fail due to the command-length limitation of cmd on Windows, as below:
- `LauncherBackendSuite`:
```
- local: launcher handle *** FAILED *** (30 seconds, 120 milliseconds)
The code passed to eventually never returned normally. Attempted 283 times over 30.0960053 seconds. Last failure message: The reference was null. (LauncherBackendSuite.scala:56)
org.scalatest.exceptions.TestFailedDueToTimeoutException:
at org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:420)
at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:438)
- standalone/client: launcher handle *** FAILED *** (30 seconds, 47 milliseconds)
The code passed to eventually never returned normally. Attempted 282 times over 30.037987100000002 seconds. Last failure message: The reference was null. (LauncherBackendSuite.scala:56)
org.scalatest.exceptions.TestFailedDueToTimeoutException:
at org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:420)
at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:438)
```
The executed command, https://gist.github.com/HyukjinKwon/d3fdd2e694e5c022992838a618a516bd, is about 16K characters long; however, the limit on Windows is 8K, so the launch fails.
This PR proposes to fix the test failures on Windows and to skip the tests that fail due to the length limitation.
## How was this patch tested?
Manually tested via AppVeyor
**Before**
`SQLQuerySuite`: https://ci.appveyor.com/project/spark-test/spark/build/306-pr-references
`JsonSuite`: https://ci.appveyor.com/project/spark-test/spark/build/307-pr-references
`StateStoreSuite`: https://ci.appveyor.com/project/spark-test/spark/build/305-pr-references
`HDFSMetadataLogSuite`: https://ci.appveyor.com/project/spark-test/spark/build/304-pr-references
`LauncherBackendSuite`: https://ci.appveyor.com/project/spark-test/spark/build/303-pr-references
**After**
`SQLQuerySuite`: https://ci.appveyor.com/project/spark-test/spark/build/293-SQLQuerySuite
`JsonSuite`: https://ci.appveyor.com/project/spark-test/spark/build/294-JsonSuite
`StateStoreSuite`: https://ci.appveyor.com/project/spark-test/spark/build/297-StateStoreSuite
`HDFSMetadataLogSuite`: https://ci.appveyor.com/project/spark-test/spark/build/319-pr-references
`LauncherBackendSuite`: failed test skipped.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#16335 from HyukjinKwon/more-fixes-on-windows.
## What changes were proposed in this pull request?
Starting with Spark 2.1.0, the bucketing feature is available for all file-based data sources. This patch fixes some function docs that haven't yet been updated to reflect that.
## How was this patch tested?
N/A
Author: Reynold Xin <rxin@databricks.com>
Closes#16349 from rxin/ds-doc.
## What changes were proposed in this pull request?
This patch includes minor changes to improve readability of the partition handling code. While implementing a new feature, I found some of the naming and implicit type inference not as intuitive as it could be.
## How was this patch tested?
This patch should have no semantic change and the changes should be covered by existing test cases.
Author: Reynold Xin <rxin@databricks.com>
Closes#16378 from rxin/minor-fix.
## What changes were proposed in this pull request?
This PR audits places using `logicalPlan` in StreamExecution and ensures they all handle the case where `logicalPlan` cannot be created.
This PR also fixes the following issues in `StreamingQueryException`:
- `StreamingQueryException` and `StreamExecution` are cyclically dependent: `StreamingQueryException`'s constructor calls `StreamExecution`'s `toDebugString`, which in turn uses the `StreamingQueryException`. Hence it outputs a `null` value in the error message.
- The stack trace is duplicated when calling `Throwable.printStackTrace`, because `StreamingQueryException`'s `toString` contains the stack trace.
## How was this patch tested?
The updated `test("max files per trigger - incorrect values")`. I found this issue when I switched from `testStream` to the real code to verify the failure in this test.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#16322 from zsxwing/SPARK-18907.
## What changes were proposed in this pull request?
make-distribution.sh should find JAVA_HOME for Ubuntu, Mac and other non-RHEL systems
## How was this patch tested?
Manually
Author: Felix Cheung <felixcheung_m@hotmail.com>
Closes#16363 from felixcheung/buildjava.
## What changes were proposed in this pull request?
API for SparkUI URL from SparkContext
## How was this patch tested?
manual, unit tests
Author: Felix Cheung <felixcheung_m@hotmail.com>
Closes#16367 from felixcheung/rwebui.
## What changes were proposed in this pull request?
This PR fixes a `NullPointerException` caused by the following `limit + aggregate` query:
```
scala> val df = Seq(("a", 1), ("b", 2), ("c", 1), ("d", 5)).toDF("id", "value")
scala> df.limit(2).groupBy("id").count().show
WARN TaskSetManager: Lost task 0.0 in stage 9.0 (TID 8204, lvsp20hdn012.stubprod.com): java.lang.NullPointerException
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
```
The root culprit is that [`$doAgg()`](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L596) skips an initialization of [the buffer iterator](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L603); `BaseLimitExec` sets `stopEarly=true` and `$doAgg()` exits in the middle without the initialization.
## How was this patch tested?
Added a test to check if no exception happens for limit + aggregates in `DataFrameAggregateSuite.scala`.
Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>
Closes#15980 from maropu/SPARK-18528.
## What changes were proposed in this pull request?
Made update mode public. As part of that here are the changes.
- Update `DataStreamWriter` to accept "update" (a minimal usage sketch follows this list)
- Changed package of InternalOutputModes from o.a.s.sql to o.a.s.sql.catalyst
- Added watermark-based state removal for update mode to StateStoreSaveExec
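A minimal usage sketch of the now-public update mode, using a socket source and console sink purely for illustration:
```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("update-mode").getOrCreate()
import spark.implicits._

// A running word count; with "update", only the rows whose counts changed since the
// last trigger are emitted to the sink.
val counts = spark.readStream
  .format("socket").option("host", "localhost").option("port", 9999)
  .load()
  .as[String]
  .groupBy($"value")
  .count()

val query = counts.writeStream
  .outputMode("update")
  .format("console")
  .start()
```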
## How was this patch tested?
Added new tests in changed modules
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#16360 from tdas/SPARK-18234.
Remove spark-tags' compile-scope dependency (and, indirectly, spark-core's compile-scope transitive dependency) on scalatest by splitting test-oriented tags into spark-tags' test JAR.
Alternative to #16303.
Author: Ryan Williams <ryan.blake.williams@gmail.com>
Closes#16311 from ryan-williams/tt.
## What changes were proposed in this pull request?
When KafkaSource fails on Kafka errors, we should create a new consumer to retry rather than using the existing broken one because it's possible that the broken one will fail again.
This PR also assigns a new group id to the newly created consumer to guard against a possible race condition: the broken consumer may not be able to talk to the Kafka cluster in `close`, while the new consumer can. I'm not sure if this will happen or not; it is just for safety, to avoid the Kafka cluster thinking there are two consumers with the same group id in a short time window. (Note: CachedKafkaConsumer doesn't need this fix since `assign` never uses the group id.)
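A minimal sketch of the retry idea (illustrative only, not the actual KafkaSource code); the property names are standard Kafka consumer configs:
```scala
import java.util.{Properties, UUID}
import org.apache.kafka.clients.consumer.KafkaConsumer

// Build a fresh consumer with a brand-new group id for each retry, so the Kafka
// cluster never sees two live consumers sharing a group id even if closing the
// broken consumer fails.
def newConsumer(bootstrapServers: String): KafkaConsumer[Array[Byte], Array[Byte]] = {
  val props = new Properties()
  props.put("bootstrap.servers", bootstrapServers)
  props.put("group.id", s"spark-kafka-source-${UUID.randomUUID()}")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
  new KafkaConsumer[Array[Byte], Array[Byte]](props)
}
```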
## How was this patch tested?
In https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/70370/console , it ran this flaky test 120 times and all passed.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#16282 from zsxwing/kafka-fix.
## What changes were proposed in this pull request?
Currently, Spark writes a single file out per task, sometimes leading to very large files. It would be great to have an option to limit the max number of records written per file in a task, to avoid humongous files.
This patch introduces a new write config option `maxRecordsPerFile` (default to a session-wide setting `spark.sql.files.maxRecordsPerFile`) that limits the max number of records written to a single file. A non-positive value indicates there is no limit (same behavior as not having this flag).
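A minimal usage sketch, assuming `spark` is an active SparkSession and `df` an existing DataFrame; the values and output path are illustrative:
```scala
// Session-wide default: cap every file written by DataFrame/SQL writers.
spark.conf.set("spark.sql.files.maxRecordsPerFile", 1000000L)

// Per-write override; a non-positive value means no limit.
df.write
  .option("maxRecordsPerFile", 10000L)
  .parquet("/tmp/output")
```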
## How was this patch tested?
Added test cases in PartitionedWriteSuite for both dynamic partition insert and non-dynamic partition insert.
Author: Reynold Xin <rxin@databricks.com>
Closes#16204 from rxin/SPARK-18775.
## What changes were proposed in this pull request?
The issue in this test is that the cleanup of RDDs may not finish before StreamingContext is stopped. This PR basically just puts the assertions into `eventually` and runs it before stopping StreamingContext.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#16362 from zsxwing/SPARK-18954.
## What changes were proposed in this pull request?
The failure is because `test("basic functionality")` doesn't block until `ExecutorAllocationManager.manageAllocation` is called. This PR just adds StreamManualClock to allow the tests to block for the expected wait time, making the test deterministic.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#16321 from zsxwing/SPARK-18031.
## What changes were proposed in this pull request?
Two changes
- Fix how delays specified in months and years are translated to milliseconds
- Following up on #16258, do not show the watermark when there is no watermarking in the query
## How was this patch tested?
Updated and new unit tests
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#16304 from tdas/SPARK-18834-1.
## What changes were proposed in this pull request?
I recently hit a bug in com.thoughtworks.paranamer/paranamer, which causes jackson to fail to handle a byte array defined in a case class. Then I found https://github.com/FasterXML/jackson-module-scala/issues/48, which suggests that it is caused by a bug in paranamer. Let's upgrade paranamer. Since we are using jackson 2.6.5, and jackson-module-paranamer 2.6.5 uses com.thoughtworks.paranamer/paranamer 2.6, I suggest that we upgrade paranamer to 2.6.
Author: Yin Huai <yhuai@databricks.com>
Closes#16359 from yhuai/SPARK-18951.
## What changes were proposed in this pull request?
It's a huge waste to call `Catalog.listTables` in `SQLContext.tableNames`, which only needs the table names, while `Catalog.listTables` fetches the table metadata for each table.
## How was this patch tested?
N/A
Author: Wenchen Fan <wenchen@databricks.com>
Closes#16352 from cloud-fan/minor.
## What changes were proposed in this pull request?
We can build the Python API docs with `cd ./python/docs && make html` and the R API docs with `cd ./R && sh create-docs.sh` separately. However, `jekyll` fails in some environments.
This PR aims to support `SKIP_PYTHONDOC` and `SKIP_RDOC` for the documentation build in the `docs` folder. Currently, we can use `SKIP_SCALADOC` or `SKIP_API`. The reason for providing additional options is that the Spark documentation build uses a number of tools to build HTML docs and API docs in Scala, Python and R. Specifically, for Python and R,
- Python API docs require `sphinx`.
- R API docs require an `R` installation and `knitr` (and more libraries).
In other words, we cannot generate the Python API docs without an R installation, and we cannot generate the R API docs without a Python `sphinx` installation. If Spark provides `SKIP_PYTHONDOC` and `SKIP_RDOC`, like `SKIP_SCALADOC`, it would be more convenient.
## How was this patch tested?
Manual.
**Skipping Scala/Java/Python API Doc Build**
```bash
$ cd docs
$ SKIP_SCALADOC=1 SKIP_PYTHONDOC=1 jekyll build
$ ls api
DESCRIPTION R
```
**Skipping Scala/Java/R API Doc Build**
```bash
$ cd docs
$ SKIP_SCALADOC=1 SKIP_RDOC=1 jekyll build
$ ls api
python
```
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#16336 from dongjoon-hyun/SPARK-18923.
### What changes were proposed in this pull request?
Currently, we only have a SQL interface for recovering all the partitions in the directory of a table and updating the catalog: `MSCK REPAIR TABLE` or `ALTER TABLE table RECOVER PARTITIONS`. (Actually, it is very hard for me to remember `MSCK` and I have no clue what it means.)
After the new "Scalable Partition Handling", table repair becomes much more important for making the data in a newly created data source partitioned table visible.
Thus, this PR adds it to the Catalog interface. After this PR, users can repair a table with
```Scala
spark.catalog.recoverPartitions("testTable")
```
### How was this patch tested?
Modified the existing test cases.
Author: gatorsmile <gatorsmile@gmail.com>
Closes#16356 from gatorsmile/repairTable.
## What changes were proposed in this pull request?
It was pretty flaky before 10 days ago.
https://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.execution.streaming.state.StateStoreSuite&test_name=maintenance
Since no code changes went into this code path that would explain the flakiness, I'm just increasing the timeouts so that load-related flakiness shouldn't be a problem. As you may see from the testing, I haven't been able to reproduce it.
## How was this patch tested?
2000 retries 5 times
Author: Burak Yavuz <brkyvz@gmail.com>
Closes#16314 from brkyvz/maint-flaky.
## What changes were proposed in this pull request?
Adds basic TaskContext information to PySpark.
## How was this patch tested?
New unit tests to `tests.py` & existing unit tests.
Author: Holden Karau <holden@us.ibm.com>
Closes#16211 from holdenk/SPARK-18576-pyspark-taskcontext.
## What changes were proposed in this pull request?
The checkpoint location for a Structured Streaming query can be defined on a per-query basis through the `DataStreamWriter` options, but it can also be provided through SparkSession configurations. Recovery should work in both cases when the OutputMode is Complete for MemorySinks.
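A minimal sketch of the two ways to supply the checkpoint location, assuming `spark` is an active SparkSession and `streamingDF` a streaming DataFrame; the paths are illustrative:
```scala
// Session-wide default used when a query does not specify its own location.
spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/checkpoints")

// Per-query location via DataStreamWriter options; recovery for a Complete-mode
// MemorySink should work with either way of providing the location.
val query = streamingDF
  .writeStream
  .format("memory")
  .queryName("recoverable_counts")
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/checkpoints/recoverable_counts")
  .start()
```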
## How was this patch tested?
Unit tests
Author: Burak Yavuz <brkyvz@gmail.com>
Closes#16342 from brkyvz/chk-rec.
## What changes were proposed in this pull request?
There is a timeout failure when using `rdd.toLocalIterator()` or `df.toLocalIterator()` for a PySpark RDD and DataFrame:
```python
df = spark.createDataFrame([[1],[2],[3]])
it = df.toLocalIterator()
row = next(it)

df2 = df.repartition(1000)  # create many empty partitions which increase materialization time so causing timeout
it2 = df2.toLocalIterator()
row = next(it2)
```
The cause of this issue is that we open a socket to serve the data from the JVM side and set a timeout for connecting and reading through the socket on the Python side. In Python we use a generator to read the data, so we only connect to the socket once we start asking it for data. If we don't consume it immediately, the connection times out.
On the other side, the materialization time for RDD partitions is unpredictable, so we can't set a timeout for reading data through the socket; otherwise it is very likely to fail.
## How was this patch tested?
Added tests into PySpark.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#16263 from viirya/fix-pyspark-localiterator.
## What changes were proposed in this pull request?
When we append data to an existing table with `DataFrameWriter.saveAsTable`, we will do various checks to make sure the appended data is consistent with the existing data.
However, we get the information about the existing table by matching the table relation, instead of looking at the table metadata. This is error-prone, e.g. we only check the number of columns for `HadoopFsRelation` and forget to check bucketing, etc.
This PR refactors the error checking to look at the metadata of the existing table, and fixes several bugs:
* SPARK-18899: We forgot to check whether the specified bucketing matches the existing table, which may lead to a problematic table that has different bucketing in different data files (a minimal sketch follows this list).
* SPARK-18912: We forgot to check the number of columns for non-file-based data source tables.
* SPARK-18913: We didn't support appending data to a table with special column names.
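A minimal sketch of the bucketing mismatch (SPARK-18899) that the metadata-based check now catches, assuming `df` is an existing DataFrame with an `id` column; the table name is illustrative:
```scala
// Create a bucketed table, then try to append with a different bucketing spec.
df.write.bucketBy(8, "id").sortBy("id").saveAsTable("events")

// With this PR the append below is rejected, instead of silently writing files
// bucketed differently from the existing ones.
df.write.mode("append").bucketBy(4, "id").saveAsTable("events")
```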
## How was this patch tested?
new regression test.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#16313 from cloud-fan/bug1.
## What changes were proposed in this pull request?
Spark's current task cancellation / task killing mechanism is "best effort" because some tasks may not be interruptible or may not respond to their "killed" flags being set. If a significant fraction of a cluster's task slots are occupied by tasks that have been marked as killed but remain running then this can lead to a situation where new jobs and tasks are starved of resources that are being used by these zombie tasks.
This patch aims to address this problem by adding a "task reaper" mechanism to executors. At a high-level, task killing now launches a new thread which attempts to kill the task and then watches the task and periodically checks whether it has been killed. The TaskReaper will periodically re-attempt to call `TaskRunner.kill()` and will log warnings if the task keeps running. I modified TaskRunner to rename its thread at the start of the task, allowing TaskReaper to take a thread dump and filter it in order to log stacktraces from the exact task thread that we are waiting to finish. If the task has not stopped after a configurable timeout then the TaskReaper will throw an exception to trigger executor JVM death, thereby forcibly freeing any resources consumed by the zombie tasks.
This feature is flagged off by default and is controlled by four new configurations under the `spark.task.reaper.*` namespace. See the updated `configuration.md` doc for details.
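A minimal sketch of opting in via `SparkConf`. The description above only names the `spark.task.reaper.*` namespace; the exact keys below are assumptions based on it, and the updated `configuration.md` is the authoritative reference:
```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  // The feature is flagged off by default; this key is assumed from the namespace above.
  .set("spark.task.reaper.enabled", "true")
  // Assumed key: how long to wait for a killed task before forcing executor JVM death.
  .set("spark.task.reaper.killTimeout", "120s")
```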
## How was this patch tested?
Tested via a new test case in `JobCancellationSuite`, plus manual testing.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#16189 from JoshRosen/cancellation.
## What changes were proposed in this pull request?
In order to respond to task cancellation, Spark tasks must periodically check `TaskContext.isInterrupted()`, but this check is missing on a few critical read paths used in Spark SQL, including `FileScanRDD`, `JDBCRDD`, and UnsafeSorter-based sorts. This can cause interrupted / cancelled tasks to continue running and become zombies (as also described in #16189).
This patch aims to fix this problem by adding `TaskContext.isInterrupted()` checks to these paths. Note that I could have used `InterruptibleIterator` to simply wrap a bunch of iterators but in some cases this would have an adverse performance penalty or might not be effective due to certain special uses of Iterators in Spark SQL. Instead, I inlined `InterruptibleIterator`-style logic into existing iterator subclasses.
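A minimal sketch of the `InterruptibleIterator`-style check inlined into an iterator wrapper (illustrative only, not the actual `FileScanRDD`/`JDBCRDD` code):
```scala
import org.apache.spark.{TaskContext, TaskKilledException}

// Poll the task's kill flag on each hasNext so a cancelled task stops promptly
// instead of running to completion as a zombie.
class CancellableIterator[T](context: TaskContext, underlying: Iterator[T]) extends Iterator[T] {
  override def hasNext: Boolean = {
    if (context.isInterrupted()) {
      throw new TaskKilledException
    }
    underlying.hasNext
  }
  override def next(): T = underlying.next()
}
```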
## How was this patch tested?
Tested manually in `spark-shell` with two different reproductions of non-cancellable tasks, one involving scans of huge files and another involving sort-merge joins that spill to disk. Both causes of zombie tasks are fixed by the changes added here.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#16340 from JoshRosen/sql-task-interruption.
## What changes were proposed in this pull request?
Right now we serialize the empty task metrics once per task. Since this is shared across all tasks, we could use the same serialized task metrics across all tasks of a stage.
## How was this patch tested?
- [x] Run tests on EC2 to measure performance improvement
Author: Shivaram Venkataraman <shivaram@cs.berkeley.edu>
Closes#16261 from shivaram/task-metrics-one-copy.
## What changes were proposed in this pull request?
Currently `ImplicitTypeCasts` doesn't handle casts between `ArrayType`s. This is not convenient; we should add a rule to enable casting from `ArrayType(InternalType)` to `ArrayType(newInternalType)`.
Goals:
1. Add a rule to `ImplicitTypeCasts` to enable casting between `ArrayType`s;
2. Simplify `Percentile` and `ApproximatePercentile`.
## How was this patch tested?
Updated test cases in `TypeCoercionSuite`.
Author: jiangxingbo <jiangxb1987@gmail.com>
Closes#16057 from jiangxb1987/implicit-cast-complex-types.
## What changes were proposed in this pull request?
It's weird that we use `Hive.getDatabase` to check the existence of a database, while Hive has a `databaseExists` interface.
What's worse, `Hive.getDatabase` will produce an error message if the database doesn't exist, which is annoying when we only want to check the database existence.
This PR fixes this and use `Hive.databaseExists` to check database existence.
## How was this patch tested?
N/A
Author: Wenchen Fan <wenchen@databricks.com>
Closes#16332 from cloud-fan/minor.
## What changes were proposed in this pull request?
As described in the scenario in [SPARK-18700](https://issues.apache.org/jira/browse/SPARK-18700), when cachedDataSourceTables is invalidated, the next few queries will each fetch all FileStatus objects in the listLeafFiles function. When a table has many partitions, these jobs occupy a lot of driver memory and may eventually cause a driver OOM.
This patch adds a striped lock per table relation in the cache, rather than locking the whole cachedDataSourceTables, so that each table's cache-loading operation is protected by its own lock.
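A minimal sketch of the per-table striped-lock idea using Guava's `Striped` (illustrative only, not the actual cache code; the method and variable names are made up):
```scala
import com.google.common.util.concurrent.Striped

// One lock per table name (striped so the number of lock objects stays bounded):
// concurrent queries on the same invalidated table perform the expensive leaf-file
// listing once, while queries on different tables don't block each other.
val tableLocks = Striped.lazyWeakLock(1024)

def withTableLock[T](qualifiedTableName: String)(loadOrGetCached: => T): T = {
  val lock = tableLocks.get(qualifiedTableName)
  lock.lock()
  try {
    loadOrGetCached
  } finally {
    lock.unlock()
  }
}
```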
## How was this patch tested?
Added a multi-threaded table-access test in `PartitionedTablePerfStatsSuite` and checked that the table is only loaded once, using metrics in `HiveCatalogMetrics`.
Author: xuanyuanking <xyliyuanjian@gmail.com>
Closes#16135 from xuanyuanking/SPARK-18700.
## What changes were proposed in this pull request?
As requested by Joseph Bradley, I updated my PR https://github.com/apache/spark/pull/15965 in order to eliminate the extra fit() method.
jkbradley
## How was this patch tested?
Pass existing tests
Author: Zakaria_Hili <zakahili@gmail.com>
Author: HILI Zakaria <zakahili@gmail.com>
Closes#16295 from ZakariaHili/zakbranch.
## What changes were proposed in this pull request?
A `NoSuchElementException` has been thrown since https://github.com/apache/spark/pull/15056 if a broadcast cannot be cached in memory. The reason is that that change does not cover the `!unrolled.hasNext` case in the `next()` function.
This change covers the `!unrolled.hasNext` case and checks `hasNext` before calling `next` in `blockManager.getLocalValues` to make it more robust.
With this pull request, we can cache and read a broadcast even when it cannot fit in memory.
Exception log:
```
16/12/10 10:10:04 INFO UnifiedMemoryManager: Will not store broadcast_131 as the required space (1048576 bytes) exceeds our memory limit (122764 bytes)
16/12/10 10:10:04 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block broadcast_131 in memory.
16/12/10 10:10:04 WARN MemoryStore: Not enough space to cache broadcast_131 in memory! (computed 384.0 B so far)
16/12/10 10:10:04 INFO MemoryStore: Memory use = 95.6 KB (blocks) + 0.0 B (scratch space shared across 0 tasks(s)) = 95.6 KB. Storage limit = 119.9 KB.
16/12/10 10:10:04 ERROR Utils: Exception encountered
java.util.NoSuchElementException
at org.apache.spark.util.collection.PrimitiveVector$$anon$1.next(PrimitiveVector.scala:58)
at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:700)
at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
at scala.Option.map(Option.scala:146)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/12/10 10:10:04 ERROR Executor: Exception in task 1.0 in stage 86.0 (TID 134423)
java.io.IOException: java.util.NoSuchElementException
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1276)
at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.NoSuchElementException
at org.apache.spark.util.collection.PrimitiveVector$$anon$1.next(PrimitiveVector.scala:58)
at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:700)
at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
at scala.Option.map(Option.scala:146)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
... 12 more
```
## How was this patch tested?
Add unit test
Author: Yuming Wang <wgyumg@gmail.com>
Closes#16252 from wangyum/SPARK-18827.
## What changes were proposed in this pull request?
There is a potential integer overflow when creating a ChunkedByteBufferOutputStream in MemoryStore. This PR provides a check before the cast.
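A minimal sketch of the kind of guard described, written as a standalone helper (illustrative, not the actual MemoryStore code):
```scala
// Validate that a Long byte count fits in an Int before the narrowing cast that
// sizes the output stream's chunks; without this, a large value silently overflows
// into a negative or wrong chunk size.
def checkedChunkSize(totalSize: Long): Int = {
  require(totalSize >= 0 && totalSize <= Int.MaxValue,
    s"Cannot allocate a buffer of $totalSize bytes: size does not fit in an Int")
  totalSize.toInt
}
```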
## How was this patch tested?
add new unit test
Author: uncleGen <hustyugm@gmail.com>
Closes#15915 from uncleGen/SPARK-18485.
## What changes were proposed in this pull request?
There are several tests failing due to resource-closing-related and path-related problems on Windows as below.
- `RPackageUtilsSuite`:
```
- build an R package from a jar end to end *** FAILED *** (1 second, 625 milliseconds)
java.io.IOException: Unable to delete file: C:\projects\spark\target\tmp\1481729427517-0\a\dep2\d\dep2-d.jar
at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:2279)
at org.apache.commons.io.FileUtils.cleanDirectory(FileUtils.java:1653)
at org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.java:1535)
- faulty R package shows documentation *** FAILED *** (359 milliseconds)
java.io.IOException: Unable to delete file: C:\projects\spark\target\tmp\1481729428970-0\dep1-c.jar
at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:2279)
at org.apache.commons.io.FileUtils.cleanDirectory(FileUtils.java:1653)
at org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.java:1535)
- SparkR zipping works properly *** FAILED *** (47 milliseconds)
java.util.regex.PatternSyntaxException: Unknown character property name {r} near index 4
C:\projects\spark\target\tmp\1481729429282-0
^
at java.util.regex.Pattern.error(Pattern.java:1955)
at java.util.regex.Pattern.charPropertyNodeFor(Pattern.java:2781)
```
- `InputOutputMetricsSuite`:
```
- input metrics for old hadoop with coalesce *** FAILED *** (240 milliseconds)
java.io.IOException: Not a file: file:/C:/projects/spark/core/ignored
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:277)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:202)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
- input metrics with cache and coalesce *** FAILED *** (109 milliseconds)
java.io.IOException: Not a file: file:/C:/projects/spark/core/ignored
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:277)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:202)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
- input metrics for new Hadoop API with coalesce *** FAILED *** (0 milliseconds)
java.lang.IllegalArgumentException: Wrong FS: file://C:\projects\spark\target\tmp\spark-9366ec94-dac7-4a5c-a74b-3e7594a692ab\test\InputOutputMetricsSuite.txt, expected: file:///
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:642)
at org.apache.hadoop.fs.FileSystem.makeQualified(FileSystem.java:462)
at org.apache.hadoop.fs.FilterFileSystem.makeQualified(FilterFileSystem.java:114)
- input metrics when reading text file *** FAILED *** (110 milliseconds)
java.io.IOException: Not a file: file:/C:/projects/spark/core/ignored
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:277)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:202)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
- input metrics on records read - simple *** FAILED *** (125 milliseconds)
java.io.IOException: Not a file: file:/C:/projects/spark/core/ignored
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:277)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:202)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
- input metrics on records read - more stages *** FAILED *** (110 milliseconds)
java.io.IOException: Not a file: file:/C:/projects/spark/core/ignored
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:277)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:202)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
- input metrics on records - New Hadoop API *** FAILED *** (16 milliseconds)
java.lang.IllegalArgumentException: Wrong FS: file://C:\projects\spark\target\tmp\spark-3f10a1a4-7820-4772-b821-25fd7523bf6f\test\InputOutputMetricsSuite.txt, expected: file:///
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:642)
at org.apache.hadoop.fs.FileSystem.makeQualified(FileSystem.java:462)
at org.apache.hadoop.fs.FilterFileSystem.makeQualified(FilterFileSystem.java:114)
- input metrics on records read with cache *** FAILED *** (93 milliseconds)
java.io.IOException: Not a file: file:/C:/projects/spark/core/ignored
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:277)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:202)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
- input read/write and shuffle read/write metrics all line up *** FAILED *** (93 milliseconds)
java.io.IOException: Not a file: file:/C:/projects/spark/core/ignored
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:277)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:202)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
- input metrics with interleaved reads *** FAILED *** (0 milliseconds)
java.lang.IllegalArgumentException: Wrong FS: file://C:\projects\spark\target\tmp\spark-2638d893-e89b-47ce-acd0-bbaeee78dd9b\InputOutputMetricsSuite_cart.txt, expected: file:///
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:642)
at org.apache.hadoop.fs.FileSystem.makeQualified(FileSystem.java:462)
at org.apache.hadoop.fs.FilterFileSystem.makeQualified(FilterFileSystem.java:114)
- input metrics with old CombineFileInputFormat *** FAILED *** (157 milliseconds)
17947 was not greater than or equal to 300000 (InputOutputMetricsSuite.scala:324)
org.scalatest.exceptions.TestFailedException:
at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:500)
at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1555)
at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:466)
- input metrics with new CombineFileInputFormat *** FAILED *** (16 milliseconds)
java.lang.IllegalArgumentException: Wrong FS: file://C:\projects\spark\target\tmp\spark-11920c08-19d8-4c7c-9fba-28ed72b79f80\test\InputOutputMetricsSuite.txt, expected: file:///
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:642)
at org.apache.hadoop.fs.FileSystem.makeQualified(FileSystem.java:462)
at org.apache.hadoop.fs.FilterFileSystem.makeQualified(FilterFileSystem.java:114)
```
- `ReplayListenerSuite`:
```
- End-to-end replay *** FAILED *** (121 milliseconds)
java.io.IOException: No FileSystem for scheme: C
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2421)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2428)
- End-to-end replay with compression *** FAILED *** (516 milliseconds)
java.io.IOException: No FileSystem for scheme: C
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2421)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2428)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:88)
```
- `EventLoggingListenerSuite`:
```
- End-to-end event logging *** FAILED *** (7 seconds, 435 milliseconds)
java.io.IOException: No FileSystem for scheme: C
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2421)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2428)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:88)
- End-to-end event logging with compression *** FAILED *** (1 second)
java.io.IOException: No FileSystem for scheme: C
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2421)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2428)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:88)
- Event log name *** FAILED *** (16 milliseconds)
"file:/[]base-dir/app1" did not equal "file:/[C:/]base-dir/app1" (EventLoggingListenerSuite.scala:123)
org.scalatest.exceptions.TestFailedException:
at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:500)
at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1555)
at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:466)
```
This PR proposes to fix the test failures on Windows.
## How was this patch tested?
Manually tested via AppVeyor
**Before**
`RPackageUtilsSuite`: https://ci.appveyor.com/project/spark-test/spark/build/273-RPackageUtilsSuite-before
`InputOutputMetricsSuite`: https://ci.appveyor.com/project/spark-test/spark/build/272-InputOutputMetricsSuite-before
`ReplayListenerSuite`: https://ci.appveyor.com/project/spark-test/spark/build/274-ReplayListenerSuite-before
`EventLoggingListenerSuite`: https://ci.appveyor.com/project/spark-test/spark/build/275-EventLoggingListenerSuite-before
**After**
`RPackageUtilsSuite`: https://ci.appveyor.com/project/spark-test/spark/build/270-RPackageUtilsSuite
`InputOutputMetricsSuite`: https://ci.appveyor.com/project/spark-test/spark/build/271-InputOutputMetricsSuite
`ReplayListenerSuite`: https://ci.appveyor.com/project/spark-test/spark/build/277-ReplayListenerSuite-after
`EventLoggingListenerSuite`: https://ci.appveyor.com/project/spark-test/spark/build/278-EventLoggingListenerSuite-after
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#16305 from HyukjinKwon/RPackageUtilsSuite-InputOutputMetricsSuite.
## What changes were proposed in this pull request?
Merge two FileStreamSourceSuite files into one file.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#16315 from zsxwing/FileStreamSourceSuite.