Commit graph

2266 commits

Author SHA1 Message Date
Huaxin Gao b072ff4d1d [SPARK-11474][SQL] change fetchSize to fetchsize
In DefaultDataSource.scala, it has
override def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation
The parameters is CaseInsensitiveMap.
After this line
parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
properties is set to all lower case key/value pairs and fetchSize becomes fetchsize.
However, in compute method in JDBCRDD, it has
val fetchSize = properties.getProperty("fetchSize", "0").toInt
so fetchSize value is always 0 and never gets set correctly.

Author: Huaxin Gao <huaxing@oc0558782468.ibm.com>

Closes #9473 from huaxingao/spark-11474.
2015-11-05 09:41:14 -08:00
Cheng Lian 77488fb8e5 [MINOR][SQL] A minor log line fix
`jars` in the log line is an array, so `$jars` doesn't print its content.

Author: Cheng Lian <lian@databricks.com>

Closes #9494 from liancheng/minor.log-fix.
2015-11-05 23:49:44 +08:00
Sean Owen 6f81eae24f [SPARK-11440][CORE][STREAMING][BUILD] Declare rest of @Experimental items non-experimental if they've existed since 1.2.0
Remove `Experimental` annotations in core, streaming for items that existed in 1.2.0 or before. The changes are:

* SparkContext
  * binary{Files,Records} : 1.2.0
  * submitJob : 1.0.0
* JavaSparkContext
  * binary{Files,Records} : 1.2.0
* DoubleRDDFunctions, JavaDoubleRDD
  * {mean,sum}Approx : 1.0.0
* PairRDDFunctions, JavaPairRDD
  * sampleByKeyExact : 1.2.0
  * countByKeyApprox : 1.0.0
* PairRDDFunctions
  * countApproxDistinctByKey : 1.1.0
* RDD
  * countApprox, countByValueApprox, countApproxDistinct : 1.0.0
* JavaRDDLike
  * countApprox : 1.0.0
* PythonHadoopUtil.Converter : 1.1.0
* PortableDataStream : 1.2.0 (related to binaryFiles)
* BoundedDouble : 1.0.0
* PartialResult : 1.0.0
* StreamingContext, JavaStreamingContext
  * binaryRecordsStream : 1.2.0
* HiveContext
  * analyze : 1.2.0

Author: Sean Owen <sowen@cloudera.com>

Closes #9396 from srowen/SPARK-11440.
2015-11-05 09:08:53 +00:00
Davies Liu 81498dd5c8 [SPARK-11425] [SPARK-11486] Improve hybrid aggregation
After aggregation, the dataset could be smaller than inputs, so it's better to do hash based aggregation for all inputs, then using sort based aggregation to merge them.

Author: Davies Liu <davies@databricks.com>

Closes #9383 from davies/fix_switch.
2015-11-04 21:30:21 -08:00
Zhenhua Wang a752ddad7f [SPARK-11398] [SQL] unnecessary def dialectClassName in HiveContext, and misleading dialect conf at the start of spark-sql
1. def dialectClassName in HiveContext is unnecessary.
In HiveContext, if conf.dialect == "hiveql", getSQLDialect() will return new HiveQLDialect(this);
else it will use super.getSQLDialect(). Then in super.getSQLDialect(), it calls dialectClassName, which is overriden in HiveContext and still return super.dialectClassName.
So we'll never reach the code "classOf[HiveQLDialect].getCanonicalName" of def dialectClassName in HiveContext.

2. When we start bin/spark-sql, the default context is HiveContext, and the corresponding dialect is hiveql.
However, if we type "set spark.sql.dialect;", the result is "sql", which is inconsistent with the actual dialect and is misleading. For example, we can use sql like "create table" which is only allowed in hiveql, but this dialect conf shows it's "sql".
Although this problem will not cause any execution error, it's misleading to spark sql users. Therefore I think we should fix it.
In this pr, while procesing “set spark.sql.dialect” in SetCommand, I use "conf.dialect" instead of "getConf()" for the case of key == SQLConf.DIALECT.key, so that it will return the right dialect conf.

Author: Zhenhua Wang <wangzhenhua@huawei.com>

