73bed408fb
This patch introduces a new shuffle manager that enhances the existing sort-based shuffle with a new cache-friendly sort algorithm that operates directly on binary data. The goals of this patch are to lower memory usage and Java object overheads during shuffle and to speed up sorting. It also lays groundwork for follow-up patches that will enable end-to-end processing of serialized records.

The new shuffle manager, `UnsafeShuffleManager`, can be enabled by setting `spark.shuffle.manager=tungsten-sort` in SparkConf.

The new shuffle manager uses directly-managed memory to implement several performance optimizations for certain types of shuffles. In cases where the new performance optimizations cannot be applied, the new shuffle manager delegates to SortShuffleManager to handle those shuffles.

UnsafeShuffleManager's optimizations will apply when _all_ of the following conditions hold:

- The shuffle dependency specifies no aggregation or output ordering.
- The shuffle serializer supports relocation of serialized values (this is currently supported by KryoSerializer and Spark SQL's custom serializers).
- The shuffle produces fewer than 16777216 output partitions.
- No individual record is larger than 128 MB when serialized.

In addition, extra spill-merging optimizations are automatically applied when the shuffle compression codec supports concatenation of serialized streams. This is currently supported by Spark's LZF compression codec.

At a high level, UnsafeShuffleManager's design is similar to Spark's existing SortShuffleManager. In sort-based shuffle, incoming records are sorted according to their target partition ids, then written to a single map output file. Reducers fetch contiguous regions of this file in order to read their portion of the map output. In cases where the map output data is too large to fit in memory, sorted subsets of the output are spilled to disk and those on-disk files are merged to produce the final output file.

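As a rough illustration of the eligibility conditions listed above, the following Scala sketch models the part of the check that can be made before any records are written. `ShuffleSpec` and `TungstenSortEligibility` are hypothetical names invented for this example and are not part of Spark's API; the real decision is made inside the shuffle manager and may take more into account. The 128 MB per-record limit is not modelled here because it can only be checked as individual records are serialized.

```scala
// Hypothetical, simplified description of a shuffle; not Spark's ShuffleDependency.
final case class ShuffleSpec(
    hasAggregator: Boolean,
    hasKeyOrdering: Boolean,
    serializerSupportsRelocation: Boolean,
    numPartitions: Int)

object TungstenSortEligibility {
  // Partition-count limit quoted in the commit message (2^24).
  val MaxPartitions: Int = 16777216

  // True only when all of the statically checkable conditions hold.
  def canUseOptimizedPath(spec: ShuffleSpec): Boolean =
    !spec.hasAggregator &&
      !spec.hasKeyOrdering &&
      spec.serializerSupportsRelocation &&
      spec.numPartitions < MaxPartitions
}
```

In this sketch, a shuffle that fails the check would be handed to the regular sort-based path, mirroring the delegation to SortShuffleManager described above.
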
UnsafeShuffleManager optimizes this process in several ways:

- Its sort operates on serialized binary data rather than Java objects, which reduces memory consumption and GC overheads. This optimization requires the record serializer to have certain properties to allow serialized records to be re-ordered without requiring deserialization. See SPARK-4550, where this optimization was first proposed and implemented, for more details.
- It uses a specialized cache-efficient sorter (UnsafeShuffleExternalSorter) that sorts arrays of compressed record pointers and partition ids. By using only 8 bytes of space per record in the sorting array, this fits more of the array into cache.
- The spill merging procedure operates on blocks of serialized records that belong to the same partition and does not need to deserialize records during the merge.
- When the spill compression codec supports concatenation of compressed data, the spill merge simply concatenates the serialized and compressed spill partitions to produce the final output partition. This allows efficient data copying methods, like NIO's `transferTo`, to be used and avoids the need to allocate decompression or copying buffers during the merge.

The shuffle read path is unchanged.

This patch is similar to [SPARK-4550](http://issues.apache.org/jira/browse/SPARK-4550) / #4450 but uses a slightly different implementation. The `unsafe`-based implementation featured in this patch lays the groundwork for followup patches that will enable sorting to operate on serialized data pages that will be prepared by Spark SQL's new `unsafe` operators (such as the new aggregation operator introduced in #5725).

### Future work

There are several tasks that build upon this patch, which will be left to future work:

- [SPARK-7271](https://issues.apache.org/jira/browse/SPARK-7271) Redesign / extend the shuffle interfaces to accept binary data as input.
  The goal here is to let us bypass serialization steps in cases where the sort input is produced by an operator that operates directly on binary data.
- Extension / redesign of the `Serializer` API. We can add new methods which allow serializers to determine the size requirements for serializing objects and for serializing objects directly to a specified memory address (similar to how `UnsafeRowConverter` works in Spark SQL).

Author: Josh Rosen <joshrosen@databricks.com>

Closes #5868 from JoshRosen/unsafe-sort and squashes the following commits:

ef0a86e [Josh Rosen] Fix scalastyle errors
7610f2f [Josh Rosen] Add tests for proper cleanup of shuffle data.
d494ffe [Josh Rosen] Fix deserialization of JavaSerializer instances.
52a9981 [Josh Rosen] Fix some bugs in the address packing code.
51812a7 [Josh Rosen] Change shuffle manager sort name to tungsten-sort
4023fa4 [Josh Rosen] Add @Private annotation to some Java classes.
de40b9d [Josh Rosen] More comments to try to explain metrics code
df07699 [Josh Rosen] Attempt to clarify confusing metrics update code
5e189c6 [Josh Rosen] Track time spend closing / flushing files; split TimeTrackingOutputStream into separate file.
d5779c6 [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-sort
c2ce78e [Josh Rosen] Fix a missed usage of MAX_PARTITION_ID
e3b8855 [Josh Rosen] Cleanup in UnsafeShuffleWriter
4a2c785 [Josh Rosen] rename 'sort buffer' to 'pointer array'
6276168 [Josh Rosen] Remove ability to disable spilling in UnsafeShuffleExternalSorter.
57312c9 [Josh Rosen] Clarify fileBufferSize units
2d4e4f4 [Josh Rosen] Address some minor comments in UnsafeShuffleExternalSorter.
fdcac08 [Josh Rosen] Guard against overflow when expanding sort buffer.
85da63f [Josh Rosen] Cleanup in UnsafeShuffleSorterIterator.
0ad34da [Josh Rosen] Fix off-by-one in nextInt() call
56781a1 [Josh Rosen] Rename UnsafeShuffleSorter to UnsafeShuffleInMemorySorter
e995d1a [Josh Rosen] Introduce MAX_SHUFFLE_OUTPUT_PARTITIONS.
e58a6b4 [Josh Rosen] Add more tests for PackedRecordPointer encoding.
4f0b770 [Josh Rosen] Attempt to implement proper shuffle write metrics.
d4e6d89 [Josh Rosen] Update to bit shifting constants
69d5899 [Josh Rosen] Remove some unnecessary override vals
8531286 [Josh Rosen] Add tests that automatically trigger spills.
7c953f9 [Josh Rosen] Add test that covers UnsafeShuffleSortDataFormat.swap().
e1855e5 [Josh Rosen] Fix a handful of misc. IntelliJ inspections
39434f9 [Josh Rosen] Avoid integer multiplication overflow in getMemoryUsage (thanks FindBugs!)
1e3ad52 [Josh Rosen] Delete unused ByteBufferOutputStream class.
ea4f85f [Josh Rosen] Roll back an unnecessary change in Spillable.
ae538dc [Josh Rosen] Document UnsafeShuffleManager.
ec6d626 [Josh Rosen] Add notes on maximum # of supported shuffle partitions.
0d4d199 [Josh Rosen] Bump up shuffle.memoryFraction to make tests pass.
b3b1924 [Josh Rosen] Properly implement close() and flush() in DummySerializerInstance.
1ef56c7 [Josh Rosen] Revise compression codec support in merger; test cross product of configurations.
b57c17f [Josh Rosen] Disable some overly-verbose logs that rendered DEBUG useless.
f780fb1 [Josh Rosen] Add test demonstrating which compression codecs support concatenation.
4a01c45 [Josh Rosen] Remove unnecessary log message
27b18b0 [Josh Rosen] Test for inserting records AT the max record size.
fcd9a3c [Josh Rosen] Add notes + tests for maximum record / page sizes.
9d1ee7c [Josh Rosen] Fix MiMa excludes for ShuffleWriter change
fd4bb9e [Josh Rosen] Use own ByteBufferOutputStream rather than Kryo's
67d25ba [Josh Rosen] Update Exchange operator's copying logic to account for new shuffle manager
8f5061a [Josh Rosen] Strengthen assertion to check partitioning
01afc74 [Josh Rosen] Actually read data in UnsafeShuffleWriterSuite
1929a74 [Josh Rosen] Update to reflect upstream ShuffleBlockManager -> ShuffleBlockResolver rename.
e8718dd [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-sort
9b7ebed [Josh Rosen] More defensive programming RE: cleaning up spill files and memory after errors
7cd013b [Josh Rosen] Begin refactoring to enable proper tests for spilling.
722849b [Josh Rosen] Add workaround for transferTo() bug in merging code; refactor tests.
9883e30 [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-sort
b95e642 [Josh Rosen] Refactor and document logic that decides when to spill.
1ce1300 [Josh Rosen] More minor cleanup
5e8cf75 [Josh Rosen] More minor cleanup
e67f1ea [Josh Rosen] Remove upper type bound in ShuffleWriter interface.
cfe0ec4 [Josh Rosen] Address a number of minor review comments:
8a6fe52 [Josh Rosen] Rename UnsafeShuffleSpillWriter to UnsafeShuffleExternalSorter
11feeb6 [Josh Rosen] Update TODOs related to shuffle write metrics.
b674412 [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-sort
aaea17b [Josh Rosen] Add comments to UnsafeShuffleSpillWriter.
4f70141 [Josh Rosen] Fix merging; now passes UnsafeShuffleSuite tests.
133c8c9 [Josh Rosen] WIP towards testing UnsafeShuffleWriter.
f480fb2 [Josh Rosen] WIP in mega-refactoring towards shuffle-specific sort.
57f1ec0 [Josh Rosen] WIP towards packed record pointers for use in optimized shuffle sort.
69232fd [Josh Rosen] Enable compressible address encoding for off-heap mode.
7ee918e [Josh Rosen] Re-order imports in tests
3aeaff7 [Josh Rosen] More refactoring and cleanup; begin cleaning iterator interfaces
3490512 [Josh Rosen] Misc. cleanup
f156a8f [Josh Rosen] Hacky metrics integration; refactor some interfaces.
2776aca [Josh Rosen] First passing test for ExternalSorter.
5e100b2 [Josh Rosen] Super-messy WIP on external sort
595923a [Josh Rosen] Remove some unused variables.
8958584 [Josh Rosen] Fix bug in calculating free space in current page.
f17fa8f [Josh Rosen] Add missing newline
c2fca17 [Josh Rosen] Small refactoring of SerializerPropertiesSuite to enable test re-use:
b8a09fe [Josh Rosen] Back out accidental log4j.properties change
bfc12d3 [Josh Rosen] Add tests for serializer relocation property.
240864c [Josh Rosen] Remove PrefixComputer and require prefix to be specified as part of insert()
1433b42 [Josh Rosen] Store record length as int instead of long.
026b497 [Josh Rosen] Re-use a buffer in UnsafeShuffleWriter
0748458 [Josh Rosen] Port UnsafeShuffleWriter to Java.
87e721b [Josh Rosen] Renaming and comments
d3cc310 [Josh Rosen] Flag that SparkSqlSerializer2 supports relocation
e2d96ca [Josh Rosen] Expand serializer API and use new function to help control when new UnsafeShuffle path is used.
e267cee [Josh Rosen] Fix compilation of UnsafeSorterSuite
9c6cf58 [Josh Rosen] Refactor to use DiskBlockObjectWriter.
253f13e [Josh Rosen] More cleanup
8e3ec20 [Josh Rosen] Begin code cleanup.
4d2f5e1 [Josh Rosen] WIP
3db12de [Josh Rosen] Minor simplification and sanity checks in UnsafeSorter
767d3ca [Josh Rosen] Fix invalid range in UnsafeSorter.
e900152 [Josh Rosen] Add test for empty iterator in UnsafeSorter
57a4ea0 [Josh Rosen] Make initialSize configurable in UnsafeSorter
abf7bfe [Josh Rosen] Add basic test case.
81d52c5 [Josh Rosen] WIP on UnsafeSorter
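
The 8-byte sort entries mentioned in the commit message (a compressed record pointer plus a partition id) can be sketched as a single packed `Long`. This is a minimal illustration, not the patch's `PackedRecordPointer` code: the object name `PackedPointerSketch` is hypothetical, and the layout is an assumption. It uses 24 bits for the partition id, which is consistent with the stated limit of fewer than 16777216 (2^24) partitions, and treats the remaining 40 bits as an opaque record pointer.

```scala
// Hypothetical sketch: one 64-bit sort entry holding a partition id and a record pointer.
object PackedPointerSketch {
  private val PartitionBits = 24                   // assumed width, matches the 2^24 partition limit
  private val PointerBits = 64 - PartitionBits     // 40 bits left for the record pointer
  private val PointerMask: Long = (1L << PointerBits) - 1

  val MaxPartitionId: Int = (1 << PartitionBits) - 1

  // Store the partition id in the upper bits and the pointer in the lower bits.
  def pack(partitionId: Int, pointer: Long): Long = {
    require(partitionId >= 0 && partitionId <= MaxPartitionId, "partition id out of range")
    require((pointer & ~PointerMask) == 0L, "pointer does not fit in 40 bits")
    (partitionId.toLong << PointerBits) | pointer
  }

  // Unsigned shift so that large partition ids do not come back negative.
  def partitionId(packed: Long): Int = (packed >>> PointerBits).toInt

  def pointer(packed: Long): Long = packed & PointerMask
}
```

Sorting an `Array[Long]` of such entries by `PackedPointerSketch.partitionId` groups records by target partition while touching only 8 bytes per record, which is the cache-friendliness argument made in the commit message.
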
486 lines
29 KiB
Scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import com.typesafe.tools.mima.core._
import com.typesafe.tools.mima.core.ProblemFilters._

/**
 * Additional excludes for checking of Spark's binary compatibility.
 *
 * The Mima build will automatically exclude @DeveloperApi and @Experimental classes. This acts
 * as an official audit of cases where we excluded other classes. Please use the narrowest
 * possible exclude here. MIMA will usually tell you what exclude to use, e.g.:
 *
 * ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.rdd.RDD.take")
 *
 * It is also possible to exclude Spark classes and packages. This should be used sparingly:
 *
 * MimaBuild.excludeSparkClass("graphx.util.collection.GraphXPrimitiveKeyOpenHashMap")
 */
object MimaExcludes {
    def excludes(version: String) =
      version match {
        case v if v.startsWith("1.4") =>
          Seq(
            MimaBuild.excludeSparkPackage("deploy"),
            MimaBuild.excludeSparkPackage("ml"),
            // SPARK-5922 Adding a generalized diff(other: RDD[(VertexId, VD)]) to VertexRDD
            ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.VertexRDD.diff"),
            // These are needed if checking against the sbt build, since they are part of
            // the maven-generated artifacts in 1.3.
            excludePackage("org.spark-project.jetty"),
            MimaBuild.excludeSparkPackage("unused"),
            ProblemFilters.exclude[MissingClassProblem]("com.google.common.base.Optional"),
            ProblemFilters.exclude[IncompatibleResultTypeProblem](
              "org.apache.spark.rdd.JdbcRDD.compute"),
            ProblemFilters.exclude[IncompatibleResultTypeProblem](
              "org.apache.spark.broadcast.HttpBroadcastFactory.newBroadcast"),
            ProblemFilters.exclude[IncompatibleResultTypeProblem](
              "org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast"),
            ProblemFilters.exclude[MissingClassProblem](
              "org.apache.spark.scheduler.OutputCommitCoordinator$OutputCommitCoordinatorActor")
          ) ++ Seq(
            // SPARK-4655 - Making Stage an Abstract class broke binary compatibility even though
            // the stage class is defined as private[spark]
            ProblemFilters.exclude[AbstractClassProblem]("org.apache.spark.scheduler.Stage")
          ) ++ Seq(
            // SPARK-6510 Add a Graph#minus method acting as Set#difference
            ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.VertexRDD.minus")
          ) ++ Seq(
            // SPARK-6492 Fix deadlock in SparkContext.stop()
            ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.SparkContext.org$" +
                "apache$spark$SparkContext$$SPARK_CONTEXT_CONSTRUCTOR_LOCK")
          ) ++ Seq(
            // SPARK-6693 add tostring with max lines and width for matrix
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.mllib.linalg.Matrix.toString")
          ) ++ Seq(
            // SPARK-6703 Add getOrCreate method to SparkContext
            ProblemFilters.exclude[IncompatibleResultTypeProblem]
                ("org.apache.spark.SparkContext.org$apache$spark$SparkContext$$activeContext")
          ) ++ Seq(
            // SPARK-7090 Introduce LDAOptimizer to LDA to further improve extensibility
            ProblemFilters.exclude[MissingClassProblem](
              "org.apache.spark.mllib.clustering.LDA$EMOptimizer")
          ) ++ Seq(
            // SPARK-6756 add toSparse, toDense, numActives, numNonzeros, and compressed to Vector
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.mllib.linalg.Vector.compressed"),
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.mllib.linalg.Vector.toDense"),
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.mllib.linalg.Vector.numNonzeros"),
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.mllib.linalg.Vector.toSparse"),
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.mllib.linalg.Vector.numActives")
          ) ++ Seq(
            // Execution should never be included as it's always internal.
            MimaBuild.excludeSparkPackage("sql.execution"),
            // This `protected[sql]` method was removed in 1.3.1
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.sql.SQLContext.checkAnalysis"),
            // These `private[sql]` classes were removed in 1.4.0:
            ProblemFilters.exclude[MissingClassProblem](
              "org.apache.spark.sql.execution.AddExchange"),
            ProblemFilters.exclude[MissingClassProblem](
              "org.apache.spark.sql.execution.AddExchange$"),
            ProblemFilters.exclude[MissingClassProblem](
              "org.apache.spark.sql.parquet.PartitionSpec"),
            ProblemFilters.exclude[MissingClassProblem](
              "org.apache.spark.sql.parquet.PartitionSpec$"),
            ProblemFilters.exclude[MissingClassProblem](
              "org.apache.spark.sql.parquet.Partition"),
            ProblemFilters.exclude[MissingClassProblem](
              "org.apache.spark.sql.parquet.Partition$"),
            ProblemFilters.exclude[MissingClassProblem](
              "org.apache.spark.sql.parquet.ParquetRelation2$PartitionValues"),
            ProblemFilters.exclude[MissingClassProblem](
              "org.apache.spark.sql.parquet.ParquetRelation2$PartitionValues$"),
            ProblemFilters.exclude[MissingClassProblem](
              "org.apache.spark.sql.parquet.ParquetRelation2"),
            ProblemFilters.exclude[MissingClassProblem](
              "org.apache.spark.sql.parquet.ParquetRelation2$"),
            ProblemFilters.exclude[MissingClassProblem](
              "org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache"),
            // These test support classes were moved out of src/main and into src/test:
            ProblemFilters.exclude[MissingClassProblem](
              "org.apache.spark.sql.parquet.ParquetTestData"),
            ProblemFilters.exclude[MissingClassProblem](
              "org.apache.spark.sql.parquet.ParquetTestData$"),
            ProblemFilters.exclude[MissingClassProblem](
              "org.apache.spark.sql.parquet.TestGroupWriteSupport"),
            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.CachedData"),
            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.CachedData$"),
            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.CacheManager")
          ) ++ Seq(
            // SPARK-7530 Added StreamingContext.getState()
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.streaming.StreamingContext.state_=")
          ) ++ Seq(
            // SPARK-7081 changed ShuffleWriter from a trait to an abstract class and removed some
            // unnecessary type bounds in order to fix some compiler warnings that occurred when
            // implementing this interface in Java. Note that ShuffleWriter is private[spark].
            ProblemFilters.exclude[IncompatibleTemplateDefProblem](
              "org.apache.spark.shuffle.ShuffleWriter")
          )

        case v if v.startsWith("1.3") =>
          Seq(
            MimaBuild.excludeSparkPackage("deploy"),
            MimaBuild.excludeSparkPackage("ml"),
            // These are needed if checking against the sbt build, since they are part of
            // the maven-generated artifacts in the 1.2 build.
            MimaBuild.excludeSparkPackage("unused"),
            ProblemFilters.exclude[MissingClassProblem]("com.google.common.base.Optional")
          ) ++ Seq(
            // SPARK-2321
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.SparkStageInfoImpl.this"),
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.SparkStageInfo.submissionTime")
          ) ++ Seq(
            // SPARK-4614
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.mllib.linalg.Matrices.randn"),
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.mllib.linalg.Matrices.rand")
          ) ++ Seq(
            // SPARK-5321
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.mllib.linalg.SparseMatrix.transposeMultiply"),
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.mllib.linalg.Matrix.transpose"),
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.mllib.linalg.DenseMatrix.transposeMultiply"),
            ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.mllib.linalg.Matrix." +
                "org$apache$spark$mllib$linalg$Matrix$_setter_$isTransposed_="),
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.mllib.linalg.Matrix.isTransposed"),
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.mllib.linalg.Matrix.foreachActive")
          ) ++ Seq(
            // SPARK-5540
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.mllib.recommendation.ALS.solveLeastSquares"),
            // SPARK-5536
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$updateFeatures"),
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$updateBlock")
          ) ++ Seq(
            // SPARK-3325
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.streaming.api.java.JavaDStreamLike.print"),
            // SPARK-2757
            ProblemFilters.exclude[IncompatibleResultTypeProblem](
              "org.apache.spark.streaming.flume.sink.SparkAvroCallbackHandler." +
                "removeAndGetProcessor")
          ) ++ Seq(
            // SPARK-5123 (SparkSQL data type change) - alpha component only
            ProblemFilters.exclude[IncompatibleResultTypeProblem](
              "org.apache.spark.ml.feature.HashingTF.outputDataType"),
            ProblemFilters.exclude[IncompatibleResultTypeProblem](
              "org.apache.spark.ml.feature.Tokenizer.outputDataType"),
            ProblemFilters.exclude[IncompatibleMethTypeProblem](
              "org.apache.spark.ml.feature.Tokenizer.validateInputType"),
            ProblemFilters.exclude[IncompatibleMethTypeProblem](
              "org.apache.spark.ml.classification.LogisticRegressionModel.validateAndTransformSchema"),
            ProblemFilters.exclude[IncompatibleMethTypeProblem](
              "org.apache.spark.ml.classification.LogisticRegression.validateAndTransformSchema")
          ) ++ Seq(
            // SPARK-4014
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.TaskContext.taskAttemptId"),
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.TaskContext.attemptNumber")
          ) ++ Seq(
            // SPARK-5166 Spark SQL API stabilization
            ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.Transformer.transform"),
            ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.Estimator.fit"),
            ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.ml.Transformer.transform"),
            ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.Pipeline.fit"),
            ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.PipelineModel.transform"),
            ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.ml.Estimator.fit"),
            ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.Evaluator.evaluate"),
            ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.ml.Evaluator.evaluate"),
            ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.tuning.CrossValidator.fit"),
            ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.tuning.CrossValidatorModel.transform"),
            ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.feature.StandardScaler.fit"),
            ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.feature.StandardScalerModel.transform"),
            ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.classification.LogisticRegressionModel.transform"),
            ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.classification.LogisticRegression.fit"),
            ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.evaluation.BinaryClassificationEvaluator.evaluate")
          ) ++ Seq(
            // SPARK-5270
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.api.java.JavaRDDLike.isEmpty")
          ) ++ Seq(
            // SPARK-5430
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.api.java.JavaRDDLike.treeReduce"),
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.api.java.JavaRDDLike.treeAggregate")
          ) ++ Seq(
            // SPARK-5297 Java FileStream do not work with custom key/values
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.streaming.api.java.JavaStreamingContext.fileStream")
          ) ++ Seq(
            // SPARK-5315 Spark Streaming Java API returns Scala DStream
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.streaming.api.java.JavaDStreamLike.reduceByWindow")
          ) ++ Seq(
            // SPARK-5461 Graph should have isCheckpointed, getCheckpointFiles methods
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.graphx.Graph.getCheckpointFiles"),
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.graphx.Graph.isCheckpointed")
          ) ++ Seq(
            // SPARK-4789 Standardize ML Prediction APIs
            ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.mllib.linalg.VectorUDT"),
            ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.mllib.linalg.VectorUDT.serialize"),
            ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.mllib.linalg.VectorUDT.sqlType")
          ) ++ Seq(
            // SPARK-5814
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$wrapDoubleArray"),
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$fillFullMatrix"),
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$iterations"),
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$makeOutLinkBlock"),
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$computeYtY"),
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$makeLinkRDDs"),
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$alpha"),
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$randomFactor"),
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$makeInLinkBlock"),
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$dspr"),
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$lambda"),
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$implicitPrefs"),
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$rank")
          ) ++ Seq(
            // SPARK-4682
            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.RealClock"),
            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.Clock"),
            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.TestClock")
          ) ++ Seq(
            // SPARK-5922 Adding a generalized diff(other: RDD[(VertexId, VD)]) to VertexRDD
            ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.VertexRDD.diff")
          )

        case v if v.startsWith("1.2") =>
          Seq(
            MimaBuild.excludeSparkPackage("deploy"),
            MimaBuild.excludeSparkPackage("graphx")
          ) ++
          MimaBuild.excludeSparkClass("mllib.linalg.Matrix") ++
          MimaBuild.excludeSparkClass("mllib.linalg.Vector") ++
          Seq(
            ProblemFilters.exclude[IncompatibleTemplateDefProblem](
              "org.apache.spark.scheduler.TaskLocation"),
            // Added normL1 and normL2 to trait MultivariateStatisticalSummary
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.mllib.stat.MultivariateStatisticalSummary.normL1"),
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.mllib.stat.MultivariateStatisticalSummary.normL2"),
            // MapStatus should be private[spark]
            ProblemFilters.exclude[IncompatibleTemplateDefProblem](
              "org.apache.spark.scheduler.MapStatus"),
            ProblemFilters.exclude[MissingClassProblem](
              "org.apache.spark.network.netty.PathResolver"),
            ProblemFilters.exclude[MissingClassProblem](
              "org.apache.spark.network.netty.client.BlockClientListener"),

            // TaskContext was promoted to Abstract class
            ProblemFilters.exclude[AbstractClassProblem](
              "org.apache.spark.TaskContext"),
            ProblemFilters.exclude[IncompatibleTemplateDefProblem](
              "org.apache.spark.util.collection.SortDataFormat")
          ) ++ Seq(
            // Adding new methods to the JavaRDDLike trait:
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.api.java.JavaRDDLike.takeAsync"),
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.api.java.JavaRDDLike.foreachPartitionAsync"),
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.api.java.JavaRDDLike.countAsync"),
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.api.java.JavaRDDLike.foreachAsync"),
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.api.java.JavaRDDLike.collectAsync")
          ) ++ Seq(
            // SPARK-3822
            ProblemFilters.exclude[IncompatibleResultTypeProblem](
              "org.apache.spark.SparkContext.org$apache$spark$SparkContext$$createTaskScheduler")
          ) ++ Seq(
            // SPARK-1209
            ProblemFilters.exclude[MissingClassProblem](
              "org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil"),
            ProblemFilters.exclude[MissingClassProblem](
              "org.apache.hadoop.mapred.SparkHadoopMapRedUtil"),
            ProblemFilters.exclude[MissingTypesProblem](
              "org.apache.spark.rdd.PairRDDFunctions")
          ) ++ Seq(
            // SPARK-4062
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.streaming.kafka.KafkaReceiver#MessageHandler.this")
          )

        case v if v.startsWith("1.1") =>
          Seq(
            MimaBuild.excludeSparkPackage("deploy"),
            MimaBuild.excludeSparkPackage("graphx")
          ) ++
          Seq(
            // Adding new method to JavaRDDLike trait - we should probably mark this as a developer API.
            ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.api.java.JavaRDDLike.partitions"),
            // Should probably mark this as Experimental
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.api.java.JavaRDDLike.foreachAsync"),
            // We made a mistake earlier (ed06500d3) in the Java API by using default parameter
            // values for the countApproxDistinct* functions, which do not work from Java. We later
            // removed them; the filters below tell MiMa to ignore that change.
            ProblemFilters.exclude[IncompatibleResultTypeProblem](
              "org.apache.spark.api.java.JavaPairRDD.countApproxDistinctByKey"),
            ProblemFilters.exclude[IncompatibleResultTypeProblem](
              "org.apache.spark.api.java.JavaPairRDD.countApproxDistinctByKey"),
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.api.java.JavaPairRDD.countApproxDistinct$default$1"),
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.api.java.JavaPairRDD.countApproxDistinctByKey$default$1"),
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.api.java.JavaRDD.countApproxDistinct$default$1"),
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.api.java.JavaRDDLike.countApproxDistinct$default$1"),
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.api.java.JavaDoubleRDD.countApproxDistinct$default$1"),
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.storage.DiskStore.getValues"),
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.storage.MemoryStore.Entry")
          ) ++
          Seq(
            // Serializer interface change. See SPARK-3045.
            ProblemFilters.exclude[IncompatibleTemplateDefProblem](
              "org.apache.spark.serializer.DeserializationStream"),
            ProblemFilters.exclude[IncompatibleTemplateDefProblem](
              "org.apache.spark.serializer.Serializer"),
            ProblemFilters.exclude[IncompatibleTemplateDefProblem](
              "org.apache.spark.serializer.SerializationStream"),
            ProblemFilters.exclude[IncompatibleTemplateDefProblem](
              "org.apache.spark.serializer.SerializerInstance")
          ) ++
          Seq(
            // Renamed putValues -> putArray + putIterator
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.storage.MemoryStore.putValues"),
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.storage.DiskStore.putValues"),
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.storage.TachyonStore.putValues")
          ) ++
          Seq(
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.streaming.flume.FlumeReceiver.this"),
            ProblemFilters.exclude[IncompatibleMethTypeProblem](
              "org.apache.spark.streaming.kafka.KafkaUtils.createStream"),
            ProblemFilters.exclude[IncompatibleMethTypeProblem](
              "org.apache.spark.streaming.kafka.KafkaReceiver.this")
          ) ++
          Seq( // Ignore some private methods in ALS.
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$^dateFeatures"),
            ProblemFilters.exclude[MissingMethodProblem]( // The only public constructor is the one without arguments.
              "org.apache.spark.mllib.recommendation.ALS.this"),
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$<init>$default$7"),
            ProblemFilters.exclude[IncompatibleMethTypeProblem](
              "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$^dateFeatures")
          ) ++
          MimaBuild.excludeSparkClass("mllib.linalg.distributed.ColumnStatisticsAggregator") ++
          MimaBuild.excludeSparkClass("rdd.ZippedRDD") ++
          MimaBuild.excludeSparkClass("rdd.ZippedPartition") ++
          MimaBuild.excludeSparkClass("util.SerializableHyperLogLog") ++
          MimaBuild.excludeSparkClass("storage.Values") ++
          MimaBuild.excludeSparkClass("storage.Entry") ++
          MimaBuild.excludeSparkClass("storage.MemoryStore$Entry") ++
          // Class was missing "@DeveloperApi" annotation in 1.0.
          MimaBuild.excludeSparkClass("scheduler.SparkListenerApplicationStart") ++
          Seq(
            ProblemFilters.exclude[IncompatibleMethTypeProblem](
              "org.apache.spark.mllib.tree.impurity.Gini.calculate"),
            ProblemFilters.exclude[IncompatibleMethTypeProblem](
              "org.apache.spark.mllib.tree.impurity.Entropy.calculate"),
            ProblemFilters.exclude[IncompatibleMethTypeProblem](
              "org.apache.spark.mllib.tree.impurity.Variance.calculate")
          ) ++
          Seq( // Package-private classes removed in SPARK-2341
            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.BinaryLabelParser"),
            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.BinaryLabelParser$"),
            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.LabelParser"),
            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.LabelParser$"),
            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.MulticlassLabelParser"),
            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.MulticlassLabelParser$")
          ) ++
          Seq( // package-private classes removed in MLlib
            ProblemFilters.exclude[MissingMethodProblem](
              "org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.org$apache$spark$mllib$regression$GeneralizedLinearAlgorithm$$prependOne")
          ) ++
          Seq( // new Vector methods in MLlib (binary compatible assuming users do not implement Vector)
            ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.mllib.linalg.Vector.copy")
          ) ++
          Seq( // synthetic methods generated in LabeledPoint
            ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.mllib.regression.LabeledPoint$"),
            ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.mllib.regression.LabeledPoint.apply"),
            ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.mllib.regression.LabeledPoint.toString")
          ) ++
          Seq( // Scala 2.11 compatibility fix
            ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.StreamingContext.<init>$default$2")
          )
        case v if v.startsWith("1.0") =>
          Seq(
            MimaBuild.excludeSparkPackage("api.java"),
            MimaBuild.excludeSparkPackage("mllib"),
            MimaBuild.excludeSparkPackage("streaming")
          ) ++
          MimaBuild.excludeSparkClass("rdd.ClassTags") ++
          MimaBuild.excludeSparkClass("util.XORShiftRandom") ++
          MimaBuild.excludeSparkClass("graphx.EdgeRDD") ++
          MimaBuild.excludeSparkClass("graphx.VertexRDD") ++
          MimaBuild.excludeSparkClass("graphx.impl.GraphImpl") ++
          MimaBuild.excludeSparkClass("graphx.impl.RoutingTable") ++
          MimaBuild.excludeSparkClass("graphx.util.collection.PrimitiveKeyOpenHashMap") ++
          MimaBuild.excludeSparkClass("graphx.util.collection.GraphXPrimitiveKeyOpenHashMap") ++
          MimaBuild.excludeSparkClass("mllib.recommendation.MFDataGenerator") ++
          MimaBuild.excludeSparkClass("mllib.optimization.SquaredGradient") ++
          MimaBuild.excludeSparkClass("mllib.regression.RidgeRegressionWithSGD") ++
          MimaBuild.excludeSparkClass("mllib.regression.LassoWithSGD") ++
          MimaBuild.excludeSparkClass("mllib.regression.LinearRegressionWithSGD")
        case _ => Seq()
      }
}