Commit graph

5454 commits

Author SHA1 Message Date
Maxim Gekk 027ed2d11b [SPARK-23643][CORE][SQL][ML] Shrinking the buffer in hashSeed up to size of the seed parameter
## What changes were proposed in this pull request?

The hashSeed method allocates 64 bytes instead of 8. Other bytes are always zeros (thanks to default behavior of ByteBuffer). And they could be excluded from hash calculation because they don't differentiate inputs.

## How was this patch tested?

By running the existing tests - XORShiftRandomSuite

Closes #20793 from MaxGekk/hash-buff-size.

Lead-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Co-authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-03-23 11:26:09 -05:00
Ryan Blue 34e3cc7060 [SPARK-27108][SQL] Add parsed SQL plans for create, CTAS.
## What changes were proposed in this pull request?

This moves parsing `CREATE TABLE ... USING` statements into catalyst. Catalyst produces logical plans with the parsed information and those plans are converted to v1 `DataSource` plans in `DataSourceAnalysis`.

This prepares for adding v2 create plans that should receive the information parsed from SQL without being translated to v1 plans first.

This also makes it possible to parse in catalyst instead of breaking the parser across the abstract `AstBuilder` in catalyst and `SparkSqlParser` in core.

For more information, see the [mailing list thread](https://lists.apache.org/thread.html/54f4e1929ceb9a2b0cac7cb058000feb8de5d6c667b2e0950804c613%3Cdev.spark.apache.org%3E).

## How was this patch tested?

This uses existing tests to catch regressions. This introduces no behavior changes.

Closes #24029 from rdblue/SPARK-27108-add-parsed-create-logical-plans.

Authored-by: Ryan Blue <blue@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-03-22 13:58:54 -07:00
Jungtaek Lim (HeartSaVioR) 78d546fe15
[SPARK-27210][SS] Cleanup incomplete output files in ManifestFileCommitProtocol if task is aborted
## What changes were proposed in this pull request?

This patch proposes ManifestFileCommitProtocol to clean up incomplete output files in task level if task aborts. Please note that this works as 'best-effort', not kind of guarantee, as we have in HadoopMapReduceCommitProtocol.

## How was this patch tested?

Added UT.

Closes #24154 from HeartSaVioR/SPARK-27210.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
2019-03-22 11:26:53 -07:00
Martin Junghanns 8efc5ec72e [SPARK-27174][SQL] Add support for casting integer types to binary
Co-authored-by: Philip Stutz <philip.stutzgmail.com>

## What changes were proposed in this pull request?

This PR adds support for casting

* `ByteType`
* `ShortType`
* `IntegerType`
* `LongType`

to `BinaryType`.

## How was this patch tested?

We added unit tests for casting instances of the above types. For validation, we used Javas `DataOutputStream` to compare the resulting byte array with the result of `Cast`.

We state that the contribution is our original work and that we license the work to the project under the project’s open source license.

cloud-fan we'd appreciate a review if you find the time, thx

Closes #24107 from s1ck/cast_to_binary.

Authored-by: Martin Junghanns <martin.junghanns@neotechnology.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-03-22 10:09:35 -07:00
Maxim Gekk a529be2930 [SPARK-27212][SQL] Eliminate TimeZone to ZoneId conversion in stringToTimestamp
## What changes were proposed in this pull request?

In the PR, I propose to avoid the `TimeZone` to `ZoneId` conversion in `DateTimeUtils.stringToTimestamp` by changing signature of the method, and require a parameter of `ZoneId` type. This will allow to avoid unnecessary conversion (`TimeZone` -> `String` -> `ZoneId`) per each row.

Also the PR avoids creation of `ZoneId` instances from `ZoneOffset` because `ZoneOffset` is a sub-class, and the conversion is unnecessary too.

## How was this patch tested?

It was tested by `DateTimeUtilsSuite` and `CastSuite`.

Closes #24155 from MaxGekk/stringtotimestamp-zoneid.

Authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2019-03-22 18:01:29 +09:00
maryannxue 9f58d3b436 [SPARK-27236][TEST] Refactor log-appender pattern in tests
## What changes were proposed in this pull request?

Refactored code in tests regarding the "withLogAppender()" pattern by creating a general helper method in SparkFunSuite.

## How was this patch tested?

Passed existing tests.

Closes #24172 from maryannxue/log-appender.

Authored-by: maryannxue <maryannxue@apache.org>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2019-03-21 19:18:30 -07:00
John Zhuge 80565ce253 [SPARK-26946][SQL] Identifiers for multi-catalog
## What changes were proposed in this pull request?

- Support N-part identifier in SQL
- N-part identifier extractor in Analyzer

## How was this patch tested?

- A new unit test suite ResolveMultipartRelationSuite
- CatalogLoadingSuite

rblue cloud-fan mccheah

Closes #23848 from jzhuge/SPARK-26946.

Authored-by: John Zhuge <jzhuge@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-03-21 18:04:50 -07:00
Takeshi Yamamuro 0627850b7e [SPARK-25196][SQL] Extends the analyze column command for cached tables
## What changes were proposed in this pull request?
This pr extended `ANALYZE` commands to analyze column stats for cached table.

In common use cases, users read catalog table data, join/aggregate them, and then cache the result for following reuse. Since we are only allowed to analyze column statistics in catalog tables via ANALYZE commands, the current optimization depends on non-existing or inaccurate column statistics of cached data. So, it would be great if we could analyze cached data as follows;

```scala
scala> def printColumnStats(tableName: String) = {
     |   spark.table(tableName).queryExecution.optimizedPlan.stats.attributeStats.foreach {
     |     case (k, v) => println(s"[$k]: $v")
     |   }
     | }

scala> sql("SET spark.sql.cbo.enabled=true")
scala> sql("SET spark.sql.statistics.histogram.enabled=true")

scala> spark.range(1000).selectExpr("id % 33 AS c0", "rand() AS c1", "0 AS c2").write.saveAsTable("t")
scala> sql("ANALYZE TABLE t COMPUTE STATISTICS FOR COLUMNS c0, c1, c2")
scala> spark.table("t").groupBy("c0").agg(count("c1").as("v1"), sum("c2").as("v2")).createTempView("temp")

// Prints column statistics in catalog table `t`
scala> printColumnStats("t")
[c0#7073L]: ColumnStat(Some(33),Some(0),Some(32),Some(0),Some(8),Some(8),Some(Histogram(3.937007874015748,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;9f7c1c)),2)
[c1#7074]: ColumnStat(Some(944),Some(3.2108484832404915E-4),Some(0.997584797423909),Some(0),Some(8),Some(8),Some(Histogram(3.937007874015748,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;60a386b1)),2)
[c2#7075]: ColumnStat(Some(1),Some(0),Some(0),Some(0),Some(4),Some(4),Some(Histogram(3.937007874015748,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;5ffd29e8)),2)

// Prints column statistics on cached table `temp`
scala> sql("CACHE TABLE temp")
scala> printColumnStats("temp")
<No Column Statistics>

// Analyzes columns `v1` and `v2` on cached table `temp`
scala> sql("ANALYZE TABLE temp COMPUTE STATISTICS FOR COLUMNS v1, v2")

// Then, prints again
scala> printColumnStats("temp")
[v1#7084L]: ColumnStat(Some(2),Some(30),Some(31),Some(0),Some(8),Some(8),Some(Histogram(0.12992125984251968,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;49f7bb6f)),2)
[v2#7086L]: ColumnStat(Some(1),Some(0),Some(0),Some(0),Some(8),Some(8),Some(Histogram(0.12992125984251968,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;12701677)),2)

// Analyzes one left column and prints again
scala> sql("ANALYZE TABLE temp COMPUTE STATISTICS FOR COLUMNS c0")
scala> printColumnStats("temp")
[v1#7084L]: ColumnStat(Some(2),Some(30),Some(31),Some(0),Some(8),Some(8),Some(Histogram(0.12992125984251968,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;49f7bb6f)),2)
[v2#7086L]: ColumnStat(Some(1),Some(0),Some(0),Some(0),Some(8),Some(8),Some(Histogram(0.12992125984251968,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;12701677)),2)
[c0#7073L]: ColumnStat(Some(33),Some(0),Some(32),Some(0),Some(8),Some(8),Some(Histogram(0.12992125984251968,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;1f5c1b81)),2)
```

## How was this patch tested?
Added tests in `CachedTableSuite` and `StatisticsCollectionSuite`.

Closes #24047 from maropu/SPARK-25196-4.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-03-21 09:20:35 -07:00
Bryan Cutler be08b415da [SPARK-27163][PYTHON] Cleanup and consolidate Pandas UDF functionality
## What changes were proposed in this pull request?

This change is a cleanup and consolidation of 3 areas related to Pandas UDFs:

1) `ArrowStreamPandasSerializer` now inherits from `ArrowStreamSerializer` and uses the base class `dump_stream`, `load_stream` to create Arrow reader/writer and send Arrow record batches.  `ArrowStreamPandasSerializer` makes the conversions to/from Pandas and converts to Arrow record batch iterators. This change removed duplicated creation of Arrow readers/writers.

