## What changes were proposed in this pull request?
The `percentile_approx` function previously accepted numeric-type input and produced double-type results.
But since all numeric types, as well as date and timestamp types, are represented as numeric values internally, `percentile_approx` can support them easily.
After this PR, it supports date, timestamp and numeric types as input. The result type is also changed to match the input type, which is more reasonable for percentiles.
This change is also required when we generate equi-height histograms for these types.
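For illustration, a hypothetical session (the table and column names below are assumptions, not from the patch):
```scala
// After this change, percentile_approx over a timestamp column yields
// a timestamp, not a double (hypothetical table/column names).
val median = spark.sql(
  "SELECT percentile_approx(event_time, 0.5) AS median_time FROM events")
median.printSchema()  // median_time is now of timestamp type
```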
## How was this patch tested?
Added a new test and modified some existing tests.
Author: Zhenhua Wang <wangzhenhua@huawei.com>
Closes#19321 from wzhfy/approx_percentile_support_types.
## What changes were proposed in this pull request?
Updated docs so that a line of Python in the quick start guide executes. Closes#19283
## How was this patch tested?
Existing tests.
Author: John O'Leary <jgoleary@gmail.com>
Closes#19326 from jgoleary/issues/22107.
## What changes were proposed in this pull request?
Enable Scala 2.12 REPL. Fix most remaining issues with 2.12 compilation and warnings, including:
- Selecting Kafka 0.10.1+ for Scala 2.12 and patching over a minor API difference
- Fixing lots of "eta expansion of zero arg method deprecated" warnings
- Resolving the SparkContext.sequenceFile implicits compile problem
- Fixing an odd but valid jetty-server missing dependency in hive-thriftserver
## How was this patch tested?
Existing tests
Author: Sean Owen <sowen@cloudera.com>
Closes#19307 from srowen/Scala212.
## What changes were proposed in this pull request?
`EventLoggingListener` uses `val in = new BufferedInputStream(fs.open(log))` and closes it if `codec.map(_.compressedInputStream(in)).getOrElse(in)` throws an exception.
But if `CompressionCodec.createCodec(new SparkConf, c)` throws an exception, the BufferedInputStream `in` is never closed.
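A minimal sketch of the shape of the fix, with the stream-opening and codec-wrapping steps abstracted as function parameters (the real code lives in `EventLoggingListener`):
```scala
import java.io.{BufferedInputStream, InputStream}

// Close `in` if anything in the wrapping step throws, including
// codec creation itself (the previously leaked case).
def openLogStream(open: () => InputStream,
                  wrap: InputStream => InputStream): InputStream = {
  val in = new BufferedInputStream(open())
  try {
    wrap(in)
  } catch {
    case e: Throwable =>
      in.close()
      throw e
  }
}
```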
## How was this patch tested?
Existing tests.
Author: zuotingbing <zuo.tingbing9@zte.com.cn>
Closes#19277 from zuotingbing/SPARK-22058.
## What changes were proposed in this pull request?
This PR proposes to remove `assume` in `Utils.resolveURIs` and replace `assume` with `assert` in the `Utils.resolveURI` test cases in `UtilsSuite`.
`Utils.resolveURIs` supports single paths as well as multiple ones as input, so checking whether the input contains `,` is not meaningful.
For the `Utils.resolveURI` test, I replaced it with `assert` because it takes a single path, and in order to prevent future mistakes when more tests are added here.
For the `assume` in `HiveDDLSuite`, it should be `assert`, as it is the final check in the test.
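For reference, a small sketch of the distinction in ScalaTest (the test body below is made up):
```scala
import org.scalatest.FunSuite

class ResolveDemoSuite extends FunSuite {
  test("single-path resolution") {
    val uri = java.nio.file.Paths.get("a b").toUri.toString
    assume(uri.nonEmpty)          // false here would CANCEL the test
    assert(uri.contains("a%20b")) // false here FAILS the test
  }
}
```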
## How was this patch tested?
Fixed unit tests.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#19332 from HyukjinKwon/SPARK-22093.
## What changes were proposed in this pull request?
The implemented `isCascadingTruncateTable` in `AggregatedDialect` is wrong: when no dialect claims cascading, but at least one dialect reports unknown cascading truncate, we should return unknown cascading instead of false.
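A sketch of the corrected semantics, expressed over the per-dialect answers (the real method lives on `AggregatedDialect`): any `Some(true)` wins; otherwise any `None` makes the result unknown; only all-`Some(false)` yields `Some(false)`.
```scala
// answers: one Option[Boolean] per wrapped dialect (assumed non-empty).
def isCascadingTruncateTable(answers: Seq[Option[Boolean]]): Option[Boolean] =
  answers.reduce { (a, b) =>
    (a, b) match {
      case (Some(true), _) | (_, Some(true)) => Some(true)  // cascading claimed
      case (None, _) | (_, None)             => None        // unknown dominates false
      case _                                 => Some(false) // all say no cascading
    }
  }
```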
## How was this patch tested?
Added test.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#19286 from viirya/SPARK-21338-followup.
## What changes were proposed in this pull request?
This PR proposes to enhance the documentation for the `trim` functions in the function description section.
- Add more `usage`, `arguments` and `examples` for the trim functions
- Adjust spacing in the `usage` section
After the changes, the trim function documentation will look like this:
- `trim`
```
trim(str) - Removes the leading and trailing space characters from str.
trim(BOTH trimStr FROM str) - Remove the leading and trailing trimStr characters from str
trim(LEADING trimStr FROM str) - Remove the leading trimStr characters from str
trim(TRAILING trimStr FROM str) - Remove the trailing trimStr characters from str
Arguments:
str - a string expression
trimStr - the trim string characters to trim, the default value is a single space
BOTH, FROM - these are keywords to specify trimming string characters from both ends of the string
LEADING, FROM - these are keywords to specify trimming string characters from the left end of the string
TRAILING, FROM - these are keywords to specify trimming string characters from the right end of the string
Examples:
> SELECT trim(' SparkSQL ');
SparkSQL
> SELECT trim('SL', 'SSparkSQLS');
parkSQ
> SELECT trim(BOTH 'SL' FROM 'SSparkSQLS');
parkSQ
> SELECT trim(LEADING 'SL' FROM 'SSparkSQLS');
parkSQLS
> SELECT trim(TRAILING 'SL' FROM 'SSparkSQLS');
SSparkSQ
```
- `ltrim`
```
ltrim(str) - Removes the leading space characters from str.
ltrim(trimStr, str) - Removes the leading trimStr characters from str
Arguments:
str - a string expression
trimStr - the trim string characters to trim, the default value is a single space
Examples:
> SELECT ltrim(' SparkSQL ');
SparkSQL
> SELECT ltrim('Sp', 'SSparkSQLS');
arkSQLS
```
- `rtrim`
```
rtrim(str) - Removes the trailing space characters from str.
rtrim(trimStr, str) - Removes the trailing trimStr characters from str
Arguments:
str - a string expression
trimStr - the trim string characters to trim, the default value is a single space
Examples:
> SELECT rtrim(' SparkSQL ');
SparkSQL
> SELECT rtrim('LQSa', 'SSparkSQLS');
SSpark
```
This is the JIRA for the trim-characters functions: [SPARK-14878](https://issues.apache.org/jira/browse/SPARK-14878)
## How was this patch tested?
Manually tested
```
spark-sql> describe function extended trim;
17/09/22 17:03:04 INFO CodeGenerator: Code generated in 153.026533 ms
Function: trim
Class: org.apache.spark.sql.catalyst.expressions.StringTrim
Usage:
trim(str) - Removes the leading and trailing space characters from `str`.
trim(BOTH trimStr FROM str) - Remove the leading and trailing `trimStr` characters from `str`
trim(LEADING trimStr FROM str) - Remove the leading `trimStr` characters from `str`
trim(TRAILING trimStr FROM str) - Remove the trailing `trimStr` characters from `str`
Extended Usage:
Arguments:
* str - a string expression
* trimStr - the trim string characters to trim, the default value is a single space
* BOTH, FROM - these are keywords to specify trimming string characters from both ends of
the string
* LEADING, FROM - these are keywords to specify trimming string characters from the left
end of the string
* TRAILING, FROM - these are keywords to specify trimming string characters from the right
end of the string
Examples:
> SELECT trim(' SparkSQL ');
SparkSQL
> SELECT trim('SL', 'SSparkSQLS');
parkSQ
> SELECT trim(BOTH 'SL' FROM 'SSparkSQLS');
parkSQ
> SELECT trim(LEADING 'SL' FROM 'SSparkSQLS');
parkSQLS
> SELECT trim(TRAILING 'SL' FROM 'SSparkSQLS');
SSparkSQ
```
```
spark-sql> describe function extended ltrim;
Function: ltrim
Class: org.apache.spark.sql.catalyst.expressions.StringTrimLeft
Usage:
ltrim(str) - Removes the leading space characters from `str`.
ltrim(trimStr, str) - Removes the leading `trimStr` characters from `str`
Extended Usage:
Arguments:
* str - a string expression
* trimStr - the trim string characters to trim, the default value is a single space
Examples:
> SELECT ltrim(' SparkSQL ');
SparkSQL
> SELECT ltrim('Sp', 'SSparkSQLS');
arkSQLS
```
```
spark-sql> describe function extended rtrim;
Function: rtrim
Class: org.apache.spark.sql.catalyst.expressions.StringTrimRight
Usage:
rtrim(str) - Removes the trailing space characters from `str`.
rtrim(trimStr, str) - Removes the trailing `trimStr` characters from `str`
Extended Usage:
Arguments:
* str - a string expression
* trimStr - the trim string characters to trim, the default value is a single space
Examples:
> SELECT rtrim(' SparkSQL ');
SparkSQL
> SELECT rtrim('LQSa', 'SSparkSQLS');
SSpark
```
Author: Kevin Yu <qyu@us.ibm.com>
Closes#19329 from kevinyu98/spark-14878-5.
## What changes were proposed in this pull request?
The documentation on file DStreams is enhanced to
1. Detail the exact timestamp logic for examining directories and files.
2. Detail how object stores differ from filesystems, and so how using them as a source of data should be treated with caution, possibly publishing data to the store differently (direct PUTs as opposed to stage + rename).
## How was this patch tested?
n/a
Author: Steve Loughran <stevel@hortonworks.com>
Closes#17743 from steveloughran/cloud/SPARK-20448-document-dstream-blobstore.
## What changes were proposed in this pull request?
This PR proposes to resolve the type conflicts in strings and timestamps in partition column values.
It looks like we need to set the timezone, as this requires a cast between strings and timestamps.
```scala
val df = Seq((1, "2015-01-01 00:00:00"), (2, "2014-01-01 00:00:00"), (3, "blah")).toDF("i", "str")
val path = "/tmp/test.parquet"
df.write.format("parquet").partitionBy("str").save(path)
spark.read.parquet(path).show()
```
**Before**
```
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:347)
at scala.None$.get(Option.scala:345)
at org.apache.spark.sql.catalyst.expressions.TimeZoneAwareExpression$class.timeZone(datetimeExpressions.scala:46)
at org.apache.spark.sql.catalyst.expressions.Cast.timeZone$lzycompute(Cast.scala:172)
at org.apache.spark.sql.catalyst.expressions.Cast.timeZone(Cast.scala:172)
at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToString$3$$anonfun$apply$16.apply(Cast.scala:208)
at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToString$3$$anonfun$apply$16.apply(Cast.scala:208)
at org.apache.spark.sql.catalyst.expressions.Cast.org$apache$spark$sql$catalyst$expressions$Cast$$buildCast(Cast.scala:201)
at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToString$3.apply(Cast.scala:207)
at org.apache.spark.sql.catalyst.expressions.Cast.nullSafeEval(Cast.scala:533)
at org.apache.spark.sql.catalyst.expressions.UnaryExpression.eval(Expression.scala:331)
at org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$org$apache$spark$sql$execution$datasources$PartitioningUtils$$resolveTypeConflicts$1.apply(PartitioningUtils.scala:481)
at org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$org$apache$spark$sql$execution$datasources$PartitioningUtils$$resolveTypeConflicts$1.apply(PartitioningUtils.scala:480)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
```
**After**
```
+---+-------------------+
| i| str|
+---+-------------------+
| 2|2014-01-01 00:00:00|
| 1|2015-01-01 00:00:00|
| 3| blah|
+---+-------------------+
```
## How was this patch tested?
Unit tests added in `ParquetPartitionDiscoverySuite` and manual tests.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#19331 from HyukjinKwon/SPARK-22109.
## What changes were proposed in this pull request?
In several places, avoid allocating an array bigger than Integer.MAX_VALUE - 8, which is the actual maximum size on some JVMs.
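A sketch of the kind of guard involved (the constant and helper below are illustrative, not Spark's exact code):
```scala
// Some JVMs reserve a few object-header words, so allocating a full
// Int.MaxValue-element array can throw
// "OutOfMemoryError: Requested array size exceeds VM limit".
val MaxArraySize: Int = Int.MaxValue - 8

def allocateBytes(requested: Long): Array[Byte] = {
  require(requested <= MaxArraySize,
    s"requested $requested bytes exceeds the safe max array size")
  new Array[Byte](requested.toInt)
}
```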
## How was this patch tested?
Existing tests
Author: Sean Owen <sowen@cloudera.com>
Closes#19266 from srowen/SPARK-22033.
## What changes were proposed in this pull request?
Fixes the setup of `SPARK_JARS_DIR` on Windows: it looked for the `%SPARK_HOME%\RELEASE` file instead of the `%SPARK_HOME%\jars` directory as it should; the RELEASE file is not included in the `pip` build of PySpark.
## How was this patch tested?
Local install of PySpark on Anaconda 4.4.0 (Python 3.6.1).
Author: Jakub Nowacki <j.s.nowacki@gmail.com>
Closes#19310 from jsnowacki/master.
## What changes were proposed in this pull request?
Currently the param persist/loading of CrossValidator/TrainValidationSplit is hardcoded, which differs from other ML estimators. This causes a persistence bug for the newly added `parallelism` param.
I refactored the related code to avoid hardcoding param persist/load, which at the same time fixes the `parallelism` persistence bug.
This refactoring is very useful because we will add more new params in #19208; hardcoded param persisting/loading makes adding new params very troublesome.
## How was this patch tested?
Test added.
Author: WeichenXu <weichen.xu@databricks.com>
Closes#19278 from WeichenXu123/fix-tuning-param-bug.
## What changes were proposed in this pull request?
When calling `DataFrame.toPandas()` (without Arrow enabled), if there is an `IntegralType` column (`IntegerType`, `ShortType`, `ByteType`) that has null values, the following exception is thrown:
`ValueError: Cannot convert non-finite values (NA or inf) to integer`
This is because the null values are first converted to float NaN during the construction of the Pandas DataFrame in `from_records`, and the conversion back to integer then fails.
The fix checks whether the Pandas DataFrame conversion would fail in this way; if so, we skip the conversion and use the type inferred by Pandas.
Closes#18945
## How was this patch tested?
Added pyspark test.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#19319 from viirya/SPARK-21766.
## What changes were proposed in this pull request?
`OffHeapColumnVector.reserveInternal()` will only copy already-inserted values during reallocation if `data != null`. For vectors containing arrays or structs this is incorrect, since the field `data` is not used at all there. We need to check `nulls` instead.
## How was this patch tested?
Adds new tests to `ColumnVectorSuite` that reproduce the errors.
Author: Ala Luszczak <ala@databricks.com>
Closes#19308 from ala/vector-realloc.
This PR adds vectorized UDFs to the Python API
**Proposed API**
Introduce a flag to turn on vectorization for a defined UDF, for example:
```
@pandas_udf(DoubleType())
def plus(a, b):
    return a + b
```
or
```
plus = pandas_udf(lambda a, b: a + b, DoubleType())
```
Usage is the same as normal UDFs
**0-parameter UDFs**
pandas_udf functions can declare an optional `**kwargs` and when evaluated, will contain a key "size" that will give the required length of the output. For example:
```
@pandas_udf(LongType())
def f0(**kwargs):
    return pd.Series(1).repeat(kwargs["size"])
df.select(f0())
```
Added new unit tests in pyspark.sql that are enabled if pyarrow and Pandas are available.
- [x] Fix support for promoted types with null values
- [ ] Discuss 0-param UDF API (use of kwargs)
- [x] Add tests for chained UDFs
- [ ] Discuss behavior when pyarrow not installed / enabled
- [ ] Cleanup pydoc and add user docs
Author: Bryan Cutler <cutlerb@gmail.com>
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#18659 from BryanCutler/arrow-vectorized-udfs-SPARK-21404.
## What changes were proposed in this pull request?
Check JDK version (with javac) and use SPARK_VERSION for publish-release
## How was this patch tested?
Manually tried local build with wrong JDK / JAVA_HOME & built a local release (LFTP disabled)
Author: Holden Karau <holden@us.ibm.com>
Closes#19312 from holdenk/improve-release-scripts-r2.
## What changes were proposed in this pull request?
Right now the calculation of SortMergeJoinExec's outputOrdering relies on the fact that its children have already been sorted on the join keys, while this is often not true until EnsureRequirements has been applied. So we end up not getting the correct outputOrdering during the physical planning stage, before Sort nodes are added to the children.
For example, J = {A join B on key1 = key2}
1. if A is NOT ordered on key1 ASC, J's outputOrdering should include "key1 ASC"
2. if A is ordered on key1 ASC, J's outputOrdering should include "key1 ASC"
3. if A is ordered on key1 ASC, with sameOrderExp=c1, J's outputOrdering should include "key1 ASC, sameOrderExp=c1"
So to fix this I changed the behavior of `getKeyOrdering(keys, childOutputOrdering)` to:
1. If the childOutputOrdering satisfies (is a superset of) the required child ordering => childOutputOrdering
2. Otherwise => required child ordering
In addition, I organized the logic for deciding the relationship between two orderings into SparkPlan, so that it can be reused by EnsureRequirements and SortMergeJoinExec, and potentially other classes.
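A simplified sketch of the revised `getKeyOrdering` decision (string stand-ins for `SortOrder`; the real code compares ordering semantics rather than raw equality):
```scala
def getKeyOrdering(requiredOrdering: Seq[String],
                   childOutputOrdering: Seq[String]): Seq[String] = {
  // "satisfies" here: the required ordering is a prefix of the child's.
  val satisfied = requiredOrdering.size <= childOutputOrdering.size &&
    requiredOrdering.zip(childOutputOrdering).forall { case (r, c) => r == c }
  if (satisfied) childOutputOrdering // keep the richer child ordering (case 3 above)
  else requiredOrdering              // fall back to the required ordering (case 1)
}
```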
## How was this patch tested?
Added new test cases.
Passed all integration tests.
Author: maryannxue <maryann.xue@gmail.com>
Closes#19281 from maryannxue/spark-21998.
## What changes were proposed in this pull request?
Added Python interface for ClusteringEvaluator
## How was this patch tested?
Manual test, e.g. the example Python code in the comments.
cc yanboliang
Author: Marco Gaido <mgaido@hortonworks.com>
Author: Marco Gaido <marcogaido91@gmail.com>
Closes#19204 from mgaido91/SPARK-21981.
## What changes were proposed in this pull request?
`processAllAvailable` should also check the query state and if the query is stopped, it should return.
## How was this patch tested?
The new unit test.
Author: Shixiong Zhu <zsxwing@gmail.com>
Closes#19314 from zsxwing/SPARK-22094.
## What changes were proposed in this pull request?
#### Architecture
This PR implements stream-stream inner join using a two-way symmetric hash join. At a high level, we want to do the following.
1. For each stream, we maintain the past rows as state in State Store.
- For each joining key, there can be multiple rows that have been received.
- So, we have to effectively maintain a key-to-list-of-values multimap as state for each stream.
2. In each batch, for each input row in each stream
- Look up the other stream's state to see if there are matching rows, and output them if they satisfy the join condition
- Add the input row to the corresponding stream's state (see the multimap sketch after this list).
- If the data has a timestamp/window column with a watermark, we will use it to calculate the threshold for keys that are required to be buffered for future matches, and drop the rest from the state.
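The key-to-list-of-values multimap can be illustrated over a flat key-value store by indexing values per key (a toy sketch; the real manager is built on the StateStore APIs):
```scala
import scala.collection.mutable

// (joinKey, index) -> row, plus joinKey -> current number of rows.
val keyWithIndexToValue = mutable.Map[(String, Int), String]()
val keyToNumValues      = mutable.Map[String, Int]()

def append(key: String, row: String): Unit = {
  val i = keyToNumValues.getOrElse(key, 0)
  keyWithIndexToValue((key, i)) = row
  keyToNumValues(key) = i + 1
}

def lookup(key: String): Seq[String] =
  (0 until keyToNumValues.getOrElse(key, 0)).map(i => keyWithIndexToValue((key, i)))
```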
Cleaning up old unnecessary state rows depends completely on whether a watermark has been defined and what the join conditions are. We definitely want to support state cleanup for two types of queries that are likely to be common.
- Queries with time range conditions - E.g. `SELECT * FROM leftTable, rightTable ON leftKey = rightKey AND leftTime > rightTime - INTERVAL 8 MINUTES AND leftTime < rightTime + INTERVAL 1 HOUR`
- Queries with windows as the matching key - E.g. `SELECT * FROM leftTable, rightTable ON leftKey = rightKey AND window(leftTime, "1 hour") = window(rightTime, "1 hour")` (pseudo-SQL)
#### Implementation
The stream-stream join is primarily implemented in three classes
- `StreamingSymmetricHashJoinExec` implements the above symmetric join algorithm.
- `SymmetricsHashJoinStateManagers` manages the streaming state for the join. This essentially is a fault-tolerant key-to-list-of-values multimap built on the StateStore APIs. `StreamingSymmetricHashJoinExec` instantiates two such managers, one for each join side.
- `StreamingSymmetricHashJoinExecHelper` is a helper class to extract threshold for the state based on the join conditions and the event watermark.
Refer to the class scaladocs for more implementation details.
Besides the implementation of the stream-stream inner join SparkPlan, some additional changes are
- Allowed inner join in append mode in UnsupportedOperationChecker
- Prevented stream-stream join on an empty batch dataframe from being collapsed by the optimizer
## How was this patch tested?
- New tests in StreamingJoinSuite
- Updated tests in UnsupportedOperationSuite
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#19271 from tdas/SPARK-22053.
## What changes were proposed in this pull request?
I tested on a dataset of about 13M instances, and found that using `treeAggregate` gives a speedup in the following algorithms (a sketch of the swap follows the table):
|Algs| SpeedUp |
|------|-----------|
|OneHotEncoder| 5% |
|StatFunctions.calculateCov| 7% |
|StatFunctions.multipleApproxQuantiles| 9% |
|RegressionEvaluator| 8% |
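A minimal sketch of the change behind these numbers, runnable in spark-shell: `treeAggregate` combines partial results in a multi-level tree instead of pulling every partition's result straight to the driver.
```scala
val rdd = sc.parallelize(1 to 1000000, numSlices = 100)

// Single-level combine: all 100 partial results go to the driver.
val flat = rdd.aggregate(0L)(_ + _, _ + _)

// Tree-shaped combine: partials are merged on executors first.
val tree = rdd.treeAggregate(0L)(_ + _, _ + _, depth = 2)
```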
## How was this patch tested?
existing tests
Author: Zheng RuiFeng <ruifengz@foxmail.com>
Closes#19232 from zhengruifeng/use_treeAggregate.
## What changes were proposed in this pull request?
`PeriodicRDDCheckpointer` automatically persists the last 3 datasets passed to `PeriodicRDDCheckpointer.update()`.
In GBTs, the last 3 intermediate RDDs are therefore still cached after `fit()`.
## How was this patch tested?
existing tests and local test in spark-shell
Author: Zheng RuiFeng <ruifengz@foxmail.com>
Closes#19288 from zhengruifeng/gbt_unpersist.
## What changes were proposed in this pull request?
There is an incorrect `scalastyle:on` comment in `stringExpressions.scala`, which renders the line-length check ineffective in that file. Many lines of code and comments exceed 100 characters.
## How was this patch tested?
Code style change only.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#19305 from viirya/fix-wrong-style.
## What changes were proposed in this pull request?
We have to make sure that SerializerManager's private instance of Kryo also uses the right classloader, regardless of the current thread's classloader. In particular, this fixes serde during remote cache fetches, as those occur in netty threads.
## How was this patch tested?
Manual tests & existing suite via jenkins. I haven't been able to reproduce this in a unit test, because when a remote RDD partition cannot be fetched, there is a warning message and then the partition is just recomputed locally. I manually verified the warning message is no longer present.
Author: Imran Rashid <irashid@cloudera.com>
Closes#19280 from squito/SPARK-21928_ser_classloader.
## What changes were proposed in this pull request?
Adjust EnsureStatefulOpPartitioningSuite to use scalatest lifecycle normally instead of constructor; fixes:
```
*** RUN ABORTED ***
org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:
org.apache.spark.sql.streaming.EnsureStatefulOpPartitioningSuite.<init>(EnsureStatefulOpPartitioningSuite.scala:35)
```
## How was this patch tested?
Existing tests
Author: Sean Owen <sowen@cloudera.com>
Closes#19306 from srowen/SPARK-21977.2.
## What changes were proposed in this pull request?
Among the SQL conditional expressions, only CASE WHEN lacks an expression description. This patch fills the gap.
## How was this patch tested?
Only documentation change.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#19304 from viirya/casewhen-doc.
## What changes were proposed in this pull request?
This work is a part of [SPARK-17074](https://issues.apache.org/jira/browse/SPARK-17074) to compute equi-height histograms. An equi-height histogram is an array of bins; a bin consists of two endpoints, which form an interval of values, and the number of distinct values (ndv) in that interval.
This PR creates a new aggregate function, given an array of endpoints, counting distinct values (ndv) in intervals among those endpoints.
This PR also refactors `HyperLogLogPlusPlus` by extracting a helper class `HyperLogLogPlusPlusHelper`, where the underlying HLL++ algorithm lives.
## How was this patch tested?
Add new test cases.
Author: Zhenhua Wang <wangzhenhua@huawei.com>
Closes#15544 from wzhfy/countIntervals.
## What changes were proposed in this pull request?
This PR makes `sample(...)` able to omit `withReplacement`, defaulting to `FALSE`.
In short, the following examples are allowed:
```r
> df <- createDataFrame(as.list(seq(10)))
> count(sample(df, fraction=0.5, seed=3))
[1] 4
> count(sample(df, fraction=1.0))
[1] 10
```
In addition, this PR also adds some type checking logic, as below:
```r
> sample(df, fraction = "a")
Error in sample(df, fraction = "a") :
fraction must be numeric; however, got character
> sample(df, fraction = 1, seed = NULL)
Error in sample(df, fraction = 1, seed = NULL) :
seed must not be NULL or NA; however, got NULL
> sample(df, list(1), 1.0)
Error in sample(df, list(1), 1) :
withReplacement must be logical; however, got list
> sample(df, fraction = -1.0)
...
Error in sample : illegal argument - requirement failed: Sampling fraction (-1.0) must be on interval [0, 1] without replacement
```
## How was this patch tested?
Manually tested, unit tests added in `R/pkg/tests/fulltests/test_sparkSQL.R`.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#19243 from HyukjinKwon/SPARK-21780.
## What changes were proposed in this pull request?
This is a follow-up of SPARK-9104 to expose the Netty memory usage to the MetricsSystem. Currently the shuffle Netty memory usage of `NettyBlockTransferService` is exposed; when using external shuffle, the Netty memory usage of `ExternalShuffleClient` and `ExternalShuffleService` is exposed instead. I don't currently expose the Netty memory usage of `YarnShuffleService`, because `YarnShuffleService` doesn't have a `MetricsSystem` of its own and is better connected to Hadoop's MetricsSystem.
## How was this patch tested?
Manually verified in local cluster.
Author: jerryshao <sshao@hortonworks.com>
Closes#19160 from jerryshao/SPARK-21934.
## What changes were proposed in this pull request?
This is a follow-up of https://github.com/apache/spark/pull/19289; we missed another place: `rollup`. `Seq.init.toSeq` also returns a `Stream`, so we should fix it too.
## How was this patch tested?
manually
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19298 from cloud-fan/bug.
## What changes were proposed in this pull request?
When the file system of the libraries temp directory (i.e. the `__spark_libs__*.zip` dir) and that of the staging (destination) directory are the same, `__spark_libs__*.zip` is not copied to the staging directory. But right after making this decision, the libraries zip file is deleted and becomes unavailable for the NodeManager's localization.
With this change, the client always copies the files to the remote side when the source scheme is "file".
## How was this patch tested?
I have verified it manually in yarn/cluster and yarn/client modes with hdfs and local file systems.
Author: Devaraj K <devaraj@apache.org>
Closes#19141 from devaraj-kavali/SPARK-21384.
The live listener bus now cleans up after itself and releases listeners
after stopping, so code cannot get references to listeners after the
Spark context is stopped.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#19297 from vanzin/SPARK-18838.hotfix.
## What changes were proposed in this pull request?
Spark with Scala 2.10 fails with a group by cube:
```
spark.range(1).select($"id" as "a", $"id" as "b").write.partitionBy("a").mode("overwrite").saveAsTable("rollup_bug")
spark.sql("select 1 from rollup_bug group by rollup ()").show
```
It can be traced back to https://github.com/apache/spark/pull/15484 , which made `Expand.projections` a lazy `Stream` for group by cube.
In scala 2.10 `Stream` captures a lot of stuff, and in this case it captures the entire query plan which has some un-serializable parts.
This change is also good for master branch, to reduce the serialized size of `Expand.projections`.
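A toy illustration of the hazard and the remedy (not the actual `Expand` code):
```scala
// A Stream defers its tail; in Scala 2.10 the deferred thunks can
// capture large enclosing objects, which then get serialized with any
// closure holding the Stream. Forcing to a strict collection avoids it.
val lazyProjections: Stream[Seq[Int]] = (0 until 3).toStream.map(i => Seq(i, i * 2))
val strictProjections: IndexedSeq[Seq[Int]] = lazyProjections.toIndexedSeq // fully forced
```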
## How was this patch tested?
manually verified with Spark with Scala 2.10.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19289 from cloud-fan/bug.
## What changes were proposed in this pull request?
Clarify behavior of to_utc_timestamp/from_utc_timestamp with an example
## How was this patch tested?
Doc only change / existing tests
Author: Sean Owen <sowen@cloudera.com>
Closes#19276 from srowen/SPARK-22049.
## What changes were proposed in this pull request?
See https://github.com/apache/spark/pull/19282
Revert scala-maven-plugin to 3.2.2 to work with Maven+zinc again
## How was this patch tested?
Reproduced locally with zinc, and confirmed this removes the problem.
Author: Sean Owen <sowen@cloudera.com>
Closes#19292 from srowen/SPARK-22066.2.
## What changes were proposed in this pull request?
Update plugins, including scala-maven-plugin, to latest versions. Update checkstyle to 8.2. Remove bogus checkstyle config and enable it. Fix existing and new Java checkstyle errors.
## How was this patch tested?
Existing tests
Author: Sean Owen <sowen@cloudera.com>
Closes#19282 from srowen/SPARK-22066.
## What changes were proposed in this pull request?
This is a bit hard to explain, as there are several issues here; I'll try my best. Here are the requirements:
1. A StructuredStreaming Source that can generate empty RDDs with 0 partitions
2. A StructuredStreaming query that uses the above source, performs a stateful aggregation
(mapGroupsWithState, groupBy.count, ...), and coalesces to 1 partition
The crux of the problem is that when a dataset has a `coalesce(1)` call, it receives a `SinglePartition` partitioning scheme. This scheme satisfies most required distributions used for aggregations such as HashAggregateExec. This causes a world of problems:
Symptom 1. If the input RDD has 0 partitions, the whole lineage will receive 0 partitions, nothing will be executed, the state store will not create any delta files. When this happens, the next trigger fails, because the StateStore fails to load the delta file for the previous trigger
Symptom 2. Let's say that there was data. Then in this case, if you stop your stream, and change `coalesce(1)` with `coalesce(2)`, then restart your stream, your stream will fail, because `spark.sql.shuffle.partitions - 1` number of StateStores will fail to find its delta files.
To fix the issues above, we must check that the partitioning of the child of a `StatefulOperator` satisfies:
- If the grouping expressions are empty:
  a) `AllTuples` distribution
  b) a single physical partition
- If the grouping expressions are non-empty:
  a) `ClusteredDistribution`
  b) `spark.sql.shuffle.partitions` number of partitions
whether or not `coalesce(1)` exists in the plan, and whether or not the input RDD for the trigger has any data.
Once you fix the above problem by adding an Exchange to the plan, you come across the following bug:
If you call `coalesce(1).groupBy().count()` on a Streaming DataFrame, and if you have a trigger with no data, `StateStoreRestoreExec` doesn't return the prior state. However, for this specific aggregation, `HashAggregateExec` after the restore returns a (0, 0) row, since we're performing a count, and there is no data. Then this data gets stored in `StateStoreSaveExec` causing the previous counts to be overwritten and lost.
## How was this patch tested?
Regression tests
Author: Burak Yavuz <brkyvz@gmail.com>
Closes#19196 from brkyvz/sa-0.
This change modifies the live listener bus so that all listeners are
added to queues; each queue has its own thread to dispatch events,
making it possible to separate slow listeners from other more
performance-sensitive ones.
The public API has not changed - all listeners added with the existing
"addListener" method, which after this change mostly means all
user-defined listeners, end up in a default queue. Internally, there's
an API allowing listeners to be added to specific queues, and that API
is used to separate the internal Spark listeners into 3 categories:
application status listeners (e.g. UI), executor management (e.g. dynamic
allocation), and the event log.
The queueing logic, while abstracted away in a separate class, is kept
as much as possible hidden away from consumers. Aside from choosing their
queue, there's no code change needed to take advantage of queues.
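A toy sketch of the queueing idea (not Spark's internal API): each queue owns its own dispatch thread, so a slow listener only delays listeners sharing its queue.
```scala
import java.util.concurrent.LinkedBlockingQueue

class ListenerQueue[E](name: String, listeners: Seq[E => Unit]) {
  private val events = new LinkedBlockingQueue[E]()
  private val dispatcher = new Thread(
    () => while (true) { val e = events.take(); listeners.foreach(_(e)) },
    s"listener-bus-$name")
  dispatcher.setDaemon(true)
  dispatcher.start()

  // Enqueue and return; delivery happens on this queue's own thread.
  def post(event: E): Unit = events.put(event)
}
```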
Test coverage relies on existing tests; a few tests had to be tweaked
because they relied on `LiveListenerBus.postToAll` being synchronous,
and the change makes that method asynchronous. Other tests were simplified
not to use the asynchronous LiveListenerBus.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#19211 from vanzin/SPARK-18838.
## What changes were proposed in this pull request?
The ArrowWriter StringWriter was setting Arrow data using a position of 0 instead of the actual position in the ByteBuffer. This happened to work because of a bug, ARROW-1443, which has been fixed as of Arrow 0.7.0. Testing with that version revealed the error in the ArrowConvertersSuite string conversion test.
## How was this patch tested?
Existing tests, manually verified working with Arrow 0.7.0
Author: Bryan Cutler <cutlerb@gmail.com>
Closes#19284 from BryanCutler/arrow-ArrowWriter-StringWriter-position-SPARK-22067.
## What changes were proposed in this pull request?
Tables in the catalog cache are not invalidated once their statistics are updated. As a consequence, existing sessions will use the cached information even though it is not valid anymore. Consider the example below.
```
// step 1
spark.range(100).write.saveAsTable("tab1")
// step 2
spark.sql("analyze table tab1 compute statistics")
// step 3
spark.sql("explain cost select distinct * from tab1").show(false)
// step 4
spark.range(100).write.mode("append").saveAsTable("tab1")
// step 5
spark.sql("explain cost select distinct * from tab1").show(false)
```
After step 3, the table will be present in the catalog relation cache. Step 4 will correctly update the metadata inside the catalog but will NOT invalidate the cache.
By the way, ``spark.sql("analyze table tab1 compute statistics")`` between step 3 and step 4 would also solve the problem.
## How was this patch tested?
Current and additional unit tests.
Author: aokolnychyi <anton.okolnychyi@sap.com>
Closes#19252 from aokolnychyi/spark-21969.
## What changes were proposed in this pull request?
`org.apache.spark.sql.jdbc.JdbcDialect`'s method
`def isCascadingTruncateTable(): Option[Boolean] = None`
is not overridden in the `org.apache.spark.sql.jdbc.AggregatedDialect` class.
Because of this issue, when more than one dialect is added, Spark doesn't truncate the table, because `isCascadingTruncateTable` always returns the default `None` for the aggregated dialect.
This PR implements `isCascadingTruncateTable` in the `AggregatedDialect` class.
## How was this patch tested?
In `JDBCSuite`, inside test("Aggregated dialects"), one line is added to test `AggregatedDialect.isCascadingTruncateTable`.
Author: Huaxin Gao <huaxing@us.ibm.com>
Closes#19256 from huaxingao/spark-21338.
## What changes were proposed in this pull request?
Remove unnecessary default value setting for all evaluators, as we have set them in corresponding _HasXXX_ base classes.
## How was this patch tested?
Existing tests.
Author: Yanbo Liang <ybliang8@gmail.com>
Closes#19262 from yanboliang/evaluation.
## What changes were proposed in this pull request?
In the current Spark, when submitting an application on YARN with remote resources `./bin/spark-shell --jars http://central.maven.org/maven2/com/github/swagger-akka-http/swagger-akka-http_2.11/0.10.1/swagger-akka-http_2.11-0.10.1.jar --master yarn-client -v`, Spark fails with:
```
java.io.IOException: No FileSystem for scheme: http
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2586)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2593)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2632)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2614)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at org.apache.spark.deploy.yarn.Client.copyFileToRemote(Client.scala:354)
at org.apache.spark.deploy.yarn.Client.org$apache$spark$deploy$yarn$Client$$distribute$1(Client.scala:478)
at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$11$$anonfun$apply$6.apply(Client.scala:600)
at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$11$$anonfun$apply$6.apply(Client.scala:599)
at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:74)
at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$11.apply(Client.scala:599)
at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$11.apply(Client.scala:598)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:598)
at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:848)
at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:173)
```
This is because `YARN#client` assumes resources are on a Hadoop-compatible FS. To fix this problem, here we propose to download remote http(s) resources to local disk and add these locally downloaded resources to the dist cache. This solution has one downside: remote resources are downloaded and uploaded again, but this is restricted to remote http(s) resources only and the overhead is not so big. The advantage of this solution is that it is simple and the code changes are restricted to `SparkSubmit`.
## How was this patch tested?
Unit test added, also verified in local cluster.
Author: jerryshao <sshao@hortonworks.com>
Closes#19130 from jerryshao/SPARK-21917.
## What changes were proposed in this pull request?
While running bin/spark-sql, we reuse cliSessionState, but the Hive configurations generated there just point to a dummy meta store, which actually should be the real one. The warehouse is determined later in SharedState; HiveClient should respect this config change in this case too.
## How was this patch tested?
existing ut
cc cloud-fan jiangxb1987
Author: Kent Yao <yaooqinn@hotmail.com>
Closes#19068 from yaooqinn/SPARK-21428-FOLLOWUP.
The current implementation of processingRate-total uses the wrong metric: it mistakenly uses inputRowsPerSecond instead of processedRowsPerSecond.
## What changes were proposed in this pull request?
Adjust processingRate-total from using inputRowsPerSecond to processedRowsPerSecond
## How was this patch tested?
Built spark from source with proposed change and tested output with correct parameter. Before change the csv metrics file for inputRate-total and processingRate-total displayed the same values due to the error. After changing MetricsReporter.scala the processingRate-total csv file displayed the correct metric.
![processed rows per second](https://user-images.githubusercontent.com/32072374/30554340-82eea12c-9ca4-11e7-8370-8168526ff9a2.png)
Author: Taaffy <32072374+Taaffy@users.noreply.github.com>
Closes#19268 from Taaffy/patch-1.
## What changes were proposed in this pull request?
* Removed the method `org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter#alignToWords`. It became unused as a result of 85b0a15754 (SPARK-15962) introducing word alignment for unsafe arrays.
* Cleaned up duplicate code in memory management and unsafe sorters
* The change extracting the exception paths is more than just cosmetics, since it definitely reduces the size the affected methods compile to
## How was this patch tested?
* The build still passes after removing the method; grepping the codebase for `alignToWords` shows no reference to it anywhere either.
* Dried up code is covered by existing tests.
Author: Armin <me@obrown.io>
Closes#19254 from original-brownbear/cleanup-mem-consumer.
## What changes were proposed in this pull request?
When Spark persists data to Unsafe memory, we call the method `MemoryStore.putIteratorAsBytes`, which needs to synchronize on the `memoryManager` for every record written. This is unnecessary: we can acquire more memory at a time to reduce the synchronization.
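A toy sketch of the idea (the names and chunk size are illustrative, not Spark's code): instead of hitting the shared, synchronized manager per record, acquire memory in larger chunks and serve records from the local chunk.
```scala
// acquire() stands in for the memoryManager call that synchronizes.
class ChunkedReservation(acquire: Long => Boolean, chunkSize: Long = 1L << 20) {
  private var remaining = 0L

  def reserve(bytes: Long): Boolean = {
    if (remaining < bytes) {
      val toAcquire = math.max(bytes - remaining, chunkSize)
      if (!acquire(toAcquire)) return false // one synchronized call per chunk
      remaining += toAcquire
    }
    remaining -= bytes
    true
  }
}
```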
## How was this patch tested?
Test case (with 1 executor 20 core):
```scala
val start = System.currentTimeMillis()
val data = sc.parallelize(0 until Integer.MAX_VALUE, 100)
.persist(StorageLevel.OFF_HEAP)
.count()
println(System.currentTimeMillis() - start)
```
Test result (times in ms):
|        | run 1 | run 2 | run 3 | run 4 | run 5 |
|--------|-------|-------|-------|-------|-------|
| before | 27647 | 29108 | 28591 | 28264 | 27232 |
| after  | 26868 | 26358 | 27767 | 26653 | 26693 |
Author: Xianyang Liu <xianyang.liu@intel.com>
Closes#19135 from ConeyLiu/memorystore.