---
layout: global
title: Spark Streaming + Flume Integration Guide
---

Apache Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. Here we explain how to configure Flume and Spark Streaming to receive data from Flume. There are two approaches to this.

Approach 1: Flume-style Push-based Approach

Flume is designed to push data between Flume agents. In this approach, Spark Streaming essentially sets up a receiver that acts as an Avro agent for Flume, to which Flume can push the data. Here are the configuration steps.

General Requirements

Choose a machine in your cluster such that

  • When your Flume + Spark Streaming application is launched, one of the Spark workers must run on that machine.

  • Flume can be configured to push data to a port on that machine.

Due to the push model, the streaming application needs to be up, with the receiver scheduled and listening on the chosen port, for Flume to be able to push data.

Configuring Flume

Configure the Flume agent to send data to an Avro sink by adding the following to its configuration file.

agent.sinks = avroSink
agent.sinks.avroSink.type = avro
agent.sinks.avroSink.channel = memoryChannel
agent.sinks.avroSink.hostname = <chosen machine's hostname>
agent.sinks.avroSink.port = <chosen port on the machine>

See Flume's documentation for more information about configuring Flume agents.

Configuring Spark Streaming Application

  1. Linking: In your SBT/Maven project definition, link your streaming application against the following artifact (see Linking section in the main programming guide for further information).

     groupId = org.apache.spark
     artifactId = spark-streaming-flume_{{site.SCALA_BINARY_VERSION}}
     version = {{site.SPARK_VERSION_SHORT}}
    
  2. Programming: In the streaming application code, import FlumeUtils and create an input DStream as follows (a complete sketch is shown after this list).

     Scala:

     import org.apache.spark.streaming.flume._
     val flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port])

     See the Scala API docs and the example FlumeEventCount.

     Java:

     import org.apache.spark.streaming.flume.*;
     JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
         FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port]);

     See the Java API docs and the example JavaFlumeEventCount.

     Note that the hostname should be the same as the one used by the resource manager in the cluster (Mesos, YARN or Spark Standalone), so that resource allocation can match the names and launch the receiver on the right machine.

  3. Deploying: Package spark-streaming-flume_{{site.SCALA_BINARY_VERSION}} and its dependencies (except spark-core_{{site.SCALA_BINARY_VERSION}} and spark-streaming_{{site.SCALA_BINARY_VERSION}} which are provided by spark-submit) into the application JAR. Then use spark-submit to launch your application (see Deploying section in the main programming guide).
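
For reference, here is a minimal end-to-end sketch of the push-based approach in Scala, along the lines of the FlumeEventCount example. The application name, hostname, and port below are illustrative placeholders, not values prescribed by this guide.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.flume._

    object FlumePushExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("FlumePushExample")
        val ssc = new StreamingContext(conf, Seconds(2))

        // The receiver acts as an Avro agent on the chosen machine and port;
        // Flume's Avro sink pushes events to it.
        val flumeStream = FlumeUtils.createStream(ssc, "flume-receiver-host", 9988)

        // Count the events received in each batch.
        flumeStream.count().map(cnt => "Received " + cnt + " flume events.").print()

        ssc.start()
        ssc.awaitTermination()
      }
    }

Package and submit this with spark-submit as described in the Deploying step; Flume can start pushing data only once the receiver is up and listening on the chosen port.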

Approach 2 (Experimental): Pull-based Approach using a Custom Sink

Instead of Flume pushing data directly to Spark Streaming, this approach runs a custom Flume sink that allows the following.

  • Flume pushes data into the sink, and the data stays buffered.
  • Spark Streaming uses a reliable Flume receiver and transactions to pull data from the sink. Transactions succeed only after data is received and replicated by Spark Streaming.

This ensures stronger reliability and fault-tolerance guarantees than the previous approach. However, this requires configuring Flume to run a custom sink. Here are the configuration steps.

General Requirements

Choose a machine that will run the custom sink in a Flume agent. The rest of the Flume pipeline is configured to send data to that agent. Machines in the Spark cluster should have access to the chosen machine running the custom sink.

Configuring Flume

Configuring Flume on the chosen machine requires the following two steps.

  1. Sink JARs: Add the following JARs to Flume's classpath (see Flume's documentation to see how) on the machine designated to run the custom sink.

    (i) Custom sink JAR: Download the JAR corresponding to the following artifact (or download it directly from a Maven repository).

     groupId = org.apache.spark
     artifactId = spark-streaming-flume-sink_{{site.SCALA_BINARY_VERSION}}
     version = {{site.SPARK_VERSION_SHORT}}
    

    (ii) Scala library JAR: Download the Scala library JAR for Scala {{site.SCALA_VERSION}}. It can be found with the following artifact details (or downloaded directly from a Maven repository).

     groupId = org.scala-lang
     artifactId = scala-library
     version = {{site.SCALA_VERSION}}
    
  2. Configuration file: On that machine, configure the Flume agent to send data to the custom Spark sink by adding the following to its configuration file.

     agent.sinks = spark
     agent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
     agent.sinks.spark.hostname = <hostname of the local machine>
     agent.sinks.spark.port = <port to listen on for connection from Spark>
     agent.sinks.spark.channel = memoryChannel
    

    Also make sure that the upstream Flume pipeline is configured to send the data to the Flume agent running this sink.

See Flume's documentation for more information about configuring Flume agents.

Configuring Spark Streaming Application

  1. Linking: In your SBT/Maven project definition, link your streaming application against the spark-streaming-flume_{{site.SCALA_BINARY_VERSION}} artifact (see Linking section in the main programming guide).

  2. Programming: In the streaming application code, import FlumeUtils and create an input DStream as follows (a complete sketch is shown after this list).

     Scala:

     import org.apache.spark.streaming.flume._
     val flumeStream = FlumeUtils.createPollingStream(streamingContext, [sink machine hostname], [sink port])

     Java:

     import org.apache.spark.streaming.flume.*;
     JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
         FlumeUtils.createPollingStream(streamingContext, [sink machine hostname], [sink port]);

    See the Scala example FlumePollingEventCount.

    Note that each input DStream can be configured to receive data from multiple sinks.

  3. Deploying: Package spark-streaming-flume_{{site.SCALA_BINARY_VERSION}} and its dependencies (except spark-core_{{site.SCALA_BINARY_VERSION}} and spark-streaming_{{site.SCALA_BINARY_VERSION}} which are provided by spark-submit) into the application JAR. Then use spark-submit to launch your application (see Deploying section in the main programming guide).
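
For reference, here is a minimal end-to-end sketch of the pull-based approach in Scala, along the lines of the FlumePollingEventCount example. The application name, sink hostname, and port are illustrative placeholders; the commented lines sketch the FlumeUtils overload that pulls from multiple sink addresses with a single DStream.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.flume._

    object FlumePollingExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("FlumePollingExample")
        val ssc = new StreamingContext(conf, Seconds(2))

        // The reliable receiver connects to the custom SparkSink and pulls batches of events
        // using transactions, so data leaves the sink only after Spark Streaming has stored it.
        val flumeStream = FlumeUtils.createPollingStream(ssc, "spark-sink-host", 9999)

        // To pull from several sinks with one DStream, use the overload that takes addresses:
        //   import java.net.InetSocketAddress
        //   import org.apache.spark.storage.StorageLevel
        //   val addresses = Seq(new InetSocketAddress("spark-sink-host-1", 9999),
        //                       new InetSocketAddress("spark-sink-host-2", 9999))
        //   val flumeStream = FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2)

        // Count the events received in each batch.
        flumeStream.count().map(cnt => "Received " + cnt + " flume events.").print()

        ssc.start()
        ssc.awaitTermination()
      }
    }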