[SPARK-21693][R][FOLLOWUP] Reduce shuffle partitions running R worker in few tests to speed up

## What changes were proposed in this pull request?

This is a followup to reduce AppVeyor test time. This PR proposes to reduce the number of shuffle partitions in a few particular tests, which in turn reduces the number of tasks that launch R workers.

The symptom is similar to the one described in https://github.com/apache/spark/pull/19722: many R processes are newly launched on Windows because workers cannot be forked there, which accounts for the difference in elapsed time between Linux and Windows.
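
Concretely, each affected test now saves the current value of `spark.sql.shuffle.partitions`, lowers it for the duration of the test, and restores it afterwards. A minimal sketch of the pattern (here `sparkSession` is the internal session handle already in scope in the SparkR test suite):

```r
# Save, lower, and restore the shuffle-partition count around shuffle-heavy code.
conf <- callJMethod(sparkSession, "conf")
original <- callJMethod(conf, "get", "spark.sql.shuffle.partitions")
callJMethod(conf, "set", "spark.sql.shuffle.partitions", "5")
tryCatch({
  # ... shuffle-heavy assertions now run in only 5 tasks per shuffle ...
}, finally = {
  # Always restore the conf so later tests see the original value.
  callJMethod(conf, "set", "spark.sql.shuffle.partitions", original)
})
```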

Here is a simple before/after comparison for this change. I tested it manually with `spark.sparkr.use.daemon` disabled; disabling the daemon resembles how the tests run on Windows, where R workers cannot be forked:

**Before**

<img width="672" alt="2017-11-25 12 22 13" src="https://user-images.githubusercontent.com/6477701/33217949-b5528dfa-d17d-11e7-8050-75675c39eb20.png">

**After**

<img width="682" alt="2017-11-25 12 32 00" src="https://user-images.githubusercontent.com/6477701/33217958-c6518052-d17d-11e7-9f8e-1be21a784559.png">

So, this will probably cut roughly 10 minutes or more from the AppVeyor test time.
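
For reference, a minimal sketch of starting a SparkR session with the daemon disabled to reproduce this locally (the `local[2]` master is just an example):

```r
library(SparkR)

# With the daemon disabled, every task launches a fresh R worker process
# instead of forking one from a long-lived daemon, resembling Windows behavior.
sparkR.session(master = "local[2]",
               sparkConfig = list(spark.sparkr.use.daemon = "false"))
```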

## How was this patch tested?

AppVeyor tests

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #19816 from HyukjinKwon/SPARK-21693-followup.

```diff
@@ -3021,41 +3021,54 @@ test_that("dapplyCollect() on DataFrame with a binary column", {
 })

 test_that("repartition by columns on DataFrame", {
-  df <- createDataFrame(
-    list(list(1L, 1, "1", 0.1), list(1L, 2, "2", 0.2), list(3L, 3, "3", 0.3)),
-    c("a", "b", "c", "d"))
+  # The tasks here launch R workers with shuffles. So, we decrease the number of shuffle
+  # partitions to reduce the number of the tasks to speed up the test. This is particularly
+  # slow on Windows because the R workers are unable to be forked. See also SPARK-21693.
+  conf <- callJMethod(sparkSession, "conf")
+  shufflepartitionsvalue <- callJMethod(conf, "get", "spark.sql.shuffle.partitions")
+  callJMethod(conf, "set", "spark.sql.shuffle.partitions", "5")
+  tryCatch({
+    df <- createDataFrame(
+      list(list(1L, 1, "1", 0.1), list(1L, 2, "2", 0.2), list(3L, 3, "3", 0.3)),
+      c("a", "b", "c", "d"))

-  # no column and number of partitions specified
-  retError <- tryCatch(repartition(df), error = function(e) e)
-  expect_equal(grepl
-    ("Please, specify the number of partitions and/or a column\\(s\\)", retError), TRUE)
+    # no column and number of partitions specified
+    retError <- tryCatch(repartition(df), error = function(e) e)
+    expect_equal(grepl
+      ("Please, specify the number of partitions and/or a column\\(s\\)", retError), TRUE)

-  # repartition by column and number of partitions
-  actual <- repartition(df, 3, col = df$"a")
+    # repartition by column and number of partitions
+    actual <- repartition(df, 3, col = df$"a")

-  # Checking that at least the dimensions are identical
-  expect_identical(dim(df), dim(actual))
-  expect_equal(getNumPartitions(actual), 3L)
+    # Checking that at least the dimensions are identical
+    expect_identical(dim(df), dim(actual))
+    expect_equal(getNumPartitions(actual), 3L)

-  # repartition by number of partitions
-  actual <- repartition(df, 13L)
-  expect_identical(dim(df), dim(actual))
-  expect_equal(getNumPartitions(actual), 13L)
+    # repartition by number of partitions
+    actual <- repartition(df, 13L)
+    expect_identical(dim(df), dim(actual))
+    expect_equal(getNumPartitions(actual), 13L)

-  expect_equal(getNumPartitions(coalesce(actual, 1L)), 1L)
+    expect_equal(getNumPartitions(coalesce(actual, 1L)), 1L)

-  # a test case with a column and dapply
-  schema <- structType(structField("a", "integer"), structField("avg", "double"))
-  df <- repartition(df, col = df$"a")
-  df1 <- dapply(
-    df,
-    function(x) {
-      y <- (data.frame(x$a[1], mean(x$b)))
-    },
-    schema)
+    # a test case with a column and dapply
+    schema <- structType(structField("a", "integer"), structField("avg", "double"))
+    df <- repartition(df, col = df$"a")
+    df1 <- dapply(
+      df,
+      function(x) {
+        y <- (data.frame(x$a[1], mean(x$b)))
+      },
+      schema)

-  # Number of partitions is equal to 2
-  expect_equal(nrow(df1), 2)
+    # Number of partitions is equal to 2
+    expect_equal(nrow(df1), 2)
+  },
+  finally = {
+    # Resetting the conf back to default value
+    callJMethod(conf, "set", "spark.sql.shuffle.partitions", shufflepartitionsvalue)
+  })
 })

 test_that("coalesce, repartition, numPartitions", {
```
```diff
@@ -3078,101 +3091,117 @@ test_that("coalesce, repartition, numPartitions", {
 })

 test_that("gapply() and gapplyCollect() on a DataFrame", {
-  df <- createDataFrame(
-    list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
-    c("a", "b", "c", "d"))
-  expected <- collect(df)
-  df1 <- gapply(df, "a", function(key, x) { x }, schema(df))
-  actual <- collect(df1)
-  expect_identical(actual, expected)
+  # The tasks here launch R workers with shuffles. So, we decrease the number of shuffle
+  # partitions to reduce the number of the tasks to speed up the test. This is particularly
+  # slow on Windows because the R workers are unable to be forked. See also SPARK-21693.
+  conf <- callJMethod(sparkSession, "conf")
+  shufflepartitionsvalue <- callJMethod(conf, "get", "spark.sql.shuffle.partitions")
+  # TODO: Lower number of 'spark.sql.shuffle.partitions' causes test failures
+  # for an unknown reason. Probably we should fix it.
+  callJMethod(conf, "set", "spark.sql.shuffle.partitions", "16")
+  tryCatch({
+    df <- createDataFrame(
+      list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
+      c("a", "b", "c", "d"))
+    expected <- collect(df)
+    df1 <- gapply(df, "a", function(key, x) { x }, schema(df))
+    actual <- collect(df1)
+    expect_identical(actual, expected)

-  df1Collect <- gapplyCollect(df, list("a"), function(key, x) { x })
-  expect_identical(df1Collect, expected)
+    df1Collect <- gapplyCollect(df, list("a"), function(key, x) { x })
+    expect_identical(df1Collect, expected)

-  # gapply on empty grouping columns.
-  df1 <- gapply(df, c(), function(key, x) { x }, schema(df))
-  actual <- collect(df1)
-  expect_identical(actual, expected)
+    # gapply on empty grouping columns.
+    df1 <- gapply(df, c(), function(key, x) { x }, schema(df))
+    actual <- collect(df1)
+    expect_identical(actual, expected)

-  # Computes the sum of second column by grouping on the first and third columns
-  # and checks if the sum is larger than 2
-  schemas <- list(structType(structField("a", "integer"), structField("e", "boolean")),
-                  "a INT, e BOOLEAN")
-  for (schema in schemas) {
-    df2 <- gapply(
-      df,
-      c(df$"a", df$"c"),
-      function(key, x) {
-        y <- data.frame(key[1], sum(x$b) > 2)
-      },
-      schema)
-    actual <- collect(df2)$e
-    expected <- c(TRUE, TRUE)
-    expect_identical(actual, expected)
+    # Computes the sum of second column by grouping on the first and third columns
+    # and checks if the sum is larger than 2
+    schemas <- list(structType(structField("a", "integer"), structField("e", "boolean")),
+                    "a INT, e BOOLEAN")
+    for (schema in schemas) {
+      df2 <- gapply(
+        df,
+        c(df$"a", df$"c"),
+        function(key, x) {
+          y <- data.frame(key[1], sum(x$b) > 2)
+        },
+        schema)
+      actual <- collect(df2)$e
+      expected <- c(TRUE, TRUE)
+      expect_identical(actual, expected)

-    df2Collect <- gapplyCollect(
-      df,
-      c(df$"a", df$"c"),
-      function(key, x) {
-        y <- data.frame(key[1], sum(x$b) > 2)
-        colnames(y) <- c("a", "e")
-        y
-      })
-    actual <- df2Collect$e
-    expect_identical(actual, expected)
-  }
+      df2Collect <- gapplyCollect(
+        df,
+        c(df$"a", df$"c"),
+        function(key, x) {
+          y <- data.frame(key[1], sum(x$b) > 2)
+          colnames(y) <- c("a", "e")
+          y
+        })
+      actual <- df2Collect$e
+      expect_identical(actual, expected)
+    }

-  # Computes the arithmetic mean of the second column by grouping
-  # on the first and third columns. Output the groupping value and the average.
-  schema <- structType(structField("a", "integer"), structField("c", "string"),
-                       structField("avg", "double"))
-  df3 <- gapply(
-    df,
-    c("a", "c"),
-    function(key, x) {
-      y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
-    },
-    schema)
-  actual <- collect(df3)
-  actual <- actual[order(actual$a), ]
-  rownames(actual) <- NULL
-  expected <- collect(select(df, "a", "b", "c"))
-  expected <- data.frame(aggregate(expected$b, by = list(expected$a, expected$c), FUN = mean))
-  colnames(expected) <- c("a", "c", "avg")
-  expected <- expected[order(expected$a), ]
-  rownames(expected) <- NULL
-  expect_identical(actual, expected)
+    # Computes the arithmetic mean of the second column by grouping
+    # on the first and third columns. Output the groupping value and the average.
+    schema <- structType(structField("a", "integer"), structField("c", "string"),
+                         structField("avg", "double"))
+    df3 <- gapply(
+      df,
+      c("a", "c"),
+      function(key, x) {
+        y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
+      },
+      schema)
+    actual <- collect(df3)
+    actual <- actual[order(actual$a), ]
+    rownames(actual) <- NULL
+    expected <- collect(select(df, "a", "b", "c"))
+    expected <- data.frame(aggregate(expected$b, by = list(expected$a, expected$c), FUN = mean))
+    colnames(expected) <- c("a", "c", "avg")
+    expected <- expected[order(expected$a), ]
+    rownames(expected) <- NULL
+    expect_identical(actual, expected)

-  df3Collect <- gapplyCollect(
-    df,
-    c("a", "c"),
-    function(key, x) {
-      y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
-      colnames(y) <- c("a", "c", "avg")
-      y
-    })
-  actual <- df3Collect[order(df3Collect$a), ]
-  expect_identical(actual$avg, expected$avg)
+    df3Collect <- gapplyCollect(
+      df,
+      c("a", "c"),
+      function(key, x) {
+        y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
+        colnames(y) <- c("a", "c", "avg")
+        y
+      })
+    actual <- df3Collect[order(df3Collect$a), ]
+    expect_identical(actual$avg, expected$avg)

-  irisDF <- suppressWarnings(createDataFrame(iris))
-  schema <- structType(structField("Sepal_Length", "double"), structField("Avg", "double"))
-  # Groups by `Sepal_Length` and computes the average for `Sepal_Width`
-  df4 <- gapply(
-    cols = "Sepal_Length",
-    irisDF,
-    function(key, x) {
-      y <- data.frame(key, mean(x$Sepal_Width), stringsAsFactors = FALSE)
-    },
-    schema)
-  actual <- collect(df4)
-  actual <- actual[order(actual$Sepal_Length), ]
-  rownames(actual) <- NULL
-  agg_local_df <- data.frame(aggregate(iris$Sepal.Width, by = list(iris$Sepal.Length), FUN = mean),
-                             stringsAsFactors = FALSE)
-  colnames(agg_local_df) <- c("Sepal_Length", "Avg")
-  expected <- agg_local_df[order(agg_local_df$Sepal_Length), ]
-  rownames(expected) <- NULL
-  expect_identical(actual, expected)
+    irisDF <- suppressWarnings(createDataFrame(iris))
+    schema <- structType(structField("Sepal_Length", "double"), structField("Avg", "double"))
+    # Groups by `Sepal_Length` and computes the average for `Sepal_Width`
+    df4 <- gapply(
+      cols = "Sepal_Length",
+      irisDF,
+      function(key, x) {
+        y <- data.frame(key, mean(x$Sepal_Width), stringsAsFactors = FALSE)
+      },
+      schema)
+    actual <- collect(df4)
+    actual <- actual[order(actual$Sepal_Length), ]
+    rownames(actual) <- NULL
+    agg_local_df <- data.frame(aggregate(iris$Sepal.Width,
+                                         by = list(iris$Sepal.Length),
+                                         FUN = mean),
+                               stringsAsFactors = FALSE)
+    colnames(agg_local_df) <- c("Sepal_Length", "Avg")
+    expected <- agg_local_df[order(agg_local_df$Sepal_Length), ]
+    rownames(expected) <- NULL
+    expect_identical(actual, expected)
+  },
+  finally = {
+    # Resetting the conf back to default value
+    callJMethod(conf, "set", "spark.sql.shuffle.partitions", shufflepartitionsvalue)
+  })
 })

 test_that("Window functions on a DataFrame", {
```