[SPARK-33980][SS] Invalidate char/varchar in spark.readStream.schema

### What changes were proposed in this pull request?

invalidate char/varchar in `spark.readStream.schema` just like what we've done for `spark.read.schema` in da72b87374

### Why are the changes needed?

bugfix, char/varchar is only for table schema while `spark.sql.legacy.charVarcharAsString=false`

### Does this PR introduce _any_ user-facing change?

yes, char/varchar will fail to define ss readers when `spark.sql.legacy.charVarcharAsString=false`

### How was this patch tested?

new tests

Closes #31003 from yaooqinn/SPARK-33980.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
This commit is contained in:
Kent Yao 2021-01-04 12:59:45 -08:00 committed by Dongjoon Hyun
parent d6322bf70c
commit ac4651a7d1
2 changed files with 20 additions and 2 deletions

View file

@ -64,7 +64,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
* @since 2.0.0
*/
def schema(schema: StructType): DataStreamReader = {
this.userSpecifiedSchema = Option(CharVarcharUtils.replaceCharVarcharWithStringInSchema(schema))
val replaced = CharVarcharUtils.failIfHasCharVarchar(schema).asInstanceOf[StructType]
this.userSpecifiedSchema = Option(replaced)
this
}
@ -76,7 +77,9 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
* @since 2.3.0
*/
def schema(schemaString: String): DataStreamReader = {
this.userSpecifiedSchema = Option(StructType.fromDDL(schemaString))
val rawSchema = StructType.fromDDL(schemaString)
val schema = CharVarcharUtils.failIfHasCharVarchar(rawSchema).asInstanceOf[StructType]
this.userSpecifiedSchema = Option(schema)
this
}

View file

@ -549,6 +549,21 @@ class BasicCharVarcharTestSuite extends QueryTest with SharedSparkSession {
assert(df2.schema.head.dataType === StringType)
}
}
test("invalidate char/varchar in spark.readStream.schema") {
failWithInvalidCharUsage(spark.readStream.schema(new StructType().add("id", CharType(5))))
failWithInvalidCharUsage(spark.readStream.schema("id char(5)"))
withSQLConf((SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key, "true")) {
withTempPath { dir =>
spark.range(2).write.save(dir.toString)
val df1 = spark.readStream.schema(new StructType().add("id", CharType(5)))
.load(dir.toString)
assert(df1.schema.map(_.dataType) == Seq(StringType))
val df2 = spark.readStream.schema("id char(5)").load(dir.toString)
assert(df2.schema.map(_.dataType) == Seq(StringType))
}
}
}
}
class FileSourceCharVarcharTestSuite extends CharVarcharTestSuite with SharedSparkSession {