[SPARK-8452] [SPARKR] expose jobGroup API in SparkR

This pull request adds following methods to SparkR:

```R
setJobGroup()
cancelJobGroup()
clearJobGroup()
```
For each method, the spark context is passed as the first argument. There does not seem to be a good way to test these in R.

cc shivaram and davies

Author: Hossein <hossein@databricks.com>

Closes #6889 from falaki/SPARK-8452 and squashes the following commits:

9ce9f1e [Hossein] Added basic tests to verify methods can be called and won't throw errors
c706af9 [Hossein] Added examples
a2c19af [Hossein] taking spark context as first argument
343ca77 [Hossein] Added setJobGroup, cancelJobGroup and clearJobGroup to SparkR

(cherry picked from commit 1fa29c2df2)
Signed-off-by: Shivaram Venkataraman <shivaram@cs.berkeley.edu>
This commit is contained in:
Hossein 2015-06-19 15:47:22 -07:00 committed by Shivaram Venkataraman
parent 9ac8393663
commit 1a6b510784
3 changed files with 56 additions and 0 deletions

View file

@ -10,6 +10,11 @@ export("sparkR.init")
export("sparkR.stop") export("sparkR.stop")
export("print.jobj") export("print.jobj")
# Job group lifecycle management methods
export("setJobGroup",
"clearJobGroup",
"cancelJobGroup")
exportClasses("DataFrame") exportClasses("DataFrame")
exportMethods("arrange", exportMethods("arrange",

View file

@ -278,3 +278,47 @@ sparkRHive.init <- function(jsc = NULL) {
assign(".sparkRHivesc", hiveCtx, envir = .sparkREnv) assign(".sparkRHivesc", hiveCtx, envir = .sparkREnv)
hiveCtx hiveCtx
} }
#' Assigns a group ID to all the jobs started by this thread until the group ID is set to a
#' different value or cleared.
#'
#' @param sc existing spark context
#' @param groupid the ID to be assigned to job groups
#' @param description description for the the job group ID
#' @param interruptOnCancel flag to indicate if the job is interrupted on job cancellation
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' setJobGroup(sc, "myJobGroup", "My job group description", TRUE)
#'}
setJobGroup <- function(sc, groupId, description, interruptOnCancel) {
callJMethod(sc, "setJobGroup", groupId, description, interruptOnCancel)
}
#' Clear current job group ID and its description
#'
#' @param sc existing spark context
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' clearJobGroup(sc)
#'}
clearJobGroup <- function(sc) {
callJMethod(sc, "clearJobGroup")
}
#' Cancel active jobs for the specified group
#'
#' @param sc existing spark context
#' @param groupId the ID of job group to be cancelled
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' cancelJobGroup(sc, "myJobGroup")
#'}
cancelJobGroup <- function(sc, groupId) {
callJMethod(sc, "cancelJobGroup", groupId)
}

View file

@ -48,3 +48,10 @@ test_that("rdd GC across sparkR.stop", {
count(rdd3) count(rdd3)
count(rdd4) count(rdd4)
}) })
test_that("job group functions can be called", {
sc <- sparkR.init()
setJobGroup(sc, "groupId", "job description", TRUE)
cancelJobGroup(sc, "groupId")
clearJobGroup(sc)
})