Currently SortMergeJoin and BroadcastHashJoin do not support condition, the need a followed Filter for that, the result projection to generate UnsafeRow could be very expensive if they generate lots of rows and could be filtered mostly by condition.
This PR brings the support of condition for SortMergeJoin and BroadcastHashJoin, just like other outer joins do.
This could improve the performance of Q72 by 7x (from 120s to 16.5s).
Author: Davies Liu <davies@databricks.com>
Closes#10653 from davies/filter_join.
In SPARK-10743 we wrap cast with `UnresolvedAlias` to give `Cast` a better alias if possible. However, for cases like `filter`, the `UnresolvedAlias` can't be resolved and actually we don't need a better alias for this case. This PR move the cast wrapping logic to `Column.named` so that we will only do it when we need a alias name.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#10781 from cloud-fan/bug.
This pull request removes the public developer parser API for external parsers. Given everything a parser depends on (e.g. logical plans and expressions) are internal and not stable, external parsers will break with every release of Spark. It is a bad idea to create the illusion that Spark actually supports pluggable parsers. In addition, this also reduces incentives for 3rd party projects to contribute parse improvements back to Spark.
Author: Reynold Xin <rxin@databricks.com>
Closes#10801 from rxin/SPARK-12855.
This is the initial work for whole stage codegen, it support Projection/Filter/Range, we will continue work on this to support more physical operators.
A micro benchmark show that a query with range, filter and projection could be 3X faster then before.
It's turned on by default. For a tree that have at least two chained plans, a WholeStageCodegen will be inserted into it, for example, the following plan
```
Limit 10
+- Project [(id#5L + 1) AS (id + 1)#6L]
+- Filter ((id#5L & 1) = 1)
+- Range 0, 1, 4, 10, [id#5L]
```
will be translated into
```
Limit 10
+- WholeStageCodegen
+- Project [(id#1L + 1) AS (id + 1)#2L]
+- Filter ((id#1L & 1) = 1)
+- Range 0, 1, 4, 10, [id#1L]
```
Here is the call graph to generate Java source for A and B (A support codegen, but B does not):
```
* WholeStageCodegen Plan A FakeInput Plan B
* =========================================================================
*
* -> execute()
* |
* doExecute() --------> produce()
* |
* doProduce() -------> produce()
* |
* doProduce() ---> execute()
* |
* consume()
* doConsume() ------------|
* |
* doConsume() <----- consume()
```
A SparkPlan that support codegen need to implement doProduce() and doConsume():
```
def doProduce(ctx: CodegenContext): (RDD[InternalRow], String)
def doConsume(ctx: CodegenContext, child: SparkPlan, input: Seq[ExprCode]): String
```
Author: Davies Liu <davies@databricks.com>
Closes#10735 from davies/whole2.
This inlines a few of the Parquet decoders and adds vectorized APIs to support decoding in batch.
There are a few particulars in the Parquet encodings that make this much more efficient. In
particular, RLE encodings are very well suited for batch decoding. The Parquet 2.0 encodings are
also very suited for this.
This is a work in progress and does not affect the current execution. In subsequent patches, we will
support more encodings and types before enabling this.
Simple benchmarks indicate this can decode single ints about > 3x faster.
Author: Nong Li <nong@databricks.com>
Author: Nong <nongli@gmail.com>
Closes#10593 from nongli/spark-12644.
This PR adds the support to read bucketed tables, and correctly populate `outputPartitioning`, so that we can avoid shuffle for some cases.
TODO(follow-up PRs):
* bucket pruning
* avoid shuffle for bucketed table join when use any super-set of the bucketing key.
(we should re-visit it after https://issues.apache.org/jira/browse/SPARK-12704 is fixed)
* recognize hive bucketed table
Author: Wenchen Fan <wenchen@databricks.com>
Closes#10604 from cloud-fan/bucket-read.
In this PR the new CatalystQl parser stack reaches grammar parity with the old Parser-Combinator based SQL Parser. This PR also replaces all uses of the old Parser, and removes it from the code base.
Although the existing Hive and SQL parser dialects were mostly the same, some kinks had to be worked out:
- The SQL Parser allowed syntax like ```APPROXIMATE(0.01) COUNT(DISTINCT a)```. In order to make this work we needed to hardcode approximate operators in the parser, or we would have to create an approximate expression. ```APPROXIMATE_COUNT_DISTINCT(a, 0.01)``` would also do the job and is much easier to maintain. So, this PR **removes** this keyword.
- The old SQL Parser supports ```LIMIT``` clauses in nested queries. This is **not supported** anymore. See https://github.com/apache/spark/pull/10689 for the rationale for this.
- Hive has a charset name char set literal combination it supports, for instance the following expression ```_ISO-8859-1 0x4341464562616265``` would yield this string: ```CAFEbabe```. Hive will only allow charset names to start with an underscore. This is quite annoying in spark because as soon as you use a tuple names will start with an underscore. In this PR we **remove** this feature from the parser. It would be quite easy to implement such a feature as an Expression later on.
- Hive and the SQL Parser treat decimal literals differently. Hive will turn any decimal into a ```Double``` whereas the SQL Parser would convert a non-scientific decimal into a ```BigDecimal```, and would turn a scientific decimal into a Double. We follow Hive's behavior here. The new parser supports a big decimal literal, for instance: ```81923801.42BD```, which can be used when a big decimal is needed.
cc rxin viirya marmbrus yhuai cloud-fan
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#10745 from hvanhovell/SPARK-12575-2.
CSV is the most common data format in the "small data" world. It is often the first format people want to try when they see Spark on a single node. Having to rely on a 3rd party component for this leads to poor user experience for new users. This PR merges the popular spark-csv data source package (https://github.com/databricks/spark-csv) with SparkSQL.
This is a first PR to bring the functionality to spark 2.0 master. We will complete items outlines in the design document (see JIRA attachment) in follow up pull requests.
Author: Hossein <hossein@databricks.com>
Author: Reynold Xin <rxin@databricks.com>
Closes#10766 from rxin/csv.
The goal of this PR is to eliminate unnecessary translations when there are back-to-back `MapPartitions` operations. In order to achieve this I also made the following simplifications:
- Operators no longer have hold encoders, instead they have only the expressions that they need. The benefits here are twofold: the expressions are visible to transformations so go through the normal resolution/binding process. now that they are visible we can change them on a case by case basis.
- Operators no longer have type parameters. Since the engine is responsible for its own type checking, having the types visible to the complier was an unnecessary complication. We still leverage the scala compiler in the companion factory when constructing a new operator, but after this the types are discarded.
Deferred to a follow up PR:
- Remove as much of the resolution/binding from Dataset/GroupedDataset as possible. We should still eagerly check resolution and throw an error though in the case of mismatches for an `as` operation.
- Eliminate serializations in more cases by adding more cases to `EliminateSerialization`
Author: Michael Armbrust <michael@databricks.com>
Closes#10747 from marmbrus/encoderExpressions.
This PR makes bucketing and exchange share one common hash algorithm, so that we can guarantee the data distribution is same between shuffle and bucketed data source, which enables us to only shuffle one side when join a bucketed table and a normal one.
This PR also fixes the tests that are broken by the new hash behaviour in shuffle.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#10703 from cloud-fan/use-hash-expr-in-shuffle.
Fix the style violation (space before , and :).
This PR is a followup for #10643 and rework of #10685 .
Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>
Closes#10732 from sarutak/SPARK-12692-followup-sql.
There are many potential benefits of having an efficient in memory columnar format as an alternate
to UnsafeRow. This patch introduces ColumnarBatch/ColumnarVector which starts this effort. The
remaining implementation can be done as follow up patches.
As stated in the in the JIRA, there are useful external components that operate on memory in a
simple columnar format. ColumnarBatch would serve that purpose and could server as a
zero-serialization/zero-copy exchange for this use case.
This patch supports running the underlying data either on heap or off heap. On heap runs a bit
faster but we would need offheap for zero-copy exchanges. Currently, this mode is hidden behind one
interface (ColumnVector).
This differs from Parquet or the existing columnar cache because this is *not* intended to be used
as a storage format. The focus is entirely on CPU efficiency as we expect to only have 1 of these
batches in memory per task. The layout of the values is just dense arrays of the value type.
Author: Nong Li <nong@databricks.com>
Author: Nong <nongli@gmail.com>
Closes#10628 from nongli/spark-12635.
This PR implements SQL generation support for persisted data source tables. A new field `metastoreTableIdentifier: Option[TableIdentifier]` is added to `LogicalRelation`. When a `LogicalRelation` representing a persisted data source relation is created, this field holds the database name and table name of the relation.
Author: Cheng Lian <lian@databricks.com>
Closes#10712 from liancheng/spark-12724-datasources-sql-gen.
Let me know whether you'd like to see it in other place
Author: Robert Kruszewski <robertk@palantir.com>
Closes#10210 from robert3005/feature/pluggable-optimizer.
Fix the style violation (space before , and :).
This PR is a followup for #10643.
Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>
Closes#10718 from sarutak/SPARK-12692-followup-sql.
JIRA: https://issues.apache.org/jira/browse/SPARK-12744
This PR makes parsing JSON integers to timestamps consistent with casting behavior.
Author: Anatoliy Plastinin <anatoliy.plastinin@gmail.com>
Closes#10687 from antlypls/fix-json-timestamp-parsing.
Turn import ordering violations into build errors, plus a few adjustments
to account for how the checker behaves. I'm a little on the fence about
whether the existing code is right, but it's easier to appease the checker
than to discuss what's the more correct order here.
Plus a few fixes to imports that cropped in since my recent cleanups.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#10612 from vanzin/SPARK-3873-enable.
Fix most build warnings: mostly deprecated API usages. I'll annotate some of the changes below. CC rxin who is leading the charge to remove the deprecated APIs.
Author: Sean Owen <sowen@cloudera.com>
Closes#10570 from srowen/SPARK-12618.
This PR is continue from previous closed PR 10314.
In this PR, SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE will be taken memory string conventions as input.
For example, the user can now specify 10g for SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE in SQLConf file.
marmbrus srowen : Can you help review this code changes ? Thanks.
Author: Kevin Yu <qyu@us.ibm.com>
Closes#10629 from kevinyu98/spark-12317.
[SPARK-12640][SQL] Add simple benchmarking utility class and add Parquet scan benchmarks.
We've run benchmarks ad hoc to measure the scanner performance. We will continue to invest in this
and it makes sense to get these benchmarks into code. This adds a simple benchmarking utility to do
this.
Author: Nong Li <nong@databricks.com>
Author: Nong <nongli@gmail.com>
Closes#10589 from nongli/spark-12640.
For queries like :
select <> from table group by a distribute by a
we can eliminate distribute by ; since group by will anyways do a hash partitioning
Also applicable when user uses Dataframe API
Author: Yash Datta <Yash.Datta@guavus.com>
Closes#9858 from saucam/eliminatedistribute.
This fix masks JDBC credentials in the explain output. URL patterns to specify credential seems to be vary between different databases. Added a new method to dialect to mask the credentials according to the database specific URL pattern.
While adding tests I noticed explain output includes array variable for partitions ([Lorg.apache.spark.Partition;3ff74546,). Modified the code to include the first, and last partition information.
Author: sureshthalamati <suresh.thalamati@gmail.com>
Closes#10452 from sureshthalamati/mask_jdbc_credentials_spark-12504.
As noted in the code, this change is to make this component easier to test in isolation.
Author: Nong <nongli@gmail.com>
Closes#10581 from nongli/spark-12636.
just write the arguments into unsafe row and use murmur3 to calculate hash code
Author: Wenchen Fan <wenchen@databricks.com>
Closes#10435 from cloud-fan/hash-expr.
The reader was previously not setting the row length meaning it was wrong if there were variable
length columns. This problem does not manifest usually, since the value in the column is correct and
projecting the row fixes the issue.
Author: Nong Li <nong@databricks.com>
Closes#10576 from nongli/spark-12589.
This PR enable cube/rollup as function, so they can be used as this:
```
select a, b, sum(c) from t group by rollup(a, b)
```
Author: Davies Liu <davies@databricks.com>
Closes#10522 from davies/rollup.
We can provides the option to choose JSON parser can be enabled to accept quoting of all character or not.
Author: Cazen <Cazen@korea.com>
Author: Cazen Lee <cazen.lee@samsung.com>
Author: Cazen Lee <Cazen@korea.com>
Author: cazen.lee <cazen.lee@samsung.com>
Closes#10497 from Cazen/master.
Avoiding the the No such table exception and throwing analysis exception as per the bug: SPARK-12533
Author: thomastechs <thomas.sebastian@tcs.com>
Closes#10529 from thomastechs/topic-branch.
This PR is followed by https://github.com/apache/spark/pull/8391.
Previous PR fixes JDBCRDD to support null-safe equality comparison for JDBC datasource. This PR fixes the problem that it can actually return null as a result of the comparison resulting error as using the value of that comparison.
Author: hyukjinkwon <gurwls223@gmail.com>
Author: HyukjinKwon <gurwls223@gmail.com>
Closes#8743 from HyukjinKwon/SPARK-10180.
It's confusing that some operator output UnsafeRow but some not, easy to make mistake.
This PR change to only output UnsafeRow for all the operators (SparkPlan), removed the rule to insert Unsafe/Safe conversions. For those that can't output UnsafeRow directly, added UnsafeProjection into them.
Closes#10330
cc JoshRosen rxin
Author: Davies Liu <davies@databricks.com>
Closes#10511 from davies/unsafe_row.
This patch refactors the filter pushdown for JDBCRDD and also adds few filters.
Added filters are basically from #10468 with some refactoring. Test cases are from #10468.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#10470 from viirya/refactor-jdbc-filter.
Right now, numFields will be passed in by pointTo(), then bitSetWidthInBytes is calculated, making pointTo() a little bit heavy.
It should be part of constructor of UnsafeRow.
Author: Davies Liu <davies@databricks.com>
Closes#10528 from davies/numFields.
This is rework from #10386 and add more tests and LIKE push-down support.
Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>
Closes#10468 from maropu/SupportMorePushdownInJdbc.
```
org.apache.spark.sql.AnalysisException: cannot resolve 'value' given input columns text;
```
lets put a `:` after `columns` and put the columns in `[]` so that they match the toString of DataFrame.
Author: gatorsmile <gatorsmile@gmail.com>
Closes#10518 from gatorsmile/improveAnalysisExceptionMsg.
If DataFrame has BYTE types, throws an exception:
org.postgresql.util.PSQLException: ERROR: type "byte" does not exist
Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>
Closes#9350 from maropu/FixBugInPostgreJdbc.
When explain any plan with Generate, we will see an exclamation mark in the plan. Normally, when we see this mark, it means the plan has an error. This PR is to correct the `missingInput` in `Generate`.
For example,
```scala
val df = Seq((1, "a b c"), (2, "a b"), (3, "a")).toDF("number", "letters")
val df2 =
df.explode('letters) {
case Row(letters: String) => letters.split(" ").map(Tuple1(_)).toSeq
}
df2.explain(true)
```
Before the fix, the plan is like
```
== Parsed Logical Plan ==
'Generate UserDefinedGenerator('letters), true, false, None
+- Project [_1#0 AS number#2,_2#1 AS letters#3]
+- LocalRelation [_1#0,_2#1], [[1,a b c],[2,a b],[3,a]]
== Analyzed Logical Plan ==
number: int, letters: string, _1: string
Generate UserDefinedGenerator(letters#3), true, false, None, [_1#8]
+- Project [_1#0 AS number#2,_2#1 AS letters#3]
+- LocalRelation [_1#0,_2#1], [[1,a b c],[2,a b],[3,a]]
== Optimized Logical Plan ==
Generate UserDefinedGenerator(letters#3), true, false, None, [_1#8]
+- LocalRelation [number#2,letters#3], [[1,a b c],[2,a b],[3,a]]
== Physical Plan ==
!Generate UserDefinedGenerator(letters#3), true, false, [number#2,letters#3,_1#8]
+- LocalTableScan [number#2,letters#3], [[1,a b c],[2,a b],[3,a]]
```
**Updates**: The same issues are also found in the other four Dataset operators: `MapPartitions`/`AppendColumns`/`MapGroups`/`CoGroup`. Fixed all these four.
Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>
Closes#10393 from gatorsmile/generateExplain.
Hello Michael & All:
We have some issues to submit the new codes in the other PR(#10299), so we closed that PR and open this one with the fix.
The reason for the previous failure is that the projection for the scan when there is a filter that is not pushed down (the "left-over" filter) could be different, in elements or ordering, from the original projection.
With this new codes, the approach to solve this problem is:
Insert a new Project if the "left-over" filter is nonempty and (the original projection is not empty and the projection for the scan has more than one elements which could otherwise cause different ordering in projection).
We create 3 test cases to cover the otherwise failure cases.
Author: Kevin Yu <qyu@us.ibm.com>
Closes#10388 from kevinyu98/spark-12231.
This PR is a follow-up of PR #10362.
Two major changes:
1. The fix introduced in #10362 is OK for Parquet, but may disable ORC PPD in many cases
PR #10362 stops converting an `AND` predicate if any branch is inconvertible. On the other hand, `OrcFilters` combines all filters into a single big conjunction first and then tries to convert it into ORC `SearchArgument`. This means, if any filter is inconvertible, no filters can be pushed down. This PR fixes this issue by finding out all convertible filters first before doing the actual conversion.
The reason behind the current implementation is mostly due to the limitation of ORC `SearchArgument` builder, which is documented in this PR in detail.
1. Copied the `AND` predicate fix for ORC from #10362 to avoid merge conflict.
Same as #10362, this PR targets master (2.0.0-SNAPSHOT), branch-1.6, and branch-1.5.
Author: Cheng Lian <lian@databricks.com>
Closes#10377 from liancheng/spark-12218.fix-orc-conjunction-ppd.
Accessing null elements in an array field fails when tungsten is enabled.
It works in Spark 1.3.1, and in Spark > 1.5 with Tungsten disabled.
This PR solves this by checking if the accessed element in the array field is null, in the generated code.
Example:
```
// Array of String
case class AS( as: Seq[String] )
val dfAS = sc.parallelize( Seq( AS ( Seq("a",null,"b") ) ) ).toDF
dfAS.registerTempTable("T_AS")
for (i <- 0 to 2) { println(i + " = " + sqlContext.sql(s"select as[$i] from T_AS").collect.mkString(","))}
```
With Tungsten disabled:
```
0 = [a]
1 = [null]
2 = [b]
```
With Tungsten enabled:
```
0 = [a]
15/12/22 09:32:50 ERROR Executor: Exception in task 7.0 in stage 1.0 (TID 15)
java.lang.NullPointerException
at org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters$UTF8StringWriter.getSize(UnsafeRowWriters.java:90)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.execution.TungstenProject$$anonfun$3$$anonfun$apply$3.apply(basicOperators.scala:90)
at org.apache.spark.sql.execution.TungstenProject$$anonfun$3$$anonfun$apply$3.apply(basicOperators.scala:88)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
```
Author: pierre-borckmans <pierre.borckmans@realimpactanalytics.com>
Closes#10429 from pierre-borckmans/SPARK-12477_Tungsten-Projection-Null-Element-In-Array.
When the filter is ```"b in ('1', '2')"```, the filter is not pushed down to Parquet. Thanks!
Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>
Closes#10278 from gatorsmile/parquetFilterNot.
When creating extractors for product types (i.e. case classes and tuples), a null check is missing, thus we always assume input product values are non-null.
This PR adds a null check in the extractor expression for product types. The null check is stripped off for top level product fields, which are mapped to the outermost `Row`s, since they can't be null.
Thanks cloud-fan for helping investigating this issue!
Author: Cheng Lian <lian@databricks.com>
Closes#10431 from liancheng/spark-12478.top-level-null-field.
This PR adds a new expression `AssertNotNull` to ensure non-nullable fields of products and case classes don't receive null values at runtime.
Author: Cheng Lian <lian@databricks.com>
Closes#10331 from liancheng/dataset-nullability-check.
According the benchmark [1], LZ4-java could be 80% (or 30%) faster than Snappy.
After changing the compressor to LZ4, I saw 20% improvement on end-to-end time for a TPCDS query (Q4).
[1] https://github.com/ning/jvm-compressor-benchmark/wiki
cc rxin
Author: Davies Liu <davies@databricks.com>
Closes#10342 from davies/lz4.
Based on the suggestions from marmbrus , added logical/physical operators for Range for improving the performance.
Also added another API for resolving the JIRA Spark-12150.
Could you take a look at my implementation, marmbrus ? If not good, I can rework it. : )
Thank you very much!
Author: gatorsmile <gatorsmile@gmail.com>
Closes#10335 from gatorsmile/rangeOperators.
When a DataFrame or Dataset has a long schema, we should intelligently truncate to avoid flooding the screen with unreadable information.
// Standard output
[a: int, b: int]
// Truncate many top level fields
[a: int, b, string ... 10 more fields]
// Truncate long inner structs
[a: struct<a: Int ... 10 more fields>]
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes#10373 from dilipbiswal/spark-12398.
Now `StaticInvoke` receives `Any` as a object and `StaticInvoke` can be serialized but sometimes the object passed is not serializable.
For example, following code raises Exception because `RowEncoder#extractorsFor` invoked indirectly makes `StaticInvoke`.
```
case class TimestampContainer(timestamp: java.sql.Timestamp)
val rdd = sc.parallelize(1 to 2).map(_ => TimestampContainer(System.currentTimeMillis))
val df = rdd.toDF
val ds = df.as[TimestampContainer]
val rdd2 = ds.rdd <----------------- invokes extractorsFor indirectory
```
I'll add test cases.
Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>
Author: Michael Armbrust <michael@databricks.com>
Closes#10357 from sarutak/SPARK-12404.
JIRA: https://issues.apache.org/jira/browse/SPARK-12218
When creating filters for Parquet/ORC, we should not push nested AND expressions partially.
Author: Yin Huai <yhuai@databricks.com>
Closes#10362 from yhuai/SPARK-12218.
Description of the problem from cloud-fan
Actually this line: https://github.com/apache/spark/blob/branch-1.5/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala#L689
When we use `selectExpr`, we pass in `UnresolvedFunction` to `DataFrame.select` and fall in the last case. A workaround is to do special handling for UDTF like we did for `explode`(and `json_tuple` in 1.6), wrap it with `MultiAlias`.
Another workaround is using `expr`, for example, `df.select(expr("explode(a)").as(Nil))`, I think `selectExpr` is no longer needed after we have the `expr` function....
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes#9981 from dilipbiswal/spark-11619.
Hide the error logs for 'SQLListenerMemoryLeakSuite' to avoid noises. Most of changes are space changes.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#10363 from zsxwing/hide-log.
This PR removes Hive windows functions from Spark and replaces them with (native) Spark ones. The PR is on par with Hive in terms of features.
This has the following advantages:
* Better memory management.
* The ability to use spark UDAFs in Window functions.
cc rxin / yhuai
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#9819 from hvanhovell/SPARK-8641-2.
For API DataFrame.join(right, usingColumns, joinType), if the joinType is right_outer or full_outer, the resulting join columns could be wrong (will be null).
The order of columns had been changed to match that with MySQL and PostgreSQL [1].
This PR also fix the nullability of output for outer join.
[1] http://www.postgresql.org/docs/9.2/static/queries-table-expressions.html
Author: Davies Liu <davies@databricks.com>
Closes#10353 from davies/fix_join.
This PR makes JSON parser and schema inference handle more cases where we have unparsed records. It is based on #10043. The last commit fixes the failed test and updates the logic of schema inference.
Regarding the schema inference change, if we have something like
```
{"f1":1}
[1,2,3]
```
originally, we will get a DF without any column.
After this change, we will get a DF with columns `f1` and `_corrupt_record`. Basically, for the second row, `[1,2,3]` will be the value of `_corrupt_record`.
When merge this PR, please make sure that the author is simplyianm.
JIRA: https://issues.apache.org/jira/browse/SPARK-12057Closes#10043
Author: Ian Macalinao <me@ian.pw>
Author: Yin Huai <yhuai@databricks.com>
Closes#10288 from yhuai/handleCorruptJson.
Based on the suggestions from marmbrus cloud-fan in https://github.com/apache/spark/pull/10165 , this PR is to print the decoded values(user objects) in `Dataset.show`
```scala
implicit val kryoEncoder = Encoders.kryo[KryoClassData]
val ds = Seq(KryoClassData("a", 1), KryoClassData("b", 2), KryoClassData("c", 3)).toDS()
ds.show(20, false);
```
The current output is like
```
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[1, 0, 111, 114, 103, 46, 97, 112, 97, 99, 104, 101, 46, 115, 112, 97, 114, 107, 46, 115, 113, 108, 46, 75, 114, 121, 111, 67, 108, 97, 115, 115, 68, 97, 116, -31, 1, 1, -126, 97, 2]|
|[1, 0, 111, 114, 103, 46, 97, 112, 97, 99, 104, 101, 46, 115, 112, 97, 114, 107, 46, 115, 113, 108, 46, 75, 114, 121, 111, 67, 108, 97, 115, 115, 68, 97, 116, -31, 1, 1, -126, 98, 4]|
|[1, 0, 111, 114, 103, 46, 97, 112, 97, 99, 104, 101, 46, 115, 112, 97, 114, 107, 46, 115, 113, 108, 46, 75, 114, 121, 111, 67, 108, 97, 115, 115, 68, 97, 116, -31, 1, 1, -126, 99, 6]|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
```
After the fix, it will be like the below if and only if the users override the `toString` function in the class `KryoClassData`
```scala
override def toString: String = s"KryoClassData($a, $b)"
```
```
+-------------------+
|value |
+-------------------+
|KryoClassData(a, 1)|
|KryoClassData(b, 2)|
|KryoClassData(c, 3)|
+-------------------+
```
If users do not override the `toString` function, the results will be like
```
+---------------------------------------+
|value |
+---------------------------------------+
|org.apache.spark.sql.KryoClassData68ef|
|org.apache.spark.sql.KryoClassData6915|
|org.apache.spark.sql.KryoClassData693b|
+---------------------------------------+
```
Question: Should we add another optional parameter in the function `show`? It will decide if the function `show` will display the hex values or the object values?
Author: gatorsmile <gatorsmile@gmail.com>
Closes#10215 from gatorsmile/showDecodedValue.
https://issues.apache.org/jira/browse/SPARK-12249
Currently `!=` operator is not pushed down correctly.
I simply added a case for this.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#10233 from HyukjinKwon/SPARK-12249.
https://issues.apache.org/jira/browse/SPARK-12236
Currently JDBC filters are not tested properly. All the tests pass even if the filters are not pushed down due to Spark-side filtering.
In this PR,
Firstly, I corrected the tests to properly check the pushed down filters by removing Spark-side filtering.
Also, `!=` was being tested which is actually not pushed down. So I removed them.
Lastly, I moved the `stripSparkFilter()` function to `SQLTestUtils` as this functions would be shared for all tests for pushed down filters. This function would be also shared with ORC datasource as the filters for that are also not being tested properly.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#10221 from HyukjinKwon/SPARK-12236.
Support UnsafeRow for the Coalesce/Except/Intersect.
Could you review if my code changes are ok? davies Thank you!
Author: gatorsmile <gatorsmile@gmail.com>
Closes#10285 from gatorsmile/unsafeSupportCIE.
When SparkStrategies.BasicOperators's "case BroadcastHint(child) => apply(child)" is hit, it only recursively invokes BasicOperators.apply with this "child". It makes many strategies have no change to process this plan, which probably leads to "No plan" issue, so we use planLater to go through all strategies.
https://issues.apache.org/jira/browse/SPARK-12275
Author: yucai <yucai.yu@intel.com>
Closes#10265 from yucai/broadcast_hint.
Modifies the String overload to call the Column overload and ensures this is called in a test.
Author: Ankur Dave <ankurdave@gmail.com>
Closes#10271 from ankurdave/SPARK-12298.
This patch adds documentation for Spark configurations that affect off-heap memory and makes some naming and validation improvements for those configs.
- Change `spark.memory.offHeapSize` to `spark.memory.offHeap.size`. This is fine because this configuration has not shipped in any Spark release yet (it's new in Spark 1.6).
- Deprecated `spark.unsafe.offHeap` in favor of a new `spark.memory.offHeap.enabled` configuration. The motivation behind this change is to gather all memory-related configurations under the same prefix.
- Add a check which prevents users from setting `spark.memory.offHeap.enabled=true` when `spark.memory.offHeap.size == 0`. After SPARK-11389 (#9344), which was committed in Spark 1.6, Spark enforces a hard limit on the amount of off-heap memory that it will allocate to tasks. As a result, enabling off-heap execution memory without setting `spark.memory.offHeap.size` will lead to immediate OOMs. The new configuration validation makes this scenario easier to diagnose, helping to avoid user confusion.
- Document these configurations on the configuration page.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#10237 from JoshRosen/SPARK-12251.
This PR adds a `private[sql]` method `metadata` to `SparkPlan`, which can be used to describe detail information about a physical plan during visualization. Specifically, this PR uses this method to provide details of `PhysicalRDD`s translated from a data source relation. For example, a `ParquetRelation` converted from Hive metastore table `default.psrc` is now shown as the following screenshot:
![image](https://cloud.githubusercontent.com/assets/230655/11526657/e10cb7e6-9916-11e5-9afa-f108932ec890.png)
And here is the screenshot for a regular `ParquetRelation` (not converted from Hive metastore table) loaded from a really long path:
![output](https://cloud.githubusercontent.com/assets/230655/11680582/37c66460-9e94-11e5-8f50-842db5309d5a.png)
Author: Cheng Lian <lian@databricks.com>
Closes#10004 from liancheng/spark-12012.physical-rdd-metadata.
Currently Parquet predicate tests all pass even if filters are not pushed down or this is disabled.
In this PR, For checking evaluating filters, Simply it makes the expression from `expression.Filter` and then try to create filters just like Spark does.
For checking the results, this manually accesses to the child rdd (of `expression.Filter`) and produces the results which should be filtered properly, and then compares it to expected values.
Now, if filters are not pushed down or this is disabled, this throws exceptions.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#9659 from HyukjinKwon/SPARK-11676.
Delays application of ResolvePivot until all aggregates are resolved to prevent problems with UnresolvedFunction and adds unit test
Author: Andrew Ray <ray.andrew@gmail.com>
Closes#10202 from aray/sql-pivot-unresolved-function.
This PR is to add three more data types into Encoder, including `BigDecimal`, `Date` and `Timestamp`.
marmbrus cloud-fan rxin Could you take a quick look at these three types? Not sure if it can be merged to 1.6. Thank you very much!
Author: gatorsmile <gatorsmile@gmail.com>
Closes#10188 from gatorsmile/dataTypesinEncoder.
See the thread Ben started:
http://search-hadoop.com/m/q3RTtveEuhjsr7g/
This PR adds drop() method to DataFrame which accepts multiple column names
Author: tedyu <yuzhihong@gmail.com>
Closes#9862 from ted-yu/master.
We should upgrade to SBT 0.13.9, since this is a requirement in order to use SBT's new Maven-style resolution features (which will be done in a separate patch, because it's blocked by some binary compatibility issues in the POM reader plugin).
I also upgraded Scalastyle to version 0.8.0, which was necessary in order to fix a Scala 2.10.5 compatibility issue (see https://github.com/scalastyle/scalastyle/issues/156). The newer Scalastyle is slightly stricter about whitespace surrounding tokens, so I fixed the new style violations.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#10112 from JoshRosen/upgrade-to-sbt-0.13.9.
Resubmit #9297 and #9991
On the live web UI, there is a SQL tab which provides valuable information for the SQL query. But once the workload is finished, we won't see the SQL tab on the history server. It will be helpful if we support SQL UI on the history server so we can analyze it even after its execution.
To support SQL UI on the history server:
1. I added an onOtherEvent method to the SparkListener trait and post all SQL related events to the same event bus.
2. Two SQL events SparkListenerSQLExecutionStart and SparkListenerSQLExecutionEnd are defined in the sql module.
3. The new SQL events are written to event log using Jackson.
4. A new trait SparkHistoryListenerFactory is added to allow the history server to feed events to the SQL history listener. The SQL implementation is loaded at runtime using java.util.ServiceLoader.
Author: Carson Wang <carson.wang@intel.com>
Closes#10061 from carsonwang/SqlHistoryUI.
Use try to match the behavior for single distinct aggregation with Spark 1.5, but that's not scalable, we should be robust by default, have a flag to address performance regression for low cardinality aggregation.
cc yhuai nongli
Author: Davies Liu <davies@databricks.com>
Closes#10075 from davies/agg_15.
When query the Timestamp or Date column like the following
val filtered = jdbcdf.where($"TIMESTAMP_COLUMN" >= beg && $"TIMESTAMP_COLUMN" < end)
The generated SQL query is "TIMESTAMP_COLUMN >= 2015-01-01 00:00:00.0"
It should have quote around the Timestamp/Date value such as "TIMESTAMP_COLUMN >= '2015-01-01 00:00:00.0'"
Author: Huaxin Gao <huaxing@oc0558782468.ibm.com>
Closes#9872 from huaxingao/spark-11788.
Persist and Unpersist exist in both RDD and Dataframe APIs. I think they are still very critical in Dataset APIs. Not sure if my understanding is correct? If so, could you help me check if the implementation is acceptable?
Please provide your opinions. marmbrus rxin cloud-fan
Thank you very much!
Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>
Closes#9889 from gatorsmile/persistDS.
create java version of `constructorFor` and `extractorFor` in `JavaTypeInference`
Author: Wenchen Fan <wenchen@databricks.com>
This patch had conflicts when merged, resolved by
Committer: Michael Armbrust <michael@databricks.com>
Closes#9937 from cloud-fan/pojo.
When we build the `fromRowExpression` for an encoder, we set up a lot of "unresolved" stuff and lost the required data type, which may lead to runtime error if the real type doesn't match the encoder's schema.
For example, we build an encoder for `case class Data(a: Int, b: String)` and the real type is `[a: int, b: long]`, then we will hit runtime error and say that we can't construct class `Data` with int and long, because we lost the information that `b` should be a string.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9840 from cloud-fan/err-msg.
The reason is that, for a single culumn `RowEncoder`(or a single field product encoder), when we use it as the encoder for grouping key, we should also combine the grouping attributes, although there is only one grouping attribute.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#10059 from cloud-fan/bug.
JIRA: https://issues.apache.org/jira/browse/SPARK-11949
The result of cube plan uses incorrect schema. The schema of cube result should set nullable property to true because the grouping expressions will have null values.
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#10038 from viirya/fix-cube.
This reverts commit cc243a079b / PR #9297
I'm reverting this because it broke SQLListenerMemoryLeakSuite in the master Maven builds.
See #9991 for a discussion of why this broke the tests.
This PR improve the performance of CartesianProduct by caching the result of right plan.
After this patch, the query time of TPC-DS Q65 go down to 4 seconds from 28 minutes (420X faster).
cc nongli
Author: Davies Liu <davies@databricks.com>
Closes#9969 from davies/improve_cartesian.
In 1.6, we introduce a public API to have a SQLContext for current thread, SparkPlan should use that.
Author: Davies Liu <davies@databricks.com>
Closes#9990 from davies/leak_context.
When calling `get_json_object` for the following two cases, both results are `"null"`:
```scala
val tuple: Seq[(String, String)] = ("5", """{"f1": null}""") :: Nil
val df: DataFrame = tuple.toDF("key", "jstring")
val res = df.select(functions.get_json_object($"jstring", "$.f1")).collect()
```
```scala
val tuple2: Seq[(String, String)] = ("5", """{"f1": "null"}""") :: Nil
val df2: DataFrame = tuple2.toDF("key", "jstring")
val res3 = df2.select(functions.get_json_object($"jstring", "$.f1")).collect()
```
Fixed the problem and also added a test case.
Author: gatorsmile <gatorsmile@gmail.com>
Closes#10018 from gatorsmile/get_json_object.
Check for partition column null-ability while building the partition spec.
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes#10001 from dilipbiswal/spark-11997.
On the live web UI, there is a SQL tab which provides valuable information for the SQL query. But once the workload is finished, we won't see the SQL tab on the history server. It will be helpful if we support SQL UI on the history server so we can analyze it even after its execution.
To support SQL UI on the history server:
1. I added an `onOtherEvent` method to the `SparkListener` trait and post all SQL related events to the same event bus.
2. Two SQL events `SparkListenerSQLExecutionStart` and `SparkListenerSQLExecutionEnd` are defined in the sql module.
3. The new SQL events are written to event log using Jackson.
4. A new trait `SparkHistoryListenerFactory` is added to allow the history server to feed events to the SQL history listener. The SQL implementation is loaded at runtime using `java.util.ServiceLoader`.
Author: Carson Wang <carson.wang@intel.com>
Closes#9297 from carsonwang/SqlHistoryUI.
Except inner join, maybe the other join types are also useful when users are using the joinWith function. Thus, added the joinType into the existing joinWith call in Dataset APIs.
Also providing another joinWith interface for the cartesian-join-like functionality.
Please provide your opinions. marmbrus rxin cloud-fan Thank you!
Author: gatorsmile <gatorsmile@gmail.com>
Closes#9921 from gatorsmile/joinWith.
This patch makes it consistent to use varargs in all DataFrameReader methods, including Parquet, JSON, text, and the generic load function.
Also added a few more API tests for the Java API.
Author: Reynold Xin <rxin@databricks.com>
Closes#9945 from rxin/SPARK-11967.
This PR is to provide two common `coalesce` and `repartition` in Dataset APIs.
After reading the comments of SPARK-9999, I am unclear about the plan for supporting re-partitioning in Dataset APIs. Currently, both RDD APIs and Dataframe APIs provide users such a flexibility to control the number of partitions.
In most traditional RDBMS, they expose the number of partitions, the partitioning columns, the table partitioning methods to DBAs for performance tuning and storage planning. Normally, these parameters could largely affect the query performance. Since the actual performance depends on the workload types, I think it is almost impossible to automate the discovery of the best partitioning strategy for all the scenarios.
I am wondering if Dataset APIs are planning to hide these APIs from users? Feel free to reject my PR if it does not match the plan.
Thank you for your answers. marmbrus rxin cloud-fan
Author: gatorsmile <gatorsmile@gmail.com>
Closes#9899 from gatorsmile/coalesce.
Currently pivot's signature looks like
```scala
scala.annotation.varargs
def pivot(pivotColumn: Column, values: Column*): GroupedData
scala.annotation.varargs
def pivot(pivotColumn: String, values: Any*): GroupedData
```
I think we can remove the one that takes "Column" types, since callers should always be passing in literals. It'd also be more clear if the values are not varargs, but rather Seq or java.util.List.
I also made similar changes for Python.
Author: Reynold Xin <rxin@databricks.com>
Closes#9929 from rxin/SPARK-11946.
we should pass in resolved encodera to logical `CoGroup` and bind them in physical `CoGroup`
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9928 from cloud-fan/cogroup.
Based on feedback from Matei, this is more consistent with mapPartitions in Spark.
Also addresses some of the cleanups from a previous commit that renames the type variables.
Author: Reynold Xin <rxin@databricks.com>
Closes#9919 from rxin/SPARK-11933.
We should use `InternalRow.isNullAt` to check if the field is null before calling `InternalRow.getXXX`
Thanks gatorsmile who discovered this bug.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9904 from cloud-fan/null.
Can someone review my code to make sure I'm not missing anything? Thanks!
Author: Xiu Guo <xguo27@gmail.com>
Author: Xiu Guo <guoxi@us.ibm.com>
Closes#9612 from xguo27/SPARK-11628.
1. Renamed map to mapGroup, flatMap to flatMapGroup.
2. Renamed asKey -> keyAs.
3. Added more documentation.
4. Changed type parameter T to V on GroupedDataset.
5. Added since versions for all functions.
Author: Reynold Xin <rxin@databricks.com>
Closes#9880 from rxin/SPARK-11899.
In this PR I delete a method that breaks type inference for aggregators (only in the REPL)
The error when this method is present is:
```
<console>:38: error: missing parameter type for expanded function ((x$2) => x$2._2)
ds.groupBy(_._1).agg(sum(_._2), sum(_._3)).collect()
```
Author: Michael Armbrust <michael@databricks.com>
Closes#9870 from marmbrus/dataset-repl-agg.
This mainly moves SqlNewHadoopRDD to the sql package. There is some state that is
shared between core and I've left that in core. This allows some other associated
minor cleanup.
Author: Nong Li <nong@databricks.com>
Closes#9845 from nongli/spark-11787.
Hive has since changed this behavior as well. https://issues.apache.org/jira/browse/HIVE-3454
Author: Nong Li <nong@databricks.com>
Author: Nong Li <nongli@gmail.com>
Author: Yin Huai <yhuai@databricks.com>
Closes#9685 from nongli/spark-11724.
Apply the user supplied pathfilter while retrieving the files from fs.
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes#9830 from dilipbiswal/spark-11544.
Fixes bug with grouping sets (including cube/rollup) where aggregates that included grouping expressions would return the wrong (null) result.
Also simplifies the analyzer rule a bit and leaves column pruning to the optimizer.
Added multiple unit tests to DataFrameAggregateSuite and verified it passes hive compatibility suite:
```
build/sbt -Phive -Dspark.hive.whitelist='groupby.*_grouping.*' 'test-only org.apache.spark.sql.hive.execution.HiveCompatibilitySuite'
```
This is an alternative to pr https://github.com/apache/spark/pull/9419 but I think its better as it simplifies the analyzer rule instead of adding another special case to it.
Author: Andrew Ray <ray.andrew@gmail.com>
Closes#9815 from aray/groupingset-agg-fix.
In addition, tightened visibility of a lot of classes in the columnar package from private[sql] to private[columnar].
Author: Reynold Xin <rxin@databricks.com>
Closes#9842 from rxin/SPARK-11858.
When handling self joins, the implementation did not consider the case insensitivity of HiveContext. It could cause an exception as shown in the JIRA:
```
TreeNodeException: Failed to copy node.
```
The fix is low risk. It avoids unnecessary attribute replacement. It should not affect the existing behavior of self joins. Also added the test case to cover this case.
Author: gatorsmile <gatorsmile@gmail.com>
Closes#9762 from gatorsmile/joinMakeCopy.
This patch adds an alternate to the Parquet RecordReader from the parquet-mr project
that is much faster for flat schemas. Instead of using the general converter mechanism
from parquet-mr, this directly uses the lower level APIs from parquet-columnar and a
customer RecordReader that directly assembles into UnsafeRows.
This is optionally disabled and only used for supported schemas.
Using the tpcds store sales table and doing a sum of increasingly more columns, the results
are:
For 1 Column:
Before: 11.3M rows/second
After: 18.2M rows/second
For 2 Columns:
Before: 7.2M rows/second
After: 11.2M rows/second
For 5 Columns:
Before: 2.9M rows/second
After: 4.5M rows/second
Author: Nong Li <nong@databricks.com>
Closes#9774 from nongli/parquet.
Also added some nicer error messages for incompatible types (private types and primitive types) for Kryo/Java encoder.
Author: Reynold Xin <rxin@databricks.com>
Closes#9823 from rxin/SPARK-11833.
This patch refactors the existing Kryo encoder expressions and adds support for Java serialization.
Author: Reynold Xin <rxin@databricks.com>
Closes#9802 from rxin/SPARK-11810.
Apply the user supplied pathfilter while retrieving the files from fs.
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes#9652 from dilipbiswal/spark-11544.
return Double.NaN for mean/average when count == 0 for all numeric types that is converted to Double, Decimal type continue to return null.
Author: JihongMa <linlin200605@gmail.com>
Closes#9705 from JihongMA/SPARK-11720.
Currently, if the first SQLContext is not removed after stopping SparkContext, a SQLContext could set there forever. This patch make this more robust.
Author: Davies Liu <davies@databricks.com>
Closes#9706 from davies/clear_context.
we use `ExpressionEncoder.tuple` to build the result encoder, which assumes the input encoder should point to a struct type field if it’s non-flat.
However, our keyEncoder always point to a flat field/fields: `groupingAttributes`, we should combine them into a single `NamedExpression`.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9792 from cloud-fan/agg.
If user use primitive parameters in UDF, there is no way for him to do the null-check for primitive inputs, so we are assuming the primitive input is null-propagatable for this case and return null if the input is null.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9770 from cloud-fan/udf.
When we resolve the join operator, we may change the output of right side if self-join is detected. So in `Dataset.joinWith`, we should resolve the join operator first, and then get the left output and right output from it, instead of using `left.output` and `right.output` directly.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9806 from cloud-fan/self-join.
I also found a bug with self-joins returning incorrect results in the Dataset API. Two test cases attached and filed SPARK-11803.
Author: Reynold Xin <rxin@databricks.com>
Closes#9789 from rxin/SPARK-11802.
I also wrote a test case -- but unfortunately the test case is not working due to SPARK-11795.
Author: Reynold Xin <rxin@databricks.com>
Closes#9784 from rxin/SPARK-11503.
In the previous method, fields.toArray will cast java.util.List[StructField] into Array[Object] which can not cast into Array[StructField], thus when invoking this method will throw "java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Lorg.apache.spark.sql.types.StructField;"
I directly cast java.util.List[StructField] into Array[StructField] in this patch.
Author: mayuanwen <mayuanwen@qiyi.com>
Closes#9649 from jackieMaKing/Spark-11679.
During executing PromoteStrings rule, if one side of binaryComparison is StringType and the other side is not StringType, the current code will promote(cast) the StringType to DoubleType, and if the StringType doesn't contain the numbers, it will get null value. So if it is doing <=> (NULL-safe equal) with Null, it will not filter anything, caused the problem reported by this jira.
I proposal to the changes through this PR, can you review my code changes ?
This problem only happen for <=>, other operators works fine.
scala> val filteredDF = df.filter(df("column") > (new Column(Literal(null))))
filteredDF: org.apache.spark.sql.DataFrame = [column: string]
scala> filteredDF.show
+------+
|column|
+------+
+------+
scala> val filteredDF = df.filter(df("column") === (new Column(Literal(null))))
filteredDF: org.apache.spark.sql.DataFrame = [column: string]
scala> filteredDF.show
+------+
|column|
+------+
+------+
scala> df.registerTempTable("DF")
scala> sqlContext.sql("select * from DF where 'column' = NULL")
res27: org.apache.spark.sql.DataFrame = [column: string]
scala> res27.show
+------+
|column|
+------+
+------+
Author: Kevin Yu <qyu@us.ibm.com>
Closes#9720 from kevinyu98/working_on_spark-11447.
This patch adds an alias for current_timestamp (now function).
Also fixes SPARK-9196 to re-enable the test case for current_timestamp.
Author: Reynold Xin <rxin@databricks.com>
Closes#9753 from rxin/SPARK-11768.
Invocation of getters for type extending AnyVal returns default value (if field value is null) instead of throwing NPE. Please check comments for SPARK-11553 issue for more details.
Author: Bartlomiej Alberski <bartlomiej.alberski@allegrogroup.com>
Closes#9642 from alberskib/bugfix/SPARK-11553.
Parquet supports some JSON and BSON datatypes. They are represented as binary for BSON and string (UTF-8) for JSON internally.
I searched a bit and found Apache drill also supports both in this way, [link](https://drill.apache.org/docs/parquet-format/).
Author: hyukjinkwon <gurwls223@gmail.com>
Author: Hyukjin Kwon <gurwls223@gmail.com>
Closes#9658 from HyukjinKwon/SPARK-11692.
https://issues.apache.org/jira/browse/SPARK-11044
Spark writes a parquet file only with writer version1 ignoring the writer version given by user.
So, in this PR, it keeps the writer version if given or sets version1 as default.
Author: hyukjinkwon <gurwls223@gmail.com>
Author: HyukjinKwon <gurwls223@gmail.com>
Closes#9060 from HyukjinKwon/SPARK-11044.
This patch adds the following options to the JSON data source, for dealing with non-standard JSON files:
* `allowComments` (default `false`): ignores Java/C++ style comment in JSON records
* `allowUnquotedFieldNames` (default `false`): allows unquoted JSON field names
* `allowSingleQuotes` (default `true`): allows single quotes in addition to double quotes
* `allowNumericLeadingZeros` (default `false`): allows leading zeros in numbers (e.g. 00012)
To avoid passing a lot of options throughout the json package, I introduced a new JSONOptions case class to define all JSON config options.
Also updated documentation to explain these options.
Scala
![screen shot 2015-11-15 at 6 12 12 pm](https://cloud.githubusercontent.com/assets/323388/11172965/e3ace6ec-8bc4-11e5-805e-2d78f80d0ed6.png)
Python
![screen shot 2015-11-15 at 6 11 28 pm](https://cloud.githubusercontent.com/assets/323388/11172964/e23ed6ee-8bc4-11e5-8216-312f5983acd5.png)
Author: Reynold Xin <rxin@databricks.com>
Closes#9724 from rxin/SPARK-11745.
I didn't remove the old Sort operator, since we still use it in randomized tests. I moved it into test module and renamed it ReferenceSort.
Author: Reynold Xin <rxin@databricks.com>
Closes#9700 from rxin/SPARK-11734.
All the physical types are properly tested at `ParquetIOSuite` but logical type mapping is not being tested.
Author: hyukjinkwon <gurwls223@gmail.com>
Author: Hyukjin Kwon <gurwls223@gmail.com>
Closes#9660 from HyukjinKwon/SPARK-11694.
* rename `AppendColumn` to `AppendColumns` to be consistent with the physical plan name.
* clean up stale comments.
* always pass in resolved encoder to `TypedColumn.withInputType`(test added)
* enable a mistakenly disabled java test.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9688 from cloud-fan/follow.
https://issues.apache.org/jira/browse/SPARK-11678
The change of this PR is to pass root paths of table to the partition discovery logic. So, the process of partition discovery stops at those root paths instead of going all the way to the root path of the file system.
Author: Yin Huai <yhuai@databricks.com>
Closes#9651 from yhuai/SPARK-11678.
This PR adds a new method, `reduce`, to `GroupedDataset`, which allows similar operations to `reduceByKey` on a traditional `PairRDD`.
```scala
val ds = Seq("abc", "xyz", "hello").toDS()
ds.groupBy(_.length).reduce(_ + _).collect() // not actually commutative :P
res0: Array(3 -> "abcxyz", 5 -> "hello")
```
While implementing this method and its test cases several more deficiencies were found in our encoder handling. Specifically, in order to support positional resolution, named resolution and tuple composition, it is important to keep the unresolved encoder around and to use it when constructing new `Datasets` with the same object type but different output attributes. We now divide the encoder lifecycle into three phases (that mirror the lifecycle of standard expressions) and have checks at various boundaries:
- Unresoved Encoders: all users facing encoders (those constructed by implicits, static methods, or tuple composition) are unresolved, meaning they have only `UnresolvedAttributes` for named fields and `BoundReferences` for fields accessed by ordinal.
- Resolved Encoders: internal to a `[Grouped]Dataset` the encoder is resolved, meaning all input has been resolved to a specific `AttributeReference`. Any encoders that are placed into a logical plan for use in object construction should be resolved.
- BoundEncoder: Are constructed by physical plans, right before actual conversion from row -> object is performed.
It is left to future work to add explicit checks for resolution and provide good error messages when it fails. We might also consider enforcing the above constraints in the type system (i.e. `fromRow` only exists on a `ResolvedEncoder`), but we should probably wait before spending too much time on this.
Author: Michael Armbrust <michael@databricks.com>
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9673 from marmbrus/pr/9628.
switched stddev support from DeclarativeAggregate to ImperativeAggregate.
Author: JihongMa <linlin200605@gmail.com>
Closes#9380 from JihongMA/SPARK-11420.
Parquet supports some unsigned datatypes. However, Since Spark does not support unsigned datatypes, it needs to emit an exception with a clear message rather then with the one saying illegal datatype.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#9646 from HyukjinKwon/SPARK-10113.
`to_unix_timestamp` is the deterministic version of `unix_timestamp`, as it accepts at least one parameters.
Since the behavior here is quite similar to `unix_timestamp`, I think the dataframe API is not necessary here.
Author: Daoyuan Wang <daoyuan.wang@intel.com>
Closes#9347 from adrian-wang/to_unix_timestamp.
This adds a pivot method to the dataframe api.
Following the lead of cube and rollup this adds a Pivot operator that is translated into an Aggregate by the analyzer.
Currently the syntax is like:
~~courseSales.pivot(Seq($"year"), $"course", Seq("dotNET", "Java"), sum($"earnings"))~~
~~Would we be interested in the following syntax also/alternatively? and~~
courseSales.groupBy($"year").pivot($"course", "dotNET", "Java").agg(sum($"earnings"))
//or
courseSales.groupBy($"year").pivot($"course").agg(sum($"earnings"))
Later we can add it to `SQLParser`, but as Hive doesn't support it we cant add it there, right?
~~Also what would be the suggested Java friendly method signature for this?~~
Author: Andrew Ray <ray.andrew@gmail.com>
Closes#7841 from aray/sql-pivot.
This patch modifies Spark's closure cleaner (and a few other places) to use ASM 5, which is necessary in order to support cleaning of closures that were compiled by Java 8.
In order to avoid ASM dependency conflicts, Spark excludes ASM from all of its dependencies and uses a shaded version of ASM 4 that comes from `reflectasm` (see [SPARK-782](https://issues.apache.org/jira/browse/SPARK-782) and #232). This patch updates Spark to use a shaded version of ASM 5.0.4 that was published by the Apache XBean project; the POM used to create the shaded artifact can be found at https://github.com/apache/geronimo-xbean/blob/xbean-4.4/xbean-asm5-shaded/pom.xml.
http://movingfulcrum.tumblr.com/post/80826553604/asm-framework-50-the-missing-migration-guide was a useful resource while upgrading the code to use the new ASM5 opcodes.
I also added a new regression tests in the `java8-tests` subproject; the existing tests were insufficient to catch this bug, which only affected Scala 2.11 user code which was compiled targeting Java 8.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#9512 from JoshRosen/SPARK-6152.
We need to support custom classes like java beans and combine them into tuple, and it's very hard to do it with the TypeTag-based approach.
We should keep only the compose-based way to create tuple encoder.
This PR also move `Encoder` to `org.apache.spark.sql`
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9567 from cloud-fan/java.
This patch adds the building blocks for codegening subexpr elimination and implements
it end to end for UnsafeProjection. The building blocks can be used to do the same thing
for other operators.
It introduces some utilities to compute common sub expressions. Expressions can be added to
this data structure. The expr and its children will be recursively matched against existing
expressions (ones previously added) and grouped into common groups. This is built using
the existing `semanticEquals`. It does not understand things like commutative or associative
expressions. This can be done as future work.
After building this data structure, the codegen process takes advantage of it by:
1. Generating a helper function in the generated class that computes the common
subexpression. This is done for all common subexpressions that have at least
two occurrences and the expression tree is sufficiently complex.
2. When generating the apply() function, if the helper function exists, call that
instead of regenerating the expression tree. Repeated calls to the helper function
shortcircuit the evaluation logic.
Author: Nong Li <nong@databricks.com>
Author: Nong Li <nongli@gmail.com>
This patch had conflicts when merged, resolved by
Committer: Michael Armbrust <michael@databricks.com>
Closes#9480 from nongli/spark-10371.
Currently the user facing api for typed aggregation has some limitations:
* the customized typed aggregation must be the first of aggregation list
* the customized typed aggregation can only use long as buffer type
* the customized typed aggregation can only use flat type as result type
This PR tries to remove these limitations.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9599 from cloud-fan/agg.
https://issues.apache.org/jira/browse/SPARK-9830
This PR contains the following main changes.
* Removing `AggregateExpression1`.
* Removing `Aggregate` operator, which is used to evaluate `AggregateExpression1`.
* Removing planner rule used to plan `Aggregate`.
* Linking `MultipleDistinctRewriter` to analyzer.
* Renaming `AggregateExpression2` to `AggregateExpression` and `AggregateFunction2` to `AggregateFunction`.
* Updating places where we create aggregate expression. The way to create aggregate expressions is `AggregateExpression(aggregateFunction, mode, isDistinct)`.
* Changing `val`s in `DeclarativeAggregate`s that touch children of this function to `lazy val`s (when we create aggregate expression in DataFrame API, children of an aggregate function can be unresolved).
Author: Yin Huai <yhuai@databricks.com>
Closes#9556 from yhuai/removeAgg1.
This PR adds a new interface for user-defined aggregations, that can be used in `DataFrame` and `Dataset` operations to take all of the elements of a group and reduce them to a single value.
For example, the following aggregator extracts an `int` from a specific class and adds them up:
```scala
case class Data(i: Int)
val customSummer = new Aggregator[Data, Int, Int] {
def prepare(d: Data) = d.i
def reduce(l: Int, r: Int) = l + r
def present(r: Int) = r
}.toColumn()
val ds: Dataset[Data] = ...
val aggregated = ds.select(customSummer)
```
By using helper functions, users can make a generic `Aggregator` that works on any input type:
```scala
/** An `Aggregator` that adds up any numeric type returned by the given function. */
class SumOf[I, N : Numeric](f: I => N) extends Aggregator[I, N, N] with Serializable {
val numeric = implicitly[Numeric[N]]
override def zero: N = numeric.zero
override def reduce(b: N, a: I): N = numeric.plus(b, f(a))
override def present(reduction: N): N = reduction
}
def sum[I, N : Numeric : Encoder](f: I => N): TypedColumn[I, N] = new SumOf(f).toColumn
```
These aggregators can then be used alongside other built-in SQL aggregations.
```scala
val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
ds
.groupBy(_._1)
.agg(
sum(_._2), // The aggregator defined above.
expr("sum(_2)").as[Int], // A built-in dynatically typed aggregation.
count("*")) // A built-in statically typed aggregation.
.collect()
res0: ("a", 30, 30, 2L), ("b", 3, 3, 2L), ("c", 1, 1, 1L)
```
The current implementation focuses on integrating this into the typed API, but currently only supports running aggregations that return a single long value as explained in `TypedAggregateExpression`. This will be improved in a followup PR.
Author: Michael Armbrust <michael@databricks.com>
Closes#9555 from marmbrus/dataset-useragg.
Actually this was resolved by https://github.com/apache/spark/pull/8275.
But I found the JIRA issue for this is not marked as resolved since the PR above was made for another issue but the PR above resolved both.
I commented that this is resolved by the PR above; however, I opened this PR as I would like to just add
a little bit of corrections.
In the previous PR, I refactored the test by not reducing just collecting filters; however, this would not test properly `And` filter (which is not given to the tests). I unintentionally changed this from the original way (before being refactored).
In this PR, I just followed the original way to collect filters by reducing.
I would like to close this if this PR is inappropriate and somebody would like this deal with it in the separate PR related with this.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#9554 from HyukjinKwon/SPARK-9557.
The reason is that:
1. For partitioned hive table, we will move the partitioned columns after data columns. (e.g. `<a: Int, b: Int>` partition by `a` will become `<b: Int, a: Int>`)
2. When append data to table, we use position to figure out how to match input columns to table's columns.
So when we append data to partitioned table, we will match wrong columns between input and table. A solution is reordering the input columns before match by position, like what we did for [`InsertIntoHadoopFsRelation`](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala#L101-L105)
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9408 from cloud-fan/append.
A few changes:
1. Removed fold, since it can be confusing for distributed collections.
2. Created specific interfaces for each Dataset function (e.g. MapFunction, ReduceFunction, MapPartitionsFunction)
3. Added more documentation and test cases.
The other thing I'm considering doing is to have a "collector" interface for FlatMapFunction and MapPartitionsFunction, similar to MapReduce's map function.
Author: Reynold Xin <rxin@databricks.com>
Closes#9531 from rxin/SPARK-11564.
This PR adds support for multiple column in a single count distinct aggregate to the new aggregation path.
cc yhuai
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#9409 from hvanhovell/SPARK-11451.
This PR enables the Expand operator to process and produce Unsafe Rows.
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#9414 from hvanhovell/SPARK-11450.
https://issues.apache.org/jira/browse/SPARK-10116
This is really trivial, just happened to notice it -- if `XORShiftRandom.hashSeed` is really supposed to have random bits throughout (as the comment implies), it needs to do something for the conversion to `long`.
mengxr mkolod
Author: Imran Rashid <irashid@cloudera.com>
Closes#8314 from squito/SPARK-10116.
This PR adds test cases that test various column pruning and filter push-down cases.
Author: Cheng Lian <lian@databricks.com>
Closes#9468 from liancheng/spark-10978.follow-up.
JIRA: https://issues.apache.org/jira/browse/SPARK-9162
Currently ScalaUDF extends CodegenFallback and doesn't provide code generation implementation. This path implements code generation for ScalaUDF.
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#9270 from viirya/scalaudf-codegen.
This PR adds the ability to do typed SQL aggregations. We will likely also want to provide an interface to allow users to do aggregations on objects, but this is deferred to another PR.
```scala
val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
ds.groupBy(_._1).agg(sum("_2").as[Int]).collect()
res0: Array(("a", 30), ("b", 3), ("c", 1))
```
Author: Michael Armbrust <michael@databricks.com>
Closes#9499 from marmbrus/dataset-agg.
the main problem is: we interpret column name with special handling of `.` for DataFrame. This enables us to write something like `df("a.b")` to get the field `b` of `a`. However, we don't need this feature in `DataFrame.apply("*")` or `DataFrame.withColumnRenamed`. In these 2 cases, the column name is the final name already, we don't need extra process to interpret it.
The solution is simple, use `queryExecution.analyzed.output` to get resolved column directly, instead of using `DataFrame.resolve`.
close https://github.com/apache/spark/pull/8811
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9462 from cloud-fan/special-chars.
After aggregation, the dataset could be smaller than inputs, so it's better to do hash based aggregation for all inputs, then using sort based aggregation to merge them.
Author: Davies Liu <davies@databricks.com>
Closes#9383 from davies/fix_switch.
We have some aggregate function tests in both DataFrameAggregateSuite and SQLQuerySuite. The two have almost the same coverage and we should just remove the SQL one.
Author: Reynold Xin <rxin@databricks.com>
Closes#9475 from rxin/SPARK-11510.
1. Renamed localSort -> sortWithinPartitions to avoid ambiguity in "local"
2. distributeBy -> repartition to match the existing repartition.
Author: Reynold Xin <rxin@databricks.com>
Closes#9470 from rxin/SPARK-11504.
stddev is an alias for stddev_samp. variance should be consistent with stddev.
Also took the chance to remove internal Stddev and Variance, and only kept StddevSamp/StddevPop and VarianceSamp/VariancePop.
Author: Reynold Xin <rxin@databricks.com>
Closes#9449 from rxin/SPARK-11490.
depend on `caseSensitive` to do column name equality check, instead of just `==`
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9410 from cloud-fan/partition.
We added a bunch of higher order statistics such as skewness and kurtosis to GroupedData. I don't think they are common enough to justify being listed, since users can always use the normal statistics aggregate functions.
That is to say, after this change, we won't support
```scala
df.groupBy("key").kurtosis("colA", "colB")
```
However, we will still support
```scala
df.groupBy("key").agg(kurtosis(col("colA")), kurtosis(col("colB")))
```
Author: Reynold Xin <rxin@databricks.com>
Closes#9446 from rxin/SPARK-11489.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9434 from cloud-fan/rdd2ds and squashes the following commits:
0892d72 [Wenchen Fan] support create Dataset from RDD
This PR adds a new method `unhandledFilters` to `BaseRelation`. Data sources which implement this method properly may avoid the overhead of defensive filtering done by Spark SQL.
Author: Cheng Lian <lian@databricks.com>
Closes#9399 from liancheng/spark-10978.unhandled-filters.
JIRA: https://issues.apache.org/jira/browse/SPARK-10304
This patch detects if the structure of partition directories is not valid.
The test cases are from #8547. Thanks zhzhan.
cc liancheng
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#8840 from viirya/detect_invalid_part_dir.
This PR adds a new method `groupBy(cols: Column*)` to `Dataset` that allows users to group using column expressions instead of a lambda function. Since the return type of these expressions is not known at compile time, we just set the key type as a generic `Row`. If the user would like to work the key in a type-safe way, they can call `grouped.asKey[Type]`, which is also added in this PR.
```scala
val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
val grouped = ds.groupBy($"_1").asKey[String]
val agged = grouped.mapGroups { case (g, iter) =>
Iterator((g, iter.map(_._2).sum))
}
agged.collect()
res0: Array(("a", 30), ("b", 3), ("c", 1))
```
Author: Michael Armbrust <michael@databricks.com>
Closes#9359 from marmbrus/columnGroupBy and squashes the following commits:
bbcb03b [Michael Armbrust] Update DatasetSuite.scala
8fd2908 [Michael Armbrust] Update DatasetSuite.scala
0b0e2f8 [Michael Armbrust] [SPARK-11404] [SQL] Support for groupBy using column expressions
When we join 2 datasets, we will combine 2 encoders into a tupled one, and use it as the encoder for the jioned dataset. Assume both of the 2 encoders are flat, their `constructExpression`s both reference to the first element of input row. However, when we combine 2 encoders, the schema of input row changed, now the right encoder should reference to second element of input row. So we should rebind right encoder to let it know the new schema of input row before combine it.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9391 from cloud-fan/join and squashes the following commits:
846d3ab [Wenchen Fan] rebind right encoder when join 2 datasets
1. Supporting expanding structs in Projections. i.e.
"SELECT s.*" where s is a struct type.
This is fixed by allowing the expand function to handle structs in addition to tables.
2. Supporting expanding * inside aggregate functions of structs.
"SELECT max(struct(col1, structCol.*))"
This requires recursively expanding the expressions. In this case, it it the aggregate
expression "max(...)" and we need to recursively expand its children inputs.
Author: Nong Li <nongli@gmail.com>
Closes#9343 from nongli/spark-11329.
…ering.
For cached tables, we can just maintain the partitioning and ordering from the
source relation.
Author: Nong Li <nongli@gmail.com>
Closes#9404 from nongli/spark-5354.
DISTRIBUTE BY allows the user to hash partition the data by specified exprs. It also allows for
optioning sorting within each resulting partition. There is no required relationship between the
exprs for partitioning and sorting (i.e. one does not need to be a prefix of the other).
This patch adds to APIs to DataFrames which can be used together to provide this functionality:
1. distributeBy() which partitions the data frame into a specified number of partitions using the
partitioning exprs.
2. localSort() which sorts each partition using the provided sorting exprs.
To get the DISTRIBUTE BY functionality, the user simply does: df.distributeBy(...).localSort(...)
Author: Nong Li <nongli@gmail.com>
Closes#9364 from nongli/spark-11410.
This PR fixes two issues:
1. `PhysicalRDD.outputsUnsafeRows` is always `false`
Thus a `ConvertToUnsafe` operator is often required even if the underlying data source relation does output `UnsafeRow`.
1. Internal/external row conversion for `HadoopFsRelation` is kinda messy
Currently we're using `HadoopFsRelation.needConversion` and [dirty type erasure hacks][1] to indicate whether the relation outputs external row or internal row and apply external-to-internal conversion when necessary. Basically, all builtin `HadoopFsRelation` data sources, i.e. Parquet, JSON, ORC, and Text output `InternalRow`, while typical external `HadoopFsRelation` data sources, e.g. spark-avro and spark-csv, output `Row`.
This PR adds a `private[sql]` interface method `HadoopFsRelation.buildInternalScan`, which by default invokes `HadoopFsRelation.buildScan` and converts `Row`s to `UnsafeRow`s (which are also `InternalRow`s). All builtin `HadoopFsRelation` data sources override this method and directly output `UnsafeRow`s. In this way, now `HadoopFsRelation` always produces `UnsafeRow`s. Thus `PhysicalRDD.outputsUnsafeRows` can be properly set by checking whether the underlying data source is a `HadoopFsRelation`.
A remaining question is that, can we assume that all non-builtin `HadoopFsRelation` data sources output external rows? At least all well known ones do so. However it's possible that some users implemented their own `HadoopFsRelation` data sources that leverages `InternalRow` and thus all those unstable internal data representations. If this assumption is safe, we can deprecate `HadoopFsRelation.needConversion` and cleanup some more conversion code (like [here][2] and [here][3]).
This PR supersedes #9125.
Follow-ups:
1. Makes JSON and ORC data sources output `UnsafeRow` directly
1. Makes `HiveTableScan` output `UnsafeRow` directly
This is related to 1 since ORC data source shares the same `Writable` unwrapping code with `HiveTableScan`.
[1]: https://github.com/apache/spark/blob/v1.5.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala#L353
[2]: https://github.com/apache/spark/blob/v1.5.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala#L331-L335
[3]: https://github.com/apache/spark/blob/v1.5.1/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L630-L669
Author: Cheng Lian <lian@databricks.com>
Closes#9305 from liancheng/spark-11345.unsafe-hadoop-fs-relation.
Currently the empty line in json file will be parsed into Row with all null field values. But in json, "{}" represents a json object, empty line is supposed to be skipped.
Make a trivial change for this.
Author: Jeff Zhang <zjffdu@apache.org>
Closes#9211 from zjffdu/SPARK-11226.
When we cogroup 2 `GroupedIterator`s in `CoGroupedIterator`, if the right side is smaller, we will consume right data and keep the left data unchanged. Then we call `hasNext` which will call `left.hasNext`. This will make `GroupedIterator` generate an extra group as the previous one has not been comsumed yet.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9346 from cloud-fan/cogroup and squashes the following commits:
9be67c8 [Wenchen Fan] SPARK-11393
When enabling mergedSchema and predicate filter, this fails since Parquet does not accept filters pushed down when the columns of the filters do not exist in the schema.
This is related with Parquet issue (https://issues.apache.org/jira/browse/PARQUET-389).
For now, it just simply disables predicate push down when using merged schema in this PR.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#9327 from HyukjinKwon/SPARK-11103.
This PR introduce a mechanism to call spill() on those SQL operators that support spilling (for example, BytesToBytesMap, UnsafeExternalSorter and ShuffleExternalSorter) if there is not enough memory for execution. The preserved first page is needed anymore, so removed.
Other Spillable objects in Spark core (ExternalSorter and AppendOnlyMap) are not included in this PR, but those could benefit from this (trigger others' spilling).
The PrepareRDD may be not needed anymore, could be removed in follow up PR.
The following script will fail with OOM before this PR, finished in 150 seconds with 2G heap (also works in 1.5 branch, with similar duration).
```python
sqlContext.setConf("spark.sql.shuffle.partitions", "1")
df = sqlContext.range(1<<25).selectExpr("id", "repeat(id, 2) as s")
df2 = df.select(df.id.alias('id2'), df.s.alias('s2'))
j = df.join(df2, df.id==df2.id2).groupBy(df.id).max("id", "id2")
j.explain()
print j.count()
```
For thread-safety, here what I'm got:
1) Without calling spill(), the operators should only be used by single thread, no safety problems.
2) spill() could be triggered in two cases, triggered by itself, or by other operators. we can check trigger == this in spill(), so it's still in the same thread, so safety problems.
3) if it's triggered by other operators (right now cache will not trigger spill()), we only spill the data into disk when it's in scanning stage (building is finished), so the in-memory sorter or memory pages are read-only, we only need to synchronize the iterator and change it.
4) During scanning, the iterator will only use one record in one page, we can't free this page, because the downstream is currently using it (used by UnsafeRow or other objects). In BytesToBytesMap, we just skip the current page, and dump all others into disk. In UnsafeExternalSorter, we keep the page that is used by current record (having the same baseObject), free it when loading the next record. In ShuffleExternalSorter, the spill() will not trigger during scanning.
5) In order to avoid deadlock, we didn't call acquireMemory during spill (so we reused the pointer array in InMemorySorter).
Author: Davies Liu <davies@databricks.com>
Closes#9241 from davies/force_spill.
Before this PR, user has to consume the iterator of one group before process next group, or we will get into infinite loops.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9330 from cloud-fan/group.
In some cases, we can broadcast the smaller relation in cartesian join, which improve the performance significantly.
Author: Cheng Hao <hao.cheng@intel.com>
Closes#8652 from chenghao-intel/cartesian.
This PR adds a new operation `joinWith` to a `Dataset`, which returns a `Tuple` for each pair where a given `condition` evaluates to true.
```scala
case class ClassData(a: String, b: Int)
val ds1 = Seq(ClassData("a", 1), ClassData("b", 2)).toDS()
val ds2 = Seq(("a", 1), ("b", 2)).toDS()
> ds1.joinWith(ds2, $"_1" === $"a").collect()
res0: Array((ClassData("a", 1), ("a", 1)), (ClassData("b", 2), ("b", 2)))
```
This operation is similar to the relation `join` function with one important difference in the result schema. Since `joinWith` preserves objects present on either side of the join, the result schema is similarly nested into a tuple under the column names `_1` and `_2`.
This type of join can be useful both for preserving type-safety with the original object types as well as working with relational data where either side of the join has column names in common.
## Required Changes to Encoders
In the process of working on this patch, several deficiencies to the way that we were handling encoders were discovered. Specifically, it turned out to be very difficult to `rebind` the non-expression based encoders to extract the nested objects from the results of joins (and also typed selects that return tuples).
As a result the following changes were made.
- `ClassEncoder` has been renamed to `ExpressionEncoder` and has been improved to also handle primitive types. Additionally, it is now possible to take arbitrary expression encoders and rewrite them into a single encoder that returns a tuple.
- All internal operations on `Dataset`s now require an `ExpressionEncoder`. If the users tries to pass a non-`ExpressionEncoder` in, an error will be thrown. We can relax this requirement in the future by constructing a wrapper class that uses expressions to project the row to the expected schema, shielding the users code from the required remapping. This will give us a nice balance where we don't force user encoders to understand attribute references and binding, but still allow our native encoder to leverage runtime code generation to construct specific encoders for a given schema that avoid an extra remapping step.
- Additionally, the semantics for different types of objects are now better defined. As stated in the `ExpressionEncoder` scaladoc:
- Classes will have their sub fields extracted by name using `UnresolvedAttribute` expressions
and `UnresolvedExtractValue` expressions.
- Tuples will have their subfields extracted by position using `BoundReference` expressions.
- Primitives will have their values extracted from the first ordinal with a schema that defaults
to the name `value`.
- Finally, the binding lifecycle for `Encoders` has now been unified across the codebase. Encoders are now `resolved` to the appropriate schema in the constructor of `Dataset`. This process replaces an unresolved expressions with concrete `AttributeReference` expressions. Binding then happens on demand, when an encoder is going to be used to construct an object. This closely mirrors the lifecycle for standard expressions when executing normal SQL or `DataFrame` queries.
Author: Michael Armbrust <michael@databricks.com>
Closes#9300 from marmbrus/datasets-tuples.
When sampling and then filtering DataFrame, the SQL Optimizer will push down filter into sample and produce wrong result. This is due to the sampler is calculated based on the original scope rather than the scope after filtering.
Author: Yanbo Liang <ybliang8@gmail.com>
Closes#9294 from yanboliang/spark-11303.
Currently, when a schema is inferred from a JSON file using sqlContext.read.json, the primitive object types are inferred as string, long, boolean, etc.
However, if the inferred type is too specific (JSON obviously does not enforce types itself), this can cause issues with merging dataframe schemas.
This pull request adds the option "primitivesAsString" to the JSON DataFrameReader which when true (defaults to false if not set) will infer all primitives as strings.
Below is an example usage of this new functionality.
```
val jsonDf = sqlContext.read.option("primitivesAsString", "true").json(sampleJsonFile)
scala> jsonDf.printSchema()
root
|-- bigInteger: string (nullable = true)
|-- boolean: string (nullable = true)
|-- double: string (nullable = true)
|-- integer: string (nullable = true)
|-- long: string (nullable = true)
|-- null: string (nullable = true)
|-- string: string (nullable = true)
```
Author: Stephen De Gennaro <stepheng@realitymine.com>
Closes#9249 from stephend-realitymine/stephend-primitives.
This patch refactors the MemoryManager class structure. After #9000, Spark had the following classes:
- MemoryManager
- StaticMemoryManager
- ExecutorMemoryManager
- TaskMemoryManager
- ShuffleMemoryManager
This is fairly confusing. To simplify things, this patch consolidates several of these classes:
- ShuffleMemoryManager and ExecutorMemoryManager were merged into MemoryManager.
- TaskMemoryManager is moved into Spark Core.
**Key changes and tasks**:
- [x] Merge ExecutorMemoryManager into MemoryManager.
- [x] Move pooling logic into Allocator.
- [x] Move TaskMemoryManager from `spark-unsafe` to `spark-core`.
- [x] Refactor the existing Tungsten TaskMemoryManager interactions so Tungsten code use only this and not both this and ShuffleMemoryManager.
- [x] Refactor non-Tungsten code to use the TaskMemoryManager instead of ShuffleMemoryManager.
- [x] Merge ShuffleMemoryManager into MemoryManager.
- [x] Move code
- [x] ~~Simplify 1/n calculation.~~ **Will defer to followup, since this needs more work.**
- [x] Port ShuffleMemoryManagerSuite tests.
- [x] Move classes from `unsafe` package to `memory` package.
- [ ] Figure out how to handle the hacky use of the memory managers in HashedRelation's broadcast variable construction.
- [x] Test porting and cleanup: several tests relied on mock functionality (such as `TestShuffleMemoryManager.markAsOutOfMemory`) which has been changed or broken during the memory manager consolidation
- [x] AbstractBytesToBytesMapSuite
- [x] UnsafeExternalSorterSuite
- [x] UnsafeFixedWidthAggregationMapSuite
- [x] UnsafeKVExternalSorterSuite
**Compatiblity notes**:
- This patch introduces breaking changes in `ExternalAppendOnlyMap`, which is marked as `DevloperAPI` (likely for legacy reasons): this class now cannot be used outside of a task.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#9127 from JoshRosen/SPARK-10984.
This adds API for reading and writing text files, similar to SparkContext.textFile and RDD.saveAsTextFile.
```
SQLContext.read.text("/path/to/something.txt")
DataFrame.write.text("/path/to/write.txt")
```
Using the new Dataset API, this also supports
```
val ds: Dataset[String] = SQLContext.read.text("/path/to/something.txt").as[String]
```
Author: Reynold Xin <rxin@databricks.com>
Closes#9240 from rxin/SPARK-11274.
*This PR adds a new experimental API to Spark, tentitively named Datasets.*
A `Dataset` is a strongly-typed collection of objects that can be transformed in parallel using functional or relational operations. Example usage is as follows:
### Functional
```scala
> val ds: Dataset[Int] = Seq(1, 2, 3).toDS()
> ds.filter(_ % 1 == 0).collect()
res1: Array[Int] = Array(1, 2, 3)
```
### Relational
```scala
scala> ds.toDF().show()
+-----+
|value|
+-----+
| 1|
| 2|
| 3|
+-----+
> ds.select(expr("value + 1").as[Int]).collect()
res11: Array[Int] = Array(2, 3, 4)
```
## Comparison to RDDs
A `Dataset` differs from an `RDD` in the following ways:
- The creation of a `Dataset` requires the presence of an explicit `Encoder` that can be
used to serialize the object into a binary format. Encoders are also capable of mapping the
schema of a given object to the Spark SQL type system. In contrast, RDDs rely on runtime
reflection based serialization.
- Internally, a `Dataset` is represented by a Catalyst logical plan and the data is stored
in the encoded form. This representation allows for additional logical operations and
enables many operations (sorting, shuffling, etc.) to be performed without deserializing to
an object.
A `Dataset` can be converted to an `RDD` by calling the `.rdd` method.
## Comparison to DataFrames
A `Dataset` can be thought of as a specialized DataFrame, where the elements map to a specific
JVM object type, instead of to a generic `Row` container. A DataFrame can be transformed into
specific Dataset by calling `df.as[ElementType]`. Similarly you can transform a strongly-typed
`Dataset` to a generic DataFrame by calling `ds.toDF()`.
## Implementation Status and TODOs
This is a rough cut at the least controversial parts of the API. The primary purpose here is to get something committed so that we can better parallelize further work and get early feedback on the API. The following is being deferred to future PRs:
- Joins and Aggregations (prototype here f11f91e6f0)
- Support for Java
Additionally, the responsibility for binding an encoder to a given schema is currently done in a fairly ad-hoc fashion. This is an internal detail, and what we are doing today works for the cases we care about. However, as we add more APIs we'll probably need to do this in a more principled way (i.e. separate resolution from binding as we do in DataFrames).
## COMPATIBILITY NOTE
Long term we plan to make `DataFrame` extend `Dataset[Row]`. However,
making this change to che class hierarchy would break the function signatures for the existing
function operations (map, flatMap, etc). As such, this class should be considered a preview
of the final API. Changes will be made to the interface after Spark 1.6.
Author: Michael Armbrust <michael@databricks.com>
Closes#9190 from marmbrus/dataset-infra.
There's a lot of duplication between SortShuffleManager and UnsafeShuffleManager. Given that these now provide the same set of functionality, now that UnsafeShuffleManager supports large records, I think that we should replace SortShuffleManager's serialized shuffle implementation with UnsafeShuffleManager's and should merge the two managers together.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#8829 from JoshRosen/consolidate-sort-shuffle-implementations.
This PR introduce a new feature to run SQL directly on files without create a table, for example:
```
select id from json.`path/to/json/files` as j
```
Author: Davies Liu <davies@databricks.com>
Closes#9173 from davies/source.
Due to PARQUET-251, `BINARY` columns in existing Parquet files may be written with corrupted statistics information. This information is used by filter push-down optimization. Since Spark 1.5 turns on Parquet filter push-down by default, we may end up with wrong query results. PARQUET-251 has been fixed in parquet-mr 1.8.1, but Spark 1.5 is still using 1.7.0.
This affects all Spark SQL data types that can be mapped to Parquet {{BINARY}}, namely:
- `StringType`
- `BinaryType`
- `DecimalType`
(But Spark SQL doesn't support pushing down filters involving `DecimalType` columns for now.)
To avoid wrong query results, we should disable filter push-down for columns of `StringType` and `BinaryType` until we upgrade to parquet-mr 1.8.
Author: Cheng Lian <lian@databricks.com>
Closes#9152 from liancheng/spark-11153.workaround-parquet-251.
(cherry picked from commit 0887e5e878)
Signed-off-by: Cheng Lian <lian@databricks.com>
Currently, we use CartesianProduct for join with null-safe-equal condition.
```
scala> sqlContext.sql("select * from t a join t b on (a.i <=> b.i)").explain
== Physical Plan ==
TungstenProject [i#2,j#3,i#7,j#8]
Filter (i#2 <=> i#7)
CartesianProduct
LocalTableScan [i#2,j#3], [[1,1]]
LocalTableScan [i#7,j#8], [[1,1]]
```
Actually, we can have an equal-join condition as `coalesce(i, default) = coalesce(b.i, default)`, then an partitioned join algorithm could be used.
After this PR, the plan will become:
```
>>> sqlContext.sql("select * from a join b ON a.id <=> b.id").explain()
TungstenProject [id#0L,id#1L]
Filter (id#0L <=> id#1L)
SortMergeJoin [coalesce(id#0L,0)], [coalesce(id#1L,0)]
TungstenSort [coalesce(id#0L,0) ASC], false, 0
TungstenExchange hashpartitioning(coalesce(id#0L,0),200)
ConvertToUnsafe
Scan PhysicalRDD[id#0L]
TungstenSort [coalesce(id#1L,0) ASC], false, 0
TungstenExchange hashpartitioning(coalesce(id#1L,0),200)
ConvertToUnsafe
Scan PhysicalRDD[id#1L]
```
Author: Davies Liu <davies@databricks.com>
Closes#9120 from davies/null_safe.
The purpose of this PR is to keep the unsafe format detail only inside the unsafe class itself, so when we use them(like use unsafe array in unsafe map, use unsafe array and map in columnar cache), we don't need to understand the format before use them.
change list:
* unsafe array's 4-bytes numElements header is now required(was optional), and become a part of unsafe array format.
* w.r.t the previous changing, the `sizeInBytes` of unsafe array now counts the 4-bytes header.
* unsafe map's format was `[numElements] [key array numBytes] [key array content(without numElements header)] [value array content(without numElements header)]` before, which is a little hacky as it makes unsafe array's header optional. I think saving 4 bytes is not a big deal, so the format is now: `[key array numBytes] [unsafe key array] [unsafe value array]`.
* w.r.t the previous changing, the `sizeInBytes` of unsafe map now counts both map's header and array's header.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9131 from cloud-fan/unsafe.
The unit test added in #9132 is flaky. This is a follow up PR to add `listenerBus.waitUntilEmpty` to fix it.
Author: zsxwing <zsxwing@gmail.com>
Closes#9163 from zsxwing/SPARK-11126-follow-up.
SQLListener adds all stage infos to `_stageIdToStageMetrics`, but only removes stage infos belonging to SQL executions. This PR fixed it by ignoring stages that don't belong to SQL executions.
Reported by Terry Hoo in https://www.mail-archive.com/userspark.apache.org/msg38810.html
Author: zsxwing <zsxwing@gmail.com>
Closes#9132 from zsxwing/SPARK-11126.
Make sure comma-separated paths get processed correcly in ResolvedDataSource for a HadoopFsRelationProvider
Author: Koert Kuipers <koert@tresata.com>
Closes#8416 from koertkuipers/feat-sql-comma-separated-paths.
In Spark SQL, the Exchange planner tries to avoid unnecessary sorts in cases where the data has already been sorted by a superset of the requested sorting columns. For instance, let's say that a query calls for an operator's input to be sorted by `a.asc` and the input happens to already be sorted by `[a.asc, b.asc]`. In this case, we do not need to re-sort the input. The converse, however, is not true: if the query calls for `[a.asc, b.asc]`, then `a.asc` alone will not satisfy the ordering requirements, requiring an additional sort to be planned by Exchange.
However, the current Exchange code gets this wrong and incorrectly skips sorting when the existing output ordering is a subset of the required ordering. This is simple to fix, however.
This bug was introduced in https://github.com/apache/spark/pull/7458, so it affects 1.5.0+.
This patch fixes the bug and significantly improves the unit test coverage of Exchange's sort-planning logic.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#9140 from JoshRosen/SPARK-11135.
#9084 uncovered that many tests that test spilling don't actually spill. This is a follow-up patch to fix that to ensure our unit tests actually catch potential bugs in spilling. The size of this patch is inflated by the refactoring of `ExternalSorterSuite`, which had a lot of duplicate code and logic.
Author: Andrew Or <andrew@databricks.com>
Closes#9124 from andrewor14/spilling-tests.
This patch extends TungstenAggregate to support ImperativeAggregate functions. The existing TungstenAggregate operator only supported DeclarativeAggregate functions, which are defined in terms of Catalyst expressions and can be evaluated via generated projections. ImperativeAggregate functions, on the other hand, are evaluated by calling their `initialize`, `update`, `merge`, and `eval` methods.
The basic strategy here is similar to how SortBasedAggregate evaluates both types of aggregate functions: use a generated projection to evaluate the expression-based declarative aggregates with dummy placeholder expressions inserted in place of the imperative aggregate function output, then invoke the imperative aggregate functions and target them against the aggregation buffer. The bulk of the diff here consists of code that was copied and adapted from SortBasedAggregate, with some key changes to handle TungstenAggregate's sort fallback path.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#9038 from JoshRosen/support-interpreted-in-tungsten-agg-final.
```scala
withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
withTempPath { dir =>
val path = s"${dir.getCanonicalPath}/part=1"
(1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)
// If the "part = 1" filter gets pushed down, this query will throw an exception since
// "part" is not a valid column in the actual Parquet file
checkAnswer(
sqlContext.read.parquet(path).filter("a > 0 and (part = 0 or a > 1)"),
(2 to 3).map(i => Row(i, i.toString, 1)))
}
}
```
We expect the result to be:
```
2,1
3,1
```
But got
```
1,1
2,1
3,1
```
Author: Cheng Hao <hao.cheng@intel.com>
Closes#8916 from chenghao-intel/partition_filter.
Right now, we have QualifiedTableName, TableIdentifier, and Seq[String] to represent table identifiers. We should only have one form and TableIdentifier is the best one because it provides methods to get table name, database name, return unquoted string, and return quoted string.
Author: Wenchen Fan <wenchen@databricks.com>
Author: Wenchen Fan <cloud0fan@163.com>
Closes#8453 from cloud-fan/table-name.
With this feature, we can track the query plan, time cost, exception during query execution for spark users.
Author: Wenchen Fan <cloud0fan@163.com>
Closes#9078 from cloud-fan/callback.
We should not stop resolving having when the having condtion is resolved, or something like `count(1)` will crash.
Author: Wenchen Fan <cloud0fan@163.com>
Closes#9105 from cloud-fan/having.
This patch unifies the memory management of the storage and execution regions such that either side can borrow memory from each other. When memory pressure arises, storage will be evicted in favor of execution. To avoid regressions in cases where storage is crucial, we dynamically allocate a fraction of space for storage that execution cannot evict. Several configurations are introduced:
- **spark.memory.fraction (default 0.75)**: fraction of the heap space used for execution and storage. The lower this is, the more frequently spills and cached data eviction occur. The purpose of this config is to set aside memory for internal metadata, user data structures, and imprecise size estimation in the case of sparse, unusually large records.
- **spark.memory.storageFraction (default 0.5)**: size of the storage region within the space set aside by `spark.memory.fraction`. Cached data may only be evicted if total storage exceeds this region.
- **spark.memory.useLegacyMode (default false)**: whether to use the memory management that existed in Spark 1.5 and before. This is mainly for backward compatibility.
For a detailed description of the design, see [SPARK-10000](https://issues.apache.org/jira/browse/SPARK-10000). This patch builds on top of the `MemoryManager` interface introduced in #9000.
Author: Andrew Or <andrew@databricks.com>
Closes#9084 from andrewor14/unified-memory-manager.
The SQLTab will be shared by multiple sessions.
If we create multiple independent SQLContexts (not using newSession()), will still see multiple SQLTabs in the Spark UI.
Author: Davies Liu <davies@databricks.com>
Closes#9048 from davies/sqlui.
This PR improve the unrolling and read of complex types in columnar cache:
1) Using UnsafeProjection to do serialization of complex types, so they will not be serialized three times (two for actualSize)
2) Copy the bytes from UnsafeRow/UnsafeArrayData to ByteBuffer directly, avoiding the immediate byte[]
3) Using the underlying array in ByteBuffer to create UTF8String/UnsafeRow/UnsafeArrayData without copy.
Combine these optimizations, we can reduce the unrolling time from 25s to 21s (20% less), reduce the scanning time from 3.5s to 2.5s (28% less).
```
df = sqlContext.read.parquet(path)
t = time.time()
df.cache()
df.count()
print 'unrolling', time.time() - t
for i in range(10):
t = time.time()
print df.select("*")._jdf.queryExecution().toRdd().count()
print time.time() - t
```
The schema is
```
root
|-- a: struct (nullable = true)
| |-- b: long (nullable = true)
| |-- c: string (nullable = true)
|-- d: array (nullable = true)
| |-- element: long (containsNull = true)
|-- e: map (nullable = true)
| |-- key: long
| |-- value: string (valueContainsNull = true)
```
Now the columnar cache depends on that UnsafeProjection support all the data types (including UDT), this PR also fix that.
Author: Davies Liu <davies@databricks.com>
Closes#9016 from davies/complex2.
For Parquet decimal columns that are encoded using plain-dictionary encoding, we can make the upper level converter aware of the dictionary, so that we can pre-instantiate all the decimals to avoid duplicated instantiation.
Note that plain-dictionary encoding isn't available for `FIXED_LEN_BYTE_ARRAY` for Parquet writer version `PARQUET_1_0`. So currently only decimals written as `INT32` and `INT64` can benefit from this optimization.
Author: Cheng Lian <lian@databricks.com>
Closes#9040 from liancheng/spark-11007.decimal-converter-dict-support.
marmbrus
rxin
This patch adds a JdbcDialect class, which customizes the datatype mappings for Derby backends. The patch also adds unit tests for the new dialect, corresponding to the existing tests for other JDBC dialects.
JDBCSuite runs cleanly for me with this patch. So does JDBCWriteSuite, although it produces noise as described here: https://issues.apache.org/jira/browse/SPARK-10890
This patch is my original work, which I license to the ASF. I am a Derby contributor, so my ICLA is on file under SVN id "rhillegas": http://people.apache.org/committer-index.html
Touches the following files:
---------------------------------
org.apache.spark.sql.jdbc.JdbcDialects
Adds a DerbyDialect.
---------------------------------
org.apache.spark.sql.jdbc.JDBCSuite
Adds unit tests for the new DerbyDialect.
Author: Rick Hillegas <rhilleg@us.ibm.com>
Closes#8982 from rick-ibm/b_10855.
This patch introduces a `MemoryManager` that is the central arbiter of how much memory to grant to storage and execution. This patch is primarily concerned only with refactoring while preserving the existing behavior as much as possible.
This is the first step away from the existing rigid separation of storage and execution memory, which has several major drawbacks discussed on the [issue](https://issues.apache.org/jira/browse/SPARK-10956). It is the precursor of a series of patches that will attempt to address those drawbacks.
Author: Andrew Or <andrew@databricks.com>
Author: Josh Rosen <joshrosen@databricks.com>
Author: andrewor14 <andrew@databricks.com>
Closes#9000 from andrewor14/memory-manager.
This PR improve the sessions management by replacing the thread-local based to one SQLContext per session approach, introduce separated temporary tables and UDFs/UDAFs for each session.
A new session of SQLContext could be created by:
1) create an new SQLContext
2) call newSession() on existing SQLContext
For HiveContext, in order to reduce the cost for each session, the classloader and Hive client are shared across multiple sessions (created by newSession).
CacheManager is also shared by multiple sessions, so cache a table multiple times in different sessions will not cause multiple copies of in-memory cache.
Added jars are still shared by all the sessions, because SparkContext does not support sessions.
cc marmbrus yhuai rxin
Author: Davies Liu <davies@databricks.com>
Closes#8909 from davies/sessions.
UnsafeRow contains 3 pieces of information when pointing to some data in memory (an object, a base offset, and length). When the row is serialized with Java/Kryo serialization, the object layout in memory can change if two machines have different pointer width (Oops in JVM).
To reproduce, launch Spark using
MASTER=local-cluster[2,1,1024] bin/spark-shell --conf "spark.executor.extraJavaOptions=-XX:-UseCompressedOops"
And then run the following
scala> sql("select 1 xx").collect()
Author: Reynold Xin <rxin@databricks.com>
Closes#9030 from rxin/SPARK-10914.
This PR refactors Parquet write path to follow parquet-format spec. It's a successor of PR #7679, but with less non-essential changes.
Major changes include:
1. Replaces `RowWriteSupport` and `MutableRowWriteSupport` with `CatalystWriteSupport`
- Writes Parquet data using standard layout defined in parquet-format
Specifically, we are now writing ...
- ... arrays and maps in standard 3-level structure with proper annotations and field names
- ... decimals as `INT32` and `INT64` whenever possible, and taking `FIXED_LEN_BYTE_ARRAY` as the final fallback
- Supports legacy mode which is compatible with Spark 1.4 and prior versions
The legacy mode is by default off, and can be turned on by flipping SQL option `spark.sql.parquet.writeLegacyFormat` to `true`.
- Eliminates per value data type dispatching costs via prebuilt composed writer functions
1. Cleans up the last pieces of old Parquet support code
As pointed out by rxin previously, we probably want to rename all those `Catalyst*` Parquet classes to `Parquet*` for clarity. But I'd like to do this in a follow-up PR to minimize code review noises in this one.
Author: Cheng Lian <lian@databricks.com>
Closes#8988 from liancheng/spark-8848/standard-parquet-write-path.
In `aggregate/utils.scala`, there is a substantial amount of duplication in the expression-rewriting logic. As a prerequisite to supporting imperative aggregate functions in `TungstenAggregate`, this patch refactors this file so that the same expression-rewriting logic is used for both `SortAggregate` and `TungstenAggregate`.
In order to allow both operators to use the same rewriting logic, `TungstenAggregationIterator. generateResultProjection()` has been updated so that it first evaluates all declarative aggregate functions' `evaluateExpression`s and writes the results into a temporary buffer, and then uses this temporary buffer and the grouping expressions to evaluate the final resultExpressions. This matches the logic in SortAggregateIterator, where this two-pass approach is necessary in order to support imperative aggregates. If this change turns out to cause performance regressions, then we can look into re-implementing the single-pass evaluation in a cleaner way as part of a followup patch.
Since the rewriting logic is now shared across both operators, this patch also extracts that logic and places it in `SparkStrategies`. This makes the rewriting logic a bit easier to follow, I think.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#9015 from JoshRosen/SPARK-10988.
This PR refactors `HashJoinNode` to take a existing `HashedRelation`. So, we can reuse this node for both `ShuffledHashJoin` and `BroadcastHashJoin`.
https://issues.apache.org/jira/browse/SPARK-10887
Author: Yin Huai <yhuai@databricks.com>
Closes#8953 from yhuai/SPARK-10887.
This PR addresses [SPARK-7869](https://issues.apache.org/jira/browse/SPARK-7869)
Before the patch, attempt to load the table from Postgres with JSON/JSONb datatype caused error `java.sql.SQLException: Unsupported type 1111`
Postgres data types JSON and JSONb are now mapped to String on Spark side thus they can be loaded into DF and processed on Spark side
Example
Postgres:
```
create table test_json (id int, value json);
create table test_jsonb (id int, value jsonb);
insert into test_json (id, value) values
(1, '{"field1":"value1","field2":"value2","field3":[1,2,3]}'::json),
(2, '{"field1":"value3","field2":"value4","field3":[4,5,6]}'::json),
(3, '{"field3":"value5","field4":"value6","field3":[7,8,9]}'::json);
insert into test_jsonb (id, value) values
(4, '{"field1":"value1","field2":"value2","field3":[1,2,3]}'::jsonb),
(5, '{"field1":"value3","field2":"value4","field3":[4,5,6]}'::jsonb),
(6, '{"field3":"value5","field4":"value6","field3":[7,8,9]}'::jsonb);
```
PySpark:
```
>>> import json
>>> df1 = sqlContext.read.jdbc("jdbc:postgresql://127.0.0.1:5432/test?user=testuser", "test_json")
>>> df1.map(lambda x: (x.id, json.loads(x.value))).map(lambda (id, value): (id, value.get('field3'))).collect()
[(1, [1, 2, 3]), (2, [4, 5, 6]), (3, [7, 8, 9])]
>>> df2 = sqlContext.read.jdbc("jdbc:postgresql://127.0.0.1:5432/test?user=testuser", "test_jsonb")
>>> df2.map(lambda x: (x.id, json.loads(x.value))).map(lambda (id, value): (id, value.get('field1'))).collect()
[(4, u'value1'), (5, u'value3'), (6, None)]
```
Author: 0x0FFF <programmerag@gmail.com>
Closes#8948 from 0x0FFF/SPARK-7869.
This PR improve the performance of complex types in columnar cache by using UnsafeProjection instead of KryoSerializer.
A simple benchmark show that this PR could improve the performance of scanning a cached table with complex columns by 15x (comparing to Spark 1.5).
Here is the code used to benchmark:
```
df = sc.range(1<<23).map(lambda i: Row(a=Row(b=i, c=str(i)), d=range(10), e=dict(zip(range(10), [str(i) for i in range(10)])))).toDF()
df.write.parquet("table")
```
```
df = sqlContext.read.parquet("table")
df.cache()
df.count()
t = time.time()
print df.select("*")._jdf.queryExecution().toRdd().count()
print time.time() - t
```
Author: Davies Liu <davies@databricks.com>
Closes#8971 from davies/complex.
This patch refactors several of the Aggregate2 interfaces in order to improve code clarity.
The biggest change is a refactoring of the `AggregateFunction2` class hierarchy. In the old code, we had a class named `AlgebraicAggregate` that inherited from `AggregateFunction2`, added a new set of methods, then banned the use of the inherited methods. I found this to be fairly confusing because.
If you look carefully at the existing code, you'll see that subclasses of `AggregateFunction2` fall into two disjoint categories: imperative aggregation functions which directly extended `AggregateFunction2` and declarative, expression-based aggregate functions which extended `AlgebraicAggregate`. In order to make this more explicit, this patch refactors things so that `AggregateFunction2` is a sealed abstract class with two subclasses, `ImperativeAggregateFunction` and `ExpressionAggregateFunction`. The superclass, `AggregateFunction2`, now only contains methods and fields that are common to both subclasses.
After making this change, I updated the various AggregationIterator classes to comply with this new naming scheme. I also performed several small renamings in the aggregate interfaces themselves in order to improve clarity and rewrote or expanded a number of comments.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#8973 from JoshRosen/tungsten-agg-comments.
This PR remove the typeId in columnar cache, it's not needed anymore, it also remove DATE and TIMESTAMP (use INT/LONG instead).
Author: Davies Liu <davies@databricks.com>
Closes#8989 from davies/refactor_cache.
`Murmur3_x86_32.hashUnsafeWords` only accepts word-aligned bytes, but unsafe array is not.
Author: Wenchen Fan <cloud0fan@163.com>
Closes#8987 from cloud-fan/hash.
We introduced SQL option `spark.sql.parquet.followParquetFormatSpec` while working on implementing Parquet backwards-compatibility rules in SPARK-6777. It indicates whether we should use legacy Parquet format adopted by Spark 1.4 and prior versions or the standard format defined in parquet-format spec to write Parquet files.
This option defaults to `false` and is marked as a non-public option (`isPublic = false`) because we haven't finished refactored Parquet write path. The problem is, the name of this option is somewhat confusing, because it's not super intuitive why we shouldn't follow the spec. Would be nice to rename it to `spark.sql.parquet.writeLegacyFormat`, and invert its default value (the two option names have opposite meanings).
Although this option is private in 1.5, we'll make it public in 1.6 after refactoring Parquet write path. So that users can decide whether to write Parquet files in standard format or legacy format.
Author: Cheng Lian <lian@databricks.com>
Closes#8566 from liancheng/spark-10400/deprecate-follow-parquet-format-spec.
Floor & Ceiling function should returns Long type, rather than Double.
Verified with MySQL & Hive.
Author: Cheng Hao <hao.cheng@intel.com>
Closes#8933 from chenghao-intel/ceiling.
This is an implementation of Hive's `json_tuple` function using Jackson Streaming.
Author: Nathan Howell <nhowell@godaddy.com>
Closes#7946 from NathanHowell/SPARK-9617.
The UTF8String may come from UnsafeRow, then underline buffer of it is not copied, so we should clone it in order to hold it in Stats.
cc yhuai
Author: Davies Liu <davies@databricks.com>
Closes#8929 from davies/pushdown_string.
https://issues.apache.org/jira/browse/SPARK-10741
I choose the second approach: do not change output exprIds when convert MetastoreRelation to LogicalRelation
Author: Wenchen Fan <cloud0fan@163.com>
Closes#8889 from cloud-fan/hot-bug.
This patch reverts most of the changes in a previous fix#8827.
The real cause of the issue is that in `TungstenAggregate`'s prepare method we only reserve 1 page, but later when we switch to sort-based aggregation we try to acquire 1 page AND a pointer array. The longer-term fix should be to reserve also the pointer array, but for now ***we will simply not track the pointer array***. (Note that elsewhere we already don't track the pointer array, e.g. [here](a18208047f/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java (L88)))
Note: This patch reuses the unit test added in #8827 so it doesn't show up in the diff.
Author: Andrew Or <andrew@databricks.com>
Closes#8888 from andrewor14/dont-track-pointer-array.
This patch attempts to fix an issue where Spark SQL's UnsafeRowSerializer was incompatible with the `tungsten-sort` ShuffleManager.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#8873 from JoshRosen/SPARK-10403.
JIRA: https://issues.apache.org/jira/browse/SPARK-10446
Currently the method `join(right: DataFrame, usingColumns: Seq[String])` only supports inner join. It is more convenient to have it support other join types.
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#8600 from viirya/usingcolumns_df.
Reading from Microsoft SQL Server over jdbc fails when the table contains datetimeoffset types.
This patch registers a SQLServer JDBC Dialect that maps datetimeoffset to a String, as Microsoft suggest.
Author: Ewan Leith <ewan.leith@realitymine.com>
Closes#8575 from realitymine-coordinator/sqlserver.
It would be nice to support creating a DataFrame directly from a Java List of Row.
Author: Holden Karau <holden@pigscanfly.ca>
Closes#8779 from holdenk/SPARK-10630-create-DataFrame-from-Java-List.
It does not make much sense to set `spark.shuffle.spill` or `spark.sql.planner.externalSort` to false: I believe that these configurations were initially added as "escape hatches" to guard against bugs in the external operators, but these operators are now mature and well-tested. In addition, these configurations are not handled in a consistent way anymore: SQL's Tungsten codepath ignores these configurations and will continue to use spilling operators. Similarly, Spark Core's `tungsten-sort` shuffle manager does not respect `spark.shuffle.spill=false`.
This pull request removes these configurations, adds warnings at the appropriate places, and deletes a large amount of code which was only used in code paths that did not support spilling.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#8831 from JoshRosen/remove-ability-to-disable-spilling.
When `TungstenAggregation` hits memory pressure, it switches from hash-based to sort-based aggregation in-place. However, in the process we try to allocate the pointer array for writing to the new `UnsafeExternalSorter` *before* actually freeing the memory from the hash map. This lead to the following exception:
```
java.io.IOException: Could not acquire 65536 bytes of memory
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.initializeForWriting(UnsafeExternalSorter.java:169)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:220)
at org.apache.spark.sql.execution.UnsafeKVExternalSorter.<init>(UnsafeKVExternalSorter.java:126)
at org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.destructAndCreateExternalSorter(UnsafeFixedWidthAggregationMap.java:257)
at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.switchToSortBasedAggregation(TungstenAggregationIterator.scala:435)
```
Author: Andrew Or <andrew@databricks.com>
Closes#8827 from andrewor14/allocate-pointer-array.
Intersect and Except are both set operators and they use the all the columns to compare equality between rows. When pushing their Project parent down, the relations they based on would change, therefore not an equivalent transformation.
JIRA: https://issues.apache.org/jira/browse/SPARK-10539
I added some comments based on the fix of https://github.com/apache/spark/pull/8742.
Author: Yijie Shen <henry.yijieshen@gmail.com>
Author: Yin Huai <yhuai@databricks.com>
Closes#8823 from yhuai/fix_set_optimization.
Current implementation uses query with a LIMIT clause to find if table already exists. This syntax works only in some database systems. This patch changes the default query to the one that is likely to work on most databases, and adds a new method to the JdbcDialect abstract class to allow dialects to override the default query.
I looked at using the JDBC meta data calls, it turns out there is no common way to find the current schema, catalog..etc. There is a new method Connection.getSchema() , but that is available only starting jdk1.7 , and existing jdbc drivers may not have implemented it. Other option was to use jdbc escape syntax clause for LIMIT, not sure on how well this supported in all the databases also. After looking at all the jdbc metadata options my conclusion was most common way is to use the simple select query with 'where 1 =0' , and allow dialects to customize as needed
Author: sureshthalamati <suresh.thalamati@gmail.com>
Closes#8676 from sureshthalamati/table_exists_spark-9078.
Instead of relying on `DataFrames` to verify our answers, we can just use simple arrays. This significantly simplifies the test logic for `LocalNode`s and reduces a lot of code duplicated from `SparkPlanTest`.
This also fixes an additional issue [SPARK-10624](https://issues.apache.org/jira/browse/SPARK-10624) where the output of `TakeOrderedAndProjectNode` is not actually ordered.
Author: Andrew Or <andrew@databricks.com>
Closes#8764 from andrewor14/sql-local-tests-cleanup.
When speculative execution is enabled, consider a scenario where the authorized committer of a particular output partition fails during the OutputCommitter.commitTask() call. In this case, the OutputCommitCoordinator is supposed to release that committer's exclusive lock on committing once that task fails. However, due to a unit mismatch (we used task attempt number in one place and task attempt id in another) the lock will not be released, causing Spark to go into an infinite retry loop.
This bug was masked by the fact that the OutputCommitCoordinator does not have enough end-to-end tests (the current tests use many mocks). Other factors contributing to this bug are the fact that we have many similarly-named identifiers that have different semantics but the same data types (e.g. attemptNumber and taskAttemptId, with inconsistent variable naming which makes them difficult to distinguish).
This patch adds a regression test and fixes this bug by always using task attempt numbers throughout this code.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#8544 from JoshRosen/SPARK-10381.
*Note: this is for master branch only.* The fix for branch-1.5 is at #8721.
The query execution ID is currently passed from a thread to its children, which is not the intended behavior. This led to `IllegalArgumentException: spark.sql.execution.id is already set` when running queries in parallel, e.g.:
```
(1 to 100).par.foreach { _ =>
sc.parallelize(1 to 5).map { i => (i, i) }.toDF("a", "b").count()
}
```
The cause is `SparkContext`'s local properties are inherited by default. This patch adds a way to exclude keys we don't want to be inherited, and makes SQL go through that code path.
Author: Andrew Or <andrew@databricks.com>
Closes#8710 from andrewor14/concurrent-sql-executions.
JIRA: https://issues.apache.org/jira/browse/SPARK-10437
If an expression in `SortOrder` is a resolved one, such as `count(1)`, the corresponding rule in `Analyzer` to make it work in order by will not be applied.
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#8599 from viirya/orderby-agg.
This PR is in conflict with #8535 and #8573. Will update this one when they are merged.
Author: zsxwing <zsxwing@gmail.com>
Closes#8642 from zsxwing/expand-nest-join.
Adding STDDEV support for DataFrame using 1-pass online /parallel algorithm to compute variance. Please review the code change.
Author: JihongMa <linlin200605@gmail.com>
Author: Jihong MA <linlin200605@gmail.com>
Author: Jihong MA <jihongma@jihongs-mbp.usca.ibm.com>
Author: Jihong MA <jihongma@Jihongs-MacBook-Pro.local>
Closes#6297 from JihongMA/SPARK-SQL.
Fix a few Java API test style issues: unused generic types, exceptions, wrong assert argument order
Author: Sean Owen <sowen@cloudera.com>
Closes#8706 from srowen/SPARK-10547.
1. Hide `LocalNodeIterator` behind the `LocalNode#asIterator` method
2. Add tests for this
Author: Andrew Or <andrew@databricks.com>
Closes#8708 from andrewor14/local-hash-join-follow-up.
This PR is in conflict with #8535. I will update this one when #8535 gets merged.
Author: zsxwing <zsxwing@gmail.com>
Closes#8573 from zsxwing/more-local-operators.
Before this fix, `MyDenseVectorUDT.typeName` gives `mydensevecto`, which is not desirable.
Author: Cheng Lian <lian@databricks.com>
Closes#8640 from liancheng/spark-10472/udt-type-name.
This PR includes the following changes:
- Add SQLConf to LocalNode
- Add HashJoinNode
- Add ConvertToUnsafeNode and ConvertToSafeNode.scala to test unsafe hash join.
Author: zsxwing <zsxwing@gmail.com>
Closes#8535 from zsxwing/SPARK-9990.