spark-instrumented-optimizer/python/pyspark/ml/image.py

199 lines
6.8 KiB
Python
Raw Normal View History

[SPARK-21866][ML][PYSPARK] Adding spark image reader ## What changes were proposed in this pull request? Adding spark image reader, an implementation of schema for representing images in spark DataFrames The code is taken from the spark package located here: (https://github.com/Microsoft/spark-images) Please see the JIRA for more information (https://issues.apache.org/jira/browse/SPARK-21866) Please see mailing list for SPIP vote and approval information: (http://apache-spark-developers-list.1001551.n3.nabble.com/VOTE-SPIP-SPARK-21866-Image-support-in-Apache-Spark-td22510.html) # Background and motivation As Apache Spark is being used more and more in the industry, some new use cases are emerging for different data formats beyond the traditional SQL types or the numerical types (vectors and matrices). Deep Learning applications commonly deal with image processing. A number of projects add some Deep Learning capabilities to Spark (see list below), but they struggle to communicate with each other or with MLlib pipelines because there is no standard way to represent an image in Spark DataFrames. We propose to federate efforts for representing images in Spark by defining a representation that caters to the most common needs of users and library developers. This SPIP proposes a specification to represent images in Spark DataFrames and Datasets (based on existing industrial standards), and an interface for loading sources of images. It is not meant to be a full-fledged image processing library, but rather the core description that other libraries and users can rely on. Several packages already offer various processing facilities for transforming images or doing more complex operations, and each has various design tradeoffs that make them better as standalone solutions. This project is a joint collaboration between Microsoft and Databricks, which have been testing this design in two open source packages: MMLSpark and Deep Learning Pipelines. The proposed image format is an in-memory, decompressed representation that targets low-level applications. It is significantly more liberal in memory usage than compressed image representations such as JPEG, PNG, etc., but it allows easy communication with popular image processing libraries and has no decoding overhead. ## How was this patch tested? Unit tests in scala ImageSchemaSuite, unit tests in python Author: Ilya Matiach <ilmat@microsoft.com> Author: hyukjinkwon <gurwls223@gmail.com> Closes #19439 from imatiach-msft/ilmat/spark-images.
2017-11-22 18:45:45 -05:00
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
.. attribute:: ImageSchema
An attribute of this module that contains the instance of :class:`_ImageSchema`.
.. autoclass:: _ImageSchema
:members:
"""
import numpy as np
from pyspark import SparkContext
from pyspark.sql.types import Row, _create_row, _parse_datatype_json_string
from pyspark.sql import DataFrame, SparkSession
class _ImageSchema(object):
"""
Internal class for `pyspark.ml.image.ImageSchema` attribute. Meant to be private and
not to be instantized. Use `pyspark.ml.image.ImageSchema` attribute to access the
APIs of this class.
"""
def __init__(self):
self._imageSchema = None
self._ocvTypes = None
self._imageFields = None
self._undefinedImageType = None
@property
def imageSchema(self):
"""
Returns the image schema.
:return: a :class:`StructType` with a single column of images
named "image" (nullable).
.. versionadded:: 2.3.0
"""
if self._imageSchema is None:
ctx = SparkContext._active_spark_context
jschema = ctx._jvm.org.apache.spark.ml.image.ImageSchema.imageSchema()
self._imageSchema = _parse_datatype_json_string(jschema.json())
return self._imageSchema
@property
def ocvTypes(self):
"""
Returns the OpenCV type mapping supported.
:return: a dictionary containing the OpenCV type mapping supported.
.. versionadded:: 2.3.0
"""
if self._ocvTypes is None:
ctx = SparkContext._active_spark_context
self._ocvTypes = dict(ctx._jvm.org.apache.spark.ml.image.ImageSchema.javaOcvTypes())
return self._ocvTypes
@property
def imageFields(self):
"""
Returns field names of image columns.
:return: a list of field names.
.. versionadded:: 2.3.0
"""
if self._imageFields is None:
ctx = SparkContext._active_spark_context
self._imageFields = list(ctx._jvm.org.apache.spark.ml.image.ImageSchema.imageFields())
return self._imageFields
@property
def undefinedImageType(self):
"""
Returns the name of undefined image type for the invalid image.
.. versionadded:: 2.3.0
"""
if self._undefinedImageType is None:
ctx = SparkContext._active_spark_context
self._undefinedImageType = \
ctx._jvm.org.apache.spark.ml.image.ImageSchema.undefinedImageType()
return self._undefinedImageType
def toNDArray(self, image):
"""
Converts an image to an array with metadata.
:param image: The image to be converted.
:return: a `numpy.ndarray` that is an image.
.. versionadded:: 2.3.0
"""
height = image.height
width = image.width
nChannels = image.nChannels
return np.ndarray(
shape=(height, width, nChannels),
dtype=np.uint8,
buffer=image.data,
strides=(width * nChannels, nChannels, 1))
def toImage(self, array, origin=""):
"""
Converts an array with metadata to a two-dimensional image.
:param array array: The array to convert to image.
:param str origin: Path to the image, optional.
:return: a :class:`Row` that is a two dimensional image.
.. versionadded:: 2.3.0
"""
if array.ndim != 3:
raise ValueError("Invalid array shape")
height, width, nChannels = array.shape
ocvTypes = ImageSchema.ocvTypes
if nChannels == 1:
mode = ocvTypes["CV_8UC1"]
elif nChannels == 3:
mode = ocvTypes["CV_8UC3"]
elif nChannels == 4:
mode = ocvTypes["CV_8UC4"]
else:
raise ValueError("Invalid number of channels")
data = bytearray(array.astype(dtype=np.uint8).ravel())
# Creating new Row with _create_row(), because Row(name = value, ... )
# orders fields by name, which conflicts with expected schema order
# when the new DataFrame is created by UDF
return _create_row(self.imageFields,
[origin, height, width, nChannels, mode, data])
def readImages(self, path, recursive=False, numPartitions=-1,
dropImageFailures=False, sampleRatio=1.0, seed=0):
"""
Reads the directory of images from the local or remote source.
.. note:: If multiple jobs are run in parallel with different sampleRatio or recursive flag,
there may be a race condition where one job overwrites the hadoop configs of another.
.. note:: If sample ratio is less than 1, sampling uses a PathFilter that is efficient but
potentially non-deterministic.
:param str path: Path to the image directory.
:param bool recursive: Recursive search flag.
:param int numPartitions: Number of DataFrame partitions.
:param bool dropImageFailures: Drop the files that are not valid images.
:param float sampleRatio: Fraction of the images loaded.
:param int seed: Random number seed.
:return: a :class:`DataFrame` with a single column of "images",
see ImageSchema for details.
>>> df = ImageSchema.readImages('python/test_support/image/kittens', recursive=True)
>>> df.count()
4
.. versionadded:: 2.3.0
"""
ctx = SparkContext._active_spark_context
spark = SparkSession(ctx)
image_schema = ctx._jvm.org.apache.spark.ml.image.ImageSchema
jsession = spark._jsparkSession
jresult = image_schema.readImages(path, jsession, recursive, numPartitions,
dropImageFailures, float(sampleRatio), seed)
return DataFrame(jresult, spark._wrapped)
ImageSchema = _ImageSchema()
# Monkey patch to disallow instantization of this class.
def _disallow_instance(_):
raise RuntimeError("Creating instance of _ImageSchema class is disallowed.")
_ImageSchema.__init__ = _disallow_instance