[SPARK-15081] Move AccumulatorV2 and subclasses into util package

## What changes were proposed in this pull request?
This patch moves AccumulatorV2 and its subclasses into the util package.
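
As a quick illustration of what callers see after the move, here is a minimal sketch of a custom accumulator written against the relocated class. The `MaxAccumulator` name is made up for illustration, and the member signatures assume the AccumulatorV2 API as it stabilized for Spark 2.0; this sketch is not part of the patch itself.

```scala
// The class itself is unchanged; only its package moves, so user code now
// imports it from org.apache.spark.util instead of org.apache.spark.
import org.apache.spark.util.AccumulatorV2

// Hypothetical accumulator that tracks the maximum Long seen across tasks.
class MaxAccumulator extends AccumulatorV2[Long, Long] {
  private var _max = Long.MinValue

  override def isZero: Boolean = _max == Long.MinValue

  override def copy(): MaxAccumulator = {
    val acc = new MaxAccumulator
    acc._max = _max
    acc
  }

  override def reset(): Unit = _max = Long.MinValue

  override def add(v: Long): Unit = _max = math.max(_max, v)

  override def merge(other: AccumulatorV2[Long, Long]): Unit =
    _max = math.max(_max, other.value)

  override def value: Long = _max
}
```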

## How was this patch tested?
Updated relevant tests.

Author: Reynold Xin <rxin@databricks.com>

Closes #12863 from rxin/SPARK-15081.
Authored by Reynold Xin on 2016-05-03 19:45:12 +08:00; committed by Wenchen Fan.
Parent: a744457076
Commit: d557a5e01e
30 changed files with 44 additions and 34 deletions


@@ -24,6 +24,7 @@ import scala.reflect.ClassTag
 import org.apache.spark.scheduler.AccumulableInfo
 import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.util.{AccumulatorContext, AccumulatorMetadata, LegacyAccumulatorWrapper}
 /**


@@ -25,7 +25,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.{RDD, ReliableRDDCheckpointData}
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, ThreadUtils, Utils}
 /**
  * Classes that represent cleaning tasks.


@@ -26,7 +26,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
+import org.apache.spark.util._
 /**
  * A heartbeat from executors to the driver. This is a shared message used by several internal


@@ -24,7 +24,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.memory.TaskMemoryManager
 import org.apache.spark.metrics.source.Source
-import org.apache.spark.util.{TaskCompletionListener, TaskFailureListener}
+import org.apache.spark.util.{AccumulatorV2, TaskCompletionListener, TaskFailureListener}
 object TaskContext {


@@ -23,7 +23,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.AccumulableInfo
 import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{AccumulatorV2, Utils}
 // ==============================================================================================
 // NOTE: new task end reasons MUST be accompanied with serialization logic in util.JsonProtocol!


@@ -17,8 +17,8 @@
 package org.apache.spark.executor
-import org.apache.spark.LongAccumulator
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.util.LongAccumulator
 /**


@@ -17,8 +17,8 @@
 package org.apache.spark.executor
-import org.apache.spark.LongAccumulator
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.util.LongAccumulator
 /**


@@ -17,8 +17,8 @@
 package org.apache.spark.executor
-import org.apache.spark.LongAccumulator
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.util.LongAccumulator
 /**


@@ -17,8 +17,8 @@
 package org.apache.spark.executor
-import org.apache.spark.LongAccumulator
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.util.LongAccumulator
 /**


@@ -24,6 +24,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.AccumulableInfo
 import org.apache.spark.storage.{BlockId, BlockStatus}
+import org.apache.spark.util.{AccumulatorContext, AccumulatorMetadata, AccumulatorV2, LongAccumulator}
 /**


@@ -23,7 +23,7 @@ import scala.language.existentials
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
-import org.apache.spark.util.CallSite
+import org.apache.spark.util.{AccumulatorV2, CallSite}
 /**
  * Types of events that can be handled by the DAGScheduler. The DAGScheduler uses an event queue


@@ -28,7 +28,7 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.memory.{MemoryMode, TaskMemoryManager}
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.serializer.SerializerInstance
-import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Utils}
+import org.apache.spark.util.{AccumulatorV2, ByteBufferInputStream, ByteBufferOutputStream, Utils}
 /**
  * A unit of execution. We have two kinds of Task's in Spark:


@@ -22,9 +22,9 @@ import java.nio.ByteBuffer
 import scala.collection.mutable.ArrayBuffer
-import org.apache.spark.{AccumulatorV2, SparkEnv}
+import org.apache.spark.SparkEnv
 import org.apache.spark.storage.BlockId
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{AccumulatorV2, Utils}
 // Task result. Also contains updates to accumulator variables.
 private[spark] sealed trait TaskResult[T]


@@ -27,7 +27,7 @@ import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.internal.Logging
 import org.apache.spark.serializer.SerializerInstance
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.{LongAccumulator, ThreadUtils, Utils}
 /**
  * Runs a thread pool that deserializes and remotely fetches (if necessary) task results.


@@ -17,9 +17,9 @@
 package org.apache.spark.scheduler
-import org.apache.spark.AccumulatorV2
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.util.AccumulatorV2
 /**
  * Low-level task scheduler interface, currently implemented exclusively by


@@ -34,7 +34,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.scheduler.TaskLocality.TaskLocality
 import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.{AccumulatorV2, ThreadUtils, Utils}
 /**
  * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.


@@ -32,7 +32,7 @@ import org.apache.spark._
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.SchedulingMode._
 import org.apache.spark.TaskState.TaskState
-import org.apache.spark.util.{Clock, SystemClock, Utils}
+import org.apache.spark.util.{AccumulatorV2, Clock, SystemClock, Utils}
 /**
  * Schedules the tasks within a single TaskSet in the TaskSchedulerImpl. This class keeps track of


@@ -15,15 +15,15 @@
  * limitations under the License.
  */
-package org.apache.spark
+package org.apache.spark.util
 import java.{lang => jl}
 import java.io.ObjectInputStream
 import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicLong
+import org.apache.spark.{InternalAccumulator, SparkContext, TaskContext}
 import org.apache.spark.scheduler.AccumulableInfo
-import org.apache.spark.util.Utils
 private[spark] case class AccumulatorMetadata(
@@ -204,8 +204,8 @@ private[spark] object AccumulatorContext {
   private[this] val nextId = new AtomicLong(0L)
   /**
-   * Returns a globally unique ID for a new [[Accumulator]].
-   * Note: Once you copy the [[Accumulator]] the ID is no longer unique.
+   * Returns a globally unique ID for a new [[AccumulatorV2]].
+   * Note: Once you copy the [[AccumulatorV2]] the ID is no longer unique.
    */
   def newId(): Long = nextId.getAndIncrement
@@ -213,14 +213,14 @@ private[spark] object AccumulatorContext {
   def numAccums: Int = originals.size
   /**
-   * Registers an [[Accumulator]] created on the driver such that it can be used on the executors.
+   * Registers an [[AccumulatorV2]] created on the driver such that it can be used on the executors.
    *
    * All accumulators registered here can later be used as a container for accumulating partial
    * values across multiple tasks. This is what [[org.apache.spark.scheduler.DAGScheduler]] does.
    * Note: if an accumulator is registered here, it should also be registered with the active
    * context cleaner for cleanup so as to avoid memory leaks.
    *
-   * If an [[Accumulator]] with the same ID was already registered, this does nothing instead
+   * If an [[AccumulatorV2]] with the same ID was already registered, this does nothing instead
    * of overwriting it. We will never register same accumulator twice, this is just a sanity check.
    */
   def register(a: AccumulatorV2[_, _]): Unit = {
@@ -228,14 +228,14 @@ private[spark] object AccumulatorContext {
   }
   /**
-   * Unregisters the [[Accumulator]] with the given ID, if any.
+   * Unregisters the [[AccumulatorV2]] with the given ID, if any.
    */
   def remove(id: Long): Unit = {
     originals.remove(id)
   }
   /**
-   * Returns the [[Accumulator]] registered with the given ID, if any.
+   * Returns the [[AccumulatorV2]] registered with the given ID, if any.
    */
   def get(id: Long): Option[AccumulatorV2[_, _]] = {
     Option(originals.get(id)).map { ref =>
@@ -249,7 +249,7 @@ private[spark] object AccumulatorContext {
   }
   /**
-   * Clears all registered [[Accumulator]]s. For testing only.
+   * Clears all registered [[AccumulatorV2]]s. For testing only.
    */
   def clear(): Unit = {
     originals.clear()
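
For context, the registration machinery documented above is driver-side bookkeeping: AccumulatorContext keeps weak references to accumulators keyed by ID so that updates coming back from tasks can be matched to the originals. A rough sketch of that flow follows; note that AccumulatorContext is `private[spark]`, so only Spark internals call it, and the `metadata`/`id` members and the AccumulatorMetadata argument order shown here are assumptions rather than something this diff shows.

```scala
import org.apache.spark.util.{AccumulatorContext, AccumulatorMetadata, LongAccumulator}

// Driver side: give the accumulator an identity, then register it so that
// per-task updates can later be merged back into this original instance.
// (metadata, id, and the AccumulatorMetadata fields are assumed internals.)
val acc = new LongAccumulator
acc.metadata = AccumulatorMetadata(AccumulatorContext.newId(), Some("records"), countFailedValues = false)
AccumulatorContext.register(acc)   // no-op if the same ID is already registered

// When a task result arrives, the original is looked up by ID...
val original = AccumulatorContext.get(acc.id)

// ...and the ContextCleaner eventually unregisters it to avoid leaks.
AccumulatorContext.remove(acc.id)
```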


@@ -31,6 +31,7 @@ import org.scalatest.exceptions.TestFailedException
 import org.apache.spark.AccumulatorParam.StringAccumulatorParam
 import org.apache.spark.scheduler._
 import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.util.{AccumulatorContext, AccumulatorMetadata, AccumulatorV2, LongAccumulator}
 class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContext {


@@ -23,6 +23,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler.AccumulableInfo
 import org.apache.spark.shuffle.FetchFailedException
+import org.apache.spark.util.{AccumulatorContext, AccumulatorV2}
 class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {


@@ -21,6 +21,7 @@ package org.apache.spark
 import org.scalatest.{BeforeAndAfterAll, FunSuite, Outcome}
 import org.apache.spark.internal.Logging
+import org.apache.spark.util.AccumulatorContext
 /**
  * Base abstract class for all unit tests in Spark for handling common functionality.


@@ -21,6 +21,7 @@ import org.scalatest.Assertions
 import org.apache.spark._
 import org.apache.spark.storage.{BlockStatus, StorageLevel, TestBlockId}
+import org.apache.spark.util.AccumulatorV2
 class TaskMetricsSuite extends SparkFunSuite {


@@ -28,11 +28,10 @@ import org.scalatest.concurrent.Timeouts
 import org.scalatest.time.SpanSugar._
 import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
-import org.apache.spark.util.{CallSite, Utils}
+import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite, LongAccumulator, Utils}
 class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler)
   extends DAGSchedulerEventProcessLoop(dagScheduler) {


@@ -17,9 +17,10 @@
 package org.apache.spark.scheduler
-import org.apache.spark.{AccumulatorV2, LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.util.AccumulatorV2
 class ExternalClusterManagerSuite extends SparkFunSuite with LocalSparkContext {
   test("launch of backend and scheduler") {


@@ -24,7 +24,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark._
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.ManualClock
+import org.apache.spark.util.{AccumulatorV2, ManualClock}
 class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
   extends DAGScheduler(sc) {


@@ -17,7 +17,7 @@
 package org.apache.spark.util
-import org.apache.spark.{DoubleAccumulator, LongAccumulator, SparkFunSuite}
+import org.apache.spark.SparkFunSuite
 class AccumulatorV2Suite extends SparkFunSuite {


@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.columnar
 import scala.collection.mutable.ArrayBuffer
-import org.apache.spark.{Accumulable, Accumulator, AccumulatorContext}
+import org.apache.spark.{Accumulable, Accumulator}
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -33,6 +33,8 @@ import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.types.UserDefinedType
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.AccumulatorContext
 private[sql] object InMemoryRelation {
   def apply(

View file

@@ -19,9 +19,9 @@ package org.apache.spark.sql.execution.metric
 import java.text.NumberFormat
-import org.apache.spark.{AccumulatorV2, SparkContext}
+import org.apache.spark.SparkContext
 import org.apache.spark.scheduler.AccumulableInfo
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{AccumulatorV2, Utils}
 class SQLMetric(val metricType: String, initValue: Long = 0L) extends AccumulatorV2[Long, Long] {


@@ -22,13 +22,13 @@ import scala.language.postfixOps
 import org.scalatest.concurrent.Eventually._
-import org.apache.spark.AccumulatorContext
 import org.apache.spark.sql.execution.RDDScanExec
 import org.apache.spark.sql.execution.columnar._
 import org.apache.spark.sql.execution.exchange.ShuffleExchange
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
 import org.apache.spark.storage.{RDDBlockId, StorageLevel}
+import org.apache.spark.util.AccumulatorContext
 private case class BigData(s: String)


@@ -30,6 +30,8 @@ import org.apache.spark.sql.execution.{SparkPlanInfo, SQLExecution}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.{AccumulatorMetadata, LongAccumulator}
 class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
   import testImplicits._