59e206deb7
This PR pulls in recent changes in SparkR-pkg, including cartesian, intersection, sampleByKey, subtract, subtractByKey, except, and some API for StructType and StructField. Author: cafreeman <cfreeman@alteryx.com> Author: Davies Liu <davies@databricks.com> Author: Zongheng Yang <zongheng.y@gmail.com> Author: Shivaram Venkataraman <shivaram.venkataraman@gmail.com> Author: Shivaram Venkataraman <shivaram@cs.berkeley.edu> Author: Sun Rui <rui.sun@intel.com> Closes #5436 from davies/R3 and squashes the following commits: c2b09be [Davies Liu] SQLTypes -> schema a5a02f2 [Davies Liu] Merge branch 'master' of github.com:apache/spark into R3 168b7fe [Davies Liu] sort generics b1fe460 [Davies Liu] fix conflict in README.md e74c04e [Davies Liu] fix schema.R 4f5ac09 [Davies Liu] Merge branch 'master' of github.com:apache/spark into R5 41f8184 [Davies Liu] rm man ae78312 [Davies Liu] Merge pull request #237 from sun-rui/SPARKR-154_3 1bdcb63 [Zongheng Yang] Updates to README.md. 5a553e7 [cafreeman] Use object attribute instead of argument 71372d9 [cafreeman] Update docs and examples 8526d2e71 [cafreeman] Remove `tojson` functions 6ef5f2d [cafreeman] Fix spacing 7741d66 [cafreeman] Rename the SQL DataType function 141efd8 [Shivaram Venkataraman] Merge pull request #245 from hqzizania/upstream 9387402 [Davies Liu] fix style 40199eb [Shivaram Venkataraman] Move except into sorted position 07d0dbc [Sun Rui] [SPARKR-244] Fix test failure after integration of subtract() and subtractByKey() for RDD. 
7e8caa3 [Shivaram Venkataraman] Merge pull request #246 from hlin09/fixCombineByKey ed66c81 [cafreeman] Update `subtract` to work with `generics.R` f3ba785 [cafreeman] Fixed duplicate export 275deb4 [cafreeman] Update `NAMESPACE` and tests 1a3b63d [cafreeman] new version of `CreateDF` 836c4bf [cafreeman] Update `createDataFrame` and `toDF` be5d5c1 [cafreeman] refactor schema functions 40338a4 [Zongheng Yang] Merge pull request #244 from sun-rui/SPARKR-154_5 20b97a6 [Zongheng Yang] Merge pull request #234 from hqzizania/assist ba54e34 [Shivaram Venkataraman] Merge pull request #238 from sun-rui/SPARKR-154_4 c9497a3 [Shivaram Venkataraman] Merge pull request #208 from lythesia/master b317aa7 [Zongheng Yang] Merge pull request #243 from hqzizania/master 136a07e [Zongheng Yang] Merge pull request #242 from hqzizania/stats cd66603 [cafreeman] new line at EOF 8b76e81 [Shivaram Venkataraman] Merge pull request #233 from redbaron/fail-early-on-missing-dep 7dd81b7 [cafreeman] Documentation 0e2a94f [cafreeman] Define functions for schema and fields
222 lines
7.7 KiB
R
222 lines
7.7 KiB
R
#
|
|
# Licensed to the Apache Software Foundation (ASF) under one or more
|
|
# contributor license agreements. See the NOTICE file distributed with
|
|
# this work for additional information regarding copyright ownership.
|
|
# The ASF licenses this file to You under the Apache License, Version 2.0
|
|
# (the "License"); you may not use this file except in compliance with
|
|
# the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
#
|
|
|
|
context("partitionBy, groupByKey, reduceByKey etc.")

# JavaSparkContext handle used by every parallelize() call in this file.
sc <- sparkR.init()

# Fixtures ----

# Pairs keyed by integers: keys 1L and 2L each appear twice.
intPairs <- list(
  list(1L, -1),
  list(2L, 100),
  list(2L, 1),
  list(1L, 200)
)
intRdd <- parallelize(sc, intPairs, 2L)

# Same values as intPairs, keyed by the doubles 1.5 and 2.5 instead.
doublePairs <- list(
  list(1.5, -1),
  list(2.5, 100),
  list(2.5, 1),
  list(1.5, 200)
)
doubleRdd <- parallelize(sc, doublePairs, 2L)

# Five pairs with keys 1L..4L (key 3L occurs twice); the third argument to
# parallelize() is the number of pairs.
numPairs <- list(
  list(1L, 100),
  list(2L, 200),
  list(4L, -1),
  list(3L, 1),
  list(3L, 0)
)
numPairsRdd <- parallelize(sc, numPairs, length(numPairs))

