[SPARK-22142][BUILD][STREAMING] Move Flume support behind a profile, take 2

## What changes were proposed in this pull request?

Move Flume support behind a profile, take 2. See https://github.com/apache/spark/pull/19365 for most of the back-story.

This change should fix the problem by removing the examples module's dependency on Flume and moving the Flume examples into the Flume module itself. It also adds deprecation messages, per a discussion on the dev list about deprecating Flume support in 2.3.0.

## How was this patch tested?

Existing tests, which still enable Flume integration.

Author: Sean Owen <sowen@cloudera.com>

Closes #19412 from srowen/SPARK-22142.2.
Committed by Sean Owen on 2017-10-06 15:08:28 +01:00
commit 0c03297bf0 (parent 83488cc318)
16 changed files with 73 additions and 40 deletions


@ -84,9 +84,9 @@ MVN="build/mvn --force"
# Hive-specific profiles for some builds
HIVE_PROFILES="-Phive -Phive-thriftserver"
# Profiles for publishing snapshots and release to Maven Central
PUBLISH_PROFILES="-Pmesos -Pyarn $HIVE_PROFILES -Pspark-ganglia-lgpl -Pkinesis-asl"
PUBLISH_PROFILES="-Pmesos -Pyarn -Pflume $HIVE_PROFILES -Pspark-ganglia-lgpl -Pkinesis-asl"
# Profiles for building binary releases
BASE_RELEASE_PROFILES="-Pmesos -Pyarn -Psparkr"
BASE_RELEASE_PROFILES="-Pmesos -Pyarn -Pflume -Psparkr"
# Scala 2.11 only profiles for some builds
SCALA_2_11_PROFILES="-Pkafka-0-8"
# Scala 2.12 only profiles for some builds


@ -24,7 +24,7 @@ set -e
FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
cd "$FWDIR"
SPARK_PROFILES="-Pmesos -Pkafka-0-8 -Pyarn -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive"
SPARK_PROFILES="-Pmesos -Pkafka-0-8 -Pyarn -Pflume -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive"
TOOLS_CLASSPATH="$(build/sbt -DcopyDependencies=false "export tools/fullClasspath" | tail -n1)"
OLD_DEPS_CLASSPATH="$(build/sbt -DcopyDependencies=false $SPARK_PROFILES "export oldDeps/fullClasspath" | tail -n1)"


@ -25,6 +25,7 @@ ERRORS=$(echo -e "q\n" \
-Pmesos \
-Pkafka-0-8 \
-Pyarn \
-Pflume \
-Phive \
-Phive-thriftserver \
scalastyle test:scalastyle \


@ -279,6 +279,12 @@ streaming_flume_sink = Module(
source_file_regexes=[
"external/flume-sink",
],
build_profile_flags=[
"-Pflume",
],
environ={
"ENABLE_FLUME_TESTS": "1"
},
sbt_test_goals=[
"streaming-flume-sink/test",
]
@ -291,6 +297,12 @@ streaming_flume = Module(
source_file_regexes=[
"external/flume",
],
build_profile_flags=[
"-Pflume",
],
environ={
"ENABLE_FLUME_TESTS": "1"
},
sbt_test_goals=[
"streaming-flume/test",
]
@ -302,7 +314,13 @@ streaming_flume_assembly = Module(
dependencies=[streaming_flume, streaming_flume_sink],
source_file_regexes=[
"external/flume-assembly",
]
],
build_profile_flags=[
"-Pflume",
],
environ={
"ENABLE_FLUME_TESTS": "1"
}
)
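
For context, this is roughly what one of the gated module definitions looks like once the profile flag and environment gate are in place. This is an illustrative sketch, not a verbatim copy of dev/sparktestsupport/modules.py: the `name` and `dependencies` fields are assumed from the surrounding definitions, while the rest mirrors the hunks above.

```python
# Illustrative sketch only -- mirrors the fields visible in the hunks above;
# name= and dependencies= are assumed from the surrounding module definitions,
# and Module/streaming come from dev/sparktestsupport/modules.py.
streaming_flume = Module(
    name="streaming-flume",
    dependencies=[streaming],
    source_file_regexes=[
        "external/flume",
    ],
    build_profile_flags=[
        "-Pflume",                    # Flume modules now build only under this profile
    ],
    environ={
        "ENABLE_FLUME_TESTS": "1"     # same gate checked by python/pyspark/streaming/tests.py
    },
    sbt_test_goals=[
        "streaming-flume/test",
    ],
)
```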


@ -29,7 +29,7 @@ export LC_ALL=C
# TODO: This would be much nicer to do in SBT, once SBT supports Maven-style resolution.
# NOTE: These should match those in the release publishing script
HADOOP2_MODULE_PROFILES="-Phive-thriftserver -Pmesos -Pkafka-0-8 -Pyarn -Phive"
HADOOP2_MODULE_PROFILES="-Phive-thriftserver -Pmesos -Pkafka-0-8 -Pyarn -Pflume -Phive"
MVN="build/mvn"
HADOOP_PROFILES=(
hadoop-2.6


@ -100,6 +100,13 @@ Note: Kafka 0.8 support is deprecated as of Spark 2.3.0.
Kafka 0.10 support is still automatically built.
## Building with Flume support
Apache Flume support must be explicitly enabled with the `flume` profile.
Note: Flume support is deprecated as of Spark 2.3.0.
./build/mvn -Pflume -DskipTests clean package
## Building submodules individually
It's possible to build Spark sub-modules using the `mvn -pl` option.


@ -5,6 +5,8 @@ title: Spark Streaming + Flume Integration Guide
[Apache Flume](https://flume.apache.org/) is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. Here we explain how to configure Flume and Spark Streaming to receive data from Flume. There are two approaches to this.
**Note: Flume support is deprecated as of Spark 2.3.0.**
## Approach 1: Flume-style Push-based Approach
Flume is designed to push data between Flume agents. In this approach, Spark Streaming essentially sets up a receiver that acts as an Avro agent for Flume, to which Flume can push the data. Here are the configuration steps.
@ -44,8 +46,7 @@ configuring Flume agents.
val flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port])
See the [API docs](api/scala/index.html#org.apache.spark.streaming.flume.FlumeUtils$)
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala).
See the [API docs](api/scala/index.html#org.apache.spark.streaming.flume.FlumeUtils$).
</div>
<div data-lang="java" markdown="1">
import org.apache.spark.streaming.flume.*;
@ -53,8 +54,7 @@ configuring Flume agents.
JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port]);
See the [API docs](api/java/index.html?org/apache/spark/streaming/flume/FlumeUtils.html)
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java).
See the [API docs](api/java/index.html?org/apache/spark/streaming/flume/FlumeUtils.html).
</div>
<div data-lang="python" markdown="1">
from pyspark.streaming.flume import FlumeUtils
@ -62,8 +62,7 @@ configuring Flume agents.
flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port])
By default, the Python API decodes the Flume event body as a UTF-8 encoded string. You can specify a custom decoding function to decode the body byte arrays in Flume events into an arbitrary data type.
See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.flume.FlumeUtils)
and the [example]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/python/streaming/flume_wordcount.py).
See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.flume.FlumeUtils).
</div>
</div>
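
Since the links to the bundled examples are removed here (the examples now live in the Flume module itself), the following is a minimal, illustrative PySpark sketch of the push-based approach in the spirit of the old flume_wordcount.py. The host, port, and batch interval are placeholders, and it assumes the spark-streaming-flume assembly (built with `-Pflume`) is on the classpath.

```python
# Illustrative sketch of the push-based approach (Approach 1); host, port and batch
# interval are placeholders. Each DStream element is a (headers, body) pair, with the
# body decoded as a UTF-8 string by default.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils

sc = SparkContext(appName="FlumeWordCountSketch")
ssc = StreamingContext(sc, 2)  # 2-second batches

# Flume pushes Avro events to this host:port.
events = FlumeUtils.createStream(ssc, "localhost", 41414)

counts = (events.map(lambda event: event[1])            # keep only the decoded body
                .flatMap(lambda body: body.split(" "))
                .map(lambda word: (word, 1))
                .reduceByKey(lambda a, b: a + b))
counts.pprint()

ssc.start()
ssc.awaitTermination()
```
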
@ -162,8 +161,6 @@ configuring Flume agents.
</div>
</div>
See the Scala example [FlumePollingEventCount]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala).
Note that each input DStream can be configured to receive data from multiple sinks.
3. **Deploying:** This is the same as in the first approach.
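
The FlumePollingEventCount example referenced above has likewise moved into the Flume module, so here is a rough, illustrative sketch of the pull-based approach using the Python API. The sink host and port are placeholders and must match the custom Spark sink configured on the Flume agent.

```python
# Illustrative sketch of the pull-based approach (Approach 2); the sink address is a
# placeholder for wherever the custom Spark sink is running on the Flume agent.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils

sc = SparkContext(appName="FlumePollingEventCountSketch")
ssc = StreamingContext(sc, 2)

addresses = [("flume-sink-host", 9999)]            # one (host, port) per configured sink
events = FlumeUtils.createPollingStream(ssc, addresses)

events.count().map(lambda n: "Received %d flume events." % n).pprint()

ssc.start()
ssc.awaitTermination()
```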


@ -34,7 +34,6 @@
<sbt.project.name>examples</sbt.project.name>
<build.testJarPhase>none</build.testJarPhase>
<build.copyDependenciesPhase>package</build.copyDependenciesPhase>
<flume.deps.scope>provided</flume.deps.scope>
<hadoop.deps.scope>provided</hadoop.deps.scope>
<hive.deps.scope>provided</hive.deps.scope>
<parquet.deps.scope>provided</parquet.deps.scope>
@ -78,12 +77,6 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>


@ -48,8 +48,6 @@ public final class JavaFlumeEventCount {
System.exit(1);
}
StreamingExamples.setStreamingLogLevels();
String host = args[0];
int port = Integer.parseInt(args[1]);


@ -47,8 +47,6 @@ object FlumeEventCount {
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()
val Array(host, IntParam(port)) = args
val batchInterval = Milliseconds(2000)


@ -44,8 +44,6 @@ object FlumePollingEventCount {
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()
val Array(host, IntParam(port)) = args
val batchInterval = Milliseconds(2000)


@ -30,6 +30,7 @@ import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{JavaPairDStream, JavaReceiverInputDStream, JavaStreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
@deprecated("Deprecated without replacement", "2.3.0")
object FlumeUtils {
private val DEFAULT_POLLING_PARALLELISM = 5
private val DEFAULT_POLLING_BATCH_SIZE = 1000

pom.xml

@ -98,15 +98,13 @@
<module>sql/core</module>
<module>sql/hive</module>
<module>assembly</module>
<module>external/flume</module>
<module>external/flume-sink</module>
<module>external/flume-assembly</module>
<module>examples</module>
<module>repl</module>
<module>launcher</module>
<module>external/kafka-0-10</module>
<module>external/kafka-0-10-assembly</module>
<module>external/kafka-0-10-sql</module>
<!-- See additional modules enabled by profiles below -->
</modules>
<properties>
@ -2583,6 +2581,15 @@
</dependencies>
</profile>
<profile>
<id>flume</id>
<modules>
<module>external/flume</module>
<module>external/flume-sink</module>
<module>external/flume-assembly</module>
</modules>
</profile>
<!-- Ganglia integration is not included by default due to LGPL-licensed code -->
<profile>
<id>spark-ganglia-lgpl</id>


@ -43,11 +43,8 @@ object BuildCommons {
"catalyst", "sql", "hive", "hive-thriftserver", "sql-kafka-0-10"
).map(ProjectRef(buildLocation, _))
val streamingProjects@Seq(
streaming, streamingFlumeSink, streamingFlume, streamingKafka010
) = Seq(
"streaming", "streaming-flume-sink", "streaming-flume", "streaming-kafka-0-10"
).map(ProjectRef(buildLocation, _))
val streamingProjects@Seq(streaming, streamingKafka010) =
Seq("streaming", "streaming-kafka-0-10").map(ProjectRef(buildLocation, _))
val allProjects@Seq(
core, graphx, mllib, mllibLocal, repl, networkCommon, networkShuffle, launcher, unsafe, tags, sketch, kvstore, _*
@ -56,9 +53,13 @@ object BuildCommons {
"tags", "sketch", "kvstore"
).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects
val optionallyEnabledProjects@Seq(mesos, yarn, streamingKafka, sparkGangliaLgpl,
streamingKinesisAsl, dockerIntegrationTests, hadoopCloud) =
Seq("mesos", "yarn", "streaming-kafka-0-8", "ganglia-lgpl", "streaming-kinesis-asl",
val optionallyEnabledProjects@Seq(mesos, yarn,
streamingFlumeSink, streamingFlume,
streamingKafka, sparkGangliaLgpl, streamingKinesisAsl,
dockerIntegrationTests, hadoopCloud) =
Seq("mesos", "yarn",
"streaming-flume-sink", "streaming-flume",
"streaming-kafka-0-8", "ganglia-lgpl", "streaming-kinesis-asl",
"docker-integration-tests", "hadoop-cloud").map(ProjectRef(buildLocation, _))
val assemblyProjects@Seq(networkYarn, streamingFlumeAssembly, streamingKafkaAssembly, streamingKafka010Assembly, streamingKinesisAslAssembly) =


@ -53,6 +53,8 @@ class FlumeUtils(object):
:param enableDecompression: Should netty server decompress input stream
:param bodyDecoder: A function used to decode body (default is utf8_decoder)
:return: A DStream object
.. note:: Deprecated in 2.3.0
"""
jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
helper = FlumeUtils._get_helper(ssc._sc)
@ -79,6 +81,8 @@ class FlumeUtils(object):
will result in this stream using more threads
:param bodyDecoder: A function used to decode body (default is utf8_decoder)
:return: A DStream object
.. note:: Deprecated in 2.3.0
"""
jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
hosts = []
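
As the docstrings above note, event bodies are decoded with `utf8_decoder` by default, and a custom `bodyDecoder` can be supplied instead. A hypothetical sketch follows; the JSON payload format, host, and port are assumptions for illustration only.

```python
# Hypothetical sketch: decode Flume event bodies as JSON instead of the default UTF-8
# string. The JSON payload format, host and port are assumptions for illustration.
import json

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils

sc = SparkContext(appName="FlumeJsonDecoderSketch")
ssc = StreamingContext(sc, 2)

def json_body_decoder(body):
    # bodyDecoder receives the raw body bytes of each Flume event.
    if body is None:
        return None
    return json.loads(body.decode("utf-8"))

events = FlumeUtils.createStream(ssc, "localhost", 41414,
                                 bodyDecoder=json_body_decoder)
events.pprint()

ssc.start()
ssc.awaitTermination()
```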


@ -1478,7 +1478,7 @@ def search_kafka_assembly_jar():
("Failed to find Spark Streaming kafka assembly jar in %s. " % kafka_assembly_dir) +
"You need to build Spark with "
"'build/sbt assembly/package streaming-kafka-0-8-assembly/assembly' or "
"'build/mvn package' before running this test.")
"'build/mvn -Pkafka-0-8 package' before running this test.")
elif len(jars) > 1:
raise Exception(("Found multiple Spark Streaming Kafka assembly JARs: %s; please "
"remove all but one") % (", ".join(jars)))
@ -1495,7 +1495,7 @@ def search_flume_assembly_jar():
("Failed to find Spark Streaming Flume assembly jar in %s. " % flume_assembly_dir) +
"You need to build Spark with "
"'build/sbt assembly/assembly streaming-flume-assembly/assembly' or "
"'build/mvn package' before running this test.")
"'build/mvn -Pflume package' before running this test.")
elif len(jars) > 1:
raise Exception(("Found multiple Spark Streaming Flume assembly JARs: %s; please "
"remove all but one") % (", ".join(jars)))
@ -1516,6 +1516,9 @@ def search_kinesis_asl_assembly_jar():
return jars[0]
# Must be same as the variable and condition defined in modules.py
flume_test_environ_var = "ENABLE_FLUME_TESTS"
are_flume_tests_enabled = os.environ.get(flume_test_environ_var) == '1'
# Must be same as the variable and condition defined in modules.py
kafka_test_environ_var = "ENABLE_KAFKA_0_8_TESTS"
are_kafka_tests_enabled = os.environ.get(kafka_test_environ_var) == '1'
@ -1538,9 +1541,16 @@ if __name__ == "__main__":
os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars
testcases = [BasicOperationTests, WindowFunctionTests, StreamingContextTests, CheckpointTests,
FlumeStreamTests, FlumePollingStreamTests,
StreamingListenerTests]
if are_flume_tests_enabled:
testcases.append(FlumeStreamTests)
testcases.append(FlumePollingStreamTests)
else:
sys.stderr.write(
"Skipped test_flume_stream (enable by setting environment variable %s=1"
% flume_test_environ_var)
if are_kafka_tests_enabled:
testcases.append(KafkaStreamTests)
else:
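
The gating pattern used above for both the Flume and Kafka 0.8 suites is simple; in isolation it amounts to something like the following illustrative sketch (placeholder test classes, not the real pyspark suites).

```python
# Illustrative sketch of the ENABLE_FLUME_TESTS gate; placeholder tests, not the real suites.
import os
import sys
import unittest

class CoreStreamingTests(unittest.TestCase):
    def test_always_runs(self):
        self.assertTrue(True)

class FlumeStreamTests(unittest.TestCase):
    def test_flume_placeholder(self):
        self.assertTrue(True)

if __name__ == "__main__":
    testcases = [CoreStreamingTests]
    if os.environ.get("ENABLE_FLUME_TESTS") == "1":
        testcases.append(FlumeStreamTests)     # run Flume tests only when explicitly enabled
    else:
        sys.stderr.write("Skipping Flume tests (set ENABLE_FLUME_TESTS=1 to enable)\n")

    suite = unittest.TestSuite()
    loader = unittest.TestLoader()
    for tc in testcases:
        suite.addTests(loader.loadTestsFromTestCase(tc))
    unittest.TextTestRunner(verbosity=2).run(suite)
```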