Merge pull request #551 from qqsun8819/json-protocol.

[SPARK-1038] Add more fields in JsonProtocol and add tests that verify the JSON itself

This is a PR for SPARK-1038. Two major changes:
1 add some fields to JsonProtocol which is new and important to standalone-related data structures
2 Use Diff in liftweb.json to verity the stringified Json output for detecting someone mod type T to Option[T]

Author: qqsun8819 <jin.oyj@alibaba-inc.com>

Closes #551 and squashes the following commits:

fdf0b4e [qqsun8819] [SPARK-1038] 1. Change code style for more readable according to rxin review 2. change submitdate hard-coded string to a date object toString for more complexiblity
095a26f [qqsun8819] [SPARK-1038] mod according to  review of pwendel, use hard-coded json string for json data validation. Each test use its own json string
0524e41 [qqsun8819] Merge remote-tracking branch 'upstream/master' into json-protocol
d203d5c [qqsun8819] [SPARK-1038] Add more fields in JsonProtocol and add tests that verify the JSON itself
This commit is contained in:
qqsun8819 2014-02-09 13:57:29 -08:00 committed by Reynold Xin
parent 94ccf869aa
commit afc8f3cb9a
2 changed files with 109 additions and 8 deletions

View file

@ -20,7 +20,7 @@ package org.apache.spark.deploy
import net.liftweb.json.JsonDSL._ import net.liftweb.json.JsonDSL._
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse} import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
import org.apache.spark.deploy.master.{ApplicationInfo, WorkerInfo} import org.apache.spark.deploy.master.{ApplicationInfo, WorkerInfo, DriverInfo}
import org.apache.spark.deploy.worker.ExecutorRunner import org.apache.spark.deploy.worker.ExecutorRunner
@ -32,9 +32,12 @@ private[spark] object JsonProtocol {
("webuiaddress" -> obj.webUiAddress) ~ ("webuiaddress" -> obj.webUiAddress) ~
("cores" -> obj.cores) ~ ("cores" -> obj.cores) ~
("coresused" -> obj.coresUsed) ~ ("coresused" -> obj.coresUsed) ~
("coresfree" -> obj.coresFree) ~
("memory" -> obj.memory) ~ ("memory" -> obj.memory) ~
("memoryused" -> obj.memoryUsed) ~ ("memoryused" -> obj.memoryUsed) ~
("state" -> obj.state.toString) ("memoryfree" -> obj.memoryFree) ~
("state" -> obj.state.toString) ~
("lastheartbeat" -> obj.lastHeartbeat)
} }
def writeApplicationInfo(obj: ApplicationInfo) = { def writeApplicationInfo(obj: ApplicationInfo) = {
@ -54,7 +57,9 @@ private[spark] object JsonProtocol {
("name" -> obj.name) ~ ("name" -> obj.name) ~
("cores" -> obj.maxCores) ~ ("cores" -> obj.maxCores) ~
("memoryperslave" -> obj.memoryPerSlave) ~ ("memoryperslave" -> obj.memoryPerSlave) ~
("user" -> obj.user) ("user" -> obj.user) ~
("sparkhome" -> obj.sparkHome) ~
("command" -> obj.command.toString)
} }
def writeExecutorRunner(obj: ExecutorRunner) = { def writeExecutorRunner(obj: ExecutorRunner) = {
@ -64,6 +69,14 @@ private[spark] object JsonProtocol {
("appdesc" -> writeApplicationDescription(obj.appDesc)) ("appdesc" -> writeApplicationDescription(obj.appDesc))
} }
def writeDriverInfo(obj: DriverInfo) = {
("id" -> obj.id) ~
("starttime" -> obj.startTime.toString) ~
("state" -> obj.state.toString) ~
("cores" -> obj.desc.cores) ~
("memory" -> obj.desc.mem)
}
def writeMasterState(obj: MasterStateResponse) = { def writeMasterState(obj: MasterStateResponse) = {
("url" -> obj.uri) ~ ("url" -> obj.uri) ~
("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~ ("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~
@ -73,6 +86,7 @@ private[spark] object JsonProtocol {
("memoryused" -> obj.workers.map(_.memoryUsed).sum) ~ ("memoryused" -> obj.workers.map(_.memoryUsed).sum) ~
("activeapps" -> obj.activeApps.toList.map(writeApplicationInfo)) ~ ("activeapps" -> obj.activeApps.toList.map(writeApplicationInfo)) ~
("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo)) ~ ("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo)) ~
("activedrivers" -> obj.activeDrivers.toList.map(writeDriverInfo)) ~
("status" -> obj.status.toString) ("status" -> obj.status.toString)
} }

View file

@ -20,8 +20,9 @@ package org.apache.spark.deploy
import java.io.File import java.io.File
import java.util.Date import java.util.Date
import net.liftweb.json.Diff
import net.liftweb.json.{JsonAST, JsonParser} import net.liftweb.json.{JsonAST, JsonParser}
import net.liftweb.json.JsonAST.JValue import net.liftweb.json.JsonAST.{JNothing, JValue}
import org.scalatest.FunSuite import org.scalatest.FunSuite
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse} import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
@ -29,24 +30,35 @@ import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, RecoveryStat
import org.apache.spark.deploy.worker.{ExecutorRunner, DriverRunner} import org.apache.spark.deploy.worker.{ExecutorRunner, DriverRunner}
class JsonProtocolSuite extends FunSuite { class JsonProtocolSuite extends FunSuite {
test("writeApplicationInfo") { test("writeApplicationInfo") {
val output = JsonProtocol.writeApplicationInfo(createAppInfo()) val output = JsonProtocol.writeApplicationInfo(createAppInfo())
assertValidJson(output) assertValidJson(output)
assertValidDataInJson(output, JsonParser.parse(JsonConstants.appInfoJsonStr))
} }
test("writeWorkerInfo") { test("writeWorkerInfo") {
val output = JsonProtocol.writeWorkerInfo(createWorkerInfo()) val output = JsonProtocol.writeWorkerInfo(createWorkerInfo())
assertValidJson(output) assertValidJson(output)
assertValidDataInJson(output, JsonParser.parse(JsonConstants.workerInfoJsonStr))
} }
test("writeApplicationDescription") { test("writeApplicationDescription") {
val output = JsonProtocol.writeApplicationDescription(createAppDesc()) val output = JsonProtocol.writeApplicationDescription(createAppDesc())
assertValidJson(output) assertValidJson(output)
assertValidDataInJson(output, JsonParser.parse(JsonConstants.appDescJsonStr))
} }
test("writeExecutorRunner") { test("writeExecutorRunner") {
val output = JsonProtocol.writeExecutorRunner(createExecutorRunner()) val output = JsonProtocol.writeExecutorRunner(createExecutorRunner())
assertValidJson(output) assertValidJson(output)
assertValidDataInJson(output, JsonParser.parse(JsonConstants.executorRunnerJsonStr))
}
test("writeDriverInfo") {
val output = JsonProtocol.writeDriverInfo(createDriverInfo())
assertValidJson(output)
assertValidDataInJson(output, JsonParser.parse(JsonConstants.driverInfoJsonStr))
} }
test("writeMasterState") { test("writeMasterState") {
@ -59,6 +71,7 @@ class JsonProtocolSuite extends FunSuite {
activeDrivers, completedDrivers, RecoveryState.ALIVE) activeDrivers, completedDrivers, RecoveryState.ALIVE)
val output = JsonProtocol.writeMasterState(stateResponse) val output = JsonProtocol.writeMasterState(stateResponse)
assertValidJson(output) assertValidJson(output)
assertValidDataInJson(output, JsonParser.parse(JsonConstants.masterStateJsonStr))
} }
test("writeWorkerState") { test("writeWorkerState") {
@ -70,6 +83,7 @@ class JsonProtocolSuite extends FunSuite {
finishedExecutors, drivers, finishedDrivers, "masterUrl", 4, 1234, 4, 1234, "masterWebUiUrl") finishedExecutors, drivers, finishedDrivers, "masterUrl", 4, 1234, 4, 1234, "masterWebUiUrl")
val output = JsonProtocol.writeWorkerState(stateResponse) val output = JsonProtocol.writeWorkerState(stateResponse)
assertValidJson(output) assertValidJson(output)
assertValidDataInJson(output, JsonParser.parse(JsonConstants.workerStateJsonStr))
} }
def createAppDesc(): ApplicationDescription = { def createAppDesc(): ApplicationDescription = {
@ -78,8 +92,10 @@ class JsonProtocolSuite extends FunSuite {
} }
def createAppInfo() : ApplicationInfo = { def createAppInfo() : ApplicationInfo = {
new ApplicationInfo( val appInfo = new ApplicationInfo(JsonConstants.appInfoStartTime,
3, "id", createAppDesc(), new Date(123456789), null, "appUriStr", Int.MaxValue) "id", createAppDesc(), JsonConstants.submitDate, null, "appUriStr", Int.MaxValue)
appInfo.endTime = JsonConstants.currTimeInMillis
appInfo
} }
def createDriverCommand() = new Command( def createDriverCommand() = new Command(
@ -90,10 +106,13 @@ class JsonProtocolSuite extends FunSuite {
def createDriverDesc() = new DriverDescription("hdfs://some-dir/some.jar", 100, 3, def createDriverDesc() = new DriverDescription("hdfs://some-dir/some.jar", 100, 3,
false, createDriverCommand()) false, createDriverCommand())
def createDriverInfo(): DriverInfo = new DriverInfo(3, "driver-3", createDriverDesc(), new Date()) def createDriverInfo(): DriverInfo = new DriverInfo(3, "driver-3",
createDriverDesc(), new Date())
def createWorkerInfo(): WorkerInfo = { def createWorkerInfo(): WorkerInfo = {
new WorkerInfo("id", "host", 8080, 4, 1234, null, 80, "publicAddress") val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, 80, "publicAddress")
workerInfo.lastHeartbeat = JsonConstants.currTimeInMillis
workerInfo
} }
def createExecutorRunner(): ExecutorRunner = { def createExecutorRunner(): ExecutorRunner = {
new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host", new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host",
@ -111,4 +130,72 @@ class JsonProtocolSuite extends FunSuite {
case e: JsonParser.ParseException => fail("Invalid Json detected", e) case e: JsonParser.ParseException => fail("Invalid Json detected", e)
} }
} }
def assertValidDataInJson(validateJson: JValue, expectedJson: JValue) {
val Diff(c, a, d) = validateJson diff expectedJson
assert(c === JNothing, "Json changed")
assert(a === JNothing, "Json added")
assert(d === JNothing, "Json deleted")
}
}
object JsonConstants {
val currTimeInMillis = System.currentTimeMillis()
val appInfoStartTime = 3
val submitDate = new Date(123456789)
val appInfoJsonStr =
"""
|{"starttime":3,"id":"id","name":"name","appuiurl":"appUriStr",
|"cores":4,"user":"%s",
|"memoryperslave":1234,"submitdate":"%s",
|"state":"WAITING","duration":%d}
""".format(System.getProperty("user.name", "<unknown>"),
submitDate.toString, (currTimeInMillis - appInfoStartTime)).stripMargin
val workerInfoJsonStr =
"""
|{"id":"id","host":"host","port":8080,
|"webuiaddress":"http://publicAddress:80",
|"cores":4,"coresused":0,"coresfree":4,
|"memory":1234,"memoryused":0,"memoryfree":1234,
|"state":"ALIVE","lastheartbeat":%d}
""".format(currTimeInMillis).stripMargin
val appDescJsonStr =
"""
|{"name":"name","cores":4,"memoryperslave":1234,
|"user":"%s","sparkhome":"sparkHome",
|"command":"Command(mainClass,List(arg1, arg2),Map())"}
""".format(System.getProperty("user.name", "<unknown>")).stripMargin
val executorRunnerJsonStr =
"""
|{"id":123,"memory":1234,"appid":"appId",
|"appdesc":%s}
""".format(appDescJsonStr).stripMargin
val driverInfoJsonStr =
"""
|{"id":"driver-3","starttime":"3","state":"SUBMITTED","cores":3,"memory":100}
""".stripMargin
val masterStateJsonStr =
"""
|{"url":"spark://host:8080",
|"workers":[%s,%s],
|"cores":8,"coresused":0,"memory":2468,"memoryused":0,
|"activeapps":[%s],"completedapps":[],
|"activedrivers":[%s],
|"status":"ALIVE"}
""".format(workerInfoJsonStr, workerInfoJsonStr,
appInfoJsonStr, driverInfoJsonStr).stripMargin
val workerStateJsonStr =
"""
|{"id":"workerId","masterurl":"masterUrl",
|"masterwebuiurl":"masterWebUiUrl",
|"cores":4,"coresused":4,"memory":1234,"memoryused":1234,
|"executors":[],
|"finishedexecutors":[%s,%s]}
""".format(executorRunnerJsonStr, executorRunnerJsonStr).stripMargin
} }