Merge branch 'master' into window-improvement

Tathagata Das 2013-12-26 12:03:11 -08:00
commit c4a54f51b5
37 changed files with 466 additions and 124 deletions


@ -29,8 +29,7 @@ import akka.pattern.ask
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.{MetadataCleanerType, Utils, MetadataCleaner, TimeStampedHashMap}
import org.apache.spark.util.{AkkaUtils, MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils}
private[spark] sealed trait MapOutputTrackerMessage
private[spark] case class GetMapOutputStatuses(shuffleId: Int, requester: String)
@ -53,7 +52,7 @@ private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster
private[spark] class MapOutputTracker extends Logging {
private val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
private val timeout = AkkaUtils.askTimeout
// Set to the MapOutputTrackerActor living on the driver
var trackerActor: Either[ActorRef, ActorSelection] = _


@ -125,6 +125,8 @@ JavaRDDLike[T, JavaRDD[T]] {
*/
def subtract(other: JavaRDD[T], p: Partitioner): JavaRDD[T] =
wrapRDD(rdd.subtract(other, p))
override def toString = rdd.toString
}
object JavaRDD {


@ -244,6 +244,17 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
new java.util.ArrayList(arr)
}
/**
* Return an array that contains all of the elements in a specific partition of this RDD.
*/
def collectPartitions(partitionIds: Array[Int]): Array[JList[T]] = {
// This is useful for implementing `take` from other language frontends
// like Python where the data is serialized.
import scala.collection.JavaConversions._
val res = context.runJob(rdd, (it: Iterator[T]) => it.toArray, partitionIds, true)
res.map(x => new java.util.ArrayList(x.toSeq)).toArray
}
/**
* Reduces the elements of this RDD using the specified commutative and associative binary operator.
*/

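Note: a minimal usage sketch of the new collectPartitions API, called from Scala. The local SparkContext setup, app name, and sample data below are illustrative, not part of this commit.

import java.util.Arrays
import org.apache.spark.api.java.JavaSparkContext

val jsc = new JavaSparkContext("local", "collectPartitions-example")
// 6 elements split across 3 partitions: [1, 2], [3, 4], [5, 6]
val rdd = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 3)
// Fetch only partitions 0 and 2; the result holds one java.util.List per requested partition
val parts = rdd.collectPartitions(Array(0, 2))
assert(parts(0) == Arrays.asList(1, 2))
assert(parts(1) == Arrays.asList(5, 6))
jsc.stop()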

@ -235,10 +235,6 @@ private[spark] object PythonRDD {
file.close()
}
def takePartition[T](rdd: RDD[T], partition: Int): Iterator[T] = {
implicit val cm : ClassTag[T] = rdd.elementClassTag
rdd.context.runJob(rdd, ((x: Iterator[T]) => x.toArray), Seq(partition), true).head.iterator
}
}
private class BytesToString extends org.apache.spark.api.java.function.Function[Array[Byte], String] {


@ -23,14 +23,14 @@ import scala.concurrent.duration._
import scala.concurrent.Await
import akka.actor._
import akka.pattern.AskTimeoutException
import akka.pattern.ask
import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent, AssociationErrorEvent}
import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent}
import org.apache.spark.{SparkException, Logging}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
import org.apache.spark.util.AkkaUtils
/**
@ -178,7 +178,7 @@ private[spark] class Client(
def stop() {
if (actor != null) {
try {
val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
val timeout = AkkaUtils.askTimeout
val future = actor.ask(StopClient)(timeout)
Await.result(future, timeout)
} catch {


@ -18,19 +18,16 @@
package org.apache.spark.deploy.master
import java.text.SimpleDateFormat
import java.util.concurrent.TimeUnit
import java.util.Date
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.concurrent.duration.{Duration, FiniteDuration}
import akka.actor._
import akka.pattern.ask
import akka.remote._
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import akka.serialization.SerializationExtension
import akka.util.Timeout
import org.apache.spark.{Logging, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
@ -38,7 +35,7 @@ import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.MasterMessages._
import org.apache.spark.deploy.master.ui.MasterWebUI
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.util.{AkkaUtils, Utils}
private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
import context.dispatcher
@ -64,8 +61,6 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
val waitingApps = new ArrayBuffer[ApplicationInfo]
val completedApps = new ArrayBuffer[ApplicationInfo]
var firstApp: Option[ApplicationInfo] = None
Utils.checkHost(host, "Expected hostname")
val masterMetricsSystem = MetricsSystem.createMetricsSystem("master")
@ -444,14 +439,6 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
idToApp(app.id) = app
actorToApp(app.driver) = app
addressToApp(appAddress) = app
if (firstApp == None) {
firstApp = Some(app)
}
// TODO: What is firstApp?? Can we remove it?
val workersAlive = workers.filter(_.state == WorkerState.ALIVE).toArray
if (workersAlive.size > 0 && !workersAlive.exists(_.memoryFree >= app.desc.memoryPerSlave)) {
logWarning("Could not find any workers with enough memory for " + firstApp.get.id)
}
waitingApps += app
}
@ -537,12 +524,10 @@ private[spark] object Master {
def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int, Int) = {
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort), name = actorName)
val timeoutDuration: FiniteDuration = Duration.create(
System.getProperty("spark.akka.askTimeout", "10").toLong, TimeUnit.SECONDS)
implicit val timeout = Timeout(timeoutDuration)
val respFuture = actor ? RequestWebUIPort // ask pattern
val resp = Await.result(respFuture, timeoutDuration).asInstanceOf[WebUIPortResponse]
val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort), actorName)
val timeout = AkkaUtils.askTimeout
val respFuture = actor.ask(RequestWebUIPort)(timeout)
val resp = Await.result(respFuture, timeout).asInstanceOf[WebUIPortResponse]
(actorSystem, boundPort, resp.webUIBoundPort)
}
}


@ -18,12 +18,12 @@
package org.apache.spark.deploy.master
import scala.collection.JavaConversions._
import scala.concurrent.ops._
import org.apache.zookeeper._
import org.apache.zookeeper.Watcher.Event.KeeperState
import org.apache.zookeeper.data.Stat
import org.apache.spark.Logging
import org.apache.zookeeper._
import org.apache.zookeeper.data.Stat
import org.apache.zookeeper.Watcher.Event.KeeperState
/**
* Provides a Scala-side interface to the standard ZooKeeper client, with the addition of retry
@ -33,7 +33,7 @@ import org.apache.zookeeper.Watcher.Event.KeeperState
* informed via zkDown().
*
* Additionally, all commands sent to ZooKeeper will be retried until they either fail too many
* times or a semantic exception is thrown (e.g.., "node already exists").
* times or a semantic exception is thrown (e.g., "node already exists").
*/
private[spark] class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher) extends Logging {
val ZK_URL = System.getProperty("spark.deploy.zookeeper.url", "")
@ -103,6 +103,7 @@ private[spark] class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher) ext
connectToZooKeeper()
case KeeperState.Disconnected =>
logWarning("ZooKeeper disconnected, will retry...")
case s => // Do nothing
}
}
}
@ -179,7 +180,7 @@ private[spark] class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher) ext
} catch {
case e: KeeperException.NoNodeException => throw e
case e: KeeperException.NodeExistsException => throw e
case e if n > 0 =>
case e: Exception if n > 0 =>
logError("ZooKeeper exception, " + n + " more retries...", e)
Thread.sleep(RETRY_WAIT_MILLIS)
retry(fn, n-1)
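Note: the retry helper touched above, reconstructed as a self-contained sketch of the policy described in the class comment (retry generic errors, rethrow "semantic" exceptions immediately). The retry count, wait time, and omission of logging are illustrative, not the actual SparkZooKeeperSession constants.

import org.apache.zookeeper.KeeperException

def retry[T](fn: => T, n: Int = 3, retryWaitMillis: Long = 5000): T = {
  try {
    fn
  } catch {
    // Semantic failures are surfaced to the caller rather than retried
    case e: KeeperException.NoNodeException => throw e
    case e: KeeperException.NodeExistsException => throw e
    case e: Exception if n > 0 =>
      Thread.sleep(retryWaitMillis)
      retry(fn, n - 1, retryWaitMillis)
  }
}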


@ -21,8 +21,8 @@ import akka.actor.ActorRef
import org.apache.zookeeper._
import org.apache.zookeeper.Watcher.Event.EventType
import org.apache.spark.deploy.master.MasterMessages._
import org.apache.spark.Logging
import org.apache.spark.deploy.master.MasterMessages._
private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef, masterUrl: String)
extends LeaderElectionAgent with SparkZooKeeperWatcher with Logging {
@ -105,7 +105,7 @@ private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef, mas
// We found a different master file pointing to this process.
// This can happen in the following two cases:
// (1) The master process was restarted on the same node.
// (2) The ZK server died between creating the node and returning the name of the node.
// (2) The ZK server died between creating the file and returning the name of the file.
// For this case, we will end up creating a second file, and MUST explicitly delete the
// first one, since our ZK session is still open.
// Note that this deletion will cause a NodeDeleted event to be fired so we check again for


@ -17,32 +17,28 @@
package org.apache.spark.deploy.master.ui
import scala.concurrent.Await
import scala.xml.Node
import akka.pattern.ask
import scala.concurrent.Await
import scala.concurrent.duration._
import javax.servlet.http.HttpServletRequest
import net.liftweb.json.JsonAST.JValue
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.JsonProtocol
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.master.ExecutorInfo
import org.apache.spark.ui.UIUtils
import org.apache.spark.util.Utils
private[spark] class ApplicationPage(parent: MasterWebUI) {
val master = parent.masterActorRef
implicit val timeout = parent.timeout
val timeout = parent.timeout
/** Executor details for a particular application */
def renderJson(request: HttpServletRequest): JValue = {
val appId = request.getParameter("appId")
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
val state = Await.result(stateFuture, 30 seconds)
val state = Await.result(stateFuture, timeout)
val app = state.activeApps.find(_.id == appId).getOrElse({
state.completedApps.find(_.id == appId).getOrElse(null)
})
@ -53,7 +49,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) {
def render(request: HttpServletRequest): Seq[Node] = {
val appId = request.getParameter("appId")
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
val state = Await.result(stateFuture, 30 seconds)
val state = Await.result(stateFuture, timeout)
val app = state.activeApps.find(_.id == appId).getOrElse({
state.completedApps.find(_.id == appId).getOrElse(null)
})


@ -17,37 +17,33 @@
package org.apache.spark.deploy.master.ui
import javax.servlet.http.HttpServletRequest
import scala.concurrent.Await
import scala.xml.Node
import scala.concurrent.Await
import akka.pattern.ask
import scala.concurrent.duration._
import javax.servlet.http.HttpServletRequest
import net.liftweb.json.JsonAST.JValue
import org.apache.spark.deploy.DeployWebUI
import org.apache.spark.deploy.{DeployWebUI, JsonProtocol}
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.JsonProtocol
import org.apache.spark.deploy.master.{ApplicationInfo, WorkerInfo}
import org.apache.spark.ui.UIUtils
import org.apache.spark.util.Utils
private[spark] class IndexPage(parent: MasterWebUI) {
val master = parent.masterActorRef
implicit val timeout = parent.timeout
val timeout = parent.timeout
def renderJson(request: HttpServletRequest): JValue = {
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
val state = Await.result(stateFuture, 30 seconds)
val state = Await.result(stateFuture, timeout)
JsonProtocol.writeMasterState(state)
}
/** Index view listing applications and executors */
def render(request: HttpServletRequest): Seq[Node] = {
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
val state = Await.result(stateFuture, 30 seconds)
val state = Await.result(stateFuture, timeout)
val workerHeaders = Seq("Id", "Address", "State", "Cores", "Memory")
val workers = state.workers.sortBy(_.id)


@ -17,25 +17,21 @@
package org.apache.spark.deploy.master.ui
import scala.concurrent.duration._
import javax.servlet.http.HttpServletRequest
import org.eclipse.jetty.server.{Handler, Server}
import org.apache.spark.{Logging}
import org.apache.spark.Logging
import org.apache.spark.deploy.master.Master
import org.apache.spark.ui.JettyUtils
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.Utils
import org.apache.spark.util.{AkkaUtils, Utils}
/**
* Web UI server for the standalone master.
*/
private[spark]
class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {
implicit val timeout = Duration.create(
System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
val timeout = AkkaUtils.askTimeout
val host = Utils.localHostName()
val port = requestedPort


@ -42,13 +42,13 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
def renderJson(request: HttpServletRequest): JValue = {
val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
val workerState = Await.result(stateFuture, 30 seconds)
val workerState = Await.result(stateFuture, timeout)
JsonProtocol.writeWorkerState(workerState)
}
def render(request: HttpServletRequest): Seq[Node] = {
val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
val workerState = Await.result(stateFuture, 30 seconds)
val workerState = Await.result(stateFuture, timeout)
val executorHeaders = Seq("ExecutorID", "Cores", "Memory", "Job Details", "Logs")
val runningExecutorTable =


@ -19,17 +19,14 @@ package org.apache.spark.deploy.worker.ui
import java.io.File
import scala.concurrent.duration._
import akka.util.Timeout
import javax.servlet.http.HttpServletRequest
import org.eclipse.jetty.server.{Handler, Server}
import org.apache.spark.Logging
import org.apache.spark.deploy.worker.Worker
import org.apache.spark.ui.{JettyUtils, UIUtils}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.Utils
import org.eclipse.jetty.server.{Handler, Server}
import org.apache.spark.util.{AkkaUtils, Utils}
/**
* Web UI server for the standalone worker.
@ -37,8 +34,7 @@ import org.eclipse.jetty.server.{Handler, Server}
private[spark]
class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[Int] = None)
extends Logging {
implicit val timeout = Timeout(
Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds"))
val timeout = AkkaUtils.askTimeout
val host = Utils.localHostName()
val port = requestedPort.getOrElse(
System.getProperty("worker.ui.port", WorkerWebUI.DEFAULT_PORT).toInt)


@ -222,18 +222,22 @@ private[spark] class Executor(
return
}
val resultSer = SparkEnv.get.serializer.newInstance()
val beforeSerialization = System.currentTimeMillis()
val valueBytes = resultSer.serialize(value)
val afterSerialization = System.currentTimeMillis()
for (m <- task.metrics) {
m.hostname = Utils.localHostName()
m.executorDeserializeTime = (taskStart - startTime).toInt
m.executorRunTime = (taskFinish - taskStart).toInt
m.jvmGCTime = gcTime - startGCTime
m.resultSerializationTime = (afterSerialization - beforeSerialization).toInt
}
// TODO I'd also like to track the time it takes to serialize the task results, but that is
// huge headache, b/c we need to serialize the task metrics first. If TaskMetrics had a
// custom serialized format, we could just change the relevants bytes in the byte buffer
val accumUpdates = Accumulators.values
val directResult = new DirectTaskResult(value, accumUpdates, task.metrics.getOrElse(null))
val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.getOrElse(null))
val serializedDirectResult = ser.serialize(directResult)
logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit)
val serializedResult = {


@ -43,6 +43,11 @@ class TaskMetrics extends Serializable {
*/
var jvmGCTime: Long = _
/**
* Amount of time spent serializing the task result
*/
var resultSerializationTime: Long = _
/**
* If this task reads from shuffle output, metrics on getting shuffle data will be collected here
*/


@ -35,18 +35,15 @@ case class IndirectTaskResult[T](blockId: BlockId) extends TaskResult[T] with Se
/** A TaskResult that contains the task's return value and accumulator updates. */
private[spark]
class DirectTaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var metrics: TaskMetrics)
class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long, Any], var metrics: TaskMetrics)
extends TaskResult[T] with Externalizable {
def this() = this(null.asInstanceOf[T], null, null)
def this() = this(null.asInstanceOf[ByteBuffer], null, null)
override def writeExternal(out: ObjectOutput) {
val objectSer = SparkEnv.get.serializer.newInstance()
val bb = objectSer.serialize(value)
out.writeInt(bb.remaining())
Utils.writeByteBuffer(bb, out)
out.writeInt(valueBytes.remaining);
Utils.writeByteBuffer(valueBytes, out)
out.writeInt(accumUpdates.size)
for ((key, value) <- accumUpdates) {
@ -58,12 +55,10 @@ class DirectTaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var me
override def readExternal(in: ObjectInput) {
val objectSer = SparkEnv.get.serializer.newInstance()
val blen = in.readInt()
val byteVal = new Array[Byte](blen)
in.readFully(byteVal)
value = objectSer.deserialize(ByteBuffer.wrap(byteVal))
valueBytes = ByteBuffer.wrap(byteVal)
val numUpdates = in.readInt
if (numUpdates == 0) {
@ -76,4 +71,9 @@ class DirectTaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var me
}
metrics = in.readObject().asInstanceOf[TaskMetrics]
}
def value(): T = {
val resultSer = SparkEnv.get.serializer.newInstance()
return resultSer.deserialize(valueBytes)
}
}
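Note: a sketch of how a DirectTaskResult is now built from pre-serialized bytes and read back, mirroring the updated ClusterTaskSetManagerSuite helper further down. It assumes a live SparkEnv (e.g. inside a local SparkContext); the sample value 42 is illustrative.

import scala.collection.mutable
import org.apache.spark.SparkEnv
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler.DirectTaskResult

val ser = SparkEnv.get.serializer.newInstance()
// The task's return value is serialized once on the executor and carried as raw bytes
val result = new DirectTaskResult[Int](ser.serialize(42), mutable.Map.empty, new TaskMetrics)
// value() deserializes the stored bytes back into the original value on demand
assert(result.value() == 42)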


@ -27,10 +27,10 @@ import akka.actor._
import akka.pattern.ask
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import org.apache.spark.{SparkException, Logging, TaskState}
import org.apache.spark.{Logging, SparkException, TaskState}
import org.apache.spark.scheduler.TaskDescription
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.Utils
import org.apache.spark.util.{AkkaUtils, Utils}
/**
* A scheduler backend that waits for coarse grained executors to connect to it through Akka.
@ -47,6 +47,8 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
var totalCoreCount = new AtomicInteger(0)
private val timeout = AkkaUtils.askTimeout
class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
private val executorActor = new HashMap[String, ActorRef]
private val executorAddress = new HashMap[String, Address]
@ -172,10 +174,6 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)
}
private val timeout = {
Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
}
def stopExecutors() {
try {
if (driverActor != null) {


@ -18,7 +18,6 @@
package org.apache.spark.storage
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import akka.actor._
@ -26,15 +25,17 @@ import akka.pattern.ask
import org.apache.spark.{Logging, SparkException}
import org.apache.spark.storage.BlockManagerMessages._
import org.apache.spark.util.AkkaUtils
private[spark] class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection]) extends Logging {
private[spark]
class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection]) extends Logging {
val AKKA_RETRY_ATTEMPTS: Int = System.getProperty("spark.akka.num.retries", "3").toInt
val AKKA_RETRY_INTERVAL_MS: Int = System.getProperty("spark.akka.retry.wait", "3000").toInt
val DRIVER_AKKA_ACTOR_NAME = "BlockManagerMaster"
val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
val timeout = AkkaUtils.askTimeout
/** Remove a dead executor from the driver actor. This is only called on the driver side. */
def removeExecutor(execId: String) {


@ -21,17 +21,15 @@ import java.util.{HashMap => JHashMap}
import scala.collection.mutable
import scala.collection.JavaConversions._
import scala.concurrent.Future
import scala.concurrent.duration._
import akka.actor.{Actor, ActorRef, Cancellable}
import akka.pattern.ask
import scala.concurrent.duration._
import scala.concurrent.Future
import org.apache.spark.{Logging, SparkException}
import org.apache.spark.storage.BlockManagerMessages._
import org.apache.spark.util.Utils
import org.apache.spark.util.{AkkaUtils, Utils}
/**
* BlockManagerMasterActor is an actor on the master node to track statuses of
@ -50,8 +48,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
// Mapping from block id to the set of block managers that have the block.
private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
val akkaTimeout = Duration.create(
System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
private val akkaTimeout = AkkaUtils.askTimeout
initLogging()


@ -56,7 +56,8 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).fold(0L)(_+_)
val execHead = Seq("Executor ID", "Address", "RDD blocks", "Memory used", "Disk used",
"Active tasks", "Failed tasks", "Complete tasks", "Total tasks")
"Active tasks", "Failed tasks", "Complete tasks", "Total tasks", "Task Time", "Shuffle Read",
"Shuffle Write")
def execRow(kv: Seq[String]) = {
<tr>
@ -73,6 +74,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
<td>{kv(7)}</td>
<td>{kv(8)}</td>
<td>{kv(9)}</td>
<td>{Utils.msDurationToString(kv(10).toLong)}</td>
<td>{Utils.bytesToString(kv(11).toLong)}</td>
<td>{Utils.bytesToString(kv(12).toLong)}</td>
</tr>
}
@ -111,6 +115,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
val failedTasks = listener.executorToTasksFailed.getOrElse(execId, 0)
val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0)
val totalTasks = activeTasks + failedTasks + completedTasks
val totalDuration = listener.executorToDuration.getOrElse(execId, 0)
val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0)
val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0)
Seq(
execId,
@ -122,7 +129,10 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
activeTasks.toString,
failedTasks.toString,
completedTasks.toString,
totalTasks.toString
totalTasks.toString,
totalDuration.toString,
totalShuffleRead.toString,
totalShuffleWrite.toString
)
}
@ -130,6 +140,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
val executorToTasksActive = HashMap[String, HashSet[TaskInfo]]()
val executorToTasksComplete = HashMap[String, Int]()
val executorToTasksFailed = HashMap[String, Int]()
val executorToDuration = HashMap[String, Long]()
val executorToShuffleRead = HashMap[String, Long]()
val executorToShuffleWrite = HashMap[String, Long]()
override def onTaskStart(taskStart: SparkListenerTaskStart) {
val eid = taskStart.taskInfo.executorId
@ -140,6 +153,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val eid = taskEnd.taskInfo.executorId
val activeTasks = executorToTasksActive.getOrElseUpdate(eid, new HashSet[TaskInfo]())
val newDuration = executorToDuration.getOrElse(eid, 0L) + taskEnd.taskInfo.duration
executorToDuration.put(eid, newDuration)
activeTasks -= taskEnd.taskInfo
val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
taskEnd.reason match {
@ -150,6 +166,17 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1
(None, Option(taskEnd.taskMetrics))
}
// update shuffle read/write
if (null != taskEnd.taskMetrics) {
taskEnd.taskMetrics.shuffleReadMetrics.foreach(shuffleRead =>
executorToShuffleRead.put(eid, executorToShuffleRead.getOrElse(eid, 0L) +
shuffleRead.remoteBytesRead))
taskEnd.taskMetrics.shuffleWriteMetrics.foreach(shuffleWrite =>
executorToShuffleWrite.put(eid, executorToShuffleWrite.getOrElse(eid, 0L) +
shuffleWrite.shuffleBytesWritten))
}
}
}
}


@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.ui.jobs
/** class for reporting aggregated metrics for each executors in stageUI */
private[spark] class ExecutorSummary {
var taskTime : Long = 0
var failedTasks : Int = 0
var succeededTasks : Int = 0
var shuffleRead : Long = 0
var shuffleWrite : Long = 0
}


@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.ui.jobs
import scala.xml.Node
import org.apache.spark.scheduler.SchedulingMode
import org.apache.spark.util.Utils
import scala.collection.mutable
/** Page showing executor summary */
private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int) {
val listener = parent.listener
val dateFmt = parent.dateFmt
val isFairScheduler = listener.sc.getSchedulingMode == SchedulingMode.FAIR
def toNodeSeq(): Seq[Node] = {
listener.synchronized {
executorTable()
}
}
/** Special table which merges two header cells. */
private def executorTable[T](): Seq[Node] = {
<table class="table table-bordered table-striped table-condensed sortable">
<thead>
<th>Executor ID</th>
<th>Address</th>
<th>Task Time</th>
<th>Total Tasks</th>
<th>Failed Tasks</th>
<th>Succeeded Tasks</th>
<th>Shuffle Read</th>
<th>Shuffle Write</th>
</thead>
<tbody>
{createExecutorTable()}
</tbody>
</table>
}
private def createExecutorTable() : Seq[Node] = {
// make a executor-id -> address map
val executorIdToAddress = mutable.HashMap[String, String]()
val storageStatusList = parent.sc.getExecutorStorageStatus
for (statusId <- 0 until storageStatusList.size) {
val blockManagerId = parent.sc.getExecutorStorageStatus(statusId).blockManagerId
val address = blockManagerId.hostPort
val executorId = blockManagerId.executorId
executorIdToAddress.put(executorId, address)
}
val executorIdToSummary = listener.stageIdToExecutorSummaries.get(stageId)
executorIdToSummary match {
case Some(x) => {
x.toSeq.sortBy(_._1).map{
case (k,v) => {
<tr>
<td>{k}</td>
<td>{executorIdToAddress.getOrElse(k, "CANNOT FIND ADDRESS")}</td>
<td>{parent.formatDuration(v.taskTime)}</td>
<td>{v.failedTasks + v.succeededTasks}</td>
<td>{v.failedTasks}</td>
<td>{v.succeededTasks}</td>
<td>{Utils.bytesToString(v.shuffleRead)}</td>
<td>{Utils.bytesToString(v.shuffleWrite)}</td>
</tr>
}
}
}
case _ => { Seq[Node]() }
}
}
}


@ -57,6 +57,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
val stageIdToTasksFailed = HashMap[Int, Int]()
val stageIdToTaskInfos =
HashMap[Int, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
val stageIdToExecutorSummaries = HashMap[Int, HashMap[String, ExecutorSummary]]()
override def onJobStart(jobStart: SparkListenerJobStart) {}
@ -124,8 +125,41 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
val sid = taskEnd.task.stageId
// create executor summary map if necessary
val executorSummaryMap = stageIdToExecutorSummaries.getOrElseUpdate(key = sid,
op = new HashMap[String, ExecutorSummary]())
executorSummaryMap.getOrElseUpdate(key = taskEnd.taskInfo.executorId,
op = new ExecutorSummary())
val executorSummary = executorSummaryMap.get(taskEnd.taskInfo.executorId)
executorSummary match {
case Some(y) => {
// first update failed-task, succeed-task
taskEnd.reason match {
case Success =>
y.succeededTasks += 1
case _ =>
y.failedTasks += 1
}
// update duration
y.taskTime += taskEnd.taskInfo.duration
taskEnd.taskMetrics.shuffleReadMetrics.foreach { shuffleRead =>
y.shuffleRead += shuffleRead.remoteBytesRead
}
taskEnd.taskMetrics.shuffleWriteMetrics.foreach { shuffleWrite =>
y.shuffleWrite += shuffleWrite.shuffleBytesWritten
}
}
case _ => {}
}
val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
tasksActive -= taskEnd.taskInfo
val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
taskEnd.reason match {
case e: ExceptionFailure =>


@ -66,7 +66,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
<div>
<ul class="unstyled">
<li>
<strong>Total duration across all tasks: </strong>
<strong>Total task time across all tasks: </strong>
{parent.formatDuration(listener.stageIdToTime.getOrElse(stageId, 0L) + activeTime)}
</li>
{if (hasShuffleRead)
@ -86,7 +86,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
val taskHeaders: Seq[String] =
Seq("Task Index", "Task ID", "Status", "Locality Level", "Executor", "Launch Time") ++
Seq("Duration", "GC Time") ++
Seq("Duration", "GC Time", "Result Ser Time") ++
{if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++
{if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++
Seq("Errors")
@ -101,6 +101,11 @@ private[spark] class StagePage(parent: JobProgressUI) {
None
}
else {
val serializationTimes = validTasks.map{case (info, metrics, exception) =>
metrics.get.resultSerializationTime.toDouble}
val serializationQuantiles = "Result serialization time" +: Distribution(serializationTimes).get.getQuantiles().map(
ms => parent.formatDuration(ms.toLong))
val serviceTimes = validTasks.map{case (info, metrics, exception) =>
metrics.get.executorRunTime.toDouble}
val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles().map(
@ -149,6 +154,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes)
val listings: Seq[Seq[String]] = Seq(
serializationQuantiles,
serviceQuantiles,
gettingResultQuantiles,
schedulerDelayQuantiles,
@ -160,11 +166,12 @@ private[spark] class StagePage(parent: JobProgressUI) {
def quantileRow(data: Seq[String]): Seq[Node] = <tr> {data.map(d => <td>{d}</td>)} </tr>
Some(listingTable(quantileHeaders, quantileRow, listings, fixedWidth = true))
}
val executorTable = new ExecutorTable(parent, stageId)
val content =
summary ++
<h4>Summary Metrics for {numCompleted} Completed Tasks</h4> ++
<div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++
<h4>Aggregated Metrics by Executors</h4> ++ executorTable.toNodeSeq() ++
<h4>Tasks</h4> ++ taskTable
headerSparkPage(content, parent.sc, "Details for Stage %d".format(stageId), Stages)
@ -183,6 +190,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
val formatDuration = if (info.status == "RUNNING") parent.formatDuration(duration)
else metrics.map(m => parent.formatDuration(m.executorRunTime)).getOrElse("")
val gcTime = metrics.map(m => m.jvmGCTime).getOrElse(0L)
val serializationTime = metrics.map(m => m.resultSerializationTime).getOrElse(0L)
val maybeShuffleRead = metrics.flatMap{m => m.shuffleReadMetrics}.map{s => s.remoteBytesRead}
val shuffleReadSortable = maybeShuffleRead.map(_.toString).getOrElse("")
@ -210,6 +218,9 @@ private[spark] class StagePage(parent: JobProgressUI) {
<td sorttable_customkey={gcTime.toString}>
{if (gcTime > 0) parent.formatDuration(gcTime) else ""}
</td>
<td sorttable_customkey={serializationTime.toString}>
{if (serializationTime > 0) parent.formatDuration(serializationTime) else ""}
</td>
{if (shuffleRead) {
<td sorttable_customkey={shuffleReadSortable}>
{shuffleReadReadable}


@ -48,7 +48,7 @@ private[spark] class StageTable(val stages: Seq[StageInfo], val parent: JobProgr
{if (isFairScheduler) {<th>Pool Name</th>} else {}}
<th>Description</th>
<th>Submitted</th>
<th>Duration</th>
<th>Task Time</th>
<th>Tasks: Succeeded/Total</th>
<th>Shuffle Read</th>
<th>Shuffle Write</th>


@ -28,9 +28,6 @@ import org.apache.spark.ui.JettyUtils._
/** Web UI showing storage status of all RDD's in the given SparkContext. */
private[spark] class BlockManagerUI(val sc: SparkContext) extends Logging {
implicit val timeout = Duration.create(
System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
val indexPage = new IndexPage(this)
val rddPage = new RDDPage(this)


@ -17,6 +17,8 @@
package org.apache.spark.util
import scala.concurrent.duration.{Duration, FiniteDuration}
import akka.actor.{ActorSystem, ExtendedActorSystem, IndestructibleActorSystem}
import com.typesafe.config.ConfigFactory
@ -84,4 +86,8 @@ private[spark] object AkkaUtils {
(actorSystem, boundPort)
}
/** Returns the default Spark timeout to use for Akka ask operations. */
def askTimeout: FiniteDuration = {
Duration.create(System.getProperty("spark.akka.askTimeout", "30").toLong, "seconds")
}
}
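Note: for reference, the ask-and-await pattern this helper standardizes across the call sites above; it mirrors Master.startSystemAndActor. The StatusRequest message, statusActor reference, and queryStatus name are hypothetical.

import scala.concurrent.Await
import akka.actor.ActorRef
import akka.pattern.ask
import org.apache.spark.util.AkkaUtils

case object StatusRequest  // hypothetical message

def queryStatus(statusActor: ActorRef): Any = {
  val timeout = AkkaUtils.askTimeout  // FiniteDuration from spark.akka.askTimeout, default 30 seconds
  val future = statusActor.ask(StatusRequest)(timeout)
  Await.result(future, timeout)
}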


@ -897,4 +897,37 @@ public class JavaAPISuite implements Serializable {
new Tuple2<Integer, Integer>(0, 4)), rdd3.collect());
}
@Test
public void collectPartitions() {
JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
JavaPairRDD<Integer, Integer> rdd2 = rdd1.map(new PairFunction<Integer, Integer, Integer>() {
@Override
public Tuple2<Integer, Integer> call(Integer i) throws Exception {
return new Tuple2<Integer, Integer>(i, i % 2);
}
});
List[] parts = rdd1.collectPartitions(new int[] {0});
Assert.assertEquals(Arrays.asList(1, 2), parts[0]);
parts = rdd1.collectPartitions(new int[] {1, 2});
Assert.assertEquals(Arrays.asList(3, 4), parts[0]);
Assert.assertEquals(Arrays.asList(5, 6, 7), parts[1]);
Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(1, 1),
new Tuple2<Integer, Integer>(2, 0)),
rdd2.collectPartitions(new int[] {0})[0]);
parts = rdd2.collectPartitions(new int[] {1, 2});
Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(3, 1),
new Tuple2<Integer, Integer>(4, 0)),
parts[0]);
Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(5, 1),
new Tuple2<Integer, Integer>(6, 0),
new Tuple2<Integer, Integer>(7, 1)),
parts[1]);
}
}


@ -313,6 +313,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo
}
def createTaskResult(id: Int): DirectTaskResult[Int] = {
new DirectTaskResult[Int](id, mutable.Map.empty, new TaskMetrics)
val valueSer = SparkEnv.get.serializer.newInstance()
new DirectTaskResult[Int](valueSer.serialize(id), mutable.Map.empty, new TaskMetrics)
}
}


@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.ui.jobs
import org.scalatest.FunSuite
import org.apache.spark.scheduler._
import org.apache.spark.{LocalSparkContext, SparkContext, Success}
import org.apache.spark.scheduler.SparkListenerTaskStart
import org.apache.spark.executor.{ShuffleReadMetrics, TaskMetrics}
class JobProgressListenerSuite extends FunSuite with LocalSparkContext {
test("test executor id to summary") {
val sc = new SparkContext("local", "test")
val listener = new JobProgressListener(sc)
val taskMetrics = new TaskMetrics()
val shuffleReadMetrics = new ShuffleReadMetrics()
// nothing in it
assert(listener.stageIdToExecutorSummaries.size == 0)
// finish this task, should get updated shuffleRead
shuffleReadMetrics.remoteBytesRead = 1000
taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
var taskInfo = new TaskInfo(1234L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)
taskInfo.finishTime = 1
listener.onTaskEnd(new SparkListenerTaskEnd(
new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics))
assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-1", fail())
.shuffleRead == 1000)
// finish a task with unknown executor-id, nothing should happen
taskInfo = new TaskInfo(1234L, 0, 1000L, "exe-unknown", "host1", TaskLocality.NODE_LOCAL)
taskInfo.finishTime = 1
listener.onTaskEnd(new SparkListenerTaskEnd(
new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics))
assert(listener.stageIdToExecutorSummaries.size == 1)
// finish this task, should get updated duration
shuffleReadMetrics.remoteBytesRead = 1000
taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
taskInfo = new TaskInfo(1235L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)
taskInfo.finishTime = 1
listener.onTaskEnd(new SparkListenerTaskEnd(
new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics))
assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-1", fail())
.shuffleRead == 2000)
// finish this task, should get updated duration
shuffleReadMetrics.remoteBytesRead = 1000
taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
taskInfo = new TaskInfo(1236L, 0, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL)
taskInfo.finishTime = 1
listener.onTaskEnd(new SparkListenerTaskEnd(
new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics))
assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-2", fail())
.shuffleRead == 1000)
}
}


@ -114,6 +114,9 @@ object SparkBuild extends Build {
fork := true,
javaOptions += "-Xmx3g",
// Show full stack trace and duration in test cases.
testOptions in Test += Tests.Argument("-oDF"),
// Only allow one test at a time, even across projects, since they run in the same JVM
concurrentRestrictions in Global += Tags.limit(Tags.Test, 1),
@ -177,6 +180,8 @@ object SparkBuild extends Build {
libraryDependencies ++= Seq(
"io.netty" % "netty-all" % "4.0.0.CR1",
"org.eclipse.jetty" % "jetty-server" % "7.6.8.v20121106",
/** Workaround for SPARK-959. Dependency used by org.eclipse.jetty. Fixed in ivy 2.3.0. */
"org.eclipse.jetty.orbit" % "javax.servlet" % "2.5.0.v201103041518" artifacts Artifact("javax.servlet", "jar", "jar"),
"org.scalatest" %% "scalatest" % "1.9.1" % "test",
"org.scalacheck" %% "scalacheck" % "1.10.0" % "test",
"com.novocode" % "junit-interface" % "0.9" % "test",
@ -257,7 +262,7 @@ object SparkBuild extends Build {
libraryDependencies <+= scalaVersion(v => "org.scala-lang" % "scala-reflect" % v )
)
def examplesSettings = sharedSettings ++ Seq(
name := "spark-examples",
libraryDependencies ++= Seq(


@ -43,7 +43,6 @@ class SparkContext(object):
_gateway = None
_jvm = None
_writeToFile = None
_takePartition = None
_next_accum_id = 0
_active_spark_context = None
_lock = Lock()
@ -134,8 +133,6 @@ class SparkContext(object):
SparkContext._jvm = SparkContext._gateway.jvm
SparkContext._writeToFile = \
SparkContext._jvm.PythonRDD.writeToFile
SparkContext._takePartition = \
SparkContext._jvm.PythonRDD.takePartition
if instance:
if SparkContext._active_spark_context and SparkContext._active_spark_context != instance:


@ -54,6 +54,9 @@ class RDD(object):
self.ctx = ctx
self._jrdd_deserializer = jrdd_deserializer
def __repr__(self):
return self._jrdd.toString()
@property
def context(self):
"""
@ -576,8 +579,13 @@ class RDD(object):
# Take only up to num elements from each partition we try
mapped = self.mapPartitions(takeUpToNum)
items = []
# TODO(shivaram): Similar to the scala implementation, update the take
# method to scan multiple splits based on an estimate of how many elements
# we have per-split.
for partition in range(mapped._jrdd.splits().size()):
iterator = self.ctx._takePartition(mapped._jrdd.rdd(), partition)
partitionsToTake = self.ctx._gateway.new_array(self.ctx._jvm.int, 1)
partitionsToTake[0] = partition
iterator = mapped._jrdd.collectPartitions(partitionsToTake)[0].iterator()
items.extend(mapped._collect_iterator_through_file(iterator))
if len(items) >= num:
break


@ -17,6 +17,11 @@
# limitations under the License.
#
cygwin=false
case "`uname`" in
CYGWIN*) cygwin=true;;
esac
SCALA_VERSION=2.10
# Figure out where the Scala framework is installed
@ -59,6 +64,11 @@ fi
CLASSPATH=`$FWDIR/bin/compute-classpath.sh`
CLASSPATH="$SPARK_EXAMPLES_JAR:$CLASSPATH"
if $cygwin; then
CLASSPATH=`cygpath -wp $CLASSPATH`
export SPARK_EXAMPLES_JAR=`cygpath -w $SPARK_EXAMPLES_JAR`
fi
# Find java binary
if [ -n "${JAVA_HOME}" ]; then
RUNNER="${JAVA_HOME}/bin/java"

sbt/sbt

@ -17,12 +17,27 @@
# limitations under the License.
#
EXTRA_ARGS=""
cygwin=false
case "`uname`" in
CYGWIN*) cygwin=true;;
esac
EXTRA_ARGS="-Xmx1200m -XX:MaxPermSize=350m -XX:ReservedCodeCacheSize=256m"
if [ "$MESOS_HOME" != "" ]; then
EXTRA_ARGS="-Djava.library.path=$MESOS_HOME/lib/java"
EXTRA_ARGS="$EXTRA_ARGS -Djava.library.path=$MESOS_HOME/lib/java"
fi
export SPARK_HOME=$(cd "$(dirname $0)/.." 2>&1 >/dev/null ; pwd)
export SPARK_TESTING=1 # To put test classes on classpath
java -Xmx1200m -XX:MaxPermSize=350m -XX:ReservedCodeCacheSize=256m $EXTRA_ARGS $SBT_OPTS -jar "$SPARK_HOME"/sbt/sbt-launch-*.jar "$@"
SBT_JAR="$SPARK_HOME"/sbt/sbt-launch-*.jar
if $cygwin; then
SBT_JAR=`cygpath -w $SBT_JAR`
export SPARK_HOME=`cygpath -w $SPARK_HOME`
EXTRA_ARGS="$EXTRA_ARGS -Djline.terminal=jline.UnixTerminal -Dsbt.cygwin=true"
stty -icanon min 1 -echo > /dev/null 2>&1
java $EXTRA_ARGS $SBT_OPTS -jar $SBT_JAR "$@"
stty icanon echo > /dev/null 2>&1
else
java $EXTRA_ARGS $SBT_OPTS -jar $SBT_JAR "$@"
fi


@ -17,6 +17,11 @@
# limitations under the License.
#
cygwin=false
case "`uname`" in
CYGWIN*) cygwin=true;;
esac
SCALA_VERSION=2.10
# Figure out where the Scala framework is installed
@ -125,6 +130,11 @@ fi
# Compute classpath using external script
CLASSPATH=`$FWDIR/bin/compute-classpath.sh`
CLASSPATH="$SPARK_TOOLS_JAR:$CLASSPATH"
if $cygwin; then
CLASSPATH=`cygpath -wp $CLASSPATH`
export SPARK_TOOLS_JAR=`cygpath -w $SPARK_TOOLS_JAR`
fi
export CLASSPATH
if [ "$SPARK_PRINT_LAUNCH_COMMAND" == "1" ]; then


@ -23,7 +23,11 @@
# if those two env vars are set in spark-env.sh but MASTER is not.
# Options:
# -c <cores> Set the number of cores for REPL to use
#
cygwin=false
case "`uname`" in
CYGWIN*) cygwin=true;;
esac
# Enter posix mode for bash
set -o posix
@ -79,7 +83,18 @@ if [[ ! $? ]]; then
saved_stty=""
fi
$FWDIR/spark-class $OPTIONS org.apache.spark.repl.Main "$@"
if $cygwin; then
# Workaround for issue involving JLine and Cygwin
# (see http://sourceforge.net/p/jline/bugs/40/).
# If you're using the Mintty terminal emulator in Cygwin, may need to set the
# "Backspace sends ^H" setting in "Keys" section of the Mintty options
# (see https://github.com/sbt/sbt/issues/562).
stty -icanon min 1 -echo > /dev/null 2>&1
$FWDIR/spark-class -Djline.terminal=unix $OPTIONS org.apache.spark.repl.Main "$@"
stty icanon echo > /dev/null 2>&1
else
$FWDIR/spark-class $OPTIONS org.apache.spark.repl.Main "$@"
fi
# record the exit status lest it be overwritten:
# then reenable echo and propagate the code.