spark-instrumented-optimizer/R/pkg/inst/tests/test_shuffle.R
cafreeman 59e206deb7 [SPARK-6807] [SparkR] Merge recent SparkR-pkg changes
This PR pulls in recent changes in SparkR-pkg, including

cartesian, intersection, sampleByKey, subtract, subtractByKey, except, and some API for StructType and StructField.

Author: cafreeman <cfreeman@alteryx.com>
Author: Davies Liu <davies@databricks.com>
Author: Zongheng Yang <zongheng.y@gmail.com>
Author: Shivaram Venkataraman <shivaram.venkataraman@gmail.com>
Author: Shivaram Venkataraman <shivaram@cs.berkeley.edu>
Author: Sun Rui <rui.sun@intel.com>

Closes #5436 from davies/R3 and squashes the following commits:

c2b09be [Davies Liu] SQLTypes -> schema
a5a02f2 [Davies Liu] Merge branch 'master' of github.com:apache/spark into R3
168b7fe [Davies Liu] sort generics
b1fe460 [Davies Liu] fix conflict in README.md
e74c04e [Davies Liu] fix schema.R
4f5ac09 [Davies Liu] Merge branch 'master' of github.com:apache/spark into R5
41f8184 [Davies Liu] rm man
ae78312 [Davies Liu] Merge pull request #237 from sun-rui/SPARKR-154_3
1bdcb63 [Zongheng Yang] Updates to README.md.
5a553e7 [cafreeman] Use object attribute instead of argument
71372d9 [cafreeman] Update docs and examples
8526d2e71 [cafreeman] Remove `tojson` functions
6ef5f2d [cafreeman] Fix spacing
7741d66 [cafreeman] Rename the SQL DataType function
141efd8 [Shivaram Venkataraman] Merge pull request #245 from hqzizania/upstream
9387402 [Davies Liu] fix style
40199eb [Shivaram Venkataraman] Move except into sorted position
07d0dbc [Sun Rui] [SPARKR-244] Fix test failure after integration of subtract() and subtractByKey() for RDD.
7e8caa3 [Shivaram Venkataraman] Merge pull request #246 from hlin09/fixCombineByKey
ed66c81 [cafreeman] Update `subtract` to work with `generics.R`
f3ba785 [cafreeman] Fixed duplicate export
275deb4 [cafreeman] Update `NAMESPACE` and tests
1a3b63d [cafreeman] new version of `CreateDF`
836c4bf [cafreeman] Update `createDataFrame` and `toDF`
be5d5c1 [cafreeman] refactor schema functions
40338a4 [Zongheng Yang] Merge pull request #244 from sun-rui/SPARKR-154_5
20b97a6 [Zongheng Yang] Merge pull request #234 from hqzizania/assist
ba54e34 [Shivaram Venkataraman] Merge pull request #238 from sun-rui/SPARKR-154_4
c9497a3 [Shivaram Venkataraman] Merge pull request #208 from lythesia/master
b317aa7 [Zongheng Yang] Merge pull request #243 from hqzizania/master
136a07e [Zongheng Yang] Merge pull request #242 from hqzizania/stats
cd66603 [cafreeman] new line at EOF
8b76e81 [Shivaram Venkataraman] Merge pull request #233 from redbaron/fail-early-on-missing-dep
7dd81b7 [cafreeman] Documentation
0e2a94f [cafreeman] Define functions for schema and fields
2015-04-17 13:42:19 -07:00

222 lines
7.7 KiB
R

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
context("partitionBy, groupByKey, reduceByKey etc.")
# JavaSparkContext handle
sc <- sparkR.init()
# Data
intPairs <- list(list(1L, -1), list(2L, 100), list(2L, 1), list(1L, 200))
intRdd <- parallelize(sc, intPairs, 2L)
doublePairs <- list(list(1.5, -1), list(2.5, 100), list(2.5, 1), list(1.5, 200))
doubleRdd <- parallelize(sc, doublePairs, 2L)
numPairs <- list(list(1L, 100), list(2L, 200), list(4L, -1), list(3L, 1),
list(3L, 0))
numPairsRdd <- parallelize(sc, numPairs, length(numPairs))
strList <- list("Dexter Morgan: Blood. Sometimes it sets my teeth on edge and ",
"Dexter Morgan: Harry and Dorris Morgan did a wonderful job ")
strListRDD <- parallelize(sc, strList, 4)
test_that("groupByKey for integers", {
grouped <- groupByKey(intRdd, 2L)
actual <- collect(grouped)
expected <- list(list(2L, list(100, 1)), list(1L, list(-1, 200)))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
})
test_that("groupByKey for doubles", {
grouped <- groupByKey(doubleRdd, 2L)
actual <- collect(grouped)
expected <- list(list(1.5, list(-1, 200)), list(2.5, list(100, 1)))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
})
test_that("reduceByKey for ints", {
reduced <- reduceByKey(intRdd, "+", 2L)
actual <- collect(reduced)
expected <- list(list(2L, 101), list(1L, 199))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
})
test_that("reduceByKey for doubles", {
reduced <- reduceByKey(doubleRdd, "+", 2L)
actual <- collect(reduced)
expected <- list(list(1.5, 199), list(2.5, 101))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
})
test_that("combineByKey for ints", {
reduced <- combineByKey(intRdd, function(x) { x }, "+", "+", 2L)
actual <- collect(reduced)
expected <- list(list(2L, 101), list(1L, 199))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
})
test_that("combineByKey for doubles", {
reduced <- combineByKey(doubleRdd, function(x) { x }, "+", "+", 2L)
actual <- collect(reduced)
expected <- list(list(1.5, 199), list(2.5, 101))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
})
test_that("combineByKey for characters", {
stringKeyRDD <- parallelize(sc,
list(list("max", 1L), list("min", 2L),
list("other", 3L), list("max", 4L)), 2L)
reduced <- combineByKey(stringKeyRDD,
function(x) { x }, "+", "+", 2L)
actual <- collect(reduced)
expected <- list(list("max", 5L), list("min", 2L), list("other", 3L))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
})
test_that("aggregateByKey", {
# test aggregateByKey for int keys
rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
zeroValue <- list(0, 0)
seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
aggregatedRDD <- aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L)
actual <- collect(aggregatedRDD)
expected <- list(list(1, list(3, 2)), list(2, list(7, 2)))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
# test aggregateByKey for string keys
rdd <- parallelize(sc, list(list("a", 1), list("a", 2), list("b", 3), list("b", 4)))
zeroValue <- list(0, 0)
seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
aggregatedRDD <- aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L)
actual <- collect(aggregatedRDD)
expected <- list(list("a", list(3, 2)), list("b", list(7, 2)))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
})
test_that("foldByKey", {
# test foldByKey for int keys
folded <- foldByKey(intRdd, 0, "+", 2L)
actual <- collect(folded)
expected <- list(list(2L, 101), list(1L, 199))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
# test foldByKey for double keys
folded <- foldByKey(doubleRdd, 0, "+", 2L)
actual <- collect(folded)
expected <- list(list(1.5, 199), list(2.5, 101))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
# test foldByKey for string keys
stringKeyPairs <- list(list("a", -1), list("b", 100), list("b", 1), list("a", 200))
stringKeyRDD <- parallelize(sc, stringKeyPairs)
folded <- foldByKey(stringKeyRDD, 0, "+", 2L)
actual <- collect(folded)
expected <- list(list("b", 101), list("a", 199))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
# test foldByKey for empty pair RDD
rdd <- parallelize(sc, list())
folded <- foldByKey(rdd, 0, "+", 2L)
actual <- collect(folded)
expected <- list()
expect_equal(actual, expected)
# test foldByKey for RDD with only 1 pair
rdd <- parallelize(sc, list(list(1, 1)))
folded <- foldByKey(rdd, 0, "+", 2L)
actual <- collect(folded)
expected <- list(list(1, 1))
expect_equal(actual, expected)
})
test_that("partitionBy() partitions data correctly", {
# Partition by magnitude
partitionByMagnitude <- function(key) { if (key >= 3) 1 else 0 }
resultRDD <- partitionBy(numPairsRdd, 2L, partitionByMagnitude)
expected_first <- list(list(1, 100), list(2, 200)) # key < 3
expected_second <- list(list(4, -1), list(3, 1), list(3, 0)) # key >= 3
actual_first <- collectPartition(resultRDD, 0L)
actual_second <- collectPartition(resultRDD, 1L)
expect_equal(sortKeyValueList(actual_first), sortKeyValueList(expected_first))
expect_equal(sortKeyValueList(actual_second), sortKeyValueList(expected_second))
})
test_that("partitionBy works with dependencies", {
kOne <- 1
partitionByParity <- function(key) { if (key %% 2 == kOne) 7 else 4 }
# Partition by parity
resultRDD <- partitionBy(numPairsRdd, numPartitions = 2L, partitionByParity)
# keys even; 100 %% 2 == 0
expected_first <- list(list(2, 200), list(4, -1))
# keys odd; 3 %% 2 == 1
expected_second <- list(list(1, 100), list(3, 1), list(3, 0))
actual_first <- collectPartition(resultRDD, 0L)
actual_second <- collectPartition(resultRDD, 1L)
expect_equal(sortKeyValueList(actual_first), sortKeyValueList(expected_first))
expect_equal(sortKeyValueList(actual_second), sortKeyValueList(expected_second))
})
test_that("test partitionBy with string keys", {
words <- flatMap(strListRDD, function(line) { strsplit(line, " ")[[1]] })
wordCount <- lapply(words, function(word) { list(word, 1L) })
resultRDD <- partitionBy(wordCount, 2L)
expected_first <- list(list("Dexter", 1), list("Dexter", 1))
expected_second <- list(list("and", 1), list("and", 1))
actual_first <- Filter(function(item) { item[[1]] == "Dexter" },
collectPartition(resultRDD, 0L))
actual_second <- Filter(function(item) { item[[1]] == "and" },
collectPartition(resultRDD, 1L))
expect_equal(sortKeyValueList(actual_first), sortKeyValueList(expected_first))
expect_equal(sortKeyValueList(actual_second), sortKeyValueList(expected_second))
})