https://issues.apache.org/jira/browse/SPARK-9830
This PR contains the following main changes.
* Removing `AggregateExpression1`.
* Removing `Aggregate` operator, which is used to evaluate `AggregateExpression1`.
* Removing planner rule used to plan `Aggregate`.
* Linking `MultipleDistinctRewriter` to analyzer.
* Renaming `AggregateExpression2` to `AggregateExpression` and `AggregateFunction2` to `AggregateFunction`.
* Updating places where we create aggregate expression. The way to create aggregate expressions is `AggregateExpression(aggregateFunction, mode, isDistinct)`.
* Changing `val`s in `DeclarativeAggregate`s that touch children of this function to `lazy val`s (when we create aggregate expression in DataFrame API, children of an aggregate function can be unresolved).
Author: Yin Huai <yhuai@databricks.com>
Closes#9556 from yhuai/removeAgg1.
A few changes:
1. Removed fold, since it can be confusing for distributed collections.
2. Created specific interfaces for each Dataset function (e.g. MapFunction, ReduceFunction, MapPartitionsFunction)
3. Added more documentation and test cases.
The other thing I'm considering doing is to have a "collector" interface for FlatMapFunction and MapPartitionsFunction, similar to MapReduce's map function.
Author: Reynold Xin <rxin@databricks.com>
Closes#9531 from rxin/SPARK-11564.
This PR adds support for multiple column in a single count distinct aggregate to the new aggregation path.
cc yhuai
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#9409 from hvanhovell/SPARK-11451.
This PR is a follow up for PR https://github.com/apache/spark/pull/9406. It adds more documentation to the rewriting rule, removes a redundant if expression in the non-distinct aggregation path and adds a multiple distinct test to the AggregationQuerySuite.
cc yhuai marmbrus
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#9541 from hvanhovell/SPARK-9241-followup.
The second PR for SPARK-9241, this adds support for multiple distinct columns to the new aggregation code path.
This PR solves the multiple DISTINCT column problem by rewriting these Aggregates into an Expand-Aggregate-Aggregate combination. See the [JIRA ticket](https://issues.apache.org/jira/browse/SPARK-9241) for some information on this. The advantages over the - competing - [first PR](https://github.com/apache/spark/pull/9280) are:
- This can use the faster TungstenAggregate code path.
- It is impossible to OOM due to an ```OpenHashSet``` allocating to much memory. However, this will multiply the number of input rows by the number of distinct clauses (plus one), and puts a lot more memory pressure on the aggregation code path itself.
The location of this Rule is a bit funny, and should probably change when the old aggregation path is changed.
cc yhuai - Could you also tell me where to add tests for this?
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#9406 from hvanhovell/SPARK-9241-rewriter.
This PR enables the Expand operator to process and produce Unsafe Rows.
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#9414 from hvanhovell/SPARK-11450.
JIRA: https://issues.apache.org/jira/browse/SPARK-9162
Currently ScalaUDF extends CodegenFallback and doesn't provide code generation implementation. This path implements code generation for ScalaUDF.
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#9270 from viirya/scalaudf-codegen.
A cleanup for https://github.com/apache/spark/pull/9085.
The `DecimalLit` is very similar to `FloatLit`, we can just keep one of them.
Also added low level unit test at `SqlParserSuite`
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9482 from cloud-fan/parser.
This PR adds the ability to do typed SQL aggregations. We will likely also want to provide an interface to allow users to do aggregations on objects, but this is deferred to another PR.
```scala
val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
ds.groupBy(_._1).agg(sum("_2").as[Int]).collect()
res0: Array(("a", 30), ("b", 3), ("c", 1))
```
Author: Michael Armbrust <michael@databricks.com>
Closes#9499 from marmbrus/dataset-agg.
Currently, if the Timestamp is before epoch (1970/01/01), the hours, minutes and seconds will be negative (also rounding up).
Author: Davies Liu <davies@databricks.com>
Closes#9502 from davies/neg_hour.
functions.scala was getting pretty long. I broke it into multiple files.
I also added explicit data types for some public vals, and renamed aggregate function pretty names to lower case, which is more consistent with rest of the functions.
Author: Reynold Xin <rxin@databricks.com>
Closes#9471 from rxin/SPARK-11505.
stddev is an alias for stddev_samp. variance should be consistent with stddev.
Also took the chance to remove internal Stddev and Variance, and only kept StddevSamp/StddevPop and VarianceSamp/VariancePop.
Author: Reynold Xin <rxin@databricks.com>
Closes#9449 from rxin/SPARK-11490.
Right now, SQL's mutable projection updates every value of the mutable project after it evaluates the corresponding expression. This makes the behavior of MutableProjection confusing and complicate the implementation of common aggregate functions like stddev because developers need to be aware that when evaluating {{i+1}}th expression of a mutable projection, {{i}}th slot of the mutable row has already been updated.
This PR make the MutableProjection atomic, by generating all the results of expressions first, then copy them into mutableRow.
Had run a mircro-benchmark, there is no notable performance difference between using class members and local variables.
cc yhuai
Author: Davies Liu <davies@databricks.com>
Closes#9422 from davies/atomic_mutable and squashes the following commits:
bbc1758 [Davies Liu] support wide table
8a0ae14 [Davies Liu] fix bug
bec07da [Davies Liu] refactor
2891628 [Davies Liu] make mutableProjection atomic
Hive GenericUDTF#initialize() defines field names in a returned schema though,
the current HiveGenericUDTF drops these names.
We might need to reflect these in a logical plan tree.
Author: navis.ryu <navis@apache.org>
Closes#8456 from navis/SPARK-9034.
1. Supporting expanding structs in Projections. i.e.
"SELECT s.*" where s is a struct type.
This is fixed by allowing the expand function to handle structs in addition to tables.
2. Supporting expanding * inside aggregate functions of structs.
"SELECT max(struct(col1, structCol.*))"
This requires recursively expanding the expressions. In this case, it it the aggregate
expression "max(...)" and we need to recursively expand its children inputs.
Author: Nong Li <nongli@gmail.com>
Closes#9343 from nongli/spark-11329.
From Reynold in the thread 'Exception when using some aggregate operators' (http://search-hadoop.com/m/q3RTt0xFr22nXB4/):
I don't think these are bugs. The SQL standard for average is "avg", not "mean". Similarly, a distinct count is supposed to be written as "count(distinct col)", not "countDistinct(col)".
We can, however, make "mean" an alias for "avg" to improve compatibility between DataFrame and SQL.
Author: tedyu <yuzhihong@gmail.com>
Closes#9332 from ted-yu/master.
JIRA: https://issues.apache.org/jira/browse/SPARK-9298
This patch adds pearson correlation aggregation function based on `AggregateExpression2`.
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#8587 from viirya/corr_aggregation.
DISTRIBUTE BY allows the user to hash partition the data by specified exprs. It also allows for
optioning sorting within each resulting partition. There is no required relationship between the
exprs for partitioning and sorting (i.e. one does not need to be a prefix of the other).
This patch adds to APIs to DataFrames which can be used together to provide this functionality:
1. distributeBy() which partitions the data frame into a specified number of partitions using the
partitioning exprs.
2. localSort() which sorts each partition using the provided sorting exprs.
To get the DISTRIBUTE BY functionality, the user simply does: df.distributeBy(...).localSort(...)
Author: Nong Li <nongli@gmail.com>
Closes#9364 from nongli/spark-11410.
Add a rule in optimizer to convert NULL [NOT] IN (expr1,...,expr2) to
Literal(null).
This is a follow up defect to SPARK-8654
cloud-fan Can you please take a look ?
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes#9348 from dilipbiswal/spark_11024.
Older version of Janino (>2.7) does not support Override, we should not use that in codegen.
Author: Davies Liu <davies@databricks.com>
Closes#9372 from davies/no_override.
This PR introduce a mechanism to call spill() on those SQL operators that support spilling (for example, BytesToBytesMap, UnsafeExternalSorter and ShuffleExternalSorter) if there is not enough memory for execution. The preserved first page is needed anymore, so removed.
Other Spillable objects in Spark core (ExternalSorter and AppendOnlyMap) are not included in this PR, but those could benefit from this (trigger others' spilling).
The PrepareRDD may be not needed anymore, could be removed in follow up PR.
The following script will fail with OOM before this PR, finished in 150 seconds with 2G heap (also works in 1.5 branch, with similar duration).
```python
sqlContext.setConf("spark.sql.shuffle.partitions", "1")
df = sqlContext.range(1<<25).selectExpr("id", "repeat(id, 2) as s")
df2 = df.select(df.id.alias('id2'), df.s.alias('s2'))
j = df.join(df2, df.id==df2.id2).groupBy(df.id).max("id", "id2")
j.explain()
print j.count()
```
For thread-safety, here what I'm got:
1) Without calling spill(), the operators should only be used by single thread, no safety problems.
2) spill() could be triggered in two cases, triggered by itself, or by other operators. we can check trigger == this in spill(), so it's still in the same thread, so safety problems.
3) if it's triggered by other operators (right now cache will not trigger spill()), we only spill the data into disk when it's in scanning stage (building is finished), so the in-memory sorter or memory pages are read-only, we only need to synchronize the iterator and change it.
4) During scanning, the iterator will only use one record in one page, we can't free this page, because the downstream is currently using it (used by UnsafeRow or other objects). In BytesToBytesMap, we just skip the current page, and dump all others into disk. In UnsafeExternalSorter, we keep the page that is used by current record (having the same baseObject), free it when loading the next record. In ShuffleExternalSorter, the spill() will not trigger during scanning.
5) In order to avoid deadlock, we didn't call acquireMemory during spill (so we reused the pointer array in InMemorySorter).
Author: Davies Liu <davies@databricks.com>
Closes#9241 from davies/force_spill.
This is minor, but I ran into while writing Datasets and while it wasn't needed for the final solution, it was super confusing so we should fix it.
Basically we recurse into `Seq` to see if they have children. This breaks because we don't preserve the original subclass of `Seq` (and `StructType <:< Seq[StructField]`). Since a struct can never contain children, lets just not recurse into it.
Author: Michael Armbrust <michael@databricks.com>
Closes#9334 from marmbrus/structMakeCopy.
This PR adds a new operation `joinWith` to a `Dataset`, which returns a `Tuple` for each pair where a given `condition` evaluates to true.
```scala
case class ClassData(a: String, b: Int)
val ds1 = Seq(ClassData("a", 1), ClassData("b", 2)).toDS()
val ds2 = Seq(("a", 1), ("b", 2)).toDS()
> ds1.joinWith(ds2, $"_1" === $"a").collect()
res0: Array((ClassData("a", 1), ("a", 1)), (ClassData("b", 2), ("b", 2)))
```
This operation is similar to the relation `join` function with one important difference in the result schema. Since `joinWith` preserves objects present on either side of the join, the result schema is similarly nested into a tuple under the column names `_1` and `_2`.
This type of join can be useful both for preserving type-safety with the original object types as well as working with relational data where either side of the join has column names in common.
## Required Changes to Encoders
In the process of working on this patch, several deficiencies to the way that we were handling encoders were discovered. Specifically, it turned out to be very difficult to `rebind` the non-expression based encoders to extract the nested objects from the results of joins (and also typed selects that return tuples).
As a result the following changes were made.
- `ClassEncoder` has been renamed to `ExpressionEncoder` and has been improved to also handle primitive types. Additionally, it is now possible to take arbitrary expression encoders and rewrite them into a single encoder that returns a tuple.
- All internal operations on `Dataset`s now require an `ExpressionEncoder`. If the users tries to pass a non-`ExpressionEncoder` in, an error will be thrown. We can relax this requirement in the future by constructing a wrapper class that uses expressions to project the row to the expected schema, shielding the users code from the required remapping. This will give us a nice balance where we don't force user encoders to understand attribute references and binding, but still allow our native encoder to leverage runtime code generation to construct specific encoders for a given schema that avoid an extra remapping step.
- Additionally, the semantics for different types of objects are now better defined. As stated in the `ExpressionEncoder` scaladoc:
- Classes will have their sub fields extracted by name using `UnresolvedAttribute` expressions
and `UnresolvedExtractValue` expressions.
- Tuples will have their subfields extracted by position using `BoundReference` expressions.
- Primitives will have their values extracted from the first ordinal with a schema that defaults
to the name `value`.
- Finally, the binding lifecycle for `Encoders` has now been unified across the codebase. Encoders are now `resolved` to the appropriate schema in the constructor of `Dataset`. This process replaces an unresolved expressions with concrete `AttributeReference` expressions. Binding then happens on demand, when an encoder is going to be used to construct an object. This closely mirrors the lifecycle for standard expressions when executing normal SQL or `DataFrame` queries.
Author: Michael Armbrust <michael@databricks.com>
Closes#9300 from marmbrus/datasets-tuples.
When sampling and then filtering DataFrame, the SQL Optimizer will push down filter into sample and produce wrong result. This is due to the sampler is calculated based on the original scope rather than the scope after filtering.
Author: Yanbo Liang <ybliang8@gmail.com>
Closes#9294 from yanboliang/spark-11303.
I'm new to spark. I was trying out the sort_array function then hit this exception. I looked into the spark source code. I found the root cause is that sort_array does not check for an array of NULLs. It's not meaningful to sort an array of entirely NULLs anyway.
I'm adding a check on the input array type to SortArray. If the array consists of NULLs entirely, there is no need to sort such array. I have also added a test case for this.
Please help to review my fix. Thanks!
Author: Jia Li <jiali@us.ibm.com>
Closes#9247 from jliwork/SPARK-11277.
This patch refactors the MemoryManager class structure. After #9000, Spark had the following classes:
- MemoryManager
- StaticMemoryManager
- ExecutorMemoryManager
- TaskMemoryManager
- ShuffleMemoryManager
This is fairly confusing. To simplify things, this patch consolidates several of these classes:
- ShuffleMemoryManager and ExecutorMemoryManager were merged into MemoryManager.
- TaskMemoryManager is moved into Spark Core.
**Key changes and tasks**:
- [x] Merge ExecutorMemoryManager into MemoryManager.
- [x] Move pooling logic into Allocator.
- [x] Move TaskMemoryManager from `spark-unsafe` to `spark-core`.
- [x] Refactor the existing Tungsten TaskMemoryManager interactions so Tungsten code use only this and not both this and ShuffleMemoryManager.
- [x] Refactor non-Tungsten code to use the TaskMemoryManager instead of ShuffleMemoryManager.
- [x] Merge ShuffleMemoryManager into MemoryManager.
- [x] Move code
- [x] ~~Simplify 1/n calculation.~~ **Will defer to followup, since this needs more work.**
- [x] Port ShuffleMemoryManagerSuite tests.
- [x] Move classes from `unsafe` package to `memory` package.
- [ ] Figure out how to handle the hacky use of the memory managers in HashedRelation's broadcast variable construction.
- [x] Test porting and cleanup: several tests relied on mock functionality (such as `TestShuffleMemoryManager.markAsOutOfMemory`) which has been changed or broken during the memory manager consolidation
- [x] AbstractBytesToBytesMapSuite
- [x] UnsafeExternalSorterSuite
- [x] UnsafeFixedWidthAggregationMapSuite
- [x] UnsafeKVExternalSorterSuite
**Compatiblity notes**:
- This patch introduces breaking changes in `ExternalAppendOnlyMap`, which is marked as `DevloperAPI` (likely for legacy reasons): this class now cannot be used outside of a task.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#9127 from JoshRosen/SPARK-10984.
marmbrus rxin I believe these typecasts are not required in the presence of explicit return types.
Author: Alexander Slesarenko <avslesarenko@gmail.com>
Closes#9262 from aslesarenko/remove-typecasts.
For nested StructType, the underline buffer could be used for others before, we should zero out the padding bytes for those primitive types that have less than 8 bytes.
cc cloud-fan
Author: Davies Liu <davies@databricks.com>
Closes#9217 from davies/zero_out.
*This PR adds a new experimental API to Spark, tentitively named Datasets.*
A `Dataset` is a strongly-typed collection of objects that can be transformed in parallel using functional or relational operations. Example usage is as follows:
### Functional
```scala
> val ds: Dataset[Int] = Seq(1, 2, 3).toDS()
> ds.filter(_ % 1 == 0).collect()
res1: Array[Int] = Array(1, 2, 3)
```
### Relational
```scala
scala> ds.toDF().show()
+-----+
|value|
+-----+
| 1|
| 2|
| 3|
+-----+
> ds.select(expr("value + 1").as[Int]).collect()
res11: Array[Int] = Array(2, 3, 4)
```
## Comparison to RDDs
A `Dataset` differs from an `RDD` in the following ways:
- The creation of a `Dataset` requires the presence of an explicit `Encoder` that can be
used to serialize the object into a binary format. Encoders are also capable of mapping the
schema of a given object to the Spark SQL type system. In contrast, RDDs rely on runtime
reflection based serialization.
- Internally, a `Dataset` is represented by a Catalyst logical plan and the data is stored
in the encoded form. This representation allows for additional logical operations and
enables many operations (sorting, shuffling, etc.) to be performed without deserializing to
an object.
A `Dataset` can be converted to an `RDD` by calling the `.rdd` method.
## Comparison to DataFrames
A `Dataset` can be thought of as a specialized DataFrame, where the elements map to a specific
JVM object type, instead of to a generic `Row` container. A DataFrame can be transformed into
specific Dataset by calling `df.as[ElementType]`. Similarly you can transform a strongly-typed
`Dataset` to a generic DataFrame by calling `ds.toDF()`.
## Implementation Status and TODOs
This is a rough cut at the least controversial parts of the API. The primary purpose here is to get something committed so that we can better parallelize further work and get early feedback on the API. The following is being deferred to future PRs:
- Joins and Aggregations (prototype here f11f91e6f0)
- Support for Java
Additionally, the responsibility for binding an encoder to a given schema is currently done in a fairly ad-hoc fashion. This is an internal detail, and what we are doing today works for the cases we care about. However, as we add more APIs we'll probably need to do this in a more principled way (i.e. separate resolution from binding as we do in DataFrames).
## COMPATIBILITY NOTE
Long term we plan to make `DataFrame` extend `Dataset[Row]`. However,
making this change to che class hierarchy would break the function signatures for the existing
function operations (map, flatMap, etc). As such, this class should be considered a preview
of the final API. Changes will be made to the interface after Spark 1.6.
Author: Michael Armbrust <michael@databricks.com>
Closes#9190 from marmbrus/dataset-infra.
This PR change InMemoryTableScan to output UnsafeRow, and optimize the unrolling and scanning by coping the bytes for var-length types between UnsafeRow and ByteBuffer directly without creating the wrapper objects. When scanning the decimals in TPC-DS store_sales table, it's 80% faster (copy it as long without create Decimal objects).
Author: Davies Liu <davies@databricks.com>
Closes#9203 from davies/unsafe_cache.
In the analysis phase , while processing the rules for IN predicate, we
compare the in-list types to the lhs expression type and generate
cast operation if necessary. In the case of NULL [NOT] IN expr1 , we end up
generating cast between in list types to NULL like cast (1 as NULL) which
is not a valid cast.
The fix is to find a common type between LHS and RHS expressions and cast
all the expression to the common type.
Author: Dilip Biswal <dbiswal@us.ibm.com>
This patch had conflicts when merged, resolved by
Committer: Michael Armbrust <michael@databricks.com>
Closes#9036 from dilipbiswal/spark_8654_new.
I am changing the default behavior of `First`/`Last` to respect null values (the SQL standard default behavior).
https://issues.apache.org/jira/browse/SPARK-9740
Author: Yin Huai <yhuai@databricks.com>
Closes#8113 from yhuai/firstLast.
This PR introduce a new feature to run SQL directly on files without create a table, for example:
```
select id from json.`path/to/json/files` as j
```
Author: Davies Liu <davies@databricks.com>
Closes#9173 from davies/source.