@@ -38,7 +38,7 @@ import org.apache.spark.util.Utils
 
 class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   test("aggregateByKey") {
-    val pairs = sc.parallelize(Array((1, 1), (1, 1), (3, 2), (5, 1), (5, 3)), 2)
+    val pairs = sc.parallelize(Seq((1, 1), (1, 1), (3, 2), (5, 1), (5, 3)), 2)
 
     val sets = pairs.aggregateByKey(new HashSet[Int]())(_ += _, _ ++= _).collect()
     assert(sets.size === 3)
@@ -51,7 +51,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }
 
   test("groupByKey") {
-    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)))
+    val pairs = sc.parallelize(Seq((1, 1), (1, 2), (1, 3), (2, 1)))
     val groups = pairs.groupByKey().collect()
     assert(groups.size === 2)
     val valuesFor1 = groups.find(_._1 == 1).get._2
@@ -61,7 +61,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }
 
   test("groupByKey with duplicates") {
-    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
+    val pairs = sc.parallelize(Seq((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
     val groups = pairs.groupByKey().collect()
     assert(groups.size === 2)
     val valuesFor1 = groups.find(_._1 == 1).get._2
@@ -71,7 +71,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }
 
   test("groupByKey with negative key hash codes") {
-    val pairs = sc.parallelize(Array((-1, 1), (-1, 2), (-1, 3), (2, 1)))
+    val pairs = sc.parallelize(Seq((-1, 1), (-1, 2), (-1, 3), (2, 1)))
     val groups = pairs.groupByKey().collect()
     assert(groups.size === 2)
     val valuesForMinus1 = groups.find(_._1 == -1).get._2
@@ -81,7 +81,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }
 
   test("groupByKey with many output partitions") {
-    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)))
+    val pairs = sc.parallelize(Seq((1, 1), (1, 2), (1, 3), (2, 1)))
     val groups = pairs.groupByKey(10).collect()
     assert(groups.size === 2)
     val valuesFor1 = groups.find(_._1 == 1).get._2
@@ -170,13 +170,13 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }
 
   test("reduceByKey") {
-    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
+    val pairs = sc.parallelize(Seq((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
     val sums = pairs.reduceByKey(_ + _).collect()
     assert(sums.toSet === Set((1, 7), (2, 1)))
   }
 
   test("reduceByKey with collectAsMap") {
-    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
+    val pairs = sc.parallelize(Seq((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
     val sums = pairs.reduceByKey(_ + _).collectAsMap()
     assert(sums.size === 2)
     assert(sums(1) === 7)
@@ -184,7 +184,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }
 
   test("reduceByKey with many output partitions") {
-    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
+    val pairs = sc.parallelize(Seq((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
     val sums = pairs.reduceByKey(_ + _, 10).collect()
     assert(sums.toSet === Set((1, 7), (2, 1)))
   }
@@ -194,7 +194,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
       def numPartitions = 2
       def getPartition(key: Any) = key.asInstanceOf[Int]
     }
-    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 1), (0, 1))).partitionBy(p)
+    val pairs = sc.parallelize(Seq((1, 1), (1, 2), (1, 1), (0, 1))).partitionBy(p)
     val sums = pairs.reduceByKey(_ + _)
     assert(sums.collect().toSet === Set((1, 4), (0, 1)))
     assert(sums.partitioner === Some(p))
@@ -246,8 +246,8 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }
 
   test("join") {
-    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
-    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+    val rdd1 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1)))
+    val rdd2 = sc.parallelize(Seq((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
     val joined = rdd1.join(rdd2).collect()
     assert(joined.size === 4)
     assert(joined.toSet === Set(
@@ -259,8 +259,8 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }
 
   test("join all-to-all") {
-    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (1, 3)))
-    val rdd2 = sc.parallelize(Array((1, 'x'), (1, 'y')))
+    val rdd1 = sc.parallelize(Seq((1, 1), (1, 2), (1, 3)))
+    val rdd2 = sc.parallelize(Seq((1, 'x'), (1, 'y')))
     val joined = rdd1.join(rdd2).collect()
     assert(joined.size === 6)
     assert(joined.toSet === Set(
@@ -274,8 +274,8 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }
 
   test("leftOuterJoin") {
-    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
-    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+    val rdd1 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1)))
+    val rdd2 = sc.parallelize(Seq((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
     val joined = rdd1.leftOuterJoin(rdd2).collect()
     assert(joined.size === 5)
     assert(joined.toSet === Set(
@@ -292,7 +292,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
     import scala.reflect.classTag
     val intPairCT = classTag[(Int, Int)]
 
-    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+    val rdd1 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1)))
     val rdd2 = sc.emptyRDD[(Int, Int)](intPairCT)
 
     val joined = rdd1.cogroup(rdd2).collect()
@@ -304,7 +304,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
     import scala.reflect.classTag
     val intCT = classTag[Int]
 
-    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+    val rdd1 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1)))
     val rdd2 = sc.emptyRDD[Int](intCT).groupBy((x) => 5)
     val joined = rdd1.cogroup(rdd2).collect()
     assert(joined.size > 0)
@@ -315,7 +315,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
     "with an order of magnitude difference in number of partitions") {
     val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 1000)
     val rdd2 = sc
-      .parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+      .parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1)))
       .partitionBy(new HashPartitioner(10))
     val joined = rdd1.cogroup(rdd2)
     assert(joined.getNumPartitions == rdd1.getNumPartitions)
