[SPARK-7606] [SQL] [PySpark] add version to Python SQL API docs

Add version info for public Python SQL API.

cc rxin

Author: Davies Liu <davies@databricks.com>

Closes #6295 from davies/versions and squashes the following commits:

cfd91e6 [Davies Liu] add more version for DataFrame API
600834d [Davies Liu] add version to SQL API docs

(cherry picked from commit 8ddcb25b39)
Signed-off-by: Reynold Xin <rxin@databricks.com>
Commit: b0e7c66338 (parent: e70be6987b)
Committed by: Reynold Xin, 2015-05-20 23:05:54 -07:00
7 changed files with 170 additions and 18 deletions

diff --git a/python/pyspark/sql/__init__.py b/python/pyspark/sql/__init__.py

@@ -41,6 +41,13 @@ Important classes of Spark SQL and DataFrames:
 """
 from __future__ import absolute_import
 
+
+def since(version):
+    def deco(f):
+        f.__doc__ = f.__doc__.rstrip() + "\n\n.. versionadded:: %s" % version
+        return f
+    return deco
+
 # fix the module name conflict for Python 3+
 import sys
 from . import _types as types
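
The helper above is the whole mechanism. A standalone sketch of its effect (illustrative, not part of the patch; note it assumes the wrapped function already has a docstring, since `None.rstrip()` would raise `AttributeError`):

def since(version):
    # Appends a Sphinx directive to the wrapped function's docstring.
    def deco(f):
        f.__doc__ = f.__doc__.rstrip() + "\n\n.. versionadded:: %s" % version
        return f
    return deco

@since(1.3)
def select(*cols):
    """Projects a set of expressions."""

print(select.__doc__)
# Projects a set of expressions.
#
# .. versionadded:: 1.3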

diff --git a/python/pyspark/sql/column.py b/python/pyspark/sql/column.py

@@ -23,6 +23,7 @@ if sys.version >= '3':
 
 from pyspark.context import SparkContext
 from pyspark.rdd import ignore_unicode_prefix
+from pyspark.sql import since
 from pyspark.sql.types import *
 
 __all__ = ["DataFrame", "Column", "SchemaRDD", "DataFrameNaFunctions",
@@ -114,6 +115,8 @@ class Column(object):
         # 2. Create from an expression
         df.colName + 1
         1 / df.colName
+
+    .. versionadded:: 1.3
     """
 
     def __init__(self, jc):
@@ -159,6 +162,7 @@ class Column(object):
     bitwiseAND = _bin_op("bitwiseAND")
     bitwiseXOR = _bin_op("bitwiseXOR")
 
+    @since(1.3)
     def getItem(self, key):
         """An expression that gets an item at position `ordinal` out of a list,
         or gets an item by key out of a dict.
@@ -179,6 +183,7 @@ class Column(object):
         """
         return self[key]
 
+    @since(1.3)
     def getField(self, name):
         """An expression that gets a field by name in a StructField.
@@ -211,6 +216,7 @@ class Column(object):
     endswith = _bin_op("endsWith")
 
     @ignore_unicode_prefix
+    @since(1.3)
     def substr(self, startPos, length):
         """
         Return a :class:`Column` which is a substring of the column
@@ -234,6 +240,7 @@ class Column(object):
     __getslice__ = substr
 
     @ignore_unicode_prefix
+    @since(1.3)
     def inSet(self, *cols):
         """ A boolean expression that is evaluated to true if the value of this
         expression is contained by the evaluated values of the arguments.
@@ -259,6 +266,7 @@ class Column(object):
     isNull = _unary_op("isNull", "True if the current expression is null.")
     isNotNull = _unary_op("isNotNull", "True if the current expression is not null.")
 
+    @since(1.3)
     def alias(self, *alias):
         """Returns this column aliased with a new name or names (in the case of expressions that
         return more than one column, such as explode).
@@ -274,6 +282,7 @@ class Column(object):
         return Column(getattr(self._jc, "as")(_to_seq(sc, list(alias))))
 
     @ignore_unicode_prefix
+    @since(1.3)
     def cast(self, dataType):
         """ Convert the column into type `dataType`
@@ -294,6 +303,7 @@ class Column(object):
         return Column(jc)
 
     @ignore_unicode_prefix
+    @since(1.3)
     def between(self, lowerBound, upperBound):
         """ A boolean expression that is evaluated to true if the value of this
         expression is between the given columns.
@@ -301,6 +311,7 @@ class Column(object):
         return (self >= lowerBound) & (self <= upperBound)
 
     @ignore_unicode_prefix
+    @since(1.4)
     def when(self, condition, value):
         """Evaluates a list of conditions and returns one of multiple possible result expressions.
         If :func:`Column.otherwise` is not invoked, None is returned for unmatched conditions.
@@ -319,6 +330,7 @@ class Column(object):
         return Column(jc)
 
     @ignore_unicode_prefix
+    @since(1.4)
     def otherwise(self, value):
         """Evaluates a list of conditions and returns one of multiple possible result expressions.
         If :func:`Column.otherwise` is not invoked, None is returned for unmatched conditions.
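
Illustrative usage of the newly versioned ``when``/``otherwise`` pair, which chains into a SQL CASE expression (a sketch, not part of the patch; the data is made up):

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as F

sc = SparkContext("local", "since-demo")  # hypothetical local context
sqlContext = SQLContext(sc)
df = sqlContext.createDataFrame([("Alice", 2), ("Bob", 5)], ["name", "age"])

# CASE WHEN age > 4 THEN 1 WHEN age < 3 THEN -1 ELSE 0 END
df.select(df.name, F.when(df.age > 4, 1).when(df.age < 3, -1).otherwise(0)).show()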

diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py

@@ -28,6 +28,7 @@ from py4j.protocol import Py4JError
 from pyspark.rdd import RDD, _prepare_for_python_RDD, ignore_unicode_prefix
 from pyspark.serializers import AutoBatchedSerializer, PickleSerializer
+from pyspark.sql import since
 from pyspark.sql.types import Row, StringType, StructType, _verify_type, \
     _infer_schema, _has_nulltype, _merge_type, _create_converter, _python_to_sql_converter
 from pyspark.sql.dataframe import DataFrame
@@ -106,11 +107,13 @@ class SQLContext(object):
             self._scala_SQLContext = self._jvm.SQLContext(self._jsc.sc())
         return self._scala_SQLContext
 
+    @since(1.3)
     def setConf(self, key, value):
         """Sets the given Spark SQL configuration property.
         """
         self._ssql_ctx.setConf(key, value)
 
+    @since(1.3)
     def getConf(self, key, defaultValue):
         """Returns the value of Spark SQL configuration property for the given key.
@@ -119,10 +122,12 @@ class SQLContext(object):
         return self._ssql_ctx.getConf(key, defaultValue)
 
     @property
+    @since("1.3.1")
     def udf(self):
         """Returns a :class:`UDFRegistration` for UDF registration."""
         return UDFRegistration(self)
 
+    @since(1.4)
     def range(self, start, end, step=1, numPartitions=None):
         """
         Create a :class:`DataFrame` with single LongType column named `id`,
@@ -144,6 +149,7 @@ class SQLContext(object):
         return DataFrame(jdf, self)
 
     @ignore_unicode_prefix
+    @since(1.2)
     def registerFunction(self, name, f, returnType=StringType()):
         """Registers a lambda function as a UDF so it can be used in SQL statements.
@@ -210,7 +216,8 @@ class SQLContext(object):
     @ignore_unicode_prefix
     def inferSchema(self, rdd, samplingRatio=None):
-        """::note: Deprecated in 1.3, use :func:`createDataFrame` instead.
+        """
+        .. note:: Deprecated in 1.3, use :func:`createDataFrame` instead.
         """
         warnings.warn("inferSchema is deprecated, please use createDataFrame instead")
@@ -221,7 +228,8 @@ class SQLContext(object):
     @ignore_unicode_prefix
     def applySchema(self, rdd, schema):
-        """::note: Deprecated in 1.3, use :func:`createDataFrame` instead.
+        """
+        .. note:: Deprecated in 1.3, use :func:`createDataFrame` instead.
         """
         warnings.warn("applySchema is deprecated, please use createDataFrame instead")
@@ -233,6 +241,7 @@ class SQLContext(object):
         return self.createDataFrame(rdd, schema)
 
+    @since(1.3)
     @ignore_unicode_prefix
     def createDataFrame(self, data, schema=None, samplingRatio=None):
         """
@@ -337,6 +346,7 @@ class SQLContext(object):
             df = self._ssql_ctx.applySchemaToPythonRDD(jrdd.rdd(), schema.json())
         return DataFrame(df, self)
 
+    @since(1.3)
     def registerDataFrameAsTable(self, df, tableName):
         """Registers the given :class:`DataFrame` as a temporary table in the catalog.
@@ -349,6 +359,7 @@ class SQLContext(object):
         else:
             raise ValueError("Can only register DataFrame as table")
 
+    @since(1.0)
     def parquetFile(self, *paths):
         """Loads a Parquet file, returning the result as a :class:`DataFrame`.
@@ -367,6 +378,7 @@ class SQLContext(object):
         jdf = self._ssql_ctx.parquetFile(jpaths)
         return DataFrame(jdf, self)
 
+    @since(1.0)
     def jsonFile(self, path, schema=None, samplingRatio=1.0):
         """Loads a text file storing one JSON object per line as a :class:`DataFrame`.
@@ -407,6 +419,7 @@ class SQLContext(object):
         return DataFrame(df, self)
 
     @ignore_unicode_prefix
+    @since(1.0)
     def jsonRDD(self, rdd, schema=None, samplingRatio=1.0):
         """Loads an RDD storing one JSON object per string as a :class:`DataFrame`.
@@ -449,6 +462,7 @@ class SQLContext(object):
             df = self._ssql_ctx.jsonRDD(jrdd.rdd(), scala_datatype)
         return DataFrame(df, self)
 
+    @since(1.3)
     def load(self, path=None, source=None, schema=None, **options):
         """Returns the dataset in a data source as a :class:`DataFrame`.
@@ -460,6 +474,7 @@ class SQLContext(object):
         """
         return self.read.load(path, source, schema, **options)
 
+    @since(1.3)
     def createExternalTable(self, tableName, path=None, source=None,
                             schema=None, **options):
         """Creates an external table based on the dataset in a data source.
@@ -489,6 +504,7 @@ class SQLContext(object):
         return DataFrame(df, self)
 
     @ignore_unicode_prefix
+    @since(1.0)
     def sql(self, sqlQuery):
         """Returns a :class:`DataFrame` representing the result of the given query.
@@ -499,6 +515,7 @@ class SQLContext(object):
         """
         return DataFrame(self._ssql_ctx.sql(sqlQuery), self)
 
+    @since(1.0)
     def table(self, tableName):
         """Returns the specified table as a :class:`DataFrame`.
@@ -510,6 +527,7 @@ class SQLContext(object):
         return DataFrame(self._ssql_ctx.table(tableName), self)
 
     @ignore_unicode_prefix
+    @since(1.3)
     def tables(self, dbName=None):
         """Returns a :class:`DataFrame` containing names of tables in the given database.
@@ -528,6 +546,7 @@ class SQLContext(object):
         else:
             return DataFrame(self._ssql_ctx.tables(dbName), self)
 
+    @since(1.3)
     def tableNames(self, dbName=None):
         """Returns a list of names of tables in the database ``dbName``.
@@ -544,25 +563,29 @@ class SQLContext(object):
         else:
             return [name for name in self._ssql_ctx.tableNames(dbName)]
 
+    @since(1.0)
     def cacheTable(self, tableName):
         """Caches the specified table in-memory."""
         self._ssql_ctx.cacheTable(tableName)
 
+    @since(1.0)
     def uncacheTable(self, tableName):
         """Removes the specified table from the in-memory cache."""
         self._ssql_ctx.uncacheTable(tableName)
 
+    @since(1.3)
     def clearCache(self):
         """Removes all cached tables from the in-memory cache. """
         self._ssql_ctx.clearCache()
 
     @property
+    @since(1.4)
     def read(self):
         """
         Returns a :class:`DataFrameReader` that can be used to read data
         in as a :class:`DataFrame`.
 
-        ::note: Experimental
+        .. note:: Experimental
 
         >>> sqlContext.read
         <pyspark.sql.readwriter.DataFrameReader object at ...>
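
One detail worth noting above: on properties such as ``udf`` and ``read``, ``@since`` sits below ``@property``. Decorators apply bottom-up, so ``since`` annotates the plain function before ``property()`` wraps it; reversed, ``since`` would receive a ``property`` object, whose ``__doc__`` is read-only on Python 2. A minimal sketch:

from pyspark.sql import since  # the helper added by this patch

class Demo(object):
    @property
    @since(1.4)  # applied first: mutates the function's docstring
    def read(self):
        """Returns a reader."""
        return None

print(Demo.read.__doc__)  # ends with ".. versionadded:: 1.4"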

diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py

@@ -29,6 +29,7 @@ from pyspark.rdd import RDD, _load_from_socket, ignore_unicode_prefix
 from pyspark.serializers import BatchedSerializer, PickleSerializer, UTF8Deserializer
 from pyspark.storagelevel import StorageLevel
 from pyspark.traceback_utils import SCCallSiteSync
+from pyspark.sql import since
 from pyspark.sql.types import _create_cls, _parse_datatype_json_string
 from pyspark.sql.column import Column, _to_seq, _to_java_column
 from pyspark.sql.readwriter import DataFrameWriter
@@ -60,6 +61,8 @@ class DataFrame(object):
         people.filter(people.age > 30).join(department, people.deptId == department.id)) \
           .groupBy(department.name, "gender").agg({"salary": "avg", "age": "max"})
+
+    .. versionadded:: 1.3
     """
 
     def __init__(self, jdf, sql_ctx):
@@ -71,6 +74,7 @@ class DataFrame(object):
         self._lazy_rdd = None
 
     @property
+    @since(1.3)
     def rdd(self):
         """Returns the content as an :class:`pyspark.RDD` of :class:`Row`.
         """
@@ -88,18 +92,21 @@ class DataFrame(object):
         return self._lazy_rdd
 
     @property
+    @since("1.3.1")
     def na(self):
         """Returns a :class:`DataFrameNaFunctions` for handling missing values.
         """
         return DataFrameNaFunctions(self)
 
     @property
+    @since(1.4)
     def stat(self):
         """Returns a :class:`DataFrameStatFunctions` for statistic functions.
         """
         return DataFrameStatFunctions(self)
 
     @ignore_unicode_prefix
+    @since(1.3)
     def toJSON(self, use_unicode=True):
         """Converts a :class:`DataFrame` into a :class:`RDD` of string.
@@ -111,6 +118,7 @@ class DataFrame(object):
         rdd = self._jdf.toJSON()
         return RDD(rdd.toJavaRDD(), self._sc, UTF8Deserializer(use_unicode))
 
+    @since(1.3)
     def saveAsParquetFile(self, path):
         """Saves the contents as a Parquet file, preserving the schema.
@@ -127,6 +135,7 @@ class DataFrame(object):
         """
         self._jdf.saveAsParquetFile(path)
 
+    @since(1.3)
     def registerTempTable(self, name):
         """Registers this RDD as a temporary table using the given name.
@@ -140,11 +149,13 @@ class DataFrame(object):
         """
         self._jdf.registerTempTable(name)
 
+    @since(1.3)
     def registerAsTable(self, name):
         """DEPRECATED: use :func:`registerTempTable` instead"""
         warnings.warn("Use registerTempTable instead of registerAsTable.", DeprecationWarning)
         self.registerTempTable(name)
 
+    @since(1.3)
     def insertInto(self, tableName, overwrite=False):
         """Inserts the contents of this :class:`DataFrame` into the specified table.
@@ -152,6 +163,7 @@ class DataFrame(object):
         """
         self._jdf.insertInto(tableName, overwrite)
 
+    @since(1.3)
     def saveAsTable(self, tableName, source=None, mode="error", **options):
         """Saves the contents of this :class:`DataFrame` to a data source as a table.
@@ -169,6 +181,7 @@ class DataFrame(object):
         """
         self.write.saveAsTable(tableName, source, mode, **options)
 
+    @since(1.3)
     def save(self, path=None, source=None, mode="error", **options):
         """Saves the contents of the :class:`DataFrame` to a data source.
@@ -187,6 +200,7 @@ class DataFrame(object):
         return self.write.save(path, source, mode, **options)
 
     @property
+    @since(1.4)
     def write(self):
         """
         Interface for saving the content of the :class:`DataFrame` out
@@ -194,7 +208,7 @@ class DataFrame(object):
         :return :class:`DataFrameWriter`
 
-        ::note: Experimental
+        .. note:: Experimental
 
         >>> df.write
         <pyspark.sql.readwriter.DataFrameWriter object at ...>
@@ -202,6 +216,7 @@ class DataFrame(object):
         return DataFrameWriter(self)
 
     @property
+    @since(1.3)
     def schema(self):
         """Returns the schema of this :class:`DataFrame` as a :class:`types.StructType`.
@@ -212,6 +227,7 @@ class DataFrame(object):
             self._schema = _parse_datatype_json_string(self._jdf.schema().json())
         return self._schema
 
+    @since(1.3)
     def printSchema(self):
         """Prints out the schema in the tree format.
@@ -223,6 +239,7 @@ class DataFrame(object):
         """
         print(self._jdf.schema().treeString())
 
+    @since(1.3)
     def explain(self, extended=False):
         """Prints the (logical and physical) plans to the console for debugging purpose.
@@ -248,12 +265,14 @@ class DataFrame(object):
         else:
             print(self._jdf.queryExecution().executedPlan().toString())
 
+    @since(1.3)
     def isLocal(self):
         """Returns ``True`` if the :func:`collect` and :func:`take` methods can be run locally
         (without any Spark executors).
         """
         return self._jdf.isLocal()
 
+    @since(1.3)
     def show(self, n=20):
         """Prints the first ``n`` rows to the console.
@@ -272,6 +291,7 @@ class DataFrame(object):
     def __repr__(self):
         return "DataFrame[%s]" % (", ".join("%s: %s" % c for c in self.dtypes))
 
+    @since(1.3)
     def count(self):
         """Returns the number of rows in this :class:`DataFrame`.
@@ -281,6 +301,7 @@ class DataFrame(object):
         return int(self._jdf.count())
 
     @ignore_unicode_prefix
+    @since(1.3)
     def collect(self):
         """Returns all the records as a list of :class:`Row`.
@@ -294,6 +315,7 @@ class DataFrame(object):
         return [cls(r) for r in rs]
 
     @ignore_unicode_prefix
+    @since(1.3)
     def limit(self, num):
         """Limits the result count to the number specified.
@@ -306,6 +328,7 @@ class DataFrame(object):
         return DataFrame(jdf, self.sql_ctx)
 
     @ignore_unicode_prefix
+    @since(1.3)
     def take(self, num):
         """Returns the first ``num`` rows as a :class:`list` of :class:`Row`.
@@ -315,6 +338,7 @@ class DataFrame(object):
         return self.limit(num).collect()
 
     @ignore_unicode_prefix
+    @since(1.3)
     def map(self, f):
         """ Returns a new :class:`RDD` by applying a the ``f`` function to each :class:`Row`.
@@ -326,6 +350,7 @@ class DataFrame(object):
         return self.rdd.map(f)
 
     @ignore_unicode_prefix
+    @since(1.3)
     def flatMap(self, f):
         """ Returns a new :class:`RDD` by first applying the ``f`` function to each :class:`Row`,
         and then flattening the results.
@@ -337,6 +362,7 @@ class DataFrame(object):
         """
         return self.rdd.flatMap(f)
 
+    @since(1.3)
     def mapPartitions(self, f, preservesPartitioning=False):
         """Returns a new :class:`RDD` by applying the ``f`` function to each partition.
@@ -349,6 +375,7 @@ class DataFrame(object):
         """
         return self.rdd.mapPartitions(f, preservesPartitioning)
 
+    @since(1.3)
     def foreach(self, f):
         """Applies the ``f`` function to all :class:`Row` of this :class:`DataFrame`.
@@ -360,6 +387,7 @@ class DataFrame(object):
         """
         return self.rdd.foreach(f)
 
+    @since(1.3)
     def foreachPartition(self, f):
         """Applies the ``f`` function to each partition of this :class:`DataFrame`.
@@ -372,6 +400,7 @@ class DataFrame(object):
         """
         return self.rdd.foreachPartition(f)
 
+    @since(1.3)
     def cache(self):
         """ Persists with the default storage level (C{MEMORY_ONLY_SER}).
         """
@@ -379,6 +408,7 @@ class DataFrame(object):
         self._jdf.cache()
         return self
 
+    @since(1.3)
     def persist(self, storageLevel=StorageLevel.MEMORY_ONLY_SER):
         """Sets the storage level to persist its values across operations
         after the first time it is computed. This can only be used to assign
@@ -390,6 +420,7 @@ class DataFrame(object):
         self._jdf.persist(javaStorageLevel)
         return self
 
+    @since(1.3)
     def unpersist(self, blocking=True):
         """Marks the :class:`DataFrame` as non-persistent, and remove all blocks for it from
         memory and disk.
@@ -398,6 +429,7 @@ class DataFrame(object):
         self._jdf.unpersist(blocking)
         return self
 
+    @since(1.4)
     def coalesce(self, numPartitions):
         """
         Returns a new :class:`DataFrame` that has exactly `numPartitions` partitions.
@@ -412,6 +444,7 @@ class DataFrame(object):
         """
         return DataFrame(self._jdf.coalesce(numPartitions), self.sql_ctx)
 
+    @since(1.3)
     def repartition(self, numPartitions):
         """Returns a new :class:`DataFrame` that has exactly ``numPartitions`` partitions.
@@ -420,6 +453,7 @@ class DataFrame(object):
         """
         return DataFrame(self._jdf.repartition(numPartitions), self.sql_ctx)
 
+    @since(1.3)
     def distinct(self):
         """Returns a new :class:`DataFrame` containing the distinct rows in this :class:`DataFrame`.
@@ -428,6 +462,7 @@ class DataFrame(object):
         """
         return DataFrame(self._jdf.distinct(), self.sql_ctx)
 
+    @since(1.3)
     def sample(self, withReplacement, fraction, seed=None):
         """Returns a sampled subset of this :class:`DataFrame`.
@@ -439,6 +474,7 @@ class DataFrame(object):
         rdd = self._jdf.sample(withReplacement, fraction, long(seed))
         return DataFrame(rdd, self.sql_ctx)
 
+    @since(1.4)
     def randomSplit(self, weights, seed=None):
         """Randomly splits this :class:`DataFrame` with the provided weights.
@@ -461,6 +497,7 @@ class DataFrame(object):
         return [DataFrame(rdd, self.sql_ctx) for rdd in rdd_array]
 
     @property
+    @since(1.3)
     def dtypes(self):
         """Returns all column names and their data types as a list.
@@ -471,6 +508,7 @@ class DataFrame(object):
 
     @property
     @ignore_unicode_prefix
+    @since(1.3)
     def columns(self):
         """Returns all column names as a list.
@@ -480,6 +518,7 @@ class DataFrame(object):
         return [f.name for f in self.schema.fields]
 
     @ignore_unicode_prefix
+    @since(1.3)
     def alias(self, alias):
         """Returns a new :class:`DataFrame` with an alias set.
@@ -494,6 +533,7 @@ class DataFrame(object):
         return DataFrame(getattr(self._jdf, "as")(alias), self.sql_ctx)
 
     @ignore_unicode_prefix
+    @since(1.3)
     def join(self, other, joinExprs=None, joinType=None):
         """Joins with another :class:`DataFrame`, using the given join expression.
@@ -527,6 +567,7 @@ class DataFrame(object):
         return DataFrame(jdf, self.sql_ctx)
 
     @ignore_unicode_prefix
+    @since(1.3)
     def sort(self, *cols, **kwargs):
         """Returns a new :class:`DataFrame` sorted by the specified column(s).
@@ -586,6 +627,7 @@ class DataFrame(object):
             cols = cols[0]
         return self._jseq(cols, _to_java_column)
 
+    @since("1.3.1")
     def describe(self, *cols):
         """Computes statistics for numeric columns.
@@ -607,6 +649,7 @@ class DataFrame(object):
         return DataFrame(jdf, self.sql_ctx)
 
     @ignore_unicode_prefix
+    @since(1.3)
     def head(self, n=None):
         """
         Returns the first ``n`` rows as a list of :class:`Row`,
@@ -623,6 +666,7 @@ class DataFrame(object):
         return self.take(n)
 
     @ignore_unicode_prefix
+    @since(1.3)
     def first(self):
         """Returns the first row as a :class:`Row`.
@@ -632,6 +676,7 @@ class DataFrame(object):
         return self.head()
 
     @ignore_unicode_prefix
+    @since(1.3)
     def __getitem__(self, item):
         """Returns the column as a :class:`Column`.
@@ -659,6 +704,7 @@ class DataFrame(object):
         else:
             raise TypeError("unexpected item type: %s" % type(item))
 
+    @since(1.3)
     def __getattr__(self, name):
         """Returns the :class:`Column` denoted by ``name``.
@@ -672,6 +718,7 @@ class DataFrame(object):
         return Column(jc)
 
     @ignore_unicode_prefix
+    @since(1.3)
     def select(self, *cols):
         """Projects a set of expressions and returns a new :class:`DataFrame`.
@@ -689,6 +736,7 @@ class DataFrame(object):
         jdf = self._jdf.select(self._jcols(*cols))
         return DataFrame(jdf, self.sql_ctx)
 
+    @since(1.3)
     def selectExpr(self, *expr):
         """Projects a set of SQL expressions and returns a new :class:`DataFrame`.
@@ -703,6 +751,7 @@ class DataFrame(object):
         return DataFrame(jdf, self.sql_ctx)
 
     @ignore_unicode_prefix
+    @since(1.3)
     def filter(self, condition):
         """Filters rows using the given condition.
@@ -732,6 +781,7 @@ class DataFrame(object):
     where = filter
 
     @ignore_unicode_prefix
+    @since(1.3)
     def groupBy(self, *cols):
         """Groups the :class:`DataFrame` using the specified columns,
         so we can run aggregation on them. See :class:`GroupedData`
@@ -755,6 +805,7 @@ class DataFrame(object):
         from pyspark.sql.group import GroupedData
         return GroupedData(jdf, self.sql_ctx)
 
+    @since(1.3)
     def agg(self, *exprs):
         """ Aggregate on the entire :class:`DataFrame` without groups
         (shorthand for ``df.groupBy.agg()``).
@@ -767,6 +818,7 @@ class DataFrame(object):
         """
         return self.groupBy().agg(*exprs)
 
+    @since(1.3)
     def unionAll(self, other):
         """ Return a new :class:`DataFrame` containing union of rows in this
         frame and another frame.
@@ -775,6 +827,7 @@ class DataFrame(object):
         """
         return DataFrame(self._jdf.unionAll(other._jdf), self.sql_ctx)
 
+    @since(1.3)
     def intersect(self, other):
         """ Return a new :class:`DataFrame` containing rows only in
         both this frame and another frame.
@@ -783,6 +836,7 @@ class DataFrame(object):
         """
         return DataFrame(self._jdf.intersect(other._jdf), self.sql_ctx)
 
+    @since(1.3)
     def subtract(self, other):
         """ Return a new :class:`DataFrame` containing rows in this frame
         but not in another frame.
@@ -791,6 +845,7 @@ class DataFrame(object):
         """
         return DataFrame(getattr(self._jdf, "except")(other._jdf), self.sql_ctx)
 
+    @since(1.4)
     def dropDuplicates(self, subset=None):
         """Return a new :class:`DataFrame` with duplicate rows removed,
         optionally only considering certain columns.
@@ -821,6 +876,7 @@ class DataFrame(object):
             jdf = self._jdf.dropDuplicates(self._jseq(subset))
         return DataFrame(jdf, self.sql_ctx)
 
+    @since("1.3.1")
     def dropna(self, how='any', thresh=None, subset=None):
         """Returns a new :class:`DataFrame` omitting rows with null values.
@@ -863,6 +919,7 @@ class DataFrame(object):
         return DataFrame(self._jdf.na().drop(thresh, self._jseq(subset)), self.sql_ctx)
 
+    @since("1.3.1")
     def fillna(self, value, subset=None):
         """Replace null values, alias for ``na.fill()``.
@@ -924,6 +981,7 @@ class DataFrame(object):
         return DataFrame(self._jdf.na().fill(value, self._jseq(subset)), self.sql_ctx)
 
+    @since(1.4)
     def replace(self, to_replace, value, subset=None):
         """Returns a new :class:`DataFrame` replacing a value with another value.
@@ -999,6 +1057,7 @@ class DataFrame(object):
         return DataFrame(
             self._jdf.na().replace(self._jseq(subset), self._jmap(rep_dict)), self.sql_ctx)
 
+    @since(1.4)
     def corr(self, col1, col2, method=None):
         """
         Calculates the correlation of two columns of a DataFrame as a double value. Currently only
@@ -1020,6 +1079,7 @@ class DataFrame(object):
                              "coefficient is supported.")
         return self._jdf.stat().corr(col1, col2, method)
 
+    @since(1.4)
     def cov(self, col1, col2):
         """
         Calculate the sample covariance for the given columns, specified by their names, as a
@@ -1034,6 +1094,7 @@ class DataFrame(object):
             raise ValueError("col2 should be a string.")
         return self._jdf.stat().cov(col1, col2)
 
+    @since(1.4)
     def crosstab(self, col1, col2):
         """
         Computes a pair-wise frequency table of the given columns. Also known as a contingency
@@ -1055,6 +1116,7 @@ class DataFrame(object):
             raise ValueError("col2 should be a string.")
         return DataFrame(self._jdf.stat().crosstab(col1, col2), self.sql_ctx)
 
+    @since(1.4)
     def freqItems(self, cols, support=None):
         """
         Finding frequent items for columns, possibly with false positives. Using the
@@ -1076,6 +1138,7 @@ class DataFrame(object):
         return DataFrame(self._jdf.stat().freqItems(_to_seq(self._sc, cols), support), self.sql_ctx)
 
     @ignore_unicode_prefix
+    @since(1.3)
     def withColumn(self, colName, col):
         """Returns a new :class:`DataFrame` by adding a column.
@@ -1088,6 +1151,7 @@ class DataFrame(object):
         return self.select('*', col.alias(colName))
 
     @ignore_unicode_prefix
+    @since(1.3)
     def withColumnRenamed(self, existing, new):
         """Returns a new :class:`DataFrame` by renaming an existing column.
@@ -1102,6 +1166,7 @@ class DataFrame(object):
                 for c in self.columns]
         return self.select(*cols)
 
+    @since(1.4)
     @ignore_unicode_prefix
     def drop(self, colName):
         """Returns a new :class:`DataFrame` that drops the specified column.
@@ -1114,6 +1179,7 @@ class DataFrame(object):
         jdf = self._jdf.drop(colName)
         return DataFrame(jdf, self.sql_ctx)
 
+    @since(1.3)
     def toPandas(self):
         """Returns the contents of this :class:`DataFrame` as Pandas ``pandas.DataFrame``.

diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py

@@ -26,6 +26,7 @@ if sys.version < "3":
 from pyspark import SparkContext
 from pyspark.rdd import _prepare_for_python_RDD, ignore_unicode_prefix
 from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
+from pyspark.sql import since
 from pyspark.sql.types import StringType
 from pyspark.sql.column import Column, _to_java_column, _to_seq
@@ -78,6 +79,18 @@ _functions = {
     'sqrt': 'Computes the square root of the specified float value.',
     'abs': 'Computes the absolute value.',
 
+    'max': 'Aggregate function: returns the maximum value of the expression in a group.',
+    'min': 'Aggregate function: returns the minimum value of the expression in a group.',
+    'first': 'Aggregate function: returns the first value in a group.',
+    'last': 'Aggregate function: returns the last value in a group.',
+    'count': 'Aggregate function: returns the number of items in a group.',
+    'sum': 'Aggregate function: returns the sum of all values in the expression.',
+    'avg': 'Aggregate function: returns the average of the values in a group.',
+    'mean': 'Aggregate function: returns the average of the values in a group.',
+    'sumDistinct': 'Aggregate function: returns the sum of distinct values in the expression.',
+}
+
+_functions_1_4 = {
     # unary math functions
     'acos': 'Computes the cosine inverse of the given value; the returned angle is in the range' +
             '0.0 through pi.',
@@ -102,21 +115,11 @@ _functions = {
     'tan': 'Computes the tangent of the given value.',
     'tanh': 'Computes the hyperbolic tangent of the given value.',
     'toDegrees': 'Converts an angle measured in radians to an approximately equivalent angle ' +
                  'measured in degrees.',
     'toRadians': 'Converts an angle measured in degrees to an approximately equivalent angle ' +
                  'measured in radians.',
     'bitwiseNOT': 'Computes bitwise not.',
-
-    'max': 'Aggregate function: returns the maximum value of the expression in a group.',
-    'min': 'Aggregate function: returns the minimum value of the expression in a group.',
-    'first': 'Aggregate function: returns the first value in a group.',
-    'last': 'Aggregate function: returns the last value in a group.',
-    'count': 'Aggregate function: returns the number of items in a group.',
-    'sum': 'Aggregate function: returns the sum of all values in the expression.',
-    'avg': 'Aggregate function: returns the average of the values in a group.',
-    'mean': 'Aggregate function: returns the average of the values in a group.',
-    'sumDistinct': 'Aggregate function: returns the sum of distinct values in the expression.',
 }
 
 # math functions that take two arguments as input
@@ -128,15 +131,18 @@ _binary_mathfunctions = {
 }
 
 for _name, _doc in _functions.items():
-    globals()[_name] = _create_function(_name, _doc)
+    globals()[_name] = since(1.3)(_create_function(_name, _doc))
+for _name, _doc in _functions_1_4.items():
+    globals()[_name] = since(1.4)(_create_function(_name, _doc))
 for _name, _doc in _binary_mathfunctions.items():
-    globals()[_name] = _create_binary_mathfunction(_name, _doc)
+    globals()[_name] = since(1.4)(_create_binary_mathfunction(_name, _doc))
 del _name, _doc
 __all__ += _functions.keys()
 __all__ += _binary_mathfunctions.keys()
 __all__.sort()
 
 
+@since(1.4)
 def array(*cols):
     """Creates a new array column.
@@ -155,6 +161,7 @@ def array(*cols):
     return Column(jc)
 
 
+@since(1.3)
 def approxCountDistinct(col, rsd=None):
     """Returns a new :class:`Column` for approximate distinct count of ``col``.
@@ -169,6 +176,7 @@ def approxCountDistinct(col, rsd=None):
     return Column(jc)
 
 
+@since(1.4)
 def explode(col):
     """Returns a new row for each element in the given array or map.
@@ -189,6 +197,7 @@ def explode(col):
     return Column(jc)
 
 
+@since(1.4)
 def coalesce(*cols):
     """Returns the first column that is not null.
@@ -225,6 +234,7 @@ def coalesce(*cols):
     return Column(jc)
 
 
+@since(1.3)
 def countDistinct(col, *cols):
     """Returns a new :class:`Column` for distinct count of ``col`` or ``cols``.
@@ -239,6 +249,7 @@ def countDistinct(col, *cols):
     return Column(jc)
 
 
+@since(1.4)
 def monotonicallyIncreasingId():
     """A column that generates monotonically increasing 64-bit integers.
@@ -259,6 +270,7 @@ def monotonicallyIncreasingId():
     return Column(sc._jvm.functions.monotonicallyIncreasingId())
 
 
+@since(1.4)
 def rand(seed=None):
     """Generates a random column with i.i.d. samples from U[0.0, 1.0].
     """
@@ -270,6 +282,7 @@ def rand(seed=None):
     return Column(jc)
 
 
+@since(1.4)
 def randn(seed=None):
     """Generates a column with i.i.d. samples from the standard normal distribution.
     """
@@ -281,6 +294,7 @@ def randn(seed=None):
     return Column(jc)
 
 
+@since(1.4)
 def sparkPartitionId():
     """A column for partition ID of the Spark task.
@@ -294,6 +308,7 @@ def sparkPartitionId():
 
 @ignore_unicode_prefix
+@since(1.4)
 def struct(*cols):
     """Creates a new struct column.
@@ -312,6 +327,7 @@ def struct(*cols):
     return Column(jc)
 
 
+@since(1.4)
 def when(condition, value):
     """Evaluates a list of conditions and returns one of multiple possible result expressions.
     If :func:`Column.otherwise` is not invoked, None is returned for unmatched conditions.
@@ -336,6 +352,8 @@ def when(condition, value):
 class UserDefinedFunction(object):
     """
     User defined function in Python
+
+    .. versionadded:: 1.3
     """
     def __init__(self, func, returnType):
         self.func = func
@@ -369,6 +387,7 @@ class UserDefinedFunction(object):
         return Column(jc)
 
 
+@since(1.3)
 def udf(f, returnType=StringType()):
     """Creates a :class:`Column` expression representing a user defined function (UDF).

diff --git a/python/pyspark/sql/group.py b/python/pyspark/sql/group.py

@@ -16,6 +16,7 @@
 #
 
 from pyspark.rdd import ignore_unicode_prefix
+from pyspark.sql import since
 from pyspark.sql.column import Column, _to_seq
 from pyspark.sql.dataframe import DataFrame
 from pyspark.sql.types import *
@@ -47,6 +48,8 @@ class GroupedData(object):
     """
     A set of methods for aggregations on a :class:`DataFrame`,
     created by :func:`DataFrame.groupBy`.
+
+    .. versionadded:: 1.3
     """
 
     def __init__(self, jdf, sql_ctx):
@@ -54,6 +57,7 @@ class GroupedData(object):
         self.sql_ctx = sql_ctx
 
     @ignore_unicode_prefix
+    @since(1.3)
     def agg(self, *exprs):
         """Compute aggregates and returns the result as a :class:`DataFrame`.
@@ -86,6 +90,7 @@ class GroupedData(object):
         return DataFrame(jdf, self.sql_ctx)
 
     @dfapi
+    @since(1.3)
     def count(self):
         """Counts the number of records for each group.
@@ -94,6 +99,7 @@ class GroupedData(object):
         """
 
     @df_varargs_api
+    @since(1.3)
     def mean(self, *cols):
         """Computes average values for each numeric columns for each group.
@@ -108,6 +114,7 @@ class GroupedData(object):
         """
 
     @df_varargs_api
+    @since(1.3)
     def avg(self, *cols):
         """Computes average values for each numeric columns for each group.
@@ -122,6 +129,7 @@ class GroupedData(object):
         """
 
     @df_varargs_api
+    @since(1.3)
     def max(self, *cols):
         """Computes the max value for each numeric columns for each group.
@@ -132,6 +140,7 @@ class GroupedData(object):
         """
 
     @df_varargs_api
+    @since(1.3)
     def min(self, *cols):
         """Computes the min value for each numeric column for each group.
@@ -144,6 +153,7 @@ class GroupedData(object):
         """
 
     @df_varargs_api
+    @since(1.3)
     def sum(self, *cols):
         """Compute the sum for each numeric columns for each group.

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py

@@ -17,6 +17,7 @@
 
 from py4j.java_gateway import JavaClass
 
+from pyspark.sql import since
 from pyspark.sql.column import _to_seq
 from pyspark.sql.types import *
@@ -30,6 +31,8 @@ class DataFrameReader(object):
     to access this.
 
     ::Note: Experimental
+
+    .. versionadded:: 1.4
     """
 
     def __init__(self, sqlContext):
@@ -40,6 +43,7 @@ class DataFrameReader(object):
         from pyspark.sql.dataframe import DataFrame
         return DataFrame(jdf, self._sqlContext)
 
+    @since(1.4)
     def load(self, path=None, format=None, schema=None, **options):
         """Loads data from a data source and returns it as a :class`DataFrame`.
@@ -63,6 +67,7 @@ class DataFrameReader(object):
         else:
             return self._df(jreader.load())
 
+    @since(1.4)
     def json(self, path, schema=None):
         """
         Loads a JSON file (one object per line) and returns the result as
@@ -107,6 +112,7 @@ class DataFrameReader(object):
             jdf = self._jreader.schema(jschema).json(path)
         return self._df(jdf)
 
+    @since(1.4)
     def table(self, tableName):
         """Returns the specified table as a :class:`DataFrame`.
@@ -117,6 +123,7 @@ class DataFrameReader(object):
         """
         return self._df(self._jreader.table(tableName))
 
+    @since(1.4)
     def parquet(self, *path):
         """Loads a Parquet file, returning the result as a :class:`DataFrame`.
@@ -130,6 +137,7 @@ class DataFrameReader(object):
         """
         return self._df(self._jreader.parquet(_to_seq(self._sqlContext._sc, path)))
 
+    @since(1.4)
     def jdbc(self, url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None,
              predicates=None, properties={}):
         """
@@ -178,12 +186,15 @@ class DataFrameWriter(object):
     to access this.
 
     ::Note: Experimental
+
+    .. versionadded:: 1.4
     """
 
     def __init__(self, df):
         self._df = df
         self._sqlContext = df.sql_ctx
         self._jwrite = df._jdf.write()
 
+    @since(1.4)
     def save(self, path=None, format=None, mode="error", **options):
         """
         Saves the contents of the :class:`DataFrame` to a data source.
@@ -215,6 +226,7 @@ class DataFrameWriter(object):
         else:
             jwrite.save(path)
 
+    @since(1.4)
     def saveAsTable(self, name, format=None, mode="error", **options):
         """
         Saves the contents of this :class:`DataFrame` to a data source as a table.
@@ -243,6 +255,7 @@ class DataFrameWriter(object):
             jwrite = jwrite.option(k, options[k])
         return jwrite.saveAsTable(name)
 
+    @since(1.4)
     def json(self, path, mode="error"):
         """
         Saves the content of the :class:`DataFrame` in JSON format at the
@@ -261,6 +274,7 @@ class DataFrameWriter(object):
         """
         return self._jwrite.mode(mode).json(path)
 
+    @since(1.4)
     def parquet(self, path, mode="error"):
         """
         Saves the content of the :class:`DataFrame` in Parquet format at the
@@ -279,6 +293,7 @@ class DataFrameWriter(object):
         """
         return self._jwrite.mode(mode).parquet(path)
 
+    @since(1.4)
     def jdbc(self, url, table, mode="error", properties={}):
         """
         Saves the content of the :class:`DataFrame` to a external database table
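
Finally, the 1.4 reader/writer pair end to end (a sketch; the path is a placeholder and ``df``/``sqlContext`` come from the earlier example):

df.write.parquet("/tmp/people.parquet", mode="overwrite")
df2 = sqlContext.read.parquet("/tmp/people.parquet")
df2.show()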