* Makes `SQLImplicits.rddToDataFrameHolder` scaladoc consistent with `SQLContext.createDataFrame[A <: Product](rdd: RDD[A])` since the former is essentially a wrapper for the latter
* Clarifies `createDataFrame[A <: Product]` scaladoc to apply for any `RDD[Product]`, not just case classes
Author: Feynman Liang <fliang@databricks.com>
Closes#8406 from feynmanliang/sql-doc-fixes.
This PR contains examples on how to use some of the Stat Functions available for DataFrames under `df.stat`.
rxin
Author: Burak Yavuz <brkyvz@gmail.com>
Closes#8378 from brkyvz/update-sql-docs.
https://issues.apache.org/jira/browse/SPARK-10143
With this PR, we will set min split size to parquet's block size (row group size) set in the conf if the min split size is smaller. So, we can avoid have too many tasks and even useless tasks for reading parquet data.
I tested it locally. The table I have has 343MB and it is in my local FS. Because I did not set any min/max split size, the default split size was 32MB and the map stage had 11 tasks. But there were only three tasks that actually read data. With my PR, there were only three tasks in the map stage. Here is the difference.
Without this PR:
![image](https://cloud.githubusercontent.com/assets/2072857/9399179/8587dba6-4765-11e5-9189-7ebba52a2b6d.png)
With this PR:
![image](https://cloud.githubusercontent.com/assets/2072857/9399185/a4735d74-4765-11e5-8848-1f1e361a6b4b.png)
Even if the block size setting does match the actual block size of parquet file, I think it is still generally good to use parquet's block size setting if min split size is smaller than this block size.
Tested it on a cluster using
```
val count = sqlContext.table("""store_sales""").groupBy().count().queryExecution.executedPlan(3).execute().count
```
Basically, it reads 0 column of table `store_sales`. My table has 1824 parquet files with size from 80MB to 280MB (1 to 3 row group sizes). Without this patch, in a 16 worker cluster, the job had 5023 tasks and spent 102s. With this patch, the job had 2893 tasks and spent 64s. It is still not as good as using one mapper per file (1824 tasks and 42s), but it is much better than our master.
Author: Yin Huai <yhuai@databricks.com>
Closes#8346 from yhuai/parquetMinSplit.
This class is identical to `org.apache.spark.sql.execution.datasources.jdbc. DefaultSource` and is not needed.
Author: Wenchen Fan <cloud0fan@outlook.com>
Closes#8334 from cloud-fan/minor.
I caught SPARK-10136 while adding more test cases to `ParquetAvroCompatibilitySuite`. Actual bug fix code lies in `CatalystRowConverter.scala`.
Author: Cheng Lian <lian@databricks.com>
Closes#8341 from liancheng/spark-10136/parquet-avro-nested-primitive-array.
This improves performance by ~ 20 - 30% in one of my local test and should fix the performance regression from 1.4 to 1.5 on ss_max.
Author: Reynold Xin <rxin@databricks.com>
Closes#8332 from rxin/SPARK-10100.
https://issues.apache.org/jira/browse/SPARK-10092
This pr is a follow-up one for Multi-DB support. It has the following changes:
* `HiveContext.refreshTable` now accepts `dbName.tableName`.
* `HiveContext.analyze` now accepts `dbName.tableName`.
* `CreateTableUsing`, `CreateTableUsingAsSelect`, `CreateTempTableUsing`, `CreateTempTableUsingAsSelect`, `CreateMetastoreDataSource`, and `CreateMetastoreDataSourceAsSelect` all take `TableIdentifier` instead of the string representation of table name.
* When you call `saveAsTable` with a specified database, the data will be saved to the correct location.
* Explicitly do not allow users to create a temporary with a specified database name (users cannot do it before).
* When we save table to metastore, we also check if db name and table name can be accepted by hive (using `MetaStoreUtils.validateName`).
Author: Yin Huai <yhuai@databricks.com>
Closes#8324 from yhuai/saveAsTableDB.
A few minor changes:
1. Improved documentation
2. Rename apply(distinct....) to distinct.
3. Changed MutableAggregationBuffer from a trait to an abstract class.
4. Renamed returnDataType to dataType to be more consistent with other expressions.
And unrelated to UDAFs:
1. Renamed file names in expressions to use suffix "Expressions" to be more consistent.
2. Moved regexp related expressions out to its own file.
3. Renamed StringComparison => StringPredicate.
Author: Reynold Xin <rxin@databricks.com>
Closes#8321 from rxin/SPARK-9242.
As I talked with Lian,
1. I added EquelNullSafe to ParquetFilters
- It uses the same equality comparison filter with EqualTo since the Parquet filter performs actually null-safe equality comparison.
2. Updated the test code (ParquetFilterSuite)
- Convert catalyst.Expression to sources.Filter
- Removed Cast since only Literal is picked up as a proper Filter in DataSourceStrategy
- Added EquelNullSafe comparison
3. Removed deprecated createFilter for catalyst.Expression
Author: hyukjinkwon <gurwls223@gmail.com>
Author: 권혁진 <gurwls223@gmail.com>
Closes#8275 from HyukjinKwon/master.
Speculation hates direct output committer, as there are multiple corner cases that may cause data corruption and/or data loss.
Please see this [PR comment] [1] for more details.
[1]: https://github.com/apache/spark/pull/8191#issuecomment-131598385
Author: Cheng Lian <lian@databricks.com>
Closes#8317 from liancheng/spark-9899/speculation-hates-direct-output-committer.
`DictionaryEncoding` uses Scala runtime reflection to avoid boxing costs while building the directory array. However, this code path may hit [SI-6240] [1] and throw exception.
[1]: https://issues.scala-lang.org/browse/SI-6240
Author: Cheng Lian <lian@databricks.com>
Closes#8306 from liancheng/spark-9627/in-memory-cache-scala-reflection.
DataFrame.withColumn in Python should be consistent with the Scala one (replacing the existing column that has the same name).
cc marmbrus
Author: Davies Liu <davies@databricks.com>
Closes#8300 from davies/with_column.
This is kind of a weird case, but given a sufficiently complex query plan (in this case a TungstenProject with an Exchange underneath), we could have NPEs on the executors due to the time when we were calling transformAllExpressions
In general we should ensure that all transformations occur on the driver and not on the executors. Some reasons for avoid executor side transformations include:
* (this case) Some operator constructors require state such as access to the Spark/SQL conf so doing a makeCopy on the executor can fail.
* (unrelated reason for avoid executor transformations) ExprIds are calculated using an atomic integer, so you can violate their uniqueness constraint by constructing them anywhere other than the driver.
This subsumes #8285.
Author: Reynold Xin <rxin@databricks.com>
Author: Michael Armbrust <michael@databricks.com>
Closes#8295 from rxin/SPARK-10096.
Turns out that inner classes of inner objects are referenced directly, and thus moving it will break binary compatibility.
Author: Michael Armbrust <michael@databricks.com>
Closes#8281 from marmbrus/binaryCompat.
Parquet hard coded a JUL logger which always writes to stdout. This PR redirects it via SLF4j JUL bridge handler, so that we can control Parquet logs via `log4j.properties`.
This solution is inspired by https://github.com/Parquet/parquet-mr/issues/390#issuecomment-46064909.
Author: Cheng Lian <lian@databricks.com>
Closes#8196 from liancheng/spark-8118/redirect-parquet-jul.
This PR uses `JDBCRDD.getConnector` to load JDBC driver before creating connection in `DataFrameReader.jdbc` and `DataFrameWriter.jdbc`.
Author: zsxwing <zsxwing@gmail.com>
Closes#8232 from zsxwing/SPARK-10036 and squashes the following commits:
adf75de [zsxwing] Add extraOptions to the connection properties
57f59d4 [zsxwing] Load JDBC driver in DataFrameReader.jdbc and DataFrameWriter.jdbc
When inserting data into a `HadoopFsRelation`, if `commitTask()` of the writer container fails, `abortTask()` will be invoked. However, both `commitTask()` and `abortTask()` try to close the output writer(s). The problem is that, closing underlying writers may not be an idempotent operation. E.g., `ParquetRecordWriter.close()` throws NPE when called twice.
Author: Cheng Lian <lian@databricks.com>
Closes#8236 from liancheng/spark-7837/double-closing.
In case of schema merging, we only handled first level fields when converting Parquet groups to `InternalRow`s. Nested struct fields are not properly handled.
For example, the schema of a Parquet file to be read can be:
```
message individual {
required group f1 {
optional binary f11 (utf8);
}
}
```
while the global schema is:
```
message global {
required group f1 {
optional binary f11 (utf8);
optional int32 f12;
}
}
```
This PR fixes this issue by padding missing fields when creating actual converters.
Author: Cheng Lian <lian@databricks.com>
Closes#8228 from liancheng/spark-10005/nested-schema-merging.
The `initialSize` argument of `ColumnBuilder.initialize()` should be the
number of rows rather than bytes. However `InMemoryColumnarTableScan`
passes in a byte size, which makes Spark SQL allocate more memory than
necessary when building in-memory columnar buffers.
Author: Kun Xu <viper_kun@163.com>
Closes#8189 from viper-kun/errorSize.
This pull request creates a new operator interface that is more similar to traditional database query iterators (with open/close/next/get).
These local operators are not currently used anywhere, but will become the basis for SPARK-9983 (local physical operators for query execution).
cc zsxwing
Author: Reynold Xin <rxin@databricks.com>
Closes#8212 from rxin/SPARK-9984.
This PR enforce dynamic partition column data type requirements by adding analysis rules.
JIRA: https://issues.apache.org/jira/browse/SPARK-8887
Author: Yijie Shen <henry.yijieshen@gmail.com>
Closes#8201 from yjshen/dynamic_partition_columns.
Also alias the ExtractValue instead of wrapping it with UnresolvedAlias when resolve attribute in LogicalPlan, as this alias will be trimmed if it's unnecessary.
Based on #7957 without the changes to mllib, but instead maintaining earlier behavior when using `withColumn` on expressions that already have metadata.
Author: Wenchen Fan <cloud0fan@outlook.com>
Author: Michael Armbrust <michael@databricks.com>
Closes#8215 from marmbrus/pr/7957.
This bug is caused by a wrong column-exist-check in `__getitem__` of pyspark dataframe. `DataFrame.apply` accepts not only top level column names, but also nested column name like `a.b`, so we should remove that check from `__getitem__`.
Author: Wenchen Fan <cloud0fan@outlook.com>
Closes#8202 from cloud-fan/nested.
in MLlib sometimes we need to set metadata for the new column, thus we will alias the new column with metadata before call `withColumn` and in `withColumn` we alias this clolumn again. Here I overloaded `withColumn` to allow user set metadata, just like what we did for `Column.as`.
Author: Wenchen Fan <cloud0fan@outlook.com>
Closes#8159 from cloud-fan/withColumn.
Currently, pageSize of TungstenSort is calculated from driver.memory, it should use executor.memory instead.
Also, in the worst case, the safeFactor could be 4 (because of rounding), increase it to 16.
cc rxin
Author: Davies Liu <davies@databricks.com>
Closes#8175 from davies/page_size.
A fundamental limitation of the existing SQL tests is that *there is simply no way to create your own `SparkContext`*. This is a serious limitation because the user may wish to use a different master or config. As a case in point, `BroadcastJoinSuite` is entirely commented out because there is no way to make it pass with the existing infrastructure.
This patch removes the singletons `TestSQLContext` and `TestData`, and instead introduces a `SharedSQLContext` that starts a context per suite. Unfortunately the singletons were so ingrained in the SQL tests that this patch necessarily needed to touch *all* the SQL test files.
<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/8111)
<!-- Reviewable:end -->
Author: Andrew Or <andrew@databricks.com>
Closes#8111 from andrewor14/sql-tests-refactor.
When the free memory in executor goes low, the cached broadcast objects need to serialized into disk, but currently the deserialized UnsafeHashedRelation can't be serialized , fail with NPE. This PR fixes that.
cc rxin
Author: Davies Liu <davies@databricks.com>
Closes#8174 from davies/serialize_hashed.
I made a mistake in #8049 by casting literal value to attribute's data type, which would cause simply truncate the literal value and push a wrong filter down.
JIRA: https://issues.apache.org/jira/browse/SPARK-9927
Author: Yijie Shen <henry.yijieshen@gmail.com>
Closes#8157 from yjshen/rever8049.
This patch add a thread-safe lookup for BytesToBytseMap, and use that in broadcasted HashedRelation.
Author: Davies Liu <davies@databricks.com>
Closes#8151 from davies/safeLookup.
https://issues.apache.org/jira/browse/SPARK-9920
Taking `sqlContext.sql("select i, sum(j1) as sum from testAgg group by i").explain()` as an example, the output of our current master is
```
== Physical Plan ==
TungstenAggregate(key=[i#0], value=[(sum(cast(j1#1 as bigint)),mode=Final,isDistinct=false)]
TungstenExchange hashpartitioning(i#0)
TungstenAggregate(key=[i#0], value=[(sum(cast(j1#1 as bigint)),mode=Partial,isDistinct=false)]
Scan ParquetRelation[file:/user/hive/warehouse/testagg][i#0,j1#1]
```
With this PR, the output will be
```
== Physical Plan ==
TungstenAggregate(key=[i#0], functions=[(sum(cast(j1#1 as bigint)),mode=Final,isDistinct=false)], output=[i#0,sum#18L])
TungstenExchange hashpartitioning(i#0)
TungstenAggregate(key=[i#0], functions=[(sum(cast(j1#1 as bigint)),mode=Partial,isDistinct=false)], output=[i#0,currentSum#22L])
Scan ParquetRelation[file:/user/hive/warehouse/testagg][i#0,j1#1]
```
Author: Yin Huai <yhuai@databricks.com>
Closes#8150 from yhuai/SPARK-9920.
Currently, UnsafeRowSerializer does not close the InputStream, will cause fd leak if the InputStream has an open fd in it.
TODO: the fd could still be leaked, if any items in the stream is not consumed. Currently it replies on GC to close the fd in this case.
cc JoshRosen
Author: Davies Liu <davies@databricks.com>
Closes#8116 from davies/fd_leak.
This is the sister patch to #8011, but for aggregation.
In a nutshell: create the `TungstenAggregationIterator` before computing the parent partition. Internally this creates a `BytesToBytesMap` which acquires a page in the constructor as of this patch. This ensures that the aggregation operator is not starved since we reserve at least 1 page in advance.
rxin yhuai
Author: Andrew Or <andrew@databricks.com>
Closes#8038 from andrewor14/unsafe-starve-memory-agg.
This PR adds a hacky workaround for PARQUET-201, and should be removed once we upgrade to parquet-mr 1.8.1 or higher versions.
In Parquet, not all types of columns can be used for filter push-down optimization. The set of valid column types is controlled by `ValidTypeMap`. Unfortunately, in parquet-mr 1.7.0 and prior versions, this limitation is too strict, and doesn't allow `BINARY (ENUM)` columns to be pushed down. On the other hand, `BINARY (ENUM)` is commonly seen in Parquet files written by libraries like `parquet-avro`.
This restriction is problematic for Spark SQL, because Spark SQL doesn't have a type that maps to Parquet `BINARY (ENUM)` directly, and always converts `BINARY (ENUM)` to Catalyst `StringType`. Thus, a predicate involving a `BINARY (ENUM)` is recognized as one involving a string field instead and can be pushed down by the query optimizer. Such predicates are actually perfectly legal except that it fails the `ValidTypeMap` check.
The workaround added here is relaxing `ValidTypeMap` to include `BINARY (ENUM)`. I also took the chance to simplify `ParquetCompatibilityTest` a little bit when adding regression test.
Author: Cheng Lian <lian@databricks.com>
Closes#8107 from liancheng/spark-9407/parquet-enum-filter-push-down.
This PR fixes unable to push filter down to JDBC source caused by `Cast` during pattern matching.
While we are comparing columns of different type, there's a big chance we need a cast on the column, therefore not match the pattern directly on Attribute and would fail to push down.
Author: Yijie Shen <henry.yijieshen@gmail.com>
Closes#8049 from yjshen/jdbc_pushdown.
Author: Davies Liu <davies@databricks.com>
Closes#8117 from davies/fix_serialization and squashes the following commits:
d21ac71 [Davies Liu] fix serialization with empty broadcast
DirectParquetOutputCommitter was moved in SPARK-9763. However, users can explicitly set the class as a config option, so we must be able to resolve the old committer qualified name.
Author: Reynold Xin <rxin@databricks.com>
Closes#8114 from rxin/SPARK-9849.
This PR added metrics for all join and aggregate operators. However, I found the metrics may be confusing in the following two case:
1. The iterator is not totally consumed and the metric values will be less.
2. Recreating the iterators will make metric values look bigger than the size of the input source, such as `CartesianProduct`.
Author: zsxwing <zsxwing@gmail.com>
Closes#8060 from zsxwing/sql-metrics and squashes the following commits:
40f3fc1 [zsxwing] Mark LongSQLMetric private[metric] to avoid using incorrectly and leak memory
b1b9071 [zsxwing] Merge branch 'master' into sql-metrics
4bef25a [zsxwing] Add metrics for SortMergeOuterJoin
95ccfc6 [zsxwing] Merge branch 'master' into sql-metrics
67cb4dd [zsxwing] Add metrics for Project and TungstenProject; remove metrics from PhysicalRDD and LocalTableScan
0eb47d4 [zsxwing] Merge branch 'master' into sql-metrics
dd9d932 [zsxwing] Avoid creating new Iterators
589ea26 [zsxwing] Add metrics for all join and aggregate operators
PlatformDependent.UNSAFE is way too verbose.
Author: Reynold Xin <rxin@databricks.com>
Closes#8094 from rxin/SPARK-9815 and squashes the following commits:
229b603 [Reynold Xin] [SPARK-9815] Rename PlatformDependent.UNSAFE -> Platform.
This patch adds a new `SortMergeOuterJoin` operator that performs left and right outer joins using sort merge join. It also refactors `SortMergeJoin` in order to improve performance and code clarity.
Along the way, I also performed a couple pieces of minor cleanup and optimization:
- Rename the `HashJoin` physical planner rule to `EquiJoinSelection`, since it's also used for non-hash joins.
- Rewrite the comment at the top of `HashJoin` to better explain the precedence for choosing join operators.
- Update `JoinSuite` to use `SqlTestUtils.withConf` for changing SQLConf settings.
This patch incorporates several ideas from adrian-wang's patch, #5717.
Closes#5717.
<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/7904)
<!-- Reviewable:end -->
Author: Josh Rosen <joshrosen@databricks.com>
Author: Daoyuan Wang <daoyuan.wang@intel.com>
Closes#7904 from JoshRosen/outer-join-smj and squashes 1 commits.
This PR is inspired by #8063 authored by dguy. Especially, testing Parquet files added here are all taken from that PR.
**Committer who merges this PR should attribute it to "Damian Guy <damian.guygmail.com>".**
----
SPARK-6776 and SPARK-6777 followed `parquet-avro` to implement backwards-compatibility rules defined in `parquet-format` spec. However, both Spark SQL and `parquet-avro` neglected the following statement in `parquet-format`:
> This does not affect repeated fields that are not annotated: A repeated field that is neither contained by a `LIST`- or `MAP`-annotated group nor annotated by `LIST` or `MAP` should be interpreted as a required list of required elements where the element type is the type of the field.
One of the consequences is that, Parquet files generated by `parquet-protobuf` containing unannotated repeated fields are not correctly converted to Catalyst arrays.
This PR fixes this issue by
1. Handling unannotated repeated fields in `CatalystSchemaConverter`.
2. Converting this kind of special repeated fields to Catalyst arrays in `CatalystRowConverter`.
Two special converters, `RepeatedPrimitiveConverter` and `RepeatedGroupConverter`, are added. They delegate actual conversion work to a child `elementConverter` and accumulates elements in an `ArrayBuffer`.
Two extra methods, `start()` and `end()`, are added to `ParentContainerUpdater`. So that they can be used to initialize new `ArrayBuffer`s for unannotated repeated fields, and propagate converted array values to upstream.
Author: Cheng Lian <lian@databricks.com>
Closes#8070 from liancheng/spark-9340/unannotated-parquet-list and squashes the following commits:
ace6df7 [Cheng Lian] Moves ParquetProtobufCompatibilitySuite
f1c7bfd [Cheng Lian] Updates .rat-excludes
420ad2b [Cheng Lian] Fixes converting unannotated Parquet lists
There are a few changes in this pull request:
1. Moved all data sources to execution.datasources, except the public JDBC APIs.
2. In order to maintain backward compatibility from 1, added a backward compatibility translation map in data source resolution.
3. Moved ui and metric package into execution.
4. Added more documentation on some internal classes.
5. Renamed DataSourceRegister.format -> shortName.
6. Added "override" modifier on shortName.
7. Removed IntSQLMetric.
Author: Reynold Xin <rxin@databricks.com>
Closes#8056 from rxin/SPARK-9763 and squashes the following commits:
9df4801 [Reynold Xin] Removed hardcoded name in test cases.
d9babc6 [Reynold Xin] Shorten.
e484419 [Reynold Xin] Removed VisibleForTesting.
171b812 [Reynold Xin] MimaExcludes.
2041389 [Reynold Xin] Compile ...
79dda42 [Reynold Xin] Compile.
0818ba3 [Reynold Xin] Removed IntSQLMetric.
c46884f [Reynold Xin] Two more fixes.
f9aa88d [Reynold Xin] [SPARK-9763][SQL] Minimize exposure of internal SQL classes.
Exchange.isUnsafe should check whether codegen and unsafe are enabled.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#8073 from JoshRosen/SPARK-9784 and squashes the following commits:
7a1019f [Josh Rosen] [SPARK-9784] Exchange.isUnsafe should check whether codegen and unsafe are enabled
PR #7696 added two `HadoopFsRelation.refresh()` calls ([this] [1], and [this] [2]) in `DataSourceStrategy` to make test case `InsertSuite.save directly to the path of a JSON table` pass. However, this forces every `HadoopFsRelation` table scan to do a refresh, which can be super expensive for tables with large number of partitions.
The reason why the original test case fails without the `refresh()` calls is that, the old JSON relation builds the base RDD with the input paths, while `HadoopFsRelation` provides `FileStatus`es of leaf files. With the old JSON relation, we can create a temporary table based on a path, writing data to that, and then read newly written data without refreshing the table. This is no long true for `HadoopFsRelation`.
This PR removes those two expensive refresh calls, and moves the refresh into `JSONRelation` to fix this issue. We might want to update `HadoopFsRelation` interface to provide better support for this use case.
[1]: ebfd91c542/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala (L63)
[2]: ebfd91c542/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala (L91)
Author: Cheng Lian <lian@databricks.com>
Closes#8035 from liancheng/spark-9743/fix-json-relation-refreshing and squashes the following commits:
ec1957d [Cheng Lian] Fixes JSONRelation refreshing
https://issues.apache.org/jira/browse/SPARK-9777
Author: Yin Huai <yhuai@databricks.com>
Closes#8064 from yhuai/windowUnsafe and squashes the following commits:
8fb3537 [Yin Huai] Set canProcessUnsafeRows to true.
This pull request refactors the `EnsureRequirements` planning rule in order to avoid the addition of certain unnecessary shuffles.
As an example of how unnecessary shuffles can occur, consider SortMergeJoin, which requires clustered distribution and sorted ordering of its children's input rows. Say that both of SMJ's children produce unsorted output but are both SinglePartition. In this case, we will need to inject sort operators but should not need to inject Exchanges. Unfortunately, it looks like the EnsureRequirements unnecessarily repartitions using a hash partitioning.
This patch solves this problem by refactoring `EnsureRequirements` to properly implement the `compatibleWith` checks that were broken in earlier implementations. See the significant inline comments for a better description of how this works. The majority of this PR is new comments and test cases, with few actual changes to the code.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#7988 from JoshRosen/exchange-fixes and squashes the following commits:
38006e7 [Josh Rosen] Rewrite EnsureRequirements _yet again_ to make things even simpler
0983f75 [Josh Rosen] More guarantees vs. compatibleWith cleanup; delete BroadcastPartitioning.
8784bd9 [Josh Rosen] Giant comment explaining compatibleWith vs. guarantees
1307c50 [Josh Rosen] Update conditions for requiring child compatibility.
18cddeb [Josh Rosen] Rename DummyPlan to DummySparkPlan.
2c7e126 [Josh Rosen] Merge remote-tracking branch 'origin/master' into exchange-fixes
fee65c4 [Josh Rosen] Further refinement to comments / reasoning
642b0bb [Josh Rosen] Further expand comment / reasoning
06aba0c [Josh Rosen] Add more comments
8dbc845 [Josh Rosen] Add even more tests.
4f08278 [Josh Rosen] Fix the test by adding the compatibility check to EnsureRequirements
a1c12b9 [Josh Rosen] Add failing test to demonstrate allCompatible bug
0725a34 [Josh Rosen] Small assertion cleanup.
5172ac5 [Josh Rosen] Add test for requiresChildrenToProduceSameNumberOfPartitions.
2e0f33a [Josh Rosen] Write a more generic test for EnsureRequirements.
752b8de [Josh Rosen] style fix
c628daf [Josh Rosen] Revert accidental ExchangeSuite change.
c9fb231 [Josh Rosen] Rewrite exchange to fix better handle this case.
adcc742 [Josh Rosen] Move test to PlannerSuite.
0675956 [Josh Rosen] Preserving ordering and partitioning in row format converters also does not help.
cc5669c [Josh Rosen] Adding outputPartitioning to Repartition does not fix the test.
2dfc648 [Josh Rosen] Add failing test illustrating bad exchange planning.
In order for this to work, I had to disable gap sampling.
Author: Reynold Xin <rxin@databricks.com>
Closes#8040 from rxin/SPARK-9752 and squashes the following commits:
f9e248c [Reynold Xin] Fix the test case for real this time.
adbccb3 [Reynold Xin] Fixed test case.
589fb23 [Reynold Xin] Merge branch 'SPARK-9752' of github.com:rxin/spark into SPARK-9752
55ccddc [Reynold Xin] Fixed core test.
78fa895 [Reynold Xin] [SPARK-9752][SQL] Support UnsafeRow in Sample operator.
c9e7112 [Reynold Xin] [SPARK-9752][SQL] Support UnsafeRow in Sample operator.
Users currently have to provide the full class name for external data sources, like:
`sqlContext.read.format("com.databricks.spark.avro").load(path)`
This allows external data source packages to register themselves using a Service Loader so that they can add custom alias like:
`sqlContext.read.format("avro").load(path)`
This makes it so that using external data source packages uses the same format as the internal data sources like parquet, json, etc.
Author: Joseph Batchik <joseph.batchik@cloudera.com>
Author: Joseph Batchik <josephbatchik@gmail.com>
Closes#7802 from JDrit/service_loader and squashes the following commits:
49a01ec [Joseph Batchik] fixed a couple of format / error bugs
e5e93b2 [Joseph Batchik] modified rat file to only excluded added services
72b349a [Joseph Batchik] fixed error with orc data source actually
9f93ea7 [Joseph Batchik] fixed error with orc data source
87b7f1c [Joseph Batchik] fixed typo
101cd22 [Joseph Batchik] removing unneeded changes
8f3cf43 [Joseph Batchik] merged in changes
b63d337 [Joseph Batchik] merged in master
95ae030 [Joseph Batchik] changed the new trait to be used as a mixin for data source to register themselves
74db85e [Joseph Batchik] reformatted class loader
ac2270d [Joseph Batchik] removing some added test
a6926db [Joseph Batchik] added test cases for data source loader
208a2a8 [Joseph Batchik] changes to do error catching if there are multiple data sources
946186e [Joseph Batchik] started working on service loader
This PR fixes a minor bug introduced in #7455: when writing decimals, we should use the unscaled Long for better performance when the precision <= 18 rather than 8 (should be a typo). This bug doesn't affect correctness, but hurts Parquet decimal writing performance.
This PR also replaced similar magic numbers with newly defined constants.
Author: Cheng Lian <lian@databricks.com>
Closes#8031 from liancheng/spark-4176/minor-fix-for-writing-decimals and squashes the following commits:
10d4ea3 [Cheng Lian] Should use unscaled Long to write decimals for precision <= 18 rather than 8
https://issues.apache.org/jira/browse/SPARK-9753
This PR makes TungstenAggregate to accept `InternalRow` instead of just `UnsafeRow`. Also, it adds an `getAggregationBufferFromUnsafeRow` method to `UnsafeFixedWidthAggregationMap`. It is useful when we already have grouping keys stored in `UnsafeRow`s. Finally, it wraps `InputStream` and `OutputStream` in `UnsafeRowSerializer` with `BufferedInputStream` and `BufferedOutputStream`, respectively.
Author: Yin Huai <yhuai@databricks.com>
Closes#8041 from yhuai/joinedRowForProjection and squashes the following commits:
7753e34 [Yin Huai] Use BufferedInputStream and BufferedOutputStream.
d68b74e [Yin Huai] Use joinedRow instead of UnsafeRowJoiner.
e93c009 [Yin Huai] Add getAggregationBufferFromUnsafeRow for cases that the given groupingKeyRow is already an UnsafeRow.
TypeCheck no longer applies in the new "Tungsten" world.
Author: Reynold Xin <rxin@databricks.com>
Closes#8043 from rxin/SPARK-9754 and squashes the following commits:
4ec471e [Reynold Xin] [SPARK-9754][SQL] Remove TypeCheck in debug package.
Previously, we would open a new file for each new dynamic written out using `HadoopFsRelation`. For formats like parquet this is very costly due to the buffers required to get good compression. In this PR I refactor the code allowing us to fall back on an external sort when many partitions are seen. As such each task will open no more than `spark.sql.sources.maxFiles` files. I also did the following cleanup:
- Instead of keying the file HashMap on an expensive to compute string representation of the partition, we now use a fairly cheap UnsafeProjection that avoids heap allocations.
- The control flow for instantiating and invoking a writer container has been simplified. Now instead of switching in two places based on the use of partitioning, the specific writer container must implement a single method `writeRows` that is invoked using `runJob`.
- `InternalOutputWriter` has been removed. Instead we have a `private[sql]` method `writeInternal` that converts and calls the public method. This method can be overridden by internal datasources to avoid the conversion. This change remove a lot of code duplication and per-row `asInstanceOf` checks.
- `commands.scala` has been split up.
Author: Michael Armbrust <michael@databricks.com>
Closes#8010 from marmbrus/fsWriting and squashes the following commits:
00804fe [Michael Armbrust] use shuffleMemoryManager.pageSizeBytes
775cc49 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into fsWriting
17b690e [Michael Armbrust] remove comment
40f0372 [Michael Armbrust] address comments
f5675bd [Michael Armbrust] char -> string
7e2d0a4 [Michael Armbrust] make sure we close current writer
8100100 [Michael Armbrust] delete empty commands.scala
71cc717 [Michael Armbrust] update comment
8ec75ac [Michael Armbrust] [SPARK-8890][SQL] Fallback on sorting when writing many dynamic partitions
The original code that this test tests is removed in 9270bd06fd. It was ignored shortly before that so we never caught it. This patch re-enables the test and adds the code necessary to make it pass.
JoshRosen yhuai
Author: Andrew Or <andrew@databricks.com>
Closes#8015 from andrewor14/SPARK-9674 and squashes the following commits:
225eac2 [Andrew Or] Merge branch 'master' of github.com:apache/spark into SPARK-9674
8c24209 [Andrew Or] Fix NPE
e541d64 [Andrew Or] Track aggregation memory for both sort and hash
0be3a42 [Andrew Or] Fix test
All data sources show up as "PhysicalRDD" in physical plan explain. It'd be better if we can show the name of the data source.
Without this patch:
```
== Physical Plan ==
NewAggregate with UnsafeHybridAggregationIterator ArrayBuffer(date#0, cat#1) ArrayBuffer((sum(CAST((CAST(count#2, IntegerType) + 1), LongType))2,mode=Final,isDistinct=false))
Exchange hashpartitioning(date#0,cat#1)
NewAggregate with UnsafeHybridAggregationIterator ArrayBuffer(date#0, cat#1) ArrayBuffer((sum(CAST((CAST(count#2, IntegerType) + 1), LongType))2,mode=Partial,isDistinct=false))
PhysicalRDD [date#0,cat#1,count#2], MapPartitionsRDD[3] at
```
With this patch:
```
== Physical Plan ==
TungstenAggregate(key=[date#0,cat#1], value=[(sum(CAST((CAST(count#2, IntegerType) + 1), LongType)),mode=Final,isDistinct=false)]
Exchange hashpartitioning(date#0,cat#1)
TungstenAggregate(key=[date#0,cat#1], value=[(sum(CAST((CAST(count#2, IntegerType) + 1), LongType)),mode=Partial,isDistinct=false)]
ConvertToUnsafe
Scan ParquetRelation[file:/scratch/rxin/spark/sales4][date#0,cat#1,count#2]
```
Author: Reynold Xin <rxin@databricks.com>
Closes#8024 from rxin/SPARK-9733 and squashes the following commits:
811b90e [Reynold Xin] Fixed Python test case.
52cab77 [Reynold Xin] Cast.
eea9ccc [Reynold Xin] Fix test case.
fcecb22 [Reynold Xin] [SPARK-9733][SQL] Improve explain message for data source scan node.
This way we recursively test the data types.
cc chenghao-intel
Author: Reynold Xin <rxin@databricks.com>
Closes#8036 from rxin/cansupport and squashes the following commits:
f7302ff [Reynold Xin] Can GenerateUnsafeProjection.canSupport to test Exchange supported data types.
It is now subsumed by various Tungsten operators.
Author: Reynold Xin <rxin@databricks.com>
Closes#7981 from rxin/SPARK-9674 and squashes the following commits:
144f96e [Reynold Xin] Re-enable test
58b7332 [Reynold Xin] Disable failing list.
fb797e3 [Reynold Xin] Match all UDTs.
be9f243 [Reynold Xin] Updated if.
71fc99c [Reynold Xin] [SPARK-9674][SPARK-9667] Remove GeneratedAggregate & SparkSqlSerializer2.
This PR adds SQLMetric/SQLMetricParam/SQLMetricValue to specialize accumulators to avoid boxing. All SQL metrics should use these classes rather than `Accumulator`.
Author: zsxwing <zsxwing@gmail.com>
Closes#7996 from zsxwing/sql-accu and squashes the following commits:
14a5f0a [zsxwing] Address comments
367ca23 [zsxwing] Use localValue directly to avoid changing Accumulable
42f50c3 [zsxwing] Add SQLMetric to specialize accumulators to avoid boxing
Previously, we use 64MB as the default page size, which was way too big for a lot of Spark applications (especially for single node).
This patch changes it so that the default page size, if unset by the user, is determined by the number of cores available and the total execution memory available.
Author: Reynold Xin <rxin@databricks.com>
Closes#8012 from rxin/pagesize and squashes the following commits:
16f4756 [Reynold Xin] Fixed failing test.
5afd570 [Reynold Xin] private...
0d5fb98 [Reynold Xin] Update default value.
674a6cd [Reynold Xin] Address review feedback.
dc00e05 [Reynold Xin] Merge with master.
73ebdb6 [Reynold Xin] [SPARK-9700] Pick default page size more intelligently.
This is a follow-up PR to solve the UI issue when there are multiple SQLContexts. Each SQLContext has a separate tab and contains queries which are executed by this SQLContext.
<img width="1366" alt="multiple sqlcontexts" src="https://cloud.githubusercontent.com/assets/1000778/9088391/54584434-3bc2-11e5-9caf-94c2b0da528e.png">
Author: zsxwing <zsxwing@gmail.com>
Closes#7962 from zsxwing/multi-sqlcontext-ui and squashes the following commits:
cf661e1 [zsxwing] sql -> SQL
39b0c97 [zsxwing] Support multiple SQLContexts in Web UI
spark.sql.tungsten.enabled will be the default value for both codegen and unsafe, they are kept internally for debug/testing.
cc marmbrus rxin
Author: Davies Liu <davies@databricks.com>
Closes#7998 from davies/tungsten and squashes the following commits:
c1c16da [Davies Liu] update doc
1a47be1 [Davies Liu] use tungsten.enabled for both of codegen/unsafe
(cherry picked from commit 4e70e8256c)
Signed-off-by: Reynold Xin <rxin@databricks.com>
The issue is that a task may run multiple sorts, and the sorts run by the child operator (i.e. parent RDD) may acquire all available memory such that other sorts in the same task do not have enough to proceed. This manifests itself in an `IOException("Unable to acquire X bytes of memory")` thrown by `UnsafeExternalSorter`.
The solution is to reserve a page in each sorter in the chain before computing the child operator's (parent RDD's) partitions. This requires us to use a new special RDD that does some preparation before computing the parent's partitions.
Author: Andrew Or <andrew@databricks.com>
Closes#8011 from andrewor14/unsafe-starve-memory and squashes the following commits:
35b69a4 [Andrew Or] Simplify test
0b07782 [Andrew Or] Minor: update comments
5d5afdf [Andrew Or] Merge branch 'master' of github.com:apache/spark into unsafe-starve-memory
254032e [Andrew Or] Add tests
234acbd [Andrew Or] Reserve a page in sorter when preparing each partition
b889e08 [Andrew Or] MapPartitionsWithPreparationRDD
A small performance optimization – we don't need to generate a Tuple2 and then immediately discard the key. We also don't need an extra wrapper from InterruptibleIterator.
Author: Reynold Xin <rxin@databricks.com>
Closes#8000 from rxin/SPARK-9692 and squashes the following commits:
1d4d0b3 [Reynold Xin] [SPARK-9692] Remove SqlNewHadoopRDD's generated Tuple2 and InterruptibleIterator.
Make sure that `$"column"` is consistent with other methods with respect to backticks. Adds a bunch of tests for various ways of constructing columns.
Author: Michael Armbrust <michael@databricks.com>
Closes#7969 from marmbrus/namesWithDots and squashes the following commits:
53ef3d7 [Michael Armbrust] [SPARK-9650][SQL] Fix quoting behavior on interpolated column names
2bf7a92 [Michael Armbrust] WIP
spark.sql.tungsten.enabled will be the default value for both codegen and unsafe, they are kept internally for debug/testing.
cc marmbrus rxin
Author: Davies Liu <davies@databricks.com>
Closes#7998 from davies/tungsten and squashes the following commits:
c1c16da [Davies Liu] update doc
1a47be1 [Davies Liu] use tungsten.enabled for both of codegen/unsafe
This is the followup of https://github.com/apache/spark/pull/7813. It renames `HybridUnsafeAggregationIterator` to `TungstenAggregationIterator` and makes it only work with `UnsafeRow`. Also, I add a `TungstenAggregate` that uses `TungstenAggregationIterator` and make `SortBasedAggregate` (renamed from `SortBasedAggregate`) only works with `SafeRow`.
Author: Yin Huai <yhuai@databricks.com>
Closes#7954 from yhuai/agg-followUp and squashes the following commits:
4d2f4fc [Yin Huai] Add comments and free map.
0d7ddb9 [Yin Huai] Add TungstenAggregationQueryWithControlledFallbackSuite to test fall back process.
91d69c2 [Yin Huai] Rename UnsafeHybridAggregationIterator to TungstenAggregateIteraotr and make it only work with UnsafeRow.
This pull request adds a destructive iterator to BytesToBytesMap. When used, the iterator frees pages as it traverses them. This is part of the effort to avoid starving when we have more than one operators that can exhaust memory.
This is based on #7924, but fixes a bug there (Don't use destructive iterator in UnsafeKVExternalSorter).
Closes#7924.
Author: Liang-Chi Hsieh <viirya@appier.com>
Author: Reynold Xin <rxin@databricks.com>
Closes#8003 from rxin/map-destructive-iterator and squashes the following commits:
6b618c3 [Reynold Xin] Don't use destructive iterator in UnsafeKVExternalSorter.
a7bd8ec [Reynold Xin] Merge remote-tracking branch 'viirya/destructive_iter' into map-destructive-iterator
7652083 [Liang-Chi Hsieh] For comments: add destructiveIterator(), modify unit test, remove code block.
4a3e9de [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into destructive_iter
581e9e3 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into destructive_iter
f0ff783 [Liang-Chi Hsieh] No need to free last page.
9e9d2a3 [Liang-Chi Hsieh] Add a destructive iterator for BytesToBytesMap.
This re-applies #7955, which was reverted due to a race condition to fix build breaking.
Author: Wenchen Fan <cloud0fan@outlook.com>
Author: Reynold Xin <rxin@databricks.com>
Closes#8002 from rxin/InternalRow-toSeq and squashes the following commits:
332416a [Reynold Xin] Merge pull request #7955 from cloud-fan/toSeq
21665e2 [Wenchen Fan] fix hive again...
4addf29 [Wenchen Fan] fix hive
bc16c59 [Wenchen Fan] minor fix
33d802c [Wenchen Fan] pass data type info to InternalRow.toSeq
3dd033e [Wenchen Fan] move the default special getters implementation from InternalRow to BaseGenericInternalRow
Author: Wenchen Fan <cloud0fan@outlook.com>
Closes#7955 from cloud-fan/toSeq and squashes the following commits:
21665e2 [Wenchen Fan] fix hive again...
4addf29 [Wenchen Fan] fix hive
bc16c59 [Wenchen Fan] minor fix
33d802c [Wenchen Fan] pass data type info to InternalRow.toSeq
3dd033e [Wenchen Fan] move the default special getters implementation from InternalRow to BaseGenericInternalRow
Inspiration drawn from this blog post: https://lab.getbase.com/pandarize-spark-dataframes/
Author: Reynold Xin <rxin@databricks.com>
Closes#7977 from rxin/isin and squashes the following commits:
9b1d3d6 [Reynold Xin] Added return.
2197d37 [Reynold Xin] Fixed test case.
7c1b6cf [Reynold Xin] Import warnings.
4f4a35d [Reynold Xin] [SPARK-9659][SQL] Rename inSet to isin to match Pandas function.
In short:
1- FrequentItems should not use the InternalRow representation, because the keys in the map get messed up. For example, every key in the Map correspond to the very last element observed in the partition, when the elements are strings.
2- Merging two partitions had a bug:
**Existing behavior with size 3**
Partition A -> Map(1 -> 3, 2 -> 3, 3 -> 4)
Partition B -> Map(4 -> 25)
Result -> Map()
**Correct Behavior:**
Partition A -> Map(1 -> 3, 2 -> 3, 3 -> 4)
Partition B -> Map(4 -> 25)
Result -> Map(3 -> 1, 4 -> 22)
cc mengxr rxin JoshRosen
Author: Burak Yavuz <brkyvz@gmail.com>
Closes#7945 from brkyvz/freq-fix and squashes the following commits:
07fa001 [Burak Yavuz] address 2
1dc61a8 [Burak Yavuz] address 1
506753e [Burak Yavuz] fixed and added reg test
47bfd50 [Burak Yavuz] pushing
This PR also change to use `def` instead of `lazy val` for UnsafeProjection, because it's not thread safe.
TODO: cleanup the debug code once the flaky test passed 100 times.
Author: Davies Liu <davies@databricks.com>
Closes#7940 from davies/semijoin and squashes the following commits:
93baac7 [Davies Liu] fix outerjoin
5c40ded [Davies Liu] address comments
aa3de46 [Davies Liu] Merge branch 'master' of github.com:apache/spark into semijoin
7590a25 [Davies Liu] Merge branch 'master' of github.com:apache/spark into semijoin
2d4085b [Davies Liu] use def for resultProjection
0833407 [Davies Liu] Merge branch 'semijoin' of github.com:davies/spark into semijoin
e0d8c71 [Davies Liu] use lazy val
6a59e8f [Davies Liu] Update HashedRelation.scala
0fdacaf [Davies Liu] fix broadcast and thread-safety of UnsafeProjection
2fc3ef6 [Davies Liu] reproduce failure in semijoin
In order to support update a varlength (actually fixed length) object, the space should be preserved even it's null. And, we can't call setNullAt(i) for it anymore, we because setNullAt(i) will remove the offset of the preserved space, should call setDecimal(i, null, precision) instead.
After this, we can do hash based aggregation on DecimalType with precision > 18. In a tests, this could decrease the end-to-end run time of aggregation query from 37 seconds (sort based) to 24 seconds (hash based).
cc rxin
Author: Davies Liu <davies@databricks.com>
Closes#7978 from davies/update_decimal and squashes the following commits:
bed8100 [Davies Liu] isSettable -> isMutable
923c9eb [Davies Liu] address comments and fix bug
385891d [Davies Liu] Merge branch 'master' of github.com:apache/spark into update_decimal
36a1872 [Davies Liu] fix tests
cd6c524 [Davies Liu] support set decimal with precision > 18
![translate](http://www.w3resource.com/PostgreSQL/postgresql-translate-function.png)
Author: zhichao.li <zhichao.li@intel.com>
Closes#7709 from zhichao-li/translate and squashes the following commits:
9418088 [zhichao.li] refine checking condition
f2ab77a [zhichao.li] clone string
9d88f2d [zhichao.li] fix indent
6aa2962 [zhichao.li] style
e575ead [zhichao.li] add python api
9d4bab0 [zhichao.li] add special case for fodable and refactor unittest
eda7ad6 [zhichao.li] update to use TernaryExpression
cdfd4be [zhichao.li] add function translate
https://issues.apache.org/jira/browse/SPARK-9664
Author: Yin Huai <yhuai@databricks.com>
Closes#7982 from yhuai/udafRegister and squashes the following commits:
0cc2287 [Yin Huai] Remove UDAFRegistration and add apply to UserDefinedAggregateFunction.
The new aggregate replaces the old GeneratedAggregate.
Author: Reynold Xin <rxin@databricks.com>
Closes#7983 from rxin/remove-generated-agg and squashes the following commits:
8334aae [Reynold Xin] [SPARK-9674][SQL] Remove GeneratedAggregate.
This PR is a fork of PR #5733 authored by chenghao-intel. For committers who's going to merge this PR, please set the author to "Cheng Hao <hao.chengintel.com>".
----
When a data source relation meets the following requirements, we persist it in Hive compatible format, so that other systems like Hive can access it:
1. It's a `HadoopFsRelation`
2. It has only one input path
3. It's non-partitioned
4. It's data source provider can be naturally mapped to a Hive builtin SerDe (e.g. ORC and Parquet)
Author: Cheng Lian <lian@databricks.com>
Author: Cheng Hao <hao.cheng@intel.com>
Closes#7967 from liancheng/spark-6923/refactoring-pr-5733 and squashes the following commits:
5175ee6 [Cheng Lian] Fixes an oudated comment
3870166 [Cheng Lian] Fixes build error and comments
864acee [Cheng Lian] Refactors PR #5733
3490cdc [Cheng Hao] update the scaladoc
6f57669 [Cheng Hao] write schema info to hivemetastore for data source
This PR has the following three small fixes.
1. UnsafeKVExternalSorter does not use 0 as the initialSize to create an UnsafeInMemorySorter if its BytesToBytesMap is empty.
2. We will not not spill a InMemorySorter if it is empty.
3. We will not add a SpillReader to a SpillMerger if this SpillReader is empty.
JIRA: https://issues.apache.org/jira/browse/SPARK-9611
Author: Yin Huai <yhuai@databricks.com>
Closes#7948 from yhuai/unsafeEmptyMap and squashes the following commits:
9727abe [Yin Huai] Address Josh's comments.
34b6f76 [Yin Huai] 1. UnsafeKVExternalSorter does not use 0 as the initialSize to create an UnsafeInMemorySorter if its BytesToBytesMap is empty. 2. Do not spill a InMemorySorter if it is empty. 3. Do not add spill to SpillMerger if this spill is empty.
This patches renames `RowOrdering` to `InterpretedOrdering` and updates SortMergeJoin to use the `SparkPlan` methods for constructing its ordering so that it may benefit from codegen.
This is an updated version of #7408.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#7973 from JoshRosen/SPARK-9054 and squashes the following commits:
e610655 [Josh Rosen] Add comment RE: Ascending ordering
34b8e0c [Josh Rosen] Import ordering
be19a0f [Josh Rosen] [SPARK-9054] [SQL] Rename RowOrdering to InterpretedOrdering; use newOrdering in more places.
This continues tarekauel's work in #7778.
Author: Liang-Chi Hsieh <viirya@appier.com>
Author: Tarek Auel <tarek.auel@googlemail.com>
Closes#7893 from viirya/codegen_in and squashes the following commits:
81ff97b [Liang-Chi Hsieh] For comments.
47761c6 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into codegen_in
cf4bf41 [Liang-Chi Hsieh] For comments.
f532b3c [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into codegen_in
446bbcd [Liang-Chi Hsieh] Fix bug.
b3d0ab4 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into codegen_in
4610eff [Liang-Chi Hsieh] Relax the types of references and update optimizer test.
224f18e [Liang-Chi Hsieh] Beef up the test cases for In and InSet to include all primitive data types.
86dc8aa [Liang-Chi Hsieh] Only convert In to InSet when the number of items in set is more than the threshold.
b7ded7e [Tarek Auel] [SPARK-9403][SQL] codeGen in / inSet
Currently we collapse successive projections that are added by `withColumn`. However, this optimization violates the constraint that adding nodes to a plan will never change its analyzed form and thus breaks caching. Instead of doing early optimization, in this PR I just fix some low-hanging slowness in the analyzer. In particular, I add a mechanism for skipping already analyzed subplans, `resolveOperators` and `resolveExpression`. Since trees are generally immutable after construction, it's safe to annotate a plan as already analyzed as any transformation will create a new tree with this bit no longer set.
Together these result in a faster analyzer than before, even with added timing instrumentation.
```
Original Code
[info] 3430ms
[info] 2205ms
[info] 1973ms
[info] 1982ms
[info] 1916ms
Without Project Collapsing in DataFrame
[info] 44610ms
[info] 45977ms
[info] 46423ms
[info] 46306ms
[info] 54723ms
With analyzer optimizations
[info] 6394ms
[info] 4630ms
[info] 4388ms
[info] 4093ms
[info] 4113ms
With resolveOperators
[info] 2495ms
[info] 1380ms
[info] 1685ms
[info] 1414ms
[info] 1240ms
```
Author: Michael Armbrust <michael@databricks.com>
Closes#7920 from marmbrus/withColumnCache and squashes the following commits:
2145031 [Michael Armbrust] fix hive udfs tests
5a5a525 [Michael Armbrust] remove wrong comment
7a507d5 [Michael Armbrust] style
b59d710 [Michael Armbrust] revert small change
1fa5949 [Michael Armbrust] move logic into LogicalPlan, add tests
0e2cb43 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into withColumnCache
c926e24 [Michael Armbrust] naming
e593a2d [Michael Armbrust] style
f5a929e [Michael Armbrust] [SPARK-9141][SQL] Remove project collapsing from DataFrame API
38b1c83 [Michael Armbrust] WIP
Support partitioning for the JSON data source.
Still 2 open issues for the `HadoopFsRelation`
- `refresh()` will invoke the `discoveryPartition()`, which will auto infer the data type for the partition columns, and maybe conflict with the given partition columns. (TODO enable `HadoopFsRelationSuite.Partition column type casting"
- When insert data into a cached HadoopFsRelation based table, we need to invalidate the cache after the insertion (TODO enable `InsertSuite.Caching`)
Author: Cheng Hao <hao.cheng@intel.com>
Closes#7696 from chenghao-intel/json and squashes the following commits:
d90b104 [Cheng Hao] revert the change for JacksonGenerator.apply
307111d [Cheng Hao] fix bug in the unit test
8738c8a [Cheng Hao] fix bug in unit testing
35f2cde [Cheng Hao] support partition for json format
The user specified schema is currently ignored when loading Parquet files.
One workaround is to use the `format` and `load` methods instead of `parquet`, e.g.:
```
val schema = ???
// schema is ignored
sqlContext.read.schema(schema).parquet("hdfs:///test")
// schema is retained
sqlContext.read.schema(schema).format("parquet").load("hdfs:///test")
```
The fix is simple, but I wonder if the `parquet` method should instead be written in a similar fashion to `orc`:
```
def parquet(path: String): DataFrame = format("parquet").load(path)
```
Author: Nathan Howell <nhowell@godaddy.com>
Closes#7947 from NathanHowell/SPARK-9618 and squashes the following commits:
d1ea62c [Nathan Howell] [SPARK-9618] [SQL] Use the specified schema when reading Parquet files
This PR includes the following changes:
### SPARK-8862: Add basic instrumentation to each SparkPlan operator
A SparkPlan can override `def accumulators: Map[String, Accumulator[_]]` to expose its metrics that can be displayed in UI. The UI will use them to track the updates and show them in the web page in real-time.
### SparkSQLExecution and SQLSparkListener
`SparkSQLExecution.withNewExecutionId` will set `spark.sql.execution.id` to the local properties so that we can use it to track all jobs that belong to the same query.
SQLSparkListener is a listener to track all accumulator updates of all tasks for a query. It receives them from heartbeats can the UI can query them in real-time.
When running a query, `SQLSparkListener.onExecutionStart` will be called. When a query is finished, `SQLSparkListener.onExecutionEnd` will be called. And the Spark jobs with the same execution id will be tracked and stored with this query.
`SQLSparkListener` has to store all accumulator updates for tasks separately. When a task fails and starts to retry, we need to drop the old accumulator updates. Because we can not revert our changes to an accumulator, we have to maintain these accumulator updates by ourselves so as to drop accumulator updates for a failed task.
### SPARK-8862: A new SQL tab
Includes two pages:
#### A page for all DataFrame/SQL queries
It will show the running, completed and failed queries in 3 tables. It also displays the jobs and their links for a query in each row.
#### A detail page for a DataFrame/SQL query
In this page, it also shows the SparkPlan metrics in real-time. Run a long-running query, such as
```
val testData = sc.parallelize((1 to 1000000).map(i => (i, i.toString))).toDF()
testData.select($"_1").filter($"_1" < 1000).foreach(_ => Thread.sleep(60))
```
and you will see the metrics keep updating in real-time.
<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/7774)
<!-- Reviewable:end -->
Author: zsxwing <zsxwing@gmail.com>
Closes#7774 from zsxwing/sql-ui and squashes the following commits:
5a2bc99 [zsxwing] Remove UISeleniumSuite and its dependency
57d4cd2 [zsxwing] Use VisibleForTesting annotation
cc1c736 [zsxwing] Add SparkPlan.trackNumOfRowsEnabled to make subclasses easy to track the number of rows; fix the issue that the "save" action cannot collect metrics
3771ab0 [zsxwing] Register SQL metrics accmulators
3a101c0 [zsxwing] Change prepareCalled's type to AtomicBoolean for thread-safety
b8d5605 [zsxwing] Make prepare idempotent; call children's prepare in SparkPlan.prepare; change doPrepare to def
4ed11a1 [zsxwing] var -> val
332639c [zsxwing] Ignore UISeleniumSuite and SQLListenerSuite."no memory leak" because of SPARK-9580
bb52359 [zsxwing] Address other commens in SQLListener
c4d0f5d [zsxwing] Move newPredicate out of the iterator loop
957473c [zsxwing] Move STATIC_RESOURCE_DIR to object SQLTab
7ab4816 [zsxwing] Make SparkPlan accumulator API private[sql]
dae195e [zsxwing] Fix the code style and comments
3a66207 [zsxwing] Ignore irrelevant accumulators
b8484a1 [zsxwing] Merge branch 'master' into sql-ui
9406592 [zsxwing] Implement the SparkPlan viz
4ebce68 [zsxwing] Add SparkPlan.prepare to support BroadcastHashJoin to run background work in parallel
ca1811f [zsxwing] Merge branch 'master' into sql-ui
fef6fc6 [zsxwing] Fix a corner case
25f335c [zsxwing] Fix the code style
6eae828 [zsxwing] SQLSparkListener -> SQLListener; SparkSQLExecutionUIData -> SQLExecutionUIData; SparkSQLExecution -> SQLExecution
822af75 [zsxwing] Add SQLSparkListenerSuite and fix the issue about onExecutionEnd and onJobEnd
6be626f [zsxwing] Add UISeleniumSuite to test UI
d02a24d [zsxwing] Make ExecutionPage private
23abf73 [zsxwing] [SPARK-8862][SPARK-8862][SQL] Add basic instrumentation to each SparkPlan operator and add a new SQL tab
The current implementation of UnsafeExternalSort uses NoOpPrefixComparator for binary-typed data.
So, we need to add BinaryPrefixComparator in PrefixComparators.
Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>
Closes#7676 from maropu/BinaryTypePrefixComparator and squashes the following commits:
fe6f31b [Takeshi YAMAMURO] Apply comments
d943c04 [Takeshi YAMAMURO] Add a codegen'd entry for BinaryType in SortPrefix
ecf3ac5 [Takeshi YAMAMURO] Support BinaryType in PrefixComparator
Let Decimal carry the correct precision and scale with DecimalType.
cc rxin yhuai
Author: Davies Liu <davies@databricks.com>
Closes#7925 from davies/decimal_scale and squashes the following commits:
e19701a [Davies Liu] some tweaks
57d78d2 [Davies Liu] fix tests
5d5bc69 [Davies Liu] match precision and scale with DecimalType
This PR is based on #7580 , thanks to EntilZha
PR for work on https://issues.apache.org/jira/browse/SPARK-8231
Currently, I have an initial implementation for contains. Based on discussion on JIRA, it should behave same as Hive: https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFArrayContains.java#L102-L128
Main points are:
1. If the array is empty, null, or the value is null, return false
2. If there is a type mismatch, throw error
3. If comparison is not supported, throw error
Closes#7580
Author: Pedro Rodriguez <prodriguez@trulia.com>
Author: Pedro Rodriguez <ski.rodriguez@gmail.com>
Author: Davies Liu <davies@databricks.com>
Closes#7949 from davies/array_contains and squashes the following commits:
d3c08bc [Davies Liu] use foreach() to avoid copy
bc3d1fe [Davies Liu] fix array_contains
719e37d [Davies Liu] Merge branch 'master' of github.com:apache/spark into array_contains
e352cf9 [Pedro Rodriguez] fixed diff from master
4d5b0ff [Pedro Rodriguez] added docs and another type check
ffc0591 [Pedro Rodriguez] fixed unit test
7a22deb [Pedro Rodriguez] Changed test to use strings instead of long/ints which are different between python 2 an 3
b5ffae8 [Pedro Rodriguez] fixed pyspark test
4e7dce3 [Pedro Rodriguez] added more docs
3082399 [Pedro Rodriguez] fixed unit test
46f9789 [Pedro Rodriguez] reverted change
d3ca013 [Pedro Rodriguez] Fixed type checking to match hive behavior, then added tests to insure this
8528027 [Pedro Rodriguez] added more tests
686e029 [Pedro Rodriguez] fix scala style
d262e9d [Pedro Rodriguez] reworked type checking code and added more tests
2517a58 [Pedro Rodriguez] removed unused import
28b4f71 [Pedro Rodriguez] fixed bug with type conversions and re-added tests
12f8795 [Pedro Rodriguez] fix scala style checks
e8a20a9 [Pedro Rodriguez] added python df (broken atm)
65b562c [Pedro Rodriguez] made array_contains nullable false
33b45aa [Pedro Rodriguez] reordered test
9623c64 [Pedro Rodriguez] fixed test
4b4425b [Pedro Rodriguez] changed Arrays in tests to Seqs
72cb4b1 [Pedro Rodriguez] added checkInputTypes and docs
69c46fb [Pedro Rodriguez] added tests and codegen
9e0bfc4 [Pedro Rodriguez] initial attempt at implementation
This adds Python API for those DataFrame functions that is introduced in 1.5.
There is issue with serialize byte_array in Python 3, so some of functions (for BinaryType) does not have tests.
cc rxin
Author: Davies Liu <davies@databricks.com>
Closes#7922 from davies/python_functions and squashes the following commits:
8ad942f [Davies Liu] fix test
5fb6ec3 [Davies Liu] fix bugs
3495ed3 [Davies Liu] fix issues
ea5f7bb [Davies Liu] Add python API for DataFrame functions
Author: Wenchen Fan <cloud0fan@outlook.com>
Closes#7932 from cloud-fan/generic-getter and squashes the following commits:
c60de4c [Wenchen Fan] do not expose generic getter in internal row
This patch extends UnsafeExternalSorter to support records larger than the page size. The basic strategy is the same as in #7762: store large records in their own overflow pages.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#7891 from JoshRosen/large-records-in-sql-sorter and squashes the following commits:
967580b [Josh Rosen] Merge remote-tracking branch 'origin/master' into large-records-in-sql-sorter
948c344 [Josh Rosen] Add large records tests for KV sorter.
3c17288 [Josh Rosen] Combine memory and disk cleanup into general cleanupResources() method
380f217 [Josh Rosen] Merge remote-tracking branch 'origin/master' into large-records-in-sql-sorter
27eafa0 [Josh Rosen] Fix page size in PackedRecordPointerSuite
a49baef [Josh Rosen] Address initial round of review comments
3edb931 [Josh Rosen] Remove accidentally-committed debug statements.
2b164e2 [Josh Rosen] Support large records in UnsafeExternalSorter.
We often return abstract iterator types in various sort-related classes (e.g. UnsafeKVExternalSorter). It is actually better to return a more concrete type, so the callsite uses that type and JIT can inline the iterator calls.
Author: Reynold Xin <rxin@databricks.com>
Closes#7911 from rxin/surface-concrete-type and squashes the following commits:
0422add [Reynold Xin] [SPARK-9577][SQL] Surface concrete iterator types in various sort classes.
Author: Cheng Lian <lian@databricks.com>
Closes#7895 from liancheng/spark-9554/enable-in-memory-partition-pruning and squashes the following commits:
67c403e [Cheng Lian] Enables in-memory partition pruning by default
This PR adds a base aggregation iterator `AggregationIterator`, which is used to create `SortBasedAggregationIterator` (for sort-based aggregation) and `UnsafeHybridAggregationIterator` (first it tries hash-based aggregation and falls back to the sort-based aggregation (using external sorter) if we cannot allocate memory for the map). With these two iterators, we will not need existing iterators and I am removing those. Also, we can use a single physical `Aggregate` operator and it internally determines what iterators to used.
https://issues.apache.org/jira/browse/SPARK-9240
Author: Yin Huai <yhuai@databricks.com>
Closes#7813 from yhuai/AggregateOperator and squashes the following commits:
e317e2b [Yin Huai] Remove unnecessary change.
74d93c5 [Yin Huai] Merge remote-tracking branch 'upstream/master' into AggregateOperator
ba6afbc [Yin Huai] Add a little bit more comments.
c9cf3b6 [Yin Huai] update
0f1b06f [Yin Huai] Remove unnecessary code.
21fd15f [Yin Huai] Remove unnecessary change.
964f88b [Yin Huai] Implement fallback strategy.
b1ea5cf [Yin Huai] wip
7fcbd87 [Yin Huai] Add a flag to control what iterator to use.
533d5b2 [Yin Huai] Prepare for fallback!
33b7022 [Yin Huai] wip
bd9282b [Yin Huai] UDAFs now supports UnsafeRow.
f52ee53 [Yin Huai] wip
3171f44 [Yin Huai] wip
d2c45a0 [Yin Huai] wip
f60cc83 [Yin Huai] Also check input schema.
af32210 [Yin Huai] Check iter.hasNext before we create an iterator because the constructor of the iterato will read at least one row from a non-empty input iter.
299008c [Yin Huai] First round cleanup.
3915bac [Yin Huai] Create a base iterator class for aggregation iterators and add the initial version of the hybrid iterator.
This PR adds an optimization rule, `FilterNullsInJoinKey`, to add `Filter` before join operators to filter out rows having null values for join keys.
This optimization is guarded by a new SQL conf, `spark.sql.advancedOptimization`.
The code in this PR was authored by yhuai; I'm opening this PR to factor out this change from #7685, a larger pull request which contains two other optimizations.
Author: Yin Huai <yhuai@databricks.com>
Author: Josh Rosen <joshrosen@databricks.com>
Closes#7768 from JoshRosen/filter-nulls-in-join-key and squashes the following commits:
c02fc3f [Yin Huai] Address Josh's comments.
0a8e096 [Yin Huai] Update comments.
ea7d5a6 [Yin Huai] Make sure we do not keep adding filters.
be88760 [Yin Huai] Make it clear that FilterNullsInJoinKeySuite.scala is used to test FilterNullsInJoinKey.
8bb39ad [Yin Huai] Fix non-deterministic tests.
303236b [Josh Rosen] Revert changes that are unrelated to null join key filtering
40eeece [Josh Rosen] Merge remote-tracking branch 'origin/master' into filter-nulls-in-join-key
c57a954 [Yin Huai] Bug fix.
d3d2e64 [Yin Huai] First round of cleanup.
f9516b0 [Yin Huai] Style
c6667e7 [Yin Huai] Add PartitioningCollection.
e616d3b [Yin Huai] wip
7c2d2d8 [Yin Huai] Bug fix and refactoring.
69bb072 [Yin Huai] Introduce NullSafeHashPartitioning and NullUnsafePartitioning.
d5b84c3 [Yin Huai] Do not add unnessary filters.
2201129 [Yin Huai] Filter out rows that will not be joined in equal joins early.
This PR adds `PartitioningCollection`, which is used to represent the `outputPartitioning` for SparkPlans with multiple children (e.g. `ShuffledHashJoin`). So, a `SparkPlan` can have multiple descriptions of its partitioning schemes. Taking `ShuffledHashJoin` as an example, it has two descriptions of its partitioning schemes, i.e. `left.outputPartitioning` and `right.outputPartitioning`. So when we have a query like `select * from t1 join t2 on (t1.x = t2.x) join t3 on (t2.x = t3.x)` will only have three Exchange operators (when shuffled joins are needed) instead of four.
The code in this PR was authored by yhuai; I'm opening this PR to factor out this change from #7685, a larger pull request which contains two other optimizations.
<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/7773)
<!-- Reviewable:end -->
Author: Yin Huai <yhuai@databricks.com>
Author: Josh Rosen <joshrosen@databricks.com>
Closes#7773 from JoshRosen/multi-way-join-planning-improvements and squashes the following commits:
5c45924 [Josh Rosen] Merge remote-tracking branch 'origin/master' into multi-way-join-planning-improvements
cd8269b [Josh Rosen] Refactor test to use SQLTestUtils
2963857 [Yin Huai] Revert unnecessary SqlConf change.
73913f7 [Yin Huai] Add comments and test. Also, revert the change in ShuffledHashOuterJoin for now.
4a99204 [Josh Rosen] Delete unrelated expression change
884ab95 [Josh Rosen] Carve out only SPARK-2205 changes.
247e5fa [Josh Rosen] Merge remote-tracking branch 'origin/master' into multi-way-join-planning-improvements
c57a954 [Yin Huai] Bug fix.
d3d2e64 [Yin Huai] First round of cleanup.
f9516b0 [Yin Huai] Style
c6667e7 [Yin Huai] Add PartitioningCollection.
e616d3b [Yin Huai] wip
7c2d2d8 [Yin Huai] Bug fix and refactoring.
69bb072 [Yin Huai] Introduce NullSafeHashPartitioning and NullUnsafePartitioning.
d5b84c3 [Yin Huai] Do not add unnessary filters.
2201129 [Yin Huai] Filter out rows that will not be joined in equal joins early.
This pull request creates two isOrderable functions in RowOrdering that can be used to check whether a data type or a sequence of expressions can be used in sorting.
Author: Reynold Xin <rxin@databricks.com>
Closes#7880 from rxin/SPARK-9546 and squashes the following commits:
f9e322d [Reynold Xin] Fixed tests.
0439b43 [Reynold Xin] [SPARK-9546][SQL] Centralize orderable data type checking.
This pull request adds a destructAndCreateExternalSorter method to UnsafeFixedWidthAggregationMap. The new method does the following:
1. Creates a new external sorter UnsafeKVExternalSorter
2. Adds all the data into an in-memory sorter, sorts them
3. Spills the sorted in-memory data to disk
This method can be used to fallback to sort-based aggregation when under memory pressure.
The pull request also includes accounting fixes from JoshRosen.
TODOs (that can be done in follow-up PRs)
- [x] Address Josh's feedbacks from #7849
- [x] More documentation and test cases
- [x] Make sure we are doing memory accounting correctly with test cases (e.g. did we release the memory in BytesToBytesMap twice?)
- [ ] Look harder at possible memory leaks and exception handling
- [ ] Randomized tester for the KV sorter as well as the aggregation map
Author: Reynold Xin <rxin@databricks.com>
Author: Josh Rosen <joshrosen@databricks.com>
Closes#7860 from rxin/kvsorter and squashes the following commits:
986a58c [Reynold Xin] Bug fix.
599317c [Reynold Xin] Style fix and slightly more compact code.
fe7bd4e [Reynold Xin] Bug fixes.
fd71bef [Reynold Xin] Merge remote-tracking branch 'josh/large-records-in-sql-sorter' into kvsorter-with-josh-fix
3efae38 [Reynold Xin] More fixes and documentation.
45f1b09 [Josh Rosen] Ensure that spill files are cleaned up
f6a9bd3 [Reynold Xin] Josh feedback.
9be8139 [Reynold Xin] Remove testSpillFrequency.
7cbe759 [Reynold Xin] [SPARK-9531][SQL] UnsafeFixedWidthAggregationMap.destructAndCreateExternalSorter.
ae4a8af [Josh Rosen] Detect leaked unsafe memory in UnsafeExternalSorterSuite.
52f9b06 [Josh Rosen] Detect ShuffleMemoryManager leaks in UnsafeExternalSorter.
Author: Reynold Xin <rxin@databricks.com>
Closes#7861 from rxin/api-audit and squashes the following commits:
7200256 [Reynold Xin] [SPARK-9208][SQL] Sort DataFrame functions alphabetically.
Generate prefix for DecimalType, fix the random generator of decimal
cc JoshRosen
Author: Davies Liu <davies@databricks.com>
Closes#7857 from davies/sort_decimal and squashes the following commits:
2433959 [Davies Liu] Merge branch 'master' of github.com:apache/spark into sort_decimal
de24253 [Davies Liu] fix style
0a54c1a [Davies Liu] sort decimal
When accessing a column in UnsafeRow, it's good to avoid the copy, then we should do deep copy when turn the UnsafeRow into generic Row, this PR brings generated FromUnsafeProjection to do that.
This PR also fix the expressions that cache the UTF8String, which should also copy it.
Author: Davies Liu <davies@databricks.com>
Closes#7840 from davies/avoid_copy and squashes the following commits:
230c8a1 [Davies Liu] address comment
fd797c9 [Davies Liu] Merge branch 'master' of github.com:apache/spark into avoid_copy
e095dd0 [Davies Liu] rollback rename
8ef5b0b [Davies Liu] copy String in Columnar
81360b8 [Davies Liu] fix class name
9aecb88 [Davies Liu] use FromUnsafeProjection to do deep copy for UTF8String and struct
This PR is based on #7643 , thanks to adrian-wang
Author: Davies Liu <davies@databricks.com>
Author: Daoyuan Wang <daoyuan.wang@intel.com>
Closes#7847 from davies/datediff and squashes the following commits:
74333d7 [Davies Liu] fix bug
22d8a8c [Davies Liu] optimize
85cdd21 [Davies Liu] remove unnecessary tests
241d90c [Davies Liu] Merge branch 'master' of github.com:apache/spark into datediff
e9dc0f5 [Davies Liu] fix datediff/to_utc_timestamp/from_utc_timestamp
c360447 [Daoyuan Wang] function datediff, to_utc_timestamp, from_utc_timestamp (commits merged)
This PR is based on #7208 , thanks to HuJiayin
Closes#7208
Author: HuJiayin <jiayin.hu@intel.com>
Author: Davies Liu <davies@databricks.com>
Closes#7850 from davies/initcap and squashes the following commits:
54472e9 [Davies Liu] fix python test
17ffe51 [Davies Liu] Merge branch 'master' of github.com:apache/spark into initcap
ca46390 [Davies Liu] Merge branch 'master' of github.com:apache/spark into initcap
3a906e4 [Davies Liu] implement title case in UTF8String
8b2506a [HuJiayin] Update functions.py
2cd43e5 [HuJiayin] fix python style check
b616c0e [HuJiayin] add python api
1f5a0ef [HuJiayin] add codegen
7e0c604 [HuJiayin] Merge branch 'master' of https://github.com/apache/spark into initcap
6a0b958 [HuJiayin] add column
c79482d [HuJiayin] support soundex
7ce416b [HuJiayin] support initcap rebase code
cc rxin
Author: Davies Liu <davies@databricks.com>
Closes#7856 from davies/sort_improve and squashes the following commits:
5fc81bd [Davies Liu] support DateType/TimestampType
This pull request adds a sortedIterator method to UnsafeFixedWidthAggregationMap that sorts its data in-place by the grouping key.
This is needed so we can fallback to external sorting for aggregation.
Author: Reynold Xin <rxin@databricks.com>
Closes#7849 from rxin/bytes2bytes-sorting and squashes the following commits:
75018c6 [Reynold Xin] Updated documentation.
81a8694 [Reynold Xin] [SPARK-9520][SQL] Support in-place sort in UnsafeFixedWidthAggregationMap.
This is based on #7641, thanks to zhichao-li
Closes#7641
Author: zhichao.li <zhichao.li@intel.com>
Author: Davies Liu <davies@databricks.com>
Closes#7848 from davies/substr and squashes the following commits:
461b709 [Davies Liu] remove bytearry from tests
b45377a [Davies Liu] Merge branch 'master' of github.com:apache/spark into substr
01d795e [zhichao.li] scala style
99aa130 [zhichao.li] add substring to dataframe
4f68bfe [zhichao.li] add binary type support for substring
This PR is based on #7581 , just fix the conflict.
Author: Cheng Hao <hao.cheng@intel.com>
Author: Davies Liu <davies@databricks.com>
Closes#7851 from davies/sort_array and squashes the following commits:
a80ef66 [Davies Liu] fix conflict
7cfda65 [Davies Liu] Merge branch 'master' of github.com:apache/spark into sort_array
664c960 [Cheng Hao] update the sort_array by using the ArrayData
276d2d5 [Cheng Hao] add empty line
0edab9c [Cheng Hao] Add asending/descending support for sort_array
80fc0f8 [Cheng Hao] Add type checking
a42b678 [Cheng Hao] Add sort_array support
This PR adds a `MapData` as internal representation of map type in Spark SQL, and provides a default implementation with just 2 `ArrayData`.
After that, we have specialized getters for all internal type, so I removed generic getter in `ArrayData` and added specialized `toArray` for it.
Also did some refactor and cleanup for `InternalRow` and its subclasses.
Author: Wenchen Fan <cloud0fan@outlook.com>
Closes#7799 from cloud-fan/map-data and squashes the following commits:
77d482f [Wenchen Fan] fix python
e8f6682 [Wenchen Fan] skip MapData equality check in HiveInspectorSuite
40cc9db [Wenchen Fan] add toString
6e06ec9 [Wenchen Fan] some more cleanup
a90aca1 [Wenchen Fan] add MapData
BytesToBytesMap current encodes key/value data in the following format:
```
8B key length, key data, 8B value length, value data
```
UnsafeExternalSorter, on the other hand, encodes data this way:
```
4B record length, data
```
As a result, we cannot pass records encoded by BytesToBytesMap directly into UnsafeExternalSorter for sorting. However, if we rearrange data slightly, we can then pass the key/value records directly into UnsafeExternalSorter:
```
4B key+value length, 4B key length, key data, value data
```
Author: Reynold Xin <rxin@databricks.com>
Closes#7845 from rxin/kvsort-rebase and squashes the following commits:
5716b59 [Reynold Xin] Fixed test.
2e62ccb [Reynold Xin] Updated BytesToBytesMap's data encoding to put the key first.
a51b641 [Reynold Xin] Added a KV sorter interface.
Add expression `sort_array` support.
Author: Cheng Hao <hao.cheng@intel.com>
This patch had conflicts when merged, resolved by
Committer: Davies Liu <davies.liu@gmail.com>
Closes#7581 from chenghao-intel/sort_array and squashes the following commits:
664c960 [Cheng Hao] update the sort_array by using the ArrayData
276d2d5 [Cheng Hao] add empty line
0edab9c [Cheng Hao] Add asending/descending support for sort_array
80fc0f8 [Cheng Hao] Add type checking
a42b678 [Cheng Hao] Add sort_array support
This PR is based on #7533 , thanks to zhichao-li
Closes#7533
Author: zhichao.li <zhichao.li@intel.com>
Author: Davies Liu <davies@databricks.com>
Closes#7843 from davies/str_index and squashes the following commits:
391347b [Davies Liu] add python api
3ce7802 [Davies Liu] fix substringIndex
f2d29a1 [Davies Liu] Merge branch 'master' of github.com:apache/spark into str_index
515519b [zhichao.li] add foldable and remove null checking
9546991 [zhichao.li] scala style
67c253a [zhichao.li] hide some apis and clean code
b19b013 [zhichao.li] add codegen and clean code
ac863e9 [zhichao.li] reduce the calling of numChars
12e108f [zhichao.li] refine unittest
d92951b [zhichao.li] add lastIndexOf
52d7b03 [zhichao.li] add substring_index function
This patch creates a code generated unsafe row concatenator that can be used to concatenate/join two UnsafeRows into a single UnsafeRow.
Since it is inherently hard to test these low level stuff, the test suites employ randomized testing heavily in order to guarantee correctness.
Author: Reynold Xin <rxin@databricks.com>
Closes#7821 from rxin/rowconcat and squashes the following commits:
8717f35 [Reynold Xin] Rebase and code review.
72c5d8e [Reynold Xin] Fixed a bug.
a84ed2e [Reynold Xin] Fixed offset.
40c3fb2 [Reynold Xin] Reset random data generator.
f0913aa [Reynold Xin] Test fixes.
6687b6f [Reynold Xin] Updated documentation.
00354b9 [Reynold Xin] Support concat data as well.
e9a4347 [Reynold Xin] Updated.
6269f96 [Reynold Xin] Fixed a bug .
0f89716 [Reynold Xin] [SPARK-9358][SQL][WIP] Code generation for UnsafeRow concat.
This patch adds support for entries larger than the default page size in BytesToBytesMap. These large rows are handled by allocating special overflow pages to hold individual entries.
In addition, this patch integrates BytesToBytesMap with the ShuffleMemoryManager:
- Move BytesToBytesMap from `unsafe` to `core` so that it can import `ShuffleMemoryManager`.
- Before allocating new data pages, ask the ShuffleMemoryManager to reserve the memory:
- `putNewKey()` now returns a boolean to indicate whether the insert succeeded or failed due to a lack of memory. The caller can use this value to respond to the memory pressure (e.g. by spilling).
- `UnsafeFixedWidthAggregationMap. getAggregationBuffer()` now returns `null` to signal failure due to a lack of memory.
- Updated all uses of these classes to handle these error conditions.
- Added new tests for allocating large records and for allocations which fail due to memory pressure.
- Extended the `afterAll()` test teardown methods to detect ShuffleMemoryManager leaks.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#7762 from JoshRosen/large-rows and squashes the following commits:
ae7bc56 [Josh Rosen] Fix compilation
82fc657 [Josh Rosen] Merge remote-tracking branch 'origin/master' into large-rows
34ab943 [Josh Rosen] Remove semi
31a525a [Josh Rosen] Integrate BytesToBytesMap with ShuffleMemoryManager.
626b33c [Josh Rosen] Move code to sql/core and spark/core packages so that ShuffleMemoryManager can be integrated
ec4484c [Josh Rosen] Move BytesToBytesMap from unsafe package to core.
642ed69 [Josh Rosen] Rename size to numElements
bea1152 [Josh Rosen] Add basic test.
2cd3570 [Josh Rosen] Remove accidental duplicated code
07ff9ef [Josh Rosen] Basic support for large rows in BytesToBytesMap.
This PR brings SQL function soundex(), see https://issues.apache.org/jira/browse/HIVE-9738
It's based on #7115 , thanks to HuJiayin
Author: HuJiayin <jiayin.hu@intel.com>
Author: Davies Liu <davies@databricks.com>
Closes#7812 from davies/soundex and squashes the following commits:
fa75941 [Davies Liu] Merge branch 'master' of github.com:apache/spark into soundex
a4bd6d8 [Davies Liu] fix soundex
2538908 [HuJiayin] add codegen soundex
d15d329 [HuJiayin] add back ut
ded1a14 [HuJiayin] Merge branch 'master' of https://github.com/apache/spark
e2dec2c [HuJiayin] support soundex rebase code
This PR enables the processing of multiple window frames in a single window operator. This should improve the performance of processing multiple window expressions wich share partition by/order by clauses, because it will be more efficient with respect to memory use and group processing.
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#7515 from hvanhovell/SPARK-8640 and squashes the following commits:
f0e1c21 [Herman van Hovell] Changed Window Logical/Physical plans to use partition by/order by specs directly instead of using WindowSpec.
e1711c2 [Herman van Hovell] Enabled the processing of multiple window frames in a single Window operator.
This PR address the comments in #7805
cc rxin
Author: Davies Liu <davies@databricks.com>
Closes#7817 from davies/trunc and squashes the following commits:
f729d5f [Davies Liu] rollback
cb7f7832 [Davies Liu] genCode() is protected
31e52ef [Davies Liu] fix style
ed1edc7 [Davies Liu] address comments for #7805
This PR is based on #6988 , thanks to adrian-wang .
This brings two SQL functions: to_date() and trunc().
Closes#6988
Author: Daoyuan Wang <daoyuan.wang@intel.com>
Author: Davies Liu <davies@databricks.com>
Closes#7805 from davies/to_date and squashes the following commits:
2c7beba [Davies Liu] Merge branch 'master' of github.com:apache/spark into to_date
310dd55 [Daoyuan Wang] remove dup test in rebase
980b092 [Daoyuan Wang] resolve rebase conflict
a476c5a [Daoyuan Wang] address comments from davies
d44ea5f [Daoyuan Wang] function to_date, trunc
While reviewing yhuai's patch for SPARK-2205 (#7773), I noticed that Exchange's `compatible` check may be incorrectly returning `false` in many cases. As far as I know, this is not actually a problem because the `compatible`, `meetsRequirements`, and `needsAnySort` checks are serving only as short-circuit performance optimizations that are not necessary for correctness.
In order to reduce code complexity, I think that we should remove these checks and unconditionally rewrite the operator's children. This should be safe because we rewrite the tree in a single bottom-up pass.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#7807 from JoshRosen/SPARK-9489 and squashes the following commits:
9d76ce9 [Josh Rosen] [SPARK-9489] Remove compatibleWith, meetsRequirements, and needsAnySort checks from Exchange
This PR brings the support of DecimalType in UnsafeRow, for precision <= 18, it's settable, otherwise it's not settable.
Author: Davies Liu <davies@databricks.com>
Closes#7758 from davies/unsafe_decimal and squashes the following commits:
478b1ba [Davies Liu] address comments
536314c [Davies Liu] Merge branch 'master' of github.com:apache/spark into unsafe_decimal
7c2e77a [Davies Liu] fix JoinedRow
76d6fa4 [Davies Liu] fix tests
99d3151 [Davies Liu] Merge branch 'master' of github.com:apache/spark into unsafe_decimal
d49c6ae [Davies Liu] support DecimalType in UnsafeRow
Author: Reynold Xin <rxin@databricks.com>
Closes#7803 from rxin/SPARK-9458 and squashes the following commits:
5b032dc [Reynold Xin] Fix string.
b670dbb [Reynold Xin] [SPARK-9458][SPARK-9469][SQL] Code generate prefix computation in sorting & moves unsafe conversion out of TungstenSort.
This was previously committed but then reverted due to test failures (see #6769).
Author: Xiangrui Meng <meng@databricks.com>
Closes#7755 from rxin/SPARK-7157 and squashes the following commits:
fbf9044 [Xiangrui Meng] fix python test
542bd37 [Xiangrui Meng] update test
604fe6d [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into SPARK-7157
f051afd [Xiangrui Meng] use udf instead of building expression
f4e9425 [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into SPARK-7157
8fb990b [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into SPARK-7157
103beb3 [Xiangrui Meng] add Java-friendly sampleBy
991f26f [Xiangrui Meng] fix seed
4a14834 [Xiangrui Meng] move sampleBy to stat
832f7cc [Xiangrui Meng] add sampleBy to DataFrame
This PR is based on #7589 , thanks to adrian-wang
Added SQL function date_add, date_sub, add_months, month_between, also add a rule for
add/subtract of date/timestamp and interval.
Closes#7589
cc rxin
Author: Daoyuan Wang <daoyuan.wang@intel.com>
Author: Davies Liu <davies@databricks.com>
Closes#7754 from davies/date_add and squashes the following commits:
e8c633a [Davies Liu] Merge branch 'master' of github.com:apache/spark into date_add
9e8e085 [Davies Liu] Merge branch 'master' of github.com:apache/spark into date_add
6224ce4 [Davies Liu] fix conclict
bd18cd4 [Davies Liu] Merge branch 'master' of github.com:apache/spark into date_add
e47ff2c [Davies Liu] add python api, fix date functions
01943d0 [Davies Liu] Merge branch 'master' into date_add
522e91a [Daoyuan Wang] fix
e8a639a [Daoyuan Wang] fix
42df486 [Daoyuan Wang] fix style
87c4b77 [Daoyuan Wang] function add_months, months_between and some fixes
1a68e03 [Daoyuan Wang] poc of time interval calculation
c506661 [Daoyuan Wang] function date_add , date_sub
unix_timestamp(): long
Gets current Unix timestamp in seconds.
unix_timestamp(string|date): long
Converts time string in format yyyy-MM-dd HH:mm:ss to Unix timestamp (in seconds), using the default timezone and the default locale, return null if fail: unix_timestamp('2009-03-20 11:30:01') = 1237573801
unix_timestamp(string date, string pattern): long
Convert time string with given pattern (see [http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html]) to Unix time stamp (in seconds), return null if fail: unix_timestamp('2009-03-20', 'yyyy-MM-dd') = 1237532400.
from_unixtime(bigint unixtime[, string format]): string
Converts the number of seconds from unix epoch (1970-01-01 00:00:00 UTC) to a string representing the timestamp of that moment in the current system time zone in the format of "1970-01-01 00:00:00".
Jira:
https://issues.apache.org/jira/browse/SPARK-8174https://issues.apache.org/jira/browse/SPARK-8175
Author: Daoyuan Wang <daoyuan.wang@intel.com>
Closes#7644 from adrian-wang/udfunixtime and squashes the following commits:
2fe20c4 [Daoyuan Wang] util.Date
ea2ec16 [Daoyuan Wang] use util.Date for better performance
a2cf929 [Daoyuan Wang] doc return null instead of 0
f6f070a [Daoyuan Wang] address comments from davies
6a4cbb3 [Daoyuan Wang] temp
56ded53 [Daoyuan Wang] rebase and address comments
14a8b37 [Daoyuan Wang] function unix_timestamp, from_unixtime
This pull request enables Unsafe mode by default in Spark SQL. In order to do this, we had to fix a number of small issues:
**List of fixed blockers**:
- [x] Make some default buffer sizes configurable so that HiveCompatibilitySuite can run properly (#7741).
- [x] Memory leak on grouped aggregation of empty input (fixed by #7560 to fix this)
- [x] Update planner to also check whether codegen is enabled before planning unsafe operators.
- [x] Investigate failing HiveThriftBinaryServerSuite test. This turns out to be caused by a ClassCastException that occurs when Exchange tries to apply an interpreted RowOrdering to an UnsafeRow when range partitioning an RDD. This could be fixed by #7408, but a shorter-term fix is to just skip the Unsafe exchange path when RangePartitioner is used.
- [x] Memory leak exceptions masking exceptions that actually caused tasks to fail (will be fixed by #7603).
- [x] ~~https://issues.apache.org/jira/browse/SPARK-9162, to implement code generation for ScalaUDF. This is necessary for `UDFSuite` to pass. For now, I've just ignored this test in order to try to find other problems while we wait for a fix.~~ This is no longer necessary as of #7682.
- [x] Memory leaks from Limit after UnsafeExternalSort cause the memory leak detector to fail tests. This is a huge problem in the HiveCompatibilitySuite (fixed by f4ac642a4e5b2a7931c5e04e086bb10e263b1db6).
- [x] Tests in `AggregationQuerySuite` are failing due to NaN-handling issues in UnsafeRow, which were fixed in #7736.
- [x] `org.apache.spark.sql.ColumnExpressionSuite.rand` needs to be updated so that the planner check also matches `TungstenProject`.
- [x] After having lowered the buffer sizes to 4MB so that most of HiveCompatibilitySuite runs:
- [x] Wrong answer in `join_1to1` (fixed by #7680)
- [x] Wrong answer in `join_nulls` (fixed by #7680)
- [x] Managed memory OOM / leak in `lateral_view`
- [x] Seems to hang indefinitely in `partcols1`. This might be a deadlock in script transformation or a bug in error-handling code? The hang was fixed by #7710.
- [x] Error while freeing memory in `partcols1`: will be fixed by #7734.
- [x] After fixing the `partcols1` hang, it appears that a number of later tests have issues as well.
- [x] Fix thread-safety bug in codegen fallback expression evaluation (#7759).
Author: Josh Rosen <joshrosen@databricks.com>
Closes#7564 from JoshRosen/unsafe-by-default and squashes the following commits:
83c0c56 [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-by-default
f4cc859 [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-by-default
963f567 [Josh Rosen] Reduce buffer size for R tests
d6986de [Josh Rosen] Lower page size in PySpark tests
013b9da [Josh Rosen] Also match TungstenProject in checkNumProjects
5d0b2d3 [Josh Rosen] Add task completion callback to avoid leak in limit after sort
ea250da [Josh Rosen] Disable unsafe Exchange path when RangePartitioning is used
715517b [Josh Rosen] Enable Unsafe by default
JIRA: https://issues.apache.org/jira/browse/SPARK-9361
Currently, we call `aggregate.Utils.tryConvert` in many places to check it the logical.Aggregate can be run with new aggregation. But looks like `aggregate.Utils.tryConvert` will cost considerable time to run. We should only call `tryConvert` once and keep it value in `logical.Aggregate` and reuse it.
In `org.apache.spark.sql.execution.aggregate.Utils`, the codes involving with `tryConvert` should be moved to catalyst because it actually doesn't deal with execution details.
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#7677 from viirya/refactor_aggregate and squashes the following commits:
babea30 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into refactor_aggregate
9a589d7 [Liang-Chi Hsieh] Fix scala style.
0a91329 [Liang-Chi Hsieh] Refactor new aggregation code to reduce the times to call tryConvert.
JIRA: https://issues.apache.org/jira/browse/SPARK-8838
Currently all part-files are merged when merging parquet schema. However, in case there are many part-files and we can make sure that all the part-files have the same schema as their summary file. If so, we provide a configuration to disable merging part-files when merging parquet schema.
In short, we need to merge parquet schema because different summary files may contain different schema. But the part-files are confirmed to have the same schema with summary files.
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#7238 from viirya/option_partfile_merge and squashes the following commits:
71d5b5f [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into option_partfile_merge
8816f44 [Liang-Chi Hsieh] For comments.
dbc8e6b [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into option_partfile_merge
afc2fa1 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into option_partfile_merge
d4ed7e6 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into option_partfile_merge
df43027 [Liang-Chi Hsieh] Get dataStatuses' partitions based on all paths.
4eb2f00 [Liang-Chi Hsieh] Use given parameter.
ea8f6e5 [Liang-Chi Hsieh] Correct the code comments.
a57be0e [Liang-Chi Hsieh] Merge part-files if there are no summary files.
47df981 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into option_partfile_merge
4caf293 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into option_partfile_merge
0e734e0 [Liang-Chi Hsieh] Use correct API.
3b6be5b [Liang-Chi Hsieh] Fix key not found.
4bdd7e0 [Liang-Chi Hsieh] Don't read footer files if we can skip them.
8bbebcb [Liang-Chi Hsieh] Figure out how to test the config.
bbd4ce7 [Liang-Chi Hsieh] Add config to enable/disable merging part-files when merging parquet schema.
SparkEnv might not have been set in local unit tests.
Author: Reynold Xin <rxin@databricks.com>
Closes#7784 from rxin/HashedRelationSuite and squashes the following commits:
435d64b [Reynold Xin] Fix flaky HashedRelationSuite
Users can now get the file name of the partition being read in. A thread local variable is in `SQLNewHadoopRDD` and is set when the partition is computed. `SQLNewHadoopRDD` is moved to core so that the catalyst package can reach it.
This supports:
`df.select(inputFileName())`
and
`sqlContext.sql("select input_file_name() from table")`
Author: Joseph Batchik <josephbatchik@gmail.com>
Closes#7743 from JDrit/input_file_name and squashes the following commits:
abb8609 [Joseph Batchik] fixed failing test and changed the default value to be an empty string
d2f323d [Joseph Batchik] updates per review
102061f [Joseph Batchik] updates per review
75313f5 [Joseph Batchik] small fixes
c7f7b5a [Joseph Batchik] addeding input file name to Spark SQL
JIRA: https://issues.apache.org/jira/browse/SPARK-9428
Author: Yijie Shen <henry.yijieshen@gmail.com>
Closes#7748 from yjshen/string_cleanup and squashes the following commits:
e0c2b3d [Yijie Shen] update codegen in RegExpExtract and RegExpReplace
26614d2 [Yijie Shen] MathFunctionSuite
a402859 [Yijie Shen] complex_create, conditional and cast
6e4e608 [Yijie Shen] arithmetic and cast
52593c1 [Yijie Shen] null input test cases for StringExpressionSuite
Also we could create a Python UDT without having a Scala one, it's important for Python users.
cc mengxr JoshRosen
Author: Davies Liu <davies@databricks.com>
Closes#7453 from davies/class_in_main and squashes the following commits:
4dfd5e1 [Davies Liu] add tests for Python and Scala UDT
793d9b2 [Davies Liu] Merge branch 'master' of github.com:apache/spark into class_in_main
dc65f19 [Davies Liu] address comment
a9a3c40 [Davies Liu] Merge branch 'master' of github.com:apache/spark into class_in_main
a86e1fc [Davies Liu] fix serialization
ad528ba [Davies Liu] Merge branch 'master' of github.com:apache/spark into class_in_main
63f52ef [Davies Liu] fix pylint check
655b8a9 [Davies Liu] Merge branch 'master' of github.com:apache/spark into class_in_main
316a394 [Davies Liu] support Python UDT with UTF
0bcb3ef [Davies Liu] fix bug in mllib
de986d6 [Davies Liu] fix test
83d65ac [Davies Liu] fix bug in StructType
55bb86e [Davies Liu] support Python UDT in __main__ (without Scala one)
In our existing sort prefix generation code, we use expression's eval method to generate the prefix, which results in object allocation for every prefix. We can use the specialized getters available on InternalRow directly to avoid the object allocation.
I also removed the FLOAT prefix, opting for converting float directly to double.
Author: Reynold Xin <rxin@databricks.com>
Closes#7763 from rxin/sort-prefix and squashes the following commits:
5dc2f06 [Reynold Xin] [SPARK-9458] Avoid object allocation in prefix generation.
We need to make page sizes configurable so we can reduce them in unit tests and increase them in real production workloads. These sizes are now controlled by a new configuration, `spark.buffer.pageSize`. The new default is 64 megabytes.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#7741 from JoshRosen/SPARK-9411 and squashes the following commits:
a43c4db [Josh Rosen] Fix pow
2c0eefc [Josh Rosen] Fix MAXIMUM_PAGE_SIZE_BYTES comment + value
bccfb51 [Josh Rosen] Lower page size to 4MB in TestHive
ba54d4b [Josh Rosen] Make UnsafeExternalSorter's page size configurable
0045aa2 [Josh Rosen] Make UnsafeShuffle's page size configurable
bc734f0 [Josh Rosen] Rename configuration
e614858 [Josh Rosen] Makes BytesToBytesMap page size configurable
We want to introduce a new IntervalType in 1.6 that is based on only the number of microseoncds,
so interval can be compared.
Renaming the existing IntervalType to CalendarIntervalType so we can do that in the future.
Author: Reynold Xin <rxin@databricks.com>
Closes#7745 from rxin/calendarintervaltype and squashes the following commits:
99f64e8 [Reynold Xin] One more line ...
13466c8 [Reynold Xin] Fixed tests.
e20f24e [Reynold Xin] [SPARK-9430][SQL] Rename IntervalType to CalendarIntervalType.
https://issues.apache.org/jira/browse/SPARK-9422
Author: Yin Huai <yhuai@databricks.com>
Closes#7737 from yhuai/removePlaceHolder and squashes the following commits:
ec29b44 [Yin Huai] Remove placeholder attributes.
Sort-merge join is more robust in Spark since sorting can be made using the Tungsten sort operator.
Author: Reynold Xin <rxin@databricks.com>
Closes#7733 from rxin/smj and squashes the following commits:
61e4d34 [Reynold Xin] Fixed test case.
5ffd731 [Reynold Xin] Fixed JoinSuite.
a137dc0 [Reynold Xin] [SPARK-9418][SQL] Use sort-merge join as the default shuffle join.
Since catalyst package already depends on Spark core, we can move those expressions
into catalyst, and simplify function registry.
This is a followup of #7478.
Author: Reynold Xin <rxin@databricks.com>
Closes#7735 from rxin/SPARK-8003 and squashes the following commits:
2ffbdc3 [Reynold Xin] [SPARK-8003][SQL] Move expressions in sql/core package to catalyst.
This PR introduce BytesToBytesMap to UnsafeHashedRelation, use it in executor for better performance.
It serialize all the key and values from java HashMap, put them into a BytesToBytesMap while deserializing. All the values for a same key are stored continuous to have better memory locality.
This PR also address the comments for #7480 , do some clean up.
Author: Davies Liu <davies@databricks.com>
Closes#7592 from davies/unsafe_map2 and squashes the following commits:
42c578a [Davies Liu] Merge branch 'master' of github.com:apache/spark into unsafe_map2
fd09528 [Davies Liu] remove thread local cache and update docs
1c5ad8d [Davies Liu] fix test
5eb1b5a [Davies Liu] address comments in #7480
46f1f22 [Davies Liu] fix style
fc221e0 [Davies Liu] use BytesToBytesMap for broadcast join
Certain applications would benefit from being able to inspect DataFrames that are straightforwardly produced by data sources that stem from files, and find out their source data. For example, one might want to display to a user the size of the data underlying a table, or to copy or mutate it.
This PR exposes an `inputFiles` method on DataFrame which attempts to discover the source data in a best-effort manner, by inspecting HadoopFsRelations and JSONRelations.
Author: Aaron Davidson <aaron@databricks.com>
Closes#7717 from aarondav/paths and squashes the following commits:
ff67430 [Aaron Davidson] inputFiles
0acd3ad [Aaron Davidson] [SPARK-9397] DataFrame should provide an API to find source data files if applicable
The original patch didn't handle nulls correctly for next_day.
Author: Reynold Xin <rxin@databricks.com>
Closes#7718 from rxin/next_day and squashes the following commits:
616a425 [Reynold Xin] Merged DatetimeExpressionsSuite into DateFunctionsSuite.
faa78cf [Reynold Xin] Merged DatetimeFunctionsSuite into DateExpressionsSuite.
6c4fb6a [Reynold Xin] [SPARK-8196][SQL] Fix null handling & documentation for next_day.
Author: Reynold Xin <rxin@databricks.com>
Closes#7720 from rxin/struct-followup and squashes the following commits:
d9757f5 [Reynold Xin] [SPARK-9373][SQL] follow up for StructType support in Tungsten projection.
This pull request updates GenerateUnsafeProjection to support StructType. If an input struct type is backed already by an UnsafeRow, GenerateUnsafeProjection copies the bytes directly into its buffer space without any conversion. However, if the input is not an UnsafeRow, GenerateUnsafeProjection runs the code generated recursively to convert the input into an UnsafeRow and then copies it into the buffer space.
Also create a TungstenProject operator that projects data directly into UnsafeRow. Note that I'm not sure if this is the way we want to structure Unsafe+codegen operators, but we can defer that decision to follow-up pull requests.
Author: Reynold Xin <rxin@databricks.com>
Closes#7689 from rxin/tungsten-struct-type and squashes the following commits:
9162f42 [Reynold Xin] Support IntervalType in UnsafeRow's getter.
be9f377 [Reynold Xin] Fixed tests.
10c4b7c [Reynold Xin] Format generated code.
77e8d0e [Reynold Xin] Fixed NondeterministicSuite.
ac4951d [Reynold Xin] Yay.
ac203bf [Reynold Xin] More comments.
9f36216 [Reynold Xin] Updated comment.
6b781fe [Reynold Xin] Reset the change in DataFrameSuite.
525b95b [Reynold Xin] Merged with master, more documentation & test cases.
321859a [Reynold Xin] [SPARK-9373][SQL] Support StructType in Tungsten projection [WIP]
next_day, returns next certain dayofweek.
last_day, returns the last day of the month which given date belongs to.
Author: Daoyuan Wang <daoyuan.wang@intel.com>
Closes#6986 from adrian-wang/udfnlday and squashes the following commits:
ef7e3da [Daoyuan Wang] fix
02b3426 [Daoyuan Wang] address 2 comments
dc69630 [Daoyuan Wang] address comments from rxin
8846086 [Daoyuan Wang] address comments from rxin
d09bcce [Daoyuan Wang] multi fix
1a9de3d [Daoyuan Wang] function next_day and last_day
Since we have been seeing a lot of failures related to this new feature, lets put it behind a flag and turn it off by default.
Author: Michael Armbrust <michael@databricks.com>
Closes#7703 from marmbrus/optionalMetastorePruning and squashes the following commits:
6ad128c [Michael Armbrust] style
8447835 [Michael Armbrust] [SPARK-9386][SQL] Feature flag for metastore partition pruning
fd37b87 [Michael Armbrust] add config flag
Author: Wenchen Fan <cloud0fan@outlook.com>
Closes#7673 from cloud-fan/row-generic-getter-columnar and squashes the following commits:
88b1170 [Wenchen Fan] fix style
eeae712 [Wenchen Fan] Remove Internal.get generic getter call in columnar cache code
https://issues.apache.org/jira/browse/SPARK-9349
With this PR, we only expose `UserDefinedAggregateFunction` (an abstract class) and `MutableAggregationBuffer` (an interface). Other internal wrappers and helper classes are moved to `org.apache.spark.sql.execution.aggregate` and marked as `private[sql]`.
Author: Yin Huai <yhuai@databricks.com>
Closes#7687 from yhuai/UDAF-cleanup and squashes the following commits:
db36542 [Yin Huai] Add comments to UDAF examples.
ae17f66 [Yin Huai] Address comments.
9c9fa5f [Yin Huai] UDAF cleanup.
This PR is based on #6796 authored by rtreffer.
To support large decimal precisions (> 18), we do the following things in this PR:
1. Making `CatalystSchemaConverter` support large decimal precision
Decimal types with large precision are always converted to fixed-length byte array.
2. Making `CatalystRowConverter` support reading decimal values with large precision
When the precision is > 18, constructs `Decimal` values with an unscaled `BigInteger` rather than an unscaled `Long`.
3. Making `RowWriteSupport` support writing decimal values with large precision
In this PR we always write decimals as fixed-length byte array, because Parquet write path hasn't been refactored to conform Parquet format spec (see SPARK-6774 & SPARK-8848).
Two follow-up tasks should be done in future PRs:
- [ ] Writing decimals as `INT32`, `INT64` when possible while fixing SPARK-8848
- [ ] Adding compatibility tests as part of SPARK-5463
Author: Cheng Lian <lian@databricks.com>
Closes#7455 from liancheng/spark-4176 and squashes the following commits:
a543d10 [Cheng Lian] Fixes errors introduced while rebasing
9e31cdf [Cheng Lian] Supports decimals with precision > 18 for Parquet
This PR fixes a set of issues related to multi-database. A new data structure `TableIdentifier` is introduced to identify a table among multiple databases. We should stop using a single `String` (table name without database name), or `Seq[String]` (optional database name plus table name) to identify tables internally.
Author: Cheng Lian <lian@databricks.com>
Closes#7623 from liancheng/spark-8131-multi-db and squashes the following commits:
f3bcd4b [Cheng Lian] Addresses PR comments
e0eb76a [Cheng Lian] Fixes styling issues
41e2207 [Cheng Lian] Fixes multi-database support
d4d1ec2 [Cheng Lian] Adds multi-database test cases
JIRA: https://issues.apache.org/jira/browse/SPARK-9306
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#7645 from viirya/smj_unsortable and squashes the following commits:
a240707 [Liang-Chi Hsieh] Use forall instead of exists for readability.
55221fa [Liang-Chi Hsieh] Shouldn't use SortMergeJoin when joining on unsortable columns.
As Hive does, we need to list all of the registered UDF and its usage for user.
We add the annotation to describe a UDF, so we can get the literal description info while registering the UDF.
e.g.
```scala
ExpressionDescription(
usage = "_FUNC_(expr) - Returns the absolute value of the numeric value",
extended = """> SELECT _FUNC_('-1')
1""")
case class Abs(child: Expression) extends UnaryArithmetic {
...
```
Author: Cheng Hao <hao.cheng@intel.com>
Closes#7259 from chenghao-intel/desc_function and squashes the following commits:
cf29bba [Cheng Hao] fixing the code style issue
5193855 [Cheng Hao] Add more powerful parser for show functions
c645a6b [Cheng Hao] fix bug in unit test
78d40f1 [Cheng Hao] update the padding issue for usage
48ee4b3 [Cheng Hao] update as feedback
70eb4e9 [Cheng Hao] add show/describe function support
This PR removes the old Parquet support:
- Removes the old `ParquetRelation` together with related SQL configuration, plan nodes, strategies, utility classes, and test suites.
- Renames `ParquetRelation2` to `ParquetRelation`
- Renames `RowReadSupport` and `RowRecordMaterializer` to `CatalystReadSupport` and `CatalystRecordMaterializer` respectively, and moved them to separate files.
This follows naming convention used in other Parquet data models implemented in parquet-mr. It should be easier for developers who are familiar with Parquet to follow.
There's still some other code that can be cleaned up. Especially `RowWriteSupport`. But I'd like to leave this part to SPARK-8848.
Author: Cheng Lian <lian@databricks.com>
Closes#7441 from liancheng/spark-9095 and squashes the following commits:
c7b6e38 [Cheng Lian] Removes WriteToFile
2d688d6 [Cheng Lian] Renames ParquetRelation2 to ParquetRelation
ca9e1b7 [Cheng Lian] Removes old Parquet support
JIRA: https://issues.apache.org/jira/browse/SPARK-9356
Author: Yijie Shen <henry.yijieshen@gmail.com>
Closes#7671 from yjshen/deprecated_unlimit and squashes the following commits:
c707f56 [Yijie Shen] remove pattern matching in changePrecision
4a1823c [Yijie Shen] remove internal occurrence of Decimal.Unlimited
Currently UnsafeRow cannot support a generic getter. However, if the data type is known, we can support a generic getter.
Author: Reynold Xin <rxin@databricks.com>
Closes#7666 from rxin/generic-getter-with-datatype and squashes the following commits:
ee2874c [Reynold Xin] Add a default implementation for getStruct.
1e109a0 [Reynold Xin] [SPARK-9350][SQL] Introduce an InternalRow generic getter that requires a DataType.
033ee88 [Reynold Xin] Removed getAs in non test code.
Author: Reynold Xin <rxin@databricks.com>
Closes#7665 from rxin/remove-row-apply and squashes the following commits:
0b43001 [Reynold Xin] support getString in UnsafeRow.
176d633 [Reynold Xin] apply -> get.
2941324 [Reynold Xin] [SPARK-9348][SQL] Remove apply method on InternalRow.
Currently nondeterministic expression is broken without a explicit initialization phase.
Let me take `MonotonicallyIncreasingID` as an example. This expression need a mutable state to remember how many times it has been evaluated, so we use `transient var count: Long` there. By being transient, the `count` will be reset to 0 and **only** to 0 when serialize and deserialize it, as deserialize transient variable will result to default value. There is *no way* to use another initial value for `count`, until we add the explicit initialization phase.
Another use case is local execution for `LocalRelation`, there is no serialize and deserialize phase and thus we can't reset mutable states for it.
Author: Wenchen Fan <cloud0fan@outlook.com>
Closes#7535 from cloud-fan/init and squashes the following commits:
6c6f332 [Wenchen Fan] add test
ef68ff4 [Wenchen Fan] fix comments
9eac85e [Wenchen Fan] move init code to interpreted class
bb7d838 [Wenchen Fan] pulls out nondeterministic expressions into a project
b4a4fc7 [Wenchen Fan] revert a refactor
86fee36 [Wenchen Fan] add initialization phase for nondeterministic expression
This is a follow-up of #7626. It fixes `Row`/`InternalRow` conversion for data sources extending `HadoopFsRelation` with `needConversion` being `true`.
Author: Cheng Lian <lian@databricks.com>
Closes#7649 from liancheng/spark-9285-conversion-fix and squashes the following commits:
036a50c [Cheng Lian] Addresses PR comment
f6d7c6a [Cheng Lian] Fixes Row/InternalRow conversion for HadoopFsRelation
They were added to improve performance (so JIT can inline the JoinedRow calls). However, we can also just improve it by projecting output out to UnsafeRow in Tungsten variant of the operators.
Author: Reynold Xin <rxin@databricks.com>
Closes#7659 from rxin/remove-joinedrows and squashes the following commits:
7510447 [Reynold Xin] [SPARK-9336][SQL] Remove extra JoinedRows
Author: JD <jd@csh.rit.edu>
Author: Joseph Batchik <josephbatchik@gmail.com>
Closes#7606 from JDrit/expr and squashes the following commits:
ad7f607 [Joseph Batchik] fixing python linter error
9d6daea [Joseph Batchik] removed order by per @rxin's comment
707d5c6 [Joseph Batchik] Added expr to fuctions.py
79df83c [JD] added example to the docs
b89eec8 [JD] moved function up as per @rxin's comment
4960909 [JD] updated per @JoshRosen's comment
2cb329c [JD] updated per @rxin's comment
9a9ad0c [JD] removing unused import
6dc26d0 [JD] removed split
7f2222c [JD] Adding expr function as per SPARK-8668
JIRA: https://issues.apache.org/jira/browse/SPARK-9067
According to the description of the JIRA ticket, calling `reader.close()` only after the task is finished will cause memory and file open limit problem since these resources are occupied even we don't need that anymore.
This PR simply closes the reader early when we know there is no more data to read.
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#7424 from viirya/close_reader and squashes the following commits:
3ff64e5 [Liang-Chi Hsieh] For comments.
3d20267 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into close_reader
e152182 [Liang-Chi Hsieh] For comments.
5116cbe [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into close_reader
3ceb755 [Liang-Chi Hsieh] For comments.
e34d98e [Liang-Chi Hsieh] For comments.
50ed729 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into close_reader
216912f [Liang-Chi Hsieh] Fix it.
f429016 [Liang-Chi Hsieh] Release reader if we don't need it.
a305621 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into close_reader
67569da [Liang-Chi Hsieh] Close reader early if there is no more data.
I also changed InternalRow's size/length function to numFields, to make it more obvious that it is not about bytes, but the number of fields.
Author: Reynold Xin <rxin@databricks.com>
Closes#7626 from rxin/internalRow and squashes the following commits:
e124daf [Reynold Xin] Fixed test case.
805ceb7 [Reynold Xin] Commented out the failed test suite.
f8a9ca5 [Reynold Xin] Fixed more bugs. Still at least one more remaining.
76d9081 [Reynold Xin] Fixed data sources.
7807f70 [Reynold Xin] Fixed DataFrameSuite.
cb60cd2 [Reynold Xin] Code review & small bug fixes.
0a2948b [Reynold Xin] Fixed style.
3280d03 [Reynold Xin] [SPARK-9285][SQL] Remove InternalRow's inheritance from Row.
JIRA: https://issues.apache.org/jira/browse/SPARK-8756
Currently, in ParquetRelation2, footers are re-read every time refresh() is called. But we can check if it is possibly changed before we do the reading because reading all footers will be expensive when there are too many partitions. This pr fixes this by keeping some cached information to check it.
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#7154 from viirya/cached_footer_parquet_relation and squashes the following commits:
92e9347 [Liang-Chi Hsieh] Fix indentation.
ae0ec64 [Liang-Chi Hsieh] Fix wrong assignment.
c8fdfb7 [Liang-Chi Hsieh] Fix it.
a52b6d1 [Liang-Chi Hsieh] For comments.
c2a2420 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into cached_footer_parquet_relation
fa5458f [Liang-Chi Hsieh] Use Map to cache FileStatus and do merging previously loaded schema and newly loaded one.
6ae0911 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into cached_footer_parquet_relation
21bbdec [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into cached_footer_parquet_relation
12a0ed9 [Liang-Chi Hsieh] Add check of FileStatus's modification time.
186429d [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into cached_footer_parquet_relation
0ef8caf [Liang-Chi Hsieh] Keep cached information and avoid re-calculating footers.
fix some comments and code style for https://github.com/apache/spark/pull/7458
Author: Wenchen Fan <cloud0fan@outlook.com>
Closes#7619 from cloud-fan/agg-clean and squashes the following commits:
3925457 [Wenchen Fan] one more...
cc78357 [Wenchen Fan] one more cleanup
26f6a93 [Wenchen Fan] some minor cleanup for the new aggregation
Romove Decimal.Unlimited (change to support precision up to 38, to match with Hive and other databases).
In order to keep backward source compatibility, Decimal.Unlimited is still there, but change to Decimal(38, 18).
If no precision and scale is provide, it's Decimal(10, 0) as before.
Author: Davies Liu <davies@databricks.com>
Closes#7605 from davies/decimal_unlimited and squashes the following commits:
aa3f115 [Davies Liu] fix tests and style
fb0d20d [Davies Liu] address comments
bfaae35 [Davies Liu] fix style
df93657 [Davies Liu] address comments and clean up
06727fd [Davies Liu] Merge branch 'master' of github.com:apache/spark into decimal_unlimited
4c28969 [Davies Liu] fix tests
8d783cc [Davies Liu] fix tests
788631c [Davies Liu] fix double with decimal in Union/except
1779bde [Davies Liu] fix scala style
c9c7c78 [Davies Liu] remove Decimal.Unlimited
PARQUET-136 and PARQUET-173 have been fixed in parquet-mr 1.7.0. It's time to enable filter push-down by default now.
Author: Cheng Lian <lian@databricks.com>
Closes#7612 from liancheng/spark-9207 and squashes the following commits:
77e6b5e [Cheng Lian] Enables Parquet filter push-down by default
Author: David Arroyo Cazorla <darroyo@stratio.com>
Closes#7618 from darroyocazorla/master and squashes the following commits:
5f91379 [David Arroyo Cazorla] [SPARK-5447][SQL] Replace reference 'schema rdd' with DataFrame
We forgot to update doc. brkyvz
Author: Xiangrui Meng <meng@databricks.com>
Closes#7608 from mengxr/SPARK-9243 and squashes the following commits:
0ea3236 [Xiangrui Meng] null -> zero in crosstab doc
Reverts ObjectPool. As it stands, it has a few problems:
1. ObjectPool doesn't work with spilling and memory accounting.
2. I don't think in the long run the idea of an object pool is what we want to support, since it essentially goes back to unmanaged memory, and creates pressure on GC, and is hard to account for the total in memory size.
3. The ObjectPool patch removed the specialized getters for strings and binary, and as a result, actually introduced branches when reading non primitive data types.
If we do want to support arbitrary user defined types in the future, I think we can just add an object array in UnsafeRow, rather than relying on indirect memory addressing through a pool. We also need to pick execution strategies that are optimized for those, rather than keeping a lot of unserialized JVM objects in memory during aggregation.
This is probably the hardest thing I had to revert in Spark, due to recent patches that also change the same part of the code. Would be great to get a careful look.
Author: Reynold Xin <rxin@databricks.com>
Closes#7591 from rxin/revert-object-pool and squashes the following commits:
01db0bc [Reynold Xin] Scala style.
eda89fc [Reynold Xin] Fixed describe.
2967118 [Reynold Xin] Fixed accessor for JoinedRow.
e3294eb [Reynold Xin] Merge branch 'master' into revert-object-pool
657855f [Reynold Xin] Temp commit.
c20f2c8 [Reynold Xin] Style fix.
fe37079 [Reynold Xin] Revert "[SPARK-8579] [SQL] support arbitrary object in UnsafeRow"
Spark has an option called spark.localExecution.enabled; according to the docs:
> Enables Spark to run certain jobs, such as first() or take() on the driver, without sending tasks to the cluster. This can make certain jobs execute very quickly, but may require shipping a whole partition of data to the driver.
This feature ends up adding quite a bit of complexity to DAGScheduler, especially in the runLocallyWithinThread method, but as far as I know nobody uses this feature (I searched the mailing list and haven't seen any recent mentions of the configuration nor stacktraces including the runLocally method). As a step towards scheduler complexity reduction, I propose that we remove this feature and all code related to it for Spark 1.5.
This pull request simply brings #7484 up to date.
Author: Josh Rosen <joshrosen@databricks.com>
Author: Reynold Xin <rxin@databricks.com>
Closes#7585 from rxin/remove-local-exec and squashes the following commits:
84bd10e [Reynold Xin] Python fix.
1d9739a [Reynold Xin] Merge pull request #7484 from JoshRosen/remove-localexecution
eec39fa [Josh Rosen] Remove allowLocal(); deprecate user-facing uses of it.
b0835dc [Josh Rosen] Remove local execution code in DAGScheduler
8975d96 [Josh Rosen] Remove local execution tests.
ffa8c9b [Josh Rosen] Remove documentation for configuration
I've seen a few cases in the past few weeks that the compiler is throwing warnings that are caused by legitimate bugs. This patch upgrades warnings to errors, except deprecation warnings.
Note that ideally we should be able to mark deprecation warnings as errors as well. However, due to the lack of ability to suppress individual warning messages in the Scala compiler, we cannot do that (since we do need to access deprecated APIs in Hadoop).
Most of the work are done by ericl.
Author: Reynold Xin <rxin@databricks.com>
Author: Eric Liang <ekl@databricks.com>
Closes#7598 from rxin/warnings and squashes the following commits:
beb311b [Reynold Xin] Fixed tests.
542c031 [Reynold Xin] Fixed one more warning.
87c354a [Reynold Xin] Fixed all non-deprecation warnings.
78660ac [Eric Liang] first effort to fix warnings
This PR introduce unsafe version (using UnsafeRow) of HashJoin, HashOuterJoin and HashSemiJoin, including the broadcast one and shuffle one (except FullOuterJoin, which is better to be implemented using SortMergeJoin).
It use HashMap to store UnsafeRow right now, will change to use BytesToBytesMap for better performance (in another PR).
Author: Davies Liu <davies@databricks.com>
Closes#7480 from davies/unsafe_join and squashes the following commits:
6294b1e [Davies Liu] fix projection
10583f1 [Davies Liu] Merge branch 'master' of github.com:apache/spark into unsafe_join
dede020 [Davies Liu] fix test
84c9807 [Davies Liu] address comments
a05b4f6 [Davies Liu] support UnsafeRow in LeftSemiJoinBNL and BroadcastNestedLoopJoin
611d2ed [Davies Liu] Merge branch 'master' of github.com:apache/spark into unsafe_join
9481ae8 [Davies Liu] return UnsafeRow after join()
ca2b40f [Davies Liu] revert unrelated change
68f5cd9 [Davies Liu] Merge branch 'master' of github.com:apache/spark into unsafe_join
0f4380d [Davies Liu] ada a comment
69e38f5 [Davies Liu] Merge branch 'master' of github.com:apache/spark into unsafe_join
1a40f02 [Davies Liu] refactor
ab1690f [Davies Liu] address comments
60371f2 [Davies Liu] use UnsafeRow in SemiJoin
a6c0b7d [Davies Liu] Merge branch 'master' of github.com:apache/spark into unsafe_join
184b852 [Davies Liu] fix style
6acbb11 [Davies Liu] fix tests
95d0762 [Davies Liu] remove println
bea4a50 [Davies Liu] Unsafe HashJoin
This is the first PR for the aggregation improvement, which is tracked by https://issues.apache.org/jira/browse/SPARK-4366 (umbrella JIRA). This PR contains work for its subtasks, SPARK-3056, SPARK-3947, SPARK-4233, and SPARK-4367.
This PR introduces a new code path for evaluating aggregate functions. This code path is guarded by `spark.sql.useAggregate2` and by default the value of this flag is true.
This new code path contains:
* A new aggregate function interface (`AggregateFunction2`) and 7 built-int aggregate functions based on this new interface (`AVG`, `COUNT`, `FIRST`, `LAST`, `MAX`, `MIN`, `SUM`)
* A UDAF interface (`UserDefinedAggregateFunction`) based on the new code path and two example UDAFs (`MyDoubleAvg` and `MyDoubleSum`).
* A sort-based aggregate operator (`Aggregate2Sort`) for the new aggregate function interface .
* A sort-based aggregate operator (`FinalAndCompleteAggregate2Sort`) for distinct aggregations (for distinct aggregations the query plan will use `Aggregate2Sort` and `FinalAndCompleteAggregate2Sort` together).
With this change, `spark.sql.useAggregate2` is `true`, the flow of compiling an aggregation query is:
1. Our analyzer looks up functions and returns aggregate functions built based on the old aggregate function interface.
2. When our planner is compiling the physical plan, it tries try to convert all aggregate functions to the ones built based on the new interface. The planner will fallback to the old code path if any of the following two conditions is true:
* code-gen is disabled.
* there is any function that cannot be converted (right now, Hive UDAFs).
* the schema of grouping expressions contain any complex data type.
* There are multiple distinct columns.
Right now, the new code path handles a single distinct column in the query (you can have multiple aggregate functions using that distinct column). For a query having a aggregate function with DISTINCT and regular aggregate functions, the generated plan will do partial aggregations for those regular aggregate function.
Thanks chenghao-intel for his initial work on it.
Author: Yin Huai <yhuai@databricks.com>
Author: Michael Armbrust <michael@databricks.com>
Closes#7458 from yhuai/UDAF and squashes the following commits:
7865f5e [Yin Huai] Put the catalyst expression in the comment of the generated code for it.
b04d6c8 [Yin Huai] Remove unnecessary change.
f1d5901 [Yin Huai] Merge remote-tracking branch 'upstream/master' into UDAF
35b0520 [Yin Huai] Use semanticEquals to replace grouping expressions in the output of the aggregate operator.
3b43b24 [Yin Huai] bug fix.
00eb298 [Yin Huai] Make it compile.
a3ca551 [Yin Huai] Merge remote-tracking branch 'upstream/master' into UDAF
e0afca3 [Yin Huai] Gracefully fallback to old aggregation code path.
8a8ac4a [Yin Huai] Merge remote-tracking branch 'upstream/master' into UDAF
88c7d4d [Yin Huai] Enable spark.sql.useAggregate2 by default for testing purpose.
dc96fd1 [Yin Huai] Many updates:
85c9c4b [Yin Huai] newline.
43de3de [Yin Huai] Merge remote-tracking branch 'upstream/master' into UDAF
c3614d7 [Yin Huai] Handle single distinct column.
68b8ee9 [Yin Huai] Support single distinct column set. WIP
3013579 [Yin Huai] Format.
d678aee [Yin Huai] Remove AggregateExpressionSuite.scala since our built-in aggregate functions will be based on AlgebraicAggregate and we need to have another way to test it.
e243ca6 [Yin Huai] Add aggregation iterators.
a101960 [Yin Huai] Change MyJavaUDAF to MyDoubleSum.
594cdf5 [Yin Huai] Change existing AggregateExpression to AggregateExpression1 and add an AggregateExpression as the common interface for both AggregateExpression1 and AggregateExpression2.
380880f [Yin Huai] Merge remote-tracking branch 'upstream/master' into UDAF
0a827b3 [Yin Huai] Add comments and doc. Move some classes to the right places.
a19fea6 [Yin Huai] Add UDAF interface.
262d4c4 [Yin Huai] Make it compile.
b2e358e [Yin Huai] Merge remote-tracking branch 'upstream/master' into UDAF
6edb5ac [Yin Huai] Format update.
70b169c [Yin Huai] Remove groupOrdering.
4721936 [Yin Huai] Add CheckAggregateFunction to extendedCheckRules.
d821a34 [Yin Huai] Cleanup.
32aea9c [Yin Huai] Merge remote-tracking branch 'upstream/master' into UDAF
5b46d41 [Yin Huai] Bug fix.
aff9534 [Yin Huai] Make Aggregate2Sort work with both algebraic AggregateFunctions and non-algebraic AggregateFunctions.
2857b55 [Yin Huai] Merge remote-tracking branch 'upstream/master' into UDAF
4435f20 [Yin Huai] Add ConvertAggregateFunction to HiveContext's analyzer.
1b490ed [Michael Armbrust] make hive test
8cfa6a9 [Michael Armbrust] add test
1b0bb3f [Yin Huai] Do not bind references in AlgebraicAggregate and use code gen for all places.
072209f [Yin Huai] Bug fix: Handle expressions in grouping columns that are not attribute references.
f7d9e54 [Michael Armbrust] Merge remote-tracking branch 'apache/master' into UDAF
39ee975 [Yin Huai] Code cleanup: Remove unnecesary AttributeReferences.
b7720ba [Yin Huai] Add an analysis rule to convert aggregate function to the new version.
5c00f3f [Michael Armbrust] First draft of codegen
6bbc6ba [Michael Armbrust] now with correct answers\!
f7996d0 [Michael Armbrust] Add AlgebraicAggregate
dded1c5 [Yin Huai] wip
Author: Andrew Or <andrew@databricks.com>
Closes#7576 from andrewor14/clean-up-json-relation and squashes the following commits:
ea80803 [Andrew Or] Clean up duplicate code
Also make format_string the canonical form, rather than printf.
Author: Reynold Xin <rxin@databricks.com>
Closes#7579 from rxin/format_strings and squashes the following commits:
53ee54f [Reynold Xin] Fixed unit tests.
52357e1 [Reynold Xin] Add format_string alias.
b40a42a [Reynold Xin] [SPARK-9154][SQL] Rename formatString to format_string.
Jira: https://issues.apache.org/jira/browse/SPARK-9154
fixes bug of #7546
marmbrus I can't reopen the other PR, because I didn't closed it. Can you trigger Jenkins?
Author: Tarek Auel <tarek.auel@googlemail.com>
Closes#7571 from tarekauel/SPARK-9154 and squashes the following commits:
dcae272 [Tarek Auel] [SPARK-9154][SQL] build fix
1487602 [Tarek Auel] Merge remote-tracking branch 'upstream/master' into SPARK-9154
f512c5f [Tarek Auel] [SPARK-9154][SQL] build fix
a943d3e [Tarek Auel] [SPARK-9154] implicit input cast, added tests for null, support for null primitives
10b4de8 [Tarek Auel] [SPARK-9154][SQL] codegen removed fallback trait
cd8322b [Tarek Auel] [SPARK-9154][SQL] codegen string format
086caba [Tarek Auel] [SPARK-9154][SQL] codegen string format
This way, the sources package contains only public facing interfaces.
Author: Reynold Xin <rxin@databricks.com>
Closes#7565 from rxin/move-ds and squashes the following commits:
7661aff [Reynold Xin] Mima
9d5196a [Reynold Xin] Rearranged imports.
3dd7174 [Reynold Xin] [SPARK-8906][SQL] Move all internal data source classes into execution.datasources.
This patch fixes a managed memory leak in GeneratedAggregate. The leak occurs when the unsafe aggregation path is used to perform grouped aggregation on an empty input; in this case, GeneratedAggregate allocates an UnsafeFixedWidthAggregationMap that is never cleaned up because `next()` is never called on the aggregate result iterator.
This patch fixes this by short-circuiting on empty inputs.
This patch is an updated version of #6810.
Closes#6810.
Author: navis.ryu <navis@apache.org>
Author: Josh Rosen <joshrosen@databricks.com>
Closes#7560 from JoshRosen/SPARK-8357 and squashes the following commits:
3486ce4 [Josh Rosen] Some minor cleanup
c649310 [Josh Rosen] Revert SparkPlan change:
3c7db0f [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-8357
adc8239 [Josh Rosen] Back out Projection changes.
c5419b3 [navis.ryu] addressed comments
143e1ef [navis.ryu] fixed format & added test for CCE case
735972f [navis.ryu] used new conf apis
1a02a55 [navis.ryu] Rolled-back test-conf cleanup & fixed possible CCE & added more tests
51178e8 [navis.ryu] addressed comments
4d326b9 [navis.ryu] fixed test fails
15c5afc [navis.ryu] added a test as suggested by JoshRosen
d396589 [navis.ryu] added comments
1b07556 [navis.ryu] [SPARK-8357] [SQL] Memory leakage on unsafe aggregation path with empty input
JIRA:
https://issues.apache.org/jira/browse/SPARK-9081https://issues.apache.org/jira/browse/SPARK-9168
This PR target at two modifications:
1. Change `isNaN` to return `false` on `null` input
2. Make `dropna` and `fillna` to fill/drop NaN values as well
3. Implement `nanvl`
Author: Yijie Shen <henry.yijieshen@gmail.com>
Closes#7523 from yjshen/fillna_dropna and squashes the following commits:
f0a51db [Yijie Shen] make coalesce untouched and implement nanvl
1d3e35f [Yijie Shen] make Coalesce aware of NaN in order to support fillna
2760cbc [Yijie Shen] change isNaN(null) to false as well as implement dropna
Pull Request for: https://issues.apache.org/jira/browse/SPARK-8230
Primary issue resolved is to implement array/map size for Spark SQL. Code is ready for review by a committer. Chen Hao is on the JIRA ticket, but I don't know his username on github, rxin is also on JIRA ticket.
Things to review:
1. Where to put added functions namespace wise, they seem to be part of a few operations on collections which includes `sort_array` and `array_contains`. Hence the name given `collectionOperations.scala` and `_collection_functions` in python.
2. In Python code, should it be in a `1.5.0` function array or in a collections array?
3. Are there any missing methods on the `Size` case class? Looks like many of these functions have generated Java code, is that also needed in this case?
4. Something else?
Author: Pedro Rodriguez <ski.rodriguez@gmail.com>
Author: Pedro Rodriguez <prodriguez@trulia.com>
Closes#7462 from EntilZha/SPARK-8230 and squashes the following commits:
9a442ae [Pedro Rodriguez] fixed functions and sorted __all__
9aea3bb [Pedro Rodriguez] removed imports from python docs
15d4bf1 [Pedro Rodriguez] Added null test case and changed to nullSafeCodeGen
d88247c [Pedro Rodriguez] removed python code
bd5f0e4 [Pedro Rodriguez] removed duplicate function from rebase/merge
59931b4 [Pedro Rodriguez] fixed compile bug instroduced when merging
c187175 [Pedro Rodriguez] updated code to add size to __all__ directly and removed redundent pretty print
130839f [Pedro Rodriguez] fixed failing test
aa9bade [Pedro Rodriguez] fix style
e093473 [Pedro Rodriguez] updated python code with docs, switched classes/traits implemented, added (failing) expression tests
0449377 [Pedro Rodriguez] refactored code to use better abstract classes/traits and implementations
9a1a2ff [Pedro Rodriguez] added unit tests for map size
2bfbcb6 [Pedro Rodriguez] added unit test for size
20df2b4 [Pedro Rodriguez] Finished working version of size function and added it to python
b503e75 [Pedro Rodriguez] First attempt at implementing size for maps and arrays
99a6a5c [Pedro Rodriguez] fixed failing test
cac75ac [Pedro Rodriguez] fix style
933d843 [Pedro Rodriguez] updated python code with docs, switched classes/traits implemented, added (failing) expression tests
42bb7d4 [Pedro Rodriguez] refactored code to use better abstract classes/traits and implementations
f9c3b8a [Pedro Rodriguez] added unit tests for map size
2515d9f [Pedro Rodriguez] added documentation
0e60541 [Pedro Rodriguez] added unit test for size
acf9853 [Pedro Rodriguez] Finished working version of size function and added it to python
84a5d38 [Pedro Rodriguez] First attempt at implementing size for maps and arrays
Add expressions `regex_extract` & `regex_replace`
Author: Cheng Hao <hao.cheng@intel.com>
Closes#7468 from chenghao-intel/regexp and squashes the following commits:
e5ea476 [Cheng Hao] minor update for documentation
ef96fd6 [Cheng Hao] update the code gen
72cf28f [Cheng Hao] Add more log for compilation error
4e11381 [Cheng Hao] Add regexp_replace / regexp_extract support
This PR adds DataFrame reader/writer shortcut methods for ORC in both Scala and Python.
Author: Cheng Lian <lian@databricks.com>
Closes#7444 from liancheng/spark-9100 and squashes the following commits:
284d043 [Cheng Lian] Fixes PySpark test cases and addresses PR comments
e0b09fb [Cheng Lian] Adds DataFrame reader/writer shortcut methods for ORC
This patch addresses code review feedback from #7456.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#7551 from JoshRosen/unsafe-exchange-followup and squashes the following commits:
76dbdf8 [Josh Rosen] Add comments + more methods to UnsafeRowSerializer
3d7a1f2 [Josh Rosen] Add writeToStream() method to UnsafeRow
It can be ambiguous whether that is a string literal or a column name.
cc marmbrus
Author: Reynold Xin <rxin@databricks.com>
Closes#7556 from rxin/str-exprs and squashes the following commits:
92afa83 [Reynold Xin] [SPARK-9208][SQL] Remove variant of DataFrame string functions that accept column names.
This PR tries to accelerate Parquet schema discovery and `HadoopFsRelation` partition discovery. The acceleration is done by the following means:
- Turning off schema merging by default
Schema merging is not the most common case, but requires reading footers of all Parquet part-files and can be very slow.
- Avoiding `FileSystem.globStatus()` call when possible
`FileSystem.globStatus()` may issue multiple synchronous RPC calls, and can be very slow (esp. on S3). This PR adds `SparkHadoopUtil.globPathIfNecessary()`, which only issues RPC calls when the path contain glob-pattern specific character(s) (`{}[]*?\`).
This is especially useful when converting a metastore Parquet table with lots of partitions, since Spark SQL adds all partition directories as the input paths, and currently we do a `globStatus` call on each input path sequentially.
- Listing leaf files in parallel when the number of input paths exceeds a threshold
Listing leaf files is required by partition discovery. Currently it is done on driver side, and can be slow when there are lots of (nested) directories, since each `FileSystem.listStatus()` call issues an RPC. In this PR, we list leaf files in a BFS style, and resort to a Spark job once we found that the number of directories need to be listed exceed a threshold.
The threshold is controlled by `SQLConf` option `spark.sql.sources.parallelPartitionDiscovery.threshold`, which defaults to 32.
- Discovering Parquet schema in parallel
Currently, schema merging is also done on driver side, and needs to read footers of all part-files. This PR uses a Spark job to do schema merging. Together with task side metadata reading in Parquet 1.7.0, we never read any footers on driver side now.
Author: Cheng Lian <lian@databricks.com>
Closes#7396 from liancheng/accel-parquet and squashes the following commits:
5598efc [Cheng Lian] Uses ParquetInputFormat[InternalRow] instead of ParquetInputFormat[Row]
ff32cd0 [Cheng Lian] Excludes directories while listing leaf files
3c580f1 [Cheng Lian] Fixes test failure caused by making "mergeSchema" default to "false"
b1646aa [Cheng Lian] Should allow empty input paths
32e5f0d [Cheng Lian] Moves schema merging to executor side
This PR also remove the duplicated code between registerFunction and UserDefinedFunction.
cc JoshRosen
Author: Davies Liu <davies@databricks.com>
Closes#7450 from davies/fix_return_type and squashes the following commits:
e80bf9f [Davies Liu] remove debugging code
f94b1f6 [Davies Liu] fix mima
8f9c58b [Davies Liu] convert returned object from UDF into internal type