[SPARK-7482] [SPARKR] Rename some DataFrame API methods in SparkR to match their counterparts in Scala.
Author: Sun Rui <rui.sun@intel.com>

Closes #6007 from sun-rui/SPARK-7482 and squashes the following commits:

5c5cf5e [Sun Rui] Implement alias loadDF() as a new function.
3a30c10 [Sun Rui] Rename load()/save() to read.df()/write.df(). Also add loadDF()/saveDF() as aliases.
9f569d6 [Sun Rui] [SPARK-7482][SparkR] Rename some DataFrame API methods in SparkR to match their counterparts in Scala.
parent 208b902257 · commit df9b94a57c
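In short, the user-facing rename looks like this (a usage sketch, not part of the diff; it assumes a Spark 1.4-era SparkR session, and "people.json"/"people.parquet" are hypothetical paths):

    sc <- sparkR.init()
    sqlCtx <- sparkRSQL.init(sc)

    df <- read.df(sqlCtx, "people.json", "json")              # was load()/loadDF()
    half <- sample(df, FALSE, 0.5)                            # was sampleDF()
    write.df(half, "people.parquet", "parquet", "overwrite")  # was save()/saveDF()

    df2 <- loadDF(sqlCtx, "people.json", "json")              # old name kept as an alias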
R/pkg/NAMESPACE

@@ -37,7 +37,7 @@ exportMethods("arrange",
               "registerTempTable",
               "rename",
               "repartition",
-              "sampleDF",
+              "sample",
               "sample_frac",
               "saveAsParquetFile",
               "saveAsTable",
@@ -53,7 +53,8 @@ exportMethods("arrange",
               "unpersist",
               "where",
               "withColumn",
-              "withColumnRenamed")
+              "withColumnRenamed",
+              "write.df")
 
 exportClasses("Column")
 
@@ -101,6 +102,7 @@ export("cacheTable",
        "jsonFile",
        "loadDF",
        "parquetFile",
+       "read.df",
        "sql",
        "table",
        "tableNames",
R/pkg/R/DataFrame.R

@@ -294,8 +294,8 @@ setMethod("registerTempTable",
 #'\dontrun{
 #' sc <- sparkR.init()
 #' sqlCtx <- sparkRSQL.init(sc)
-#' df <- loadDF(sqlCtx, path, "parquet")
-#' df2 <- loadDF(sqlCtx, path2, "parquet")
+#' df <- read.df(sqlCtx, path, "parquet")
+#' df2 <- read.df(sqlCtx, path2, "parquet")
 #' registerTempTable(df, "table1")
 #' insertInto(df2, "table1", overwrite = TRUE)
 #'}
@@ -473,14 +473,14 @@ setMethod("distinct",
     dataFrame(sdf)
   })
 
-#' SampleDF
+#' Sample
 #'
 #' Return a sampled subset of this DataFrame using a random seed.
 #'
 #' @param x A SparkSQL DataFrame
 #' @param withReplacement Sampling with replacement or not
 #' @param fraction The (rough) sample target fraction
-#' @rdname sampleDF
+#' @rdname sample
 #' @aliases sample_frac
 #' @export
 #' @examples
@@ -489,10 +489,10 @@ setMethod("distinct",
 #' sqlCtx <- sparkRSQL.init(sc)
 #' path <- "path/to/file.json"
 #' df <- jsonFile(sqlCtx, path)
-#' collect(sampleDF(df, FALSE, 0.5))
-#' collect(sampleDF(df, TRUE, 0.5))
+#' collect(sample(df, FALSE, 0.5))
+#' collect(sample(df, TRUE, 0.5))
 #'}
-setMethod("sampleDF",
+setMethod("sample",
           # TODO : Figure out how to send integer as java.lang.Long to JVM so
           # we can send seed as an argument through callJMethod
           signature(x = "DataFrame", withReplacement = "logical",
@@ -503,13 +503,13 @@ setMethod("sampleDF",
     dataFrame(sdf)
   })
 
-#' @rdname sampleDF
-#' @aliases sampleDF
+#' @rdname sample
+#' @aliases sample
 setMethod("sample_frac",
           signature(x = "DataFrame", withReplacement = "logical",
                     fraction = "numeric"),
           function(x, withReplacement, fraction) {
-            sampleDF(x, withReplacement, fraction)
+            sample(x, withReplacement, fraction)
           })
 
 #' Count
@@ -1303,7 +1303,7 @@ setMethod("except",
 #' @param source A name for external data source
 #' @param mode One of 'append', 'overwrite', 'error', 'ignore'
 #'
-#' @rdname saveAsTable
+#' @rdname write.df
 #' @export
 #' @examples
 #'\dontrun{
@@ -1311,9 +1311,9 @@ setMethod("except",
 #' sqlCtx <- sparkRSQL.init(sc)
 #' path <- "path/to/file.json"
 #' df <- jsonFile(sqlCtx, path)
-#' saveAsTable(df, "myfile")
+#' write.df(df, "myfile", "parquet", "overwrite")
 #' }
-setMethod("saveDF",
+setMethod("write.df",
           signature(df = "DataFrame", path = 'character', source = 'character',
                     mode = 'character'),
           function(df, path = NULL, source = NULL, mode = "append", ...){
@@ -1334,6 +1334,15 @@ setMethod("saveDF",
     callJMethod(df@sdf, "save", source, jmode, options)
   })
 
+#' @rdname write.df
+#' @aliases saveDF
+#' @export
+setMethod("saveDF",
+          signature(df = "DataFrame", path = 'character', source = 'character',
+                    mode = 'character'),
+          function(df, path = NULL, source = NULL, mode = "append", ...){
+            write.df(df, path, source, mode, ...)
+          })
 
 #' saveAsTable
 #'
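Taken together, the DataFrame.R changes mean the old and new spellings coexist. A side-by-side usage sketch (not from the patch; assumes `df` and `parquetPath` as in the tests further below):

    s1 <- sample(df, FALSE, 0.5)        # new name (was sampleDF)
    s2 <- sample_frac(df, FALSE, 0.5)   # unchanged; now documented under ?sample
    write.df(df, parquetPath, "parquet", "overwrite")  # new name (was saveDF)
    saveDF(df, parquetPath, "parquet", "overwrite")    # kept as an alias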
R/pkg/R/RDD.R

@@ -927,7 +927,7 @@ setMethod("takeSample", signature(x = "RDD", withReplacement = "logical",
                                              MAXINT)))))
 
   # TODO(zongheng): investigate if this call is an in-place shuffle?
-  sample(samples)[1:total]
+  base::sample(samples)[1:total]
 })
 
 # Creates tuples of the elements in this RDD by applying a function.
@@ -996,7 +996,7 @@ setMethod("coalesce",
   if (shuffle || numPartitions > SparkR:::numPartitions(x)) {
     func <- function(partIndex, part) {
       set.seed(partIndex)  # partIndex as seed
-      start <- as.integer(sample(numPartitions, 1) - 1)
+      start <- as.integer(base::sample(numPartitions, 1) - 1)
       lapply(seq_along(part),
              function(i) {
                pos <- (start + i) %% numPartitions
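These two `base::` qualifications are the flip side of the rename: once the package defines an S4 generic named `sample` (see generics.R below) whose signature differs from base R's, unqualified `sample()` calls inside the package dispatch to that generic rather than to `base::sample()`. A minimal sketch of the masking effect, as an illustration only (not from the patch):

    library(methods)

    # An S4 generic named "sample" with a non-base signature masks
    # base::sample in scopes that see it, so plain base usage must
    # be qualified.
    setGeneric("sample", function(x, withReplacement, fraction, seed) {
      standardGeneric("sample")
    })

    # sample(1:10)     # would fail: no method for signature "integer"
    base::sample(1:10) # works: explicitly base R's sample()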
R/pkg/R/SQLContext.R

@@ -421,7 +421,7 @@ clearCache <- function(sqlCtx) {
 #' \dontrun{
 #' sc <- sparkR.init()
 #' sqlCtx <- sparkRSQL.init(sc)
-#' df <- loadDF(sqlCtx, path, "parquet")
+#' df <- read.df(sqlCtx, path, "parquet")
 #' registerTempTable(df, "table")
 #' dropTempTable(sqlCtx, "table")
 #' }
@@ -450,10 +450,10 @@ dropTempTable <- function(sqlCtx, tableName) {
 #'\dontrun{
 #' sc <- sparkR.init()
 #' sqlCtx <- sparkRSQL.init(sc)
-#' df <- load(sqlCtx, "path/to/file.json", source = "json")
+#' df <- read.df(sqlCtx, "path/to/file.json", source = "json")
 #' }
 
-loadDF <- function(sqlCtx, path = NULL, source = NULL, ...) {
+read.df <- function(sqlCtx, path = NULL, source = NULL, ...) {
   options <- varargsToEnv(...)
   if (!is.null(path)) {
     options[['path']] <- path
@@ -462,6 +462,13 @@ loadDF <- function(sqlCtx, path = NULL, source = NULL, ...) {
   dataFrame(sdf)
 }
 
+#' @aliases loadDF
+#' @export
+
+loadDF <- function(sqlCtx, path = NULL, source = NULL, ...) {
+  read.df(sqlCtx, path, source, ...)
+}
+
 #' Create an external table
 #'
 #' Creates an external table based on the dataset in a data source,
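Note that `read.df()` forwards any additional named arguments as data source options (they are collected by `varargsToEnv()` and handed to the JVM). A sketch of passing such an option; the session objects come from the examples above, and treating `samplingRatio` as a json-source option is an assumption here:

    # Sketch: extra named arguments become data source options.
    # "samplingRatio" is assumed to be accepted by the json source
    # for schema-inference sampling.
    df <- read.df(sqlCtx, "path/to/file.json", source = "json",
                  samplingRatio = "1.0")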
R/pkg/R/generics.R

@@ -456,19 +456,19 @@ setGeneric("rename", function(x, ...) { standardGeneric("rename") })
 #' @export
 setGeneric("registerTempTable", function(x, tableName) { standardGeneric("registerTempTable") })
 
-#' @rdname sampleDF
+#' @rdname sample
 #' @export
+setGeneric("sample",
+           function(x, withReplacement, fraction, seed) {
+             standardGeneric("sample")
+           })
+
+#' @rdname sample
+#' @export
 setGeneric("sample_frac",
            function(x, withReplacement, fraction, seed) {
              standardGeneric("sample_frac")
            })
-
-#' @rdname sampleDF
-#' @export
-setGeneric("sampleDF",
-           function(x, withReplacement, fraction, seed) {
-             standardGeneric("sampleDF")
-           })
 
 #' @rdname saveAsParquetFile
 #' @export
@@ -480,7 +480,11 @@ setGeneric("saveAsTable", function(df, tableName, source, mode, ...) {
   standardGeneric("saveAsTable")
 })
 
-#' @rdname saveAsTable
+#' @rdname write.df
 #' @export
+setGeneric("write.df", function(df, path, source, mode, ...) { standardGeneric("write.df") })
+
+#' @rdname write.df
+#' @export
 setGeneric("saveDF", function(df, path, source, mode, ...) { standardGeneric("saveDF") })
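The alias pattern above hinges on roxygen's `@rdname`: both generics share `@rdname write.df`, so `write.df()` and `saveDF()` document onto a single help page. A minimal sketch of the pattern with hypothetical names (illustration only):

    # Two exported generics sharing one @rdname produce a single Rd
    # help page covering both names.
    #' @rdname do.things
    #' @export
    setGeneric("do.things", function(x, ...) { standardGeneric("do.things") })

    #' @rdname do.things
    #' @export
    setGeneric("doThings", function(x, ...) { standardGeneric("doThings") })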
R/pkg/inst/tests/test_sparkSQL.R

@@ -209,18 +209,18 @@ test_that("registerTempTable() results in a queryable table and sql() results in
 })
 
 test_that("insertInto() on a registered table", {
-  df <- loadDF(sqlCtx, jsonPath, "json")
-  saveDF(df, parquetPath, "parquet", "overwrite")
-  dfParquet <- loadDF(sqlCtx, parquetPath, "parquet")
+  df <- read.df(sqlCtx, jsonPath, "json")
+  write.df(df, parquetPath, "parquet", "overwrite")
+  dfParquet <- read.df(sqlCtx, parquetPath, "parquet")
 
   lines <- c("{\"name\":\"Bob\", \"age\":24}",
              "{\"name\":\"James\", \"age\":35}")
   jsonPath2 <- tempfile(pattern="jsonPath2", fileext=".tmp")
   parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet")
   writeLines(lines, jsonPath2)
-  df2 <- loadDF(sqlCtx, jsonPath2, "json")
-  saveDF(df2, parquetPath2, "parquet", "overwrite")
-  dfParquet2 <- loadDF(sqlCtx, parquetPath2, "parquet")
+  df2 <- read.df(sqlCtx, jsonPath2, "json")
+  write.df(df2, parquetPath2, "parquet", "overwrite")
+  dfParquet2 <- read.df(sqlCtx, parquetPath2, "parquet")
 
   registerTempTable(dfParquet, "table1")
   insertInto(dfParquet2, "table1")
@@ -421,12 +421,12 @@ test_that("distinct() on DataFrames", {
   expect_true(count(uniques) == 3)
 })
 
-test_that("sampleDF on a DataFrame", {
+test_that("sample on a DataFrame", {
   df <- jsonFile(sqlCtx, jsonPath)
-  sampled <- sampleDF(df, FALSE, 1.0)
+  sampled <- sample(df, FALSE, 1.0)
   expect_equal(nrow(collect(sampled)), count(df))
   expect_true(inherits(sampled, "DataFrame"))
-  sampled2 <- sampleDF(df, FALSE, 0.1)
+  sampled2 <- sample(df, FALSE, 0.1)
   expect_true(count(sampled2) < 3)
 
   # Also test sample_frac
@@ -491,16 +491,16 @@ test_that("column calculation", {
   expect_true(count(df2) == 3)
 })
 
-test_that("load() from json file", {
-  df <- loadDF(sqlCtx, jsonPath, "json")
+test_that("read.df() from json file", {
+  df <- read.df(sqlCtx, jsonPath, "json")
   expect_true(inherits(df, "DataFrame"))
   expect_true(count(df) == 3)
 })
 
-test_that("save() as parquet file", {
-  df <- loadDF(sqlCtx, jsonPath, "json")
-  saveDF(df, parquetPath, "parquet", mode="overwrite")
-  df2 <- loadDF(sqlCtx, parquetPath, "parquet")
+test_that("write.df() as parquet file", {
+  df <- read.df(sqlCtx, jsonPath, "json")
+  write.df(df, parquetPath, "parquet", mode="overwrite")
+  df2 <- read.df(sqlCtx, parquetPath, "parquet")
   expect_true(inherits(df2, "DataFrame"))
   expect_true(count(df2) == 3)
 })
@@ -670,7 +670,7 @@ test_that("unionAll(), except(), and intersect() on a DataFrame", {
              "{\"name\":\"James\", \"age\":35}")
   jsonPath2 <- tempfile(pattern="sparkr-test", fileext=".tmp")
   writeLines(lines, jsonPath2)
-  df2 <- loadDF(sqlCtx, jsonPath2, "json")
+  df2 <- read.df(sqlCtx, jsonPath2, "json")
 
   unioned <- arrange(unionAll(df, df2), df$age)
   expect_true(inherits(unioned, "DataFrame"))
@@ -712,9 +712,9 @@ test_that("mutate() and rename()", {
   expect_true(columns(newDF2)[1] == "newerAge")
 })
 
-test_that("saveDF() on DataFrame and works with parquetFile", {
+test_that("write.df() on DataFrame and works with parquetFile", {
   df <- jsonFile(sqlCtx, jsonPath)
-  saveDF(df, parquetPath, "parquet", mode="overwrite")
+  write.df(df, parquetPath, "parquet", mode="overwrite")
   parquetDF <- parquetFile(sqlCtx, parquetPath)
   expect_true(inherits(parquetDF, "DataFrame"))
   expect_equal(count(df), count(parquetDF))
@@ -722,9 +722,9 @@ test_that("saveDF() on DataFrame and works with parquetFile", {
 
 test_that("parquetFile works with multiple input paths", {
   df <- jsonFile(sqlCtx, jsonPath)
-  saveDF(df, parquetPath, "parquet", mode="overwrite")
+  write.df(df, parquetPath, "parquet", mode="overwrite")
   parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet")
-  saveDF(df, parquetPath2, "parquet", mode="overwrite")
+  write.df(df, parquetPath2, "parquet", mode="overwrite")
   parquetDF <- parquetFile(sqlCtx, parquetPath, parquetPath2)
   expect_true(inherits(parquetDF, "DataFrame"))
   expect_true(count(parquetDF) == count(df)*2)