SPARK-571: forbid return statements in cleaned closures
This patch checks top-level closure arguments to `ClosureCleaner.clean` for `return` statements and raises an exception if it finds any. This is mainly a user-friendliness addition, since programs with return statements in closure arguments will currently fail upon RDD actions with a less-than-intuitive error message.

Author: William Benton <willb@redhat.com>

Closes #717 from willb/spark-571 and squashes the following commits:

c41eb7d [William Benton] Another test case for SPARK-571
30c42f4 [William Benton] Stylistic cleanups
559b16b [William Benton] Stylistic cleanups from review
de13b79 [William Benton] Style fixes
295b6a5 [William Benton] Forbid return statements in closure arguments.
b017c47 [William Benton] Added a test for SPARK-571
This commit is contained in:
parent 52d905296f
commit 16ffadcc4a
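To illustrate the user-facing effect described above, here is a hypothetical sketch (not part of this patch; the object name and closure are made up). A `return` inside a closure argument attempts to transfer control out of the enclosing method; with this change, `ClosureCleaner.clean` rejects such a closure with a `SparkException` as soon as the transformation is defined, instead of letting the job fail later at action time with a less-than-intuitive error.

import org.apache.spark.{SparkContext, SparkException}

// Illustrative only: the object name and the closure below are made up.
object ReturnInClosureDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "return-check-demo")
    try {
      val nums = sc.parallelize(1 to 4)
      try {
        // The `return` tries to exit main() from inside the closure argument.
        // With this patch, cleaning the closure throws a SparkException here,
        // at map() time, rather than failing obscurely when an action runs.
        nums.map { x => if (x > 2) return; x * 2 }.collect()
      } catch {
        case e: SparkException => println("Rejected at cleaning time: " + e.getMessage)
      }
    } finally {
      sc.stop()
    }
  }
}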
ClosureCleaner.scala

@@ -25,7 +25,7 @@ import scala.collection.mutable.Set
 import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor, Type}
 import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._
 
-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SparkException}
 
 private[spark] object ClosureCleaner extends Logging {
   // Get an ASM class reader for a given class from the JAR that loaded it
@@ -108,6 +108,9 @@ private[spark] object ClosureCleaner extends Logging {
     val outerObjects = getOuterObjects(func)
 
     val accessedFields = Map[Class[_], Set[String]]()
+
+    getClassReader(func.getClass).accept(new ReturnStatementFinder(), 0)
+
     for (cls <- outerClasses)
       accessedFields(cls) = Set[String]()
     for (cls <- func.getClass :: innerClasses)
@@ -180,6 +183,24 @@ private[spark] object ClosureCleaner extends Logging {
   }
 }
 
+private[spark]
+class ReturnStatementFinder extends ClassVisitor(ASM4) {
+  override def visitMethod(access: Int, name: String, desc: String,
+      sig: String, exceptions: Array[String]): MethodVisitor = {
+    if (name.contains("apply")) {
+      new MethodVisitor(ASM4) {
+        override def visitTypeInsn(op: Int, tp: String) {
+          if (op == NEW && tp.contains("scala/runtime/NonLocalReturnControl")) {
+            throw new SparkException("Return statements aren't allowed in Spark closures")
+          }
+        }
+      }
+    } else {
+      new MethodVisitor(ASM4) {}
+    }
+  }
+}
+
 private[spark]
 class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends ClassVisitor(ASM4) {
   override def visitMethod(access: Int, name: String, desc: String,
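The bytecode check above works because of how the Scala compiler implements `return` inside a function literal: the non-local return is compiled into constructing and throwing a `scala.runtime.NonLocalReturnControl`, which the enclosing method catches to complete the early exit. `ReturnStatementFinder` therefore only needs to look for a `NEW scala/runtime/NonLocalReturnControl` instruction in the closure's `apply` method. A `return` inside a named method defined within the closure body is an ordinary local return and produces no such instruction, which is why the second test in the suite below passes. A minimal, self-contained sketch of the mechanism (not part of the patch; the method name is made up):

// Plain Scala, no Spark involved. The `return` below sits inside the closure
// passed to foreach, so the compiled closure class contains
// NEW scala/runtime/NonLocalReturnControl in its apply method -- exactly the
// instruction that ReturnStatementFinder flags.
object NonLocalReturnSketch {
  def firstNegative(xs: Seq[Int]): Option[Int] = {
    xs.foreach { x => if (x < 0) return Some(x) }
    None
  }

  def main(args: Array[String]): Unit = {
    println(firstNegative(Seq(3, -1, 7))) // Some(-1)
    println(firstNegative(Seq(1, 2, 3)))  // None
  }
}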
ClosureCleanerSuite.scala

@@ -20,7 +20,7 @@ package org.apache.spark.util
 import org.scalatest.FunSuite
 
 import org.apache.spark.LocalSparkContext._
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkContext, SparkException}
 
 class ClosureCleanerSuite extends FunSuite {
   test("closures inside an object") {
@@ -50,6 +50,19 @@ class ClosureCleanerSuite extends FunSuite {
     val obj = new TestClassWithNesting(1)
     assert(obj.run() === 96) // 4 * (1+2+3+4) + 4 * (1+2+3+4) + 16 * 1
   }
+
+  test("toplevel return statements in closures are identified at cleaning time") {
+    val ex = intercept[SparkException] {
+      TestObjectWithBogusReturns.run()
+    }
+
+    assert(ex.getMessage.contains("Return statements aren't allowed in Spark closures"))
+  }
+
+  test("return statements from named functions nested in closures don't raise exceptions") {
+    val result = TestObjectWithNestedReturns.run()
+    assert(result == 1)
+  }
 }
 
 // A non-serializable class we create in closures to make sure that we aren't
@@ -108,6 +121,30 @@ class TestClassWithoutFieldAccess {
   }
 }
 
+object TestObjectWithBogusReturns {
+  def run(): Int = {
+    withSpark(new SparkContext("local", "test")) { sc =>
+      val nums = sc.parallelize(Array(1, 2, 3, 4))
+      // this return is invalid since it will transfer control outside the closure
+      nums.map {x => return 1 ; x * 2}
+      1
+    }
+  }
+}
+
+object TestObjectWithNestedReturns {
+  def run(): Int = {
+    withSpark(new SparkContext("local", "test")) { sc =>
+      val nums = sc.parallelize(Array(1, 2, 3, 4))
+      nums.map {x =>
+        // this return is fine since it will not transfer control outside the closure
+        def foo(): Int = { return 5; 1 }
+        foo()
+      }
+      1
+    }
+  }
+}
 
 object TestObjectWithNesting {
   def run(): Int = {
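For code that trips the new check, the usual remedy is to express the early exit as a value instead of a `return`, so that control never tries to leave the closure. A hypothetical before/after sketch (object and value names are made up; not part of the patch):

import org.apache.spark.SparkContext

// Illustrative rewrite only; names are made up.
object ReturnFreeClosure {
  def run(): Seq[Int] = {
    val sc = new SparkContext("local", "rewrite-demo")
    try {
      val nums = sc.parallelize(Seq(0, 2, 5, 10))
      // Rejected after this patch (the return would transfer control out of the closure):
      //   nums.map { x => if (x == 0) return Seq(); 100 / x }
      // Accepted: the early exit is just another value of the expression.
      nums.map { x => if (x == 0) 0 else 100 / x }.collect().toSeq
    } finally {
      sc.stop()
    }
  }
}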