#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
2015-08-27 01:19:11 -04:00
|
|
|
import sys
|
|
|
|
|
|
|
|
# Python 3 compatibility: Python 3 removed ``basestring`` and ``unicode``,
# so alias both to ``str`` there; the isinstance checks below then work on
# both major versions. Use ``sys.version_info`` (a comparable tuple) rather
# than a lexicographic comparison on the ``sys.version`` string.
if sys.version_info >= (3,):
    basestring = unicode = str
|
|
|
|
|
2015-05-19 17:23:28 -04:00
|
|
|
from py4j.java_gateway import JavaClass
|
|
|
|
|
2016-04-20 13:32:01 -04:00
|
|
|
from pyspark import RDD, since, keyword_only
|
2015-10-28 17:28:38 -04:00
|
|
|
from pyspark.rdd import ignore_unicode_prefix
|
2015-05-19 17:23:28 -04:00
|
|
|
from pyspark.sql.column import _to_seq
|
|
|
|
from pyspark.sql.types import *
|
2015-11-18 11:18:54 -05:00
|
|
|
from pyspark.sql import utils
|
2015-05-19 17:23:28 -04:00
|
|
|
|
2016-06-29 01:07:11 -04:00
|
|
|
__all__ = ["DataFrameReader", "DataFrameWriter"]
|
2015-05-19 17:23:28 -04:00
|
|
|
|
|
|
|
|
2015-08-05 20:28:23 -04:00
|
|
|
def to_str(value):
    """
    A wrapper over str(), but converts bool values to lower case strings.

    If None is given, just returns None, instead of converting it to string "None".
    """
    # None passes through untouched so callers can distinguish "unset".
    if value is None:
        return None
    # Java-side options expect 'true'/'false', not Python's 'True'/'False'.
    if isinstance(value, bool):
        return str(value).lower()
    return str(value)
|
|
|
|
|
|
|
|
|
2016-06-28 16:43:59 -04:00
|
|
|
class OptionUtils(object):
    """Mixin providing a helper to forward keyword options to a reader/writer.

    Subclasses are expected to define ``schema(schema)`` and ``option(key, value)``.
    """

    def _set_opts(self, schema=None, **options):
        """
        Set named options (filter out those the value is None)
        """
        if schema is not None:
            self.schema(schema)
        # Skip None-valued options so the underlying data source keeps
        # its own defaults for anything the caller did not specify.
        for key, value in options.items():
            if value is not None:
                self.option(key, value)