2) `createDataFrame` with Arrow now uses `ArrowStreamPandasSerializer` instead of doing its own conversions from Pandas to Arrow and sending record batches through `ArrowStreamSerializer`.

3) Grouped Map UDFs now reuse existing logic in `ArrowStreamPandasSerializer` to send Pandas DataFrame results as a `StructType` instead of separating each column from the DataFrame. This makes the code a little more consistent with the Python worker, but does require that the returned StructType column is flattened out in `FlatMapGroupsInPandasExec` in Scala.

## How was this patch tested?

Existing tests and ran tests with pyarrow 0.12.0

Closes #24095 from BryanCutler/arrow-refactor-cleanup-UDFs.

Authored-by: Bryan Cutler <cutlerb@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2019-03-21 17:44:51 +09:00
maryannxue 2e090ba628 [SPARK-27223][SQL] Remove private methods that skip conversion when passing user schemas for constructing a DataFrame
## What changes were proposed in this pull request?

When passing in a user schema to create a DataFrame, there might be mismatched nullability between the user schema and the the actual data. All related public interfaces now perform catalyst conversion using the user provided schema, which catches such mismatches to avoid runtime errors later on. However, there're private methods which allow this conversion to be skipped, so we need to remove these private methods which may lead to confusion and potential issues.

## How was this patch tested?

Passed existing tests. No new tests were added since this PR removed the private interfaces that would potentially cause null problems and other interfaces are covered already by existing tests.

Closes #24162 from maryannxue/spark-27223.

Authored-by: maryannxue <maryannxue@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2019-03-21 11:13:25 +09:00
wangguangxin.cn 46f9f44918 [SPARK-27202][MINOR][SQL] Update comments to keep according with code
## What changes were proposed in this pull request?

Update comments in `InMemoryFileIndex.listLeafFiles` to keep according with code.

## How was this patch tested?

existing test cases

Closes #24146 from WangGuangxin/SPARK-27202.

Authored-by: wangguangxin.cn <wangguangxin.cn@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-03-20 17:54:28 -05:00
Maxim Gekk 1882912cca [SPARK-27199][SQL] Replace TimeZone by ZoneId in TimestampFormatter API
## What changes were proposed in this pull request?

In the PR, I propose to use `ZoneId` instead of `TimeZone` in:
- the `apply` and `getFractionFormatter ` methods of the `TimestampFormatter` object,
- and in implementations of the `TimestampFormatter` trait like `FractionTimestampFormatter`.

The reason of the changes is to avoid unnecessary conversion from `TimeZone` to `ZoneId` because `ZoneId` is used in `TimestampFormatter` implementations internally, and the conversion is performed via `String` which is not for free. Also taking into account that `TimeZone` instances are converted from `String` in some cases, the worse case looks like `String` -> `TimeZone` -> `String` -> `ZoneId`. The PR eliminates the unneeded conversions.

## How was this patch tested?

It was tested by `DateExpressionsSuite`, `DateTimeUtilsSuite` and `TimestampFormatterSuite`.

