13fd272cd3
### What changes were proposed in this pull request?
This PR intends to fix typos in the sub-modules:
* `R`
* `common`
* `dev`
* `mllib`
* `external`
* `project`
* `streaming`
* `resource-managers`
* `python`
Split per srowen https://github.com/apache/spark/pull/30323#issuecomment-728981618
NOTE: The misspellings have been reported at 706a726f87 (commitcomment-44064356)
### Why are the changes needed?
Misspelled words make content harder to read and understand.
### Does this PR introduce _any_ user-facing change?
There are various fixes to documentation, etc...
### How was this patch tested?
No testing was performed
Closes #30402 from jsoref/spelling-R_common_dev_mlib_external_project_streaming_resource-managers_python.
Authored-by: Josh Soref <jsoref@users.noreply.github.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
1178 lines
31 KiB
Plaintext
{
|
|
"cells": [
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"# Quickstart\n",
|
|
"\n",
|
|
"This is a short introduction and quickstart for the PySpark DataFrame API. PySpark DataFrames are lazily evaluated. They are implemented on top of [RDD](https://spark.apache.org/docs/latest/rdd-programming-guide.html#overview)s. When Spark [transforms](https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations) data, it does not immediately compute the transformation but plans how to compute later. When [actions](https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions) such as `collect()` are explicitly called, the computation starts.\n",
|
|
"This notebook shows the basic usages of the DataFrame, geared mainly for new users. You can run the latest version of these examples by yourself on a live notebook [here](https://mybinder.org/v2/gh/apache/spark/master?filepath=python%2Fdocs%2Fsource%2Fgetting_started%2Fquickstart.ipynb).\n",
|
|
"\n",
|
|
"There is also other useful information in Apache Spark documentation site, see the latest version of [Spark SQL and DataFrames](https://spark.apache.org/docs/latest/sql-programming-guide.html), [RDD Programming Guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html), [Structured Streaming Programming Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html), [Spark Streaming Programming Guide](https://spark.apache.org/docs/latest/streaming-programming-guide.html) and [Machine Learning Library (MLlib) Guide](https://spark.apache.org/docs/latest/ml-guide.html).\n",
|
|
"\n",
|
|
"PySpark applications start with initializing `SparkSession` which is the entry point of PySpark as below. In case of running it in PySpark shell via <code>pyspark</code> executable, the shell automatically creates the session in the variable <code>spark</code> for users."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 1,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"from pyspark.sql import SparkSession\n",
|
|
"\n",
|
|
"spark = SparkSession.builder.getOrCreate()"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"## DataFrame Creation\n",
|
|
"\n",
|
|
"A PySpark DataFrame can be created via `pyspark.sql.SparkSession.createDataFrame` typically by passing a list of lists, tuples, dictionaries and `pyspark.sql.Row`s, a [pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) and an RDD consisting of such a list.\n",
|
|
"`pyspark.sql.SparkSession.createDataFrame` takes the `schema` argument to specify the schema of the DataFrame. When it is omitted, PySpark infers the corresponding schema by taking a sample from the data.\n",
|
|
"\n",
|
|
"Firstly, you can create a PySpark DataFrame from a list of rows"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 2,
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"data": {
|
|
"text/plain": [
|
|
"DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]"
|
|
]
|
|
},
|
|
"execution_count": 2,
|
|
"metadata": {},
|
|
"output_type": "execute_result"
|
|
}
|
|
],
|
|
"source": [
|
|
"from datetime import datetime, date\n",
|
|
"import pandas as pd\n",
|
|
"from pyspark.sql import Row\n",
|
|
"\n",
|
|
"df = spark.createDataFrame([\n",
|
|
" Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),\n",
|
|
" Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),\n",
|
|
" Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))\n",
|
|
"])\n",
|
|
"df"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"Create a PySpark DataFrame with an explicit schema."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 3,
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"data": {
|
|
"text/plain": [
|
|
"DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]"
|
|
]
|
|
},
|
|
"execution_count": 3,
|
|
"metadata": {},
|
|
"output_type": "execute_result"
|
|
}
|
|
],
|
|
"source": [
|
|
"df = spark.createDataFrame([\n",
|
|
" (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),\n",
|
|
" (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),\n",
|
|
" (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))\n",
|
|
"], schema='a long, b double, c string, d date, e timestamp')\n",
|
|
"df"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"Create a PySpark DataFrame from a pandas DataFrame"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 4,
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"data": {
|
|
"text/plain": [
|
|
"DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]"
|
|
]
|
|
},
|
|
"execution_count": 4,
|
|
"metadata": {},
|
|
"output_type": "execute_result"
|
|
}
|
|
],
|
|
"source": [
|
|
"pandas_df = pd.DataFrame({\n",
|
|
" 'a': [1, 2, 3],\n",
|
|
" 'b': [2., 3., 4.],\n",
|
|
" 'c': ['string1', 'string2', 'string3'],\n",
|
|
" 'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],\n",
|
|
" 'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]\n",
|
|
"})\n",
|
|
"df = spark.createDataFrame(pandas_df)\n",
|
|
"df"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"Create a PySpark DataFrame from an RDD consisting of a list of tuples."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 5,
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"data": {
|
|
"text/plain": [
|
|
"DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]"
|
|
]
|
|
},
|
|
"execution_count": 5,
|
|
"metadata": {},
|
|
"output_type": "execute_result"
|
|
}
|
|
],
|
|
"source": [
|
|
"rdd = spark.sparkContext.parallelize([\n",
|
|
" (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),\n",
|
|
" (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),\n",
|
|
" (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))\n",
|
|
"])\n",
|
|
"df = spark.createDataFrame(rdd, schema=['a', 'b', 'c', 'd', 'e'])\n",
|
|
"df"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"The DataFrames created above all have the same results and schema."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 6,
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"+---+---+-------+----------+-------------------+\n",
|
|
"| a| b| c| d| e|\n",
|
|
"+---+---+-------+----------+-------------------+\n",
|
|
"| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|\n",
|
|
"| 2|3.0|string2|2000-02-01|2000-01-02 12:00:00|\n",
|
|
"| 3|4.0|string3|2000-03-01|2000-01-03 12:00:00|\n",
|
|
"+---+---+-------+----------+-------------------+\n",
|
|
"\n",
|
|
"root\n",
|
|
" |-- a: long (nullable = true)\n",
|
|
" |-- b: double (nullable = true)\n",
|
|
" |-- c: string (nullable = true)\n",
|
|
" |-- d: date (nullable = true)\n",
|
|
" |-- e: timestamp (nullable = true)\n",
|
|
"\n"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"# All DataFrames above result same.\n",
|
|
"df.show()\n",
|
|
"df.printSchema()"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"## Viewing Data\n",
|
|
"\n",
|
|
"The top rows of a DataFrame can be displayed using `DataFrame.show()`."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 7,
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"+---+---+-------+----------+-------------------+\n",
|
|
"| a| b| c| d| e|\n",
|
|
"+---+---+-------+----------+-------------------+\n",
|
|
"| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|\n",
|
|
"+---+---+-------+----------+-------------------+\n",
|
|
"only showing top 1 row\n",
|
|
"\n"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"df.show(1)"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"Alternatively, you can enable `spark.sql.repl.eagerEval.enabled` configuration for the eager evaluation of PySpark DataFrame in notebooks such as Jupyter. The number of rows to show can be controlled via `spark.sql.repl.eagerEval.maxNumRows` configuration."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 8,
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"data": {
|
|
"text/html": [
|
|
"<table border='1'>\n",
|
|
"<tr><th>a</th><th>b</th><th>c</th><th>d</th><th>e</th></tr>\n",
|
|
"<tr><td>1</td><td>2.0</td><td>string1</td><td>2000-01-01</td><td>2000-01-01 12:00:00</td></tr>\n",
|
|
"<tr><td>2</td><td>3.0</td><td>string2</td><td>2000-02-01</td><td>2000-01-02 12:00:00</td></tr>\n",
|
|
"<tr><td>3</td><td>4.0</td><td>string3</td><td>2000-03-01</td><td>2000-01-03 12:00:00</td></tr>\n",
|
|
"</table>\n"
|
|
],
|
|
"text/plain": [
|
|
"DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]"
|
|
]
|
|
},
|
|
"execution_count": 8,
|
|
"metadata": {},
|
|
"output_type": "execute_result"
|
|
}
|
|
],
|
|
"source": [
|
|
"spark.conf.set('spark.sql.repl.eagerEval.enabled', True)\n",
|
|
"df"
|
|
]
|
|
},
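{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a minimal illustrative sketch, the `spark.sql.repl.eagerEval.maxNumRows` configuration mentioned above caps how many rows eager evaluation renders; the value used here is arbitrary."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Illustrative only: render at most two rows when the DataFrame is eagerly evaluated.\n",
"spark.conf.set('spark.sql.repl.eagerEval.maxNumRows', 2)\n",
"df"
]
},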
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"The rows can also be shown vertically. This is useful when rows are too long to show horizontally."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 9,
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"-RECORD 0------------------\n",
|
|
" a | 1 \n",
|
|
" b | 2.0 \n",
|
|
" c | string1 \n",
|
|
" d | 2000-01-01 \n",
|
|
" e | 2000-01-01 12:00:00 \n",
|
|
"only showing top 1 row\n",
|
|
"\n"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"df.show(1, vertical=True)"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"You can see the DataFrame's schema and column names as follows:"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 10,
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"data": {
|
|
"text/plain": [
|
|
"['a', 'b', 'c', 'd', 'e']"
|
|
]
|
|
},
|
|
"execution_count": 10,
|
|
"metadata": {},
|
|
"output_type": "execute_result"
|
|
}
|
|
],
|
|
"source": [
|
|
"df.columns"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 11,
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"root\n",
|
|
" |-- a: long (nullable = true)\n",
|
|
" |-- b: double (nullable = true)\n",
|
|
" |-- c: string (nullable = true)\n",
|
|
" |-- d: date (nullable = true)\n",
|
|
" |-- e: timestamp (nullable = true)\n",
|
|
"\n"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"df.printSchema()"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"Show the summary of the DataFrame"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 12,
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"+-------+---+---+-------+\n",
|
|
"|summary| a| b| c|\n",
|
|
"+-------+---+---+-------+\n",
|
|
"| count| 3| 3| 3|\n",
|
|
"| mean|2.0|3.0| null|\n",
|
|
"| stddev|1.0|1.0| null|\n",
|
|
"| min| 1|2.0|string1|\n",
|
|
"| max| 3|4.0|string3|\n",
|
|
"+-------+---+---+-------+\n",
|
|
"\n"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"df.select(\"a\", \"b\", \"c\").describe().show()"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"`DataFrame.collect()` collects the distributed data to the driver side as the local data in Python. Note that this can throw an out-of-memory error when the dataset is too large to fit in the driver side because it collects all the data from executors to the driver side."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 13,
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"data": {
|
|
"text/plain": [
|
|
"[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0)),\n",
|
|
" Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1), e=datetime.datetime(2000, 1, 2, 12, 0)),\n",
|
|
" Row(a=3, b=4.0, c='string3', d=datetime.date(2000, 3, 1), e=datetime.datetime(2000, 1, 3, 12, 0))]"
|
|
]
|
|
},
|
|
"execution_count": 13,
|
|
"metadata": {},
|
|
"output_type": "execute_result"
|
|
}
|
|
],
|
|
"source": [
|
|
"df.collect()"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"In order to avoid throwing an out-of-memory exception, use `DataFrame.take()` or `DataFrame.tail()`."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 14,
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"data": {
|
|
"text/plain": [
|
|
"[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0))]"
|
|
]
|
|
},
|
|
"execution_count": 14,
|
|
"metadata": {},
|
|
"output_type": "execute_result"
|
|
}
|
|
],
|
|
"source": [
|
|
"df.take(1)"
|
|
]
|
|
},
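{
"cell_type": "markdown",
"metadata": {},
"source": [
"Similarly, a small sketch with `DataFrame.tail()` returns rows from the end of the DataFrame instead of the beginning."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Illustrative only: fetch the last row as local data on the driver.\n",
"df.tail(1)"
]
},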
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"PySpark DataFrame also provides the conversion back to a [pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) to leverage pandas APIs. Note that `toPandas` also collects all data into the driver side that can easily cause an out-of-memory-error when the data is too large to fit into the driver side."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 15,
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"data": {
|
|
"text/html": [
|
|
"<div>\n",
|
|
"<style scoped>\n",
|
|
" .dataframe tbody tr th:only-of-type {\n",
|
|
" vertical-align: middle;\n",
|
|
" }\n",
|
|
"\n",
|
|
" .dataframe tbody tr th {\n",
|
|
" vertical-align: top;\n",
|
|
" }\n",
|
|
"\n",
|
|
" .dataframe thead th {\n",
|
|
" text-align: right;\n",
|
|
" }\n",
|
|
"</style>\n",
|
|
"<table border=\"1\" class=\"dataframe\">\n",
|
|
" <thead>\n",
|
|
" <tr style=\"text-align: right;\">\n",
|
|
" <th></th>\n",
|
|
" <th>a</th>\n",
|
|
" <th>b</th>\n",
|
|
" <th>c</th>\n",
|
|
" <th>d</th>\n",
|
|
" <th>e</th>\n",
|
|
" </tr>\n",
|
|
" </thead>\n",
|
|
" <tbody>\n",
|
|
" <tr>\n",
|
|
" <th>0</th>\n",
|
|
" <td>1</td>\n",
|
|
" <td>2.0</td>\n",
|
|
" <td>string1</td>\n",
|
|
" <td>2000-01-01</td>\n",
|
|
" <td>2000-01-01 12:00:00</td>\n",
|
|
" </tr>\n",
|
|
" <tr>\n",
|
|
" <th>1</th>\n",
|
|
" <td>2</td>\n",
|
|
" <td>3.0</td>\n",
|
|
" <td>string2</td>\n",
|
|
" <td>2000-02-01</td>\n",
|
|
" <td>2000-01-02 12:00:00</td>\n",
|
|
" </tr>\n",
|
|
" <tr>\n",
|
|
" <th>2</th>\n",
|
|
" <td>3</td>\n",
|
|
" <td>4.0</td>\n",
|
|
" <td>string3</td>\n",
|
|
" <td>2000-03-01</td>\n",
|
|
" <td>2000-01-03 12:00:00</td>\n",
|
|
" </tr>\n",
|
|
" </tbody>\n",
|
|
"</table>\n",
|
|
"</div>"
|
|
],
|
|
"text/plain": [
|
|
" a b c d e\n",
|
|
"0 1 2.0 string1 2000-01-01 2000-01-01 12:00:00\n",
|
|
"1 2 3.0 string2 2000-02-01 2000-01-02 12:00:00\n",
|
|
"2 3 4.0 string3 2000-03-01 2000-01-03 12:00:00"
|
|
]
|
|
},
|
|
"execution_count": 15,
|
|
"metadata": {},
|
|
"output_type": "execute_result"
|
|
}
|
|
],
|
|
"source": [
|
|
"df.toPandas()"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"## Selecting and Accessing Data\n",
|
|
"\n",
|
|
"PySpark DataFrame is lazily evaluated and simply selecting a column does not trigger the computation but it returns a `Column` instance."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 16,
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"data": {
|
|
"text/plain": [
|
|
"Column<b'a'>"
|
|
]
|
|
},
|
|
"execution_count": 16,
|
|
"metadata": {},
|
|
"output_type": "execute_result"
|
|
}
|
|
],
|
|
"source": [
|
|
"df.a"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"In fact, most of column-wise operations return `Column`s."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 17,
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"data": {
|
|
"text/plain": [
|
|
"True"
|
|
]
|
|
},
|
|
"execution_count": 17,
|
|
"metadata": {},
|
|
"output_type": "execute_result"
|
|
}
|
|
],
|
|
"source": [
|
|
"from pyspark.sql import Column\n",
|
|
"from pyspark.sql.functions import upper\n",
|
|
"\n",
|
|
"type(df.c) == type(upper(df.c)) == type(df.c.isNull())"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"These `Column`s can be used to select the columns from a DataFrame. For example, `DataFrame.select()` takes the `Column` instances that returns another DataFrame."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 18,
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"+-------+\n",
|
|
"| c|\n",
|
|
"+-------+\n",
|
|
"|string1|\n",
|
|
"|string2|\n",
|
|
"|string3|\n",
|
|
"+-------+\n",
|
|
"\n"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"df.select(df.c).show()"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"Assign new `Column` instance."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 19,
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"+---+---+-------+----------+-------------------+-------+\n",
|
|
"| a| b| c| d| e|upper_c|\n",
|
|
"+---+---+-------+----------+-------------------+-------+\n",
|
|
"| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|STRING1|\n",
|
|
"| 2|3.0|string2|2000-02-01|2000-01-02 12:00:00|STRING2|\n",
|
|
"| 3|4.0|string3|2000-03-01|2000-01-03 12:00:00|STRING3|\n",
|
|
"+---+---+-------+----------+-------------------+-------+\n",
|
|
"\n"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"df.withColumn('upper_c', upper(df.c)).show()"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"To select a subset of rows, use `DataFrame.filter()`."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 20,
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"+---+---+-------+----------+-------------------+\n",
|
|
"| a| b| c| d| e|\n",
|
|
"+---+---+-------+----------+-------------------+\n",
|
|
"| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|\n",
|
|
"+---+---+-------+----------+-------------------+\n",
|
|
"\n"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"df.filter(df.a == 1).show()"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"## Applying a Function\n",
|
|
"\n",
|
|
"PySpark supports various UDFs and APIs to allow users to execute Python native functions. See also the latest [Pandas UDFs](https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html#pandas-udfs-aka-vectorized-udfs) and [Pandas Function APIs](https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html#pandas-function-apis). For instance, the example below allows users to directly use the APIs in [a pandas Series](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Series.html) within Python native function."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 21,
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"+------------------+\n",
|
|
"|pandas_plus_one(a)|\n",
|
|
"+------------------+\n",
|
|
"| 2|\n",
|
|
"| 3|\n",
|
|
"| 4|\n",
|
|
"+------------------+\n",
|
|
"\n"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"import pandas\n",
|
|
"from pyspark.sql.functions import pandas_udf\n",
|
|
"\n",
|
|
"@pandas_udf('long')\n",
|
|
"def pandas_plus_one(series: pd.Series) -> pd.Series:\n",
|
|
" # Simply plus one by using pandas Series.\n",
|
|
" return series + 1\n",
|
|
"\n",
|
|
"df.select(pandas_plus_one(df.a)).show()"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"Another example is `DataFrame.mapInPandas` which allows users directly use the APIs in a [pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) without any restrictions such as the result length."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 22,
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"+---+---+-------+----------+-------------------+\n",
|
|
"| a| b| c| d| e|\n",
|
|
"+---+---+-------+----------+-------------------+\n",
|
|
"| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|\n",
|
|
"+---+---+-------+----------+-------------------+\n",
|
|
"\n"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"def pandas_filter_func(iterator):\n",
|
|
" for pandas_df in iterator:\n",
|
|
" yield pandas_df[pandas_df.a == 1]\n",
|
|
"\n",
|
|
"df.mapInPandas(pandas_filter_func, schema=df.schema).show()"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"## Grouping Data\n",
|
|
"\n",
|
|
"PySpark DataFrame also provides a way of handling grouped data by using the common approach, split-apply-combine strategy.\n",
|
|
"It groups the data by a certain condition applies a function to each group and then combines them back to the DataFrame."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 23,
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"+-----+------+---+---+\n",
|
|
"|color| fruit| v1| v2|\n",
|
|
"+-----+------+---+---+\n",
|
|
"| red|banana| 1| 10|\n",
|
|
"| blue|banana| 2| 20|\n",
|
|
"| red|carrot| 3| 30|\n",
|
|
"| blue| grape| 4| 40|\n",
|
|
"| red|carrot| 5| 50|\n",
|
|
"|black|carrot| 6| 60|\n",
|
|
"| red|banana| 7| 70|\n",
|
|
"| red| grape| 8| 80|\n",
|
|
"+-----+------+---+---+\n",
|
|
"\n"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"df = spark.createDataFrame([\n",
|
|
" ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],\n",
|
|
" ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],\n",
|
|
" ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])\n",
|
|
"df.show()"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"Grouping and then applying the `avg()` function to the resulting groups."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 24,
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"+-----+-------+-------+\n",
|
|
"|color|avg(v1)|avg(v2)|\n",
|
|
"+-----+-------+-------+\n",
|
|
"| red| 4.8| 48.0|\n",
|
|
"|black| 6.0| 60.0|\n",
|
|
"| blue| 3.0| 30.0|\n",
|
|
"+-----+-------+-------+\n",
|
|
"\n"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"df.groupby('color').avg().show()"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"You can also apply a Python native function against each group by using pandas APIs."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 25,
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"+-----+------+---+---+\n",
|
|
"|color| fruit| v1| v2|\n",
|
|
"+-----+------+---+---+\n",
|
|
"| red|banana| -3| 10|\n",
|
|
"| red|carrot| -1| 30|\n",
|
|
"| red|carrot| 0| 50|\n",
|
|
"| red|banana| 2| 70|\n",
|
|
"| red| grape| 3| 80|\n",
|
|
"|black|carrot| 0| 60|\n",
|
|
"| blue|banana| -1| 20|\n",
|
|
"| blue| grape| 1| 40|\n",
|
|
"+-----+------+---+---+\n",
|
|
"\n"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"def plus_mean(pandas_df):\n",
|
|
" return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean())\n",
|
|
"\n",
|
|
"df.groupby('color').applyInPandas(plus_mean, schema=df.schema).show()"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"Co-grouping and applying a function."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 26,
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"+--------+---+---+---+\n",
|
|
"| time| id| v1| v2|\n",
|
|
"+--------+---+---+---+\n",
|
|
"|20000101| 1|1.0| x|\n",
|
|
"|20000102| 1|3.0| x|\n",
|
|
"|20000101| 2|2.0| y|\n",
|
|
"|20000102| 2|4.0| y|\n",
|
|
"+--------+---+---+---+\n",
|
|
"\n"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"df1 = spark.createDataFrame(\n",
|
|
" [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],\n",
|
|
" ('time', 'id', 'v1'))\n",
|
|
"\n",
|
|
"df2 = spark.createDataFrame(\n",
|
|
" [(20000101, 1, 'x'), (20000101, 2, 'y')],\n",
|
|
" ('time', 'id', 'v2'))\n",
|
|
"\n",
|
|
"def asof_join(l, r):\n",
|
|
" return pd.merge_asof(l, r, on='time', by='id')\n",
|
|
"\n",
|
|
"df1.groupby('id').cogroup(df2.groupby('id')).applyInPandas(\n",
|
|
" asof_join, schema='time int, id int, v1 double, v2 string').show()"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"## Getting Data in/out\n",
|
|
"\n",
|
|
"CSV is straightforward and easy to use. Parquet and ORC are efficient and compact file formats to read and write faster.\n",
|
|
"\n",
|
|
"There are many other data sources available in PySpark such as JDBC, text, binaryFile, Avro, etc. See also the latest [Spark SQL, DataFrames and Datasets Guide](https://spark.apache.org/docs/latest/sql-programming-guide.html) in Apache Spark documentation."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"### CSV"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 27,
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"+-----+------+---+---+\n",
|
|
"|color| fruit| v1| v2|\n",
|
|
"+-----+------+---+---+\n",
|
|
"| red|banana| 1| 10|\n",
|
|
"| blue|banana| 2| 20|\n",
|
|
"| red|carrot| 3| 30|\n",
|
|
"| blue| grape| 4| 40|\n",
|
|
"| red|carrot| 5| 50|\n",
|
|
"|black|carrot| 6| 60|\n",
|
|
"| red|banana| 7| 70|\n",
|
|
"| red| grape| 8| 80|\n",
|
|
"+-----+------+---+---+\n",
|
|
"\n"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"df.write.csv('foo.csv', header=True)\n",
|
|
"spark.read.csv('foo.csv', header=True).show()"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"### Parquet"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 28,
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"+-----+------+---+---+\n",
|
|
"|color| fruit| v1| v2|\n",
|
|
"+-----+------+---+---+\n",
|
|
"| red|banana| 1| 10|\n",
|
|
"| blue|banana| 2| 20|\n",
|
|
"| red|carrot| 3| 30|\n",
|
|
"| blue| grape| 4| 40|\n",
|
|
"| red|carrot| 5| 50|\n",
|
|
"|black|carrot| 6| 60|\n",
|
|
"| red|banana| 7| 70|\n",
|
|
"| red| grape| 8| 80|\n",
|
|
"+-----+------+---+---+\n",
|
|
"\n"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"df.write.parquet('bar.parquet')\n",
|
|
"spark.read.parquet('bar.parquet').show()"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"### ORC"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 29,
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"+-----+------+---+---+\n",
|
|
"|color| fruit| v1| v2|\n",
|
|
"+-----+------+---+---+\n",
|
|
"| red|banana| 1| 10|\n",
|
|
"| blue|banana| 2| 20|\n",
|
|
"| red|carrot| 3| 30|\n",
|
|
"| blue| grape| 4| 40|\n",
|
|
"| red|carrot| 5| 50|\n",
|
|
"|black|carrot| 6| 60|\n",
|
|
"| red|banana| 7| 70|\n",
|
|
"| red| grape| 8| 80|\n",
|
|
"+-----+------+---+---+\n",
|
|
"\n"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"df.write.orc('zoo.orc')\n",
|
|
"spark.read.orc('zoo.orc').show()"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"## Working with SQL\n",
|
|
"\n",
|
|
"DataFrame and Spark SQL share the same execution engine so they can be interchangeably used seamlessly. For example, you can register the DataFrame as a table and run a SQL easily as below:"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 30,
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"+--------+\n",
|
|
"|count(1)|\n",
|
|
"+--------+\n",
|
|
"| 8|\n",
|
|
"+--------+\n",
|
|
"\n"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"df.createOrReplaceTempView(\"tableA\")\n",
|
|
"spark.sql(\"SELECT count(*) from tableA\").show()"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"In addition, UDFs can be registered and invoked in SQL out of the box:"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 31,
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"+-----------+\n",
|
|
"|add_one(v1)|\n",
|
|
"+-----------+\n",
|
|
"| 2|\n",
|
|
"| 3|\n",
|
|
"| 4|\n",
|
|
"| 5|\n",
|
|
"| 6|\n",
|
|
"| 7|\n",
|
|
"| 8|\n",
|
|
"| 9|\n",
|
|
"+-----------+\n",
|
|
"\n"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"@pandas_udf(\"integer\")\n",
|
|
"def add_one(s: pd.Series) -> pd.Series:\n",
|
|
" return s + 1\n",
|
|
"\n",
|
|
"spark.udf.register(\"add_one\", add_one)\n",
|
|
"spark.sql(\"SELECT add_one(v1) FROM tableA\").show()"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"These SQL expressions can directly be mixed and used as PySpark columns."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 32,
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"+-----------+\n",
|
|
"|add_one(v1)|\n",
|
|
"+-----------+\n",
|
|
"| 2|\n",
|
|
"| 3|\n",
|
|
"| 4|\n",
|
|
"| 5|\n",
|
|
"| 6|\n",
|
|
"| 7|\n",
|
|
"| 8|\n",
|
|
"| 9|\n",
|
|
"+-----------+\n",
|
|
"\n",
|
|
"+--------------+\n",
|
|
"|(count(1) > 0)|\n",
|
|
"+--------------+\n",
|
|
"| true|\n",
|
|
"+--------------+\n",
|
|
"\n"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"from pyspark.sql.functions import expr\n",
|
|
"\n",
|
|
"df.selectExpr('add_one(v1)').show()\n",
|
|
"df.select(expr('count(*)') > 0).show()"
|
|
]
|
|
}
|
|
],
|
|
"metadata": {
|
|
"kernelspec": {
|
|
"display_name": "Python 3",
|
|
"language": "python",
|
|
"name": "python3"
|
|
},
|
|
"language_info": {
|
|
"codemirror_mode": {
|
|
"name": "ipython",
|
|
"version": 3
|
|
},
|
|
"file_extension": ".py",
|
|
"mimetype": "text/x-python",
|
|
"name": "python",
|
|
"nbconvert_exporter": "python",
|
|
"pygments_lexer": "ipython3",
|
|
"version": "3.7.8"
|
|
},
|
|
"name": "quickstart",
|
|
"notebookId": 1927513300154480
|
|
},
|
|
"nbformat": 4,
|
|
"nbformat_minor": 1
|
|
}
|