Closes #9349 from wzhfy/dialect.
2015-11-04 17:16:00 -08:00
Reynold Xin b6e0a5ae6f [SPARK-11510][SQL] Remove SQL aggregation tests for higher order statistics
We have some aggregate function tests in both DataFrameAggregateSuite and SQLQuerySuite. The two have almost the same coverage and we should just remove the SQL one.

Author: Reynold Xin <rxin@databricks.com>

Closes #9475 from rxin/SPARK-11510.
2015-11-04 16:49:25 -08:00
Reynold Xin d19f4fda63 [SPARK-11505][SQL] Break aggregate functions into multiple files
functions.scala was getting pretty long. I broke it into multiple files.

I also added explicit data types for some public vals, and renamed aggregate function pretty names to lower case, which is more consistent with rest of the functions.

Author: Reynold Xin <rxin@databricks.com>

Closes #9471 from rxin/SPARK-11505.
2015-11-04 13:44:07 -08:00
Reynold Xin abf5e4285d [SPARK-11504][SQL] API audit for distributeBy and localSort
1. Renamed localSort -> sortWithinPartitions to avoid ambiguity in "local"
2. distributeBy -> repartition to match the existing repartition.

Author: Reynold Xin <rxin@databricks.com>

Closes #9470 from rxin/SPARK-11504.
2015-11-04 12:33:47 -08:00
Liang-Chi Hsieh de289bf279 [SPARK-10304][SQL] Following up checking valid dir structure for partition discovery
This patch follows up #8840.

Author: Liang-Chi Hsieh <viirya@appier.com>

Closes #9459 from viirya/detect_invalid_part_dir_following.
2015-11-04 10:56:32 -08:00
Reynold Xin 3bd6f5d2ae [SPARK-11490][SQL] variance should alias var_samp instead of var_pop.
stddev is an alias for stddev_samp. variance should be consistent with stddev.

Also took the chance to remove internal Stddev and Variance, and only kept StddevSamp/StddevPop and VarianceSamp/VariancePop.

Author: Reynold Xin <rxin@databricks.com>

Closes #9449 from rxin/SPARK-11490.
2015-11-04 09:34:52 -08:00
Reynold Xin cd1df66238 [SPARK-11485][SQL] Make DataFrameHolder and DatasetHolder public.
These two classes should be public, since they are used in public code.

Author: Reynold Xin <rxin@databricks.com>

Closes #9445 from rxin/SPARK-11485.
2015-11-04 09:32:30 -08:00
Wenchen Fan 2692bdb7db [SPARK-11455][SQL] fix case sensitivity of partition by
depend on `caseSensitive` to do column name equality check, instead of just `==`

Author: Wenchen Fan <wenchen@databricks.com>

Closes #9410 from cloud-fan/partition.
2015-11-03 20:25:58 -08:00
Nong e352de0db2 [SPARK-11329] [SQL] Cleanup from spark-11329 fix.
Author: Nong <nong@cloudera.com>

Closes #9442 from nongli/spark-11483.
2015-11-03 16:44:37 -08:00
Reynold Xin 5051262d4c [SPARK-11489][SQL] Only include common first order statistics in GroupedData
We added a bunch of higher order statistics such as skewness and kurtosis to GroupedData. I don't think they are common enough to justify being listed, since users can always use the normal statistics aggregate functions.

That is to say, after this change, we won't support
```scala
df.groupBy("key").kurtosis("colA", "colB")
```

However, we will still support
```scala
df.groupBy("key").agg(kurtosis(col("colA")), kurtosis(col("colB")))
```

Author: Reynold Xin <rxin@databricks.com>

Closes #9446 from rxin/SPARK-11489.
2015-11-03 16:27:56 -08:00
Wenchen Fan f6fcb4874c [SPARK-11477] [SQL] support create Dataset from RDD
Author: Wenchen Fan <wenchen@databricks.com>

Closes #9434 from cloud-fan/rdd2ds and squashes the following commits:

0892d72 [Wenchen Fan] support create Dataset from RDD
2015-11-04 00:15:50 +01:00
Davies Liu 1d04dc95c0 [SPARK-11467][SQL] add Python API for stddev/variance
Add Python API for stddev/stddev_pop/stddev_samp/variance/var_pop/var_samp/skewness/kurtosis

Author: Davies Liu <davies@databricks.com>

