[SPARK-8015] [FLUME] Remove Guava dependency from flume-sink.
The minimal change would be to disable shading of Guava in the module,
and rely on the transitive dependency from other libraries instead. But
since Guava's use is so localized, I think it's better to just not use
it instead, so I replaced that code and removed all traces of Guava from
the module's build.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes #6555 from vanzin/SPARK-8015 and squashes the following commits:
c0ceea8 [Marcelo Vanzin] Add comments about dependency management.
c38228d [Marcelo Vanzin] Add guava dep in test scope.
b7a0349 [Marcelo Vanzin] Add libthrift exclusion.
6e0942d [Marcelo Vanzin] Add comment in pom.
2d79260 [Marcelo Vanzin] [SPARK-8015] [flume] Remove Guava dependency from flume-sink.
(cherry picked from commit 0071bd8d31)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
This commit is contained in:
parent
f71a09de6e
commit
fa292dc3db
39
external/flume-sink/pom.xml
vendored
39
external/flume-sink/pom.xml
vendored
|
@ -42,15 +42,46 @@
|
|||
<dependency>
|
||||
<groupId>org.apache.flume</groupId>
|
||||
<artifactId>flume-ng-sdk</artifactId>
|
||||
<exclusions>
|
||||
<!-- Guava is excluded to avoid its use in this module. -->
|
||||
<exclusion>
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
</exclusion>
|
||||
<!--
|
||||
Exclude libthrift since the flume poms seem to confuse sbt, which fails to find the
|
||||
dependency.
|
||||
-->
|
||||
<exclusion>
|
||||
<groupId>org.apache.thrift</groupId>
|
||||
<artifactId>libthrift</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.flume</groupId>
|
||||
<artifactId>flume-ng-core</artifactId>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.thrift</groupId>
|
||||
<artifactId>libthrift</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.scala-lang</groupId>
|
||||
<artifactId>scala-library</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<!-- Add Guava in test scope since flume actually needs it. -->
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<!--
|
||||
Netty explicitly added in test as it has been excluded from
|
||||
|
@ -85,6 +116,14 @@
|
|||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-shade-plugin</artifactId>
|
||||
<configuration>
|
||||
<!-- Disable all relocations defined in the parent pom. -->
|
||||
<relocations combine.self="override" />
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
||||
|
|
|
@ -21,7 +21,6 @@ import java.util.concurrent.atomic.AtomicLong
|
|||
|
||||
import scala.collection.mutable
|
||||
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder
|
||||
import org.apache.flume.Channel
|
||||
import org.apache.commons.lang3.RandomStringUtils
|
||||
|
||||
|
@ -45,8 +44,7 @@ import org.apache.commons.lang3.RandomStringUtils
|
|||
private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
|
||||
val transactionTimeout: Int, val backOffInterval: Int) extends SparkFlumeProtocol with Logging {
|
||||
val transactionExecutorOpt = Option(Executors.newFixedThreadPool(threads,
|
||||
new ThreadFactoryBuilder().setDaemon(true)
|
||||
.setNameFormat("Spark Sink Processor Thread - %d").build()))
|
||||
new SparkSinkThreadFactory("Spark Sink Processor Thread - %d")))
|
||||
// Protected by `sequenceNumberToProcessor`
|
||||
private val sequenceNumberToProcessor = mutable.HashMap[CharSequence, TransactionProcessor]()
|
||||
// This sink will not persist sequence numbers and reuses them if it gets restarted.
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.spark.streaming.flume.sink
|
||||
|
||||
import java.util.concurrent.ThreadFactory
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
|
||||
/**
|
||||
* Thread factory that generates daemon threads with a specified name format.
|
||||
*/
|
||||
/**
 * A daemon-thread [[java.util.concurrent.ThreadFactory]] whose thread names are produced
 * from `nameFormat` — a `java.util.Formatter`-style pattern whose `%d` placeholder receives
 * a per-factory counter (1 for the first thread, 2 for the second, and so on).
 */
private[sink] class SparkSinkThreadFactory(nameFormat: String) extends ThreadFactory {

  // Monotonically increasing per-factory counter; bumped before each thread is named,
  // so the first thread created observes the value 1.
  private val counter = new AtomicLong()

  override def newThread(r: Runnable): Thread = {
    // Build the name first, then configure the thread as a daemon so it never
    // keeps the JVM alive on its own.
    val threadName = nameFormat.format(counter.incrementAndGet())
    val thread = new Thread(r, threadName)
    thread.setDaemon(true)
    thread
  }

}
|
|
@ -24,7 +24,6 @@ import scala.collection.JavaConversions._
|
|||
import scala.concurrent.{ExecutionContext, Future}
|
||||
import scala.util.{Failure, Success}
|
||||
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder
|
||||
import org.apache.avro.ipc.NettyTransceiver
|
||||
import org.apache.avro.ipc.specific.SpecificRequestor
|
||||
import org.apache.flume.Context
|
||||
|
@ -185,9 +184,8 @@ class SparkSinkSuite extends FunSuite {
|
|||
count: Int): Seq[(NettyTransceiver, SparkFlumeProtocol.Callback)] = {
|
||||
|
||||
(1 to count).map(_ => {
|
||||
lazy val channelFactoryExecutor =
|
||||
Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).
|
||||
setNameFormat("Flume Receiver Channel Thread - %d").build())
|
||||
lazy val channelFactoryExecutor = Executors.newCachedThreadPool(
|
||||
new SparkSinkThreadFactory("Flume Receiver Channel Thread - %d"))
|
||||
lazy val channelFactory =
|
||||
new NioClientSocketChannelFactory(channelFactoryExecutor, channelFactoryExecutor)
|
||||
val transceiver = new NettyTransceiver(address, channelFactory)
|
||||
|
|
Loading…
Reference in a new issue