## What changes were proposed in this pull request?
Add a flag to ignore corrupt files. For Spark core, the configuration is `spark.files.ignoreCorruptFiles`. For Spark SQL, it's `spark.sql.files.ignoreCorruptFiles`.
## How was this patch tested?
The added unit tests
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#15422 from zsxwing/SPARK-17850.
## What changes were proposed in this pull request?
This change adds a check in castToInterval method of Cast expression , such that if converted value is null , then isNull variable should be set to true.
Earlier, the expression Cast(Literal(), CalendarIntervalType) was throwing NullPointerException because of the above mentioned reason.
## How was this patch tested?
Added test case in CastSuite.scala
jira entry for detail: https://issues.apache.org/jira/browse/SPARK-17884
Author: prigarg <prigarg@adobe.com>
Closes#15449 from priyankagargnitk/SPARK-17884.
## What changes were proposed in this pull request?
SQLConf is session-scoped and mutable. However, we do have the requirement for a static SQL conf, which is global and immutable, e.g. the `schemaStringThreshold` in `HiveExternalCatalog`, the flag to enable/disable hive support, the global temp view database in https://github.com/apache/spark/pull/14897.
Actually we've already implemented static SQL conf implicitly via `SparkConf`, this PR just make it explicit and expose it to users, so that they can see the config value via SQL command or `SparkSession.conf`, and forbid users to set/unset static SQL conf.
## How was this patch tested?
new tests in SQLConfSuite
Author: Wenchen Fan <wenchen@databricks.com>
Closes#15295 from cloud-fan/global-conf.
## What changes were proposed in this pull request?
Currently `Canonicalize` object doesn't support `And` and `Or`. So we can compare canonicalized form of predicates consistently. We should add the support.
## How was this patch tested?
Jenkins tests.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#15388 from viirya/canonicalize-and-or.
## What changes were proposed in this pull request?
The data type API has not been changed since Spark 1.3.0, and is ready for graduation. This patch marks them as stable APIs using the new InterfaceStability annotation.
This patch also looks at the various files in the catalyst module (not the "package") and marks the remaining few classes appropriately as well.
## How was this patch tested?
This is an annotation change. No functional changes.
Author: Reynold Xin <rxin@databricks.com>
Closes#15426 from rxin/SPARK-17864.
## What changes were proposed in this pull request?
address post hoc review comments for https://github.com/apache/spark/pull/14897
## How was this patch tested?
N/A
Author: Wenchen Fan <wenchen@databricks.com>
Closes#15424 from cloud-fan/global-temp-view.
## What changes were proposed in this pull request?
When I was creating the example code for SPARK-10496, I realized it was pretty convoluted to define the frame boundaries for window functions when there is no partition column or ordering column. The reason is that we don't provide a way to create a WindowSpec directly with the frame boundaries. We can trivially improve this by adding rowsBetween and rangeBetween to Window object.
As an example, to compute cumulative sum using the natural ordering, before this pr:
```
df.select('key, sum("value").over(Window.partitionBy(lit(1)).rowsBetween(Long.MinValue, 0)))
```
After this pr:
```
df.select('key, sum("value").over(Window.rowsBetween(Long.MinValue, 0)))
```
Note that you could argue there is no point specifying a window frame without partitionBy/orderBy -- but it is strange that only rowsBetween and rangeBetween are not the only two APIs not available.
This also fixes https://issues.apache.org/jira/browse/SPARK-17656 (removing _root_.scala).
## How was this patch tested?
Added test cases to compute cumulative sum in DataFrameWindowSuite for Scala/Java and tests.py for Python.
Author: Reynold Xin <rxin@databricks.com>
Closes#15412 from rxin/SPARK-17844.
## What changes were proposed in this pull request?
This PR proposes to fix arbitrary usages among `Map[String, String]`, `Properties` and `JDBCOptions` instances for options in `execution/jdbc` package and make the connection properties exclude Spark-only options.
This PR includes some changes as below:
- Unify `Map[String, String]`, `Properties` and `JDBCOptions` in `execution/jdbc` package to `JDBCOptions`.
- Move `batchsize`, `fetchszie`, `driver` and `isolationlevel` options into `JDBCOptions` instance.
- Document `batchSize` and `isolationlevel` with marking both read-only options and write-only options. Also, this includes minor types and detailed explanation for some statements such as url.
- Throw exceptions fast by checking arguments first rather than in execution time (e.g. for `fetchsize`).
- Exclude Spark-only options in connection properties.
## How was this patch tested?
Existing tests should cover this.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#15292 from HyukjinKwon/SPARK-17719.
## What changes were proposed in this pull request?
Currently, CSV datasource allows to load duplicated empty string fields or fields having `nullValue` in the header. It'd be great if this can deal with normal fields as well.
This PR proposes handling the duplicates consistently with the existing behaviour with considering case-sensitivity (`spark.sql.caseSensitive`) as below:
data below:
```
fieldA,fieldB,,FIELDA,fielda,,
1,2,3,4,5,6,7
```
is parsed as below:
```scala
spark.read.format("csv").option("header", "true").load("test.csv").show()
```
- when `spark.sql.caseSensitive` is `false` (by default).
```
+-------+------+---+-------+-------+---+---+
|fieldA0|fieldB|_c2|FIELDA3|fieldA4|_c5|_c6|
+-------+------+---+-------+-------+---+---+
| 1| 2| 3| 4| 5| 6| 7|
+-------+------+---+-------+-------+---+---+
```
- when `spark.sql.caseSensitive` is `true`.
```
+-------+------+---+-------+-------+---+---+
|fieldA0|fieldB|_c2| FIELDA|fieldA4|_c5|_c6|
+-------+------+---+-------+-------+---+---+
| 1| 2| 3| 4| 5| 6| 7|
+-------+------+---+-------+-------+---+---+
```
**In more details**,
There is a good reference about this problem, `read.csv()` in R. So, I initially wanted to propose the similar behaviour.
In case of R, the CSV data below:
```
fieldA,fieldB,,fieldA,fieldA,,
1,2,3,4,5,6,7
```
is parsed as below:
```r
test <- read.csv(file="test.csv",header=TRUE,sep=",")
> test
fieldA fieldB X fieldA.1 fieldA.2 X.1 X.2
1 1 2 3 4 5 6 7
```
However, Spark CSV datasource already is handling duplicated empty strings and `nullValue` as field names. So the data below:
```
,,,fieldA,,fieldB,
1,2,3,4,5,6,7
```
is parsed as below:
```scala
spark.read.format("csv").option("header", "true").load("test.csv").show()
```
```
+---+---+---+------+---+------+---+
|_c0|_c1|_c2|fieldA|_c4|fieldB|_c6|
+---+---+---+------+---+------+---+
| 1| 2| 3| 4| 5| 6| 7|
+---+---+---+------+---+------+---+
```
R starts the number for each duplicate but Spark adds the number for its position for all fields for `nullValue` and empty strings.
In terms of case-sensitivity, it seems R is case-sensitive as below: (it seems it is not configurable).
```
a,a,a,A,A
1,2,3,4,5
```
is parsed as below:
```r
test <- read.csv(file="test.csv",header=TRUE,sep=",")
> test
a a.1 a.2 A A.1
1 1 2 3 4 5
```
## How was this patch tested?
Unit test in `CSVSuite`.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#14745 from HyukjinKwon/SPARK-16896.
## What changes were proposed in this pull request?
The default buffer size is not big enough for randomly generated MapType.
## How was this patch tested?
Ran the tests in 100 times, it never fail (it fail 8 times before the patch).
Author: Davies Liu <davies@databricks.com>
Closes#15395 from davies/flaky_map.
## What changes were proposed in this pull request?
This patch annotates the InterfaceStability level for top level classes in o.a.spark.sql and o.a.spark.sql.util packages, to experiment with this new annotation.
## How was this patch tested?
N/A
Author: Reynold Xin <rxin@databricks.com>
Closes#15392 from rxin/SPARK-17830.
## What changes were proposed in this pull request?
The function `SparkSqlParserSuite.createTempViewUsing` is not used for now and causes build failure, this PR simply removes it.
## How was this patch tested?
N/A
Author: jiangxingbo <jiangxb1987@gmail.com>
Closes#15418 from jiangxb1987/parserSuite.
## What changes were proposed in this pull request?
Global temporary view is a cross-session temporary view, which means it's shared among all sessions. Its lifetime is the lifetime of the Spark application, i.e. it will be automatically dropped when the application terminates. It's tied to a system preserved database `global_temp`(configurable via SparkConf), and we must use the qualified name to refer a global temp view, e.g. SELECT * FROM global_temp.view1.
changes for `SessionCatalog`:
1. add a new field `gloabalTempViews: GlobalTempViewManager`, to access the shared global temp views, and the global temp db name.
2. `createDatabase` will fail if users wanna create `global_temp`, which is system preserved.
3. `setCurrentDatabase` will fail if users wanna set `global_temp`, which is system preserved.
4. add `createGlobalTempView`, which is used in `CreateViewCommand` to create global temp views.
5. add `dropGlobalTempView`, which is used in `CatalogImpl` to drop global temp view.
6. add `alterTempViewDefinition`, which is used in `AlterViewAsCommand` to update the view definition for local/global temp views.
7. `renameTable`/`dropTable`/`isTemporaryTable`/`lookupRelation`/`getTempViewOrPermanentTableMetadata`/`refreshTable` will handle global temp views.
changes for SQL commands:
1. `CreateViewCommand`/`AlterViewAsCommand` is updated to support global temp views
2. `ShowTablesCommand` outputs a new column `database`, which is used to distinguish global and local temp views.
3. other commands can also handle global temp views if they call `SessionCatalog` APIs which accepts global temp views, e.g. `DropTableCommand`, `AlterTableRenameCommand`, `ShowColumnsCommand`, etc.
changes for other public API
1. add a new method `dropGlobalTempView` in `Catalog`
2. `Catalog.findTable` can find global temp view
3. add a new method `createGlobalTempView` in `Dataset`
## How was this patch tested?
new tests in `SQLViewSuite`
Author: Wenchen Fan <wenchen@databricks.com>
Closes#14897 from cloud-fan/global-temp-view.
## What changes were proposed in this pull request?
Currently we use the same rule to parse top level and nested data fields. For example:
```
create table tbl_x(
id bigint,
nested struct<col1:string,col2:string>
)
```
Shows both syntaxes. In this PR we split this rule in a top-level and nested rule.
Before this PR,
```
sql("CREATE TABLE my_tab(column1: INT)")
```
works fine.
After this PR, it will throw a `ParseException`:
```
scala> sql("CREATE TABLE my_tab(column1: INT)")
org.apache.spark.sql.catalyst.parser.ParseException:
no viable alternative at input 'CREATE TABLE my_tab(column1:'(line 1, pos 27)
```
## How was this patch tested?
Add new testcases in `SparkSqlParserSuite`.
Author: jiangxingbo <jiangxb1987@gmail.com>
Closes#15346 from jiangxb1987/cdt.
## What changes were proposed in this pull request?
The `quotedString` method in `TableIdentifier` and `FunctionIdentifier` produce an illegal (un-parseable) name when the name contains a backtick. For example:
```
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
val complexName = TableIdentifier("`weird`table`name", Some("`d`b`1"))
parseTableIdentifier(complexName.unquotedString) // Does not work
parseTableIdentifier(complexName.quotedString) // Does not work
parseExpression(complexName.unquotedString) // Does not work
parseExpression(complexName.quotedString) // Does not work
```
We should handle the backtick properly to make `quotedString` parseable.
## How was this patch tested?
Add new testcases in `TableIdentifierParserSuite` and `ExpressionParserSuite`.
Author: jiangxingbo <jiangxb1987@gmail.com>
Closes#15403 from jiangxb1987/backtick.
## What changes were proposed in this pull request?
This PR modified the test case `test("script")` to use resource path for `test_script.sh`. Make the test case portable (even in IntelliJ).
## How was this patch tested?
Passed the test case.
Before:
Run `test("script")` in IntelliJ:
```
Caused by: org.apache.spark.SparkException: Subprocess exited with status 127. Error: bash: src/test/resources/test_script.sh: No such file or directory
```
After:
Test passed.
Author: Weiqing Yang <yangweiqing001@gmail.com>
Closes#15246 from weiqingy/hivetest.
## What changes were proposed in this pull request?
This PR proposes the fix the use of `contains` API which only exists from Scala 2.11.
## How was this patch tested?
Manually checked:
```scala
scala> val o: Option[Boolean] = None
o: Option[Boolean] = None
scala> o == Some(false)
res17: Boolean = false
scala> val o: Option[Boolean] = Some(true)
o: Option[Boolean] = Some(true)
scala> o == Some(false)
res18: Boolean = false
scala> val o: Option[Boolean] = Some(false)
o: Option[Boolean] = Some(false)
scala> o == Some(false)
res19: Boolean = true
```
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#15393 from HyukjinKwon/hotfix.
## What changes were proposed in this pull request?
In HashJoin, we try to rewrite the join key as Long to improve the performance of finding a match. The rewriting part is not well tested, has a bug that could cause wrong result when there are at least three integral columns in the joining key also the total length of the key exceed 8 bytes.
## How was this patch tested?
Added unit test to covering the rewriting with different number of columns and different data types. Manually test the reported case and confirmed that this PR fix the bug.
Author: Davies Liu <davies@databricks.com>
Closes#15390 from davies/rewrite_key.
## What changes were proposed in this pull request?
In practice we cannot guarantee that an `InternalRow` is immutable. This makes the `MutableRow` almost redundant. This PR folds `MutableRow` into `InternalRow`.
The code below illustrates the immutability issue with InternalRow:
```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
val struct = new GenericMutableRow(1)
val row = InternalRow(struct, 1)
println(row)
scala> [[null], 1]
struct.setInt(0, 42)
println(row)
scala> [[42], 1]
```
This might be somewhat controversial, so feedback is appreciated.
## How was this patch tested?
Existing tests.
Author: Herman van Hovell <hvanhovell@databricks.com>
Closes#15333 from hvanhovell/SPARK-17761.
## What changes were proposed in this pull request?
When execute a Python UDF, we buffer the input row into as queue, then pull them out to join with the result from Python UDF. In the case that Python UDF is slow or the input row is too wide, we could ran out of memory because of the queue. Since we can't flush all the buffers (sockets) between JVM and Python process from JVM side, we can't limit the rows in the queue, otherwise it could deadlock.
This PR will manage the memory used by the queue, spill that into disk when there is no enough memory (also release the memory and disk space as soon as possible).
## How was this patch tested?
Added unit tests. Also manually ran a workload with large input row and slow python UDF (with large broadcast) like this:
```
b = range(1<<24)
add = udf(lambda x: x + len(b), IntegerType())
df = sqlContext.range(1, 1<<26, 1, 4)
print df.select(df.id, lit("adf"*10000).alias("s"), add(df.id).alias("add")).groupBy(length("s")).sum().collect()
```
It ran out of memory (hang because of full GC) before the patch, ran smoothly after the patch.
Author: Davies Liu <davies@databricks.com>
Closes#15089 from davies/spill_udf.
## What changes were proposed in this pull request?
Adds the textFile API which exists in DataFrameReader and serves same purpose.
## How was this patch tested?
Added corresponding testcase.
Author: Prashant Sharma <prashsh1@in.ibm.com>
Closes#14087 from ScrapCodes/textFile.
## What changes were proposed in this pull request?
This PR proposes cleaning up the confusing part in `createRelation` as discussed in https://github.com/apache/spark/pull/12601/files#r80627940
Also, this PR proposes the changes below:
- Add documentation for `batchsize` and `isolationLevel`.
- Move property names into `JDBCOptions` so that they can be managed in a single place. which were, `fetchsize`, `batchsize`, `isolationLevel` and `driver`.
## How was this patch tested?
Existing tests should cover this.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#15263 from HyukjinKwon/SPARK-14525.
## What changes were proposed in this pull request?
This expands calls to Jetty's simple `ServerConnector` constructor to explicitly specify a `ScheduledExecutorScheduler` that makes daemon threads. It should otherwise result in exactly the same configuration, because the other args are copied from the constructor that is currently called.
(I'm not sure we should change the Hive Thriftserver impl, but I did anyway.)
This also adds `sc.stop()` to the quick start guide example.
## How was this patch tested?
Existing tests; _pending_ at least manual verification of the fix.
Author: Sean Owen <sowen@cloudera.com>
Closes#15381 from srowen/SPARK-17707.
## What changes were proposed in this pull request?
When using an incompatible source for structured streaming, it may throw NoClassDefFoundError. It's better to just catch Throwable and report it to the user since the streaming thread is dying.
## How was this patch tested?
`test("NoClassDefFoundError from an incompatible source")`
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#15352 from zsxwing/SPARK-17780.
## What changes were proposed in this pull request?
I was looking through API annotations to catch mislabeled APIs, and realized DataStreamReader and DataStreamWriter classes are already annotated as Experimental, and as a result there is no need to annotate each method within them.
## How was this patch tested?
N/A
Author: Reynold Xin <rxin@databricks.com>
Closes#15373 from rxin/SPARK-17798.
## What changes were proposed in this pull request?
Currently, Spark raises `RuntimeException` when creating a view with timestamp with INTERVAL arithmetic like the following. The root cause is the arithmetic expression, `TimeAdd`, was transformed into `timeadd` function as a VIEW definition. This PR fixes the SQL definition of `TimeAdd` and `TimeSub` expressions.
```scala
scala> sql("CREATE TABLE dates (ts TIMESTAMP)")
scala> sql("CREATE VIEW view1 AS SELECT ts + INTERVAL 1 DAY FROM dates")
java.lang.RuntimeException: Failed to analyze the canonicalized SQL: ...
```
## How was this patch tested?
Pass Jenkins with a new testcase.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#15318 from dongjoon-hyun/SPARK-17750.
## What changes were proposed in this pull request?
Generate the sql test jar to fix the maven build
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#15368 from zsxwing/sql-test-jar.
## What changes were proposed in this pull request?
This PR adds a new project ` external/kafka-0-10-sql` for Structured Streaming Kafka source.
It's based on the design doc: https://docs.google.com/document/d/19t2rWe51x7tq2e5AOfrsM9qb8_m7BRuv9fel9i0PqR8/edit?usp=sharing
tdas did most of work and part of them was inspired by koeninger's work.
### Introduction
The Kafka source is a structured streaming data source to poll data from Kafka. The schema of reading data is as follows:
Column | Type
---- | ----
key | binary
value | binary
topic | string
partition | int
offset | long
timestamp | long
timestampType | int
The source can deal with deleting topics. However, the user should make sure there is no Spark job processing the data when deleting a topic.
### Configuration
The user can use `DataStreamReader.option` to set the following configurations.
Kafka Source's options | value | default | meaning
------ | ------- | ------ | -----
startingOffset | ["earliest", "latest"] | "latest" | The start point when a query is started, either "earliest" which is from the earliest offset, or "latest" which is just from the latest offset. Note: This only applies when a new Streaming query is started, and that resuming will always pick up from where the query left off.
failOnDataLost | [true, false] | true | Whether to fail the query when it's possible that data is lost (e.g., topics are deleted, or offsets are out of range). This may be a false alarm. You can disable it when it doesn't work as you expected.
subscribe | A comma-separated list of topics | (none) | The topic list to subscribe. Only one of "subscribe" and "subscribeParttern" options can be specified for Kafka source.
subscribePattern | Java regex string | (none) | The pattern used to subscribe the topic. Only one of "subscribe" and "subscribeParttern" options can be specified for Kafka source.
kafka.consumer.poll.timeoutMs | long | 512 | The timeout in milliseconds to poll data from Kafka in executors
fetchOffset.numRetries | int | 3 | Number of times to retry before giving up fatch Kafka latest offsets.
fetchOffset.retryIntervalMs | long | 10 | milliseconds to wait before retrying to fetch Kafka offsets
Kafka's own configurations can be set via `DataStreamReader.option` with `kafka.` prefix, e.g, `stream.option("kafka.bootstrap.servers", "host:port")`
### Usage
* Subscribe to 1 topic
```Scala
spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host:port")
.option("subscribe", "topic1")
.load()
```
* Subscribe to multiple topics
```Scala
spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host:port")
.option("subscribe", "topic1,topic2")
.load()
```
* Subscribe to a pattern
```Scala
spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host:port")
.option("subscribePattern", "topic.*")
.load()
```
## How was this patch tested?
The new unit tests.
Author: Shixiong Zhu <shixiong@databricks.com>
Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: Shixiong Zhu <zsxwing@gmail.com>
Author: cody koeninger <cody@koeninger.org>
Closes#15102 from zsxwing/kafka-source.
## What changes were proposed in this pull request?
The result of the `Last` function can be wrong when the last partition processed is empty. It can return `null` instead of the expected value. For example, this can happen when we process partitions in the following order:
```
- Partition 1 [Row1, Row2]
- Partition 2 [Row3]
- Partition 3 []
```
In this case the `Last` function will currently return a null, instead of the value of `Row3`.
This PR fixes this by adding a `valueSet` flag to the `Last` function.
## How was this patch tested?
We only used end to end tests for `DeclarativeAggregateFunction`s. I have added an evaluator for these functions so we can tests them in catalyst. I have added a `LastTestSuite` to test the `Last` aggregate function.
Author: Herman van Hovell <hvanhovell@databricks.com>
Closes#15348 from hvanhovell/SPARK-17758.
## What changes were proposed in this pull request?
This PR fixes the following NPE scenario in two ways.
**Reported Error Scenario**
```scala
scala> sql("EXPLAIN DESCRIBE TABLE x").show(truncate = false)
INFO SparkSqlParser: Parsing command: EXPLAIN DESCRIBE TABLE x
java.lang.NullPointerException
```
- **DESCRIBE**: Extend `DESCRIBE` syntax to accept `TABLE`.
- **EXPLAIN**: Prevent NPE in case of the parsing failure of target statement, e.g., `EXPLAIN DESCRIBE TABLES x`.
## How was this patch tested?
Pass the Jenkins test with a new test case.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#15357 from dongjoon-hyun/SPARK-17328.
## What changes were proposed in this pull request?
Currently Spark SQL parses regular decimal literals (e.g. `10.00`) as decimals and scientific decimal literals (e.g. `10.0e10`) as doubles. The difference between the two confuses most users. This PR unifies the parsing behavior and also parses scientific decimal literals as decimals.
This implications in tests are limited to a single Hive compatibility test.
## How was this patch tested?
Updated tests in `ExpressionParserSuite` and `SQLQueryTestSuite`.
Author: Herman van Hovell <hvanhovell@databricks.com>
Closes#14828 from hvanhovell/SPARK-17258.
This reverts commit 9ac68dbc57. Turns out
the original fix was correct.
Original change description:
The existing code caches all stats for all columns for each partition
in the driver; for a large relation, this causes extreme memory usage,
which leads to gc hell and application failures.
It seems that only the size in bytes of the data is actually used in the
driver, so instead just colllect that. In executors, the full stats are
still kept, but that's not a big problem; we expect the data to be distributed
and thus not really incur in too much memory pressure in each individual
executor.
There are also potential improvements on the executor side, since the data
being stored currently is very wasteful (e.g. storing boxed types vs.
primitive types for stats). But that's a separate issue.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#15304 from vanzin/SPARK-17549.2.
## What changes were proposed in this pull request?
Made changes to record length offsets to make them uniform throughout various areas of Spark core and unsafe
## How was this patch tested?
This change affects only SPARC architectures and was tested on X86 architectures as well for regression.
Author: sumansomasundar <suman.somasundar@oracle.com>
Closes#14762 from sumansomasundar/master.
## What changes were proposed in this pull request?
Added VoidObjectInspector to the list of PrimitiveObjectInspectors
## How was this patch tested?
(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
Executing following query was failing.
select SOME_UDAF*(a.arr)
from (
select Array(null) as arr from dim_one_row
) a
After the fix, I am getting the correct output:
res0: Array[org.apache.spark.sql.Row] = Array([null])
Author: Ergin Seyfe <eseyfe@fb.com>
Closes#15337 from seyfe/add_void_object_inspector.
## What changes were proposed in this pull request?
Code generation including too many mutable states exceeds JVM size limit to extract values from `references` into fields in the constructor.
We should split the generated extractions in the constructor into smaller functions.
## How was this patch tested?
I added some tests to check if the generated codes for the expressions exceed or not.
Author: Takuya UESHIN <ueshin@happy-camper.st>
Closes#15275 from ueshin/issues/SPARK-17702.
## What changes were proposed in this pull request?
We currently only allow relatively simple expressions as the input for a value based case statement. Expressions like `case (a > 1) or (b = 2) when true then 1 when false then 0 end` currently fail. This PR adds support for such expressions.
## How was this patch tested?
Added a test to the ExpressionParserSuite.
Author: Herman van Hovell <hvanhovell@databricks.com>
Closes#15322 from hvanhovell/SPARK-17753.
## What changes were proposed in this pull request?
Generate basic column statistics for all the atomic types:
- numeric types: max, min, num of nulls, ndv (number of distinct values)
- date/timestamp types: they are also represented as numbers internally, so they have the same stats as above.
- string: avg length, max length, num of nulls, ndv
- binary: avg length, max length, num of nulls
- boolean: num of nulls, num of trues, num of falsies
Also support storing and loading these statistics.
One thing to notice:
We support analyzing columns independently, e.g.:
sql1: `ANALYZE TABLE src COMPUTE STATISTICS FOR COLUMNS key;`
sql2: `ANALYZE TABLE src COMPUTE STATISTICS FOR COLUMNS value;`
when running sql2 to collect column stats for `value`, we don’t remove stats of columns `key` which are analyzed in sql1 and not in sql2. As a result, **users need to guarantee consistency** between sql1 and sql2. If the table has been changed before sql2, users should re-analyze column `key` when they want to analyze column `value`:
`ANALYZE TABLE src COMPUTE STATISTICS FOR COLUMNS key, value;`
## How was this patch tested?
add unit tests
Author: Zhenhua Wang <wzh_zju@163.com>
Closes#15090 from wzhfy/colStats.
## What changes were proposed in this pull request?
This PR proposes to fix/skip some tests failed on Windows. This PR takes over https://github.com/apache/spark/pull/12696.
**Before**
- **SparkSubmitSuite**
```
[info] - launch simple application with spark-submit *** FAILED *** (202 milliseconds)
[info] java.io.IOException: Cannot run program "./bin/spark-submit" (in directory "C:\projects\spark"): CreateProcess error=2, The system cannot find the file specifie
[info] - includes jars passed in through --jars *** FAILED *** (1 second, 625 milliseconds)
[info] java.io.IOException: Cannot run program "./bin/spark-submit" (in directory "C:\projects\spark"): CreateProcess error=2, The system cannot find the file specified
```
- **DiskStoreSuite**
```
[info] - reads of memory-mapped and non memory-mapped files are equivalent *** FAILED *** (1 second, 78 milliseconds)
[info] diskStoreMapped.remove(blockId) was false (DiskStoreSuite.scala:41)
```
**After**
- **SparkSubmitSuite**
```
[info] - launch simple application with spark-submit (578 milliseconds)
[info] - includes jars passed in through --jars (1 second, 875 milliseconds)
```
- **DiskStoreSuite**
```
[info] DiskStoreSuite:
[info] - reads of memory-mapped and non memory-mapped files are equivalent !!! CANCELED !!! (766 milliseconds
```
For `CreateTableAsSelectSuite` and `FsHistoryProviderSuite`, I could not reproduce as the Java version seems higher than the one that has the bugs about `setReadable(..)` and `setWritable(...)` but as they are bugs reported clearly, it'd be sensible to skip those. We should revert the changes for both back as soon as we drop the support of Java 7.
## How was this patch tested?
Manually tested via AppVeyor.
Closes#12696
Author: Tao LI <tl@microsoft.com>
Author: U-FAREAST\tl <tl@microsoft.com>
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#15320 from HyukjinKwon/SPARK-14914.
## What changes were proposed in this pull request?
When wrapping catalyst datatypes to Hive data type, wrap function was doing an expensive pattern matching which was consuming around 11% of cpu time. Avoid the pattern matching by returning the wrapper only once and reuse it.
## How was this patch tested?
Tested by running the job on cluster and saw around 8% cpu improvements.
Author: Sital Kedia <skedia@fb.com>
Closes#15064 from sitalkedia/skedia/hive_wrapper.
## What changes were proposed in this pull request?
We added find and exists methods for Databases, Tables and Functions to the user facing Catalog in PR https://github.com/apache/spark/pull/15301. However, it was brought up that the semantics of the `find` methods are more in line a `get` method (get an object or else fail). So we rename these in this PR.
## How was this patch tested?
Existing tests.
Author: Herman van Hovell <hvanhovell@databricks.com>
Closes#15308 from hvanhovell/SPARK-17717-2.
## What changes were proposed in this pull request?
As a followup to SPARK-17666, ensure filesystem connections are not leaked at least in unit tests. This is done here by intercepting filesystem calls as suggested by JoshRosen . At the end of each test, we assert no filesystem streams are left open.
This applies to all tests using SharedSQLContext or SharedSparkContext.
## How was this patch tested?
I verified that tests in sql and core are indeed using the filesystem backend, and fixed the detected leaks. I also checked that reverting https://github.com/apache/spark/pull/15245 causes many actual test failures due to connection leaks.
Author: Eric Liang <ekl@databricks.com>
Author: Eric Liang <ekhliang@gmail.com>
Closes#15306 from ericl/sc-4672.
## What changes were proposed in this pull request?
Currently, Spark does not collapse adjacent windows with the same partitioning and sorting. This PR implements `CollapseWindow` optimizer to do the followings.
1. If the partition specs and order specs are the same, collapse into the parent.
2. If the partition specs are the same and one order spec is a prefix of the other, collapse to the more specific one.
For example:
```scala
val df = spark.range(1000).select($"id" % 100 as "grp", $"id", rand() as "col1", rand() as "col2")
// Add summary statistics for all columns
import org.apache.spark.sql.expressions.Window
val cols = Seq("id", "col1", "col2")
val window = Window.partitionBy($"grp").orderBy($"id")
val result = cols.foldLeft(df) { (base, name) =>
base.withColumn(s"${name}_avg", avg(col(name)).over(window))
.withColumn(s"${name}_stddev", stddev(col(name)).over(window))
.withColumn(s"${name}_min", min(col(name)).over(window))
.withColumn(s"${name}_max", max(col(name)).over(window))
}
```
**Before**
```scala
scala> result.explain
== Physical Plan ==
Window [max(col2#19) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col2_max#234], [grp#17L], [id#14L ASC NULLS FIRST]
+- Window [min(col2#19) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col2_min#216], [grp#17L], [id#14L ASC NULLS FIRST]
+- Window [stddev_samp(col2#19) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col2_stddev#191], [grp#17L], [id#14L ASC NULLS FIRST]
+- Window [avg(col2#19) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col2_avg#167], [grp#17L], [id#14L ASC NULLS FIRST]
+- Window [max(col1#18) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col1_max#152], [grp#17L], [id#14L ASC NULLS FIRST]
+- Window [min(col1#18) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col1_min#138], [grp#17L], [id#14L ASC NULLS FIRST]
+- Window [stddev_samp(col1#18) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col1_stddev#117], [grp#17L], [id#14L ASC NULLS FIRST]
+- Window [avg(col1#18) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col1_avg#97], [grp#17L], [id#14L ASC NULLS FIRST]
+- Window [max(id#14L) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS id_max#86L], [grp#17L], [id#14L ASC NULLS FIRST]
+- Window [min(id#14L) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS id_min#76L], [grp#17L], [id#14L ASC NULLS FIRST]
+- *Project [grp#17L, id#14L, col1#18, col2#19, id_avg#26, id_stddev#42]
+- Window [stddev_samp(_w0#59) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS id_stddev#42], [grp#17L], [id#14L ASC NULLS FIRST]
+- *Project [grp#17L, id#14L, col1#18, col2#19, id_avg#26, cast(id#14L as double) AS _w0#59]
+- Window [avg(id#14L) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS id_avg#26], [grp#17L], [id#14L ASC NULLS FIRST]
+- *Sort [grp#17L ASC NULLS FIRST, id#14L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(grp#17L, 200)
+- *Project [(id#14L % 100) AS grp#17L, id#14L, rand(-6329949029880411066) AS col1#18, rand(-7251358484380073081) AS col2#19]
+- *Range (0, 1000, step=1, splits=Some(8))
```
**After**
```scala
scala> result.explain
== Physical Plan ==
Window [max(col2#5) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col2_max#220, min(col2#5) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col2_min#202, stddev_samp(col2#5) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col2_stddev#177, avg(col2#5) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col2_avg#153, max(col1#4) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col1_max#138, min(col1#4) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col1_min#124, stddev_samp(col1#4) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col1_stddev#103, avg(col1#4) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col1_avg#83, max(id#0L) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS id_max#72L, min(id#0L) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS id_min#62L], [grp#3L], [id#0L ASC NULLS FIRST]
+- *Project [grp#3L, id#0L, col1#4, col2#5, id_avg#12, id_stddev#28]
+- Window [stddev_samp(_w0#45) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS id_stddev#28], [grp#3L], [id#0L ASC NULLS FIRST]
+- *Project [grp#3L, id#0L, col1#4, col2#5, id_avg#12, cast(id#0L as double) AS _w0#45]
+- Window [avg(id#0L) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS id_avg#12], [grp#3L], [id#0L ASC NULLS FIRST]
+- *Sort [grp#3L ASC NULLS FIRST, id#0L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(grp#3L, 200)
+- *Project [(id#0L % 100) AS grp#3L, id#0L, rand(6537478539664068821) AS col1#4, rand(-8961093871295252795) AS col2#5]
+- *Range (0, 1000, step=1, splits=Some(8))
```
## How was this patch tested?
Pass the Jenkins tests with a newly added testsuite.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#15317 from dongjoon-hyun/SPARK-17739.
## What changes were proposed in this pull request?
There are many minor objects in references, which are extracted to the generated class field, e.g. `errMsg` in `GetExternalRowField` or `ValidateExternalType`, but number of fields in class is limited so we should reduce the number.
This pr adds unnamed version of `addReferenceObj` for these minor objects not to store the object into field but refer it from the `references` field at the time of use.
## How was this patch tested?
Existing tests.
Author: Takuya UESHIN <ueshin@happy-camper.st>
Closes#15276 from ueshin/issues/SPARK-17703.
## What changes were proposed in this pull request?
The actualSize() of array and map is different from the actual size, the header is Int, rather than Long.
## How was this patch tested?
The flaky test should be fixed.
Author: Davies Liu <davies@databricks.com>
Closes#15305 from davies/fix_MAP.
## What changes were proposed in this pull request?
The current user facing catalog does not implement methods for checking object existence or finding objects. You could theoretically do this using the `list*` commands, but this is rather cumbersome and can actually be costly when there are many objects. This PR adds `exists*` and `find*` methods for Databases, Table and Functions.
## How was this patch tested?
Added tests to `org.apache.spark.sql.internal.CatalogSuite`
Author: Herman van Hovell <hvanhovell@databricks.com>
Closes#15301 from hvanhovell/SPARK-17717.
## What changes were proposed in this pull request?
Currently for `Union [Distinct]`, a `Distinct` operator is necessary to be on the top of `Union`. Once there are adjacent `Union [Distinct]`, there will be multiple `Distinct` in the query plan.
E.g.,
For a query like: select 1 a union select 2 b union select 3 c
Before this patch, its physical plan looks like:
*HashAggregate(keys=[a#13], functions=[])
+- Exchange hashpartitioning(a#13, 200)
+- *HashAggregate(keys=[a#13], functions=[])
+- Union
:- *HashAggregate(keys=[a#13], functions=[])
: +- Exchange hashpartitioning(a#13, 200)
: +- *HashAggregate(keys=[a#13], functions=[])
: +- Union
: :- *Project [1 AS a#13]
: : +- Scan OneRowRelation[]
: +- *Project [2 AS b#14]
: +- Scan OneRowRelation[]
+- *Project [3 AS c#15]
+- Scan OneRowRelation[]
Only the top distinct should be necessary.
After this patch, the physical plan looks like:
*HashAggregate(keys=[a#221], functions=[], output=[a#221])
+- Exchange hashpartitioning(a#221, 5)
+- *HashAggregate(keys=[a#221], functions=[], output=[a#221])
+- Union
:- *Project [1 AS a#221]
: +- Scan OneRowRelation[]
:- *Project [2 AS b#222]
: +- Scan OneRowRelation[]
+- *Project [3 AS c#223]
+- Scan OneRowRelation[]
## How was this patch tested?
Jenkins tests.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#15238 from viirya/remove-extra-distinct-union.