Closes #9424 from davies/py_var.
2015-11-03 13:33:46 -08:00
Cheng Lian ebf8b0b48d [SPARK-10978][SQL] Allow data sources to eliminate filters
This PR adds a new method `unhandledFilters` to `BaseRelation`. Data sources which implement this method properly may avoid the overhead of defensive filtering done by Spark SQL.

Author: Cheng Lian <lian@databricks.com>

Closes #9399 from liancheng/spark-10978.unhandled-filters.
2015-11-03 10:07:45 -08:00
Liang-Chi Hsieh d6035d97c9 [SPARK-10304] [SQL] Partition discovery should throw an exception if the dir structure is invalid
JIRA: https://issues.apache.org/jira/browse/SPARK-10304

This patch detects if the structure of partition directories is not valid.

The test cases are from #8547. Thanks zhzhan.

cc liancheng

Author: Liang-Chi Hsieh <viirya@appier.com>

Closes #8840 from viirya/detect_invalid_part_dir.
2015-11-03 07:41:50 -08:00
Daoyuan Wang d188a67762 [SPARK-10533][SQL] handle scientific notation in sqlParser
https://issues.apache.org/jira/browse/SPARK-10533

val df = sqlContext.createDataFrame(Seq(("a",1.0),("b",2.0),("c",3.0)))
df.filter("_2 < 2.0e1").show

Scientific notation didn't work.

Author: Daoyuan Wang <daoyuan.wang@intel.com>

Closes #9085 from adrian-wang/scinotation.
2015-11-03 22:30:23 +08:00
Michael Armbrust b86f2cab67 [SPARK-11404] [SQL] Support for groupBy using column expressions
This PR adds a new method `groupBy(cols: Column*)` to `Dataset` that allows users to group using column expressions instead of a lambda function.  Since the return type of these expressions is not known at compile time, we just set the key type as a generic `Row`.  If the user would like to work the key in a type-safe way, they can call `grouped.asKey[Type]`, which is also added in this PR.

```scala
val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
val grouped = ds.groupBy($"_1").asKey[String]
val agged = grouped.mapGroups { case (g, iter) =>
  Iterator((g, iter.map(_._2).sum))
}

agged.collect()

res0: Array(("a", 30), ("b", 3), ("c", 1))
```

Author: Michael Armbrust <michael@databricks.com>

Closes #9359 from marmbrus/columnGroupBy and squashes the following commits:

bbcb03b [Michael Armbrust] Update DatasetSuite.scala
8fd2908 [Michael Armbrust] Update DatasetSuite.scala
0b0e2f8 [Michael Armbrust] [SPARK-11404] [SQL] Support for groupBy using column expressions
2015-11-03 13:02:17 +01:00
Wenchen Fan 425ff03f5a [SPARK-11436] [SQL] rebind right encoder when join 2 datasets
When we join 2 datasets, we will combine 2 encoders into a tupled one, and use it as the encoder for the jioned dataset. Assume both of the 2 encoders are flat, their `constructExpression`s both reference to the first element of input row. However, when we combine 2 encoders, the schema of input row changed,  now the right encoder should reference to second element of input row. So we should rebind right encoder to let it know the new schema of input row before combine it.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #9391 from cloud-fan/join and squashes the following commits:

846d3ab [Wenchen Fan] rebind right encoder when join 2 datasets
2015-11-03 12:47:39 +01:00
Davies Liu 67e23b39ac [SPARK-10429] [SQL] make mutableProjection atomic
Right now, SQL's mutable projection updates every value of the mutable project after it evaluates the corresponding expression. This makes the behavior of MutableProjection confusing and complicate the implementation of common aggregate functions like stddev because developers need to be aware that when evaluating {{i+1}}th expression of a mutable projection, {{i}}th slot of the mutable row has already been updated.

This PR make the MutableProjection atomic, by generating all the results of expressions first, then copy them into mutableRow.

Had run a mircro-benchmark, there is no notable performance difference between using class members and local variables.

cc yhuai

Author: Davies Liu <davies@databricks.com>

Closes #9422 from davies/atomic_mutable and squashes the following commits:

