[SPARK-13294][PROJECT INFRA] Remove MiMa's dependency on spark-class / Spark assembly

This patch removes the need to build a full Spark assembly before running the `dev/mima` script.

- I modified the `tools` project to remove a direct dependency on Spark, so `sbt/sbt tools/fullClasspath` will now return the classpath for the `GenerateMIMAIgnore` class itself plus its own dependencies.
   - This required me to delete two classes full of dead code that we no longer use (`JavaAPICompletenessChecker` and `StoragePerfTester`)
- `GenerateMIMAIgnore` now uses [ClassUtil](http://software.clapper.org/classutil/) to find all of the Spark classes rather than our homemade JAR-traversal code. The problem with our own code was that it did not handle directories of classes, and handling them is necessary in order to generate excludes against an assembly-free Spark build (see the sketch after this list).
- `./dev/mima` no longer runs through `spark-class`, eliminating the need to reason about classpath ordering between `SPARK_CLASSPATH` and the assembly.
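
The ClassUtil-based scan is visible in the `GenerateMIMAIgnore` hunk further down; as a rough standalone sketch (not part of this patch, with a made-up object name, and assuming the classutil artifact is on the classpath), enumerating classes from the current classpath looks roughly like this:

```scala
import org.clapper.classutil.ClassFinder

// Hypothetical illustration of the ClassUtil-based scan: ClassFinder() with no
// arguments walks every entry on java.class.path, whether it is a jar or a
// directory of .class files, which is what makes an assembly-free scan possible.
object ListSparkClasses {
  def classesIn(packageName: String): Set[String] =
    ClassFinder()
      .getClasses
      .map(_.name)
      .filter(_.startsWith(packageName))
      .toSet

  def main(args: Array[String]): Unit =
    classesIn("org.apache.spark").take(20).foreach(println)
}
```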

Author: Josh Rosen <joshrosen@databricks.com>

Closes #11178 from JoshRosen/remove-assembly-in-run-tests.
Josh Rosen committed on 2016-03-10 23:28:34 -08:00
commit 6ca990fb36 (parent d18276cb1d)
8 changed files with 58 additions and 576 deletions

dev/mima

@@ -24,24 +24,21 @@ set -e
FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
cd "$FWDIR"
echo -e "q\n" | build/sbt oldDeps/update
TOOLS_CLASSPATH="$(build/sbt "export tools/fullClasspath" | tail -n1)"
rm -f .generated-mima*
generate_mima_ignore() {
SPARK_JAVA_OPTS="-XX:MaxPermSize=1g -Xmx2g" \
./bin/spark-class org.apache.spark.tools.GenerateMIMAIgnore
java \
-XX:MaxPermSize=1g \
-Xmx2g \
-cp "$TOOLS_CLASSPATH:$1" \
org.apache.spark.tools.GenerateMIMAIgnore
}
# Generate Mima Ignore is called twice, first with latest built jars
# on the classpath and then again with previous version jars on the classpath.
# Because of a bug in GenerateMIMAIgnore that when old jars are ahead on classpath
# it did not process the new classes (which are in assembly jar).
generate_mima_ignore
export SPARK_CLASSPATH="$(build/sbt "export oldDeps/fullClasspath" | tail -n1)"
echo "SPARK_CLASSPATH=$SPARK_CLASSPATH"
generate_mima_ignore
SPARK_PROFILES="-Pyarn -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive"
generate_mima_ignore "$(build/sbt $SPARK_PROFILES "export assembly/fullClasspath" | tail -n1)"
generate_mima_ignore "$(build/sbt $SPARK_PROFILES "export oldDeps/fullClasspath" | tail -n1)"
echo -e "q\n" | build/sbt mima-report-binary-issues | grep -v -e "info.*Resolving"
ret_val=$?

dev/run-tests.py

@@ -336,7 +336,6 @@ def build_spark_sbt(hadoop_version):
# Enable all of the profiles for the build:
build_profiles = get_hadoop_profiles(hadoop_version) + modules.root.build_profile_flags
sbt_goals = ["package",
"assembly/assembly",
"streaming-kafka-assembly/assembly",
"streaming-flume-assembly/assembly",
"streaming-mqtt-assembly/assembly",
@@ -350,6 +349,16 @@ def build_spark_sbt(hadoop_version):
exec_sbt(profiles_and_goals)
def build_spark_assembly_sbt(hadoop_version):
# Enable all of the profiles for the build:
build_profiles = get_hadoop_profiles(hadoop_version) + modules.root.build_profile_flags
sbt_goals = ["assembly/assembly"]
profiles_and_goals = build_profiles + sbt_goals
print("[info] Building Spark assembly (w/Hive 1.2.1) using SBT with these arguments: ",
" ".join(profiles_and_goals))
exec_sbt(profiles_and_goals)
def build_apache_spark(build_tool, hadoop_version):
"""Will build Spark against Hive v1.2.1 given the passed in build tool (either `sbt` or
`maven`). Defaults to using `sbt`."""
@@ -561,11 +570,14 @@ def main():
# spark build
build_apache_spark(build_tool, hadoop_version)
# TODO Temporarily disable MiMA check for DF-to-DS migration prototyping
# # backwards compatibility checks
# if build_tool == "sbt":
# # Note: compatiblity tests only supported in sbt for now
# detect_binary_inop_with_mima()
# backwards compatibility checks
if build_tool == "sbt":
# Note: compatibility tests only supported in sbt for now
# TODO Temporarily disable MiMA check for DF-to-DS migration prototyping
# detect_binary_inop_with_mima()
# Since we did not build assembly/assembly before running dev/mima, we need to
# do it here because the tests still rely on it; see SPARK-13294 for details.
build_spark_assembly_sbt(hadoop_version)
# run the test suites
run_scala_tests(build_tool, hadoop_version, test_modules, excluded_tags)

launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java

@@ -17,12 +17,10 @@
package org.apache.spark.launcher;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import static org.apache.spark.launcher.CommandBuilderUtils.*;
@@ -76,26 +74,6 @@ class SparkClassCommandBuilder extends AbstractCommandBuilder {
javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
javaOptsKeys.add("SPARK_SHUFFLE_OPTS");
memKey = "SPARK_DAEMON_MEMORY";
} else if (className.startsWith("org.apache.spark.tools.")) {
String sparkHome = getSparkHome();
File toolsDir = new File(join(File.separator, sparkHome, "tools", "target",
"scala-" + getScalaVersion()));
checkState(toolsDir.isDirectory(), "Cannot find tools build directory.");
Pattern re = Pattern.compile("spark-tools_.*\\.jar");
for (File f : toolsDir.listFiles()) {
if (re.matcher(f.getName()).matches()) {
extraClassPath = f.getAbsolutePath();
break;
}
}
checkState(extraClassPath != null,
"Failed to find Spark Tools Jar in %s.\n" +
"You need to run \"build/sbt tools/package\" before running %s.",
toolsDir.getAbsolutePath(), className);
javaOptsKeys.add("SPARK_JAVA_OPTS");
} else {
javaOptsKeys.add("SPARK_JAVA_OPTS");
memKey = "SPARK_DRIVER_MEMORY";

project/SparkBuild.scala

@@ -384,18 +384,19 @@ object OldDeps {
lazy val project = Project("oldDeps", file("dev"), settings = oldDepsSettings)
def versionArtifact(id: String): Option[sbt.ModuleID] = {
val fullId = id + "_2.11"
Some("org.apache.spark" % fullId % "1.2.0")
}
def oldDepsSettings() = Defaults.coreDefaultSettings ++ Seq(
name := "old-deps",
scalaVersion := "2.10.5",
libraryDependencies := Seq("spark-streaming-mqtt", "spark-streaming-zeromq",
"spark-streaming-flume", "spark-streaming-twitter",
"spark-streaming", "spark-mllib", "spark-graphx",
"spark-core").map(versionArtifact(_).get intransitive())
libraryDependencies := Seq(
"spark-streaming-mqtt",
"spark-streaming-zeromq",
"spark-streaming-flume",
"spark-streaming-twitter",
"spark-streaming",
"spark-mllib",
"spark-graphx",
"spark-core"
).map(id => "org.apache.spark" % (id + "_2.11") % "1.2.0")
)
}

tools/pom.xml

@@ -34,16 +34,6 @@
<url>http://spark.apache.org/</url>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
@@ -52,6 +42,11 @@
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
</dependency>
<dependency>
<groupId>org.clapper</groupId>
<artifactId>classutil_${scala.binary.version}</artifactId>
<version>1.0.6</version>
</dependency>
</dependencies>
<build>

tools/src/main/scala/org/apache/spark/tools/GenerateMIMAIgnore.scala

@@ -18,15 +18,13 @@
// scalastyle:off classforname
package org.apache.spark.tools
import java.io.File
import java.util.jar.JarFile
import scala.collection.mutable
import scala.collection.JavaConverters._
import scala.reflect.runtime.{universe => unv}
import scala.reflect.runtime.universe.runtimeMirror
import scala.util.Try
import org.clapper.classutil.ClassFinder
/**
* A tool for generating classes to be excluded during binary checking with MIMA. It is expected
* that this tool is run with ./spark-class.
@@ -42,12 +40,13 @@ object GenerateMIMAIgnore {
private val classLoader = Thread.currentThread().getContextClassLoader
private val mirror = runtimeMirror(classLoader)
private def isDeveloperApi(sym: unv.Symbol) = sym.annotations.exists {
_.tpe =:= mirror.staticClass("org.apache.spark.annotation.DeveloperApi").toType
}
private def isDeveloperApi(sym: unv.Symbol) =
sym.annotations.exists(_.tpe =:= unv.typeOf[org.apache.spark.annotation.DeveloperApi])
private def isExperimental(sym: unv.Symbol) =
sym.annotations.exists(_.tpe =:= unv.typeOf[org.apache.spark.annotation.Experimental])
private def isExperimental(sym: unv.Symbol) = sym.annotations.exists {
_.tpe =:= mirror.staticClass("org.apache.spark.annotation.Experimental").toType
}
private def isPackagePrivate(sym: unv.Symbol) =
@@ -160,35 +159,13 @@ object GenerateMIMAIgnore {
* and subpackages both from directories and jars present on the classpath.
*/
private def getClasses(packageName: String): Set[String] = {
val path = packageName.replace('.', '/')
val resources = classLoader.getResources(path)
val jars = resources.asScala.filter(_.getProtocol == "jar")
.map(_.getFile.split(":")(1).split("!")(0)).toSeq
jars.flatMap(getClassesFromJar(_, path))
.map(_.getName)
.filterNot(shouldExclude).toSet
}
/**
* Get all classes in a package from a jar file.
*/
private def getClassesFromJar(jarPath: String, packageName: String) = {
import scala.collection.mutable
val jar = new JarFile(new File(jarPath))
val enums = jar.entries().asScala.map(_.getName).filter(_.startsWith(packageName))
val classes = mutable.HashSet[Class[_]]()
for (entry <- enums if entry.endsWith(".class")) {
try {
classes += Class.forName(entry.replace('/', '.').stripSuffix(".class"), false, classLoader)
} catch {
// scalastyle:off println
case _: Throwable => println("Unable to load:" + entry)
// scalastyle:on println
}
}
classes
val finder = ClassFinder()
finder
.getClasses
.map(_.name)
.filter(_.startsWith(packageName))
.filterNot(shouldExclude)
.toSet
}
}
// scalastyle:on classforname
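
A note on the annotation checks rewritten earlier in this diff: `unv.typeOf[org.apache.spark.annotation.DeveloperApi]` requires the annotation class on the tool's compile-time classpath, whereas `mirror.staticClass(...)` resolves it by name at runtime, which suits the `tools` module now that the tools/pom.xml hunk above drops its spark-core dependency. A minimal standalone sketch of that pattern, with hypothetical object and method names:

```scala
import scala.reflect.runtime.{universe => unv}
import scala.reflect.runtime.universe.runtimeMirror

// Hypothetical helper: resolve an annotation type by its fully qualified name at
// runtime, so the checking code compiles without the annotation's jar.
object AnnotationChecks {
  private val mirror = runtimeMirror(Thread.currentThread().getContextClassLoader)

  def hasAnnotation(sym: unv.Symbol, annotationClassName: String): Boolean =
    sym.annotations.exists(_.tpe =:= mirror.staticClass(annotationClassName).toType)
}
```

For example, `hasAnnotation(sym, "org.apache.spark.annotation.DeveloperApi")` mirrors the `isDeveloperApi` check above.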

tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala (deleted)

@@ -1,367 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.tools
import java.lang.reflect.{Method, Type}
import scala.collection.mutable.ArrayBuffer
import scala.language.existentials
import org.apache.spark._
import org.apache.spark.api.java._
import org.apache.spark.rdd.{DoubleRDDFunctions, OrderedRDDFunctions, PairRDDFunctions, RDD}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{JavaDStream, JavaPairDStream, JavaStreamingContext}
import org.apache.spark.streaming.dstream.{DStream, PairDStreamFunctions}
private[spark] abstract class SparkType(val name: String)
private[spark] case class BaseType(override val name: String) extends SparkType(name) {
override def toString: String = {
name
}
}
private[spark]
case class ParameterizedType(override val name: String,
parameters: Seq[SparkType],
typebounds: String = "") extends SparkType(name) {
override def toString: String = {
if (typebounds != "") {
typebounds + " " + name + "<" + parameters.mkString(", ") + ">"
} else {
name + "<" + parameters.mkString(", ") + ">"
}
}
}
private[spark]
case class SparkMethod(name: String, returnType: SparkType, parameters: Seq[SparkType]) {
override def toString: String = {
returnType + " " + name + "(" + parameters.mkString(", ") + ")"
}
}
/**
* A tool for identifying methods that need to be ported from Scala to the Java API.
*
* It uses reflection to find methods in the Scala API and rewrites those methods' signatures
* into appropriate Java equivalents. If those equivalent methods have not been implemented in
* the Java API, they are printed.
*/
object JavaAPICompletenessChecker {
private def parseType(typeStr: String): SparkType = {
if (!typeStr.contains("<")) {
// Base types might begin with "class" or "interface", so we have to strip that off:
BaseType(typeStr.trim.split(" ").last)
} else if (typeStr.endsWith("[]")) {
ParameterizedType("Array", Seq(parseType(typeStr.stripSuffix("[]"))))
} else {
val parts = typeStr.split("<", 2)
val name = parts(0).trim
assert (parts(1).last == '>')
val parameters = parts(1).dropRight(1)
ParameterizedType(name, parseTypeList(parameters))
}
}
private def parseTypeList(typeStr: String): Seq[SparkType] = {
val types: ArrayBuffer[SparkType] = new ArrayBuffer[SparkType]
var stack = 0
var token: StringBuffer = new StringBuffer()
for (c <- typeStr.trim) {
if (c == ',' && stack == 0) {
types += parseType(token.toString)
token = new StringBuffer()
} else if (c == ' ' && stack != 0) {
// continue
} else {
if (c == '<') {
stack += 1
} else if (c == '>') {
stack -= 1
}
token.append(c)
}
}
assert (stack == 0)
if (token.toString != "") {
types += parseType(token.toString)
}
types.toSeq
}
private def parseReturnType(typeStr: String): SparkType = {
if (typeStr(0) == '<') {
val parts = typeStr.drop(0).split(">", 2)
val parsed = parseType(parts(1)).asInstanceOf[ParameterizedType]
ParameterizedType(parsed.name, parsed.parameters, parts(0))
} else {
parseType(typeStr)
}
}
private def toSparkMethod(method: Method): SparkMethod = {
val returnType = parseReturnType(method.getGenericReturnType.toString)
val name = method.getName
val parameters = method.getGenericParameterTypes.map(t => parseType(t.toString))
SparkMethod(name, returnType, parameters)
}
private def toJavaType(scalaType: SparkType, isReturnType: Boolean): SparkType = {
val renameSubstitutions = Map(
"scala.collection.Map" -> "java.util.Map",
// TODO: the JavaStreamingContext API accepts Array arguments
// instead of Lists, so this isn't a trivial translation / sub:
"scala.collection.Seq" -> "java.util.List",
"scala.Function2" -> "org.apache.spark.api.java.function.Function2",
"scala.collection.Iterator" -> "java.util.Iterator",
"scala.collection.mutable.Queue" -> "java.util.Queue",
"double" -> "java.lang.Double"
)
// Keep applying the substitutions until we've reached a fixedpoint.
def applySubs(scalaType: SparkType): SparkType = {
scalaType match {
case ParameterizedType(name, parameters, typebounds) =>
name match {
case "org.apache.spark.rdd.RDD" =>
if (parameters(0).name == classOf[Tuple2[_, _]].getName) {
val tupleParams =
parameters(0).asInstanceOf[ParameterizedType].parameters.map(applySubs)
ParameterizedType(classOf[JavaPairRDD[_, _]].getName, tupleParams)
} else {
ParameterizedType(classOf[JavaRDD[_]].getName, parameters.map(applySubs))
}
case "org.apache.spark.streaming.dstream.DStream" =>
if (parameters(0).name == classOf[Tuple2[_, _]].getName) {
val tupleParams =
parameters(0).asInstanceOf[ParameterizedType].parameters.map(applySubs)
ParameterizedType("org.apache.spark.streaming.api.java.JavaPairDStream",
tupleParams)
} else {
ParameterizedType("org.apache.spark.streaming.api.java.JavaDStream",
parameters.map(applySubs))
}
case "scala.Option" => {
if (isReturnType) {
ParameterizedType("org.apache.spark.api.java.Optional", parameters.map(applySubs))
} else {
applySubs(parameters(0))
}
}
case "scala.Function1" =>
val firstParamName = parameters.last.name
if (firstParamName.startsWith("scala.collection.Traversable") ||
firstParamName.startsWith("scala.collection.Iterator")) {
ParameterizedType("org.apache.spark.api.java.function.FlatMapFunction",
Seq(parameters(0),
parameters.last.asInstanceOf[ParameterizedType].parameters(0)).map(applySubs))
} else if (firstParamName == "scala.runtime.BoxedUnit") {
ParameterizedType("org.apache.spark.api.java.function.VoidFunction",
parameters.dropRight(1).map(applySubs))
} else {
ParameterizedType("org.apache.spark.api.java.function.Function",
parameters.map(applySubs))
}
case _ =>
ParameterizedType(renameSubstitutions.getOrElse(name, name),
parameters.map(applySubs))
}
case BaseType(name) =>
if (renameSubstitutions.contains(name)) {
BaseType(renameSubstitutions(name))
} else {
scalaType
}
}
}
var oldType = scalaType
var newType = applySubs(scalaType)
while (oldType != newType) {
oldType = newType
newType = applySubs(scalaType)
}
newType
}
private def toJavaMethod(method: SparkMethod): SparkMethod = {
val params = method.parameters
.filterNot(_.name == "scala.reflect.ClassTag")
.map(toJavaType(_, isReturnType = false))
SparkMethod(method.name, toJavaType(method.returnType, isReturnType = true), params)
}
private def isExcludedByName(method: Method): Boolean = {
val name = method.getDeclaringClass.getName + "." + method.getName
// Scala methods that are declared as private[mypackage] become public in the resulting
// Java bytecode. As a result, we need to manually exclude those methods here.
// This list also includes a few methods that are only used by the web UI or other
// internal Spark components.
val excludedNames = Seq(
"org.apache.spark.rdd.RDD.origin",
"org.apache.spark.rdd.RDD.elementClassTag",
"org.apache.spark.rdd.RDD.checkpointData",
"org.apache.spark.rdd.RDD.partitioner",
"org.apache.spark.rdd.RDD.partitions",
"org.apache.spark.rdd.RDD.firstParent",
"org.apache.spark.rdd.RDD.doCheckpoint",
"org.apache.spark.rdd.RDD.markCheckpointed",
"org.apache.spark.rdd.RDD.clearDependencies",
"org.apache.spark.rdd.RDD.getDependencies",
"org.apache.spark.rdd.RDD.getPartitions",
"org.apache.spark.rdd.RDD.dependencies",
"org.apache.spark.rdd.RDD.getPreferredLocations",
"org.apache.spark.rdd.RDD.collectPartitions",
"org.apache.spark.rdd.RDD.computeOrReadCheckpoint",
"org.apache.spark.rdd.PairRDDFunctions.getKeyClass",
"org.apache.spark.rdd.PairRDDFunctions.getValueClass",
"org.apache.spark.SparkContext.stringToText",
"org.apache.spark.SparkContext.makeRDD",
"org.apache.spark.SparkContext.runJob",
"org.apache.spark.SparkContext.runApproximateJob",
"org.apache.spark.SparkContext.clean",
"org.apache.spark.SparkContext.metadataCleaner",
"org.apache.spark.SparkContext.ui",
"org.apache.spark.SparkContext.newShuffleId",
"org.apache.spark.SparkContext.newRddId",
"org.apache.spark.SparkContext.cleanup",
"org.apache.spark.SparkContext.receiverJobThread",
"org.apache.spark.SparkContext.getRDDStorageInfo",
"org.apache.spark.SparkContext.addedFiles",
"org.apache.spark.SparkContext.addedJars",
"org.apache.spark.SparkContext.persistentRdds",
"org.apache.spark.SparkContext.executorEnvs",
"org.apache.spark.SparkContext.checkpointDir",
"org.apache.spark.SparkContext.getSparkHome",
"org.apache.spark.SparkContext.executorMemoryRequested",
"org.apache.spark.SparkContext.getExecutorStorageStatus",
"org.apache.spark.streaming.dstream.DStream.generatedRDDs",
"org.apache.spark.streaming.dstream.DStream.zeroTime",
"org.apache.spark.streaming.dstream.DStream.rememberDuration",
"org.apache.spark.streaming.dstream.DStream.storageLevel",
"org.apache.spark.streaming.dstream.DStream.mustCheckpoint",
"org.apache.spark.streaming.dstream.DStream.checkpointDuration",
"org.apache.spark.streaming.dstream.DStream.checkpointData",
"org.apache.spark.streaming.dstream.DStream.graph",
"org.apache.spark.streaming.dstream.DStream.isInitialized",
"org.apache.spark.streaming.dstream.DStream.parentRememberDuration",
"org.apache.spark.streaming.dstream.DStream.initialize",
"org.apache.spark.streaming.dstream.DStream.validate",
"org.apache.spark.streaming.dstream.DStream.setContext",
"org.apache.spark.streaming.dstream.DStream.setGraph",
"org.apache.spark.streaming.dstream.DStream.remember",
"org.apache.spark.streaming.dstream.DStream.getOrCompute",
"org.apache.spark.streaming.dstream.DStream.generateJob",
"org.apache.spark.streaming.dstream.DStream.clearOldMetadata",
"org.apache.spark.streaming.dstream.DStream.addMetadata",
"org.apache.spark.streaming.dstream.DStream.updateCheckpointData",
"org.apache.spark.streaming.dstream.DStream.restoreCheckpointData",
"org.apache.spark.streaming.dstream.DStream.isTimeValid",
"org.apache.spark.streaming.StreamingContext.nextNetworkInputStreamId",
"org.apache.spark.streaming.StreamingContext.checkpointDir",
"org.apache.spark.streaming.StreamingContext.checkpointDuration",
"org.apache.spark.streaming.StreamingContext.receiverJobThread",
"org.apache.spark.streaming.StreamingContext.scheduler",
"org.apache.spark.streaming.StreamingContext.initialCheckpoint",
"org.apache.spark.streaming.StreamingContext.getNewNetworkStreamId",
"org.apache.spark.streaming.StreamingContext.validate",
"org.apache.spark.streaming.StreamingContext.createNewSparkContext",
"org.apache.spark.streaming.StreamingContext.rddToFileName",
"org.apache.spark.streaming.StreamingContext.getSparkCheckpointDir",
"org.apache.spark.streaming.StreamingContext.env",
"org.apache.spark.streaming.StreamingContext.graph",
"org.apache.spark.streaming.StreamingContext.isCheckpointPresent"
)
val excludedPatterns = Seq(
"""^org\.apache\.spark\.SparkContext\..*To.*Functions""",
"""^org\.apache\.spark\.SparkContext\..*WritableConverter""",
"""^org\.apache\.spark\.SparkContext\..*To.*Writable"""
).map(_.r)
lazy val excludedByPattern =
!excludedPatterns.map(_.findFirstIn(name)).filter(_.isDefined).isEmpty
name.contains("$") || excludedNames.contains(name) || excludedByPattern
}
private def isExcludedByInterface(method: Method): Boolean = {
val excludedInterfaces =
Set("org.apache.spark.Logging", "org.apache.hadoop.mapreduce.HadoopMapReduceUtil")
def toComparisionKey(method: Method): (Class[_], String, Type) =
(method.getReturnType, method.getName, method.getGenericReturnType)
val interfaces = method.getDeclaringClass.getInterfaces.filter { i =>
excludedInterfaces.contains(i.getName)
}
val excludedMethods = interfaces.flatMap(_.getMethods.map(toComparisionKey))
excludedMethods.contains(toComparisionKey(method))
}
private def printMissingMethods(scalaClass: Class[_], javaClass: Class[_]) {
val methods = scalaClass.getMethods
.filterNot(_.isAccessible)
.filterNot(isExcludedByName)
.filterNot(isExcludedByInterface)
val javaEquivalents = methods.map(m => toJavaMethod(toSparkMethod(m))).toSet
val javaMethods = javaClass.getMethods.map(toSparkMethod).toSet
val missingMethods = javaEquivalents -- javaMethods
for (method <- missingMethods) {
// scalastyle:off println
println(method)
// scalastyle:on println
}
}
def main(args: Array[String]) {
// scalastyle:off println
println("Missing RDD methods")
printMissingMethods(classOf[RDD[_]], classOf[JavaRDD[_]])
println()
println("Missing PairRDD methods")
printMissingMethods(classOf[PairRDDFunctions[_, _]], classOf[JavaPairRDD[_, _]])
println()
println("Missing DoubleRDD methods")
printMissingMethods(classOf[DoubleRDDFunctions], classOf[JavaDoubleRDD])
println()
println("Missing OrderedRDD methods")
printMissingMethods(classOf[OrderedRDDFunctions[_, _, _]], classOf[JavaPairRDD[_, _]])
println()
println("Missing SparkContext methods")
printMissingMethods(classOf[SparkContext], classOf[JavaSparkContext])
println()
println("Missing StreamingContext methods")
printMissingMethods(classOf[StreamingContext], classOf[JavaStreamingContext])
println()
println("Missing DStream methods")
printMissingMethods(classOf[DStream[_]], classOf[JavaDStream[_]])
println()
println("Missing PairDStream methods")
printMissingMethods(classOf[PairDStreamFunctions[_, _]], classOf[JavaPairDStream[_, _]])
println()
// scalastyle:on println
}
}

tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala (deleted)

@@ -1,111 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.tools
import java.util.concurrent.{CountDownLatch, Executors}
import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.shuffle.hash.HashShuffleManager
import org.apache.spark.util.Utils
/**
* Internal utility for micro-benchmarking shuffle write performance.
*
* Writes simulated shuffle output from several threads and records the observed throughput.
*/
object StoragePerfTester {
def main(args: Array[String]): Unit = {
/** Total amount of data to generate. Distributed evenly amongst maps and reduce splits. */
val dataSizeMb = Utils.memoryStringToMb(sys.env.getOrElse("OUTPUT_DATA", "1g"))
/** Number of map tasks. All tasks execute concurrently. */
val numMaps = sys.env.get("NUM_MAPS").map(_.toInt).getOrElse(8)
/** Number of reduce splits for each map task. */
val numOutputSplits = sys.env.get("NUM_REDUCERS").map(_.toInt).getOrElse(500)
val recordLength = 1000 // ~1KB records
val totalRecords = dataSizeMb * 1000
val recordsPerMap = totalRecords / numMaps
val writeKey = "1" * (recordLength / 2)
val writeValue = "1" * (recordLength / 2)
val executor = Executors.newFixedThreadPool(numMaps)
val conf = new SparkConf()
.set("spark.shuffle.compress", "false")
.set("spark.shuffle.sync", "true")
.set("spark.shuffle.manager", "org.apache.spark.shuffle.hash.HashShuffleManager")
// This is only used to instantiate a BlockManager. All thread scheduling is done manually.
val sc = new SparkContext("local[4]", "Write Tester", conf)
val hashShuffleManager = sc.env.shuffleManager.asInstanceOf[HashShuffleManager]
def writeOutputBytes(mapId: Int, total: AtomicLong): Unit = {
val shuffle = hashShuffleManager.shuffleBlockResolver.forMapTask(1, mapId, numOutputSplits,
new KryoSerializer(sc.conf), new ShuffleWriteMetrics())
val writers = shuffle.writers
for (i <- 1 to recordsPerMap) {
writers(i % numOutputSplits).write(writeKey, writeValue)
}
writers.map { w =>
w.commitAndClose()
total.addAndGet(w.fileSegment().length)
}
shuffle.releaseWriters(true)
}
val start = System.currentTimeMillis()
val latch = new CountDownLatch(numMaps)
val totalBytes = new AtomicLong()
for (task <- 1 to numMaps) {
executor.submit(new Runnable() {
override def run(): Unit = {
try {
writeOutputBytes(task, totalBytes)
latch.countDown()
} catch {
case e: Exception =>
// scalastyle:off println
println("Exception in child thread: " + e + " " + e.getMessage)
// scalastyle:on println
System.exit(1)
}
}
})
}
latch.await()
val end = System.currentTimeMillis()
val time = (end - start) / 1000.0
val bytesPerSecond = totalBytes.get() / time
val bytesPerFile = (totalBytes.get() / (numOutputSplits * numMaps.toDouble)).toLong
// scalastyle:off println
System.err.println("files_total\t\t%s".format(numMaps * numOutputSplits))
System.err.println("bytes_per_file\t\t%s".format(Utils.bytesToString(bytesPerFile)))
System.err.println("agg_throughput\t\t%s/s".format(Utils.bytesToString(bytesPerSecond.toLong)))
// scalastyle:on println
executor.shutdown()
sc.stop()
}
}