# Two lines of text used by the string-key partitioning test.
strList <- list(
  "Dexter Morgan: Blood. Sometimes it sets my teeth on edge and ",
  "Dexter Morgan: Harry and Dorris Morgan did a wonderful job "
)
strListRDD <- parallelize(sc, strList, 4)
|
|
|
|
test_that("groupByKey for integers", {
  # Group the values of intRdd under their integer keys and compare with the
  # hand-computed grouping after passing both sides through sortKeyValueList().
  collected <- collect(groupByKey(intRdd, 2L))
  wanted <- list(
    list(2L, list(100, 1)),
    list(1L, list(-1, 200))
  )

  expect_equal(sortKeyValueList(collected), sortKeyValueList(wanted))
})
|
|
|
|
test_that("groupByKey for doubles", {
  # Same check as the integer case, but with the double keys 1.5 and 2.5.
  collected <- collect(groupByKey(doubleRdd, 2L))
  wanted <- list(
    list(1.5, list(-1, 200)),
    list(2.5, list(100, 1))
  )

  expect_equal(sortKeyValueList(collected), sortKeyValueList(wanted))
})
|
|
|
|
test_that("reduceByKey for ints", {
  # Reduce with "+" per key: key 2L -> 100 + 1, key 1L -> -1 + 200.
  summed <- reduceByKey(intRdd, "+", 2L)
  got <- collect(summed)

  want <- list(list(2L, 101), list(1L, 199))
  expect_equal(sortKeyValueList(got), sortKeyValueList(want))
})
|
|
|
|
test_that("reduceByKey for doubles", {
  # Reduce with "+" per key: key 1.5 -> -1 + 200, key 2.5 -> 100 + 1.
  summed <- reduceByKey(doubleRdd, "+", 2L)
  got <- collect(summed)

  want <- list(list(1.5, 199), list(2.5, 101))
  expect_equal(sortKeyValueList(got), sortKeyValueList(want))
})
|
|
|
|
test_that("combineByKey for ints", {
  # The first function argument returns each value unchanged and "+" is passed
  # for both remaining function arguments, so the expected per-key results are
  # the same sums as in the reduceByKey test.
  combined <- combineByKey(intRdd, function(value) { value }, "+", "+", 2L)
  got <- collect(combined)

  want <- list(list(2L, 101), list(1L, 199))
  expect_equal(sortKeyValueList(got), sortKeyValueList(want))
})
|
|
|
|
test_that("combineByKey for doubles", {
  # Same combiner set-up as the integer case, applied to the double-keyed RDD.
  combined <- combineByKey(doubleRdd, function(value) { value }, "+", "+", 2L)
  got <- collect(combined)

  want <- list(list(1.5, 199), list(2.5, 101))
  expect_equal(sortKeyValueList(got), sortKeyValueList(want))
})
|
|
|
|
test_that("combineByKey for characters", {
  # Character keys with integer values; "max" is the only repeated key.
  keyedPairs <- list(
    list("max", 1L),
    list("min", 2L),
    list("other", 3L),
    list("max", 4L)
  )
  stringKeyRDD <- parallelize(sc, keyedPairs, 2L)

  combined <- combineByKey(stringKeyRDD, function(value) { value }, "+", "+", 2L)
  got <- collect(combined)

  # "max" -> 1L + 4L; the other keys keep their single value.
  want <- list(list("max", 5L), list("min", 2L), list("other", 3L))
  expect_equal(sortKeyValueList(got), sortKeyValueList(want))
})
|
|
|
|
test_that("aggregateByKey", {
  # Both scenarios use the same accumulator, a list of two numbers starting at
  # list(0, 0): seqOp adds a value to the first slot and 1 to the second,
  # combOp adds two accumulators slot by slot.
  zeroValue <- list(0, 0)
  seqOp <- function(acc, value) { list(acc[[1]] + value, acc[[2]] + 1) }
  combOp <- function(lhs, rhs) { list(lhs[[1]] + rhs[[1]], lhs[[2]] + rhs[[2]]) }

  # Numeric keys 1 and 2, two values each.
  numericKeyRdd <- parallelize(
    sc,
    list(list(1, 1), list(1, 2), list(2, 3), list(2, 4))
  )
  got <- collect(aggregateByKey(numericKeyRdd, zeroValue, seqOp, combOp, 2L))
  want <- list(list(1, list(3, 2)), list(2, list(7, 2)))
  expect_equal(sortKeyValueList(got), sortKeyValueList(want))

  # Same values under the string keys "a" and "b".
  stringKeyRdd <- parallelize(
    sc,
    list(list("a", 1), list("a", 2), list("b", 3), list("b", 4))
  )
  got <- collect(aggregateByKey(stringKeyRdd, zeroValue, seqOp, combOp, 2L))
  want <- list(list("a", list(3, 2)), list("b", list(7, 2)))
  expect_equal(sortKeyValueList(got), sortKeyValueList(want))
})
|
|
|
|
test_that("foldByKey", {
  # Every scenario folds with zero value 0 and "+", passing 2L as the last
  # argument.

  # Integer keys.
  got <- collect(foldByKey(intRdd, 0, "+", 2L))
  want <- list(list(2L, 101), list(1L, 199))
  expect_equal(sortKeyValueList(got), sortKeyValueList(want))

  # Double keys.
  got <- collect(foldByKey(doubleRdd, 0, "+", 2L))
  want <- list(list(1.5, 199), list(2.5, 101))
  expect_equal(sortKeyValueList(got), sortKeyValueList(want))

  # String keys.
  stringKeyPairs <- list(
    list("a", -1),
    list("b", 100),
    list("b", 1),
    list("a", 200)
  )
  stringKeyRDD <- parallelize(sc, stringKeyPairs)
  got <- collect(foldByKey(stringKeyRDD, 0, "+", 2L))
  want <- list(list("b", 101), list("a", 199))
  expect_equal(sortKeyValueList(got), sortKeyValueList(want))

  # Empty pair RDD: the collected result is compared directly with list().
  emptyRdd <- parallelize(sc, list())
  got <- collect(foldByKey(emptyRdd, 0, "+", 2L))
  expect_equal(got, list())

  # RDD holding a single pair: compared directly, without sortKeyValueList().
  singlePairRdd <- parallelize(sc, list(list(1, 1)))
  got <- collect(foldByKey(singlePairRdd, 0, "+", 2L))
  expect_equal(got, list(list(1, 1)))
})
|
|
|
|
test_that("partitionBy() partitions data correctly", {
  # Custom partition function: keys below 3 map to 0, keys of 3 or more to 1.
  partitionByMagnitude <- function(key) { if (key < 3) 0 else 1 }

  resultRDD <- partitionBy(numPairsRdd, 2L, partitionByMagnitude)

  # Contents expected in partition 0 (key < 3) and partition 1 (key >= 3).
  small_keys <- list(list(1, 100), list(2, 200))
  large_keys <- list(list(4, -1), list(3, 1), list(3, 0))

  partition_zero <- collectPartition(resultRDD, 0L)
  partition_one <- collectPartition(resultRDD, 1L)

  expect_equal(sortKeyValueList(partition_zero), sortKeyValueList(small_keys))
  expect_equal(sortKeyValueList(partition_one), sortKeyValueList(large_keys))
})
|
|
|
|
test_that("partitionBy works with dependencies", {
  # kOne is deliberately a free variable of the partition function, so the
  # function depends on a value defined outside its own body.
  kOne <- 1
  partitionByParity <- function(key) {
    isOdd <- key %% 2 == kOne
    if (isOdd) 7 else 4
  }

  # Partition by parity of the key.
  resultRDD <- partitionBy(numPairsRdd, numPartitions = 2L, partitionByParity)

  # The expectations below put even keys (function result 4) in partition 0
  # and odd keys (function result 7) in partition 1 -- presumably the result
  # is reduced modulo numPartitions; confirm against partitionBy().
  even_keys <- list(list(2, 200), list(4, -1))
  odd_keys <- list(list(1, 100), list(3, 1), list(3, 0))

  partition_zero <- collectPartition(resultRDD, 0L)
  partition_one <- collectPartition(resultRDD, 1L)

  expect_equal(sortKeyValueList(partition_zero), sortKeyValueList(even_keys))
  expect_equal(sortKeyValueList(partition_one), sortKeyValueList(odd_keys))
})
|
|
|
|
test_that("test partitionBy with string keys", {
  # Split each line on single spaces and pair every word with the count 1L.
  words <- flatMap(strListRDD, function(line) { strsplit(line, " ")[[1]] })
  wordCount <- lapply(words, function(word) { list(word, 1L) })

  # No partition function is passed here, unlike the other partitionBy tests.
  resultRDD <- partitionBy(wordCount, 2L)

  # Keep only the collected pairs whose key equals `key`.
  pairsWithKey <- function(pairs, key) {
    Filter(function(item) { item[[1]] == key }, pairs)
  }

  # "Dexter" and "and" each occur once per input line; the test expects both
  # "Dexter" pairs in partition 0 and both "and" pairs in partition 1.
  dexter_pairs <- pairsWithKey(collectPartition(resultRDD, 0L), "Dexter")
  and_pairs <- pairsWithKey(collectPartition(resultRDD, 1L), "and")

  expected_first <- list(list("Dexter", 1), list("Dexter", 1))
  expected_second <- list(list("and", 1), list("and", 1))

  expect_equal(sortKeyValueList(dexter_pairs), sortKeyValueList(expected_first))
  expect_equal(sortKeyValueList(and_pairs), sortKeyValueList(expected_second))
})
|