[SPARK-7760] add /json back into master & worker pages; add test

Author: Imran Rashid <irashid@cloudera.com>

Closes #6284 from squito/SPARK-7760 and squashes the following commits:

5e02d8a [Imran Rashid] style; increase timeout
9987399 [Imran Rashid] comment
8c7ed63 [Imran Rashid] add /json back into master & worker pages; add test

(cherry picked from commit 821254fb94)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>
This commit is contained in:
Imran Rashid 2015-05-22 16:05:07 -07:00 committed by Josh Rosen
parent d6cb044630
commit afde4019b8
3 changed files with 37 additions and 3 deletions

View file

@@ -43,6 +43,8 @@ class LocalSparkCluster(
private val localHostname = Utils.localHostName()
private val masterActorSystems = ArrayBuffer[ActorSystem]()
private val workerActorSystems = ArrayBuffer[ActorSystem]()
// exposed for testing
var masterWebUIPort = -1
def start(): Array[String] = {
logInfo("Starting a local Spark cluster with " + numWorkers + " workers.")
@@ -53,7 +55,9 @@ class LocalSparkCluster(
.set("spark.shuffle.service.enabled", "false")
/* Start the Master */
val (masterSystem, masterPort, _, _) = Master.startSystemAndActor(localHostname, 0, 0, _conf)
val (masterSystem, masterPort, webUiPort, _) =
Master.startSystemAndActor(localHostname, 0, 0, _conf)
masterWebUIPort = webUiPort
masterActorSystems += masterSystem
val masterUrl = "spark://" + Utils.localHostNameForURI() + ":" + masterPort
val masters = Array(masterUrl)

View file

@@ -77,7 +77,10 @@ private[spark] abstract class WebUI(
val pagePath = "/" + page.prefix
val renderHandler = createServletHandler(pagePath,
(request: HttpServletRequest) => page.render(request), securityManager, basePath)
val renderJsonHandler = createServletHandler(pagePath.stripSuffix("/") + "/json",
(request: HttpServletRequest) => page.renderJson(request), securityManager, basePath)
attachHandler(renderHandler)
attachHandler(renderJsonHandler)
pageToHandlers.getOrElseUpdate(page, ArrayBuffer[ServletContextHandler]())
.append(renderHandler)
}

View file

@@ -21,16 +21,20 @@ import java.util.Date
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.io.Source
import scala.language.postfixOps
import akka.actor.Address
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.scalatest.{FunSuite, Matchers}
import org.scalatest.concurrent.Eventually
import other.supplier.{CustomPersistenceEngine, CustomRecoveryModeFactory}
import org.apache.spark.deploy._
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy._
class MasterSuite extends FunSuite with Matchers {
class MasterSuite extends FunSuite with Matchers with Eventually {
test("toAkkaUrl") {
val conf = new SparkConf(loadDefaults = false)
@@ -157,4 +161,27 @@ class MasterSuite extends FunSuite with Matchers {
CustomRecoveryModeFactory.instantiationAttempts should be > instantiationAttempts
}
// Regression test for SPARK-7760: both the master and each worker web UI
// must serve a machine-readable status page at the "/json" endpoint.
test("Master & worker web ui available") {
// json4s formats needed for .extract[Int] below.
implicit val formats = org.json4s.DefaultFormats
val conf = new SparkConf()
// Local cluster: 2 workers, 2 cores per worker, 512 MB per worker.
val localCluster = new LocalSparkCluster(2, 2, 512, conf)
localCluster.start()
try {
// The UIs come up asynchronously; poll every 100 ms, up to 5 s.
eventually(timeout(5 seconds), interval(100 milliseconds)) {
// Fetch the master's JSON status via the port exposed for testing.
val json = Source.fromURL(s"http://localhost:${localCluster.masterWebUIPort}/json")
.getLines().mkString("\n")
// The master must report both registered workers.
val JArray(workers) = (parse(json) \ "workers")
workers.size should be (2)
workers.foreach { workerSummaryJson =>
// Follow each worker's advertised UI address to its own /json endpoint.
val JString(workerWebUi) = workerSummaryJson \ "webuiaddress"
val workerResponse = parse(Source.fromURL(s"${workerWebUi}/json")
.getLines().mkString("\n"))
// Matches the coresPerWorker argument passed to LocalSparkCluster above.
(workerResponse \ "cores").extract[Int] should be (2)
}
}
} finally {
// Always tear the cluster down, even when an assertion fails.
localCluster.stop()
}
}
}