[SPARK-36602][COER][SQL] Clean up redundant asInstanceOf casts
### What changes were proposed in this pull request? The change of this pr is remove redundant asInstanceOf casts in Spark code. ### Why are the changes needed? Code simplification ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? - Pass GA or Jenkins Tests. Closes #33852 from LuciferYang/cleanup-asInstanceof. Authored-by: yangjie01 <yangjie01@baidu.com> Signed-off-by: Sean Owen <srowen@gmail.com>
This commit is contained in:
parent
6bd491ecb8
commit
35848385ae
|
@ -165,7 +165,7 @@ private[r] class RBackendHandler(server: RBackend)
|
|||
|
||||
// Write status bit
|
||||
writeInt(dos, 0)
|
||||
writeObject(dos, ret.asInstanceOf[AnyRef], server.jvmObjectTracker)
|
||||
writeObject(dos, ret, server.jvmObjectTracker)
|
||||
} else if (methodName == "<init>") {
|
||||
// methodName should be "<init>" for constructor
|
||||
val ctors = cls.getConstructors
|
||||
|
|
|
@ -393,7 +393,7 @@ abstract class RDD[T: ClassTag](
|
|||
}
|
||||
// Need to compute the block.
|
||||
case Right(iter) =>
|
||||
new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])
|
||||
new InterruptibleIterator(context, iter)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -99,7 +99,7 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
|
|||
FileInputFormat.setInputPaths(conf, path)
|
||||
|
||||
val instance: org.apache.hadoop.mapreduce.InputFormat[_, _] =
|
||||
ReflectionUtils.newInstance(inputFormatClazz.asInstanceOf[Class[_]], conf).asInstanceOf[
|
||||
ReflectionUtils.newInstance(inputFormatClazz, conf).asInstanceOf[
|
||||
org.apache.hadoop.mapreduce.InputFormat[_, _]]
|
||||
val job = Job.getInstance(conf)
|
||||
|
||||
|
@ -119,7 +119,7 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
|
|||
FileInputFormat.setInputPaths(jobConf, path)
|
||||
|
||||
val instance: org.apache.hadoop.mapred.InputFormat[_, _] =
|
||||
ReflectionUtils.newInstance(inputFormatClazz.asInstanceOf[Class[_]], jobConf).asInstanceOf[
|
||||
ReflectionUtils.newInstance(inputFormatClazz, jobConf).asInstanceOf[
|
||||
org.apache.hadoop.mapred.InputFormat[_, _]]
|
||||
|
||||
val retval = new ArrayBuffer[SplitInfo]()
|
||||
|
|
|
@ -2820,7 +2820,7 @@ private[spark] object Utils extends Logging {
|
|||
klass.getConstructor().newInstance()
|
||||
}
|
||||
|
||||
Some(ext.asInstanceOf[T])
|
||||
Some(ext)
|
||||
} catch {
|
||||
case _: NoSuchMethodException =>
|
||||
throw new SparkException(
|
||||
|
|
|
@ -238,7 +238,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually {
|
|||
test("aggregate") {
|
||||
val pairs = sc.makeRDD(Seq(("a", 1), ("b", 2), ("a", 2), ("c", 5), ("a", 3)))
|
||||
type StringMap = scala.collection.mutable.Map[String, Int]
|
||||
val emptyMap = HashMap[String, Int]().withDefaultValue(0).asInstanceOf[StringMap]
|
||||
val emptyMap = HashMap[String, Int]().withDefaultValue(0)
|
||||
val mergeElement: (StringMap, (String, Int)) => StringMap = (map, pair) => {
|
||||
map(pair._1) += pair._2
|
||||
map
|
||||
|
|
|
@ -315,7 +315,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
|
|||
hostLocalBlocks.foreach { case (blockId, buf) =>
|
||||
doReturn(buf)
|
||||
.when(blockManager)
|
||||
.getHostLocalShuffleData(meq(blockId.asInstanceOf[ShuffleBlockId]), any())
|
||||
.getHostLocalShuffleData(meq(blockId), any())
|
||||
}
|
||||
val hostLocalDirs = Map("test-host-local-client-1" -> Array("local-dir"))
|
||||
// returning local dir for hostLocalBmId
|
||||
|
|
|
@ -118,7 +118,7 @@ class ThreadUtilsSuite extends SparkFunSuite {
|
|||
val exception = intercept[IllegalArgumentException] {
|
||||
runInNewThread("thread-name") { throw new IllegalArgumentException(uniqueExceptionMessage) }
|
||||
}
|
||||
assert(exception.asInstanceOf[IllegalArgumentException].getMessage === uniqueExceptionMessage)
|
||||
assert(exception.getMessage === uniqueExceptionMessage)
|
||||
assert(exception.getStackTrace.mkString("\n").contains(
|
||||
"... run in separate thread using org.apache.spark.util.ThreadUtils ..."),
|
||||
"stack trace does not contain expected place holder"
|
||||
|
|
|
@ -95,7 +95,7 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
|
|||
}
|
||||
|
||||
test("RDD generation") {
|
||||
val inputStream = KinesisInputDStream.builder.
|
||||
val kinesisStream = KinesisInputDStream.builder.
|
||||
streamingContext(ssc).
|
||||
checkpointAppName(appName).
|
||||
streamName("dummyStream").
|
||||
|
@ -105,9 +105,8 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
|
|||
storageLevel(StorageLevel.MEMORY_AND_DISK_2).
|
||||
kinesisCredentials(BasicCredentials(dummyAWSAccessKey, dummyAWSSecretKey)).
|
||||
build()
|
||||
assert(inputStream.isInstanceOf[KinesisInputDStream[Array[Byte]]])
|
||||
assert(kinesisStream.isInstanceOf[KinesisInputDStream[Array[Byte]]])
|
||||
|
||||
val kinesisStream = inputStream.asInstanceOf[KinesisInputDStream[Array[Byte]]]
|
||||
val time = Time(1000)
|
||||
|
||||
// Generate block info data for testing
|
||||
|
|
|
@ -267,7 +267,7 @@ class BLASSuite extends SparkMLFunSuite {
|
|||
|
||||
val dATT = dATman.transpose
|
||||
val sATT = sATman.transpose
|
||||
val BTT = BTman.transpose.asInstanceOf[DenseMatrix]
|
||||
val BTT = BTman.transpose
|
||||
|
||||
assert(dATT.multiply(B) ~== expected absTol 1e-15)
|
||||
assert(sATT.multiply(B) ~== expected absTol 1e-15)
|
||||
|
|
|
@ -596,8 +596,8 @@ class MatricesSuite extends SparkMLFunSuite {
|
|||
new DenseMatrix(4, 3, Array(0.0, 1.0, 0.0, 0.0, 2.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 3.0))
|
||||
val sA = new SparseMatrix(4, 3, Array(0, 1, 3, 4), Array(1, 0, 2, 3), Array(1.0, 2.0, 1.0, 3.0))
|
||||
|
||||
val dAT = dA.transpose.asInstanceOf[DenseMatrix]
|
||||
val sAT = sA.transpose.asInstanceOf[SparseMatrix]
|
||||
val dAT = dA.transpose
|
||||
val sAT = sA.transpose
|
||||
val dATexpected =
|
||||
new DenseMatrix(3, 4, Array(0.0, 2.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 3.0))
|
||||
val sATexpected =
|
||||
|
@ -611,7 +611,7 @@ class MatricesSuite extends SparkMLFunSuite {
|
|||
assert(sA(2, 1) === sAT(1, 2))
|
||||
|
||||
assert(!dA.toArray.eq(dAT.toArray), "has to have a new array")
|
||||
assert(dA.values.eq(dAT.transpose.asInstanceOf[DenseMatrix].values), "should not copy array")
|
||||
assert(dA.values.eq(dAT.transpose.values), "should not copy array")
|
||||
|
||||
assert(dAT.toSparse.asBreeze === sATexpected.asBreeze)
|
||||
assert(sAT.toDense.asBreeze === dATexpected.asBreeze)
|
||||
|
|
|
@ -398,7 +398,7 @@ private[python] class PythonMLLibAPI extends Serializable {
|
|||
|
||||
if (initialModelWeights != null && initialModelMu != null && initialModelSigma != null) {
|
||||
val gaussians = initialModelMu.asScala.toSeq.zip(initialModelSigma.asScala.toSeq).map {
|
||||
case (x, y) => new MultivariateGaussian(x.asInstanceOf[Vector], y.asInstanceOf[Matrix])
|
||||
case (x, y) => new MultivariateGaussian(x, y)
|
||||
}
|
||||
val initialModel = new GaussianMixtureModel(
|
||||
initialModelWeights.asScala.toArray, gaussians.toArray)
|
||||
|
|
|
@ -267,7 +267,7 @@ class BLASSuite extends SparkFunSuite {
|
|||
|
||||
val dATT = dATman.transpose
|
||||
val sATT = sATman.transpose
|
||||
val BTT = BTman.transpose.asInstanceOf[DenseMatrix]
|
||||
val BTT = BTman.transpose
|
||||
|
||||
assert(dATT.multiply(B) ~== expected absTol 1e-15)
|
||||
assert(sATT.multiply(B) ~== expected absTol 1e-15)
|
||||
|
|
|
@ -231,8 +231,8 @@ class MatricesSuite extends SparkFunSuite {
|
|||
new DenseMatrix(4, 3, Array(0.0, 1.0, 0.0, 0.0, 2.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 3.0))
|
||||
val sA = new SparseMatrix(4, 3, Array(0, 1, 3, 4), Array(1, 0, 2, 3), Array(1.0, 2.0, 1.0, 3.0))
|
||||
|
||||
val dAT = dA.transpose.asInstanceOf[DenseMatrix]
|
||||
val sAT = sA.transpose.asInstanceOf[SparseMatrix]
|
||||
val dAT = dA.transpose
|
||||
val sAT = sA.transpose
|
||||
val dATexpected =
|
||||
new DenseMatrix(3, 4, Array(0.0, 2.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 3.0))
|
||||
val sATexpected =
|
||||
|
@ -246,7 +246,7 @@ class MatricesSuite extends SparkFunSuite {
|
|||
assert(sA(2, 1) === sAT(1, 2))
|
||||
|
||||
assert(!dA.toArray.eq(dAT.toArray), "has to have a new array")
|
||||
assert(dA.values.eq(dAT.transpose.asInstanceOf[DenseMatrix].values), "should not copy array")
|
||||
assert(dA.values.eq(dAT.transpose.values), "should not copy array")
|
||||
|
||||
assert(dAT.toSparse.asBreeze === sATexpected.asBreeze)
|
||||
assert(sAT.toDense.asBreeze === dATexpected.asBreeze)
|
||||
|
|
|
@ -35,7 +35,7 @@ class BinaryClassificationPMMLModelExportSuite extends SparkFunSuite {
|
|||
|
||||
// assert that the PMML format is as expected
|
||||
assert(logisticModelExport.isInstanceOf[PMMLModelExport])
|
||||
val pmml = logisticModelExport.asInstanceOf[PMMLModelExport].getPmml
|
||||
val pmml = logisticModelExport.getPmml
|
||||
assert(pmml.getHeader.getDescription === "logistic regression")
|
||||
// check that the number of fields match the weights size
|
||||
assert(pmml.getDataDictionary.getNumberOfFields === logisticRegressionModel.weights.size + 1)
|
||||
|
|
|
@ -36,7 +36,7 @@ class KMeansPMMLModelExportSuite extends SparkFunSuite {
|
|||
|
||||
// assert that the PMML format is as expected
|
||||
assert(modelExport.isInstanceOf[PMMLModelExport])
|
||||
val pmml = modelExport.asInstanceOf[PMMLModelExport].getPmml
|
||||
val pmml = modelExport.getPmml
|
||||
assert(pmml.getHeader.getDescription === "k-means clustering")
|
||||
// check that the number of fields match the single vector size
|
||||
assert(pmml.getDataDictionary.getNumberOfFields === clusterCenters(0).size)
|
||||
|
|
|
@ -809,7 +809,7 @@ object ScalaReflection extends ScalaReflection {
|
|||
*/
|
||||
def findConstructor[T](cls: Class[T], paramTypes: Seq[Class[_]]): Option[Seq[AnyRef] => T] = {
|
||||
Option(ConstructorUtils.getMatchingAccessibleConstructor(cls, paramTypes: _*)) match {
|
||||
case Some(c) => Some(x => c.newInstance(x: _*).asInstanceOf[T])
|
||||
case Some(c) => Some(x => c.newInstance(x: _*))
|
||||
case None =>
|
||||
val companion = mirror.staticClass(cls.getName).companion
|
||||
val moduleMirror = mirror.reflectModule(companion.asModule)
|
||||
|
|
|
@ -175,7 +175,7 @@ final class MutableAny extends MutableValue {
|
|||
override def boxed: Any = if (isNull) null else value
|
||||
override def update(v: Any): Unit = {
|
||||
isNull = false
|
||||
value = v.asInstanceOf[Any]
|
||||
value = v
|
||||
}
|
||||
override def copy(): MutableAny = {
|
||||
val newCopy = new MutableAny
|
||||
|
|
|
@ -597,7 +597,7 @@ case class IntegralDivide(
|
|||
if (res == null) {
|
||||
null
|
||||
} else {
|
||||
integral.asInstanceOf[Integral[Any]].toLong(res)
|
||||
integral.toLong(res)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -114,7 +114,7 @@ trait ExpressionEvalHelper extends ScalaCheckDrivenPropertyChecks with PlanTestB
|
|||
case (result: Array[Byte], expected: Array[Byte]) =>
|
||||
java.util.Arrays.equals(result, expected)
|
||||
case (result: Double, expected: Spread[Double @unchecked]) =>
|
||||
expected.asInstanceOf[Spread[Double]].isWithin(result)
|
||||
expected.isWithin(result)
|
||||
case (result: InternalRow, expected: InternalRow) =>
|
||||
val st = dataType.asInstanceOf[StructType]
|
||||
assert(result.numFields == st.length && expected.numFields == st.length)
|
||||
|
|
|
@ -49,7 +49,7 @@ class ReusableStringReaderSuite extends SparkFunSuite {
|
|||
val reader = new ReusableStringReader
|
||||
|
||||
if (reader.markSupported()) {
|
||||
reader.asInstanceOf[ReusableStringReader].set(fox)
|
||||
reader.set(fox)
|
||||
assert(reader.ready())
|
||||
|
||||
val cc = new Array[Char](6)
|
||||
|
@ -73,14 +73,14 @@ class ReusableStringReaderSuite extends SparkFunSuite {
|
|||
|
||||
test("skip") {
|
||||
val reader = new ReusableStringReader
|
||||
reader.asInstanceOf[ReusableStringReader].set(fox)
|
||||
reader.set(fox)
|
||||
|
||||
// skip entire the data:
|
||||
var skipped = reader.skip(fox.length() + 1)
|
||||
assert(fox.length() == skipped)
|
||||
assert(-1 == reader.read())
|
||||
|
||||
reader.asInstanceOf[ReusableStringReader].set(fox) // reset the data
|
||||
reader.set(fox) // reset the data
|
||||
val cc = new Array[Char](6)
|
||||
var read = reader.read(cc)
|
||||
assert(read == 6)
|
||||
|
|
|
@ -116,6 +116,6 @@ class UDFXPathUtilSuite extends SparkFunSuite {
|
|||
test("node list eval") {
|
||||
val ret = util.evalNodeList("<a><b>true</b><b>false</b><b>b3</b><c>c1</c><c>-77</c></a>", "a/*")
|
||||
assert(ret != null && ret.isInstanceOf[NodeList])
|
||||
assert(ret.asInstanceOf[NodeList].getLength == 5)
|
||||
assert(ret.getLength == 5)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,7 +22,7 @@ import scala.util.Random
|
|||
import org.apache.spark.SparkFunSuite
|
||||
import org.apache.spark.sql.RandomDataGenerator
|
||||
import org.apache.spark.sql.catalyst.encoders.{ExamplePointUDT, RowEncoder}
|
||||
import org.apache.spark.sql.catalyst.expressions.{SafeProjection, UnsafeArrayData, UnsafeProjection}
|
||||
import org.apache.spark.sql.catalyst.expressions.{SafeProjection, UnsafeProjection}
|
||||
import org.apache.spark.sql.types._
|
||||
|
||||
class ArrayDataIndexedSeqSuite extends SparkFunSuite {
|
||||
|
@ -83,7 +83,7 @@ class ArrayDataIndexedSeqSuite extends SparkFunSuite {
|
|||
val safeRow = safeRowConverter(unsafeRow)
|
||||
|
||||
val genericArrayData = safeRow.getArray(0).asInstanceOf[GenericArrayData]
|
||||
val unsafeArrayData = unsafeRow.getArray(0).asInstanceOf[UnsafeArrayData]
|
||||
val unsafeArrayData = unsafeRow.getArray(0)
|
||||
|
||||
val elementType = dt.elementType
|
||||
test("ArrayDataIndexedSeq - UnsafeArrayData - " + dt.toString) {
|
||||
|
|
|
@ -48,7 +48,7 @@ class CaseInsensitiveMapSuite extends SparkFunSuite {
|
|||
test("SPARK-32377: CaseInsensitiveMap should be deterministic for addition") {
|
||||
var m = CaseInsensitiveMap(Map.empty[String, String])
|
||||
Seq(("paTh", "1"), ("PATH", "2"), ("Path", "3"), ("patH", "4"), ("path", "5")).foreach { kv =>
|
||||
m = (m + kv).asInstanceOf[CaseInsensitiveMap[String]]
|
||||
m = (m + kv)
|
||||
assert(m.get("path").contains(kv._2))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -190,7 +190,7 @@ private[sql] object SQLUtils extends Logging {
|
|||
dataType match {
|
||||
case 's' =>
|
||||
// Read StructType for DataFrame
|
||||
val fields = SerDe.readList(dis, jvmObjectTracker = null).asInstanceOf[Array[Object]]
|
||||
val fields = SerDe.readList(dis, jvmObjectTracker = null)
|
||||
Row.fromSeq(fields)
|
||||
case _ => null
|
||||
}
|
||||
|
|
|
@ -58,7 +58,7 @@ case class ContinuousScanExec(
|
|||
conf.continuousStreamingExecutorPollIntervalMs,
|
||||
partitions,
|
||||
schema,
|
||||
readerFactory.asInstanceOf[ContinuousPartitionReaderFactory],
|
||||
readerFactory,
|
||||
customMetrics)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
|
|||
import org.apache.spark.sql.catalyst.streaming.{StreamingRelationV2, WriteToStream}
|
||||
import org.apache.spark.sql.catalyst.trees.TreePattern.CURRENT_LIKE
|
||||
import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, TableCapability}
|
||||
import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, Offset => OffsetV2, PartitionOffset, ReadLimit}
|
||||
import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, PartitionOffset, ReadLimit}
|
||||
import org.apache.spark.sql.errors.QueryExecutionErrors
|
||||
import org.apache.spark.sql.execution.SQLExecution
|
||||
import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
|
||||
|
@ -335,7 +335,7 @@ class ContinuousExecution(
|
|||
val offset =
|
||||
sources(0).deserializeOffset(offsetLog.get(epoch).get.offsets(0).get.json)
|
||||
committedOffsets ++= Seq(sources(0) -> offset)
|
||||
sources(0).commit(offset.asInstanceOf[OffsetV2])
|
||||
sources(0).commit(offset)
|
||||
} else {
|
||||
return
|
||||
}
|
||||
|
|
|
@ -208,7 +208,7 @@ abstract class QueryTest extends PlanTest {
|
|||
def assertCached(query: Dataset[_], cachedName: String, storageLevel: StorageLevel): Unit = {
|
||||
val planWithCaching = query.queryExecution.withCachedData
|
||||
val matched = planWithCaching.collectFirst { case cached: InMemoryRelation =>
|
||||
val cacheBuilder = cached.asInstanceOf[InMemoryRelation].cacheBuilder
|
||||
val cacheBuilder = cached.cacheBuilder
|
||||
cachedName == cacheBuilder.tableName.get &&
|
||||
(storageLevel == cacheBuilder.storageLevel)
|
||||
}.getOrElse(false)
|
||||
|
|
|
@ -94,7 +94,7 @@ class HashedRelationSuite extends SharedSparkSession {
|
|||
|
||||
val os2 = new ByteArrayOutputStream()
|
||||
val out2 = new ObjectOutputStream(os2)
|
||||
hashed2.asInstanceOf[UnsafeHashedRelation].writeExternal(out2)
|
||||
hashed2.writeExternal(out2)
|
||||
out2.flush()
|
||||
// This depends on that the order of items in BytesToBytesMap.iterator() is exactly the same
|
||||
// as they are inserted
|
||||
|
|
|
@ -310,7 +310,7 @@ class StreamSuite extends StreamTest {
|
|||
// For each batch, we would log the state change during the execution
|
||||
// This checks whether the key of the state change log is the expected batch id
|
||||
def CheckIncrementalExecutionCurrentBatchId(expectedId: Int): AssertOnQuery =
|
||||
AssertOnQuery(_.lastExecution.asInstanceOf[IncrementalExecution].currentBatchId == expectedId,
|
||||
AssertOnQuery(_.lastExecution.currentBatchId == expectedId,
|
||||
s"lastExecution's currentBatchId should be $expectedId")
|
||||
|
||||
// For each batch, we would log the sink change after the execution
|
||||
|
|
|
@ -64,7 +64,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
|
|||
extends AssertOnQuery(q => {
|
||||
eventually(Timeout(streamingTimeout)) {
|
||||
if (q.exception.isEmpty) {
|
||||
assert(clock.asInstanceOf[StreamManualClock].isStreamWaitingAt(clock.getTimeMillis))
|
||||
assert(clock.isStreamWaitingAt(clock.getTimeMillis))
|
||||
}
|
||||
}
|
||||
if (q.exception.isDefined) {
|
||||
|
|
|
@ -353,7 +353,7 @@ class StreamingDataSourceV2Suite extends StreamTest {
|
|||
Trigger.Once()) { v2Query =>
|
||||
val sink = v2Query.asInstanceOf[StreamingQueryWrapper].streamingQuery.sink
|
||||
assert(sink.isInstanceOf[Table])
|
||||
assert(sink.asInstanceOf[Table].schema() == StructType(Nil))
|
||||
assert(sink.schema() == StructType(Nil))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -35,7 +35,6 @@ import org.apache.hadoop.io.Writable
|
|||
import org.apache.hadoop.mapred.{FileInputFormat, InputFormat => oldInputClass, JobConf}
|
||||
import org.apache.hadoop.mapreduce.{InputFormat => newInputClass}
|
||||
|
||||
import org.apache.spark.broadcast.Broadcast
|
||||
import org.apache.spark.deploy.SparkHadoopUtil
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD, RDD, UnionRDD}
|
||||
|
@ -352,7 +351,7 @@ class HadoopTableReader(
|
|||
initializeJobConfFunc: JobConf => Unit): RDD[Writable] = {
|
||||
val rdd = new HadoopRDD(
|
||||
sparkSession.sparkContext,
|
||||
_broadcastedHadoopConf.asInstanceOf[Broadcast[SerializableConfiguration]],
|
||||
_broadcastedHadoopConf,
|
||||
Some(initializeJobConfFunc),
|
||||
inputFormatClass,
|
||||
classOf[Writable],
|
||||
|
|
|
@ -28,7 +28,7 @@ import org.apache.spark.{HashPartitioner, SparkConf, SparkException}
|
|||
import org.apache.spark.rdd.{BlockRDD, RDD}
|
||||
import org.apache.spark.storage.StorageLevel
|
||||
import org.apache.spark.streaming.dstream.{DStream, WindowedDStream}
|
||||
import org.apache.spark.util.{Clock, ManualClock}
|
||||
import org.apache.spark.util.ManualClock
|
||||
|
||||
class BasicOperationsSuite extends TestSuiteBase {
|
||||
test("map") {
|
||||
|
@ -807,7 +807,7 @@ class BasicOperationsSuite extends TestSuiteBase {
|
|||
cleanupTestInput.size,
|
||||
numExpectedOutput,
|
||||
() => assertCleanup(operatedStream))
|
||||
val clock = ssc.scheduler.clock.asInstanceOf[Clock]
|
||||
val clock = ssc.scheduler.clock
|
||||
assert(clock.getTimeMillis() === Seconds(10).milliseconds)
|
||||
assert(output.size === numExpectedOutput)
|
||||
operatedStream
|
||||
|
|
Loading…
Reference in a new issue