[SPARK-4795][Core] Redesign the "primitive type => Writable" implicit APIs so that they are activated automatically
Redesign the "primitive type => Writable" implicit APIs so that they are activated automatically, without breaking binary compatibility. However, this PR does break source compatibility if existing code relies on an `xxxToXxxWritable` conversion being applied; see the unit test in `graphx`.

Author: zsxwing <zsxwing@gmail.com>

Closes #3642 from zsxwing/SPARK-4795 and squashes the following commits:

914b2d6 [zsxwing] Add implicit back to the Writables methods
0b9017f [zsxwing] Add some docs
a0e8509 [zsxwing] Merge branch 'master' into SPARK-4795
39343de [zsxwing] Fix the unit test
64853af [zsxwing] Reorganize the rest 'implicit' methods in SparkContext
This commit is contained in:
parent 1077f2e1de
commit d37978d8aa
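Before the diff, a minimal sketch of what the change enables (hypothetical app name, master URL, and output path): a pair RDD of primitives can be saved as a SequenceFile with no `import SparkContext._` and no explicit `xxxToXxxWritable` calls.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch (hypothetical names and paths): the WritableFactory implicits are found
// automatically, so saveAsSequenceFile needs no extra imports.
object SaveAsSequenceFileExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("seqfile-example").setMaster("local[2]"))
    val pairs = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c")))
    // The Int and String WritableFactory instances come from the WritableFactory
    // companion object, so this compiles as-is.
    pairs.saveAsSequenceFile("/tmp/seqfile-example")
    sc.stop()
  }
}
```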
@@ -1749,8 +1749,14 @@ object SparkContext extends Logging {
   @deprecated("Replaced by implicit functions in the RDD companion object. This is " +
     "kept here only for backward compatibility.", "1.3.0")
   def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
-      rdd: RDD[(K, V)]) =
+      rdd: RDD[(K, V)]) = {
+    val kf = implicitly[K => Writable]
+    val vf = implicitly[V => Writable]
+    // Set the Writable class to null and `SequenceFileRDDFunctions` will use Reflection to get it
+    implicit val keyWritableFactory = new WritableFactory[K](_ => null, kf)
+    implicit val valueWritableFactory = new WritableFactory[V](_ => null, vf)
     RDD.rddToSequenceFileRDDFunctions(rdd)
+  }
 
   @deprecated("Replaced by implicit functions in the RDD companion object. This is " +
     "kept here only for backward compatibility.", "1.3.0")
@@ -1767,20 +1773,35 @@ object SparkContext extends Logging {
   def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]) =
     RDD.numericRDDToDoubleRDDFunctions(rdd)
 
-  // Implicit conversions to common Writable types, for saveAsSequenceFile
+  // The following deprecated functions have already been moved to `object WritableFactory` to
+  // make the compiler find them automatically. They are still kept here for backward compatibility.
 
+  @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
+    "kept here only for backward compatibility.", "1.3.0")
   implicit def intToIntWritable(i: Int): IntWritable = new IntWritable(i)
 
+  @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
+    "kept here only for backward compatibility.", "1.3.0")
   implicit def longToLongWritable(l: Long): LongWritable = new LongWritable(l)
 
+  @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
+    "kept here only for backward compatibility.", "1.3.0")
   implicit def floatToFloatWritable(f: Float): FloatWritable = new FloatWritable(f)
 
+  @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
+    "kept here only for backward compatibility.", "1.3.0")
   implicit def doubleToDoubleWritable(d: Double): DoubleWritable = new DoubleWritable(d)
 
+  @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
+    "kept here only for backward compatibility.", "1.3.0")
   implicit def boolToBoolWritable (b: Boolean): BooleanWritable = new BooleanWritable(b)
 
+  @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
+    "kept here only for backward compatibility.", "1.3.0")
   implicit def bytesToBytesWritable (aob: Array[Byte]): BytesWritable = new BytesWritable(aob)
 
+  @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
+    "kept here only for backward compatibility.", "1.3.0")
   implicit def stringToText(s: String): Text = new Text(s)
 
   private implicit def arrayToArrayWritable[T <% Writable: ClassTag](arr: Traversable[T])
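These conversions stay in place so that pre-1.3 user code keeps compiling. A sketch under that assumption (hypothetical object; it now triggers a deprecation warning rather than an error):

```scala
import org.apache.hadoop.io.IntWritable
// Hypothetical legacy snippet: the deprecated conversion can still be imported explicitly.
import org.apache.spark.SparkContext.intToIntWritable

object LegacyImplicitExample {
  val answer: IntWritable = 42  // Int => IntWritable via the deprecated implicit above
}
```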
@@ -2070,7 +2091,7 @@ object WritableConverter {
     new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W]))
   }
 
-  // The following implicit functions were in SparkContext before 1.2 and users had to
+  // The following implicit functions were in SparkContext before 1.3 and users had to
   // `import SparkContext._` to enable them. Now we move them here to make the compiler find
   // them automatically. However, we still keep the old functions in SparkContext for backward
   // compatibility and forward to the following functions directly.
@@ -2103,3 +2124,46 @@ object WritableConverter {
   implicit def writableWritableConverter[T <: Writable](): WritableConverter[T] =
     new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T])
 }
+
+/**
+ * A class encapsulating how to convert some type T to Writable. It stores both the Writable class
+ * corresponding to T (e.g. IntWritable for Int) and a function for doing the conversion.
+ * The Writable class will be used in `SequenceFileRDDFunctions`.
+ */
+private[spark] class WritableFactory[T](
+    val writableClass: ClassTag[T] => Class[_ <: Writable],
+    val convert: T => Writable) extends Serializable
+
+object WritableFactory {
+
+  private[spark] def simpleWritableFactory[T: ClassTag, W <: Writable : ClassTag](convert: T => W)
+    : WritableFactory[T] = {
+    val writableClass = implicitly[ClassTag[W]].runtimeClass.asInstanceOf[Class[W]]
+    new WritableFactory[T](_ => writableClass, convert)
+  }
+
+  implicit def intWritableFactory: WritableFactory[Int] =
+    simpleWritableFactory(new IntWritable(_))
+
+  implicit def longWritableFactory: WritableFactory[Long] =
+    simpleWritableFactory(new LongWritable(_))
+
+  implicit def floatWritableFactory: WritableFactory[Float] =
+    simpleWritableFactory(new FloatWritable(_))
+
+  implicit def doubleWritableFactory: WritableFactory[Double] =
+    simpleWritableFactory(new DoubleWritable(_))
+
+  implicit def booleanWritableFactory: WritableFactory[Boolean] =
+    simpleWritableFactory(new BooleanWritable(_))
+
+  implicit def bytesWritableFactory: WritableFactory[Array[Byte]] =
+    simpleWritableFactory(new BytesWritable(_))
+
+  implicit def stringWritableFactory: WritableFactory[String] =
+    simpleWritableFactory(new Text(_))
+
+  implicit def writableWritableFactory[T <: Writable: ClassTag]: WritableFactory[T] =
+    simpleWritableFactory(w => w)
+
+}
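Because these factories live in the `WritableFactory` companion object, they sit in the implicit scope of the type itself. A sketch (hypothetical snippet placed in the `org.apache.spark` package, since the class is `private[spark]`):

```scala
package org.apache.spark

// Hypothetical snippet, not part of this commit: companion-object implicits are found
// by the compiler without any explicit import.
object WritableFactoryResolutionExample {
  val intFactory = implicitly[WritableFactory[Int]]                         // intWritableFactory
  val bytesFactory = implicitly[WritableFactory[Array[Byte]]]               // bytesWritableFactory
  val textFactory = implicitly[WritableFactory[org.apache.hadoop.io.Text]]  // writableWritableFactory
}
```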
@@ -107,7 +107,6 @@ private[python] class WritableToDoubleArrayConverter extends Converter[Any, Arra
  * given directory (probably a temp directory)
  */
 object WriteInputFormatTestDataGenerator {
-  import SparkContext._
 
   def main(args: Array[String]) {
     val path = args(0)
@@ -27,8 +27,7 @@ package org.apache
  * contains operations available only on RDDs of Doubles; and
  * [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that can
  * be saved as SequenceFiles. These operations are automatically available on any RDD of the right
- * type (e.g. RDD[(Int, Int)] through implicit conversions except `saveAsSequenceFile`. You need to
- * `import org.apache.spark.SparkContext._` to make `saveAsSequenceFile` work.
+ * type (e.g. RDD[(Int, Int)] through implicit conversions.
  *
  * Java programmers should reference the [[org.apache.spark.api.java]] package
  * for Spark programming APIs in Java.
@@ -25,11 +25,8 @@ import scala.language.implicitConversions
 import scala.reflect.{classTag, ClassTag}
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
-import org.apache.hadoop.io.BytesWritable
+import org.apache.hadoop.io.{Writable, BytesWritable, NullWritable, Text}
 import org.apache.hadoop.io.compress.CompressionCodec
-import org.apache.hadoop.io.NullWritable
-import org.apache.hadoop.io.Text
-import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.TextOutputFormat
 
 import org.apache.spark._
@@ -57,8 +54,7 @@ import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, Bernoulli
  * [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that
  * can be saved as SequenceFiles.
  * All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)]
- * through implicit conversions except `saveAsSequenceFile`. You need to
- * `import org.apache.spark.SparkContext._` to make `saveAsSequenceFile` work.
+ * through implicit.
  *
  * Internally, each RDD is characterized by five main properties:
  *
@@ -1527,7 +1523,7 @@ abstract class RDD[T: ClassTag](
  */
 object RDD {
 
-  // The following implicit functions were in SparkContext before 1.2 and users had to
+  // The following implicit functions were in SparkContext before 1.3 and users had to
   // `import SparkContext._` to enable them. Now we move them here to make the compiler find
   // them automatically. However, we still keep the old functions in SparkContext for backward
   // compatibility and forward to the following functions directly.
@@ -1541,9 +1537,15 @@ object RDD {
     new AsyncRDDActions(rdd)
   }
 
-  implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
-      rdd: RDD[(K, V)]): SequenceFileRDDFunctions[K, V] = {
-    new SequenceFileRDDFunctions(rdd)
+  implicit def rddToSequenceFileRDDFunctions[K, V](rdd: RDD[(K, V)])
+      (implicit kt: ClassTag[K], vt: ClassTag[V],
+                keyWritableFactory: WritableFactory[K],
+                valueWritableFactory: WritableFactory[V])
+    : SequenceFileRDDFunctions[K, V] = {
+    implicit val keyConverter = keyWritableFactory.convert
+    implicit val valueConverter = valueWritableFactory.convert
+    new SequenceFileRDDFunctions(rdd,
+      keyWritableFactory.writableClass(kt), valueWritableFactory.writableClass(vt))
   }
 
   implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](rdd: RDD[(K, V)])
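A sketch of how the new implicit parameters gate the enrichment (hypothetical helper object; the commented-out method is deliberately non-compiling):

```scala
import org.apache.spark.rdd.RDD

// Sketch: saveAsSequenceFile is only added when WritableFactory instances exist for
// both the key and the value type.
object SequenceFileGatingExample {
  def save(pairs: RDD[(Long, String)], path: String): Unit =
    pairs.saveAsSequenceFile(path)   // compiles: Long and String factories are implicit

  // def wontCompile(pairs: RDD[(Long, java.util.UUID)], path: String): Unit =
  //   pairs.saveAsSequenceFile(path) // fails: no WritableFactory[java.util.UUID] in scope
}
```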
@@ -30,13 +30,35 @@ import org.apache.spark.Logging
  * through an implicit conversion. Note that this can't be part of PairRDDFunctions because
  * we need more implicit parameters to convert our keys and values to Writable.
  *
- * Import `org.apache.spark.SparkContext._` at the top of their program to use these functions.
  */
 class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag](
-    self: RDD[(K, V)])
+    self: RDD[(K, V)],
+    _keyWritableClass: Class[_ <: Writable],
+    _valueWritableClass: Class[_ <: Writable])
   extends Logging
   with Serializable {
 
+  @deprecated("It's used to provide backward compatibility for pre 1.3.0.", "1.3.0")
+  def this(self: RDD[(K, V)]) {
+    this(self, null, null)
+  }
+
+  private val keyWritableClass =
+    if (_keyWritableClass == null) {
+      // pre 1.3.0, we need to use Reflection to get the Writable class
+      getWritableClass[K]()
+    } else {
+      _keyWritableClass
+    }
+
+  private val valueWritableClass =
+    if (_valueWritableClass == null) {
+      // pre 1.3.0, we need to use Reflection to get the Writable class
+      getWritableClass[V]()
+    } else {
+      _valueWritableClass
+    }
+
   private def getWritableClass[T <% Writable: ClassTag](): Class[_ <: Writable] = {
     val c = {
       if (classOf[Writable].isAssignableFrom(classTag[T].runtimeClass)) {
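A sketch of the backward-compatibility path this preserves (hypothetical helper; assumes a live `SparkContext`): code that still goes through the deprecated one-argument constructor gets its Writable classes back via the reflection fallback above.

```scala
import org.apache.hadoop.io.IntWritable
import org.apache.spark.SparkContext
import org.apache.spark.rdd.{RDD, SequenceFileRDDFunctions}

// Hypothetical legacy-style usage: no Writable classes are passed, so they are
// recovered by the reflection-based getWritableClass lookup (with a deprecation warning).
object LegacyConstructorExample {
  def legacySave(sc: SparkContext): Unit = {
    val rdd: RDD[(IntWritable, IntWritable)] =
      sc.parallelize(Seq((new IntWritable(1), new IntWritable(2))))
    new SequenceFileRDDFunctions(rdd).saveAsSequenceFile("/a/test/path")
  }
}
```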
@@ -55,6 +77,7 @@ class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag
     c.asInstanceOf[Class[_ <: Writable]]
   }
 
+
   /**
    * Output the RDD as a Hadoop SequenceFile using the Writable types we infer from the RDD's key
    * and value types. If the key or value are Writable, then we use their classes directly;
@@ -65,26 +88,28 @@ class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag
   def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None) {
     def anyToWritable[U <% Writable](u: U): Writable = u
 
-    val keyClass = getWritableClass[K]
-    val valueClass = getWritableClass[V]
-    val convertKey = !classOf[Writable].isAssignableFrom(self.keyClass)
-    val convertValue = !classOf[Writable].isAssignableFrom(self.valueClass)
+    // TODO We cannot force the return type of `anyToWritable` be same as keyWritableClass and
+    // valueWritableClass at the compile time. To implement that, we need to add type parameters to
+    // SequenceFileRDDFunctions. however, SequenceFileRDDFunctions is a public class so it will be a
+    // breaking change.
+    val convertKey = self.keyClass != keyWritableClass
+    val convertValue = self.valueClass != valueWritableClass
 
-    logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," +
-      valueClass.getSimpleName + ")" )
+    logInfo("Saving as sequence file of type (" + keyWritableClass.getSimpleName + "," +
+      valueWritableClass.getSimpleName + ")" )
     val format = classOf[SequenceFileOutputFormat[Writable, Writable]]
     val jobConf = new JobConf(self.context.hadoopConfiguration)
     if (!convertKey && !convertValue) {
-      self.saveAsHadoopFile(path, keyClass, valueClass, format, jobConf, codec)
+      self.saveAsHadoopFile(path, keyWritableClass, valueWritableClass, format, jobConf, codec)
     } else if (!convertKey && convertValue) {
       self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile(
-        path, keyClass, valueClass, format, jobConf, codec)
+        path, keyWritableClass, valueWritableClass, format, jobConf, codec)
     } else if (convertKey && !convertValue) {
       self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile(
-        path, keyClass, valueClass, format, jobConf, codec)
+        path, keyWritableClass, valueWritableClass, format, jobConf, codec)
     } else if (convertKey && convertValue) {
       self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile(
-        path, keyClass, valueClass, format, jobConf, codec)
+        path, keyWritableClass, valueWritableClass, format, jobConf, codec)
     }
   }
 }
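For illustration only, a sketch of the kind of signature the TODO above alludes to; this is not part of this commit and, as the comment notes, it would be a breaking change to the public class:

```scala
import scala.reflect.ClassTag
import org.apache.hadoop.io.Writable
import org.apache.spark.rdd.RDD

// Hypothetical sketch: with Writable type parameters, the compiler could check that the
// converted keys and values really match keyWritableClass and valueWritableClass.
class TypedSequenceFileRDDFunctions[K: ClassTag, V: ClassTag, KW <: Writable, VW <: Writable](
    self: RDD[(K, V)],
    keyWritableClass: Class[KW],
    valueWritableClass: Class[VW])(
    implicit keyConverter: K => KW,
    valueConverter: V => VW)
  extends Serializable
```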
@@ -32,7 +32,6 @@ import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInp
 import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
 import org.scalatest.FunSuite
 
-import org.apache.spark.SparkContext._
 import org.apache.spark.rdd.{NewHadoopRDD, HadoopRDD}
 import org.apache.spark.util.Utils
 
@@ -44,13 +44,21 @@ class ImplicitSuite {
   }
 
   def testRddToSequenceFileRDDFunctions(): Unit = {
-    // TODO eliminating `import intToIntWritable` needs refactoring SequenceFileRDDFunctions.
-    // That will be a breaking change.
-    import org.apache.spark.SparkContext.intToIntWritable
     val rdd: org.apache.spark.rdd.RDD[(Int, Int)] = mockRDD
     rdd.saveAsSequenceFile("/a/test/path")
   }
 
+  def testRddToSequenceFileRDDFunctionsWithWritable(): Unit = {
+    val rdd: org.apache.spark.rdd.RDD[(org.apache.hadoop.io.IntWritable, org.apache.hadoop.io.Text)]
+      = mockRDD
+    rdd.saveAsSequenceFile("/a/test/path")
+  }
+
+  def testRddToSequenceFileRDDFunctionsWithBytesArray(): Unit = {
+    val rdd: org.apache.spark.rdd.RDD[(Int, Array[Byte])] = mockRDD
+    rdd.saveAsSequenceFile("/a/test/path")
+  }
+
   def testRddToOrderedRDDFunctions(): Unit = {
     val rdd: org.apache.spark.rdd.RDD[(Int, Int)] = mockRDD
     rdd.sortByKey()
@@ -40,7 +40,7 @@ class ShortestPathsSuite extends FunSuite with LocalSparkContext {
     val graph = Graph.fromEdgeTuples(edges, 1)
     val landmarks = Seq(1, 4).map(_.toLong)
     val results = ShortestPaths.run(graph, landmarks).vertices.collect.map {
-      case (v, spMap) => (v, spMap.mapValues(_.get))
+      case (v, spMap) => (v, spMap.mapValues(i => i))
     }
     assert(results.toSet === shortestPaths)
   }
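This is the `graphx` source-compatibility break called out in the commit message: `spMap.mapValues(_.get)` only compiled because the previously imported `intToIntWritable` conversion gave the `Int` distances an `IntWritable` `get` method. A self-contained sketch of that old behaviour (hypothetical object that re-declares the implicit locally):

```scala
import scala.language.implicitConversions
import org.apache.hadoop.io.IntWritable

// Hypothetical illustration: with the implicit in scope, `_.get` on an Int compiled by
// converting the Int to an IntWritable first; without it, an identity mapping is needed.
object WhyTheGraphxTestChanged {
  implicit def intToIntWritable(i: Int): IntWritable = new IntWritable(i)

  val distances = Map(1L -> 0, 4L -> 2)
  val viaImplicit = distances.mapValues(_.get)      // only compiles with the implicit above
  val withoutImplicit = distances.mapValues(i => i) // the replacement used in the test
}
```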