Merge pull request #117 from stephenh/avoid_concurrent_modification_exception
Handle ConcurrentModificationExceptions in SparkContext init. System.getProperties.toMap will fail-fast when concurrently modified, and it seems like some other thread started by SparkContext does a System.setProperty during it's initialization. Handle this by just looping on ConcurrentModificationException, which seems the safest, since the non-fail-fast methods (Hastable.entrySet) have undefined behavior under concurrent modification.
This commit is contained in:
commit
8f1098a3f0
|
@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger
|
|||
|
||||
import scala.collection.Map
|
||||
import scala.collection.generic.Growable
|
||||
import scala.collection.JavaConversions._
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
import scala.collection.mutable.HashMap
|
||||
|
||||
|
@ -255,8 +255,10 @@ class SparkContext(
|
|||
conf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
|
||||
}
|
||||
// Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
|
||||
for (key <- System.getProperties.toMap[String, String].keys if key.startsWith("spark.hadoop.")) {
|
||||
conf.set(key.substring("spark.hadoop.".length), System.getProperty(key))
|
||||
Utils.getSystemProperties.foreach { case (key, value) =>
|
||||
if (key.startsWith("spark.hadoop.")) {
|
||||
conf.set(key.substring("spark.hadoop.".length), value)
|
||||
}
|
||||
}
|
||||
val bufferSize = System.getProperty("spark.buffer.size", "65536")
|
||||
conf.set("io.file.buffer.size", bufferSize)
|
||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.spark.serializer.{DeserializationStream, SerializationStream,
|
|||
import org.apache.spark.deploy.SparkHadoopUtil
|
||||
import java.nio.ByteBuffer
|
||||
import org.apache.spark.{SparkEnv, SparkException, Logging}
|
||||
import java.util.ConcurrentModificationException
|
||||
|
||||
|
||||
/**
|
||||
|
@ -818,4 +819,10 @@ private[spark] object Utils extends Logging {
|
|||
// Nothing else to guard against ?
|
||||
hashAbs
|
||||
}
|
||||
|
||||
/** Returns a copy of the system properties that is thread-safe to iterator over. */
|
||||
def getSystemProperties(): Map[String, String] = {
|
||||
return System.getProperties().clone()
|
||||
.asInstanceOf[java.util.Properties].toMap[String, String]
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue