## What changes were proposed in this pull request?
Currently, the SQL metrics looks like `number of rows: 111111111111`, it's very hard to read how large the number is. So a separator was added by #12425, but removed by #14142, because the separator is weird in some locales (for example, pl_PL), this PR will add that back, but always use "," as the separator, since the SQL UI are all in English.
## How was this patch tested?
Existing tests.
![metrics](https://cloud.githubusercontent.com/assets/40902/14573908/21ad2f00-030d-11e6-9e2c-c544f30039ea.png)
Author: Davies Liu <davies@databricks.com>
Closes#15106 from davies/metric_sep.
## What changes were proposed in this pull request?
Clarify that slide and window duration are absolute, and not relative to a calendar.
## How was this patch tested?
Doc build (no functional change)
Author: Sean Owen <sowen@cloudera.com>
Closes#15142 from srowen/SPARK-17297.
## What changes were proposed in this pull request?
AssertOnQuery has two apply constructor: one that accepts a closure that returns boolean, and another that accepts a closure that returns Unit. This is actually very confusing because developers could mistakenly think that AssertOnQuery always require a boolean return type and verifies the return result, when indeed the value of the last statement is ignored in one of the constructors.
This pull request makes the two constructor consistent and always require boolean value. It will overall make the test suites more robust against developer errors.
As an evidence for the confusing behavior, this change also identified a bug with an existing test case due to file system time granularity. This pull request fixes that test case as well.
## How was this patch tested?
This is a test only change.
Author: petermaxlee <petermaxlee@gmail.com>
Closes#15127 from petermaxlee/SPARK-17571.
## Problem
CSV in Spark 2.0.0:
- does not read null values back correctly for certain data types such as `Boolean`, `TimestampType`, `DateType` -- this is a regression comparing to 1.6;
- does not read empty values (specified by `options.nullValue`) as `null`s for `StringType` -- this is compatible with 1.6 but leads to problems like SPARK-16903.
## What changes were proposed in this pull request?
This patch makes changes to read all empty values back as `null`s.
## How was this patch tested?
New test cases.
Author: Liwei Lin <lwlin7@gmail.com>
Closes#14118 from lw-lin/csv-cast-null.
## What changes were proposed in this pull request?
In `SessionCatalog`, we have several operations(`tableExists`, `dropTable`, `loopupRelation`, etc) that handle both temp views and metastore tables/views. This brings some bugs to DDL commands that want to handle temp view only or metastore table/view only. These bugs are:
1. `CREATE TABLE USING` will fail if a same-name temp view exists
2. `Catalog.dropTempView`will un-cache and drop metastore table if a same-name table exists
3. `saveAsTable` will fail or have unexpected behaviour if a same-name temp view exists.
These bug fixes are pulled out from https://github.com/apache/spark/pull/14962 and targets both master and 2.0 branch
## How was this patch tested?
new regression tests
Author: Wenchen Fan <wenchen@databricks.com>
Closes#15099 from cloud-fan/fix-view.
### What changes were proposed in this pull request?
In Spark 2.1, we introduced a new internal provider `hive` for telling Hive serde tables from data source tables. This PR is to block users to specify this in `DataFrameWriter` and SQL APIs.
### How was this patch tested?
Added a test case
Author: gatorsmile <gatorsmile@gmail.com>
Closes#15073 from gatorsmile/formatHive.
## What changes were proposed in this pull request?
This PR fixes all the instances which was fixed in the previous PR.
To make sure, I manually debugged and also checked the Scala source. `length` in [LinearSeqOptimized.scala#L49-L57](https://github.com/scala/scala/blob/2.11.x/src/library/scala/collection/LinearSeqOptimized.scala#L49-L57) is O(n). Also, `size` calls `length` via [SeqLike.scala#L106](https://github.com/scala/scala/blob/2.11.x/src/library/scala/collection/SeqLike.scala#L106).
For debugging, I have created these as below:
```scala
ArrayBuffer(1, 2, 3)
Array(1, 2, 3)
List(1, 2, 3)
Seq(1, 2, 3)
```
and then called `size` and `length` for each to debug.
## How was this patch tested?
I ran the bash as below on Mac
```bash
find . -name *.scala -type f -exec grep -il "while (.*\\.length)" {} \; | grep "src/main"
find . -name *.scala -type f -exec grep -il "while (.*\\.size)" {} \; | grep "src/main"
```
and then checked each.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#15093 from HyukjinKwon/SPARK-17480-followup.
## What changes were proposed in this pull request?
Add a clearUntil() method on BitSet (adapted from the pre-existing setUntil() method).
Use this method to clear the subset of the BitSet which needs to be used during merge joins.
## How was this patch tested?
dev/run-tests, as well as performance tests on skewed data as described in jira.
I expect there to be a small local performance hit using BitSet.clearUntil rather than BitSet.clear for normally shaped (unskewed) joins (additional read on the last long). This is expected to be de-minimis and was not specifically tested.
Author: David Navas <davidn@clearstorydata.com>
Closes#15084 from davidnavas/bitSet.
The existing code caches all stats for all columns for each partition
in the driver; for a large relation, this causes extreme memory usage,
which leads to gc hell and application failures.
It seems that only the size in bytes of the data is actually used in the
driver, so instead just colllect that. In executors, the full stats are
still kept, but that's not a big problem; we expect the data to be distributed
and thus not really incur in too much memory pressure in each individual
executor.
There are also potential improvements on the executor side, since the data
being stored currently is very wasteful (e.g. storing boxed types vs.
primitive types for stats). But that's a separate issue.
On a mildly related change, I'm also adding code to catch exceptions in the
code generator since Janino was breaking with the test data I tried this
patch on.
Tested with unit tests and by doing a count a very wide table (20k columns)
with many partitions.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#15112 from vanzin/SPARK-17549.
## What changes were proposed in this pull request?
Fix `<ul> / <li>` problems in SQL scaladoc.
## How was this patch tested?
Scaladoc build and manual verification of generated HTML.
Author: Sean Owen <sowen@cloudera.com>
Closes#15117 from srowen/SPARK-17561.
## What changes were proposed in this pull request?
This PR is a follow up of SPARK-17356. Current implementation of `TreeNode.toJSON` recursively converts all fields of TreeNode to JSON, even if the field is of type `Seq` or type Map. This may trigger out of memory exception in cases like:
1. the Seq or Map can be very big. Converting them to JSON may take huge memory, which may trigger out of memory error.
2. Some user space input may also be propagated to the Plan. The user space input can be of arbitrary type, and may also be self-referencing. Trying to print user space input to JSON may trigger out of memory error or stack overflow error.
For a code example, please check the Jira description of SPARK-17426.
In this PR, we refactor the `TreeNode.toJSON` so that we only convert a field to JSON string if the field is a safe type.
## How was this patch tested?
Unit test.
Author: Sean Zhong <seanzhong@databricks.com>
Closes#14990 from clockfly/json_oom2.
## What changes were proposed in this pull request?
This change preserves aliases that are given for pivot aggregations
## How was this patch tested?
New unit test
Author: Andrew Ray <ray.andrew@gmail.com>
Closes#15111 from aray/SPARK-17458.
## What changes were proposed in this pull request?
select length(11);
select length(2.0);
these sql will return errors, but hive is ok.
this PR will support casting input types implicitly for function length
the correct result is:
select length(11) return 2
select length(2.0) return 3
Author: 岑玉海 <261810726@qq.com>
Author: cenyuhai <cenyuhai@didichuxing.com>
Closes#15014 from cenyuhai/SPARK-17429.
## What changes were proposed in this pull request?
This PR fixes an issue with aggregates that have an empty input, and use a literals as their grouping keys. These aggregates are currently interpreted as aggregates **without** grouping keys, this triggers the ungrouped code path (which aways returns a single row).
This PR fixes the `RemoveLiteralFromGroupExpressions` optimizer rule, which changes the semantics of the Aggregate by eliminating all literal grouping keys.
## How was this patch tested?
Added tests to `SQLQueryTestSuite`.
Author: Herman van Hovell <hvanhovell@databricks.com>
Closes#15101 from hvanhovell/SPARK-17114-3.
## What changes were proposed in this pull request?
Optimize a while loop during batch inserts
## How was this patch tested?
Unit tests were done, specifically "mvn test" for sql
Author: John Muller <jmuller@us.imshealth.com>
Closes#15098 from blue666man/SPARK-17536.
### What changes were proposed in this pull request?
For the following `ALTER TABLE` DDL, we should issue an exception when the target table is a `VIEW`:
```SQL
ALTER TABLE viewName SET LOCATION '/path/to/your/lovely/heart'
ALTER TABLE viewName SET SERDE 'whatever'
ALTER TABLE viewName SET SERDEPROPERTIES ('x' = 'y')
ALTER TABLE viewName PARTITION (a=1, b=2) SET SERDEPROPERTIES ('x' = 'y')
ALTER TABLE viewName ADD IF NOT EXISTS PARTITION (a='4', b='8')
ALTER TABLE viewName DROP IF EXISTS PARTITION (a='2')
ALTER TABLE viewName RECOVER PARTITIONS
ALTER TABLE viewName PARTITION (a='1', b='q') RENAME TO PARTITION (a='100', b='p')
```
In addition, `ALTER TABLE RENAME PARTITION` is unable to handle data source tables, just like the other `ALTER PARTITION` commands. We should issue an exception instead.
### How was this patch tested?
Added a few test cases.
Author: gatorsmile <gatorsmile@gmail.com>
Closes#15004 from gatorsmile/altertable.
## What changes were proposed in this pull request?
Make CollectionAccumulator and SetAccumulator's value can be read thread-safely to fix the ConcurrentModificationException reported in [JIRA](https://issues.apache.org/jira/browse/SPARK-17463).
## How was this patch tested?
Existing tests.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#15063 from zsxwing/SPARK-17463.
## What changes were proposed in this pull request?
Currently, ORDER BY clause returns nulls value according to sorting order (ASC|DESC), considering null value is always smaller than non-null values.
However, SQL2003 standard support NULLS FIRST or NULLS LAST to allow users to specify whether null values should be returned first or last, regardless of sorting order (ASC|DESC).
This PR is to support this new feature.
## How was this patch tested?
New test cases are added to test NULLS FIRST|LAST for regular select queries and windowing queries.
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
Author: Xin Wu <xinwu@us.ibm.com>
Closes#14842 from xwu0226/SPARK-10747.
## What changes were proposed in this pull request?
I first thought they are missing because they are kind of hidden options but it seems they are just missing.
For example, `spark.sql.parquet.mergeSchema` is documented in [sql-programming-guide.md](https://github.com/apache/spark/blob/master/docs/sql-programming-guide.md) but this function is missing whereas many options such as `spark.sql.join.preferSortMergeJoin` are not documented but have its own function individually.
So, this PR suggests making them consistent by adding the missing functions for some options in `SQLConf` and use them where applicable, in order to make them more readable.
## How was this patch tested?
Existing tests should cover this.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#14678 from HyukjinKwon/sqlconf-cleanup.
## What changes were proposed in this pull request?
In PySpark, `df.take(1)` runs a single-stage job which computes only one partition of the DataFrame, while `df.limit(1).collect()` computes all partitions and runs a two-stage job. This difference in performance is confusing.
The reason why `limit(1).collect()` is so much slower is that `collect()` internally maps to `df.rdd.<some-pyspark-conversions>.toLocalIterator`, which causes Spark SQL to build a query where a global limit appears in the middle of the plan; this, in turn, ends up being executed inefficiently because limits in the middle of plans are now implemented by repartitioning to a single task rather than by running a `take()` job on the driver (this was done in #7334, a patch which was a prerequisite to allowing partition-local limits to be pushed beneath unions, etc.).
In order to fix this performance problem I think that we should generalize the fix from SPARK-10731 / #8876 so that `DataFrame.collect()` also delegates to the Scala implementation and shares the same performance properties. This patch modifies `DataFrame.collect()` to first collect all results to the driver and then pass them to Python, allowing this query to be planned using Spark's `CollectLimit` optimizations.
## How was this patch tested?
Added a regression test in `sql/tests.py` which asserts that the expected number of jobs, stages, and tasks are run for both queries.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#15068 from JoshRosen/pyspark-collect-limit.
### What changes were proposed in this pull request?
As explained in https://github.com/apache/spark/pull/14797:
>Some analyzer rules have assumptions on logical plans, optimizer may break these assumption, we should not pass an optimized query plan into QueryExecution (will be analyzed again), otherwise we may some weird bugs.
For example, we have a rule for decimal calculation to promote the precision before binary operations, use PromotePrecision as placeholder to indicate that this rule should not apply twice. But a Optimizer rule will remove this placeholder, that break the assumption, then the rule applied twice, cause wrong result.
We should not optimize the query in CTAS more than once. For example,
```Scala
spark.range(99, 101).createOrReplaceTempView("tab1")
val sqlStmt = "SELECT id, cast(id as long) * cast('1.0' as decimal(38, 18)) as num FROM tab1"
sql(s"CREATE TABLE tab2 USING PARQUET AS $sqlStmt")
checkAnswer(spark.table("tab2"), sql(sqlStmt))
```
Before this PR, the results do not match
```
== Results ==
!== Correct Answer - 2 == == Spark Answer - 2 ==
![100,100.000000000000000000] [100,null]
[99,99.000000000000000000] [99,99.000000000000000000]
```
After this PR, the results match.
```
+---+----------------------+
|id |num |
+---+----------------------+
|99 |99.000000000000000000 |
|100|100.000000000000000000|
+---+----------------------+
```
In this PR, we do not treat the `query` in CTAS as a child. Thus, the `query` will not be optimized when optimizing CTAS statement. However, we still need to analyze it for normalizing and verifying the CTAS in the Analyzer. Thus, we do it in the analyzer rule `PreprocessDDL`, because so far only this rule needs the analyzed plan of the `query`.
### How was this patch tested?
Added a test
Author: gatorsmile <gatorsmile@gmail.com>
Closes#15048 from gatorsmile/ctasOptimized.
## What changes were proposed in this pull request?
Point references to spark-packages.org to https://cwiki.apache.org/confluence/display/SPARK/Third+Party+Projects
This will be accompanied by a parallel change to the spark-website repo, and additional changes to this wiki.
## How was this patch tested?
Jenkins tests.
Author: Sean Owen <sowen@cloudera.com>
Closes#15075 from srowen/SPARK-17445.
## What changes were proposed in this pull request?
Scala's List.length method is O(N) and it makes the gatherCompressibilityStats function O(N^2). Eliminate the List.length calls by writing it in Scala way.
https://github.com/scala/scala/blob/2.10.x/src/library/scala/collection/LinearSeqOptimized.scala#L36
As suggested. Extended the fix to HiveInspectors and AggregationIterator classes as well.
## How was this patch tested?
Profiled a Spark job and found that CompressibleColumnBuilder is using 39% of the CPU. Out of this 39% CompressibleColumnBuilder->gatherCompressibilityStats is using 23% of it. 6.24% of the CPU is spend on List.length which is called inside gatherCompressibilityStats.
After this change we started to save 6.24% of the CPU.
Author: Ergin Seyfe <eseyfe@fb.com>
Closes#15032 from seyfe/gatherCompressibilityStats.
## What changes were proposed in this pull request?
CollectLimit.execute() incorrectly omits per-partition limits, leading to performance regressions in case this case is hit (which should not happen in normal operation, but can occur in some cases (see #15068 for one example).
## How was this patch tested?
Regression test in SQLQuerySuite that asserts the number of records scanned from the input RDD.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#15070 from JoshRosen/SPARK-17515.
## What changes were proposed in this pull request?
When there is any Python UDF in the Project between Sort and Limit, it will be collected into TakeOrderedAndProjectExec, ExtractPythonUDFs failed to pull the Python UDFs out because QueryPlan.expressions does not include the expression inside Option[Seq[Expression]].
Ideally, we should fix the `QueryPlan.expressions`, but tried with no luck (it always run into infinite loop). In PR, I changed the TakeOrderedAndProjectExec to no use Option[Seq[Expression]] to workaround it. cc JoshRosen
## How was this patch tested?
Added regression test.
Author: Davies Liu <davies@databricks.com>
Closes#15030 from davies/all_expr.
## What changes were proposed in this pull request?
This is a trivial patch that catches all `OutOfMemoryError` while building the broadcast hash relation and rethrows it by wrapping it in a nice error message.
## How was this patch tested?
Existing Tests
Author: Sameer Agarwal <sameerag@cs.berkeley.edu>
Closes#14979 from sameeragarwal/broadcast-join-error.
## What changes were proposed in this pull request?
Check the database warehouse used in Spark UT, and remove the existing database file before run the UT (SPARK-8368).
## How was this patch tested?
Run Spark UT with the command for several times:
./build/sbt -Pyarn -Phadoop-2.6 -Phive -Phive-thriftserver "test-only *HiveSparkSubmitSuit*"
Without the patch, the test case can be passed only at the first time, and always failed from the second time.
With the patch the test case always can be passed correctly.
Author: tone-zhang <tone.zhang@linaro.org>
Closes#14894 from tone-zhang/issue1.
## What changes were proposed in this pull request?
This PR fixes `ColumnVectorUtils.populate` so that Parquet vectorized reader can read partitioned table with dates/timestamps. This works fine with Parquet normal reader.
This is being only called within [VectorizedParquetRecordReader.java#L185](https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java#L185).
When partition column types are explicitly given to `DateType` or `TimestampType` (rather than inferring the type of partition column), this fails with the exception below:
```
16/09/01 10:30:07 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 6)
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.sql.Date
at org.apache.spark.sql.execution.vectorized.ColumnVectorUtils.populate(ColumnVectorUtils.java:89)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:185)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:204)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:362)
...
```
## How was this patch tested?
Unit tests in `SQLQuerySuite`.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#14919 from HyukjinKwon/SPARK-17354.
## What changes were proposed in this pull request?
In `PreprocessDDL` we will check if table columns are duplicated. However, this checking ignores case sensitivity config(it's always case-sensitive) and lead to different result between `HiveExternalCatalog` and `InMemoryCatalog`. `HiveExternalCatalog` will throw exception because hive metastore is always case-nonsensitive, and `InMemoryCatalog` is fine.
This PR fixes it.
## How was this patch tested?
a new test in DDLSuite
Author: Wenchen Fan <wenchen@databricks.com>
Closes#14994 from cloud-fan/check-dup.
## What changes were proposed in this pull request?
`select size(null)` returns -1 in Hive. In order to be compatible, we should return `-1`.
## How was this patch tested?
unit test in `CollectionFunctionsSuite` and `DataFrameFunctionsSuite`.
Author: Daoyuan Wang <daoyuan.wang@intel.com>
Closes#14991 from adrian-wang/size.
## What changes were proposed in this pull request?
We should generally use `ArrayBuffer.+=(A)` rather than `ArrayBuffer.append(A)`, because `append(A)` would involve extra boxing / unboxing.
## How was this patch tested?
N/A
Author: Liwei Lin <lwlin7@gmail.com>
Closes#14914 from lw-lin/append_to_plus_eq_v2.
## What changes were proposed in this pull request?
When we create a filestream on a directory that has partitioned subdirs (i.e. dir/x=y/), then ListingFileCatalog.allFiles returns the files in the dir as Seq[String] which internally is a Stream[String]. This is because of this [line](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala#L93), where a LinkedHashSet.values.toSeq returns Stream. Then when the [FileStreamSource](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala#L79) filters this Stream[String] to remove the seen files, it creates a new Stream[String], which has a filter function that has a $outer reference to the FileStreamSource (in Scala 2.10). Trying to serialize this Stream[String] causes NotSerializableException. This will happened even if there is just one file in the dir.
Its important to note that this behavior is different in Scala 2.11. There is no $outer reference to FileStreamSource, so it does not throw NotSerializableException. However, with a large sequence of files (tested with 10000 files), it throws StackOverflowError. This is because how Stream class is implemented. Its basically like a linked list, and attempting to serialize a long Stream requires *recursively* going through linked list, thus resulting in StackOverflowError.
In short, across both Scala 2.10 and 2.11, serialization fails when both the following conditions are true.
- file stream defined on a partitioned directory
- directory has 10k+ files
The right solution is to convert the seq to an array before writing to the log. This PR implements this fix in two ways.
- Changing all uses for HDFSMetadataLog to ensure Array is used instead of Seq
- Added a `require` in HDFSMetadataLog such that it is never used with type Seq
## How was this patch tested?
Added unit test that test that ensures the file stream source can handle with 10000 files. This tests fails in both Scala 2.10 and 2.11 with different failures as indicated above.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#14987 from tdas/SPARK-17372.
## What changes were proposed in this pull request?
In LongToUnsafeRowMap, we use offset of a value as pointer, stored in a array also in the page for chained values. The offset is not portable, because Platform.LONG_ARRAY_OFFSET will be different with different JVM Heap size, then the deserialized LongToUnsafeRowMap will be corrupt.
This PR will change to use portable address (without Platform.LONG_ARRAY_OFFSET).
## How was this patch tested?
Added a test case with random generated keys, to improve the coverage. But this test is not a regression test, that could require a Spark cluster that have at least 32G heap in driver or executor.
Author: Davies Liu <davies@databricks.com>
Closes#14927 from davies/longmap.
## What changes were proposed in this pull request?
This PR adds better error messages for malformed record when reading a JSON file using DataFrameReader.
For example, for query:
```
import org.apache.spark.sql.types._
val corruptRecords = spark.sparkContext.parallelize("""{"a":{, b:3}""" :: Nil)
val schema = StructType(StructField("a", StringType, true) :: Nil)
val jsonDF = spark.read.schema(schema).json(corruptRecords)
```
**Before change:**
We silently replace corrupted line with null
```
scala> jsonDF.show
+----+
| a|
+----+
|null|
+----+
```
**After change:**
Add an explicit warning message:
```
scala> jsonDF.show
16/09/02 14:43:16 WARN JacksonParser: Found at least one malformed records (sample: {"a":{, b:3}). The JSON reader will replace
all malformed records with placeholder null in current PERMISSIVE parser mode.
To find out which corrupted records have been replaced with null, please use the
default inferred schema instead of providing a custom schema.
Code example to print all malformed records (scala):
===================================================
// The corrupted record exists in column _corrupt_record.
val parsedJson = spark.read.json("/path/to/json/file/test.json")
+----+
| a|
+----+
|null|
+----+
```
###
## How was this patch tested?
Unit test.
Author: Sean Zhong <seanzhong@databricks.com>
Closes#14929 from clockfly/logwarning_if_schema_not_contain_corrupted_record.
## What changes were proposed in this pull request?
class `org.apache.spark.sql.types.Metadata` is widely used in mllib to store some ml attributes. `Metadata` is commonly stored in `Alias` expression.
```
case class Alias(child: Expression, name: String)(
val exprId: ExprId = NamedExpression.newExprId,
val qualifier: Option[String] = None,
val explicitMetadata: Option[Metadata] = None,
override val isGenerated: java.lang.Boolean = false)
```
The `Metadata` can take a big memory footprint since the number of attributes is big ( in scale of million). When `toJSON` is called on `Alias` expression, the `Metadata` will also be converted to a big JSON string.
If a plan contains many such kind of `Alias` expressions, it may trigger out of memory error when `toJSON` is called, since converting all `Metadata` references to JSON will take huge memory.
With this PR, we will skip scanning Metadata when doing JSON conversion. For a reproducer of the OOM, and analysis, please look at jira https://issues.apache.org/jira/browse/SPARK-17356.
## How was this patch tested?
Existing tests.
Author: Sean Zhong <seanzhong@databricks.com>
Closes#14915 from clockfly/json_oom.
## What changes were proposed in this pull request?
Using the public `Catalog` API, users can create a file-based data source table, without giving the path options. For this case, currently we can create the table successfully, but fail when we read it. Ideally we should fail during creation.
This is because when we create data source table, we resolve the data source relation without validating path: `resolveRelation(checkPathExist = false)`.
Looking back to why we add this trick(`checkPathExist`), it's because when we call `resolveRelation` for managed table, we add the path to data source options but the path is not created yet. So why we add this not-yet-created path to data source options? This PR fix the problem by adding path to options after we call `resolveRelation`. Then we can remove the `checkPathExist` parameter in `DataSource.resolveRelation` and do some related cleanups.
## How was this patch tested?
existing tests and new test in `CatalogSuite`
Author: Wenchen Fan <wenchen@databricks.com>
Closes#14921 from cloud-fan/check-path.
## What changes were proposed in this pull request?
1. Support generation table-level statistics for
- hive tables in HiveExternalCatalog
- data source tables in HiveExternalCatalog
- data source tables in InMemoryCatalog.
2. Add a property "catalogStats" in CatalogTable to hold statistics in Spark side.
3. Put logics of statistics transformation between Spark and Hive in HiveClientImpl.
4. Extend Statistics class by adding rowCount (will add estimatedSize when we have column stats).
## How was this patch tested?
add unit tests
Author: wangzhenhua <wangzhenhua@huawei.com>
Author: Zhenhua Wang <wangzhenhua@huawei.com>
Closes#14712 from wzhfy/tableStats.
## What changes were proposed in this pull request?
It's really weird that we allow users to specify database in both from table name and to table name
in `ALTER TABLE RENAME TO`, while logically we can't support rename a table to a different database.
Both postgres and MySQL disallow this syntax, it's reasonable to follow them and simply our code.
## How was this patch tested?
new test in `DDLCommandSuite`
Author: Wenchen Fan <wenchen@databricks.com>
Closes#14955 from cloud-fan/rename.
### What changes were proposed in this pull request?
When we trying to read a table and then write to the same table using the `Overwrite` save mode, we got a very confusing error message:
For example,
```Scala
Seq((1, 2)).toDF("i", "j").write.saveAsTable("tab1")
table("tab1").write.mode(SaveMode.Overwrite).saveAsTable("tab1")
```
```
Job aborted.
org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply$mcV$sp
...
Caused by: org.apache.spark.SparkException: Task failed while writing rows
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:266)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
at org.apache.spark.sql.execution.datasources
```
After the PR, we will issue an `AnalysisException`:
```
Cannot overwrite table `tab1` that is also being read from
```
### How was this patch tested?
Added test cases.
Author: gatorsmile <gatorsmile@gmail.com>
Closes#14954 from gatorsmile/ctasQueryAnalyze.
### What changes were proposed in this pull request?
This is another step to get rid of HiveClient from `HiveSessionState`. All the metastore interactions should be through `ExternalCatalog` interface. However, the existing implementation of `InsertIntoHiveTable ` still requires Hive clients. This PR is to remove HiveClient by moving the metastore interactions into `ExternalCatalog`.
### How was this patch tested?
Existing test cases
Author: gatorsmile <gatorsmile@gmail.com>
Closes#14888 from gatorsmile/removeClientFromInsertIntoHiveTable.
## What changes were proposed in this pull request?
Require the use of CROSS join syntax in SQL (and a new crossJoin
DataFrame API) to specify explicit cartesian products between relations.
By cartesian product we mean a join between relations R and S where
there is no join condition involving columns from both R and S.
If a cartesian product is detected in the absence of an explicit CROSS
join, an error must be thrown. Turning on the
"spark.sql.crossJoin.enabled" configuration flag will disable this check
and allow cartesian products without an explicit CROSS join.
The new crossJoin DataFrame API must be used to specify explicit cross
joins. The existing join(DataFrame) method will produce a INNER join
that will require a subsequent join condition.
That is df1.join(df2) is equivalent to select * from df1, df2.
## How was this patch tested?
Added cross-join.sql to the SQLQueryTestSuite to test the check for cartesian products. Added a couple of tests to the DataFrameJoinSuite to test the crossJoin API. Modified various other test suites to explicitly specify a cross join where an INNER join or a comma-separated list was previously used.
Author: Srinath Shankar <srinath@databricks.com>
Closes#14866 from srinathshankar/crossjoin.
## What changes were proposed in this pull request?
This patch fixes a bug in the vectorized parquet reader that's caused by re-using the same dictionary column vector while reading consecutive row groups. Specifically, this issue manifests for a certain distribution of dictionary/plain encoded data while we read/populate the underlying bit packed dictionary data into a column-vector based data structure.
## How was this patch tested?
Manually tested on datasets provided by the community. Thanks to Chris Perluss and Keith Kraus for their invaluable help in tracking down this issue!
Author: Sameer Agarwal <sameerag@cs.berkeley.edu>
Closes#14941 from sameeragarwal/parquet-exception-2.
## What changes were proposed in this pull request?
Some analyzer rules have assumptions on logical plans, optimizer may break these assumption, we should not pass an optimized query plan into QueryExecution (will be analyzed again), otherwise we may some weird bugs.
For example, we have a rule for decimal calculation to promote the precision before binary operations, use PromotePrecision as placeholder to indicate that this rule should not apply twice. But a Optimizer rule will remove this placeholder, that break the assumption, then the rule applied twice, cause wrong result.
Ideally, we should make all the analyzer rules all idempotent, that may require lots of effort to double checking them one by one (may be not easy).
An easier approach could be never feed a optimized plan into Analyzer, this PR fix the case for RunnableComand, they will be optimized, during execution, the passed `query` will also be passed into QueryExecution again. This PR make these `query` not part of the children, so they will not be optimized and analyzed again.
Right now, we did not know a logical plan is optimized or not, we could introduce a flag for that, and make sure a optimized logical plan will not be analyzed again.
## How was this patch tested?
Added regression tests.
Author: Davies Liu <davies@databricks.com>
Closes#14797 from davies/fix_writer.
This patch refactors the internals of the JDBC data source in order to allow some of its code to be re-used in an automated comparison testing harness. Here are the key changes:
- Move the JDBC `ResultSetMetadata` to `StructType` conversion logic from `JDBCRDD.resolveTable()` to the `JdbcUtils` object (as a new `getSchema(ResultSet, JdbcDialect)` method), allowing it to be applied on `ResultSet`s that are created elsewhere.
- Move the `ResultSet` to `InternalRow` conversion methods from `JDBCRDD` to `JdbcUtils`:
- It makes sense to move the `JDBCValueGetter` type and `makeGetter` functions here given that their write-path counterparts (`JDBCValueSetter`) are already in `JdbcUtils`.
- Add an internal `resultSetToSparkInternalRows` method which takes a `ResultSet` and schema and returns an `Iterator[InternalRow]`. This effectively extracts the main loop of `JDBCRDD` into its own method.
- Add a public `resultSetToRows` method to `JdbcUtils`, which wraps the minimal machinery around `resultSetToSparkInternalRows` in order to allow it to be called from outside of a Spark job.
- Make `JdbcDialect.get` into a `DeveloperApi` (`JdbcDialect` itself is already a `DeveloperApi`).
Put together, these changes enable the following testing pattern:
```scala
val jdbResultSet: ResultSet = conn.prepareStatement(query).executeQuery()
val resultSchema: StructType = JdbcUtils.getSchema(jdbResultSet, JdbcDialects.get("jdbc:postgresql"))
val jdbcRows: Seq[Row] = JdbcUtils.resultSetToRows(jdbResultSet, schema).toSeq
checkAnswer(sparkResult, jdbcRows) // in a test case
```
Author: Josh Rosen <joshrosen@databricks.com>
Closes#14907 from JoshRosen/modularize-jdbc-internals.
## What changes were proposed in this pull request?
Try increase number of partitions to try so we don't revert to all.
## How was this patch tested?
Empirically. This is common case optimization.
Author: Robert Kruszewski <robertk@palantir.com>
Closes#14573 from robert3005/robertk/execute-take-backoff.
## What changes were proposed in this pull request?
Adds (Scala-specific) and (Java-specific) to Scaladoc.
## How was this patch tested?
local build
Author: Jacek Laskowski <jacek@japila.pl>
Closes#14891 from jaceklaskowski/scala-specifics.
follow #13137 This pr sets the right number of partitions when reading data from a local collection.
Query 'val df = Seq((1, 2)).toDF("key", "value").count' always use defaultParallelism tasks. So it causes run many empty or small tasks.
Manually tested and checked.
Author: Lianhui Wang <lianhuiwang09@gmail.com>
Closes#13979 from lianhuiwang/localTable-Parallel.
## What changes were proposed in this pull request?
This PR is the second step for the following feature:
For hash aggregation in Spark SQL, we use a fast aggregation hashmap to act as a "cache" in order to boost aggregation performance. Previously, the hashmap is backed by a `ColumnarBatch`. This has performance issues when we have wide schema for the aggregation table (large number of key fields or value fields).
In this JIRA, we support another implementation of fast hashmap, which is backed by a `RowBatch`. We then automatically pick between the two implementations based on certain knobs.
In this second-step PR, we enable `RowBasedHashMapGenerator` in `HashAggregateExec`.
## How was this patch tested?
Added tests: `RowBasedAggregateHashMapSuite` and ` VectorizedAggregateHashMapSuite`
Additional micro-benchmarks tests and TPCDS results will be added in a separate PR in the series.
Author: Qifan Pu <qifan.pu@gmail.com>
Author: ooq <qifan.pu@gmail.com>
Closes#14176 from ooq/rowbasedfastaggmap-pr2.
## What changes were proposed in this pull request?
Attempting to use Spark SQL's JDBC data source against the Hive ThriftServer results in a `java.sql.SQLException: Method` not supported exception from `org.apache.hive.jdbc.HiveResultSetMetaData.isSigned`. Here are two user reports of this issue:
- https://stackoverflow.com/questions/34067686/spark-1-5-1-not-working-with-hive-jdbc-1-2-0
- https://stackoverflow.com/questions/32195946/method-not-supported-in-spark
I have filed [HIVE-14684](https://issues.apache.org/jira/browse/HIVE-14684) to attempt to fix this in Hive by implementing the isSigned method, but in the meantime / for compatibility with older JDBC drivers I think we should add special-case error handling to work around this bug.
This patch updates `JDBCRDD`'s `ResultSetMetadata` to schema conversion to catch the "Method not supported" exception from Hive and return `isSigned = true`. I believe that this is safe because, as far as I know, Hive does not support unsigned numeric types.
## How was this patch tested?
Tested manually against a Spark Thrift Server.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#14911 from JoshRosen/hive-jdbc-workaround.
## What changes were proposed in this pull request?
It seems `EqualNullSafe` filter was missed for batch pruneing partitions in cached tables.
It seems supporting this improves the performance roughly 5 times faster.
Running the codes below:
```scala
test("Null-safe equal comparison") {
val N = 20000000
val df = spark.range(N).repartition(20)
val benchmark = new Benchmark("Null-safe equal comparison", N)
df.createOrReplaceTempView("t")
spark.catalog.cacheTable("t")
sql("select id from t where id <=> 1").collect()
benchmark.addCase("Null-safe equal comparison", 10) { _ =>
sql("select id from t where id <=> 1").collect()
}
benchmark.run()
}
```
produces the results below:
**Before:**
```
Running benchmark: Null-safe equal comparison
Running case: Null-safe equal comparison
Stopped after 10 iterations, 2098 ms
Java HotSpot(TM) 64-Bit Server VM 1.8.0_45-b14 on Mac OS X 10.11.5
Intel(R) Core(TM) i7-4850HQ CPU 2.30GHz
Null-safe equal comparison: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Null-safe equal comparison 204 / 210 98.1 10.2 1.0X
```
**After:**
```
Running benchmark: Null-safe equal comparison
Running case: Null-safe equal comparison
Stopped after 10 iterations, 478 ms
Java HotSpot(TM) 64-Bit Server VM 1.8.0_45-b14 on Mac OS X 10.11.5
Intel(R) Core(TM) i7-4850HQ CPU 2.30GHz
Null-safe equal comparison: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Null-safe equal comparison 42 / 48 474.1 2.1 1.0X
```
## How was this patch tested?
Unit tests in `PartitionBatchPruningSuite`.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#14117 from HyukjinKwon/SPARK-16461.
## What changes were proposed in this pull request?
Avoid allocating some 0-length arrays, esp. in UTF8String, and by using Array.empty in Scala over Array[T]()
## How was this patch tested?
Jenkins
Author: Sean Owen <sowen@cloudera.com>
Closes#14895 from srowen/SPARK-17331.
## What changes were proposed in this pull request?
This PR adds the ability to parse SQL (hexadecimal) binary literals (AKA bit strings). It follows the following syntax `X'[Hexadecimal Characters]+'`, for example: `X'01AB'` would create a binary the following binary array `0x01AB`.
If an uneven number of hexadecimal characters is passed, then the upper 4 bits of the initial byte are kept empty, and the lower 4 bits are filled using the first character. For example `X'1C7'` would create the following binary array `0x01C7`.
Binary data (Array[Byte]) does not have a proper `hashCode` and `equals` functions. This meant that comparing `Literal`s containing binary data was a pain. I have updated Literal.hashCode and Literal.equals to deal properly with binary data.
## How was this patch tested?
Added tests to the `ExpressionParserSuite`, `SQLQueryTestSuite` and `ExpressionSQLBuilderSuite`.
Author: Herman van Hovell <hvanhovell@databricks.com>
Closes#14832 from hvanhovell/SPARK-17263.
## What changes were proposed in this pull request?
This is kind of a follow-up of https://github.com/apache/spark/pull/14482 . As we put `CatalogTable` in the logical plan directly, it makes sense to let physical plans take `CatalogTable` directly, instead of extracting some fields of `CatalogTable` in planner and then construct a new `CatalogTable` in physical plan.
## How was this patch tested?
existing tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#14823 from cloud-fan/create-table.
### What changes were proposed in this pull request?
The existing `CREATE TABLE LIKE` command has multiple issues:
- The generated table is non-empty when the source table is a data source table. The major reason is the data source table is using the table property `path` to store the location of table contents. Currently, we keep it unchanged. Thus, we still create the same table with the same location.
- The table type of the generated table is `EXTERNAL` when the source table is an external Hive Serde table. Currently, we explicitly set it to `MANAGED`, but Hive is checking the table property `EXTERNAL` to decide whether the table is `EXTERNAL` or not. (See https://github.com/apache/hive/blob/master/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java#L1407-L1408) Thus, the created table is still `EXTERNAL`.
- When the source table is a `VIEW`, the metadata of the generated table contains the original view text and view original text. So far, this does not break anything, but it could cause something wrong in Hive. (For example, https://github.com/apache/hive/blob/master/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java#L1405-L1406)
- The issue regarding the table `comment`. To follow what Hive does, the table comment should be cleaned, but the column comments should be still kept.
- The `INDEX` table is not supported. Thus, we should throw an exception in this case.
- `owner` should not be retained. `ToHiveTable` set it [here](e679bc3c1c/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala (L793)) no matter which value we set in `CatalogTable`. We set it to an empty string for avoiding the confusing output in Explain.
- Add a support for temp tables
- Like Hive, we should not copy the table properties from the source table to the created table, especially for the statistics-related properties, which could be wrong in the created table.
- `unsupportedFeatures` should not be copied from the source table. The created table does not have these unsupported features.
- When the type of source table is a view, the target table is using the default format of data source tables: `spark.sql.sources.default`.
This PR is to fix the above issues.
### How was this patch tested?
Improve the test coverage by adding more test cases
Author: gatorsmile <gatorsmile@gmail.com>
Closes#14531 from gatorsmile/createTableLike.
## What changes were proposed in this pull request?
This PR implements aggregation function `percentile_approx`. Function `percentile_approx` returns the approximate percentile(s) of a column at the given percentage(s). A percentile is a watermark value below which a given percentage of the column values fall. For example, the percentile of column `col` at percentage 50% is the median value of column `col`.
### Syntax:
```
# Returns percentile at a given percentage value. The approximation error can be reduced by increasing parameter accuracy, at the cost of memory.
percentile_approx(col, percentage [, accuracy])
# Returns percentile value array at given percentage value array
percentile_approx(col, array(percentage1 [, percentage2]...) [, accuracy])
```
### Features:
1. This function supports partial aggregation.
2. The memory consumption is bounded. The larger `accuracy` parameter we choose, we smaller error we get. The default accuracy value is 10000, to match with Hive default setting. Choose a smaller value for smaller memory footprint.
3. This function supports window function aggregation.
### Example usages:
```
## Returns the 25th percentile value, with default accuracy
SELECT percentile_approx(col, 0.25) FROM table
## Returns an array of percentile value (25th, 50th, 75th), with default accuracy
SELECT percentile_approx(col, array(0.25, 0.5, 0.75)) FROM table
## Returns 25th percentile value, with custom accuracy value 100, larger accuracy parameter yields smaller approximation error
SELECT percentile_approx(col, 0.25, 100) FROM table
## Returns the 25th, and 50th percentile values, with custom accuracy value 100
SELECT percentile_approx(col, array(0.25, 0.5), 100) FROM table
```
### NOTE:
1. The `percentile_approx` implementation is different from Hive, so the result returned on same query maybe slightly different with Hive. This implementation uses `QuantileSummaries` as the underlying probabilistic data structure, and mainly follows paper `Space-efficient Online Computation of Quantile Summaries` by Greenwald, Michael and Khanna, Sanjeev. (http://dx.doi.org/10.1145/375663.375670)`
2. The current implementation of `QuantileSummaries` doesn't support automatic compression. This PR has a rule to do compression automatically at the caller side, but it may not be optimal.
## How was this patch tested?
Unit test, and Sql query test.
## Acknowledgement
1. This PR's work in based on lw-lin's PR https://github.com/apache/spark/pull/14298, with improvements like supporting partial aggregation, fixing out of memory issue.
Author: Sean Zhong <seanzhong@databricks.com>
Closes#14868 from clockfly/appro_percentile_try_2.
## What changes were proposed in this pull request?
according to the discussion in the original PR #10896 and the new approach PR #14876 , we decided to revert these 2 PRs and go with the new approach.
## How was this patch tested?
N/A
Author: Wenchen Fan <wenchen@databricks.com>
Closes#14909 from cloud-fan/revert.
## What changes were proposed in this pull request?
Currently we use `CreateViewCommand` to implement ALTER VIEW AS, which has 3 bugs:
1. SPARK-17180: ALTER VIEW AS should alter temp view if view name has no database part and temp view exists
2. SPARK-17309: ALTER VIEW AS should issue exception if view does not exist.
3. SPARK-17323: ALTER VIEW AS should keep the previous table properties, comment, create_time, etc.
The root cause is, ALTER VIEW AS is quite different from CREATE VIEW, we need different code path to handle them. However, in `CreateViewCommand`, there is no way to distinguish ALTER VIEW AS and CREATE VIEW, we have to introduce extra flag. But instead of doing this, I think a more natural way is to separate the ALTER VIEW AS logic into a new command.
## How was this patch tested?
new tests in SQLViewSuite
Author: Wenchen Fan <wenchen@databricks.com>
Closes#14874 from cloud-fan/minor4.
## What changes were proposed in this pull request?
Clean up unused variables and unused import statements, unnecessary `return` and `toArray`, and some more style improvement, when I walk through the code examples.
## How was this patch tested?
Testet manually on local laptop.
Author: Xin Ren <iamshrek@126.com>
Closes#14836 from keypointt/codeWalkThroughML.
## What changes were proposed in this pull request?
Clarify that only parquet files are supported by DataStreamWriter now
## How was this patch tested?
(Doc build -- no functional changes to test)
Author: Sean Owen <sowen@cloudera.com>
Closes#14860 from srowen/SPARK-17264.
## What changes were proposed in this pull request?
Partial aggregations are generated in `EnsureRequirements`, but the planner fails to
check if partial aggregation satisfies sort requirements.
For the following query:
```
val df2 = (0 to 1000).map(x => (x % 2, x.toString)).toDF("a", "b").createOrReplaceTempView("t2")
spark.sql("select max(b) from t2 group by a").explain(true)
```
Now, the SortAggregator won't insert Sort operator before partial aggregation, this will break sort-based partial aggregation.
```
== Physical Plan ==
SortAggregate(key=[a#5], functions=[max(b#6)], output=[max(b)#17])
+- *Sort [a#5 ASC], false, 0
+- Exchange hashpartitioning(a#5, 200)
+- SortAggregate(key=[a#5], functions=[partial_max(b#6)], output=[a#5, max#19])
+- LocalTableScan [a#5, b#6]
```
Actually, a correct plan is:
```
== Physical Plan ==
SortAggregate(key=[a#5], functions=[max(b#6)], output=[max(b)#17])
+- *Sort [a#5 ASC], false, 0
+- Exchange hashpartitioning(a#5, 200)
+- SortAggregate(key=[a#5], functions=[partial_max(b#6)], output=[a#5, max#19])
+- *Sort [a#5 ASC], false, 0
+- LocalTableScan [a#5, b#6]
```
## How was this patch tested?
Added tests in `PlannerSuite`.
Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>
Closes#14865 from maropu/SPARK-17289.
## What changes were proposed in this pull request?
This PR split the the single `createPartitions()` call into smaller batches, which could prevent Hive metastore from OOM (caused by millions of partitions).
It will also try to gather all the fast stats (number of files and total size of all files) in parallel to avoid the bottle neck of listing the files in metastore sequential, which is controlled by spark.sql.gatherFastStats (enabled by default).
## How was this patch tested?
Tested locally with 10000 partitions and 100 files with embedded metastore, without gathering fast stats in parallel, adding partitions took 153 seconds, after enable that, gathering the fast stats took about 34 seconds, adding these partitions took 25 seconds (most of the time spent in object store), 59 seconds in total, 2.5X faster (with larger cluster, gathering will much faster).
Author: Davies Liu <davies@databricks.com>
Closes#14607 from davies/repair_batch.
## What changes were proposed in this pull request?
Jira : https://issues.apache.org/jira/browse/SPARK-17271
Planner is adding un-needed SORT operation due to bug in the way comparison for `SortOrder` is done at https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala#L253
`SortOrder` needs to be compared semantically because `Expression` within two `SortOrder` can be "semantically equal" but not literally equal objects.
eg. In case of `sql("SELECT * FROM table1 a JOIN table2 b ON a.col1=b.col1")`
Expression in required SortOrder:
```
AttributeReference(
name = "col1",
dataType = LongType,
nullable = false
) (exprId = exprId,
qualifier = Some("a")
)
```
Expression in child SortOrder:
```
AttributeReference(
name = "col1",
dataType = LongType,
nullable = false
) (exprId = exprId)
```
Notice that the output column has a qualifier but the child attribute does not but the inherent expression is the same and hence in this case we can say that the child satisfies the required sort order.
This PR includes following changes:
- Added a `semanticEquals` method to `SortOrder` so that it can compare underlying child expressions semantically (and not using default Object.equals)
- Fixed `EnsureRequirements` to use semantic comparison of SortOrder
## How was this patch tested?
- Added a test case to `PlannerSuite`. Ran rest tests in `PlannerSuite`
Author: Tejas Patil <tejasp@fb.com>
Closes#14841 from tejasapatil/SPARK-17271_sort_order_equals_bug.
## What changes were proposed in this pull request?
This pr to fix a bug below in sampling with replacement
```
val df = Seq((1, 0), (2, 0), (3, 0)).toDF("a", "b")
df.sample(true, 2.0).withColumn("c", monotonically_increasing_id).select($"c").show
+---+
| c|
+---+
| 0|
| 1|
| 1|
| 1|
| 2|
+---+
```
## How was this patch tested?
Added a test in `DataFrameSuite`.
Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>
Closes#14800 from maropu/FixSampleBug.
## What changes were proposed in this pull request?
This patch adds a purge interface to MetadataLog, and an implementation in HDFSMetadataLog. The purge function is currently unused, but I will use it to purge old execution and file source logs in follow-up patches. These changes are required in a production structured streaming job that runs for a long period of time.
## How was this patch tested?
Added a unit test case in HDFSMetadataLogSuite.
Author: petermaxlee <petermaxlee@gmail.com>
Closes#14802 from petermaxlee/SPARK-17235.
## What changes were proposed in this pull request?
This PR adds parser support for `BigDecimal` literals. If you append the suffix `BD` to a valid number then this will be interpreted as a `BigDecimal`, for example `12.0E10BD` will interpreted into a BigDecimal with scale -9 and precision 3. This is useful in situations where you need exact values.
## How was this patch tested?
Added tests to `ExpressionParserSuite`, `ExpressionSQLBuilderSuite` and `SQLQueryTestSuite`.
Author: Herman van Hovell <hvanhovell@databricks.com>
Closes#14819 from hvanhovell/SPARK-17246.
## What changes were proposed in this pull request?
Before this change, FileStreamSource uses an in-memory hash set to track the list of files processed by the engine. The list can grow indefinitely, leading to OOM or overflow of the hash set.
This patch introduces a new user-defined option called "maxFileAge", default to 24 hours. If a file is older than this age, FileStreamSource will purge it from the in-memory map that was used to track the list of files that have been processed.
## How was this patch tested?
Added unit tests for the underlying utility, and also added an end-to-end test to validate the purge in FileStreamSourceSuite. Also verified the new test cases would fail when the timeout was set to a very large number.
Author: petermaxlee <petermaxlee@gmail.com>
Closes#14728 from petermaxlee/SPARK-17165.
### What changes were proposed in this pull request?
Address the comments by yhuai in the original PR: https://github.com/apache/spark/pull/14207
First, issue an exception instead of logging a warning when users specify the partitioning columns without a given schema.
Second, refactor the codes a little.
### How was this patch tested?
Fixed the test cases.
Author: gatorsmile <gatorsmile@gmail.com>
Closes#14572 from gatorsmile/followup16552.
## What changes were proposed in this pull request?
This PR enables the tests for `TimestampType` for JSON and unifies the logics for verifying schema when writing in CSV.
In more details, this PR,
- Enables the tests for `TimestampType` for JSON and
This was disabled due to an issue in `DatatypeConverter.parseDateTime` which parses dates incorrectly, for example as below:
```scala
val d = javax.xml.bind.DatatypeConverter.parseDateTime("0900-01-01T00:00:00.000").getTime
println(d.toString)
```
```
Fri Dec 28 00:00:00 KST 899
```
However, since we use `FastDateFormat`, it seems we are safe now.
```scala
val d = FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSS").parse("0900-01-01T00:00:00.000")
println(d)
```
```
Tue Jan 01 00:00:00 PST 900
```
- Verifies all unsupported types in CSV
There is a separate logics to verify the schemas in `CSVFileFormat`. This is actually not quite correct enough because we don't support `NullType` and `CalanderIntervalType` as well `StructType`, `ArrayType`, `MapType`. So, this PR adds both types.
## How was this patch tested?
Tests in `JsonHadoopFsRelation` and `CSVSuite`
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#14829 from HyukjinKwon/SPARK-16216-followup.
## What changes were proposed in this pull request?
This PR introduces an abstract class `TypedImperativeAggregate` so that an aggregation function of TypedImperativeAggregate can use **arbitrary** user-defined Java object as intermediate aggregation buffer object.
**This has advantages like:**
1. It now can support larger category of aggregation functions. For example, it will be much easier to implement aggregation function `percentile_approx`, which has a complex aggregation buffer definition.
2. It can be used to avoid doing serialization/de-serialization for every call of `update` or `merge` when converting domain specific aggregation object to internal Spark-Sql storage format.
3. It is easier to integrate with other existing monoid libraries like algebird, and supports more aggregation functions with high performance.
Please see `org.apache.spark.sql.TypedImperativeAggregateSuite.TypedMaxAggregate` to find an example of how to defined a `TypedImperativeAggregate` aggregation function.
Please see Java doc of `TypedImperativeAggregate` and Jira ticket SPARK-17187 for more information.
## How was this patch tested?
Unit tests.
Author: Sean Zhong <seanzhong@databricks.com>
Author: Yin Huai <yhuai@databricks.com>
Closes#14753 from clockfly/object_aggregation_buffer_try_2.
## What changes were proposed in this pull request?
When reading float4 and smallint columns from PostgreSQL, Spark's `PostgresDialect` widens these types to Decimal and Integer rather than using the narrower Float and Short types. According to https://www.postgresql.org/docs/7.1/static/datatype.html#DATATYPE-TABLE, Postgres maps the `smallint` type to a signed two-byte integer and the `real` / `float4` types to single precision floating point numbers.
This patch fixes this by adding more special-cases to `getCatalystType`, similar to what was done for the Derby JDBC dialect. I also fixed a similar problem in the write path which causes Spark to create integer columns in Postgres for what should have been ShortType columns.
## How was this patch tested?
New test cases in `PostgresIntegrationSuite` (which I ran manually because Jenkins can't run it right now).
Author: Josh Rosen <joshrosen@databricks.com>
Closes#14796 from JoshRosen/postgres-jdbc-type-fixes.
### What changes were proposed in this pull request?
This PR is to fix an incorrect outer join elimination when filter's `isNotNull` constraints is unable to filter out all null-supplying rows. For example, `isnotnull(coalesce(b#227, c#238))`.
Users can hit this error when they try to use `using/natural outer join`, which is converted to a normal outer join with a `coalesce` expression on the `using columns`. For example,
```Scala
val a = Seq((1, 2), (2, 3)).toDF("a", "b")
val b = Seq((2, 5), (3, 4)).toDF("a", "c")
val c = Seq((3, 1)).toDF("a", "d")
val ab = a.join(b, Seq("a"), "fullouter")
ab.join(c, "a").explain(true)
```
The dataframe `ab` is doing `using full-outer join`, which is converted to a normal outer join with a `coalesce` expression. Constraints inference generates a `Filter` with constraints `isnotnull(coalesce(b#227, c#238))`. Then, it triggers a wrong outer join elimination and generates a wrong result.
```
Project [a#251, b#227, c#237, d#247]
+- Join Inner, (a#251 = a#246)
:- Project [coalesce(a#226, a#236) AS a#251, b#227, c#237]
: +- Join FullOuter, (a#226 = a#236)
: :- Project [_1#223 AS a#226, _2#224 AS b#227]
: : +- LocalRelation [_1#223, _2#224]
: +- Project [_1#233 AS a#236, _2#234 AS c#237]
: +- LocalRelation [_1#233, _2#234]
+- Project [_1#243 AS a#246, _2#244 AS d#247]
+- LocalRelation [_1#243, _2#244]
== Optimized Logical Plan ==
Project [a#251, b#227, c#237, d#247]
+- Join Inner, (a#251 = a#246)
:- Project [coalesce(a#226, a#236) AS a#251, b#227, c#237]
: +- Filter isnotnull(coalesce(a#226, a#236))
: +- Join FullOuter, (a#226 = a#236)
: :- LocalRelation [a#226, b#227]
: +- LocalRelation [a#236, c#237]
+- LocalRelation [a#246, d#247]
```
**A note to the `Committer`**, please also give the credit to dongjoon-hyun who submitted another PR for fixing this issue. https://github.com/apache/spark/pull/14580
### How was this patch tested?
Added test cases
Author: gatorsmile <gatorsmile@gmail.com>
Closes#14661 from gatorsmile/fixOuterJoinElimination.
## What changes were proposed in this pull request?
Method `SQLContext.parseDataType(dataTypeString: String)` could be removed, we should use `SparkSession.parseDataType(dataTypeString: String)` instead.
This require updating PySpark.
## How was this patch tested?
Existing test cases.
Author: jiangxingbo <jiangxb1987@gmail.com>
Closes#14790 from jiangxb1987/parseDataType.
### What changes were proposed in this pull request?
Since `HiveClient` is used to interact with the Hive metastore, it should be hidden in `HiveExternalCatalog`. After moving `HiveClient` into `HiveExternalCatalog`, `HiveSharedState` becomes a wrapper of `HiveExternalCatalog`. Thus, removal of `HiveSharedState` becomes straightforward. After removal of `HiveSharedState`, the reflection logic is directly applied on the choice of `ExternalCatalog` types, based on the configuration of `CATALOG_IMPLEMENTATION`.
~~`HiveClient` is also used/invoked by the other entities besides HiveExternalCatalog, we defines the following two APIs: getClient and getNewClient~~
### How was this patch tested?
The existing test cases
Author: gatorsmile <gatorsmile@gmail.com>
Closes#14757 from gatorsmile/removeHiveClient.
## What changes were proposed in this pull request?
### Default - ISO 8601
Currently, CSV datasource is writing `Timestamp` and `Date` as numeric form and JSON datasource is writing both as below:
- CSV
```
// TimestampType
1414459800000000
// DateType
16673
```
- Json
```
// TimestampType
1970-01-01 11:46:40.0
// DateType
1970-01-01
```
So, for CSV we can't read back what we write and for JSON it becomes ambiguous because the timezone is being missed.
So, this PR make both **write** `Timestamp` and `Date` in ISO 8601 formatted string (please refer the [ISO 8601 specification](https://www.w3.org/TR/NOTE-datetime)).
- For `Timestamp` it becomes as below: (`yyyy-MM-dd'T'HH:mm:ss.SSSZZ`)
```
1970-01-01T02:00:01.000-01:00
```
- For `Date` it becomes as below (`yyyy-MM-dd`)
```
1970-01-01
```
### Custom date format option - `dateFormat`
This PR also adds the support to write and read dates and timestamps in a formatted string as below:
- **DateType**
- With `dateFormat` option (e.g. `yyyy/MM/dd`)
```
+----------+
| date|
+----------+
|2015/08/26|
|2014/10/27|
|2016/01/28|
+----------+
```
### Custom date format option - `timestampFormat`
- **TimestampType**
- With `dateFormat` option (e.g. `dd/MM/yyyy HH:mm`)
```
+----------------+
| date|
+----------------+
|2015/08/26 18:00|
|2014/10/27 18:30|
|2016/01/28 20:00|
+----------------+
```
## How was this patch tested?
Unit tests were added in `CSVSuite` and `JsonSuite`. For JSON, existing tests cover the default cases.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#14279 from HyukjinKwon/SPARK-16216-json-csv.
## What changes were proposed in this pull request?
Actually Spark SQL doesn't support index, the catalog table type `INDEX` is from Hive. However, most operations in Spark SQL can't handle index table, e.g. create table, alter table, etc.
Logically index table should be invisible to end users, and Hive also generates special table name for index table to avoid users accessing it directly. Hive has special SQL syntax to create/show/drop index tables.
At Spark SQL side, although we can describe index table directly, but the result is unreadable, we should use the dedicated SQL syntax to do it(e.g. `SHOW INDEX ON tbl`). Spark SQL can also read index table directly, but the result is always empty.(Can hive read index table directly?)
This PR remove the table type `INDEX`, to make it clear that Spark SQL doesn't support index currently.
## How was this patch tested?
existing tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#14752 from cloud-fan/minor2.
## What changes were proposed in this pull request?
Some JDBC driver (for example PostgreSQL) does not use the underlying exception as cause, but have another APIs (getNextException) to access that, so it it's included in the error logging, making us hard to find the root cause, especially in batch mode.
This PR will pull out the next exception and add it as cause (if it's different) or suppressed (if there is another different cause).
## How was this patch tested?
Can't reproduce this on the default JDBC driver, so did not add a regression test.
Author: Davies Liu <davies@databricks.com>
Closes#14722 from davies/keep_cause.
## What changes were proposed in this pull request?
Use `CatalystConf.resolver` consistently for case-sensitivity comparison (removed dups).
## How was this patch tested?
Local build. Waiting for Jenkins to ensure clean build and test.
Author: Jacek Laskowski <jacek@japila.pl>
Closes#14771 from jaceklaskowski/17199-catalystconf-resolver.
## What changes were proposed in this pull request?
This is a sub-task of [SPARK-16283](https://issues.apache.org/jira/browse/SPARK-16283) (Implement percentile_approx SQL function), which moves class QuantileSummaries to project catalyst so that it can be reused when implementing aggregation function `percentile_approx`.
## How was this patch tested?
This PR only does class relocation, class implementation is not changed.
Author: Sean Zhong <seanzhong@databricks.com>
Closes#14754 from clockfly/move_QuantileSummaries_to_catalyst.
## What changes were proposed in this pull request?
`CreateHiveTableAsSelectLogicalPlan` is a dead code after refactoring.
## How was this patch tested?
N/A
Author: gatorsmile <gatorsmile@gmail.com>
Closes#14707 from gatorsmile/removeCreateHiveTable.
## What changes were proposed in this pull request?
The range operator previously didn't support SQL generation, which made it not possible to use in views.
## How was this patch tested?
Unit tests.
cc hvanhovell
Author: Eric Liang <ekl@databricks.com>
Closes#14724 from ericl/spark-17162.
## What changes were proposed in this pull request?
Fix some typos in comments and test hints
## How was this patch tested?
N/A.
Author: Sean Zhong <seanzhong@databricks.com>
Closes#14755 from clockfly/fix_minor_typo.
## What changes were proposed in this pull request?
In 2.0, we change the threshold of splitting expressions from 16K to 64K, which cause very bad performance on wide table, because the generated method can't be JIT compiled by default (above the limit of 8K bytecode).
This PR will decrease it to 1K, based on the benchmark results for a wide table with 400 columns of LongType.
It also fix a bug around splitting expression in whole-stage codegen (it should not split them).
## How was this patch tested?
Added benchmark suite.
Author: Davies Liu <davies@databricks.com>
Closes#14692 from davies/split_exprs.
## What changes were proposed in this pull request?
Spark SQL doesn't have its own meta store yet, and use hive's currently. However, hive's meta store has some limitations(e.g. columns can't be too many, not case-preserving, bad decimal type support, etc.), so we have some hacks to successfully store data source table metadata into hive meta store, i.e. put all the information in table properties.
This PR moves these hacks to `HiveExternalCatalog`, tries to isolate hive specific logic in one place.
changes overview:
1. **before this PR**: we need to put metadata(schema, partition columns, etc.) of data source tables to table properties before saving it to external catalog, even the external catalog doesn't use hive metastore(e.g. `InMemoryCatalog`)
**after this PR**: the table properties tricks are only in `HiveExternalCatalog`, the caller side doesn't need to take care of it anymore.
2. **before this PR**: because the table properties tricks are done outside of external catalog, so we also need to revert these tricks when we read the table metadata from external catalog and use it. e.g. in `DescribeTableCommand` we will read schema and partition columns from table properties.
**after this PR**: The table metadata read from external catalog is exactly the same with what we saved to it.
bonus: now we can create data source table using `SessionCatalog`, if schema is specified.
breaks: `schemaStringLengthThreshold` is not configurable anymore. `hive.default.rcfile.serde` is not configurable anymore.
## How was this patch tested?
existing tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#14155 from cloud-fan/catalog-table.
## What changes were proposed in this pull request?
Currently, `NullPropagation` optimizer replaces `COUNT` on null literals in a bottom-up fashion. During that, `WindowExpression` is not covered properly. This PR adds the missing propagation logic.
**Before**
```scala
scala> sql("SELECT COUNT(1 + NULL) OVER ()").show
java.lang.UnsupportedOperationException: Cannot evaluate expression: cast(0 as bigint) windowspecdefinition(ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
```
**After**
```scala
scala> sql("SELECT COUNT(1 + NULL) OVER ()").show
+----------------------------------------------------------------------------------------------+
|count((1 + CAST(NULL AS INT))) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)|
+----------------------------------------------------------------------------------------------+
| 0|
+----------------------------------------------------------------------------------------------+
```
## How was this patch tested?
Pass the Jenkins test with a new test case.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#14689 from dongjoon-hyun/SPARK-17098.
## What changes were proposed in this pull request?
This patch fixes a longstanding issue with one of the RelationalGroupedDataset.agg function. Even though the signature accepts vararg of pairs, the underlying implementation turns the seq into a map, and thus not order preserving nor allowing multiple aggregates per column.
This change also allows users to use this function to run multiple different aggregations for a single column, e.g.
```
agg("age" -> "max", "age" -> "count")
```
## How was this patch tested?
Added a test case in DataFrameAggregateSuite.
Author: petermaxlee <petermaxlee@gmail.com>
Closes#14697 from petermaxlee/SPARK-17124.
## What changes were proposed in this pull request?
Currently `LogicalRelation.newInstance()` simply creates another `LogicalRelation` object with the same parameters. However, the `newInstance()` method inherited from `MultiInstanceRelation` should return a copy of object with unique expression ids. Current `LogicalRelation.newInstance()` can cause failure when doing self-join.
## How was this patch tested?
Jenkins tests.
Author: Liang-Chi Hsieh <simonh@tw.ibm.com>
Closes#14682 from viirya/fix-localrelation.
## What changes were proposed in this pull request?
This patch adds support for SQL generation for inline tables. With this, it would be possible to create a view that depends on inline tables.
## How was this patch tested?
Added a test case in LogicalPlanToSQLSuite.
Author: petermaxlee <petermaxlee@gmail.com>
Closes#14709 from petermaxlee/SPARK-17150.
## What changes were proposed in this pull request?
Modifies error message for numeric literals to
Numeric literal <literal> does not fit in range [min, max] for type <T>
## How was this patch tested?
Fixed up the error messages for literals.sql in SqlQueryTestSuite and re-ran via sbt. Also fixed up error messages in ExpressionParserSuite
Author: Srinath Shankar <srinath@databricks.com>
Closes#14721 from srinathshankar/sc4296.
## What changes were proposed in this pull request?
This patch creates array.sql in SQLQueryTestSuite for testing array related functions, including:
- indexing
- array creation
- size
- array_contains
- sort_array
## How was this patch tested?
The patch itself is about adding tests.
Author: petermaxlee <petermaxlee@gmail.com>
Closes#14708 from petermaxlee/SPARK-17149.
## What changes were proposed in this pull request?
This patch changes predicate pushdown optimization rule (PushDownPredicate) from using a blacklist to a whitelist. That is to say, operators must be explicitly allowed. This approach is more future-proof: previously it was possible for us to introduce a new operator and then render the optimization rule incorrect.
This also fixes the bug that previously we allowed pushing filter beneath limit, which was incorrect. That is to say, before this patch, the optimizer would rewrite
```
select * from (select * from range(10) limit 5) where id > 3
to
select * from range(10) where id > 3 limit 5
```
## How was this patch tested?
- a unit test case in FilterPushdownSuite
- an end-to-end test in limit.sql
Author: Reynold Xin <rxin@databricks.com>
Closes#14713 from rxin/SPARK-16994.
## What changes were proposed in this pull request?
This patch improves inline table support with the following:
1. Support type coercion.
2. Support using foldable expressions. Previously only literals were supported.
3. Improve error message handling.
4. Improve test coverage.
## How was this patch tested?
Added a new unit test suite ResolveInlineTablesSuite and a new file-based end-to-end test inline-table.sql.
Author: petermaxlee <petermaxlee@gmail.com>
Closes#14676 from petermaxlee/SPARK-16947.
## What changes were proposed in this pull request?
This patch fixes the problem described in SPARK-17117, i.e. "SELECT 1 / NULL" throws an analysis exception:
```
org.apache.spark.sql.AnalysisException: cannot resolve '(1 / NULL)' due to data type mismatch: differing types in '(1 / NULL)' (int and null).
```
The problem is that division type coercion did not take null type into account.
## How was this patch tested?
A unit test for the type coercion, and a few end-to-end test cases using SQLQueryTestSuite.
Author: petermaxlee <petermaxlee@gmail.com>
Closes#14695 from petermaxlee/SPARK-17117.
## What changes were proposed in this pull request?
This adds analyzer rules for resolving table-valued functions, and adds one builtin implementation for range(). The arguments for range() are the same as those of `spark.range()`.
## How was this patch tested?
Unit tests.
cc hvanhovell
Author: Eric Liang <ekl@databricks.com>
Closes#14656 from ericl/sc-4309.
## What changes were proposed in this pull request?
This patch introduces a new private ReduceAggregator interface that is a subclass of Aggregator. ReduceAggregator only requires a single associative and commutative reduce function. ReduceAggregator is also used to implement KeyValueGroupedDataset.reduceGroups in order to support partial aggregation.
Note that the pull request was initially done by viirya.
## How was this patch tested?
Covered by original tests for reduceGroups, as well as a new test suite for ReduceAggregator.
Author: Reynold Xin <rxin@databricks.com>
Author: Liang-Chi Hsieh <simonh@tw.ibm.com>
Closes#14576 from rxin/reduceAggregator.
## What changes were proposed in this pull request?
A TreeNodeException is thrown when executing the following minimal example in Spark 2.0.
import spark.implicits._
case class test (x: Int, q: Int)
val d = Seq(1).toDF("x")
d.withColumn("q", lit(0)).as[test].groupByKey(_.x).flatMapGroups{case (x, iter) => List[Int]()}.show
d.withColumn("q", expr("0")).as[test].groupByKey(_.x).flatMapGroups{case (x, iter) => List[Int]()}.show
The problem is at `FoldablePropagation`. The rule will do `transformExpressions` on `LogicalPlan`. The query above contains a `MapGroups` which has a parameter `dataAttributes:Seq[Attribute]`. One attributes in `dataAttributes` will be transformed to an `Alias(literal(0), _)` in `FoldablePropagation`. `Alias` is not an `Attribute` and causes the error.
We can't easily detect such type inconsistency during transforming expressions. A direct approach to this problem is to skip doing `FoldablePropagation` on object operators as they should not contain such expressions.
## How was this patch tested?
Jenkins tests.
Author: Liang-Chi Hsieh <simonh@tw.ibm.com>
Closes#14648 from viirya/flat-mapping.