spark-instrumented-optimizer/sql/hive/src
Takeshi Yamamuro 647963a26a [SPARK-20460][SQL] Make it more consistent to handle column name duplication
## What changes were proposed in this pull request?
This pr made it more consistent to handle column name duplication. In the current master, error handling is different when hitting column name duplication:
```
// json
scala> val schema = StructType(StructField("a", IntegerType) :: StructField("a", IntegerType) :: Nil)
scala> Seq("""{"a":1, "a":1}"""""").toDF().coalesce(1).write.mode("overwrite").text("/tmp/data")
scala> spark.read.format("json").schema(schema).load("/tmp/data").show
org.apache.spark.sql.AnalysisException: Reference 'a' is ambiguous, could be: a#12, a#13.;
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:287)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:181)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolve$1.apply(LogicalPlan.scala:153)

scala> spark.read.format("json").load("/tmp/data").show
org.apache.spark.sql.AnalysisException: Duplicate column(s) : "a" found, cannot save to JSON format;
  at org.apache.spark.sql.execution.datasources.json.JsonDataSource.checkConstraints(JsonDataSource.scala:81)
  at org.apache.spark.sql.execution.datasources.json.JsonDataSource.inferSchema(JsonDataSource.scala:63)
  at org.apache.spark.sql.execution.datasources.json.JsonFileFormat.inferSchema(JsonFileFormat.scala:57)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:176)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:176)

// csv
scala> val schema = StructType(StructField("a", IntegerType) :: StructField("a", IntegerType) :: Nil)
scala> Seq("a,a", "1,1").toDF().coalesce(1).write.mode("overwrite").text("/tmp/data")
scala> spark.read.format("csv").schema(schema).option("header", false).load("/tmp/data").show
org.apache.spark.sql.AnalysisException: Reference 'a' is ambiguous, could be: a#41, a#42.;
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:287)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:181)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolve$1.apply(LogicalPlan.scala:153)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolve$1.apply(LogicalPlan.scala:152)

// If `inferSchema` is true, a CSV format is duplicate-safe (See SPARK-16896)
scala> spark.read.format("csv").option("header", true).load("/tmp/data").show
+---+---+
| a0| a1|
+---+---+
|  1|  1|
+---+---+

// parquet
scala> val schema = StructType(StructField("a", IntegerType) :: StructField("a", IntegerType) :: Nil)
scala> Seq((1, 1)).toDF("a", "b").coalesce(1).write.mode("overwrite").parquet("/tmp/data")
scala> spark.read.format("parquet").schema(schema).option("header", false).load("/tmp/data").show
org.apache.spark.sql.AnalysisException: Reference 'a' is ambiguous, could be: a#110, a#111.;
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:287)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:181)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolve$1.apply(LogicalPlan.scala:153)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolve$1.apply(LogicalPlan.scala:152)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
```
When this patch applied, the results change to;
```

// json
scala> val schema = StructType(StructField("a", IntegerType) :: StructField("a", IntegerType) :: Nil)
scala> Seq("""{"a":1, "a":1}"""""").toDF().coalesce(1).write.mode("overwrite").text("/tmp/data")
scala> spark.read.format("json").schema(schema).load("/tmp/data").show
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in datasource: "a";
  at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtil.scala:47)
  at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtil.scala:33)
  at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:186)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:368)

scala> spark.read.format("json").load("/tmp/data").show
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in datasource: "a";
  at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtil.scala:47)
  at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtil.scala:33)
  at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:186)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:368)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:156)

// csv
scala> val schema = StructType(StructField("a", IntegerType) :: StructField("a", IntegerType) :: Nil)
scala> Seq("a,a", "1,1").toDF().coalesce(1).write.mode("overwrite").text("/tmp/data")
scala> spark.read.format("csv").schema(schema).option("header", false).load("/tmp/data").show
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in datasource: "a";
  at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtil.scala:47)
  at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtil.scala:33)
  at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:186)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:368)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)

scala> spark.read.format("csv").option("header", true).load("/tmp/data").show
+---+---+
| a0| a1|
+---+---+
|  1|  1|
+---+---+

// parquet
scala> val schema = StructType(StructField("a", IntegerType) :: StructField("a", IntegerType) :: Nil)
scala> Seq((1, 1)).toDF("a", "b").coalesce(1).write.mode("overwrite").parquet("/tmp/data")
scala> spark.read.format("parquet").schema(schema).option("header", false).load("/tmp/data").show
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in datasource: "a";
  at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtil.scala:47)
  at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtil.scala:33)
  at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:186)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:368)
```

## How was this patch tested?
Added tests in `DataFrameReaderWriterSuite` and `SQLQueryTestSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #17758 from maropu/SPARK-20460.
2017-07-10 15:58:34 +08:00
..
main [SPARK-20703][SQL] Associate metrics with data writes onto DataFrameWriter operations 2017-07-06 15:47:09 +08:00
test [SPARK-20460][SQL] Make it more consistent to handle column name duplication 2017-07-10 15:58:34 +08:00