[SPARK-28878][SQL][FOLLOWUP] Remove extra project for DSv2 streaming scan

### What changes were proposed in this pull request?

Remove the project node if the streaming scan is columnar

### Why are the changes needed?

This is a followup of https://github.com/apache/spark/pull/25586. Batch and streaming share the same DS v2 read API so both can support columnar reads. We should apply #25586 to streaming scan as well.

### Does this PR introduce any user-facing change?

no

### How was this patch tested?

existing tests

Closes #25727 from cloud-fan/follow.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
Wenchen Fan 2019-09-10 11:01:57 +08:00
parent 86fc890d8c
commit c2d8ee9c54

View file

@ -155,17 +155,30 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
case r: StreamingDataSourceV2Relation if r.startOffset.isDefined && r.endOffset.isDefined =>
val microBatchStream = r.stream.asInstanceOf[MicroBatchStream]
// ensure there is a projection, which will produce unsafe rows required by some operators
ProjectExec(r.output,
MicroBatchScanExec(
r.output, r.scan, microBatchStream, r.startOffset.get, r.endOffset.get)) :: Nil
val scanExec = MicroBatchScanExec(
r.output, r.scan, microBatchStream, r.startOffset.get, r.endOffset.get)
val withProjection = if (scanExec.supportsColumnar) {
scanExec
} else {
// Add a Project here to make sure we produce unsafe rows.
ProjectExec(r.output, scanExec)
}
withProjection :: Nil
case r: StreamingDataSourceV2Relation if r.startOffset.isDefined && r.endOffset.isEmpty =>
val continuousStream = r.stream.asInstanceOf[ContinuousStream]
// ensure there is a projection, which will produce unsafe rows required by some operators
ProjectExec(r.output,
ContinuousScanExec(
r.output, r.scan, continuousStream, r.startOffset.get)) :: Nil
val scanExec = ContinuousScanExec(r.output, r.scan, continuousStream, r.startOffset.get)
val withProjection = if (scanExec.supportsColumnar) {
scanExec
} else {
// Add a Project here to make sure we produce unsafe rows.
ProjectExec(r.output, scanExec)
}
withProjection :: Nil
case WriteToDataSourceV2(writer, query) =>
WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil