SPARK-1496: Have jarOfClass return Option[String]
A simple change, mostly had to change a bunch of example code.

Author: Patrick Wendell <pwendell@gmail.com>

Closes #438 from pwendell/jar-of-class and squashes the following commits:

aa010ff [Patrick Wendell] SPARK-1496: Have jarOfClass return Option[String]
parent ac164b79d1
commit 83084d3b7b
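At its core, the change narrows the return type of the JAR-lookup helpers: the old Seq[String] only ever held zero or one element, which Option[String] models directly. The signatures touched by this commit:

```scala
// Before: a Seq[String] that was always empty or single-element.
def jarOfClass(cls: Class[_]): Seq[String]
def jarOfObject(obj: AnyRef): Seq[String]

// After: the zero-or-one shape is stated in the type.
def jarOfClass(cls: Class[_]): Option[String]
def jarOfObject(obj: AnyRef): Option[String]
```

Call sites that still need a Seq[String] (the jars argument of the SparkContext and StreamingContext constructors, and SparkConf.setJars) bridge back with .toSeq, as every example update below shows.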
@@ -1346,19 +1346,19 @@ object SparkContext extends Logging {
   /**
    * Find the JAR from which a given class was loaded, to make it easy for users to pass
    * their JARs to SparkContext.
    */
-  def jarOfClass(cls: Class[_]): Seq[String] = {
+  def jarOfClass(cls: Class[_]): Option[String] = {
     val uri = cls.getResource("/" + cls.getName.replace('.', '/') + ".class")
     if (uri != null) {
       val uriStr = uri.toString
       if (uriStr.startsWith("jar:file:")) {
         // URI will be of the form "jar:file:/path/foo.jar!/package/cls.class",
         // so pull out the /path/foo.jar
-        List(uriStr.substring("jar:file:".length, uriStr.indexOf('!')))
+        Some(uriStr.substring("jar:file:".length, uriStr.indexOf('!')))
       } else {
-        Nil
+        None
       }
     } else {
-      Nil
+      None
     }
   }
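The lookup itself is unchanged; only the wrapping moves from List/Nil to Some/None. As a rough standalone sketch of what the method does (JarOfClassDemo and jarOf are hypothetical names for illustration, not Spark API), the same classloader-resource trick can be exercised outside Spark:

```scala
// Hypothetical standalone demo of the lookup above: ask the classloader
// where a class's .class file lives, and keep the JAR path only when the
// class was loaded from a "jar:file:" URL.
object JarOfClassDemo {
  def jarOf(cls: Class[_]): Option[String] = {
    val uri = cls.getResource("/" + cls.getName.replace('.', '/') + ".class")
    Option(uri).map(_.toString).collect {
      // "jar:file:/path/foo.jar!/package/Cls.class" -> "/path/foo.jar"
      case s if s.startsWith("jar:file:") =>
        s.substring("jar:file:".length, s.indexOf('!'))
    }
  }

  def main(args: Array[String]): Unit = {
    println(jarOf(classOf[scala.collection.immutable.List[_]])) // usually Some(.../scala-library-*.jar)
    println(jarOf(getClass))                                    // None when run from loose .class files
  }
}
```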
@@ -1367,7 +1367,7 @@ object SparkContext extends Logging {
    * to pass their JARs to SparkContext. In most cases you can call jarOfObject(this) in
    * your driver program.
    */
-  def jarOfObject(obj: AnyRef): Seq[String] = jarOfClass(obj.getClass)
+  def jarOfObject(obj: AnyRef): Option[String] = jarOfClass(obj.getClass)
 
   /**
    * Creates a modified version of a SparkConf with the parameters that can be passed separately
@@ -35,7 +35,7 @@ object BroadcastTest {
     System.setProperty("spark.broadcast.blockSize", blockSize)
 
     val sc = new SparkContext(args(0), "Broadcast Test",
-      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
+      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass).toSeq)
 
     val slices = if (args.length > 1) args(1).toInt else 2
     val num = if (args.length > 2) args(2).toInt else 1000000
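The rest of the commit repeats this same mechanical edit across the examples: the constructors still take Seq[String], so the new Option[String] is bridged with .toSeq. A small sketch (ToSeqDemo is a hypothetical name) of why that conversion is behavior-preserving:

```scala
// Option.toSeq is the whole call-site migration: None becomes an empty
// Seq and Some(x) becomes a one-element Seq, exactly the two values the
// old Seq[String]-returning jarOfClass could produce.
object ToSeqDemo {
  def main(args: Array[String]): Unit = {
    val found: Option[String]   = Some("/path/app.jar")
    val missing: Option[String] = None

    println(found.toSeq)   // List(/path/app.jar)
    println(missing.toSeq) // List()
  }
}
```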
@@ -75,7 +75,7 @@ object CassandraCQLTest {
     val sc = new SparkContext(args(0),
         "CQLTestApp",
         System.getenv("SPARK_HOME"),
-        SparkContext.jarOfClass(this.getClass))
+        SparkContext.jarOfClass(this.getClass).toSeq)
     val cHost: String = args(1)
     val cPort: String = args(2)
     val KeySpace = "retail"

@@ -27,7 +27,7 @@ object ExceptionHandlingTest {
     }
 
     val sc = new SparkContext(args(0), "ExceptionHandlingTest",
-      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
+      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass).toSeq)
     sc.parallelize(0 until sc.defaultParallelism).foreach { i =>
       if (math.random > 0.75) {
         throw new Exception("Testing exception handling")

@@ -36,7 +36,7 @@ object GroupByTest {
     var numReducers = if (args.length > 4) args(4).toInt else numMappers
 
     val sc = new SparkContext(args(0), "GroupBy Test",
-      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
+      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass).toSeq)
 
     val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
       val ranGen = new Random

@@ -27,7 +27,7 @@ import org.apache.spark.rdd.NewHadoopRDD
 object HBaseTest {
   def main(args: Array[String]) {
     val sc = new SparkContext(args(0), "HBaseTest",
-      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
+      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass).toSeq)
 
     val conf = HBaseConfiguration.create()
 
@@ -22,7 +22,7 @@ import org.apache.spark._
 object HdfsTest {
   def main(args: Array[String]) {
     val sc = new SparkContext(args(0), "HdfsTest",
-      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
+      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass).toSeq)
     val file = sc.textFile(args(1))
     val mapped = file.map(s => s.length).cache()
     for (iter <- 1 to 10) {

@@ -46,7 +46,7 @@ object LogQuery {
     }
 
     val sc = new SparkContext(args(0), "Log Query",
-      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
+      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass).toSeq)
 
     val dataSet =
       if (args.length == 2) sc.textFile(args(1))

@@ -28,7 +28,7 @@ object MultiBroadcastTest {
     }
 
     val sc = new SparkContext(args(0), "Multi-Broadcast Test",
-      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
+      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass).toSeq)
 
     val slices = if (args.length > 1) args(1).toInt else 2
     val num = if (args.length > 2) args(2).toInt else 1000000

@@ -37,7 +37,7 @@ object SimpleSkewedGroupByTest {
     var ratio = if (args.length > 5) args(5).toInt else 5.0
 
     val sc = new SparkContext(args(0), "GroupBy Test",
-      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
+      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass).toSeq)
 
     val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
       val ranGen = new Random

@@ -36,7 +36,7 @@ object SkewedGroupByTest {
     var numReducers = if (args.length > 4) args(4).toInt else numMappers
 
     val sc = new SparkContext(args(0), "GroupBy Test",
-      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
+      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass).toSeq)
 
     val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
       val ranGen = new Random
@@ -113,7 +113,7 @@ object SparkALS {
     printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS)
 
     val sc = new SparkContext(host, "SparkALS",
-      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
+      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass).toSeq)
 
     val R = generateR()
 

@@ -56,7 +56,7 @@ object SparkHdfsLR {
     val inputPath = args(1)
     val conf = SparkHadoopUtil.get.newConfiguration()
     val sc = new SparkContext(args(0), "SparkHdfsLR",
-      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass), Map(),
+      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass).toSeq, Map(),
       InputFormatInfo.computePreferredLocations(
         Seq(new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat], inputPath))
       ))

@@ -57,7 +57,7 @@ object SparkKMeans {
       System.exit(1)
     }
     val sc = new SparkContext(args(0), "SparkLocalKMeans",
-      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
+      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass).toSeq)
     val lines = sc.textFile(args(1))
     val data = lines.map(parseVector _).cache()
     val K = args(2).toInt

@@ -52,7 +52,7 @@ object SparkLR {
       System.exit(1)
     }
     val sc = new SparkContext(args(0), "SparkLR",
-      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
+      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass).toSeq)
     val numSlices = if (args.length > 1) args(1).toInt else 2
     val points = sc.parallelize(generateData, numSlices).cache()
 
@@ -37,7 +37,7 @@ object SparkPageRank {
     }
     var iters = args(2).toInt
     val ctx = new SparkContext(args(0), "PageRank",
-      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
+      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass).toSeq)
     val lines = ctx.textFile(args(1), 1)
     val links = lines.map{ s =>
       val parts = s.split("\\s+")

@@ -29,7 +29,7 @@ object SparkPi {
       System.exit(1)
     }
     val spark = new SparkContext(args(0), "SparkPi",
-      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
+      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass).toSeq)
     val slices = if (args.length > 1) args(1).toInt else 2
     val n = 100000 * slices
     val count = spark.parallelize(1 to n, slices).map { i =>

@@ -47,7 +47,7 @@ object SparkTC {
       System.exit(1)
     }
     val spark = new SparkContext(args(0), "SparkTC",
-      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
+      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass).toSeq)
     val slices = if (args.length > 1) args(1).toInt else 2
     var tc = spark.parallelize(generateGraph, slices).cache()
 

@@ -58,7 +58,7 @@ object SparkTachyonHdfsLR {
     val inputPath = args(1)
     val conf = SparkHadoopUtil.get.newConfiguration()
     val sc = new SparkContext(args(0), "SparkTachyonHdfsLR",
-      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass), Map(),
+      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass).toSeq, Map(),
       InputFormatInfo.computePreferredLocations(
         Seq(new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat], inputPath))
       ))

@@ -33,7 +33,7 @@ object SparkTachyonPi {
       System.exit(1)
     }
     val spark = new SparkContext(args(0), "SparkTachyonPi",
-      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
+      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass).toSeq)
 
     val slices = if (args.length > 1) args(1).toInt else 2
     val n = 100000 * slices
@@ -44,7 +44,7 @@ object TallSkinnyPCA {
       .setMaster(args(0))
       .setAppName("TallSkinnyPCA")
       .setSparkHome(System.getenv("SPARK_HOME"))
-      .setJars(SparkContext.jarOfClass(this.getClass))
+      .setJars(SparkContext.jarOfClass(this.getClass).toSeq)
     val sc = new SparkContext(conf)
 
     // Load and parse the data file.

@@ -44,7 +44,7 @@ object TallSkinnySVD {
       .setMaster(args(0))
       .setAppName("TallSkinnySVD")
      .setSparkHome(System.getenv("SPARK_HOME"))
-      .setJars(SparkContext.jarOfClass(this.getClass))
+      .setJars(SparkContext.jarOfClass(this.getClass).toSeq)
     val sc = new SparkContext(conf)
 
     // Load and parse the data file.
@@ -150,7 +150,7 @@ object ActorWordCount {
 
     // Create the context and set the batch size
     val ssc = new StreamingContext(master, "ActorWordCount", Seconds(2),
-      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
+      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq)
 
     /*
      * Following is the use of actorStream to plug in custom actor as receiver

@@ -51,7 +51,7 @@ object FlumeEventCount {
     val batchInterval = Milliseconds(2000)
     // Create the context and set the batch size
     val ssc = new StreamingContext(master, "FlumeEventCount", batchInterval,
-      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
+      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq)
 
     // Create a flume stream
     val stream = FlumeUtils.createStream(ssc, host,port,StorageLevel.MEMORY_ONLY_SER_2)

@@ -41,7 +41,7 @@ object HdfsWordCount {
 
     // Create the context
     val ssc = new StreamingContext(args(0), "HdfsWordCount", Seconds(2),
-      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
+      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq)
 
     // Create the FileInputDStream on the directory and use the
     // stream to count words in new files created

@@ -52,7 +52,7 @@ object KafkaWordCount {
     val Array(master, zkQuorum, group, topics, numThreads) = args
 
     val ssc = new StreamingContext(master, "KafkaWordCount", Seconds(2),
-      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
+      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq)
     ssc.checkpoint("checkpoint")
 
     val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap

@@ -97,7 +97,7 @@ object MQTTWordCount {
     val Seq(master, brokerUrl, topic) = args.toSeq
 
     val ssc = new StreamingContext(master, "MqttWordCount", Seconds(2), System.getenv("SPARK_HOME"),
-      StreamingContext.jarOfClass(this.getClass))
+      StreamingContext.jarOfClass(this.getClass).toSeq)
     val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_ONLY_SER_2)
 
     val words = lines.flatMap(x => x.toString.split(" "))
@@ -47,7 +47,7 @@ object NetworkWordCount {
 
     // Create the context with a 1 second batch size
     val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1),
-      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
+      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq)
 
     // Create a NetworkInputDStream on target ip:port and count the
     // words in input stream of \n delimited text (eg. generated by 'nc')

@@ -35,7 +35,7 @@ object QueueStream {
 
     // Create the context
     val ssc = new StreamingContext(args(0), "QueueStream", Seconds(1),
-      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
+      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq)
 
     // Create the queue through which RDDs can be pushed to
     // a QueueInputDStream

@@ -50,7 +50,7 @@ object RawNetworkGrep {
 
     // Create the context
     val ssc = new StreamingContext(master, "RawNetworkGrep", Milliseconds(batchMillis),
-      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
+      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq)
 
     // Warm up the JVMs on master and slave for JIT compilation to kick in
     RawTextHelper.warmUp(ssc.sparkContext)

@@ -77,7 +77,7 @@ object RecoverableNetworkWordCount {
 
     // Create the context with a 1 second batch size
     val ssc = new StreamingContext(master, "RecoverableNetworkWordCount", Seconds(1),
-      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
+      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq)
 
     // Create a NetworkInputDStream on target ip:port and count the
     // words in input stream of \n delimited text (eg. generated by 'nc')

@@ -54,7 +54,7 @@ object StatefulNetworkWordCount {
 
     // Create the context with a 1 second batch size
     val ssc = new StreamingContext(args(0), "NetworkWordCumulativeCountUpdateStateByKey",
-      Seconds(1), System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
+      Seconds(1), System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq)
     ssc.checkpoint(".")
 
     // Create a NetworkInputDStream on target ip:port and count the
@@ -68,7 +68,7 @@ object TwitterAlgebirdCMS {
     val (master, filters) = (args.head, args.tail)
 
     val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10),
-      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
+      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq)
     val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER_2)
 
     val users = stream.map(status => status.getUser.getId)

@@ -55,7 +55,7 @@ object TwitterAlgebirdHLL {
     val (master, filters) = (args.head, args.tail)
 
     val ssc = new StreamingContext(master, "TwitterAlgebirdHLL", Seconds(5),
-      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
+      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq)
     val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER)
 
     val users = stream.map(status => status.getUser.getId)

@@ -41,7 +41,7 @@ object TwitterPopularTags {
     val (master, filters) = (args.head, args.tail)
 
     val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2),
-      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
+      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq)
     val stream = TwitterUtils.createStream(ssc, None, filters)
 
     val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))

@@ -86,7 +86,7 @@ object ZeroMQWordCount {
 
     // Create the context and set the batch size
     val ssc = new StreamingContext(master, "ZeroMQWordCount", Seconds(2),
-      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
+      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq)
 
     def bytesToStringIterator(x: Seq[ByteString]) = (x.map(_.utf8String)).iterator
 
@@ -45,7 +45,7 @@ object PageViewStream {
 
     // Create the context
     val ssc = new StreamingContext("local[2]", "PageViewStream", Seconds(1),
-      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
+      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq)
 
     // Create a NetworkInputDStream on target host:port and convert each line to a PageView
     val pageViews = ssc.socketTextStream(host, port)
@@ -543,7 +543,7 @@ object StreamingContext extends Logging {
    * Find the JAR from which a given class was loaded, to make it easy for users to pass
    * their JARs to StreamingContext.
    */
-  def jarOfClass(cls: Class[_]): Seq[String] = SparkContext.jarOfClass(cls)
+  def jarOfClass(cls: Class[_]): Option[String] = SparkContext.jarOfClass(cls)
 
   private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
     // Set the default cleaner delay to an hour if not already set.