|
|
|
|
|
|
|
|
|
|
|
|
class DataFrameReader(OptionUtils):
|
2015-05-19 17:23:28 -04:00
|
|
|
"""
|
|
|
|
Interface used to load a :class:`DataFrame` from external storage systems
|
2016-05-11 14:24:16 -04:00
|
|
|
(e.g. file systems, key-value stores, etc). Use :func:`spark.read`
|
2015-05-19 17:23:28 -04:00
|
|
|
to access this.
|
|
|
|
|
2015-05-21 02:05:54 -04:00
|
|
|
.. versionadded:: 1.4
|
2015-05-19 17:23:28 -04:00
|
|
|
"""
|
|
|
|
|
2016-05-11 14:24:16 -04:00
|
|
|
def __init__(self, spark):
|
|
|
|
self._jreader = spark._ssql_ctx.read()
|
|
|
|
self._spark = spark
|
2015-05-19 17:23:28 -04:00
|
|
|
|
|
|
|
def _df(self, jdf):
|
|
|
|
from pyspark.sql.dataframe import DataFrame
|
2016-05-11 14:24:16 -04:00
|
|
|
return DataFrame(jdf, self._spark)
|
2015-05-19 17:23:28 -04:00
|
|
|
|
2015-06-02 11:37:18 -04:00
|
|
|
@since(1.4)
|
|
|
|
def format(self, source):
|
2015-06-03 03:23:34 -04:00
|
|
|
"""Specifies the input data source format.
|
|
|
|
|
|
|
|
:param source: string, name of the data source, e.g. 'json', 'parquet'.
|
|
|
|
|
2016-05-11 14:24:16 -04:00
|
|
|
>>> df = spark.read.format('json').load('python/test_support/sql/people.json')
|
2015-06-03 03:23:34 -04:00
|
|
|
>>> df.dtypes
|
|
|
|
[('age', 'bigint'), ('name', 'string')]
|
|
|
|
|
2015-06-02 11:37:18 -04:00
|
|
|
"""
|
|
|
|
self._jreader = self._jreader.format(source)
|
|
|
|
return self
|
|
|
|
|
|
|
|
@since(1.4)
|
|
|
|
def schema(self, schema):
|
2015-06-03 03:23:34 -04:00
|
|
|
"""Specifies the input schema.
|
|
|
|
|
|
|
|
Some data sources (e.g. JSON) can infer the input schema automatically from data.
|
|
|
|
By specifying the schema here, the underlying data source can skip the schema
|
2015-06-02 11:37:18 -04:00
|
|
|
inference step, and thus speed up data loading.
|
|
|
|
|
|
|
|
:param schema: a StructType object
|
|
|
|
"""
|
|
|
|
if not isinstance(schema, StructType):
|
|
|
|
raise TypeError("schema should be StructType")
|
2016-05-11 14:24:16 -04:00
|
|
|
jschema = self._spark._ssql_ctx.parseDataType(schema.json())
|
2015-06-02 11:37:18 -04:00
|
|
|
self._jreader = self._jreader.schema(jschema)
|
|
|
|
return self
|
|
|
|
|
2015-06-29 03:13:39 -04:00
|
|
|
@since(1.5)
|
|
|
|
def option(self, key, value):
|
|
|
|
"""Adds an input option for the underlying data source.
|
|
|
|
"""
|
2015-08-05 20:28:23 -04:00
|
|
|
self._jreader = self._jreader.option(key, to_str(value))
|
2015-06-29 03:13:39 -04:00
|
|
|
return self
|
|
|
|
|
2015-06-02 11:37:18 -04:00
|
|
|
@since(1.4)
|
|
|
|
def options(self, **options):
|
2015-06-03 03:23:34 -04:00
|
|
|
"""Adds input options for the underlying data source.
|
2015-06-02 11:37:18 -04:00
|
|
|
"""
|
|
|
|
for k in options:
|
2015-08-05 20:28:23 -04:00
|
|
|
self._jreader = self._jreader.option(k, to_str(options[k]))
|
2015-06-02 11:37:18 -04:00
|
|
|
return self
|
|
|
|
|
2015-05-21 02:05:54 -04:00
|
|
|
@since(1.4)
|
2015-05-19 17:23:28 -04:00
|
|
|
def load(self, path=None, format=None, schema=None, **options):
|
|
|
|
"""Loads data from a data source and returns it as a :class`DataFrame`.
|
|
|
|
|
2015-11-24 21:16:07 -05:00
|
|
|
:param path: optional string or a list of string for file-system backed data sources.
|
2015-05-19 17:23:28 -04:00
|
|
|
:param format: optional string for format of the data source. Default to 'parquet'.
|
|
|
|
:param schema: optional :class:`StructType` for the input schema.
|
|
|
|
:param options: all other string options
|
2015-06-03 03:23:34 -04:00
|
|
|
|
2016-05-11 14:24:16 -04:00
|
|
|
>>> df = spark.read.load('python/test_support/sql/parquet_partitioned', opt1=True,
|
2015-08-05 20:28:23 -04:00
|
|
|
... opt2=1, opt3='str')
|
2015-06-03 03:23:34 -04:00
|
|
|
>>> df.dtypes
|
|
|
|
[('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
|
2015-11-24 21:16:07 -05:00
|
|
|
|
2016-05-11 14:24:16 -04:00
|
|
|
>>> df = spark.read.format('json').load(['python/test_support/sql/people.json',
|
2015-10-17 17:56:24 -04:00
|
|
|
... 'python/test_support/sql/people1.json'])
|
|
|
|
>>> df.dtypes
|
|
|
|
[('age', 'bigint'), ('aka', 'string'), ('name', 'string')]
|
2015-05-19 17:23:28 -04:00
|
|
|
"""
|
|
|
|
if format is not None:
|
2015-06-02 11:37:18 -04:00
|
|
|
self.format(format)
|
2015-05-19 17:23:28 -04:00
|
|
|
if schema is not None:
|
2015-06-02 11:37:18 -04:00
|
|
|
self.schema(schema)
|
|
|
|
self.options(**options)
|
2015-05-19 17:23:28 -04:00
|
|
|
if path is not None:
|
2016-01-04 21:02:38 -05:00
|
|
|
if type(path) != list:
|
|
|
|
path = [path]
|
2016-05-11 14:24:16 -04:00
|
|
|
return self._df(self._jreader.load(self._spark._sc._jvm.PythonUtils.toSeq(path)))
|
2015-05-19 17:23:28 -04:00
|
|
|
else:
|
2015-06-02 11:37:18 -04:00
|
|
|
return self._df(self._jreader.load())
|
2015-05-19 17:23:28 -04:00
|
|
|
|
2015-05-21 02:05:54 -04:00
|
|
|
@since(1.4)
|
2016-05-02 20:50:40 -04:00
|
|
|
def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
|
|
|
|
allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
|
|
|
|
allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
|
|
|
|
mode=None, columnNameOfCorruptRecord=None):
|
2015-05-19 17:23:28 -04:00
|
|
|
"""
|
2015-08-27 01:19:11 -04:00
|
|
|
Loads a JSON file (one object per line) or an RDD of Strings storing JSON objects
|
|
|
|
(one object per record) and returns the result as a :class`DataFrame`.
|
2015-05-19 17:23:28 -04:00
|
|
|
|
|
|
|
If the ``schema`` parameter is not specified, this function goes
|
|
|
|
through the input once to determine the input schema.
|
|
|
|
|
2015-08-27 01:19:11 -04:00
|
|
|
:param path: string represents path to the JSON dataset,
|
|
|
|
or RDD of Strings storing JSON objects.
|
2015-05-19 17:23:28 -04:00
|
|
|
:param schema: an optional :class:`StructType` for the input schema.
|
2016-05-02 20:50:40 -04:00
|
|
|
:param primitivesAsString: infers all primitive values as a string type. If None is set,
|
|
|
|
it uses the default value, ``false``.
|
|
|
|
:param prefersDecimal: infers all floating-point values as a decimal type. If the values
|
|
|
|
do not fit in decimal, then it infers them as doubles. If None is
|
|
|
|
set, it uses the default value, ``false``.
|
|
|
|
:param allowComments: ignores Java/C++ style comment in JSON records. If None is set,
|
|
|
|
it uses the default value, ``false``.
|
|
|
|
:param allowUnquotedFieldNames: allows unquoted JSON field names. If None is set,
|
|
|
|
it uses the default value, ``false``.
|
|
|
|
:param allowSingleQuotes: allows single quotes in addition to double quotes. If None is
|
|
|
|
set, it uses the default value, ``true``.
|
|
|
|
:param allowNumericLeadingZero: allows leading zeros in numbers (e.g. 00012). If None is
|
|
|
|
set, it uses the default value, ``false``.
|
|
|
|
:param allowBackslashEscapingAnyCharacter: allows accepting quoting of all character
|
|
|
|
using backslash quoting mechanism. If None is
|
|
|
|
set, it uses the default value, ``false``.
|
|
|
|
:param mode: allows a mode for dealing with corrupt records during parsing. If None is
|
|
|
|
set, it uses the default value, ``PERMISSIVE``.
|
2015-05-19 17:23:28 -04:00
|
|
|
|
2016-03-21 03:42:35 -04:00
|
|
|
* ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
|
|
|
|
record and puts the malformed string into a new field configured by \
|
2016-03-22 08:30:48 -04:00
|
|
|
``columnNameOfCorruptRecord``. When a schema is set by user, it sets \
|
2016-03-21 03:42:35 -04:00
|
|
|
``null`` for extra fields.
|
|
|
|
* ``DROPMALFORMED`` : ignores the whole corrupted records.
|
|
|
|
* ``FAILFAST`` : throws an exception when it meets corrupted records.
|
2016-05-02 20:50:40 -04:00
|
|
|
|
|
|
|
:param columnNameOfCorruptRecord: allows renaming the new field having malformed string
|
|
|
|
created by ``PERMISSIVE`` mode. This overrides
|
|
|
|
``spark.sql.columnNameOfCorruptRecord``. If None is set,
|
2016-06-12 02:20:40 -04:00
|
|
|
it uses the value specified in
|
|
|
|
``spark.sql.columnNameOfCorruptRecord``.
|
2015-11-16 03:06:14 -05:00
|
|
|
|
2016-05-11 14:24:16 -04:00
|
|
|
>>> df1 = spark.read.json('python/test_support/sql/people.json')
|
2015-08-27 01:19:11 -04:00
|
|
|
>>> df1.dtypes
|
|
|
|
[('age', 'bigint'), ('name', 'string')]
|
|
|
|
>>> rdd = sc.textFile('python/test_support/sql/people.json')
|
2016-05-11 14:24:16 -04:00
|
|
|
>>> df2 = spark.read.json(rdd)
|
2015-08-27 01:19:11 -04:00
|
|
|
>>> df2.dtypes
|
2015-06-03 03:23:34 -04:00
|
|
|
[('age', 'bigint'), ('name', 'string')]
|
|
|
|
|
2015-05-19 17:23:28 -04:00
|
|
|
"""
|
2016-06-28 16:43:59 -04:00
|
|
|
self._set_opts(
|
2016-06-21 13:47:51 -04:00
|
|
|
schema=schema, primitivesAsString=primitivesAsString, prefersDecimal=prefersDecimal,
|
|
|
|
allowComments=allowComments, allowUnquotedFieldNames=allowUnquotedFieldNames,
|
|
|
|
allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
|
|
|
|
allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
|
|
|
|
mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord)
|
2015-08-27 01:19:11 -04:00
|
|
|
if isinstance(path, basestring):
|
2016-05-11 01:21:17 -04:00
|
|
|
path = [path]
|
|
|
|
if type(path) == list:
|
2016-05-11 14:24:16 -04:00
|
|
|
return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))
|
2015-08-27 01:19:11 -04:00
|
|
|
elif isinstance(path, RDD):
|
2016-01-04 21:02:38 -05:00
|
|
|
def func(iterator):
|
|
|
|
for x in iterator:
|
|
|
|
if not isinstance(x, basestring):
|
|
|
|
x = unicode(x)
|
|
|
|
if isinstance(x, unicode):
|
|
|
|
x = x.encode("utf-8")
|
|
|
|
yield x
|
|
|
|
keyed = path.mapPartitions(func)
|
|
|
|
keyed._bypass_serializer = True
|
2016-05-11 14:24:16 -04:00
|
|
|
jrdd = keyed._jrdd.map(self._spark._jvm.BytesToString())
|
2016-01-04 21:02:38 -05:00
|
|
|
return self._df(self._jreader.json(jrdd))
|
2015-08-27 01:19:11 -04:00
|
|
|
else:
|
|
|
|
raise TypeError("path can be only string or RDD")
|
2015-05-19 17:23:28 -04:00
|
|
|
|
2015-05-21 02:05:54 -04:00
|
|
|
@since(1.4)
|
2015-05-19 17:23:28 -04:00
|
|
|
def table(self, tableName):
|
|
|
|
"""Returns the specified table as a :class:`DataFrame`.
|
|
|
|
|
2015-06-03 03:23:34 -04:00
|
|
|
:param tableName: string, name of the table.
|
|
|
|
|
2016-05-11 14:24:16 -04:00
|
|
|
>>> df = spark.read.parquet('python/test_support/sql/parquet_partitioned')
|
2016-05-17 21:01:59 -04:00
|
|
|
>>> df.createOrReplaceTempView('tmpTable')
|
2016-05-11 14:24:16 -04:00
|
|
|
>>> spark.read.table('tmpTable').dtypes
|
2015-06-03 03:23:34 -04:00
|
|
|
[('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
|
2015-05-19 17:23:28 -04:00
|
|
|
"""
|
|
|
|
return self._df(self._jreader.table(tableName))
|
|
|
|
|
2015-05-21 02:05:54 -04:00
|
|
|
@since(1.4)
|
2015-07-21 03:08:44 -04:00
|
|
|
def parquet(self, *paths):
|
2015-05-19 17:23:28 -04:00
|
|
|
"""Loads a Parquet file, returning the result as a :class:`DataFrame`.
|
|
|
|
|
2016-06-12 02:20:40 -04:00
|
|
|
You can set the following Parquet-specific option(s) for reading Parquet files:
|
|
|
|
* ``mergeSchema``: sets whether we should merge schemas collected from all \
|
|
|
|
Parquet part-files. This will override ``spark.sql.parquet.mergeSchema``. \
|
|
|
|
The default value is specified in ``spark.sql.parquet.mergeSchema``.
|
|
|
|
|
2016-05-11 14:24:16 -04:00
|
|
|
>>> df = spark.read.parquet('python/test_support/sql/parquet_partitioned')
|
2015-06-03 03:23:34 -04:00
|
|
|
>>> df.dtypes
|
|
|
|
[('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
|
2015-05-19 17:23:28 -04:00
|
|
|
"""
|
2016-05-11 14:24:16 -04:00
|
|
|
return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
|
2015-07-21 03:08:44 -04:00
|
|
|
|
2015-10-28 17:28:38 -04:00
|
|
|
@ignore_unicode_prefix
|
|
|
|
@since(1.6)
|
2015-11-24 21:16:07 -05:00
|
|
|
def text(self, paths):
|
2016-06-13 00:36:41 -04:00
|
|
|
"""
|
|
|
|
Loads text files and returns a :class:`DataFrame` whose schema starts with a
|
|
|
|
string column named "value", and followed by partitioned columns if there
|
|
|
|
are any.
|
2015-10-28 17:28:38 -04:00
|
|
|
|
|
|
|
Each line in the text file is a new row in the resulting DataFrame.
|
|
|
|
|
2015-11-24 21:16:07 -05:00
|
|
|
:param paths: string, or list of strings, for input path(s).
|
|
|
|
|
2016-05-11 14:24:16 -04:00
|
|
|
>>> df = spark.read.text('python/test_support/sql/text-test.txt')
|
2015-10-28 17:28:38 -04:00
|
|
|
>>> df.collect()
|
2015-11-06 20:22:30 -05:00
|
|
|
[Row(value=u'hello'), Row(value=u'this')]
|
2015-10-28 17:28:38 -04:00
|
|
|
"""
|
2015-11-24 21:16:07 -05:00
|
|
|
if isinstance(paths, basestring):
|
2016-05-02 20:50:40 -04:00
|
|
|
path = [paths]
|
2016-05-11 14:24:16 -04:00
|
|
|
return self._df(self._jreader.text(self._spark._sc._jvm.PythonUtils.toSeq(path)))
|
2015-10-28 17:28:38 -04:00
|
|
|
|
2016-02-29 12:44:29 -05:00
|
|
|
@since(2.0)
|
2016-06-06 02:40:13 -04:00
|
|
|
def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
|
2016-06-12 02:20:40 -04:00
|
|
|
comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
|
|
|
|
ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
|
2016-06-21 13:47:51 -04:00
|
|
|
negativeInf=None, dateFormat=None, maxColumns=None, maxCharsPerColumn=None,
|
|
|
|
maxMalformedLogPerPartition=None, mode=None):
|
2016-06-12 02:20:40 -04:00
|
|
|
"""Loads a CSV file and returns the result as a :class:`DataFrame`.
|
2016-02-29 12:44:29 -05:00
|
|
|
|
2016-06-12 02:20:40 -04:00
|
|
|
This function will go through the input once to determine the input schema if
|
|
|
|
``inferSchema`` is enabled. To avoid going through the entire data once, disable
|
|
|
|
``inferSchema`` option or specify the schema explicitly using ``schema``.
|
2016-02-29 12:44:29 -05:00
|
|
|
|
2016-05-02 20:50:40 -04:00
|
|
|
:param path: string, or list of strings, for input path(s).
|
|
|
|
:param schema: an optional :class:`StructType` for the input schema.
|
|
|
|
:param sep: sets the single character as a separator for each field and value.
|
2016-06-06 02:40:13 -04:00
|
|
|
If None is set, it uses the default value, ``,``.
|
|
|
|
:param encoding: decodes the CSV files by the given encoding type. If None is set,
|
|
|
|
it uses the default value, ``UTF-8``.
|
2016-05-02 20:50:40 -04:00
|
|
|
:param quote: sets the single character used for escaping quoted values where the
|
2016-06-06 02:40:13 -04:00
|
|
|
separator can be part of the value. If None is set, it uses the default
|
2016-06-11 18:12:21 -04:00
|
|
|
value, ``"``. If you would like to turn off quotations, you need to set an
|
|
|
|
empty string.
|
2016-05-02 20:50:40 -04:00
|
|
|
:param escape: sets the single character used for escaping quotes inside an already
|
2016-06-06 02:40:13 -04:00
|
|
|
quoted value. If None is set, it uses the default value, ``\``.
|
2016-05-02 20:50:40 -04:00
|
|
|
:param comment: sets the single character used for skipping lines beginning with this
|
|
|
|
character. By default (None), it is disabled.
|
2016-06-06 02:40:13 -04:00
|
|
|
:param header: uses the first line as names of columns. If None is set, it uses the
|
|
|
|
default value, ``false``.
|
2016-06-12 02:20:40 -04:00
|
|
|
:param inferSchema: infers the input schema automatically from data. It requires one extra
|
|
|
|
pass over the data. If None is set, it uses the default value, ``false``.
|
2016-05-02 20:50:40 -04:00
|
|
|
:param ignoreLeadingWhiteSpace: defines whether or not leading whitespaces from values
|
2016-06-06 02:40:13 -04:00
|
|
|
being read should be skipped. If None is set, it uses
|
|
|
|
the default value, ``false``.
|
2016-05-02 20:50:40 -04:00
|
|
|
:param ignoreTrailingWhiteSpace: defines whether or not trailing whitespaces from values
|
2016-06-06 02:40:13 -04:00
|
|
|
being read should be skipped. If None is set, it uses
|
|
|
|
the default value, ``false``.
|
|
|
|
:param nullValue: sets the string representation of a null value. If None is set, it uses
|
|
|
|
the default value, empty string.
|
|
|
|
:param nanValue: sets the string representation of a non-number value. If None is set, it
|
|
|
|
uses the default value, ``NaN``.
|
|
|
|
:param positiveInf: sets the string representation of a positive infinity value. If None
|
|
|
|
is set, it uses the default value, ``Inf``.
|
|
|
|
:param negativeInf: sets the string representation of a negative infinity value. If None
|
|
|
|
is set, it uses the default value, ``Inf``.
|
2016-05-02 20:50:40 -04:00
|
|
|
:param dateFormat: sets the string that indicates a date format. Custom date formats
|
|
|
|
follow the formats at ``java.text.SimpleDateFormat``. This
|
|
|
|
applies to both date type and timestamp type. By default, it is None
|
|
|
|
which means trying to parse times and date by
|
|
|
|
``java.sql.Timestamp.valueOf()`` and ``java.sql.Date.valueOf()``.
|
2016-06-06 02:40:13 -04:00
|
|
|
:param maxColumns: defines a hard limit of how many columns a record can have. If None is
|
|
|
|
set, it uses the default value, ``20480``.
|
2016-05-02 20:50:40 -04:00
|
|
|
:param maxCharsPerColumn: defines the maximum number of characters allowed for any given
|
2016-06-06 02:40:13 -04:00
|
|
|
value being read. If None is set, it uses the default value,
|
|
|
|
``1000000``.
|
[SPARK-13792][SQL] Limit logging of bad records in CSV data source
## What changes were proposed in this pull request?
This pull request adds a new option (maxMalformedLogPerPartition) in CSV reader to limit the maximum of logging message Spark generates per partition for malformed records.
The error log looks something like
```
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: More than 10 malformed records have been found on this partition. Malformed records from now on will not be logged.
```
Closes #12173
## How was this patch tested?
Manually tested.
Author: Reynold Xin <rxin@databricks.com>
Closes #13795 from rxin/SPARK-13792.
2016-06-21 00:46:12 -04:00
|
|
|
:param maxMalformedLogPerPartition: sets the maximum number of malformed rows Spark will
|
|
|
|
log for each partition. Malformed records beyond this
|
|
|
|
number will be ignored. If None is set, it
|
|
|
|
uses the default value, ``10``.
|
2016-06-06 02:40:13 -04:00
|
|
|
:param mode: allows a mode for dealing with corrupt records during parsing. If None is
|
|
|
|
set, it uses the default value, ``PERMISSIVE``.
|
2016-05-02 20:50:40 -04:00
|
|
|
|
|
|
|
* ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record.
|
2016-05-01 22:05:20 -04:00
|
|
|
When a schema is set by user, it sets ``null`` for extra fields.
|
|
|
|
* ``DROPMALFORMED`` : ignores the whole corrupted records.
|
|
|
|
* ``FAILFAST`` : throws an exception when it meets corrupted records.
|
|
|
|
|
2016-05-11 14:24:16 -04:00
|
|
|
>>> df = spark.read.csv('python/test_support/sql/ages.csv')
|
2016-02-29 12:44:29 -05:00
|
|
|
>>> df.dtypes
|
[SPARK-15264][SPARK-15274][SQL] CSV Reader Error on Blank Column Names
## What changes were proposed in this pull request?
When a CSV begins with:
- `,,`
OR
- `"","",`
meaning that the first column names are either empty or blank strings and `header` is specified to be `true`, then the column name is replaced with `C` + the index number of that given column. For example, if you were to read in the CSV:
```
"","second column"
"hello", "there"
```
Then column names would become `"C0", "second column"`.
This behavior aligns with what currently happens when `header` is specified to be `false` in recent versions of Spark.
### Current Behavior in Spark <=1.6
In Spark <=1.6, a CSV with a blank column name becomes a blank string, `""`, meaning that this column cannot be accessed. However the CSV reads in without issue.
### Current Behavior in Spark 2.0
Spark throws a NullPointerError and will not read in the file.
#### Reproduction in 2.0
https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/346304/2828750690305044/484361/latest.html
## How was this patch tested?
A new test was added to `CSVSuite` to account for this issue. We then have asserts that test for being able to select both the empty column names as well as the regular column names.
Author: Bill Chambers <bill@databricks.com>
Author: Bill Chambers <wchambers@ischool.berkeley.edu>
Closes #13041 from anabranch/master.
2016-05-11 20:42:13 -04:00
|
|
|
[('_c0', 'string'), ('_c1', 'string')]
|
2016-02-29 12:44:29 -05:00
|
|
|
"""
|
2016-06-28 16:43:59 -04:00
|
|
|
self._set_opts(
|
2016-06-21 13:47:51 -04:00
|
|
|
schema=schema, sep=sep, encoding=encoding, quote=quote, escape=escape, comment=comment,
|
|
|
|
header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
|
|
|
|
ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue,
|
|
|
|
nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
|
|
|
|
dateFormat=dateFormat, maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn,
|
|
|
|
maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode)
|
2016-05-02 20:50:40 -04:00
|
|
|
if isinstance(path, basestring):
|
|
|
|
path = [path]
|
2016-05-11 14:24:16 -04:00
|
|
|
return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
|
2016-02-29 12:44:29 -05:00
|
|
|
|
2015-07-21 03:08:44 -04:00
|
|
|
@since(1.5)
|
|
|
|
def orc(self, path):
|
2015-10-28 17:28:38 -04:00
|
|
|
"""Loads an ORC file, returning the result as a :class:`DataFrame`.
|
2015-07-21 03:08:44 -04:00
|
|
|
|
2016-05-11 18:31:16 -04:00
|
|
|
.. note:: Currently ORC support is only available together with Hive support.
|
2015-07-21 03:08:44 -04:00
|
|
|
|
2016-05-11 18:31:16 -04:00
|
|
|
>>> df = spark.read.orc('python/test_support/sql/orc_partitioned')
|
2015-07-21 03:08:44 -04:00
|
|
|
>>> df.dtypes
|
|
|
|
[('a', 'bigint'), ('b', 'int'), ('c', 'int')]
|
|
|
|
"""
|
|
|
|
return self._df(self._jreader.orc(path))
|
2015-05-19 17:23:28 -04:00
|
|
|
|
2015-05-21 02:05:54 -04:00
|
|
|
@since(1.4)
|
2015-05-19 17:23:28 -04:00
|
|
|
def jdbc(self, url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None,
|
2015-08-14 15:46:05 -04:00
|
|
|
predicates=None, properties=None):
|
2015-05-19 17:23:28 -04:00
|
|
|
"""
|
2016-05-11 18:31:16 -04:00
|
|
|
Construct a :class:`DataFrame` representing the database table named ``table``
|
|
|
|
accessible via JDBC URL ``url`` and connection ``properties``.
|
2015-05-19 17:23:28 -04:00
|
|
|
|
2016-05-11 18:31:16 -04:00
|
|
|
Partitions of the table will be retrieved in parallel if either ``column`` or
|
|
|
|
``predicates`` is specified.
|
2015-05-19 17:23:28 -04:00
|
|
|
|
2016-05-11 18:31:16 -04:00
|
|
|
If both ``column`` and ``predicates`` are specified, ``column`` will be used.
|
2015-05-19 17:23:28 -04:00
|
|
|
|
2016-05-11 18:31:16 -04:00
|
|
|
.. note:: Don't create too many partitions in parallel on a large cluster; \
|
2015-05-19 17:23:28 -04:00
|
|
|
otherwise Spark might crash your external database systems.
|
|
|
|
|
2016-05-11 18:31:16 -04:00
|
|
|
:param url: a JDBC URL of the form ``jdbc:subprotocol:subname``
|
|
|
|
:param table: the name of the table
|
|
|
|
:param column: the name of an integer column that will be used for partitioning;
|
|
|
|
if this parameter is specified, then ``numPartitions``, ``lowerBound``
|
|
|
|
(inclusive), and ``upperBound`` (exclusive) will form partition strides
|
|
|
|
for generated WHERE clause expressions used to split the column
|
|
|
|
``column`` evenly
|
|
|
|
:param lowerBound: the minimum value of ``column`` used to decide partition stride
|
|
|
|
:param upperBound: the maximum value of ``column`` used to decide partition stride
|
2015-05-19 17:23:28 -04:00
|
|
|
:param numPartitions: the number of partitions
|
2016-05-11 18:31:16 -04:00
|
|
|
:param predicates: a list of expressions suitable for inclusion in WHERE clauses;
|
|
|
|
each one defines one partition of the :class:`DataFrame`
|
|
|
|
:param properties: a dictionary of JDBC database connection arguments; normally,
|
|
|
|
at least a "user" and "password" property should be included
|
2015-05-19 17:23:28 -04:00
|
|
|
:return: a DataFrame
|
|
|
|
"""
|
2015-08-14 15:46:05 -04:00
|
|
|
if properties is None:
|
|
|
|
properties = dict()
|
2016-05-11 14:24:16 -04:00
|
|
|
jprop = JavaClass("java.util.Properties", self._spark._sc._gateway._gateway_client)()
|
2015-05-19 17:23:28 -04:00
|
|
|
for k in properties:
|
|
|
|
jprop.setProperty(k, properties[k])
|
|
|
|
if column is not None:
|
|
|
|
if numPartitions is None:
|
2016-05-11 14:24:16 -04:00
|
|
|
numPartitions = self._spark._sc.defaultParallelism
|
2015-05-19 17:23:28 -04:00
|
|
|
return self._df(self._jreader.jdbc(url, table, column, int(lowerBound), int(upperBound),
|
|
|
|
int(numPartitions), jprop))
|
|
|
|
if predicates is not None:
|
2016-05-11 14:24:16 -04:00
|
|
|
gateway = self._spark._sc._gateway
|
2015-11-18 11:18:54 -05:00
|
|
|
jpredicates = utils.toJArray(gateway, gateway.jvm.java.lang.String, predicates)
|
|
|
|
return self._df(self._jreader.jdbc(url, table, jpredicates, jprop))
|
2015-05-19 17:23:28 -04:00
|
|
|
return self._df(self._jreader.jdbc(url, table, jprop))
|
|
|
|
|
|
|
|
|
2016-06-28 16:43:59 -04:00
|
|
|
class DataFrameWriter(OptionUtils):
    """
    Interface used to write a :class:`DataFrame` to external storage systems
    (e.g. file systems, key-value stores, etc). Use :func:`DataFrame.write`
    to access this.

    .. versionadded:: 1.4
    """
    def __init__(self, df):
        self._df = df
        self._spark = df.sql_ctx
        # Java-side DataFrameWriter; the builder methods below replace this
        # handle because the JVM API returns a new writer on each call.
        self._jwrite = df._jdf.write()

    def _sq(self, jsq):
        # Wrap a Java StreamingQuery handle in its Python counterpart.
        from pyspark.sql.streaming import StreamingQuery
        return StreamingQuery(jsq)

    @since(1.4)
    def mode(self, saveMode):
        """Specifies the behavior when data or table already exists.

        Options include:

        * `append`: Append contents of this :class:`DataFrame` to existing data.
        * `overwrite`: Overwrite existing data.
        * `error`: Throw an exception if data already exists.
        * `ignore`: Silently ignore this operation if data already exists.

        >>> df.write.mode('append').parquet(os.path.join(tempfile.mkdtemp(), 'data'))
        """
        # At the JVM side, the default value of mode is already set to "error".
        # So, if the given saveMode is None, we will not call JVM-side's mode method.
        if saveMode is not None:
            self._jwrite = self._jwrite.mode(saveMode)
        return self

    @since(1.4)
    def format(self, source):
        """Specifies the underlying output data source.

        :param source: string, name of the data source, e.g. 'json', 'parquet'.

        >>> df.write.format('json').save(os.path.join(tempfile.mkdtemp(), 'data'))
        """
        self._jwrite = self._jwrite.format(source)
        return self

    @since(1.5)
    def option(self, key, value):
        """Adds an output option for the underlying data source.

        :param key: option name
        :param value: option value; bools are lowered to "true"/"false" by to_str()
        """
        self._jwrite = self._jwrite.option(key, to_str(value))
        return self

    @since(1.4)
    def options(self, **options):
        """Adds output options for the underlying data source.
        """
        for k in options:
            self._jwrite = self._jwrite.option(k, to_str(options[k]))
        return self

    @since(1.4)
    def partitionBy(self, *cols):
        """Partitions the output by the given columns on the file system.

        If specified, the output is laid out on the file system similar
        to Hive's partitioning scheme.

        :param cols: name of columns

        >>> df.write.partitionBy('year', 'month').parquet(os.path.join(tempfile.mkdtemp(), 'data'))
        """
        # Accept both partitionBy('a', 'b') and partitionBy(['a', 'b']).
        if len(cols) == 1 and isinstance(cols[0], (list, tuple)):
            cols = cols[0]
        self._jwrite = self._jwrite.partitionBy(_to_seq(self._spark._sc, cols))
        return self

    @since(1.4)
    def save(self, path=None, format=None, mode=None, partitionBy=None, **options):
        """Saves the contents of the :class:`DataFrame` to a data source.

        The data source is specified by the ``format`` and a set of ``options``.
        If ``format`` is not specified, the default data source configured by
        ``spark.sql.sources.default`` will be used.

        :param path: the path in a Hadoop supported file system
        :param format: the format used to save
        :param mode: specifies the behavior of the save operation when data already exists.

            * ``append``: Append contents of this :class:`DataFrame` to existing data.
            * ``overwrite``: Overwrite existing data.
            * ``ignore``: Silently ignore this operation if data already exists.
            * ``error`` (default case): Throw an exception if data already exists.
        :param partitionBy: names of partitioning columns
        :param options: all other string options

        >>> df.write.mode('append').parquet(os.path.join(tempfile.mkdtemp(), 'data'))
        """
        self.mode(mode).options(**options)
        if partitionBy is not None:
            self.partitionBy(partitionBy)
        if format is not None:
            self.format(format)
        # The JVM save() overloads differ: no-arg save() uses the path option, if any.
        if path is None:
            self._jwrite.save()
        else:
            self._jwrite.save(path)

    @since(1.4)
    def insertInto(self, tableName, overwrite=False):
        """Inserts the content of the :class:`DataFrame` to the specified table.

        It requires that the schema of the class:`DataFrame` is the same as the
        schema of the table.

        Optionally overwriting any existing data.
        """
        self._jwrite.mode("overwrite" if overwrite else "append").insertInto(tableName)

    @since(1.4)
    def saveAsTable(self, name, format=None, mode=None, partitionBy=None, **options):
        """Saves the content of the :class:`DataFrame` as the specified table.

        In the case the table already exists, behavior of this function depends on the
        save mode, specified by the `mode` function (default to throwing an exception).
        When `mode` is `Overwrite`, the schema of the :class:`DataFrame` does not need to be
        the same as that of the existing table.

        * `append`: Append contents of this :class:`DataFrame` to existing data.
        * `overwrite`: Overwrite existing data.
        * `error`: Throw an exception if data already exists.
        * `ignore`: Silently ignore this operation if data already exists.

        :param name: the table name
        :param format: the format used to save
        :param mode: one of `append`, `overwrite`, `error`, `ignore` (default: error)
        :param partitionBy: names of partitioning columns
        :param options: all other string options
        """
        self.mode(mode).options(**options)
        if partitionBy is not None:
            self.partitionBy(partitionBy)
        if format is not None:
            self.format(format)
        self._jwrite.saveAsTable(name)

    @since(1.4)
    def json(self, path, mode=None, compression=None):
        """Saves the content of the :class:`DataFrame` in JSON format at the specified path.

        :param path: the path in any Hadoop supported file system
        :param mode: specifies the behavior of the save operation when data already exists.

            * ``append``: Append contents of this :class:`DataFrame` to existing data.
            * ``overwrite``: Overwrite existing data.
            * ``ignore``: Silently ignore this operation if data already exists.
            * ``error`` (default case): Throw an exception if data already exists.
        :param compression: compression codec to use when saving to file. This can be one of the
                            known case-insensitive shorten names (none, bzip2, gzip, lz4,
                            snappy and deflate).

        >>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
        """
        self.mode(mode)
        # _set_opts (from OptionUtils) skips None values, so unset options keep
        # their JVM-side defaults.
        self._set_opts(compression=compression)
        self._jwrite.json(path)

    @since(1.4)
    def parquet(self, path, mode=None, partitionBy=None, compression=None):
        """Saves the content of the :class:`DataFrame` in Parquet format at the specified path.

        :param path: the path in any Hadoop supported file system
        :param mode: specifies the behavior of the save operation when data already exists.

            * ``append``: Append contents of this :class:`DataFrame` to existing data.
            * ``overwrite``: Overwrite existing data.
            * ``ignore``: Silently ignore this operation if data already exists.
            * ``error`` (default case): Throw an exception if data already exists.
        :param partitionBy: names of partitioning columns
        :param compression: compression codec to use when saving to file. This can be one of the
                            known case-insensitive shorten names (none, snappy, gzip, and lzo).
                            This will override ``spark.sql.parquet.compression.codec``. If None
                            is set, it uses the value specified in
                            ``spark.sql.parquet.compression.codec``.

        >>> df.write.parquet(os.path.join(tempfile.mkdtemp(), 'data'))
        """
        self.mode(mode)
        if partitionBy is not None:
            self.partitionBy(partitionBy)
        self._set_opts(compression=compression)
        self._jwrite.parquet(path)

    @since(1.6)
    def text(self, path, compression=None):
        """Saves the content of the DataFrame in a text file at the specified path.

        :param path: the path in any Hadoop supported file system
        :param compression: compression codec to use when saving to file. This can be one of the
                            known case-insensitive shorten names (none, bzip2, gzip, lz4,
                            snappy and deflate).

        The DataFrame must have only one column that is of string type.
        Each row becomes a new line in the output file.
        """
        self._set_opts(compression=compression)
        self._jwrite.text(path)

    @since(2.0)
    def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
            header=None, nullValue=None, escapeQuotes=None):
        """Saves the content of the :class:`DataFrame` in CSV format at the specified path.

        :param path: the path in any Hadoop supported file system
        :param mode: specifies the behavior of the save operation when data already exists.

            * ``append``: Append contents of this :class:`DataFrame` to existing data.
            * ``overwrite``: Overwrite existing data.
            * ``ignore``: Silently ignore this operation if data already exists.
            * ``error`` (default case): Throw an exception if data already exists.

        :param compression: compression codec to use when saving to file. This can be one of the
                            known case-insensitive shorten names (none, bzip2, gzip, lz4,
                            snappy and deflate).
        :param sep: sets the single character as a separator for each field and value. If None is
                    set, it uses the default value, ``,``.
        :param quote: sets the single character used for escaping quoted values where the
                      separator can be part of the value. If None is set, it uses the default
                      value, ``"``. If you would like to turn off quotations, you need to set an
                      empty string.
        :param escape: sets the single character used for escaping quotes inside an already
                       quoted value. If None is set, it uses the default value, ``\``
        :param escapeQuotes: A flag indicating whether values containing quotes should always
                             be enclosed in quotes. If None is set, it uses the default value
                             ``true``, escaping all values containing a quote character.
        :param header: writes the names of columns as the first line. If None is set, it uses
                       the default value, ``false``.
        :param nullValue: sets the string representation of a null value. If None is set, it uses
                          the default value, empty string.

        >>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
        """
        self.mode(mode)
        self._set_opts(compression=compression, sep=sep, quote=quote, escape=escape, header=header,
                       nullValue=nullValue, escapeQuotes=escapeQuotes)
        self._jwrite.csv(path)

    @since(1.5)
    def orc(self, path, mode=None, partitionBy=None, compression=None):
        """Saves the content of the :class:`DataFrame` in ORC format at the specified path.

        .. note:: Currently ORC support is only available together with Hive support.

        :param path: the path in any Hadoop supported file system
        :param mode: specifies the behavior of the save operation when data already exists.

            * ``append``: Append contents of this :class:`DataFrame` to existing data.
            * ``overwrite``: Overwrite existing data.
            * ``ignore``: Silently ignore this operation if data already exists.
            * ``error`` (default case): Throw an exception if data already exists.
        :param partitionBy: names of partitioning columns
        :param compression: compression codec to use when saving to file. This can be one of the
                            known case-insensitive shorten names (none, snappy, zlib, and lzo).
                            This will override ``orc.compress``. If None is set, it uses the
                            default value, ``snappy``.

        >>> orc_df = spark.read.orc('python/test_support/sql/orc_partitioned')
        >>> orc_df.write.orc(os.path.join(tempfile.mkdtemp(), 'data'))
        """
        self.mode(mode)
        if partitionBy is not None:
            self.partitionBy(partitionBy)
        # Consistent with the other writers (json/parquet/text/csv): _set_opts
        # skips None values, so this is equivalent to the previous explicit
        # `if compression is not None: self.option("compression", compression)`.
        self._set_opts(compression=compression)
        self._jwrite.orc(path)

    @since(1.4)
    def jdbc(self, url, table, mode=None, properties=None):
        """Saves the content of the :class:`DataFrame` to an external database table via JDBC.

        .. note:: Don't create too many partitions in parallel on a large cluster; \
        otherwise Spark might crash your external database systems.

        :param url: a JDBC URL of the form ``jdbc:subprotocol:subname``
        :param table: Name of the table in the external database.
        :param mode: specifies the behavior of the save operation when data already exists.

            * ``append``: Append contents of this :class:`DataFrame` to existing data.
            * ``overwrite``: Overwrite existing data.
            * ``ignore``: Silently ignore this operation if data already exists.
            * ``error`` (default case): Throw an exception if data already exists.
        :param properties: JDBC database connection arguments, a list of
                           arbitrary string tag/value. Normally at least a
                           "user" and "password" property should be included.
        """
        if properties is None:
            properties = dict()
        jprop = JavaClass("java.util.Properties", self._spark._sc._gateway._gateway_client)()
        for k in properties:
            jprop.setProperty(k, properties[k])
        # Route the save mode through the Python-side mode() helper so that a
        # None mode keeps the JVM-side default ("error") instead of passing
        # null to the Java mode(String) overload.
        self.mode(mode)._jwrite.jdbc(url, table, jprop)
