[SPARK-23072][SQL][TEST] Add a Unicode schema test for file-based data sources
## What changes were proposed in this pull request?

After [SPARK-20682](https://github.com/apache/spark/pull/19651), Apache Spark 2.3 is able to read ORC files with Unicode schemas. Previously, reading such a schema raised `org.apache.spark.sql.catalyst.parser.ParseException`. This PR adds a Unicode schema test for the CSV/JSON/ORC/Parquet file-based data sources. Note that the TEXT data source only has [a single column with the fixed name 'value'](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala#L71).

## How was this patch tested?

Pass the newly added test case.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #20266 from dongjoon-hyun/SPARK-23072.
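For context, here is a minimal sketch of the round trip the new test exercises. It assumes a `spark-shell` session (so `spark` and its implicits are in scope) and uses an illustrative `/tmp` path; it is not part of the patch:

```scala
// Sketch only: `spark` is the shell's SparkSession; the path is a placeholder.
import spark.implicits._

// scalastyle:off nonascii
val df = Seq("a").toDF("한글")  // DataFrame whose column name is Unicode
// scalastyle:on nonascii

// Per the PR description, reading this back as ORC raised a ParseException
// before SPARK-20682; in Spark 2.3 the schema round-trips.
df.write.mode("overwrite").orc("/tmp/unicode-orc")
val back = spark.read.orc("/tmp/unicode-orc")
assert(back.schema.fieldNames.sameElements(df.schema.fieldNames))
```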
Parent: a963980a6d
Commit: a0aedb0ded
New test suite `FileBasedDataSourceSuite.scala` (sql/core), `@@ -0,0 +1,81 @@`:

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql

import org.apache.spark.sql.test.SharedSQLContext

class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext {
  import testImplicits._

  private val allFileBasedDataSources = Seq("orc", "parquet", "csv", "json", "text")

  allFileBasedDataSources.foreach { format =>
    test(s"Writing empty datasets should not fail - $format") {
      withTempPath { dir =>
        Seq("str").toDS().limit(0).write.format(format).save(dir.getCanonicalPath)
      }
    }
  }

  // `TEXT` data source always has a single column whose name is `value`.
  allFileBasedDataSources.filterNot(_ == "text").foreach { format =>
    test(s"SPARK-23072 Write and read back unicode column names - $format") {
      withTempPath { path =>
        val dir = path.getCanonicalPath

        // scalastyle:off nonascii
        val df = Seq("a").toDF("한글")
        // scalastyle:on nonascii

        df.write.format(format).option("header", "true").save(dir)
        val answerDf = spark.read.format(format).option("header", "true").load(dir)

        assert(df.schema.sameType(answerDf.schema))
        checkAnswer(df, answerDf)
      }
    }
  }

  // Only ORC/Parquet support this. `CSV` and `JSON` return an empty schema.
  // `TEXT` data source always has a single column whose name is `value`.
  Seq("orc", "parquet").foreach { format =>
    test(s"SPARK-15474 Write and read back non-empty schema with empty dataframe - $format") {
      withTempPath { file =>
        val path = file.getCanonicalPath
        val emptyDf = Seq((true, 1, "str")).toDF().limit(0)
        emptyDf.write.format(format).save(path)

        val df = spark.read.format(format).load(path)
        assert(df.schema.sameType(emptyDf.schema))
        checkAnswer(df, emptyDf)
      }
    }
  }

  allFileBasedDataSources.foreach { format =>
    test(s"SPARK-22146 read files containing special characters using $format") {
      val nameWithSpecialChars = s"sp&cial%chars"
      withTempDir { dir =>
        val tmpFile = s"$dir/$nameWithSpecialChars"
        spark.createDataset(Seq("a", "b")).write.format(format).save(tmpFile)
        val fileContent = spark.read.format(format).load(tmpFile)
        checkAnswer(fileContent, Seq(Row("a"), Row("b")))
      }
    }
  }
}
```
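A hedged sketch of why the SPARK-15474 round trip above is restricted to ORC/Parquet: those formats persist the schema in file metadata (footers), so it survives even with zero rows, whereas CSV/JSON must infer the schema from the data. The `spark` session and `/tmp` path below are illustrative assumptions, not part of the patch:

```scala
// Sketch only, assuming a spark-shell session with `spark` and its
// implicits in scope; the path is a placeholder.
import spark.implicits._

val emptyDf = Seq((true, 1, "str")).toDF().limit(0)

// Parquet writes the schema into file footers, so an empty dataset still
// round-trips its schema.
emptyDf.write.mode("overwrite").parquet("/tmp/empty-parquet")
val back = spark.read.parquet("/tmp/empty-parquet")

// `sameType` compares the two schemas structurally while ignoring
// nullability, which can legitimately differ after a write/read cycle.
assert(emptyDf.schema.sameType(back.schema))

// CSV/JSON, by contrast, carry no schema metadata; with zero rows there is
// nothing to infer from, so (as the test comment notes) the schema read
// back is empty and the same assertion would fail.
```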
Removed from sql/core's `SQLQuerySuite` (moved into `FileBasedDataSourceSuite` above):

```diff
@@ -2757,20 +2757,4 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       }
     }
   }
-
-  // Only New OrcFileFormat supports this
-  Seq(classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat].getCanonicalName,
-    "parquet").foreach { format =>
-    test(s"SPARK-15474 Write and read back non-emtpy schema with empty dataframe - $format") {
-      withTempPath { file =>
-        val path = file.getCanonicalPath
-        val emptyDf = Seq((true, 1, "str")).toDF.limit(0)
-        emptyDf.write.format(format).save(path)
-
-        val df = spark.read.format(format).load(path)
-        assert(df.schema.sameType(emptyDf.schema))
-        checkAnswer(df, emptyDf)
-      }
-    }
-  }
 }
```
Import cleanup in sql/hive's `MetastoreDataSourcesSuite`:

```diff
@@ -23,14 +23,12 @@ import scala.collection.mutable.ArrayBuffer

 import org.apache.hadoop.fs.Path

-import org.apache.spark.SparkContext
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.execution.command.CreateTableCommand
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.hive.HiveExternalCatalog._
-import org.apache.spark.sql.hive.client.HiveClient
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf._
```
Removed from `MetastoreDataSourcesSuite` (moved into `FileBasedDataSourceSuite` above):

```diff
@@ -1344,18 +1342,6 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
     }
   }

-  Seq("orc", "parquet", "csv", "json", "text").foreach { format =>
-    test(s"SPARK-22146: read files containing special characters using $format") {
-      val nameWithSpecialChars = s"sp&cial%chars"
-      withTempDir { dir =>
-        val tmpFile = s"$dir/$nameWithSpecialChars"
-        spark.createDataset(Seq("a", "b")).write.format(format).save(tmpFile)
-        val fileContent = spark.read.format(format).load(tmpFile)
-        checkAnswer(fileContent, Seq(Row("a"), Row("b")))
-      }
-    }
-  }
-
   private def withDebugMode(f: => Unit): Unit = {
     val previousValue = sparkSession.sparkContext.conf.get(DEBUG_MODE)
     try {
```
Removed from sql/hive's `SQLQuerySuite` (moved into `FileBasedDataSourceSuite` above):

```diff
@@ -2159,12 +2159,4 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       }
     }
   }
-
-  Seq("orc", "parquet", "csv", "json", "text").foreach { format =>
-    test(s"Writing empty datasets should not fail - $format") {
-      withTempDir { dir =>
-        Seq("str").toDS.limit(0).write.format(format).save(dir.getCanonicalPath + "/tmp")
-      }
-    }
-  }
 }
```