a71dd6af2f
### What changes were proposed in this pull request? This PR proposes to use Python 3.9 in documentation and linter at GitHub Actions. This PR also contains the fixes for mypy check (introduced by Python 3.9 upgrade) ``` python/pyspark/sql/pandas/_typing/protocols/frame.pyi:64: error: Name "np.ndarray" is not defined python/pyspark/sql/pandas/_typing/protocols/frame.pyi:91: error: Name "np.recarray" is not defined python/pyspark/sql/pandas/_typing/protocols/frame.pyi:165: error: Name "np.ndarray" is not defined python/pyspark/pandas/categorical.py:82: error: Item "dtype[Any]" of "Union[dtype[Any], Any]" has no attribute "categories" python/pyspark/pandas/categorical.py:109: error: Item "dtype[Any]" of "Union[dtype[Any], Any]" has no attribute "ordered" python/pyspark/ml/linalg/__init__.pyi:184: error: Return type "ndarray[Any, Any]" of "toArray" incompatible with return type "NoReturn" in supertype "Matrix" python/pyspark/ml/linalg/__init__.pyi:217: error: Return type "ndarray[Any, Any]" of "toArray" incompatible with return type "NoReturn" in supertype "Matrix" python/pyspark/pandas/typedef/typehints.py:163: error: Module has no attribute "bool"; maybe "bool_" or "bool8"? python/pyspark/pandas/typedef/typehints.py:174: error: Module has no attribute "float"; maybe "float_", "cfloat", or "float96"? python/pyspark/pandas/typedef/typehints.py:180: error: Module has no attribute "int"; maybe "uint", "rint", or "intp"? python/pyspark/pandas/ml.py:81: error: Value of type variable "_DTypeScalar_co" of "dtype" cannot be "object" python/pyspark/pandas/indexing.py:1649: error: Module has no attribute "int"; maybe "uint", "rint", or "intp"? python/pyspark/pandas/indexing.py:1656: error: Module has no attribute "int"; maybe "uint", "rint", or "intp"? python/pyspark/pandas/frame.py:4969: error: Function "numpy.array" is not valid as a type python/pyspark/pandas/frame.py:4969: note: Perhaps you need "Callable[...]" or a callback protocol? python/pyspark/pandas/frame.py:4970: error: Function "numpy.array" is not valid as a type python/pyspark/pandas/frame.py:4970: note: Perhaps you need "Callable[...]" or a callback protocol? python/pyspark/pandas/frame.py:7402: error: "List[Any]" has no attribute "tolist" python/pyspark/pandas/series.py:1030: error: Module has no attribute "_NoValue" python/pyspark/pandas/series.py:1031: error: Module has no attribute "_NoValue" python/pyspark/pandas/indexes/category.py:159: error: Item "dtype[Any]" of "Union[dtype[Any], Any]" has no attribute "categories" python/pyspark/pandas/indexes/category.py:180: error: Item "dtype[Any]" of "Union[dtype[Any], Any]" has no attribute "ordered" python/pyspark/pandas/namespace.py:2036: error: Argument 1 to "column_name" has incompatible type "float"; expected "str" python/pyspark/pandas/mlflow.py:59: error: Incompatible types in assignment (expression has type "Type[floating[Any]]", variable has type "str") python/pyspark/pandas/data_type_ops/categorical_ops.py:43: error: Item "dtype[Any]" of "Union[dtype[Any], Any]" has no attribute "categories" python/pyspark/pandas/data_type_ops/categorical_ops.py:43: error: Item "dtype[Any]" of "Union[dtype[Any], Any]" has no attribute "ordered" python/pyspark/pandas/data_type_ops/categorical_ops.py:56: error: Item "dtype[Any]" of "Union[dtype[Any], Any]" has no attribute "categories" python/pyspark/pandas/tests/test_typedef.py:70: error: Name "np.float" is not defined python/pyspark/pandas/tests/test_typedef.py:77: error: Name "np.float" is not defined python/pyspark/pandas/tests/test_typedef.py:85: error: Name "np.float" is not defined python/pyspark/pandas/tests/test_typedef.py💯 error: Name "np.float" is not defined python/pyspark/pandas/tests/test_typedef.py:108: error: Name "np.float" is not defined python/pyspark/mllib/clustering.pyi:152: error: Incompatible types in assignment (expression has type "ndarray[Any, Any]", base class "KMeansModel" defined the type as "List[ndarray[Any, Any]]") python/pyspark/mllib/classification.pyi:93: error: Signature of "predict" incompatible with supertype "LinearClassificationModel" Found 32 errors in 15 files (checked 315 source files) 1 ``` ### Why are the changes needed? Python 3.6 is deprecated at SPARK-35938 ### Does this PR introduce _any_ user-facing change? No. Maybe static analysis, etc. by some type hints but they are really non-breaking.. ### How was this patch tested? I manually checked by GitHub Actions build in forked repository. Closes #33356 from HyukjinKwon/SPARK-36146. Authored-by: Hyukjin Kwon <gurwls223@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
239 lines
8.4 KiB
Python
239 lines
8.4 KiB
Python
#
|
|
# Licensed to the Apache Software Foundation (ASF) under one or more
|
|
# contributor license agreements. See the NOTICE file distributed with
|
|
# this work for additional information regarding copyright ownership.
|
|
# The ASF licenses this file to You under the Apache License, Version 2.0
|
|
# (the "License"); you may not use this file except in compliance with
|
|
# the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
#
|
|
|
|
"""
|
|
MLflow-related functions to load models and apply them to pandas-on-Spark dataframes.
|
|
"""
|
|
from typing import List, Union # noqa: F401 (SPARK-34943)
|
|
|
|
from pyspark.sql.types import DataType
|
|
import pandas as pd
|
|
import numpy as np
|
|
from typing import Any
|
|
|
|
from pyspark.pandas._typing import Label, Dtype # noqa: F401 (SPARK-34943)
|
|
from pyspark.pandas.utils import lazy_property, default_session
|
|
from pyspark.pandas.frame import DataFrame
|
|
from pyspark.pandas.series import Series, first_series
|
|
from pyspark.pandas.typedef import as_spark_type
|
|
|
|
__all__ = ["PythonModelWrapper", "load_model"]
|
|
|
|
|
|
class PythonModelWrapper(object):
|
|
"""
|
|
A wrapper around MLflow's Python object model.
|
|
|
|
This wrapper acts as a predictor on pandas-on-Spark
|
|
|
|
"""
|
|
|
|
def __init__(self, model_uri: str, return_type_hint: Union[str, type, Dtype]):
|
|
self._model_uri = model_uri
|
|
self._return_type_hint = return_type_hint
|
|
|
|
@lazy_property
|
|
def _return_type(self) -> DataType:
|
|
hint = self._return_type_hint
|
|
# The logic is simple for now, because it corresponds to the default
|
|
# case: continuous predictions
|
|
# TODO: do something smarter, for example when there is a sklearn.Classifier (it should
|
|
# return an integer or a categorical)
|
|
# We can do the same for pytorch/tensorflow/keras models by looking at the output types.
|
|
# However, this is probably better done in mlflow than here.
|
|
if hint == "infer" or not hint:
|
|
hint = np.float64
|
|
return as_spark_type(hint)
|
|
|
|
@lazy_property
|
|
def _model(self) -> Any:
|
|
"""
|
|
The return object has to follow the API of mlflow.pyfunc.PythonModel.
|
|
"""
|
|
from mlflow import pyfunc
|
|
|
|
return pyfunc.load_model(model_uri=self._model_uri)
|
|
|
|
@lazy_property
|
|
def _model_udf(self) -> Any:
|
|
from mlflow import pyfunc
|
|
|
|
spark = default_session()
|
|
return pyfunc.spark_udf(spark, model_uri=self._model_uri, result_type=self._return_type)
|
|
|
|
def __str__(self) -> str:
|
|
return "PythonModelWrapper({})".format(str(self._model))
|
|
|
|
def __repr__(self) -> str:
|
|
return "PythonModelWrapper({})".format(repr(self._model))
|
|
|
|
def predict(self, data: Union[DataFrame, pd.DataFrame]) -> Union[Series, pd.Series]:
|
|
"""
|
|
Returns a prediction on the data.
|
|
|
|
If the data is a pandas-on-Spark DataFrame, the return is a pandas-on-Spark Series.
|
|
|
|
If the data is a pandas Dataframe, the return is the expected output of the underlying
|
|
pyfunc object (typically a pandas Series or a numpy array).
|
|
"""
|
|
if isinstance(data, pd.DataFrame):
|
|
return self._model.predict(data)
|
|
elif isinstance(data, DataFrame):
|
|
return_col = self._model_udf(*data._internal.data_spark_columns)
|
|
# TODO: the columns should be named according to the mlflow spec
|
|
# However, this is only possible with spark >= 3.0
|
|
# s = F.struct(*data.columns)
|
|
# return_col = self._model_udf(s)
|
|
column_labels = [
|
|
(col,) for col in data._internal.spark_frame.select(return_col).columns
|
|
] # type: List[Label]
|
|
internal = data._internal.copy(
|
|
column_labels=column_labels, data_spark_columns=[return_col], data_fields=None
|
|
)
|
|
return first_series(DataFrame(internal))
|
|
else:
|
|
raise ValueError("unknown data type: {}".format(type(data).__name__))
|
|
|
|
|
|
def load_model(
|
|
model_uri: str, predict_type: Union[str, type, Dtype] = "infer"
|
|
) -> PythonModelWrapper:
|
|
"""
|
|
Loads an MLflow model into an wrapper that can be used both for pandas and pandas-on-Spark
|
|
DataFrame.
|
|
|
|
Parameters
|
|
----------
|
|
model_uri : str
|
|
URI pointing to the model. See MLflow documentation for more details.
|
|
predict_type : a python basic type, a numpy basic type, a Spark type or 'infer'.
|
|
This is the return type that is expected when calling the predict function of the model.
|
|
If 'infer' is specified, the wrapper will attempt to determine automatically the return type
|
|
based on the model type.
|
|
|
|
Returns
|
|
-------
|
|
PythonModelWrapper
|
|
A wrapper around MLflow PythonModel objects. This wrapper is expected to adhere to the
|
|
interface of mlflow.pyfunc.PythonModel.
|
|
|
|
Examples
|
|
--------
|
|
Here is a full example that creates a model with scikit-learn and saves the model with
|
|
MLflow. The model is then loaded as a predictor that can be applied on a pandas-on-Spark
|
|
Dataframe.
|
|
|
|
We first initialize our MLflow environment:
|
|
|
|
>>> from mlflow.tracking import MlflowClient, set_tracking_uri
|
|
>>> import mlflow.sklearn
|
|
>>> from tempfile import mkdtemp
|
|
>>> d = mkdtemp("pandas_on_spark_mlflow")
|
|
>>> set_tracking_uri("file:%s"%d)
|
|
>>> client = MlflowClient()
|
|
>>> exp = mlflow.create_experiment("my_experiment")
|
|
>>> mlflow.set_experiment("my_experiment")
|
|
|
|
We aim at learning this numerical function using a simple linear regressor.
|
|
|
|
>>> from sklearn.linear_model import LinearRegression
|
|
>>> train = pd.DataFrame({"x1": np.arange(8), "x2": np.arange(8)**2,
|
|
... "y": np.log(2 + np.arange(8))})
|
|
>>> train_x = train[["x1", "x2"]]
|
|
>>> train_y = train[["y"]]
|
|
>>> with mlflow.start_run():
|
|
... lr = LinearRegression()
|
|
... lr.fit(train_x, train_y)
|
|
... mlflow.sklearn.log_model(lr, "model")
|
|
LinearRegression(...)
|
|
|
|
Now that our model is logged using MLflow, we load it back and apply it on a pandas-on-Spark
|
|
dataframe:
|
|
|
|
>>> from pyspark.pandas.mlflow import load_model
|
|
>>> run_info = client.list_run_infos(exp)[-1]
|
|
>>> model = load_model("runs:/{run_id}/model".format(run_id=run_info.run_uuid))
|
|
>>> prediction_df = ps.DataFrame({"x1": [2.0], "x2": [4.0]})
|
|
>>> prediction_df["prediction"] = model.predict(prediction_df)
|
|
>>> prediction_df
|
|
x1 x2 prediction
|
|
0 2.0 4.0 1.355551
|
|
|
|
The model also works on pandas DataFrames as expected:
|
|
|
|
>>> model.predict(prediction_df[["x1", "x2"]].to_pandas())
|
|
array([[1.35555142]])
|
|
|
|
Notes
|
|
-----
|
|
Currently, the model prediction can only be merged back with the existing dataframe.
|
|
Other columns have to be manually joined.
|
|
For example, this code will not work:
|
|
|
|
>>> df = ps.DataFrame({"x1": [2.0], "x2": [3.0], "z": [-1]})
|
|
>>> features = df[["x1", "x2"]]
|
|
>>> y = model.predict(features)
|
|
>>> # Works:
|
|
>>> features["y"] = y # doctest: +SKIP
|
|
>>> # Will fail with a message about dataframes not aligned.
|
|
>>> df["y"] = y # doctest: +SKIP
|
|
|
|
A current workaround is to use the .merge() function, using the feature values
|
|
as merging keys.
|
|
|
|
>>> features['y'] = y
|
|
>>> everything = df.merge(features, on=['x1', 'x2'])
|
|
>>> everything
|
|
x1 x2 z y
|
|
0 2.0 3.0 -1 1.376932
|
|
"""
|
|
return PythonModelWrapper(model_uri, predict_type)
|
|
|
|
|
|
def _test() -> None:
|
|
import os
|
|
import doctest
|
|
import sys
|
|
from pyspark.sql import SparkSession
|
|
import pyspark.pandas.mlflow
|
|
|
|
os.chdir(os.environ["SPARK_HOME"])
|
|
|
|
globs = pyspark.pandas.mlflow.__dict__.copy()
|
|
globs["ps"] = pyspark.pandas
|
|
spark = (
|
|
SparkSession.builder.master("local[4]").appName("pyspark.pandas.mlflow tests").getOrCreate()
|
|
)
|
|
(failure_count, test_count) = doctest.testmod(
|
|
pyspark.pandas.mlflow,
|
|
globs=globs,
|
|
optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE,
|
|
)
|
|
spark.stop()
|
|
if failure_count:
|
|
sys.exit(-1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
try:
|
|
import mlflow # noqa: F401
|
|
import sklearn # noqa: F401
|
|
|
|
_test()
|
|
except ImportError:
|
|
pass
|