diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
index ab1bf69e27..eee6e4b28a 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
@@ -27,13 +27,15 @@ import org.apache.spark.SparkContext
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 
 private[spark] object PythonUtils {
+  val PY4J_ZIP_NAME = "py4j-0.10.8.1-src.zip"
+
   /** Get the PYTHONPATH for PySpark, either from SPARK_HOME, if it is set, or from our JAR */
   def sparkPythonPath: String = {
     val pythonPath = new ArrayBuffer[String]
     for (sparkHome <- sys.env.get("SPARK_HOME")) {
       pythonPath += Seq(sparkHome, "python", "lib", "pyspark.zip").mkString(File.separator)
       pythonPath +=
-        Seq(sparkHome, "python", "lib", "py4j-0.10.8.1-src.zip").mkString(File.separator)
+        Seq(sparkHome, "python", "lib", PY4J_ZIP_NAME).mkString(File.separator)
     }
     pythonPath ++= SparkContext.jarOfObject(this)
     pythonPath.mkString(File.pathSeparator)
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index b26e56a68f..0eade29aca 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier
 import org.apache.hadoop.yarn.util.Records
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.api.python.PythonUtils
 import org.apache.spark.deploy.{SparkApplication, SparkHadoopUtil}
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
@@ -1201,7 +1202,7 @@ private[spark] class Client(
         val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
         require(pyArchivesFile.exists(),
           s"$pyArchivesFile not found; cannot run pyspark application in YARN mode.")
-        val py4jFile = new File(pyLibPath, "py4j-0.10.8.1-src.zip")
+        val py4jFile = new File(pyLibPath, PythonUtils.PY4J_ZIP_NAME)
         require(py4jFile.exists(),
           s"$py4jFile not found; cannot run pyspark application in YARN mode.")
         Seq(pyArchivesFile.getAbsolutePath(), py4jFile.getAbsolutePath())
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index c5142fa550..7264e2e51e 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -32,6 +32,7 @@ import org.scalatest.Matchers
 import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark._
+import org.apache.spark.api.python.PythonUtils
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
@@ -269,7 +270,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     // needed locations.
     val sparkHome = sys.props("spark.test.home")
     val pythonPath = Seq(
-      s"$sparkHome/python/lib/py4j-0.10.8.1-src.zip",
+      s"$sparkHome/python/lib/${PythonUtils.PY4J_ZIP_NAME}",
       s"$sparkHome/python")
     val extraEnvVars = Map(
       "PYSPARK_ARCHIVES_PATH" -> pythonPath.map("local:" + _).mkString(File.pathSeparator),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala
index 9e3b1ca77b..3228a2bd8c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala
@@ -23,7 +23,7 @@ import scala.collection.JavaConverters._
 import scala.util.Try
 
 import org.apache.spark.TestUtils
-import org.apache.spark.api.python.{PythonBroadcast, PythonEvalType, PythonFunction}
+import org.apache.spark.api.python.{PythonBroadcast, PythonEvalType, PythonFunction, PythonUtils}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.config.Tests
 import org.apache.spark.sql.catalyst.plans.SQLHelper
@@ -74,7 +74,7 @@ object IntegratedUDFTestUtils extends SQLHelper {
   // It is possible the test is being ran without the build.
   private lazy val sourcePath = Paths.get(sparkHome, "python").toAbsolutePath
   private lazy val py4jPath = Paths.get(
-    sparkHome, "python", "lib", "py4j-0.10.8.1-src.zip").toAbsolutePath
+    sparkHome, "python", "lib", PythonUtils.PY4J_ZIP_NAME).toAbsolutePath
   private lazy val pysparkPythonPath = s"$py4jPath:$sourcePath"
 
   private lazy val isPythonAvailable: Boolean = TestUtils.testCommandAvailable(pythonExec)
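All four hunks apply the same refactoring: the Py4J archive file name, which embeds the Py4J version, is defined once as PythonUtils.PY4J_ZIP_NAME and referenced everywhere a path to the zip is built, so a future Py4J upgrade touches one line instead of four hard-coded strings. The standalone Scala sketch below illustrates the pattern; it is not part of the patch, and Py4jPathSketch and buildPythonPath are hypothetical names standing in for PythonUtils and its sparkPythonPath method.

import java.io.File

// Standalone sketch (not part of the patch) of the pattern the diff applies:
// the versioned Py4J archive name is defined once, so a version bump is a
// one-line change rather than an edit in every module that builds the path.
object Py4jPathSketch {
  // Mirrors the constant added to PythonUtils in the first hunk.
  val PY4J_ZIP_NAME = "py4j-0.10.8.1-src.zip"

  // Simplified stand-in for PythonUtils.sparkPythonPath: joins the pyspark
  // and py4j archives under $SPARK_HOME/python/lib into a PYTHONPATH string.
  def buildPythonPath(sparkHome: String): String = {
    val lib = Seq(sparkHome, "python", "lib")
    Seq(
      (lib :+ "pyspark.zip").mkString(File.separator),
      (lib :+ PY4J_ZIP_NAME).mkString(File.separator)
    ).mkString(File.pathSeparator)
  }

  def main(args: Array[String]): Unit =
    // e.g. /opt/spark/python/lib/pyspark.zip:/opt/spark/python/lib/py4j-0.10.8.1-src.zip
    println(buildPythonPath("/opt/spark"))
}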