Refactored the streaming project to separate external libraries like Twitter, Kafka, Flume, etc.
At a high level, these are the following changes.
1. All the external code was put in `SPARK_HOME/external/` as separate SBT projects and Maven modules. Their artifact names are `spark-streaming-twitter`, `spark-streaming-kafka`, etc. Both SparkBuild.scala and pom.xml files have been updated. References to external libraries and repositories have been removed from the settings of root and streaming projects/modules.
2. To avail the external functionality (say, creating a Twitter stream), the developer has to `import org.apache.spark.streaming.twitter._` . For Scala API, the developer has to call `TwitterUtils.createStream(streamingContext, ...)`. For the Java API, the developer has to call `TwitterUtils.createStream(javaStreamingContext, ...)`.
3. Each external project has its own scala and java unit tests. Note the unit tests of each external library use classes of the streaming unit tests (`TestSuiteBase`, `LocalJavaStreamingContext`, etc.). To enable this code sharing among test classes, `dependsOn(streaming % "compile->compile,test->test")` was used in the SparkBuild.scala . In the streaming/pom.xml, an additional `maven-jar-plugin` was necessary to capture this dependency (see comment inside the pom.xml for more information).
4. Jars of the external projects have been added to examples project but not to the assembly project.
5. In some files, imports have been rearrange to conform to the Spark coding guidelines.
Add a script to download sbt if not present on the system
As per the discussion on the dev mailing list this script will use the system sbt if present or otherwise attempt to install the sbt launcher. The fall back error message in the event it fails instructs the user to install sbt. While the URLs it fetches from aren't controlled by the spark project directly, they are stable and the current authoritative sources.
Approximate distinct count
Added countApproxDistinct() to RDD and countApproxDistinctByKey() to PairRDDFunctions to approximately count distinct number of elements and distinct number of values per key, respectively. Both functions use HyperLogLog from stream-lib for counting. Both functions take a parameter that controls the trade-off between accuracy and memory consumption. Also added Scala docs and test suites for both methods.
Without this, in some cases, Ivy attempts to download the wrong file
and fails, stopping the whole build. See bug for more details.
(This is probably also the beginning of the slow death of our
recently prettified dependencies. Form follow function.)
This also includes:
-Change `isNewYarn` to `isNewHadoop`, since the protobuf-2.5 dependency is from Hadoop-2.2 itself.
-Regexp bugix
Credits to @alig for this patch.
Add graphite sink for metrics
This adds a metrics sink for graphite. The sink must
be configured with the host and port of a graphite node
and optionally may be configured with a prefix that will
be prepended to all metrics that are sent to graphite.
I've diff'd this patch against my own -- since they were both created
independently, this means that two sets of eyes have gone over all the
merge conflicts that were created, so I'm feeling significantly more
confident in the resulting PR.
@rxin has looked at the changes to the repl and is resoundingly
confident that they are correct.
spark-assembly.jar fails to authenticate with YARN ResourceManager
The META-INF/services/ sbt MergeStrategy was discarding support for Kerberos, among others. This pull request changes to a merge strategy similar to sbt-assembly's default. I've also included an update to sbt-assembly 0.9.2, a minor fix to it's zip file handling.
Allow spark on yarn to be run from HDFS.
Allows the spark.jar, app.jar, and log4j.properties to be put into hdfs. Allows you to specify the files on a different hdfs cluster and it will copy them over. It makes sure permissions are correct and makes sure to put things into public distributed cache so they can be reused amongst users if their permissions are appropriate. Also add a bit of error handling for missing arguments.
sbt-assembly is setup to pick the first META-INF/services/org.apache.hadoop.security.SecurityInfo file instead of merging them. This causes Kerberos authentication to fail, this manifests itself in the "info:null" debug log statement:
DEBUG SaslRpcClient: Get token info proto:interface org.apache.hadoop.yarn.api.ApplicationClientProtocolPB info:null
DEBUG SaslRpcClient: Get kerberos info proto:interface org.apache.hadoop.yarn.api.ApplicationClientProtocolPB info:null
ERROR UserGroupInformation: PriviledgedActionException as:foo@BAR (auth:KERBEROS) cause:org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS]
DEBUG UserGroupInformation: PrivilegedAction as:foo@BAR (auth:KERBEROS) from:org.apache.hadoop.ipc.Client$Connection.handleSaslConnectionFailure(Client.java:583)
WARN Client: Exception encountered while connecting to the server : org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS]
ERROR UserGroupInformation: PriviledgedActionException as:foo@BAR (auth:KERBEROS) cause:java.io.IOException: org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS]
This previously would just contain a single class:
$ unzip -c assembly/target/scala-2.10/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop2.2.0.jar META-INF/services/org.apache.hadoop.security.SecurityInfo
Archive: assembly/target/scala-2.10/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop2.2.0.jar
inflating: META-INF/services/org.apache.hadoop.security.SecurityInfo
org.apache.hadoop.security.AnnotatedSecurityInfo
And now has the full list of classes:
$ unzip -c assembly/target/scala-2.10/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop2.2.0.jar META-INF/services/org.apache.hadoop.security.SecurityInfoArchive: assembly/target/scala-2.10/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop2.2.0.jar
inflating: META-INF/services/org.apache.hadoop.security.SecurityInfo
org.apache.hadoop.security.AnnotatedSecurityInfo
org.apache.hadoop.mapreduce.v2.app.MRClientSecurityInfo
org.apache.hadoop.mapreduce.v2.security.client.ClientHSSecurityInfo
org.apache.hadoop.yarn.security.client.ClientRMSecurityInfo
org.apache.hadoop.yarn.security.ContainerManagerSecurityInfo
org.apache.hadoop.yarn.security.SchedulerSecurityInfo
org.apache.hadoop.yarn.security.admin.AdminSecurityInfo
org.apache.hadoop.yarn.server.RMNMSecurityInfoClass
This adds a metrics sink for graphite. The sink must
be configured with the host and port of a graphite node
and optionally may be configured with a prefix that will
be prepended to all metrics that are sent to graphite.
Kafka uses an older version of jopt that causes bad conflicts with the version
used by spark-perf. It's not easy to remove this downstream because of the way
that spark-perf uses Spark (by including a spark assembly as an unmanaged jar).
This fixes the problem at its source by just never including it.
MQTT Adapter for Spark Streaming
MQTT is a machine-to-machine (M2M)/Internet of Things connectivity protocol.
It was designed as an extremely lightweight publish/subscribe messaging transport. You may read more about it here http://mqtt.org/
Message Queue Telemetry Transport (MQTT) is an open message protocol for M2M communications. It enables the transfer of telemetry-style data in the form of messages from devices like sensors and actuators, to mobile phones, embedded systems on vehicles, or laptops and full scale computers.
The protocol was invented by Andy Stanford-Clark of IBM, and Arlen Nipper of Cirrus Link Solutions
This protocol enables a publish/subscribe messaging model in an extremely lightweight way. It is useful for connections with remote locations where line of code and network bandwidth is a constraint.
MQTT is one of the widely used protocol for 'Internet of Things'. This protocol is getting much attraction as anything and everything is getting connected to internet and they all produce data. Researchers and companies predict some 25 billion devices will be connected to the internet by 2015.
Plugin/Support for MQTT is available in popular MQs like RabbitMQ, ActiveMQ etc.
Support for MQTT in Spark will help people with Internet of Things (IoT) projects to use Spark Streaming for their real time data processing needs (from sensors and other embedded devices etc).
Add SBT target to assemble dependencies
This pull request is an attempt to address the long assembly build times during development. Instead of rebuilding the assembly jar for every Spark change, this pull request adds a new SBT target `spark` that packages all the Spark modules and builds an assembly of the dependencies.
So the work flow that should work now would be something like
```
./sbt/sbt spark # Doing this once should suffice
## Make changes
./sbt/sbt compile
./sbt/sbt test or ./spark-shell
```
- Examples assembly included a log4j.properties which clobbered Spark's
- Example had an error where some classes weren't serializable
- Did some other clean-up in this example
Standalone Scheduler fault tolerance using ZooKeeper
This patch implements full distributed fault tolerance for standalone scheduler Masters.
There is only one master Leader at a time, which is actively serving scheduling
requests. If this Leader crashes, another master will eventually be elected, reconstruct
the state from the first Master, and continue serving scheduling requests.
Leader election is performed using the ZooKeeper leader election pattern. We try to minimize
the use of ZooKeeper and the assumptions about ZooKeeper's behavior, so there is a layer of
retries and session monitoring on top of the ZooKeeper client.
Master failover follows directly from the single-node Master recovery via the file
system (patch d5a96fe), save that the Master state is stored in ZooKeeper instead.
Configuration:
By default, no recovery mechanism is enabled (spark.deploy.recoveryMode = NONE).
By setting spark.deploy.recoveryMode to ZOOKEEPER and setting spark.deploy.zookeeper.url
to an appropriate ZooKeeper URL, ZooKeeper recovery mode is enabled.
By setting spark.deploy.recoveryMode to FILESYSTEM and setting spark.deploy.recoveryDirectory
to an appropriate directory accessible by the Master, we will keep the behavior of from d5a96fe.
Additionally, places where a Master could be specificied by a spark:// url can now take
comma-delimited lists to specify backup masters. Note that this is only used for registration
of NEW Workers and application Clients. Once a Worker or Client has registered with the
Master Leader, it is "in the system" and will never need to register again.
Conflicts:
bagel/pom.xml
core/pom.xml
core/src/test/scala/org/apache/spark/ui/UISuite.scala
examples/pom.xml
mllib/pom.xml
pom.xml
project/SparkBuild.scala
repl/pom.xml
streaming/pom.xml
tools/pom.xml
In scala 2.10, a shorter representation is used for naming artifacts
so changed to shorter scala version for artifacts and made it a property in pom.
Resolving package conflicts with hadoop 0.23.9
Hadoop 0.23.9 is having a package conflict with easymock's dependencies.
(cherry picked from commit 023e3fdf00)
Signed-off-by: Reynold Xin <rxin@apache.org>
This patch implements full distributed fault tolerance for standalone scheduler Masters.
There is only one master Leader at a time, which is actively serving scheduling
requests. If this Leader crashes, another master will eventually be elected, reconstruct
the state from the first Master, and continue serving scheduling requests.
Leader election is performed using the ZooKeeper leader election pattern. We try to minimize
the use of ZooKeeper and the assumptions about ZooKeeper's behavior, so there is a layer of
retries and session monitoring on top of the ZooKeeper client.
Master failover follows directly from the single-node Master recovery via the file
system (patch 194ba4b8), save that the Master state is stored in ZooKeeper instead.
Configuration:
By default, no recovery mechanism is enabled (spark.deploy.recoveryMode = NONE).
By setting spark.deploy.recoveryMode to ZOOKEEPER and setting spark.deploy.zookeeper.url
to an appropriate ZooKeeper URL, ZooKeeper recovery mode is enabled.
By setting spark.deploy.recoveryMode to FILESYSTEM and setting spark.deploy.recoveryDirectory
to an appropriate directory accessible by the Master, we will keep the behavior of from 194ba4b8.
Additionally, places where a Master could be specificied by a spark:// url can now take
comma-delimited lists to specify backup masters. Note that this is only used for registration
of NEW Workers and application Clients. Once a Worker or Client has registered with the
Master Leader, it is "in the system" and will never need to register again.
Forthcoming:
Documentation, tests (! - only ad hoc testing has been performed so far)
I do not intend for this commit to be merged until tests are added, but this patch should
still be mostly reviewable until then.
Due to this change in HDFS:
https://issues.apache.org/jira/browse/HADOOP-7549
there is a bug when using the new assembly builds. The symptom is that any HDFS access
results in an exception saying "No filesystem for scheme 'hdfs'". This adds a merge
strategy in the assembly build which fixes the problem.
This includes the following changes:
- The "assembly" package now builds in Maven by default, and creates an
assembly containing both hadoop-client and Spark, unlike the old
BigTop distribution assembly that skipped hadoop-client
- There is now a bigtop-dist package to build the old BigTop assembly
- The repl-bin package is no longer built by default since the scripts
don't reply on it; instead it can be enabled with -Prepl-bin
- Py4J is now included in the assembly/lib folder as a local Maven repo,
so that the Maven package can link to it
- run-example now adds the original Spark classpath as well because the
Maven examples assembly lists spark-core and such as provided
- The various Maven projects add a spark-yarn dependency correctly
This commit makes Spark invocation saner by using an assembly JAR to
find all of Spark's dependencies instead of adding all the JARs in
lib_managed. It also packages the examples into an assembly and uses
that as SPARK_EXAMPLES_JAR. Finally, it replaces the old "run" script
with two better-named scripts: "run-examples" for examples, and
"spark-class" for Spark internal classes (e.g. REPL, master, etc). This
is also designed to minimize the confusion people have in trying to use
"run" to run their own classes; it's not meant to do that, but now at
least if they look at it, they can modify run-examples to do a decent
job for them.
As part of this, Bagel's examples are also now properly moved to the
examples package instead of bagel.
It made the JSON creation slightly more complicated, but reduces one external dependency. The scala library also properly escape "/" (which lift-json doesn't).
- Changes ALS to accept RDD[Rating] instead of (Int, Int, Double) making it
easier to call from Java
- Renames class methods from `train` to `run` to enable static methods to be
called from Java.
- Add unit tests which check if both static / class methods can be called.
- Also add examples which port the main() function in ALS, KMeans to the
examples project.
Couple of minor changes to existing code:
- Add a toJavaRDD method in RDD to convert scala RDD to java RDD easily
- Workaround a bug where using double[] from Java leads to class cast exception in
KMeans init
requiring Spark to be installed. Using 'make_distribution.sh' a user
can put a Spark distribution at a URI supported by Mesos (e.g.,
'hdfs://...') and then set that when launching their job. Also added
SPARK_EXECUTOR_URI for the REPL.
This is used to find methods in the Scala API that
need to be ported to the Java API. To use it:
./run spark.tools.JavaAPICompletenessChecker
Conflicts:
project/SparkBuild.scala
run
run2.cmd
- Exclude a version of ASM 3.x that comes from HBase
- Don't use a special ASF repo for HBase
- Update SLF4J version
- Add sbt-dependency-graph plugin so we can easily find dependency trees
Unfortunately, in Akka 2.1, ActorSystem.awaitTermination hangs for
remote actors, and Akka also leaves a non-daemon Netty thread even when
run in daemon mode. Thus I had to comment out some of the calls to
awaitTermination, and we still have one failing test.
dependency on hbase introduces netty-3.2.2 which conflicts with
netty-3.5.3 already in Spark. This caused multiple test failures.
This reverts commit 0f1b7a06e1, reversing
changes made to aacca1b8a8.
Shuffle Performance Optimization: do not send 0-byte block requests to reduce network messages
change reference from io.Source to scala.io.Source to avoid looking into io.netty package
Signed-off-by: shane-huang <shengsheng.huang@intel.com>
- akka 2.0.3 → 2.1.0
- spray 1.0-M1 → 1.1-M7
For now the repl subproject is commented out, as scala reflection api changed very much since the introduction of macros.
By default - I'm leaving this commented out. This is because
there is a bug in the PGP signing plugin which causes it to active
even duing a publish-local. So we'll just uncomment when we decide
to publish.
the build file so that mesos-0.9.0-incubating.jar (which contains the
same class files, but has a silightly different name) will be pulled
down from Maven Central instead.