[SPARK-6703][Core] Provide a way to discover existing SparkContext's

I've added a static getOrCreate method to the static SparkContext object that allows one to either retrieve a previously created SparkContext or to instantiate a new one with the provided config. The method accepts an optional SparkConf to make usage intuitive.

Still working on a test for this; the basic idea is to create a new context from scratch, then ensure that subsequent calls don't overwrite it.

Author: Ilya Ganelin <ilya.ganelin@capitalone.com>

Closes #5501 from ilganeli/SPARK-6703 and squashes the following commits:

db9a963 [Ilya Ganelin] Closing second spark context
1dc0444 [Ilya Ganelin] Added ref equality check
8c884fa [Ilya Ganelin] Made getOrCreate synchronized
cb0c6b7 [Ilya Ganelin] Doc updates and code cleanup
270cfe3 [Ilya Ganelin] [SPARK-6703] Documentation fixes
15e8dea [Ilya Ganelin] Updated comments and added MiMa Exclude
0e1567c [Ilya Ganelin] Got rid of unnecessary option for AtomicReference
dfec4da [Ilya Ganelin] Changed activeContext to AtomicReference
733ec9f [Ilya Ganelin] Fixed some bugs in test code
8be2f83 [Ilya Ganelin] Replaced match with if
e92caf7 [Ilya Ganelin] [SPARK-6703] Added test to ensure that getOrCreate both allows creation, retrieval, and a second context if desired
a99032f [Ilya Ganelin] Spacing fix
d7a06b8 [Ilya Ganelin] Updated SparkConf class to add getOrCreate method. Started test suite implementation
This commit is contained in:
Ilya Ganelin 2015-04-17 18:28:42 -07:00 committed by Andrew Or
parent a452c59210
commit c5ed510135
3 changed files with 66 additions and 7 deletions

View file

@ -23,7 +23,7 @@ import java.io._
import java.lang.reflect.Constructor
import java.net.URI
import java.util.{Arrays, Properties, UUID}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean, AtomicInteger}
import java.util.UUID.randomUUID
import scala.collection.{Map, Set}
@ -1887,11 +1887,12 @@ object SparkContext extends Logging {
private val SPARK_CONTEXT_CONSTRUCTOR_LOCK = new Object()
/**
* The active, fully-constructed SparkContext. If no SparkContext is active, then this is `None`.
* The active, fully-constructed SparkContext. If no SparkContext is active, then this is `null`.
*
* Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK
* Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK.
*/
private var activeContext: Option[SparkContext] = None
private val activeContext: AtomicReference[SparkContext] =
new AtomicReference[SparkContext](null)
/**
* Points to a partially-constructed SparkContext if some thread is in the SparkContext
@ -1926,7 +1927,8 @@ object SparkContext extends Logging {
logWarning(warnMsg)
}
activeContext.foreach { ctx =>
if (activeContext.get() != null) {
val ctx = activeContext.get()
val errMsg = "Only one SparkContext may be running in this JVM (see SPARK-2243)." +
" To ignore this error, set spark.driver.allowMultipleContexts = true. " +
s"The currently running SparkContext was created at:\n${ctx.creationSite.longForm}"
@ -1941,6 +1943,39 @@ object SparkContext extends Logging {
}
}
/**
 * Returns the currently active SparkContext, or instantiates a new one from the given
 * configuration and registers it as the JVM-wide singleton. Because only one active
 * SparkContext may exist per JVM, this is useful when applications wish to share one.
 *
 * Note: this method never creates more than one SparkContext, even when
 * `spark.driver.allowMultipleContexts` is enabled.
 *
 * @param config the configuration used if a new SparkContext must be created
 * @return the shared active SparkContext
 */
def getOrCreate(config: SparkConf): SparkContext = {
  // Hold the constructor lock so concurrent callers cannot race into
  // assertNoOtherContextIsRunning (via setActiveContext) and throw spuriously.
  SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
    Option(activeContext.get()).getOrElse {
      setActiveContext(new SparkContext(config), allowMultipleContexts = false)
      activeContext.get()
    }
  }
}
/**
 * Returns the currently active SparkContext, or instantiates a new one with a default
 * [[SparkConf]] and registers it as the JVM-wide singleton. Because only one active
 * SparkContext may exist per JVM, this is useful when applications wish to share one.
 *
 * This overload requires no SparkConf, which is convenient when the caller only expects
 * to retrieve an existing context.
 *
 * Note: this method never creates more than one SparkContext, even when
 * `spark.driver.allowMultipleContexts` is enabled.
 *
 * @return the shared active SparkContext
 */
def getOrCreate(): SparkContext = getOrCreate(new SparkConf())
/**
* Called at the beginning of the SparkContext constructor to ensure that no SparkContext is
* running. Throws an exception if a running context is detected and logs a warning if another
@ -1967,7 +2002,7 @@ object SparkContext extends Logging {
SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
assertNoOtherContextIsRunning(sc, allowMultipleContexts)
contextBeingConstructed = None
activeContext = Some(sc)
activeContext.set(sc)
}
}
@ -1978,7 +2013,7 @@ object SparkContext extends Logging {
*/
private[spark] def clearActiveContext(): Unit = {
SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
activeContext = None
activeContext.set(null)
}
}

View file

@ -67,6 +67,26 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
}
}
test("Test getOrCreate") {
  // Start from a clean slate so this suite's other tests cannot interfere.
  SparkContext.clearActiveContext()
  val conf = new SparkConf().setAppName("test").setMaster("local")
  sc = SparkContext.getOrCreate(conf)

  // The first call creates a context carrying the supplied configuration.
  assert(sc.getConf.get("spark.app.name") == "test")

  // A second call must return the existing context — the new conf is ignored —
  // and the result must be the very same instance, not merely an equal one.
  val retrieved = SparkContext.getOrCreate(new SparkConf().setAppName("test2").setMaster("local"))
  assert(retrieved.getConf.get("spark.app.name") == "test")
  assert(sc === retrieved)
  assert(sc eq retrieved)

  // Try creating second context to confirm that it's still possible, if desired
  val second = new SparkContext(new SparkConf().setAppName("test3").setMaster("local")
    .set("spark.driver.allowMultipleContexts", "true"))
  second.stop()
}
test("BytesWritable implicit conversion is correct") {
// Regression test for SPARK-3121
val bytesWritable = new BytesWritable()

View file

@ -68,6 +68,10 @@ object MimaExcludes {
// SPARK-6693 add tostring with max lines and width for matrix
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.mllib.linalg.Matrix.toString")
)++ Seq(
// SPARK-6703 Add getOrCreate method to SparkContext
ProblemFilters.exclude[IncompatibleResultTypeProblem]
("org.apache.spark.SparkContext.org$apache$spark$SparkContext$$activeContext")
)
case v if v.startsWith("1.3") =>