diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala index 800202e9fb..7dd8bfdc2a 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala @@ -18,6 +18,8 @@ package org.apache.spark.streaming.kinesis import java.util.UUID +import scala.util.control.NonFatal + import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider, BasicAWSCredentials, DefaultAWSCredentialsProviderChain} import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, IRecordProcessorFactory} import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream, KinesisClientLibConfiguration, Worker} @@ -98,6 +100,9 @@ private[kinesis] class KinesisReceiver( */ private var worker: Worker = null + /** Thread running the worker */ + private var workerThread: Thread = null + /** * This is called when the KinesisReceiver starts and must be non-blocking. * The KCL creates and manages the receiving/processing thread pool through Worker.run(). @@ -126,8 +131,19 @@ private[kinesis] class KinesisReceiver( } worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration) - worker.run() - + workerThread = new Thread() { + override def run(): Unit = { + try { + worker.run() + } catch { + case NonFatal(e) => + restart("Error running the KCL worker in Receiver", e) + } + } + } + workerThread.setName(s"Kinesis Receiver ${streamId}") + workerThread.setDaemon(true) + workerThread.start() logInfo(s"Started receiver with workerId $workerId") } @@ -137,10 +153,14 @@ private[kinesis] class KinesisReceiver( * The KCL will do its best to drain and checkpoint any in-flight records upon shutdown. */ override def onStop() { - if (worker != null) { - worker.shutdown() + if (workerThread != null) { + if (worker != null) { + worker.shutdown() + worker = null + } + workerThread.join() + workerThread = null logInfo(s"Stopped receiver for workerId $workerId") - worker = null } workerId = null }