Commit graph

207 commits

Author SHA1 Message Date
Patrick Wendell dab37966b0 Revert "[SPARK-1470][SPARK-1842] Use the scala-logging wrapper instead of the directly sfl4j api"
This reverts commit adc8303294.
2014-08-01 23:55:30 -07:00
GuoQiang Li adc8303294 [SPARK-1470][SPARK-1842] Use the scala-logging wrapper instead of the directly sfl4j api
Author: GuoQiang Li <witgo@qq.com>

Closes #1369 from witgo/SPARK-1470_new and squashes the following commits:

66a1641 [GuoQiang Li] IncompatibleResultTypeProblem
73a89ba [GuoQiang Li] Use the scala-logging wrapper instead of the directly sfl4j api.
2014-08-01 23:55:11 -07:00
Yin Huai 3822f33f3c [SPARK-2212][SQL] Hash Outer Join (follow-up bug fix).
We need to carefully set the ouputPartitioning of the HashOuterJoin Operator. Otherwise, we may not correctly handle nulls.

Author: Yin Huai <huai@cse.ohio-state.edu>

Closes #1721 from yhuai/SPARK-2212-BugFix and squashes the following commits:

ed5eef7 [Yin Huai] Correctly choosing outputPartitioning for the HashOuterJoin operator.
2014-08-01 18:52:01 -07:00
Davies Liu 880eabec37 [SPARK-2010] [PySpark] [SQL] support nested structure in SchemaRDD
Convert Row in JavaSchemaRDD into Array[Any] and unpickle them as tuple in Python, then convert them into namedtuple, so use can access fields just like attributes.

This will let nested structure can be accessed as object, also it will reduce the size of serialized data and better performance.

root
 |-- field1: integer (nullable = true)
 |-- field2: string (nullable = true)
 |-- field3: struct (nullable = true)
 |    |-- field4: integer (nullable = true)
 |    |-- field5: array (nullable = true)
 |    |    |-- element: integer (containsNull = false)
 |-- field6: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- field7: string (nullable = true)

Then we can access them by row.field3.field5[0]  or row.field6[5].field7

It also will infer the schema in Python, convert Row/dict/namedtuple/objects into tuple before serialization, then call applySchema in JVM. During inferSchema(), the top level of dict in row will be StructType, but any nested dictionary will be MapType.

You can use pyspark.sql.Row to convert unnamed structure into Row object, make the RDD can be inferable. Such as:

