This proposes an alternative way to load secret keys into a Spark application that is running on Kubernetes. Instead of automatically generating the secret, the secret key can reside in a file that is shared between both the driver and executor containers.
Unit tests.
Closes#23252 from mccheah/auth-secret-with-file.
Authored-by: mcheah <mcheah@palantir.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
In `BytesToBytesMap.MapIterator.advanceToNextPage`, We will first lock this `MapIterator` and then `TaskMemoryManager` when going to free a memory page by calling `freePage`. At the same time, it is possibly that another memory consumer first locks `TaskMemoryManager` and then this `MapIterator` when it acquires memory and causes spilling on this `MapIterator`.
So it ends with the `MapIterator` object holds lock to the `MapIterator` object and waits for lock on `TaskMemoryManager`, and the other consumer holds lock to `TaskMemoryManager` and waits for lock on the `MapIterator` object.
To avoid deadlock here, this patch proposes to keep reference to the page to free and free it after releasing the lock of `MapIterator`.
## How was this patch tested?
Added test and manually test by running the test 100 times to make sure there is no deadlock.
Closes#23272 from viirya/SPARK-26265.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
… incorrect.
## What changes were proposed in this pull request?
In the reported heartbeat information, the unit of the memory data is bytes, which is converted by the formatBytes() function in the utils.js file before being displayed in the interface. The cardinality of the unit conversion in the formatBytes function is 1000, which should be 1024.
Change the cardinality of the unit conversion in the formatBytes function to 1024.
## How was this patch tested?
manual tests
Please review http://spark.apache.org/contributing.html before opening a pull request.
Closes#22683 from httfighter/SPARK-25696.
Lead-authored-by: 韩田田00222924 <han.tiantian@zte.com.cn>
Co-authored-by: han.tiantian@zte.com.cn <han.tiantian@zte.com.cn>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
This adds the entire memory used by spark’s executor (as measured by procfs) to the executor metrics. The memory usage is collected from the entire process tree under the executor. The metrics are subdivided into memory used by java, by python, and by other processes, to aid users in diagnosing the source of high memory usage.
The additional metrics are sent to the driver in heartbeats, using the mechanism introduced by SPARK-23429. This also slightly extends that approach to allow one ExecutorMetricType to collect multiple metrics.
Added unit tests and also tested on a live cluster.
Closes#22612 from rezasafi/ptreememory2.
Authored-by: Reza Safi <rezasafi@cloudera.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
## What changes were proposed in this pull request?
Add MAXIMUM_PAGE_SIZE_BYTES Exception test
## How was this patch tested?
Existing tests
(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
Please review http://spark.apache.org/contributing.html before opening a pull request.
Closes#23226 from wangjiaochun/BytesToBytesMapSuite.
Authored-by: 10087686 <wang.jiaochun@zte.com.cn>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
If there are no records in memory, then we don't need to create an empty temp spill file.
## How was this patch tested?
Existing tests
(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
Please review http://spark.apache.org/contributing.html before opening a pull request.
Closes#23225 from wangjiaochun/ShufflSorter.
Authored-by: 10087686 <wang.jiaochun@zte.com.cn>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
## What changes were proposed in this pull request?
spark.kafka.sasl.kerberos.service.name is an optional parameter but most of the time value `kafka` has to be set. As I've written in the jira the following reasoning is behind:
* Kafka's configuration guide suggest the same value: https://kafka.apache.org/documentation/#security_sasl_kerberos_brokerconfig
* It would be easier for spark users by providing less configuration
* Other streaming engines are doing the same
In this PR I've changed the parameter from optional to `WithDefault` and set `kafka` as default value.
## How was this patch tested?
Available unit tests + on cluster.
Closes#23254 from gaborgsomogyi/SPARK-26304.
Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
Adds a new method to SparkAppHandle called getError which returns
the exception (if present) that caused the underlying Spark app to
fail.
New tests added to SparkLauncherSuite for the new method.
Closes#21849Closes#23221 from vanzin/SPARK-24243.
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
`enablePerfMetrics `was originally designed in `BytesToBytesMap `to control `getNumHashCollisions getTimeSpentResizingNs getAverageProbesPerLookup`.
However, as the Spark version gradual progress. this parameter is only used for `getAverageProbesPerLookup ` and always given to true when using `BytesToBytesMap`.
it is also dangerous to determine whether `getAverageProbesPerLookup `opens and throws an `IllegalStateException `exception.
So this pr will be remove `enablePerfMetrics `parameter from `BytesToBytesMap`. thanks.
## How was this patch tested?
the existed test cases.
Closes#23244 from heary-cao/enablePerfMetrics.
Authored-by: caoxuewen <cao.xuewen@zte.com.cn>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This change modifies the logic in the SecurityManager to do two
things:
- generate unique app secrets also when k8s is being used
- only store the secret in the user's UGI on YARN
The latter is needed so that k8s won't unnecessarily create
k8s secrets for the UGI credentials when only the auth token
is stored there.
On the k8s side, the secret is propagated to executors using
an environment variable instead. This ensures it works in both
client and cluster mode.
Security doc was updated to mention the feature and clarify that
proper access control in k8s should be enabled for it to be secure.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#23174 from vanzin/SPARK-26194.
## What changes were proposed in this pull request?
Currently, the common `withTempDir` function is used in Spark SQL test cases. To handle `val dir = Utils. createTempDir()` and `Utils. deleteRecursively (dir)`. Unfortunately, the `withTempDir` function cannot be used in the Spark Core test case. This PR Sharing `withTempDir` function in Spark Sql and SparkCore to clean up SparkCore test cases. thanks.
## How was this patch tested?
N / A
Closes#23151 from heary-cao/withCreateTempDir.
Authored-by: caoxuewen <cao.xuewen@zte.com.cn>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
It adds kafka delegation token support for structured streaming. Please see the relevant [SPIP](https://docs.google.com/document/d/1ouRayzaJf_N5VQtGhVq9FURXVmRpXzEEWYHob0ne3NY/edit?usp=sharing)
What this PR contains:
* Configuration parameters for the feature
* Delegation token fetching from broker
* Usage of token through dynamic JAAS configuration
* Minor refactoring in the existing code
What this PR doesn't contain:
* Documentation changes because design can change
## How was this patch tested?
Existing tests + added small amount of additional unit tests.
Because it's an external service integration mainly tested on cluster.
* 4 node cluster
* Kafka broker version 1.1.0
* Topic with 4 partitions
* security.protocol = SASL_SSL
* sasl.mechanism = SCRAM-SHA-256
An example of obtaining a token:
```
18/10/01 01:07:49 INFO kafka010.TokenUtil: TOKENID HMAC OWNER RENEWERS ISSUEDATE EXPIRYDATE MAXDATE
18/10/01 01:07:49 INFO kafka010.TokenUtil: D1-v__Q5T_uHx55rW16Jwg [hidden] User:user [] 2018-10-01T01:07 2018-10-02T01:07 2018-10-08T01:07
18/10/01 01:07:49 INFO security.KafkaDelegationTokenProvider: Get token from Kafka: Kind: KAFKA_DELEGATION_TOKEN, Service: kafka.server.delegation.token, Ident: 44 31 2d 76 5f 5f 51 35 54 5f 75 48 78 35 35 72 57 31 36 4a 77 67
```
An example token usage:
```
18/10/01 01:08:07 INFO kafka010.KafkaSecurityHelper: Scram JAAS params: org.apache.kafka.common.security.scram.ScramLoginModule required tokenauth=true serviceName="kafka" username="D1-v__Q5T_uHx55rW16Jwg" password="[hidden]";
18/10/01 01:08:07 INFO kafka010.KafkaSourceProvider: Delegation token detected, using it for login.
```
Closes#22598 from gaborgsomogyi/SPARK-25501.
Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
In `BlockManager`, `getRemoteValues` gets a `ChunkedByteBuffer` (by calling `getRemoteBytes`) and creates an `InputStream` from it. `getRemoteBytes`, in turn, gets a `ManagedBuffer` and converts it to a `ChunkedByteBuffer`.
Instead, expose a `getRemoteManagedBuffer` method so `getRemoteValues` can just get this `ManagedBuffer` and use its `InputStream`.
When reading a remote cache block from disk, this reduces heap memory usage significantly.
Retain `getRemoteBytes` for other callers.
## How was this patch tested?
Imran Rashid wrote an application (https://github.com/squito/spark_2gb_test/blob/master/src/main/scala/com/cloudera/sparktest/LargeBlocks.scala), that among other things, tests reading remote cache blocks. I ran this application, using 2500MB blocks, to test reading a cache block on disk. Without this change, with `--executor-memory 5g`, the test fails with `java.lang.OutOfMemoryError: Java heap space`. With the change, the test passes with `--executor-memory 2g`.
I also ran the unit tests in core. In particular, `DistributedSuite` has a set of tests that exercise the `getRemoteValues` code path. `BlockManagerSuite` has several tests that call `getRemoteBytes`; I left these unchanged, so `getRemoteBytes` still gets exercised.
Closes#23058 from wypoon/SPARK-25905.
Authored-by: Wing Yew Poon <wypoon@cloudera.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
… of hard coded "/" in DependencyUtils
## What changes were proposed in this pull request?
Use Java system property "file.separator" instead of hard coded "/" in DependencyUtils.
## How was this patch tested?
Manual test:
Submit Spark application via REST API that reads data from Elasticsearch using spark-elasticsearch library.
Without fix application fails with error:
18/11/22 10:36:20 ERROR Version: Multiple ES-Hadoop versions detected in the classpath; please use only one
jar:file:/C:/<...>/spark-2.4.0-bin-hadoop2.6/work/driver-20181122103610-0001/myApp-assembly-1.0.jar
jar:file:/C:/<...>/myApp-assembly-1.0.jar
18/11/22 10:36:20 ERROR Main: Application [MyApp] failed:
java.lang.Error: Multiple ES-Hadoop versions detected in the classpath; please use only one
jar:file:/C:/<...>/spark-2.4.0-bin-hadoop2.6/work/driver-20181122103610-0001/myApp-assembly-1.0.jar
jar:file:/C:/<...>/myApp-assembly-1.0.jar
at org.elasticsearch.hadoop.util.Version.<clinit>(Version.java:73)
at org.elasticsearch.hadoop.rest.RestService.findPartitions(RestService.java:214)
at org.elasticsearch.spark.rdd.AbstractEsRDD.esPartitions$lzycompute(AbstractEsRDD.scala:73)
at org.elasticsearch.spark.rdd.AbstractEsRDD.esPartitions(AbstractEsRDD.scala:72)
at org.elasticsearch.spark.rdd.AbstractEsRDD.getPartitions(AbstractEsRDD.scala:44)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
...
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:65)
at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
With fix application runs successfully.
Closes#23102 from markpavey/JIRA_SPARK-26137_DependencyUtilsFileSeparatorFix.
Authored-by: Mark Pavey <markpavey@exabre.co.uk>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
This pull request fixes [SPARK-26114](https://issues.apache.org/jira/browse/SPARK-26114) issue that occurs when trying to reduce the number of partitions by means of coalesce without shuffling after shuffle-based transformations.
The leak occurs because of not cleaning up `ExternalSorter`'s `readingIterator` field as it's done for its `map` and `buffer` fields.
Additionally there are changes to the `CompletionIterator` to prevent capturing its `sub`-iterator and holding it even after the completion iterator completes. It is necessary because in some cases, e.g. in case of standard scala's `flatMap` iterator (which is used is `CoalescedRDD`'s `compute` method) the next value of the main iterator is assigned to `flatMap`'s `cur` field only after it is available.
For DAGs where ShuffledRDD is a parent of CoalescedRDD it means that the data should be fetched from the map-side of the shuffle, but the process of fetching this data consumes quite a lot of memory in addition to the memory already consumed by the iterator held by `flatMap`'s `cur` field (until it is reassigned).
For the following data
```scala
import org.apache.hadoop.io._
import org.apache.hadoop.io.compress._
import org.apache.commons.lang._
import org.apache.spark._
// generate 100M records of sample data
sc.makeRDD(1 to 1000, 1000)
.flatMap(item => (1 to 100000)
.map(i => new Text(RandomStringUtils.randomAlphanumeric(3).toLowerCase) -> new Text(RandomStringUtils.randomAlphanumeric(1024))))
.saveAsSequenceFile("/tmp/random-strings", Some(classOf[GzipCodec]))
```
and the following job
```scala
import org.apache.hadoop.io._
import org.apache.spark._
import org.apache.spark.storage._
val rdd = sc.sequenceFile("/tmp/random-strings", classOf[Text], classOf[Text])
rdd
.map(item => item._1.toString -> item._2.toString)
.repartitionAndSortWithinPartitions(new HashPartitioner(1000))
.coalesce(10,false)
.count
```
... executed like the following
```bash
spark-shell \
--num-executors=5 \
--executor-cores=2 \
--master=yarn \
--deploy-mode=client \
--conf spark.executor.memoryOverhead=512 \
--conf spark.executor.memory=1g \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.executor.extraJavaOptions='-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp -Dio.netty.noUnsafe=true'
```
... executors are always failing with OutOfMemoryErrors.
The main issue is multiple leaks of ExternalSorter references.
For example, in case of 2 tasks per executor it is expected to be 2 simultaneous instances of ExternalSorter per executor but heap dump generated on OutOfMemoryError shows that there are more ones.
![run1-noparams-dominator-tree-externalsorter](https://user-images.githubusercontent.com/1523889/48703665-782ce580-ec05-11e8-95a9-d6c94e8285ab.png)
P.S. This PR does not cover cases with CoGroupedRDDs which use ExternalAppendOnlyMap internally, which itself can lead to OutOfMemoryErrors in many places.
## How was this patch tested?
- Existing unit tests
- New unit tests
- Job executions on the live environment
Here is the screenshot before applying this patch
![run3-noparams-failure-ui-5x2-repartition-and-sort](https://user-images.githubusercontent.com/1523889/48700395-f769eb80-ebfc-11e8-831b-e94c757d416c.png)
Here is the screenshot after applying this patch
![run3-noparams-success-ui-5x2-repartition-and-sort](https://user-images.githubusercontent.com/1523889/48700610-7a8b4180-ebfd-11e8-9761-baaf38a58e66.png)
And in case of reducing the number of executors even more the job is still stable
![run3-noparams-success-ui-2x2-repartition-and-sort](https://user-images.githubusercontent.com/1523889/48700619-82e37c80-ebfd-11e8-98ed-a38e1f1f1fd9.png)
Closes#23083 from szhem/SPARK-26114-externalsorter-leak.
Authored-by: Sergey Zhemzhitsky <szhemzhitski@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
This is the write side counterpart to https://github.com/apache/spark/pull/23105
## How was this patch tested?
No behavior change expected, as it is a straightforward refactoring. Updated all existing test cases.
Closes#23106 from rxin/SPARK-26141.
Authored-by: Reynold Xin <rxin@databricks.com>
Signed-off-by: Reynold Xin <rxin@databricks.com>
Total tasks in the aggregated table and the tasks table are not matching some times in the WEBUI.
We need to force update the executor summary of the particular executorId, when ever last task of that executor has reached. Currently it force update based on last task on the stage end. So, for some particular executorId task might miss at the stage end.
Tests to reproduce:
```
bin/spark-shell --master yarn --conf spark.executor.instances=3
sc.parallelize(1 to 10000, 10).map{ x => throw new RuntimeException("Bad executor")}.collect()
```
Before patch:
![screenshot from 2018-11-15 02-24-05](https://user-images.githubusercontent.com/23054875/48511776-b0d36480-e87d-11e8-89a8-ab97216e2c21.png)
After patch:
![screenshot from 2018-11-15 02-32-38](https://user-images.githubusercontent.com/23054875/48512141-c39a6900-e87e-11e8-8535-903e1d11d13e.png)
Closes#23038 from shahidki31/SPARK-25451.
Authored-by: Shahid <shahidki31@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
Support column sort, pagination and search for Stage Page using jQuery DataTable and REST API. Before this commit, the Stage page generated a hard-coded HTML table that could not support search. Supporting search and sort (over all applications rather than the 20 entries in the current page) in any case will greatly improve the user experience.
Created the stagespage-template.html for displaying application information in datables. Added REST api endpoint and javascript code to fetch data from the endpoint and display it on the data table.
Because of the above change, certain functionalities in the page had to be modified to support the addition of datatables. For example, the toggle checkbox 'Select All' previously would add the checked fields as columns in the Task table and as rows in the Summary Metrics table, but after the change, only columns are added in the Task Table as it got tricky to add rows dynamically in the datatables.
## How was this patch tested?
I have attached the screenshots of the Stage Page UI before and after the fix.
**Before:**
<img width="1419" alt="30564304-35991e1c-9c8a-11e7-850f-2ac7a347f600" src="https://user-images.githubusercontent.com/22228190/42137915-52054558-7d3a-11e8-8c85-433b2c94161d.png">
<img width="1435" alt="31360592-cbaa2bae-ad14-11e7-941d-95b4c7d14970" src="https://user-images.githubusercontent.com/22228190/42137928-79df500a-7d3a-11e8-9068-5630afe46ff3.png">
**After:**
<img width="1432" alt="31360591-c5650ee4-ad14-11e7-9665-5a08d8f21830" src="https://user-images.githubusercontent.com/22228190/42137936-a3fb9f42-7d3a-11e8-8502-22b3897cbf64.png">
<img width="1388" alt="31360604-d266b6b0-ad14-11e7-94b5-dcc4bb5443f4" src="https://user-images.githubusercontent.com/22228190/42137970-0fabc58c-7d3b-11e8-95ad-383b1bd1f106.png">
Closes#21688 from pgandhi999/SPARK-21809-2.3.
Authored-by: pgandhi <pgandhi@oath.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
## What changes were proposed in this pull request?
`deserialize` for kryo, the type of input parameter is ByteBuffer, if it is not backed by an accessible byte array. it will throw `UnsupportedOperationException`
Exception Info:
```
java.lang.UnsupportedOperationException was thrown.
java.lang.UnsupportedOperationException
at java.nio.ByteBuffer.array(ByteBuffer.java:994)
at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:362)
```
## How was this patch tested?
Added a unit test
Closes#22779 from 10110346/InputStreamKryo.
Authored-by: liuxian <liu.xian3@zte.com.cn>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
This patch defines an internal Spark interface for reporting shuffle metrics and uses that in shuffle reader. Before this patch, shuffle metrics is tied to a specific implementation (using a thread local temporary data structure and accumulators). After this patch, callers that define their own shuffle RDDs can create a custom metrics implementation.
With this patch, we would be able to create a better metrics for the SQL layer, e.g. reporting shuffle metrics in the SQL UI, for each exchange operator.
Note that I'm separating read side and write side implementations, as they are very different, to simplify code review. Write side change is at https://github.com/apache/spark/pull/23106
## How was this patch tested?
No behavior change expected, as it is a straightforward refactoring. Updated all existing test cases.
Closes#23105 from rxin/SPARK-26140.
Authored-by: Reynold Xin <rxin@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
## What changes were proposed in this pull request?
the pr #20014 which introduced `SparkOutOfMemoryError` to avoid killing the entire executor when an `OutOfMemoryError `is thrown.
so apply for memory using `MemoryConsumer. allocatePage `when catch exception, use `SparkOutOfMemoryError `instead of `OutOfMemoryError`
## How was this patch tested?
N / A
Closes#23084 from heary-cao/SparkOutOfMemoryError.
Authored-by: caoxuewen <cao.xuewen@zte.com.cn>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
This PR is a follow-up PR of #21600 to fix the unnecessary UI redirect.
## How was this patch tested?
Local verification
Closes#23116 from jerryshao/SPARK-24553.
Authored-by: jerryshao <jerryshao@tencent.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
## What changes were proposed in this pull request?
In the PR, I propose:
- new SQL config `spark.sql.debug.maxToStringFields` to control maximum number fields up to which `truncatedString` cuts its input sequences.
- Moving `truncatedString` out of `core` to `sql/catalyst` because it is used only in the `sql/catalyst` packages for restricting number of fields converted to strings from `TreeNode` and expressions of`StructType`.
## How was this patch tested?
Added a test to `QueryExecutionSuite` to check that `spark.sql.debug.maxToStringFields` impacts to behavior of `truncatedString`.
Closes#23039 from MaxGekk/truncated-string-catalyst.
Lead-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Co-authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
## What changes were proposed in this pull request?
The build has a lot of deprecation warnings. Some are new in Scala 2.12 and Java 11. We've fixed some, but I wanted to take a pass at fixing lots of easy miscellaneous ones here.
They're too numerous and small to list here; see the pull request. Some highlights:
- `BeanInfo` is deprecated in 2.12, and BeanInfo classes are pretty ancient in Java. Instead, case classes can explicitly declare getters
- Eta expansion of zero-arg methods; foo() becomes () => foo() in many cases
- Floating-point Range is inexact and deprecated, like 0.0 to 100.0 by 1.0
- finalize() is finally deprecated (just needs to be suppressed)
- StageInfo.attempId was deprecated and easiest to remove here
I'm not now going to touch some chunks of deprecation warnings:
- Parquet deprecations
- Hive deprecations (particularly serde2 classes)
- Deprecations in generated code (mostly Thriftserver CLI)
- ProcessingTime deprecations (we may need to revive this class as internal)
- many MLlib deprecations because they concern methods that may be removed anyway
- a few Kinesis deprecations I couldn't figure out
- Mesos get/setRole, which I don't know well
- Kafka/ZK deprecations (e.g. poll())
- Kinesis
- a few other ones that will probably resolve by deleting a deprecated method
## How was this patch tested?
Existing tests, including manual testing with the 2.11 build and Java 11.
Closes#23065 from srowen/SPARK-26090.
Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
Empty chunk in ChunkedByteBuffer will truncate the ChunkedByteBufferInputStream.
The detail reason is described in: https://issues.apache.org/jira/browse/SPARK-26068
## How was this patch tested?
Modified current UT to cover this case.
Closes#23040 from LinhongLiu/fix-empty-chunked-byte-buffer.
Lead-authored-by: Liu,Linhong <liulinhong@baidu.com>
Co-authored-by: Xianjin YE <yexianjin@baidu.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
Don't propagate SPARK_CONF_DIR to the driver in mesos cluster mode.
## How was this patch tested?
I built the 2.3.2 tag with this patch added and deployed a test job to a mesos cluster to confirm that the incorrect SPARK_CONF_DIR was no longer passed from the submit command.
Closes#22937 from mpmolek/fix-conf-dir.
Authored-by: Matt Molek <mpmolek@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
Add scala and java lint check rules to ban the usage of `throw new xxxErrors` and fix up all exists instance followed by https://github.com/apache/spark/pull/22989#issuecomment-437939830. See more details in https://github.com/apache/spark/pull/22969.
## How was this patch tested?
Local test with lint-scala and lint-java.
Closes#22989 from xuanyuanking/SPARK-25986.
Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
…. Other related changes to get JDK 11 working, to test
## What changes were proposed in this pull request?
- Access `sun.misc.Cleaner` (Java 8) and `jdk.internal.ref.Cleaner` (JDK 9+) by reflection (note: the latter only works if illegal reflective access is allowed)
- Access `sun.misc.Unsafe.invokeCleaner` in Java 9+ instead of `sun.misc.Cleaner` (Java 8)
In order to test anything on JDK 11, I also fixed a few small things, which I include here:
- Fix minor JDK 11 compile issues
- Update scala plugin, Jetty for JDK 11, to facilitate tests too
This doesn't mean JDK 11 tests all pass now, but lots do. Note also that the JDK 9+ solution for the Cleaner has a big caveat.
## How was this patch tested?
Existing tests. Manually tested JDK 11 build and tests, and tests covering this change appear to pass. All Java 8 tests should still pass, but this change alone does not achieve full JDK 11 compatibility.
Closes#22993 from srowen/SPARK-24421.
Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
Currently, we do not have a mechanism to collect driver logs if a user chooses
to run their application in client mode. This is a big issue as admin teams need
to create their own mechanisms to capture driver logs.
This commit adds a logger which, if enabled, adds a local log appender to the
root logger and asynchronously syncs it an application specific log file on hdfs
(Spark Driver Log Dir).
Additionally, this collects spark-shell driver logs at INFO level by default.
The change is that instead of setting root logger level to WARN, we will set the
consoleAppender threshold to WARN, in case of spark-shell. This ensures that
only WARN logs are printed on CONSOLE but other log appenders still capture INFO
(or the default log level logs).
1. Verified that logs are written to local and remote dir
2. Added a unit test case
3. Verified this for spark-shell, client mode and pyspark.
4. Verified in both non-kerberos and kerberos environment
5. Verified with following unexpected termination conditions: Ctrl + C, Driver
OOM, Large Log Files
6. Ran an application in spark-shell and ensured that driver logs were
captured at INFO level
7. Started the application at WARN level, programmatically changed the level to
INFO and ensured that logs on console were printed at INFO level
Closes#22504 from ankuriitg/ankurgupta/SPARK-25118.
Authored-by: ankurgupta <ankur.gupta@cloudera.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
* Implement (optional) use of KryoPool in KryoSerializer, an alternative to the existing implementation of caching a Kryo instance inside KryoSerializerInstance
* Add config key & documentation of spark.kryo.pool in order to turn this on
* Add benchmark KryoSerializerBenchmark to compare new and old implementation
* Add results of benchmark
## How was this patch tested?
Added new tests inside KryoSerializerSuite to test the pool implementation as well as added the pool option to the existing regression testing for SPARK-7766
This is my original work and I license the work to the project under the project’s open source license.
Closes#22855 from patrickbrownsync/kryo-pool.
Authored-by: Patrick Brown <patrick.brown@blyncsy.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
Deprecated in Java 11, replace Class.newInstance with Class.getConstructor.getInstance, and primtive wrapper class constructors with valueOf or equivalent
## How was this patch tested?
Existing tests.
Closes#22988 from srowen/SPARK-25984.
Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
Currently, Spark writes Spark version number into Hive Table properties with `spark.sql.create.version`.
```
parameters:{
spark.sql.sources.schema.part.0={
"type":"struct",
"fields":[{"name":"a","type":"integer","nullable":true,"metadata":{}}]
},
transient_lastDdlTime=1541142761,
spark.sql.sources.schema.numParts=1,
spark.sql.create.version=2.4.0
}
```
This PR aims to write Spark versions to ORC/Parquet file metadata with `org.apache.spark.sql.create.version` because we used `org.apache.` prefix in Parquet metadata already. It's different from Hive Table property key `spark.sql.create.version`, but it seems that we cannot change Hive Table property for backward compatibility.
After this PR, ORC and Parquet file generated by Spark will have the following metadata.
**ORC (`native` and `hive` implmentation)**
```
$ orc-tools meta /tmp/o
File Version: 0.12 with ...
...
User Metadata:
org.apache.spark.sql.create.version=3.0.0
```
**PARQUET**
```
$ parquet-tools meta /tmp/p
...
creator: parquet-mr version 1.10.0 (build 031a6654009e3b82020012a18434c582bd74c73a)
extra: org.apache.spark.sql.create.version = 3.0.0
extra: org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{}}]}
```
## How was this patch tested?
Pass the Jenkins with newly added test cases.
This closes#22255.
Closes#22932 from dongjoon-hyun/SPARK-25102.
Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
## What changes were proposed in this pull request?
Every time a task is unschedulable because of the condition where no. of task failures < no. of executors available, we currently abort the taskSet - failing the job. This change tries to acquire new executors so that we can complete the job successfully. We try to acquire a new executor only when we can kill an existing idle executor. We fallback to the older implementation where we abort the job if we cannot find an idle executor.
## How was this patch tested?
I performed some manual tests to check and validate the behavior.
```scala
val rdd = sc.parallelize(Seq(1 to 10), 3)
import org.apache.spark.TaskContext
val mapped = rdd.mapPartitionsWithIndex ( (index, iterator) => { if (index == 2) { Thread.sleep(30 * 1000); val attemptNum = TaskContext.get.attemptNumber; if (attemptNum < 3) throw new Exception("Fail for blacklisting")}; iterator.toList.map (x => x + " -> " + index).iterator } )
mapped.collect
```
Closes#22288 from dhruve/bug/SPARK-22148.
Lead-authored-by: Dhruve Ashar <dhruveashar@gmail.com>
Co-authored-by: Dhruve Ashar <dhruve@users.noreply.github.com>
Co-authored-by: Tom Graves <tgraves@apache.org>
Signed-off-by: Thomas Graves <tgraves@apache.org>
## What changes were proposed in this pull request?
Currently definitions of config entries in `core` module are in several files separately. We should move them into `internal/config` to be easy to manage.
## How was this patch tested?
Existing tests.
Closes#22928 from ueshin/issues/SPARK-25926/single_config_file.
Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
Fix typos and misspellings, per https://github.com/apache/spark-website/pull/158#issuecomment-435790366
## How was this patch tested?
Existing tests.
Closes#22950 from srowen/Typos.
Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
I saw CoarseGrainedSchedulerBackendSuite failed in my PR and finally reproduced the following error on a very busy machine:
```
sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to eventually never returned normally. Attempted 400 times over 10.009828643999999 seconds. Last failure message: ArrayBuffer("2", "0", "3") had length 3 instead of expected length 4.
```
The logs in this test shows executor 1 was not up when the test failed.
```
18/10/30 11:34:03.563 dispatcher-event-loop-12 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (172.17.0.2:43656) with ID 2
18/10/30 11:34:03.593 dispatcher-event-loop-3 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (172.17.0.2:43658) with ID 3
18/10/30 11:34:03.629 dispatcher-event-loop-6 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (172.17.0.2:43654) with ID 0
18/10/30 11:34:03.885 pool-1-thread-1-ScalaTest-running-CoarseGrainedSchedulerBackendSuite INFO CoarseGrainedSchedulerBackendSuite:
===== FINISHED o.a.s.scheduler.CoarseGrainedSchedulerBackendSuite: 'compute max number of concurrent tasks can be launched' =====
```
And the following logs in executor 1 shows it was still doing the initialization when the timeout happened (at 18/10/30 11:34:03.885).
```
18/10/30 11:34:03.463 netty-rpc-connection-0 INFO TransportClientFactory: Successfully created connection to 54b6b6217301/172.17.0.2:33741 after 37 ms (0 ms spent in bootstraps)
18/10/30 11:34:03.959 main INFO DiskBlockManager: Created local directory at /home/jenkins/workspace/core/target/tmp/spark-383518bc-53bd-4d9c-885b-d881f03875bf/executor-61c406e4-178f-40a6-ac2c-7314ee6fb142/blockmgr-03fb84a1-eedc-4055-8743-682eb3ac5c67
18/10/30 11:34:03.993 main INFO MemoryStore: MemoryStore started with capacity 546.3 MB
```
Hence, I think our current 10 seconds is not enough on a slow Jenkins machine. This PR just increases the timeout from 10 seconds to 60 seconds to make the test more stable.
## How was this patch tested?
Jenkins
Closes#22910 from zsxwing/fix-flaky-test.
Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
This avoids having two classes to deal with tokens; now the above
class is a one-stop shop for dealing with delegation tokens. The
YARN backend extends that class instead of doing composition like
before, resulting in a bit less code there too.
The renewer functionality is basically the same code that used to
be in YARN's AMCredentialRenewer. That is also the reason why the
public API of HadoopDelegationTokenManager is a little bit odd;
the YARN AM has some odd requirements for how this all should be
initialized, and the weirdness is needed currently to support that.
Tested:
- YARN with stress app for DT renewal
- Mesos and K8S with basic kerberos tests (both tgt and keytab)
Closes#22624 from vanzin/SPARK-23781.
Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
## What changes were proposed in this pull request?
When a job finishes, there may be some zombie tasks still running due to stage retry. Since a result stage will never be used by other jobs, running these tasks are just wasting the cluster resource. This PR just asks TaskScheduler to cancel the running tasks of a result stage when it's already finished. Credits go to srinathshankar who suggested this idea to me.
This PR also fixes two minor issues while I'm touching DAGScheduler:
- Invalid spark.job.interruptOnCancel should not crash DAGScheduler.
- Non fatal errors should not crash DAGScheduler.
## How was this patch tested?
The new unit tests.
Closes#22771 from zsxwing/SPARK-25773.
Lead-authored-by: Shixiong Zhu <zsxwing@gmail.com>
Co-authored-by: Shixiong Zhu <shixiong@databricks.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
## What changes were proposed in this pull request?
Set main args correctly in BenchmarkBase, to make it accessible for its subclass.
It will benefit:
- BuiltInDataSourceWriteBenchmark
- AvroWriteBenchmark
## How was this patch tested?
manual tests
Closes#22872 from yucai/main_args.
Authored-by: yucai <yyu1@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
`hsync` has been added as part of SPARK-19531 to get the latest data in the history sever ui, but that is causing the performance overhead and also leading to drop many history log events. `hsync` uses the force `FileChannel.force` to sync the data to the disk and happens for the data pipeline, it is costly operation and making the application to face overhead and drop the events.
I think getting the latest data in history server can be done in different way (no impact to application while writing events), there is an api `DFSInputStream.getFileLength()` which gives the file length including the `lastBlockBeingWrittenLength`(different from `FileStatus.getLen()`), this api can be used when the file status length and previously cached length are equal to verify whether any new data has been written or not, if there is any update in data length then the history server can update the in progress history log. And also I made this change as configurable with the default value false, and can be enabled for history server if users want to see the updated data in ui.
## How was this patch tested?
Added new test and verified manually, with the added conf `spark.history.fs.inProgressAbsoluteLengthCheck.enabled=true`, history server is reading the logs including the last block data which is being written and updating the Web UI with the latest data.
Closes#22752 from devaraj-kavali/SPARK-24787.
Authored-by: Devaraj K <devaraj@apache.org>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
In standalone cluster mode, one could launch driver with supervise mode
enabled. StandaloneRestServer class uses the host and port of current
master as the spark.master property while launching the driver
(even if you are running in HA mode). This class also ignores the
spark.master property passed as part of the request.
Due to the above problem, if the Spark masters switch due to some reason
and your driver is killed unexpectedly and relaunched, it will try to
connect to the master which is in the driver command specified as
-Dspark.master. But this master will be in STANDBY mode and after trying
multiple times, the SparkContext will kill itself (even though secondary
master was alive and healthy).
This change picks the spark.master property from request and uses it to
launch the driver process. Due to this, the driver process has both
masters in -Dspark.master property. Even if the masters switch, SparkContext
can still connect to the ALIVE master and work correctly.
## How was this patch tested?
This patch was manually tested on a standalone cluster running 2.2.1. It was rebased on current master and all tests were executed. I have added a unit test for this change (but since I am new I hope I have covered all).
Closes#21816 from bsikander/rest_driver_fix.
Authored-by: Behroz Sikander <behroz.sikander@sap.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
Before the code changes, I tried to run it with 8G memory:
```
build/sbt -mem 8000 "core/testOnly org.apache.spark.serializer.KryoBenchmark"
```
Still I got got OOM.
This is because the lengths of the arrays are random
669ade3a8e/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala (L90-L91)
And the 2D array is usually large: `10000 * Random.nextInt(0, 10000)`
This PR is to fix it and refactor it to use main method.
The benchmark result is also reason compared to the original one.
## How was this patch tested?
Run with
```
bin/spark-submit --class org.apache.spark.serializer.KryoBenchmark core/target/scala-2.11/spark-core_2.11-3.0.0-SNAPSHOT-tests.jar
```
and
```
SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/test:runMain org.apache.spark.serializer.KryoBenchmark"
Closes#22663 from gengliangwang/kyroBenchmark.
Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
Remove JavaSparkContextVarargsWorkaround
## How was this patch tested?
Existing tests.
Closes#22729 from srowen/SPARK-25737.
Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
Remove deprecated accumulator v1
## How was this patch tested?
Existing tests.
Closes#22730 from srowen/SPARK-16775.
Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>