bbc1758 [Davies Liu] support wide table
8a0ae14 [Davies Liu] fix bug
bec07da [Davies Liu] refactor
2891628 [Davies Liu] make mutableProjection atomic
2015-11-03 11:42:08 +01:00
Yin Huai d728d5c986 [SPARK-9858][SPARK-9859][SPARK-9861][SQL] Add an ExchangeCoordinator to estimate the number of post-shuffle partitions for aggregates and joins
https://issues.apache.org/jira/browse/SPARK-9858
https://issues.apache.org/jira/browse/SPARK-9859
https://issues.apache.org/jira/browse/SPARK-9861

Author: Yin Huai <yhuai@databricks.com>

Closes #9276 from yhuai/numReducer.
2015-11-03 00:12:49 -08:00
navis.ryu c34c27fe92 [SPARK-9034][SQL] Reflect field names defined in GenericUDTF
Hive GenericUDTF#initialize() defines field names in a returned schema though,
the current HiveGenericUDTF drops these names.
We might need to reflect these in a logical plan tree.

Author: navis.ryu <navis@apache.org>

Closes #8456 from navis/SPARK-9034.
2015-11-02 23:52:36 -08:00
Yin Huai 9cf56c96b7 [SPARK-11469][SQL] Allow users to define nondeterministic udfs.
This is the first task (https://issues.apache.org/jira/browse/SPARK-11469) of https://issues.apache.org/jira/browse/SPARK-11438

Author: Yin Huai <yhuai@databricks.com>

Closes #9393 from yhuai/udfNondeterministic.
2015-11-02 21:18:38 -08:00
Nong Li 9cb5c731da [SPARK-11329][SQL] Support star expansion for structs.
1. Supporting expanding structs in Projections. i.e.
  "SELECT s.*" where s is a struct type.
  This is fixed by allowing the expand function to handle structs in addition to tables.

2. Supporting expanding * inside aggregate functions of structs.
   "SELECT max(struct(col1, structCol.*))"
   This requires recursively expanding the expressions. In this case, it it the aggregate
   expression "max(...)" and we need to recursively expand its children inputs.

Author: Nong Li <nongli@gmail.com>

Closes #9343 from nongli/spark-11329.
2015-11-02 20:32:08 -08:00
Nong Li 2cef1bb0b5 [SPARK-5354][SQL] Cached tables should preserve partitioning and ord…
…ering.

For cached tables, we can just maintain the partitioning and ordering from the
source relation.

Author: Nong Li <nongli@gmail.com>

Closes #9404 from nongli/spark-5354.
2015-11-02 19:18:45 -08:00
tedyu db11ee5e56 [SPARK-11371] Make "mean" an alias for "avg" operator
From Reynold in the thread 'Exception when using some aggregate operators' (http://search-hadoop.com/m/q3RTt0xFr22nXB4/):

I don't think these are bugs. The SQL standard for average is "avg", not "mean". Similarly, a distinct count is supposed to be written as "count(distinct col)", not "countDistinct(col)".
We can, however, make "mean" an alias for "avg" to improve compatibility between DataFrame and SQL.

Author: tedyu <yuzhihong@gmail.com>

Closes #9332 from ted-yu/master.
2015-11-02 13:51:53 -08:00
Daoyuan Wang 74ba95228d [SPARK-11311][SQL] spark cannot describe temporary functions
When describe temporary function, spark would return 'Unable to find function', this is not right.

Author: Daoyuan Wang <daoyuan.wang@intel.com>

Closes #9277 from adrian-wang/functionreg.
2015-11-02 23:07:30 +08:00
huangzhaowei 767522dc4e [SPARK-10786][SQL] Take the whole statement to generate the CommandProcessor
In the now implementation of `SparkSQLCLIDriver.scala`:
`val proc: CommandProcessor = CommandProcessorFactory.get(Array(tokens(0)), hconf)`
`CommandProcessorFactory` only take the first token of the statement, and this will be hard to diff the statement `delete jar xxx` and `delete from xxx`.
So maybe it's better to take the whole statement into the `CommandProcessorFactory`.

And in [HiveCommand](https://github.com/SaintBacchus/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/processors/HiveCommand.java#L76), it already special handing these two statement.
```java
if(command.length > 1 && "from".equalsIgnoreCase(command[1])) {
  //special handling for SQL "delete from <table> where..."
  return null;
}
```

Author: huangzhaowei <carlmartinmax@gmail.com>

Closes #8895 from SaintBacchus/SPARK-10786.
2015-11-02 21:31:10 +08:00
Liang-Chi Hsieh 3e770a64a4 [SPARK-9298][SQL] Add pearson correlation aggregation function
JIRA: https://issues.apache.org/jira/browse/SPARK-9298

This patch adds pearson correlation aggregation function based on `AggregateExpression2`.

Author: Liang-Chi Hsieh <viirya@appier.com>

Closes #8587 from viirya/corr_aggregation.
2015-11-01 18:37:27 -08:00
Nong Li 046e32ed84 [SPARK-11410][SQL] Add APIs to provide functionality similar to Hive's DISTRIBUTE BY and SORT BY.
DISTRIBUTE BY allows the user to hash partition the data by specified exprs. It also allows for
optioning sorting within each resulting partition. There is no required relationship between the
exprs for partitioning and sorting (i.e. one does not need to be a prefix of the other).

This patch adds to APIs to DataFrames which can be used together to provide this functionality:
  1. distributeBy() which partitions the data frame into a specified number of partitions using the
     partitioning exprs.
  2. localSort() which sorts each partition using the provided sorting exprs.

To get the DISTRIBUTE BY functionality, the user simply does: df.distributeBy(...).localSort(...)

Author: Nong Li <nongli@gmail.com>

Closes #9364 from nongli/spark-11410.
2015-11-01 14:34:06 -08:00
Cheng Lian aa494a9c2e [SPARK-11117] [SPARK-11345] [SQL] Makes all HadoopFsRelation data sources produce UnsafeRow
This PR fixes two issues:

1.  `PhysicalRDD.outputsUnsafeRows` is always `false`

    Thus a `ConvertToUnsafe` operator is often required even if the underlying data source relation does output `UnsafeRow`.

1.  Internal/external row conversion for `HadoopFsRelation` is kinda messy

    Currently we're using `HadoopFsRelation.needConversion` and [dirty type erasure hacks][1] to indicate whether the relation outputs external row or internal row and apply external-to-internal conversion when necessary.  Basically, all builtin `HadoopFsRelation` data sources, i.e. Parquet, JSON, ORC, and Text output `InternalRow`, while typical external `HadoopFsRelation` data sources, e.g. spark-avro and spark-csv, output `Row`.

This PR adds a `private[sql]` interface method `HadoopFsRelation.buildInternalScan`, which by default invokes `HadoopFsRelation.buildScan` and converts `Row`s to `UnsafeRow`s (which are also `InternalRow`s).  All builtin `HadoopFsRelation` data sources override this method and directly output `UnsafeRow`s.  In this way, now `HadoopFsRelation` always produces `UnsafeRow`s. Thus `PhysicalRDD.outputsUnsafeRows` can be properly set by checking whether the underlying data source is a `HadoopFsRelation`.

A remaining question is that, can we assume that all non-builtin `HadoopFsRelation` data sources output external rows?  At least all well known ones do so.  However it's possible that some users implemented their own `HadoopFsRelation` data sources that leverages `InternalRow` and thus all those unstable internal data representations.  If this assumption is safe, we can deprecate `HadoopFsRelation.needConversion` and cleanup some more conversion code (like [here][2] and [here][3]).

This PR supersedes #9125.

Follow-ups:

1.  Makes JSON and ORC data sources output `UnsafeRow` directly

1.  Makes `HiveTableScan` output `UnsafeRow` directly

    This is related to 1 since ORC data source shares the same `Writable` unwrapping code with `HiveTableScan`.

[1]: https://github.com/apache/spark/blob/v1.5.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala#L353
[2]: https://github.com/apache/spark/blob/v1.5.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala#L331-L335
[3]: https://github.com/apache/spark/blob/v1.5.1/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L630-L669

Author: Cheng Lian <lian@databricks.com>

Closes #9305 from liancheng/spark-11345.unsafe-hadoop-fs-relation.
2015-10-31 21:16:09 -07:00
Dilip Biswal fc27dfbf0f [SPARK-11024][SQL] Optimize NULL in <inlist-expressions> by folding it to Literal(null)
Add a rule in optimizer to convert NULL [NOT] IN (expr1,...,expr2) to
Literal(null).

This is a follow up defect to SPARK-8654

cloud-fan Can you please take a look ?

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #9348 from dilipbiswal/spark_11024.
2015-10-31 12:55:33 -07:00
Jeff Zhang 97b3c8fb47 [SPARK-11226][SQL] Empty line in json file should be skipped
Currently the empty line in json file will be parsed into Row with all null field values. But in json, "{}" represents a json object, empty line is supposed to be skipped.

Make a trivial change for this.

Author: Jeff Zhang <zjffdu@apache.org>

Closes #9211 from zjffdu/SPARK-11226.
2015-10-31 11:10:37 +00:00
Yin Huai 3c471885dc [SPARK-11434][SPARK-11103][SQL] Fix test ": Filter applied on merged Parquet schema with new column fails"
https://issues.apache.org/jira/browse/SPARK-11434

Author: Yin Huai <yhuai@databricks.com>

Closes #9387 from yhuai/SPARK-11434.
2015-10-30 20:05:07 -07:00
Davies Liu 45029bfdea [SPARK-11423] remove MapPartitionsWithPreparationRDD
Since we do not need to preserve a page before calling compute(), MapPartitionsWithPreparationRDD is not needed anymore.

This PR basically revert #8543, #8511, #8038, #8011

Author: Davies Liu <davies@databricks.com>

Closes #9381 from davies/remove_prepare2.
2015-10-30 15:47:40 -07:00
Wenchen Fan 14d08b9908 [SPARK-11393] [SQL] CoGroupedIterator should respect the fact that GroupedIterator.hasNext is not idempotent
When we cogroup 2 `GroupedIterator`s in `CoGroupedIterator`, if the right side is smaller, we will consume right data and keep the left data unchanged. Then we call `hasNext` which will call `left.hasNext`. This will make `GroupedIterator` generate an extra group as the previous one has not been comsumed yet.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #9346 from cloud-fan/cogroup and squashes the following commits:

9be67c8 [Wenchen Fan] SPARK-11393
2015-10-30 12:17:51 +01:00
hyukjinkwon 59db9e9c38 [SPARK-11103][SQL] Filter applied on Merged Parquet shema with new column fail
When enabling mergedSchema and predicate filter, this fails since Parquet does not accept filters pushed down when the columns of the filters do not exist in the schema.
This is related with Parquet issue (https://issues.apache.org/jira/browse/PARQUET-389).

For now, it just simply disables predicate push down when using merged schema in this PR.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #9327 from HyukjinKwon/SPARK-11103.
2015-10-30 18:17:35 +08:00
Davies Liu eb59b94c45 [SPARK-11417] [SQL] no @Override in codegen
Older version of Janino (>2.7) does not support Override, we should not use that in codegen.

Author: Davies Liu <davies@databricks.com>

Closes #9372 from davies/no_override.
2015-10-30 00:36:20 -07:00
Davies Liu 56419cf11f [SPARK-10342] [SPARK-10309] [SPARK-10474] [SPARK-10929] [SQL] Cooperative memory management
This PR introduce a mechanism to call spill() on those SQL operators that support spilling (for example, BytesToBytesMap, UnsafeExternalSorter and ShuffleExternalSorter) if there is not enough memory for execution. The preserved first page is needed anymore, so removed.

Other Spillable objects in Spark core (ExternalSorter and AppendOnlyMap) are not included in this PR, but those could benefit from this (trigger others' spilling).

The PrepareRDD may be not needed anymore, could be removed in follow up PR.

The following script will fail with OOM before this PR, finished in 150 seconds with 2G heap (also works in 1.5 branch, with similar duration).

```python
sqlContext.setConf("spark.sql.shuffle.partitions", "1")
df = sqlContext.range(1<<25).selectExpr("id", "repeat(id, 2) as s")
df2 = df.select(df.id.alias('id2'), df.s.alias('s2'))
j = df.join(df2, df.id==df2.id2).groupBy(df.id).max("id", "id2")
j.explain()
print j.count()
```

For thread-safety, here what I'm got:

1) Without calling spill(), the operators should only be used by single thread, no safety problems.

2) spill() could be triggered in two cases, triggered by itself, or by other operators. we can check trigger == this in spill(), so it's still in the same thread, so safety problems.

3) if it's triggered by other operators (right now cache will not trigger spill()), we only spill the data into disk when it's in scanning stage (building is finished), so the in-memory sorter or memory pages are read-only, we only need to synchronize the iterator and change it.

4) During scanning, the iterator will only use one record in one page, we can't free this page, because the downstream is currently using it (used by UnsafeRow or other objects). In BytesToBytesMap, we just skip the current page, and dump all others into disk. In UnsafeExternalSorter, we keep the page that is used by current record (having the same baseObject), free it when loading the next record. In ShuffleExternalSorter, the spill() will not trigger during scanning.

5) In order to avoid deadlock, we didn't call acquireMemory during spill (so we reused the pointer array in InMemorySorter).

Author: Davies Liu <davies@databricks.com>

Closes #9241 from davies/force_spill.
2015-10-29 23:38:06 -07:00
Wenchen Fan 96cf87f66d [SPARK-11301] [SQL] fix case sensitivity for filter on partitioned columns
Author: Wenchen Fan <wenchen@databricks.com>

Closes #9271 from cloud-fan/filter.
2015-10-29 16:36:52 -07:00
sethah a01cbf5daa [SPARK-10641][SQL] Add Skewness and Kurtosis Support
Implementing skewness and kurtosis support based on following algorithm:
https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Higher-order_statistics

Author: sethah <seth.hendrickson16@gmail.com>

Closes #9003 from sethah/SPARK-10641.
2015-10-29 11:58:39 -07:00
Dilip Biswal 8185f038c1 [SPARK-11188][SQL] Elide stacktraces in bin/spark-sql for AnalysisExceptions
Only print the error message to the console for Analysis Exceptions in sql-shell.

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #9194 from dilipbiswal/spark-11188.
2015-10-29 18:29:50 +01:00
xin Wu f7a51deeba [SPARK-11246] [SQL] Table cache for Parquet broken in 1.5
The root cause is that when spark.sql.hive.convertMetastoreParquet=true by default, the cached InMemoryRelation of the ParquetRelation can not be looked up from the cachedData of CacheManager because the key comparison fails even though it is the same LogicalPlan representing the Subquery that wraps the ParquetRelation.
The solution in this PR is overriding the LogicalPlan.sameResult function in Subquery case class to eliminate subquery node first before directly comparing the child (ParquetRelation), which will find the key  to the cached InMemoryRelation.

