Merge remote-tracking branch 'upstream/master' into sparsesvd

commit 7c04b3134a

core/pom.xml | 422

@@ -17,215 +17,219 @@
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-parent</artifactId>
    <version>0.9.0-incubating-SNAPSHOT</version>
    <relativePath>../pom.xml</relativePath>
  </parent>

  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <packaging>jar</packaging>
  <name>Spark Project Core</name>
  <url>http://spark.incubator.apache.org/</url>

  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
    </dependency>
    <dependency>
      <groupId>net.java.dev.jets3t</groupId>
      <artifactId>jets3t</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-ipc</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
    </dependency>
    <dependency>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>jetty-server</artifactId>
    </dependency>
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
    </dependency>
    <dependency>
      <groupId>com.google.code.findbugs</groupId>
      <artifactId>jsr305</artifactId>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
    </dependency>
    <dependency>
      <groupId>com.ning</groupId>
      <artifactId>compress-lzf</artifactId>
    </dependency>
    <dependency>
      <groupId>org.xerial.snappy</groupId>
      <artifactId>snappy-java</artifactId>
    </dependency>
    <dependency>
      <groupId>org.ow2.asm</groupId>
      <artifactId>asm</artifactId>
    </dependency>
    <dependency>
      <groupId>com.twitter</groupId>
      <artifactId>chill_${scala.binary.version}</artifactId>
      <version>0.3.1</version>
    </dependency>
    <dependency>
      <groupId>com.twitter</groupId>
      <artifactId>chill-java</artifactId>
      <version>0.3.1</version>
    </dependency>
    <dependency>
      <groupId>${akka.group}</groupId>
      <artifactId>akka-remote_${scala.binary.version}</artifactId>
    </dependency>
    <dependency>
      <groupId>${akka.group}</groupId>
      <artifactId>akka-slf4j_${scala.binary.version}</artifactId>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
    </dependency>
    <dependency>
      <groupId>net.liftweb</groupId>
      <artifactId>lift-json_${scala.binary.version}</artifactId>
    </dependency>
    <dependency>
      <groupId>it.unimi.dsi</groupId>
      <artifactId>fastutil</artifactId>
    </dependency>
    <dependency>
      <groupId>colt</groupId>
      <artifactId>colt</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.mesos</groupId>
      <artifactId>mesos</artifactId>
    </dependency>
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-all</artifactId>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
    </dependency>
    <dependency>
      <groupId>com.clearspring.analytics</groupId>
      <artifactId>stream</artifactId>
    </dependency>
    <dependency>
      <groupId>com.codahale.metrics</groupId>
      <artifactId>metrics-core</artifactId>
    </dependency>
    <dependency>
      <groupId>com.codahale.metrics</groupId>
      <artifactId>metrics-jvm</artifactId>
    </dependency>
    <dependency>
      <groupId>com.codahale.metrics</groupId>
      <artifactId>metrics-json</artifactId>
    </dependency>
    <dependency>
      <groupId>com.codahale.metrics</groupId>
      <artifactId>metrics-ganglia</artifactId>
    </dependency>
    <dependency>
      <groupId>com.codahale.metrics</groupId>
      <artifactId>metrics-graphite</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.derby</groupId>
      <artifactId>derby</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>commons-io</groupId>
      <artifactId>commons-io</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest_${scala.binary.version}</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.scalacheck</groupId>
      <artifactId>scalacheck_${scala.binary.version}</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.easymock</groupId>
      <artifactId>easymock</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>com.novocode</groupId>
      <artifactId>junit-interface</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>
  <build>
    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-antrun-plugin</artifactId>
        <executions>
          <execution>
            <phase>test</phase>
            <goals>
              <goal>run</goal>
            </goals>
            <configuration>
              <exportAntProperties>true</exportAntProperties>
              <tasks>
                <property name="spark.classpath" refid="maven.test.classpath" />
                <property environment="env" />
                <fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variables and retry.">
                  <condition>
                    <not>
                      <or>
                        <isset property="env.SCALA_HOME" />
                        <isset property="env.SCALA_LIBRARY_PATH" />
                      </or>
                    </not>
                  </condition>
                </fail>
              </tasks>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.scalatest</groupId>
        <artifactId>scalatest-maven-plugin</artifactId>
        <configuration>
          <environmentVariables>
            <SPARK_HOME>${basedir}/..</SPARK_HOME>
            <SPARK_TESTING>1</SPARK_TESTING>
            <SPARK_CLASSPATH>${spark.classpath}</SPARK_CLASSPATH>
          </environmentVariables>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
@@ -20,19 +20,24 @@ package org.apache.spark.network.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.channel.socket.oio.OioSocketChannel;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

class FileClient {

  private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
  private FileClientHandler handler = null;
  private final FileClientHandler handler;
  private Channel channel = null;
  private Bootstrap bootstrap = null;
  private int connectTimeout = 60*1000; // 1 min
  private EventLoopGroup group = null;
  private final int connectTimeout;
  private final int sendTimeout = 60; // 1 min

  public FileClient(FileClientHandler handler, int connectTimeout) {
    this.handler = handler;

@@ -40,8 +45,9 @@ class FileClient {
  }

  public void init() {
    group = new OioEventLoopGroup();
    bootstrap = new Bootstrap();
    bootstrap.group(new OioEventLoopGroup())
    bootstrap.group(group)
      .channel(OioSocketChannel.class)
      .option(ChannelOption.SO_KEEPALIVE, true)
      .option(ChannelOption.TCP_NODELAY, true)

@@ -56,6 +62,7 @@ class FileClient {
      // ChannelFuture cf = channel.closeFuture();
      //cf.addListener(new ChannelCloseListener(this));
    } catch (InterruptedException e) {
      LOG.warn("FileClient interrupted while trying to connect", e);
      close();
    }
  }

@@ -71,16 +78,21 @@ class FileClient {
  public void sendRequest(String file) {
    //assert(file == null);
    //assert(channel == null);
    channel.write(file + "\r\n");
    try {
      // Should be able to send the message to network link channel.
      boolean bSent = channel.writeAndFlush(file + "\r\n").await(sendTimeout, TimeUnit.SECONDS);
      if (!bSent) {
        throw new RuntimeException("Failed to send");
      }
    } catch (InterruptedException e) {
      LOG.error("Error", e);
    }
  }

  public void close() {
    if(channel != null) {
      channel.close();
      channel = null;
    }
    if ( bootstrap!=null) {
      bootstrap.shutdown();
    if (group != null) {
      group.shutdownGracefully();
      group = null;
      bootstrap = null;
    }
  }
@ -17,15 +17,13 @@
|
|||
|
||||
package org.apache.spark.network.netty;
|
||||
|
||||
import io.netty.buffer.BufType;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
import io.netty.handler.codec.string.StringEncoder;
|
||||
|
||||
|
||||
class FileClientChannelInitializer extends ChannelInitializer<SocketChannel> {
|
||||
|
||||
private FileClientHandler fhandler;
|
||||
private final FileClientHandler fhandler;
|
||||
|
||||
public FileClientChannelInitializer(FileClientHandler handler) {
|
||||
fhandler = handler;
|
||||
|
@ -35,7 +33,7 @@ class FileClientChannelInitializer extends ChannelInitializer<SocketChannel> {
|
|||
public void initChannel(SocketChannel channel) {
|
||||
// file no more than 2G
|
||||
channel.pipeline()
|
||||
.addLast("encoder", new StringEncoder(BufType.BYTE))
|
||||
.addLast("encoder", new StringEncoder())
|
||||
.addLast("handler", fhandler);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,11 +19,11 @@ package org.apache.spark.network.netty;
|
|||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundByteHandlerAdapter;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
|
||||
import org.apache.spark.storage.BlockId;
|
||||
|
||||
abstract class FileClientHandler extends ChannelInboundByteHandlerAdapter {
|
||||
abstract class FileClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
|
||||
|
||||
private FileHeader currentHeader = null;
|
||||
|
||||
|
@ -37,13 +37,7 @@ abstract class FileClientHandler extends ChannelInboundByteHandlerAdapter {
|
|||
public abstract void handleError(BlockId blockId);
|
||||
|
||||
@Override
|
||||
public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) {
|
||||
// Use direct buffer if possible.
|
||||
return ctx.alloc().ioBuffer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) {
|
||||
public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
|
||||
// get header
|
||||
if (currentHeader == null && in.readableBytes() >= FileHeader.HEADER_SIZE()) {
|
||||
currentHeader = FileHeader.create(in.readBytes(FileHeader.HEADER_SIZE()));
|
||||
|
|
|
@ -22,13 +22,12 @@ import java.net.InetSocketAddress;
|
|||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelOption;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.channel.oio.OioEventLoopGroup;
|
||||
import io.netty.channel.socket.oio.OioServerSocketChannel;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
||||
/**
|
||||
* Server that accept the path of a file an echo back its content.
|
||||
*/
|
||||
|
@ -36,7 +35,8 @@ class FileServer {
|
|||
|
||||
private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
|
||||
|
||||
private ServerBootstrap bootstrap = null;
|
||||
private EventLoopGroup bossGroup = null;
|
||||
private EventLoopGroup workerGroup = null;
|
||||
private ChannelFuture channelFuture = null;
|
||||
private int port = 0;
|
||||
private Thread blockingThread = null;
|
||||
|
@ -45,8 +45,11 @@ class FileServer {
|
|||
InetSocketAddress addr = new InetSocketAddress(port);
|
||||
|
||||
// Configure the server.
|
||||
bootstrap = new ServerBootstrap();
|
||||
bootstrap.group(new OioEventLoopGroup(), new OioEventLoopGroup())
|
||||
bossGroup = new OioEventLoopGroup();
|
||||
workerGroup = new OioEventLoopGroup();
|
||||
|
||||
ServerBootstrap bootstrap = new ServerBootstrap();
|
||||
bootstrap.group(bossGroup, workerGroup)
|
||||
.channel(OioServerSocketChannel.class)
|
||||
.option(ChannelOption.SO_BACKLOG, 100)
|
||||
.option(ChannelOption.SO_RCVBUF, 1500)
|
||||
|
@ -89,13 +92,19 @@ class FileServer {
|
|||
public void stop() {
|
||||
// Close the bound channel.
|
||||
if (channelFuture != null) {
|
||||
channelFuture.channel().close();
|
||||
channelFuture.channel().close().awaitUninterruptibly();
|
||||
channelFuture = null;
|
||||
}
|
||||
// Shutdown bootstrap.
|
||||
if (bootstrap != null) {
|
||||
bootstrap.shutdown();
|
||||
bootstrap = null;
|
||||
|
||||
// Shutdown event groups
|
||||
if (bossGroup != null) {
|
||||
bossGroup.shutdownGracefully();
|
||||
bossGroup = null;
|
||||
}
|
||||
|
||||
if (workerGroup != null) {
|
||||
workerGroup.shutdownGracefully();
|
||||
workerGroup = null;
|
||||
}
|
||||
// TODO: Shutdown all accepted channels as well ?
|
||||
}
|
||||
|
|
|
@ -23,7 +23,6 @@ import io.netty.handler.codec.DelimiterBasedFrameDecoder;
|
|||
import io.netty.handler.codec.Delimiters;
|
||||
import io.netty.handler.codec.string.StringDecoder;
|
||||
|
||||
|
||||
class FileServerChannelInitializer extends ChannelInitializer<SocketChannel> {
|
||||
|
||||
PathResolver pResolver;
|
||||
|
@ -36,7 +35,7 @@ class FileServerChannelInitializer extends ChannelInitializer<SocketChannel> {
|
|||
public void initChannel(SocketChannel channel) {
|
||||
channel.pipeline()
|
||||
.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()))
|
||||
.addLast("strDecoder", new StringDecoder())
|
||||
.addLast("stringDecoder", new StringDecoder())
|
||||
.addLast("handler", new FileServerHandler(pResolver));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,22 +21,26 @@ import java.io.File;
|
|||
import java.io.FileInputStream;
|
||||
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.channel.DefaultFileRegion;
|
||||
|
||||
import org.apache.spark.storage.BlockId;
|
||||
import org.apache.spark.storage.FileSegment;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
class FileServerHandler extends ChannelInboundMessageHandlerAdapter<String> {
|
||||
class FileServerHandler extends SimpleChannelInboundHandler<String> {
|
||||
|
||||
PathResolver pResolver;
|
||||
private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
|
||||
|
||||
private final PathResolver pResolver;
|
||||
|
||||
public FileServerHandler(PathResolver pResolver){
|
||||
this.pResolver = pResolver;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void messageReceived(ChannelHandlerContext ctx, String blockIdString) {
|
||||
public void channelRead0(ChannelHandlerContext ctx, String blockIdString) {
|
||||
BlockId blockId = BlockId.apply(blockIdString);
|
||||
FileSegment fileSegment = pResolver.getBlockLocation(blockId);
|
||||
// if getBlockLocation returns null, close the channel
|
||||
|
@ -60,10 +64,10 @@ class FileServerHandler extends ChannelInboundMessageHandlerAdapter<String> {
|
|||
int len = new Long(length).intValue();
|
||||
ctx.write((new FileHeader(len, blockId)).buffer());
|
||||
try {
|
||||
ctx.sendFile(new DefaultFileRegion(new FileInputStream(file)
|
||||
ctx.write(new DefaultFileRegion(new FileInputStream(file)
|
||||
.getChannel(), fileSegment.offset(), fileSegment.length()));
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
LOG.error("Exception: ", e);
|
||||
}
|
||||
} else {
|
||||
ctx.write(new FileHeader(0, blockId).buffer());
|
||||
|
@ -73,7 +77,7 @@ class FileServerHandler extends ChannelInboundMessageHandlerAdapter<String> {
|
|||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
|
||||
cause.printStackTrace();
|
||||
LOG.error("Exception: ", cause);
|
||||
ctx.close();
|
||||
}
|
||||
}
|
||||
|
|
|
@@ -0,0 +1,8 @@
# Set everything to be logged to the console
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.eclipse.jetty=WARN
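The Logging.scala hunk further down in this diff loads this new properties file from the classpath whenever log4j has no appenders configured yet. A minimal standalone sketch of that lookup pattern, assuming log4j 1.x on the classpath; the resource name and the log4j calls are taken from the Logging.scala hunk below, while the object wrapper and error message are illustrative:

  import org.apache.log4j.{LogManager, PropertyConfigurator}

  object DefaultLog4jLoader {
    def configureIfNeeded(): Unit = {
      // Only touch log4j if nothing has configured it yet, as the Logging trait does below.
      val log4jInitialized = LogManager.getRootLogger.getAllAppenders.hasMoreElements
      if (!log4jInitialized) {
        val defaultLogProps = "org/apache/spark/default-log4j.properties"
        Option(getClass.getClassLoader.getResource(defaultLogProps)) match {
          case Some(url) => PropertyConfigurator.configure(url)
          case None      => System.err.println(s"Unable to load $defaultLogProps")
        }
      }
    }
  }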
@ -46,6 +46,7 @@ private[spark] class HttpServer(resourceBase: File) extends Logging {
|
|||
if (server != null) {
|
||||
throw new ServerStateException("Server is already started")
|
||||
} else {
|
||||
logInfo("Starting HTTP Server")
|
||||
server = new Server()
|
||||
val connector = new SocketConnector
|
||||
connector.setMaxIdleTime(60*1000)
|
||||
|
|
|
@ -17,8 +17,8 @@
|
|||
|
||||
package org.apache.spark
|
||||
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
import org.apache.log4j.{LogManager, PropertyConfigurator}
|
||||
import org.slf4j.{Logger, LoggerFactory}
|
||||
|
||||
/**
|
||||
* Utility trait for classes that want to log data. Creates a SLF4J logger for the class and allows
|
||||
|
@ -33,6 +33,7 @@ trait Logging {
|
|||
// Method to get or create the logger for this object
|
||||
protected def log: Logger = {
|
||||
if (log_ == null) {
|
||||
initializeIfNecessary()
|
||||
var className = this.getClass.getName
|
||||
// Ignore trailing $'s in the class names for Scala objects
|
||||
if (className.endsWith("$")) {
|
||||
|
@ -89,7 +90,37 @@ trait Logging {
|
|||
log.isTraceEnabled
|
||||
}
|
||||
|
||||
// Method for ensuring that logging is initialized, to avoid having multiple
|
||||
// threads do it concurrently (as SLF4J initialization is not thread safe).
|
||||
protected def initLogging() { log }
|
||||
private def initializeIfNecessary() {
|
||||
if (!Logging.initialized) {
|
||||
Logging.initLock.synchronized {
|
||||
if (!Logging.initialized) {
|
||||
initializeLogging()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private def initializeLogging() {
|
||||
// If Log4j doesn't seem initialized, load a default properties file
|
||||
val log4jInitialized = LogManager.getRootLogger.getAllAppenders.hasMoreElements
|
||||
if (!log4jInitialized) {
|
||||
val defaultLogProps = "org/apache/spark/default-log4j.properties"
|
||||
val classLoader = this.getClass.getClassLoader
|
||||
Option(classLoader.getResource(defaultLogProps)) match {
|
||||
case Some(url) => PropertyConfigurator.configure(url)
|
||||
case None => System.err.println(s"Spark was unable to load $defaultLogProps")
|
||||
}
|
||||
log.info(s"Using Spark's default log4j profile: $defaultLogProps")
|
||||
}
|
||||
Logging.initialized = true
|
||||
|
||||
// Force a call into slf4j to initialize it. Avoids this happening from mutliple threads
|
||||
// and triggering this: http://mailman.qos.ch/pipermail/slf4j-dev/2010-April/002956.html
|
||||
log
|
||||
}
|
||||
}
|
||||
|
||||
object Logging {
|
||||
@volatile private var initialized = false
|
||||
val initLock = new Object()
|
||||
}
|
||||
|
|
|
@@ -19,7 +19,7 @@ package org.apache.spark

import java.io._
import java.net.URI
import java.util.Properties
import java.util.{UUID, Properties}
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.Map

@@ -88,9 +88,6 @@ class SparkContext(
    scala.collection.immutable.Map())
  extends Logging {

  // Ensure logging is initialized before we spawn any threads
  initLogging()

  // Set Spark driver host and port system properties
  if (System.getProperty("spark.driver.host") == null) {
    System.setProperty("spark.driver.host", Utils.localHostName())

@@ -855,22 +852,15 @@ class SparkContext(

  /**
   * Set the directory under which RDDs are going to be checkpointed. The directory must
   * be a HDFS path if running on a cluster. If the directory does not exist, it will
   * be created. If the directory exists and useExisting is set to true, then the
   * exisiting directory will be used. Otherwise an exception will be thrown to
   * prevent accidental overriding of checkpoint files in the existing directory.
   * be a HDFS path if running on a cluster.
   */
  def setCheckpointDir(dir: String, useExisting: Boolean = false) {
    val path = new Path(dir)
    val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
    if (!useExisting) {
      if (fs.exists(path)) {
        throw new Exception("Checkpoint directory '" + path + "' already exists.")
      } else {
        fs.mkdirs(path)
      }
  def setCheckpointDir(directory: String) {
    checkpointDir = Option(directory).map { dir =>
      val path = new Path(dir, UUID.randomUUID().toString)
      val fs = path.getFileSystem(hadoopConfiguration)
      fs.mkdirs(path)
      fs.getFileStatus(path).getPath().toString
    }
    checkpointDir = Some(dir)
  }

  /** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
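The hunk above replaces the two-argument setCheckpointDir(dir, useExisting) with a one-argument version that creates a UUID-named subdirectory under the given path. A short usage sketch of the new API; the SparkContext setup and the checkpoint path are illustrative, not part of this commit:

  import org.apache.spark.SparkContext

  val sc = new SparkContext("local", "checkpoint-demo")
  // New API: no useExisting flag; Spark creates a unique subdirectory under this path.
  sc.setCheckpointDir("/tmp/spark-checkpoints")
  val rdd = sc.makeRDD(1 to 1000, 10)
  rdd.checkpoint()   // materialized when the next action runs
  rdd.count()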
@@ -53,5 +53,3 @@ private[spark] case class ExceptionFailure(
private[spark] case object TaskResultLost extends TaskEndReason

private[spark] case object TaskKilled extends TaskEndReason

private[spark] case class OtherFailure(message: String) extends TaskEndReason
@ -611,6 +611,42 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
|
|||
* Return an RDD with the values of each tuple.
|
||||
*/
|
||||
def values(): JavaRDD[V] = JavaRDD.fromRDD[V](rdd.map(_._2))
|
||||
|
||||
/**
|
||||
* Return approximate number of distinct values for each key in this RDD.
|
||||
* The accuracy of approximation can be controlled through the relative standard deviation
|
||||
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
|
||||
* more accurate counts but increase the memory footprint and vise versa. Uses the provided
|
||||
* Partitioner to partition the output RDD.
|
||||
*/
|
||||
def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaRDD[(K, Long)] = {
|
||||
rdd.countApproxDistinctByKey(relativeSD, partitioner)
|
||||
}
|
||||
|
||||
/**
|
||||
* Return approximate number of distinct values for each key this RDD.
|
||||
* The accuracy of approximation can be controlled through the relative standard deviation
|
||||
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
|
||||
* more accurate counts but increase the memory footprint and vise versa. The default value of
|
||||
* relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism
|
||||
* level.
|
||||
*/
|
||||
def countApproxDistinctByKey(relativeSD: Double = 0.05): JavaRDD[(K, Long)] = {
|
||||
rdd.countApproxDistinctByKey(relativeSD)
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Return approximate number of distinct values for each key in this RDD.
|
||||
* The accuracy of approximation can be controlled through the relative standard deviation
|
||||
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
|
||||
* more accurate counts but increase the memory footprint and vise versa. HashPartitions the
|
||||
* output RDD into numPartitions.
|
||||
*
|
||||
*/
|
||||
def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaRDD[(K, Long)] = {
|
||||
rdd.countApproxDistinctByKey(relativeSD, numPartitions)
|
||||
}
|
||||
}
|
||||
|
||||
object JavaPairRDD {
|
||||
|
|
|
@@ -444,4 +444,15 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[T]]
    takeOrdered(num, comp)
  }

  /**
   * Return approximate number of distinct elements in the RDD.
   *
   * The accuracy of approximation can be controlled through the relative standard deviation
   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
   * more accurate counts but increase the memory footprint and vise versa. The default value of
   * relativeSD is 0.05.
   */
  def countApproxDistinct(relativeSD: Double = 0.05): Long = rdd.countApproxDistinct(relativeSD)

}
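countApproxDistinct, added above (and on the Scala RDD that this Java wrapper delegates to, see the RDD.scala hunk further down), trades accuracy against memory through relativeSD. A rough usage sketch of the Scala API, assuming a SparkContext sc is in scope; the data and the relativeSD values are illustrative:

  val ids = sc.parallelize(1 to 1000000).map(i => i % 50000)  // 50,000 distinct values
  // Smaller relativeSD gives a tighter estimate but uses more memory per HyperLogLog counter.
  val coarse = ids.countApproxDistinct(relativeSD = 0.10)
  val fine   = ids.countApproxDistinct(relativeSD = 0.01)
  println(s"coarse=$coarse fine=$fine exact=${ids.distinct().count()}")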
@ -381,20 +381,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
|
|||
|
||||
/**
|
||||
* Set the directory under which RDDs are going to be checkpointed. The directory must
|
||||
* be a HDFS path if running on a cluster. If the directory does not exist, it will
|
||||
* be created. If the directory exists and useExisting is set to true, then the
|
||||
* exisiting directory will be used. Otherwise an exception will be thrown to
|
||||
* prevent accidental overriding of checkpoint files in the existing directory.
|
||||
*/
|
||||
def setCheckpointDir(dir: String, useExisting: Boolean) {
|
||||
sc.setCheckpointDir(dir, useExisting)
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the directory under which RDDs are going to be checkpointed. The directory must
|
||||
* be a HDFS path if running on a cluster. If the directory does not exist, it will
|
||||
* be created. If the directory exists, an exception will be thrown to prevent accidental
|
||||
* overriding of checkpoint files.
|
||||
* be a HDFS path if running on a cluster.
|
||||
*/
|
||||
def setCheckpointDir(dir: String) {
|
||||
sc.setCheckpointDir(dir)
|
||||
|
|
|
@ -140,12 +140,12 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
|
|||
<body>
|
||||
{linkToMaster}
|
||||
<div>
|
||||
<div style="float:left;width:40%">{backButton}</div>
|
||||
<div style="float:left; margin-right:10px">{backButton}</div>
|
||||
<div style="float:left;">{range}</div>
|
||||
<div style="float:right;">{nextButton}</div>
|
||||
<div style="float:right; margin-left:10px">{nextButton}</div>
|
||||
</div>
|
||||
<br />
|
||||
<div style="height:500px;overflow:auto;padding:5px;">
|
||||
<div style="height:500px; overflow:auto; padding:5px;">
|
||||
<pre>{logText}</pre>
|
||||
</div>
|
||||
</body>
|
||||
|
|
|
@ -48,8 +48,6 @@ private[spark] class Executor(
|
|||
|
||||
private val EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new Array[Byte](0))
|
||||
|
||||
initLogging()
|
||||
|
||||
// No ip or host:port - just hostname
|
||||
Utils.checkHost(slaveHostname, "Expected executed slave to be a hostname")
|
||||
// must not have port specified.
|
||||
|
|
|
@ -26,7 +26,6 @@ import scala.util.matching.Regex
|
|||
import org.apache.spark.Logging
|
||||
|
||||
private[spark] class MetricsConfig(val configFile: Option[String]) extends Logging {
|
||||
initLogging()
|
||||
|
||||
val DEFAULT_PREFIX = "*"
|
||||
val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r
|
||||
|
|
|
@ -63,7 +63,6 @@ import org.apache.spark.metrics.source.Source
|
|||
* [options] is the specific property of this source or sink.
|
||||
*/
|
||||
private[spark] class MetricsSystem private (val instance: String) extends Logging {
|
||||
initLogging()
|
||||
|
||||
val confFile = System.getProperty("spark.metrics.conf")
|
||||
val metricsConfig = new MetricsConfig(Option(confFile))
|
||||
|
|
|
@ -18,12 +18,12 @@
|
|||
package org.apache.spark.rdd
|
||||
|
||||
import java.io.IOException
|
||||
|
||||
import scala.reflect.ClassTag
|
||||
|
||||
import org.apache.hadoop.fs.Path
|
||||
import org.apache.spark._
|
||||
import org.apache.spark.broadcast.Broadcast
|
||||
import org.apache.spark.deploy.SparkHadoopUtil
|
||||
import org.apache.hadoop.conf.Configuration
|
||||
import org.apache.hadoop.fs.Path
|
||||
|
||||
private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {}
|
||||
|
||||
|
@ -34,6 +34,8 @@ private[spark]
|
|||
class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
|
||||
extends RDD[T](sc, Nil) {
|
||||
|
||||
val broadcastedConf = sc.broadcast(new SerializableWritable(sc.hadoopConfiguration))
|
||||
|
||||
@transient val fs = new Path(checkpointPath).getFileSystem(sc.hadoopConfiguration)
|
||||
|
||||
override def getPartitions: Array[Partition] = {
|
||||
|
@ -65,7 +67,7 @@ class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
|
|||
|
||||
override def compute(split: Partition, context: TaskContext): Iterator[T] = {
|
||||
val file = new Path(checkpointPath, CheckpointRDD.splitIdToFile(split.index))
|
||||
CheckpointRDD.readFromFile(file, context)
|
||||
CheckpointRDD.readFromFile(file, broadcastedConf, context)
|
||||
}
|
||||
|
||||
override def checkpoint() {
|
||||
|
@ -79,10 +81,14 @@ private[spark] object CheckpointRDD extends Logging {
|
|||
"part-%05d".format(splitId)
|
||||
}
|
||||
|
||||
def writeToFile[T](path: String, blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) {
|
||||
def writeToFile[T](
|
||||
path: String,
|
||||
broadcastedConf: Broadcast[SerializableWritable[Configuration]],
|
||||
blockSize: Int = -1
|
||||
)(ctx: TaskContext, iterator: Iterator[T]) {
|
||||
val env = SparkEnv.get
|
||||
val outputDir = new Path(path)
|
||||
val fs = outputDir.getFileSystem(SparkHadoopUtil.get.newConfiguration())
|
||||
val fs = outputDir.getFileSystem(broadcastedConf.value.value)
|
||||
|
||||
val finalOutputName = splitIdToFile(ctx.partitionId)
|
||||
val finalOutputPath = new Path(outputDir, finalOutputName)
|
||||
|
@ -119,9 +125,13 @@ private[spark] object CheckpointRDD extends Logging {
|
|||
}
|
||||
}
|
||||
|
||||
def readFromFile[T](path: Path, context: TaskContext): Iterator[T] = {
|
||||
def readFromFile[T](
|
||||
path: Path,
|
||||
broadcastedConf: Broadcast[SerializableWritable[Configuration]],
|
||||
context: TaskContext
|
||||
): Iterator[T] = {
|
||||
val env = SparkEnv.get
|
||||
val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
|
||||
val fs = path.getFileSystem(broadcastedConf.value.value)
|
||||
val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
|
||||
val fileInputStream = fs.open(path, bufferSize)
|
||||
val serializer = env.serializer.newInstance()
|
||||
|
@ -144,8 +154,10 @@ private[spark] object CheckpointRDD extends Logging {
|
|||
val sc = new SparkContext(cluster, "CheckpointRDD Test")
|
||||
val rdd = sc.makeRDD(1 to 10, 10).flatMap(x => 1 to 10000)
|
||||
val path = new Path(hdfsPath, "temp")
|
||||
val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
|
||||
sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 1024) _)
|
||||
val conf = SparkHadoopUtil.get.newConfiguration()
|
||||
val fs = path.getFileSystem(conf)
|
||||
val broadcastedConf = sc.broadcast(new SerializableWritable(conf))
|
||||
sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, broadcastedConf, 1024) _)
|
||||
val cpRDD = new CheckpointRDD[Int](sc, path.toString)
|
||||
assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same")
|
||||
assert(cpRDD.collect.toList == rdd.collect.toList, "Data of partitions not the same")
|
||||
|
|
|
@ -40,12 +40,15 @@ import org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil
|
|||
import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob}
|
||||
import org.apache.hadoop.mapreduce.{RecordWriter => NewRecordWriter}
|
||||
|
||||
import com.clearspring.analytics.stream.cardinality.HyperLogLog
|
||||
|
||||
import org.apache.spark._
|
||||
import org.apache.spark.SparkContext._
|
||||
import org.apache.spark.partial.{BoundedDouble, PartialResult}
|
||||
import org.apache.spark.Aggregator
|
||||
import org.apache.spark.Partitioner
|
||||
import org.apache.spark.Partitioner.defaultPartitioner
|
||||
import org.apache.spark.util.SerializableHyperLogLog
|
||||
|
||||
/**
|
||||
* Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
|
||||
|
@ -207,6 +210,45 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
|
|||
self.map(_._1).countByValueApprox(timeout, confidence)
|
||||
}
|
||||
|
||||
/**
|
||||
* Return approximate number of distinct values for each key in this RDD.
|
||||
* The accuracy of approximation can be controlled through the relative standard deviation
|
||||
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
|
||||
* more accurate counts but increase the memory footprint and vise versa. Uses the provided
|
||||
* Partitioner to partition the output RDD.
|
||||
*/
|
||||
def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
|
||||
val createHLL = (v: V) => new SerializableHyperLogLog(new HyperLogLog(relativeSD)).add(v)
|
||||
val mergeValueHLL = (hll: SerializableHyperLogLog, v: V) => hll.add(v)
|
||||
val mergeHLL = (h1: SerializableHyperLogLog, h2: SerializableHyperLogLog) => h1.merge(h2)
|
||||
|
||||
combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.value.cardinality())
|
||||
}
|
||||
|
||||
/**
|
||||
* Return approximate number of distinct values for each key in this RDD.
|
||||
* The accuracy of approximation can be controlled through the relative standard deviation
|
||||
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
|
||||
* more accurate counts but increase the memory footprint and vise versa. HashPartitions the
|
||||
* output RDD into numPartitions.
|
||||
*
|
||||
*/
|
||||
def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = {
|
||||
countApproxDistinctByKey(relativeSD, new HashPartitioner(numPartitions))
|
||||
}
|
||||
|
||||
/**
|
||||
* Return approximate number of distinct values for each key this RDD.
|
||||
* The accuracy of approximation can be controlled through the relative standard deviation
|
||||
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
|
||||
* more accurate counts but increase the memory footprint and vise versa. The default value of
|
||||
* relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism
|
||||
* level.
|
||||
*/
|
||||
def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = {
|
||||
countApproxDistinctByKey(relativeSD, defaultPartitioner(self))
|
||||
}
|
||||
|
||||
/**
|
||||
* Merge the values for each key using an associative reduce function. This will also perform
|
||||
* the merging locally on each mapper before sending results to a reducer, similarly to a
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.io.Text
|
|||
import org.apache.hadoop.mapred.TextOutputFormat
|
||||
|
||||
import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
|
||||
import com.clearspring.analytics.stream.cardinality.HyperLogLog
|
||||
|
||||
import org.apache.spark.Partitioner._
|
||||
import org.apache.spark.api.java.JavaRDD
|
||||
|
@ -41,7 +42,7 @@ import org.apache.spark.partial.CountEvaluator
|
|||
import org.apache.spark.partial.GroupedCountEvaluator
|
||||
import org.apache.spark.partial.PartialResult
|
||||
import org.apache.spark.storage.StorageLevel
|
||||
import org.apache.spark.util.{Utils, BoundedPriorityQueue}
|
||||
import org.apache.spark.util.{Utils, BoundedPriorityQueue, SerializableHyperLogLog}
|
||||
|
||||
import org.apache.spark.SparkContext._
|
||||
import org.apache.spark._
|
||||
|
@ -788,6 +789,19 @@ abstract class RDD[T: ClassTag](
|
|||
sc.runApproximateJob(this, countPartition, evaluator, timeout)
|
||||
}
|
||||
|
||||
/**
|
||||
* Return approximate number of distinct elements in the RDD.
|
||||
*
|
||||
* The accuracy of approximation can be controlled through the relative standard deviation
|
||||
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
|
||||
* more accurate counts but increase the memory footprint and vise versa. The default value of
|
||||
* relativeSD is 0.05.
|
||||
*/
|
||||
def countApproxDistinct(relativeSD: Double = 0.05): Long = {
|
||||
val zeroCounter = new SerializableHyperLogLog(new HyperLogLog(relativeSD))
|
||||
aggregate(zeroCounter)(_.add(_), _.merge(_)).value.cardinality()
|
||||
}
|
||||
|
||||
/**
|
||||
* Take the first num elements of the RDD. It works by first scanning one partition, and use the
|
||||
* results from that partition to estimate the number of additional partitions needed to satisfy
|
||||
|
|
|
@ -22,7 +22,7 @@ import scala.reflect.ClassTag
|
|||
import org.apache.hadoop.fs.Path
|
||||
import org.apache.hadoop.conf.Configuration
|
||||
|
||||
import org.apache.spark.{Partition, SparkException, Logging}
|
||||
import org.apache.spark.{SerializableWritable, Partition, SparkException, Logging}
|
||||
import org.apache.spark.scheduler.{ResultTask, ShuffleMapTask}
|
||||
|
||||
/**
|
||||
|
@ -85,14 +85,21 @@ private[spark] class RDDCheckpointData[T: ClassTag](rdd: RDD[T])
|
|||
|
||||
// Create the output path for the checkpoint
|
||||
val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id)
|
||||
val fs = path.getFileSystem(new Configuration())
|
||||
val fs = path.getFileSystem(rdd.context.hadoopConfiguration)
|
||||
if (!fs.mkdirs(path)) {
|
||||
throw new SparkException("Failed to create checkpoint path " + path)
|
||||
}
|
||||
|
||||
// Save to file, and reload it as an RDD
|
||||
rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString) _)
|
||||
val broadcastedConf = rdd.context.broadcast(
|
||||
new SerializableWritable(rdd.context.hadoopConfiguration))
|
||||
rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString, broadcastedConf) _)
|
||||
val newRDD = new CheckpointRDD[T](rdd.context, path.toString)
|
||||
if (newRDD.partitions.size != rdd.partitions.size) {
|
||||
throw new SparkException(
|
||||
"Checkpoint RDD " + newRDD + "("+ newRDD.partitions.size + ") has different " +
|
||||
"number of partitions than original RDD " + rdd + "(" + rdd.partitions.size + ")")
|
||||
}
|
||||
|
||||
// Change the dependencies and partitions of the RDD
|
||||
RDDCheckpointData.synchronized {
|
||||
|
@ -101,8 +108,8 @@ private[spark] class RDDCheckpointData[T: ClassTag](rdd: RDD[T])
|
|||
rdd.markCheckpointed(newRDD) // Update the RDD's dependencies and partitions
|
||||
cpState = Checkpointed
|
||||
RDDCheckpointData.clearTaskCaches()
|
||||
logInfo("Done checkpointing RDD " + rdd.id + ", new parent is RDD " + newRDD.id)
|
||||
}
|
||||
logInfo("Done checkpointing RDD " + rdd.id + " to " + path + ", new parent is RDD " + newRDD.id)
|
||||
}
|
||||
|
||||
// Get preferred location of a split after checkpointing
|
||||
|
|
|
@ -152,7 +152,8 @@ class DAGScheduler(
|
|||
val waiting = new HashSet[Stage] // Stages we need to run whose parents aren't done
|
||||
val running = new HashSet[Stage] // Stages we are running right now
|
||||
val failed = new HashSet[Stage] // Stages that must be resubmitted due to fetch failures
|
||||
val pendingTasks = new TimeStampedHashMap[Stage, HashSet[Task[_]]] // Missing tasks from each stage
|
||||
// Missing tasks from each stage
|
||||
val pendingTasks = new TimeStampedHashMap[Stage, HashSet[Task[_]]]
|
||||
var lastFetchFailureTime: Long = 0 // Used to wait a bit to avoid repeated resubmits
|
||||
|
||||
val activeJobs = new HashSet[ActiveJob]
|
||||
|
@ -239,7 +240,8 @@ class DAGScheduler(
|
|||
shuffleToMapStage.get(shuffleDep.shuffleId) match {
|
||||
case Some(stage) => stage
|
||||
case None =>
|
||||
val stage = newOrUsedStage(shuffleDep.rdd, shuffleDep.rdd.partitions.size, shuffleDep, jobId)
|
||||
val stage =
|
||||
newOrUsedStage(shuffleDep.rdd, shuffleDep.rdd.partitions.size, shuffleDep, jobId)
|
||||
shuffleToMapStage(shuffleDep.shuffleId) = stage
|
||||
stage
|
||||
}
|
||||
|
@ -248,7 +250,8 @@ class DAGScheduler(
|
|||
/**
|
||||
* Create a Stage -- either directly for use as a result stage, or as part of the (re)-creation
|
||||
* of a shuffle map stage in newOrUsedStage. The stage will be associated with the provided
|
||||
* jobId. Production of shuffle map stages should always use newOrUsedStage, not newStage directly.
|
||||
* jobId. Production of shuffle map stages should always use newOrUsedStage, not newStage
|
||||
* directly.
|
||||
*/
|
||||
private def newStage(
|
||||
rdd: RDD[_],
|
||||
|
@ -358,7 +361,8 @@ class DAGScheduler(
|
|||
stageIdToJobIds.getOrElseUpdate(s.id, new HashSet[Int]()) += jobId
|
||||
jobIdToStageIds.getOrElseUpdate(jobId, new HashSet[Int]()) += s.id
|
||||
val parents = getParentStages(s.rdd, jobId)
|
||||
val parentsWithoutThisJobId = parents.filter(p => !stageIdToJobIds.get(p.id).exists(_.contains(jobId)))
|
||||
val parentsWithoutThisJobId = parents.filter(p =>
|
||||
!stageIdToJobIds.get(p.id).exists(_.contains(jobId)))
|
||||
updateJobIdStageIdMapsList(parentsWithoutThisJobId ++ stages.tail)
|
||||
}
|
||||
}
|
||||
|
@ -366,8 +370,9 @@ class DAGScheduler(
|
|||
}
|
||||
|
||||
/**
|
||||
* Removes job and any stages that are not needed by any other job. Returns the set of ids for stages that
|
||||
* were removed. The associated tasks for those stages need to be cancelled if we got here via job cancellation.
|
||||
* Removes job and any stages that are not needed by any other job. Returns the set of ids for
|
||||
* stages that were removed. The associated tasks for those stages need to be cancelled if we
|
||||
* got here via job cancellation.
|
||||
*/
|
||||
private def removeJobAndIndependentStages(jobId: Int): Set[Int] = {
|
||||
val registeredStages = jobIdToStageIds(jobId)
|
||||
|
@ -378,7 +383,8 @@ class DAGScheduler(
|
|||
stageIdToJobIds.filterKeys(stageId => registeredStages.contains(stageId)).foreach {
|
||||
case (stageId, jobSet) =>
|
||||
if (!jobSet.contains(jobId)) {
|
||||
logError("Job %d not registered for stage %d even though that stage was registered for the job"
|
||||
logError(
|
||||
"Job %d not registered for stage %d even though that stage was registered for the job"
|
||||
.format(jobId, stageId))
|
||||
} else {
|
||||
def removeStage(stageId: Int) {
|
||||
|
@ -389,7 +395,8 @@ class DAGScheduler(
|
|||
running -= s
|
||||
}
|
||||
stageToInfos -= s
|
||||
shuffleToMapStage.keys.filter(shuffleToMapStage(_) == s).foreach(shuffleToMapStage.remove)
|
||||
shuffleToMapStage.keys.filter(shuffleToMapStage(_) == s).foreach(shuffleId =>
|
||||
shuffleToMapStage.remove(shuffleId))
|
||||
if (pendingTasks.contains(s) && !pendingTasks(s).isEmpty) {
|
||||
logDebug("Removing pending status for stage %d".format(stageId))
|
||||
}
|
||||
|
@ -407,7 +414,8 @@ class DAGScheduler(
|
|||
stageIdToStage -= stageId
|
||||
stageIdToJobIds -= stageId
|
||||
|
||||
logDebug("After removal of stage %d, remaining stages = %d".format(stageId, stageIdToStage.size))
|
||||
logDebug("After removal of stage %d, remaining stages = %d"
|
||||
.format(stageId, stageIdToStage.size))
|
||||
}
|
||||
|
||||
jobSet -= jobId
|
||||
|
@ -459,7 +467,8 @@ class DAGScheduler(
|
|||
assert(partitions.size > 0)
|
||||
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
|
||||
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
|
||||
eventProcessActor ! JobSubmitted(jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties)
|
||||
eventProcessActor ! JobSubmitted(
|
||||
jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties)
|
||||
waiter
|
||||
}
|
||||
|
||||
|
@ -494,7 +503,8 @@ class DAGScheduler(
|
|||
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
|
||||
val partitions = (0 until rdd.partitions.size).toArray
|
||||
val jobId = nextJobId.getAndIncrement()
|
||||
eventProcessActor ! JobSubmitted(jobId, rdd, func2, partitions, allowLocal = false, callSite, listener, properties)
|
||||
eventProcessActor ! JobSubmitted(
|
||||
jobId, rdd, func2, partitions, allowLocal = false, callSite, listener, properties)
|
||||
listener.awaitResult() // Will throw an exception if the job fails
|
||||
}
|
||||
|
||||
|
@ -529,8 +539,8 @@ class DAGScheduler(
|
|||
case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
|
||||
var finalStage: Stage = null
|
||||
try {
|
||||
// New stage creation at times and if its not protected, the scheduler thread is killed.
|
||||
// e.g. it can fail when jobs are run on HadoopRDD whose underlying hdfs files have been deleted
|
||||
// New stage creation may throw an exception if, for example, jobs are run on a HadoopRDD
|
||||
// whose underlying HDFS files have been deleted.
|
||||
finalStage = newStage(rdd, partitions.size, None, jobId, Some(callSite))
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
|
@ -563,7 +573,8 @@ class DAGScheduler(
|
|||
case JobGroupCancelled(groupId) =>
|
||||
// Cancel all jobs belonging to this job group.
|
||||
// First finds all active jobs with this group id, and then kill stages for them.
|
||||
val activeInGroup = activeJobs.filter(groupId == _.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
|
||||
val activeInGroup = activeJobs.filter(activeJob =>
|
||||
groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
|
||||
val jobIds = activeInGroup.map(_.jobId)
|
||||
jobIds.foreach { handleJobCancellation }
|
||||
|
||||
|
@ -585,7 +596,8 @@ class DAGScheduler(
|
|||
stage <- stageIdToStage.get(task.stageId);
|
||||
stageInfo <- stageToInfos.get(stage)
|
||||
) {
|
||||
if (taskInfo.serializedSize > TASK_SIZE_TO_WARN * 1024 && !stageInfo.emittedTaskSizeWarning) {
|
||||
if (taskInfo.serializedSize > TASK_SIZE_TO_WARN * 1024 &&
|
||||
!stageInfo.emittedTaskSizeWarning) {
|
||||
stageInfo.emittedTaskSizeWarning = true
|
||||
logWarning(("Stage %d (%s) contains a task of very large " +
|
||||
"size (%d KB). The maximum recommended task size is %d KB.").format(
|
||||
|
@ -815,7 +827,7 @@ class DAGScheduler(
|
|||
}
|
||||
logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
|
||||
stageToInfos(stage).completionTime = Some(System.currentTimeMillis())
|
||||
listenerBus.post(StageCompleted(stageToInfos(stage)))
|
||||
listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
|
||||
running -= stage
|
||||
}
|
||||
event.reason match {
|
||||
|
|
|
@ -297,7 +297,7 @@ class JobLogger(val user: String, val logDirName: String)
|
|||
* When stage is completed, record stage completion status
|
||||
* @param stageCompleted Stage completed event
|
||||
*/
|
||||
override def onStageCompleted(stageCompleted: StageCompleted) {
|
||||
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
|
||||
stageLogInfo(stageCompleted.stage.stageId, "STAGE_ID=%d STATUS=COMPLETED".format(
|
||||
stageCompleted.stage.stageId))
|
||||
}
|
||||
|
@ -328,10 +328,6 @@ class JobLogger(val user: String, val logDirName: String)
|
|||
task.stageId + " SHUFFLE_ID=" + shuffleId + " MAP_ID=" +
|
||||
mapId + " REDUCE_ID=" + reduceId
|
||||
stageLogInfo(task.stageId, taskStatus)
|
||||
case OtherFailure(message) =>
|
||||
taskStatus += " STATUS=FAILURE TID=" + taskInfo.taskId +
|
||||
" STAGE_ID=" + task.stageId + " INFO=" + message
|
||||
stageLogInfo(task.stageId, taskStatus)
|
||||
case _ =>
|
||||
}
|
||||
}
|
||||
|
|
|
@ -117,8 +117,4 @@ private[spark] class Pool(
|
|||
parent.decreaseRunningTasks(taskNum)
|
||||
}
|
||||
}
|
||||
|
||||
override def hasPendingTasks(): Boolean = {
|
||||
schedulableQueue.exists(_.hasPendingTasks())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -42,5 +42,4 @@ private[spark] trait Schedulable {
|
|||
def executorLost(executorId: String, host: String): Unit
|
||||
def checkSpeculatableTasks(): Boolean
|
||||
def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager]
|
||||
def hasPendingTasks(): Boolean
|
||||
}
|
||||
|
|
|
@ -27,7 +27,7 @@ sealed trait SparkListenerEvents
|
|||
case class SparkListenerStageSubmitted(stage: StageInfo, properties: Properties)
|
||||
extends SparkListenerEvents
|
||||
|
||||
case class StageCompleted(val stage: StageInfo) extends SparkListenerEvents
|
||||
case class SparkListenerStageCompleted(val stage: StageInfo) extends SparkListenerEvents
|
||||
|
||||
case class SparkListenerTaskStart(task: Task[_], taskInfo: TaskInfo) extends SparkListenerEvents
|
||||
|
||||
|
@@ -47,7 +47,7 @@ trait SparkListener {
  /**
   * Called when a stage is completed, with information on the completed stage
   */
-  def onStageCompleted(stageCompleted: StageCompleted) { }
+  def onStageCompleted(stageCompleted: SparkListenerStageCompleted) { }

  /**
   * Called when a stage is submitted
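A minimal listener written against the renamed event type (a sketch, not from the commit; it relies only on the stage field and StageInfo.stageId, both visible in the surrounding hunks):

    class StageTimingListener extends SparkListener {
      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
        val info = stageCompleted.stage
        println("Stage " + info.stageId + " completed")
      }
    }

    // registered on the driver, as the test suites below do:
    // sc.addSparkListener(new StageTimingListener)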
@ -86,7 +86,7 @@ trait SparkListener {
|
|||
* Simple SparkListener that logs a few summary statistics when each stage completes
|
||||
*/
|
||||
class StatsReportListener extends SparkListener with Logging {
|
||||
override def onStageCompleted(stageCompleted: StageCompleted) {
|
||||
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
|
||||
import org.apache.spark.scheduler.StatsReportListener._
|
||||
implicit val sc = stageCompleted
|
||||
this.logInfo("Finished stage: " + stageCompleted.stage)
|
||||
|
@ -119,13 +119,17 @@ object StatsReportListener extends Logging {
|
|||
val probabilities = percentiles.map{_ / 100.0}
|
||||
val percentilesHeader = "\t" + percentiles.mkString("%\t") + "%"
|
||||
|
||||
def extractDoubleDistribution(stage:StageCompleted, getMetric: (TaskInfo,TaskMetrics) => Option[Double]): Option[Distribution] = {
|
||||
def extractDoubleDistribution(stage: SparkListenerStageCompleted,
|
||||
getMetric: (TaskInfo,TaskMetrics) => Option[Double])
|
||||
: Option[Distribution] = {
|
||||
Distribution(stage.stage.taskInfos.flatMap {
|
||||
case ((info,metric)) => getMetric(info, metric)})
|
||||
}
|
||||
|
||||
//is there some way to setup the types that I can get rid of this completely?
|
||||
def extractLongDistribution(stage:StageCompleted, getMetric: (TaskInfo,TaskMetrics) => Option[Long]): Option[Distribution] = {
|
||||
def extractLongDistribution(stage: SparkListenerStageCompleted,
|
||||
getMetric: (TaskInfo,TaskMetrics) => Option[Long])
|
||||
: Option[Distribution] = {
|
||||
extractDoubleDistribution(stage, (info, metric) => getMetric(info,metric).map{_.toDouble})
|
||||
}
|
||||
|
||||
|
@ -147,12 +151,12 @@ object StatsReportListener extends Logging {
|
|||
}
|
||||
|
||||
def showDistribution(heading:String, format: String, getMetric: (TaskInfo,TaskMetrics) => Option[Double])
|
||||
(implicit stage: StageCompleted) {
|
||||
(implicit stage: SparkListenerStageCompleted) {
|
||||
showDistribution(heading, extractDoubleDistribution(stage, getMetric), format)
|
||||
}
|
||||
|
||||
def showBytesDistribution(heading:String, getMetric: (TaskInfo,TaskMetrics) => Option[Long])
|
||||
(implicit stage: StageCompleted) {
|
||||
(implicit stage: SparkListenerStageCompleted) {
|
||||
showBytesDistribution(heading, extractLongDistribution(stage, getMetric))
|
||||
}
|
||||
|
||||
|
@ -169,7 +173,7 @@ object StatsReportListener extends Logging {
|
|||
}
|
||||
|
||||
def showMillisDistribution(heading: String, getMetric: (TaskInfo, TaskMetrics) => Option[Long])
|
||||
(implicit stage: StageCompleted) {
|
||||
(implicit stage: SparkListenerStageCompleted) {
|
||||
showMillisDistribution(heading, extractLongDistribution(stage, getMetric))
|
||||
}
|
||||
|
||||
|
|
|
@@ -41,7 +41,7 @@ private[spark] class SparkListenerBus() extends Logging {
        event match {
          case stageSubmitted: SparkListenerStageSubmitted =>
            sparkListeners.foreach(_.onStageSubmitted(stageSubmitted))
-          case stageCompleted: StageCompleted =>
+          case stageCompleted: SparkListenerStageCompleted =>
            sparkListeners.foreach(_.onStageCompleted(stageCompleted))
          case jobStart: SparkListenerJobStart =>
            sparkListeners.foreach(_.onJobStart(jobStart))
@ -365,13 +365,6 @@ private[spark] class TaskSchedulerImpl(
|
|||
}
|
||||
}
|
||||
|
||||
// Check for pending tasks in all our active jobs.
|
||||
def hasPendingTasks: Boolean = {
|
||||
synchronized {
|
||||
rootPool.hasPendingTasks()
|
||||
}
|
||||
}
|
||||
|
||||
def executorLost(executorId: String, reason: ExecutorLossReason) {
|
||||
var failedExecutor: Option[String] = None
|
||||
|
||||
|
|
|
@ -112,10 +112,6 @@ private[spark] class TaskSetManager(
|
|||
// Task index, start and finish time for each task attempt (indexed by task ID)
|
||||
val taskInfos = new HashMap[Long, TaskInfo]
|
||||
|
||||
// Did the TaskSet fail?
|
||||
var failed = false
|
||||
var causeOfFailure = ""
|
||||
|
||||
// How frequently to reprint duplicate exceptions in full, in milliseconds
|
||||
val EXCEPTION_PRINT_INTERVAL =
|
||||
System.getProperty("spark.logging.exceptionPrintInterval", "10000").toLong
|
||||
|
@ -556,8 +552,6 @@ private[spark] class TaskSetManager(
|
|||
}
|
||||
|
||||
def abort(message: String) {
|
||||
failed = true
|
||||
causeOfFailure = message
|
||||
// TODO: Kill running tasks if we were not terminated due to a Mesos error
|
||||
sched.dagScheduler.taskSetFailed(taskSet, message)
|
||||
removeAllRunningTasks()
|
||||
|
@ -681,10 +675,6 @@ private[spark] class TaskSetManager(
|
|||
return foundTasks
|
||||
}
|
||||
|
||||
override def hasPendingTasks(): Boolean = {
|
||||
numTasks > 0 && tasksSuccessful < numTasks
|
||||
}
|
||||
|
||||
private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
|
||||
val defaultWait = System.getProperty("spark.locality.wait", "3000")
|
||||
level match {
|
||||
|
|
|
@ -50,8 +50,6 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
|
|||
|
||||
private val akkaTimeout = AkkaUtils.askTimeout
|
||||
|
||||
initLogging()
|
||||
|
||||
val slaveTimeout = System.getProperty("spark.storage.blockManagerSlaveTimeoutMs",
|
||||
"" + (BlockManager.getHeartBeatFrequencyFromSystemProperties * 3)).toLong
|
||||
|
||||
|
|
|
@ -30,7 +30,6 @@ import org.apache.spark.util.Utils
|
|||
* TODO: Use event model.
|
||||
*/
|
||||
private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends Logging {
|
||||
initLogging()
|
||||
|
||||
blockManager.connectionManager.onReceiveMessage(onBlockMessageReceive)
|
||||
|
||||
|
@ -101,8 +100,6 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends
|
|||
private[spark] object BlockManagerWorker extends Logging {
|
||||
private var blockManagerWorker: BlockManagerWorker = null
|
||||
|
||||
initLogging()
|
||||
|
||||
def startBlockManagerWorker(manager: BlockManager) {
|
||||
blockManagerWorker = new BlockManagerWorker(manager)
|
||||
}
|
||||
|
|
|
@ -37,8 +37,6 @@ class BlockMessageArray(var blockMessages: Seq[BlockMessage]) extends Seq[BlockM
|
|||
|
||||
def length = blockMessages.length
|
||||
|
||||
initLogging()
|
||||
|
||||
def set(bufferMessage: BufferMessage) {
|
||||
val startTime = System.currentTimeMillis
|
||||
val newBlockMessages = new ArrayBuffer[BlockMessage]()
|
||||
|
|
|
@ -61,7 +61,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
|
|||
|
||||
override def onJobStart(jobStart: SparkListenerJobStart) {}
|
||||
|
||||
override def onStageCompleted(stageCompleted: StageCompleted) = synchronized {
|
||||
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized {
|
||||
val stage = stageCompleted.stage
|
||||
poolToActiveStages(stageIdToPool(stage.stageId)) -= stage
|
||||
activeStages -= stage
|
||||
|
@ -146,12 +146,9 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
|
|||
// update duration
|
||||
y.taskTime += taskEnd.taskInfo.duration
|
||||
|
||||
taskEnd.taskMetrics.shuffleReadMetrics.foreach { shuffleRead =>
|
||||
y.shuffleRead += shuffleRead.remoteBytesRead
|
||||
}
|
||||
|
||||
taskEnd.taskMetrics.shuffleWriteMetrics.foreach { shuffleWrite =>
|
||||
y.shuffleWrite += shuffleWrite.shuffleBytesWritten
|
||||
Option(taskEnd.taskMetrics).foreach { taskMetrics =>
|
||||
taskMetrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead }
|
||||
taskMetrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten }
|
||||
}
|
||||
}
|
||||
case _ => {}
|
||||
|
|
|
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io.{Externalizable, ObjectOutput, ObjectInput}
+import com.clearspring.analytics.stream.cardinality.{ICardinality, HyperLogLog}
+
+/**
+ * A wrapper around [[com.clearspring.analytics.stream.cardinality.HyperLogLog]] that is serializable.
+ */
+private[spark]
+class SerializableHyperLogLog(var value: ICardinality) extends Externalizable {
+
+  def this() = this(null)  // For deserialization
+
+  def merge(other: SerializableHyperLogLog) = new SerializableHyperLogLog(value.merge(other.value))
+
+  def add[T](elem: T) = {
+    this.value.offer(elem)
+    this
+  }
+
+  def readExternal(in: ObjectInput) {
+    val byteLength = in.readInt()
+    val bytes = new Array[Byte](byteLength)
+    in.readFully(bytes)
+    value = HyperLogLog.Builder.build(bytes)
+  }
+
+  def writeExternal(out: ObjectOutput) {
+    val bytes = value.getBytes()
+    out.writeInt(bytes.length)
+    out.write(bytes)
+  }
+}
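A usage sketch of the new wrapper (hypothetical code, not in the commit; since the class is private[spark] it would have to live under the org.apache.spark package tree, and the HyperLogLog(bits) constructor from the clearspring library is assumed):

    import com.clearspring.analytics.stream.cardinality.HyperLogLog
    import org.apache.spark.util.SerializableHyperLogLog

    // two partial counters, e.g. one per partition
    val a = new SerializableHyperLogLog(new HyperLogLog(12))
    val b = new SerializableHyperLogLog(new HyperLogLog(12))
    Seq("x", "y", "z").foreach(a.add)
    Seq("y", "z", "w").foreach(b.add)

    // merge returns a new wrapper whose estimate covers both inputs
    val merged = a.merge(b)
    println(merged.value.cardinality())   // roughly 4 distinct elements

Because the wrapper is Externalizable, instances like merged can travel through both Java and Kryo serialization, which is what the countApproxDistinct operations rely on.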
@ -26,8 +26,6 @@ import org.apache.spark.storage.{BlockId, StorageLevel, TestBlockId}
|
|||
import org.apache.spark.util.Utils
|
||||
|
||||
class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
|
||||
initLogging()
|
||||
|
||||
var checkpointDir: File = _
|
||||
val partitioner = new HashPartitioner(2)
|
||||
|
||||
|
|
|
@ -851,7 +851,7 @@ public class JavaAPISuite implements Serializable {
|
|||
public void checkpointAndComputation() {
|
||||
File tempDir = Files.createTempDir();
|
||||
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
|
||||
sc.setCheckpointDir(tempDir.getAbsolutePath(), true);
|
||||
sc.setCheckpointDir(tempDir.getAbsolutePath());
|
||||
Assert.assertEquals(false, rdd.isCheckpointed());
|
||||
rdd.checkpoint();
|
||||
rdd.count(); // Forces the DAG to cause a checkpoint
|
||||
|
@ -863,7 +863,7 @@ public class JavaAPISuite implements Serializable {
|
|||
public void checkpointAndRestore() {
|
||||
File tempDir = Files.createTempDir();
|
||||
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
|
||||
sc.setCheckpointDir(tempDir.getAbsolutePath(), true);
|
||||
sc.setCheckpointDir(tempDir.getAbsolutePath());
|
||||
Assert.assertEquals(false, rdd.isCheckpointed());
|
||||
rdd.checkpoint();
|
||||
rdd.count(); // Forces the DAG to cause a checkpoint
|
||||
|
@ -930,4 +930,36 @@ public class JavaAPISuite implements Serializable {
|
|||
parts[1]);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void countApproxDistinct() {
|
||||
List<Integer> arrayData = new ArrayList<Integer>();
|
||||
int size = 100;
|
||||
for (int i = 0; i < 100000; i++) {
|
||||
arrayData.add(i % size);
|
||||
}
|
||||
JavaRDD<Integer> simpleRdd = sc.parallelize(arrayData, 10);
|
||||
Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.2) - size) / (size * 1.0)) < 0.2);
|
||||
Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.05) - size) / (size * 1.0)) <= 0.05);
|
||||
Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.01) - size) / (size * 1.0)) <= 0.01);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void countApproxDistinctByKey() {
|
||||
double relativeSD = 0.001;
|
||||
|
||||
List<Tuple2<Integer, Integer>> arrayData = new ArrayList<Tuple2<Integer, Integer>>();
|
||||
for (int i = 10; i < 100; i++)
|
||||
for (int j = 0; j < i; j++)
|
||||
arrayData.add(new Tuple2<Integer, Integer>(i, j));
|
||||
|
||||
JavaPairRDD<Integer, Integer> pairRdd = sc.parallelizePairs(arrayData);
|
||||
List<Tuple2<Integer, Object>> res = pairRdd.countApproxDistinctByKey(relativeSD).collect();
|
||||
for (Tuple2<Integer, Object> resItem : res) {
|
||||
double count = (double)resItem._1();
|
||||
Long resCount = (Long)resItem._2();
|
||||
Double error = Math.abs((resCount - count) / count);
|
||||
Assert.assertTrue(error < relativeSD);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
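The same estimators the Java tests above exercise, sketched as Scala driver code (assumes an existing SparkContext named sc and the pair-RDD implicits):

    import org.apache.spark.SparkContext._

    val data = sc.parallelize((0 until 100000).map(_ % 100), 10)
    // HyperLogLog-based estimate of the number of distinct values
    val estimate = data.countApproxDistinct(0.05)
    println("approx distinct = " + estimate)   // close to 100, within ~5% relative error

    // per-key estimates: for key i the exact count of distinct values is i
    val pairs = sc.parallelize(for (i <- 1 to 100; j <- 1 to i) yield (i, j))
    val perKey = pairs.countApproxDistinctByKey(0.01).collect()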
|
@ -19,6 +19,7 @@ package org.apache.spark.rdd
|
|||
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
import scala.collection.mutable.HashSet
|
||||
import scala.util.Random
|
||||
|
||||
import org.scalatest.FunSuite
|
||||
|
||||
|
@ -109,6 +110,39 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
|
|||
assert(deps.size === 2) // ShuffledRDD, ParallelCollection.
|
||||
}
|
||||
|
||||
test("countApproxDistinctByKey") {
|
||||
def error(est: Long, size: Long) = math.abs(est - size) / size.toDouble
|
||||
|
||||
/* Since HyperLogLog unique counting is approximate, and the relative standard deviation is
|
||||
* only a statistical bound, the tests can fail for large values of relativeSD. We will be using
|
||||
* relatively tight error bounds to check correctness of functionality rather than checking
|
||||
* whether the approximation conforms with the requested bound.
|
||||
*/
|
||||
val relativeSD = 0.001
|
||||
|
||||
// For each value i, there are i tuples with first element equal to i.
|
||||
// Therefore, the expected count for key i would be i.
|
||||
val stacked = (1 to 100).flatMap(i => (1 to i).map(j => (i, j)))
|
||||
val rdd1 = sc.parallelize(stacked)
|
||||
val counted1 = rdd1.countApproxDistinctByKey(relativeSD).collect()
|
||||
counted1.foreach{
|
||||
case(k, count) => assert(error(count, k) < relativeSD)
|
||||
}
|
||||
|
||||
val rnd = new Random()
|
||||
|
||||
// The expected count for key num would be num
|
||||
val randStacked = (1 to 100).flatMap { i =>
|
||||
val num = rnd.nextInt % 500
|
||||
(1 to num).map(j => (num, j))
|
||||
}
|
||||
val rdd2 = sc.parallelize(randStacked)
|
||||
val counted2 = rdd2.countApproxDistinctByKey(relativeSD, 4).collect()
|
||||
counted2.foreach{
|
||||
case(k, count) => assert(error(count, k) < relativeSD)
|
||||
}
|
||||
}
|
||||
|
||||
test("join") {
|
||||
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
|
||||
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
|
||||
|
|
|
@ -63,6 +63,19 @@ class RDDSuite extends FunSuite with SharedSparkContext {
|
|||
}
|
||||
}
|
||||
|
||||
test("countApproxDistinct") {
|
||||
|
||||
def error(est: Long, size: Long) = math.abs(est - size) / size.toDouble
|
||||
|
||||
val size = 100
|
||||
val uniformDistro = for (i <- 1 to 100000) yield i % size
|
||||
val simpleRdd = sc.makeRDD(uniformDistro)
|
||||
assert(error(simpleRdd.countApproxDistinct(0.2), size) < 0.2)
|
||||
assert(error(simpleRdd.countApproxDistinct(0.05), size) < 0.05)
|
||||
assert(error(simpleRdd.countApproxDistinct(0.01), size) < 0.01)
|
||||
assert(error(simpleRdd.countApproxDistinct(0.001), size) < 0.001)
|
||||
}
|
||||
|
||||
test("SparkContext.union") {
|
||||
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
|
||||
assert(sc.union(nums).collect().toList === List(1, 2, 3, 4))
|
||||
|
|
|
@ -117,7 +117,7 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
|
|||
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = onTaskEndCount += 1
|
||||
override def onJobEnd(jobEnd: SparkListenerJobEnd) = onJobEndCount += 1
|
||||
override def onJobStart(jobStart: SparkListenerJobStart) = onJobStartCount += 1
|
||||
override def onStageCompleted(stageCompleted: StageCompleted) = onStageCompletedCount += 1
|
||||
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = onStageCompletedCount += 1
|
||||
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = onStageSubmittedCount += 1
|
||||
}
|
||||
sc.addSparkListener(joblogger)
|
||||
|
|
|
@ -174,7 +174,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
|
|||
|
||||
class SaveStageInfo extends SparkListener {
|
||||
val stageInfos = Buffer[StageInfo]()
|
||||
override def onStageCompleted(stage: StageCompleted) {
|
||||
override def onStageCompleted(stage: SparkListenerStageCompleted) {
|
||||
stageInfos += stage.stage
|
||||
}
|
||||
}
|
||||
|
|
|
@ -172,6 +172,10 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
|
|||
assert (sc.parallelize( Array((1, 11), (2, 22), (3, 33)) ).collect().head === (1, 11))
|
||||
}
|
||||
|
||||
test("kryo with SerializableHyperLogLog") {
|
||||
assert(sc.parallelize( Array(1, 2, 3, 2, 3, 3, 2, 3, 1) ).countApproxDistinct(0.01) === 3)
|
||||
}
|
||||
|
||||
test("kryo with reduce") {
|
||||
val control = 1 :: 2 :: Nil
|
||||
val result = sc.parallelize(control, 2).map(new ClassWithoutNoArgConstructor(_))
|
||||
|
|
7 pom.xml
|
@@ -200,6 +200,11 @@
      <artifactId>asm</artifactId>
      <version>4.0</version>
    </dependency>
+    <dependency>
+      <groupId>com.clearspring.analytics</groupId>
+      <artifactId>stream</artifactId>
+      <version>2.4.0</version>
+    </dependency>
    <!-- In theory we need not directly depend on protobuf since Spark does not directly
         use it. However, when building with Hadoop/YARN 2.2 Maven doesn't correctly bump
         the protobuf version up from the one Mesos gives. For now we include this variable
@@ -282,7 +287,7 @@
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-all</artifactId>
-      <version>4.0.0.CR1</version>
+      <version>4.0.13.Final</version>
    </dependency>
    <dependency>
      <groupId>org.apache.derby</groupId>
@@ -178,7 +178,7 @@ object SparkBuild extends Build {


    libraryDependencies ++= Seq(
-      "io.netty" % "netty-all" % "4.0.0.CR1",
+      "io.netty" % "netty-all" % "4.0.13.Final",
      "org.eclipse.jetty" % "jetty-server" % "7.6.8.v20121106",
      /** Workaround for SPARK-959. Dependency used by org.eclipse.jetty. Fixed in ivy 2.3.0. */
      "org.eclipse.jetty.orbit" % "javax.servlet" % "2.5.0.v201103041518" artifacts Artifact("javax.servlet", "jar", "jar"),

@@ -247,7 +247,8 @@ object SparkBuild extends Build {
      "com.codahale.metrics" % "metrics-ganglia" % "3.0.0",
      "com.codahale.metrics" % "metrics-graphite" % "3.0.0",
      "com.twitter" %% "chill" % "0.3.1",
-      "com.twitter" % "chill-java" % "0.3.1"
+      "com.twitter" % "chill-java" % "0.3.1",
+      "com.clearspring.analytics" % "stream" % "2.5.1"
    )
  )
@@ -320,17 +320,12 @@ class SparkContext(object):
        self._python_includes.append(filename)
        sys.path.append(os.path.join(SparkFiles.getRootDirectory(), filename)) # for tests in local mode

-    def setCheckpointDir(self, dirName, useExisting=False):
+    def setCheckpointDir(self, dirName):
        """
        Set the directory under which RDDs are going to be checkpointed. The
        directory must be a HDFS path if running on a cluster.
-
-        If the directory does not exist, it will be created. If the directory
-        exists and C{useExisting} is set to true, then the exisiting directory
-        will be used. Otherwise an exception will be thrown to prevent
-        accidental overriding of checkpoint files in the existing directory.
        """
-        self._jsc.sc().setCheckpointDir(dirName, useExisting)
+        self._jsc.sc().setCheckpointDir(dirName)

    def _getJavaStorageLevel(self, storageLevel):
        """
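After this change the checkpoint directory is configured with a single argument on every API. A minimal Scala sketch of the equivalent calls (sc is an existing SparkContext; the directory should be an HDFS path when running on a cluster):

    sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")
    val rdd = sc.parallelize(1 to 1000).map(_ * 2)
    rdd.checkpoint()            // mark the RDD for checkpointing
    rdd.count()                 // the first action materializes the checkpoint
    println(rdd.isCheckpointed) // true once the data has been saved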
|
|
@ -73,8 +73,8 @@ class TestCheckpoint(PySparkTestCase):
|
|||
time.sleep(1) # 1 second
|
||||
self.assertTrue(flatMappedRDD.isCheckpointed())
|
||||
self.assertEqual(flatMappedRDD.collect(), result)
|
||||
self.assertEqual(self.checkpointDir.name,
|
||||
os.path.dirname(flatMappedRDD.getCheckpointFile()))
|
||||
self.assertEqual("file:" + self.checkpointDir.name,
|
||||
os.path.dirname(os.path.dirname(flatMappedRDD.getCheckpointFile())))
|
||||
|
||||
def test_checkpoint_and_restore(self):
|
||||
parCollection = self.sc.parallelize([1, 2, 3, 4])
|
||||
|
|
|
@ -129,11 +129,16 @@ fi
|
|||
|
||||
# Compute classpath using external script
|
||||
CLASSPATH=`$FWDIR/bin/compute-classpath.sh`
|
||||
CLASSPATH="$CLASSPATH:$SPARK_TOOLS_JAR"
|
||||
|
||||
if [ "$1" == "org.apache.spark.tools.JavaAPICompletenessChecker" ]; then
|
||||
CLASSPATH="$CLASSPATH:$SPARK_TOOLS_JAR"
|
||||
fi
|
||||
|
||||
if $cygwin; then
|
||||
CLASSPATH=`cygpath -wp $CLASSPATH`
|
||||
export SPARK_TOOLS_JAR=`cygpath -w $SPARK_TOOLS_JAR`
|
||||
if [ "$1" == "org.apache.spark.tools.JavaAPICompletenessChecker" ]; then
|
||||
export SPARK_TOOLS_JAR=`cygpath -w $SPARK_TOOLS_JAR`
|
||||
fi
|
||||
fi
|
||||
export CLASSPATH
|
||||
|
||||
|
|
|
@ -17,7 +17,7 @@ rem See the License for the specific language governing permissions and
|
|||
rem limitations under the License.
|
||||
rem
|
||||
|
||||
set SCALA_VERSION=2.9.3
|
||||
set SCALA_VERSION=2.10
|
||||
|
||||
rem Figure out where the Spark framework is installed
|
||||
set FWDIR=%~dp0
|
||||
|
|
|
@ -21,12 +21,13 @@ import java.io._
|
|||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.RejectedExecutionException
|
||||
|
||||
import org.apache.hadoop.fs.Path
|
||||
import org.apache.hadoop.fs.{FileSystem, Path}
|
||||
import org.apache.hadoop.conf.Configuration
|
||||
|
||||
import org.apache.spark.Logging
|
||||
import org.apache.spark.io.CompressionCodec
|
||||
import org.apache.spark.util.MetadataCleaner
|
||||
import org.apache.spark.deploy.SparkHadoopUtil
|
||||
|
||||
|
||||
private[streaming]
|
||||
|
@ -54,36 +55,34 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
|
|||
|
||||
|
||||
/**
|
||||
* Convenience class to speed up the writing of graph checkpoint to file
|
||||
* Convenience class to handle the writing of graph checkpoint to file
|
||||
*/
|
||||
private[streaming]
|
||||
class CheckpointWriter(checkpointDir: String) extends Logging {
|
||||
class CheckpointWriter(checkpointDir: String, hadoopConf: Configuration) extends Logging {
|
||||
val file = new Path(checkpointDir, "graph")
|
||||
// The file to which we actually write - and then "move" to file.
|
||||
private val writeFile = new Path(file.getParent, file.getName + ".next")
|
||||
private val bakFile = new Path(file.getParent, file.getName + ".bk")
|
||||
val MAX_ATTEMPTS = 3
|
||||
val executor = Executors.newFixedThreadPool(1)
|
||||
val compressionCodec = CompressionCodec.createCodec()
|
||||
// The file to which we actually write - and then "move" to file
|
||||
val writeFile = new Path(file.getParent, file.getName + ".next")
|
||||
// The file to which existing checkpoint is backed up (i.e. "moved")
|
||||
val bakFile = new Path(file.getParent, file.getName + ".bk")
|
||||
|
||||
private var stopped = false
|
||||
private var fs_ : FileSystem = _
|
||||
|
||||
val conf = new Configuration()
|
||||
var fs = file.getFileSystem(conf)
|
||||
val maxAttempts = 3
|
||||
val executor = Executors.newFixedThreadPool(1)
|
||||
|
||||
private val compressionCodec = CompressionCodec.createCodec()
|
||||
|
||||
// Removed code which validates whether there is only one CheckpointWriter per path 'file' since
|
||||
// Removed code which validates whether there is only one CheckpointWriter per path 'file' since
|
||||
// I did not notice any errors - reintroduce it ?
|
||||
|
||||
class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable {
|
||||
def run() {
|
||||
var attempts = 0
|
||||
val startTime = System.currentTimeMillis()
|
||||
while (attempts < maxAttempts) {
|
||||
while (attempts < MAX_ATTEMPTS && !stopped) {
|
||||
attempts += 1
|
||||
try {
|
||||
logDebug("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'")
|
||||
// This is inherently thread unsafe .. so alleviating it by writing to '.new' and then doing moves : which should be pretty fast.
|
||||
// This is inherently thread unsafe, so alleviating it by writing to '.new' and
|
||||
// then moving it to the final file
|
||||
val fos = fs.create(writeFile)
|
||||
fos.write(bytes)
|
||||
fos.close()
|
||||
|
@ -101,6 +100,7 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
|
|||
} catch {
|
||||
case ioe: IOException =>
|
||||
logWarning("Error writing checkpoint to file in " + attempts + " attempts", ioe)
|
||||
reset()
|
||||
}
|
||||
}
|
||||
logError("Could not write checkpoint for time " + checkpointTime + " to file '" + file + "'")
|
||||
|
@@ -133,7 +133,17 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
    val startTime = System.currentTimeMillis()
    val terminated = executor.awaitTermination(10, java.util.concurrent.TimeUnit.SECONDS)
    val endTime = System.currentTimeMillis()
-    logInfo("CheckpointWriter executor terminated ? " + terminated + ", waited for " + (endTime - startTime) + " ms.")
+    logInfo("CheckpointWriter executor terminated ? " + terminated +
+      ", waited for " + (endTime - startTime) + " ms.")
  }
+
+  private def fs = synchronized {
+    if (fs_ == null) fs_ = file.getFileSystem(hadoopConf)
+    fs_
+  }
+
+  private def reset() = synchronized {
+    fs_ = null
+  }
}
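The same lazy-handle pattern in isolation, for clarity (a sketch with hypothetical names; the point is that an IOException drops the cached FileSystem so the next attempt recreates it):

    import java.io.IOException
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    class LazyFsWriter(target: Path, hadoopConf: Configuration) {
      private var fs_ : FileSystem = _

      private def fs = synchronized {
        if (fs_ == null) fs_ = target.getFileSystem(hadoopConf)
        fs_
      }

      private def reset() = synchronized { fs_ = null }

      def write(bytes: Array[Byte]) {
        try {
          val out = fs.create(target)   // lazily (re)opens the FileSystem handle
          out.write(bytes)
          out.close()
        } catch {
          case ioe: IOException => reset()   // a later retry will reopen the handle
        }
      }
    }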
@ -143,7 +153,8 @@ object CheckpointReader extends Logging {
|
|||
|
||||
def read(path: String): Checkpoint = {
|
||||
val fs = new Path(path).getFileSystem(new Configuration())
|
||||
val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"), new Path(path), new Path(path + ".bk"))
|
||||
val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"),
|
||||
new Path(path), new Path(path + ".bk"))
|
||||
|
||||
val compressionCodec = CompressionCodec.createCodec()
|
||||
|
||||
|
@ -158,7 +169,8 @@ object CheckpointReader extends Logging {
|
|||
// loader to find and load classes. This is a well know Java issue and has popped up
|
||||
// in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627)
|
||||
val zis = compressionCodec.compressedInputStream(fis)
|
||||
val ois = new ObjectInputStreamWithLoader(zis, Thread.currentThread().getContextClassLoader)
|
||||
val ois = new ObjectInputStreamWithLoader(zis,
|
||||
Thread.currentThread().getContextClassLoader)
|
||||
val cp = ois.readObject.asInstanceOf[Checkpoint]
|
||||
ois.close()
|
||||
fs.close()
|
||||
|
|
|
@ -56,8 +56,6 @@ abstract class DStream[T: ClassTag] (
|
|||
@transient protected[streaming] var ssc: StreamingContext
|
||||
) extends Serializable with Logging {
|
||||
|
||||
initLogging()
|
||||
|
||||
// =======================================================================
|
||||
// Methods that should be implemented by subclasses of DStream
|
||||
// =======================================================================
|
||||
|
|
|
@ -24,7 +24,6 @@ import org.apache.spark.Logging
|
|||
import org.apache.spark.streaming.scheduler.Job
|
||||
|
||||
final private[streaming] class DStreamGraph extends Serializable with Logging {
|
||||
initLogging()
|
||||
|
||||
private val inputStreams = new ArrayBuffer[InputDStream[_]]()
|
||||
private val outputStreams = new ArrayBuffer[DStream[_]]()
|
||||
|
|
|
@ -20,17 +20,19 @@ package org.apache.spark.streaming
|
|||
import akka.actor.Props
|
||||
import akka.actor.SupervisorStrategy
|
||||
import akka.zeromq.Subscribe
|
||||
import akka.util.ByteString
|
||||
|
||||
import org.apache.spark.streaming.dstream._
|
||||
import org.apache.spark.streaming.scheduler.StreamingListener
|
||||
|
||||
import org.apache.spark._
|
||||
import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.streaming.receivers.ActorReceiver
|
||||
import org.apache.spark.streaming.receivers.ReceiverSupervisorStrategy
|
||||
import org.apache.spark.streaming.receivers.ZeroMQReceiver
|
||||
import org.apache.spark.storage.StorageLevel
|
||||
import org.apache.spark.util.MetadataCleaner
|
||||
import org.apache.spark.streaming.receivers.ActorReceiver
|
||||
import org.apache.spark.streaming.scheduler.{JobScheduler, NetworkInputTracker}
|
||||
|
||||
import scala.collection.mutable.Queue
|
||||
import scala.collection.Map
|
||||
|
@ -38,17 +40,15 @@ import scala.reflect.ClassTag
|
|||
|
||||
import java.io.InputStream
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import java.util.UUID
|
||||
|
||||
import org.apache.hadoop.io.LongWritable
|
||||
import org.apache.hadoop.io.Text
|
||||
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
|
||||
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
|
||||
import org.apache.hadoop.fs.Path
|
||||
|
||||
import twitter4j.Status
|
||||
import twitter4j.auth.Authorization
|
||||
import org.apache.spark.streaming.scheduler._
|
||||
import akka.util.ByteString
|
||||
|
||||
/**
|
||||
* A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
|
||||
|
@ -87,7 +87,6 @@ class StreamingContext private (
|
|||
null, batchDuration)
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Re-create a StreamingContext from a checkpoint file.
|
||||
* @param path Path either to the directory that was specified as the checkpoint directory, or
|
||||
|
@@ -95,8 +94,6 @@ class StreamingContext private (
   */
  def this(path: String) = this(null, CheckpointReader.read(path), null)

-  initLogging()
-
  if (sc_ == null && cp_ == null) {
    throw new Exception("Spark Streaming cannot be initialized with " +
      "both SparkContext and checkpoint as null")
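A sketch of the recovery path this constructor enables (hypothetical driver code; assumes the usual imports from org.apache.spark.streaming and that the directory already holds a written checkpoint):

    // first run: create the context and enable checkpointing
    val ssc = new StreamingContext("local[2]", "recoverable-app", Seconds(1))
    ssc.checkpoint("/tmp/streaming-checkpoint")
    // ... define DStreams, then ssc.start()

    // after a driver restart: rebuild the whole context from the checkpoint
    val recovered = new StreamingContext("/tmp/streaming-checkpoint")
    recovered.start()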
@ -141,7 +138,7 @@ class StreamingContext private (
|
|||
|
||||
protected[streaming] var checkpointDir: String = {
|
||||
if (isCheckpointPresent) {
|
||||
sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(cp_.checkpointDir), true)
|
||||
sc.setCheckpointDir(cp_.checkpointDir)
|
||||
cp_.checkpointDir
|
||||
} else {
|
||||
null
|
||||
|
@@ -176,8 +173,12 @@ class StreamingContext private (
   */
  def checkpoint(directory: String) {
    if (directory != null) {
-      sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(directory))
-      checkpointDir = directory
+      val path = new Path(directory)
+      val fs = path.getFileSystem(sparkContext.hadoopConfiguration)
+      fs.mkdirs(path)
+      val fullPath = fs.getFileStatus(path).getPath().toString
+      sc.setCheckpointDir(fullPath)
+      checkpointDir = fullPath
    } else {
      checkpointDir = null
    }
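The new body resolves the user-supplied directory to a fully qualified path before handing it to the SparkContext. The effect, sketched on its own (hypothetical paths):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    val conf = new Configuration()                      // picks up fs.defaultFS from the cluster config
    val path = new Path("/tmp/streaming-checkpoint")
    val fs = path.getFileSystem(conf)
    fs.mkdirs(path)
    val fullPath = fs.getFileStatus(path).getPath().toString
    // e.g. "hdfs://namenode:8020/tmp/streaming-checkpoint" instead of the bare "/tmp/..." string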
@ -368,7 +369,8 @@ class StreamingContext private (
|
|||
/**
|
||||
* Create a input stream that monitors a Hadoop-compatible filesystem
|
||||
* for new files and reads them using the given key-value types and input format.
|
||||
* File names starting with . are ignored.
|
||||
* Files must be written to the monitored directory by "moving" them from another
|
||||
* location within the same file system. File names starting with . are ignored.
|
||||
* @param directory HDFS directory to monitor for new file
|
||||
* @tparam K Key type for reading HDFS file
|
||||
* @tparam V Value type for reading HDFS file
|
||||
|
@ -387,6 +389,8 @@ class StreamingContext private (
|
|||
/**
|
||||
* Create a input stream that monitors a Hadoop-compatible filesystem
|
||||
* for new files and reads them using the given key-value types and input format.
|
||||
* Files must be written to the monitored directory by "moving" them from another
|
||||
* location within the same file system.
|
||||
* @param directory HDFS directory to monitor for new file
|
||||
* @param filter Function to filter paths to process
|
||||
* @param newFilesOnly Should process only new files and ignore existing files in the directory
|
||||
|
@ -407,7 +411,9 @@ class StreamingContext private (
|
|||
/**
|
||||
* Create a input stream that monitors a Hadoop-compatible filesystem
|
||||
* for new files and reads them as text files (using key as LongWritable, value
|
||||
* as Text and input format as TextInputFormat). File names starting with . are ignored.
|
||||
* as Text and input format as TextInputFormat). Files must be written to the
|
||||
* monitored directory by "moving" them from another location within the same
|
||||
* file system. File names starting with . are ignored.
|
||||
* @param directory HDFS directory to monitor for new file
|
||||
*/
|
||||
def textFileStream(directory: String): DStream[String] = {
|
||||
|
@ -599,8 +605,4 @@ object StreamingContext {
|
|||
prefix + "-" + time.milliseconds + "." + suffix
|
||||
}
|
||||
}
|
||||
|
||||
protected[streaming] def getSparkCheckpointDir(sscCheckpointDir: String): String = {
|
||||
new Path(sscCheckpointDir, UUID.randomUUID.toString).toString
|
||||
}
|
||||
}
|
||||
|
|
|
@ -256,9 +256,11 @@ class JavaStreamingContext(val ssc: StreamingContext) {
|
|||
}
|
||||
|
||||
/**
|
||||
* Creates a input stream that monitors a Hadoop-compatible filesystem
|
||||
* Create a input stream that monitors a Hadoop-compatible filesystem
|
||||
* for new files and reads them as text files (using key as LongWritable, value
|
||||
* as Text and input format as TextInputFormat). File names starting with . are ignored.
|
||||
* as Text and input format as TextInputFormat). Files must be written to the
|
||||
* monitored directory by "moving" them from another location within the same
|
||||
* file system. File names starting with . are ignored.
|
||||
* @param directory HDFS directory to monitor for new file
|
||||
*/
|
||||
def textFileStream(directory: String): JavaDStream[String] = {
|
||||
|
@ -300,9 +302,10 @@ class JavaStreamingContext(val ssc: StreamingContext) {
|
|||
}
|
||||
|
||||
/**
|
||||
* Creates a input stream that monitors a Hadoop-compatible filesystem
|
||||
* Create a input stream that monitors a Hadoop-compatible filesystem
|
||||
* for new files and reads them using the given key-value types and input format.
|
||||
* File names starting with . are ignored.
|
||||
* Files must be written to the monitored directory by "moving" them from another
|
||||
* location within the same file system. File names starting with . are ignored.
|
||||
* @param directory HDFS directory to monitor for new file
|
||||
* @tparam K Key type for reading HDFS file
|
||||
* @tparam V Value type for reading HDFS file
|
||||
|
@ -331,7 +334,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
|
|||
|
||||
|
||||
/**
|
||||
* Creates a input stream from a Flume source.
|
||||
* Create a input stream from a Flume source.
|
||||
* @param hostname Hostname of the slave machine to which the flume data will be sent
|
||||
* @param port Port of the slave machine to which the flume data will be sent
|
||||
*/
|
||||
|
|
|
@ -17,18 +17,17 @@
|
|||
|
||||
package org.apache.spark.streaming.dstream
|
||||
|
||||
import java.io.{ObjectInputStream, IOException}
|
||||
import scala.collection.mutable.{HashSet, HashMap}
|
||||
import scala.reflect.ClassTag
|
||||
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
|
||||
import org.apache.hadoop.conf.Configuration
|
||||
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
|
||||
import org.apache.spark.SparkException
|
||||
import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.rdd.UnionRDD
|
||||
import org.apache.spark.streaming.{DStreamCheckpointData, StreamingContext, Time}
|
||||
|
||||
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
|
||||
import org.apache.hadoop.conf.Configuration
|
||||
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
|
||||
|
||||
import scala.collection.mutable.{HashSet, HashMap}
|
||||
import scala.reflect.ClassTag
|
||||
|
||||
import java.io.{ObjectInputStream, IOException}
|
||||
|
||||
private[streaming]
|
||||
class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag](
|
||||
|
@ -41,8 +40,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
|
|||
protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
|
||||
|
||||
// Latest file mod time seen till any point of time
|
||||
private val lastModTimeFiles = new HashSet[String]()
|
||||
private var lastModTime = 0L
|
||||
private val prevModTimeFiles = new HashSet[String]()
|
||||
private var prevModTime = 0L
|
||||
|
||||
@transient private var path_ : Path = null
|
||||
@transient private var fs_ : FileSystem = null
|
||||
|
@ -50,11 +49,11 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
|
|||
|
||||
override def start() {
|
||||
if (newFilesOnly) {
|
||||
lastModTime = graph.zeroTime.milliseconds
|
||||
prevModTime = graph.zeroTime.milliseconds
|
||||
} else {
|
||||
lastModTime = 0
|
||||
prevModTime = 0
|
||||
}
|
||||
logDebug("LastModTime initialized to " + lastModTime + ", new files only = " + newFilesOnly)
|
||||
logDebug("LastModTime initialized to " + prevModTime + ", new files only = " + newFilesOnly)
|
||||
}
|
||||
|
||||
override def stop() { }
|
||||
|
@ -69,55 +68,22 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
|
|||
* the previous call.
|
||||
*/
|
||||
override def compute(validTime: Time): Option[RDD[(K, V)]] = {
|
||||
assert(validTime.milliseconds >= lastModTime, "Trying to get new files for really old time [" + validTime + " < " + lastModTime)
|
||||
assert(validTime.milliseconds >= prevModTime,
|
||||
"Trying to get new files for really old time [" + validTime + " < " + prevModTime + "]")
|
||||
|
||||
// Create the filter for selecting new files
|
||||
val newFilter = new PathFilter() {
|
||||
// Latest file mod time seen in this round of fetching files and its corresponding files
|
||||
var latestModTime = 0L
|
||||
val latestModTimeFiles = new HashSet[String]()
|
||||
|
||||
def accept(path: Path): Boolean = {
|
||||
if (!filter(path)) { // Reject file if it does not satisfy filter
|
||||
logDebug("Rejected by filter " + path)
|
||||
return false
|
||||
} else { // Accept file only if
|
||||
val modTime = fs.getFileStatus(path).getModificationTime()
|
||||
logDebug("Mod time for " + path + " is " + modTime)
|
||||
if (modTime < lastModTime) {
|
||||
logDebug("Mod time less than last mod time")
|
||||
return false // If the file was created before the last time it was called
|
||||
} else if (modTime == lastModTime && lastModTimeFiles.contains(path.toString)) {
|
||||
logDebug("Mod time equal to last mod time, but file considered already")
|
||||
return false // If the file was created exactly as lastModTime but not reported yet
|
||||
} else if (modTime > validTime.milliseconds) {
|
||||
logDebug("Mod time more than valid time")
|
||||
return false // If the file was created after the time this function call requires
|
||||
}
|
||||
if (modTime > latestModTime) {
|
||||
latestModTime = modTime
|
||||
latestModTimeFiles.clear()
|
||||
logDebug("Latest mod time updated to " + latestModTime)
|
||||
}
|
||||
latestModTimeFiles += path.toString
|
||||
logDebug("Accepted " + path)
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
logDebug("Finding new files at time " + validTime + " for last mod time = " + lastModTime)
|
||||
val newFiles = fs.listStatus(path, newFilter).map(_.getPath.toString)
|
||||
// Find new files
|
||||
val (newFiles, latestModTime, latestModTimeFiles) = findNewFiles(validTime.milliseconds)
|
||||
logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
|
||||
if (newFiles.length > 0) {
|
||||
// Update the modification time and the files processed for that modification time
|
||||
if (lastModTime != newFilter.latestModTime) {
|
||||
lastModTime = newFilter.latestModTime
|
||||
lastModTimeFiles.clear()
|
||||
if (prevModTime < latestModTime) {
|
||||
prevModTime = latestModTime
|
||||
prevModTimeFiles.clear()
|
||||
}
|
||||
lastModTimeFiles ++= newFilter.latestModTimeFiles
|
||||
logDebug("Last mod time updated to " + lastModTime)
|
||||
prevModTimeFiles ++= latestModTimeFiles
|
||||
logDebug("Last mod time updated to " + prevModTime)
|
||||
}
|
||||
files += ((validTime, newFiles))
|
||||
files += ((validTime, newFiles.toArray))
|
||||
Some(filesToRDD(newFiles))
|
||||
}
|
||||
|
||||
|
@ -132,12 +98,28 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
|
|||
oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n"))
|
||||
}
|
||||
|
||||
/**
|
||||
* Find files which have modification timestamp <= current time and return a 3-tuple of
|
||||
* (new files found, latest modification time among them, files with latest modification time)
|
||||
*/
|
||||
private def findNewFiles(currentTime: Long): (Seq[String], Long, Seq[String]) = {
|
||||
logDebug("Trying to get new files for time " + currentTime)
|
||||
val filter = new CustomPathFilter(currentTime)
|
||||
val newFiles = fs.listStatus(path, filter).map(_.getPath.toString)
|
||||
(newFiles, filter.latestModTime, filter.latestModTimeFiles.toSeq)
|
||||
}
|
||||
|
||||
/** Generate one RDD from an array of files */
|
||||
protected[streaming] def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
|
||||
new UnionRDD(
|
||||
context.sparkContext,
|
||||
files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file))
|
||||
)
|
||||
private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
|
||||
val fileRDDs = files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file))
|
||||
files.zip(fileRDDs).foreach { case (file, rdd) => {
|
||||
if (rdd.partitions.size == 0) {
|
||||
logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
|
||||
"files that have been \"moved\" to the directory assigned to the file stream. " +
|
||||
"Refer to the streaming programming guide for more details.")
|
||||
}
|
||||
}}
|
||||
new UnionRDD(context.sparkContext, fileRDDs)
|
||||
}
|
||||
|
||||
private def path: Path = {
|
||||
|
@ -150,6 +132,10 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
|
|||
fs_
|
||||
}
|
||||
|
||||
private def reset() {
|
||||
fs_ = null
|
||||
}
|
||||
|
||||
@throws(classOf[IOException])
|
||||
private def readObject(ois: ObjectInputStream) {
|
||||
logDebug(this.getClass().getSimpleName + ".readObject used")
|
||||
|
@ -191,6 +177,51 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
|
|||
hadoopFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n") + "\n]"
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Custom PathFilter class to find new files that have modification timestamps <= current time,
|
||||
* but have not been seen before (i.e. the file should not be in lastModTimeFiles)
|
||||
*/
|
||||
private[streaming]
|
||||
class CustomPathFilter(maxModTime: Long) extends PathFilter {
|
||||
// Latest file mod time seen in this round of fetching files and its corresponding files
|
||||
var latestModTime = 0L
|
||||
val latestModTimeFiles = new HashSet[String]()
|
||||
|
||||
def accept(path: Path): Boolean = {
|
||||
try {
|
||||
if (!filter(path)) { // Reject file if it does not satisfy filter
|
||||
logDebug("Rejected by filter " + path)
|
||||
return false
|
||||
}
|
||||
val modTime = fs.getFileStatus(path).getModificationTime()
|
||||
logDebug("Mod time for " + path + " is " + modTime)
|
||||
if (modTime < prevModTime) {
|
||||
logDebug("Mod time less than last mod time")
|
||||
return false // If the file was created before the last time it was called
|
||||
} else if (modTime == prevModTime && prevModTimeFiles.contains(path.toString)) {
|
||||
logDebug("Mod time equal to last mod time, but file considered already")
|
||||
return false // If the file was created exactly as lastModTime but not reported yet
|
||||
} else if (modTime > maxModTime) {
|
||||
logDebug("Mod time more than ")
|
||||
return false // If the file is too new that considering it may give errors
|
||||
}
|
||||
if (modTime > latestModTime) {
|
||||
latestModTime = modTime
|
||||
latestModTimeFiles.clear()
|
||||
logDebug("Latest mod time updated to " + latestModTime)
|
||||
}
|
||||
latestModTimeFiles += path.toString
|
||||
logDebug("Accepted " + path)
|
||||
} catch {
|
||||
case fnfe: java.io.FileNotFoundException =>
|
||||
logWarning("Error finding new files", fnfe)
|
||||
reset()
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private[streaming]
|
||||
|
|
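The acceptance rule implemented by CustomPathFilter, restated as a standalone predicate (a sketch with hypothetical names; prevModTime and prevModTimeFiles track what the previous batch already reported):

    import org.apache.hadoop.fs.{FileSystem, Path}
    import scala.collection.mutable.HashSet

    // Accept a file only if its modification time lies in (prevModTime, maxModTime],
    // or equals prevModTime but was not reported in the previous batch.
    def isNewFile(fs: FileSystem, path: Path, prevModTime: Long,
        prevModTimeFiles: HashSet[String], maxModTime: Long): Boolean = {
      val modTime = fs.getFileStatus(path).getModificationTime()
      if (modTime < prevModTime) false
      else if (modTime == prevModTime && prevModTimeFiles.contains(path.toString)) false
      else if (modTime > maxModTime) false   // too new for the batch being computed
      else true
    }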
|
@ -88,8 +88,6 @@ private[streaming] case class ReportError(msg: String) extends NetworkReceiverMe
|
|||
*/
|
||||
abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging {
|
||||
|
||||
initLogging()
|
||||
|
||||
lazy protected val env = SparkEnv.get
|
||||
|
||||
lazy protected val actor = env.actorSystem.actorOf(
|
||||
|
|
|
@ -17,11 +17,18 @@
|
|||
|
||||
package org.apache.spark.streaming.scheduler
|
||||
|
||||
import akka.actor.{Props, Actor}
|
||||
import org.apache.spark.SparkEnv
|
||||
import org.apache.spark.Logging
|
||||
import org.apache.spark.streaming.{Checkpoint, Time, CheckpointWriter}
|
||||
import org.apache.spark.streaming.util.{ManualClock, RecurringTimer, Clock}
|
||||
|
||||
/** Event classes for JobGenerator */
|
||||
private[scheduler] sealed trait JobGeneratorEvent
|
||||
private[scheduler] case class GenerateJobs(time: Time) extends JobGeneratorEvent
|
||||
private[scheduler] case class ClearOldMetadata(time: Time) extends JobGeneratorEvent
|
||||
private[scheduler] case class DoCheckpoint(time: Time) extends JobGeneratorEvent
|
||||
|
||||
/**
|
||||
* This class generates jobs from DStreams as well as drives checkpointing and cleaning
|
||||
* up DStream metadata.
|
||||
|
@ -29,45 +36,68 @@ import org.apache.spark.streaming.util.{ManualClock, RecurringTimer, Clock}
|
|||
private[streaming]
|
||||
class JobGenerator(jobScheduler: JobScheduler) extends Logging {
|
||||
|
||||
initLogging()
|
||||
val ssc = jobScheduler.ssc
|
||||
val clockClass = System.getProperty(
|
||||
"spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
|
||||
val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
|
||||
val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
|
||||
longTime => generateJobs(new Time(longTime)))
|
||||
val graph = ssc.graph
|
||||
val eventProcessorActor = ssc.env.actorSystem.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case event: JobGeneratorEvent =>
|
||||
logDebug("Got event of type " + event.getClass.getName)
|
||||
processEvent(event)
|
||||
}
|
||||
}))
|
||||
val clock = {
|
||||
val clockClass = System.getProperty(
|
||||
"spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
|
||||
Class.forName(clockClass).newInstance().asInstanceOf[Clock]
|
||||
}
|
||||
val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
|
||||
longTime => eventProcessorActor ! GenerateJobs(new Time(longTime)))
|
||||
lazy val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) {
|
||||
new CheckpointWriter(ssc.checkpointDir)
|
||||
new CheckpointWriter(ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration)
|
||||
} else {
|
||||
null
|
||||
}
|
||||
|
||||
var latestTime: Time = null
|
||||
|
||||
def start() = synchronized {
|
||||
if (ssc.isCheckpointPresent) {
|
||||
restart()
|
||||
} else {
|
||||
startFirstTime()
|
||||
}
|
||||
logInfo("JobGenerator started")
|
||||
}
|
||||
|
||||
def stop() = synchronized {
|
||||
def stop() {
|
||||
timer.stop()
|
||||
if (checkpointWriter != null) checkpointWriter.stop()
|
||||
ssc.graph.stop()
|
||||
logInfo("JobGenerator stopped")
|
||||
}
|
||||
|
||||
/**
|
||||
* On batch completion, clear old metadata and checkpoint computation.
|
||||
*/
|
||||
private[scheduler] def onBatchCompletion(time: Time) {
|
||||
eventProcessorActor ! ClearOldMetadata(time)
|
||||
}
|
||||
|
||||
/** Processes all events */
|
||||
private def processEvent(event: JobGeneratorEvent) {
|
||||
event match {
|
||||
case GenerateJobs(time) => generateJobs(time)
|
||||
case ClearOldMetadata(time) => clearOldMetadata(time)
|
||||
case DoCheckpoint(time) => doCheckpoint(time)
|
||||
}
|
||||
}
|
||||
|
||||
/** Starts the generator for the first time */
|
||||
private def startFirstTime() {
|
||||
val startTime = new Time(timer.getStartTime())
|
||||
graph.start(startTime - graph.batchDuration)
|
||||
timer.start(startTime.milliseconds)
|
||||
logInfo("JobGenerator's timer started at " + startTime)
|
||||
logInfo("JobGenerator started at " + startTime)
|
||||
}
|
||||
|
||||
/** Restarts the generator based on the information in checkpoint */
|
||||
private def restart() {
|
||||
// If manual clock is being used for testing, then
|
||||
// either set the manual clock to the last checkpointed time,
|
||||
|
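The restructuring above funnels all generator work through one actor, so the timer and the batch-completion callback only post messages. A condensed sketch of that pattern (Akka 2.x as used in this branch; the event payload is simplified to a Long timestamp):

    import akka.actor.{Actor, ActorSystem, Props}

    sealed trait GeneratorEvent
    case class GenerateJobs(timeMs: Long) extends GeneratorEvent
    case class ClearOldMetadata(timeMs: Long) extends GeneratorEvent
    case class DoCheckpoint(timeMs: Long) extends GeneratorEvent

    class GeneratorActor extends Actor {
      def receive = {
        case GenerateJobs(t)     => println("generate jobs for " + t)    // would call generateJobs
        case ClearOldMetadata(t) => println("clear metadata up to " + t) // would call clearOldMetadata
        case DoCheckpoint(t)     => println("checkpoint at " + t)        // would call doCheckpoint
      }
    }

    val system = ActorSystem("sketch")
    val generator = system.actorOf(Props[GeneratorActor])
    generator ! GenerateJobs(System.currentTimeMillis())   // timer callbacks just send messages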
@ -99,7 +129,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
|
|||
|
||||
// Restart the timer
|
||||
timer.start(restartTime.milliseconds)
|
||||
logInfo("JobGenerator's timer restarted at " + restartTime)
|
||||
logInfo("JobGenerator restarted at " + restartTime)
|
||||
}
|
||||
|
||||
/** Generate jobs and perform checkpoint for the given `time`. */
|
||||
|
@ -107,16 +137,13 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
|
|||
SparkEnv.set(ssc.env)
|
||||
logInfo("\n-----------------------------------------------------\n")
|
||||
jobScheduler.runJobs(time, graph.generateJobs(time))
|
||||
latestTime = time
|
||||
doCheckpoint(time)
|
||||
eventProcessorActor ! DoCheckpoint(time)
|
||||
}
|
||||
|
||||
/**
|
||||
* On batch completion, clear old metadata and checkpoint computation.
|
||||
*/
|
||||
private[streaming] def onBatchCompletion(time: Time) {
|
||||
/** Clear DStream metadata for the given `time`. */
|
||||
private def clearOldMetadata(time: Time) {
|
||||
ssc.graph.clearOldMetadata(time)
|
||||
doCheckpoint(time)
|
||||
eventProcessorActor ! DoCheckpoint(time)
|
||||
}
|
||||
|
||||
/** Perform checkpoint for the give `time`. */
|
||||
|
|
|
@ -30,8 +30,6 @@ import org.apache.spark.streaming._
|
|||
private[streaming]
|
||||
class JobScheduler(val ssc: StreamingContext) extends Logging {
|
||||
|
||||
initLogging()
|
||||
|
||||
val jobSets = new ConcurrentHashMap[Time, JobSet]
|
||||
val numConcurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt
|
||||
val executor = Executors.newFixedThreadPool(numConcurrentJobs)
|
||||
|
|
|
@ -39,7 +39,6 @@ import org.apache.hadoop.conf.Configuration
|
|||
|
||||
private[streaming]
|
||||
object MasterFailureTest extends Logging {
|
||||
initLogging()
|
||||
|
||||
@volatile var killed = false
|
||||
@volatile var killCount = 0
|
||||
|
@ -331,7 +330,6 @@ class TestOutputStream[T: ClassTag](
|
|||
*/
|
||||
private[streaming]
|
||||
class KillingThread(ssc: StreamingContext, maxKillWaitTime: Long) extends Thread with Logging {
|
||||
initLogging()
|
||||
|
||||
override def run() {
|
||||
try {
|
||||
|
@ -366,7 +364,6 @@ class KillingThread(ssc: StreamingContext, maxKillWaitTime: Long) extends Thread
|
|||
private[streaming]
|
||||
class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
|
||||
extends Thread with Logging {
|
||||
initLogging()
|
||||
|
||||
override def run() {
|
||||
val localTestDir = Files.createTempDir()
|
||||
|
|
|
@ -17,24 +17,18 @@
|
|||
|
||||
package org.apache.spark.streaming
|
||||
|
||||
import dstream.FileInputDStream
|
||||
import org.apache.spark.streaming.StreamingContext._
|
||||
import java.io.File
|
||||
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
import scala.reflect.ClassTag
|
||||
|
||||
import org.apache.commons.io.FileUtils
|
||||
import org.scalatest.BeforeAndAfter
|
||||
|
||||
import com.google.common.io.Files
|
||||
|
||||
import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
|
||||
import org.apache.hadoop.fs.{Path, FileSystem}
|
||||
import org.apache.hadoop.conf.Configuration
|
||||
import org.apache.spark.streaming.StreamingContext._
|
||||
import org.apache.spark.streaming.dstream.FileInputDStream
|
||||
import org.apache.spark.streaming.util.ManualClock
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* This test suites tests the checkpointing functionality of DStreams -
|
||||
* the checkpointing of a DStream's RDDs as well as the checkpointing of
|
||||
|
@ -66,7 +60,7 @@ class CheckpointSuite extends TestSuiteBase {
|
|||
System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
|
||||
|
||||
val stateStreamCheckpointInterval = Seconds(1)
|
||||
|
||||
val fs = FileSystem.getLocal(new Configuration())
|
||||
// this ensure checkpointing occurs at least once
|
||||
val firstNumBatches = (stateStreamCheckpointInterval / batchDuration).toLong * 2
|
||||
val secondNumBatches = firstNumBatches
|
||||
|
@ -90,11 +84,12 @@ class CheckpointSuite extends TestSuiteBase {
|
|||
ssc.start()
|
||||
advanceTimeWithRealDelay(ssc, firstNumBatches)
|
||||
logInfo("Checkpoint data of state stream = \n" + stateStream.checkpointData)
|
||||
assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before first failure")
|
||||
assert(!stateStream.checkpointData.checkpointFiles.isEmpty,
|
||||
"No checkpointed RDDs in state stream before first failure")
|
||||
stateStream.checkpointData.checkpointFiles.foreach {
|
||||
case (time, data) => {
|
||||
val file = new File(data.toString)
|
||||
assert(file.exists(), "Checkpoint file '" + file +"' for time " + time + " for state stream before first failure does not exist")
|
||||
case (time, file) => {
|
||||
assert(fs.exists(new Path(file)), "Checkpoint file '" + file +"' for time " + time +
|
||||
" for state stream before first failure does not exist")
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -102,7 +97,8 @@ class CheckpointSuite extends TestSuiteBase {
|
|||
// and check whether the earlier checkpoint files are deleted
|
||||
val checkpointFiles = stateStream.checkpointData.checkpointFiles.map(x => new File(x._2))
|
||||
advanceTimeWithRealDelay(ssc, secondNumBatches)
|
||||
checkpointFiles.foreach(file => assert(!file.exists, "Checkpoint file '" + file + "' was not deleted"))
|
||||
checkpointFiles.foreach(file =>
|
||||
assert(!file.exists, "Checkpoint file '" + file + "' was not deleted"))
|
||||
ssc.stop()
|
||||
|
||||
// Restart stream computation using the checkpoint file and check whether
|
||||
|
@ -110,19 +106,20 @@ class CheckpointSuite extends TestSuiteBase {
|
|||
ssc = new StreamingContext(checkpointDir)
|
||||
stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head
|
||||
logInfo("Restored data of state stream = \n[" + stateStream.generatedRDDs.mkString("\n") + "]")
|
||||
assert(!stateStream.generatedRDDs.isEmpty, "No restored RDDs in state stream after recovery from first failure")
|
||||
assert(!stateStream.generatedRDDs.isEmpty,
|
||||
"No restored RDDs in state stream after recovery from first failure")
|
||||
|
||||
|
||||
// Run one batch to generate a new checkpoint file and check whether some RDD
|
||||
// is present in the checkpoint data or not
|
||||
ssc.start()
|
||||
advanceTimeWithRealDelay(ssc, 1)
|
||||
assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before second failure")
|
||||
assert(!stateStream.checkpointData.checkpointFiles.isEmpty,
|
||||
"No checkpointed RDDs in state stream before second failure")
|
||||
stateStream.checkpointData.checkpointFiles.foreach {
|
||||
case (time, data) => {
|
||||
val file = new File(data.toString)
|
||||
assert(file.exists(),
|
||||
"Checkpoint file '" + file +"' for time " + time + " for state stream before seconds failure does not exist")
|
||||
case (time, file) => {
|
||||
assert(fs.exists(new Path(file)), "Checkpoint file '" + file +"' for time " + time +
|
||||
" for state stream before seconds failure does not exist")
|
||||
}
|
||||
}
|
||||
ssc.stop()
|
||||
|
@ -132,7 +129,8 @@ class CheckpointSuite extends TestSuiteBase {
|
|||
ssc = new StreamingContext(checkpointDir)
|
||||
stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head
|
||||
logInfo("Restored data of state stream = \n[" + stateStream.generatedRDDs.mkString("\n") + "]")
|
||||
assert(!stateStream.generatedRDDs.isEmpty, "No restored RDDs in state stream after recovery from second failure")
|
||||
assert(!stateStream.generatedRDDs.isEmpty,
|
||||
"No restored RDDs in state stream after recovery from second failure")
|
||||
|
||||
// Adjust manual clock time as if it is being restarted after a delay
|
||||
System.setProperty("spark.streaming.manualClock.jump", (batchDuration.milliseconds * 7).toString)
|
||||
|
@ -143,6 +141,7 @@ class CheckpointSuite extends TestSuiteBase {
|
|||
ssc = null
|
||||
}
|
||||
|
||||
|
||||
// This tests whether the systm can recover from a master failure with simple
|
||||
// non-stateful operations. This assumes as reliable, replayable input
|
||||
// source - TestInputDStream.
|
||||
|
@ -191,6 +190,7 @@ class CheckpointSuite extends TestSuiteBase {
|
|||
testCheckpointedOperation(input, operation, output, 7)
|
||||
}
|
||||
|
||||
|
||||
// This tests whether file input stream remembers what files were seen before
|
||||
// the master failure and uses them again to process a large window operation.
|
||||
// It also tests whether batches, whose processing was incomplete due to the
|
||||
|
|
|
@ -23,7 +23,7 @@ import akka.actor.IOManager
|
|||
import akka.actor.Props
|
||||
import akka.util.ByteString
|
||||
|
||||
import org.apache.spark.streaming.dstream.{NetworkReceiver, SparkFlumeEvent}
|
||||
import org.apache.spark.streaming.dstream.{FileInputDStream, NetworkReceiver, SparkFlumeEvent}
|
||||
import java.net.{InetSocketAddress, SocketException, Socket, ServerSocket}
|
||||
import java.io.{File, BufferedWriter, OutputStreamWriter}
|
||||
import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue}
|
||||
|
|