[SPARK-35566][SS] Fix StateStoreRestoreExec output rows

### What changes were proposed in this pull request?

This is a minor change to update how `StateStoreRestoreExec` computes its number of output rows. Previously we only count input rows, but the optionally restored rows are not counted in.

### Why are the changes needed?

Currently the number of output rows of `StateStoreRestoreExec` only counts the each input row. But it actually outputs input rows + optional restored rows. We should provide correct number of output rows.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.

Closes #32703 from viirya/fix-outputrows.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
This commit is contained in:
Liang-Chi Hsieh 2021-05-31 16:45:56 +09:00 committed by Hyukjin Kwon
parent c225196be0
commit 73ba4492b1

View file

@ -263,8 +263,9 @@ case class StateStoreRestoreExec(
iter.flatMap { row => iter.flatMap { row =>
val key = stateManager.getKey(row.asInstanceOf[UnsafeRow]) val key = stateManager.getKey(row.asInstanceOf[UnsafeRow])
val restoredRow = stateManager.get(store, key) val restoredRow = stateManager.get(store, key)
numOutputRows += 1 val outputRows = Option(restoredRow).toSeq :+ row
Option(restoredRow).toSeq :+ row numOutputRows += outputRows.size
outputRows
} }
} }
} }