Current Spark SQL parser can have pretty confusing error messages when parsing an incorrect SELECT SQL statement. The proposed fix has the following effect.
BEFORE:
```
spark-sql> SELECT * FROM test WHERE x NOT NULL;
Error in query:
mismatched input 'FROM' expecting {<EOF>, 'CLUSTER', 'DISTRIBUTE', 'EXCEPT', 'GROUP', 'HAVING', 'INTERSECT', 'LATERAL', 'LIMIT', 'ORDER', 'MINUS', 'SORT', 'UNION', 'WHERE', 'WINDOW'}(line 1, pos 9)
== SQL ==
SELECT * FROM test WHERE x NOT NULL
---------^^^
```
where in fact the error message should be hinted to be near `NOT NULL`.
AFTER:
```
spark-sql> SELECT * FROM test WHERE x NOT NULL;
Error in query:
mismatched input 'NOT' expecting {<EOF>, 'AND', 'CLUSTER', 'DISTRIBUTE', 'EXCEPT', 'GROUP', 'HAVING', 'INTERSECT', 'LIMIT', 'OR', 'ORDER', 'MINUS', 'SORT', 'UNION', 'WINDOW'}(line 1, pos 27)
== SQL ==
SELECT * FROM test WHERE x NOT NULL
---------------------------^^^
```
In fact, this problem is brought by some problematic Spark SQL grammar. There are two kinds of SELECT statements that are supported by Hive (and thereby supported in SparkSQL):
* `FROM table SELECT blahblah SELECT blahblah`
* `SELECT blah FROM table`
*Reference* [HiveQL single-from stmt grammar](https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g)
It is fine when these two SELECT syntaxes are supported separately. However, since we are currently supporting these two kinds of syntaxes in a single ANTLR rule, this can be problematic and therefore leading to confusing parser errors. This is because when a SELECT clause was parsed, it can't tell whether the following FROM clause actually belongs to it or is just the beginning of a new `FROM table SELECT *` statement.
## What changes were proposed in this pull request?
1. Modify ANTLR grammar to fix the above-mentioned problem. This fix is important because the previous problematic grammar does affect a lot of real-world queries. Due to the previous problematic and messy grammar, we refactored the grammar related to `querySpecification`.
2. Modify `AstBuilder` to have separate visitors for `SELECT ... FROM ...` and `FROM ... SELECT ...` statements.
3. Drop the `FROM table` statement, which is supported by accident and is actually parsed in the wrong code path. Both Hive and Presto do not support this syntax.
## How was this patch tested?
Existing UTs and new UTs.
Closes#24809 from yeshengm/parser-refactor.
Authored-by: Yesheng Ma <kimi.ysma@gmail.com>
Signed-off-by: Xingbo Jiang <xingbo.jiang@databricks.com>
## What changes were proposed in this pull request?
This PR is the same fix as https://github.com/apache/spark/pull/24816 but in vectorized `dapply` in SparkR.
## How was this patch tested?
Manually tested.
Closes#24818 from HyukjinKwon/SPARK-27971.
Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
The newly added `Refresh` method in PR #24401 prevented the work of moving DataSourceV2Relation into catalyst. It calls `case table: FileTable => table.fileIndex.refresh()` while `FileTable` belongs to sql/core.
More importantly, Ryan Blue pointed out DataSourceV2Relation is immutable by design, it should not have refresh method.
## How was this patch tested?
Unit test
Closes#24815 from gengliangwang/removeRefreshTable.
Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
Flush batch timely for pandas UDF.
This could improve performance when multiple pandas UDF plans are pipelined.
When batch being flushed in time, downstream pandas UDFs will get pipelined as soon as possible, and pipeline will help hide the donwstream UDFs computation time. For example:
When the first UDF start computing on batch-3, the second pipelined UDF can start computing on batch-2, and the third pipelined UDF can start computing on batch-1.
If we do not flush each batch in time, the donwstream UDF's pipeline will lag behind too much, which may increase the total processing time.
I add flush at two places:
* JVM process feed data into python worker. In jvm side, when write one batch, flush it
* VM process read data from python worker output, In python worker side, when write one batch, flush it
If no flush, the default buffer size for them are both 65536. Especially in the ML case, in order to make realtime prediction, we will make batch size very small. The buffer size is too large for the case, which cause downstream pandas UDF pipeline lag behind too much.
### Note
* This is only applied to pandas scalar UDF.
* Do not flush for each batch. The minimum interval between two flush is 0.1 second. This avoid too frequent flushing when batch size is small. It works like:
```
last_flush_time = time.time()
for batch in iterator:
writer.write_batch(batch)
flush_time = time.time()
if self.flush_timely and (flush_time - last_flush_time > 0.1):
stream.flush()
last_flush_time = flush_time
```
## How was this patch tested?
### Benchmark to make sure the flush do not cause performance regression
#### Test code:
```
numRows = ...
batchSize = ...
spark.conf.set('spark.sql.execution.arrow.maxRecordsPerBatch', str(batchSize))
df = spark.range(1, numRows + 1, numPartitions=1).select(col('id').alias('a'))
pandas_udf("int", PandasUDFType.SCALAR)
def fp1(x):
return x + 10
beg_time = time.time()
result = df.select(sum(fp1('a'))).head()
print("result: " + str(result[0]))
print("consume time: " + str(time.time() - beg_time))
```
#### Test Result:
params | Consume time (Before) | Consume time (After)
------------ | ----------------------- | ----------------------
numRows=100000000, batchSize=10000 | 23.43s | 24.64s
numRows=100000000, batchSize=1000 | 36.73s | 34.50s
numRows=10000000, batchSize=100 | 35.67s | 32.64s
numRows=1000000, batchSize=10 | 33.60s | 32.11s
numRows=100000, batchSize=1 | 33.36s | 31.82s
### Benchmark pipelined pandas UDF
#### Test code:
```
spark.conf.set('spark.sql.execution.arrow.maxRecordsPerBatch', '1')
df = spark.range(1, 31, numPartitions=1).select(col('id').alias('a'))
pandas_udf("int", PandasUDFType.SCALAR)
def fp1(x):
print("run fp1")
time.sleep(1)
return x + 100
pandas_udf("int", PandasUDFType.SCALAR)
def fp2(x, y):
print("run fp2")
time.sleep(1)
return x + y
beg_time = time.time()
result = df.select(sum(fp2(fp1('a'), col('a')))).head()
print("result: " + str(result[0]))
print("consume time: " + str(time.time() - beg_time))
```
#### Test Result:
**Before**: consume time: 63.57s
**After**: consume time: 32.43s
**So the PR improve performance by make downstream UDF get pipelined early.**
Please review https://spark.apache.org/contributing.html before opening a pull request.
Closes#24734 from WeichenXu123/improve_pandas_udf_pipeline.
Lead-authored-by: WeichenXu <weichen.xu@databricks.com>
Co-authored-by: Xiangrui Meng <meng@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
## What changes were proposed in this pull request?
In PR https://github.com/apache/spark/pull/24365, we pass in the partitionBy columns as options in `DataFrameWriter`. To make this change less intrusive for a patch release, we added a feature flag `LEGACY_PASS_PARTITION_BY_AS_OPTIONS` with the default to be false.
For 3.0, we should just do the correct behavior for DSV1, i.e., always passing partitionBy as options, and remove this legacy feature flag.
## How was this patch tested?
Existing tests.
Closes#24784 from liwensun/SPARK-27453-default.
Authored-by: liwensun <liwen.sun@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
Issued fixed in https://github.com/apache/spark/pull/24734 but that PR might takes longer to merge.
## How was this patch tested?
It should pass existing unit tests.
Closes#24816 from mengxr/SPARK-27968.
Authored-by: Xiangrui Meng <meng@databricks.com>
Signed-off-by: Xiangrui Meng <meng@databricks.com>
## What changes were proposed in this pull request?
Move methods that implement v2 catalog operations to CatalogV2Util so they can be used in #24768.
## How was this patch tested?
Behavior is validated by existing tests.
Closes#24813 from rdblue/SPARK-27964-add-catalog-v2-util.
Authored-by: Ryan Blue <blue@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
I believe the log message: `Committer $committerClass is not a ParquetOutputCommitter and cannot create job summaries. Set Parquet option ${ParquetOutputFormat.JOB_SUMMARY_LEVEL} to NONE.` is at odds with the `if` statement that logs the warning. Despite the instructions in the warning, users still encounter the warning if `JOB_SUMMARY_LEVEL` is already set to `NONE`.
This pull request introduces a change to skip logging the warning if `JOB_SUMMARY_LEVEL` is set to `NONE`.
## How was this patch tested?
I built to make sure everything still compiled and I ran the existing test suite. I didn't feel it was worth the overhead to add a test to make sure a log message does not get logged, but if reviewers feel differently, I can add one.
Closes#24808 from jmsanders/master.
Authored-by: Jordan Sanders <jmsanders@users.noreply.github.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
This moves parsing logic for `ALTER TABLE` into Catalyst and adds parsed logical plans for alter table changes that use multi-part identifiers. This PR is similar to SPARK-27108, PR #24029, that created parsed logical plans for create and CTAS.
* Create parsed logical plans
* Move parsing logic into Catalyst's AstBuilder
* Convert to DataSource plans in DataSourceResolution
* Parse `ALTER TABLE ... SET LOCATION ...` separately from the partition variant
* Parse `ALTER TABLE ... ALTER COLUMN ... [TYPE dataType] [COMMENT comment]` [as discussed on the dev list](http://apache-spark-developers-list.1001551.n3.nabble.com/DISCUSS-Syntax-for-table-DDL-td25197.html#a25270)
* Parse `ALTER TABLE ... RENAME COLUMN ... TO ...`
* Parse `ALTER TABLE ... DROP COLUMNS ...`
## How was this patch tested?
* Added new tests in Catalyst's `DDLParserSuite`
* Moved converted plan tests from SQL `DDLParserSuite` to `PlanResolutionSuite`
* Existing tests for regressions
Closes#24723 from rdblue/SPARK-27857-add-alter-table-statements-in-catalyst.
Authored-by: Ryan Blue <blue@apache.org>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
Extracting the common purge "behaviour" to the parent StreamExecution.
## How was this patch tested?
No added behaviour so relying on existing tests.
Closes#24781 from jaceklaskowski/StreamExecution-purge.
Authored-by: Jacek Laskowski <jacek@japila.pl>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
Currently we are in a strange status that, some data source v2 interfaces(catalog related) are in sql/catalyst, some data source v2 interfaces(Table, ScanBuilder, DataReader, etc.) are in sql/core.
I don't see a reason to keep data source v2 API in 2 modules. If we should pick one module, I think sql/catalyst is the one to go.
Catalyst module already has some user-facing stuff like DataType, Row, etc. And we have to update `Analyzer` and `SessionCatalog` to support the new catalog plugin, which needs to be in the catalyst module.
This PR can solve the problem we have in https://github.com/apache/spark/pull/24246
## How was this patch tested?
existing tests
Closes#24416 from cloud-fan/move.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
## What changes were proposed in this pull request?
Similar to https://github.com/apache/spark/pull/24070, we now propagate SparkExceptions that are encountered during the collect in the java process to the python process.
Fixes https://jira.apache.org/jira/browse/SPARK-27805
## How was this patch tested?
Added a new unit test
Closes#24677 from dvogelbacher/dv/betterErrorMsgWhenUsingArrow.
Authored-by: David Vogelbacher <dvogelbacher@palantir.com>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
## What changes were proposed in this pull request?
The current `SQLTestUtils` created many `withXXX` utility functions to clean up tables/views/caches created for testing purpose. Java's `try-with-resources` statement does something similar, but it does not mask exception throwing in the try block with any exception caught in the 'close()' statement. Exception caught in the 'close()' statement would add as a suppressed exception instead.
This PR standardizes those 'withXXX' function to use`Utils.tryWithSafeFinally` function, which does something similar to Java's try-with-resources statement. The purpose of this proposal is to help developers to identify what actually breaks their tests.
## How was this patch tested?
Existing testcases.
Closes#24747 from William1104/feature/SPARK-27772-2.
Lead-authored-by: williamwong <william1104@gmail.com>
Co-authored-by: William Wong <william1104@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
In the previous work of csv/json migration, CSVFileFormat/JsonFileFormat is removed in the table provider whitelist of `AlterTableAddColumnsCommand.verifyAlterTableAddColumn`:
https://github.com/apache/spark/pull/24005https://github.com/apache/spark/pull/24058
This is regression. If a table is created with Provider `org.apache.spark.sql.execution.datasources.csv.CSVFileFormat` or `org.apache.spark.sql.execution.datasources.json.JsonFileFormat`, Spark should allow the "alter table add column" operation.
## How was this patch tested?
Unit test
Closes#24776 from gengliangwang/v1Table.
Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
## What changes were proposed in this pull request?
This PR targets to deduplicate hardcoded `py4j-0.10.8.1-src.zip` in order to make py4j upgrade easier.
## How was this patch tested?
N/A
Closes#24770 from HyukjinKwon/minor-py4j-dedup.
Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
This PR aims to add `interceptParseException` test utility function to `AnalysisTest` to reduce the duplications of `intercept` functions.
## How was this patch tested?
Pass the Jenkins with the updated test suites.
Closes#24769 from dongjoon-hyun/SPARK-27920.
Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
If we want to keep corrupt record when reading CSV, we provide a new column into the schema, that is `columnNameOfCorruptRecord`. But this new column isn't actually a column in CSV header. So if `enforceSchema` is disabled, `CSVHeaderChecker` throws a exception complaining that number of column in CSV header isn't equal to that in the schema.
## How was this patch tested?
Added test.
Closes#24757 from viirya/SPARK-27873.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
This PR targets to add an integrated test base for various UDF test cases so that Scalar UDF, Python UDF and Scalar Pandas UDFs can be tested in SBT & Maven tests.
### Problem
One of the problems we face is that: `ExtractPythonUDFs` (for Python UDF and Scalar Pandas UDF) has unevaluable expressions that always has to be wrapped with special plans. This special rule seems producing many issues, for instance, SPARK-27803, SPARK-26147, SPARK-26864, SPARK-26293, SPARK-25314 and SPARK-24721.
### Why do we have less test cases dedicated for SQL and plans with Python UDFs?
We have virtually no such SQL (or plan) dedicated tests in PySpark to catch such issues because:
- A developer should know all the analyzer, the optimizer, SQL, PySpark, Py4J and version differences in Python to write such good test cases
- To test plans, we should access to plans in JVM via Py4J which is tricky, messy and duplicates Scala test cases
- Usually we just add end-to-end test cases in PySpark therefore there are not so many dedicated examples to refer to write in PySpark
It is also a non-trivial overhead to switch test base and method (IMHO).
### How does this PR fix?
This PR adds Python UDF and Scalar Pandas UDF into our `*.sql` file based test base in runtime of SBT / Maven test cases. It generates Python-pickled instance (consisting of return type and Python native function) that is used in Python or Scalar Pandas UDF and directly brings into JVM.
After that, (we don't interact via Py4J) run the tests directly in JVM - we can just register and run Python UDF and Scalar Pandas UDF in JVM.
Currently, I only integrated this change into SQL file based testing. This is how works with test files under `udf` directory:
After the test files under 'inputs/udf' directory are detected, it creates three test cases:
- Scala UDF test case with a Scalar UDF registered named 'udf'.
- Python UDF test case with a Python UDF registered named 'udf' iff Python executable and pyspark are available.
- Scalar Pandas UDF test case with a Scalar Pandas UDF registered named 'udf' iff Python executable, pandas, pyspark and pyarrow are available.
Therefore, UDF test cases should have single input and output files but executed by three different types of UDFs.
For instance,
```sql
CREATE TEMPORARY VIEW ta AS
SELECT udf(a) AS a, udf('a') AS tag FROM t1
UNION ALL
SELECT udf(a) AS a, udf('b') AS tag FROM t2;
CREATE TEMPORARY VIEW tb AS
SELECT udf(a) AS a, udf('a') AS tag FROM t3
UNION ALL
SELECT udf(a) AS a, udf('b') AS tag FROM t4;
SELECT tb.* FROM ta INNER JOIN tb ON ta.a = tb.a AND ta.tag = tb.tag;
```
will be ran 3 times with Scalar UDF, Python UDF and Scalar Pandas UDF each.
### Appendix
Plus, this PR adds `IntegratedUDFTestUtils` which enables to test and execute Python UDF and Scalar Pandas UDFs as below:
To register Python UDF in SQL:
```scala
IntegratedUDFTestUtils.registerTestUDF(TestPythonUDF(name = "udf"), spark)
```
To register Scalar Pandas UDF in SQL:
```scala
IntegratedUDFTestUtils.registerTestUDF(TestScalarPandasUDF(name = "udf"), spark)
```
To use it in Scala API:
```scala
spark.select(expr("udf(1)").show()
```
To use it in SQL:
```scala
sql("SELECT udf(1)").show()
```
This util could be used in the future for better coverage with Scala API combinations as well.
## How was this patch tested?
Tested via the command below:
```bash
build/sbt "sql/test-only *SQLQueryTestSuite -- -z udf/udf-inner-join.sql"
```
```
[info] SQLQueryTestSuite:
[info] - udf/udf-inner-join.sql - Scala UDF (5 seconds, 47 milliseconds)
[info] - udf/udf-inner-join.sql - Python UDF (4 seconds, 335 milliseconds)
[info] - udf/udf-inner-join.sql - Scalar Pandas UDF (5 seconds, 423 milliseconds)
```
[python] unavailable:
```
[info] SQLQueryTestSuite:
[info] - udf/udf-inner-join.sql - Scala UDF (4 seconds, 577 milliseconds)
[info] - udf/udf-inner-join.sql - Python UDF is skipped because [pyton] and/or pyspark were not available. !!! IGNORED !!!
[info] - udf/udf-inner-join.sql - Scalar Pandas UDF is skipped because pyspark,pandas and/or pyarrow were not available in [pyton]. !!! IGNORED !!!
```
pyspark unavailable:
```
[info] SQLQueryTestSuite:
[info] - udf/udf-inner-join.sql - Scala UDF (4 seconds, 991 milliseconds)
[info] - udf/udf-inner-join.sql - Python UDF is skipped because [python] and/or pyspark were not available. !!! IGNORED !!!
[info] - udf/udf-inner-join.sql - Scalar Pandas UDF is skipped because pyspark,pandas and/or pyarrow were not available in [python]. !!! IGNORED !!!
```
pandas and/or pyarrow unavailable:
```
[info] SQLQueryTestSuite:
[info] - udf/udf-inner-join.sql - Scala UDF (4 seconds, 713 milliseconds)
[info] - udf/udf-inner-join.sql - Python UDF (3 seconds, 89 milliseconds)
[info] - udf/udf-inner-join.sql - Scalar Pandas UDF is skipped because pandas and/or pyarrow were not available in [python]. !!! IGNORED !!!
```
Closes#24752 from HyukjinKwon/udf-tests.
Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
As outlined in the JIRA by JoshRosen, our conversion mechanism from catalyst types to scala ones is pretty inefficient for primitive data types. Indeed, in these cases, most of the times we are adding useless calls to `identity` function or anyway to functions which return the same value. Using the information we have when we generate the code, we can avoid most of these overheads.
## How was this patch tested?
Here is a simple test which shows the benefit that this PR can bring:
```
test("SPARK-27684: perf evaluation") {
val intLongUdf = ScalaUDF(
(a: Int, b: Long) => a + b, LongType,
Literal(1) :: Literal(1L) :: Nil,
true :: true :: Nil,
nullable = false)
val plan = generateProject(
MutableProjection.create(Alias(intLongUdf, s"udf")() :: Nil),
intLongUdf)
plan.initialize(0)
var i = 0
val N = 100000000
val t0 = System.nanoTime()
while(i < N) {
plan(EmptyRow).get(0, intLongUdf.dataType)
plan(EmptyRow).get(0, intLongUdf.dataType)
plan(EmptyRow).get(0, intLongUdf.dataType)
plan(EmptyRow).get(0, intLongUdf.dataType)
plan(EmptyRow).get(0, intLongUdf.dataType)
plan(EmptyRow).get(0, intLongUdf.dataType)
plan(EmptyRow).get(0, intLongUdf.dataType)
plan(EmptyRow).get(0, intLongUdf.dataType)
plan(EmptyRow).get(0, intLongUdf.dataType)
plan(EmptyRow).get(0, intLongUdf.dataType)
i += 1
}
val t1 = System.nanoTime()
println(s"Avg time: ${(t1 - t0).toDouble / N} ns")
}
```
The output before the patch is:
```
Avg time: 51.27083294 ns
```
after, we get:
```
Avg time: 11.85874227 ns
```
which is ~5X faster.
Moreover a benchmark has been added for Scala UDF. The output after the patch can be seen in this PR, before the patch, the output was:
```
================================================================================================
UDF with mixed input types
================================================================================================
Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.6
Intel(R) Core(TM) i7-4558U CPU 2.80GHz
long/nullable int/string to string: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
long/nullable int/string to string wholestage off 257 287 42 0,4 2569,5 1,0X
long/nullable int/string to string wholestage on 158 172 18 0,6 1579,0 1,6X
Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.6
Intel(R) Core(TM) i7-4558U CPU 2.80GHz
long/nullable int/string to option: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
long/nullable int/string to option wholestage off 104 107 5 1,0 1037,9 1,0X
long/nullable int/string to option wholestage on 80 92 12 1,2 804,0 1,3X
Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.6
Intel(R) Core(TM) i7-4558U CPU 2.80GHz
long/nullable int to primitive: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
long/nullable int to primitive wholestage off 71 76 7 1,4 712,1 1,0X
long/nullable int to primitive wholestage on 64 71 6 1,6 636,2 1,1X
================================================================================================
UDF with primitive types
================================================================================================
Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.6
Intel(R) Core(TM) i7-4558U CPU 2.80GHz
long/nullable int to string: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
long/nullable int to string wholestage off 60 60 0 1,7 600,3 1,0X
long/nullable int to string wholestage on 55 64 8 1,8 551,2 1,1X
Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.6
Intel(R) Core(TM) i7-4558U CPU 2.80GHz
long/nullable int to option: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
long/nullable int to option wholestage off 66 73 9 1,5 663,0 1,0X
long/nullable int to option wholestage on 30 32 2 3,3 300,7 2,2X
Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.6
Intel(R) Core(TM) i7-4558U CPU 2.80GHz
long/nullable int/string to primitive: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
long/nullable int/string to primitive wholestage off 32 35 5 3,2 316,7 1,0X
long/nullable int/string to primitive wholestage on 41 68 17 2,4 414,0 0,8X
```
The improvements are particularly visible in the second case, ie. when only primitive types are used as inputs.
Closes#24636 from mgaido91/SPARK-27684.
Authored-by: Marco Gaido <marcogaido91@gmail.com>
Signed-off-by: Josh Rosen <rosenville@gmail.com>
## What changes were proposed in this pull request?
Support DROP TABLE from V2 catalogs.
Move DROP TABLE into catalyst.
Move parsing tests for DROP TABLE/VIEW to PlanResolutionSuite to validate existing behavior.
Add new tests fo catalyst parser suite.
Separate DROP VIEW into different code path from DROP TABLE.
Move DROP VIEW into catalyst as a new operator.
Add a meaningful exception to indicate view is not currently supported in v2 catalog.
## How was this patch tested?
New unit tests.
Existing unit tests in catalyst and sql core.
Closes#24686 from jzhuge/SPARK-27813-pr.
Authored-by: John Zhuge <jzhuge@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
This pr wrap all `PrintWriter` with `Utils.tryWithResource` to prevent resource leak.
## How was this patch tested?
Existing test
Closes#24739 from wangyum/SPARK-27875.
Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
Require the lookup function with interface LookupCatalog. Rationale is in the review comments below.
Make `Analyzer` abstract. BaseSessionStateBuilder and HiveSessionStateBuilder implements lookupCatalog with a call to SparkSession.catalog().
Existing test cases and those that don't need catalog lookup will use a newly added `TestAnalyzer` with a default lookup function that throws` CatalogNotFoundException("No catalog lookup function")`.
Rewrote the unit test for LookupCatalog to demonstrate the interface can be used anywhere, not just Analyzer.
Removed Analyzer parameter `lookupCatalog` because we can override in the following manner:
```
new Analyzer() {
override def lookupCatalog(name: String): CatalogPlugin = ???
}
```
## How was this patch tested?
Existing unit tests.
Closes#24689 from jzhuge/SPARK-26946-follow.
Authored-by: John Zhuge <jzhuge@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
To follow https://github.com/apache/spark/pull/17397, the output of FileTable and DataSourceV2ScanExecBase can contain sensitive information (like Amazon keys). Such information should not end up in logs, or be exposed to non-privileged users.
This PR is to add a redaction facility for these outputs to resolve the issue. A user can enable this by setting a regex in the same spark.redaction.string.regex configuration as V1.
## How was this patch tested?
Unit test
Closes#24719 from gengliangwang/RedactionSuite.
Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
In order to support outer joins with null top-level objects, SPARK-15441 modified Dataset.joinWith to project both inputs into single-column structs prior to the join.
For inner joins, however, this step is unnecessary and actually harms performance: performing the nesting before the join increases the shuffled data size. As an optimization for inner joins only, we can move this nesting to occur after the join (effectively switching back to the pre-SPARK-15441 behavior; see #13425).
## How was this patch tested?
Existing tests, which I strengthened to also make assertions about the join result's nullability (since this guards against a bug I almost introduced during prototyping).
Here's a quick `spark-shell` experiment demonstrating the reduction in shuffle size:
```scala
// With --conf spark.shuffle.compress=false
sql("set spark.sql.autoBroadcastJoinThreshold=-1") // for easier shuffle measurements
case class Foo(a: Long, b: Long)
val left = spark.range(10000).map(x => Foo(x, x))
val right = spark.range(10000).map(x => Foo(x, x))
left.joinWith(right, left("a") === right("a"), "inner").rdd.count()
left.joinWith(right, left("a") === right("a"), "left").rdd.count()
```
With inner join (which benefits from this PR's optimization) we shuffle 546.9 KiB. With left outer join (whose plan hasn't changed, therefore being a representation of the state before this PR) we shuffle 859.4 KiB. Shuffle compression (which is enabled by default) narrows this gap a bit: with compression, outer joins shuffle about 12% more than inner joins.
Closes#24693 from JoshRosen/fast-join-with-for-inner-joins.
Lead-authored-by: Josh Rosen <rosenville@gmail.com>
Co-authored-by: Josh Rosen <joshrosen@stripe.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
I checked the code of
`org.apache.spark.sql.execution.datasources.DataSource`
, there exists duplicate Java reflection.
`sourceSchema`,`createSource`,`createSink`,`resolveRelation`,`writeAndRead`, all the methods call the `providingClass.getConstructor().newInstance()`.
The instance of `providingClass` is stateless, such as:
`KafkaSourceProvider`
`RateSourceProvider`
`TextSocketSourceProvider`
`JdbcRelationProvider`
`ConsoleSinkProvider`
AFAIK, Java reflection will result in significant performance issue.
The oracle website [https://docs.oracle.com/javase/tutorial/reflect/index.html](https://docs.oracle.com/javase/tutorial/reflect/index.html) contains some performance description about Java reflection:
```
Performance Overhead
Because reflection involves types that are dynamically resolved, certain Java virtual machine optimizations can not be performed. Consequently, reflective operations have slower performance than their non-reflective counterparts, and should be avoided in sections of code which are called frequently in performance-sensitive applications.
```
I have found some performance cost test of Java reflection as follows:
[https://blog.frankel.ch/performance-cost-of-reflection/](https://blog.frankel.ch/performance-cost-of-reflection/) contains performance cost test.
[https://stackoverflow.com/questions/435553/java-reflection-performance](https://stackoverflow.com/questions/435553/java-reflection-performance) has a discussion of java reflection.
So I think should avoid duplicate Java reflection and reuse the instance of `providingClass`.
## How was this patch tested?
Exists UT.
Closes#24647 from beliefer/optimize-DataSource.
Authored-by: gengjiaan <gengjiaan@360.cn>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
descending sort in HDFSMetadataLog.getLatest instead of two action of ascending sort and reverse
## How was this patch tested?
Jenkins
Closes#24711 from wenxuanguan/bug-fix-hdfsmetadatalog.
Authored-by: wenxuanguan <choose_home@126.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
In https://github.com/apache/spark/pull/22104 , we create the python-eval nodes at the end of the optimization phase, which causes a problem.
After the main optimization batch, Filter and Project nodes are usually pushed to the bottom, near the scan node. However, if we extract Python UDFs from Filter/Project, and create a python-eval node under Filter/Project, it will break column pruning/filter pushdown of the scan node.
There are some hacks in the `ExtractPythonUDFs` rule, to duplicate the column pruning and filter pushdown logic. However, it has some bugs as demonstrated in the new test case(only column pruning is broken). This PR removes the hacks and re-apply the column pruning and filter pushdown rules explicitly.
**Before:**
```
...
== Analyzed Logical Plan ==
a: bigint
Project [a#168L]
+- Filter dummyUDF(a#168L)
+- Relation[a#168L,b#169L] parquet
== Optimized Logical Plan ==
Project [a#168L]
+- Project [a#168L, b#169L]
+- Filter pythonUDF0#174: boolean
+- BatchEvalPython [dummyUDF(a#168L)], [a#168L, b#169L, pythonUDF0#174]
+- Relation[a#168L,b#169L] parquet
== Physical Plan ==
*(2) Project [a#168L]
+- *(2) Project [a#168L, b#169L]
+- *(2) Filter pythonUDF0#174: boolean
+- BatchEvalPython [dummyUDF(a#168L)], [a#168L, b#169L, pythonUDF0#174]
+- *(1) FileScan parquet [a#168L,b#169L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/private/var/folders/_1/bzcp960d0hlb988k90654z2w0000gp/T/spark-798bae3c-a2..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint>
```
**After:**
```
...
== Analyzed Logical Plan ==
a: bigint
Project [a#168L]
+- Filter dummyUDF(a#168L)
+- Relation[a#168L,b#169L] parquet
== Optimized Logical Plan ==
Project [a#168L]
+- Filter pythonUDF0#174: boolean
+- BatchEvalPython [dummyUDF(a#168L)], [pythonUDF0#174]
+- Project [a#168L]
+- Relation[a#168L,b#169L] parquet
== Physical Plan ==
*(2) Project [a#168L]
+- *(2) Filter pythonUDF0#174: boolean
+- BatchEvalPython [dummyUDF(a#168L)], [pythonUDF0#174]
+- *(1) FileScan parquet [a#168L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/private/var/folders/_1/bzcp960d0hlb988k90654z2w0000gp/T/spark-9500cafb-78..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint>
```
## How was this patch tested?
new test
Closes#24675 from cloud-fan/python.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
This is a minor pr to use `#` as a marker for expression id that is embedded in the name field of SubqueryExec operator.
## How was this patch tested?
Added a small test in SubquerySuite.
Closes#24652 from dilipbiswal/subquery-name.
Authored-by: Dilip Biswal <dbiswal@us.ibm.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
Avoid hard-coded config: `spark.rdd.parallelListingThreshold`.
## How was this patch tested?
N/A
Closes#24708 from wangyum/spark.rdd.parallelListingThreshold.
Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
InMemoryFileIndex.listLeafFiles should use listLocatedStatus for DistributedFileSystem. DistributedFileSystem overrides the listLocatedStatus method in order to do it with 1 single namenode call thus saving thousands of calls to getBlockLocations.
Currently in InMemoryFileIndex, all directory listings are done using FileSystem.listStatus following by individual calls to FileSystem.getFileBlockLocations. This is painstakingly slow for folders that have large numbers of files because this process happens serially and parallelism is only applied at the folder level, not the file level.
FileSystem also provides another API listLocatedStatus which returns the LocatedFileStatus objects that already have the block locations. In FileSystem main class this just delegates to listStatus and getFileBlockLocations similarly to the way Spark does it. However when HDFS specifically is the backing file system, DistributedFileSystem overrides this method and simply makes one single call to the namenode to retrieve the directory listing with the block locations. This avoids potentially thousands or more calls to namenode and also is more consistent because files will either exist with locations or not exist instead of having the FileNotFoundException exception case.
For our example directory with 6500 files, the load time of spark.read.parquet was reduced 96x from 76 seconds to .8 seconds. This savings only goes up with the number of files in the directory.
In the pull request instead of using this method always which could lead to a FileNotFoundException that could be tough to decipher in the default FileSystem implementation, this method is only used when the FileSystem is a DistributedFileSystem and otherwise the old logic still applies.
## How was this patch tested?
test suite ran
Closes#24672 from rrusso2007/master.
Authored-by: rrusso2007 <rrusso2007@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
Each time, when I write a complex CREATE DATABASE/VIEW statements, I have to open the .g4 file to find the EXACT order of clauses in CREATE TABLE statement. When the order is not right, I will get A strange confusing error message generated from ANTLR4.
The original g4 grammar for CREATE VIEW is
```
CREATE [OR REPLACE] [[GLOBAL] TEMPORARY] VIEW [db_name.]view_name
[(col_name1 [COMMENT col_comment1], ...)]
[COMMENT table_comment]
[TBLPROPERTIES (key1=val1, key2=val2, ...)]
AS select_statement
```
The proposal is to make the following clauses order insensitive.
```
[COMMENT table_comment]
[TBLPROPERTIES (key1=val1, key2=val2, ...)]
```
–
The original g4 grammar for CREATE DATABASE is
```
CREATE (DATABASE|SCHEMA) [IF NOT EXISTS] db_name
[COMMENT comment_text]
[LOCATION path]
[WITH DBPROPERTIES (key1=val1, key2=val2, ...)]
```
The proposal is to make the following clauses order insensitive.
```
[COMMENT comment_text]
[LOCATION path]
[WITH DBPROPERTIES (key1=val1, key2=val2, ...)]
```
## How was this patch tested?
By adding new unit tests to test duplicate clauses and modifying some existing unit tests to test whether those clauses are actually order insensitive
Closes#24681 from yeshengm/create-view-parser.
Authored-by: Yesheng Ma <kimi.ysma@gmail.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
## What changes were proposed in this pull request?
This fix prevents the rule EliminateResolvedHint from being applied again if it's already applied.
## How was this patch tested?
Added new UT.
Closes#24692 from maryannxue/eliminatehint-bug.
Authored-by: maryannxue <maryannxue@apache.org>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
## What changes were proposed in this pull request?
In data source v1, save mode specified in `DataFrameWriter` is passed to data source implementation directly, and each data source can define its own behavior about save mode. This is confusing and we want to get rid of save mode in data source v2.
For data source v2, we expect data source to implement the `TableCatalog` API, and end-users use SQL(or the new write API described in [this doc](https://docs.google.com/document/d/1gYm5Ji2Mge3QBdOliFV5gSPTKlX4q1DCBXIkiyMv62A/edit?ts=5ace0718#heading=h.e9v1af12g5zo)) to acess data sources. The SQL API has very clear semantic and we don't need save mode at all.
However, for simple data sources that do not have table management (like a JIRA data source, a noop sink, etc.), it's not ideal to ask them to implement the `TableCatalog` API, and throw exception here and there.
`TableProvider` API is created for simple data sources. It can only get tables, without any other table management methods. This means, it can only deal with existing tables.
`TableProvider` fits well with `DataStreamReader` and `DataStreamWriter`, as they can only read/write existing tables. However, `TableProvider` doesn't fit `DataFrameWriter` well, as the save mode requires more than just get table. More specifically, `ErrorIfExists` mode needs to check if table exists, and create table. `Ignore` mode needs to check if table exists. When end-users specify `ErrorIfExists` or `Ignore` mode and write data to `TableProvider` via `DataFrameWriter`, Spark fails the query and asks users to use `Append` or `Overwrite` mode.
The file source is in the middle of `TableProvider` and `TableCatalog`: it's simple but it can check table(path) exists and create table(path). That said, file source supports all the save modes.
Currently file source implements `TableProvider`, and it's not working because `TableProvider` doesn't support `ErrorIfExists` and `Ignore` modes. Ideally we should create a new API for path-based data sources, but to unblock the work of file source v2 migration, this PR proposes to special-case file source v2 in `DataFrameWriter`, to make it work.
This PR also removes `SaveMode` from data source v2, as now only the internal file source v2 needs it.
## How was this patch tested?
existing tests
Closes#24233 from cloud-fan/file.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
## What changes were proposed in this pull request?
This adds a v2 implementation of create table:
* `CreateV2Table` is the logical plan, named using v2 to avoid conflicting with the existing plan
* `CreateTableExec` is the physical plan
## How was this patch tested?
Added resolution and v2 SQL tests.
Closes#24617 from rdblue/SPARK-27732-add-v2-create-table.
Authored-by: Ryan Blue <blue@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
Add type parameter to `TreeNodeTag`.
## How was this patch tested?
existing tests
Closes#24687 from cloud-fan/tag.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
This PR is a follow up of https://github.com/apache/spark/pull/24669 to fix the wrong answers used in test cases.
Closes#24674 from dongjoon-hyun/SPARK-27800.
Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
https://github.com/apache/spark/pull/22275 introduced a performance improvement where we send partitions out of order to python and then, as a last step, send the partition order as well.
However, if there are no partitions we will never send the partition order and we will get an "EofError" on the python side.
This PR fixes this by also sending the partition order if there are no partitions present.
## How was this patch tested?
New unit test added.
Closes#24650 from dvogelbacher/dv/fixNoPartitionArrowConversion.
Authored-by: David Vogelbacher <dvogelbacher@palantir.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
To return accurate pushed filters in Parquet file scan(https://github.com/apache/spark/pull/24327#pullrequestreview-234775673), we can process the original data source filters in the following way:
1. For "And" operators, split the conjunctive predicates and try converting each of them. After that
1.1 if partially predicate pushed down is allowed, return convertible results;
1.2 otherwise, return the whole predicate if convertible, or empty result if not convertible.
2. For "Or" operators, if both children can be pushed down, it is partially or totally convertible; otherwise, return empty result
3. For other operators, they are not able to be partially pushed down.
2.1 if the entire predicate is convertible, return itself
2.2 otherwise, return an empty result.
This PR also contains code refactoring. Currently `ParquetFilters. createFilter ` accepts parameter `schema: MessageType` and create field mapping for every input filter. We can make it a class member and avoid creating the `nameToParquetField` mapping for every input filter.
## How was this patch tested?
Unit test
Closes#24597 from gengliangwang/refactorParquetFilters.
Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
If we refresh a cached table, the table cache will be first uncached and then recache (lazily). Currently, the logic is embedded in CatalogImpl.refreshTable method.
The current implementation does not preserve the cache name and storage level. As a result, cache name and cache level could be changed after a REFERSH. IMHO, it is not what a user would expect.
I would like to fix this behavior by first save the cache name and storage level for recaching the table.
Two unit tests are added to make sure cache name is unchanged upon table refresh. Before applying this patch, the test created for qualified case would fail.
Closes#24221 from William1104/feature/SPARK-27248.
Lead-authored-by: williamwong <william1104@gmail.com>
Co-authored-by: William Wong <william1104@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
Because a temporary view is resolved during analysis when we create a dataset, the content of the view is determined when the dataset is created, not when it is evaluated. Now the explain result of a dataset is not correctly consistent with the collected result of it, because we use pre-analyzed logical plan of the dataset in explain command. The explain command will analyzed the logical plan passed in. So if a view is changed after the dataset was created, the plans shown by explain command aren't the same with the plan of the dataset.
```scala
scala> spark.range(10).createOrReplaceTempView("test")
scala> spark.range(5).createOrReplaceTempView("test2")
scala> spark.sql("select * from test").createOrReplaceTempView("tmp001")
scala> val df = spark.sql("select * from tmp001")
scala> spark.sql("select * from test2").createOrReplaceTempView("tmp001")
scala> df.show
+---+
| id|
+---+
| 0|
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
+---+
scala> df.explain(true)
```
Before:
```scala
== Parsed Logical Plan ==
'Project [*]
+- 'UnresolvedRelation `tmp001`
== Analyzed Logical Plan ==
id: bigint
Project [id#2L]
+- SubqueryAlias `tmp001`
+- Project [id#2L]
+- SubqueryAlias `test2`
+- Range (0, 5, step=1, splits=Some(12))
== Optimized Logical Plan ==
Range (0, 5, step=1, splits=Some(12))
== Physical Plan ==
*(1) Range (0, 5, step=1, splits=12)
```
After:
```scala
== Parsed Logical Plan ==
'Project [*]
+- 'UnresolvedRelation `tmp001`
== Analyzed Logical Plan ==
id: bigint
Project [id#0L]
+- SubqueryAlias `tmp001`
+- Project [id#0L]
+- SubqueryAlias `test`
+- Range (0, 10, step=1, splits=Some(12))
== Optimized Logical Plan ==
Range (0, 10, step=1, splits=Some(12))
== Physical Plan ==
*(1) Range (0, 10, step=1, splits=12)
```
Previous PR to this issue has a regression when to explain an explain statement, like `sql("explain select 1").explain(true)`. This new fix is following up with hvanhovell's advice at https://github.com/apache/spark/pull/24464#issuecomment-494165538.
Explain an explain:
```scala
scala> sql("explain select 1").explain(true)
== Parsed Logical Plan ==
ExplainCommand 'Project [unresolvedalias(1, None)], false, false, false
== Analyzed Logical Plan ==
plan: string
ExplainCommand 'Project [unresolvedalias(1, None)], false, false, false
== Optimized Logical Plan ==
ExplainCommand 'Project [unresolvedalias(1, None)], false, false, false
== Physical Plan ==
Execute ExplainCommand
+- ExplainCommand 'Project [unresolvedalias(1, None)], false, false, false
```
Btw, I found there is a regression after applying hvanhovell's advice:
```scala
spark.readStream
.format("org.apache.spark.sql.streaming.test")
.load()
.explain(true)
```
```scala
== Parsed Logical Plan ==
StreamingRelation DataSource(org.apache.spark.sql.test.TestSparkSession3e8c7175,org.apache.spark.sql.streaming.test,List(),None,List(),None,Map(),None
), dummySource, [a#559]
== Analyzed Logical Plan ==
a: int
StreamingRelation DataSource(org.apache.spark.sql.test.TestSparkSession3e8c7175,org.apache.spark.sql.streaming.test,List(),None,List(),None,Map(),Non$
), dummySource, [a#559]
== Optimized Logical Plan ==
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
dummySource
== Physical Plan ==
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
dummySource
```
So I did a change to that to fix it too.
## How was this patch tested?
Added test and manually test.
Closes#24654 from viirya/SPARK-27439-3.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>