[SPARK-14685][CORE] Document heritability of localProperties
## What changes were proposed in this pull request? This updates the java-/scala- doc for setLocalProperty to document heritability of localProperties. This also adds tests for that behaviour. ## How was this patch tested? Tests pass. New tests were added. Author: Marcin Tustin <marcin.tustin@gmail.com> Closes #12455 from marcintustin/SPARK-14685.
This commit is contained in:
parent
4e3685ae5e
commit
8028f3a0b4
|
@ -608,6 +608,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
|
|||
* scheduler pool. User-defined properties may also be set here. These properties are propagated
|
||||
* through to worker tasks and can be accessed there via
|
||||
* [[org.apache.spark.TaskContext#getLocalProperty]].
|
||||
*
|
||||
* These properties are inherited by child threads spawned from this thread. This
|
||||
* may have unexpected consequences when working with thread pools. The standard java
|
||||
* implementation of thread pools have worker threads spawn other worker threads.
|
||||
* As a result, local properties may propagate unpredictably.
|
||||
*/
|
||||
def setLocalProperty(key: String, value: String) {
|
||||
if (value == null) {
|
||||
|
|
|
@ -712,8 +712,13 @@ class JavaSparkContext(val sc: SparkContext)
|
|||
}
|
||||
|
||||
/**
|
||||
* Set a local property that affects jobs submitted from this thread, such as the
|
||||
* Spark fair scheduler pool.
|
||||
* Set a local property that affects jobs submitted from this thread, and all child
|
||||
* threads, such as the Spark fair scheduler pool.
|
||||
*
|
||||
* These properties are inherited by child threads spawned from this thread. This
|
||||
* may have unexpected consequences when working with thread pools. The standard java
|
||||
* implementation of thread pools have worker threads spawn other worker threads.
|
||||
* As a result, local properties may propagate unpredictably.
|
||||
*/
|
||||
def setLocalProperty(key: String, value: String): Unit = sc.setLocalProperty(key, value)
|
||||
|
||||
|
|
|
@ -323,4 +323,32 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
|
|||
assert(sc.getConf.getInt("spark.executor.instances", 0) === 6)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
test("localProperties are inherited by spawned threads.") {
|
||||
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
|
||||
sc.setLocalProperty("testProperty", "testValue")
|
||||
var result = "unset";
|
||||
val thread = new Thread() { override def run() = {result = sc.getLocalProperty("testProperty")}}
|
||||
thread.start()
|
||||
thread.join()
|
||||
sc.stop()
|
||||
assert(result == "testValue")
|
||||
}
|
||||
|
||||
test("localProperties do not cross-talk between threads.") {
|
||||
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
|
||||
var result = "unset";
|
||||
val thread1 = new Thread() {
|
||||
override def run() = {sc.setLocalProperty("testProperty", "testValue")}}
|
||||
// testProperty should be unset and thus return null
|
||||
val thread2 = new Thread() {
|
||||
override def run() = {result = sc.getLocalProperty("testProperty")}}
|
||||
thread1.start()
|
||||
thread1.join()
|
||||
thread2.start()
|
||||
thread2.join()
|
||||
sc.stop()
|
||||
assert(result == null)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue