[SPARK-21403][MESOS] fix --packages for mesos

## What changes were proposed in this pull request?
Fixes the --packages flag for Mesos in cluster mode. I will probably handle standalone and YARN in a follow-up commit; those cases differ and need further investigation.
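For context, with this change the submission client in Mesos cluster mode skips Maven resolution itself and instead propagates the `--packages` settings as Spark properties (`spark.jars.packages`, `spark.jars.repositories`, `spark.jars.ivy`, `spark.jars.excludes`; see the `OptionAssigner` entries in the diff below), so resolution happens driver-side inside the container. A minimal sketch of that driver-side resolution, reusing the `SparkSubmitUtils` helpers visible in this diff — the object name `DriverSidePackages` and the already-populated `conf` are hypothetical illustrations, not part of this patch:

```scala
package org.apache.spark.deploy

import org.apache.spark.SparkConf

// Sketch only (hypothetical helper, not in this patch): rebuild dependency
// resolution on a driver launched in Mesos cluster mode from the properties
// that spark-submit now propagates.
object DriverSidePackages {
  def resolve(conf: SparkConf): String = {
    // Exclusions arrive as a comma-separated list, possibly empty.
    val exclusions: Seq[String] =
      conf.get("spark.jars.excludes", "").split(",").filter(_.nonEmpty)
    // Build default Ivy settings from the propagated repository/ivy-path values.
    val ivySettings = SparkSubmitUtils.buildIvySettings(
      conf.getOption("spark.jars.repositories"),
      conf.getOption("spark.jars.ivy"))
    // Returns the comma-separated local paths of the resolved jars
    // (blank if there were no coordinates to resolve).
    SparkSubmitUtils.resolveMavenCoordinates(
      conf.get("spark.jars.packages", ""), ivySettings, exclusions = exclusions)
  }
}
```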

## How was this patch tested?
Tested on a community DC/OS 1.9 cluster. Packages were successfully resolved in cluster mode within a container.

@andrewor14 @susanxhuynh @ArtRand @srowen please review.

Author: Stavros Kontopoulos <st.kontopoulos@gmail.com>

Closes #18587 from skonto/fix_packages_mesos_cluster.
Stavros Kontopoulos 2017-07-13 10:37:15 -07:00 committed by Marcelo Vanzin
parent af80e01b57
commit d8257b99dd

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

@@ -273,6 +273,25 @@ object SparkSubmit extends CommandLineUtils {
       }
     }
 
+    // Fail fast, the following modes are not supported or applicable
+    (clusterManager, deployMode) match {
+      case (STANDALONE, CLUSTER) if args.isPython =>
+        printErrorAndExit("Cluster deploy mode is currently not supported for python " +
+          "applications on standalone clusters.")
+      case (STANDALONE, CLUSTER) if args.isR =>
+        printErrorAndExit("Cluster deploy mode is currently not supported for R " +
+          "applications on standalone clusters.")
+      case (LOCAL, CLUSTER) =>
+        printErrorAndExit("Cluster deploy mode is not compatible with master \"local\"")
+      case (_, CLUSTER) if isShell(args.primaryResource) =>
+        printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.")
+      case (_, CLUSTER) if isSqlShell(args.mainClass) =>
+        printErrorAndExit("Cluster deploy mode is not applicable to Spark SQL shell.")
+      case (_, CLUSTER) if isThriftServer(args.mainClass) =>
+        printErrorAndExit("Cluster deploy mode is not applicable to Spark Thrift server.")
+      case _ =>
+    }
+
     // Update args.deployMode if it is null. It will be passed down as a Spark property later.
     (args.deployMode, deployMode) match {
       case (null, CLIENT) => args.deployMode = "client"
@@ -282,36 +301,40 @@ object SparkSubmit extends CommandLineUtils {
     val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
     val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
 
-    // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
-    // too for packages that include Python code
-    val exclusions: Seq[String] =
-      if (!StringUtils.isBlank(args.packagesExclusions)) {
-        args.packagesExclusions.split(",")
-      } else {
-        Nil
-      }
-
-    // Create the IvySettings, either load from file or build defaults
-    val ivySettings = args.sparkProperties.get("spark.jars.ivySettings").map { ivySettingsFile =>
-      SparkSubmitUtils.loadIvySettings(ivySettingsFile, Option(args.repositories),
-        Option(args.ivyRepoPath))
-    }.getOrElse {
-      SparkSubmitUtils.buildIvySettings(Option(args.repositories), Option(args.ivyRepoPath))
-    }
-
-    val resolvedMavenCoordinates = SparkSubmitUtils.resolveMavenCoordinates(args.packages,
-      ivySettings, exclusions = exclusions)
-    if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
-      args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
-      if (args.isPython) {
-        args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
-      }
-    }
-
-    // install any R packages that may have been passed through --jars or --packages.
-    // Spark Packages may contain R source code inside the jar.
-    if (args.isR && !StringUtils.isBlank(args.jars)) {
-      RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose)
+    if (!isMesosCluster) {
+      // Resolve maven dependencies if there are any and add classpath to jars. Add them to
+      // py-files too for packages that include Python code
+      val exclusions: Seq[String] =
+        if (!StringUtils.isBlank(args.packagesExclusions)) {
+          args.packagesExclusions.split(",")
+        } else {
+          Nil
+        }
+
+      // Create the IvySettings, either load from file or build defaults
+      val ivySettings = args.sparkProperties.get("spark.jars.ivySettings").map { ivySettingsFile =>
+        SparkSubmitUtils.loadIvySettings(ivySettingsFile, Option(args.repositories),
+          Option(args.ivyRepoPath))
+      }.getOrElse {
+        SparkSubmitUtils.buildIvySettings(Option(args.repositories), Option(args.ivyRepoPath))
+      }
+
+      val resolvedMavenCoordinates = SparkSubmitUtils.resolveMavenCoordinates(args.packages,
+        ivySettings, exclusions = exclusions)
+      if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
+        args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
+        if (args.isPython) {
+          args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
+        }
+      }
+
+      // install any R packages that may have been passed through --jars or --packages.
+      // Spark Packages may contain R source code inside the jar.
+      if (args.isR && !StringUtils.isBlank(args.jars)) {
+        RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose)
+      }
     }
 
     val hadoopConf = new HadoopConfiguration()
@@ -343,24 +366,6 @@ object SparkSubmit extends CommandLineUtils {
       }.orNull
     }
 
-    // The following modes are not supported or applicable
-    (clusterManager, deployMode) match {
-      case (STANDALONE, CLUSTER) if args.isPython =>
-        printErrorAndExit("Cluster deploy mode is currently not supported for python " +
-          "applications on standalone clusters.")
-      case (STANDALONE, CLUSTER) if args.isR =>
-        printErrorAndExit("Cluster deploy mode is currently not supported for R " +
-          "applications on standalone clusters.")
-      case (LOCAL, CLUSTER) =>
-        printErrorAndExit("Cluster deploy mode is not compatible with master \"local\"")
-      case (_, CLUSTER) if isShell(args.primaryResource) =>
-        printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.")
-      case (_, CLUSTER) if isSqlShell(args.mainClass) =>
-        printErrorAndExit("Cluster deploy mode is not applicable to Spark SQL shell.")
-      case (_, CLUSTER) if isThriftServer(args.mainClass) =>
-        printErrorAndExit("Cluster deploy mode is not applicable to Spark Thrift server.")
-      case _ =>
-    }
-
     // If we're running a python app, set the main class to our specific python runner
     if (args.isPython && deployMode == CLIENT) {
@@ -468,6 +473,12 @@ object SparkSubmit extends CommandLineUtils {
       OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
         sysProp = "spark.driver.extraLibraryPath"),
 
+      // Mesos only - propagate attributes for dependency resolution at the driver side
+      OptionAssigner(args.packages, MESOS, CLUSTER, sysProp = "spark.jars.packages"),
+      OptionAssigner(args.repositories, MESOS, CLUSTER, sysProp = "spark.jars.repositories"),
+      OptionAssigner(args.ivyRepoPath, MESOS, CLUSTER, sysProp = "spark.jars.ivy"),
+      OptionAssigner(args.packagesExclusions, MESOS, CLUSTER, sysProp = "spark.jars.excludes"),
+
       // Yarn only
       OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.queue"),
       OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES,