[SPARK-10301] [SPARK-10428] [SQL] Addresses comments of PR #8583 and #8509 for master

Author: Cheng Lian <lian@databricks.com>

Closes #8670 from liancheng/spark-10301/address-pr-comments.
Authored by Cheng Lian on 2015-09-10 11:01:08 -07:00, committed by Davies Liu
parent f892d927d7
commit 49da38e5f7
4 changed files with 524 additions and 47 deletions
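
In short, requested schema clipping lets a read-time schema reshape a Parquet file's physical schema: nested struct fields, and after this patch map key/value types as well, are clipped to the requested shape, and requested fields missing from the file are read back as nulls. A minimal sketch of the user-facing behavior, with the path and literal values assumed for illustration (it mirrors the test cases added below):

    import org.apache.spark.sql.types._

    // Write a file whose struct column `s` has fields {a, b}.
    val df = sqlContext.range(1).selectExpr("NAMED_STRUCT('a', id, 'b', id + 1) AS s")
    df.write.parquet("/tmp/spark-10301-demo")

    // Read it back requesting {b, c}: the physical schema is clipped to the
    // requested shape, and the missing field `c` comes back as null.
    val requestedSchema = new StructType()
      .add("s", new StructType()
        .add("b", LongType, nullable = true)
        .add("c", LongType, nullable = true),
        nullable = true)

    sqlContext.read.schema(requestedSchema).parquet("/tmp/spark-10301-demo").show()
    // s: [1, null]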

CatalystReadSupport.scala

@@ -117,14 +117,18 @@ private[parquet] object CatalystReadSupport {
         // Only clips array types with nested type as element type.
         clipParquetListType(parquetType.asGroupType(), t.elementType)

-      case t: MapType if !isPrimitiveCatalystType(t.valueType) =>
-        // Only clips map types with nested type as value type.
+      case t: MapType
+          if !isPrimitiveCatalystType(t.keyType) ||
+             !isPrimitiveCatalystType(t.valueType) =>
+        // Only clips map types with nested key type or value type
         clipParquetMapType(parquetType.asGroupType(), t.keyType, t.valueType)

       case t: StructType =>
         clipParquetGroup(parquetType.asGroupType(), t)

       case _ =>
+        // UDTs and primitive types are not clipped. For UDTs, a clipped version might not be able
+        // to be mapped to desired user-space types. So UDTs shouldn't participate schema merging.
         parquetType
     }
   }
@@ -204,14 +208,14 @@ private[parquet] object CatalystReadSupport {
   }

   /**
-   * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[MapType]]. The value type
-   * of the [[MapType]] should also be a nested type, namely an [[ArrayType]], a [[MapType]], or a
-   * [[StructType]]. Note that key type of any [[MapType]] is always a primitive type.
+   * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[MapType]]. Either key type or
+   * value type of the [[MapType]] must be a nested type, namely an [[ArrayType]], a [[MapType]], or
+   * a [[StructType]].
    */
   private def clipParquetMapType(
       parquetMap: GroupType, keyType: DataType, valueType: DataType): GroupType = {
-    // Precondition of this method, should only be called for maps with nested value types.
-    assert(!isPrimitiveCatalystType(valueType))
+    // Precondition of this method, only handles maps with nested key types or value types.
+    assert(!isPrimitiveCatalystType(keyType) || !isPrimitiveCatalystType(valueType))

     val repeatedGroup = parquetMap.getType(0).asGroupType()
     val parquetKeyType = repeatedGroup.getType(0)
@@ -221,7 +225,7 @@ private[parquet] object CatalystReadSupport {
     Types
       .repeatedGroup()
       .as(repeatedGroup.getOriginalType)
-      .addField(parquetKeyType)
+      .addField(clipParquetType(parquetKeyType, keyType))
       .addField(clipParquetType(parquetValueType, valueType))
       .named(repeatedGroup.getName)
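
The key-side change above means map keys are now clipped the same way as map values. A hand-built sketch of the resulting repeated group for the "standard map with complex key" case tested later in this commit, constructed here directly with parquet-mr's `Types` builder (the `.as(...)` original-type call is omitted for brevity):

    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{DOUBLE, INT32, INT64}
    import org.apache.parquet.schema.Types

    // The physical key struct {value_f0: int32, value_f1: int64} is clipped to
    // the requested Catalyst key type struct<value_f1: bigint, value_f2: double>:
    val clippedKeyValue =
      Types
        .repeatedGroup()
        .addField(
          Types.requiredGroup()
            .addField(Types.required(INT64).named("value_f1"))
            .addField(Types.required(DOUBLE).named("value_f2"))
            .named("key"))
        .addField(Types.required(INT32).named("value"))
        .named("key_value")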

CatalystRowConverter.scala

@@ -123,6 +123,16 @@ private[parquet] class CatalystRowConverter(
     updater: ParentContainerUpdater)
   extends CatalystGroupConverter(updater) with Logging {

+  assert(
+    parquetType.getFieldCount == catalystType.length,
+    s"""Field counts of the Parquet schema and the Catalyst schema don't match:
+       |
+       |Parquet schema:
+       |$parquetType
+       |Catalyst schema:
+       |${catalystType.prettyJson}
+     """.stripMargin)
+
   logDebug(
     s"""Building row converter for the following schema:
        |
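
The new assertion turns a Parquet/Catalyst field-count mismatch into a fast failure that prints both schemas, instead of an obscure error later during conversion. A contrived sketch of the kind of pair it rejects, with both schemas invented for illustration:

    import org.apache.parquet.schema.MessageTypeParser
    import org.apache.spark.sql.types._

    val parquetType = MessageTypeParser.parseMessageType(
      """message root {
        |  required int32 a;
        |  required int32 b;
        |  required int32 c;
        |}
      """.stripMargin)

    val catalystType = new StructType()
      .add("a", IntegerType)
      .add("b", IntegerType)

    // 3 != 2: the converter now refuses this pair up front.
    assert(parquetType.getFieldCount == catalystType.length)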

ParquetQuerySuite.scala

@@ -22,6 +22,9 @@ import java.io.File
 import org.apache.hadoop.fs.Path

 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
+import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -228,54 +231,168 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
     }
   }

-  test("SPARK-10301 Clipping nested structs in requested schema") {
+  test("SPARK-10301 requested schema clipping - same schema") {
     withTempPath { dir =>
       val path = dir.getCanonicalPath
+      val df = sqlContext.range(1).selectExpr("NAMED_STRUCT('a', id, 'b', id + 1) AS s").coalesce(1)
+      df.write.parquet(path)
+
+      val userDefinedSchema =
+        new StructType()
+          .add(
+            "s",
+            new StructType()
+              .add("a", LongType, nullable = true)
+              .add("b", LongType, nullable = true),
+            nullable = true)
+
+      checkAnswer(
+        sqlContext.read.schema(userDefinedSchema).parquet(path),
+        Row(Row(0L, 1L)))
+    }
+  }
+
+  // This test case is ignored because of parquet-mr bug PARQUET-370
+  ignore("SPARK-10301 requested schema clipping - schemas with disjoint sets of fields") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val df = sqlContext.range(1).selectExpr("NAMED_STRUCT('a', id, 'b', id + 1) AS s").coalesce(1)
+      df.write.parquet(path)
+
+      val userDefinedSchema =
+        new StructType()
+          .add(
+            "s",
+            new StructType()
+              .add("c", LongType, nullable = true)
+              .add("d", LongType, nullable = true),
+            nullable = true)
+
+      checkAnswer(
+        sqlContext.read.schema(userDefinedSchema).parquet(path),
+        Row(Row(null, null)))
+    }
+  }
+
+  test("SPARK-10301 requested schema clipping - requested schema contains physical schema") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val df = sqlContext.range(1).selectExpr("NAMED_STRUCT('a', id, 'b', id + 1) AS s").coalesce(1)
+      df.write.parquet(path)
+
+      val userDefinedSchema =
+        new StructType()
+          .add(
+            "s",
+            new StructType()
+              .add("a", LongType, nullable = true)
+              .add("b", LongType, nullable = true)
+              .add("c", LongType, nullable = true)
+              .add("d", LongType, nullable = true),
+            nullable = true)
+
+      checkAnswer(
+        sqlContext.read.schema(userDefinedSchema).parquet(path),
+        Row(Row(0L, 1L, null, null)))
+    }
+
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val df = sqlContext.range(1).selectExpr("NAMED_STRUCT('a', id, 'd', id + 3) AS s").coalesce(1)
+      df.write.parquet(path)
+
+      val userDefinedSchema =
+        new StructType()
+          .add(
+            "s",
+            new StructType()
+              .add("a", LongType, nullable = true)
+              .add("b", LongType, nullable = true)
+              .add("c", LongType, nullable = true)
+              .add("d", LongType, nullable = true),
+            nullable = true)
+
+      checkAnswer(
+        sqlContext.read.schema(userDefinedSchema).parquet(path),
+        Row(Row(0L, null, null, 3L)))
+    }
+  }
+
+  test("SPARK-10301 requested schema clipping - physical schema contains requested schema") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
       val df = sqlContext
         .range(1)
-        .selectExpr("NAMED_STRUCT('a', id, 'b', id) AS s")
+        .selectExpr("NAMED_STRUCT('a', id, 'b', id + 1, 'c', id + 2, 'd', id + 3) AS s")
         .coalesce(1)

-      df.write.mode("append").parquet(path)
+      df.write.parquet(path)

-      val userDefinedSchema = new StructType()
-        .add("s", new StructType().add("a", LongType, nullable = true), nullable = true)
+      val userDefinedSchema =
+        new StructType()
+          .add(
+            "s",
+            new StructType()
+              .add("a", LongType, nullable = true)
+              .add("b", LongType, nullable = true),
+            nullable = true)

       checkAnswer(
         sqlContext.read.schema(userDefinedSchema).parquet(path),
-        Row(Row(0)))
+        Row(Row(0L, 1L)))
     }

     withTempPath { dir =>
       val path = dir.getCanonicalPath
-      val df1 = sqlContext
+      val df = sqlContext
         .range(1)
-        .selectExpr("NAMED_STRUCT('a', id, 'b', id) AS s")
+        .selectExpr("NAMED_STRUCT('a', id, 'b', id + 1, 'c', id + 2, 'd', id + 3) AS s")
         .coalesce(1)

-      val df2 = sqlContext
-        .range(1, 2)
-        .selectExpr("NAMED_STRUCT('b', id, 'c', id) AS s")
-        .coalesce(1)
+      df.write.parquet(path)

-      df1.write.parquet(path)
-      df2.write.mode(SaveMode.Append).parquet(path)

-      val userDefinedSchema = new StructType()
-        .add("s",
-          new StructType()
-            .add("a", LongType, nullable = true)
-            .add("c", LongType, nullable = true),
-          nullable = true)
+      val userDefinedSchema =
+        new StructType()
+          .add(
+            "s",
+            new StructType()
+              .add("a", LongType, nullable = true)
+              .add("d", LongType, nullable = true),
+            nullable = true)

       checkAnswer(
         sqlContext.read.schema(userDefinedSchema).parquet(path),
-        Seq(
-          Row(Row(0, null)),
-          Row(Row(null, 1))))
+        Row(Row(0L, 3L)))
     }
   }

+  test("SPARK-10301 requested schema clipping - schemas overlap but don't contain each other") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val df = sqlContext
+        .range(1)
+        .selectExpr("NAMED_STRUCT('a', id, 'b', id + 1, 'c', id + 2) AS s")
+        .coalesce(1)
+
+      df.write.parquet(path)
+
+      val userDefinedSchema =
+        new StructType()
+          .add(
+            "s",
+            new StructType()
+              .add("b", LongType, nullable = true)
+              .add("c", LongType, nullable = true)
+              .add("d", LongType, nullable = true),
+            nullable = true)
+
+      checkAnswer(
+        sqlContext.read.schema(userDefinedSchema).parquet(path),
+        Row(Row(1L, 2L, null)))
+    }
+  }
+
   test("SPARK-10301 requested schema clipping - deeply nested struct") {
     withTempPath { dir =>
       val path = dir.getCanonicalPath
@@ -304,4 +421,132 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
         Row(Row(Seq(Row(0, null)))))
     }
   }
+
+  test("SPARK-10301 requested schema clipping - out of order") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val df1 = sqlContext
+        .range(1)
+        .selectExpr("NAMED_STRUCT('a', id, 'b', id + 1, 'c', id + 2) AS s")
+        .coalesce(1)
+
+      val df2 = sqlContext
+        .range(1, 2)
+        .selectExpr("NAMED_STRUCT('c', id + 2, 'b', id + 1, 'd', id + 3) AS s")
+        .coalesce(1)
+
+      df1.write.parquet(path)
+      df2.write.mode(SaveMode.Append).parquet(path)
+
+      val userDefinedSchema = new StructType()
+        .add("s",
+          new StructType()
+            .add("a", LongType, nullable = true)
+            .add("b", LongType, nullable = true)
+            .add("d", LongType, nullable = true),
+          nullable = true)
+
+      checkAnswer(
+        sqlContext.read.schema(userDefinedSchema).parquet(path),
+        Seq(
+          Row(Row(0, 1, null)),
+          Row(Row(null, 2, 4))))
+    }
+  }
+
+  test("SPARK-10301 requested schema clipping - schema merging") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val df1 = sqlContext
+        .range(1)
+        .selectExpr("NAMED_STRUCT('a', id, 'c', id + 2) AS s")
+        .coalesce(1)
+
+      val df2 = sqlContext
+        .range(1, 2)
+        .selectExpr("NAMED_STRUCT('a', id, 'b', id + 1, 'c', id + 2) AS s")
+        .coalesce(1)
+
+      df1.write.mode(SaveMode.Append).parquet(path)
+      df2.write.mode(SaveMode.Append).parquet(path)
+
+      checkAnswer(
+        sqlContext
+          .read
+          .option("mergeSchema", "true")
+          .parquet(path)
+          .selectExpr("s.a", "s.b", "s.c"),
+        Seq(
+          Row(0, null, 2),
+          Row(1, 2, 3)))
+    }
+  }
+
+  test("SPARK-10301 requested schema clipping - UDT") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val df = sqlContext
+        .range(1)
+        .selectExpr(
+          """NAMED_STRUCT(
+            |  'f0', CAST(id AS STRING),
+            |  'f1', NAMED_STRUCT(
+            |    'a', CAST(id + 1 AS INT),
+            |    'b', CAST(id + 2 AS LONG),
+            |    'c', CAST(id + 3.5 AS DOUBLE)
+            |  )
+            |) AS s
+          """.stripMargin)
+        .coalesce(1)
+
+      df.write.mode(SaveMode.Append).parquet(path)
+
+      val userDefinedSchema =
+        new StructType()
+          .add(
+            "s",
+            new StructType()
+              .add("f1", new NestedStructUDT, nullable = true),
+            nullable = true)
+
+      checkAnswer(
+        sqlContext.read.schema(userDefinedSchema).parquet(path),
+        Row(Row(NestedStruct(1, 2L, 3.5D))))
+    }
+  }
 }
+
+object TestingUDT {
+  @SQLUserDefinedType(udt = classOf[NestedStructUDT])
+  case class NestedStruct(a: Integer, b: Long, c: Double)
+
+  class NestedStructUDT extends UserDefinedType[NestedStruct] {
+    override def sqlType: DataType =
+      new StructType()
+        .add("a", IntegerType, nullable = true)
+        .add("b", LongType, nullable = false)
+        .add("c", DoubleType, nullable = false)
+
+    override def serialize(obj: Any): Any = {
+      val row = new SpecificMutableRow(sqlType.asInstanceOf[StructType].map(_.dataType))
+      obj match {
+        case n: NestedStruct =>
+          row.setInt(0, n.a)
+          row.setLong(1, n.b)
+          row.setDouble(2, n.c)
+      }
+      row
+    }
+
+    override def userClass: Class[NestedStruct] = classOf[NestedStruct]
+
+    override def deserialize(datum: Any): NestedStruct = {
+      datum match {
+        case row: InternalRow =>
+          NestedStruct(row.getInt(0), row.getLong(1), row.getDouble(2))
+      }
+    }
+  }
+}
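
This `TestingUDT` also illustrates why `clipParquetType` leaves UDTs untouched (the `case _` branch in `CatalystReadSupport` above): both converters address the UDT's underlying struct purely by position. A minimal round-trip sketch, assuming the definitions above:

    // serialize fills slots 0, 1, 2 and deserialize reads them back in the same
    // order; a clipped or reordered Parquet group backing this UDT would be
    // silently misread, so UDT-backed types are returned unclipped.
    val udt = new NestedStructUDT
    val serialized = udt.serialize(NestedStruct(1, 2L, 3.5d))
    assert(udt.deserialize(serialized) == NestedStruct(1, 2L, 3.5d))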

ParquetSchemaSuite.scala

@@ -1012,12 +1012,17 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
      """.stripMargin,

     catalystSchema = {
-      val f11Type = new StructType().add("f011", DoubleType, nullable = true)
-      val f01Type = ArrayType(StringType, containsNull = false)
+      val f00Type = ArrayType(StringType, containsNull = false)
+      val f01Type = ArrayType(
+        new StructType()
+          .add("f011", DoubleType, nullable = true),
+        containsNull = false)

       val f0Type = new StructType()
-        .add("f00", f01Type, nullable = false)
-        .add("f01", f11Type, nullable = false)
+        .add("f00", f00Type, nullable = false)
+        .add("f01", f01Type, nullable = false)

       val f1Type = ArrayType(IntegerType, containsNull = true)

       new StructType()
         .add("f0", f0Type, nullable = false)
         .add("f1", f1Type, nullable = true)
@@ -1046,7 +1051,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
     parquetSchema =
       """message root {
        |  required group f0 {
-       |    optional group f00 {
+       |    optional group f00 (LIST) {
        |      repeated binary f00_tuple (UTF8);
        |    }
        |
@@ -1061,13 +1066,13 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
      """.stripMargin,

     catalystSchema = {
-      val f11ElementType = new StructType()
+      val f01ElementType = new StructType()
         .add("f011", DoubleType, nullable = true)
         .add("f012", LongType, nullable = true)

       val f0Type = new StructType()
-        .add("f00", ArrayType(StringType, containsNull = false), nullable = false)
-        .add("f01", ArrayType(f11ElementType, containsNull = false), nullable = false)
+        .add("f00", ArrayType(StringType, containsNull = false), nullable = true)
+        .add("f01", ArrayType(f01ElementType, containsNull = false), nullable = true)

       new StructType().add("f0", f0Type, nullable = false)
     },
@@ -1075,7 +1080,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
     expectedSchema =
       """message root {
        |  required group f0 {
-       |    optional group f00 {
+       |    optional group f00 (LIST) {
        |      repeated binary f00_tuple (UTF8);
        |    }
        |
@@ -1095,7 +1100,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
     parquetSchema =
       """message root {
        |  required group f0 {
-       |    optional group f00 {
+       |    optional group f00 (LIST) {
        |      repeated binary array (UTF8);
        |    }
        |
@@ -1110,13 +1115,13 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
      """.stripMargin,

     catalystSchema = {
-      val f11ElementType = new StructType()
+      val f01ElementType = new StructType()
         .add("f011", DoubleType, nullable = true)
         .add("f012", LongType, nullable = true)

       val f0Type = new StructType()
-        .add("f00", ArrayType(StringType, containsNull = false), nullable = false)
-        .add("f01", ArrayType(f11ElementType, containsNull = false), nullable = false)
+        .add("f00", ArrayType(StringType, containsNull = false), nullable = true)
+        .add("f01", ArrayType(f01ElementType, containsNull = false), nullable = true)

       new StructType().add("f0", f0Type, nullable = false)
     },
@@ -1124,7 +1129,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
     expectedSchema =
       """message root {
        |  required group f0 {
-       |    optional group f00 {
+       |    optional group f00 (LIST) {
        |      repeated binary array (UTF8);
        |    }
        |
@@ -1236,6 +1241,63 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       |}
     """.stripMargin)

+  testSchemaClipping(
+    "standard array",
+
+    parquetSchema =
+      """message root {
+        |  required group f0 {
+        |    optional group f00 (LIST) {
+        |      repeated group list {
+        |        required binary element (UTF8);
+        |      }
+        |    }
+        |
+        |    optional group f01 (LIST) {
+        |      repeated group list {
+        |        required group element {
+        |          optional int32 f010;
+        |          optional double f011;
+        |        }
+        |      }
+        |    }
+        |  }
+        |}
+      """.stripMargin,
+
+    catalystSchema = {
+      val f01ElementType = new StructType()
+        .add("f011", DoubleType, nullable = true)
+        .add("f012", LongType, nullable = true)
+
+      val f0Type = new StructType()
+        .add("f00", ArrayType(StringType, containsNull = false), nullable = true)
+        .add("f01", ArrayType(f01ElementType, containsNull = false), nullable = true)
+
+      new StructType().add("f0", f0Type, nullable = false)
+    },
+
+    expectedSchema =
+      """message root {
+        |  required group f0 {
+        |    optional group f00 (LIST) {
+        |      repeated group list {
+        |        required binary element (UTF8);
+        |      }
+        |    }
+        |
+        |    optional group f01 (LIST) {
+        |      repeated group list {
+        |        required group element {
+        |          optional double f011;
+        |          optional int64 f012;
+        |        }
+        |      }
+        |    }
+        |  }
+        |}
+      """.stripMargin)
+
   testSchemaClipping(
     "empty requested schema",
@@ -1251,4 +1313,160 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
     catalystSchema = new StructType(),

     expectedSchema = "message root {}")

+  testSchemaClipping(
+    "disjoint field sets",
+
+    parquetSchema =
+      """message root {
+        |  required group f0 {
+        |    required int32 f00;
+        |    required int64 f01;
+        |  }
+        |}
+      """.stripMargin,
+
+    catalystSchema =
+      new StructType()
+        .add(
+          "f0",
+          new StructType()
+            .add("f02", FloatType, nullable = true)
+            .add("f03", DoubleType, nullable = true),
+          nullable = true),
+
+    expectedSchema =
+      """message root {
+        |  required group f0 {
+        |    optional float f02;
+        |    optional double f03;
+        |  }
+        |}
+      """.stripMargin)
+
+  testSchemaClipping(
+    "parquet-avro style map",
+
+    parquetSchema =
+      """message root {
+        |  required group f0 (MAP) {
+        |    repeated group map (MAP_KEY_VALUE) {
+        |      required int32 key;
+        |      required group value {
+        |        required int32 value_f0;
+        |        required int64 value_f1;
+        |      }
+        |    }
+        |  }
+        |}
+      """.stripMargin,
+
+    catalystSchema = {
+      val valueType =
+        new StructType()
+          .add("value_f1", LongType, nullable = false)
+          .add("value_f2", DoubleType, nullable = false)
+
+      val f0Type = MapType(IntegerType, valueType, valueContainsNull = false)
+
+      new StructType().add("f0", f0Type, nullable = false)
+    },
+
+    expectedSchema =
+      """message root {
+        |  required group f0 (MAP) {
+        |    repeated group map (MAP_KEY_VALUE) {
+        |      required int32 key;
+        |      required group value {
+        |        required int64 value_f1;
+        |        required double value_f2;
+        |      }
+        |    }
+        |  }
+        |}
+      """.stripMargin)
+
+  testSchemaClipping(
+    "standard map",
+
+    parquetSchema =
+      """message root {
+        |  required group f0 (MAP) {
+        |    repeated group key_value {
+        |      required int32 key;
+        |      required group value {
+        |        required int32 value_f0;
+        |        required int64 value_f1;
+        |      }
+        |    }
+        |  }
+        |}
+      """.stripMargin,
+
+    catalystSchema = {
+      val valueType =
+        new StructType()
+          .add("value_f1", LongType, nullable = false)
+          .add("value_f2", DoubleType, nullable = false)
+
+      val f0Type = MapType(IntegerType, valueType, valueContainsNull = false)
+
+      new StructType().add("f0", f0Type, nullable = false)
+    },
+
+    expectedSchema =
+      """message root {
+        |  required group f0 (MAP) {
+        |    repeated group key_value {
+        |      required int32 key;
+        |      required group value {
+        |        required int64 value_f1;
+        |        required double value_f2;
+        |      }
+        |    }
+        |  }
+        |}
+      """.stripMargin)
+
+  testSchemaClipping(
+    "standard map with complex key",
+
+    parquetSchema =
+      """message root {
+        |  required group f0 (MAP) {
+        |    repeated group key_value {
+        |      required group key {
+        |        required int32 value_f0;
+        |        required int64 value_f1;
+        |      }
+        |      required int32 value;
+        |    }
+        |  }
+        |}
+      """.stripMargin,
+
+    catalystSchema = {
+      val keyType =
+        new StructType()
+          .add("value_f1", LongType, nullable = false)
+          .add("value_f2", DoubleType, nullable = false)
+
+      val f0Type = MapType(keyType, IntegerType, valueContainsNull = false)
+
+      new StructType().add("f0", f0Type, nullable = false)
+    },
+
+    expectedSchema =
+      """message root {
+        |  required group f0 (MAP) {
+        |    repeated group key_value {
+        |      required group key {
+        |        required int64 value_f1;
+        |        required double value_f2;
+        |      }
+        |      required int32 value;
+        |    }
+        |  }
+        |}
+      """.stripMargin)
 }