CSV is the most common data format in the "small data" world. It is often the first format people want to try when they see Spark on a single node. Having to rely on a 3rd party component for this leads to poor user experience for new users. This PR merges the popular spark-csv data source package (https://github.com/databricks/spark-csv) with SparkSQL.
This is a first PR to bring the functionality to spark 2.0 master. We will complete items outlines in the design document (see JIRA attachment) in follow up pull requests.
Author: Hossein <hossein@databricks.com>
Author: Reynold Xin <rxin@databricks.com>
Closes#10766 from rxin/csv.
The goal of this PR is to eliminate unnecessary translations when there are back-to-back `MapPartitions` operations. In order to achieve this I also made the following simplifications:
- Operators no longer have hold encoders, instead they have only the expressions that they need. The benefits here are twofold: the expressions are visible to transformations so go through the normal resolution/binding process. now that they are visible we can change them on a case by case basis.
- Operators no longer have type parameters. Since the engine is responsible for its own type checking, having the types visible to the complier was an unnecessary complication. We still leverage the scala compiler in the companion factory when constructing a new operator, but after this the types are discarded.
Deferred to a follow up PR:
- Remove as much of the resolution/binding from Dataset/GroupedDataset as possible. We should still eagerly check resolution and throw an error though in the case of mismatches for an `as` operation.
- Eliminate serializations in more cases by adding more cases to `EliminateSerialization`
Author: Michael Armbrust <michael@databricks.com>
Closes#10747 from marmbrus/encoderExpressions.
This PR makes bucketing and exchange share one common hash algorithm, so that we can guarantee the data distribution is same between shuffle and bucketed data source, which enables us to only shuffle one side when join a bucketed table and a normal one.
This PR also fixes the tests that are broken by the new hash behaviour in shuffle.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#10703 from cloud-fan/use-hash-expr-in-shuffle.
This pull request rewrites CaseWhen expression to break the single, monolithic "branches" field into a sequence of tuples (Seq[(condition, value)]) and an explicit optional elseValue field.
Prior to this pull request, each even position in "branches" represents the condition for each branch, and each odd position represents the value for each branch. The use of them have been pretty confusing with a lot sliding windows or grouped(2) calls.
Author: Reynold Xin <rxin@databricks.com>
Closes#10734 from rxin/simplify-case.
Fix the style violation (space before , and :).
This PR is a followup for #10643 and rework of #10685 .
Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>
Closes#10732 from sarutak/SPARK-12692-followup-sql.
There are many potential benefits of having an efficient in memory columnar format as an alternate
to UnsafeRow. This patch introduces ColumnarBatch/ColumnarVector which starts this effort. The
remaining implementation can be done as follow up patches.
As stated in the in the JIRA, there are useful external components that operate on memory in a
simple columnar format. ColumnarBatch would serve that purpose and could server as a
zero-serialization/zero-copy exchange for this use case.
This patch supports running the underlying data either on heap or off heap. On heap runs a bit
faster but we would need offheap for zero-copy exchanges. Currently, this mode is hidden behind one
interface (ColumnVector).
This differs from Parquet or the existing columnar cache because this is *not* intended to be used
as a storage format. The focus is entirely on CPU efficiency as we expect to only have 1 of these
batches in memory per task. The layout of the values is just dense arrays of the value type.
Author: Nong Li <nong@databricks.com>
Author: Nong <nongli@gmail.com>
Closes#10628 from nongli/spark-12635.
This PR implements SQL generation support for persisted data source tables. A new field `metastoreTableIdentifier: Option[TableIdentifier]` is added to `LogicalRelation`. When a `LogicalRelation` representing a persisted data source relation is created, this field holds the database name and table name of the relation.
Author: Cheng Lian <lian@databricks.com>
Closes#10712 from liancheng/spark-12724-datasources-sql-gen.
Let me know whether you'd like to see it in other place
Author: Robert Kruszewski <robertk@palantir.com>
Closes#10210 from robert3005/feature/pluggable-optimizer.
Fix the style violation (space before , and :).
This PR is a followup for #10643.
Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>
Closes#10718 from sarutak/SPARK-12692-followup-sql.
JIRA: https://issues.apache.org/jira/browse/SPARK-12744
This PR makes parsing JSON integers to timestamps consistent with casting behavior.
Author: Anatoliy Plastinin <anatoliy.plastinin@gmail.com>
Closes#10687 from antlypls/fix-json-timestamp-parsing.
Turn import ordering violations into build errors, plus a few adjustments
to account for how the checker behaves. I'm a little on the fence about
whether the existing code is right, but it's easier to appease the checker
than to discuss what's the more correct order here.
Plus a few fixes to imports that cropped in since my recent cleanups.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#10612 from vanzin/SPARK-3873-enable.
This PR tries to enable Spark SQL to convert resolved logical plans back to SQL query strings. For now, the major use case is to canonicalize Spark SQL native view support. The major entry point is `SQLBuilder.toSQL`, which returns an `Option[String]` if the logical plan is recognized.
The current version is still in WIP status, and is quite limited. Known limitations include:
1. The logical plan must be analyzed but not optimized
The optimizer erases `Subquery` operators, which contain necessary scope information for SQL generation. Future versions should be able to recover erased scope information by inserting subqueries when necessary.
1. The logical plan must be created using HiveQL query string
Query plans generated by composing arbitrary DataFrame API combinations are not supported yet. Operators within these query plans need to be rearranged into a canonical form that is more suitable for direct SQL generation. For example, the following query plan
```
Filter (a#1 < 10)
+- MetastoreRelation default, src, None
```
need to be canonicalized into the following form before SQL generation:
```
Project [a#1, b#2, c#3]
+- Filter (a#1 < 10)
+- MetastoreRelation default, src, None
```
Otherwise, the SQL generation process will have to handle a large number of special cases.
1. Only a fraction of expressions and basic logical plan operators are supported in this PR
Currently, 95.7% (1720 out of 1798) query plans in `HiveCompatibilitySuite` can be successfully converted to SQL query strings.
Known unsupported components are:
- Expressions
- Part of math expressions
- Part of string expressions (buggy?)
- Null expressions
- Calendar interval literal
- Part of date time expressions
- Complex type creators
- Special `NOT` expressions, e.g. `NOT LIKE` and `NOT IN`
- Logical plan operators/patterns
- Cube, rollup, and grouping set
- Script transformation
- Generator
- Distinct aggregation patterns that fit `DistinctAggregationRewriter` analysis rule
- Window functions
Support for window functions, generators, and cubes etc. will be added in follow-up PRs.
This PR leverages `HiveCompatibilitySuite` for testing SQL generation in a "round-trip" manner:
* For all select queries, we try to convert it back to SQL
* If the query plan is convertible, we parse the generated SQL into a new logical plan
* Run the new logical plan instead of the original one
If the query plan is inconvertible, the test case simply falls back to the original logic.
TODO
- [x] Fix failed test cases
- [x] Support for more basic expressions and logical plan operators (e.g. distinct aggregation etc.)
- [x] Comments and documentation
Author: Cheng Lian <lian@databricks.com>
Closes#10541 from liancheng/sql-generation.
Fix most build warnings: mostly deprecated API usages. I'll annotate some of the changes below. CC rxin who is leading the charge to remove the deprecated APIs.
Author: Sean Owen <sowen@cloudera.com>
Closes#10570 from srowen/SPARK-12618.
This PR is continue from previous closed PR 10314.
In this PR, SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE will be taken memory string conventions as input.
For example, the user can now specify 10g for SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE in SQLConf file.
marmbrus srowen : Can you help review this code changes ? Thanks.
Author: Kevin Yu <qyu@us.ibm.com>
Closes#10629 from kevinyu98/spark-12317.
It was introduced in 917d3fc069
/cc cloud-fan rxin
Author: Jacek Laskowski <jacek@japila.pl>
Closes#10636 from jaceklaskowski/fix-for-build-failure-2.11.
This PR manage the memory used by window functions (buffered rows), also enable external spilling.
After this PR, we can run window functions on a partition with hundreds of millions of rows with only 1G.
Author: Davies Liu <davies@databricks.com>
Closes#10605 from davies/unsafe_window.
[SPARK-12640][SQL] Add simple benchmarking utility class and add Parquet scan benchmarks.
We've run benchmarks ad hoc to measure the scanner performance. We will continue to invest in this
and it makes sense to get these benchmarks into code. This adds a simple benchmarking utility to do
this.
Author: Nong Li <nong@databricks.com>
Author: Nong <nongli@gmail.com>
Closes#10589 from nongli/spark-12640.
This PR adds bucket write support to Spark SQL. User can specify bucketing columns, numBuckets and sorting columns with or without partition columns. For example:
```
df.write.partitionBy("year").bucketBy(8, "country").sortBy("amount").saveAsTable("sales")
```
When bucketing is used, we will calculate bucket id for each record, and group the records by bucket id. For each group, we will create a file with bucket id in its name, and write data into it. For each bucket file, if sorting columns are specified, the data will be sorted before write.
Note that there may be multiply files for one bucket, as the data is distributed.
Currently we store the bucket metadata at hive metastore in a non-hive-compatible way. We use different bucketing hash function compared to hive, so we can't be compatible anyway.
Limitations:
* Can't write bucketed data without hive metastore.
* Can't insert bucketed data into existing hive tables.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#10498 from cloud-fan/bucket-write.
This PR moves a major part of the new SQL parser to Catalyst. This is a prelude to start using this parser for all of our SQL parsing. The following key changes have been made:
The ANTLR Parser & Supporting classes have been moved to the Catalyst project. They are now part of the ```org.apache.spark.sql.catalyst.parser``` package. These classes contained quite a bit of code that was originally from the Hive project, I have added aknowledgements whenever this applied. All Hive dependencies have been factored out. I have also taken this chance to clean-up the ```ASTNode``` class, and to improve the error handling.
The HiveQl object that provides the functionality to convert an AST into a LogicalPlan has been refactored into three different classes, one for every SQL sub-project:
- ```CatalystQl```: This implements Query and Expression parsing functionality.
- ```SparkQl```: This is a subclass of CatalystQL and provides SQL/Core only functionality such as Explain and Describe.
- ```HiveQl```: This is a subclass of ```SparkQl``` and this adds Hive-only functionality to the parser such as Analyze, Drop, Views, CTAS & Transforms. This class still depends on Hive.
cc rxin
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#10583 from hvanhovell/SPARK-12575.
For queries like :
select <> from table group by a distribute by a
we can eliminate distribute by ; since group by will anyways do a hash partitioning
Also applicable when user uses Dataframe API
Author: Yash Datta <Yash.Datta@guavus.com>
Closes#9858 from saucam/eliminatedistribute.
This fix masks JDBC credentials in the explain output. URL patterns to specify credential seems to be vary between different databases. Added a new method to dialect to mask the credentials according to the database specific URL pattern.
While adding tests I noticed explain output includes array variable for partitions ([Lorg.apache.spark.Partition;3ff74546,). Modified the code to include the first, and last partition information.
Author: sureshthalamati <suresh.thalamati@gmail.com>
Closes#10452 from sureshthalamati/mask_jdbc_credentials_spark-12504.
As noted in the code, this change is to make this component easier to test in isolation.
Author: Nong <nongli@gmail.com>
Closes#10581 from nongli/spark-12636.
address comments in #10435
This makes the API easier to use if user programmatically generate the call to hash, and they will get analysis exception if the arguments of hash is empty.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#10588 from cloud-fan/hash.
just write the arguments into unsafe row and use murmur3 to calculate hash code
Author: Wenchen Fan <wenchen@databricks.com>
Closes#10435 from cloud-fan/hash-expr.
Currently, when we call corr or cov on dataframe with invalid input we see these error messages for both corr and cov:
- "Currently cov supports calculating the covariance between two columns"
- "Covariance calculation for columns with dataType "[DataType Name]" not supported."
I've fixed this issue by passing the function name as an argument. We could also do the input checks separately for each function. I avoided doing that because of code duplication.
Thanks!
Author: Narine Kokhlikyan <narine.kokhlikyan@gmail.com>
Closes#10458 from NarineK/sparksqlstatsmessages.
The reader was previously not setting the row length meaning it was wrong if there were variable
length columns. This problem does not manifest usually, since the value in the column is correct and
projecting the row fixes the issue.
Author: Nong Li <nong@databricks.com>
Closes#10576 from nongli/spark-12589.
This PR enable cube/rollup as function, so they can be used as this:
```
select a, b, sum(c) from t group by rollup(a, b)
```
Author: Davies Liu <davies@databricks.com>
Closes#10522 from davies/rollup.
Spark SQL's JDBC data source allows users to specify an explicit JDBC driver to load (using the `driver` argument), but in the current code it's possible that the user-specified driver will not be used when it comes time to actually create a JDBC connection.
In a nutshell, the problem is that you might have multiple JDBC drivers on the classpath that claim to be able to handle the same subprotocol, so simply registering the user-provided driver class with the our `DriverRegistry` and JDBC's `DriverManager` is not sufficient to ensure that it's actually used when creating the JDBC connection.
This patch addresses this issue by first registering the user-specified driver with the DriverManager, then iterating over the driver manager's loaded drivers in order to obtain the correct driver and use it to create a connection (previously, we just called `DriverManager.getConnection()` directly).
If a user did not specify a JDBC driver to use, then we call `DriverManager.getDriver` to figure out the class of the driver to use, then pass that class's name to executors; this guards against corner-case bugs in situations where the driver and executor JVMs might have different sets of JDBC drivers on their classpaths (previously, there was the (rare) potential for `DriverManager.getConnection()` to use different drivers on the driver and executors if the user had not explicitly specified a JDBC driver class and the classpaths were different).
This patch is inspired by a similar patch that I made to the `spark-redshift` library (https://github.com/databricks/spark-redshift/pull/143), which contains its own modified fork of some of Spark's JDBC data source code (for cross-Spark-version compatibility reasons).
Author: Josh Rosen <joshrosen@databricks.com>
Closes#10519 from JoshRosen/jdbc-driver-precedence.
We can provides the option to choose JSON parser can be enabled to accept quoting of all character or not.
Author: Cazen <Cazen@korea.com>
Author: Cazen Lee <cazen.lee@samsung.com>
Author: Cazen Lee <Cazen@korea.com>
Author: cazen.lee <cazen.lee@samsung.com>
Closes#10497 from Cazen/master.
Avoiding the the No such table exception and throwing analysis exception as per the bug: SPARK-12533
Author: thomastechs <thomas.sebastian@tcs.com>
Closes#10529 from thomastechs/topic-branch.
callUDF has been deprecated. However, we do not have an alternative for users to specify the output data type without type tags. This pull request introduced a new API for that, and replaces the invocation of the deprecated callUDF with that.
Author: Reynold Xin <rxin@databricks.com>
Closes#10547 from rxin/SPARK-12599.
This PR is followed by https://github.com/apache/spark/pull/8391.
Previous PR fixes JDBCRDD to support null-safe equality comparison for JDBC datasource. This PR fixes the problem that it can actually return null as a result of the comparison resulting error as using the value of that comparison.
Author: hyukjinkwon <gurwls223@gmail.com>
Author: HyukjinKwon <gurwls223@gmail.com>
Closes#8743 from HyukjinKwon/SPARK-10180.