[SPARK-13655] Improve isolation between tests in KinesisBackedBlockRDDSuite

This patch modifies `KinesisBackedBlockRDDTests` to increase the isolation between tests in order to fix a bug which causes the tests to hang.

See #11558 for more details.

/cc zsxwing srowen

Author: Josh Rosen <joshrosen@databricks.com>

Closes #11564 from JoshRosen/SPARK-13655.
This commit is contained in:
Josh Rosen 2016-03-07 13:45:45 -08:00 committed by Shixiong Zhu
parent b6071a7001
commit e9e67b39ab

View file

@ -17,13 +17,13 @@
package org.apache.spark.streaming.kinesis package org.apache.spark.streaming.kinesis
import org.scalatest.BeforeAndAfterAll import org.scalatest.BeforeAndAfterEach
import org.apache.spark.{SparkConf, SparkContext, SparkException} import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException}
import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, StreamBlockId} import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, StreamBlockId}
abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean) abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean)
extends KinesisFunSuite with BeforeAndAfterAll { extends KinesisFunSuite with BeforeAndAfterEach with LocalSparkContext {
private val testData = 1 to 8 private val testData = 1 to 8
@ -35,10 +35,10 @@ abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean)
private var shardIdToRange: Map[String, SequenceNumberRange] = null private var shardIdToRange: Map[String, SequenceNumberRange] = null
private var allRanges: Seq[SequenceNumberRange] = null private var allRanges: Seq[SequenceNumberRange] = null
private var sc: SparkContext = null
private var blockManager: BlockManager = null private var blockManager: BlockManager = null
override def beforeAll(): Unit = { override def beforeAll(): Unit = {
super.beforeAll()
runIfTestsEnabled("Prepare KinesisTestUtils") { runIfTestsEnabled("Prepare KinesisTestUtils") {
testUtils = new KPLBasedKinesisTestUtils() testUtils = new KPLBasedKinesisTestUtils()
testUtils.createStream() testUtils.createStream()
@ -55,19 +55,23 @@ abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean)
(shardId, seqNumRange) (shardId, seqNumRange)
} }
allRanges = shardIdToRange.values.toSeq allRanges = shardIdToRange.values.toSeq
}
}
override def beforeEach(): Unit = {
super.beforeEach()
val conf = new SparkConf().setMaster("local[4]").setAppName("KinesisBackedBlockRDDSuite") val conf = new SparkConf().setMaster("local[4]").setAppName("KinesisBackedBlockRDDSuite")
sc = new SparkContext(conf) sc = new SparkContext(conf)
blockManager = sc.env.blockManager blockManager = sc.env.blockManager
} }
}
override def afterAll(): Unit = { override def afterAll(): Unit = {
try {
if (testUtils != null) { if (testUtils != null) {
testUtils.deleteStream() testUtils.deleteStream()
} }
if (sc != null) { } finally {
sc.stop() super.afterAll()
} }
} }