Commit graph

371 commits

Author SHA1 Message Date
Davies Liu 6cf507685e [SPARK-4548] []SPARK-4517] improve performance of python broadcast
Re-implement the Python broadcast using file:

1) serialize the python object using cPickle, write into disks.
2) Create a wrapper in JVM (for the dumped file), it read data from during serialization
3) Using TorrentBroadcast or HttpBroadcast to transfer the data (compressed) into executors
4) During deserialization, writing the data into disk.
5) Passing the path into Python worker, read data from disk and unpickle it into python object, until the first access.

It fixes the performance regression introduced in #2659, has similar performance as 1.1, but support object larger than 2G, also improve the memory efficiency (only one compressed copy in driver and executor).

Testing with a 500M broadcast and 4 tasks (excluding the benefit from reused worker in 1.2):

         name |   1.1   | 1.2 with this patch |  improvement
---------|--------|---------|--------
      python-broadcast-w-bytes  |	25.20  |	9.33   |	170.13% |
        python-broadcast-w-set	  |     4.13	   |    4.50  |	-8.35%  |

Testing with 100 tasks (16 CPUs):

         name |   1.1   | 1.2 with this patch |  improvement
---------|--------|---------|--------
     python-broadcast-w-bytes	| 38.16	| 8.40	 | 353.98%
        python-broadcast-w-set	| 23.29	| 9.59 |	142.80%

Author: Davies Liu <davies@databricks.com>

Closes #3417 from davies/pybroadcast and squashes the following commits:

50a58e0 [Davies Liu] address comments
b98de1d [Davies Liu] disable gc while unpickle
e5ee6b9 [Davies Liu] support large string
09303b8 [Davies Liu] read all data into memory
dde02dd [Davies Liu] improve performance of python broadcast
2014-11-24 17:17:03 -08:00
Cheng Lian a6d7b61f92 [SPARK-4479][SQL] Avoids unnecessary defensive copies when sort based shuffle is on
This PR is a workaround for SPARK-4479. Two changes are introduced: when merge sort is bypassed in `ExternalSorter`,

1. also bypass RDD elements buffering as buffering is the reason that `MutableRow` backed row objects must be copied, and
2. avoids defensive copies in `Exchange` operator

<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/3422)
<!-- Reviewable:end -->

Author: Cheng Lian <lian@databricks.com>

Closes #3422 from liancheng/avoids-defensive-copies and squashes the following commits:

591f2e9 [Cheng Lian] Passes all shuffle suites
0c3c91e [Cheng Lian] Fixes shuffle write metrics when merge sort is bypassed
ed5df3c [Cheng Lian] Fixes styling changes
f75089b [Cheng Lian] Avoids unnecessary defensive copies when sort based shuffle is on
2014-11-24 12:43:45 -08:00
Michael Armbrust 02ec058efe [SPARK-4413][SQL] Parquet support through datasource API
Goals:
 - Support for accessing parquet using SQL but not requiring Hive (thus allowing support of parquet tables with decimal columns)
 - Support for folder based partitioning with automatic discovery of available partitions
 - Caching of file metadata

See scaladoc of `ParquetRelation2` for more details.

Author: Michael Armbrust <michael@databricks.com>

Closes #3269 from marmbrus/newParquet and squashes the following commits:

1dd75f1 [Michael Armbrust] Pass all paths for FileInputFormat at once.
645768b [Michael Armbrust] Review comments.
abd8e2f [Michael Armbrust] Alternative implementation of parquet based on the datasources API.
938019e [Michael Armbrust] Add an experimental interface to data sources that exposes catalyst expressions.
e9d2641 [Michael Armbrust] logging / formatting improvements.
2014-11-20 18:31:02 -08:00
Jacky Li ad5f1f3ca2 [SQL] fix function description mistake
Sample code in the description of SchemaRDD.where is not correct

Author: Jacky Li <jacky.likun@gmail.com>

Closes #3344 from jackylk/patch-6 and squashes the following commits:

62cd126 [Jacky Li] [SQL] fix function description mistake
2014-11-20 15:48:36 -08:00
Takuya UESHIN 2c2e7a44db [SPARK-4318][SQL] Fix empty sum distinct.
Executing sum distinct for empty table throws `java.lang.UnsupportedOperationException: empty.reduceLeft`.

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #3184 from ueshin/issues/SPARK-4318 and squashes the following commits:

8168c42 [Takuya UESHIN] Merge branch 'master' into issues/SPARK-4318
66fdb0a [Takuya UESHIN] Re-refine aggregate functions.
6186eb4 [Takuya UESHIN] Fix Sum of GeneratedAggregate.
d2975f6 [Takuya UESHIN] Refine Sum and Average of GeneratedAggregate.
1bba675 [Takuya UESHIN] Refine Sum, SumDistinct and Average functions.
917e533 [Takuya UESHIN] Use aggregate instead of groupBy().
1a5f874 [Takuya UESHIN] Add tests to be executed as non-partial aggregation.
a5a57d2 [Takuya UESHIN] Fix empty Average.
22799dc [Takuya UESHIN] Fix empty Sum and SumDistinct.
65b7dd2 [Takuya UESHIN] Fix empty sum distinct.
2014-11-20 15:41:24 -08:00
Dan McClary b8e6886fb8 [SPARK-4228][SQL] SchemaRDD to JSON
Here's a simple fix for SchemaRDD to JSON.

Author: Dan McClary <dan.mcclary@gmail.com>

Closes #3213 from dwmclary/SPARK-4228 and squashes the following commits:

d714e1d [Dan McClary] fixed PEP 8 error
cac2879 [Dan McClary] move pyspark comment and doctest to correct location
f9471d3 [Dan McClary] added pyspark doc and doctest
6598cee [Dan McClary] adding complex type queries
1a5fd30 [Dan McClary] removing SPARK-4228 from SQLQuerySuite
4a651f0 [Dan McClary] cleaned PEP and Scala style failures.  Moved tests to JsonSuite
47ceff6 [Dan McClary] cleaned up scala style issues
2ee1e70 [Dan McClary] moved rowToJSON to JsonRDD
4387dd5 [Dan McClary] Added UserDefinedType, cleaned up case formatting
8f7bfb6 [Dan McClary] Map type added to SchemaRDD.toJSON
1b11980 [Dan McClary] Map and UserDefinedTypes partially done
11d2016 [Dan McClary] formatting and unicode deserialization default fixed
6af72d1 [Dan McClary] deleted extaneous comment
4d11c0c [Dan McClary] JsonFactory rewrite of toJSON for SchemaRDD
149dafd [Dan McClary] wrapped scala toJSON in sql.py
5e5eb1b [Dan McClary] switched to Jackson for JSON processing
6c94a54 [Dan McClary] added toJSON to pyspark SchemaRDD
aaeba58 [Dan McClary] added toJSON to pyspark SchemaRDD
1d171aa [Dan McClary] upated missing brace on if statement
319e3ba [Dan McClary] updated to upstream master with merged SPARK-4228
424f130 [Dan McClary] tests pass, ready for pull and PR
626a5b1 [Dan McClary] added toJSON to SchemaRDD
f7d166a [Dan McClary] added toJSON method
5d34e37 [Dan McClary] merge resolved
d6d19e9 [Dan McClary] pr example
2014-11-20 13:44:19 -08:00
Cheng Lian abf29187f0 [SPARK-3938][SQL] Names in-memory columnar RDD with corresponding table name
This PR enables the Web UI storage tab to show the in-memory table name instead of the mysterious query plan string as the name of the in-memory columnar RDD.

Note that after #2501, a single columnar RDD can be shared by multiple in-memory tables, as long as their query results are the same. In this case, only the first cached table name is shown. For example:

```sql
CACHE TABLE first AS SELECT * FROM src;
CACHE TABLE second AS SELECT * FROM src;
```

The Web UI only shows "In-memory table first".

<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/3383)
<!-- Reviewable:end -->

Author: Cheng Lian <lian@databricks.com>

Closes #3383 from liancheng/columnar-rdd-name and squashes the following commits:

071907f [Cheng Lian] Fixes tests
12ddfa6 [Cheng Lian] Names in-memory columnar RDD with corresponding table name
2014-11-20 13:12:24 -08:00
Cheng Lian 423baea953 [SPARK-4468][SQL] Fixes Parquet filter creation for inequality predicates with literals on the left hand side
For expressions like `10 < someVar`, we should create an `Operators.Gt` filter, but right now an `Operators.Lt` is created. This issue affects all inequality predicates with literals on the left hand side.

(This bug existed before #3317 and affects branch-1.1. #3338 was opened to backport this to branch-1.1.)

<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/3334)
<!-- Reviewable:end -->

Author: Cheng Lian <lian@databricks.com>

Closes #3334 from liancheng/fix-parquet-comp-filter and squashes the following commits:

0130897 [Cheng Lian] Fixes Parquet comparison filter generation
2014-11-18 17:41:54 -08:00
Davies Liu 4a377aff2d [SPARK-3721] [PySpark] broadcast objects larger than 2G
This patch will bring support for broadcasting objects larger than 2G.

pickle, zlib, FrameSerializer and Array[Byte] all can not support objects larger than 2G, so this patch introduce LargeObjectSerializer to serialize broadcast objects, the object will be serialized and compressed into small chunks, it also change the type of Broadcast[Array[Byte]]] into Broadcast[Array[Array[Byte]]]].

Testing for support broadcast objects larger than 2G is slow and memory hungry, so this is tested manually, could be added into SparkPerf.

Author: Davies Liu <davies@databricks.com>
Author: Davies Liu <davies.liu@gmail.com>

Closes #2659 from davies/huge and squashes the following commits:

7b57a14 [Davies Liu] add more tests for broadcast
28acff9 [Davies Liu] Merge branch 'master' of github.com:apache/spark into huge
a2f6a02 [Davies Liu] bug fix
4820613 [Davies Liu] Merge branch 'master' of github.com:apache/spark into huge
5875c73 [Davies Liu] address comments
10a349b [Davies Liu] address comments
0c33016 [Davies Liu] Merge branch 'master' of github.com:apache/spark into huge
6182c8f [Davies Liu] Merge branch 'master' into huge
d94b68f [Davies Liu] Merge branch 'master' of github.com:apache/spark into huge
2514848 [Davies Liu] address comments
fda395b [Davies Liu] Merge branch 'master' of github.com:apache/spark into huge
1c2d928 [Davies Liu] fix scala style
091b107 [Davies Liu] broadcast objects larger than 2G
2014-11-18 16:17:51 -08:00
Cheng Lian 36b0956a3e [SPARK-4453][SPARK-4213][SQL] Simplifies Parquet filter generation code
While reviewing PR #3083 and #3161, I noticed that Parquet record filter generation code can be simplified significantly according to the clue stated in [SPARK-4453](https://issues.apache.org/jira/browse/SPARK-4213). This PR addresses both SPARK-4453 and SPARK-4213 with this simplification.

While generating `ParquetTableScan` operator, we need to remove all Catalyst predicates that have already been pushed down to Parquet. Originally, we first generate the record filter, and then call `findExpression` to traverse the generated filter to find out all pushed down predicates [[1](64c6b9bad5/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala (L213-L228))]. In this way, we have to introduce the `CatalystFilter` class hierarchy to bind the Catalyst predicates together with their generated Parquet filter, and complicate the code base a lot.

The basic idea of this PR is that, we don't need `findExpression` after filter generation, because we already know a predicate can be pushed down if we can successfully generate its corresponding Parquet filter. SPARK-4213 is fixed by returning `None` for any unsupported predicate type.

<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/3317)
<!-- Reviewable:end -->

Author: Cheng Lian <lian@databricks.com>

Closes #3317 from liancheng/simplify-parquet-filters and squashes the following commits:

d6a9499 [Cheng Lian] Fixes import styling issue
43760e8 [Cheng Lian] Simplifies Parquet filter generation logic
2014-11-17 16:55:12 -08:00
Cheng Lian 5ce7dae859 [SQL] Makes conjunction pushdown more aggressive for in-memory table
This is inspired by the [Parquet record filter generation code](64c6b9bad5/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala (L387-L400)).

<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/3318)
<!-- Reviewable:end -->

Author: Cheng Lian <lian@databricks.com>

Closes #3318 from liancheng/aggresive-conj-pushdown and squashes the following commits:

78b69d2 [Cheng Lian] Makes conjunction pushdown more aggressive
2014-11-17 15:33:13 -08:00
Michael Armbrust 64c6b9bad5 [SPARK-4410][SQL] Add support for external sort
Adds a new operator that uses Spark's `ExternalSort` class.  It is off by default now, but we might consider making it the default if benchmarks show that it does not regress performance.

Author: Michael Armbrust <michael@databricks.com>

Closes #3268 from marmbrus/externalSort and squashes the following commits:

48b9726 [Michael Armbrust] comments
b98799d [Michael Armbrust] Add test
afd7562 [Michael Armbrust] Add support for external sort.
2014-11-16 21:55:57 -08:00
Jim Carroll 37482ce5a7 [SPARK-4412][SQL] Fix Spark's control of Parquet logging.
The Spark ParquetRelation.scala code makes the assumption that the parquet.Log class has already been loaded. If ParquetRelation.enableLogForwarding executes prior to the parquet.Log class being loaded then the code in enableLogForwarding has no affect.

ParquetRelation.scala attempts to override the parquet logger but, at least currently (and if your application simply reads a parquet file before it does anything else with Parquet), the parquet.Log class hasn't been loaded yet. Therefore the code in ParquetRelation.enableLogForwarding has no affect. If you look at the code in parquet.Log there's a static initializer that needs to be called prior to enableLogForwarding or whatever enableLogForwarding does gets undone by this static initializer.

The "fix" would be to force the static initializer to get called in parquet.Log as part of enableForwardLogging.

Author: Jim Carroll <jim@dontcallme.com>

Closes #3271 from jimfcarroll/parquet-logging and squashes the following commits:

37bdff7 [Jim Carroll] Fix Spark's control of Parquet logging.
2014-11-14 15:33:21 -08:00
Yash Datta 63ca3af66f [SPARK-4365][SQL] Remove unnecessary filter call on records returned from parquet library
Since parquet library has been updated , we no longer need to filter the records returned from parquet library for null records , as now the library skips those :

from parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java

public boolean nextKeyValue() throws IOException, InterruptedException {
boolean recordFound = false;
while (!recordFound) {
// no more records left
if (current >= total)
{ return false; }
try {
checkRead();
currentValue = recordReader.read();
current ++;
if (recordReader.shouldSkipCurrentRecord())
{
 // this record is being filtered via the filter2 package
if (DEBUG) LOG.debug("skipping record");
 continue;
 }
if (currentValue == null)
{
// only happens with FilteredRecordReader at end of block current = totalCountLoadedSoFar;
 if (DEBUG) LOG.debug("filtered record reader reached end of block");
 continue;
}

recordFound = true;
if (DEBUG) LOG.debug("read value: " + currentValue);
} catch (RuntimeException e)
{ throw new ParquetDecodingException(format("Can not read value at %d in block %d in file %s", current, currentBlock, file), e); }

}
return true;
}

Author: Yash Datta <Yash.Datta@guavus.com>

Closes #3229 from saucam/remove_filter and squashes the following commits:

8909ae9 [Yash Datta] SPARK-4365: Remove unnecessary filter call on records returned from parquet library
2014-11-14 15:16:40 -08:00
Jim Carroll f76b968370 [SPARK-4386] Improve performance when writing Parquet files.
If you profile the writing of a Parquet file, the single worst time consuming call inside of org.apache.spark.sql.parquet.MutableRowWriteSupport.write is actually in the scala.collection.AbstractSequence.size call. This is because the size call actually ends up COUNTING the elements in a scala.collection.LinearSeqOptimized.length ("optimized?").

This doesn't need to be done. "size" is called repeatedly where needed rather than called once at the top of the method and stored in a 'val'.

Author: Jim Carroll <jim@dontcallme.com>

Closes #3254 from jimfcarroll/parquet-perf and squashes the following commits:

30cc0b5 [Jim Carroll] Improve performance when writing Parquet files.
2014-11-14 15:11:53 -08:00
Michael Armbrust 4b4b50c9e5 [SQL] Don't shuffle code generated rows
When sort based shuffle and code gen are on we were trying to ship the code generated rows during a shuffle.  This doesn't work because the classes don't exist on the other side.  Instead we now copy into a generic row before shipping.

Author: Michael Armbrust <michael@databricks.com>

Closes #3263 from marmbrus/aggCodeGen and squashes the following commits:

f6ba8cf [Michael Armbrust] fix and test
2014-11-14 15:03:23 -08:00
Michael Armbrust e47c387639 [SPARK-4391][SQL] Configure parquet filters using SQLConf
This is more uniform with the rest of SQL configuration and allows it to be turned on and off without restarting the SparkContext.  In this PR I also turn off filter pushdown by default due to a number of outstanding issues (in particular SPARK-4258).  When those are fixed we should turn it back on by default.

Author: Michael Armbrust <michael@databricks.com>

Closes #3258 from marmbrus/parquetFilters and squashes the following commits:

5655bfe [Michael Armbrust] Remove extra line.
15e9a98 [Michael Armbrust] Enable filters for tests
75afd39 [Michael Armbrust] Fix comments
78fa02d [Michael Armbrust] off by default
e7f9e16 [Michael Armbrust] First draft of correctly configuring parquet filter pushdown
2014-11-14 14:59:35 -08:00
Michael Armbrust 77e845ca77 [SPARK-4394][SQL] Data Sources API Improvements
This PR adds two features to the data sources API:
 - Support for pushing down `IN` filters
 - The ability for relations to optionally provide information about their `sizeInBytes`.

Author: Michael Armbrust <michael@databricks.com>

Closes #3260 from marmbrus/sourcesImprovements and squashes the following commits:

9a5e171 [Michael Armbrust] Use method instead of configuration directly
99c0e6b [Michael Armbrust] Add support for sizeInBytes.
416f167 [Michael Armbrust] Support for IN in data sources API.
2a04ab3 [Michael Armbrust] Simplify implementation of InSet.
2014-11-14 12:00:08 -08:00
Cheng Hao c764d0ac1c [SPARK-4274] [SQL] Fix NPE in printing the details of the query plan
Author: Cheng Hao <hao.cheng@intel.com>

Closes #3139 from chenghao-intel/comparison_test and squashes the following commits:

f5d7146 [Cheng Hao] avoid exception in printing the codegen enabled
2014-11-10 17:46:05 -08:00
Daoyuan Wang a1fc059b69 [SPARK-4149][SQL] ISO 8601 support for json date time strings
This implement the feature davies mentioned in https://github.com/apache/spark/pull/2901#discussion-diff-19313312

