[SPARK-7927] whitespace fixes for core.
So we can enable a whitespace enforcement rule in the style checker to save code review time.

Author: Reynold Xin <rxin@databricks.com>

Closes #6473 from rxin/whitespace-core and squashes the following commits:

058195d [Reynold Xin] Fixed tests.
fce11e9 [Reynold Xin] [SPARK-7927] whitespace fixes for core.
This commit is contained in:
parent
8da560d7de
commit
7f7505d8db
|
@ -228,7 +228,7 @@ GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializa
|
|||
* @tparam T result type
|
||||
*/
|
||||
class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T], name: Option[String])
|
||||
extends Accumulable[T,T](initialValue, param, name) {
|
||||
extends Accumulable[T, T](initialValue, param, name) {
|
||||
|
||||
def this(initialValue: T, param: AccumulatorParam[T]) = this(initialValue, param, None)
|
||||
}
|
||||
|
|
|
@ -45,7 +45,7 @@ case class Aggregator[K, V, C] (
|
|||
def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]],
|
||||
context: TaskContext): Iterator[(K, C)] = {
|
||||
if (!isSpillEnabled) {
|
||||
val combiners = new AppendOnlyMap[K,C]
|
||||
val combiners = new AppendOnlyMap[K, C]
|
||||
var kv: Product2[K, V] = null
|
||||
val update = (hadValue: Boolean, oldValue: C) => {
|
||||
if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
|
||||
|
@ -76,7 +76,7 @@ case class Aggregator[K, V, C] (
|
|||
: Iterator[(K, C)] =
|
||||
{
|
||||
if (!isSpillEnabled) {
|
||||
val combiners = new AppendOnlyMap[K,C]
|
||||
val combiners = new AppendOnlyMap[K, C]
|
||||
var kc: Product2[K, C] = null
|
||||
val update = (hadValue: Boolean, oldValue: C) => {
|
||||
if (hadValue) mergeCombiners(oldValue, kc._2) else kc._2
|
||||
|
|
|
@ -103,7 +103,7 @@ class HashPartitioner(partitions: Int) extends Partitioner {
|
|||
*/
|
||||
class RangePartitioner[K : Ordering : ClassTag, V](
|
||||
@transient partitions: Int,
|
||||
@transient rdd: RDD[_ <: Product2[K,V]],
|
||||
@transient rdd: RDD[_ <: Product2[K, V]],
|
||||
private var ascending: Boolean = true)
|
||||
extends Partitioner {
|
||||
|
||||
|
@ -185,7 +185,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
|
|||
}
|
||||
|
||||
override def equals(other: Any): Boolean = other match {
|
||||
case r: RangePartitioner[_,_] =>
|
||||
case r: RangePartitioner[_, _] =>
|
||||
r.rangeBounds.sameElements(rangeBounds) && r.ascending == ascending
|
||||
case _ =>
|
||||
false
|
||||
|
@ -249,7 +249,7 @@ private[spark] object RangePartitioner {
|
|||
* @param sampleSizePerPartition max sample size per partition
|
||||
* @return (total number of items, an array of (partitionId, number of items, sample))
|
||||
*/
|
||||
def sketch[K:ClassTag](
|
||||
def sketch[K : ClassTag](
|
||||
rdd: RDD[K],
|
||||
sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = {
|
||||
val shift = rdd.id
|
||||
|
@ -272,7 +272,7 @@ private[spark] object RangePartitioner {
|
|||
* @param partitions number of partitions
|
||||
* @return selected bounds
|
||||
*/
|
||||
def determineBounds[K:Ordering:ClassTag](
|
||||
def determineBounds[K : Ordering : ClassTag](
|
||||
candidates: ArrayBuffer[(K, Float)],
|
||||
partitions: Int): Array[K] = {
|
||||
val ordering = implicitly[Ordering[K]]
|
||||
|
|
|
@ -481,7 +481,7 @@ private[spark] object SparkConf extends Logging {
|
|||
"are no longer accepted. To specify the equivalent now, one may use '64k'.")
|
||||
)
|
||||
|
||||
Map(configs.map { cfg => (cfg.key -> cfg) }:_*)
|
||||
Map(configs.map { cfg => (cfg.key -> cfg) } : _*)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -389,7 +389,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
|
|||
|
||||
_conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)
|
||||
|
||||
_jars =_conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten
|
||||
_jars = _conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten
|
||||
_files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.size != 0))
|
||||
.toSeq.flatten
|
||||
|
||||
|
@ -438,7 +438,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
|
|||
_ui =
|
||||
if (conf.getBoolean("spark.ui.enabled", true)) {
|
||||
Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,
|
||||
_env.securityManager,appName, startTime = startTime))
|
||||
_env.securityManager, appName, startTime = startTime))
|
||||
} else {
|
||||
// For tests, do not enable the UI
|
||||
None
|
||||
|
@ -917,7 +917,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
|
|||
classOf[FixedLengthBinaryInputFormat],
|
||||
classOf[LongWritable],
|
||||
classOf[BytesWritable],
|
||||
conf=conf)
|
||||
conf = conf)
|
||||
val data = br.map { case (k, v) =>
|
||||
val bytes = v.getBytes
|
||||
assert(bytes.length == recordLength, "Byte array does not have correct length")
|
||||
|
@ -1267,7 +1267,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
|
|||
*/
|
||||
def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable: ClassTag, T]
|
||||
(initialValue: R): Accumulable[R, T] = {
|
||||
val param = new GrowableAccumulableParam[R,T]
|
||||
val param = new GrowableAccumulableParam[R, T]
|
||||
val acc = new Accumulable(initialValue, param)
|
||||
cleaner.foreach(_.registerAccumulatorForCleanup(acc))
|
||||
acc
|
||||
|
@ -1316,7 +1316,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
|
|||
val uri = new URI(path)
|
||||
val schemeCorrectedPath = uri.getScheme match {
|
||||
case null | "local" => new File(path).getCanonicalFile.toURI.toString
|
||||
case _ => path
|
||||
case _ => path
|
||||
}
|
||||
|
||||
val hadoopPath = new Path(schemeCorrectedPath)
|
||||
|
|
|
@ -298,7 +298,7 @@ object SparkEnv extends Logging {
|
|||
}
|
||||
}
|
||||
|
||||
val mapOutputTracker = if (isDriver) {
|
||||
val mapOutputTracker = if (isDriver) {
|
||||
new MapOutputTrackerMaster(conf)
|
||||
} else {
|
||||
new MapOutputTrackerWorker(conf)
|
||||
|
@ -348,7 +348,7 @@ object SparkEnv extends Logging {
|
|||
val fileServerPort = conf.getInt("spark.fileserver.port", 0)
|
||||
val server = new HttpFileServer(conf, securityManager, fileServerPort)
|
||||
server.initialize()
|
||||
conf.set("spark.fileserver.uri", server.serverUri)
|
||||
conf.set("spark.fileserver.uri", server.serverUri)
|
||||
server
|
||||
} else {
|
||||
null
|
||||
|
|
|
@ -50,8 +50,8 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
|
|||
private var jID: SerializableWritable[JobID] = null
|
||||
private var taID: SerializableWritable[TaskAttemptID] = null
|
||||
|
||||
@transient private var writer: RecordWriter[AnyRef,AnyRef] = null
|
||||
@transient private var format: OutputFormat[AnyRef,AnyRef] = null
|
||||
@transient private var writer: RecordWriter[AnyRef, AnyRef] = null
|
||||
@transient private var format: OutputFormat[AnyRef, AnyRef] = null
|
||||
@transient private var committer: OutputCommitter = null
|
||||
@transient private var jobContext: JobContext = null
|
||||
@transient private var taskContext: TaskAttemptContext = null
|
||||
|
@ -114,10 +114,10 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
|
|||
|
||||
// ********* Private Functions *********
|
||||
|
||||
private def getOutputFormat(): OutputFormat[AnyRef,AnyRef] = {
|
||||
private def getOutputFormat(): OutputFormat[AnyRef, AnyRef] = {
|
||||
if (format == null) {
|
||||
format = conf.value.getOutputFormat()
|
||||
.asInstanceOf[OutputFormat[AnyRef,AnyRef]]
|
||||
.asInstanceOf[OutputFormat[AnyRef, AnyRef]]
|
||||
}
|
||||
format
|
||||
}
|
||||
|
@ -138,7 +138,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
|
|||
|
||||
private def getTaskContext(): TaskAttemptContext = {
|
||||
if (taskContext == null) {
|
||||
taskContext = newTaskAttemptContext(conf.value, taID.value)
|
||||
taskContext = newTaskAttemptContext(conf.value, taID.value)
|
||||
}
|
||||
taskContext
|
||||
}
|
||||
|
|
|
@ -96,7 +96,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
|
|||
def mapPartitionsWithIndex[R](
|
||||
f: JFunction2[jl.Integer, java.util.Iterator[T], java.util.Iterator[R]],
|
||||
preservesPartitioning: Boolean = false): JavaRDD[R] =
|
||||
new JavaRDD(rdd.mapPartitionsWithIndex(((a,b) => f(a,asJavaIterator(b))),
|
||||
new JavaRDD(rdd.mapPartitionsWithIndex(((a, b) => f(a, asJavaIterator(b))),
|
||||
preservesPartitioning)(fakeClassTag))(fakeClassTag)
|
||||
|
||||
/**
|
||||
|
|
|
@ -723,7 +723,7 @@ private[spark] object PythonRDD extends Logging {
|
|||
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
|
||||
new JavaToWritableConverter)
|
||||
val fc = Utils.classForName(outputFormatClass).asInstanceOf[Class[F]]
|
||||
converted.saveAsHadoopFile(path, kc, vc, fc, new JobConf(mergedConf), codec=codec)
|
||||
converted.saveAsHadoopFile(path, kc, vc, fc, new JobConf(mergedConf), codec = codec)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -124,7 +124,7 @@ private[r] class RBackendHandler(server: RBackend)
|
|||
}
|
||||
throw new Exception(s"No matched method found for $cls.$methodName")
|
||||
}
|
||||
val ret = methods.head.invoke(obj, args:_*)
|
||||
val ret = methods.head.invoke(obj, args : _*)
|
||||
|
||||
// Write status bit
|
||||
writeInt(dos, 0)
|
||||
|
@ -135,7 +135,7 @@ private[r] class RBackendHandler(server: RBackend)
|
|||
matchMethod(numArgs, args, x.getParameterTypes)
|
||||
}.head
|
||||
|
||||
val obj = ctor.newInstance(args:_*)
|
||||
val obj = ctor.newInstance(args : _*)
|
||||
|
||||
writeInt(dos, 0)
|
||||
writeObject(dos, obj.asInstanceOf[AnyRef])
|
||||
|
|
|
@ -309,7 +309,7 @@ private class StringRRDD[T: ClassTag](
|
|||
}
|
||||
|
||||
private object SpecialLengths {
|
||||
val TIMING_DATA = -1
|
||||
val TIMING_DATA = -1
|
||||
}
|
||||
|
||||
private[r] class BufferedStreamThread(
|
||||
|
|
|
@ -125,7 +125,7 @@ private[broadcast] object HttpBroadcast extends Logging {
|
|||
securityManager = securityMgr
|
||||
if (isDriver) {
|
||||
createServer(conf)
|
||||
conf.set("spark.httpBroadcast.uri", serverUri)
|
||||
conf.set("spark.httpBroadcast.uri", serverUri)
|
||||
}
|
||||
serverUri = conf.get("spark.httpBroadcast.uri")
|
||||
cleaner = new MetadataCleaner(MetadataCleanerType.HTTP_BROADCAST, cleanup, conf)
|
||||
|
@ -187,7 +187,7 @@ private[broadcast] object HttpBroadcast extends Logging {
|
|||
}
|
||||
|
||||
private def read[T: ClassTag](id: Long): T = {
|
||||
logDebug("broadcast read server: " + serverUri + " id: broadcast-" + id)
|
||||
logDebug("broadcast read server: " + serverUri + " id: broadcast-" + id)
|
||||
val url = serverUri + "/" + BroadcastBlockId(id).name
|
||||
|
||||
var uc: URLConnection = null
|
||||
|
|
|
@ -65,7 +65,7 @@ private object FaultToleranceTest extends App with Logging {
|
|||
private val workers = ListBuffer[TestWorkerInfo]()
|
||||
private var sc: SparkContext = _
|
||||
|
||||
private val zk = SparkCuratorUtil.newClient(conf)
|
||||
private val zk = SparkCuratorUtil.newClient(conf)
|
||||
|
||||
private var numPassed = 0
|
||||
private var numFailed = 0
|
||||
|
|
|
@ -361,7 +361,7 @@ object SparkSubmit {
|
|||
pyArchives = pythonPath.mkString(",")
|
||||
}
|
||||
|
||||
pyArchives = pyArchives.split(",").map { localPath=>
|
||||
pyArchives = pyArchives.split(",").map { localPath =>
|
||||
val localURI = Utils.resolveURI(localPath)
|
||||
if (localURI.getScheme != "local") {
|
||||
args.files = mergeFileLists(args.files, localURI.toString)
|
||||
|
|
|
@ -554,7 +554,7 @@ private[deploy] object Worker extends Logging {
|
|||
conf = conf, securityManager = securityMgr)
|
||||
val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem)))
|
||||
actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,
|
||||
masterAkkaUrls, systemName, actorName, workDir, conf, securityMgr), name = actorName)
|
||||
masterAkkaUrls, systemName, actorName, workDir, conf, securityMgr), name = actorName)
|
||||
(actorSystem, boundPort)
|
||||
}
|
||||
|
||||
|
|
|
@ -261,7 +261,7 @@ case class InputMetrics(readMethod: DataReadMethod.Value) {
|
|||
*/
|
||||
private var _recordsRead: Long = _
|
||||
def recordsRead: Long = _recordsRead
|
||||
def incRecordsRead(records: Long): Unit = _recordsRead += records
|
||||
def incRecordsRead(records: Long): Unit = _recordsRead += records
|
||||
|
||||
/**
|
||||
* Invoke the bytesReadCallback and mutate bytesRead.
|
||||
|
|
|
@ -60,7 +60,7 @@ trait SparkHadoopMapReduceUtil {
|
|||
val taskTypeClass = Class.forName("org.apache.hadoop.mapreduce.TaskType")
|
||||
.asInstanceOf[Class[Enum[_]]]
|
||||
val taskType = taskTypeClass.getMethod("valueOf", classOf[String]).invoke(
|
||||
taskTypeClass, if(isMap) "MAP" else "REDUCE")
|
||||
taskTypeClass, if (isMap) "MAP" else "REDUCE")
|
||||
val ctor = klass.getDeclaredConstructor(classOf[String], classOf[Int], taskTypeClass,
|
||||
classOf[Int], classOf[Int])
|
||||
ctor.newInstance(jtIdentifier, new JInteger(jobId), taskType, new JInteger(taskId),
|
||||
|
|
|
@ -110,7 +110,7 @@ private[nio] class BlockMessage() {
|
|||
def getType: Int = typ
|
||||
def getId: BlockId = id
|
||||
def getData: ByteBuffer = data
|
||||
def getLevel: StorageLevel = level
|
||||
def getLevel: StorageLevel = level
|
||||
|
||||
def toBufferMessage: BufferMessage = {
|
||||
val buffers = new ArrayBuffer[ByteBuffer]()
|
||||
|
|
|
@ -114,8 +114,8 @@ private[nio] object BlockMessageArray {
|
|||
val blockMessages =
|
||||
(0 until 10).map { i =>
|
||||
if (i % 2 == 0) {
|
||||
val buffer = ByteBuffer.allocate(100)
|
||||
buffer.clear
|
||||
val buffer = ByteBuffer.allocate(100)
|
||||
buffer.clear()
|
||||
BlockMessage.fromPutBlock(PutBlock(TestBlockId(i.toString), buffer,
|
||||
StorageLevel.MEMORY_ONLY_SER))
|
||||
} else {
|
||||
|
|
|
@ -75,7 +75,7 @@ private[nio] class SecurityMessage extends Logging {
|
|||
for (i <- 1 to idLength) {
|
||||
idBuilder += buffer.getChar()
|
||||
}
|
||||
connectionId = idBuilder.toString()
|
||||
connectionId = idBuilder.toString()
|
||||
|
||||
val tokenLength = buffer.getInt()
|
||||
token = new Array[Byte](tokenLength)
|
||||
|
|
|
@ -32,12 +32,12 @@ import org.apache.spark.util.collection.OpenHashMap
|
|||
* An ApproximateEvaluator for counts by key. Returns a map of key to confidence interval.
|
||||
*/
|
||||
private[spark] class GroupedCountEvaluator[T : ClassTag](totalOutputs: Int, confidence: Double)
|
||||
extends ApproximateEvaluator[OpenHashMap[T,Long], Map[T, BoundedDouble]] {
|
||||
extends ApproximateEvaluator[OpenHashMap[T, Long], Map[T, BoundedDouble]] {
|
||||
|
||||
var outputsMerged = 0
|
||||
var sums = new OpenHashMap[T,Long]() // Sum of counts for each key
|
||||
var sums = new OpenHashMap[T, Long]() // Sum of counts for each key
|
||||
|
||||
override def merge(outputId: Int, taskResult: OpenHashMap[T,Long]) {
|
||||
override def merge(outputId: Int, taskResult: OpenHashMap[T, Long]) {
|
||||
outputsMerged += 1
|
||||
taskResult.foreach { case (key, value) =>
|
||||
sums.changeValue(key, value, _ + value)
|
||||
|
|
|
@ -49,7 +49,7 @@ class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
|
|||
if (fs.exists(cpath)) {
|
||||
val dirContents = fs.listStatus(cpath).map(_.getPath)
|
||||
val partitionFiles = dirContents.filter(_.getName.startsWith("part-")).map(_.toString).sorted
|
||||
val numPart = partitionFiles.length
|
||||
val numPart = partitionFiles.length
|
||||
if (numPart > 0 && (! partitionFiles(0).endsWith(CheckpointRDD.splitIdToFile(0)) ||
|
||||
! partitionFiles(numPart-1).endsWith(CheckpointRDD.splitIdToFile(numPart-1)))) {
|
||||
throw new SparkException("Invalid checkpoint directory: " + checkpointPath)
|
||||
|
|
|
@ -310,11 +310,11 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack:
|
|||
def throwBalls() {
|
||||
if (noLocality) { // no preferredLocations in parent RDD, no randomization needed
|
||||
if (maxPartitions > groupArr.size) { // just return prev.partitions
|
||||
for ((p,i) <- prev.partitions.zipWithIndex) {
|
||||
for ((p, i) <- prev.partitions.zipWithIndex) {
|
||||
groupArr(i).arr += p
|
||||
}
|
||||
} else { // no locality available, then simply split partitions based on positions in array
|
||||
for(i <- 0 until maxPartitions) {
|
||||
for (i <- 0 until maxPartitions) {
|
||||
val rangeStart = ((i.toLong * prev.partitions.length) / maxPartitions).toInt
|
||||
val rangeEnd = (((i.toLong + 1) * prev.partitions.length) / maxPartitions).toInt
|
||||
(rangeStart until rangeEnd).foreach{ j => groupArr(i).arr += prev.partitions(j) }
|
||||
|
|
|
@ -467,7 +467,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
|
|||
val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
|
||||
val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
|
||||
val bufs = combineByKey[CompactBuffer[V]](
|
||||
createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine=false)
|
||||
createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
|
||||
bufs.asInstanceOf[RDD[(K, Iterable[V])]]
|
||||
}
|
||||
|
||||
|
@ -1011,7 +1011,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
|
|||
jobFormat.checkOutputSpecs(job)
|
||||
}
|
||||
|
||||
val writeShard = (context: TaskContext, iter: Iterator[(K,V)]) => {
|
||||
val writeShard = (context: TaskContext, iter: Iterator[(K, V)]) => {
|
||||
val config = wrappedConf.value
|
||||
/* "reduce task" <split #> <attempt # = spark task #> */
|
||||
val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
|
||||
|
@ -1027,7 +1027,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
|
|||
|
||||
val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context)
|
||||
|
||||
val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
|
||||
val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K, V]]
|
||||
require(writer != null, "Unable to obtain RecordWriter")
|
||||
var recordsWritten = 0L
|
||||
Utils.tryWithSafeFinally {
|
||||
|
|
|
@ -454,7 +454,7 @@ abstract class RDD[T: ClassTag](
|
|||
withReplacement: Boolean,
|
||||
num: Int,
|
||||
seed: Long = Utils.random.nextLong): Array[T] = {
|
||||
val numStDev = 10.0
|
||||
val numStDev = 10.0
|
||||
|
||||
if (num < 0) {
|
||||
throw new IllegalArgumentException("Negative number of elements requested")
|
||||
|
@ -1138,8 +1138,8 @@ abstract class RDD[T: ClassTag](
|
|||
if (elementClassTag.runtimeClass.isArray) {
|
||||
throw new SparkException("countByValueApprox() does not support arrays")
|
||||
}
|
||||
val countPartition: (TaskContext, Iterator[T]) => OpenHashMap[T,Long] = { (ctx, iter) =>
|
||||
val map = new OpenHashMap[T,Long]
|
||||
val countPartition: (TaskContext, Iterator[T]) => OpenHashMap[T, Long] = { (ctx, iter) =>
|
||||
val map = new OpenHashMap[T, Long]
|
||||
iter.foreach {
|
||||
t => map.changeValue(t, 1L, _ + 1L)
|
||||
}
|
||||
|
@ -1585,15 +1585,15 @@ abstract class RDD[T: ClassTag](
|
|||
case 0 => Seq.empty
|
||||
case 1 =>
|
||||
val d = rdd.dependencies.head
|
||||
debugString(d.rdd, prefix, d.isInstanceOf[ShuffleDependency[_,_,_]], true)
|
||||
debugString(d.rdd, prefix, d.isInstanceOf[ShuffleDependency[_, _, _]], true)
|
||||
case _ =>
|
||||
val frontDeps = rdd.dependencies.take(len - 1)
|
||||
val frontDepStrings = frontDeps.flatMap(
|
||||
d => debugString(d.rdd, prefix, d.isInstanceOf[ShuffleDependency[_,_,_]]))
|
||||
d => debugString(d.rdd, prefix, d.isInstanceOf[ShuffleDependency[_, _, _]]))
|
||||
|
||||
val lastDep = rdd.dependencies.last
|
||||
val lastDepStrings =
|
||||
debugString(lastDep.rdd, prefix, lastDep.isInstanceOf[ShuffleDependency[_,_,_]], true)
|
||||
debugString(lastDep.rdd, prefix, lastDep.isInstanceOf[ShuffleDependency[_, _, _]], true)
|
||||
|
||||
(frontDepStrings ++ lastDepStrings)
|
||||
}
|
||||
|
|
|
@ -104,13 +104,13 @@ class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag
|
|||
if (!convertKey && !convertValue) {
|
||||
self.saveAsHadoopFile(path, keyWritableClass, valueWritableClass, format, jobConf, codec)
|
||||
} else if (!convertKey && convertValue) {
|
||||
self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile(
|
||||
self.map(x => (x._1, anyToWritable(x._2))).saveAsHadoopFile(
|
||||
path, keyWritableClass, valueWritableClass, format, jobConf, codec)
|
||||
} else if (convertKey && !convertValue) {
|
||||
self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile(
|
||||
self.map(x => (anyToWritable(x._1), x._2)).saveAsHadoopFile(
|
||||
path, keyWritableClass, valueWritableClass, format, jobConf, codec)
|
||||
} else if (convertKey && convertValue) {
|
||||
self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile(
|
||||
self.map(x => (anyToWritable(x._1), anyToWritable(x._2))).saveAsHadoopFile(
|
||||
path, keyWritableClass, valueWritableClass, format, jobConf, codec)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -125,7 +125,7 @@ private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag](
|
|||
integrate(0, t => getSeq(t._1) += t._2)
|
||||
// the second dep is rdd2; remove all of its keys
|
||||
integrate(1, t => map.remove(t._1))
|
||||
map.iterator.map { t => t._2.iterator.map { (t._1, _) } }.flatten
|
||||
map.iterator.map { t => t._2.iterator.map { (t._1, _) } }.flatten
|
||||
}
|
||||
|
||||
override def clearDependencies() {
|
||||
|
|
|
@ -123,7 +123,7 @@ private[spark] class ZippedPartitionsRDD3
|
|||
}
|
||||
|
||||
private[spark] class ZippedPartitionsRDD4
|
||||
[A: ClassTag, B: ClassTag, C: ClassTag, D:ClassTag, V: ClassTag](
|
||||
[A: ClassTag, B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag](
|
||||
sc: SparkContext,
|
||||
var f: (Iterator[A], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V],
|
||||
var rdd1: RDD[A],
|
||||
|
|
|
@ -1367,10 +1367,10 @@ class DAGScheduler(
|
|||
private def getPreferredLocsInternal(
|
||||
rdd: RDD[_],
|
||||
partition: Int,
|
||||
visited: HashSet[(RDD[_],Int)]): Seq[TaskLocation] = {
|
||||
visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
|
||||
// If the partition has already been visited, no need to re-visit.
|
||||
// This avoids exponential path exploration. SPARK-695
|
||||
if (!visited.add((rdd,partition))) {
|
||||
if (!visited.add((rdd, partition))) {
|
||||
// Nil has already been returned for previously visited partitions.
|
||||
return Nil
|
||||
}
|
||||
|
|
|
@ -17,9 +17,8 @@
|
|||
|
||||
package org.apache.spark.scheduler
|
||||
|
||||
import com.codahale.metrics.{Gauge,MetricRegistry}
|
||||
import com.codahale.metrics.{Gauge, MetricRegistry}
|
||||
|
||||
import org.apache.spark.SparkContext
|
||||
import org.apache.spark.metrics.source.Source
|
||||
|
||||
private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler)
|
||||
|
|
|
@ -56,7 +56,7 @@ private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
|
|||
val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble
|
||||
val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
|
||||
val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
|
||||
var compare:Int = 0
|
||||
var compare: Int = 0
|
||||
|
||||
if (s1Needy && !s2Needy) {
|
||||
return true
|
||||
|
|
|
@ -270,7 +270,7 @@ class StatsReportListener extends SparkListener with Logging {
|
|||
private[spark] object StatsReportListener extends Logging {
|
||||
|
||||
// For profiling, the extremes are more interesting
|
||||
val percentiles = Array[Int](0,5,10,25,50,75,90,95,100)
|
||||
val percentiles = Array[Int](0, 5, 10, 25, 50, 75, 90, 95, 100)
|
||||
val probabilities = percentiles.map(_ / 100.0)
|
||||
val percentilesHeader = "\t" + percentiles.mkString("%\t") + "%"
|
||||
|
||||
|
@ -304,7 +304,7 @@ private[spark] object StatsReportListener extends Logging {
|
|||
dOpt.foreach { d => showDistribution(heading, d, formatNumber)}
|
||||
}
|
||||
|
||||
def showDistribution(heading: String, dOpt: Option[Distribution], format:String) {
|
||||
def showDistribution(heading: String, dOpt: Option[Distribution], format: String) {
|
||||
def f(d: Double): String = format.format(d)
|
||||
showDistribution(heading, dOpt, f _)
|
||||
}
|
||||
|
@ -318,7 +318,7 @@ private[spark] object StatsReportListener extends Logging {
|
|||
}
|
||||
|
||||
def showBytesDistribution(
|
||||
heading:String,
|
||||
heading: String,
|
||||
getMetric: (TaskInfo, TaskMetrics) => Option[Long],
|
||||
taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)]) {
|
||||
showBytesDistribution(heading, extractLongDistribution(taskInfoMetrics, getMetric))
|
||||
|
|
|
@ -781,10 +781,10 @@ private[spark] class TaskSetManager(
|
|||
// that it's okay if we add a task to the same queue twice (if it had multiple preferred
|
||||
// locations), because dequeueTaskFromList will skip already-running tasks.
|
||||
for (index <- getPendingTasksForExecutor(execId)) {
|
||||
addPendingTask(index, readding=true)
|
||||
addPendingTask(index, readding = true)
|
||||
}
|
||||
for (index <- getPendingTasksForHost(host)) {
|
||||
addPendingTask(index, readding=true)
|
||||
addPendingTask(index, readding = true)
|
||||
}
|
||||
|
||||
// Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage,
|
||||
|
|
|
@ -75,7 +75,8 @@ private[spark] object CoarseGrainedClusterMessages {
|
|||
case class SetupDriver(driver: RpcEndpointRef) extends CoarseGrainedClusterMessage
|
||||
|
||||
// Exchanged between the driver and the AM in Yarn client mode
|
||||
case class AddWebUIFilter(filterName:String, filterParams: Map[String, String], proxyBase: String)
|
||||
case class AddWebUIFilter(
|
||||
filterName: String, filterParams: Map[String, String], proxyBase: String)
|
||||
extends CoarseGrainedClusterMessage
|
||||
|
||||
// Messages exchanged between the driver and the cluster manager for executor allocation
|
||||
|
|
|
@ -149,7 +149,7 @@ private[spark] abstract class YarnSchedulerBackend(
|
|||
}
|
||||
}
|
||||
|
||||
override def onStop(): Unit ={
|
||||
override def onStop(): Unit = {
|
||||
askAmThreadPool.shutdownNow()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -52,7 +52,7 @@ private[spark] class CoarseMesosSchedulerBackend(
|
|||
val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures
|
||||
|
||||
// Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
|
||||
val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
|
||||
val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
|
||||
|
||||
// Cores we have acquired with each Mesos task ID
|
||||
val coresByTaskId = new HashMap[Int, Int]
|
||||
|
|
|
@ -146,7 +146,7 @@ private[spark] class MesosSchedulerBackend(
|
|||
private def createExecArg(): Array[Byte] = {
|
||||
if (execArgs == null) {
|
||||
val props = new HashMap[String, String]
|
||||
for ((key,value) <- sc.conf.getAll) {
|
||||
for ((key, value) <- sc.conf.getAll) {
|
||||
props(key) = value
|
||||
}
|
||||
// Serialize the map as an array of (String, String) pairs
|
||||
|
|
|
@ -108,7 +108,7 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
|
|||
image: String,
|
||||
volumes: Option[List[Volume]] = None,
|
||||
network: Option[ContainerInfo.DockerInfo.Network] = None,
|
||||
portmaps: Option[List[ContainerInfo.DockerInfo.PortMapping]] = None):Unit = {
|
||||
portmaps: Option[List[ContainerInfo.DockerInfo.PortMapping]] = None): Unit = {
|
||||
|
||||
val docker = ContainerInfo.DockerInfo.newBuilder().setImage(image)
|
||||
|
||||
|
|
|
@ -169,7 +169,7 @@ private[v1] object AllStagesResource {
|
|||
|
||||
val outputMetrics: Option[OutputMetricDistributions] =
|
||||
new MetricHelper[InternalOutputMetrics, OutputMetricDistributions](rawMetrics, quantiles) {
|
||||
def getSubmetrics(raw:InternalTaskMetrics): Option[InternalOutputMetrics] = {
|
||||
def getSubmetrics(raw: InternalTaskMetrics): Option[InternalOutputMetrics] = {
|
||||
raw.outputMetrics
|
||||
}
|
||||
def build: OutputMetricDistributions = new OutputMetricDistributions(
|
||||
|
@ -284,7 +284,7 @@ private[v1] object AllStagesResource {
|
|||
* the options (returning None if the metrics are all empty), and extract the quantiles for each
|
||||
* metric. After creating an instance, call metricOption to get the result type.
|
||||
*/
|
||||
private[v1] abstract class MetricHelper[I,O](
|
||||
private[v1] abstract class MetricHelper[I, O](
|
||||
rawMetrics: Seq[InternalTaskMetrics],
|
||||
quantiles: Array[Double]) {
|
||||
|
||||
|
|
|
@ -101,7 +101,7 @@ private[v1] class ApiRootResource extends UIRootFromServletContext {
|
|||
|
||||
|
||||
@Path("applications/{appId}/stages")
|
||||
def getStages(@PathParam("appId") appId: String): AllStagesResource= {
|
||||
def getStages(@PathParam("appId") appId: String): AllStagesResource = {
|
||||
uiRoot.withSparkUI(appId, None) { ui =>
|
||||
new AllStagesResource(ui)
|
||||
}
|
||||
|
@ -110,14 +110,14 @@ private[v1] class ApiRootResource extends UIRootFromServletContext {
|
|||
@Path("applications/{appId}/{attemptId}/stages")
|
||||
def getStages(
|
||||
@PathParam("appId") appId: String,
|
||||
@PathParam("attemptId") attemptId: String): AllStagesResource= {
|
||||
@PathParam("attemptId") attemptId: String): AllStagesResource = {
|
||||
uiRoot.withSparkUI(appId, Some(attemptId)) { ui =>
|
||||
new AllStagesResource(ui)
|
||||
}
|
||||
}
|
||||
|
||||
@Path("applications/{appId}/stages/{stageId: \\d+}")
|
||||
def getStage(@PathParam("appId") appId: String): OneStageResource= {
|
||||
def getStage(@PathParam("appId") appId: String): OneStageResource = {
|
||||
uiRoot.withSparkUI(appId, None) { ui =>
|
||||
new OneStageResource(ui)
|
||||
}
|
||||
|
@ -171,7 +171,7 @@ private[spark] object ApiRootResource {
|
|||
def getServletHandler(uiRoot: UIRoot): ServletContextHandler = {
|
||||
val jerseyContext = new ServletContextHandler(ServletContextHandler.NO_SESSIONS)
|
||||
jerseyContext.setContextPath("/api")
|
||||
val holder:ServletHolder = new ServletHolder(classOf[ServletContainer])
|
||||
val holder: ServletHolder = new ServletHolder(classOf[ServletContainer])
|
||||
holder.setInitParameter("com.sun.jersey.config.property.resourceConfigClass",
|
||||
"com.sun.jersey.api.core.PackagesResourceConfig")
|
||||
holder.setInitParameter("com.sun.jersey.config.property.packages",
|
||||
|
|
|
@ -25,7 +25,7 @@ import org.apache.spark.ui.SparkUI
|
|||
private[v1] class OneRDDResource(ui: SparkUI) {
|
||||
|
||||
@GET
|
||||
def rddData(@PathParam("rddId") rddId: Int): RDDStorageInfo = {
|
||||
def rddData(@PathParam("rddId") rddId: Int): RDDStorageInfo = {
|
||||
AllRDDResource.getRDDStorageInfo(rddId, ui.storageListener, true).getOrElse(
|
||||
throw new NotFoundException(s"no rdd found w/ id $rddId")
|
||||
)
|
||||
|
|
|
@ -134,7 +134,7 @@ class StageData private[spark](
|
|||
|
||||
val accumulatorUpdates: Seq[AccumulableInfo],
|
||||
val tasks: Option[Map[Long, TaskData]],
|
||||
val executorSummary:Option[Map[String,ExecutorStageSummary]])
|
||||
val executorSummary: Option[Map[String, ExecutorStageSummary]])
|
||||
|
||||
class TaskData private[spark](
|
||||
val taskId: Long,
|
||||
|
|
|
@ -40,7 +40,7 @@ class BlockManagerSlaveEndpoint(
|
|||
private implicit val asyncExecutionContext = ExecutionContext.fromExecutorService(asyncThreadPool)
|
||||
|
||||
// Operations that involve removing blocks may be slow and should be done asynchronously
|
||||
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
|
||||
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
|
||||
case RemoveBlock(blockId) =>
|
||||
doAsync[Boolean]("removing block " + blockId, context) {
|
||||
blockManager.removeBlock(blockId)
|
||||
|
|
|
@ -17,9 +17,8 @@
|
|||
|
||||
package org.apache.spark.storage
|
||||
|
||||
import com.codahale.metrics.{Gauge,MetricRegistry}
|
||||
import com.codahale.metrics.{Gauge, MetricRegistry}
|
||||
|
||||
import org.apache.spark.SparkContext
|
||||
import org.apache.spark.metrics.source.Source
|
||||
|
||||
private[spark] class BlockManagerSource(val blockManager: BlockManager)
|
||||
|
|
|
@ -137,7 +137,7 @@ private[spark] object SparkUI {
|
|||
jobProgressListener: JobProgressListener,
|
||||
securityManager: SecurityManager,
|
||||
appName: String,
|
||||
startTime: Long): SparkUI = {
|
||||
startTime: Long): SparkUI = {
|
||||
create(Some(sc), conf, listenerBus, securityManager, appName,
|
||||
jobProgressListener = Some(jobProgressListener), startTime = startTime)
|
||||
}
|
||||
|
|
|
@ -309,7 +309,7 @@ private[spark] object UIUtils extends Logging {
|
|||
started: Int,
|
||||
completed: Int,
|
||||
failed: Int,
|
||||
skipped:Int,
|
||||
skipped: Int,
|
||||
total: Int): Seq[Node] = {
|
||||
val completeWidth = "width: %s%%".format((completed.toDouble/total)*100)
|
||||
val startWidth = "width: %s%%".format((started.toDouble/total)*100)
|
||||
|
|
|
@ -54,7 +54,7 @@ private[spark] object UIWorkloadGenerator {
|
|||
val sc = new SparkContext(conf)
|
||||
|
||||
def setProperties(s: String): Unit = {
|
||||
if(schedulingMode == SchedulingMode.FAIR) {
|
||||
if (schedulingMode == SchedulingMode.FAIR) {
|
||||
sc.setLocalProperty("spark.scheduler.pool", s)
|
||||
}
|
||||
sc.setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, s)
|
||||
|
|
|
@ -33,7 +33,7 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
|
|||
val parameterId = request.getParameter("id")
|
||||
require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
|
||||
val rddId = parameterId.toInt
|
||||
val rddStorageInfo = AllRDDResource.getRDDStorageInfo(rddId, listener,includeDetails = true)
|
||||
val rddStorageInfo = AllRDDResource.getRDDStorageInfo(rddId, listener, includeDetails = true)
|
||||
.getOrElse {
|
||||
// Rather than crashing, render an "RDD Not Found" page
|
||||
return UIUtils.headerSparkPage("RDD Not Found", Seq[Node](), parent)
|
||||
|
|
|
@ -63,7 +63,7 @@ private[spark] object AkkaUtils extends Logging {
|
|||
conf: SparkConf,
|
||||
securityManager: SecurityManager): (ActorSystem, Int) = {
|
||||
|
||||
val akkaThreads = conf.getInt("spark.akka.threads", 4)
|
||||
val akkaThreads = conf.getInt("spark.akka.threads", 4)
|
||||
val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15)
|
||||
val akkaTimeoutS = conf.getTimeAsSeconds("spark.akka.timeout",
|
||||
conf.get("spark.network.timeout", "120s"))
|
||||
|
|
|
@ -42,7 +42,7 @@ abstract class CompletionIterator[ +A, +I <: Iterator[A]](sub: I) extends Iterat
|
|||
|
||||
private[spark] object CompletionIterator {
|
||||
def apply[A, I <: Iterator[A]](sub: I, completionFunction: => Unit) : CompletionIterator[A, I] = {
|
||||
new CompletionIterator[A,I](sub) {
|
||||
new CompletionIterator[A, I](sub) {
|
||||
def completion(): Unit = completionFunction
|
||||
}
|
||||
}
|
||||
|
|
|
@ -35,7 +35,7 @@ private[spark] class Distribution(val data: Array[Double], val startIdx: Int, va
|
|||
java.util.Arrays.sort(data, startIdx, endIdx)
|
||||
val length = endIdx - startIdx
|
||||
|
||||
val defaultProbabilities = Array(0,0.25,0.5,0.75,1.0)
|
||||
val defaultProbabilities = Array(0, 0.25, 0.5, 0.75, 1.0)
|
||||
|
||||
/**
|
||||
* Get the value of the distribution at the given probabilities. Probabilities should be
|
||||
|
@ -44,7 +44,7 @@ private[spark] class Distribution(val data: Array[Double], val startIdx: Int, va
|
|||
*/
|
||||
def getQuantiles(probabilities: Traversable[Double] = defaultProbabilities)
|
||||
: IndexedSeq[Double] = {
|
||||
probabilities.toIndexedSeq.map{p:Double => data(closestIndex(p))}
|
||||
probabilities.toIndexedSeq.map { p: Double => data(closestIndex(p)) }
|
||||
}
|
||||
|
||||
private def closestIndex(p: Double) = {
|
||||
|
|
|
@ -89,7 +89,7 @@ private[spark] object MetadataCleaner {
|
|||
conf: SparkConf,
|
||||
cleanerType: MetadataCleanerType.MetadataCleanerType,
|
||||
delay: Int) {
|
||||
conf.set(MetadataCleanerType.systemProperty(cleanerType), delay.toString)
|
||||
conf.set(MetadataCleanerType.systemProperty(cleanerType), delay.toString)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -45,5 +45,5 @@ case class MutablePair[@specialized(Int, Long, Double, Char, Boolean/* , AnyRef
|
|||
|
||||
override def toString: String = "(" + _1 + "," + _2 + ")"
|
||||
|
||||
override def canEqual(that: Any): Boolean = that.isInstanceOf[MutablePair[_,_]]
|
||||
override def canEqual(that: Any): Boolean = that.isInstanceOf[MutablePair[_, _]]
|
||||
}
|
||||
|
|
|
@ -54,14 +54,14 @@ object SizeEstimator extends Logging {
|
|||
def estimate(obj: AnyRef): Long = estimate(obj, new IdentityHashMap[AnyRef, AnyRef])
|
||||
|
||||
// Sizes of primitive types
|
||||
private val BYTE_SIZE = 1
|
||||
private val BYTE_SIZE = 1
|
||||
private val BOOLEAN_SIZE = 1
|
||||
private val CHAR_SIZE = 2
|
||||
private val SHORT_SIZE = 2
|
||||
private val INT_SIZE = 4
|
||||
private val LONG_SIZE = 8
|
||||
private val FLOAT_SIZE = 4
|
||||
private val DOUBLE_SIZE = 8
|
||||
private val CHAR_SIZE = 2
|
||||
private val SHORT_SIZE = 2
|
||||
private val INT_SIZE = 4
|
||||
private val LONG_SIZE = 8
|
||||
private val FLOAT_SIZE = 4
|
||||
private val DOUBLE_SIZE = 8
|
||||
|
||||
// Fields can be primitive types, sizes are: 1, 2, 4, 8. Or fields can be pointers. The size of
|
||||
// a pointer is 4 or 8 depending on the JVM (32-bit or 64-bit) and UseCompressedOops flag.
|
||||
|
@ -96,7 +96,7 @@ object SizeEstimator extends Logging {
|
|||
isCompressedOops = getIsCompressedOops
|
||||
|
||||
objectSize = if (!is64bit) 8 else {
|
||||
if(!isCompressedOops) {
|
||||
if (!isCompressedOops) {
|
||||
16
|
||||
} else {
|
||||
12
|
||||
|
|
|
@ -882,7 +882,7 @@ private[spark] object Utils extends Logging {
|
|||
// If not, we should change it to LRUCache or something.
|
||||
private val hostPortParseResults = new ConcurrentHashMap[String, (String, Int)]()
|
||||
|
||||
def parseHostPort(hostPort: String): (String, Int) = {
|
||||
def parseHostPort(hostPort: String): (String, Int) = {
|
||||
// Check cache first.
|
||||
val cached = hostPortParseResults.get(hostPort)
|
||||
if (cached != null) {
|
||||
|
|
|
@ -161,7 +161,7 @@ class BitSet(numBits: Int) extends Serializable {
|
|||
override def hasNext: Boolean = ind >= 0
|
||||
override def next(): Int = {
|
||||
val tmp = ind
|
||||
ind = nextSetBit(ind + 1)
|
||||
ind = nextSetBit(ind + 1)
|
||||
tmp
|
||||
}
|
||||
}
|
||||
|
|
|
@ -90,9 +90,9 @@ class KVArraySortDataFormat[K, T <: AnyRef : ClassTag] extends SortDataFormat[K,
|
|||
override def swap(data: Array[T], pos0: Int, pos1: Int) {
|
||||
val tmpKey = data(2 * pos0)
|
||||
val tmpVal = data(2 * pos0 + 1)
|
||||
data(2 * pos0) = data(2 * pos1)
|
||||
data(2 * pos0) = data(2 * pos1)
|
||||
data(2 * pos0 + 1) = data(2 * pos1 + 1)
|
||||
data(2 * pos1) = tmpKey
|
||||
data(2 * pos1) = tmpKey
|
||||
data(2 * pos1 + 1) = tmpVal
|
||||
}
|
||||
|
||||
|
|
|
@ -196,7 +196,7 @@ private[spark] object StratifiedSamplingUtils extends Logging {
|
|||
*
|
||||
* The sampling function has a unique seed per partition.
|
||||
*/
|
||||
def getBernoulliSamplingFunction[K, V](rdd: RDD[(K, V)],
|
||||
def getBernoulliSamplingFunction[K, V](rdd: RDD[(K, V)],
|
||||
fractions: Map[K, Double],
|
||||
exact: Boolean,
|
||||
seed: Long): (Int, Iterator[(K, V)]) => Iterator[(K, V)] = {
|
||||
|
|
|
@ -103,7 +103,7 @@ class AccumulatorSuite extends FunSuite with Matchers with LocalSparkContext {
|
|||
sc = new SparkContext("local[" + nThreads + "]", "test")
|
||||
val setAcc = sc.accumulableCollection(mutable.HashSet[Int]())
|
||||
val bufferAcc = sc.accumulableCollection(mutable.ArrayBuffer[Int]())
|
||||
val mapAcc = sc.accumulableCollection(mutable.HashMap[Int,String]())
|
||||
val mapAcc = sc.accumulableCollection(mutable.HashMap[Int, String]())
|
||||
val d = sc.parallelize((1 to maxI) ++ (1 to maxI))
|
||||
d.foreach {
|
||||
x => {setAcc += x; bufferAcc += x; mapAcc += (x -> x.toString)}
|
||||
|
|
|
@ -218,10 +218,10 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
|
|||
val pairRDD = generateFatPairRDD()
|
||||
pairRDD.checkpoint()
|
||||
val unionRDD = new PartitionerAwareUnionRDD(sc, Array(pairRDD))
|
||||
val partitionBeforeCheckpoint = serializeDeserialize(
|
||||
val partitionBeforeCheckpoint = serializeDeserialize(
|
||||
unionRDD.partitions.head.asInstanceOf[PartitionerAwareUnionRDDPartition])
|
||||
pairRDD.count()
|
||||
val partitionAfterCheckpoint = serializeDeserialize(
|
||||
val partitionAfterCheckpoint = serializeDeserialize(
|
||||
unionRDD.partitions.head.asInstanceOf[PartitionerAwareUnionRDDPartition])
|
||||
assert(
|
||||
partitionBeforeCheckpoint.parents.head.getClass !=
|
||||
|
|
|
@ -158,7 +158,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
|
|||
rdd.count()
|
||||
|
||||
// Test that GC does not cause RDD cleanup due to a strong reference
|
||||
val preGCTester = new CleanerTester(sc, rddIds = Seq(rdd.id))
|
||||
val preGCTester = new CleanerTester(sc, rddIds = Seq(rdd.id))
|
||||
runGC()
|
||||
intercept[Exception] {
|
||||
preGCTester.assertCleanup()(timeout(1000 millis))
|
||||
|
@ -195,7 +195,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
|
|||
var broadcast = newBroadcast()
|
||||
|
||||
// Test that GC does not cause broadcast cleanup due to a strong reference
|
||||
val preGCTester = new CleanerTester(sc, broadcastIds = Seq(broadcast.id))
|
||||
val preGCTester = new CleanerTester(sc, broadcastIds = Seq(broadcast.id))
|
||||
runGC()
|
||||
intercept[Exception] {
|
||||
preGCTester.assertCleanup()(timeout(1000 millis))
|
||||
|
@ -267,7 +267,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
|
|||
val shuffleIds = 0 until sc.newShuffleId
|
||||
val broadcastIds = broadcastBuffer.map(_.id)
|
||||
|
||||
val preGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds)
|
||||
val preGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds)
|
||||
runGC()
|
||||
intercept[Exception] {
|
||||
preGCTester.assertCleanup()(timeout(1000 millis))
|
||||
|
|
|
@ -57,7 +57,7 @@ class FailureSuite extends FunSuite with LocalSparkContext {
|
|||
FailureSuiteState.synchronized {
|
||||
assert(FailureSuiteState.tasksRun === 4)
|
||||
}
|
||||
assert(results.toList === List(1,4,9))
|
||||
assert(results.toList === List(1, 4, 9))
|
||||
FailureSuiteState.clear()
|
||||
}
|
||||
|
||||
|
|
|
@ -81,7 +81,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
|
|||
test("Distributing files locally") {
|
||||
sc = new SparkContext("local[4]", "test", newConf)
|
||||
sc.addFile(tmpFile.toString)
|
||||
val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
|
||||
val testData = Array((1, 1), (1, 1), (2, 1), (3, 5), (2, 2), (3, 0))
|
||||
val result = sc.parallelize(testData).reduceByKey {
|
||||
val path = SparkFiles.get("FileServerSuite.txt")
|
||||
val in = new BufferedReader(new FileReader(path))
|
||||
|
@ -89,7 +89,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
|
|||
in.close()
|
||||
_ * fileVal + _ * fileVal
|
||||
}.collect()
|
||||
assert(result.toSet === Set((1,200), (2,300), (3,500)))
|
||||
assert(result.toSet === Set((1, 200), (2, 300), (3, 500)))
|
||||
}
|
||||
|
||||
test("Distributing files locally security On") {
|
||||
|
@ -100,7 +100,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
|
|||
|
||||
sc.addFile(tmpFile.toString)
|
||||
assert(sc.env.securityManager.isAuthenticationEnabled() === true)
|
||||
val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
|
||||
val testData = Array((1, 1), (1, 1), (2, 1), (3, 5), (2, 2), (3, 0))
|
||||
val result = sc.parallelize(testData).reduceByKey {
|
||||
val path = SparkFiles.get("FileServerSuite.txt")
|
||||
val in = new BufferedReader(new FileReader(path))
|
||||
|
@ -108,14 +108,14 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
|
|||
in.close()
|
||||
_ * fileVal + _ * fileVal
|
||||
}.collect()
|
||||
assert(result.toSet === Set((1,200), (2,300), (3,500)))
|
||||
assert(result.toSet === Set((1, 200), (2, 300), (3, 500)))
|
||||
}
|
||||
|
||||
test("Distributing files locally using URL as input") {
|
||||
// addFile("file:///....")
|
||||
sc = new SparkContext("local[4]", "test", newConf)
|
||||
sc.addFile(new File(tmpFile.toString).toURI.toString)
|
||||
val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
|
||||
val testData = Array((1, 1), (1, 1), (2, 1), (3, 5), (2, 2), (3, 0))
|
||||
val result = sc.parallelize(testData).reduceByKey {
|
||||
val path = SparkFiles.get("FileServerSuite.txt")
|
||||
val in = new BufferedReader(new FileReader(path))
|
||||
|
@ -123,7 +123,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
|
|||
in.close()
|
||||
_ * fileVal + _ * fileVal
|
||||
}.collect()
|
||||
assert(result.toSet === Set((1,200), (2,300), (3,500)))
|
||||
assert(result.toSet === Set((1, 200), (2, 300), (3, 500)))
|
||||
}
|
||||
|
||||
test ("Dynamically adding JARS locally") {
|
||||
|
@ -140,7 +140,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
|
|||
test("Distributing files on a standalone cluster") {
|
||||
sc = new SparkContext("local-cluster[1,1,512]", "test", newConf)
|
||||
sc.addFile(tmpFile.toString)
|
||||
val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
|
||||
val testData = Array((1, 1), (1, 1), (2, 1), (3, 5), (2, 2), (3, 0))
|
||||
val result = sc.parallelize(testData).reduceByKey {
|
||||
val path = SparkFiles.get("FileServerSuite.txt")
|
||||
val in = new BufferedReader(new FileReader(path))
|
||||
|
@ -148,13 +148,13 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
|
|||
in.close()
|
||||
_ * fileVal + _ * fileVal
|
||||
}.collect()
|
||||
assert(result.toSet === Set((1,200), (2,300), (3,500)))
|
||||
assert(result.toSet === Set((1, 200), (2, 300), (3, 500)))
|
||||
}
|
||||
|
||||
test ("Dynamically adding JARS on a standalone cluster") {
|
||||
sc = new SparkContext("local-cluster[1,1,512]", "test", newConf)
|
||||
sc.addJar(tmpJarUrl)
|
||||
val testData = Array((1,1))
|
||||
val testData = Array((1, 1))
|
||||
sc.parallelize(testData).foreach { x =>
|
||||
if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite.txt") == null) {
|
||||
throw new SparkException("jar not added")
|
||||
|
@ -165,7 +165,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
|
|||
test ("Dynamically adding JARS on a standalone cluster using local: URL") {
|
||||
sc = new SparkContext("local-cluster[1,1,512]", "test", newConf)
|
||||
sc.addJar(tmpJarUrl.replace("file", "local"))
|
||||
val testData = Array((1,1))
|
||||
val testData = Array((1, 1))
|
||||
sc.parallelize(testData).foreach { x =>
|
||||
if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite.txt") == null) {
|
||||
throw new SparkException("jar not added")
|
||||
|
|
|
@ -334,7 +334,7 @@ class FileSuite extends FunSuite with LocalSparkContext {
|
|||
}
|
||||
val copyRdd = mappedRdd.flatMap {
|
||||
curData: (String, PortableDataStream) =>
|
||||
for(i <- 1 to numOfCopies) yield (i, curData._2)
|
||||
for (i <- 1 to numOfCopies) yield (i, curData._2)
|
||||
}
|
||||
|
||||
val copyArr: Array[(Int, PortableDataStream)] = copyRdd.collect()
|
||||
|
|
|
@ -44,11 +44,11 @@ private object ImplicitOrderingSuite {
|
|||
class NonOrderedClass {}
|
||||
|
||||
class ComparableClass extends Comparable[ComparableClass] {
|
||||
override def compareTo(o: ComparableClass): Int = ???
|
||||
override def compareTo(o: ComparableClass): Int = throw new UnsupportedOperationException
|
||||
}
|
||||
|
||||
class OrderedClass extends Ordered[OrderedClass] {
|
||||
override def compare(o: OrderedClass): Int = ???
|
||||
override def compare(o: OrderedClass): Int = throw new UnsupportedOperationException
|
||||
}
|
||||
|
||||
def basicMapExpectations(rdd: RDD[Int]): List[(Boolean, String)] = {
|
||||
|
|
|
@ -34,18 +34,18 @@ class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemPro
|
|||
val conf = new SparkConf()
|
||||
// Simply exercise the API, we don't need a complete conversion test since that's handled in
|
||||
// UtilsSuite.scala
|
||||
assert(conf.getSizeAsBytes("fake","1k") === ByteUnit.KiB.toBytes(1))
|
||||
assert(conf.getSizeAsKb("fake","1k") === ByteUnit.KiB.toKiB(1))
|
||||
assert(conf.getSizeAsMb("fake","1k") === ByteUnit.KiB.toMiB(1))
|
||||
assert(conf.getSizeAsGb("fake","1k") === ByteUnit.KiB.toGiB(1))
|
||||
assert(conf.getSizeAsBytes("fake", "1k") === ByteUnit.KiB.toBytes(1))
|
||||
assert(conf.getSizeAsKb("fake", "1k") === ByteUnit.KiB.toKiB(1))
|
||||
assert(conf.getSizeAsMb("fake", "1k") === ByteUnit.KiB.toMiB(1))
|
||||
assert(conf.getSizeAsGb("fake", "1k") === ByteUnit.KiB.toGiB(1))
|
||||
}
|
||||
|
||||
test("Test timeString conversion") {
|
||||
val conf = new SparkConf()
|
||||
// Simply exercise the API, we don't need a complete conversion test since that's handled in
|
||||
// UtilsSuite.scala
|
||||
assert(conf.getTimeAsMs("fake","1ms") === TimeUnit.MILLISECONDS.toMillis(1))
|
||||
assert(conf.getTimeAsSeconds("fake","1000ms") === TimeUnit.MILLISECONDS.toSeconds(1000))
|
||||
assert(conf.getTimeAsMs("fake", "1ms") === TimeUnit.MILLISECONDS.toMillis(1))
|
||||
assert(conf.getTimeAsSeconds("fake", "1000ms") === TimeUnit.MILLISECONDS.toSeconds(1000))
|
||||
}
|
||||
|
||||
test("loading from system properties") {
|
||||
|
|
|
@ -222,8 +222,8 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
|
|||
val dir1 = Utils.createTempDir()
|
||||
val dir2 = Utils.createTempDir()
|
||||
|
||||
val dirpath1=dir1.getAbsolutePath
|
||||
val dirpath2=dir2.getAbsolutePath
|
||||
val dirpath1 = dir1.getAbsolutePath
|
||||
val dirpath2 = dir2.getAbsolutePath
|
||||
|
||||
// file1 and file2 are placed inside dir1, they are also used for
|
||||
// textFile, hadoopFile, and newAPIHadoopFile
|
||||
|
@ -235,11 +235,11 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
|
|||
val file4 = new File(dir2, "part-00001")
|
||||
val file5 = new File(dir2, "part-00002")
|
||||
|
||||
val filepath1=file1.getAbsolutePath
|
||||
val filepath2=file2.getAbsolutePath
|
||||
val filepath3=file3.getAbsolutePath
|
||||
val filepath4=file4.getAbsolutePath
|
||||
val filepath5=file5.getAbsolutePath
|
||||
val filepath1 = file1.getAbsolutePath
|
||||
val filepath2 = file2.getAbsolutePath
|
||||
val filepath3 = file3.getAbsolutePath
|
||||
val filepath4 = file4.getAbsolutePath
|
||||
val filepath5 = file5.getAbsolutePath
|
||||
|
||||
|
||||
try {
|
||||
|
|
|
@ -286,7 +286,7 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
|
|||
assert(statuses.size === expectedNumBlocks)
|
||||
}
|
||||
|
||||
testUnpersistBroadcast(distributed, numSlaves, torrentConf, afterCreation,
|
||||
testUnpersistBroadcast(distributed, numSlaves, torrentConf, afterCreation,
|
||||
afterUsingBroadcast, afterUnpersist, removeFromDriver)
|
||||
}
|
||||
|
||||
|
|
|
@ -66,7 +66,7 @@ class WorkerArgumentsTest extends FunSuite {
|
|||
}
|
||||
}
|
||||
val conf = new MySparkConf()
|
||||
val workerArgs = new WorkerArguments(args, conf)
|
||||
val workerArgs = new WorkerArguments(args, conf)
|
||||
assert(workerArgs.memory === 5120)
|
||||
}
|
||||
|
||||
|
|
|
@ -25,7 +25,7 @@ import org.scalatest.{Matchers, FunSuite}
|
|||
class WorkerSuite extends FunSuite with Matchers {
|
||||
|
||||
def cmd(javaOpts: String*): Command = {
|
||||
Command("", Seq.empty, Map.empty, Seq.empty, Seq.empty, Seq(javaOpts:_*))
|
||||
Command("", Seq.empty, Map.empty, Seq.empty, Seq.empty, Seq(javaOpts : _*))
|
||||
}
|
||||
def conf(opts: (String, String)*): SparkConf = new SparkConf(loadDefaults = false).setAll(opts)
|
||||
|
||||
|
|
|
@ -263,7 +263,7 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext
|
|||
|
||||
val tmpRdd = sc.textFile(tmpFilePath, numPartitions)
|
||||
|
||||
val firstSize= runAndReturnBytesRead {
|
||||
val firstSize = runAndReturnBytesRead {
|
||||
aRdd.count()
|
||||
}
|
||||
val secondSize = runAndReturnBytesRead {
|
||||
|
@ -433,10 +433,10 @@ class OldCombineTextRecordReaderWrapper(
|
|||
/**
|
||||
* Hadoop 2 has a version of this, but we can't use it for backwards compatibility
|
||||
*/
|
||||
class NewCombineTextInputFormat extends NewCombineFileInputFormat[LongWritable,Text] {
|
||||
class NewCombineTextInputFormat extends NewCombineFileInputFormat[LongWritable, Text] {
|
||||
def createRecordReader(split: NewInputSplit, context: TaskAttemptContext)
|
||||
: NewRecordReader[LongWritable, Text] = {
|
||||
new NewCombineFileRecordReader[LongWritable,Text](split.asInstanceOf[NewCombineFileSplit],
|
||||
new NewCombineFileRecordReader[LongWritable, Text](split.asInstanceOf[NewCombineFileSplit],
|
||||
context, classOf[NewCombineTextRecordReaderWrapper])
|
||||
}
|
||||
}
|
||||
|
|
|
@ -512,17 +512,17 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
|
|||
}
|
||||
|
||||
test("lookup") {
|
||||
val pairs = sc.parallelize(Array((1,2), (3,4), (5,6), (5,7)))
|
||||
val pairs = sc.parallelize(Array((1, 2), (3, 4), (5, 6), (5, 7)))
|
||||
|
||||
assert(pairs.partitioner === None)
|
||||
assert(pairs.lookup(1) === Seq(2))
|
||||
assert(pairs.lookup(5) === Seq(6,7))
|
||||
assert(pairs.lookup(5) === Seq(6, 7))
|
||||
assert(pairs.lookup(-1) === Seq())
|
||||
|
||||
}
|
||||
|
||||
test("lookup with partitioner") {
|
||||
val pairs = sc.parallelize(Array((1,2), (3,4), (5,6), (5,7)))
|
||||
val pairs = sc.parallelize(Array((1, 2), (3, 4), (5, 6), (5, 7)))
|
||||
|
||||
val p = new Partitioner {
|
||||
def numPartitions: Int = 2
|
||||
|
@ -533,12 +533,12 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
|
|||
|
||||
assert(shuffled.partitioner === Some(p))
|
||||
assert(shuffled.lookup(1) === Seq(2))
|
||||
assert(shuffled.lookup(5) === Seq(6,7))
|
||||
assert(shuffled.lookup(5) === Seq(6, 7))
|
||||
assert(shuffled.lookup(-1) === Seq())
|
||||
}
|
||||
|
||||
test("lookup with bad partitioner") {
|
||||
val pairs = sc.parallelize(Array((1,2), (3,4), (5,6), (5,7)))
|
||||
val pairs = sc.parallelize(Array((1, 2), (3, 4), (5, 6), (5, 7)))
|
||||
|
||||
val p = new Partitioner {
|
||||
def numPartitions: Int = 2
|
||||
|
|
|
@ -338,10 +338,10 @@ class RDDSuite extends FunSuite with SharedSparkContext {
|
|||
}
|
||||
|
||||
test("coalesced RDDs with locality") {
|
||||
val data3 = sc.makeRDD(List((1,List("a","c")), (2,List("a","b","c")), (3,List("b"))))
|
||||
val data3 = sc.makeRDD(List((1, List("a", "c")), (2, List("a", "b", "c")), (3, List("b"))))
|
||||
val coal3 = data3.coalesce(3)
|
||||
val list3 = coal3.partitions.flatMap(_.asInstanceOf[CoalescedRDDPartition].preferredLocation)
|
||||
assert(list3.sorted === Array("a","b","c"), "Locality preferences are dropped")
|
||||
assert(list3.sorted === Array("a", "b", "c"), "Locality preferences are dropped")
|
||||
|
||||
// RDD with locality preferences spread (non-randomly) over 6 machines, m0 through m5
|
||||
val data = sc.makeRDD((1 to 9).map(i => (i, (i to (i + 2)).map{ j => "m" + (j%6)})))
|
||||
|
@ -591,8 +591,8 @@ class RDDSuite extends FunSuite with SharedSparkContext {
|
|||
assert(sc.emptyRDD.isEmpty())
|
||||
assert(sc.parallelize(Seq[Int]()).isEmpty())
|
||||
assert(!sc.parallelize(Seq(1)).isEmpty())
|
||||
assert(sc.parallelize(Seq(1,2,3), 3).filter(_ < 0).isEmpty())
|
||||
assert(!sc.parallelize(Seq(1,2,3), 3).filter(_ > 1).isEmpty())
|
||||
assert(sc.parallelize(Seq(1, 2, 3), 3).filter(_ < 0).isEmpty())
|
||||
assert(!sc.parallelize(Seq(1, 2, 3), 3).filter(_ > 1).isEmpty())
|
||||
}
|
||||
|
||||
test("sample preserves partitioner") {
|
||||
|
@ -609,49 +609,49 @@ class RDDSuite extends FunSuite with SharedSparkContext {
|
|||
val data = sc.parallelize(1 to n, 2)
|
||||
|
||||
for (num <- List(5, 20, 100)) {
|
||||
val sample = data.takeSample(withReplacement=false, num=num)
|
||||
val sample = data.takeSample(withReplacement = false, num = num)
|
||||
assert(sample.size === num) // Got exactly num elements
|
||||
assert(sample.toSet.size === num) // Elements are distinct
|
||||
assert(sample.forall(x => 1 <= x && x <= n), s"elements not in [1, $n]")
|
||||
}
|
||||
for (seed <- 1 to 5) {
|
||||
val sample = data.takeSample(withReplacement=false, 20, seed)
|
||||
val sample = data.takeSample(withReplacement = false, 20, seed)
|
||||
assert(sample.size === 20) // Got exactly 20 elements
|
||||
assert(sample.toSet.size === 20) // Elements are distinct
|
||||
assert(sample.forall(x => 1 <= x && x <= n), s"elements not in [1, $n]")
|
||||
}
|
||||
for (seed <- 1 to 5) {
|
||||
val sample = data.takeSample(withReplacement=false, 100, seed)
|
||||
val sample = data.takeSample(withReplacement = false, 100, seed)
|
||||
assert(sample.size === 100) // Got only 100 elements
|
||||
assert(sample.toSet.size === 100) // Elements are distinct
|
||||
assert(sample.forall(x => 1 <= x && x <= n), s"elements not in [1, $n]")
|
||||
}
|
||||
for (seed <- 1 to 5) {
|
||||
val sample = data.takeSample(withReplacement=true, 20, seed)
|
||||
val sample = data.takeSample(withReplacement = true, 20, seed)
|
||||
assert(sample.size === 20) // Got exactly 20 elements
|
||||
assert(sample.forall(x => 1 <= x && x <= n), s"elements not in [1, $n]")
|
||||
}
|
||||
{
|
||||
val sample = data.takeSample(withReplacement=true, num=20)
|
||||
val sample = data.takeSample(withReplacement = true, num = 20)
|
||||
assert(sample.size === 20) // Got exactly 100 elements
|
||||
assert(sample.toSet.size <= 20, "sampling with replacement returned all distinct elements")
|
||||
assert(sample.forall(x => 1 <= x && x <= n), s"elements not in [1, $n]")
|
||||
}
|
||||
{
|
||||
val sample = data.takeSample(withReplacement=true, num=n)
|
||||
val sample = data.takeSample(withReplacement = true, num = n)
|
||||
assert(sample.size === n) // Got exactly 100 elements
|
||||
// Chance of getting all distinct elements is astronomically low, so test we got < 100
|
||||
assert(sample.toSet.size < n, "sampling with replacement returned all distinct elements")
|
||||
assert(sample.forall(x => 1 <= x && x <= n), s"elements not in [1, $n]")
|
||||
}
|
||||
for (seed <- 1 to 5) {
|
||||
val sample = data.takeSample(withReplacement=true, n, seed)
|
||||
val sample = data.takeSample(withReplacement = true, n, seed)
|
||||
assert(sample.size === n) // Got exactly 100 elements
|
||||
// Chance of getting all distinct elements is astronomically low, so test we got < 100
|
||||
assert(sample.toSet.size < n, "sampling with replacement returned all distinct elements")
|
||||
}
|
||||
for (seed <- 1 to 5) {
|
||||
val sample = data.takeSample(withReplacement=true, 2 * n, seed)
|
||||
val sample = data.takeSample(withReplacement = true, 2 * n, seed)
|
||||
assert(sample.size === 2 * n) // Got exactly 200 elements
|
||||
// Chance of getting all distinct elements is still quite low, so test we got < 100
|
||||
assert(sample.toSet.size < n, "sampling with replacement returned all distinct elements")
|
||||
|
@ -691,7 +691,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
|
|||
}
|
||||
|
||||
test("sortByKey") {
|
||||
val data = sc.parallelize(Seq("5|50|A","4|60|C", "6|40|B"))
|
||||
val data = sc.parallelize(Seq("5|50|A", "4|60|C", "6|40|B"))
|
||||
|
||||
val col1 = Array("4|60|C", "5|50|A", "6|40|B")
|
||||
val col2 = Array("6|40|B", "5|50|A", "4|60|C")
|
||||
|
@ -703,7 +703,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
|
|||
}
|
||||
|
||||
test("sortByKey ascending parameter") {
|
||||
val data = sc.parallelize(Seq("5|50|A","4|60|C", "6|40|B"))
|
||||
val data = sc.parallelize(Seq("5|50|A", "4|60|C", "6|40|B"))
|
||||
|
||||
val asc = Array("4|60|C", "5|50|A", "6|40|B")
|
||||
val desc = Array("6|40|B", "5|50|A", "4|60|C")
|
||||
|
@ -764,9 +764,9 @@ class RDDSuite extends FunSuite with SharedSparkContext {
|
|||
}
|
||||
|
||||
test("intersection strips duplicates in an input") {
|
||||
val a = sc.parallelize(Seq(1,2,3,3))
|
||||
val b = sc.parallelize(Seq(1,1,2,3))
|
||||
val intersection = Array(1,2,3)
|
||||
val a = sc.parallelize(Seq(1, 2, 3, 3))
|
||||
val b = sc.parallelize(Seq(1, 1, 2, 3))
|
||||
val intersection = Array(1, 2, 3)
|
||||
|
||||
assert(a.intersection(b).collect().sorted === intersection)
|
||||
assert(b.intersection(a).collect().sorted === intersection)
|
||||
|
|
|
@ -21,11 +21,11 @@ object RDDSuiteUtils {
|
|||
case class Person(first: String, last: String, age: Int)
|
||||
|
||||
object AgeOrdering extends Ordering[Person] {
|
||||
def compare(a:Person, b:Person): Int = a.age.compare(b.age)
|
||||
def compare(a: Person, b: Person): Int = a.age.compare(b.age)
|
||||
}
|
||||
|
||||
object NameOrdering extends Ordering[Person] {
|
||||
def compare(a:Person, b:Person): Int =
|
||||
implicitly[Ordering[Tuple2[String,String]]].compare((a.last, a.first), (b.last, b.first))
|
||||
def compare(a: Person, b: Person): Int =
|
||||
implicitly[Ordering[Tuple2[String, String]]].compare((a.last, a.first), (b.last, b.first))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,7 +26,7 @@ class SortingSuite extends FunSuite with SharedSparkContext with Matchers with L
|
|||
|
||||
test("sortByKey") {
|
||||
val pairs = sc.parallelize(Array((1, 0), (2, 0), (0, 0), (3, 0)), 2)
|
||||
assert(pairs.sortByKey().collect() === Array((0,0), (1,0), (2,0), (3,0)))
|
||||
assert(pairs.sortByKey().collect() === Array((0, 0), (1, 0), (2, 0), (3, 0)))
|
||||
}
|
||||
|
||||
test("large array") {
|
||||
|
@ -136,7 +136,7 @@ class SortingSuite extends FunSuite with SharedSparkContext with Matchers with L
|
|||
|
||||
test("get a range of elements in an array not partitioned by a range partitioner") {
|
||||
val pairArr = util.Random.shuffle((1 to 1000).toList).map(x => (x, x))
|
||||
val pairs = sc.parallelize(pairArr,10)
|
||||
val pairs = sc.parallelize(pairArr, 10)
|
||||
val range = pairs.filterByRange(200, 800).collect()
|
||||
assert((800 to 200 by -1).toArray.sorted === range.map(_._1).sorted)
|
||||
}
|
||||
|
|
|
@ -42,7 +42,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
|
|||
}
|
||||
|
||||
override def afterAll(): Unit = {
|
||||
if(env != null) {
|
||||
if (env != null) {
|
||||
env.shutdown()
|
||||
}
|
||||
}
|
||||
|
@ -75,7 +75,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
|
|||
}
|
||||
})
|
||||
|
||||
val anotherEnv = createRpcEnv(new SparkConf(), "remote" ,13345)
|
||||
val anotherEnv = createRpcEnv(new SparkConf(), "remote", 13345)
|
||||
// Use anotherEnv to find out the RpcEndpointRef
|
||||
val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "send-remotely")
|
||||
try {
|
||||
|
@ -338,7 +338,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
|
|||
|
||||
test("call receive in sequence") {
|
||||
// If a RpcEnv implementation breaks the `receive` contract, hope this test can expose it
|
||||
for(i <- 0 until 100) {
|
||||
for (i <- 0 until 100) {
|
||||
@volatile var result = 0
|
||||
val endpointRef = env.setupEndpoint(s"receive-in-sequence-$i", new ThreadSafeRpcEndpoint {
|
||||
override val rpcEnv = env
|
||||
|
|
|
@ -26,8 +26,8 @@ class CoarseGrainedSchedulerBackendSuite extends FunSuite with LocalSparkContext
|
|||
|
||||
test("serialized task larger than akka frame size") {
|
||||
val conf = new SparkConf
|
||||
conf.set("spark.akka.frameSize","1")
|
||||
conf.set("spark.default.parallelism","1")
|
||||
conf.set("spark.akka.frameSize", "1")
|
||||
conf.set("spark.default.parallelism", "1")
|
||||
sc = new SparkContext("local-cluster[2 , 1 , 512]", "test", conf)
|
||||
val frameSize = AkkaUtils.maxFrameSizeBytes(sc.conf)
|
||||
val buffer = new SerializableBuffer(java.nio.ByteBuffer.allocate(2 * frameSize))
|
||||
|
|
|
@ -375,7 +375,7 @@ class DAGSchedulerSuite
|
|||
(1 to 30).foreach(_ => rdd = rdd.zip(rdd))
|
||||
// getPreferredLocs runs quickly, indicating that exponential graph traversal is avoided.
|
||||
failAfter(10 seconds) {
|
||||
val preferredLocs = scheduler.getPreferredLocs(rdd,0)
|
||||
val preferredLocs = scheduler.getPreferredLocs(rdd, 0)
|
||||
// No preferred locations are returned.
|
||||
assert(preferredLocs.length === 0)
|
||||
}
|
||||
|
@ -634,8 +634,8 @@ class DAGSchedulerSuite
|
|||
val listener1 = new FailureRecordingJobListener()
|
||||
val listener2 = new FailureRecordingJobListener()
|
||||
|
||||
submit(reduceRdd1, Array(0, 1), listener=listener1)
|
||||
submit(reduceRdd2, Array(0, 1), listener=listener2)
|
||||
submit(reduceRdd1, Array(0, 1), listener = listener1)
|
||||
submit(reduceRdd2, Array(0, 1), listener = listener2)
|
||||
|
||||
val stageFailureMessage = "Exception failure in map stage"
|
||||
failed(taskSets(0), stageFailureMessage)
|
||||
|
|
|
@ -97,9 +97,9 @@ class PoolSuite extends FunSuite with LocalSparkContext {
|
|||
assert(rootPool.getSchedulableByName("3").weight === 1)
|
||||
|
||||
val properties1 = new Properties()
|
||||
properties1.setProperty("spark.scheduler.pool","1")
|
||||
properties1.setProperty("spark.scheduler.pool", "1")
|
||||
val properties2 = new Properties()
|
||||
properties2.setProperty("spark.scheduler.pool","2")
|
||||
properties2.setProperty("spark.scheduler.pool", "2")
|
||||
|
||||
val taskSetManager10 = createTaskSetManager(0, 1, taskScheduler)
|
||||
val taskSetManager11 = createTaskSetManager(1, 1, taskScheduler)
|
||||
|
|
|
@ -109,7 +109,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
|
|||
check((1, 1))
|
||||
check((1, 1L))
|
||||
check((1L, 1))
|
||||
check((1L, 1L))
|
||||
check((1L, 1L))
|
||||
check((1.0, 1))
|
||||
check((1, 1.0))
|
||||
check((1.0, 1.0))
|
||||
|
@ -147,7 +147,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
|
|||
check(List(Some(mutable.HashMap(1->1, 2->2)), None, Some(mutable.HashMap(3->4))))
|
||||
check(List(
|
||||
mutable.HashMap("one" -> 1, "two" -> 2),
|
||||
mutable.HashMap(1->"one",2->"two",3->"three")))
|
||||
mutable.HashMap(1->"one", 2->"two", 3->"three")))
|
||||
}
|
||||
|
||||
test("ranges") {
|
||||
|
|
|
@ -66,18 +66,18 @@ class ProactiveClosureSerializationSuite extends FunSuite with SharedSparkContex
|
|||
}
|
||||
|
||||
private def xmap(x: RDD[String], uc: UnserializableClass): RDD[String] =
|
||||
x.map(y=>uc.op(y))
|
||||
x.map(y => uc.op(y))
|
||||
|
||||
private def xflatMap(x: RDD[String], uc: UnserializableClass): RDD[String] =
|
||||
x.flatMap(y=>Seq(uc.op(y)))
|
||||
x.flatMap(y => Seq(uc.op(y)))
|
||||
|
||||
private def xfilter(x: RDD[String], uc: UnserializableClass): RDD[String] =
|
||||
x.filter(y=>uc.pred(y))
|
||||
x.filter(y => uc.pred(y))
|
||||
|
||||
private def xmapPartitions(x: RDD[String], uc: UnserializableClass): RDD[String] =
|
||||
x.mapPartitions(_.map(y=>uc.op(y)))
|
||||
x.mapPartitions(_.map(y => uc.op(y)))
|
||||
|
||||
private def xmapPartitionsWithIndex(x: RDD[String], uc: UnserializableClass): RDD[String] =
|
||||
x.mapPartitionsWithIndex((_, it) => it.map(y=>uc.op(y)))
|
||||
x.mapPartitionsWithIndex((_, it) => it.map(y => uc.op(y)))
|
||||
|
||||
}
|
||||
|
|
|
@ -32,16 +32,19 @@ class TestSerializer extends Serializer {
|
|||
|
||||
|
||||
class TestSerializerInstance extends SerializerInstance {
|
||||
override def serialize[T: ClassTag](t: T): ByteBuffer = ???
|
||||
override def serialize[T: ClassTag](t: T): ByteBuffer = throw new UnsupportedOperationException
|
||||
|
||||
override def serializeStream(s: OutputStream): SerializationStream = ???
|
||||
override def serializeStream(s: OutputStream): SerializationStream =
|
||||
throw new UnsupportedOperationException
|
||||
|
||||
override def deserializeStream(s: InputStream): TestDeserializationStream =
|
||||
new TestDeserializationStream
|
||||
|
||||
override def deserialize[T: ClassTag](bytes: ByteBuffer): T = ???
|
||||
override def deserialize[T: ClassTag](bytes: ByteBuffer): T =
|
||||
throw new UnsupportedOperationException
|
||||
|
||||
override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = ???
|
||||
override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T =
|
||||
throw new UnsupportedOperationException
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -59,10 +59,10 @@ class FlatmapIteratorSuite extends FunSuite with LocalSparkContext {
|
|||
.set("spark.serializer.objectStreamReset", "10")
|
||||
sc = new SparkContext(sconf)
|
||||
val expand_size = 500
|
||||
val data = sc.parallelize(Seq(1,2)).
|
||||
val data = sc.parallelize(Seq(1, 2)).
|
||||
flatMap(x => Stream.range(1, expand_size).
|
||||
map(y => "%d: string test %d".format(y,x)))
|
||||
var persisted = data.persist(StorageLevel.MEMORY_ONLY_SER)
|
||||
map(y => "%d: string test %d".format(y, x)))
|
||||
val persisted = data.persist(StorageLevel.MEMORY_ONLY_SER)
|
||||
assert(persisted.filter(_.startsWith("1:")).count()===2)
|
||||
}
|
||||
|
||||
|
|
|
@ -483,11 +483,11 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
|
|||
val jobsJson = getJson(sc.ui.get, "jobs")
|
||||
jobsJson.children.size should be (expJobInfo.size)
|
||||
for {
|
||||
(job @ JObject(_),idx) <- jobsJson.children.zipWithIndex
|
||||
(job @ JObject(_), idx) <- jobsJson.children.zipWithIndex
|
||||
id = (job \ "jobId").extract[String]
|
||||
name = (job \ "name").extract[String]
|
||||
} {
|
||||
withClue(s"idx = $idx; id = $id; name = ${name.substring(0,20)}") {
|
||||
withClue(s"idx = $idx; id = $id; name = ${name.substring(0, 20)}") {
|
||||
id should be (expJobInfo(idx)._1)
|
||||
name should include (expJobInfo(idx)._2)
|
||||
}
|
||||
|
@ -540,12 +540,12 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
|
|||
|
||||
goToUi(sc, "/stages/stage/?id=12&attempt=0")
|
||||
find("no-info").get.text should be ("No information to display for Stage 12 (Attempt 0)")
|
||||
val badStage = HistoryServerSuite.getContentAndCode(apiUrl(sc.ui.get,"stages/12/0"))
|
||||
val badStage = HistoryServerSuite.getContentAndCode(apiUrl(sc.ui.get, "stages/12/0"))
|
||||
badStage._1 should be (HttpServletResponse.SC_NOT_FOUND)
|
||||
badStage._2 should be (None)
|
||||
badStage._3 should be (Some("unknown stage: 12"))
|
||||
|
||||
val badAttempt = HistoryServerSuite.getContentAndCode(apiUrl(sc.ui.get,"stages/19/15"))
|
||||
val badAttempt = HistoryServerSuite.getContentAndCode(apiUrl(sc.ui.get, "stages/19/15"))
|
||||
badAttempt._1 should be (HttpServletResponse.SC_NOT_FOUND)
|
||||
badAttempt._2 should be (None)
|
||||
badAttempt._3 should be (Some("unknown attempt for stage 19. Found attempts: [0]"))
|
||||
|
|
|
@ -169,7 +169,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
|
|||
test("verify StorageTab contains all cached rdds") {
|
||||
|
||||
val rddInfo0 = new RDDInfo(0, "rdd0", 1, memOnly, Seq(4))
|
||||
val rddInfo1 = new RDDInfo(1, "rdd1", 1 ,memOnly, Seq(4))
|
||||
val rddInfo1 = new RDDInfo(1, "rdd1", 1, memOnly, Seq(4))
|
||||
val stageInfo0 = new StageInfo(0, 0, "stage0", 1, Seq(rddInfo0), Seq.empty, "details")
|
||||
val stageInfo1 = new StageInfo(1, 0, "stage1", 1, Seq(rddInfo1), Seq.empty, "details")
|
||||
val taskMetrics0 = new TaskMetrics
|
||||
|
|
|
@ -138,7 +138,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro
|
|||
|
||||
assert(securityManagerGood.isAuthenticationEnabled() === true)
|
||||
|
||||
val slaveRpcEnv =RpcEnv.create("spark-slave", hostname, 0, goodconf, securityManagerGood)
|
||||
val slaveRpcEnv = RpcEnv.create("spark-slave", hostname, 0, goodconf, securityManagerGood)
|
||||
val slaveTracker = new MapOutputTrackerWorker(conf)
|
||||
slaveTracker.trackerEndpoint =
|
||||
slaveRpcEnv.setupEndpointRef("spark", rpcEnv.address, MapOutputTracker.ENDPOINT_NAME)
|
||||
|
|
|
@ -551,7 +551,7 @@ class UtilsSuite extends FunSuite with ResetSystemProperties with Logging {
|
|||
test("fetch hcfs dir") {
|
||||
val tempDir = Utils.createTempDir()
|
||||
val sourceDir = new File(tempDir, "source-dir")
|
||||
val innerSourceDir = Utils.createTempDir(root=sourceDir.getPath)
|
||||
val innerSourceDir = Utils.createTempDir(root = sourceDir.getPath)
|
||||
val sourceFile = File.createTempFile("someprefix", "somesuffix", innerSourceDir)
|
||||
val targetDir = new File(tempDir, "target-dir")
|
||||
Files.write("some text", sourceFile, UTF_8)
|
||||
|
|
|
@ -94,7 +94,7 @@ class BitSetSuite extends FunSuite {
|
|||
|
||||
test( "xor len(bitsetX) > len(bitsetY)" ) {
|
||||
val setBitsX = Seq( 0, 1, 3, 37, 38, 41, 85)
|
||||
val setBitsY = Seq( 0, 2, 3, 37, 41 )
|
||||
val setBitsY = Seq( 0, 2, 3, 37, 41)
|
||||
val bitsetX = new BitSet(100)
|
||||
setBitsX.foreach( i => bitsetX.set(i))
|
||||
val bitsetY = new BitSet(60)
|
||||
|
|
Loading…
Reference in a new issue