99ea324b6f
Follow up of https://github.com/apache/spark/pull/24405

### What changes were proposed in this pull request?

The current implementation of _from_avro_ and _AvroDataToCatalyst_ doesn't allow schema evolution, since it requires deserializing an Avro record with the exact same schema with which it was serialized.

The proposed change adds a new option, `actualSchema`, for passing the schema that was used to serialize the records. This allows reading with a different, compatible schema by passing both schemas to _GenericDatumReader_. If no writer's schema is provided, nothing changes from before.

### Why are the changes needed?

Consider the following example.

```
// schema ID: 1
val schema1 = """
{
  "type": "record",
  "name": "MySchema",
  "fields": [
    {"name": "col1", "type": "int"},
    {"name": "col2", "type": "string"}
  ]
}
"""

// schema ID: 2
val schema2 = """
{
  "type": "record",
  "name": "MySchema",
  "fields": [
    {"name": "col1", "type": "int"},
    {"name": "col2", "type": "string"},
    {"name": "col3", "type": "string", "default": ""}
  ]
}
"""
```

The two schemas are compatible, i.e. you can use `schema2` to deserialize events serialized with `schema1`, in which case the field `col3` is filled with its default value. Now imagine you have two dataframes (read from batch or streaming), one with Avro events written with `schema1` and the other with events written with `schema2`. **We want to combine them into one dataframe** for storing or further processing.

With the current `from_avro` function we can only decode each of them with the corresponding schema:

```
scala> val df1 = ... // Avro events created with schema1
df1: org.apache.spark.sql.DataFrame = [eventBytes: binary]
scala> val decodedDf1 = df1.select(from_avro('eventBytes, schema1) as "decoded")
decodedDf1: org.apache.spark.sql.DataFrame = [decoded: struct<col1: int, col2: string>]

scala> val df2 = ... // Avro events created with schema2
df2: org.apache.spark.sql.DataFrame = [eventBytes: binary]
scala> val decodedDf2 = df2.select(from_avro('eventBytes, schema2) as "decoded")
decodedDf2: org.apache.spark.sql.DataFrame = [decoded: struct<col1: int, col2: string, col3: string>]
```

but then `decodedDf1` and `decodedDf2` have different Spark schemas and we can't union them. With the proposed change we can instead decode `df1` as follows:

```
scala> import scala.collection.JavaConverters._
scala> val decodedDf1 = df1.select(from_avro(data = 'eventBytes, jsonFormatSchema = schema2, options = Map("actualSchema" -> schema1).asJava) as "decoded")
decodedDf1: org.apache.spark.sql.DataFrame = [decoded: struct<col1: int, col2: string, col3: string>]
```

so that both dataframes have the same schema and can be merged.

### Does this PR introduce any user-facing change?

This PR allows users to pass a new option, but it doesn't affect existing code.

### How was this patch tested?

A new unit test was added.

Closes #26780 from Fokko/SPARK-27506.

Lead-authored-by: Fokko Driesprong <fokko@apache.org>
Co-authored-by: Gianluca Amori <gianluca.amori@gmail.com>
Signed-off-by: Gengliang Wang <gengliang.wang@databricks.com>
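As an end-to-end sketch (not part of the patch itself), assuming Spark 3.0 with the `spark-avro` module on the classpath and `org.apache.spark.sql.avro.functions.from_avro`, the two dataframes from the example above could be decoded and unioned roughly like this; `df1`, `df2`, `schema1`, and `schema2` are the values defined earlier:

```
import scala.collection.JavaConverters._
import org.apache.spark.sql.avro.functions.from_avro
import org.apache.spark.sql.functions.col

// Decode the schema1 events against the newer reader schema (schema2),
// declaring via the new option which schema the bytes were actually written with.
val decodedDf1 = df1.select(
  from_avro(col("eventBytes"), schema2, Map("actualSchema" -> schema1).asJava) as "decoded")

// The schema2 events are decoded as before, with the reader schema only.
val decodedDf2 = df2.select(from_avro(col("eventBytes"), schema2) as "decoded")

// Both sides now expose struct<col1: int, col2: string, col3: string>,
// so the union is well-typed.
val combined = decodedDf1.union(decodedDf2)
```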
# Apache Spark
Spark is a unified analytics engine for large-scale data processing. It provides high-level APIs in Scala, Java, Python, and R, and an optimized engine that supports general computation graphs for data analysis. It also supports a rich set of higher-level tools including Spark SQL for SQL and DataFrames, MLlib for machine learning, GraphX for graph processing, and Structured Streaming for stream processing.
## Online Documentation
You can find the latest Spark documentation, including a programming guide, on the project web page.
## Python Packaging
This README file only contains basic information related to pip installed PySpark. This packaging is currently experimental and may change in future versions (although we will do our best to keep compatibility). Using PySpark requires the Spark JARs, and if you are building this from source please see the builder instructions at "Building Spark".
The Python packaging for Spark is not intended to replace all of the other use cases. This Python packaged version of Spark is suitable for interacting with an existing cluster (be it Spark standalone, YARN, or Mesos) - but does not contain the tools required to set up your own standalone Spark cluster. You can download the full version of Spark from the Apache Spark downloads page.
NOTE: If you are using this with a Spark standalone cluster you must ensure that the version (including minor version) matches or you may experience odd errors.
## Python Requirements
At its core PySpark depends on Py4J (currently version 0.10.8.1), but some additional sub-packages have their own extra requirements for some features (including numpy, pandas, and pyarrow).
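For illustration (not part of the original README, and assuming the optional extras declared in PySpark's `setup.py`), the SQL-related extra dependencies such as pandas and pyarrow can be pulled in at install time:

```
pip install pyspark[sql]
```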