[SPARK-25398] Minor bugs from comparing unrelated types

## What changes were proposed in this pull request?

Correct some comparisons between unrelated types so that they do what they appear to have been trying to do.
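
For context, a standalone sketch (hypothetical values, not code from this patch) of the bug class being fixed: a comparison between unrelated types compiles, but its result is a constant, so whatever logic depends on it silently stops working.

```scala
// Hypothetical illustration of comparing unrelated types in Scala.
object UnrelatedComparisonSketch extends App {
  val id: Option[String] = Some("abc")

  println(id.get == Some("abc"))  // always false: a String is never an Option[String]
  println(id == Some("abc"))      // true: the comparison that was intended
}
```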

## How was this patch tested?

Existing tests.

Closes #22384 from srowen/SPARK-25398.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
commit cfbdd6a1f5 (parent 9d9601ac8a), 2018-09-11 14:46:03 -05:00
11 changed files with 27 additions and 31 deletions

@@ -33,7 +33,6 @@ import org.apache.spark.storage.RDDInfo
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.AccumulatorContext
 import org.apache.spark.util.collection.OpenHashSet
-import org.apache.spark.util.kvstore.KVStore
 
 /**
  * A mutable representation of a live entity in Spark (jobs, stages, tasks, et al). Every live
@@ -588,8 +587,7 @@ private object LiveEntityHelpers {
      .filter { acc =>
        // We don't need to store internal or SQL accumulables as their values will be shown in
        // other places, so drop them to reduce the memory usage.
-        !acc.internal && (!acc.metadata.isDefined ||
-          acc.metadata.get != Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER))
+        !acc.internal && acc.metadata != Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER)
      }
      .map { acc =>
        new v1.AccumulableInfo(
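
In the hunk above, `acc.metadata.get` is a `String` while `Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER)` is an `Option[String]`, so the old inequality was always true and SQL accumulables were never dropped. A self-contained sketch of that predicate, with hypothetical stand-in names for `AccumulableInfo` and `SQL_ACCUM_IDENTIFIER`:

```scala
// Sketch of the filter predicate fixed above (Acc and sqlId are hypothetical stand-ins).
object AccumulatorFilterSketch extends App {
  case class Acc(internal: Boolean, metadata: Option[String])

  val sqlId = "sql"
  val sqlAcc = Acc(internal = false, metadata = Some(sqlId))

  // Old guard: String compared with Option[String] is never equal, so the whole
  // condition was always true and SQL accumulables were never filtered out.
  val keepOld = !sqlAcc.internal &&
    (!sqlAcc.metadata.isDefined || sqlAcc.metadata.get != Some(sqlId))

  // Fixed guard: Option compared with Option, so the intended filtering happens.
  val keepNew = !sqlAcc.internal && sqlAcc.metadata != Some(sqlId)

  println(keepOld)  // true  (bug: the SQL accumulable is kept)
  println(keepNew)  // false (fixed: it is dropped)
}
```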

@@ -175,7 +175,7 @@ private[spark] object ClosureCleaner extends Logging {
       closure.getClass.isSynthetic &&
         closure
           .getClass
-          .getInterfaces.exists(_.getName.equals("scala.Serializable"))
+          .getInterfaces.exists(_.getName == "scala.Serializable")
 
     if (isClosureCandidate) {
       try {
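
This hunk and several of the ones that follow swap `.equals` for Scala's `==`. For two non-null Strings the result is the same, but `==` is null-safe and, unlike `.equals`, can be flagged by the compiler when the two sides have types that can never be equal. A quick sketch with hypothetical values:

```scala
// Why Scala code generally prefers '==' over '.equals': '==' checks for null
// before delegating to equals.
object EqualsVsEqEqSketch extends App {
  val name: String = null

  println(name == "scala.Serializable")          // false, no exception
  // println(name.equals("scala.Serializable"))  // would throw NullPointerException
}
```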

@@ -17,8 +17,6 @@
 package org.apache.spark.util.collection
 
-import java.util.Objects
-
 import scala.collection.mutable.ArrayBuffer
 import scala.ref.WeakReference
@@ -509,7 +507,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite
      .sorted
    assert(it.isEmpty)
-    assert(keys == (0 until 100))
+    assert(keys == (0 until 100).toList)
    assert(map.numSpills == 0)
    // these asserts try to show that we're no longer holding references to the underlying map.

@@ -202,7 +202,7 @@ private[spark] class MesosClusterScheduler(
      } else if (removeFromPendingRetryDrivers(submissionId)) {
        k.success = true
        k.message = "Removed driver while it's being retried"
-      } else if (finishedDrivers.exists(_.driverDescription.submissionId.equals(submissionId))) {
+      } else if (finishedDrivers.exists(_.driverDescription.submissionId == submissionId)) {
        k.success = false
        k.message = "Driver already terminated"
      } else {
@@ -222,21 +222,21 @@ private[spark] class MesosClusterScheduler(
    }
    s.submissionId = submissionId
    stateLock.synchronized {
-      if (queuedDrivers.exists(_.submissionId.equals(submissionId))) {
+      if (queuedDrivers.exists(_.submissionId == submissionId)) {
        s.success = true
        s.driverState = "QUEUED"
      } else if (launchedDrivers.contains(submissionId)) {
        s.success = true
        s.driverState = "RUNNING"
        launchedDrivers(submissionId).mesosTaskStatus.foreach(state => s.message = state.toString)
-      } else if (finishedDrivers.exists(_.driverDescription.submissionId.equals(submissionId))) {
+      } else if (finishedDrivers.exists(_.driverDescription.submissionId == submissionId)) {
        s.success = true
        s.driverState = "FINISHED"
        finishedDrivers
          .find(d => d.driverDescription.submissionId.equals(submissionId)).get.mesosTaskStatus
          .foreach(state => s.message = state.toString)
-      } else if (pendingRetryDrivers.exists(_.submissionId.equals(submissionId))) {
-        val status = pendingRetryDrivers.find(_.submissionId.equals(submissionId))
+      } else if (pendingRetryDrivers.exists(_.submissionId == submissionId)) {
+        val status = pendingRetryDrivers.find(_.submissionId == submissionId)
          .get.retryState.get.lastFailureStatus
        s.success = true
        s.driverState = "RETRYING"
@@ -254,13 +254,13 @@ private[spark] class MesosClusterScheduler(
    */
  def getDriverState(submissionId: String): Option[MesosDriverState] = {
    stateLock.synchronized {
-      queuedDrivers.find(_.submissionId.equals(submissionId))
+      queuedDrivers.find(_.submissionId == submissionId)
        .map(d => new MesosDriverState("QUEUED", d))
        .orElse(launchedDrivers.get(submissionId)
          .map(d => new MesosDriverState("RUNNING", d.driverDescription, Some(d))))
-        .orElse(finishedDrivers.find(_.driverDescription.submissionId.equals(submissionId))
+        .orElse(finishedDrivers.find(_.driverDescription.submissionId == submissionId)
          .map(d => new MesosDriverState("FINISHED", d.driverDescription, Some(d))))
-        .orElse(pendingRetryDrivers.find(_.submissionId.equals(submissionId))
+        .orElse(pendingRetryDrivers.find(_.submissionId == submissionId)
          .map(d => new MesosDriverState("RETRYING", d)))
    }
  }
@@ -814,7 +814,7 @@ private[spark] class MesosClusterScheduler(
      status: Int): Unit = {}
 
  private def removeFromQueuedDrivers(subId: String): Boolean = {
-    val index = queuedDrivers.indexWhere(_.submissionId.equals(subId))
+    val index = queuedDrivers.indexWhere(_.submissionId == subId)
    if (index != -1) {
      queuedDrivers.remove(index)
      queuedDriversState.expunge(subId)
@@ -834,7 +834,7 @@ private[spark] class MesosClusterScheduler(
  }
 
  private def removeFromPendingRetryDrivers(subId: String): Boolean = {
-    val index = pendingRetryDrivers.indexWhere(_.submissionId.equals(subId))
+    val index = pendingRetryDrivers.indexWhere(_.submissionId == subId)
    if (index != -1) {
      pendingRetryDrivers.remove(index)
      pendingRetryDriversState.expunge(subId)

@@ -21,7 +21,7 @@ import java.util.{Collection, Collections, Date}
 
 import scala.collection.JavaConverters._
 
-import org.apache.mesos.Protos.{Environment, Secret, TaskState => MesosTaskState, _}
+import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
 import org.apache.mesos.Protos.Value.{Scalar, Type}
 import org.apache.mesos.SchedulerDriver
 import org.mockito.{ArgumentCaptor, Matchers}
@@ -146,14 +146,14 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
    assert(scheduler.getResource(resources, "cpus") == 1.5)
    assert(scheduler.getResource(resources, "mem") == 1200)
    val resourcesSeq: Seq[Resource] = resources.asScala
-    val cpus = resourcesSeq.filter(_.getName.equals("cpus")).toList
+    val cpus = resourcesSeq.filter(_.getName == "cpus").toList
    assert(cpus.size == 2)
-    assert(cpus.exists(_.getRole().equals("role2")))
-    assert(cpus.exists(_.getRole().equals("*")))
-    val mem = resourcesSeq.filter(_.getName.equals("mem")).toList
+    assert(cpus.exists(_.getRole() == "role2"))
+    assert(cpus.exists(_.getRole() == "*"))
+    val mem = resourcesSeq.filter(_.getName == "mem").toList
    assert(mem.size == 2)
-    assert(mem.exists(_.getRole().equals("role2")))
-    assert(mem.exists(_.getRole().equals("*")))
+    assert(mem.exists(_.getRole() == "role2"))
+    assert(mem.exists(_.getRole() == "*"))
    verify(driver, times(1)).launchTasks(
      Matchers.eq(Collections.singleton(offer.getId)),

@@ -106,7 +106,7 @@ class MesosFineGrainedSchedulerBackendSuite
    // uri is null.
    val (executorInfo, _) = mesosSchedulerBackend.createExecutorInfo(resources, "test-id")
    val executorResources = executorInfo.getResourcesList
-    val cpus = executorResources.asScala.find(_.getName.equals("cpus")).get.getScalar.getValue
+    val cpus = executorResources.asScala.find(_.getName == "cpus").get.getScalar.getValue
 
    assert(cpus === mesosExecutorCores)
  }

@@ -191,7 +191,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
    appContext.getQueue should be ("staging-queue")
    appContext.getAMContainerSpec should be (containerLaunchContext)
    appContext.getApplicationType should be ("SPARK")
-    appContext.getClass.getMethods.filter(_.getName.equals("getApplicationTags")).foreach{ method =>
+    appContext.getClass.getMethods.filter(_.getName == "getApplicationTags").foreach { method =>
      val tags = method.invoke(appContext).asInstanceOf[java.util.Set[String]]
      tags should contain allOf ("tag1", "dup", "tag2", "multi word")
      tags.asScala.count(_.nonEmpty) should be (4)

@@ -147,7 +147,7 @@ class PropagateEmptyRelationSuite extends PlanTest {
      .where(false)
      .select('a)
      .where('a > 1)
-      .where('a != 200)
+      .where('a =!= 200)
      .orderBy('a.asc)
 
    val optimized = Optimize.execute(query.analyze)
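
`!=` is final on `Any`, so `'a != 200` is an ordinary Scala comparison of a `Symbol` with an `Int`: it evaluates to `true`, and the filter was effectively a constant rather than the intended predicate. The Catalyst test DSL defines `=!=` precisely because `!=` cannot be overloaded to build an expression. A toy DSL sketch (hypothetical classes, not Catalyst itself) of the distinction:

```scala
// Toy expression DSL showing why a dedicated operator such as '=!=' is needed:
// '!=' is final on Any and always yields a Boolean, never an expression tree.
object NotEqualDslSketch extends App {
  sealed trait Expr
  case class Attr(name: String) extends Expr
  case class Lit(value: Any) extends Expr
  case class EqualTo(left: Expr, right: Expr) extends Expr
  case class Not(child: Expr) extends Expr

  implicit class ExprOps(e: Expr) {
    def =!=(other: Expr): Expr = Not(EqualTo(e, other))
  }

  val a = Attr("a")
  println(a != Lit(200))   // true: plain Scala comparison of two different values
  println(a =!= Lit(200))  // Not(EqualTo(Attr(a),Lit(200))): an actual predicate node
}
```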

@@ -114,7 +114,7 @@ class UnsafeArraySuite extends SparkFunSuite {
    assert(unsafeDate.isInstanceOf[UnsafeArrayData])
    assert(unsafeDate.numElements == dateArray.length)
    dateArray.zipWithIndex.map { case (e, i) =>
-      assert(unsafeDate.get(i, DateType) == e)
+      assert(unsafeDate.get(i, DateType).asInstanceOf[Int] == e)
    }
 
    val unsafeTimestamp = ExpressionEncoder[Array[Long]].resolveAndBind().
@@ -122,7 +122,7 @@ class UnsafeArraySuite extends SparkFunSuite {
    assert(unsafeTimestamp.isInstanceOf[UnsafeArrayData])
    assert(unsafeTimestamp.numElements == timestampArray.length)
    timestampArray.zipWithIndex.map { case (e, i) =>
-      assert(unsafeTimestamp.get(i, TimestampType) == e)
+      assert(unsafeTimestamp.get(i, TimestampType).asInstanceOf[Long] == e)
    }
 
    Seq(decimalArray4_1, decimalArray20_20).map { decimalArray =>

@@ -611,7 +611,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
      ).toDF("id", "stringData")
      val sampleDF = df.sample(false, 0.7, 50)
      // After sampling, sampleDF doesn't contain id=1.
-      assert(!sampleDF.select("id").collect.contains(1))
+      assert(!sampleDF.select("id").as[Int].collect.contains(1))
      // simpleUdf should not encounter id=1.
      checkAnswer(sampleDF.select(simpleUdf($"id")), List.fill(sampleDF.count.toInt)(Row(1)))
    }
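
`sampleDF.select("id").collect` returns an `Array[Row]`, so `contains(1)` compared a `Row` with an `Int` and was always false, making the assertion vacuous; `.as[Int]` decodes the column first so the check is meaningful. A sketch of the same pattern against a throwaway local session (assumes a spark-sql dependency on the classpath; the data and names are hypothetical):

```scala
// Sketch of the vacuous-contains pattern fixed above.
import org.apache.spark.sql.SparkSession

object ContainsRowSketch extends App {
  val spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate()
  import spark.implicits._

  val df = Seq(1, 2, 3).toDF("id")

  // Array[Row] can never contain the Int 1, so this is false even though id=1 exists.
  println(df.select("id").collect.contains(1))

  // Decoding to Int first makes the membership test meaningful.
  println(df.select("id").as[Int].collect.contains(1))

  spark.stop()
}
```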

@@ -427,7 +427,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
      assert(errMsg.startsWith("Parquet column cannot be converted in file"))
      val file = errMsg.substring("Parquet column cannot be converted in file ".length,
        errMsg.indexOf(". "))
-      val col = spark.read.parquet(file).schema.fields.filter(_.name.equals("a"))
+      val col = spark.read.parquet(file).schema.fields.filter(_.name == "a")
      assert(col.length == 1)
      if (col(0).dataType == StringType) {
        assert(errMsg.contains("Column: [a], Expected: int, Found: BINARY"))