Closes #24141 from MaxGekk/zone-id.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2019-03-20 21:28:11 +09:00
Huon Wilson b67d369572 [SPARK-27099][SQL] Add 'xxhash64' for hashing arbitrary columns to Long
## What changes were proposed in this pull request?

This introduces a new SQL function 'xxhash64' for getting a 64-bit hash of an arbitrary number of columns.

This is designed to exactly mimic the 32-bit `hash`, which uses
MurmurHash3. The name is designed to be more future-proof than the
'hash', by indicating the exact algorithm used, similar to md5 and the
sha hashes.

## How was this patch tested?

The tests for the existing `hash` function were duplicated to run with `xxhash64`.

Closes #24019 from huonw/hash64.

Authored-by: Huon Wilson <Huon.Wilson@data61.csiro.au>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-03-20 16:34:34 +08:00
Darcy Shen 9a43852f17 [SPARK-27160][SQL] Fix DecimalType when building orc filters
## What changes were proposed in this pull request?
DecimalType Literal should not be casted to Long.

eg. For `df.filter("x < 3.14")`, assuming df (x in DecimalType) reads from a ORC table and uses the native ORC reader with predicate push down enabled, we will push down the `x < 3.14` predicate to the ORC reader via a SearchArgument.

OrcFilters will construct the SearchArgument, but not handle the DecimalType correctly.

The previous impl will construct `x < 3` from `x < 3.14`.

## How was this patch tested?
```
$ sbt
> sql/testOnly *OrcFilterSuite
> sql/testOnly *OrcQuerySuite -- -z "27160"
```

Closes #24092 from sadhen/spark27160.

