Wenchen Fan 68d65fae71 [SPARK-19949][SQL] unify bad record handling in CSV and JSON
## What changes were proposed in this pull request?

Currently JSON and CSV have exactly the same logic for handling bad records; this PR abstracts that logic and moves it to an upper level to reduce code duplication.

The overall idea is to make the JSON and CSV parsers throw a BadRecordException; the upper level, FailureSafeParser, then handles bad records according to the parse mode.

Behavior changes:
1. With PERMISSIVE mode, if the number of tokens doesn't match the schema, the CSV parser previously treated it as a legal record and parsed as many tokens as possible. After this PR, we treat it as an illegal record and put the raw record string in a special column, but we still parse as many tokens as possible.
2. All logging is removed, as it is not very useful in practice.

## How was this patch tested?

existing tests

Author: Wenchen Fan <>
Author: hyukjinkwon <>
Author: Wenchen Fan <>

Closes #17315 from cloud-fan/bad-record2.
2017-03-20 21:43:14 -07:00

2968 lines
110 KiB

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
context("SparkSQL functions")
# Utility function for easily checking the values of a StructField.
#
# actual           - object under test; must have class "structField" and
#                    expose name(), dataType.toString() and nullable()
# expectedName     - value expected from actual$name()
# expectedType     - value expected from actual$dataType.toString()
# expectedNullable - value expected from actual$nullable()
checkStructField <- function(actual, expectedName, expectedType, expectedNullable) {
  expect_equal(class(actual), "structField")
  expect_equal(actual$name(), expectedName)
  expect_equal(actual$dataType.toString(), expectedType)
  expect_equal(actual$nullable(), expectedNullable)
}
# Declares the encoding of character vector `s` to be UTF-8 and returns it.
#
# s - a character vector
# Returns `s` with its encoding mark set to "UTF-8".
markUtf8 <- function(s) {
  Encoding(s) <- "UTF-8"
  # The replacement call above evaluates to its right-hand side ("UTF-8"), so
  # the marked string has to be returned explicitly.
  s
}
# Makes a Hive-backed session the active SparkR session for subsequent tests.
#
# sc - handle whose "sc" method is invoked to obtain the context passed to
#      TestHiveContext.
#
# If `.testHiveSession` already exists in .sparkREnv it is reused; otherwise a
# TestHiveContext is created, and the test is skipped when that fails. The
# session that was active before is stored under `.prevSparkRsession` in
# .sparkREnv.
# NOTE(review): the function currently ends with an assign() call, so it returns
# that call's value invisibly; confirm whether callers expect the Hive session.
setHiveContext <- function(sc) {
  if (exists(".testHiveSession", envir = .sparkREnv)) {
    hiveSession <- get(".testHiveSession", envir = .sparkREnv)
  } else {
    # initialize once and reuse
    ssc <- callJMethod(sc, "sc")
    hiveCtx <- tryCatch({
      newJObject("org.apache.spark.sql.hive.test.TestHiveContext", ssc, FALSE)
    },
    error = function(err) {
      skip("Hive is not build with SparkSQL, skipped")
    })
    hiveSession <- callJMethod(hiveCtx, "sparkSession")
  }
  previousSession <- get(".sparkRsession", envir = .sparkREnv)
  assign(".sparkRsession", hiveSession, envir = .sparkREnv)
  assign(".prevSparkRsession", previousSession, envir = .sparkREnv)
}
# Reinstates the session stored under `.prevSparkRsession` as the active
# `.sparkRsession` in .sparkREnv, then removes the stored entry.
unsetHiveContext <- function() {
  previousSession <- get(".prevSparkRsession", envir = .sparkREnv)
  assign(".sparkRsession", previousSession, envir = .sparkREnv)
  remove(".prevSparkRsession", envir = .sparkREnv)
}
# Tests for SparkSQL functions in SparkR
# Shared fixtures: these top-level statements create the session handles and the
# temporary JSON input files that the tests below refer to by name.
# NOTE(review): sparkRDir is not defined in the visible part of this file;
# confirm where it is set.
filesBefore <- list.files(path = sparkRDir, all.files = TRUE)
sparkSession <- sparkR.session()
sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)
# Three JSON records; the first one has no "age" field.
mockLines <- c("{\"name\":\"Michael\"}",
"{\"name\":\"Andy\", \"age\":30}",
"{\"name\":\"Justin\", \"age\":19}")
jsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet")
orcPath <- tempfile(pattern = "sparkr-test", fileext = ".orc")
writeLines(mockLines, jsonPath)
# For test nafunctions, like dropna(), fillna(),...
# NOTE(review): the c( call below is not closed in the visible text; the
# remaining records and the closing parenthesis appear to be missing and must
# be restored before this file can be parsed.
mockLinesNa <- c("{\"name\":\"Bob\",\"age\":16,\"height\":176.5}",
jsonPathNa <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
writeLines(mockLinesNa, jsonPathNa)
# For test complex types in DataFrame
mockLinesComplexType <-
c("{\"c1\":[1, 2, 3], \"c2\":[\"a\", \"b\", \"c\"], \"c3\":[1.0, 2.0, 3.0]}",
"{\"c1\":[4, 5, 6], \"c2\":[\"d\", \"e\", \"f\"], \"c3\":[4.0, 5.0, 6.0]}",
"{\"c1\":[7, 8, 9], \"c2\":[\"g\", \"h\", \"i\"], \"c3\":[7.0, 8.0, 9.0]}")
complexTypeJsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
writeLines(mockLinesComplexType, complexTypeJsonPath)
# For test map type and struct type in DataFrame
# NOTE(review): this c( call is also left open in the visible text. A later
# test expects three records named Bob, Alice and David; the missing records
# must be restored from the original file.
mockLinesMapType <- c("{\"name\":\"Bob\",\"info\":{\"age\":16,\"height\":176.5}}",
mapTypeJsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
writeLines(mockLinesMapType, mapTypeJsonPath)
# Two consecutive sparkRSQL.init(sc) calls must return equal objects.
# Warnings raised by sparkRSQL.init are suppressed.
test_that("calling sparkRSQL.init returns existing SQL context", {
  sqlContext <- suppressWarnings(sparkRSQL.init(sc))
  expect_equal(suppressWarnings(sparkRSQL.init(sc)), sqlContext)
})
# sparkRSQL.init(sc) must return the session created by the file-level
# sparkR.session() call.
test_that("calling sparkRSQL.init returns existing SparkSession", {
  expect_equal(suppressWarnings(sparkRSQL.init(sc)), sparkSession)
})
# A repeated sparkR.session() call must return the already-created session.
test_that("calling sparkR.session returns existing SparkSession", {
  expect_equal(sparkR.session(), sparkSession)
})
# Checks the type names infer_type() reports for scalars, vectors, lists,
# structs, environments and raw vectors, and that checkType() rejects a map
# whose key type is not string/character.
test_that("infer types and check types", {
  expect_equal(infer_type(1L), "integer")
  expect_equal(infer_type(1.0), "double")
  expect_equal(infer_type("abc"), "string")
  expect_equal(infer_type(TRUE), "boolean")
  expect_equal(infer_type(as.Date("2015-03-11")), "date")
  expect_equal(infer_type(as.POSIXlt("2015-03-11 12:13:04.043")), "timestamp")
  expect_equal(infer_type(c(1L, 2L)), "array<integer>")
  expect_equal(infer_type(list(1L, 2L)), "array<integer>")
  expect_equal(infer_type(listToStruct(list(a = 1L, b = "2"))), "struct<a:integer,b:string>")

  # An environment is reported as a map keyed by string.
  e <- new.env()
  assign("a", 1L, envir = e)
  expect_equal(infer_type(e), "map<string,integer>")

  expect_error(checkType("map<integer,integer>"), "Key type in a map must be string or character")

  expect_equal(infer_type(as.raw(c(1, 2, 3))), "binary")
})
# Builds a field and a two-field schema and checks their classes and accessors.
test_that("structType and structField", {
  testField <- structField("a", "string")
  expect_is(testField, "structField")
  expect_equal(testField$name(), "a")

  testSchema <- structType(testField, structField("b", "integer"))
  expect_is(testSchema, "structType")
  expect_is(testSchema$fields()[[2]], "structField")
  expect_equal(testSchema$fields()[[1]]$dataType.toString(), "StringType")
})
# Exercises createDataFrame()/as.DataFrame() on RDDs and local data with and
# without explicit column names or schema, checks partition counts, and finally
# inserts rows into a SQL table.
# NOTE(review): the closing "})" of this test_that call is not present in the
# visible text, and the two expect_equal( calls near the end are missing their
# second argument and closing parenthesis; restore them from the original file.
test_that("create DataFrame from RDD", {
rdd <- lapply(parallelize(sc, 1:10), function(x) { list(x, as.character(x)) })
# Column names supplied as a list.
df <- createDataFrame(rdd, list("a", "b"))
dfAsDF <- as.DataFrame(rdd, list("a", "b"))
expect_is(df, "SparkDataFrame")
expect_is(dfAsDF, "SparkDataFrame")
expect_equal(count(df), 10)
expect_equal(count(dfAsDF), 10)
expect_equal(nrow(df), 10)
expect_equal(nrow(dfAsDF), 10)
expect_equal(ncol(df), 2)
expect_equal(ncol(dfAsDF), 2)
expect_equal(dim(df), c(10, 2))
expect_equal(dim(dfAsDF), c(10, 2))
expect_equal(columns(df), c("a", "b"))
expect_equal(columns(dfAsDF), c("a", "b"))
expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))
expect_equal(dtypes(dfAsDF), list(c("a", "int"), c("b", "string")))
# No names supplied: columns are expected to be "_1", "_2".
df <- createDataFrame(rdd)
dfAsDF <- as.DataFrame(rdd)
expect_is(df, "SparkDataFrame")
expect_is(dfAsDF, "SparkDataFrame")
expect_equal(columns(df), c("_1", "_2"))
expect_equal(columns(dfAsDF), c("_1", "_2"))
# Explicit structType schema.
schema <- structType(structField(x = "a", type = "integer", nullable = TRUE),
structField(x = "b", type = "string", nullable = TRUE))
df <- createDataFrame(rdd, schema)
expect_is(df, "SparkDataFrame")
expect_equal(columns(df), c("a", "b"))
expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))
# Named list elements provide the column names.
rdd <- lapply(parallelize(sc, 1:10), function(x) { list(a = x, b = as.character(x)) })
df <- createDataFrame(rdd)
expect_is(df, "SparkDataFrame")
expect_equal(count(df), 10)
expect_equal(columns(df), c("a", "b"))
expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))
# Round trip: JSON file -> DataFrame -> RDD -> DataFrame with the same schema.
schema <- structType(structField("name", "string"), structField("age", "integer"),
structField("height", "float"))
df <- read.df(jsonPathNa, "json", schema)
df2 <- createDataFrame(toRDD(df), schema)
df2AsDF <- as.DataFrame(toRDD(df), schema)
expect_equal(columns(df2), c("name", "age", "height"))
expect_equal(columns(df2AsDF), c("name", "age", "height"))
expect_equal(dtypes(df2), list(c("name", "string"), c("age", "int"), c("height", "float")))
expect_equal(dtypes(df2AsDF), list(c("name", "string"), c("age", "int"), c("height", "float")))
expect_equal(as.list(collect(where(df2, df2$name == "Bob"))),
list(name = "Bob", age = 16, height = 176.5))
expect_equal(as.list(collect(where(df2AsDF, df2AsDF$name == "Bob"))),
list(name = "Bob", age = 16, height = 176.5))
# Local data.frame with the same schema.
localDF <- data.frame(name = c("John", "Smith", "Sarah"),
age = c(19L, 23L, 18L),
height = c(176.5, 181.4, 173.7))
df <- createDataFrame(localDF, schema)
expect_is(df, "SparkDataFrame")
expect_equal(count(df), 3)
expect_equal(columns(df), c("name", "age", "height"))
expect_equal(dtypes(df), list(c("name", "string"), c("age", "int"), c("height", "float")))
expect_equal(as.list(collect(where(df, df$name == "John"))),
list(name = "John", age = 19L, height = 176.5))
# Partition counts for the numPartitions argument.
expect_equal(getNumPartitions(df), 1)
df <- as.DataFrame(cars, numPartitions = 2)
expect_equal(getNumPartitions(df), 2)
df <- createDataFrame(cars, numPartitions = 3)
expect_equal(getNumPartitions(df), 3)
# validate limit by num of rows
df <- createDataFrame(cars, numPartitions = 60)
expect_equal(getNumPartitions(df), 50)
# validate when 1 < (length(coll) / numSlices) << length(coll)
df <- createDataFrame(cars, numPartitions = 20)
expect_equal(getNumPartitions(df), 20)
df <- as.DataFrame(data.frame(0))
expect_is(df, "SparkDataFrame")
df <- createDataFrame(list(list(1)))
expect_is(df, "SparkDataFrame")
df <- as.DataFrame(data.frame(0), numPartitions = 2)
# no data to partition, goes to 1
expect_equal(getNumPartitions(df), 1)
# Insert the JSON rows into a SQL table and query them back.
sql("CREATE TABLE people (name string, age double, height float)")
df <- read.df(jsonPathNa, "json", schema)
invisible(insertInto(df, "people"))
# NOTE(review): the next two calls are incomplete as shown (expected value and
# closing parenthesis missing).
expect_equal(collect(sql("SELECT age from people WHERE name = 'Bob'"))$age,
expect_equal(collect(sql("SELECT height from people WHERE name ='Bob'"))$height,
sql("DROP TABLE people")
# Lowers spark.r.maxAllocationLimit so that createDataFrame(iris) exceeds it,
# then checks the resulting partition count and dimensions.
test_that("createDataFrame uses files for large objects", {
  # To simulate a large file scenario, we set spark.r.maxAllocationLimit to a smaller value
  conf <- callJMethod(sparkSession, "conf")
  callJMethod(conf, "set", "spark.r.maxAllocationLimit", "100")
  df <- suppressWarnings(createDataFrame(iris, numPartitions = 3))
  expect_equal(getNumPartitions(df), 3)

  # Resetting the conf back to default value
  callJMethod(conf, "set", "spark.r.maxAllocationLimit", toString(.Machine$integer.max / 10))
  expect_equal(dim(df), dim(iris))
})
# Reads CSV with header/inferSchema, checks NA handling with and without a
# custom na.strings value, then writes the data back out and re-reads it.
# NOTE(review): both mockLinesCsv vectors below are left open in the visible
# text (their last elements and closing parenthesis are missing), and the
# closing "})" of this test is absent; restore from the original file.
test_that("read/write csv as DataFrame", {
csvPath <- tempfile(pattern = "sparkr-test", fileext = ".csv")
mockLinesCsv <- c("year,make,model,comment,blank",
"\"2012\",\"Tesla\",\"S\",\"No comment\",",
"1997,Ford,E350,\"Go get one now they are going fast\",",
writeLines(mockLinesCsv, csvPath)
# default "header" is false, inferSchema to handle "year" as "int"
df <- read.df(csvPath, "csv", header = "true", inferSchema = "true")
expect_equal(count(df), 4)
expect_equal(columns(df), c("year", "make", "model", "comment", "blank"))
expect_equal(sort(unlist(collect(where(df, df$year == 2015)))),
sort(unlist(list(year = 2015, make = "Chevy", model = "Volt"))))
# since "year" is "int", let's skip the NA values
withoutna <- na.omit(df, how = "any", cols = "year")
expect_equal(count(withoutna), 3)
# Second input file, read with na.strings = "Empty".
csvPath <- tempfile(pattern = "sparkr-test", fileext = ".csv")
mockLinesCsv <- c("year,make,model,comment,blank",
"\"2012\",\"Tesla\",\"S\",\"No comment\",",
"1997,Ford,E350,\"Go get one now they are going fast\",",
writeLines(mockLinesCsv, csvPath)
df2 <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "Empty")
expect_equal(count(df2), 4)
withoutna2 <- na.omit(df2, how = "any", cols = "year")
expect_equal(count(withoutna2), 3)
expect_equal(count(where(withoutna2, withoutna2$make == "Dummy")), 0)
# writing csv file
csvPath2 <- tempfile(pattern = "csvtest2", fileext = ".csv")
write.df(df2, path = csvPath2, "csv", header = "true")
df3 <- read.df(csvPath2, "csv", header = "true")
expect_equal(nrow(df3), nrow(df2))
expect_equal(colnames(df3), colnames(df2))
# csvPath2 is listed as a directory; the first "part" file is read with base
# read.csv to compare column names.
csv <- read.csv(file = list.files(csvPath2, pattern = "^part", full.names = T)[[1]])
expect_equal(colnames(df3), colnames(csv))
# Checks that logical option values (header = TRUE) give the same result as the
# string form (header = "true"), and that read.df fails with maxColumns = 3.
# NOTE(review): the mockLinesCsv vector is left open in the visible text and
# the closing "})" of this test is absent; restore from the original file.
test_that("Support other types for options", {
csvPath <- tempfile(pattern = "sparkr-test", fileext = ".csv")
mockLinesCsv <- c("year,make,model,comment,blank",
"\"2012\",\"Tesla\",\"S\",\"No comment\",",
"1997,Ford,E350,\"Go get one now they are going fast\",",
writeLines(mockLinesCsv, csvPath)
csvDf <- read.df(csvPath, "csv", header = "true", inferSchema = "true")
expected <- read.df(csvPath, "csv", header = TRUE, inferSchema = TRUE)
expect_equal(collect(csvDf), collect(expected))
expect_error(read.df(csvPath, "csv", header = TRUE, maxColumns = 3))
# Checks that NA values of integer, double, character and logical type survive
# a round trip through createDataFrame()/collect() as NA, while neighbouring
# non-NA values are preserved.
test_that("convert NAs to null type in DataFrames", {
  # Integer NA in an RDD of lists.
  rdd <- parallelize(sc, list(list(1L, 2L), list(NA, 4L)))
  df <- createDataFrame(rdd, list("a", "b"))
  expect_true(is.na(collect(df)[2, "a"]))
  expect_equal(collect(df)[2, "b"], 4L)

  # Integer NA in a local data.frame.
  l <- data.frame(x = 1L, y = c(1L, NA_integer_, 3L))
  df <- createDataFrame(l)
  expect_equal(collect(df)[2, "x"], 1L)
  expect_true(is.na(collect(df)[2, "y"]))

  # Double NA in an RDD of lists.
  rdd <- parallelize(sc, list(list(1, 2), list(NA, 4)))
  df <- createDataFrame(rdd, list("a", "b"))
  expect_true(is.na(collect(df)[2, "a"]))
  expect_equal(collect(df)[2, "b"], 4)

  # Double NA in a local data.frame.
  l <- data.frame(x = 1, y = c(1, NA_real_, 3))
  df <- createDataFrame(l)
  expect_equal(collect(df)[2, "x"], 1)
  expect_true(is.na(collect(df)[2, "y"]))

  # Logical NA among strings.
  l <- list("a", "b", NA, "d")
  df <- createDataFrame(l)
  expect_true(is.na(collect(df)[3, "_1"]))
  expect_equal(collect(df)[4, "_1"], "d")

  # Character NA among strings.
  l <- list("a", "b", NA_character_, "d")
  df <- createDataFrame(l)
  expect_true(is.na(collect(df)[3, "_1"]))
  expect_equal(collect(df)[4, "_1"], "d")

  # Logical NA among logicals.
  l <- list(TRUE, FALSE, NA, TRUE)
  df <- createDataFrame(l)
  expect_true(is.na(collect(df)[3, "_1"]))
  expect_equal(collect(df)[4, "_1"], TRUE)
})
# Checks toDF() on an RDD with a name list, with no names, with an explicit
# schema, and with named list elements.
test_that("toDF", {
  rdd <- lapply(parallelize(sc, 1:10), function(x) { list(x, as.character(x)) })
  df <- toDF(rdd, list("a", "b"))
  expect_is(df, "SparkDataFrame")
  expect_equal(count(df), 10)
  expect_equal(columns(df), c("a", "b"))
  expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))

  df <- toDF(rdd)
  expect_is(df, "SparkDataFrame")
  expect_equal(columns(df), c("_1", "_2"))

  schema <- structType(structField(x = "a", type = "integer", nullable = TRUE),
                       structField(x = "b", type = "string", nullable = TRUE))
  df <- toDF(rdd, schema)
  expect_is(df, "SparkDataFrame")
  expect_equal(columns(df), c("a", "b"))
  expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))

  rdd <- lapply(parallelize(sc, 1:10), function(x) { list(a = x, b = as.character(x)) })
  df <- toDF(rdd)
  expect_is(df, "SparkDataFrame")
  expect_equal(count(df), 10)
  expect_equal(columns(df), c("a", "b"))
  expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))
})
# Checks createDataFrame() on nested lists, local data.frames (including the
# built-in iris and mtcars data sets) and a list holding a raw vector.
test_that("create DataFrame from list or data.frame", {
  l <- list(list(1, 2), list(3, 4))
  df <- createDataFrame(l, c("a", "b"))
  expect_equal(columns(df), c("a", "b"))

  l <- list(list(a = 1, b = 2), list(a = 3, b = 4))
  df <- createDataFrame(l)
  expect_equal(columns(df), c("a", "b"))

  a <- 1:3
  b <- c("a", "b", "c")
  ldf <- data.frame(a, b)
  df <- createDataFrame(ldf)
  expect_equal(columns(df), c("a", "b"))
  expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))
  expect_equal(count(df), 3)
  ldf2 <- collect(df)
  expect_equal(ldf$a, ldf2$a)

  # The Species column is compared as character after the round trip.
  irisdf <- suppressWarnings(createDataFrame(iris))
  iris_collected <- collect(irisdf)
  expect_equivalent(iris_collected[, -5], iris[, -5])
  expect_equal(iris_collected$Species, as.character(iris$Species))

  mtcarsdf <- createDataFrame(mtcars)
  expect_equivalent(collect(mtcarsdf), mtcars)

  bytes <- as.raw(c(1, 2, 3))
  df <- createDataFrame(list(list(bytes)))
  expect_equal(collect(df)[[1]][[1]], bytes)
})
# One row holding an integer, double, logical, string, Date and POSIXct value;
# checks the reported column types and the collected contents.
test_that("create DataFrame with different data types", {
  l <- list(a = 1L, b = 2, c = TRUE, d = "ss", e = as.Date("2012-12-13"),
            f = as.POSIXct("2015-03-15 12:13:14.056"))
  df <- createDataFrame(list(l))
  expect_equal(dtypes(df), list(c("a", "int"), c("b", "double"), c("c", "boolean"),
                                c("d", "string"), c("e", "date"), c("f", "timestamp")))
  expect_equal(count(df), 1)
  expect_equal(collect(df), data.frame(l, stringsAsFactors = FALSE))
})
# A data.frame whose date and time columns contain NA must be convertible and
# the non-NA first row must come back unchanged.
test_that("SPARK-17811: can create DataFrame containing NA as date and time", {
  df <- data.frame(
    id = 1:2,
    time = c(as.POSIXlt("2016-01-10"), NA),
    date = c(as.Date("2016-10-01"), NA))

  DF <- collect(createDataFrame(df))
  expect_equal(DF$date[1], as.Date("2016-10-01"))
  expect_equal(DF$time[1], as.POSIXlt("2016-01-10"))
})
# One row holding an integer list, a string list, an environment and a struct;
# checks the reported column types and the values after collect().
test_that("create DataFrame with complex types", {
  e <- new.env()
  assign("n", 3L, envir = e)

  s <- listToStruct(list(a = "aa", b = 3L))

  l <- list(as.list(1:10), list("a", "b"), e, s)
  df <- createDataFrame(list(l), c("a", "b", "c", "d"))
  expect_equal(dtypes(df), list(c("a", "array<int>"),
                                c("b", "array<string>"),
                                c("c", "map<string,int>"),
                                c("d", "struct<a:string,b:int>")))
  expect_equal(count(df), 1)
  ldf <- collect(df)
  expect_equal(names(ldf), c("a", "b", "c", "d"))
  expect_equal(ldf[1, 1][[1]], l[[1]])
  expect_equal(ldf[1, 2][[1]], l[[2]])

  # The map column is collected as an environment.
  e <- ldf$c[[1]]
  expect_equal(class(e), "environment")
  expect_equal(ls(e), "n")
  expect_equal(e$n, 3L)

  # The struct column is collected as an object of class "struct".
  s <- ldf$d[[1]]
  expect_equal(class(s), "struct")
  expect_equal(s$a, "aa")
  expect_equal(s$b, 3L)
})
# A local data.frame with a list column and an environment column must survive
# a createDataFrame()/collect() round trip.
test_that("create DataFrame from a data.frame with complex types", {
  ldf <- data.frame(row.names = 1:2)
  ldf$a_list <- list(list(1, 2), list(3, 4))
  ldf$an_envir <- c(as.environment(list(a = 1, b = 2)), as.environment(list(c = 3)))

  sdf <- createDataFrame(ldf)
  collected <- collect(sdf)

  expect_identical(ldf[, 1, FALSE], collected[, 1, FALSE])
  expect_equal(ldf$an_envir, collected$an_envir)
})
# Collects array, map and struct columns read from the JSON fixtures and checks
# the shape and contents of the resulting local data.frame.
test_that("Collect DataFrame with complex types", {
  # ArrayType
  df <- read.json(complexTypeJsonPath)
  ldf <- collect(df)
  expect_equal(nrow(ldf), 3)
  expect_equal(ncol(ldf), 3)
  expect_equal(names(ldf), c("c1", "c2", "c3"))
  expect_equal(ldf$c1, list(list(1, 2, 3), list(4, 5, 6), list(7, 8, 9)))
  expect_equal(ldf$c2, list(list("a", "b", "c"), list("d", "e", "f"), list("g", "h", "i")))
  expect_equal(ldf$c3, list(list(1.0, 2.0, 3.0), list(4.0, 5.0, 6.0), list(7.0, 8.0, 9.0)))

  # MapType
  schema <- structType(structField("name", "string"),
                       structField("info", "map<string,double>"))
  df <- read.df(mapTypeJsonPath, "json", schema)
  expect_equal(dtypes(df), list(c("name", "string"),
                                c("info", "map<string,double>")))
  ldf <- collect(df)
  expect_equal(nrow(ldf), 3)
  expect_equal(ncol(ldf), 2)
  expect_equal(names(ldf), c("name", "info"))
  expect_equal(ldf$name, c("Bob", "Alice", "David"))
  bob <- ldf$info[[1]]
  expect_equal(class(bob), "environment")
  expect_equal(bob$age, 16)
  expect_equal(bob$height, 176.5)

  # StructType
  df <- read.json(mapTypeJsonPath)
  expect_equal(dtypes(df), list(c("info", "struct<age:bigint,height:double>"),
                                c("name", "string")))
  ldf <- collect(df)
  expect_equal(nrow(ldf), 3)
  expect_equal(ncol(ldf), 2)
  expect_equal(names(ldf), c("info", "name"))
  expect_equal(ldf$name, c("Bob", "Alice", "David"))
  bob <- ldf$info[[1]]
  expect_equal(class(bob), "struct")
  expect_equal(bob$age, 16)
  expect_equal(bob$height, 176.5)
})
# Reads the JSON fixture through read.df, loadDF and read.json, writes it back
# with write.df and write.json, and reads both outputs together.
test_that("read/write json files", {
  # Test read.df
  df <- read.df(jsonPath, "json")
  expect_is(df, "SparkDataFrame")
  expect_equal(count(df), 3)

  # Test read.df with a user defined schema
  schema <- structType(structField("name", type = "string"),
                       structField("age", type = "double"))

  df1 <- read.df(jsonPath, "json", schema)
  expect_is(df1, "SparkDataFrame")
  expect_equal(dtypes(df1), list(c("name", "string"), c("age", "double")))

  # Test loadDF
  df2 <- loadDF(jsonPath, "json", schema)
  expect_is(df2, "SparkDataFrame")
  expect_equal(dtypes(df2), list(c("name", "string"), c("age", "double")))

  # Test read.json
  df <- read.json(jsonPath)
  expect_is(df, "SparkDataFrame")
  expect_equal(count(df), 3)

  # Test write.df
  jsonPath2 <- tempfile(pattern = "jsonPath2", fileext = ".json")
  write.df(df, jsonPath2, "json", mode = "overwrite")

  # Test write.json
  jsonPath3 <- tempfile(pattern = "jsonPath3", fileext = ".json")
  write.json(df, jsonPath3)

  # Test read.json()/jsonFile() works with multiple input paths
  jsonDF1 <- read.json(c(jsonPath2, jsonPath3))
  expect_is(jsonDF1, "SparkDataFrame")
  expect_equal(count(jsonDF1), 6)
  # Suppress warnings because jsonFile is deprecated
  jsonDF2 <- suppressWarnings(jsonFile(c(jsonPath2, jsonPath3)))
  expect_is(jsonDF2, "SparkDataFrame")
  expect_equal(count(jsonDF2), 6)
})
# Writes JSON with compression = "gzip", reads it back, and checks that the
# output directory contains at least one file matching ".gz".
test_that("read/write json files - compression option", {
  df <- read.df(jsonPath, "json")

  # Local jsonPath shadows the file-level fixture path inside this test only.
  jsonPath <- tempfile(pattern = "jsonPath", fileext = ".json")
  write.json(df, jsonPath, compression = "gzip")
  jsonDF <- read.json(jsonPath)
  expect_is(jsonDF, "SparkDataFrame")
  expect_equal(count(jsonDF), count(df))
  expect_true(length(list.files(jsonPath, pattern = ".gz")) > 0)
})
# Builds DataFrames from RDDs of JSON strings via jsonRDD(); warnings from the
# sparkRSQL.init and jsonRDD calls are suppressed.
test_that("jsonRDD() on a RDD with json string", {
  sqlContext <- suppressWarnings(sparkRSQL.init(sc))
  rdd <- parallelize(sc, mockLines)
  expect_equal(countRDD(rdd), 3)
  df <- suppressWarnings(jsonRDD(sqlContext, rdd))
  expect_is(df, "SparkDataFrame")
  expect_equal(count(df), 3)

  # Each input line duplicated: six records expected.
  rdd2 <- flatMap(rdd, function(x) c(x, x))
  df <- suppressWarnings(jsonRDD(sqlContext, rdd2))
  expect_is(df, "SparkDataFrame")
  expect_equal(count(df), 6)
})
# Registers temporary views and checks tableNames()/tables() counts.
# NOTE(review): the final expectation is a count of 0, but no call that drops
# "table1"/"table2" is visible between the last two tables() calls, and the
# closing "})" is absent; those lines appear to be missing and must be restored
# from the original file.
test_that("test tableNames and tables", {
df <- read.json(jsonPath)
createOrReplaceTempView(df, "table1")
expect_equal(length(tableNames()), 1)
tables <- tables()
expect_equal(count(tables), 1)
# registerTempTable is wrapped in suppressWarnings.
suppressWarnings(registerTempTable(df, "table2"))
tables <- tables()
expect_equal(count(tables), 2)
tables <- tables()
expect_equal(count(tables), 0)
# Registers a view, queries it through sql(), and checks how a decimal cast is
# collected and printed.
test_that(
  "createOrReplaceTempView() results in a queryable table and sql() results in a new DataFrame", {
  df <- read.json(jsonPath)
  createOrReplaceTempView(df, "table1")
  newdf <- sql("SELECT * FROM table1 where name = 'Michael'")
  expect_is(newdf, "SparkDataFrame")
  expect_equal(count(newdf), 1)

  createOrReplaceTempView(df, "dfView")
  sqlCast <- collect(sql("select cast('2' as decimal) as x from dfView limit 1"))
  out <- capture.output(sqlCast)
  expect_equal(names(sqlCast)[1], "x")
  expect_equal(nrow(sqlCast), 1)
  expect_equal(ncol(sqlCast), 1)
  # A one-row data.frame prints its header right-aligned under the value column,
  # after the width of the row name and a separating space: "  x" above "1 2".
  expect_equal(out[1], "  x")
  expect_equal(out[2], "1 2")
})
# Sets up a temporary view "table1" for the cache-related calls named in the
# test title.
# NOTE(review): no cache/uncache/clearCache call and no expectation is visible
# in this block, and the closing "})" is absent; the remaining lines appear to
# be missing and must be restored from the original file.
test_that("test cache, uncache and clearCache", {
df <- read.json(jsonPath)
createOrReplaceTempView(df, "table1")
# Inserts a second parquet-backed DataFrame into a view over the first, once in
# append mode and once with overwrite = TRUE, checking row counts and the first
# row by age each time.
test_that("insertInto() on a registered table", {
  df <- read.df(jsonPath, "json")
  write.df(df, parquetPath, "parquet", "overwrite")
  dfParquet <- read.df(parquetPath, "parquet")

  lines <- c("{\"name\":\"Bob\", \"age\":24}",
             "{\"name\":\"James\", \"age\":35}")
  jsonPath2 <- tempfile(pattern = "jsonPath2", fileext = ".tmp")
  parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet")
  writeLines(lines, jsonPath2)
  df2 <- read.df(jsonPath2, "json")
  write.df(df2, parquetPath2, "parquet", "overwrite")
  dfParquet2 <- read.df(parquetPath2, "parquet")

  # Append: 3 original rows + 2 inserted rows.
  createOrReplaceTempView(dfParquet, "table1")
  insertInto(dfParquet2, "table1")
  expect_equal(count(sql("select * from table1")), 5)
  expect_equal(first(sql("select * from table1 order by age"))$name, "Michael")

  # Overwrite: only the 2 inserted rows remain.
  createOrReplaceTempView(dfParquet, "table1")
  insertInto(dfParquet2, "table1", overwrite = TRUE)
  expect_equal(count(sql("select * from table1")), 2)
  expect_equal(first(sql("select * from table1 order by age"))$name, "Bob")
})
# tableToDF() on a registered view must yield a SparkDataFrame with all rows,
# and must do so on repeated calls.
test_that("tableToDF() returns a new DataFrame", {
  df <- read.json(jsonPath)
  createOrReplaceTempView(df, "table1")
  tabledf <- tableToDF("table1")
  expect_is(tabledf, "SparkDataFrame")
  expect_equal(count(tabledf), 3)
  tabledf2 <- tableToDF("table1")
  expect_equal(count(tabledf2), 3)
})
# toRDD() on a DataFrame must yield an RDD with one element per row.
test_that("toRDD() returns an RRDD", {
  df <- read.json(jsonPath)
  testRDD <- toRDD(df)
  expect_is(testRDD, "RDD")
  expect_equal(countRDD(testRDD), 3)
})
# The union of two RDDs derived from the same DataFrame must be an RDD in
# "byte" serialization mode with the rows in order.
test_that("union on two RDDs created from DataFrames returns an RRDD", {
  df <- read.json(jsonPath)
  RDD1 <- toRDD(df)
  RDD2 <- toRDD(df)
  unioned <- unionRDD(RDD1, RDD2)
  expect_is(unioned, "RDD")
  expect_equal(getSerializedMode(unioned), "byte")
  expect_equal(collectRDD(unioned)[[2]]$name, "Andy")
})
# Unions a DataFrame-derived RDD with a parallelized integer RDD and with a
# text-file RDD; both results must report "byte" serialization mode.
test_that("union on mixed serialization types correctly returns a byte RRDD", {
  # Byte RDD
  nums <- 1:10
  rdd <- parallelize(sc, nums, 2L)

  # String RDD
  textLines <- c("Michael",
                 "Andy, 30",
                 "Justin, 19")
  textPath <- tempfile(pattern = "sparkr-textLines", fileext = ".tmp")
  writeLines(textLines, textPath)
  textRDD <- textFile(sc, textPath)

  df <- read.json(jsonPath)
  dfRDD <- toRDD(df)

  # 10 integers followed by the 3 rows: "Andy" is element 12.
  unionByte <- unionRDD(rdd, dfRDD)
  expect_is(unionByte, "RDD")
  expect_equal(getSerializedMode(unionByte), "byte")
  expect_equal(collectRDD(unionByte)[[1]], 1)
  expect_equal(collectRDD(unionByte)[[12]]$name, "Andy")

  # 3 text lines followed by the 3 rows: "Andy" is element 5.
  unionString <- unionRDD(textRDD, dfRDD)
  expect_is(unionString, "RDD")
  expect_equal(getSerializedMode(unionString), "byte")
  expect_equal(collectRDD(unionString)[[1]], "Michael")
  expect_equal(collectRDD(unionString)[[5]]$name, "Andy")
})
# Saves a DataFrame-derived RDD as an object file and reads it back.
test_that("objectFile() works with row serialization", {
  objectPath <- tempfile(pattern = "spark-test", fileext = ".tmp")
  df <- read.json(jsonPath)
  dfRDD <- toRDD(df)
  saveAsObjectFile(coalesceRDD(dfRDD, 1L), objectPath)
  objectIn <- objectFile(sc, objectPath)

  expect_is(objectIn, "RDD")
  expect_equal(getSerializedMode(objectIn), "byte")
  expect_equal(collectRDD(objectIn)[[2]]$age, 30)
})
# lapply() over a DataFrame applies the function to each row; the result must
# be an RDD whose elements carry both the original and the added column.
test_that("lapply() on a DataFrame returns an RDD with the correct columns", {
  df <- read.json(jsonPath)
  testRDD <- lapply(df, function(row) {
    row$newCol <- row$age + 5
    # Return the modified row; the assertions below read $name and $newCol
    # from each collected element.
    row
  })
  expect_is(testRDD, "RDD")
  collected <- collectRDD(testRDD)
  expect_equal(collected[[1]]$name, "Michael")
  expect_equal(collected[[2]]$newCol, 35)
})
# collect() must return a local data.frame with the expected shape, including
# for an empty DataFrame and for duplicate column names.
test_that("collect() returns a data.frame", {
  df <- read.json(jsonPath)
  rdf <- collect(df)
  expect_equal(names(rdf)[1], "age")
  expect_equal(nrow(rdf), 3)
  expect_equal(ncol(rdf), 2)

  # collect() returns data correctly from a DataFrame with 0 row
  df0 <- limit(df, 0)
  rdf <- collect(df0)
  expect_equal(names(rdf)[1], "age")
  expect_equal(nrow(rdf), 0)
  expect_equal(ncol(rdf), 2)

  # collect() correctly handles multiple columns with same name
  df <- createDataFrame(list(list(1, 2)), schema = c("name", "name"))
  ldf <- collect(df)
  expect_equal(names(ldf), c("name", "name"))
})
# limit(df, 2) must yield a SparkDataFrame with exactly two rows.
test_that("limit() returns DataFrame with the correct number of rows", {
  df <- read.json(jsonPath)
  dfLimited <- limit(df, 2)
  expect_is(dfLimited, "SparkDataFrame")
  expect_equal(count(dfLimited), 2)
})
# With a take() size above the row count, collect() and take() must agree on
# the number of rows and columns.
test_that("collect() and take() on a DataFrame return the same number of rows and columns", {
  df <- read.json(jsonPath)
  expect_equal(nrow(collect(df)), nrow(take(df, 10)))
  expect_equal(ncol(collect(df)), ncol(take(df, 10)))
})
# Non-ASCII names written to a JSON file must be collected unchanged, and must
# be usable in a filter after a further createDataFrame() round trip.
test_that("collect() support Unicode characters", {
  lines <- c("{\"name\":\"안녕하세요\"}",
             "{\"name\":\"您好\", \"age\":30}",
             "{\"name\":\"こんにちは\", \"age\":19}",
             "{\"name\":\"Xin chào\"}")

  # Local jsonPath shadows the file-level fixture path inside this test only.
  jsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
  writeLines(lines, jsonPath)

  df <- read.df(jsonPath, "json")
  rdf <- collect(df)
  expect_equal(rdf$name[1], markUtf8("안녕하세요"))
  expect_equal(rdf$name[2], markUtf8("您好"))
  expect_equal(rdf$name[3], markUtf8("こんにちは"))
  expect_equal(rdf$name[4], markUtf8("Xin chào"))

  df1 <- createDataFrame(rdf)
  expect_equal(collect(where(df1, df1$name == markUtf8("您好")))$name, markUtf8("您好"))
})
# Chains two lapply() transformations over a DataFrame and checks that the
# result is an RDD carrying the values produced by the first step.
test_that("multiple pipeline transformations result in an RDD with the correct values", {
  df <- read.json(jsonPath)
  first <- lapply(df, function(row) {
    row$age <- row$age + 5
    # Return the modified row so the next step can read row$age.
    row
  })
  second <- lapply(first, function(row) {
    row$testCol <- if (row$age == 35 && !is.na(row$age)) TRUE else FALSE
    # Return the row; the assertion below reads $age from a collected element.
    row
  })
  expect_is(second, "RDD")
  expect_equal(countRDD(second), 3)
  expect_equal(collectRDD(second)[[2]]$age, 35)
})
# Persists a DataFrame with the "MEMORY_AND_DISK" storage level.
# NOTE(review): the string literal after persist() is a dangling second operand;
# the call that opens it (presumably an expectation on the storage level —
# confirm), the statements after the final comment, and the closing "})" are
# not present in the visible text and must be restored from the original file.
test_that("cache(), storageLevel(), persist(), and unpersist() on a DataFrame", {
df <- read.json(jsonPath)
persist(df, "MEMORY_AND_DISK")
"MEMORY_AND_DISK - StorageLevel(disk, memory, deserialized, 1 replicas)")
# make sure the data is collectable
# Checks that checkpoint() returns a SparkDataFrame and that files appear under
# the checkpoint directory afterwards.
# NOTE(review): the test title mentions setCheckpointDir(), but no such call is
# visible here, and the closing "})" is absent; lines appear to be missing and
# must be restored from the original file.
test_that("setCheckpointDir(), checkpoint() on a DataFrame", {
checkpointDir <- file.path(tempdir(), "cproot")
# The directory is expected to be empty (or absent) before checkpointing.
expect_true(length(list.files(path = checkpointDir, all.files = TRUE)) == 0)
df <- read.json(jsonPath)
df <- checkpoint(df)
expect_is(df, "SparkDataFrame")
expect_false(length(list.files(path = checkpointDir, all.files = TRUE)) == 0)
# Checks the metadata accessors on the DataFrame read from the JSON fixture,
# whose columns are expected in the order "age", "name".
test_that("schema(), dtypes(), columns(), names() return the correct values/format", {
  df <- read.json(jsonPath)
  testSchema <- schema(df)
  expect_equal(length(testSchema$fields()), 2)
  expect_equal(testSchema$fields()[[1]]$dataType.toString(), "LongType")
  expect_equal(testSchema$fields()[[2]]$dataType.simpleString(), "string")
  expect_equal(testSchema$fields()[[1]]$name(), "age")

  testTypes <- dtypes(df)
  expect_equal(length(testTypes[[1]]), 2)
  expect_equal(testTypes[[1]][1], "age")

  testCols <- columns(df)
  expect_equal(length(testCols), 2)
  expect_equal(testCols[2], "name")

  testNames <- names(df)
  expect_equal(length(testNames), 2)
  expect_equal(testNames[2], "name")
})
# Checks the names<- and colnames<- setters on a SparkDataFrame, their error
# messages for invalid input, and that the base R versions still work on
# matrices and lists.
test_that("names() colnames() set the column names", {
  df <- read.json(jsonPath)
  names(df) <- c("col1", "col2")
  expect_equal(colnames(df)[2], "col2")

  colnames(df) <- c("col3", "col4")
  expect_equal(names(df)[1], "col3")

  expect_error(names(df) <- NULL, "Invalid column names.")
  expect_error(names(df) <- c("sepal.length", "sepal_width"),
               "Column names cannot contain the '.' symbol.")
  expect_error(names(df) <- c(1, 2), "Invalid column names.")
  expect_error(names(df) <- c("a"),
               "Column names must have the same length as the number of columns in the dataset.")
  expect_error(names(df) <- c("1", NA), "Column names cannot be NA.")

  expect_error(colnames(df) <- c("sepal.length", "sepal_width"),
               "Column names cannot contain the '.' symbol.")
  expect_error(colnames(df) <- c(1, 2), "Invalid column names.")
  expect_error(colnames(df) <- c("a"),
               "Column names must have the same length as the number of columns in the dataset.")
  expect_error(colnames(df) <- c("1", NA), "Column names cannot be NA.")

  # Note: if this test is broken, remove check for "." character on colnames<- method
  irisDF <- suppressWarnings(createDataFrame(iris))
  expect_equal(names(irisDF)[1], "Sepal_Length")

  # Test base::colnames base::names
  m2 <- cbind(1, 1:4)
  expect_equal(colnames(m2, do.NULL = FALSE), c("col1", "col2"))
  colnames(m2) <- c("x", "Y")
  expect_equal(colnames(m2), c("x", "Y"))

  z <- list(a = 1, b = "c", c = 1:3)
  expect_equal(names(z)[3], "c")
  names(z)[3] <- "c2"
  expect_equal(names(z)[3], "c2")

  # Test subset assignment
  colnames(df)[1] <- "col5"
  expect_equal(colnames(df)[1], "col5")
  names(df)[2] <- "col6"
  expect_equal(names(df)[2], "col6")
})
# Checks the shape of head() and first() results on the three-row fixture and
# on an empty DataFrame.
test_that("head() and first() return the correct data", {
  df <- read.json(jsonPath)
  testHead <- head(df)
  expect_equal(nrow(testHead), 3)
  expect_equal(ncol(testHead), 2)

  testHead2 <- head(df, 2)
  expect_equal(nrow(testHead2), 2)
  expect_equal(ncol(testHead2), 2)

  testFirst <- first(df)
  expect_equal(nrow(testFirst), 1)

  # head() and first() return the correct data on
  # a DataFrame with 0 row
  df0 <- limit(df, 0)

  testHead <- head(df0)
  expect_equal(nrow(testHead), 0)
  expect_equal(ncol(testHead), 2)

  testFirst <- first(df0)
  expect_equal(nrow(testFirst), 0)
  expect_equal(ncol(testFirst), 2)
})
# Checks distinct()/unique() on a JSON input containing one duplicated record,
# then dropDuplicates() with no columns, a column vector, several column
# arguments and a single column.
# NOTE(review): several lines are missing from the visible text and must be
# restored from the original file before this block can be parsed:
#   - the createDataFrame( call has an extra ")" after the row lists, so an
#     opening wrapper around the rows is missing;
#   - every "expected <-" is followed directly by argument lists, so the
#     function call that builds the expected value is missing;
#   - every "result[order(...), ]," line is a dangling argument whose enclosing
#     expectation call and remaining argument are missing;
#   - the closing "})" of the test is absent.
test_that("distinct(), unique() and dropDuplicates() on DataFrames", {
lines <- c("{\"name\":\"Michael\"}",
"{\"name\":\"Andy\", \"age\":30}",
"{\"name\":\"Justin\", \"age\":19}",
"{\"name\":\"Justin\", \"age\":19}")
jsonPathWithDup <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
writeLines(lines, jsonPathWithDup)
df <- read.json(jsonPathWithDup)
# Four input records, one duplicated: three distinct rows expected.
uniques <- distinct(df)
expect_is(uniques, "SparkDataFrame")
expect_equal(count(uniques), 3)
uniques2 <- unique(df)
expect_is(uniques2, "SparkDataFrame")
expect_equal(count(uniques2), 3)
# Test dropDuplicates()
df <- createDataFrame(
list(2, 1, 2), list(1, 1, 1),
list(1, 2, 1), list(2, 1, 2),
list(2, 2, 2), list(2, 2, 1),
list(2, 1, 1), list(1, 1, 2),
list(1, 2, 2), list(1, 2, 1)),
schema = c("key", "value1", "value2"))
# No column argument: duplicates judged on all columns.
result <- collect(dropDuplicates(df))
expected <-
c(1, 1, 1), c(1, 1, 2), c(1, 2, 1),
c(1, 2, 2), c(2, 1, 1), c(2, 1, 2),
c(2, 2, 1), c(2, 2, 2))
names(expected) <- c("key", "value1", "value2")
result[order(result$key, result$value1, result$value2), ],
# Columns given as a character vector.
result <- collect(dropDuplicates(df, c("key", "value1")))
expected <-
c(1, 1, 1), c(1, 2, 1), c(2, 1, 2), c(2, 2, 2))
names(expected) <- c("key", "value1", "value2")
result[order(result$key, result$value1, result$value2), ],
# Columns given as separate arguments.
result <- collect(dropDuplicates(df, "key", "value1"))
expected <-
c(1, 1, 1), c(1, 2, 1), c(2, 1, 2), c(2, 2, 2))
names(expected) <- c("key", "value1", "value2")
result[order(result$key, result$value1, result$value2), ],
# Single column.
result <- collect(dropDuplicates(df, "key"))
expected <-
c(1, 1, 1), c(2, 1, 2))
names(expected) <- c("key", "value1", "value2")
result[order(result$key, result$value1, result$value2), ],
# Checks sample() and sample_frac() on a DataFrame: a fraction of 1.0 keeps
# every row, and a fixed seed gives a repeatable row count.
test_that("sample on a DataFrame", {
  df <- read.json(jsonPath)
  sampled <- sample(df, FALSE, 1.0)
  expect_equal(nrow(collect(sampled)), count(df))
  expect_is(sampled, "SparkDataFrame")
  sampled2 <- sample(df, FALSE, 0.1, 0) # set seed for predictable result
  expect_true(count(sampled2) < 3)

  # Same fraction and seed twice: the counts must match.
  count1 <- count(sample(df, FALSE, 0.1, 0))
  count2 <- count(sample(df, FALSE, 0.1, 0))
  expect_equal(count1, count2)

  # Also test sample_frac
  sampled3 <- sample_frac(df, FALSE, 0.1, 0) # set seed for predictable result
  expect_true(count(sampled3) < 3)

  # nolint start
  # Test base::sample is working
  #expect_equal(length(sample(1:12)), 12)
  # nolint end
})
# Exercises `$`, `[[` and `[` (and their assignment forms) on a SparkDataFrame,
# including the warning for multi-element `[[` indices, literal/Column
# assignment, and the `drop` argument of `[`.
test_that("select operators", {
  df <- select(read.json(jsonPath), "name", "age")
  expect_is(df$name, "Column")
  expect_is(df[[2]], "Column")
  expect_is(df[["age"]], "Column")

  # `[[` with an index longer than one must warn and still return a Column.
  expect_warning(df[[1:2]],
                 "Subset index has length > 1. Only the first index is used.")
  expect_is(suppressWarnings(df[[1:2]]), "Column")
  expect_warning(df[[c("name", "age")]],
                 "Subset index has length > 1. Only the first index is used.")
  expect_is(suppressWarnings(df[[c("name", "age")]]), "Column")

  # The same warning applies to the assignment form `[[<-`.
  expect_warning(df[[1:2]] <- df[[1]],
                 "Subset index has length > 1. Only the first index is used.")
  expect_warning(df[[c("name", "age")]] <- df[[1]],
                 "Subset index has length > 1. Only the first index is used.")

  expect_is(df[, 1, drop = FALSE], "SparkDataFrame")
  expect_equal(columns(df[, 1, drop = FALSE]), c("name"))
  expect_equal(columns(df[, "age", drop = FALSE]), c("age"))

  df2 <- df[, c("age", "name")]
  expect_is(df2, "SparkDataFrame")
  expect_equal(columns(df2), c("age", "name"))

  # `$<-` with a Column, a derived Column, and length-one literals.
  df$age2 <- df$age
  expect_equal(columns(df), c("name", "age", "age2"))
  expect_equal(count(where(df, df$age2 == df$age)), 2)
  df$age2 <- df$age * 2
  expect_equal(columns(df), c("name", "age", "age2"))
  expect_equal(count(where(df, df$age2 == df$age * 2)), 2)
  df$age2 <- df[["age"]] * 3
  expect_equal(columns(df), c("name", "age", "age2"))
  expect_equal(count(where(df, df$age2 == df$age * 3)), 2)

  df$age2 <- 21
  expect_equal(columns(df), c("name", "age", "age2"))
  expect_equal(count(where(df, df$age2 == 21)), 3)

  df$age2 <- c(22)
  expect_equal(columns(df), c("name", "age", "age2"))
  expect_equal(count(where(df, df$age2 == 22)), 3)

  # A literal of length > 1 is rejected.
  expect_error(df$age3 <- c(22, NA),
               "value must be a Column, literal value as atomic in length of 1, or NULL")

  # `[[<-` by name and by position.
  df[["age2"]] <- 23
  expect_equal(columns(df), c("name", "age", "age2"))
  expect_equal(count(where(df, df$age2 == 23)), 3)

  df[[3]] <- 24
  expect_equal(columns(df), c("name", "age", "age2"))
  expect_equal(count(where(df, df$age2 == 24)), 3)

  df[[3]] <- df$age
  expect_equal(count(where(df, df$age2 == df$age)), 2)

  df[["age2"]] <- df[["name"]]
  expect_equal(count(where(df, df$age2 == df$name)), 3)

  expect_error(df[["age3"]] <- c(22, 23),
               "value must be a Column, literal value as atomic in length of 1, or NULL")

  # Test parameter drop
  expect_equal(class(df[, 1]) == "SparkDataFrame", TRUE)
  expect_equal(class(df[, 1, drop = TRUE]) == "Column", TRUE)
  expect_equal(class(df[, 1, drop = FALSE]) == "SparkDataFrame", TRUE)
  expect_equal(class(df[df$age > 4, 2, drop = TRUE]) == "Column", TRUE)
  expect_equal(class(df[df$age > 4, 2, drop = FALSE]) == "SparkDataFrame", TRUE)
})
# Checks select() with a column name, a Column, a literal Column and a
# character vector, and that mixing a vector with extra arguments is an error.
test_that("select with column", {
  df <- read.json(jsonPath)

  df1 <- select(df, "name")
  expect_equal(columns(df1), c("name"))
  expect_equal(count(df1), 3)

  df2 <- select(df, df$age)
  expect_equal(columns(df2), c("age"))
  expect_equal(count(df2), 3)

  # A literal becomes a column named after its value.
  df3 <- select(df, lit("x"))
  expect_equal(columns(df3), c("x"))
  expect_equal(count(df3), 3)
  expect_equal(collect(select(df3, "x"))[[1, 1]], "x")

  df4 <- select(df, c("name", "age"))
  expect_equal(columns(df4), c("name", "age"))
  expect_equal(count(df4), 3)

  expect_error(select(df, c("name", "age"), "name"),
               "To select multiple columns, use a character vector or list for col")
})
# Checks drop() by name, by name vector and by Column, column removal through
# NULL assignment, and that base::drop still works on plain R objects.
test_that("drop column", {
  df <- select(read.json(jsonPath), "name", "age")

  df1 <- drop(df, "name")
  expect_equal(columns(df1), c("age"))

  df$age2 <- df$age
  df1 <- drop(df, c("name", "age"))
  expect_equal(columns(df1), c("age2"))

  df1 <- drop(df, df$age)
  expect_equal(columns(df1), c("name", "age2"))

  # Assigning NULL removes an existing column.
  df$age2 <- NULL
  expect_equal(columns(df), c("name", "age"))
  # Assigning NULL to a column that does not exist leaves the columns unchanged.
  df$age3 <- NULL
  expect_equal(columns(df), c("name", "age"))

  # Test to make sure base::drop is not masked
  expect_equal(drop(1:3 %*% 2:4), 20)
})
# Checks row/column subsetting of a SparkDataFrame with `[` and subset(), and
# that base::subset still works on a local data.frame.
test_that("subsetting", {
  # read.json returns columns in random order
  df <- select(read.json(jsonPath), "name", "age")

  filtered <- df[df$age > 20, ]
  expect_equal(count(filtered), 1)
  expect_equal(columns(filtered), c("name", "age"))
  expect_equal(collect(filtered)$name, "Andy")

  df2 <- df[df$age == 19, 1, drop = FALSE]
  expect_is(df2, "SparkDataFrame")
  expect_equal(count(df2), 1)
  expect_equal(columns(df2), c("name"))
  expect_equal(collect(df2)$name, "Justin")

  df3 <- df[df$age > 20, 2, drop = FALSE]
  expect_equal(count(df3), 1)
  expect_equal(columns(df3), c("age"))

  # Row filters built with %in%, combined with positional column selection.
  df4 <- df[df$age %in% c(19, 30), 1:2]
  expect_equal(count(df4), 2)
  expect_equal(columns(df4), c("name", "age"))

  df5 <- df[df$age %in% c(19), c(1, 2)]
  expect_equal(count(df5), 1)
  expect_equal(columns(df5), c("name", "age"))

  df6 <- subset(df, df$age %in% c(30), c(1, 2))
  expect_equal(count(df6), 1)
  expect_equal(columns(df6), c("name", "age"))

  df7 <- subset(df, select = "name", drop = FALSE)
  expect_equal(count(df7), 3)
  expect_equal(columns(df7), c("name"))

  # Test base::subset is working
  expect_equal(nrow(subset(airquality, Temp > 80, select = c(Ozone, Temp))), 68)
})
# Checks selectExpr() with an arithmetic expression and with aliased
# expressions.
test_that("selectExpr() on a DataFrame", {
  df <- read.json(jsonPath)

  selected <- selectExpr(df, "age * 2")
  expect_equal(names(selected), "(age * 2)")
  # The SQL expression must give the same result as the Column expression.
  expect_equal(collect(selected), collect(select(df, df$age * 2L)))

  selected2 <- selectExpr(df, "name as newName", "abs(age) as age")
  expect_equal(names(selected2), c("newName", "age"))
  expect_equal(count(selected2), 3)
})
# Selects a SQL expression string through expr() and checks the collected value.
test_that("expr() on a DataFrame", {
  df <- read.json(jsonPath)
  expect_equal(collect(select(df, expr("abs(-123)")))[1, 1], 123)
})
# Checks that computed Columns can be aliased and selected.
test_that("column calculation", {
  df <- read.json(jsonPath)

  d <- collect(select(df, alias(df$age + 1, "age2")))
  expect_equal(names(d), c("age2"))

  df2 <- select(df, lower(df$name), abs(df$age))
  expect_is(df2, "SparkDataFrame")
  expect_equal(count(df2), 3)
})
# Creates an external table over the JSON fixture, then saves it under several
# table names/formats and checks that each table can be queried through sql().
# NOTE(review): the tempfile() paths created here are never removed in this
# test; confirm whether cleanup is done elsewhere before adding it here.
test_that("test HiveContext", {
  df <- createExternalTable("json", jsonPath, "json")
  expect_is(df, "SparkDataFrame")
  expect_equal(count(df), 3)
  df2 <- sql("select * from json")
  expect_is(df2, "SparkDataFrame")
  expect_equal(count(df2), 3)

  # Save as a JSON table in append mode.
  jsonPath2 <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
  invisible(saveAsTable(df, "json2", "json", "append", path = jsonPath2))
  df3 <- sql("select * from json2")
  expect_is(df3, "SparkDataFrame")
  expect_equal(count(df3), 3)

  # Save without naming a source.
  hivetestDataPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
  invisible(saveAsTable(df, "hivetestbl", path = hivetestDataPath))
  df4 <- sql("select * from hivetestbl")
  expect_is(df4, "SparkDataFrame")
  expect_equal(count(df4), 3)

  # Save as a parquet table in overwrite mode.
  parquetDataPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
  invisible(saveAsTable(df, "parquetest", "parquet", mode = "overwrite", path = parquetDataPath))
  df5 <- sql("select * from parquetest")
  expect_is(df5, "SparkDataFrame")
  expect_equal(count(df5), 3)
})
# Builds Column expressions with arithmetic, comparison, logical and power
# operators. There are no explicit expectations: the test passes as long as
# constructing the expressions raises no error.
test_that("column operators", {
  c <- column("a")
  c2 <- (- c + 1 - 2) * 3 / 4.0
  c3 <- (c + c2 - c2) * c2 %% c2
  c4 <- (c > c2) & (c2 <= c3) | (c == c2) & (c2 != c3)
  c5 <- c2 ^ c3 ^ c4
})
# Builds Column expressions from a wide range of column functions (the c1..c20
# assignments only check that construction does not error), then verifies a
# selection of those functions against collected results.
test_that("column functions", {
  c <- column("a")
  c1 <- abs(c) + acos(c) + approxCountDistinct(c) + ascii(c) + asin(c) + atan(c)
  c2 <- avg(c) + base64(c) + bin(c) + bitwiseNOT(c) + cbrt(c) + ceil(c) + cos(c)
  c3 <- cosh(c) + count(c) + crc32(c) + hash(c) + exp(c)
  c4 <- explode(c) + expm1(c) + factorial(c) + first(c) + floor(c) + hex(c)
  c5 <- hour(c) + initcap(c) + last(c) + last_day(c) + length(c)
  c6 <- log(c) + (c) + log1p(c) + log2(c) + lower(c) + ltrim(c) + max(c) + md5(c)
  c7 <- mean(c) + min(c) + month(c) + negate(c) + posexplode(c) + quarter(c)
  c8 <- reverse(c) + rint(c) + round(c) + rtrim(c) + sha1(c) + monotonically_increasing_id()
  c9 <- signum(c) + sin(c) + sinh(c) + size(c) + stddev(c) + soundex(c) + sqrt(c) + sum(c)
  c10 <- sumDistinct(c) + tan(c) + tanh(c) + toDegrees(c) + toRadians(c)
  c11 <- to_date(c) + trim(c) + unbase64(c) + unhex(c) + upper(c)
  c12 <- variance(c)
  c13 <- lead("col", 1) + lead(c, 1) + lag("col", 1) + lag(c, 1)
  c14 <- cume_dist() + ntile(1) + corr(c, c1)
  c15 <- dense_rank() + percent_rank() + rank() + row_number()
  c16 <- is.nan(c) + isnan(c) + isNaN(c)
  c17 <- cov(c, c1) + cov("c", "c1") + covar_samp(c, c1) + covar_samp("c", "c1")
  c18 <- covar_pop(c, c1) + covar_pop("c", "c1")
  c19 <- spark_partition_id() + coalesce(c) + coalesce(c1, c2, c3)
  c20 <- to_timestamp(c) + to_timestamp(c, "yyyy") + to_date(c, "yyyy")

  # Test if base::is.nan() is exposed
  # (`c` is bound to a Column above; the call c(...) still resolves to the base function.)
  expect_equal(is.nan(c("a", "b")), c(FALSE, FALSE))

  # Test if base::rank() is exposed
  expect_equal(class(rank())[[1]], "Column")
  expect_equal(rank(1:3), as.numeric(c(1:3)))

  # between() on numeric and string columns.
  df <- read.json(jsonPath)
  df2 <- select(df, between(df$age, c(20, 30)), between(df$age, c(10, 20)))
  expect_equal(collect(df2)[[2, 1]], TRUE)
  expect_equal(collect(df2)[[2, 2]], FALSE)
  expect_equal(collect(df2)[[3, 1]], FALSE)
  expect_equal(collect(df2)[[3, 2]], TRUE)

  df3 <- select(df, between(df$name, c("Apache", "Spark")))
  expect_equal(collect(df3)[[1, 1]], TRUE)
  expect_equal(collect(df3)[[2, 1]], FALSE)
  expect_equal(collect(df3)[[3, 1]], TRUE)

  # Aggregates.
  df4 <- select(df, countDistinct(df$age, df$name))
  expect_equal(collect(df4)[[1, 1]], 2)

  expect_equal(collect(select(df, sum(df$age)))[1, 1], 49)
  expect_true(abs(collect(select(df, stddev(df$age)))[1, 1] - 7.778175) < 1e-6)
  expect_equal(collect(select(df, var_pop(df$age)))[1, 1], 30.25)

  df5 <- createDataFrame(list(list(a = "010101")))
  expect_equal(collect(select(df5, conv(df5$a, 2, 16)))[1, 1], "15")

  # Test array_contains() and sort_array()
  df <- createDataFrame(list(list(list(1L, 2L, 3L)), list(list(6L, 5L, 4L))))
  result <- collect(select(df, array_contains(df[[1]], 1L)))[[1]]
  expect_equal(result, c(TRUE, FALSE))

  result <- collect(select(df, sort_array(df[[1]], FALSE)))[[1]]
  expect_equal(result, list(list(3L, 2L, 1L), list(6L, 5L, 4L)))
  result <- collect(select(df, sort_array(df[[1]])))[[1]]
  expect_equal(result, list(list(1L, 2L, 3L), list(4L, 5L, 6L)))

  # Test that stats::lag is working
  expect_equal(length(lag(ldeaths, 12)), 72)

  # Test struct()
  df <- createDataFrame(list(list(1L, 2L, 3L), list(4L, 5L, 6L)),
                        schema = c("a", "b", "c"))
  result <- collect(select(df, alias(struct("a", "c"), "d")))
  expected <- data.frame(row.names = 1:2)
  expected$"d" <- list(listToStruct(list(a = 1L, c = 3L)),
                       listToStruct(list(a = 4L, c = 6L)))
  expect_equal(result, expected)

  result <- collect(select(df, alias(struct(df$a, df$b), "d")))
  expected <- data.frame(row.names = 1:2)
  expected$"d" <- list(listToStruct(list(a = 1L, b = 2L)),
                       listToStruct(list(a = 4L, b = 5L)))
  expect_equal(result, expected)

  # Test encode(), decode()
  bytes <- as.raw(c(0xe5, 0xa4, 0xa7, 0xe5, 0x8d, 0x83, 0xe4, 0xb8, 0x96, 0xe7, 0x95, 0x8c))
  df <- createDataFrame(list(list(markUtf8("大千世界"), "utf-8", bytes)),
                        schema = c("a", "b", "c"))
  result <- collect(select(df, encode(df$a, "utf-8"), decode(df$c, "utf-8")))
  expect_equal(result[[1]][[1]], bytes)
  expect_equal(result[[2]], markUtf8("大千世界"))

  # Test first(), last()
  # The expectations below show first() yielding NA unless its second argument is TRUE.
  df <- read.json(jsonPath)
  expect_equal(collect(select(df, first(df$age)))[[1]], NA_real_)
  expect_equal(collect(select(df, first(df$age, TRUE)))[[1]], 30)
  expect_equal(collect(select(df, first("age")))[[1]], NA_real_)
  expect_equal(collect(select(df, first("age", TRUE)))[[1]], 30)
  expect_equal(collect(select(df, last(df$age)))[[1]], 19)
  expect_equal(collect(select(df, last(df$age, TRUE)))[[1]], 19)
  expect_equal(collect(select(df, last("age")))[[1]], 19)
  expect_equal(collect(select(df, last("age", TRUE)))[[1]], 19)

  # Test bround()
  df <- createDataFrame(data.frame(x = c(2.5, 3.5)))
  expect_equal(collect(select(df, bround(df$x, 0)))[[1]][1], 2)
  expect_equal(collect(select(df, bround(df$x, 0)))[[1]][2], 4)

  # Test to_json(), from_json()
  df <- sql("SELECT array(named_struct('name', 'Bob'), named_struct('name', 'Alice')) as people")
  j <- collect(select(df, alias(to_json(df$people), "json")))
  expect_equal(j[order(j$json), ][1], "[{\"name\":\"Bob\"},{\"name\":\"Alice\"}]")

  df <- read.json(mapTypeJsonPath)
  j <- collect(select(df, alias(to_json(df$info), "json")))
  expect_equal(j[order(j$json), ][1], "{\"age\":16,\"height\":176.5}")
  # Round-trip: parse the JSON strings produced above back into structs.
  df <- as.DataFrame(j)
  schema <- structType(structField("age", "integer"),
                       structField("height", "double"))
  s <- collect(select(df, alias(from_json(df$json, schema), "structcol")))
  expect_equal(ncol(s), 1)
  expect_equal(nrow(s), 3)
  expect_is(s[[1]][[1]], "struct")
  expect_true(any(apply(s, 1, function(x) { x[[1]]$age == 16 } )))

  # passing option
  df <- as.DataFrame(list(list("col" = "{\"date\":\"21/10/2014\"}")))
  schema2 <- structType(structField("date", "date"))
  s <- collect(select(df, from_json(df$col, schema2)))
  expect_equal(s[[1]][[1]], NA)
  s <- collect(select(df, from_json(df$col, schema2, dateFormat = "dd/MM/yyyy")))
  expect_is(s[[1]][[1]]$date, "Date")
  expect_equal(as.character(s[[1]][[1]]$date), "2014-10-21")

  # check for unparseable
  df <- as.DataFrame(list(list("a" = "")))
  expect_equal(collect(select(df, from_json(df$a, schema)))[[1]][[1]], NA)

  # check if array type in string is correctly supported.
  jsonArr <- "[{\"name\":\"Bob\"}, {\"name\":\"Alice\"}]"
  df <- as.DataFrame(list(list("people" = jsonArr)))
  schema <- structType(structField("name", "string"))
  arr <- collect(select(df, alias(from_json(df$people, schema, asJsonArray = TRUE), "arrcol")))
  expect_equal(ncol(arr), 1)
  expect_equal(nrow(arr), 1)
  expect_is(arr[[1]][[1]], "list")
  expect_equal(length(arr$arrcol[[1]]), 2)
  expect_equal(arr$arrcol[[1]][[1]]$name, "Bob")
  expect_equal(arr$arrcol[[1]][[2]]$name, "Alice")
})
# Checks two-argument math functions (atan2, hypot), bit shifts and the seeded
# random generators against values computed locally in R.
test_that("column binary mathfunctions", {
  lines <- c("{\"a\":1, \"b\":5}",
             "{\"a\":2, \"b\":6}",
             "{\"a\":3, \"b\":7}",
             "{\"a\":4, \"b\":8}")
  jsonPathWithDup <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
  writeLines(lines, jsonPathWithDup)
  df <- read.json(jsonPathWithDup)

  expect_equal(collect(select(df, atan2(df$a, df$b)))[1, "ATAN2(a, b)"], atan2(1, 5))
  expect_equal(collect(select(df, atan2(df$a, df$b)))[2, "ATAN2(a, b)"], atan2(2, 6))
  expect_equal(collect(select(df, atan2(df$a, df$b)))[3, "ATAN2(a, b)"], atan2(3, 7))
  expect_equal(collect(select(df, atan2(df$a, df$b)))[4, "ATAN2(a, b)"], atan2(4, 8))
  ## nolint start
  expect_equal(collect(select(df, hypot(df$a, df$b)))[1, "HYPOT(a, b)"], sqrt(1^2 + 5^2))
  expect_equal(collect(select(df, hypot(df$a, df$b)))[2, "HYPOT(a, b)"], sqrt(2^2 + 6^2))
  expect_equal(collect(select(df, hypot(df$a, df$b)))[3, "HYPOT(a, b)"], sqrt(3^2 + 7^2))
  expect_equal(collect(select(df, hypot(df$a, df$b)))[4, "HYPOT(a, b)"], sqrt(4^2 + 8^2))
  ## nolint end

  # Bit shifts on the fourth row, where b is 8.
  expect_equal(collect(select(df, shiftLeft(df$b, 1)))[4, 1], 16)
  expect_equal(collect(select(df, shiftRight(df$b, 1)))[4, 1], 4)
  expect_equal(collect(select(df, shiftRightUnsigned(df$b, 1)))[4, 1], 4)

  # Unseeded generators are only type-checked; seeded ones are compared with a tolerance.
  expect_equal(class(collect(select(df, rand()))[2, 1]), "numeric")
  expect_equal(collect(select(df, rand(1)))[1, 1], 0.134, tolerance = 0.01)
  expect_equal(class(collect(select(df, randn()))[2, 1]), "numeric")
  expect_equal(collect(select(df, randn(1)))[1, 1], -1.03, tolerance = 0.01)

  # Remove the temporary fixture now that every query has been collected.
  unlink(jsonPathWithDup)
})
# Checks string predicates and string functions on Columns: like/startsWith/endsWith/substr,
# cast and concatenation, formatting, digests, regular expressions, locate, padding,
# substring_index and translate.
# NOTE(review): no closing `})` for this test_that() call is visible in this chunk.
test_that("string operators", {
df <- read.json(jsonPath)
expect_equal(count(where(df, like(df$name, "A%"))), 1)
expect_equal(count(where(df, startsWith(df$name, "A"))), 1)
expect_true(first(select(df, startsWith(df$name, "M")))[[1]])
expect_false(first(select(df, startsWith(df$name, "m")))[[1]])
expect_true(first(select(df, endsWith(df$name, "el")))[[1]])
expect_equal(first(select(df, substr(df$name, 1, 2)))[[1]], "Mi")
# Guarded check that startsWith/endsWith still work on plain character vectors.
# NOTE(review): major and minor are compared separately, so a version with major > 3 and
# minor < 3 does not satisfy this guard -- confirm whether that is intended.
# NOTE(review): no closing brace for this `if` is visible in this chunk.
if (as.numeric(R.version$major) >= 3 && as.numeric(R.version$minor) >= 3) {
expect_true(startsWith("Hello World", "Hello"))
expect_false(endsWith("Hello World", "a"))
# Casting, concatenation and formatting, checked on the second row.
expect_equal(collect(select(df, cast(df$age, "string")))[[2, 1]], "30")
expect_equal(collect(select(df, concat(df$name, lit(":"), df$age)))[[2, 1]], "Andy:30")
expect_equal(collect(select(df, concat_ws(":", df$name)))[[2, 1]], "Andy")
expect_equal(collect(select(df, concat_ws(":", df$name, df$age)))[[2, 1]], "Andy:30")
expect_equal(collect(select(df, instr(df$name, "i")))[, 1], c(2, 0, 5))
expect_equal(collect(select(df, format_number(df$age, 2)))[2, 1], "30.00")
# NOTE(review): the next two expect_equal() calls end with a trailing comma; the expected
# digest strings that should follow them are not visible in this chunk.
expect_equal(collect(select(df, sha1(df$name)))[2, 1],
expect_equal(collect(select(df, sha2(df$name, 256)))[2, 1],
expect_equal(collect(select(df, format_string("Name:%s", df$name)))[2, 1], "Name:Andy")
expect_equal(collect(select(df, format_string("%s, %d", df$name, df$age)))[2, 1], "Andy, 30")
expect_equal(collect(select(df, regexp_extract(df$name, "(n.y)", 1)))[2, 1], "ndy")
expect_equal(collect(select(df, regexp_replace(df$name, "(n.y)", "ydn")))[2, 1], "Aydn")
# locate() and padding on a single-row frame.
l2 <- list(list(a = "aaads"))
df2 <- createDataFrame(l2)
expect_equal(collect(select(df2, locate("aa", df2$a)))[1, 1], 1)
expect_equal(collect(select(df2, locate("aa", df2$a, 2)))[1, 1], 2)
expect_equal(collect(select(df2, lpad(df2$a, 8, "#")))[1, 1], "###aaads") # nolint
expect_equal(collect(select(df2, rpad(df2$a, 8, "#")))[1, 1], "aaads###") # nolint
# substring_index() with positive and negative counts, and translate().
l3 <- list(list(a = "a.b.c.d"))
df3 <- createDataFrame(l3)
expect_equal(collect(select(df3, substring_index(df3$a, ".", 2)))[1, 1], "a.b")
expect_equal(collect(select(df3, substring_index(df3$a, ".", -3)))[1, 1], "b.c.d")
expect_equal(collect(select(df3, translate(df3$a, "bc", "12")))[1, 1], "a.1.2.d")
# Checks date and timestamp column functions. The process time zone is forced
# to UTC for the duration of the test and restored afterwards, even if the body
# stops with an error.
test_that("date functions on a DataFrame", {
  # NA marks "TZ was not set", so it can be unset again rather than set to "".
  .originalTimeZone <- Sys.getenv("TZ", unset = NA)
  Sys.setenv(TZ = "UTC")
  tryCatch({
    l <- list(list(a = 1L, b = as.Date("2012-12-13")),
              list(a = 2L, b = as.Date("2013-12-14")),
              list(a = 3L, b = as.Date("2014-12-15")))
    df <- createDataFrame(l)
    expect_equal(collect(select(df, dayofmonth(df$b)))[, 1], c(13, 14, 15))
    expect_equal(collect(select(df, dayofyear(df$b)))[, 1], c(348, 348, 349))
    expect_equal(collect(select(df, weekofyear(df$b)))[, 1], c(50, 50, 51))
    expect_equal(collect(select(df, year(df$b)))[, 1], c(2012, 2013, 2014))
    expect_equal(collect(select(df, month(df$b)))[, 1], c(12, 12, 12))
    expect_equal(collect(select(df, last_day(df$b)))[, 1],
                 c(as.Date("2012-12-31"), as.Date("2013-12-31"), as.Date("2014-12-31")))
    expect_equal(collect(select(df, next_day(df$b, "MONDAY")))[, 1],
                 c(as.Date("2012-12-17"), as.Date("2013-12-16"), as.Date("2014-12-22")))
    expect_equal(collect(select(df, date_format(df$b, "y")))[, 1], c("2012", "2013", "2014"))
    expect_equal(collect(select(df, add_months(df$b, 3)))[, 1],
                 c(as.Date("2013-03-13"), as.Date("2014-03-14"), as.Date("2015-03-15")))
    expect_equal(collect(select(df, date_add(df$b, 1)))[, 1],
                 c(as.Date("2012-12-14"), as.Date("2013-12-15"), as.Date("2014-12-16")))
    expect_equal(collect(select(df, date_sub(df$b, 1)))[, 1],
                 c(as.Date("2012-12-12"), as.Date("2013-12-13"), as.Date("2014-12-14")))

    # Timestamp functions.
    l2 <- list(list(a = 1L, b = as.POSIXlt("2012-12-13 12:34:00", tz = "UTC")),
               list(a = 2L, b = as.POSIXlt("2014-12-15 01:24:34", tz = "UTC")))
    df2 <- createDataFrame(l2)
    expect_equal(collect(select(df2, minute(df2$b)))[, 1], c(34, 24))
    expect_equal(collect(select(df2, second(df2$b)))[, 1], c(0, 34))
    expect_equal(collect(select(df2, from_utc_timestamp(df2$b, "JST")))[, 1],
                 c(as.POSIXlt("2012-12-13 21:34:00 UTC"), as.POSIXlt("2014-12-15 10:24:34 UTC")))
    expect_equal(collect(select(df2, to_utc_timestamp(df2$b, "JST")))[, 1],
                 c(as.POSIXlt("2012-12-13 03:34:00 UTC"), as.POSIXlt("2014-12-14 16:24:34 UTC")))
    expect_gt(collect(select(df2, unix_timestamp()))[1, 1], 0)
    expect_gt(collect(select(df2, unix_timestamp(df2$b)))[1, 1], 0)
    expect_gt(collect(select(df2, unix_timestamp(lit("2015-01-01"), "yyyy-MM-dd")))[1, 1], 0)

    # from_unixtime() output is only checked for its format, not its value.
    l3 <- list(list(a = 1000), list(a = -1000))
    df3 <- createDataFrame(l3)
    result31 <- collect(select(df3, from_unixtime(df3$a)))
    expect_equal(grep("\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}", result31[, 1], perl = TRUE),
                 c(1, 2))
    result32 <- collect(select(df3, from_unixtime(df3$a, "yyyy")))
    expect_equal(grep("\\d{4}", result32[, 1]), c(1, 2))
  }, finally = {
    # Put the time zone back exactly as it was found.
    if (is.na(.originalTimeZone)) {
      Sys.unsetenv("TZ")
    } else {
      Sys.setenv(TZ = .originalTimeZone)
    }
  })
})
# Checks row-wise greatest() and least() across two numeric columns.
test_that("greatest() and least() on a DataFrame", {
  l <- list(list(a = 1, b = 2), list(a = 3, b = 4))
  df <- createDataFrame(l)
  expect_equal(collect(select(df, greatest(df$a, df$b)))[, 1], c(2, 4))
  expect_equal(collect(select(df, least(df$a, df$b)))[, 1], c(1, 3))
})
# Calls window() with window duration, slide duration and start time all given.
test_that("time windowing (window()) with all inputs", {
  df <- createDataFrame(data.frame(t = c("2016-03-11 09:00:07"), v = c(1)))
  df$window <- window(df$t, "5 seconds", "5 seconds", "0 seconds")
  local <- collect(df)$v
  # Not checking time windows because of possible time zone issues. Just checking that the function
  # works
  expect_equal(local, c(1))
})
# Calls window() with a slide duration shorter than the window duration; the
# single input row is expected to appear twice in the result.
test_that("time windowing (window()) with slide duration", {
  df <- createDataFrame(data.frame(t = c("2016-03-11 09:00:07"), v = c(1)))
  df$window <- window(df$t, "5 seconds", "2 seconds")
  local <- collect(df)$v
  # Not checking time windows because of possible time zone issues. Just checking that the function
  # works
  expect_equal(local, c(1, 1))
})
# Calls window() with a window duration and a named startTime argument.
test_that("time windowing (window()) with start time", {
  df <- createDataFrame(data.frame(t = c("2016-03-11 09:00:07"), v = c(1)))
  df$window <- window(df$t, "5 seconds", startTime = "2 seconds")
  local <- collect(df)$v
  # Not checking time windows because of possible time zone issues. Just checking that the function
  # works
  expect_equal(local, c(1))
})
# Calls window() with only the window duration.
test_that("time windowing (window()) with just window duration", {
  df <- createDataFrame(data.frame(t = c("2016-03-11 09:00:07"), v = c(1)))
  df$window <- window(df$t, "5 seconds")
  local <- collect(df)$v
  # Not checking time windows because of possible time zone issues. Just checking that the function
  # works
  expect_equal(local, c(1))
})
# Checks conditional column expressions with plain R literals as values.
test_that("when(), otherwise() and ifelse() on a DataFrame", {
  l <- list(list(a = 1, b = 2), list(a = 3, b = 4))
  df <- createDataFrame(l)
  # when() without otherwise() is expected to give NA where the condition is false.
  expect_equal(collect(select(df, when(df$a > 1 & df$b > 2, 1)))[, 1], c(NA, 1))
  expect_equal(collect(select(df, otherwise(when(df$a > 1, 1), 0)))[, 1], c(0, 1))
  expect_equal(collect(select(df, ifelse(df$a > 1 & df$b > 2, 0, 1)))[, 1], c(1, 0))
})
# Same checks as the literal-valued variant, but with the values passed as
# lit() Columns.
test_that("when(), otherwise() and ifelse() with column on a DataFrame", {
  l <- list(list(a = 1, b = 2), list(a = 3, b = 4))
  df <- createDataFrame(l)
  expect_equal(collect(select(df, when(df$a > 1 & df$b > 2, lit(1))))[, 1], c(NA, 1))
  expect_equal(collect(select(df, otherwise(when(df$a > 1, lit(1)), lit(0))))[, 1], c(0, 1))
  expect_equal(collect(select(df, ifelse(df$a > 1 & df$b > 2, lit(0), lit(1))))[, 1], c(1, 0))
})
# Checks agg() on whole frames and on GroupedData (string and Column forms),
# the group_by/summarize aliases, the sum/mean/max/count shortcuts, and that
# stats::sd / stats::var still work on plain R vectors.
test_that("group by, agg functions", {
  df <- read.json(jsonPath)

  # Aggregation over the whole frame produces a single row.
  df1 <- agg(df, name = "max", age = "sum")
  expect_equal(1, count(df1))
  df1 <- agg(df, age2 = max(df$age))
  expect_equal(1, count(df1))
  expect_equal(columns(df1), c("age2"))

  gd <- groupBy(df, "name")
  expect_is(gd, "GroupedData")
  df2 <- count(gd)
  expect_is(df2, "SparkDataFrame")
  expect_equal(3, count(df2))

  # Also test group_by, summarize, mean
  gd1 <- group_by(df, "name")
  expect_is(gd1, "GroupedData")
  df_summarized <- summarize(gd, mean_age = mean(df$age))
  expect_is(df_summarized, "SparkDataFrame")
  expect_equal(3, count(df_summarized))

  # The stddev for the "Andy" group is expected to be NaN.
  df3 <- agg(gd, age = "stddev")
  expect_is(df3, "SparkDataFrame")
  df3_local <- collect(df3)
  expect_true(is.nan(df3_local[df3_local$name == "Andy", ][1, 2]))

  df4 <- agg(gd, sumAge = sum(df$age))
  expect_is(df4, "SparkDataFrame")
  expect_equal(3, count(df4))
  expect_equal(columns(df4), c("name", "sumAge"))

  df5 <- sum(gd, "age")
  expect_is(df5, "SparkDataFrame")
  expect_equal(3, count(df5))

  expect_equal(3, count(mean(gd)))
  expect_equal(3, count(max(gd)))
  expect_equal(30, collect(max(gd))[2, 2])
  expect_equal(1, collect(count(gd))[1, 2])

  # Groups with repeated keys; note that the values are JSON strings.
  mockLines2 <- c("{\"name\":\"ID1\", \"value\": \"10\"}",
                  "{\"name\":\"ID1\", \"value\": \"10\"}",
                  "{\"name\":\"ID1\", \"value\": \"22\"}",
                  "{\"name\":\"ID2\", \"value\": \"-3\"}")
  jsonPath2 <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
  writeLines(mockLines2, jsonPath2)
  gd2 <- groupBy(read.json(jsonPath2), "name")
  df6 <- agg(gd2, value = "sum")
  df6_local <- collect(df6)
  expect_equal(42, df6_local[df6_local$name == "ID1", ][1, 2])
  expect_equal(-3, df6_local[df6_local$name == "ID2", ][1, 2])

  df7 <- agg(gd2, value = "stddev")
  df7_local <- collect(df7)
  expect_true(abs(df7_local[df7_local$name == "ID1", ][1, 2] - 6.928203) < 1e-6)
  expect_true(is.nan(df7_local[df7_local$name == "ID2", ][1, 2]))

  mockLines3 <- c("{\"name\":\"Andy\", \"age\":30}",
                  "{\"name\":\"Andy\", \"age\":30}",
                  "{\"name\":\"Justin\", \"age\":19}",
                  "{\"name\":\"Justin\", \"age\":1}")
  jsonPath3 <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
  writeLines(mockLines3, jsonPath3)
  df8 <- read.json(jsonPath3)
  gd3 <- groupBy(df8, "name")
  gd3_local <- collect(sum(gd3))
  expect_equal(60, gd3_local[gd3_local$name == "Andy", ][1, 2])
  expect_equal(20, gd3_local[gd3_local$name == "Justin", ][1, 2])

  expect_true(abs(collect(agg(df, sd(df$age)))[1, 1] - 7.778175) < 1e-6)
  gd3_local <- collect(agg(gd3, var(df8$age)))
  expect_equal(162, gd3_local[gd3_local$name == "Justin", ][1, 2])

  # Test stats::sd, stats::var are working
  expect_true(abs(sd(1:2) - 0.7071068) < 1e-6)
  expect_true(abs(var(1:5, 1:5) - 2.5) < 1e-6)

  # Remove the temporary fixtures now that every query has been collected.
  unlink(jsonPath2)
  unlink(jsonPath3)
})
# Checks pivot() on GroupedData with inferred pivot values, with explicit
# values given as a vector or a list, with a single value, and that duplicated
# pivot values raise an error.
test_that("pivot GroupedData column", {
  df <- createDataFrame(data.frame(
    earnings = c(10000, 10000, 11000, 15000, 12000, 20000, 21000, 22000),
    course = c("R", "Python", "R", "Python", "R", "Python", "R", "Python"),
    year = c(2013, 2013, 2014, 2014, 2015, 2015, 2016, 2016)
  ))
  sum1 <- collect(sum(pivot(groupBy(df, "year"), "course"), "earnings"))
  sum2 <- collect(sum(pivot(groupBy(df, "year"), "course", c("Python", "R")), "earnings"))
  sum3 <- collect(sum(pivot(groupBy(df, "year"), "course", list("Python", "R")), "earnings"))
  sum4 <- collect(sum(pivot(groupBy(df, "year"), "course", "R"), "earnings"))

  correct_answer <- data.frame(
    year = c(2013, 2014, 2015, 2016),
    Python = c(10000, 15000, 20000, 22000),
    R = c(10000, 11000, 12000, 21000)
  )
  expect_equal(sum1, correct_answer)
  expect_equal(sum2, correct_answer)
  expect_equal(sum3, correct_answer)
  # Pivoting on a single value keeps only that value's column.
  expect_equal(sum4, correct_answer[, c("year", "R")])

  expect_error(collect(sum(pivot(groupBy(df, "year"), "course", c("R", "R")), "earnings")))
  expect_error(collect(sum(pivot(groupBy(df, "year"), "course", list("R", "R")), "earnings")))
})
# Checks sorting with arrange()/orderBy() by Column, by column name, with
# asc()/desc(), and with scalar or per-column `decreasing` flags.
test_that("arrange() and orderBy() on a DataFrame", {
  df <- read.json(jsonPath)

  sorted <- arrange(df, df$age)
  expect_equal(collect(sorted)[1, 2], "Michael")

  sorted2 <- arrange(df, "name", decreasing = FALSE)
  expect_equal(collect(sorted2)[2, "age"], 19)

  sorted3 <- orderBy(df, asc(df$age))
  expect_equal(collect(sorted3)[2, "age"], 19)

  sorted4 <- orderBy(df, desc(df$name))
  expect_equal(first(sorted4)$name, "Michael")
  expect_equal(collect(sorted4)[3, "name"], "Andy")

  sorted5 <- arrange(df, "age", "name", decreasing = TRUE)
  expect_equal(collect(sorted5)[1, 2], "Andy")

  # One `decreasing` flag per sort column.
  sorted6 <- arrange(df, "age", "name", decreasing = c(TRUE, FALSE))
  expect_equal(collect(sorted6)[1, 2], "Andy")

  sorted7 <- arrange(df, "name", decreasing = FALSE)
  expect_equal(collect(sorted7)[2, "age"], 19)
})
# Checks filter()/where() with SQL condition strings and with Column
# conditions, including set membership.
test_that("filter() on a DataFrame", {
  df <- read.json(jsonPath)

  filtered <- filter(df, "age > 20")
  expect_equal(count(filtered), 1)
  expect_equal(collect(filtered)$name, "Andy")

  filtered2 <- where(df, df$name != "Michael")
  expect_equal(count(filtered2), 2)
  expect_equal(collect(filtered2)$age[2], 19)

  # test suites for %in%
  filtered3 <- filter(df, "age in (19)")
  expect_equal(count(filtered3), 1)
  filtered4 <- filter(df, "age in (19, 30)")
  expect_equal(count(filtered4), 2)
  filtered5 <- where(df, df$age %in% c(19))
  expect_equal(count(filtered5), 1)
  filtered6 <- where(df, df$age %in% c(19, 30))
  expect_equal(count(filtered6), 2)

  # Test stats::filter is working
  #expect_true(is.ts(filter(1:100, rep(1, 3)))) # nolint
})
# Covers join() with and without a join expression and with each join-type
# string used below, crossJoin(), and merge() with its by/all/suffixes/sort
# arguments.
# NOTE(review): this block looks truncated in this copy of the file. The
# closing `})` is absent and four `expect_true(, ...)` calls are missing the
# text before their first comma. Restore them from the upstream file before
# running.
test_that("join(), crossJoin() and merge() on a DataFrame", {
df <- read.json(jsonPath)
# Second data set: four names, three of which are matched by the joins below.
mockLines2 <- c("{\"name\":\"Michael\", \"test\": \"yes\"}",
"{\"name\":\"Andy\", \"test\": \"no\"}",
"{\"name\":\"Justin\", \"test\": \"yes\"}",
"{\"name\":\"Bob\", \"test\": \"yes\"}")
jsonPath2 <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
writeLines(mockLines2, jsonPath2)
df2 <- read.json(jsonPath2)
# inner join, not cartesian join
expect_equal(count(where(join(df, df2), df$name == df2$name)), 3)
# cartesian join
# join() without a condition must be rejected; crossJoin() is the explicit form.
expect_error(tryCatch(count(join(df, df2)), error = function(e) { stop(e) }),
paste0(".*(org.apache.spark.sql.AnalysisException: Detected cartesian product for",
" INNER join between logical plans).*"))
joined <- crossJoin(df, df2)
expect_equal(names(joined), c("age", "name", "name", "test"))
expect_equal(count(joined), 12)
expect_equal(names(collect(joined)), c("age", "name", "name", "test"))
# Join expression without an explicit join type
joined2 <- join(df, df2, df$name == df2$name)
expect_equal(names(joined2), c("age", "name", "name", "test"))
expect_equal(count(joined2), 3)
joined3 <- join(df, df2, df$name == df2$name, "rightouter")
expect_equal(names(joined3), c("age", "name", "name", "test"))
expect_equal(count(joined3), 4)
# NOTE(review): first argument truncated (text before the comma is missing).
expect_true(, joined3$age))$age[2]))
# Outer join followed by a projection with a derived, aliased column
joined4 <- select(join(df, df2, df$name == df2$name, "outer"),
alias(df$age + 5, "newAge"), df$name, df2$test)
expect_equal(names(joined4), c("newAge", "name", "test"))
expect_equal(count(joined4), 4)
expect_equal(collect(orderBy(joined4, joined4$name))$newAge[3], 24)
joined5 <- join(df, df2, df$name == df2$name, "leftouter")
expect_equal(names(joined5), c("age", "name", "name", "test"))
expect_equal(count(joined5), 3)
# NOTE(review): first argument truncated (text before the comma is missing).
expect_true(, joined5$age))$age[1]))
joined6 <- join(df, df2, df$name == df2$name, "inner")
expect_equal(names(joined6), c("age", "name", "name", "test"))
expect_equal(count(joined6), 3)
# A left semi join returns only the columns of the left side.
joined7 <- join(df, df2, df$name == df2$name, "leftsemi")
expect_equal(names(joined7), c("age", "name"))
expect_equal(count(joined7), 3)
# Underscore spellings of the outer join types
joined8 <- join(df, df2, df$name == df2$name, "left_outer")
expect_equal(names(joined8), c("age", "name", "name", "test"))
expect_equal(count(joined8), 3)
# NOTE(review): first argument truncated (text before the comma is missing).
expect_true(, joined8$age))$age[1]))
joined9 <- join(df, df2, df$name == df2$name, "right_outer")
expect_equal(names(joined9), c("age", "name", "name", "test"))
expect_equal(count(joined9), 4)
# NOTE(review): first argument truncated (text before the comma is missing).
expect_true(, joined9$age))$age[2]))
# merge(): the shared key column gets the default suffixes _x / _y
merged <- merge(df, df2, by.x = "name", by.y = "name", all.x = TRUE, all.y = TRUE)
expect_equal(count(merged), 4)
expect_equal(names(merged), c("age", "name_x", "name_y", "test"))
expect_equal(collect(orderBy(merged, merged$name_x))$age[3], 19)
# merge() with custom suffixes
merged <- merge(df, df2, suffixes = c("-X", "-Y"))
expect_equal(count(merged), 3)
expect_equal(names(merged), c("age", "name-X", "name-Y", "test"))
expect_equal(collect(orderBy(merged, merged$"name-X"))$age[1], 30)
merged <- merge(df, df2, by = "name", suffixes = c("-X", "-Y"), sort = FALSE)
expect_equal(count(merged), 3)
expect_equal(names(merged), c("age", "name-X", "name-Y", "test"))
expect_equal(collect(orderBy(merged, merged$"name-Y"))$"name-X"[3], "Michael")
merged <- merge(df, df2, by = "name", all = T, sort = T)
expect_equal(count(merged), 4)
expect_equal(names(merged), c("age", "name_x", "name_y", "test"))
expect_equal(collect(orderBy(merged, merged$"name_y"))$"name_x"[1], "Andy")
# by = NULL yields every combination of rows (3 x 4 = 12).
merged <- merge(df, df2, by = NULL)
expect_equal(count(merged), 12)
expect_equal(names(merged), c("age", "name", "name", "test"))
# A right-hand side that already has a `name_y` column collides with the
# default suffix, so merge() is expected to fail.
mockLines3 <- c("{\"name\":\"Michael\", \"name_y\":\"Michael\", \"test\": \"yes\"}",
"{\"name\":\"Andy\", \"name_y\":\"Andy\", \"test\": \"no\"}",
"{\"name\":\"Justin\", \"name_y\":\"Justin\", \"test\": \"yes\"}",
"{\"name\":\"Bob\", \"name_y\":\"Bob\", \"test\": \"yes\"}")
jsonPath3 <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
writeLines(mockLines3, jsonPath3)
df3 <- read.json(jsonPath3)
# The two message fragments are pasted with sep = "", so the expected pattern
# has no space between "'DataFrame'." and "Please".
expect_error(merge(df, df3),
paste("The following column name: name_y occurs more than once in the 'DataFrame'.",
"Please use different suffixes for the intersected columns.", sep = ""))
# toJSON() must return a SparkDataFrame with a single `value` column holding
# one JSON document per input row.
test_that("toJSON() on DataFrame", {
  carsDF <- as.DataFrame(cars)
  jsonDF <- toJSON(carsDF)

  expect_is(jsonDF, "SparkDataFrame")
  expect_equal(colnames(jsonDF), c("value"))

  firstRow <- data.frame(value = "{\"speed\":4.0,\"dist\":2.0}", stringsAsFactors = FALSE)
  expect_equal(head(jsonDF, 1), firstRow)
})
# Compares the text printed by showDF() with hand-written tables, first with
# default settings and then with values truncated to three characters.
# NOTE(review): the two expected tables are not parallel in this copy.
# `expected` has a header row but no row matching the "|nul| Mic|" record,
# while `expected2` has that record but no header row. The spacing inside the
# cells also looks collapsed, and the closing `})` is absent. Lines were
# probably lost; verify both tables against the upstream file.
test_that("showDF()", {
df <- read.json(jsonPath)
# Expected output of showDF(df)
expected <- paste("+----+-------+\n",
"| age| name|\n",
"| 30| Andy|\n",
"| 19| Justin|\n",
"+----+-------+\n", sep = "")
# Expected output of showDF(df, truncate = 3)
expected2 <- paste("+---+----+\n",
"|nul| Mic|\n",
"| 30| And|\n",
"| 19| Jus|\n",
"+---+----+\n", sep = "")
expect_output(showDF(df), expected)
expect_output(showDF(df, truncate = 3), expected2)
# isLocal() reports whether collect()/take() can run without Spark executors.
# A DataFrame read from a JSON file is not a local relation, so the expected
# answer is FALSE. Without an expectation the test body would be reported as
# empty and verify nothing.
test_that("isLocal()", {
  df <- read.json(jsonPath)
  expect_false(isLocal(df))
})
# Set-style operations between the shared JSON fixture and a second data set
# that has exactly one record ("Andy", 30) in common with it. Also verifies
# that the base R functions masked by SparkR keep working on plain vectors.
test_that("union(), rbind(), except(), and intersect() on a DataFrame", {
  df <- read.json(jsonPath)

  lines <- c("{\"name\":\"Bob\", \"age\":24}",
             "{\"name\":\"Andy\", \"age\":30}",
             "{\"name\":\"James\", \"age\":35}")
  jsonPath2 <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
  writeLines(lines, jsonPath2)
  df2 <- read.df(jsonPath2, "json")

  # union() keeps duplicates: 3 + 3 rows
  unioned <- arrange(union(df, df2), df$age)
  expect_is(unioned, "SparkDataFrame")
  expect_equal(count(unioned), 6)
  expect_equal(first(unioned)$name, "Michael")
  # unionAll() is called under suppressWarnings() and must return the same rows
  expect_equal(count(arrange(suppressWarnings(unionAll(df, df2)), df$age)), 6)

  # rbind() accepts more than two SparkDataFrames
  unioned2 <- arrange(rbind(unioned, df, df2), df$age)
  expect_is(unioned2, "SparkDataFrame")
  expect_equal(count(unioned2), 12)
  expect_equal(first(unioned2)$name, "Michael")

  # rbind() refuses inputs whose column names differ
  df3 <- df2
  names(df3)[1] <- "newName"
  expect_error(rbind(df, df3),
               "Names of input data frames are different.")
  expect_error(rbind(df, df2, df3),
               "Names of input data frames are different.")

  # The class checks below must target the result under test; they
  # previously re-checked `unioned` after both except() and intersect().
  excepted <- arrange(except(df, df2), desc(df$age))
  expect_is(excepted, "SparkDataFrame")
  expect_equal(count(excepted), 2)
  expect_equal(first(excepted)$name, "Justin")

  intersected <- arrange(intersect(df, df2), df$age)
  expect_is(intersected, "SparkDataFrame")
  expect_equal(count(intersected), 1)
  expect_equal(first(intersected)$name, "Andy")

  # Test base::union is working
  expect_equal(union(c(1:3), c(3:5)), c(1:5))

  # Test base::rbind is working
  expect_equal(length(rbind(1:4, c = 2, a = 10, 10, deparse.level = 0)), 16)

  # Test base::intersect is working
  expect_equal(length(intersect(1:20, 3:23)), 18)
})
# withColumn() is checked for adding a column, replacing an existing one (by
# expression and by literal) and rejecting a non-atomic literal;
# withColumnRenamed() is checked for renaming in place.
test_that("withColumn() and withColumnRenamed()", {
  people <- read.json(jsonPath)
  # First row of `d` once the "Michael" record is filtered out
  firstNonMichael <- function(d) first(filter(d, people$name != "Michael"))

  added <- withColumn(people, "newAge", people$age + 2)
  addedCols <- columns(added)
  expect_equal(length(addedCols), 3)
  expect_equal(addedCols[3], "newAge")
  expect_equal(firstNonMichael(added)$newAge, 32)

  # Replace existing column
  replaced <- withColumn(people, "age", people$age + 2)
  expect_equal(length(columns(replaced)), 2)
  expect_equal(firstNonMichael(replaced)$age, 32)

  # A bare R value is accepted as a literal column
  literal <- withColumn(people, "age", 18)
  expect_equal(length(columns(literal)), 2)
  expect_equal(first(literal)$age, 18)

  expect_error(withColumn(people, "age", list("a")),
               "Literal value must be atomic in length of 1")

  renamed <- withColumnRenamed(people, "age", "newerAge")
  renamedCols <- columns(renamed)
  expect_equal(length(renamedCols), 2)
  expect_equal(renamedCols[1], "newerAge")
})
# Covers mutate() (new, replaced, repeated and unnamed columns), rename(),
# `names<-` and transform() on a SparkDataFrame, then checks that the base R
# transform() still works on local data.
test_that("mutate(), transform(), rename() and names()", {
  df <- read.json(jsonPath)

  newDF <- mutate(df, newAge = df$age + 2)
  expect_equal(length(columns(newDF)), 3)
  expect_equal(columns(newDF)[3], "newAge")
  expect_equal(first(filter(newDF, df$name != "Michael"))$newAge, 32)

  # Replace one column and add another in the same call
  newDF <- mutate(df, age = df$age + 2, newAge = df$age + 3)
  expect_equal(length(columns(newDF)), 3)
  expect_equal(columns(newDF)[3], "newAge")
  expect_equal(first(filter(newDF, df$name != "Michael"))$newAge, 33)
  expect_equal(first(filter(newDF, df$name != "Michael"))$age, 32)

  # When a name is given twice, the last definition is the one asserted
  newDF <- mutate(df, age = df$age + 2, newAge = df$age + 3,
                  age = df$age + 4, newAge = df$age + 5)
  expect_equal(length(columns(newDF)), 3)
  expect_equal(columns(newDF)[3], "newAge")
  expect_equal(first(filter(newDF, df$name != "Michael"))$newAge, 35)
  expect_equal(first(filter(newDF, df$name != "Michael"))$age, 34)

  # An unnamed expression is named after its deparsed text
  newDF <- mutate(df, df$age + 3)
  expect_equal(length(columns(newDF)), 3)
  expect_equal(columns(newDF)[[3]], "df$age + 3")
  expect_equal(first(filter(newDF, df$name != "Michael"))[[3]], 33)

  newDF2 <- rename(df, newerAge = df$age)
  expect_equal(length(columns(newDF2)), 2)
  expect_equal(columns(newDF2)[1], "newerAge")

  names(newDF2) <- c("newerName", "evenNewerAge")
  expect_equal(length(names(newDF2)), 2)
  expect_equal(names(newDF2)[1], "newerName")

  transformedDF <- transform(df, newAge = -df$age, newAge2 = df$age / 2)
  expect_equal(length(columns(transformedDF)), 4)
  expect_equal(columns(transformedDF)[3], "newAge")
  expect_equal(columns(transformedDF)[4], "newAge2")
  expect_equal(first(filter(transformedDF, transformedDF$name == "Andy"))$newAge, -30)

  # test if base::transform on local data frames works
  # ensure the proper signature is used - otherwise this will fail to run
  # `Ozone` is referenced through the built-in `airquality` data set: a bare
  # `Ozone` is not defined in this test, and qualifying it avoids having to
  # attach() the data set to the search path.
  result <- transform(airquality$Ozone, logOzone = log(airquality$Ozone))
  expect_equal(nrow(result), 153)
  expect_equal(ncol(result), 2)
})
# Round-trips the JSON fixture through ORC, once with the generic
# write.df()/read.df() pair and once with write.orc()/read.orc().
test_that("read/write ORC files", {
  source <- read.df(jsonPath, "json")
  sourceRows <- count(source)

  # Test write.df and read.df
  write.df(source, orcPath, "orc", mode = "overwrite")
  viaGeneric <- read.df(orcPath, "orc")
  expect_is(viaGeneric, "SparkDataFrame")
  expect_equal(sourceRows, count(viaGeneric))

  # Test write.orc and read.orc
  orcPath2 <- tempfile(pattern = "orcPath2", fileext = ".orc")
  write.orc(source, orcPath2)
  viaOrc <- read.orc(orcPath2)
  expect_is(viaOrc, "SparkDataFrame")
  expect_equal(count(viaOrc), count(source))
})
# Writing ORC with compression = "ZLIB" must round-trip the data and produce
# at least one output file whose name matches ".zlib.orc".
test_that("read/write ORC files - compression option", {
  source <- read.df(jsonPath, "json")

  target <- tempfile(pattern = "orcPath2", fileext = ".orc")
  write.orc(source, target, compression = "ZLIB")

  readBack <- read.orc(target)
  expect_is(readBack, "SparkDataFrame")
  expect_equal(count(readBack), count(source))

  compressedFiles <- list.files(target, pattern = ".zlib.orc")
  expect_true(length(compressedFiles) > 0)
})
# Round-trips the JSON fixture through Parquet using the generic
# write.df()/read.df() pair, the dedicated write.parquet()/read.parquet()
# functions and the older saveAsParquetFile()/parquetFile() names (called
# under suppressWarnings()). Finally checks that write.df() accepts options
# passed through variables.
test_that("read/write Parquet files", {
  df <- read.df(jsonPath, "json")

  # Test write.df and read.df
  write.df(df, parquetPath, "parquet", mode = "overwrite")
  df2 <- read.df(parquetPath, "parquet")
  expect_is(df2, "SparkDataFrame")
  expect_equal(count(df2), 3)

  # Test write.parquet/saveAsParquetFile and read.parquet/parquetFile
  parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet")
  write.parquet(df, parquetPath2)
  parquetPath3 <- tempfile(pattern = "parquetPath3", fileext = ".parquet")
  suppressWarnings(saveAsParquetFile(df, parquetPath3))
  # Reading two locations at once yields the rows of both
  parquetDF <- read.parquet(c(parquetPath2, parquetPath3))
  expect_is(parquetDF, "SparkDataFrame")
  expect_equal(count(parquetDF), count(df) * 2)
  parquetDF2 <- suppressWarnings(parquetFile(parquetPath2, parquetPath3))
  expect_is(parquetDF2, "SparkDataFrame")
  expect_equal(count(parquetDF2), count(df) * 2)

  # Test if varargs works with variables
  saveMode <- "overwrite"
  mergeSchema <- "true"
  # Write to the path created for this step. It was previously created with
  # the pattern "parquetPath3" and then ignored, with the write going to
  # parquetPath3 instead.
  parquetPath4 <- tempfile(pattern = "parquetPath4", fileext = ".parquet")
  write.df(df, parquetPath4, "parquet", mode = saveMode, mergeSchema = mergeSchema)
})
# write.parquet() is checked with compression = "GZIP" (an output file
# matching ".gz.parquet" must exist) and then with mode = "overwrite" on the
# same location.
test_that("read/write Parquet files - compression option/mode", {
  source <- read.df(jsonPath, "json")
  target <- tempfile(pattern = "tempPath", fileext = ".parquet")

  # Test write.df and read.df
  write.parquet(source, target, compression = "GZIP")
  gzipped <- read.parquet(target)
  expect_is(gzipped, "SparkDataFrame")
  expect_equal(count(gzipped), 3)
  gzFiles <- list.files(target, pattern = ".gz.parquet")
  expect_true(length(gzFiles) > 0)

  # Writing again to the same location with mode = "overwrite"
  write.parquet(source, target, mode = "overwrite")
  overwritten <- read.parquet(target)
  expect_is(overwritten, "SparkDataFrame")
  expect_equal(count(overwritten), 3)
})
# Reads the JSON fixture as plain text (one `value` column), writes it out with
# write.df() and write.text(), and reads both outputs back in one call.
test_that("read/write text files", {
  # Test write.df and read.df
  textDF <- read.df(jsonPath, "text")
  expect_is(textDF, "SparkDataFrame")
  expect_equal(colnames(textDF), c("value"))
  expect_equal(count(textDF), 3)

  textPath <- tempfile(pattern = "textPath", fileext = ".txt")
  write.df(textDF, textPath, "text", mode = "overwrite")

  # Test write.text and read.text
  textPath2 <- tempfile(pattern = "textPath2", fileext = ".txt")
  write.text(textDF, textPath2)

  bothOutputs <- c(textPath, textPath2)
  combined <- read.text(bothOutputs)
  expect_is(combined, "SparkDataFrame")
  expect_equal(colnames(combined), c("value"))
  expect_equal(count(combined), count(textDF) * 2)
})
# Writing text with compression = "GZIP" must round-trip the rows and leave at
# least one output file whose name matches ".gz".
test_that("read/write text files - compression option", {
  source <- read.df(jsonPath, "text")

  target <- tempfile(pattern = "textPath", fileext = ".txt")
  write.text(source, target, compression = "GZIP")

  readBack <- read.text(target)
  expect_is(readBack, "SparkDataFrame")
  expect_equal(count(readBack), count(source))

  gzFiles <- list.files(target, pattern = ".gz")
  expect_true(length(gzFiles) > 0)
})
# describe() and summary() return a SparkDataFrame of statistics whose values
# are strings; rows are addressed by position and by the `summary` label
# column. Also checks that base::summary still works on a local data frame.
test_that("describe() and summarize() on a DataFrame", {
  df <- read.json(jsonPath)

  # Statistics for a single column
  stats <- describe(df, "age")
  expect_equal(collect(stats)[1, "summary"], "count")
  expect_equal(collect(stats)[2, "age"], "24.5")
  expect_equal(collect(stats)[3, "age"], "7.7781745930520225")

  # Statistics for all columns
  stats <- describe(df)
  expect_equal(collect(stats)[4, "summary"], "min")
  expect_equal(collect(stats)[5, "age"], "30")

  stats2 <- summary(df)
  expect_equal(collect(stats2)[4, "summary"], "min")
  expect_equal(collect(stats2)[5, "age"], "30")

  # SPARK-16425: SparkR summary() fails on column of type logical
  # The logical column was previously created but summary() was never called
  # on it, so the regression was not exercised. A regexp of NA asserts that
  # no error is raised.
  df <- withColumn(df, "boolean", df$age == 30)
  expect_error(summary(df), NA)

  # Test base::summary is working
  expect_equal(length(summary(attenu, digits = 4)), 35)
})
# Compares dropna() and na.omit() on the NA fixture with the same row
# selection done locally on the collected data (`rows`), for the `cols`,
# `how` ("any"/"all") and `minNonNulls` arguments.
# NOTE(review): every local row selection in this copy is truncated. The text
# between `!` and `$<column>)` is missing, as in `rows[!$name), ]`. The
# closing `})` is absent too. Restore these expressions from the upstream
# file; the code below does not parse as shown.
test_that("dropna() and na.omit() on a DataFrame", {
df <- read.json(jsonPathNa)
rows <- collect(df)
# drop with columns
expected <- rows[!$name), ]
actual <- collect(dropna(df, cols = "name"))
expect_identical(expected, actual)
actual <- collect(na.omit(df, cols = "name"))
expect_identical(expected, actual)
expected <- rows[!$age), ]
actual <- collect(dropna(df, cols = "age"))
# Align row names so they do not take part in the comparison below.
row.names(expected) <- row.names(actual)
# identical on two dataframes does not work here. Don't know why.
# use identical on all columns as a workaround.
expect_identical(expected$age, actual$age)
expect_identical(expected$height, actual$height)
expect_identical(expected$name, actual$name)
# NOTE(review): this result is overwritten two lines below without being
# asserted in the visible code.
actual <- collect(na.omit(df, cols = "age"))
expected <- rows[!$age) & !$height), ]
actual <- collect(dropna(df, cols = c("age", "height")))
expect_identical(expected, actual)
actual <- collect(na.omit(df, cols = c("age", "height")))
expect_identical(expected, actual)
# Called without `cols`
expected <- rows[!$age) & !$height) & !$name), ]
actual <- collect(dropna(df))
expect_identical(expected, actual)
actual <- collect(na.omit(df))
expect_identical(expected, actual)
# drop with how
expected <- rows[!$age) & !$height) & !$name), ]
actual <- collect(dropna(df))
expect_identical(expected, actual)
actual <- collect(na.omit(df))
expect_identical(expected, actual)
# how = "all": the expected rows are combined with `|` instead of `&`
expected <- rows[!$age) | !$height) | !$name), ]
actual <- collect(dropna(df, "all"))
expect_identical(expected, actual)
actual <- collect(na.omit(df, "all"))
expect_identical(expected, actual)
expected <- rows[!$age) & !$height) & !$name), ]
actual <- collect(dropna(df, "any"))
expect_identical(expected, actual)
actual <- collect(na.omit(df, "any"))
expect_identical(expected, actual)
expected <- rows[!$age) & !$height), ]
actual <- collect(dropna(df, "any", cols = c("age", "height")))
expect_identical(expected, actual)
actual <- collect(na.omit(df, "any", cols = c("age", "height")))
expect_identical(expected, actual)
expected <- rows[!$age) | !$height), ]
actual <- collect(dropna(df, "all", cols = c("age", "height")))
expect_identical(expected, actual)
actual <- collect(na.omit(df, "all", cols = c("age", "height")))
expect_identical(expected, actual)
# drop with threshold
# The expected rows are selected by summing per-column indicators and
# comparing the total with the `minNonNulls` value.
expected <- rows[as.integer(!$age)) + as.integer(!$height)) >= 2, ]
actual <- collect(dropna(df, minNonNulls = 2, cols = c("age", "height")))
expect_identical(expected, actual)
actual <- collect(na.omit(df, minNonNulls = 2, cols = c("age", "height")))
expect_identical(expected, actual)
expected <- rows[as.integer(!$age)) +
as.integer(!$height)) +
as.integer(!$name)) >= 3, ]
actual <- collect(dropna(df, minNonNulls = 3, cols = c("name", "age", "height")))
expect_identical(expected, actual)
actual <- collect(na.omit(df, minNonNulls = 3, cols = c("name", "age", "height")))
expect_identical(expected, actual)
# Test stats::na.omit is working
expect_equal(nrow(na.omit(data.frame(x = c(0, 10, NA)))), 2)
# Compares fillna() on the NA fixture with the same replacement done locally
# on the collected data, for a single fill value (optionally restricted to
# some columns) and for a named list of per-column values.
# NOTE(review): the index expressions of the local replacements are truncated
# in this copy, as in `expected$age[$age)] <- 50`. The closing `})` is absent
# too. Restore them from the upstream file; the code below does not parse as
# shown.
test_that("fillna() on a DataFrame", {
df <- read.json(jsonPathNa)
rows <- collect(df)
# fill with value
# One numeric fill value (50.6) is passed; the expected `age` is 50 while the
# expected `height` is 50.6.
expected <- rows
expected$age[$age)] <- 50
expected$height[$height)] <- 50.6
actual <- collect(fillna(df, 50.6))
expect_identical(expected, actual)
# With a string fill value only `name` is expected to change
expected <- rows
expected$name[$name)] <- "unknown"
actual <- collect(fillna(df, "unknown"))
expect_identical(expected, actual)
# Fill restricted to the listed columns
expected <- rows
expected$age[$age)] <- 50
actual <- collect(fillna(df, 50.6, "age"))
expect_identical(expected, actual)
expected <- rows
expected$name[$name)] <- "unknown"
actual <- collect(fillna(df, "unknown", c("age", "name")))
expect_identical(expected, actual)
# fill with named list
expected <- rows
expected$age[$age)] <- 50
expected$height[$height)] <- 50.6
expected$name[$name)] <- "unknown"
actual <- collect(fillna(df, list("age" = 50, "height" = 50.6, "name" = "unknown")))
expect_identical(expected, actual)
# Builds four (a, b) pairs from 0:3 and checks the contingency table returned
# by crosstab() after sorting it by its label column `a_b`.
test_that("crosstab() on a DataFrame", {
  pairs <- lapply(parallelize(sc, 0:3), function(i) {
    list(paste0("a", i %% 3), paste0("b", i %% 2))
  })
  pairsDF <- toDF(pairs, list("a", "b"))

  table <- crosstab(pairsDF, "a", "b")
  byLabel <- table[order(table$a_b), ]
  row.names(byLabel) <- NULL

  expected <- data.frame("a_b" = c("a0", "a1", "a2"), "b0" = c(1, 0, 1), "b1" = c(1, 1, 0),
                         stringsAsFactors = FALSE, row.names = NULL)
  expect_identical(expected, byLabel)
})
# Uses two perfectly correlated columns (x and 2x for x in 0..9), so the
# covariance is 55 / 3 and the correlation is 1, both within 1e-12.
test_that("cov() and corr() on a DataFrame", {
  pairs <- lapply(c(0:9), function(v) { list(v, v * 2.0) })
  df <- createDataFrame(pairs, c("singles", "doubles"))

  covariance <- cov(df, "singles", "doubles")
  expect_true(abs(covariance - 55.0 / 3) < 1e-12)

  # Default method
  correlation <- corr(df, "singles", "doubles")
  expect_true(abs(correlation - 1.0) < 1e-12)

  # Method named explicitly
  pearson <- corr(df, "singles", "doubles", "pearson")
  expect_true(abs(pearson - 1.0) < 1e-12)

  # Test stats::cov is working
  #expect_true(abs(max(cov(swiss)) - 1739.295) < 1e-3) # nolint
})
# freqItems() is checked on a local data frame in which every third row is
# overwritten with the same values, and on a two-column data set in which
# every even index maps to the same pair.
test_that("freqItems() on a DataFrame", {
  input <- 1:1000
  # FALSE is spelled out: F is an ordinary variable that can be reassigned.
  rdf <- data.frame(numbers = input, letters = as.character(input),
                    negDoubles = input * -1.0, stringsAsFactors = FALSE)
  # Make (1, "1", -1) the dominant row
  rdf[ input %% 3 == 0, ] <- c(1, "1", -1)
  df <- createDataFrame(rdf)

  multiColResults <- freqItems(df, c("numbers", "letters"), support = 0.1)
  expect_true(1 %in% multiColResults$numbers[[1]])
  expect_true("1" %in% multiColResults$letters[[1]])

  singleColResult <- freqItems(df, "negDoubles", support = 0.1)
  expect_true(-1 %in% head(singleColResult$negDoubles)[[1]])

  # Even indices all produce (1L, -1.0); odd indices produce (i, -i)
  l <- lapply(c(0:99), function(i) {
    if (i %% 2 == 0) { list(1L, -1.0) }
    else { list(i, i * -1.0) }})
  df <- createDataFrame(l, c("a", "b"))
  result <- freqItems(df, c("a", "b"), 0.4)
  expect_identical(result[[1]], list(list(1L, 99L)))
  expect_identical(result[[2]], list(list(-1, -99)))
})
# Stratified sampling on a `key` column with values "0", "1", "2". Fractions
# are given for "0" and "1" only, the seed is fixed at 0, and the per-key
# counts of the sample are pinned.
test_that("sampleBy() on a DataFrame", {
  keys <- lapply(c(0:99), function(i) { as.character(i %% 3) })
  keyed <- createDataFrame(keys, "key")

  strata <- list("0" = 0.1, "1" = 0.2)
  sampled <- sampleBy(keyed, "key", strata, 0)

  perKey <- collect(orderBy(count(groupBy(sampled, "key")), "key"))
  expect_identical(as.list(perKey[1, ]), list(key = "0", count = 3))
  expect_identical(as.list(perKey[2, ]), list(key = "1", count = 7))
})
# approxQuantile() with relative error 0.0 on one column, on several columns
# at once, and on columns that contain NA values.
test_that("approxQuantile() on a DataFrame", {
  rowsAB <- lapply(c(0:99), function(i) { list(i, 99 - i) })
  df <- createDataFrame(rowsAB, list("a", "b"))
  probs <- c(0.5, 0.8)

  # Single column: a list with one value per probability
  single <- approxQuantile(df, "a", probs, 0.0)
  expect_equal(single, list(50, 80))

  # Several columns: one such list per column
  multi <- approxQuantile(df, c("a", "b"), probs, 0.0)
  expect_equal(multi[[1]], list(50, 80))
  expect_equal(multi[[2]], list(50, 80))

  # Columns with one NA each
  dfWithNA <- createDataFrame(data.frame(a = c(NA, 30, 19, 11, 28, 15),
                                         b = c(-30, -19, NA, -11, -28, -15)))
  withNA <- approxQuantile(dfWithNA, c("a", "b"), c(0.5), 0.0)
  expect_equal(withNA[[1]], list(28))
  expect_equal(withNA[[2]], list(-15))
})
# Querying a table that does not exist must raise an R error whose text
# contains both the failure reason and the offending table name.
test_that("SQL error message is returned from JVM", {
  caught <- tryCatch(sql("select * from blah"), error = function(cond) cond)
  for (fragment in c("Table or view not found", "blah")) {
    expect_equal(grepl(fragment, caught), TRUE)
  }
})
# File-level fixture: the built-in `iris` data set as a SparkDataFrame, shared
# by several of the tests that follow. Warnings raised by createDataFrame()
# during the conversion are suppressed.
irisDF <- suppressWarnings(createDataFrame(iris))
# Compares the result of a conversion call with collect() on the shared
# `irisDF` fixture and on a row subset of it.
# NOTE(review): this block is truncated in this copy. The first argument of
# both expect_equal() calls and the start of the expect_error() argument are
# missing, the test title and the last comment each appear to have lost a
# word, and the closing `})` is absent. Restore from the upstream file.
test_that("Method as a synonym for collect()", {
expect_equal(, collect(irisDF))
irisDF2 <- irisDF[irisDF$Species == "setosa", ]
expect_equal(, collect(irisDF2))
# Make sure in the R base package is not covered
# A regexp of NA asserts that the wrapped call raises no error.
expect_error(, 2)), NA)
# Checks that the columns of a SparkDataFrame can be referenced by bare name
# (here `age`) and summarised, and that a local variable of the same name can
# be created alongside.
# NOTE(review): no attach() call is visible in this copy although `age` is
# used straight after reading `df`. The second summary(age) also runs after
# `age` has been rebound to a Column. Statements appear to be missing (the
# closing `})` is absent as well); verify against the upstream file.
test_that("attach() on a DataFrame", {
df <- read.json(jsonPath)
expect_is(age, "SparkDataFrame")
expected_age <- data.frame(age = c(NA, 30, 19))
expect_equal(head(age), expected_age)
stat <- summary(age)
expect_equal(collect(stat)[5, "age"], "30")
# Rebinds `age` locally to a Column expression
age <- age$age + 1
expect_is(age, "Column")
stat2 <- summary(age)
expect_equal(collect(stat2)[5, "age"], "30")
# Same statistic through explicit single-column subsetting of `df`
stat3 <- summary(df[, "age", drop = F])
expect_equal(collect(stat3)[5, "age"], "30")
# with() evaluates an expression with the SparkDataFrame's columns available
# by bare name; checked for a list of summaries and for distinct().
test_that("with() on a DataFrame", {
  irisSpark <- suppressWarnings(createDataFrame(iris))

  summaries <- with(irisSpark, list(summary(Sepal_Length), summary(Sepal_Width)))
  lengthSummary <- collect(summaries[[1]])
  expect_equal(lengthSummary[1, "Sepal_Length"], "150")

  distinctLengths <- with(irisSpark, distinct(Sepal_Length))
  expect_equal(nrow(distinctLengths), 35)
})
# coltypes() maps Spark column types to R type names, and `coltypes<-` casts
# columns. Both are checked for primitive and complex types and for invalid
# input.
test_that("Method coltypes() to get and set R's data types of a DataFrame", {
  expect_equal(coltypes(irisDF), c(rep("numeric", 4), "character"))

  # TRUE/FALSE are spelled out: T and F are ordinary variables that can be
  # reassigned.
  data <- data.frame(c1 = c(1, 2, 3),
                     c2 = c(TRUE, FALSE, TRUE),
                     c3 = c("2015/01/01 10:00:00", "2015/01/02 10:00:00", "2015/01/03 10:00:00"))

  schema <- structType(structField("c1", "byte"),
                       structField("c3", "boolean"),
                       structField("c4", "timestamp"))

  # Test primitive types
  DF <- createDataFrame(data, schema)
  expect_equal(coltypes(DF), c("integer", "logical", "POSIXct"))
  createOrReplaceTempView(DF, "DFView")
  sqlCast <- sql("select cast('2' as decimal) as x from DFView limit 1")
  expect_equal(coltypes(sqlCast), "numeric")

  # Test complex types
  x <- createDataFrame(list(list(as.environment(
    list("a" = "b", "c" = "d", "e" = "f")))))
  expect_equal(coltypes(x), "map<string,string>")

  df <- selectExpr(read.json(jsonPath), "name", "(age * 1.21) as age")
  expect_equal(dtypes(df), list(c("name", "string"), c("age", "decimal(24,2)")))

  # Casting through `coltypes<-` must agree with an explicit cast()
  df1 <- select(df, cast(df$age, "integer"))
  coltypes(df) <- c("character", "integer")
  expect_equal(dtypes(df), list(c("name", "string"), c("age", "int")))
  value <- collect(df[, 2, drop = FALSE])[[3, 1]]
  expect_equal(value, collect(df1)[[3, 1]])
  expect_equal(value, 22)

  # An NA entry is paired with the `name` column, whose type is unchanged
  coltypes(df) <- c(NA, "numeric")
  expect_equal(dtypes(df), list(c("name", "string"), c("age", "double")))

  expect_error(coltypes(df) <- c("character"),
               "Length of type vector should match the number of columns for SparkDataFrame")
  expect_error(coltypes(df) <- c("environment", "list"),
               "Only atomic type is supported for column types")
})
# str() on a SparkDataFrame: the printed structure of a small data set, of a
# decimal column produced by SQL, and the number of lines printed for a very
# wide data set. Ends by checking that utils:::str is unaffected.
test_that("Method str()", {
  # Structure of Iris
  irisLocal <- iris
  colnames(irisLocal) <- c("Sepal_Length", "Sepal_Width", "Petal_Length", "Petal_Width", "Species")
  irisLocal$col <- TRUE
  irisDF2 <- createDataFrame(irisLocal)

  expectedLines <- c(
    "'SparkDataFrame': 6 variables:",
    " $ Sepal_Length: num 5.1 4.9 4.7 4.6 5 5.4",
    " $ Sepal_Width : num 3.5 3 3.2 3.1 3.6 3.9",
    " $ Petal_Length: num 1.4 1.4 1.3 1.5 1.4 1.7",
    " $ Petal_Width : num 0.2 0.2 0.2 0.2 0.2 0.4",
    paste0(" $ Species : chr \"setosa\" \"setosa\" \"",
           "setosa\" \"setosa\" \"setosa\" \"setosa\""),
    " $ col : logi TRUE TRUE TRUE TRUE TRUE TRUE")

  strLines <- capture.output(str(irisDF2))
  expect_equal(length(strLines), 7)
  for (i in seq_along(expectedLines)) {
    expect_equal(strLines[i], expectedLines[i])
  }

  # A decimal column is printed as `num`
  createOrReplaceTempView(irisDF2, "irisView")
  sqlCast <- sql("select cast('2' as decimal) as x from irisView limit 1")
  castStr <- capture.output(str(sqlCast))
  expect_equal(length(castStr), 2)
  expect_equal(castStr[1], "'SparkDataFrame': 1 variables:")
  expect_equal(castStr[2], " $ x: num 2")

  # A random dataset with many columns. This test is to check str limits
  # the number of columns. Therefore, it will suffice to check for the
  # number of returned rows
  x <- runif(200, 1, 10)
  wide <- data.frame(t(as.matrix(data.frame(x, x, x, x, x, x, x, x, x))))
  wideDF <- createDataFrame(wide)
  wideLines <- capture.output(str(wideDF))
  expect_equal(length(wideLines), 103)

  # Test utils:::str
  expect_equal(capture.output(utils:::str(iris)), capture.output(str(iris)))
})
# histogram() on the shared `irisDF` fixture: bins, counts and centroids for a
# column given by name, as a Column and as a derived Column; the default
# number of bins; error handling; and agreement with base R's hist().
# NOTE(review): the first three comparisons are dangling `all(...),`
# expressions in this copy. The call that wrapped them and its remaining
# argument are missing, as is the closing `})`. Restore from the upstream
# file; the code below does not parse as shown.
test_that("Histogram", {
# Basic histogram test with colname
all(histogram(irisDF, "Petal_Width", 8) ==
data.frame(bins = seq(0, 7),
counts = c(48, 2, 7, 21, 24, 19, 15, 14),
centroids = seq(0, 7) * 0.3 + 0.25)),
# Basic histogram test with Column
all(histogram(irisDF, irisDF$Petal_Width, 8) ==
data.frame(bins = seq(0, 7),
counts = c(48, 2, 7, 21, 24, 19, 15, 14),
centroids = seq(0, 7) * 0.3 + 0.25)),
# Basic histogram test with derived column
# Adding 1 to the column moves each expected centroid by 1; the result is
# rounded to 2 digits before the comparison.
all(round(histogram(irisDF, irisDF$Petal_Width + 1, 8), 2) ==
data.frame(bins = seq(0, 7),
counts = c(48, 2, 7, 21, 24, 19, 15, 14),
centroids = seq(0, 7) * 0.3 + 1.25)),
# Missing nbins
expect_equal(length(histogram(irisDF, "Petal_Width")$counts), 10)
# Wrong colname
expect_error(histogram(irisDF, "xxx"),
"Specified colname does not belong to the given SparkDataFrame.")
# Invalid nbins
expect_error(histogram(irisDF, "Petal_Width", nbins = 0),
"The number of bins must be a positive integer number greater than 1.")
# Test against R's hist
expect_equal(all(hist(iris$Sepal.Width)$counts ==
histogram(irisDF, "Sepal_Width", 12)$counts), T)
# Test when there are zero counts
expect_equal(histogram(df, "x")$counts, c(4, 0, 0, 0, 0, 0, 0, 0, 0, 1))
# dapply() and dapplyCollect() with an identity function, a function that
# filters rows and appends a column, and a function that selects a subset of
# the columns. Each result is compared with the same transformation done
# locally on the collected data (`ldf`).
# NOTE(review): apart from the two identity calls, every dapply()/
# dapplyCollect() call in this copy starts directly with `function(x) {` and
# is never closed. The leading argument, the function's return value and
# closing brace, and any trailing argument appear to be missing, as is the
# closing `})` of the test. Restore from the upstream file; the code below
# does not parse as shown.
test_that("dapply() and dapplyCollect() on a DataFrame", {
df <- createDataFrame(
list(list(1L, 1, "1"), list(2L, 2, "2"), list(3L, 3, "3")),
c("a", "b", "c"))
ldf <- collect(df)
# Identity function
df1 <- dapply(df, function(x) { x }, schema(df))
result <- collect(df1)
expect_identical(ldf, result)
result <- dapplyCollect(df, function(x) { x })
expect_identical(ldf, result)
# Filter and add a column
schema <- structType(structField("a", "integer"), structField("b", "double"),
structField("c", "string"), structField("d", "integer"))
df1 <- dapply(
function(x) {
y <- x[x$a > 1, ]
y <- cbind(y, y$a + 1L)
result <- collect(df1)
expected <- ldf[ldf$a > 1, ]
expected$d <- expected$a + 1L
rownames(expected) <- NULL
expect_identical(expected, result)
result <- dapplyCollect(
function(x) {
y <- x[x$a > 1, ]
y <- cbind(y, y$a + 1L)
# The expected column names are taken from the result before comparing, so
# only the values are checked here.
expected1 <- expected
names(expected1) <- names(result)
expect_identical(expected1, result)
# Remove the added column
df2 <- dapply(
function(x) {
x[, c("a", "b", "c")]
result <- collect(df2)
expected <- expected[, c("a", "b", "c")]
expect_identical(expected, result)
result <- dapplyCollect(
function(x) {
x[, c("a", "b", "c")]
expect_identical(expected, result)
# A list column of serialized (raw) values must survive collect() and an
# identity dapplyCollect(), both next to a key column and on its own.
test_that("dapplyCollect() on DataFrame with a binary column", {
  local <- data.frame(key = 1:3)
  local$bytes <- lapply(local$key, serialize, connection = NULL)
  identityFn <- function(x) x

  sparkDF <- createDataFrame(local)
  expect_identical(local, collect(sparkDF))
  expect_identical(local, dapplyCollect(sparkDF, identityFn))

  # A data.frame with a single column of bytes
  bytesOnly <- subset(local, select = "bytes")
  bytesOnlyDF <- createDataFrame(bytesOnly)
  expect_identical(bytesOnly, dapplyCollect(bytesOnlyDF, identityFn))
})
# repartition() with no arguments (an error), with a partition count, with a
# column, and with both; then dapply() on a DataFrame repartitioned by column.
# NOTE(review): this block is truncated in this copy. The line starting with
# `("Please, specify` has lost the call it belongs to, the dapply() call is
# never closed and lacks its surrounding arguments, and the closing `})` is
# absent. Restore from the upstream file; the code below does not parse as
# shown.
test_that("repartition by columns on DataFrame", {
df <- createDataFrame(
list(list(1L, 1, "1", 0.1), list(1L, 2, "2", 0.2), list(3L, 3, "3", 0.3)),
c("a", "b", "c", "d"))
# no column and number of partitions specified
# The error is captured as a condition object so its message can be matched.
retError <- tryCatch(repartition(df), error = function(e) e)
("Please, specify the number of partitions and/or a column\\(s\\)", retError), TRUE)
# repartition by column and number of partitions
actual <- repartition(df, 3, col = df$"a")
# Checking that at least the dimensions are identical
expect_identical(dim(df), dim(actual))
expect_equal(getNumPartitions(actual), 3L)
# repartition by number of partitions
actual <- repartition(df, 13L)
expect_identical(dim(df), dim(actual))
expect_equal(getNumPartitions(actual), 13L)
expect_equal(getNumPartitions(coalesce(actual, 1L)), 1L)
# a test case with a column and dapply
schema <- structType(structField("a", "integer"), structField("avg", "double"))
df <- repartition(df, col = df$"a")
df1 <- dapply(
function(x) {
y <- (data.frame(x$a[1], mean(x$b)))
# Number of partitions is equal to 2
expect_equal(nrow(df1), 2)
# Pins the partition counts reported after coalesce() and repartition(),
# including requests for more partitions than the input has.
test_that("coalesce, repartition, numPartitions", {
  # Partition count reported after coalescing `d` to `n`
  coalescedTo <- function(d, n) getNumPartitions(coalesce(d, n))

  # Starting from 5 partitions
  fiveParts <- as.DataFrame(cars, numPartitions = 5)
  expect_equal(getNumPartitions(fiveParts), 5)
  expect_equal(coalescedTo(fiveParts, 3), 3)
  expect_equal(coalescedTo(fiveParts, 6), 5)

  # After coalescing to 3
  threeParts <- coalesce(fiveParts, 3)
  expect_equal(getNumPartitions(threeParts), 3)
  expect_equal(coalescedTo(threeParts, 6), 5)
  expect_equal(coalescedTo(threeParts, 4), 4)
  expect_equal(coalescedTo(threeParts, 2), 2)

  # After repartitioning to 10
  tenParts <- repartition(threeParts, 10)
  expect_equal(getNumPartitions(tenParts), 10)
  expect_equal(coalescedTo(tenParts, 13), 10)
  expect_equal(coalescedTo(tenParts, 7), 7)
  expect_equal(coalescedTo(tenParts, 3), 3)
})
# gapply() and gapplyCollect() with an identity function, a per-group boolean,
# and per-group means, each compared with a locally computed result.
# NOTE(review): apart from the two identity calls, every gapply()/
# gapplyCollect() call in this copy is never closed and lacks its data
# argument. The end of each function and any trailing argument appear to be
# missing, as is the closing `})` of the test. Restore from the upstream
# file; the code below does not parse as shown.
test_that("gapply() and gapplyCollect() on a DataFrame", {
df <- createDataFrame (
list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
c("a", "b", "c", "d"))
expected <- collect(df)
# Identity function grouped on "a"
df1 <- gapply(df, "a", function(key, x) { x }, schema(df))
actual <- collect(df1)
expect_identical(actual, expected)
df1Collect <- gapplyCollect(df, list("a"), function(key, x) { x })
expect_identical(df1Collect, expected)
# Computes the sum of second column by grouping on the first and third columns
# and checks if the sum is larger than 2
schema <- structType(structField("a", "integer"), structField("e", "boolean"))
df2 <- gapply(
c(df$"a", df$"c"),
function(key, x) {
y <- data.frame(key[1], sum(x$b) > 2)
actual <- collect(df2)$e
expected <- c(TRUE, TRUE)
expect_identical(actual, expected)
df2Collect <- gapplyCollect(
c(df$"a", df$"c"),
function(key, x) {
y <- data.frame(key[1], sum(x$b) > 2)
colnames(y) <- c("a", "e")
actual <- df2Collect$e
expect_identical(actual, expected)
# Computes the arithmetic mean of the second column by grouping
# on the first and third columns. Output the grouping value and the average.
schema <- structType(structField("a", "integer"), structField("c", "string"),
structField("avg", "double"))
df3 <- gapply(
c("a", "c"),
function(key, x) {
y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
actual <- collect(df3)
# Sort and reset row names before comparing
actual <- actual[order(actual$a), ]
rownames(actual) <- NULL
# Local reference result computed with stats::aggregate
expected <- collect(select(df, "a", "b", "c"))
expected <- data.frame(aggregate(expected$b, by = list(expected$a, expected$c), FUN = mean))
colnames(expected) <- c("a", "c", "avg")
expected <- expected[order(expected$a), ]
rownames(expected) <- NULL
expect_identical(actual, expected)
df3Collect <- gapplyCollect(
c("a", "c"),
function(key, x) {
y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
colnames(y) <- c("a", "c", "avg")
actual <- df3Collect[order(df3Collect$a), ]
expect_identical(actual$avg, expected$avg)
irisDF <- suppressWarnings(createDataFrame (iris))
schema <- structType(structField("Sepal_Length", "double"), structField("Avg", "double"))
# Groups by `Sepal_Length` and computes the average for `Sepal_Width`
df4 <- gapply(
cols = "Sepal_Length",
function(key, x) {
y <- data.frame(key, mean(x$Sepal_Width), stringsAsFactors = FALSE)
actual <- collect(df4)
actual <- actual[order(actual$Sepal_Length), ]
rownames(actual) <- NULL
# Local reference result on the original `iris` data frame
agg_local_df <- data.frame(aggregate(iris$Sepal.Width, by = list(iris$Sepal.Length), FUN = mean),
stringsAsFactors = FALSE)
colnames(agg_local_df) <- c("Sepal_Length", "Avg")
expected <- agg_local_df[order(agg_local_df$Sepal_Length), ]
rownames(expected) <- NULL
expect_identical(actual, expected)
# The same window (partitioned by `key`, ordered by `value`) is built in four
# ways: partition-first or order-first, and with column names or Column
# objects. lead() over each of them must give the same result.
test_that("Window functions on a DataFrame", {
  df <- createDataFrame(list(list(1L, "1"), list(2L, "2"), list(1L, "1"), list(2L, "2")),
                        schema = c("key", "value"))

  expected <- data.frame(key = c(1L, NA, 2L, NA),
                         value = c("1", NA, "2", NA),
                         stringsAsFactors = FALSE)

  windowSpecs <- list(
    orderBy(windowPartitionBy("key"), "value"),
    orderBy(windowPartitionBy(df$key), df$value),
    partitionBy(windowOrderBy("value"), "key"),
    partitionBy(windowOrderBy(df$value), df$key))

  for (ws in windowSpecs) {
    result <- collect(select(df, over(lead("key", 1), ws), over(lead("value", 1), ws)))
    names(result) <- c("key", "value")
    expect_equal(result, expected)
  }
})
test_that("createDataFrame sqlContext parameter backward compatibility", {
  # Exercises the legacy call forms that still take a sqlContext as their first argument.
  # Each such call is wrapped in suppressWarnings(), presumably to silence deprecation
  # warnings raised for the old signature.
  sqlContext <- suppressWarnings(sparkRSQL.init(sc))
  a <- 1:3
  b <- c("a", "b", "c")
  ldf <- data.frame(a, b)
  # Call function with namespace :: operator - SPARK-16538
  df <- suppressWarnings(SparkR::createDataFrame(sqlContext, ldf))
  expect_equal(columns(df), c("a", "b"))
  expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))
  expect_equal(count(df), 3)
  ldf2 <- collect(df)
  expect_equal(ldf$a, ldf2$a)

  df2 <- suppressWarnings(createDataFrame(sqlContext, iris))
  expect_equal(count(df2), 150)
  expect_equal(ncol(df2), 5)

  df3 <- suppressWarnings(read.df(sqlContext, jsonPath, "json"))
  expect_is(df3, "SparkDataFrame")
  expect_equal(count(df3), 3)

  # The legacy and the current call form must produce the same data.
  before <- suppressWarnings(createDataFrame(sqlContext, iris))
  after <- suppressWarnings(createDataFrame(iris))
  expect_equal(collect(before), collect(after))

  # more tests for SPARK-16538
  createOrReplaceTempView(df, "table")
  SparkR::sql("SELECT 1")
  suppressWarnings(SparkR::sql(sqlContext, "SELECT * FROM table"))
  suppressWarnings(SparkR::dropTempTable(sqlContext, "table"))
})
test_that("randomSplit", {
  num <- 4000
  df <- createDataFrame(data.frame(id = 1:num))
  weights <- c(2, 3, 5)

  # Asserts that a split has one piece per weight, that the piece sizes add up to `num`,
  # and that every piece's share of the rows is within 0.05 of its normalized weight.
  expectSplitMatchesWeights <- function(df_list) {
    expect_equal(length(weights), length(df_list))
    # vapply pins the result to one number per piece instead of relying on sapply's
    # input-dependent simplification.
    counts <- vapply(df_list, count, numeric(1))
    expect_equal(num, sum(counts))
    expect_true(all(abs(counts / num - weights / sum(weights)) < 0.05))
  }

  # Without a third argument.
  expectSplitMatchesWeights(randomSplit(df, weights))

  # With an explicit third argument of 0.
  expectSplitMatchesWeights(randomSplit(df, weights, 0))
})
test_that("Setting and getting config on SparkSession, sparkR.conf(), sparkR.uiWebUrl()", {
  # first, set it to a random but known value
  conf <- callJMethod(sparkSession, "conf")
  property <- paste0("spark.testing.", as.character(runif(1)))
  value1 <- as.character(runif(1))
  callJMethod(conf, "set", property, value1)

  # next, change the same property to the new value
  value2 <- as.character(runif(1))
  l <- list(value2)
  names(l) <- property
  sparkR.session(sparkConfig = l)

  # sparkR.conf(key, default) is unlisted so the bare value can be compared.
  newValue <- unlist(sparkR.conf(property, ""), use.names = FALSE)
  expect_equal(value2, newValue)

  # Properties can also be handed to sparkR.session() as named arguments; the application
  # name is read back below under the key "spark.app.name".
  value <- as.character(runif(1))
  sparkR.session(spark.app.name = "sparkSession test", spark.testing.r.session.r = value)
  allconf <- sparkR.conf()
  appNameValue <- allconf[["spark.app.name"]]
  testValue <- allconf[["spark.testing.r.session.r"]]
  expect_equal(appNameValue, "sparkSession test")
  expect_equal(testValue, value)

  # Asking for an unset key without supplying a default is an error.
  expect_error(sparkR.conf("completely.dummy"), "Config 'completely.dummy' is not set")

  url <- sparkR.uiWebUrl()
  expect_equal(substr(url, 1, 7), "http://")
})
test_that("enableHiveSupport on SparkSession", {
  # if we are still here, it must be built with hive
  # NOTE(review): nothing visible before this point can abort or skip the test when Hive is
  # unavailable; confirm whether a guard is expected here.
  conf <- callJMethod(sparkSession, "conf")
  value <- callJMethod(conf, "get", "spark.sql.catalogImplementation")
  expect_equal(value, "hive")
})
test_that("Spark version from SparkSession", {
  # The string returned by sparkR.version() must equal the result of invoking "version"
  # on `sc` through callJMethod.
  ver <- callJMethod(sc, "version")
  version <- sparkR.version()
  expect_equal(ver, version)
})
test_that("Call API in Java without path and check argument types", {
  df <- read.df(jsonPath, "json")
  # This tests if the exception is thrown from JVM not from SparkR side.
  # It makes sure that we can omit path argument in write.df API and then it calls
  # DataFrameWriter.save() without path.
  expect_error(write.df(df, source = "csv"),
               "Error in save : illegal argument - Expected exactly one path to be specified")

  # Writing to a path that already exists (jsonPath was just read from) must fail for
  # every format-specific writer.
  expect_error(write.json(df, jsonPath),
               "Error in json : analysis error - path file:.*already exists")
  expect_error(write.text(df, jsonPath),
               "Error in text : analysis error - path file:.*already exists")
  expect_error(write.orc(df, jsonPath),
               "Error in orc : analysis error - path file:.*already exists")
  expect_error(write.parquet(df, jsonPath),
               "Error in parquet : analysis error - path file:.*already exists")

  # Arguments checking in R side.
  expect_error(write.df(df, "data.tmp", source = c(1, 2)),
               paste("source should be character, NULL or omitted. It is the datasource specified",
                     "in 'spark.sql.sources.default' configuration by default."))
  expect_error(write.df(df, path = c(3)),
               "path should be charactor, NULL or omitted.")
  expect_error(write.df(df, mode = TRUE),
               "mode should be charactor or omitted. It is 'error' by default.")
})
test_that("Call DataFrameWriter.load() API in Java without path and check argument types", {
  # This tests if the exception is thrown from JVM not from SparkR side.
  # It makes sure that we can omit path argument in read.df API and then it calls
  # DataFrameReader.load() without path.
  # NOTE(review): the test description still says DataFrameWriter.load(); it is left as is
  # so that selecting the test by name keeps working.
  expect_error(read.df(source = "json"),
               paste("Error in loadDF : analysis error - Unable to infer schema for JSON.",
                     "It must be specified manually"))

  # A path that does not exist must fail for the generic and every format-specific reader.
  expect_error(read.df("arbitrary_path"), "Error in loadDF : analysis error - Path does not exist")
  expect_error(read.json("arbitrary_path"), "Error in json : analysis error - Path does not exist")
  expect_error(read.text("arbitrary_path"), "Error in text : analysis error - Path does not exist")
  expect_error(read.orc("arbitrary_path"), "Error in orc : analysis error - Path does not exist")
  expect_error(read.parquet("arbitrary_path"),
               "Error in parquet : analysis error - Path does not exist")

  # Arguments checking in R side.
  expect_error(read.df(path = c(3)),
               "path should be charactor, NULL or omitted.")
  expect_error(read.df(jsonPath, source = c(1, 2)),
               paste("source should be character, NULL or omitted. It is the datasource specified",
                     "in 'spark.sql.sources.default' configuration by default."))

  # Extra unnamed arguments are dropped with a warning that lists them.
  expect_warning(read.json(jsonPath, a = 1, 2, 3, "a"),
                 "Unnamed arguments ignored: 2, 3, a.")
})
test_that("Collect on DataFrame when NAs exists at the top of a timestamp column", {
# Checks that collect() returns POSIXct columns for the timestamp fields, both for the
# full SparkDataFrame and for two filtered subsets of it.
# NOTE(review): col1 holds three values while col2 and col3 list only two each, so
# data.frame() would stop with a "differing number of rows" error as written. Given the
# test description, one element (presumably an NA) is missing from each timestamp
# vector -- confirm its position and restore it.
ldf <- data.frame(col1 = c(0, 1, 2),
col2 = c(as.POSIXct("2017-01-01 00:00:01"),
as.POSIXct("2017-01-01 12:00:01")),
col3 = c(as.POSIXlt("2016-01-01 00:59:59"),
as.POSIXlt("2016-01-01 12:01:01")))
sdf1 <- createDataFrame(ldf)
ldf1 <- collect(sdf1)
# Both the POSIXct and the POSIXlt input columns are expected to be typed "timestamp" and
# to come back from collect() as POSIXct.
expect_equal(dtypes(sdf1), list(c("col1", "double"),
c("col2", "timestamp"),
c("col3", "timestamp")))
expect_equal(class(ldf1$col1), "numeric")
expect_equal(class(ldf1$col2), c("POSIXct", "POSIXt"))
expect_equal(class(ldf1$col3), c("POSIXct", "POSIXt"))
# Columns with NAs at the top
# NOTE(review): which rows carry NA depends on the missing element noted above; verify
# that this filter still selects the rows this comment describes.
sdf2 <- filter(sdf1, "col1 > 1")
ldf2 <- collect(sdf2)
expect_equal(dtypes(sdf2), list(c("col1", "double"),
c("col2", "timestamp"),
c("col3", "timestamp")))
expect_equal(class(ldf2$col1), "numeric")
expect_equal(class(ldf2$col2), c("POSIXct", "POSIXt"))
expect_equal(class(ldf2$col3), c("POSIXct", "POSIXt"))
# Columns with only NAs, the type will also be cast to PRIMITIVE_TYPE
sdf3 <- filter(sdf1, "col1 == 0")
ldf3 <- collect(sdf3)
expect_equal(dtypes(sdf3), list(c("col1", "double"),
c("col2", "timestamp"),
c("col3", "timestamp")))
expect_equal(class(ldf3$col1), "numeric")
expect_equal(class(ldf3$col2), c("POSIXct", "POSIXt"))
expect_equal(class(ldf3$col3), c("POSIXct", "POSIXt"))
# NOTE(review): the closing `})` of this test_that() call does not follow the last
# expectation; confirm and restore it.
# Order-insensitive comparison of two vectors through testthat expectations.
#
# list1, list2: the vectors to compare; they are passed to sort(), so atomic vectors
#   (for example character vectors of file names) are expected.
#
# Two expectations are raised: one on the lengths and one on the sorted contents.
compare_list <- function(list1, list2) {
  # get testthat to show the diff by first making the 2 lists equal in length
  expect_equal(length(list1), length(list2))
  paddedLength <- max(length(list1), length(list2))
  # Growing a vector through `length<-` pads it with NA; na.last = TRUE keeps that padding
  # at the end of the sorted result so the real entries line up in the diff.
  length(list1) <- paddedLength
  length(list2) <- paddedLength
  expect_equal(sort(list1, na.last = TRUE), sort(list2, na.last = TRUE))
}
# This should always be the **very last test** in this test file.
test_that("No extra files are created in SPARK_HOME by starting session and making calls", {
# Compares directory listings taken at different points of the test run. sparkRFilesBefore,
# filesBefore and sparkRWhitelistSQLDirs are not defined in this block; per the notes below
# they come from run-all.R and from the top of this test file.
# Check that it is not creating any extra file.
# Does not check the tempdir which would be cleaned up after.
filesAfter <- list.files(path = sparkRDir, all.files = TRUE)
# Guard against an empty baseline listing, which would make the comparisons below vacuous.
expect_true(length(sparkRFilesBefore) > 0)
# first, ensure derby.log is not there
expect_false("derby.log" %in% filesAfter)
# second, ensure only spark-warehouse is created when calling SparkSession, enableHiveSupport = F
# note: currently all other test files have enableHiveSupport = F, so we capture the list of files
# before creating a SparkSession with enableHiveSupport = T at the top of this test file
# (filesBefore). The test here is to compare that (filesBefore) against the list of files before
# any test is run in run-all.R (sparkRFilesBefore).
# sparkRWhitelistSQLDirs is also defined in run-all.R, and should contain only 2 whitelisted dirs,
# here allow the first value, spark-warehouse, in the diff, everything else should be exactly the
# same as before any test is run.
compare_list(sparkRFilesBefore, setdiff(filesBefore, sparkRWhitelistSQLDirs[[1]]))
# third, ensure only spark-warehouse and metastore_db are created when enableHiveSupport = T
# note: as the note above, after running all tests in this file while enableHiveSupport = T, we
# check the list of files again. This time we allow both whitelisted dirs to be in the diff.
compare_list(sparkRFilesBefore, setdiff(filesAfter, sparkRWhitelistSQLDirs))