@@ -325,7 +325,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   test("cogroup between multiple RDD with number of partitions similar in order of magnitude") {
     val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 20)
     val rdd2 = sc
-      .parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+      .parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1)))
       .partitionBy(new HashPartitioner(10))
     val joined = rdd1.cogroup(rdd2)
     assert(joined.getNumPartitions == rdd2.getNumPartitions)
@@ -336,7 +336,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
     try {
       sc.conf.set("spark.default.parallelism", "4")
       val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 20)
-      val rdd2 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)), 10)
+      val rdd2 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1)), 10)
       val joined = rdd1.cogroup(rdd2)
       assert(joined.getNumPartitions == sc.defaultParallelism)
     } finally {
@@ -349,7 +349,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
     try {
       sc.conf.set("spark.default.parallelism", "4")
       val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 20)
-      val rdd2 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+      val rdd2 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1)))
         .partitionBy(new HashPartitioner(10))
       val joined = rdd1.cogroup(rdd2)
       assert(joined.getNumPartitions == rdd2.getNumPartitions)
@@ -364,7 +364,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
     try {
      sc.conf.set("spark.default.parallelism", "4")
       val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 1000)
-      val rdd2 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+      val rdd2 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1)))
         .partitionBy(new HashPartitioner(10))
       val joined = rdd1.cogroup(rdd2)
       assert(joined.getNumPartitions == rdd2.getNumPartitions)
