From ac4651a7d19b248c86290d419ac3f6d69ed2b61e Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Mon, 4 Jan 2021 12:59:45 -0800
Subject: [PATCH] [SPARK-33980][SS] Invalidate char/varchar in spark.readStream.schema

### What changes were proposed in this pull request?

Invalidate char/varchar in `spark.readStream.schema`, just like what we've done for
`spark.read.schema` in da72b87374a7be5416b99ed016dc2fc9da0ed88a.

### Why are the changes needed?

Bug fix: char/varchar is only meant for table schemas while
`spark.sql.legacy.charVarcharAsString=false`.

### Does this PR introduce _any_ user-facing change?

Yes, char/varchar will now fail to define structured streaming readers when
`spark.sql.legacy.charVarcharAsString=false`.

### How was this patch tested?

New tests.

Closes #31003 from yaooqinn/SPARK-33980.

Authored-by: Kent Yao
Signed-off-by: Dongjoon Hyun
---
 .../spark/sql/streaming/DataStreamReader.scala  |  7 +++++--
 .../apache/spark/sql/CharVarcharTestSuite.scala | 15 +++++++++++++++
 2 files changed, 20 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index eb7bb5c87a..d82fa9e885 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -64,7 +64,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    * @since 2.0.0
    */
   def schema(schema: StructType): DataStreamReader = {
-    this.userSpecifiedSchema = Option(CharVarcharUtils.replaceCharVarcharWithStringInSchema(schema))
+    val replaced = CharVarcharUtils.failIfHasCharVarchar(schema).asInstanceOf[StructType]
+    this.userSpecifiedSchema = Option(replaced)
     this
   }
 
@@ -76,7 +77,9 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    * @since 2.3.0
    */
   def schema(schemaString: String): DataStreamReader = {
-    this.userSpecifiedSchema = Option(StructType.fromDDL(schemaString))
+    val rawSchema = StructType.fromDDL(schemaString)
+    val schema = CharVarcharUtils.failIfHasCharVarchar(rawSchema).asInstanceOf[StructType]
+    this.userSpecifiedSchema = Option(schema)
     this
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
index 9d4b7c4f82..62d0f51e5f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
@@ -549,6 +549,21 @@ class BasicCharVarcharTestSuite extends QueryTest with SharedSparkSession {
       assert(df2.schema.head.dataType === StringType)
     }
   }
+
+  test("invalidate char/varchar in spark.readStream.schema") {
+    failWithInvalidCharUsage(spark.readStream.schema(new StructType().add("id", CharType(5))))
+    failWithInvalidCharUsage(spark.readStream.schema("id char(5)"))
+    withSQLConf((SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key, "true")) {
+      withTempPath { dir =>
+        spark.range(2).write.save(dir.toString)
+        val df1 = spark.readStream.schema(new StructType().add("id", CharType(5)))
+          .load(dir.toString)
+        assert(df1.schema.map(_.dataType) == Seq(StringType))
+        val df2 = spark.readStream.schema("id char(5)").load(dir.toString)
+        assert(df2.schema.map(_.dataType) == Seq(StringType))
+      }
+    }
+  }
 }
 
 class FileSourceCharVarcharTestSuite extends CharVarcharTestSuite with SharedSparkSession {
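
For illustration, a minimal standalone sketch of the user-facing behavior described above, adapted from the new test in `BasicCharVarcharTestSuite`; the `Spark33980Demo` object name, the local SparkSession setup, and the output path are assumptions for this sketch and are not part of the patch.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StringType

object Spark33980Demo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("SPARK-33980 demo")   // hypothetical app name
      .getOrCreate()

    // With the default spark.sql.legacy.charVarcharAsString=false, both forms below
    // are now rejected when defining a streaming reader schema (previously they were
    // silently read as string):
    //   spark.readStream.schema(new StructType().add("id", CharType(5)))
    //   spark.readStream.schema("id char(5)")

    // With the legacy flag enabled, char/varchar still fall back to string.
    spark.conf.set("spark.sql.legacy.charVarcharAsString", "true")

    val path = "/tmp/spark-33980-demo"   // hypothetical location for the sample data
    spark.range(2).write.mode("overwrite").save(path)

    val df = spark.readStream.schema("id char(5)").load(path)
    assert(df.schema.head.dataType == StringType)

    spark.stop()
  }
}
```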