## What changes were proposed in this pull request?
We call stop() on a Structured Streaming Source only when the stream is shutdown when a user calls streamingQuery.stop(). We should actually stop all sources when the stream fails as well, otherwise we may leak resources, e.g. connections to Kafka.
## How was this patch tested?
Unit tests in `StreamingQuerySuite`.
Author: Burak Yavuz <brkyvz@gmail.com>
Closes#17107 from brkyvz/close-source.
## What changes were proposed in this pull request?
Changes to SQLQueryTests to make the order of the results constant.
Where possible ORDER BY has been added to match the existing expected output
## How was this patch tested?
Test runs on x86, zLinux (big endian), ppc (big endian)
Author: Pete Robbins <robbinspg@gmail.com>
Closes#17039 from robbinspg/SPARK-19710.
## What changes were proposed in this pull request?
When we resolve inline tables in analyzer, we will evaluate the expressions of inline tables.
When it evaluates a `TimeZoneAwareExpression` expression, an error will happen because the `TimeZoneAwareExpression` is not associated with timezone yet.
So we need to resolve these `TimeZoneAwareExpression`s with time zone when resolving inline tables.
## How was this patch tested?
Jenkins tests.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#17114 from viirya/resolve-timeawareexpr-inline-table.
## What changes were proposed in this pull request?
[SPARK-19779](https://issues.apache.org/jira/browse/SPARK-19779)
The PR (https://github.com/apache/spark/pull/17012) can to fix restart a Structured Streaming application using hdfs as fileSystem, but also exist a problem that a tmp file of delta file is still reserved in hdfs. And Structured Streaming don't delete the tmp file generated when restart streaming job in future.
## How was this patch tested?
unit tests
Author: guifeng <guifengleaf@gmail.com>
Closes#17124 from gf53520/SPARK-19779.
## What changes were proposed in this pull request?
- Add tests covering different scenarios with qualified column names
- Please see Section 2 in the design doc for the various test scenarios [here](https://issues.apache.org/jira/secure/attachment/12854681/Design_ColResolution_JIRA19602.pdf)
- As part of SPARK-19602, changes are made to support three part column name. In order to aid in the review and to reduce the diff, the test scenarios are separated out into this PR.
## How was this patch tested?
- This is a **test only** change. The individual test suites were run successfully.
Author: Sunitha Kambhampati <skambha@us.ibm.com>
Closes#17067 from skambha/colResolutionTests.
## What changes were proposed in this pull request?
```
spark.sql(
s"""
|CREATE TABLE t
|USING parquet
|PARTITIONED BY(a, b)
|LOCATION '$dir'
|AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
""".stripMargin)
```
Failed with the error message:
```
path file:/private/var/folders/6r/15tqm8hn3ldb3rmbfqm1gf4c0000gn/T/spark-195cd513-428a-4df9-b196-87db0c73e772 already exists.;
org.apache.spark.sql.AnalysisException: path file:/private/var/folders/6r/15tqm8hn3ldb3rmbfqm1gf4c0000gn/T/spark-195cd513-428a-4df9-b196-87db0c73e772 already exists.;
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:102)
```
while hive table is ok ,so we should fix it for datasource table.
The reason is that the SaveMode check is put in `InsertIntoHadoopFsRelationCommand` , and the SaveMode check actually use `path`, this is fine when we use `DataFrameWriter.save()`, because this situation of SaveMode act on `path`.
While when we use `CreateDataSourceAsSelectCommand`, the situation of SaveMode act on table, and
we have already do SaveMode check in `CreateDataSourceAsSelectCommand` for table , so we should not do SaveMode check in the following logic in `InsertIntoHadoopFsRelationCommand` for path, this is redundant and wrong logic for `CreateDataSourceAsSelectCommand`
After this PR, the following DDL will succeed, when the location has been created we will append it or overwrite it.
```
CREATE TABLE ... (PARTITIONED BY ...) LOCATION path AS SELECT ...
```
## How was this patch tested?
unit test added
Author: windpiger <songjun@outlook.com>
Closes#16938 from windpiger/CTASDataSourceWitLocation.
## What changes were proposed in this pull request?
If we create a InMemoryFileIndex with an empty rootPaths when set PARALLEL_PARTITION_DISCOVERY_THRESHOLD to zero, it will throw an exception:
```
Positive number of slices required
java.lang.IllegalArgumentException: Positive number of slices required
at org.apache.spark.rdd.ParallelCollectionRDD$.slice(ParallelCollectionRDD.scala:119)
at org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(ParallelCollectionRDD.scala:97)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2084)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$.org$apache$spark$sql$execution$datasources$PartitioningAwareFileIndex$$bulkListLeafFiles(PartitioningAwareFileIndex.scala:357)
at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.listLeafFiles(PartitioningAwareFileIndex.scala:256)
at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.refresh0(InMemoryFileIndex.scala:74)
at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(InMemoryFileIndex.scala:50)
at org.apache.spark.sql.execution.datasources.FileIndexSuite$$anonfun$9$$anonfun$apply$mcV$sp$2.apply$mcV$sp(FileIndexSuite.scala:186)
at org.apache.spark.sql.test.SQLTestUtils$class.withSQLConf(SQLTestUtils.scala:105)
at org.apache.spark.sql.execution.datasources.FileIndexSuite.withSQLConf(FileIndexSuite.scala:33)
at org.apache.spark.sql.execution.datasources.FileIndexSuite$$anonfun$9.apply$mcV$sp(FileIndexSuite.scala:185)
at org.apache.spark.sql.execution.datasources.FileIndexSuite$$anonfun$9.apply(FileIndexSuite.scala:185)
at org.apache.spark.sql.execution.datasources.FileIndexSuite$$anonfun$9.apply(FileIndexSuite.scala:185)
at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
```
## How was this patch tested?
unit test added
Author: windpiger <songjun@outlook.com>
Closes#17093 from windpiger/fixEmptiPathInBulkListFiles.
## What changes were proposed in this pull request?
`Catalog.refreshByPath` can refresh the cache entry and the associated metadata for all dataframes (if any), that contain the given data source path.
However, `CacheManager.invalidateCachedPath` doesn't clear all cached plans with the specified path. It causes some strange behaviors reported in SPARK-15678.
## How was this patch tested?
Jenkins tests.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#17064 from viirya/fix-refreshByPath.
## What changes were proposed in this pull request?
Right now file source always uses `InMemoryFileIndex` to scan files from a given path.
But when reading the outputs from another streaming query, the file source should use `MetadataFileIndex` to list files from the sink log. This patch adds this support.
## `MetadataFileIndex` or `InMemoryFileIndex`
```scala
spark
.readStream
.format(...)
.load("/some/path") // for a non-glob path:
// - use `MetadataFileIndex` when `/some/path/_spark_meta` exists
// - fall back to `InMemoryFileIndex` otherwise
```
```scala
spark
.readStream
.format(...)
.load("/some/path/*/*") // for a glob path: always use `InMemoryFileIndex`
```
## How was this patch tested?
two newly added tests
Author: Liwei Lin <lwlin7@gmail.com>
Closes#16987 from lw-lin/source-read-from-sink.
## What changes were proposed in this pull request?
This PR proposes the support for multiple lines for CSV by resembling the multiline supports in JSON datasource (in case of JSON, per file).
So, this PR introduces `wholeFile` option which makes the format not splittable and reads each whole file. Since Univocity parser can produces each row from a stream, it should be capable of parsing very large documents when the internal rows are fix in the memory.
## How was this patch tested?
Unit tests in `CSVSuite` and `tests.py`
Manual tests with a single 9GB CSV file in local file system, for example,
```scala
spark.read.option("wholeFile", true).option("inferSchema", true).csv("tmp.csv").count()
```
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#16976 from HyukjinKwon/SPARK-19610.
## What changes were proposed in this pull request?
If we first cache a DataSource table, then we insert some data into the table, we should refresh the data in the cache after the insert command.
## How was this patch tested?
unit test added
Author: windpiger <songjun@outlook.com>
Closes#16809 from windpiger/refreshCacheAfterInsert.
## What changes were proposed in this pull request?
HDFSBackedStateStoreProvider fails to rename files on HDFS but not on the local filesystem. According to the [implementation notes](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html) of `rename()`, the behavior of the local filesystem and HDFS varies:
> Destination exists and is a file
> Renaming a file atop an existing file is specified as failing, raising an exception.
> - Local FileSystem : the rename succeeds; the destination file is replaced by the source file.
> - HDFS : The rename fails, no exception is raised. Instead the method call simply returns false.
This patch ensures that `rename()` isn't called if the destination file already exists. It's still semantically correct because Structured Streaming requires that rerunning a batch should generate the same output.
## How was this patch tested?
This patch was tested by running `StateStoreSuite`.
Author: Roberto Agostino Vitillo <ra.vitillo@gmail.com>
Closes#17012 from vitillo/fix_rename.
## What changes were proposed in this pull request?
`MetastoreRelation` is used to represent table relation for hive tables, and provides some hive related information. We will resolve `SimpleCatalogRelation` to `MetastoreRelation` for hive tables, which is unnecessary as these 2 are the same essentially. This PR merges `SimpleCatalogRelation` and `MetastoreRelation`
## How was this patch tested?
existing tests
Author: Wenchen Fan <wenchen@databricks.com>
Closes#17015 from cloud-fan/table-relation.
## What changes were proposed in this pull request?
If we refresh a InMemoryFileIndex with a FileStatusCache, it will first use the FileStatusCache to re-generate the cachedLeafFiles etc, then call FileStatusCache.invalidateAll.
While the order to do these two actions is wrong, this lead to the refresh action does not take effect.
```
override def refresh(): Unit = {
refresh0()
fileStatusCache.invalidateAll()
}
private def refresh0(): Unit = {
val files = listLeafFiles(rootPaths)
cachedLeafFiles =
new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent)
cachedPartitionSpec = null
}
```
## How was this patch tested?
unit test added
Author: windpiger <songjun@outlook.com>
Closes#17079 from windpiger/fixInMemoryFileIndexRefresh.
## What changes were proposed in this pull request?
This PR proposes to replace the deprecated `json(RDD[String])` usage to `json(Dataset[String])`.
This currently produces so many warnings.
## How was this patch tested?
Fixed tests.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#17071 from HyukjinKwon/SPARK-15615-followup.
## What changes were proposed in this pull request?
This PR proposes to fix the lint-breaks as below:
```
[ERROR] src/test/java/org/apache/spark/network/TransportResponseHandlerSuite.java:[29,8] (imports) UnusedImports: Unused import - org.apache.spark.network.buffer.ManagedBuffer.
[ERROR] src/main/java/org/apache/spark/unsafe/types/UTF8String.java:[156,10] (modifier) ModifierOrder: 'Nonnull' annotation modifier does not precede non-annotation modifiers.
[ERROR] src/main/java/org/apache/spark/SparkFirehoseListener.java:[122] (sizes) LineLength: Line is longer than 100 characters (found 105).
[ERROR] src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java:[164,78] (coding) OneStatementPerLine: Only one statement per line allowed.
[ERROR] src/test/java/test/org/apache/spark/JavaAPISuite.java:[1157] (sizes) LineLength: Line is longer than 100 characters (found 121).
[ERROR] src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java:[149] (sizes) LineLength: Line is longer than 100 characters (found 113).
[ERROR] src/test/java/test/org/apache/spark/streaming/Java8APISuite.java:[146] (sizes) LineLength: Line is longer than 100 characters (found 122).
[ERROR] src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java:[32,8] (imports) UnusedImports: Unused import - org.apache.spark.streaming.Time.
[ERROR] src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java:[611] (sizes) LineLength: Line is longer than 100 characters (found 101).
[ERROR] src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java:[1317] (sizes) LineLength: Line is longer than 100 characters (found 102).
[ERROR] src/test/java/test/org/apache/spark/sql/JavaDatasetAggregatorSuite.java:[91] (sizes) LineLength: Line is longer than 100 characters (found 102).
[ERROR] src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java:[113] (sizes) LineLength: Line is longer than 100 characters (found 101).
[ERROR] src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java:[164] (sizes) LineLength: Line is longer than 100 characters (found 110).
[ERROR] src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java:[212] (sizes) LineLength: Line is longer than 100 characters (found 114).
[ERROR] src/test/java/org/apache/spark/mllib/tree/JavaDecisionTreeSuite.java:[36] (sizes) LineLength: Line is longer than 100 characters (found 101).
[ERROR] src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java:[26,8] (imports) UnusedImports: Unused import - com.amazonaws.regions.RegionUtils.
[ERROR] src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java:[20,8] (imports) UnusedImports: Unused import - com.amazonaws.regions.RegionUtils.
[ERROR] src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java:[94] (sizes) LineLength: Line is longer than 100 characters (found 103).
[ERROR] src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java:[30,8] (imports) UnusedImports: Unused import - org.apache.spark.sql.api.java.UDF1.
[ERROR] src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java:[72] (sizes) LineLength: Line is longer than 100 characters (found 104).
[ERROR] src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java:[121] (sizes) LineLength: Line is longer than 100 characters (found 101).
[ERROR] src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java:[28,8] (imports) UnusedImports: Unused import - org.apache.spark.api.java.JavaRDD.
[ERROR] src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java:[29,8] (imports) UnusedImports: Unused import - org.apache.spark.api.java.JavaSparkContext.
```
## How was this patch tested?
Manually via
```bash
./dev/lint-java
```
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#17072 from HyukjinKwon/java-lint.
## What changes were proposed in this pull request?
currently if multiple streaming queries listeners exists, when a QueryTerminatedEvent is triggered, only one of the listeners will be invoked while the rest of the listeners will ignore the event.
this is caused since the the streaming queries listeners bus holds a set of running queries ids and when a termination event is triggered, after the first listeners is handling the event, the terminated query id is being removed from the set.
in this PR, the query id will be removed from the set only after all the listeners handles the event
## How was this patch tested?
a test with multiple listeners has been added to StreamingQueryListenerSuite
Author: Eyal Zituny <eyal.zituny@equalum.io>
Closes#16991 from eyalzit/master.
## What changes were proposed in this pull request?
After adding the tests for subquery, we now have multiple level of directories under "sql-tests/inputs". Some times on Mac while using Finder application it creates the meta data files called ".DS_Store". When these files are present at different levels in directory hierarchy, we get duplicate test exception while running the tests as we just use the file name as the test case name. In this PR, we use the relative file path from the base directory along with the test file as the test name. Also after this change, we can have the same test file name under different directory like exists/basic.sql , in/basic.sql. Here is the truncated output of the test run after the change.
```SQL
info] SQLQueryTestSuite:
[info] - arithmetic.sql (5 seconds, 235 milliseconds)
[info] - array.sql (536 milliseconds)
[info] - blacklist.sql !!! IGNORED !!!
[info] - cast.sql (550 milliseconds)
....
....
....
[info] - union.sql (315 milliseconds)
[info] - subquery/.DS_Store !!! IGNORED !!!
[info] - subquery/exists-subquery/.DS_Store !!! IGNORED !!!
[info] - subquery/exists-subquery/exists-aggregate.sql (2 seconds, 451 milliseconds)
....
....
[info] - subquery/in-subquery/in-group-by.sql (12 seconds, 264 milliseconds)
....
....
[info] - subquery/scalar-subquery/scalar-subquery-predicate.sql (7 seconds, 769 milliseconds)
[info] - subquery/scalar-subquery/scalar-subquery-select.sql (4 seconds, 119 milliseconds)
```
Since this is a simple change, i haven't created a JIRA for it.
## How was this patch tested?
Manually verified. This is change to test infrastructure
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes#17060 from dilipbiswal/sqlquerytestsuite.
Spark executes SQL commands eagerly. It does this by creating an RDD which contains the command's results. The downside to this is that any action on this RDD triggers a Spark job which is expensive and is unnecessary.
This PR fixes this by avoiding the materialization of an `RDD` for `Command`s; it just materializes the result and puts them in a `LocalRelation`.
Added a regression test to `SQLQuerySuite`.
Author: Herman van Hovell <hvanhovell@databricks.com>
Closes#17027 from hvanhovell/no-job-command.
## What changes were proposed in this pull request?
Currently we can only check the estimated stats in logical plans by debugging. We need to provide an easier and more efficient way for developers/users.
In this pr, we add EXPLAIN COST command to show stats in the optimized logical plan.
E.g.
```
spark-sql> EXPLAIN COST select count(1) from store_returns;
...
== Optimized Logical Plan ==
Aggregate [count(1) AS count(1)#24L], Statistics(sizeInBytes=16.0 B, rowCount=1, isBroadcastable=false)
+- Project, Statistics(sizeInBytes=4.3 GB, rowCount=5.76E+8, isBroadcastable=false)
+- Relation[sr_returned_date_sk#3,sr_return_time_sk#4,sr_item_sk#5,sr_customer_sk#6,sr_cdemo_sk#7,sr_hdemo_sk#8,sr_addr_sk#9,sr_store_sk#10,sr_reason_sk#11,sr_ticket_number#12,sr_return_quantity#13,sr_return_amt#14,sr_return_tax#15,sr_return_amt_inc_tax#16,sr_fee#17,sr_return_ship_cost#18,sr_refunded_cash#19,sr_reversed_charge#20,sr_store_credit#21,sr_net_loss#22] parquet, Statistics(sizeInBytes=28.6 GB, rowCount=5.76E+8, isBroadcastable=false)
...
```
## How was this patch tested?
Add test cases.
Author: wangzhenhua <wangzhenhua@huawei.com>
Author: Zhenhua Wang <wzh_zju@163.com>
Closes#16594 from wzhfy/showStats.
## What changes were proposed in this pull request?
In SQLListener.getExecutionMetrics, driver accumulator updates don't belong to the execution should be ignored when merging all accumulator updates to prevent NoSuchElementException.
## How was this patch tested?
Updated unit test.
Author: Carson Wang <carson.wang@intel.com>
Closes#17009 from carsonwang/FixSQLMetrics.
## What changes were proposed in this pull request?
This pr added a logic to put malformed tokens into a new field when parsing CSV data in case of permissive modes. In the current master, if the CSV parser hits these malformed ones, it throws an exception below (and then a job fails);
```
Caused by: java.lang.IllegalArgumentException
at java.sql.Date.valueOf(Date.java:143)
at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTime(DateTimeUtils.scala:137)
at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$$anonfun$castTo$6.apply$mcJ$sp(CSVInferSchema.scala:272)
at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$$anonfun$castTo$6.apply(CSVInferSchema.scala:272)
at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$$anonfun$castTo$6.apply(CSVInferSchema.scala:272)
at scala.util.Try.getOrElse(Try.scala:79)
at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$.castTo(CSVInferSchema.scala:269)
at
```
In case that users load large CSV-formatted data, the job failure makes users get some confused. So, this fix set NULL for original columns and put malformed tokens in a new field.
## How was this patch tested?
Added tests in `CSVSuite`.
Author: Takeshi Yamamuro <yamamuro@apache.org>
Closes#16928 from maropu/SPARK-18699-2.
## What changes were proposed in this pull request?
This PR adds a special streaming deduplication operator to support `dropDuplicates` with `aggregation` and watermark. It reuses the `dropDuplicates` API but creates new logical plan `Deduplication` and new physical plan `DeduplicationExec`.
The following cases are supported:
- one or multiple `dropDuplicates()` without aggregation (with or without watermark)
- `dropDuplicates` before aggregation
Not supported cases:
- `dropDuplicates` after aggregation
Breaking changes:
- `dropDuplicates` without aggregation doesn't work with `complete` or `update` mode.
## How was this patch tested?
The new unit tests.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#16970 from zsxwing/dedup.
## What changes were proposed in this pull request?
This pr fixed a class-cast exception below;
```
scala> spark.range(10).selectExpr("cast (id as decimal) as x").selectExpr("percentile(x, 0.5)").collect()
java.lang.ClassCastException: org.apache.spark.sql.types.Decimal cannot be cast to java.lang.Number
at org.apache.spark.sql.catalyst.expressions.aggregate.Percentile.update(Percentile.scala:141)
at org.apache.spark.sql.catalyst.expressions.aggregate.Percentile.update(Percentile.scala:58)
at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.update(interfaces.scala:514)
at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$1$$anonfun$applyOrElse$1.apply(AggregationIterator.scala:171)
at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$1$$anonfun$applyOrElse$1.apply(AggregationIterator.scala:171)
at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateProcessRow$1.apply(AggregationIterator.scala:187)
at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateProcessRow$1.apply(AggregationIterator.scala:181)
at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:151)
at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:78)
at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:109)
at
```
This fix simply converts catalyst values (i.e., `Decimal`) into scala ones by using `CatalystTypeConverters`.
## How was this patch tested?
Added a test in `DataFrameSuite`.
Author: Takeshi Yamamuro <yamamuro@apache.org>
Closes#17028 from maropu/SPARK-19691.
## What changes were proposed in this pull request?
This pr comes from #16928 and fixed a json behaviour along with the CSV one.
## How was this patch tested?
Added tests in `JsonSuite`.
Author: Takeshi Yamamuro <yamamuro@apache.org>
Closes#17023 from maropu/SPARK-19695.
### What changes were proposed in this pull request?
Currently, if `NumPartitions` is not set in RepartitionByExpression, we will set it using `spark.sql.shuffle.partitions` during Planner. However, this is not following the general resolution process. This PR is to set it in `Parser` and then `Optimizer` can use the value for plan optimization.
### How was this patch tested?
Added a test case.
Author: Xiao Li <gatorsmile@gmail.com>
Closes#16988 from gatorsmile/resolveRepartition.
## What changes were proposed in this pull request?
This PR proposes to fix two.
**Skip a property without a getter in beans**
Currently, if we use a JavaBean without the getter as below:
```java
public static class BeanWithoutGetter implements Serializable {
private String a;
public void setA(String a) {
this.a = a;
}
}
BeanWithoutGetter bean = new BeanWithoutGetter();
List<BeanWithoutGetter> data = Arrays.asList(bean);
spark.createDataFrame(data, BeanWithoutGetter.class).show();
```
- Before
It throws an exception as below:
```
java.lang.NullPointerException
at org.spark_project.guava.reflect.TypeToken.method(TypeToken.java:465)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:126)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:125)
```
- After
```
++
||
++
||
++
```
**Supports empty bean in encoder creation**
```java
public static class EmptyBean implements Serializable {}
EmptyBean bean = new EmptyBean();
List<EmptyBean> data = Arrays.asList(bean);
spark.createDataset(data, Encoders.bean(EmptyBean.class)).show();
```
- Before
throws an exception as below:
```
java.lang.UnsupportedOperationException: Cannot infer type for class EmptyBean because it is not bean-compliant
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$serializerFor(JavaTypeInference.scala:436)
at org.apache.spark.sql.catalyst.JavaTypeInference$.serializerFor(JavaTypeInference.scala:341)
```
- After
```
++
||
++
||
++
```
## How was this patch tested?
Unit test in `JavaDataFrameSuite`.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#17013 from HyukjinKwon/SPARK-19666.
### What changes were proposed in this pull request?
Bucketed table reading and writing does not need Hive support. We can move the test cases from `sql/hive` to `sql/core`. After this PR, we can improve the test case coverage. Bucket table reading and writing can be tested with and without Hive support.
### How was this patch tested?
N/A
Author: Xiao Li <gatorsmile@gmail.com>
Closes#17004 from gatorsmile/mvTestCaseForBuckets.
## What changes were proposed in this pull request?
This PR proposes to fix new test failures on WIndows as below:
**Before**
```
KafkaRelationSuite:
- test late binding start offsets *** FAILED *** (7 seconds, 679 milliseconds)
Cause: java.nio.file.FileSystemException: C:\projects\spark\target\tmp\spark-4c4b0cd1-4cb7-4908-949d-1b0cc8addb50\topic-4-0\00000000000000000000.log -> C:\projects\spark\target\tmp\spark-4c4b0cd1-4cb7-4908-949d-1b0cc8addb50\topic-4-0\00000000000000000000.log.deleted: The process cannot access the file because it is being used by another process.
KafkaSourceSuite:
- deserialization of initial offset with Spark 2.1.0 *** FAILED *** (3 seconds, 542 milliseconds)
java.io.IOException: Failed to delete: C:\projects\spark\target\tmp\spark-97ef64fc-ae61-4ce3-ac59-287fd38bd824
- deserialization of initial offset written by Spark 2.1.0 *** FAILED *** (60 milliseconds)
java.nio.file.InvalidPathException: Illegal char <:> at index 2: /C:/projects/spark/external/kafka-0-10-sql/target/scala-2.11/test-classes/kafka-source-initial-offset-version-2.1.0.b
HiveDDLSuite:
- partitioned table should always put partition columns at the end of table schema *** FAILED *** (657 milliseconds)
org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark arget mpspark-f1b83d09-850a-4bba-8e43-a2a28dfaa757;
DDLSuite:
- create a data source table without schema *** FAILED *** (94 milliseconds)
org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark arget mpspark-a3f3c161-afae-4d6f-9182-e8642f77062b;
- SET LOCATION for managed table *** FAILED *** (219 milliseconds)
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange SinglePartit
+- *HashAggregate(keys=[], functions=[partial_count(1)], output=[count#99367L])
+- *FileScan parquet default.tbl[] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:projectsspark arget mpspark-15be2f2f-4ea9-4c47-bfee-1b7b49363033], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>
- insert data to a data source table which has a not existed location should succeed *** FAILED *** (16 milliseconds)
org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark arget mpspark-34987671-e8d1-4624-ba5b-db1012e1246b;
- insert into a data source table with no existed partition location should succeed *** FAILED *** (16 milliseconds)
org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark arget mpspark-4c6ccfbf-4091-4032-9fbc-3d40c58267d5;
- read data from a data source table which has a not existed location should succeed *** FAILED *** (0 milliseconds)
- read data from a data source table with no existed partition location should succeed *** FAILED *** (0 milliseconds)
org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark arget mpspark-6af39e37-abd1-44e8-ac68-e2dfcf67a2f3;
InputOutputMetricsSuite:
- output metrics on records written *** FAILED *** (0 milliseconds)
java.lang.IllegalArgumentException: Wrong FS: file://C:\projects\spark\target\tmp\spark-cd69ee77-88f2-4202-bed6-19c0ee05ef55\InputOutputMetricsSuite, expected: file:///
- output metrics on records written - new Hadoop API *** FAILED *** (16 milliseconds)
java.lang.IllegalArgumentException: Wrong FS: file://C:\projects\spark\target\tmp\spark-b69e8fcb-047b-4de8-9cdf-5f026efb6762\InputOutputMetricsSuite, expected: file:///
```
**After**
```
KafkaRelationSuite:
- test late binding start offsets !!! CANCELED !!! (62 milliseconds)
KafkaSourceSuite:
- deserialization of initial offset with Spark 2.1.0 (5 seconds, 341 milliseconds)
- deserialization of initial offset written by Spark 2.1.0 (910 milliseconds)
HiveDDLSuite:
- partitioned table should always put partition columns at the end of table schema (2 seconds)
DDLSuite:
- create a data source table without schema (828 milliseconds)
- SET LOCATION for managed table (406 milliseconds)
- insert data to a data source table which has a not existed location should succeed (406 milliseconds)
- insert into a data source table with no existed partition location should succeed (453 milliseconds)
- read data from a data source table which has a not existed location should succeed (94 milliseconds)
- read data from a data source table with no existed partition location should succeed (265 milliseconds)
InputOutputMetricsSuite:
- output metrics on records written (172 milliseconds)
- output metrics on records written - new Hadoop API (297 milliseconds)
```
## How was this patch tested?
Fixed tests in `InputOutputMetricsSuite`, `KafkaRelationSuite`, `KafkaSourceSuite`, `DDLSuite.scala` and `HiveDDLSuite`.
Manually tested via AppVeyor as below:
`InputOutputMetricsSuite`: https://ci.appveyor.com/project/spark-test/spark/build/633-20170219-windows-test/job/ex8nvwa6tsh7rmto
`KafkaRelationSuite`: https://ci.appveyor.com/project/spark-test/spark/build/633-20170219-windows-test/job/h8dlcowew52y8ncw
`KafkaSourceSuite`: https://ci.appveyor.com/project/spark-test/spark/build/634-20170219-windows-test/job/9ybgjl7yeubxcre4
`DDLSuite`: https://ci.appveyor.com/project/spark-test/spark/build/635-20170219-windows-test
`HiveDDLSuite`: https://ci.appveyor.com/project/spark-test/spark/build/633-20170219-windows-test/job/up6o9n47er087ltb
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#16999 from HyukjinKwon/windows-fix.
## What changes were proposed in this pull request?
In [SPARK-19669](0733a54a45) change the sessionState access privileges from private to public, this lead to the compile failed in TestSQLContext
this pr is a hotfix for this.
## How was this patch tested?
N/A
Author: windpiger <songjun@outlook.com>
Closes#17008 from windpiger/hotfixcompile.
## What changes were proposed in this pull request?
Convert tests to use Java 8 lambdas, and modest related fixes to surrounding code.
## How was this patch tested?
Jenkins tests
Author: Sean Owen <sowen@cloudera.com>
Closes#16964 from srowen/SPARK-19534.
## What changes were proposed in this pull request?
The Range was modified to produce "recordsRead" metric instead of "generated rows". The tests were updated and partially moved to SQLMetricsSuite.
## How was this patch tested?
Unit tests.
Author: Ala Luszczak <ala@databricks.com>
Closes#16960 from ala/range-records-read.
## What changes were proposed in this pull request?
The streaming thread in StreamExecution uses the following ways to check if it should exit:
- Catch an InterruptException.
- `StreamExecution.state` is TERMINATED.
When starting and stopping a query quickly, the above two checks may both fail:
- Hit [HADOOP-14084](https://issues.apache.org/jira/browse/HADOOP-14084) and swallow InterruptException
- StreamExecution.stop is called before `state` becomes `ACTIVE`. Then [runBatches](dcc2d540a5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala (L252)) changes the state from `TERMINATED` to `ACTIVE`.
If the above cases both happen, the query will hang forever.
This PR changes `state` to `AtomicReference` and uses`compareAndSet` to make sure we only change the state from `INITIALIZING` to `ACTIVE`. It also removes the `runUninterruptibly` hack from ``HDFSMetadata`, because HADOOP-14084 won't cause any problem after we fix the race condition.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#16947 from zsxwing/SPARK-19617.
## What changes were proposed in this pull request?
Radix sort require that half of array as free (as temporary space), so we use 0.5 as the scale factor to make sure that BytesToBytesMap will not have more items than 1/2 of capacity. Turned out this is not true, the current implementation of append() could leave 1 more item than the threshold (1/2 of capacity) in the array, which break the requirement of radix sort (fail the assert in 2.2, or fail to insert into InMemorySorter in 2.1).
This PR fix the off-by-one bug in BytesToBytesMap.
This PR also fix a bug that the array will never grow if it fail to grow once (stay as initial capacity), introduced by #15722 .
## How was this patch tested?
Added regression test.
Author: Davies Liu <davies@databricks.com>
Closes#16844 from davies/off_by_one.
## What changes were proposed in this pull request?
We only notify `QueryExecutionListener` for several `Dataset` operations, e.g. collect, take, etc. We should also do the notification for `DataFrameWriter` operations.
## How was this patch tested?
new regression test
close https://github.com/apache/spark/pull/16664
Author: Wenchen Fan <wenchen@databricks.com>
Closes#16962 from cloud-fan/insert.
## What changes were proposed in this pull request?
If a new option `wholeFile` is set to `true` the JSON reader will parse each file (instead of a single line) as a value. This is done with Jackson streaming and it should be capable of parsing very large documents, assuming the row will fit in memory.
Because the file is not buffered in memory the corrupt record handling is also slightly different when `wholeFile` is enabled: the corrupt column will contain the filename instead of the literal JSON if there is a parsing failure. It would be easy to extend this to add the parser location (line, column and byte offsets) to the output if desired.
These changes have allowed types other than `String` to be parsed. Support for `UTF8String` and `Text` have been added (alongside `String` and `InputFormat`) and no longer require a conversion to `String` just for parsing.
I've also included a few other changes that generate slightly better bytecode and (imo) make it more obvious when and where boxing is occurring in the parser. These are included as separate commits, let me know if they should be flattened into this PR or moved to a new one.
## How was this patch tested?
New and existing unit tests. No performance or load tests have been run.
Author: Nathan Howell <nhowell@godaddy.com>
Closes#16386 from NathanHowell/SPARK-18352.
## What changes were proposed in this pull request?
1, check the behavior with illegal `quantiles` and `relativeError`
2, add tests for `relativeError` > 1
3, update tests for `null` data
4, update some docs for javadoc8
## How was this patch tested?
local test in spark-shell
Author: Zheng RuiFeng <ruifengz@foxmail.com>
Author: Ruifeng Zheng <ruifengz@foxmail.com>
Closes#16776 from zhengruifeng/fix_approxQuantile.
- Move external/java8-tests tests into core, streaming, sql and remove
- Remove MaxPermGen and related options
- Fix some reflection / TODOs around Java 8+ methods
- Update doc references to 1.7/1.8 differences
- Remove Java 7/8 related build profiles
- Update some plugins for better Java 8 compatibility
- Fix a few Java-related warnings
For the future:
- Update Java 8 examples to fully use Java 8
- Update Java tests to use lambdas for simplicity
- Update Java internal implementations to use lambdas
## How was this patch tested?
Existing tests
Author: Sean Owen <sowen@cloudera.com>
Closes#16871 from srowen/SPARK-19493.
## What changes were proposed in this pull request?
Jira: https://issues.apache.org/jira/browse/SPARK-19618
Moved the check for validating number of buckets from `DataFrameWriter` to `BucketSpec` creation
## How was this patch tested?
- Added more unit tests
Author: Tejas Patil <tejasp@fb.com>
Closes#16948 from tejasapatil/SPARK-19618_max_buckets.
## What changes were proposed in this pull request?
This is a follow-up pr of #16308.
This pr enables timezone support in CSV/JSON parsing.
We should introduce `timeZone` option for CSV/JSON datasources (the default value of the option is session local timezone).
The datasources should use the `timeZone` option to format/parse to write/read timestamp values.
Notice that while reading, if the timestampFormat has the timezone info, the timezone will not be used because we should respect the timezone in the values.
For example, if you have timestamp `"2016-01-01 00:00:00"` in `GMT`, the values written with the default timezone option, which is `"GMT"` because session local timezone is `"GMT"` here, are:
```scala
scala> spark.conf.set("spark.sql.session.timeZone", "GMT")
scala> val df = Seq(new java.sql.Timestamp(1451606400000L)).toDF("ts")
df: org.apache.spark.sql.DataFrame = [ts: timestamp]
scala> df.show()
+-------------------+
|ts |
+-------------------+
|2016-01-01 00:00:00|
+-------------------+
scala> df.write.json("/path/to/gmtjson")
```
```sh
$ cat /path/to/gmtjson/part-*
{"ts":"2016-01-01T00:00:00.000Z"}
```
whereas setting the option to `"PST"`, they are:
```scala
scala> df.write.option("timeZone", "PST").json("/path/to/pstjson")
```
```sh
$ cat /path/to/pstjson/part-*
{"ts":"2015-12-31T16:00:00.000-08:00"}
```
We can properly read these files even if the timezone option is wrong because the timestamp values have timezone info:
```scala
scala> val schema = new StructType().add("ts", TimestampType)
schema: org.apache.spark.sql.types.StructType = StructType(StructField(ts,TimestampType,true))
scala> spark.read.schema(schema).json("/path/to/gmtjson").show()
+-------------------+
|ts |
+-------------------+
|2016-01-01 00:00:00|
+-------------------+
scala> spark.read.schema(schema).option("timeZone", "PST").json("/path/to/gmtjson").show()
+-------------------+
|ts |
+-------------------+
|2016-01-01 00:00:00|
+-------------------+
```
And even if `timezoneFormat` doesn't contain timezone info, we can properly read the values with setting correct timezone option:
```scala
scala> df.write.option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss").option("timeZone", "JST").json("/path/to/jstjson")
```
```sh
$ cat /path/to/jstjson/part-*
{"ts":"2016-01-01T09:00:00"}
```
```scala
// wrong result
scala> spark.read.schema(schema).option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss").json("/path/to/jstjson").show()
+-------------------+
|ts |
+-------------------+
|2016-01-01 09:00:00|
+-------------------+
// correct result
scala> spark.read.schema(schema).option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss").option("timeZone", "JST").json("/path/to/jstjson").show()
+-------------------+
|ts |
+-------------------+
|2016-01-01 00:00:00|
+-------------------+
```
This pr also makes `JsonToStruct` and `StructToJson` `TimeZoneAwareExpression` to be able to evaluate values with timezone option.
## How was this patch tested?
Existing tests and added some tests.
Author: Takuya UESHIN <ueshin@happy-camper.st>
Closes#16750 from ueshin/issues/SPARK-18937.
## What changes were proposed in this pull request?
when we insert data into a datasource table use `sqlText`, and the table has an not exists location,
this will throw an Exception.
example:
```
spark.sql("create table t(a string, b int) using parquet")
spark.sql("alter table t set location '/xx'")
spark.sql("insert into table t select 'c', 1")
```
Exception:
```
com.google.common.util.concurrent.UncheckedExecutionException: org.apache.spark.sql.AnalysisException: Path does not exist: /xx;
at com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4814)
at com.google.common.cache.LocalCache$LocalLoadingCache.apply(LocalCache.java:4830)
at org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:122)
at org.apache.spark.sql.hive.HiveSessionCatalog.lookupRelation(HiveSessionCatalog.scala:69)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveRelations$$lookupTableFromCatalog(Analyzer.scala:456)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$8.applyOrElse(Analyzer.scala:465)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$8.applyOrElse(Analyzer.scala:463)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:60)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:463)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:453)
```
As discussed following comments, we should unify the action when we reading from or writing to a datasource table with a non pre-existing locaiton:
1. reading from a datasource table: return 0 rows
2. writing to a datasource table: write data successfully
## How was this patch tested?
unit test added
Author: windpiger <songjun@outlook.com>
Closes#16672 from windpiger/insertNotExistLocation.
Move `SQLViewSuite` from `sql/hive` to `sql/core`, so we can test the view supports without hive metastore. Also moved the test cases that specified to hive to `HiveSQLViewSuite`.
Improve the test coverage of SQLViewSuite, cover the following cases:
1. view resolution(possibly a referenced table/view have changed after the view creation);
2. handle a view with user specified column names;
3. improve the test cases for a nested view.
Also added a test case for cyclic view reference, which is a known issue that is not fixed yet.
N/A
Author: jiangxingbo <jiangxb1987@gmail.com>
Closes#16674 from jiangxb1987/view-test.
## What changes were proposed in this pull request?
This PR adds the third and final set of tests for EXISTS subquery.
File name | Brief description
------------------------| -----------------
exists-cte.sql |Tests Exist subqueries referencing CTE
exists-joins-and-set-ops.sql|Tests Exists subquery used in Joins (Both when joins occurs in outer and suquery blocks)
DB2 results are attached here as reference :
[exists-cte-db2.txt](https://github.com/apache/spark/files/752091/exists-cte-db2.txt)
[exists-joins-and-set-ops-db2.txt](https://github.com/apache/spark/files/753283/exists-joins-and-set-ops-db2.txt) (updated)
## How was this patch tested?
The test result is compared with the result run from another SQL engine (in this case is IBM DB2). If the result are equivalent, we assume the result is correct.
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes#16802 from dilipbiswal/exists-pr3.
## What changes were proposed in this pull request?
This PR adds new test cases for scalar subquery in predicate context
## How was this patch tested?
The test result is compared with the result run from another SQL engine (in this case is IBM DB2). If the result are equivalent, we assume the result is correct.
Author: Nattavut Sutyanyong <nsy.can@gmail.com>
Closes#16798 from nsyca/18873-2.
## What changes were proposed in this pull request?
Implementing a mapping between executionId and corresponding QueryExecution in SQLExecution.
## How was this patch tested?
Adds a unit test.
Author: Ala Luszczak <ala@databricks.com>
Closes#16940 from ala/execution-id.
## What changes were proposed in this pull request?
The reason for test failure is that the property “oracle.jdbc.mapDateToTimestamp” set by the test was getting converted into all lower case. Oracle database expects this property in case-sensitive manner.
This test was passing in previous releases because connection properties were sent as user specified for the test case scenario. Fixes to handle all option uniformly in case-insensitive manner, converted the JDBC connection properties also to lower case.
This PR enhances CaseInsensitiveMap to keep track of input case-sensitive keys , and uses those when creating connection properties that are passed to the JDBC connection.
Alternative approach PR https://github.com/apache/spark/pull/16847 is to pass original input keys to JDBC data source by adding check in the Data source class and handle case-insensitivity in the JDBC source code.
## How was this patch tested?
Added new test cases to JdbcSuite , and OracleIntegrationSuite. Ran docker integration tests passed on my laptop, all tests passed successfully.
Author: sureshthalamati <suresh.thalamati@gmail.com>
Closes#16891 from sureshthalamati/jdbc_case_senstivity_props_fix-SPARK-19318.
## What changes were proposed in this pull request?
This pull request introduces a simple hint infrastructure to SQL and implements broadcast join hint using the infrastructure.
The hint syntax looks like the following:
```
SELECT /*+ BROADCAST(t) */ * FROM t
```
For broadcast hint, we accept "BROADCAST", "BROADCASTJOIN", and "MAPJOIN", and a sequence of relation aliases can be specified in the hint. A broadcast hint plan node will be inserted on top of any relation (that is not aliased differently), subquery, or common table expression that match the specified name.
The hint resolution works by recursively traversing down the query plan to find a relation or subquery that matches one of the specified broadcast aliases. The traversal does not go past beyond any existing broadcast hints, subquery aliases. This rule happens before common table expressions.
Note that there was an earlier patch in https://github.com/apache/spark/pull/14426. This is a rewrite of that patch, with different semantics and simpler test cases.
## How was this patch tested?
Added a new unit test suite for the broadcast hint rule (SubstituteHintsSuite) and new test cases for parser change (in PlanParserSuite). Also added end-to-end test case in BroadcastSuite.
Author: Reynold Xin <rxin@databricks.com>
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#16925 from rxin/SPARK-16475-broadcast-hint.
## What changes were proposed in this pull request?
Current `CREATE TEMPORARY TABLE ... ` is deprecated and recommend users to use `CREATE TEMPORARY VIEW ...` And it does not support `IF NOT EXISTS `clause. However, if there is an existing temporary view defined, it is possible to unintentionally replace this existing view by issuing `CREATE TEMPORARY TABLE ...` with the same table/view name.
This PR is to disallow `CREATE TEMPORARY TABLE ...` with an existing view name.
Under the cover, `CREATE TEMPORARY TABLE ...` will be changed to create temporary view, however, passing in a flag `replace=false`, instead of currently `true`. So when creating temporary view under the cover, if there is existing view with the same name, the operation will be blocked.
## How was this patch tested?
New unit test case is added and updated some existing test cases to adapt the new behavior
Author: Xin Wu <xinwu@us.ibm.com>
Closes#16878 from xwu0226/block_duplicate_temp_table.
## What changes were proposed in this pull request?
When a query uses a temp checkpoint dir, it's better to delete it if it's stopped without errors.
## How was this patch tested?
New unit tests.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#16880 from zsxwing/delete-temp-checkpoint.
Improve the test for SPARK-19514, so that it's clear which stage is being cancelled.
Author: Ala Luszczak <ala@databricks.com>
Closes#16914 from ala/fix-range-test.
## What changes were proposed in this pull request?
Currently the udf `to_date` has different return value with an invalid date input.
```
SELECT to_date('2015-07-22', 'yyyy-dd-MM') -> return `2016-10-07`
SELECT to_date('2014-31-12') -> return null
```
As discussed in JIRA [SPARK-19496](https://issues.apache.org/jira/browse/SPARK-19496), we should return null in both situations when the input date is invalid
## How was this patch tested?
unit test added
Author: windpiger <songjun@outlook.com>
Closes#16870 from windpiger/to_date.
## What changes were proposed in this pull request?
This change add an optional argument to `SparkContext.cancelStage()` and `SparkContext.cancelJob()` functions, which allows the caller to provide exact reason for the cancellation.
## How was this patch tested?
Adds unit test.
Author: Ala Luszczak <ala@databricks.com>
Closes#16887 from ala/cancel.
## What changes were proposed in this pull request?
Reading from an existing ORC table which contains `char` or `varchar` columns can fail with a `ClassCastException` if the table metadata has been created using Spark. This is caused by the fact that spark internally replaces `char` and `varchar` columns with a `string` column.
This PR fixes this by adding the hive type to the `StructField's` metadata under the `HIVE_TYPE_STRING` key. This is picked up by the `HiveClient` and the ORC reader, see https://github.com/apache/spark/pull/16060 for more details on how the metadata is used.
## How was this patch tested?
Added a regression test to `OrcSourceSuite`.
Author: Herman van Hovell <hvanhovell@databricks.com>
Closes#16804 from hvanhovell/SPARK-19459.
## What changes were proposed in this pull request?
Set currentVars to null in GenerateOrdering.genComparisons before genCode is called. genCode ignores INPUT_ROW if currentVars is not null and in genComparisons we want it to use INPUT_ROW.
## How was this patch tested?
Added test with 2 queries in WholeStageCodegenSuite
Author: Bogdan Raducanu <bogdan.rdc@gmail.com>
Closes#16852 from bogdanrdc/SPARK-19512.
## What changes were proposed in this pull request?
Previously range operator could not be interrupted. For example, using DAGScheduler.cancelStage(...) on a query with range might have been ineffective.
This change adds periodic checks of TaskContext.isInterrupted to codegen version, and InterruptibleOperator to non-codegen version.
I benchmarked the performance of codegen version on a sample query `spark.range(1000L * 1000 * 1000 * 10).count()` and there is no measurable difference.
## How was this patch tested?
Adds a unit test.
Author: Ala Luszczak <ala@databricks.com>
Closes#16872 from ala/SPARK-19514b.
## What changes were proposed in this pull request?
SPARK-19265 had made table relation cache general; this follow-up aims to make `tableRelationCache`'s maximum size configurable.
In order to do sanity-check, this patch also adds a `checkValue()` method to `TypedConfigBuilder`.
## How was this patch tested?
new test case: `test("conf entry: checkValue()")`
Author: Liwei Lin <lwlin7@gmail.com>
Closes#16736 from lw-lin/conf.
## What changes were proposed in this pull request?
This PR adds the second set of tests for EXISTS subquery.
File name | Brief description
------------------------| -----------------
exists-aggregate.sql |Tests aggregate expressions in outer query and EXISTS subquery.
exists-having.sql|Tests HAVING clause in subquery.
exists-orderby-limit.sql|Tests EXISTS subquery support with ORDER BY and LIMIT clauses.
DB2 results are attached here as reference :
[exists-aggregate-db2.txt](https://github.com/apache/spark/files/743287/exists-aggregate-db2.txt)
[exists-having-db2.txt](https://github.com/apache/spark/files/743286/exists-having-db2.txt)
[exists-orderby-limit-db2.txt](https://github.com/apache/spark/files/743288/exists-orderby-limit-db2.txt)
## How the patch was tested.
The test result is compared with the result run from another SQL engine (in this case is IBM DB2). If the result are equivalent, we assume the result is correct.
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes#16760 from dilipbiswal/exists-pr2.
## What changes were proposed in this pull request?
when csv infer schema, it does not use user defined csvoptions to parse the field, such as `inf`, `-inf` which are should be parsed to DoubleType
this pr add `options.nanValue`, `options.negativeInf`, `options.positiveIn` to check if the field is a DoubleType
## How was this patch tested?
unit test added
Author: windpiger <songjun@outlook.com>
Closes#16834 from windpiger/fixinferInfSchemaCsv.
## What changes were proposed in this pull request?
This PR adds new test cases for scalar subquery in SELECT clause.
## How was this patch tested?
The test result is compared with the result run from another SQL engine (in this case is IBM DB2). If the result are equivalent, we assume the result is correct.
Author: Nattavut Sutyanyong <nsy.can@gmail.com>
Closes#16712 from nsyca/18873.
## What changes were proposed in this pull request?
`mapGroupsWithState` is a new API for arbitrary stateful operations in Structured Streaming, similar to `DStream.mapWithState`
*Requirements*
- Users should be able to specify a function that can do the following
- Access the input row corresponding to a key
- Access the previous state corresponding to a key
- Optionally, update or remove the state
- Output any number of new rows (or none at all)
*Proposed API*
```
// ------------ New methods on KeyValueGroupedDataset ------------
class KeyValueGroupedDataset[K, V] {
// Scala friendly
def mapGroupsWithState[S: Encoder, U: Encoder](func: (K, Iterator[V], KeyedState[S]) => U)
def flatMapGroupsWithState[S: Encode, U: Encoder](func: (K, Iterator[V], KeyedState[S]) => Iterator[U])
// Java friendly
def mapGroupsWithState[S, U](func: MapGroupsWithStateFunction[K, V, S, R], stateEncoder: Encoder[S], resultEncoder: Encoder[U])
def flatMapGroupsWithState[S, U](func: FlatMapGroupsWithStateFunction[K, V, S, R], stateEncoder: Encoder[S], resultEncoder: Encoder[U])
}
// ------------------- New Java-friendly function classes -------------------
public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
R call(K key, Iterator<V> values, state: KeyedState<S>) throws Exception;
}
public interface FlatMapGroupsWithStateFunction<K, V, S, R> extends Serializable {
Iterator<R> call(K key, Iterator<V> values, state: KeyedState<S>) throws Exception;
}
// ---------------------- Wrapper class for state data ----------------------
trait State[S] {
def exists(): Boolean
def get(): S // throws Exception is state does not exist
def getOption(): Option[S]
def update(newState: S): Unit
def remove(): Unit // exists() will be false after this
}
```
Key Semantics of the State class
- The state can be null.
- If the state.remove() is called, then state.exists() will return false, and getOption will returm None.
- After that state.update(newState) is called, then state.exists() will return true, and getOption will return Some(...).
- None of the operations are thread-safe. This is to avoid memory barriers.
*Usage*
```
val stateFunc = (word: String, words: Iterator[String, runningCount: KeyedState[Long]) => {
val newCount = words.size + runningCount.getOption.getOrElse(0L)
runningCount.update(newCount)
(word, newCount)
}
dataset // type is Dataset[String]
.groupByKey[String](w => w) // generates KeyValueGroupedDataset[String, String]
.mapGroupsWithState[Long, (String, Long)](stateFunc) // returns Dataset[(String, Long)]
```
## How was this patch tested?
New unit tests.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#16758 from tdas/mapWithState.
### What changes were proposed in this pull request?
Prior to Spark 2.1, the option names are case sensitive for all the formats. Since Spark 2.1, the option key names become case insensitive except the format `Text` and `LibSVM `. This PR is to fix these issues.
Also, add a check to know whether the input option vector type is legal for `LibSVM`.
### How was this patch tested?
Added test cases
Author: gatorsmile <gatorsmile@gmail.com>
Closes#16737 from gatorsmile/libSVMTextOptions.
## What changes were proposed in this pull request?
The optimizer tries to remove redundant alias only projections from the query plan using the `RemoveAliasOnlyProject` rule. The current rule identifies removes such a project and rewrites the project's attributes in the **entire** tree. This causes problems when parts of the tree are duplicated (for instance a self join on a temporary view/CTE) and the duplicated part contains the alias only project, in this case the rewrite will break the tree.
This PR fixes these problems by using a blacklist for attributes that are not to be moved, and by making sure that attribute remapping is only done for the parent tree, and not for unrelated parts of the query plan.
The current tree transformation infrastructure works very well if the transformation at hand requires little or a global contextual information. In this case we need to know both the attributes that were not to be moved, and we also needed to know which child attributes were modified. This cannot be done easily using the current infrastructure, and solutions typically involves transversing the query plan multiple times (which is super slow). I have moved around some code in `TreeNode`, `QueryPlan` and `LogicalPlan`to make this much more straightforward; this basically allows you to manually traverse the tree.
This PR subsumes the following PRs by windpiger:
Closes https://github.com/apache/spark/pull/16267
Closes https://github.com/apache/spark/pull/16255
## How was this patch tested?
I have added unit tests to `RemoveRedundantAliasAndProjectSuite` and I have added integration tests to the `SQLQueryTestSuite.union` and `SQLQueryTestSuite.cte` test cases.
Author: Herman van Hovell <hvanhovell@databricks.com>
Closes#16757 from hvanhovell/SPARK-18609.
## What changes were proposed in this pull request?
This pull request makes SQLConf slightly more extensible by removing the visibility limitations on the build* functions.
## How was this patch tested?
N/A - there are no logic changes and everything should be covered by existing unit tests.
Author: Reynold Xin <rxin@databricks.com>
Closes#16835 from rxin/SPARK-19495.
## What changes were proposed in this pull request?
This pull request adds two new user facing functions:
- `to_date` which accepts an expression and a format and returns a date.
- `to_timestamp` which accepts an expression and a format and returns a timestamp.
For example, Given a date in format: `2016-21-05`. (YYYY-dd-MM)
### Date Function
*Previously*
```
to_date(unix_timestamp(lit("2016-21-05"), "yyyy-dd-MM").cast("timestamp"))
```
*Current*
```
to_date(lit("2016-21-05"), "yyyy-dd-MM")
```
### Timestamp Function
*Previously*
```
unix_timestamp(lit("2016-21-05"), "yyyy-dd-MM").cast("timestamp")
```
*Current*
```
to_timestamp(lit("2016-21-05"), "yyyy-dd-MM")
```
### Tasks
- [X] Add `to_date` to Scala Functions
- [x] Add `to_date` to Python Functions
- [x] Add `to_date` to SQL Functions
- [X] Add `to_timestamp` to Scala Functions
- [x] Add `to_timestamp` to Python Functions
- [x] Add `to_timestamp` to SQL Functions
- [x] Add function to R
## How was this patch tested?
- [x] Add Functions to `DateFunctionsSuite`
- Test new `ParseToTimestamp` Expression (*not necessary*)
- Test new `ParseToDate` Expression (*not necessary*)
- [x] Add test for R
- [x] Add test for Python in test.py
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: anabranch <wac.chambers@gmail.com>
Author: Bill Chambers <bill@databricks.com>
Author: anabranch <bill@databricks.com>
Closes#16138 from anabranch/SPARK-16609.
## What changes were proposed in this pull request?
This change introduces a new metric "number of generated rows". It is used exclusively for Range, which is a leaf in the query tree, yet doesn't read any input data, and therefore cannot report "recordsRead".
Additionally the way in which the metrics are reported by the JIT-compiled version of Range was changed. Previously, it was immediately reported that all the records were produced. This could be confusing for a user monitoring execution progress in the UI. Now, the metric is updated gradually.
In order to avoid negative impact on Range performance, the code generation was reworked. The values are now produced in batches in the tighter inner loop, while the metrics are updated in the outer loop.
The change also contains a number of unit tests, which should help ensure the correctness of metrics for various input sources.
## How was this patch tested?
Unit tests.
Author: Ala Luszczak <ala@databricks.com>
Closes#16829 from ala/SPARK-19447.
## What changes were proposed in this pull request?
This PR refactors CSV schema inference path to be consistent with JSON data source and moves some filtering codes having the similar/same logics into `CSVUtils`.
It makes the methods in classes have consistent arguments with JSON ones. (this PR renames `.../json/InferSchema.scala` → `.../json/JsonInferSchema.scala`)
`CSVInferSchema` and `JsonInferSchema`
``` scala
private[csv] object CSVInferSchema {
...
def infer(
csv: Dataset[String],
caseSensitive: Boolean,
options: CSVOptions): StructType = {
...
```
``` scala
private[sql] object JsonInferSchema {
...
def infer(
json: RDD[String],
columnNameOfCorruptRecord: String,
configOptions: JSONOptions): StructType = {
...
```
These allow schema inference from `Dataset[String]` directly, meaning the similar functionalities that use `JacksonParser`/`JsonInferSchema` for JSON can be easily implemented by `UnivocityParser`/`CSVInferSchema` for CSV.
This completes refactoring CSV datasource and they are now pretty consistent.
## How was this patch tested?
Existing tests should cover this and
```
./dev/change-scala-version.sh 2.10
./build/mvn -Pyarn -Phadoop-2.4 -Dscala-2.10 -DskipTests clean package
```
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#16680 from HyukjinKwon/SPARK-16101-schema-inference.
## What changes were proposed in this pull request?
The current way of resolving `InsertIntoTable` and `CreateTable` is convoluted: sometimes we replace them with concrete implementation commands during analysis, sometimes during planning phase.
And the error checking logic is also a mess: we may put it in extended analyzer rules, or extended checking rules, or `CheckAnalysis`.
This PR simplifies the data source analysis:
1. `InsertIntoTable` and `CreateTable` are always unresolved and need to be replaced by concrete implementation commands during analysis.
2. The error checking logic is mainly in 2 rules: `PreprocessTableCreation` and `PreprocessTableInsertion`.
## How was this patch tested?
existing test.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#16269 from cloud-fan/ddl.
## What changes were proposed in this pull request?
This PR proposes to enable the tests for Parquet filter pushdown with binary and string.
This was disabled in https://github.com/apache/spark/pull/16106 due to Parquet's issue but it is now revived in https://github.com/apache/spark/pull/16791 after upgrading Parquet to 1.8.2.
## How was this patch tested?
Manually tested `ParquetFilterSuite` via IDE.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#16817 from HyukjinKwon/SPARK-17213.
### What changes were proposed in this pull request?
So far, we allow users to create a table with an empty schema: `CREATE TABLE tab1`. This could break many code paths if we enable it. Thus, we should follow Hive to block it.
For Hive serde tables, some serde libraries require the specified schema and record it in the metastore. To get the list, we need to check `hive.serdes.using.metastore.for.schema,` which contains a list of serdes that require user-specified schema. The default values are
- org.apache.hadoop.hive.ql.io.orc.OrcSerde
- org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe
- org.apache.hadoop.hive.serde2.dynamic_type.DynamicSerDe
- org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe
- org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe
- org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
- org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
### How was this patch tested?
Added test cases for both Hive and data source tables
Author: gatorsmile <gatorsmile@gmail.com>
Closes#16636 from gatorsmile/fixEmptyTableSchema.
## What changes were proposed in this pull request?
DataFrame.except doesn't work for UDT columns. It is because `ExtractEquiJoinKeys` will run `Literal.default` against UDT. However, we don't handle UDT in `Literal.default` and an exception will throw like:
java.lang.RuntimeException: no default for type
org.apache.spark.ml.linalg.VectorUDT3bfc3ba7
at org.apache.spark.sql.catalyst.expressions.Literal$.default(literals.scala:179)
at org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys$$anonfun$4.apply(patterns.scala:117)
at org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys$$anonfun$4.apply(patterns.scala:110)
More simple fix is just let `Literal.default` handle UDT by its sql type. So we can use more efficient join type on UDT.
Besides `except`, this also fixes other similar scenarios, so in summary this fixes:
* `except` on two Datasets with UDT
* `intersect` on two Datasets with UDT
* `Join` with the join conditions using `<=>` on UDT columns
## How was this patch tested?
Jenkins tests.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#16765 from viirya/df-except-for-udt.
## What changes were proposed in this pull request?
There is a metadata introduced before to mark the optional columns in merged Parquet schema for filter predicate pushdown. As we upgrade to Parquet 1.8.2 which includes the fix for the pushdown of optional columns, we don't need this metadata now.
## How was this patch tested?
Jenkins tests.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#16756 from viirya/remove-optional-metadata.
## What changes were proposed in this pull request?
1, add the multi-cols support based on current private api
2, add the multi-cols support to pyspark
## How was this patch tested?
unit tests
Author: Zheng RuiFeng <ruifengz@foxmail.com>
Author: Ruifeng Zheng <ruifengz@foxmail.com>
Closes#12135 from zhengruifeng/quantile4multicols.
## What changes were proposed in this pull request?
In StructuredStreaming, if a new trigger was skipped because no new data arrived, we suddenly report nothing for the metrics `stateOperator`. We could however easily report the metrics from `lastExecution` to ensure continuity of metrics.
## How was this patch tested?
Regression test in `StreamingQueryStatusAndProgressSuite`
Author: Burak Yavuz <brkyvz@gmail.com>
Closes#16716 from brkyvz/state-agg.
### What changes were proposed in this pull request?
Currently, the function `to_json` allows users to provide options for generating JSON. However, it does not pass it to `JacksonGenerator`. Thus, it ignores the user-provided options. This PR is to fix it. Below is an example.
```Scala
val df = Seq(Tuple1(Tuple1(java.sql.Timestamp.valueOf("2015-08-26 18:00:00.0")))).toDF("a")
val options = Map("timestampFormat" -> "dd/MM/yyyy HH:mm")
df.select(to_json($"a", options)).show(false)
```
The current output is like
```
+--------------------------------------+
|structtojson(a) |
+--------------------------------------+
|{"_1":"2015-08-26T18:00:00.000-07:00"}|
+--------------------------------------+
```
After the fix, the output is like
```
+-------------------------+
|structtojson(a) |
+-------------------------+
|{"_1":"26/08/2015 18:00"}|
+-------------------------+
```
### How was this patch tested?
Added test cases for both `from_json` and `to_json`
Author: gatorsmile <gatorsmile@gmail.com>
Closes#16745 from gatorsmile/toJson.
## What changes were proposed in this pull request?
This PR adds the first set of tests for EXISTS subquery.
File name | Brief description
------------------------| -----------------
exists-basic.sql |Tests EXISTS and NOT EXISTS subqueries with both correlated and local predicates.
exists-within-and-or.sql|Tests EXISTS and NOT EXISTS subqueries embedded in AND or OR expression.
DB2 results are attached here as reference :
[exists-basic-db2.txt](https://github.com/apache/spark/files/733031/exists-basic-db2.txt)
[exists-and-or-db2.txt](https://github.com/apache/spark/files/733030/exists-and-or-db2.txt)
## How was this patch tested?
This patch is adding tests.
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes#16710 from dilipbiswal/exist-basic.
## What changes were proposed in this pull request?
This pr added a variable for a UDF name in `ScalaUDF`.
Then, if the variable filled, `DataFrame#explain` prints the name.
## How was this patch tested?
Added a test in `UDFSuite`.
Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>
Closes#16707 from maropu/SPARK-19338.
## What changes were proposed in this pull request?
As of Spark 2.1, Spark SQL assumes the machine timezone for datetime manipulation, which is bad if users are not in the same timezones as the machines, or if different users have different timezones.
We should introduce a session local timezone setting that is used for execution.
An explicit non-goal is locale handling.
### Semantics
Setting the session local timezone means that the timezone-aware expressions listed below should use the timezone to evaluate values, and also it should be used to convert (cast) between string and timestamp or between timestamp and date.
- `CurrentDate`
- `CurrentBatchTimestamp`
- `Hour`
- `Minute`
- `Second`
- `DateFormatClass`
- `ToUnixTimestamp`
- `UnixTimestamp`
- `FromUnixTime`
and below are implicitly timezone-aware through cast from timestamp to date:
- `DayOfYear`
- `Year`
- `Quarter`
- `Month`
- `DayOfMonth`
- `WeekOfYear`
- `LastDay`
- `NextDay`
- `TruncDate`
For example, if you have timestamp `"2016-01-01 00:00:00"` in `GMT`, the values evaluated by some of timezone-aware expressions are:
```scala
scala> val df = Seq(new java.sql.Timestamp(1451606400000L)).toDF("ts")
df: org.apache.spark.sql.DataFrame = [ts: timestamp]
scala> df.selectExpr("cast(ts as string)", "year(ts)", "month(ts)", "dayofmonth(ts)", "hour(ts)", "minute(ts)", "second(ts)").show(truncate = false)
+-------------------+----------------------+-----------------------+----------------------------+--------+----------+----------+
|ts |year(CAST(ts AS DATE))|month(CAST(ts AS DATE))|dayofmonth(CAST(ts AS DATE))|hour(ts)|minute(ts)|second(ts)|
+-------------------+----------------------+-----------------------+----------------------------+--------+----------+----------+
|2016-01-01 00:00:00|2016 |1 |1 |0 |0 |0 |
+-------------------+----------------------+-----------------------+----------------------------+--------+----------+----------+
```
whereas setting the session local timezone to `"PST"`, they are:
```scala
scala> spark.conf.set("spark.sql.session.timeZone", "PST")
scala> df.selectExpr("cast(ts as string)", "year(ts)", "month(ts)", "dayofmonth(ts)", "hour(ts)", "minute(ts)", "second(ts)").show(truncate = false)
+-------------------+----------------------+-----------------------+----------------------------+--------+----------+----------+
|ts |year(CAST(ts AS DATE))|month(CAST(ts AS DATE))|dayofmonth(CAST(ts AS DATE))|hour(ts)|minute(ts)|second(ts)|
+-------------------+----------------------+-----------------------+----------------------------+--------+----------+----------+
|2015-12-31 16:00:00|2015 |12 |31 |16 |0 |0 |
+-------------------+----------------------+-----------------------+----------------------------+--------+----------+----------+
```
Notice that even if you set the session local timezone, it affects only in `DataFrame` operations, neither in `Dataset` operations, `RDD` operations nor in `ScalaUDF`s. You need to properly handle timezone by yourself.
### Design of the fix
I introduced an analyzer to pass session local timezone to timezone-aware expressions and modified DateTimeUtils to take the timezone argument.
## How was this patch tested?
Existing tests and added tests for timezone aware expressions.
Author: Takuya UESHIN <ueshin@happy-camper.st>
Closes#16308 from ueshin/issues/SPARK-18350.
## What changes were proposed in this pull request?
In CachedTableSuite, we are not setting up the test data at the beginning. Some tests fail while trying to run individually. When running the entire suite they run fine.
Here are some of the tests that fail -
- test("SELECT star from cached table")
- test("Self-join cached")
As part of this simplified a couple of tests by calling a support method to count the number of
InMemoryRelations.
## How was this patch tested?
Ran the failing tests individually.
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes#16688 from dilipbiswal/cachetablesuite_simple.
## What changes were proposed in this pull request?
acceptType() in UDT will no only accept the same type but also all base types
## How was this patch tested?
Manual test using a set of generated UDTs fixing acceptType() in my user defined types
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: gmoehler <moehler@de.ibm.com>
Closes#16660 from gmoehler/master.
## What changes were proposed in this pull request?
This PR will report proper error messages when a subquery expression contain an invalid plan. This problem is fixed by calling CheckAnalysis for the plan inside a subquery.
## How was this patch tested?
Existing tests and two new test cases on 2 forms of subquery, namely, scalar subquery and in/exists subquery.
````
-- TC 01.01
-- The column t2b in the SELECT of the subquery is invalid
-- because it is neither an aggregate function nor a GROUP BY column.
select t1a, t2b
from t1, t2
where t1b = t2c
and t2b = (select max(avg)
from (select t2b, avg(t2b) avg
from t2
where t2a = t1.t1b
)
)
;
-- TC 01.02
-- Invalid due to the column t2b not part of the output from table t2.
select *
from t1
where t1a in (select min(t2a)
from t2
group by t2c
having t2c in (select max(t3c)
from t3
group by t3b
having t3b > t2b ))
;
````
Author: Nattavut Sutyanyong <nsy.can@gmail.com>
Closes#16572 from nsyca/18863.
## What changes were proposed in this pull request?
Similar to SPARK-15165, codegen is in danger of arbitrary code injection. The root cause is how variable names are created by codegen.
In GenerateExec#codeGenAccessor, a variable name is created like as follows.
```
val value = ctx.freshName(name)
```
The variable `value` is named based on the value of the variable `name` and the value of `name` is from schema given by users so an attacker can attack with queries like as follows.
```
SELECT inline(array(cast(struct(1) AS struct<`=new Object() { {f();} public void f() {throw new RuntimeException("This exception is injected.");} public int x;}.x`:int>)))
```
In the example above, a RuntimeException is thrown but an attacker can replace it with arbitrary code.
## How was this patch tested?
Added a new test case.
Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>
Closes#16681 from sarutak/SPARK-19334.
## What changes were proposed in this pull request?
This PR fixes the code in Optimizer phase where the NULL-aware expression of a NOT IN query is expanded in Rule `RewritePredicateSubquery`.
Example:
The query
select a1,b1
from t1
where (a1,b1) not in (select a2,b2
from t2);
has the (a1, b1) = (a2, b2) rewritten from (before this fix):
Join LeftAnti, ((isnull((_1#2 = a2#16)) || isnull((_2#3 = b2#17))) || ((_1#2 = a2#16) && (_2#3 = b2#17)))
to (after this fix):
Join LeftAnti, (((_1#2 = a2#16) || isnull((_1#2 = a2#16))) && ((_2#3 = b2#17) || isnull((_2#3 = b2#17))))
## How was this patch tested?
sql/test, catalyst/test and new test cases in SQLQueryTestSuite.
Author: Nattavut Sutyanyong <nsy.can@gmail.com>
Closes#16467 from nsyca/19017.
## What changes were proposed in this pull request?
Spark SQL follows MySQL to do the implicit type conversion for binary comparison: http://dev.mysql.com/doc/refman/5.7/en/type-conversion.html
However, this may return confusing result, e.g. `1 = 'true'` will return true, `19157170390056973L = '19157170390056971'` will return true.
I think it's more reasonable to follow postgres in this case, i.e. cast string to the type of the other side, but return null if the string is not castable to keep hive compatibility.
## How was this patch tested?
newly added tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#15880 from cloud-fan/compare.
## What changes were proposed in this pull request?
As adaptive query execution may change the number of partitions in different batches, it may break streaming queries. Hence, we should disallow this feature in Structured Streaming.
## How was this patch tested?
`test("SPARK-19268: Adaptive query execution should be disallowed")`.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#16683 from zsxwing/SPARK-19268.
## What changes were proposed in this pull request?
Currently, running the codes in Java
```java
spark.udf().register("inc", new UDF1<Long, Long>() {
Override
public Long call(Long i) {
return i + 1;
}
}, DataTypes.LongType);
spark.range(10).toDF("x").createOrReplaceTempView("tmp");
Row result = spark.sql("SELECT inc(x) FROM tmp GROUP BY inc(x)").head();
Assert.assertEquals(7, result.getLong(0));
```
fails as below:
```
org.apache.spark.sql.AnalysisException: expression 'tmp.`x`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;;
Aggregate [UDF(x#19L)], [UDF(x#19L) AS UDF(x)#23L]
+- SubqueryAlias tmp, `tmp`
+- Project [id#16L AS x#19L]
+- Range (0, 10, step=1, splits=Some(8))
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:57)
```
The root cause is because we were creating the function every time when it needs to build as below:
```scala
scala> def inc(i: Int) = i + 1
inc: (i: Int)Int
scala> (inc(_: Int)).hashCode
res15: Int = 1231799381
scala> (inc(_: Int)).hashCode
res16: Int = 2109839984
scala> (inc(_: Int)) == (inc(_: Int))
res17: Boolean = false
```
This seems leading to the comparison failure between `ScalaUDF`s created from Java UDF API, for example, in `Expression.semanticEquals`.
In case of Scala one, it seems already fine.
Both can be tested easily as below if any reviewer is more comfortable with Scala:
```scala
val df = Seq((1, 10), (2, 11), (3, 12)).toDF("x", "y")
val javaUDF = new UDF1[Int, Int] {
override def call(i: Int): Int = i + 1
}
// spark.udf.register("inc", javaUDF, IntegerType) // Uncomment this for Java API
// spark.udf.register("inc", (i: Int) => i + 1) // Uncomment this for Scala API
df.createOrReplaceTempView("tmp")
spark.sql("SELECT inc(y) FROM tmp GROUP BY inc(y)").show()
```
## How was this patch tested?
Unit test in `JavaUDFSuite.java` and `./dev/lint-java`.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#16553 from HyukjinKwon/SPARK-9435.
## What changes were proposed in this pull request?
when we append data to a existed partitioned datasource table, the InsertIntoHadoopFsRelationCommand.getCustomPartitionLocations currently
return the same location with Hive default, it should return None.
## How was this patch tested?
Author: windpiger <songjun@outlook.com>
Closes#16642 from windpiger/appendSchema.
### What changes were proposed in this pull request?
It is weird to create Hive source tables when using InMemoryCatalog. We are unable to operate it. This PR is to block users to create Hive source tables.
### How was this patch tested?
Fixed the test cases
Author: gatorsmile <gatorsmile@gmail.com>
Closes#16587 from gatorsmile/blockHiveTable.
## What changes were proposed in this pull request?
This PR refactors CSV read path to be consistent with JSON data source. It makes the methods in classes have consistent arguments with JSON ones.
`UnivocityParser` and `JacksonParser`
``` scala
private[csv] class UnivocityParser(
schema: StructType,
requiredSchema: StructType,
options: CSVOptions) extends Logging {
...
def parse(input: String): Seq[InternalRow] = {
...
```
``` scala
class JacksonParser(
schema: StructType,
columnNameOfCorruptRecord: String,
options: JSONOptions) extends Logging {
...
def parse(input: String): Option[InternalRow] = {
...
```
These allow parsing an iterator (`String` to `InternalRow`) as below for both JSON and CSV:
```scala
iter.flatMap(parser.parse)
```
## How was this patch tested?
Existing tests should cover this.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#16669 from HyukjinKwon/SPARK-16101-read.
## What changes were proposed in this pull request?
PythonUDF is unevaluable, which can not be used inside a join condition, currently the optimizer will push a PythonUDF which accessing both side of join into the join condition, then the query will fail to plan.
This PR fix this issue by checking the expression is evaluable or not before pushing it into Join.
## How was this patch tested?
Add a regression test.
Author: Davies Liu <davies@databricks.com>
Closes#16581 from davies/pyudf_join.
## What changes were proposed in this pull request?
The initial shouldFilterOut() method invocation filter the root path name(table name in the intial call) and remove if it contains _. I moved the check one level below, so it first list files/directories in the given root path and then apply filter.
(Please fill in changes proposed in this fix)
## How was this patch tested?
Added new test case for this scenario
(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: jayadevanmurali <jayadevan.m@tcs.com>
Author: jayadevan <jayadevan.m@tcs.com>
Closes#16635 from jayadevanmurali/branch-0.1-SPARK-19059.
## What changes were proposed in this pull request?
We have a table relation plan cache in `HiveMetastoreCatalog`, which caches a lot of things: file status, resolved data source, inferred schema, etc.
However, it doesn't make sense to limit this cache with hive support, we should move it to SQL core module so that users can use this cache without hive support.
It can also reduce the size of `HiveMetastoreCatalog`, so that it's easier to remove it eventually.
main changes:
1. move the table relation cache to `SessionCatalog`
2. `SessionCatalog.lookupRelation` will return `SimpleCatalogRelation` and the analyzer will convert it to `LogicalRelation` or `MetastoreRelation` later, then `HiveSessionCatalog` doesn't need to override `lookupRelation` anymore
3. `FindDataSourceTable` will read/write the table relation cache.
## How was this patch tested?
existing tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#16621 from cloud-fan/plan-cache.
## What changes were proposed in this pull request?
#16492 missed one race condition: `StreamExecution.awaitInitialization` may throw fatal errors and fail the test. This PR just ignores `StreamingQueryException` thrown from `awaitInitialization` so that we can verify the exception in the `ExpectFailure` action later. It's fine since `StopStream` or `ExpectFailure` will catch `StreamingQueryException` as well.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#16567 from zsxwing/SPARK-19113-2.