JIRA: https://issues.apache.org/jira/browse/SPARK-9298
This patch adds pearson correlation aggregation function based on `AggregateExpression2`.
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#8587 from viirya/corr_aggregation.
DISTRIBUTE BY allows the user to hash partition the data by specified exprs. It also allows for
optioning sorting within each resulting partition. There is no required relationship between the
exprs for partitioning and sorting (i.e. one does not need to be a prefix of the other).
This patch adds to APIs to DataFrames which can be used together to provide this functionality:
1. distributeBy() which partitions the data frame into a specified number of partitions using the
partitioning exprs.
2. localSort() which sorts each partition using the provided sorting exprs.
To get the DISTRIBUTE BY functionality, the user simply does: df.distributeBy(...).localSort(...)
Author: Nong Li <nongli@gmail.com>
Closes#9364 from nongli/spark-11410.
Add a rule in optimizer to convert NULL [NOT] IN (expr1,...,expr2) to
Literal(null).
This is a follow up defect to SPARK-8654
cloud-fan Can you please take a look ?
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes#9348 from dilipbiswal/spark_11024.
Older version of Janino (>2.7) does not support Override, we should not use that in codegen.
Author: Davies Liu <davies@databricks.com>
Closes#9372 from davies/no_override.
This PR introduce a mechanism to call spill() on those SQL operators that support spilling (for example, BytesToBytesMap, UnsafeExternalSorter and ShuffleExternalSorter) if there is not enough memory for execution. The preserved first page is needed anymore, so removed.
Other Spillable objects in Spark core (ExternalSorter and AppendOnlyMap) are not included in this PR, but those could benefit from this (trigger others' spilling).
The PrepareRDD may be not needed anymore, could be removed in follow up PR.
The following script will fail with OOM before this PR, finished in 150 seconds with 2G heap (also works in 1.5 branch, with similar duration).
```python
sqlContext.setConf("spark.sql.shuffle.partitions", "1")
df = sqlContext.range(1<<25).selectExpr("id", "repeat(id, 2) as s")
df2 = df.select(df.id.alias('id2'), df.s.alias('s2'))
j = df.join(df2, df.id==df2.id2).groupBy(df.id).max("id", "id2")
j.explain()
print j.count()
```
For thread-safety, here what I'm got:
1) Without calling spill(), the operators should only be used by single thread, no safety problems.
2) spill() could be triggered in two cases, triggered by itself, or by other operators. we can check trigger == this in spill(), so it's still in the same thread, so safety problems.
3) if it's triggered by other operators (right now cache will not trigger spill()), we only spill the data into disk when it's in scanning stage (building is finished), so the in-memory sorter or memory pages are read-only, we only need to synchronize the iterator and change it.
4) During scanning, the iterator will only use one record in one page, we can't free this page, because the downstream is currently using it (used by UnsafeRow or other objects). In BytesToBytesMap, we just skip the current page, and dump all others into disk. In UnsafeExternalSorter, we keep the page that is used by current record (having the same baseObject), free it when loading the next record. In ShuffleExternalSorter, the spill() will not trigger during scanning.
5) In order to avoid deadlock, we didn't call acquireMemory during spill (so we reused the pointer array in InMemorySorter).
Author: Davies Liu <davies@databricks.com>
Closes#9241 from davies/force_spill.
This is minor, but I ran into while writing Datasets and while it wasn't needed for the final solution, it was super confusing so we should fix it.
Basically we recurse into `Seq` to see if they have children. This breaks because we don't preserve the original subclass of `Seq` (and `StructType <:< Seq[StructField]`). Since a struct can never contain children, lets just not recurse into it.
Author: Michael Armbrust <michael@databricks.com>
Closes#9334 from marmbrus/structMakeCopy.
This PR adds a new operation `joinWith` to a `Dataset`, which returns a `Tuple` for each pair where a given `condition` evaluates to true.
```scala
case class ClassData(a: String, b: Int)
val ds1 = Seq(ClassData("a", 1), ClassData("b", 2)).toDS()
val ds2 = Seq(("a", 1), ("b", 2)).toDS()
> ds1.joinWith(ds2, $"_1" === $"a").collect()
res0: Array((ClassData("a", 1), ("a", 1)), (ClassData("b", 2), ("b", 2)))
```
This operation is similar to the relation `join` function with one important difference in the result schema. Since `joinWith` preserves objects present on either side of the join, the result schema is similarly nested into a tuple under the column names `_1` and `_2`.
This type of join can be useful both for preserving type-safety with the original object types as well as working with relational data where either side of the join has column names in common.
## Required Changes to Encoders
In the process of working on this patch, several deficiencies to the way that we were handling encoders were discovered. Specifically, it turned out to be very difficult to `rebind` the non-expression based encoders to extract the nested objects from the results of joins (and also typed selects that return tuples).
As a result the following changes were made.
- `ClassEncoder` has been renamed to `ExpressionEncoder` and has been improved to also handle primitive types. Additionally, it is now possible to take arbitrary expression encoders and rewrite them into a single encoder that returns a tuple.
- All internal operations on `Dataset`s now require an `ExpressionEncoder`. If the users tries to pass a non-`ExpressionEncoder` in, an error will be thrown. We can relax this requirement in the future by constructing a wrapper class that uses expressions to project the row to the expected schema, shielding the users code from the required remapping. This will give us a nice balance where we don't force user encoders to understand attribute references and binding, but still allow our native encoder to leverage runtime code generation to construct specific encoders for a given schema that avoid an extra remapping step.
- Additionally, the semantics for different types of objects are now better defined. As stated in the `ExpressionEncoder` scaladoc:
- Classes will have their sub fields extracted by name using `UnresolvedAttribute` expressions
and `UnresolvedExtractValue` expressions.
- Tuples will have their subfields extracted by position using `BoundReference` expressions.
- Primitives will have their values extracted from the first ordinal with a schema that defaults
to the name `value`.
- Finally, the binding lifecycle for `Encoders` has now been unified across the codebase. Encoders are now `resolved` to the appropriate schema in the constructor of `Dataset`. This process replaces an unresolved expressions with concrete `AttributeReference` expressions. Binding then happens on demand, when an encoder is going to be used to construct an object. This closely mirrors the lifecycle for standard expressions when executing normal SQL or `DataFrame` queries.
Author: Michael Armbrust <michael@databricks.com>
Closes#9300 from marmbrus/datasets-tuples.
When sampling and then filtering DataFrame, the SQL Optimizer will push down filter into sample and produce wrong result. This is due to the sampler is calculated based on the original scope rather than the scope after filtering.
Author: Yanbo Liang <ybliang8@gmail.com>
Closes#9294 from yanboliang/spark-11303.
I'm new to spark. I was trying out the sort_array function then hit this exception. I looked into the spark source code. I found the root cause is that sort_array does not check for an array of NULLs. It's not meaningful to sort an array of entirely NULLs anyway.
I'm adding a check on the input array type to SortArray. If the array consists of NULLs entirely, there is no need to sort such array. I have also added a test case for this.
Please help to review my fix. Thanks!
Author: Jia Li <jiali@us.ibm.com>
Closes#9247 from jliwork/SPARK-11277.
This patch refactors the MemoryManager class structure. After #9000, Spark had the following classes:
- MemoryManager
- StaticMemoryManager
- ExecutorMemoryManager
- TaskMemoryManager
- ShuffleMemoryManager
This is fairly confusing. To simplify things, this patch consolidates several of these classes:
- ShuffleMemoryManager and ExecutorMemoryManager were merged into MemoryManager.
- TaskMemoryManager is moved into Spark Core.
**Key changes and tasks**:
- [x] Merge ExecutorMemoryManager into MemoryManager.
- [x] Move pooling logic into Allocator.
- [x] Move TaskMemoryManager from `spark-unsafe` to `spark-core`.
- [x] Refactor the existing Tungsten TaskMemoryManager interactions so Tungsten code use only this and not both this and ShuffleMemoryManager.
- [x] Refactor non-Tungsten code to use the TaskMemoryManager instead of ShuffleMemoryManager.
- [x] Merge ShuffleMemoryManager into MemoryManager.
- [x] Move code
- [x] ~~Simplify 1/n calculation.~~ **Will defer to followup, since this needs more work.**
- [x] Port ShuffleMemoryManagerSuite tests.
- [x] Move classes from `unsafe` package to `memory` package.
- [ ] Figure out how to handle the hacky use of the memory managers in HashedRelation's broadcast variable construction.
- [x] Test porting and cleanup: several tests relied on mock functionality (such as `TestShuffleMemoryManager.markAsOutOfMemory`) which has been changed or broken during the memory manager consolidation
- [x] AbstractBytesToBytesMapSuite
- [x] UnsafeExternalSorterSuite
- [x] UnsafeFixedWidthAggregationMapSuite
- [x] UnsafeKVExternalSorterSuite
**Compatiblity notes**:
- This patch introduces breaking changes in `ExternalAppendOnlyMap`, which is marked as `DevloperAPI` (likely for legacy reasons): this class now cannot be used outside of a task.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#9127 from JoshRosen/SPARK-10984.
marmbrus rxin I believe these typecasts are not required in the presence of explicit return types.
Author: Alexander Slesarenko <avslesarenko@gmail.com>
Closes#9262 from aslesarenko/remove-typecasts.
For nested StructType, the underline buffer could be used for others before, we should zero out the padding bytes for those primitive types that have less than 8 bytes.
cc cloud-fan
Author: Davies Liu <davies@databricks.com>
Closes#9217 from davies/zero_out.
*This PR adds a new experimental API to Spark, tentitively named Datasets.*
A `Dataset` is a strongly-typed collection of objects that can be transformed in parallel using functional or relational operations. Example usage is as follows:
### Functional
```scala
> val ds: Dataset[Int] = Seq(1, 2, 3).toDS()
> ds.filter(_ % 1 == 0).collect()
res1: Array[Int] = Array(1, 2, 3)
```
### Relational
```scala
scala> ds.toDF().show()
+-----+
|value|
+-----+
| 1|
| 2|
| 3|
+-----+
> ds.select(expr("value + 1").as[Int]).collect()
res11: Array[Int] = Array(2, 3, 4)
```
## Comparison to RDDs
A `Dataset` differs from an `RDD` in the following ways:
- The creation of a `Dataset` requires the presence of an explicit `Encoder` that can be
used to serialize the object into a binary format. Encoders are also capable of mapping the
schema of a given object to the Spark SQL type system. In contrast, RDDs rely on runtime
reflection based serialization.
- Internally, a `Dataset` is represented by a Catalyst logical plan and the data is stored
in the encoded form. This representation allows for additional logical operations and
enables many operations (sorting, shuffling, etc.) to be performed without deserializing to
an object.
A `Dataset` can be converted to an `RDD` by calling the `.rdd` method.
## Comparison to DataFrames
A `Dataset` can be thought of as a specialized DataFrame, where the elements map to a specific
JVM object type, instead of to a generic `Row` container. A DataFrame can be transformed into
specific Dataset by calling `df.as[ElementType]`. Similarly you can transform a strongly-typed
`Dataset` to a generic DataFrame by calling `ds.toDF()`.
## Implementation Status and TODOs
This is a rough cut at the least controversial parts of the API. The primary purpose here is to get something committed so that we can better parallelize further work and get early feedback on the API. The following is being deferred to future PRs:
- Joins and Aggregations (prototype here f11f91e6f0)
- Support for Java
Additionally, the responsibility for binding an encoder to a given schema is currently done in a fairly ad-hoc fashion. This is an internal detail, and what we are doing today works for the cases we care about. However, as we add more APIs we'll probably need to do this in a more principled way (i.e. separate resolution from binding as we do in DataFrames).
## COMPATIBILITY NOTE
Long term we plan to make `DataFrame` extend `Dataset[Row]`. However,
making this change to che class hierarchy would break the function signatures for the existing
function operations (map, flatMap, etc). As such, this class should be considered a preview
of the final API. Changes will be made to the interface after Spark 1.6.
Author: Michael Armbrust <michael@databricks.com>
Closes#9190 from marmbrus/dataset-infra.
This PR change InMemoryTableScan to output UnsafeRow, and optimize the unrolling and scanning by coping the bytes for var-length types between UnsafeRow and ByteBuffer directly without creating the wrapper objects. When scanning the decimals in TPC-DS store_sales table, it's 80% faster (copy it as long without create Decimal objects).
Author: Davies Liu <davies@databricks.com>
Closes#9203 from davies/unsafe_cache.
In the analysis phase , while processing the rules for IN predicate, we
compare the in-list types to the lhs expression type and generate
cast operation if necessary. In the case of NULL [NOT] IN expr1 , we end up
generating cast between in list types to NULL like cast (1 as NULL) which
is not a valid cast.
The fix is to find a common type between LHS and RHS expressions and cast
all the expression to the common type.
Author: Dilip Biswal <dbiswal@us.ibm.com>
This patch had conflicts when merged, resolved by
Committer: Michael Armbrust <michael@databricks.com>
Closes#9036 from dilipbiswal/spark_8654_new.
I am changing the default behavior of `First`/`Last` to respect null values (the SQL standard default behavior).
https://issues.apache.org/jira/browse/SPARK-9740
Author: Yin Huai <yhuai@databricks.com>
Closes#8113 from yhuai/firstLast.
This PR introduce a new feature to run SQL directly on files without create a table, for example:
```
select id from json.`path/to/json/files` as j
```
Author: Davies Liu <davies@databricks.com>
Closes#9173 from davies/source.
Find out the missing attributes by recursively looking
at the sort order expression and rest of the code
takes care of projecting them out.
Added description from cloud-fan
I wanna explain a bit more about this bug.
When we resolve sort ordering, we will use a special method, which only resolves UnresolvedAttributes and UnresolvedExtractValue. However, for something like Floor('a), even the 'a is resolved, the floor expression may still being unresolved as data type mismatch(for example, 'a is string type and Floor need double type), thus can't pass this filter, and we can't push down this missing attribute 'a
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes#9123 from dilipbiswal/SPARK-10534.
Implement encode/decode for external row based on `ClassEncoder`.
TODO:
* code cleanup
* ~~fix corner cases~~
* refactor the encoder interface
* improve test for product codegen, to cover more corner cases.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9184 from cloud-fan/encoder.
Push conjunctive predicates though Aggregate operators when their references are a subset of the groupingExpressions.
Query plan before optimisation :-
Filter ((c#138L = 2) && (a#0 = 3))
Aggregate [a#0], [a#0,count(b#1) AS c#138L]
Project [a#0,b#1]
LocalRelation [a#0,b#1,c#2]
Query plan after optimisation :-
Filter (c#138L = 2)
Aggregate [a#0], [a#0,count(b#1) AS c#138L]
Filter (a#0 = 3)
Project [a#0,b#1]
LocalRelation [a#0,b#1,c#2]
Author: nitin goyal <nitin.goyal@guavus.com>
Author: nitin.goyal <nitin.goyal@guavus.com>
Closes#9167 from nitin2goyal/master.
This PR improve the performance by:
1) Generate an Iterator that take Iterator[CachedBatch] as input, and call accessors (unroll the loop for columns), avoid the expensive Iterator.flatMap.
2) Use Unsafe.getInt/getLong/getFloat/getDouble instead of ByteBuffer.getInt/getLong/getFloat/getDouble, the later one actually read byte by byte.
3) Remove the unnecessary copy() in Coalesce(), which is not related to memory cache, found during benchmark.
The following benchmark showed that we can speedup the columnar cache of int by 2x.
```
path = '/opt/tpcds/store_sales/'
int_cols = ['ss_sold_date_sk', 'ss_sold_time_sk', 'ss_item_sk','ss_customer_sk']
df = sqlContext.read.parquet(path).select(int_cols).cache()
df.count()
t = time.time()
print df.select("*")._jdf.queryExecution().toRdd().count()
print time.time() - t
```
Author: Davies Liu <davies@databricks.com>
Closes#9145 from davies/byte_buffer.
Currently, we use CartesianProduct for join with null-safe-equal condition.
```
scala> sqlContext.sql("select * from t a join t b on (a.i <=> b.i)").explain
== Physical Plan ==
TungstenProject [i#2,j#3,i#7,j#8]
Filter (i#2 <=> i#7)
CartesianProduct
LocalTableScan [i#2,j#3], [[1,1]]
LocalTableScan [i#7,j#8], [[1,1]]
```
Actually, we can have an equal-join condition as `coalesce(i, default) = coalesce(b.i, default)`, then an partitioned join algorithm could be used.
After this PR, the plan will become:
```
>>> sqlContext.sql("select * from a join b ON a.id <=> b.id").explain()
TungstenProject [id#0L,id#1L]
Filter (id#0L <=> id#1L)
SortMergeJoin [coalesce(id#0L,0)], [coalesce(id#1L,0)]
TungstenSort [coalesce(id#0L,0) ASC], false, 0
TungstenExchange hashpartitioning(coalesce(id#0L,0),200)
ConvertToUnsafe
Scan PhysicalRDD[id#0L]
TungstenSort [coalesce(id#1L,0) ASC], false, 0
TungstenExchange hashpartitioning(coalesce(id#1L,0),200)
ConvertToUnsafe
Scan PhysicalRDD[id#1L]
```
Author: Davies Liu <davies@databricks.com>
Closes#9120 from davies/null_safe.
We can't parse `NOT` operator with comparison operations like `SELECT NOT TRUE > TRUE`, this PR fixed it.
Takes over https://github.com/apache/spark/pull/6326.
Author: Wenchen Fan <cloud0fan@outlook.com>
Closes#8617 from cloud-fan/not.
The purpose of this PR is to keep the unsafe format detail only inside the unsafe class itself, so when we use them(like use unsafe array in unsafe map, use unsafe array and map in columnar cache), we don't need to understand the format before use them.
change list:
* unsafe array's 4-bytes numElements header is now required(was optional), and become a part of unsafe array format.
* w.r.t the previous changing, the `sizeInBytes` of unsafe array now counts the 4-bytes header.
* unsafe map's format was `[numElements] [key array numBytes] [key array content(without numElements header)] [value array content(without numElements header)]` before, which is a little hacky as it makes unsafe array's header optional. I think saving 4 bytes is not a big deal, so the format is now: `[key array numBytes] [unsafe key array] [unsafe value array]`.
* w.r.t the previous changing, the `sizeInBytes` of unsafe map now counts both map's header and array's header.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9131 from cloud-fan/unsafe.
Some json parsers are not closed. parser in JacksonParser#parseJson, for example.
Author: navis.ryu <navis@apache.org>
Closes#9130 from navis/SPARK-11124.
Actually all of the `UnaryMathExpression` doens't support the Decimal, will create follow ups for supporing it. This is the first PR which will be good to review the approach I am taking.
Author: Cheng Hao <hao.cheng@intel.com>
Closes#9086 from chenghao-intel/ceiling.
This patch extends TungstenAggregate to support ImperativeAggregate functions. The existing TungstenAggregate operator only supported DeclarativeAggregate functions, which are defined in terms of Catalyst expressions and can be evaluated via generated projections. ImperativeAggregate functions, on the other hand, are evaluated by calling their `initialize`, `update`, `merge`, and `eval` methods.
The basic strategy here is similar to how SortBasedAggregate evaluates both types of aggregate functions: use a generated projection to evaluate the expression-based declarative aggregates with dummy placeholder expressions inserted in place of the imperative aggregate function output, then invoke the imperative aggregate functions and target them against the aggregation buffer. The bulk of the diff here consists of code that was copied and adapted from SortBasedAggregate, with some key changes to handle TungstenAggregate's sort fallback path.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#9038 from JoshRosen/support-interpreted-in-tungsten-agg-final.
Right now, we have QualifiedTableName, TableIdentifier, and Seq[String] to represent table identifiers. We should only have one form and TableIdentifier is the best one because it provides methods to get table name, database name, return unquoted string, and return quoted string.
Author: Wenchen Fan <wenchen@databricks.com>
Author: Wenchen Fan <cloud0fan@163.com>
Closes#8453 from cloud-fan/table-name.
We should not stop resolving having when the having condtion is resolved, or something like `count(1)` will crash.
Author: Wenchen Fan <cloud0fan@163.com>
Closes#9105 from cloud-fan/having.
This is a first draft of the ability to construct expressions that will take a catalyst internal row and construct a Product (case class or tuple) that has fields with the correct names. Support include:
- Nested classes
- Maps
- Efficiently handling of arrays of primitive types
Not yet supported:
- Case classes that require custom collection types (i.e. List instead of Seq).
Author: Michael Armbrust <michael@databricks.com>
Closes#9100 from marmbrus/productContructor.
In the current implementation of named expressions' `ExprIds`, we rely on a per-JVM AtomicLong to ensure that expression ids are unique within a JVM. However, these expression ids will not be _globally_ unique. This opens the potential for id collisions if new expression ids happen to be created inside of tasks rather than on the driver.
There are currently a few cases where tasks allocate expression ids, which happen to be safe because those expressions are never compared to expressions created on the driver. In order to guard against the introduction of invalid comparisons between driver-created and executor-created expression ids, this patch extends `ExprId` to incorporate a UUID to identify the JVM that created the id, which prevents collisions.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#9093 from JoshRosen/SPARK-11080.
This PR improve the unrolling and read of complex types in columnar cache:
1) Using UnsafeProjection to do serialization of complex types, so they will not be serialized three times (two for actualSize)
2) Copy the bytes from UnsafeRow/UnsafeArrayData to ByteBuffer directly, avoiding the immediate byte[]
3) Using the underlying array in ByteBuffer to create UTF8String/UnsafeRow/UnsafeArrayData without copy.
Combine these optimizations, we can reduce the unrolling time from 25s to 21s (20% less), reduce the scanning time from 3.5s to 2.5s (28% less).
```
df = sqlContext.read.parquet(path)
t = time.time()
df.cache()
df.count()
print 'unrolling', time.time() - t
for i in range(10):
t = time.time()
print df.select("*")._jdf.queryExecution().toRdd().count()
print time.time() - t
```
The schema is
```
root
|-- a: struct (nullable = true)
| |-- b: long (nullable = true)
| |-- c: string (nullable = true)
|-- d: array (nullable = true)
| |-- element: long (containsNull = true)
|-- e: map (nullable = true)
| |-- key: long
| |-- value: string (valueContainsNull = true)
```
Now the columnar cache depends on that UnsafeProjection support all the data types (including UDT), this PR also fix that.
Author: Davies Liu <davies@databricks.com>
Closes#9016 from davies/complex2.
JIRA: https://issues.apache.org/jira/browse/SPARK-10960
When accessing a column in inner select from a select with window function, `AnalysisException` will be thrown. For example, an query like this:
select area, rank() over (partition by area order by tmp.month) + tmp.tmp1 as c1 from (select month, area, product, 1 as tmp1 from windowData) tmp
Currently, the rule `ExtractWindowExpressions` in `Analyzer` only extracts regular expressions from `WindowFunction`, `WindowSpecDefinition` and `AggregateExpression`. We need to also extract other attributes as the one in `Alias` as shown in the above query.
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#9011 from viirya/fix-window-inner-column.
This PR improve the sessions management by replacing the thread-local based to one SQLContext per session approach, introduce separated temporary tables and UDFs/UDAFs for each session.
A new session of SQLContext could be created by:
1) create an new SQLContext
2) call newSession() on existing SQLContext
For HiveContext, in order to reduce the cost for each session, the classloader and Hive client are shared across multiple sessions (created by newSession).
CacheManager is also shared by multiple sessions, so cache a table multiple times in different sessions will not cause multiple copies of in-memory cache.
Added jars are still shared by all the sessions, because SparkContext does not support sessions.
cc marmbrus yhuai rxin
Author: Davies Liu <davies@databricks.com>
Closes#8909 from davies/sessions.
UnsafeRow contains 3 pieces of information when pointing to some data in memory (an object, a base offset, and length). When the row is serialized with Java/Kryo serialization, the object layout in memory can change if two machines have different pointer width (Oops in JVM).
To reproduce, launch Spark using
MASTER=local-cluster[2,1,1024] bin/spark-shell --conf "spark.executor.extraJavaOptions=-XX:-UseCompressedOops"
And then run the following
scala> sql("select 1 xx").collect()
Author: Reynold Xin <rxin@databricks.com>
Closes#9030 from rxin/SPARK-10914.
This PR refactors Parquet write path to follow parquet-format spec. It's a successor of PR #7679, but with less non-essential changes.
Major changes include:
1. Replaces `RowWriteSupport` and `MutableRowWriteSupport` with `CatalystWriteSupport`
- Writes Parquet data using standard layout defined in parquet-format
Specifically, we are now writing ...
- ... arrays and maps in standard 3-level structure with proper annotations and field names
- ... decimals as `INT32` and `INT64` whenever possible, and taking `FIXED_LEN_BYTE_ARRAY` as the final fallback
- Supports legacy mode which is compatible with Spark 1.4 and prior versions
The legacy mode is by default off, and can be turned on by flipping SQL option `spark.sql.parquet.writeLegacyFormat` to `true`.
- Eliminates per value data type dispatching costs via prebuilt composed writer functions
1. Cleans up the last pieces of old Parquet support code
As pointed out by rxin previously, we probably want to rename all those `Catalyst*` Parquet classes to `Parquet*` for clarity. But I'd like to do this in a follow-up PR to minimize code review noises in this one.
Author: Cheng Lian <lian@databricks.com>
Closes#8988 from liancheng/spark-8848/standard-parquet-write-path.
This PR is a first cut at code generating an encoder that takes a Scala `Product` type and converts it directly into the tungsten binary format. This is done through the addition of a new set of expression that can be used to invoke methods on raw JVM objects, extracting fields and converting the result into the required format. These can then be used directly in an `UnsafeProjection` allowing us to leverage the existing encoding logic.
According to some simple benchmarks, this can significantly speed up conversion (~4x). However, replacing CatalystConverters is deferred to a later PR to keep this PR at a reasonable size.
```scala
case class SomeInts(a: Int, b: Int, c: Int, d: Int, e: Int)
val data = SomeInts(1, 2, 3, 4, 5)
val encoder = ProductEncoder[SomeInts]
val converter = CatalystTypeConverters.createToCatalystConverter(ScalaReflection.schemaFor[SomeInts].dataType)
(1 to 5).foreach {iter =>
benchmark(s"converter $iter") {
var i = 100000000
while (i > 0) {
val res = converter(data).asInstanceOf[InternalRow]
assert(res.getInt(0) == 1)
assert(res.getInt(1) == 2)
i -= 1
}
}
benchmark(s"encoder $iter") {
var i = 100000000
while (i > 0) {
val res = encoder.toRow(data)
assert(res.getInt(0) == 1)
assert(res.getInt(1) == 2)
i -= 1
}
}
}
```
Results:
```
[info] converter 1: 7170ms
[info] encoder 1: 1888ms
[info] converter 2: 6763ms
[info] encoder 2: 1824ms
[info] converter 3: 6912ms
[info] encoder 3: 1802ms
[info] converter 4: 7131ms
[info] encoder 4: 1798ms
[info] converter 5: 7350ms
[info] encoder 5: 1912ms
```
Author: Michael Armbrust <michael@databricks.com>
Closes#9019 from marmbrus/productEncoder.
This PR refactors `HashJoinNode` to take a existing `HashedRelation`. So, we can reuse this node for both `ShuffledHashJoin` and `BroadcastHashJoin`.
https://issues.apache.org/jira/browse/SPARK-10887
Author: Yin Huai <yhuai@databricks.com>
Closes#8953 from yhuai/SPARK-10887.
In the analysis phase , while processing the rules for IN predicate, we
compare the in-list types to the lhs expression type and generate
cast operation if necessary. In the case of NULL [NOT] IN expr1 , we end up
generating cast between in list types to NULL like cast (1 as NULL) which
is not a valid cast.
The fix is to not generate such a cast if the lhs type is a NullType instead
we translate the expression to Literal(Null).
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes#8983 from dilipbiswal/spark_8654.
Its pretty hard to debug problems with expressions when you can't see all the arguments.
Before: `invoke()`
After: `invoke(inputObject#1, intField, IntegerType)`
Author: Michael Armbrust <michael@databricks.com>
Closes#9022 from marmbrus/expressionToString.
This PR improve the performance of complex types in columnar cache by using UnsafeProjection instead of KryoSerializer.
A simple benchmark show that this PR could improve the performance of scanning a cached table with complex columns by 15x (comparing to Spark 1.5).
Here is the code used to benchmark:
```
df = sc.range(1<<23).map(lambda i: Row(a=Row(b=i, c=str(i)), d=range(10), e=dict(zip(range(10), [str(i) for i in range(10)])))).toDF()
df.write.parquet("table")
```
```
df = sqlContext.read.parquet("table")
df.cache()
df.count()
t = time.time()
print df.select("*")._jdf.queryExecution().toRdd().count()
print time.time() - t
```
Author: Davies Liu <davies@databricks.com>
Closes#8971 from davies/complex.
This patch allows `Repartition` to support UnsafeRows. This is accomplished by implementing the logical `Repartition` operator in terms of `Exchange` and a new `RoundRobinPartitioning`.
Author: Josh Rosen <joshrosen@databricks.com>
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#8083 from JoshRosen/SPARK-9702.
The created decimal is wrong if using `Decimal(unscaled, precision, scale)` with unscaled > 1e18 and and precision > 18 and scale > 0.
This bug exists since the beginning.
Author: Davies Liu <davies@databricks.com>
Closes#9014 from davies/fix_decimal.
DeclarativeAggregate matches more closely with ImperativeAggregate we already have.
Author: Reynold Xin <rxin@databricks.com>
Closes#9013 from rxin/SPARK-10982.
This patch refactors several of the Aggregate2 interfaces in order to improve code clarity.
The biggest change is a refactoring of the `AggregateFunction2` class hierarchy. In the old code, we had a class named `AlgebraicAggregate` that inherited from `AggregateFunction2`, added a new set of methods, then banned the use of the inherited methods. I found this to be fairly confusing because.
If you look carefully at the existing code, you'll see that subclasses of `AggregateFunction2` fall into two disjoint categories: imperative aggregation functions which directly extended `AggregateFunction2` and declarative, expression-based aggregate functions which extended `AlgebraicAggregate`. In order to make this more explicit, this patch refactors things so that `AggregateFunction2` is a sealed abstract class with two subclasses, `ImperativeAggregateFunction` and `ExpressionAggregateFunction`. The superclass, `AggregateFunction2`, now only contains methods and fields that are common to both subclasses.
After making this change, I updated the various AggregationIterator classes to comply with this new naming scheme. I also performed several small renamings in the aggregate interfaces themselves in order to improve clarity and rewrote or expanded a number of comments.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#8973 from JoshRosen/tungsten-agg-comments.
This PR is mostly cosmetic and cleans up some warts in codegen (nearly all of which were inherited from the original quasiquote version).
- Add lines numbers to errors (in stacktraces when debug logging is on, and always for compile fails)
- Use a variable for input row instead of hardcoding "i" everywhere
- rename `primitive` -> `value` (since its often actually an object)
Author: Michael Armbrust <michael@databricks.com>
Closes#9006 from marmbrus/codegen-cleanup.
`Murmur3_x86_32.hashUnsafeWords` only accepts word-aligned bytes, but unsafe array is not.
Author: Wenchen Fan <cloud0fan@163.com>
Closes#8987 from cloud-fan/hash.
This PR is a completely rewritten of GenerateUnsafeProjection, to accomplish the goal of copying data only once. The old code of GenerateUnsafeProjection is still there to reduce review difficulty.
Instead of creating unsafe conversion code for struct, array and map, we create code of writing the content to the global row buffer.
Author: Wenchen Fan <cloud0fan@163.com>
Author: Wenchen Fan <cloud0fan@outlook.com>
Closes#8747 from cloud-fan/copy-once.
The utilities such as Substring#substringBinarySQL and BinaryPrefixComparator#computePrefix for binary data are put together in ByteArray for easy-to-read.
Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>
Closes#8122 from maropu/CleanUpForBinaryType.
Floor & Ceiling function should returns Long type, rather than Double.
Verified with MySQL & Hive.
Author: Cheng Hao <hao.cheng@intel.com>
Closes#8933 from chenghao-intel/ceiling.
This is an implementation of Hive's `json_tuple` function using Jackson Streaming.
Author: Nathan Howell <nhowell@godaddy.com>
Closes#7946 from NathanHowell/SPARK-9617.
This PR implements a HyperLogLog based Approximate Count Distinct function using the new UDAF interface.
The implementation is inspired by the ClearSpring HyperLogLog implementation and should produce the same results.
There is still some documentation and testing left to do.
cc yhuai
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#8362 from hvanhovell/SPARK-9741.
When reading Parquet string and binary-backed decimal values, Parquet `Binary.getBytes` always returns a copied byte array, which is unnecessary. Since the underlying implementation of `Binary` values there is guaranteed to be `ByteArraySliceBackedBinary`, and Parquet itself never reuses underlying byte arrays, we can use `Binary.toByteBuffer.array()` to steal the underlying byte arrays without copying them.
This brings performance benefits when scanning Parquet string and binary-backed decimal columns. Note that, this trick doesn't cover binary-backed decimals with precision greater than 18.
My micro-benchmark result is that, this brings a ~15% performance boost for scanning TPC-DS `store_sales` table (scale factor 15).
Another minor optimization done in this PR is that, now we directly construct a Java `BigDecimal` in `Decimal.toJavaBigDecimal` without constructing a Scala `BigDecimal` first. This brings another ~5% performance gain.
Author: Cheng Lian <lian@databricks.com>
Closes#8907 from liancheng/spark-10811/eliminate-array-copying.
https://issues.apache.org/jira/browse/SPARK-10741
I choose the second approach: do not change output exprIds when convert MetastoreRelation to LogicalRelation
Author: Wenchen Fan <cloud0fan@163.com>
Closes#8889 from cloud-fan/hot-bug.
Since `scala.util.parsing.combinator.Parsers` is thread-safe since Scala 2.10 (See [SI-4929](https://issues.scala-lang.org/browse/SI-4929)), we can change SqlParser to object to avoid memory leak.
I didn't change other subclasses of `scala.util.parsing.combinator.Parsers` because there is only one instance in one SQLContext, which should not be an issue.
Author: zsxwing <zsxwing@gmail.com>
Closes#8357 from zsxwing/sql-memory-leak.
From JIRA: Schema merging should only handle struct fields. But currently we also reconcile decimal precision and scale information.
Author: Holden Karau <holden@pigscanfly.ca>
Closes#8634 from holdenk/SPARK-10449-dont-merge-different-precision.
Intersect and Except are both set operators and they use the all the columns to compare equality between rows. When pushing their Project parent down, the relations they based on would change, therefore not an equivalent transformation.
JIRA: https://issues.apache.org/jira/browse/SPARK-10539
I added some comments based on the fix of https://github.com/apache/spark/pull/8742.
Author: Yijie Shen <henry.yijieshen@gmail.com>
Author: Yin Huai <yhuai@databricks.com>
Closes#8823 from yhuai/fix_set_optimization.
Kryo fails with buffer overflow even with max value (2G).
{noformat}
org.apache.spark.SparkException: Kryo serialization failed: Buffer overflow. Available: 0, required: 1
Serialization trace:
containsChild (org.apache.spark.sql.catalyst.expressions.BoundReference)
child (org.apache.spark.sql.catalyst.expressions.SortOrder)
array (scala.collection.mutable.ArraySeq)
ordering (org.apache.spark.sql.catalyst.expressions.InterpretedOrdering)
interpretedOrdering (org.apache.spark.sql.types.StructType)
schema (org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema). To avoid this, increase spark.kryoserializer.buffer.max value.
at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:263)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:240)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
{noformat}
Author: navis.ryu <navis@apache.org>
Closes#8808 from navis/SPARK-10684.
This fixes https://issues.apache.org/jira/browse/SPARK-9794 by using a real ISO8601 parser. (courtesy of the xml component of the standard java library)
cc: angelini
Author: Kevin Cox <kevincox@kevincox.ca>
Closes#8396 from kevincox/kevincox-sql-time-parsing.
Sometimes we can't push down the whole `Project` though `Sort`, but we still have a chance to push down part of it.
Author: Wenchen Fan <cloud0fan@outlook.com>
Closes#8644 from cloud-fan/column-prune.
JIRA: https://issues.apache.org/jira/browse/SPARK-10437
If an expression in `SortOrder` is a resolved one, such as `count(1)`, the corresponding rule in `Analyzer` to make it work in order by will not be applied.
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#8599 from viirya/orderby-agg.
Move .java files in `src/main/scala` to `src/main/java` root, except for `package-info.java` (to stay next to package.scala)
Author: Sean Owen <sowen@cloudera.com>
Closes#8736 from srowen/SPARK-10576.
Adding STDDEV support for DataFrame using 1-pass online /parallel algorithm to compute variance. Please review the code change.
Author: JihongMa <linlin200605@gmail.com>
Author: Jihong MA <linlin200605@gmail.com>
Author: Jihong MA <jihongma@jihongs-mbp.usca.ibm.com>
Author: Jihong MA <jihongma@Jihongs-MacBook-Pro.local>
Closes#6297 from JihongMA/SPARK-SQL.
Before this fix, `MyDenseVectorUDT.typeName` gives `mydensevecto`, which is not desirable.
Author: Cheng Lian <lian@databricks.com>
Closes#8640 from liancheng/spark-10472/udt-type-name.
Use these in the optimizer as well:
A and (not(A) or B) => A and B
not(A and B) => not(A) or not(B)
not(A or B) => not(A) and not(B)
Author: Yash Datta <Yash.Datta@guavus.com>
Closes#5700 from saucam/bool_simp.
The reason for this extra copy is that we iterate the array twice: calculate elements data size and copy elements to array buffer.
A simple solution is to follow `createCodeForStruct`, we can dynamically grow the buffer when needed and thus don't need to know the data size ahead.
This PR also include some typo and style fixes, and did some minor refactor to make sure `input.primitive` is always variable name not code when generate unsafe code.
Author: Wenchen Fan <cloud0fan@outlook.com>
Closes#8496 from cloud-fan/avoid-copy.
When we generate unsafe code inside `createCodeForXXX`, we always assign the `input.primitive` to a temp variable in case `input.primitive` is expression code.
This PR did some refactor to make sure `input.primitive` is always variable name, and some other typo and style fixes.
Author: Wenchen Fan <cloud0fan@outlook.com>
Closes#8613 from cloud-fan/minor.
The bulk of the changes are on `transient` annotation on class parameter. Often the compiler doesn't generate a field for this parameters, so the the transient annotation would be unnecessary.
But if the class parameter are used in methods, then fields are created. So it is safer to keep the annotations.
The remainder are some potential bugs, and deprecated syntax.
Author: Luc Bourlier <luc.bourlier@typesafe.com>
Closes#8433 from skyluc/issue/sbt-2.11.
We did a lot of special handling for non-deterministic expressions in `Optimizer`. However, `PhysicalOperation` just collects all Projects and Filters and mess it up. We should respect the operators order caused by non-deterministic expressions in `PhysicalOperation`.
Author: Wenchen Fan <cloud0fan@outlook.com>
Closes#8486 from cloud-fan/fix.
For example, we can write `SELECT MAX(value) FROM src GROUP BY key + 1 ORDER BY key + 1` in PostgreSQL, and we should support this in Spark SQL.
Author: Wenchen Fan <cloud0fan@outlook.com>
Closes#8548 from cloud-fan/support-order-by-non-attribute.
After this PR, In/InSet/ArrayContain will return null if value is null, instead of false. They also will return null even if there is a null in the set/array.
Author: Davies Liu <davies@databricks.com>
Closes#8492 from davies/fix_in.
This commit fixes an issue where the public SQL `Row` class did not override `hashCode`, causing it to violate the hashCode() + equals() contract. To fix this, I simply ported the `hashCode` implementation from the 1.4.x version of `Row`.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#8500 from JoshRosen/SPARK-10325 and squashes the following commits:
51ffea1 [Josh Rosen] Override hashCode() for public Row.
In BigDecimal or java.math.BigDecimal, the precision could be smaller than scale, for example, BigDecimal("0.001") has precision = 1 and scale = 3. But DecimalType require that the precision should be larger than scale, so we should use the maximum of precision and scale when inferring the schema from decimal literal.
Author: Davies Liu <davies@databricks.com>
Closes#8428 from davies/smaller_decimal.
Replace `JavaConversions` implicits with `JavaConverters`
Most occurrences I've seen so far are necessary conversions; a few have been avoidable. None are in critical code as far as I see, yet.
Author: Sean Owen <sowen@cloudera.com>
Closes#8033 from srowen/SPARK-9613.
We misunderstood the Julian days and nanoseconds of the day in parquet (as TimestampType) from Hive/Impala, they are overlapped, so can't be added together directly.
In order to avoid the confusing rounding when do the converting, we use `2440588` as the Julian Day of epoch of unix timestamp (which should be 2440587.5).
Author: Davies Liu <davies@databricks.com>
Author: Cheng Lian <lian@databricks.com>
Closes#8400 from davies/timestamp_parquet.
This patch adds an analyzer rule to ensure that set operations (union, intersect, and except) are only applied to tables with the same number of columns. Without this rule, there are scenarios where invalid queries can return incorrect results instead of failing with error messages; SPARK-9813 provides one example of this problem. In other cases, the invalid query can crash at runtime with extremely confusing exceptions.
I also performed a bit of cleanup to refactor some of those logical operators' code into a common `SetOperation` base class.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#7631 from JoshRosen/SPARK-9293.