[SPARK-23247][SQL] combines Unsafe operations and statistics operations in Scan Data Source

## What changes were proposed in this pull request?

Currently, we scan the execution plan of the data source, first the unsafe operation of each row of data, and then re traverse the data for the count of rows. In terms of performance, this is not necessary. this PR combines the two operations and makes statistics on the number of rows while performing the unsafe operation.

Before modified,

```
val unsafeRow = rdd.mapPartitionsWithIndexInternal { (index, iter) =>
val proj = UnsafeProjection.create(schema)
 proj.initialize(index)
iter.map(proj)
}

val numOutputRows = longMetric("numOutputRows")
unsafeRow.map { r =>
numOutputRows += 1
 r
}
```
After modified,

    val numOutputRows = longMetric("numOutputRows")

    rdd.mapPartitionsWithIndexInternal { (index, iter) =>
      val proj = UnsafeProjection.create(schema)
      proj.initialize(index)
      iter.map( r => {
        numOutputRows += 1
        proj(r)
      })
    }

## How was this patch tested?

the existed test cases.

Author: caoxuewen <cao.xuewen@zte.com.cn>

Closes #20415 from heary-cao/DataSourceScanExec.
This commit is contained in:
caoxuewen 2018-02-01 12:05:12 +08:00 committed by Wenchen Fan
parent 52e00f7066
commit 2ac895be90

View file

@ -90,16 +90,15 @@ case class RowDataSourceScanExec(
Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
protected override def doExecute(): RDD[InternalRow] = {
val unsafeRow = rdd.mapPartitionsWithIndexInternal { (index, iter) =>
val numOutputRows = longMetric("numOutputRows")
rdd.mapPartitionsWithIndexInternal { (index, iter) =>
val proj = UnsafeProjection.create(schema)
proj.initialize(index)
iter.map(proj)
}
val numOutputRows = longMetric("numOutputRows")
unsafeRow.map { r =>
numOutputRows += 1
r
iter.map( r => {
numOutputRows += 1
proj(r)
})
}
}
@ -326,22 +325,22 @@ case class FileSourceScanExec(
// 2) the number of columns should be smaller than spark.sql.codegen.maxFields
WholeStageCodegenExec(this)(codegenStageId = 0).execute()
} else {
val unsafeRows = {
val scan = inputRDD
if (needsUnsafeRowConversion) {
scan.mapPartitionsWithIndexInternal { (index, iter) =>
val proj = UnsafeProjection.create(schema)
proj.initialize(index)
iter.map(proj)
}
} else {
scan
}
}
val numOutputRows = longMetric("numOutputRows")
unsafeRows.map { r =>
numOutputRows += 1
r
if (needsUnsafeRowConversion) {
inputRDD.mapPartitionsWithIndexInternal { (index, iter) =>
val proj = UnsafeProjection.create(schema)
proj.initialize(index)
iter.map( r => {
numOutputRows += 1
proj(r)
})
}
} else {
inputRDD.map { r =>
numOutputRows += 1
r
}
}
}
}