## What changes were proposed in this pull request?
In Dataset.join we have a small hack for resolving ambiguity in the column name for self-joins. The current code supports only `EqualTo`.
The PR extends the fix to `EqualNullSafe`.
Credit for this PR should be given to daniel-shields.
## How was this patch tested?
added UT
Author: Marco Gaido <marcogaido91@gmail.com>
Closes#21605 from mgaido91/SPARK-24385_2.
## What changes were proposed in this pull request?
The ColumnPruning rule tries adding an extra Project if an input node produces fields more than needed, but as a post-processing step, it needs to remove the lower Project in the form of "Project - Filter - Project" otherwise it would conflict with PushPredicatesThroughProject and would thus cause a infinite optimization loop. The current post-processing method is defined as:
```
private def removeProjectBeforeFilter(plan: LogicalPlan): LogicalPlan = plan transform {
case p1 Project(_, f Filter(_, p2 Project(_, child)))
if p2.outputSet.subsetOf(child.outputSet) =>
p1.copy(child = f.copy(child = child))
}
```
This method works well when there is only one Filter but would not if there's two or more Filters. In this case, there is a deterministic filter and a non-deterministic filter so they stay as separate filter nodes and cannot be combined together.
An simplified illustration of the optimization process that forms the infinite loop is shown below (F1 stands for the 1st filter, F2 for the 2nd filter, P for project, S for scan of relation, PredicatePushDown as abbrev. of PushPredicatesThroughProject):
```
F1 - F2 - P - S
PredicatePushDown => F1 - P - F2 - S
ColumnPruning => F1 - P - F2 - P - S
=> F1 - P - F2 - S (Project removed)
PredicatePushDown => P - F1 - F2 - S
ColumnPruning => P - F1 - P - F2 - S
=> P - F1 - P - F2 - P - S
=> P - F1 - F2 - P - S (only one Project removed)
RemoveRedundantProject => F1 - F2 - P - S (goes back to the loop start)
```
So the problem is the ColumnPruning rule adds a Project under a Filter (and fails to remove it in the end), and that new Project triggers PushPredicateThroughProject. Once the filters have been push through the Project, a new Project will be added by the ColumnPruning rule and this goes on and on.
The fix should be when adding Projects, the rule applies top-down, but later when removing extra Projects, the process should go bottom-up to ensure all extra Projects can be matched.
## How was this patch tested?
Added a optimization rule test in ColumnPruningSuite; and a end-to-end test in SQLQuerySuite.
Author: maryannxue <maryannxue@apache.org>
Closes#21674 from maryannxue/spark-24696.
## What changes were proposed in this pull request?
Provide a continuous processing implementation of coalesce(1), as well as allowing aggregates on top of it.
The changes in ContinuousQueuedDataReader and such are to use split.index (the ID of the partition within the RDD currently being compute()d) rather than context.partitionId() (the partition ID of the scheduled task within the Spark job - that is, the post coalesce writer). In the absence of a narrow dependency, these values were previously always the same, so there was no need to distinguish.
## How was this patch tested?
new unit test
Author: Jose Torres <torres.joseph.f+github@gmail.com>
Closes#21560 from jose-torres/coalesce.
## What changes were proposed in this pull request?
A few math functions (`abs` , `bitwiseNOT`, `isnan`, `nanvl`) are not in **math_funcs** group. They should really be.
## How was this patch tested?
Awaiting Jenkins
Author: Jacek Laskowski <jacek@japila.pl>
Closes#21448 from jaceklaskowski/SPARK-24408-math-funcs-doc.
## What changes were proposed in this pull request?
Add a new test suite to test RecordBinaryComparator.
## How was this patch tested?
New test suite.
Author: Xingbo Jiang <xingbo.jiang@databricks.com>
Closes#21570 from jiangxb1987/rbc-test.
findTightestCommonTypeOfTwo has been renamed to findTightestCommonType
## What changes were proposed in this pull request?
(Please fill in changes proposed in this fix)
## How was this patch tested?
(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Fokko Driesprong <fokkodriesprong@godatadriven.com>
Closes#21597 from Fokko/fd-typo.
## What changes were proposed in this pull request?
This pr corrected the default configuration (`spark.master=local[1]`) for benchmarks. Also, this updated performance results on the AWS `r3.xlarge`.
## How was this patch tested?
N/A
Author: Takeshi Yamamuro <yamamuro@apache.org>
Closes#21625 from maropu/FixDataSourceReadBenchmark.
## What changes were proposed in this pull request?
In the master, when `csvColumnPruning`(implemented in [this commit](64fad0b519 (diff-d19881aceddcaa5c60620fdcda99b4c4))) enabled and partitions scanned only, it throws an exception below;
```
scala> val dir = "/tmp/spark-csv/csv"
scala> spark.range(10).selectExpr("id % 2 AS p", "id").write.mode("overwrite").partitionBy("p").csv(dir)
scala> spark.read.csv(dir).selectExpr("sum(p)").collect()
18/06/25 13:12:51 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 5)
java.lang.NullPointerException
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser.org$apache$spark$sql$execution$datasources$csv$UnivocityParser$$convert(UnivocityParser.scala:197)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser.parse(UnivocityParser.scala:190)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$5.apply(UnivocityParser.scala:309)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$5.apply(UnivocityParser.scala:309)
at org.apache.spark.sql.execution.datasources.FailureSafeParser.parse(FailureSafeParser.scala:61)
...
```
This pr modified code to skip CSV parsing in the case.
## How was this patch tested?
Added tests in `CSVSuite`.
Author: Takeshi Yamamuro <yamamuro@apache.org>
Closes#21631 from maropu/SPARK-24645.
## What changes were proposed in this pull request?
Updated URL/href links to include a '/' before '?id' to make links consistent and avoid http 302 redirect errors within UI port 4040 tabs.
## How was this patch tested?
Built a runnable distribution and executed jobs. Validated that http 302 redirects are no longer encountered when clicking on links within UI port 4040 tabs.
Author: Steven Kallman <SJKallmangmail.com>
Author: Kallman, Steven <Steven.Kallman@CapitalOne.com>
Closes#21600 from SJKallman/{Spark-24553}{WEB-UI}-redirect-href-fixes.
## What changes were proposed in this pull request?
This pr added code to verify a schema in Json/Orc/ParquetFileFormat along with CSVFileFormat.
## How was this patch tested?
Added verification tests in `FileBasedDataSourceSuite` and `HiveOrcSourceSuite`.
Author: Takeshi Yamamuro <yamamuro@apache.org>
Closes#21389 from maropu/SPARK-24204.
## What changes were proposed in this pull request?
Set createTime for every hive partition created in Spark SQL, which could be used to manage data lifecycle in Hive warehouse. We found that almost every partition modified by spark sql has not been set createTime.
```
mysql> select * from partitions where create_time=0 limit 1\G;
*************************** 1. row ***************************
PART_ID: 1028584
CREATE_TIME: 0
LAST_ACCESS_TIME: 1502203611
PART_NAME: date=20170130
SD_ID: 1543605
TBL_ID: 211605
LINK_TARGET_ID: NULL
1 row in set (0.27 sec)
```
## How was this patch tested?
N/A
Author: debugger87 <yangchaozhong.2009@gmail.com>
Author: Chaozhong Yang <yangchaozhong.2009@gmail.com>
Closes#18900 from debugger87/fix/set-create-time-for-hive-partition.
## What changes were proposed in this pull request?
Address comments in #21370 and add more test.
## How was this patch tested?
Enhance test in pyspark/sql/test.py and DataFrameSuite
Author: Yuanjian Li <xyliyuanjian@gmail.com>
Closes#21553 from xuanyuanking/SPARK-24215-follow.
## What changes were proposed in this pull request?
The PR adds the SQL function ```sequence```.
https://issues.apache.org/jira/browse/SPARK-23927
The behavior of the function is based on Presto's one.
Ref: https://prestodb.io/docs/current/functions/array.html
- ```sequence(start, stop) → array<bigint>```
Generate a sequence of integers from ```start``` to ```stop```, incrementing by ```1``` if ```start``` is less than or equal to ```stop```, otherwise ```-1```.
- ```sequence(start, stop, step) → array<bigint>```
Generate a sequence of integers from ```start``` to ```stop```, incrementing by ```step```.
- ```sequence(start_date, stop_date) → array<date>```
Generate a sequence of dates from ```start_date``` to ```stop_date```, incrementing by ```interval 1 day``` if ```start_date``` is less than or equal to ```stop_date```, otherwise ```- interval 1 day```.
- ```sequence(start_date, stop_date, step_interval) → array<date>```
Generate a sequence of dates from ```start_date``` to ```stop_date```, incrementing by ```step_interval```. The type of ```step_interval``` is ```CalendarInterval```.
- ```sequence(start_timestemp, stop_timestemp) → array<timestamp>```
Generate a sequence of timestamps from ```start_timestamps``` to ```stop_timestamps```, incrementing by ```interval 1 day``` if ```start_date``` is less than or equal to ```stop_date```, otherwise ```- interval 1 day```.
- ```sequence(start_timestamp, stop_timestamp, step_interval) → array<timestamp>```
Generate a sequence of timestamps from ```start_timestamps``` to ```stop_timestamps```, incrementing by ```step_interval```. The type of ```step_interval``` is ```CalendarInterval```.
## How was this patch tested?
Added unit tests.
Author: Vayda, Oleksandr: IT (PRG) <Oleksandr.Vayda@barclayscapital.com>
Closes#21155 from wajda/feature/array-api-sequence.
## What changes were proposed in this pull request?
In PR, I propose new behavior of `size(null)` under the config flag `spark.sql.legacy.sizeOfNull`. If the former one is disabled, the `size()` function returns `null` for `null` input. By default the `spark.sql.legacy.sizeOfNull` is enabled to keep backward compatibility with previous versions. In that case, `size(null)` returns `-1`.
## How was this patch tested?
Modified existing tests for the `size()` function to check new behavior (`null`) and old one (`-1`).
Author: Maxim Gekk <maxim.gekk@databricks.com>
Closes#21598 from MaxGekk/legacy-size-of-null.
## What changes were proposed in this pull request?
Here is the description in the JIRA -
Currently, our JDBC connector provides the option `dbtable` for users to specify the to-be-loaded JDBC source table.
```SQL
val jdbcDf = spark.read
.format("jdbc")
.option("dbtable", "dbName.tableName")
.options(jdbcCredentials: Map)
.load()
```
Normally, users do not fetch the whole JDBC table due to the poor performance/throughput of JDBC. Thus, they normally just fetch a small set of tables. For advanced users, they can pass a subquery as the option.
```SQL
val query = """ (select * from tableName limit 10) as tmp """
val jdbcDf = spark.read
.format("jdbc")
.option("dbtable", query)
.options(jdbcCredentials: Map)
.load()
```
However, this is straightforward to end users. We should simply allow users to specify the query by a new option `query`. We will handle the complexity for them.
```SQL
val query = """select * from tableName limit 10"""
val jdbcDf = spark.read
.format("jdbc")
.option("query", query)
.options(jdbcCredentials: Map)
.load()
```
## How was this patch tested?
Added tests in JDBCSuite and JDBCWriterSuite.
Also tested against MySQL, Postgress, Oracle, DB2 (using docker infrastructure) to make sure there are no syntax issues.
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes#21590 from dilipbiswal/SPARK-24423.
## What changes were proposed in this pull request?
Presto's implementation accepts arbitrary arrays of primitive types as an input:
```
presto> SELECT array_join(ARRAY [1, 2, 3], ', ');
_col0
---------
1, 2, 3
(1 row)
```
This PR proposes to implement a type coercion rule for ```array_join``` function that converts arrays of primitive as well as non-primitive types to arrays of string.
## How was this patch tested?
New test cases add into:
- sql-tests/inputs/typeCoercion/native/arrayJoin.sql
- DataFrameFunctionsSuite.scala
Author: Marek Novotny <mn.mikke@gmail.com>
Closes#21620 from mn-mikke/SPARK-24636.
This passes the unique task attempt id instead of attempt number to v2 data sources because attempt number is reused when stages are retried. When attempt numbers are reused, sources that track data by partition id and attempt number may incorrectly clean up data because the same attempt number can be both committed and aborted.
For v1 / Hadoop writes, generate a unique ID based on available attempt numbers to avoid a similar problem.
Closes#21558
Author: Marcelo Vanzin <vanzin@cloudera.com>
Author: Ryan Blue <blue@apache.org>
Closes#21606 from vanzin/SPARK-24552.2.
Use LongAdder to make SQLMetrics thread safe.
## What changes were proposed in this pull request?
Replace += with LongAdder.add() for concurrent counting
## How was this patch tested?
Unit tests with local threads
Author: Stacy Kerkela <stacy.kerkela@databricks.com>
Closes#21634 from dbkerkela/sqlmetrics-concurrency-stacy.
## What changes were proposed in this pull request?
In function array_zip, when split is required by the high number of arguments, a codegen error can happen.
The PR fixes codegen for cases when splitting the code is required.
## How was this patch tested?
added UT
Author: Marco Gaido <marcogaido91@gmail.com>
Closes#21621 from mgaido91/SPARK-24633.
## What changes were proposed in this pull request?
1. Add parameter 'cascade' in CacheManager.uncacheQuery(). Under 'cascade=false' mode, only invalidate the current cache, and for other dependent caches, rebuild execution plan and reuse cached buffer.
2. Pass true/false from callers in different uncache scenarios:
- Drop tables and regular (persistent) views: regular mode
- Drop temporary views: non-cascading mode
- Modify table contents (INSERT/UPDATE/MERGE/DELETE): regular mode
- Call `DataSet.unpersist()`: non-cascading mode
- Call `Catalog.uncacheTable()`: follow the same convention as drop tables/view, which is, use non-cascading mode for temporary views and regular mode for the rest
Note that a regular (persistent) view is a database object just like a table, so after dropping a regular view (whether cached or not cached), any query referring to that view should no long be valid. Hence if a cached persistent view is dropped, we need to invalidate the all dependent caches so that exceptions will be thrown for any later reference. On the other hand, a temporary view is in fact equivalent to an unnamed DataSet, and dropping a temporary view should have no impact on queries referencing that view. Thus we should do non-cascading uncaching for temporary views, which also guarantees a consistent uncaching behavior between temporary views and unnamed DataSets.
## How was this patch tested?
New tests in CachedTableSuite and DatasetCacheSuite.
Author: Maryann Xue <maryannxue@apache.org>
Closes#21594 from maryannxue/noncascading-cache.
## What changes were proposed in this pull request?
This is a follow-up pr of #21045 which added `arrays_zip`.
The `arrays_zip` in functions.scala should've been `scala.annotation.varargs`.
This pr makes it `scala.annotation.varargs`.
## How was this patch tested?
Existing tests.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#21630 from ueshin/issues/SPARK-23931/fup1.
## What changes were proposed in this pull request?
This pr modified JDBC datasource code to verify and normalize a partition column based on the JDBC resolved schema before building `JDBCRelation`.
Closes#20370
## How was this patch tested?
Added tests in `JDBCSuite`.
Author: Takeshi Yamamuro <yamamuro@apache.org>
Closes#21379 from maropu/SPARK-24327.
## What changes were proposed in this pull request?
Currently, a `pandas_udf` of type `PandasUDFType.GROUPED_MAP` will assign the resulting columns based on index of the return pandas.DataFrame. If a new DataFrame is returned and constructed using a dict, then the order of the columns could be arbitrary and be different than the defined schema for the UDF. If the schema types still match, then no error will be raised and the user will see column names and column data mixed up.
This change will first try to assign columns using the return type field names. If a KeyError occurs, then the column index is checked if it is string based. If so, then the error is raised as it is most likely a naming mistake, else it will fallback to assign columns by position and raise a TypeError if the field types do not match.
## How was this patch tested?
Added a test that returns a new DataFrame with column order different than the schema.
Author: Bryan Cutler <cutlerb@gmail.com>
Closes#21427 from BryanCutler/arrow-grouped-map-mixesup-cols-SPARK-24324.
## What changes were proposed in this pull request?
This pr added benchmark code `FilterPushdownBenchmark` for string pushdown and updated performance results on the AWS `r3.xlarge`.
## How was this patch tested?
N/A
Author: Takeshi Yamamuro <yamamuro@apache.org>
Closes#21288 from maropu/UpdateParquetBenchmark.
## What changes were proposed in this pull request?
Currently, restrictions in JSONOptions for `encoding` and `lineSep` are the same for read and for write. For example, a requirement for `lineSep` in the code:
```
df.write.option("encoding", "UTF-32BE").json(file)
```
doesn't allow to skip `lineSep` and use its default value `\n` because it throws the exception:
```
equirement failed: The lineSep option must be specified for the UTF-32BE encoding
java.lang.IllegalArgumentException: requirement failed: The lineSep option must be specified for the UTF-32BE encoding
```
In the PR, I propose to separate JSONOptions in read and write, and make JSONOptions in write less restrictive.
## How was this patch tested?
Added new test for blacklisted encodings in read. And the `lineSep` option was removed in write for some tests.
Author: Maxim Gekk <maxim.gekk@databricks.com>
Author: Maxim Gekk <max.gekk@gmail.com>
Closes#21247 from MaxGekk/json-options-in-write.
## What changes were proposed in this pull request?
In https://github.com/apache/spark/pull/19080 we simplified the distribution/partitioning framework, and make all the join-like operators require `HashClusteredDistribution` from children. Unfortunately streaming join operator was missed.
This can cause wrong result. Think about
```
val input1 = MemoryStream[Int]
val input2 = MemoryStream[Int]
val df1 = input1.toDF.select('value as 'a, 'value * 2 as 'b)
val df2 = input2.toDF.select('value as 'a, 'value * 2 as 'b).repartition('b)
val joined = df1.join(df2, Seq("a", "b")).select('a)
```
The physical plan is
```
*(3) Project [a#5]
+- StreamingSymmetricHashJoin [a#5, b#6], [a#10, b#11], Inner, condition = [ leftOnly = null, rightOnly = null, both = null, full = null ], state info [ checkpoint = <unknown>, runId = 54e31fce-f055-4686-b75d-fcd2b076f8d8, opId = 0, ver = 0, numPartitions = 5], 0, state cleanup [ left = null, right = null ]
:- Exchange hashpartitioning(a#5, b#6, 5)
: +- *(1) Project [value#1 AS a#5, (value#1 * 2) AS b#6]
: +- StreamingRelation MemoryStream[value#1], [value#1]
+- Exchange hashpartitioning(b#11, 5)
+- *(2) Project [value#3 AS a#10, (value#3 * 2) AS b#11]
+- StreamingRelation MemoryStream[value#3], [value#3]
```
The left table is hash partitioned by `a, b`, while the right table is hash partitioned by `b`. This means, we may have a matching record that is in different partitions, which should be in the output but not.
## How was this patch tested?
N/A
Author: Wenchen Fan <wenchen@databricks.com>
Closes#21587 from cloud-fan/join.
## What changes were proposed in this pull request?
Wrap the logical plan with a `AnalysisBarrier` for execution plan compilation in CacheManager, in order to avoid the plan being analyzed again.
## How was this patch tested?
Add one test in `DatasetCacheSuite`
Author: Maryann Xue <maryannxue@apache.org>
Closes#21602 from maryannxue/cache-mismatch.
When an output stage is retried, it's possible that tasks from the previous
attempt are still running. In that case, there would be a new task for the
same partition in the new attempt, and the coordinator would allow both
tasks to commit their output since it did not keep track of stage attempts.
The change adds more information to the stage state tracked by the coordinator,
so that only one task is allowed to commit the output in the above case.
The stage state in the coordinator is also maintained across stage retries,
so that a stray speculative task from a previous stage attempt is not allowed
to commit.
This also removes some code added in SPARK-18113 that allowed for duplicate
commit requests; with the RPC code used in Spark 2, that situation cannot
happen, so there is no need to handle it.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#21577 from vanzin/SPARK-24552.
## What changes were proposed in this pull request?
For the function ```def array_contains(column: Column, value: Any): Column ``` , if we pass the `value` parameter as a Column type, it will yield a runtime exception.
This PR proposes a pattern matching to detect if `value` is of type Column. If yes, it will use the .expr of the column, otherwise it will work as it used to.
Same thing for ```array_position, array_remove and element_at``` functions
## How was this patch tested?
Unit test modified to cover this code change.
Ping ueshin
Author: Chongguang LIU <chong@Chongguangs-MacBook-Pro.local>
Closes#21581 from chongguang/SPARK-24574.
## What changes were proposed in this pull request?
In the PR, I propose to automatically convert a `Literal` with `Char` type to a `Literal` of `String` type. Currently, the following code:
```scala
val df = Seq("Amsterdam", "San Francisco", "London").toDF("city")
df.where($"city".contains('o')).show(false)
```
fails with the exception:
```
Unsupported literal type class java.lang.Character o
java.lang.RuntimeException: Unsupported literal type class java.lang.Character o
at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:78)
```
The PR fixes this issue by converting `char` to `string` of length `1`. I believe it makes sense to does not differentiate `char` and `string(1)` in _a unified, multi-language data platform_ like Spark which supports languages like Python/R.
Author: Maxim Gekk <maxim.gekk@databricks.com>
Author: Maxim Gekk <max.gekk@gmail.com>
Closes#21578 from MaxGekk/support-char-literals.
## What changes were proposed in this pull request?
Add array_distinct to remove duplicate value from the array.
## How was this patch tested?
Add unit tests
Author: Huaxin Gao <huaxing@us.ibm.com>
Closes#21050 from huaxingao/spark-23912.
## What changes were proposed in this pull request?
As discussed [before](https://github.com/apache/spark/pull/19193#issuecomment-393726964), this PR prohibits window expressions inside WHERE and HAVING clauses.
## How was this patch tested?
This PR comes with a dedicated unit test.
Author: aokolnychyi <anton.okolnychyi@sap.com>
Closes#21580 from aokolnychyi/spark-24575.
## What changes were proposed in this pull request?
This patch is removing invalid comment from SparkStrategies, given that TODO-like comment is no longer preferred one as the comment: https://github.com/apache/spark/pull/21388#issuecomment-396856235
Removing invalid comment will prevent contributors to spend their times which is not going to be merged.
## How was this patch tested?
N/A
Author: Jungtaek Lim <kabhwan@gmail.com>
Closes#21595 from HeartSaVioR/MINOR-remove-invalid-comment-on-spark-strategies.
## What changes were proposed in this pull request?
Change insert input schema type: "insertRelationType" -> "insertRelationType.asNullable", in order to avoid nullable being overridden.
## How was this patch tested?
Added one test in InsertSuite.
Author: Maryann Xue <maryannxue@apache.org>
Closes#21585 from maryannxue/spark-24583.
## What changes were proposed in this pull request?
Currently, the micro-batches in the MicroBatchExecution is not exposed to the user through any public API. This was because we did not want to expose the micro-batches, so that all the APIs we expose, we can eventually support them in the Continuous engine. But now that we have better sense of buiding a ContinuousExecution, I am considering adding APIs which will run only the MicroBatchExecution. I have quite a few use cases where exposing the microbatch output as a dataframe is useful.
- Pass the output rows of each batch to a library that is designed only the batch jobs (example, uses many ML libraries need to collect() while learning).
- Reuse batch data sources for output whose streaming version does not exists (e.g. redshift data source).
- Writer the output rows to multiple places by writing twice for each batch. This is not the most elegant thing to do for multiple-output streaming queries but is likely to be better than running two streaming queries processing the same data twice.
The proposal is to add a method `foreachBatch(f: Dataset[T] => Unit)` to Scala/Java/Python `DataStreamWriter`.
## How was this patch tested?
New unit tests.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#21571 from tdas/foreachBatch.
## What changes were proposed in this pull request?
Currently, ReusedExchange and InMemoryTableScanExec only rewrite output partitioning if child's partitioning is HashPartitioning and do nothing for other partitioning, e.g., RangePartitioning. We should always rewrite it, otherwise, unnecessary shuffle could be introduced like https://issues.apache.org/jira/browse/SPARK-24556.
## How was this patch tested?
Add new tests.
Author: yucai <yyu1@ebay.com>
Closes#21564 from yucai/SPARK-24556.
## What changes were proposed in this pull request?
test("withColumn doesn't invalidate cached dataframe") in CachedTableSuite doesn't not work because:
The UDF is executed and test count incremented when "df.cache()" is called and the subsequent "df.collect()" has no effect on the test result.
This PR fixed this test and add another test for caching UDF.
## How was this patch tested?
Add new tests.
Author: Li Jin <ice.xelloss@gmail.com>
Closes#21531 from icexelloss/fix-cache-test.
## What changes were proposed in this pull request?
This is a followup of https://github.com/apache/spark/pull/21503, to completely move operator pushdown to the planner rule.
The code are mostly from https://github.com/apache/spark/pull/21319
## How was this patch tested?
existing tests
Author: Wenchen Fan <wenchen@databricks.com>
Closes#21574 from cloud-fan/followup.
## What changes were proposed in this pull request?
When creating tuple expression encoders, we should give the serializer expressions of tuple items correct names, so we can have correct output schema when we use such tuple encoders.
## How was this patch tested?
Added test.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#21576 from viirya/SPARK-24548.
## What changes were proposed in this pull request?
This pr added a new JSON option `dropFieldIfAllNull ` to ignore column of all null values or empty array/struct during JSON schema inference.
## How was this patch tested?
Added tests in `JsonSuite`.
Author: Takeshi Yamamuro <yamamuro@apache.org>
Author: Xiangrui Meng <meng@databricks.com>
Closes#20929 from maropu/SPARK-23772.
## What changes were proposed in this pull request?
Provide an option to limit number of rows in a MemorySink. Currently, MemorySink and MemorySinkV2 have unbounded size, meaning that if they're used on big data, they can OOM the stream. This change adds a maxMemorySinkRows option to limit how many rows MemorySink and MemorySinkV2 can hold. By default, they are still unbounded.
## How was this patch tested?
Added new unit tests.
Author: Mukul Murthy <mukul.murthy@databricks.com>
Closes#21559 from mukulmurthy/SPARK-24525.
## What changes were proposed in this pull request?
This PR fixes possible overflow in int add or multiply. In particular, their overflows in multiply are detected by [Spotbugs](https://spotbugs.github.io/)
The following assignments may cause overflow in right hand side. As a result, the result may be negative.
```
long = int * int
long = int + int
```
To avoid this problem, this PR performs cast from int to long in right hand side.
## How was this patch tested?
Existing UTs.
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes#21481 from kiszk/SPARK-24452.
## What changes were proposed in this pull request?
This PR adds `foreach` for streaming queries in Python. Users will be able to specify their processing logic in two different ways.
- As a function that takes a row as input.
- As an object that has methods `open`, `process`, and `close` methods.
See the python docs in this PR for more details.
## How was this patch tested?
Added java and python unit tests
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#21477 from tdas/SPARK-24396.
## What changes were proposed in this pull request?
This removes the v2 optimizer rule for push-down and instead pushes filters and required columns when converting to a physical plan, as suggested by marmbrus. This makes the v2 relation cleaner because the output and filters do not change in the logical plan.
A side-effect of this change is that the stats from the logical (optimized) plan no longer reflect pushed filters and projection. This is a temporary state, until the planner gathers stats from the physical plan instead. An alternative to this approach is 9d3a11e68b.
The first commit was proposed in #21262. This PR replaces #21262.
## How was this patch tested?
Existing tests.
Author: Ryan Blue <blue@apache.org>
Closes#21503 from rdblue/SPARK-24478-move-push-down-to-physical-conversion.
## What changes were proposed in this pull request?
In the PR, I propose to support any DataType represented as DDL string for the from_json function. After the changes, it will be possible to specify `MapType` in SQL like:
```sql
select from_json('{"a":1, "b":2}', 'map<string, int>')
```
and in Scala (similar in other languages)
```scala
val in = Seq("""{"a": {"b": 1}}""").toDS()
val schema = "map<string, map<string, int>>"
val out = in.select(from_json($"value", schema, Map.empty[String, String]))
```
## How was this patch tested?
Added a couple sql tests and modified existing tests for Python and Scala. The former tests were modified because it is not imported for them in which format schema for `from_json` is provided.
Author: Maxim Gekk <maxim.gekk@databricks.com>
Closes#21550 from MaxGekk/from_json-ddl-schema.
## What changes were proposed in this pull request?
`EnsureRequirement` in its `reorder` method currently assumes that the same key appears only once in the join condition. This of course might not be the case, and when it is not satisfied, it returns a wrong plan which produces a wrong result of the query.
## How was this patch tested?
added UT
Author: Marco Gaido <marcogaido91@gmail.com>
Closes#21529 from mgaido91/SPARK-24495.
## What changes were proposed in this pull request?
https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit
Implement continuous shuffle write RDD for a single reader partition. (I don't believe any implementation changes are actually required for multiple reader partitions, but this PR is already very large, so I want to exclude those for now to keep the size down.)
## How was this patch tested?
new unit tests
Author: Jose Torres <torres.joseph.f+github@gmail.com>
Closes#21428 from jose-torres/writerTask.
## What changes were proposed in this pull request?
If you construct catalyst trees using `scala.collection.immutable.Stream` you can run into situations where valid transformations do not seem to have any effect. There are two causes for this behavior:
- `Stream` is evaluated lazily. Note that default implementation will generally only evaluate a function for the first element (this makes testing a bit tricky).
- `TreeNode` and `QueryPlan` use side effects to detect if a tree has changed. Mapping over a stream is lazy and does not need to trigger this side effect. If this happens the node will invalidly assume that it did not change and return itself instead if the newly created node (this is for GC reasons).
This PR fixes this issue by forcing materialization on streams in `TreeNode` and `QueryPlan`.
## How was this patch tested?
Unit tests were added to `TreeNodeSuite` and `LogicalPlanSuite`. An integration test was added to the `PlannerSuite`
Author: Herman van Hovell <hvanhovell@databricks.com>
Closes#21539 from hvanhovell/SPARK-24500.