Merge pull request #1 from aarondav/scala210-master
Various merge corrections
This commit is contained in:
commit
f6b2e590b1
|
@ -29,8 +29,6 @@ import java.io.Serializable;
|
||||||
* when mapping RDDs of other types.
|
* when mapping RDDs of other types.
|
||||||
*/
|
*/
|
||||||
public abstract class Function<T, R> extends WrappedFunction1<T, R> implements Serializable {
|
public abstract class Function<T, R> extends WrappedFunction1<T, R> implements Serializable {
|
||||||
public abstract R call(T t) throws Exception;
|
|
||||||
|
|
||||||
public ClassTag<R> returnType() {
|
public ClassTag<R> returnType() {
|
||||||
return ClassTag$.MODULE$.apply(Object.class);
|
return ClassTag$.MODULE$.apply(Object.class);
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,8 +28,6 @@ import java.io.Serializable;
|
||||||
public abstract class Function2<T1, T2, R> extends WrappedFunction2<T1, T2, R>
|
public abstract class Function2<T1, T2, R> extends WrappedFunction2<T1, T2, R>
|
||||||
implements Serializable {
|
implements Serializable {
|
||||||
|
|
||||||
public abstract R call(T1 t1, T2 t2) throws Exception;
|
|
||||||
|
|
||||||
public ClassTag<R> returnType() {
|
public ClassTag<R> returnType() {
|
||||||
return (ClassTag<R>) ClassTag$.MODULE$.apply(Object.class);
|
return (ClassTag<R>) ClassTag$.MODULE$.apply(Object.class);
|
||||||
}
|
}
|
||||||
|
|
|
@ -145,11 +145,11 @@ private[spark] class Client(
|
||||||
markDisconnected()
|
markDisconnected()
|
||||||
|
|
||||||
case DisassociatedEvent(_, address, _) if address == masterAddress =>
|
case DisassociatedEvent(_, address, _) if address == masterAddress =>
|
||||||
logError("Connection to master failed; stopping client")
|
logWarning("Connection to master failed; waiting for master to reconnect...")
|
||||||
markDisconnected()
|
markDisconnected()
|
||||||
|
|
||||||
case AssociationErrorEvent(_, _, address, _) if address == masterAddress =>
|
case AssociationErrorEvent(_, _, address, _) if address == masterAddress =>
|
||||||
logError("Connection to master failed; stopping client")
|
logWarning("Connection to master failed; waiting for master to reconnect...")
|
||||||
markDisconnected()
|
markDisconnected()
|
||||||
|
|
||||||
case StopClient =>
|
case StopClient =>
|
||||||
|
|
|
@ -17,8 +17,7 @@
|
||||||
|
|
||||||
package org.apache.spark.deploy.master
|
package org.apache.spark.deploy.master
|
||||||
|
|
||||||
private[spark] object ApplicationState
|
private[spark] object ApplicationState extends Enumeration {
|
||||||
extends Enumeration {
|
|
||||||
|
|
||||||
type ApplicationState = Value
|
type ApplicationState = Value
|
||||||
|
|
||||||
|
|
|
@ -41,16 +41,6 @@ import org.apache.spark.deploy.DeployMessages.RegisterWorkerFailed
|
||||||
import org.apache.spark.deploy.DeployMessages.KillExecutor
|
import org.apache.spark.deploy.DeployMessages.KillExecutor
|
||||||
import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
|
import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
|
||||||
import scala.Some
|
import scala.Some
|
||||||
import org.apache.spark.deploy.DeployMessages.LaunchExecutor
|
|
||||||
import org.apache.spark.deploy.DeployMessages.RegisteredApplication
|
|
||||||
import org.apache.spark.deploy.DeployMessages.RegisterWorker
|
|
||||||
import org.apache.spark.deploy.DeployMessages.ExecutorUpdated
|
|
||||||
import org.apache.spark.deploy.DeployMessages.MasterStateResponse
|
|
||||||
import org.apache.spark.deploy.DeployMessages.ExecutorAdded
|
|
||||||
import org.apache.spark.deploy.DeployMessages.RegisterApplication
|
|
||||||
import org.apache.spark.deploy.DeployMessages.ApplicationRemoved
|
|
||||||
import org.apache.spark.deploy.DeployMessages.Heartbeat
|
|
||||||
import org.apache.spark.deploy.DeployMessages.RegisteredWorker
|
|
||||||
import akka.actor.Terminated
|
import akka.actor.Terminated
|
||||||
import akka.serialization.SerializationExtension
|
import akka.serialization.SerializationExtension
|
||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.TimeUnit
|
||||||
|
|
|
@ -25,7 +25,7 @@ import scala.collection.mutable.HashMap
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
|
|
||||||
import akka.actor._
|
import akka.actor._
|
||||||
import akka.remote.{RemotingLifecycleEvent, AssociationErrorEvent, DisassociatedEvent}
|
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
|
||||||
|
|
||||||
import org.apache.spark.Logging
|
import org.apache.spark.Logging
|
||||||
import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
|
import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
|
||||||
|
@ -34,19 +34,6 @@ import org.apache.spark.deploy.master.Master
|
||||||
import org.apache.spark.deploy.worker.ui.WorkerWebUI
|
import org.apache.spark.deploy.worker.ui.WorkerWebUI
|
||||||
import org.apache.spark.metrics.MetricsSystem
|
import org.apache.spark.metrics.MetricsSystem
|
||||||
import org.apache.spark.util.{Utils, AkkaUtils}
|
import org.apache.spark.util.{Utils, AkkaUtils}
|
||||||
import org.apache.spark.deploy.DeployMessages.WorkerStateResponse
|
|
||||||
import org.apache.spark.deploy.DeployMessages.RegisterWorkerFailed
|
|
||||||
import org.apache.spark.deploy.DeployMessages.KillExecutor
|
|
||||||
import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
|
|
||||||
import scala.Some
|
|
||||||
import akka.remote.DisassociatedEvent
|
|
||||||
import org.apache.spark.deploy.DeployMessages.LaunchExecutor
|
|
||||||
import org.apache.spark.deploy.DeployMessages.RegisterWorker
|
|
||||||
import org.apache.spark.deploy.DeployMessages.WorkerSchedulerStateResponse
|
|
||||||
import org.apache.spark.deploy.DeployMessages.MasterChanged
|
|
||||||
import org.apache.spark.deploy.DeployMessages.Heartbeat
|
|
||||||
import org.apache.spark.deploy.DeployMessages.RegisteredWorker
|
|
||||||
import akka.actor.Terminated
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param masterUrls Each url should look like spark://host:port.
|
* @param masterUrls Each url should look like spark://host:port.
|
||||||
|
@ -248,7 +235,7 @@ private[spark] class Worker(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
case DisassociatedEvent(_, _, _) =>
|
case DisassociatedEvent(_, address, _) if address == master.path.address =>
|
||||||
masterDisconnected()
|
masterDisconnected()
|
||||||
|
|
||||||
case RequestWorkerState => {
|
case RequestWorkerState => {
|
||||||
|
|
|
@ -107,7 +107,6 @@ private[spark] object CoarseGrainedExecutorBackend {
|
||||||
// set it
|
// set it
|
||||||
val sparkHostPort = hostname + ":" + boundPort
|
val sparkHostPort = hostname + ":" + boundPort
|
||||||
System.setProperty("spark.hostPort", sparkHostPort)
|
System.setProperty("spark.hostPort", sparkHostPort)
|
||||||
|
|
||||||
actorSystem.actorOf(
|
actorSystem.actorOf(
|
||||||
Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores),
|
Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores),
|
||||||
name = "Executor")
|
name = "Executor")
|
||||||
|
|
|
@ -118,7 +118,11 @@ private[spark] class Executor(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private val akkaFrameSize = env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size")
|
// Akka's message frame size. If task result is bigger than this, we use the block manager
|
||||||
|
// to send the result back.
|
||||||
|
private val akkaFrameSize = {
|
||||||
|
env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size")
|
||||||
|
}
|
||||||
|
|
||||||
// Start worker thread pool
|
// Start worker thread pool
|
||||||
val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")
|
val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")
|
||||||
|
|
|
@ -21,9 +21,9 @@ import java.util.concurrent.atomic.AtomicLong
|
||||||
|
|
||||||
import scala.collection.mutable.ArrayBuffer
|
import scala.collection.mutable.ArrayBuffer
|
||||||
import scala.concurrent.ExecutionContext.Implicits.global
|
import scala.concurrent.ExecutionContext.Implicits.global
|
||||||
|
import scala.reflect.ClassTag
|
||||||
|
|
||||||
import org.apache.spark.{ComplexFutureAction, FutureAction, Logging}
|
import org.apache.spark.{ComplexFutureAction, FutureAction, Logging}
|
||||||
import scala.reflect.ClassTag
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A set of asynchronous RDD actions available through an implicit conversion.
|
* A set of asynchronous RDD actions available through an implicit conversion.
|
||||||
|
|
|
@ -17,9 +17,10 @@
|
||||||
|
|
||||||
package org.apache.spark.rdd
|
package org.apache.spark.rdd
|
||||||
|
|
||||||
|
import scala.reflect.ClassTag
|
||||||
|
|
||||||
import org.apache.spark.{SparkContext, SparkEnv, Partition, TaskContext}
|
import org.apache.spark.{SparkContext, SparkEnv, Partition, TaskContext}
|
||||||
import org.apache.spark.storage.{BlockId, BlockManager}
|
import org.apache.spark.storage.{BlockId, BlockManager}
|
||||||
import scala.reflect.ClassTag
|
|
||||||
|
|
||||||
private[spark] class BlockRDDPartition(val blockId: BlockId, idx: Int) extends Partition {
|
private[spark] class BlockRDDPartition(val blockId: BlockId, idx: Int) extends Partition {
|
||||||
val index = idx
|
val index = idx
|
||||||
|
|
|
@ -52,7 +52,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
|
||||||
* sources in HBase, or S3).
|
* sources in HBase, or S3).
|
||||||
*
|
*
|
||||||
* @param sc The SparkContext to associate the RDD with.
|
* @param sc The SparkContext to associate the RDD with.
|
||||||
* @param broadCastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed
|
* @param broadcastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed
|
||||||
* variabe references an instance of JobConf, then that JobConf will be used for the Hadoop job.
|
* variabe references an instance of JobConf, then that JobConf will be used for the Hadoop job.
|
||||||
* Otherwise, a new JobConf will be created on each slave using the enclosed Configuration.
|
* Otherwise, a new JobConf will be created on each slave using the enclosed Configuration.
|
||||||
* @param initLocalJobConfFuncOpt Optional closure used to initialize any JobConf that HadoopRDD
|
* @param initLocalJobConfFuncOpt Optional closure used to initialize any JobConf that HadoopRDD
|
||||||
|
|
|
@ -20,12 +20,13 @@ package org.apache.spark.scheduler
|
||||||
import java.io.NotSerializableException
|
import java.io.NotSerializableException
|
||||||
import java.util.Properties
|
import java.util.Properties
|
||||||
import java.util.concurrent.atomic.AtomicInteger
|
import java.util.concurrent.atomic.AtomicInteger
|
||||||
import scala.concurrent.duration._
|
|
||||||
|
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
|
||||||
import scala.concurrent.ExecutionContext.Implicits.global
|
import scala.concurrent.ExecutionContext.Implicits.global
|
||||||
|
import scala.concurrent.duration._
|
||||||
|
import scala.reflect.ClassTag
|
||||||
|
|
||||||
import akka.actor._
|
import akka.actor._
|
||||||
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
|
|
||||||
import scala.reflect.ClassTag
|
|
||||||
|
|
||||||
import org.apache.spark._
|
import org.apache.spark._
|
||||||
import org.apache.spark.rdd.RDD
|
import org.apache.spark.rdd.RDD
|
||||||
|
|
|
@ -25,8 +25,8 @@ import scala.collection.mutable.ArrayBuffer
|
||||||
import scala.collection.mutable.HashMap
|
import scala.collection.mutable.HashMap
|
||||||
import scala.collection.mutable.HashSet
|
import scala.collection.mutable.HashSet
|
||||||
|
|
||||||
import scala.concurrent.duration._
|
|
||||||
import scala.concurrent.ExecutionContext.Implicits.global
|
import scala.concurrent.ExecutionContext.Implicits.global
|
||||||
|
import scala.concurrent.duration._
|
||||||
|
|
||||||
import org.apache.spark._
|
import org.apache.spark._
|
||||||
import org.apache.spark.TaskState.TaskState
|
import org.apache.spark.TaskState.TaskState
|
||||||
|
|
|
@ -181,6 +181,7 @@ private[spark] class CoarseMesosSchedulerBackend(
|
||||||
!slaveIdsWithExecutors.contains(slaveId)) {
|
!slaveIdsWithExecutors.contains(slaveId)) {
|
||||||
// Launch an executor on the slave
|
// Launch an executor on the slave
|
||||||
val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
|
val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
|
||||||
|
totalCoresAcquired += cpusToUse
|
||||||
val taskId = newMesosTaskId()
|
val taskId = newMesosTaskId()
|
||||||
taskIdToSlaveId(taskId) = slaveId
|
taskIdToSlaveId(taskId) = slaveId
|
||||||
slaveIdsWithExecutors += slaveId
|
slaveIdsWithExecutors += slaveId
|
||||||
|
|
|
@ -22,14 +22,11 @@ import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address}
|
||||||
import java.util.{Locale, Random, UUID}
|
import java.util.{Locale, Random, UUID}
|
||||||
import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor}
|
import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor}
|
||||||
|
|
||||||
|
|
||||||
import scala.collection.mutable.ArrayBuffer
|
|
||||||
import scala.collection.JavaConversions._
|
import scala.collection.JavaConversions._
|
||||||
import scala.collection.Map
|
import scala.collection.Map
|
||||||
|
import scala.collection.mutable.ArrayBuffer
|
||||||
import scala.io.Source
|
import scala.io.Source
|
||||||
import scala.reflect.ClassTag
|
import scala.reflect.ClassTag
|
||||||
import scala.Some
|
|
||||||
|
|
||||||
|
|
||||||
import com.google.common.io.Files
|
import com.google.common.io.Files
|
||||||
import com.google.common.util.concurrent.ThreadFactoryBuilder
|
import com.google.common.util.concurrent.ThreadFactoryBuilder
|
||||||
|
|
|
@ -19,7 +19,6 @@ package org.apache.spark.util.collection
|
||||||
|
|
||||||
import scala.reflect.ClassTag
|
import scala.reflect.ClassTag
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A fast hash map implementation for nullable keys. This hash map supports insertions and updates,
|
* A fast hash map implementation for nullable keys. This hash map supports insertions and updates,
|
||||||
* but not deletions. This map is about 5X faster than java.util.HashMap, while using much less
|
* but not deletions. This map is about 5X faster than java.util.HashMap, while using much less
|
||||||
|
|
|
@ -25,8 +25,8 @@ the _exact_ Hadoop version you are running to avoid any compatibility errors.
|
||||||
<h3>CDH Releases</h3>
|
<h3>CDH Releases</h3>
|
||||||
<table class="table" style="width:350px; margin-right: 20px;">
|
<table class="table" style="width:350px; margin-right: 20px;">
|
||||||
<tr><th>Release</th><th>Version code</th></tr>
|
<tr><th>Release</th><th>Version code</th></tr>
|
||||||
<tr><td>CDH 4.X.X (YARN mode)</td><td>2.0.0-chd4.X.X</td></tr>
|
<tr><td>CDH 4.X.X (YARN mode)</td><td>2.0.0-cdh4.X.X</td></tr>
|
||||||
<tr><td>CDH 4.X.X</td><td>2.0.0-mr1-chd4.X.X</td></tr>
|
<tr><td>CDH 4.X.X</td><td>2.0.0-mr1-cdh4.X.X</td></tr>
|
||||||
<tr><td>CDH 3u6</td><td>0.20.2-cdh3u6</td></tr>
|
<tr><td>CDH 3u6</td><td>0.20.2-cdh3u6</td></tr>
|
||||||
<tr><td>CDH 3u5</td><td>0.20.2-cdh3u5</td></tr>
|
<tr><td>CDH 3u5</td><td>0.20.2-cdh3u5</td></tr>
|
||||||
<tr><td>CDH 3u4</td><td>0.20.2-cdh3u4</td></tr>
|
<tr><td>CDH 3u4</td><td>0.20.2-cdh3u4</td></tr>
|
||||||
|
|
|
@ -105,12 +105,6 @@ object SparkBuild extends Build {
|
||||||
// also check the local Maven repository ~/.m2
|
// also check the local Maven repository ~/.m2
|
||||||
resolvers ++= Seq(Resolver.file("Local Maven Repo", file(Path.userHome + "/.m2/repository"))),
|
resolvers ++= Seq(Resolver.file("Local Maven Repo", file(Path.userHome + "/.m2/repository"))),
|
||||||
|
|
||||||
// Shared between both core and streaming.
|
|
||||||
resolvers ++= Seq("Akka Repository" at "http://repo.akka.io/releases/"),
|
|
||||||
|
|
||||||
// Shared between both examples and streaming.
|
|
||||||
resolvers ++= Seq("Mqtt Repository" at "https://repo.eclipse.org/content/repositories/paho-releases/"),
|
|
||||||
|
|
||||||
// For Sonatype publishing
|
// For Sonatype publishing
|
||||||
resolvers ++= Seq("sonatype-snapshots" at "https://oss.sonatype.org/content/repositories/snapshots",
|
resolvers ++= Seq("sonatype-snapshots" at "https://oss.sonatype.org/content/repositories/snapshots",
|
||||||
"sonatype-staging" at "https://oss.sonatype.org/service/local/staging/deploy/maven2/"),
|
"sonatype-staging" at "https://oss.sonatype.org/service/local/staging/deploy/maven2/"),
|
||||||
|
|
|
@ -940,17 +940,9 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter,
|
||||||
if (prop != null) prop else "local"
|
if (prop != null) prop else "local"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
val jars = Option(System.getenv("ADD_JARS")).map(_.split(','))
|
val jars = SparkILoop.getAddedJars.map(new java.io.File(_).getAbsolutePath)
|
||||||
.getOrElse(new Array[String](0))
|
|
||||||
.map(new java.io.File(_).getAbsolutePath)
|
|
||||||
try {
|
|
||||||
sparkContext = new SparkContext(master, "Spark shell", System.getenv("SPARK_HOME"), jars)
|
sparkContext = new SparkContext(master, "Spark shell", System.getenv("SPARK_HOME"), jars)
|
||||||
} catch {
|
echo("Created spark context..")
|
||||||
case e: Exception =>
|
|
||||||
e.printStackTrace()
|
|
||||||
echo("Failed to create SparkContext, exiting...")
|
|
||||||
sys.exit(1)
|
|
||||||
}
|
|
||||||
sparkContext
|
sparkContext
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -25,10 +25,10 @@ import org.apache.spark.SparkContext._
|
||||||
|
|
||||||
import scala.collection.mutable.HashMap
|
import scala.collection.mutable.HashMap
|
||||||
import scala.collection.mutable.Queue
|
import scala.collection.mutable.Queue
|
||||||
|
import scala.concurrent.duration._
|
||||||
|
|
||||||
import akka.actor._
|
import akka.actor._
|
||||||
import akka.pattern.ask
|
import akka.pattern.ask
|
||||||
import scala.concurrent.duration._
|
|
||||||
import akka.dispatch._
|
import akka.dispatch._
|
||||||
import org.apache.spark.storage.BlockId
|
import org.apache.spark.storage.BlockId
|
||||||
|
|
||||||
|
|
|
@ -33,7 +33,6 @@ import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
|
||||||
import org.apache.hadoop.mapred.OutputFormat
|
import org.apache.hadoop.mapred.OutputFormat
|
||||||
import org.apache.hadoop.security.UserGroupInformation
|
import org.apache.hadoop.security.UserGroupInformation
|
||||||
import org.apache.hadoop.conf.Configuration
|
import org.apache.hadoop.conf.Configuration
|
||||||
import scala.Some
|
|
||||||
|
|
||||||
class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
|
class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
|
||||||
extends Serializable {
|
extends Serializable {
|
||||||
|
|
|
@ -728,7 +728,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
|
||||||
}
|
}
|
||||||
|
|
||||||
object JavaPairDStream {
|
object JavaPairDStream {
|
||||||
implicit def fromPairDStream[K: ClassTag, V: ClassTag](dstream: DStream[(K, V)]) : JavaPairDStream[K, V] = {
|
implicit def fromPairDStream[K: ClassTag, V: ClassTag](dstream: DStream[(K, V)]) = {
|
||||||
new JavaPairDStream[K, V](dstream)
|
new JavaPairDStream[K, V](dstream)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,59 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.spark.streaming.dstream
|
|
||||||
|
|
||||||
import org.apache.spark.Partitioner
|
|
||||||
import org.apache.spark.rdd.RDD
|
|
||||||
import org.apache.spark.rdd.CoGroupedRDD
|
|
||||||
import org.apache.spark.streaming.{Time, DStream, Duration}
|
|
||||||
import scala.reflect.ClassTag
|
|
||||||
|
|
||||||
private[streaming]
|
|
||||||
class CoGroupedDStream[K : ClassTag](
|
|
||||||
parents: Seq[DStream[(K, _)]],
|
|
||||||
partitioner: Partitioner
|
|
||||||
) extends DStream[(K, Seq[Seq[_]])](parents.head.ssc) {
|
|
||||||
|
|
||||||
if (parents.length == 0) {
|
|
||||||
throw new IllegalArgumentException("Empty array of parents")
|
|
||||||
}
|
|
||||||
|
|
||||||
if (parents.map(_.ssc).distinct.size > 1) {
|
|
||||||
throw new IllegalArgumentException("Array of parents have different StreamingContexts")
|
|
||||||
}
|
|
||||||
|
|
||||||
if (parents.map(_.slideDuration).distinct.size > 1) {
|
|
||||||
throw new IllegalArgumentException("Array of parents have different slide times")
|
|
||||||
}
|
|
||||||
|
|
||||||
override def dependencies = parents.toList
|
|
||||||
|
|
||||||
override def slideDuration: Duration = parents.head.slideDuration
|
|
||||||
|
|
||||||
override def compute(validTime: Time): Option[RDD[(K, Seq[Seq[_]])]] = {
|
|
||||||
val part = partitioner
|
|
||||||
val rdds = parents.flatMap(_.getOrCompute(validTime))
|
|
||||||
if (rdds.size > 0) {
|
|
||||||
val q = new CoGroupedRDD[K](rdds, part)
|
|
||||||
Some(q)
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -33,7 +33,6 @@ import org.I0Itec.zkclient._
|
||||||
import scala.collection.Map
|
import scala.collection.Map
|
||||||
import scala.reflect.ClassTag
|
import scala.reflect.ClassTag
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Input stream that pulls messages from a Kafka Broker.
|
* Input stream that pulls messages from a Kafka Broker.
|
||||||
*
|
*
|
||||||
|
|
Loading…
Reference in a new issue