ctx.inferSchema(rdd.map(lambda x: Row(a=x[0], b=x[1]))

Or you could use Row to create a class just like namedtuple, for example:

Person = Row("name", "age")
ctx.inferSchema(rdd.map(lambda x: Person(*x)))

Also, you can call applySchema to apply an schema to a RDD of tuple/list and turn it into a SchemaRDD. The `schema` should be StructType, see the API docs for details.

schema = StructType([StructField("name, StringType, True),
                                    StructType("age", IntegerType, True)])
ctx.applySchema(rdd, schema)

PS: In order to use namedtuple to inferSchema, you should make namedtuple picklable.

Author: Davies Liu <davies.liu@gmail.com>

Closes #1598 from davies/nested and squashes the following commits:

f1d15b6 [Davies Liu] verify schema with the first few rows
8852aaf [Davies Liu] check type of schema
abe9e6e [Davies Liu] address comments
61b2292 [Davies Liu] add @deprecated to pythonToJavaMap
1e5b801 [Davies Liu] improve cache of classes
51aa135 [Davies Liu] use Row to infer schema
e9c0d5c [Davies Liu] remove string typed schema
353a3f2 [Davies Liu] fix code style
63de8f8 [Davies Liu] fix typo
c79ca67 [Davies Liu] fix serialization of nested data
6b258b5 [Davies Liu] fix pep8
9d8447c [Davies Liu] apply schema provided by string of names
f5df97f [Davies Liu] refactor, address comments
9d9af55 [Davies Liu] use arrry to applySchema and infer schema in Python
84679b3 [Davies Liu] Merge branch 'master' of github.com:apache/spark into nested
0eaaf56 [Davies Liu] fix doc tests
b3559b4 [Davies Liu] use generated Row instead of namedtuple
c4ddc30 [Davies Liu] fix conflict between name of fields and variables
7f6f251 [Davies Liu] address all comments
d69d397 [Davies Liu] refactor
2cc2d45 [Davies Liu] refactor
182fb46 [Davies Liu] refactor
bc6e9e1 [Davies Liu] switch to new Schema API
547bf3e [Davies Liu] Merge branch 'master' into nested
a435b5a [Davies Liu] add docs and code refactor
2c8debc [Davies Liu] Merge branch 'master' into nested
644665a [Davies Liu] use tuple and namedtuple for schemardd
2014-08-01 18:47:41 -07:00
chutium 580c7011ca [SPARK-2729] [SQL] Forgot to match Timestamp type in ColumnBuilder
just a match forgot, found after SPARK-2710 , TimestampType can be used by a SchemaRDD generated from JDBC ResultSet

Author: chutium <teng.qiu@gmail.com>

Closes #1636 from chutium/SPARK-2729 and squashes the following commits:

71af77a [chutium] [SPARK-2729] [SQL] added Timestamp in NullableColumnAccessorSuite
39cf9f8 [chutium] [SPARK-2729] add Timestamp Type into ColumnBuilder TestSuite, ref. #1636
ab6ff97 [chutium] [SPARK-2729] Forgot to match Timestamp type in ColumnBuilder
2014-08-01 11:31:44 -07:00
Cheng Hao 4415722e91 [SQL][SPARK-2212]Hash Outer Join
This patch is to support the hash based outer join. Currently, outer join for big relations are resort to `BoradcastNestedLoopJoin`, which is super slow. This PR will create 2 hash tables for both relations in the same partition, which greatly reduce the table scans.

Here is the testing code that I used:
```
package org.apache.spark.sql.hive

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql._

case class Record(key: String, value: String)

object JoinTablePrepare extends App {
  import TestHive2._

  val rdd = sparkContext.parallelize((1 to 3000000).map(i => Record(s"${i % 828193}", s"val_$i")))

  runSqlHive("SHOW TABLES")
  runSqlHive("DROP TABLE if exists a")
  runSqlHive("DROP TABLE if exists b")
  runSqlHive("DROP TABLE if exists result")
  rdd.registerAsTable("records")

  runSqlHive("""CREATE TABLE a (key STRING, value STRING)
                 | ROW FORMAT SERDE
                 | 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe'
                 | STORED AS RCFILE
               """.stripMargin)
  runSqlHive("""CREATE TABLE b (key STRING, value STRING)
                 | ROW FORMAT SERDE
                 | 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe'
                 | STORED AS RCFILE
               """.stripMargin)
  runSqlHive("""CREATE TABLE result (key STRING, value STRING)
                 | ROW FORMAT SERDE
                 | 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe'
                 | STORED AS RCFILE
               """.stripMargin)

  hql(s"""from records
             | insert into table a
             | select key, value
           """.stripMargin)
  hql(s"""from records
             | insert into table b select key + 100000, value
           """.stripMargin)
}

object JoinTablePerformanceTest extends App {
  import TestHive2._

  hql("SHOW TABLES")
  hql("set spark.sql.shuffle.partitions=20")

  val leftOuterJoin = "insert overwrite table result select a.key, b.value from a left outer join b on a.key=b.key"
  val rightOuterJoin = "insert overwrite table result select a.key, b.value from a right outer join b on a.key=b.key"
  val fullOuterJoin = "insert overwrite table result select a.key, b.value from a full outer join b on a.key=b.key"

  val results = ("LeftOuterJoin", benchmark(leftOuterJoin)) :: ("LeftOuterJoin", benchmark(leftOuterJoin)) ::
                ("RightOuterJoin", benchmark(rightOuterJoin)) :: ("RightOuterJoin", benchmark(rightOuterJoin)) ::
                ("FullOuterJoin", benchmark(fullOuterJoin)) :: ("FullOuterJoin", benchmark(fullOuterJoin)) :: Nil
  val explains = hql(s"explain $leftOuterJoin").collect ++ hql(s"explain $rightOuterJoin").collect ++ hql(s"explain $fullOuterJoin").collect
  println(explains.mkString(",\n"))
  results.foreach { case (prompt, result) => {
      println(s"$prompt: took ${result._1} ms (${result._2} records)")
    }
  }

  def benchmark(cmd: String) = {
    val begin = System.currentTimeMillis()
    val result = hql(cmd)
    val end = System.currentTimeMillis()
    val count = hql("select count(1) from result").collect.mkString("")
    ((end - begin), count)
  }
}
```
And the result as shown below:
```
[Physical execution plan:],
[InsertIntoHiveTable (MetastoreRelation default, result, None), Map(), true],
[ Project [key#95,value#98]],
[  HashOuterJoin [key#95], [key#97], LeftOuter, None],
[   Exchange (HashPartitioning [key#95], 20)],
[    HiveTableScan [key#95], (MetastoreRelation default, a, None), None],
[   Exchange (HashPartitioning [key#97], 20)],
[    HiveTableScan [key#97,value#98], (MetastoreRelation default, b, None), None],
[Physical execution plan:],
[InsertIntoHiveTable (MetastoreRelation default, result, None), Map(), true],
[ Project [key#102,value#105]],
[  HashOuterJoin [key#102], [key#104], RightOuter, None],
[   Exchange (HashPartitioning [key#102], 20)],
[    HiveTableScan [key#102], (MetastoreRelation default, a, None), None],
[   Exchange (HashPartitioning [key#104], 20)],
[    HiveTableScan [key#104,value#105], (MetastoreRelation default, b, None), None],
[Physical execution plan:],
[InsertIntoHiveTable (MetastoreRelation default, result, None), Map(), true],
[ Project [key#109,value#112]],
[  HashOuterJoin [key#109], [key#111], FullOuter, None],
[   Exchange (HashPartitioning [key#109], 20)],
[    HiveTableScan [key#109], (MetastoreRelation default, a, None), None],
[   Exchange (HashPartitioning [key#111], 20)],
[    HiveTableScan [key#111,value#112], (MetastoreRelation default, b, None), None]
LeftOuterJoin: took 16072 ms ([3000000] records)
LeftOuterJoin: took 14394 ms ([3000000] records)
RightOuterJoin: took 14802 ms ([3000000] records)
RightOuterJoin: took 14747 ms ([3000000] records)
FullOuterJoin: took 17715 ms ([6000000] records)
FullOuterJoin: took 17629 ms ([6000000] records)
```

Without this PR, the benchmark will run seems never end.

Author: Cheng Hao <hao.cheng@intel.com>

Closes #1147 from chenghao-intel/hash_based_outer_join and squashes the following commits:

65c599e [Cheng Hao] Fix issues with the community comments
72b1394 [Cheng Hao] Fix bug of stale value in joinedRow
55baef7 [Cheng Hao] Add HashOuterJoin
2014-08-01 11:27:12 -07:00
Yin Huai c41fdf04f4 [SPARK-2179][SQL] A minor refactoring Java data type APIs (2179 follow-up).
It is a follow-up PR of SPARK-2179 (https://issues.apache.org/jira/browse/SPARK-2179). It makes package names of data type APIs more consistent across languages (Scala: `org.apache.spark.sql`, Java: `org.apache.spark.sql.api.java`, Python: `pyspark.sql`).

Author: Yin Huai <huai@cse.ohio-state.edu>

Closes #1712 from yhuai/javaDataType and squashes the following commits:

62eb705 [Yin Huai] Move package-info.
add4bcb [Yin Huai] Make the package names of data type classes consistent across languages by moving all Java data type classes to package sql.api.java.
2014-08-01 11:14:53 -07:00
Yin Huai 9632719c9e [SPARK-2779] [SQL] asInstanceOf[Map[...]] should use scala.collection.Map instead of scala.collection.immutable.Map
Since we let users create Rows. It makes sense to accept mutable Maps as values of MapType columns.

JIRA: https://issues.apache.org/jira/browse/SPARK-2779

Author: Yin Huai <huai@cse.ohio-state.edu>

Closes #1705 from yhuai/SPARK-2779 and squashes the following commits:

00d72fd [Yin Huai] Use scala.collection.Map.
2014-07-31 21:02:11 -07:00
Zongheng Yang 8f51491ea7 [SPARK-2531 & SPARK-2436] [SQL] Optimize the BuildSide when planning BroadcastNestedLoopJoin.
This PR resolves the following two tickets:

- [SPARK-2531](https://issues.apache.org/jira/browse/SPARK-2531): BNLJ currently assumes the build side is the right relation. This patch refactors some of its logic to take into account a BuildSide properly.
- [SPARK-2436](https://issues.apache.org/jira/browse/SPARK-2436): building on top of the above, we simply use the physical size statistics (if available) of both relations, and make the smaller relation the build side in the planner.

Author: Zongheng Yang <zongheng.y@gmail.com>

Closes #1448 from concretevitamin/bnlj-buildSide and squashes the following commits:

1780351 [Zongheng Yang] Use size estimation to decide optimal build side of BNLJ.
68e6c5b [Zongheng Yang] Consolidate two adjacent pattern matchings.
96d312a [Zongheng Yang] Use a while loop instead of collection methods chaining.
4bc525e [Zongheng Yang] Make BroadcastNestedLoopJoin take a BuildSide.
2014-07-31 19:32:16 -07:00
Michael Armbrust 3072b96026 [SPARK-2743][SQL] Resolve original attributes in ParquetTableScan
Author: Michael Armbrust <michael@databricks.com>

Closes #1647 from marmbrus/parquetCase and squashes the following commits:

a1799b7 [Michael Armbrust] move comment
2a2a68b [Michael Armbrust] Merge remote-tracking branch 'apache/master' into parquetCase
bb35d5b [Michael Armbrust] Fix test case that produced an invalid plan.
e6870bf [Michael Armbrust] Better error message.
539a2e1 [Michael Armbrust] Resolve original attributes in ParquetTableScan
2014-07-31 11:15:25 -07:00
Matei Zaharia e966284409 SPARK-2045 Sort-based shuffle
This adds a new ShuffleManager based on sorting, as described in https://issues.apache.org/jira/browse/SPARK-2045. The bulk of the code is in an ExternalSorter class that is similar to ExternalAppendOnlyMap, but sorts key-value pairs by partition ID and can be used to create a single sorted file with a map task's output. (Longer-term I think this can take on the remaining functionality in ExternalAppendOnlyMap and replace it so we don't have code duplication.)

The main TODOs still left are:
- [x] enabling ExternalSorter to merge across spilled files
  - [x] with an Ordering
  - [x] without an Ordering, using the keys' hash codes
- [x] adding more tests (e.g. a version of our shuffle suite that runs on this)
- [x] rebasing on top of the size-tracking refactoring in #1165 when that is merged
- [x] disabling spilling if spark.shuffle.spill is set to false

Despite this though, this seems to work pretty well (running successfully in cases where the hash shuffle would OOM, such as 1000 reduce tasks on executors with only 1G memory), and it seems to be comparable in speed or faster than hash-based shuffle (it will create much fewer files for the OS to keep track of). So I'm posting it to get some early feedback.

After these TODOs are done, I'd also like to enable ExternalSorter to sort data within each partition by a key as well, which will allow us to use it to implement external spilling in reduce tasks in `sortByKey`.

Author: Matei Zaharia <matei@databricks.com>

Closes #1499 from mateiz/sort-based-shuffle and squashes the following commits:

bd841f9 [Matei Zaharia] Various review comments
d1c137fd [Matei Zaharia] Various review comments
a611159 [Matei Zaharia] Compile fixes due to rebase
62c56c8 [Matei Zaharia] Fix ShuffledRDD sometimes not returning Tuple2s.
f617432 [Matei Zaharia] Fix a failing test (seems to be due to change in SizeTracker logic)
9464d5f [Matei Zaharia] Simplify code and fix conflicts after latest rebase
0174149 [Matei Zaharia] Add cleanup behavior and cleanup tests for sort-based shuffle
eb4ee0d [Matei Zaharia] Remove customizable element type in ShuffledRDD
fa2e8db [Matei Zaharia] Allow nextBatchStream to be called after we're done looking at all streams
a34b352 [Matei Zaharia] Fix tracking of indices within a partition in SpillReader, and add test
03e1006 [Matei Zaharia] Add a SortShuffleSuite that runs ShuffleSuite with sort-based shuffle
3c7ff1f [Matei Zaharia] Obey the spark.shuffle.spill setting in ExternalSorter
ad65fbd [Matei Zaharia] Rebase on top of Aaron's Sorter change, and use Sorter in our buffer
44d2a93 [Matei Zaharia] Use estimateSize instead of atGrowThreshold to test collection sizes
5686f71 [Matei Zaharia] Optimize merging phase for in-memory only data:
5461cbb [Matei Zaharia] Review comments and more tests (e.g. tests with 1 element per partition)
e9ad356 [Matei Zaharia] Update ContextCleanerSuite to make sure shuffle cleanup tests use hash shuffle (since they were written for it)
c72362a [Matei Zaharia] Added bug fix and test for when iterators are empty
de1fb40 [Matei Zaharia] Make trait SizeTrackingCollection private[spark]
4988d16 [Matei Zaharia] tweak
c1b7572 [Matei Zaharia] Small optimization
ba7db7f [Matei Zaharia] Handle null keys in hash-based comparator, and add tests for collisions
ef4e397 [Matei Zaharia] Support for partial aggregation even without an Ordering
4b7a5ce [Matei Zaharia] More tests, and ability to sort data if a total ordering is given
e1f84be [Matei Zaharia] Fix disk block manager test
5a40a1c [Matei Zaharia] More tests
614f1b4 [Matei Zaharia] Add spill metrics to map tasks
cc52caf [Matei Zaharia] Add more error handling and tests for error cases
bbf359d [Matei Zaharia] More work
3a56341 [Matei Zaharia] More partial work towards sort-based shuffle
7a0895d [Matei Zaharia] Some more partial work towards sort-based shuffle
b615476 [Matei Zaharia] Scaffolding for sort-based shuffle
2014-07-30 18:07:59 -07:00
Yin Huai 7003c163db [SPARK-2179][SQL] Public API for DataTypes and Schema
The current PR contains the following changes:
* Expose `DataType`s in the sql package (internal details are private to sql).
* Users can create Rows.
* Introduce `applySchema` to create a `SchemaRDD` by applying a `schema: StructType` to an `RDD[Row]`.
* Add a function `simpleString` to every `DataType`. Also, the schema represented by a `StructType` can be visualized by `printSchema`.
* `ScalaReflection.typeOfObject` provides a way to infer the Catalyst data type based on an object. Also, we can compose `typeOfObject` with some custom logics to form a new function to infer the data type (for different use cases).
* `JsonRDD` has been refactored to use changes introduced by this PR.
* Add a field `containsNull` to `ArrayType`. So, we can explicitly mark if an `ArrayType` can contain null values. The default value of `containsNull` is `false`.

New APIs are introduced in the sql package object and SQLContext. You can find the scaladoc at
[sql package object](http://yhuai.github.io/site/api/scala/index.html#org.apache.spark.sql.package) and [SQLContext](http://yhuai.github.io/site/api/scala/index.html#org.apache.spark.sql.SQLContext).

An example of using `applySchema` is shown below.
```scala
import org.apache.spark.sql._
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val schema =
  StructType(
    StructField("name", StringType, false) ::
    StructField("age", IntegerType, true) :: Nil)

val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Row(p(0), p(1).trim.toInt))
val peopleSchemaRDD = sqlContext. applySchema(people, schema)
peopleSchemaRDD.printSchema
// root
// |-- name: string (nullable = false)
// |-- age: integer (nullable = true)

peopleSchemaRDD.registerAsTable("people")
sqlContext.sql("select name from people").collect.foreach(println)
```

I will add new contents to the SQL programming guide later.

JIRA: https://issues.apache.org/jira/browse/SPARK-2179

Author: Yin Huai <huai@cse.ohio-state.edu>

Closes #1346 from yhuai/dataTypeAndSchema and squashes the following commits:

1d45977 [Yin Huai] Clean up.
a6e08b4 [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
c712fbf [Yin Huai] Converts types of values based on defined schema.
4ceeb66 [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
e5f8df5 [Yin Huai] Scaladoc.
122d1e7 [Yin Huai] Address comments.
03bfd95 [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
2476ed0 [Yin Huai] Minor updates.
ab71f21 [Yin Huai] Format.
fc2bed1 [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
bd40a33 [Yin Huai] Address comments.
991f860 [Yin Huai] Move "asJavaDataType" and "asScalaDataType" to DataTypeConversions.scala.
1cb35fe [Yin Huai] Add "valueContainsNull" to MapType.
3edb3ae [Yin Huai] Python doc.
692c0b9 [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
1d93395 [Yin Huai] Python APIs.
246da96 [Yin Huai] Add java data type APIs to javadoc index.
1db9531 [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
d48fc7b [Yin Huai] Minor updates.
33c4fec [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
b9f3071 [Yin Huai] Java API for applySchema.
1c9f33c [Yin Huai] Java APIs for DataTypes and Row.
624765c [Yin Huai] Tests for applySchema.
aa92e84 [Yin Huai] Update data type tests.
8da1a17 [Yin Huai] Add Row.fromSeq.
9c99bc0 [Yin Huai] Several minor updates.
1d9c13a [Yin Huai] Update applySchema API.
85e9b51 [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
e495e4e [Yin Huai] More comments.
42d47a3 [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
c3f4a02 [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
2e58dbd [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
b8b7db4 [Yin Huai] 1. Move sql package object and package-info to sql-core. 2. Minor updates on APIs. 3. Update scala doc.
68525a2 [Yin Huai] Update JSON unit test.
3209108 [Yin Huai] Add unit tests.
dcaf22f [Yin Huai] Add a field containsNull to ArrayType to indicate if an array can contain null values or not. If an ArrayType is constructed by "ArrayType(elementType)" (the existing constructor), the value of containsNull is false.
9168b83 [Yin Huai] Update comments.
fc649d7 [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
eca7d04 [Yin Huai] Add two apply methods which will be used to extract StructField(s) from a StructType.
949d6bb [Yin Huai] When creating a SchemaRDD for a JSON dataset, users can apply an existing schema.
7a6a7e5 [Yin Huai] Fix bug introduced by the change made on SQLContext.inferSchema.
43a45e1 [Yin Huai] Remove sql.util.package introduced in a previous commit.
0266761 [Yin Huai] Format
03eec4c [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
90460ac [Yin Huai] Infer the Catalyst data type from an object and cast a data value to the expected type.
3fa0df5 [Yin Huai] Provide easier ways to construct a StructType.
16be3e5 [Yin Huai] This commit contains three changes: * Expose `DataType`s in the sql package (internal details are private to sql). * Introduce `createSchemaRDD` to create a `SchemaRDD` from an `RDD` with a provided schema (represented by a `StructType`) and a provided function to construct `Row`, * Add a function `simpleString` to every `DataType`. Also, the schema represented by a `StructType` can be visualized by `printSchema`.
2014-07-30 00:15:31 -07:00
Michael Armbrust 077f633b47 [SQL] Handle null values in debug()
Author: Michael Armbrust <michael@databricks.com>

Closes #1646 from marmbrus/nullDebug and squashes the following commits:

49050a8 [Michael Armbrust] Handle null values in debug()
2014-07-29 22:42:54 -07:00
Michael Armbrust 84467468d4 [SPARK-2054][SQL] Code Generation for Expression Evaluation
Adds a new method for evaluating expressions using code that is generated though Scala reflection.  This functionality is configured by the SQLConf option `spark.sql.codegen` and is currently turned off by default.

Evaluation can be done in several specialized ways:
 - *Projection* - Given an input row, produce a new row from a set of expressions that define each column in terms of the input row.  This can either produce a new Row object or perform the projection in-place on an existing Row (MutableProjection).
 - *Ordering* - Compares two rows based on a list of `SortOrder` expressions
 - *Condition* - Returns `true` or `false` given an input row.

For each of the above operations there is both a Generated and Interpreted version.  When generation for a given expression type is undefined, the code generator falls back on calling the `eval` function of the expression class.  Even without custom code, there is still a potential speed up, as loops are unrolled and code can still be inlined by JIT.

This PR also contains a new type of Aggregation operator, `GeneratedAggregate`, that performs aggregation by using generated `Projection` code.  Currently the required expression rewriting only works for simple aggregations like `SUM` and `COUNT`.  This functionality will be extended in a future PR.

This PR also performs several clean ups that simplified the implementation:
 - The notion of `Binding` all expressions in a tree automatically before query execution has been removed.  Instead it is the responsibly of an operator to provide the input schema when creating one of the specialized evaluators defined above.  In cases when the standard eval method is going to be called, binding can still be done manually using `BindReferences`.  There are a few reasons for this change:  First, there were many operators where it just didn't work before.  For example, operators with more than one child, and operators like aggregation that do significant rewriting of the expression. Second, the semantics of equality with `BoundReferences` are broken.  Specifically, we have had a few bugs where partitioning breaks because of the binding.
 - A copy of the current `SQLContext` is automatically propagated to all `SparkPlan` nodes by the query planner.  Before this was done ad-hoc for the nodes that needed this.  However, this required a lot of boilerplate as one had to always remember to make it `transient` and also had to modify the `otherCopyArgs`.

Author: Michael Armbrust <michael@databricks.com>

Closes #993 from marmbrus/newCodeGen and squashes the following commits:

96ef82c [Michael Armbrust] Merge remote-tracking branch 'apache/master' into newCodeGen
f34122d [Michael Armbrust] Merge remote-tracking branch 'apache/master' into newCodeGen
67b1c48 [Michael Armbrust] Use conf variable in SQLConf object
4bdc42c [Michael Armbrust] Merge remote-tracking branch 'origin/master' into newCodeGen
41a40c9 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into newCodeGen
de22aac [Michael Armbrust] Merge remote-tracking branch 'origin/master' into newCodeGen
fed3634 [Michael Armbrust] Inspectors are not serializable.
ef8d42b [Michael Armbrust] comments
533fdfd [Michael Armbrust] More logging of expression rewriting for GeneratedAggregate.
3cd773e [Michael Armbrust] Allow codegen for Generate.
64b2ee1 [Michael Armbrust] Implement copy
3587460 [Michael Armbrust] Drop unused string builder function.
9cce346 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into newCodeGen
1a61293 [Michael Armbrust] Address review comments.
0672e8a [Michael Armbrust] Address comments.
1ec2d6e [Michael Armbrust] Address comments
033abc6 [Michael Armbrust] off by default
4771fab [Michael Armbrust] Docs, more test coverage.
d30fee2 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into newCodeGen
d2ad5c5 [Michael Armbrust] Refactor putting SQLContext into SparkPlan. Fix ordering, other test cases.
be2cd6b [Michael Armbrust] WIP: Remove old method for reference binding, more work on configuration.
bc88ecd [Michael Armbrust] Style
6cc97ca [Michael Armbrust] Merge remote-tracking branch 'origin/master' into newCodeGen
4220f1e [Michael Armbrust] Better config, docs, etc.
ca6cc6b [Michael Armbrust] WIP
9d67d85 [Michael Armbrust] Fix hive planner
fc522d5 [Michael Armbrust] Hook generated aggregation in to the planner.
e742640 [Michael Armbrust] Remove unneeded changes and code.
675e679 [Michael Armbrust] Upgrade paradise.
0093376 [Michael Armbrust] Comment / indenting cleanup.
d81f998 [Michael Armbrust] include schema for binding.
0e889e8 [Michael Armbrust] Use typeOf instead tq
f623ffd [Michael Armbrust] Quiet logging from test suite.
efad14f [Michael Armbrust] Remove some half finished functions.
92e74a4 [Michael Armbrust] add overrides
a2b5408 [Michael Armbrust] WIP: Code generation with scala reflection.
2014-07-29 20:58:05 -07:00
Michael Armbrust 86534d0f52 [SPARK-2631][SQL] Use SQLConf to configure in-memory columnar caching
Author: Michael Armbrust <michael@databricks.com>

Closes #1638 from marmbrus/cachedConfig and squashes the following commits:

2362082 [Michael Armbrust] Use SQLConf to configure in-memory columnar caching
2014-07-29 18:20:51 -07:00
Zongheng Yang c7db274be7 [SPARK-2393][SQL] Cost estimation optimization framework for Catalyst logical plans & sample usage.
The idea is that every Catalyst logical plan gets hold of a Statistics class, the usage of which provides useful estimations on various statistics. See the implementations of `MetastoreRelation`.

This patch also includes several usages of the estimation interface in the planner. For instance, we now use physical table sizes from the estimate interface to convert an equi-join to a broadcast join (when doing so is beneficial, as determined by a size threshold).

Finally, there are a couple minor accompanying changes including:
- Remove the not-in-use `BaseRelation`.
- Make SparkLogicalPlan take a `SQLContext` in the second param list.

Author: Zongheng Yang <zongheng.y@gmail.com>

Closes #1238 from concretevitamin/estimates and squashes the following commits:

329071d [Zongheng Yang] Address review comments; turn config name from string to field in SQLConf.
8663e84 [Zongheng Yang] Use BigInt for stat; for logical leaves, by default throw an exception.
2f2fb89 [Zongheng Yang] Fix statistics for SparkLogicalPlan.
9951305 [Zongheng Yang] Remove childrenStats.
16fc60a [Zongheng Yang] Avoid calling statistics on plans if auto join conversion is disabled.
8bd2816 [Zongheng Yang] Add a note on performance of statistics.
6e594b8 [Zongheng Yang] Get size info from metastore for MetastoreRelation.
01b7a3e [Zongheng Yang] Update scaladoc for a field and move it to @param section.
549061c [Zongheng Yang] Remove numTuples in Statistics for now.
729a8e2 [Zongheng Yang] Update docs to be more explicit.
573e644 [Zongheng Yang] Remove singleton SQLConf and move back `settings` to the trait.
2d99eb5 [Zongheng Yang] {Cleanup, use synchronized in, enrich} StatisticsSuite.
ca5b825 [Zongheng Yang] Inject SQLContext into SparkLogicalPlan, removing SQLConf mixin from it.
43d38a6 [Zongheng Yang] Revert optimization for BroadcastNestedLoopJoin (this fixes tests).
0ef9e5b [Zongheng Yang] Use multiplication instead of sum for default estimates.
4ef0d26 [Zongheng Yang] Make Statistics a case class.
3ba8f3e [Zongheng Yang] Add comment.
e5bcf5b [Zongheng Yang] Fix optimization conditions & update scala docs to explain.
7d9216a [Zongheng Yang] Apply estimation to planning ShuffleHashJoin & BroadcastNestedLoopJoin.
73cde01 [Zongheng Yang] Move SQLConf back. Assign default sizeInBytes to SparkLogicalPlan.
73412be [Zongheng Yang] Move SQLConf to Catalyst & add default val for sizeInBytes.
7a60ab7 [Zongheng Yang] s/Estimates/Statistics, s/cardinality/numTuples.
de3ae13 [Zongheng Yang] Add parquetAfter() properly in test.
dcff9bd [Zongheng Yang] Cleanups.
84301a4 [Zongheng Yang] Refactors.
5bf5586 [Zongheng Yang] Typo.
56a8e6e [Zongheng Yang] Prototype impl of estimations for Catalyst logical plans.
2014-07-29 15:32:50 -07:00
Davies Liu f0d880e288 [SPARK-2674] [SQL] [PySpark] support datetime type for SchemaRDD
Datetime and time in Python will be converted into java.util.Calendar after serialization, it will be converted into java.sql.Timestamp during inferSchema().

In javaToPython(), Timestamp will be converted into Calendar, then be converted into datetime in Python after pickling.

Author: Davies Liu <davies.liu@gmail.com>

Closes #1601 from davies/date and squashes the following commits:

f0599b0 [Davies Liu] remove tests for sets and tuple in sql, fix list of list
c9d607a [Davies Liu] convert datetype for runtime
709d40d [Davies Liu] remove brackets
96db384 [Davies Liu] support datetime type for SchemaRDD
2014-07-29 12:31:39 -07:00
Cheng Lian a7a9d14479 [SPARK-2410][SQL] Merging Hive Thrift/JDBC server (with Maven profile fix)
JIRA issue: [SPARK-2410](https://issues.apache.org/jira/browse/SPARK-2410)

Another try for #1399 & #1600. Those two PR breaks Jenkins builds because we made a separate profile `hive-thriftserver` in sub-project `assembly`, but the `hive-thriftserver` module is defined outside the `hive-thriftserver` profile. Thus every time a pull request that doesn't touch SQL code will also execute test suites defined in `hive-thriftserver`, but tests fail because related .class files are not included in the assembly jar.

In the most recent commit, module `hive-thriftserver` is moved into its own profile to fix this problem. All previous commits are squashed for clarity.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #1620 from liancheng/jdbc-with-maven-fix and squashes the following commits:

629988e [Cheng Lian] Moved hive-thriftserver module definition into its own profile
ec3c7a7 [Cheng Lian] Cherry picked the Hive Thrift server
2014-07-28 12:07:30 -07:00
Patrick Wendell e5bbce9a60 Revert "[SPARK-2410][SQL] Merging Hive Thrift/JDBC server"
This reverts commit f6ff2a61d0.
2014-07-27 18:46:58 -07:00
Cheng Lian f6ff2a61d0 [SPARK-2410][SQL] Merging Hive Thrift/JDBC server
(This is a replacement of #1399, trying to fix potential `HiveThriftServer2` port collision between parallel builds. Please refer to [these comments](https://github.com/apache/spark/pull/1399#issuecomment-50212572) for details.)

JIRA issue: [SPARK-2410](https://issues.apache.org/jira/browse/SPARK-2410)

Merging the Hive Thrift/JDBC server from [branch-1.0-jdbc](https://github.com/apache/spark/tree/branch-1.0-jdbc).

Thanks chenghao-intel for his initial contribution of the Spark SQL CLI.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #1600 from liancheng/jdbc and squashes the following commits:

ac4618b [Cheng Lian] Uses random port for HiveThriftServer2 to avoid collision with parallel builds
090beea [Cheng Lian] Revert changes related to SPARK-2678, decided to move them to another PR
21c6cf4 [Cheng Lian] Updated Spark SQL programming guide docs
fe0af31 [Cheng Lian] Reordered spark-submit options in spark-shell[.cmd]
199e3fb [Cheng Lian] Disabled MIMA for hive-thriftserver
1083e9d [Cheng Lian] Fixed failed test suites
7db82a1 [Cheng Lian] Fixed spark-submit application options handling logic
9cc0f06 [Cheng Lian] Starts beeline with spark-submit
cfcf461 [Cheng Lian] Updated documents and build scripts for the newly added hive-thriftserver profile
061880f [Cheng Lian] Addressed all comments by @pwendell
7755062 [Cheng Lian] Adapts test suites to spark-submit settings
40bafef [Cheng Lian] Fixed more license header issues
e214aab [Cheng Lian] Added missing license headers
b8905ba [Cheng Lian] Fixed minor issues in spark-sql and start-thriftserver.sh
f975d22 [Cheng Lian] Updated docs for Hive compatibility and Shark migration guide draft
3ad4e75 [Cheng Lian] Starts spark-sql shell with spark-submit
a5310d1 [Cheng Lian] Make HiveThriftServer2 play well with spark-submit
61f39f4 [Cheng Lian] Starts Hive Thrift server via spark-submit
2c4c539 [Cheng Lian] Cherry picked the Hive Thrift server
2014-07-27 13:03:38 -07:00
Michael Armbrust afd757a241 Revert "[SPARK-2410][SQL] Merging Hive Thrift/JDBC server"
This reverts commit 06dc0d2c6b.

#1399 is making Jenkins fail.  We should investigate and put this back after its passing tests.

Author: Michael Armbrust <michael@databricks.com>

Closes #1594 from marmbrus/revertJDBC and squashes the following commits:

59748da [Michael Armbrust] Revert "[SPARK-2410][SQL] Merging Hive Thrift/JDBC server"
2014-07-25 15:36:57 -07:00
Cheng Lian 06dc0d2c6b [SPARK-2410][SQL] Merging Hive Thrift/JDBC server
JIRA issue:

- Main: [SPARK-2410](https://issues.apache.org/jira/browse/SPARK-2410)
- Related: [SPARK-2678](https://issues.apache.org/jira/browse/SPARK-2678)

Cherry picked the Hive Thrift/JDBC server from [branch-1.0-jdbc](https://github.com/apache/spark/tree/branch-1.0-jdbc).

(Thanks chenghao-intel for his initial contribution of the Spark SQL CLI.)

TODO

- [x] Use `spark-submit` to launch the server, the CLI and beeline
- [x] Migration guideline draft for Shark users

----

Hit by a bug in `SparkSubmitArguments` while working on this PR: all application options that are recognized by `SparkSubmitArguments` are stolen as `SparkSubmit` options. For example:

```bash
$ spark-submit --class org.apache.hive.beeline.BeeLine spark-internal --help
```

This actually shows usage information of `SparkSubmit` rather than `BeeLine`.

~~Fixed this bug here since the `spark-internal` related stuff also touches `SparkSubmitArguments` and I'd like to avoid conflict.~~

**UPDATE** The bug mentioned above is now tracked by [SPARK-2678](https://issues.apache.org/jira/browse/SPARK-2678). Decided to revert changes to this bug since it involves more subtle considerations and worth a separate PR.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #1399 from liancheng/thriftserver and squashes the following commits:

090beea [Cheng Lian] Revert changes related to SPARK-2678, decided to move them to another PR
21c6cf4 [Cheng Lian] Updated Spark SQL programming guide docs
fe0af31 [Cheng Lian] Reordered spark-submit options in spark-shell[.cmd]
199e3fb [Cheng Lian] Disabled MIMA for hive-thriftserver
1083e9d [Cheng Lian] Fixed failed test suites
7db82a1 [Cheng Lian] Fixed spark-submit application options handling logic
9cc0f06 [Cheng Lian] Starts beeline with spark-submit
cfcf461 [Cheng Lian] Updated documents and build scripts for the newly added hive-thriftserver profile
061880f [Cheng Lian] Addressed all comments by @pwendell
7755062 [Cheng Lian] Adapts test suites to spark-submit settings
40bafef [Cheng Lian] Fixed more license header issues
e214aab [Cheng Lian] Added missing license headers
b8905ba [Cheng Lian] Fixed minor issues in spark-sql and start-thriftserver.sh
f975d22 [Cheng Lian] Updated docs for Hive compatibility and Shark migration guide draft
3ad4e75 [Cheng Lian] Starts spark-sql shell with spark-submit
a5310d1 [Cheng Lian] Make HiveThriftServer2 play well with spark-submit
61f39f4 [Cheng Lian] Starts Hive Thrift server via spark-submit
2c4c539 [Cheng Lian] Cherry picked the Hive Thrift server
2014-07-25 12:20:49 -07:00
Yin Huai b352ef175c [SPARK-2603][SQL] Remove unnecessary toMap and toList in converting Java collections to Scala collections JsonRDD.scala
In JsonRDD.scalafy, we are using toMap/toList to convert a Java Map/List to a Scala one. These two operations are pretty expensive because they read elements from a Java Map/List and then load to a Scala Map/List. We can use Scala wrappers to wrap those Java collections instead of using toMap/toList.

I did a quick test to see the performance. I had a 2.9GB cached RDD[String] storing one JSON object per record (twitter dataset). My simple test program is attached below.
```scala
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

val jsonData = sc.textFile("...")
jsonData.cache.count

val jsonSchemaRDD = sqlContext.jsonRDD(jsonData)
jsonSchemaRDD.registerAsTable("jt")

sqlContext.sql("select count(*) from jt").collect
```
Stages for the schema inference and the table scan both had 48 tasks. These tasks were executed sequentially. For the current implementation, scanning the JSON dataset will materialize values of all fields of a record. The inferred schema of the dataset can be accessed at https://gist.github.com/yhuai/05fe8a57c638c6666f8d.

From the result, there was no significant difference on running `jsonRDD`. For the simple aggregation query, results are attached below.
```
Original:
Run 1: 26.1s
Run 2: 27.03s
Run 3: 27.035s

With this change:
Run 1: 21.086s
Run 2: 21.035s
Run 3: 21.029s
```

JIRA: https://issues.apache.org/jira/browse/SPARK-2603

Author: Yin Huai <huai@cse.ohio-state.edu>

Closes #1504 from yhuai/removeToMapToList and squashes the following commits:

6831b77 [Yin Huai] Fix failed tests.
09b9bca [Yin Huai] Merge remote-tracking branch 'upstream/master' into removeToMapToList
d1abdb8 [Yin Huai] Remove unnecessary toMap and toList.
2014-07-24 11:19:19 -07:00
Ian O Connell efdaeb1119 [SPARK-2102][SQL][CORE] Add option for kryo registration required and use a resource pool in Spark SQL for Kryo instances.
Author: Ian O Connell <ioconnell@twitter.com>

Closes #1377 from ianoc/feature/SPARK-2102 and squashes the following commits:

5498566 [Ian O Connell] Docs update suggested by Patrick
20e8555 [Ian O Connell] Slight style change
f92c294 [Ian O Connell] Add docs for new KryoSerializer option
f3735c8 [Ian O Connell] Add using a kryo resource pool for the SqlSerializer
4e5c342 [Ian O Connell] Register the SparkConf for kryo, it gets swept into serialization
665805a [Ian O Connell] Add a spark.kryo.registrationRequired option for configuring the Kryo Serializer
2014-07-23 16:30:11 -07:00
Takuya UESHIN 1b790cf775 [SPARK-2588][SQL] Add some more DSLs.
Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #1491 from ueshin/issues/SPARK-2588 and squashes the following commits:

43d0a46 [Takuya UESHIN] Merge branch 'master' into issues/SPARK-2588
1023ea0 [Takuya UESHIN] Modify tests to use DSLs.
2310bf1 [Takuya UESHIN] Add some more DSLs.
2014-07-23 14:47:23 -07:00
Michael Armbrust 511a731403 [SPARK-2561][SQL] Fix apply schema
We need to use the analyzed attributes otherwise we end up with a tree that will never resolve.

Author: Michael Armbrust <michael@databricks.com>

Closes #1470 from marmbrus/fixApplySchema and squashes the following commits:

f968195 [Michael Armbrust] Use analyzed attributes when applying the schema.
4969015 [Michael Armbrust] Add test case.
2014-07-21 18:18:17 -07:00
Cheng Lian cd273a2381 [SPARK-2190][SQL] Specialized ColumnType for Timestamp
JIRA issue: [SPARK-2190](https://issues.apache.org/jira/browse/SPARK-2190)

Added specialized in-memory column type for `Timestamp`. Whitelisted all timestamp related Hive tests except `timestamp_udf`, which is timezone sensitive.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #1440 from liancheng/timestamp-column-type and squashes the following commits:

e682175 [Cheng Lian] Enabled more timezone sensitive Hive tests.
53a358f [Cheng Lian] Fixed failed test suites
01b592d [Cheng Lian] Fixed SimpleDateFormat thread safety issue
2a59343 [Cheng Lian] Removed timezone sensitive Hive timestamp tests
45dd05d [Cheng Lian] Added Timestamp specific in-memory columnar representation
2014-07-21 00:46:28 -07:00
chutium 2a732110d4 SPARK-2407: Added Parser of SQL SUBSTR()
follow-up of #1359

Author: chutium <teng.qiu@gmail.com>

Closes #1442 from chutium/master and squashes the following commits:

b49cc8a [chutium] SPARK-2407: Added Parser of SQL SUBSTRING() #1442
9a60ccf [chutium] SPARK-2407: Added Parser of SQL SUBSTR() #1442
06e933b [chutium] Merge https://github.com/apache/spark
c870172 [chutium] Merge https://github.com/apache/spark
094f773 [chutium] Merge https://github.com/apache/spark
88cb37d [chutium] Merge https://github.com/apache/spark
1de83a7 [chutium] SPARK-2407: Added Parse of SQL SUBSTR()
2014-07-19 11:04:41 -05:00
Yin Huai df95d82da7 [SPARK-2525][SQL] Remove as many compilation warning messages as possible in Spark SQL
JIRA: https://issues.apache.org/jira/browse/SPARK-2525.

Author: Yin Huai <huai@cse.ohio-state.edu>

Closes #1444 from yhuai/SPARK-2517 and squashes the following commits:

edbac3f [Yin Huai] Removed some compiler type erasure warnings.
2014-07-16 10:53:59 -07:00
Cheng Lian efc452a163 [SPARK-2119][SQL] Improved Parquet performance when reading off S3
JIRA issue: [SPARK-2119](https://issues.apache.org/jira/browse/SPARK-2119)

Essentially this PR fixed three issues to gain much better performance when reading large Parquet file off S3.

1. When reading the schema, fetching Parquet metadata from a part-file rather than the `_metadata` file

   The `_metadata` file contains metadata of all row groups, and can be very large if there are many row groups. Since schema information and row group metadata are coupled within a single Thrift object, we have to read the whole `_metadata` to fetch the schema. On the other hand, schema is replicated among footers of all part-files, which are fairly small.

1. Only add the root directory of the Parquet file rather than all the part-files to input paths

   HDFS API can automatically filter out all hidden files and underscore files (`_SUCCESS` & `_metadata`), there's no need to filter out all part-files and add them individually to input paths. What make it much worse is that, `FileInputFormat.listStatus()` calls `FileSystem.globStatus()` on each individual input path sequentially, each results a blocking remote S3 HTTP request.

1. Worked around [PARQUET-16](https://issues.apache.org/jira/browse/PARQUET-16)

   Essentially PARQUET-16 is similar to the above issue, and results lots of sequential `FileSystem.getFileStatus()` calls, which are further translated into a bunch of remote S3 HTTP requests.

   `FilteringParquetRowInputFormat` should be cleaned up once PARQUET-16 is fixed.

Below is the micro benchmark result. The dataset used is a S3 Parquet file consists of 3,793 partitions, about 110MB per partition in average. The benchmark is done with a 9-node AWS cluster.

- Creating a Parquet `SchemaRDD` (Parquet schema is fetched)

  ```scala
  val tweets = parquetFile(uri)
  ```

  - Before: 17.80s
  - After: 8.61s

- Fetching partition information

  ```scala
  tweets.getPartitions
  ```

  - Before: 700.87s
  - After: 21.47s

- Counting the whole file (both steps above are executed altogether)

  ```scala
  parquetFile(uri).count()
  ```

  - Before: ??? (haven't test yet)
  - After: 53.26s

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #1370 from liancheng/faster-parquet and squashes the following commits:

94a2821 [Cheng Lian] Added comments about schema consistency
d2c4417 [Cheng Lian] Worked around PARQUET-16 to improve Parquet performance
1c0d1b9 [Cheng Lian] Accelerated Parquet schema retrieving
5bd3d29 [Cheng Lian] Fixed Parquet log level
2014-07-16 12:44:51 -04:00
Aaron Staple 90ca532a0f [SPARK-2314][SQL] Override collect and take in JavaSchemaRDD, forwarding to SchemaRDD implementations.
Author: Aaron Staple <aaron.staple@gmail.com>

Closes #1421 from staple/SPARK-2314 and squashes the following commits:

73e04dc [Aaron Staple] [SPARK-2314] Override collect and take in JavaSchemaRDD, forwarding to SchemaRDD implementations.
2014-07-15 21:35:36 -07:00
Michael Armbrust 502f90782a [SQL] Attribute equality comparisons should be done by exprId.
Author: Michael Armbrust <michael@databricks.com>

Closes #1414 from marmbrus/exprIdResolution and squashes the following commits:

97b47bc [Michael Armbrust] Attribute equality comparisons should be done by exprId.
2014-07-15 17:56:17 -07:00
Takuya UESHIN 9fe693b5b6 [SPARK-2446][SQL] Add BinaryType support to Parquet I/O.
Note that this commit changes the semantics when loading in data that was created with prior versions of Spark SQL.  Before, we were writing out strings as Binary data without adding any other annotations. Thus, when data is read in from prior versions, data that was StringType will now become BinaryType.  Users that need strings can CAST that column to a String.  It was decided that while this breaks compatibility, it does make us compatible with other systems (Hive, Thrift, etc) and adds support for Binary data, so this is the right decision long term.

To support `BinaryType`, the following changes are needed:
- Make `StringType` use `OriginalType.UTF8`
- Add `BinaryType` using `PrimitiveTypeName.BINARY` without `OriginalType`

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #1373 from ueshin/issues/SPARK-2446 and squashes the following commits:

ecacb92 [Takuya UESHIN] Add BinaryType support to Parquet I/O.
616e04a [Takuya UESHIN] Make StringType use OriginalType.UTF8.
2014-07-14 15:42:35 -07:00
Michael Armbrust 1a7d7cc85f [SPARK-2405][SQL] Reusue same byte buffers when creating new instance of InMemoryRelation
Reuse byte buffers when creating unique attributes for multiple instances of an InMemoryRelation in a single query plan.

Author: Michael Armbrust <michael@databricks.com>

Closes #1332 from marmbrus/doubleCache and squashes the following commits:

4a19609 [Michael Armbrust] Clean up concurrency story by calculating buffersn the constructor.
b39c931 [Michael Armbrust] Allocations are kind of a side effect.
f67eff7 [Michael Armbrust] Reusue same byte buffers when creating new instance of InMemoryRelation
2014-07-12 12:13:32 -07:00
Michael Armbrust 7e26b57615 [SPARK-2441][SQL] Add more efficient distinct operator.
Author: Michael Armbrust <michael@databricks.com>

Closes #1366 from marmbrus/partialDistinct and squashes the following commits:

12a31ab [Michael Armbrust] Add more efficient distinct operator.
2014-07-12 12:07:27 -07:00
Takuya UESHIN 10b59ba230 [SPARK-2428][SQL] Add except and intersect methods to SchemaRDD.
Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #1355 from ueshin/issues/SPARK-2428 and squashes the following commits:

b6fa264 [Takuya UESHIN] Add except and intersect methods to SchemaRDD.
2014-07-10 19:27:24 -07:00
Takuya UESHIN f5abd27129 [SPARK-2415] [SQL] RowWriteSupport should handle empty ArrayType correctly.
`RowWriteSupport` doesn't write empty `ArrayType` value, so the read value becomes `null`.
It should write empty `ArrayType` value as it is.

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #1339 from ueshin/issues/SPARK-2415 and squashes the following commits:

32afc87 [Takuya UESHIN] Merge branch 'master' into issues/SPARK-2415
2f05196 [Takuya UESHIN] Fix RowWriteSupport to handle empty ArrayType correctly.
2014-07-10 19:23:44 -07:00
Patrick Wendell 553c578de1 HOTFIX: Remove persistently failing test in master.
Apparently this functionality is going to be removed soon anywyas.
2014-07-09 19:44:24 -07:00
Patrick Wendell dd22bc2d57 Revert "[HOTFIX] Synchronize on SQLContext.settings in tests."
This reverts commit d4c30cd991.
2014-07-09 19:36:38 -07:00
Reynold Xin 32516f866a [SPARK-2409] Make SQLConf thread safe.
Author: Reynold Xin <rxin@apache.org>

Closes #1334 from rxin/sqlConfThreadSafetuy and squashes the following commits:

c1e0a5a [Reynold Xin] Fixed the duplicate comment.
7614372 [Reynold Xin] [SPARK-2409] Make SQLConf thread safe.
2014-07-08 14:00:47 -07:00
Michael Armbrust 5a4063645d [SPARK-2391][SQL] Custom take() for LIMIT queries.
Using Spark's take can result in an entire in-memory partition to be shipped in order to retrieve a single row.

Author: Michael Armbrust <michael@databricks.com>

Closes #1318 from marmbrus/takeLimit and squashes the following commits:

77289a5 [Michael Armbrust] Update scala doc
32f0674 [Michael Armbrust] Custom take implementation for LIMIT queries.
2014-07-08 00:41:46 -07:00
witgo 3cd5029be7 Resolve sbt warnings during build Ⅱ
Author: witgo <witgo@qq.com>

Closes #1153 from witgo/expectResult and squashes the following commits:

97541d8 [witgo] merge master
ead26e7 [witgo] Resolve sbt warnings during build
2014-07-08 00:31:42 -07:00
Yanjie Gao 50561f4396 [SPARK-2235][SQL]Spark SQL basicOperator add Intersect operator
Hi all,
I want to submit a basic operator Intersect
For example , in sql case
select * from table1
intersect
select * from table2
So ,i want use this operator support this function in Spark SQL
This operator will return the  the intersection of SparkPlan child table RDD .
JIRA:https://issues.apache.org/jira/browse/SPARK-2235

Author: Yanjie Gao <gaoyanjie55@163.com>
Author: YanjieGao <396154235@qq.com>

Closes #1150 from YanjieGao/patch-5 and squashes the following commits:

4629afe [YanjieGao] reformat the code
bdc2ac0 [YanjieGao] reformat the code as Michael's suggestion
3b29ad6 [YanjieGao] Merge remote branch 'upstream/master' into patch-5
1cfbfe6 [YanjieGao] refomat some files
ea78f33 [YanjieGao] resolve conflict and add annotation on basicOperator and remove HiveQl
0c7cca5 [YanjieGao] modify format problem
a802ca8 [YanjieGao] Merge remote branch 'upstream/master' into patch-5
5e374c7 [YanjieGao] resolve conflict in SparkStrategies and basicOperator
f7961f6 [Yanjie Gao] update the line less than
bdc4a05 [Yanjie Gao] Update basicOperators.scala
0b49837 [Yanjie Gao] delete the annotation
f1288b4 [Yanjie Gao] delete annotation
e2b64be [Yanjie Gao] Update basicOperators.scala
4dd453e [Yanjie Gao] Update SQLQuerySuite.scala
790765d [Yanjie Gao] Update SparkStrategies.scala
ac73e60 [Yanjie Gao] Update basicOperators.scala
d4ac5e5 [Yanjie Gao] Update HiveQl.scala
61e88e7 [Yanjie Gao] Update SqlParser.scala
469f099 [Yanjie Gao] Update basicOperators.scala
e5bff61 [Yanjie Gao] Spark SQL basicOperator add Intersect operator
2014-07-07 19:40:04 -07:00
Yin Huai 4352a2fdaa [SPARK-2376][SQL] Selecting list values inside nested JSON objects raises java.lang.IllegalArgumentException
JIRA: https://issues.apache.org/jira/browse/SPARK-2376

Author: Yin Huai <huai@cse.ohio-state.edu>

Closes #1320 from yhuai/SPARK-2376 and squashes the following commits:

0107417 [Yin Huai] Merge remote-tracking branch 'upstream/master' into SPARK-2376
480803d [Yin Huai] Correctly handling JSON arrays in PySpark.
2014-07-07 18:37:38 -07:00
Yin Huai f0496ee108 [SPARK-2375][SQL] JSON schema inference may not resolve type conflicts correctly for a field inside an array of structs
For example, for
```
{"array": [{"field":214748364700}, {"field":1}]}
```
the type of field is resolved as IntType. While, for
```
{"array": [{"field":1}, {"field":214748364700}]}
```
the type of field is resolved as LongType.

JIRA: https://issues.apache.org/jira/browse/SPARK-2375

Author: Yin Huai <huaiyin.thu@gmail.com>

Closes #1308 from yhuai/SPARK-2375 and squashes the following commits:

3e2e312 [Yin Huai] Update unit test.
1b2ff9f [Yin Huai] Merge remote-tracking branch 'upstream/master' into SPARK-2375
10794eb [Yin Huai] Correctly resolve the type of a field inside an array of structs.
2014-07-07 17:05:59 -07:00
Takuya UESHIN 4deeed17c4 [SPARK-2386] [SQL] RowWriteSupport should use the exact types to cast.
When execute `saveAsParquetFile` with non-primitive type, `RowWriteSupport` uses wrong type `Int` for `ByteType` and `ShortType`.

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #1315 from ueshin/issues/SPARK-2386 and squashes the following commits:

20d89ec [Takuya UESHIN] Use None instead of null.
bd88741 [Takuya UESHIN] Add a test.
323d1d2 [Takuya UESHIN] Modify RowWriteSupport to use the exact types to cast.
2014-07-07 17:04:02 -07:00
Yin Huai c0b4cf097d [SPARK-2339][SQL] SQL parser in sql-core is case sensitive, but a table alias is converted to lower case when we create Subquery
Reported by http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Join-throws-exception-td8599.html
After we get the table from the catalog, because the table has an alias, we will temporarily insert a Subquery. Then, we convert the table alias to lower case no matter if the parser is case sensitive or not.
To see the issue ...
```
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD

case class Person(name: String, age: Int)

val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
people.registerAsTable("people")

sqlContext.sql("select PEOPLE.name from people PEOPLE")
```
The plan is ...
```
== Query Plan ==
Project ['PEOPLE.name]
 ExistingRdd [name#0,age#1], MapPartitionsRDD[4] at mapPartitions at basicOperators.scala:176
```
You can find that `PEOPLE.name` is not resolved.

This PR introduces three changes.
1.  If a table has an alias, the catalog will not lowercase the alias. If a lowercase alias is needed, the analyzer will do the work.
2.  A catalog has a new val caseSensitive that indicates if this catalog is case sensitive or not. For example, a SimpleCatalog is case sensitive, but
3.  Corresponding unit tests.
With this PR, case sensitivity of database names and table names is handled by the catalog. Case sensitivity of other identifiers are handled by the analyzer.

JIRA: https://issues.apache.org/jira/browse/SPARK-2339

Author: Yin Huai <huai@cse.ohio-state.edu>

Closes #1317 from yhuai/SPARK-2339 and squashes the following commits:

12d8006 [Yin Huai] Handling case sensitivity correctly. This patch introduces three changes. 1. If a table has an alias, the catalog will not lowercase the alias. If a lowercase alias is needed, the analyzer will do the work. 2. A catalog has a new val caseSensitive that indicates if this catalog is case sensitive or not. For example, a SimpleCatalog is case sensitive, but 3. Corresponding unit tests. With this patch, case sensitivity of database names and table names is handled by the catalog. Case sensitivity of other identifiers is handled by the analyzer.
2014-07-07 17:01:44 -07:00
Takuya UESHIN 9d5ecf8205 [SPARK-2327] [SQL] Fix nullabilities of Join/Generate/Aggregate.
Fix nullabilities of `Join`/`Generate`/`Aggregate` because:
- Output attributes of opposite side of `OuterJoin` should be nullable.
- Output attributes of generater side of `Generate` should be nullable if `join` is `true` and `outer` is `true`.
- `AttributeReference` of `computedAggregates` of `Aggregate` should be the same as `aggregateExpression`'s.

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #1266 from ueshin/issues/SPARK-2327 and squashes the following commits:

3ace83a [Takuya UESHIN] Add withNullability to Attribute and use it to change nullabilities.
df1ae53 [Takuya UESHIN] Modify nullabilize to leave attribute if not resolved.
799ce56 [Takuya UESHIN] Add nullabilization to Generate of SparkPlan.
a0fc9bc [Takuya UESHIN] Fix scalastyle errors.
0e31e37 [Takuya UESHIN] Fix Aggregate resultAttribute nullabilities.
09532ec [Takuya UESHIN] Fix Generate output nullabilities.
f20f196 [Takuya UESHIN] Fix Join output nullabilities.
2014-07-05 11:51:48 -07:00
Yanjie Gao 5dadda8645 [SPARK-2234][SQL]Spark SQL basicOperators add Except operator
Hi all,
I want to submit a Except operator in basicOperators.scala
In SQL case.SQL support two table do except operator.
select * from table1
except
select * from table2
This operator support the substract function .Return an table with the elements from `this` that are not in `other`.This operator should limit the input SparkPlan Seq only has two member.The check will later support
JIRA:https://issues.apache.org/jira/browse/SPARK-2234

Author: Yanjie Gao <gaoyanjie55@163.com>
Author: YanjieGao <396154235@qq.com>
Author: root <root@node4.(none)>
Author: gaoyanjie <gaoyanjie55@163.com>

Closes #1151 from YanjieGao/patch-6 and squashes the following commits:

f19f899 [YanjieGao] add a new blank line in basicoperators.scala
2ff7d73 [YanjieGao] resolve the identation in SqlParser and SparkStrategies
fdb5227 [YanjieGao] Merge remote branch 'upstream/master' into patch-6
9940d19 [YanjieGao] make comment less than 100c
09c7413 [YanjieGao] pr 1151 SqlParser add cache ,basic Operator rename Except and modify comment
b4b5867 [root] Merge remote branch 'upstream/master' into patch-6
b4c3869 [Yanjie Gao] change SparkStrategies Sparkcontext to SqlContext
7e0ec29 [Yanjie Gao] delete multi test
7e7c83f [Yanjie Gao] delete conflict except
b01beb8 [YanjieGao] resolve conflict sparkstrategies and basicOperators
4dc8166 [YanjieGao] resolve conflict
fa68a98 [Yanjie Gao] Update joins.scala
8e6bb00 [Yanjie Gao] delete conflict except
dd9ba5e [Yanjie Gao] Update joins.scala
a0d4e73 [Yanjie Gao] delete skew join
60f5ddd [Yanjie Gao] update less than 100c
0e72233 [Yanjie Gao] update SQLQuerySuite on master branch
7f916b5 [Yanjie Gao] update execution/basicOperators on master branch
a28dece [Yanjie Gao] Update logical/basicOperators on master branch
a639935 [Yanjie Gao] Update SparkStrategies.scala
3bf7def [Yanjie Gao] update SqlParser on master branch
26f833f [Yanjie Gao] update SparkStrategies.scala on master branch
8dd063f [Yanjie Gao] Update logical/basicOperators on master branch
9847dcf [Yanjie Gao] update SqlParser on masterbranch
d6a4604 [Yanjie Gao] Update joins.scala
424c507 [Yanjie Gao] Update joins.scala
7680742 [Yanjie Gao] Update SqlParser.scala
a7193d8 [gaoyanjie] [SPARK-2234][SQL]Spark SQL basicOperators add Except operator #1151
5c8a224 [Yanjie Gao] update the line less than 100c
ee066b3 [Yanjie Gao] Update basicOperators.scala
32a80ab [Yanjie Gao] remove except in HiveQl
cf232eb [Yanjie Gao] update 1comment 2space3 left.out
f1ea3f3 [Yanjie Gao] remove comment
7ea9b91 [Yanjie Gao] remove annotation
7f3d613 [Yanjie Gao] update .map(_.copy())
670a1bb [Yanjie Gao] Update HiveQl.scala
3fe7746 [Yanjie Gao] Update SQLQuerySuite.scala
a36eb0a [Yanjie Gao] Update basicOperators.scala
7859e56 [Yanjie Gao] Update SparkStrategies.scala
052346d [Yanjie Gao] Subtract is conflict with Subtract(e1,e2)
aab3785 [Yanjie Gao] Update SQLQuerySuite.scala
4bf80b1 [Yanjie Gao] update subtract to except
4bdd520 [Yanjie Gao] Update SqlParser.scala
2d4bfbd [Yanjie Gao] Update SQLQuerySuite.scala
0808921 [Yanjie Gao] SQLQuerySuite
a8a1948 [Yanjie Gao] SparkStrategies
1fe96c0 [Yanjie Gao] HiveQl.scala update
3305e40 [Yanjie Gao] SqlParser
7a98c37 [Yanjie Gao] Update basicOperators.scala
cf5b9d0 [Yanjie Gao] Update basicOperators.scala
8945835 [Yanjie Gao] object SkewJoin extends Strategy
2b98962 [Yanjie Gao] Update SqlParser.scala
dd32980 [Yanjie Gao] update1
68815b2 [Yanjie Gao] Reformat the code style
4eb43ec [Yanjie Gao] Update basicOperators.scala
aa06072 [Yanjie Gao] Reformat the code sytle
2014-07-04 02:43:57 -07:00
baishuo(白硕) 0bbe61223e Update SQLConf.scala
use concurrent.ConcurrentHashMap instead of util.Collections.synchronizedMap

Author: baishuo(白硕) <vc_java@hotmail.com>

Closes #1272 from baishuo/master and squashes the following commits:

51ec55d [baishuo(白硕)] Update SQLConf.scala
63da043 [baishuo(白硕)] Update SQLConf.scala
36b6dbd [baishuo(白硕)] Update SQLConf.scala
864faa0 [baishuo(白硕)] Update SQLConf.scala
593096b [baishuo(白硕)] Update SQLConf.scala
7304d9b [baishuo(白硕)] Update SQLConf.scala
843581c [baishuo(白硕)] Update SQLConf.scala
1d3e4a2 [baishuo(白硕)] Update SQLConf.scala
0740f28 [baishuo(白硕)] Update SQLConf.scala
2014-07-04 00:25:31 -07:00
Cheng Lian 544880457d [SPARK-2059][SQL] Don't throw TreeNodeException in execution.ExplainCommand
This is a fix for the problem revealed by PR #1265.

Currently `HiveComparisonSuite` ignores output of `ExplainCommand` since Catalyst query plan is quite different from Hive query plan. But exceptions throw from `CheckResolution` still breaks test cases. This PR catches any `TreeNodeException` and reports it as part of the query explanation.

After merging this PR, PR #1265 can also be merged safely.

For a normal query:

```
scala> hql("explain select key from src").foreach(println)
...
[Physical execution plan:]
[HiveTableScan [key#9], (MetastoreRelation default, src, None), None]
```

For a wrong query with unresolved attribute(s):

```
scala> hql("explain select kay from src").foreach(println)
...
[Error occurred during query planning: ]
[Unresolved attributes: 'kay, tree:]
[Project ['kay]]
[ LowerCaseSchema ]
[  MetastoreRelation default, src, None]
```

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #1294 from liancheng/safe-explain and squashes the following commits:

4318911 [Cheng Lian] Don't throw TreeNodeException in `execution.ExplainCommand`
2014-07-03 23:41:54 -07:00
Zongheng Yang d4c30cd991 [HOTFIX] Synchronize on SQLContext.settings in tests.
Let's see if this fixes the ongoing series of test failures in a master build machine (https://amplab.cs.berkeley.edu/jenkins/job/Spark-Master-SBT-pre-YARN/SPARK_HADOOP_VERSION=1.0.4,label=centos/81/).

pwendell marmbrus

Author: Zongheng Yang <zongheng.y@gmail.com>

Closes #1277 from concretevitamin/test-fix and squashes the following commits:

28c88bd [Zongheng Yang] Synchronize on SQLContext.settings in tests.
2014-07-03 17:37:53 -07:00
Ximo Guanter Gonzalbez 5c6ec94da1 SPARK-2186: Spark SQL DSL support for simple aggregations such as SUM and AVG
**Description** This patch enables using the `.select()` function in SchemaRDD with functions such as `Sum`, `Count` and other.
**Testing** Unit tests added.

Author: Ximo Guanter Gonzalbez <ximo@tid.es>

Closes #1211 from edrevo/add-expression-support-in-select and squashes the following commits:

fe4a1e1 [Ximo Guanter Gonzalbez] Extend SQL DSL to functions
e1d344a [Ximo Guanter Gonzalbez] SPARK-2186: Spark SQL DSL support for simple aggregations such as SUM and AVG
2014-07-02 10:03:44 -07:00
Cheng Hao 981bde9b05 [SQL]Extract the joinkeys from join condition
Extract the join keys from equality conditions, that can be evaluated using equi-join.

Author: Cheng Hao <hao.cheng@intel.com>

Closes #1190 from chenghao-intel/extract_join_keys and squashes the following commits:

4a1060a [Cheng Hao] Fix some of the small issues
ceb4924 [Cheng Hao] Remove the redundant pattern of join keys extraction
cec34e8 [Cheng Hao] Update the code style issues
dcc4584 [Cheng Hao] Extract the joinkeys from join condition
2014-06-26 19:18:11 -07:00
Takuya UESHIN 32a1ad7531 [SPARK-2295] [SQL] Make JavaBeans nullability stricter.
Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #1235 from ueshin/issues/SPARK-2295 and squashes the following commits:

201c508 [Takuya UESHIN] Make JavaBeans nullability stricter.
2014-06-26 13:37:19 -07:00
Zongheng Yang 9d824fed8c [SQL] SPARK-1800 Add broadcast hash join operator & associated hints.
This PR is based off Michael's [PR 734](https://github.com/apache/spark/pull/734) and includes a bunch of cleanups.

Moreover, this PR also
- makes `SparkLogicalPlan` take a `tableName: String`, which facilitates testing.
- moves join-related tests to a single file.

Author: Zongheng Yang <zongheng.y@gmail.com>
Author: Michael Armbrust <michael@databricks.com>

Closes #1163 from concretevitamin/auto-broadcast-hash-join and squashes the following commits:

d0f4991 [Zongheng Yang] Fix bug in broadcast hash join & add test to cover it.
af080d7 [Zongheng Yang] Fix in joinIterators()'s next().
440d277 [Zongheng Yang] Fixes to imports; add back requiredChildDistribution (lost when merging)
208d5f6 [Zongheng Yang] Make LeftSemiJoinHash mix in HashJoin.
ad6c7cc [Zongheng Yang] Minor cleanups.
814b3bf [Zongheng Yang] Merge branch 'master' into auto-broadcast-hash-join
a8a093e [Zongheng Yang] Minor cleanups.
6fd8443 [Zongheng Yang] Cut down size estimation related stuff.
a4267be [Zongheng Yang] Add test for broadcast hash join and related necessary refactorings:
0e64b08 [Zongheng Yang] Scalastyle fix.
91461c2 [Zongheng Yang] Merge branch 'master' into auto-broadcast-hash-join
7c7158b [Zongheng Yang] Prototype of auto conversion to broadcast hash join.
0ad122f [Zongheng Yang] Merge branch 'master' into auto-broadcast-hash-join
3e5d77c [Zongheng Yang] WIP: giant and messy WIP.
a92ed0c [Michael Armbrust] Formatting.
76ca434 [Michael Armbrust] A simple strategy that broadcasts tables only when they are found in a configuration hint.
cf6b381 [Michael Armbrust] Split out generic logic for hash joins and create two concrete physical operators: BroadcastHashJoin and ShuffledHashJoin.
a8420ca [Michael Armbrust] Copy records in executeCollect to avoid issues with mutable rows.
2014-06-25 18:06:33 -07:00
Michael Armbrust a162c9b337 [SPARK-2264][SQL] Fix failing CachedTableSuite
Author: Michael Armbrust <michael@databricks.com>

Closes #1201 from marmbrus/fixCacheTests and squashes the following commits:

9d87ed1 [Michael Armbrust] Use analyzer (which runs to fixed point) instead of manually removing analysis operators.
2014-06-24 19:04:29 -07:00
Patrick Wendell 221909e678 HOTFIX: Disabling tests per SPARK-2264 2014-06-24 15:09:38 -07:00
jerryshao 56eb8af187 [SPARK-2124] Move aggregation into shuffle implementations
This PR is a sub-task of SPARK-2044 to move the execution of aggregation into shuffle implementations.

I leave `CoGoupedRDD` and `SubtractedRDD` unchanged because they have their implementations of aggregation. I'm not sure is it suitable to change these two RDDs.

Also I do not move sort related code of `OrderedRDDFunctions` into shuffle, this will be solved in another sub-task.

Author: jerryshao <saisai.shao@intel.com>

Closes #1064 from jerryshao/SPARK-2124 and squashes the following commits:

4a05a40 [jerryshao] Modify according to comments
1f7dcc8 [jerryshao] Style changes
50a2fd6 [jerryshao] Fix test suite issue after moving aggregator to Shuffle reader and writer
1a96190 [jerryshao] Code modification related to the ShuffledRDD
308f635 [jerryshao] initial works of move combiner to ShuffleManager's reader and writer
2014-06-23 20:25:46 -07:00
Cheng Lian a4bc442ca2 [SPARK-1669][SQL] Made cacheTable idempotent
JIRA issue: [SPARK-1669](https://issues.apache.org/jira/browse/SPARK-1669)

Caching the same table multiple times should end up with only 1 in-memory columnar representation of this table.

Before:

```
scala> loadTestTable("src")
...
scala> cacheTable("src")
...
scala> cacheTable("src")
...
scala> table("src")
...
== Query Plan ==
InMemoryColumnarTableScan [key#2,value#3], (InMemoryRelation [key#2,value#3], false, (InMemoryColumnarTableScan [key#2,value#3], (InMemoryRelation [key#2,value#3], false, (HiveTableScan [key#2,value#3], (MetastoreRelation default, src, None), None))))
```

After:

```
scala> loadTestTable("src")
...
scala> cacheTable("src")
...
scala> cacheTable("src")
...
scala> table("src")
...
== Query Plan ==
InMemoryColumnarTableScan [key#2,value#3], (InMemoryRelation [key#2,value#3], false, (HiveTableScan [key#2,value#3], (MetastoreRelation default, src, None), None))
```

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #1183 from liancheng/spark-1669 and squashes the following commits:

68f8a20 [Cheng Lian] Removed an unused import
51bae90 [Cheng Lian] Made cacheTable idempotent
2014-06-23 13:24:33 -07:00
Reynold Xin ca5d8b5904 [SQL] Pass SQLContext instead of SparkContext into physical operators.
This makes it easier to use config options in operators.

Author: Reynold Xin <rxin@apache.org>

Closes #1164 from rxin/sqlcontext and squashes the following commits:

797b2fd [Reynold Xin] Pass SQLContext instead of SparkContext into physical operators.
2014-06-20 22:49:48 -07:00
Reynold Xin 2f6a835e1a [SPARK-2218] rename Equals to EqualTo in Spark SQL expressions.
Due to the existence of scala.Equals, it is very error prone to name the expression Equals, especially because we use a lot of partial functions and pattern matching in the optimizer.

Note that this sits on top of #1144.

Author: Reynold Xin <rxin@apache.org>

Closes #1146 from rxin/equals and squashes the following commits:

f8583fd [Reynold Xin] Merge branch 'master' of github.com:apache/spark into equals
326b388 [Reynold Xin] Merge branch 'master' of github.com:apache/spark into equals
bd19807 [Reynold Xin] Rename EqualsTo to EqualTo.
81148d1 [Reynold Xin] [SPARK-2218] rename Equals to EqualsTo in Spark SQL expressions.
c4e543d [Reynold Xin] [SPARK-2210] boolean cast on boolean value should be removed.
2014-06-20 00:34:59 -07:00
Andre Schumacher f479cf3743 SPARK-1293 [SQL] Parquet support for nested types
It should be possible to import and export data stored in Parquet's columnar format that contains nested types. For example:
```java
message AddressBook {
   required binary owner;
   optional group ownerPhoneNumbers {
      repeated binary array;
   }
   optional group contacts {
      repeated group array {
         required binary name;
         optional binary phoneNumber;
      }
   }
   optional group nameToApartmentNumber {
      repeated group map {
         required binary key;
         required int32 value;
      }
   }
}
```
The example could model a type (AddressBook) that contains records made of strings (owner), lists (ownerPhoneNumbers) and a table of contacts (e.g., a list of pairs or a map that can contain null values but keys must not be null). The list of tasks are as follows:

<h6>Implement support for converting nested Parquet types to Spark/Catalyst types:</h6>
- [x] Structs
- [x] Lists
- [x] Maps (note: currently keys need to be Strings)

<h6>Implement import (via ``parquetFile``) of nested Parquet types (first version in this PR)</h6>
- [x] Initial version

<h6>Implement export (via ``saveAsParquetFile``)</h6>
- [x] Initial version

<h6>Test support for AvroParquet, etc.</h6>
- [x] Initial testing of import of avro-generated Parquet data (simple + nested)

Example:
```scala
val data = TestSQLContext
  .parquetFile("input.dir")
  .toSchemaRDD
data.registerAsTable("data")
sql("SELECT owner, contacts[1].name, nameToApartmentNumber['John'] FROM data").collect()
```

Author: Andre Schumacher <andre.schumacher@iki.fi>
Author: Michael Armbrust <michael@databricks.com>

Closes #360 from AndreSchumacher/nested_parquet and squashes the following commits:

30708c8 [Andre Schumacher] Taking out AvroParquet test for now to remove Avro dependency
95c1367 [Andre Schumacher] Changes to ParquetRelation and its metadata
7eceb67 [Andre Schumacher] Review feedback
94eea3a [Andre Schumacher] Scalastyle
403061f [Andre Schumacher] Fixing some issues with tests and schema metadata
b8a8b9a [Andre Schumacher] More fixes to short and byte conversion
63d1b57 [Andre Schumacher] Cleaning up and Scalastyle
88e6bdb [Andre Schumacher] Attempting to fix loss of schema
37e0a0a [Andre Schumacher] Cleaning up
14c3fd8 [Andre Schumacher] Attempting to fix Spark-Parquet schema conversion
3e1456c [Michael Armbrust] WIP: Directly serialize catalyst attributes.
f7aeba3 [Michael Armbrust] [SPARK-1982] Support for ByteType and ShortType.
3104886 [Michael Armbrust] Nested Rows should be Rows, not Seqs.
3c6b25f [Andre Schumacher] Trying to reduce no-op changes wrt master
31465d6 [Andre Schumacher] Scalastyle: fixing commented out bottom
de02538 [Andre Schumacher] Cleaning up ParquetTestData
2f5a805 [Andre Schumacher] Removing stripMargin from test schemas
191bc0d [Andre Schumacher] Changing to Seq for ArrayType, refactoring SQLParser for nested field extension
cbb5793 [Andre Schumacher] Code review feedback
32229c7 [Andre Schumacher] Removing Row nested values and placing by generic types
0ae9376 [Andre Schumacher] Doc strings and simplifying ParquetConverter.scala
a6b4f05 [Andre Schumacher] Cleaning up ArrayConverter, moving classTag to NativeType, adding NativeRow
431f00f [Andre Schumacher] Fixing problems introduced during rebase
c52ff2c [Andre Schumacher] Adding native-array converter
619c397 [Andre Schumacher] Completing Map testcase
79d81d5 [Andre Schumacher] Replacing field names for array and map in WriteSupport
f466ff0 [Andre Schumacher] Added ParquetAvro tests and revised Array conversion
adc1258 [Andre Schumacher] Optimizing imports
e99cc51 [Andre Schumacher] Fixing nested WriteSupport and adding tests
1dc5ac9 [Andre Schumacher] First version of WriteSupport for nested types
d1911dc [Andre Schumacher] Simplifying ArrayType conversion
f777b4b [Andre Schumacher] Scalastyle
824500c [Andre Schumacher] Adding attribute resolution for MapType
b539fde [Andre Schumacher] First commit for MapType
a594aed [Andre Schumacher] Scalastyle
4e25fcb [Andre Schumacher] Adding resolution of complex ArrayTypes
f8f8911 [Andre Schumacher] For primitive rows fall back to more efficient converter, code reorg
6dbc9b7 [Andre Schumacher] Fixing some problems intruduced during rebase
b7fcc35 [Andre Schumacher] Documenting conversions, bugfix, wrappers of Rows
ee70125 [Andre Schumacher] fixing one problem with arrayconverter
98219cf [Andre Schumacher] added struct converter
5d80461 [Andre Schumacher] fixing one problem with nested structs and breaking up files
1b1b3d6 [Andre Schumacher] Fixing one problem with nested arrays
ddb40d2 [Andre Schumacher] Extending tests for nested Parquet data
745a42b [Andre Schumacher] Completing testcase for nested data (Addressbook(
6125c75 [Andre Schumacher] First working nested Parquet record input
4d4892a [Andre Schumacher] First commit nested Parquet read converters
aa688fe [Andre Schumacher] Adding conversion of nested Parquet schemas
2014-06-19 23:47:45 -07:00
Yin Huai f397e92eb2 [SPARK-2177][SQL] describe table result contains only one column
```
scala> hql("describe src").collect().foreach(println)

[key                 	string              	None                ]
[value               	string              	None                ]
```

The result should contain 3 columns instead of one. This screws up JDBC or even the downstream consumer of the Scala/Java/Python APIs.

I am providing a workaround. We handle a subset of describe commands in Spark SQL, which are defined by ...
```
DESCRIBE [EXTENDED] [db_name.]table_name
```
All other cases are treated as Hive native commands.

Also, if we upgrade Hive to 0.13, we need to check the results of context.sessionState.isHiveServerQuery() to determine how to split the result. This method is introduced by https://issues.apache.org/jira/browse/HIVE-4545. We may want to set Hive to use JsonMetaDataFormatter for the output of a DDL statement (`set hive.ddl.output.format=json` introduced by https://issues.apache.org/jira/browse/HIVE-2822).

The link to JIRA: https://issues.apache.org/jira/browse/SPARK-2177

Author: Yin Huai <huai@cse.ohio-state.edu>

Closes #1118 from yhuai/SPARK-2177 and squashes the following commits:

fd2534c [Yin Huai] Merge remote-tracking branch 'upstream/master' into SPARK-2177
b9b9aa5 [Yin Huai] rxin's comments.
e7c4e72 [Yin Huai] Fix unit test.
656b068 [Yin Huai] 100 characters.
6387217 [Yin Huai] Merge remote-tracking branch 'upstream/master' into SPARK-2177
8003cf3 [Yin Huai] Generate strings with the format like Hive for unit tests.
9787fff [Yin Huai] Merge remote-tracking branch 'upstream/master' into SPARK-2177
440c5af [Yin Huai] rxin's comments.
f1a417e [Yin Huai] Update doc.
83adb2f [Yin Huai] Merge remote-tracking branch 'upstream/master' into SPARK-2177
366f891 [Yin Huai] Add describe command.
74bd1d4 [Yin Huai] Merge remote-tracking branch 'upstream/master' into SPARK-2177
342fdf7 [Yin Huai] Split to up to 3 parts.
725e88c [Yin Huai] Merge remote-tracking branch 'upstream/master' into SPARK-2177
bb8bbef [Yin Huai] Split every string in the result of a describe command.
2014-06-19 23:41:38 -07:00
Reynold Xin 5464e79175 A few minor Spark SQL Scaladoc fixes.
Author: Reynold Xin <rxin@apache.org>

Closes #1139 from rxin/sparksqldoc and squashes the following commits:

c3049d8 [Reynold Xin] Fixed line length.
66dc72c [Reynold Xin] A few minor Spark SQL Scaladoc fixes.
2014-06-19 18:24:05 -07:00
Reynold Xin 640c294369 [SPARK-2187] Explain should not run the optimizer twice.
@yhuai @marmbrus @concretevitamin

Author: Reynold Xin <rxin@apache.org>

Closes #1123 from rxin/explain and squashes the following commits:

def83b0 [Reynold Xin] Update unit tests for explain.
a9d3ba8 [Reynold Xin] [SPARK-2187] Explain should not run the optimizer twice.
2014-06-18 22:44:12 -07:00
Michael Armbrust 5ff75c748a [SPARK-2184][SQL] AddExchange isn't idempotent
...redPartitioning.

Author: Michael Armbrust <michael@databricks.com>

Closes #1122 from marmbrus/fixAddExchange and squashes the following commits:

3417537 [Michael Armbrust] Don't bind partitioning expressions as that breaks comparison with requiredPartitioning.
2014-06-18 17:52:42 -07:00
Yin Huai 587d32012c [SPARK-2176][SQL] Extra unnecessary exchange operator in the result of an explain command
```
hql("explain select * from src group by key").collect().foreach(println)

[ExplainCommand [plan#27:0]]
[ Aggregate false, [key#25], [key#25,value#26]]
[  Exchange (HashPartitioning [key#25:0], 200)]
[   Exchange (HashPartitioning [key#25:0], 200)]
[    Aggregate true, [key#25], [key#25]]
[     HiveTableScan [key#25,value#26], (MetastoreRelation default, src, None), None]
```

There are two exchange operators.

However, if we do not use explain...
```
hql("select * from src group by key")

res4: org.apache.spark.sql.SchemaRDD =
SchemaRDD[8] at RDD at SchemaRDD.scala:100
== Query Plan ==
Aggregate false, [key#8], [key#8,value#9]
 Exchange (HashPartitioning [key#8:0], 200)
  Aggregate true, [key#8], [key#8]
   HiveTableScan [key#8,value#9], (MetastoreRelation default, src, None), None
```
The plan is fine.

The cause of this bug is explained below.

When we create an `execution.ExplainCommand`, we use the `executedPlan` as the child of this `ExplainCommand`. But, this `executedPlan` is prepared for execution again when we generate the `executedPlan` for the `ExplainCommand`. Basically, `prepareForExecution` is called twice on a physical plan. Because after `prepareForExecution` we have already bounded those references (in `BoundReference`s), `AddExchange` cannot figure out we are using the same partitioning (we use `AttributeReference`s to create an `ExchangeOperator` and then those references will be changed to `BoundReference`s after `prepareForExecution` is called). So, an extra `ExchangeOperator` is inserted.

I think in `CommandStrategy`, we should just use the `sparkPlan` (`sparkPlan` is the input of `prepareForExecution`) to initialize the `ExplainCommand` instead of using `executedPlan`.

The link to JIRA: https://issues.apache.org/jira/browse/SPARK-2176

Author: Yin Huai <huai@cse.ohio-state.edu>

Closes #1116 from yhuai/SPARK-2176 and squashes the following commits:

197c19c [Yin Huai] Use sparkPlan to initialize a Physical Explain Command instead of using executedPlan.
2014-06-18 10:51:32 -07:00
Yin Huai d2f4f30b12 [SPARK-2060][SQL] Querying JSON Datasets with SQL and DSL in Spark SQL
JIRA: https://issues.apache.org/jira/browse/SPARK-2060

Programming guide: http://yhuai.github.io/site/sql-programming-guide.html

Scala doc of SQLContext: http://yhuai.github.io/site/api/scala/index.html#org.apache.spark.sql.SQLContext

Author: Yin Huai <huai@cse.ohio-state.edu>

Closes #999 from yhuai/newJson and squashes the following commits:

227e89e [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
ce8eedd [Yin Huai] rxin's comments.
bc9ac51 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
94ffdaa [Yin Huai] Remove "get" from method names.
ce31c81 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
e2773a6 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
79ea9ba [Yin Huai] Fix typos.
5428451 [Yin Huai] Newline
1f908ce [Yin Huai] Remove extra line.
d7a005c [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
7ea750e [Yin Huai] marmbrus's comments.
6a5f5ef [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
83013fb [Yin Huai] Update Java Example.
e7a6c19 [Yin Huai] SchemaRDD.javaToPython should convert a field with the StructType to a Map.
6d20b85 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
4fbddf0 [Yin Huai] Programming guide.
9df8c5a [Yin Huai] Python API.
7027634 [Yin Huai] Java API.
cff84cc [Yin Huai] Use a SchemaRDD for a JSON dataset.
d0bd412 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
ab810b0 [Yin Huai] Make JsonRDD private.
6df0891 [Yin Huai] Apache header.
8347f2e [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
66f9e76 [Yin Huai] Update docs and use the entire dataset to infer the schema.
8ffed79 [Yin Huai] Update the example.
a5a4b52 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
4325475 [Yin Huai] If a sampled dataset is used for schema inferring, update the schema of the JsonTable after first execution.
65b87f0 [Yin Huai] Fix sampling...
8846af5 [Yin Huai] API doc.
52a2275 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
0387523 [Yin Huai] Address PR comments.
666b957 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
a2313a6 [Yin Huai] Address PR comments.
f3ce176 [Yin Huai] After type conflict resolution, if a NullType is found, StringType is used.
0576406 [Yin Huai] Add Apache license header.
af91b23 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
f45583b [Yin Huai] Infer the schema of a JSON dataset (a text file with one JSON object per line or a RDD[String] with one JSON object per string) and returns a SchemaRDD.
f31065f [Yin Huai] A query plan or a SchemaRDD can print out its schema.
2014-06-17 19:14:59 -07:00
Cheng Lian 237b96bc59 Minor fix: made "EXPLAIN" output to play well with JDBC output format
Fixed the broken JDBC output. Test from Shark `beeline`:

```
beeline> !connect jdbc:hive2://localhost:10000/
scan complete in 2ms
Connecting to jdbc:hive2://localhost:10000/
Enter username for jdbc:hive2://localhost:10000/: lian
Enter password for jdbc:hive2://localhost:10000/:
Connected to: Hive (version 0.12.0)
Driver: Hive (version 0.12.0)
Transaction isolation: TRANSACTION_REPEATABLE_READ
0: jdbc:hive2://localhost:10000/>
0: jdbc:hive2://localhost:10000/> explain select * from src;
+-------------------------------------------------------------------------------+
|                                     plan                                      |
+-------------------------------------------------------------------------------+
| ExplainCommand [plan#2:0]                                                     |
|  HiveTableScan [key#0,value#1], (MetastoreRelation default, src, None), None  |
+-------------------------------------------------------------------------------+
2 rows selected (1.386 seconds)
```

Before this change, the output looked something like this:

```
+-------------------------------------------------------------------------------+
|                                     plan                                      |
+-------------------------------------------------------------------------------+
| ExplainCommand [plan#2:0]
 HiveTableScan [key#0,value#1], (MetastoreRelation default, src, None), None  |
+-------------------------------------------------------------------------------+
```

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #1097 from liancheng/multiLineExplain and squashes the following commits:

eb37967 [Cheng Lian] Made output of "EXPLAIN" play well with JDBC output format
2014-06-16 16:42:17 -07:00
Cheng Lian 273afcb254 [SQL][SPARK-2094] Follow up of PR #1071 for Java API
Updated `JavaSQLContext` and `JavaHiveContext` similar to what we've done to `SQLContext` and `HiveContext` in PR #1071. Added corresponding test case for Spark SQL Java API.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #1085 from liancheng/spark-2094-java and squashes the following commits:

29b8a51 [Cheng Lian] Avoided instantiating JavaSparkContext & JavaHiveContext to workaround test failure
92bb4fb [Cheng Lian] Marked test cases in JavaHiveQLSuite with "ignore"
22aec97 [Cheng Lian] Follow up of PR #1071 for Java API
2014-06-16 21:32:51 +02:00
Kan Zhang 4fdb491775 [SPARK-2010] Support for nested data in PySpark SQL
JIRA issue https://issues.apache.org/jira/browse/SPARK-2010

This PR adds support for nested collection types in PySpark SQL, including
array, dict, list, set, and tuple. Example,

```
>>> from array import array
>>> from pyspark.sql import SQLContext
>>> sqlCtx = SQLContext(sc)
>>> rdd = sc.parallelize([
...         {"f1" : array('i', [1, 2]), "f2" : {"row1" : 1.0}},
...         {"f1" : array('i', [2, 3]), "f2" : {"row2" : 2.0}}])
>>> srdd = sqlCtx.inferSchema(rdd)
>>> srdd.collect() == [{"f1" : array('i', [1, 2]), "f2" : {"row1" : 1.0}},
...                    {"f1" : array('i', [2, 3]), "f2" : {"row2" : 2.0}}]
True
>>> rdd = sc.parallelize([
...         {"f1" : [[1, 2], [2, 3]], "f2" : set([1, 2]), "f3" : (1, 2)},
...         {"f1" : [[2, 3], [3, 4]], "f2" : set([2, 3]), "f3" : (2, 3)}])
>>> srdd = sqlCtx.inferSchema(rdd)
>>> srdd.collect() == \
... [{"f1" : [[1, 2], [2, 3]], "f2" : set([1, 2]), "f3" : (1, 2)},
...  {"f1" : [[2, 3], [3, 4]], "f2" : set([2, 3]), "f3" : (2, 3)}]
True
```

Author: Kan Zhang <kzhang@apache.org>

Closes #1041 from kanzhang/SPARK-2010 and squashes the following commits:

1b2891d [Kan Zhang] [SPARK-2010] minor doc change and adding a TODO
504f27e [Kan Zhang] [SPARK-2010] Support for nested data in PySpark SQL
2014-06-16 11:11:29 -07:00
Kan Zhang 2550533a28 [SPARK-2079] Support batching when serializing SchemaRDD to Python
Added batching with default batch size 10 in SchemaRDD.javaToPython

Author: Kan Zhang <kzhang@apache.org>

Closes #1023 from kanzhang/SPARK-2079 and squashes the following commits:

2d1915e [Kan Zhang] [SPARK-2079] Add batching in SchemaRDD.javaToPython
19b0c09 [Kan Zhang] [SPARK-2079] Removing unnecessary wrapping in SchemaRDD.javaToPython
2014-06-14 13:17:22 -07:00
Cheng Lian ac96d9657c [SPARK-2094][SQL] "Exactly once" semantics for DDL and command statements
## Related JIRA issues

- Main issue:

  - [SPARK-2094](https://issues.apache.org/jira/browse/SPARK-2094): Ensure exactly once semantics for DDL/Commands

- Issues resolved as dependencies:

  - [SPARK-2081](https://issues.apache.org/jira/browse/SPARK-2081): Undefine output() from the abstract class Command and implement it in concrete subclasses
  - [SPARK-2128](https://issues.apache.org/jira/browse/SPARK-2128): No plan for DESCRIBE
  - [SPARK-1852](https://issues.apache.org/jira/browse/SPARK-1852): SparkSQL Queries with Sorts run before the user asks them to

- Other related issue:

  - [SPARK-2129](https://issues.apache.org/jira/browse/SPARK-2129): NPE thrown while lookup a view

    Two test cases, `join_view` and `mergejoin_mixed`, within the `HiveCompatibilitySuite` are removed from the whitelist to workaround this issue.

## PR Overview

This PR defines physical plans for DDL statements and commands and wraps their side effects in a lazy field `PhysicalCommand.sideEffectResult`, so that they are executed eagerly and exactly once.  Also, as a positive side effect, now DDL statements and commands can be turned into proper `SchemaRDD`s and let user query the execution results.

This PR defines schemas for the following DDL/commands:

- EXPLAIN command

  - `plan`: String, the plan explanation

- SET command

  - `key`: String, the key(s) of the propert(y/ies) being set or queried
  - `value`: String, the value(s) of the propert(y/ies) being queried

- Other Hive native command

  - `result`: String, execution result returned by Hive

  **NOTE**: We should refine schemas for different native commands by defining physical plans for them in the future.

## Examples

### EXPLAIN command

Take the "EXPLAIN" command as an example, we first execute the command and obtain a `SchemaRDD` at the same time, then query the `plan` field with the schema DSL:

```
scala> loadTestTable("src")
...

scala> val q0 = hql("EXPLAIN SELECT key, COUNT(*) FROM src GROUP BY key")
...
q0: org.apache.spark.sql.SchemaRDD =
SchemaRDD[0] at RDD at SchemaRDD.scala:98
== Query Plan ==
ExplainCommandPhysical [plan#11:0]
 Aggregate false, [key#4], [key#4,SUM(PartialCount#6L) AS c_1#2L]
  Exchange (HashPartitioning [key#4:0], 200)
   Exchange (HashPartitioning [key#4:0], 200)
    Aggregate true, [key#4], [key#4,COUNT(1) AS PartialCount#6L]
     HiveTableScan [key#4], (MetastoreRelation default, src, None), None

scala> q0.select('plan).collect()
...
[ExplainCommandPhysical [plan#24:0]
 Aggregate false, [key#17], [key#17,SUM(PartialCount#19L) AS c_1#2L]
  Exchange (HashPartitioning [key#17:0], 200)
   Exchange (HashPartitioning [key#17:0], 200)
    Aggregate true, [key#17], [key#17,COUNT(1) AS PartialCount#19L]
     HiveTableScan [key#17], (MetastoreRelation default, src, None), None]

scala>
```

### SET command

In this example we query all the properties set in `SQLConf`, register the result as a table, and then query the table with HiveQL:

```
scala> val q1 = hql("SET")
...
q1: org.apache.spark.sql.SchemaRDD =
SchemaRDD[7] at RDD at SchemaRDD.scala:98
== Query Plan ==
<SET command: executed by Hive, and noted by SQLContext>

scala> q1.registerAsTable("properties")

scala> hql("SELECT key, value FROM properties ORDER BY key LIMIT 10").foreach(println)
...
== Query Plan ==
TakeOrdered 10, [key#51:0 ASC]
 Project [key#51:0,value#52:1]
  SetCommandPhysical None, None, [key#55:0,value#56:1]), which has no missing parents
14/06/12 12:19:27 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 5 (SchemaRDD[21] at RDD at SchemaRDD.scala:98
== Query Plan ==
TakeOrdered 10, [key#51:0 ASC]
 Project [key#51:0,value#52:1]
  SetCommandPhysical None, None, [key#55:0,value#56:1])
...
[datanucleus.autoCreateSchema,true]
[datanucleus.autoStartMechanismMode,checked]
[datanucleus.cache.level2,false]
[datanucleus.cache.level2.type,none]
[datanucleus.connectionPoolingType,BONECP]
[datanucleus.fixedDatastore,false]
[datanucleus.identifierFactory,datanucleus1]
[datanucleus.plugin.pluginRegistryBundleCheck,LOG]
[datanucleus.rdbms.useLegacyNativeValueStrategy,true]
[datanucleus.storeManagerType,rdbms]

scala>
```

### "Exactly once" semantics

At last, an example of the "exactly once" semantics:

```
scala> val q2 = hql("CREATE TABLE t1(key INT, value STRING)")
...
q2: org.apache.spark.sql.SchemaRDD =
SchemaRDD[28] at RDD at SchemaRDD.scala:98
== Query Plan ==
<Native command: executed by Hive>

scala> table("t1")
...
res9: org.apache.spark.sql.SchemaRDD =
SchemaRDD[32] at RDD at SchemaRDD.scala:98
== Query Plan ==
HiveTableScan [key#58,value#59], (MetastoreRelation default, t1, None), None

scala> q2.collect()
...
res10: Array[org.apache.spark.sql.Row] = Array([])

scala>
```

As we can see, the "CREATE TABLE" command is executed eagerly right after the `SchemaRDD` is created, and referencing the `SchemaRDD` again won't trigger a duplicated execution.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #1071 from liancheng/exactlyOnceCommand and squashes the following commits:

d005b03 [Cheng Lian] Made "SET key=value" returns the newly set key value pair
f6c7715 [Cheng Lian] Added test cases for DDL/command statement RDDs
1d00937 [Cheng Lian] Makes SchemaRDD DSLs work for DDL/command statement RDDs
5c7e680 [Cheng Lian] Bug fix: wrong type used in pattern matching
48aa2e5 [Cheng Lian] Refined SQLContext.emptyResult as an empty RDD[Row]
cc64f32 [Cheng Lian] Renamed physical plan classes for DDL/commands
74789c1 [Cheng Lian] Fixed failing test cases
0ad343a [Cheng Lian] Added physical plan for DDL and commands to ensure the "exactly once" semantics
2014-06-13 12:59:48 -07:00
Michael Armbrust 13f8cfdc04 [SPARK-2135][SQL] Use planner for in-memory scans
Author: Michael Armbrust <michael@databricks.com>

Closes #1072 from marmbrus/cachedStars and squashes the following commits:

8757c8e [Michael Armbrust] Use planner for in-memory scans.
2014-06-12 23:09:41 -07:00
Patrick Wendell d45e0c6b98 HOTFIX: Forgot to remove false change in previous commit 2014-06-11 15:55:41 -07:00
Patrick Wendell 14e6dc94f6 HOTFIX: PySpark tests should be order insensitive.
This has been messing up the SQL PySpark tests on Jenkins.

Author: Patrick Wendell <pwendell@gmail.com>

Closes #1054 from pwendell/pyspark and squashes the following commits:

1eb5487 [Patrick Wendell] False change
06f062d [Patrick Wendell] HOTFIX: PySpark tests should be order insensitive
2014-06-11 15:54:41 -07:00
Daoyuan ce6deb1e5b [SQL] Code Cleanup: Left Semi Hash Join
Some improvement for PR #837, add another case to white list and use `filter` to build result iterator.

Author: Daoyuan <daoyuan.wang@intel.com>

Closes #1049 from adrian-wang/clean-LeftSemiJoinHash and squashes the following commits:

b314d5a [Daoyuan] change hashSet name
27579a9 [Daoyuan] add semijoin to white list and use filter to create new iterator in LeftSemiJoinBNL

Signed-off-by: Michael Armbrust <michael@databricks.com>
2014-06-11 12:09:42 -07:00
Sameer Agarwal 4107cce58c [SPARK-2042] Prevent unnecessary shuffle triggered by take()
This PR implements `take()` on a `SchemaRDD` by inserting a logical limit that is followed by a `collect()`. This is also accompanied by adding a catalyst optimizer rule for collapsing adjacent limits. Doing so prevents an unnecessary shuffle that is sometimes triggered by `take()`.

Author: Sameer Agarwal <sameer@databricks.com>

Closes #1048 from sameeragarwal/master and squashes the following commits:

3eeb848 [Sameer Agarwal] Fixing Tests
1b76ff1 [Sameer Agarwal] Deprecating limit(limitExpr: Expression) in v1.1.0
b723ac4 [Sameer Agarwal] Added limit folding tests
a0ff7c4 [Sameer Agarwal] Adding catalyst rule to fold two consecutive limits
8d42d03 [Sameer Agarwal] Implement trigger() as limit() followed by collect()
2014-06-11 12:01:04 -07:00
Cheng Lian 0266a0c8a7 [SPARK-1968][SQL] SQL/HiveQL command for caching/uncaching tables
JIRA issue: [SPARK-1968](https://issues.apache.org/jira/browse/SPARK-1968)

This PR added support for SQL/HiveQL command for caching/uncaching tables:

```
scala> sql("CACHE TABLE src")
...
res0: org.apache.spark.sql.SchemaRDD =
SchemaRDD[0] at RDD at SchemaRDD.scala:98
== Query Plan ==
CacheCommandPhysical src, true

scala> table("src")
...
res1: org.apache.spark.sql.SchemaRDD =
SchemaRDD[3] at RDD at SchemaRDD.scala:98
== Query Plan ==
InMemoryColumnarTableScan [key#0,value#1], (HiveTableScan [key#0,value#1], (MetastoreRelation default, src, None), None), false

scala> isCached("src")
res2: Boolean = true

scala> sql("CACHE TABLE src")
...
res3: org.apache.spark.sql.SchemaRDD =
SchemaRDD[4] at RDD at SchemaRDD.scala:98
== Query Plan ==
CacheCommandPhysical src, false

scala> table("src")
...
res4: org.apache.spark.sql.SchemaRDD =
SchemaRDD[11] at RDD at SchemaRDD.scala:98
== Query Plan ==
HiveTableScan [key#2,value#3], (MetastoreRelation default, src, None), None

scala> isCached("src")
res5: Boolean = false
```

Things also work for `hql`.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #1038 from liancheng/sqlCacheTable and squashes the following commits:

ecb7194 [Cheng Lian] Trimmed the SQL string before parsing special commands
6f4ce42 [Cheng Lian] Moved logical command classes to a separate file
3458a24 [Cheng Lian] Added comment for public API
f0ffacc [Cheng Lian] Added isCached() predicate
15ec6d2 [Cheng Lian] Added "(UN)CACHE TABLE" SQL/HiveQL statements
2014-06-11 00:06:50 -07:00
Zongheng Yang 601032f5bf HOTFIX: clear() configs in SQLConf-related unit tests.
Thanks goes to @liancheng, who pointed out that `sql/test-only *.SQLConfSuite *.SQLQuerySuite` passed but `sql/test-only *.SQLQuerySuite *.SQLConfSuite` failed. The reason is that some tests use the same test keys and without clear()'ing, they get carried over to other tests. This hotfix simply adds some `clear()` calls.

This problem was not evident on Jenkins before probably because `parallelExecution` is not set to `false` for `sqlCoreSettings`.

Author: Zongheng Yang <zongheng.y@gmail.com>

Closes #1040 from concretevitamin/sqlconf-tests and squashes the following commits:

6d14ceb [Zongheng Yang] HOTFIX: clear() confs in SQLConf related unit tests.
2014-06-10 21:59:01 -07:00
egraldlo 1abbde0e89 [SQL] Add average overflow test case from #978
By @egraldlo.

Author: egraldlo <egraldlo@gmail.com>
Author: Michael Armbrust <michael@databricks.com>

Closes #1033 from marmbrus/pr/978 and squashes the following commits:

e228c5e [Michael Armbrust] Remove "test".
762aeaf [Michael Armbrust] Remove unneeded rule. More descriptive name for test table.
d414cd7 [egraldlo] fommatting issues
1153f75 [egraldlo] do best to avoid overflowing in function avg().
2014-06-10 14:07:55 -07:00
Zongheng Yang 08ed9ad813 [SPARK-1508][SQL] Add SQLConf to SQLContext.
This PR (1) introduces a new class SQLConf that stores key-value properties for a SQLContext (2) clean up the semantics of various forms of SET commands.

The SQLConf class unlocks user-controllable optimization opportunities; for example, user can now override the number of partitions used during an Exchange. A SQLConf can be accessed and modified programmatically through its getters and setters. It can also be modified through SET commands executed by `sql()` or `hql()`. Note that users now have the ability to change a particular property for different queries inside the same Spark job, unlike settings configured in SparkConf.

For SET commands: "SET" will return all properties currently set in a SQLConf, "SET key" will return the key-value pair (if set) or an undefined message, and "SET key=value" will call the setter on SQLConf, and if a HiveContext is used, it will be executed in Hive as well.

Author: Zongheng Yang <zongheng.y@gmail.com>

Closes #956 from concretevitamin/sqlconf and squashes the following commits:

4968c11 [Zongheng Yang] Very minor cleanup.
d74dde5 [Zongheng Yang] Remove the redundant mkQueryExecution() method.
c129b86 [Zongheng Yang] Merge remote-tracking branch 'upstream/master' into sqlconf
26c40eb [Zongheng Yang] Make SQLConf a trait and have SQLContext mix it in.
dd19666 [Zongheng Yang] Update a comment.
baa5d29 [Zongheng Yang] Remove default param for shuffle partitions accessor.
5f7e6d8 [Zongheng Yang] Add default num partitions.
22d9ed7 [Zongheng Yang] Fix output() of Set physical. Add SQLConf param accessor method.
e9856c4 [Zongheng Yang] Use java.util.Collections.synchronizedMap on a Java HashMap.
88dd0c8 [Zongheng Yang] Remove redundant SET Keyword.
271f0b1 [Zongheng Yang] Minor change.
f8983d1 [Zongheng Yang] Minor changes per review comments.
1ce8a5e [Zongheng Yang] Invoke runSqlHive() in SQLConf#get for the HiveContext case.
b766af9 [Zongheng Yang] Remove a test.
d52e1bd [Zongheng Yang] De-hardcode number of shuffle partitions for BasicOperators (read from SQLConf).
555599c [Zongheng Yang] Bullet-proof (relatively) parsing SET per review comment.
c2067e8 [Zongheng Yang] Mark SQLContext transient and put it in a second param list.
2ea8cdc [Zongheng Yang] Wrap long line.
41d7f09 [Zongheng Yang] Fix imports.
13279e6 [Zongheng Yang] Refactor the logic of eagerly processing SET commands.
b14b83e [Zongheng Yang] In a HiveContext, make SQLConf a subset of HiveConf.
6983180 [Zongheng Yang] Move a SET test to SQLQuerySuite and make it complete.
5b67985 [Zongheng Yang] New line at EOF.
c651797 [Zongheng Yang] Add commands.scala.
efd82db [Zongheng Yang] Clean up semantics of several cases of SET.
c1017c2 [Zongheng Yang] WIP in changing SetCommand to take two Options (for different semantics of SETs).
0f00d86 [Zongheng Yang] Add a test for singleton set command in SQL.
41acd75 [Zongheng Yang] Add a test for hql() in HiveQuerySuite.
2276929 [Zongheng Yang] Fix default hive result for set commands in HiveComparisonTest.
3b0c71b [Zongheng Yang] Remove Parser for set commands. A few other fixes.
d0c4578 [Zongheng Yang] Tmux typo.
0ecea46 [Zongheng Yang] Changes for HiveQl and HiveContext.
ce22d80 [Zongheng Yang] Fix parsing issues.
cb722c1 [Zongheng Yang] Finish up SQLConf patch.
4ebf362 [Zongheng Yang] First cut at SQLConf inside SQLContext.
2014-06-10 00:49:09 -07:00
Zongheng Yang a9ec033c8c [SPARK-1704][SQL] Fully support EXPLAIN commands as SchemaRDD.
This PR attempts to resolve [SPARK-1704](https://issues.apache.org/jira/browse/SPARK-1704) by introducing a physical plan for EXPLAIN commands, which just prints out the debug string (containing various SparkSQL's plans) of the corresponding QueryExecution for the actual query.

Author: Zongheng Yang <zongheng.y@gmail.com>

Closes #1003 from concretevitamin/explain-cmd and squashes the following commits:

5b7911f [Zongheng Yang] Add a regression test.
1bfa379 [Zongheng Yang] Modify output().
719ada9 [Zongheng Yang] Override otherCopyArgs for ExplainCommandPhysical.
4318fd7 [Zongheng Yang] Make all output one Row.
439c6ab [Zongheng Yang] Minor cleanups.
408f574 [Zongheng Yang] SPARK-1704: Add CommandStrategy and ExplainCommandPhysical.
2014-06-09 16:47:44 -07:00
Michael Armbrust c6e041d171 [SQL] Simple framework for debugging query execution
Only records number of tuples and unique dataTypes output right now...

Example:
```scala
scala> import org.apache.spark.sql.execution.debug._
scala> hql("SELECT value FROM src WHERE key > 10").debug(sparkContext)

Results returned: 489
== Project [value#1:0] ==
Tuples output: 489
 value StringType: {java.lang.String}
== Filter (key#0:1 > 10) ==
Tuples output: 489
 value StringType: {java.lang.String}
 key IntegerType: {java.lang.Integer}
== HiveTableScan [value#1,key#0], (MetastoreRelation default, src, None), None ==
Tuples output: 500
 value StringType: {java.lang.String}
 key IntegerType: {java.lang.Integer}
```

Author: Michael Armbrust <michael@databricks.com>

Closes #1005 from marmbrus/debug and squashes the following commits:

dcc3ca6 [Michael Armbrust] Add comments.
c9dded2 [Michael Armbrust] Simple framework for debugging query execution
2014-06-09 14:24:19 -07:00
Daoyuan 0cf6002801 [SPARK-1495][SQL]add support for left semi join
Just submit another solution for #395

Author: Daoyuan <daoyuan.wang@intel.com>
Author: Michael Armbrust <michael@databricks.com>
Author: Daoyuan Wang <daoyuan.wang@intel.com>

Closes #837 from adrian-wang/left-semi-join-support and squashes the following commits:

d39cd12 [Daoyuan Wang] Merge pull request #1 from marmbrus/pr/837
6713c09 [Michael Armbrust] Better debugging for failed query tests.
035b73e [Michael Armbrust] Add test for left semi that can't be done with a hash join.
5ec6fa4 [Michael Armbrust] Add left semi to SQL Parser.
4c726e5 [Daoyuan] improvement according to Michael
8d4a121 [Daoyuan] add golden files for leftsemijoin
83a3c8a [Daoyuan] scala style fix
14cff80 [Daoyuan] add support for left semi join
2014-06-09 11:31:36 -07:00
Michael Armbrust a6c72ab16e [SPARK-1994][SQL] Weird data corruption bug when running Spark SQL on data in HDFS
Basically there is a race condition (possibly a scala bug?) when these values are recomputed on all of the slaves that results in an incorrect projection being generated (possibly because the GUID uniqueness contract is broken?).

In general we should probably enforce that all expression planing occurs on the driver, as is now occurring here.

Author: Michael Armbrust <michael@databricks.com>

Closes #1004 from marmbrus/fixAggBug and squashes the following commits:

e0c116c [Michael Armbrust] Compute aggregate expression during planning instead of lazily on workers.
2014-06-07 14:20:33 -07:00
witgo 41c4a33105 [SPARK-1841]: update scalatest to version 2.1.5
Author: witgo <witgo@qq.com>

Closes #713 from witgo/scalatest and squashes the following commits:

b627a6a [witgo] merge master
51fb3d6 [witgo] merge master
3771474 [witgo] fix RDDSuite
996d6f9 [witgo] fix TimeStampedWeakValueHashMap test
9dfa4e7 [witgo] merge bug
1479b22 [witgo] merge master
29b9194 [witgo] fix code style
022a7a2 [witgo] fix test dependency
a52c0fa [witgo] fix test dependency
cd8f59d [witgo] Merge branch 'master' of https://github.com/apache/spark into scalatest
046540d [witgo] fix RDDSuite.scala
2c543b9 [witgo] fix ReplSuite.scala
c458928 [witgo] update scalatest to version 2.1.5
2014-06-06 11:45:21 -07:00
Michael Armbrust c7a183b2c2 [SPARK-2041][SQL] Correctly analyze queries where columnName == tableName.
Author: Michael Armbrust <michael@databricks.com>

Closes #985 from marmbrus/tableName and squashes the following commits:

3caaa27 [Michael Armbrust] Correctly analyze queries where columnName == tableName.
2014-06-05 17:42:08 -07:00
Takuya UESHIN e4c11eef2f [SPARK-2036] [SQL] CaseConversionExpression should check if the evaluated value is null.
`CaseConversionExpression` should check if the evaluated value is `null`.

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #982 from ueshin/issues/SPARK-2036 and squashes the following commits:

61e1c54 [Takuya UESHIN] Add check if the evaluated value is null.
2014-06-05 12:00:31 -07:00
egraldlo ec8be274a7 [SPARK-1995][SQL] system function upper and lower can be supported
I don't know whether it's time to implement system function about string operation in spark sql now.

Author: egraldlo <egraldlo@gmail.com>

Closes #936 from egraldlo/stringoperator and squashes the following commits:

3c6c60a [egraldlo] Add UPPER, LOWER, MAX and MIN into hive parser
ea76d0a [egraldlo] modify the formatting issues
b49f25e [egraldlo] modify the formatting issues
1f0bbb5 [egraldlo] system function upper and lower supported
13d3267 [egraldlo] system function upper and lower supported
2014-06-02 18:02:57 -07:00
Cheng Lian d000ca98a8 [SPARK-1958] Calling .collect() on a SchemaRDD should call executeCollect() on the underlying query plan.
In cases like `Limit` and `TakeOrdered`, `executeCollect()` makes optimizations that `execute().collect()` will not.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #939 from liancheng/spark-1958 and squashes the following commits:

bdc4a14 [Cheng Lian] Copy rows to present immutable data to users
8250976 [Cheng Lian] Added return type explicitly for public API
192a25c [Cheng Lian] [SPARK-1958] Calling .collect() on a SchemaRDD should call executeCollect() on the underlying query plan.
2014-06-02 12:09:43 -07:00
Cheng Lian 8f7141fbc0 [SPARK-1368][SQL] Optimized HiveTableScan
JIRA issue: [SPARK-1368](https://issues.apache.org/jira/browse/SPARK-1368)

This PR introduces two major updates:

- Replaced FP style code with `while` loop and reusable `GenericMutableRow` object in critical path of `HiveTableScan`.
- Using `ColumnProjectionUtils` to help optimizing RCFile and ORC column pruning.

My quick micro benchmark suggests these two optimizations made the optimized version 2x and 2.5x faster when scanning CSV table and RCFile table respectively:

```
Original:

[info] CSV: 27676 ms, RCFile: 26415 ms
[info] CSV: 27703 ms, RCFile: 26029 ms
[info] CSV: 27511 ms, RCFile: 25962 ms

Optimized:

[info] CSV: 13820 ms, RCFile: 10402 ms
[info] CSV: 14158 ms, RCFile: 10691 ms
[info] CSV: 13606 ms, RCFile: 10346 ms
```

The micro benchmark loads a 609MB CVS file (structurally similar to the `src` test table) into a normal Hive table with `LazySimpleSerDe` and a RCFile table, then scans these tables respectively.

Preparation code:

```scala
package org.apache.spark.examples.sql.hive

import org.apache.spark.sql.hive.LocalHiveContext
import org.apache.spark.{SparkConf, SparkContext}

object HiveTableScanPrepare extends App {
  val sparkContext = new SparkContext(
    new SparkConf()
      .setMaster("local")
      .setAppName(getClass.getSimpleName.stripSuffix("$")))

  val hiveContext = new LocalHiveContext(sparkContext)

  import hiveContext._

  hql("drop table scan_csv")
  hql("drop table scan_rcfile")

  hql("""create table scan_csv (key int, value string)
        |  row format serde 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
        |  with serdeproperties ('field.delim'=',')
      """.stripMargin)

  hql(s"""load data local inpath "${args(0)}" into table scan_csv""")

  hql("""create table scan_rcfile (key int, value string)
        |  row format serde 'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe'
        |stored as
        |  inputformat 'org.apache.hadoop.hive.ql.io.RCFileInputFormat'
        |  outputformat 'org.apache.hadoop.hive.ql.io.RCFileOutputFormat'
      """.stripMargin)

  hql(
    """
      |from scan_csv
      |insert overwrite table scan_rcfile
      |select scan_csv.key, scan_csv.value
    """.stripMargin)
}
```

Benchmark code:

```scala
package org.apache.spark.examples.sql.hive

import org.apache.spark.sql.hive.LocalHiveContext
import org.apache.spark.{SparkConf, SparkContext}

object HiveTableScanBenchmark extends App {
  val sparkContext = new SparkContext(
    new SparkConf()
      .setMaster("local")
      .setAppName(getClass.getSimpleName.stripSuffix("$")))

  val hiveContext = new LocalHiveContext(sparkContext)

  import hiveContext._

  val scanCsv = hql("select key from scan_csv")
  val scanRcfile = hql("select key from scan_rcfile")

  val csvDuration = benchmark(scanCsv.count())
  val rcfileDuration = benchmark(scanRcfile.count())

  println(s"CSV: $csvDuration ms, RCFile: $rcfileDuration ms")

  def benchmark(f: => Unit) = {
    val begin = System.currentTimeMillis()
    f
    val end = System.currentTimeMillis()
    end - begin
  }
}
```

@marmbrus Please help review, thanks!

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #758 from liancheng/fastHiveTableScan and squashes the following commits:

4241a19 [Cheng Lian] Distinguishes sorted and possibly not sorted operations more accurately in HiveComparisonTest
cf640d8 [Cheng Lian] More HiveTableScan optimisations:
bf0e7dc [Cheng Lian] Added SortedOperation pattern to match *some* definitely sorted operations and avoid some sorting cost in HiveComparisonTest.
6d1c642 [Cheng Lian] Using ColumnProjectionUtils to optimise RCFile and ORC column pruning
eb62fd3 [Cheng Lian] [SPARK-1368] Optimized HiveTableScan
2014-05-29 15:24:03 -07:00
Takuya UESHIN 3b0babad1f [SPARK-1915] [SQL] AverageFunction should not count if the evaluated value is null.
Average values are difference between the calculation is done partially or not partially.
Because `AverageFunction` (in not-partially calculation) counts even if the evaluated value is null.

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #862 from ueshin/issues/SPARK-1915 and squashes the following commits:

b1ff3c0 [Takuya UESHIN] Modify AverageFunction not to count if the evaluated value is null.
2014-05-27 14:55:23 -07:00
Takuya UESHIN d6395d86f9 [SPARK-1914] [SQL] Simplify CountFunction not to traverse to evaluate all child expressions.
`CountFunction` should count up only if the child's evaluated value is not null.

Because it traverses to evaluate all child expressions, even if the child is null, it counts up if one of the all children is not null.

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #861 from ueshin/issues/SPARK-1914 and squashes the following commits:

3b37315 [Takuya UESHIN] Merge branch 'master' into issues/SPARK-1914
2afa238 [Takuya UESHIN] Simplify CountFunction not to traverse to evaluate all child expressions.
2014-05-26 00:17:20 -07:00
Aaron Davidson c3576ffcd7 [SQL] Minor: Introduce SchemaRDD#aggregate() for simple aggregations
```scala
rdd.aggregate(Sum('val))
```
is just shorthand for

```scala
rdd.groupBy()(Sum('val))
```

but seems be more natural than doing a groupBy with no grouping expressions when you really just want an aggregation over all rows.

Did not add a JavaSchemaRDD or Python API, as these seem to be lacking several other methods like groupBy() already -- leaving that cleanup for future patches.

Author: Aaron Davidson <aaron@databricks.com>

Closes #874 from aarondav/schemardd and squashes the following commits:

e9e68ee [Aaron Davidson] Add comment
db6afe2 [Aaron Davidson] Introduce SchemaRDD#aggregate() for simple aggregations
2014-05-25 18:37:44 -07:00
Reynold Xin d66642e397 SPARK-1822: Some minor cleanup work on SchemaRDD.count()
Minor cleanup following #841.

Author: Reynold Xin <rxin@apache.org>

Closes #868 from rxin/schema-count and squashes the following commits:

5442651 [Reynold Xin] SPARK-1822: Some minor cleanup work on SchemaRDD.count()
2014-05-25 01:44:49 -07:00
Kan Zhang 6052db9dc1 [SPARK-1822] SchemaRDD.count() should use query optimizer
Author: Kan Zhang <kzhang@apache.org>

Closes #841 from kanzhang/SPARK-1822 and squashes the following commits:

2f8072a [Kan Zhang] [SPARK-1822] Minor style update
cf4baa4 [Kan Zhang] [SPARK-1822] Adding Scaladoc
e67c910 [Kan Zhang] [SPARK-1822] SchemaRDD.count() should use optimizer
2014-05-25 00:06:42 -07:00
Cheng Lian 5afe6af0b1 [SPARK-1913][SQL] Bug fix: column pruning error in Parquet support
JIRA issue: [SPARK-1913](https://issues.apache.org/jira/browse/SPARK-1913)

When scanning Parquet tables, attributes referenced only in predicates that are pushed down are not passed to the `ParquetTableScan` operator and causes exception.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #863 from liancheng/spark-1913 and squashes the following commits:

f976b73 [Cheng Lian] Addessed the readability issue commented by @rxin
f5b257d [Cheng Lian] Added back comments deleted by mistake
ae60ab3 [Cheng Lian] [SPARK-1913] Attributes referenced only in predicates pushed down should remain in ParquetTableScan operator
2014-05-24 20:42:01 -07:00
Takuya UESHIN bb88875ad5 [SPARK-1889] [SQL] Apply splitConjunctivePredicates to join condition while finding join ke...
...ys.

When tables are equi-joined by multiple-keys `HashJoin` should be used, but `CartesianProduct` and then `Filter` are used.
The join keys are paired by `And` expression so we need to apply `splitConjunctivePredicates` to join condition while finding join keys.

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #836 from ueshin/issues/SPARK-1889 and squashes the following commits:

fe1c387 [Takuya UESHIN] Apply splitConjunctivePredicates to join condition while finding join keys.
2014-05-21 15:37:47 -07:00