Author: Daoyuan Wang <daoyuan.wang@intel.com>

Closes #3012 from adrian-wang/iso8601 and squashes the following commits:

50df6e7 [Daoyuan Wang] json data timestamp ISO8601 support
2014-11-10 17:26:03 -08:00
Xiangrui Meng d793d80c80 [SQL] remove a decimal case branch that has no effect at runtime
it generates warnings at compile time marmbrus

Author: Xiangrui Meng <meng@databricks.com>

Closes #3192 from mengxr/dtc-decimal and squashes the following commits:

955e9fb [Xiangrui Meng] remove a decimal case branch that has no effect
2014-11-10 17:20:52 -08:00
Sean Owen f8e5732307 SPARK-1209 [CORE] (Take 2) SparkHadoop{MapRed,MapReduce}Util should not use package org.apache.hadoop
andrewor14 Another try at SPARK-1209, to address https://github.com/apache/spark/pull/2814#issuecomment-61197619

I successfully tested with `mvn -Dhadoop.version=1.0.4 -DskipTests clean package; mvn -Dhadoop.version=1.0.4 test` I assume that is what failed Jenkins last time. I also tried `-Dhadoop.version1.2.1` and `-Phadoop-2.4 -Pyarn -Phive` for more coverage.

So this is why the class was put in `org.apache.hadoop` to begin with, I assume. One option is to leave this as-is for now and move it only when Hadoop 1.0.x support goes away.

This is the other option, which adds a call to force the constructor to be public at run-time. It's probably less surprising than putting Spark code in `org.apache.hadoop`, but, does involve reflection. A `SecurityManager` might forbid this, but it would forbid a lot of stuff Spark does. This would also only affect Hadoop 1.0.x it seems.

Author: Sean Owen <sowen@cloudera.com>

Closes #3048 from srowen/SPARK-1209 and squashes the following commits:

0d48f4b [Sean Owen] For Hadoop 1.0.x, make certain constructors public, which were public in later versions
466e179 [Sean Owen] Disable MIMA warnings resulting from moving the class -- this was also part of the PairRDDFunctions type hierarchy though?
eb61820 [Sean Owen] Move SparkHadoopMapRedUtil / SparkHadoopMapReduceUtil from org.apache.hadoop to org.apache.spark
2014-11-09 22:11:20 -08:00
Kousuke Saruta 14c54f1876 [SPARK-4213][SQL] ParquetFilters - No support for LT, LTE, GT, GTE operators
Following description is quoted from JIRA:

When I issue a hql query against a HiveContext where my predicate uses a column of string type with one of LT, LTE, GT, or GTE operator, I get the following error:
scala.MatchError: StringType (of class org.apache.spark.sql.catalyst.types.StringType$)
Looking at the code in org.apache.spark.sql.parquet.ParquetFilters, StringType is absent from the corresponding functions for creating these filters.
To reproduce, in a Hive 0.13.1 shell, I created the following table (at a specified DB):

    create table sparkbug (
    id int,
    event string
    ) stored as parquet;

Insert some sample data:

    insert into table sparkbug select 1, '2011-06-18' from <some table> limit 1;
    insert into table sparkbug select 2, '2012-01-01' from <some table> limit 1;

Launch a spark shell and create a HiveContext to the metastore where the table above is located.

    import org.apache.spark.sql._
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.hive.HiveContext
    val hc = new HiveContext(sc)
    hc.setConf("spark.sql.shuffle.partitions", "10")
    hc.setConf("spark.sql.hive.convertMetastoreParquet", "true")
    hc.setConf("spark.sql.parquet.compression.codec", "snappy")
    import hc._
    hc.hql("select * from <db>.sparkbug where event >= '2011-12-01'")

A scala.MatchError will appear in the output.

Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>

Closes #3083 from sarutak/SPARK-4213 and squashes the following commits:

4ab6e56 [Kousuke Saruta] WIP
b6890c6 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-4213
9a1fae7 [Kousuke Saruta] Fixed ParquetFilters so that compare Strings
2014-11-07 11:56:40 -08:00
Xiangrui Meng 3d2b5bc5bb [SPARK-4262][SQL] add .schemaRDD to JavaSchemaRDD
marmbrus

Author: Xiangrui Meng <meng@databricks.com>

Closes #3125 from mengxr/SPARK-4262 and squashes the following commits:

307695e [Xiangrui Meng] add .schemaRDD to JavaSchemaRDD
2014-11-05 19:56:16 -08:00
Davies Liu e4f42631a6 [SPARK-3886] [PySpark] simplify serializer, use AutoBatchedSerializer by default.
This PR simplify serializer, always use batched serializer (AutoBatchedSerializer as default), even batch size is 1.

Author: Davies Liu <davies@databricks.com>

This patch had conflicts when merged, resolved by
Committer: Josh Rosen <joshrosen@databricks.com>

Closes #2920 from davies/fix_autobatch and squashes the following commits:

