[SPARK-15483][SQL] IncrementalExecution should use extra strategies.
## What changes were proposed in this pull request? Extra strategies does not work for streams because `IncrementalExecution` uses modified planner with stateful operations but it does not include extra strategies. This pr fixes `IncrementalExecution` to include extra strategies to use them. ## How was this patch tested? I added a test to check if extra strategies work for streams. Author: Takuya UESHIN <ueshin@happy-camper.st> Closes #13261 from ueshin/issues/SPARK-15483.
This commit is contained in:
parent
1cb347fbc4
commit
4b88067416
|
@ -36,7 +36,8 @@ class IncrementalExecution private[sql](
|
||||||
extends QueryExecution(sparkSession, logicalPlan) {
|
extends QueryExecution(sparkSession, logicalPlan) {
|
||||||
|
|
||||||
// TODO: make this always part of planning.
|
// TODO: make this always part of planning.
|
||||||
val stateStrategy = sparkSession.sessionState.planner.StatefulAggregationStrategy :: Nil
|
val stateStrategy = sparkSession.sessionState.planner.StatefulAggregationStrategy +:
|
||||||
|
sparkSession.sessionState.experimentalMethods.extraStrategies
|
||||||
|
|
||||||
// Modified planner with stateful operations.
|
// Modified planner with stateful operations.
|
||||||
override def planner: SparkPlanner =
|
override def planner: SparkPlanner =
|
||||||
|
|
|
@ -220,6 +220,21 @@ class StreamSuite extends StreamTest with SharedSQLContext {
|
||||||
CheckOffsetLogLatestBatchId(2),
|
CheckOffsetLogLatestBatchId(2),
|
||||||
CheckSinkLatestBatchId(2))
|
CheckSinkLatestBatchId(2))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test("insert an extraStrategy") {
|
||||||
|
try {
|
||||||
|
spark.experimental.extraStrategies = TestStrategy :: Nil
|
||||||
|
|
||||||
|
val inputData = MemoryStream[(String, Int)]
|
||||||
|
val df = inputData.toDS().map(_._1).toDF("a")
|
||||||
|
|
||||||
|
testStream(df)(
|
||||||
|
AddData(inputData, ("so slow", 1)),
|
||||||
|
CheckAnswer("so fast"))
|
||||||
|
} finally {
|
||||||
|
spark.experimental.extraStrategies = Nil
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in a new issue