[SPARK-27506][SQL][FOLLOWUP] Use option avroSchema to specify an evolved schema in from_avro

### What changes were proposed in this pull request?

This is a follow-up of https://github.com/apache/spark/pull/26780.
That PR introduced a new Avro data source option, `actualSchema`, for setting the original Avro schema in the function `from_avro`, while the expected schema is supposed to be set via the parameter `jsonFormatSchema` of `from_avro`.

However, there is already another Avro data source option, `avroSchema`, which is used for setting the expected schema in reading and writing.

This PR switches to the existing `avroSchema` option for reading Avro data with an evolved schema and removes the newly added `actualSchema` option.

### Why are the changes needed?

Unify and simplify the Avro data source options.

### Does this PR introduce any user-facing change?

Yes.
To deserialize Avro data with an evolved schema, before the changes:
```
from_avro('col, expectedSchema, Map("actualSchema" -> actualSchema).asJava)
```

After the changes:
```
from_avro('col, actualSchema, Map("avroSchema" -> expectedSchema).asJava)
```

The second parameter of `from_avro` is always the actual Avro schema after the changes.
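
For illustration, a minimal end-to-end sketch of the new calling convention (the `User` schemas, the `df` DataFrame, and its binary `value` column are hypothetical):

```scala
import scala.collection.JavaConverters._
import org.apache.spark.sql.avro.functions.from_avro

// Writer (actual) schema the binary data was serialized with.
val actualSchema =
  """{"type": "record", "name": "User", "fields": [
    |  {"name": "name", "type": "string"}
    |]}""".stripMargin

// Evolved (expected) schema: one additional column with a default value.
val evolvedSchema =
  """{"type": "record", "name": "User", "fields": [
    |  {"name": "name", "type": "string"},
    |  {"name": "age", "type": "int", "default": -1}
    |]}""".stripMargin

// The actual schema is the second argument; the evolved schema goes in the options.
val parsed = df.select(
  from_avro(df("value"), actualSchema, Map("avroSchema" -> evolvedSchema).asJava).as("user"))
```

The resulting `user` struct contains the `age` column, filled with the default `-1` for records written before the schema evolved.
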
### How was this patch tested?

Update the existing tests in https://github.com/apache/spark/pull/26780

Closes #27045 from gengliangwang/renameAvroOption.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
Gengliang Wang authored on 2019-12-30 18:14:21 +09:00; committed by HyukjinKwon
commit 07593d362f (parent a90ad5bf2a)
6 changed files with 46 additions and 38 deletions


```diff
@@ -198,9 +198,22 @@ Data source options of Avro can be set via:
   <tr>
     <td><code>avroSchema</code></td>
     <td>None</td>
-    <td>Optional Avro schema provided by a user in JSON format. The data type and naming of record fields
-    should match the Avro data type when reading from Avro or match the Spark's internal data type (e.g., StringType, IntegerType) when writing to Avro files; otherwise, the read/write action will fail.</td>
-    <td>read and write</td>
+    <td>Optional schema provided by a user in JSON format.
+      <ul>
+        <li>
+          When reading Avro, this option can be set to an evolved schema, which is compatible but different with
+          the actual Avro schema. The deserialization schema will be consistent with the evolved schema.
+          For example, if we set an evolved schema containing one additional column with a default value,
+          the reading result in Spark will contain the new column too.
+        </li>
+        <li>
+          When writing Avro, this option can be set if the expected output Avro schema doesn't match the
+          schema converted by Spark. For example, the expected schema of one column is of "enum" type,
+          instead of "string" type in the default converted schema.
+        </li>
+      </ul>
+    </td>
+    <td>read, write and function <code>from_avro</code></td>
   </tr>
   <tr>
     <td><code>recordName</code></td>
@@ -240,15 +253,6 @@ Data source options of Avro can be set via:
     </td>
     <td>function <code>from_avro</code></td>
   </tr>
-  <tr>
-    <td><code>actualSchema</code></td>
-    <td>None</td>
-    <td>Optional Avro schema (in JSON format) that was used to serialize the data. This should be set if the schema provided
-    for deserialization is compatible with - but not the same as - the one used to originally convert the data to Avro.
-    For more information on Avro's schema evolution and compatibility, please refer to the [documentation of Confluent](https://docs.confluent.io/current/schema-registry/avro.html).
-    </td>
-    <td>function <code>from_avro</code></td>
-  </tr>
 </table>

 ## Configuration
```
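
To make the two documented bullet points concrete, a hedged sketch of the read and write paths under the same option (the paths and the `enumSchema`/`usersWithStatus` names are illustrative; `evolvedSchema` is the one from the example above):

```scala
// Read: files written with the old schema are deserialized into the evolved one.
val users = spark.read
  .format("avro")
  .option("avroSchema", evolvedSchema) // compatible superset of the written schema
  .load("/tmp/users.avro")             // illustrative path

// Write: force a column to an Avro "enum" instead of the default "string" mapping.
val enumSchema =
  """{"type": "record", "name": "User", "fields": [
    |  {"name": "name", "type": "string"},
    |  {"name": "status", "type": {"type": "enum", "name": "Status",
    |    "symbols": ["ACTIVE", "INACTIVE"]}}
    |]}""".stripMargin

usersWithStatus.write                  // hypothetical DataFrame with name/status columns
  .format("avro")
  .option("avroSchema", enumSchema)
  .save("/tmp/users_enum.avro")
```
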


```diff
@@ -39,7 +39,7 @@ case class AvroDataToCatalyst(
   override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType)

   override lazy val dataType: DataType = {
-    val dt = SchemaConverters.toSqlType(avroSchema).dataType
+    val dt = SchemaConverters.toSqlType(expectedSchema).dataType
     parseMode match {
       // With PermissiveMode, the output Catalyst row might contain columns of null values for
       // corrupt records, even if some of the columns are not nullable in the user-provided schema.
@@ -53,14 +53,15 @@ case class AvroDataToCatalyst(
   private lazy val avroOptions = AvroOptions(options)

-  @transient private lazy val avroSchema = new Schema.Parser().parse(jsonFormatSchema)
+  @transient private lazy val actualSchema = new Schema.Parser().parse(jsonFormatSchema)

-  @transient private lazy val reader = avroOptions.actualSchema
-    .map(actualSchema =>
-      new GenericDatumReader[Any](new Schema.Parser().parse(actualSchema), avroSchema))
-    .getOrElse(new GenericDatumReader[Any](avroSchema))
+  @transient private lazy val expectedSchema = avroOptions.schema
+    .map(expectedSchema => new Schema.Parser().parse(expectedSchema))
+    .getOrElse(actualSchema)

-  @transient private lazy val deserializer = new AvroDeserializer(avroSchema, dataType)
+  @transient private lazy val reader = new GenericDatumReader[Any](actualSchema, expectedSchema)
+
+  @transient private lazy val deserializer = new AvroDeserializer(expectedSchema, dataType)

   @transient private var decoder: BinaryDecoder = _
```
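
The key change above hands both schemas to Avro's `GenericDatumReader(writerSchema, readerSchema)`, which performs standard Avro schema resolution. A minimal standalone sketch of that mechanism (schemas as in the earlier example; all other names are illustrative):

```scala
import java.io.ByteArrayOutputStream

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

// Parse the writer (actual) and reader (evolved) schemas.
val writerSchema = new Schema.Parser().parse(actualSchema)
val readerSchema = new Schema.Parser().parse(evolvedSchema)

// Serialize one record with the writer's schema.
val record = new GenericData.Record(writerSchema)
record.put("name", "alice")
val out = new ByteArrayOutputStream()
val encoder = EncoderFactory.get().binaryEncoder(out, null)
new GenericDatumWriter[GenericRecord](writerSchema).write(record, encoder)
encoder.flush()

// Deserialize with (writer, reader) schemas: Avro resolves the difference
// and fills the missing "age" field from its default value.
val decoder = DecoderFactory.get().binaryDecoder(out.toByteArray, null)
val reader = new GenericDatumReader[GenericRecord](writerSchema, readerSchema)
val resolved = reader.read(null, decoder)
assert(resolved.get("age") == -1)
```
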


```diff
@@ -36,18 +36,19 @@ class AvroOptions(
   }

   /**
-   * Optional schema provided by an user in JSON format.
+   * Optional schema provided by a user in JSON format.
+   *
+   * When reading Avro, this option can be set to an evolved schema, which is compatible but
+   * different with the actual Avro schema. The deserialization schema will be consistent with
+   * the evolved schema. For example, if we set an evolved schema containing one additional
+   * column with a default value, the reading result in Spark will contain the new column too.
+   *
+   * When writing Avro, this option can be set if the expected output Avro schema doesn't match the
+   * schema converted by Spark. For example, the expected schema of one column is of "enum" type,
+   * instead of "string" type in the default converted schema.
    */
   val schema: Option[String] = parameters.get("avroSchema")

-  /**
-   * Optional Avro schema (in JSON format) that was used to serialize the data.
-   * This should be set if the schema provided for deserialization is compatible
-   * with - but not the same as - the one used to originally convert the data to Avro.
-   * See SPARK-27506 for more details.
-   */
-  val actualSchema: Option[String] = parameters.get("actualSchema")

   /**
    * Top level record name in write result, which is required in Avro spec.
    * See https://avro.apache.org/docs/1.8.2/spec.html#schema_record .
```


```diff
@@ -45,10 +45,11 @@ object functions {
   }

   /**
-   * Converts a binary column of Avro format into its corresponding catalyst value. If a schema is
-   * provided via the option actualSchema, a different (but compatible) schema can be used for
-   * reading. If no actualSchema option is provided, the specified schema must match the read data,
-   * otherwise the behavior is undefined: it may fail or return arbitrary result.
+   * Converts a binary column of Avro format into its corresponding catalyst value.
+   * The specified schema must match actual schema of the read data, otherwise the behavior
+   * is undefined: it may fail or return arbitrary result.
+   * To deserialize the data with a compatible and evolved schema, the expected Avro schema can be
+   * set via the option avroSchema.
    *
    * @param data the binary column.
    * @param jsonFormatSchema the avro schema in JSON string format.
```
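
As the updated scaladoc suggests, the options map can carry more than the evolved schema; a sketch that also sets the parse mode handled in `AvroDataToCatalyst` (schemas and `df` as in the earlier example; assumes the `mode` option accepts `PERMISSIVE`):

```scala
import scala.collection.JavaConverters._
import org.apache.spark.sql.avro.functions.from_avro

val options = Map(
  "avroSchema" -> evolvedSchema, // expected (reader) schema
  "mode"       -> "PERMISSIVE"   // corrupt records yield null columns instead of failing
).asJava

val parsed = df.select(from_avro(df("value"), actualSchema, options).as("user"))
```
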


```diff
@@ -197,8 +197,8 @@ class AvroFunctionsSuite extends QueryTest with SharedSparkSession {
       avroStructDF.select(
         functions.from_avro(
           'avro,
-          evolvedAvroSchema,
-          Map("actualSchema" -> actualAvroSchema).asJava)),
+          actualAvroSchema,
+          Map("avroSchema" -> evolvedAvroSchema).asJava)),
       expected)
   }
 }
```


```diff
@@ -30,10 +30,11 @@ from pyspark.util import _print_missing_jar
 @since(3.0)
 def from_avro(data, jsonFormatSchema, options={}):
     """
-    Converts a binary column of Avro format into its corresponding catalyst value. If a schema is
-    provided via the option actualSchema, a different (but compatible) schema can be used for
-    reading. If no actualSchema option is provided, the specified schema must match the read data,
-    otherwise the behavior is undefined: it may fail or return arbitrary result.
+    Converts a binary column of Avro format into its corresponding catalyst value.
+    The specified schema must match the read data, otherwise the behavior is undefined:
+    it may fail or return arbitrary result.
+    To deserialize the data with a compatible and evolved schema, the expected Avro schema can be
+    set via the option avroSchema.

     Note: Avro is built-in but external data source module since Spark 2.4. Please deploy the
     application as per the deployment section of "Apache Avro Data Source Guide".
```