e544ef9 [Davies Liu] revert unrelated change
6880b14 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
1d557fc [Davies Liu] fix tests
8180907 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
76abdce [Davies Liu] clean up
53fa60b [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
d7ac751 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
2cc2497 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
b4292ce [Davies Liu] fix bug in master
d79744c [Davies Liu] recover hive tests
be37ece [Davies Liu] refactor
eb3938d [Davies Liu] refactor serializer in scala
8d77ef2 [Davies Liu] simplify serializer, use AutoBatchedSerializer by default.
2014-11-03 23:56:14 -08:00
Xiangrui Meng 04450d1154 [SPARK-4192][SQL] Internal API for Python UDT
Following #2919, this PR adds Python UDT (for internal use only) with tests under "pyspark.tests". Before `SQLContext.applySchema`, we check whether we need to convert user-type instances into SQL recognizable data. In the current implementation, a Python UDT must be paired with a Scala UDT for serialization on the JVM side. A following PR will add VectorUDT in MLlib for both Scala and Python.

marmbrus jkbradley davies

Author: Xiangrui Meng <meng@databricks.com>

Closes #3068 from mengxr/SPARK-4192-sql and squashes the following commits:

acff637 [Xiangrui Meng] merge master
dba5ea7 [Xiangrui Meng] only use pyClass for Python UDT output sqlType as well
2c9d7e4 [Xiangrui Meng] move import to global setup; update needsConversion
7c4a6a9 [Xiangrui Meng] address comments
75223db [Xiangrui Meng] minor update
f740379 [Xiangrui Meng] remove UDT from default imports
e98d9d0 [Xiangrui Meng] fix py style
4e84fce [Xiangrui Meng] remove local hive tests and add more tests
39f19e0 [Xiangrui Meng] add tests
b7f666d [Xiangrui Meng] add Python UDT
2014-11-03 19:29:11 -08:00
Michael Armbrust 25bef7e695 [SQL] More aggressive defaults
- Turns on compression for in-memory cached data by default
 - Changes the default parquet compression format back to gzip (we have seen more OOMs with production workloads due to the way Snappy allocates memory)
 - Ups the batch size to 10,000 rows
 - Increases the broadcast threshold to 10mb.
 - Uses our parquet implementation instead of the hive one by default.
 - Cache parquet metadata by default.

Author: Michael Armbrust <michael@databricks.com>

Closes #3064 from marmbrus/fasterDefaults and squashes the following commits:

97ee9f8 [Michael Armbrust] parquet codec docs
e641694 [Michael Armbrust] Remote also
a12866a [Michael Armbrust] Cache metadata.
2d73acc [Michael Armbrust] Update docs defaults.
d63d2d5 [Michael Armbrust] document parquet option
da373f9 [Michael Armbrust] More aggressive defaults
2014-11-03 14:08:27 -08:00
Joseph K. Bradley ebd6480587 [SPARK-3572] [SQL] Internal API for User-Defined Types
This PR adds User-Defined Types (UDTs) to SQL. It is a precursor to using SchemaRDD as a Dataset for the new MLlib API. Currently, the UDT API is private since there is incomplete support (e.g., no Java or Python support yet).

Author: Joseph K. Bradley <joseph@databricks.com>
Author: Michael Armbrust <michael@databricks.com>
Author: Xiangrui Meng <meng@databricks.com>

Closes #3063 from marmbrus/udts and squashes the following commits:

7ccfc0d [Michael Armbrust] remove println
46a3aee [Michael Armbrust] Slightly easier to read test output.
6cc434d [Michael Armbrust] Recursively convert rows.
e369b91 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into udts
15c10a6 [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into sql-udt2
f3c72fe [Joseph K. Bradley] Fixing merge
e13cd8a [Joseph K. Bradley] Removed Vector UDTs
5817b2b [Joseph K. Bradley] style edits
30ce5b2 [Joseph K. Bradley] updates based on code review
d063380 [Joseph K. Bradley] Cleaned up Java UDT Suite, and added warning about element ordering when creating schema from Java Bean
a571bb6 [Joseph K. Bradley] Removed old UDT code (registry and Java UDTs).  Cleaned up other code.  Extended JavaUserDefinedTypeSuite
6fddc1c [Joseph K. Bradley] Made MyLabeledPoint into a Java Bean
20630bc [Joseph K. Bradley] fixed scalastyle
fa86b20 [Joseph K. Bradley] Removed Java UserDefinedType, and made UDTs private[spark] for now
8de957c [Joseph K. Bradley] Modified UserDefinedType to store Java class of user type so that registerUDT takes only the udt argument.
8b242ea [Joseph K. Bradley] Fixed merge error after last merge.  Note: Last merge commit also removed SQL UDT examples from mllib.
7f29656 [Joseph K. Bradley] Moved udt case to top of all matches.  Small cleanups
b028675 [Xiangrui Meng] allow any type in UDT
4500d8a [Xiangrui Meng] update example code
87264a5 [Xiangrui Meng] remove debug code
3143ac3 [Xiangrui Meng] remove unnecessary changes
cfbc321 [Xiangrui Meng] support UDT in parquet
db16139 [Joseph K. Bradley] Added more doc for UserDefinedType.  Removed unused code in Suite
759af7a [Joseph K. Bradley] Added more doc to UserDefineType
63626a4 [Joseph K. Bradley] Updated ScalaReflectionsSuite per @marmbrus suggestions
51e5282 [Joseph K. Bradley] fixed 1 test
f025035 [Joseph K. Bradley] Cleanups before PR.  Added new tests
85872f6 [Michael Armbrust] Allow schema calculation to be lazy, but ensure its available on executors.
dff99d6 [Joseph K. Bradley] Added UDTs for Vectors in MLlib, plus DatasetExample using the UDTs
cd60cb4 [Joseph K. Bradley] Trying to get other SQL tests to run
34a5831 [Joseph K. Bradley] Added MLlib dependency on SQL.
e1f7b9c [Joseph K. Bradley] blah
2f40c02 [Joseph K. Bradley] renamed UDT types
3579035 [Joseph K. Bradley] udt annotation now working
b226b9e [Joseph K. Bradley] Changing UDT to annotation
fea04af [Joseph K. Bradley] more cleanups
964b32e [Joseph K. Bradley] some cleanups
893ee4c [Joseph K. Bradley] udt finallly working
50f9726 [Joseph K. Bradley] udts
04303c9 [Joseph K. Bradley] udts
39f8707 [Joseph K. Bradley] removed old udt suite
273ac96 [Joseph K. Bradley] basic UDT is working, but deserialization has yet to be done
8bebf24 [Joseph K. Bradley] commented out convertRowToScala for debugging
53de70f [Joseph K. Bradley] more udts...
982c035 [Joseph K. Bradley] still working on UDTs
19b2f60 [Joseph K. Bradley] still working on UDTs
0eaeb81 [Joseph K. Bradley] Still working on UDTs
105c5a3 [Joseph K. Bradley] Adding UserDefinedType to SQL, not done yet.
2014-11-02 17:56:00 -08:00
Cheng Lian 9081b9f9f7 [SPARK-2189][SQL] Adds dropTempTable API
This PR adds an API for unregistering temporary tables. If a temporary table has been cached before, it's unpersisted as well.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #3039 from liancheng/unregister-temp-table and squashes the following commits:

54ae99f [Cheng Lian] Fixes Scala styling issue
1948c14 [Cheng Lian] Removes the unpersist argument
aca41d3 [Cheng Lian] Ensures thread safety
7d4fb2b [Cheng Lian] Adds unregisterTempTable API
2014-11-02 16:00:24 -08:00
Yin Huai 06232d23ff [SPARK-4185][SQL] JSON schema inference failed when dealing with type conflicts in arrays
JIRA: https://issues.apache.org/jira/browse/SPARK-4185.

This PR also has the fix of #3052.

Author: Yin Huai <huai@cse.ohio-state.edu>

Closes #3056 from yhuai/SPARK-4185 and squashes the following commits:

ed3a5a8 [Yin Huai] Correctly handle type conflicts between structs and primitive types in an array.
2014-11-02 15:46:56 -08:00
Cheng Lian c9f840046f [SPARK-3791][SQL] Provides Spark version and Hive version in HiveThriftServer2
This PR overrides the `GetInfo` Hive Thrift API to provide correct version information. Another property `spark.sql.hive.version` is added to reveal the underlying Hive version. These are generally useful for Spark SQL ODBC driver providers. The Spark version information is extracted from the jar manifest. Also took the chance to remove the `SET -v` hack, which was a workaround for Simba ODBC driver connectivity.

TODO

- [x] Find a general way to figure out Hive (or even any dependency) version.

  This [blog post](http://blog.soebes.de/blog/2014/01/02/version-information-into-your-appas-with-maven/) suggests several methods to inspect application version. In the case of Spark, this can be tricky because the chosen method:

  1. must applies to both Maven build and SBT build

    For Maven builds, we can retrieve the version information from the META-INF/maven directory within the assembly jar. But this doesn't work for SBT builds.

  2. must not rely on the original jars of dependencies to extract specific dependency version, because Spark uses assembly jar.

    This implies we can't read Hive version from Hive jar files since standard Spark distribution doesn't include them.

  3. should play well with `SPARK_PREPEND_CLASSES` to ease local testing during development.

     `SPARK_PREPEND_CLASSES` prevents classes to be loaded from the assembly jar, thus we can't locate the jar file and read its manifest.

  Given these, maybe the only reliable method is to generate a source file containing version information at build time. pwendell Do you have any suggestions from the perspective of the build process?

**Update** Hive version is now retrieved from the newly introduced `HiveShim` object.

Author: Cheng Lian <lian.cs.zju@gmail.com>
Author: Cheng Lian <lian@databricks.com>

Closes #2843 from liancheng/get-info and squashes the following commits:

a873d0f [Cheng Lian] Updates test case
53f43cd [Cheng Lian] Retrieves underlying Hive verson via HiveShim
1d282b8 [Cheng Lian] Removes the Simba ODBC "SET -v" hack
f857fce [Cheng Lian] Overrides Hive GetInfo Thrift API and adds Hive version property
2014-11-02 15:18:29 -08:00
Cheng Lian e4b80894bd [SPARK-4182][SQL] Fixes ColumnStats classes for boolean, binary and complex data types
`NoopColumnStats` was once used for binary, boolean and complex data types. This `ColumnStats` doesn't return properly shaped column statistics and causes caching failure if a table contains columns of the aforementioned types.

This PR adds `BooleanColumnStats`, `BinaryColumnStats` and `GenericColumnStats`, used for boolean, binary and all complex data types respectively. In addition, `NoopColumnStats` returns properly shaped column statistics containing null count and row count, but this class is now used for testing purpose only.

Author: Cheng Lian <lian@databricks.com>

Closes #3059 from liancheng/spark-4182 and squashes the following commits:

b398cfd [Cheng Lian] Fixes failed test case
fb3ee85 [Cheng Lian] Fixes SPARK-4182
2014-11-02 15:14:44 -08:00
Michael Armbrust 9c0eb57c73 [SPARK-3247][SQL] An API for adding data sources to Spark SQL
This PR introduces a new set of APIs to Spark SQL to allow other developers to add support for reading data from new sources in `org.apache.spark.sql.sources`.

New sources must implement the interface `BaseRelation`, which is responsible for describing the schema of the data.  BaseRelations have three `Scan` subclasses, which are responsible for producing an RDD containing row objects.  The [various Scan interfaces](https://github.com/marmbrus/spark/blob/foreign/sql/core/src/main/scala/org/apache/spark/sql/sources/package.scala#L50) allow for optimizations such as column pruning and filter push down, when the underlying data source can handle these operations.

By implementing a class that inherits from RelationProvider these data sources can be accessed using using pure SQL.  I've used the functionality to update the JSON support so it can now be used in this way as follows:

```sql
CREATE TEMPORARY TABLE jsonTableSQL
USING org.apache.spark.sql.json
OPTIONS (
  path '/home/michael/data.json'
)
```

Further example usage can be found in the test cases: https://github.com/marmbrus/spark/tree/foreign/sql/core/src/test/scala/org/apache/spark/sql/sources

There is also a library that uses this new API to read avro data available here:
https://github.com/marmbrus/sql-avro

Author: Michael Armbrust <michael@databricks.com>

Closes #2475 from marmbrus/foreign and squashes the following commits:

1ed6010 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into foreign
ab2c31f [Michael Armbrust] fix test
1d41bb5 [Michael Armbrust] unify argument names
5b47901 [Michael Armbrust] Remove sealed, more filter types
fab154a [Michael Armbrust] Merge remote-tracking branch 'origin/master' into foreign
e3e690e [Michael Armbrust] Add hook for extraStrategies
a70d602 [Michael Armbrust] Fix style, more tests, FilteredSuite => PrunedFilteredSuite
70da6d9 [Michael Armbrust] Modify API to ease binary compatibility and interop with Java
7d948ae [Michael Armbrust] Fix equality of AttributeReference.
5545491 [Michael Armbrust] Address comments
5031ac3 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into foreign
22963ef [Michael Armbrust] package objects compile wierdly...
b069146 [Michael Armbrust] traits => abstract classes
34f836a [Michael Armbrust] Make @DeveloperApi
0d74bcf [Michael Armbrust] Add documention on object life cycle
3e06776 [Michael Armbrust] remove line wraps
de3b68c [Michael Armbrust] Remove empty file
360cb30 [Michael Armbrust] style and java api
2957875 [Michael Armbrust] add override
0fd3a07 [Michael Armbrust] Draft of data sources API
2014-11-02 15:08:35 -08:00
Matei Zaharia 23f966f475 [SPARK-3930] [SPARK-3933] Support fixed-precision decimal in SQL, and some optimizations
- Adds optional precision and scale to Spark SQL's decimal type, which behave similarly to those in Hive 13 (https://cwiki.apache.org/confluence/download/attachments/27362075/Hive_Decimal_Precision_Scale_Support.pdf)
- Replaces our internal representation of decimals with a Decimal class that can store small values in a mutable Long, saving memory in this situation and letting some operations happen directly on Longs

This is still marked WIP because there are a few TODOs, but I'll remove that tag when done.

Author: Matei Zaharia <matei@databricks.com>

Closes #2983 from mateiz/decimal-1 and squashes the following commits:

35e6b02 [Matei Zaharia] Fix issues after merge
227f24a [Matei Zaharia] Review comments
31f915e [Matei Zaharia] Implement Davies's suggestions in Python
eb84820 [Matei Zaharia] Support reading/writing decimals as fixed-length binary in Parquet
4dc6bae [Matei Zaharia] Fix decimal support in PySpark
d1d9d68 [Matei Zaharia] Fix compile error and test issues after rebase
b28933d [Matei Zaharia] Support decimal precision/scale in Hive metastore
2118c0d [Matei Zaharia] Some test and bug fixes
81db9cb [Matei Zaharia] Added mutable Decimal that will be more efficient for small precisions
7af0c3b [Matei Zaharia] Add optional precision and scale to DecimalType, but use Unlimited for now
ec0a947 [Matei Zaharia] Make the result of AVG on Decimals be Decimal, not Double
2014-11-01 19:29:14 -07:00
Xiangrui Meng 1d4f355203 [SPARK-3569][SQL] Add metadata field to StructField
Add `metadata: Metadata` to `StructField` to store extra information of columns. `Metadata` is a simple wrapper over `Map[String, Any]` with value types restricted to Boolean, Long, Double, String, Metadata, and arrays of those types. SerDe is via JSON.

Metadata is preserved through simple operations like `SELECT`.

marmbrus liancheng

Author: Xiangrui Meng <meng@databricks.com>
Author: Michael Armbrust <michael@databricks.com>

Closes #2701 from mengxr/structfield-metadata and squashes the following commits:

dedda56 [Xiangrui Meng] merge remote
5ef930a [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into structfield-metadata
c35203f [Xiangrui Meng] Merge pull request #1 from marmbrus/pr/2701
886b85c [Michael Armbrust] Expose Metadata and MetadataBuilder through the public scala and java packages.
589f314 [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into structfield-metadata
1e2abcf [Xiangrui Meng] change default value of metadata to None in python
611d3c2 [Xiangrui Meng] move metadata from Expr to NamedExpr
ddfcfad [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into structfield-metadata
a438440 [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into structfield-metadata
4266f4d [Xiangrui Meng] add StructField.toString back for backward compatibility
3f49aab [Xiangrui Meng] remove StructField.toString
24a9f80 [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into structfield-metadata
473a7c5 [Xiangrui Meng] merge master
c9d7301 [Xiangrui Meng] organize imports
1fcbf13 [Xiangrui Meng] change metadata type in StructField for Scala/Java
60cc131 [Xiangrui Meng] add doc and header
60614c7 [Xiangrui Meng] add metadata
e42c452 [Xiangrui Meng] merge master
93518fb [Xiangrui Meng] support metadata in python
905bb89 [Xiangrui Meng] java conversions
618e349 [Xiangrui Meng] make tests work in scala
61b8e0f [Xiangrui Meng] merge master
7e5a322 [Xiangrui Meng] do not output metadata in StructField.toString
c41a664 [Xiangrui Meng] merge master
d8af0ed [Xiangrui Meng] move tests to SQLQuerySuite
67fdebb [Xiangrui Meng] add test on join
d65072e [Xiangrui Meng] remove Map.empty
367d237 [Xiangrui Meng] add test
c194d5e [Xiangrui Meng] add metadata field to StructField and Attribute
2014-11-01 14:37:00 -07:00
Andrew Or 26d31d15fd Revert "SPARK-1209 [CORE] SparkHadoop{MapRed,MapReduce}Util should not use package org.apache.hadoop"
This reverts commit 68cb69daf3.
2014-10-30 17:56:10 -07:00
Yash Datta 2e35e24294 [SPARK-3968][SQL] Use parquet-mr filter2 api
The parquet-mr project has introduced a new filter api  (https://github.com/apache/incubator-parquet-mr/pull/4), along with several fixes . It can also eliminate entire RowGroups depending on certain statistics like min/max
We can leverage that to further improve performance of queries with filters.
Also filter2 api introduces ability to create custom filters. We can create a custom filter for the optimized In clause (InSet) , so that elimination happens in the ParquetRecordReader itself

Author: Yash Datta <Yash.Datta@guavus.com>

Closes #2841 from saucam/master and squashes the following commits:

8282ba0 [Yash Datta] SPARK-3968: fix scala code style and add some more tests for filtering on optional columns
515df1c [Yash Datta] SPARK-3968: Add a test case for filter pushdown on optional column
5f4530e [Yash Datta] SPARK-3968: Fix scala code style
f304667 [Yash Datta] SPARK-3968: Using task metadata strategy for row group filtering
ec53e92 [Yash Datta] SPARK-3968: No push down should result in case we are unable to create a record filter
48163c3 [Yash Datta] SPARK-3968: Code cleanup
cc7b596 [Yash Datta] SPARK-3968: 1. Fix RowGroupFiltering not working             2. Use the serialization/deserialization from Parquet library for filter pushdown
caed851 [Yash Datta] Revert "SPARK-3968: Not pushing the filters in case of OPTIONAL columns" since filtering on optional columns is now supported in filter2 api
49703c9 [Yash Datta] SPARK-3968: Not pushing the filters in case of OPTIONAL columns
9d09741 [Yash Datta] SPARK-3968: Change parquet filter pushdown to use filter2 api of parquet-mr
2014-10-30 17:17:31 -07:00
Sean Owen 68cb69daf3 SPARK-1209 [CORE] SparkHadoop{MapRed,MapReduce}Util should not use package org.apache.hadoop
(This is just a look at what completely moving the classes would look like. I know Patrick flagged that as maybe not OK, although, it's private?)

Author: Sean Owen <sowen@cloudera.com>

Closes #2814 from srowen/SPARK-1209 and squashes the following commits:

ead1115 [Sean Owen] Disable MIMA warnings resulting from moving the class -- this was also part of the PairRDDFunctions type hierarchy though?
2d42c1d [Sean Owen] Move SparkHadoopMapRedUtil / SparkHadoopMapReduceUtil from org.apache.hadoop to org.apache.spark
2014-10-30 15:54:53 -07:00
Daoyuan Wang 3535467663 [SPARK-4003] [SQL] add 3 types for java SQL context
In JavaSqlContext, we need to let java program use big decimal, timestamp, date types.

Author: Daoyuan Wang <daoyuan.wang@intel.com>

Closes #2850 from adrian-wang/javacontext and squashes the following commits:

4c4292c [Daoyuan Wang] change underlying type of JavaSchemaRDD as scala
bb0508f [Daoyuan Wang] add test cases
3c58b0d [Daoyuan Wang] add 3 types for java SQL context
2014-10-29 12:10:58 -07:00
Davies Liu 8c0bfd08fc [SPARK-4133] [SQL] [PySpark] type conversionfor python udf
Call Python UDF on ArrayType/MapType/PrimitiveType, the returnType can also be ArrayType/MapType/PrimitiveType.

For StructType, it will act as tuple (without attributes). If returnType is StructType, it also should be tuple.

Author: Davies Liu <davies@databricks.com>

Closes #2973 from davies/udf_array and squashes the following commits:

306956e [Davies Liu] Merge branch 'master' of github.com:apache/spark into udf_array
2c00e43 [Davies Liu] fix merge
11395fa [Davies Liu] Merge branch 'master' of github.com:apache/spark into udf_array
9df50a2 [Davies Liu] address comments
79afb4e [Davies Liu] type conversionfor python udf
2014-10-28 19:38:16 -07:00
Cheng Hao 4b55482abf [SPARK-3343] [SQL] Add serde support for CTAS
Currently, `CTAS` (Create Table As Select) doesn't support specifying the `SerDe` in HQL. This PR will pass down the `ASTNode` into the physical operator `execution.CreateTableAsSelect`, which will extract the `CreateTableDesc` object via Hive `SemanticAnalyzer`. In the meantime, I also update the `HiveMetastoreCatalog.createTable` to optionally support the `CreateTableDesc` for table creation.

Author: Cheng Hao <hao.cheng@intel.com>

Closes #2570 from chenghao-intel/ctas_serde and squashes the following commits:

e011ef5 [Cheng Hao] shim for both 0.12 & 0.13.1
cfb3662 [Cheng Hao] revert to hive 0.12
c8a547d [Cheng Hao] Support SerDe properties within CTAS
2014-10-28 14:36:06 -07:00
Daoyuan Wang 47a40f60d6 [SPARK-3988][SQL] add public API for date type
Add json and python api for date type.
By using Pickle, `java.sql.Date` was serialized as calendar, and recognized in python as `datetime.datetime`.

Author: Daoyuan Wang <daoyuan.wang@intel.com>

Closes #2901 from adrian-wang/spark3988 and squashes the following commits:

c51a24d [Daoyuan Wang] convert datetime to date
5670626 [Daoyuan Wang] minor line combine
f760d8e [Daoyuan Wang] fix indent
444f100 [Daoyuan Wang] fix a typo
1d74448 [Daoyuan Wang] fix scala style
8d7dd22 [Daoyuan Wang] add json and python api for date type
2014-10-28 13:43:25 -07:00
Yin Huai 0481aaa8d7 [SPARK-4068][SQL] NPE in jsonRDD schema inference
Please refer to added tests for cases that can trigger the bug.

JIRA: https://issues.apache.org/jira/browse/SPARK-4068

Author: Yin Huai <huai@cse.ohio-state.edu>

Closes #2918 from yhuai/SPARK-4068 and squashes the following commits:

d360eae [Yin Huai] Handle nulls when building key paths from elements of an array.
2014-10-26 16:32:02 -07:00
Cheng Lian 2838bf8aad [SPARK-3537][SPARK-3914][SQL] Refines in-memory columnar table statistics
This PR refines in-memory columnar table statistics:

1. adds 2 more statistics for in-memory table columns: `count` and `sizeInBytes`
1. adds filter pushdown support for `IS NULL` and `IS NOT NULL`.
1. caches and propagates statistics in `InMemoryRelation` once the underlying cached RDD is materialized.

   Statistics are collected to driver side with an accumulator.

This PR also fixes SPARK-3914 by properly propagating in-memory statistics.

Author: Cheng Lian <lian@databricks.com>

Closes #2860 from liancheng/propagates-in-mem-stats and squashes the following commits:

0cc5271 [Cheng Lian] Restricts visibility of o.a.s.s.c.p.l.Statistics
c5ff904 [Cheng Lian] Fixes test table name conflict
a8c818d [Cheng Lian] Refines tests
1d01074 [Cheng Lian] Bug fix: shouldn't call STRING.actualSize on null string value
7dc6a34 [Cheng Lian] Adds more in-memory table statistics and propagates them properly
2014-10-26 16:10:09 -07:00
Sean Owen df7974b8e5 SPARK-3359 [DOCS] sbt/sbt unidoc doesn't work with Java 8
This follows https://github.com/apache/spark/pull/2893 , but does not completely fix SPARK-3359 either. This fixes minor scaladoc/javadoc issues that Javadoc 8 will treat as errors.

Author: Sean Owen <sowen@cloudera.com>

Closes #2909 from srowen/SPARK-3359 and squashes the following commits:

f62c347 [Sean Owen] Fix some javadoc issues that javadoc 8 considers errors. This is not all of the errors turned up when javadoc 8 runs on output of genjavadoc.
2014-10-25 23:18:02 -07:00
Michael Armbrust 3a845d3c04 [SQL] Update Hive test harness for Hive 12 and 13
As part of the upgrade I also copy the newest version of the query tests, and whitelist a bunch of new ones that are now passing.

Author: Michael Armbrust <michael@databricks.com>

Closes #2936 from marmbrus/fix13tests and squashes the following commits:

d9cbdab [Michael Armbrust] Remove user specific tests
65801cd [Michael Armbrust] style and rat
8f6b09a [Michael Armbrust] Update test harness to work with both Hive 12 and 13.
f044843 [Michael Armbrust] Update Hive query tests and golden files to 0.13
2014-10-24 18:36:35 -07:00
Michael Armbrust 0e886610ee [SPARK-4050][SQL] Fix caching of temporary tables with projections.
Previously cached data was found by `sameResult` plan matching on optimized plans.  This technique however fails to locate the cached data when a temporary table with a projection is queried with a further reduced projection.  The failure is due to the fact that optimization will collapse the projections, producing a plan that no longer produces the sameResult as the cached data (though the cached data still subsumes the desired data).  For example consider the following previously failing test case.

```scala
sql("CACHE TABLE tempTable AS SELECT key FROM testData")
assertCached(sql("SELECT COUNT(*) FROM tempTable"))
```

In this PR I change the matching to occur after analysis instead of optimization, so that in the case of temporary tables, the plans will always match.  I think this should work generally, however, this error does raise questions about the need to do more thorough subsumption checking when locating cached data.

Another question is what sort of semantics we want to provide when uncaching data from temporary tables.  For example consider the following sequence of commands:

```scala
testData.select('key).registerTempTable("tempTable1")
testData.select('key).registerTempTable("tempTable2")
cacheTable("tempTable1")

// This obviously works.
assertCached(sql("SELECT COUNT(*) FROM tempTable1"))

// It seems good that this works ...
assertCached(sql("SELECT COUNT(*) FROM tempTable2"))

// ... but is this valid?
uncacheTable("tempTable2")

// Should this still be cached?
assertCached(sql("SELECT COUNT(*) FROM tempTable1"), 0)
```

Author: Michael Armbrust <michael@databricks.com>

Closes #2912 from marmbrus/cachingBug and squashes the following commits:

9c822d4 [Michael Armbrust] remove commented out code
5c72fb7 [Michael Armbrust] Add a test case / question about uncaching semantics.
63a23e4 [Michael Armbrust] Perform caching on analyzed instead of optimized plan.
03f1cfe [Michael Armbrust] Clean-up / add tests to SameResult suite.
2014-10-24 10:52:25 -07:00
Takuya UESHIN 7586e2e67a [SPARK-3969][SQL] Optimizer should have a super class as an interface.
Some developers want to replace `Optimizer` to fit their projects but can't do so because currently `Optimizer` is an `object`.

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #2825 from ueshin/issues/SPARK-3969 and squashes the following commits:

abbc53c [Takuya UESHIN] Re-rename Optimizer object.
4d2e1bc [Takuya UESHIN] Rename Optimizer object.
9547a23 [Takuya UESHIN] Extract abstract class from Optimizer for developers to be able to replace Optimizer.
2014-10-20 17:09:12 -07:00
Sean Owen f406a83918 SPARK-3926 [CORE] Result of JavaRDD.collectAsMap() is not Serializable
Make JavaPairRDD.collectAsMap result Serializable since Java Maps generally are

Author: Sean Owen <sowen@cloudera.com>

Closes #2805 from srowen/SPARK-3926 and squashes the following commits:

ecb78ee [Sean Owen] Fix conflict between java.io.Serializable and use of Scala's Serializable
f4717f9 [Sean Owen] Oops, fix compile problem
ae1b36f [Sean Owen] Expand to cover Maps returned from other Java API methods as well
51c26c2 [Sean Owen] Make JavaPairRDD.collectAsMap result Serializable since Java Maps generally are
2014-10-18 12:38:18 -07:00
Michael Armbrust adcb7d3350 [SPARK-3855][SQL] Preserve the result attribute of python UDFs though transformations
In the current implementation it was possible for the reference to change after analysis.

Author: Michael Armbrust <michael@databricks.com>

Closes #2717 from marmbrus/pythonUdfResults and squashes the following commits:

da14879 [Michael Armbrust] Fix test
6343bcb [Michael Armbrust] add test
9533286 [Michael Armbrust] Correctly preserve the result attribute of python UDFs though transformations
2014-10-17 14:12:07 -07:00
Prashant Sharma 2fe0ba9561 SPARK-3874: Provide stable TaskContext API
This is a small number of clean-up changes on top of #2782. Closes #2782.

Author: Prashant Sharma <prashant.s@imaginea.com>
Author: Patrick Wendell <pwendell@gmail.com>

Closes #2803 from pwendell/pr-2782 and squashes the following commits:

56d5b7a [Patrick Wendell] Minor clean-up
44089ec [Patrick Wendell] Clean-up the TaskContext API.
ed551ce [Prashant Sharma] Fixed a typo
df261d0 [Prashant Sharma] Josh's suggestion
facf3b1 [Prashant Sharma] Fixed the mima issue.
7ecc2fe [Prashant Sharma] CR, Moved implementations to TaskContextImpl
bbd9e05 [Prashant Sharma] adding missed out files to git.
ef633f5 [Prashant Sharma] SPARK-3874, Provide stable TaskContext API
2014-10-16 21:38:45 -04:00
Michael Armbrust 371321cade [SQL] Add type checking debugging functions
Adds some functions that were very useful when trying to track down the bug from #2656.  This change also changes the tree output for query plans to include the `'` prefix to unresolved nodes and `!` prefix to nodes that refer to non-existent attributes.

Author: Michael Armbrust <michael@databricks.com>

Closes #2657 from marmbrus/debugging and squashes the following commits:

654b926 [Michael Armbrust] Clean-up, add tests
763af15 [Michael Armbrust] Add typeChecking debugging functions
8c69303 [Michael Armbrust] Add inputSet, references to QueryPlan. Improve tree string with a prefix to denote invalid or unresolved nodes.
fbeab54 [Michael Armbrust] Better toString, factories for AttributeSet.
2014-10-13 13:46:34 -07:00
Takuya UESHIN 73da9c26b0 [SPARK-3771][SQL] AppendingParquetOutputFormat should use reflection to prevent from breaking binary-compatibility.
Original problem is [SPARK-3764](https://issues.apache.org/jira/browse/SPARK-3764).

`AppendingParquetOutputFormat` uses a binary-incompatible method `context.getTaskAttemptID`.
This causes binary-incompatible of Spark itself, i.e. if Spark itself is built against hadoop-1, the artifact is for only hadoop-1, and vice versa.

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #2638 from ueshin/issues/SPARK-3771 and squashes the following commits:

efd3784 [Takuya UESHIN] Add a comment to explain the reason to use reflection.
ec213c1 [Takuya UESHIN] Use reflection to prevent breaking binary-compatibility.
2014-10-13 13:43:41 -07:00
Daoyuan Wang 2ac40da3f9 [SPARK-3407][SQL]Add Date type support
Author: Daoyuan Wang <daoyuan.wang@intel.com>

Closes #2344 from adrian-wang/date and squashes the following commits:

f15074a [Daoyuan Wang] remove outdated lines
2038085 [Daoyuan Wang] update return type
00fe81f [Daoyuan Wang] address lian cheng's comments
0df6ea1 [Daoyuan Wang] rebase and remove simple string
bb1b1ef [Daoyuan Wang] remove failing test
aa96735 [Daoyuan Wang] not cast for same type compare
30bf48b [Daoyuan Wang] resolve rebase conflict
617d1a8 [Daoyuan Wang] add date_udf case to white list
c37e848 [Daoyuan Wang] comment update
5429212 [Daoyuan Wang] change to long
f8f219f [Daoyuan Wang] revise according to Cheng Hao
0e0a4f5 [Daoyuan Wang] minor format
4ddcb92 [Daoyuan Wang] add java api for date
0e3110e [Daoyuan Wang] try to fix timezone issue
17fda35 [Daoyuan Wang] set test list
2dfbb5b [Daoyuan Wang] support date type
2014-10-13 13:33:12 -07:00
Reynold Xin 39ccabacf1 [SPARK-3861][SQL] Avoid rebuilding hash tables for broadcast joins on each partition
Author: Reynold Xin <rxin@apache.org>

Closes #2727 from rxin/SPARK-3861-broadcast-hash-2 and squashes the following commits:

9c7b1a2 [Reynold Xin] Revert "Reuse CompactBuffer in UniqueKeyHashedRelation."
97626a1 [Reynold Xin] Reuse CompactBuffer in UniqueKeyHashedRelation.
7fcffb5 [Reynold Xin] Make UniqueKeyHashedRelation private[joins].
18eb214 [Reynold Xin] Merge branch 'SPARK-3861-broadcast-hash' into SPARK-3861-broadcast-hash-1
4b9d0c9 [Reynold Xin] UniqueKeyHashedRelation.get should return null if the value is null.
e0ebdd1 [Reynold Xin] Added a test case.
90b58c0 [Reynold Xin] [SPARK-3861] Avoid rebuilding hash tables on each partition
0c0082b [Reynold Xin] Fix line length.
cbc664c [Reynold Xin] Rename join -> joins package.
a070d44 [Reynold Xin] Fix line length in HashJoin
a39be8c [Reynold Xin] [SPARK-3857] Create a join package for various join operators.
2014-10-13 11:50:42 -07:00
Cheng Lian 421382d0e7 [SPARK-3824][SQL] Sets in-memory table default storage level to MEMORY_AND_DISK
Using `MEMORY_AND_DISK` as default storage level for in-memory table caching. Due to the in-memory columnar representation, recomputing an in-memory cached table partitions can be very expensive.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #2686 from liancheng/spark-3824 and squashes the following commits:

35d2ed0 [Cheng Lian] Removes extra space
1ab7967 [Cheng Lian] Reduces test data size to fit DiskStore.getBytes()
ba565f0 [Cheng Lian] Maks CachedBatch serializable
07f0204 [Cheng Lian] Sets in-memory table default storage level to MEMORY_AND_DISK
2014-10-09 18:26:43 -07:00
Cheng Lian edf02da389 [SPARK-3654][SQL] Unifies SQL and HiveQL parsers
This PR is a follow up of #2590, and tries to introduce a top level SQL parser entry point for all SQL dialects supported by Spark SQL.

A top level parser `SparkSQLParser` is introduced to handle the syntaxes that all SQL dialects should recognize (e.g. `CACHE TABLE`, `UNCACHE TABLE` and `SET`, etc.). For all the syntaxes this parser doesn't recognize directly, it fallbacks to a specified function that tries to parse arbitrary input to a `LogicalPlan`. This function is typically another parser combinator like `SqlParser`. DDL syntaxes introduced in #2475 can be moved to here.

The `ExtendedHiveQlParser` now only handle Hive specific extensions.

Also took the chance to refactor/reformat `SqlParser` for better readability.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #2698 from liancheng/gen-sql-parser and squashes the following commits:

ceada76 [Cheng Lian] Minor styling fixes
9738934 [Cheng Lian] Minor refactoring, removes optional trailing ";" in the parser
bb2ab12 [Cheng Lian] SET property value can be empty string
ce8860b [Cheng Lian] Passes test suites
e86968e [Cheng Lian] Removes debugging code
8bcace5 [Cheng Lian] Replaces digit.+ to rep1(digit) (Scala style checking doesn't like it)
d15d54f [Cheng Lian] Unifies SQL and HiveQL parsers
2014-10-09 18:25:06 -07:00
Michael Armbrust 2837bf8548 [SPARK-3798][SQL] Store the output of a generator in a val
This prevents it from changing during serialization, leading to corrupted results.

Author: Michael Armbrust <michael@databricks.com>

Closes #2656 from marmbrus/generateBug and squashes the following commits:

efa32eb [Michael Armbrust] Store the output of a generator in a val. This prevents it from changing during serialization.
2014-10-09 17:54:02 -07:00
Nathan Howell bc3b6cb061 [SPARK-3858][SQL] Pass the generator alias into logical plan node
The alias parameter is being ignored, which makes it more difficult to specify a qualifier for Generator expressions.

Author: Nathan Howell <nhowell@godaddy.com>

Closes #2721 from NathanHowell/SPARK-3858 and squashes the following commits:

8aa0f43 [Nathan Howell] [SPARK-3858][SQL] Pass the generator alias into logical plan node
2014-10-09 15:03:01 -07:00
Yin Huai 1c7f0ab302 [SPARK-3339][SQL] Support for skipping json lines that fail to parse
This PR aims to provide a way to skip/query corrupt JSON records. To do so, we introduce an internal column to hold corrupt records (the default name is `_corrupt_record`. This name can be changed by setting the value of `spark.sql.columnNameOfCorruptRecord`). When there is a parsing error, we will put the corrupt record in its unparsed format to the internal column. Users can skip/query this column through SQL.

* To query those corrupt records
```
-- For Hive parser
SELECT `_corrupt_record`
FROM jsonTable
WHERE `_corrupt_record` IS NOT NULL
-- For our SQL parser
SELECT _corrupt_record
FROM jsonTable
WHERE _corrupt_record IS NOT NULL
```
* To skip corrupt records and query regular records
```
-- For Hive parser
SELECT field1, field2
FROM jsonTable
WHERE `_corrupt_record` IS NULL
-- For our SQL parser
SELECT field1, field2
FROM jsonTable
WHERE _corrupt_record IS NULL
```

Generally, it is not recommended to change the name of the internal column. If the name has to be changed to avoid possible name conflicts, you can use `sqlContext.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, <new column name>)` or `sqlContext.sql(SET spark.sql.columnNameOfCorruptRecord=<new column name>)`.

Author: Yin Huai <huai@cse.ohio-state.edu>

Closes #2680 from yhuai/corruptJsonRecord and squashes the following commits:

4c9828e [Yin Huai] Merge remote-tracking branch 'upstream/master' into corruptJsonRecord
309616a [Yin Huai] Change the default name of corrupt record to "_corrupt_record".
b4a3632 [Yin Huai] Merge remote-tracking branch 'upstream/master' into corruptJsonRecord
9375ae9 [Yin Huai] Set the column name of corrupt json record back to the default one after the unit test.
ee584c0 [Yin Huai] Provide a way to query corrupt json records as unparsed strings.
2014-10-09 14:57:27 -07:00
Mike Timper ec4d40e481 [SPARK-3853][SQL] JSON Schema support for Timestamp fields
In JSONRDD.scala, add 'case TimestampType' in the enforceCorrectType function and a toTimestamp function.

Author: Mike Timper <mike@aurorafeint.com>

Closes #2720 from mtimper/master and squashes the following commits:

9386ab8 [Mike Timper] Fix and tests for SPARK-3853
2014-10-09 14:02:27 -07:00
Reynold Xin bcb1ae049b [SPARK-3857] Create joins package for various join operators.
Author: Reynold Xin <rxin@apache.org>

Closes #2719 from rxin/sql-join-break and squashes the following commits:

0c0082b [Reynold Xin] Fix line length.
cbc664c [Reynold Xin] Rename join -> joins package.
a070d44 [Reynold Xin] Fix line length in HashJoin
a39be8c [Reynold Xin] [SPARK-3857] Create a join package for various join operators.
2014-10-08 18:17:01 -07:00
Cheng Lian a42cc08d21 [SPARK-3713][SQL] Uses JSON to serialize DataType objects
This PR uses JSON instead of `toString` to serialize `DataType`s. The latter is not only hard to parse but also flaky in many cases.

Since we already write schema information to Parquet metadata in the old style, we have to reserve the old `DataType` parser and ensure downward compatibility. The old parser is now renamed to `CaseClassStringParser` and moved into `object DataType`.

JoshRosen davies Please help review PySpark related changes, thanks!

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #2563 from liancheng/datatype-to-json and squashes the following commits:

fc92eb3 [Cheng Lian] Reverts debugging code, simplifies primitive type JSON representation
438c75f [Cheng Lian] Refactors PySpark DataType JSON SerDe per comments
6b6387b [Cheng Lian] Removes debugging code
6a3ee3a [Cheng Lian] Addresses per review comments
dc158b5 [Cheng Lian] Addresses PEP8 issues
99ab4ee [Cheng Lian] Adds compatibility est case for Parquet type conversion
a983a6c [Cheng Lian] Adds PySpark support
f608c6e [Cheng Lian] De/serializes DataType objects from/to JSON
2014-10-08 17:04:49 -07:00
Kousuke Saruta a85f24accd [SPARK-3831] [SQL] Filter rule Improvement and bool expression optimization.
If we write the filter which is always FALSE like

    SELECT * from person WHERE FALSE;

200 tasks will run. I think, 1 task is enough.

And current optimizer cannot optimize the case NOT is duplicated like

    SELECT * from person WHERE NOT ( NOT (age > 30));

The filter rule above should be simplified

Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>

Closes #2692 from sarutak/SPARK-3831 and squashes the following commits:

25f3e20 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3831
23c750c [Kousuke Saruta] Improved unsupported predicate test case
a11b9f3 [Kousuke Saruta] Modified NOT predicate test case in PartitionBatchPruningSuite
8ea872b [Kousuke Saruta] Fixed the number of tasks when the data of  LocalRelation is empty.
2014-10-08 17:03:47 -07:00
Cheng Lian 34b97a067d [SPARK-3645][SQL] Makes table caching eager by default and adds syntax for lazy caching
Although lazy caching for in-memory table seems consistent with the `RDD.cache()` API, it's relatively confusing for users who mainly work with SQL and not familiar with Spark internals. The `CACHE TABLE t; SELECT COUNT(*) FROM t;` pattern is also commonly seen just to ensure predictable performance.

This PR makes both the `CACHE TABLE t [AS SELECT ...]` statement and the `SQLContext.cacheTable()` API eager by default, and adds a new `CACHE LAZY TABLE t [AS SELECT ...]` syntax to provide lazy in-memory table caching.

Also, took the chance to make some refactoring: `CacheCommand` and `CacheTableAsSelectCommand` are now merged and renamed to `CacheTableCommand` since the former is strictly a special case of the latter. A new `UncacheTableCommand` is added for the `UNCACHE TABLE t` statement.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #2513 from liancheng/eager-caching and squashes the following commits:

fe92287 [Cheng Lian] Makes table caching eager by default and adds syntax for lazy caching
2014-10-05 17:51:59 -07:00
Michael Armbrust 6a1d48f4f0 [SPARK-3212][SQL] Use logical plan matching instead of temporary tables for table caching
_Also addresses: SPARK-1671, SPARK-1379 and SPARK-3641_

This PR introduces a new trait, `CacheManger`, which replaces the previous temporary table based caching system.  Instead of creating a temporary table that shadows an existing table with and equivalent cached representation, the cached manager maintains a separate list of logical plans and their cached data.  After optimization, this list is searched for any matching plan fragments.  When a matching plan fragment is found it is replaced with the cached data.

There are several advantages to this approach:
 - Calling .cache() on a SchemaRDD now works as you would expect, and uses the more efficient columnar representation.
 - Its now possible to provide a list of temporary tables, without having to decide if a given table is actually just a  cached persistent table. (To be done in a follow-up PR)
 - In some cases it is possible that cached data will be used, even if a cached table was not explicitly requested.  This is because we now look at the logical structure instead of the table name.
 - We now correctly invalidate when data is inserted into a hive table.

Author: Michael Armbrust <michael@databricks.com>

Closes #2501 from marmbrus/caching and squashes the following commits:

63fbc2c [Michael Armbrust] Merge remote-tracking branch 'origin/master' into caching.
0ea889e [Michael Armbrust] Address comments.
1e23287 [Michael Armbrust] Add support for cache invalidation for hive inserts.
65ed04a [Michael Armbrust] fix tests.
bdf9a3f [Michael Armbrust] Merge remote-tracking branch 'origin/master' into caching
b4b77f2 [Michael Armbrust] Address comments
6923c9d [Michael Armbrust] More comments / tests
80f26ac [Michael Armbrust] First draft of improved semantics for Spark SQL caching.
2014-10-03 12:34:27 -07:00
Cheng Lian a31f4ff22f [SQL] Made Command.sideEffectResult protected
Considering `Command.executeCollect()` simply delegates to `Command.sideEffectResult`, we no longer need to leave the latter `protected[sql]`.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #2431 from liancheng/narrow-scope and squashes the following commits:

1bfc16a [Cheng Lian] Made Command.sideEffectResult protected
2014-10-01 16:00:29 -07:00
Reynold Xin f350cd3070 [SPARK-3543] TaskContext remaining cleanup work.
Author: Reynold Xin <rxin@apache.org>

Closes #2560 from rxin/TaskContext and squashes the following commits:

9eff95a [Reynold Xin] [SPARK-3543] remaining cleanup work.
2014-09-28 20:32:54 -07:00
Michael Armbrust a08153f8a3 [SPARK-3646][SQL] Copy SQL configuration from SparkConf when a SQLContext is created.
This will allow us to take advantage of things like the spark.defaults file.

Author: Michael Armbrust <michael@databricks.com>

Closes #2493 from marmbrus/copySparkConf and squashes the following commits:

0bd1377 [Michael Armbrust] Copy SQL configuration from SparkConf when a SQLContext is created.
2014-09-23 12:27:12 -07:00
ravipesala 3b8eefa9b8 [SPARK-3536][SQL] SELECT on empty parquet table throws exception
It returns null metadata from parquet if querying on empty parquet file while calculating splits.So added null check and returns the empty splits.

Author : ravipesala ravindra.pesalahuawei.com

Author: ravipesala <ravindra.pesala@huawei.com>

Closes #2456 from ravipesala/SPARK-3536 and squashes the following commits:

1e81a50 [ravipesala] Fixed the issue when querying on empty parquet file.
2014-09-23 11:52:13 -07:00
Michael Armbrust 293ce85145 [SPARK-3414][SQL] Replace LowerCaseSchema with Resolver
**This PR introduces a subtle change in semantics for HiveContext when using the results in Python or Scala.  Specifically, while resolution remains case insensitive, it is now case preserving.**

_This PR is a follow up to #2293 (and to a lesser extent #2262 #2334)._

In #2293 the catalog was changed to store analyzed logical plans instead of unresolved ones.  While this change fixed the reported bug (which was caused by yet another instance of us forgetting to put in a `LowerCaseSchema` operator) it had the consequence of breaking assumptions made by `MultiInstanceRelation`.  Specifically, we can't replace swap out leaf operators in a tree without rewriting changed expression ids (which happens when you self join the same RDD that has been registered as a temp table).

In this PR, I instead remove the need to insert `LowerCaseSchema` operators at all, by moving the concern of matching up identifiers completely into analysis.  Doing so allows the test cases from both #2293 and #2262 to pass at the same time (and likely fixes a slew of other "unknown unknown" bugs).

While it is rolled back in this PR, storing the analyzed plan might actually be a good idea.  For instance, it is kind of confusing if you register a temporary table, change the case sensitivity of resolution and now you can't query that table anymore.  This can be addressed in a follow up PR.

Follow-ups:
 - Configurable case sensitivity
 - Consider storing analyzed plans for temp tables

Author: Michael Armbrust <michael@databricks.com>

Closes #2382 from marmbrus/lowercase and squashes the following commits:

c21171e [Michael Armbrust] Ensure the resolver is used for field lookups and ensure that case insensitive resolution is still case preserving.
d4320f1 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into lowercase
2de881e [Michael Armbrust] Address comments.
219805a [Michael Armbrust] style
5b93711 [Michael Armbrust] Replace LowerCaseSchema with Resolver.
2014-09-20 16:41:14 -07:00
Sandy Ryza 3b9cd13ebc SPARK-3605. Fix typo in SchemaRDD.
Author: Sandy Ryza <sandy@cloudera.com>

Closes #2460 from sryza/sandy-spark-3605 and squashes the following commits:

09d940b [Sandy Ryza] SPARK-3605. Fix typo in SchemaRDD.
2014-09-19 15:34:48 -07:00
ravipesala 5522151eb1 [SPARK-2594][SQL] Support CACHE TABLE <name> AS SELECT ...
This feature allows user to add cache table from the select query.
Example : ```CACHE TABLE testCacheTable AS SELECT * FROM TEST_TABLE```
Spark takes this type of SQL as command and it does lazy caching just like ```SQLContext.cacheTable```, ```CACHE TABLE <name>``` does.
It can be executed from both SQLContext and HiveContext.

Recreated the pull request after rebasing with master.And fixed all the comments raised in previous pull requests.
https://github.com/apache/spark/pull/2381
https://github.com/apache/spark/pull/2390

Author : ravipesala ravindra.pesalahuawei.com

Author: ravipesala <ravindra.pesala@huawei.com>

Closes #2397 from ravipesala/SPARK-2594 and squashes the following commits:

a5f0beb [ravipesala] Simplified the code as per Admin comment.
8059cd2 [ravipesala] Changed the behaviour from eager caching to lazy caching.
d6e469d [ravipesala] Code review comments by Admin are handled.
c18aa38 [ravipesala] Merge remote-tracking branch 'remotes/ravipesala/Add-Cache-table-as' into SPARK-2594
394d5ca [ravipesala] Changed style
fb1759b [ravipesala] Updated as per Admin comments
8c9993c [ravipesala] Changed the style
d8b37b2 [ravipesala] Updated as per the comments by Admin
bc0bffc [ravipesala] Merge remote-tracking branch 'ravipesala/Add-Cache-table-as' into Add-Cache-table-as
e3265d0 [ravipesala] Updated the code as per the comments by Admin in pull request.
724b9db [ravipesala] Changed style
aaf5b59 [ravipesala] Added comment
dc33895 [ravipesala] Updated parser to support add cache table command
b5276b2 [ravipesala] Updated parser to support add cache table command
eebc0c1 [ravipesala] Add CACHE TABLE <name> AS SELECT ...
6758f80 [ravipesala] Changed style
7459ce3 [ravipesala] Added comment
13c8e27 [ravipesala] Updated parser to support add cache table command
4e858d8 [ravipesala] Updated parser to support add cache table command
b803fc8 [ravipesala] Add CACHE TABLE <name> AS SELECT ...
2014-09-19 15:31:57 -07:00
Aaron Staple 8e7ae477ba [SPARK-2314][SQL] Override collect and take in python library, and count in java library, with optimized versions.
SchemaRDD overrides RDD functions, including collect, count, and take, with optimized versions making use of the query optimizer.  The java and python interface classes wrapping SchemaRDD need to ensure the optimized versions are called as well.  This patch overrides relevant calls in the python and java interfaces with optimized versions.

Adds a new Row serialization pathway between python and java, based on JList[Array[Byte]] versus the existing RDD[Array[Byte]]. I wasn’t overjoyed about doing this, but I noticed that some QueryPlans implement optimizations in executeCollect(), which outputs an Array[Row] rather than the typical RDD[Row] that can be shipped to python using the existing serialization code. To me it made sense to ship the Array[Row] over to python directly instead of converting it back to an RDD[Row] just for the purpose of sending the Rows to python using the existing serialization code.

Author: Aaron Staple <aaron.staple@gmail.com>

Closes #1592 from staple/SPARK-2314 and squashes the following commits:

89ff550 [Aaron Staple] Merge with master.
6bb7b6c [Aaron Staple] Fix typo.
b56d0ac [Aaron Staple] [SPARK-2314][SQL] Override count in JavaSchemaRDD, forwarding to SchemaRDD's count.
0fc9d40 [Aaron Staple] Fix comment typos.
f03cdfa [Aaron Staple] [SPARK-2314][SQL] Override collect and take in sql.py, forwarding to SchemaRDD's collect.
2014-09-16 11:45:35 -07:00
Yin Huai 7583699873 [SPARK-3308][SQL] Ability to read JSON Arrays as tables
This PR aims to support reading top level JSON arrays and take every element in such an array as a row (an empty array will not generate a row).

JIRA: https://issues.apache.org/jira/browse/SPARK-3308

Author: Yin Huai <huai@cse.ohio-state.edu>

Closes #2400 from yhuai/SPARK-3308 and squashes the following commits:

990077a [Yin Huai] Handle top level JSON arrays.
2014-09-16 11:40:28 -07:00
Cheng Hao 86d253ec4e [SPARK-3527] [SQL] Strip the string message
Author: Cheng Hao <hao.cheng@intel.com>

Closes #2392 from chenghao-intel/trim and squashes the following commits:

e52024f [Cheng Hao] trim the string message
2014-09-16 11:21:30 -07:00
Michael Armbrust 0f8c4edf4e [SQL] Decrease partitions when testing
Author: Michael Armbrust <michael@databricks.com>

Closes #2164 from marmbrus/shufflePartitions and squashes the following commits:

0da1e8c [Michael Armbrust] test hax
ef2d985 [Michael Armbrust] more test hacks.
2dabae3 [Michael Armbrust] more test fixes
0bdbf21 [Michael Armbrust] Make parquet tests less order dependent
b42eeab [Michael Armbrust] increase test parallelism
80453d5 [Michael Armbrust] Decrease partitions when testing
2014-09-13 16:08:04 -07:00
Cheng Lian 74049249ab [SPARK-3294][SQL] Eliminates boxing costs from in-memory columnar storage
This is a major refactoring of the in-memory columnar storage implementation, aims to eliminate boxing costs from critical paths (building/accessing column buffers) as much as possible. The basic idea is to refactor all major interfaces into a row-based form and use them together with `SpecificMutableRow`. The difficult part is how to adapt all compression schemes, esp. `RunLengthEncoding` and `DictionaryEncoding`, to this design. Since in-memory compression is disabled by default for now, and this PR should be strictly better than before no matter in-memory compression is enabled or not, maybe I'll finish that part in another PR.

**UPDATE** This PR also took the chance to optimize `HiveTableScan` by

1. leveraging `SpecificMutableRow` to avoid boxing cost, and
1. building specific `Writable` unwrapper functions a head of time to avoid per row pattern matching and branching costs.

TODO

- [x] Benchmark
- [ ] ~~Eliminate boxing costs in `RunLengthEncoding`~~ (left to future PRs)
- [ ] ~~Eliminate boxing costs in `DictionaryEncoding` (seems not easy to do without specializing `DictionaryEncoding` for every supported column type)~~  (left to future PRs)

## Micro benchmark

The benchmark uses a 10 million line CSV table consists of bytes, shorts, integers, longs, floats and doubles, measures the time to build the in-memory version of this table, and the time to scan the whole in-memory table.

Benchmark code can be found [here](https://gist.github.com/liancheng/fe70a148de82e77bd2c8#file-hivetablescanbenchmark-scala). Script used to generate the input table can be found [here](https://gist.github.com/liancheng/fe70a148de82e77bd2c8#file-tablegen-scala).

Speedup:

- Hive table scanning + column buffer building: **18.74%**

  The original benchmark uses 1K as in-memory batch size, when increased to 10K, it can be 28.32% faster.

- In-memory table scanning: **7.95%**

Before:

        | Building | Scanning
------- | -------- | --------
1       | 16472    | 525
2       | 16168    | 530
3       | 16386    | 529
4       | 16184    | 538
5       | 16209    | 521
Average | 16283.8  | 528.6

After:

        | Building | Scanning
------- | -------- | --------
1       | 13124    | 458
2       | 13260    | 529
3       | 12981    | 463
4       | 13214    | 483
5       | 13583    | 500
Average | 13232.4  | 486.6

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #2327 from liancheng/prevent-boxing/unboxing and squashes the following commits:

4419fe4 [Cheng Lian] Addressing comments
e5d2cf2 [Cheng Lian] Bug fix: should call setNullAt when field value is null to avoid NPE
8b8552b [Cheng Lian] Only checks for partition batch pruning flag once
489f97b [Cheng Lian] Bug fix: TableReader.fillObject uses wrong ordinals
97bbc4e [Cheng Lian] Optimizes hive.TableReader by by providing specific Writable unwrappers a head of time
3dc1f94 [Cheng Lian] Minor changes to eliminate row object creation
5b39cb9 [Cheng Lian] Lowers log level of compression scheme details
f2a7890 [Cheng Lian] Use SpecificMutableRow in InMemoryColumnarTableScan to avoid boxing
9cf30b0 [Cheng Lian] Added row based ColumnType.append/extract
456c366 [Cheng Lian] Made compression decoder row based
edac3cd [Cheng Lian] Makes ColumnAccessor.extractSingle row based
8216936 [Cheng Lian] Removes boxing cost in IntDelta and LongDelta by providing specialized implementations
b70d519 [Cheng Lian] Made some in-memory columnar storage interfaces row-based
2014-09-13 15:08:30 -07:00
Yin Huai 4bc9e046cb [SPARK-3390][SQL] sqlContext.jsonRDD fails on a complex structure of JSON array and JSON object nesting
This PR aims to correctly handle JSON arrays in the type of `ArrayType(...(ArrayType(StructType)))`.

JIRA: https://issues.apache.org/jira/browse/SPARK-3390.

Author: Yin Huai <huai@cse.ohio-state.edu>

Closes #2364 from yhuai/SPARK-3390 and squashes the following commits:

46db418 [Yin Huai] Handle JSON arrays in the type of ArrayType(...(ArrayType(StructType))).
2014-09-11 15:23:33 -07:00
Cheng Hao ca83f1e2c4 [SPARK-2917] [SQL] Avoid table creation in logical plan analyzing for CTAS
Author: Cheng Hao <hao.cheng@intel.com>

Closes #1846 from chenghao-intel/ctas and squashes the following commits:

56a0578 [Cheng Hao] remove the unused imports
9a57abc [Cheng Hao] Avoid table creation in logical plan analyzing
2014-09-11 11:57:01 -07:00
Michael Armbrust f92cde24e8 [SPARK-3447][SQL] Remove explicit conversion with JListWrapper to avoid NPE
Author: Michael Armbrust <michael@databricks.com>

Closes #2323 from marmbrus/kryoJListNPE and squashes the following commits:

9634f11 [Michael Armbrust] Rollback JSON RDD changes
4d4d93c [Michael Armbrust] Merge remote-tracking branch 'origin/master' into kryoJListNPE
646976b [Michael Armbrust] Fix JSON RDD Conversion too
59065bc [Michael Armbrust] Remove explicit conversion to avoid NPE
2014-09-10 20:59:40 -07:00
Daoyuan Wang f0c87dc86a [SPARK-3363][SQL] Type Coercion should promote null to all other types.
Type Coercion should support every type to have null value

Author: Daoyuan Wang <daoyuan.wang@intel.com>
Author: Michael Armbrust <michael@databricks.com>

Closes #2246 from adrian-wang/spark3363-0 and squashes the following commits:

c6241de [Daoyuan Wang] minor code clean
595b417 [Daoyuan Wang] Merge pull request #2 from marmbrus/pr/2246
832e640 [Michael Armbrust] reduce code duplication
ef6f986 [Daoyuan Wang] make double boolean miss in jsonRDD compatibleType
c619f0a [Daoyuan Wang] Type Coercion should support every type to have null value
2014-09-10 10:48:36 -07:00
Eric Liang b734ed0c22 [SPARK-3395] [SQL] DSL sometimes incorrectly reuses attribute ids, breaking queries
This resolves https://issues.apache.org/jira/browse/SPARK-3395

Author: Eric Liang <ekl@google.com>

Closes #2266 from ericl/spark-3395 and squashes the following commits:

7f2b6f0 [Eric Liang] add regression test
05bd1e4 [Eric Liang] in the dsl, create a new schema instance in each applySchema
2014-09-09 23:47:12 -07:00
Cheng Lian dc1dbf206e [SPARK-3414][SQL] Stores analyzed logical plan when registering a temp table
Case insensitivity breaks when unresolved relation contains attributes with uppercase letters in their names, because we store unanalyzed logical plan when registering temp tables while the `CaseInsensitivityAttributeReferences` batch runs before the `Resolution` batch. To fix this issue, we need to store analyzed logical plan.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #2293 from liancheng/spark-3414 and squashes the following commits:

d9fa1d6 [Cheng Lian] Stores analyzed logical plan when registering a temp table
2014-09-08 19:08:05 -07:00
Eric Liang 7db53391f1 [SPARK-3349][SQL] Output partitioning of limit should not be inherited from child
This resolves https://issues.apache.org/jira/browse/SPARK-3349

Author: Eric Liang <ekl@google.com>

Closes #2262 from ericl/spark-3349 and squashes the following commits:

3e1b05c [Eric Liang] add regression test
ac32723 [Eric Liang] make limit/takeOrdered output SinglePartition
2014-09-08 16:14:36 -07:00
Reynold Xin e2614038e7 [SPARK-3408] Fixed Limit operator so it works with sort-based shuffle.
Author: Reynold Xin <rxin@apache.org>

Closes #2281 from rxin/sql-limit-sort and squashes the following commits:

1ef7780 [Reynold Xin] [SPARK-3408] Fixed Limit operator so it works with sort-based shuffle.
2014-09-07 18:42:24 -07:00
Michael Armbrust 39db1bfdab [SQL] Update SQL Programming Guide
Author: Michael Armbrust <michael@databricks.com>
Author: Yin Huai <huai@cse.ohio-state.edu>

Closes #2258 from marmbrus/sqlDocUpdate and squashes the following commits:

f3d450b [Michael Armbrust] fix brackets
bea3bfa [Michael Armbrust] Davies suggestions
3a29fe2 [Michael Armbrust] tighten visibility
a71aa36 [Michael Armbrust] Draft of doc updates
52932c0 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into sqlDocUpdate
1e8c849 [Yin Huai] Update the example used for applySchema.
9457c39 [Yin Huai] Update doc.
31ba240 [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeDoc
29bc668 [Yin Huai] Draft doc for data type and schema APIs.
2014-09-07 21:34:46 -04:00
Reynold Xin 1b9001f78d [SPARK-3409][SQL] Avoid pulling in Exchange operator itself in Exchange's closures.
This is a tiny teeny optimization to move the if check of sortBasedShuffledOn to outside the closures so the closures don't need to pull in the entire Exchange operator object.

Author: Reynold Xin <rxin@apache.org>

Closes #2282 from rxin/SPARK-3409 and squashes the following commits:

1de3f88 [Reynold Xin] [SPARK-3409][SQL] Avoid pulling in Exchange operator itself in Exchange's closures.
2014-09-06 00:33:00 -07:00
Cheng Hao 1904bac38d [SPARK-3392] [SQL] Show value spark.sql.shuffle.partitions for mapred.reduce.tasks
This is a tiny fix for getting the value of "mapred.reduce.tasks", which make more sense for the hive user.
As well as the command "set -v", which should output verbose information for all of the key/values.

Author: Cheng Hao <hao.cheng@intel.com>

Closes #2261 from chenghao-intel/set_mapreduce_tasks and squashes the following commits:

653858a [Cheng Hao] show value spark.sql.shuffle.partitions for mapred.reduce.tasks
2014-09-04 19:16:12 -07:00
Liang-Chi Hsieh 3eb6ef316c [SPARK-3310][SQL] Directly use currentTable without unnecessary implicit conversion
We can directly use currentTable there without unnecessary implicit conversion.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #2203 from viirya/direct_use_inmemoryrelation and squashes the following commits:

4741d02 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into direct_use_inmemoryrelation
b671f67 [Liang-Chi Hsieh] Can directly use currentTable there without unnecessary implicit conversion.
2014-09-04 18:46:09 -07:00
Kousuke Saruta dc1ba9e9fc [SPARK-3378] [DOCS] Replace the word "SparkSQL" with right word "Spark SQL"
Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>

Closes #2251 from sarutak/SPARK-3378 and squashes the following commits:

0bfe234 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3378
bb5938f [Kousuke Saruta] Replaced rest of "SparkSQL" with "Spark SQL"
6df66de [Kousuke Saruta] Replaced "SparkSQL" with "Spark SQL"
2014-09-04 15:06:08 -07:00
Davies Liu c5cbc49233 [SPARK-3335] [SQL] [PySpark] support broadcast in Python UDF
After this patch, broadcast can be used in Python UDF.

Author: Davies Liu <davies.liu@gmail.com>

Closes #2243 from davies/udf_broadcast and squashes the following commits:

7b88861 [Davies Liu] support broadcast in UDF
2014-09-03 19:08:39 -07:00
Cheng Lian 248067adbe [SPARK-2961][SQL] Use statistics to prune batches within cached partitions
This PR is based on #1883 authored by marmbrus. Key differences:

1. Batch pruning instead of partition pruning

   When #1883 was authored, batched column buffer building (#1880) hadn't been introduced. This PR combines these two and provide partition batch level pruning, which leads to smaller memory footprints and can generally skip more elements. The cost is that the pruning predicates are evaluated more frequently (partition number multiplies batch number per partition).

1. More filters are supported

   Filter predicates consist of `=`, `<`, `<=`, `>`, `>=` and their conjunctions and disjunctions are supported.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #2188 from liancheng/in-mem-batch-pruning and squashes the following commits:

68cf019 [Cheng Lian] Marked sqlContext as @transient
4254f6c [Cheng Lian] Enables in-memory partition pruning in PartitionBatchPruningSuite
3784105 [Cheng Lian] Overrides InMemoryColumnarTableScan.sqlContext
d2a1d66 [Cheng Lian] Disables in-memory partition pruning by default
062c315 [Cheng Lian] HiveCompatibilitySuite code cleanup
16b77bf [Cheng Lian] Fixed pruning predication conjunctions and disjunctions
16195c5 [Cheng Lian] Enabled both disjunction and conjunction
89950d0 [Cheng Lian] Worked around Scala style check
9c167f6 [Cheng Lian] Minor code cleanup
3c4d5c7 [Cheng Lian] Minor code cleanup
ea59ee5 [Cheng Lian] Renamed PartitionSkippingSuite to PartitionBatchPruningSuite
fc517d0 [Cheng Lian] More test cases
1868c18 [Cheng Lian] Code cleanup, bugfix, and adding tests
cb76da4 [Cheng Lian] Added more predicate filters, fixed table scan stats for testing purposes
385474a [Cheng Lian] Merge branch 'inMemStats' into in-mem-batch-pruning
2014-09-03 18:59:26 -07:00
Cheng Lian f48420fde5 [SPARK-2973][SQL] Lightweight SQL commands without distributed jobs when calling .collect()
By overriding `executeCollect()` in physical plan classes of all commands, we can avoid to kick off a distributed job when collecting result of a SQL command, e.g. `sql("SET").collect()`.

Previously, `Command.sideEffectResult` returns a `Seq[Any]`, and the `execute()` method in sub-classes of `Command` typically convert that to a `Seq[Row]` then parallelize it to an RDD. Now with this PR, `sideEffectResult` is required to return a `Seq[Row]` directly, so that `executeCollect()` can directly leverage that and be factored to the `Command` parent class.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #2215 from liancheng/lightweight-commands and squashes the following commits:

3fbef60 [Cheng Lian] Factored execute() method of physical commands to parent class Command
5a0e16c [Cheng Lian] Passes test suites
e0e12e9 [Cheng Lian] Refactored Command.sideEffectResult and Command.executeCollect
995bdd8 [Cheng Lian] Cleaned up DescribeHiveTableCommand
542977c [Cheng Lian] Avoids confusion between logical and physical plan by adding package prefixes
55b2aa5 [Cheng Lian] Avoids distributed jobs when execution SQL commands
2014-09-03 18:57:20 -07:00
Liang-Chi Hsieh 24ab384018 [SPARK-3300][SQL] No need to call clear() and shorten build()
The function `ensureFreeSpace` in object `ColumnBuilder` clears old buffer before copying its content to new buffer. This PR fixes it.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #2195 from viirya/fix_buffer_clear and squashes the following commits:

792f009 [Liang-Chi Hsieh] no need to call clear(). use flip() instead of calling limit(), position() and rewind().
df2169f [Liang-Chi Hsieh] should clean old buffer after copying its content.
2014-09-02 20:51:25 -07:00
Cheng Lian 19d3e1e8e9 [SQL] Renamed ColumnStat to ColumnMetrics to avoid confusion between ColumnStats
Class names of these two are just too similar.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #2189 from liancheng/column-metrics and squashes the following commits:

8bb3b21 [Cheng Lian] Renamed ColumnStat to ColumnMetrics to avoid confusion between ColumnStats
2014-09-02 20:49:36 -07:00
scwf 725715cbf3 [SPARK-3010] fix redundant conditional
https://issues.apache.org/jira/browse/SPARK-3010

this pr is to fix redundant conditional in spark, such as
1.
private[spark] def codegenEnabled: Boolean =
if (getConf(CODEGEN_ENABLED, "false") == "true") true else false
2.
x => if (x == 2) true else false
...

Author: scwf <wangfei1@huawei.com>
Author: wangfei <wangfei_hello@126.com>

Closes #1992 from scwf/condition and squashes the following commits:

b2a044a [scwf] merge SecurityManager
e16239c [scwf] fix confilct
6811401 [scwf] fix merge confilct
0824df4 [scwf] Merge branch 'master' of https://github.com/apache/spark into patch-4
e274515 [scwf] fix redundant conditions
d032bf9 [wangfei] [SQL]Excess judgment
2014-08-31 14:02:11 -07:00
Cheng Lian 32b18dd52c [SPARK-3320][SQL] Made batched in-memory column buffer building work for SchemaRDDs with empty partitions
Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #2213 from liancheng/spark-3320 and squashes the following commits:

45a0139 [Cheng Lian] Fixed typo in InMemoryColumnarQuerySuite
f67067d [Cheng Lian] Fixed SPARK-3320
2014-08-29 18:16:47 -07:00
Cheng Hao dc4d577c65 [SPARK-3198] [SQL] Remove the TreeNode.id
Thus id property of the TreeNode API does save time in a faster way to compare 2 TreeNodes, it is kind of performance bottleneck during the expression object creation in a multi-threading env (because of the memory barrier).
Fortunately, the tree node comparison only happen once in master, so even we remove it, the entire performance will not be affected.

Author: Cheng Hao <hao.cheng@intel.com>

Closes #2155 from chenghao-intel/treenode and squashes the following commits:

7cf2cd2 [Cheng Hao] Remove the implicit keyword for TreeNodeRef and some other small issues
5873415 [Cheng Hao] Remove the TreeNode.id
2014-08-29 15:32:26 -07:00
Michael Armbrust 76e3ba4264 [SPARK-3230][SQL] Fix udfs that return structs
We need to convert the case classes into Rows.

Author: Michael Armbrust <michael@databricks.com>

Closes #2133 from marmbrus/structUdfs and squashes the following commits:

189722f [Michael Armbrust] Merge remote-tracking branch 'origin/master' into structUdfs
8e29b1c [Michael Armbrust] Use existing function
d8d0b76 [Michael Armbrust] Fix udfs that return structs
2014-08-28 00:15:23 -07:00
Cheng Lian 68f75dcdfe [SQL] Fixed 2 comment typos in SQLConf
Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #2172 from liancheng/sqlconf-typo and squashes the following commits:

115cc71 [Cheng Lian] Fixed 2 comment typos in SQLConf
2014-08-28 00:08:09 -07:00
Michael Armbrust 7d2a7a91f2 [SPARK-3235][SQL] Ensure in-memory tables don't always broadcast.
Author: Michael Armbrust <michael@databricks.com>

Closes #2147 from marmbrus/inMemDefaultSize and squashes the following commits:

5390360 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into inMemDefaultSize
14204d3 [Michael Armbrust] Set the context before creating SparkLogicalPlans.
8da4414 [Michael Armbrust] Make sure we throw errors when leaf nodes fail to provide statistcs
18ce029 [Michael Armbrust] Ensure in-memory tables don't always broadcast.
2014-08-27 15:14:08 -07:00
chutium 48f42781de [SPARK-3138][SQL] sqlContext.parquetFile should be able to take a single file as parameter
```if (!fs.getFileStatus(path).isDir) throw Exception``` make no sense after this commit #1370

be careful if someone is working on SPARK-2551, make sure the new change passes test case ```test("Read a parquet file instead of a directory")```

Author: chutium <teng.qiu@gmail.com>

Closes #2044 from chutium/parquet-singlefile and squashes the following commits:

4ae477f [chutium] [SPARK-3138][SQL] sqlContext.parquetFile should be able to take a single file as parameter
2014-08-27 13:13:04 -07:00
Michael Armbrust e1139dd60e [SPARK-3237][SQL] Fix parquet filters with UDFs
Author: Michael Armbrust <michael@databricks.com>

Closes #2153 from marmbrus/parquetFilters and squashes the following commits:

712731a [Michael Armbrust] Use closure serializer for sending filters.
1e83f80 [Michael Armbrust] Clean udf functions.
2014-08-27 00:59:23 -07:00
Takuya UESHIN 727cb25bcc [SPARK-3036][SPARK-3037][SQL] Add MapType/ArrayType containing null value support to Parquet.
JIRA:
- https://issues.apache.org/jira/browse/SPARK-3036
- https://issues.apache.org/jira/browse/SPARK-3037

Currently this uses the following Parquet schema for `MapType` when `valueContainsNull` is `true`:

```
message root {
  optional group a (MAP) {
    repeated group map (MAP_KEY_VALUE) {
      required int32 key;
      optional int32 value;
    }
  }
}
```

for `ArrayType` when `containsNull` is `true`:

```
message root {
  optional group a (LIST) {
    repeated group bag {
      optional int32 array;
    }
  }
}
```

We have to think about compatibilities with older version of Spark or Hive or others I mentioned in the JIRA issues.

Notice:
This PR is based on #1963 and #1889.
Please check them first.

/cc marmbrus, yhuai

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #2032 from ueshin/issues/SPARK-3036_3037 and squashes the following commits:

4e8e9e7 [Takuya UESHIN] Add ArrayType containing null value support to Parquet.
013c2ca [Takuya UESHIN] Add MapType containing null value support to Parquet.
62989de [Takuya UESHIN] Merge branch 'issues/SPARK-2969' into issues/SPARK-3036_3037
8e38b53 [Takuya UESHIN] Merge branch 'issues/SPARK-3063' into issues/SPARK-3036_3037
2014-08-26 18:28:41 -07:00
Michael Armbrust c4787a3690 [SPARK-3194][SQL] Add AttributeSet to fix bugs with invalid comparisons of AttributeReferences
It is common to want to describe sets of attributes that are in various parts of a query plan.  However, the semantics of putting `AttributeReference` objects into a standard Scala `Set` result in subtle bugs when references differ cosmetically.  For example, with case insensitive resolution it is possible to have two references to the same attribute whose names are not equal.

In this PR I introduce a new abstraction, an `AttributeSet`, which performs all comparisons using the globally unique `ExpressionId` instead of case class equality.  (There is already a related class, [`AttributeMap`](https://github.com/marmbrus/spark/blob/inMemStats/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala#L32))  This new type of set is used to fix a bug in the optimizer where needed attributes were getting projected away underneath join operators.

I also took this opportunity to refactor the expression and query plan base classes.  In all but one instance the logic for computing the `references` of an `Expression` were the same.  Thus, I moved this logic into the base class.

For query plans the semantics of  the `references` method were ill defined (is it the references output? or is it those used by expression evaluation? or what?).  As a result, this method wasn't really used very much.  So, I removed it.

TODO:
 - [x] Finish scala doc for `AttributeSet`
 - [x] Scan the code for other instances of `Set[Attribute]` and refactor them.
 - [x] Finish removing `references` from `QueryPlan`

Author: Michael Armbrust <michael@databricks.com>

Closes #2109 from marmbrus/attributeSets and squashes the following commits:

1c0dae5 [Michael Armbrust] work on serialization bug.
9ba868d [Michael Armbrust] Merge remote-tracking branch 'origin/master' into attributeSets
3ae5288 [Michael Armbrust] review comments
40ce7f6 [Michael Armbrust] style
d577cc7 [Michael Armbrust] Scaladoc
cae5d22 [Michael Armbrust] remove more references implementations
d6e16be [Michael Armbrust] Remove more instances of "def references" and normal sets of attributes.
fc26b49 [Michael Armbrust] Add AttributeSet class, remove references from Expression.
2014-08-26 16:29:14 -07:00
Takuya UESHIN 6b5584ef1c [SPARK-3063][SQL] ExistingRdd should convert Map to catalyst Map.
Currently `ExistingRdd.convertToCatalyst` doesn't convert `Map` value.

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #1963 from ueshin/issues/SPARK-3063 and squashes the following commits:

3ba41f2 [Takuya UESHIN] Merge branch 'master' into issues/SPARK-3063
4d7bae2 [Takuya UESHIN] Merge branch 'master' into issues/SPARK-3063
9321379 [Takuya UESHIN] Merge branch 'master' into issues/SPARK-3063
d8a900a [Takuya UESHIN] Make ExistingRdd.convertToCatalyst be able to convert Map value.
2014-08-26 15:04:08 -07:00
Takuya UESHIN 98c2bb0bbd [SPARK-2969][SQL] Make ScalaReflection be able to handle ArrayType.containsNull and MapType.valueContainsNull.
Make `ScalaReflection` be able to handle like:

- `Seq[Int]` as `ArrayType(IntegerType, containsNull = false)`
- `Seq[java.lang.Integer]` as `ArrayType(IntegerType, containsNull = true)`
- `Map[Int, Long]` as `MapType(IntegerType, LongType, valueContainsNull = false)`
- `Map[Int, java.lang.Long]` as `MapType(IntegerType, LongType, valueContainsNull = true)`

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #1889 from ueshin/issues/SPARK-2969 and squashes the following commits:

24f1c5c [Takuya UESHIN] Change the default value of ArrayType.containsNull to true in Python API.
79f5b65 [Takuya UESHIN] Change the default value of ArrayType.containsNull to true in Java API.
7cd1a7a [Takuya UESHIN] Fix json test failures.
2cfb862 [Takuya UESHIN] Change the default value of ArrayType.containsNull to true.
2f38e61 [Takuya UESHIN] Revert the default value of MapTypes.valueContainsNull.
9fa02f5 [Takuya UESHIN] Fix a test failure.
1a9a96b [Takuya UESHIN] Modify ScalaReflection to handle ArrayType.containsNull and MapType.valueContainsNull.
2014-08-26 13:22:55 -07:00
chutium 8856c3d860 [SPARK-3131][SQL] Allow user to set parquet compression codec for writing ParquetFile in SQLContext
There are 4 different compression codec available for ```ParquetOutputFormat```

in Spark SQL, it was set as a hard-coded value in ```ParquetRelation.defaultCompression```

original discuss:
https://github.com/apache/spark/pull/195#discussion-diff-11002083

i added a new config property in SQLConf to allow user to change this compression codec, and i used similar short names syntax as described in SPARK-2953 #1873 (https://github.com/apache/spark/pull/1873/files#diff-0)

btw, which codec should we use as default? it was set to GZIP (https://github.com/apache/spark/pull/195/files#diff-4), but i think maybe we should change this to SNAPPY, since SNAPPY is already the default codec for shuffling in spark-core (SPARK-2469, #1415), and parquet-mr supports Snappy codec natively (e440108de5).

Author: chutium <teng.qiu@gmail.com>

Closes #2039 from chutium/parquet-compression and squashes the following commits:

2f44964 [chutium] [SPARK-3131][SQL] parquet compression default codec set to snappy, also in test suite
e578e21 [chutium] [SPARK-3131][SQL] compression codec config property name and default codec set to snappy
21235dc [chutium] [SPARK-3131][SQL] Allow user to set parquet compression codec for writing ParquetFile in SQLContext
2014-08-26 11:51:26 -07:00
Chia-Yung Su 4243bb6634 [SPARK-3011][SQL] _temporary directory should be filtered out by sqlContext.parquetFile
fix compile error on hadoop 0.23 for the pull request #1924.

Author: Chia-Yung Su <chiayung@appier.com>

Closes #1959 from joesu/bugfix-spark3011 and squashes the following commits:

be30793 [Chia-Yung Su] remove .* and _* except _metadata
8fe2398 [Chia-Yung Su] add note to explain
40ea9bd [Chia-Yung Su] fix hadoop-0.23 compile error
c7e44f2 [Chia-Yung Su] match syntax
f8fc32a [Chia-Yung Su] filter out tmp dir
2014-08-25 18:20:19 -07:00
Cheng Hao 156eb39661 [SPARK-3058] [SQL] Support EXTENDED for EXPLAIN
Provide `extended` keyword support for `explain` command in SQL. e.g.
```
explain extended select key as a1, value as a2 from src where key=1;
== Parsed Logical Plan ==
Project ['key AS a1#3,'value AS a2#4]
 Filter ('key = 1)
  UnresolvedRelation None, src, None

== Analyzed Logical Plan ==
Project [key#8 AS a1#3,value#9 AS a2#4]
 Filter (CAST(key#8, DoubleType) = CAST(1, DoubleType))
  MetastoreRelation default, src, None

== Optimized Logical Plan ==
Project [key#8 AS a1#3,value#9 AS a2#4]
 Filter (CAST(key#8, DoubleType) = 1.0)
  MetastoreRelation default, src, None

== Physical Plan ==
Project [key#8 AS a1#3,value#9 AS a2#4]
 Filter (CAST(key#8, DoubleType) = 1.0)
  HiveTableScan [key#8,value#9], (MetastoreRelation default, src, None), None

Code Generation: false
== RDD ==
(2) MappedRDD[14] at map at HiveContext.scala:350
  MapPartitionsRDD[13] at mapPartitions at basicOperators.scala:42
  MapPartitionsRDD[12] at mapPartitions at basicOperators.scala:57
  MapPartitionsRDD[11] at mapPartitions at TableReader.scala:112
  MappedRDD[10] at map at TableReader.scala:240
  HadoopRDD[9] at HadoopRDD at TableReader.scala:230
```

It's the sub task of #1847. But can go without any dependency.

Author: Cheng Hao <hao.cheng@intel.com>

Closes #1962 from chenghao-intel/explain_extended and squashes the following commits:

295db74 [Cheng Hao] Fix bug in printing the simple execution plan
48bc989 [Cheng Hao] Support EXTENDED for EXPLAIN
2014-08-25 17:43:56 -07:00
Michael Armbrust 3519b5e8e5 [SPARK-2967][SQL] Follow-up: Also copy hash expressions in sort based shuffle fix.
Follow-up to #2066

Author: Michael Armbrust <michael@databricks.com>

Closes #2072 from marmbrus/sortShuffle and squashes the following commits:

2ff8114 [Michael Armbrust] Fix bug
2014-08-23 16:21:08 -07:00
Michael Armbrust 7e191fe29b [SPARK-2554][SQL] CountDistinct partial aggregation and object allocation improvements
Author: Michael Armbrust <michael@databricks.com>
Author: Gregory Owen <greowen@gmail.com>

Closes #1935 from marmbrus/countDistinctPartial and squashes the following commits:

5c7848d [Michael Armbrust] turn off caching in the constructor
8074a80 [Michael Armbrust] fix tests
32d216f [Michael Armbrust] reynolds comments
c122cca [Michael Armbrust] Address comments, add tests
b2e8ef3 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into countDistinctPartial
fae38f4 [Michael Armbrust] Fix style
fdca896 [Michael Armbrust] cleanup
93d0f64 [Michael Armbrust] metastore concurrency fix.
db44a30 [Michael Armbrust] JIT hax.
3868f6c [Michael Armbrust] Merge pull request #9 from GregOwen/countDistinctPartial
c9e67de [Gregory Owen] Made SpecificRow and types serializable by Kryo
2b46c4b [Michael Armbrust] Merge remote-tracking branch 'origin/master' into countDistinctPartial
8ff6402 [Michael Armbrust] Add specific row.
58d15f1 [Michael Armbrust] disable codegen logging
87d101d [Michael Armbrust] Fix isNullAt bug
abee26d [Michael Armbrust] WIP
27984d0 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into countDistinctPartial
57ae3b1 [Michael Armbrust] Fix order dependent test
b3d0f64 [Michael Armbrust] Add golden files.
c1f7114 [Michael Armbrust] Improve tests / fix serialization.
f31b8ad [Michael Armbrust] more fixes
38c7449 [Michael Armbrust] comments and style
9153652 [Michael Armbrust] better toString
d494598 [Michael Armbrust] Fix tests now that the planner is better
41fbd1d [Michael Armbrust] Never try and create an empty hash set.
050bb97 [Michael Armbrust] Skip no-arg constructors for kryo,
bd08239 [Michael Armbrust] WIP
213ada8 [Michael Armbrust] First draft of partially aggregated and code generated count distinct / max
2014-08-23 16:19:10 -07:00
Michael Armbrust a2e658dcda [SPARK-2967][SQL] Fix sort based shuffle for spark sql.
Add explicit row copies when sort based shuffle is on.

Author: Michael Armbrust <michael@databricks.com>

Closes #2066 from marmbrus/sortShuffle and squashes the following commits:

fcd7bb2 [Michael Armbrust] Fix sort based shuffle for spark sql.
2014-08-20 15:51:14 -07:00
wangfei 0e3ab94d41 [SQL] add note of use synchronizedMap in SQLConf
Refer to:
http://stackoverflow.com/questions/510632/whats-the-difference-between-concurrenthashmap-and-collections-synchronizedmap
Collections.synchronizedMap(map) creates a blocking Map which will degrade performance, albeit ensure consistency. So use ConcurrentHashMap(a more effective thread-safe hashmap) instead.

also update HiveQuerySuite to fix test error when changed to ConcurrentHashMap.

Author: wangfei <wangfei_hello@126.com>
Author: scwf <wangfei1@huawei.com>

Closes #1996 from scwf/sqlconf and squashes the following commits:

93bc0c5 [wangfei] revert change of HiveQuerySuite
0cc05dd [wangfei] add note for use synchronizedMap
3c224d31 [scwf] fix formate
a7bcb98 [scwf] use ConcurrentHashMap in sql conf, intead synchronizedMap
2014-08-19 19:37:02 -07:00
Michael Armbrust 3abd0c1cda [SPARK-2406][SQL] Initial support for using ParquetTableScan to read HiveMetaStore tables.
This PR adds an experimental flag `spark.sql.hive.convertMetastoreParquet` that when true causes the planner to detects tables that use Hive's Parquet SerDe and instead plans them using Spark SQL's native `ParquetTableScan`.

Author: Michael Armbrust <michael@databricks.com>
Author: Yin Huai <huai@cse.ohio-state.edu>

Closes #1819 from marmbrus/parquetMetastore and squashes the following commits:

1620079 [Michael Armbrust] Revert "remove hive parquet bundle"
cc30430 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into parquetMetastore
4f3d54f [Michael Armbrust] fix style
41ebc5f [Michael Armbrust] remove hive parquet bundle
a43e0da [Michael Armbrust] Merge remote-tracking branch 'origin/master' into parquetMetastore
4c4dc19 [Michael Armbrust] Fix bug with tree splicing.
ebb267e [Michael Armbrust] include parquet hive to tests pass (Remove this later).
c0d9b72 [Michael Armbrust] Avoid creating a HadoopRDD per partition.  Add dirty hacks to retrieve partition values from the InputSplit.
8cdc93c [Michael Armbrust] Merge pull request #8 from yhuai/parquetMetastore
a0baec7 [Yin Huai] Partitioning columns can be resolved.
1161338 [Michael Armbrust] Add a test to make sure conversion is actually happening
212d5cd [Michael Armbrust] Initial support for using ParquetTableScan to read HiveMetaStore tables.
2014-08-18 13:17:10 -07:00
Matei Zaharia 9eb74c7d2c [SPARK-3091] [SQL] Add support for caching metadata on Parquet files
For larger Parquet files, reading the file footers (which is done in parallel on up to 5 threads) and HDFS block locations (which is serial) can take multiple seconds. We can add an option to cache this data within FilteringParquetInputFormat. Unfortunately ParquetInputFormat only caches footers within each instance of ParquetInputFormat, not across them.

Note: this PR leaves this turned off by default for 1.1, but I believe it's safe to turn it on after. The keys in the hash maps are FileStatus objects that include a modification time, so this will work fine if files are modified. The location cache could become invalid if files have moved within HDFS, but that's rare so I just made it invalidate entries every 15 minutes.

Author: Matei Zaharia <matei@databricks.com>

Closes #2005 from mateiz/parquet-cache and squashes the following commits:

dae8efe [Matei Zaharia] Bug fix
c71e9ed [Matei Zaharia] Handle empty statuses directly
22072b0 [Matei Zaharia] Use Guava caches and add a config option for caching metadata
8fb56ce [Matei Zaharia] Cache file block locations too
453bd21 [Matei Zaharia] Bug fix
4094df6 [Matei Zaharia] First attempt at caching Parquet footers
2014-08-18 11:00:10 -07:00
Patrick Wendell 6bca8898a1 SPARK-3025 [SQL]: Allow JDBC clients to set a fair scheduler pool
This definitely needs review as I am not familiar with this part of Spark.
I tested this locally and it did seem to work.

Author: Patrick Wendell <pwendell@gmail.com>

Closes #1937 from pwendell/scheduler and squashes the following commits:

b858e33 [Patrick Wendell] SPARK-3025: Allow JDBC clients to set a fair scheduler pool
2014-08-18 10:52:20 -07:00
Matei Zaharia 4bf3de7107 [SPARK-3085] [SQL] Use compact data structures in SQL joins
This reuses the CompactBuffer from Spark Core to save memory and pointer
dereferences. I also tried AppendOnlyMap instead of java.util.HashMap
but unfortunately that slows things down because it seems to do more
equals() calls and the equals on GenericRow, and especially JoinedRow,
is pretty expensive.

Author: Matei Zaharia <matei@databricks.com>

Closes #1993 from mateiz/spark-3085 and squashes the following commits:

188221e [Matei Zaharia] Remove unneeded import
5f903ee [Matei Zaharia] [SPARK-3085] [SQL] Use compact data structures in SQL joins
2014-08-18 10:45:24 -07:00
Matei Zaharia 6a13dca12f [SPARK-3084] [SQL] Collect broadcasted tables in parallel in joins
BroadcastHashJoin has a broadcastFuture variable that tries to collect
the broadcasted table in a separate thread, but this doesn't help
because it's a lazy val that only gets initialized when you attempt to
build the RDD. Thus queries that broadcast multiple tables would collect
and broadcast them sequentially. I changed this to a val to let it start
collecting right when the operator is created.

Author: Matei Zaharia <matei@databricks.com>

Closes #1990 from mateiz/spark-3084 and squashes the following commits:

f468766 [Matei Zaharia] [SPARK-3084] Collect broadcasted tables in parallel in joins
2014-08-18 10:05:52 -07:00
Michael Armbrust a7f8a4f5ee Revert [SPARK-3011][SQL] _temporary directory should be filtered out by sqlContext.parquetFile
Reverts #1924 due to build failures with hadoop 0.23.

Author: Michael Armbrust <michael@databricks.com>

Closes #1949 from marmbrus/revert1924 and squashes the following commits:

6bff940 [Michael Armbrust] Revert "[SPARK-3011][SQL] _temporary directory should be filtered out by sqlContext.parquetFile"
2014-08-14 13:00:21 -07:00
Yin Huai add75d4831 [SPARK-2927][SQL] Add a conf to configure if we always read Binary columns stored in Parquet as String columns
This PR adds a new conf flag `spark.sql.parquet.binaryAsString`. When it is `true`, if there is no parquet metadata file available to provide the schema of the data, we will always treat binary fields stored in parquet as string fields. This conf is used to provide a way to read string fields generated without UTF8 decoration.

JIRA: https://issues.apache.org/jira/browse/SPARK-2927

Author: Yin Huai <huai@cse.ohio-state.edu>

Closes #1855 from yhuai/parquetBinaryAsString and squashes the following commits:

689ffa9 [Yin Huai] Add missing "=".
80827de [Yin Huai] Unit test.
1765ca4 [Yin Huai] Use .toBoolean.
9d3f199 [Yin Huai] Merge remote-tracking branch 'upstream/master' into parquetBinaryAsString
5d436a1 [Yin Huai] The initial support of adding a conf to treat binary columns stored in Parquet as string columns.
2014-08-14 10:46:33 -07:00
Chia-Yung Su 078f3fbda8 [SPARK-3011][SQL] _temporary directory should be filtered out by sqlContext.parquetFile
Author: Chia-Yung Su <chiayung@appier.com>

Closes #1924 from joesu/bugfix-spark3011 and squashes the following commits:

c7e44f2 [Chia-Yung Su] match syntax
f8fc32a [Chia-Yung Su] filter out tmp dir
2014-08-14 10:43:08 -07:00
Michael Armbrust 9fde1ff5fc [SPARK-2935][SQL]Fix parquet predicate push down bug
Author: Michael Armbrust <michael@databricks.com>

Closes #1863 from marmbrus/parquetPredicates and squashes the following commits:

10ad202 [Michael Armbrust] left <=> right
f249158 [Michael Armbrust] quiet parquet tests.
802da5b [Michael Armbrust] Add test case.
eab2eda [Michael Armbrust] Fix parquet predicate push down bug
2014-08-13 17:40:59 -07:00
Cheng Lian 376a82e196 [SPARK-2650][SQL] More precise initial buffer size estimation for in-memory column buffer
This is a follow up of #1880.

Since the row number within a single batch is known, we can estimate a much more precise initial buffer size when building an in-memory column buffer.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #1901 from liancheng/precise-init-buffer-size and squashes the following commits:

d5501fa [Cheng Lian] More precise initial buffer size estimation for in-memory column buffer
2014-08-13 17:37:55 -07:00
Cheng Hao 5d54d71ddb [SQL] [SPARK-2826] Reduce the memory copy while building the hashmap for HashOuterJoin
This is a follow up for #1147 , this PR will improve the performance about 10% - 15% in my local tests.
```
Before:
LeftOuterJoin: took 16750 ms ([3000000] records)
LeftOuterJoin: took 15179 ms ([3000000] records)
RightOuterJoin: took 15515 ms ([3000000] records)
RightOuterJoin: took 15276 ms ([3000000] records)
FullOuterJoin: took 19150 ms ([6000000] records)
FullOuterJoin: took 18935 ms ([6000000] records)

After:
LeftOuterJoin: took 15218 ms ([3000000] records)
LeftOuterJoin: took 13503 ms ([3000000] records)
RightOuterJoin: took 13663 ms ([3000000] records)
RightOuterJoin: took 14025 ms ([3000000] records)
FullOuterJoin: took 16624 ms ([6000000] records)
FullOuterJoin: took 16578 ms ([6000000] records)
```

Besides the performance improvement, I also do some clean up as suggested in #1147

Author: Cheng Hao <hao.cheng@intel.com>

Closes #1765 from chenghao-intel/hash_outer_join_fixing and squashes the following commits:

ab1f9e0 [Cheng Hao] Reduce the memory copy while building the hashmap
2014-08-11 20:45:14 -07:00
Michael Armbrust bad21ed085 [SPARK-2650][SQL] Build column buffers in smaller batches
Author: Michael Armbrust <michael@databricks.com>

Closes #1880 from marmbrus/columnBatches and squashes the following commits:

0649987 [Michael Armbrust] add test
4756fad [Michael Armbrust] fix compilation
2314532 [Michael Armbrust] Build column buffers in smaller batches
2014-08-11 20:21:56 -07:00
Takuya UESHIN c9c89c31b6 [SPARK-2965][SQL] Fix HashOuterJoin output nullabilities.
Output attributes of opposite side of `OuterJoin` should be nullable.

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #1887 from ueshin/issues/SPARK-2965 and squashes the following commits:

bcb2d37 [Takuya UESHIN] Fix HashOuterJoin output nullabilities.
2014-08-11 20:15:01 -07:00
chutium b7c89a7f0c [SPARK-2700] [SQL] Hidden files (such as .impala_insert_staging) should be filtered out by sqlContext.parquetFile
Author: chutium <teng.qiu@gmail.com>

Closes #1691 from chutium/SPARK-2700 and squashes the following commits:

b76ae8c [chutium] [SPARK-2700] [SQL] fixed styling issue
d75a8bd [chutium] [SPARK-2700] [SQL] Hidden files (such as .impala_insert_staging) should be filtered out by sqlContext.parquetFile
2014-08-08 13:31:08 -07:00
Yin Huai 0489cee6b2 [SPARK-2908] [SQL] JsonRDD.nullTypeToStringType does not convert all NullType to StringType
JIRA: https://issues.apache.org/jira/browse/SPARK-2908

Author: Yin Huai <huai@cse.ohio-state.edu>

Closes #1840 from yhuai/SPARK-2908 and squashes the following commits:

86e833e [Yin Huai] Update test.
cb11759 [Yin Huai] nullTypeToStringType should check columns with the type of array of structs.
2014-08-08 11:10:11 -07:00
Davies Liu 48789117c2 [SPARK-2875] [PySpark] [SQL] handle null in schemaRDD()
Handle null in schemaRDD during converting them into Python.

Author: Davies Liu <davies.liu@gmail.com>

Closes #1802 from davies/json and squashes the following commits:

88e6b1f [Davies Liu] handle null in schemaRDD()
2014-08-06 11:08:12 -07:00
Reynold Xin b70bae40eb [SQL] Tighten the visibility of various SQLConf methods and renamed setter/getters
Author: Reynold Xin <rxin@apache.org>

Closes #1794 from rxin/sql-conf and squashes the following commits:

3ac11ef [Reynold Xin] getAllConfs return an immutable Map instead of an Array.
4b19d6c [Reynold Xin] Tighten the visibility of various SQLConf methods and renamed setter/getters.
2014-08-05 22:29:19 -07:00
Yin Huai 69ec678d3a [SPARK-2854][SQL] Finalize _acceptable_types in pyspark.sql
This PR aims to finalize accepted data value types in Python RDDs provided to Python `applySchema`.

JIRA: https://issues.apache.org/jira/browse/SPARK-2854

Author: Yin Huai <huai@cse.ohio-state.edu>

Closes #1793 from yhuai/SPARK-2854 and squashes the following commits:

32f0708 [Yin Huai] LongType only accepts long values.
c2b23dd [Yin Huai] Do data type conversions based on the specified Spark SQL data type.
2014-08-05 18:56:10 -07:00
Cheng Lian d0ae3f3912 [SPARK-2650][SQL] Try to partially fix SPARK-2650 by adjusting initial buffer size and reducing memory allocation
JIRA issue: [SPARK-2650](https://issues.apache.org/jira/browse/SPARK-2650)

Please refer to [comments](https://issues.apache.org/jira/browse/SPARK-2650?focusedCommentId=14084397&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14084397) of SPARK-2650 for some other details.

This PR adjusts the initial in-memory columnar buffer size to 1MB, same as the default value of Shark's `shark.column.partitionSize.mb` property when running in local mode. Will add Shark style partition size estimation in another PR.

Also, before this PR, `NullableColumnBuilder` copies the whole buffer to add the null positions section, and then `CompressibleColumnBuilder` copies and compresses the buffer again, even if compression is disabled (`PassThrough` compression scheme is used to disable compression). In this PR the first buffer copy is eliminated to reduce memory consumption.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #1769 from liancheng/spark-2650 and squashes the following commits:

88a042e [Cheng Lian] Fixed method visibility and removed dead code
001f2e5 [Cheng Lian] Try fixing SPARK-2650 by adjusting initial buffer size and reducing memory allocation
2014-08-05 18:50:37 -07:00
Michael Armbrust 236dfac676 [SPARK-2784][SQL] Deprecate hql() method in favor of a config option, 'spark.sql.dialect'
Many users have reported being confused by the distinction between the `sql` and `hql` methods.  Specifically, many users think that `sql(...)` cannot be used to read hive tables.  In this PR I introduce a new configuration option `spark.sql.dialect` that picks which dialect with be used for parsing.  For SQLContext this must be set to `sql`.  In `HiveContext` it defaults to `hiveql` but can also be set to `sql`.

The `hql` and `hiveql` methods continue to act the same but are now marked as deprecated.

**This is a possibly breaking change for some users unless they set the dialect manually, though this is unlikely.**

For example: `hiveContex.sql("SELECT 1")` will now throw a parsing exception by default.

Author: Michael Armbrust <michael@databricks.com>

Closes #1746 from marmbrus/sqlLanguageConf and squashes the following commits:

ad375cc [Michael Armbrust] Merge remote-tracking branch 'apache/master' into sqlLanguageConf
20c43f8 [Michael Armbrust] override function instead of just setting the value
7e4ae93 [Michael Armbrust] Deprecate hql() method in favor of a config option, 'spark.sql.dialect'
2014-08-03 12:28:29 -07:00
Michael Armbrust 1a8043739d [SPARK-2739][SQL] Rename registerAsTable to registerTempTable
There have been user complaints that the difference between `registerAsTable` and `saveAsTable` is too subtle.  This PR addresses this by renaming `registerAsTable` to `registerTempTable`, which more clearly reflects what is happening.  `registerAsTable` remains, but will cause a deprecation warning.

Author: Michael Armbrust <michael@databricks.com>

Closes #1743 from marmbrus/registerTempTable and squashes the following commits:

d031348 [Michael Armbrust] Merge remote-tracking branch 'apache/master' into registerTempTable
4dff086 [Michael Armbrust] Fix .java files too
89a2f12 [Michael Armbrust] Merge remote-tracking branch 'apache/master' into registerTempTable
0b7b71e [Michael Armbrust] Rename registerAsTable to registerTempTable
2014-08-02 18:27:04 -07:00
Michael Armbrust 158ad0bba9 [SPARK-2097][SQL] UDF Support
This patch adds the ability to register lambda functions written in Python, Java or Scala as UDFs for use in SQL or HiveQL.

Scala:
```scala
registerFunction("strLenScala", (_: String).length)
sql("SELECT strLenScala('test')")
```
Python:
```python
sqlCtx.registerFunction("strLenPython", lambda x: len(x), IntegerType())
sqlCtx.sql("SELECT strLenPython('test')")
```
Java:
```java
sqlContext.registerFunction("stringLengthJava", new UDF1<String, Integer>() {
  Override
  public Integer call(String str) throws Exception {
    return str.length();
  }
}, DataType.IntegerType);

sqlContext.sql("SELECT stringLengthJava('test')");
```

Author: Michael Armbrust <michael@databricks.com>

Closes #1063 from marmbrus/udfs and squashes the following commits:

9eda0fe [Michael Armbrust] newline
747c05e [Michael Armbrust] Add some scala UDF tests.
d92727d [Michael Armbrust] Merge remote-tracking branch 'apache/master' into udfs
005d684 [Michael Armbrust] Fix naming and formatting.
d14dac8 [Michael Armbrust] Fix last line of autogened java files.
8135c48 [Michael Armbrust] Move UDF unit tests to pyspark.
40b0ffd [Michael Armbrust] Merge remote-tracking branch 'apache/master' into udfs
6a36890 [Michael Armbrust] Switch logging so that SQLContext can be serializable.
7a83101 [Michael Armbrust] Drop toString
795fd15 [Michael Armbrust] Try to avoid capturing SQLContext.
e54fb45 [Michael Armbrust] Docs and tests.
437cbe3 [Michael Armbrust] Update use of dataTypes, fix some python tests, address review comments.
01517d6 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into udfs
8e6c932 [Michael Armbrust] WIP
3f96a52 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into udfs
6237c8d [Michael Armbrust] WIP
2766f0b [Michael Armbrust] Move udfs support to SQL from hive. Add support for Java UDFs.
0f7d50c [Michael Armbrust] Draft of native Spark SQL UDFs for Scala and Python.
2014-08-02 16:33:48 -07:00
GuoQiang Li 4c477117bb SPARK-2804: Remove scalalogging-slf4j dependency
This also Closes #1701.

Author: GuoQiang Li <witgo@qq.com>

Closes #1208 from witgo/SPARK-1470 and squashes the following commits:

422646b [GuoQiang Li] Remove scalalogging-slf4j dependency
2014-08-02 13:59:58 -07:00
Yin Huai 67bd8e3c21 [SQL] Set outputPartitioning of BroadcastHashJoin correctly.
I think we will not generate the plan triggering this bug at this moment. But, let me explain it...

Right now, we are using `left.outputPartitioning` as the `outputPartitioning` of a `BroadcastHashJoin`. We may have a wrong physical plan for cases like...
```sql
SELECT l.key, count(*)
FROM (SELECT key, count(*) as cnt
      FROM src
      GROUP BY key) l // This is buildPlan
JOIN r // This is the streamedPlan
ON (l.cnt = r.value)
GROUP BY l.key
```
Let's say we have a `BroadcastHashJoin` on `l` and `r`. For this case, we will pick `l`'s `outputPartitioning` for the `outputPartitioning`of the `BroadcastHashJoin` on `l` and `r`. Also, because the last `GROUP BY` is using `l.key` as the key, we will not introduce an `Exchange` for this aggregation. However, `r`'s outputPartitioning may not match the required distribution of the last `GROUP BY` and we fail to group data correctly.

JIRA is being reindexed. I will create a JIRA ticket once it is back online.

Author: Yin Huai <huai@cse.ohio-state.edu>

Closes #1735 from yhuai/BroadcastHashJoin and squashes the following commits:

96d9cb3 [Yin Huai] Set outputPartitioning correctly.
2014-08-02 13:16:41 -07:00
Patrick Wendell dab37966b0 Revert "[SPARK-1470][SPARK-1842] Use the scala-logging wrapper instead of the directly sfl4j api"
This reverts commit adc8303294.
2014-08-01 23:55:30 -07:00
GuoQiang Li adc8303294 [SPARK-1470][SPARK-1842] Use the scala-logging wrapper instead of the directly sfl4j api
Author: GuoQiang Li <witgo@qq.com>

Closes #1369 from witgo/SPARK-1470_new and squashes the following commits:

66a1641 [GuoQiang Li] IncompatibleResultTypeProblem
73a89ba [GuoQiang Li] Use the scala-logging wrapper instead of the directly sfl4j api.
2014-08-01 23:55:11 -07:00
Yin Huai 3822f33f3c [SPARK-2212][SQL] Hash Outer Join (follow-up bug fix).
We need to carefully set the ouputPartitioning of the HashOuterJoin Operator. Otherwise, we may not correctly handle nulls.

Author: Yin Huai <huai@cse.ohio-state.edu>

Closes #1721 from yhuai/SPARK-2212-BugFix and squashes the following commits:

ed5eef7 [Yin Huai] Correctly choosing outputPartitioning for the HashOuterJoin operator.
2014-08-01 18:52:01 -07:00
Davies Liu 880eabec37 [SPARK-2010] [PySpark] [SQL] support nested structure in SchemaRDD
Convert Row in JavaSchemaRDD into Array[Any] and unpickle them as tuple in Python, then convert them into namedtuple, so use can access fields just like attributes.

This will let nested structure can be accessed as object, also it will reduce the size of serialized data and better performance.

root
 |-- field1: integer (nullable = true)
 |-- field2: string (nullable = true)
 |-- field3: struct (nullable = true)
 |    |-- field4: integer (nullable = true)
 |    |-- field5: array (nullable = true)
 |    |    |-- element: integer (containsNull = false)
 |-- field6: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- field7: string (nullable = true)

Then we can access them by row.field3.field5[0]  or row.field6[5].field7

It also will infer the schema in Python, convert Row/dict/namedtuple/objects into tuple before serialization, then call applySchema in JVM. During inferSchema(), the top level of dict in row will be StructType, but any nested dictionary will be MapType.

You can use pyspark.sql.Row to convert unnamed structure into Row object, make the RDD can be inferable. Such as:

ctx.inferSchema(rdd.map(lambda x: Row(a=x[0], b=x[1]))

Or you could use Row to create a class just like namedtuple, for example:

Person = Row("name", "age")
ctx.inferSchema(rdd.map(lambda x: Person(*x)))

Also, you can call applySchema to apply an schema to a RDD of tuple/list and turn it into a SchemaRDD. The `schema` should be StructType, see the API docs for details.

schema = StructType([StructField("name, StringType, True),
                                    StructType("age", IntegerType, True)])
ctx.applySchema(rdd, schema)

PS: In order to use namedtuple to inferSchema, you should make namedtuple picklable.

Author: Davies Liu <davies.liu@gmail.com>

Closes #1598 from davies/nested and squashes the following commits:

f1d15b6 [Davies Liu] verify schema with the first few rows
8852aaf [Davies Liu] check type of schema
abe9e6e [Davies Liu] address comments
61b2292 [Davies Liu] add @deprecated to pythonToJavaMap
1e5b801 [Davies Liu] improve cache of classes
51aa135 [Davies Liu] use Row to infer schema
e9c0d5c [Davies Liu] remove string typed schema
353a3f2 [Davies Liu] fix code style
63de8f8 [Davies Liu] fix typo
c79ca67 [Davies Liu] fix serialization of nested data
6b258b5 [Davies Liu] fix pep8
9d8447c [Davies Liu] apply schema provided by string of names
f5df97f [Davies Liu] refactor, address comments
9d9af55 [Davies Liu] use arrry to applySchema and infer schema in Python
84679b3 [Davies Liu] Merge branch 'master' of github.com:apache/spark into nested
0eaaf56 [Davies Liu] fix doc tests
b3559b4 [Davies Liu] use generated Row instead of namedtuple
c4ddc30 [Davies Liu] fix conflict between name of fields and variables
7f6f251 [Davies Liu] address all comments
d69d397 [Davies Liu] refactor
2cc2d45 [Davies Liu] refactor
182fb46 [Davies Liu] refactor
bc6e9e1 [Davies Liu] switch to new Schema API
547bf3e [Davies Liu] Merge branch 'master' into nested
a435b5a [Davies Liu] add docs and code refactor
2c8debc [Davies Liu] Merge branch 'master' into nested
644665a [Davies Liu] use tuple and namedtuple for schemardd
2014-08-01 18:47:41 -07:00
chutium 580c7011ca [SPARK-2729] [SQL] Forgot to match Timestamp type in ColumnBuilder
just a match forgot, found after SPARK-2710 , TimestampType can be used by a SchemaRDD generated from JDBC ResultSet

Author: chutium <teng.qiu@gmail.com>

Closes #1636 from chutium/SPARK-2729 and squashes the following commits:

71af77a [chutium] [SPARK-2729] [SQL] added Timestamp in NullableColumnAccessorSuite
39cf9f8 [chutium] [SPARK-2729] add Timestamp Type into ColumnBuilder TestSuite, ref. #1636
ab6ff97 [chutium] [SPARK-2729] Forgot to match Timestamp type in ColumnBuilder
2014-08-01 11:31:44 -07:00
Cheng Hao 4415722e91 [SQL][SPARK-2212]Hash Outer Join
This patch is to support the hash based outer join. Currently, outer join for big relations are resort to `BoradcastNestedLoopJoin`, which is super slow. This PR will create 2 hash tables for both relations in the same partition, which greatly reduce the table scans.

Here is the testing code that I used:
```
package org.apache.spark.sql.hive

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql._

case class Record(key: String, value: String)

object JoinTablePrepare extends App {
  import TestHive2._

  val rdd = sparkContext.parallelize((1 to 3000000).map(i => Record(s"${i % 828193}", s"val_$i")))

  runSqlHive("SHOW TABLES")
  runSqlHive("DROP TABLE if exists a")
  runSqlHive("DROP TABLE if exists b")
  runSqlHive("DROP TABLE if exists result")
  rdd.registerAsTable("records")

  runSqlHive("""CREATE TABLE a (key STRING, value STRING)
                 | ROW FORMAT SERDE
                 | 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe'
                 | STORED AS RCFILE
               """.stripMargin)
  runSqlHive("""CREATE TABLE b (key STRING, value STRING)
                 | ROW FORMAT SERDE
                 | 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe'
                 | STORED AS RCFILE
               """.stripMargin)
  runSqlHive("""CREATE TABLE result (key STRING, value STRING)
                 | ROW FORMAT SERDE
                 | 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe'
                 | STORED AS RCFILE
               """.stripMargin)

  hql(s"""from records
             | insert into table a
             | select key, value
           """.stripMargin)
  hql(s"""from records
             | insert into table b select key + 100000, value
           """.stripMargin)
}

object JoinTablePerformanceTest extends App {
  import TestHive2._

  hql("SHOW TABLES")
  hql("set spark.sql.shuffle.partitions=20")

  val leftOuterJoin = "insert overwrite table result select a.key, b.value from a left outer join b on a.key=b.key"
  val rightOuterJoin = "insert overwrite table result select a.key, b.value from a right outer join b on a.key=b.key"
  val fullOuterJoin = "insert overwrite table result select a.key, b.value from a full outer join b on a.key=b.key"

  val results = ("LeftOuterJoin", benchmark(leftOuterJoin)) :: ("LeftOuterJoin", benchmark(leftOuterJoin)) ::
                ("RightOuterJoin", benchmark(rightOuterJoin)) :: ("RightOuterJoin", benchmark(rightOuterJoin)) ::
                ("FullOuterJoin", benchmark(fullOuterJoin)) :: ("FullOuterJoin", benchmark(fullOuterJoin)) :: Nil
  val explains = hql(s"explain $leftOuterJoin").collect ++ hql(s"explain $rightOuterJoin").collect ++ hql(s"explain $fullOuterJoin").collect
  println(explains.mkString(",\n"))
  results.foreach { case (prompt, result) => {
      println(s"$prompt: took ${result._1} ms (${result._2} records)")
    }
  }

  def benchmark(cmd: String) = {
    val begin = System.currentTimeMillis()
    val result = hql(cmd)
    val end = System.currentTimeMillis()
    val count = hql("select count(1) from result").collect.mkString("")
    ((end - begin), count)
  }
}
```
And the result as shown below:
```
[Physical execution plan:],
[InsertIntoHiveTable (MetastoreRelation default, result, None), Map(), true],
[ Project [key#95,value#98]],
[  HashOuterJoin [key#95], [key#97], LeftOuter, None],
[   Exchange (HashPartitioning [key#95], 20)],
[    HiveTableScan [key#95], (MetastoreRelation default, a, None), None],
[   Exchange (HashPartitioning [key#97], 20)],
[    HiveTableScan [key#97,value#98], (MetastoreRelation default, b, None), None],
[Physical execution plan:],
[InsertIntoHiveTable (MetastoreRelation default, result, None), Map(), true],
[ Project [key#102,value#105]],
[  HashOuterJoin [key#102], [key#104], RightOuter, None],
[   Exchange (HashPartitioning [key#102], 20)],
[    HiveTableScan [key#102], (MetastoreRelation default, a, None), None],
[   Exchange (HashPartitioning [key#104], 20)],
[    HiveTableScan [key#104,value#105], (MetastoreRelation default, b, None), None],
[Physical execution plan:],
[InsertIntoHiveTable (MetastoreRelation default, result, None), Map(), true],
[ Project [key#109,value#112]],
[  HashOuterJoin [key#109], [key#111], FullOuter, None],
[   Exchange (HashPartitioning [key#109], 20)],
[    HiveTableScan [key#109], (MetastoreRelation default, a, None), None],
[   Exchange (HashPartitioning [key#111], 20)],
[    HiveTableScan [key#111,value#112], (MetastoreRelation default, b, None), None]
LeftOuterJoin: took 16072 ms ([3000000] records)
LeftOuterJoin: took 14394 ms ([3000000] records)
RightOuterJoin: took 14802 ms ([3000000] records)
RightOuterJoin: took 14747 ms ([3000000] records)
FullOuterJoin: took 17715 ms ([6000000] records)
FullOuterJoin: took 17629 ms ([6000000] records)
```

Without this PR, the benchmark will run seems never end.

Author: Cheng Hao <hao.cheng@intel.com>

Closes #1147 from chenghao-intel/hash_based_outer_join and squashes the following commits:

65c599e [Cheng Hao] Fix issues with the community comments
72b1394 [Cheng Hao] Fix bug of stale value in joinedRow
55baef7 [Cheng Hao] Add HashOuterJoin
2014-08-01 11:27:12 -07:00
Yin Huai c41fdf04f4 [SPARK-2179][SQL] A minor refactoring Java data type APIs (2179 follow-up).
It is a follow-up PR of SPARK-2179 (https://issues.apache.org/jira/browse/SPARK-2179). It makes package names of data type APIs more consistent across languages (Scala: `org.apache.spark.sql`, Java: `org.apache.spark.sql.api.java`, Python: `pyspark.sql`).

Author: Yin Huai <huai@cse.ohio-state.edu>

Closes #1712 from yhuai/javaDataType and squashes the following commits:

62eb705 [Yin Huai] Move package-info.
add4bcb [Yin Huai] Make the package names of data type classes consistent across languages by moving all Java data type classes to package sql.api.java.
2014-08-01 11:14:53 -07:00
Yin Huai 9632719c9e [SPARK-2779] [SQL] asInstanceOf[Map[...]] should use scala.collection.Map instead of scala.collection.immutable.Map
Since we let users create Rows. It makes sense to accept mutable Maps as values of MapType columns.

JIRA: https://issues.apache.org/jira/browse/SPARK-2779

Author: Yin Huai <huai@cse.ohio-state.edu>

Closes #1705 from yhuai/SPARK-2779 and squashes the following commits:

00d72fd [Yin Huai] Use scala.collection.Map.
2014-07-31 21:02:11 -07:00
Zongheng Yang 8f51491ea7 [SPARK-2531 & SPARK-2436] [SQL] Optimize the BuildSide when planning BroadcastNestedLoopJoin.
This PR resolves the following two tickets:

- [SPARK-2531](https://issues.apache.org/jira/browse/SPARK-2531): BNLJ currently assumes the build side is the right relation. This patch refactors some of its logic to take into account a BuildSide properly.
- [SPARK-2436](https://issues.apache.org/jira/browse/SPARK-2436): building on top of the above, we simply use the physical size statistics (if available) of both relations, and make the smaller relation the build side in the planner.

Author: Zongheng Yang <zongheng.y@gmail.com>

Closes #1448 from concretevitamin/bnlj-buildSide and squashes the following commits:

1780351 [Zongheng Yang] Use size estimation to decide optimal build side of BNLJ.
68e6c5b [Zongheng Yang] Consolidate two adjacent pattern matchings.
96d312a [Zongheng Yang] Use a while loop instead of collection methods chaining.
4bc525e [Zongheng Yang] Make BroadcastNestedLoopJoin take a BuildSide.
2014-07-31 19:32:16 -07:00
Michael Armbrust 3072b96026 [SPARK-2743][SQL] Resolve original attributes in ParquetTableScan
Author: Michael Armbrust <michael@databricks.com>

Closes #1647 from marmbrus/parquetCase and squashes the following commits:

a1799b7 [Michael Armbrust] move comment
2a2a68b [Michael Armbrust] Merge remote-tracking branch 'apache/master' into parquetCase
bb35d5b [Michael Armbrust] Fix test case that produced an invalid plan.
e6870bf [Michael Armbrust] Better error message.
539a2e1 [Michael Armbrust] Resolve original attributes in ParquetTableScan
2014-07-31 11:15:25 -07:00
Matei Zaharia e966284409 SPARK-2045 Sort-based shuffle
This adds a new ShuffleManager based on sorting, as described in https://issues.apache.org/jira/browse/SPARK-2045. The bulk of the code is in an ExternalSorter class that is similar to ExternalAppendOnlyMap, but sorts key-value pairs by partition ID and can be used to create a single sorted file with a map task's output. (Longer-term I think this can take on the remaining functionality in ExternalAppendOnlyMap and replace it so we don't have code duplication.)

The main TODOs still left are:
- [x] enabling ExternalSorter to merge across spilled files
  - [x] with an Ordering
  - [x] without an Ordering, using the keys' hash codes
- [x] adding more tests (e.g. a version of our shuffle suite that runs on this)
- [x] rebasing on top of the size-tracking refactoring in #1165 when that is merged
- [x] disabling spilling if spark.shuffle.spill is set to false

Despite this though, this seems to work pretty well (running successfully in cases where the hash shuffle would OOM, such as 1000 reduce tasks on executors with only 1G memory), and it seems to be comparable in speed or faster than hash-based shuffle (it will create much fewer files for the OS to keep track of). So I'm posting it to get some early feedback.

After these TODOs are done, I'd also like to enable ExternalSorter to sort data within each partition by a key as well, which will allow us to use it to implement external spilling in reduce tasks in `sortByKey`.

Author: Matei Zaharia <matei@databricks.com>

Closes #1499 from mateiz/sort-based-shuffle and squashes the following commits:

bd841f9 [Matei Zaharia] Various review comments
d1c137fd [Matei Zaharia] Various review comments
a611159 [Matei Zaharia] Compile fixes due to rebase
62c56c8 [Matei Zaharia] Fix ShuffledRDD sometimes not returning Tuple2s.
f617432 [Matei Zaharia] Fix a failing test (seems to be due to change in SizeTracker logic)
9464d5f [Matei Zaharia] Simplify code and fix conflicts after latest rebase
0174149 [Matei Zaharia] Add cleanup behavior and cleanup tests for sort-based shuffle
eb4ee0d [Matei Zaharia] Remove customizable element type in ShuffledRDD
fa2e8db [Matei Zaharia] Allow nextBatchStream to be called after we're done looking at all streams
a34b352 [Matei Zaharia] Fix tracking of indices within a partition in SpillReader, and add test
03e1006 [Matei Zaharia] Add a SortShuffleSuite that runs ShuffleSuite with sort-based shuffle
3c7ff1f [Matei Zaharia] Obey the spark.shuffle.spill setting in ExternalSorter
ad65fbd [Matei Zaharia] Rebase on top of Aaron's Sorter change, and use Sorter in our buffer
44d2a93 [Matei Zaharia] Use estimateSize instead of atGrowThreshold to test collection sizes
5686f71 [Matei Zaharia] Optimize merging phase for in-memory only data:
5461cbb [Matei Zaharia] Review comments and more tests (e.g. tests with 1 element per partition)
e9ad356 [Matei Zaharia] Update ContextCleanerSuite to make sure shuffle cleanup tests use hash shuffle (since they were written for it)
c72362a [Matei Zaharia] Added bug fix and test for when iterators are empty
de1fb40 [Matei Zaharia] Make trait SizeTrackingCollection private[spark]
4988d16 [Matei Zaharia] tweak
c1b7572 [Matei Zaharia] Small optimization
ba7db7f [Matei Zaharia] Handle null keys in hash-based comparator, and add tests for collisions
ef4e397 [Matei Zaharia] Support for partial aggregation even without an Ordering
4b7a5ce [Matei Zaharia] More tests, and ability to sort data if a total ordering is given
e1f84be [Matei Zaharia] Fix disk block manager test
5a40a1c [Matei Zaharia] More tests
614f1b4 [Matei Zaharia] Add spill metrics to map tasks
cc52caf [Matei Zaharia] Add more error handling and tests for error cases
bbf359d [Matei Zaharia] More work
3a56341 [Matei Zaharia] More partial work towards sort-based shuffle
7a0895d [Matei Zaharia] Some more partial work towards sort-based shuffle
b615476 [Matei Zaharia] Scaffolding for sort-based shuffle
2014-07-30 18:07:59 -07:00