[SPARK-27834][SQL][R][PYTHON] Make separate PySpark/SparkR vectorization configurations

## What changes were proposed in this pull request?

`spark.sql.execution.arrow.enabled` was added when we added the PySpark Arrow optimization.
Later, in the current master, SparkR Arrow optimization was added, and it is controlled by the same configuration, `spark.sql.execution.arrow.enabled`.

There are two issues with this:

1. `spark.sql.execution.arrow.enabled` in PySpark was added in 2.3.0, whereas the SparkR optimization was added in 3.0.0. Their maturity differs, so it is problematic if we want to change the default value for only one of the two optimizations first.

2. Suppose users share one JVM between PySpark and SparkR. If the configuration is set globally, they are currently forced to enable the optimization for both or for neither.

This PR proposes two separate Arrow optimization configuration groups, one for PySpark and one for SparkR:

- Deprecate `spark.sql.execution.arrow.enabled`
- Add `spark.sql.execution.arrow.pyspark.enabled` (falls back to `spark.sql.execution.arrow.enabled`)
- Add `spark.sql.execution.arrow.sparkr.enabled`
- Deprecate `spark.sql.execution.arrow.fallback.enabled`
- Add `spark.sql.execution.arrow.pyspark.fallback.enabled` (falls back to `spark.sql.execution.arrow.fallback.enabled`)

Note that `spark.sql.execution.arrow.maxRecordsPerBatch` is used on the JVM side for both.
Note that `spark.sql.execution.arrow.fallback.enabled` was added because of a behaviour change in PySpark. SparkR does not need an equivalent because the SparkR side already falls back automatically.
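
For illustration, a minimal sketch of toggling the two runtimes independently from PySpark (assuming an existing `SparkSession` named `spark` and PyArrow installed):

```python
# Enable Arrow optimization for PySpark conversions only; SparkR sharing the
# same JVM keeps its own, independent switch.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.sparkr.enabled", "false")

pdf = spark.range(10).toPandas()  # Spark -> pandas conversion uses Arrow
```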

## How was this patch tested?

Manually tested, and some unit tests were added.

Closes #24700 from HyukjinKwon/separate-sparkr-arrow.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
HyukjinKwon 2019-06-03 10:01:37 +09:00
parent 3806887afb
commit db48da87f0
13 changed files with 127 additions and 94 deletions

View file

@ -1179,7 +1179,7 @@ setMethod("collect",
function(x, stringsAsFactors = FALSE) {
connectionTimeout <- as.numeric(Sys.getenv("SPARKR_BACKEND_CONNECTION_TIMEOUT", "6000"))
useArrow <- FALSE
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] == "true"
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.sparkr.enabled")[[1]] == "true"
if (arrowEnabled) {
useArrow <- tryCatch({
checkSchemaInArrow(schema(x))
@ -1187,8 +1187,8 @@ setMethod("collect",
}, error = function(e) {
warning(paste0("The conversion from Spark DataFrame to R DataFrame was attempted ",
"with Arrow optimization because ",
"'spark.sql.execution.arrow.enabled' is set to true; however, ",
"failed, attempting non-optimization. Reason: ",
"'spark.sql.execution.arrow.sparkr.enabled' is set to true; ",
"however, failed, attempting non-optimization. Reason: ",
e))
FALSE
})
@ -1476,7 +1476,7 @@ dapplyInternal <- function(x, func, schema) {
schema <- structType(schema)
}
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] == "true"
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.sparkr.enabled")[[1]] == "true"
if (arrowEnabled) {
if (inherits(schema, "structType")) {
checkSchemaInArrow(schema)

View file

@ -259,7 +259,7 @@ getSchema <- function(schema, firstRow = NULL, rdd = NULL) {
createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
numPartitions = NULL) {
sparkSession <- getSparkSession()
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] == "true"
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.sparkr.enabled")[[1]] == "true"
useArrow <- FALSE
firstRow <- NULL
@ -302,7 +302,7 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
},
error = function(e) {
warning(paste0("createDataFrame attempted Arrow optimization because ",
"'spark.sql.execution.arrow.enabled' is set to true; however, ",
"'spark.sql.execution.arrow.sparkr.enabled' is set to true; however, ",
"failed, attempting non-optimization. Reason: ",
e))
FALSE

View file

@ -229,7 +229,7 @@ gapplyInternal <- function(x, func, schema) {
if (is.character(schema)) {
schema <- structType(schema)
}
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] == "true"
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.sparkr.enabled")[[1]] == "true"
if (arrowEnabled) {
if (inherits(schema, "structType")) {
checkSchemaInArrow(schema)

View file

@ -25,22 +25,22 @@ test_that("createDataFrame/collect Arrow optimization", {
skip_if_not_installed("arrow")
conf <- callJMethod(sparkSession, "conf")
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.sparkr.enabled")[[1]]
callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "false")
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "false")
tryCatch({
expected <- collect(createDataFrame(mtcars))
},
finally = {
callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
})
callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "true")
tryCatch({
expect_equal(collect(createDataFrame(mtcars)), expected)
},
finally = {
callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
})
})
@ -48,15 +48,15 @@ test_that("createDataFrame/collect Arrow optimization - many partitions (partiti
skip_if_not_installed("arrow")
conf <- callJMethod(sparkSession, "conf")
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.sparkr.enabled")[[1]]
callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "true")
tryCatch({
expect_equal(collect(createDataFrame(mtcars, numPartitions = 32)),
collect(createDataFrame(mtcars, numPartitions = 1)))
},
finally = {
callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
})
})
@ -70,23 +70,23 @@ test_that("createDataFrame/collect Arrow optimization - type specification", {
f = as.Date("1990-02-24"),
g = as.POSIXct("1990-02-24 12:34:56"))))
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.sparkr.enabled")[[1]]
conf <- callJMethod(sparkSession, "conf")
callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "false")
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "false")
tryCatch({
expected <- collect(createDataFrame(rdf))
},
finally = {
callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
})
callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "true")
tryCatch({
expect_equal(collect(createDataFrame(rdf)), expected)
},
finally = {
callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
})
})
@ -95,9 +95,9 @@ test_that("dapply() Arrow optimization", {
df <- createDataFrame(mtcars)
conf <- callJMethod(sparkSession, "conf")
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.sparkr.enabled")[[1]]
callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "false")
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "false")
tryCatch({
ret <- dapply(df,
function(rdf) {
@ -108,10 +108,10 @@ test_that("dapply() Arrow optimization", {
expected <- collect(ret)
},
finally = {
callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
})
callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "true")
tryCatch({
ret <- dapply(df,
function(rdf) {
@ -126,7 +126,7 @@ test_that("dapply() Arrow optimization", {
expect_equal(count(ret), nrow(mtcars))
},
finally = {
callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
})
})
@ -143,25 +143,25 @@ test_that("dapply() Arrow optimization - type specification", {
df <- createDataFrame(rdf, numPartitions = 8)
conf <- callJMethod(sparkSession, "conf")
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.sparkr.enabled")[[1]]
callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "false")
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "false")
tryCatch({
ret <- dapply(df, function(rdf) { rdf }, schema(df))
expected <- collect(ret)
},
finally = {
callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
})
callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "true")
tryCatch({
ret <- dapply(df, function(rdf) { rdf }, schema(df))
actual <- collect(ret)
expect_equal(actual, expected)
},
finally = {
callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
})
})
@ -172,15 +172,15 @@ test_that("dapply() Arrow optimization - type specification (date and timestamp)
df <- createDataFrame(rdf)
conf <- callJMethod(sparkSession, "conf")
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.sparkr.enabled")[[1]]
callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "true")
tryCatch({
ret <- dapply(df, function(rdf) { rdf }, schema(df))
expect_equal(collect(ret), rdf)
},
finally = {
callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
})
})
@ -189,9 +189,9 @@ test_that("gapply() Arrow optimization", {
df <- createDataFrame(mtcars)
conf <- callJMethod(sparkSession, "conf")
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.sparkr.enabled")[[1]]
callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "false")
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "false")
tryCatch({
ret <- gapply(df,
"gear",
@ -206,10 +206,10 @@ test_that("gapply() Arrow optimization", {
expected <- collect(ret)
},
finally = {
callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
})
callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "true")
tryCatch({
ret <- gapply(df,
"gear",
@ -229,7 +229,7 @@ test_that("gapply() Arrow optimization", {
expect_equal(count(ret), nrow(mtcars))
},
finally = {
callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
})
})
@ -245,9 +245,9 @@ test_that("gapply() Arrow optimization - type specification", {
df <- createDataFrame(rdf)
conf <- callJMethod(sparkSession, "conf")
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.sparkr.enabled")[[1]]
callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "false")
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "false")
tryCatch({
ret <- gapply(df,
"a",
@ -255,11 +255,11 @@ test_that("gapply() Arrow optimization - type specification", {
expected <- collect(ret)
},
finally = {
callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
})
callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "true")
tryCatch({
ret <- gapply(df,
"a",
@ -268,7 +268,7 @@ test_that("gapply() Arrow optimization - type specification", {
expect_equal(actual, expected)
},
finally = {
callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
})
})
@ -279,9 +279,9 @@ test_that("gapply() Arrow optimization - type specification (date and timestamp)
df <- createDataFrame(rdf)
conf <- callJMethod(sparkSession, "conf")
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.sparkr.enabled")[[1]]
callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "true")
tryCatch({
ret <- gapply(df,
"a",
@ -289,7 +289,7 @@ test_that("gapply() Arrow optimization - type specification (date and timestamp)
expect_equal(collect(ret), rdf)
},
finally = {
callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
})
})
@ -297,8 +297,8 @@ test_that("Arrow optimization - unsupported types", {
skip_if_not_installed("arrow")
conf <- callJMethod(sparkSession, "conf")
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.sparkr.enabled")[[1]]
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "true")
tryCatch({
expect_error(checkSchemaInArrow(structType("a FLOAT")), "not support float type")
expect_error(checkSchemaInArrow(structType("a BINARY")), "not support binary type")
@ -308,7 +308,7 @@ test_that("Arrow optimization - unsupported types", {
"not support nested struct type")
},
finally = {
callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
})
})

View file

@ -676,10 +676,10 @@ Rscript -e 'remotes::install_github("apache/arrow@apache-arrow-0.12.1", subdir =
Arrow optimization is available when converting a Spark DataFrame to an R DataFrame using the call `collect(spark_df)`,
when creating a Spark DataFrame from an R DataFrame with `createDataFrame(r_df)`, when applying an R native function to each partition
via `dapply(...)` and when applying an R native function to grouped data via `gapply(...)`.
To use Arrow when executing these calls, users need to first set the Spark configuration spark.sql.execution.arrow.enabled
To use Arrow when executing these calls, users need to first set the Spark configuration spark.sql.execution.arrow.sparkr.enabled
to true. This is disabled by default.
In addition, optimizations enabled by spark.sql.execution.arrow.enabled could fallback automatically to non-Arrow optimization
In addition, optimizations enabled by spark.sql.execution.arrow.sparkr.enabled could fallback automatically to non-Arrow optimization
implementation if an error occurs before the actual computation within Spark during converting a Spark DataFrame to/from an R
DataFrame.
@ -687,7 +687,7 @@ DataFrame.
{% highlight r %}
# Start up spark session with Arrow optimization enabled
sparkR.session(master = "local[*]",
sparkConfig = list(spark.sql.execution.arrow.enabled = "true"))
sparkConfig = list(spark.sql.execution.arrow.sparkr.enabled = "true"))
# Converts Spark DataFrame from an R DataFrame
spark_df <- createDataFrame(mtcars)

View file

@ -44,11 +44,11 @@ You can install using pip or conda from the conda-forge channel. See PyArrow
Arrow is available as an optimization when converting a Spark DataFrame to a Pandas DataFrame
using the call `toPandas()` and when creating a Spark DataFrame from a Pandas DataFrame with
`createDataFrame(pandas_df)`. To use Arrow when executing these calls, users need to first set
the Spark configuration 'spark.sql.execution.arrow.enabled' to 'true'. This is disabled by default.
the Spark configuration 'spark.sql.execution.arrow.pyspark.enabled' to 'true'. This is disabled by default.
In addition, optimizations enabled by 'spark.sql.execution.arrow.enabled' could fallback automatically
In addition, optimizations enabled by 'spark.sql.execution.arrow.pyspark.enabled' could fallback automatically
to non-Arrow optimization implementation if an error occurs before the actual computation within Spark.
This can be controlled by 'spark.sql.execution.arrow.fallback.enabled'.
This can be controlled by 'spark.sql.execution.arrow.pyspark.fallback.enabled'.
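
As a minimal sketch of the renamed switches described above (assuming an existing `SparkSession` named `spark` with pandas and PyArrow installed):

```python
import pandas as pd

# Enable Arrow-based transfers for PySpark and keep automatic fallback on, so
# an error raised before the actual computation falls back to the non-Arrow
# path with a warning instead of failing.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled", "true")

pdf = pd.DataFrame({"a": [1, 2, 3]})
sdf = spark.createDataFrame(pdf)  # pandas -> Spark via Arrow
result = sdf.toPandas()           # Spark -> pandas via Arrow
```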
<div class="codetabs">
<div data-lang="python" markdown="1">

View file

@ -36,7 +36,7 @@ def dataframe_with_arrow_example(spark):
import pandas as pd
# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
# Generate a Pandas DataFrame
pdf = pd.DataFrame(np.random.rand(100, 3))

View file

@ -2087,7 +2087,7 @@ class DataFrame(object):
.. note:: This method should only be used if the resulting Pandas's DataFrame is expected
to be small, as all the data is loaded into the driver's memory.
.. note:: Usage with spark.sql.execution.arrow.enabled=True is experimental.
.. note:: Usage with spark.sql.execution.arrow.pyspark.enabled=True is experimental.
>>> df.toPandas() # doctest: +SKIP
age name
@ -2104,7 +2104,7 @@ class DataFrame(object):
else:
timezone = None
if self.sql_ctx._conf.arrowEnabled():
if self.sql_ctx._conf.arrowPySparkEnabled():
use_arrow = True
try:
from pyspark.sql.types import to_arrow_schema
@ -2114,28 +2114,28 @@ class DataFrame(object):
to_arrow_schema(self.schema)
except Exception as e:
if self.sql_ctx._conf.arrowFallbackEnabled():
if self.sql_ctx._conf.arrowPySparkFallbackEnabled():
msg = (
"toPandas attempted Arrow optimization because "
"'spark.sql.execution.arrow.enabled' is set to true; however, "
"'spark.sql.execution.arrow.pyspark.enabled' is set to true; however, "
"failed by the reason below:\n %s\n"
"Attempting non-optimization as "
"'spark.sql.execution.arrow.fallback.enabled' is set to "
"'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to "
"true." % _exception_message(e))
warnings.warn(msg)
use_arrow = False
else:
msg = (
"toPandas attempted Arrow optimization because "
"'spark.sql.execution.arrow.enabled' is set to true, but has reached "
"the error below and will not continue because automatic fallback "
"with 'spark.sql.execution.arrow.fallback.enabled' has been set to "
"'spark.sql.execution.arrow.pyspark.enabled' is set to true, but has "
"reached the error below and will not continue because automatic fallback "
"with 'spark.sql.execution.arrow.pyspark.fallback.enabled' has been set to "
"false.\n %s" % _exception_message(e))
warnings.warn(msg)
raise
# Try to use Arrow optimization when the schema is supported and the required version
# of PyArrow is found, if 'spark.sql.execution.arrow.enabled' is enabled.
# of PyArrow is found, if 'spark.sql.execution.arrow.pyspark.enabled' is enabled.
if use_arrow:
try:
from pyspark.sql.types import _check_dataframe_localize_timestamps
@ -2155,10 +2155,11 @@ class DataFrame(object):
# be executed. So, simply fail in this case for now.
msg = (
"toPandas attempted Arrow optimization because "
"'spark.sql.execution.arrow.enabled' is set to true, but has reached "
"the error below and can not continue. Note that "
"'spark.sql.execution.arrow.fallback.enabled' does not have an effect "
"on failures in the middle of computation.\n %s" % _exception_message(e))
"'spark.sql.execution.arrow.pyspark.enabled' is set to true, but has "
"reached the error below and can not continue. Note that "
"'spark.sql.execution.arrow.pyspark.fallback.enabled' does not have an "
"effect on failures in the middle of "
"computation.\n %s" % _exception_message(e))
warnings.warn(msg)
raise

View file

@ -651,7 +651,7 @@ class SparkSession(object):
.. versionchanged:: 2.1
Added verifySchema.
.. note:: Usage with spark.sql.execution.arrow.enabled=True is experimental.
.. note:: Usage with spark.sql.execution.arrow.pyspark.enabled=True is experimental.
>>> l = [('Alice', 1)]
>>> spark.createDataFrame(l).collect()
@ -731,28 +731,28 @@ class SparkSession(object):
(x.encode('utf-8') if not isinstance(x, str) else x)
for x in data.columns]
if self._wrapped._conf.arrowEnabled() and len(data) > 0:
if self._wrapped._conf.arrowPySparkEnabled() and len(data) > 0:
try:
return self._create_from_pandas_with_arrow(data, schema, timezone)
except Exception as e:
from pyspark.util import _exception_message
if self._wrapped._conf.arrowFallbackEnabled():
if self._wrapped._conf.arrowPySparkFallbackEnabled():
msg = (
"createDataFrame attempted Arrow optimization because "
"'spark.sql.execution.arrow.enabled' is set to true; however, "
"'spark.sql.execution.arrow.pyspark.enabled' is set to true; however, "
"failed by the reason below:\n %s\n"
"Attempting non-optimization as "
"'spark.sql.execution.arrow.fallback.enabled' is set to "
"'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to "
"true." % _exception_message(e))
warnings.warn(msg)
else:
msg = (
"createDataFrame attempted Arrow optimization because "
"'spark.sql.execution.arrow.enabled' is set to true, but has reached "
"the error below and will not continue because automatic fallback "
"with 'spark.sql.execution.arrow.fallback.enabled' has been set to "
"false.\n %s" % _exception_message(e))
"'spark.sql.execution.arrow.pyspark.enabled' is set to true, but has "
"reached the error below and will not continue because automatic "
"fallback with 'spark.sql.execution.arrow.pyspark.fallback.enabled' "
"has been set to false.\n %s" % _exception_message(e))
warnings.warn(msg)
raise
data = self._convert_from_pandas(data, schema, timezone)

View file

@ -56,9 +56,23 @@ class ArrowTests(ReusedSQLTestCase):
time.tzset()
cls.spark.conf.set("spark.sql.session.timeZone", tz)
# Test fallback
cls.spark.conf.set("spark.sql.execution.arrow.enabled", "false")
assert cls.spark.conf.get("spark.sql.execution.arrow.pyspark.enabled") == "false"
cls.spark.conf.set("spark.sql.execution.arrow.enabled", "true")
# Disable fallback by default to easily detect the failures.
assert cls.spark.conf.get("spark.sql.execution.arrow.pyspark.enabled") == "true"
cls.spark.conf.set("spark.sql.execution.arrow.fallback.enabled", "true")
assert cls.spark.conf.get("spark.sql.execution.arrow.pyspark.fallback.enabled") == "true"
cls.spark.conf.set("spark.sql.execution.arrow.fallback.enabled", "false")
assert cls.spark.conf.get("spark.sql.execution.arrow.pyspark.fallback.enabled") == "false"
# Enable Arrow optimization in this tests.
cls.spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
# Disable fallback by default to easily detect the failures.
cls.spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled", "false")
cls.schema = StructType([
StructField("1_str_t", StringType(), True),
StructField("2_int_t", IntegerType(), True),
@ -97,7 +111,7 @@ class ArrowTests(ReusedSQLTestCase):
return pd.DataFrame(data=data_dict)
def test_toPandas_fallback_enabled(self):
with self.sql_conf({"spark.sql.execution.arrow.fallback.enabled": True}):
with self.sql_conf({"spark.sql.execution.arrow.pyspark.fallback.enabled": True}):
schema = StructType([StructField("map", MapType(StringType(), IntegerType()), True)])
df = self.spark.createDataFrame([({u'a': 1},)], schema=schema)
with QuietTest(self.sc):
@ -130,7 +144,7 @@ class ArrowTests(ReusedSQLTestCase):
self.assertTrue(all([c == 1 for c in null_counts]))
def _toPandas_arrow_toggle(self, df):
with self.sql_conf({"spark.sql.execution.arrow.enabled": False}):
with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": False}):
pdf = df.toPandas()
pdf_arrow = df.toPandas()
@ -192,7 +206,7 @@ class ArrowTests(ReusedSQLTestCase):
self.assertTrue(pdf.empty)
def _createDataFrame_toggle(self, pdf, schema=None):
with self.sql_conf({"spark.sql.execution.arrow.enabled": False}):
with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": False}):
df_no_arrow = self.spark.createDataFrame(pdf, schema=schema)
df_arrow = self.spark.createDataFrame(pdf, schema=schema)
@ -323,7 +337,7 @@ class ArrowTests(ReusedSQLTestCase):
def test_createDataFrame_fallback_enabled(self):
with QuietTest(self.sc):
with self.sql_conf({"spark.sql.execution.arrow.fallback.enabled": True}):
with self.sql_conf({"spark.sql.execution.arrow.pyspark.fallback.enabled": True}):
with warnings.catch_warnings(record=True) as warns:
# we want the warnings to appear even if this test is run from a subclass
warnings.simplefilter("always")

View file

@ -758,7 +758,7 @@ class QueryExecutionListenerTests(unittest.TestCase, SQLTestUtils):
not have_pandas or not have_pyarrow,
pandas_requirement_message or pyarrow_requirement_message)
def test_query_execution_listener_on_collect_with_arrow(self):
with self.sql_conf({"spark.sql.execution.arrow.enabled": True}):
with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": True}):
self.assertFalse(
self.spark._jvm.OnSuccessCall.isCalled(),
"The callback from the query execution listener should not be "

View file

@ -120,7 +120,7 @@ object MapPartitionsInR {
schema: StructType,
encoder: ExpressionEncoder[Row],
child: LogicalPlan): LogicalPlan = {
if (SQLConf.get.arrowEnabled) {
if (SQLConf.get.arrowSparkREnabled) {
MapPartitionsInRWithArrow(
func,
packageNames,
@ -466,7 +466,7 @@ object FlatMapGroupsInR {
groupingAttributes: Seq[Attribute],
dataAttributes: Seq[Attribute],
child: LogicalPlan): LogicalPlan = {
if (SQLConf.get.arrowEnabled) {
if (SQLConf.get.arrowSparkREnabled) {
FlatMapGroupsInRWithArrow(
func,
packageNames,

View file

@ -1326,14 +1326,24 @@ object SQLConf {
val ARROW_EXECUTION_ENABLED =
buildConf("spark.sql.execution.arrow.enabled")
.doc("When true, make use of Apache Arrow for columnar data transfers." +
"In case of PySpark, " +
.doc("(Deprecated since Spark 3.0, please set 'spark.sql.execution.arrow.pyspark.enabled'.)")
.booleanConf
.createWithDefault(false)
val ARROW_PYSPARK_EXECUTION_ENABLED =
buildConf("spark.sql.execution.arrow.pyspark.enabled")
.doc("When true, make use of Apache Arrow for columnar data transfers in PySpark. " +
"This optimization applies to: " +
"1. pyspark.sql.DataFrame.toPandas " +
"2. pyspark.sql.SparkSession.createDataFrame when its input is a Pandas DataFrame " +
"The following data types are unsupported: " +
"BinaryType, MapType, ArrayType of TimestampType, and nested StructType." +
"BinaryType, MapType, ArrayType of TimestampType, and nested StructType.")
.fallbackConf(ARROW_EXECUTION_ENABLED)
"In case of SparkR," +
val ARROW_SPARKR_EXECUTION_ENABLED =
buildConf("spark.sql.execution.arrow.sparkr.enabled")
.doc("When true, make use of Apache Arrow for columnar data transfers in SparkR. " +
"This optimization applies to: " +
"1. createDataFrame when its input is an R DataFrame " +
"2. collect " +
"3. dapply " +
@ -1345,11 +1355,17 @@ object SQLConf {
val ARROW_FALLBACK_ENABLED =
buildConf("spark.sql.execution.arrow.fallback.enabled")
.doc(s"When true, optimizations enabled by '${ARROW_EXECUTION_ENABLED.key}' will " +
"fallback automatically to non-optimized implementations if an error occurs.")
.doc("(Deprecated since Spark 3.0, please set " +
"'spark.sql.execution.arrow.pyspark.fallback.enabled'.)")
.booleanConf
.createWithDefault(true)
val ARROW_PYSPARK_FALLBACK_ENABLED =
buildConf("spark.sql.execution.arrow.pyspark.fallback.enabled")
.doc(s"When true, optimizations enabled by '${ARROW_PYSPARK_EXECUTION_ENABLED.key}' will " +
"fallback automatically to non-optimized implementations if an error occurs.")
.fallbackConf(ARROW_FALLBACK_ENABLED)
val ARROW_EXECUTION_MAX_RECORDS_PER_BATCH =
buildConf("spark.sql.execution.arrow.maxRecordsPerBatch")
.doc("When using Apache Arrow, limit the maximum number of records that can be written " +
@ -2147,9 +2163,11 @@ class SQLConf extends Serializable with Logging {
def rangeExchangeSampleSizePerPartition: Int = getConf(RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION)
def arrowEnabled: Boolean = getConf(ARROW_EXECUTION_ENABLED)
def arrowPySparkEnabled: Boolean = getConf(ARROW_PYSPARK_EXECUTION_ENABLED)
def arrowFallbackEnabled: Boolean = getConf(ARROW_FALLBACK_ENABLED)
def arrowSparkREnabled: Boolean = getConf(ARROW_SPARKR_EXECUTION_ENABLED)
def arrowPySparkFallbackEnabled: Boolean = getConf(ARROW_PYSPARK_FALLBACK_ENABLED)
def arrowMaxRecordsPerBatch: Int = getConf(ARROW_EXECUTION_MAX_RECORDS_PER_BATCH)
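
To illustrate the `fallbackConf` wiring above, a small sketch of how the deprecated keys feed the new ones, mirroring the assertions added to ArrowTests (`spark` is an assumed running session):

```python
# While the new keys are unset, their effective values come from the
# deprecated keys they fall back to.
spark.conf.set("spark.sql.execution.arrow.enabled", "false")
assert spark.conf.get("spark.sql.execution.arrow.pyspark.enabled") == "false"

spark.conf.set("spark.sql.execution.arrow.fallback.enabled", "true")
assert spark.conf.get("spark.sql.execution.arrow.pyspark.fallback.enabled") == "true"

# Once a new key is set explicitly, it takes precedence over the deprecated one.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
assert spark.conf.get("spark.sql.execution.arrow.pyspark.enabled") == "true"
```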