#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
library(testthat)
context("SparkSQL functions")
# Utility function for easily checking the values of a StructField
checkStructField <- function(actual, expectedName, expectedType, expectedNullable) {
expect_equal(class(actual), "structField")
expect_equal(actual$name(), expectedName)
expect_equal(actual$dataType.toString(), expectedType)
expect_equal(actual$nullable(), expectedNullable)
}
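# Example usage (illustrative, assuming a DataFrame `df` whose first schema
# field is a non-nullable-agnostic string column "name"):
#   checkStructField(schema(df)$fields()[[1]], "name", "StringType", TRUE)
# checks the field's name, printed data type, and nullability in one call.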
markUtf8 <- function(s) {
Encoding(s) <- "UTF-8"
s
}
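# Illustrative sketch (not part of the test suite): markUtf8() only declares
# the encoding of an existing string; the underlying bytes are unchanged, so
# comparisons against collect()ed UTF-8 data stay portable across locales.
utf8Example <- markUtf8("caf\u00e9")
stopifnot(Encoding(utf8Example) == "UTF-8")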
# Tests for SparkSQL functions in SparkR
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)
mockLines <- c("{\"name\":\"Michael\"}",
"{\"name\":\"Andy\", \"age\":30}",
"{\"name\":\"Justin\", \"age\":19}")
jsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet")
writeLines(mockLines, jsonPath)
# For testing NA functions like dropna(), fillna(), ...
mockLinesNa <- c("{\"name\":\"Bob\",\"age\":16,\"height\":176.5}",
"{\"name\":\"Alice\",\"age\":null,\"height\":164.3}",
"{\"name\":\"David\",\"age\":60,\"height\":null}",
"{\"name\":\"Amy\",\"age\":null,\"height\":null}",
"{\"name\":null,\"age\":null,\"height\":null}")
jsonPathNa <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
writeLines(mockLinesNa, jsonPathNa)
# For testing complex types in DataFrame
mockLinesComplexType <-
c("{\"c1\":[1, 2, 3], \"c2\":[\"a\", \"b\", \"c\"], \"c3\":[1.0, 2.0, 3.0]}",
"{\"c1\":[4, 5, 6], \"c2\":[\"d\", \"e\", \"f\"], \"c3\":[4.0, 5.0, 6.0]}",
"{\"c1\":[7, 8, 9], \"c2\":[\"g\", \"h\", \"i\"], \"c3\":[7.0, 8.0, 9.0]}")
complexTypeJsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
writeLines(mockLinesComplexType, complexTypeJsonPath)
test_that("calling sparkRSQL.init returns existing SQL context", {
expect_equal(sparkRSQL.init(sc), sqlContext)
})
test_that("infer types and check types", {
expect_equal(infer_type(1L), "integer")
expect_equal(infer_type(1.0), "double")
expect_equal(infer_type("abc"), "string")
expect_equal(infer_type(TRUE), "boolean")
expect_equal(infer_type(as.Date("2015-03-11")), "date")
expect_equal(infer_type(as.POSIXlt("2015-03-11 12:13:04.043")), "timestamp")
expect_equal(infer_type(c(1L, 2L)), "array<integer>")
expect_equal(infer_type(list(1L, 2L)), "array<integer>")
expect_equal(infer_type(listToStruct(list(a = 1L, b = "2"))), "struct<a:integer,b:string>")
e <- new.env()
assign("a", 1L, envir = e)
expect_equal(infer_type(e), "map<string,integer>")
expect_error(checkType("map<integer,integer>"), "Key type in a map must be string or character")
expect_equal(infer_type(as.raw(c(1, 2, 3))), "binary")
})
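# Informal summary of the inference rules exercised above: atomic vectors and
# unnamed lists infer to array types, named lists via listToStruct() infer to
# structs, and environments infer to maps (whose keys must be strings), e.g.
#   infer_type(list(1.5, 2.5))  # "array<double>"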
test_that("structType and structField", {
testField <- structField("a", "string")
expect_is(testField, "structField")
expect_equal(testField$name(), "a")
expect_true(testField$nullable())
testSchema <- structType(testField, structField("b", "integer"))
expect_is(testSchema, "structType")
expect_is(testSchema$fields()[[2]], "structField")
expect_equal(testSchema$fields()[[1]]$dataType.toString(), "StringType")
})
test_that("create DataFrame from RDD", {
rdd <- lapply(parallelize(sc, 1:10), function(x) { list(x, as.character(x)) })
df <- createDataFrame(sqlContext, rdd, list("a", "b"))
dfAsDF <- as.DataFrame(sqlContext, rdd, list("a", "b"))
expect_is(df, "SparkDataFrame")
expect_is(dfAsDF, "SparkDataFrame")
expect_equal(count(df), 10)
expect_equal(count(dfAsDF), 10)
expect_equal(nrow(df), 10)
expect_equal(nrow(dfAsDF), 10)
expect_equal(ncol(df), 2)
expect_equal(ncol(dfAsDF), 2)
expect_equal(dim(df), c(10, 2))
expect_equal(dim(dfAsDF), c(10, 2))
expect_equal(columns(df), c("a", "b"))
expect_equal(columns(dfAsDF), c("a", "b"))
expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))
expect_equal(dtypes(dfAsDF), list(c("a", "int"), c("b", "string")))
df <- createDataFrame(sqlContext, rdd)
dfAsDF <- as.DataFrame(sqlContext, rdd)
expect_is(df, "SparkDataFrame")
expect_is(dfAsDF, "SparkDataFrame")
expect_equal(columns(df), c("_1", "_2"))
expect_equal(columns(dfAsDF), c("_1", "_2"))
schema <- structType(structField(x = "a", type = "integer", nullable = TRUE),
structField(x = "b", type = "string", nullable = TRUE))
df <- createDataFrame(sqlContext, rdd, schema)
expect_is(df, "SparkDataFrame")
expect_equal(columns(df), c("a", "b"))
expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))
rdd <- lapply(parallelize(sc, 1:10), function(x) { list(a = x, b = as.character(x)) })
df <- createDataFrame(sqlContext, rdd)
expect_is(df, "SparkDataFrame")
expect_equal(count(df), 10)
expect_equal(columns(df), c("a", "b"))
expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))
schema <- structType(structField("name", "string"), structField("age", "integer"),
structField("height", "float"))
df <- read.df(sqlContext, jsonPathNa, "json", schema)
df2 <- createDataFrame(sqlContext, toRDD(df), schema)
df2AsDF <- as.DataFrame(sqlContext, toRDD(df), schema)
expect_equal(columns(df2), c("name", "age", "height"))
expect_equal(columns(df2AsDF), c("name", "age", "height"))
expect_equal(dtypes(df2), list(c("name", "string"), c("age", "int"), c("height", "float")))
expect_equal(dtypes(df2AsDF), list(c("name", "string"), c("age", "int"), c("height", "float")))
expect_equal(as.list(collect(where(df2, df2$name == "Bob"))),
list(name = "Bob", age = 16, height = 176.5))
expect_equal(as.list(collect(where(df2AsDF, df2AsDF$name == "Bob"))),
list(name = "Bob", age = 16, height = 176.5))
localDF <- data.frame(name = c("John", "Smith", "Sarah"),
age = c(19L, 23L, 18L),
height = c(176.5, 181.4, 173.7))
df <- createDataFrame(sqlContext, localDF, schema)
expect_is(df, "SparkDataFrame")
expect_equal(count(df), 3)
expect_equal(columns(df), c("name", "age", "height"))
expect_equal(dtypes(df), list(c("name", "string"), c("age", "int"), c("height", "float")))
expect_equal(as.list(collect(where(df, df$name == "John"))),
list(name = "John", age = 19L, height = 176.5))
ssc <- callJMethod(sc, "sc")
hiveCtx <- tryCatch({
newJObject("org.apache.spark.sql.hive.test.TestHiveContext", ssc)
},
error = function(err) {
skip("Hive is not build with SparkSQL, skipped")
})
sql(hiveCtx, "CREATE TABLE people (name string, age double, height float)")
df <- read.df(hiveCtx, jsonPathNa, "json", schema)
invisible(insertInto(df, "people"))
expect_equal(collect(sql(hiveCtx, "SELECT age from people WHERE name = 'Bob'"))$age,
c(16))
expect_equal(collect(sql(hiveCtx, "SELECT height from people WHERE name ='Bob'"))$height,
c(176.5))
})
test_that("convert NAs to null type in DataFrames", {
rdd <- parallelize(sc, list(list(1L, 2L), list(NA, 4L)))
df <- createDataFrame(sqlContext, rdd, list("a", "b"))
expect_true(is.na(collect(df)[2, "a"]))
expect_equal(collect(df)[2, "b"], 4L)
l <- data.frame(x = 1L, y = c(1L, NA_integer_, 3L))
df <- createDataFrame(sqlContext, l)
expect_equal(collect(df)[2, "x"], 1L)
expect_true(is.na(collect(df)[2, "y"]))
rdd <- parallelize(sc, list(list(1, 2), list(NA, 4)))
df <- createDataFrame(sqlContext, rdd, list("a", "b"))
expect_true(is.na(collect(df)[2, "a"]))
expect_equal(collect(df)[2, "b"], 4)
l <- data.frame(x = 1, y = c(1, NA_real_, 3))
df <- createDataFrame(sqlContext, l)
expect_equal(collect(df)[2, "x"], 1)
expect_true(is.na(collect(df)[2, "y"]))
l <- list("a", "b", NA, "d")
df <- createDataFrame(sqlContext, l)
expect_true(is.na(collect(df)[3, "_1"]))
expect_equal(collect(df)[4, "_1"], "d")
l <- list("a", "b", NA_character_, "d")
df <- createDataFrame(sqlContext, l)
expect_true(is.na(collect(df)[3, "_1"]))
expect_equal(collect(df)[4, "_1"], "d")
l <- list(TRUE, FALSE, NA, TRUE)
df <- createDataFrame(sqlContext, l)
expect_true(is.na(collect(df)[3, "_1"]))
expect_equal(collect(df)[4, "_1"], TRUE)
})
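# In short: R NA values are written to Spark as SQL nulls and come back as
# NAs of the column's R type on collect(), for integer, double, string and
# boolean columns alike.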
test_that("toDF", {
rdd <- lapply(parallelize(sc, 1:10), function(x) { list(x, as.character(x)) })
df <- toDF(rdd, list("a", "b"))
expect_is(df, "SparkDataFrame")
expect_equal(count(df), 10)
expect_equal(columns(df), c("a", "b"))
expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))
df <- toDF(rdd)
expect_is(df, "SparkDataFrame")
expect_equal(columns(df), c("_1", "_2"))
schema <- structType(structField(x = "a", type = "integer", nullable = TRUE),
structField(x = "b", type = "string", nullable = TRUE))
df <- toDF(rdd, schema)
expect_is(df, "SparkDataFrame")
expect_equal(columns(df), c("a", "b"))
expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))
rdd <- lapply(parallelize(sc, 1:10), function(x) { list(a = x, b = as.character(x)) })
df <- toDF(rdd)
expect_is(df, "SparkDataFrame")
expect_equal(count(df), 10)
expect_equal(columns(df), c("a", "b"))
expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))
})
test_that("create DataFrame from list or data.frame", {
l <- list(list(1, 2), list(3, 4))
df <- createDataFrame(sqlContext, l, c("a", "b"))
expect_equal(columns(df), c("a", "b"))
l <- list(list(a = 1, b = 2), list(a = 3, b = 4))
df <- createDataFrame(sqlContext, l)
expect_equal(columns(df), c("a", "b"))
a <- 1:3
b <- c("a", "b", "c")
ldf <- data.frame(a, b)
df <- createDataFrame(sqlContext, ldf)
expect_equal(columns(df), c("a", "b"))
expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))
expect_equal(count(df), 3)
ldf2 <- collect(df)
expect_equal(ldf$a, ldf2$a)
irisdf <- suppressWarnings(createDataFrame(sqlContext, iris))
iris_collected <- collect(irisdf)
expect_equivalent(iris_collected[, -5], iris[, -5])
expect_equal(iris_collected$Species, as.character(iris$Species))
mtcarsdf <- createDataFrame(sqlContext, mtcars)
expect_equivalent(collect(mtcarsdf), mtcars)
bytes <- as.raw(c(1, 2, 3))
df <- createDataFrame(sqlContext, list(list(bytes)))
expect_equal(collect(df)[[1]][[1]], bytes)
})
test_that("create DataFrame with different data types", {
l <- list(a = 1L, b = 2, c = TRUE, d = "ss", e = as.Date("2012-12-13"),
f = as.POSIXct("2015-03-15 12:13:14.056"))
df <- createDataFrame(sqlContext, list(l))
expect_equal(dtypes(df), list(c("a", "int"), c("b", "double"), c("c", "boolean"),
c("d", "string"), c("e", "date"), c("f", "timestamp")))
expect_equal(count(df), 1)
expect_equal(collect(df), data.frame(l, stringsAsFactors = FALSE))
})
test_that("create DataFrame with complex types", {
e <- new.env()
assign("n", 3L, envir = e)
s <- listToStruct(list(a = "aa", b = 3L))
l <- list(as.list(1:10), list("a", "b"), e, s)
df <- createDataFrame(sqlContext, list(l), c("a", "b", "c", "d"))
expect_equal(dtypes(df), list(c("a", "array<int>"),
c("b", "array<string>"),
c("c", "map<string,int>"),
c("d", "struct<a:string,b:int>")))
expect_equal(count(df), 1)
ldf <- collect(df)
expect_equal(names(ldf), c("a", "b", "c", "d"))
expect_equal(ldf[1, 1][[1]], l[[1]])
expect_equal(ldf[1, 2][[1]], l[[2]])
e <- ldf$c[[1]]
expect_equal(class(e), "environment")
expect_equal(ls(e), "n")
expect_equal(e$n, 3L)
s <- ldf$d[[1]]
expect_equal(class(s), "struct")
expect_equal(s$a, "aa")
expect_equal(s$b, 3L)
})
test_that("create DataFrame from a data.frame with complex types", {
ldf <- data.frame(row.names = 1:2)
ldf$a_list <- list(list(1, 2), list(3, 4))
ldf$an_envir <- c(as.environment(list(a = 1, b = 2)), as.environment(list(c = 3)))
sdf <- createDataFrame(sqlContext, ldf)
collected <- collect(sdf)
expect_identical(ldf[, 1, FALSE], collected[, 1, FALSE])
expect_equal(ldf$an_envir, collected$an_envir)
})
# For testing map type and struct type in DataFrame
mockLinesMapType <- c("{\"name\":\"Bob\",\"info\":{\"age\":16,\"height\":176.5}}",
"{\"name\":\"Alice\",\"info\":{\"age\":20,\"height\":164.3}}",
"{\"name\":\"David\",\"info\":{\"age\":60,\"height\":180}}")
mapTypeJsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
writeLines(mockLinesMapType, mapTypeJsonPath)
test_that("Collect DataFrame with complex types", {
# ArrayType
df <- read.json(sqlContext, complexTypeJsonPath)
ldf <- collect(df)
expect_equal(nrow(ldf), 3)
expect_equal(ncol(ldf), 3)
expect_equal(names(ldf), c("c1", "c2", "c3"))
expect_equal(ldf$c1, list(list(1, 2, 3), list(4, 5, 6), list(7, 8, 9)))
expect_equal(ldf$c2, list(list("a", "b", "c"), list("d", "e", "f"), list("g", "h", "i")))
expect_equal(ldf$c3, list(list(1.0, 2.0, 3.0), list(4.0, 5.0, 6.0), list(7.0, 8.0, 9.0)))
# MapType
schema <- structType(structField("name", "string"),
structField("info", "map<string,double>"))
df <- read.df(sqlContext, mapTypeJsonPath, "json", schema)
expect_equal(dtypes(df), list(c("name", "string"),
c("info", "map<string,double>")))
ldf <- collect(df)
expect_equal(nrow(ldf), 3)
expect_equal(ncol(ldf), 2)
expect_equal(names(ldf), c("name", "info"))
expect_equal(ldf$name, c("Bob", "Alice", "David"))
bob <- ldf$info[[1]]
expect_equal(class(bob), "environment")
expect_equal(bob$age, 16)
expect_equal(bob$height, 176.5)
# StructType
df <- read.json(sqlContext, mapTypeJsonPath)
expect_equal(dtypes(df), list(c("info", "struct<age:bigint,height:double>"),
c("name", "string")))
ldf <- collect(df)
expect_equal(nrow(ldf), 3)
expect_equal(ncol(ldf), 2)
expect_equal(names(ldf), c("info", "name"))
expect_equal(ldf$name, c("Bob", "Alice", "David"))
bob <- ldf$info[[1]]
expect_equal(class(bob), "struct")
expect_equal(bob$age, 16)
expect_equal(bob$height, 176.5)
})
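# Note how the same JSON file is read two ways above: with an explicit
# map<string,double> schema the "info" field collects as an environment,
# while schema inference turns it into a struct with typed fields.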
test_that("read/write json files", {
# Test read.df
df <- read.df(sqlContext, jsonPath, "json")
expect_is(df, "SparkDataFrame")
expect_equal(count(df), 3)
# Test read.df with a user defined schema
schema <- structType(structField("name", type = "string"),
structField("age", type = "double"))
df1 <- read.df(sqlContext, jsonPath, "json", schema)
expect_is(df1, "SparkDataFrame")
expect_equal(dtypes(df1), list(c("name", "string"), c("age", "double")))
# Test loadDF
df2 <- loadDF(sqlContext, jsonPath, "json", schema)
expect_is(df2, "SparkDataFrame")
expect_equal(dtypes(df2), list(c("name", "string"), c("age", "double")))
# Test read.json
df <- read.json(sqlContext, jsonPath)
expect_is(df, "SparkDataFrame")
expect_equal(count(df), 3)
# Test write.df
jsonPath2 <- tempfile(pattern = "jsonPath2", fileext = ".json")
write.df(df, jsonPath2, "json", mode = "overwrite")
# Test write.json
jsonPath3 <- tempfile(pattern = "jsonPath3", fileext = ".json")
write.json(df, jsonPath3)
# Test read.json()/jsonFile() works with multiple input paths
jsonDF1 <- read.json(sqlContext, c(jsonPath2, jsonPath3))
expect_is(jsonDF1, "SparkDataFrame")
expect_equal(count(jsonDF1), 6)
# Suppress warnings because jsonFile is deprecated
jsonDF2 <- suppressWarnings(jsonFile(sqlContext, c(jsonPath2, jsonPath3)))
expect_is(jsonDF2, "SparkDataFrame")
expect_equal(count(jsonDF2), 6)
unlink(jsonPath2)
unlink(jsonPath3)
})
test_that("jsonRDD() on a RDD with json string", {
rdd <- parallelize(sc, mockLines)
expect_equal(count(rdd), 3)
df <- suppressWarnings(jsonRDD(sqlContext, rdd))
expect_is(df, "SparkDataFrame")
expect_equal(count(df), 3)
rdd2 <- flatMap(rdd, function(x) c(x, x))
df <- suppressWarnings(jsonRDD(sqlContext, rdd2))
expect_is(df, "SparkDataFrame")
expect_equal(count(df), 6)
})
test_that("test cache, uncache and clearCache", {
df <- read.json(sqlContext, jsonPath)
registerTempTable(df, "table1")
cacheTable(sqlContext, "table1")
uncacheTable(sqlContext, "table1")
clearCache(sqlContext)
dropTempTable(sqlContext, "table1")
})
test_that("test tableNames and tables", {
df <- read.json(sqlContext, jsonPath)
registerTempTable(df, "table1")
expect_equal(length(tableNames(sqlContext)), 1)
df <- tables(sqlContext)
expect_equal(count(df), 1)
dropTempTable(sqlContext, "table1")
})
test_that("registerTempTable() results in a queryable table and sql() results in a new DataFrame", {
df <- read.json(sqlContext, jsonPath)
registerTempTable(df, "table1")
newdf <- sql(sqlContext, "SELECT * FROM table1 where name = 'Michael'")
expect_is(newdf, "SparkDataFrame")
expect_equal(count(newdf), 1)
dropTempTable(sqlContext, "table1")
})
test_that("insertInto() on a registered table", {
df <- read.df(sqlContext, jsonPath, "json")
write.df(df, parquetPath, "parquet", "overwrite")
dfParquet <- read.df(sqlContext, parquetPath, "parquet")
lines <- c("{\"name\":\"Bob\", \"age\":24}",
"{\"name\":\"James\", \"age\":35}")
jsonPath2 <- tempfile(pattern = "jsonPath2", fileext = ".tmp")
parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet")
writeLines(lines, jsonPath2)
df2 <- read.df(sqlContext, jsonPath2, "json")
write.df(df2, parquetPath2, "parquet", "overwrite")
dfParquet2 <- read.df(sqlContext, parquetPath2, "parquet")
registerTempTable(dfParquet, "table1")
insertInto(dfParquet2, "table1")
expect_equal(count(sql(sqlContext, "select * from table1")), 5)
expect_equal(first(sql(sqlContext, "select * from table1 order by age"))$name, "Michael")
dropTempTable(sqlContext, "table1")
registerTempTable(dfParquet, "table1")
insertInto(dfParquet2, "table1", overwrite = TRUE)
expect_equal(count(sql(sqlContext, "select * from table1")), 2)
expect_equal(first(sql(sqlContext, "select * from table1 order by age"))$name, "Bob")
dropTempTable(sqlContext, "table1")
unlink(jsonPath2)
unlink(parquetPath2)
})
test_that("tableToDF() returns a new DataFrame", {
df <- read.json(sqlContext, jsonPath)
registerTempTable(df, "table1")
tabledf <- tableToDF(sqlContext, "table1")
expect_is(tabledf, "SparkDataFrame")
expect_equal(count(tabledf), 3)
tabledf2 <- tableToDF(sqlContext, "table1")
expect_equal(count(tabledf2), 3)
dropTempTable(sqlContext, "table1")
})
test_that("toRDD() returns an RRDD", {
df <- read.json(sqlContext, jsonPath)
testRDD <- toRDD(df)
expect_is(testRDD, "RDD")
expect_equal(count(testRDD), 3)
})
test_that("union on two RDDs created from DataFrames returns an RRDD", {
df <- read.json(sqlContext, jsonPath)
RDD1 <- toRDD(df)
RDD2 <- toRDD(df)
unioned <- unionRDD(RDD1, RDD2)
expect_is(unioned, "RDD")
expect_equal(getSerializedMode(unioned), "byte")
expect_equal(collect(unioned)[[2]]$name, "Andy")
})
test_that("union on mixed serialization types correctly returns a byte RRDD", {
# Byte RDD
nums <- 1:10
rdd <- parallelize(sc, nums, 2L)
# String RDD
textLines <- c("Michael",
"Andy, 30",
"Justin, 19")
textPath <- tempfile(pattern = "sparkr-textLines", fileext = ".tmp")
writeLines(textLines, textPath)
textRDD <- textFile(sc, textPath)
df <- read.json(sqlContext, jsonPath)
dfRDD <- toRDD(df)
unionByte <- unionRDD(rdd, dfRDD)
expect_is(unionByte, "RDD")
expect_equal(getSerializedMode(unionByte), "byte")
expect_equal(collect(unionByte)[[1]], 1)
expect_equal(collect(unionByte)[[12]]$name, "Andy")
unionString <- unionRDD(textRDD, dfRDD)
expect_is(unionString, "RDD")
expect_equal(getSerializedMode(unionString), "byte")
expect_equal(collect(unionString)[[1]], "Michael")
expect_equal(collect(unionString)[[5]]$name, "Andy")
})
test_that("objectFile() works with row serialization", {
objectPath <- tempfile(pattern = "spark-test", fileext = ".tmp")
df <- read.json(sqlContext, jsonPath)
dfRDD <- toRDD(df)
saveAsObjectFile(coalesce(dfRDD, 1L), objectPath)
objectIn <- objectFile(sc, objectPath)
expect_is(objectIn, "RDD")
expect_equal(getSerializedMode(objectIn), "byte")
expect_equal(collect(objectIn)[[2]]$age, 30)
})
test_that("lapply() on a DataFrame returns an RDD with the correct columns", {
df <- read.json(sqlContext, jsonPath)
testRDD <- lapply(df, function(row) {
row$newCol <- row$age + 5
row
})
expect_is(testRDD, "RDD")
collected <- collect(testRDD)
expect_equal(collected[[1]]$name, "Michael")
expect_equal(collected[[2]]$newCol, 35)
})
test_that("collect() returns a data.frame", {
df <- read.json(sqlContext, jsonPath)
rdf <- collect(df)
expect_true(is.data.frame(rdf))
expect_equal(names(rdf)[1], "age")
expect_equal(nrow(rdf), 3)
expect_equal(ncol(rdf), 2)
# collect() returns data correctly from a DataFrame with 0 row
df0 <- limit(df, 0)
rdf <- collect(df0)
expect_true(is.data.frame(rdf))
expect_equal(names(rdf)[1], "age")
expect_equal(nrow(rdf), 0)
expect_equal(ncol(rdf), 2)
# collect() correctly handles multiple columns with same name
df <- createDataFrame(sqlContext, list(list(1, 2)), schema = c("name", "name"))
ldf <- collect(df)
expect_equal(names(ldf), c("name", "name"))
})
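# collect() materializes the full (possibly empty) result locally as a
# data.frame, preserving column names even when they are duplicated.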
test_that("limit() returns DataFrame with the correct number of rows", {
df <- read.json(sqlContext, jsonPath)
dfLimited <- limit(df, 2)
expect_is(dfLimited, "SparkDataFrame")
expect_equal(count(dfLimited), 2)
})
test_that("collect() and take() on a DataFrame return the same number of rows and columns", {
df <- read.json(sqlContext, jsonPath)
expect_equal(nrow(collect(df)), nrow(take(df, 10)))
expect_equal(ncol(collect(df)), ncol(take(df, 10)))
})
test_that("collect() support Unicode characters", {
lines <- c("{\"name\":\"안녕하세요\"}",
"{\"name\":\"您好\", \"age\":30}",
"{\"name\":\"こんにちは\", \"age\":19}",
"{\"name\":\"Xin chào\"}")
jsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
writeLines(lines, jsonPath)
df <- read.df(sqlContext, jsonPath, "json")
rdf <- collect(df)
expect_true(is.data.frame(rdf))
expect_equal(rdf$name[1], markUtf8("안녕하세요"))
expect_equal(rdf$name[2], markUtf8("您好"))
expect_equal(rdf$name[3], markUtf8("こんにちは"))
expect_equal(rdf$name[4], markUtf8("Xin chào"))
df1 <- createDataFrame(sqlContext, rdf)
expect_equal(collect(where(df1, df1$name == markUtf8("您好")))$name, markUtf8("您好"))
})
test_that("multiple pipeline transformations result in an RDD with the correct values", {
df <- read.json(sqlContext, jsonPath)
first <- lapply(df, function(row) {
row$age <- row$age + 5
row
})
second <- lapply(first, function(row) {
row$testCol <- !is.na(row$age) && row$age == 35
row
})
expect_is(second, "RDD")
expect_equal(count(second), 3)
expect_equal(collect(second)[[2]]$age, 35)
expect_true(collect(second)[[2]]$testCol)
expect_false(collect(second)[[3]]$testCol)
})
test_that("cache(), persist(), and unpersist() on a DataFrame", {
df <- read.json(sqlContext, jsonPath)
expect_false(df@env$isCached)
cache(df)
expect_true(df@env$isCached)
unpersist(df)
expect_false(df@env$isCached)
persist(df, "MEMORY_AND_DISK")
expect_true(df@env$isCached)
unpersist(df)
expect_false(df@env$isCached)
# make sure the data is collectable
expect_true(is.data.frame(collect(df)))
})
test_that("schema(), dtypes(), columns(), names() return the correct values/format", {
df <- read.json(sqlContext, jsonPath)
testSchema <- schema(df)
expect_equal(length(testSchema$fields()), 2)
expect_equal(testSchema$fields()[[1]]$dataType.toString(), "LongType")
expect_equal(testSchema$fields()[[2]]$dataType.simpleString(), "string")
expect_equal(testSchema$fields()[[1]]$name(), "age")
testTypes <- dtypes(df)
expect_equal(length(testTypes[[1]]), 2)
expect_equal(testTypes[[1]][1], "age")
testCols <- columns(df)
expect_equal(length(testCols), 2)
expect_equal(testCols[2], "name")
testNames <- names(df)
expect_equal(length(testNames), 2)
expect_equal(testNames[2], "name")
})
test_that("names() colnames() set the column names", {
df <- read.json(sqlContext, jsonPath)
names(df) <- c("col1", "col2")
expect_equal(colnames(df)[2], "col2")
colnames(df) <- c("col3", "col4")
expect_equal(names(df)[1], "col3")
expect_error(colnames(df) <- c("sepal.length", "sepal_width"),
"Colum names cannot contain the '.' symbol.")
expect_error(colnames(df) <- c(1, 2), "Invalid column names.")
expect_error(colnames(df) <- c("a"),
"Column names must have the same length as the number of columns in the dataset.")
expect_error(colnames(df) <- c("1", NA), "Column names cannot be NA.")
# Note: if this test breaks, remove the check for the "." character in the colnames<- method
irisDF <- suppressWarnings(createDataFrame(sqlContext, iris))
expect_equal(names(irisDF)[1], "Sepal_Length")
# Test base::colnames base::names
m2 <- cbind(1, 1:4)
expect_equal(colnames(m2, do.NULL = FALSE), c("col1", "col2"))
colnames(m2) <- c("x", "Y")
expect_equal(colnames(m2), c("x", "Y"))
z <- list(a = 1, b = "c", c = 1:3)
expect_equal(names(z)[3], "c")
names(z)[3] <- "c2"
expect_equal(names(z)[3], "c2")
})
test_that("head() and first() return the correct data", {
df <- read.json(sqlContext, jsonPath)
testHead <- head(df)
expect_equal(nrow(testHead), 3)
expect_equal(ncol(testHead), 2)
testHead2 <- head(df, 2)
expect_equal(nrow(testHead2), 2)
expect_equal(ncol(testHead2), 2)
testFirst <- first(df)
expect_equal(nrow(testFirst), 1)
# head() and first() return the correct data on
# a DataFrame with 0 row
df0 <- limit(df, 0)
testHead <- head(df0)
expect_equal(nrow(testHead), 0)
expect_equal(ncol(testHead), 2)
testFirst <- first(df0)
expect_equal(nrow(testFirst), 0)
expect_equal(ncol(testFirst), 2)
})
test_that("distinct(), unique() and dropDuplicates() on DataFrames", {
lines <- c("{\"name\":\"Michael\"}",
"{\"name\":\"Andy\", \"age\":30}",
"{\"name\":\"Justin\", \"age\":19}",
"{\"name\":\"Justin\", \"age\":19}")
jsonPathWithDup <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
writeLines(lines, jsonPathWithDup)
df <- read.json(sqlContext, jsonPathWithDup)
uniques <- distinct(df)
expect_is(uniques, "SparkDataFrame")
expect_equal(count(uniques), 3)
uniques2 <- unique(df)
expect_is(uniques2, "SparkDataFrame")
expect_equal(count(uniques2), 3)
# Test dropDuplicates()
df <- createDataFrame(
sqlContext,
list(
list(2, 1, 2), list(1, 1, 1),
list(1, 2, 1), list(2, 1, 2),
list(2, 2, 2), list(2, 2, 1),
list(2, 1, 1), list(1, 1, 2),
list(1, 2, 2), list(1, 2, 1)),
schema = c("key", "value1", "value2"))
result <- collect(dropDuplicates(df))
expected <- rbind.data.frame(
c(1, 1, 1), c(1, 1, 2), c(1, 2, 1),
c(1, 2, 2), c(2, 1, 1), c(2, 1, 2),
c(2, 2, 1), c(2, 2, 2))
names(expected) <- c("key", "value1", "value2")
expect_equivalent(
result[order(result$key, result$value1, result$value2), ],
expected)
result <- collect(dropDuplicates(df, c("key", "value1")))
expected <- rbind.data.frame(
c(1, 1, 1), c(1, 2, 1), c(2, 1, 2), c(2, 2, 2))
names(expected) <- c("key", "value1", "value2")
expect_equivalent(
result[order(result$key, result$value1, result$value2), ],
expected)
result <- collect(dropDuplicates(df, "key"))
expected <- rbind.data.frame(
c(1, 1, 1), c(2, 1, 2))
names(expected) <- c("key", "value1", "value2")
expect_equivalent(
result[order(result$key, result$value1, result$value2), ],
expected)
})
test_that("sample on a DataFrame", {
df <- read.json(sqlContext, jsonPath)
sampled <- sample(df, FALSE, 1.0)
expect_equal(nrow(collect(sampled)), count(df))
expect_is(sampled, "SparkDataFrame")
sampled2 <- sample(df, FALSE, 0.1, 0) # set seed for predictable result
expect_true(count(sampled2) < 3)
count1 <- count(sample(df, FALSE, 0.1, 0))
count2 <- count(sample(df, FALSE, 0.1, 0))
expect_equal(count1, count2)
# Also test sample_frac
sampled3 <- sample_frac(df, FALSE, 0.1, 0) # set seed for predictable result
expect_true(count(sampled3) < 3)
# nolint start
# Test base::sample is working
#expect_equal(length(sample(1:12)), 12)
# nolint end
})
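# Note: the fraction argument of sample()/sample_frac() is a fraction of
# rows, not an exact count, so sampled sizes are approximate; supplying a
# seed (as above) makes the result reproducible across calls.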
test_that("select operators", {
df <- select(read.json(sqlContext, jsonPath), "name", "age")
expect_is(df$name, "Column")
expect_is(df[[2]], "Column")
expect_is(df[["age"]], "Column")
expect_is(df[, 1], "SparkDataFrame")
expect_equal(columns(df[, 1]), c("name"))
expect_equal(columns(df[, "age"]), c("age"))
df2 <- df[, c("age", "name")]
expect_is(df2, "SparkDataFrame")
expect_equal(columns(df2), c("age", "name"))
df$age2 <- df$age
expect_equal(columns(df), c("name", "age", "age2"))
expect_equal(count(where(df, df$age2 == df$age)), 2)
df$age2 <- df$age * 2
expect_equal(columns(df), c("name", "age", "age2"))
expect_equal(count(where(df, df$age2 == df$age * 2)), 2)
})
test_that("select with column", {
df <- read.json(sqlContext, jsonPath)
df1 <- select(df, "name")
expect_equal(columns(df1), c("name"))
expect_equal(count(df1), 3)
df2 <- select(df, df$age)
expect_equal(columns(df2), c("age"))
expect_equal(count(df2), 3)
df3 <- select(df, lit("x"))
expect_equal(columns(df3), c("x"))
expect_equal(count(df3), 3)
expect_equal(collect(select(df3, "x"))[[1, 1]], "x")
df4 <- select(df, c("name", "age"))
expect_equal(columns(df4), c("name", "age"))
expect_equal(count(df4), 3)
expect_error(select(df, c("name", "age"), "name"),
"To select multiple columns, use a character vector or list for col")
})
test_that("drop column", {
df <- select(read.json(sqlContext, jsonPath), "name", "age")
df1 <- drop(df, "name")
expect_equal(columns(df1), c("age"))
df$age2 <- df$age
df1 <- drop(df, c("name", "age"))
expect_equal(columns(df1), c("age2"))
df1 <- drop(df, df$age)
expect_equal(columns(df1), c("name", "age2"))
df$age2 <- NULL
expect_equal(columns(df), c("name", "age"))
df$age3 <- NULL
expect_equal(columns(df), c("name", "age"))
# Test to make sure base::drop is not masked
expect_equal(drop(1:3 %*% 2:4), 20)
})
test_that("subsetting", {
# read.json returns columns in random order
df <- select(read.json(sqlContext, jsonPath), "name", "age")
filtered <- df[df$age > 20, ]
expect_equal(count(filtered), 1)
expect_equal(columns(filtered), c("name", "age"))
expect_equal(collect(filtered)$name, "Andy")
df2 <- df[df$age == 19, 1]
expect_is(df2, "SparkDataFrame")
expect_equal(count(df2), 1)
expect_equal(columns(df2), c("name"))
expect_equal(collect(df2)$name, "Justin")
df3 <- df[df$age > 20, 2]
expect_equal(count(df3), 1)
expect_equal(columns(df3), c("age"))
df4 <- df[df$age %in% c(19, 30), 1:2]
expect_equal(count(df4), 2)
expect_equal(columns(df4), c("name", "age"))
df5 <- df[df$age %in% c(19), c(1, 2)]
expect_equal(count(df5), 1)
expect_equal(columns(df5), c("name", "age"))
df6 <- subset(df, df$age %in% c(30), c(1, 2))
expect_equal(count(df6), 1)
expect_equal(columns(df6), c("name", "age"))
df7 <- subset(df, select = "name")
expect_equal(count(df7), 3)
expect_equal(columns(df7), c("name"))
# Test base::subset is working
expect_equal(nrow(subset(airquality, Temp > 80, select = c(Ozone, Temp))), 68)
})
test_that("selectExpr() on a DataFrame", {
df <- read.json(sqlContext, jsonPath)
selected <- selectExpr(df, "age * 2")
expect_equal(names(selected), "(age * 2)")
expect_equal(collect(selected), collect(select(df, df$age * 2L)))
selected2 <- selectExpr(df, "name as newName", "abs(age) as age")
expect_equal(names(selected2), c("newName", "age"))
expect_equal(count(selected2), 3)
})
test_that("expr() on a DataFrame", {
df <- read.json(sqlContext, jsonPath)
expect_equal(collect(select(df, expr("abs(-123)")))[1, 1], 123)
})
test_that("column calculation", {
df <- read.json(sqlContext, jsonPath)
d <- collect(select(df, alias(df$age + 1, "age2")))
expect_equal(names(d), c("age2"))
df2 <- select(df, lower(df$name), abs(df$age))
expect_is(df2, "SparkDataFrame")
expect_equal(count(df2), 3)
})
test_that("test HiveContext", {
ssc <- callJMethod(sc, "sc")
hiveCtx <- tryCatch({
newJObject("org.apache.spark.sql.hive.test.TestHiveContext", ssc)
},
error = function(err) {
skip("Hive is not build with SparkSQL, skipped")
})
df <- createExternalTable(hiveCtx, "json", jsonPath, "json")
expect_is(df, "SparkDataFrame")
expect_equal(count(df), 3)
df2 <- sql(hiveCtx, "select * from json")
expect_is(df2, "SparkDataFrame")
expect_equal(count(df2), 3)
jsonPath2 <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
invisible(saveAsTable(df, "json2", "json", "append", path = jsonPath2))
df3 <- sql(hiveCtx, "select * from json2")
expect_is(df3, "SparkDataFrame")
expect_equal(count(df3), 3)
unlink(jsonPath2)
hivetestDataPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
invisible(saveAsTable(df, "hivetestbl", path = hivetestDataPath))
df4 <- sql(hiveCtx, "select * from hivetestbl")
expect_is(df4, "SparkDataFrame")
expect_equal(count(df4), 3)
unlink(hivetestDataPath)
parquetDataPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
invisible(saveAsTable(df, "parquetest", "parquet", mode = "overwrite", path = parquetDataPath))
df5 <- sql(hiveCtx, "select * from parquetest")
expect_is(df5, "SparkDataFrame")
expect_equal(count(df5), 3)
unlink(parquetDataPath)
})
test_that("column operators", {
c <- column("a")
c2 <- (- c + 1 - 2) * 3 / 4.0
c3 <- (c + c2 - c2) * c2 %% c2
c4 <- (c > c2) & (c2 <= c3) | (c == c2) & (c2 != c3)
c5 <- c2 ^ c3 ^ c4
})
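# None of the operators above evaluates anything: arithmetic, comparison and
# logical operators on Column objects only build expression trees, evaluated
# later inside select()/filter(). A minimal illustrative sketch (colExpr is a
# throwaway name):
colExpr <- (column("a") + 1) * 2
stopifnot(class(colExpr) == "Column")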
test_that("column functions", {
c <- column("a")
c1 <- abs(c) + acos(c) + approxCountDistinct(c) + ascii(c) + asin(c) + atan(c)
c2 <- avg(c) + base64(c) + bin(c) + bitwiseNOT(c) + cbrt(c) + ceil(c) + cos(c)
c3 <- cosh(c) + count(c) + crc32(c) + hash(c) + exp(c)
c4 <- explode(c) + expm1(c) + factorial(c) + first(c) + floor(c) + hex(c)
c5 <- hour(c) + initcap(c) + last(c) + last_day(c) + length(c)
c6 <- log(c) + log10(c) + log1p(c) + log2(c) + lower(c) + ltrim(c) + max(c) + md5(c)
c7 <- mean(c) + min(c) + month(c) + negate(c) + quarter(c)
c8 <- reverse(c) + rint(c) + round(c) + rtrim(c) + sha1(c)
c9 <- signum(c) + sin(c) + sinh(c) + size(c) + stddev(c) + soundex(c) + sqrt(c) + sum(c)
c10 <- sumDistinct(c) + tan(c) + tanh(c) + toDegrees(c) + toRadians(c)
c11 <- to_date(c) + trim(c) + unbase64(c) + unhex(c) + upper(c)
c12 <- variance(c)
c13 <- lead("col", 1) + lead(c, 1) + lag("col", 1) + lag(c, 1)
c14 <- cume_dist() + ntile(1) + corr(c, c1)
c15 <- dense_rank() + percent_rank() + rank() + row_number()
c16 <- is.nan(c) + isnan(c) + isNaN(c)
c17 <- cov(c, c1) + cov("c", "c1") + covar_samp(c, c1) + covar_samp("c", "c1")
c18 <- covar_pop(c, c1) + covar_pop("c", "c1")
# Test if base::is.nan() is exposed
expect_equal(is.nan(c("a", "b")), c(FALSE, FALSE))
# Test if base::rank() is exposed
expect_equal(class(rank())[[1]], "Column")
expect_equal(rank(1:3), as.numeric(c(1:3)))
df <- read.json(sqlContext, jsonPath)
df2 <- select(df, between(df$age, c(20, 30)), between(df$age, c(10, 20)))
expect_equal(collect(df2)[[2, 1]], TRUE)
expect_equal(collect(df2)[[2, 2]], FALSE)
expect_equal(collect(df2)[[3, 1]], FALSE)
expect_equal(collect(df2)[[3, 2]], TRUE)
df3 <- select(df, between(df$name, c("Apache", "Spark")))
expect_equal(collect(df3)[[1, 1]], TRUE)
expect_equal(collect(df3)[[2, 1]], FALSE)
expect_equal(collect(df3)[[3, 1]], TRUE)
df4 <- select(df, countDistinct(df$age, df$name))
expect_equal(collect(df4)[[1, 1]], 2)
expect_equal(collect(select(df, sum(df$age)))[1, 1], 49)
expect_true(abs(collect(select(df, stddev(df$age)))[1, 1] - 7.778175) < 1e-6)
expect_equal(collect(select(df, var_pop(df$age)))[1, 1], 30.25)
df5 <- createDataFrame(sqlContext, list(list(a = "010101")))
expect_equal(collect(select(df5, conv(df5$a, 2, 16)))[1, 1], "15")
# Test array_contains() and sort_array()
df <- createDataFrame(sqlContext, list(list(list(1L, 2L, 3L)), list(list(6L, 5L, 4L))))
result <- collect(select(df, array_contains(df[[1]], 1L)))[[1]]
expect_equal(result, c(TRUE, FALSE))
result <- collect(select(df, sort_array(df[[1]], FALSE)))[[1]]
expect_equal(result, list(list(3L, 2L, 1L), list(6L, 5L, 4L)))
result <- collect(select(df, sort_array(df[[1]])))[[1]]
expect_equal(result, list(list(1L, 2L, 3L), list(4L, 5L, 6L)))
# Test that stats::lag is working
expect_equal(length(lag(ldeaths, 12)), 72)
# Test struct()
df <- createDataFrame(sqlContext,
list(list(1L, 2L, 3L), list(4L, 5L, 6L)),
schema = c("a", "b", "c"))
result <- collect(select(df, struct("a", "c")))
expected <- data.frame(row.names = 1:2)
expected$"struct(a, c)" <- list(listToStruct(list(a = 1L, c = 3L)),
listToStruct(list(a = 4L, c = 6L)))
expect_equal(result, expected)
result <- collect(select(df, struct(df$a, df$b)))
expected <- data.frame(row.names = 1:2)
expected$"struct(a, b)" <- list(listToStruct(list(a = 1L, b = 2L)),
listToStruct(list(a = 4L, b = 5L)))
expect_equal(result, expected)
# Test encode(), decode()
bytes <- as.raw(c(0xe5, 0xa4, 0xa7, 0xe5, 0x8d, 0x83, 0xe4, 0xb8, 0x96, 0xe7, 0x95, 0x8c))
df <- createDataFrame(sqlContext,
list(list(markUtf8("大千世界"), "utf-8", bytes)),
schema = c("a", "b", "c"))
result <- collect(select(df, encode(df$a, "utf-8"), decode(df$c, "utf-8")))
expect_equal(result[[1]][[1]], bytes)
expect_equal(result[[2]], markUtf8("大千世界"))
# Test first(), last()
df <- read.json(sqlContext, jsonPath)
expect_equal(collect(select(df, first(df$age)))[[1]], NA)
expect_equal(collect(select(df, first(df$age, TRUE)))[[1]], 30)
expect_equal(collect(select(df, first("age")))[[1]], NA)
expect_equal(collect(select(df, first("age", TRUE)))[[1]], 30)
expect_equal(collect(select(df, last(df$age)))[[1]], 19)
expect_equal(collect(select(df, last(df$age, TRUE)))[[1]], 19)
expect_equal(collect(select(df, last("age")))[[1]], 19)
expect_equal(collect(select(df, last("age", TRUE)))[[1]], 19)
# Test bround()
df <- createDataFrame(sqlContext, data.frame(x = c(2.5, 3.5)))
expect_equal(collect(select(df, bround(df$x, 0)))[[1]][1], 2)
expect_equal(collect(select(df, bround(df$x, 0)))[[1]][2], 4)
})
test_that("column binary mathfunctions", {
lines <- c("{\"a\":1, \"b\":5}",
"{\"a\":2, \"b\":6}",
"{\"a\":3, \"b\":7}",
"{\"a\":4, \"b\":8}")
jsonPathWithDup <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
writeLines(lines, jsonPathWithDup)
df <- read.json(sqlContext, jsonPathWithDup)
expect_equal(collect(select(df, atan2(df$a, df$b)))[1, "ATAN2(a, b)"], atan2(1, 5))
expect_equal(collect(select(df, atan2(df$a, df$b)))[2, "ATAN2(a, b)"], atan2(2, 6))
expect_equal(collect(select(df, atan2(df$a, df$b)))[3, "ATAN2(a, b)"], atan2(3, 7))
expect_equal(collect(select(df, atan2(df$a, df$b)))[4, "ATAN2(a, b)"], atan2(4, 8))
## nolint start
expect_equal(collect(select(df, hypot(df$a, df$b)))[1, "HYPOT(a, b)"], sqrt(1^2 + 5^2))
expect_equal(collect(select(df, hypot(df$a, df$b)))[2, "HYPOT(a, b)"], sqrt(2^2 + 6^2))
expect_equal(collect(select(df, hypot(df$a, df$b)))[3, "HYPOT(a, b)"], sqrt(3^2 + 7^2))
expect_equal(collect(select(df, hypot(df$a, df$b)))[4, "HYPOT(a, b)"], sqrt(4^2 + 8^2))
## nolint end
expect_equal(collect(select(df, shiftLeft(df$b, 1)))[4, 1], 16)
expect_equal(collect(select(df, shiftRight(df$b, 1)))[4, 1], 4)
expect_equal(collect(select(df, shiftRightUnsigned(df$b, 1)))[4, 1], 4)
expect_equal(class(collect(select(df, rand()))[2, 1]), "numeric")
expect_equal(collect(select(df, rand(1)))[1, 1], 0.134, tolerance = 0.01)
expect_equal(class(collect(select(df, randn()))[2, 1]), "numeric")
expect_equal(collect(select(df, randn(1)))[1, 1], -1.03, tolerance = 0.01)
})
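# Note: rand(seed) and randn(seed) are deterministic for a fixed seed (and
# partitioning), which is why the expected values above can be pinned down
# to a tolerance.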
test_that("string operators", {
df <- read.json(sqlContext, jsonPath)
expect_equal(count(where(df, like(df$name, "A%"))), 1)
expect_equal(count(where(df, startsWith(df$name, "A"))), 1)
expect_equal(first(select(df, substr(df$name, 1, 2)))[[1]], "Mi")
expect_equal(collect(select(df, cast(df$age, "string")))[[2, 1]], "30")
expect_equal(collect(select(df, concat(df$name, lit(":"), df$age)))[[2, 1]], "Andy:30")
expect_equal(collect(select(df, concat_ws(":", df$name)))[[2, 1]], "Andy")
expect_equal(collect(select(df, concat_ws(":", df$name, df$age)))[[2, 1]], "Andy:30")
expect_equal(collect(select(df, instr(df$name, "i")))[, 1], c(2, 0, 5))
expect_equal(collect(select(df, format_number(df$age, 2)))[2, 1], "30.00")
expect_equal(collect(select(df, sha1(df$name)))[2, 1],
"ab5a000e88b5d9d0fa2575f5c6263eb93452405d")
expect_equal(collect(select(df, sha2(df$name, 256)))[2, 1],
"80f2aed3c618c423ddf05a2891229fba44942d907173152442cf6591441ed6dc")
expect_equal(collect(select(df, format_string("Name:%s", df$name)))[2, 1], "Name:Andy")
expect_equal(collect(select(df, format_string("%s, %d", df$name, df$age)))[2, 1], "Andy, 30")
expect_equal(collect(select(df, regexp_extract(df$name, "(n.y)", 1)))[2, 1], "ndy")
expect_equal(collect(select(df, regexp_replace(df$name, "(n.y)", "ydn")))[2, 1], "Aydn")
l2 <- list(list(a = "aaads"))
df2 <- createDataFrame(sqlContext, l2)
expect_equal(collect(select(df2, locate("aa", df2$a)))[1, 1], 1)
expect_equal(collect(select(df2, locate("aa", df2$a, 1)))[1, 1], 2)
expect_equal(collect(select(df2, lpad(df2$a, 8, "#")))[1, 1], "###aaads") # nolint
expect_equal(collect(select(df2, rpad(df2$a, 8, "#")))[1, 1], "aaads###") # nolint
l3 <- list(list(a = "a.b.c.d"))
df3 <- createDataFrame(sqlContext, l3)
expect_equal(collect(select(df3, substring_index(df3$a, ".", 2)))[1, 1], "a.b")
expect_equal(collect(select(df3, substring_index(df3$a, ".", -3)))[1, 1], "b.c.d")
expect_equal(collect(select(df3, translate(df3$a, "bc", "12")))[1, 1], "a.1.2.d")
})
test_that("date functions on a DataFrame", {
.originalTimeZone <- Sys.getenv("TZ")
Sys.setenv(TZ = "UTC")
l <- list(list(a = 1L, b = as.Date("2012-12-13")),
list(a = 2L, b = as.Date("2013-12-14")),
list(a = 3L, b = as.Date("2014-12-15")))
df <- createDataFrame(sqlContext, l)
expect_equal(collect(select(df, dayofmonth(df$b)))[, 1], c(13, 14, 15))
expect_equal(collect(select(df, dayofyear(df$b)))[, 1], c(348, 348, 349))
expect_equal(collect(select(df, weekofyear(df$b)))[, 1], c(50, 50, 51))
expect_equal(collect(select(df, year(df$b)))[, 1], c(2012, 2013, 2014))
expect_equal(collect(select(df, month(df$b)))[, 1], c(12, 12, 12))
expect_equal(collect(select(df, last_day(df$b)))[, 1],
c(as.Date("2012-12-31"), as.Date("2013-12-31"), as.Date("2014-12-31")))
expect_equal(collect(select(df, next_day(df$b, "MONDAY")))[, 1],
c(as.Date("2012-12-17"), as.Date("2013-12-16"), as.Date("2014-12-22")))
expect_equal(collect(select(df, date_format(df$b, "y")))[, 1], c("2012", "2013", "2014"))
expect_equal(collect(select(df, add_months(df$b, 3)))[, 1],
c(as.Date("2013-03-13"), as.Date("2014-03-14"), as.Date("2015-03-15")))
expect_equal(collect(select(df, date_add(df$b, 1)))[, 1],
c(as.Date("2012-12-14"), as.Date("2013-12-15"), as.Date("2014-12-16")))
expect_equal(collect(select(df, date_sub(df$b, 1)))[, 1],
c(as.Date("2012-12-12"), as.Date("2013-12-13"), as.Date("2014-12-14")))
l2 <- list(list(a = 1L, b = as.POSIXlt("2012-12-13 12:34:00", tz = "UTC")),
list(a = 2L, b = as.POSIXlt("2014-12-15 01:24:34", tz = "UTC")))
df2 <- createDataFrame(sqlContext, l2)
expect_equal(collect(select(df2, minute(df2$b)))[, 1], c(34, 24))
expect_equal(collect(select(df2, second(df2$b)))[, 1], c(0, 34))
expect_equal(collect(select(df2, from_utc_timestamp(df2$b, "JST")))[, 1],
c(as.POSIXlt("2012-12-13 21:34:00 UTC"), as.POSIXlt("2014-12-15 10:24:34 UTC")))
expect_equal(collect(select(df2, to_utc_timestamp(df2$b, "JST")))[, 1],
c(as.POSIXlt("2012-12-13 03:34:00 UTC"), as.POSIXlt("2014-12-14 16:24:34 UTC")))
expect_more_than(collect(select(df2, unix_timestamp()))[1, 1], 0)
expect_more_than(collect(select(df2, unix_timestamp(df2$b)))[1, 1], 0)
expect_more_than(collect(select(df2, unix_timestamp(lit("2015-01-01"), "yyyy-MM-dd")))[1, 1], 0)
l3 <- list(list(a = 1000), list(a = -1000))
df3 <- createDataFrame(sqlContext, l3)
result31 <- collect(select(df3, from_unixtime(df3$a)))
expect_equal(grep("\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}", result31[, 1], perl = TRUE),
c(1, 2))
result32 <- collect(select(df3, from_unixtime(df3$a, "yyyy")))
expect_equal(grep("\\d{4}", result32[, 1]), c(1, 2))
Sys.setenv(TZ = .originalTimeZone)
})
test_that("greatest() and least() on a DataFrame", {
l <- list(list(a = 1, b = 2), list(a = 3, b = 4))
df <- createDataFrame(sqlContext, l)
expect_equal(collect(select(df, greatest(df$a, df$b)))[, 1], c(2, 4))
expect_equal(collect(select(df, least(df$a, df$b)))[, 1], c(1, 3))
})
test_that("time windowing (window()) with all inputs", {
df <- createDataFrame(sqlContext, data.frame(t = c("2016-03-11 09:00:07"), v = c(1)))
df$window <- window(df$t, "5 seconds", "5 seconds", "0 seconds")
local <- collect(df)$v
# Not checking time windows because of possible time zone issues. Just checking that the function
# works
expect_equal(local, c(1))
})
test_that("time windowing (window()) with slide duration", {
df <- createDataFrame(sqlContext, data.frame(t = c("2016-03-11 09:00:07"), v = c(1)))
df$window <- window(df$t, "5 seconds", "2 seconds")
local <- collect(df)$v
# Not checking time windows because of possible time zone issues. Just checking that the function
# works
expect_equal(local, c(1, 1))
})
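# A 5-second window sliding every 2 seconds places one timestamp in multiple
# overlapping windows (two here), hence the two rows collected above.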
test_that("time windowing (window()) with start time", {
df <- createDataFrame(sqlContext, data.frame(t = c("2016-03-11 09:00:07"), v = c(1)))
df$window <- window(df$t, "5 seconds", startTime = "2 seconds")
local <- collect(df)$v
# Not checking time windows because of possible time zone issues. Just checking that the function
# works
expect_equal(local, c(1))
})
test_that("time windowing (window()) with just window duration", {
df <- createDataFrame(sqlContext, data.frame(t = c("2016-03-11 09:00:07"), v = c(1)))
df$window <- window(df$t, "5 seconds")
local <- collect(df)$v
# Not checking time windows because of possible time zone issues. Just checking that the function
# works
expect_equal(local, c(1))
})
test_that("when(), otherwise() and ifelse() on a DataFrame", {
l <- list(list(a = 1, b = 2), list(a = 3, b = 4))
df <- createDataFrame(sqlContext, l)
expect_equal(collect(select(df, when(df$a > 1 & df$b > 2, 1)))[, 1], c(NA, 1))
expect_equal(collect(select(df, otherwise(when(df$a > 1, 1), 0)))[, 1], c(0, 1))
expect_equal(collect(select(df, ifelse(df$a > 1 & df$b > 2, 0, 1)))[, 1], c(1, 0))
})
test_that("when(), otherwise() and ifelse() with column on a DataFrame", {
l <- list(list(a = 1, b = 2), list(a = 3, b = 4))
df <- createDataFrame(sqlContext, l)
expect_equal(collect(select(df, when(df$a > 1 & df$b > 2, lit(1))))[, 1], c(NA, 1))
expect_equal(collect(select(df, otherwise(when(df$a > 1, lit(1)), lit(0))))[, 1], c(0, 1))
expect_equal(collect(select(df, ifelse(df$a > 1 & df$b > 2, lit(0), lit(1))))[, 1], c(1, 0))
})
test_that("group by, agg functions", {
df <- read.json(sqlContext, jsonPath)
df1 <- agg(df, name = "max", age = "sum")
expect_equal(1, count(df1))
df1 <- agg(df, age2 = max(df$age))
expect_equal(1, count(df1))
expect_equal(columns(df1), c("age2"))
gd <- groupBy(df, "name")
expect_is(gd, "GroupedData")
df2 <- count(gd)
expect_is(df2, "SparkDataFrame")
expect_equal(3, count(df2))
# Also test group_by, summarize, mean
gd1 <- group_by(df, "name")
expect_is(gd1, "GroupedData")
df_summarized <- summarize(gd, mean_age = mean(df$age))
expect_is(df_summarized, "SparkDataFrame")
expect_equal(3, count(df_summarized))
df3 <- agg(gd, age = "stddev")
expect_is(df3, "SparkDataFrame")
df3_local <- collect(df3)
expect_true(is.nan(df3_local[df3_local$name == "Andy", ][1, 2]))
df4 <- agg(gd, sumAge = sum(df$age))
expect_is(df4, "SparkDataFrame")
expect_equal(3, count(df4))
expect_equal(columns(df4), c("name", "sumAge"))
df5 <- sum(gd, "age")
expect_is(df5, "SparkDataFrame")
expect_equal(3, count(df5))
expect_equal(3, count(mean(gd)))
expect_equal(3, count(max(gd)))
expect_equal(30, collect(max(gd))[2, 2])
expect_equal(1, collect(count(gd))[1, 2])
mockLines2 <- c("{\"name\":\"ID1\", \"value\": \"10\"}",
"{\"name\":\"ID1\", \"value\": \"10\"}",
"{\"name\":\"ID1\", \"value\": \"22\"}",
"{\"name\":\"ID2\", \"value\": \"-3\"}")
jsonPath2 <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
writeLines(mockLines2, jsonPath2)
gd2 <- groupBy(read.json(sqlContext, jsonPath2), "name")
df6 <- agg(gd2, value = "sum")
df6_local <- collect(df6)
expect_equal(42, df6_local[df6_local$name == "ID1", ][1, 2])
expect_equal(-3, df6_local[df6_local$name == "ID2", ][1, 2])
df7 <- agg(gd2, value = "stddev")
df7_local <- collect(df7)
expect_true(abs(df7_local[df7_local$name == "ID1", ][1, 2] - 6.928203) < 1e-6)
expect_true(is.nan(df7_local[df7_local$name == "ID2", ][1, 2]))
mockLines3 <- c("{\"name\":\"Andy\", \"age\":30}",
"{\"name\":\"Andy\", \"age\":30}",
"{\"name\":\"Justin\", \"age\":19}",
"{\"name\":\"Justin\", \"age\":1}")
jsonPath3 <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
writeLines(mockLines3, jsonPath3)
df8 <- read.json(sqlContext, jsonPath3)
gd3 <- groupBy(df8, "name")
gd3_local <- collect(sum(gd3))
expect_equal(60, gd3_local[gd3_local$name == "Andy", ][1, 2])
expect_equal(20, gd3_local[gd3_local$name == "Justin", ][1, 2])
expect_true(abs(collect(agg(df, sd(df$age)))[1, 1] - 7.778175) < 1e-6)
gd3_local <- collect(agg(gd3, var(df8$age)))
expect_equal(162, gd3_local[gd3_local$name == "Justin", ][1, 2])
# Test stats::sd, stats::var are working
expect_true(abs(sd(1:2) - 0.7071068) < 1e-6)
expect_true(abs(var(1:5, 1:5) - 2.5) < 1e-6)
unlink(jsonPath2)
unlink(jsonPath3)
})
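# agg() accepts both summary-function names as strings (e.g. age = "sum")
# and Column expressions (e.g. sumAge = sum(df$age)); both spellings are
# exercised above, as are the grouped shortcuts sum(), mean() and max().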
test_that("arrange() and orderBy() on a DataFrame", {
df <- read.json(sqlContext, jsonPath)
sorted <- arrange(df, df$age)
expect_equal(collect(sorted)[1, 2], "Michael")
sorted2 <- arrange(df, "name", decreasing = FALSE)
expect_equal(collect(sorted2)[2, "age"], 19)
sorted3 <- orderBy(df, asc(df$age))
expect_true(is.na(first(sorted3)$age))
expect_equal(collect(sorted3)[2, "age"], 19)
sorted4 <- orderBy(df, desc(df$name))
expect_equal(first(sorted4)$name, "Michael")
expect_equal(collect(sorted4)[3, "name"], "Andy")
sorted5 <- arrange(df, "age", "name", decreasing = TRUE)
expect_equal(collect(sorted5)[1, 2], "Andy")
sorted6 <- arrange(df, "age", "name", decreasing = c(T, F))
expect_equal(collect(sorted6)[1, 2], "Andy")
sorted7 <- arrange(df, "name", decreasing = FALSE)
expect_equal(collect(sorted7)[2, "age"], 19)
})
test_that("filter() on a DataFrame", {
df <- read.json(sqlContext, jsonPath)
filtered <- filter(df, "age > 20")
expect_equal(count(filtered), 1)
expect_equal(collect(filtered)$name, "Andy")
filtered2 <- where(df, df$name != "Michael")
expect_equal(count(filtered2), 2)
expect_equal(collect(filtered2)$age[2], 19)
# test suites for %in%
filtered3 <- filter(df, "age in (19)")
expect_equal(count(filtered3), 1)
filtered4 <- filter(df, "age in (19, 30)")
expect_equal(count(filtered4), 2)
filtered5 <- where(df, df$age %in% c(19))
expect_equal(count(filtered5), 1)
filtered6 <- where(df, df$age %in% c(19, 30))
expect_equal(count(filtered6), 2)
# Test stats::filter is working
#expect_true(is.ts(filter(1:100, rep(1, 3)))) # nolint
})
test_that("join() and merge() on a DataFrame", {
df <- read.json(sqlContext, jsonPath)
mockLines2 <- c("{\"name\":\"Michael\", \"test\": \"yes\"}",
"{\"name\":\"Andy\", \"test\": \"no\"}",
"{\"name\":\"Justin\", \"test\": \"yes\"}",
"{\"name\":\"Bob\", \"test\": \"yes\"}")
jsonPath2 <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
writeLines(mockLines2, jsonPath2)
df2 <- read.json(sqlContext, jsonPath2)
joined <- join(df, df2)
expect_equal(names(joined), c("age", "name", "name", "test"))
expect_equal(count(joined), 12)
expect_equal(names(collect(joined)), c("age", "name", "name", "test"))
joined2 <- join(df, df2, df$name == df2$name)
expect_equal(names(joined2), c("age", "name", "name", "test"))
expect_equal(count(joined2), 3)
joined3 <- join(df, df2, df$name == df2$name, "rightouter")
expect_equal(names(joined3), c("age", "name", "name", "test"))
expect_equal(count(joined3), 4)
expect_true(is.na(collect(orderBy(joined3, joined3$age))$age[2]))
joined4 <- select(join(df, df2, df$name == df2$name, "outer"),
alias(df$age + 5, "newAge"), df$name, df2$test)
expect_equal(names(joined4), c("newAge", "name", "test"))
expect_equal(count(joined4), 4)
expect_equal(collect(orderBy(joined4, joined4$name))$newAge[3], 24)
joined5 <- join(df, df2, df$name == df2$name, "leftouter")
expect_equal(names(joined5), c("age", "name", "name", "test"))
expect_equal(count(joined5), 3)
expect_true(is.na(collect(orderBy(joined5, joined5$age))$age[1]))
joined6 <- join(df, df2, df$name == df2$name, "inner")
expect_equal(names(joined6), c("age", "name", "name", "test"))
expect_equal(count(joined6), 3)
joined7 <- join(df, df2, df$name == df2$name, "leftsemi")
expect_equal(names(joined7), c("age", "name"))
expect_equal(count(joined7), 3)
joined8 <- join(df, df2, df$name == df2$name, "left_outer")
expect_equal(names(joined8), c("age", "name", "name", "test"))
expect_equal(count(joined8), 3)
expect_true(is.na(collect(orderBy(joined8, joined8$age))$age[1]))
joined9 <- join(df, df2, df$name == df2$name, "right_outer")
expect_equal(names(joined9), c("age", "name", "name", "test"))
expect_equal(count(joined9), 4)
expect_true(is.na(collect(orderBy(joined9, joined9$age))$age[2]))
merged <- merge(df, df2, by.x = "name", by.y = "name", all.x = TRUE, all.y = TRUE)
expect_equal(count(merged), 4)
expect_equal(names(merged), c("age", "name_x", "name_y", "test"))
expect_equal(collect(orderBy(merged, merged$name_x))$age[3], 19)
merged <- merge(df, df2, suffixes = c("-X", "-Y"))
expect_equal(count(merged), 3)
expect_equal(names(merged), c("age", "name-X", "name-Y", "test"))
expect_equal(collect(orderBy(merged, merged$"name-X"))$age[1], 30)
merged <- merge(df, df2, by = "name", suffixes = c("-X", "-Y"), sort = FALSE)
expect_equal(count(merged), 3)
expect_equal(names(merged), c("age", "name-X", "name-Y", "test"))
expect_equal(collect(orderBy(merged, merged$"name-Y"))$"name-X"[3], "Michael")
merged <- merge(df, df2, by = "name", all = T, sort = T)
expect_equal(count(merged), 4)
expect_equal(names(merged), c("age", "name_x", "name_y", "test"))
expect_equal(collect(orderBy(merged, merged$"name_y"))$"name_x"[1], "Andy")
merged <- merge(df, df2, by = NULL)
expect_equal(count(merged), 12)
expect_equal(names(merged), c("age", "name", "name", "test"))
mockLines3 <- c("{\"name\":\"Michael\", \"name_y\":\"Michael\", \"test\": \"yes\"}",
"{\"name\":\"Andy\", \"name_y\":\"Andy\", \"test\": \"no\"}",
"{\"name\":\"Justin\", \"name_y\":\"Justin\", \"test\": \"yes\"}",
"{\"name\":\"Bob\", \"name_y\":\"Bob\", \"test\": \"yes\"}")
jsonPath3 <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
writeLines(mockLines3, jsonPath3)
df3 <- read.json(sqlContext, jsonPath3)
expect_error(merge(df, df3),
paste("The following column name: name_y occurs more than once in the 'DataFrame'.",
"Please use different suffixes for the intersected columns.", sep = ""))
unlink(jsonPath2)
unlink(jsonPath3)
})
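# Note: join() takes an explicit join expression plus a join type ("inner",
# "leftouter", "rightouter", "outer", "leftsemi", ...), whereas merge()
# follows base::merge() semantics, inferring the join columns and renaming
# duplicated names with suffixes.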
test_that("toJSON() returns an RDD of the correct values", {
df <- read.json(sqlContext, jsonPath)
testRDD <- toJSON(df)
expect_is(testRDD, "RDD")
expect_equal(getSerializedMode(testRDD), "string")
expect_equal(collect(testRDD)[[1]], mockLines[1])
})
test_that("showDF()", {
df <- read.json(sqlContext, jsonPath)
s <- capture.output(showDF(df))
expected <- paste("+----+-------+\n",
"| age| name|\n",
"+----+-------+\n",
"|null|Michael|\n",
"| 30| Andy|\n",
"| 19| Justin|\n",
"+----+-------+\n", sep = "")
expect_output(s, expected)
})
test_that("isLocal()", {
df <- read.json(sqlContext, jsonPath)
expect_false(isLocal(df))
})
test_that("unionAll(), rbind(), except(), and intersect() on a DataFrame", {
df <- read.json(sqlContext, jsonPath)
lines <- c("{\"name\":\"Bob\", \"age\":24}",
"{\"name\":\"Andy\", \"age\":30}",
"{\"name\":\"James\", \"age\":35}")
jsonPath2 <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
writeLines(lines, jsonPath2)
df2 <- read.df(sqlContext, jsonPath2, "json")
unioned <- arrange(unionAll(df, df2), df$age)
expect_is(unioned, "SparkDataFrame")
expect_equal(count(unioned), 6)
expect_equal(first(unioned)$name, "Michael")
unioned2 <- arrange(rbind(unioned, df, df2), df$age)
expect_is(unioned2, "SparkDataFrame")
expect_equal(count(unioned2), 12)
expect_equal(first(unioned2)$name, "Michael")
excepted <- arrange(except(df, df2), desc(df$age))
expect_is(unioned, "SparkDataFrame")
expect_equal(count(excepted), 2)
expect_equal(first(excepted)$name, "Justin")
intersected <- arrange(intersect(df, df2), df$age)
expect_is(unioned, "SparkDataFrame")
expect_equal(count(intersected), 1)
expect_equal(first(intersected)$name, "Andy")
# Test base::rbind is working
expect_equal(length(rbind(1:4, c = 2, a = 10, 10, deparse.level = 0)), 16)
# Test base::intersect is working
expect_equal(length(intersect(1:20, 3:23)), 18)
unlink(jsonPath2)
})
test_that("withColumn() and withColumnRenamed()", {
df <- read.json(sqlContext, jsonPath)
newDF <- withColumn(df, "newAge", df$age + 2)
expect_equal(length(columns(newDF)), 3)
expect_equal(columns(newDF)[3], "newAge")
expect_equal(first(filter(newDF, df$name != "Michael"))$newAge, 32)
# Replace existing column
newDF <- withColumn(df, "age", df$age + 2)
expect_equal(length(columns(newDF)), 2)
expect_equal(first(filter(newDF, df$name != "Michael"))$age, 32)
newDF2 <- withColumnRenamed(df, "age", "newerAge")
expect_equal(length(columns(newDF2)), 2)
expect_equal(columns(newDF2)[1], "newerAge")
})

test_that("mutate(), transform(), rename() and names()", {
df <- read.json(sqlContext, jsonPath)
newDF <- mutate(df, newAge = df$age + 2)
expect_equal(length(columns(newDF)), 3)
expect_equal(columns(newDF)[3], "newAge")
expect_equal(first(filter(newDF, df$name != "Michael"))$newAge, 32)
newDF2 <- rename(df, newerAge = df$age)
expect_equal(length(columns(newDF2)), 2)
expect_equal(columns(newDF2)[1], "newerAge")
names(newDF2) <- c("newerName", "evenNewerAge")
expect_equal(length(names(newDF2)), 2)
expect_equal(names(newDF2)[1], "newerName")
transformedDF <- transform(df, newAge = -df$age, newAge2 = df$age / 2)
expect_equal(length(columns(transformedDF)), 4)
expect_equal(columns(transformedDF)[3], "newAge")
expect_equal(columns(transformedDF)[4], "newAge2")
expect_equal(first(filter(transformedDF, transformedDF$name == "Andy"))$newAge, -30)
# test if base::transform on local data frames works
# ensure the proper signature is used - otherwise this will fail to run
attach(airquality)
result <- transform(Ozone, logOzone = log(Ozone))
expect_equal(nrow(result), 153)
expect_equal(ncol(result), 2)
detach(airquality)
})

test_that("read/write Parquet files", {
df <- read.df(sqlContext, jsonPath, "json")
# Test write.df and read.df
write.df(df, parquetPath, "parquet", mode = "overwrite")
df2 <- read.df(sqlContext, parquetPath, "parquet")
expect_is(df2, "SparkDataFrame")
expect_equal(count(df2), 3)
# Test write.parquet/saveAsParquetFile and read.parquet/parquetFile
parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet")
write.parquet(df, parquetPath2)
parquetPath3 <- tempfile(pattern = "parquetPath3", fileext = ".parquet")
suppressWarnings(saveAsParquetFile(df, parquetPath3))
parquetDF <- read.parquet(sqlContext, c(parquetPath2, parquetPath3))
expect_is(parquetDF, "SparkDataFrame")
expect_equal(count(parquetDF), count(df) * 2)
parquetDF2 <- suppressWarnings(parquetFile(sqlContext, parquetPath2, parquetPath3))
expect_is(parquetDF2, "SparkDataFrame")
expect_equal(count(parquetDF2), count(df) * 2)
  # Test that varargs work when passed via variables
  saveMode <- "overwrite"
  mergeSchema <- "true"
  parquetPath4 <- tempfile(pattern = "parquetPath4", fileext = ".parquet")
  write.df(df, parquetPath4, "parquet", mode = saveMode, mergeSchema = mergeSchema)
unlink(parquetPath2)
unlink(parquetPath3)
unlink(parquetPath4)
})
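
# Varargs should reach the data source on the read path as well; a small
# sketch, assuming the Parquet source accepts a mergeSchema option on read.
test_that("read.df() forwards varargs options when reading Parquet", {
  df <- read.df(sqlContext, parquetPath, "parquet", mergeSchema = "true")
  expect_is(df, "SparkDataFrame")
  expect_equal(count(df), 3)
})
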
test_that("read/write text files", {
  # Test write.df and read.df; reading the JSON file as "text" yields one
  # string row per line in a single "value" column
  df <- read.df(sqlContext, jsonPath, "text")
expect_is(df, "SparkDataFrame")
expect_equal(colnames(df), c("value"))
expect_equal(count(df), 3)
textPath <- tempfile(pattern = "textPath", fileext = ".txt")
write.df(df, textPath, "text", mode = "overwrite")
# Test write.text and read.text
textPath2 <- tempfile(pattern = "textPath2", fileext = ".txt")
write.text(df, textPath2)
df2 <- read.text(sqlContext, c(textPath, textPath2))
expect_is(df2, "SparkDataFrame")
expect_equal(colnames(df2), c("value"))
expect_equal(count(df2), count(df) * 2)
unlink(textPath)
unlink(textPath2)
})

test_that("describe() and summarize() on a DataFrame", {
df <- read.json(sqlContext, jsonPath)
stats <- describe(df, "age")
expect_equal(collect(stats)[1, "summary"], "count")
expect_equal(collect(stats)[2, "age"], "24.5")
expect_equal(collect(stats)[3, "age"], "7.7781745930520225")
stats <- describe(df)
expect_equal(collect(stats)[4, "name"], "Andy")
expect_equal(collect(stats)[5, "age"], "30")
stats2 <- summary(df)
expect_equal(collect(stats2)[4, "name"], "Andy")
expect_equal(collect(stats2)[5, "age"], "30")
# Test base::summary is working
expect_equal(length(summary(attenu, digits = 4)), 35)
})

test_that("dropna() and na.omit() on a DataFrame", {
df <- read.json(sqlContext, jsonPathNa)
rows <- collect(df)
# drop with columns
expected <- rows[!is.na(rows$name), ]
actual <- collect(dropna(df, cols = "name"))
expect_identical(expected, actual)
actual <- collect(na.omit(df, cols = "name"))
expect_identical(expected, actual)
expected <- rows[!is.na(rows$age), ]
actual <- collect(dropna(df, cols = "age"))
row.names(expected) <- row.names(actual)
  # identical() on the two data frames does not work here, most likely due to
  # differing attributes; compare each column individually as a workaround.
expect_identical(expected$age, actual$age)
expect_identical(expected$height, actual$height)
expect_identical(expected$name, actual$name)
actual <- collect(na.omit(df, cols = "age"))
expected <- rows[!is.na(rows$age) & !is.na(rows$height), ]
actual <- collect(dropna(df, cols = c("age", "height")))
expect_identical(expected, actual)
actual <- collect(na.omit(df, cols = c("age", "height")))
expect_identical(expected, actual)
expected <- rows[!is.na(rows$age) & !is.na(rows$height) & !is.na(rows$name), ]
actual <- collect(dropna(df))
expect_identical(expected, actual)
actual <- collect(na.omit(df))
expect_identical(expected, actual)
# drop with how
expected <- rows[!is.na(rows$age) & !is.na(rows$height) & !is.na(rows$name), ]
actual <- collect(dropna(df))
expect_identical(expected, actual)
actual <- collect(na.omit(df))
expect_identical(expected, actual)
expected <- rows[!is.na(rows$age) | !is.na(rows$height) | !is.na(rows$name), ]
actual <- collect(dropna(df, "all"))
expect_identical(expected, actual)
actual <- collect(na.omit(df, "all"))
expect_identical(expected, actual)
expected <- rows[!is.na(rows$age) & !is.na(rows$height) & !is.na(rows$name), ]
actual <- collect(dropna(df, "any"))
expect_identical(expected, actual)
actual <- collect(na.omit(df, "any"))
expect_identical(expected, actual)
expected <- rows[!is.na(rows$age) & !is.na(rows$height), ]
actual <- collect(dropna(df, "any", cols = c("age", "height")))
expect_identical(expected, actual)
actual <- collect(na.omit(df, "any", cols = c("age", "height")))
expect_identical(expected, actual)
expected <- rows[!is.na(rows$age) | !is.na(rows$height), ]
actual <- collect(dropna(df, "all", cols = c("age", "height")))
expect_identical(expected, actual)
actual <- collect(na.omit(df, "all", cols = c("age", "height")))
expect_identical(expected, actual)
# drop with threshold
expected <- rows[as.integer(!is.na(rows$age)) + as.integer(!is.na(rows$height)) >= 2, ]
actual <- collect(dropna(df, minNonNulls = 2, cols = c("age", "height")))
expect_identical(expected, actual)
actual <- collect(na.omit(df, minNonNulls = 2, cols = c("age", "height")))
expect_identical(expected, actual)
expected <- rows[as.integer(!is.na(rows$age)) +
as.integer(!is.na(rows$height)) +
as.integer(!is.na(rows$name)) >= 3, ]
actual <- collect(dropna(df, minNonNulls = 3, cols = c("name", "age", "height")))
expect_identical(expected, actual)
actual <- collect(na.omit(df, minNonNulls = 3, cols = c("name", "age", "height")))
expect_identical(expected, actual)
# Test stats::na.omit is working
expect_equal(nrow(na.omit(data.frame(x = c(0, 10, NA)))), 2)
})
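
# The threshold rule above, stated compactly: a row is kept iff at least
# minNonNulls of the listed columns are non-null. A local-R sketch of the
# same rule (local_rule is illustrative, not part of the SparkR API):
local_rule <- function(rdf, minNonNulls, cols) {
  rdf[rowSums(!is.na(rdf[, cols, drop = FALSE])) >= minNonNulls, ]
}
stopifnot(nrow(local_rule(data.frame(a = c(1, NA), b = c(2, NA)), 1, c("a", "b"))) == 1)
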
test_that("fillna() on a DataFrame", {
df <- read.json(sqlContext, jsonPathNa)
rows <- collect(df)
# fill with value
expected <- rows
expected$age[is.na(expected$age)] <- 50
expected$height[is.na(expected$height)] <- 50.6
actual <- collect(fillna(df, 50.6))
expect_identical(expected, actual)
expected <- rows
expected$name[is.na(expected$name)] <- "unknown"
actual <- collect(fillna(df, "unknown"))
expect_identical(expected, actual)
expected <- rows
expected$age[is.na(expected$age)] <- 50
actual <- collect(fillna(df, 50.6, "age"))
expect_identical(expected, actual)
expected <- rows
expected$name[is.na(expected$name)] <- "unknown"
actual <- collect(fillna(df, "unknown", c("age", "name")))
expect_identical(expected, actual)
# fill with named list
expected <- rows
expected$age[is.na(expected$age)] <- 50
expected$height[is.na(expected$height)] <- 50.6
expected$name[is.na(expected$name)] <- "unknown"
actual <- collect(fillna(df, list("age" = 50, "height" = 50.6, "name" = "unknown")))
expect_identical(expected, actual)
})
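
# fillna() matches the fill value's type: a numeric value is cast to each
# numeric column's type (hence 50.6 fills the integral "age" with 50 above)
# and leaves string columns untouched; a minimal sketch of that behavior.
test_that("fillna() only fills columns matching the value's type", {
  df <- read.json(sqlContext, jsonPathNa)
  filled <- collect(fillna(df, 50.6))
  expect_false(any(is.na(filled$age)))
  expect_true(any(is.na(filled$name)))
})
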
test_that("crosstab() on a DataFrame", {
rdd <- lapply(parallelize(sc, 0:3), function(x) {
list(paste0("a", x %% 3), paste0("b", x %% 2))
})
df <- toDF(rdd, list("a", "b"))
ct <- crosstab(df, "a", "b")
ordered <- ct[order(ct$a_b), ]
row.names(ordered) <- NULL
expected <- data.frame("a_b" = c("a0", "a1", "a2"), "b0" = c(1, 0, 1), "b1" = c(1, 1, 0),
stringsAsFactors = FALSE, row.names = NULL)
expect_identical(expected, ordered)
})

test_that("cov() and corr() on a DataFrame", {
l <- lapply(c(0:9), function(x) { list(x, x * 2.0) })
df <- createDataFrame(sqlContext, l, c("singles", "doubles"))
result <- cov(df, "singles", "doubles")
expect_true(abs(result - 55.0 / 3) < 1e-12)
result <- corr(df, "singles", "doubles")
expect_true(abs(result - 1.0) < 1e-12)
result <- corr(df, "singles", "doubles", "pearson")
expect_true(abs(result - 1.0) < 1e-12)
# Test stats::cov is working
#expect_true(abs(max(cov(swiss)) - 1739.295) < 1e-3) # nolint
})
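
# The constant 55 / 3 above can be verified locally: for x = 0:9 and y = 2 * x,
# the sample covariance is 2 * var(0:9) = 2 * (82.5 / 9) = 55 / 3.
stopifnot(abs(stats::cov(0:9, 2 * (0:9)) - 55 / 3) < 1e-12)
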
test_that("freqItems() on a DataFrame", {
input <- 1:1000
  rdf <- data.frame(numbers = input, letters = as.character(input),
                    negDoubles = input * -1.0, stringsAsFactors = FALSE)
rdf[ input %% 3 == 0, ] <- c(1, "1", -1)
df <- createDataFrame(sqlContext, rdf)
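  # freqItems() is approximate: any item occurring in at least `support`
  # fraction of rows is guaranteed to be returned, but false positives
  # are possible.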
multiColResults <- freqItems(df, c("numbers", "letters"), support = 0.1)
expect_true(1 %in% multiColResults$numbers[[1]])
expect_true("1" %in% multiColResults$letters[[1]])
singleColResult <- freqItems(df, "negDoubles", support = 0.1)
expect_true(-1 %in% head(singleColResult$negDoubles)[[1]])
l <- lapply(c(0:99), function(i) {
if (i %% 2 == 0) { list(1L, -1.0) }
else { list(i, i * -1.0) }})
df <- createDataFrame(sqlContext, l, c("a", "b"))
result <- freqItems(df, c("a", "b"), 0.4)
expect_identical(result[[1]], list(list(1L, 99L)))
expect_identical(result[[2]], list(list(-1, -99)))
})

test_that("sampleBy() on a DataFrame", {
l <- lapply(c(0:99), function(i) { as.character(i %% 3) })
df <- createDataFrame(sqlContext, l, "key")
fractions <- list("0" = 0.1, "1" = 0.2)
sample <- sampleBy(df, "key", fractions, 0)
result <- collect(orderBy(count(groupBy(sample, "key")), "key"))
expect_identical(as.list(result[1, ]), list(key = "0", count = 3))
expect_identical(as.list(result[2, ]), list(key = "1", count = 7))
})

test_that("approxQuantile() on a DataFrame", {
l <- lapply(c(0:99), function(i) { i })
df <- createDataFrame(sqlContext, l, "key")
quantiles <- approxQuantile(df, "key", c(0.5, 0.8), 0.0)
expect_equal(quantiles[[1]], 50)
expect_equal(quantiles[[2]], 80)
})

test_that("SQL error message is returned from JVM", {
retError <- tryCatch(sql(sqlContext, "select * from blah"), error = function(e) e)
expect_equal(grepl("Table or view not found", retError), TRUE)
expect_equal(grepl("blah", retError), TRUE)
})

irisDF <- suppressWarnings(createDataFrame(sqlContext, iris))

test_that("Method as.data.frame as a synonym for collect()", {
expect_equal(as.data.frame(irisDF), collect(irisDF))
irisDF2 <- irisDF[irisDF$Species == "setosa", ]
expect_equal(as.data.frame(irisDF2), collect(irisDF2))
# Make sure as.data.frame in the R base package is not covered
expect_that(as.data.frame(c(1, 2)), not(throws_error()))
})

test_that("attach() on a DataFrame", {
df <- read.json(sqlContext, jsonPath)
expect_error(age)
attach(df)
expect_is(age, "SparkDataFrame")
expected_age <- data.frame(age = c(NA, 30, 19))
expect_equal(head(age), expected_age)
stat <- summary(age)
expect_equal(collect(stat)[5, "age"], "30")
age <- age$age + 1
expect_is(age, "Column")
rm(age)
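  # Removing the local binding re-exposes the column attached from df.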
stat2 <- summary(age)
expect_equal(collect(stat2)[5, "age"], "30")
detach("df")
stat3 <- summary(df[, "age"])
expect_equal(collect(stat3)[5, "age"], "30")
expect_error(age)
})

test_that("with() on a DataFrame", {
df <- suppressWarnings(createDataFrame(sqlContext, iris))
expect_error(Sepal_Length)
sum1 <- with(df, list(summary(Sepal_Length), summary(Sepal_Width)))
expect_equal(collect(sum1[[1]])[1, "Sepal_Length"], "150")
sum2 <- with(df, distinct(Sepal_Length))
expect_equal(nrow(sum2), 35)
})

test_that("Method coltypes() to get and set R's data types of a DataFrame", {
expect_equal(coltypes(irisDF), c(rep("numeric", 4), "character"))
  data <- data.frame(c1 = c(1, 2, 3),
                     c2 = c(TRUE, FALSE, TRUE),
                     c3 = c("2015/01/01 10:00:00", "2015/01/02 10:00:00", "2015/01/03 10:00:00"))
schema <- structType(structField("c1", "byte"),
structField("c3", "boolean"),
structField("c4", "timestamp"))
# Test primitive types
DF <- createDataFrame(sqlContext, data, schema)
expect_equal(coltypes(DF), c("integer", "logical", "POSIXct"))
# Test complex types
x <- createDataFrame(sqlContext, list(list(as.environment(
list("a" = "b", "c" = "d", "e" = "f")))))
expect_equal(coltypes(x), "map<string,string>")
df <- selectExpr(read.json(sqlContext, jsonPath), "name", "(age * 1.21) as age")
expect_equal(dtypes(df), list(c("name", "string"), c("age", "decimal(24,2)")))
df1 <- select(df, cast(df$age, "integer"))
coltypes(df) <- c("character", "integer")
expect_equal(dtypes(df), list(c("name", "string"), c("age", "int")))
value <- collect(df[, 2])[[3, 1]]
expect_equal(value, collect(df1)[[3, 1]])
expect_equal(value, 22)
coltypes(df) <- c(NA, "numeric")
expect_equal(dtypes(df), list(c("name", "string"), c("age", "double")))
expect_error(coltypes(df) <- c("character"),
"Length of type vector should match the number of columns for SparkDataFrame")
expect_error(coltypes(df) <- c("environment", "list"),
"Only atomic type is supported for column types")
})
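
# For reference, the R-to-Spark column type mapping exercised above (a partial
# sketch; the authoritative mapping lives in SparkR's type-handling code):
#   character -> string, integer -> int, numeric -> double,
#   logical -> boolean, POSIXct -> timestamp, Date -> date
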
test_that("Method str()", {
# Structure of Iris
iris2 <- iris
colnames(iris2) <- c("Sepal_Length", "Sepal_Width", "Petal_Length", "Petal_Width", "Species")
iris2$col <- TRUE
irisDF2 <- createDataFrame(sqlContext, iris2)
out <- capture.output(str(irisDF2))
expect_equal(length(out), 7)
expect_equal(out[1], "'SparkDataFrame': 6 variables:")
expect_equal(out[2], " $ Sepal_Length: num 5.1 4.9 4.7 4.6 5 5.4")
expect_equal(out[3], " $ Sepal_Width : num 3.5 3 3.2 3.1 3.6 3.9")
expect_equal(out[4], " $ Petal_Length: num 1.4 1.4 1.3 1.5 1.4 1.7")
expect_equal(out[5], " $ Petal_Width : num 0.2 0.2 0.2 0.2 0.2 0.4")
expect_equal(out[6], paste0(" $ Species : chr \"setosa\" \"setosa\" \"",
"setosa\" \"setosa\" \"setosa\" \"setosa\""))
expect_equal(out[7], " $ col : logi TRUE TRUE TRUE TRUE TRUE TRUE")
  # A random dataset with many columns. This test checks that str() limits
  # the number of columns displayed, so it suffices to check the number of
  # returned rows.
x <- runif(200, 1, 10)
df <- data.frame(t(as.matrix(data.frame(x, x, x, x, x, x, x, x, x))))
DF <- createDataFrame(sqlContext, df)
out <- capture.output(str(DF))
expect_equal(length(out), 103)
# Test utils:::str
expect_equal(capture.output(utils:::str(iris)), capture.output(str(iris)))
})

unlink(parquetPath)
unlink(jsonPath)
unlink(jsonPathNa)