[SPARK-24061][SS] Add TypedFilter support for continuous processing

## What changes were proposed in this pull request?

Add TypedFilter support for continuous processing application.

## How was this patch tested?

unit tests

Author: wangyanlin01 <wangyanlin01@baidu.com>

Closes #21136 from yanlin-Lynn/SPARK-24061.
This commit is contained in:
wangyanlin01 2018-05-01 16:22:52 +08:00 committed by Shixiong Zhu
parent b857fb549f
commit 7bbec0dced
2 changed files with 25 additions and 1 deletions

View file

@ -345,7 +345,8 @@ object UnsupportedOperationChecker {
plan.foreachUp { implicit subPlan =>
subPlan match {
case (_: Project | _: Filter | _: MapElements | _: MapPartitions |
_: DeserializeToObject | _: SerializeFromObject | _: SubqueryAlias) =>
_: DeserializeToObject | _: SerializeFromObject | _: SubqueryAlias |
_: TypedFilter) =>
case node if node.nodeName == "StreamingRelationV2" =>
case node =>
throwError(s"Continuous processing does not support ${node.nodeName} operations.")

View file

@ -621,6 +621,13 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
outputMode = Append,
expectedMsgs = Seq("monotonically_increasing_id"))
assertSupportedForContinuousProcessing(
"TypedFilter", TypedFilter(
null,
null,
null,
null,
new TestStreamingRelationV2(attribute)), OutputMode.Append())
/*
=======================================================================================
@ -771,6 +778,16 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
}
}
/** Assert that the logical plan is supported for continuous procsssing mode */
def assertSupportedForContinuousProcessing(
name: String,
plan: LogicalPlan,
outputMode: OutputMode): Unit = {
test(s"continuous processing - $name: supported") {
UnsupportedOperationChecker.checkForContinuous(plan, outputMode)
}
}
/**
* Assert that the logical plan is not supported inside a streaming plan.
*
@ -840,4 +857,10 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
def this(attribute: Attribute) = this(Seq(attribute))
override def isStreaming: Boolean = true
}
case class TestStreamingRelationV2(output: Seq[Attribute]) extends LeafNode {
def this(attribute: Attribute) = this(Seq(attribute))
override def isStreaming: Boolean = true
override def nodeName: String = "StreamingRelationV2"
}
}