[SPARK-8015] [FLUME] Remove Guava dependency from flume-sink.

The minimal change would be to disable shading of Guava in the module,
and rely on the transitive dependency from other libraries instead. But
since Guava's use is so localized, I think it's better to just not use
it instead, so I replaced that code and removed all traces of Guava from
the module's build.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #6555 from vanzin/SPARK-8015 and squashes the following commits:

c0ceea8 [Marcelo Vanzin] Add comments about dependency management.
c38228d [Marcelo Vanzin] Add guava dep in test scope.
b7a0349 [Marcelo Vanzin] Add libthrift exclusion.
6e0942d [Marcelo Vanzin] Add comment in pom.
2d79260 [Marcelo Vanzin] [SPARK-8015] [flume] Remove Guava dependency from flume-sink.
This commit is contained in:
Marcelo Vanzin 2015-06-02 11:20:33 -07:00 committed by Tathagata Das
parent 1bb5d716c0
commit 0071bd8d31
4 changed files with 77 additions and 7 deletions

View file

@ -42,15 +42,46 @@
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-sdk</artifactId>
<exclusions>
<!-- Guava is excluded to avoid its use in this module. -->
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<!--
Exclude libthrift since the flume poms seem to confuse sbt, which fails to find the
dependency.
-->
<exclusion>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<dependency>
<!-- Add Guava in test scope since flume actually needs it. -->
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<!--
Netty explicitly added in test as it has been excluded from
@ -85,6 +116,14 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<!-- Disable all relocations defined in the parent pom. -->
<relocations combine.self="override" />
</configuration>
</plugin>
</plugins>
</build>
</project>

View file

@ -21,7 +21,6 @@ import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable
import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.flume.Channel
import org.apache.commons.lang3.RandomStringUtils
@ -45,8 +44,7 @@ import org.apache.commons.lang3.RandomStringUtils
private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
val transactionTimeout: Int, val backOffInterval: Int) extends SparkFlumeProtocol with Logging {
val transactionExecutorOpt = Option(Executors.newFixedThreadPool(threads,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Spark Sink Processor Thread - %d").build()))
new SparkSinkThreadFactory("Spark Sink Processor Thread - %d")))
// Protected by `sequenceNumberToProcessor`
private val sequenceNumberToProcessor = mutable.HashMap[CharSequence, TransactionProcessor]()
// This sink will not persist sequence numbers and reuses them if it gets restarted.

View file

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.flume.sink
import java.util.concurrent.ThreadFactory
import java.util.concurrent.atomic.AtomicLong
/**
* Thread factory that generates daemon threads with a specified name format.
*/
private[sink] class SparkSinkThreadFactory(nameFormat: String) extends ThreadFactory {
private val threadId = new AtomicLong()
override def newThread(r: Runnable): Thread = {
val t = new Thread(r, nameFormat.format(threadId.incrementAndGet()))
t.setDaemon(true)
t
}
}

View file

@ -24,7 +24,6 @@ import scala.collection.JavaConversions._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.avro.ipc.NettyTransceiver
import org.apache.avro.ipc.specific.SpecificRequestor
import org.apache.flume.Context
@ -194,9 +193,8 @@ class SparkSinkSuite extends FunSuite {
count: Int): Seq[(NettyTransceiver, SparkFlumeProtocol.Callback)] = {
(1 to count).map(_ => {
lazy val channelFactoryExecutor =
Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).
setNameFormat("Flume Receiver Channel Thread - %d").build())
lazy val channelFactoryExecutor = Executors.newCachedThreadPool(
new SparkSinkThreadFactory("Flume Receiver Channel Thread - %d"))
lazy val channelFactory =
new NioClientSocketChannelFactory(channelFactoryExecutor, channelFactoryExecutor)
val transceiver = new NettyTransceiver(address, channelFactory)