cbaa729132
## What changes were proposed in this pull request? 1. Refactor ExecuteWriteTask in FileFormatWriter to reduce common logic and improve readability. After the change, callers only need to call `commit()` or `abort` at the end of task. Also there is less code in `SingleDirectoryWriteTask` and `DynamicPartitionWriteTask`. Definitions of related classes are moved to a new file, and `ExecuteWriteTask` is renamed to `FileFormatDataWriter`. 2. As per code style guide: https://github.com/databricks/scala-style-guide#traversal-and-zipwithindex , we avoid using `for` for looping in [FileFormatWriter](https://github.com/apache/spark/pull/21381/files#diff-3b69eb0963b68c65cfe8075f8a42e850L536) , or `foreach` in [WriteToDataSourceV2Exec](https://github.com/apache/spark/pull/21381/files#diff-6fbe10db766049a395bae2e785e9d56eL119). In such critical code path, using `while` is good for performance. ## How was this patch tested? Existing unit test. I tried the microbenchmark in https://github.com/apache/spark/pull/21409 | Workload | Before changes(Best/Avg Time(ms)) | After changes(Best/Avg Time(ms)) | | --- | --- | -- | |Output Single Int Column| 2018 / 2043 | 2096 / 2236 | |Output Single Double Column| 1978 / 2043 | 2013 / 2018 | |Output Int and String Column| 6332 / 6706 | 6162 / 6298 | |Output Partitions| 4458 / 5094 | 3792 / 4008 | |Output Buckets| 5695 / 6102 | 5120 / 5154 | Also a microbenchmark on my laptop for general comparison among while/foreach/for : ``` class Writer { var sum = 0L def write(l: Long): Unit = sum += l } def testWhile(iterator: Iterator[Long]): Long = { val w = new Writer while (iterator.hasNext) { w.write(iterator.next()) } w.sum } def testForeach(iterator: Iterator[Long]): Long = { val w = new Writer iterator.foreach(w.write) w.sum } def testFor(iterator: Iterator[Long]): Long = { val w = new Writer for (x <- iterator) { w.write(x) } w.sum } val data = 0L to 100000000L val start = System.nanoTime (0 to 10).foreach(_ => testWhile(data.iterator)) println("benchmark while: " + (System.nanoTime - start)/1000000) val start2 = System.nanoTime (0 to 10).foreach(_ => testForeach(data.iterator)) println("benchmark foreach: " + (System.nanoTime - start2)/1000000) val start3 = System.nanoTime (0 to 10).foreach(_ => testForeach(data.iterator)) println("benchmark for: " + (System.nanoTime - start3)/1000000) ``` Benchmark result: `while`: 15401 ms `foreach`: 43034 ms `for`: 41279 ms Author: Gengliang Wang <gengliang.wang@databricks.com> Closes #21381 from gengliangwang/refactorExecuteWriteTask. |
||
---|---|---|
.. | ||
catalyst | ||
core | ||
hive | ||
hive-thriftserver | ||
create-docs.sh | ||
gen-sql-markdown.py | ||
mkdocs.yml | ||
README.md |
Spark SQL
This module provides support for executing relational queries expressed in either SQL or the DataFrame/Dataset API.
Spark SQL is broken up into four subprojects:
- Catalyst (sql/catalyst) - An implementation-agnostic framework for manipulating trees of relational operators and expressions.
- Execution (sql/core) - A query planner / execution engine for translating Catalyst's logical query plans into Spark RDDs. This component also includes a new public interface, SQLContext, that allows users to execute SQL or LINQ statements against existing RDDs and Parquet files.
- Hive Support (sql/hive) - Includes an extension of SQLContext called HiveContext that allows users to write queries using a subset of HiveQL and access data from a Hive Metastore using Hive SerDes. There are also wrappers that allow users to run queries that include Hive UDFs, UDAFs, and UDTFs.
- HiveServer and CLI support (sql/hive-thriftserver) - Includes support for the SQL CLI (bin/spark-sql) and a HiveServer2 (for JDBC/ODBC) compatible server.
Running sql/create-docs.sh
generates SQL documentation for built-in functions under sql/site
.