|
|
|
|
|
|
|
|
|
|
|
|
def _test():
    """Run this module's doctests against a local Spark session.

    Requires ``SPARK_HOME`` to be set; exits the process with a non-zero
    status if any doctest fails.
    """
    import doctest
    import os
    import tempfile
    import py4j
    from pyspark.context import SparkContext
    from pyspark.sql import SparkSession, Row
    import pyspark.sql.readwriter

    # Doctests use repo-relative paths (python/test_support/...), so run from
    # the Spark source root.
    os.chdir(os.environ["SPARK_HOME"])

    globs = pyspark.sql.readwriter.__dict__.copy()
    sc = SparkContext('local[4]', 'PythonTest')
    try:
        # Prefer a Hive-enabled session so the ORC doctests can run.
        spark = SparkSession.builder.enableHiveSupport().getOrCreate()
    except py4j.protocol.Py4JError:
        spark = SparkSession(sc)

    globs['tempfile'] = tempfile
    globs['os'] = os
    globs['sc'] = sc
    globs['spark'] = spark
    globs['df'] = spark.read.parquet('python/test_support/sql/parquet_partitioned')
    (failure_count, test_count) = doctest.testmod(
        pyspark.sql.readwriter, globs=globs,
        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
    sc.stop()
    if failure_count:
        # Use sys.exit rather than the site-injected exit() builtin, which is
        # not guaranteed to exist (e.g. under `python -S`). sys is imported at
        # the top of this file.
        sys.exit(-1)
|
|
|
|
|
|
|
|
|
|
|
|
# Run the doctest suite when this file is executed directly
# (requires SPARK_HOME to be set; see _test above).
if __name__ == "__main__":
    _test()
|