Author: xin Wu <xinwu@us.ibm.com>

Closes #9326 from xwu0226/spark-11246-commit.
2015-10-29 07:42:46 -07:00
Wenchen Fan f79ebf2a9e [SPARK-11370] [SQL] fix a bug in GroupedIterator and create unit test for it
Before this PR, user has to consume the iterator of one group before process next group, or we will get into infinite loops.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #9330 from cloud-fan/group.
2015-10-29 11:49:45 +01:00
Wenchen Fan 87f28fc240 [SPARK-11379][SQL] ExpressionEncoder can't handle top level primitive type correctly
For inner primitive type(e.g. inside `Product`), we use `schemaFor` to get the catalyst type for it, https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala#L403.

However, for top level primitive type, we use `dataTypeFor`, which is wrong.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #9337 from cloud-fan/encoder.
2015-10-29 11:17:03 +01:00
Wenchen Fan 0cb7662d86 [SPARK-11351] [SQL] support hive interval literal
Author: Wenchen Fan <wenchen@databricks.com>

Closes #9304 from cloud-fan/interval.
2015-10-28 21:35:57 -07:00
Cheng Lian e5b89978ed [SPARK-11376][SQL] Removes duplicated mutableRow field
This PR fixes a mistake in the code generated by `GenerateColumnAccessor`. Interestingly, although the code is illegal in Java (the class has two fields with the same name), Janino accepts it happily and accidentally works properly.

Author: Cheng Lian <lian@databricks.com>

Closes #9335 from liancheng/spark-11376.fix-generated-code.
2015-10-29 11:34:54 +08:00
Liang-Chi Hsieh 20dfd46743 [SPARK-11363] [SQL] LeftSemiJoin should be LeftSemi in SparkStrategies
JIRA: https://issues.apache.org/jira/browse/SPARK-11363

In SparkStrategies some places use LeftSemiJoin. It should be LeftSemi.

cc chenghao-intel liancheng

Author: Liang-Chi Hsieh <viirya@appier.com>

Closes #9318 from viirya/no-left-semi-join.
2015-10-28 15:57:01 -07:00