#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import sys
import warnings
import random

if sys.version >= '3':
    basestring = unicode = str
    long = int
else:
    from itertools import imap as map

from pyspark.context import SparkContext
from pyspark.rdd import RDD, _load_from_socket, ignore_unicode_prefix
from pyspark.serializers import BatchedSerializer, PickleSerializer, UTF8Deserializer
from pyspark.storagelevel import StorageLevel
from pyspark.traceback_utils import SCCallSiteSync
from pyspark.sql.types import *
from pyspark.sql.types import _create_cls, _parse_datatype_json_string


__all__ = ["DataFrame", "GroupedData", "Column", "SchemaRDD", "DataFrameNaFunctions",
           "DataFrameStatFunctions"]


class DataFrame(object):
    """A distributed collection of data grouped into named columns.

    A :class:`DataFrame` is equivalent to a relational table in Spark SQL,
    and can be created using various functions in :class:`SQLContext`::

        people = sqlContext.parquetFile("...")

    Once created, it can be manipulated using the various domain-specific-language
    (DSL) functions defined in :class:`DataFrame` and :class:`Column`.

    To select a column from the data frame, use the apply method::

        ageCol = people.age

    A more concrete example::

        # To create DataFrame using SQLContext
        people = sqlContext.parquetFile("...")
        department = sqlContext.parquetFile("...")

        people.filter(people.age > 30).join(department, people.deptId == department.id) \
          .groupBy(department.name, "gender").agg({"salary": "avg", "age": "max"})
    """

    def __init__(self, jdf, sql_ctx):
        self._jdf = jdf
        self.sql_ctx = sql_ctx
        self._sc = sql_ctx and sql_ctx._sc
        self.is_cached = False
        self._schema = None  # initialized lazily
        self._lazy_rdd = None

    @property
    def rdd(self):
        """Returns the content as an :class:`pyspark.RDD` of :class:`Row`.
        """
        if self._lazy_rdd is None:
            jrdd = self._jdf.javaToPython()
            rdd = RDD(jrdd, self.sql_ctx._sc, BatchedSerializer(PickleSerializer()))
            schema = self.schema

            def applySchema(it):
                cls = _create_cls(schema)
                return map(cls, it)

            self._lazy_rdd = rdd.mapPartitions(applySchema)

        return self._lazy_rdd
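
    # A hedged note on the property above: rows come back from the JVM as
    # pickled batches; applySchema then wraps each record in a Row subclass
    # built by _create_cls from the schema, and the result is memoized in
    # self._lazy_rdd so repeated df.rdd accesses reuse the same RDD.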

    @property
    def na(self):
        """Returns a :class:`DataFrameNaFunctions` for handling missing values.
        """
        return DataFrameNaFunctions(self)

    @property
    def stat(self):
        """Returns a :class:`DataFrameStatFunctions` for statistic functions.
        """
        return DataFrameStatFunctions(self)
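
    # A hedged note on the two accessors above: DataFrameNaFunctions and
    # DataFrameStatFunctions (defined later in this module) delegate back to
    # methods on this class, so df.na.drop() and df.dropna(), or
    # df.stat.corr("c1", "c2") and df.corr("c1", "c2"), are equivalent
    # spellings ("c1" and "c2" are illustrative column names).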

    @ignore_unicode_prefix
    def toJSON(self, use_unicode=True):
        """Converts a :class:`DataFrame` into a :class:`RDD` of string.

        Each row is turned into a JSON document as one element in the returned RDD.

        >>> df.toJSON().first()
        u'{"age":2,"name":"Alice"}'
        """
        rdd = self._jdf.toJSON()
        return RDD(rdd.toJavaRDD(), self._sc, UTF8Deserializer(use_unicode))

    def saveAsParquetFile(self, path):
        """Saves the contents as a Parquet file, preserving the schema.

        Files that are written out using this method can be read back in as
        a :class:`DataFrame` using :func:`SQLContext.parquetFile`.

        >>> import tempfile, shutil
        >>> parquetFile = tempfile.mkdtemp()
        >>> shutil.rmtree(parquetFile)
        >>> df.saveAsParquetFile(parquetFile)
        >>> df2 = sqlContext.parquetFile(parquetFile)
        >>> sorted(df2.collect()) == sorted(df.collect())
        True
        """
        self._jdf.saveAsParquetFile(path)

    def registerTempTable(self, name):
        """Registers this :class:`DataFrame` as a temporary table using the given name.

        The lifetime of this temporary table is tied to the :class:`SQLContext`
        that was used to create this :class:`DataFrame`.

        >>> df.registerTempTable("people")
        >>> df2 = sqlContext.sql("select * from people")
        >>> sorted(df.collect()) == sorted(df2.collect())
        True
        """
        self._jdf.registerTempTable(name)

    def registerAsTable(self, name):
        """DEPRECATED: use :func:`registerTempTable` instead"""
        warnings.warn("Use registerTempTable instead of registerAsTable.", DeprecationWarning)
        self.registerTempTable(name)

    def insertInto(self, tableName, overwrite=False):
        """Inserts the contents of this :class:`DataFrame` into the specified table,
        optionally overwriting any existing data.
        """
        self._jdf.insertInto(tableName, overwrite)
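
    # A hedged usage sketch ("people" is an illustrative table name):
    #
    #     df.insertInto("people")                  # append to existing table
    #     df.insertInto("people", overwrite=True)  # replace existing contents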

    def _java_save_mode(self, mode):
        """Returns the Java save mode based on the Python save mode represented by a string.
        """
        jSaveMode = self._sc._jvm.org.apache.spark.sql.SaveMode
        jmode = jSaveMode.ErrorIfExists
        mode = mode.lower()
        if mode == "append":
            jmode = jSaveMode.Append
        elif mode == "overwrite":
            jmode = jSaveMode.Overwrite
        elif mode == "ignore":
            jmode = jSaveMode.Ignore
        elif mode == "error":
            pass
        else:
            raise ValueError(
                "Only 'append', 'overwrite', 'ignore', and 'error' are acceptable save modes.")
        return jmode

    def saveAsTable(self, tableName, source=None, mode="error", **options):
        """Saves the contents of this :class:`DataFrame` to a data source as a table.

        The data source is specified by the ``source`` and a set of ``options``.
        If ``source`` is not specified, the default data source configured by
        ``spark.sql.sources.default`` will be used.

        Additionally, ``mode`` is used to specify the behavior of the saveAsTable operation
        when the table already exists in the data source. There are four modes:

        * `append`: Append contents of this :class:`DataFrame` to existing data.
        * `overwrite`: Overwrite existing data.
        * `error`: Throw an exception if data already exists.
        * `ignore`: Silently ignore this operation if data already exists.
        """
        if source is None:
            source = self.sql_ctx.getConf("spark.sql.sources.default",
                                          "org.apache.spark.sql.parquet")
        jmode = self._java_save_mode(mode)
        self._jdf.saveAsTable(tableName, source, jmode, options)

    def save(self, path=None, source=None, mode="error", **options):
        """Saves the contents of the :class:`DataFrame` to a data source.

        The data source is specified by the ``source`` and a set of ``options``.
        If ``source`` is not specified, the default data source configured by
        ``spark.sql.sources.default`` will be used.

        Additionally, ``mode`` is used to specify the behavior of the save operation
        when data already exists in the data source. There are four modes:

        * `append`: Append contents of this :class:`DataFrame` to existing data.
        * `overwrite`: Overwrite existing data.
        * `error`: Throw an exception if data already exists.
        * `ignore`: Silently ignore this operation if data already exists.
        """
        if path is not None:
            options["path"] = path
        if source is None:
            source = self.sql_ctx.getConf("spark.sql.sources.default",
                                          "org.apache.spark.sql.parquet")
        jmode = self._java_save_mode(mode)
        self._jdf.save(source, jmode, options)
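
    # A hedged usage sketch for the two writers above (paths, table names, and
    # the "json" source are illustrative, not taken from this file):
    #
    #     df.save("/tmp/people.parquet")                        # default source
    #     df.save("/tmp/people.json", source="json", mode="overwrite")
    #     df.saveAsTable("people_backup", mode="ignore")
    #
    # Any extra keyword arguments are passed through to the data source as
    # string options.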

    @property
    def schema(self):
        """Returns the schema of this :class:`DataFrame` as a :class:`types.StructType`.

        >>> df.schema
        StructType(List(StructField(age,IntegerType,true),StructField(name,StringType,true)))
        """
        if self._schema is None:
            self._schema = _parse_datatype_json_string(self._jdf.schema().json())
        return self._schema

    def printSchema(self):
        """Prints out the schema in the tree format.

        >>> df.printSchema()
        root
         |-- age: integer (nullable = true)
         |-- name: string (nullable = true)
        <BLANKLINE>
        """
        print(self._jdf.schema().treeString())

    def explain(self, extended=False):
        """Prints the (logical and physical) plans to the console for debugging purposes.

        :param extended: boolean, default ``False``. If ``False``, prints only the physical plan.

        >>> df.explain()
        PhysicalRDD [age#0,name#1], MapPartitionsRDD[...] at applySchemaToPythonRDD at\
        NativeMethodAccessorImpl.java:...

        >>> df.explain(True)
        == Parsed Logical Plan ==
        ...
        == Analyzed Logical Plan ==
        ...
        == Optimized Logical Plan ==
        ...
        == Physical Plan ==
        ...
        == RDD ==
        """
        if extended:
            print(self._jdf.queryExecution().toString())
        else:
            print(self._jdf.queryExecution().executedPlan().toString())

    def isLocal(self):
        """Returns ``True`` if the :func:`collect` and :func:`take` methods can be run locally
        (without any Spark executors).
        """
        return self._jdf.isLocal()

    def show(self, n=20):
        """Prints the first ``n`` rows to the console.

        >>> df
        DataFrame[age: int, name: string]
        >>> df.show()
        +---+-----+
        |age| name|
        +---+-----+
        |  2|Alice|
        |  5|  Bob|
        +---+-----+
        """
        print(self._jdf.showString(n))

    def __repr__(self):
        return "DataFrame[%s]" % (", ".join("%s: %s" % c for c in self.dtypes))

    def count(self):
        """Returns the number of rows in this :class:`DataFrame`.

        >>> df.count()
        2
        """
        return int(self._jdf.count())

    @ignore_unicode_prefix
    def collect(self):
        """Returns all the records as a list of :class:`Row`.

        >>> df.collect()
        [Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
        """
        with SCCallSiteSync(self._sc) as css:
            port = self._sc._jvm.PythonRDD.collectAndServe(self._jdf.javaToPython().rdd())
        rs = list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))
        cls = _create_cls(self.schema)
        return [cls(r) for r in rs]

    @ignore_unicode_prefix
    def limit(self, num):
        """Limits the result count to the number specified.

        >>> df.limit(1).collect()
        [Row(age=2, name=u'Alice')]
        >>> df.limit(0).collect()
        []
        """
        jdf = self._jdf.limit(num)
        return DataFrame(jdf, self.sql_ctx)

    @ignore_unicode_prefix
    def take(self, num):
        """Returns the first ``num`` rows as a :class:`list` of :class:`Row`.

        >>> df.take(2)
        [Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
        """
        return self.limit(num).collect()

    @ignore_unicode_prefix
    def map(self, f):
        """ Returns a new :class:`RDD` by applying the ``f`` function to each :class:`Row`.

        This is a shorthand for ``df.rdd.map()``.

        >>> df.map(lambda p: p.name).collect()
        [u'Alice', u'Bob']
        """
        return self.rdd.map(f)

    @ignore_unicode_prefix
    def flatMap(self, f):
        """ Returns a new :class:`RDD` by first applying the ``f`` function to each :class:`Row`,
        and then flattening the results.

        This is a shorthand for ``df.rdd.flatMap()``.

        >>> df.flatMap(lambda p: p.name).collect()
        [u'A', u'l', u'i', u'c', u'e', u'B', u'o', u'b']
        """
        return self.rdd.flatMap(f)

    def mapPartitions(self, f, preservesPartitioning=False):
        """Returns a new :class:`RDD` by applying the ``f`` function to each partition.

        This is a shorthand for ``df.rdd.mapPartitions()``.

        >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
        >>> def f(iterator): yield 1
        >>> rdd.mapPartitions(f).sum()
        4
        """
        return self.rdd.mapPartitions(f, preservesPartitioning)

    def foreach(self, f):
        """Applies the ``f`` function to all :class:`Row` of this :class:`DataFrame`.

        This is a shorthand for ``df.rdd.foreach()``.

        >>> def f(person):
        ...     print(person.name)
        >>> df.foreach(f)
        """
        return self.rdd.foreach(f)

    def foreachPartition(self, f):
        """Applies the ``f`` function to each partition of this :class:`DataFrame`.

        This is a shorthand for ``df.rdd.foreachPartition()``.

        >>> def f(people):
        ...     for person in people:
        ...         print(person.name)
        >>> df.foreachPartition(f)
        """
        return self.rdd.foreachPartition(f)

    def cache(self):
        """ Persists with the default storage level (C{MEMORY_ONLY_SER}).
        """
        self.is_cached = True
        self._jdf.cache()
        return self

    def persist(self, storageLevel=StorageLevel.MEMORY_ONLY_SER):
        """Sets the storage level to persist its values across operations
        after the first time it is computed. This can only be used to assign
        a new storage level if the RDD does not have a storage level set yet.
        If no storage level is specified, it defaults to (C{MEMORY_ONLY_SER}).
        """
        self.is_cached = True
        javaStorageLevel = self._sc._getJavaStorageLevel(storageLevel)
        self._jdf.persist(javaStorageLevel)
        return self
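
    # A hedged note on cache/persist above: both set is_cached and return self
    # so calls can be chained, e.g. df.persist(StorageLevel.MEMORY_AND_DISK)
    # (storage level chosen for illustration); the actual caching happens on
    # the JVM DataFrame, not in this Python wrapper.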

    def unpersist(self, blocking=True):
        """Marks the :class:`DataFrame` as non-persistent, and removes all blocks for it from
        memory and disk.
        """
        self.is_cached = False
        self._jdf.unpersist(blocking)
        return self

    # def coalesce(self, numPartitions, shuffle=False):
    #     rdd = self._jdf.coalesce(numPartitions, shuffle, None)
    #     return DataFrame(rdd, self.sql_ctx)

    def repartition(self, numPartitions):
        """Returns a new :class:`DataFrame` that has exactly ``numPartitions`` partitions.

        >>> df.repartition(10).rdd.getNumPartitions()
        10
        """
        return DataFrame(self._jdf.repartition(numPartitions), self.sql_ctx)

    def distinct(self):
        """Returns a new :class:`DataFrame` containing the distinct rows in this :class:`DataFrame`.

        >>> df.distinct().count()
        2
        """
        return DataFrame(self._jdf.distinct(), self.sql_ctx)

    def sample(self, withReplacement, fraction, seed=None):
        """Returns a sampled subset of this :class:`DataFrame`.

        >>> df.sample(False, 0.5, 42).count()
        1
        """
        assert fraction >= 0.0, "Negative fraction value: %s" % fraction
        seed = seed if seed is not None else random.randint(0, sys.maxsize)
        rdd = self._jdf.sample(withReplacement, fraction, long(seed))
        return DataFrame(rdd, self.sql_ctx)

    def randomSplit(self, weights, seed=None):
        """Randomly splits this :class:`DataFrame` with the provided weights.

        :param weights: list of doubles as weights with which to split the DataFrame. Weights will
            be normalized if they don't sum up to 1.0.
        :param seed: The seed for sampling.

        >>> splits = df4.randomSplit([1.0, 2.0], 24)
        >>> splits[0].count()
        1

        >>> splits[1].count()
        3
        """
        for w in weights:
            if w < 0.0:
                raise ValueError("Weights must be positive. Found weight value: %s" % w)
        seed = seed if seed is not None else random.randint(0, sys.maxsize)
        rdd_array = self._jdf.randomSplit(_to_seq(self.sql_ctx._sc, weights), long(seed))
        return [DataFrame(rdd, self.sql_ctx) for rdd in rdd_array]

    @property
    def dtypes(self):
        """Returns all column names and their data types as a list.

        >>> df.dtypes
        [('age', 'int'), ('name', 'string')]
        """
        return [(str(f.name), f.dataType.simpleString()) for f in self.schema.fields]

    @property
    @ignore_unicode_prefix
    def columns(self):
        """Returns all column names as a list.

        >>> df.columns
        [u'age', u'name']
        """
        return [f.name for f in self.schema.fields]

    @ignore_unicode_prefix
    def alias(self, alias):
        """Returns a new :class:`DataFrame` with an alias set.

        >>> from pyspark.sql.functions import *
        >>> df_as1 = df.alias("df_as1")
        >>> df_as2 = df.alias("df_as2")
        >>> joined_df = df_as1.join(df_as2, col("df_as1.name") == col("df_as2.name"), 'inner')
        >>> joined_df.select(col("df_as1.name"), col("df_as2.name"), col("df_as2.age")).collect()
        [Row(name=u'Alice', name=u'Alice', age=2), Row(name=u'Bob', name=u'Bob', age=5)]
        """
        assert isinstance(alias, basestring), "alias should be a string"
        return DataFrame(getattr(self._jdf, "as")(alias), self.sql_ctx)

    @ignore_unicode_prefix
    def join(self, other, joinExprs=None, joinType=None):
        """Joins with another :class:`DataFrame`, using the given join expression.

        The following performs a full outer join between ``df1`` and ``df2``.

        :param other: Right side of the join
        :param joinExprs: a string for the join column name, or a join expression (Column).
            If joinExprs is a string indicating the name of the join column,
            the column must exist on both sides, and this performs an inner equi-join.
        :param joinType: str, default 'inner'.
            One of `inner`, `outer`, `left_outer`, `right_outer`, `semijoin`.

        >>> df.join(df2, df.name == df2.name, 'outer').select(df.name, df2.height).collect()
        [Row(name=None, height=80), Row(name=u'Alice', height=None), Row(name=u'Bob', height=85)]

        >>> df.join(df2, 'name').select(df.name, df2.height).collect()
        [Row(name=u'Bob', height=85)]
        """

        if joinExprs is None:
            jdf = self._jdf.join(other._jdf)
        elif isinstance(joinExprs, basestring):
            jdf = self._jdf.join(other._jdf, joinExprs)
        else:
            assert isinstance(joinExprs, Column), "joinExprs should be Column"
            if joinType is None:
                jdf = self._jdf.join(other._jdf, joinExprs._jc)
            else:
                assert isinstance(joinType, basestring), "joinType should be basestring"
                jdf = self._jdf.join(other._jdf, joinExprs._jc, joinType)
        return DataFrame(jdf, self.sql_ctx)

    @ignore_unicode_prefix
    def sort(self, *cols, **kwargs):
        """Returns a new :class:`DataFrame` sorted by the specified column(s).

        :param cols: list of :class:`Column` or column names to sort by.
        :param ascending: boolean or list of boolean (default True).
            Sort ascending vs. descending. Specify list for multiple sort orders.
            If a list is specified, length of the list must equal length of `cols`.

        >>> df.sort(df.age.desc()).collect()
        [Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
        >>> df.sort("age", ascending=False).collect()
        [Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
        >>> df.orderBy(df.age.desc()).collect()
        [Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
        >>> from pyspark.sql.functions import *
        >>> df.sort(asc("age")).collect()
        [Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
        >>> df.orderBy(desc("age"), "name").collect()
        [Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
        >>> df.orderBy(["age", "name"], ascending=[0, 1]).collect()
        [Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
        """
        if not cols:
            raise ValueError("should sort by at least one column")
        if len(cols) == 1 and isinstance(cols[0], list):
            cols = cols[0]
        jcols = [_to_java_column(c) for c in cols]
        ascending = kwargs.get('ascending', True)
        if isinstance(ascending, (bool, int)):
            if not ascending:
                jcols = [jc.desc() for jc in jcols]
        elif isinstance(ascending, list):
            jcols = [jc if asc else jc.desc()
                     for asc, jc in zip(ascending, jcols)]
        else:
            raise TypeError("ascending can only be boolean or list, but got %s" % type(ascending))

        jdf = self._jdf.sort(self._jseq(jcols))
        return DataFrame(jdf, self.sql_ctx)

    orderBy = sort

    def _jseq(self, cols, converter=None):
        """Return a JVM Seq of Columns from a list of Column or names"""
        return _to_seq(self.sql_ctx._sc, cols, converter)

    def _jmap(self, jm):
        """Return a JVM Scala Map from a dict"""
        return _to_scala_map(self.sql_ctx._sc, jm)

    def _jcols(self, *cols):
        """Return a JVM Seq of Columns from a list of Column or column names

        If `cols` has only one list in it, cols[0] will be used as the list.
        """
        if len(cols) == 1 and isinstance(cols[0], list):
            cols = cols[0]
        return self._jseq(cols, _to_java_column)
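
    # A hedged illustration of the private helpers above: both calls below
    # would build the same JVM Seq of Columns, which is why public methods
    # such as select() and groupBy() accept either varargs or a single list
    # ("age" and "name" are illustrative column names):
    #
    #     df._jcols("age", "name")
    #     df._jcols(["age", "name"])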

    def describe(self, *cols):
        """Computes statistics for numeric columns.

        This includes count, mean, stddev, min, and max. If no columns are
        given, this function computes statistics for all numerical columns.

        >>> df.describe().show()
        +-------+---+
        |summary|age|
        +-------+---+
        |  count|  2|
        |   mean|3.5|
        | stddev|1.5|
        |    min|  2|
        |    max|  5|
        +-------+---+
        """
        jdf = self._jdf.describe(self._jseq(cols))
        return DataFrame(jdf, self.sql_ctx)

    @ignore_unicode_prefix
    def head(self, n=None):
        """
        Returns the first ``n`` rows as a list of :class:`Row`,
        or the first :class:`Row` if ``n`` is ``None``.

        >>> df.head()
        Row(age=2, name=u'Alice')
        >>> df.head(1)
        [Row(age=2, name=u'Alice')]
        """
        if n is None:
            rs = self.head(1)
            return rs[0] if rs else None
        return self.take(n)

    @ignore_unicode_prefix
    def first(self):
        """Returns the first row as a :class:`Row`.

        >>> df.first()
        Row(age=2, name=u'Alice')
        """
        return self.head()

    @ignore_unicode_prefix
    def __getitem__(self, item):
        """Returns the column as a :class:`Column`.

        >>> df.select(df['age']).collect()
        [Row(age=2), Row(age=5)]
        >>> df[ ["name", "age"]].collect()
        [Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)]
        >>> df[ df.age > 3 ].collect()
        [Row(age=5, name=u'Bob')]
        >>> df[df[0] > 3].collect()
        [Row(age=5, name=u'Bob')]
        """
        if isinstance(item, basestring):
            if item not in self.columns:
                raise IndexError("no such column: %s" % item)
            jc = self._jdf.apply(item)
            return Column(jc)
        elif isinstance(item, Column):
            return self.filter(item)
        elif isinstance(item, (list, tuple)):
            return self.select(*item)
        elif isinstance(item, int):
            jc = self._jdf.apply(self.columns[item])
            return Column(jc)
        else:
            raise TypeError("unexpected item type: %s" % type(item))

    def __getattr__(self, name):
        """Returns the :class:`Column` denoted by ``name``.

        >>> df.select(df.age).collect()
        [Row(age=2), Row(age=5)]
        """
        if name not in self.columns:
            raise AttributeError(
                "'%s' object has no attribute '%s'" % (self.__class__.__name__, name))
        jc = self._jdf.apply(name)
        return Column(jc)

    @ignore_unicode_prefix
    def select(self, *cols):
        """Projects a set of expressions and returns a new :class:`DataFrame`.

        :param cols: list of column names (string) or expressions (:class:`Column`).
            If one of the column names is '*', that column is expanded to include all columns
            in the current DataFrame.

        >>> df.select('*').collect()
        [Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
        >>> df.select('name', 'age').collect()
        [Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)]
        >>> df.select(df.name, (df.age + 10).alias('age')).collect()
        [Row(name=u'Alice', age=12), Row(name=u'Bob', age=15)]
        """
        jdf = self._jdf.select(self._jcols(*cols))
        return DataFrame(jdf, self.sql_ctx)

    def selectExpr(self, *expr):
        """Projects a set of SQL expressions and returns a new :class:`DataFrame`.

        This is a variant of :func:`select` that accepts SQL expressions.

        >>> df.selectExpr("age * 2", "abs(age)").collect()
        [Row((age * 2)=4, Abs(age)=2), Row((age * 2)=10, Abs(age)=5)]
        """
        if len(expr) == 1 and isinstance(expr[0], list):
            expr = expr[0]
        jdf = self._jdf.selectExpr(self._jseq(expr))
        return DataFrame(jdf, self.sql_ctx)

    @ignore_unicode_prefix
    def filter(self, condition):
        """Filters rows using the given condition.

        :func:`where` is an alias for :func:`filter`.

        :param condition: a :class:`Column` of :class:`types.BooleanType`
            or a string of SQL expression.

        >>> df.filter(df.age > 3).collect()
        [Row(age=5, name=u'Bob')]
        >>> df.where(df.age == 2).collect()
        [Row(age=2, name=u'Alice')]

        >>> df.filter("age > 3").collect()
        [Row(age=5, name=u'Bob')]
        >>> df.where("age = 2").collect()
        [Row(age=2, name=u'Alice')]
        """
        if isinstance(condition, basestring):
            jdf = self._jdf.filter(condition)
        elif isinstance(condition, Column):
            jdf = self._jdf.filter(condition._jc)
        else:
            raise TypeError("condition should be string or Column")
        return DataFrame(jdf, self.sql_ctx)

    where = filter

    @ignore_unicode_prefix
    def groupBy(self, *cols):
        """Groups the :class:`DataFrame` using the specified columns,
        so we can run aggregation on them. See :class:`GroupedData`
        for all the available aggregate functions.

        :func:`groupby` is an alias for :func:`groupBy`.

        :param cols: list of columns to group by.
            Each element should be a column name (string) or an expression (:class:`Column`).

        >>> df.groupBy().avg().collect()
        [Row(AVG(age)=3.5)]
        >>> df.groupBy('name').agg({'age': 'mean'}).collect()
        [Row(name=u'Alice', AVG(age)=2.0), Row(name=u'Bob', AVG(age)=5.0)]
        >>> df.groupBy(df.name).avg().collect()
        [Row(name=u'Alice', AVG(age)=2.0), Row(name=u'Bob', AVG(age)=5.0)]
        >>> df.groupBy(['name', df.age]).count().collect()
        [Row(name=u'Bob', age=5, count=1), Row(name=u'Alice', age=2, count=1)]
        """
        jdf = self._jdf.groupBy(self._jcols(*cols))
        return GroupedData(jdf, self.sql_ctx)

    def agg(self, *exprs):
        """ Aggregate on the entire :class:`DataFrame` without groups
        (shorthand for ``df.groupBy().agg()``).

        >>> df.agg({"age": "max"}).collect()
        [Row(MAX(age)=5)]
        >>> from pyspark.sql import functions as F
        >>> df.agg(F.min(df.age)).collect()
        [Row(MIN(age)=2)]
        """
        return self.groupBy().agg(*exprs)

    def unionAll(self, other):
        """ Return a new :class:`DataFrame` containing union of rows in this
        frame and another frame.

        This is equivalent to `UNION ALL` in SQL.
        """
        return DataFrame(self._jdf.unionAll(other._jdf), self.sql_ctx)

    def intersect(self, other):
        """ Return a new :class:`DataFrame` containing rows only in
        both this frame and another frame.

        This is equivalent to `INTERSECT` in SQL.
        """
        return DataFrame(self._jdf.intersect(other._jdf), self.sql_ctx)

    def subtract(self, other):
        """ Return a new :class:`DataFrame` containing rows in this frame
        but not in another frame.

        This is equivalent to `EXCEPT` in SQL.
        """
        return DataFrame(getattr(self._jdf, "except")(other._jdf), self.sql_ctx)
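
    # A hedged note on the three set operations above: unlike SQL's UNION,
    # unionAll does not de-duplicate, so a distinct union would be spelled
    # df.unionAll(df2).distinct(). The Java method for subtract is fetched
    # with getattr because "except" is a reserved word in Python.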

    def dropDuplicates(self, subset=None):
        """Return a new :class:`DataFrame` with duplicate rows removed,
        optionally only considering certain columns.

        >>> from pyspark.sql import Row
        >>> df = sc.parallelize([ \
            Row(name='Alice', age=5, height=80), \
            Row(name='Alice', age=5, height=80), \
            Row(name='Alice', age=10, height=80)]).toDF()
        >>> df.dropDuplicates().show()
        +---+------+-----+
        |age|height| name|
        +---+------+-----+
        |  5|    80|Alice|
        | 10|    80|Alice|
        +---+------+-----+

        >>> df.dropDuplicates(['name', 'height']).show()
        +---+------+-----+
        |age|height| name|
        +---+------+-----+
        |  5|    80|Alice|
        +---+------+-----+
        """
        if subset is None:
            jdf = self._jdf.dropDuplicates()
        else:
            jdf = self._jdf.dropDuplicates(self._jseq(subset))
        return DataFrame(jdf, self.sql_ctx)

    def dropna(self, how='any', thresh=None, subset=None):
        """Returns a new :class:`DataFrame` omitting rows with null values.

        This is an alias for ``na.drop()``.

        :param how: 'any' or 'all'.
            If 'any', drop a row if it contains any nulls.
            If 'all', drop a row only if all its values are null.
        :param thresh: int, default None
            If specified, drop rows that have fewer than `thresh` non-null values.
            This overwrites the `how` parameter.
        :param subset: optional list of column names to consider.

        >>> df4.dropna().show()
        +---+------+-----+
        |age|height| name|
        +---+------+-----+
        | 10|    80|Alice|
        +---+------+-----+

        >>> df4.na.drop().show()
        +---+------+-----+
        |age|height| name|
        +---+------+-----+
        | 10|    80|Alice|
        +---+------+-----+
        """
        if how is not None and how not in ['any', 'all']:
            raise ValueError("how ('" + how + "') should be 'any' or 'all'")

        if subset is None:
            subset = self.columns
        elif isinstance(subset, basestring):
            subset = [subset]
        elif not isinstance(subset, (list, tuple)):
            raise ValueError("subset should be a list or tuple of column names")

        if thresh is None:
            thresh = len(subset) if how == 'any' else 1

        return DataFrame(self._jdf.na().drop(thresh, self._jseq(subset)), self.sql_ctx)

    def fillna(self, value, subset=None):
        """Replace null values, alias for ``na.fill()``.

        :param value: int, long, float, string, or dict.
            Value to replace null values with.
            If the value is a dict, then `subset` is ignored and `value` must be a mapping
            from column name (string) to replacement value. The replacement value must be
            an int, long, float, or string.
        :param subset: optional list of column names to consider.
            Columns specified in subset that do not have matching data type are ignored.
            For example, if `value` is a string, and subset contains a non-string column,
            then the non-string column is simply ignored.

        >>> df4.fillna(50).show()
        +---+------+-----+
        |age|height| name|
        +---+------+-----+
        | 10|    80|Alice|
        |  5|    50|  Bob|
        | 50|    50|  Tom|
        | 50|    50| null|
        +---+------+-----+

        >>> df4.fillna({'age': 50, 'name': 'unknown'}).show()
        +---+------+-------+
        |age|height|   name|
        +---+------+-------+
        | 10|    80|  Alice|
        |  5|  null|    Bob|
        | 50|  null|    Tom|
        | 50|  null|unknown|
        +---+------+-------+

        >>> df4.na.fill({'age': 50, 'name': 'unknown'}).show()
        +---+------+-------+
        |age|height|   name|
        +---+------+-------+
        | 10|    80|  Alice|
        |  5|  null|    Bob|
        | 50|  null|    Tom|
        | 50|  null|unknown|
        +---+------+-------+
        """
        if not isinstance(value, (float, int, long, basestring, dict)):
            raise ValueError("value should be a float, int, long, string, or dict")

        if isinstance(value, (int, long)):
            value = float(value)

        if isinstance(value, dict):
            return DataFrame(self._jdf.na().fill(value), self.sql_ctx)
        elif subset is None:
            return DataFrame(self._jdf.na().fill(value), self.sql_ctx)
        else:
            if isinstance(subset, basestring):
                subset = [subset]
            elif not isinstance(subset, (list, tuple)):
                raise ValueError("subset should be a list or tuple of column names")

            return DataFrame(self._jdf.na().fill(value, self._jseq(subset)), self.sql_ctx)

    def replace(self, to_replace, value, subset=None):
        """Returns a new :class:`DataFrame` replacing a value with another value.

        :param to_replace: int, long, float, string, or list.
            Value to be replaced.
            If the value is a dict, then `value` is ignored and `to_replace` must be a
            mapping from column name (string) to replacement value. The value to be
            replaced must be an int, long, float, or string.
        :param value: int, long, float, string, or list.
            Value to use to replace holes.
            The replacement value must be an int, long, float, or string. If `value` is a
            list or tuple, `value` should be of the same length with `to_replace`.
        :param subset: optional list of column names to consider.
            Columns specified in subset that do not have matching data type are ignored.
            For example, if `value` is a string, and subset contains a non-string column,
            then the non-string column is simply ignored.

        >>> df4.replace(10, 20).show()
        +----+------+-----+
        | age|height| name|
        +----+------+-----+
        |  20|    80|Alice|
        |   5|  null|  Bob|
        |null|  null|  Tom|
        |null|  null| null|
        +----+------+-----+

        >>> df4.replace(['Alice', 'Bob'], ['A', 'B'], 'name').show()
        +----+------+----+
        | age|height|name|
        +----+------+----+
        |  10|    80|   A|
        |   5|  null|   B|
        |null|  null| Tom|
        |null|  null|null|
        +----+------+----+
        """
        if not isinstance(to_replace, (float, int, long, basestring, list, tuple, dict)):
            raise ValueError(
                "to_replace should be a float, int, long, string, list, tuple, or dict")

        if not isinstance(value, (float, int, long, basestring, list, tuple)):
            raise ValueError("value should be a float, int, long, string, list, or tuple")

        rep_dict = dict()

        if isinstance(to_replace, (float, int, long, basestring)):
            to_replace = [to_replace]

        if isinstance(to_replace, tuple):
            to_replace = list(to_replace)

        if isinstance(value, tuple):
            value = list(value)

        if isinstance(to_replace, list) and isinstance(value, list):
            if len(to_replace) != len(value):
                raise ValueError("to_replace and value lists should be of the same length")
            rep_dict = dict(zip(to_replace, value))
        elif isinstance(to_replace, list) and isinstance(value, (float, int, long, basestring)):
            rep_dict = dict([(tr, value) for tr in to_replace])
        elif isinstance(to_replace, dict):
            rep_dict = to_replace

        if subset is None:
            return DataFrame(self._jdf.na().replace('*', rep_dict), self.sql_ctx)
        elif isinstance(subset, basestring):
            subset = [subset]

        if not isinstance(subset, (list, tuple)):
            raise ValueError("subset should be a list or tuple of column names")

        return DataFrame(
            self._jdf.na().replace(self._jseq(subset), self._jmap(rep_dict)), self.sql_ctx)

    def corr(self, col1, col2, method=None):
        """
        Calculates the correlation of two columns of a DataFrame as a double value. Currently only
        supports the Pearson Correlation Coefficient.
        :func:`DataFrame.corr` and :func:`DataFrameStatFunctions.corr` are aliases.

        :param col1: The name of the first column
        :param col2: The name of the second column
        :param method: The correlation method. Currently only supports "pearson"
        """
        if not isinstance(col1, str):
            raise ValueError("col1 should be a string.")
        if not isinstance(col2, str):
            raise ValueError("col2 should be a string.")
        if not method:
            method = "pearson"
        if not method == "pearson":
            raise ValueError("Currently only the calculation of the Pearson Correlation " +
                             "coefficient is supported.")
        return self._jdf.stat().corr(col1, col2, method)

    def cov(self, col1, col2):
        """
        Calculate the sample covariance for the given columns, specified by their names, as a
        double value. :func:`DataFrame.cov` and :func:`DataFrameStatFunctions.cov` are aliases.

        :param col1: The name of the first column
        :param col2: The name of the second column
        """
        if not isinstance(col1, str):
            raise ValueError("col1 should be a string.")
        if not isinstance(col2, str):
            raise ValueError("col2 should be a string.")
        return self._jdf.stat().cov(col1, col2)
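
    # A hedged usage sketch for corr and cov above ("age" and "height" are
    # illustrative column names):
    #
    #     df.corr("age", "height")   # Pearson correlation coefficient
    #     df.cov("age", "height")    # sample covariance
    #
    # Both return a plain Python float computed on the JVM side.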

    def crosstab(self, col1, col2):
        """
        Computes a pair-wise frequency table of the given columns. Also known as a contingency
        table. The number of distinct values for each column should be less than 1e4. At most 1e6
        non-zero pair frequencies will be returned.
        The first column of each row will be the distinct values of `col1` and the column names
        will be the distinct values of `col2`. The name of the first column will be `$col1_$col2`.
        Pairs that have no occurrences will have `null` as their counts.
        :func:`DataFrame.crosstab` and :func:`DataFrameStatFunctions.crosstab` are aliases.

        :param col1: The name of the first column. Distinct items will make the first item of
            each row.
        :param col2: The name of the second column. Distinct items will make the column names
            of the DataFrame.
        """
        if not isinstance(col1, str):
            raise ValueError("col1 should be a string.")
        if not isinstance(col2, str):
            raise ValueError("col2 should be a string.")
        return DataFrame(self._jdf.stat().crosstab(col1, col2), self.sql_ctx)

    def freqItems(self, cols, support=None):
        """
        Finds frequent items for columns, possibly with false positives, using the
        frequent element count algorithm described in
        "http://dx.doi.org/10.1145/762471.762473, proposed by Karp, Schenker, and Papadimitriou".
        :func:`DataFrame.freqItems` and :func:`DataFrameStatFunctions.freqItems` are aliases.

        :param cols: Names of the columns to calculate frequent items for as a list or tuple of
            strings.
        :param support: The frequency with which to consider an item 'frequent'. Default is 1%.
            The support must be greater than 1e-4.
        """
        if isinstance(cols, tuple):
            cols = list(cols)
        if not isinstance(cols, list):
            raise ValueError("cols must be a list or tuple of column names as strings.")
        if not support:
            support = 0.01
        return DataFrame(self._jdf.stat().freqItems(_to_seq(self._sc, cols), support), self.sql_ctx)
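
    # A hedged usage sketch for crosstab and freqItems above (column names and
    # the 0.25 support are illustrative):
    #
    #     df.crosstab("age", "name").show()
    #     df.freqItems(["age", "name"], support=0.25).collect()
    #
    # freqItems returns a single-row DataFrame whose cells hold arrays of
    # candidate frequent values, and may include false positives by design.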

    @ignore_unicode_prefix
    def withColumn(self, colName, col):
        """Returns a new :class:`DataFrame` by adding a column.

        :param colName: string, name of the new column.
        :param col: a :class:`Column` expression for the new column.

        >>> df.withColumn('age2', df.age + 2).collect()
        [Row(age=2, name=u'Alice', age2=4), Row(age=5, name=u'Bob', age2=7)]
        """
        return self.select('*', col.alias(colName))

    @ignore_unicode_prefix
    def withColumnRenamed(self, existing, new):
        """Returns a new :class:`DataFrame` by renaming an existing column.

        :param existing: string, name of the existing column to rename.
        :param new: string, new name of the column.

        >>> df.withColumnRenamed('age', 'age2').collect()
        [Row(age2=2, name=u'Alice'), Row(age2=5, name=u'Bob')]
        """
        cols = [Column(_to_java_column(c)).alias(new)
                if c == existing else c
                for c in self.columns]
        return self.select(*cols)

    @ignore_unicode_prefix
    def drop(self, colName):
        """Returns a new :class:`DataFrame` that drops the specified column.

        :param colName: string, name of the column to drop.

        >>> df.drop('age').collect()
        [Row(name=u'Alice'), Row(name=u'Bob')]
        """
        jdf = self._jdf.drop(colName)
        return DataFrame(jdf, self.sql_ctx)

    def toPandas(self):
        """Returns the contents of this :class:`DataFrame` as Pandas ``pandas.DataFrame``.

        This is only available if Pandas is installed and available.

        >>> df.toPandas()  # doctest: +SKIP
           age   name
        0    2  Alice
        1    5    Bob
        """
        import pandas as pd
        return pd.DataFrame.from_records(self.collect(), columns=self.columns)

    # Pandas compatibility
    groupby = groupBy
    drop_duplicates = dropDuplicates


# Having SchemaRDD for backward compatibility (for docs)
class SchemaRDD(DataFrame):
    """SchemaRDD is deprecated, please use :class:`DataFrame`.
    """


def dfapi(f):
    def _api(self):
        name = f.__name__
        jdf = getattr(self._jdf, name)()
        return DataFrame(jdf, self.sql_ctx)
    _api.__name__ = f.__name__
    _api.__doc__ = f.__doc__
    return _api
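
# A hedged note on dfapi above: it turns a stub method whose body is only a
# docstring into a zero-argument pass-through to the JVM DataFrame method of
# the same name. For example, GroupedData.count below is written as
#
#     @dfapi
#     def count(self):
#         """Counts the number of records for each group. ..."""
#
# and dispatches to self._jdf.count() while keeping the Python docstring.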
|
|
|
|
|
|
|
|
|
[SPARK-5799][SQL] Compute aggregation function on specified numeric columns
Compute aggregation function on specified numeric columns. For example:
val df = Seq(("a", 1, 0, "b"), ("b", 2, 4, "c"), ("a", 2, 3, "d")).toDataFrame("key", "value1", "value2", "rest")
df.groupBy("key").min("value2")
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes #4592 from viirya/specific_cols_agg and squashes the following commits:
9446896 [Liang-Chi Hsieh] For comments.
314c4cd [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into specific_cols_agg
353fad7 [Liang-Chi Hsieh] For python unit tests.
54ed0c4 [Liang-Chi Hsieh] Address comments.
b079e6b [Liang-Chi Hsieh] Remove duplicate codes.
55100fb [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into specific_cols_agg
880c2ac [Liang-Chi Hsieh] Fix Python style checks.
4c63a01 [Liang-Chi Hsieh] Fix pyspark.
b1a24fc [Liang-Chi Hsieh] Address comments.
2592f29 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into specific_cols_agg
27069c3 [Liang-Chi Hsieh] Combine functions and add varargs annotation.
371a3f7 [Liang-Chi Hsieh] Compute aggregation function on specified numeric columns.
2015-02-16 13:06:11 -05:00


def df_varargs_api(f):
    def _api(self, *args):
        name = f.__name__
        jdf = getattr(self._jdf, name)(_to_seq(self.sql_ctx._sc, args))
        return DataFrame(jdf, self.sql_ctx)
    _api.__name__ = f.__name__
    _api.__doc__ = f.__doc__
    return _api
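# `df_varargs_api` works like `dfapi` but forwards a *cols varargs list to the
# JVM as a Seq; GroupedData.mean/avg/max/min/sum below are built with it.
# Illustrative only:
#
#     @df_varargs_api
#     def mean(self, *cols):
#         """Computes average values for each numeric column for each group."""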


class GroupedData(object):
    """
    A set of methods for aggregations on a :class:`DataFrame`,
    created by :func:`DataFrame.groupBy`.
    """

    def __init__(self, jdf, sql_ctx):
        self._jdf = jdf
        self.sql_ctx = sql_ctx

    @ignore_unicode_prefix
    def agg(self, *exprs):
        """Computes aggregates and returns the result as a :class:`DataFrame`.

        The available aggregate functions are `avg`, `max`, `min`, `sum`, `count`.

        If ``exprs`` is a single :class:`dict` mapping from string to string, then the key
        is the column to perform aggregation on, and the value is the aggregate function.

        Alternatively, ``exprs`` can also be a list of aggregate :class:`Column` expressions.

        :param exprs: a dict mapping from column name (string) to aggregate functions (string),
            or a list of :class:`Column`.

        >>> gdf = df.groupBy(df.name)
        >>> gdf.agg({"*": "count"}).collect()
        [Row(name=u'Alice', COUNT(1)=1), Row(name=u'Bob', COUNT(1)=1)]

        >>> from pyspark.sql import functions as F
        >>> gdf.agg(F.min(df.age)).collect()
        [Row(name=u'Alice', MIN(age)=2), Row(name=u'Bob', MIN(age)=5)]
        """
        assert exprs, "exprs should not be empty"
        if len(exprs) == 1 and isinstance(exprs[0], dict):
            jdf = self._jdf.agg(exprs[0])
        else:
            # Columns
            assert all(isinstance(c, Column) for c in exprs), "all exprs should be Column"
            jdf = self._jdf.agg(exprs[0]._jc,
                                _to_seq(self.sql_ctx._sc, [c._jc for c in exprs[1:]]))
        return DataFrame(jdf, self.sql_ctx)

    @dfapi
    def count(self):
        """Counts the number of records for each group.

        >>> df.groupBy(df.age).count().collect()
        [Row(age=2, count=1), Row(age=5, count=1)]
        """

    @df_varargs_api
    def mean(self, *cols):
        """Computes average values for each numeric column for each group.

        :func:`mean` is an alias for :func:`avg`.

        :param cols: list of column names (string). Non-numeric columns are ignored.

        >>> df.groupBy().mean('age').collect()
        [Row(AVG(age)=3.5)]
        >>> df3.groupBy().mean('age', 'height').collect()
        [Row(AVG(age)=3.5, AVG(height)=82.5)]
        """

    @df_varargs_api
    def avg(self, *cols):
        """Computes average values for each numeric column for each group.

        :func:`mean` is an alias for :func:`avg`.

        :param cols: list of column names (string). Non-numeric columns are ignored.

        >>> df.groupBy().avg('age').collect()
        [Row(AVG(age)=3.5)]
        >>> df3.groupBy().avg('age', 'height').collect()
        [Row(AVG(age)=3.5, AVG(height)=82.5)]
        """

    @df_varargs_api
    def max(self, *cols):
        """Computes the max value for each numeric column for each group.

        :param cols: list of column names (string). Non-numeric columns are ignored.

        >>> df.groupBy().max('age').collect()
        [Row(MAX(age)=5)]
        >>> df3.groupBy().max('age', 'height').collect()
        [Row(MAX(age)=5, MAX(height)=85)]
        """

    @df_varargs_api
    def min(self, *cols):
        """Computes the min value for each numeric column for each group.

        :param cols: list of column names (string). Non-numeric columns are ignored.

        >>> df.groupBy().min('age').collect()
        [Row(MIN(age)=2)]
        >>> df3.groupBy().min('age', 'height').collect()
        [Row(MIN(age)=2, MIN(height)=80)]
        """

    @df_varargs_api
    def sum(self, *cols):
        """Computes the sum for each numeric column for each group.

        :param cols: list of column names (string). Non-numeric columns are ignored.

        >>> df.groupBy().sum('age').collect()
        [Row(SUM(age)=7)]
        >>> df3.groupBy().sum('age', 'height').collect()
        [Row(SUM(age)=7, SUM(height)=165)]
        """


def _create_column_from_literal(literal):
    sc = SparkContext._active_spark_context
    return sc._jvm.functions.lit(literal)


def _create_column_from_name(name):
    sc = SparkContext._active_spark_context
    return sc._jvm.functions.col(name)


def _to_java_column(col):
    if isinstance(col, Column):
        jcol = col._jc
    else:
        jcol = _create_column_from_name(col)
    return jcol
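# Illustrative only: both calls below yield a JVM Column for the "age" column,
# assuming an active SparkContext.
#
#     _to_java_column("age")    # a name is resolved via functions.col
#     _to_java_column(df.age)   # an existing Column just unwraps its _jc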


def _to_seq(sc, cols, converter=None):
    """
    Convert a list of Column (or names) into a JVM Seq of Column.

    An optional `converter` could be used to convert items in `cols`
    into JVM Column objects.
    """
    if converter:
        cols = [converter(c) for c in cols]
    return sc._jvm.PythonUtils.toSeq(cols)
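# Illustrative only: `_to_java_column` is the typical converter, turning a
# mixed list of names and Columns into a JVM Seq of JVM Columns.
#
#     _to_seq(sc, ["age", df.name], _to_java_column)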


def _to_scala_map(sc, jm):
    """
    Convert a dict into a JVM Map.
    """
    return sc._jvm.PythonUtils.toScalaMap(jm)


def _unary_op(name, doc="unary operator"):
    """ Create a method for given unary operator """
    def _(self):
        jc = getattr(self._jc, name)()
        return Column(jc)
    _.__doc__ = doc
    return _
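# Illustrative only: `isNull` further below is built this way, so
# `df.age.isNull()` calls the JVM method `self._jc.isNull()` and wraps the
# result in a Python Column.
#
#     isNull = _unary_op("isNull", "True if the current expression is null.")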


def _func_op(name, doc=''):
    def _(self):
        sc = SparkContext._active_spark_context
        jc = getattr(sc._jvm.functions, name)(self._jc)
        return Column(jc)
    _.__doc__ = doc
    return _
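# `_func_op` differs from `_unary_op` in that it calls a function on the JVM
# `functions` object rather than a method of the column itself; `__neg__` and
# `__invert__` below are defined with it.  Illustrative only:
#
#     __neg__ = _func_op("negate")   # so  -df.age  =>  functions.negate(age)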


def _bin_op(name, doc="binary operator"):
    """ Create a method for given binary operator
    """
    def _(self, other):
        jc = other._jc if isinstance(other, Column) else other
        njc = getattr(self._jc, name)(jc)
        return Column(njc)
    _.__doc__ = doc
    return _
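# Illustrative only: with `__gt__ = _bin_op("gt")` below, `df.age > 3` calls
# the JVM method `age.gt(3)`; a Column operand is first unwrapped to its _jc.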


def _reverse_op(name, doc="binary operator"):
    """ Create a method for binary operator (this object is on right side)
    """
    def _(self, other):
        jother = _create_column_from_literal(other)
        jc = getattr(jother, name)(self._jc)
        return Column(jc)
    _.__doc__ = doc
    return _
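# Illustrative only: reversed operators put `self` on the right-hand side, so
# with `__rsub__ = _reverse_op("minus")` below, `2 - df.age` evaluates as
# `lit(2).minus(age)` on the JVM side.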


class Column(object):

    """
    A column in a DataFrame.

    :class:`Column` instances can be created by::

        # 1. Select a column out of a DataFrame

        df.colName
        df["colName"]

        # 2. Create from an expression
        df.colName + 1
        1 / df.colName
    """

    def __init__(self, jc):
        self._jc = jc

    # arithmetic operators
    __neg__ = _func_op("negate")
    __add__ = _bin_op("plus")
    __sub__ = _bin_op("minus")
    __mul__ = _bin_op("multiply")
    __div__ = _bin_op("divide")
    __truediv__ = _bin_op("divide")
    __mod__ = _bin_op("mod")
    __radd__ = _bin_op("plus")
    __rsub__ = _reverse_op("minus")
    __rmul__ = _bin_op("multiply")
    __rdiv__ = _reverse_op("divide")
    __rtruediv__ = _reverse_op("divide")
    __rmod__ = _reverse_op("mod")

    # logical operators
    __eq__ = _bin_op("equalTo")
    __ne__ = _bin_op("notEqual")
    __lt__ = _bin_op("lt")
    __le__ = _bin_op("leq")
    __ge__ = _bin_op("geq")
    __gt__ = _bin_op("gt")

    # `and`, `or`, `not` cannot be overloaded in Python,
    # so use bitwise operators as boolean operators
    __and__ = _bin_op('and')
    __or__ = _bin_op('or')
    __invert__ = _func_op('not')
    __rand__ = _bin_op("and")
    __ror__ = _bin_op("or")

    # container operators
    __contains__ = _bin_op("contains")
    __getitem__ = _bin_op("apply")

    # bitwise operators
    bitwiseOR = _bin_op("bitwiseOR")
    bitwiseAND = _bin_op("bitwiseAND")
    bitwiseXOR = _bin_op("bitwiseXOR")

    def getItem(self, key):
        """An expression that gets an item at position `ordinal` out of a list,
        or gets an item by key out of a dict.

        >>> df = sc.parallelize([([1, 2], {"key": "value"})]).toDF(["l", "d"])
        >>> df.select(df.l.getItem(0), df.d.getItem("key")).show()
        +----+------+
        |l[0]|d[key]|
        +----+------+
        |   1| value|
        +----+------+
        >>> df.select(df.l[0], df.d["key"]).show()
        +----+------+
        |l[0]|d[key]|
        +----+------+
        |   1| value|
        +----+------+
        """
        return self[key]

    def getField(self, name):
        """An expression that gets a field by name in a StructField.

        >>> from pyspark.sql import Row
        >>> df = sc.parallelize([Row(r=Row(a=1, b="b"))]).toDF()
        >>> df.select(df.r.getField("b")).show()
        +----+
        |r[b]|
        +----+
        |   b|
        +----+
        >>> df.select(df.r.a).show()
        +----+
        |r[a]|
        +----+
        |   1|
        +----+
        """
        return self[name]

    def __getattr__(self, item):
        if item.startswith("__"):
            raise AttributeError(item)
        return self.getField(item)

    # string methods
    rlike = _bin_op("rlike")
    like = _bin_op("like")
    startswith = _bin_op("startsWith")
    endswith = _bin_op("endsWith")

    @ignore_unicode_prefix
    def substr(self, startPos, length):
        """
        Return a :class:`Column` which is a substring of the column.

        :param startPos: start position (int or Column)
        :param length: length of the substring (int or Column)

        >>> df.select(df.name.substr(1, 3).alias("col")).collect()
        [Row(col=u'Ali'), Row(col=u'Bob')]
        """
        if type(startPos) != type(length):
            raise TypeError("startPos and length must be the same type")
        if isinstance(startPos, (int, long)):
            jc = self._jc.substr(startPos, length)
        elif isinstance(startPos, Column):
            jc = self._jc.substr(startPos._jc, length._jc)
        else:
            raise TypeError("Unexpected type: %s" % type(startPos))
        return Column(jc)

    __getslice__ = substr
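    # Illustrative only: on Python 2, slicing invokes __getslice__, so
    # `df.name[1:3]` is equivalent to `df.name.substr(1, 3)`; Python 3 no
    # longer calls __getslice__.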

    @ignore_unicode_prefix
    def inSet(self, *cols):
        """ A boolean expression that is evaluated to true if the value of this
        expression is contained by the evaluated values of the arguments.

        >>> df[df.name.inSet("Bob", "Mike")].collect()
        [Row(age=5, name=u'Bob')]
        >>> df[df.age.inSet([1, 2, 3])].collect()
        [Row(age=2, name=u'Alice')]
        """
        if len(cols) == 1 and isinstance(cols[0], (list, set)):
            cols = cols[0]
        cols = [c._jc if isinstance(c, Column) else _create_column_from_literal(c) for c in cols]
        sc = SparkContext._active_spark_context
        jc = getattr(self._jc, "in")(_to_seq(sc, cols))
        return Column(jc)

    # order
    asc = _unary_op("asc", "Returns a sort expression based on the"
                    " ascending order of the given column name.")
    desc = _unary_op("desc", "Returns a sort expression based on the"
                     " descending order of the given column name.")

    isNull = _unary_op("isNull", "True if the current expression is null.")
    isNotNull = _unary_op("isNotNull", "True if the current expression is not null.")
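    # Illustrative only (using df4 from the doctest globals, where only Alice
    # has a non-null height):
    #
    #     df4.filter(df4.height.isNotNull()).collect()
    #     # [Row(age=10, height=80, name=u'Alice')]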

    def alias(self, alias):
        """Return an alias for this column.

        >>> df.select(df.age.alias("age2")).collect()
        [Row(age2=2), Row(age2=5)]
        """
        return Column(getattr(self._jc, "as")(alias))

    @ignore_unicode_prefix
    def cast(self, dataType):
        """ Convert the column into type `dataType`.

        >>> df.select(df.age.cast("string").alias('ages')).collect()
        [Row(ages=u'2'), Row(ages=u'5')]
        >>> df.select(df.age.cast(StringType()).alias('ages')).collect()
        [Row(ages=u'2'), Row(ages=u'5')]
        """
        if isinstance(dataType, basestring):
            jc = self._jc.cast(dataType)
        elif isinstance(dataType, DataType):
            sc = SparkContext._active_spark_context
            ssql_ctx = sc._jvm.SQLContext(sc._jsc.sc())
            jdt = ssql_ctx.parseDataType(dataType.json())
            jc = self._jc.cast(jdt)
        else:
            raise TypeError("unexpected type: %s" % type(dataType))
        return Column(jc)

    @ignore_unicode_prefix
    def between(self, lowerBound, upperBound):
        """ A boolean expression that is evaluated to true if the value of this
        expression is between the given lower and upper bounds (inclusive).
        """
        return (self >= lowerBound) & (self <= upperBound)
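    # Illustrative only: `between` simply expands to two comparisons, e.g.
    # `df.age.between(2, 4)` is the same expression as
    # `(df.age >= 2) & (df.age <= 4)`.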

    def __repr__(self):
        return 'Column<%s>' % self._jc.toString().encode('utf8')


class DataFrameNaFunctions(object):
    """Functionality for working with missing data in :class:`DataFrame`.
    """

    def __init__(self, df):
        self.df = df

    def drop(self, how='any', thresh=None, subset=None):
        return self.df.dropna(how=how, thresh=thresh, subset=subset)

    drop.__doc__ = DataFrame.dropna.__doc__

    def fill(self, value, subset=None):
        return self.df.fillna(value=value, subset=subset)

    fill.__doc__ = DataFrame.fillna.__doc__
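    # Illustrative only: these are reached through `df.na` and mirror the
    # DataFrame methods they delegate to, e.g.
    #
    #     df4.na.drop(subset=['age'])   # same as df4.dropna(subset=['age'])
    #     df4.na.fill({'age': 50})      # same as df4.fillna({'age': 50})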


class DataFrameStatFunctions(object):
    """Functionality for statistical functions with :class:`DataFrame`.
    """

    def __init__(self, df):
        self.df = df

    def corr(self, col1, col2, method=None):
        return self.df.corr(col1, col2, method)

    corr.__doc__ = DataFrame.corr.__doc__

    def cov(self, col1, col2):
        return self.df.cov(col1, col2)

    cov.__doc__ = DataFrame.cov.__doc__

    def crosstab(self, col1, col2):
        return self.df.crosstab(col1, col2)

    crosstab.__doc__ = DataFrame.crosstab.__doc__

    def freqItems(self, cols, support=None):
        return self.df.freqItems(cols, support)

    freqItems.__doc__ = DataFrame.freqItems.__doc__
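    # Illustrative only: these are reached through `df.stat`, e.g.
    #
    #     df3.stat.corr('age', 'height')   # same as df3.corr('age', 'height')
    #     df3.stat.crosstab('age', 'height')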


def _test():
    import doctest
    from pyspark.context import SparkContext
    from pyspark.sql import Row, SQLContext
    import pyspark.sql.dataframe
    globs = pyspark.sql.dataframe.__dict__.copy()
    sc = SparkContext('local[4]', 'PythonTest')
    globs['sc'] = sc
    globs['sqlContext'] = SQLContext(sc)
    globs['df'] = sc.parallelize([(2, 'Alice'), (5, 'Bob')])\
        .toDF(StructType([StructField('age', IntegerType()),
                          StructField('name', StringType())]))
    globs['df2'] = sc.parallelize([Row(name='Tom', height=80), Row(name='Bob', height=85)]).toDF()
    globs['df3'] = sc.parallelize([Row(name='Alice', age=2, height=80),
                                   Row(name='Bob', age=5, height=85)]).toDF()
    globs['df4'] = sc.parallelize([Row(name='Alice', age=10, height=80),
                                   Row(name='Bob', age=5, height=None),
                                   Row(name='Tom', age=None, height=None),
                                   Row(name=None, age=None, height=None)]).toDF()
    (failure_count, test_count) = doctest.testmod(
        pyspark.sql.dataframe, globs=globs,
        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
    globs['sc'].stop()
    if failure_count:
        exit(-1)


if __name__ == "__main__":
    _test()