@@ -374,8 +374,8 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }
 
   test("rightOuterJoin") {
-    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
-    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+    val rdd1 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1)))
+    val rdd2 = sc.parallelize(Seq((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
     val joined = rdd1.rightOuterJoin(rdd2).collect()
     assert(joined.size === 5)
     assert(joined.toSet === Set(
@@ -388,8 +388,8 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }
 
   test("fullOuterJoin") {
-    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
-    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+    val rdd1 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1)))
+    val rdd2 = sc.parallelize(Seq((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
     val joined = rdd1.fullOuterJoin(rdd2).collect()
     assert(joined.size === 6)
     assert(joined.toSet === Set(
@@ -403,15 +403,15 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }
 
   test("join with no matches") {
-    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
-    val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w')))
+    val rdd1 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1)))
+    val rdd2 = sc.parallelize(Seq((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w')))
     val joined = rdd1.join(rdd2).collect()
     assert(joined.size === 0)
   }
 
   test("join with many output partitions") {
-    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
-    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+    val rdd1 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1)))
+    val rdd2 = sc.parallelize(Seq((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
     val joined = rdd1.join(rdd2, 10).collect()
     assert(joined.size === 4)
     assert(joined.toSet === Set(
@@ -423,8 +423,8 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }
 
   test("groupWith") {
-    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
-    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+    val rdd1 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1)))
+    val rdd2 = sc.parallelize(Seq((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
     val joined = rdd1.groupWith(rdd2).collect()
     assert(joined.size === 4)
     val joinedSet = joined.map(x => (x._1, (x._2._1.toList, x._2._2.toList))).toSet
@@ -437,9 +437,9 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }
 
   test("groupWith3") {
-    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
-    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
-    val rdd3 = sc.parallelize(Array((1, 'a'), (3, 'b'), (4, 'c'), (4, 'd')))
+    val rdd1 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1)))
+    val rdd2 = sc.parallelize(Seq((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+    val rdd3 = sc.parallelize(Seq((1, 'a'), (3, 'b'), (4, 'c'), (4, 'd')))
     val joined = rdd1.groupWith(rdd2, rdd3).collect()
     assert(joined.size === 4)
     val joinedSet = joined.map(x => (x._1,
@@ -453,10 +453,10 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }
 
   test("groupWith4") {
-    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
-    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
-    val rdd3 = sc.parallelize(Array((1, 'a'), (3, 'b'), (4, 'c'), (4, 'd')))
-    val rdd4 = sc.parallelize(Array((2, '@')))
+    val rdd1 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1)))
+    val rdd2 = sc.parallelize(Seq((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+    val rdd3 = sc.parallelize(Seq((1, 'a'), (3, 'b'), (4, 'c'), (4, 'd')))
+    val rdd4 = sc.parallelize(Seq((2, '@')))
     val joined = rdd1.groupWith(rdd2, rdd3, rdd4).collect()
     assert(joined.size === 4)
     val joinedSet = joined.map(x => (x._1,
@@ -480,7 +480,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }
 
   test("keys and values") {
-    val rdd = sc.parallelize(Array((1, "a"), (2, "b")))
+    val rdd = sc.parallelize(Seq((1, "a"), (2, "b")))
     assert(rdd.keys.collect().toList === List(1, 2))
     assert(rdd.values.collect().toList === List("a", "b"))
   }
@@ -517,9 +517,9 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
       def getPartition(key: Any) = key.asInstanceOf[Int]
     }
     // partitionBy so we have a narrow dependency
-    val a = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"))).partitionBy(p)
+    val a = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c"))).partitionBy(p)
     // more partitions/no partitioner so a shuffle dependency
-    val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4)
+    val b = sc.parallelize(Seq((2, "b"), (3, "cc"), (4, "d")), 4)
     val c = a.subtract(b)
     assert(c.collect().toSet === Set((1, "a"), (3, "c")))
     // Ideally we could keep the original partitioner...
@@ -527,8 +527,8 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }
 
   test("subtractByKey") {
-    val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 2)
-    val b = sc.parallelize(Array((2, 20), (3, 30), (4, 40)), 4)
+    val a = sc.parallelize(Seq((1, "a"), (1, "a"), (2, "b"), (3, "c")), 2)
+    val b = sc.parallelize(Seq((2, 20), (3, 30), (4, 40)), 4)
     val c = a.subtractByKey(b)
     assert(c.collect().toSet === Set((1, "a"), (1, "a")))
     assert(c.partitions.size === a.partitions.size)
@@ -541,22 +541,22 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
       def getPartition(key: Any) = key.asInstanceOf[Int]
     }
     // partitionBy so we have a narrow dependency
-    val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c"))).partitionBy(p)
+    val a = sc.parallelize(Seq((1, "a"), (1, "a"), (2, "b"), (3, "c"))).partitionBy(p)
     // more partitions/no partitioner so a shuffle dependency
-    val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4)
+    val b = sc.parallelize(Seq((2, "b"), (3, "cc"), (4, "d")), 4)
     val c = a.subtractByKey(b)
     assert(c.collect().toSet === Set((1, "a"), (1, "a")))
     assert(c.partitioner.get === p)
   }
 
   test("foldByKey") {
-    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
+    val pairs = sc.parallelize(Seq((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
     val sums = pairs.foldByKey(0)(_ + _).collect()
     assert(sums.toSet === Set((1, 7), (2, 1)))
   }
 
   test("foldByKey with mutable result type") {
-    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
+    val pairs = sc.parallelize(Seq((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
     val bufs = pairs.mapValues(v => ArrayBuffer(v)).cache()
     // Fold the values using in-place mutation
     val sums = bufs.foldByKey(new ArrayBuffer[Int])(_ ++= _).collect()
@@ -571,7 +571,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }
 
   test("saveNewAPIHadoopFile should call setConf if format is configurable") {
-    val pairs = sc.parallelize(Array((Integer.valueOf(1), Integer.valueOf(1))))
+    val pairs = sc.parallelize(Seq((Integer.valueOf(1), Integer.valueOf(1))))
 
     // No error, non-configurable formats still work
     pairs.saveAsNewAPIHadoopFile[NewFakeFormat]("ignored")
@@ -587,7 +587,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
 
   test("The JobId on the driver and executors should be the same during the commit") {
     // Create more than one rdd to mimic stageId not equal to rddId
-    val pairs = sc.parallelize(Array((1, 2), (2, 3)), 2)
+    val pairs = sc.parallelize(Seq((1, 2), (2, 3)), 2)
       .map { p => (Integer.valueOf(p._1 + 1), Integer.valueOf(p._2 + 1)) }
       .filter { p => p._1 > 0 }
     pairs.saveAsNewAPIHadoopFile[YetAnotherFakeFormat]("ignored")
@@ -595,7 +595,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }
 
   test("saveAsHadoopFile should respect configured output committers") {
-    val pairs = sc.parallelize(Array((Integer.valueOf(1), Integer.valueOf(1))))
+    val pairs = sc.parallelize(Seq((Integer.valueOf(1), Integer.valueOf(1))))
     val conf = new JobConf()
     conf.setOutputCommitter(classOf[FakeOutputCommitter])
 
@@ -607,7 +607,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }
 
   test("failure callbacks should be called before calling writer.close() in saveNewAPIHadoopFile") {
-    val pairs = sc.parallelize(Array((Integer.valueOf(1), Integer.valueOf(2))), 1)
+    val pairs = sc.parallelize(Seq((Integer.valueOf(1), Integer.valueOf(2))), 1)
 
     FakeWriterWithCallback.calledBy = ""
     FakeWriterWithCallback.exception = null
@@ -622,7 +622,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }
 
   test("failure callbacks should be called before calling writer.close() in saveAsHadoopFile") {
-    val pairs = sc.parallelize(Array((Integer.valueOf(1), Integer.valueOf(2))), 1)
+    val pairs = sc.parallelize(Seq((Integer.valueOf(1), Integer.valueOf(2))), 1)
     val conf = new JobConf()
 
     FakeWriterWithCallback.calledBy = ""
@@ -640,7 +640,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
 
   test("saveAsNewAPIHadoopDataset should support invalid output paths when " +
     "there are no files to be committed to an absolute output location") {
-    val pairs = sc.parallelize(Array((Integer.valueOf(1), Integer.valueOf(2))), 1)
+    val pairs = sc.parallelize(Seq((Integer.valueOf(1), Integer.valueOf(2))), 1)
 
     def saveRddWithPath(path: String): Unit = {
       val job = NewJob.getInstance(new Configuration(sc.hadoopConfiguration))
@@ -668,7 +668,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   // for non-null invalid paths.
   test("saveAsHadoopDataset should respect empty output directory when " +
     "there are no files to be committed to an absolute output location") {
-    val pairs = sc.parallelize(Array((Integer.valueOf(1), Integer.valueOf(2))), 1)
+    val pairs = sc.parallelize(Seq((Integer.valueOf(1), Integer.valueOf(2))), 1)
 
     val conf = new JobConf()
     conf.setOutputKeyClass(classOf[Integer])
@@ -683,7 +683,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }
 
   test("lookup") {
-    val pairs = sc.parallelize(Array((1, 2), (3, 4), (5, 6), (5, 7)))
+    val pairs = sc.parallelize(Seq((1, 2), (3, 4), (5, 6), (5, 7)))
 
     assert(pairs.partitioner === None)
     assert(pairs.lookup(1) === Seq(2))
@@ -693,7 +693,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }
 
   test("lookup with partitioner") {
-    val pairs = sc.parallelize(Array((1, 2), (3, 4), (5, 6), (5, 7)))
+    val pairs = sc.parallelize(Seq((1, 2), (3, 4), (5, 6), (5, 7)))
 
     val p = new Partitioner {
       def numPartitions: Int = 2
@@ -709,7 +709,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }
 
   test("lookup with bad partitioner") {
-    val pairs = sc.parallelize(Array((1, 2), (3, 4), (5, 6), (5, 7)))
+    val pairs = sc.parallelize(Seq((1, 2), (3, 4), (5, 6), (5, 7)))
 
     val p = new Partitioner {
       def numPartitions: Int = 2