It seems that the SET command is not run by SparkSQLDriver; it goes through the Hive API instead.
As a result, users cannot change the number of reducers by setting spark.sql.shuffle.partitions.
I think handling these property settings should be Spark SQL's job, even though they look like Hive properties.
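A minimal sketch of the intended behavior (assuming a HiveContext named `hiveContext` and a Hive table `src`; both names are illustrative):
```scala
// Once SET statements are handled by Spark SQL rather than only the Hive
// API, they should take effect for Spark SQL's own settings:
hiveContext.sql("SET spark.sql.shuffle.partitions=10")
// Subsequent shuffles (e.g. for the GROUP BY below) should use 10 partitions.
hiveContext.sql("SELECT key, count(*) FROM src GROUP BY key")
```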
Author: guowei <guowei@upyoo.com>
Closes #1904 from guowei2/temp-branch and squashes the following commits:
7d47dde [guowei] fixed: setting properties like spark.sql.shuffle.partitions is not effective
Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>
Closes #1891 from sarutak/SPARK-2970 and squashes the following commits:
4a2d2fe [Kousuke Saruta] Modified comment style
8bd833c [Kousuke Saruta] Modified style
6c0997c [Kousuke Saruta] Modified the timing of shutdown hook execution. It should be executed before the shutdown hook of o.a.h.f.FileSystem
Author: Michael Armbrust <michael@databricks.com>
Closes #1863 from marmbrus/parquetPredicates and squashes the following commits:
10ad202 [Michael Armbrust] left <=> right
f249158 [Michael Armbrust] quiet parquet tests.
802da5b [Michael Armbrust] Add test case.
eab2eda [Michael Armbrust] Fix parquet predicate push down bug
This is a follow-up of #1880.
Since the row number within a single batch is known, we can estimate a much more precise initial buffer size when building an in-memory column buffer.
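A hedged sketch of the estimation (the names here are illustrative, not the patch's actual identifiers):
```scala
// With the row count of a batch known up front, the initial buffer for a
// column can be sized precisely instead of starting from a generic default.
def initialBufferSize(rowCountInBatch: Int, defaultValueSizeInBytes: Int): Int =
  rowCountInBatch * defaultValueSizeInBytes
```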
Author: Cheng Lian <lian.cs.zju@gmail.com>
Closes #1901 from liancheng/precise-init-buffer-size and squashes the following commits:
d5501fa [Cheng Lian] More precise initial buffer size estimation for in-memory column buffer
Author: Michael Armbrust <michael@databricks.com>
Closes #1915 from marmbrus/arrayUDF and squashes the following commits:
a1c503d [Michael Armbrust] Support for udfs that take complex types
In the Spark SQL component, the "SHOW CREATE TABLE" syntax had been disabled.
We think it is a useful function for describing a Hive table.
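For illustration (a sketch, assuming a HiveContext named `hiveContext` and an existing Hive table `src`):
```scala
// With this change, the statement is no longer rejected and returns the
// table's DDL, similar to Hive's own SHOW CREATE TABLE output.
hiveContext.sql("SHOW CREATE TABLE src").collect().foreach(println)
```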
Author: tianyi <tianyi@asiainfo-linkage.com>
Author: tianyi <tianyi@asiainfo.com>
Author: tianyi <tianyi.asiainfo@gmail.com>
Closes #1760 from tianyi/spark-2817 and squashes the following commits:
7d28b15 [tianyi] [SPARK-2817] fix too short prefix problem
cbffe8b [tianyi] [SPARK-2817] fix the case problem
565ec14 [tianyi] [SPARK-2817] fix the case problem
60d48a9 [tianyi] [SPARK-2817] use the system temporary folder instead of temporary files in the source tree, and also clean up some empty lines
dbe1031 [tianyi] [SPARK-2817] move some code out of function rewritePaths, as it may be called multiple times
9b2ba11 [tianyi] [SPARK-2817] fix the line length problem
9f97586 [tianyi] [SPARK-2817] remove test.tmp.dir from pom.xml
bfc2999 [tianyi] [SPARK-2817] add "File.separator" support, create a "testTmpDir" outside the rewritePaths
bde800a [tianyi] [SPARK-2817] add "${system:test.tmp.dir}" support add "last_modified_by" to nonDeterministicLineIndicators in HiveComparisonTest
bb82726 [tianyi] [SPARK-2817] remove test which requires a system from the whitelist.
bbf6b42 [tianyi] [SPARK-2817] add a systemProperties named "test.tmp.dir" to pass the test which contains "${system:test.tmp.dir}"
a337bd6 [tianyi] [SPARK-2817] add "show create table" support
a03db77 [tianyi] [SPARK-2817] add "show create table" support
JIRA issue: [SPARK-3004](https://issues.apache.org/jira/browse/SPARK-3004)
HiveThriftServer2 throws an exception when the result set contains `NULL`. We should check `isNullAt` in `SparkSQLOperationManager.getNextRowSet`.
Note that simply using `row.addColumnValue(null)` doesn't work, since Hive sets the column type of a null `ColumnValue` to String by default.
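A hedged sketch of the null-checking pattern (the helper names are hypothetical stand-ins for the typed Hive `ColumnValue` factories):
```scala
import org.apache.spark.sql.Row

// Hypothetical helpers: the real code must emit a NULL that still carries
// the column's Hive type, instead of a bare null (which defaults to String).
def addTypedValue(row: Row, i: Int): Unit = ()
def addTypedNull(i: Int): Unit = ()

def addColumnValue(row: Row, i: Int): Unit =
  if (row.isNullAt(i)) addTypedNull(i) else addTypedValue(row, i)
```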
Author: Cheng Lian <lian.cs.zju@gmail.com>
Closes #1920 from liancheng/spark-3004 and squashes the following commits:
1b1db1c [Cheng Lian] Adding NULL column values in the Hive way
2217722 [Cheng Lian] Fixed SPARK-3004: added null checking when retrieving row set
This is a follow-up of #1147. This PR improves performance by about 10%-15% in my local tests.
```
Before:
LeftOuterJoin: took 16750 ms ([3000000] records)
LeftOuterJoin: took 15179 ms ([3000000] records)
RightOuterJoin: took 15515 ms ([3000000] records)
RightOuterJoin: took 15276 ms ([3000000] records)
FullOuterJoin: took 19150 ms ([6000000] records)
FullOuterJoin: took 18935 ms ([6000000] records)
After:
LeftOuterJoin: took 15218 ms ([3000000] records)
LeftOuterJoin: took 13503 ms ([3000000] records)
RightOuterJoin: took 13663 ms ([3000000] records)
RightOuterJoin: took 14025 ms ([3000000] records)
FullOuterJoin: took 16624 ms ([6000000] records)
FullOuterJoin: took 16578 ms ([6000000] records)
```
Besides the performance improvement, I also did some cleanup as suggested in #1147.
Author: Cheng Hao <hao.cheng@intel.com>
Closes #1765 from chenghao-intel/hash_outer_join_fixing and squashes the following commits:
ab1f9e0 [Cheng Hao] Reduce the memory copy while building the hashmap
Author: Michael Armbrust <michael@databricks.com>
Closes #1880 from marmbrus/columnBatches and squashes the following commits:
0649987 [Michael Armbrust] add test
4756fad [Michael Armbrust] fix compilation
2314532 [Michael Armbrust] Build column buffers in smaller batches
The output nullabilities of `Explode` can be determined by `ArrayType.containsNull` or `MapType.valueContainsNull`.
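A minimal sketch of the rule, assuming this era's Catalyst data type classes:
```scala
import org.apache.spark.sql.catalyst.types._

// Explode's output nullability follows the element/value nullability
// recorded in the child expression's data type.
def explodeOutputNullable(childType: DataType): Boolean = childType match {
  case ArrayType(_, containsNull)       => containsNull
  case MapType(_, _, valueContainsNull) => valueContainsNull
  case _                                => true // conservative fallback (assumption)
}
```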
Author: Takuya UESHIN <ueshin@happy-camper.st>
Closes #1888 from ueshin/issues/SPARK-2968 and squashes the following commits:
d128c95 [Takuya UESHIN] Fix nullability of Explode.
The output attributes of the opposite side of an `OuterJoin` should be nullable.
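A sketch of the idea (close in spirit to the patch; the imports assume this era's Catalyst packages):
```scala
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans._

// Attributes from the side that may produce no matching row must be nullable.
def outerJoinOutput(
    joinType: JoinType,
    leftOutput: Seq[Attribute],
    rightOutput: Seq[Attribute]): Seq[Attribute] = joinType match {
  case LeftOuter  => leftOutput ++ rightOutput.map(_.withNullability(true))
  case RightOuter => leftOutput.map(_.withNullability(true)) ++ rightOutput
  case FullOuter  => (leftOutput ++ rightOutput).map(_.withNullability(true))
  case _          => leftOutput ++ rightOutput
}
```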
Author: Takuya UESHIN <ueshin@happy-camper.st>
Closes #1887 from ueshin/issues/SPARK-2965 and squashes the following commits:
bcb2d37 [Takuya UESHIN] Fix HashOuterJoin output nullabilities.
I should use `EliminateAnalysisOperators` in `analyze` instead of manually pattern matching.
Author: Yin Huai <huaiyin.thu@gmail.com>
Closes #1881 from yhuai/useEliminateAnalysisOperators and squashes the following commits:
f3e1e7f [Yin Huai] Use EliminateAnalysisOperators.
Author: wangfei <wangfei1@huawei.com>
Closes #1852 from scwf/patch-3 and squashes the following commits:
ae28c29 [wangfei] use SparkSQLEnv.stop() in ShutdownHook
JIRA issue: [SPARK-2590](https://issues.apache.org/jira/browse/SPARK-2590)
Author: Cheng Lian <lian.cs.zju@gmail.com>
Closes #1853 from liancheng/inc-collect-option and squashes the following commits:
cb3ea45 [Cheng Lian] Moved incremental collection option to Thrift server
43ce3aa [Cheng Lian] Changed incremental collect option name
623abde [Cheng Lian] Added option to handle incremental collection, disabled by default
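A hedged sketch of what that option controls (the option name, `hiveContext`, and `resultRdd` are assumptions based on the commit descriptions, not taken from the patch):
```scala
// When incremental collection is enabled, results are fetched one partition
// at a time instead of collecting the entire result set at once.
val incremental =
  hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
val rowIterator =
  if (incremental) resultRdd.toLocalIterator else resultRdd.collect().iterator
```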
Author: Reynold Xin <rxin@apache.org>
Closes #1867 from rxin/sql-readme and squashes the following commits:
42a5307 [Reynold Xin] Updated Spark SQL README to include the hive-thriftserver module
Author: chutium <teng.qiu@gmail.com>
Closes #1691 from chutium/SPARK-2700 and squashes the following commits:
b76ae8c [chutium] [SPARK-2700] [SQL] fixed styling issue
d75a8bd [chutium] [SPARK-2700] [SQL] Hidden files (such as .impala_insert_staging) should be filtered out by sqlContext.parquetFile
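A minimal sketch of the filtering rule described above (the exact predicate in the patch may differ):
```scala
// Skip hidden files such as .impala_insert_staging when listing the
// part files of a Parquet directory.
def isHiddenFile(fileName: String): Boolean = fileName.startsWith(".")
```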
JIRA: https://issues.apache.org/jira/browse/SPARK-2908
Author: Yin Huai <huai@cse.ohio-state.edu>
Closes #1840 from yhuai/SPARK-2908 and squashes the following commits:
86e833e [Yin Huai] Update test.
cb11759 [Yin Huai] nullTypeToStringType should check columns with the type of array of structs.
JIRA: https://issues.apache.org/jira/browse/SPARK-2888
Author: Yin Huai <huai@cse.ohio-state.edu>
Closes #1817 from yhuai/fixAddColumnMetadataToConf and squashes the following commits:
fba728c [Yin Huai] Fix addColumnMetadataToConf.
JIRA issues:
- Main: [SPARK-2678](https://issues.apache.org/jira/browse/SPARK-2678)
- Related: [SPARK-2874](https://issues.apache.org/jira/browse/SPARK-2874)
Related PR:
- #1715
This PR is both a fix for SPARK-2874 and a workaround for SPARK-2678. Fixing SPARK-2678 completely requires some API-level changes that need further discussion, and we decided not to include it in the Spark 1.1 release. Since SPARK-2678 currently only affects Spark SQL scripts, this workaround is enough for Spark 1.1. The command line option handling logic in the bash scripts looks somewhat dirty and duplicated, but it helps to provide a cleaner user interface and retains full backward compatibility for now.
Author: Cheng Lian <lian.cs.zju@gmail.com>
Closes #1801 from liancheng/spark-2874 and squashes the following commits:
8045d7a [Cheng Lian] Make sure test suites pass
8493a9e [Cheng Lian] Using eval to retain quoted arguments
aed523f [Cheng Lian] Fixed typo in bin/spark-sql
f12a0b1 [Cheng Lian] Worked around SPARK-2678
daee105 [Cheng Lian] Fixed usage messages of all Spark SQL related scripts
Handle null values in a SchemaRDD when converting it into Python.
Author: Davies Liu <davies.liu@gmail.com>
Closes #1802 from davies/json and squashes the following commits:
88e6b1f [Davies Liu] handle null in schemaRDD()
Author: Michael Armbrust <michael@databricks.com>
Closes #1800 from marmbrus/warning and squashes the following commits:
8ea9cf1 [Michael Armbrust] [SQL] Fix logging warn -> debug.
Author: Reynold Xin <rxin@apache.org>
Closes #1794 from rxin/sql-conf and squashes the following commits:
3ac11ef [Reynold Xin] getAllConfs return an immutable Map instead of an Array.
4b19d6c [Reynold Xin] Tighten the visibility of various SQLConf methods and renamed setter/getters.
Minor refactoring to allow resolution using either a node's input or its output.
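For example, the new rule resolves a query that sorts by a column absent from the SELECT clause (a sketch, assuming a SQLContext named `sqlContext` and a registered table `src` with columns `key` and `value`):
```scala
// `key` appears only in ORDER BY, not in the SELECT list.
sqlContext.sql("SELECT value FROM src ORDER BY key")
```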
Author: Michael Armbrust <michael@databricks.com>
Closes #1795 from marmbrus/ordering and squashes the following commits:
237f580 [Michael Armbrust] style
74d833b [Michael Armbrust] newline
705d963 [Michael Armbrust] Add a rule for resolving ORDER BY expressions that reference attributes not present in the SELECT clause.
82cabda [Michael Armbrust] Generalize attribute resolution.
This PR aims to finalize accepted data value types in Python RDDs provided to Python `applySchema`.
JIRA: https://issues.apache.org/jira/browse/SPARK-2854
Author: Yin Huai <huai@cse.ohio-state.edu>
Closes #1793 from yhuai/SPARK-2854 and squashes the following commits:
32f0708 [Yin Huai] LongType only accepts long values.
c2b23dd [Yin Huai] Do data type conversions based on the specified Spark SQL data type.
JIRA issue: [SPARK-2650](https://issues.apache.org/jira/browse/SPARK-2650)
Please refer to [comments](https://issues.apache.org/jira/browse/SPARK-2650?focusedCommentId=14084397&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14084397) of SPARK-2650 for some other details.
This PR adjusts the initial in-memory columnar buffer size to 1MB, same as the default value of Shark's `shark.column.partitionSize.mb` property when running in local mode. Will add Shark style partition size estimation in another PR.
Also, before this PR, `NullableColumnBuilder` copies the whole buffer to add the null positions section, and then `CompressibleColumnBuilder` copies and compresses the buffer again, even if compression is disabled (`PassThrough` compression scheme is used to disable compression). In this PR the first buffer copy is eliminated to reduce memory consumption.
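A hedged sketch of the sizing change (the constant mirrors Shark's default; the names are illustrative):
```scala
import java.nio.{ByteBuffer, ByteOrder}

// Start from a 1MB buffer, matching Shark's shark.column.partitionSize.mb
// default in local mode, instead of growing from a small initial allocation.
val initialColumnBufferSize = 1024 * 1024 // 1MB
val buffer = ByteBuffer.allocate(initialColumnBufferSize).order(ByteOrder.nativeOrder())
```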
Author: Cheng Lian <lian.cs.zju@gmail.com>
Closes #1769 from liancheng/spark-2650 and squashes the following commits:
88a042e [Cheng Lian] Fixed method visibility and removed dead code
001f2e5 [Cheng Lian] Try fixing SPARK-2650 by adjusting initial buffer size and reducing memory allocation
The modules spark-hive-thriftserver_2.10 and spark-hive_2.10 were both named "Spark Project Hive" in pom.xml, so rename the spark-hive-thriftserver_2.10 project to "Spark Project Hive Thrift Server".
Author: wangfei <wangfei1@huawei.com>
Closes #1789 from scwf/patch-1 and squashes the following commits:
ca1f5e9 [wangfei] [sql] rename module name of hive-thriftserver
Author: Michael Armbrust <michael@databricks.com>
Closes #1785 from marmbrus/caseNull and squashes the following commits:
126006d [Michael Armbrust] better error message
2fe357f [Michael Armbrust] Fix coercion of CASE WHEN.
JIRA: https://issues.apache.org/jira/browse/SPARK-2783
Author: Yin Huai <huai@cse.ohio-state.edu>
Closes #1741 from yhuai/analyzeTable and squashes the following commits:
7bb5f02 [Yin Huai] Use sql instead of hql.
4d09325 [Yin Huai] Merge remote-tracking branch 'upstream/master' into analyzeTable
e3ebcd4 [Yin Huai] Renaming.
c170f4e [Yin Huai] Do not use getContentSummary.
62393b6 [Yin Huai] Merge remote-tracking branch 'upstream/master' into analyzeTable
db233a6 [Yin Huai] Trying to debug jenkins...
fee84f0 [Yin Huai] Merge remote-tracking branch 'upstream/master' into analyzeTable
f0501f3 [Yin Huai] Fix compilation error.
24ad391 [Yin Huai] Merge remote-tracking branch 'upstream/master' into analyzeTable
8918140 [Yin Huai] Wording.
23df227 [Yin Huai] Add a simple analyze method to get the size of a table and update the "totalSize" property of this table in the Hive metastore.
JIRA issue: [SPARK-2814](https://issues.apache.org/jira/browse/SPARK-2814)
Author: Cheng Lian <lian.cs.zju@gmail.com>
Closes #1753 from liancheng/spark-2814 and squashes the following commits:
c74a3b2 [Cheng Lian] Fixed SPARK-2814
Many users have reported being confused by the distinction between the `sql` and `hql` methods. Specifically, many users think that `sql(...)` cannot be used to read Hive tables. In this PR I introduce a new configuration option, `spark.sql.dialect`, that picks which dialect will be used for parsing. For SQLContext this must be set to `sql`. In `HiveContext` it defaults to `hiveql` but can also be set to `sql`.
The `hql` and `hiveql` methods continue to act the same but are now marked as deprecated.
**This is a possibly breaking change for some users unless they set the dialect manually, though this is unlikely.**
For example: `hiveContext.sql("SELECT 1")` will now throw a parsing exception by default.
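A short sketch of the new option (assuming a `HiveContext` named `hiveContext` and a Hive table `src`):
```scala
// Default dialect in HiveContext is hiveql:
hiveContext.sql("SELECT key FROM src")       // parsed as HiveQL

// Switch to the simple SQL dialect explicitly:
hiveContext.setConf("spark.sql.dialect", "sql")
hiveContext.sql("SELECT 1")                  // parsed by the basic SQL parser
```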
Author: Michael Armbrust <michael@databricks.com>
Closes #1746 from marmbrus/sqlLanguageConf and squashes the following commits:
ad375cc [Michael Armbrust] Merge remote-tracking branch 'apache/master' into sqlLanguageConf
20c43f8 [Michael Armbrust] override function instead of just setting the value
7e4ae93 [Michael Armbrust] Deprecate hql() method in favor of a config option, 'spark.sql.dialect'
There have been user complaints that the difference between `registerAsTable` and `saveAsTable` is too subtle. This PR addresses this by renaming `registerAsTable` to `registerTempTable`, which more clearly reflects what is happening. `registerAsTable` remains, but will cause a deprecation warning.
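For example (assuming a SchemaRDD named `people`):
```scala
people.registerTempTable("people")  // new, preferred name
people.registerAsTable("people")    // still works, but emits a deprecation warning
```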
Author: Michael Armbrust <michael@databricks.com>
Closes #1743 from marmbrus/registerTempTable and squashes the following commits:
d031348 [Michael Armbrust] Merge remote-tracking branch 'apache/master' into registerTempTable
4dff086 [Michael Armbrust] Fix .java files too
89a2f12 [Michael Armbrust] Merge remote-tracking branch 'apache/master' into registerTempTable
0b7b71e [Michael Armbrust] Rename registerAsTable to registerTempTable
This is a follow-up of #1636.
Author: Cheng Lian <lian.cs.zju@gmail.com>
Closes #1738 from liancheng/test-for-spark-2729 and squashes the following commits:
b13692a [Cheng Lian] Added test case for SPARK-2729
Author: Michael Armbrust <michael@databricks.com>
Closes #1742 from marmbrus/asserts and squashes the following commits:
5182d54 [Michael Armbrust] Remove assertions that throw when users try unsupported Hive commands.
This patch adds the ability to register lambda functions written in Python, Java or Scala as UDFs for use in SQL or HiveQL.
Scala:
```scala
registerFunction("strLenScala", (_: String).length)
sql("SELECT strLenScala('test')")
```
Python:
```python
sqlCtx.registerFunction("strLenPython", lambda x: len(x), IntegerType())
sqlCtx.sql("SELECT strLenPython('test')")
```
Java:
```java
sqlContext.registerFunction("stringLengthJava", new UDF1<String, Integer>() {
@Override
public Integer call(String str) throws Exception {
return str.length();
}
}, DataType.IntegerType);
sqlContext.sql("SELECT stringLengthJava('test')");
```
Author: Michael Armbrust <michael@databricks.com>
Closes #1063 from marmbrus/udfs and squashes the following commits:
9eda0fe [Michael Armbrust] newline
747c05e [Michael Armbrust] Add some scala UDF tests.
d92727d [Michael Armbrust] Merge remote-tracking branch 'apache/master' into udfs
005d684 [Michael Armbrust] Fix naming and formatting.
d14dac8 [Michael Armbrust] Fix last line of autogened java files.
8135c48 [Michael Armbrust] Move UDF unit tests to pyspark.
40b0ffd [Michael Armbrust] Merge remote-tracking branch 'apache/master' into udfs
6a36890 [Michael Armbrust] Switch logging so that SQLContext can be serializable.
7a83101 [Michael Armbrust] Drop toString
795fd15 [Michael Armbrust] Try to avoid capturing SQLContext.
e54fb45 [Michael Armbrust] Docs and tests.
437cbe3 [Michael Armbrust] Update use of dataTypes, fix some python tests, address review comments.
01517d6 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into udfs
8e6c932 [Michael Armbrust] WIP
3f96a52 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into udfs
6237c8d [Michael Armbrust] WIP
2766f0b [Michael Armbrust] Move udfs support to SQL from hive. Add support for Java UDFs.
0f7d50c [Michael Armbrust] Draft of native Spark SQL UDFs for Scala and Python.
This also closes #1701.
Author: GuoQiang Li <witgo@qq.com>
Closes #1208 from witgo/SPARK-1470 and squashes the following commits:
422646b [GuoQiang Li] Remove scalalogging-slf4j dependency
I don't think we currently generate a plan that triggers this bug, but let me explain it...
Right now, we use `left.outputPartitioning` as the `outputPartitioning` of a `BroadcastHashJoin`. We may get a wrong physical plan for cases like...
```sql
SELECT l.key, count(*)
FROM (SELECT key, count(*) as cnt
FROM src
GROUP BY key) l // This is buildPlan
JOIN r // This is the streamedPlan
ON (l.cnt = r.value)
GROUP BY l.key
```
Let's say we have a `BroadcastHashJoin` on `l` and `r`. For this case, we will pick `l`'s `outputPartitioning` as the `outputPartitioning` of the `BroadcastHashJoin` on `l` and `r`. Also, because the last `GROUP BY` uses `l.key` as the key, we will not introduce an `Exchange` for this aggregation. However, `r`'s outputPartitioning may not match the required distribution of the last `GROUP BY`, and we would fail to group the data correctly.
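A sketch of the fix, consistent with the description above (the streamed side determines the join output's actual distribution, since the build side is broadcast):
```scala
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.SparkPlan

// A sketch, not the actual class: report the streamed plan's partitioning
// instead of unconditionally using the left child's.
abstract class BroadcastHashJoinLike extends SparkPlan {
  def streamedPlan: SparkPlan
  override def outputPartitioning: Partitioning = streamedPlan.outputPartitioning
}
```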
JIRA is being reindexed. I will create a JIRA ticket once it is back online.
Author: Yin Huai <huai@cse.ohio-state.edu>
Closes #1735 from yhuai/BroadcastHashJoin and squashes the following commits:
96d9cb3 [Yin Huai] Set outputPartitioning correctly.
For Scala 2.11 compatibility.
Without an explicit type annotation, the return type of `withNullability` is inferred to be `Attribute`, so calling `at()` on the returned object fails in these tests (the fix is sketched after the error output):
```
[ERROR] /Users/avati/work/spark/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala:370: value at is not a
[ERROR] val c4_notNull = 'a.boolean.notNull.at(3)
[ERROR] ^
[ERROR] /Users/avati/work/spark/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala:371: value at is not a
[ERROR] val c5_notNull = 'a.boolean.notNull.at(4)
[ERROR] ^
[ERROR] /Users/avati/work/spark/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala:372: value at is not a
[ERROR] val c6_notNull = 'a.boolean.notNull.at(5)
[ERROR] ^
[ERROR] /Users/avati/work/spark/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala:558: value at is not a
[ERROR] val s_notNull = 'a.string.notNull.at(0)
```
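A minimal model of the fix (the names mirror Catalyst, but this is a sketch rather than the actual classes): annotating the return type keeps it from being widened to the supertype.
```scala
// Toy stand-ins for Attribute / AttributeReference.
abstract class Attr { def withNullability(n: Boolean): Attr }

case class AttrRef(name: String, nullable: Boolean) extends Attr {
  // The explicit `AttrRef` return type is the point of the fix; without it,
  // callers only see the supertype, and helpers defined on the subtype
  // (like the test DSL's `at`) become unavailable.
  override def withNullability(n: Boolean): AttrRef = copy(nullable = n)
}
```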
Signed-off-by: Anand Avati <avati@redhat.com>
Author: Anand Avati <avati@redhat.com>
Closes #1709 from avati/SPARK-1812-notnull and squashes the following commits:
0470eb3 [Anand Avati] SPARK-1812: sql/catalyst - Provide explicit type information
Author: GuoQiang Li <witgo@qq.com>
Closes #1369 from witgo/SPARK-1470_new and squashes the following commits:
66a1641 [GuoQiang Li] IncompatibleResultTypeProblem
73a89ba [GuoQiang Li] Use the scala-logging wrapper instead of the slf4j API directly.
We need to carefully set the outputPartitioning of the HashOuterJoin operator. Otherwise, we may not handle nulls correctly.
Author: Yin Huai <huai@cse.ohio-state.edu>
Closes #1721 from yhuai/SPARK-2212-BugFix and squashes the following commits:
ed5eef7 [Yin Huai] Correctly choosing outputPartitioning for the HashOuterJoin operator.
Convert each Row in a JavaSchemaRDD into an Array[Any], unpickle them as tuples in Python, then convert them into namedtuples, so users can access fields just like attributes.
This lets nested structures be accessed as objects; it also reduces the size of the serialized data and improves performance.
```
root
|-- field1: integer (nullable = true)
|-- field2: string (nullable = true)
|-- field3: struct (nullable = true)
| |-- field4: integer (nullable = true)
| |-- field5: array (nullable = true)
| | |-- element: integer (containsNull = false)
|-- field6: array (nullable = true)
| |-- element: struct (containsNull = false)
| | |-- field7: string (nullable = true)
```
Then we can access them via `row.field3.field5[0]` or `row.field6[5].field7`.
It also infers the schema in Python, converting Row/dict/namedtuple/objects into tuples before serialization, then calling applySchema in the JVM. During inferSchema(), a top-level dict in a row becomes a StructType, but any nested dictionary becomes a MapType.
You can use pyspark.sql.Row to convert an unnamed structure into a Row object, making the RDD's schema inferable. For example:
```python
ctx.inferSchema(rdd.map(lambda x: Row(a=x[0], b=x[1])))
```
Or you can use Row to create a class, just like namedtuple. For example:
```python
Person = Row("name", "age")
ctx.inferSchema(rdd.map(lambda x: Person(*x)))
```
Also, you can call applySchema to apply a schema to an RDD of tuples/lists and turn it into a SchemaRDD. The `schema` should be a StructType; see the API docs for details.
```python
schema = StructType([StructField("name", StringType(), True),
                     StructField("age", IntegerType(), True)])
ctx.applySchema(rdd, schema)
```
PS: In order to use namedtuples with inferSchema, you should make the namedtuple picklable.
Author: Davies Liu <davies.liu@gmail.com>
Closes #1598 from davies/nested and squashes the following commits:
f1d15b6 [Davies Liu] verify schema with the first few rows
8852aaf [Davies Liu] check type of schema
abe9e6e [Davies Liu] address comments
61b2292 [Davies Liu] add @deprecated to pythonToJavaMap
1e5b801 [Davies Liu] improve cache of classes
51aa135 [Davies Liu] use Row to infer schema
e9c0d5c [Davies Liu] remove string typed schema
353a3f2 [Davies Liu] fix code style
63de8f8 [Davies Liu] fix typo
c79ca67 [Davies Liu] fix serialization of nested data
6b258b5 [Davies Liu] fix pep8
9d8447c [Davies Liu] apply schema provided by string of names
f5df97f [Davies Liu] refactor, address comments
9d9af55 [Davies Liu] use array to applySchema and infer schema in Python
84679b3 [Davies Liu] Merge branch 'master' of github.com:apache/spark into nested
0eaaf56 [Davies Liu] fix doc tests
b3559b4 [Davies Liu] use generated Row instead of namedtuple
c4ddc30 [Davies Liu] fix conflict between name of fields and variables
7f6f251 [Davies Liu] address all comments
d69d397 [Davies Liu] refactor
2cc2d45 [Davies Liu] refactor
182fb46 [Davies Liu] refactor
bc6e9e1 [Davies Liu] switch to new Schema API
547bf3e [Davies Liu] Merge branch 'master' into nested
a435b5a [Davies Liu] add docs and code refactor
2c8debc [Davies Liu] Merge branch 'master' into nested
644665a [Davies Liu] use tuple and namedtuple for schemardd
Author: Cheng Hao <hao.cheng@intel.com>
Closes #1686 from chenghao-intel/spark_sql_cli and squashes the following commits:
eb664cc [Cheng Hao] Output detailed failure message in console
93b0382 [Cheng Hao] Fix Bug of no output in cli if exception thrown internally
Just a forgotten match case, found after SPARK-2710: TimestampType can now be used by a SchemaRDD generated from a JDBC ResultSet.
Author: chutium <teng.qiu@gmail.com>
Closes #1636 from chutium/SPARK-2729 and squashes the following commits:
71af77a [chutium] [SPARK-2729] [SQL] added Timestamp in NullableColumnAccessorSuite
39cf9f8 [chutium] [SPARK-2729] add Timestamp Type into ColumnBuilder TestSuite, ref. #1636
ab6ff97 [chutium] [SPARK-2729] Forgot to match Timestamp type in ColumnBuilder
This patch adds support for hash-based outer joins. Currently, outer joins on big relations resort to `BroadcastNestedLoopJoin`, which is super slow. This PR creates two hash tables, one for each relation, in the same partition, which greatly reduces the table scans.
Here is the testing code that I used:
```
package org.apache.spark.sql.hive
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql._
case class Record(key: String, value: String)
object JoinTablePrepare extends App {
import TestHive2._
val rdd = sparkContext.parallelize((1 to 3000000).map(i => Record(s"${i % 828193}", s"val_$i")))
runSqlHive("SHOW TABLES")
runSqlHive("DROP TABLE if exists a")
runSqlHive("DROP TABLE if exists b")
runSqlHive("DROP TABLE if exists result")
rdd.registerAsTable("records")
runSqlHive("""CREATE TABLE a (key STRING, value STRING)
| ROW FORMAT SERDE
| 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe'
| STORED AS RCFILE
""".stripMargin)
runSqlHive("""CREATE TABLE b (key STRING, value STRING)
| ROW FORMAT SERDE
| 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe'
| STORED AS RCFILE
""".stripMargin)
runSqlHive("""CREATE TABLE result (key STRING, value STRING)
| ROW FORMAT SERDE
| 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe'
| STORED AS RCFILE
""".stripMargin)
hql(s"""from records
| insert into table a
| select key, value
""".stripMargin)
hql(s"""from records
| insert into table b select key + 100000, value
""".stripMargin)
}
object JoinTablePerformanceTest extends App {
import TestHive2._
hql("SHOW TABLES")
hql("set spark.sql.shuffle.partitions=20")
val leftOuterJoin = "insert overwrite table result select a.key, b.value from a left outer join b on a.key=b.key"
val rightOuterJoin = "insert overwrite table result select a.key, b.value from a right outer join b on a.key=b.key"
val fullOuterJoin = "insert overwrite table result select a.key, b.value from a full outer join b on a.key=b.key"
val results = ("LeftOuterJoin", benchmark(leftOuterJoin)) :: ("LeftOuterJoin", benchmark(leftOuterJoin)) ::
("RightOuterJoin", benchmark(rightOuterJoin)) :: ("RightOuterJoin", benchmark(rightOuterJoin)) ::
("FullOuterJoin", benchmark(fullOuterJoin)) :: ("FullOuterJoin", benchmark(fullOuterJoin)) :: Nil
val explains = hql(s"explain $leftOuterJoin").collect ++ hql(s"explain $rightOuterJoin").collect ++ hql(s"explain $fullOuterJoin").collect
println(explains.mkString(",\n"))
results.foreach { case (prompt, result) => {
println(s"$prompt: took ${result._1} ms (${result._2} records)")
}
}
def benchmark(cmd: String) = {
val begin = System.currentTimeMillis()
val result = hql(cmd)
val end = System.currentTimeMillis()
val count = hql("select count(1) from result").collect.mkString("")
((end - begin), count)
}
}
```
And the results are shown below:
```
[Physical execution plan:],
[InsertIntoHiveTable (MetastoreRelation default, result, None), Map(), true],
[ Project [key#95,value#98]],
[ HashOuterJoin [key#95], [key#97], LeftOuter, None],
[ Exchange (HashPartitioning [key#95], 20)],
[ HiveTableScan [key#95], (MetastoreRelation default, a, None), None],
[ Exchange (HashPartitioning [key#97], 20)],
[ HiveTableScan [key#97,value#98], (MetastoreRelation default, b, None), None],
[Physical execution plan:],
[InsertIntoHiveTable (MetastoreRelation default, result, None), Map(), true],
[ Project [key#102,value#105]],
[ HashOuterJoin [key#102], [key#104], RightOuter, None],
[ Exchange (HashPartitioning [key#102], 20)],
[ HiveTableScan [key#102], (MetastoreRelation default, a, None), None],
[ Exchange (HashPartitioning [key#104], 20)],
[ HiveTableScan [key#104,value#105], (MetastoreRelation default, b, None), None],
[Physical execution plan:],
[InsertIntoHiveTable (MetastoreRelation default, result, None), Map(), true],
[ Project [key#109,value#112]],
[ HashOuterJoin [key#109], [key#111], FullOuter, None],
[ Exchange (HashPartitioning [key#109], 20)],
[ HiveTableScan [key#109], (MetastoreRelation default, a, None), None],
[ Exchange (HashPartitioning [key#111], 20)],
[ HiveTableScan [key#111,value#112], (MetastoreRelation default, b, None), None]
LeftOuterJoin: took 16072 ms ([3000000] records)
LeftOuterJoin: took 14394 ms ([3000000] records)
RightOuterJoin: took 14802 ms ([3000000] records)
RightOuterJoin: took 14747 ms ([3000000] records)
FullOuterJoin: took 17715 ms ([6000000] records)
FullOuterJoin: took 17629 ms ([6000000] records)
```
Without this PR, the benchmark seems to never end.
Author: Cheng Hao <hao.cheng@intel.com>
Closes #1147 from chenghao-intel/hash_based_outer_join and squashes the following commits:
65c599e [Cheng Hao] Fix issues with the community comments
72b1394 [Cheng Hao] Fix bug of stale value in joinedRow
55baef7 [Cheng Hao] Add HashOuterJoin
It is a follow-up PR of SPARK-2179 (https://issues.apache.org/jira/browse/SPARK-2179). It makes the package names of the data type APIs more consistent across languages (Scala: `org.apache.spark.sql`, Java: `org.apache.spark.sql.api.java`, Python: `pyspark.sql`).
Author: Yin Huai <huai@cse.ohio-state.edu>
Closes #1712 from yhuai/javaDataType and squashes the following commits:
62eb705 [Yin Huai] Move package-info.
add4bcb [Yin Huai] Make the package names of data type classes consistent across languages by moving all Java data type classes to package sql.api.java.
Author: GuoQiang Li <witgo@qq.com>
Closes #1683 from witgo/SPARK-2766 and squashes the following commits:
d0db00c [GuoQiang Li] ScalaReflectionSuite throws an IllegalArgumentException on JDK 6
Since we let users create Rows, it makes sense to accept mutable Maps as values of MapType columns.
JIRA: https://issues.apache.org/jira/browse/SPARK-2779
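A short sketch of the essence of the change (the helper is illustrative, not from the patch):
```scala
import scala.collection.mutable

// The point of the fix: match on scala.collection.Map, the common supertype
// of mutable and immutable maps, when converting MapType column values.
def toMapTypeValue(m: scala.collection.Map[String, Int]): Map[String, Int] = m.toMap

toMapTypeValue(mutable.Map("a" -> 1)) // now accepted
toMapTypeValue(Map("b" -> 2))         // still accepted
```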
Author: Yin Huai <huai@cse.ohio-state.edu>
Closes #1705 from yhuai/SPARK-2779 and squashes the following commits:
00d72fd [Yin Huai] Use scala.collection.Map.
This PR resolves the following two tickets:
- [SPARK-2531](https://issues.apache.org/jira/browse/SPARK-2531): BNLJ currently assumes the build side is the right relation. This patch refactors some of its logic to take into account a BuildSide properly.
- [SPARK-2436](https://issues.apache.org/jira/browse/SPARK-2436): building on top of the above, we simply use the physical size statistics (if available) of both relations and make the smaller relation the build side in the planner (see the sketch below).
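A hedged sketch of the size-based choice (assuming logical plans expose `statistics.sizeInBytes`, as Catalyst did in this era; the function name is illustrative):
```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{BuildLeft, BuildRight, BuildSide}

// Make the smaller relation the build side of BroadcastNestedLoopJoin.
def chooseBuildSide(left: LogicalPlan, right: LogicalPlan): BuildSide =
  if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) BuildRight
  else BuildLeft
```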
Author: Zongheng Yang <zongheng.y@gmail.com>
Closes #1448 from concretevitamin/bnlj-buildSide and squashes the following commits:
1780351 [Zongheng Yang] Use size estimation to decide optimal build side of BNLJ.
68e6c5b [Zongheng Yang] Consolidate two adjacent pattern matchings.
96d312a [Zongheng Yang] Use a while loop instead of collection methods chaining.
4bc525e [Zongheng Yang] Make BroadcastNestedLoopJoin take a BuildSide.