[SPARK-30338][SQL] Avoid unnecessary InternalRow copies in ParquetRowConverter

### What changes were proposed in this pull request?

This PR modifies `ParquetRowConverter` to remove unnecessary `InternalRow.copy()` calls for structs that are directly nested in other structs.

### Why are the changes needed?

These changes can significantly improve performance when reading Parquet files that contain deeply nested structs with many fields.

The `ParquetRowConverter` uses per-field `Converter`s to handle individual fields. Internally, these converters may hold mutable state and may return mutable objects. In most cases, a converter is invoked only once per Parquet record (this is true for top-level fields, for example). However, arrays and maps may invoke their child element converters multiple times per Parquet record; in those cases we must be careful to copy any mutable outputs returned by child converters.
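
To make the hazard concrete, here is a minimal, self-contained sketch (a hypothetical `MutableRow` class, not Spark's actual converter machinery) of what goes wrong when a reused mutable buffer is stored in a multi-element container without copying:

```scala
// Hypothetical stand-in for a converter's reusable mutable output buffer.
class MutableRow(var value: Int) {
  def deepCopy(): MutableRow = new MutableRow(value)
}

object CopyDemo {
  def main(args: Array[String]): Unit = {
    val buffer = new MutableRow(0) // a single buffer reused for every element

    // Storing the buffer directly: all three slots alias the same object, so
    // mutating it for later elements clobbers the earlier ones.
    val unsafe = (1 to 3).map { i => buffer.value = i; buffer }
    println(unsafe.map(_.value)) // Vector(3, 3, 3)

    // Copying before storing preserves each element independently.
    val safe = (1 to 3).map { i => buffer.value = i; buffer.deepCopy() }
    println(safe.map(_.value)) // Vector(1, 2, 3)
  }
}
```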

In the existing code, `InternalRow`s are copied whenever they are stored into _any_ parent container (not just maps and arrays). This copying can be especially expensive for deeply nested fields, since a deep copy is performed at every level of nesting.

This PR modifies the code to avoid copies for structs that are directly nested in other structs; the inline code comments (visible in the diff below) give an argument for why this is safe.
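
Condensed, the new logic looks like this (a sketch of the change shown in the full diff below; `RowUpdater` and `ParentContainerUpdater` are `ParquetRowConverter`'s internal updater types):

```scala
// Only wrap the updater in a copying shim when the parent container is a map
// or array (i.e. when the parent's updater is not a RowUpdater).
val wrappedUpdater =
  if (updater.isInstanceOf[RowUpdater]) {
    // Parent is a struct: this converter runs at most once per record, so the
    // row can be stored without a defensive copy.
    updater
  } else {
    // Parent is a map or array: deep-copy each row before it is stored.
    new ParentContainerUpdater {
      override def set(value: Any): Unit =
        updater.set(value.asInstanceOf[SpecificInternalRow].copy())
    }
  }
```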

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

**Correctness**: I added new test cases to `ParquetIOSuite` to increase coverage of nested structs, including structs nested in arrays: previously this suite didn't test that case, so we lacked mutation coverage of this `copy()` code (the suite's tests still passed even when I deliberately removed the `.copy()` in all cases). I also added a test for maps with struct keys and modified the existing "map with struct values" test case to include maps with two elements, since the incorrect omission of a `copy()` can only be detected when a map has multiple elements.

**Performance**: I put together a simple local benchmark demonstrating the performance problems:

First, construct a nested schema:

```scala
  case class Inner(
    f1: Int,
    f2: Long,
    f3: String,
    f4: Int,
    f5: Long,
    f6: String,
    f7: Int,
    f8: Long,
    f9: String,
    f10: Int
  )

  case class Wrapper1(inner: Inner)
  case class Wrapper2(wrapper1: Wrapper1)
  case class Wrapper3(wrapper2: Wrapper2)
```

`Wrapper3`'s schema looks like:

```
root
 |-- wrapper2: struct (nullable = true)
 |    |-- wrapper1: struct (nullable = true)
 |    |    |-- inner: struct (nullable = true)
 |    |    |    |-- f1: integer (nullable = true)
 |    |    |    |-- f2: long (nullable = true)
 |    |    |    |-- f3: string (nullable = true)
 |    |    |    |-- f4: integer (nullable = true)
 |    |    |    |-- f5: long (nullable = true)
 |    |    |    |-- f6: string (nullable = true)
 |    |    |    |-- f7: integer (nullable = true)
 |    |    |    |-- f8: long (nullable = true)
 |    |    |    |-- f9: string (nullable = true)
 |    |    |    |-- f10: integer (nullable = true)
```
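
(For reference, the tree above is `printSchema()` output; it can be reproduced with something like the following, assuming a `spark` session is available:)

```scala
import spark.implicits._

Seq.empty[Wrapper3].toDS().printSchema()
```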

Next, generate some fake data:

```scala
  val data = spark.range(1, 1000 * 1000 * 25, 1, 1).map { i =>
    Wrapper3(Wrapper2(Wrapper1(Inner(
      i.toInt,
      i * 2,
      (i * 3).toString,
      (i * 4).toInt,
      i * 5,
      (i * 6).toString,
      (i * 7).toInt,
      i * 8,
      (i * 9).toString,
      (i * 10).toInt
    ))))
  }

  data.write.mode("overwrite").parquet("/tmp/parquet-test")
```

I then ran a simple benchmark consisting of

```scala
spark.read.parquet("/tmp/parquet-test").selectExpr("hash(*)").rdd.count()
```

where `hash(*)` forces decoding of all Parquet fields but avoids `RowEncoder` costs in the `.rdd.count()` stage.
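
For anyone reproducing this, one convenient way to time the scan end-to-end is `spark.time`, which prints the elapsed time of the wrapped block (exact numbers will vary with hardware and schema shape):

```scala
// Prints "Time taken: ... ms" for the whole scan-and-hash pipeline.
spark.time {
  spark.read.parquet("/tmp/parquet-test").selectExpr("hash(*)").rdd.count()
}
```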

In the old code, expensive copying takes place at every level of nesting; this is apparent in the following flame graph:

![image](https://user-images.githubusercontent.com/50748/71389014-88a15380-25af-11ea-9537-3e87a2aef179.png)

After this PR's changes, the above toy benchmark runs ~30% faster.

Closes #26993 from JoshRosen/joshrosen/faster-parquet-nested-scan-by-avoiding-copies.

Lead-authored-by: Josh Rosen <rosenville@gmail.com>
Co-authored-by: Josh Rosen <joshrosen@stripe.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

**sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala**

```diff
@@ -318,10 +318,34 @@ private[parquet] class ParquetRowConverter(
         new ParquetMapConverter(parquetType.asGroupType(), t, updater)

       case t: StructType =>
+        val wrappedUpdater = {
+          // SPARK-30338: avoid unnecessary InternalRow copying for nested structs:
+          // There are two cases to handle here:
+          //
+          //  1. Parent container is a map or array: we must make a deep copy of the mutable row
+          //     because this converter may be invoked multiple times per Parquet input record
+          //     (if the map or array contains multiple elements).
+          //
+          //  2. Parent container is a struct: we don't need to copy the row here because either:
+          //
+          //     (a) all ancestors are structs and therefore no copying is required because this
+          //         converter will only be invoked once per Parquet input record, or
+          //     (b) some ancestor is a struct that is nested in a map or array and that ancestor's
+          //         converter will perform deep-copying (which will recursively copy this row).
+          if (updater.isInstanceOf[RowUpdater]) {
+            // `updater` is a RowUpdater, implying that the parent container is a struct.
+            updater
+          } else {
+            // `updater` is NOT a RowUpdater, implying that the parent container is a map or array.
+            new ParentContainerUpdater {
+              override def set(value: Any): Unit = {
+                updater.set(value.asInstanceOf[SpecificInternalRow].copy()) // deep copy
+              }
+            }
+          }
+        }
         new ParquetRowConverter(
-          schemaConverter, parquetType.asGroupType(), t, convertTz, new ParentContainerUpdater {
-            override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy())
-          })
+          schemaConverter, parquetType.asGroupType(), t, convertTz, wrappedUpdater)

       case t =>
         throw new RuntimeException(
```

**sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala**

```diff
@@ -204,6 +204,42 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
     }
   }

+  testStandardAndLegacyModes("array of struct") {
+    val data = (1 to 4).map { i =>
+      Tuple1(
+        Seq(
+          Tuple1(s"1st_val_$i"),
+          Tuple1(s"2nd_val_$i")
+        )
+      )
+    }
+    withParquetDataFrame(data) { df =>
+      // Structs are converted to `Row`s
+      checkAnswer(df, data.map { case Tuple1(array) =>
+        Row(array.map(struct => Row(struct.productIterator.toSeq: _*)))
+      })
+    }
+  }
+
+  testStandardAndLegacyModes("array of nested struct") {
+    val data = (1 to 4).map { i =>
+      Tuple1(
+        Seq(
+          Tuple1(
+            Tuple1(s"1st_val_$i")),
+          Tuple1(
+            Tuple1(s"2nd_val_$i"))
+        )
+      )
+    }
+    withParquetDataFrame(data) { df =>
+      // Structs are converted to `Row`s
+      checkAnswer(df, data.map { case Tuple1(array) =>
+        Row(array.map { case Tuple1(Tuple1(str)) => Row(Row(str)) })
+      })
+    }
+  }
+
   testStandardAndLegacyModes("nested struct with array of array as field") {
     val data = (1 to 4).map(i => Tuple1((i, Seq(Seq(s"val_$i")))))
     withParquetDataFrame(data) { df =>
@@ -214,9 +250,34 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
     }
   }

-  testStandardAndLegacyModes("nested map with struct as value type") {
-    val data = (1 to 4).map(i => Tuple1(Map(i -> ((i, s"val_$i")))))
+  testStandardAndLegacyModes("nested map with struct as key type") {
+    val data = (1 to 4).map { i =>
+      Tuple1(
+        Map(
+          (i, s"kA_$i") -> s"vA_$i",
+          (i, s"kB_$i") -> s"vB_$i"
+        )
+      )
+    }
+    withParquetDataFrame(data) { df =>
+      // Structs are converted to `Row`s
+      checkAnswer(df, data.map { case Tuple1(m) =>
+        Row(m.map { case (k, v) => Row(k.productIterator.toSeq: _*) -> v })
+      })
+    }
+  }
+
+  testStandardAndLegacyModes("nested map with struct as value type") {
+    val data = (1 to 4).map { i =>
+      Tuple1(
+        Map(
+          s"kA_$i" -> ((i, s"vA_$i")),
+          s"kB_$i" -> ((i, s"vB_$i"))
+        )
+      )
+    }
     withParquetDataFrame(data) { df =>
       // Structs are converted to `Row`s
       checkAnswer(df, data.map { case Tuple1(m) =>
         Row(m.mapValues(struct => Row(struct.productIterator.toSeq: _*)))
       })
```