diff --git a/docs/sql-data-sources-avro.md b/docs/sql-data-sources-avro.md index 53be8709e9..dc534abce2 100644 --- a/docs/sql-data-sources-avro.md +++ b/docs/sql-data-sources-avro.md @@ -198,9 +198,22 @@ Data source options of Avro can be set via: avroSchema None - Optional Avro schema provided by a user in JSON format. The data type and naming of record fields - should match the Avro data type when reading from Avro or match the Spark's internal data type (e.g., StringType, IntegerType) when writing to Avro files; otherwise, the read/write action will fail. - read and write + Optional schema provided by a user in JSON format. + + + read, write and function from_avro recordName @@ -240,15 +253,6 @@ Data source options of Avro can be set via: function from_avro - - actualSchema - None - Optional Avro schema (in JSON format) that was used to serialize the data. This should be set if the schema provided - for deserialization is compatible with - but not the same as - the one used to originally convert the data to Avro. - For more information on Avro's schema evolution and compatibility, please refer to the [documentation of Confluent](https://docs.confluent.io/current/schema-registry/avro.html). 
- - function from_avro - ## Configuration diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala index 8570828aa1..79c72057c5 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala @@ -39,7 +39,7 @@ case class AvroDataToCatalyst( override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType) override lazy val dataType: DataType = { - val dt = SchemaConverters.toSqlType(avroSchema).dataType + val dt = SchemaConverters.toSqlType(expectedSchema).dataType parseMode match { // With PermissiveMode, the output Catalyst row might contain columns of null values for // corrupt records, even if some of the columns are not nullable in the user-provided schema. @@ -53,14 +53,15 @@ case class AvroDataToCatalyst( private lazy val avroOptions = AvroOptions(options) - @transient private lazy val avroSchema = new Schema.Parser().parse(jsonFormatSchema) + @transient private lazy val actualSchema = new Schema.Parser().parse(jsonFormatSchema) - @transient private lazy val reader = avroOptions.actualSchema - .map(actualSchema => - new GenericDatumReader[Any](new Schema.Parser().parse(actualSchema), avroSchema)) - .getOrElse(new GenericDatumReader[Any](avroSchema)) + @transient private lazy val expectedSchema = avroOptions.schema + .map(expectedSchema => new Schema.Parser().parse(expectedSchema)) + .getOrElse(actualSchema) - @transient private lazy val deserializer = new AvroDeserializer(avroSchema, dataType) + @transient private lazy val reader = new GenericDatumReader[Any](actualSchema, expectedSchema) + + @transient private lazy val deserializer = new AvroDeserializer(expectedSchema, dataType) @transient private var decoder: BinaryDecoder = _ diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala 
b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala index b133a84f18..f3ea78583f 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala @@ -36,18 +36,19 @@ class AvroOptions( } /** - * Optional schema provided by an user in JSON format. + * Optional schema provided by a user in JSON format. + * + * When reading Avro, this option can be set to an evolved schema, which is compatible but + * different from the actual Avro schema. The deserialization schema will be consistent with + * the evolved schema. For example, if we set an evolved schema containing one additional + * column with a default value, the reading result in Spark will contain the new column too. + * + * When writing Avro, this option can be set if the expected output Avro schema doesn't match the + * schema converted by Spark. For example, the expected schema of one column is of "enum" type, + * instead of "string" type in the default converted schema. */ val schema: Option[String] = parameters.get("avroSchema") - /** - * Optional Avro schema (in JSON format) that was used to serialize the data. - * This should be set if the schema provided for deserialization is compatible - * with - but not the same as - the one used to originally convert the data to Avro. - * See SPARK-27506 for more details. - */ - val actualSchema: Option[String] = parameters.get("actualSchema") - /** * Top level record name in write result, which is required in Avro spec. * See https://avro.apache.org/docs/1.8.2/spec.html#schema_record . 
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/functions.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/functions.scala index 03fcd20c52..74bfaaed9d 100755 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/functions.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/functions.scala @@ -45,10 +45,11 @@ object functions { } /** - * Converts a binary column of Avro format into its corresponding catalyst value. If a schema is - * provided via the option actualSchema, a different (but compatible) schema can be used for - * reading. If no actualSchema option is provided, the specified schema must match the read data, - * otherwise the behavior is undefined: it may fail or return arbitrary result. + * Converts a binary column of Avro format into its corresponding catalyst value. + * The specified schema must match the actual schema of the read data, otherwise the behavior + * is undefined: it may fail or return an arbitrary result. + * To deserialize the data with a compatible and evolved schema, the expected Avro schema can be + * set via the option avroSchema. * * @param data the binary column. * @param jsonFormatSchema the avro schema in JSON string format. 
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala index d6c22c00af..7f14efe15a 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala @@ -197,8 +197,8 @@ class AvroFunctionsSuite extends QueryTest with SharedSparkSession { avroStructDF.select( functions.from_avro( 'avro, - evolvedAvroSchema, - Map("actualSchema" -> actualAvroSchema).asJava)), + actualAvroSchema, + Map("avroSchema" -> evolvedAvroSchema).asJava)), expected) } } diff --git a/python/pyspark/sql/avro/functions.py b/python/pyspark/sql/avro/functions.py index 3ecdb877bd..ed62a72d6c 100644 --- a/python/pyspark/sql/avro/functions.py +++ b/python/pyspark/sql/avro/functions.py @@ -30,10 +30,11 @@ from pyspark.util import _print_missing_jar @since(3.0) def from_avro(data, jsonFormatSchema, options={}): """ - Converts a binary column of Avro format into its corresponding catalyst value. If a schema is - provided via the option actualSchema, a different (but compatible) schema can be used for - reading. If no actualSchema option is provided, the specified schema must match the read data, - otherwise the behavior is undefined: it may fail or return arbitrary result. + Converts a binary column of Avro format into its corresponding catalyst value. + The specified schema must match the read data, otherwise the behavior is undefined: + it may fail or return an arbitrary result. + To deserialize the data with a compatible and evolved schema, the expected Avro schema can be + set via the option avroSchema. Note: Avro is built-in but external data source module since Spark 2.4. Please deploy the application as per the deployment section of "Apache Avro Data Source Guide".