For SPARK-1082, Use Curator for ZK interaction in standalone cluster

Author: Raymond Liu <raymond.liu@intel.com>

Closes #611 from colorant/curator and squashes the following commits:

7556aa1 [Raymond Liu] Address review comments
af92e1f [Raymond Liu] Fix coding style
964f3c2 [Raymond Liu] Ignore NodeExists exception
6df2966 [Raymond Liu] Rewrite zookeeper client code with curator
Authored by Raymond Liu on 2014-02-24 23:20:38 -08:00; committed by Aaron Davidson
parent 1f4c7f7ecc
commit c852201ce9
9 changed files with 104 additions and 305 deletions
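
In short, the patch drops the hand-rolled SparkZooKeeperSession / SparkZooKeeperWatcher machinery (deleted below) in favor of Apache Curator, which bundles connection management, retry policy, and recipes such as leader election. A minimal sketch of the Curator idiom the diff builds on (illustrative only; the connect string and path are placeholders, not from the patch):

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry

object CuratorIdiom {
  def main(args: Array[String]) {
    // Connection handling and the retry policy are configured once on the
    // client; every subsequent operation is retried under that policy.
    val client = CuratorFrameworkFactory.newClient(
      "localhost:2181",  // placeholder; the patch reads spark.deploy.zookeeper.url
      60000,             // session timeout (ms), matching SparkCuratorUtil below
      15000,             // connection timeout (ms)
      new ExponentialBackoffRetry(5000, 3))  // base sleep 5s, at most 3 retries
    client.start()
    try {
      // Fluent-style operation: create a znode, making parent nodes as needed.
      client.create().creatingParentsIfNeeded().forPath("/demo")
    } finally {
      client.close()
    }
  }
}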

core/pom.xml

@@ -55,8 +55,8 @@
       <artifactId>avro-ipc</artifactId>
     </dependency>
     <dependency>
-      <groupId>org.apache.zookeeper</groupId>
-      <artifactId>zookeeper</artifactId>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-recipes</artifactId>
     </dependency>
     <dependency>
       <groupId>org.eclipse.jetty</groupId>

core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala

@@ -30,6 +30,7 @@ import org.apache.spark.deploy.master.MasterMessages.ElectedLeader
  *             [[org.apache.spark.deploy.master.MasterMessages.RevokedLeadership RevokedLeadership]]
  */
 private[spark] trait LeaderElectionAgent extends Actor {
+  // TODO: LeaderElectionAgent no longer needs to be an Actor; refactor.
   val masterActor: ActorRef
 }

core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala

@@ -28,10 +28,6 @@ private[master] object MasterMessages {

   case object RevokedLeadership

-  // Actor System to LeaderElectionAgent
-
-  case object CheckLeader
-
   // Actor System to Master

   case object CheckForWorkerTimeOut

core/src/main/scala/org/apache/spark/deploy/master/SparkCuratorUtil.scala (new file)

@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import org.apache.spark.{SparkConf, Logging}
+import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
+import org.apache.curator.retry.ExponentialBackoffRetry
+import org.apache.zookeeper.KeeperException
+
+object SparkCuratorUtil extends Logging {
+
+  val ZK_CONNECTION_TIMEOUT_MILLIS = 15000
+  val ZK_SESSION_TIMEOUT_MILLIS = 60000
+  val RETRY_WAIT_MILLIS = 5000
+  val MAX_RECONNECT_ATTEMPTS = 3
+
+  def newClient(conf: SparkConf): CuratorFramework = {
+    val ZK_URL = conf.get("spark.deploy.zookeeper.url")
+    val zk = CuratorFrameworkFactory.newClient(ZK_URL,
+      ZK_SESSION_TIMEOUT_MILLIS, ZK_CONNECTION_TIMEOUT_MILLIS,
+      new ExponentialBackoffRetry(RETRY_WAIT_MILLIS, MAX_RECONNECT_ATTEMPTS))
+    zk.start()
+    zk
+  }
+
+  def mkdir(zk: CuratorFramework, path: String) {
+    if (zk.checkExists().forPath(path) == null) {
+      try {
+        zk.create().creatingParentsIfNeeded().forPath(path)
+      } catch {
+        case nodeExist: KeeperException.NodeExistsException =>
+          // do nothing, ignore node existing exception.
+        case e: Exception => throw e
+      }
+    }
+  }
+}
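
A hypothetical caller of the helper above (the config key is the one newClient reads; the path and hosts are examples, not from the patch):

import org.apache.spark.SparkConf

val conf = new SparkConf().set("spark.deploy.zookeeper.url", "host1:2181,host2:2181")
val zk = SparkCuratorUtil.newClient(conf)           // returns an already-started client
SparkCuratorUtil.mkdir(zk, "/spark/master_status")  // idempotent: NodeExists is swallowed
zk.close()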

core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala (deleted)

@ -1,205 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.master
import scala.collection.JavaConversions._
import org.apache.zookeeper._
import org.apache.zookeeper.Watcher.Event.KeeperState
import org.apache.zookeeper.data.Stat
import org.apache.spark.{Logging, SparkConf}
/**
* Provides a Scala-side interface to the standard ZooKeeper client, with the addition of retry
* logic. If the ZooKeeper session expires or otherwise dies, a new ZooKeeper session will be
* created. If ZooKeeper remains down after several retries, the given
* [[org.apache.spark.deploy.master.SparkZooKeeperWatcher SparkZooKeeperWatcher]] will be
* informed via zkDown().
*
* Additionally, all commands sent to ZooKeeper will be retried until they either fail too many
* times or a semantic exception is thrown (e.g., "node already exists").
*/
private[spark] class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher,
conf: SparkConf) extends Logging {
val ZK_URL = conf.get("spark.deploy.zookeeper.url", "")
val ZK_ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE
val ZK_TIMEOUT_MILLIS = 30000
val RETRY_WAIT_MILLIS = 5000
val ZK_CHECK_PERIOD_MILLIS = 10000
val MAX_RECONNECT_ATTEMPTS = 3
private var zk: ZooKeeper = _
private val watcher = new ZooKeeperWatcher()
private var reconnectAttempts = 0
private var closed = false
/** Connect to ZooKeeper to start the session. Must be called before anything else. */
def connect() {
connectToZooKeeper()
new Thread() {
override def run() = sessionMonitorThread()
}.start()
}
def sessionMonitorThread(): Unit = {
while (!closed) {
Thread.sleep(ZK_CHECK_PERIOD_MILLIS)
if (zk.getState != ZooKeeper.States.CONNECTED) {
reconnectAttempts += 1
val attemptsLeft = MAX_RECONNECT_ATTEMPTS - reconnectAttempts
if (attemptsLeft <= 0) {
logError("Could not connect to ZooKeeper: system failure")
zkWatcher.zkDown()
close()
} else {
logWarning("ZooKeeper connection failed, retrying " + attemptsLeft + " more times...")
connectToZooKeeper()
}
}
}
}
def close() {
if (!closed && zk != null) { zk.close() }
closed = true
}
private def connectToZooKeeper() {
if (zk != null) zk.close()
zk = new ZooKeeper(ZK_URL, ZK_TIMEOUT_MILLIS, watcher)
}
/**
* Attempts to maintain a live ZooKeeper exception despite (very) transient failures.
* Mainly useful for handling the natural ZooKeeper session expiration.
*/
private class ZooKeeperWatcher extends Watcher {
def process(event: WatchedEvent) {
if (closed) { return }
event.getState match {
case KeeperState.SyncConnected =>
reconnectAttempts = 0
zkWatcher.zkSessionCreated()
case KeeperState.Expired =>
connectToZooKeeper()
case KeeperState.Disconnected =>
logWarning("ZooKeeper disconnected, will retry...")
case s => // Do nothing
}
}
}
def create(path: String, bytes: Array[Byte], createMode: CreateMode): String = {
retry {
zk.create(path, bytes, ZK_ACL, createMode)
}
}
def exists(path: String, watcher: Watcher = null): Stat = {
retry {
zk.exists(path, watcher)
}
}
def getChildren(path: String, watcher: Watcher = null): List[String] = {
retry {
zk.getChildren(path, watcher).toList
}
}
def getData(path: String): Array[Byte] = {
retry {
zk.getData(path, false, null)
}
}
def delete(path: String, version: Int = -1): Unit = {
retry {
zk.delete(path, version)
}
}
/**
* Creates the given directory (non-recursively) if it doesn't exist.
* All znodes are created in PERSISTENT mode with no data.
*/
def mkdir(path: String) {
if (exists(path) == null) {
try {
create(path, "".getBytes, CreateMode.PERSISTENT)
} catch {
case e: Exception =>
// If the exception caused the directory not to be created, bubble it up,
// otherwise ignore it.
if (exists(path) == null) { throw e }
}
}
}
/**
* Recursively creates all directories up to the given one.
* All znodes are created in PERSISTENT mode with no data.
*/
def mkdirRecursive(path: String) {
var fullDir = ""
for (dentry <- path.split("/").tail) {
fullDir += "/" + dentry
mkdir(fullDir)
}
}
/**
* Retries the given function up to 3 times. The assumption is that failure is transient,
* UNLESS it is a semantic exception (i.e., trying to get data from a node that doesn't exist),
* in which case the exception will be thrown without retries.
*
* @param fn Block to execute, possibly multiple times.
*/
def retry[T](fn: => T, n: Int = MAX_RECONNECT_ATTEMPTS): T = {
try {
fn
} catch {
case e: KeeperException.NoNodeException => throw e
case e: KeeperException.NodeExistsException => throw e
case e: Exception if n > 0 =>
logError("ZooKeeper exception, " + n + " more retries...", e)
Thread.sleep(RETRY_WAIT_MILLIS)
retry(fn, n-1)
}
}
}
trait SparkZooKeeperWatcher {
/**
* Called whenever a ZK session is created --
* this will occur when we create our first session as well as each time
* the session expires or errors out.
*/
def zkSessionCreated()
/**
* Called if ZK appears to be completely down (i.e., not just a transient error).
* We will no longer attempt to reconnect to ZK, and the SparkZooKeeperSession is considered dead.
*/
def zkDown()
}
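
For reference, the deleted helpers map onto Curator roughly as follows (a sketch, not part of the patch; zk is a started CuratorFramework from SparkCuratorUtil.newClient above):

// retry { ... }             -> implicit: the client's RetryPolicy wraps every call
// mkdirRecursive(path)      -> zk.create().creatingParentsIfNeeded().forPath(path)
// create(path, bytes, mode) -> zk.create().withMode(mode).forPath(path, bytes)
// exists(path)              -> zk.checkExists().forPath(path)   // null when absent
// getChildren(path)         -> zk.getChildren().forPath(path)
// getData(path)             -> zk.getData().forPath(path)
// delete(path)              -> zk.delete().forPath(path)
// zkSessionCreated()/zkDown() -> zk.getConnectionStateListenable().addListener(...)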

core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala

@@ -18,106 +18,68 @@
 package org.apache.spark.deploy.master

 import akka.actor.ActorRef
-import org.apache.zookeeper._
-import org.apache.zookeeper.Watcher.Event.EventType
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.deploy.master.MasterMessages._
+import org.apache.curator.framework.CuratorFramework
+import org.apache.curator.framework.recipes.leader.{LeaderLatchListener, LeaderLatch}

 private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef,
     masterUrl: String, conf: SparkConf)
-  extends LeaderElectionAgent with SparkZooKeeperWatcher with Logging {
+  extends LeaderElectionAgent with LeaderLatchListener with Logging {

   val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"

-  private val watcher = new ZooKeeperWatcher()
-  private val zk = new SparkZooKeeperSession(this, conf)
+  private var zk: CuratorFramework = _
+  private var leaderLatch: LeaderLatch = _
   private var status = LeadershipStatus.NOT_LEADER
-  private var myLeaderFile: String = _
-  private var leaderUrl: String = _

   override def preStart() {
-    logInfo("Starting ZooKeeper LeaderElection agent")
-    zk.connect()
-  }
-
-  override def zkSessionCreated() {
-    synchronized {
-      zk.mkdirRecursive(WORKING_DIR)
-      myLeaderFile =
-        zk.create(WORKING_DIR + "/master_", masterUrl.getBytes, CreateMode.EPHEMERAL_SEQUENTIAL)
-      self ! CheckLeader
-    }
+    logInfo("Starting ZooKeeper LeaderElection agent")
+    zk = SparkCuratorUtil.newClient(conf)
+    leaderLatch = new LeaderLatch(zk, WORKING_DIR)
+    leaderLatch.addListener(this)
+    leaderLatch.start()
   }

   override def preRestart(reason: scala.Throwable, message: scala.Option[scala.Any]) {
-    logError("LeaderElectionAgent failed, waiting " + zk.ZK_TIMEOUT_MILLIS + "...", reason)
-    Thread.sleep(zk.ZK_TIMEOUT_MILLIS)
+    logError("LeaderElectionAgent failed...", reason)
     super.preRestart(reason, message)
   }

-  override def zkDown() {
-    logError("ZooKeeper down! LeaderElectionAgent shutting down Master.")
-    System.exit(1)
-  }
-
   override def postStop() {
+    leaderLatch.close()
     zk.close()
   }

   override def receive = {
-    case CheckLeader => checkLeader()
+    case _ =>
   }

-  private class ZooKeeperWatcher extends Watcher {
-    def process(event: WatchedEvent) {
-      if (event.getType == EventType.NodeDeleted) {
-        logInfo("Leader file disappeared, a master is down!")
-        self ! CheckLeader
-      }
-    }
-  }
-
-  /** Uses ZK leader election. Navigates several ZK potholes along the way. */
-  def checkLeader() {
-    val masters = zk.getChildren(WORKING_DIR).toList
-    val leader = masters.sorted.head
-    val leaderFile = WORKING_DIR + "/" + leader
-
-    // Setup a watch for the current leader.
-    zk.exists(leaderFile, watcher)
-
-    try {
-      leaderUrl = new String(zk.getData(leaderFile))
-    } catch {
-      // A NoNodeException may be thrown if old leader died since the start of this method call.
-      // This is fine -- just check again, since we're guaranteed to see the new values.
-      case e: KeeperException.NoNodeException =>
-        logInfo("Leader disappeared while reading it -- finding next leader")
-        checkLeader()
-    }
-
-    // Synchronization used to ensure no interleaving between the creation of a new session and the
-    // checking of a leader, which could cause us to delete our real leader file erroneously.
-    synchronized {
-      val isLeader = myLeaderFile == leaderFile
-      if (!isLeader && leaderUrl == masterUrl) {
-        // We found a different master file pointing to this process.
-        // This can happen in the following two cases:
-        // (1) The master process was restarted on the same node.
-        // (2) The ZK server died between creating the file and returning the name of the file.
-        //     For this case, we will end up creating a second file, and MUST explicitly delete the
-        //     first one, since our ZK session is still open.
-        // Note that this deletion will cause a NodeDeleted event to be fired so we check again for
-        // leader changes.
-        assert(leaderFile < myLeaderFile)
-        logWarning("Cleaning up old ZK master election file that points to this master.")
-        zk.delete(leaderFile)
-      } else {
-        updateLeadershipStatus(isLeader)
-      }
-    }
-  }
+  override def isLeader() {
+    synchronized {
+      // could have lost leadership by now.
+      if (!leaderLatch.hasLeadership) {
+        return
+      }
+      logInfo("We have gained leadership")
+      updateLeadershipStatus(true)
+    }
+  }
+
+  override def notLeader() {
+    synchronized {
+      // could have gained leadership by now.
+      if (leaderLatch.hasLeadership) {
+        return
+      }
+      logInfo("We have lost leadership")
+      updateLeadershipStatus(false)
+    }
+  }

   def updateLeadershipStatus(isLeader: Boolean) {
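
The LeaderLatch wiring above replaces roughly a hundred lines of watch-and-recheck logic. A self-contained sketch of the recipe's contract (connect string and path are examples, not from the patch):

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.leader.{LeaderLatch, LeaderLatchListener}
import org.apache.curator.retry.ExponentialBackoffRetry

object LeaderLatchDemo {
  def main(args: Array[String]) {
    val client = CuratorFrameworkFactory.newClient(
      "localhost:2181", new ExponentialBackoffRetry(1000, 3))
    client.start()

    // All participants latch on the same znode; Curator elects exactly one leader.
    val latch = new LeaderLatch(client, "/spark/leader_election")
    latch.addListener(new LeaderLatchListener {
      // Callbacks arrive on a Curator thread, which is why the agent above
      // re-checks hasLeadership under a lock: the state may have flipped again.
      override def isLeader(): Unit = println("gained leadership")
      override def notLeader(): Unit = println("lost leadership")
    })
    latch.start()
  }
}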

core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala

@@ -17,36 +17,28 @@
 package org.apache.spark.deploy.master

 import scala.collection.JavaConversions._

 import akka.serialization.Serialization
-import org.apache.zookeeper._
+import org.apache.zookeeper.CreateMode

 import org.apache.spark.{Logging, SparkConf}

 class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf)
   extends PersistenceEngine
-  with SparkZooKeeperWatcher
   with Logging
 {
   val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
+  val zk = SparkCuratorUtil.newClient(conf)

-  val zk = new SparkZooKeeperSession(this, conf)
-
-  zk.connect()
-
-  override def zkSessionCreated() {
-    zk.mkdirRecursive(WORKING_DIR)
-  }
-
-  override def zkDown() {
-    logError("PersistenceEngine disconnected from ZooKeeper -- ZK looks down.")
-  }
+  SparkCuratorUtil.mkdir(zk, WORKING_DIR)

   override def addApplication(app: ApplicationInfo) {
     serializeIntoFile(WORKING_DIR + "/app_" + app.id, app)
   }

   override def removeApplication(app: ApplicationInfo) {
-    zk.delete(WORKING_DIR + "/app_" + app.id)
+    zk.delete().forPath(WORKING_DIR + "/app_" + app.id)
   }

   override def addDriver(driver: DriverInfo) {
@@ -54,7 +46,7 @@ class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf)
   }

   override def removeDriver(driver: DriverInfo) {
-    zk.delete(WORKING_DIR + "/driver_" + driver.id)
+    zk.delete().forPath(WORKING_DIR + "/driver_" + driver.id)
   }

   override def addWorker(worker: WorkerInfo) {
@@ -62,7 +54,7 @@ class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf)
   }

   override def removeWorker(worker: WorkerInfo) {
-    zk.delete(WORKING_DIR + "/worker_" + worker.id)
+    zk.delete().forPath(WORKING_DIR + "/worker_" + worker.id)
   }

   override def close() {
@@ -70,7 +62,7 @@ class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf)
   }

   override def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = {
-    val sortedFiles = zk.getChildren(WORKING_DIR).toList.sorted
+    val sortedFiles = zk.getChildren().forPath(WORKING_DIR).toList.sorted
     val appFiles = sortedFiles.filter(_.startsWith("app_"))
     val apps = appFiles.map(deserializeFromFile[ApplicationInfo])
     val driverFiles = sortedFiles.filter(_.startsWith("driver_"))
@@ -83,11 +75,11 @@ class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf)

   private def serializeIntoFile(path: String, value: AnyRef) {
     val serializer = serialization.findSerializerFor(value)
     val serialized = serializer.toBinary(value)
-    zk.create(path, serialized, CreateMode.PERSISTENT)
+    zk.create().withMode(CreateMode.PERSISTENT).forPath(path, serialized)
   }

   def deserializeFromFile[T](filename: String)(implicit m: Manifest[T]): T = {
-    val fileData = zk.getData(WORKING_DIR + "/" + filename)
+    val fileData = zk.getData().forPath(WORKING_DIR + "/" + filename)
     val clazz = m.runtimeClass.asInstanceOf[Class[T]]
     val serializer = serialization.serializerFor(clazz)
     serializer.fromBinary(fileData).asInstanceOf[T]
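
The fluent builder calls used above, gathered in one place (a sketch; the paths are examples, zk is a started CuratorFramework, bytes is the Akka-serialized payload):

zk.create().withMode(CreateMode.PERSISTENT).forPath("/spark/master_status/app_1", bytes)
val data     = zk.getData().forPath("/spark/master_status/app_1")
val children = zk.getChildren().forPath("/spark/master_status") // java.util.List[String]
zk.delete().forPath("/spark/master_status/app_1")

The JavaConversions import the file keeps is what lets the java.util.List from getChildren be treated as a Scala collection in readPersistedData.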

pom.xml

@@ -393,9 +393,9 @@
         <scope>test</scope>
       </dependency>
       <dependency>
-        <groupId>org.apache.zookeeper</groupId>
-        <artifactId>zookeeper</artifactId>
-        <version>3.4.5</version>
+        <groupId>org.apache.curator</groupId>
+        <artifactId>curator-recipes</artifactId>
+        <version>2.4.0</version>
        <exclusions>
          <exclusion>
            <groupId>org.jboss.netty</groupId>
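
(The exclusion block carries over unchanged: curator-recipes still pulls in zookeeper transitively, which is what drags in org.jboss.netty, so the same exclusion applies to the new artifact.)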

project/SparkBuild.scala

@@ -277,7 +277,7 @@ object SparkBuild extends Build {
         "org.apache.hadoop" % hadoopClient % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib, excludeCommonsLogging, excludeSLF4J),
         "org.apache.avro" % "avro" % "1.7.4",
         "org.apache.avro" % "avro-ipc" % "1.7.4" excludeAll(excludeNetty),
-        "org.apache.zookeeper" % "zookeeper" % "3.4.5" excludeAll(excludeNetty),
+        "org.apache.curator" % "curator-recipes" % "2.4.0" excludeAll(excludeNetty),
         "com.codahale.metrics" % "metrics-core" % "3.0.0",
         "com.codahale.metrics" % "metrics-jvm" % "3.0.0",
         "com.codahale.metrics" % "metrics-json" % "3.0.0",