Merge branch 'master' into MatrixFactorizationModel-fix

This commit is contained in:
Hossein Falaki 2014-01-07 15:22:42 -08:00
commit 3a8beb46cb
170 changed files with 1342 additions and 1931 deletions

3
.gitignore vendored
View file

@ -1,7 +1,10 @@
*~
*.swp
*.ipr
*.iml
*.iws
.idea/
sbt/*.jar
.settings
.cache
/build/

View file

@ -13,20 +13,22 @@ This README file only contains basic setup instructions.
## Building
Spark requires Scala 2.10. The project is built using Simple Build Tool (SBT),
which is packaged with it. To build Spark and its example programs, run:
which can be obtained [here](http://www.scala-sbt.org). If SBT is installed we
will use the system version of sbt otherwise we will attempt to download it
automatically. To build Spark and its example programs, run:
sbt/sbt assembly
./sbt/sbt assembly
Once you've built Spark, the easiest way to start using it is the shell:
./spark-shell
./bin/spark-shell
Or, for the Python API, the Python shell (`./pyspark`).
Or, for the Python API, the Python shell (`./bin/pyspark`).
Spark also comes with several sample programs in the `examples` directory.
To run one of them, use `./run-example <class> <params>`. For example:
To run one of them, use `./bin/run-example <class> <params>`. For example:
./run-example org.apache.spark.examples.SparkLR local[2]
./bin/run-example org.apache.spark.examples.SparkLR local[2]
will run the Logistic Regression example locally on 2 CPUs.
@ -36,7 +38,13 @@ All of the Spark samples take a `<master>` parameter that is the cluster URL
to connect to. This can be a mesos:// or spark:// URL, or "local" to run
locally with one thread, or "local[N]" to run locally with N threads.
## Running tests
Testing first requires [Building](#Building) Spark. Once Spark is built, tests
can be run using:
`./sbt/sbt test`
## A Note About Hadoop Versions
Spark uses the Hadoop core library to talk to HDFS and other Hadoop-supported

View file

@ -1,27 +0,0 @@
Copyright (c) 2009-2011, Barthelemy Dagenais All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
- Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
- Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
- The name of the author may not be used to endorse or promote products
derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.

View file

@ -1 +0,0 @@
b7924aabe9c5e63f0a4d8bbd17019534c7ec014e

View file

@ -1,9 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<groupId>net.sf.py4j</groupId>
<artifactId>py4j</artifactId>
<version>0.7</version>
<description>POM was created from install:install-file</description>
</project>

View file

@ -1,12 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<metadata>
<groupId>net.sf.py4j</groupId>
<artifactId>py4j</artifactId>
<versioning>
<release>0.7</release>
<versions>
<version>0.7</version>
</versions>
<lastUpdated>20130828020333</lastUpdated>
</versioning>
</metadata>

View file

@ -67,7 +67,7 @@
<dependency>
<groupId>net.sf.py4j</groupId>
<artifactId>py4j</artifactId>
<version>0.7</version>
<version>0.8.1</version>
</dependency>
</dependencies>
@ -124,7 +124,17 @@
<profiles>
<profile>
<id>hadoop2-yarn</id>
<id>yarn-alpha</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-yarn-alpha_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>yarn</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>

View file

@ -39,23 +39,20 @@
</fileSet>
<fileSet>
<directory>
${project.parent.basedir}/bin/
${project.parent.basedir}/sbin/
</directory>
<outputDirectory>/bin</outputDirectory>
<outputDirectory>/sbin</outputDirectory>
<includes>
<include>**/*</include>
</includes>
</fileSet>
<fileSet>
<directory>
${project.parent.basedir}
${project.parent.basedir}/bin/
</directory>
<outputDirectory>/bin</outputDirectory>
<includes>
<include>run-example*</include>
<include>spark-class*</include>
<include>spark-shell*</include>
<include>spark-executor*</include>
<include>**/*</include>
</includes>
</fileSet>
</fileSets>

View file

@ -29,7 +29,7 @@ rem Load environment variables from conf\spark-env.cmd, if it exists
if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd"
rem Build up classpath
set CLASSPATH=%SPARK_CLASSPATH%;%FWDIR%conf
set CLASSPATH=%FWDIR%conf
if exist "%FWDIR%RELEASE" (
for %%d in ("%FWDIR%jars\spark-assembly*.jar") do (
set ASSEMBLY_JAR=%%d

View file

@ -26,7 +26,7 @@ SCALA_VERSION=2.10
FWDIR="$(cd `dirname $0`/..; pwd)"
# Load environment variables from conf/spark-env.sh, if it exists
if [ -e $FWDIR/conf/spark-env.sh ] ; then
if [ -e "$FWDIR/conf/spark-env.sh" ] ; then
. $FWDIR/conf/spark-env.sh
fi

View file

@ -18,7 +18,7 @@
#
# Figure out where the Scala framework is installed
FWDIR="$(cd `dirname $0`; pwd)"
FWDIR="$(cd `dirname $0`/..; pwd)"
# Export this as SPARK_HOME
export SPARK_HOME="$FWDIR"
@ -37,7 +37,7 @@ if [ ! -f "$FWDIR/RELEASE" ]; then
fi
# Load environment variables from conf/spark-env.sh, if it exists
if [ -e $FWDIR/conf/spark-env.sh ] ; then
if [ -e "$FWDIR/conf/spark-env.sh" ] ; then
. $FWDIR/conf/spark-env.sh
fi

View file

@ -20,7 +20,7 @@ rem
set SCALA_VERSION=2.10
rem Figure out where the Spark framework is installed
set FWDIR=%~dp0
set FWDIR=%~dp0..\
rem Export this as SPARK_HOME
set SPARK_HOME=%FWDIR%

View file

@ -25,13 +25,13 @@ esac
SCALA_VERSION=2.10
# Figure out where the Scala framework is installed
FWDIR="$(cd `dirname $0`; pwd)"
FWDIR="$(cd `dirname $0`/..; pwd)"
# Export this as SPARK_HOME
export SPARK_HOME="$FWDIR"
# Load environment variables from conf/spark-env.sh, if it exists
if [ -e $FWDIR/conf/spark-env.sh ] ; then
if [ -e "$FWDIR/conf/spark-env.sh" ] ; then
. $FWDIR/conf/spark-env.sh
fi

View file

@ -20,7 +20,7 @@ rem
set SCALA_VERSION=2.10
rem Figure out where the Spark framework is installed
set FWDIR=%~dp0
set FWDIR=%~dp0..\
rem Export this as SPARK_HOME
set SPARK_HOME=%FWDIR%
@ -49,7 +49,7 @@ if "x%SPARK_EXAMPLES_JAR%"=="x" (
rem Compute Spark classpath using external script
set DONT_PRINT_CLASSPATH=1
call "%FWDIR%bin\compute-classpath.cmd"
call "%FWDIR%sbin\compute-classpath.cmd"
set DONT_PRINT_CLASSPATH=0
set CLASSPATH=%SPARK_EXAMPLES_JAR%;%CLASSPATH%

View file

@ -25,13 +25,13 @@ esac
SCALA_VERSION=2.10
# Figure out where the Scala framework is installed
FWDIR="$(cd `dirname $0`; pwd)"
FWDIR="$(cd `dirname $0`/..; pwd)"
# Export this as SPARK_HOME
export SPARK_HOME="$FWDIR"
# Load environment variables from conf/spark-env.sh, if it exists
if [ -e $FWDIR/conf/spark-env.sh ] ; then
if [ -e "$FWDIR/conf/spark-env.sh" ] ; then
. $FWDIR/conf/spark-env.sh
fi
@ -92,7 +92,7 @@ JAVA_OPTS="$OUR_JAVA_OPTS"
JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH"
JAVA_OPTS="$JAVA_OPTS -Xms$SPARK_MEM -Xmx$SPARK_MEM"
# Load extra JAVA_OPTS from conf/java-opts, if it exists
if [ -e $FWDIR/conf/java-opts ] ; then
if [ -e "$FWDIR/conf/java-opts" ] ; then
JAVA_OPTS="$JAVA_OPTS `cat $FWDIR/conf/java-opts`"
fi
export JAVA_OPTS

View file

@ -20,7 +20,7 @@ rem
set SCALA_VERSION=2.10
rem Figure out where the Spark framework is installed
set FWDIR=%~dp0
set FWDIR=%~dp0..\
rem Export this as SPARK_HOME
set SPARK_HOME=%FWDIR%
@ -73,7 +73,7 @@ for %%d in ("%TOOLS_DIR%\target\scala-%SCALA_VERSION%\spark-tools*assembly*.jar"
rem Compute classpath using external script
set DONT_PRINT_CLASSPATH=1
call "%FWDIR%bin\compute-classpath.cmd"
call "%FWDIR%sbin\compute-classpath.cmd"
set DONT_PRINT_CLASSPATH=0
set CLASSPATH=%CLASSPATH%;%SPARK_TOOLS_JAR%

View file

@ -32,7 +32,7 @@ esac
# Enter posix mode for bash
set -o posix
FWDIR="`dirname $0`"
FWDIR="$(cd `dirname $0`/..; pwd)"
for o in "$@"; do
if [ "$1" = "-c" -o "$1" = "--cores" ]; then
@ -90,10 +90,10 @@ if $cygwin; then
# "Backspace sends ^H" setting in "Keys" section of the Mintty options
# (see https://github.com/sbt/sbt/issues/562).
stty -icanon min 1 -echo > /dev/null 2>&1
$FWDIR/spark-class -Djline.terminal=unix $OPTIONS org.apache.spark.repl.Main "$@"
$FWDIR/bin/spark-class -Djline.terminal=unix $OPTIONS org.apache.spark.repl.Main "$@"
stty icanon echo > /dev/null 2>&1
else
$FWDIR/spark-class $OPTIONS org.apache.spark.repl.Main "$@"
$FWDIR/bin/spark-class $OPTIONS org.apache.spark.repl.Main "$@"
fi
# record the exit status lest it be overwritten:

View file

@ -17,6 +17,7 @@ rem See the License for the specific language governing permissions and
rem limitations under the License.
rem
set FWDIR=%~dp0
rem Find the path of sbin
set SBIN=%~dp0..\sbin\
cmd /V /E /C %FWDIR%spark-class2.cmd org.apache.spark.repl.Main %*
cmd /V /E /C %SBIN%spark-class2.cmd org.apache.spark.repl.Main %*

View file

@ -18,4 +18,4 @@
# - SPARK_WORKER_MEMORY, to set how much memory to use (e.g. 1000m, 2g)
# - SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT
# - SPARK_WORKER_INSTANCES, to set the number of worker processes per node
# - SPARK_WORKER_DIR, to set the working directory of worker processes

View file

@ -31,7 +31,8 @@ import java.util.concurrent.TimeUnit;
class FileClient {
private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
private static final Logger LOG = LoggerFactory.getLogger(FileClient.class.getName());
private final FileClientHandler handler;
private Channel channel = null;
private Bootstrap bootstrap = null;
@ -39,7 +40,7 @@ class FileClient {
private final int connectTimeout;
private final int sendTimeout = 60; // 1 min
public FileClient(FileClientHandler handler, int connectTimeout) {
FileClient(FileClientHandler handler, int connectTimeout) {
this.handler = handler;
this.connectTimeout = connectTimeout;
}

View file

@ -25,7 +25,7 @@ class FileClientChannelInitializer extends ChannelInitializer<SocketChannel> {
private final FileClientHandler fhandler;
public FileClientChannelInitializer(FileClientHandler handler) {
FileClientChannelInitializer(FileClientHandler handler) {
fhandler = handler;
}

View file

@ -33,15 +33,14 @@ import org.slf4j.LoggerFactory;
*/
class FileServer {
private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
private static final Logger LOG = LoggerFactory.getLogger(FileServer.class.getName());
private EventLoopGroup bossGroup = null;
private EventLoopGroup workerGroup = null;
private ChannelFuture channelFuture = null;
private int port = 0;
private Thread blockingThread = null;
public FileServer(PathResolver pResolver, int port) {
FileServer(PathResolver pResolver, int port) {
InetSocketAddress addr = new InetSocketAddress(port);
// Configure the server.
@ -70,7 +69,8 @@ class FileServer {
* Start the file server asynchronously in a new thread.
*/
public void start() {
blockingThread = new Thread() {
Thread blockingThread = new Thread() {
@Override
public void run() {
try {
channelFuture.channel().closeFuture().sync();

View file

@ -25,9 +25,9 @@ import io.netty.handler.codec.string.StringDecoder;
class FileServerChannelInitializer extends ChannelInitializer<SocketChannel> {
PathResolver pResolver;
private final PathResolver pResolver;
public FileServerChannelInitializer(PathResolver pResolver) {
FileServerChannelInitializer(PathResolver pResolver) {
this.pResolver = pResolver;
}

View file

@ -31,11 +31,11 @@ import org.slf4j.LoggerFactory;
class FileServerHandler extends SimpleChannelInboundHandler<String> {
private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
private static final Logger LOG = LoggerFactory.getLogger(FileServerHandler.class.getName());
private final PathResolver pResolver;
public FileServerHandler(PathResolver pResolver){
FileServerHandler(PathResolver pResolver){
this.pResolver = pResolver;
}
@ -61,7 +61,7 @@ class FileServerHandler extends SimpleChannelInboundHandler<String> {
ctx.flush();
return;
}
int len = new Long(length).intValue();
int len = (int) length;
ctx.write((new FileHeader(len, blockId)).buffer());
try {
ctx.write(new DefaultFileRegion(new FileInputStream(file)

View file

@ -1,26 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.network.netty;
import org.apache.spark.storage.BlockId;
import org.apache.spark.storage.FileSegment;
public interface PathResolver {
/** Get the file segment in which the given block resides. */
public FileSegment getBlockLocation(BlockId blockId);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.network.netty;
import org.apache.spark.storage.BlockId;
import org.apache.spark.storage.FileSegment;
public interface PathResolver {
/** Get the file segment in which the given block resides. */
FileSegment getBlockLocation(BlockId blockId);
}

View file

@ -24,7 +24,7 @@ import com.typesafe.config.ConfigFactory
*
* @param loadDefaults whether to load values from the system properties and classpath
*/
class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable with Logging {
/** Create a SparkConf that loads defaults from system properties and the classpath */
def this() = this(true)
@ -67,7 +67,8 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
/** Set JAR files to distribute to the cluster. */
def setJars(jars: Seq[String]): SparkConf = {
set("spark.jars", jars.mkString(","))
for (jar <- jars if (jar == null)) logWarning("null jar passed to SparkContext constructor")
set("spark.jars", jars.filter(_ != null).mkString(","))
}
/** Set JAR files to distribute to the cluster. (Java-friendly version.) */
@ -171,6 +172,9 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
.map{case (k, v) => (k.substring(prefix.length), v)}
}
/** Get all akka conf variables set on this SparkConf */
def getAkkaConf: Seq[(String, String)] = getAll.filter {case (k, v) => k.startsWith("akka.")}
/** Does the configuration contain a given parameter? */
def contains(key: String): Boolean = settings.contains(key)

View file

@ -116,6 +116,10 @@ class SparkContext(
throw new SparkException("An application must be set in your configuration")
}
if (conf.get("spark.log-conf", "false").toBoolean) {
logInfo("Spark configuration:\n" + conf.toDebugString)
}
// Set Spark driver host and port system properties
conf.setIfMissing("spark.driver.host", Utils.localHostName())
conf.setIfMissing("spark.driver.port", "0")
@ -169,10 +173,16 @@ class SparkContext(
// Environment variables to pass to our executors
private[spark] val executorEnvs = HashMap[String, String]()
// Note: SPARK_MEM is included for Mesos, but overwritten for standalone mode in ExecutorRunner
for (key <- Seq("SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS", "SPARK_TESTING");
for (key <- Seq("SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS");
value <- Option(System.getenv(key))) {
executorEnvs(key) = value
}
// Convert java options to env vars as a work around
// since we can't set env vars directly in sbt.
for { (envKey, propKey) <- Seq(("SPARK_HOME", "spark.home"), ("SPARK_TESTING", "spark.testing"))
value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
executorEnvs(envKey) = value
}
// Since memory can be set with a system property too, use that
executorEnvs("SPARK_MEM") = executorMemory + "m"
executorEnvs ++= conf.getExecutorEnv

View file

@ -431,4 +431,10 @@ object JavaSparkContext {
implicit def fromSparkContext(sc: SparkContext): JavaSparkContext = new JavaSparkContext(sc)
implicit def toSparkContext(jsc: JavaSparkContext): SparkContext = jsc.sc
/**
* Find the JAR from which a given class was loaded, to make it easy for users to pass
* their JARs to SparkContext.
*/
def jarOfClass(cls: Class[_]) = SparkContext.jarOfClass(cls).toArray
}

View file

@ -24,7 +24,8 @@ import scala.concurrent.duration._
import akka.actor._
import akka.pattern.ask
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}
import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
@ -110,6 +111,12 @@ private[spark] class Client(
}
}
private def isPossibleMaster(remoteUrl: Address) = {
masterUrls.map(s => Master.toAkkaUrl(s))
.map(u => AddressFromURIString(u).hostPort)
.contains(remoteUrl.hostPort)
}
override def receive = {
case RegisteredApplication(appId_, masterUrl) =>
appId = appId_
@ -145,6 +152,9 @@ private[spark] class Client(
logWarning(s"Connection to $address failed; waiting for master to reconnect...")
markDisconnected()
case AssociationErrorEvent(cause, _, address, _) if isPossibleMaster(address) =>
logWarning(s"Could not connect to $address: $cause")
case StopClient =>
markDead()
sender ! true

View file

@ -127,7 +127,7 @@ private[spark] class CoarseMesosSchedulerBackend(
CoarseGrainedSchedulerBackend.ACTOR_NAME)
val uri = conf.get("spark.executor.uri", null)
if (uri == null) {
val runScript = new File(sparkHome, "spark-class").getCanonicalPath
val runScript = new File(sparkHome, "./bin/spark-class").getCanonicalPath
command.setValue(
"\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d".format(
runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
@ -136,7 +136,7 @@ private[spark] class CoarseMesosSchedulerBackend(
// glob the directory "correctly".
val basename = uri.split('/').last.split('.').head
command.setValue(
"cd %s*; ./spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d"
"cd %s*; ./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d"
.format(basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
}

View file

@ -102,12 +102,12 @@ private[spark] class MesosSchedulerBackend(
.setEnvironment(environment)
val uri = sc.conf.get("spark.executor.uri", null)
if (uri == null) {
command.setValue(new File(sparkHome, "spark-executor").getCanonicalPath)
command.setValue(new File(sparkHome, "/sbin/spark-executor").getCanonicalPath)
} else {
// Grab everything to the first '.'. We'll use that and '*' to
// glob the directory "correctly".
val basename = uri.split('/').last.split('.').head
command.setValue("cd %s*; ./spark-executor".format(basename))
command.setValue("cd %s*; ./sbin/spark-executor".format(basename))
command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
}
val memory = Resource.newBuilder()

View file

@ -29,6 +29,9 @@ import org.apache.spark.util.{NextIterator, ByteBufferInputStream}
* A serializer. Because some serialization libraries are not thread safe, this class is used to
* create [[org.apache.spark.serializer.SerializerInstance]] objects that do the actual serialization and are
* guaranteed to only be called from one thread at a time.
*
* Implementations of this trait should have a zero-arg constructor or a constructor that accepts a
* [[org.apache.spark.SparkConf]] as parameter. If both constructors are defined, the latter takes precedence.
*/
trait Serializer {
def newInstance(): SerializerInstance

View file

@ -27,6 +27,7 @@ import org.apache.spark.SparkConf
* creating a new one.
*/
private[spark] class SerializerManager {
// TODO: Consider moving this into SparkConf itself to remove the global singleton.
private val serializers = new ConcurrentHashMap[String, Serializer]
private var _default: Serializer = _
@ -53,8 +54,18 @@ private[spark] class SerializerManager {
if (serializer == null) {
val clsLoader = Thread.currentThread.getContextClassLoader
val cls = Class.forName(clsName, true, clsLoader)
val constructor = cls.getConstructor(classOf[SparkConf])
serializer = constructor.newInstance(conf).asInstanceOf[Serializer]
// First try with the constructor that takes SparkConf. If we can't find one,
// use a no-arg constructor instead.
try {
val constructor = cls.getConstructor(classOf[SparkConf])
serializer = constructor.newInstance(conf).asInstanceOf[Serializer]
} catch {
case _: NoSuchMethodException =>
val constructor = cls.getConstructor()
serializer = constructor.newInstance().asInstanceOf[Serializer]
}
serializers.put(clsName, serializer)
}
serializer

View file

@ -27,7 +27,7 @@ import org.apache.spark.scheduler.SchedulingMode
/**
* Continuously generates jobs that expose various features of the WebUI (internal testing tool).
*
* Usage: ./run spark.ui.UIWorkloadGenerator [master]
* Usage: ./bin/spark-class org.apache.spark.ui.UIWorkloadGenerator [master] [FIFO|FAIR]
*/
private[spark] object UIWorkloadGenerator {
@ -36,7 +36,7 @@ private[spark] object UIWorkloadGenerator {
def main(args: Array[String]) {
if (args.length < 2) {
println("usage: ./spark-class org.apache.spark.ui.UIWorkloadGenerator [master] [FIFO|FAIR]")
println("usage: ./bin/spark-class org.apache.spark.ui.UIWorkloadGenerator [master] [FIFO|FAIR]")
System.exit(1)
}

View file

@ -17,10 +17,13 @@
package org.apache.spark.util
import scala.collection.JavaConversions.mapAsJavaMap
import scala.concurrent.duration.{Duration, FiniteDuration}
import akka.actor.{ActorSystem, ExtendedActorSystem, IndestructibleActorSystem}
import com.typesafe.config.ConfigFactory
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
/**
@ -47,15 +50,23 @@ private[spark] object AkkaUtils {
val akkaTimeout = conf.get("spark.akka.timeout", "100").toInt
val akkaFrameSize = conf.get("spark.akka.frameSize", "10").toInt
val lifecycleEvents =
if (conf.get("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off"
val akkaLogLifecycleEvents = conf.get("spark.akka.logLifecycleEvents", "false").toBoolean
val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"
if (!akkaLogLifecycleEvents) {
// As a workaround for Akka issue #3787, we coerce the "EndpointWriter" log to be silent.
// See: https://www.assembla.com/spaces/akka/tickets/3787#/
Option(Logger.getLogger("akka.remote.EndpointWriter")).map(l => l.setLevel(Level.FATAL))
}
val logAkkaConfig = if (conf.get("spark.akka.logAkkaConfig", "false").toBoolean) "on" else "off"
val akkaHeartBeatPauses = conf.get("spark.akka.heartbeat.pauses", "600").toInt
val akkaFailureDetector =
conf.get("spark.akka.failure-detector.threshold", "300.0").toDouble
val akkaHeartBeatInterval = conf.get("spark.akka.heartbeat.interval", "1000").toInt
val akkaConf = ConfigFactory.parseString(
val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String]).withFallback(
ConfigFactory.parseString(
s"""
|akka.daemonic = on
|akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
@ -73,8 +84,11 @@ private[spark] object AkkaUtils {
|akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}MiB
|akka.remote.netty.tcp.execution-pool-size = $akkaThreads
|akka.actor.default-dispatcher.throughput = $akkaBatchSize
|akka.log-config-on-start = $logAkkaConfig
|akka.remote.log-remote-lifecycle-events = $lifecycleEvents
""".stripMargin)
|akka.log-dead-letters = $lifecycleEvents
|akka.log-dead-letters-during-shutdown = $lifecycleEvents
""".stripMargin))
val actorSystem = if (indestructible) {
IndestructibleActorSystem(name, akkaConf)

View file

@ -30,13 +30,15 @@ import org.apache.spark.util.Utils
class DriverSuite extends FunSuite with Timeouts {
test("driver should exit after finishing") {
assert(System.getenv("SPARK_HOME") != null)
val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")).get
// Regression test for SPARK-530: "Spark driver process doesn't exit after finishing"
val masters = Table(("master"), ("local"), ("local-cluster[2,1,512]"))
forAll(masters) { (master: String) =>
failAfter(60 seconds) {
Utils.execute(Seq("./spark-class", "org.apache.spark.DriverWithoutCleanup", master),
new File(System.getenv("SPARK_HOME")))
Utils.executeAndGetOutput(
Seq("./bin/spark-class", "org.apache.spark.DriverWithoutCleanup", master),
new File(sparkHome),
Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
}
}
}

View file

@ -17,33 +17,49 @@
package org.apache.spark
import java.io._
import java.util.jar.{JarEntry, JarOutputStream}
import SparkContext._
import com.google.common.io.Files
import org.scalatest.FunSuite
import java.io.{File, PrintWriter, FileReader, BufferedReader}
import SparkContext._
class FileServerSuite extends FunSuite with LocalSparkContext {
@transient var tmpFile: File = _
@transient var testJarFile: File = _
@transient var tmpJarUrl: String = _
override def beforeEach() {
super.beforeEach()
// Create a sample text file
val tmpdir = new File(Files.createTempDir(), "test")
tmpdir.mkdir()
tmpFile = new File(tmpdir, "FileServerSuite.txt")
val pw = new PrintWriter(tmpFile)
override def beforeAll() {
super.beforeAll()
val tmpDir = new File(Files.createTempDir(), "test")
tmpDir.mkdir()
val textFile = new File(tmpDir, "FileServerSuite.txt")
val pw = new PrintWriter(textFile)
pw.println("100")
pw.close()
}
val jarFile = new File(tmpDir, "test.jar")
val jarStream = new FileOutputStream(jarFile)
val jar = new JarOutputStream(jarStream, new java.util.jar.Manifest())
override def afterEach() {
super.afterEach()
// Clean up downloaded file
if (tmpFile.exists) {
tmpFile.delete()
val jarEntry = new JarEntry(textFile.getName)
jar.putNextEntry(jarEntry)
val in = new FileInputStream(textFile)
val buffer = new Array[Byte](10240)
var nRead = 0
while (nRead <= 0) {
nRead = in.read(buffer, 0, buffer.length)
jar.write(buffer, 0, nRead)
}
in.close()
jar.close()
jarStream.close()
tmpFile = textFile
tmpJarUrl = jarFile.toURI.toURL.toString
}
test("Distributing files locally") {
@ -77,18 +93,13 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
test ("Dynamically adding JARS locally") {
sc = new SparkContext("local[4]", "test")
val sampleJarFile = getClass.getClassLoader.getResource("uncommons-maths-1.2.2.jar").getFile()
sc.addJar(sampleJarFile)
val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0))
val result = sc.parallelize(testData).reduceByKey { (x,y) =>
val fac = Thread.currentThread.getContextClassLoader()
.loadClass("org.uncommons.maths.Maths")
.getDeclaredMethod("factorial", classOf[Int])
val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
a + b
}.collect()
assert(result.toSet === Set((1,2), (2,7), (3,121)))
sc.addJar(tmpJarUrl)
val testData = Array((1, 1))
sc.parallelize(testData).foreach { x =>
if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite.txt") == null) {
throw new SparkException("jar not added")
}
}
}
test("Distributing files on a standalone cluster") {
@ -107,33 +118,24 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
test ("Dynamically adding JARS on a standalone cluster") {
sc = new SparkContext("local-cluster[1,1,512]", "test")
val sampleJarFile = getClass.getClassLoader.getResource("uncommons-maths-1.2.2.jar").getFile()
sc.addJar(sampleJarFile)
val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0))
val result = sc.parallelize(testData).reduceByKey { (x,y) =>
val fac = Thread.currentThread.getContextClassLoader()
.loadClass("org.uncommons.maths.Maths")
.getDeclaredMethod("factorial", classOf[Int])
val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
a + b
}.collect()
assert(result.toSet === Set((1,2), (2,7), (3,121)))
sc.addJar(tmpJarUrl)
val testData = Array((1,1))
sc.parallelize(testData).foreach { x =>
if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite.txt") == null) {
throw new SparkException("jar not added")
}
}
}
test ("Dynamically adding JARS on a standalone cluster using local: URL") {
sc = new SparkContext("local-cluster[1,1,512]", "test")
val sampleJarFile = getClass.getClassLoader.getResource("uncommons-maths-1.2.2.jar").getFile()
sc.addJar(sampleJarFile.replace("file", "local"))
val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0))
val result = sc.parallelize(testData).reduceByKey { (x,y) =>
val fac = Thread.currentThread.getContextClassLoader()
.loadClass("org.uncommons.maths.Maths")
.getDeclaredMethod("factorial", classOf[Int])
val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
a + b
}.collect()
assert(result.toSet === Set((1,2), (2,7), (3,121)))
sc.addJar(tmpJarUrl.replace("file", "local"))
val testData = Array((1,1))
sc.parallelize(testData).foreach { x =>
if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite.txt") == null) {
throw new SparkException("jar not added")
}
}
}
}

View file

@ -18,13 +18,15 @@
package org.apache.spark.deploy.worker
import java.io.File
import org.scalatest.FunSuite
import org.apache.spark.deploy.{ExecutorState, Command, ApplicationDescription}
class ExecutorRunnerTest extends FunSuite {
test("command includes appId") {
def f(s:String) = new File(s)
val sparkHome = sys.env("SPARK_HOME")
val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")).get
val appDesc = new ApplicationDescription("app name", 8, 500, Command("foo", Seq(),Map()),
sparkHome, "appUiUrl")
val appId = "12345-worker321-9876"

View file

@ -157,8 +157,8 @@ trait Message[K] {
# Where to Go from Here
Two example jobs, PageRank and shortest path, are included in `examples/src/main/scala/org/apache/spark/examples/bagel`. You can run them by passing the class name to the `run-example` script included in Spark; e.g.:
Two example jobs, PageRank and shortest path, are included in `examples/src/main/scala/org/apache/spark/examples/bagel`. You can run them by passing the class name to the `bin/run-example` script included in Spark; e.g.:
./run-example org.apache.spark.examples.bagel.WikipediaPageRank
./bin/run-example org.apache.spark.examples.bagel.WikipediaPageRank
Each example program prints usage help when run without any arguments.

View file

@ -37,20 +37,16 @@ For Apache Hadoop versions 1.x, Cloudera CDH MRv1, and other Hadoop versions wit
# Cloudera CDH 4.2.0 with MapReduce v1
$ mvn -Dhadoop.version=2.0.0-mr1-cdh4.2.0 -DskipTests clean package
For Apache Hadoop 2.x, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions with YARN, you should enable the "hadoop2-yarn" profile and set the "yarn.version" property:
For Apache Hadoop 2.x, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions with YARN, you should enable the "yarn-alpha" or "yarn" profile and set the "hadoop.version", "yarn.version" property:
# Apache Hadoop 2.0.5-alpha
$ mvn -Phadoop2-yarn -Dhadoop.version=2.0.5-alpha -Dyarn.version=2.0.5-alpha -DskipTests clean package
$ mvn -Pyarn-alpha -Dhadoop.version=2.0.5-alpha -Dyarn.version=2.0.5-alpha -DskipTests clean package
# Cloudera CDH 4.2.0 with MapReduce v2
$ mvn -Phadoop2-yarn -Dhadoop.version=2.0.0-cdh4.2.0 -Dyarn.version=2.0.0-chd4.2.0 -DskipTests clean package
$ mvn -Pyarn-alpha -Dhadoop.version=2.0.0-cdh4.2.0 -Dyarn.version=2.0.0-chd4.2.0 -DskipTests clean package
Hadoop versions 2.2.x and newer can be built by setting the ```new-yarn``` and the ```yarn.version``` as follows:
# Apache Hadoop 2.2.X and newer
$ mvn -Dyarn.version=2.2.0 -Dhadoop.version=2.2.0 -Pnew-yarn
The build process handles Hadoop 2.2.x as a special case that uses the directory ```new-yarn```, which supports the new YARN API. Furthermore, for this version, the build depends on artifacts published by the spark-project to enable Akka 2.0.5 to work with protobuf 2.5.
# Apache Hadoop 2.2.X ( e.g. 2.2.0 as below ) and newer
$ mvn -Pyarn -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 -DskipTests clean package
## Spark Tests in Maven ##

View file

@ -81,7 +81,8 @@ there are at least five properties that you will commonly want to control:
<td>
When running on a <a href="spark-standalone.html">standalone deploy cluster</a> or a
<a href="running-on-mesos.html#mesos-run-modes">Mesos cluster in "coarse-grained"
sharing mode</a>, how many CPU cores to request at most. The default will use all available cores
sharing mode</a>, the maximum amount of CPU cores to request for the application from
across the cluster (not from each machine). The default will use all available cores
offered by the cluster manager.
</td>
</tr>
@ -359,6 +360,14 @@ Apart from these, the following properties are also available, and may be useful
Too large a value decreases parallelism during broadcast (makes it slower); however, if it is too small, <code>BlockManager</code> might take a performance hit.
</td>
</tr>
<tr>
<td>akka.x.y....</td>
<td>value</td>
<td>
An arbitrary akka configuration can be set directly on spark conf and it is applied for all the ActorSystems created spark wide for that SparkContext and its assigned executors as well.
</td>
</tr>
<tr>
<td>spark.shuffle.consolidateFiles</td>
<td>false</td>
@ -394,6 +403,13 @@ Apart from these, the following properties are also available, and may be useful
How many times slower a task is than the median to be considered for speculation.
</td>
</tr>
<tr>
<td>spark.log-conf</td>
<td>false</td>
<td>
Log the supplied SparkConf as INFO at start of spark context.
</td>
</tr>
</table>
## Viewing Spark Properties

View file

@ -24,9 +24,9 @@ For its Scala API, Spark {{site.SPARK_VERSION}} depends on Scala {{site.SCALA_VE
# Running the Examples and Shell
Spark comes with several sample programs in the `examples` directory.
To run one of the samples, use `./run-example <class> <params>` in the top-level Spark directory
(the `run-example` script sets up the appropriate paths and launches that program).
For example, try `./run-example org.apache.spark.examples.SparkPi local`.
To run one of the samples, use `./bin/run-example <class> <params>` in the top-level Spark directory
(the `bin/run-example` script sets up the appropriate paths and launches that program).
For example, try `./bin/run-example org.apache.spark.examples.SparkPi local`.
Each example prints usage help when run with no parameters.
Note that all of the sample programs take a `<master>` parameter specifying the cluster URL
@ -34,8 +34,8 @@ to connect to. This can be a [URL for a distributed cluster](scala-programming-g
or `local` to run locally with one thread, or `local[N]` to run locally with N threads. You should start by using
`local` for testing.
Finally, you can run Spark interactively through modified versions of the Scala shell (`./spark-shell`) or
Python interpreter (`./pyspark`). These are a great way to learn the framework.
Finally, you can run Spark interactively through modified versions of the Scala shell (`./bin/spark-shell`) or
Python interpreter (`./bin/pyspark`). These are a great way to learn the framework.
# Launching on a Cluster

View file

@ -190,9 +190,9 @@ We hope to generate documentation with Java-style syntax in the future.
Spark includes several sample programs using the Java API in
[`examples/src/main/java`](https://github.com/apache/incubator-spark/tree/master/examples/src/main/java/org/apache/spark/examples). You can run them by passing the class name to the
`run-example` script included in Spark; for example:
`bin/run-example` script included in Spark; for example:
./run-example org.apache.spark.examples.JavaWordCount
./bin/run-example org.apache.spark.examples.JavaWordCount
Each example program prints usage help when run
without any arguments.

View file

@ -87,7 +87,7 @@ svmAlg.optimizer.setNumIterations(200)
val modelL1 = svmAlg.run(parsedData)
{% endhighlight %}
Both of the code snippets above can be executed in `spark-shell` to generate a
Both of the code snippets above can be executed in `bin/spark-shell` to generate a
classifier for the provided dataset.
Available algorithms for binary classification:

View file

@ -47,7 +47,7 @@ PySpark will automatically ship these functions to workers, along with any objec
Instances of classes will be serialized and shipped to workers by PySpark, but classes themselves cannot be automatically distributed to workers.
The [Standalone Use](#standalone-use) section describes how to ship code dependencies to workers.
In addition, PySpark fully supports interactive use---simply run `./pyspark` to launch an interactive shell.
In addition, PySpark fully supports interactive use---simply run `./bin/pyspark` to launch an interactive shell.
# Installing and Configuring PySpark
@ -60,17 +60,17 @@ By default, PySpark requires `python` to be available on the system `PATH` and u
All of PySpark's library dependencies, including [Py4J](http://py4j.sourceforge.net/), are bundled with PySpark and automatically imported.
Standalone PySpark applications should be run using the `pyspark` script, which automatically configures the Java and Python environment using the settings in `conf/spark-env.sh` or `.cmd`.
The script automatically adds the `pyspark` package to the `PYTHONPATH`.
Standalone PySpark applications should be run using the `bin/pyspark` script, which automatically configures the Java and Python environment using the settings in `conf/spark-env.sh` or `.cmd`.
The script automatically adds the `bin/pyspark` package to the `PYTHONPATH`.
# Interactive Use
The `pyspark` script launches a Python interpreter that is configured to run PySpark applications. To use `pyspark` interactively, first build Spark, then launch it directly from the command line without any options:
The `bin/pyspark` script launches a Python interpreter that is configured to run PySpark applications. To use `pyspark` interactively, first build Spark, then launch it directly from the command line without any options:
{% highlight bash %}
$ sbt/sbt assembly
$ ./pyspark
$ ./bin/pyspark
{% endhighlight %}
The Python shell can be used explore data interactively and is a simple way to learn the API:
@ -82,35 +82,35 @@ The Python shell can be used explore data interactively and is a simple way to l
>>> help(pyspark) # Show all pyspark functions
{% endhighlight %}
By default, the `pyspark` shell creates SparkContext that runs applications locally on a single core.
By default, the `bin/pyspark` shell creates SparkContext that runs applications locally on a single core.
To connect to a non-local cluster, or use multiple cores, set the `MASTER` environment variable.
For example, to use the `pyspark` shell with a [standalone Spark cluster](spark-standalone.html):
For example, to use the `bin/pyspark` shell with a [standalone Spark cluster](spark-standalone.html):
{% highlight bash %}
$ MASTER=spark://IP:PORT ./pyspark
$ MASTER=spark://IP:PORT ./bin/pyspark
{% endhighlight %}
Or, to use four cores on the local machine:
{% highlight bash %}
$ MASTER=local[4] ./pyspark
$ MASTER=local[4] ./bin/pyspark
{% endhighlight %}
## IPython
It is also possible to launch PySpark in [IPython](http://ipython.org), the enhanced Python interpreter.
To do this, set the `IPYTHON` variable to `1` when running `pyspark`:
To do this, set the `IPYTHON` variable to `1` when running `bin/pyspark`:
{% highlight bash %}
$ IPYTHON=1 ./pyspark
$ IPYTHON=1 ./bin/pyspark
{% endhighlight %}
Alternatively, you can customize the `ipython` command by setting `IPYTHON_OPTS`. For example, to launch
the [IPython Notebook](http://ipython.org/notebook.html) with PyLab graphing support:
{% highlight bash %}
$ IPYTHON_OPTS="notebook --pylab inline" ./pyspark
$ IPYTHON_OPTS="notebook --pylab inline" ./bin/pyspark
{% endhighlight %}
IPython also works on a cluster or on multiple cores if you set the `MASTER` environment variable.
@ -118,7 +118,7 @@ IPython also works on a cluster or on multiple cores if you set the `MASTER` env
# Standalone Programs
PySpark can also be used from standalone Python scripts by creating a SparkContext in your script and running the script using `pyspark`.
PySpark can also be used from standalone Python scripts by creating a SparkContext in your script and running the script using `bin/pyspark`.
The Quick Start guide includes a [complete example](quick-start.html#a-standalone-app-in-python) of a standalone Python application.
Code dependencies can be deployed by listing them in the `pyFiles` option in the SparkContext constructor:
@ -153,6 +153,6 @@ Many of the methods also contain [doctests](http://docs.python.org/2/library/doc
PySpark also includes several sample programs in the [`python/examples` folder](https://github.com/apache/incubator-spark/tree/master/python/examples).
You can run them by passing the files to `pyspark`; e.g.:
./pyspark python/examples/wordcount.py
./bin/pyspark python/examples/wordcount.py
Each program prints usage help when run without arguments.

View file

@ -20,7 +20,7 @@ $ sbt/sbt assembly
## Basics
Spark's interactive shell provides a simple way to learn the API, as well as a powerful tool to analyze datasets interactively.
Start the shell by running `./spark-shell` in the Spark directory.
Start the shell by running `./bin/spark-shell` in the Spark directory.
Spark's primary abstraction is a distributed collection of items called a Resilient Distributed Dataset (RDD). RDDs can be created from Hadoop InputFormats (such as HDFS files) or by transforming other RDDs. Let's make a new RDD from the text of the README file in the Spark source directory:
@ -99,7 +99,7 @@ scala> linesWithSpark.count()
res9: Long = 15
{% endhighlight %}
It may seem silly to use Spark to explore and cache a 30-line text file. The interesting part is that these same functions can be used on very large data sets, even when they are striped across tens or hundreds of nodes. You can also do this interactively by connecting `spark-shell` to a cluster, as described in the [programming guide](scala-programming-guide.html#initializing-spark).
It may seem silly to use Spark to explore and cache a 30-line text file. The interesting part is that these same functions can be used on very large data sets, even when they are striped across tens or hundreds of nodes. You can also do this interactively by connecting `bin/spark-shell` to a cluster, as described in the [programming guide](scala-programming-guide.html#initializing-spark).
# A Standalone App in Scala
Now say we wanted to write a standalone application using the Spark API. We will walk through a simple application in both Scala (with SBT), Java (with Maven), and Python. If you are using other build systems, consider using the Spark assembly JAR described in the developer guide.
@ -146,7 +146,7 @@ If you also wish to read data from Hadoop's HDFS, you will also need to add a de
libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "<your-hdfs-version>"
{% endhighlight %}
Finally, for sbt to work correctly, we'll need to layout `SimpleApp.scala` and `simple.sbt` according to the typical directory structure. Once that is in place, we can create a JAR package containing the application's code, then use `sbt run` to execute our program.
Finally, for sbt to work correctly, we'll need to layout `SimpleApp.scala` and `simple.sbt` according to the typical directory structure. Once that is in place, we can create a JAR package containing the application's code, then use `sbt/sbt run` to execute our program.
{% highlight bash %}
$ find .
@ -157,8 +157,8 @@ $ find .
./src/main/scala
./src/main/scala/SimpleApp.scala
$ sbt package
$ sbt run
$ sbt/sbt package
$ sbt/sbt run
...
Lines with a: 46, Lines with b: 23
{% endhighlight %}
@ -277,11 +277,11 @@ We can pass Python functions to Spark, which are automatically serialized along
For applications that use custom classes or third-party libraries, we can add those code dependencies to SparkContext to ensure that they will be available on remote machines; this is described in more detail in the [Python programming guide](python-programming-guide.html).
`SimpleApp` is simple enough that we do not need to specify any code dependencies.
We can run this application using the `pyspark` script:
We can run this application using the `bin/pyspark` script:
{% highlight python %}
$ cd $SPARK_HOME
$ ./pyspark SimpleApp.py
$ ./bin/pyspark SimpleApp.py
...
Lines with a: 46, Lines with b: 23
{% endhighlight python %}

View file

@ -12,7 +12,7 @@ was added to Spark in version 0.6.0, and improved in 0.7.0 and 0.8.0.
We need a consolidated Spark JAR (which bundles all the required dependencies) to run Spark jobs on a YARN cluster.
This can be built by setting the Hadoop version and `SPARK_YARN` environment variable, as follows:
SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_YARN=true ./sbt/sbt assembly
SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_YARN=true sbt/sbt assembly
The assembled JAR will be something like this:
`./assembly/target/scala-{{site.SCALA_VERSION}}/spark-assembly_{{site.SPARK_VERSION}}-hadoop2.0.5.jar`.
@ -54,7 +54,7 @@ There are two scheduler mode that can be used to launch spark application on YAR
The command to launch the YARN Client is as follows:
SPARK_JAR=<SPARK_ASSEMBLY_JAR_FILE> ./spark-class org.apache.spark.deploy.yarn.Client \
SPARK_JAR=<SPARK_ASSEMBLY_JAR_FILE> ./bin/spark-class org.apache.spark.deploy.yarn.Client \
--jar <YOUR_APP_JAR_FILE> \
--class <APP_MAIN_CLASS> \
--args <APP_MAIN_ARGUMENTS> \
@ -72,14 +72,14 @@ The command to launch the YARN Client is as follows:
For example:
# Build the Spark assembly JAR and the Spark examples JAR
$ SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_YARN=true ./sbt/sbt assembly
$ SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_YARN=true sbt/sbt assembly
# Configure logging
$ cp conf/log4j.properties.template conf/log4j.properties
# Submit Spark's ApplicationMaster to YARN's ResourceManager, and instruct Spark to run the SparkPi example
$ SPARK_JAR=./assembly/target/scala-{{site.SCALA_VERSION}}/spark-assembly-{{site.SPARK_VERSION}}-hadoop2.0.5-alpha.jar \
./spark-class org.apache.spark.deploy.yarn.Client \
./bin/spark-class org.apache.spark.deploy.yarn.Client \
--jar examples/target/scala-{{site.SCALA_VERSION}}/spark-examples-assembly-{{site.SPARK_VERSION}}.jar \
--class org.apache.spark.examples.SparkPi \
--args yarn-standalone \
@ -107,17 +107,15 @@ For example:
SPARK_JAR=./assembly/target/scala-{{site.SCALA_VERSION}}/spark-assembly-{{site.SPARK_VERSION}}-hadoop2.0.5-alpha.jar \
SPARK_YARN_APP_JAR=examples/target/scala-{{site.SCALA_VERSION}}/spark-examples-assembly-{{site.SPARK_VERSION}}.jar \
./run-example org.apache.spark.examples.SparkPi yarn-client
./bin/run-example org.apache.spark.examples.SparkPi yarn-client
SPARK_JAR=./assembly/target/scala-{{site.SCALA_VERSION}}/spark-assembly-{{site.SPARK_VERSION}}-hadoop2.0.5-alpha.jar \
SPARK_YARN_APP_JAR=examples/target/scala-{{site.SCALA_VERSION}}/spark-examples-assembly-{{site.SPARK_VERSION}}.jar \
MASTER=yarn-client ./spark-shell
MASTER=yarn-client ./bin/spark-shell
# Building Spark for Hadoop/YARN 2.2.x
Hadoop 2.2.x users must build Spark and publish it locally. The SBT build process handles Hadoop 2.2.x as a special case. This version of Hadoop has new YARN API changes and depends on a Protobuf version (2.5) that is not compatible with the Akka version (2.0.5) that Spark uses. Therefore, if the Hadoop version (e.g. set through ```SPARK_HADOOP_VERSION```) starts with 2.2.0 or higher then the build process will depend on Akka artifacts distributed by the Spark project compatible with Protobuf 2.5. Furthermore, the build process then uses the directory ```new-yarn``` (instead of ```yarn```), which supports the new YARN API. The build process should seamlessly work out of the box.
See [Building Spark with Maven](building-with-maven.html) for instructions on how to build Spark using the Maven process.
# Important Notes
@ -126,4 +124,3 @@ See [Building Spark with Maven](building-with-maven.html) for instructions on ho
- The local directories used for spark will be the local directories configured for YARN (Hadoop Yarn config yarn.nodemanager.local-dirs). If the user specifies spark.local.dir, it will be ignored.
- The --files and --archives options support specifying file names with the # similar to Hadoop. For example you can specify: --files localtest.txt#appSees.txt and this will upload the file you have locally named localtest.txt into HDFS but this will be linked to by the name appSees.txt and your application should use the name as appSees.txt to reference it when running on YARN.
- The --addJars option allows the SparkContext.addJar function to work if you are using it with local files. It does not need to be used if you are using it with HDFS, HTTP, HTTPS, or FTP files.
- YARN 2.2.x users cannot simply depend on the Spark packages without building Spark, as the published Spark artifacts are compiled to work with the pre 2.2 API. Those users must build Spark and publish it locally.

View file

@ -13,7 +13,7 @@ At a high level, every Spark application consists of a *driver program* that run
A second abstraction in Spark is *shared variables* that can be used in parallel operations. By default, when Spark runs a function in parallel as a set of tasks on different nodes, it ships a copy of each variable used in the function to each task. Sometimes, a variable needs to be shared across tasks, or between tasks and the driver program. Spark supports two types of shared variables: *broadcast variables*, which can be used to cache a value in memory on all nodes, and *accumulators*, which are variables that are only "added" to, such as counters and sums.
This guide shows each of these features and walks through some samples. It assumes some familiarity with Scala, especially with the syntax for [closures](http://www.scala-lang.org/node/133). Note that you can also run Spark interactively using the `spark-shell` script. We highly recommend doing that to follow along!
This guide shows each of these features and walks through some samples. It assumes some familiarity with Scala, especially with the syntax for [closures](http://www.scala-lang.org/node/133). Note that you can also run Spark interactively using the `bin/spark-shell` script. We highly recommend doing that to follow along!
# Linking with Spark
@ -54,16 +54,16 @@ object for more advanced configuration.
The `master` parameter is a string specifying a [Spark or Mesos cluster URL](#master-urls) to connect to, or a special "local" string to run in local mode, as described below. `appName` is a name for your application, which will be shown in the cluster web UI. Finally, the last two parameters are needed to deploy your code to a cluster if running in distributed mode, as described later.
In the Spark shell, a special interpreter-aware SparkContext is already created for you, in the variable called `sc`. Making your own SparkContext will not work. You can set which master the context connects to using the `MASTER` environment variable, and you can add JARs to the classpath with the `ADD_JARS` variable. For example, to run `spark-shell` on four cores, use
In the Spark shell, a special interpreter-aware SparkContext is already created for you, in the variable called `sc`. Making your own SparkContext will not work. You can set which master the context connects to using the `MASTER` environment variable, and you can add JARs to the classpath with the `ADD_JARS` variable. For example, to run `bin/spark-shell` on four cores, use
{% highlight bash %}
$ MASTER=local[4] ./spark-shell
$ MASTER=local[4] ./bin/spark-shell
{% endhighlight %}
Or, to also add `code.jar` to its classpath, use:
{% highlight bash %}
$ MASTER=local[4] ADD_JARS=code.jar ./spark-shell
$ MASTER=local[4] ADD_JARS=code.jar ./bin/spark-shell
{% endhighlight %}
### Master URLs
@ -95,7 +95,7 @@ If you want to run your application on a cluster, you will need to specify the t
* `sparkHome`: The path at which Spark is installed on your worker machines (it should be the same on all of them).
* `jars`: A list of JAR files on the local machine containing your application's code and any dependencies, which Spark will deploy to all the worker nodes. You'll need to package your application into a set of JARs using your build system. For example, if you're using SBT, the [sbt-assembly](https://github.com/sbt/sbt-assembly) plugin is a good way to make a single JAR with your code and dependencies.
If you run `spark-shell` on a cluster, you can add JARs to it by specifying the `ADD_JARS` environment variable before you launch it. This variable should contain a comma-separated list of JARs. For example, `ADD_JARS=a.jar,b.jar ./spark-shell` will launch a shell with `a.jar` and `b.jar` on its classpath. In addition, any new classes you define in the shell will automatically be distributed.
If you run `bin/spark-shell` on a cluster, you can add JARs to it by specifying the `ADD_JARS` environment variable before you launch it. This variable should contain a comma-separated list of JARs. For example, `ADD_JARS=a.jar,b.jar ./bin/spark-shell` will launch a shell with `a.jar` and `b.jar` on its classpath. In addition, any new classes you define in the shell will automatically be distributed.
# Resilient Distributed Datasets (RDDs)
@ -366,9 +366,9 @@ res2: Int = 10
# Where to Go from Here
You can see some [example Spark programs](http://spark.incubator.apache.org/examples.html) on the Spark website.
In addition, Spark includes several samples in `examples/src/main/scala`. Some of them have both Spark versions and local (non-parallel) versions, allowing you to see what had to be changed to make the program run on a cluster. You can run them using by passing the class name to the `run-example` script included in Spark; for example:
In addition, Spark includes several samples in `examples/src/main/scala`. Some of them have both Spark versions and local (non-parallel) versions, allowing you to see what had to be changed to make the program run on a cluster. You can run them using by passing the class name to the `bin/run-example` script included in Spark; for example:
./run-example org.apache.spark.examples.SparkPi
./bin/run-example org.apache.spark.examples.SparkPi
Each example program prints usage help when run without any arguments.

View file

@ -39,7 +39,7 @@ where `path/to/event-log` is where you want the event log to go relative to `$SP
### Loading the event log into the debugger
1. Run a Spark shell with `MASTER=<i>host</i> ./spark-shell`.
1. Run a Spark shell with `MASTER=<i>host</i> ./bin/spark-shell`.
2. Use `EventLogReader` to load the event log as follows:
{% highlight scala %}
spark> val r = new spark.EventLogReader(sc, Some("path/to/event-log"))

View file

@ -20,7 +20,7 @@ then modify `conf/spark-env.sh` in the `dist/` directory before deploying to all
You can start a standalone master server by executing:
./bin/start-master.sh
./sbin/start-master.sh
Once started, the master will print out a `spark://HOST:PORT` URL for itself, which you can use to connect workers to it,
or pass as the "master" argument to `SparkContext`. You can also find this URL on
@ -28,7 +28,7 @@ the master's web UI, which is [http://localhost:8080](http://localhost:8080) by
Similarly, you can start one or more workers and connect them to the master via:
./spark-class org.apache.spark.deploy.worker.Worker spark://IP:PORT
./bin/spark-class org.apache.spark.deploy.worker.Worker spark://IP:PORT
Once you have started a worker, look at the master's web UI ([http://localhost:8080](http://localhost:8080) by default).
You should see the new node listed there, along with its number of CPUs and memory (minus one gigabyte left for the OS).
@ -70,12 +70,12 @@ To launch a Spark standalone cluster with the launch scripts, you need to create
Once you've set up this file, you can launch or stop your cluster with the following shell scripts, based on Hadoop's deploy scripts, and available in `SPARK_HOME/bin`:
- `bin/start-master.sh` - Starts a master instance on the machine the script is executed on.
- `bin/start-slaves.sh` - Starts a slave instance on each machine specified in the `conf/slaves` file.
- `bin/start-all.sh` - Starts both a master and a number of slaves as described above.
- `bin/stop-master.sh` - Stops the master that was started via the `bin/start-master.sh` script.
- `bin/stop-slaves.sh` - Stops the slave instances that were started via `bin/start-slaves.sh`.
- `bin/stop-all.sh` - Stops both the master and the slaves as described above.
- `sbin/start-master.sh` - Starts a master instance on the machine the script is executed on.
- `sbin/start-slaves.sh` - Starts a slave instance on each machine specified in the `conf/slaves` file.
- `sbin/start-all.sh` - Starts both a master and a number of slaves as described above.
- `sbin/stop-master.sh` - Stops the master that was started via the `bin/start-master.sh` script.
- `sbin/stop-slaves.sh` - Stops the slave instances that were started via `bin/start-slaves.sh`.
- `sbin/stop-all.sh` - Stops both the master and the slaves as described above.
Note that these scripts must be executed on the machine you want to run the Spark master on, not your local machine.
@ -143,9 +143,9 @@ constructor](scala-programming-guide.html#initializing-spark).
To run an interactive Spark shell against the cluster, run the following command:
MASTER=spark://IP:PORT ./spark-shell
MASTER=spark://IP:PORT ./bin/spark-shell
Note that if you are running spark-shell from one of the spark cluster machines, the `spark-shell` script will
Note that if you are running spark-shell from one of the spark cluster machines, the `bin/spark-shell` script will
automatically set MASTER from the `SPARK_MASTER_IP` and `SPARK_MASTER_PORT` variables in `conf/spark-env.sh`.
You can also pass an option `-c <numCores>` to control the number of cores that spark-shell uses on the cluster.

View file

@ -245,7 +245,7 @@ $ nc -lk 9999
Then, in a different terminal, you can start NetworkWordCount by using
{% highlight bash %}
$ ./run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999
$ ./bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999
{% endhighlight %}
This will make NetworkWordCount connect to the netcat server. Any lines typed in the terminal running the netcat server will be counted and printed on screen.
@ -283,7 +283,7 @@ Time: 1357008430000 ms
</td>
</table>
You can find more examples in `<Spark repo>/streaming/src/main/scala/org/apache/spark/streaming/examples/`. They can be run in the similar manner using `./run-example org.apache.spark.streaming.examples....` . Executing without any parameter would give the required parameter list. Further explanation to run them can be found in comments in the files.
You can find more examples in `<Spark repo>/streaming/src/main/scala/org/apache/spark/streaming/examples/`. They can be run in the similar manner using `./bin/run-example org.apache.spark.streaming.examples....` . Executing without any parameter would give the required parameter list. Further explanation to run them can be found in comments in the files.
# DStream Persistence
Similar to RDDs, DStreams also allow developers to persist the stream's data in memory. That is, using `persist()` method on a DStream would automatically persist every RDD of that DStream in memory. This is useful if the data in the DStream will be computed multiple times (e.g., multiple operations on the same data). For window-based operations like `reduceByWindow` and `reduceByKeyAndWindow` and state-based operations like `updateStateByKey`, this is implicitly true. Hence, DStreams generated by window-based operations are automatically persisted in memory, without the developer calling `persist()`.

View file

@ -436,7 +436,7 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
def setup_standalone_cluster(master, slave_nodes, opts):
slave_ips = '\n'.join([i.public_dns_name for i in slave_nodes])
ssh(master, opts, "echo \"%s\" > spark/conf/slaves" % (slave_ips))
ssh(master, opts, "/root/spark/bin/start-all.sh")
ssh(master, opts, "/root/spark/sbin/start-all.sh")
def setup_spark_cluster(master, opts):
ssh(master, opts, "chmod u+x spark-ec2/setup.sh")

View file

@ -24,19 +24,19 @@ import org.apache.spark.api.java.function.Function2;
import java.io.Serializable;
import java.util.Arrays;
import java.util.StringTokenizer;
import java.util.Random;
import java.util.regex.Pattern;
/**
* Logistic regression based classification.
*/
public class JavaHdfsLR {
public final class JavaHdfsLR {
static int D = 10; // Number of dimensions
static Random rand = new Random(42);
private static final int D = 10; // Number of dimensions
private static final Random rand = new Random(42);
static class DataPoint implements Serializable {
public DataPoint(double[] x, double y) {
DataPoint(double[] x, double y) {
this.x = x;
this.y = y;
}
@ -46,20 +46,22 @@ public class JavaHdfsLR {
}
static class ParsePoint extends Function<String, DataPoint> {
private static final Pattern SPACE = Pattern.compile(" ");
@Override
public DataPoint call(String line) {
StringTokenizer tok = new StringTokenizer(line, " ");
double y = Double.parseDouble(tok.nextToken());
String[] tok = SPACE.split(line);
double y = Double.parseDouble(tok[0]);
double[] x = new double[D];
int i = 0;
while (i < D) {
x[i] = Double.parseDouble(tok.nextToken());
i += 1;
for (int i = 0; i < D; i++) {
x[i] = Double.parseDouble(tok[i + 1]);
}
return new DataPoint(x, y);
}
}
static class VectorSum extends Function2<double[], double[], double[]> {
@Override
public double[] call(double[] a, double[] b) {
double[] result = new double[D];
for (int j = 0; j < D; j++) {
@ -70,12 +72,13 @@ public class JavaHdfsLR {
}
static class ComputeGradient extends Function<DataPoint, double[]> {
double[] weights;
private final double[] weights;
public ComputeGradient(double[] weights) {
ComputeGradient(double[] weights) {
this.weights = weights;
}
@Override
public double[] call(DataPoint p) {
double[] gradient = new double[D];
for (int i = 0; i < D; i++) {
@ -106,7 +109,7 @@ public class JavaHdfsLR {
}
JavaSparkContext sc = new JavaSparkContext(args[0], "JavaHdfsLR",
System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaHdfsLR.class));
JavaRDD<String> lines = sc.textFile(args[1]);
JavaRDD<DataPoint> points = lines.map(new ParsePoint()).cache();
int ITERATIONS = Integer.parseInt(args[2]);

View file

@ -27,19 +27,24 @@ import org.apache.spark.util.Vector;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
/**
* K-means clustering using Java API.
*/
public class JavaKMeans {
public final class JavaKMeans {
private static final Pattern SPACE = Pattern.compile(" ");
/** Parses numbers split by whitespace to a vector */
static Vector parseVector(String line) {
String[] splits = line.split(" ");
String[] splits = SPACE.split(line);
double[] data = new double[splits.length];
int i = 0;
for (String s : splits)
data[i] = Double.parseDouble(splits[i++]);
for (String s : splits) {
data[i] = Double.parseDouble(s);
i++;
}
return new Vector(data);
}
@ -74,7 +79,7 @@ public class JavaKMeans {
System.exit(1);
}
JavaSparkContext sc = new JavaSparkContext(args[0], "JavaKMeans",
System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaKMeans.class));
String path = args[1];
int K = Integer.parseInt(args[2]);
double convergeDist = Double.parseDouble(args[3]);
@ -82,7 +87,7 @@ public class JavaKMeans {
JavaRDD<Vector> data = sc.textFile(path).map(
new Function<String, Vector>() {
@Override
public Vector call(String line) throws Exception {
public Vector call(String line) {
return parseVector(line);
}
}
@ -96,7 +101,7 @@ public class JavaKMeans {
JavaPairRDD<Integer, Vector> closest = data.map(
new PairFunction<Vector, Integer, Vector>() {
@Override
public Tuple2<Integer, Vector> call(Vector vector) throws Exception {
public Tuple2<Integer, Vector> call(Vector vector) {
return new Tuple2<Integer, Vector>(
closestPoint(vector, centroids), vector);
}
@ -107,7 +112,8 @@ public class JavaKMeans {
JavaPairRDD<Integer, List<Vector>> pointsGroup = closest.groupByKey();
Map<Integer, Vector> newCentroids = pointsGroup.mapValues(
new Function<List<Vector>, Vector>() {
public Vector call(List<Vector> ps) throws Exception {
@Override
public Vector call(List<Vector> ps) {
return average(ps);
}
}).collectAsMap();
@ -122,8 +128,9 @@ public class JavaKMeans {
} while (tempDist > convergeDist);
System.out.println("Final centers:");
for (Vector c : centroids)
for (Vector c : centroids) {
System.out.println(c);
}
System.exit(0);

View file

@ -35,9 +35,9 @@ import java.util.regex.Pattern;
/**
* Executes a roll up-style query against Apache logs.
*/
public class JavaLogQuery {
public final class JavaLogQuery {
public static List<String> exampleApacheLogs = Lists.newArrayList(
public static final List<String> exampleApacheLogs = Lists.newArrayList(
"10.10.10.10 - \"FRED\" [18/Jan/2013:17:56:07 +1100] \"GET http://images.com/2013/Generic.jpg " +
"HTTP/1.1\" 304 315 \"http://referall.com/\" \"Mozilla/4.0 (compatible; MSIE 7.0; " +
"Windows NT 5.1; GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; " +
@ -51,14 +51,14 @@ public class JavaLogQuery {
"3.5.30729; Release=ARP)\" \"UD-1\" - \"image/jpeg\" \"whatever\" 0.352 \"-\" - \"\" 256 977 988 \"\" " +
"0 73.23.2.15 images.com 1358492557 - Whatup");
public static Pattern apacheLogRegex = Pattern.compile(
public static final Pattern apacheLogRegex = Pattern.compile(
"^([\\d.]+) (\\S+) (\\S+) \\[([\\w\\d:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) ([\\d\\-]+) \"([^\"]+)\" \"([^\"]+)\".*");
/** Tracks the total query count and number of aggregate bytes for a particular group. */
public static class Stats implements Serializable {
private int count;
private int numBytes;
private final int count;
private final int numBytes;
public Stats(int count, int numBytes) {
this.count = count;
@ -92,32 +92,32 @@ public class JavaLogQuery {
if (m.find()) {
int bytes = Integer.parseInt(m.group(7));
return new Stats(1, bytes);
}
else
} else {
return new Stats(1, 0);
}
}
public static void main(String[] args) throws Exception {
public static void main(String[] args) {
if (args.length == 0) {
System.err.println("Usage: JavaLogQuery <master> [logFile]");
System.exit(1);
}
JavaSparkContext jsc = new JavaSparkContext(args[0], "JavaLogQuery",
System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaLogQuery.class));
JavaRDD<String> dataSet = (args.length == 2) ? jsc.textFile(args[1]) : jsc.parallelize(exampleApacheLogs);
JavaPairRDD<Tuple3<String, String, String>, Stats> extracted = dataSet.map(new PairFunction<String, Tuple3<String, String, String>, Stats>() {
@Override
public Tuple2<Tuple3<String, String, String>, Stats> call(String s) throws Exception {
public Tuple2<Tuple3<String, String, String>, Stats> call(String s) {
return new Tuple2<Tuple3<String, String, String>, Stats>(extractKey(s), extractStats(s));
}
});
JavaPairRDD<Tuple3<String, String, String>, Stats> counts = extracted.reduceByKey(new Function2<Stats, Stats, Stats>() {
@Override
public Stats call(Stats stats, Stats stats2) throws Exception {
public Stats call(Stats stats, Stats stats2) {
return stats.merge(stats2);
}
});

View file

@ -28,6 +28,7 @@ import org.apache.spark.api.java.function.PairFunction;
import java.util.List;
import java.util.ArrayList;
import java.util.regex.Pattern;
/**
* Computes the PageRank of URLs from an input file. Input file should
@ -38,7 +39,9 @@ import java.util.ArrayList;
* ...
* where URL and their neighbors are separated by space(s).
*/
public class JavaPageRank {
public final class JavaPageRank {
private static final Pattern SPACES = Pattern.compile("\\s+");
private static class Sum extends Function2<Double, Double, Double> {
@Override
public Double call(Double a, Double b) {
@ -53,7 +56,7 @@ public class JavaPageRank {
}
JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaPageRank",
System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaPageRank.class));
// Loads in input file. It should be in format of:
// URL neighbor URL
@ -66,7 +69,7 @@ public class JavaPageRank {
JavaPairRDD<String, List<String>> links = lines.map(new PairFunction<String, String, String>() {
@Override
public Tuple2<String, String> call(String s) {
String[] parts = s.split("\\s+");
String[] parts = SPACES.split(s);
return new Tuple2<String, String>(parts[0], parts[1]);
}
}).distinct().groupByKey().cache();
@ -74,7 +77,7 @@ public class JavaPageRank {
// Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one.
JavaPairRDD<String, Double> ranks = links.mapValues(new Function<List<String>, Double>() {
@Override
public Double call(List<String> rs) throws Exception {
public Double call(List<String> rs) {
return 1.0;
}
});
@ -97,7 +100,7 @@ public class JavaPageRank {
// Re-calculates URL ranks based on neighbor contributions.
ranks = contribs.reduceByKey(new Sum()).mapValues(new Function<Double, Double>() {
@Override
public Double call(Double sum) throws Exception {
public Double call(Double sum) {
return 0.15 + sum * 0.85;
}
});

View file

@ -26,8 +26,7 @@ import java.util.ArrayList;
import java.util.List;
/** Computes an approximation to pi */
public class JavaSparkPi {
public final class JavaSparkPi {
public static void main(String[] args) throws Exception {
if (args.length == 0) {
@ -36,26 +35,27 @@ public class JavaSparkPi {
}
JavaSparkContext jsc = new JavaSparkContext(args[0], "JavaLogQuery",
System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaSparkPi.class));
int slices = (args.length == 2) ? Integer.parseInt(args[1]) : 2;
int n = 100000 * slices;
List<Integer> l = new ArrayList<Integer>(n);
for (int i = 0; i < n; i++)
for (int i = 0; i < n; i++) {
l.add(i);
}
JavaRDD<Integer> dataSet = jsc.parallelize(l, slices);
int count = dataSet.map(new Function<Integer, Integer>() {
@Override
public Integer call(Integer integer) throws Exception {
public Integer call(Integer integer) {
double x = Math.random() * 2 - 1;
double y = Math.random() * 2 - 1;
return (x * x + y * y < 1) ? 1 : 0;
}
}).reduce(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
public Integer call(Integer integer, Integer integer2) {
return integer + integer2;
}
});

View file

@ -31,11 +31,11 @@ import java.util.Set;
/**
* Transitive closure on a graph, implemented in Java.
*/
public class JavaTC {
public final class JavaTC {
static int numEdges = 200;
static int numVertices = 100;
static Random rand = new Random(42);
private static final int numEdges = 200;
private static final int numVertices = 100;
private static final Random rand = new Random(42);
static List<Tuple2<Integer, Integer>> generateGraph() {
Set<Tuple2<Integer, Integer>> edges = new HashSet<Tuple2<Integer, Integer>>(numEdges);
@ -43,15 +43,18 @@ public class JavaTC {
int from = rand.nextInt(numVertices);
int to = rand.nextInt(numVertices);
Tuple2<Integer, Integer> e = new Tuple2<Integer, Integer>(from, to);
if (from != to) edges.add(e);
if (from != to) {
edges.add(e);
}
}
return new ArrayList<Tuple2<Integer, Integer>>(edges);
}
static class ProjectFn extends PairFunction<Tuple2<Integer, Tuple2<Integer, Integer>>,
Integer, Integer> {
static ProjectFn INSTANCE = new ProjectFn();
static final ProjectFn INSTANCE = new ProjectFn();
@Override
public Tuple2<Integer, Integer> call(Tuple2<Integer, Tuple2<Integer, Integer>> triple) {
return new Tuple2<Integer, Integer>(triple._2()._2(), triple._2()._1());
}
@ -64,7 +67,7 @@ public class JavaTC {
}
JavaSparkContext sc = new JavaSparkContext(args[0], "JavaTC",
System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaTC.class));
Integer slices = (args.length > 1) ? Integer.parseInt(args[1]): 2;
JavaPairRDD<Integer, Integer> tc = sc.parallelizePairs(generateGraph(), slices).cache();
@ -76,6 +79,7 @@ public class JavaTC {
// Because join() joins on keys, the edges are stored in reversed order.
JavaPairRDD<Integer, Integer> edges = tc.map(
new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
@Override
public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> e) {
return new Tuple2<Integer, Integer>(e._2(), e._1());
}

View file

@ -27,8 +27,11 @@ import org.apache.spark.api.java.function.PairFunction;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
public final class JavaWordCount {
private static final Pattern SPACE = Pattern.compile(" ");
public class JavaWordCount {
public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.err.println("Usage: JavaWordCount <master> <file>");
@ -36,22 +39,25 @@ public class JavaWordCount {
}
JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount",
System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaWordCount.class));
JavaRDD<String> lines = ctx.textFile(args[1], 1);
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String s) {
return Arrays.asList(s.split(" "));
return Arrays.asList(SPACE.split(s));
}
});
JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
});
JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}

View file

@ -26,28 +26,32 @@ import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
import org.apache.spark.mllib.recommendation.Rating;
import java.util.Arrays;
import java.util.StringTokenizer;
import java.util.regex.Pattern;
import scala.Tuple2;
/**
* Example using MLLib ALS from Java.
*/
public class JavaALS {
public final class JavaALS {
static class ParseRating extends Function<String, Rating> {
private static final Pattern COMMA = Pattern.compile(",");
@Override
public Rating call(String line) {
StringTokenizer tok = new StringTokenizer(line, ",");
int x = Integer.parseInt(tok.nextToken());
int y = Integer.parseInt(tok.nextToken());
double rating = Double.parseDouble(tok.nextToken());
String[] tok = COMMA.split(line);
int x = Integer.parseInt(tok[0]);
int y = Integer.parseInt(tok[1]);
double rating = Double.parseDouble(tok[2]);
return new Rating(x, y, rating);
}
}
static class FeaturesToString extends Function<Tuple2<Object, double[]>, String> {
@Override
public String call(Tuple2<Object, double[]> element) {
return element._1().toString() + "," + Arrays.toString(element._2());
return element._1() + "," + Arrays.toString(element._2());
}
}
@ -68,7 +72,7 @@ public class JavaALS {
}
JavaSparkContext sc = new JavaSparkContext(args[0], "JavaALS",
System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaALS.class));
JavaRDD<String> lines = sc.textFile(args[1]);
JavaRDD<Rating> ratings = lines.map(new ParseRating());

View file

@ -25,20 +25,22 @@ import org.apache.spark.mllib.clustering.KMeans;
import org.apache.spark.mllib.clustering.KMeansModel;
import java.util.Arrays;
import java.util.StringTokenizer;
import java.util.regex.Pattern;
/**
* Example using MLLib KMeans from Java.
*/
public class JavaKMeans {
public final class JavaKMeans {
static class ParsePoint extends Function<String, double[]> {
private static final Pattern SPACE = Pattern.compile(" ");
@Override
public double[] call(String line) {
StringTokenizer tok = new StringTokenizer(line, " ");
int numTokens = tok.countTokens();
double[] point = new double[numTokens];
for (int i = 0; i < numTokens; ++i) {
point[i] = Double.parseDouble(tok.nextToken());
String[] tok = SPACE.split(line);
double[] point = new double[tok.length];
for (int i = 0; i < tok.length; ++i) {
point[i] = Double.parseDouble(tok[i]);
}
return point;
}
@ -62,7 +64,7 @@ public class JavaKMeans {
}
JavaSparkContext sc = new JavaSparkContext(args[0], "JavaKMeans",
System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaKMeans.class));
JavaRDD<String> lines = sc.textFile(args[1]);
JavaRDD<double[]> points = lines.map(new ParsePoint());

View file

@ -27,22 +27,25 @@ import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.regression.LabeledPoint;
import java.util.Arrays;
import java.util.StringTokenizer;
import java.util.regex.Pattern;
/**
* Logistic regression based classification using ML Lib.
*/
public class JavaLR {
public final class JavaLR {
static class ParsePoint extends Function<String, LabeledPoint> {
private static final Pattern COMMA = Pattern.compile(",");
private static final Pattern SPACE = Pattern.compile(" ");
@Override
public LabeledPoint call(String line) {
String[] parts = line.split(",");
String[] parts = COMMA.split(line);
double y = Double.parseDouble(parts[0]);
StringTokenizer tok = new StringTokenizer(parts[1], " ");
int numTokens = tok.countTokens();
double[] x = new double[numTokens];
for (int i = 0; i < numTokens; ++i) {
x[i] = Double.parseDouble(tok.nextToken());
String[] tok = SPACE.split(parts[1]);
double[] x = new double[tok.length];
for (int i = 0; i < tok.length; ++i) {
x[i] = Double.parseDouble(tok[i]);
}
return new LabeledPoint(y, x);
}
@ -59,7 +62,7 @@ public class JavaLR {
}
JavaSparkContext sc = new JavaSparkContext(args[0], "JavaLR",
System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaLR.class));
JavaRDD<String> lines = sc.textFile(args[1]);
JavaRDD<LabeledPoint> points = lines.map(new ParsePoint()).cache();
double stepSize = Double.parseDouble(args[2]);

View file

@ -36,7 +36,10 @@ import org.apache.spark.streaming.dstream.SparkFlumeEvent;
* creates a server and listens for flume events.
* <port> is the port the Flume receiver will listen on.
*/
public class JavaFlumeEventCount {
public final class JavaFlumeEventCount {
private JavaFlumeEventCount() {
}
public static void main(String[] args) {
if (args.length != 3) {
System.err.println("Usage: JavaFlumeEventCount <master> <host> <port>");
@ -50,7 +53,8 @@ public class JavaFlumeEventCount {
Duration batchInterval = new Duration(2000);
JavaStreamingContext sc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval,
System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
System.getenv("SPARK_HOME"),
JavaStreamingContext.jarOfClass(JavaFlumeEventCount.class));
JavaDStream<SparkFlumeEvent> flumeStream = sc.flumeStream("localhost", port);

View file

@ -19,6 +19,7 @@ package org.apache.spark.streaming.examples;
import java.util.Map;
import java.util.HashMap;
import java.util.regex.Pattern;
import com.google.common.collect.Lists;
import org.apache.spark.api.java.function.FlatMapFunction;
@ -41,11 +42,16 @@ import scala.Tuple2;
* <numThreads> is the number of threads the kafka consumer should use
*
* Example:
* `./run-example org.apache.spark.streaming.examples.JavaKafkaWordCount local[2] zoo01,zoo02,
* `./bin/run-example org.apache.spark.streaming.examples.JavaKafkaWordCount local[2] zoo01,zoo02,
* zoo03 my-consumer-group topic1,topic2 1`
*/
public class JavaKafkaWordCount {
public final class JavaKafkaWordCount {
private static final Pattern SPACE = Pattern.compile(" ");
private JavaKafkaWordCount() {
}
public static void main(String[] args) {
if (args.length < 5) {
System.err.println("Usage: KafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>");
@ -54,7 +60,8 @@ public class JavaKafkaWordCount {
// Create the context with a 1 second batch size
JavaStreamingContext ssc = new JavaStreamingContext(args[0], "KafkaWordCount",
new Duration(2000), System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
new Duration(2000), System.getenv("SPARK_HOME"),
JavaStreamingContext.jarOfClass(JavaKafkaWordCount.class));
int numThreads = Integer.parseInt(args[4]);
Map<String, Integer> topicMap = new HashMap<String, Integer>();
@ -67,7 +74,7 @@ public class JavaKafkaWordCount {
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
@Override
public String call(Tuple2<String, String> tuple2) throws Exception {
public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
}
});
@ -75,19 +82,19 @@ public class JavaKafkaWordCount {
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String x) {
return Lists.newArrayList(x.split(" "));
return Lists.newArrayList(SPACE.split(x));
}
});
JavaPairDStream<String, Integer> wordCounts = words.map(
new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) throws Exception {
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});

View file

@ -27,6 +27,8 @@ import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import java.util.regex.Pattern;
/**
* Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
* Usage: NetworkWordCount <master> <hostname> <port>
@ -38,7 +40,12 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext;
* and then run the example
* `$ ./run spark.streaming.examples.JavaNetworkWordCount local[2] localhost 9999`
*/
public class JavaNetworkWordCount {
public final class JavaNetworkWordCount {
private static final Pattern SPACE = Pattern.compile(" ");
private JavaNetworkWordCount() {
}
public static void main(String[] args) {
if (args.length < 3) {
System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" +
@ -48,7 +55,8 @@ public class JavaNetworkWordCount {
// Create the context with a 1 second batch size
JavaStreamingContext ssc = new JavaStreamingContext(args[0], "NetworkWordCount",
new Duration(1000), System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
new Duration(1000), System.getenv("SPARK_HOME"),
JavaStreamingContext.jarOfClass(JavaNetworkWordCount.class));
// Create a NetworkInputDStream on target ip:port and count the
// words in input stream of \n delimited test (eg. generated by 'nc')
@ -56,18 +64,18 @@ public class JavaNetworkWordCount {
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String x) {
return Lists.newArrayList(x.split(" "));
return Lists.newArrayList(SPACE.split(x));
}
});
JavaPairDStream<String, Integer> wordCounts = words.map(
new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) throws Exception {
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});

View file

@ -31,8 +31,11 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
public class JavaQueueStream {
public static void main(String[] args) throws InterruptedException {
public final class JavaQueueStream {
private JavaQueueStream() {
}
public static void main(String[] args) throws Exception {
if (args.length < 1) {
System.err.println("Usage: JavaQueueStream <master>");
System.exit(1);
@ -40,7 +43,7 @@ public class JavaQueueStream {
// Create the context
JavaStreamingContext ssc = new JavaStreamingContext(args[0], "QueueStream", new Duration(1000),
System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
System.getenv("SPARK_HOME"), JavaStreamingContext.jarOfClass(JavaQueueStream.class));
// Create the queue through which RDDs can be pushed to
// a QueueInputDStream
@ -62,14 +65,14 @@ public class JavaQueueStream {
JavaPairDStream<Integer, Integer> mappedStream = inputStream.map(
new PairFunction<Integer, Integer, Integer>() {
@Override
public Tuple2<Integer, Integer> call(Integer i) throws Exception {
public Tuple2<Integer, Integer> call(Integer i) {
return new Tuple2<Integer, Integer>(i % 10, 1);
}
});
JavaPairDStream<Integer, Integer> reducedStream = mappedStream.reduceByKey(
new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) throws Exception {
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});

View file

@ -33,7 +33,7 @@ object BroadcastTest {
System.setProperty("spark.broadcast.blockSize", blockSize)
val sc = new SparkContext(args(0), "Broadcast Test",
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
val slices = if (args.length > 1) args(1).toInt else 2
val num = if (args.length > 2) args(2).toInt else 1000000

View file

@ -27,7 +27,7 @@ object ExceptionHandlingTest {
}
val sc = new SparkContext(args(0), "ExceptionHandlingTest",
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
sc.parallelize(0 until sc.defaultParallelism).foreach { i =>
if (math.random > 0.75)
throw new Exception("Testing exception handling")

View file

@ -34,7 +34,7 @@ object GroupByTest {
var numReducers = if (args.length > 4) args(4).toInt else numMappers
val sc = new SparkContext(args(0), "GroupBy Test",
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
val ranGen = new Random

View file

@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.mapreduce.TableInputFormat
object HBaseTest {
def main(args: Array[String]) {
val sc = new SparkContext(args(0), "HBaseTest",
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
val conf = HBaseConfiguration.create()

View file

@ -22,7 +22,7 @@ import org.apache.spark._
object HdfsTest {
def main(args: Array[String]) {
val sc = new SparkContext(args(0), "HdfsTest",
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
val file = sc.textFile(args(1))
val mapped = file.map(s => s.length).cache()
for (iter <- 1 to 10) {

View file

@ -45,7 +45,7 @@ object LogQuery {
}
val sc = new SparkContext(args(0), "Log Query",
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
val dataSet =
if (args.length == 2) sc.textFile(args(1))

View file

@ -28,7 +28,7 @@ object MultiBroadcastTest {
}
val sc = new SparkContext(args(0), "Multi-Broadcast Test",
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
val slices = if (args.length > 1) args(1).toInt else 2
val num = if (args.length > 2) args(2).toInt else 1000000

View file

@ -36,7 +36,7 @@ object SimpleSkewedGroupByTest {
var ratio = if (args.length > 5) args(5).toInt else 5.0
val sc = new SparkContext(args(0), "GroupBy Test",
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
val ranGen = new Random

View file

@ -34,7 +34,7 @@ object SkewedGroupByTest {
var numReducers = if (args.length > 4) args(4).toInt else numMappers
val sc = new SparkContext(args(0), "GroupBy Test",
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
val ranGen = new Random

View file

@ -112,7 +112,7 @@ object SparkALS {
printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS)
val sc = new SparkContext(host, "SparkALS",
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
val R = generateR()

View file

@ -54,7 +54,7 @@ object SparkHdfsLR {
val inputPath = args(1)
val conf = SparkHadoopUtil.get.newConfiguration()
val sc = new SparkContext(args(0), "SparkHdfsLR",
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")), Map(),
System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass), Map(),
InputFormatInfo.computePreferredLocations(
Seq(new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat], inputPath))))
val lines = sc.textFile(inputPath)

View file

@ -55,7 +55,7 @@ object SparkKMeans {
System.exit(1)
}
val sc = new SparkContext(args(0), "SparkLocalKMeans",
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
val lines = sc.textFile(args(1))
val data = lines.map(parseVector _).cache()
val K = args(2).toInt

View file

@ -49,7 +49,7 @@ object SparkLR {
System.exit(1)
}
val sc = new SparkContext(args(0), "SparkLR",
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
val numSlices = if (args.length > 1) args(1).toInt else 2
val points = sc.parallelize(generateData, numSlices).cache()

View file

@ -38,7 +38,7 @@ object SparkPageRank {
}
var iters = args(2).toInt
val ctx = new SparkContext(args(0), "PageRank",
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
val lines = ctx.textFile(args(1), 1)
val links = lines.map{ s =>
val parts = s.split("\\s+")

View file

@ -29,7 +29,7 @@ object SparkPi {
System.exit(1)
}
val spark = new SparkContext(args(0), "SparkPi",
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
val slices = if (args.length > 1) args(1).toInt else 2
val n = 100000 * slices
val count = spark.parallelize(1 to n, slices).map { i =>

View file

@ -46,7 +46,7 @@ object SparkTC {
System.exit(1)
}
val spark = new SparkContext(args(0), "SparkTC",
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
val slices = if (args.length > 1) args(1).toInt else 2
var tc = spark.parallelize(generateGraph, slices).cache()

View file

@ -134,9 +134,9 @@ object FeederActor {
* <hostname> and <port> describe the AkkaSystem that Spark Sample feeder is running on.
*
* To run this example locally, you may run Feeder Actor as
* `$ ./run-example spark.streaming.examples.FeederActor 127.0.1.1 9999`
* `$ ./bin/run-example org.apache.spark.streaming.examples.FeederActor 127.0.1.1 9999`
* and then run the example
* `$ ./run-example spark.streaming.examples.ActorWordCount local[2] 127.0.1.1 9999`
* `$ ./bin/run-example org.apache.spark.streaming.examples.ActorWordCount local[2] 127.0.1.1 9999`
*/
object ActorWordCount {
def main(args: Array[String]) {
@ -151,7 +151,7 @@ object ActorWordCount {
// Create the context and set the batch size
val ssc = new StreamingContext(master, "ActorWordCount", Seconds(2),
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
/*
* Following is the use of actorStream to plug in custom actor as receiver

View file

@ -48,7 +48,7 @@ object FlumeEventCount {
val batchInterval = Milliseconds(2000)
// Create the context and set the batch size
val ssc = new StreamingContext(master, "FlumeEventCount", batchInterval,
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
// Create a flume stream
val stream = ssc.flumeStream(host,port,StorageLevel.MEMORY_ONLY)

View file

@ -28,7 +28,7 @@ import org.apache.spark.streaming.StreamingContext._
* <directory> is the directory that Spark Streaming will use to find and read new text files.
*
* To run this on your local machine on directory `localdir`, run this example
* `$ ./run-example spark.streaming.examples.HdfsWordCount local[2] localdir`
* `$ ./bin/run-example org.apache.spark.streaming.examples.HdfsWordCount local[2] localdir`
* Then create a text file in `localdir` and the words in the file will get counted.
*/
object HdfsWordCount {
@ -40,7 +40,7 @@ object HdfsWordCount {
// Create the context
val ssc = new StreamingContext(args(0), "HdfsWordCount", Seconds(2),
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
// Create the FileInputDStream on the directory and use the
// stream to count words in new files created

View file

@ -35,7 +35,7 @@ import org.apache.spark.streaming.util.RawTextHelper._
* <numThreads> is the number of threads the kafka consumer should use
*
* Example:
* `./run-example spark.streaming.examples.KafkaWordCount local[2] zoo01,zoo02,zoo03 my-consumer-group topic1,topic2 1`
* `./bin/run-example org.apache.spark.streaming.examples.KafkaWordCount local[2] zoo01,zoo02,zoo03 my-consumer-group topic1,topic2 1`
*/
object KafkaWordCount {
def main(args: Array[String]) {
@ -48,7 +48,7 @@ object KafkaWordCount {
val Array(master, zkQuorum, group, topics, numThreads) = args
val ssc = new StreamingContext(master, "KafkaWordCount", Seconds(2),
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
ssc.checkpoint("checkpoint")
val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap

View file

@ -79,9 +79,9 @@ object MQTTPublisher {
* <MqttbrokerUrl> and <topic> describe where Mqtt publisher is running.
*
* To run this example locally, you may run publisher as
* `$ ./run-example org.apache.spark.streaming.examples.MQTTPublisher tcp://localhost:1883 foo`
* `$ ./bin/run-example org.apache.spark.streaming.examples.MQTTPublisher tcp://localhost:1883 foo`
* and run the example as
* `$ ./run-example org.apache.spark.streaming.examples.MQTTWordCount local[2] tcp://localhost:1883 foo`
* `$ ./bin/run-example org.apache.spark.streaming.examples.MQTTWordCount local[2] tcp://localhost:1883 foo`
*/
object MQTTWordCount {
@ -96,7 +96,7 @@ object MQTTWordCount {
val Seq(master, brokerUrl, topic) = args.toSeq
val ssc = new StreamingContext(master, "MqttWordCount", Seconds(2), System.getenv("SPARK_HOME"),
Seq(System.getenv("SPARK_EXAMPLES_JAR")))
StreamingContext.jarOfClass(this.getClass))
val lines = ssc.mqttStream(brokerUrl, topic, StorageLevel.MEMORY_ONLY)
val words = lines.flatMap(x => x.toString.split(" "))

View file

@ -29,7 +29,7 @@ import org.apache.spark.streaming.StreamingContext._
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk 9999`
* and then run the example
* `$ ./run-example spark.streaming.examples.NetworkWordCount local[2] localhost 9999`
* `$ ./bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999`
*/
object NetworkWordCount {
def main(args: Array[String]) {
@ -41,7 +41,7 @@ object NetworkWordCount {
// Create the context with a 1 second batch size
val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1),
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
// Create a NetworkInputDStream on target ip:port and count the
// words in input stream of \n delimited test (eg. generated by 'nc')

View file

@ -33,7 +33,7 @@ object QueueStream {
// Create the context
val ssc = new StreamingContext(args(0), "QueueStream", Seconds(1),
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
// Create the queue through which RDDs can be pushed to
// a QueueInputDStream

View file

@ -49,7 +49,7 @@ object RawNetworkGrep {
// Create the context
val ssc = new StreamingContext(master, "RawNetworkGrep", Milliseconds(batchMillis),
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
// Warm up the JVMs on master and slave for JIT compilation to kick in
RawTextHelper.warmUp(ssc.sparkContext)

View file

@ -29,7 +29,7 @@ import org.apache.spark.streaming.StreamingContext._
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk 9999`
* and then run the example
* `$ ./run-example spark.streaming.examples.StatefulNetworkWordCount local[2] localhost 9999`
* `$ ./bin/run-example org.apache.spark.streaming.examples.StatefulNetworkWordCount local[2] localhost 9999`
*/
object StatefulNetworkWordCount {
def main(args: Array[String]) {
@ -49,7 +49,7 @@ object StatefulNetworkWordCount {
// Create the context with a 1 second batch size
val ssc = new StreamingContext(args(0), "NetworkWordCumulativeCountUpdateStateByKey", Seconds(1),
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
ssc.checkpoint(".")
// Create a NetworkInputDStream on target ip:port and count the

View file

@ -60,7 +60,7 @@ object TwitterAlgebirdCMS {
val (master, filters) = (args.head, args.tail)
val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10),
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
val stream = ssc.twitterStream(None, filters, StorageLevel.MEMORY_ONLY_SER)
val users = stream.map(status => status.getUser.getId)

Some files were not shown because too many files have changed in this diff Show more