[SPARK-5341] Use maven coordinates as dependencies in spark-shell and spark-submit
This PR adds support for using maven coordinates as dependencies to spark-shell.
Coordinates can be provided as a comma-delimited string after the flag `--packages`.
Additional remote repositories (like SonaType) can be supplied as a comma-delimited string after the flag
`--repositories`.
Uses the Ivy library to resolve dependencies. Unfortunately, the library has no decent documentation, so solving more complex dependency issues can be a problem.
pwendell, mateiz, mengxr
**Note: This is still a WIP. The following need to be handled:**
- [x] add docs for the methods
- [x] take local ivy cache path as an argument
- [x] add tests
- [x] add Windows compatibility
- [x] exclude unused Ivy dependencies
Author: Burak Yavuz <brkyvz@gmail.com>
Closes #4215 from brkyvz/SPARK-5341ivy and squashes the following commits:
9215851 [Burak Yavuz] ready to merge
db2a5cc [Burak Yavuz] changed logging to printStream
9dae87f [Burak Yavuz] file separators changed
71c374d [Burak Yavuz] merge conflicts fixed
c08dc9f [Burak Yavuz] fixed merge conflicts
3ada19a [Burak Yavuz] fixed Jenkins error (hopefully) and added comment on oro
43c2290 [Burak Yavuz] fixed that ONE line
231f72f [Burak Yavuz] addressed code review
2cd6562 [Burak Yavuz] Merge branch 'master' of github.com:apache/spark into SPARK-5341ivy
85ec5a3 [Burak Yavuz] added oro as a dependency explicitly
ea44ca4 [Burak Yavuz] add oro back to dependencies
cef0e24 [Burak Yavuz] IntelliJ is just messing things up
97c4a92 [Burak Yavuz] fix more weird IntelliJ formatting
9cf077d [Burak Yavuz] fix weird IntelliJ formatting
dcf5e13 [Burak Yavuz] fix windows command line flags
3a23f21 [Burak Yavuz] excluded ivy dependencies
53423e0 [Burak Yavuz] tests added
3705907 [Burak Yavuz] remove ivy-repo as a command line argument. Use global ivy cache as default
c04d885 [Burak Yavuz] take path to ivy cache as a conf
2edc9b5 [Burak Yavuz] managed to exclude Spark and it's dependencies
a0870af [Burak Yavuz] add docs. remove unnecesary new lines
6645af4 [Burak Yavuz] [SPARK-5341] added base implementation
882c4c8 [Burak Yavuz] added maven dependency download
This commit is contained in:
parent
83de71c45b
commit
6aed719e50
|
@ -26,14 +26,14 @@ function gatherSparkSubmitOpts() {
|
|||
exit 1
|
||||
fi
|
||||
|
||||
# NOTE: If you add or remove spark-sumbmit options,
|
||||
# NOTE: If you add or remove spark-submit options,
|
||||
# modify NOT ONLY this script but also SparkSubmitArgument.scala
|
||||
SUBMISSION_OPTS=()
|
||||
APPLICATION_OPTS=()
|
||||
while (($#)); do
|
||||
case "$1" in
|
||||
--master | --deploy-mode | --class | --name | --jars | --py-files | --files | \
|
||||
--conf | --properties-file | --driver-memory | --driver-java-options | \
|
||||
--master | --deploy-mode | --class | --name | --jars | --packages | --py-files | --files | \
|
||||
--conf | --repositories | --properties-file | --driver-memory | --driver-java-options | \
|
||||
--driver-library-path | --driver-class-path | --executor-memory | --driver-cores | \
|
||||
--total-executor-cores | --executor-cores | --queue | --num-executors | --archives)
|
||||
if [[ $# -lt 2 ]]; then
|
||||
|
|
|
@ -32,7 +32,7 @@ SET opts="\<--master\> \<--deploy-mode\> \<--class\> \<--name\> \<--jars\> \<--p
|
|||
SET opts="%opts:~1,-1% \<--conf\> \<--properties-file\> \<--driver-memory\> \<--driver-java-options\>"
|
||||
SET opts="%opts:~1,-1% \<--driver-library-path\> \<--driver-class-path\> \<--executor-memory\>"
|
||||
SET opts="%opts:~1,-1% \<--driver-cores\> \<--total-executor-cores\> \<--executor-cores\> \<--queue\>"
|
||||
SET opts="%opts:~1,-1% \<--num-executors\> \<--archives\>"
|
||||
SET opts="%opts:~1,-1% \<--num-executors\> \<--archives\> \<--packages\> \<--repositories\>"
|
||||
|
||||
echo %1 | findstr %opts% >nul
|
||||
if %ERRORLEVEL% equ 0 (
|
||||
|
|
11
core/pom.xml
11
core/pom.xml
|
@ -241,6 +241,17 @@
|
|||
<artifactId>derby</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.ivy</groupId>
|
||||
<artifactId>ivy</artifactId>
|
||||
<version>${ivy.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>oro</groupId>
|
||||
<!-- oro is needed by ivy, but only listed as an optional dependency, so we include it. -->
|
||||
<artifactId>oro</artifactId>
|
||||
<version>${oro.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.tachyonproject</groupId>
|
||||
<artifactId>tachyon-client</artifactId>
|
||||
|
|
|
@ -1231,7 +1231,19 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
|
|||
null
|
||||
}
|
||||
} else {
|
||||
env.httpFileServer.addJar(new File(uri.getPath))
|
||||
try {
|
||||
env.httpFileServer.addJar(new File(uri.getPath))
|
||||
} catch {
|
||||
case exc: FileNotFoundException =>
|
||||
logError(s"Jar not found at $path")
|
||||
null
|
||||
case e: Exception =>
|
||||
// For now just log an error but allow to go through so spark examples work.
|
||||
// The spark examples don't really need the jar distributed since its also
|
||||
// the app jar.
|
||||
logError("Error adding jar (" + e + "), was the --addJars option used?")
|
||||
null
|
||||
}
|
||||
}
|
||||
// A JAR file which exists locally on every worker node
|
||||
case "local" =>
|
||||
|
|
|
@ -25,6 +25,17 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
|
|||
|
||||
import org.apache.hadoop.fs.Path
|
||||
|
||||
import org.apache.ivy.Ivy
|
||||
import org.apache.ivy.core.LogOptions
|
||||
import org.apache.ivy.core.module.descriptor.{DefaultExcludeRule, DefaultDependencyDescriptor, DefaultModuleDescriptor}
|
||||
import org.apache.ivy.core.module.id.{ModuleId, ArtifactId, ModuleRevisionId}
|
||||
import org.apache.ivy.core.report.ResolveReport
|
||||
import org.apache.ivy.core.resolve.{IvyNode, ResolveOptions}
|
||||
import org.apache.ivy.core.retrieve.RetrieveOptions
|
||||
import org.apache.ivy.core.settings.IvySettings
|
||||
import org.apache.ivy.plugins.matcher.GlobPatternMatcher
|
||||
import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver}
|
||||
|
||||
import org.apache.spark.executor.ExecutorURLClassLoader
|
||||
import org.apache.spark.util.Utils
|
||||
|
||||
|
@ -194,6 +205,18 @@ object SparkSubmit {
|
|||
// Special flag to avoid deprecation warnings at the client
|
||||
sysProps("SPARK_SUBMIT") = "true"
|
||||
|
||||
// Resolve maven dependencies if there are any and add classpath to jars
|
||||
val resolvedMavenCoordinates =
|
||||
SparkSubmitUtils.resolveMavenCoordinates(
|
||||
args.packages, Option(args.repositories), Option(args.ivyRepoPath))
|
||||
if (!resolvedMavenCoordinates.trim.isEmpty) {
|
||||
if (args.jars == null || args.jars.trim.isEmpty) {
|
||||
args.jars = resolvedMavenCoordinates
|
||||
} else {
|
||||
args.jars += s",$resolvedMavenCoordinates"
|
||||
}
|
||||
}
|
||||
|
||||
// A list of rules to map each argument to system properties or command-line options in
|
||||
// each deploy mode; we iterate through these below
|
||||
val options = List[OptionAssigner](
|
||||
|
@ -202,6 +225,7 @@ object SparkSubmit {
|
|||
OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.master"),
|
||||
OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"),
|
||||
OptionAssigner(args.jars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars"),
|
||||
OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars.ivy"),
|
||||
OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
|
||||
sysProp = "spark.driver.memory"),
|
||||
OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
|
||||
|
@ -213,6 +237,7 @@ object SparkSubmit {
|
|||
|
||||
// Standalone cluster only
|
||||
OptionAssigner(args.jars, STANDALONE, CLUSTER, sysProp = "spark.jars"),
|
||||
OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy"),
|
||||
OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, clOption = "--memory"),
|
||||
OptionAssigner(args.driverCores, STANDALONE, CLUSTER, clOption = "--cores"),
|
||||
|
||||
|
@ -384,8 +409,8 @@ object SparkSubmit {
|
|||
case e: ClassNotFoundException =>
|
||||
e.printStackTrace(printStream)
|
||||
if (childMainClass.contains("thriftserver")) {
|
||||
println(s"Failed to load main class $childMainClass.")
|
||||
println("You need to build Spark with -Phive and -Phive-thriftserver.")
|
||||
printStream.println(s"Failed to load main class $childMainClass.")
|
||||
printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
|
||||
}
|
||||
System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
|
||||
}
|
||||
|
@ -475,6 +500,194 @@ object SparkSubmit {
|
|||
}
|
||||
}
|
||||
|
||||
/** Provides utility functions to be used inside SparkSubmit. */
|
||||
private[spark] object SparkSubmitUtils {
|
||||
|
||||
// Exposed for testing
|
||||
private[spark] var printStream = SparkSubmit.printStream
|
||||
|
||||
/**
 * Represents a Maven Coordinate
 * @param groupId the groupId of the coordinate
 * @param artifactId the artifactId of the coordinate
 * @param version the version of the coordinate
 */
private[spark] case class MavenCoordinate(groupId: String, artifactId: String, version: String)

/**
 * Extracts maven coordinates from a comma-delimited string.
 *
 * Every entry must consist of exactly three non-blank, colon-separated parts in the form
 * 'groupId:artifactId:version'. Whitespace surrounding each part is dropped.
 *
 * @param coordinates Comma-delimited string of maven coordinates
 * @return Sequence of Maven coordinates
 * @throws IllegalArgumentException if an entry does not have exactly three parts or if any
 *                                  part is blank
 */
private[spark] def extractMavenCoordinates(coordinates: String): Seq[MavenCoordinate] = {
  coordinates.split(",").map { p =>
    // A negative limit keeps trailing empty strings, so an entry such as "a:b:c:" is rejected
    // instead of being silently accepted as "a:b:c".
    val splits = p.split(":", -1).map(_.trim)
    require(splits.length == 3, s"Provided Maven Coordinates must be in the form " +
      s"'groupId:artifactId:version'. The coordinate provided is: $p")
    val Array(groupId, artifactId, version) = splits
    require(groupId.nonEmpty, s"The groupId cannot be null or " +
      s"be whitespace. The groupId provided is: $groupId")
    require(artifactId.nonEmpty, s"The artifactId cannot be null or " +
      s"be whitespace. The artifactId provided is: $artifactId")
    require(version.nonEmpty, s"The version cannot be null or " +
      s"be whitespace. The version provided is: $version")
    MavenCoordinate(groupId, artifactId, version)
  }.toSeq
}
|
||||
|
||||
/**
 * Builds the chain of resolvers that Ivy uses to search for and resolve dependencies.
 *
 * The chain always starts with a resolver named "central" (left at the default root of
 * IBiblioResolver). One additional resolver named "repo-N" is appended for every non-blank
 * entry of `remoteRepos`, in the order given.
 *
 * @param remoteRepos Comma-delimited string of remote repositories
 * @return A ChainResolver used by Ivy to search for and resolve dependencies.
 */
private[spark] def createRepoResolvers(remoteRepos: Option[String]): ChainResolver = {
  // We need a chain resolver if we want to check multiple repositories
  val cr = new ChainResolver
  cr.setName("list")

  // the biblio resolver resolves POM declared dependencies
  val br: IBiblioResolver = new IBiblioResolver
  br.setM2compatible(true)
  br.setUsepoms(true)
  br.setName("central")
  cr.add(br)

  // Drop blank entries and surrounding whitespace (e.g. "a,,b" or "a, b") so that no resolver
  // is created with an empty or padded root.
  val repositories = remoteRepos.getOrElse("").split(",").map(_.trim).filter(_.nonEmpty)
  // add any other remote repositories other than maven central
  repositories.zipWithIndex.foreach { case (repo, i) =>
    val brr: IBiblioResolver = new IBiblioResolver
    brr.setM2compatible(true)
    brr.setUsepoms(true)
    brr.setRoot(repo)
    brr.setName(s"repo-${i + 1}")
    cr.add(brr)
    printStream.println(s"$repo added as a remote repository with the name: ${brr.getName}")
  }
  cr
}
|
||||
|
||||
/**
 * Output a comma-delimited list of paths for the downloaded jars to be added to the classpath
 * (will append to jars in SparkSubmit). The name of the jar is given
 * after a '!' by Ivy. It also sometimes contains '(bundle)' after '.jar'. Remove that as well.
 *
 * If the name contains no '.jar' at all, only a trailing parenthesized marker is removed and
 * the rest of the name is kept unchanged.
 *
 * @param artifacts Sequence of dependencies that were resolved and retrieved
 * @param cacheDirectory directory where jars are cached
 * @return a comma-delimited list of paths for the dependencies
 */
private[spark] def resolveDependencyPaths(
    artifacts: Array[AnyRef],
    cacheDirectory: File): String = {
  val jarExtension = ".jar"
  artifacts.map { artifactInfo =>
    val artifactString = artifactInfo.toString
    // Everything after the last '!' is the file name (plus an optional "(type)" suffix).
    val rawName = artifactString.drop(artifactString.lastIndexOf("!") + 1)
    val jarEnd = rawName.lastIndexOf(jarExtension)
    val fileName =
      if (jarEnd >= 0) {
        rawName.substring(0, jarEnd + jarExtension.length)
      } else {
        // Without this guard lastIndexOf returns -1 and the name would be cut down to its
        // first three characters (or substring would throw for shorter names).
        rawName.replaceAll("\\([^()]*\\)$", "")
      }
    cacheDirectory.getAbsolutePath + File.separator + fileName
  }.mkString(",")
}
|
||||
|
||||
/**
 * Registers each of the supplied maven coordinates as a dependency of the given module
 * descriptor, mapping the given Ivy configuration onto itself, and reports every addition
 * on the print stream.
 */
private[spark] def addDependenciesToIvy(
    md: DefaultModuleDescriptor,
    artifacts: Seq[MavenCoordinate],
    ivyConfName: String): Unit = {
  for (coordinate <- artifacts) {
    val revisionId =
      ModuleRevisionId.newInstance(coordinate.groupId, coordinate.artifactId, coordinate.version)
    val dependency = new DefaultDependencyDescriptor(revisionId, false, false)
    dependency.addDependencyConfiguration(ivyConfName, ivyConfName)
    printStream.println(s"${dependency.getDependencyId} added as a dependency")
    md.addDependency(dependency)
  }
}
|
||||
|
||||
/**
 * Creates the placeholder module descriptor that the requested packages are attached to.
 * The organisation, name and revision are dummy strings. Convenient to use in tests as well.
 */
private[spark] def getModuleDescriptor: DefaultModuleDescriptor = {
  val placeholderId =
    ModuleRevisionId.newInstance("org.apache.spark", "spark-submit-parent", "1.0")
  DefaultModuleDescriptor.newDefaultInstance(placeholderId)
}
|
||||
|
||||
/**
 * Resolves and retrieves the jars for dependencies given as maven coordinates.
 *
 * Returns an empty string when no coordinates are supplied. Otherwise the coordinates are
 * parsed, resolved through Ivy (excluding everything under the org.apache.spark organisation)
 * and retrieved into a "jars" directory, located either under `ivyPath` or under Ivy's default
 * user directory.
 *
 * @param coordinates Comma-delimited string of maven coordinates
 * @param remoteRepos Comma-delimited string of remote repositories other than maven central
 * @param ivyPath The path to the local ivy repository
 * @param isTest When true, downloading is switched off and Ivy logging is set to quiet
 * @return The comma-delimited path to the jars of the given maven artifacts including their
 *         transitive dependencies
 * @throws RuntimeException if the Ivy resolve report contains errors
 */
private[spark] def resolveMavenCoordinates(
    coordinates: String,
    remoteRepos: Option[String],
    ivyPath: Option[String],
    isTest: Boolean = false): String = {
  val nothingRequested = coordinates == null || coordinates.trim.isEmpty
  if (nothingRequested) {
    ""
  } else {
    val requested = extractMavenCoordinates(coordinates)
    // Name of the Ivy configuration used throughout
    val confName = "default"
    val settings: IvySettings = new IvySettings

    // Pick the directory that receives the retrieved jars; a custom ivy path also moves
    // the download cache next to it.
    val jarsDirectory: File = ivyPath.filter(_.trim.nonEmpty) match {
      case Some(customIvyDir) =>
        settings.setDefaultCache(new File(customIvyDir, "cache"))
        new File(customIvyDir, "jars")
      case None =>
        new File(settings.getDefaultIvyUserDir, "jars")
    }
    printStream.println(
      s"Ivy Default Cache set to: ${settings.getDefaultCache.getAbsolutePath}")
    printStream.println(s"The jars for the packages stored in: $jarsDirectory")

    // The glob matcher is looked up by name further down for the exclusion rule
    settings.addMatcher(new GlobPatternMatcher)
    val resolverChain = createRepoResolvers(remoteRepos)
    settings.addResolver(resolverChain)
    settings.setDefaultResolver(resolverChain.getName)

    val ivy = Ivy.newInstance(settings)

    // Transitive dependencies are resolved too; tests neither download nor log
    val resolveOptions = new ResolveOptions
    resolveOptions.setTransitive(true)
    val retrieveOptions = new RetrieveOptions
    resolveOptions.setDownload(!isTest)
    if (isTest) {
      resolveOptions.setLog(LogOptions.LOG_QUIET)
      retrieveOptions.setLog(LogOptions.LOG_QUIET)
    }

    // Ivy requires a module descriptor; its identifying entries are dummy strings
    val descriptor = getModuleDescriptor
    descriptor.setDefaultConf(confName)

    // Keep Spark's own artifacts out of the resolved set
    val sparkModules = new ModuleId("org.apache.spark", "*")
    val sparkArtifactPattern = new ArtifactId(sparkModules, "*", "*", "*")
    val excludeSpark =
      new DefaultExcludeRule(sparkArtifactPattern, settings.getMatcher("glob"), null)
    excludeSpark.addConfiguration(confName)
    descriptor.addExcludeRule(excludeSpark)

    addDependenciesToIvy(descriptor, requested, confName)

    val report: ResolveReport = ivy.resolve(descriptor, resolveOptions)
    if (report.hasError) {
      throw new RuntimeException(report.getAllProblemMessages.toString)
    }

    // Copy every resolved artifact into the jars directory
    val retrievePattern =
      jarsDirectory.getAbsolutePath + File.separator + "[artifact](-[classifier]).[ext]"
    ivy.retrieve(
      report.getModuleDescriptor.getModuleRevisionId,
      retrievePattern,
      retrieveOptions.setConfs(Array(confName)))

    resolveDependencyPaths(report.getArtifacts.toArray, jarsDirectory)
  }
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Provides an indirection layer for passing arguments as system properties or flags to
|
||||
* the user's driver program or to downstream launcher tools.
|
||||
|
|
|
@ -50,6 +50,9 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
|
|||
var name: String = null
|
||||
var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
|
||||
var jars: String = null
|
||||
var packages: String = null
|
||||
var repositories: String = null
|
||||
var ivyRepoPath: String = null
|
||||
var verbose: Boolean = false
|
||||
var isPython: Boolean = false
|
||||
var pyFiles: String = null
|
||||
|
@ -123,6 +126,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
|
|||
.orNull
|
||||
name = Option(name).orElse(sparkProperties.get("spark.app.name")).orNull
|
||||
jars = Option(jars).orElse(sparkProperties.get("spark.jars")).orNull
|
||||
ivyRepoPath = sparkProperties.get("spark.jars.ivy").orNull
|
||||
deployMode = Option(deployMode).orElse(env.get("DEPLOY_MODE")).orNull
|
||||
numExecutors = Option(numExecutors)
|
||||
.getOrElse(sparkProperties.get("spark.executor.instances").orNull)
|
||||
|
@ -212,6 +216,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
|
|||
| name $name
|
||||
| childArgs [${childArgs.mkString(" ")}]
|
||||
| jars $jars
|
||||
| packages $packages
|
||||
| repositories $repositories
|
||||
| verbose $verbose
|
||||
|
|
||||
|Spark properties used, including those specified through
|
||||
|
@ -318,6 +324,14 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
|
|||
jars = Utils.resolveURIs(value)
|
||||
parse(tail)
|
||||
|
||||
case ("--packages") :: value :: tail =>
|
||||
packages = value
|
||||
parse(tail)
|
||||
|
||||
case ("--repositories") :: value :: tail =>
|
||||
repositories = value
|
||||
parse(tail)
|
||||
|
||||
case ("--conf" | "-c") :: value :: tail =>
|
||||
value.split("=", 2).toSeq match {
|
||||
case Seq(k, v) => sparkProperties(k) = v
|
||||
|
@ -368,6 +382,13 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
|
|||
| --name NAME A name of your application.
|
||||
| --jars JARS Comma-separated list of local jars to include on the driver
|
||||
| and executor classpaths.
|
||||
| --packages Comma-separated list of maven coordinates of jars to include
|
||||
| on the driver and executor classpaths. Will search the local
|
||||
| maven repo, then maven central and any additional remote
|
||||
| repositories given by --repositories. The format for the
|
||||
| coordinates should be groupId:artifactId:version.
|
||||
| --repositories Comma-separated list of additional remote repositories to
|
||||
| search for the maven coordinates given with --packages.
|
||||
| --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to place
|
||||
| on the PYTHONPATH for Python apps.
|
||||
| --files FILES Comma-separated list of files to be placed in the working
|
||||
|
|
|
@ -307,7 +307,21 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
|
|||
"--name", "testApp",
|
||||
"--master", "local-cluster[2,1,512]",
|
||||
"--jars", jarsString,
|
||||
unusedJar.toString)
|
||||
unusedJar.toString, "SparkSubmitClassA", "SparkSubmitClassB")
|
||||
runSparkSubmit(args)
|
||||
}
|
||||
|
||||
// Submits JarCreationTest with two packages given as maven coordinates; the two trailing
// application arguments are the class names passed on to the submitted application.
test("includes jars passed in through --packages") {
  val emptyJar = TestUtils.createJarWithClasses(Seq.empty)
  val mainClass = JarCreationTest.getClass.getName.stripSuffix("$")
  val coordinates = "com.databricks:spark-csv_2.10:0.1,com.databricks:spark-avro_2.10:0.1"
  val submitOptions = Seq(
    "--class", mainClass,
    "--name", "testApp",
    "--master", "local-cluster[2,1,512]",
    "--packages", coordinates,
    "--conf", "spark.ui.enabled=false")
  val applicationArgs = Seq(
    "com.databricks.spark.csv.DefaultSource",
    "com.databricks.spark.avro.DefaultSource")
  runSparkSubmit((submitOptions :+ emptyJar.toString) ++ applicationArgs)
}
|
||||
|
||||
|
@ -467,8 +481,8 @@ object JarCreationTest extends Logging {
|
|||
val result = sc.makeRDD(1 to 100, 10).mapPartitions { x =>
|
||||
var exception: String = null
|
||||
try {
|
||||
Class.forName("SparkSubmitClassA", true, Thread.currentThread().getContextClassLoader)
|
||||
Class.forName("SparkSubmitClassA", true, Thread.currentThread().getContextClassLoader)
|
||||
Class.forName(args(0), true, Thread.currentThread().getContextClassLoader)
|
||||
Class.forName(args(1), true, Thread.currentThread().getContextClassLoader)
|
||||
} catch {
|
||||
case t: Throwable =>
|
||||
exception = t + "\n" + t.getStackTraceString
|
||||
|
|
|
@ -0,0 +1,121 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark.deploy
|
||||
|
||||
import java.io.{PrintStream, OutputStream, File}
|
||||
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
|
||||
import org.scalatest.{BeforeAndAfterAll, FunSuite}
|
||||
|
||||
import org.apache.ivy.core.module.descriptor.MDArtifact
|
||||
import org.apache.ivy.plugins.resolver.IBiblioResolver
|
||||
|
||||
/** Tests for the maven coordinate handling in [[SparkSubmitUtils]]. */
class SparkSubmitUtilsSuite extends FunSuite with BeforeAndAfterAll {

  /** Output stream that discards everything written to it. */
  private val noOpOutputStream = new OutputStream {
    override def write(b: Int): Unit = {}
  }

  /** Simple PrintStream that reads data into a buffer */
  private class BufferPrintStream extends PrintStream(noOpOutputStream) {
    var lineBuffer = ArrayBuffer[String]()
    override def println(line: String): Unit = {
      lineBuffer += line
    }
  }

  override def beforeAll(): Unit = {
    super.beforeAll()
    // We don't want to write logs during testing
    SparkSubmitUtils.printStream = new BufferPrintStream
  }

  test("incorrect maven coordinate throws error") {
    val coordinates = Seq("a:b: ", " :a:b", "a: :b", "a:b:", ":a:b", "a::b", "::", "a:b", "a")
    for (coordinate <- coordinates) {
      intercept[IllegalArgumentException] {
        SparkSubmitUtils.extractMavenCoordinates(coordinate)
      }
    }
  }

  test("create repo resolvers") {
    val resolver1 = SparkSubmitUtils.createRepoResolvers(None)
    // should have central by default
    assert(resolver1.getResolvers.size() === 1)
    assert(resolver1.getResolvers.get(0).asInstanceOf[IBiblioResolver].getName === "central")

    val repos = "a/1,b/2,c/3"
    val resolver2 = SparkSubmitUtils.createRepoResolvers(Option(repos))
    assert(resolver2.getResolvers.size() === 4)
    // the resolver root is expected to carry a trailing slash
    val expected = repos.split(",").map(r => s"$r/")
    resolver2.getResolvers.toArray.zipWithIndex.foreach { case (resolver: IBiblioResolver, i) =>
      if (i == 0) {
        assert(resolver.getName === "central")
      } else {
        assert(resolver.getName === s"repo-$i")
        assert(resolver.getRoot === expected(i - 1))
      }
    }
  }

  test("add dependencies works correctly") {
    val md = SparkSubmitUtils.getModuleDescriptor
    val artifacts = SparkSubmitUtils.extractMavenCoordinates("com.databricks:spark-csv_2.10:0.1," +
      "com.databricks:spark-avro_2.10:0.1")

    SparkSubmitUtils.addDependenciesToIvy(md, artifacts, "default")
    assert(md.getDependencies.length === 2)
  }

  test("ivy path works correctly") {
    val ivyPath = "dummy/ivy"
    val md = SparkSubmitUtils.getModuleDescriptor
    val artifacts = for (i <- 0 until 3) yield new MDArtifact(md, s"jar-$i", "jar", "jar")
    var jPaths = SparkSubmitUtils.resolveDependencyPaths(artifacts.toArray, new File(ivyPath))
    // every one of the three jar paths must contain the custom ivy path
    for (i <- 0 until 3) {
      val index = jPaths.indexOf(ivyPath)
      assert(index >= 0)
      jPaths = jPaths.substring(index + ivyPath.length)
    }
    // end to end
    val jarPath = SparkSubmitUtils.resolveMavenCoordinates(
      "com.databricks:spark-csv_2.10:0.1", None, Option(ivyPath), true)
    assert(jarPath.indexOf(ivyPath) >= 0, "should use non-default ivy path")
  }

  test("search for artifact at other repositories") {
    val path = SparkSubmitUtils.resolveMavenCoordinates("com.agimatec:agimatec-validation:0.9.3",
      Option("https://oss.sonatype.org/content/repositories/agimatec/"), None, true)
    // Note the trailing space before the line break: without it the message read "checkif".
    assert(path.indexOf("agimatec-validation") >= 0, "should find package. If it doesn't, check " +
      "if package still exists. If it has been removed, replace the example in this test.")
  }

  test("dependency not found throws RuntimeException") {
    intercept[RuntimeException] {
      SparkSubmitUtils.resolveMavenCoordinates("a:b:c", None, None, true)
    }
  }

  test("neglects Spark and Spark's dependencies") {
    val path = SparkSubmitUtils.resolveMavenCoordinates(
      "org.apache.spark:spark-core_2.10:1.2.0", None, None, true)
    assert(path === "", "should return empty path")
  }
}
|
2
pom.xml
2
pom.xml
|
@ -136,6 +136,8 @@
|
|||
<jblas.version>1.2.3</jblas.version>
|
||||
<jetty.version>8.1.14.v20131031</jetty.version>
|
||||
<chill.version>0.5.0</chill.version>
|
||||
<ivy.version>2.4.0</ivy.version>
|
||||
<oro.version>2.0.8</oro.version>
|
||||
<codahale.metrics.version>3.1.0</codahale.metrics.version>
|
||||
<avro.version>1.7.6</avro.version>
|
||||
<avro.mapred.classifier></avro.mapred.classifier>
|
||||
|
|
Loading…
Reference in a new issue