## What changes were proposed in this pull request?
`BytesToBytesMapOnHeapSuite.randomizedStressTest` caused `OutOfMemoryError` on several test runs. It seems better to reduce memory usage in this test.
## How was this patch tested?
Unit tests.
Closes #22297 from viirya/SPARK-25290.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
This PR addresses one of the TODOs in `GenerateUnsafeProjection`, "if the nullability of field is correct, we can use it to save null check", to simplify the generated code.
When `nullable=false` in `DataType`, `GenerateUnsafeProjection` now omits the null checks from the generated Java code.
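As a rough illustration of the idea (not the actual Spark code generator), the Python sketch below emits a Java-like snippet for writing one field and skips the null check when the field is declared non-nullable. The function name `gen_write_field` and the emitted method names are hypothetical.

```python
def gen_write_field(index: int, nullable: bool) -> str:
    """Return Java-like source for writing field `index` (illustrative only)."""
    write = f"writer.write({index}, row.getLong({index}));"
    if not nullable:
        # The schema promises the value is never null, so no check is emitted.
        return write
    # Nullable field: keep the branch that handles the null case.
    return (
        f"if (row.isNullAt({index})) {{\n"
        f"  writer.setNullAt({index});\n"
        f"}} else {{\n"
        f"  {write}\n"
        f"}}"
    )


print(gen_write_field(0, nullable=True))
print(gen_write_field(1, nullable=False))
```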
## How was this patch tested?
Added new test cases into `GenerateUnsafeProjectionSuite`
Closes #20637 from kiszk/SPARK-23466.
Authored-by: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
## What changes were proposed in this pull request?
Fixes the issue brought up in https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/issues/273 where the arguments were being comma-delimited, which was incorrect with respect to the PythonRunner and RRunner.
## How was this patch tested?
Modified unit test to test this change.
Author: Ilan Filonenko <if56@cornell.edu>
Closes #22257 from ifilonenko/SPARK-25264.
## What changes were proposed in this pull request?
I propose to remove the `parmap` overload that accepts an execution context as a parameter. Removing it eliminates the deadlocks that can occur if `parmap` is called recursively on thread pools of restricted size.
Closes #22292 from MaxGekk/remove-overloaded-parmap.
Authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Signed-off-by: Xiao Li <gatorsmile@gmail.com>
## What changes were proposed in this pull request?
Move the output verification of Explain test cases to a new suite ExplainSuite.
## How was this patch tested?
N/A
Closes #22300 from gatorsmile/test3200.
Authored-by: Xiao Li <gatorsmile@gmail.com>
Signed-off-by: Xiao Li <gatorsmile@gmail.com>
## What changes were proposed in this pull request?
As described in [SPARK-25261](https://issues.apache.org/jira/projects/SPARK/issues/SPARK-25261), the unit of `spark.executor.memory` and `spark.driver.memory` is parsed as bytes in some cases if no unit is specified, while in https://spark.apache.org/docs/latest/configuration.html#application-properties they are described as MiB, which may lead to misunderstandings.
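To make the ambiguity concrete, here is a minimal sketch (not Spark's own parser) of how the same unit-less string yields very different sizes depending on the default unit. The helper `parse_memory` and its single-letter suffix handling are assumptions for illustration.

```python
_UNITS = {"b": 1, "k": 1024, "m": 1024 ** 2, "g": 1024 ** 3, "t": 1024 ** 4}


def parse_memory(value: str, default_unit: str) -> int:
    """Parse a size string into bytes; `default_unit` applies when no suffix is given."""
    text = value.strip().lower()
    if text[-1].isdigit():
        # No suffix: the result depends entirely on the assumed default unit.
        return int(text) * _UNITS[default_unit]
    return int(text[:-1]) * _UNITS[text[-1]]


as_bytes = parse_memory("512", default_unit="b")  # 512 bytes
as_mib = parse_memory("512", default_unit="m")    # 512 MiB expressed in bytes
print(as_bytes, as_mib)
```

Specifying the unit explicitly (for example `512m`) avoids the discrepancy regardless of which default applies.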
## How was this patch tested?
N/A
Closes #22252 from ivoson/branch-correct-configuration.
Lead-authored-by: huangtengfei02 <huangtengfei02@baidu.com>
Co-authored-by: Huang Tengfei <tengfei.h@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
Currently, filter pushdown does not work if the Parquet schema and the Hive metastore schema are in different letter cases, even when spark.sql.caseSensitive is false.
Like the below case:
```scala
spark.sparkContext.hadoopConfiguration.setInt("parquet.block.size", 8 * 1024 * 1024)
spark.range(1, 40 * 1024 * 1024, 1, 1).sortWithinPartitions("id").write.parquet("/tmp/t")
sql("CREATE TABLE t (ID LONG) USING parquet LOCATION '/tmp/t'")
sql("select * from t where id < 100L").write.csv("/tmp/id")
```
Although the filter "ID < 100L" is generated by Spark, it fails to be pushed down into Parquet, so Spark still does a full table scan when reading.
This PR provides a case-insensitive field resolution to make it work.
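A minimal sketch of what case-insensitive field resolution could look like, written in Python for brevity rather than taken from the patch. The function `resolve_field` is hypothetical, and treating an ambiguous match as unresolved is an assumption of this sketch.

```python
from typing import List, Optional


def resolve_field(name: str, parquet_fields: List[str], case_sensitive: bool) -> Optional[str]:
    """Map a filter's column name to the physical Parquet field name, if any."""
    if case_sensitive:
        return name if name in parquet_fields else None
    matches = [f for f in parquet_fields if f.lower() == name.lower()]
    # Assumption: if several fields differ only by case, give up rather than guess.
    return matches[0] if len(matches) == 1 else None


print(resolve_field("ID", ["id"], case_sensitive=False))
print(resolve_field("ID", ["id"], case_sensitive=True))
```

With a resolved physical name, a filter such as `ID < 100L` can be rewritten against the Parquet column `id` and pushed down.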
Before - "ID < 100L" fails to be pushed down:
<img width="273" alt="screen shot 2018-08-23 at 10 08 26 pm" src="https://user-images.githubusercontent.com/2989575/44530558-40ef8b00-a721-11e8-8abc-7f97671590d3.png">
After - "ID < 100L" is pushed down successfully:
<img width="267" alt="screen shot 2018-08-23 at 10 08 40 pm" src="https://user-images.githubusercontent.com/2989575/44530567-44831200-a721-11e8-8634-e9f664b33d39.png">
## How was this patch tested?
Added UTs.
Closes #22197 from yucai/SPARK-25207.
Authored-by: yucai <yyu1@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
Switch `org.apache.hive.service.server.HiveServer2` to register its shutdown callback with Spark's `ShutdownHookManager`, rather than directly with the Java Runtime callback.
This avoids race conditions in shutdown where the filesystem is shut down before the flush/write/rename of the event log is completed, particularly on object stores where the write and rename can be slow.
## How was this patch tested?
There's no explicit unit test for this, which is consistent with every other shutdown hook in the codebase.
* There's an implicit test when the scalatest process is halted.
* More manual/integration testing is needed.
HADOOP-15679 has added the ability to explicitly execute the hadoop shutdown hook sequence which spark uses; that could be stabilized for testing if desired, after which all the spark hooks could be tested. Until then: external system tests only.
Author: Steve Loughran <stevel@hortonworks.com>
Closes #22186 from steveloughran/BUG/SPARK-25183-shutdown.
## What changes were proposed in this pull request?
### For `SPARK-5775 read array from partitioned_parquet_with_key_and_complextypes`:
scala2.12
```
scala> (1 to 10).toString
res4: String = Range 1 to 10
```
scala2.11
```
scala> (1 to 10).toString
res2: String = Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
```
And
```
def prepareAnswer(answer: Seq[Row], isSorted: Boolean): Seq[Row] = {
val converted: Seq[Row] = answer.map(prepareRow)
if (!isSorted) converted.sortBy(_.toString()) else converted
}
```
sortBy `_.toString` is not a good idea.
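The fragility is not specific to Scala. As a small Python illustration (an analogy, not the Spark test code), sorting by the textual form of a value gives an order that depends on how the value happens to be rendered, so any change in the string representation can reorder the expected answer.

```python
values = [2, 10, 1]

# Sorting by value gives the natural numeric order.
by_value = sorted(values)

# Sorting by the string form compares "1", "10", "2" character by character.
by_text = sorted(values, key=str)

print(by_value, by_text)
```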
### Other failures are caused by
```
Array(Int.box(1)).toSeq == Array(Double.box(1.0)).toSeq
```
It is false in 2.12.2+ and true in 2.11.x, 2.12.0, and 2.12.1.
## How was this patch tested?
This is a patch on a specific unit test.
Closes #22264 from sadhen/SPARK25256.
Authored-by: 忍冬 <rendong@wacai.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
Add an up-front check that `JIRA_USERNAME` and `JIRA_PASSWORD` have been set. If they haven't, ask user if they want to continue. This prevents the JIRA state update from failing at the very end of the process because user forgot to set these environment variables.
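The check could look roughly like the sketch below. It is written under assumptions: the helper names `missing_credentials` and `should_continue` and the prompt wording are illustrative, not the text of the merge script.

```python
import os
from typing import Callable, List, Mapping

REQUIRED_VARS = ("JIRA_USERNAME", "JIRA_PASSWORD")


def missing_credentials(env: Mapping[str, str] = os.environ) -> List[str]:
    """Return the names of required variables that are unset or empty."""
    return [name for name in REQUIRED_VARS if not env.get(name)]


def should_continue(env: Mapping[str, str], ask: Callable[[str], str] = input) -> bool:
    """Proceed silently if everything is set; otherwise ask the user up front."""
    missing = missing_credentials(env)
    if not missing:
        return True
    answer = ask("%s not set. Continue anyway? (y/n): " % ", ".join(missing))
    return answer.strip().lower() == "y"


print(missing_credentials({"JIRA_USERNAME": "someone"}))
```

Passing the prompt function in as `ask` keeps the check easy to exercise with and without the variables set.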
## How was this patch tested?
I ran the script with environment vars set, and unset, to verify it works as specified.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Closes #22294 from erikerlandson/spark-25287.
Authored-by: Erik Erlandson <eerlands@redhat.com>
Signed-off-by: Erik Erlandson <eerlands@redhat.com>
## What changes were proposed in this pull request?
Add a PAM configuration in k8s dockerfile to require authentication into wheel to run as `su`
## How was this patch tested?
Verify against CI that PAM config succeeds & causes no regressions
Closes #22285 from erikerlandson/spark-25275.
Authored-by: Erik Erlandson <eerlands@redhat.com>
Signed-off-by: Erik Erlandson <eerlands@redhat.com>
## What changes were proposed in this pull request?
Error messages from https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test/job/spark-master-test-maven-hadoop-2.7-ubuntu-scala-2.12/183/
```
[INFO] --- scala-maven-plugin:3.2.2:compile (scala-compile-first) spark-repl_2.12 ---
[INFO] Using zinc server for incremental compilation
[warn] Pruning sources from previous analysis, due to incompatible CompileSetup.
[info] Compiling 6 Scala sources to /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7-ubuntu-scala-2.12/repl/target/scala-2.12/classes...
[error] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7-ubuntu-scala-2.12/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoopInterpreter.scala:80: overriding lazy value importableSymbolsWithRenames in class ImportHandler of type List[(this.intp.global.Symbol, this.intp.global.Name)];
[error] lazy value importableSymbolsWithRenames needs `override' modifier
[error] lazy val importableSymbolsWithRenames: List[(Symbol, Name)] = {
[error] ^
[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7-ubuntu-scala-2.12/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala:53: variable addedClasspath in class ILoop is deprecated (since 2.11.0): use reset, replay or require to update class path
[warn] if (addedClasspath != "") {
[warn] ^
[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7-ubuntu-scala-2.12/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala:54: variable addedClasspath in class ILoop is deprecated (since 2.11.0): use reset, replay or require to update class path
[warn] settings.classpath append addedClasspath
[warn] ^
[warn] two warnings found
[error] one error found
[error] Compile failed at Aug 29, 2018 5:28:22 PM [0.679s]
```
Re-add the profile for `scala-2.12`. Using `-Pscala-2.12` overrides `extra.source.dir` and `extra.testsource.dir` with two nonexistent directories.
## How was this patch tested?
First, make sure it compiles.
```
dev/change-scala-version.sh 2.12
mvn -Pscala-2.12 -DskipTests compile install
```
Then, make a distribution to try the repl:
`./dev/make-distribution.sh --name custom-spark --tgz -Phadoop-2.7 -Phive -Pyarn -Pscala-2.12`
```
18/08/30 16:04:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://172.16.131.140:4040
Spark context available as 'sc' (master = local[*], app id = local-1535616298812).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.0-SNAPSHOT
      /_/
Using Scala version 2.12.6 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112)
Type in expressions to have them evaluated.
Type :help for more information.
scala> spark.sql("select percentile(key, 1) from values (1, 1),(2, 1) T(key, value)").show
+-------------------------------------+
|percentile(key, CAST(1 AS DOUBLE), 1)|
+-------------------------------------+
|                                  2.0|
+-------------------------------------+
```
Closes #22280 from sadhen/SPARK_24785_FOLLOWUP.
Authored-by: 忍冬 <rendong@wacai.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
`JavaColumnExpressionSuite.java` was added and `org.apache.spark.sql.ColumnExpressionSuite#test("isInCollection: Java Collection")` was removed.
It provides native Java tests for the method `org.apache.spark.sql.Column#isInCollection`.
Closes #22253 from aai95/isInCollectionJavaTest.
Authored-by: aai95 <aai95@yandex.ru>
Signed-off-by: DB Tsai <d_tsai@apple.com>
After SPARK-18371, it is guaranteed that there will be at least one message per partition per batch using the direct Kafka API when new messages exist in the topics. This change gives the user the option of setting the minimum instead of relying on a hard-coded limit of 1.
The related unit test is updated and some internal tests verified that the topic partitions with new messages will be progressed by the specified minimum.
Author: Reza Safi <rezasafi@cloudera.com>
Closes #22223 from rezasafi/streaminglag.
## What changes were proposed in this pull request?
This PR is a follow-up PR of #21087 based on [a discussion thread](https://github.com/apache/spark/pull/21087#discussion_r211080067). Since #21087 changed the condition of an `if` statement, the message in an exception is not consistent with the current behavior.
This PR updates the exception message.
## How was this patch tested?
Existing UTs
Closes #22269 from kiszk/SPARK-23997-followup.
Authored-by: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
R tests require `testthat` v1.0.2. In the PR, I described how to install the version in the section http://spark.apache.org/docs/latest/building-spark.html#running-r-tests.
Closes #22272 from MaxGekk/r-testthat-doc.
Authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
Since https://github.com/apache/spark/pull/21696, Spark uses the Parquet schema instead of the Hive metastore schema to do pushdown.
That change avoids returning wrong records when the Hive metastore schema and the Parquet schema are in different letter cases. This PR adds a test case for it.
More details:
https://issues.apache.org/jira/browse/SPARK-25206
## How was this patch tested?
unit tests
Closes #22267 from wangyum/SPARK-24716-TESTS.
Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
Introduced by #21320 and #11744
```
$ sbt
> ++2.12.6
> project sql
> compile
...
[error] [warn] spark/sql/core/src/main/scala/org/apache/spark/sql/execution/ProjectionOverSchema.scala:41: match may not be exhaustive.
[error] It would fail on the following inputs: (_, ArrayType(_, _)), (_, _)
[error] [warn] getProjection(a.child).map(p => (p, p.dataType)).map {
[error] [warn]
[error] [warn] spark/sql/core/src/main/scala/org/apache/spark/sql/execution/ProjectionOverSchema.scala:52: match may not be exhaustive.
[error] It would fail on the following input: (_, _)
[error] [warn] getProjection(child).map(p => (p, p.dataType)).map {
[error] [warn]
...
```
And
```
$ sbt
> ++2.12.6
> project hive
> testOnly *ParquetMetastoreSuite
...
[error] /Users/rendong/wdi/spark/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala:22: object tools is not a member of package scala
[error] import scala.tools.nsc.Properties
[error] ^
[error] /Users/rendong/wdi/spark/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala:146: not found: value Properties
[error] val version = Properties.versionNumberString match {
[error] ^
[error] two errors found
...
```
## How was this patch tested?
Existing tests.
Closes #22260 from sadhen/fix_exhaustive_match.
Authored-by: 忍冬 <rendong@wacai.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
HyukjinKwon
## What changes were proposed in this pull request?
add __from pyspark.util import \_exception_message__ to python/pyspark/java_gateway.py
## How was this patch tested?
[flake8](http://flake8.pycqa.org) testing of https://github.com/apache/spark on Python 3.7.0
$ __flake8 . --count --select=E901,E999,F821,F822,F823 --show-source --statistics__
```
./python/pyspark/java_gateway.py:172:20: F821 undefined name '_exception_message'
emsg = _exception_message(e)
^
1 F821 undefined name '_exception_message'
1
```
Closes #22265 from cclauss/patch-2.
Authored-by: cclauss <cclauss@bluewin.ch>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
The Spark scheduler can hang when there are fetch failures, lost executors, tasks running on a lost executor, and multiple stage attempts. To fix this, we change it to always unregister the pending partition on task completion.
## What changes were proposed in this pull request?
This PR actually reverts the change in SPARK-19263, so that it always does `shuffleStage.pendingPartitions -= task.partitionId`. The change in SPARK-23433 should fix the issue that SPARK-19263 originally addressed.
## How was this patch tested?
Unit tests. The condition depends on a race that I have not reproduced on demand; I only see it occasionally in customers' jobs on a real cluster.
I am also working on adding spark scheduler integration tests.
Closes #21976 from tgravescs/SPARK-24909.
Authored-by: Thomas Graves <tgraves@unharmedunarmed.corp.ne1.yahoo.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
`fromId` is the child, and `toId` is the parent, see line 127 in `buildSparkPlanGraphNode` above.
The edges in Spark UI also go from child to parent.
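The direction can be pictured with a small sketch. This is a simplified Python model, not the Scala implementation; `PlanNode` and `build_edges` are hypothetical names.

```python
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class PlanNode:
    node_id: int
    children: List["PlanNode"] = field(default_factory=list)


def build_edges(parent: PlanNode) -> List[Tuple[int, int]]:
    """Return (from_id, to_id) pairs where from_id is the child and to_id the parent."""
    edges: List[Tuple[int, int]] = []
    for child in parent.children:
        edges.append((child.node_id, parent.node_id))
        edges.extend(build_edges(child))
    return edges


# Node 0 is the root; data flows from node 2 to node 1 to node 0.
tree = PlanNode(0, [PlanNode(1, [PlanNode(2)])])
print(build_edges(tree))
```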
## How was this patch tested?
Comment change only. Inspected the code above. Inspected what the edges in the Spark UI look like.
Closes #22268 from juliuszsompolski/sparkplangraphedgedoc.
Authored-by: Juliusz Sompolski <julek@databricks.com>
Signed-off-by: Xiao Li <gatorsmile@gmail.com>
## What changes were proposed in this pull request?
I made one pass over the Python APIs for barrier mode and updated them to match the Scala doc in #22240 . Major changes:
* export the public classes
* expand the docs
* add doc for BarrierTaskInfo.address
cc: jiangxb1987
Closes #22261 from mengxr/SPARK-25248.1.
Authored-by: Xiangrui Meng <meng@databricks.com>
Signed-off-by: Xiangrui Meng <meng@databricks.com>
## What changes were proposed in this pull request?
BarrierCoordinator uses Timer and TimerTask. `TimerTask#cancel()` is invoked in ContextBarrierState#cancelTimerTask but `Timer#purge()` is never invoked.
Once a TimerTask is scheduled, the reference to it is not released until `Timer#purge()` is invoked even though `TimerTask#cancel()` is invoked.
## How was this patch tested?
I checked the number of instances related to the TimerTask using jmap.
Closes #22258 from sarutak/fix-barrierexec-oom.
Authored-by: sarutak <sarutak@oss.nttdata.co.jp>
Signed-off-by: Xiangrui Meng <meng@databricks.com>
## What changes were proposed in this pull request?
Alternative take on https://github.com/apache/spark/pull/22063 that does not introduce udfInternal.
Resolve issue with inferring func types in 2.12 by instead using info captured when UDF is registered -- capturing which types are nullable (i.e. not primitive)
## How was this patch tested?
Existing tests.
Closes #22259 from srowen/SPARK-25044.2.
Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
This changes the calls of `toPandas()` and `createDataFrame()` to use the Arrow stream format, when Arrow is enabled. Previously, Arrow data was written to byte arrays where each chunk is an output of the Arrow file format. This was mainly due to constraints at the time, and caused some overhead by writing the schema/footer on each chunk of data and then having to read multiple Arrow file inputs and concat them together.
Using the Arrow stream format improves on this by increasing performance, lowering memory overhead for the average case, and simplifying the code. Here are the details of this change:
**toPandas()**
_Before:_
Spark internal rows are converted to Arrow file format, each group of records is a complete Arrow file which contains the schema and other metadata. Next a collect is done and an Array of Arrow files is the result. After that each Arrow file is sent to Python driver which then loads each file and concats them to a single Arrow DataFrame.
_After:_
Spark internal rows are converted to ArrowRecordBatches directly, which is the simplest Arrow component for IPC data transfers. The driver JVM then immediately starts serving data to Python as an Arrow stream, sending the schema first. It then starts a Spark job with a custom handler that sends Arrow RecordBatches to Python. Partitions arriving in order are sent immediately, and out-of-order partitions are buffered until the ones that precede them come in. This improves performance, simplifies memory usage on executors, and improves the average memory usage on the JVM driver. Since the order of partitions must be preserved, the worst case is that the first partition is the last to arrive, so all data must be buffered in memory until then. This case is no worse than before, when doing a full collect.
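The in-order delivery with buffering described above can be sketched as follows. This is a simplified Python model of the idea, not the Scala result handler from the patch; the class `OrderedPartitionSender` and its callback signature are assumptions.

```python
from typing import Callable, Dict, List


class OrderedPartitionSender:
    """Forward partition results in index order, buffering early arrivals."""

    def __init__(self, send: Callable[[int, bytes], None]) -> None:
        self._send = send
        self._next = 0  # index of the next partition that may be sent
        self._buffered: Dict[int, bytes] = {}

    def on_partition(self, index: int, batch: bytes) -> None:
        self._buffered[index] = batch
        # Flush every partition that is now contiguous with what was already sent.
        while self._next in self._buffered:
            self._send(self._next, self._buffered.pop(self._next))
            self._next += 1

    @property
    def buffered_count(self) -> int:
        return len(self._buffered)


sent: List[int] = []
sender = OrderedPartitionSender(lambda index, _batch: sent.append(index))
for index in (2, 0, 1):  # partition 2 finishes first and has to wait
    sender.on_partition(index, b"")
print(sent)
```

In the worst case described in the text, partition 0 arrives last and every other partition sits in the buffer until then.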
**createDataFrame()**
_Before:_
A Pandas DataFrame is split into parts and each part is made into an Arrow file. Then each file is prefixed by the buffer size and written to a temp file. The temp file is read and each Arrow file is parallelized as a byte array.
_After:_
A Pandas DataFrame is split into parts, then an Arrow stream is written to a temp file where each part is an ArrowRecordBatch. The temp file is read as a stream and the Arrow messages are examined. If the message is an ArrowRecordBatch, the data is saved as a byte array. After reading the file, each ArrowRecordBatch is parallelized as a byte array. This has slightly more processing than before because we must look at each Arrow message to extract the record batches, but performance ends up a little better. It is cleaner in the sense that IPC from Python to JVM is done over a single Arrow stream.
## How was this patch tested?
Added new unit tests for the additions to ArrowConverters in Scala, existing tests for Python.
## Performance Tests - toPandas
Tests run on a 4 node standalone cluster with 32 cores total, 14.04.1-Ubuntu and OpenJDK 8
Measured wall clock time to execute `toPandas()` and took the average best time of 5 runs/5 loops each.
Test code
```python
import time
from pyspark.sql.functions import rand

# `spark` is an existing SparkSession
df = spark.range(1 << 25, numPartitions=32).toDF("id").withColumn("x1", rand()).withColumn("x2", rand()).withColumn("x3", rand()).withColumn("x4", rand())
for i in range(5):
    start = time.time()
    _ = df.toPandas()
    elapsed = time.time() - start
```
Current Master | This PR
---------------------|------------
5.803557 | 5.16207
5.409119 | 5.133671
5.493509 | 5.147513
5.433107 | 5.105243
5.488757 | 5.018685

Avg Master | Avg This PR
------------------|--------------
5.5256098 | 5.1134364

Speedup of **1.08060595**
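The averages and the speedup follow directly from the five timings listed above; the short check below simply recomputes them.

```python
# Wall clock times in seconds, copied from the table above.
master = [5.803557, 5.409119, 5.493509, 5.433107, 5.488757]
this_pr = [5.16207, 5.133671, 5.147513, 5.105243, 5.018685]

avg_master = sum(master) / len(master)
avg_pr = sum(this_pr) / len(this_pr)

# Speedup is the ratio of the old average time to the new one.
speedup = avg_master / avg_pr
print(round(avg_master, 7), round(avg_pr, 7), round(speedup, 8))
```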
## Performance Tests - createDataFrame
Tests run on a 4 node standalone cluster with 32 cores total, 14.04.1-Ubuntu and OpenJDK 8
Measured wall clock time to execute `createDataFrame()` and get the first record. Took the average best time of 5 runs/5 loops each.
Test code
```python
import gc
import time
import numpy as np
import pandas as pd

# `spark` is an existing SparkSession
def run():
    pdf = pd.DataFrame(np.random.rand(10000000, 10))
    spark.createDataFrame(pdf).first()

for i in range(6):
    start = time.time()
    run()
    elapsed = time.time() - start
    gc.collect()
    print("Run %d: %f" % (i, elapsed))
```
Current Master | This PR
--------------------|----------
6.234608 | 5.665641
6.32144 | 5.3475
6.527859 | 5.370803
6.95089 | 5.479151
6.235046 | 5.529167

Avg Master | Avg This PR
---------------|----------------
6.4539686 | 5.4784524

Speedup of **1.178064192**
## Memory Improvements
**toPandas()**
The most significant improvement is the reduction of the upper bound space complexity in the JVM driver. Before, the entire dataset was collected in the JVM first before sending it to Python. With this change, as soon as a partition is collected, the result handler immediately sends it to Python, so the upper bound is the size of the largest partition. Also, using the Arrow stream format is more efficient because the schema is written once per stream, followed by record batches. The schema is now only sent from the driver JVM to Python. Before, multiple Arrow file formats were used that each contained the schema. This duplicated schema was created in the executors, sent to the driver JVM, and then sent to Python, where all but the first one received were discarded.
I verified the upper bound limit by running a test that would collect data that would exceed the amount of driver JVM memory available. Using these settings on a standalone cluster:
```
spark.driver.memory 1g
spark.executor.memory 5g
spark.sql.execution.arrow.enabled true
spark.sql.execution.arrow.fallback.enabled false
spark.sql.execution.arrow.maxRecordsPerBatch 0
spark.driver.maxResultSize 2g
```
Test code:
```python
from pyspark.sql.functions import rand
df = spark.range(1 << 25, numPartitions=32).toDF("id").withColumn("x1", rand()).withColumn("x2", rand()).withColumn("x3", rand())
df.toPandas()
```
This makes total data size of 33554432×8×4 = 1073741824
With the current master, it fails with OOM but passes using this PR.
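The size estimate above can be reproduced with a few lines. It counts only the raw 8-byte values (one long `id` column plus three double columns) and ignores Arrow and JVM overhead, which is a simplifying assumption.

```python
rows = 1 << 25          # 33554432 rows from spark.range(1 << 25)
bytes_per_value = 8     # a 64-bit long or a 64-bit double
columns = 4             # id, x1, x2, x3

total_bytes = rows * bytes_per_value * columns
driver_memory_bytes = 1 * 1024 ** 3  # spark.driver.memory 1g

print(total_bytes, total_bytes / driver_memory_bytes)
```

The raw data alone already equals the 1g of driver memory, which is consistent with a full collect failing on master while streaming partition by partition succeeds.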
**createDataFrame()**
No significant change in memory except that using the stream format instead of separate file formats avoids duplicating the schema, similar to toPandas above. The process of reading the stream and parallelizing the batches does cause the record batch message metadata to be copied, but its size is insignificant.
Closes #21546 from BryanCutler/arrow-toPandas-stream-SPARK-23030.
Authored-by: Bryan Cutler <cutlerb@gmail.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
Using some reflection tricks to merge Scala 2.11 and 2.12 codebase.
## How was this patch tested?
Existing tests.
Closes #22246 from dbtsai/repl.
Lead-authored-by: DB Tsai <d_tsai@apple.com>
Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
This eliminates some duplication in the code to connect to a server on localhost to talk directly to the jvm. Also it gives consistent ipv6 and error handling. Two other incidental changes, that shouldn't matter:
1) python barrier tasks perform authentication immediately (rather than waiting for the BARRIER_FUNCTION indicator)
2) for `rdd._load_from_socket`, the timeout is only increased after authentication.
Closes #22247 from squito/py_connection_refactor.
Authored-by: Imran Rashid <irashid@cloudera.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
`toAvroType` converts a Spark data type to an Avro schema. It always appends the record name to the namespace, so it's impossible to have an Avro namespace independent of the record name.
When invoked with a spark data type like,
```scala
val sparkSchema = StructType(Seq(
StructField("name", StringType, nullable = false),
StructField("address", StructType(Seq(
StructField("city", StringType, nullable = false),
StructField("state", StringType, nullable = false))),
nullable = false)))
// map it to an avro schema with record name "employee" and top level namespace "foo.bar",
val avroSchema = SchemaConverters.toAvroType(sparkSchema, false, "employee", "foo.bar")
// result is
// avroSchema.getName = employee
// avroSchema.getNamespace = foo.bar.employee
// avroSchema.getFullname = foo.bar.employee.employee
```
The patch proposes to fix this so that the result is
```
avroSchema.getName = employee
avroSchema.getNamespace = foo.bar
avroSchema.getFullname = foo.bar.employee
```
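The difference between the two behaviors comes down to how the namespace is derived. The sketch below models only the naming, in Python and outside of Avro; `avro_names_before` and `avro_names_after` are hypothetical helpers that mirror the results listed above.

```python
from typing import Dict


def avro_names_before(record_name: str, namespace: str) -> Dict[str, str]:
    """Old behavior: the record name is appended to the namespace."""
    ns = f"{namespace}.{record_name}"
    return {"name": record_name, "namespace": ns, "fullname": f"{ns}.{record_name}"}


def avro_names_after(record_name: str, namespace: str) -> Dict[str, str]:
    """Proposed behavior: the namespace is used as given."""
    return {
        "name": record_name,
        "namespace": namespace,
        "fullname": f"{namespace}.{record_name}",
    }


print(avro_names_before("employee", "foo.bar"))
print(avro_names_after("employee", "foo.bar"))
```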
## How was this patch tested?
New and existing unit tests.
Closes #22251 from arunmahadevan/avro-fix.
Authored-by: Arun Mahadevan <arunm@apache.org>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
Most `HigherOrderFunction`s have the same `nullable` definition, i.e. they are nullable when one of their arguments is nullable. The PR refactors this in order to avoid code duplication.
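The shared rule is small enough to state in one line. Below is a Python rendering of it for illustration only; `Expr` and `hof_nullable` are hypothetical stand-ins for the Catalyst classes.

```python
from dataclasses import dataclass
from typing import Sequence


@dataclass(frozen=True)
class Expr:
    nullable: bool


def hof_nullable(arguments: Sequence[Expr]) -> bool:
    """A higher-order function is nullable when any of its arguments is nullable."""
    return any(arg.nullable for arg in arguments)


print(hof_nullable([Expr(False), Expr(True)]))
```

Defining this once in a common place is what removes the per-function duplication.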
## How was this patch tested?
NA
Closes #22243 from mgaido91/MINOR_nullable_hof.
Authored-by: Marco Gaido <marcogaido91@gmail.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
Fix the issue that minPartitions was not used in the method. This is a simple fix and I am not trying to make it complicated. The purpose is to still allow the user to control the default parallelism through the value of minPartitions, as well as through sc.defaultParallelism.
## How was this patch tested?
I have not provided the additional test since the fix is very straightforward.
Closes #21638 from bomeng/22357.
Lead-authored-by: Bo Meng <mengbo@hotmail.com>
Co-authored-by: Bo Meng <bo.meng@jd.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
Support Filter in ConvertToLocalRelation, similar to how Project works.
Additionally, in Optimizer, run ConvertToLocalRelation earlier to simplify the plan. This is good for very short queries which often are queries on local relations.
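The effect of the rule can be modelled with a toy plan tree. The Python sketch below is an assumption-laden simplification (rows are tuples, expressions are plain callables, and the class names only echo the Catalyst ones); it shows a `Filter` over a local relation being evaluated eagerly, the same way a `Project` is.

```python
from dataclasses import dataclass
from typing import Callable, List, Tuple


@dataclass
class LocalRelation:
    rows: List[Tuple]


@dataclass
class Project:
    func: Callable[[Tuple], Tuple]
    child: object


@dataclass
class Filter:
    condition: Callable[[Tuple], bool]
    child: object


def convert_to_local_relation(plan):
    """Collapse Project/Filter over a LocalRelation into a new LocalRelation."""
    if isinstance(plan, LocalRelation):
        return plan
    child = convert_to_local_relation(plan.child)
    if isinstance(child, LocalRelation):
        if isinstance(plan, Filter):
            return LocalRelation([r for r in child.rows if plan.condition(r)])
        if isinstance(plan, Project):
            return LocalRelation([plan.func(r) for r in child.rows])
    return plan


plan = Project(lambda r: (r[0] * 2,), Filter(lambda r: r[0] > 1, LocalRelation([(1,), (2,), (3,)])))
print(convert_to_local_relation(plan).rows)
```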
## How was this patch tested?
New test. Manual benchmark.
Author: Bogdan Raducanu <bogdan@databricks.com>
Author: Shixiong Zhu <zsxwing@gmail.com>
Author: Yinan Li <ynli@google.com>
Author: Li Jin <ice.xelloss@gmail.com>
Author: s71955 <sujithchacko.2010@gmail.com>
Author: DB Tsai <d_tsai@apple.com>
Author: jaroslav chládek <mastermism@gmail.com>
Author: Huangweizhe <huangweizhe@bbdservice.com>
Author: Xiangrui Meng <meng@databricks.com>
Author: hyukjinkwon <gurwls223@apache.org>
Author: Kent Yao <yaooqinn@hotmail.com>
Author: caoxuewen <cao.xuewen@zte.com.cn>
Author: liuxian <liu.xian3@zte.com.cn>
Author: Adam Bradbury <abradbury@users.noreply.github.com>
Author: Jose Torres <torres.joseph.f+github@gmail.com>
Author: Yuming Wang <yumwang@ebay.com>
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes #22205 from bogdanrdc/local-relation-filter.
## What changes were proposed in this pull request?
This adds `spark.executor.pyspark.memory` to configure Python's address space limit, [`resource.RLIMIT_AS`](https://docs.python.org/3/library/resource.html#resource.RLIMIT_AS). Limiting Python's address space allows Python to participate in memory management. In practice, we see fewer cases of Python taking too much memory because it doesn't know to run garbage collection. This results in YARN killing fewer containers. This also improves error messages so users know that Python is consuming too much memory:
```
File "build/bdist.linux-x86_64/egg/package/library.py", line 265, in fe_engineer
fe_eval_rec.update(f(src_rec_prep, mat_rec_prep))
File "build/bdist.linux-x86_64/egg/package/library.py", line 163, in fe_comp
comparisons = EvaluationUtils.leven_list_compare(src_rec_prep.get(item, []), mat_rec_prep.get(item, []))
File "build/bdist.linux-x86_64/egg/package/evaluationutils.py", line 25, in leven_list_compare
permutations = sorted(permutations, reverse=True)
MemoryError
```
The new pyspark memory setting is used to increase requested YARN container memory, instead of sharing overhead memory between python and off-heap JVM activity.
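For readers unfamiliar with `resource.RLIMIT_AS`, here is a minimal sketch of how a worker process could apply such a limit. It is not copied from the patch: the helper names and the rule of only ever lowering the limit are assumptions, and the `resource` module is available on Unix only.

```python
import resource
from typing import Optional


def new_address_space_limit(memory_mb: int, current_soft: int) -> Optional[int]:
    """Return the limit in bytes to apply, or None to leave the current one alone."""
    if memory_mb <= 0:
        return None
    wanted = memory_mb * 1024 * 1024
    # Assumption: only tighten the limit, never raise an existing lower one.
    if current_soft == resource.RLIM_INFINITY or wanted < current_soft:
        return wanted
    return None


def apply_limit(memory_mb: int) -> None:
    """Apply the limit to the current process (not called in this sketch)."""
    soft, _hard = resource.getrlimit(resource.RLIMIT_AS)
    limit = new_address_space_limit(memory_mb, soft)
    if limit is not None:
        resource.setrlimit(resource.RLIMIT_AS, (limit, limit))


print(new_address_space_limit(512, resource.RLIM_INFINITY))
```

Once the limit is in place, an allocation that would exceed it raises `MemoryError` inside Python, as in the traceback above, instead of the container being killed from outside.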
## How was this patch tested?
Tested memory limits in our YARN cluster and verified that MemoryError is thrown.
Author: Ryan Blue <blue@apache.org>
Closes #21977 from rdblue/SPARK-25004-add-python-memory-limit.
## What changes were proposed in this pull request?
In the PR, I propose to not perform recursive parallel listing of files in the `scanPartitions` method because it can cause a deadlock. Instead, I propose to do `scanPartitions` in parallel for top-level partitions only.
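The shape of the fix can be sketched with a bounded thread pool: only the top level is fanned out, and each worker walks its own subtree sequentially, so no task ever waits on another task queued in the same pool. This is a Python model with a nested dict standing in for the directory tree; the function names are illustrative, not the Scala ones.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List


def scan_sequential(path: str, node: dict) -> List[str]:
    """Walk a subtree on the calling thread; an empty dict marks a leaf partition."""
    leaves: List[str] = []
    for name, child in node.items():
        child_path = f"{path}/{name}"
        if child:
            leaves.extend(scan_sequential(child_path, child))
        else:
            leaves.append(child_path)
    return leaves


def scan_partitions(root: str, tree: dict, max_threads: int = 2) -> List[str]:
    """Scan top-level partitions in parallel; deeper levels stay sequential."""

    def scan_top(item):
        name, child = item
        path = f"{root}/{name}"
        return scan_sequential(path, child) if child else [path]

    with ThreadPoolExecutor(max_workers=max_threads) as pool:
        results = list(pool.map(scan_top, tree.items()))
    return [leaf for part in results for leaf in part]


tree = {"a=1": {"b=1": {}, "b=2": {}}, "a=2": {"b=1": {}}}
print(scan_partitions("/t", tree))
```

If the nested levels were also submitted to the same small pool and awaited, every worker could end up blocked waiting for tasks that have no free thread to run on.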
## How was this patch tested?
I extended an existing test to trigger the deadlock.
Author: Maxim Gekk <maxim.gekk@databricks.com>
Closes #22233 from MaxGekk/fix-recover-partitions.
## What changes were proposed in this pull request?
YARN's `AmIpFilter` added a new parameter, "RM_HA_URLS", to support RM HA, but Spark on YARN doesn't provide such a parameter, so redirection fails when running with RM HA. The detailed exception can be found in the JIRA. This PR fixes the issue by adding the "RM_HA_URLS" parameter.
## How was this patch tested?
Local verification.
Closes #22164 from jerryshao/SPARK-23679.
Authored-by: jerryshao <sshao@hortonworks.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
This PR allows the user to override the maximum number of buckets when saving to a table.
Currently the limit is a hard-coded 100k, which might be insufficient for large workloads.
A new configuration entry is proposed: `spark.sql.bucketing.maxBuckets`, which defaults to the previous 100k.
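In outline, the check changes from comparing against a constant to comparing against a configurable value. The following is a hedged Python sketch; `validate_num_buckets` and the error text are hypothetical, and only the 100k default comes from the description above.

```python
DEFAULT_MAX_BUCKETS = 100000  # the previous hard-coded limit


def validate_num_buckets(num_buckets: int, max_buckets: int = DEFAULT_MAX_BUCKETS) -> int:
    """Reject bucket counts outside (0, max_buckets]; max_buckets models the new setting."""
    if num_buckets <= 0 or num_buckets > max_buckets:
        raise ValueError(
            "Number of buckets should be greater than 0 but less than or equal to "
            "%d. Got %d." % (max_buckets, num_buckets)
        )
    return num_buckets


print(validate_num_buckets(100000))
print(validate_num_buckets(150000, max_buckets=200000))
```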
## How was this patch tested?
Added unit tests in the following spark.sql test suites:
- CreateTableAsSelectSuite
- BucketedWriteSuite
Author: Fernando Pereira <fernando.pereira@epfl.ch>
Closes #21087 from ferdonline/enh/configurable_bucket_limit.
## What changes were proposed in this pull request?
When the user uses Kafka transactions to write data, the offsets in Kafka will be non-consecutive. The log will contain some transaction (commit or abort) markers. In addition, if the consumer's `isolation.level` is `read_committed`, `poll` will not return aborted messages either. Hence, we will see non-consecutive offsets in the data returned by `poll`. However, as `seekToEnd` may move the offset point to these missing offsets, there are 4 possible corner cases we need to support:
- The whole batch contains no data messages
- The first offset in a batch is not a committed data message
- The last offset in a batch is not a committed data message
- There is a gap in the middle of a batch
They are all covered by the new unit tests.
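The four cases can be illustrated with a small model in plain Python. This is not the Kafka consumer API or the code in this patch: `read_batch` and the `(offset, value)` pairs are hypothetical, and the sketch only shows that a reader must accept offsets with no data anywhere in the requested range.

```python
def read_batch(polled, start, end):
    """Return (records, next_offset) for the offset range [start, end).

    `polled` is a list of (offset, value) pairs in ascending offset order.
    Offsets taken by transaction markers or aborted messages are absent.
    """
    records = [(off, val) for off, val in polled if start <= off < end]
    # The batch is complete once the range end is reached, even when the
    # offsets at the boundaries carry no data.
    return records, end


# Offsets 0 to 9, where 0, 4, 5 and 9 hold no committed data message.
polled = [(1, "a"), (2, "b"), (3, "c"), (6, "d"), (7, "e"), (8, "f")]

empty_batch = read_batch(polled, 4, 6)    # whole batch has no data messages
first_missing = read_batch(polled, 0, 3)  # first offset is not a data message
last_missing = read_batch(polled, 7, 10)  # last offset is not a data message
gap_inside = read_batch(polled, 2, 8)     # gap in the middle of the batch
```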
## How was this patch tested?
The new unit tests.
Closes#22042 from zsxwing/kafka-transaction-read.
Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
## What changes were proposed in this pull request?
Make sure TransportServer and SocketAuthHelper close the resources for all types of errors.
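The general pattern, sketched in plain Python rather than the actual TransportServer or SocketAuthHelper code: close the resource on any failure, not only on the expected error types, then re-raise. `Channel` and `open_and_init` are hypothetical stand-ins.

```python
class Channel:
    """Hypothetical stand-in for a socket or server channel."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def open_and_init(channel, init):
    """Run init(channel); close the channel if init fails for any reason."""
    try:
        init(channel)
        return channel
    except BaseException:
        # Catch every error type, not only expected I/O errors, then re-raise
        # so the caller still sees the original failure.
        channel.close()
        raise


def failing_init(channel):
    raise RuntimeError("handshake failed")


failed = Channel()
try:
    open_and_init(failed, failing_init)
except RuntimeError:
    pass

healthy = open_and_init(Channel(), lambda ch: None)
```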
## How was this patch tested?
Jenkins
Closes#22210 from zsxwing/SPARK-25218.
Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
## What changes were proposed in this pull request?
The PR excludes Python UDF filters in FileSourceStrategy so that they don't cause the ExtractPythonUDF rule to throw an exception. It doesn't make sense to pass Python UDF filters in FileSourceStrategy anyway because they cannot be used as push-down filters.
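Conceptually, the change partitions the filter list before planning the file scan. The sketch below is a plain-Python model, not Catalyst code: the `Filter` tuple, its `has_python_udf` flag, and `split_filters` are hypothetical.

```python
from collections import namedtuple

# Hypothetical filter model: `has_python_udf` marks predicates that call
# a Python UDF and therefore cannot be evaluated by the file source.
Filter = namedtuple("Filter", ["sql", "has_python_udf"])


def split_filters(filters):
    """Separate filters the file source may see from those it must not."""
    pushable = [f for f in filters if not f.has_python_udf]
    deferred = [f for f in filters if f.has_python_udf]
    return pushable, deferred


filters = [
    Filter("year = 2018", False),
    Filter("my_udf(name) = 'x'", True),
    Filter("value > 10", False),
]
pushable, deferred = split_filters(filters)
```

Only `pushable` would be handed to the file source; the `deferred` predicates stay in a filter evaluated after the scan.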
## How was this patch tested?
Added a new regression test.
Closes#22104 from icexelloss/SPARK-24721-udf-filter.
Authored-by: Li Jin <ice.xelloss@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
Updated documentation for Spark on Kubernetes for the upcoming 2.4.0.
mccheah erikerlandson
Closes#22224 from liyinan926/master.
Authored-by: Yinan Li <ynli@google.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
Our HDFS cluster is configured with 5 nameservices: `nameservices1`, `nameservices2`, `nameservices3`, `nameservices-dev1` and `nameservices4`, but `nameservices-dev1` is unstable. Since [SPARK-24149](https://issues.apache.org/jira/browse/SPARK-24149), an error sometimes occurs and causes the entire job to fail:
![image](https://user-images.githubusercontent.com/5399861/42434779-f10c48fc-8386-11e8-98b0-4d9786014744.png)
I think it's best to add a switch here.
## How was this patch tested?
manual tests
Closes#21734 from wangyum/SPARK-24149.
Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
Fix flaky synchronization in Kafka tests - we need to use the scan config that was persisted rather than reconstructing it to identify the stream's current configuration.
We caught most instances of this in the original PR, but this one slipped through.
## How was this patch tested?
n/a
Closes#22245 from jose-torres/fixflake.
Authored-by: Jose Torres <torres.joseph.f+github@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
## What changes were proposed in this pull request?
This PR adds a unit test for OpenHashMap, which helps developers distinguish between 0/0.0/0L and null.
## How was this patch tested?
Closes#22241 from 10110346/openhashmap.
Authored-by: liuxian <liu.xian3@zte.com.cn>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
This PR adds a configuration parameter to configure the capacity of fast aggregation.
Performance comparison:
```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Windows 7 6.1
Intel64 Family 6 Model 94 Stepping 3, GenuineIntel
Aggregate w multiple keys:               Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
fasthash = default                             5612 / 5882          3.7         267.6       1.0X
fasthash = config                              3586 / 3595          5.8         171.0       1.6X
```
## How was this patch tested?
The existing test cases.
Closes#21931 from heary-cao/FastHashCapacity.
Authored-by: caoxuewen <cao.xuewen@zte.com.cn>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
This is based on the discussion https://github.com/apache/spark/pull/16677/files#r212805327.
As the SQL standard doesn't mandate that a nested order by followed by a limit has to respect that ordering clause, this patch removes the `child.outputOrdering` check.
## How was this patch tested?
Unit tests.
Closes#22239 from viirya/improve-global-limit-parallelism-followup.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
`__version__` in `setup.py` is currently being dynamically read by `exec`; so the linter complains. Better just switch it off for this line for now.
**Before:**
```bash
$ python -m flake8 . --count --select=E9,F82 --show-source --statistics
./setup.py:37:11: F821 undefined name '__version__'
VERSION = __version__
^
1 F821 undefined name '__version__'
1
```
**After:**
```bash
$ python -m flake8 . --count --select=E9,F82 --show-source --statistics
0
```
## How was this patch tested?
Manually tested.
Closes#22235 from HyukjinKwon/SPARK-23698.
Authored-by: hyukjinkwon <gurwls223@apache.org>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
Improved the documentation for the datetime functions in `org.apache.spark.sql.functions` by adding details about the supported column input types, the column return type, behaviour on invalid input, supporting examples and clarifications.
## How was this patch tested?
Manually testing each of the datetime functions with different input to ensure that the corresponding Javadoc/Scaladoc matches the behaviour of the function. Successfully ran the `unidoc` SBT process.
Closes#20901 from abradbury/SPARK-23792.
Authored-by: Adam Bradbury <abradbury@users.noreply.github.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
This is a follow-up PR for #22207 to fix a potentially flaky test. `processAllAvailable` doesn't work for continuous processing, so we should not use it for a continuous query.
## How was this patch tested?
Jenkins.
Closes#22230 from zsxwing/SPARK-25214-2.
Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
## What changes were proposed in this pull request?
An RDD is created using LabeledPoint, but the comment reads #LabeledPoint(feature, label).
Although in the method ChiSquareTest.test the second parameter is feature and the third parameter is label, it is better to write label in front of feature here, because if an RDD is created using LabeledPoint, what we get are actually (label, feature) pairs.
Now it is changed to LabeledPoint(label, feature).
The comments in the Scala and Java examples have the same typos.
## How was this patch tested?
tested
https://issues.apache.org/jira/browse/SPARK-24688
Author: Weizhe Huang 492816239qq.com
Closes#21665 from uzmijnlm/my_change.
Authored-by: Huangweizhe <huangweizhe@bbdservice.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
[SPARK-25095](ad45299d04) introduced `ambiguous reference to overloaded definition`
```
[error] /Users/d_tsai/dev/apache-spark/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala:242: ambiguous reference to overloaded definition,
[error] both method addTaskCompletionListener in class TaskContext of type [U](f: org.apache.spark.TaskContext => U)org.apache.spark.TaskContext
[error] and method addTaskCompletionListener in class TaskContext of type (listener: org.apache.spark.util.TaskCompletionListener)org.apache.spark.TaskContext
[error] match argument types (org.apache.spark.TaskContext => Unit)
[error] context.addTaskCompletionListener(_ => server.close())
[error] ^
[error] one error found
[error] Compile failed at Aug 24, 2018 1:56:06 PM [31.582s]
```
which fails the Scala 2.12 branch build.
## How was this patch tested?
Existing tests
Closes#22229 from dbtsai/fix-2.12-build.
Authored-by: DB Tsai <d_tsai@apple.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>