Merge remote-tracking branch 'apache/master' into conf2

Conflicts:
	core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
	streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
	streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
Matei Zaharia 2013-12-31 18:23:14 -05:00
commit ba9338f104
30 changed files with 347 additions and 259 deletions


@ -20,19 +20,24 @@ package org.apache.spark.network.netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.channel.socket.oio.OioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
class FileClient {
private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
private FileClientHandler handler = null;
private final FileClientHandler handler;
private Channel channel = null;
private Bootstrap bootstrap = null;
private int connectTimeout = 60*1000; // 1 min
private EventLoopGroup group = null;
private final int connectTimeout;
private final int sendTimeout = 60; // 1 min
public FileClient(FileClientHandler handler, int connectTimeout) {
this.handler = handler;
@ -40,8 +45,9 @@ class FileClient {
}
public void init() {
group = new OioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(new OioEventLoopGroup())
bootstrap.group(group)
.channel(OioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
@ -56,6 +62,7 @@ class FileClient {
// ChannelFuture cf = channel.closeFuture();
//cf.addListener(new ChannelCloseListener(this));
} catch (InterruptedException e) {
LOG.warn("FileClient interrupted while trying to connect", e);
close();
}
}
@ -71,16 +78,21 @@ class FileClient {
public void sendRequest(String file) {
//assert(file == null);
//assert(channel == null);
channel.write(file + "\r\n");
try {
// Should be able to send the message to network link channel.
boolean bSent = channel.writeAndFlush(file + "\r\n").await(sendTimeout, TimeUnit.SECONDS);
if (!bSent) {
throw new RuntimeException("Failed to send");
}
} catch (InterruptedException e) {
LOG.error("Error", e);
}
}
public void close() {
if(channel != null) {
channel.close();
channel = null;
}
if ( bootstrap!=null) {
bootstrap.shutdown();
if (group != null) {
group.shutdownGracefully();
group = null;
bootstrap = null;
}
}
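
The FileClient changes above keep a reference to the OIO event loop group so that close() can call shutdownGracefully() (bootstrap.shutdown() no longer exists in Netty 4.0 final), and they replace the bare channel.write() with writeAndFlush() awaited under a timeout. Below is a minimal standalone sketch of the same pattern; the host, port, and message are placeholders, not part of the diff.

```scala
import java.util.concurrent.TimeUnit

import io.netty.bootstrap.Bootstrap
import io.netty.channel.{ChannelInitializer, ChannelOption, EventLoopGroup}
import io.netty.channel.oio.OioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.oio.OioSocketChannel
import io.netty.handler.codec.string.StringEncoder

object SimpleOioClient {
  def main(args: Array[String]): Unit = {
    // Keep the group as a named reference so it can be shut down gracefully,
    // mirroring the change from bootstrap.shutdown() to group.shutdownGracefully().
    val group: EventLoopGroup = new OioEventLoopGroup()
    try {
      val bootstrap = new Bootstrap()
        .group(group)
        .channel(classOf[OioSocketChannel])
        .option(ChannelOption.SO_KEEPALIVE, java.lang.Boolean.TRUE)
        .handler(new ChannelInitializer[SocketChannel] {
          override def initChannel(ch: SocketChannel): Unit = {
            ch.pipeline().addLast("encoder", new StringEncoder())
          }
        })
      val channel = bootstrap.connect("localhost", 9999).sync().channel()
      // writeAndFlush() returns a ChannelFuture; await it with a timeout rather
      // than assuming the write completed, as sendRequest() now does above.
      val sent = channel.writeAndFlush("some-block-id\r\n").await(60, TimeUnit.SECONDS)
      if (!sent) throw new RuntimeException("Failed to send within timeout")
      channel.close().sync()
    } finally {
      group.shutdownGracefully()
    }
  }
}
```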


@ -17,15 +17,13 @@
package org.apache.spark.network.netty;
import io.netty.buffer.BufType;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringEncoder;
class FileClientChannelInitializer extends ChannelInitializer<SocketChannel> {
private FileClientHandler fhandler;
private final FileClientHandler fhandler;
public FileClientChannelInitializer(FileClientHandler handler) {
fhandler = handler;
@ -35,7 +33,7 @@ class FileClientChannelInitializer extends ChannelInitializer<SocketChannel> {
public void initChannel(SocketChannel channel) {
// file no more than 2G
channel.pipeline()
.addLast("encoder", new StringEncoder(BufType.BYTE))
.addLast("encoder", new StringEncoder())
.addLast("handler", fhandler);
}
}


@ -19,11 +19,11 @@ package org.apache.spark.network.netty;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundByteHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;
import org.apache.spark.storage.BlockId;
abstract class FileClientHandler extends ChannelInboundByteHandlerAdapter {
abstract class FileClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
private FileHeader currentHeader = null;
@ -37,13 +37,7 @@ abstract class FileClientHandler extends ChannelInboundByteHandlerAdapter {
public abstract void handleError(BlockId blockId);
@Override
public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) {
// Use direct buffer if possible.
return ctx.alloc().ioBuffer();
}
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) {
public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
// get header
if (currentHeader == null && in.readableBytes() >= FileHeader.HEADER_SIZE()) {
currentHeader = FileHeader.create(in.readBytes(FileHeader.HEADER_SIZE()));
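
The handler change above follows the Netty 4.0-final inbound API: ChannelInboundByteHandlerAdapter and the explicit newInboundBuffer() override are gone, and handlers extend SimpleChannelInboundHandler and implement channelRead0(), with buffer allocation handled by the pipeline. An illustrative handler in that style (not the real FileClientHandler) might look like:

```scala
import io.netty.buffer.ByteBuf
import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler}

// channelRead0() replaces inboundBufferUpdated(); the pipeline now owns buffer allocation.
class ByteCountingHandler extends SimpleChannelInboundHandler[ByteBuf] {
  private var bytesSeen = 0L

  override def channelRead0(ctx: ChannelHandlerContext, in: ByteBuf): Unit = {
    bytesSeen += in.readableBytes() // consume whatever arrived in this read
  }

  override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
    cause.printStackTrace()
    ctx.close()
  }
}
```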


@ -22,13 +22,12 @@ import java.net.InetSocketAddress;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.channel.socket.oio.OioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Server that accepts the path of a file and echoes back its content.
*/
@ -36,7 +35,8 @@ class FileServer {
private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
private ServerBootstrap bootstrap = null;
private EventLoopGroup bossGroup = null;
private EventLoopGroup workerGroup = null;
private ChannelFuture channelFuture = null;
private int port = 0;
private Thread blockingThread = null;
@ -45,8 +45,11 @@ class FileServer {
InetSocketAddress addr = new InetSocketAddress(port);
// Configure the server.
bootstrap = new ServerBootstrap();
bootstrap.group(new OioEventLoopGroup(), new OioEventLoopGroup())
bossGroup = new OioEventLoopGroup();
workerGroup = new OioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(OioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.option(ChannelOption.SO_RCVBUF, 1500)
@ -89,13 +92,19 @@ class FileServer {
public void stop() {
// Close the bound channel.
if (channelFuture != null) {
channelFuture.channel().close();
channelFuture.channel().close().awaitUninterruptibly();
channelFuture = null;
}
// Shutdown bootstrap.
if (bootstrap != null) {
bootstrap.shutdown();
bootstrap = null;
// Shutdown event groups
if (bossGroup != null) {
bossGroup.shutdownGracefully();
bossGroup = null;
}
if (workerGroup != null) {
workerGroup.shutdownGracefully();
workerGroup = null;
}
// TODO: Shutdown all accepted channels as well ?
}
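
The FileServer changes keep the boss and worker OioEventLoopGroups as fields so that stop() can wait for the bound channel to close and then shut both groups down gracefully. A condensed sketch of that lifecycle, with the channel pipeline left empty for brevity (names here are illustrative, not Spark's):

```scala
import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.{ChannelFuture, ChannelInitializer, EventLoopGroup}
import io.netty.channel.oio.OioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.oio.OioServerSocketChannel

class SimpleOioServer(port: Int) {
  private var bossGroup: EventLoopGroup = _
  private var workerGroup: EventLoopGroup = _
  private var channelFuture: ChannelFuture = _

  def start(): Unit = {
    bossGroup = new OioEventLoopGroup()
    workerGroup = new OioEventLoopGroup()
    val bootstrap = new ServerBootstrap()
      .group(bossGroup, workerGroup)
      .channel(classOf[OioServerSocketChannel])
      .childHandler(new ChannelInitializer[SocketChannel] {
        override def initChannel(ch: SocketChannel): Unit = () // real handlers would go here
      })
    channelFuture = bootstrap.bind(port).sync()
  }

  def stop(): Unit = {
    if (channelFuture != null) {
      // Wait for the bound channel to actually close before tearing down the groups.
      channelFuture.channel().close().awaitUninterruptibly()
      channelFuture = null
    }
    if (bossGroup != null) { bossGroup.shutdownGracefully(); bossGroup = null }
    if (workerGroup != null) { workerGroup.shutdownGracefully(); workerGroup = null }
  }
}
```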


@ -23,7 +23,6 @@ import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
class FileServerChannelInitializer extends ChannelInitializer<SocketChannel> {
PathResolver pResolver;
@ -36,7 +35,7 @@ class FileServerChannelInitializer extends ChannelInitializer<SocketChannel> {
public void initChannel(SocketChannel channel) {
channel.pipeline()
.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()))
.addLast("strDecoder", new StringDecoder())
.addLast("stringDecoder", new StringDecoder())
.addLast("handler", new FileServerHandler(pResolver));
}
}


@ -21,22 +21,26 @@ import java.io.File;
import java.io.FileInputStream;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.DefaultFileRegion;
import org.apache.spark.storage.BlockId;
import org.apache.spark.storage.FileSegment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class FileServerHandler extends ChannelInboundMessageHandlerAdapter<String> {
class FileServerHandler extends SimpleChannelInboundHandler<String> {
PathResolver pResolver;
private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
private final PathResolver pResolver;
public FileServerHandler(PathResolver pResolver){
this.pResolver = pResolver;
}
@Override
public void messageReceived(ChannelHandlerContext ctx, String blockIdString) {
public void channelRead0(ChannelHandlerContext ctx, String blockIdString) {
BlockId blockId = BlockId.apply(blockIdString);
FileSegment fileSegment = pResolver.getBlockLocation(blockId);
// if getBlockLocation returns null, close the channel
@ -60,10 +64,10 @@ class FileServerHandler extends ChannelInboundMessageHandlerAdapter<String> {
int len = new Long(length).intValue();
ctx.write((new FileHeader(len, blockId)).buffer());
try {
ctx.sendFile(new DefaultFileRegion(new FileInputStream(file)
ctx.write(new DefaultFileRegion(new FileInputStream(file)
.getChannel(), fileSegment.offset(), fileSegment.length()));
} catch (Exception e) {
e.printStackTrace();
LOG.error("Exception: ", e);
}
} else {
ctx.write(new FileHeader(0, blockId).buffer());
@ -73,7 +77,7 @@ class FileServerHandler extends ChannelInboundMessageHandlerAdapter<String> {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
LOG.error("Exception: ", cause);
ctx.close();
}
}


@ -19,7 +19,7 @@ package org.apache.spark
import java.io._
import java.net.URI
import java.util.Properties
import java.util.{UUID, Properties}
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.{Map, Set, immutable}
@ -897,22 +897,15 @@ class SparkContext(
/**
* Set the directory under which RDDs are going to be checkpointed. The directory must
* be a HDFS path if running on a cluster. If the directory does not exist, it will
* be created. If the directory exists and useExisting is set to true, then the
* exisiting directory will be used. Otherwise an exception will be thrown to
* prevent accidental overriding of checkpoint files in the existing directory.
* be a HDFS path if running on a cluster.
*/
def setCheckpointDir(dir: String, useExisting: Boolean = false) {
val path = new Path(dir)
val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
if (!useExisting) {
if (fs.exists(path)) {
throw new Exception("Checkpoint directory '" + path + "' already exists.")
} else {
fs.mkdirs(path)
}
def setCheckpointDir(directory: String) {
checkpointDir = Option(directory).map { dir =>
val path = new Path(dir, UUID.randomUUID().toString)
val fs = path.getFileSystem(hadoopConfiguration)
fs.mkdirs(path)
fs.getFileStatus(path).getPath().toString
}
checkpointDir = Some(dir)
}
/** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
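
With the new single-argument setCheckpointDir() above, the given directory is created if needed and a random UUID subdirectory is used underneath it, so the old useExisting flag goes away. A hedged usage sketch (local master and /tmp path are placeholders):

```scala
import org.apache.spark.SparkContext

object CheckpointDirExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "checkpoint-demo")
    sc.setCheckpointDir("/tmp/spark-checkpoints")   // hypothetical local path
    val rdd = sc.parallelize(1 to 1000).map(_ * 2)
    rdd.checkpoint()                                // mark the RDD for checkpointing
    rdd.count()                                     // forces the checkpoint to be written
    println(rdd.getCheckpointFile)                  // Some(path) under /tmp/spark-checkpoints/<uuid>
    sc.stop()
  }
}
```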


@ -394,20 +394,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
/**
* Set the directory under which RDDs are going to be checkpointed. The directory must
* be a HDFS path if running on a cluster. If the directory does not exist, it will
* be created. If the directory exists and useExisting is set to true, then the
* exisiting directory will be used. Otherwise an exception will be thrown to
* prevent accidental overriding of checkpoint files in the existing directory.
*/
def setCheckpointDir(dir: String, useExisting: Boolean) {
sc.setCheckpointDir(dir, useExisting)
}
/**
* Set the directory under which RDDs are going to be checkpointed. The directory must
* be a HDFS path if running on a cluster. If the directory does not exist, it will
* be created. If the directory exists, an exception will be thrown to prevent accidental
* overriding of checkpoint files.
* be a HDFS path if running on a cluster.
*/
def setCheckpointDir(dir: String) {
sc.setCheckpointDir(dir)


@ -18,12 +18,12 @@
package org.apache.spark.rdd
import java.io.IOException
import scala.reflect.ClassTag
import org.apache.hadoop.fs.Path
import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {}
@ -34,6 +34,8 @@ private[spark]
class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
extends RDD[T](sc, Nil) {
val broadcastedConf = sc.broadcast(new SerializableWritable(sc.hadoopConfiguration))
@transient val fs = new Path(checkpointPath).getFileSystem(sc.hadoopConfiguration)
override def getPartitions: Array[Partition] = {
@ -65,7 +67,7 @@ class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
override def compute(split: Partition, context: TaskContext): Iterator[T] = {
val file = new Path(checkpointPath, CheckpointRDD.splitIdToFile(split.index))
CheckpointRDD.readFromFile(file, context)
CheckpointRDD.readFromFile(file, broadcastedConf, context)
}
override def checkpoint() {
@ -78,10 +80,14 @@ private[spark] object CheckpointRDD extends Logging {
"part-%05d".format(splitId)
}
def writeToFile[T](path: String, blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) {
def writeToFile[T](
path: String,
broadcastedConf: Broadcast[SerializableWritable[Configuration]],
blockSize: Int = -1
)(ctx: TaskContext, iterator: Iterator[T]) {
val env = SparkEnv.get
val outputDir = new Path(path)
val fs = outputDir.getFileSystem(SparkHadoopUtil.get.newConfiguration())
val fs = outputDir.getFileSystem(broadcastedConf.value.value)
val finalOutputName = splitIdToFile(ctx.partitionId)
val finalOutputPath = new Path(outputDir, finalOutputName)
@ -118,9 +124,13 @@ private[spark] object CheckpointRDD extends Logging {
}
}
def readFromFile[T](path: Path, context: TaskContext): Iterator[T] = {
def readFromFile[T](
path: Path,
broadcastedConf: Broadcast[SerializableWritable[Configuration]],
context: TaskContext
): Iterator[T] = {
val env = SparkEnv.get
val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
val fs = path.getFileSystem(broadcastedConf.value.value)
val bufferSize = env.conf.getOrElse("spark.buffer.size", "65536").toInt
val fileInputStream = fs.open(path, bufferSize)
val serializer = env.serializer.newInstance()
@ -143,8 +153,10 @@ private[spark] object CheckpointRDD extends Logging {
val sc = new SparkContext(cluster, "CheckpointRDD Test")
val rdd = sc.makeRDD(1 to 10, 10).flatMap(x => 1 to 10000)
val path = new Path(hdfsPath, "temp")
val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 1024) _)
val conf = SparkHadoopUtil.get.newConfiguration()
val fs = path.getFileSystem(conf)
val broadcastedConf = sc.broadcast(new SerializableWritable(conf))
sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, broadcastedConf, 1024) _)
val cpRDD = new CheckpointRDD[Int](sc, path.toString)
assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same")
assert(cpRDD.collect.toList == rdd.collect.toList, "Data of partitions not the same")
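
The CheckpointRDD changes above stop rebuilding a Hadoop Configuration per task and instead broadcast the driver's configuration wrapped in SerializableWritable (Configuration is Writable but not Serializable). The sketch below shows the same wrap-broadcast-unwrap pattern; it reuses Spark's internal SerializableWritable helper purely for illustration.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SerializableWritable, SparkContext}

object BroadcastConfExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "broadcast-conf-demo")
    // Wrap the non-serializable Configuration once on the driver and broadcast it.
    val broadcastedConf = sc.broadcast(new SerializableWritable(new Configuration()))
    val sizes = sc.parallelize(Seq("a", "b"), 2).map { _ =>
      // Inside a task: unwrap the broadcast to get a per-JVM Configuration copy.
      val conf: Configuration = broadcastedConf.value.value
      conf.size() // placeholder use; real code would open a FileSystem from it
    }.collect()
    println(sizes.toSeq)
    sc.stop()
  }
}
```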


@ -22,7 +22,7 @@ import scala.reflect.ClassTag
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{Partition, SparkException, Logging}
import org.apache.spark.{SerializableWritable, Partition, SparkException, Logging}
import org.apache.spark.scheduler.{ResultTask, ShuffleMapTask}
/**
@ -85,14 +85,21 @@ private[spark] class RDDCheckpointData[T: ClassTag](rdd: RDD[T])
// Create the output path for the checkpoint
val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id)
val fs = path.getFileSystem(new Configuration())
val fs = path.getFileSystem(rdd.context.hadoopConfiguration)
if (!fs.mkdirs(path)) {
throw new SparkException("Failed to create checkpoint path " + path)
}
// Save to file, and reload it as an RDD
rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString) _)
val broadcastedConf = rdd.context.broadcast(
new SerializableWritable(rdd.context.hadoopConfiguration))
rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString, broadcastedConf) _)
val newRDD = new CheckpointRDD[T](rdd.context, path.toString)
if (newRDD.partitions.size != rdd.partitions.size) {
throw new SparkException(
"Checkpoint RDD " + newRDD + "("+ newRDD.partitions.size + ") has different " +
"number of partitions than original RDD " + rdd + "(" + rdd.partitions.size + ")")
}
// Change the dependencies and partitions of the RDD
RDDCheckpointData.synchronized {
@ -101,8 +108,8 @@ private[spark] class RDDCheckpointData[T: ClassTag](rdd: RDD[T])
rdd.markCheckpointed(newRDD) // Update the RDD's dependencies and partitions
cpState = Checkpointed
RDDCheckpointData.clearTaskCaches()
logInfo("Done checkpointing RDD " + rdd.id + ", new parent is RDD " + newRDD.id)
}
logInfo("Done checkpointing RDD " + rdd.id + " to " + path + ", new parent is RDD " + newRDD.id)
}
// Get preferred location of a split after checkpointing


@ -828,7 +828,7 @@ class DAGScheduler(
}
logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
stageToInfos(stage).completionTime = Some(System.currentTimeMillis())
listenerBus.post(StageCompleted(stageToInfos(stage)))
listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
running -= stage
}
event.reason match {


@ -297,7 +297,7 @@ class JobLogger(val user: String, val logDirName: String)
* When stage is completed, record stage completion status
* @param stageCompleted Stage completed event
*/
override def onStageCompleted(stageCompleted: StageCompleted) {
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
stageLogInfo(stageCompleted.stage.stageId, "STAGE_ID=%d STATUS=COMPLETED".format(
stageCompleted.stage.stageId))
}


@ -27,7 +27,7 @@ sealed trait SparkListenerEvents
case class SparkListenerStageSubmitted(stage: StageInfo, properties: Properties)
extends SparkListenerEvents
case class StageCompleted(val stage: StageInfo) extends SparkListenerEvents
case class SparkListenerStageCompleted(val stage: StageInfo) extends SparkListenerEvents
case class SparkListenerTaskStart(task: Task[_], taskInfo: TaskInfo) extends SparkListenerEvents
@ -47,7 +47,7 @@ trait SparkListener {
/**
* Called when a stage is completed, with information on the completed stage
*/
def onStageCompleted(stageCompleted: StageCompleted) { }
def onStageCompleted(stageCompleted: SparkListenerStageCompleted) { }
/**
* Called when a stage is submitted
@ -86,7 +86,7 @@ trait SparkListener {
* Simple SparkListener that logs a few summary statistics when each stage completes
*/
class StatsReportListener extends SparkListener with Logging {
override def onStageCompleted(stageCompleted: StageCompleted) {
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
import org.apache.spark.scheduler.StatsReportListener._
implicit val sc = stageCompleted
this.logInfo("Finished stage: " + stageCompleted.stage)
@ -119,13 +119,17 @@ object StatsReportListener extends Logging {
val probabilities = percentiles.map{_ / 100.0}
val percentilesHeader = "\t" + percentiles.mkString("%\t") + "%"
def extractDoubleDistribution(stage:StageCompleted, getMetric: (TaskInfo,TaskMetrics) => Option[Double]): Option[Distribution] = {
def extractDoubleDistribution(stage: SparkListenerStageCompleted,
getMetric: (TaskInfo,TaskMetrics) => Option[Double])
: Option[Distribution] = {
Distribution(stage.stage.taskInfos.flatMap {
case ((info,metric)) => getMetric(info, metric)})
}
//is there some way to setup the types that I can get rid of this completely?
def extractLongDistribution(stage:StageCompleted, getMetric: (TaskInfo,TaskMetrics) => Option[Long]): Option[Distribution] = {
def extractLongDistribution(stage: SparkListenerStageCompleted,
getMetric: (TaskInfo,TaskMetrics) => Option[Long])
: Option[Distribution] = {
extractDoubleDistribution(stage, (info, metric) => getMetric(info,metric).map{_.toDouble})
}
@ -147,12 +151,12 @@ object StatsReportListener extends Logging {
}
def showDistribution(heading:String, format: String, getMetric: (TaskInfo,TaskMetrics) => Option[Double])
(implicit stage: StageCompleted) {
(implicit stage: SparkListenerStageCompleted) {
showDistribution(heading, extractDoubleDistribution(stage, getMetric), format)
}
def showBytesDistribution(heading:String, getMetric: (TaskInfo,TaskMetrics) => Option[Long])
(implicit stage: StageCompleted) {
(implicit stage: SparkListenerStageCompleted) {
showBytesDistribution(heading, extractLongDistribution(stage, getMetric))
}
@ -169,7 +173,7 @@ object StatsReportListener extends Logging {
}
def showMillisDistribution(heading: String, getMetric: (TaskInfo, TaskMetrics) => Option[Long])
(implicit stage: StageCompleted) {
(implicit stage: SparkListenerStageCompleted) {
showMillisDistribution(heading, extractLongDistribution(stage, getMetric))
}
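
The event rename above means any custom SparkListener must now match on SparkListenerStageCompleted rather than StageCompleted; the callback name onStageCompleted is unchanged. A minimal listener against the new name (the println body is illustrative only):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

class StageTimingListener extends SparkListener {
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    val info = stageCompleted.stage
    println("Stage " + info.stageId + " finished")
  }
}

// Registration is unchanged: sc.addSparkListener(new StageTimingListener)
```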


@ -41,7 +41,7 @@ private[spark] class SparkListenerBus() extends Logging {
event match {
case stageSubmitted: SparkListenerStageSubmitted =>
sparkListeners.foreach(_.onStageSubmitted(stageSubmitted))
case stageCompleted: StageCompleted =>
case stageCompleted: SparkListenerStageCompleted =>
sparkListeners.foreach(_.onStageCompleted(stageCompleted))
case jobStart: SparkListenerJobStart =>
sparkListeners.foreach(_.onJobStart(jobStart))


@ -114,10 +114,6 @@ private[spark] class TaskSetManager(
// Task index, start and finish time for each task attempt (indexed by task ID)
val taskInfos = new HashMap[Long, TaskInfo]
// Did the TaskSet fail?
var failed = false
var causeOfFailure = ""
// How frequently to reprint duplicate exceptions in full, in milliseconds
val EXCEPTION_PRINT_INTERVAL =
conf.getOrElse("spark.logging.exceptionPrintInterval", "10000").toLong
@ -558,8 +554,6 @@ private[spark] class TaskSetManager(
}
def abort(message: String) {
failed = true
causeOfFailure = message
// TODO: Kill running tasks if we were not terminated due to a Mesos error
sched.dagScheduler.taskSetFailed(taskSet, message)
removeAllRunningTasks()


@ -61,7 +61,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
override def onJobStart(jobStart: SparkListenerJobStart) {}
override def onStageCompleted(stageCompleted: StageCompleted) = synchronized {
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized {
val stage = stageCompleted.stage
poolToActiveStages(stageIdToPool(stage.stageId)) -= stage
activeStages -= stage


@ -851,7 +851,7 @@ public class JavaAPISuite implements Serializable {
public void checkpointAndComputation() {
File tempDir = Files.createTempDir();
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
sc.setCheckpointDir(tempDir.getAbsolutePath(), true);
sc.setCheckpointDir(tempDir.getAbsolutePath());
Assert.assertEquals(false, rdd.isCheckpointed());
rdd.checkpoint();
rdd.count(); // Forces the DAG to cause a checkpoint
@ -863,7 +863,7 @@ public class JavaAPISuite implements Serializable {
public void checkpointAndRestore() {
File tempDir = Files.createTempDir();
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
sc.setCheckpointDir(tempDir.getAbsolutePath(), true);
sc.setCheckpointDir(tempDir.getAbsolutePath());
Assert.assertEquals(false, rdd.isCheckpointed());
rdd.checkpoint();
rdd.count(); // Forces the DAG to cause a checkpoint


@ -117,7 +117,7 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = onTaskEndCount += 1
override def onJobEnd(jobEnd: SparkListenerJobEnd) = onJobEndCount += 1
override def onJobStart(jobStart: SparkListenerJobStart) = onJobStartCount += 1
override def onStageCompleted(stageCompleted: StageCompleted) = onStageCompletedCount += 1
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = onStageCompletedCount += 1
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = onStageSubmittedCount += 1
}
sc.addSparkListener(joblogger)


@ -174,7 +174,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
class SaveStageInfo extends SparkListener {
val stageInfos = Buffer[StageInfo]()
override def onStageCompleted(stage: StageCompleted) {
override def onStageCompleted(stage: SparkListenerStageCompleted) {
stageInfos += stage.stage
}
}


@ -282,7 +282,7 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.0.CR1</version>
<version>4.0.13.Final</version>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>


@ -178,7 +178,7 @@ object SparkBuild extends Build {
libraryDependencies ++= Seq(
"io.netty" % "netty-all" % "4.0.0.CR1",
"io.netty" % "netty-all" % "4.0.13.Final",
"org.eclipse.jetty" % "jetty-server" % "7.6.8.v20121106",
/** Workaround for SPARK-959. Dependency used by org.eclipse.jetty. Fixed in ivy 2.3.0. */
"org.eclipse.jetty.orbit" % "javax.servlet" % "2.5.0.v201103041518" artifacts Artifact("javax.servlet", "jar", "jar"),


@ -345,17 +345,12 @@ class SparkContext(object):
self._python_includes.append(filename)
sys.path.append(os.path.join(SparkFiles.getRootDirectory(), filename)) # for tests in local mode
def setCheckpointDir(self, dirName, useExisting=False):
def setCheckpointDir(self, dirName):
"""
Set the directory under which RDDs are going to be checkpointed. The
directory must be a HDFS path if running on a cluster.
If the directory does not exist, it will be created. If the directory
exists and C{useExisting} is set to true, then the exisiting directory
will be used. Otherwise an exception will be thrown to prevent
accidental overriding of checkpoint files in the existing directory.
"""
self._jsc.sc().setCheckpointDir(dirName, useExisting)
self._jsc.sc().setCheckpointDir(dirName)
def _getJavaStorageLevel(self, storageLevel):
"""


@ -73,8 +73,8 @@ class TestCheckpoint(PySparkTestCase):
time.sleep(1) # 1 second
self.assertTrue(flatMappedRDD.isCheckpointed())
self.assertEqual(flatMappedRDD.collect(), result)
self.assertEqual(self.checkpointDir.name,
os.path.dirname(flatMappedRDD.getCheckpointFile()))
self.assertEqual("file:" + self.checkpointDir.name,
os.path.dirname(os.path.dirname(flatMappedRDD.getCheckpointFile())))
def test_checkpoint_and_restore(self):
parCollection = self.sc.parallelize([1, 2, 3, 4])


@ -21,12 +21,13 @@ import java.io._
import java.util.concurrent.Executors
import java.util.concurrent.RejectedExecutionException
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.util.MetadataCleaner
import org.apache.spark.deploy.SparkHadoopUtil
private[streaming]
@ -54,36 +55,36 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
/**
* Convenience class to speed up the writing of graph checkpoint to file
* Convenience class to handle the writing of graph checkpoint to file
*/
private[streaming]
class CheckpointWriter(conf: SparkConf, checkpointDir: String) extends Logging {
class CheckpointWriter(conf: SparkConf, checkpointDir: String, hadoopConf: Configuration)
extends Logging
{
val file = new Path(checkpointDir, "graph")
// The file to which we actually write - and then "move" to file.
private val writeFile = new Path(file.getParent, file.getName + ".next")
private val bakFile = new Path(file.getParent, file.getName + ".bk")
val MAX_ATTEMPTS = 3
val executor = Executors.newFixedThreadPool(1)
val compressionCodec = CompressionCodec.createCodec()
// The file to which we actually write - and then "move" to file
val writeFile = new Path(file.getParent, file.getName + ".next")
// The file to which existing checkpoint is backed up (i.e. "moved")
val bakFile = new Path(file.getParent, file.getName + ".bk")
private var stopped = false
val hadoopConf = new Configuration()
var fs = file.getFileSystem(hadoopConf)
val maxAttempts = 3
val executor = Executors.newFixedThreadPool(1)
private val compressionCodec = CompressionCodec.createCodec(conf)
private var fs_ : FileSystem = _
// Removed code which validates whether there is only one CheckpointWriter per path 'file' since
// I did not notice any errors - reintroduce it ?
class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable {
def run() {
var attempts = 0
val startTime = System.currentTimeMillis()
while (attempts < maxAttempts) {
while (attempts < MAX_ATTEMPTS && !stopped) {
attempts += 1
try {
logDebug("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'")
// This is inherently thread unsafe .. so alleviating it by writing to '.new' and then doing moves : which should be pretty fast.
// This is inherently thread unsafe, so alleviating it by writing to '.new' and
// then moving it to the final file
val fos = fs.create(writeFile)
fos.write(bytes)
fos.close()
@ -101,6 +102,7 @@ class CheckpointWriter(conf: SparkConf, checkpointDir: String) extends Logging {
} catch {
case ioe: IOException =>
logWarning("Error writing checkpoint to file in " + attempts + " attempts", ioe)
reset()
}
}
logError("Could not write checkpoint for time " + checkpointTime + " to file '" + file + "'")
@ -133,7 +135,17 @@ class CheckpointWriter(conf: SparkConf, checkpointDir: String) extends Logging {
val startTime = System.currentTimeMillis()
val terminated = executor.awaitTermination(10, java.util.concurrent.TimeUnit.SECONDS)
val endTime = System.currentTimeMillis()
logInfo("CheckpointWriter executor terminated ? " + terminated + ", waited for " + (endTime - startTime) + " ms.")
logInfo("CheckpointWriter executor terminated ? " + terminated +
", waited for " + (endTime - startTime) + " ms.")
}
private def fs = synchronized {
if (fs_ == null) fs_ = file.getFileSystem(hadoopConf)
fs_
}
private def reset() = synchronized {
fs_ = null
}
}
@ -143,8 +155,8 @@ object CheckpointReader extends Logging {
def read(conf: SparkConf, path: String): Checkpoint = {
val fs = new Path(path).getFileSystem(new Configuration())
val attempts = Seq(
new Path(path, "graph"), new Path(path, "graph.bk"), new Path(path), new Path(path + ".bk"))
val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"),
new Path(path), new Path(path + ".bk"))
val compressionCodec = CompressionCodec.createCodec(conf)
@ -159,7 +171,8 @@ object CheckpointReader extends Logging {
// loader to find and load classes. This is a well know Java issue and has popped up
// in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627)
val zis = compressionCodec.compressedInputStream(fis)
val ois = new ObjectInputStreamWithLoader(zis, Thread.currentThread().getContextClassLoader)
val ois = new ObjectInputStreamWithLoader(zis,
Thread.currentThread().getContextClassLoader)
val cp = ois.readObject.asInstanceOf[Checkpoint]
ois.close()
fs.close()
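
CheckpointWriter above now receives the Hadoop configuration explicitly, caches its FileSystem handle lazily, and resets the handle after an IOException so the next write attempt re-resolves it. A self-contained sketch of that cache-and-reset pattern (class and method names here are illustrative, not Spark's):

```scala
import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

class LazyFsWriter(file: Path, hadoopConf: Configuration) {
  private var fs_ : FileSystem = _

  // Resolve the FileSystem lazily and cache it across attempts.
  private def fs: FileSystem = synchronized {
    if (fs_ == null) fs_ = file.getFileSystem(hadoopConf)
    fs_
  }

  // Drop the cached handle so the next attempt re-resolves it.
  private def reset(): Unit = synchronized { fs_ = null }

  def write(bytes: Array[Byte], maxAttempts: Int = 3): Boolean = {
    var attempts = 0
    while (attempts < maxAttempts) {
      attempts += 1
      try {
        val out = fs.create(file, true) // overwrite any partial previous write
        out.write(bytes)
        out.close()
        return true
      } catch {
        case ioe: IOException => reset() // discard the possibly broken handle and retry
      }
    }
    false
  }
}
```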


@ -20,17 +20,19 @@ package org.apache.spark.streaming
import akka.actor.Props
import akka.actor.SupervisorStrategy
import akka.zeromq.Subscribe
import akka.util.ByteString
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.scheduler.StreamingListener
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.receivers.ActorReceiver
import org.apache.spark.streaming.receivers.ReceiverSupervisorStrategy
import org.apache.spark.streaming.receivers.ZeroMQReceiver
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.MetadataCleaner
import org.apache.spark.streaming.receivers.ActorReceiver
import org.apache.spark.streaming.scheduler.{JobScheduler, NetworkInputTracker}
import scala.collection.mutable.Queue
import scala.collection.Map
@ -38,17 +40,15 @@ import scala.reflect.ClassTag
import java.io.InputStream
import java.util.concurrent.atomic.AtomicInteger
import java.util.UUID
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.fs.Path
import twitter4j.Status
import twitter4j.auth.Authorization
import org.apache.spark.streaming.scheduler._
import akka.util.ByteString
/**
* A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@ -153,7 +153,7 @@ class StreamingContext private (
protected[streaming] var checkpointDir: String = {
if (isCheckpointPresent) {
sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(cp_.checkpointDir), true)
sc.setCheckpointDir(cp_.checkpointDir)
cp_.checkpointDir
} else {
null
@ -188,8 +188,12 @@ class StreamingContext private (
*/
def checkpoint(directory: String) {
if (directory != null) {
sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(directory))
checkpointDir = directory
val path = new Path(directory)
val fs = path.getFileSystem(sparkContext.hadoopConfiguration)
fs.mkdirs(path)
val fullPath = fs.getFileStatus(path).getPath().toString
sc.setCheckpointDir(fullPath)
checkpointDir = fullPath
} else {
checkpointDir = null
}
@ -380,7 +384,8 @@ class StreamingContext private (
/**
* Create a input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them using the given key-value types and input format.
* File names starting with . are ignored.
* Files must be written to the monitored directory by "moving" them from another
* location within the same file system. File names starting with . are ignored.
* @param directory HDFS directory to monitor for new file
* @tparam K Key type for reading HDFS file
* @tparam V Value type for reading HDFS file
@ -399,6 +404,8 @@ class StreamingContext private (
/**
* Create a input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them using the given key-value types and input format.
* Files must be written to the monitored directory by "moving" them from another
* location within the same file system.
* @param directory HDFS directory to monitor for new file
* @param filter Function to filter paths to process
* @param newFilesOnly Should process only new files and ignore existing files in the directory
@ -419,7 +426,9 @@ class StreamingContext private (
/**
* Create a input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them as text files (using key as LongWritable, value
* as Text and input format as TextInputFormat). File names starting with . are ignored.
* as Text and input format as TextInputFormat). Files must be written to the
* monitored directory by "moving" them from another location within the same
* file system. File names starting with . are ignored.
* @param directory HDFS directory to monitor for new file
*/
def textFileStream(directory: String): DStream[String] = {
@ -623,8 +632,4 @@ object StreamingContext {
prefix + "-" + time.milliseconds + "." + suffix
}
}
protected[streaming] def getSparkCheckpointDir(sscCheckpointDir: String): String = {
new Path(sscCheckpointDir, UUID.randomUUID.toString).toString
}
}
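
The StreamingContext documentation above spells out the file-stream contract: files must be written to the monitored directory by "moving" them from elsewhere on the same file system, and checkpoint() now resolves the directory to a fully qualified path itself. A usage sketch with placeholder paths:

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FileStreamExample {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext("local[2]", "file-stream-demo", Seconds(10))
    ssc.checkpoint("/tmp/streaming-checkpoints")    // resolved to a fully qualified path
    // Only files moved atomically into /tmp/incoming after startup are picked up.
    val lines = ssc.textFileStream("/tmp/incoming")
    lines.count().print()
    ssc.start()                                     // streaming threads keep the application alive
  }
}
```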


@ -265,9 +265,11 @@ class JavaStreamingContext(val ssc: StreamingContext) {
}
/**
* Creates a input stream that monitors a Hadoop-compatible filesystem
* Create a input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them as text files (using key as LongWritable, value
* as Text and input format as TextInputFormat). File names starting with . are ignored.
* as Text and input format as TextInputFormat). Files must be written to the
* monitored directory by "moving" them from another location within the same
* file system. File names starting with . are ignored.
* @param directory HDFS directory to monitor for new file
*/
def textFileStream(directory: String): JavaDStream[String] = {
@ -309,9 +311,10 @@ class JavaStreamingContext(val ssc: StreamingContext) {
}
/**
* Creates a input stream that monitors a Hadoop-compatible filesystem
* Create a input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them using the given key-value types and input format.
* File names starting with . are ignored.
* Files must be written to the monitored directory by "moving" them from another
* location within the same file system. File names starting with . are ignored.
* @param directory HDFS directory to monitor for new file
* @tparam K Key type for reading HDFS file
* @tparam V Value type for reading HDFS file
@ -340,7 +343,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
/**
* Creates a input stream from a Flume source.
* Create a input stream from a Flume source.
* @param hostname Hostname of the slave machine to which the flume data will be sent
* @param port Port of the slave machine to which the flume data will be sent
*/


@ -17,18 +17,17 @@
package org.apache.spark.streaming.dstream
import java.io.{ObjectInputStream, IOException}
import scala.collection.mutable.{HashSet, HashMap}
import scala.reflect.ClassTag
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.UnionRDD
import org.apache.spark.streaming.{DStreamCheckpointData, StreamingContext, Time}
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import scala.collection.mutable.{HashSet, HashMap}
import scala.reflect.ClassTag
import java.io.{ObjectInputStream, IOException}
private[streaming]
class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag](
@ -41,8 +40,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
// Latest file mod time seen till any point of time
private val lastModTimeFiles = new HashSet[String]()
private var lastModTime = 0L
private val prevModTimeFiles = new HashSet[String]()
private var prevModTime = 0L
@transient private var path_ : Path = null
@transient private var fs_ : FileSystem = null
@ -50,11 +49,11 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
override def start() {
if (newFilesOnly) {
lastModTime = graph.zeroTime.milliseconds
prevModTime = graph.zeroTime.milliseconds
} else {
lastModTime = 0
prevModTime = 0
}
logDebug("LastModTime initialized to " + lastModTime + ", new files only = " + newFilesOnly)
logDebug("LastModTime initialized to " + prevModTime + ", new files only = " + newFilesOnly)
}
override def stop() { }
@ -69,55 +68,22 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
* the previous call.
*/
override def compute(validTime: Time): Option[RDD[(K, V)]] = {
assert(validTime.milliseconds >= lastModTime, "Trying to get new files for really old time [" + validTime + " < " + lastModTime)
assert(validTime.milliseconds >= prevModTime,
"Trying to get new files for really old time [" + validTime + " < " + prevModTime + "]")
// Create the filter for selecting new files
val newFilter = new PathFilter() {
// Latest file mod time seen in this round of fetching files and its corresponding files
var latestModTime = 0L
val latestModTimeFiles = new HashSet[String]()
def accept(path: Path): Boolean = {
if (!filter(path)) { // Reject file if it does not satisfy filter
logDebug("Rejected by filter " + path)
return false
} else { // Accept file only if
val modTime = fs.getFileStatus(path).getModificationTime()
logDebug("Mod time for " + path + " is " + modTime)
if (modTime < lastModTime) {
logDebug("Mod time less than last mod time")
return false // If the file was created before the last time it was called
} else if (modTime == lastModTime && lastModTimeFiles.contains(path.toString)) {
logDebug("Mod time equal to last mod time, but file considered already")
return false // If the file was created exactly as lastModTime but not reported yet
} else if (modTime > validTime.milliseconds) {
logDebug("Mod time more than valid time")
return false // If the file was created after the time this function call requires
}
if (modTime > latestModTime) {
latestModTime = modTime
latestModTimeFiles.clear()
logDebug("Latest mod time updated to " + latestModTime)
}
latestModTimeFiles += path.toString
logDebug("Accepted " + path)
return true
}
}
}
logDebug("Finding new files at time " + validTime + " for last mod time = " + lastModTime)
val newFiles = fs.listStatus(path, newFilter).map(_.getPath.toString)
// Find new files
val (newFiles, latestModTime, latestModTimeFiles) = findNewFiles(validTime.milliseconds)
logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
if (newFiles.length > 0) {
// Update the modification time and the files processed for that modification time
if (lastModTime != newFilter.latestModTime) {
lastModTime = newFilter.latestModTime
lastModTimeFiles.clear()
if (prevModTime < latestModTime) {
prevModTime = latestModTime
prevModTimeFiles.clear()
}
lastModTimeFiles ++= newFilter.latestModTimeFiles
logDebug("Last mod time updated to " + lastModTime)
prevModTimeFiles ++= latestModTimeFiles
logDebug("Last mod time updated to " + prevModTime)
}
files += ((validTime, newFiles))
files += ((validTime, newFiles.toArray))
Some(filesToRDD(newFiles))
}
@ -132,12 +98,28 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n"))
}
/**
* Find files which have modification timestamp <= current time and return a 3-tuple of
* (new files found, latest modification time among them, files with latest modification time)
*/
private def findNewFiles(currentTime: Long): (Seq[String], Long, Seq[String]) = {
logDebug("Trying to get new files for time " + currentTime)
val filter = new CustomPathFilter(currentTime)
val newFiles = fs.listStatus(path, filter).map(_.getPath.toString)
(newFiles, filter.latestModTime, filter.latestModTimeFiles.toSeq)
}
/** Generate one RDD from an array of files */
protected[streaming] def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
new UnionRDD(
context.sparkContext,
files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file))
)
private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
val fileRDDs = files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file))
files.zip(fileRDDs).foreach { case (file, rdd) => {
if (rdd.partitions.size == 0) {
logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
"files that have been \"moved\" to the directory assigned to the file stream. " +
"Refer to the streaming programming guide for more details.")
}
}}
new UnionRDD(context.sparkContext, fileRDDs)
}
private def path: Path = {
@ -150,6 +132,10 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
fs_
}
private def reset() {
fs_ = null
}
@throws(classOf[IOException])
private def readObject(ois: ObjectInputStream) {
logDebug(this.getClass().getSimpleName + ".readObject used")
@ -191,6 +177,51 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
hadoopFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n") + "\n]"
}
}
/**
* Custom PathFilter class to find new files that have modification timestamps <= current time,
* but have not been seen before (i.e. the file should not be in lastModTimeFiles)
*/
private[streaming]
class CustomPathFilter(maxModTime: Long) extends PathFilter {
// Latest file mod time seen in this round of fetching files and its corresponding files
var latestModTime = 0L
val latestModTimeFiles = new HashSet[String]()
def accept(path: Path): Boolean = {
try {
if (!filter(path)) { // Reject file if it does not satisfy filter
logDebug("Rejected by filter " + path)
return false
}
val modTime = fs.getFileStatus(path).getModificationTime()
logDebug("Mod time for " + path + " is " + modTime)
if (modTime < prevModTime) {
logDebug("Mod time less than last mod time")
return false // If the file was created before the last time it was called
} else if (modTime == prevModTime && prevModTimeFiles.contains(path.toString)) {
logDebug("Mod time equal to last mod time, but file considered already")
return false // If the file was created exactly as lastModTime but not reported yet
} else if (modTime > maxModTime) {
logDebug("Mod time more than ")
return false // If the file is too new that considering it may give errors
}
if (modTime > latestModTime) {
latestModTime = modTime
latestModTimeFiles.clear()
logDebug("Latest mod time updated to " + latestModTime)
}
latestModTimeFiles += path.toString
logDebug("Accepted " + path)
} catch {
case fnfe: java.io.FileNotFoundException =>
logWarning("Error finding new files", fnfe)
reset()
return false
}
return true
}
}
}
private[streaming]

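The FileInputDStream refactor above extracts the new-file search into findNewFiles() with a CustomPathFilter keyed on modification times: a file is accepted if it is newer than the previous batch's mod time (or equal but not yet seen) and not newer than the current batch time. A standalone approximation of that selection logic (a sketch, not the actual DStream code):

```scala
import scala.collection.mutable.HashSet

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}

class NewFileFinder(dir: Path, conf: Configuration) {
  private val fs: FileSystem = dir.getFileSystem(conf)
  private var prevModTime = 0L
  private val prevModTimeFiles = new HashSet[String]()

  def findNewFiles(currentTime: Long): Seq[String] = {
    var latestModTime = 0L
    val latestModTimeFiles = new HashSet[String]()
    val filter = new PathFilter {
      def accept(path: Path): Boolean = {
        val modTime = fs.getFileStatus(path).getModificationTime()
        if (modTime < prevModTime) return false                   // already handled in an earlier batch
        if (modTime == prevModTime && prevModTimeFiles.contains(path.toString)) return false
        if (modTime > currentTime) return false                   // too new for this batch
        if (modTime > latestModTime) { latestModTime = modTime; latestModTimeFiles.clear() }
        latestModTimeFiles += path.toString
        true
      }
    }
    val newFiles = fs.listStatus(dir, filter).map(_.getPath.toString).toSeq
    if (newFiles.nonEmpty) {
      // Remember which files were seen at the newest mod time so they are not re-read.
      if (prevModTime < latestModTime) {
        prevModTime = latestModTime
        prevModTimeFiles.clear()
      }
      prevModTimeFiles ++= latestModTimeFiles
    }
    newFiles
  }
}
```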

@ -17,11 +17,18 @@
package org.apache.spark.streaming.scheduler
import akka.actor.{Props, Actor}
import org.apache.spark.SparkEnv
import org.apache.spark.Logging
import org.apache.spark.streaming.{Checkpoint, Time, CheckpointWriter}
import org.apache.spark.streaming.util.{ManualClock, RecurringTimer, Clock}
/** Event classes for JobGenerator */
private[scheduler] sealed trait JobGeneratorEvent
private[scheduler] case class GenerateJobs(time: Time) extends JobGeneratorEvent
private[scheduler] case class ClearOldMetadata(time: Time) extends JobGeneratorEvent
private[scheduler] case class DoCheckpoint(time: Time) extends JobGeneratorEvent
/**
* This class generates jobs from DStreams as well as drives checkpointing and cleaning
* up DStream metadata.
@ -32,43 +39,67 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
initLogging()
val ssc = jobScheduler.ssc
val clockClass = ssc.sc.conf.getOrElse(
"spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
longTime => generateJobs(new Time(longTime)))
val graph = ssc.graph
val eventProcessorActor = ssc.env.actorSystem.actorOf(Props(new Actor {
def receive = {
case event: JobGeneratorEvent =>
logDebug("Got event of type " + event.getClass.getName)
processEvent(event)
}
}))
val clock = {
val clockClass = ssc.sc.conf.getOrElse(
"spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
}
val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
longTime => eventProcessorActor ! GenerateJobs(new Time(longTime)))
lazy val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) {
new CheckpointWriter(ssc.conf, ssc.checkpointDir)
new CheckpointWriter(ssc.conf, ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration)
} else {
null
}
var latestTime: Time = null
def start() = synchronized {
if (ssc.isCheckpointPresent) {
restart()
} else {
startFirstTime()
}
logInfo("JobGenerator started")
}
def stop() = synchronized {
def stop() {
timer.stop()
if (checkpointWriter != null) checkpointWriter.stop()
ssc.graph.stop()
logInfo("JobGenerator stopped")
}
/**
* On batch completion, clear old metadata and checkpoint computation.
*/
private[scheduler] def onBatchCompletion(time: Time) {
eventProcessorActor ! ClearOldMetadata(time)
}
/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
event match {
case GenerateJobs(time) => generateJobs(time)
case ClearOldMetadata(time) => clearOldMetadata(time)
case DoCheckpoint(time) => doCheckpoint(time)
}
}
/** Starts the generator for the first time */
private def startFirstTime() {
val startTime = new Time(timer.getStartTime())
graph.start(startTime - graph.batchDuration)
timer.start(startTime.milliseconds)
logInfo("JobGenerator's timer started at " + startTime)
logInfo("JobGenerator started at " + startTime)
}
/** Restarts the generator based on the information in checkpoint */
private def restart() {
// If manual clock is being used for testing, then
// either set the manual clock to the last checkpointed time,
@ -100,7 +131,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
// Restart the timer
timer.start(restartTime.milliseconds)
logInfo("JobGenerator's timer restarted at " + restartTime)
logInfo("JobGenerator restarted at " + restartTime)
}
/** Generate jobs and perform checkpoint for the given `time`. */
@ -108,16 +139,13 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
SparkEnv.set(ssc.env)
logInfo("\n-----------------------------------------------------\n")
jobScheduler.runJobs(time, graph.generateJobs(time))
latestTime = time
doCheckpoint(time)
eventProcessorActor ! DoCheckpoint(time)
}
/**
* On batch completion, clear old metadata and checkpoint computation.
*/
private[streaming] def onBatchCompletion(time: Time) {
/** Clear DStream metadata for the given `time`. */
private def clearOldMetadata(time: Time) {
ssc.graph.clearOldMetadata(time)
doCheckpoint(time)
eventProcessorActor ! DoCheckpoint(time)
}
/** Perform checkpoint for the give `time`. */
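
JobGenerator above becomes event-driven: the recurring timer and the batch-completion hook no longer do work directly but post GenerateJobs, ClearOldMetadata, and DoCheckpoint messages to a single actor, which serializes job generation, metadata cleanup, and checkpointing. The sketch below mimics that structure with a toy actor; the Akka setup and the println bodies are illustrative only.

```scala
import akka.actor.{Actor, ActorSystem, Props}

sealed trait GeneratorEvent
case class GenerateJobs(timeMs: Long) extends GeneratorEvent
case class ClearOldMetadata(timeMs: Long) extends GeneratorEvent
case class DoCheckpoint(timeMs: Long) extends GeneratorEvent

class GeneratorActor extends Actor {
  def receive = {
    case GenerateJobs(t)     => println("generate jobs for " + t)     // real code then posts DoCheckpoint(t)
    case ClearOldMetadata(t) => println("clear metadata up to " + t)  // real code then posts DoCheckpoint(t)
    case DoCheckpoint(t)     => println("write checkpoint for " + t)
  }
}

object GeneratorEventDemo {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("demo")
    val generator = system.actorOf(Props[GeneratorActor], "job-generator")
    generator ! GenerateJobs(System.currentTimeMillis())
    generator ! ClearOldMetadata(System.currentTimeMillis())
    Thread.sleep(500) // give the actor a moment to process before shutdown
    system.shutdown()
  }
}
```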


@ -17,24 +17,18 @@
package org.apache.spark.streaming
import dstream.FileInputDStream
import org.apache.spark.streaming.StreamingContext._
import java.io.File
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import org.apache.commons.io.FileUtils
import org.scalatest.BeforeAndAfter
import com.google.common.io.Files
import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.hadoop.conf.Configuration
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.FileInputDStream
import org.apache.spark.streaming.util.ManualClock
/**
* This test suites tests the checkpointing functionality of DStreams -
* the checkpointing of a DStream's RDDs as well as the checkpointing of
@ -66,7 +60,7 @@ class CheckpointSuite extends TestSuiteBase {
conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
val stateStreamCheckpointInterval = Seconds(1)
val fs = FileSystem.getLocal(new Configuration())
// this ensure checkpointing occurs at least once
val firstNumBatches = (stateStreamCheckpointInterval / batchDuration).toLong * 2
val secondNumBatches = firstNumBatches
@ -90,11 +84,12 @@ class CheckpointSuite extends TestSuiteBase {
ssc.start()
advanceTimeWithRealDelay(ssc, firstNumBatches)
logInfo("Checkpoint data of state stream = \n" + stateStream.checkpointData)
assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before first failure")
assert(!stateStream.checkpointData.checkpointFiles.isEmpty,
"No checkpointed RDDs in state stream before first failure")
stateStream.checkpointData.checkpointFiles.foreach {
case (time, data) => {
val file = new File(data.toString)
assert(file.exists(), "Checkpoint file '" + file +"' for time " + time + " for state stream before first failure does not exist")
case (time, file) => {
assert(fs.exists(new Path(file)), "Checkpoint file '" + file +"' for time " + time +
" for state stream before first failure does not exist")
}
}
@ -102,7 +97,8 @@ class CheckpointSuite extends TestSuiteBase {
// and check whether the earlier checkpoint files are deleted
val checkpointFiles = stateStream.checkpointData.checkpointFiles.map(x => new File(x._2))
advanceTimeWithRealDelay(ssc, secondNumBatches)
checkpointFiles.foreach(file => assert(!file.exists, "Checkpoint file '" + file + "' was not deleted"))
checkpointFiles.foreach(file =>
assert(!file.exists, "Checkpoint file '" + file + "' was not deleted"))
ssc.stop()
// Restart stream computation using the checkpoint file and check whether
@ -110,19 +106,20 @@ class CheckpointSuite extends TestSuiteBase {
ssc = new StreamingContext(checkpointDir)
stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head
logInfo("Restored data of state stream = \n[" + stateStream.generatedRDDs.mkString("\n") + "]")
assert(!stateStream.generatedRDDs.isEmpty, "No restored RDDs in state stream after recovery from first failure")
assert(!stateStream.generatedRDDs.isEmpty,
"No restored RDDs in state stream after recovery from first failure")
// Run one batch to generate a new checkpoint file and check whether some RDD
// is present in the checkpoint data or not
ssc.start()
advanceTimeWithRealDelay(ssc, 1)
assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before second failure")
assert(!stateStream.checkpointData.checkpointFiles.isEmpty,
"No checkpointed RDDs in state stream before second failure")
stateStream.checkpointData.checkpointFiles.foreach {
case (time, data) => {
val file = new File(data.toString)
assert(file.exists(),
"Checkpoint file '" + file +"' for time " + time + " for state stream before seconds failure does not exist")
case (time, file) => {
assert(fs.exists(new Path(file)), "Checkpoint file '" + file +"' for time " + time +
" for state stream before seconds failure does not exist")
}
}
ssc.stop()
@ -132,7 +129,8 @@ class CheckpointSuite extends TestSuiteBase {
ssc = new StreamingContext(checkpointDir)
stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head
logInfo("Restored data of state stream = \n[" + stateStream.generatedRDDs.mkString("\n") + "]")
assert(!stateStream.generatedRDDs.isEmpty, "No restored RDDs in state stream after recovery from second failure")
assert(!stateStream.generatedRDDs.isEmpty,
"No restored RDDs in state stream after recovery from second failure")
// Adjust manual clock time as if it is being restarted after a delay; this is a hack because
// we modify the conf object, but it works for this one property
@ -144,6 +142,7 @@ class CheckpointSuite extends TestSuiteBase {
ssc = null
}
// This tests whether the system can recover from a master failure with simple
// non-stateful operations. This assumes a reliable, replayable input
// source - TestInputDStream.
@ -192,6 +191,7 @@ class CheckpointSuite extends TestSuiteBase {
testCheckpointedOperation(input, operation, output, 7)
}
// This tests whether file input stream remembers what files were seen before
// the master failure and uses them again to process a large window operation.
// It also tests whether batches, whose processing was incomplete due to the


@ -23,7 +23,7 @@ import akka.actor.IOManager
import akka.actor.Props
import akka.util.ByteString
import org.apache.spark.streaming.dstream.{NetworkReceiver, SparkFlumeEvent}
import org.apache.spark.streaming.dstream.{FileInputDStream, NetworkReceiver, SparkFlumeEvent}
import java.net.{InetSocketAddress, SocketException, Socket, ServerSocket}
import java.io.{File, BufferedWriter, OutputStreamWriter}
import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue}