[SPARK-8060] Improve DataFrame Python test coverage and documentation.

Author: Reynold Xin <rxin@databricks.com>

Closes #6601 from rxin/python-read-write-test-and-doc and squashes the following commits:

baa8ad5 [Reynold Xin] Code review feedback.
f081d47 [Reynold Xin] More documentation updates.
c9902fa [Reynold Xin] [SPARK-8060] Improve DataFrame Python reader/writer interface doc and testing.
Reynold Xin 2015-06-03 00:23:34 -07:00
parent 452eb82dd7
commit ce320cb2db
20 changed files with 181 additions and 228 deletions
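For orientation, a minimal sketch (not itself part of the diff) of the reader/writer interface whose Python docs and doctests this change reworks. It assumes the doctest fixtures set up below: a `sqlContext` and the `python/test_support/sql` data files added by this commit; the output path is illustrative.

df = sqlContext.read.format('json').load('python/test_support/sql/people.json')
df.dtypes                                                  # [('age', 'bigint'), ('name', 'string')]
df.write.mode('overwrite').parquet('/tmp/people_parquet')  # hypothetical output path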


@@ -82,3 +82,4 @@ local-1426633911242/*
local-1430917381534/*
DESCRIPTION
NAMESPACE
test_support/*


@@ -45,11 +45,20 @@ from __future__ import absolute_import
def since(version):
"""
A decorator that annotates a function to append the version of Spark in which the function was added.
"""
import re
indent_p = re.compile(r'\n( +)')
def deco(f):
f.__doc__ = f.__doc__.rstrip() + "\n\n.. versionadded:: %s" % version
indents = indent_p.findall(f.__doc__)
indent = ' ' * (min(len(m) for m in indents) if indents else 0)
f.__doc__ = f.__doc__.rstrip() + "\n\n%s.. versionadded:: %s" % (indent, version)
return f
return deco
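# Illustrative sketch (not part of this diff): with the updated decorator, the
# ".. versionadded::" directive is appended at the docstring's own indentation.
# For a method such as
#
#     @since(1.4)
#     def foo(self):
#         """Does something useful.
#         """
#
# foo.__doc__ ends with "        .. versionadded:: 1.4" (indented eight spaces)
# rather than a flush-left directive, so the note stays inside the docstring block.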
from pyspark.sql.types import Row
from pyspark.sql.context import SQLContext, HiveContext
from pyspark.sql.column import Column
@@ -58,7 +67,9 @@ from pyspark.sql.group import GroupedData
from pyspark.sql.readwriter import DataFrameReader, DataFrameWriter
from pyspark.sql.window import Window, WindowSpec
__all__ = [
'SQLContext', 'HiveContext', 'DataFrame', 'GroupedData', 'Column', 'Row',
'DataFrameNaFunctions', 'DataFrameStatFunctions', 'Window', 'WindowSpec',
'DataFrameReader', 'DataFrameWriter'
]


@@ -124,7 +124,10 @@ class SQLContext(object):
@property
@since("1.3.1")
def udf(self):
"""Returns a :class:`UDFRegistration` for UDF registration."""
"""Returns a :class:`UDFRegistration` for UDF registration.
:return: :class:`UDFRegistration`
"""
return UDFRegistration(self)
@since(1.4)
@@ -138,7 +141,7 @@ class SQLContext(object):
:param end: the end value (exclusive)
:param step: the incremental step (default: 1)
:param numPartitions: the number of partitions of the DataFrame
:return: A new DataFrame
:return: :class:`DataFrame`
>>> sqlContext.range(1, 7, 2).collect()
[Row(id=1), Row(id=3), Row(id=5)]
@@ -195,8 +198,8 @@ class SQLContext(object):
raise ValueError("The first row in RDD is empty, "
"can not infer schema")
if type(first) is dict:
warnings.warn("Using RDD of dict to inferSchema is deprecated,"
"please use pyspark.sql.Row instead")
warnings.warn("Using RDD of dict to inferSchema is deprecated. "
"Use pyspark.sql.Row instead")
if samplingRatio is None:
schema = _infer_schema(first)
@@ -219,7 +222,7 @@ class SQLContext(object):
"""
.. note:: Deprecated in 1.3, use :func:`createDataFrame` instead.
"""
warnings.warn("inferSchema is deprecated, please use createDataFrame instead")
warnings.warn("inferSchema is deprecated, please use createDataFrame instead.")
if isinstance(rdd, DataFrame):
raise TypeError("Cannot apply schema to DataFrame")
@@ -262,6 +265,7 @@ class SQLContext(object):
:class:`list`, or :class:`pandas.DataFrame`.
:param schema: a :class:`StructType` or list of column names. default None.
:param samplingRatio: the sample ratio of rows used for inferring
:return: :class:`DataFrame`
>>> l = [('Alice', 1)]
>>> sqlContext.createDataFrame(l).collect()
@@ -359,18 +363,15 @@ class SQLContext(object):
else:
raise ValueError("Can only register DataFrame as table")
@since(1.0)
def parquetFile(self, *paths):
"""Loads a Parquet file, returning the result as a :class:`DataFrame`.
>>> import tempfile, shutil
>>> parquetFile = tempfile.mkdtemp()
>>> shutil.rmtree(parquetFile)
>>> df.saveAsParquetFile(parquetFile)
>>> df2 = sqlContext.parquetFile(parquetFile)
>>> sorted(df.collect()) == sorted(df2.collect())
True
.. note:: Deprecated in 1.4, use :func:`DataFrameReader.parquet` instead.
>>> sqlContext.parquetFile('python/test_support/sql/parquet_partitioned').dtypes
[('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
"""
warnings.warn("parquetFile is deprecated. Use read.parquet() instead.")
gateway = self._sc._gateway
jpaths = gateway.new_array(gateway.jvm.java.lang.String, len(paths))
for i in range(0, len(paths)):
@@ -378,39 +379,15 @@ class SQLContext(object):
jdf = self._ssql_ctx.parquetFile(jpaths)
return DataFrame(jdf, self)
@since(1.0)
def jsonFile(self, path, schema=None, samplingRatio=1.0):
"""Loads a text file storing one JSON object per line as a :class:`DataFrame`.
If the schema is provided, applies the given schema to this JSON dataset.
Otherwise, it samples the dataset with ratio ``samplingRatio`` to determine the schema.
.. note:: Deprecated in 1.4, use :func:`DataFrameReader.json` instead.
>>> import tempfile, shutil
>>> jsonFile = tempfile.mkdtemp()
>>> shutil.rmtree(jsonFile)
>>> with open(jsonFile, 'w') as f:
... f.writelines(jsonStrings)
>>> df1 = sqlContext.jsonFile(jsonFile)
>>> df1.printSchema()
root
|-- field1: long (nullable = true)
|-- field2: string (nullable = true)
|-- field3: struct (nullable = true)
| |-- field4: long (nullable = true)
>>> from pyspark.sql.types import *
>>> schema = StructType([
... StructField("field2", StringType()),
... StructField("field3",
... StructType([StructField("field5", ArrayType(IntegerType()))]))])
>>> df2 = sqlContext.jsonFile(jsonFile, schema)
>>> df2.printSchema()
root
|-- field2: string (nullable = true)
|-- field3: struct (nullable = true)
| |-- field5: array (nullable = true)
| | |-- element: integer (containsNull = true)
>>> sqlContext.jsonFile('python/test_support/sql/people.json').dtypes
[('age', 'bigint'), ('name', 'string')]
"""
warnings.warn("jsonFile is deprecated. Use read.json() instead.")
if schema is None:
df = self._ssql_ctx.jsonFile(path, samplingRatio)
else:
@@ -462,21 +439,16 @@ class SQLContext(object):
df = self._ssql_ctx.jsonRDD(jrdd.rdd(), scala_datatype)
return DataFrame(df, self)
@since(1.3)
def load(self, path=None, source=None, schema=None, **options):
"""Returns the dataset in a data source as a :class:`DataFrame`.
The data source is specified by the ``source`` and a set of ``options``.
If ``source`` is not specified, the default data source configured by
``spark.sql.sources.default`` will be used.
Optionally, a schema can be provided as the schema of the returned DataFrame.
.. note:: Deprecated in 1.4, use :func:`DataFrameReader.load` instead.
"""
warnings.warn("load is deprecated. Use read.load() instead.")
return self.read.load(path, source, schema, **options)
@since(1.3)
def createExternalTable(self, tableName, path=None, source=None,
schema=None, **options):
def createExternalTable(self, tableName, path=None, source=None, schema=None, **options):
"""Creates an external table based on the dataset in a data source.
It returns the DataFrame associated with the external table.
@@ -487,6 +459,8 @@ class SQLContext(object):
Optionally, a schema can be provided as the schema of the returned :class:`DataFrame` and
created external table.
:return: :class:`DataFrame`
"""
if path is not None:
options["path"] = path
@@ -508,6 +482,8 @@ class SQLContext(object):
def sql(self, sqlQuery):
"""Returns a :class:`DataFrame` representing the result of the given query.
:return: :class:`DataFrame`
>>> sqlContext.registerDataFrameAsTable(df, "table1")
>>> df2 = sqlContext.sql("SELECT field1 AS f1, field2 as f2 from table1")
>>> df2.collect()
@@ -519,6 +495,8 @@ class SQLContext(object):
def table(self, tableName):
"""Returns the specified table as a :class:`DataFrame`.
:return: :class:`DataFrame`
>>> sqlContext.registerDataFrameAsTable(df, "table1")
>>> df2 = sqlContext.table("table1")
>>> sorted(df.collect()) == sorted(df2.collect())
@@ -536,6 +514,9 @@ class SQLContext(object):
The returned DataFrame has two columns: ``tableName`` and ``isTemporary``
(a column with :class:`BooleanType` indicating if a table is a temporary one or not).
:param dbName: string, name of the database to use.
:return: :class:`DataFrame`
>>> sqlContext.registerDataFrameAsTable(df, "table1")
>>> df2 = sqlContext.tables()
>>> df2.filter("tableName = 'table1'").first()
@@ -550,7 +531,8 @@ class SQLContext(object):
def tableNames(self, dbName=None):
"""Returns a list of names of tables in the database ``dbName``.
If ``dbName`` is not specified, the current database will be used.
:param dbName: string, name of the database to use. Defaults to the current database.
:return: list of table names, as strings
>>> sqlContext.registerDataFrameAsTable(df, "table1")
>>> "table1" in sqlContext.tableNames()
@@ -585,8 +567,7 @@ class SQLContext(object):
Returns a :class:`DataFrameReader` that can be used to read data
in as a :class:`DataFrame`.
>>> sqlContext.read
<pyspark.sql.readwriter.DataFrameReader object at ...>
:return: :class:`DataFrameReader`
"""
return DataFrameReader(self)
@@ -644,10 +625,14 @@ class UDFRegistration(object):
def _test():
import os
import doctest
from pyspark.context import SparkContext
from pyspark.sql import Row, SQLContext
import pyspark.sql.context
os.chdir(os.environ["SPARK_HOME"])
globs = pyspark.sql.context.__dict__.copy()
sc = SparkContext('local[4]', 'PythonTest')
globs['sc'] = sc


@@ -44,7 +44,7 @@ class DataFrame(object):
A :class:`DataFrame` is equivalent to a relational table in Spark SQL,
and can be created using various functions in :class:`SQLContext`::
people = sqlContext.parquetFile("...")
people = sqlContext.read.parquet("...")
Once created, it can be manipulated using the various domain-specific-language
(DSL) functions defined in: :class:`DataFrame`, :class:`Column`.
@@ -56,8 +56,8 @@ class DataFrame(object):
A more concrete example::
# To create DataFrame using SQLContext
people = sqlContext.parquetFile("...")
department = sqlContext.parquetFile("...")
people = sqlContext.read.parquet("...")
department = sqlContext.read.parquet("...")
people.filter(people.age > 30).join(department, people.deptId == department.id)) \
.groupBy(department.name, "gender").agg({"salary": "avg", "age": "max"})
@@ -120,21 +120,12 @@ class DataFrame(object):
rdd = self._jdf.toJSON()
return RDD(rdd.toJavaRDD(), self._sc, UTF8Deserializer(use_unicode))
@since(1.3)
def saveAsParquetFile(self, path):
"""Saves the contents as a Parquet file, preserving the schema.
Files that are written out using this method can be read back in as
a :class:`DataFrame` using :func:`SQLContext.parquetFile`.
>>> import tempfile, shutil
>>> parquetFile = tempfile.mkdtemp()
>>> shutil.rmtree(parquetFile)
>>> df.saveAsParquetFile(parquetFile)
>>> df2 = sqlContext.parquetFile(parquetFile)
>>> sorted(df2.collect()) == sorted(df.collect())
True
.. note:: Deprecated in 1.4, use :func:`DataFrameWriter.parquet` instead.
"""
warnings.warn("saveAsParquetFile is deprecated. Use write.parquet() instead.")
self._jdf.saveAsParquetFile(path)
@since(1.3)
@@ -151,69 +142,45 @@ class DataFrame(object):
"""
self._jdf.registerTempTable(name)
@since(1.3)
def registerAsTable(self, name):
"""DEPRECATED: use :func:`registerTempTable` instead"""
warnings.warn("Use registerTempTable instead of registerAsTable.", DeprecationWarning)
"""
.. note:: Deprecated in 1.4, use :func:`registerTempTable` instead.
"""
warnings.warn("Use registerTempTable instead of registerAsTable.")
self.registerTempTable(name)
@since(1.3)
def insertInto(self, tableName, overwrite=False):
"""Inserts the contents of this :class:`DataFrame` into the specified table.
Optionally overwriting any existing data.
.. note:: Deprecated in 1.4, use :func:`DataFrameWriter.insertInto` instead.
"""
warnings.warn("insertInto is deprecated. Use write.insertInto() instead.")
self.write.insertInto(tableName, overwrite)
@since(1.3)
def saveAsTable(self, tableName, source=None, mode="error", **options):
"""Saves the contents of this :class:`DataFrame` to a data source as a table.
The data source is specified by the ``source`` and a set of ``options``.
If ``source`` is not specified, the default data source configured by
``spark.sql.sources.default`` will be used.
Additionally, mode is used to specify the behavior of the saveAsTable operation when
table already exists in the data source. There are four modes:
* `append`: Append contents of this :class:`DataFrame` to existing data.
* `overwrite`: Overwrite existing data.
* `error`: Throw an exception if data already exists.
* `ignore`: Silently ignore this operation if data already exists.
.. note:: Deprecated in 1.4, use :func:`DataFrameWriter.saveAsTable` instead.
"""
warnings.warn("insertInto is deprecated. Use write.saveAsTable() instead.")
self.write.saveAsTable(tableName, source, mode, **options)
@since(1.3)
def save(self, path=None, source=None, mode="error", **options):
"""Saves the contents of the :class:`DataFrame` to a data source.
The data source is specified by the ``source`` and a set of ``options``.
If ``source`` is not specified, the default data source configured by
``spark.sql.sources.default`` will be used.
Additionally, mode is used to specify the behavior of the save operation when
data already exists in the data source. There are four modes:
* `append`: Append contents of this :class:`DataFrame` to existing data.
* `overwrite`: Overwrite existing data.
* `error`: Throw an exception if data already exists.
* `ignore`: Silently ignore this operation if data already exists.
.. note:: Deprecated in 1.4, use :func:`DataFrameWriter.save` instead.
"""
warnings.warn("insertInto is deprecated. Use write.save() instead.")
return self.write.save(path, source, mode, **options)
@property
@since(1.4)
def write(self):
"""
Interface for saving the content of the :class:`DataFrame` out
into external storage.
Interface for saving the content of the :class:`DataFrame` out into external storage.
:return :class:`DataFrameWriter`
.. note:: Experimental
>>> df.write
<pyspark.sql.readwriter.DataFrameWriter object at ...>
:return: :class:`DataFrameWriter`
"""
return DataFrameWriter(self)
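A hedged usage sketch (not part of the diff) of chaining calls on the returned DataFrameWriter, mirroring the doctests above; the output path is illustrative:

df.write.format('parquet').mode('append').partitionBy('year').save('/tmp/example_output')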
@@ -636,6 +603,9 @@ class DataFrame(object):
This includes count, mean, stddev, min, and max. If no columns are
given, this function computes statistics for all numerical columns.
.. note:: This function is meant for exploratory data analysis, as we make no \
guarantee about the backward compatibility of the schema of the resulting DataFrame.
>>> df.describe().show()
+-------+---+
|summary|age|
@@ -653,9 +623,11 @@ class DataFrame(object):
@ignore_unicode_prefix
@since(1.3)
def head(self, n=None):
"""
Returns the first ``n`` rows as a list of :class:`Row`,
or the first :class:`Row` if ``n`` is ``None.``
"""Returns the first ``n`` rows.
:param n: int, default 1. Number of rows to return.
:return: If n is greater than 1, return a list of :class:`Row`.
If n is 1, return a single Row.
>>> df.head()
Row(age=2, name=u'Alice')
@@ -1170,8 +1142,8 @@ class DataFrame(object):
"http://dx.doi.org/10.1145/762471.762473, proposed by Karp, Schenker, and Papadimitriou".
:func:`DataFrame.freqItems` and :func:`DataFrameStatFunctions.freqItems` are aliases.
This function is meant for exploratory data analysis, as we make no guarantee about the
backward compatibility of the schema of the resulting DataFrame.
.. note:: This function is meant for exploratory data analysis, as we make no \
guarantee about the backward compatibility of the schema of the resulting DataFrame.
:param cols: Names of the columns to calculate frequent items for as a list or tuple of
strings.


@@ -45,18 +45,24 @@ class DataFrameReader(object):
@since(1.4)
def format(self, source):
"""
Specifies the input data source format.
"""Specifies the input data source format.
:param source: string, name of the data source, e.g. 'json', 'parquet'.
>>> df = sqlContext.read.format('json').load('python/test_support/sql/people.json')
>>> df.dtypes
[('age', 'bigint'), ('name', 'string')]
"""
self._jreader = self._jreader.format(source)
return self
@since(1.4)
def schema(self, schema):
"""
Specifies the input schema. Some data sources (e.g. JSON) can
infer the input schema automatically from data. By specifying
the schema here, the underlying data source can skip the schema
"""Specifies the input schema.
Some data sources (e.g. JSON) can infer the input schema automatically from data.
By specifying the schema here, the underlying data source can skip the schema
inference step, and thus speed up data loading.
:param schema: a StructType object
@@ -69,8 +75,7 @@ class DataFrameReader(object):
@since(1.4)
def options(self, **options):
"""
Adds input options for the underlying data source.
"""Adds input options for the underlying data source.
"""
for k in options:
self._jreader = self._jreader.option(k, options[k])
@@ -84,6 +89,10 @@ class DataFrameReader(object):
:param format: optional string for format of the data source. Defaults to 'parquet'.
:param schema: optional :class:`StructType` for the input schema.
:param options: all other string options
>>> df = sqlContext.read.load('python/test_support/sql/parquet_partitioned')
>>> df.dtypes
[('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
"""
if format is not None:
self.format(format)
@@ -107,31 +116,10 @@ class DataFrameReader(object):
:param path: string, path to the JSON dataset.
:param schema: an optional :class:`StructType` for the input schema.
>>> import tempfile, shutil
>>> jsonFile = tempfile.mkdtemp()
>>> shutil.rmtree(jsonFile)
>>> with open(jsonFile, 'w') as f:
... f.writelines(jsonStrings)
>>> df1 = sqlContext.read.json(jsonFile)
>>> df1.printSchema()
root
|-- field1: long (nullable = true)
|-- field2: string (nullable = true)
|-- field3: struct (nullable = true)
| |-- field4: long (nullable = true)
>>> df = sqlContext.read.json('python/test_support/sql/people.json')
>>> df.dtypes
[('age', 'bigint'), ('name', 'string')]
>>> from pyspark.sql.types import *
>>> schema = StructType([
... StructField("field2", StringType()),
... StructField("field3",
... StructType([StructField("field5", ArrayType(IntegerType()))]))])
>>> df2 = sqlContext.read.json(jsonFile, schema)
>>> df2.printSchema()
root
|-- field2: string (nullable = true)
|-- field3: struct (nullable = true)
| |-- field5: array (nullable = true)
| | |-- element: integer (containsNull = true)
"""
if schema is not None:
self.schema(schema)
@@ -141,10 +129,12 @@ class DataFrameReader(object):
def table(self, tableName):
"""Returns the specified table as a :class:`DataFrame`.
>>> sqlContext.registerDataFrameAsTable(df, "table1")
>>> df2 = sqlContext.read.table("table1")
>>> sorted(df.collect()) == sorted(df2.collect())
True
:param tableName: string, name of the table.
>>> df = sqlContext.read.parquet('python/test_support/sql/parquet_partitioned')
>>> df.registerTempTable('tmpTable')
>>> sqlContext.read.table('tmpTable').dtypes
[('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
"""
return self._df(self._jreader.table(tableName))
@@ -152,13 +142,9 @@ class DataFrameReader(object):
def parquet(self, *path):
"""Loads a Parquet file, returning the result as a :class:`DataFrame`.
>>> import tempfile, shutil
>>> parquetFile = tempfile.mkdtemp()
>>> shutil.rmtree(parquetFile)
>>> df.saveAsParquetFile(parquetFile)
>>> df2 = sqlContext.read.parquet(parquetFile)
>>> sorted(df.collect()) == sorted(df2.collect())
True
>>> df = sqlContext.read.parquet('python/test_support/sql/parquet_partitioned')
>>> df.dtypes
[('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
"""
return self._df(self._jreader.parquet(_to_seq(self._sqlContext._sc, path)))
@@ -221,30 +207,34 @@ class DataFrameWriter(object):
@since(1.4)
def mode(self, saveMode):
"""
Specifies the behavior when data or table already exists. Options include:
"""Specifies the behavior when data or table already exists.
Options include:
* `append`: Append contents of this :class:`DataFrame` to existing data.
* `overwrite`: Overwrite existing data.
* `error`: Throw an exception if data already exists.
* `ignore`: Silently ignore this operation if data already exists.
>>> df.write.mode('append').parquet(os.path.join(tempfile.mkdtemp(), 'data'))
"""
self._jwrite = self._jwrite.mode(saveMode)
return self
@since(1.4)
def format(self, source):
"""
Specifies the underlying output data source. Built-in options include
"parquet", "json", etc.
"""Specifies the underlying output data source.
:param source: string, name of the data source, e.g. 'json', 'parquet'.
>>> df.write.format('json').save(os.path.join(tempfile.mkdtemp(), 'data'))
"""
self._jwrite = self._jwrite.format(source)
return self
@since(1.4)
def options(self, **options):
"""
Adds output options for the underlying data source.
"""Adds output options for the underlying data source.
"""
for k in options:
self._jwrite = self._jwrite.option(k, options[k])
@@ -252,12 +242,14 @@ class DataFrameWriter(object):
@since(1.4)
def partitionBy(self, *cols):
"""
Partitions the output by the given columns on the file system.
"""Partitions the output by the given columns on the file system.
If specified, the output is laid out on the file system similar
to Hive's partitioning scheme.
:param cols: name of columns
>>> df.write.partitionBy('year', 'month').parquet(os.path.join(tempfile.mkdtemp(), 'data'))
"""
if len(cols) == 1 and isinstance(cols[0], (list, tuple)):
cols = cols[0]
@@ -266,25 +258,23 @@ class DataFrameWriter(object):
@since(1.4)
def save(self, path=None, format=None, mode="error", **options):
"""
Saves the contents of the :class:`DataFrame` to a data source.
"""Saves the contents of the :class:`DataFrame` to a data source.
The data source is specified by the ``format`` and a set of ``options``.
If ``format`` is not specified, the default data source configured by
``spark.sql.sources.default`` will be used.
Additionally, mode is used to specify the behavior of the save operation when
data already exists in the data source. There are four modes:
* `append`: Append contents of this :class:`DataFrame` to existing data.
* `overwrite`: Overwrite existing data.
* `error`: Throw an exception if data already exists.
* `ignore`: Silently ignore this operation if data already exists.
:param path: the path in a Hadoop supported file system
:param format: the format used to save
:param mode: one of `append`, `overwrite`, `error`, `ignore` (default: error)
:param mode: specifies the behavior of the save operation when data already exists.
* ``append``: Append contents of this :class:`DataFrame` to existing data.
* ``overwrite``: Overwrite existing data.
* ``ignore``: Silently ignore this operation if data already exists.
* ``error`` (default case): Throw an exception if data already exists.
:param options: all other string options
>>> df.write.mode('append').parquet(os.path.join(tempfile.mkdtemp(), 'data'))
"""
self.mode(mode).options(**options)
if format is not None:
@@ -296,8 +286,8 @@ class DataFrameWriter(object):
@since(1.4)
def insertInto(self, tableName, overwrite=False):
"""
Inserts the content of the :class:`DataFrame` to the specified table.
"""Inserts the content of the :class:`DataFrame` to the specified table.
It requires that the schema of the :class:`DataFrame` is the same as the
schema of the table.
@@ -307,8 +297,7 @@ class DataFrameWriter(object):
@since(1.4)
def saveAsTable(self, name, format=None, mode="error", **options):
"""
Saves the content of the :class:`DataFrame` as the specified table.
"""Saves the content of the :class:`DataFrame` as the specified table.
In the case the table already exists, the behavior of this function depends on the
save mode, specified by the `mode` function (defaulting to throwing an exception).
@@ -328,64 +317,55 @@ class DataFrameWriter(object):
self.mode(mode).options(**options)
if format is not None:
self.format(format)
return self._jwrite.saveAsTable(name)
self._jwrite.saveAsTable(name)
@since(1.4)
def json(self, path, mode="error"):
"""
Saves the content of the :class:`DataFrame` in JSON format at the
specified path.
Additionally, mode is used to specify the behavior of the save operation when
data already exists in the data source. There are four modes:
* `append`: Append contents of this :class:`DataFrame` to existing data.
* `overwrite`: Overwrite existing data.
* `error`: Throw an exception if data already exists.
* `ignore`: Silently ignore this operation if data already exists.
"""Saves the content of the :class:`DataFrame` in JSON format at the specified path.
:param path: the path in any Hadoop supported file system
:param mode: one of `append`, `overwrite`, `error`, `ignore` (default: error)
:param mode: specifies the behavior of the save operation when data already exists.
* ``append``: Append contents of this :class:`DataFrame` to existing data.
* ``overwrite``: Overwrite existing data.
* ``ignore``: Silently ignore this operation if data already exists.
* ``error`` (default case): Throw an exception if data already exists.
>>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
"""
return self._jwrite.mode(mode).json(path)
self._jwrite.mode(mode).json(path)
@since(1.4)
def parquet(self, path, mode="error"):
"""
Saves the content of the :class:`DataFrame` in Parquet format at the
specified path.
Additionally, mode is used to specify the behavior of the save operation when
data already exists in the data source. There are four modes:
* `append`: Append contents of this :class:`DataFrame` to existing data.
* `overwrite`: Overwrite existing data.
* `error`: Throw an exception if data already exists.
* `ignore`: Silently ignore this operation if data already exists.
"""Saves the content of the :class:`DataFrame` in Parquet format at the specified path.
:param path: the path in any Hadoop supported file system
:param mode: one of `append`, `overwrite`, `error`, `ignore` (default: error)
:param mode: specifies the behavior of the save operation when data already exists.
* ``append``: Append contents of this :class:`DataFrame` to existing data.
* ``overwrite``: Overwrite existing data.
* ``ignore``: Silently ignore this operation if data already exists.
* ``error`` (default case): Throw an exception if data already exists.
>>> df.write.parquet(os.path.join(tempfile.mkdtemp(), 'data'))
"""
return self._jwrite.mode(mode).parquet(path)
self._jwrite.mode(mode).parquet(path)
@since(1.4)
def jdbc(self, url, table, mode="error", properties={}):
"""
Saves the content of the :class:`DataFrame` to a external database table
via JDBC.
"""Saves the content of the :class:`DataFrame` to a external database table via JDBC.
In the case the table already exists in the external database,
behavior of this function depends on the save mode, specified by the `mode`
function (default to throwing an exception). There are four modes:
.. note:: Don't create too many partitions in parallel on a large cluster;\
otherwise Spark might crash your external database systems.
* `append`: Append contents of this :class:`DataFrame` to existing data.
* `overwrite`: Overwrite existing data.
* `error`: Throw an exception if data already exists.
* `ignore`: Silently ignore this operation if data already exists.
:param url: a JDBC URL of the form `jdbc:subprotocol:subname`
:param url: a JDBC URL of the form ``jdbc:subprotocol:subname``
:param table: Name of the table in the external database.
:param mode: one of `append`, `overwrite`, `error`, `ignore` (default: error)
:param mode: specifies the behavior of the save operation when data already exists.
* ``append``: Append contents of this :class:`DataFrame` to existing data.
* ``overwrite``: Overwrite existing data.
* ``ignore``: Silently ignore this operation if data already exists.
* ``error`` (default case): Throw an exception if data already exists.
:param properties: JDBC database connection arguments, a dictionary of
arbitrary string tag/value pairs. Normally at least a
"user" and "password" property should be included.
@@ -398,24 +378,23 @@ class DataFrameWriter(object):
def _test():
import doctest
import os
import tempfile
from pyspark.context import SparkContext
from pyspark.sql import Row, SQLContext
import pyspark.sql.readwriter
os.chdir(os.environ["SPARK_HOME"])
globs = pyspark.sql.readwriter.__dict__.copy()
sc = SparkContext('local[4]', 'PythonTest')
globs['tempfile'] = tempfile
globs['os'] = os
globs['sc'] = sc
globs['sqlContext'] = SQLContext(sc)
globs['df'] = sc.parallelize([(2, 'Alice'), (5, 'Bob')]) \
.toDF(StructType([StructField('age', IntegerType()),
StructField('name', StringType())]))
jsonStrings = [
'{"field1": 1, "field2": "row1", "field3":{"field4":11}}',
'{"field1" : 2, "field3":{"field4":22, "field5": [10, 11]},'
'"field6":[{"field7": "row2"}]}',
'{"field1" : null, "field2": "row3", '
'"field3":{"field4":33, "field5": []}}'
]
globs['jsonStrings'] = jsonStrings
globs['df'] = globs['sqlContext'].read.parquet('python/test_support/sql/parquet_partitioned')
(failure_count, test_count) = doctest.testmod(
pyspark.sql.readwriter, globs=globs,
optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)


@@ -753,8 +753,10 @@ class HiveContextSQLTests(ReusedPySparkTestCase):
try:
cls.sc._jvm.org.apache.hadoop.hive.conf.HiveConf()
except py4j.protocol.Py4JError:
cls.tearDownClass()
raise unittest.SkipTest("Hive is not available")
except TypeError:
cls.tearDownClass()
raise unittest.SkipTest("Hive is not available")
os.unlink(cls.tempdir.name)
_scala_HiveContext =\

Binary file not shown.


@@ -0,0 +1,3 @@
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}