## What changes were proposed in this pull request?
#### Architecture
This PR implements stream-stream inner join using a two-way symmetric hash join. At a high level, we want to do the following.
1. For each stream, we maintain the past rows as state in State Store.
- For each joining key, there can be multiple rows that have been received.
- So, we have to effectively maintain a key-to-list-of-values multimap as state for each stream.
2. In each batch, for each input row in each stream
- Look up the other streams state to see if there are matching rows, and output them if they satisfy the joining condition
- Add the input row to corresponding stream’s state.
- If the data has a timestamp/window column with watermark, then we will use that to calculate the threshold for keys that are required to buffered for future matches and drop the rest from the state.
Cleaning up old unnecessary state rows depends completely on whether watermark has been defined and what are join conditions. We definitely want to support state clean up two types of queries that are likely to be common.
- Queries to time range conditions - E.g. `SELECT * FROM leftTable, rightTable ON leftKey = rightKey AND leftTime > rightTime - INTERVAL 8 MINUTES AND leftTime < rightTime + INTERVAL 1 HOUR`
- Queries with windows as the matching key - E.g. `SELECT * FROM leftTable, rightTable ON leftKey = rightKey AND window(leftTime, "1 hour") = window(rightTime, "1 hour")` (pseudo-SQL)
#### Implementation
The stream-stream join is primarily implemented in three classes
- `StreamingSymmetricHashJoinExec` implements the above symmetric join algorithm.
- `SymmetricsHashJoinStateManagers` manages the streaming state for the join. This essentially is a fault-tolerant key-to-list-of-values multimap built on the StateStore APIs. `StreamingSymmetricHashJoinExec` instantiates two such managers, one for each join side.
- `StreamingSymmetricHashJoinExecHelper` is a helper class to extract threshold for the state based on the join conditions and the event watermark.
Refer to the scaladocs class for more implementation details.
Besides the implementation of stream-stream inner join SparkPlan. Some additional changes are
- Allowed inner join in append mode in UnsupportedOperationChecker
- Prevented stream-stream join on an empty batch dataframe to be collapsed by the optimizer
## How was this patch tested?
- New tests in StreamingJoinSuite
- Updated tests UnsupportedOperationSuite
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#19271 from tdas/SPARK-22053.
## What changes were proposed in this pull request?
I test on a dataset of about 13M instances, and found that using `treeAggregate` give a speedup in following algs:
|Algs| SpeedUp |
|------|-----------|
|OneHotEncoder| 5% |
|StatFunctions.calculateCov| 7% |
|StatFunctions.multipleApproxQuantiles| 9% |
|RegressionEvaluator| 8% |
## How was this patch tested?
existing tests
Author: Zheng RuiFeng <ruifengz@foxmail.com>
Closes#19232 from zhengruifeng/use_treeAggregate.
## What changes were proposed in this pull request?
`PeriodicRDDCheckpointer` will automatically persist the last 3 datasets called by `PeriodicRDDCheckpointer.update()`.
In GBTs, the last 3 intermediate rdds are still cached after `fit()`
## How was this patch tested?
existing tests and local test in spark-shell
Author: Zheng RuiFeng <ruifengz@foxmail.com>
Closes#19288 from zhengruifeng/gbt_unpersist.
## What changes were proposed in this pull request?
There is an incorrect `scalastyle:on` comment in `stringExpressions.scala` and causes the line size limit check ineffective in the file. There are many lines of code and comment which are more than 100 chars.
## How was this patch tested?
Code style change only.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#19305 from viirya/fix-wrong-style.
## What changes were proposed in this pull request?
We have to make sure that SerializerManager's private instance of
kryo also uses the right classloader, regardless of the current thread
classloader. In particular, this fixes serde during remote cache
fetches, as those occur in netty threads.
## How was this patch tested?
Manual tests & existing suite via jenkins. I haven't been able to reproduce this is in a unit test, because when a remote RDD partition can be fetched, there is a warning message and then the partition is just recomputed locally. I manually verified the warning message is no longer present.
Author: Imran Rashid <irashid@cloudera.com>
Closes#19280 from squito/SPARK-21928_ser_classloader.
## What changes were proposed in this pull request?
Adjust EnsureStatefulOpPartitioningSuite to use scalatest lifecycle normally instead of constructor; fixes:
```
*** RUN ABORTED ***
org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:
org.apache.spark.sql.streaming.EnsureStatefulOpPartitioningSuite.<init>(EnsureStatefulOpPartitioningSuite.scala:35)
```
## How was this patch tested?
Existing tests
Author: Sean Owen <sowen@cloudera.com>
Closes#19306 from srowen/SPARK-21977.2.
## What changes were proposed in this pull request?
In SQL conditional expressions, only CASE WHEN lacks for expression description. This patch fills the gap.
## How was this patch tested?
Only documentation change.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#19304 from viirya/casewhen-doc.
## What changes were proposed in this pull request?
This work is a part of [SPARK-17074](https://issues.apache.org/jira/browse/SPARK-17074) to compute equi-height histograms. Equi-height histogram is an array of bins. A bin consists of two endpoints which form an interval of values and the ndv in that interval.
This PR creates a new aggregate function, given an array of endpoints, counting distinct values (ndv) in intervals among those endpoints.
This PR also refactors `HyperLogLogPlusPlus` by extracting a helper class `HyperLogLogPlusPlusHelper`, where the underlying HLLPP algorithm locates.
## How was this patch tested?
Add new test cases.
Author: Zhenhua Wang <wangzhenhua@huawei.com>
Closes#15544 from wzhfy/countIntervals.
## What changes were proposed in this pull request?
This PR make `sample(...)` able to omit `withReplacement` defaulting to `FALSE`.
In short, the following examples are allowed:
```r
> df <- createDataFrame(as.list(seq(10)))
> count(sample(df, fraction=0.5, seed=3))
[1] 4
> count(sample(df, fraction=1.0))
[1] 10
```
In addition, this PR also adds some type checking logics as below:
```r
> sample(df, fraction = "a")
Error in sample(df, fraction = "a") :
fraction must be numeric; however, got character
> sample(df, fraction = 1, seed = NULL)
Error in sample(df, fraction = 1, seed = NULL) :
seed must not be NULL or NA; however, got NULL
> sample(df, list(1), 1.0)
Error in sample(df, list(1), 1) :
withReplacement must be logical; however, got list
> sample(df, fraction = -1.0)
...
Error in sample : illegal argument - requirement failed: Sampling fraction (-1.0) must be on interval [0, 1] without replacement
```
## How was this patch tested?
Manually tested, unit tests added in `R/pkg/tests/fulltests/test_sparkSQL.R`.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#19243 from HyukjinKwon/SPARK-21780.
## What changes were proposed in this pull request?
This is a followup work of SPARK-9104 to expose the Netty memory usage to MetricsSystem. Current the shuffle Netty memory usage of `NettyBlockTransferService` will be exposed, if using external shuffle, then the Netty memory usage of `ExternalShuffleClient` and `ExternalShuffleService` will be exposed instead. Currently I don't expose Netty memory usage of `YarnShuffleService`, because `YarnShuffleService` doesn't have `MetricsSystem` itself, and is better to connect to Hadoop's MetricsSystem.
## How was this patch tested?
Manually verified in local cluster.
Author: jerryshao <sshao@hortonworks.com>
Closes#19160 from jerryshao/SPARK-21934.
## What changes were proposed in this pull request?
This a follow-up of https://github.com/apache/spark/pull/19289 , we missed another place: `rollup`. `Seq.init.toSeq` also returns a `Stream`, we should fix it too.
## How was this patch tested?
manually
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19298 from cloud-fan/bug.
## What changes were proposed in this pull request?
When the libraries temp directory(i.e. __spark_libs__*.zip dir) file system and staging dir(destination) file systems are the same then the __spark_libs__*.zip is not copying to the staging directory. But after making this decision the libraries zip file is getting deleted immediately and becoming unavailable for the Node Manager's localization.
With this change, client copies the files to remote always when the source scheme is "file".
## How was this patch tested?
I have verified it manually in yarn/cluster and yarn/client modes with hdfs and local file systems.
Author: Devaraj K <devaraj@apache.org>
Closes#19141 from devaraj-kavali/SPARK-21384.
The live listener bus now cleans up after itself and releases listeners
after stopping, so code cannot get references to listeners after the
Spark context is stopped.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#19297 from vanzin/SPARK-18838.hotfix.
## What changes were proposed in this pull request?
Spark with Scala 2.10 fails with a group by cube:
```
spark.range(1).select($"id" as "a", $"id" as "b").write.partitionBy("a").mode("overwrite").saveAsTable("rollup_bug")
spark.sql("select 1 from rollup_bug group by rollup ()").show
```
It can be traced back to https://github.com/apache/spark/pull/15484 , which made `Expand.projections` a lazy `Stream` for group by cube.
In scala 2.10 `Stream` captures a lot of stuff, and in this case it captures the entire query plan which has some un-serializable parts.
This change is also good for master branch, to reduce the serialized size of `Expand.projections`.
## How was this patch tested?
manually verified with Spark with Scala 2.10.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19289 from cloud-fan/bug.
## What changes were proposed in this pull request?
Clarify behavior of to_utc_timestamp/from_utc_timestamp with an example
## How was this patch tested?
Doc only change / existing tests
Author: Sean Owen <sowen@cloudera.com>
Closes#19276 from srowen/SPARK-22049.
## What changes were proposed in this pull request?
See https://github.com/apache/spark/pull/19282
Revert scala-maven-plugin to 3.2.2 to work with Maven+zinc again
## How was this patch tested?
Reproduced locally with zinc, and confirmed this removes the problem.
Author: Sean Owen <sowen@cloudera.com>
Closes#19292 from srowen/SPARK-22066.2.
## What changes were proposed in this pull request?
Update plugins, including scala-maven-plugin, to latest versions. Update checkstyle to 8.2. Remove bogus checkstyle config and enable it. Fix existing and new Java checkstyle errors.
## How was this patch tested?
Existing tests
Author: Sean Owen <sowen@cloudera.com>
Closes#19282 from srowen/SPARK-22066.
## What changes were proposed in this pull request?
This is a bit hard to explain as there are several issues here, I'll try my best. Here are the requirements:
1. A StructuredStreaming Source that can generate empty RDDs with 0 partitions
2. A StructuredStreaming query that uses the above source, performs a stateful aggregation
(mapGroupsWithState, groupBy.count, ...), and coalesce's by 1
The crux of the problem is that when a dataset has a `coalesce(1)` call, it receives a `SinglePartition` partitioning scheme. This scheme satisfies most required distributions used for aggregations such as HashAggregateExec. This causes a world of problems:
Symptom 1. If the input RDD has 0 partitions, the whole lineage will receive 0 partitions, nothing will be executed, the state store will not create any delta files. When this happens, the next trigger fails, because the StateStore fails to load the delta file for the previous trigger
Symptom 2. Let's say that there was data. Then in this case, if you stop your stream, and change `coalesce(1)` with `coalesce(2)`, then restart your stream, your stream will fail, because `spark.sql.shuffle.partitions - 1` number of StateStores will fail to find its delta files.
To fix the issues above, we must check that the partitioning of the child of a `StatefulOperator` satisfies:
If the grouping expressions are empty:
a) AllTuple distribution
b) Single physical partition
If the grouping expressions are non empty:
a) Clustered distribution
b) spark.sql.shuffle.partition # of partitions
whether or not `coalesce(1)` exists in the plan, and whether or not the input RDD for the trigger has any data.
Once you fix the above problem by adding an Exchange to the plan, you come across the following bug:
If you call `coalesce(1).groupBy().count()` on a Streaming DataFrame, and if you have a trigger with no data, `StateStoreRestoreExec` doesn't return the prior state. However, for this specific aggregation, `HashAggregateExec` after the restore returns a (0, 0) row, since we're performing a count, and there is no data. Then this data gets stored in `StateStoreSaveExec` causing the previous counts to be overwritten and lost.
## How was this patch tested?
Regression tests
Author: Burak Yavuz <brkyvz@gmail.com>
Closes#19196 from brkyvz/sa-0.
This change modifies the live listener bus so that all listeners are
added to queues; each queue has its own thread to dispatch events,
making it possible to separate slow listeners from other more
performance-sensitive ones.
The public API has not changed - all listeners added with the existing
"addListener" method, which after this change mostly means all
user-defined listeners, end up in a default queue. Internally, there's
an API allowing listeners to be added to specific queues, and that API
is used to separate the internal Spark listeners into 3 categories:
application status listeners (e.g. UI), executor management (e.g. dynamic
allocation), and the event log.
The queueing logic, while abstracted away in a separate class, is kept
as much as possible hidden away from consumers. Aside from choosing their
queue, there's no code change needed to take advantage of queues.
Test coverage relies on existing tests; a few tests had to be tweaked
because they relied on `LiveListenerBus.postToAll` being synchronous,
and the change makes that method asynchronous. Other tests were simplified
not to use the asynchronous LiveListenerBus.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#19211 from vanzin/SPARK-18838.
## What changes were proposed in this pull request?
The ArrowWriter StringWriter was setting Arrow data using a position of 0 instead of the actual position in the ByteBuffer. This was currently working because of a bug ARROW-1443, and has been fixed as of
Arrow 0.7.0. Testing with this version revealed the error in ArrowConvertersSuite test string conversion.
## How was this patch tested?
Existing tests, manually verified working with Arrow 0.7.0
Author: Bryan Cutler <cutlerb@gmail.com>
Closes#19284 from BryanCutler/arrow-ArrowWriter-StringWriter-position-SPARK-22067.
## What changes were proposed in this pull request?
Tables in the catalog cache are not invalidated once their statistics are updated. As a consequence, existing sessions will use the cached information even though it is not valid anymore. Consider and an example below.
```
// step 1
spark.range(100).write.saveAsTable("tab1")
// step 2
spark.sql("analyze table tab1 compute statistics")
// step 3
spark.sql("explain cost select distinct * from tab1").show(false)
// step 4
spark.range(100).write.mode("append").saveAsTable("tab1")
// step 5
spark.sql("explain cost select distinct * from tab1").show(false)
```
After step 3, the table will be present in the catalog relation cache. Step 4 will correctly update the metadata inside the catalog but will NOT invalidate the cache.
By the way, ``spark.sql("analyze table tab1 compute statistics")`` between step 3 and step 4 would also solve the problem.
## How was this patch tested?
Current and additional unit tests.
Author: aokolnychyi <anton.okolnychyi@sap.com>
Closes#19252 from aokolnychyi/spark-21969.
## What changes were proposed in this pull request?
org.apache.spark.sql.jdbc.JdbcDialect's method:
def isCascadingTruncateTable(): Option[Boolean] = None
is not overriden in org.apache.spark.sql.jdbc.AggregatedDialect class.
Because of this issue, when you add more than one dialect Spark doesn't truncate table because isCascadingTruncateTable always returns default None for Aggregated Dialect.
Will implement isCascadingTruncateTable in AggregatedDialect class in this PR.
## How was this patch tested?
In JDBCSuite, inside test("Aggregated dialects"), will add one line to test AggregatedDialect.isCascadingTruncateTable
Author: Huaxin Gao <huaxing@us.ibm.com>
Closes#19256 from huaxingao/spark-21338.
## What changes were proposed in this pull request?
Remove unnecessary default value setting for all evaluators, as we have set them in corresponding _HasXXX_ base classes.
## How was this patch tested?
Existing tests.
Author: Yanbo Liang <ybliang8@gmail.com>
Closes#19262 from yanboliang/evaluation.
## What changes were proposed in this pull request?
In the current Spark, when submitting application on YARN with remote resources `./bin/spark-shell --jars http://central.maven.org/maven2/com/github/swagger-akka-http/swagger-akka-http_2.11/0.10.1/swagger-akka-http_2.11-0.10.1.jar --master yarn-client -v`, Spark will be failed with:
```
java.io.IOException: No FileSystem for scheme: http
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2586)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2593)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2632)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2614)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at org.apache.spark.deploy.yarn.Client.copyFileToRemote(Client.scala:354)
at org.apache.spark.deploy.yarn.Client.org$apache$spark$deploy$yarn$Client$$distribute$1(Client.scala:478)
at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$11$$anonfun$apply$6.apply(Client.scala:600)
at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$11$$anonfun$apply$6.apply(Client.scala:599)
at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:74)
at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$11.apply(Client.scala:599)
at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$11.apply(Client.scala:598)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:598)
at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:848)
at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:173)
```
This is because `YARN#client` assumes resources are on the Hadoop compatible FS. To fix this problem, here propose to download remote http(s) resources to local and add this local downloaded resources to dist cache. This solution has one downside: remote resources are downloaded and uploaded again, but it only restricted to only remote http(s) resources, also the overhead is not so big. The advantages of this solution is that it is simple and the code changes restricts to only `SparkSubmit`.
## How was this patch tested?
Unit test added, also verified in local cluster.
Author: jerryshao <sshao@hortonworks.com>
Closes#19130 from jerryshao/SPARK-21917.
## What changes were proposed in this pull request?
While running bin/spark-sql, we will reuse cliSessionState, but the Hive configurations generated here just points to a dummy meta store which actually should be the real one. And the warehouse is determined later in SharedState, HiveClient should respect this config changing in this case too.
## How was this patch tested?
existing ut
cc cloud-fan jiangxb1987
Author: Kent Yao <yaooqinn@hotmail.com>
Closes#19068 from yaooqinn/SPARK-21428-FOLLOWUP.
Current implementation for processingRate-total uses wrong metric:
mistakenly uses inputRowsPerSecond instead of processedRowsPerSecond
## What changes were proposed in this pull request?
Adjust processingRate-total from using inputRowsPerSecond to processedRowsPerSecond
## How was this patch tested?
Built spark from source with proposed change and tested output with correct parameter. Before change the csv metrics file for inputRate-total and processingRate-total displayed the same values due to the error. After changing MetricsReporter.scala the processingRate-total csv file displayed the correct metric.
<img width="963" alt="processed rows per second" src="https://user-images.githubusercontent.com/32072374/30554340-82eea12c-9ca4-11e7-8370-8168526ff9a2.png">
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Taaffy <32072374+Taaffy@users.noreply.github.com>
Closes#19268 from Taaffy/patch-1.
## What changes were proposed in this pull request?
* Removed the method `org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter#alignToWords`.
It became unused as a result of 85b0a15754
(SPARK-15962) introducing word alignment for unsafe arrays.
* Cleaned up duplicate code in memory management and unsafe sorters
* The change extracting the exception paths is more than just cosmetics since it def. reduces the size the affected methods compile to
## How was this patch tested?
* Build still passes after removing the method, grepping the codebase for `alignToWords` shows no reference to it anywhere either.
* Dried up code is covered by existing tests.
Author: Armin <me@obrown.io>
Closes#19254 from original-brownbear/cleanup-mem-consumer.
## What changes were proposed in this pull request?
When Spark persist data to Unsafe memory, we call the method `MemoryStore.putIteratorAsBytes`, which need synchronize the `memoryManager` for every record write. This implementation is not necessary, we can apply for more memory at a time to reduce unnecessary synchronization.
## How was this patch tested?
Test case (with 1 executor 20 core):
```scala
val start = System.currentTimeMillis()
val data = sc.parallelize(0 until Integer.MAX_VALUE, 100)
.persist(StorageLevel.OFF_HEAP)
.count()
println(System.currentTimeMillis() - start)
```
Test result:
before
| 27647 | 29108 | 28591 | 28264 | 27232 |
after
| 26868 | 26358 | 27767 | 26653 | 26693 |
Author: Xianyang Liu <xianyang.liu@intel.com>
Closes#19135 from ConeyLiu/memorystore.
## What changes were proposed in this pull request?
This PR tries to download Spark for each test run, to make sure each test run is absolutely isolated.
## How was this patch tested?
N/A
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19265 from cloud-fan/test.
## What changes were proposed in this pull request?
Upgrade codahale metrics library so that Graphite constructor can re-resolve hosts behind a CNAME with re-tried DNS lookups. When Graphite is deployed behind an ELB, ELB may change IP addresses based on auto-scaling needs. Using current approach yields Graphite usage impossible, fixing for that use case
- Upgrade to codahale 3.1.5
- Use new Graphite(host, port) constructor instead of new Graphite(new InetSocketAddress(host, port)) constructor
## How was this patch tested?
The same logic is used for another project that is using the same configuration and code path, and graphite re-connect's behind ELB's are no longer an issue
This are proposed changes for codahale lib - https://github.com/dropwizard/metrics/compare/v3.1.2...v3.1.5#diff-6916c85d2dd08d89fe771c952e3b8512R120. Specifically, b4d246d34e/metrics-graphite/src/main/java/com/codahale/metrics/graphite/Graphite.java (L120)
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: alexmnyc <project@alexandermarkham.com>
Closes#19210 from alexmnyc/patch-1.
#### What changes were proposed in this pull request?
This PR enhances the TRIM function support in Spark SQL by allowing the specification
of trim characters set. Below is the SQL syntax :
``` SQL
<trim function> ::= TRIM <left paren> <trim operands> <right paren>
<trim operands> ::= [ [ <trim specification> ] [ <trim character set> ] FROM ] <trim source>
<trim source> ::= <character value expression>
<trim specification> ::=
LEADING
| TRAILING
| BOTH
<trim character set> ::= <characters value expression>
```
or
``` SQL
LTRIM (source-exp [, trim-exp])
RTRIM (source-exp [, trim-exp])
```
Here are the documentation link of support of this feature by other mainstream databases.
- **Oracle:** [TRIM function](http://docs.oracle.com/cd/B28359_01/olap.111/b28126/dml_functions_2126.htm#OLADM704)
- **DB2:** [TRIM scalar function](https://www.ibm.com/support/knowledgecenter/en/SSMKHH_10.0.0/com.ibm.etools.mft.doc/ak05270_.htm)
- **MySQL:** [Trim function](http://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_trim)
- **Oracle:** [ltrim](https://docs.oracle.com/cd/B28359_01/olap.111/b28126/dml_functions_2018.htm#OLADM594)
- **DB2:** [ltrim](https://www.ibm.com/support/knowledgecenter/en/SSEPEK_11.0.0/sqlref/src/tpc/db2z_bif_ltrim.html)
This PR is to implement the above enhancement. In the implementation, the design principle is to keep the changes to the minimum. Also, the exiting trim functions (which handles a special case, i.e., trimming space characters) are kept unchanged for performane reasons.
#### How was this patch tested?
The unit test cases are added in the following files:
- UTF8StringSuite.java
- StringExpressionsSuite.scala
- sql/SQLQuerySuite.scala
- StringFunctionsSuite.scala
Author: Kevin Yu <qyu@us.ibm.com>
Closes#12646 from kevinyu98/spark-14878.
## What changes were proposed in this pull request?
The UDF needs to deserialize the `UnsafeRow`. When the column type is Array, the `get` method from the `ColumnVector`, which is used by the vectorized reader, is called, but this method is not implemented.
## How was this patch tested?
(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Feng Liu <fengliu@databricks.com>
Closes#19230 from liufengdb/fix_array_open.
## What changes were proposed in this pull request?
As reported in https://issues.apache.org/jira/browse/SPARK-22047 , HiveExternalCatalogVersionsSuite is failing frequently, let's disable this test suite to unblock other PRs, I'm looking into the root cause.
## How was this patch tested?
N/A
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19264 from cloud-fan/test.
Profiling some of our big jobs, we see that around 30% of the time is being spent in reading the spill files from disk. In order to amortize the disk IO cost, the idea is to implement a read ahead input stream which asynchronously reads ahead from the underlying input stream when specified amount of data has been read from the current buffer. It does it by maintaining two buffer - active buffer and read ahead buffer. The active buffer contains data which should be returned when a read() call is issued. The read-ahead buffer is used to asynchronously read from the underlying input stream and once the active buffer is exhausted, we flip the two buffers so that we can start reading from the read ahead buffer without being blocked in disk I/O.
## How was this patch tested?
Tested by running a job on the cluster and could see up to 8% CPU improvement.
Author: Sital Kedia <skedia@fb.com>
Author: Shixiong Zhu <zsxwing@gmail.com>
Author: Sital Kedia <sitalkedia@users.noreply.github.com>
Closes#18317 from sitalkedia/read_ahead_buffer.
## What changes were proposed in this pull request?
This PR proposes to improve error message from:
```
>>> sc.show_profiles()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File ".../spark/python/pyspark/context.py", line 1000, in show_profiles
self.profiler_collector.show_profiles()
AttributeError: 'NoneType' object has no attribute 'show_profiles'
>>> sc.dump_profiles("/tmp/abc")
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File ".../spark/python/pyspark/context.py", line 1005, in dump_profiles
self.profiler_collector.dump_profiles(path)
AttributeError: 'NoneType' object has no attribute 'dump_profiles'
```
to
```
>>> sc.show_profiles()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File ".../spark/python/pyspark/context.py", line 1003, in show_profiles
raise RuntimeError("'spark.python.profile' configuration must be set "
RuntimeError: 'spark.python.profile' configuration must be set to 'true' to enable Python profile.
>>> sc.dump_profiles("/tmp/abc")
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File ".../spark/python/pyspark/context.py", line 1012, in dump_profiles
raise RuntimeError("'spark.python.profile' configuration must be set "
RuntimeError: 'spark.python.profile' configuration must be set to 'true' to enable Python profile.
```
## How was this patch tested?
Unit tests added in `python/pyspark/tests.py` and manual tests.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#19260 from HyukjinKwon/profile-errors.
As written now, there must be both memory and disk bytes spilled to show either of them. If there is only one of those types of spill recorded, it will be hidden.
Author: Andrew Ash <andrew@andrewash.com>
Closes#19164 from ash211/patch-3.
## What changes were proposed in this pull request?
(edited)
Fixes a bug introduced in #16121
In PairDeserializer convert each batch of keys and values to lists (if they do not have `__len__` already) so that we can check that they are the same size. Normally they already are lists so this should not have a performance impact, but this is needed when repeated `zip`'s are done.
## How was this patch tested?
Additional unit test
Author: Andrew Ray <ray.andrew@gmail.com>
Closes#19226 from aray/SPARK-21985.
## What changes were proposed in this pull request?
StructType.fromInternal is calling f.fromInternal(v) for every field.
We can use precalculated information about type to limit the number of function calls. (its calculated once per StructType and used in per record calculations)
Benchmarks (Python profiler)
```
df = spark.range(10000000).selectExpr("id as id0", "id as id1", "id as id2", "id as id3", "id as id4", "id as id5", "id as id6", "id as id7", "id as id8", "id as id9", "struct(id) as s").cache()
df.count()
df.rdd.map(lambda x: x).count()
```
Before
```
310274584 function calls (300272456 primitive calls) in 1320.684 seconds
Ordered by: internal time, cumulative time
ncalls tottime percall cumtime percall filename:lineno(function)
10000000 253.417 0.000 486.991 0.000 types.py:619(<listcomp>)
30000000 192.272 0.000 1009.986 0.000 types.py:612(fromInternal)
100000000 176.140 0.000 176.140 0.000 types.py:88(fromInternal)
20000000 156.832 0.000 328.093 0.000 types.py:1471(_create_row)
14000 107.206 0.008 1237.917 0.088 {built-in method loads}
20000000 80.176 0.000 1090.162 0.000 types.py:1468(<lambda>)
```
After
```
210274584 function calls (200272456 primitive calls) in 1035.974 seconds
Ordered by: internal time, cumulative time
ncalls tottime percall cumtime percall filename:lineno(function)
30000000 215.845 0.000 698.748 0.000 types.py:612(fromInternal)
20000000 165.042 0.000 351.572 0.000 types.py:1471(_create_row)
14000 116.834 0.008 946.791 0.068 {built-in method loads}
20000000 87.326 0.000 786.073 0.000 types.py:1468(<lambda>)
20000000 85.477 0.000 134.607 0.000 types.py:1519(__new__)
10000000 65.777 0.000 126.712 0.000 types.py:619(<listcomp>)
```
Main difference is types.py:619(<listcomp>) and types.py:88(fromInternal) (which is removed in After)
The number of function calls is 100 million less. And performance is 20% better.
Benchmark (worst case scenario.)
Test
```
df = spark.range(1000000).selectExpr("current_timestamp as id0", "current_timestamp as id1", "current_timestamp as id2", "current_timestamp as id3", "current_timestamp as id4", "current_timestamp as id5", "current_timestamp as id6", "current_timestamp as id7", "current_timestamp as id8", "current_timestamp as id9").cache()
df.count()
df.rdd.map(lambda x: x).count()
```
Before
```
31166064 function calls (31163984 primitive calls) in 150.882 seconds
```
After
```
31166064 function calls (31163984 primitive calls) in 153.220 seconds
```
IMPORTANT:
The benchmark was done on top of https://github.com/apache/spark/pull/19246.
Without https://github.com/apache/spark/pull/19246 the performance improvement will be even greater.
## How was this patch tested?
Existing tests.
Performance benchmark.
Author: Maciej Bryński <maciek-github@brynski.pl>
Closes#19249 from maver1ck/spark_22032.
## What changes were proposed in this pull request?
* Using 64 bit unsigned long comparison instead of unsigned int comparison in `org.apache.spark.unsafe.types.UTF8String#compareTo` for better performance.
* Making `IS_LITTLE_ENDIAN` a constant for correctness reasons (shouldn't use a non-constant in `compareTo` implementations and it def. is a constant per JVM)
## How was this patch tested?
Build passes and the functionality is widely covered by existing tests as far as I can see.
Author: Armin <me@obrown.io>
Closes#19180 from original-brownbear/SPARK-21967.
## What changes were proposed in this pull request?
Take the minimum of all watermark exec nodes as the "real" watermark in StreamExecution, rather than picking one arbitrarily.
## How was this patch tested?
new unit test
Author: Jose Torres <jose@databricks.com>
Closes#19239 from joseph-torres/SPARK-22017.
## What changes were proposed in this pull request?
This PR adds the infrastructure for data source v2, and implement features which Spark already have in data source v1, i.e. column pruning, filter push down, catalyst expression filter push down, InternalRow scan, schema inference, data size report. The write path is excluded to avoid making this PR growing too big, and will be added in follow-up PR.
## How was this patch tested?
new tests
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19136 from cloud-fan/data-source-v2.
## What changes were proposed in this pull request?
Change a data transformation while saving a Word2VecModel to happen with distributed data instead of local driver data.
## How was this patch tested?
Unit tests for the ML sub-component still pass.
Running this patch against v2.2.0 in a fully distributed production cluster allows a 4.0G model to save and load correctly, where it would not do so without the patch.
Author: Travis Hegner <thegner@trilliumit.com>
Closes#19191 from travishegner/master.
## What changes were proposed in this pull request?
In https://github.com/apache/spark/pull/18600 we removed the `metadata` field from `SparkPlanInfo`. This causes a problem when we replay event logs that are generated by older Spark versions.
## How was this patch tested?
a regression test.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19237 from cloud-fan/event.
## What changes were proposed in this pull request?
https://github.com/apache/spark/pull/18266 add a new feature to support read JDBC table use custom schema, but we must specify all the fields. For simplicity, this PR support specify partial fields.
## How was this patch tested?
unit tests
Author: Yuming Wang <wgyumg@gmail.com>
Closes#19231 from wangyum/SPARK-22002.
## What changes were proposed in this pull request?
As logging below, actually exception will be hidden when removeBlockInternal throw an exception.
`2017-08-31,10:26:57,733 WARN org.apache.spark.storage.BlockManager: Putting block broadcast_110 failed due to an exception
2017-08-31,10:26:57,734 WARN org.apache.spark.broadcast.BroadcastManager: Failed to create a new broadcast in 1 attempts
java.io.IOException: Failed to create local dir in /tmp/blockmgr-5bb5ac1e-c494-434a-ab89-bd1808c6b9ed/2e.
at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:70)
at org.apache.spark.storage.DiskStore.remove(DiskStore.scala:115)
at org.apache.spark.storage.BlockManager.removeBlockInternal(BlockManager.scala:1339)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:910)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948)
at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:726)
at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:1233)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:122)
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at org.apache.spark.broadcast.BroadcastManager$$anonfun$newBroadcast$1.apply$mcVI$sp(BroadcastManager.scala:60)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:58)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1415)
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1002)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:924)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$submitWaitingChildStages$6.apply(DAGScheduler.scala:771)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$submitWaitingChildStages$6.apply(DAGScheduler.scala:770)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at org.apache.spark.scheduler.DAGScheduler.submitWaitingChildStages(DAGScheduler.scala:770)
at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1235)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1662)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1620)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1609)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)`
In this pr i will print exception first make troubleshooting more conveniently.
PS:
This one split from [PR-19133](https://github.com/apache/spark/pull/19133)
## How was this patch tested?
Exsist unit test
Author: zhoukang <zhoukang199191@gmail.com>
Closes#19171 from caneGuy/zhoukang/print-rootcause.
## What changes were proposed in this pull request?
If there are two projects like as follows.
```
Project [a_with_metadata#27 AS b#26]
+- Project [a#0 AS a_with_metadata#27]
+- LocalRelation <empty>, [a#0, b#1]
```
Child Project has an output column with a metadata in it, and the parent Project has an alias that implicitly forwards the metadata. So this metadata is visible for higher operators. Upon applying CollapseProject optimizer rule, the metadata is not preserved.
```
Project [a#0 AS b#26]
+- LocalRelation <empty>, [a#0, b#1]
```
This is incorrect, as downstream operators that expect certain metadata (e.g. watermark in structured streaming) to identify certain fields will fail to do so. This PR fixes it by preserving the metadata of top-level aliases.
## How was this patch tested?
New unit test
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#19240 from tdas/SPARK-22018.
## What changes were proposed in this pull request?
In previous work SPARK-21513, we has allowed `MapType` and `ArrayType` of `MapType`s convert to a json string but only for Scala API. In this follow-up PR, we will make SparkSQL support it for PySpark and SparkR, too. We also fix some little bugs and comments of the previous work in this follow-up PR.
### For PySpark
```
>>> data = [(1, {"name": "Alice"})]
>>> df = spark.createDataFrame(data, ("key", "value"))
>>> df.select(to_json(df.value).alias("json")).collect()
[Row(json=u'{"name":"Alice")']
>>> data = [(1, [{"name": "Alice"}, {"name": "Bob"}])]
>>> df = spark.createDataFrame(data, ("key", "value"))
>>> df.select(to_json(df.value).alias("json")).collect()
[Row(json=u'[{"name":"Alice"},{"name":"Bob"}]')]
```
### For SparkR
```
# Converts a map into a JSON object
df2 <- sql("SELECT map('name', 'Bob')) as people")
df2 <- mutate(df2, people_json = to_json(df2$people))
# Converts an array of maps into a JSON array
df2 <- sql("SELECT array(map('name', 'Bob'), map('name', 'Alice')) as people")
df2 <- mutate(df2, people_json = to_json(df2$people))
```
## How was this patch tested?
Add unit test cases.
cc viirya HyukjinKwon
Author: goldmedal <liugs963@gmail.com>
Closes#19223 from goldmedal/SPARK-21513-fp-PySaprkAndSparkR.
## What changes were proposed in this pull request?
Add default stats to StreamingExecutionRelation.
## How was this patch tested?
existing unit tests and an explain() test to be sure
Author: Jose Torres <jose@databricks.com>
Closes#19212 from joseph-torres/SPARK-21988.
## What changes were proposed in this pull request?
Drop test tables and improve comments.
## How was this patch tested?
Modified existing test.
Author: Zhenhua Wang <wangzhenhua@huawei.com>
Closes#19213 from wzhfy/useless_comment.