From fcb4fc653d1a66ff393dde335551ae3059f112c9 Mon Sep 17 00:00:00 2001
From: Prashant Sharma
Date: Tue, 14 Jan 2014 20:13:55 +0530
Subject: [PATCH] adding cloneRecords field to equivalent java apis

---
 .../spark/api/java/JavaSparkContext.scala | 116 +++++++++++++++++-
 1 file changed, 114 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 7a6f044965..0dd0a111b0 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -137,7 +137,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    */
   def textFile(path: String, minSplits: Int): JavaRDD[String] = sc.textFile(path, minSplits)
 
-  /**Get an RDD for a Hadoop SequenceFile with given key and value types. */
+  /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
   def sequenceFile[K, V](path: String,
     keyClass: Class[K],
     valueClass: Class[V],
@@ -148,7 +148,19 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minSplits))
   }
 
-  /**Get an RDD for a Hadoop SequenceFile. */
+  /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
+  def sequenceFile[K, V](path: String,
+    keyClass: Class[K],
+    valueClass: Class[V],
+    minSplits: Int,
+    cloneRecords: Boolean
+    ): JavaPairRDD[K, V] = {
+    implicit val kcm: ClassTag[K] = ClassTag(keyClass)
+    implicit val vcm: ClassTag[V] = ClassTag(valueClass)
+    new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minSplits, cloneRecords))
+  }
+
+  /** Get an RDD for a Hadoop SequenceFile. */
   def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]):
   JavaPairRDD[K, V] = {
     implicit val kcm: ClassTag[K] = ClassTag(keyClass)
@@ -156,6 +168,15 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass))
   }
 
+  /** Get an RDD for a Hadoop SequenceFile. */
+  def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V],
+      cloneRecords: Boolean):
+  JavaPairRDD[K, V] = {
+    implicit val kcm: ClassTag[K] = ClassTag(keyClass)
+    implicit val vcm: ClassTag[V] = ClassTag(valueClass)
+    new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, cloneRecords))
+  }
+
   /**
    * Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys and
    * BytesWritable values that contain a serialized partition. This is still an experimental storage
@@ -197,6 +218,37 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minSplits))
   }
 
+
+  /**
+   * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and other
+   * necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable),
+   * using the older MapReduce API (`org.apache.hadoop.mapred`).
+   *
+   * @param conf JobConf for setting up the dataset
+   * @param inputFormatClass Class of the [[InputFormat]]
+   * @param keyClass Class of the keys
+   * @param valueClass Class of the values
+   * @param minSplits Minimum number of Hadoop Splits to generate.
+   * @param cloneRecords If true, Spark will clone the records produced by Hadoop RecordReader.
+   *                     Most RecordReader implementations reuse wrapper objects across multiple
+   *                     records, which can cause problems in RDD collect or aggregation operations.
+   *                     By default the records are cloned in Spark. However, application
+   *                     programmers can explicitly disable cloning for better performance.
+   */
+  def hadoopRDD[K, V, F <: InputFormat[K, V]](
+    conf: JobConf,
+    inputFormatClass: Class[F],
+    keyClass: Class[K],
+    valueClass: Class[V],
+    minSplits: Int,
+    cloneRecords: Boolean
+    ): JavaPairRDD[K, V] = {
+    implicit val kcm: ClassTag[K] = ClassTag(keyClass)
+    implicit val vcm: ClassTag[V] = ClassTag(valueClass)
+    new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minSplits,
+      cloneRecords))
+  }
+
   /**
    * Get an RDD for a Hadoop-readable dataset from a Hadooop JobConf giving its InputFormat and any
    * other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
@@ -226,6 +278,21 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits))
   }
 
+  /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
+  def hadoopFile[K, V, F <: InputFormat[K, V]](
+    path: String,
+    inputFormatClass: Class[F],
+    keyClass: Class[K],
+    valueClass: Class[V],
+    minSplits: Int,
+    cloneRecords: Boolean
+    ): JavaPairRDD[K, V] = {
+    implicit val kcm: ClassTag[K] = ClassTag(keyClass)
+    implicit val vcm: ClassTag[V] = ClassTag(valueClass)
+    new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass,
+      minSplits, cloneRecords))
+  }
+
   /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
   def hadoopFile[K, V, F <: InputFormat[K, V]](
     path: String,
@@ -239,6 +306,20 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
       inputFormatClass, keyClass, valueClass))
   }
 
+  /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
+  def hadoopFile[K, V, F <: InputFormat[K, V]](
+    path: String,
+    inputFormatClass: Class[F],
+    keyClass: Class[K],
+    valueClass: Class[V],
+    cloneRecords: Boolean
+    ): JavaPairRDD[K, V] = {
+    implicit val kcm: ClassTag[K] = ClassTag(keyClass)
+    implicit val vcm: ClassTag[V] = ClassTag(valueClass)
+    new JavaPairRDD(sc.hadoopFile(path,
+      inputFormatClass, keyClass, valueClass, cloneRecords = cloneRecords))
+  }
+
   /**
    * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
    * and extra configuration options to pass to the input format.
@@ -254,6 +335,22 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     new JavaPairRDD(sc.newAPIHadoopFile(path, fClass, kClass, vClass, conf))
   }
 
+  /**
+   * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
+   * and extra configuration options to pass to the input format.
+   */
+  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
+    path: String,
+    fClass: Class[F],
+    kClass: Class[K],
+    vClass: Class[V],
+    conf: Configuration,
+    cloneRecords: Boolean): JavaPairRDD[K, V] = {
+    implicit val kcm: ClassTag[K] = ClassTag(kClass)
+    implicit val vcm: ClassTag[V] = ClassTag(vClass)
+    new JavaPairRDD(sc.newAPIHadoopFile(path, fClass, kClass, vClass, conf, cloneRecords))
+  }
+
   /**
    * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
    * and extra configuration options to pass to the input format.
@@ -268,6 +365,21 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     new JavaPairRDD(sc.newAPIHadoopRDD(conf, fClass, kClass, vClass))
   }
 
+  /**
+   * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
+   * and extra configuration options to pass to the input format.
+   */
+  def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
+    conf: Configuration,
+    fClass: Class[F],
+    kClass: Class[K],
+    vClass: Class[V],
+    cloneRecords: Boolean): JavaPairRDD[K, V] = {
+    implicit val kcm: ClassTag[K] = ClassTag(kClass)
+    implicit val vcm: ClassTag[V] = ClassTag(vClass)
+    new JavaPairRDD(sc.newAPIHadoopRDD(conf, fClass, kClass, vClass, cloneRecords))
+  }
+
   /** Build the union of two or more RDDs. */
   override def union[T](first: JavaRDD[T], rest: java.util.List[JavaRDD[T]]): JavaRDD[T] = {
     val rdds: Seq[RDD[T]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd)
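
Usage note (editor's sketch, not part of the patch): the snippet below is a minimal, hypothetical Java example of calling one of the new cloneRecords overloads; the input path, key/value classes and split count are assumptions made for illustration. Per the doc comment added above, records are cloned by default, and passing false skips the per-record copy at the cost of having to consume each record before the RecordReader reuses its wrapper object.

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class CloneRecordsExample {
      public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local", "cloneRecordsExample");

        // Hypothetical SequenceFile of IntWritable/Text pairs; 4 is the minimum
        // number of splits. cloneRecords = false disables the per-record copy,
        // so the Writable instances reused by the RecordReader must not be
        // cached or collected directly.
        JavaPairRDD<IntWritable, Text> pairs =
            sc.sequenceFile("hdfs:///tmp/example.seq", IntWritable.class, Text.class, 4, false);

        // count() never holds on to the reused records, so skipping the clone is safe here.
        System.out.println("records: " + pairs.count());

        sc.stop();
      }
    }

The same flag is exposed on the hadoopRDD, hadoopFile, newAPIHadoopFile and newAPIHadoopRDD overloads added above; the overloads without the parameter simply fall through to the underlying SparkContext defaults.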