e44697606f
## What changes were proposed in this pull request? Integrate Apache Arrow with Spark to increase performance of `DataFrame.toPandas`. This has been done by using Arrow to convert data partitions on the executor JVM to Arrow payload byte arrays where they are then served to the Python process. The Python DataFrame can then collect the Arrow payloads where they are combined and converted to a Pandas DataFrame. All non-complex data types are currently supported, otherwise an `UnsupportedOperation` exception is thrown. Additions to Spark include a Scala package private method `Dataset.toArrowPayloadBytes` that will convert data partitions in the executor JVM to `ArrowPayload`s as byte arrays so they can be easily served. A package private class/object `ArrowConverters` that provide data type mappings and conversion routines. In Python, a public method `DataFrame.collectAsArrow` is added to collect Arrow payloads and an optional flag in `toPandas(useArrow=False)` to enable using Arrow (uses the old conversion by default). ## How was this patch tested? Added a new test suite `ArrowConvertersSuite` that will run tests on conversion of Datasets to Arrow payloads for supported types. The suite will generate a Dataset and matching Arrow JSON data, then the dataset is converted to an Arrow payload and finally validated against the JSON data. This will ensure that the schema and data has been converted correctly. Added PySpark tests to verify the `toPandas` method is producing equal DataFrames with and without pyarrow. A roundtrip test to ensure the pandas DataFrame produced by pyspark is equal to a one made directly with pandas. Author: Bryan Cutler <cutlerb@gmail.com> Author: Li Jin <ice.xelloss@gmail.com> Author: Li Jin <li.jin@twosigma.com> Author: Wes McKinney <wes.mckinney@twosigma.com> Closes #15821 from BryanCutler/wip-toPandas_with_arrow-SPARK-13534.
143 lines
4.9 KiB
Bash
Executable file
143 lines
4.9 KiB
Bash
Executable file
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Stop on error
set -e
# Set nullglob for when we are checking existence based on globs
# (so an unmatched glob like dist/*.tar.gz expands to nothing, not itself)
shopt -s nullglob

# Resolve the project root (parent of this script's directory) and run
# everything from there so the relative paths below are stable.
FWDIR="$(cd "$(dirname "$0")"/..; pwd)"
cd "$FWDIR"
|
|
|
|
echo "Constructing virtual env for testing"
# Base temp directory that will hold one virtualenv/conda env per Python exec.
VIRTUALENV_BASE=$(mktemp -d)

# Clean up the virtual env environment used if we created one.
function delete_virtualenv() {
  echo "Cleaning up temporary directory - $VIRTUALENV_BASE"
  rm -rf "$VIRTUALENV_BASE"
}
# Ensure cleanup runs on every exit path (success, failure, or set -e abort).
trap delete_virtualenv EXIT
|
|
|
|
# Figure out which Python interpreters (or, for conda, Python versions) to
# run the pip installability tests against. Exits 0 (skip, not fail) when the
# machine has neither virtualenv/conda nor pip.
PYTHON_EXECS=()
# Some systems don't have pip or virtualenv - in those cases our tests won't work.
if hash virtualenv 2>/dev/null && [ -z "$USE_CONDA" ]; then
  echo "virtualenv installed - using. Note if this is a conda virtual env you may wish to set USE_CONDA"
  # Figure out which Python execs we should test pip installation with
  if hash python2 2>/dev/null; then
    # We do this since we are testing with virtualenv and the default virtual env python
    # is in /usr/bin/python
    PYTHON_EXECS+=('python2')
  elif hash python 2>/dev/null; then
    # If python2 isn't installed fallback to python if available
    PYTHON_EXECS+=('python')
  fi
  if hash python3 2>/dev/null; then
    PYTHON_EXECS+=('python3')
  fi
elif hash conda 2>/dev/null; then
  echo "Using conda virtual environments"
  # In conda mode the entries are version strings fed to 'conda create python=X.Y'.
  PYTHON_EXECS=('3.5')
  USE_CONDA=1
else
  echo "Missing virtualenv & conda, skipping pip installability tests"
  exit 0
fi
if ! hash pip 2>/dev/null; then
  echo "Missing pip, skipping pip installability tests."
  exit 0
fi
|
|
|
|
# Determine which version of PySpark we are building for archive name
PYSPARK_VERSION=$(python3 -c "exec(open('python/pyspark/version.py').read());print(__version__)")
# Expected path of the sdist produced by 'python setup.py sdist' further below.
PYSPARK_DIST="$FWDIR/python/dist/pyspark-$PYSPARK_VERSION.tar.gz"
# The pip install options we use for all the pip commands
PIP_OPTIONS="--upgrade --no-cache-dir --force-reinstall "
# Test both regular user and edit/dev install modes.
# NOTE: these entries are executed unquoted later ($install_command) so they
# deliberately word-split into a command plus its arguments.
PIP_COMMANDS=("pip install $PIP_OPTIONS $PYSPARK_DIST"
"pip install $PIP_OPTIONS -e python/")
|
|
|
|
# For every Python exec and every install mode: build a fresh virtual env,
# pip-install the PySpark sdist (or dev install) into it, then run sanity
# checks and targeted tests against the installed package.
for python in "${PYTHON_EXECS[@]}"; do
  for install_command in "${PIP_COMMANDS[@]}"; do
    echo "Testing pip installation with python $python"
    # Create a temp directory for us to work in and save its name to a file for cleanup
    echo "Using $VIRTUALENV_BASE for virtualenv"
    VIRTUALENV_PATH="$VIRTUALENV_BASE"/$python
    rm -rf "$VIRTUALENV_PATH"
    if [ -n "$USE_CONDA" ]; then
      # $python is a version string here (e.g. '3.5'), not an interpreter name.
      conda create -y -p "$VIRTUALENV_PATH" python="$python" numpy pandas pip setuptools
      source activate "$VIRTUALENV_PATH"
      conda install -y -c conda-forge pyarrow=0.4.0
      TEST_PYARROW=1
    else
      mkdir -p "$VIRTUALENV_PATH"
      virtualenv --python="$python" "$VIRTUALENV_PATH"
      source "$VIRTUALENV_PATH"/bin/activate
    fi
    # Upgrade pip & friends if using virtual env.
    # BUG FIX: the original tested the literal string ([ ! -n "USE_CONDA" ]),
    # which is always non-empty, so this upgrade never ran. Test the variable.
    if [ -z "$USE_CONDA" ]; then
      pip install --upgrade pip pypandoc wheel numpy
    fi

    echo "Creating pip installable source dist"
    cd "$FWDIR"/python
    # Delete the egg info file if it exists, this can cache the setup file.
    rm -rf pyspark.egg-info || echo "No existing egg info file, skipping deletion"
    python setup.py sdist

    echo "Installing dist into virtual env"
    cd dist
    # Verify that the dist directory only contains one thing to install
    sdists=(*.tar.gz)
    if [ ${#sdists[@]} -ne 1 ]; then
      echo "Unexpected number of targets found in dist directory - please cleanup existing sdists first."
      # 'exit -1' is not a valid POSIX exit status; use 1.
      exit 1
    fi
    # Do the actual installation
    cd "$FWDIR"
    # Deliberately unquoted: each PIP_COMMANDS entry word-splits into a
    # command plus arguments.
    $install_command

    # Run checks from / so we exercise the *installed* package, not the
    # source tree in the current directory.
    cd /

    echo "Run basic sanity check on pip installed version with spark-submit"
    spark-submit "$FWDIR"/dev/pip-sanity-check.py
    echo "Run basic sanity check with import based"
    python "$FWDIR"/dev/pip-sanity-check.py
    echo "Run the tests for context.py"
    python "$FWDIR"/python/pyspark/context.py
    if [ -n "$TEST_PYARROW" ]; then
      echo "Run tests for pyarrow"
      SPARK_TESTING=1 "$FWDIR"/bin/pyspark pyspark.sql.tests ArrowTests
    fi

    cd "$FWDIR"

    # conda / virtualenv environments need to be deactivated differently
    if [ -n "$USE_CONDA" ]; then
      source deactivate
    else
      deactivate
    fi

  done
done

exit 0
|