From d49d9e40383209eed9584a4ef2c3964f27f4a08f Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Mon, 27 Nov 2017 10:09:53 +0900
Subject: [PATCH] [SPARK-21693][R][FOLLOWUP] Reduce shuffle partitions running
 R workers in a few tests to speed them up

## What changes were proposed in this pull request?

This is a followup to reduce AppVeyor test time. This PR proposes to reduce the number of shuffle partitions in a few particular tests, which in turn reduces the number of tasks that launch R workers.

The symptom is similar to the one described in https://github.com/apache/spark/pull/19722: many R processes are launched anew on Windows, where forking is unavailable, and this accounts for the difference in elapsed time between Linux and Windows.

Here is a simple before/after comparison for this change. I manually tested it by disabling `spark.sparkr.use.daemon`, which makes the run resemble the tests on Windows:

**Before**

(screenshot: 2017-11-25 12 22 13)

**After**

(screenshot: 2017-11-25 12 32 00)

So, this will probably save roughly 10 minutes or more.

## How was this patch tested?

AppVeyor tests

Author: hyukjinkwon

Closes #19816 from HyukjinKwon/SPARK-21693-followup.
---
 R/pkg/tests/fulltests/test_sparkSQL.R | 261 ++++++++++++++------
 1 file changed, 145 insertions(+), 116 deletions(-)

diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index 00217c892f..d87f5d2705 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -3021,41 +3021,54 @@ test_that("dapplyCollect() on DataFrame with a binary column", {
 })
 
 test_that("repartition by columns on DataFrame", {
-  df <- createDataFrame(
-    list(list(1L, 1, "1", 0.1), list(1L, 2, "2", 0.2), list(3L, 3, "3", 0.3)),
-    c("a", "b", "c", "d"))
+  # The tasks here launch R workers with shuffles, so we decrease the number of shuffle
+  # partitions to reduce the number of tasks and speed up the test. This is particularly
+  # slow on Windows, where the R workers cannot be forked. See also SPARK-21693.
+  conf <- callJMethod(sparkSession, "conf")
+  shufflepartitionsvalue <- callJMethod(conf, "get", "spark.sql.shuffle.partitions")
+  callJMethod(conf, "set", "spark.sql.shuffle.partitions", "5")
+  tryCatch({
+    df <- createDataFrame(
+      list(list(1L, 1, "1", 0.1), list(1L, 2, "2", 0.2), list(3L, 3, "3", 0.3)),
+      c("a", "b", "c", "d"))
 
-  # no column and number of partitions specified
-  retError <- tryCatch(repartition(df), error = function(e) e)
-  expect_equal(grepl
-    ("Please, specify the number of partitions and/or a column\\(s\\)", retError), TRUE)
+    # no column and number of partitions specified
+    retError <- tryCatch(repartition(df), error = function(e) e)
+    expect_equal(grepl
+      ("Please, specify the number of partitions and/or a column\\(s\\)", retError), TRUE)
 
-  # repartition by column and number of partitions
-  actual <- repartition(df, 3, col = df$"a")
+    # repartition by column and number of partitions
+    actual <- repartition(df, 3, col = df$"a")
 
-  # Checking that at least the dimensions are identical
-  expect_identical(dim(df), dim(actual))
-  expect_equal(getNumPartitions(actual), 3L)
+    # Checking that at least the dimensions are identical
+    expect_identical(dim(df), dim(actual))
+    expect_equal(getNumPartitions(actual), 3L)
 
-  # repartition by number of partitions
-  actual <- repartition(df, 13L)
-  expect_identical(dim(df), dim(actual))
-  expect_equal(getNumPartitions(actual), 13L)
+    # repartition by number of partitions
+    actual <- repartition(df, 13L)
+    expect_identical(dim(df), dim(actual))
+    expect_equal(getNumPartitions(actual), 13L)
 
-  expect_equal(getNumPartitions(coalesce(actual, 1L)), 1L)
+    expect_equal(getNumPartitions(coalesce(actual, 1L)), 1L)
 
-  # a test case with a column and dapply
-  schema <- structType(structField("a", "integer"), structField("avg", "double"))
-  df <- repartition(df, col = df$"a")
-  df1 <- dapply(
-    df,
-    function(x) {
-      y <- (data.frame(x$a[1], mean(x$b)))
-    },
-    schema)
+    # a test case with a column and dapply
+    schema <- structType(structField("a", "integer"), structField("avg", "double"))
+    df <- repartition(df, col = df$"a")
 
-  # Number of partitions is equal to 2
-  expect_equal(nrow(df1), 2)
+    df1 <- dapply(
+      df,
+      function(x) {
+        y <- (data.frame(x$a[1], mean(x$b)))
+      },
+      schema)
+
+    # Number of partitions is equal to 2
+    expect_equal(nrow(df1), 2)
+  },
+  finally = {
+    # Reset the conf back to its original value
+    callJMethod(conf, "set", "spark.sql.shuffle.partitions", shufflepartitionsvalue)
+  })
 })
 
 test_that("coalesce, repartition, numPartitions", {
@@ -3078,101 +3091,117 @@ test_that("coalesce, repartition, numPartitions", {
 })
 
 test_that("gapply() and gapplyCollect() on a DataFrame", {
-  df <- createDataFrame(
-    list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
-    c("a", "b", "c", "d"))
-  expected <- collect(df)
-  df1 <- gapply(df, "a", function(key, x) { x }, schema(df))
-  actual <- collect(df1)
-  expect_identical(actual, expected)
-
-  df1Collect <- gapplyCollect(df, list("a"), function(key, x) { x })
-  expect_identical(df1Collect, expected)
-
-  # gapply on empty grouping columns.
-  df1 <- gapply(df, c(), function(key, x) { x }, schema(df))
-  actual <- collect(df1)
-  expect_identical(actual, expected)
-
-  # Computes the sum of the second column by grouping on the first and third columns
-  # and checks if the sum is larger than 2
-  schemas <- list(structType(structField("a", "integer"), structField("e", "boolean")),
-                  "a INT, e BOOLEAN")
-  for (schema in schemas) {
-    df2 <- gapply(
-      df,
-      c(df$"a", df$"c"),
-      function(key, x) {
-        y <- data.frame(key[1], sum(x$b) > 2)
-      },
-      schema)
-    actual <- collect(df2)$e
-    expected <- c(TRUE, TRUE)
+  # The tasks here launch R workers with shuffles, so we decrease the number of shuffle
+  # partitions to reduce the number of tasks and speed up the test. This is particularly
+  # slow on Windows, where the R workers cannot be forked. See also SPARK-21693.
+  conf <- callJMethod(sparkSession, "conf")
+  shufflepartitionsvalue <- callJMethod(conf, "get", "spark.sql.shuffle.partitions")
+  # TODO: A lower number of 'spark.sql.shuffle.partitions' causes test failures
+  # for an unknown reason. Probably we should fix it.
+  callJMethod(conf, "set", "spark.sql.shuffle.partitions", "16")
+  tryCatch({
+    df <- createDataFrame(
+      list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
+      c("a", "b", "c", "d"))
+    expected <- collect(df)
+    df1 <- gapply(df, "a", function(key, x) { x }, schema(df))
+    actual <- collect(df1)
     expect_identical(actual, expected)
 
-    df2Collect <- gapplyCollect(
-      df,
-      c(df$"a", df$"c"),
-      function(key, x) {
-        y <- data.frame(key[1], sum(x$b) > 2)
-        colnames(y) <- c("a", "e")
-        y
-      })
+    df1Collect <- gapplyCollect(df, list("a"), function(key, x) { x })
+    expect_identical(df1Collect, expected)
+
+    # gapply on empty grouping columns.
+    df1 <- gapply(df, c(), function(key, x) { x }, schema(df))
+    actual <- collect(df1)
+    expect_identical(actual, expected)
+
+    # Computes the sum of the second column by grouping on the first and third columns
+    # and checks if the sum is larger than 2
+    schemas <- list(structType(structField("a", "integer"), structField("e", "boolean")),
+                    "a INT, e BOOLEAN")
+    for (schema in schemas) {
+      df2 <- gapply(
+        df,
+        c(df$"a", df$"c"),
+        function(key, x) {
+          y <- data.frame(key[1], sum(x$b) > 2)
+        },
+        schema)
+      actual <- collect(df2)$e
+      expected <- c(TRUE, TRUE)
+      expect_identical(actual, expected)
+
+      df2Collect <- gapplyCollect(
+        df,
+        c(df$"a", df$"c"),
+        function(key, x) {
+          y <- data.frame(key[1], sum(x$b) > 2)
+          colnames(y) <- c("a", "e")
+          y
+        })
     actual <- df2Collect$e
     expect_identical(actual, expected)
-  }
+    }
 
-  # Computes the arithmetic mean of the second column by grouping
-  # on the first and third columns. Outputs the grouping value and the average.
-  schema <- structType(structField("a", "integer"), structField("c", "string"),
-                       structField("avg", "double"))
-  df3 <- gapply(
-    df,
-    c("a", "c"),
-    function(key, x) {
-      y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
-    },
-    schema)
-  actual <- collect(df3)
-  actual <- actual[order(actual$a), ]
-  rownames(actual) <- NULL
-  expected <- collect(select(df, "a", "b", "c"))
-  expected <- data.frame(aggregate(expected$b, by = list(expected$a, expected$c), FUN = mean))
-  colnames(expected) <- c("a", "c", "avg")
-  expected <- expected[order(expected$a), ]
-  rownames(expected) <- NULL
-  expect_identical(actual, expected)
+    # Computes the arithmetic mean of the second column by grouping
+    # on the first and third columns. Outputs the grouping value and the average.
+ schema <- structType(structField("a", "integer"), structField("c", "string"), + structField("avg", "double")) + df3 <- gapply( + df, + c("a", "c"), + function(key, x) { + y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE) + }, + schema) + actual <- collect(df3) + actual <- actual[order(actual$a), ] + rownames(actual) <- NULL + expected <- collect(select(df, "a", "b", "c")) + expected <- data.frame(aggregate(expected$b, by = list(expected$a, expected$c), FUN = mean)) + colnames(expected) <- c("a", "c", "avg") + expected <- expected[order(expected$a), ] + rownames(expected) <- NULL + expect_identical(actual, expected) - df3Collect <- gapplyCollect( - df, - c("a", "c"), - function(key, x) { - y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE) - colnames(y) <- c("a", "c", "avg") - y - }) - actual <- df3Collect[order(df3Collect$a), ] - expect_identical(actual$avg, expected$avg) + df3Collect <- gapplyCollect( + df, + c("a", "c"), + function(key, x) { + y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE) + colnames(y) <- c("a", "c", "avg") + y + }) + actual <- df3Collect[order(df3Collect$a), ] + expect_identical(actual$avg, expected$avg) - irisDF <- suppressWarnings(createDataFrame(iris)) - schema <- structType(structField("Sepal_Length", "double"), structField("Avg", "double")) - # Groups by `Sepal_Length` and computes the average for `Sepal_Width` - df4 <- gapply( - cols = "Sepal_Length", - irisDF, - function(key, x) { - y <- data.frame(key, mean(x$Sepal_Width), stringsAsFactors = FALSE) - }, - schema) - actual <- collect(df4) - actual <- actual[order(actual$Sepal_Length), ] - rownames(actual) <- NULL - agg_local_df <- data.frame(aggregate(iris$Sepal.Width, by = list(iris$Sepal.Length), FUN = mean), - stringsAsFactors = FALSE) - colnames(agg_local_df) <- c("Sepal_Length", "Avg") - expected <- agg_local_df[order(agg_local_df$Sepal_Length), ] - rownames(expected) <- NULL - expect_identical(actual, expected) + irisDF <- suppressWarnings(createDataFrame(iris)) + schema <- structType(structField("Sepal_Length", "double"), structField("Avg", "double")) + # Groups by `Sepal_Length` and computes the average for `Sepal_Width` + df4 <- gapply( + cols = "Sepal_Length", + irisDF, + function(key, x) { + y <- data.frame(key, mean(x$Sepal_Width), stringsAsFactors = FALSE) + }, + schema) + actual <- collect(df4) + actual <- actual[order(actual$Sepal_Length), ] + rownames(actual) <- NULL + agg_local_df <- data.frame(aggregate(iris$Sepal.Width, + by = list(iris$Sepal.Length), + FUN = mean), + stringsAsFactors = FALSE) + colnames(agg_local_df) <- c("Sepal_Length", "Avg") + expected <- agg_local_df[order(agg_local_df$Sepal_Length), ] + rownames(expected) <- NULL + expect_identical(actual, expected) + }, + finally = { + # Resetting the conf back to default value + callJMethod(conf, "set", "spark.sql.shuffle.partitions", shufflepartitionsvalue) + }) }) test_that("Window functions on a DataFrame", {