[SPARK-23446][PYTHON] Explicitly check supported types in toPandas

## What changes were proposed in this pull request?

This PR explicitly specifies and checks the types we support in `toPandas`. Previously this was a hole: for example, binary type support has not been finished on the Python side yet, but the conversion is currently allowed and the two paths disagree, as below:

```python
spark.conf.set("spark.sql.execution.arrow.enabled", "false")
df = spark.createDataFrame([[bytearray(b"a")]])
df.toPandas()  # non-Arrow path
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
df.toPandas()  # Arrow path gives a different result
```

```
# non-Arrow path
     _1
0  [97]

# Arrow path
  _1
0  a
```

This should be disallowed. I think the same issue applies to nested timestamps as well.
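For illustration, a minimal sketch of the nested-timestamp case (the schema and data below are my own example, not part of this patch; the point is that the timezone fix-up in `_check_dataframe_localize_timestamps` only covers top-level timestamp columns):

```python
import datetime
from pyspark.sql.types import ArrayType, StructField, StructType, TimestampType

# A timestamp nested inside an array: the localization fix-up in toPandas
# only handles top-level timestamp columns, so nested timestamps can
# round-trip differently between the Arrow and non-Arrow paths.
schema = StructType([StructField("ts", ArrayType(TimestampType()))])
df = spark.createDataFrame([([datetime.datetime(2018, 2, 16, 9, 0)],)], schema=schema)
df.toPandas()  # under this patch, expected to be rejected rather than mis-converted
```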

I also added a nicer note about `spark.sql.execution.arrow.enabled` to the error message.
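The note points users at the opt-out; as a minimal usage sketch (assuming a running `spark` session):

```python
# If the Arrow-optimized toPandas fails on an unsupported type, disable
# the optimization to fall back to the plain conversion path, as the
# new error message suggests.
spark.conf.set("spark.sql.execution.arrow.enabled", "false")
df.toPandas()
```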

## How was this patch tested?

Manually tested and tests added in `python/pyspark/sql/tests.py`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #20625 from HyukjinKwon/pandas_convertion_supported_type.
hyukjinkwon 2018-02-16 09:41:17 -08:00 committed by gatorsmile
parent 1dc2c1d5e8
commit c5857e496f
2 changed files with 17 additions and 7 deletions


`python/pyspark/sql/dataframe.py`:

```diff
@@ -1988,10 +1988,11 @@ class DataFrame(object):
         if self.sql_ctx.getConf("spark.sql.execution.arrow.enabled", "false").lower() == "true":
             try:
                 from pyspark.sql.types import _check_dataframe_convert_date, \
-                    _check_dataframe_localize_timestamps
+                    _check_dataframe_localize_timestamps, to_arrow_schema
                 from pyspark.sql.utils import require_minimum_pyarrow_version
-                import pyarrow
                 require_minimum_pyarrow_version()
+                import pyarrow
+                to_arrow_schema(self.schema)
                 tables = self._collectAsArrow()
                 if tables:
                     table = pyarrow.concat_tables(tables)
@@ -2000,10 +2001,12 @@ class DataFrame(object):
                     return _check_dataframe_localize_timestamps(pdf, timezone)
                 else:
                     return pd.DataFrame.from_records([], columns=self.columns)
-            except ImportError as e:
-                msg = "note: pyarrow must be installed and available on calling Python process " \
-                      "if using spark.sql.execution.arrow.enabled=true"
-                raise ImportError("%s\n%s" % (_exception_message(e), msg))
+            except Exception as e:
+                msg = (
+                    "Note: toPandas attempted Arrow optimization because "
+                    "'spark.sql.execution.arrow.enabled' is set to true. Please set it to false "
+                    "to disable this.")
+                raise RuntimeError("%s\n%s" % (_exception_message(e), msg))
         else:
             pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
```
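The key addition is the up-front `to_arrow_schema(self.schema)` call, which rejects unsupported schemas before any data is collected. A minimal sketch of what that check does (illustrative only; `to_arrow_schema` is the internal helper in `pyspark.sql.types` that the patch imports, and the exact exception text may vary):

```python
from pyspark.sql.types import BinaryType, StructField, StructType, to_arrow_schema

# Mapping the Spark schema to an Arrow schema fails fast for types the
# Arrow path does not support yet, e.g. binary.
schema = StructType([StructField("a", BinaryType())])
to_arrow_schema(schema)  # expected to raise TypeError('Unsupported type ...')
```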


`python/pyspark/sql/tests.py`:

```diff
@@ -3497,7 +3497,14 @@ class ArrowTests(ReusedSQLTestCase):
         schema = StructType([StructField("map", MapType(StringType(), IntegerType()), True)])
         df = self.spark.createDataFrame([(None,)], schema=schema)
         with QuietTest(self.sc):
-            with self.assertRaisesRegexp(Exception, 'Unsupported data type'):
+            with self.assertRaisesRegexp(Exception, 'Unsupported type'):
                 df.toPandas()
+
+        df = self.spark.createDataFrame([(None,)], schema="a binary")
+        with QuietTest(self.sc):
+            with self.assertRaisesRegexp(
+                    Exception,
+                    'Unsupported type.*\nNote: toPandas attempted Arrow optimization because'):
+                df.toPandas()

     def test_null_conversion(self):
```