Added the missing RDD files and cleaned up SparkContext.

commit 2a7b99c08b
parent 82bf4c0339
Author: Reynold Xin
Date:   2013-08-18 20:39:29 -07:00

4 changed files with 126 additions and 12 deletions

spark/SparkContext.scala

@@ -20,19 +20,14 @@ package spark
import java.io._
import java.net.URI
import java.util.Properties
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConversions._
import scala.collection.Map
import scala.collection.generic.Growable
import scala.collection.mutable.HashMap
import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.util.DynamicVariable
import scala.collection.mutable.{ConcurrentMap, HashMap}
import akka.actor.Actor._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@@ -54,7 +49,6 @@ import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.{Job => NewHadoopJob}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.mesos.MesosNativeLibrary
@@ -63,15 +57,14 @@ import spark.partial.{ApproximateEvaluator, PartialResult}
import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD,
OrderedRDDFunctions}
import spark.scheduler.{DAGScheduler, DAGSchedulerSource, ResultTask, ShuffleMapTask, SparkListener,
SplitInfo, Stage, StageInfo, TaskScheduler, ActiveJob}
SplitInfo, Stage, StageInfo, TaskScheduler}
import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend,
ClusterScheduler, Schedulable, SchedulingMode}
import spark.scheduler.local.LocalScheduler
import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import spark.storage.{StorageStatus, StorageUtils, RDDInfo, BlockManagerSource}
import spark.ui.SparkUI
import spark.util.{MetadataCleaner, TimeStampedHashMap}
import ui.{SparkUI}
import spark.metrics._
/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -887,7 +880,7 @@ object SparkContext {
implicit def rddToOrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest](
rdd: RDD[(K, V)]) =
new OrderedRDDFunctions(rdd.asInstanceOf[RDD[Product2[K, V]]])
new OrderedRDDFunctions(rdd)
implicit def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]) = new DoubleRDDFunctions(rdd)
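For reference, the rddToOrderedRDDFunctions implicit above is what makes sortByKey (defined in the new OrderedRDDFunctions below) available on any RDD of pairs whose key has an Ordered view. A minimal usage sketch, assuming an existing SparkContext named sc in a driver program:

// Hypothetical driver-program sketch; sc is assumed to be an existing SparkContext.
import spark.SparkContext._

// The implicit wraps an RDD[(String, Int)] in OrderedRDDFunctions, exposing sortByKey.
val pairs = sc.parallelize(Seq(("b", 2), ("a", 1), ("c", 3)))
val sorted = pairs.sortByKey()
// sorted.collect() == Array(("a", 1), ("b", 2), ("c", 3))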

spark/rdd/FlatMappedValuesRDD.scala (new file)

@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package spark.rdd
import spark.{TaskContext, Partition, RDD}
private[spark]
class FlatMappedValuesRDD[K, V, U](prev: RDD[_ <: Product2[K, V]], f: V => TraversableOnce[U])
extends RDD[(K, U)](prev) {
override def getPartitions = firstParent[Product2[K, V]].partitions
override val partitioner = firstParent[Product2[K, V]].partitioner
override def compute(split: Partition, context: TaskContext) = {
firstParent[Product2[K, V]].iterator(split, context).flatMap { case (k, v) =>
f(v).map(x => (k, x))
}
}
}
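This RDD implements the semantics of flatMapValues on pair RDDs: each value expands into zero or more new values, all re-paired with the original key, and the parent's partitioner is preserved. A minimal behaviour sketch, assuming an existing SparkContext named sc and the spark.SparkContext._ implicits in scope:

// Hypothetical usage sketch; sc is assumed to be an existing SparkContext.
// Each value v expands to the range 1 to v; keys are kept, and an empty range drops the record.
val pairs = sc.parallelize(Seq(("a", 2), ("b", 0)))
val expanded = pairs.flatMapValues(v => 1 to v)
// expanded.collect() == Array(("a", 1), ("a", 2))   // "b" yields nothing because 1 to 0 is empty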

spark/rdd/MappedValuesRDD.scala (new file)

@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package spark.rdd
import spark.{TaskContext, Partition, RDD}
private[spark]
class MappedValuesRDD[K, V, U](prev: RDD[_ <: Product2[K, V]], f: V => U)
extends RDD[(K, U)](prev) {
override def getPartitions = firstParent[Product2[K, U]].partitions
override val partitioner = firstParent[Product2[K, U]].partitioner
override def compute(split: Partition, context: TaskContext): Iterator[(K, U)] = {
firstParent[Product2[K, V]].iterator(split, context).map { case (k, v) => (k, f(v)) }
}
}
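MappedValuesRDD plays the same role for mapValues: only the values change, so the keys and the parent's partitioner are kept, which lets later key-based operations avoid a reshuffle. A minimal sketch under the same assumptions (existing sc, spark.SparkContext._ imported):

// Hypothetical usage sketch; sc is assumed to be an existing SparkContext.
// Keys are untouched, so any existing partitioning by key remains valid.
val counts = sc.parallelize(Seq(("a", 1), ("b", 2)))
val doubled = counts.mapValues(_ * 2)
// doubled.collect() == Array(("a", 2), ("b", 4))
// doubled.partitioner == counts.partitioner   (None here, but preserved when the parent is partitioned)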

spark/rdd/OrderedRDDFunctions.scala (new file)

@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package spark.rdd
import spark.{RangePartitioner, Logging, RDD}
/**
* Extra functions available on RDDs of (key, value) pairs where the key is sortable through
* an implicit conversion. Import `spark.SparkContext._` at the top of your program to use these
* functions. They will work with any key type that has a `scala.math.Ordered` implementation.
*/
class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest](
self: RDD[_ <: Product2[K, V]])
extends Logging with Serializable {
/**
* Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
* `collect` or `save` on the resulting RDD will return or output an ordered list of records
* (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
* order of the keys).
*/
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size)
: RDD[(K, V)] =
{
val part = new RangePartitioner(numPartitions, self.asInstanceOf[RDD[Product2[K,V]]], ascending)
val shuffled = new ShuffledRDD[K, V](self, part)
shuffled.mapPartitions(iter => {
val buf = iter.toArray
if (ascending) {
buf.sortWith((x, y) => x._1 < y._1).iterator
} else {
buf.sortWith((x, y) => x._1 > y._1).iterator
}
}, preservesPartitioning = true)
}
}
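A short usage sketch of sortByKey, again assuming an existing SparkContext named sc and the spark.SparkContext._ implicits in scope. The RangePartitioner samples the input to choose key ranges, so this involves a shuffle; each output partition then holds a contiguous, sorted range of keys:

// Hypothetical driver-program sketch; sc is assumed to be an existing SparkContext.
// Sort descending into 2 partitions; collect() returns the records in global key order.
val pairs = sc.parallelize(Seq(("b", 2), ("a", 1), ("c", 3)))
val descending = pairs.sortByKey(ascending = false, numPartitions = 2)
// descending.collect() == Array(("c", 3), ("b", 2), ("a", 1))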