Authored-by: Darcy Shen <sadhen@zoho.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-03-19 20:28:46 -07:00
Dongjoon Hyun 257391497b [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition
## What changes were proposed in this pull request?

As [SPARK-26958](https://github.com/apache/spark/pull/23862/files) benchmark shows, nested-column pruning has limitations. This PR aims to remove the limitations on `limit/repartition/sample`. Here, repartition means `Repartition`, not `RepartitionByExpression`.

**PREPARATION**
```scala
scala> spark.range(100).map(x => (x, (x, s"$x" * 100))).toDF("col1", "col2").write.mode("overwrite").save("/tmp/p")
scala> sql("set spark.sql.optimizer.nestedSchemaPruning.enabled=true")
scala> spark.read.parquet("/tmp/p").createOrReplaceTempView("t")
```

**BEFORE**
```scala
scala> sql("SELECT col2._1 FROM (SELECT col2 FROM t LIMIT 1000000)").explain
== Physical Plan ==
CollectLimit 1000000
+- *(1) Project [col2#22._1 AS _1#28L]
   +- *(1) FileScan parquet [col2#22] Batched: false, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/p], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<col2:struct<_1:bigint>>

scala> sql("SELECT col2._1 FROM (SELECT /*+ REPARTITION(1) */ col2 FROM t)").explain
== Physical Plan ==
*(2) Project [col2#22._1 AS _1#33L]
+- Exchange RoundRobinPartitioning(1)
   +- *(1) Project [col2#22]
      +- *(1) FileScan parquet [col2#22] Batched: false, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/p], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<col2:struct<_1:bigint,_2:string>>
```

**AFTER**
```scala
scala> sql("SELECT col2._1 FROM (SELECT /*+ REPARTITION(1) */ col2 FROM t)").explain
== Physical Plan ==
Exchange RoundRobinPartitioning(1)
+- *(1) Project [col2#5._1 AS _1#11L]
   +- *(1) FileScan parquet [col2#5] Batched: false, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/p], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<col2:struct<_1:bigint>>
```

This supercedes https://github.com/apache/spark/pull/23542 and https://github.com/apache/spark/pull/23873 .

## How was this patch tested?

Pass the Jenkins with a newly added test suite.

Closes #23964 from dongjoon-hyun/SPARK-26975-ALIAS.

Lead-authored-by: Dongjoon Hyun <dhyun@apple.com>
Co-authored-by: DB Tsai <d_tsai@apple.com>
Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Co-authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Co-authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-03-19 20:24:22 -07:00
Dongjoon Hyun 4d5247778a [SPARK-27197][SQL][TEST] Add ReadNestedSchemaTest for file-based data sources
## What changes were proposed in this pull request?

The reader schema is said to be evolved (or projected) when it changed after the data is written by writers. Apache Spark file-based data sources have a test coverage for that; e.g. [ReadSchemaSuite.scala](https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaSuite.scala). This PR aims to add a test coverage for nested columns by adding and hiding nested columns.

## How was this patch tested?

Pass the Jenkins with newly added tests.

Closes #24139 from dongjoon-hyun/SPARK-27197.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
2019-03-20 00:22:05 +00:00
s71955 e402de5fd0 [SPARK-26176][SQL] Verify column names for CTAS with STORED AS
## What changes were proposed in this pull request?
Currently, users meet job abortions while creating a table using the Hive serde "STORED AS" with invalid column names. We had better prevent this by raising **AnalysisException** with a guide to use aliases instead like Paquet data source tables.
thus making compatible with error message shown while creating Parquet/ORC native table.

**BEFORE**
```scala
scala> sql("set spark.sql.hive.convertMetastoreParquet=false")
scala> sql("CREATE TABLE a STORED AS PARQUET AS SELECT 1 AS `COUNT(ID)`")
Caused by: java.lang.IllegalArgumentException: No enum constant parquet.schema.OriginalType.col1
```

**AFTER**
```scala
scala> sql("CREATE TABLE a STORED AS PARQUET AS SELECT 1 AS `COUNT(ID)`")
 Please use alias to rename it.;eption: Attribute name "count(ID)" contains invalid character(s) among " ,;{}()\n\t=".
```

## How was this patch tested?
Pass the Jenkins with the newly added test case.

Closes #24075 from sujith71955/master_serde.

Authored-by: s71955 <sujithchacko.2010@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-03-19 20:29:47 +08:00
Takeshi Yamamuro 901c7408a4 [SPARK-27161][SQL][FOLLOWUP] Drops non-keywords from docs/sql-keywords.md
## What changes were proposed in this pull request?
This pr is a follow-up of #24093 and includes fixes below;
 - Lists up all the keywords of Spark only (that is, drops non-keywords there); I listed up all the keywords of ANSI SQL-2011 in the previous commit (SPARK-26215).
 - Sorts the keywords in `SqlBase.g4` in a alphabetical order

## How was this patch tested?
Pass Jenkins.

Closes #24125 from maropu/SPARK-27161-FOLLOWUP.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-03-19 20:18:40 +08:00
Gengliang Wang 28d35c8578 [SPARK-27162][SQL] Add new method asCaseSensitiveMap in CaseInsensitiveStringMap
## What changes were proposed in this pull request?

Currently, DataFrameReader/DataFrameReader supports setting Hadoop configurations via method `.option()`.
E.g, the following test case should be passed in both ORC V1 and V2
```
  class TestFileFilter extends PathFilter {
    override def accept(path: Path): Boolean = path.getParent.getName != "p=2"
  }

  withTempPath { dir =>
      val path = dir.getCanonicalPath

      val df = spark.range(2)
      df.write.orc(path + "/p=1")
      df.write.orc(path + "/p=2")
      val extraOptions = Map(
        "mapred.input.pathFilter.class" -> classOf[TestFileFilter].getName,
        "mapreduce.input.pathFilter.class" -> classOf[TestFileFilter].getName
      )
      assert(spark.read.options(extraOptions).orc(path).count() === 2)
    }
  }
```
While Hadoop Configurations are case sensitive, the current data source V2 APIs are using `CaseInsensitiveStringMap` in the top level entry `TableProvider`.
To create Hadoop configurations correctly, I suggest
1. adding a new method `asCaseSensitiveMap` in `CaseInsensitiveStringMap`.
2. Make `CaseInsensitiveStringMap` read-only to ambiguous conversion in `asCaseSensitiveMap`

## How was this patch tested?

Unit test

Closes #24094 from gengliangwang/originalMap.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-03-19 13:35:47 +08:00
Dongjoon Hyun 26e9849cb4 [SPARK-27195][SQL][TEST] Add AvroReadSchemaSuite
## What changes were proposed in this pull request?

The reader schema is said to be evolved (or projected) when it changed after the data is written by writers. Apache Spark file-based data sources have a test coverage for that, [ReadSchemaSuite.scala](https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaSuite.scala). This PR aims to add `AvroReadSchemaSuite` to ensure the minimal consistency among file-based data sources and prevent a future regression in Avro data source.

## How was this patch tested?

Pass the Jenkins with the newly added test suite.

Closes #24135 from dongjoon-hyun/SPARK-27195.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-03-18 20:10:30 -07:00
Ryan Blue e348f14259 [SPARK-26811][SQL] Add capabilities to v2.Table
## What changes were proposed in this pull request?

This adds a new method, `capabilities` to `v2.Table` that returns a set of `TableCapability`. Capabilities are used to fail queries during analysis checks, `V2WriteSupportCheck`, when the table does not support operations, like truncation.

## How was this patch tested?

Existing tests for regressions, added new analysis suite, `V2WriteSupportCheckSuite`, for new capability checks.

Closes #24012 from rdblue/SPARK-26811-add-capabilities.

Authored-by: Ryan Blue <blue@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-03-18 18:25:11 +08:00
Jungtaek Lim (HeartSaVioR) 4adbcdc424 [SPARK-22000][SQL][FOLLOW-UP] Fix bad test to ensure it can test properly
## What changes were proposed in this pull request?

There was some mistake on test code: it has wrong assertion. The patch proposes fixing it, as well as fixing other stuff to make test really pass.

## How was this patch tested?

Fixed unit test.

Closes #24112 from HeartSaVioR/SPARK-22000-hotfix.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2019-03-17 08:25:40 +09:00
Dilip Biswal 7a136f8670 [SPARK-27096][SQL][FOLLOWUP] Do the correct validation of join types in R side and fix join docs for scala, python and r
## What changes were proposed in this pull request?
This is a minor follow-up PR for SPARK-27096. The original PR reconciled the join types supported between dataset and sql interface. In case of R, we do the join type validation in the R side. In this PR we do the correct validation and adds tests in R to test all the join types along with the error condition. Along with this, i made the necessary doc correction.

## How was this patch tested?
Add R tests.

Closes #24087 from dilipbiswal/joinfix_followup.

Authored-by: Dilip Biswal <dbiswal@us.ibm.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2019-03-16 13:04:54 +09:00
Zhu, Lipeng 8ee09f26d5 [SPARK-27159][SQL] update mssql server dialect to support binary type
## What changes were proposed in this pull request?

Change the binary type mapping from default blob to varbinary(max) for mssql server.
https://docs.microsoft.com/en-us/sql/t-sql/data-types/binary-and-varbinary-transact-sql?view=sql-server-2017
![image](https://user-images.githubusercontent.com/698621/54351715-0e8c8780-468b-11e9-8931-7ecb85c5ad6b.png)

## How was this patch tested?

Unit test.

Closes #24091 from lipzhu/SPARK-27159.

Authored-by: Zhu, Lipeng <lipzhu@ebay.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-03-15 20:21:59 -05:00
Gengliang Wang 2a37d6ed93 [SPARK-27132][SQL] Improve file source V2 framework
## What changes were proposed in this pull request?

During the migration of CSV V2(https://github.com/apache/spark/pull/24005), I find that we can improve the file source v2 framework by:
1. check duplicated column names in both read and write
2. Not all the file sources support filter push down. So remove `SupportsPushDownFilters` from FileScanBuilder
3. The method `isSplitable` might require data source options. Add a new member `options` to FileScan.
4. Make `FileTable.schema` a lazy value instead of a method.

## How was this patch tested?

Unit test

Closes #24066 from gengliangwang/reviseFileSourceV2.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-03-15 11:58:03 +08:00
Dongjoon Hyun 74d2f04183 [SPARK-27166][SQL] Improve printSchema to print up to the given level
## What changes were proposed in this pull request?

This PR aims to improve `printSchema` to be able to print up to the given level of the schema.

```scala
scala> val df = Seq((1,(2,(3,4)))).toDF
df: org.apache.spark.sql.DataFrame = [_1: int, _2: struct<_1: int, _2: struct<_1: int, _2: int>>]

scala> df.printSchema
root
|-- _1: integer (nullable = false)
|-- _2: struct (nullable = true)
| |-- _1: integer (nullable = false)
| |-- _2: struct (nullable = true)
| | |-- _1: integer (nullable = false)
| | |-- _2: integer (nullable = false)

scala> df.printSchema(1)
root
|-- _1: integer (nullable = false)
|-- _2: struct (nullable = true)

scala> df.printSchema(2)
root
|-- _1: integer (nullable = false)
|-- _2: struct (nullable = true)
| |-- _1: integer (nullable = false)
| |-- _2: struct (nullable = true)

scala> df.printSchema(3)
root
|-- _1: integer (nullable = false)
|-- _2: struct (nullable = true)
| |-- _1: integer (nullable = false)
| |-- _2: struct (nullable = true)
| | |-- _1: integer (nullable = false)
| | |-- _2: integer (nullable = false)
```

## How was this patch tested?

Pass the Jenkins with the newly added test case.

Closes #24098 from dongjoon-hyun/SPARK-27166.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-03-14 20:27:55 -07:00
Gengliang Wang 6d22ee3969 [SPARK-27136][SQL] Remove data source option check_files_exist
## What changes were proposed in this pull request?

The data source option check_files_exist is introduced in In #23383 when the file source V2 framework is implemented. In the PR, FileIndex was created as a member of FileTable, so that we could implement partition pruning like 0f9fcab in the future. At that time `FileIndex`es will always be created for file writes, so we needed the option to decide whether to check file existence.

After https://github.com/apache/spark/pull/23774, the option is not needed anymore, since Dataframe writes won't create unnecessary FileIndex. This PR is to remove the option.

## How was this patch tested?

Unit test.

Closes #24069 from gengliangwang/removeOptionCheckFilesExist.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-03-15 10:19:26 +08:00
Dave DeCaprio 8819eaba4d [SPARK-26917][SQL] Further reduce locks in CacheManager
## What changes were proposed in this pull request?

Further load increases in our production environment have shown that even the read locks can cause some contention, since they contain a mechanism that turns a read lock into an exclusive lock if a writer has been starved out.  This PR reduces the potential for lock contention even further than https://github.com/apache/spark/pull/23833.  Additionally, it uses more idiomatic scala than the previous implementation.

cloud-fan & gatorsmile This is a relatively minor improvement to the previous CacheManager changes.  At this point, I think we finally are doing the minimum possible amount of locking.

## How was this patch tested?

Has been tested on a live system where the blocking was causing major issues and it is working well.
CacheManager has no explicit unit test but is used in many places internally as part of the SharedState.

Closes #24028 from DaveDeCaprio/read-locks-master.

Lead-authored-by: Dave DeCaprio <daved@alum.mit.edu>
Co-authored-by: David DeCaprio <daved@alum.mit.edu>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-03-15 10:13:34 +08:00
Shahid 8b5224097b [SPARK-27145][MINOR] Close store in the SQLAppStatusListenerSuite after test
## What changes were proposed in this pull request?
We create many stores in the SQLAppStatusListenerSuite, but we need to the close store after test.

## How was this patch tested?
Existing tests

Closes #24079 from shahidki31/SPARK-27145.

Authored-by: Shahid <shahidki31@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-03-14 13:08:41 -07:00
Takeshi Yamamuro 66c5cd2d9c [SPARK-27151][SQL] ClearCacheCommand extends IgnoreCahedData to avoid plan node copys
## What changes were proposed in this pull request?
In SPARK-27011, we introduced `IgnoreCahedData` to avoid plan node copys in `CacheManager`.
Since `ClearCacheCommand` has no argument, it also can extend `IgnoreCahedData`.

## How was this patch tested?
Pass Jenkins.

Closes #24081 from maropu/SPARK-27011-2.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-03-14 11:36:16 -07:00
Takeshi Yamamuro bacffb8810 [SPARK-23264][SQL] Make INTERVAL keyword optional in INTERVAL clauses when ANSI mode enabled
## What changes were proposed in this pull request?
This pr updated parsing rules in `SqlBase.g4` to support a SQL query below when ANSI mode enabled;
```
SELECT CAST('2017-08-04' AS DATE) + 1 days;
```
The current master cannot parse it though, other dbms-like systems support the syntax (e.g., hive and mysql). Also, the syntax is frequently used in the official TPC-DS queries.

This pr added new tokens as follows;
```
YEAR | YEARS | MONTH | MONTHS | WEEK | WEEKS | DAY | DAYS | HOUR | HOURS | MINUTE
MINUTES | SECOND | SECONDS | MILLISECOND | MILLISECONDS | MICROSECOND | MICROSECONDS
```
Then, it registered the keywords below as the ANSI reserved (this follows SQL-2011);
```
 DAY | HOUR | MINUTE | MONTH | SECOND | YEAR
```

## How was this patch tested?
Added tests in `SQLQuerySuite`, `ExpressionParserSuite`, and `TableIdentifierParserSuite`.

Closes #20433 from maropu/SPARK-23264.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2019-03-14 10:45:29 +09:00
Jungtaek Lim (HeartSaVioR) 733f2c0b98 [MINOR][SQL] Deduplicate huge if statements in get between specialized getters
## What changes were proposed in this pull request?

This patch deduplicates the huge if statements regarding getting value between specialized getters.

## How was this patch tested?

Existing UT.

Closes #24016 from HeartSaVioR/MINOR-deduplicate-get-from-specialized-getters.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-03-13 15:52:21 -05:00
Dongjoon Hyun 3221bf4cd5 [SPARK-27034][SPARK-27123][SQL][FOLLOWUP] Update Nested Schema Pruning BM result with EC2
## What changes were proposed in this pull request?

This is a follow up PR for #23943 in order to update the benchmark result with EC2 `r3.xlarge` instance.

## How was this patch tested?

N/A. (Manually compare the diff)

Closes #24078 from dongjoon-hyun/SPARK-27034.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
2019-03-13 20:27:10 +00:00
Wenchen Fan 2a80a4cd39 [SPARK-27106][SQL] merge CaseInsensitiveStringMap and DataSourceOptions
## What changes were proposed in this pull request?

It's a little awkward to have 2 different classes(`CaseInsensitiveStringMap` and `DataSourceOptions`) to present the options in data source and catalog API.

This PR merges these 2 classes, while keeping the name `CaseInsensitiveStringMap`, which is more precise.

## How was this patch tested?

existing tests

Closes #24025 from cloud-fan/option.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-03-14 01:23:27 +08:00
Dave DeCaprio 812ad55461 [SPARK-26103][SQL] Limit the length of debug strings for query plans
## What changes were proposed in this pull request?

The PR puts in a limit on the size of a debug string generated for a tree node.  Helps to fix out of memory errors when large plans have huge debug strings.   In addition to SPARK-26103, this should also address SPARK-23904 and SPARK-25380.  AN alternative solution was proposed in #23076, but that solution doesn't address all the cases that can cause a large query.  This limit is only on calls treeString that don't pass a Writer, which makes it play nicely with #22429, #23018 and #23039.  Full plans can be written to files, but truncated plans will be used when strings are held in memory, such as for the UI.

- A new configuration parameter called spark.sql.debug.maxPlanLength was added to control the length of the plans.
- When plans are truncated, "..." is printed to indicate that it isn't a full plan
- A warning is printed out the first time a truncated plan is displayed. The warning explains what happened and how to adjust the limit.

## How was this patch tested?

Unit tests were created for the new SizeLimitedWriter.  Also a unit test for TreeNode was created that checks that a long plan is correctly truncated.

Closes #23169 from DaveDeCaprio/text-plan-size.

Lead-authored-by: Dave DeCaprio <daved@alum.mit.edu>
Co-authored-by: David DeCaprio <daved@alum.mit.edu>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-03-13 09:58:43 -07:00
Wenchen Fan d3813d8b21 [SPARK-27064][SS] create StreamingWrite at the beginning of streaming execution
## What changes were proposed in this pull request?

According to the [design](https://docs.google.com/document/d/1vI26UEuDpVuOjWw4WPoH2T6y8WAekwtI7qoowhOFnI4/edit?usp=sharing), the life cycle of `StreamingWrite` should be the same as the read side `MicroBatch/ContinuousStream`, i.e. each run of the stream query, instead of each epoch.

This PR fixes it.

## How was this patch tested?

existing tests

Closes #23981 from cloud-fan/dsv2.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-03-13 19:47:54 +08:00
Liang-Chi Hsieh f55c760df6 [SPARK-27034][SQL][FOLLOWUP] Rename ParquetSchemaPruning to SchemaPruning
## What changes were proposed in this pull request?

This is a followup to #23943. This proposes to rename ParquetSchemaPruning to SchemaPruning as ParquetSchemaPruning supports both Parquet and ORC v1 now.

## How was this patch tested?

Existing tests.

Closes #24077 from viirya/nested-schema-pruning-orc-followup.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2019-03-13 20:12:01 +09:00
Ajith e60d8fce0b [SPARK-27045][SQL] SQL tab in UI shows actual SQL instead of callsite in case of SparkSQLDriver
## What changes were proposed in this pull request?

When we run sql in spark via SparkSQLDriver (thrift server, spark-sql), SQL string is siet via ``setJobDescription``. the SparkUI SQL tab must show SQL instead of stacktrace in case ``setJobDescription`` is set which is more useful to end user. Instead it currently shows in description column the callsite shortform which is less useful

![image](https://user-images.githubusercontent.com/22072336/53734682-aaa7d900-3eaa-11e9-957b-0e5006db417e.png)

## How was this patch tested?

Manually:
![image](https://user-images.githubusercontent.com/22072336/53734657-9f54ad80-3eaa-11e9-8dc5-2b38f6970f4e.png)

Closes #23958 from ajithme/sqlui.

Authored-by: Ajith <ajith2489@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-03-12 16:14:29 -07:00
Liang-Chi Hsieh b0c2b3bfd9 [SPARK-27034][SQL] Nested schema pruning for ORC
## What changes were proposed in this pull request?

We only supported nested schema pruning for Parquet previously. This proposes to support nested schema pruning for ORC too.

Note: This only covers ORC v1. For ORC v2, the necessary change is at the schema pruning rule. We should deal with ORC v2 as a TODO item, in order to reduce review burden.

## How was this patch tested?

Added tests.

Closes #23943 from viirya/nested-schema-pruning-orc.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-03-12 15:39:16 -07:00
shivusondur 4b6d39d85d [SPARK-27090][CORE] Removing old LEGACY_DRIVER_IDENTIFIER ("<driver>")
## What changes were proposed in this pull request?
LEGACY_DRIVER_IDENTIFIER and its reference are removed.
corresponding references test are updated.

## How was this patch tested?
tested  UT test cases

Closes #24026 from shivusondur/newjira2.

Authored-by: shivusondur <shivusondur@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-03-12 13:29:39 -05:00
Shahid 1853db3186 [SPARK-27125][SQL][TEST] Add test suite for sql execution page
## What changes were proposed in this pull request?
Added test suite for AllExecutionsPage class. Checked the scenarios for SPARK-27019 and SPARK-27075.

## How was this patch tested?
Added UT, manually tested

Closes #24052 from shahidki31/SPARK-27125.

Authored-by: Shahid <shahidki31@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-03-12 10:15:28 -05:00
Ajith b8dd84b9e4 [SPARK-27011][SQL] reset command fails with cache
## What changes were proposed in this pull request?

When cache is enabled ( i.e once cache table command is executed), any following sql will trigger
 CacheManager#lookupCachedData which will create a copy of the tree node, which inturn calls TreeNode#makeCopy. Here the problem is it will try to create a copy instance. But as ResetCommand is a case object this will fail

## How was this patch tested?

Added UT to reproduce the issue

Closes #23918 from ajithme/reset.

Authored-by: Ajith <ajith2489@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-03-12 11:02:09 +08:00
Hyukjin Kwon 3725b1324f [SPARK-26923][SQL][R] Refactor ArrowRRunner and RRunner to share one BaseRRunner
## What changes were proposed in this pull request?

This PR proposes to have one base R runner.

In the high level,

Previously, it had `ArrowRRunner` and it inherited `RRunner`:

```
└── RRunner
    └── ArrowRRunner
```

After this PR, now it has a `BaseRRunner`, and `ArrowRRunner` and `RRunner` inherit `BaseRRunner`:

```
└── BaseRRunner
    ├── ArrowRRunner
    └── RRunner
```

This way is consistent with Python's.

In more details, see below:

```scala
class BaseRRunner[IN, OUT] {

  def compute: Iterator[OUT] = {
    ...
    newWriterThread(...).start()
    ...
    newReaderIterator(...)
    ...
  }

  // Make a thread that writes data from JVM to R process
  abstract protected def newWriterThread(..., iter: Iterator[IN], ...): WriterThread

  // Make an iterator that reads data from the R process to JVM
  abstract protected def newReaderIterator(...): ReaderIterator

  abstract class WriterThread(..., iter: Iterator[IN], ...) extends Thread {
    override def run(): Unit {
      ...
      writeIteratorToStream(...)
      ...
    }

    // Actually writing logic to the socket stream.
    abstract protected def writeIteratorToStream(dataOut: DataOutputStream): Unit
  }

  abstract class ReaderIterator extends Iterator[OUT] {
    override def hasNext(): Boolean = {
      ...
      read(...)
      ...
    }

    override def next(): OUT = {
      ...
      hasNext()
      ...
    }

    // Actually reading logic from the socket stream.
    abstract protected def read(...): OUT
  }
}
```

```scala
case [Arrow]RRunner extends BaseRRunner {
  override def newWriterThread(...) {
    new WriterThread(...) {
      override def writeIteratorToStream(...) {
        ...
      }
    }
  }

  override def newReaderIterator(...) {
    new ReaderIterator(...) {
      override def read(...) {
        ...
      }
    }
  }
}
```

## How was this patch tested?

Manually tested and existing tests should cover.

Closes #23977 from HyukjinKwon/SPARK-26923.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2019-03-12 08:45:29 +09:00
Jagadesh Kiran d9978fb4e4 [SPARK-26860][PYSPARK][SPARKR] Fix for RangeBetween and RowsBetween docs to be in sync with spark documentation
The docs describing RangeBetween & RowsBetween for pySpark & SparkR are not in sync with Spark description.

a. Edited PySpark and SparkR docs  and made description same for both RangeBetween and RowsBetween
b. created executable examples in both pySpark and SparkR documentation
c. Locally tested the patch for scala Style checks and UT for checking no testcase failures

Closes #23946 from jagadesh-kiran/master.

Authored-by: Jagadesh Kiran <jagadesh.n@in.verizon.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-03-11 08:53:09 -05:00
Dilip Biswal 1b9fd67904 [SPARK-27096][SQL] Reconcile the join types between data frame and sql interface
## What changes were proposed in this pull request?
Currently in the grammar file, we have the joinType rule defined as following :
```
joinType
    : INNER?
   ....
   ....
    | LEFT SEMI
    | LEFT? ANTI
    ;
```
The keyword LEFT is optional for ANTI join even though its not optional for SEMI join. When
using data frame interface join type "anti" is not allowed. The allowed types are "left_anti" or
"leftanti" for anti joins. ~~In this PR, i am making the LEFT keyword mandatory for ANTI joins so
it aligns better with the LEFT SEMI join in the grammar file and also the join types allowed from dataframe api.~~

This PR makes LEFT optional for SEMI join in .g4 and add "semi" and "anti" join types from dataframe.

~~I have not opened any JIRA for this as probably we may need some discussion to see if
we are going to address this or not.~~

## How was this patch tested?
Modified the join type tests.

Closes #23982 from dilipbiswal/join_fix.

Authored-by: Dilip Biswal <dbiswal@us.ibm.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-03-11 14:02:21 +08:00
Takeshi Yamamuro f0927d8ac4 [SPARK-27110][SQL] Moves some functions from AnalyzeColumnCommand to command/CommandUtils
## What changes were proposed in this pull request?
To reuse some common logics for improving `Analyze` commands (See the description of `SPARK-25196` for details), this pr moved some functions from `AnalyzeColumnCommand` to `command/CommandUtils`.  A follow-up pr will add code to extend `Analyze` commands for cached tables.

## How was this patch tested?
Existing tests.

Closes #22204 from maropu/SPARK-25196.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-03-10 15:17:46 -07:00
Shixiong Zhu 6e1c0827ec
[SPARK-27111][SS] Fix a race that a continuous query may fail with InterruptedException
## What changes were proposed in this pull request?

Before a Kafka consumer gets assigned with partitions, its offset will contain 0 partitions. However, runContinuous will still run and launch a Spark job having 0 partitions. In this case, there is a race that epoch may interrupt the query execution thread after `lastExecution.toRdd`, and either `epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)` or the next `runContinuous` will get interrupted unintentionally.

To handle this case, this PR has the following changes:

- Clean up the resources in `queryExecutionThread.runUninterruptibly`. This may increase the waiting time of `stop` but should be minor because the operations here are very fast (just sending an RPC message in the same process and stopping a very simple thread).
- Clear the interrupted status at the end so that it won't impact the `runContinuous` call. We may clear the interrupted status set by `stop`, but it doesn't affect the query termination because `runActivatedStream` will check `state` and exit accordingly.

I also updated the clean up codes to make sure exceptions thrown from `epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)` won't stop the clean up.

## How was this patch tested?

Jenkins

Closes #24034 from zsxwing/SPARK-27111.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
2019-03-09 14:26:58 -08:00
Kris Mok 57ae251f75 [SPARK-27097] Avoid embedding platform-dependent offsets literally in whole-stage generated code
## What changes were proposed in this pull request?

Spark SQL performs whole-stage code generation to speed up query execution. There are two steps to it:
- Java source code is generated from the physical query plan on the driver. A single version of the source code is generated from a query plan, and sent to all executors.
  - It's compiled to bytecode on the driver to catch compilation errors before sending to executors, but currently only the generated source code gets sent to the executors. The bytecode compilation is for fail-fast only.
- Executors receive the generated source code and compile to bytecode, then the query runs like a hand-written Java program.

In this model, there's an implicit assumption about the driver and executors being run on similar platforms. Some code paths accidentally embedded platform-dependent object layout information into the generated code, such as:
```java
Platform.putLong(buffer, /* offset */ 24, /* value */ 1);
```
This code expects a field to be at offset +24 of the `buffer` object, and sets a value to that field.
But whole-stage code generation generally uses platform-dependent information from the driver. If the object layout is significantly different on the driver and executors, the generated code can be reading/writing to wrong offsets on the executors, causing all kinds of data corruption.

One code pattern that leads to such problem is the use of `Platform.XXX` constants in generated code, e.g. `Platform.BYTE_ARRAY_OFFSET`.

Bad:
```scala
val baseOffset = Platform.BYTE_ARRAY_OFFSET
// codegen template:
s"Platform.putLong($buffer, $baseOffset, $value);"
```
This will embed the value of `Platform.BYTE_ARRAY_OFFSET` on the driver into the generated code.

Good:
```scala
val baseOffset = "Platform.BYTE_ARRAY_OFFSET"
// codegen template:
s"Platform.putLong($buffer, $baseOffset, $value);"
```
This will generate the offset symbolically -- `Platform.putLong(buffer, Platform.BYTE_ARRAY_OFFSET, value)`, which will be able to pick up the correct value on the executors.

Caveat: these offset constants are declared as runtime-initialized `static final` in Java, so they're not compile-time constants from the Java language's perspective. It does lead to a slightly increased size of the generated code, but this is necessary for correctness.

NOTE: there can be other patterns that generate platform-dependent code on the driver which is invalid on the executors. e.g. if the endianness is different between the driver and the executors, and if some generated code makes strong assumption about endianness, it would also be problematic.

## How was this patch tested?

Added a new test suite `WholeStageCodegenSparkSubmitSuite`. This test suite needs to set the driver's extraJavaOptions to force the driver and executor use different Java object layouts, so it's run as an actual SparkSubmit job.

Authored-by: Kris Mok <kris.mokdatabricks.com>

Closes #24031 from gatorsmile/cherrypickSPARK-27097.

Lead-authored-by: Kris Mok <kris.mok@databricks.com>
Co-authored-by: gatorsmile <gatorsmile@gmail.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
2019-03-09 01:20:32 +00:00
Ryan Blue 6170e40c15 [SPARK-24252][SQL] Add v2 catalog plugin system
## What changes were proposed in this pull request?

This adds a v2 API for adding new catalog plugins to Spark.

* Catalog implementations extend `CatalogPlugin` and are loaded via reflection, similar to data sources
* `Catalogs` loads and initializes catalogs using configuration from a `SQLConf`
* `CaseInsensitiveStringMap` is used to pass configuration to `CatalogPlugin` via `initialize`

Catalogs are configured by adding config properties starting with `spark.sql.catalog.(name)`. The name property must specify a class that implements `CatalogPlugin`. Other properties under the namespace (`spark.sql.catalog.(name).(prop)`) are passed to the provider during initialization along with the catalog name.

This replaces #21306, which will be implemented in two multiple parts: the catalog plugin system (this commit) and specific catalog APIs, like `TableCatalog`.

## How was this patch tested?

Added test suites for `CaseInsensitiveStringMap` and for catalog loading.

Closes #23915 from rdblue/SPARK-24252-add-v2-catalog-plugins.

Authored-by: Ryan Blue <blue@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-03-08 19:31:49 +08:00
Yuming Wang 2036074b99 [SPARK-26004][SQL] InMemoryTable support StartsWith predicate push down
## What changes were proposed in this pull request?

[SPARK-24638](https://issues.apache.org/jira/browse/SPARK-24638) adds support for Parquet file `StartsWith` predicate push down.
`InMemoryTable` can also support this feature.

This is an example to explain how it works, Imagine that the `id` column stored as below:

Partition ID | lowerBound | upperBound
-- | -- | --
p1 | '1' | '9'
p2 | '10' | '19'
p3 | '20' | '29'
p4 | '30' | '39'
p5 | '40' | '49'

A filter ```df.filter($"id".startsWith("2"))``` or ```id like '2%'```
then we substr lowerBound and upperBound:

Partition ID | lowerBound.substr(0, Length("2")) | upperBound.substr(0, Length("2"))
-- | -- | --
p1 | '1' | '9'
p2 | '1' | '1'
p3 | '2' | '2'
p4 | '3' | '3'
p5 | '4' | '4'

We can see that we only need to read `p1` and `p3`.

## How was this patch tested?

 unit tests and benchmark tests

benchmark test result:
```
================================================================================================
Pushdown benchmark for StringStartsWith
================================================================================================

Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU  2.90GHz
StringStartsWith filter: (value like '10%'): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
InMemoryTable Vectorized                    12068 / 14198          1.3         767.3       1.0X
InMemoryTable Vectorized (Pushdown)           5457 / 8662          2.9         347.0       2.2X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU  2.90GHz
StringStartsWith filter: (value like '1000%'): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
InMemoryTable Vectorized                      5246 / 5355          3.0         333.5       1.0X
InMemoryTable Vectorized (Pushdown)           2185 / 2346          7.2         138.9       2.4X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU  2.90GHz
StringStartsWith filter: (value like '786432%'): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
InMemoryTable Vectorized                      5112 / 5312          3.1         325.0       1.0X
InMemoryTable Vectorized (Pushdown)           2292 / 2522          6.9         145.7       2.2X
```

Closes #23004 from wangyum/SPARK-26004.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-03-08 19:18:32 +08:00