_With[Matei]
This commit is contained in:
parent
38454c4aed
commit
80fc8c82ed
|
@ -369,25 +369,25 @@ abstract class RDD[T: ClassManifest](
|
||||||
* additional parameter is produced by constructorOfA, which is called in each
|
* additional parameter is produced by constructorOfA, which is called in each
|
||||||
* partition with the index of that partition.
|
* partition with the index of that partition.
|
||||||
*/
|
*/
|
||||||
def mapWith[A: ClassManifest, U: ClassManifest](constructorOfA: Int => A, preservesPartitioning: Boolean = false)
|
def mapWith[A: ClassManifest, U: ClassManifest](constructA: Int => A, preservesPartitioning: Boolean = false)
|
||||||
(f:(A, T) => U): RDD[U] = {
|
(f:(T, A) => U): RDD[U] = {
|
||||||
def iterF(index: Int, iter: Iterator[T]): Iterator[U] = {
|
def iterF(index: Int, iter: Iterator[T]): Iterator[U] = {
|
||||||
val a = constructorOfA(index)
|
val a = constructA(index)
|
||||||
iter.map(t => f(a, t))
|
iter.map(t => f(t, a))
|
||||||
}
|
}
|
||||||
new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning)
|
new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* FlatMaps f over this RDD, where f takes an additional parameter of type A. This
|
* FlatMaps f over this RDD, where f takes an additional parameter of type A. This
|
||||||
* additional parameter is produced by constructorOfA, which is called in each
|
* additional parameter is produced by constructorOfA, which is called in each
|
||||||
* partition with the index of that partition.
|
* partition with the index of that partition.
|
||||||
*/
|
*/
|
||||||
def flatMapWith[A: ClassManifest, U: ClassManifest](constructorOfA: Int => A, preservesPartitioning: Boolean = false)
|
def flatMapWith[A: ClassManifest, U: ClassManifest](constructA: Int => A, preservesPartitioning: Boolean = false)
|
||||||
(f:(A, T) => Seq[U]): RDD[U] = {
|
(f:(T, A) => Seq[U]): RDD[U] = {
|
||||||
def iterF(index: Int, iter: Iterator[T]): Iterator[U] = {
|
def iterF(index: Int, iter: Iterator[T]): Iterator[U] = {
|
||||||
val a = constructorOfA(index)
|
val a = constructA(index)
|
||||||
iter.flatMap(t => f(a, t))
|
iter.flatMap(t => f(t, a))
|
||||||
}
|
}
|
||||||
new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning)
|
new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning)
|
||||||
}
|
}
|
||||||
|
@ -397,11 +397,11 @@ abstract class RDD[T: ClassManifest](
|
||||||
* This additional parameter is produced by constructorOfA, which is called in each
|
* This additional parameter is produced by constructorOfA, which is called in each
|
||||||
* partition with the index of that partition.
|
* partition with the index of that partition.
|
||||||
*/
|
*/
|
||||||
def foreachWith[A: ClassManifest](constructorOfA: Int => A)
|
def foreachWith[A: ClassManifest](constructA: Int => A)
|
||||||
(f:(A, T) => Unit) {
|
(f:(T, A) => Unit) {
|
||||||
def iterF(index: Int, iter: Iterator[T]): Iterator[T] = {
|
def iterF(index: Int, iter: Iterator[T]): Iterator[T] = {
|
||||||
val a = constructorOfA(index)
|
val a = constructA(index)
|
||||||
iter.map(t => {f(a, t); t})
|
iter.map(t => {f(t, a); t})
|
||||||
}
|
}
|
||||||
(new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), true)).foreach(_ => {})
|
(new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), true)).foreach(_ => {})
|
||||||
}
|
}
|
||||||
|
@ -411,11 +411,11 @@ abstract class RDD[T: ClassManifest](
|
||||||
* additional parameter is produced by constructorOfA, which is called in each
|
* additional parameter is produced by constructorOfA, which is called in each
|
||||||
* partition with the index of that partition.
|
* partition with the index of that partition.
|
||||||
*/
|
*/
|
||||||
def filterWith[A: ClassManifest](constructorOfA: Int => A)
|
def filterWith[A: ClassManifest](constructA: Int => A)
|
||||||
(p:(A, T) => Boolean): RDD[T] = {
|
(p:(T, A) => Boolean): RDD[T] = {
|
||||||
def iterF(index: Int, iter: Iterator[T]): Iterator[T] = {
|
def iterF(index: Int, iter: Iterator[T]): Iterator[T] = {
|
||||||
val a = constructorOfA(index)
|
val a = constructA(index)
|
||||||
iter.filter(t => p(a, t))
|
iter.filter(t => p(t, a))
|
||||||
}
|
}
|
||||||
new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), true)
|
new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), true)
|
||||||
}
|
}
|
||||||
|
|
|
@ -185,7 +185,7 @@ class RDDSuite extends FunSuite with LocalSparkContext {
|
||||||
val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2)
|
val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2)
|
||||||
val randoms = ones.mapWith(
|
val randoms = ones.mapWith(
|
||||||
(index: Int) => new Random(index + 42))
|
(index: Int) => new Random(index + 42))
|
||||||
{(prng: Random, t: Int) => prng.nextDouble * t}.collect()
|
{(t: Int, prng: Random) => prng.nextDouble * t}.collect()
|
||||||
val prn42_3 = {
|
val prn42_3 = {
|
||||||
val prng42 = new Random(42)
|
val prng42 = new Random(42)
|
||||||
prng42.nextDouble(); prng42.nextDouble(); prng42.nextDouble()
|
prng42.nextDouble(); prng42.nextDouble(); prng42.nextDouble()
|
||||||
|
@ -204,7 +204,7 @@ class RDDSuite extends FunSuite with LocalSparkContext {
|
||||||
val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2)
|
val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2)
|
||||||
val randoms = ones.flatMapWith(
|
val randoms = ones.flatMapWith(
|
||||||
(index: Int) => new Random(index + 42))
|
(index: Int) => new Random(index + 42))
|
||||||
{(prng: Random, t: Int) =>
|
{(t: Int, prng: Random) =>
|
||||||
val random = prng.nextDouble()
|
val random = prng.nextDouble()
|
||||||
Seq(random * t, random * t * 10)}.
|
Seq(random * t, random * t * 10)}.
|
||||||
collect()
|
collect()
|
||||||
|
@ -226,7 +226,7 @@ class RDDSuite extends FunSuite with LocalSparkContext {
|
||||||
val ints = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 2)
|
val ints = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 2)
|
||||||
val sample = ints.filterWith(
|
val sample = ints.filterWith(
|
||||||
(index: Int) => new Random(index + 42))
|
(index: Int) => new Random(index + 42))
|
||||||
{(prng: Random, t: Int) => prng.nextInt(3) == 0}.
|
{(t: Int, prng: Random) => prng.nextInt(3) == 0}.
|
||||||
collect()
|
collect()
|
||||||
val checkSample = {
|
val checkSample = {
|
||||||
val prng42 = new Random(42)
|
val prng42 = new Random(42)
|
||||||
|
|
Loading…
Reference in a new issue