Added the BytecodeUtils class for analyzing bytecode.
This commit is contained in:
parent
8abf4a8447
commit
ae12d163dc
|
@ -204,7 +204,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
|
|||
* @param mapFunc the function applied to each edge adjacent to each vertex.
|
||||
* The mapFunc can optionally return None in which case it does not
|
||||
* contribute to the final sum.
|
||||
* @param mergeFunc the function used to merge the results of each map
|
||||
* @param reduceFunc the function used to merge the results of each map
|
||||
* operation.
|
||||
* @param default the default value to use for each vertex if it has no
|
||||
* neighbors or the map function repeatedly evaluates to none
|
||||
|
@ -247,7 +247,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
|
|||
*
|
||||
* @tparam U the type of entry in the table of updates
|
||||
* @tparam VD2 the new vertex value type
|
||||
* @param tlb the table to join with the vertices in the graph. The table
|
||||
* @param table the table to join with the vertices in the graph. The table
|
||||
* should contain at most one entry for each vertex.
|
||||
* @param mapFunc the function used to compute the new vertex values. The
|
||||
* map function is invoked for all vertices, even those that do not have a
|
||||
|
@ -282,7 +282,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
|
|||
* map function is skipped and the old value is used.
|
||||
*
|
||||
* @tparam U the type of entry in the table of updates
|
||||
* @param tlb the table to join with the vertices in the graph. The table
|
||||
* @param table the table to join with the vertices in the graph. The table
|
||||
* should contain at most one entry for each vertex.
|
||||
* @param mapFunc the function used to compute the new vertex values. The
|
||||
* map function is invoked only for vertices with a corresponding entry in
|
||||
|
|
113
graph/src/main/scala/spark/graph/util/BytecodeUtils.scala
Normal file
113
graph/src/main/scala/spark/graph/util/BytecodeUtils.scala
Normal file
|
@ -0,0 +1,113 @@
|
|||
package spark.graph.util
|
||||
|
||||
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
|
||||
|
||||
import scala.collection.mutable.HashSet
|
||||
|
||||
import org.objectweb.asm.{ClassReader, MethodVisitor}
|
||||
import org.objectweb.asm.commons.EmptyVisitor
|
||||
import org.objectweb.asm.Opcodes._
|
||||
|
||||
import spark.Utils
|
||||
|
||||
|
||||
private[graph] object BytecodeUtils {
|
||||
|
||||
/**
|
||||
* Test whether the given closure invokes the specified method in the specified class.
|
||||
*/
|
||||
def invokedMethod(closure: AnyRef, targetClass: Class[_], targetMethod: String): Boolean = {
|
||||
if (_invokedMethod(closure.getClass, "apply", targetClass, targetMethod)) {
|
||||
true
|
||||
} else {
|
||||
// look at closures enclosed in this closure
|
||||
for (f <- closure.getClass.getDeclaredFields
|
||||
if f.getType.getName.startsWith("scala.Function")) {
|
||||
f.setAccessible(true)
|
||||
if (invokedMethod(f.get(closure), targetClass, targetMethod)) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
private def _invokedMethod(cls: Class[_], method: String,
|
||||
targetClass: Class[_], targetMethod: String): Boolean = {
|
||||
|
||||
val seen = new HashSet[(Class[_], String)]
|
||||
var stack = List[(Class[_], String)]((cls, method))
|
||||
|
||||
while (stack.nonEmpty) {
|
||||
val (c, m) = stack.head
|
||||
stack = stack.tail
|
||||
seen.add((c, m))
|
||||
val finder = new MethodInvocationFinder(c.getName, m)
|
||||
getClassReader(c).accept(finder, 0)
|
||||
for (classMethod <- finder.methodsInvoked) {
|
||||
println(classMethod)
|
||||
if (classMethod._1 == targetClass && classMethod._2 == targetMethod) {
|
||||
return true
|
||||
} else if (!seen.contains(classMethod)) {
|
||||
stack = classMethod :: stack
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
/**
|
||||
* Get an ASM class reader for a given class from the JAR that loaded it.
|
||||
*/
|
||||
private def getClassReader(cls: Class[_]): ClassReader = {
|
||||
// Copy data over, before delegating to ClassReader - else we can run out of open file handles.
|
||||
val className = cls.getName.replaceFirst("^.*\\.", "") + ".class"
|
||||
val resourceStream = cls.getResourceAsStream(className)
|
||||
// todo: Fixme - continuing with earlier behavior ...
|
||||
if (resourceStream == null) return new ClassReader(resourceStream)
|
||||
|
||||
val baos = new ByteArrayOutputStream(128)
|
||||
Utils.copyStream(resourceStream, baos, true)
|
||||
new ClassReader(new ByteArrayInputStream(baos.toByteArray))
|
||||
}
|
||||
|
||||
/**
|
||||
* Given the class name, return whether we should look into the class or not. This is used to
|
||||
* skip examing a large quantity of Java or Scala classes that we know for sure wouldn't access
|
||||
* the closures. Note that the class name is expected in ASM style (i.e. use "/" instead of ".").
|
||||
*/
|
||||
private def skipClass(className: String): Boolean = {
|
||||
val c = className
|
||||
c.startsWith("java/") || c.startsWith("scala/") || c.startsWith("javax/")
|
||||
}
|
||||
|
||||
/**
|
||||
* Find the set of methods invoked by the specified method in the specified class.
|
||||
* For example, after running the visitor,
|
||||
* MethodInvocationFinder("spark/graph/Foo", "test")
|
||||
* its methodsInvoked variable will contain the set of methods invoked directly by
|
||||
* Foo.test(). Interface invocations are not returned as part of the result set because we cannot
|
||||
* determine the actual metod invoked by inspecting the bytecode.
|
||||
*/
|
||||
private class MethodInvocationFinder(className: String, methodName: String) extends EmptyVisitor {
|
||||
|
||||
val methodsInvoked = new HashSet[(Class[_], String)]
|
||||
|
||||
override def visitMethod(access: Int, name: String, desc: String,
|
||||
sig: String, exceptions: Array[String]): MethodVisitor = {
|
||||
if (name == methodName) {
|
||||
new EmptyVisitor {
|
||||
override def visitMethodInsn(op: Int, owner: String, name: String, desc: String) {
|
||||
if (op == INVOKEVIRTUAL || op == INVOKESPECIAL || op == INVOKESTATIC) {
|
||||
if (!skipClass(owner)) {
|
||||
methodsInvoked.add((Class.forName(owner.replace("/", ".")), name))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
null
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -6,41 +6,41 @@ import spark.SparkContext
|
|||
|
||||
|
||||
class GraphSuite extends FunSuite with LocalSparkContext {
|
||||
|
||||
test("graph partitioner") {
|
||||
sc = new SparkContext("local", "test")
|
||||
val vertices = sc.parallelize(Seq(Vertex(1, "one"), Vertex(2, "two")))
|
||||
val edges = sc.parallelize(Seq(Edge(1, 2, "onlyedge")))
|
||||
var g = new Graph(vertices, edges)
|
||||
|
||||
g = g.withPartitioner(4, 7)
|
||||
assert(g.numVertexPartitions === 4)
|
||||
assert(g.numEdgePartitions === 7)
|
||||
|
||||
g = g.withVertexPartitioner(5)
|
||||
assert(g.numVertexPartitions === 5)
|
||||
|
||||
g = g.withEdgePartitioner(8)
|
||||
assert(g.numEdgePartitions === 8)
|
||||
|
||||
g = g.mapVertices(x => x)
|
||||
assert(g.numVertexPartitions === 5)
|
||||
assert(g.numEdgePartitions === 8)
|
||||
|
||||
g = g.mapEdges(x => x)
|
||||
assert(g.numVertexPartitions === 5)
|
||||
assert(g.numEdgePartitions === 8)
|
||||
|
||||
val updates = sc.parallelize(Seq((1, " more")))
|
||||
g = g.updateVertices(
|
||||
updates,
|
||||
(v, u: Option[String]) => if (u.isDefined) v.data + u.get else v.data)
|
||||
assert(g.numVertexPartitions === 5)
|
||||
assert(g.numEdgePartitions === 8)
|
||||
|
||||
g = g.reverse
|
||||
assert(g.numVertexPartitions === 5)
|
||||
assert(g.numEdgePartitions === 8)
|
||||
|
||||
}
|
||||
//
|
||||
// test("graph partitioner") {
|
||||
// sc = new SparkContext("local", "test")
|
||||
// val vertices = sc.parallelize(Seq(Vertex(1, "one"), Vertex(2, "two")))
|
||||
// val edges = sc.parallelize(Seq(Edge(1, 2, "onlyedge")))
|
||||
// var g = new Graph(vertices, edges)
|
||||
//
|
||||
// g = g.withPartitioner(4, 7)
|
||||
// assert(g.numVertexPartitions === 4)
|
||||
// assert(g.numEdgePartitions === 7)
|
||||
//
|
||||
// g = g.withVertexPartitioner(5)
|
||||
// assert(g.numVertexPartitions === 5)
|
||||
//
|
||||
// g = g.withEdgePartitioner(8)
|
||||
// assert(g.numEdgePartitions === 8)
|
||||
//
|
||||
// g = g.mapVertices(x => x)
|
||||
// assert(g.numVertexPartitions === 5)
|
||||
// assert(g.numEdgePartitions === 8)
|
||||
//
|
||||
// g = g.mapEdges(x => x)
|
||||
// assert(g.numVertexPartitions === 5)
|
||||
// assert(g.numEdgePartitions === 8)
|
||||
//
|
||||
// val updates = sc.parallelize(Seq((1, " more")))
|
||||
// g = g.updateVertices(
|
||||
// updates,
|
||||
// (v, u: Option[String]) => if (u.isDefined) v.data + u.get else v.data)
|
||||
// assert(g.numVertexPartitions === 5)
|
||||
// assert(g.numEdgePartitions === 8)
|
||||
//
|
||||
// g = g.reverse
|
||||
// assert(g.numVertexPartitions === 5)
|
||||
// assert(g.numEdgePartitions === 8)
|
||||
//
|
||||
// }
|
||||
}
|
||||
|
|
|
@ -0,0 +1,93 @@
|
|||
package spark.graph.util
|
||||
|
||||
import org.scalatest.FunSuite
|
||||
|
||||
|
||||
class BytecodeUtilsSuite extends FunSuite {
|
||||
|
||||
import BytecodeUtilsSuite.TestClass
|
||||
|
||||
test("closure invokes a method") {
|
||||
val c1 = {e: TestClass => println(e.foo); println(e.bar); println(e.baz); }
|
||||
assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "foo"))
|
||||
assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "bar"))
|
||||
assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "baz"))
|
||||
|
||||
val c2 = {e: TestClass => println(e.foo); println(e.bar); }
|
||||
assert(BytecodeUtils.invokedMethod(c2, classOf[TestClass], "foo"))
|
||||
assert(BytecodeUtils.invokedMethod(c2, classOf[TestClass], "bar"))
|
||||
assert(!BytecodeUtils.invokedMethod(c2, classOf[TestClass], "baz"))
|
||||
|
||||
val c3 = {e: TestClass => println(e.foo); }
|
||||
assert(BytecodeUtils.invokedMethod(c3, classOf[TestClass], "foo"))
|
||||
assert(!BytecodeUtils.invokedMethod(c3, classOf[TestClass], "bar"))
|
||||
assert(!BytecodeUtils.invokedMethod(c3, classOf[TestClass], "baz"))
|
||||
}
|
||||
|
||||
test("closure inside a closure invokes a method") {
|
||||
val c1 = {e: TestClass => println(e.foo); println(e.bar); println(e.baz); }
|
||||
val c2 = {e: TestClass => c1(e); println(e.foo); }
|
||||
assert(BytecodeUtils.invokedMethod(c2, classOf[TestClass], "foo"))
|
||||
assert(BytecodeUtils.invokedMethod(c2, classOf[TestClass], "bar"))
|
||||
assert(BytecodeUtils.invokedMethod(c2, classOf[TestClass], "baz"))
|
||||
}
|
||||
|
||||
test("closure inside a closure inside a closure invokes a method") {
|
||||
val c1 = {e: TestClass => println(e.baz); }
|
||||
val c2 = {e: TestClass => c1(e); println(e.foo); }
|
||||
val c3 = {e: TestClass => c2(e) }
|
||||
assert(BytecodeUtils.invokedMethod(c3, classOf[TestClass], "foo"))
|
||||
assert(!BytecodeUtils.invokedMethod(c3, classOf[TestClass], "bar"))
|
||||
assert(BytecodeUtils.invokedMethod(c3, classOf[TestClass], "baz"))
|
||||
}
|
||||
|
||||
test("closure calling a function that invokes a method") {
|
||||
def zoo(e: TestClass) {
|
||||
println(e.baz)
|
||||
}
|
||||
val c1 = {e: TestClass => zoo(e)}
|
||||
assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "foo"))
|
||||
assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "bar"))
|
||||
assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "baz"))
|
||||
}
|
||||
|
||||
test("closure calling a function that invokes a method which uses another closure") {
|
||||
val c2 = {e: TestClass => println(e.baz)}
|
||||
def zoo(e: TestClass) {
|
||||
c2(e)
|
||||
}
|
||||
val c1 = {e: TestClass => zoo(e)}
|
||||
assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "foo"))
|
||||
assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "bar"))
|
||||
assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "baz"))
|
||||
}
|
||||
|
||||
test("nested closure") {
|
||||
val c2 = {e: TestClass => println(e.baz)}
|
||||
def zoo(e: TestClass, c: TestClass => Unit) {
|
||||
c(e)
|
||||
}
|
||||
val c1 = {e: TestClass => zoo(e, c2)}
|
||||
assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "foo"))
|
||||
assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "bar"))
|
||||
assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "baz"))
|
||||
}
|
||||
|
||||
// The following doesn't work yet, because the byte code doesn't contain any information
|
||||
// about what exactly "c" is.
|
||||
// test("invoke interface") {
|
||||
// val c1 = {e: TestClass => c(e)}
|
||||
// assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "foo"))
|
||||
// assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "bar"))
|
||||
// assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "baz"))
|
||||
// }
|
||||
|
||||
private val c = {e: TestClass => println(e.baz)}
|
||||
}
|
||||
|
||||
|
||||
object BytecodeUtilsSuite {
|
||||
class TestClass(val foo: Int, val bar: Long) {
|
||||
def baz: Boolean = false
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue