678294ddc2
### What changes were proposed in this pull request? This PR proposes to clarify: - `PYSPARK_DRIVER_PYTHON` should not be set for cluster modes in YARN and Kubernates. - `spark.yarn.appMasterEnv.PYSPARK_PYTHON` is not required in YARN. This is just another way to set `PYSPARK_PYTHON` that is specific for a Spark application. ### Why are the changes needed? To clarify what's required and not. ### Does this PR introduce _any_ user-facing change? Yes, this is a user-facing doc change. ### How was this patch tested? Manually tested. Note that this credits to gaborgsomogyi who actually tested and raised a doubt about this offline to me. I also manually tested all again to double check. Closes #30938 from HyukjinKwon/SPARK-33824-followup. Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org>
253 lines
9.3 KiB
ReStructuredText
253 lines
9.3 KiB
ReStructuredText
.. Licensed to the Apache Software Foundation (ASF) under one
|
||
or more contributor license agreements. See the NOTICE file
|
||
distributed with this work for additional information
|
||
regarding copyright ownership. The ASF licenses this file
|
||
to you under the Apache License, Version 2.0 (the
|
||
"License"); you may not use this file except in compliance
|
||
with the License. You may obtain a copy of the License at
|
||
|
||
.. http://www.apache.org/licenses/LICENSE-2.0
|
||
|
||
.. Unless required by applicable law or agreed to in writing,
|
||
software distributed under the License is distributed on an
|
||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||
KIND, either express or implied. See the License for the
|
||
specific language governing permissions and limitations
|
||
under the License.
|
||
|
||
|
||
=========================
|
||
Python Package Management
|
||
=========================
|
||
|
||
When you want to run your PySpark application on a cluster such as YARN, Kubernetes, Mesos, etc., you need to make
|
||
sure that your code and all used libraries are available on the executors.
|
||
|
||
As an example let's say you may want to run the `Pandas UDF's examples <arrow_pandas.rst#series-to-scalar>`_.
|
||
As it uses pyarrow as an underlying implementation we need to make sure to have pyarrow installed on each executor
|
||
on the cluster. Otherwise you may get errors such as ``ModuleNotFoundError: No module named 'pyarrow'``.
|
||
|
||
Here is the script ``app.py`` from the previous example that will be executed on the cluster:
|
||
|
||
.. code-block:: python
|
||
|
||
import pandas as pd
|
||
from pyspark.sql.functions import pandas_udf
|
||
from pyspark.sql import SparkSession
|
||
|
||
def main(spark):
|
||
df = spark.createDataFrame(
|
||
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
|
||
("id", "v"))
|
||
|
||
@pandas_udf("double")
|
||
def mean_udf(v: pd.Series) -> float:
|
||
return v.mean()
|
||
|
||
print(df.groupby("id").agg(mean_udf(df['v'])).collect())
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main(SparkSession.builder.getOrCreate())
|
||
|
||
|
||
There are multiple ways to manage Python dependencies in the cluster:
|
||
|
||
- Using PySpark Native Features
|
||
- Using Conda
|
||
- Using Virtualenv
|
||
- Using PEX
|
||
|
||
|
||
Using PySpark Native Features
|
||
-----------------------------
|
||
|
||
PySpark allows to upload Python files (``.py``), zipped Python packages (``.zip``), and Egg files (``.egg``)
|
||
to the executors by:
|
||
|
||
- Setting the configuration setting ``spark.submit.pyFiles``
|
||
- Setting ``--py-files`` option in Spark scripts
|
||
- Directly calling :meth:`pyspark.SparkContext.addPyFile` in applications
|
||
|
||
This is a straightforward method to ship additional custom Python code to the cluster. You can just add individual files or zip whole
|
||
packages and upload them. Using :meth:`pyspark.SparkContext.addPyFile` allows to upload code even after having started your job.
|
||
|
||
However, it does not allow to add packages built as `Wheels <https://www.python.org/dev/peps/pep-0427/>`_ and therefore
|
||
does not allow to include dependencies with native code.
|
||
|
||
|
||
Using Conda
|
||
-----------
|
||
|
||
`Conda <https://docs.conda.io/en/latest/>`_ is one of the most widely-used Python package management systems. PySpark users can directly
|
||
use a Conda environment to ship their third-party Python packages by leveraging
|
||
`conda-pack <https://conda.github.io/conda-pack/spark.html>`_ which is a command line tool creating
|
||
relocatable Conda environments.
|
||
|
||
The example below creates a Conda environment to use on both the driver and executor and packs
|
||
it into an archive file. This archive file captures the Conda environment for Python and stores
|
||
both Python interpreter and all its relevant dependencies.
|
||
|
||
.. code-block:: bash
|
||
|
||
conda create -y -n pyspark_conda_env -c conda-forge pyarrow pandas conda-pack
|
||
conda activate pyspark_conda_env
|
||
conda pack -f -o pyspark_conda_env.tar.gz
|
||
|
||
After that, you can ship it together with scripts or in the code by using the ``--archives`` option
|
||
or ``spark.archives`` configuration (``spark.yarn.dist.archives`` in YARN). It automatically unpacks the archive on executors.
|
||
|
||
In the case of a ``spark-submit`` script, you can use it as follows:
|
||
|
||
.. code-block:: bash
|
||
|
||
export PYSPARK_DRIVER_PYTHON=python # Do not set in cluster modes.
|
||
export PYSPARK_PYTHON=./environment/bin/python
|
||
spark-submit --archives pyspark_conda_env.tar.gz#environment app.py
|
||
|
||
Note that ``PYSPARK_DRIVER_PYTHON`` above should not be set for cluster modes in YARN or Kubernetes.
|
||
|
||
If you’re on a regular Python shell or notebook, you can try it as shown below:
|
||
|
||
.. code-block:: python
|
||
|
||
import os
|
||
from pyspark.sql import SparkSession
|
||
from app import main
|
||
|
||
os.environ['PYSPARK_PYTHON'] = "./environment/bin/python"
|
||
spark = SparkSession.builder.config(
|
||
"spark.archives", # 'spark.yarn.dist.archives' in YARN.
|
||
"pyspark_conda_env.tar.gz#environment").getOrCreate()
|
||
main(spark)
|
||
|
||
For a pyspark shell:
|
||
|
||
.. code-block:: bash
|
||
|
||
export PYSPARK_DRIVER_PYTHON=python
|
||
export PYSPARK_PYTHON=./environment/bin/python
|
||
pyspark --archives pyspark_conda_env.tar.gz#environment
|
||
|
||
|
||
Using Virtualenv
|
||
----------------
|
||
|
||
`Virtualenv <https://virtualenv.pypa.io/en/latest/>`_ is a Python tool to create isolated Python environments.
|
||
Since Python 3.3, a subset of its features has been integrated into Python as a standard library under
|
||
the `venv <https://docs.python.org/3/library/venv.html>`_ module. PySpark users can use virtualenv to manage
|
||
Python dependencies in their clusters by using `venv-pack <https://jcristharif.com/venv-pack/index.html>`_
|
||
in a similar way as conda-pack.
|
||
|
||
A virtual environment to use on both driver and executor can be created as demonstrated below.
|
||
It packs the current virtual environment to an archive file, and It self-contains both Python interpreter
|
||
and the dependencies.
|
||
|
||
|
||
.. code-block:: bash
|
||
|
||
python -m venv pyspark_venv
|
||
source pyspark_venv/bin/activate
|
||
pip install pyarrow pandas venv-pack
|
||
venv-pack -o pyspark_venv.tar.gz
|
||
|
||
You can directly pass/unpack the archive file and enable the environment on executors by leveraging
|
||
the ``--archives`` option or ``spark.archives`` configuration (``spark.yarn.dist.archives`` in YARN).
|
||
|
||
For ``spark-submit``, you can use it by running the command as follows. Also, notice that
|
||
``PYSPARK_DRIVER_PYTHON`` has to be unset in Kubernetes or YARN cluster modes.
|
||
|
||
.. code-block:: bash
|
||
|
||
export PYSPARK_DRIVER_PYTHON=python # Do not set in cluster modes.
|
||
export PYSPARK_PYTHON=./environment/bin/python
|
||
spark-submit --archives pyspark_venv.tar.gz#environment app.py
|
||
|
||
For regular Python shells or notebooks:
|
||
|
||
.. code-block:: bash
|
||
|
||
import os
|
||
from pyspark.sql import SparkSession
|
||
from app import main
|
||
|
||
os.environ['PYSPARK_PYTHON'] = "./environment/bin/python"
|
||
spark = SparkSession.builder.config(
|
||
"spark.archives", # 'spark.yarn.dist.archives' in YARN.
|
||
"pyspark_venv.tar.gz#environment").getOrCreate()
|
||
main(spark)
|
||
|
||
In the case of a pyspark shell:
|
||
|
||
.. code-block:: bash
|
||
|
||
export PYSPARK_DRIVER_PYTHON=python
|
||
export PYSPARK_PYTHON=./environment/bin/python
|
||
pyspark --archives pyspark_venv.tar.gz#environment
|
||
|
||
|
||
Using PEX
|
||
---------
|
||
|
||
PySpark can also use `PEX <https://github.com/pantsbuild/pex>`_ to ship the Python packages
|
||
together. PEX is a tool that creates a self-contained Python environment. This is similar
|
||
to Conda or virtualenv, but a ``.pex`` file is executable by itself.
|
||
|
||
The following example creates a ``.pex`` file for the driver and executor to use.
|
||
The file contains the Python dependencies specified with the ``pex`` command.
|
||
|
||
.. code-block:: bash
|
||
|
||
pip install pyarrow pandas pex
|
||
pex pyspark pyarrow pandas -o pyspark_pex_env.pex
|
||
|
||
This file behaves similarly with a regular Python interpreter.
|
||
|
||
.. code-block:: bash
|
||
|
||
./pyspark_pex_env.pex -c "import pandas; print(pandas.__version__)"
|
||
1.1.5
|
||
|
||
However, ``.pex`` file does not include a Python interpreter itself under the hood so all
|
||
nodes in a cluster should have the same Python interpreter installed.
|
||
|
||
In order to transfer and use the ``.pex`` file in a cluster, you should ship it via the
|
||
``spark.files`` configuration (``spark.yarn.dist.files`` in YARN) or ``--files`` option because they are regular files instead
|
||
of directories or archive files.
|
||
|
||
For application submission, you run the commands as shown below.
|
||
Note that ``PYSPARK_DRIVER_PYTHON`` should not be set for cluster modes in YARN or Kubernetes.
|
||
|
||
.. code-block:: bash
|
||
|
||
export PYSPARK_DRIVER_PYTHON=python # Do not set in cluster modes.
|
||
export PYSPARK_PYTHON=./pyspark_pex_env.pex
|
||
spark-submit --files pyspark_pex_env.pex app.py
|
||
|
||
For regular Python shells or notebooks:
|
||
|
||
.. code-block:: python
|
||
|
||
import os
|
||
from pyspark.sql import SparkSession
|
||
from app import main
|
||
|
||
os.environ['PYSPARK_PYTHON'] = "./pyspark_pex_env.pex"
|
||
spark = SparkSession.builder.config(
|
||
"spark.files", # 'spark.yarn.dist.files' in YARN.
|
||
"pyspark_pex_env.pex").getOrCreate()
|
||
main(spark)
|
||
|
||
For the interactive pyspark shell, the commands are almost the same:
|
||
|
||
.. code-block:: bash
|
||
|
||
export PYSPARK_DRIVER_PYTHON=python
|
||
export PYSPARK_PYTHON=./pyspark_pex_env.pex
|
||
pyspark --files pyspark_pex_env.pex
|
||
|
||
An end-to-end Docker example for deploying a standalone PySpark with ``SparkSession.builder`` and PEX
|
||
can be found `here <https://github.com/criteo/cluster-pack/blob/master/examples/spark-with-S3/README.md>`_
|
||
- it uses cluster-pack, a library on top of PEX that automatizes the the intermediate step of having
|
||
to create & upload the PEX manually.
|