[SPARK-27971][SQL][R] MapPartitionsInRWithArrowExec.evaluate shouldn't eagerly read the first batch

## What changes were proposed in this pull request?

This PR is the same fix as https://github.com/apache/spark/pull/24816 but in vectorized `dapply` in SparkR.

## How was this patch tested?

Manually tested.

Closes #24818 from HyukjinKwon/SPARK-27971.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
This commit is contained in:
HyukjinKwon 2019-06-09 11:40:20 +09:00
parent e561e92765
commit 6dcf09becc

View file

@ -243,28 +243,11 @@ case class MapPartitionsInRWithArrowExec(
// binary in a batch due to the limitation of R API. See also ARROW-4512.
val columnarBatchIter = runner.compute(batchIter, -1)
val outputProject = UnsafeProjection.create(output, output)
new Iterator[InternalRow] {
private var currentIter = if (columnarBatchIter.hasNext) {
val batch = columnarBatchIter.next()
val actualDataTypes = (0 until batch.numCols()).map(i => batch.column(i).dataType())
assert(outputTypes == actualDataTypes, "Invalid schema from dapply(): " +
s"expected ${outputTypes.mkString(", ")}, got ${actualDataTypes.mkString(", ")}")
batch.rowIterator.asScala
} else {
Iterator.empty
}
override def hasNext: Boolean = currentIter.hasNext || {
if (columnarBatchIter.hasNext) {
currentIter = columnarBatchIter.next().rowIterator.asScala
hasNext
} else {
false
}
}
override def next(): InternalRow = currentIter.next()
columnarBatchIter.flatMap { batch =>
val actualDataTypes = (0 until batch.numCols()).map(i => batch.column(i).dataType())
assert(outputTypes == actualDataTypes, "Invalid schema from dapply(): " +
s"expected ${outputTypes.mkString(", ")}, got ${actualDataTypes.mkString(", ")}")
batch.rowIterator.asScala
}.map(outputProject)
}
}