JIRA: https://issues.apache.org/jira/browse/SPARK-12850
This PR is to support bucket pruning when the predicates are `EqualTo`, `EqualNullSafe`, `IsNull`, `In`, and `InSet`.
Like HIVE, in this PR, the bucket pruning works when the bucketing key has one and only one column.
So far, I do not find a way to verify how many buckets are actually scanned. However, I did verify it when doing the debug. Could you provide a suggestion how to do it properly? Thank you! cloud-fan yhuai rxin marmbrus
BTW, we can add more cases to support complex predicate including `Or` and `And`. Please let me know if I should do it in this PR.
Maybe we also need to add test cases to verify if bucket pruning works well for each data type.
Author: gatorsmile <gatorsmile@gmail.com>
Closes#10942 from gatorsmile/pruningBuckets.
Make an internal non-deprecated version of incBytesRead and incRecordsRead so we don't have unecessary deprecation warnings in our build.
Right now incBytesRead and incRecordsRead are marked as deprecated and for internal use only. We should make private[spark] versions which are not deprecated and switch to those internally so as to not clutter up the warning messages when building.
cc andrewor14 who did the initial deprecation
Author: Holden Karau <holden@us.ibm.com>
Closes#11056 from holdenk/SPARK-13152-fix-task-metrics-deprecation-warnings.
They seem redundant and we can simply use DataFrameReader/Writer. The new usage looks like:
```scala
val df = sqlContext.read.stream("...")
val handle = df.write.stream("...")
handle.stop()
```
Author: Reynold Xin <rxin@databricks.com>
Closes#11062 from rxin/SPARK-13166.
A row from stream side could match multiple rows on build side, the loop for these matched rows should not be interrupted when emitting a row, so we buffer the output rows in a linked list, check the termination condition on producer loop (for example, Range or Aggregate).
Author: Davies Liu <davies@databricks.com>
Closes#10989 from davies/gen_join.
1. try to avoid the suffix (unique id)
2. remove the comment if there is no code generated.
3. re-arrange the order of functions
4. trop the new line for inlined blocks.
Author: Davies Liu <davies@databricks.com>
Closes#11032 from davies/better_suffix.
This PR add spilling support for generated TungstenAggregate.
If spilling happened, it's not that bad to do the iterator based sort-merge-aggregate (not generated).
The changes will be covered by TungstenAggregationQueryWithControlledFallbackSuite
Author: Davies Liu <davies@databricks.com>
Closes#10998 from davies/gen_spilling.
This patch implements support for more types when doing the vectorized decode. There are
a few more types remaining but they should be very straightforward after this. This code
has a few copy and paste pieces but they are difficult to eliminate due to performance
considerations.
Specifically, this patch adds support for:
- String, Long, Byte types
- Dictionary encoding for those types.
Author: Nong Li <nong@databricks.com>
Closes#10908 from nongli/spark-12992.
As benchmarked and discussed here: https://github.com/apache/spark/pull/10786/files#r50038294, benefits from codegen, the declarative aggregate function could be much faster than imperative one.
Author: Davies Liu <davies@databricks.com>
Closes#10960 from davies/stddev.
ddl.scala is defined in the execution package, and yet its reference of "UnaryNode" and "Command" are logical. This was fairly confusing when I was trying to understand the ddl code.
Author: Reynold Xin <rxin@databricks.com>
Closes#11021 from rxin/SPARK-13138.
This is a follow up to 9aadcffabd that extends Spark SQL to allow users to _repeatedly_ optimize and execute structured queries. A `ContinuousQuery` can be expressed using SQL, DataFrames or Datasets. The purpose of this PR is only to add some initial infrastructure which will be extended in subsequent PRs.
## User-facing API
- `sqlContext.streamFrom` and `df.streamTo` return builder objects that are analogous to the `read/write` interfaces already available to executing queries in a batch-oriented fashion.
- `ContinuousQuery` provides an interface for interacting with a query that is currently executing in the background.
## Internal Interfaces
- `StreamExecution` - executes streaming queries in micro-batches
The following are currently internal, but public APIs will be provided in a future release.
- `Source` - an interface for providers of continually arriving data. A source must have a notion of an `Offset` that monotonically tracks what data has arrived. For fault tolerance, a source must be able to replay data given a start offset.
- `Sink` - an interface that accepts the results of a continuously executing query. Also responsible for tracking the offset that should be resumed from in the case of a failure.
## Testing
- `MemoryStream` and `MemorySink` - simple implementations of source and sink that keep all data in memory and have methods for simulating durability failures
- `StreamTest` - a framework for performing actions and checking invariants on a continuous query
Author: Michael Armbrust <michael@databricks.com>
Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: Josh Rosen <rosenville@gmail.com>
Closes#11006 from marmbrus/structured-streaming.
It is not valid to call `toAttribute` on a `NamedExpression` unless we know for sure that the child produced that `NamedExpression`. The current code worked fine when the grouping expressions were simple, but when they were a derived value this blew up at execution time.
Author: Michael Armbrust <michael@databricks.com>
Closes#11013 from marmbrus/groupByFunction-master.
1. Use lower case
2. Change long prefixes to something shorter (in this case I am changing only one: TungstenAggregate -> agg).
Author: Reynold Xin <rxin@databricks.com>
Closes#11017 from rxin/SPARK-13130.
This includes: float, boolean, short, decimal and calendar interval.
Decimal is mapped to long or byte array depending on the size and calendar
interval is mapped to a struct of int and long.
The only remaining type is map. The schema mapping is straightforward but
we might want to revisit how we deal with this in the rest of the execution
engine.
Author: Nong Li <nong@databricks.com>
Closes#10961 from nongli/spark-13043.
This PR adds the ability to specify the ```ignoreNulls``` option to the functions dsl, e.g:
```df.select($"id", last($"value", ignoreNulls = true).over(Window.partitionBy($"id").orderBy($"other"))```
This PR is some where between a bug fix (see the JIRA) and a new feature. I am not sure if we should backport to 1.6.
cc yhuai
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#10957 from hvanhovell/SPARK-13049.
JIRA: https://issues.apache.org/jira/browse/SPARK-12689
DDLParser processes three commands: createTable, describeTable and refreshTable.
This patch migrates the three commands to newly absorbed parser.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#10723 from viirya/migrate-ddl-describe.
Make sure we throw better error messages when Parquet schema merging fails.
Author: Cheng Lian <lian@databricks.com>
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#10979 from viirya/schema-merging-failure-message.
This class is only used for serialization of Python DataFrame. However, we don't require internal row there, so `GenericRowWithSchema` can also do the job.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#10992 from cloud-fan/python.
This PR add support for grouping keys for generated TungstenAggregate.
Spilling and performance improvements for BytesToBytesMap will be done by followup PR.
Author: Davies Liu <davies@databricks.com>
Closes#10855 from davies/gen_keys.
This issue is causing tests to fail consistently in master with Hadoop 2.6 / 2.7. This is because for Hadoop 2.5+ we overwrite existing values of `InputMetrics#bytesRead` in each call to `HadoopRDD#compute`. In the case of coalesce, e.g.
```
sc.textFile(..., 4).coalesce(2).count()
```
we will call `compute` multiple times in the same task, overwriting `bytesRead` values from previous calls to `compute`.
For a regression test, see `InputOutputMetricsSuite.input metrics for old hadoop with coalesce`. I did not add a new regression test because it's impossible without significant refactoring; there's a lot of existing duplicate code in this corner of Spark.
This was caused by #10835.
Author: Andrew Or <andrew@databricks.com>
Closes#10973 from andrewor14/fix-input-metrics-coalesce.
And ClientWrapper -> HiveClientImpl.
I have some followup pull requests to introduce a new internal catalog, and I think this new naming reflects better the functionality of the two classes.
Author: Reynold Xin <rxin@databricks.com>
Closes#10981 from rxin/SPARK-13076.
This is an existing issue uncovered recently by #10835. The reason for the exception was because the `SQLHistoryListener` gets all sorts of accumulators, not just the ones that represent SQL metrics. For example, the listener gets the `internal.metrics.shuffleRead.remoteBlocksFetched`, which is an Int, then it proceeds to cast the Int to a Long, which fails.
The fix is to mark accumulators representing SQL metrics using some internal metadata. Then we can identify which ones are SQL metrics and only process those in the `SQLHistoryListener`.
Author: Andrew Or <andrew@databricks.com>
Closes#10971 from andrewor14/fix-sql-history.
Our current Intersect physical operator simply delegates to RDD.intersect. We should remove the Intersect physical operator and simply transform a logical intersect into a semi-join with distinct. This way, we can take advantage of all the benefits of join implementations (e.g. managed memory, code generation, broadcast joins).
After a search, I found one of the mainstream RDBMS did the same. In their query explain, Intersect is replaced by Left-semi Join. Left-semi Join could help outer-join elimination in Optimizer, as shown in the PR: https://github.com/apache/spark/pull/10566
Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>
Closes#10630 from gatorsmile/IntersectBySemiJoin.
1. enable whole stage codegen during tests even there is only one operator supports that.
2. split doProduce() into two APIs: upstream() and doProduce()
3. generate prefix for fresh names of each operator
4. pass UnsafeRow to parent directly (avoid getters and create UnsafeRow again)
5. fix bugs and tests.
This PR re-open #10944 and fix the bug.
Author: Davies Liu <davies@databricks.com>
Closes#10977 from davies/gen_refactor.
JIRA: https://issues.apache.org/jira/browse/SPARK-12968
Implement command to set current database.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#10916 from viirya/ddl-use-database.
JIRA: https://issues.apache.org/jira/browse/SPARK-11955
Currently we simply skip pushdowning filters in parquet if we enable schema merging.
However, we can actually mark particular fields in merging schema for safely pushdowning filters in parquet.
Author: Liang-Chi Hsieh <viirya@appier.com>
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#9940 from viirya/safe-pushdown-parquet-filters.
I tried to add this via `USE_BIG_DECIMAL_FOR_FLOATS` option from Jackson with no success.
Added test for non-complex types. Should I add a test for complex types?
Author: Brandon Bradley <bradleytastic@gmail.com>
Closes#10936 from blbradley/spark-12749.
1. enable whole stage codegen during tests even there is only one operator supports that.
2. split doProduce() into two APIs: upstream() and doProduce()
3. generate prefix for fresh names of each operator
4. pass UnsafeRow to parent directly (avoid getters and create UnsafeRow again)
5. fix bugs and tests.
Author: Davies Liu <davies@databricks.com>
Closes#10944 from davies/gen_refactor.
Users unknowingly try to set core Spark configs in SQLContext but later realise that it didn't work. eg. sqlContext.sql("SET spark.shuffle.memoryFraction=0.4"). This PR adds a warning message when such operations are done.
Author: Tejas Patil <tejasp@fb.com>
Closes#10849 from tejasapatil/SPARK-12926.
This PR is a follow-up of #10911. It adds specialized update methods for `CountMinSketch` so that we can avoid doing internal/external row format conversion in `DataFrame.countMinSketch()`.
Author: Cheng Lian <lian@databricks.com>
Closes#10968 from liancheng/cms-specialized.
This PR moves all the functionality provided by the SparkSQLParser/ExtendedHiveQlParser to the new Parser hierarchy (SparkQl/HiveQl). This also improves the current SET command parsing: the current implementation swallows ```set role ...``` and ```set autocommit ...``` commands, this PR respects these commands (and passes them on to Hive).
This PR and https://github.com/apache/spark/pull/10723 end the use of Parser-Combinator parsers for SQL parsing. As a result we can also remove the ```AbstractSQLParser``` in Catalyst.
The PR is marked WIP as long as it doesn't pass all tests.
cc rxin viirya winningsix (this touches https://github.com/apache/spark/pull/10144)
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#10905 from hvanhovell/SPARK-12866.
This PR integrates Bloom filter from spark-sketch into DataFrame. This version resorts to RDD.aggregate for building the filter. A more performant UDAF version can be built in future follow-up PRs.
This PR also add 2 specify `put` version(`putBinary` and `putLong`) into `BloomFilter`, which makes it easier to build a Bloom filter over a `DataFrame`.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#10937 from cloud-fan/bloom-filter.
The high level idea is that instead of having the executors send both accumulator updates and TaskMetrics, we should have them send only accumulator updates. This eliminates the need to maintain both code paths since one can be implemented in terms of the other. This effort is split into two parts:
**SPARK-12895: Implement TaskMetrics using accumulators.** TaskMetrics is basically just a bunch of accumulable fields. This patch makes TaskMetrics a syntactic wrapper around a collection of accumulators so we don't need to send TaskMetrics from the executors to the driver.
**SPARK-12896: Send only accumulator updates to the driver.** Now that TaskMetrics are expressed in terms of accumulators, we can capture all TaskMetrics values if we just send accumulator updates from the executors to the driver. This completes the parent issue SPARK-10620.
While an effort has been made to preserve as much of the public API as possible, there were a few known breaking DeveloperApi changes that would be very awkward to maintain. I will gather the full list shortly and post it here.
Note: This was once part of #10717. This patch is split out into its own patch from there to make it easier for others to review. Other smaller pieces of already been merged into master.
Author: Andrew Or <andrew@databricks.com>
Closes#10835 from andrewor14/task-metrics-use-accums.
This PR is a follow-up of PR #10541. It integrates the newly introduced SQL generation feature with native view to make native view canonical.
In this PR, a new SQL option `spark.sql.nativeView.canonical` is added. When this option and `spark.sql.nativeView` are both `true`, Spark SQL tries to handle `CREATE VIEW` DDL statements using SQL query strings generated from view definition logical plans. If we failed to map the plan to SQL, we fallback to the original native view approach.
One important issue this PR fixes is that, now we can use CTE when defining a view. Originally, when native view is turned on, we wrap the view definition text with an extra `SELECT`. However, HiveQL parser doesn't allow CTE appearing as a subquery. Namely, something like this is disallowed:
```sql
SELECT n
FROM (
WITH w AS (SELECT 1 AS n)
SELECT * FROM w
) v
```
This PR fixes this issue because the extra `SELECT` is no longer needed (also, CTE expressions are inlined as subqueries during analysis phase, thus there won't be CTE expressions in the generated SQL query string).
Author: Cheng Lian <lian@databricks.com>
Author: Yin Huai <yhuai@databricks.com>
Closes#10733 from liancheng/spark-12728.integrate-sql-gen-with-native-view.
This PR integrates Count-Min Sketch from spark-sketch into DataFrame. This version resorts to `RDD.aggregate` for building the sketch. A more performant UDAF version can be built in future follow-up PRs.
Author: Cheng Lian <lian@databricks.com>
Closes#10911 from liancheng/cms-df-api.
This patch adds support for complex types for ColumnarBatch. ColumnarBatch supports structs
and arrays. There is a simple mapping between the richer catalyst types to these two. Strings
are treated as an array of bytes.
ColumnarBatch will contain a column for each node of the schema. Non-complex schemas consists
of just leaf nodes. Structs represent an internal node with one child for each field. Arrays
are internal nodes with one child. Structs just contain nullability. Arrays contain offsets
and lengths into the child array. This structure is able to handle arbitrary nesting. It has
the key property that we maintain columnar throughout and that primitive types are only stored
in the leaf nodes and contiguous across rows. For example, if the schema is
```
array<array<int>>
```
There are three columns in the schema. The internal nodes each have one children. The leaf node contains all the int data stored consecutively.
As part of this, this patch adds append APIs in addition to the Put APIs (e.g. putLong(rowid, v)
vs appendLong(v)). These APIs are necessary when the batch contains variable length elements.
The vectors are not fixed length and will grow as necessary. This should make the usage a lot
simpler for the writer.
Author: Nong Li <nong@databricks.com>
Closes#10820 from nongli/spark-12854.
Fix Java function API methods for flatMap and mapPartitions to require producing only an Iterator, not Iterable. Also fix DStream.flatMap to require a function producing TraversableOnce only, not Traversable.
CC rxin pwendell for API change; tdas since it also touches streaming.
Author: Sean Owen <sowen@cloudera.com>
Closes#10413 from srowen/SPARK-3369.
This pull request simply fixes a few minor coding style issues in csv, as I was reviewing the change post-hoc.
Author: Reynold Xin <rxin@databricks.com>
Closes#10919 from rxin/csv-minor.
As we begin to use unsafe row writing framework(`BufferHolder` and `UnsafeRowWriter`) in more and more places(`UnsafeProjection`, `UnsafeRowParquetRecordReader`, `GenerateColumnAccessor`, etc.), we should add more doc to it and make it easier to use.
This PR abstract the technique used in `UnsafeRowParquetRecordReader`: avoid unnecessary operatition as more as possible. For example, do not always point the row to the buffer at the end, we only need to update the size of row. If all fields are of primitive type, we can even save the row size updating. Then we can apply this technique to more places easily.
a local benchmark shows `UnsafeProjection` is up to 1.7x faster after this PR:
**old version**
```
Intel(R) Core(TM) i7-4960HQ CPU 2.60GHz
unsafe projection: Avg Time(ms) Avg Rate(M/s) Relative Rate
-------------------------------------------------------------------------------
single long 2616.04 102.61 1.00 X
single nullable long 3032.54 88.52 0.86 X
primitive types 9121.05 29.43 0.29 X
nullable primitive types 12410.60 21.63 0.21 X
```
**new version**
```
Intel(R) Core(TM) i7-4960HQ CPU 2.60GHz
unsafe projection: Avg Time(ms) Avg Rate(M/s) Relative Rate
-------------------------------------------------------------------------------
single long 1533.34 175.07 1.00 X
single nullable long 2306.73 116.37 0.66 X
primitive types 8403.93 31.94 0.18 X
nullable primitive types 12448.39 21.56 0.12 X
```
For single non-nullable long(the best case), we can have about 1.7x speed up. Even it's nullable, we can still have 1.3x speed up. For other cases, it's not such a boost as the saved operations only take a little proportion of the whole process. The benchmark code is included in this PR.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#10809 from cloud-fan/unsafe-projection.
When users are using `partitionBy` and `bucketBy` at the same time, some bucketing columns might be part of partitioning columns. For example,
```
df.write
.format(source)
.partitionBy("i")
.bucketBy(8, "i", "k")
.saveAsTable("bucketed_table")
```
However, in the above case, adding column `i` into `bucketBy` is useless. It is just wasting extra CPU when reading or writing bucket tables. Thus, like Hive, we can issue an exception and let users do the change.
Also added a test case for checking if the information of `sortBy` and `bucketBy` columns are correctly saved in the metastore table.
Could you check if my understanding is correct? cloud-fan rxin marmbrus Thanks!
Author: gatorsmile <gatorsmile@gmail.com>
Closes#10891 from gatorsmile/commonKeysInPartitionByBucketBy.
https://issues.apache.org/jira/browse/SPARK-12901
This PR refactors the options in JSON and CSV datasources.
In more details,
1. `JSONOptions` uses the same format as `CSVOptions`.
2. Not case classes.
3. `CSVRelation` that does not have to be serializable (it was `with Serializable` but I removed)
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#10895 from HyukjinKwon/SPARK-12901.
When actual row length doesn't conform to specified schema field length, we should give a better error message instead of throwing an unintuitive `ArrayOutOfBoundsException`.
Author: Cheng Lian <lian@databricks.com>
Closes#10886 from liancheng/spark-12624.
This pull request implements strength reduction for comparing integral expressions and decimal literals, which is more common now because we switch to parsing fractional literals as decimal types (rather than doubles). I added the rules to the existing DecimalPrecision rule with some refactoring to simplify the control flow. I also moved DecimalPrecision rule into its own file due to the growing size.
Author: Reynold Xin <rxin@databricks.com>
Closes#10882 from rxin/SPARK-12904-1.
https://issues.apache.org/jira/browse/SPARK-12872
This PR makes the JSON datasource can compress output by option instead of manually setting Hadoop configurations.
For reflecting codec by names, it is similar with https://github.com/apache/spark/pull/10805.
As `CSVCompressionCodecs` can be shared with other datasources, it became a separate class to share as `CompressionCodecs`.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#10858 from HyukjinKwon/SPARK-12872.
When users turn off bucketing in SQLConf, we should issue some messages to tell users these operations will be converted to normal way.
Also added a test case for this scenario and fixed the helper function.
Do you think this PR is helpful when using bucket tables? cloud-fan Thank you!
Author: gatorsmile <gatorsmile@gmail.com>
Closes#10870 from gatorsmile/bucketTableWritingTestcases.
The existing `Union` logical operator only supports two children. Thus, adding a new logical operator `Unions` which can have arbitrary number of children to replace the existing one.
`Union` logical plan is a binary node. However, a typical use case for union is to union a very large number of input sources (DataFrames, RDDs, or files). It is not uncommon to union hundreds of thousands of files. In this case, our optimizer can become very slow due to the large number of logical unions. We should change the Union logical plan to support an arbitrary number of children, and add a single rule in the optimizer to collapse all adjacent `Unions` into a single `Unions`. Note that this problem doesn't exist in physical plan, because the physical `Unions` already supports arbitrary number of children.
Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>
Closes#10577 from gatorsmile/unionAllMultiChildren.
https://issues.apache.org/jira/browse/SPARK-12871
This PR added an option to support to specify compression codec.
This adds the option `codec` as an alias `compression` as filed in [SPARK-12668 ](https://issues.apache.org/jira/browse/SPARK-12668).
Note that I did not add configurations for Hadoop 1.x as this `CsvRelation` is using Hadoop 2.x API and I guess it is going to drop Hadoop 1.x support.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#10805 from HyukjinKwon/SPARK-12420.
This is a step in implementing SPARK-10620, which migrates TaskMetrics to accumulators.
TaskMetrics has a bunch of var's, some are fully public, some are `private[spark]`. This is bad coding style that makes it easy to accidentally overwrite previously set metrics. This has happened a few times in the past and caused bugs that were difficult to debug.
Instead, we should have get-or-create semantics, which are more readily understandable. This makes sense in the case of TaskMetrics because these are just aggregated metrics that we want to collect throughout the task, so it doesn't matter who's incrementing them.
Parent PR: #10717
Author: Andrew Or <andrew@databricks.com>
Author: Josh Rosen <joshrosen@databricks.com>
Author: andrewor14 <andrew@databricks.com>
Closes#10815 from andrewor14/get-or-create-metrics.
for normal parquet file without bucket, it's file name ends with a jobUUID which maybe all numbers and mistakeny regarded as bucket id. This PR improves the format of bucket id in file name by using a different seperator, `_`, so that the regex is more robust.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#10799 from cloud-fan/fix-bucket.
Currently SortMergeJoin and BroadcastHashJoin do not support condition, the need a followed Filter for that, the result projection to generate UnsafeRow could be very expensive if they generate lots of rows and could be filtered mostly by condition.
This PR brings the support of condition for SortMergeJoin and BroadcastHashJoin, just like other outer joins do.
This could improve the performance of Q72 by 7x (from 120s to 16.5s).
Author: Davies Liu <davies@databricks.com>
Closes#10653 from davies/filter_join.
Based on discussions in #10801, I'm submitting a pull request to rename ParserDialect to ParserInterface.
Author: Reynold Xin <rxin@databricks.com>
Closes#10817 from rxin/SPARK-12889.
In SPARK-10743 we wrap cast with `UnresolvedAlias` to give `Cast` a better alias if possible. However, for cases like `filter`, the `UnresolvedAlias` can't be resolved and actually we don't need a better alias for this case. This PR move the cast wrapping logic to `Column.named` so that we will only do it when we need a alias name.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#10781 from cloud-fan/bug.
This pull request removes the public developer parser API for external parsers. Given everything a parser depends on (e.g. logical plans and expressions) are internal and not stable, external parsers will break with every release of Spark. It is a bad idea to create the illusion that Spark actually supports pluggable parsers. In addition, this also reduces incentives for 3rd party projects to contribute parse improvements back to Spark.
Author: Reynold Xin <rxin@databricks.com>
Closes#10801 from rxin/SPARK-12855.
This is the initial work for whole stage codegen, it support Projection/Filter/Range, we will continue work on this to support more physical operators.
A micro benchmark show that a query with range, filter and projection could be 3X faster then before.
It's turned on by default. For a tree that have at least two chained plans, a WholeStageCodegen will be inserted into it, for example, the following plan
```
Limit 10
+- Project [(id#5L + 1) AS (id + 1)#6L]
+- Filter ((id#5L & 1) = 1)
+- Range 0, 1, 4, 10, [id#5L]
```
will be translated into
```
Limit 10
+- WholeStageCodegen
+- Project [(id#1L + 1) AS (id + 1)#2L]
+- Filter ((id#1L & 1) = 1)
+- Range 0, 1, 4, 10, [id#1L]
```
Here is the call graph to generate Java source for A and B (A support codegen, but B does not):
```
* WholeStageCodegen Plan A FakeInput Plan B
* =========================================================================
*
* -> execute()
* |
* doExecute() --------> produce()
* |
* doProduce() -------> produce()
* |
* doProduce() ---> execute()
* |
* consume()
* doConsume() ------------|
* |
* doConsume() <----- consume()
```
A SparkPlan that support codegen need to implement doProduce() and doConsume():
```
def doProduce(ctx: CodegenContext): (RDD[InternalRow], String)
def doConsume(ctx: CodegenContext, child: SparkPlan, input: Seq[ExprCode]): String
```
Author: Davies Liu <davies@databricks.com>
Closes#10735 from davies/whole2.
This inlines a few of the Parquet decoders and adds vectorized APIs to support decoding in batch.
There are a few particulars in the Parquet encodings that make this much more efficient. In
particular, RLE encodings are very well suited for batch decoding. The Parquet 2.0 encodings are
also very suited for this.
This is a work in progress and does not affect the current execution. In subsequent patches, we will
support more encodings and types before enabling this.
Simple benchmarks indicate this can decode single ints about > 3x faster.
Author: Nong Li <nong@databricks.com>
Author: Nong <nongli@gmail.com>
Closes#10593 from nongli/spark-12644.
This PR adds the support to read bucketed tables, and correctly populate `outputPartitioning`, so that we can avoid shuffle for some cases.
TODO(follow-up PRs):
* bucket pruning
* avoid shuffle for bucketed table join when use any super-set of the bucketing key.
(we should re-visit it after https://issues.apache.org/jira/browse/SPARK-12704 is fixed)
* recognize hive bucketed table
Author: Wenchen Fan <wenchen@databricks.com>
Closes#10604 from cloud-fan/bucket-read.
In this PR the new CatalystQl parser stack reaches grammar parity with the old Parser-Combinator based SQL Parser. This PR also replaces all uses of the old Parser, and removes it from the code base.
Although the existing Hive and SQL parser dialects were mostly the same, some kinks had to be worked out:
- The SQL Parser allowed syntax like ```APPROXIMATE(0.01) COUNT(DISTINCT a)```. In order to make this work we needed to hardcode approximate operators in the parser, or we would have to create an approximate expression. ```APPROXIMATE_COUNT_DISTINCT(a, 0.01)``` would also do the job and is much easier to maintain. So, this PR **removes** this keyword.
- The old SQL Parser supports ```LIMIT``` clauses in nested queries. This is **not supported** anymore. See https://github.com/apache/spark/pull/10689 for the rationale for this.
- Hive has a charset name char set literal combination it supports, for instance the following expression ```_ISO-8859-1 0x4341464562616265``` would yield this string: ```CAFEbabe```. Hive will only allow charset names to start with an underscore. This is quite annoying in spark because as soon as you use a tuple names will start with an underscore. In this PR we **remove** this feature from the parser. It would be quite easy to implement such a feature as an Expression later on.
- Hive and the SQL Parser treat decimal literals differently. Hive will turn any decimal into a ```Double``` whereas the SQL Parser would convert a non-scientific decimal into a ```BigDecimal```, and would turn a scientific decimal into a Double. We follow Hive's behavior here. The new parser supports a big decimal literal, for instance: ```81923801.42BD```, which can be used when a big decimal is needed.
cc rxin viirya marmbrus yhuai cloud-fan
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#10745 from hvanhovell/SPARK-12575-2.
CSV is the most common data format in the "small data" world. It is often the first format people want to try when they see Spark on a single node. Having to rely on a 3rd party component for this leads to poor user experience for new users. This PR merges the popular spark-csv data source package (https://github.com/databricks/spark-csv) with SparkSQL.
This is a first PR to bring the functionality to spark 2.0 master. We will complete items outlines in the design document (see JIRA attachment) in follow up pull requests.
Author: Hossein <hossein@databricks.com>
Author: Reynold Xin <rxin@databricks.com>
Closes#10766 from rxin/csv.
The goal of this PR is to eliminate unnecessary translations when there are back-to-back `MapPartitions` operations. In order to achieve this I also made the following simplifications:
- Operators no longer have hold encoders, instead they have only the expressions that they need. The benefits here are twofold: the expressions are visible to transformations so go through the normal resolution/binding process. now that they are visible we can change them on a case by case basis.
- Operators no longer have type parameters. Since the engine is responsible for its own type checking, having the types visible to the complier was an unnecessary complication. We still leverage the scala compiler in the companion factory when constructing a new operator, but after this the types are discarded.
Deferred to a follow up PR:
- Remove as much of the resolution/binding from Dataset/GroupedDataset as possible. We should still eagerly check resolution and throw an error though in the case of mismatches for an `as` operation.
- Eliminate serializations in more cases by adding more cases to `EliminateSerialization`
Author: Michael Armbrust <michael@databricks.com>
Closes#10747 from marmbrus/encoderExpressions.
This PR makes bucketing and exchange share one common hash algorithm, so that we can guarantee the data distribution is same between shuffle and bucketed data source, which enables us to only shuffle one side when join a bucketed table and a normal one.
This PR also fixes the tests that are broken by the new hash behaviour in shuffle.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#10703 from cloud-fan/use-hash-expr-in-shuffle.
This pull request rewrites CaseWhen expression to break the single, monolithic "branches" field into a sequence of tuples (Seq[(condition, value)]) and an explicit optional elseValue field.
Prior to this pull request, each even position in "branches" represents the condition for each branch, and each odd position represents the value for each branch. The use of them have been pretty confusing with a lot sliding windows or grouped(2) calls.
Author: Reynold Xin <rxin@databricks.com>
Closes#10734 from rxin/simplify-case.
Fix the style violation (space before , and :).
This PR is a followup for #10643 and rework of #10685 .
Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>
Closes#10732 from sarutak/SPARK-12692-followup-sql.
There are many potential benefits of having an efficient in memory columnar format as an alternate
to UnsafeRow. This patch introduces ColumnarBatch/ColumnarVector which starts this effort. The
remaining implementation can be done as follow up patches.
As stated in the in the JIRA, there are useful external components that operate on memory in a
simple columnar format. ColumnarBatch would serve that purpose and could server as a
zero-serialization/zero-copy exchange for this use case.
This patch supports running the underlying data either on heap or off heap. On heap runs a bit
faster but we would need offheap for zero-copy exchanges. Currently, this mode is hidden behind one
interface (ColumnVector).
This differs from Parquet or the existing columnar cache because this is *not* intended to be used
as a storage format. The focus is entirely on CPU efficiency as we expect to only have 1 of these
batches in memory per task. The layout of the values is just dense arrays of the value type.
Author: Nong Li <nong@databricks.com>
Author: Nong <nongli@gmail.com>
Closes#10628 from nongli/spark-12635.
This PR implements SQL generation support for persisted data source tables. A new field `metastoreTableIdentifier: Option[TableIdentifier]` is added to `LogicalRelation`. When a `LogicalRelation` representing a persisted data source relation is created, this field holds the database name and table name of the relation.
Author: Cheng Lian <lian@databricks.com>
Closes#10712 from liancheng/spark-12724-datasources-sql-gen.
Let me know whether you'd like to see it in other place
Author: Robert Kruszewski <robertk@palantir.com>
Closes#10210 from robert3005/feature/pluggable-optimizer.
Fix the style violation (space before , and :).
This PR is a followup for #10643.
Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>
Closes#10718 from sarutak/SPARK-12692-followup-sql.
JIRA: https://issues.apache.org/jira/browse/SPARK-12744
This PR makes parsing JSON integers to timestamps consistent with casting behavior.
Author: Anatoliy Plastinin <anatoliy.plastinin@gmail.com>
Closes#10687 from antlypls/fix-json-timestamp-parsing.
Turn import ordering violations into build errors, plus a few adjustments
to account for how the checker behaves. I'm a little on the fence about
whether the existing code is right, but it's easier to appease the checker
than to discuss what's the more correct order here.
Plus a few fixes to imports that cropped in since my recent cleanups.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#10612 from vanzin/SPARK-3873-enable.
This PR tries to enable Spark SQL to convert resolved logical plans back to SQL query strings. For now, the major use case is to canonicalize Spark SQL native view support. The major entry point is `SQLBuilder.toSQL`, which returns an `Option[String]` if the logical plan is recognized.
The current version is still in WIP status, and is quite limited. Known limitations include:
1. The logical plan must be analyzed but not optimized
The optimizer erases `Subquery` operators, which contain necessary scope information for SQL generation. Future versions should be able to recover erased scope information by inserting subqueries when necessary.
1. The logical plan must be created using HiveQL query string
Query plans generated by composing arbitrary DataFrame API combinations are not supported yet. Operators within these query plans need to be rearranged into a canonical form that is more suitable for direct SQL generation. For example, the following query plan
```
Filter (a#1 < 10)
+- MetastoreRelation default, src, None
```
need to be canonicalized into the following form before SQL generation:
```
Project [a#1, b#2, c#3]
+- Filter (a#1 < 10)
+- MetastoreRelation default, src, None
```
Otherwise, the SQL generation process will have to handle a large number of special cases.
1. Only a fraction of expressions and basic logical plan operators are supported in this PR
Currently, 95.7% (1720 out of 1798) query plans in `HiveCompatibilitySuite` can be successfully converted to SQL query strings.
Known unsupported components are:
- Expressions
- Part of math expressions
- Part of string expressions (buggy?)
- Null expressions
- Calendar interval literal
- Part of date time expressions
- Complex type creators
- Special `NOT` expressions, e.g. `NOT LIKE` and `NOT IN`
- Logical plan operators/patterns
- Cube, rollup, and grouping set
- Script transformation
- Generator
- Distinct aggregation patterns that fit `DistinctAggregationRewriter` analysis rule
- Window functions
Support for window functions, generators, and cubes etc. will be added in follow-up PRs.
This PR leverages `HiveCompatibilitySuite` for testing SQL generation in a "round-trip" manner:
* For all select queries, we try to convert it back to SQL
* If the query plan is convertible, we parse the generated SQL into a new logical plan
* Run the new logical plan instead of the original one
If the query plan is inconvertible, the test case simply falls back to the original logic.
TODO
- [x] Fix failed test cases
- [x] Support for more basic expressions and logical plan operators (e.g. distinct aggregation etc.)
- [x] Comments and documentation
Author: Cheng Lian <lian@databricks.com>
Closes#10541 from liancheng/sql-generation.
Fix most build warnings: mostly deprecated API usages. I'll annotate some of the changes below. CC rxin who is leading the charge to remove the deprecated APIs.
Author: Sean Owen <sowen@cloudera.com>
Closes#10570 from srowen/SPARK-12618.
This PR is continue from previous closed PR 10314.
In this PR, SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE will be taken memory string conventions as input.
For example, the user can now specify 10g for SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE in SQLConf file.
marmbrus srowen : Can you help review this code changes ? Thanks.
Author: Kevin Yu <qyu@us.ibm.com>
Closes#10629 from kevinyu98/spark-12317.
It was introduced in 917d3fc069
/cc cloud-fan rxin
Author: Jacek Laskowski <jacek@japila.pl>
Closes#10636 from jaceklaskowski/fix-for-build-failure-2.11.
This PR manage the memory used by window functions (buffered rows), also enable external spilling.
After this PR, we can run window functions on a partition with hundreds of millions of rows with only 1G.
Author: Davies Liu <davies@databricks.com>
Closes#10605 from davies/unsafe_window.
This PR adds bucket write support to Spark SQL. User can specify bucketing columns, numBuckets and sorting columns with or without partition columns. For example:
```
df.write.partitionBy("year").bucketBy(8, "country").sortBy("amount").saveAsTable("sales")
```
When bucketing is used, we will calculate bucket id for each record, and group the records by bucket id. For each group, we will create a file with bucket id in its name, and write data into it. For each bucket file, if sorting columns are specified, the data will be sorted before write.
Note that there may be multiply files for one bucket, as the data is distributed.
Currently we store the bucket metadata at hive metastore in a non-hive-compatible way. We use different bucketing hash function compared to hive, so we can't be compatible anyway.
Limitations:
* Can't write bucketed data without hive metastore.
* Can't insert bucketed data into existing hive tables.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#10498 from cloud-fan/bucket-write.
This PR moves a major part of the new SQL parser to Catalyst. This is a prelude to start using this parser for all of our SQL parsing. The following key changes have been made:
The ANTLR Parser & Supporting classes have been moved to the Catalyst project. They are now part of the ```org.apache.spark.sql.catalyst.parser``` package. These classes contained quite a bit of code that was originally from the Hive project, I have added aknowledgements whenever this applied. All Hive dependencies have been factored out. I have also taken this chance to clean-up the ```ASTNode``` class, and to improve the error handling.
The HiveQl object that provides the functionality to convert an AST into a LogicalPlan has been refactored into three different classes, one for every SQL sub-project:
- ```CatalystQl```: This implements Query and Expression parsing functionality.
- ```SparkQl```: This is a subclass of CatalystQL and provides SQL/Core only functionality such as Explain and Describe.
- ```HiveQl```: This is a subclass of ```SparkQl``` and this adds Hive-only functionality to the parser such as Analyze, Drop, Views, CTAS & Transforms. This class still depends on Hive.
cc rxin
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#10583 from hvanhovell/SPARK-12575.
For queries like :
select <> from table group by a distribute by a
we can eliminate distribute by ; since group by will anyways do a hash partitioning
Also applicable when user uses Dataframe API
Author: Yash Datta <Yash.Datta@guavus.com>
Closes#9858 from saucam/eliminatedistribute.
This fix masks JDBC credentials in the explain output. URL patterns to specify credential seems to be vary between different databases. Added a new method to dialect to mask the credentials according to the database specific URL pattern.
While adding tests I noticed explain output includes array variable for partitions ([Lorg.apache.spark.Partition;3ff74546,). Modified the code to include the first, and last partition information.
Author: sureshthalamati <suresh.thalamati@gmail.com>
Closes#10452 from sureshthalamati/mask_jdbc_credentials_spark-12504.
As noted in the code, this change is to make this component easier to test in isolation.
Author: Nong <nongli@gmail.com>
Closes#10581 from nongli/spark-12636.
address comments in #10435
This makes the API easier to use if user programmatically generate the call to hash, and they will get analysis exception if the arguments of hash is empty.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#10588 from cloud-fan/hash.
just write the arguments into unsafe row and use murmur3 to calculate hash code
Author: Wenchen Fan <wenchen@databricks.com>
Closes#10435 from cloud-fan/hash-expr.
Currently, when we call corr or cov on dataframe with invalid input we see these error messages for both corr and cov:
- "Currently cov supports calculating the covariance between two columns"
- "Covariance calculation for columns with dataType "[DataType Name]" not supported."
I've fixed this issue by passing the function name as an argument. We could also do the input checks separately for each function. I avoided doing that because of code duplication.
Thanks!
Author: Narine Kokhlikyan <narine.kokhlikyan@gmail.com>
Closes#10458 from NarineK/sparksqlstatsmessages.
The reader was previously not setting the row length meaning it was wrong if there were variable
length columns. This problem does not manifest usually, since the value in the column is correct and
projecting the row fixes the issue.
Author: Nong Li <nong@databricks.com>
Closes#10576 from nongli/spark-12589.
This PR enable cube/rollup as function, so they can be used as this:
```
select a, b, sum(c) from t group by rollup(a, b)
```
Author: Davies Liu <davies@databricks.com>
Closes#10522 from davies/rollup.
Spark SQL's JDBC data source allows users to specify an explicit JDBC driver to load (using the `driver` argument), but in the current code it's possible that the user-specified driver will not be used when it comes time to actually create a JDBC connection.
In a nutshell, the problem is that you might have multiple JDBC drivers on the classpath that claim to be able to handle the same subprotocol, so simply registering the user-provided driver class with the our `DriverRegistry` and JDBC's `DriverManager` is not sufficient to ensure that it's actually used when creating the JDBC connection.
This patch addresses this issue by first registering the user-specified driver with the DriverManager, then iterating over the driver manager's loaded drivers in order to obtain the correct driver and use it to create a connection (previously, we just called `DriverManager.getConnection()` directly).
If a user did not specify a JDBC driver to use, then we call `DriverManager.getDriver` to figure out the class of the driver to use, then pass that class's name to executors; this guards against corner-case bugs in situations where the driver and executor JVMs might have different sets of JDBC drivers on their classpaths (previously, there was the (rare) potential for `DriverManager.getConnection()` to use different drivers on the driver and executors if the user had not explicitly specified a JDBC driver class and the classpaths were different).
This patch is inspired by a similar patch that I made to the `spark-redshift` library (https://github.com/databricks/spark-redshift/pull/143), which contains its own modified fork of some of Spark's JDBC data source code (for cross-Spark-version compatibility reasons).
Author: Josh Rosen <joshrosen@databricks.com>
Closes#10519 from JoshRosen/jdbc-driver-precedence.
We can provides the option to choose JSON parser can be enabled to accept quoting of all character or not.
Author: Cazen <Cazen@korea.com>
Author: Cazen Lee <cazen.lee@samsung.com>
Author: Cazen Lee <Cazen@korea.com>
Author: cazen.lee <cazen.lee@samsung.com>
Closes#10497 from Cazen/master.
callUDF has been deprecated. However, we do not have an alternative for users to specify the output data type without type tags. This pull request introduced a new API for that, and replaces the invocation of the deprecated callUDF with that.
Author: Reynold Xin <rxin@databricks.com>
Closes#10547 from rxin/SPARK-12599.
This PR is followed by https://github.com/apache/spark/pull/8391.
Previous PR fixes JDBCRDD to support null-safe equality comparison for JDBC datasource. This PR fixes the problem that it can actually return null as a result of the comparison resulting error as using the value of that comparison.
Author: hyukjinkwon <gurwls223@gmail.com>
Author: HyukjinKwon <gurwls223@gmail.com>
Closes#8743 from HyukjinKwon/SPARK-10180.
It's confusing that some operator output UnsafeRow but some not, easy to make mistake.
This PR change to only output UnsafeRow for all the operators (SparkPlan), removed the rule to insert Unsafe/Safe conversions. For those that can't output UnsafeRow directly, added UnsafeProjection into them.
Closes#10330
cc JoshRosen rxin
Author: Davies Liu <davies@databricks.com>
Closes#10511 from davies/unsafe_row.
This patch refactors the filter pushdown for JDBCRDD and also adds few filters.
Added filters are basically from #10468 with some refactoring. Test cases are from #10468.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#10470 from viirya/refactor-jdbc-filter.
Right now, numFields will be passed in by pointTo(), then bitSetWidthInBytes is calculated, making pointTo() a little bit heavy.
It should be part of constructor of UnsafeRow.
Author: Davies Liu <davies@databricks.com>
Closes#10528 from davies/numFields.
This is rework from #10386 and add more tests and LIKE push-down support.
Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>
Closes#10468 from maropu/SupportMorePushdownInJdbc.
In Spark we allow UDFs to declare its expected input types in order to apply type coercion. The expected input type parameter takes a Seq[DataType] and uses Nil when no type coercion is applied. It makes more sense to take Option[Seq[DataType]] instead, so we can differentiate a no-arg function vs function with no expected input type specified.
Author: Reynold Xin <rxin@databricks.com>
Closes#10504 from rxin/SPARK-12549.
* Changes api.r.SQLUtils to use ```SQLContext.getOrCreate``` instead of creating a new context.
* Adds a simple test
[SPARK-11199] #comment link with JIRA
Author: Hossein <hossein@databricks.com>
Closes#9185 from falaki/SPARK-11199.
If DataFrame has BYTE types, throws an exception:
org.postgresql.util.PSQLException: ERROR: type "byte" does not exist
Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>
Closes#9350 from maropu/FixBugInPostgreJdbc.
We use scalastyle:off to turn off style checks in certain places where it is not possible to follow the style guide. This is usually ok. However, in udf registration, we disable the checker for a large amount of code simply because some of them exceed 100 char line limit. It is better to just disable the line limit check rather than everything.
In this pull request, I only disabled line length check, and fixed a problem (lack explicit types for public methods).
Author: Reynold Xin <rxin@databricks.com>
Closes#10501 from rxin/SPARK-12547.
Fixing the missing the document for the configuration. We can see the missing messages "TODO" when issuing the command "SET -V".
```
spark.sql.columnNameOfCorruptRecord
spark.sql.hive.verifyPartitionPath
spark.sql.sources.parallelPartitionDiscovery.threshold
spark.sql.hive.convertMetastoreParquet.mergeSchema
spark.sql.hive.convertCTAS
spark.sql.hive.thriftServer.async
```
Author: gatorsmile <gatorsmile@gmail.com>
Closes#10471 from gatorsmile/commandDesc.
Include the following changes:
1. Close `java.sql.Statement`
2. Fix incorrect `asInstanceOf`.
3. Remove unnecessary `synchronized` and `ReentrantLock`.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#10440 from zsxwing/findbugs.
When explain any plan with Generate, we will see an exclamation mark in the plan. Normally, when we see this mark, it means the plan has an error. This PR is to correct the `missingInput` in `Generate`.
For example,
```scala
val df = Seq((1, "a b c"), (2, "a b"), (3, "a")).toDF("number", "letters")
val df2 =
df.explode('letters) {
case Row(letters: String) => letters.split(" ").map(Tuple1(_)).toSeq
}
df2.explain(true)
```
Before the fix, the plan is like
```
== Parsed Logical Plan ==
'Generate UserDefinedGenerator('letters), true, false, None
+- Project [_1#0 AS number#2,_2#1 AS letters#3]
+- LocalRelation [_1#0,_2#1], [[1,a b c],[2,a b],[3,a]]
== Analyzed Logical Plan ==
number: int, letters: string, _1: string
Generate UserDefinedGenerator(letters#3), true, false, None, [_1#8]
+- Project [_1#0 AS number#2,_2#1 AS letters#3]
+- LocalRelation [_1#0,_2#1], [[1,a b c],[2,a b],[3,a]]
== Optimized Logical Plan ==
Generate UserDefinedGenerator(letters#3), true, false, None, [_1#8]
+- LocalRelation [number#2,letters#3], [[1,a b c],[2,a b],[3,a]]
== Physical Plan ==
!Generate UserDefinedGenerator(letters#3), true, false, [number#2,letters#3,_1#8]
+- LocalTableScan [number#2,letters#3], [[1,a b c],[2,a b],[3,a]]
```
**Updates**: The same issues are also found in the other four Dataset operators: `MapPartitions`/`AppendColumns`/`MapGroups`/`CoGroup`. Fixed all these four.
Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>
Closes#10393 from gatorsmile/generateExplain.
Support Unsafe Row in MapPartitions/MapGroups/CoGroup.
Added a test case for MapPartitions. Since MapGroups and CoGroup are built on AppendColumns, all the related dataset test cases already can verify the correctness when MapGroups and CoGroup processing unsafe rows.
davies cloud-fan Not sure if my understanding is right, please correct me. Thank you!
Author: gatorsmile <gatorsmile@gmail.com>
Closes#10398 from gatorsmile/unsafeRowMapGroup.
Hello Michael & All:
We have some issues to submit the new codes in the other PR(#10299), so we closed that PR and open this one with the fix.
The reason for the previous failure is that the projection for the scan when there is a filter that is not pushed down (the "left-over" filter) could be different, in elements or ordering, from the original projection.
With this new codes, the approach to solve this problem is:
Insert a new Project if the "left-over" filter is nonempty and (the original projection is not empty and the projection for the scan has more than one elements which could otherwise cause different ordering in projection).
We create 3 test cases to cover the otherwise failure cases.
Author: Kevin Yu <qyu@us.ibm.com>
Closes#10388 from kevinyu98/spark-12231.
In the past Spark JDBC write only worked with technologies which support the following INSERT statement syntax (JdbcUtils.scala: insertStatement()):
INSERT INTO $table VALUES ( ?, ?, ..., ? )
But some technologies require a list of column names:
INSERT INTO $table ( $colNameList ) VALUES ( ?, ?, ..., ? )
This was blocking the use of e.g. the Progress JDBC Driver for Cassandra.
Another limitation is that syntax 1 relies no the dataframe field ordering match that of the target table. This works fine, as long as the target table has been created by writer.jdbc().
If the target table contains more columns (not created by writer.jdbc()), then the insert fails due mismatch of number of columns or their data types.
This PR switches to the recommended second INSERT syntax. Column names are taken from datafram field names.
Author: CK50 <christian.kurz@oracle.com>
Closes#10380 from CK50/master-SPARK-12010-2.
When the filter is ```"b in ('1', '2')"```, the filter is not pushed down to Parquet. Thanks!
Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>
Closes#10278 from gatorsmile/parquetFilterNot.
This PR adds a new expression `AssertNotNull` to ensure non-nullable fields of products and case classes don't receive null values at runtime.
Author: Cheng Lian <lian@databricks.com>
Closes#10331 from liancheng/dataset-nullability-check.
Updates made in SPARK-11206 missed an edge case which cause's a NullPointerException when a task is killed. In some cases when a task ends in failure taskMetrics is initialized as null (see JobProgressListener.onTaskEnd()). To address this a null check was added. Before the changes in SPARK-11206 this null check was called at the start of the updateTaskAccumulatorValues() function.
Author: Alex Bozarth <ajbozart@us.ibm.com>
Closes#10405 from ajbozarth/spark12339.
Based on the suggestions from marmbrus , added logical/physical operators for Range for improving the performance.
Also added another API for resolving the JIRA Spark-12150.
Could you take a look at my implementation, marmbrus ? If not good, I can rework it. : )
Thank you very much!
Author: gatorsmile <gatorsmile@gmail.com>
Closes#10335 from gatorsmile/rangeOperators.
When a DataFrame or Dataset has a long schema, we should intelligently truncate to avoid flooding the screen with unreadable information.
// Standard output
[a: int, b: int]
// Truncate many top level fields
[a: int, b, string ... 10 more fields]
// Truncate long inner structs
[a: struct<a: Int ... 10 more fields>]
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes#10373 from dilipbiswal/spark-12398.
JIRA: https://issues.apache.org/jira/browse/SPARK-12218
When creating filters for Parquet/ORC, we should not push nested AND expressions partially.
Author: Yin Huai <yhuai@databricks.com>
Closes#10362 from yhuai/SPARK-12218.
This could simplify the generated code for expressions that is not nullable.
This PR fix lots of bugs about nullability.
Author: Davies Liu <davies@databricks.com>
Closes#10333 from davies/skip_nullable.
Description of the problem from cloud-fan
Actually this line: https://github.com/apache/spark/blob/branch-1.5/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala#L689
When we use `selectExpr`, we pass in `UnresolvedFunction` to `DataFrame.select` and fall in the last case. A workaround is to do special handling for UDTF like we did for `explode`(and `json_tuple` in 1.6), wrap it with `MultiAlias`.
Another workaround is using `expr`, for example, `df.select(expr("explode(a)").as(Nil))`, I think `selectExpr` is no longer needed after we have the `expr` function....
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes#9981 from dilipbiswal/spark-11619.
This PR removes Hive windows functions from Spark and replaces them with (native) Spark ones. The PR is on par with Hive in terms of features.
This has the following advantages:
* Better memory management.
* The ability to use spark UDAFs in Window functions.
cc rxin / yhuai
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#9819 from hvanhovell/SPARK-8641-2.
Since we rename the column name from ```text``` to ```value``` for DataFrame load by ```SQLContext.read.text```, we need to update doc.
Author: Yanbo Liang <ybliang8@gmail.com>
Closes#10349 from yanboliang/text-value.
For API DataFrame.join(right, usingColumns, joinType), if the joinType is right_outer or full_outer, the resulting join columns could be wrong (will be null).
The order of columns had been changed to match that with MySQL and PostgreSQL [1].
This PR also fix the nullability of output for outer join.
[1] http://www.postgresql.org/docs/9.2/static/queries-table-expressions.html
Author: Davies Liu <davies@databricks.com>
Closes#10353 from davies/fix_join.
This PR makes JSON parser and schema inference handle more cases where we have unparsed records. It is based on #10043. The last commit fixes the failed test and updates the logic of schema inference.
Regarding the schema inference change, if we have something like
```
{"f1":1}
[1,2,3]
```
originally, we will get a DF without any column.
After this change, we will get a DF with columns `f1` and `_corrupt_record`. Basically, for the second row, `[1,2,3]` will be the value of `_corrupt_record`.
When merge this PR, please make sure that the author is simplyianm.
JIRA: https://issues.apache.org/jira/browse/SPARK-12057Closes#10043
Author: Ian Macalinao <me@ian.pw>
Author: Yin Huai <yhuai@databricks.com>
Closes#10288 from yhuai/handleCorruptJson.
Based on the suggestions from marmbrus cloud-fan in https://github.com/apache/spark/pull/10165 , this PR is to print the decoded values(user objects) in `Dataset.show`
```scala
implicit val kryoEncoder = Encoders.kryo[KryoClassData]
val ds = Seq(KryoClassData("a", 1), KryoClassData("b", 2), KryoClassData("c", 3)).toDS()
ds.show(20, false);
```
The current output is like
```
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[1, 0, 111, 114, 103, 46, 97, 112, 97, 99, 104, 101, 46, 115, 112, 97, 114, 107, 46, 115, 113, 108, 46, 75, 114, 121, 111, 67, 108, 97, 115, 115, 68, 97, 116, -31, 1, 1, -126, 97, 2]|
|[1, 0, 111, 114, 103, 46, 97, 112, 97, 99, 104, 101, 46, 115, 112, 97, 114, 107, 46, 115, 113, 108, 46, 75, 114, 121, 111, 67, 108, 97, 115, 115, 68, 97, 116, -31, 1, 1, -126, 98, 4]|
|[1, 0, 111, 114, 103, 46, 97, 112, 97, 99, 104, 101, 46, 115, 112, 97, 114, 107, 46, 115, 113, 108, 46, 75, 114, 121, 111, 67, 108, 97, 115, 115, 68, 97, 116, -31, 1, 1, -126, 99, 6]|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
```
After the fix, it will be like the below if and only if the users override the `toString` function in the class `KryoClassData`
```scala
override def toString: String = s"KryoClassData($a, $b)"
```
```
+-------------------+
|value |
+-------------------+
|KryoClassData(a, 1)|
|KryoClassData(b, 2)|
|KryoClassData(c, 3)|
+-------------------+
```
If users do not override the `toString` function, the results will be like
```
+---------------------------------------+
|value |
+---------------------------------------+
|org.apache.spark.sql.KryoClassData68ef|
|org.apache.spark.sql.KryoClassData6915|
|org.apache.spark.sql.KryoClassData693b|
+---------------------------------------+
```
Question: Should we add another optional parameter in the function `show`? It will decide if the function `show` will display the hex values or the object values?
Author: gatorsmile <gatorsmile@gmail.com>
Closes#10215 from gatorsmile/showDecodedValue.
https://issues.apache.org/jira/browse/SPARK-12249
Currently `!=` operator is not pushed down correctly.
I simply added a case for this.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#10233 from HyukjinKwon/SPARK-12249.
This is continuation of SPARK-12056 where change is applied to SqlNewHadoopRDD.scala
andrewor14
FYI
Author: tedyu <yuzhihong@gmail.com>
Closes#10164 from tedyu/master.
Support UnsafeRow for the Coalesce/Except/Intersect.
Could you review if my code changes are ok? davies Thank you!
Author: gatorsmile <gatorsmile@gmail.com>
Closes#10285 from gatorsmile/unsafeSupportCIE.
marmbrus This PR is to address your comment. Thanks for your review!
Author: gatorsmile <gatorsmile@gmail.com>
Closes#10214 from gatorsmile/followup12188.
When SparkStrategies.BasicOperators's "case BroadcastHint(child) => apply(child)" is hit, it only recursively invokes BasicOperators.apply with this "child". It makes many strategies have no change to process this plan, which probably leads to "No plan" issue, so we use planLater to go through all strategies.
https://issues.apache.org/jira/browse/SPARK-12275
Author: yucai <yucai.yu@intel.com>
Closes#10265 from yucai/broadcast_hint.
Currently, we could generate different plans for query with single distinct (depends on spark.sql.specializeSingleDistinctAggPlanning), one works better on low cardinality columns, the other
works better for high cardinality column (default one).
This PR change to generate a single plan (three aggregations and two exchanges), which work better in both cases, then we could safely remove the flag `spark.sql.specializeSingleDistinctAggPlanning` (introduced in 1.6).
For a query like `SELECT COUNT(DISTINCT a) FROM table` will be
```
AGG-4 (count distinct)
Shuffle to a single reducer
Partial-AGG-3 (count distinct, no grouping)
Partial-AGG-2 (grouping on a)
Shuffle by a
Partial-AGG-1 (grouping on a)
```
This PR also includes large refactor for aggregation (reduce 500+ lines of code)
cc yhuai nongli marmbrus
Author: Davies Liu <davies@databricks.com>
Closes#10228 from davies/single_distinct.
Modifies the String overload to call the Column overload and ensures this is called in a test.
Author: Ankur Dave <ankurdave@gmail.com>
Closes#10271 from ankurdave/SPARK-12298.
This patch adds documentation for Spark configurations that affect off-heap memory and makes some naming and validation improvements for those configs.
- Change `spark.memory.offHeapSize` to `spark.memory.offHeap.size`. This is fine because this configuration has not shipped in any Spark release yet (it's new in Spark 1.6).
- Deprecated `spark.unsafe.offHeap` in favor of a new `spark.memory.offHeap.enabled` configuration. The motivation behind this change is to gather all memory-related configurations under the same prefix.
- Add a check which prevents users from setting `spark.memory.offHeap.enabled=true` when `spark.memory.offHeap.size == 0`. After SPARK-11389 (#9344), which was committed in Spark 1.6, Spark enforces a hard limit on the amount of off-heap memory that it will allocate to tasks. As a result, enabling off-heap execution memory without setting `spark.memory.offHeap.size` will lead to immediate OOMs. The new configuration validation makes this scenario easier to diagnose, helping to avoid user confusion.
- Document these configurations on the configuration page.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#10237 from JoshRosen/SPARK-12251.
This PR adds a `private[sql]` method `metadata` to `SparkPlan`, which can be used to describe detail information about a physical plan during visualization. Specifically, this PR uses this method to provide details of `PhysicalRDD`s translated from a data source relation. For example, a `ParquetRelation` converted from Hive metastore table `default.psrc` is now shown as the following screenshot:
![image](https://cloud.githubusercontent.com/assets/230655/11526657/e10cb7e6-9916-11e5-9afa-f108932ec890.png)
And here is the screenshot for a regular `ParquetRelation` (not converted from Hive metastore table) loaded from a really long path:
![output](https://cloud.githubusercontent.com/assets/230655/11680582/37c66460-9e94-11e5-8f50-842db5309d5a.png)
Author: Cheng Lian <lian@databricks.com>
Closes#10004 from liancheng/spark-12012.physical-rdd-metadata.
This PR contains the following updates:
- Created a new private variable `boundTEncoder` that can be shared by multiple functions, `RDD`, `select` and `collect`.
- Replaced all the `queryExecution.analyzed` by the function call `logicalPlan`
- A few API comments are using wrong class names (e.g., `DataFrame`) or parameter names (e.g., `n`)
- A few API descriptions are wrong. (e.g., `mapPartitions`)
marmbrus rxin cloud-fan Could you take a look and check if they are appropriate? Thank you!
Author: gatorsmile <gatorsmile@gmail.com>
Closes#10184 from gatorsmile/datasetClean.
See the thread Ben started:
http://search-hadoop.com/m/q3RTtveEuhjsr7g/
This PR adds drop() method to DataFrame which accepts multiple column names
Author: tedyu <yuzhihong@gmail.com>
Closes#9862 from ted-yu/master.
`ByteBuffer` doesn't guarantee all contents in `ByteBuffer.array` are valid. E.g, a ByteBuffer returned by `ByteBuffer.slice`. We should not use the whole content of `ByteBuffer` unless we know that's correct.
This patch fixed all places that use `ByteBuffer.array` incorrectly.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#10083 from zsxwing/bytebuffer-array.
We should upgrade to SBT 0.13.9, since this is a requirement in order to use SBT's new Maven-style resolution features (which will be done in a separate patch, because it's blocked by some binary compatibility issues in the POM reader plugin).
I also upgraded Scalastyle to version 0.8.0, which was necessary in order to fix a Scala 2.10.5 compatibility issue (see https://github.com/scalastyle/scalastyle/issues/156). The newer Scalastyle is slightly stricter about whitespace surrounding tokens, so I fixed the new style violations.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#10112 from JoshRosen/upgrade-to-sbt-0.13.9.
This replaces https://github.com/apache/spark/pull/9696
Invoke Checkstyle and print any errors to the console, failing the step.
Use Google's style rules modified according to
https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide
Some important checks are disabled (see TODOs in `checkstyle.xml`) due to
multiple violations being present in the codebase.
Suggest fixing those TODOs in a separate PR(s).
More on Checkstyle can be found on the [official website](http://checkstyle.sourceforge.net/).
Sample output (from [build 46345](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/46345/consoleFull)) (duplicated because I run the build twice with different profiles):
> Checkstyle checks failed at following occurrences:
[ERROR] src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java:[217,7] (coding) MissingSwitchDefault: switch without "default" clause.
> [ERROR] src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java:[198,10] (modifier) ModifierOrder: 'protected' modifier out of order with the JLS suggestions.
> [ERROR] src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java:[217,7] (coding) MissingSwitchDefault: switch without "default" clause.
> [ERROR] src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java:[198,10] (modifier) ModifierOrder: 'protected' modifier out of order with the JLS suggestions.
> [error] running /home/jenkins/workspace/SparkPullRequestBuilder2/dev/lint-java ; received return code 1
Also fix some of the minor violations that didn't require sweeping changes.
Apologies for the previous botched PRs - I finally figured out the issue.
cr: JoshRosen, pwendell
> I state that the contribution is my original work, and I license the work to the project under the project's open source license.
Author: Dmitry Erastov <derastov@gmail.com>
Closes#9867 from dskrvk/master.
Resubmit #9297 and #9991
On the live web UI, there is a SQL tab which provides valuable information for the SQL query. But once the workload is finished, we won't see the SQL tab on the history server. It will be helpful if we support SQL UI on the history server so we can analyze it even after its execution.
To support SQL UI on the history server:
1. I added an onOtherEvent method to the SparkListener trait and post all SQL related events to the same event bus.
2. Two SQL events SparkListenerSQLExecutionStart and SparkListenerSQLExecutionEnd are defined in the sql module.
3. The new SQL events are written to event log using Jackson.
4. A new trait SparkHistoryListenerFactory is added to allow the history server to feed events to the SQL history listener. The SQL implementation is loaded at runtime using java.util.ServiceLoader.
Author: Carson Wang <carson.wang@intel.com>
Closes#10061 from carsonwang/SqlHistoryUI.
In Java Spec java.sql.Connection, it has
boolean getAutoCommit() throws SQLException
Throws:
SQLException - if a database access error occurs or this method is called on a closed connection
So if conn.getAutoCommit is called on a closed connection, a SQLException will be thrown. Even though the code catch the SQLException and program can continue, I think we should check conn.isClosed before calling conn.getAutoCommit to avoid the unnecessary SQLException.
Author: Huaxin Gao <huaxing@oc0558782468.ibm.com>
Closes#10095 from huaxingao/spark-12088.
Use try to match the behavior for single distinct aggregation with Spark 1.5, but that's not scalable, we should be robust by default, have a flag to address performance regression for low cardinality aggregation.
cc yhuai nongli
Author: Davies Liu <davies@databricks.com>
Closes#10075 from davies/agg_15.
When query the Timestamp or Date column like the following
val filtered = jdbcdf.where($"TIMESTAMP_COLUMN" >= beg && $"TIMESTAMP_COLUMN" < end)
The generated SQL query is "TIMESTAMP_COLUMN >= 2015-01-01 00:00:00.0"
It should have quote around the Timestamp/Date value such as "TIMESTAMP_COLUMN >= '2015-01-01 00:00:00.0'"
Author: Huaxin Gao <huaxing@oc0558782468.ibm.com>
Closes#9872 from huaxingao/spark-11788.
The issue is that the output commiter is not idempotent and retry attempts will
fail because the output file already exists. It is not safe to clean up the file
as this output committer is by design not retryable. Currently, the job fails
with a confusing file exists error. This patch is a stop gap to tell the user
to look at the top of the error log for the proper message.
This is difficult to test locally as Spark is hardcoded not to retry. Manually
verified by upping the retry attempts.
Author: Nong Li <nong@databricks.com>
Author: Nong Li <nongli@gmail.com>
Closes#10080 from nongli/spark-11328.
Persist and Unpersist exist in both RDD and Dataframe APIs. I think they are still very critical in Dataset APIs. Not sure if my understanding is correct? If so, could you help me check if the implementation is acceptable?
Please provide your opinions. marmbrus rxin cloud-fan
Thank you very much!
Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>
Closes#9889 from gatorsmile/persistDS.
The reason is that, for a single culumn `RowEncoder`(or a single field product encoder), when we use it as the encoder for grouping key, we should also combine the grouping attributes, although there is only one grouping attribute.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#10059 from cloud-fan/bug.
This reverts commit cc243a079b / PR #9297
I'm reverting this because it broke SQLListenerMemoryLeakSuite in the master Maven builds.
See #9991 for a discussion of why this broke the tests.
This PR improve the performance of CartesianProduct by caching the result of right plan.
After this patch, the query time of TPC-DS Q65 go down to 4 seconds from 28 minutes (420X faster).
cc nongli
Author: Davies Liu <davies@databricks.com>
Closes#9969 from davies/improve_cartesian.
In 1.6, we introduce a public API to have a SQLContext for current thread, SparkPlan should use that.
Author: Davies Liu <davies@databricks.com>
Closes#9990 from davies/leak_context.
In https://github.com/apache/spark/pull/9409 we enabled multi-column counting. The approach taken in that PR introduces a bit of overhead by first creating a row only to check if all of the columns are non-null.
This PR fixes that technical debt. Count now takes multiple columns as its input. In order to make this work I have also added support for multiple columns in the single distinct code path.
cc yhuai
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#10015 from hvanhovell/SPARK-12024.
Check for partition column null-ability while building the partition spec.
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes#10001 from dilipbiswal/spark-11997.
Reference: https://jdbc.postgresql.org/documentation/head/query.html#query-with-cursor
In order for PostgreSQL to honor the fetchSize non-zero setting, its Connection.autoCommit needs to be set to false. Otherwise, it will just quietly ignore the fetchSize setting.
This adds a new side-effecting dialect specific beforeFetch method that will fire before a select query is ran.
Author: mariusvniekerk <marius.v.niekerk@gmail.com>
Closes#9861 from mariusvniekerk/SPARK-11881.
On the live web UI, there is a SQL tab which provides valuable information for the SQL query. But once the workload is finished, we won't see the SQL tab on the history server. It will be helpful if we support SQL UI on the history server so we can analyze it even after its execution.
To support SQL UI on the history server:
1. I added an `onOtherEvent` method to the `SparkListener` trait and post all SQL related events to the same event bus.
2. Two SQL events `SparkListenerSQLExecutionStart` and `SparkListenerSQLExecutionEnd` are defined in the sql module.
3. The new SQL events are written to event log using Jackson.
4. A new trait `SparkHistoryListenerFactory` is added to allow the history server to feed events to the SQL history listener. The SQL implementation is loaded at runtime using `java.util.ServiceLoader`.
Author: Carson Wang <carson.wang@intel.com>
Closes#9297 from carsonwang/SqlHistoryUI.
Currently, we does not have visualization for SQL query from Python, this PR fix that.
cc zsxwing
Author: Davies Liu <davies@databricks.com>
Closes#9949 from davies/pyspark_sql_ui.
Except inner join, maybe the other join types are also useful when users are using the joinWith function. Thus, added the joinType into the existing joinWith call in Dataset APIs.
Also providing another joinWith interface for the cartesian-join-like functionality.
Please provide your opinions. marmbrus rxin cloud-fan Thank you!
Author: gatorsmile <gatorsmile@gmail.com>
Closes#9921 from gatorsmile/joinWith.
This patch makes it consistent to use varargs in all DataFrameReader methods, including Parquet, JSON, text, and the generic load function.
Also added a few more API tests for the Java API.
Author: Reynold Xin <rxin@databricks.com>
Closes#9945 from rxin/SPARK-11967.
This PR is to provide two common `coalesce` and `repartition` in Dataset APIs.
After reading the comments of SPARK-9999, I am unclear about the plan for supporting re-partitioning in Dataset APIs. Currently, both RDD APIs and Dataframe APIs provide users such a flexibility to control the number of partitions.
In most traditional RDBMS, they expose the number of partitions, the partitioning columns, the table partitioning methods to DBAs for performance tuning and storage planning. Normally, these parameters could largely affect the query performance. Since the actual performance depends on the workload types, I think it is almost impossible to automate the discovery of the best partitioning strategy for all the scenarios.
I am wondering if Dataset APIs are planning to hide these APIs from users? Feel free to reject my PR if it does not match the plan.
Thank you for your answers. marmbrus rxin cloud-fan
Author: gatorsmile <gatorsmile@gmail.com>
Closes#9899 from gatorsmile/coalesce.
Currently pivot's signature looks like
```scala
scala.annotation.varargs
def pivot(pivotColumn: Column, values: Column*): GroupedData
scala.annotation.varargs
def pivot(pivotColumn: String, values: Any*): GroupedData
```
I think we can remove the one that takes "Column" types, since callers should always be passing in literals. It'd also be more clear if the values are not varargs, but rather Seq or java.util.List.
I also made similar changes for Python.
Author: Reynold Xin <rxin@databricks.com>
Closes#9929 from rxin/SPARK-11946.
we should pass in resolved encodera to logical `CoGroup` and bind them in physical `CoGroup`
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9928 from cloud-fan/cogroup.
Based on feedback from Matei, this is more consistent with mapPartitions in Spark.
Also addresses some of the cleanups from a previous commit that renames the type variables.
Author: Reynold Xin <rxin@databricks.com>
Closes#9919 from rxin/SPARK-11933.
1. Renamed map to mapGroup, flatMap to flatMapGroup.
2. Renamed asKey -> keyAs.
3. Added more documentation.
4. Changed type parameter T to V on GroupedDataset.
5. Added since versions for all functions.
Author: Reynold Xin <rxin@databricks.com>
Closes#9880 from rxin/SPARK-11899.
In this PR I delete a method that breaks type inference for aggregators (only in the REPL)
The error when this method is present is:
```
<console>:38: error: missing parameter type for expanded function ((x$2) => x$2._2)
ds.groupBy(_._1).agg(sum(_._2), sum(_._3)).collect()
```
Author: Michael Armbrust <michael@databricks.com>
Closes#9870 from marmbrus/dataset-repl-agg.
This mainly moves SqlNewHadoopRDD to the sql package. There is some state that is
shared between core and I've left that in core. This allows some other associated
minor cleanup.
Author: Nong Li <nong@databricks.com>
Closes#9845 from nongli/spark-11787.
DataSet APIs look great! However, I am lost when doing multiple level joins. For example,
```
val ds1 = Seq(("a", 1), ("b", 2)).toDS().as("a")
val ds2 = Seq(("a", 1), ("b", 2)).toDS().as("b")
val ds3 = Seq(("a", 1), ("b", 2)).toDS().as("c")
ds1.joinWith(ds2, $"a._2" === $"b._2").as("ab").joinWith(ds3, $"ab._1._2" === $"c._2").printSchema()
```
The printed schema is like
```
root
|-- _1: struct (nullable = true)
| |-- _1: struct (nullable = true)
| | |-- _1: string (nullable = true)
| | |-- _2: integer (nullable = true)
| |-- _2: struct (nullable = true)
| | |-- _1: string (nullable = true)
| | |-- _2: integer (nullable = true)
|-- _2: struct (nullable = true)
| |-- _1: string (nullable = true)
| |-- _2: integer (nullable = true)
```
Personally, I think we need the printSchema function. Sometimes, I do not know how to specify the column, especially when their data types are mixed. For example, if I want to write the following select for the above multi-level join, I have to know the schema:
```
newDS.select(expr("_1._2._2 + 1").as[Int]).collect()
```
marmbrus rxin cloud-fan Do you have the same feeling?
Author: gatorsmile <gatorsmile@gmail.com>
Closes#9855 from gatorsmile/printSchemaDataSet.
Apply the user supplied pathfilter while retrieving the files from fs.
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes#9830 from dilipbiswal/spark-11544.
In addition, tightened visibility of a lot of classes in the columnar package from private[sql] to private[columnar].
Author: Reynold Xin <rxin@databricks.com>
Closes#9842 from rxin/SPARK-11858.
Fix a bug in DataFrameReader.table (table with schema name such as "db_name.table" doesn't work)
Use SqlParser.parseTableIdentifier to parse the table name before lookupRelation.
Author: Huaxin Gao <huaxing@oc0558782468.ibm.com>
Closes#9773 from huaxingao/spark-11778.
After some experiment, I found it's not convenient to have separate encoder builders: `FlatEncoder` and `ProductEncoder`. For example, when create encoders for `ScalaUDF`, we have no idea if the type `T` is flat or not. So I revert the splitting change in https://github.com/apache/spark/pull/9693, while still keeping the bug fixes and tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9726 from cloud-fan/follow.
When debugging DataSet API, I always need to print the logical and physical plans.
I am wondering if we should provide a simple API for EXPLAIN?
Author: gatorsmile <gatorsmile@gmail.com>
Closes#9832 from gatorsmile/explainDS.
When handling self joins, the implementation did not consider the case insensitivity of HiveContext. It could cause an exception as shown in the JIRA:
```
TreeNodeException: Failed to copy node.
```
The fix is low risk. It avoids unnecessary attribute replacement. It should not affect the existing behavior of self joins. Also added the test case to cover this case.
Author: gatorsmile <gatorsmile@gmail.com>
Closes#9762 from gatorsmile/joinMakeCopy.
This patch adds an alternate to the Parquet RecordReader from the parquet-mr project
that is much faster for flat schemas. Instead of using the general converter mechanism
from parquet-mr, this directly uses the lower level APIs from parquet-columnar and a
customer RecordReader that directly assembles into UnsafeRows.
This is optionally disabled and only used for supported schemas.
Using the tpcds store sales table and doing a sum of increasingly more columns, the results
are:
For 1 Column:
Before: 11.3M rows/second
After: 18.2M rows/second
For 2 Columns:
Before: 7.2M rows/second
After: 11.2M rows/second
For 5 Columns:
Before: 2.9M rows/second
After: 4.5M rows/second
Author: Nong Li <nong@databricks.com>
Closes#9774 from nongli/parquet.
Before this PR there were two things that would blow up if you called `df.as[MyClass]` if `MyClass` was defined in the REPL:
- [x] Because `classForName` doesn't work on the munged names returned by `tpe.erasure.typeSymbol.asClass.fullName`
- [x] Because we don't have anything to pass into the constructor for the `$outer` pointer.
Note that this PR is just adding the infrastructure for working with inner classes in encoder and is not yet sufficient to make them work in the REPL. Currently, the implementation show in 95cec7d413 is causing a bug that breaks code gen due to some interaction between janino and the `ExecutorClassLoader`. This will be addressed in a follow-up PR.
Author: Michael Armbrust <michael@databricks.com>
Closes#9602 from marmbrus/dataset-replClasses.
Apply the user supplied pathfilter while retrieving the files from fs.
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes#9652 from dilipbiswal/spark-11544.
Currently, if the first SQLContext is not removed after stopping SparkContext, a SQLContext could set there forever. This patch make this more robust.
Author: Davies Liu <davies@databricks.com>
Closes#9706 from davies/clear_context.
https://issues.apache.org/jira/browse/SPARK-11792
The main changes include:
* Renaming `SizeEstimation` to `KnownSizeEstimation`. Hopefully this new name has more information.
* Making `estimatedSize` return `Long` instead of `Option[Long]`.
* In `UnsaveHashedRelation`, `estimatedSize` will delegate the work to `SizeEstimator` if we have not created a `BytesToBytesMap`.
Since we will put `UnsaveHashedRelation` to `BlockManager`, it is generally good to let it provide a more accurate size estimation. Also, if we do not put `BytesToBytesMap` directly into `BlockerManager`, I feel it is not really necessary to make `BytesToBytesMap` extends `KnownSizeEstimation`.
Author: Yin Huai <yhuai@databricks.com>
Closes#9813 from yhuai/SPARK-11792-followup.
we use `ExpressionEncoder.tuple` to build the result encoder, which assumes the input encoder should point to a struct type field if it’s non-flat.
However, our keyEncoder always point to a flat field/fields: `groupingAttributes`, we should combine them into a single `NamedExpression`.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9792 from cloud-fan/agg.
When we resolve the join operator, we may change the output of right side if self-join is detected. So in `Dataset.joinWith`, we should resolve the join operator first, and then get the left output and right output from it, instead of using `left.output` and `right.output` directly.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9806 from cloud-fan/self-join.
I also found a bug with self-joins returning incorrect results in the Dataset API. Two test cases attached and filed SPARK-11803.
Author: Reynold Xin <rxin@databricks.com>
Closes#9789 from rxin/SPARK-11802.
I also wrote a test case -- but unfortunately the test case is not working due to SPARK-11795.
Author: Reynold Xin <rxin@databricks.com>
Closes#9784 from rxin/SPARK-11503.
Currently the size of cached batch in only controlled by `batchSize` (default value is 10000), which does not work well with the size of serialized columns (for example, complex types). The memory used to build the batch is not accounted, it's easy to OOM (especially after unified memory management).
This PR introduce a hard limit as 4M for total columns (up to 50 columns of uncompressed primitive columns).
This also change the way to grow buffer, double it each time, then trim it once finished.
cc liancheng
Author: Davies Liu <davies@databricks.com>
Closes#9760 from davies/cache_limit.
Parquet supports some JSON and BSON datatypes. They are represented as binary for BSON and string (UTF-8) for JSON internally.
I searched a bit and found Apache drill also supports both in this way, [link](https://drill.apache.org/docs/parquet-format/).
Author: hyukjinkwon <gurwls223@gmail.com>
Author: Hyukjin Kwon <gurwls223@gmail.com>
Closes#9658 from HyukjinKwon/SPARK-11692.
https://issues.apache.org/jira/browse/SPARK-11044
Spark writes a parquet file only with writer version1 ignoring the writer version given by user.
So, in this PR, it keeps the writer version if given or sets version1 as default.
Author: hyukjinkwon <gurwls223@gmail.com>
Author: HyukjinKwon <gurwls223@gmail.com>
Closes#9060 from HyukjinKwon/SPARK-11044.
This patch adds the following options to the JSON data source, for dealing with non-standard JSON files:
* `allowComments` (default `false`): ignores Java/C++ style comment in JSON records
* `allowUnquotedFieldNames` (default `false`): allows unquoted JSON field names
* `allowSingleQuotes` (default `true`): allows single quotes in addition to double quotes
* `allowNumericLeadingZeros` (default `false`): allows leading zeros in numbers (e.g. 00012)
To avoid passing a lot of options throughout the json package, I introduced a new JSONOptions case class to define all JSON config options.
Also updated documentation to explain these options.
Scala
![screen shot 2015-11-15 at 6 12 12 pm](https://cloud.githubusercontent.com/assets/323388/11172965/e3ace6ec-8bc4-11e5-805e-2d78f80d0ed6.png)
Python
![screen shot 2015-11-15 at 6 11 28 pm](https://cloud.githubusercontent.com/assets/323388/11172964/e23ed6ee-8bc4-11e5-8216-312f5983acd5.png)
Author: Reynold Xin <rxin@databricks.com>
Closes#9724 from rxin/SPARK-11745.
LogicalLocalTable in ExistingRDD.scala is replaced by localRelation in LocalRelation.scala?
Do you know any reason why we still keep this class?
Author: gatorsmile <gatorsmile@gmail.com>
Closes#9717 from gatorsmile/LogicalLocalTable.
I didn't remove the old Sort operator, since we still use it in randomized tests. I moved it into test module and renamed it ReferenceSort.
Author: Reynold Xin <rxin@databricks.com>
Closes#9700 from rxin/SPARK-11734.
Also introduces new spark private API in RDD.scala with name 'mapPartitionsInternal' which doesn't closure cleans the RDD elements.
Author: nitin goyal <nitin.goyal@guavus.com>
Author: nitin.goyal <nitin.goyal@guavus.com>
Closes#9253 from nitin2goyal/master.
also add more tests for encoders, and fix bugs that I found:
* when convert array to catalyst array, we can only skip element conversion for native types(e.g. int, long, boolean), not `AtomicType`(String is AtomicType but we need to convert it)
* we should also handle scala `BigDecimal` when convert from catalyst `Decimal`.
* complex map type should be supported
other issues that still in investigation:
* encode java `BigDecimal` and decode it back, seems we will loss precision info.
* when encode case class that defined inside a object, `ClassNotFound` exception will be thrown.
I'll remove unused code in a follow-up PR.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9693 from cloud-fan/split.
* rename `AppendColumn` to `AppendColumns` to be consistent with the physical plan name.
* clean up stale comments.
* always pass in resolved encoder to `TypedColumn.withInputType`(test added)
* enable a mistakenly disabled java test.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9688 from cloud-fan/follow.
https://issues.apache.org/jira/browse/SPARK-11678
The change of this PR is to pass root paths of table to the partition discovery logic. So, the process of partition discovery stops at those root paths instead of going all the way to the root path of the file system.
Author: Yin Huai <yhuai@databricks.com>
Closes#9651 from yhuai/SPARK-11678.
This PR adds a new method, `reduce`, to `GroupedDataset`, which allows similar operations to `reduceByKey` on a traditional `PairRDD`.
```scala
val ds = Seq("abc", "xyz", "hello").toDS()
ds.groupBy(_.length).reduce(_ + _).collect() // not actually commutative :P
res0: Array(3 -> "abcxyz", 5 -> "hello")
```
While implementing this method and its test cases several more deficiencies were found in our encoder handling. Specifically, in order to support positional resolution, named resolution and tuple composition, it is important to keep the unresolved encoder around and to use it when constructing new `Datasets` with the same object type but different output attributes. We now divide the encoder lifecycle into three phases (that mirror the lifecycle of standard expressions) and have checks at various boundaries:
- Unresoved Encoders: all users facing encoders (those constructed by implicits, static methods, or tuple composition) are unresolved, meaning they have only `UnresolvedAttributes` for named fields and `BoundReferences` for fields accessed by ordinal.
- Resolved Encoders: internal to a `[Grouped]Dataset` the encoder is resolved, meaning all input has been resolved to a specific `AttributeReference`. Any encoders that are placed into a logical plan for use in object construction should be resolved.
- BoundEncoder: Are constructed by physical plans, right before actual conversion from row -> object is performed.
It is left to future work to add explicit checks for resolution and provide good error messages when it fails. We might also consider enforcing the above constraints in the type system (i.e. `fromRow` only exists on a `ResolvedEncoder`), but we should probably wait before spending too much time on this.
Author: Michael Armbrust <michael@databricks.com>
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9673 from marmbrus/pr/9628.
switched stddev support from DeclarativeAggregate to ImperativeAggregate.
Author: JihongMa <linlin200605@gmail.com>
Closes#9380 from JihongMA/SPARK-11420.
Parquet supports some unsigned datatypes. However, Since Spark does not support unsigned datatypes, it needs to emit an exception with a clear message rather then with the one saying illegal datatype.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#9646 from HyukjinKwon/SPARK-10113.
This adds a pivot method to the dataframe api.
Following the lead of cube and rollup this adds a Pivot operator that is translated into an Aggregate by the analyzer.
Currently the syntax is like:
~~courseSales.pivot(Seq($"year"), $"course", Seq("dotNET", "Java"), sum($"earnings"))~~
~~Would we be interested in the following syntax also/alternatively? and~~
courseSales.groupBy($"year").pivot($"course", "dotNET", "Java").agg(sum($"earnings"))
//or
courseSales.groupBy($"year").pivot($"course").agg(sum($"earnings"))
Later we can add it to `SQLParser`, but as Hive doesn't support it we cant add it there, right?
~~Also what would be the suggested Java friendly method signature for this?~~
Author: Andrew Ray <ray.andrew@gmail.com>
Closes#7841 from aray/sql-pivot.
We need to support custom classes like java beans and combine them into tuple, and it's very hard to do it with the TypeTag-based approach.
We should keep only the compose-based way to create tuple encoder.
This PR also move `Encoder` to `org.apache.spark.sql`
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9567 from cloud-fan/java.
https://issues.apache.org/jira/browse/SPARK-11500
As filed in SPARK-11500, if merging schemas is enabled, the order of files to touch is a matter which might affect the ordering of the output columns.
This was mostly because of the use of `Set` and `Map` so I replaced them to `LinkedHashSet` and `LinkedHashMap` to keep the insertion order.
Also, I changed `reduceOption` to `reduceLeftOption`, and replaced the order of `filesToTouch` from `metadataStatuses ++ commonMetadataStatuses ++ needMerged` to `needMerged ++ metadataStatuses ++ commonMetadataStatuses` in order to touch the part-files first which always have the schema in footers whereas the others might not exist.
One nit is, If merging schemas is not enabled, but when multiple files are given, there is no guarantee of the output order, since there might not be a summary file for the first file, which ends up putting ahead the columns of the other files.
However, I thought this should be okay since disabling merging schemas means (assumes) all the files have the same schemas.
In addition, in the test code for this, I only checked the names of fields.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#9517 from HyukjinKwon/SPARK-11500.
See http://search-hadoop.com/m/q3RTtjpe8r1iRbTj2 for discussion.
Summary: addition of VisibleForTesting annotation resulted in spark-shell malfunctioning.
Author: tedyu <yuzhihong@gmail.com>
Closes#9585 from tedyu/master.
This patch adds the building blocks for codegening subexpr elimination and implements
it end to end for UnsafeProjection. The building blocks can be used to do the same thing
for other operators.
It introduces some utilities to compute common sub expressions. Expressions can be added to
this data structure. The expr and its children will be recursively matched against existing
expressions (ones previously added) and grouped into common groups. This is built using
the existing `semanticEquals`. It does not understand things like commutative or associative
expressions. This can be done as future work.
After building this data structure, the codegen process takes advantage of it by:
1. Generating a helper function in the generated class that computes the common
subexpression. This is done for all common subexpressions that have at least
two occurrences and the expression tree is sufficiently complex.
2. When generating the apply() function, if the helper function exists, call that
instead of regenerating the expression tree. Repeated calls to the helper function
shortcircuit the evaluation logic.
Author: Nong Li <nong@databricks.com>
Author: Nong Li <nongli@gmail.com>
This patch had conflicts when merged, resolved by
Committer: Michael Armbrust <michael@databricks.com>
Closes#9480 from nongli/spark-10371.
Currently the user facing api for typed aggregation has some limitations:
* the customized typed aggregation must be the first of aggregation list
* the customized typed aggregation can only use long as buffer type
* the customized typed aggregation can only use flat type as result type
This PR tries to remove these limitations.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9599 from cloud-fan/agg.
https://issues.apache.org/jira/browse/SPARK-9830
This PR contains the following main changes.
* Removing `AggregateExpression1`.
* Removing `Aggregate` operator, which is used to evaluate `AggregateExpression1`.
* Removing planner rule used to plan `Aggregate`.
* Linking `MultipleDistinctRewriter` to analyzer.
* Renaming `AggregateExpression2` to `AggregateExpression` and `AggregateFunction2` to `AggregateFunction`.
* Updating places where we create aggregate expression. The way to create aggregate expressions is `AggregateExpression(aggregateFunction, mode, isDistinct)`.
* Changing `val`s in `DeclarativeAggregate`s that touch children of this function to `lazy val`s (when we create aggregate expression in DataFrame API, children of an aggregate function can be unresolved).
Author: Yin Huai <yhuai@databricks.com>
Closes#9556 from yhuai/removeAgg1.
This PR adds a new interface for user-defined aggregations, that can be used in `DataFrame` and `Dataset` operations to take all of the elements of a group and reduce them to a single value.
For example, the following aggregator extracts an `int` from a specific class and adds them up:
```scala
case class Data(i: Int)
val customSummer = new Aggregator[Data, Int, Int] {
def prepare(d: Data) = d.i
def reduce(l: Int, r: Int) = l + r
def present(r: Int) = r
}.toColumn()
val ds: Dataset[Data] = ...
val aggregated = ds.select(customSummer)
```
By using helper functions, users can make a generic `Aggregator` that works on any input type:
```scala
/** An `Aggregator` that adds up any numeric type returned by the given function. */
class SumOf[I, N : Numeric](f: I => N) extends Aggregator[I, N, N] with Serializable {
val numeric = implicitly[Numeric[N]]
override def zero: N = numeric.zero
override def reduce(b: N, a: I): N = numeric.plus(b, f(a))
override def present(reduction: N): N = reduction
}
def sum[I, N : Numeric : Encoder](f: I => N): TypedColumn[I, N] = new SumOf(f).toColumn
```
These aggregators can then be used alongside other built-in SQL aggregations.
```scala
val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
ds
.groupBy(_._1)
.agg(
sum(_._2), // The aggregator defined above.
expr("sum(_2)").as[Int], // A built-in dynatically typed aggregation.
count("*")) // A built-in statically typed aggregation.
.collect()
res0: ("a", 30, 30, 2L), ("b", 3, 3, 2L), ("c", 1, 1, 1L)
```
The current implementation focuses on integrating this into the typed API, but currently only supports running aggregations that return a single long value as explained in `TypedAggregateExpression`. This will be improved in a followup PR.
Author: Michael Armbrust <michael@databricks.com>
Closes#9555 from marmbrus/dataset-useragg.
For now they are thin wrappers around the corresponding Hive UDAFs.
One limitation with these in Hive 0.13.0 is they only support aggregating primitive types.
I chose snake_case here instead of camelCase because it seems to be used in the majority of the multi-word fns.
Do we also want to add these to `functions.py`?
This approach was recommended here: https://github.com/apache/spark/pull/8592#issuecomment-154247089
marmbrus rxin
Author: Nick Buroojy <nick.buroojy@civitaslearning.com>
Closes#9526 from nburoojy/nick/udaf-alias.
(cherry picked from commit a6ee4f989d)
Signed-off-by: Michael Armbrust <michael@databricks.com>
The reason is that:
1. For partitioned hive table, we will move the partitioned columns after data columns. (e.g. `<a: Int, b: Int>` partition by `a` will become `<b: Int, a: Int>`)
2. When append data to table, we use position to figure out how to match input columns to table's columns.
So when we append data to partitioned table, we will match wrong columns between input and table. A solution is reordering the input columns before match by position, like what we did for [`InsertIntoHadoopFsRelation`](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala#L101-L105)
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9408 from cloud-fan/append.
A few changes:
1. Removed fold, since it can be confusing for distributed collections.
2. Created specific interfaces for each Dataset function (e.g. MapFunction, ReduceFunction, MapPartitionsFunction)
3. Added more documentation and test cases.
The other thing I'm considering doing is to have a "collector" interface for FlatMapFunction and MapPartitionsFunction, similar to MapReduce's map function.
Author: Reynold Xin <rxin@databricks.com>
Closes#9531 from rxin/SPARK-11564.
JIRA: https://issues.apache.org/jira/browse/SPARK-11362
We use scala.collection.mutable.BitSet in BroadcastNestedLoopJoin now. We should use Spark's BitSet.
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#9316 from viirya/use-spark-bitset.
The second PR for SPARK-9241, this adds support for multiple distinct columns to the new aggregation code path.
This PR solves the multiple DISTINCT column problem by rewriting these Aggregates into an Expand-Aggregate-Aggregate combination. See the [JIRA ticket](https://issues.apache.org/jira/browse/SPARK-9241) for some information on this. The advantages over the - competing - [first PR](https://github.com/apache/spark/pull/9280) are:
- This can use the faster TungstenAggregate code path.
- It is impossible to OOM due to an ```OpenHashSet``` allocating to much memory. However, this will multiply the number of input rows by the number of distinct clauses (plus one), and puts a lot more memory pressure on the aggregation code path itself.
The location of this Rule is a bit funny, and should probably change when the old aggregation path is changed.
cc yhuai - Could you also tell me where to add tests for this?
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#9406 from hvanhovell/SPARK-9241-rewriter.
This PR enables the Expand operator to process and produce Unsafe Rows.
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#9414 from hvanhovell/SPARK-11450.
This PR adds the ability to do typed SQL aggregations. We will likely also want to provide an interface to allow users to do aggregations on objects, but this is deferred to another PR.
```scala
val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
ds.groupBy(_._1).agg(sum("_2").as[Int]).collect()
res0: Array(("a", 30), ("b", 3), ("c", 1))
```
Author: Michael Armbrust <michael@databricks.com>
Closes#9499 from marmbrus/dataset-agg.
This brings the support of off-heap memory for array inside BytesToBytesMap and InMemorySorter, then we could allocate all the memory from off-heap for execution.
Closes#8068
Author: Davies Liu <davies@databricks.com>
Closes#9477 from davies/unsafe_timsort.
the main problem is: we interpret column name with special handling of `.` for DataFrame. This enables us to write something like `df("a.b")` to get the field `b` of `a`. However, we don't need this feature in `DataFrame.apply("*")` or `DataFrame.withColumnRenamed`. In these 2 cases, the column name is the final name already, we don't need extra process to interpret it.
The solution is simple, use `queryExecution.analyzed.output` to get resolved column directly, instead of using `DataFrame.resolve`.
close https://github.com/apache/spark/pull/8811
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9462 from cloud-fan/special-chars.
This is the alternative/agreed upon solution to PR #8780.
Creating an OracleDialect to handle the nonspecific numeric types that can be defined in oracle.
Author: Travis Hegner <thegner@trilliumit.com>
Closes#9495 from travishegner/OracleDialect.
This internal implicit conversion has been a source of confusion for a lot of new developers.
Author: Reynold Xin <rxin@databricks.com>
Closes#9479 from rxin/SPARK-11513.
In DefaultDataSource.scala, it has
override def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation
The parameters is CaseInsensitiveMap.
After this line
parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
properties is set to all lower case key/value pairs and fetchSize becomes fetchsize.
However, in compute method in JDBCRDD, it has
val fetchSize = properties.getProperty("fetchSize", "0").toInt
so fetchSize value is always 0 and never gets set correctly.
Author: Huaxin Gao <huaxing@oc0558782468.ibm.com>
Closes#9473 from huaxingao/spark-11474.
After aggregation, the dataset could be smaller than inputs, so it's better to do hash based aggregation for all inputs, then using sort based aggregation to merge them.
Author: Davies Liu <davies@databricks.com>
Closes#9383 from davies/fix_switch.
1. def dialectClassName in HiveContext is unnecessary.
In HiveContext, if conf.dialect == "hiveql", getSQLDialect() will return new HiveQLDialect(this);
else it will use super.getSQLDialect(). Then in super.getSQLDialect(), it calls dialectClassName, which is overriden in HiveContext and still return super.dialectClassName.
So we'll never reach the code "classOf[HiveQLDialect].getCanonicalName" of def dialectClassName in HiveContext.
2. When we start bin/spark-sql, the default context is HiveContext, and the corresponding dialect is hiveql.
However, if we type "set spark.sql.dialect;", the result is "sql", which is inconsistent with the actual dialect and is misleading. For example, we can use sql like "create table" which is only allowed in hiveql, but this dialect conf shows it's "sql".
Although this problem will not cause any execution error, it's misleading to spark sql users. Therefore I think we should fix it.
In this pr, while procesing “set spark.sql.dialect” in SetCommand, I use "conf.dialect" instead of "getConf()" for the case of key == SQLConf.DIALECT.key, so that it will return the right dialect conf.
Author: Zhenhua Wang <wangzhenhua@huawei.com>
Closes#9349 from wzhfy/dialect.
1. Renamed localSort -> sortWithinPartitions to avoid ambiguity in "local"
2. distributeBy -> repartition to match the existing repartition.
Author: Reynold Xin <rxin@databricks.com>
Closes#9470 from rxin/SPARK-11504.
stddev is an alias for stddev_samp. variance should be consistent with stddev.
Also took the chance to remove internal Stddev and Variance, and only kept StddevSamp/StddevPop and VarianceSamp/VariancePop.
Author: Reynold Xin <rxin@databricks.com>
Closes#9449 from rxin/SPARK-11490.
depend on `caseSensitive` to do column name equality check, instead of just `==`
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9410 from cloud-fan/partition.
We added a bunch of higher order statistics such as skewness and kurtosis to GroupedData. I don't think they are common enough to justify being listed, since users can always use the normal statistics aggregate functions.
That is to say, after this change, we won't support
```scala
df.groupBy("key").kurtosis("colA", "colB")
```
However, we will still support
```scala
df.groupBy("key").agg(kurtosis(col("colA")), kurtosis(col("colB")))
```
Author: Reynold Xin <rxin@databricks.com>
Closes#9446 from rxin/SPARK-11489.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9434 from cloud-fan/rdd2ds and squashes the following commits:
0892d72 [Wenchen Fan] support create Dataset from RDD
Add Python API for stddev/stddev_pop/stddev_samp/variance/var_pop/var_samp/skewness/kurtosis
Author: Davies Liu <davies@databricks.com>
Closes#9424 from davies/py_var.
This PR adds a new method `unhandledFilters` to `BaseRelation`. Data sources which implement this method properly may avoid the overhead of defensive filtering done by Spark SQL.
Author: Cheng Lian <lian@databricks.com>
Closes#9399 from liancheng/spark-10978.unhandled-filters.
JIRA: https://issues.apache.org/jira/browse/SPARK-10304
This patch detects if the structure of partition directories is not valid.
The test cases are from #8547. Thanks zhzhan.
cc liancheng
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#8840 from viirya/detect_invalid_part_dir.
This PR adds a new method `groupBy(cols: Column*)` to `Dataset` that allows users to group using column expressions instead of a lambda function. Since the return type of these expressions is not known at compile time, we just set the key type as a generic `Row`. If the user would like to work the key in a type-safe way, they can call `grouped.asKey[Type]`, which is also added in this PR.
```scala
val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
val grouped = ds.groupBy($"_1").asKey[String]
val agged = grouped.mapGroups { case (g, iter) =>
Iterator((g, iter.map(_._2).sum))
}
agged.collect()
res0: Array(("a", 30), ("b", 3), ("c", 1))
```
Author: Michael Armbrust <michael@databricks.com>
Closes#9359 from marmbrus/columnGroupBy and squashes the following commits:
bbcb03b [Michael Armbrust] Update DatasetSuite.scala
8fd2908 [Michael Armbrust] Update DatasetSuite.scala
0b0e2f8 [Michael Armbrust] [SPARK-11404] [SQL] Support for groupBy using column expressions
When we join 2 datasets, we will combine 2 encoders into a tupled one, and use it as the encoder for the jioned dataset. Assume both of the 2 encoders are flat, their `constructExpression`s both reference to the first element of input row. However, when we combine 2 encoders, the schema of input row changed, now the right encoder should reference to second element of input row. So we should rebind right encoder to let it know the new schema of input row before combine it.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9391 from cloud-fan/join and squashes the following commits:
846d3ab [Wenchen Fan] rebind right encoder when join 2 datasets
Hive GenericUDTF#initialize() defines field names in a returned schema though,
the current HiveGenericUDTF drops these names.
We might need to reflect these in a logical plan tree.
Author: navis.ryu <navis@apache.org>
Closes#8456 from navis/SPARK-9034.
1. Supporting expanding structs in Projections. i.e.
"SELECT s.*" where s is a struct type.
This is fixed by allowing the expand function to handle structs in addition to tables.
2. Supporting expanding * inside aggregate functions of structs.
"SELECT max(struct(col1, structCol.*))"
This requires recursively expanding the expressions. In this case, it it the aggregate
expression "max(...)" and we need to recursively expand its children inputs.
Author: Nong Li <nongli@gmail.com>
Closes#9343 from nongli/spark-11329.
…ering.
For cached tables, we can just maintain the partitioning and ordering from the
source relation.
Author: Nong Li <nongli@gmail.com>
Closes#9404 from nongli/spark-5354.
JIRA: https://issues.apache.org/jira/browse/SPARK-9298
This patch adds pearson correlation aggregation function based on `AggregateExpression2`.
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#8587 from viirya/corr_aggregation.
DISTRIBUTE BY allows the user to hash partition the data by specified exprs. It also allows for
optioning sorting within each resulting partition. There is no required relationship between the
exprs for partitioning and sorting (i.e. one does not need to be a prefix of the other).
This patch adds to APIs to DataFrames which can be used together to provide this functionality:
1. distributeBy() which partitions the data frame into a specified number of partitions using the
partitioning exprs.
2. localSort() which sorts each partition using the provided sorting exprs.
To get the DISTRIBUTE BY functionality, the user simply does: df.distributeBy(...).localSort(...)
Author: Nong Li <nongli@gmail.com>
Closes#9364 from nongli/spark-11410.
This PR fixes two issues:
1. `PhysicalRDD.outputsUnsafeRows` is always `false`
Thus a `ConvertToUnsafe` operator is often required even if the underlying data source relation does output `UnsafeRow`.
1. Internal/external row conversion for `HadoopFsRelation` is kinda messy
Currently we're using `HadoopFsRelation.needConversion` and [dirty type erasure hacks][1] to indicate whether the relation outputs external row or internal row and apply external-to-internal conversion when necessary. Basically, all builtin `HadoopFsRelation` data sources, i.e. Parquet, JSON, ORC, and Text output `InternalRow`, while typical external `HadoopFsRelation` data sources, e.g. spark-avro and spark-csv, output `Row`.
This PR adds a `private[sql]` interface method `HadoopFsRelation.buildInternalScan`, which by default invokes `HadoopFsRelation.buildScan` and converts `Row`s to `UnsafeRow`s (which are also `InternalRow`s). All builtin `HadoopFsRelation` data sources override this method and directly output `UnsafeRow`s. In this way, now `HadoopFsRelation` always produces `UnsafeRow`s. Thus `PhysicalRDD.outputsUnsafeRows` can be properly set by checking whether the underlying data source is a `HadoopFsRelation`.
A remaining question is that, can we assume that all non-builtin `HadoopFsRelation` data sources output external rows? At least all well known ones do so. However it's possible that some users implemented their own `HadoopFsRelation` data sources that leverages `InternalRow` and thus all those unstable internal data representations. If this assumption is safe, we can deprecate `HadoopFsRelation.needConversion` and cleanup some more conversion code (like [here][2] and [here][3]).
This PR supersedes #9125.
Follow-ups:
1. Makes JSON and ORC data sources output `UnsafeRow` directly
1. Makes `HiveTableScan` output `UnsafeRow` directly
This is related to 1 since ORC data source shares the same `Writable` unwrapping code with `HiveTableScan`.
[1]: https://github.com/apache/spark/blob/v1.5.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala#L353
[2]: https://github.com/apache/spark/blob/v1.5.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala#L331-L335
[3]: https://github.com/apache/spark/blob/v1.5.1/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L630-L669
Author: Cheng Lian <lian@databricks.com>
Closes#9305 from liancheng/spark-11345.unsafe-hadoop-fs-relation.
Currently the empty line in json file will be parsed into Row with all null field values. But in json, "{}" represents a json object, empty line is supposed to be skipped.
Make a trivial change for this.
Author: Jeff Zhang <zjffdu@apache.org>
Closes#9211 from zjffdu/SPARK-11226.
Since we do not need to preserve a page before calling compute(), MapPartitionsWithPreparationRDD is not needed anymore.
This PR basically revert #8543, #8511, #8038, #8011
Author: Davies Liu <davies@databricks.com>
Closes#9381 from davies/remove_prepare2.
When we cogroup 2 `GroupedIterator`s in `CoGroupedIterator`, if the right side is smaller, we will consume right data and keep the left data unchanged. Then we call `hasNext` which will call `left.hasNext`. This will make `GroupedIterator` generate an extra group as the previous one has not been comsumed yet.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9346 from cloud-fan/cogroup and squashes the following commits:
9be67c8 [Wenchen Fan] SPARK-11393
When enabling mergedSchema and predicate filter, this fails since Parquet does not accept filters pushed down when the columns of the filters do not exist in the schema.
This is related with Parquet issue (https://issues.apache.org/jira/browse/PARQUET-389).
For now, it just simply disables predicate push down when using merged schema in this PR.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#9327 from HyukjinKwon/SPARK-11103.
This PR introduce a mechanism to call spill() on those SQL operators that support spilling (for example, BytesToBytesMap, UnsafeExternalSorter and ShuffleExternalSorter) if there is not enough memory for execution. The preserved first page is needed anymore, so removed.
Other Spillable objects in Spark core (ExternalSorter and AppendOnlyMap) are not included in this PR, but those could benefit from this (trigger others' spilling).
The PrepareRDD may be not needed anymore, could be removed in follow up PR.
The following script will fail with OOM before this PR, finished in 150 seconds with 2G heap (also works in 1.5 branch, with similar duration).
```python
sqlContext.setConf("spark.sql.shuffle.partitions", "1")
df = sqlContext.range(1<<25).selectExpr("id", "repeat(id, 2) as s")
df2 = df.select(df.id.alias('id2'), df.s.alias('s2'))
j = df.join(df2, df.id==df2.id2).groupBy(df.id).max("id", "id2")
j.explain()
print j.count()
```
For thread-safety, here what I'm got:
1) Without calling spill(), the operators should only be used by single thread, no safety problems.
2) spill() could be triggered in two cases, triggered by itself, or by other operators. we can check trigger == this in spill(), so it's still in the same thread, so safety problems.
3) if it's triggered by other operators (right now cache will not trigger spill()), we only spill the data into disk when it's in scanning stage (building is finished), so the in-memory sorter or memory pages are read-only, we only need to synchronize the iterator and change it.
4) During scanning, the iterator will only use one record in one page, we can't free this page, because the downstream is currently using it (used by UnsafeRow or other objects). In BytesToBytesMap, we just skip the current page, and dump all others into disk. In UnsafeExternalSorter, we keep the page that is used by current record (having the same baseObject), free it when loading the next record. In ShuffleExternalSorter, the spill() will not trigger during scanning.
5) In order to avoid deadlock, we didn't call acquireMemory during spill (so we reused the pointer array in InMemorySorter).
Author: Davies Liu <davies@databricks.com>
Closes#9241 from davies/force_spill.
The root cause is that when spark.sql.hive.convertMetastoreParquet=true by default, the cached InMemoryRelation of the ParquetRelation can not be looked up from the cachedData of CacheManager because the key comparison fails even though it is the same LogicalPlan representing the Subquery that wraps the ParquetRelation.
The solution in this PR is overriding the LogicalPlan.sameResult function in Subquery case class to eliminate subquery node first before directly comparing the child (ParquetRelation), which will find the key to the cached InMemoryRelation.
Author: xin Wu <xinwu@us.ibm.com>
Closes#9326 from xwu0226/spark-11246-commit.
Before this PR, user has to consume the iterator of one group before process next group, or we will get into infinite loops.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9330 from cloud-fan/group.
This PR fixes a mistake in the code generated by `GenerateColumnAccessor`. Interestingly, although the code is illegal in Java (the class has two fields with the same name), Janino accepts it happily and accidentally works properly.
Author: Cheng Lian <lian@databricks.com>
Closes#9335 from liancheng/spark-11376.fix-generated-code.
JIRA: https://issues.apache.org/jira/browse/SPARK-11363
In SparkStrategies some places use LeftSemiJoin. It should be LeftSemi.
cc chenghao-intel liancheng
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#9318 from viirya/no-left-semi-join.
In some cases, we can broadcast the smaller relation in cartesian join, which improve the performance significantly.
Author: Cheng Hao <hao.cheng@intel.com>
Closes#8652 from chenghao-intel/cartesian.
This PR adds a new operation `joinWith` to a `Dataset`, which returns a `Tuple` for each pair where a given `condition` evaluates to true.
```scala
case class ClassData(a: String, b: Int)
val ds1 = Seq(ClassData("a", 1), ClassData("b", 2)).toDS()
val ds2 = Seq(("a", 1), ("b", 2)).toDS()
> ds1.joinWith(ds2, $"_1" === $"a").collect()
res0: Array((ClassData("a", 1), ("a", 1)), (ClassData("b", 2), ("b", 2)))
```
This operation is similar to the relation `join` function with one important difference in the result schema. Since `joinWith` preserves objects present on either side of the join, the result schema is similarly nested into a tuple under the column names `_1` and `_2`.
This type of join can be useful both for preserving type-safety with the original object types as well as working with relational data where either side of the join has column names in common.
## Required Changes to Encoders
In the process of working on this patch, several deficiencies to the way that we were handling encoders were discovered. Specifically, it turned out to be very difficult to `rebind` the non-expression based encoders to extract the nested objects from the results of joins (and also typed selects that return tuples).
As a result the following changes were made.
- `ClassEncoder` has been renamed to `ExpressionEncoder` and has been improved to also handle primitive types. Additionally, it is now possible to take arbitrary expression encoders and rewrite them into a single encoder that returns a tuple.
- All internal operations on `Dataset`s now require an `ExpressionEncoder`. If the users tries to pass a non-`ExpressionEncoder` in, an error will be thrown. We can relax this requirement in the future by constructing a wrapper class that uses expressions to project the row to the expected schema, shielding the users code from the required remapping. This will give us a nice balance where we don't force user encoders to understand attribute references and binding, but still allow our native encoder to leverage runtime code generation to construct specific encoders for a given schema that avoid an extra remapping step.
- Additionally, the semantics for different types of objects are now better defined. As stated in the `ExpressionEncoder` scaladoc:
- Classes will have their sub fields extracted by name using `UnresolvedAttribute` expressions
and `UnresolvedExtractValue` expressions.
- Tuples will have their subfields extracted by position using `BoundReference` expressions.
- Primitives will have their values extracted from the first ordinal with a schema that defaults
to the name `value`.
- Finally, the binding lifecycle for `Encoders` has now been unified across the codebase. Encoders are now `resolved` to the appropriate schema in the constructor of `Dataset`. This process replaces an unresolved expressions with concrete `AttributeReference` expressions. Binding then happens on demand, when an encoder is going to be used to construct an object. This closely mirrors the lifecycle for standard expressions when executing normal SQL or `DataFrame` queries.
Author: Michael Armbrust <michael@databricks.com>
Closes#9300 from marmbrus/datasets-tuples.
Currently, when a schema is inferred from a JSON file using sqlContext.read.json, the primitive object types are inferred as string, long, boolean, etc.
However, if the inferred type is too specific (JSON obviously does not enforce types itself), this can cause issues with merging dataframe schemas.
This pull request adds the option "primitivesAsString" to the JSON DataFrameReader which when true (defaults to false if not set) will infer all primitives as strings.
Below is an example usage of this new functionality.
```
val jsonDf = sqlContext.read.option("primitivesAsString", "true").json(sampleJsonFile)
scala> jsonDf.printSchema()
root
|-- bigInteger: string (nullable = true)
|-- boolean: string (nullable = true)
|-- double: string (nullable = true)
|-- integer: string (nullable = true)
|-- long: string (nullable = true)
|-- null: string (nullable = true)
|-- string: string (nullable = true)
```
Author: Stephen De Gennaro <stepheng@realitymine.com>
Closes#9249 from stephend-realitymine/stephend-primitives.
This patch refactors the MemoryManager class structure. After #9000, Spark had the following classes:
- MemoryManager
- StaticMemoryManager
- ExecutorMemoryManager
- TaskMemoryManager
- ShuffleMemoryManager
This is fairly confusing. To simplify things, this patch consolidates several of these classes:
- ShuffleMemoryManager and ExecutorMemoryManager were merged into MemoryManager.
- TaskMemoryManager is moved into Spark Core.
**Key changes and tasks**:
- [x] Merge ExecutorMemoryManager into MemoryManager.
- [x] Move pooling logic into Allocator.
- [x] Move TaskMemoryManager from `spark-unsafe` to `spark-core`.
- [x] Refactor the existing Tungsten TaskMemoryManager interactions so Tungsten code use only this and not both this and ShuffleMemoryManager.
- [x] Refactor non-Tungsten code to use the TaskMemoryManager instead of ShuffleMemoryManager.
- [x] Merge ShuffleMemoryManager into MemoryManager.
- [x] Move code
- [x] ~~Simplify 1/n calculation.~~ **Will defer to followup, since this needs more work.**
- [x] Port ShuffleMemoryManagerSuite tests.
- [x] Move classes from `unsafe` package to `memory` package.
- [ ] Figure out how to handle the hacky use of the memory managers in HashedRelation's broadcast variable construction.
- [x] Test porting and cleanup: several tests relied on mock functionality (such as `TestShuffleMemoryManager.markAsOutOfMemory`) which has been changed or broken during the memory manager consolidation
- [x] AbstractBytesToBytesMapSuite
- [x] UnsafeExternalSorterSuite
- [x] UnsafeFixedWidthAggregationMapSuite
- [x] UnsafeKVExternalSorterSuite
**Compatiblity notes**:
- This patch introduces breaking changes in `ExternalAppendOnlyMap`, which is marked as `DevloperAPI` (likely for legacy reasons): this class now cannot be used outside of a task.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#9127 from JoshRosen/SPARK-10984.
This adds API for reading and writing text files, similar to SparkContext.textFile and RDD.saveAsTextFile.
```
SQLContext.read.text("/path/to/something.txt")
DataFrame.write.text("/path/to/write.txt")
```
Using the new Dataset API, this also supports
```
val ds: Dataset[String] = SQLContext.read.text("/path/to/something.txt").as[String]
```
Author: Reynold Xin <rxin@databricks.com>
Closes#9240 from rxin/SPARK-11274.
*This PR adds a new experimental API to Spark, tentitively named Datasets.*
A `Dataset` is a strongly-typed collection of objects that can be transformed in parallel using functional or relational operations. Example usage is as follows:
### Functional
```scala
> val ds: Dataset[Int] = Seq(1, 2, 3).toDS()
> ds.filter(_ % 1 == 0).collect()
res1: Array[Int] = Array(1, 2, 3)
```
### Relational
```scala
scala> ds.toDF().show()
+-----+
|value|
+-----+
| 1|
| 2|
| 3|
+-----+
> ds.select(expr("value + 1").as[Int]).collect()
res11: Array[Int] = Array(2, 3, 4)
```
## Comparison to RDDs
A `Dataset` differs from an `RDD` in the following ways:
- The creation of a `Dataset` requires the presence of an explicit `Encoder` that can be
used to serialize the object into a binary format. Encoders are also capable of mapping the
schema of a given object to the Spark SQL type system. In contrast, RDDs rely on runtime
reflection based serialization.
- Internally, a `Dataset` is represented by a Catalyst logical plan and the data is stored
in the encoded form. This representation allows for additional logical operations and
enables many operations (sorting, shuffling, etc.) to be performed without deserializing to
an object.
A `Dataset` can be converted to an `RDD` by calling the `.rdd` method.
## Comparison to DataFrames
A `Dataset` can be thought of as a specialized DataFrame, where the elements map to a specific
JVM object type, instead of to a generic `Row` container. A DataFrame can be transformed into
specific Dataset by calling `df.as[ElementType]`. Similarly you can transform a strongly-typed
`Dataset` to a generic DataFrame by calling `ds.toDF()`.
## Implementation Status and TODOs
This is a rough cut at the least controversial parts of the API. The primary purpose here is to get something committed so that we can better parallelize further work and get early feedback on the API. The following is being deferred to future PRs:
- Joins and Aggregations (prototype here f11f91e6f0)
- Support for Java
Additionally, the responsibility for binding an encoder to a given schema is currently done in a fairly ad-hoc fashion. This is an internal detail, and what we are doing today works for the cases we care about. However, as we add more APIs we'll probably need to do this in a more principled way (i.e. separate resolution from binding as we do in DataFrames).
## COMPATIBILITY NOTE
Long term we plan to make `DataFrame` extend `Dataset[Row]`. However,
making this change to che class hierarchy would break the function signatures for the existing
function operations (map, flatMap, etc). As such, this class should be considered a preview
of the final API. Changes will be made to the interface after Spark 1.6.
Author: Michael Armbrust <michael@databricks.com>
Closes#9190 from marmbrus/dataset-infra.
To enable the unit test of `hadoopFsRelationSuite.Partition column type casting`. It previously threw exception like below, as we treat the auto infer partition schema with higher priority than the user specified one.
```
java.lang.ClassCastException: java.lang.Integer cannot be cast to org.apache.spark.unsafe.types.UTF8String
at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getUTF8String(rows.scala:45)
at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:220)
at org.apache.spark.sql.catalyst.expressions.JoinedRow.getUTF8String(JoinedRow.scala:102)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(generated.java:62)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$17$$anonfun$apply$9.apply(DataSourceStrategy.scala:212)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$17$$anonfun$apply$9.apply(DataSourceStrategy.scala:212)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:903)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:903)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1846)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1846)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
07:44:01.344 ERROR org.apache.spark.executor.Executor: Exception in task 14.0 in stage 3.0 (TID 206)
java.lang.ClassCastException: java.lang.Integer cannot be cast to org.apache.spark.unsafe.types.UTF8String
at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getUTF8String(rows.scala:45)
at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:220)
at org.apache.spark.sql.catalyst.expressions.JoinedRow.getUTF8String(JoinedRow.scala:102)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(generated.java:62)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$17$$anonfun$apply$9.apply(DataSourceStrategy.scala:212)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$17$$anonfun$apply$9.apply(DataSourceStrategy.scala:212)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:903)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:903)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1846)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1846)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
```
Author: Cheng Hao <hao.cheng@intel.com>
Closes#8026 from chenghao-intel/partition_discovery.
There's a lot of duplication between SortShuffleManager and UnsafeShuffleManager. Given that these now provide the same set of functionality, now that UnsafeShuffleManager supports large records, I think that we should replace SortShuffleManager's serialized shuffle implementation with UnsafeShuffleManager's and should merge the two managers together.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#8829 from JoshRosen/consolidate-sort-shuffle-implementations.
This PR change InMemoryTableScan to output UnsafeRow, and optimize the unrolling and scanning by coping the bytes for var-length types between UnsafeRow and ByteBuffer directly without creating the wrapper objects. When scanning the decimals in TPC-DS store_sales table, it's 80% faster (copy it as long without create Decimal objects).
Author: Davies Liu <davies@databricks.com>
Closes#9203 from davies/unsafe_cache.
I am changing the default behavior of `First`/`Last` to respect null values (the SQL standard default behavior).
https://issues.apache.org/jira/browse/SPARK-9740
Author: Yin Huai <yhuai@databricks.com>
Closes#8113 from yhuai/firstLast.
This PR introduce a new feature to run SQL directly on files without create a table, for example:
```
select id from json.`path/to/json/files` as j
```
Author: Davies Liu <davies@databricks.com>
Closes#9173 from davies/source.
Due to PARQUET-251, `BINARY` columns in existing Parquet files may be written with corrupted statistics information. This information is used by filter push-down optimization. Since Spark 1.5 turns on Parquet filter push-down by default, we may end up with wrong query results. PARQUET-251 has been fixed in parquet-mr 1.8.1, but Spark 1.5 is still using 1.7.0.
This affects all Spark SQL data types that can be mapped to Parquet {{BINARY}}, namely:
- `StringType`
- `BinaryType`
- `DecimalType`
(But Spark SQL doesn't support pushing down filters involving `DecimalType` columns for now.)
To avoid wrong query results, we should disable filter push-down for columns of `StringType` and `BinaryType` until we upgrade to parquet-mr 1.8.
Author: Cheng Lian <lian@databricks.com>
Closes#9152 from liancheng/spark-11153.workaround-parquet-251.
(cherry picked from commit 0887e5e878)
Signed-off-by: Cheng Lian <lian@databricks.com>
This PR improve the performance by:
1) Generate an Iterator that take Iterator[CachedBatch] as input, and call accessors (unroll the loop for columns), avoid the expensive Iterator.flatMap.
2) Use Unsafe.getInt/getLong/getFloat/getDouble instead of ByteBuffer.getInt/getLong/getFloat/getDouble, the later one actually read byte by byte.
3) Remove the unnecessary copy() in Coalesce(), which is not related to memory cache, found during benchmark.
The following benchmark showed that we can speedup the columnar cache of int by 2x.
```
path = '/opt/tpcds/store_sales/'
int_cols = ['ss_sold_date_sk', 'ss_sold_time_sk', 'ss_item_sk','ss_customer_sk']
df = sqlContext.read.parquet(path).select(int_cols).cache()
df.count()
t = time.time()
print df.select("*")._jdf.queryExecution().toRdd().count()
print time.time() - t
```
Author: Davies Liu <davies@databricks.com>
Closes#9145 from davies/byte_buffer.
`DataSourceStrategy.mergeWithPartitionValues` is essentially a projection implemented in a quite inefficient way. This PR optimizes this method with `UnsafeProjection` to avoid unnecessary boxing costs.
Author: Cheng Lian <lian@databricks.com>
Closes#9104 from liancheng/spark-11088.faster-partition-values-merging.
The purpose of this PR is to keep the unsafe format detail only inside the unsafe class itself, so when we use them(like use unsafe array in unsafe map, use unsafe array and map in columnar cache), we don't need to understand the format before use them.
change list:
* unsafe array's 4-bytes numElements header is now required(was optional), and become a part of unsafe array format.
* w.r.t the previous changing, the `sizeInBytes` of unsafe array now counts the 4-bytes header.
* unsafe map's format was `[numElements] [key array numBytes] [key array content(without numElements header)] [value array content(without numElements header)]` before, which is a little hacky as it makes unsafe array's header optional. I think saving 4 bytes is not a big deal, so the format is now: `[key array numBytes] [unsafe key array] [unsafe value array]`.
* w.r.t the previous changing, the `sizeInBytes` of unsafe map now counts both map's header and array's header.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9131 from cloud-fan/unsafe.
SQLListener adds all stage infos to `_stageIdToStageMetrics`, but only removes stage infos belonging to SQL executions. This PR fixed it by ignoring stages that don't belong to SQL executions.
Reported by Terry Hoo in https://www.mail-archive.com/userspark.apache.org/msg38810.html
Author: zsxwing <zsxwing@gmail.com>
Closes#9132 from zsxwing/SPARK-11126.
Make sure comma-separated paths get processed correcly in ResolvedDataSource for a HadoopFsRelationProvider
Author: Koert Kuipers <koert@tresata.com>
Closes#8416 from koertkuipers/feat-sql-comma-separated-paths.
Groups are not resolved properly in scaladoc in following classes:
sql/core/src/main/scala/org/apache/spark/sql/Column.scala
sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
sql/core/src/main/scala/org/apache/spark/sql/functions.scala
Author: Pravin Gadakh <pravingadakh177@gmail.com>
Closes#9148 from pravingadakh/master.
Some json parsers are not closed. parser in JacksonParser#parseJson, for example.
Author: navis.ryu <navis@apache.org>
Closes#9130 from navis/SPARK-11124.
In Spark SQL, the Exchange planner tries to avoid unnecessary sorts in cases where the data has already been sorted by a superset of the requested sorting columns. For instance, let's say that a query calls for an operator's input to be sorted by `a.asc` and the input happens to already be sorted by `[a.asc, b.asc]`. In this case, we do not need to re-sort the input. The converse, however, is not true: if the query calls for `[a.asc, b.asc]`, then `a.asc` alone will not satisfy the ordering requirements, requiring an additional sort to be planned by Exchange.
However, the current Exchange code gets this wrong and incorrectly skips sorting when the existing output ordering is a subset of the required ordering. This is simple to fix, however.
This bug was introduced in https://github.com/apache/spark/pull/7458, so it affects 1.5.0+.
This patch fixes the bug and significantly improves the unit test coverage of Exchange's sort-planning logic.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#9140 from JoshRosen/SPARK-11135.
This patch extends TungstenAggregate to support ImperativeAggregate functions. The existing TungstenAggregate operator only supported DeclarativeAggregate functions, which are defined in terms of Catalyst expressions and can be evaluated via generated projections. ImperativeAggregate functions, on the other hand, are evaluated by calling their `initialize`, `update`, `merge`, and `eval` methods.
The basic strategy here is similar to how SortBasedAggregate evaluates both types of aggregate functions: use a generated projection to evaluate the expression-based declarative aggregates with dummy placeholder expressions inserted in place of the imperative aggregate function output, then invoke the imperative aggregate functions and target them against the aggregation buffer. The bulk of the diff here consists of code that was copied and adapted from SortBasedAggregate, with some key changes to handle TungstenAggregate's sort fallback path.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#9038 from JoshRosen/support-interpreted-in-tungsten-agg-final.
```scala
withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
withTempPath { dir =>
val path = s"${dir.getCanonicalPath}/part=1"
(1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)
// If the "part = 1" filter gets pushed down, this query will throw an exception since
// "part" is not a valid column in the actual Parquet file
checkAnswer(
sqlContext.read.parquet(path).filter("a > 0 and (part = 0 or a > 1)"),
(2 to 3).map(i => Row(i, i.toString, 1)))
}
}
```
We expect the result to be:
```
2,1
3,1
```
But got
```
1,1
2,1
3,1
```
Author: Cheng Hao <hao.cheng@intel.com>
Closes#8916 from chenghao-intel/partition_filter.
Right now, we have QualifiedTableName, TableIdentifier, and Seq[String] to represent table identifiers. We should only have one form and TableIdentifier is the best one because it provides methods to get table name, database name, return unquoted string, and return quoted string.
Author: Wenchen Fan <wenchen@databricks.com>
Author: Wenchen Fan <cloud0fan@163.com>
Closes#8453 from cloud-fan/table-name.
With this feature, we can track the query plan, time cost, exception during query execution for spark users.
Author: Wenchen Fan <cloud0fan@163.com>
Closes#9078 from cloud-fan/callback.
Two points in this PR:
1. Originally thought was that a named R list is assumed to be a struct in SerDe. But this is problematic because some R functions will implicitly generate named lists that are not intended to be a struct when transferred by SerDe. So SerDe clients have to explicitly mark a names list as struct by changing its class from "list" to "struct".
2. SerDe is in the Spark Core module, and data of StructType is represented as GenricRow which is defined in Spark SQL module. SerDe can't import GenricRow as in maven build Spark SQL module depends on Spark Core module. So this PR adds a registration hook in SerDe to allow SQLUtils in Spark SQL module to register its functions for serialization and deserialization of StructType.
Author: Sun Rui <rui.sun@intel.com>
Closes#8794 from sun-rui/SPARK-10051.
The SQLTab will be shared by multiple sessions.
If we create multiple independent SQLContexts (not using newSession()), will still see multiple SQLTabs in the Spark UI.
Author: Davies Liu <davies@databricks.com>
Closes#9048 from davies/sqlui.
Currently, All windows function could generate wrong result in cluster sometimes.
The root cause is that AttributeReference is called in executor, then id of it may not be unique than others created in driver.
Here is the script that could reproduce the problem (run in local cluster):
```
from pyspark import SparkContext, HiveContext
from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber
sqlContext = HiveContext(SparkContext())
sqlContext.setConf("spark.sql.shuffle.partitions", "3")
df = sqlContext.range(1<<20)
df2 = df.select((df.id % 1000).alias("A"), (df.id / 1000).alias('B'))
ws = Window.partitionBy(df2.A).orderBy(df2.B)
df3 = df2.select("client", "date", rowNumber().over(ws).alias("rn")).filter("rn < 0")
assert df3.count() == 0
```
Author: Davies Liu <davies@databricks.com>
Author: Yin Huai <yhuai@databricks.com>
Closes#9050 from davies/wrong_window.
This PR improve the unrolling and read of complex types in columnar cache:
1) Using UnsafeProjection to do serialization of complex types, so they will not be serialized three times (two for actualSize)
2) Copy the bytes from UnsafeRow/UnsafeArrayData to ByteBuffer directly, avoiding the immediate byte[]
3) Using the underlying array in ByteBuffer to create UTF8String/UnsafeRow/UnsafeArrayData without copy.
Combine these optimizations, we can reduce the unrolling time from 25s to 21s (20% less), reduce the scanning time from 3.5s to 2.5s (28% less).
```
df = sqlContext.read.parquet(path)
t = time.time()
df.cache()
df.count()
print 'unrolling', time.time() - t
for i in range(10):
t = time.time()
print df.select("*")._jdf.queryExecution().toRdd().count()
print time.time() - t
```
The schema is
```
root
|-- a: struct (nullable = true)
| |-- b: long (nullable = true)
| |-- c: string (nullable = true)
|-- d: array (nullable = true)
| |-- element: long (containsNull = true)
|-- e: map (nullable = true)
| |-- key: long
| |-- value: string (valueContainsNull = true)
```
Now the columnar cache depends on that UnsafeProjection support all the data types (including UDT), this PR also fix that.
Author: Davies Liu <davies@databricks.com>
Closes#9016 from davies/complex2.
For Parquet decimal columns that are encoded using plain-dictionary encoding, we can make the upper level converter aware of the dictionary, so that we can pre-instantiate all the decimals to avoid duplicated instantiation.
Note that plain-dictionary encoding isn't available for `FIXED_LEN_BYTE_ARRAY` for Parquet writer version `PARQUET_1_0`. So currently only decimals written as `INT32` and `INT64` can benefit from this optimization.
Author: Cheng Lian <lian@databricks.com>
Closes#9040 from liancheng/spark-11007.decimal-converter-dict-support.
SortBasedAggregationIterator uses a KVIterator interface in order to process input rows as key-value pairs, but this use of KVIterator is unnecessary, slightly complicates the code, and might hurt performance. This patch refactors this code to remove the use of this extra layer of iterator wrapping and simplifies other parts of the code in the process.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#9066 from JoshRosen/sort-iterator-cleanup.
marmbrus
rxin
This patch adds a JdbcDialect class, which customizes the datatype mappings for Derby backends. The patch also adds unit tests for the new dialect, corresponding to the existing tests for other JDBC dialects.
JDBCSuite runs cleanly for me with this patch. So does JDBCWriteSuite, although it produces noise as described here: https://issues.apache.org/jira/browse/SPARK-10890
This patch is my original work, which I license to the ASF. I am a Derby contributor, so my ICLA is on file under SVN id "rhillegas": http://people.apache.org/committer-index.html
Touches the following files:
---------------------------------
org.apache.spark.sql.jdbc.JdbcDialects
Adds a DerbyDialect.
---------------------------------
org.apache.spark.sql.jdbc.JDBCSuite
Adds unit tests for the new DerbyDialect.
Author: Rick Hillegas <rhilleg@us.ibm.com>
Closes#8982 from rick-ibm/b_10855.
This PR improve the sessions management by replacing the thread-local based to one SQLContext per session approach, introduce separated temporary tables and UDFs/UDAFs for each session.
A new session of SQLContext could be created by:
1) create an new SQLContext
2) call newSession() on existing SQLContext
For HiveContext, in order to reduce the cost for each session, the classloader and Hive client are shared across multiple sessions (created by newSession).
CacheManager is also shared by multiple sessions, so cache a table multiple times in different sessions will not cause multiple copies of in-memory cache.
Added jars are still shared by all the sessions, because SparkContext does not support sessions.
cc marmbrus yhuai rxin
Author: Davies Liu <davies@databricks.com>
Closes#8909 from davies/sessions.
This PR refactors Parquet write path to follow parquet-format spec. It's a successor of PR #7679, but with less non-essential changes.
Major changes include:
1. Replaces `RowWriteSupport` and `MutableRowWriteSupport` with `CatalystWriteSupport`
- Writes Parquet data using standard layout defined in parquet-format
Specifically, we are now writing ...
- ... arrays and maps in standard 3-level structure with proper annotations and field names
- ... decimals as `INT32` and `INT64` whenever possible, and taking `FIXED_LEN_BYTE_ARRAY` as the final fallback
- Supports legacy mode which is compatible with Spark 1.4 and prior versions
The legacy mode is by default off, and can be turned on by flipping SQL option `spark.sql.parquet.writeLegacyFormat` to `true`.
- Eliminates per value data type dispatching costs via prebuilt composed writer functions
1. Cleans up the last pieces of old Parquet support code
As pointed out by rxin previously, we probably want to rename all those `Catalyst*` Parquet classes to `Parquet*` for clarity. But I'd like to do this in a follow-up PR to minimize code review noises in this one.
Author: Cheng Lian <lian@databricks.com>
Closes#8988 from liancheng/spark-8848/standard-parquet-write-path.
In `aggregate/utils.scala`, there is a substantial amount of duplication in the expression-rewriting logic. As a prerequisite to supporting imperative aggregate functions in `TungstenAggregate`, this patch refactors this file so that the same expression-rewriting logic is used for both `SortAggregate` and `TungstenAggregate`.
In order to allow both operators to use the same rewriting logic, `TungstenAggregationIterator. generateResultProjection()` has been updated so that it first evaluates all declarative aggregate functions' `evaluateExpression`s and writes the results into a temporary buffer, and then uses this temporary buffer and the grouping expressions to evaluate the final resultExpressions. This matches the logic in SortAggregateIterator, where this two-pass approach is necessary in order to support imperative aggregates. If this change turns out to cause performance regressions, then we can look into re-implementing the single-pass evaluation in a cleaner way as part of a followup patch.
Since the rewriting logic is now shared across both operators, this patch also extracts that logic and places it in `SparkStrategies`. This makes the rewriting logic a bit easier to follow, I think.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#9015 from JoshRosen/SPARK-10988.
This PR refactors `HashJoinNode` to take a existing `HashedRelation`. So, we can reuse this node for both `ShuffledHashJoin` and `BroadcastHashJoin`.
https://issues.apache.org/jira/browse/SPARK-10887
Author: Yin Huai <yhuai@databricks.com>
Closes#8953 from yhuai/SPARK-10887.
This PR addresses [SPARK-7869](https://issues.apache.org/jira/browse/SPARK-7869)
Before the patch, attempt to load the table from Postgres with JSON/JSONb datatype caused error `java.sql.SQLException: Unsupported type 1111`
Postgres data types JSON and JSONb are now mapped to String on Spark side thus they can be loaded into DF and processed on Spark side
Example
Postgres:
```
create table test_json (id int, value json);
create table test_jsonb (id int, value jsonb);
insert into test_json (id, value) values
(1, '{"field1":"value1","field2":"value2","field3":[1,2,3]}'::json),
(2, '{"field1":"value3","field2":"value4","field3":[4,5,6]}'::json),
(3, '{"field3":"value5","field4":"value6","field3":[7,8,9]}'::json);
insert into test_jsonb (id, value) values
(4, '{"field1":"value1","field2":"value2","field3":[1,2,3]}'::jsonb),
(5, '{"field1":"value3","field2":"value4","field3":[4,5,6]}'::jsonb),
(6, '{"field3":"value5","field4":"value6","field3":[7,8,9]}'::jsonb);
```
PySpark:
```
>>> import json
>>> df1 = sqlContext.read.jdbc("jdbc:postgresql://127.0.0.1:5432/test?user=testuser", "test_json")
>>> df1.map(lambda x: (x.id, json.loads(x.value))).map(lambda (id, value): (id, value.get('field3'))).collect()
[(1, [1, 2, 3]), (2, [4, 5, 6]), (3, [7, 8, 9])]
>>> df2 = sqlContext.read.jdbc("jdbc:postgresql://127.0.0.1:5432/test?user=testuser", "test_jsonb")
>>> df2.map(lambda x: (x.id, json.loads(x.value))).map(lambda (id, value): (id, value.get('field1'))).collect()
[(4, u'value1'), (5, u'value3'), (6, None)]
```
Author: 0x0FFF <programmerag@gmail.com>
Closes#8948 from 0x0FFF/SPARK-7869.
This PR improve the performance of complex types in columnar cache by using UnsafeProjection instead of KryoSerializer.
A simple benchmark show that this PR could improve the performance of scanning a cached table with complex columns by 15x (comparing to Spark 1.5).
Here is the code used to benchmark:
```
df = sc.range(1<<23).map(lambda i: Row(a=Row(b=i, c=str(i)), d=range(10), e=dict(zip(range(10), [str(i) for i in range(10)])))).toDF()
df.write.parquet("table")
```
```
df = sqlContext.read.parquet("table")
df.cache()
df.count()
t = time.time()
print df.select("*")._jdf.queryExecution().toRdd().count()
print time.time() - t
```
Author: Davies Liu <davies@databricks.com>
Closes#8971 from davies/complex.
This patch allows `Repartition` to support UnsafeRows. This is accomplished by implementing the logical `Repartition` operator in terms of `Exchange` and a new `RoundRobinPartitioning`.
Author: Josh Rosen <joshrosen@databricks.com>
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#8083 from JoshRosen/SPARK-9702.
DeclarativeAggregate matches more closely with ImperativeAggregate we already have.
Author: Reynold Xin <rxin@databricks.com>
Closes#9013 from rxin/SPARK-10982.
This patch refactors several of the Aggregate2 interfaces in order to improve code clarity.
The biggest change is a refactoring of the `AggregateFunction2` class hierarchy. In the old code, we had a class named `AlgebraicAggregate` that inherited from `AggregateFunction2`, added a new set of methods, then banned the use of the inherited methods. I found this to be fairly confusing because.
If you look carefully at the existing code, you'll see that subclasses of `AggregateFunction2` fall into two disjoint categories: imperative aggregation functions which directly extended `AggregateFunction2` and declarative, expression-based aggregate functions which extended `AlgebraicAggregate`. In order to make this more explicit, this patch refactors things so that `AggregateFunction2` is a sealed abstract class with two subclasses, `ImperativeAggregateFunction` and `ExpressionAggregateFunction`. The superclass, `AggregateFunction2`, now only contains methods and fields that are common to both subclasses.
After making this change, I updated the various AggregationIterator classes to comply with this new naming scheme. I also performed several small renamings in the aggregate interfaces themselves in order to improve clarity and rewrote or expanded a number of comments.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#8973 from JoshRosen/tungsten-agg-comments.
This PR remove the typeId in columnar cache, it's not needed anymore, it also remove DATE and TIMESTAMP (use INT/LONG instead).
Author: Davies Liu <davies@databricks.com>
Closes#8989 from davies/refactor_cache.
Given LogicalRelation (and other classes) were moved from sources package to execution.sources package, removed private[sql] to make LogicalRelation public to facilitate access for data sources.
Author: gweidner <gweidner@us.ibm.com>
Closes#8965 from gweidner/SPARK-7275.
We introduced SQL option `spark.sql.parquet.followParquetFormatSpec` while working on implementing Parquet backwards-compatibility rules in SPARK-6777. It indicates whether we should use legacy Parquet format adopted by Spark 1.4 and prior versions or the standard format defined in parquet-format spec to write Parquet files.
This option defaults to `false` and is marked as a non-public option (`isPublic = false`) because we haven't finished refactored Parquet write path. The problem is, the name of this option is somewhat confusing, because it's not super intuitive why we shouldn't follow the spec. Would be nice to rename it to `spark.sql.parquet.writeLegacyFormat`, and invert its default value (the two option names have opposite meanings).
Although this option is private in 1.5, we'll make it public in 1.6 after refactoring Parquet write path. So that users can decide whether to write Parquet files in standard format or legacy format.
Author: Cheng Lian <lian@databricks.com>
Closes#8566 from liancheng/spark-10400/deprecate-follow-parquet-format-spec.
When reading Parquet string and binary-backed decimal values, Parquet `Binary.getBytes` always returns a copied byte array, which is unnecessary. Since the underlying implementation of `Binary` values there is guaranteed to be `ByteArraySliceBackedBinary`, and Parquet itself never reuses underlying byte arrays, we can use `Binary.toByteBuffer.array()` to steal the underlying byte arrays without copying them.
This brings performance benefits when scanning Parquet string and binary-backed decimal columns. Note that, this trick doesn't cover binary-backed decimals with precision greater than 18.
My micro-benchmark result is that, this brings a ~15% performance boost for scanning TPC-DS `store_sales` table (scale factor 15).
Another minor optimization done in this PR is that, now we directly construct a Java `BigDecimal` in `Decimal.toJavaBigDecimal` without constructing a Scala `BigDecimal` first. This brings another ~5% performance gain.
Author: Cheng Lian <lian@databricks.com>
Closes#8907 from liancheng/spark-10811/eliminate-array-copying.
The UTF8String may come from UnsafeRow, then underline buffer of it is not copied, so we should clone it in order to hold it in Stats.
cc yhuai
Author: Davies Liu <davies@databricks.com>
Closes#8929 from davies/pushdown_string.
https://issues.apache.org/jira/browse/SPARK-10741
I choose the second approach: do not change output exprIds when convert MetastoreRelation to LogicalRelation
Author: Wenchen Fan <cloud0fan@163.com>
Closes#8889 from cloud-fan/hot-bug.
This makes two changes:
- Allow reduce tasks to fetch multiple map output partitions -- this is a pretty small change to HashShuffleFetcher
- Move shuffle locality computation out of DAGScheduler and into ShuffledRDD / MapOutputTracker; this was needed because the code in DAGScheduler wouldn't work for RDDs that fetch multiple map output partitions from each reduce task
I also added an AdaptiveSchedulingSuite that creates RDDs depending on multiple map output partitions.
Author: Matei Zaharia <matei@databricks.com>
Closes#8844 from mateiz/spark-9852.
JIRA: https://issues.apache.org/jira/browse/SPARK-10705
As described in the JIRA ticket, `DataFrame.toJSON` uses `DataFrame.mapPartitions`, which converts internal rows to external rows. We should use `queryExecution.toRdd.mapPartitions` that directly uses internal rows for better performance.
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#8865 from viirya/df-tojson-internalrow.
This patch reverts most of the changes in a previous fix#8827.
The real cause of the issue is that in `TungstenAggregate`'s prepare method we only reserve 1 page, but later when we switch to sort-based aggregation we try to acquire 1 page AND a pointer array. The longer-term fix should be to reserve also the pointer array, but for now ***we will simply not track the pointer array***. (Note that elsewhere we already don't track the pointer array, e.g. [here](a18208047f/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java (L88)))
Note: This patch reuses the unit test added in #8827 so it doesn't show up in the diff.
Author: Andrew Or <andrew@databricks.com>
Closes#8888 from andrewor14/dont-track-pointer-array.
Python DataFrame.head/take now requires scanning all the partitions. This pull request changes them to delegate the actual implementation to Scala DataFrame (by calling DataFrame.take).
This is more of a hack for fixing this issue in 1.5.1. A more proper fix is to change executeCollect and executeTake to return InternalRow rather than Row, and thus eliminate the extra round-trip conversion.
Author: Reynold Xin <rxin@databricks.com>
Closes#8876 from rxin/SPARK-10731.