From d8257b99ddae23f702f312640a5335ddb4554403 Mon Sep 17 00:00:00 2001
From: Stavros Kontopoulos
Date: Thu, 13 Jul 2017 10:37:15 -0700
Subject: [PATCH] [SPARK-21403][MESOS] fix --packages for mesos

## What changes were proposed in this pull request?

Fixes the --packages flag for Mesos in cluster mode. I will probably handle standalone and YARN in another commit; those cases are different and need further investigation.

## How was this patch tested?

Tested with a community 1.9 DC/OS cluster. Packages were successfully resolved in cluster mode within a container.

andrewor14 susanxhuynh ArtRand srowen please review.

Author: Stavros Kontopoulos

Closes #18587 from skonto/fix_packages_mesos_cluster.
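For illustration, a minimal sketch of the control flow this patch introduces; the names below are simplified stand-ins, not the actual `SparkSubmit` internals:

```scala
// Sketch only: in Mesos cluster mode, --packages resolution is deferred to the
// spark-submit invocation that launches the driver inside the container.
object PackagesFlowSketch {
  def main(args: Array[String]): Unit = {
    val clusterManager = "mesos"           // would come from --master
    val deployMode = "cluster"             // would come from --deploy-mode
    val packages = "com.example:lib:1.0"   // hypothetical --packages coordinates

    val isMesosCluster = clusterManager == "mesos" && deployMode == "cluster"

    if (!isMesosCluster) {
      // Unchanged path: resolve the Maven coordinates with Ivy at submit time
      // and merge the downloaded jars into spark.jars (and py-files for Python).
      println(s"resolving $packages locally before launch")
    } else {
      // New path: skip local resolution and forward the flag as a Spark
      // property, so resolution happens where the driver actually runs.
      println(s"forwarding spark.jars.packages=$packages to the driver")
    }
  }
}
```

The new `OptionAssigner` entries at the end of the diff are what carry `--packages`, `--repositories`, the Ivy path, and the exclusions across to the driver as `spark.jars.*` properties.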
---
 .../org/apache/spark/deploy/SparkSubmit.scala | 91 +++++++++++--------
 1 file changed, 51 insertions(+), 40 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index abde04062c..0ea14361b2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -273,6 +273,25 @@ object SparkSubmit extends CommandLineUtils {
       }
     }
 
+    // Fail fast, the following modes are not supported or applicable
+    (clusterManager, deployMode) match {
+      case (STANDALONE, CLUSTER) if args.isPython =>
+        printErrorAndExit("Cluster deploy mode is currently not supported for python " +
+          "applications on standalone clusters.")
+      case (STANDALONE, CLUSTER) if args.isR =>
+        printErrorAndExit("Cluster deploy mode is currently not supported for R " +
+          "applications on standalone clusters.")
+      case (LOCAL, CLUSTER) =>
+        printErrorAndExit("Cluster deploy mode is not compatible with master \"local\"")
+      case (_, CLUSTER) if isShell(args.primaryResource) =>
+        printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.")
+      case (_, CLUSTER) if isSqlShell(args.mainClass) =>
+        printErrorAndExit("Cluster deploy mode is not applicable to Spark SQL shell.")
+      case (_, CLUSTER) if isThriftServer(args.mainClass) =>
+        printErrorAndExit("Cluster deploy mode is not applicable to Spark Thrift server.")
+      case _ =>
+    }
+
     // Update args.deployMode if it is null. It will be passed down as a Spark property later.
     (args.deployMode, deployMode) match {
       case (null, CLIENT) => args.deployMode = "client"
@@ -282,36 +301,40 @@ object SparkSubmit extends CommandLineUtils {
     val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
     val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
 
-    // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
-    // too for packages that include Python code
-    val exclusions: Seq[String] =
+    if (!isMesosCluster) {
+      // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
+      // too for packages that include Python code
+      val exclusions: Seq[String] =
       if (!StringUtils.isBlank(args.packagesExclusions)) {
         args.packagesExclusions.split(",")
       } else {
         Nil
       }
-    // Create the IvySettings, either load from file or build defaults
-    val ivySettings = args.sparkProperties.get("spark.jars.ivySettings").map { ivySettingsFile =>
-      SparkSubmitUtils.loadIvySettings(ivySettingsFile, Option(args.repositories),
-        Option(args.ivyRepoPath))
-    }.getOrElse {
-      SparkSubmitUtils.buildIvySettings(Option(args.repositories), Option(args.ivyRepoPath))
-    }
-
-    val resolvedMavenCoordinates = SparkSubmitUtils.resolveMavenCoordinates(args.packages,
-      ivySettings, exclusions = exclusions)
-    if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
-      args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
-      if (args.isPython) {
-        args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
+      // Create the IvySettings, either load from file or build defaults
+      val ivySettings = args.sparkProperties.get("spark.jars.ivySettings").map { ivySettingsFile =>
+        SparkSubmitUtils.loadIvySettings(ivySettingsFile, Option(args.repositories),
+          Option(args.ivyRepoPath))
+      }.getOrElse {
+        SparkSubmitUtils.buildIvySettings(Option(args.repositories), Option(args.ivyRepoPath))
       }
-    }
-
-    // install any R packages that may have been passed through --jars or --packages.
-    // Spark Packages may contain R source code inside the jar.
-    if (args.isR && !StringUtils.isBlank(args.jars)) {
-      RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose)
+      val resolvedMavenCoordinates = SparkSubmitUtils.resolveMavenCoordinates(args.packages,
+        ivySettings, exclusions = exclusions)
+
+
+      if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
+        args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
+        if (args.isPython) {
+          args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
+        }
+      }
+
+      // install any R packages that may have been passed through --jars or --packages.
+      // Spark Packages may contain R source code inside the jar.
+      if (args.isR && !StringUtils.isBlank(args.jars)) {
+        RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose)
+      }
     }
 
     val hadoopConf = new HadoopConfiguration()
@@ -343,24 +366,6 @@ object SparkSubmit extends CommandLineUtils {
       }.orNull
     }
 
-    // The following modes are not supported or applicable
-    (clusterManager, deployMode) match {
-      case (STANDALONE, CLUSTER) if args.isPython =>
-        printErrorAndExit("Cluster deploy mode is currently not supported for python " +
-          "applications on standalone clusters.")
-      case (STANDALONE, CLUSTER) if args.isR =>
-        printErrorAndExit("Cluster deploy mode is currently not supported for R " +
-          "applications on standalone clusters.")
-      case (LOCAL, CLUSTER) =>
-        printErrorAndExit("Cluster deploy mode is not compatible with master \"local\"")
-      case (_, CLUSTER) if isShell(args.primaryResource) =>
-        printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.")
-      case (_, CLUSTER) if isSqlShell(args.mainClass) =>
-        printErrorAndExit("Cluster deploy mode is not applicable to Spark SQL shell.")
-      case (_, CLUSTER) if isThriftServer(args.mainClass) =>
-        printErrorAndExit("Cluster deploy mode is not applicable to Spark Thrift server.")
-      case _ =>
-    }
-
     // If we're running a python app, set the main class to our specific python runner
     if (args.isPython && deployMode == CLIENT) {
@@ -468,6 +473,12 @@ object SparkSubmit extends CommandLineUtils {
       OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
         sysProp = "spark.driver.extraLibraryPath"),
 
+      // Mesos only - propagate attributes for dependency resolution at the driver side
+      OptionAssigner(args.packages, MESOS, CLUSTER, sysProp = "spark.jars.packages"),
+      OptionAssigner(args.repositories, MESOS, CLUSTER, sysProp = "spark.jars.repositories"),
+      OptionAssigner(args.ivyRepoPath, MESOS, CLUSTER, sysProp = "spark.jars.ivy"),
+      OptionAssigner(args.packagesExclusions, MESOS, CLUSTER, sysProp = "spark.jars.excludes"),
+
       // Yarn only
       OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.queue"),
       OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES,
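For context on why forwarding the properties is sufficient: `SparkSubmitArguments` already falls back to `spark.jars.packages` and `spark.jars.excludes` from the Spark properties when the corresponding flags are absent, so the spark-submit run for the driver picks them up and performs the normal Ivy resolution inside the container. A rough sketch of that fallback (simplified; `DriverSideArgsSketch` and its fields are illustrative, not the real class):

```scala
// Illustrative model of the driver-side pickup, assuming the flag-or-property
// fallback behaviour of SparkSubmitArguments.
class DriverSideArgsSketch(sparkProperties: Map[String, String]) {
  var packages: String = null            // set if --packages was given explicitly
  var packagesExclusions: String = null  // set if --exclude-packages was given

  // Explicit flags win; otherwise the properties forwarded by the patch above
  // fill the fields, so a driver launched with spark.jars.packages set behaves
  // as if --packages had been passed on its own command line.
  def loadDefaults(): Unit = {
    packages = Option(packages)
      .orElse(sparkProperties.get("spark.jars.packages")).orNull
    packagesExclusions = Option(packagesExclusions)
      .orElse(sparkProperties.get("spark.jars.excludes")).orNull
  }
}
```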