[SPARK-11462][STREAMING] Add JavaStreamingListener
Currently, StreamingListener is not Java friendly because it exposes some Scala collections to Java users directly, such as Option, Map. This PR added a Java version of StreamingListener and a bunch of Java friendly classes for Java users. Author: zsxwing <zsxwing@gmail.com> Author: Shixiong Zhu <shixiong@databricks.com> Closes #9420 from zsxwing/java-streaming-listener.
This commit is contained in:
parent
0ce6f9b2d2
commit
1f0f14efe3
|
@ -0,0 +1,168 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark.streaming.api.java
|
||||
|
||||
import org.apache.spark.streaming.Time
|
||||
|
||||
/**
|
||||
* A listener interface for receiving information about an ongoing streaming computation.
|
||||
*/
|
||||
private[streaming] class JavaStreamingListener {
|
||||
|
||||
/** Called when a receiver has been started */
|
||||
def onReceiverStarted(receiverStarted: JavaStreamingListenerReceiverStarted): Unit = { }
|
||||
|
||||
/** Called when a receiver has reported an error */
|
||||
def onReceiverError(receiverError: JavaStreamingListenerReceiverError): Unit = { }
|
||||
|
||||
/** Called when a receiver has been stopped */
|
||||
def onReceiverStopped(receiverStopped: JavaStreamingListenerReceiverStopped): Unit = { }
|
||||
|
||||
/** Called when a batch of jobs has been submitted for processing. */
|
||||
def onBatchSubmitted(batchSubmitted: JavaStreamingListenerBatchSubmitted): Unit = { }
|
||||
|
||||
/** Called when processing of a batch of jobs has started. */
|
||||
def onBatchStarted(batchStarted: JavaStreamingListenerBatchStarted): Unit = { }
|
||||
|
||||
/** Called when processing of a batch of jobs has completed. */
|
||||
def onBatchCompleted(batchCompleted: JavaStreamingListenerBatchCompleted): Unit = { }
|
||||
|
||||
/** Called when processing of a job of a batch has started. */
|
||||
def onOutputOperationStarted(
|
||||
outputOperationStarted: JavaStreamingListenerOutputOperationStarted): Unit = { }
|
||||
|
||||
/** Called when processing of a job of a batch has completed. */
|
||||
def onOutputOperationCompleted(
|
||||
outputOperationCompleted: JavaStreamingListenerOutputOperationCompleted): Unit = { }
|
||||
}
|
||||
|
||||
/**
|
||||
* Base trait for events related to JavaStreamingListener
|
||||
*/
|
||||
private[streaming] sealed trait JavaStreamingListenerEvent
|
||||
|
||||
private[streaming] class JavaStreamingListenerBatchSubmitted(val batchInfo: JavaBatchInfo)
|
||||
extends JavaStreamingListenerEvent
|
||||
|
||||
private[streaming] class JavaStreamingListenerBatchCompleted(val batchInfo: JavaBatchInfo)
|
||||
extends JavaStreamingListenerEvent
|
||||
|
||||
private[streaming] class JavaStreamingListenerBatchStarted(val batchInfo: JavaBatchInfo)
|
||||
extends JavaStreamingListenerEvent
|
||||
|
||||
private[streaming] class JavaStreamingListenerOutputOperationStarted(
|
||||
val outputOperationInfo: JavaOutputOperationInfo) extends JavaStreamingListenerEvent
|
||||
|
||||
private[streaming] class JavaStreamingListenerOutputOperationCompleted(
|
||||
val outputOperationInfo: JavaOutputOperationInfo) extends JavaStreamingListenerEvent
|
||||
|
||||
private[streaming] class JavaStreamingListenerReceiverStarted(val receiverInfo: JavaReceiverInfo)
|
||||
extends JavaStreamingListenerEvent
|
||||
|
||||
private[streaming] class JavaStreamingListenerReceiverError(val receiverInfo: JavaReceiverInfo)
|
||||
extends JavaStreamingListenerEvent
|
||||
|
||||
private[streaming] class JavaStreamingListenerReceiverStopped(val receiverInfo: JavaReceiverInfo)
|
||||
extends JavaStreamingListenerEvent
|
||||
|
||||
/**
|
||||
* Class having information on batches.
|
||||
*
|
||||
* @param batchTime Time of the batch
|
||||
* @param streamIdToInputInfo A map of input stream id to its input info
|
||||
* @param submissionTime Clock time of when jobs of this batch was submitted to the streaming
|
||||
* scheduler queue
|
||||
* @param processingStartTime Clock time of when the first job of this batch started processing.
|
||||
* `-1` means the batch has not yet started
|
||||
* @param processingEndTime Clock time of when the last job of this batch finished processing. `-1`
|
||||
* means the batch has not yet completed.
|
||||
* @param schedulingDelay Time taken for the first job of this batch to start processing from the
|
||||
* time this batch was submitted to the streaming scheduler. Essentially, it
|
||||
* is `processingStartTime` - `submissionTime`. `-1` means the batch has not
|
||||
* yet started
|
||||
* @param processingDelay Time taken for the all jobs of this batch to finish processing from the
|
||||
* time they started processing. Essentially, it is
|
||||
* `processingEndTime` - `processingStartTime`. `-1` means the batch has not
|
||||
* yet completed.
|
||||
* @param totalDelay Time taken for all the jobs of this batch to finish processing from the time
|
||||
* they were submitted. Essentially, it is `processingDelay` + `schedulingDelay`.
|
||||
* `-1` means the batch has not yet completed.
|
||||
* @param numRecords The number of recorders received by the receivers in this batch
|
||||
* @param outputOperationInfos The output operations in this batch
|
||||
*/
|
||||
private[streaming] case class JavaBatchInfo(
|
||||
batchTime: Time,
|
||||
streamIdToInputInfo: java.util.Map[Int, JavaStreamInputInfo],
|
||||
submissionTime: Long,
|
||||
processingStartTime: Long,
|
||||
processingEndTime: Long,
|
||||
schedulingDelay: Long,
|
||||
processingDelay: Long,
|
||||
totalDelay: Long,
|
||||
numRecords: Long,
|
||||
outputOperationInfos: java.util.Map[Int, JavaOutputOperationInfo])
|
||||
|
||||
/**
|
||||
* Track the information of input stream at specified batch time.
|
||||
*
|
||||
* @param inputStreamId the input stream id
|
||||
* @param numRecords the number of records in a batch
|
||||
* @param metadata metadata for this batch. It should contain at least one standard field named
|
||||
* "Description" which maps to the content that will be shown in the UI.
|
||||
* @param metadataDescription description of this input stream
|
||||
*/
|
||||
private[streaming] case class JavaStreamInputInfo(
|
||||
inputStreamId: Int,
|
||||
numRecords: Long,
|
||||
metadata: java.util.Map[String, Any],
|
||||
metadataDescription: String)
|
||||
|
||||
/**
|
||||
* Class having information about a receiver
|
||||
*/
|
||||
private[streaming] case class JavaReceiverInfo(
|
||||
streamId: Int,
|
||||
name: String,
|
||||
active: Boolean,
|
||||
location: String,
|
||||
lastErrorMessage: String,
|
||||
lastError: String,
|
||||
lastErrorTime: Long)
|
||||
|
||||
/**
|
||||
* Class having information on output operations.
|
||||
*
|
||||
* @param batchTime Time of the batch
|
||||
* @param id Id of this output operation. Different output operations have different ids in a batch.
|
||||
* @param name The name of this output operation.
|
||||
* @param description The description of this output operation.
|
||||
* @param startTime Clock time of when the output operation started processing. `-1` means the
|
||||
* output operation has not yet started
|
||||
* @param endTime Clock time of when the output operation started processing. `-1` means the output
|
||||
* operation has not yet completed
|
||||
* @param failureReason Failure reason if this output operation fails. If the output operation is
|
||||
* successful, this field is `null`.
|
||||
*/
|
||||
private[streaming] case class JavaOutputOperationInfo(
|
||||
batchTime: Time,
|
||||
id: Int,
|
||||
name: String,
|
||||
description: String,
|
||||
startTime: Long,
|
||||
endTime: Long,
|
||||
failureReason: String)
|
|
@ -0,0 +1,122 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark.streaming.api.java
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
import org.apache.spark.streaming.scheduler._
|
||||
|
||||
/**
|
||||
* A wrapper to convert a [[JavaStreamingListener]] to a [[StreamingListener]].
|
||||
*/
|
||||
private[streaming] class JavaStreamingListenerWrapper(javaStreamingListener: JavaStreamingListener)
|
||||
extends StreamingListener {
|
||||
|
||||
private def toJavaReceiverInfo(receiverInfo: ReceiverInfo): JavaReceiverInfo = {
|
||||
JavaReceiverInfo(
|
||||
receiverInfo.streamId,
|
||||
receiverInfo.name,
|
||||
receiverInfo.active,
|
||||
receiverInfo.location,
|
||||
receiverInfo.lastErrorMessage,
|
||||
receiverInfo.lastError,
|
||||
receiverInfo.lastErrorTime
|
||||
)
|
||||
}
|
||||
|
||||
private def toJavaStreamInputInfo(streamInputInfo: StreamInputInfo): JavaStreamInputInfo = {
|
||||
JavaStreamInputInfo(
|
||||
streamInputInfo.inputStreamId,
|
||||
streamInputInfo.numRecords: Long,
|
||||
streamInputInfo.metadata.asJava,
|
||||
streamInputInfo.metadataDescription.orNull
|
||||
)
|
||||
}
|
||||
|
||||
private def toJavaOutputOperationInfo(
|
||||
outputOperationInfo: OutputOperationInfo): JavaOutputOperationInfo = {
|
||||
JavaOutputOperationInfo(
|
||||
outputOperationInfo.batchTime,
|
||||
outputOperationInfo.id,
|
||||
outputOperationInfo.name,
|
||||
outputOperationInfo.description: String,
|
||||
outputOperationInfo.startTime.getOrElse(-1),
|
||||
outputOperationInfo.endTime.getOrElse(-1),
|
||||
outputOperationInfo.failureReason.orNull
|
||||
)
|
||||
}
|
||||
|
||||
private def toJavaBatchInfo(batchInfo: BatchInfo): JavaBatchInfo = {
|
||||
JavaBatchInfo(
|
||||
batchInfo.batchTime,
|
||||
batchInfo.streamIdToInputInfo.mapValues(toJavaStreamInputInfo(_)).asJava,
|
||||
batchInfo.submissionTime,
|
||||
batchInfo.processingStartTime.getOrElse(-1),
|
||||
batchInfo.processingEndTime.getOrElse(-1),
|
||||
batchInfo.schedulingDelay.getOrElse(-1),
|
||||
batchInfo.processingDelay.getOrElse(-1),
|
||||
batchInfo.totalDelay.getOrElse(-1),
|
||||
batchInfo.numRecords,
|
||||
batchInfo.outputOperationInfos.mapValues(toJavaOutputOperationInfo(_)).asJava
|
||||
)
|
||||
}
|
||||
|
||||
override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {
|
||||
javaStreamingListener.onReceiverStarted(
|
||||
new JavaStreamingListenerReceiverStarted(toJavaReceiverInfo(receiverStarted.receiverInfo)))
|
||||
}
|
||||
|
||||
override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
|
||||
javaStreamingListener.onReceiverError(
|
||||
new JavaStreamingListenerReceiverError(toJavaReceiverInfo(receiverError.receiverInfo)))
|
||||
}
|
||||
|
||||
override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit = {
|
||||
javaStreamingListener.onReceiverStopped(
|
||||
new JavaStreamingListenerReceiverStopped(toJavaReceiverInfo(receiverStopped.receiverInfo)))
|
||||
}
|
||||
|
||||
override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
|
||||
javaStreamingListener.onBatchSubmitted(
|
||||
new JavaStreamingListenerBatchSubmitted(toJavaBatchInfo(batchSubmitted.batchInfo)))
|
||||
}
|
||||
|
||||
override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
|
||||
javaStreamingListener.onBatchStarted(
|
||||
new JavaStreamingListenerBatchStarted(toJavaBatchInfo(batchStarted.batchInfo)))
|
||||
}
|
||||
|
||||
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
|
||||
javaStreamingListener.onBatchCompleted(
|
||||
new JavaStreamingListenerBatchCompleted(toJavaBatchInfo(batchCompleted.batchInfo)))
|
||||
}
|
||||
|
||||
override def onOutputOperationStarted(
|
||||
outputOperationStarted: StreamingListenerOutputOperationStarted): Unit = {
|
||||
javaStreamingListener.onOutputOperationStarted(new JavaStreamingListenerOutputOperationStarted(
|
||||
toJavaOutputOperationInfo(outputOperationStarted.outputOperationInfo)))
|
||||
}
|
||||
|
||||
override def onOutputOperationCompleted(
|
||||
outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = {
|
||||
javaStreamingListener.onOutputOperationCompleted(
|
||||
new JavaStreamingListenerOutputOperationCompleted(
|
||||
toJavaOutputOperationInfo(outputOperationCompleted.outputOperationInfo)))
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,85 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
|
||||
package org.apache.spark.streaming;
|
||||
|
||||
import org.apache.spark.streaming.api.java.*;
|
||||
|
||||
public class JavaStreamingListenerAPISuite extends JavaStreamingListener {
|
||||
|
||||
@Override
|
||||
public void onReceiverStarted(JavaStreamingListenerReceiverStarted receiverStarted) {
|
||||
JavaReceiverInfo receiverInfo = receiverStarted.receiverInfo();
|
||||
receiverInfo.streamId();
|
||||
receiverInfo.name();
|
||||
receiverInfo.active();
|
||||
receiverInfo.location();
|
||||
receiverInfo.lastErrorMessage();
|
||||
receiverInfo.lastError();
|
||||
receiverInfo.lastErrorTime();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onReceiverError(JavaStreamingListenerReceiverError receiverError) {
|
||||
JavaReceiverInfo receiverInfo = receiverError.receiverInfo();
|
||||
receiverInfo.streamId();
|
||||
receiverInfo.name();
|
||||
receiverInfo.active();
|
||||
receiverInfo.location();
|
||||
receiverInfo.lastErrorMessage();
|
||||
receiverInfo.lastError();
|
||||
receiverInfo.lastErrorTime();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onReceiverStopped(JavaStreamingListenerReceiverStopped receiverStopped) {
|
||||
JavaReceiverInfo receiverInfo = receiverStopped.receiverInfo();
|
||||
receiverInfo.streamId();
|
||||
receiverInfo.name();
|
||||
receiverInfo.active();
|
||||
receiverInfo.location();
|
||||
receiverInfo.lastErrorMessage();
|
||||
receiverInfo.lastError();
|
||||
receiverInfo.lastErrorTime();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onBatchSubmitted(JavaStreamingListenerBatchSubmitted batchSubmitted) {
|
||||
super.onBatchSubmitted(batchSubmitted);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onBatchStarted(JavaStreamingListenerBatchStarted batchStarted) {
|
||||
super.onBatchStarted(batchStarted);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onBatchCompleted(JavaStreamingListenerBatchCompleted batchCompleted) {
|
||||
super.onBatchCompleted(batchCompleted);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onOutputOperationStarted(JavaStreamingListenerOutputOperationStarted outputOperationStarted) {
|
||||
super.onOutputOperationStarted(outputOperationStarted);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onOutputOperationCompleted(JavaStreamingListenerOutputOperationCompleted outputOperationCompleted) {
|
||||
super.onOutputOperationCompleted(outputOperationCompleted);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,290 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark.streaming.api.java
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
import org.apache.spark.SparkFunSuite
|
||||
import org.apache.spark.streaming.Time
|
||||
import org.apache.spark.streaming.scheduler._
|
||||
|
||||
class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
|
||||
|
||||
test("basic") {
|
||||
val listener = new TestJavaStreamingListener()
|
||||
val listenerWrapper = new JavaStreamingListenerWrapper(listener)
|
||||
|
||||
val receiverStarted = StreamingListenerReceiverStarted(ReceiverInfo(
|
||||
streamId = 2,
|
||||
name = "test",
|
||||
active = true,
|
||||
location = "localhost"
|
||||
))
|
||||
listenerWrapper.onReceiverStarted(receiverStarted)
|
||||
assertReceiverInfo(listener.receiverStarted.receiverInfo, receiverStarted.receiverInfo)
|
||||
|
||||
val receiverStopped = StreamingListenerReceiverStopped(ReceiverInfo(
|
||||
streamId = 2,
|
||||
name = "test",
|
||||
active = false,
|
||||
location = "localhost"
|
||||
))
|
||||
listenerWrapper.onReceiverStopped(receiverStopped)
|
||||
assertReceiverInfo(listener.receiverStopped.receiverInfo, receiverStopped.receiverInfo)
|
||||
|
||||
val receiverError = StreamingListenerReceiverError(ReceiverInfo(
|
||||
streamId = 2,
|
||||
name = "test",
|
||||
active = false,
|
||||
location = "localhost",
|
||||
lastErrorMessage = "failed",
|
||||
lastError = "failed",
|
||||
lastErrorTime = System.currentTimeMillis()
|
||||
))
|
||||
listenerWrapper.onReceiverError(receiverError)
|
||||
assertReceiverInfo(listener.receiverError.receiverInfo, receiverError.receiverInfo)
|
||||
|
||||
val batchSubmitted = StreamingListenerBatchSubmitted(BatchInfo(
|
||||
batchTime = Time(1000L),
|
||||
streamIdToInputInfo = Map(
|
||||
0 -> StreamInputInfo(
|
||||
inputStreamId = 0,
|
||||
numRecords = 1000,
|
||||
metadata = Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "receiver1")),
|
||||
1 -> StreamInputInfo(
|
||||
inputStreamId = 1,
|
||||
numRecords = 2000,
|
||||
metadata = Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "receiver2"))),
|
||||
submissionTime = 1001L,
|
||||
None,
|
||||
None,
|
||||
outputOperationInfos = Map(
|
||||
0 -> OutputOperationInfo(
|
||||
batchTime = Time(1000L),
|
||||
id = 0,
|
||||
name = "op1",
|
||||
description = "operation1",
|
||||
startTime = None,
|
||||
endTime = None,
|
||||
failureReason = None),
|
||||
1 -> OutputOperationInfo(
|
||||
batchTime = Time(1000L),
|
||||
id = 1,
|
||||
name = "op2",
|
||||
description = "operation2",
|
||||
startTime = None,
|
||||
endTime = None,
|
||||
failureReason = None))
|
||||
))
|
||||
listenerWrapper.onBatchSubmitted(batchSubmitted)
|
||||
assertBatchInfo(listener.batchSubmitted.batchInfo, batchSubmitted.batchInfo)
|
||||
|
||||
val batchStarted = StreamingListenerBatchStarted(BatchInfo(
|
||||
batchTime = Time(1000L),
|
||||
streamIdToInputInfo = Map(
|
||||
0 -> StreamInputInfo(
|
||||
inputStreamId = 0,
|
||||
numRecords = 1000,
|
||||
metadata = Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "receiver1")),
|
||||
1 -> StreamInputInfo(
|
||||
inputStreamId = 1,
|
||||
numRecords = 2000,
|
||||
metadata = Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "receiver2"))),
|
||||
submissionTime = 1001L,
|
||||
Some(1002L),
|
||||
None,
|
||||
outputOperationInfos = Map(
|
||||
0 -> OutputOperationInfo(
|
||||
batchTime = Time(1000L),
|
||||
id = 0,
|
||||
name = "op1",
|
||||
description = "operation1",
|
||||
startTime = Some(1003L),
|
||||
endTime = None,
|
||||
failureReason = None),
|
||||
1 -> OutputOperationInfo(
|
||||
batchTime = Time(1000L),
|
||||
id = 1,
|
||||
name = "op2",
|
||||
description = "operation2",
|
||||
startTime = Some(1005L),
|
||||
endTime = None,
|
||||
failureReason = None))
|
||||
))
|
||||
listenerWrapper.onBatchStarted(batchStarted)
|
||||
assertBatchInfo(listener.batchStarted.batchInfo, batchStarted.batchInfo)
|
||||
|
||||
val batchCompleted = StreamingListenerBatchCompleted(BatchInfo(
|
||||
batchTime = Time(1000L),
|
||||
streamIdToInputInfo = Map(
|
||||
0 -> StreamInputInfo(
|
||||
inputStreamId = 0,
|
||||
numRecords = 1000,
|
||||
metadata = Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "receiver1")),
|
||||
1 -> StreamInputInfo(
|
||||
inputStreamId = 1,
|
||||
numRecords = 2000,
|
||||
metadata = Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "receiver2"))),
|
||||
submissionTime = 1001L,
|
||||
Some(1002L),
|
||||
Some(1010L),
|
||||
outputOperationInfos = Map(
|
||||
0 -> OutputOperationInfo(
|
||||
batchTime = Time(1000L),
|
||||
id = 0,
|
||||
name = "op1",
|
||||
description = "operation1",
|
||||
startTime = Some(1003L),
|
||||
endTime = Some(1004L),
|
||||
failureReason = None),
|
||||
1 -> OutputOperationInfo(
|
||||
batchTime = Time(1000L),
|
||||
id = 1,
|
||||
name = "op2",
|
||||
description = "operation2",
|
||||
startTime = Some(1005L),
|
||||
endTime = Some(1010L),
|
||||
failureReason = None))
|
||||
))
|
||||
listenerWrapper.onBatchCompleted(batchCompleted)
|
||||
assertBatchInfo(listener.batchCompleted.batchInfo, batchCompleted.batchInfo)
|
||||
|
||||
val outputOperationStarted = StreamingListenerOutputOperationStarted(OutputOperationInfo(
|
||||
batchTime = Time(1000L),
|
||||
id = 0,
|
||||
name = "op1",
|
||||
description = "operation1",
|
||||
startTime = Some(1003L),
|
||||
endTime = None,
|
||||
failureReason = None
|
||||
))
|
||||
listenerWrapper.onOutputOperationStarted(outputOperationStarted)
|
||||
assertOutputOperationInfo(listener.outputOperationStarted.outputOperationInfo,
|
||||
outputOperationStarted.outputOperationInfo)
|
||||
|
||||
val outputOperationCompleted = StreamingListenerOutputOperationCompleted(OutputOperationInfo(
|
||||
batchTime = Time(1000L),
|
||||
id = 0,
|
||||
name = "op1",
|
||||
description = "operation1",
|
||||
startTime = Some(1003L),
|
||||
endTime = Some(1004L),
|
||||
failureReason = None
|
||||
))
|
||||
listenerWrapper.onOutputOperationCompleted(outputOperationCompleted)
|
||||
assertOutputOperationInfo(listener.outputOperationCompleted.outputOperationInfo,
|
||||
outputOperationCompleted.outputOperationInfo)
|
||||
}
|
||||
|
||||
private def assertReceiverInfo(
|
||||
javaReceiverInfo: JavaReceiverInfo, receiverInfo: ReceiverInfo): Unit = {
|
||||
assert(javaReceiverInfo.streamId === receiverInfo.streamId)
|
||||
assert(javaReceiverInfo.name === receiverInfo.name)
|
||||
assert(javaReceiverInfo.active === receiverInfo.active)
|
||||
assert(javaReceiverInfo.location === receiverInfo.location)
|
||||
assert(javaReceiverInfo.lastErrorMessage === receiverInfo.lastErrorMessage)
|
||||
assert(javaReceiverInfo.lastError === receiverInfo.lastError)
|
||||
assert(javaReceiverInfo.lastErrorTime === receiverInfo.lastErrorTime)
|
||||
}
|
||||
|
||||
private def assertBatchInfo(javaBatchInfo: JavaBatchInfo, batchInfo: BatchInfo): Unit = {
|
||||
assert(javaBatchInfo.batchTime === batchInfo.batchTime)
|
||||
assert(javaBatchInfo.streamIdToInputInfo.size === batchInfo.streamIdToInputInfo.size)
|
||||
batchInfo.streamIdToInputInfo.foreach { case (streamId, streamInputInfo) =>
|
||||
assertStreamingInfo(javaBatchInfo.streamIdToInputInfo.get(streamId), streamInputInfo)
|
||||
}
|
||||
assert(javaBatchInfo.submissionTime === batchInfo.submissionTime)
|
||||
assert(javaBatchInfo.processingStartTime === batchInfo.processingStartTime.getOrElse(-1))
|
||||
assert(javaBatchInfo.processingEndTime === batchInfo.processingEndTime.getOrElse(-1))
|
||||
assert(javaBatchInfo.schedulingDelay === batchInfo.schedulingDelay.getOrElse(-1))
|
||||
assert(javaBatchInfo.processingDelay === batchInfo.processingDelay.getOrElse(-1))
|
||||
assert(javaBatchInfo.totalDelay === batchInfo.totalDelay.getOrElse(-1))
|
||||
assert(javaBatchInfo.numRecords === batchInfo.numRecords)
|
||||
assert(javaBatchInfo.outputOperationInfos.size === batchInfo.outputOperationInfos.size)
|
||||
batchInfo.outputOperationInfos.foreach { case (outputOperationId, outputOperationInfo) =>
|
||||
assertOutputOperationInfo(
|
||||
javaBatchInfo.outputOperationInfos.get(outputOperationId), outputOperationInfo)
|
||||
}
|
||||
}
|
||||
|
||||
private def assertStreamingInfo(
|
||||
javaStreamInputInfo: JavaStreamInputInfo, streamInputInfo: StreamInputInfo): Unit = {
|
||||
assert(javaStreamInputInfo.inputStreamId === streamInputInfo.inputStreamId)
|
||||
assert(javaStreamInputInfo.numRecords === streamInputInfo.numRecords)
|
||||
assert(javaStreamInputInfo.metadata === streamInputInfo.metadata.asJava)
|
||||
assert(javaStreamInputInfo.metadataDescription === streamInputInfo.metadataDescription.orNull)
|
||||
}
|
||||
|
||||
private def assertOutputOperationInfo(
|
||||
javaOutputOperationInfo: JavaOutputOperationInfo,
|
||||
outputOperationInfo: OutputOperationInfo): Unit = {
|
||||
assert(javaOutputOperationInfo.batchTime === outputOperationInfo.batchTime)
|
||||
assert(javaOutputOperationInfo.id === outputOperationInfo.id)
|
||||
assert(javaOutputOperationInfo.name === outputOperationInfo.name)
|
||||
assert(javaOutputOperationInfo.description === outputOperationInfo.description)
|
||||
assert(javaOutputOperationInfo.startTime === outputOperationInfo.startTime.getOrElse(-1))
|
||||
assert(javaOutputOperationInfo.endTime === outputOperationInfo.endTime.getOrElse(-1))
|
||||
assert(javaOutputOperationInfo.failureReason === outputOperationInfo.failureReason.orNull)
|
||||
}
|
||||
}
|
||||
|
||||
class TestJavaStreamingListener extends JavaStreamingListener {
|
||||
|
||||
var receiverStarted: JavaStreamingListenerReceiverStarted = null
|
||||
var receiverError: JavaStreamingListenerReceiverError = null
|
||||
var receiverStopped: JavaStreamingListenerReceiverStopped = null
|
||||
var batchSubmitted: JavaStreamingListenerBatchSubmitted = null
|
||||
var batchStarted: JavaStreamingListenerBatchStarted = null
|
||||
var batchCompleted: JavaStreamingListenerBatchCompleted = null
|
||||
var outputOperationStarted: JavaStreamingListenerOutputOperationStarted = null
|
||||
var outputOperationCompleted: JavaStreamingListenerOutputOperationCompleted = null
|
||||
|
||||
override def onReceiverStarted(receiverStarted: JavaStreamingListenerReceiverStarted): Unit = {
|
||||
this.receiverStarted = receiverStarted
|
||||
}
|
||||
|
||||
override def onReceiverError(receiverError: JavaStreamingListenerReceiverError): Unit = {
|
||||
this.receiverError = receiverError
|
||||
}
|
||||
|
||||
override def onReceiverStopped(receiverStopped: JavaStreamingListenerReceiverStopped): Unit = {
|
||||
this.receiverStopped = receiverStopped
|
||||
}
|
||||
|
||||
override def onBatchSubmitted(batchSubmitted: JavaStreamingListenerBatchSubmitted): Unit = {
|
||||
this.batchSubmitted = batchSubmitted
|
||||
}
|
||||
|
||||
override def onBatchStarted(batchStarted: JavaStreamingListenerBatchStarted): Unit = {
|
||||
this.batchStarted = batchStarted
|
||||
}
|
||||
|
||||
override def onBatchCompleted(batchCompleted: JavaStreamingListenerBatchCompleted): Unit = {
|
||||
this.batchCompleted = batchCompleted
|
||||
}
|
||||
|
||||
override def onOutputOperationStarted(
|
||||
outputOperationStarted: JavaStreamingListenerOutputOperationStarted): Unit = {
|
||||
this.outputOperationStarted = outputOperationStarted
|
||||
}
|
||||
|
||||
override def onOutputOperationCompleted(
|
||||
outputOperationCompleted: JavaStreamingListenerOutputOperationCompleted): Unit = {
|
||||
this.outputOperationCompleted = outputOperationCompleted
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue