From a6ae2b48320d367be5fede60687331ce0d563d00 Mon Sep 17 00:00:00 2001 From: Stephen Haberman Date: Sun, 27 Oct 2013 13:35:04 -0500 Subject: [PATCH 1/3] Handle ConcurrentModificationExceptions in SparkContext init. System.getProperties.toMap will fail-fast when concurrently modified, and it seems like some other thread started by SparkContext does a System.setProperty during it's initialization. Handle this by just looping on ConcurrentModificationException, which seems the safest, since the non-fail-fast methods (Hastable.entrySet) have undefined behavior under concurrent modification. --- .../src/main/scala/org/apache/spark/SparkContext.scala | 6 +++--- core/src/main/scala/org/apache/spark/util/Utils.scala | 10 ++++++++++ 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 564466cfd5..d694dfe4d9 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger import scala.collection.Map import scala.collection.generic.Growable -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap @@ -248,8 +248,8 @@ class SparkContext( conf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY")) } // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar" - for (key <- System.getProperties.toMap[String, String].keys if key.startsWith("spark.hadoop.")) { - conf.set(key.substring("spark.hadoop.".length), System.getProperty(key)) + Utils.getSystemProperties.foreach { case (key, value) if key.startsWith("spark.hadoop.") => + conf.set(key.substring("spark.hadoop.".length), value) } val bufferSize = System.getProperty("spark.buffer.size", "65536") conf.set("io.file.buffer.size", bufferSize) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index a3b3968c5e..d637a0a91d 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -37,6 +37,7 @@ import org.apache.spark.serializer.{DeserializationStream, SerializationStream, import org.apache.spark.deploy.SparkHadoopUtil import java.nio.ByteBuffer import org.apache.spark.{SparkEnv, SparkException, Logging} +import java.util.ConcurrentModificationException /** @@ -819,4 +820,13 @@ private[spark] object Utils extends Logging { // Nothing else to guard against ? hashAbs } + + /** Returns a copy of the system properties that is thread-safe to iterator over. */ + def getSystemProperties(): Map[String, String] = { + try { + return System.getProperties().toMap[String, String] + } catch { + case e: ConcurrentModificationException => getSystemProperties() + } + } } From 3a388c320c5079bec44fe51d2f8218af2e56d98e Mon Sep 17 00:00:00 2001 From: Stephen Haberman Date: Tue, 29 Oct 2013 19:20:40 -0500 Subject: [PATCH 2/3] Use Properties.clone() instead. --- core/src/main/scala/org/apache/spark/util/Utils.scala | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index d637a0a91d..b20c0e5308 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -823,10 +823,7 @@ private[spark] object Utils extends Logging { /** Returns a copy of the system properties that is thread-safe to iterator over. */ def getSystemProperties(): Map[String, String] = { - try { - return System.getProperties().toMap[String, String] - } catch { - case e: ConcurrentModificationException => getSystemProperties() - } + return System.getProperties().clone() + .asInstanceOf[java.util.Properties].toMap[String, String] } } From 09f3b677cb7cce08882ea030e9af5798a63046ba Mon Sep 17 00:00:00 2001 From: Stephen Haberman Date: Wed, 30 Oct 2013 12:29:39 -0500 Subject: [PATCH 3/3] Avoid match errors when filtering for spark.hadoop settings. --- core/src/main/scala/org/apache/spark/SparkContext.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index d694dfe4d9..28ac49a24a 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -248,8 +248,10 @@ class SparkContext( conf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY")) } // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar" - Utils.getSystemProperties.foreach { case (key, value) if key.startsWith("spark.hadoop.") => - conf.set(key.substring("spark.hadoop.".length), value) + Utils.getSystemProperties.foreach { case (key, value) => + if (key.startsWith("spark.hadoop.")) { + conf.set(key.substring("spark.hadoop.".length), value) + } } val bufferSize = System.getProperty("spark.buffer.size", "65536") conf.set("io.file.buffer.size", bufferSize)