diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index 4a63ea9a86..5e741112fc 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -190,8 +190,8 @@ private[spark] object KubernetesConf {
   }
 
   def getResourceNamePrefix(appName: String): String = {
-    val launchTime = System.currentTimeMillis()
-    s"$appName-$launchTime"
+    val id = KubernetesUtils.uniqueID()
+    s"$appName-$id"
       .trim
       .toLowerCase(Locale.ROOT)
       .replaceAll("\\s+", "-")
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index b3f58b0323..3f7fcecb73 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -17,18 +17,23 @@
 package org.apache.spark.deploy.k8s
 
 import java.io.File
+import java.security.SecureRandom
 
 import scala.collection.JavaConverters._
 
 import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, PodBuilder}
 import io.fabric8.kubernetes.client.KubernetesClient
+import org.apache.commons.codec.binary.Hex
 
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{Clock, SystemClock, Utils}
 
 private[spark] object KubernetesUtils extends Logging {
 
+  private val systemClock = new SystemClock()
+  private lazy val RNG = new SecureRandom()
+
   /**
    * Extract and parse Spark configuration properties with a given name prefix and
    * return the result as a Map. Keys must not have more than one value.
@@ -185,4 +190,23 @@ private[spark] object KubernetesUtils extends Logging {
   def formatTime(time: String): String = {
     if (time != null) time else "N/A"
   }
+
+  /**
+   * Generates a unique ID to be used as part of identifiers. The returned ID is a hex string
+   * of a 64-bit value containing the 40 LSBs from the current time + 24 random bits from a
+   * cryptographically strong RNG. (40 bits gives about 30 years worth of "unique" timestamps.)
+   *
+   * This avoids using a UUID for uniqueness (too long), and relying solely on the current time
+   * (not unique enough).
+   */
+  def uniqueID(clock: Clock = systemClock): String = {
+    val random = new Array[Byte](3)
+    synchronized {
+      RNG.nextBytes(random)
+    }
+
+    val time = java.lang.Long.toHexString(clock.getTimeMillis() & 0xFFFFFFFFFFL)
+    Hex.encodeHexString(random) + time
+  }
+
 }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala
index 15671179b1..cec8769b83 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala
@@ -20,14 +20,14 @@ import scala.collection.JavaConverters._
 
 import io.fabric8.kubernetes.api.model.{HasMetadata, ServiceBuilder}
 
-import org.apache.spark.deploy.k8s.{KubernetesDriverConf, SparkPod}
+import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesUtils, SparkPod}
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.util.{Clock, SystemClock}
 
 private[spark] class DriverServiceFeatureStep(
     kubernetesConf: KubernetesDriverConf,
-    clock: Clock = new SystemClock)
+    clock: Clock = new SystemClock())
   extends KubernetesFeatureConfigStep with Logging {
   import DriverServiceFeatureStep._
 
@@ -42,7 +42,7 @@ private[spark] class DriverServiceFeatureStep(
   private val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) {
     preferredServiceName
   } else {
-    val randomServiceId = clock.getTimeMillis()
+    val randomServiceId = KubernetesUtils.uniqueID(clock = clock)
     val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
     logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is " +
       s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). Falling back to use " +
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
index 822f1e3296..fbd99b73b3 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
@@ -18,6 +18,7 @@ package org.apache.spark.deploy.k8s.features
 
 import scala.collection.JavaConverters._
 
+import com.google.common.net.InternetDomainName
 import io.fabric8.kubernetes.api.model.Service
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
@@ -71,7 +72,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
     val expectedServiceName = kconf.resourceNamePrefix + DriverServiceFeatureStep.DRIVER_SVC_POSTFIX
     val expectedHostName = s"$expectedServiceName.my-namespace.svc"
     val additionalProps = configurationStep.getAdditionalPodSystemProperties()
-    verifySparkConfHostNames(additionalProps, expectedHostName)
+    assert(additionalProps(DRIVER_HOST_ADDRESS.key) === expectedHostName)
   }
 
   test("Ports should resolve to defaults in SparkConf and in the service.") {
@@ -91,26 +92,37 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
     assert(additionalProps(DRIVER_BLOCK_MANAGER_PORT.key) === DEFAULT_BLOCKMANAGER_PORT.toString)
   }
 
-  test("Long prefixes should switch to using a generated name.") {
-    val clock = new ManualClock()
-    clock.setTime(10000)
+  test("Long prefixes should switch to using a generated unique name.") {
     val sparkConf = new SparkConf(false)
       .set(KUBERNETES_NAMESPACE, "my-namespace")
-    val configurationStep = new DriverServiceFeatureStep(
-      KubernetesTestConf.createDriverConf(
-        sparkConf = sparkConf,
-        resourceNamePrefix = Some(LONG_RESOURCE_NAME_PREFIX),
-        labels = DRIVER_LABELS),
-      clock)
-    val driverService = configurationStep
-      .getAdditionalKubernetesResources()
-      .head
-      .asInstanceOf[Service]
-    val expectedServiceName = s"spark-10000${DriverServiceFeatureStep.DRIVER_SVC_POSTFIX}"
-    assert(driverService.getMetadata.getName === expectedServiceName)
-    val expectedHostName = s"$expectedServiceName.my-namespace.svc"
-    val additionalProps = configurationStep.getAdditionalPodSystemProperties()
-    verifySparkConfHostNames(additionalProps, expectedHostName)
+    val kconf = KubernetesTestConf.createDriverConf(
+      sparkConf = sparkConf,
+      resourceNamePrefix = Some(LONG_RESOURCE_NAME_PREFIX),
+      labels = DRIVER_LABELS)
+    val clock = new ManualClock()
+
+    // Ensure that multiple services created at the same time generate unique names.
+    val services = (1 to 10).map { _ =>
+      val configurationStep = new DriverServiceFeatureStep(kconf, clock = clock)
+      val serviceName = configurationStep
+        .getAdditionalKubernetesResources()
+        .head
+        .asInstanceOf[Service]
+        .getMetadata
+        .getName
+
+      val hostAddress = configurationStep
+        .getAdditionalPodSystemProperties()(DRIVER_HOST_ADDRESS.key)
+
+      (serviceName -> hostAddress)
+    }.toMap
+
+    assert(services.size === 10)
+    services.foreach { case (name, address) =>
+      assert(!name.startsWith(kconf.resourceNamePrefix))
+      assert(!address.startsWith(kconf.resourceNamePrefix))
+      assert(InternetDomainName.isValid(address))
+    }
   }
 
   test("Disallow bind address and driver host to be set explicitly.") {
@@ -156,10 +168,4 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
     assert(driverServicePorts(1).getPort.intValue() === blockManagerPort)
     assert(driverServicePorts(1).getTargetPort.getIntVal === blockManagerPort)
   }
-
-  private def verifySparkConfHostNames(
-      driverSparkConf: Map[String, String], expectedHostName: String): Unit = {
-    assert(driverSparkConf(
-      org.apache.spark.internal.config.DRIVER_HOST_ADDRESS.key) === expectedHostName)
-  }
 }
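For reviewers, a minimal self-contained sketch of the ID layout that uniqueID() produces: 24 random bits rendered as 6 hex characters, followed by the 40 low-order timestamp bits as up to 10 hex characters. This uses only the JDK (no commons-codec or Spark Clock); the object and method names here are illustrative, not part of the patch.

    import java.security.SecureRandom

    // Illustrative re-creation of the uniqueID() layout, not the patch itself.
    object UniqueIdSketch {
      private val rng = new SecureRandom()

      def sketchId(nowMillis: Long = System.currentTimeMillis()): String = {
        val random = new Array[Byte](3)                        // 3 bytes = 24 random bits
        rng.nextBytes(random)
        val randomHex = random.map("%02x".format(_)).mkString  // always 6 hex chars
        // Keep only the 40 least-significant bits of the timestamp; 2^40 ms is
        // roughly 34 years of distinct millisecond values. toHexString does not
        // zero-pad, so this part is *up to* 10 hex chars.
        val timeHex = java.lang.Long.toHexString(nowMillis & 0xFFFFFFFFFFL)
        randomHex + timeHex                                    // e.g. "a13f02" + "8bb2c97000"
      }

      def main(args: Array[String]): Unit = println(sketchId())
    }

Two properties of this layout matter for the test changes above: injecting a Clock (ManualClock in the suite) freezes the timestamp half, so the ten generated service names can only differ in their random bits, which is exactly what the uniqueness assertion exercises; and the fallback name ("spark-" + at most 16 hex chars + DRIVER_SVC_POSTFIX) stays well under the 63-character DNS label limit that Kubernetes imposes on service names, which the InternetDomainName.isValid assertion checks syntactically.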