[SPARK-20554][BUILD] Remove usage of scala.language.reflectiveCalls
## What changes were proposed in this pull request? Remove uses of scala.language.reflectiveCalls that are either unnecessary or probably resulting in more complex code. This turned out to be less significant than I thought, but, still worth a touch-up. ## How was this patch tested? Existing tests. Author: Sean Owen <sowen@cloudera.com> Closes #17949 from srowen/SPARK-20554.
This commit is contained in:
parent
720708ccdd
commit
fc8a2b6ee6
|
@ -19,8 +19,6 @@ package org.apache.spark.storage
|
||||||
|
|
||||||
import java.io.{File, FileWriter}
|
import java.io.{File, FileWriter}
|
||||||
|
|
||||||
import scala.language.reflectiveCalls
|
|
||||||
|
|
||||||
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
|
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
|
||||||
|
|
||||||
import org.apache.spark.{SparkConf, SparkFunSuite}
|
import org.apache.spark.{SparkConf, SparkFunSuite}
|
||||||
|
|
|
@ -17,8 +17,6 @@
|
||||||
|
|
||||||
package org.apache.spark.util.random
|
package org.apache.spark.util.random
|
||||||
|
|
||||||
import scala.language.reflectiveCalls
|
|
||||||
|
|
||||||
import org.apache.commons.math3.stat.inference.ChiSquareTest
|
import org.apache.commons.math3.stat.inference.ChiSquareTest
|
||||||
import org.scalatest.Matchers
|
import org.scalatest.Matchers
|
||||||
|
|
||||||
|
@ -27,26 +25,22 @@ import org.apache.spark.util.Utils.times
|
||||||
|
|
||||||
class XORShiftRandomSuite extends SparkFunSuite with Matchers {
|
class XORShiftRandomSuite extends SparkFunSuite with Matchers {
|
||||||
|
|
||||||
private def fixture = new {
|
|
||||||
val seed = 1L
|
|
||||||
val xorRand = new XORShiftRandom(seed)
|
|
||||||
val hundMil = 1e8.toInt
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* This test is based on a chi-squared test for randomness.
|
* This test is based on a chi-squared test for randomness.
|
||||||
*/
|
*/
|
||||||
test ("XORShift generates valid random numbers") {
|
test ("XORShift generates valid random numbers") {
|
||||||
|
|
||||||
val f = fixture
|
val xorRand = new XORShiftRandom(1L)
|
||||||
|
|
||||||
val numBins = 10 // create 10 bins
|
val numBins = 10 // create 10 bins
|
||||||
val numRows = 5 // create 5 rows
|
val numRows = 5 // create 5 rows
|
||||||
val bins = Array.ofDim[Long](numRows, numBins)
|
val bins = Array.ofDim[Long](numRows, numBins)
|
||||||
|
|
||||||
// populate bins based on modulus of the random number for each row
|
// populate bins based on modulus of the random number for each row
|
||||||
for (r <- 0 to numRows-1) {
|
for (r <- 0 until numRows) {
|
||||||
times(f.hundMil) {bins(r)(math.abs(f.xorRand.nextInt) % numBins) += 1}
|
times(100000000) {
|
||||||
|
bins(r)(math.abs(xorRand.nextInt) % numBins) += 1
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -21,7 +21,6 @@ package org.apache.spark.examples.ml
|
||||||
import java.util.Locale
|
import java.util.Locale
|
||||||
|
|
||||||
import scala.collection.mutable
|
import scala.collection.mutable
|
||||||
import scala.language.reflectiveCalls
|
|
||||||
|
|
||||||
import scopt.OptionParser
|
import scopt.OptionParser
|
||||||
|
|
||||||
|
|
|
@ -21,7 +21,6 @@ package org.apache.spark.examples.ml
|
||||||
import java.util.Locale
|
import java.util.Locale
|
||||||
|
|
||||||
import scala.collection.mutable
|
import scala.collection.mutable
|
||||||
import scala.language.reflectiveCalls
|
|
||||||
|
|
||||||
import scopt.OptionParser
|
import scopt.OptionParser
|
||||||
|
|
||||||
|
|
|
@ -18,8 +18,6 @@
|
||||||
// scalastyle:off println
|
// scalastyle:off println
|
||||||
package org.apache.spark.examples.ml
|
package org.apache.spark.examples.ml
|
||||||
|
|
||||||
import scala.language.reflectiveCalls
|
|
||||||
|
|
||||||
import scopt.OptionParser
|
import scopt.OptionParser
|
||||||
|
|
||||||
import org.apache.spark.examples.mllib.AbstractParams
|
import org.apache.spark.examples.mllib.AbstractParams
|
||||||
|
|
|
@ -19,7 +19,6 @@
|
||||||
package org.apache.spark.examples.ml
|
package org.apache.spark.examples.ml
|
||||||
|
|
||||||
import scala.collection.mutable
|
import scala.collection.mutable
|
||||||
import scala.language.reflectiveCalls
|
|
||||||
|
|
||||||
import scopt.OptionParser
|
import scopt.OptionParser
|
||||||
|
|
||||||
|
|
|
@ -21,7 +21,6 @@ package org.apache.spark.examples.ml
|
||||||
import java.util.Locale
|
import java.util.Locale
|
||||||
|
|
||||||
import scala.collection.mutable
|
import scala.collection.mutable
|
||||||
import scala.language.reflectiveCalls
|
|
||||||
|
|
||||||
import scopt.OptionParser
|
import scopt.OptionParser
|
||||||
|
|
||||||
|
|
|
@ -16,39 +16,36 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.spark.sql.execution
|
package org.apache.spark.sql.execution
|
||||||
|
|
||||||
import java.util.Locale
|
|
||||||
|
|
||||||
import scala.language.reflectiveCalls
|
|
||||||
|
|
||||||
import org.apache.spark.sql.AnalysisException
|
import org.apache.spark.sql.AnalysisException
|
||||||
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
|
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
|
||||||
import org.apache.spark.sql.test.SharedSQLContext
|
import org.apache.spark.sql.test.SharedSQLContext
|
||||||
|
|
||||||
class QueryExecutionSuite extends SharedSQLContext {
|
class QueryExecutionSuite extends SharedSQLContext {
|
||||||
test("toString() exception/error handling") {
|
test("toString() exception/error handling") {
|
||||||
val badRule = new SparkStrategy {
|
spark.experimental.extraStrategies = Seq(
|
||||||
var mode: String = ""
|
new SparkStrategy {
|
||||||
override def apply(plan: LogicalPlan): Seq[SparkPlan] =
|
override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
|
||||||
mode.toLowerCase(Locale.ROOT) match {
|
})
|
||||||
case "exception" => throw new AnalysisException(mode)
|
|
||||||
case "error" => throw new Error(mode)
|
|
||||||
case _ => Nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
spark.experimental.extraStrategies = badRule :: Nil
|
|
||||||
|
|
||||||
def qe: QueryExecution = new QueryExecution(spark, OneRowRelation)
|
def qe: QueryExecution = new QueryExecution(spark, OneRowRelation)
|
||||||
|
|
||||||
// Nothing!
|
// Nothing!
|
||||||
badRule.mode = ""
|
|
||||||
assert(qe.toString.contains("OneRowRelation"))
|
assert(qe.toString.contains("OneRowRelation"))
|
||||||
|
|
||||||
// Throw an AnalysisException - this should be captured.
|
// Throw an AnalysisException - this should be captured.
|
||||||
badRule.mode = "exception"
|
spark.experimental.extraStrategies = Seq(
|
||||||
|
new SparkStrategy {
|
||||||
|
override def apply(plan: LogicalPlan): Seq[SparkPlan] =
|
||||||
|
throw new AnalysisException("exception")
|
||||||
|
})
|
||||||
assert(qe.toString.contains("org.apache.spark.sql.AnalysisException"))
|
assert(qe.toString.contains("org.apache.spark.sql.AnalysisException"))
|
||||||
|
|
||||||
// Throw an Error - this should not be captured.
|
// Throw an Error - this should not be captured.
|
||||||
badRule.mode = "error"
|
spark.experimental.extraStrategies = Seq(
|
||||||
|
new SparkStrategy {
|
||||||
|
override def apply(plan: LogicalPlan): Seq[SparkPlan] =
|
||||||
|
throw new Error("error")
|
||||||
|
})
|
||||||
val error = intercept[Error](qe.toString)
|
val error = intercept[Error](qe.toString)
|
||||||
assert(error.getMessage.contains("error"))
|
assert(error.getMessage.contains("error"))
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,7 +22,6 @@ import java.util.Locale
|
||||||
|
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
import scala.collection.mutable.ArrayBuffer
|
import scala.collection.mutable.ArrayBuffer
|
||||||
import scala.language.reflectiveCalls
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration
|
import org.apache.hadoop.conf.Configuration
|
||||||
import org.apache.hadoop.fs.Path
|
import org.apache.hadoop.fs.Path
|
||||||
|
|
|
@ -22,7 +22,6 @@ import java.lang.reflect.InvocationTargetException
|
||||||
import java.net.{URL, URLClassLoader}
|
import java.net.{URL, URLClassLoader}
|
||||||
import java.util
|
import java.util
|
||||||
|
|
||||||
import scala.language.reflectiveCalls
|
|
||||||
import scala.util.Try
|
import scala.util.Try
|
||||||
|
|
||||||
import org.apache.commons.io.{FileUtils, IOUtils}
|
import org.apache.commons.io.{FileUtils, IOUtils}
|
||||||
|
|
|
@ -21,7 +21,6 @@ import java.util.concurrent.ConcurrentLinkedQueue
|
||||||
|
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
import scala.collection.mutable
|
import scala.collection.mutable
|
||||||
import scala.language.reflectiveCalls
|
|
||||||
|
|
||||||
import org.scalatest.BeforeAndAfter
|
import org.scalatest.BeforeAndAfter
|
||||||
import org.scalatest.Matchers._
|
import org.scalatest.Matchers._
|
||||||
|
@ -202,21 +201,17 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter {
|
||||||
|
|
||||||
test("block push errors are reported") {
|
test("block push errors are reported") {
|
||||||
val listener = new TestBlockGeneratorListener {
|
val listener = new TestBlockGeneratorListener {
|
||||||
@volatile var errorReported = false
|
|
||||||
override def onPushBlock(
|
override def onPushBlock(
|
||||||
blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
|
blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
|
||||||
throw new SparkException("test")
|
throw new SparkException("test")
|
||||||
}
|
}
|
||||||
override def onError(message: String, throwable: Throwable): Unit = {
|
|
||||||
errorReported = true
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
blockGenerator = new BlockGenerator(listener, 0, conf)
|
blockGenerator = new BlockGenerator(listener, 0, conf)
|
||||||
blockGenerator.start()
|
blockGenerator.start()
|
||||||
assert(listener.errorReported === false)
|
assert(listener.onErrorCalled === false)
|
||||||
blockGenerator.addData(1)
|
blockGenerator.addData(1)
|
||||||
eventually(timeout(1 second), interval(10 milliseconds)) {
|
eventually(timeout(1 second), interval(10 milliseconds)) {
|
||||||
assert(listener.errorReported === true)
|
assert(listener.onErrorCalled === true)
|
||||||
}
|
}
|
||||||
blockGenerator.stop()
|
blockGenerator.stop()
|
||||||
}
|
}
|
||||||
|
@ -243,12 +238,15 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter {
|
||||||
@volatile var onGenerateBlockCalled = false
|
@volatile var onGenerateBlockCalled = false
|
||||||
@volatile var onAddDataCalled = false
|
@volatile var onAddDataCalled = false
|
||||||
@volatile var onPushBlockCalled = false
|
@volatile var onPushBlockCalled = false
|
||||||
|
@volatile var onErrorCalled = false
|
||||||
|
|
||||||
override def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
|
override def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
|
||||||
pushedData.addAll(arrayBuffer.asJava)
|
pushedData.addAll(arrayBuffer.asJava)
|
||||||
onPushBlockCalled = true
|
onPushBlockCalled = true
|
||||||
}
|
}
|
||||||
override def onError(message: String, throwable: Throwable): Unit = {}
|
override def onError(message: String, throwable: Throwable): Unit = {
|
||||||
|
onErrorCalled = true
|
||||||
|
}
|
||||||
override def onGenerateBlock(blockId: StreamBlockId): Unit = {
|
override def onGenerateBlock(blockId: StreamBlockId): Unit = {
|
||||||
onGenerateBlockCalled = true
|
onGenerateBlockCalled = true
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue