From 130ec219aa40cd8cebf4105053d4c92d840e127e Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 22 May 2015 17:39:01 -0700 Subject: [PATCH] [SPARK-7788] Made KinesisReceiver.onStart() non-blocking KinesisReceiver calls worker.run() which is a blocking call (while loop) as per source code of kinesis-client library - https://github.com/awslabs/amazon-kinesis-client/blob/v1.2.1/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java. This results in infinite loop while calling sparkStreamingContext.stop(stopSparkContext = false, stopGracefully = true) perhaps because ReceiverTracker is never able to register the receiver (it's receiverInfo field is a empty map) causing it to be stuck in infinite loop while waiting for running flag to be set to false. Author: Tathagata Das Closes #6348 from tdas/SPARK-7788 and squashes the following commits: 2584683 [Tathagata Das] Added receiver id in thread name 6cf1cd4 [Tathagata Das] Made KinesisReceiver.onStart non-blocking (cherry picked from commit 1c388a9985999e043fa002618a357bc8f0a8b65a) Signed-off-by: Tathagata Das --- .../streaming/kinesis/KinesisReceiver.scala | 30 +++++++++++++++---- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala index 800202e9fb..7dd8bfdc2a 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala @@ -18,6 +18,8 @@ package org.apache.spark.streaming.kinesis import java.util.UUID +import scala.util.control.NonFatal + import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider, BasicAWSCredentials, DefaultAWSCredentialsProviderChain} import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, IRecordProcessorFactory} import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream, KinesisClientLibConfiguration, Worker} @@ -98,6 +100,9 @@ private[kinesis] class KinesisReceiver( */ private var worker: Worker = null + /** Thread running the worker */ + private var workerThread: Thread = null + /** * This is called when the KinesisReceiver starts and must be non-blocking. * The KCL creates and manages the receiving/processing thread pool through Worker.run(). @@ -126,8 +131,19 @@ private[kinesis] class KinesisReceiver( } worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration) - worker.run() - + workerThread = new Thread() { + override def run(): Unit = { + try { + worker.run() + } catch { + case NonFatal(e) => + restart("Error running the KCL worker in Receiver", e) + } + } + } + workerThread.setName(s"Kinesis Receiver ${streamId}") + workerThread.setDaemon(true) + workerThread.start() logInfo(s"Started receiver with workerId $workerId") } @@ -137,10 +153,14 @@ private[kinesis] class KinesisReceiver( * The KCL will do its best to drain and checkpoint any in-flight records upon shutdown. */ override def onStop() { - if (worker != null) { - worker.shutdown() + if (workerThread != null) { + if (worker != null) { + worker.shutdown() + worker = null + } + workerThread.join() + workerThread = null logInfo(s"Stopped receiver for workerId $workerId") - worker = null } workerId = null }