## What changes were proposed in this pull request?
when we append data to a existed partitioned datasource table, the InsertIntoHadoopFsRelationCommand.getCustomPartitionLocations currently
return the same location with Hive default, it should return None.
## How was this patch tested?
Author: windpiger <songjun@outlook.com>
Closes#16642 from windpiger/appendSchema.
## What changes were proposed in this pull request?
As I pointed out in https://github.com/apache/spark/pull/15807#issuecomment-259143655 , the current subexpression elimination framework has a problem, it always evaluates all common subexpressions at the beginning, even they are inside conditional expressions and may not be accessed.
Ideally we should implement it like scala lazy val, so we only evaluate it when it gets accessed at lease once. https://github.com/apache/spark/issues/15837 tries this approach, but it seems too complicated and may introduce performance regression.
This PR simply stops common subexpression elimination for conditional expressions, with some cleanup.
## How was this patch tested?
regression test
Author: Wenchen Fan <wenchen@databricks.com>
Closes#16659 from cloud-fan/codegen.
### What changes were proposed in this pull request?
It is weird to create Hive source tables when using InMemoryCatalog. We are unable to operate it. This PR is to block users to create Hive source tables.
### How was this patch tested?
Fixed the test cases
Author: gatorsmile <gatorsmile@gmail.com>
Closes#16587 from gatorsmile/blockHiveTable.
## What changes were proposed in this pull request?
This PR refactors CSV read path to be consistent with JSON data source. It makes the methods in classes have consistent arguments with JSON ones.
`UnivocityParser` and `JacksonParser`
``` scala
private[csv] class UnivocityParser(
schema: StructType,
requiredSchema: StructType,
options: CSVOptions) extends Logging {
...
def parse(input: String): Seq[InternalRow] = {
...
```
``` scala
class JacksonParser(
schema: StructType,
columnNameOfCorruptRecord: String,
options: JSONOptions) extends Logging {
...
def parse(input: String): Option[InternalRow] = {
...
```
These allow parsing an iterator (`String` to `InternalRow`) as below for both JSON and CSV:
```scala
iter.flatMap(parser.parse)
```
## How was this patch tested?
Existing tests should cover this.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#16669 from HyukjinKwon/SPARK-16101-read.
## What changes were proposed in this pull request?
For data source tables, we will always reorder the specified table schema, or the query in CTAS, to put partition columns at the end. e.g. `CREATE TABLE t(a int, b int, c int, d int) USING parquet PARTITIONED BY (d, b)` will create a table with schema `<a, c, d, b>`
Hive serde tables don't have this problem before, because its CREATE TABLE syntax specifies data schema and partition schema individually.
However, after we unifed the CREATE TABLE syntax, Hive serde table also need to do the reorder. This PR puts the reorder logic in a analyzer rule, which works with both data source tables and Hive serde tables.
## How was this patch tested?
new regression test
Author: Wenchen Fan <wenchen@databricks.com>
Closes#16655 from cloud-fan/schema.
## What changes were proposed in this pull request?
JDBC read is failing with NPE due to missing null value check for array data type if the source table has null values in the array type column. For null values Resultset.getArray() returns null.
This PR adds null safe check to the Resultset.getArray() value before invoking method on the Array object.
## How was this patch tested?
Updated the PostgresIntegration test suite to test null values. Ran docker integration tests on my laptop.
Author: sureshthalamati <suresh.thalamati@gmail.com>
Closes#15192 from sureshthalamati/jdbc_array_null_fix-SPARK-14536.
## What changes were proposed in this pull request?
This PR refactors CSV write path to be consistent with JSON data source.
This PR makes the methods in classes have consistent arguments with JSON ones.
- `UnivocityGenerator` and `JacksonGenerator`
``` scala
private[csv] class UnivocityGenerator(
schema: StructType,
writer: Writer,
options: CSVOptions = new CSVOptions(Map.empty[String, String])) {
...
def write ...
def close ...
def flush ...
```
``` scala
private[sql] class JacksonGenerator(
schema: StructType,
writer: Writer,
options: JSONOptions = new JSONOptions(Map.empty[String, String])) {
...
def write ...
def close ...
def flush ...
```
- This PR also makes the classes put in together in a consistent manner with JSON.
- `CsvFileFormat`
``` scala
CsvFileFormat
CsvOutputWriter
```
- `JsonFileFormat`
``` scala
JsonFileFormat
JsonOutputWriter
```
## How was this patch tested?
Existing tests should cover this.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#16496 from HyukjinKwon/SPARK-16101-write.
## What changes were proposed in this pull request?
There is a race condition when stopping StateStore which makes `StateStoreSuite.maintenance` flaky. `StateStore.stop` doesn't wait for the running task to finish, and an out-of-date task may fail `doMaintenance` and cancel the new task. Here is a reproducer: dde1b5b106
This PR adds MaintenanceTask to eliminate the race condition.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <shixiong@databricks.com>
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#16627 from zsxwing/SPARK-19267.
## What changes were proposed in this pull request?
When we query a table with a filter on partitioned columns, we will push the partition filter to the metastore to get matched partitions directly.
In `HiveExternalCatalog.listPartitionsByFilter`, we assume the column names in partition filter are already normalized and we don't need to consider case sensitivity. However, `HiveTableScanExec` doesn't follow this assumption. This PR fixes it.
## How was this patch tested?
new regression test
Author: Wenchen Fan <wenchen@databricks.com>
Closes#16647 from cloud-fan/bug.
## What changes were proposed in this pull request?
This PR refactors the code generation part to get data from `ColumnarVector` and `ColumnarBatch` by using a trait `ColumnarBatchScan` for ease of reuse. This is because this part will be reused by several components (e.g. parquet reader, Dataset.cache, and others) since `ColumnarBatch` will be first citizen.
This PR is a part of https://github.com/apache/spark/pull/15219. In advance, this PR makes the code generation for `ColumnarVector` and `ColumnarBatch` reuseable as a trait. In general, this is very useful for other components from the reuseability view, too.
## How was this patch tested?
tested existing test suites
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes#15467 from kiszk/columnarrefactor.
## What changes were proposed in this pull request?
The initial shouldFilterOut() method invocation filter the root path name(table name in the intial call) and remove if it contains _. I moved the check one level below, so it first list files/directories in the given root path and then apply filter.
(Please fill in changes proposed in this fix)
## How was this patch tested?
Added new test case for this scenario
(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: jayadevanmurali <jayadevan.m@tcs.com>
Author: jayadevan <jayadevan.m@tcs.com>
Closes#16635 from jayadevanmurali/branch-0.1-SPARK-19059.
## What changes were proposed in this pull request?
We have a table relation plan cache in `HiveMetastoreCatalog`, which caches a lot of things: file status, resolved data source, inferred schema, etc.
However, it doesn't make sense to limit this cache with hive support, we should move it to SQL core module so that users can use this cache without hive support.
It can also reduce the size of `HiveMetastoreCatalog`, so that it's easier to remove it eventually.
main changes:
1. move the table relation cache to `SessionCatalog`
2. `SessionCatalog.lookupRelation` will return `SimpleCatalogRelation` and the analyzer will convert it to `LogicalRelation` or `MetastoreRelation` later, then `HiveSessionCatalog` doesn't need to override `lookupRelation` anymore
3. `FindDataSourceTable` will read/write the table relation cache.
## How was this patch tested?
existing tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#16621 from cloud-fan/plan-cache.
## What changes were proposed in this pull request?
We should call `StateStore.abort()` when there should be any error before the store is committed.
## How was this patch tested?
Manually.
Author: Liwei Lin <lwlin7@gmail.com>
Closes#16547 from lw-lin/append-filter.
## What changes were proposed in this pull request?
On CREATE/ALTER a view, it's no longer needed to generate a SQL text string from the LogicalPlan, instead we store the SQL query text、the output column names of the query plan, and current database to CatalogTable. Permanent views created by this approach can be resolved by current view resolution approach.
The main advantage includes:
1. If you update an underlying view, the current view also gets updated;
2. That gives us a change to get ride of SQL generation for operators.
Major changes of this PR:
1. Generate the view-specific properties(e.g. view default database, view query output column names) during permanent view creation and store them as properties in the CatalogTable;
2. Update the commands `CreateViewCommand` and `AlterViewAsCommand`, get rid of SQL generation from them.
## How was this patch tested?
Existing tests.
Author: jiangxingbo <jiangxb1987@gmail.com>
Closes#16613 from jiangxb1987/view-write-path.
## What changes were proposed in this pull request?
Inserting data into Hive tables has its own implementation that is distinct from data sources: `InsertIntoHiveTable`, `SparkHiveWriterContainer` and `SparkHiveDynamicPartitionWriterContainer`.
Note that one other major difference is that data source tables write directly to the final destination without using some staging directory, and then Spark itself adds the partitions/tables to the catalog. Hive tables actually write to some staging directory, and then call Hive metastore's loadPartition/loadTable function to load those data in. So we still need to keep `InsertIntoHiveTable` to put this special logic. In the future, we should think of writing to the hive table location directly, so that we don't need to call `loadTable`/`loadPartition` at the end and remove `InsertIntoHiveTable`.
This PR removes `SparkHiveWriterContainer` and `SparkHiveDynamicPartitionWriterContainer`, and create a `HiveFileFormat` to implement the write logic. In the future, we should also implement the read logic in `HiveFileFormat`.
## How was this patch tested?
existing tests
Author: Wenchen Fan <wenchen@databricks.com>
Closes#16517 from cloud-fan/insert-hive.
## What changes were proposed in this pull request?
Added outer_explode, outer_posexplode, outer_inline functions and expressions.
Some bug fixing in GenerateExec.scala for CollectionGenerator. Previously it was not correctly handling the case of outer with empty collections, only with nulls.
## How was this patch tested?
New tests added to GeneratorFunctionSuite
Author: Bogdan Raducanu <bogdan.rdc@gmail.com>
Closes#16608 from bogdanrdc/SPARK-13721.
## What changes were proposed in this pull request?
In append mode, we check whether the schema of the write is compatible with the schema of the existing data. It can be a significant performance issue in cloud environment to find the existing schema for files. This patch removes the check.
Note that for catalog tables, we always do the check, as discussed in https://github.com/apache/spark/pull/16339#discussion_r96208357
## How was this patch tested?
N/A
Closes#16339.
Author: Reynold Xin <rxin@databricks.com>
Closes#16622 from rxin/SPARK-18917.
## What changes were proposed in this pull request?
`dropDuplicates` will create an Alias using the same exprId, so `StreamExecution` should also replace Alias if necessary.
## How was this patch tested?
test("SPARK-19065: dropDuplicates should not create expressions using the same id")
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#16564 from zsxwing/SPARK-19065.
## What changes were proposed in this pull request?
This PR proposes to fix ambiguous link warnings by simply making them as code blocks for both javadoc and scaladoc.
```
[warn] .../spark/core/src/main/scala/org/apache/spark/Accumulator.scala:20: The link target "SparkContext#accumulator" is ambiguous. Several members fit the target:
[warn] .../spark/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala:281: The link target "runMiniBatchSGD" is ambiguous. Several members fit the target:
[warn] .../spark/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala:83: The link target "run" is ambiguous. Several members fit the target:
...
```
This PR also fixes javadoc8 break as below:
```
[error] .../spark/sql/core/target/java/org/apache/spark/sql/LowPrioritySQLImplicits.java:7: error: reference not found
[error] * newProductEncoder - to disambiguate for {link List}s which are both {link Seq} and {link Product}
[error] ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/LowPrioritySQLImplicits.java:7: error: reference not found
[error] * newProductEncoder - to disambiguate for {link List}s which are both {link Seq} and {link Product}
[error] ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/LowPrioritySQLImplicits.java:7: error: reference not found
[error] * newProductEncoder - to disambiguate for {link List}s which are both {link Seq} and {link Product}
[error] ^
[info] 3 errors
```
## How was this patch tested?
Manually via `sbt unidoc > output.txt` and the checked it via `cat output.txt | grep ambiguous`
and `sbt unidoc | grep error`.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#16604 from HyukjinKwon/SPARK-3249.
## What changes were proposed in this pull request?
In https://github.com/apache/spark/pull/16296 , we reached a consensus that we should hide the external/managed table concept to users and only expose custom table path.
This PR renames `Catalog.createExternalTable` to `createTable`(still keep the old versions for backward compatibility), and only set the table type to EXTERNAL if `path` is specified in options.
## How was this patch tested?
new tests in `CatalogSuite`
Author: Wenchen Fan <wenchen@databricks.com>
Closes#16528 from cloud-fan/create-table.
## What changes were proposed in this pull request?
We have a config `spark.sql.files.ignoreCorruptFiles` which can be used to ignore corrupt files when reading files in SQL. Currently the `ignoreCorruptFiles` config has two issues and can't work for Parquet:
1. We only ignore corrupt files in `FileScanRDD` . Actually, we begin to read those files as early as inferring data schema from the files. For corrupt files, we can't read the schema and fail the program. A related issue reported at http://apache-spark-developers-list.1001551.n3.nabble.com/Skip-Corrupted-Parquet-blocks-footer-tc20418.html
2. In `FileScanRDD`, we assume that we only begin to read the files when starting to consume the iterator. However, it is possibly the files are read before that. In this case, `ignoreCorruptFiles` config doesn't work too.
This patch targets Parquet datasource. If this direction is ok, we can address the same issue for other datasources like Orc.
Two main changes in this patch:
1. Replace `ParquetFileReader.readAllFootersInParallel` by implementing the logic to read footers in multi-threaded manner
We can't ignore corrupt files if we use `ParquetFileReader.readAllFootersInParallel`. So this patch implements the logic to do the similar thing in `readParquetFootersInParallel`.
2. In `FileScanRDD`, we need to ignore corrupt file too when we call `readFunction` to return iterator.
One thing to notice is:
We read schema from Parquet file's footer. The method to read footer `ParquetFileReader.readFooter` throws `RuntimeException`, instead of `IOException`, if it can't successfully read the footer. Please check out df9d8e4154/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java (L470). So this patch catches `RuntimeException`. One concern is that it might also shadow other runtime exceptions other than reading corrupt files.
## How was this patch tested?
Jenkins tests.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#16474 from viirya/fix-ignorecorrupted-parquet-files.
### What changes were proposed in this pull request?
```Scala
sql("CREATE TABLE tab (a STRING) STORED AS PARQUET")
// This table fetch is to fill the cache with zero leaf files
spark.table("tab").show()
sql(
s"""
|LOAD DATA LOCAL INPATH '$newPartitionDir' OVERWRITE
|INTO TABLE tab
""".stripMargin)
spark.table("tab").show()
```
In the above example, the returned result is empty after table loading. The metadata cache could be out of dated after loading new data into the table, because loading/inserting does not update the cache. So far, the metadata cache is only used for data source tables. Thus, for Hive serde tables, only `parquet` and `orc` formats are facing such issues, because the Hive serde tables in the format of parquet/orc could be converted to data source tables when `spark.sql.hive.convertMetastoreParquet`/`spark.sql.hive.convertMetastoreOrc` is on.
This PR is to refresh the metadata cache after processing the `LOAD DATA` command.
In addition, Spark SQL does not convert **partitioned** Hive tables (orc/parquet) to data source tables in the write path, but the read path is using the metadata cache for both **partitioned** and non-partitioned Hive tables (orc/parquet). That means, writing the partitioned parquet/orc tables still use `InsertIntoHiveTable`, instead of `InsertIntoHadoopFsRelationCommand`. To avoid reading the out-of-dated cache, `InsertIntoHiveTable` needs to refresh the metadata cache for partitioned tables. Note, it does not need to refresh the cache for non-partitioned parquet/orc tables, because it does not call `InsertIntoHiveTable` at all. Based on the comments, this PR will keep the existing logics unchanged. That means, we always refresh the table no matter whether the table is partitioned or not.
### How was this patch tested?
Added test cases in parquetSuites.scala
Author: gatorsmile <gatorsmile@gmail.com>
Closes#16500 from gatorsmile/refreshInsertIntoHiveTable.
## What changes were proposed in this pull request?
After [SPARK-19107](https://issues.apache.org/jira/browse/SPARK-19107), we now can treat hive as a data source and create hive tables with DataFrameWriter and Catalog. However, the support is not completed, there are still some cases we do not support.
This PR implement:
DataFrameWriter.saveAsTable work with hive format with overwrite mode
## How was this patch tested?
unit test added
Author: windpiger <songjun@outlook.com>
Closes#16549 from windpiger/saveAsTableWithHiveOverwrite.
## What changes were proposed in this pull request?
the offset of short is 4 in OffHeapColumnVector's putShorts, but actually it should be 2.
## How was this patch tested?
unit test
Author: Yucai Yu <yucai.yu@intel.com>
Closes#16555 from yucai/offheap_short.
Otherwise the open parentheses isn't closed in query plan descriptions of batch scans.
PushedFilters: [In(COL_A, [1,2,4,6,10,16,219,815], IsNotNull(COL_B), ...
Author: Andrew Ash <andrew@andrewash.com>
Closes#16558 from ash211/patch-9.
### What changes were proposed in this pull request?
`DataFrameWriter`'s [save() API](5d38f09f47/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala (L207)) is performing a unnecessary full filesystem scan for the saved files. The save() API is the most basic/core API in `DataFrameWriter`. We should avoid it.
The related PR: https://github.com/apache/spark/pull/16090
### How was this patch tested?
Updated the existing test cases.
Author: gatorsmile <gatorsmile@gmail.com>
Closes#16481 from gatorsmile/saveFileScan.
## What changes were proposed in this pull request?
Currently in SQL we implement overwrites by calling fs.delete() directly on the original data. This is not ideal since we the original files end up deleted even if the job aborts. We should extend the commit protocol to allow file overwrites to be managed as well.
## How was this patch tested?
Existing tests. I also fixed a bunch of tests that were depending on the commit protocol implementation being set to the legacy mapreduce one.
cc rxin cloud-fan
Author: Eric Liang <ekl@databricks.com>
Author: Eric Liang <ekhliang@gmail.com>
Closes#16554 from ericl/add-delete-protocol.
## What changes were proposed in this pull request?
This PR proposes to throw an exception for both jdbc APIs when user specified schemas are not allowed or useless.
**DataFrameReader.jdbc(...)**
``` scala
spark.read.schema(StructType(Nil)).jdbc(...)
```
**DataFrameReader.table(...)**
```scala
spark.read.schema(StructType(Nil)).table("usrdb.test")
```
## How was this patch tested?
Unit test in `JDBCSuite` and `DataFrameReaderWriterSuite`.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#14451 from HyukjinKwon/SPARK-16848.
## What changes were proposed in this pull request?
We should be able to resolve a nested view. The main advantage is that if you update an underlying view, the current view also gets updated.
The new approach should be compatible with older versions of SPARK/HIVE, that means:
1. The new approach should be able to resolve the views that created by older versions of SPARK/HIVE;
2. The new approach should be able to resolve the views that are currently supported by SPARK SQL.
The new approach mainly brings in the following changes:
1. Add a new operator called `View` to keep track of the CatalogTable that describes the view, and the output attributes as well as the child of the view;
2. Update the `ResolveRelations` rule to resolve the relations and views, note that a nested view should be resolved correctly;
3. Add `viewDefaultDatabase` variable to `CatalogTable` to keep track of the default database name used to resolve a view, if the `CatalogTable` is not a view, then the variable should be `None`;
4. Add `AnalysisContext` to enable us to still support a view created with CTE/Windows query;
5. Enables the view support without enabling Hive support (i.e., enableHiveSupport);
6. Fix a weird behavior: the result of a view query may have different schema if the referenced table has been changed. After this PR, we try to cast the child output attributes to that from the view schema, throw an AnalysisException if cast is not allowed.
Note this is compatible with the views defined by older versions of Spark(before 2.2), which have empty `defaultDatabase` and all the relations in `viewText` have database part defined.
## How was this patch tested?
1. Add new tests in `SessionCatalogSuite` to test the function `lookupRelation`;
2. Add new test case in `SQLViewSuite` to test resolve a nested view.
Author: jiangxingbo <jiangxb1987@gmail.com>
Closes#16233 from jiangxb1987/resolve-view.
## What changes were proposed in this pull request?
Currently we have two sets of statistics in LogicalPlan: a simple stats and a stats estimated by cbo, but the computing logic and naming are quite confusing, we need to unify these two sets of stats.
## How was this patch tested?
Just modify existing tests.
Author: wangzhenhua <wangzhenhua@huawei.com>
Author: Zhenhua Wang <wzh_zju@163.com>
Closes#16529 from wzhfy/unifyStats.
## What changes were proposed in this pull request?
The analyzer rule that supports to query files directly will be added to `Analyzer.extendedResolutionRules` when SparkSession is created, according to the `spark.sql.runSQLOnFiles` flag. If the flag is off when we create `SparkSession`, this rule is not added and we can not query files directly even we turn on the flag later.
This PR fixes this bug by always adding that rule to `Analyzer.extendedResolutionRules`.
## How was this patch tested?
new regression test
Author: Wenchen Fan <wenchen@databricks.com>
Closes#16531 from cloud-fan/sql-on-files.
## What changes were proposed in this pull request?
This PR allow update mode for non-aggregation streaming queries. It will be same as the append mode if a query has no aggregations.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#16520 from zsxwing/update-without-agg.
## What changes were proposed in this pull request?
To support `FETCH_FIRST`, SPARK-16563 used Scala `Iterator.duplicate`. However,
Scala `Iterator.duplicate` uses a **queue to buffer all items between both iterators**,
this causes GC and hangs for queries with large number of rows. We should not use this,
especially for `spark.sql.thriftServer.incrementalCollect`.
https://github.com/scala/scala/blob/2.12.x/src/library/scala/collection/Iterator.scala#L1262-L1300
## How was this patch tested?
Pass the existing tests.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#16440 from dongjoon-hyun/SPARK-18857.
## What changes were proposed in this pull request?
This PR proposes to fix all the test failures identified by testing with AppVeyor.
**Scala - aborted tests**
```
WindowQuerySuite:
Exception encountered when attempting to run a suite with class name: org.apache.spark.sql.hive.execution.WindowQuerySuite *** ABORTED *** (156 milliseconds)
org.apache.spark.sql.AnalysisException: LOAD DATA input path does not exist: C:projectssparksqlhive argetscala-2.11 est-classesdatafilespart_tiny.txt;
OrcSourceSuite:
Exception encountered when attempting to run a suite with class name: org.apache.spark.sql.hive.orc.OrcSourceSuite *** ABORTED *** (62 milliseconds)
org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);
ParquetMetastoreSuite:
Exception encountered when attempting to run a suite with class name: org.apache.spark.sql.hive.ParquetMetastoreSuite *** ABORTED *** (4 seconds, 703 milliseconds)
org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);
ParquetSourceSuite:
Exception encountered when attempting to run a suite with class name: org.apache.spark.sql.hive.ParquetSourceSuite *** ABORTED *** (3 seconds, 907 milliseconds)
org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark arget mpspark-581a6575-454f-4f21-a516-a07f95266143;
KafkaRDDSuite:
Exception encountered when attempting to run a suite with class name: org.apache.spark.streaming.kafka.KafkaRDDSuite *** ABORTED *** (5 seconds, 212 milliseconds)
java.io.IOException: Failed to delete: C:\projects\spark\target\tmp\spark-4722304d-213e-4296-b556-951df1a46807
DirectKafkaStreamSuite:
Exception encountered when attempting to run a suite with class name: org.apache.spark.streaming.kafka.DirectKafkaStreamSuite *** ABORTED *** (7 seconds, 127 milliseconds)
java.io.IOException: Failed to delete: C:\projects\spark\target\tmp\spark-d0d3eba7-4215-4e10-b40e-bb797e89338e
at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1010)
ReliableKafkaStreamSuite
Exception encountered when attempting to run a suite with class name: org.apache.spark.streaming.kafka.ReliableKafkaStreamSuite *** ABORTED *** (5 seconds, 498 milliseconds)
java.io.IOException: Failed to delete: C:\projects\spark\target\tmp\spark-d33e45a0-287e-4bed-acae-ca809a89d888
KafkaStreamSuite:
Exception encountered when attempting to run a suite with class name: org.apache.spark.streaming.kafka.KafkaStreamSuite *** ABORTED *** (2 seconds, 892 milliseconds)
java.io.IOException: Failed to delete: C:\projects\spark\target\tmp\spark-59c9d169-5a56-4519-9ef0-cefdbd3f2e6c
KafkaClusterSuite:
Exception encountered when attempting to run a suite with class name: org.apache.spark.streaming.kafka.KafkaClusterSuite *** ABORTED *** (1 second, 690 milliseconds)
java.io.IOException: Failed to delete: C:\projects\spark\target\tmp\spark-3ef402b0-8689-4a60-85ae-e41e274f179d
DirectKafkaStreamSuite:
Exception encountered when attempting to run a suite with class name: org.apache.spark.streaming.kafka010.DirectKafkaStreamSuite *** ABORTED *** (59 seconds, 626 milliseconds)
java.io.IOException: Failed to delete: C:\projects\spark\target\tmp\spark-426107da-68cf-4d94-b0d6-1f428f1c53f6
KafkaRDDSuite:
Exception encountered when attempting to run a suite with class name: org.apache.spark.streaming.kafka010.KafkaRDDSuite *** ABORTED *** (2 minutes, 6 seconds)
java.io.IOException: Failed to delete: C:\projects\spark\target\tmp\spark-b9ce7929-5dae-46ab-a0c4-9ef6f58fbc2
```
**Java - failed tests**
```
Test org.apache.spark.streaming.kafka.JavaKafkaRDDSuite.testKafkaRDD failed: java.io.IOException: Failed to delete: C:\projects\spark\target\tmp\spark-1cee32f4-4390-4321-82c9-e8616b3f0fb0, took 9.61 sec
Test org.apache.spark.streaming.kafka.JavaKafkaStreamSuite.testKafkaStream failed: java.io.IOException: Failed to delete: C:\projects\spark\target\tmp\spark-f42695dd-242e-4b07-847c-f299b8e4676e, took 11.797 sec
Test org.apache.spark.streaming.kafka.JavaDirectKafkaStreamSuite.testKafkaStream failed: java.io.IOException: Failed to delete: C:\projects\spark\target\tmp\spark-85c0d062-78cf-459c-a2dd-7973572101ce, took 1.581 sec
Test org.apache.spark.streaming.kafka010.JavaKafkaRDDSuite.testKafkaRDD failed: java.io.IOException: Failed to delete: C:\projects\spark\target\tmp\spark-49eb6b5c-8366-47a6-83f2-80c443c48280, took 17.895 sec
org.apache.spark.streaming.kafka010.JavaDirectKafkaStreamSuite.testKafkaStream failed: java.io.IOException: Failed to delete: C:\projects\spark\target\tmp\spark-898cf826-d636-4b1c-a61a-c12a364c02e7, took 8.858 sec
```
**Scala - failed tests**
```
PartitionProviderCompatibilitySuite:
- insert overwrite partition of new datasource table overwrites just partition *** FAILED *** (828 milliseconds)
java.io.IOException: Failed to delete: C:\projects\spark\target\tmp\spark-bb6337b9-4f99-45ab-ad2c-a787ab965c09
- SPARK-18635 special chars in partition values - partition management true *** FAILED *** (5 seconds, 360 milliseconds)
org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);
- SPARK-18635 special chars in partition values - partition management false *** FAILED *** (141 milliseconds)
org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);
```
```
UtilsSuite:
- reading offset bytes of a file (compressed) *** FAILED *** (0 milliseconds)
java.io.IOException: Failed to delete: C:\projects\spark\target\tmp\spark-ecb2b7d5-db8b-43a7-b268-1bf242b5a491
- reading offset bytes across multiple files (compressed) *** FAILED *** (0 milliseconds)
java.io.IOException: Failed to delete: C:\projects\spark\target\tmp\spark-25cc47a8-1faa-4da5-8862-cf174df63ce0
```
```
StatisticsSuite:
- MetastoreRelations fallback to HDFS for size estimation *** FAILED *** (110 milliseconds)
org.apache.spark.sql.catalyst.analysis.NoSuchTableException: Table or view 'csv_table' not found in database 'default';
```
```
SQLQuerySuite:
- permanent UDTF *** FAILED *** (125 milliseconds)
org.apache.spark.sql.AnalysisException: Undefined function: 'udtf_count_temp'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 24
- describe functions - user defined functions *** FAILED *** (125 milliseconds)
org.apache.spark.sql.AnalysisException: Undefined function: 'udtf_count'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 7
- CTAS without serde with location *** FAILED *** (16 milliseconds)
java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: file:C:projectsspark%09arget%09mpspark-ed673d73-edfc-404e-829e-2e2b9725d94e/c1
- derived from Hive query file: drop_database_removes_partition_dirs.q *** FAILED *** (47 milliseconds)
java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: file:C:projectsspark%09arget%09mpspark-d2ddf08e-699e-45be-9ebd-3dfe619680fe/drop_database_removes_partition_dirs_table
- derived from Hive query file: drop_table_removes_partition_dirs.q *** FAILED *** (0 milliseconds)
java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: file:C:projectsspark%09arget%09mpspark-d2ddf08e-699e-45be-9ebd-3dfe619680fe/drop_table_removes_partition_dirs_table2
- SPARK-17796 Support wildcard character in filename for LOAD DATA LOCAL INPATH *** FAILED *** (109 milliseconds)
java.nio.file.InvalidPathException: Illegal char <:> at index 2: /C:/projects/spark/sql/hive/projectsspark arget mpspark-1a122f8c-dfb3-46c4-bab1-f30764baee0e/*part-r*
```
```
HiveDDLSuite:
- drop external tables in default database *** FAILED *** (16 milliseconds)
org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);
- add/drop partitions - external table *** FAILED *** (16 milliseconds)
org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);
- create/drop database - location without pre-created directory *** FAILED *** (16 milliseconds)
org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);
- create/drop database - location with pre-created directory *** FAILED *** (32 milliseconds)
org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);
- drop database containing tables - CASCADE *** FAILED *** (94 milliseconds)
CatalogDatabase(db1,,file:/C:/projects/spark/target/tmp/warehouse-d0665ee0-1e39-4805-b471-0b764f7838be/db1.db,Map()) did not equal CatalogDatabase(db1,,file:C:/projects/spark/target/tmp/warehouse-d0665ee0-1e39-4805-b471-0b764f7838be\db1.db,Map()) (HiveDDLSuite.scala:675)
- drop an empty database - CASCADE *** FAILED *** (63 milliseconds)
CatalogDatabase(db1,,file:/C:/projects/spark/target/tmp/warehouse-d0665ee0-1e39-4805-b471-0b764f7838be/db1.db,Map()) did not equal CatalogDatabase(db1,,file:C:/projects/spark/target/tmp/warehouse-d0665ee0-1e39-4805-b471-0b764f7838be\db1.db,Map()) (HiveDDLSuite.scala:675)
- drop database containing tables - RESTRICT *** FAILED *** (47 milliseconds)
CatalogDatabase(db1,,file:/C:/projects/spark/target/tmp/warehouse-d0665ee0-1e39-4805-b471-0b764f7838be/db1.db,Map()) did not equal CatalogDatabase(db1,,file:C:/projects/spark/target/tmp/warehouse-d0665ee0-1e39-4805-b471-0b764f7838be\db1.db,Map()) (HiveDDLSuite.scala:675)
- drop an empty database - RESTRICT *** FAILED *** (47 milliseconds)
CatalogDatabase(db1,,file:/C:/projects/spark/target/tmp/warehouse-d0665ee0-1e39-4805-b471-0b764f7838be/db1.db,Map()) did not equal CatalogDatabase(db1,,file:C:/projects/spark/target/tmp/warehouse-d0665ee0-1e39-4805-b471-0b764f7838be\db1.db,Map()) (HiveDDLSuite.scala:675)
- CREATE TABLE LIKE an external data source table *** FAILED *** (140 milliseconds)
org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark arget mpspark-c5eba16d-07ae-4186-95bb-21c5811cf888;
- CREATE TABLE LIKE an external Hive serde table *** FAILED *** (16 milliseconds)
org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);
- desc table for data source table - no user-defined schema *** FAILED *** (125 milliseconds)
org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark arget mpspark-e8bf5bf5-721a-4cbe-9d6 at scala.collection.immutable.List.foreach(List.scala:381)d-5543a8301c1d;
```
```
MetastoreDataSourcesSuite
- CTAS: persisted bucketed data source table *** FAILED *** (16 milliseconds)
java.lang.IllegalArgumentException: Can not create a Path from an empty string
```
```
ShowCreateTableSuite:
- simple external hive table *** FAILED *** (0 milliseconds)
org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);
```
```
PartitionedTablePerfStatsSuite:
- hive table: partitioned pruned table reports only selected files *** FAILED *** (313 milliseconds)
org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);
- datasource table: partitioned pruned table reports only selected files *** FAILED *** (219 milliseconds)
org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark arget mpspark-311f45f8-d064-4023-a4bb-e28235bff64d;
- hive table: lazy partition pruning reads only necessary partition data *** FAILED *** (203 milliseconds)
org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);
- datasource table: lazy partition pruning reads only necessary partition data *** FAILED *** (187 milliseconds)
org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark arget mpspark-fde874ca-66bd-4d0b-a40f-a043b65bf957;
- hive table: lazy partition pruning with file status caching enabled *** FAILED *** (188 milliseconds)
org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);
- datasource table: lazy partition pruning with file status caching enabled *** FAILED *** (187 milliseconds)
org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark arget mpspark-e6d20183-dd68-4145-acbe-4a509849accd;
- hive table: file status caching respects refresh table and refreshByPath *** FAILED *** (172 milliseconds)
org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);
- datasource table: file status caching respects refresh table and refreshByPath *** FAILED *** (203 milliseconds)
org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark arget mpspark-8b2c9651-2adf-4d58-874f-659007e21463;
- hive table: file status cache respects size limit *** FAILED *** (219 milliseconds)
org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);
- datasource table: file status cache respects size limit *** FAILED *** (171 milliseconds)
org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark arget mpspark-7835ab57-cb48-4d2c-bb1d-b46d5a4c47e4;
- datasource table: table setup does not scan filesystem *** FAILED *** (266 milliseconds)
org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark arget mpspark-20598d76-c004-42a7-8061-6c56f0eda5e2;
- hive table: table setup does not scan filesystem *** FAILED *** (266 milliseconds)
org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);
- hive table: num hive client calls does not scale with partition count *** FAILED *** (2 seconds, 281 milliseconds)
org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);
- datasource table: num hive client calls does not scale with partition count *** FAILED *** (2 seconds, 422 milliseconds)
org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark arget mpspark-4cfed321-4d1d-4b48-8d34-5c169afff383;
- hive table: files read and cached when filesource partition management is off *** FAILED *** (234 milliseconds)
org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);
- datasource table: all partition data cached in memory when partition management is off *** FAILED *** (203 milliseconds)
org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark arget mpspark-4bcc0398-15c9-4f6a-811e-12d40f3eec12;
- SPARK-18700: table loaded only once even when resolved concurrently *** FAILED *** (1 second, 266 milliseconds)
org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);
```
```
HiveSparkSubmitSuite:
- temporary Hive UDF: define a UDF and use it *** FAILED *** (2 seconds, 94 milliseconds)
java.io.IOException: Cannot run program "./bin/spark-submit" (in directory "C:\projects\spark"): CreateProcess error=2, The system cannot find the file specified
- permanent Hive UDF: define a UDF and use it *** FAILED *** (281 milliseconds)
java.io.IOException: Cannot run program "./bin/spark-submit" (in directory "C:\projects\spark"): CreateProcess error=2, The system cannot find the file specified
- permanent Hive UDF: use a already defined permanent function *** FAILED *** (718 milliseconds)
java.io.IOException: Cannot run program "./bin/spark-submit" (in directory "C:\projects\spark"): CreateProcess error=2, The system cannot find the file specified
- SPARK-8368: includes jars passed in through --jars *** FAILED *** (3 seconds, 521 milliseconds)
java.io.IOException: Cannot run program "./bin/spark-submit" (in directory "C:\projects\spark"): CreateProcess error=2, The system cannot find the file specified
- SPARK-8020: set sql conf in spark conf *** FAILED *** (0 milliseconds)
java.io.IOException: Cannot run program "./bin/spark-submit" (in directory "C:\projects\spark"): CreateProcess error=2, The system cannot find the file specified
- SPARK-8489: MissingRequirementError during reflection *** FAILED *** (94 milliseconds)
java.io.IOException: Cannot run program "./bin/spark-submit" (in directory "C:\projects\spark"): CreateProcess error=2, The system cannot find the file specified
- SPARK-9757 Persist Parquet relation with decimal column *** FAILED *** (16 milliseconds)
java.io.IOException: Cannot run program "./bin/spark-submit" (in directory "C:\projects\spark"): CreateProcess error=2, The system cannot find the file specified
- SPARK-11009 fix wrong result of Window function in cluster mode *** FAILED *** (16 milliseconds)
java.io.IOException: Cannot run program "./bin/spark-submit" (in directory "C:\projects\spark"): CreateProcess error=2, The system cannot find the file specified
- SPARK-14244 fix window partition size attribute binding failure *** FAILED *** (78 milliseconds)
java.io.IOException: Cannot run program "./bin/spark-submit" (in directory "C:\projects\spark"): CreateProcess error=2, The system cannot find the file specified
- set spark.sql.warehouse.dir *** FAILED *** (16 milliseconds)
java.io.IOException: Cannot run program "./bin/spark-submit" (in directory "C:\projects\spark"): CreateProcess error=2, The system cannot find the file specified
- set hive.metastore.warehouse.dir *** FAILED *** (15 milliseconds)
java.io.IOException: Cannot run program "./bin/spark-submit" (in directory "C:\projects\spark"): CreateProcess error=2, The system cannot find the file specified
- SPARK-16901: set javax.jdo.option.ConnectionURL *** FAILED *** (16 milliseconds)
java.io.IOException: Cannot run program "./bin/spark-submit" (in directory "C:\projects\spark"): CreateProcess error=2, The system cannot find the file specified
- SPARK-18360: default table path of tables in default database should depend on the location of default database *** FAILED *** (15 milliseconds)
java.io.IOException: Cannot run program "./bin/spark-submit" (in directory "C:\projects\spark"): CreateProcess error=2, The system cannot find the file specified
```
```
UtilsSuite:
- resolveURIs with multiple paths *** FAILED *** (0 milliseconds)
".../jar3,file:/C:/pi.py[%23]py.pi,file:/C:/path%..." did not equal ".../jar3,file:/C:/pi.py[#]py.pi,file:/C:/path%..." (UtilsSuite.scala:468)
```
```
CheckpointSuite:
- recovery with file input stream *** FAILED *** (10 seconds, 205 milliseconds)
The code passed to eventually never returned normally. Attempted 660 times over 10.014272499999999 seconds. Last failure message: Unexpected internal error near index 1
\
^. (CheckpointSuite.scala:680)
```
## How was this patch tested?
Manually via AppVeyor as below:
**Scala - aborted tests**
```
WindowQuerySuite - all passed
OrcSourceSuite:
- SPARK-18220: read Hive orc table with varchar column *** FAILED *** (4 seconds, 417 milliseconds)
org.apache.spark.sql.execution.QueryExecutionException: FAILED: Execution Error, return code -101 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask. org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$runHive$1.apply(HiveClientImpl.scala:625)
ParquetMetastoreSuite - all passed
ParquetSourceSuite - all passed
KafkaRDDSuite - all passed
DirectKafkaStreamSuite - all passed
ReliableKafkaStreamSuite - all passed
KafkaStreamSuite - all passed
KafkaClusterSuite - all passed
DirectKafkaStreamSuite - all passed
KafkaRDDSuite - all passed
```
**Java - failed tests**
```
org.apache.spark.streaming.kafka.JavaKafkaRDDSuite - all passed
org.apache.spark.streaming.kafka.JavaDirectKafkaStreamSuite - all passed
org.apache.spark.streaming.kafka.JavaKafkaStreamSuite - all passed
org.apache.spark.streaming.kafka010.JavaDirectKafkaStreamSuite - all passed
org.apache.spark.streaming.kafka010.JavaKafkaRDDSuite - all passed
```
**Scala - failed tests**
```
PartitionProviderCompatibilitySuite:
- insert overwrite partition of new datasource table overwrites just partition (1 second, 953 milliseconds)
- SPARK-18635 special chars in partition values - partition management true (6 seconds, 31 milliseconds)
- SPARK-18635 special chars in partition values - partition management false (4 seconds, 578 milliseconds)
```
```
UtilsSuite:
- reading offset bytes of a file (compressed) (203 milliseconds)
- reading offset bytes across multiple files (compressed) (0 milliseconds)
```
```
StatisticsSuite:
- MetastoreRelations fallback to HDFS for size estimation (94 milliseconds)
```
```
SQLQuerySuite:
- permanent UDTF (407 milliseconds)
- describe functions - user defined functions (441 milliseconds)
- CTAS without serde with location (2 seconds, 831 milliseconds)
- derived from Hive query file: drop_database_removes_partition_dirs.q (734 milliseconds)
- derived from Hive query file: drop_table_removes_partition_dirs.q (563 milliseconds)
- SPARK-17796 Support wildcard character in filename for LOAD DATA LOCAL INPATH (453 milliseconds)
```
```
HiveDDLSuite:
- drop external tables in default database (3 seconds, 5 milliseconds)
- add/drop partitions - external table (2 seconds, 750 milliseconds)
- create/drop database - location without pre-created directory (500 milliseconds)
- create/drop database - location with pre-created directory (407 milliseconds)
- drop database containing tables - CASCADE (453 milliseconds)
- drop an empty database - CASCADE (375 milliseconds)
- drop database containing tables - RESTRICT (328 milliseconds)
- drop an empty database - RESTRICT (391 milliseconds)
- CREATE TABLE LIKE an external data source table (953 milliseconds)
- CREATE TABLE LIKE an external Hive serde table (3 seconds, 782 milliseconds)
- desc table for data source table - no user-defined schema (1 second, 150 milliseconds)
```
```
MetastoreDataSourcesSuite
- CTAS: persisted bucketed data source table (875 milliseconds)
```
```
ShowCreateTableSuite:
- simple external hive table (78 milliseconds)
```
```
PartitionedTablePerfStatsSuite:
- hive table: partitioned pruned table reports only selected files (1 second, 109 milliseconds)
- datasource table: partitioned pruned table reports only selected files (860 milliseconds)
- hive table: lazy partition pruning reads only necessary partition data (859 milliseconds)
- datasource table: lazy partition pruning reads only necessary partition data (1 second, 219 milliseconds)
- hive table: lazy partition pruning with file status caching enabled (875 milliseconds)
- datasource table: lazy partition pruning with file status caching enabled (890 milliseconds)
- hive table: file status caching respects refresh table and refreshByPath (922 milliseconds)
- datasource table: file status caching respects refresh table and refreshByPath (640 milliseconds)
- hive table: file status cache respects size limit (469 milliseconds)
- datasource table: file status cache respects size limit (453 milliseconds)
- datasource table: table setup does not scan filesystem (328 milliseconds)
- hive table: table setup does not scan filesystem (313 milliseconds)
- hive table: num hive client calls does not scale with partition count (5 seconds, 431 milliseconds)
- datasource table: num hive client calls does not scale with partition count (4 seconds, 79 milliseconds)
- hive table: files read and cached when filesource partition management is off (656 milliseconds)
- datasource table: all partition data cached in memory when partition management is off (484 milliseconds)
- SPARK-18700: table loaded only once even when resolved concurrently (2 seconds, 578 milliseconds)
```
```
HiveSparkSubmitSuite:
- temporary Hive UDF: define a UDF and use it (1 second, 745 milliseconds)
- permanent Hive UDF: define a UDF and use it (406 milliseconds)
- permanent Hive UDF: use a already defined permanent function (375 milliseconds)
- SPARK-8368: includes jars passed in through --jars (391 milliseconds)
- SPARK-8020: set sql conf in spark conf (156 milliseconds)
- SPARK-8489: MissingRequirementError during reflection (187 milliseconds)
- SPARK-9757 Persist Parquet relation with decimal column (157 milliseconds)
- SPARK-11009 fix wrong result of Window function in cluster mode (156 milliseconds)
- SPARK-14244 fix window partition size attribute binding failure (156 milliseconds)
- set spark.sql.warehouse.dir (172 milliseconds)
- set hive.metastore.warehouse.dir (156 milliseconds)
- SPARK-16901: set javax.jdo.option.ConnectionURL (157 milliseconds)
- SPARK-18360: default table path of tables in default database should depend on the location of default database (172 milliseconds)
```
```
UtilsSuite:
- resolveURIs with multiple paths (0 milliseconds)
```
```
CheckpointSuite:
- recovery with file input stream (4 seconds, 452 milliseconds)
```
Note: after resolving the aborted tests, there is a test failure identified as below:
```
OrcSourceSuite:
- SPARK-18220: read Hive orc table with varchar column *** FAILED *** (4 seconds, 417 milliseconds)
org.apache.spark.sql.execution.QueryExecutionException: FAILED: Execution Error, return code -101 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask. org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$runHive$1.apply(HiveClientImpl.scala:625)
```
This does not look due to this problem so this PR does not fix it here.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#16451 from HyukjinKwon/all-path-resource-fixes.
## What changes were proposed in this pull request?
After unifying the CREATE TABLE syntax in https://github.com/apache/spark/pull/16296, it's pretty easy to support creating hive table with `DataFrameWriter` and `Catalog` now.
This PR basically just removes the hive provider check in `DataFrameWriter.saveAsTable` and `Catalog.createExternalTable`, and add tests.
## How was this patch tested?
new tests in `HiveDDLSuite`
Author: Wenchen Fan <wenchen@databricks.com>
Closes#16487 from cloud-fan/hive-table.
## What changes were proposed in this pull request?
If I use the function regexp_extract, and then in my regex string, use `\`, i.e. escape character, this fails codegen, because the `\` character is not properly escaped when codegen'd.
Example stack trace:
```
/* 059 */ private int maxSteps = 2;
/* 060 */ private int numRows = 0;
/* 061 */ private org.apache.spark.sql.types.StructType keySchema = new org.apache.spark.sql.types.StructType().add("date_format(window#325.start, yyyy-MM-dd HH:mm)", org.apache.spark.sql.types.DataTypes.StringType)
/* 062 */ .add("regexp_extract(source#310.description, ([a-zA-Z]+)\[.*, 1)", org.apache.spark.sql.types.DataTypes.StringType);
/* 063 */ private org.apache.spark.sql.types.StructType valueSchema = new org.apache.spark.sql.types.StructType().add("sum", org.apache.spark.sql.types.DataTypes.LongType);
/* 064 */ private Object emptyVBase;
...
org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 62, Column 58: Invalid escape sequence
at org.codehaus.janino.Scanner.scanLiteralCharacter(Scanner.java:918)
at org.codehaus.janino.Scanner.produce(Scanner.java:604)
at org.codehaus.janino.Parser.peekRead(Parser.java:3239)
at org.codehaus.janino.Parser.parseArguments(Parser.java:3055)
at org.codehaus.janino.Parser.parseSelector(Parser.java:2914)
at org.codehaus.janino.Parser.parseUnaryExpression(Parser.java:2617)
at org.codehaus.janino.Parser.parseMultiplicativeExpression(Parser.java:2573)
at org.codehaus.janino.Parser.parseAdditiveExpression(Parser.java:2552)
```
In the codegend expression, the literal should use `\\` instead of `\`
A similar problem was solved here: https://github.com/apache/spark/pull/15156.
## How was this patch tested?
Regression test in `DataFrameAggregationSuite`
Author: Burak Yavuz <brkyvz@gmail.com>
Closes#16361 from brkyvz/reg-break.
## What changes were proposed in this pull request?
- [X] Make sure all join types are clearly mentioned
- [X] Make join labeling/style consistent
- [X] Make join label ordering docs the same
- [X] Improve join documentation according to above for Scala
- [X] Improve join documentation according to above for Python
- [X] Improve join documentation according to above for R
## How was this patch tested?
No tests b/c docs.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: anabranch <wac.chambers@gmail.com>
Closes#16504 from anabranch/SPARK-19126.
## What changes were proposed in this pull request?
- [X] Fix inconsistencies in function reference for dense rank and dense
- [X] Make all languages equivalent in their reference to `dense_rank` and `rank`.
## How was this patch tested?
N/A for docs.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: anabranch <wac.chambers@gmail.com>
Closes#16505 from anabranch/SPARK-19127.
## What changes were proposed in this pull request?
`OutputWriterFactory`/`OutputWriter` are internal interfaces and we can remove some unnecessary APIs:
1. `OutputWriterFactory.newWriter(path: String)`: no one calls it and no one implements it.
2. `OutputWriter.write(row: Row)`: during execution we only call `writeInternal`, which is weird as `OutputWriter` is already an internal interface. We should rename `writeInternal` to `write` and remove `def write(row: Row)` and it's related converter code. All implementations should just implement `def write(row: InternalRow)`
## How was this patch tested?
existing tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#16479 from cloud-fan/hive-writer.
## What changes were proposed in this pull request?
Added a `to` call at the end of the code generated by `ScalaReflection.deserializerFor` if the requested type is not a supertype of `WrappedArray[_]` that uses `CanBuildFrom[_, _, _]` to convert result into an arbitrary subtype of `Seq[_]`.
Care was taken to preserve the original deserialization where it is possible to avoid the overhead of conversion in cases where it is not needed
`ScalaReflection.serializerFor` could already be used to serialize any `Seq[_]` so it was not altered
`SQLImplicits` had to be altered and new implicit encoders added to permit serialization of other sequence types
Also fixes [SPARK-16815] Dataset[List[T]] leads to ArrayStoreException
## How was this patch tested?
```bash
./build/mvn -DskipTests clean package && ./dev/run-tests
```
Also manual execution of the following sets of commands in the Spark shell:
```scala
case class TestCC(key: Int, letters: List[String])
val ds1 = sc.makeRDD(Seq(
(List("D")),
(List("S","H")),
(List("F","H")),
(List("D","L","L"))
)).map(x=>(x.length,x)).toDF("key","letters").as[TestCC]
val test1=ds1.map{_.key}
test1.show
```
```scala
case class X(l: List[String])
spark.createDataset(Seq(List("A"))).map(X).show
```
```scala
spark.sqlContext.createDataset(sc.parallelize(List(1) :: Nil)).collect
```
After adding arbitrary sequence support also tested with the following commands:
```scala
case class QueueClass(q: scala.collection.immutable.Queue[Int])
spark.createDataset(Seq(List(1,2,3))).map(x => QueueClass(scala.collection.immutable.Queue(x: _*))).map(_.q.dequeue).collect
```
Author: Michal Senkyr <mike.senkyr@gmail.com>
Closes#16240 from michalsenkyr/sql-caseclass-list-fix.
## What changes were proposed in this pull request?
Today we have different syntax to create data source or hive serde tables, we should unify them to not confuse users and step forward to make hive a data source.
Please read https://issues.apache.org/jira/secure/attachment/12843835/CREATE-TABLE.pdf for details.
TODO(for follow-up PRs):
1. TBLPROPERTIES is not added to the new syntax, we should decide if we wanna add it later.
2. `SHOW CREATE TABLE` should be updated to use the new syntax.
3. we should decide if we wanna change the behavior of `SET LOCATION`.
## How was this patch tested?
new tests
Author: Wenchen Fan <wenchen@databricks.com>
Closes#16296 from cloud-fan/create-table.
## What changes were proposed in this pull request?
When we append data to a partitioned table with `DataFrameWriter.saveAsTable`, there are 2 issues:
1. doesn't work when the partition has custom location.
2. will recover all partitions
This PR fixes them by moving the special partition handling code from `DataSourceAnalysis` to `InsertIntoHadoopFsRelationCommand`, so that the `DataFrameWriter.saveAsTable` code path can also benefit from it.
## How was this patch tested?
newly added regression tests
Author: Wenchen Fan <wenchen@databricks.com>
Closes#16460 from cloud-fan/append.
## What changes were proposed in this pull request?
Dataset actions currently spin off a new `Dataframe` only to track query execution. This PR simplifies this code path by using the `Dataset.queryExecution` directly. This PR also merges the typed and untyped action evaluation paths.
## How was this patch tested?
Existing tests.
Author: Herman van Hovell <hvanhovell@databricks.com>
Closes#16466 from hvanhovell/SPARK-19070.
## What changes were proposed in this pull request?
There are many locations in the Spark repo where the same word occurs consecutively. Sometimes they are appropriately placed, but many times they are not. This PR removes the inappropriately duplicated words.
## How was this patch tested?
N/A since only docs or comments were updated.
Author: Niranjan Padmanabhan <niranjan.padmanabhan@gmail.com>
Closes#16455 from neurons/np.structure_streaming_doc.
## What changes were proposed in this pull request?
Now all aggregation functions support partial aggregate, we can remove the `supportsPartual` flag in `AggregateFunction`
## How was this patch tested?
existing tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#16461 from cloud-fan/partial.
### What changes were proposed in this pull request?
The data in the managed table should be deleted after table is dropped. However, if the partition location is not under the location of the partitioned table, it is not deleted as expected. Users can specify any location for the partition when they adding a partition.
This PR is to delete partition location when dropping managed partitioned tables stored in `InMemoryCatalog`.
### How was this patch tested?
Added test cases for both HiveExternalCatalog and InMemoryCatalog
Author: gatorsmile <gatorsmile@gmail.com>
Closes#16448 from gatorsmile/unsetSerdeProp.
## What changes were proposed in this pull request?
CSV type inferencing causes `IllegalArgumentException` on decimal numbers with heterogeneous precisions and scales because the current logic uses the last decimal type in a **partition**. Specifically, `inferRowType`, the **seqOp** of **aggregate**, returns the last decimal type. This PR fixes it to use `findTightestCommonType`.
**decimal.csv**
```
9.03E+12
1.19E+11
```
**BEFORE**
```scala
scala> spark.read.format("csv").option("inferSchema", true).load("decimal.csv").printSchema
root
|-- _c0: decimal(3,-9) (nullable = true)
scala> spark.read.format("csv").option("inferSchema", true).load("decimal.csv").show
16/12/16 14:32:49 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 4)
java.lang.IllegalArgumentException: requirement failed: Decimal precision 4 exceeds max precision 3
```
**AFTER**
```scala
scala> spark.read.format("csv").option("inferSchema", true).load("decimal.csv").printSchema
root
|-- _c0: decimal(4,-9) (nullable = true)
scala> spark.read.format("csv").option("inferSchema", true).load("decimal.csv").show
+---------+
| _c0|
+---------+
|9.030E+12|
| 1.19E+11|
+---------+
```
## How was this patch tested?
Pass the newly add test case.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#16320 from dongjoon-hyun/SPARK-18877.
## What changes were proposed in this pull request?
We add a cbo configuration to switch between default stats and estimated stats.
We also define a new statistics method `planStats` in LogicalPlan with conf as its parameter, in order to pass the cbo switch and other estimation related configurations in the future. `planStats` is used on the caller sides (i.e. in Optimizer and Strategies) to make transformation decisions based on stats.
## How was this patch tested?
Add a test case using a dummy LogicalPlan.
Author: Zhenhua Wang <wzh_zju@163.com>
Closes#16401 from wzhfy/cboSwitch.
## What changes were proposed in this pull request?
Apache Spark supports the following cases **by quoting RDD column names** while saving through JDBC.
- Allow reserved keyword as a column name, e.g., 'order'.
- Allow mixed-case colume names like the following, e.g., `[a: int, A: int]`.
``` scala
scala> val df = sql("select 1 a, 1 A")
df: org.apache.spark.sql.DataFrame = [a: int, A: int]
...
scala> df.write.mode("overwrite").format("jdbc").options(option).save()
scala> df.write.mode("append").format("jdbc").options(option).save()
```
This PR aims to use **database column names** instead of RDD column ones in order to support the following additionally.
Note that this case succeeds with `MySQL`, but fails on `Postgres`/`Oracle` before.
``` scala
val df1 = sql("select 1 a")
val df2 = sql("select 1 A")
...
df1.write.mode("overwrite").format("jdbc").options(option).save()
df2.write.mode("append").format("jdbc").options(option).save()
```
## How was this patch tested?
Pass the Jenkins test with a new testcase.
Author: Dongjoon Hyun <dongjoon@apache.org>
Author: gatorsmile <gatorsmile@gmail.com>
Closes#15664 from dongjoon-hyun/SPARK-18123.
## What changes were proposed in this pull request?
Currently, `createTempView`, `createOrReplaceTempView`, and `createGlobalTempView` show `ParseExceptions` on invalid table names. We had better show better error message. Also, this PR also adds and updates the missing description on the API docs correctly.
**BEFORE**
```
scala> spark.range(10).createOrReplaceTempView("11111")
org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input '11111' expecting {'SELECT', 'FROM', 'ADD', ...}(line 1, pos 0)
== SQL ==
11111
...
```
**AFTER**
```
scala> spark.range(10).createOrReplaceTempView("11111")
org.apache.spark.sql.AnalysisException: Invalid view name: 11111;
...
```
## How was this patch tested?
Pass the Jenkins with updated a test case.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#16427 from dongjoon-hyun/SPARK-19012.
## What changes were proposed in this pull request?
The `CreateDataSourceTableAsSelectCommand` is quite complex now, as it has a lot of work to do if the table already exists:
1. throw exception if we don't want to ignore it.
2. do some check and adjust the schema if we want to append data.
3. drop the table and create it again if we want to overwrite.
The work 2 and 3 should be done by analyzer, so that we can also apply it to hive tables.
## How was this patch tested?
existing tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#15996 from cloud-fan/append.
## What changes were proposed in this pull request?
Fix the document of `ForeachWriter` to use `writeStream` instead of `write` for a streaming dataset.
## How was this patch tested?
Docs only.
Author: Carson Wang <carson.wang@intel.com>
Closes#16419 from carsonwang/FixDoc.
## What changes were proposed in this pull request?
In HDFS, when we copy a file into target directory, there will a temporary `._COPY_` file for a period of time. The duration depends on file size. If we do not skip this file, we will may read the same data for two times.
## How was this patch tested?
update unit test
Author: uncleGen <hustyugm@gmail.com>
Closes#16370 from uncleGen/SPARK-18960.
### What changes were proposed in this pull request?
Since `spark.sql.hive.thriftServer.singleSession` is a configuration of SQL component, this conf can be moved from `SparkConf` to `StaticSQLConf`.
When we introduced `spark.sql.hive.thriftServer.singleSession`, all the SQL configuration are session specific. They can be modified in different sessions.
In Spark 2.1, static SQL configuration is added. It is a perfect fit for `spark.sql.hive.thriftServer.singleSession`. Previously, we did the same move for `spark.sql.warehouse.dir` from `SparkConf` to `StaticSQLConf`
### How was this patch tested?
Added test cases in HiveThriftServer2Suites.scala
Author: gatorsmile <gatorsmile@gmail.com>
Closes#16392 from gatorsmile/hiveThriftServerSingleSession.
## What changes were proposed in this pull request?
Currently we implement `Aggregator` with `DeclarativeAggregate`, which will serialize/deserialize the buffer object every time we process an input.
This PR implements `Aggregator` with `TypedImperativeAggregate` and avoids to serialize/deserialize buffer object many times. The benchmark shows we get about 2 times speed up.
For simple buffer object that doesn't need serialization, we still go with `DeclarativeAggregate`, to avoid performance regression.
## How was this patch tested?
N/A
Author: Wenchen Fan <wenchen@databricks.com>
Closes#16383 from cloud-fan/aggregator.
## What changes were proposed in this pull request?
`CSVRelation.csvParser` does type dispatch for each value in each row. We can prevent this because the schema is already kept in `CSVRelation`.
So, this PR proposes that converters are created first according to the schema, and then apply them to each.
I just ran some small benchmarks as below after resembling the logics in 7c33b0fd05/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala (L170-L178) to test the updated logics.
```scala
test("Benchmark for CSV converter") {
var numMalformedRecords = 0
val N = 500 << 12
val schema = StructType(
StructField("a", StringType) ::
StructField("b", StringType) ::
StructField("c", StringType) ::
StructField("d", StringType) :: Nil)
val row = Array("1.0", "test", "2015-08-20 14:57:00", "FALSE")
val data = spark.sparkContext.parallelize(List.fill(N)(row))
val parser = CSVRelation.csvParser(schema, schema.fieldNames, CSVOptions())
val benchmark = new Benchmark("CSV converter", N)
benchmark.addCase("cast CSV string tokens", 10) { _ =>
data.flatMap { recordTokens =>
parser(recordTokens, numMalformedRecords)
}.collect()
}
benchmark.run()
}
```
**Before**
```
CSV converter: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
cast CSV string tokens 1061 / 1130 1.9 517.9 1.0X
```
**After**
```
CSV converter: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
cast CSV string tokens 940 / 1011 2.2 459.2 1.0X
```
## How was this patch tested?
Tests in `CSVTypeCastSuite` and `CSVRelation`
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#16351 from HyukjinKwon/type-dispatch.
## What changes were proposed in this pull request?
`UnsafeKVExternalSorter` uses `UnsafeInMemorySorter` to sort the records of `BytesToBytesMap` if it is given a map.
Currently we use the number of keys in `BytesToBytesMap` to determine if the array used for sort is enough or not. We has an assert that ensures the size of the array is enough: `map.numKeys() <= map.getArray().size() / 2`.
However, each record in the map takes two entries in the array, one is record pointer, another is key prefix. So the correct assert should be `map.numKeys() * 2 <= map.getArray().size() / 2`.
## How was this patch tested?
N/A
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#16232 from viirya/SPARK-18800-fix-UnsafeKVExternalSorter.
## What changes were proposed in this pull request?
Statistics in LogicalPlan should use attributes to refer to columns rather than column names, because two columns from two relations can have the same column name. But CatalogTable doesn't have the concepts of attribute or broadcast hint in Statistics. Therefore, putting Statistics in CatalogTable is confusing.
We define a different statistic structure in CatalogTable, which is only responsible for interacting with metastore, and is converted to statistics in LogicalPlan when it is used.
## How was this patch tested?
add test cases
Author: wangzhenhua <wangzhenhua@huawei.com>
Author: Zhenhua Wang <wzh_zju@163.com>
Closes#16323 from wzhfy/nameToAttr.
## What changes were proposed in this pull request?
Add missing InterfaceStability.Evolving for Structured Streaming APIs
## How was this patch tested?
Compiling the codes.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#16385 from zsxwing/SPARK-18985.
## What changes were proposed in this pull request?
SortPartitions and RedistributeData logical operators are not actually used and can be removed. Note that we do have a Sort operator (with global flag false) that subsumed SortPartitions.
## How was this patch tested?
Also updated test cases to reflect the removal.
Author: Reynold Xin <rxin@databricks.com>
Closes#16381 from rxin/SPARK-18973.
## What changes were proposed in this pull request?
This PR cleans up duplicated checking for file paths in implemented data sources and prevent to attempt to list twice in ORC data source.
https://github.com/apache/spark/pull/14585 handles a problem for the partition column name having `_` and the issue itself is resolved correctly. However, it seems the data sources implementing `FileFormat` are validating the paths duplicately. Assuming from the comment in `CSVFileFormat`, `// TODO: Move filtering.`, I guess we don't have to check this duplicately.
Currently, this seems being filtered in `PartitioningAwareFileIndex.shouldFilterOut` and`PartitioningAwareFileIndex.isDataPath`. So, `FileFormat.inferSchema` will always receive leaf files. For example, running to codes below:
``` scala
spark.range(10).withColumn("_locality_code", $"id").write.partitionBy("_locality_code").save("/tmp/parquet")
spark.read.parquet("/tmp/parquet")
```
gives the paths below without directories but just valid data files:
``` bash
/tmp/parquet/_col=0/part-r-00000-094a8efa-bece-4b50-b54c-7918d1f7b3f8.snappy.parquet
/tmp/parquet/_col=1/part-r-00000-094a8efa-bece-4b50-b54c-7918d1f7b3f8.snappy.parquet
/tmp/parquet/_col=2/part-r-00000-25de2b50-225a-4bcf-a2bc-9eb9ed407ef6.snappy.parquet
...
```
to `FileFormat.inferSchema`.
## How was this patch tested?
Unit test added in `HadoopFsRelationTest` and related existing tests.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#14627 from HyukjinKwon/SPARK-16975.
## What changes were proposed in this pull request?
Starting Spark 2.1.0, bucketing feature is available for all file-based data sources. This patch fixes some function docs that haven't yet been updated to reflect that.
## How was this patch tested?
N/A
Author: Reynold Xin <rxin@databricks.com>
Closes#16349 from rxin/ds-doc.
## What changes were proposed in this pull request?
This patch includes minor changes to improve readability for partition handling code. I'm in the middle of implementing some new feature and found some naming / implicit type inference not as intuitive.
## How was this patch tested?
This patch should have no semantic change and the changes should be covered by existing test cases.
Author: Reynold Xin <rxin@databricks.com>
Closes#16378 from rxin/minor-fix.
## What changes were proposed in this pull request?
This PR audits places using `logicalPlan` in StreamExecution and ensures they all handles the case that `logicalPlan` cannot be created.
In addition, this PR also fixes the following issues in `StreamingQueryException`:
- `StreamingQueryException` and `StreamExecution` are cycle-dependent because in the `StreamingQueryException`'s constructor, it calls `StreamExecution`'s `toDebugString` which uses `StreamingQueryException`. Hence it will output `null` value in the error message.
- Duplicated stack trace when calling Throwable.printStackTrace because StreamingQueryException's toString contains the stack trace.
## How was this patch tested?
The updated `test("max files per trigger - incorrect values")`. I found this issue when I switched from `testStream` to the real codes to verify the failure in this test.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#16322 from zsxwing/SPARK-18907.
## What changes were proposed in this pull request?
This pr is to fix an `NullPointerException` issue caused by a following `limit + aggregate` query;
```
scala> val df = Seq(("a", 1), ("b", 2), ("c", 1), ("d", 5)).toDF("id", "value")
scala> df.limit(2).groupBy("id").count().show
WARN TaskSetManager: Lost task 0.0 in stage 9.0 (TID 8204, lvsp20hdn012.stubprod.com): java.lang.NullPointerException
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
```
The root culprit is that [`$doAgg()`](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L596) skips an initialization of [the buffer iterator](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L603); `BaseLimitExec` sets `stopEarly=true` and `$doAgg()` exits in the middle without the initialization.
## How was this patch tested?
Added a test to check if no exception happens for limit + aggregates in `DataFrameAggregateSuite.scala`.
Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>
Closes#15980 from maropu/SPARK-18528.
## What changes were proposed in this pull request?
Made update mode public. As part of that here are the changes.
- Update DatastreamWriter to accept "update"
- Changed package of InternalOutputModes from o.a.s.sql to o.a.s.sql.catalyst
- Added update mode state removing with watermark to StateStoreSaveExec
## How was this patch tested?
Added new tests in changed modules
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#16360 from tdas/SPARK-18234.
## What changes were proposed in this pull request?
Currently, Spark writes a single file out per task, sometimes leading to very large files. It would be great to have an option to limit the max number of records written per file in a task, to avoid humongous files.
This patch introduces a new write config option `maxRecordsPerFile` (default to a session-wide setting `spark.sql.files.maxRecordsPerFile`) that limits the max number of records written to a single file. A non-positive value indicates there is no limit (same behavior as not having this flag).
## How was this patch tested?
Added test cases in PartitionedWriteSuite for both dynamic partition insert and non-dynamic partition insert.
Author: Reynold Xin <rxin@databricks.com>
Closes#16204 from rxin/SPARK-18775.
## What changes were proposed in this pull request?
Two changes
- Fix how delays specified in months and years are translated to milliseconds
- Following up on #16258, not show watermark when there is no watermarking in the query
## How was this patch tested?
Updated and new unit tests
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#16304 from tdas/SPARK-18834-1.
## What changes were proposed in this pull request?
It's a huge waste to call `Catalog.listTables` in `SQLContext.tableNames`, which only need the table names, while `Catalog.listTables` will get the table metadata for each table name.
## How was this patch tested?
N/A
Author: Wenchen Fan <wenchen@databricks.com>
Closes#16352 from cloud-fan/minor.
### What changes were proposed in this pull request?
Currently, we only have a SQL interface for recovering all the partitions in the directory of a table and update the catalog. `MSCK REPAIR TABLE` or `ALTER TABLE table RECOVER PARTITIONS`. (Actually, very hard for me to remember `MSCK` and have no clue what it means)
After the new "Scalable Partition Handling", the table repair becomes much more important for making visible the data in the created data source partitioned table.
Thus, this PR is to add it into the Catalog interface. After this PR, users can repair the table by
```Scala
spark.catalog.recoverPartitions("testTable")
```
### How was this patch tested?
Modified the existing test cases.
Author: gatorsmile <gatorsmile@gmail.com>
Closes#16356 from gatorsmile/repairTable.
## What changes were proposed in this pull request?
Checkpoint Location can be defined for a StructuredStreaming on a per-query basis by the `DataStreamWriter` options, but it can also be provided through SparkSession configurations. It should be able to recover in both cases when the OutputMode is Complete for MemorySinks.
## How was this patch tested?
Unit tests
Author: Burak Yavuz <brkyvz@gmail.com>
Closes#16342 from brkyvz/chk-rec.
## What changes were proposed in this pull request?
When we append data to an existing table with `DataFrameWriter.saveAsTable`, we will do various checks to make sure the appended data is consistent with the existing data.
However, we get the information of the existing table by matching the table relation, instead of looking at the table metadata. This is error-prone, e.g. we only check the number of columns for `HadoopFsRelation`, we forget to check bucketing, etc.
This PR refactors the error checking by looking at the metadata of the existing table, and fix several bugs:
* SPARK-18899: We forget to check if the specified bucketing matched the existing table, which may lead to a problematic table that has different bucketing in different data files.
* SPARK-18912: We forget to check the number of columns for non-file-based data source table
* SPARK-18913: We don't support append data to a table with special column names.
## How was this patch tested?
new regression test.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#16313 from cloud-fan/bug1.
## What changes were proposed in this pull request?
In order to respond to task cancellation, Spark tasks must periodically check `TaskContext.isInterrupted()`, but this check is missing on a few critical read paths used in Spark SQL, including `FileScanRDD`, `JDBCRDD`, and UnsafeSorter-based sorts. This can cause interrupted / cancelled tasks to continue running and become zombies (as also described in #16189).
This patch aims to fix this problem by adding `TaskContext.isInterrupted()` checks to these paths. Note that I could have used `InterruptibleIterator` to simply wrap a bunch of iterators but in some cases this would have an adverse performance penalty or might not be effective due to certain special uses of Iterators in Spark SQL. Instead, I inlined `InterruptibleIterator`-style logic into existing iterator subclasses.
## How was this patch tested?
Tested manually in `spark-shell` with two different reproductions of non-cancellable tasks, one involving scans of huge files and another involving sort-merge joins that spill to disk. Both causes of zombie tasks are fixed by the changes added here.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#16340 from JoshRosen/sql-task-interruption.
## What changes were proposed in this pull request?
This PR proposes to fix lint-check failures and javadoc8 break.
Few errors were introduced as below:
**lint-check failures**
```
[ERROR] src/test/java/org/apache/spark/network/TransportClientFactorySuite.java:[45,1] (imports) RedundantImport: Duplicate import to line 43 - org.apache.spark.network.util.MapConfigProvider.
[ERROR] src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java:[255,10] (modifier) RedundantModifier: Redundant 'final' modifier.
```
**javadoc8**
```
[error] .../spark/sql/core/target/java/org/apache/spark/sql/streaming/StreamingQueryProgress.java:19: error: bad use of '>'
[error] * "max" -> "2016-12-05T20:54:20.827Z" // maximum event time seen in this trigger
[error] ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/streaming/StreamingQueryProgress.java:20: error: bad use of '>'
[error] * "min" -> "2016-12-05T20:54:20.827Z" // minimum event time seen in this trigger
[error] ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/streaming/StreamingQueryProgress.java:21: error: bad use of '>'
[error] * "avg" -> "2016-12-05T20:54:20.827Z" // average event time seen in this trigger
[error] ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/streaming/StreamingQueryProgress.java:22: error: bad use of '>'
[error] * "watermark" -> "2016-12-05T20:54:20.827Z" // watermark used in this trigger
[error]
```
## How was this patch tested?
Manually checked as below:
**lint-check failures**
```
./dev/lint-java
Checkstyle checks passed.
```
**javadoc8**
This seems hidden in the API doc but I manually checked after removing access modifier as below:
It looks not rendering properly (scaladoc).
![2016-12-16 3 40 34](https://cloud.githubusercontent.com/assets/6477701/21255175/8df1fe6e-c3ad-11e6-8cda-ce7f76c6677a.png)
After this PR, it renders as below:
- scaladoc
![2016-12-16 3 40 23](https://cloud.githubusercontent.com/assets/6477701/21255135/4a11dab6-c3ad-11e6-8ab2-b091c4f45029.png)
- javadoc
![2016-12-16 3 41 10](https://cloud.githubusercontent.com/assets/6477701/21255137/4bba1d9c-c3ad-11e6-9b88-62f1f697b56a.png)
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#16307 from HyukjinKwon/lint-javadoc8.
## What changes were proposed in this pull request?
A vectorized parquet reader fails to read column data if data schema and partition schema overlap with each other and inferred types in the partition schema differ from ones in the data schema. An example code to reproduce this bug is as follows;
```
scala> case class A(a: Long, b: Int)
scala> val as = Seq(A(1, 2))
scala> spark.createDataFrame(as).write.parquet("/data/a=1/")
scala> val df = spark.read.parquet("/data/")
scala> df.printSchema
root
|-- a: long (nullable = true)
|-- b: integer (nullable = true)
scala> df.collect
java.lang.NullPointerException
at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getLong(OnHeapColumnVector.java:283)
at org.apache.spark.sql.execution.vectorized.ColumnarBatch$Row.getLong(ColumnarBatch.java:191)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
```
The root cause is that a logical layer (`HadoopFsRelation`) and a physical layer (`VectorizedParquetRecordReader`) have a different assumption on partition schema; the logical layer trusts the data schema to infer the type the overlapped partition columns, and, on the other hand, the physical layer trusts partition schema which is inferred from path string. To fix this bug, this pr simply updates `HadoopFsRelation.schema` to respect the partition columns position in data schema and respect the partition columns type in partition schema.
## How was this patch tested?
Add tests in `ParquetPartitionDiscoverySuite`
Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>
Closes#16030 from maropu/SPARK-18108.
## What changes were proposed in this pull request?
This PR adds StreamingQueryWrapper to make StreamExecution and progress classes serializable because it is too easy for it to get captured with normal usage. If StreamingQueryWrapper gets captured in a closure but no place calls its methods, it should not fail the Spark tasks. However if its methods are called, then this PR will throw a better message.
## How was this patch tested?
`test("StreamingQuery should be Serializable but cannot be used in executors")`
`test("progress classes should be Serializable")`
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#16272 from zsxwing/SPARK-18850.
## What changes were proposed in this pull request?
When starting a stream with a lot of backfill and maxFilesPerTrigger, the user could often want to start with most recent files first. This would let you keep low latency for recent data and slowly backfill historical data.
This PR adds a new option `latestFirst` to control this behavior. When it's true, `FileStreamSource` will sort the files by the modified time from latest to oldest, and take the first `maxFilesPerTrigger` files as a new batch.
## How was this patch tested?
The added test.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#16251 from zsxwing/newest-first.
## What changes were proposed in this pull request?
Right now, once a user set the comment of a column with create table command, he/she cannot update the comment. It will be useful to provide a public interface (e.g. SQL) to do that.
This PR implements the following SQL statement:
```
ALTER TABLE table [PARTITION partition_spec]
CHANGE [COLUMN] column_old_name column_new_name column_dataType
[COMMENT column_comment]
[FIRST | AFTER column_name];
```
For further expansion, we could support alter `name`/`dataType`/`index` of a column too.
## How was this patch tested?
Add new test cases in `ExternalCatalogSuite` and `SessionCatalogSuite`.
Add sql file test for `ALTER TABLE CHANGE COLUMN` statement.
Author: jiangxingbo <jiangxb1987@gmail.com>
Closes#15717 from jiangxb1987/change-column.
## What changes were proposed in this pull request?
In `DataSource`, if the table is not analyzed, we will use 0 as the default value for table size. This is dangerous, we may broadcast a large table and cause OOM. We should use `defaultSizeInBytes` instead.
## How was this patch tested?
new regression test
Author: Wenchen Fan <wenchen@databricks.com>
Closes#16280 from cloud-fan/bug.
## What changes were proposed in this pull request?
This is a bug introduced by subquery handling. numberedTreeString (which uses generateTreeString under the hood) numbers trees including innerChildren (used to print subqueries), but apply (which uses getNodeNumbered) ignores innerChildren. As a result, apply(i) would return the wrong plan node if there are subqueries.
This patch fixes the bug.
## How was this patch tested?
Added a test case in SubquerySuite.scala to test both the depth-first traversal of numbering as well as making sure the two methods are consistent.
Author: Reynold Xin <rxin@databricks.com>
Closes#16277 from rxin/SPARK-18854.
## What changes were proposed in this pull request?
Right now `StreamingQuery.lastProgress` throws NoSuchElementException and it's hard to be used in Python since Python user will just see Py4jError.
This PR just makes it return null instead.
## How was this patch tested?
`test("lastProgress should be null when recentProgress is empty")`
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#16273 from zsxwing/SPARK-18852.
## What changes were proposed in this pull request?
Currently, `FileSourceStrategy` does not handle the case when the pushed-down filter is `Literal(null)` and removes it at the post-filter in Spark-side.
For example, the codes below:
```scala
val df = Seq(Tuple1(Some(true)), Tuple1(None), Tuple1(Some(false))).toDF()
df.filter($"_1" === "true").explain(true)
```
shows it keeps `null` properly.
```
== Parsed Logical Plan ==
'Filter ('_1 = true)
+- LocalRelation [_1#17]
== Analyzed Logical Plan ==
_1: boolean
Filter (cast(_1#17 as double) = cast(true as double))
+- LocalRelation [_1#17]
== Optimized Logical Plan ==
Filter (isnotnull(_1#17) && null)
+- LocalRelation [_1#17]
== Physical Plan ==
*Filter (isnotnull(_1#17) && null) << Here `null` is there
+- LocalTableScan [_1#17]
```
However, when we read it back from Parquet,
```scala
val path = "/tmp/testfile"
df.write.parquet(path)
spark.read.parquet(path).filter($"_1" === "true").explain(true)
```
`null` is removed at the post-filter.
```
== Parsed Logical Plan ==
'Filter ('_1 = true)
+- Relation[_1#11] parquet
== Analyzed Logical Plan ==
_1: boolean
Filter (cast(_1#11 as double) = cast(true as double))
+- Relation[_1#11] parquet
== Optimized Logical Plan ==
Filter (isnotnull(_1#11) && null)
+- Relation[_1#11] parquet
== Physical Plan ==
*Project [_1#11]
+- *Filter isnotnull(_1#11) << Here `null` is missing
+- *FileScan parquet [_1#11] Batched: true, Format: ParquetFormat, Location: InMemoryFileIndex[file:/tmp/testfile], PartitionFilters: [null], PushedFilters: [IsNotNull(_1)], ReadSchema: struct<_1:boolean>
```
This PR fixes it to keep it properly. In more details,
```scala
val partitionKeyFilters =
ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
```
This keeps this `null` in `partitionKeyFilters` as `Literal` always don't have `children` and `references` is being empty which is always the subset of `partitionSet`.
And then in
```scala
val afterScanFilters = filterSet -- partitionKeyFilters
```
`null` is always removed from the post filter. So, if the referenced fields are empty, it should be applied into data columns too.
After this PR, it becomes as below:
```
== Parsed Logical Plan ==
'Filter ('_1 = true)
+- Relation[_1#276] parquet
== Analyzed Logical Plan ==
_1: boolean
Filter (cast(_1#276 as double) = cast(true as double))
+- Relation[_1#276] parquet
== Optimized Logical Plan ==
Filter (isnotnull(_1#276) && null)
+- Relation[_1#276] parquet
== Physical Plan ==
*Project [_1#276]
+- *Filter (isnotnull(_1#276) && null)
+- *FileScan parquet [_1#276] Batched: true, Format: ParquetFormat, Location: InMemoryFileIndex[file:/private/var/folders/9j/gf_c342d7d150mwrxvkqnc180000gn/T/spark-a5d59bdb-5b..., PartitionFilters: [null], PushedFilters: [IsNotNull(_1)], ReadSchema: struct<_1:boolean>
```
## How was this patch tested?
Unit test in `FileSourceStrategySuite`
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#16184 from HyukjinKwon/SPARK-18753.
## What changes were proposed in this pull request?
`OverwriteOptions` was introduced in https://github.com/apache/spark/pull/15705, to carry the information of static partitions. However, after further refactor, this information becomes duplicated and we can remove `OverwriteOptions`.
## How was this patch tested?
N/A
Author: Wenchen Fan <wenchen@databricks.com>
Closes#15995 from cloud-fan/overwrite.
## What changes were proposed in this pull request?
Add implicit encoders for BigDecimal, timestamp and date.
## How was this patch tested?
Add an unit test. Pass build, unit tests, and some tests below .
Before:
```
scala> spark.createDataset(Seq(new java.math.BigDecimal(10)))
<console>:24: error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
spark.createDataset(Seq(new java.math.BigDecimal(10)))
^
scala>
```
After:
```
scala> spark.createDataset(Seq(new java.math.BigDecimal(10)))
res0: org.apache.spark.sql.Dataset[java.math.BigDecimal] = [value: decimal(38,18)]
```
Author: Weiqing Yang <yangweiqing001@gmail.com>
Closes#16176 from weiqingy/SPARK-18746.
## What changes were proposed in this pull request?
- Changed `StreamingQueryProgress.watermark` to `StreamingQueryProgress.queryTimestamps` which is a `Map[String, String]` containing the following keys: "eventTime.max", "eventTime.min", "eventTime.avg", "processingTime", "watermark". All of them UTC formatted strings.
- Renamed `StreamingQuery.timestamp` to `StreamingQueryProgress.triggerTimestamp` to differentiate from `queryTimestamps`. It has the timestamp of when the trigger was started.
## How was this patch tested?
Updated tests
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#16258 from tdas/SPARK-18834.
## What changes were proposed in this pull request?
Change the statement `SHOW TABLES [EXTENDED] [(IN|FROM) database_name] [[LIKE] 'identifier_with_wildcards'] [PARTITION(partition_spec)]` to the following statements:
- SHOW TABLES [(IN|FROM) database_name] [[LIKE] 'identifier_with_wildcards']
- SHOW TABLE EXTENDED [(IN|FROM) database_name] LIKE 'identifier_with_wildcards' [PARTITION(partition_spec)]
After this change, the statements `SHOW TABLE/SHOW TABLES` have the same syntax with that HIVE has.
## How was this patch tested?
Modified the test sql file `show-tables.sql`;
Modified the test suite `DDLSuite`.
Author: jiangxingbo <jiangxb1987@gmail.com>
Closes#16262 from jiangxb1987/show-table-extended.
## What changes were proposed in this pull request?
Some places in SQL may call `RpcEndpointRef.askWithRetry` (e.g., ParquetFileFormat.buildReader -> SparkContext.broadcast -> ... -> BlockManagerMaster.updateBlockInfo -> RpcEndpointRef.askWithRetry), which will finally call `Await.result`. It may cause `java.lang.IllegalArgumentException: spark.sql.execution.id is already set` when running in Scala ForkJoinPool.
This PR includes the following changes to fix this issue:
- Remove `ThreadUtils.awaitResult`
- Rename `ThreadUtils. awaitResultInForkJoinSafely` to `ThreadUtils.awaitResult`
- Replace `Await.result` in RpcTimeout with `ThreadUtils.awaitResult`.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#16230 from zsxwing/fix-SPARK-13747.
## What changes were proposed in this pull request?
I **believe** that I _only_ removed duplicated code (that adds nothing but noise). I'm gonna remove the comment after Jenkins has built the changes with no issues and Spark devs has agreed to include the changes.
Remove explicit `RDD` and `Partition` overrides (that turn out code duplication)
## How was this patch tested?
Local build. Awaiting Jenkins.
…cation)
Author: Jacek Laskowski <jacek@japila.pl>
Closes#16145 from jaceklaskowski/rdd-overrides-removed.
## What changes were proposed in this pull request?
Major change in this PR:
- Add `pendingQueryNames` and `pendingQueryIds` to track that are going to start but not yet put into `activeQueries` so that we don't need to hold a lock when starting a query.
Minor changes:
- Fix a potential NPE when the user sets `checkpointLocation` using SQLConf but doesn't specify a query name.
- Add missing docs in `StreamingQueryListener`
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#16220 from zsxwing/SPARK-18796.
The value of the "isSrcLocal" parameter passed to Hive's loadTable and
loadPartition methods needs to be set according to the user query (e.g.
"LOAD DATA LOCAL"), and not the current code that tries to guess what
it should be.
For existing versions of Hive the current behavior is probably ok, but
some recent changes in the Hive code changed the semantics slightly,
making code that sets "isSrcLocal" to "true" incorrectly to do the
wrong thing. It would end up moving the parent directory of the files
into the final location, instead of the file themselves, resulting
in a table that cannot be read.
I modified HiveCommandSuite so that existing "LOAD DATA" tests are run
both in local and non-local mode, since the semantics are slightly different.
The tests include a few new checks to make sure the semantics follow
what Hive describes in its documentation.
Tested with existing unit tests and also ran some Hive integration tests
with a version of Hive containing the changes that surfaced the problem.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#16179 from vanzin/SPARK-18752.
The problem is if it is run with no fix throws an exception and causes the following error:
"Cannot specify a column width on data type bit."
The problem stems from the fact that the "java.sql.types.BIT" type is mapped as BIT[n] that really must be mapped as BIT.
This concerns the type Boolean.
As for the type String with maximum length of characters it must be mapped as VARCHAR (MAX) instead of TEXT which is a type deprecated in SQLServer.
Here is the list of mappings for SQL Server:
https://msdn.microsoft.com/en-us/library/ms378878(v=sql.110).aspxCloses#13944 from meknio/master.
## What changes were proposed in this pull request?
Instead of only keeping the minimum number of offsets around, we should keep enough information to allow us to roll back n batches and reexecute the stream starting from a given point. In particular, we should create a config in SQLConf, spark.sql.streaming.retainedBatches that defaults to 100 and ensure that we keep enough log files in the following places to roll back the specified number of batches:
the offsets that are present in each batch
versions of the state store
the files lists stored for the FileStreamSource
the metadata log stored by the FileStreamSink
marmbrus zsxwing
## How was this patch tested?
The following tests were added.
### StreamExecution offset metadata
Test added to StreamingQuerySuite that ensures offset metadata is garbage collected according to minBatchesRetain
### CompactibleFileStreamLog
Tests added in CompactibleFileStreamLogSuite to ensure that logs are purged starting before the first compaction file that proceeds the current batch id - minBatchesToRetain.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Tyson Condie <tcondie@gmail.com>
Closes#16219 from tcondie/offset_hist.
### What changes were proposed in this pull request?
Currently, when users use Python UDF in Filter, BatchEvalPython is always generated below FilterExec. However, not all the predicates need to be evaluated after Python UDF execution. Thus, this PR is to push down the determinisitc predicates through `BatchEvalPython`.
```Python
>>> df = spark.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"])
>>> from pyspark.sql.functions import udf, col
>>> from pyspark.sql.types import BooleanType
>>> my_filter = udf(lambda a: a < 2, BooleanType())
>>> sel = df.select(col("key"), col("value")).filter((my_filter(col("key"))) & (df.value < "2"))
>>> sel.explain(True)
```
Before the fix, the plan looks like
```
== Optimized Logical Plan ==
Filter ((isnotnull(value#1) && <lambda>(key#0L)) && (value#1 < 2))
+- LogicalRDD [key#0L, value#1]
== Physical Plan ==
*Project [key#0L, value#1]
+- *Filter ((isnotnull(value#1) && pythonUDF0#9) && (value#1 < 2))
+- BatchEvalPython [<lambda>(key#0L)], [key#0L, value#1, pythonUDF0#9]
+- Scan ExistingRDD[key#0L,value#1]
```
After the fix, the plan looks like
```
== Optimized Logical Plan ==
Filter ((isnotnull(value#1) && <lambda>(key#0L)) && (value#1 < 2))
+- LogicalRDD [key#0L, value#1]
== Physical Plan ==
*Project [key#0L, value#1]
+- *Filter pythonUDF0#9: boolean
+- BatchEvalPython [<lambda>(key#0L)], [key#0L, value#1, pythonUDF0#9]
+- *Filter (isnotnull(value#1) && (value#1 < 2))
+- Scan ExistingRDD[key#0L,value#1]
```
### How was this patch tested?
Added both unit test cases for `BatchEvalPythonExec` and also add an end-to-end test case in Python test suite.
Author: gatorsmile <gatorsmile@gmail.com>
Closes#16193 from gatorsmile/pythonUDFPredicatePushDown.
## What changes were proposed in this pull request?
1. In SparkStrategies.canBroadcast, I will add the check plan.statistics.sizeInBytes >= 0
2. In LocalRelations.statistics, when calculate the statistics, I will change the size to BigInt so it won't overflow.
## How was this patch tested?
I will add a test case to make sure the statistics.sizeInBytes won't overflow.
Author: Huaxin Gao <huaxing@us.ibm.com>
Closes#16175 from huaxingao/spark-17460.
## What changes were proposed in this pull request?
When you start a stream, if we are trying to resolve the source of the stream, for example if we need to resolve partition columns, this could take a long time. This long execution time should not block the main thread where `query.start()` was called on. It should happen in the stream execution thread possibly before starting any triggers.
## How was this patch tested?
Unit test added. Made sure test fails with no code changes.
Author: Burak Yavuz <brkyvz@gmail.com>
Closes#16238 from brkyvz/SPARK-18811.
## What changes were proposed in this pull request?
This PR avoids that a result of a cast `toInt` is negative due to signed integer overflow (e.g. 0x0000_0000_1???????L.toInt < 0 ). This PR performs casts after we can ensure the value is within range of signed integer (the result of `max(array.length, ???)` is always integer).
## How was this patch tested?
Manually executed query68 of TPC-DS with 100TB
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes#16235 from kiszk/SPARK-18745.
## What changes were proposed in this pull request?
* This PR changes `JVMObjectTracker` from `object` to `class` and let its instance associated with each RBackend. So we can manage the lifecycle of JVM objects when there are multiple `RBackend` sessions. `RBackend.close` will clear the object tracker explicitly.
* I assume that `SQLUtils` and `RRunner` do not need to track JVM instances, which could be wrong.
* Small refactor of `SerDe.sqlSerDe` to increase readability.
## How was this patch tested?
* Added unit tests for `JVMObjectTracker`.
* Wait for Jenkins to run full tests.
Author: Xiangrui Meng <meng@databricks.com>
Closes#16154 from mengxr/SPARK-17822.
## What changes were proposed in this pull request?
- Changed FileStreamSource to use new FileStreamSourceOffset rather than LongOffset. The field is named as `logOffset` to make it more clear that this is a offset in the file stream log.
- Fixed bug in FileStreamSourceLog, the field endId in the FileStreamSourceLog.get(startId, endId) was not being used at all. No test caught it earlier. Only my updated tests caught it.
Other minor changes
- Dont use batchId in the FileStreamSource, as calling it batch id is extremely miss leading. With multiple sources, it may happen that a new batch has no new data from a file source. So offset of FileStreamSource != batchId after that batch.
## How was this patch tested?
Updated unit test.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#16205 from tdas/SPARK-18776.
## What changes were proposed in this pull request?
This patch fixes the format specification in explain for file sources (Parquet and Text formats are the only two that are different from the rest):
Before:
```
scala> spark.read.text("test.text").explain()
== Physical Plan ==
*FileScan text [value#15] Batched: false, Format: org.apache.spark.sql.execution.datasources.text.TextFileFormatxyz, Location: InMemoryFileIndex[file:/scratch/rxin/spark/test.text], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value:string>
```
After:
```
scala> spark.read.text("test.text").explain()
== Physical Plan ==
*FileScan text [value#15] Batched: false, Format: Text, Location: InMemoryFileIndex[file:/scratch/rxin/spark/test.text], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value:string>
```
Also closes#14680.
## How was this patch tested?
Verified in spark-shell.
Author: Reynold Xin <rxin@databricks.com>
Closes#16187 from rxin/SPARK-18760.
## What changes were proposed in this pull request?
`input_file_name` doesn't return filename when working with UDF in PySpark. An example shows the problem:
from pyspark.sql.functions import *
from pyspark.sql.types import *
def filename(path):
return path
sourceFile = udf(filename, StringType())
spark.read.json("tmp.json").select(sourceFile(input_file_name())).show()
+---------------------------+
|filename(input_file_name())|
+---------------------------+
| |
+---------------------------+
The cause of this issue is, we group rows in `BatchEvalPythonExec` for batching processing of PythonUDF. Currently we group rows first and then evaluate expressions on the rows. If the data is less than the required number of rows for a group, the iterator will be consumed to the end before the evaluation. However, once the iterator reaches the end, we will unset input filename. So the input_file_name expression can't return correct filename.
This patch fixes the approach to group the batch of rows. We evaluate the expression first and then group evaluated results to batch.
## How was this patch tested?
Added unit test to PySpark.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#16115 from viirya/fix-py-udf-input-filename.
## What changes were proposed in this pull request?
When `ignoreCorruptFiles` is enabled, it's better to also ignore non-existing files.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#16203 from zsxwing/ignore-file-not-found.
## What changes were proposed in this pull request?
Listeners added with `sparkSession.streams.addListener(l)` are added to a SparkSession. So events only from queries in the same session as a listener should be posted to the listener. Currently, all the events gets rerouted through the Spark's main listener bus, that is,
- StreamingQuery posts event to StreamingQueryListenerBus. Only the queries associated with the same session as the bus posts events to it.
- StreamingQueryListenerBus posts event to Spark's main LiveListenerBus as a SparkEvent.
- StreamingQueryListenerBus also subscribes to LiveListenerBus events thus getting back the posted event in a different thread.
- The received is posted to the registered listeners.
The problem is that *all StreamingQueryListenerBuses in all sessions* gets the events and posts them to their listeners. This is wrong.
In this PR, I solve it by making StreamingQueryListenerBus track active queries (by their runIds) when a query posts the QueryStarted event to the bus. This allows the rerouted events to be filtered using the tracked queries.
Note that this list needs to be maintained separately
from the `StreamingQueryManager.activeQueries` because a terminated query is cleared from
`StreamingQueryManager.activeQueries` as soon as it is stopped, but the this ListenerBus must
clear a query only after the termination event of that query has been posted lazily, much after the query has been terminated.
Credit goes to zsxwing for coming up with the initial idea.
## How was this patch tested?
Updated test harness code to use the correct session, and added new unit test.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#16186 from tdas/SPARK-18758.
Based on an informal survey, users find this option easier to understand / remember.
Author: Michael Armbrust <michael@databricks.com>
Closes#16182 from marmbrus/renameRecentProgress.
## What changes were proposed in this pull request?
It's better to add a warning log when skipping a corrupted file. It will be helpful when we want to finish the job first, then find them in the log and fix these files.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#16192 from zsxwing/SPARK-18764.
## What changes were proposed in this pull request?
Easier to read while debugging as a formatted string (in ISO8601 format) than in millis
## How was this patch tested?
Updated unit tests
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#16166 from tdas/SPARK-18734.
## What changes were proposed in this pull request?
Many Spark developers often want to test the runtime of some function in interactive debugging and testing. This patch adds a simple time function to SparkSession:
```
scala> spark.time { spark.range(1000).count() }
Time taken: 77 ms
res1: Long = 1000
```
## How was this patch tested?
I tested this interactively in spark-shell.
Author: Reynold Xin <rxin@databricks.com>
Closes#16140 from rxin/SPARK-18714.
## What changes were proposed in this pull request?
Right now ForeachSink creates a new physical plan, so StreamExecution cannot retrieval metrics and watermark.
This PR changes ForeachSink to manually convert InternalRows to objects without creating a new plan.
## How was this patch tested?
`test("foreach with watermark: append")`.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#16160 from zsxwing/SPARK-18721.
(Link to Jira issue: https://issues.apache.org/jira/browse/SPARK-18572)
## What changes were proposed in this pull request?
Currently Spark answers the `SHOW PARTITIONS` command by fetching all of the table's partition metadata from the external catalog and constructing partition names therefrom. The Hive client has a `getPartitionNames` method which is many times faster for this purpose, with the performance improvement scaling with the number of partitions in a table.
To test the performance impact of this PR, I ran the `SHOW PARTITIONS` command on two Hive tables with large numbers of partitions. One table has ~17,800 partitions, and the other has ~95,000 partitions. For the purposes of this PR, I'll call the former table `table1` and the latter table `table2`. I ran 5 trials for each table with before-and-after versions of this PR. The results are as follows:
Spark at bdc8153, `SHOW PARTITIONS table1`, times in seconds:
7.901
3.983
4.018
4.331
4.261
Spark at bdc8153, `SHOW PARTITIONS table2`
(Timed out after 10 minutes with a `SocketTimeoutException`.)
Spark at this PR, `SHOW PARTITIONS table1`, times in seconds:
3.801
0.449
0.395
0.348
0.336
Spark at this PR, `SHOW PARTITIONS table2`, times in seconds:
5.184
1.63
1.474
1.519
1.41
Taking the best times from each trial, we get a 12x performance improvement for a table with ~17,800 partitions and at least a 426x improvement for a table with ~95,000 partitions. More significantly, the latter command doesn't even complete with the current code in master.
This is actually a patch we've been using in-house at VideoAmp since Spark 1.1. It's made all the difference in the practical usability of our largest tables. Even with tables with about 1,000 partitions there's a performance improvement of about 2-3x.
## How was this patch tested?
I added a unit test to `VersionsSuite` which tests that the Hive client's `getPartitionNames` method returns the correct number of partitions.
Author: Michael Allman <michael@videoamp.com>
Closes#15998 from mallman/spark-18572-list_partition_names.
## What changes were proposed in this pull request?
Move no data rate limit from StreamExecution to ProgressReporter to make `recentProgresses` and listener events consistent.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#16155 from zsxwing/SPARK-18722.
## What changes were proposed in this pull request?
DataSet.na.fill(0) used on a DataSet which has a long value column, it will change the original long value.
The reason is that the type of the function fill's param is Double, and the numeric columns are always cast to double(`fillCol[Double](f, value)`) .
```
def fill(value: Double, cols: Seq[String]): DataFrame = {
val columnEquals = df.sparkSession.sessionState.analyzer.resolver
val projections = df.schema.fields.map { f =>
// Only fill if the column is part of the cols list.
if (f.dataType.isInstanceOf[NumericType] && cols.exists(col => columnEquals(f.name, col))) {
fillCol[Double](f, value)
} else {
df.col(f.name)
}
}
df.select(projections : _*)
}
```
For example:
```
scala> val df = Seq[(Long, Long)]((1, 2), (-1, -2), (9123146099426677101L, 9123146560113991650L)).toDF("a", "b")
df: org.apache.spark.sql.DataFrame = [a: bigint, b: bigint]
scala> df.show
+-------------------+-------------------+
| a| b|
+-------------------+-------------------+
| 1| 2|
| -1| -2|
|9123146099426677101|9123146560113991650|
+-------------------+-------------------+
scala> df.na.fill(0).show
+-------------------+-------------------+
| a| b|
+-------------------+-------------------+
| 1| 2|
| -1| -2|
|9123146099426676736|9123146560113991680|
+-------------------+-------------------+
```
the original values changed [which is not we expected result]:
```
9123146099426677101 -> 9123146099426676736
9123146560113991650 -> 9123146560113991680
```
## How was this patch tested?
unit test added.
Author: root <root@iZbp1gsnrlfzjxh82cz80vZ.(none)>
Closes#15994 from windpiger/nafillMissupOriginalValue.
### What changes were proposed in this pull request?
Our existing withColumn for adding metadata can simply use the existing public withColumn API.
### How was this patch tested?
The existing test cases cover it.
Author: gatorsmile <gatorsmile@gmail.com>
Closes#16152 from gatorsmile/withColumnRefactoring.
## What changes were proposed in this pull request?
Here are the major changes in this PR.
- Added the ability to recover `StreamingQuery.id` from checkpoint location, by writing the id to `checkpointLoc/metadata`.
- Added `StreamingQuery.runId` which is unique for every query started and does not persist across restarts. This is to identify each restart of a query separately (same as earlier behavior of `id`).
- Removed auto-generation of `StreamingQuery.name`. The purpose of name was to have the ability to define an identifier across restarts, but since id is precisely that, there is no need for a auto-generated name. This means name becomes purely cosmetic, and is null by default.
- Added `runId` to `StreamingQueryListener` events and `StreamingQueryProgress`.
Implementation details
- Renamed existing `StreamExecutionMetadata` to `OffsetSeqMetadata`, and moved it to the file `OffsetSeq.scala`, because that is what this metadata is tied to. Also did some refactoring to make the code cleaner (got rid of a lot of `.json` and `.getOrElse("{}")`).
- Added the `id` as the new `StreamMetadata`.
- When a StreamingQuery is created it gets or writes the `StreamMetadata` from `checkpointLoc/metadata`.
- All internal logging in `StreamExecution` uses `(name, id, runId)` instead of just `name`
TODO
- [x] Test handling of name=null in json generation of StreamingQueryProgress
- [x] Test handling of name=null in json generation of StreamingQueryListener events
- [x] Test python API of runId
## How was this patch tested?
Updated unit tests and new unit tests
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#16113 from tdas/SPARK-18657.
## What changes were proposed in this pull request?
Move DataFrame.collect out of synchronized block so that we can query content in MemorySink when `DataFrame.collect` is running.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#16162 from zsxwing/SPARK-18729.
## What changes were proposed in this pull request?
As reported in the Jira, there are some weird issues with exploding Python UDFs in SparkSQL.
The following test code can reproduce it. Notice: the following test code is reported to return wrong results in the Jira. However, as I tested on master branch, it causes exception and so can't return any result.
>>> from pyspark.sql.functions import *
>>> from pyspark.sql.types import *
>>>
>>> df = spark.range(10)
>>>
>>> def return_range(value):
... return [(i, str(i)) for i in range(value - 1, value + 1)]
...
>>> range_udf = udf(return_range, ArrayType(StructType([StructField("integer_val", IntegerType()),
... StructField("string_val", StringType())])))
>>>
>>> df.select("id", explode(range_udf(df.id))).show()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/spark/python/pyspark/sql/dataframe.py", line 318, in show
print(self._jdf.showString(n, 20))
File "/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
File "/spark/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o126.showString.: java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:156)
at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:120)
at org.apache.spark.sql.execution.GenerateExec.consume(GenerateExec.scala:57)
The cause of this issue is, in `ExtractPythonUDFs` we insert `BatchEvalPythonExec` to run PythonUDFs in batch. `BatchEvalPythonExec` will add extra outputs (e.g., `pythonUDF0`) to original plan. In above case, the original `Range` only has one output `id`. After `ExtractPythonUDFs`, the added `BatchEvalPythonExec` has two outputs `id` and `pythonUDF0`.
Because the output of `GenerateExec` is given after analysis phase, in above case, it is the combination of `id`, i.e., the output of `Range`, and `col`. But in planning phase, we change `GenerateExec`'s child plan to `BatchEvalPythonExec` with additional output attributes.
It will cause no problem in non wholestage codegen. Because when evaluating the additional attributes are projected out the final output of `GenerateExec`.
However, as `GenerateExec` now supports wholestage codegen, the framework will input all the outputs of the child plan to `GenerateExec`. Then when consuming `GenerateExec`'s output data (i.e., calling `consume`), the number of output attributes is different to the output variables in wholestage codegen.
To solve this issue, this patch only gives the generator's output to `GenerateExec` after analysis phase. `GenerateExec`'s output is the combination of its child plan's output and the generator's output. So when we change `GenerateExec`'s child, its output is still correct.
## How was this patch tested?
Added test cases to PySpark.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#16120 from viirya/fix-py-udf-with-generator.
## What changes were proposed in this pull request?
- Add StreamingQuery.explain and exception to Python.
- Fix StreamingQueryException to not expose `OffsetSeq`.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#16125 from zsxwing/py-streaming-explain.
## What changes were proposed in this pull request?
We currently have function input_file_name to get the path of the input file, but don't have functions to get the block start offset and length. This patch introduces two functions:
1. input_file_block_start: returns the file block start offset, or -1 if not available.
2. input_file_block_length: returns the file block length, or -1 if not available.
## How was this patch tested?
Updated existing test cases in ColumnExpressionSuite that covered input_file_name to also cover the two new functions.
Author: Reynold Xin <rxin@databricks.com>
Closes#16133 from rxin/SPARK-18702.
## What changes were proposed in this pull request?
Even though in 2.1 creating a partitioned datasource table will not populate the partition data by default (until the user issues MSCK REPAIR TABLE), it seems we still scan the filesystem for no good reason.
We should avoid doing this when the user specifies a schema.
## How was this patch tested?
Perf stat tests.
Author: Eric Liang <ekl@databricks.com>
Closes#16090 from ericl/spark-18661.
## What changes were proposed in this pull request?
This patch significantly improves the IO / file listing performance of schema inference in Spark's built-in CSV data source.
Previously, this data source used the legacy `SparkContext.hadoopFile` and `SparkContext.hadoopRDD` methods to read files during its schema inference step, causing huge file-listing bottlenecks on the driver.
This patch refactors this logic to use Spark SQL's `text` data source to read files during this step. The text data source still performs some unnecessary file listing (since in theory we already have resolved the table prior to schema inference and therefore should be able to scan without performing _any_ extra listing), but that listing is much faster and takes place in parallel. In one production workload operating over tens of thousands of files, this change managed to reduce schema inference time from 7 minutes to 2 minutes.
A similar problem also affects the JSON file format and this patch originally fixed that as well, but I've decided to split that change into a separate patch so as not to conflict with changes in another JSON PR.
## How was this patch tested?
Existing unit tests, plus manual benchmarking on a production workload.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#15813 from JoshRosen/use-text-data-source-in-csv-and-json.
## What changes were proposed in this pull request?
This PR adds a sql conf `spark.sql.streaming.noDataReportInterval` to control how long to wait before outputing the next StreamProgressEvent when there is no data.
## How was this patch tested?
The added unit test.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#16108 from zsxwing/SPARK-18670.
## What changes were proposed in this pull request?
Two bugs are addressed here
1. INSERT OVERWRITE TABLE sometime crashed when catalog partition management was enabled. This was because when dropping partitions after an overwrite operation, the Hive client will attempt to delete the partition files. If the entire partition directory was dropped, this would fail. The PR fixes this by adding a flag to control whether the Hive client should attempt to delete files.
2. The static partition spec for OVERWRITE TABLE was not correctly resolved to the case-sensitive original partition names. This resulted in the entire table being overwritten if you did not correctly capitalize your partition names.
cc yhuai cloud-fan
## How was this patch tested?
Unit tests. Surprisingly, the existing overwrite table tests did not catch these edge cases.
Author: Eric Liang <ekl@databricks.com>
Closes#16088 from ericl/spark-18659.
## What changes were proposed in this pull request?
Currently, `JDBCRelation.insert` removes Spark options too early by mistakenly using `asConnectionProperties`. Spark options like `numPartitions` should be passed into `DataFrameWriter.jdbc` correctly. This bug have been **hidden** because `JDBCOptions.asConnectionProperties` fails to filter out the mixed-case options. This PR aims to fix both.
**JDBCRelation.insert**
```scala
override def insert(data: DataFrame, overwrite: Boolean): Unit = {
val url = jdbcOptions.url
val table = jdbcOptions.table
- val properties = jdbcOptions.asConnectionProperties
+ val properties = jdbcOptions.asProperties
data.write
.mode(if (overwrite) SaveMode.Overwrite else SaveMode.Append)
.jdbc(url, table, properties)
```
**JDBCOptions.asConnectionProperties**
```scala
scala> import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
scala> import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
scala> new JDBCOptions(Map("url" -> "jdbc:mysql://localhost:3306/temp", "dbtable" -> "t1", "numPartitions" -> "10")).asConnectionProperties
res0: java.util.Properties = {numpartitions=10}
scala> new JDBCOptions(new CaseInsensitiveMap(Map("url" -> "jdbc:mysql://localhost:3306/temp", "dbtable" -> "t1", "numPartitions" -> "10"))).asConnectionProperties
res1: java.util.Properties = {numpartitions=10}
```
## How was this patch tested?
Pass the Jenkins with a new testcase.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#15863 from dongjoon-hyun/SPARK-18419.
## What changes were proposed in this pull request?
In Spark 2.1 ListingFileCatalog was significantly refactored (and renamed to InMemoryFileIndex). This introduced a regression where parallelism could only be introduced at the very top of the tree. However, in many cases (e.g. `spark.read.parquet(topLevelDir)`), the top of the tree is only a single directory.
This PR simplifies and fixes the parallel recursive listing code to allow parallelism to be introduced at any level during recursive descent (though note that once we decide to list a sub-tree in parallel, the sub-tree is listed in serial on executors).
cc mallman cloud-fan
## How was this patch tested?
Checked metrics in unit tests.
Author: Eric Liang <ekl@databricks.com>
Closes#16112 from ericl/spark-18679.
This PR targets to both master and branch-2.1.
## What changes were proposed in this pull request?
Due to PARQUET-686, Parquet doesn't do string comparison correctly while doing filter push-down for string columns. This PR disables filter push-down for both string and binary columns to work around this issue. Binary columns are also affected because some Parquet data models (like Hive) may store string columns as a plain Parquet `binary` instead of a `binary (UTF8)`.
## How was this patch tested?
New test case added in `ParquetFilterSuite`.
Author: Cheng Lian <lian@databricks.com>
Closes#16106 from liancheng/spark-17213-bad-string-ppd.
## What changes were proposed in this pull request?
This replaces uses of `TextOutputFormat` with an `OutputStream`, which will either write directly to the filesystem or indirectly via a compressor (if so configured). This avoids intermediate buffering.
The inverse of this (reading directly from a stream) is necessary for streaming large JSON records (when `wholeFile` is enabled) so I wanted to keep the read and write paths symmetric.
## How was this patch tested?
Existing unit tests.
Author: Nathan Howell <nhowell@godaddy.com>
Closes#16089 from NathanHowell/SPARK-18658.
## What changes were proposed in this pull request?
SQL query generated for the JDBC data source is not quoting columns in the predicate clause. When the source table has quoted column names, spark jdbc read fails with column not found error incorrectly.
Error:
org.h2.jdbc.JdbcSQLException: Column "ID" not found;
Source SQL statement:
SELECT "Name","Id" FROM TEST."mixedCaseCols" WHERE (Id < 1)
This PR fixes by quoting column names in the generated SQL for predicate clause when filters are pushed down to the data source.
Source SQL statement after the fix:
SELECT "Name","Id" FROM TEST."mixedCaseCols" WHERE ("Id" < 1)
## How was this patch tested?
Added new test case to the JdbcSuite
Author: sureshthalamati <suresh.thalamati@gmail.com>
Closes#15662 from sureshthalamati/filter_quoted_cols-SPARK-18141.
## What changes were proposed in this pull request?
The current error message of USING join is quite confusing, for example:
```
scala> val df1 = List(1,2,3).toDS.withColumnRenamed("value", "c1")
df1: org.apache.spark.sql.DataFrame = [c1: int]
scala> val df2 = List(1,2,3).toDS.withColumnRenamed("value", "c2")
df2: org.apache.spark.sql.DataFrame = [c2: int]
scala> df1.join(df2, usingColumn = "c1")
org.apache.spark.sql.AnalysisException: using columns ['c1] can not be resolved given input columns: [c1, c2] ;;
'Join UsingJoin(Inner,List('c1))
:- Project [value#1 AS c1#3]
: +- LocalRelation [value#1]
+- Project [value#7 AS c2#9]
+- LocalRelation [value#7]
```
after this PR, it becomes:
```
scala> val df1 = List(1,2,3).toDS.withColumnRenamed("value", "c1")
df1: org.apache.spark.sql.DataFrame = [c1: int]
scala> val df2 = List(1,2,3).toDS.withColumnRenamed("value", "c2")
df2: org.apache.spark.sql.DataFrame = [c2: int]
scala> df1.join(df2, usingColumn = "c1")
org.apache.spark.sql.AnalysisException: USING column `c1` can not be resolved with the right join side, the right output is: [c2];
```
## How was this patch tested?
updated tests
Author: Wenchen Fan <wenchen@databricks.com>
Closes#16100 from cloud-fan/natural.
### What changes were proposed in this pull request?
The following two `DataFrameReader` JDBC APIs ignore the user-specified parameters of parallelism degree.
```Scala
def jdbc(
url: String,
table: String,
columnName: String,
lowerBound: Long,
upperBound: Long,
numPartitions: Int,
connectionProperties: Properties): DataFrame
```
```Scala
def jdbc(
url: String,
table: String,
predicates: Array[String],
connectionProperties: Properties): DataFrame
```
This PR is to fix the issues. To verify the behavior correctness, we improve the plan output of `EXPLAIN` command by adding `numPartitions` in the `JDBCRelation` node.
Before the fix,
```
== Physical Plan ==
*Scan JDBCRelation(TEST.PEOPLE) [NAME#1896,THEID#1897] ReadSchema: struct<NAME:string,THEID:int>
```
After the fix,
```
== Physical Plan ==
*Scan JDBCRelation(TEST.PEOPLE) [numPartitions=3] [NAME#1896,THEID#1897] ReadSchema: struct<NAME:string,THEID:int>
```
### How was this patch tested?
Added the verification logics on all the test cases for JDBC concurrent fetching.
Author: gatorsmile <gatorsmile@gmail.com>
Closes#15975 from gatorsmile/jdbc.
## What changes were proposed in this pull request?
Currently we haven't implemented `SHOW TABLE EXTENDED` in Spark 2.0. This PR is to implement the statement.
Goals:
1. Support `SHOW TABLES EXTENDED LIKE 'identifier_with_wildcards'`;
2. Explicitly output an unsupported error message for `SHOW TABLES [EXTENDED] ... PARTITION` statement;
3. Improve test cases for `SHOW TABLES` statement.
## How was this patch tested?
1. Add new test cases in file `show-tables.sql`.
2. Modify tests for `SHOW TABLES` in `DDLSuite`.
Author: jiangxingbo <jiangxb1987@gmail.com>
Closes#15958 from jiangxb1987/show-table-extended.
## What changes were proposed in this pull request?
- Add StreamingQueryStatus.json
- Make it not case class (to avoid unnecessarily exposing implicit object StreamingQueryStatus, consistent with StreamingQueryProgress)
- Add StreamingQuery.status to Python
- Fix post-termination status
## How was this patch tested?
New unit tests
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#16075 from tdas/SPARK-18516-1.
## What changes were proposed in this pull request?
`AggregateFunction` currently implements `ImplicitCastInputTypes` (which enables implicit input type casting). There are actually quite a few situations in which we don't need this, or require more control over our input. A recent example is the aggregate for `CountMinSketch` which should only take string, binary or integral types inputs.
This PR removes `ImplicitCastInputTypes` from the `AggregateFunction` and makes a case-by-case decision on what kind of input validation we should use.
## How was this patch tested?
Refactoring only. Existing tests.
Author: Herman van Hovell <hvanhovell@databricks.com>
Closes#16066 from hvanhovell/SPARK-18632.
This PR separates the status of a `StreamingQuery` into two separate APIs:
- `status` - describes the status of a `StreamingQuery` at this moment, including what phase of processing is currently happening and if data is available.
- `recentProgress` - an array of statistics about the most recent microbatches that have executed.
A recent progress contains the following information:
```
{
"id" : "2be8670a-fce1-4859-a530-748f29553bb6",
"name" : "query-29",
"timestamp" : 1479705392724,
"inputRowsPerSecond" : 230.76923076923077,
"processedRowsPerSecond" : 10.869565217391303,
"durationMs" : {
"triggerExecution" : 276,
"queryPlanning" : 3,
"getBatch" : 5,
"getOffset" : 3,
"addBatch" : 234,
"walCommit" : 30
},
"currentWatermark" : 0,
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaSource[Subscribe[topic-14]]",
"startOffset" : {
"topic-14" : {
"2" : 0,
"4" : 1,
"1" : 0,
"3" : 0,
"0" : 0
}
},
"endOffset" : {
"topic-14" : {
"2" : 1,
"4" : 2,
"1" : 0,
"3" : 0,
"0" : 1
}
},
"numRecords" : 3,
"inputRowsPerSecond" : 230.76923076923077,
"processedRowsPerSecond" : 10.869565217391303
} ]
}
```
Additionally, in order to make it possible to correlate progress updates across restarts, we change the `id` field from an integer that is unique with in the JVM to a `UUID` that is globally unique.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: Michael Armbrust <michael@databricks.com>
Closes#15954 from marmbrus/queryProgress.
## What changes were proposed in this pull request?
Re-partitioning logic in ExchangeCoordinator changed so that adding another pre-shuffle partition to the post-shuffle partition will not be done if doing so would cause the size of the post-shuffle partition to exceed the target partition size.
## How was this patch tested?
Existing tests updated to reflect new expectations.
Author: Mark Hamstra <markhamstra@gmail.com>
Closes#16065 from markhamstra/SPARK-17064.
Revise HDFSMetadataLog API such that metadata object serialization and final batch file write are separated. This will allow serialization checks without worrying about batch file name formats. marmbrus zsxwing
Existing tests already ensure this API faithfully support core functionality i.e., creation of batch files.
Author: Tyson Condie <tcondie@gmail.com>
Closes#15924 from tcondie/SPARK-18498.
Signed-off-by: Michael Armbrust <michael@databricks.com>
## What changes were proposed in this pull request?
This PR make `sbt unidoc` complete with Java 8.
This PR roughly includes several fixes as below:
- Fix unrecognisable class and method links in javadoc by changing it from `[[..]]` to `` `...` ``
```diff
- * A column that will be computed based on the data in a [[DataFrame]].
+ * A column that will be computed based on the data in a `DataFrame`.
```
- Fix throws annotations so that they are recognisable in javadoc
- Fix URL links to `<a href="http..."></a>`.
```diff
- * [[http://en.wikipedia.org/wiki/Decision_tree_learning Decision tree]] model for regression.
+ * <a href="http://en.wikipedia.org/wiki/Decision_tree_learning">
+ * Decision tree (Wikipedia)</a> model for regression.
```
```diff
- * see http://en.wikipedia.org/wiki/Receiver_operating_characteristic
+ * see <a href="http://en.wikipedia.org/wiki/Receiver_operating_characteristic">
+ * Receiver operating characteristic (Wikipedia)</a>
```
- Fix < to > to
- `greater than`/`greater than or equal to` or `less than`/`less than or equal to` where applicable.
- Wrap it with `{{{...}}}` to print them in javadoc or use `{code ...}` or `{literal ..}`. Please refer https://github.com/apache/spark/pull/16013#discussion_r89665558
- Fix `</p>` complaint
## How was this patch tested?
Manually tested by `jekyll build` with Java 7 and 8
```
java version "1.7.0_80"
Java(TM) SE Runtime Environment (build 1.7.0_80-b15)
Java HotSpot(TM) 64-Bit Server VM (build 24.80-b11, mixed mode)
```
```
java version "1.8.0_45"
Java(TM) SE Runtime Environment (build 1.8.0_45-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.45-b02, mixed mode)
```
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#16013 from HyukjinKwon/SPARK-3359-errors-more.
## What changes were proposed in this pull request?
For the following workflow:
1. I have a column called time which is at minute level precision in a Streaming DataFrame
2. I want to perform groupBy time, count
3. Then I want my MemorySink to only have the last 30 minutes of counts and I perform this by
.where('time >= current_timestamp().cast("long") - 30 * 60)
what happens is that the `filter` gets pushed down before the aggregation, and the filter happens on the source data for the aggregation instead of the result of the aggregation (where I actually want to filter).
I guess the main issue here is that `current_timestamp` is non-deterministic in the streaming context and shouldn't be pushed down the filter.
Does this require us to store the `current_timestamp` for each trigger of the streaming job, that is something to discuss.
Furthermore, we want to persist current batch timestamp and watermark timestamp to the offset log so that these values are consistent across multiple executions of the same batch.
brkyvz zsxwing tdas
## How was this patch tested?
A test was added to StreamingAggregationSuite ensuring the above use case is handled. The test injects a stream of time values (in seconds) to a query that runs in complete mode and only outputs the (count) aggregation results for the past 10 seconds.
Author: Tyson Condie <tcondie@gmail.com>
Closes#15949 from tcondie/SPARK-18339.
## What changes were proposed in this pull request?
We failed to properly propagate table metadata for existing tables for the saveAsTable command. This caused a downstream component to think the table was MANAGED, writing data to the wrong location.
## How was this patch tested?
Unit test that fails before the patch.
Author: Eric Liang <ekl@databricks.com>
Closes#15983 from ericl/spark-18544.
## What changes were proposed in this pull request?
This is absolutely minor. PR https://github.com/apache/spark/pull/15595 uses `dt1.asNullable == dt2.asNullable` expressions in a few places. It is however more efficient to call `dt1.sameType(dt2)`. I have replaced every instance of the first pattern with the second pattern (3/5 were introduced by #15595).
## How was this patch tested?
Existing tests.
Author: Herman van Hovell <hvanhovell@databricks.com>
Closes#16041 from hvanhovell/SPARK-18058.
## What changes were proposed in this pull request?
This PR fixes a random OOM issue occurred while running `ObjectHashAggregateSuite`.
This issue can be steadily reproduced under the following conditions:
1. The aggregation must be evaluated using `ObjectHashAggregateExec`;
2. There must be an input column whose data type involves `ArrayType` (an input column of `MapType` may even cause SIGSEGV);
3. Sort-based aggregation fallback must be triggered during evaluation.
The root cause is that while falling back to sort-based aggregation, we must sort and feed already evaluated partial aggregation buffers living in the hash map to the sort-based aggregator using an external sorter. However, the underlying mutable byte buffer of `UnsafeRow`s produced by the iterator of the external sorter is reused and may get overwritten when the iterator steps forward. After the last entry is consumed, the byte buffer points to a block of uninitialized memory filled by `5a`. Therefore, while reading an `UnsafeArrayData` out of the `UnsafeRow`, `5a5a5a5a` is treated as array size and triggers a memory allocation for a ridiculously large array and immediately blows up the JVM with an OOM.
To fix this issue, we only need to add `.copy()` accordingly.
## How was this patch tested?
New regression test case added in `ObjectHashAggregateSuite`.
Author: Cheng Lian <lian@databricks.com>
Closes#15976 from liancheng/investigate-oom.
## What changes were proposed in this pull request?
`CatalogTable` has a parameter named `tracksPartitionsInCatalog`, and in `CatalogTable.toString` we use `"Partition Provider: Catalog"` to represent it. This PR fixes `DESC TABLE` to make it consistent with `CatalogTable.toString`.
## How was this patch tested?
N/A
Author: Wenchen Fan <wenchen@databricks.com>
Closes#16035 from cloud-fan/minor.
## What changes were proposed in this pull request?
https://github.com/apache/spark/pull/15704 will fail if we use int literal in `DROP PARTITION`, and we have reverted it in branch-2.1.
This PR reverts it in master branch, and add a regression test for it, to make sure the master branch is healthy.
## How was this patch tested?
new regression test
Author: Wenchen Fan <wenchen@databricks.com>
Closes#16036 from cloud-fan/revert.
### What changes were proposed in this pull request?
Currently, the name validation checks are limited to table creation. It is enfored by Analyzer rule: `PreWriteCheck`.
However, table renaming and database creation have the same issues. It makes more sense to do the checks in `SessionCatalog`. This PR is to add it into `SessionCatalog`.
### How was this patch tested?
Added test cases
Author: gatorsmile <gatorsmile@gmail.com>
Closes#16018 from gatorsmile/nameValidate.
## What changes were proposed in this pull request?
This PR is to fix incorrect `code` tag in `sql-programming-guide.md`
## How was this patch tested?
Manually.
Author: Weiqing Yang <yangweiqing001@gmail.com>
Closes#15941 from weiqingy/fixtag.
## What changes were proposed in this pull request?
The expression `in(empty seq)` is invalid in some data source. Since `in(empty seq)` is always false, we should generate `in(empty seq)` to false literal in optimizer.
The sql `SELECT * FROM t WHERE a IN ()` throws a `ParseException` which is consistent with Hive, don't need to change that behavior.
## How was this patch tested?
Add new test case in `OptimizeInSuite`.
Author: jiangxingbo <jiangxb1987@gmail.com>
Closes#15977 from jiangxb1987/isin-empty.