[SPARK-6537] UIWorkloadGenerator: The main thread should not stop SparkContext until all jobs finish

The main thread of UIWorkloadGenerator spawns sub-threads to launch jobs, but it stops the SparkContext without waiting for those threads to finish.

Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>

Closes #5187 from sarutak/SPARK-6537 and squashes the following commits:

4e9307a [Kousuke Saruta] Fixed UIWorkloadGenerator so that the main thread stops SparkContext after all jobs finish
This commit is contained in:
Kousuke Saruta 2015-03-25 13:27:15 -07:00 committed by Andrew Or
parent 883b7e9030
commit acef51defb

View file

@ -17,6 +17,8 @@
package org.apache.spark.ui
import java.util.concurrent.Semaphore
import scala.util.Random
import org.apache.spark.{SparkConf, SparkContext}
@ -88,6 +90,8 @@ private[spark] object UIWorkloadGenerator {
("Job with delays", baseData.map(x => Thread.sleep(100)).count)
)
val barrier = new Semaphore(-nJobSet * jobs.size + 1)
(1 to nJobSet).foreach { _ =>
for ((desc, job) <- jobs) {
new Thread {
@ -99,12 +103,17 @@ private[spark] object UIWorkloadGenerator {
} catch {
case e: Exception =>
println("Job Failed: " + desc)
} finally {
barrier.release()
}
}
}.start
Thread.sleep(INTER_JOB_WAIT_MS)
}
}
// Waiting for threads.
barrier.acquire()
sc.stop()
} }
} }