[SPARK-13747][SQL] Fix concurrent query with fork-join pool
## What changes were proposed in this pull request? Fix this use case, which was already fixed in SPARK-10548 in 1.6 but was broken in master due to #9264: ``` (1 to 100).par.foreach { _ => sc.parallelize(1 to 5).map { i => (i, i) }.toDF("a", "b").count() } ``` This threw `IllegalArgumentException` consistently before this patch. For more detail, see the JIRA. ## How was this patch tested? New test in `SQLExecutionSuite`. Author: Andrew Or <andrew@databricks.com> Closes #11586 from andrewor14/fix-concurrent-sql.
This commit is contained in:
parent
dbf2a7cfad
commit
37fcda3e6c
|
@ -613,7 +613,12 @@ class DAGScheduler(
|
|||
properties: Properties): Unit = {
|
||||
val start = System.nanoTime
|
||||
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
|
||||
Await.ready(waiter.completionFuture, atMost = Duration.Inf)
|
||||
// Note: Do not call Await.ready(future) because that calls `scala.concurrent.blocking`,
|
||||
// which causes concurrent SQL executions to fail if a fork-join pool is used. Note that
|
||||
// due to idiosyncrasies in Scala, `awaitPermission` is not actually used anywhere so it's
|
||||
// safe to pass in null here. For more detail, see SPARK-13747.
|
||||
val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
|
||||
waiter.completionFuture.ready(Duration.Inf)(awaitPermission)
|
||||
waiter.completionFuture.value.get match {
|
||||
case scala.util.Success(_) =>
|
||||
logInfo("Job %d finished: %s, took %f s".format
|
||||
|
|
|
@ -49,6 +49,20 @@ class SQLExecutionSuite extends SparkFunSuite {
|
|||
}
|
||||
}
|
||||
|
||||
test("concurrent query execution with fork-join pool (SPARK-13747)") {
|
||||
val sc = new SparkContext("local[*]", "test")
|
||||
val sqlContext = new SQLContext(sc)
|
||||
import sqlContext.implicits._
|
||||
try {
|
||||
// Should not throw IllegalArgumentException
|
||||
(1 to 100).par.foreach { _ =>
|
||||
sc.parallelize(1 to 5).map { i => (i, i) }.toDF("a", "b").count()
|
||||
}
|
||||
} finally {
|
||||
sc.stop()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Trigger SPARK-10548 by mocking a parent and its child thread executing queries concurrently.
|
||||
*/
|
||||
|
|
Loading…
Reference in a new issue