[MINOR][SS][TEST] Remove unsupportedOperationCheck setting for TextSocketStreamSuite

### What changes were proposed in this pull request?

This patch simply removes a few `unsupportedOperationCheck` settings in `TextSocketStreamSuite`.

### Why are the changes needed?

`unsupportedOperationCheck` is used to disable the check for unsupported operations. Since these tests do not exercise unsupported operations, the setting was unnecessary in `TextSocketStreamSuite`, and disabling the check could let unexpected errors go undetected.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.

Closes #34132 from viirya/minor-test.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
This commit is contained in:
Liang-Chi Hsieh 2021-09-28 17:45:22 -07:00
parent 05c0fa5738
commit bfcc596398

View file

@ -34,7 +34,6 @@ import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.continuous._ import org.apache.spark.sql.execution.streaming.continuous._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.{StreamingQueryException, StreamTest} import org.apache.spark.sql.streaming.{StreamingQueryException, StreamTest}
import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._ import org.apache.spark.sql.types._
@ -96,80 +95,76 @@ class TextSocketStreamSuite extends StreamTest with SharedSparkSession {
serverThread = new ServerThread() serverThread = new ServerThread()
serverThread.start() serverThread.start()
withSQLConf(SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED.key -> "false") { val ref = spark
val ref = spark import ref.implicits._
import ref.implicits._
val socket = spark val socket = spark
.readStream .readStream
.format("socket") .format("socket")
.options(Map("host" -> "localhost", "port" -> serverThread.port.toString)) .options(Map("host" -> "localhost", "port" -> serverThread.port.toString))
.load() .load()
.as[String] .as[String]
assert(socket.schema === StructType(StructField("value", StringType) :: Nil)) assert(socket.schema === StructType(StructField("value", StringType) :: Nil))
testStream(socket)( testStream(socket)(
StartStream(), StartStream(),
AddSocketData("hello"), AddSocketData("hello"),
CheckAnswer("hello"), CheckAnswer("hello"),
AddSocketData("world"), AddSocketData("world"),
CheckLastBatch("world"), CheckLastBatch("world"),
CheckAnswer("hello", "world"), CheckAnswer("hello", "world"),
StopStream StopStream
) )
}
} }
test("timestamped usage") { test("timestamped usage") {
serverThread = new ServerThread() serverThread = new ServerThread()
serverThread.start() serverThread.start()
withSQLConf(SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED.key -> "false") { val socket = spark
val socket = spark .readStream
.readStream .format("socket")
.format("socket") .options(Map(
.options(Map( "host" -> "localhost",
"host" -> "localhost", "port" -> serverThread.port.toString,
"port" -> serverThread.port.toString, "includeTimestamp" -> "true"))
"includeTimestamp" -> "true")) .load()
.load()
assert(socket.schema === StructType(StructField("value", StringType) :: assert(socket.schema === StructType(StructField("value", StringType) ::
StructField("timestamp", TimestampType) :: Nil)) StructField("timestamp", TimestampType) :: Nil))
var batch1Stamp: Timestamp = null var batch1Stamp: Timestamp = null
var batch2Stamp: Timestamp = null var batch2Stamp: Timestamp = null
val curr = System.currentTimeMillis() val curr = System.currentTimeMillis()
testStream(socket)( testStream(socket)(
StartStream(), StartStream(),
AddSocketData("hello"), AddSocketData("hello"),
CheckAnswerRowsByFunc( CheckAnswerRowsByFunc(
rows => { rows => {
assert(rows.size === 1) assert(rows.size === 1)
assert(rows.head.getAs[String](0) === "hello") assert(rows.head.getAs[String](0) === "hello")
batch1Stamp = rows.head.getAs[Timestamp](1) batch1Stamp = rows.head.getAs[Timestamp](1)
Thread.sleep(10) Thread.sleep(10)
}, },
true), true),
AddSocketData("world"), AddSocketData("world"),
CheckAnswerRowsByFunc( CheckAnswerRowsByFunc(
rows => { rows => {
assert(rows.size === 1) assert(rows.size === 1)
assert(rows.head.getAs[String](0) === "world") assert(rows.head.getAs[String](0) === "world")
batch2Stamp = rows.head.getAs[Timestamp](1) batch2Stamp = rows.head.getAs[Timestamp](1)
}, },
true), true),
StopStream StopStream
) )
// Timestamp for rate stream is round to second which leads to milliseconds lost, that will // Timestamp for rate stream is round to second which leads to milliseconds lost, that will
// make batch1stamp smaller than current timestamp if both of them are in the same second. // make batch1stamp smaller than current timestamp if both of them are in the same second.
// Comparing by second to make sure the correct behavior. // Comparing by second to make sure the correct behavior.
assert(batch1Stamp.getTime >= SECONDS.toMillis(MILLISECONDS.toSeconds(curr))) assert(batch1Stamp.getTime >= SECONDS.toMillis(MILLISECONDS.toSeconds(curr)))
assert(!batch2Stamp.before(batch1Stamp)) assert(!batch2Stamp.before(batch1Stamp))
}
} }
test("params not given") { test("params not given") {
@ -209,51 +204,67 @@ class TextSocketStreamSuite extends StreamTest with SharedSparkSession {
serverThread = new ServerThread() serverThread = new ServerThread()
serverThread.start() serverThread.start()
withSQLConf(SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED.key -> "false") { val ref = spark
val ref = spark import ref.implicits._
import ref.implicits._
val socket = spark val socket = spark
.readStream .readStream
.format("socket") .format("socket")
.options(Map("host" -> "localhost", "port" -> serverThread.port.toString)) .options(Map("host" -> "localhost", "port" -> serverThread.port.toString))
.load() .load()
.as[String] .as[String]
assert(socket.schema === StructType(StructField("value", StringType) :: Nil)) assert(socket.schema === StructType(StructField("value", StringType) :: Nil))
testStream(socket)( testStream(socket)(
StartStream(), StartStream(),
AddSocketData("hello"), AddSocketData("hello"),
CheckAnswer("hello"), CheckAnswer("hello"),
AssertOnQuery { q => AssertOnQuery { q =>
val numRowMetric = val numRowMetric =
q.lastExecution.executedPlan.collectLeaves().head.metrics.get("numOutputRows") q.lastExecution.executedPlan.collectLeaves().head.metrics.get("numOutputRows")
numRowMetric.nonEmpty && numRowMetric.get.value == 1 numRowMetric.nonEmpty && numRowMetric.get.value == 1
}, },
StopStream StopStream
) )
}
} }
test("verify ServerThread only accepts the first connection") { test("verify ServerThread only accepts the first connection") {
serverThread = new ServerThread() serverThread = new ServerThread()
serverThread.start() serverThread.start()
withSQLConf(SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED.key -> "false") { val ref = spark
val ref = spark import ref.implicits._
import ref.implicits._
val socket = spark val socket = spark
.readStream
.format("socket")
.options(Map("host" -> "localhost", "port" -> serverThread.port.toString))
.load()
.as[String]
assert(socket.schema === StructType(StructField("value", StringType) :: Nil))
testStream(socket)(
StartStream(),
AddSocketData("hello"),
CheckAnswer("hello"),
AddSocketData("world"),
CheckLastBatch("world"),
CheckAnswer("hello", "world"),
StopStream
)
// we are trying to connect to the server once again which should fail
try {
val socket2 = spark
.readStream .readStream
.format("socket") .format("socket")
.options(Map("host" -> "localhost", "port" -> serverThread.port.toString)) .options(Map("host" -> "localhost", "port" -> serverThread.port.toString))
.load() .load()
.as[String] .as[String]
assert(socket.schema === StructType(StructField("value", StringType) :: Nil)) testStream(socket2)(
testStream(socket)(
StartStream(), StartStream(),
AddSocketData("hello"), AddSocketData("hello"),
CheckAnswer("hello"), CheckAnswer("hello"),
@ -263,29 +274,9 @@ class TextSocketStreamSuite extends StreamTest with SharedSparkSession {
StopStream StopStream
) )
// we are trying to connect to the server once again which should fail fail("StreamingQueryException is expected!")
try { } catch {
val socket2 = spark case e: StreamingQueryException if e.cause.isInstanceOf[SocketException] => // pass
.readStream
.format("socket")
.options(Map("host" -> "localhost", "port" -> serverThread.port.toString))
.load()
.as[String]
testStream(socket2)(
StartStream(),
AddSocketData("hello"),
CheckAnswer("hello"),
AddSocketData("world"),
CheckLastBatch("world"),
CheckAnswer("hello", "world"),
StopStream
)
fail("StreamingQueryException is expected!")
} catch {
case e: StreamingQueryException if e.cause.isInstanceOf[SocketException] => // pass
}
} }
} }