[SPARK-21415] Triage scapegoat warnings, part 1

## What changes were proposed in this pull request?

Address scapegoat warnings for:
- BigDecimal double constructor
- Catching NPE
- Finalizer without super
- List.size is O(n)
- Prefer Seq.empty
- Prefer Set.empty
- reverse.map instead of reverseMap
- Type shadowing
- Unnecessary if condition
- Use .log1p
- Var could be val

In some instances, such as Seq.empty, I avoided making the change in test code even where it would be valid, to keep the scope of the change smaller. Those warnings concern performance, which doesn't matter in tests.
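
For context, here is a minimal, hypothetical Scala sketch (not taken from this patch) of what a few of these warning categories look like before and after the fix; the object and identifier names are illustrative only. The actual changes in this commit apply the same transformations in place, as the diff below shows.

```scala
import scala.collection.mutable.ArrayBuffer

object ScapegoatSketch {

  // BigDecimal double constructor: a Double literal can lose precision,
  // so prefer the String factory.
  val piDouble = BigDecimal(3.1415927)     // flagged
  val piString = BigDecimal("3.1415927")   // preferred

  // Prefer Seq.empty / Set.empty over allocating a fresh empty collection.
  val noStrings: Seq[String] = Seq.empty   // instead of Seq[String]()
  val noLabels: Set[String]  = Set.empty   // instead of Set[String]()

  // Use math.log1p(x) rather than math.log(x + 1); log1p is more accurate near zero.
  def smoothed(count: Double): Double = math.log1p(count)

  // Var could be val: the buffer reference is never reassigned, only mutated.
  def firstSquares(n: Int): Seq[Int] = {
    val buf = new ArrayBuffer[Int]()       // was: var buf = ...
    for (i <- 1 to n) buf += i * i
    buf
  }
}
```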

## How was this patch tested?

Existing tests

Author: Sean Owen <sowen@cloudera.com>

Closes #18635 from srowen/Scapegoat1.
Committed by Sean Owen on 2017-07-18 08:47:17 +01:00
parent 26cd2ca040
commit e26dac5feb
82 changed files with 186 additions and 195 deletions

View file

@ -94,22 +94,24 @@ private[spark] case class SSLOptions(
* are supported by the current Java security provider for this protocol. * are supported by the current Java security provider for this protocol.
*/ */
private val supportedAlgorithms: Set[String] = if (enabledAlgorithms.isEmpty) { private val supportedAlgorithms: Set[String] = if (enabledAlgorithms.isEmpty) {
Set() Set.empty
} else { } else {
var context: SSLContext = null var context: SSLContext = null
if (protocol.isEmpty) {
logDebug("No SSL protocol specified")
context = SSLContext.getDefault
} else {
try { try {
context = SSLContext.getInstance(protocol.orNull) context = SSLContext.getInstance(protocol.get)
/* The set of supported algorithms does not depend upon the keys, trust, or /* The set of supported algorithms does not depend upon the keys, trust, or
rng, although they will influence which algorithms are eventually used. */ rng, although they will influence which algorithms are eventually used. */
context.init(null, null, null) context.init(null, null, null)
} catch { } catch {
case npe: NullPointerException =>
logDebug("No SSL protocol specified")
context = SSLContext.getDefault
case nsa: NoSuchAlgorithmException => case nsa: NoSuchAlgorithmException =>
logDebug(s"No support for requested SSL protocol ${protocol.get}") logDebug(s"No support for requested SSL protocol ${protocol.get}")
context = SSLContext.getDefault context = SSLContext.getDefault
} }
}
val providerAlgorithms = context.getServerSocketFactory.getSupportedCipherSuites.toSet val providerAlgorithms = context.getServerSocketFactory.getSupportedCipherSuites.toSet

View file

@ -420,7 +420,7 @@ object SparkEnv extends Logging {
if (!conf.contains("spark.scheduler.mode")) { if (!conf.contains("spark.scheduler.mode")) {
Seq(("spark.scheduler.mode", schedulingMode)) Seq(("spark.scheduler.mode", schedulingMode))
} else { } else {
Seq[(String, String)]() Seq.empty[(String, String)]
} }
val sparkProperties = (conf.getAll ++ schedulerMode).sorted val sparkProperties = (conf.getAll ++ schedulerMode).sorted

View file

@ -58,8 +58,8 @@ private[spark] object TestUtils {
def createJarWithClasses( def createJarWithClasses(
classNames: Seq[String], classNames: Seq[String],
toStringValue: String = "", toStringValue: String = "",
classNamesWithBase: Seq[(String, String)] = Seq(), classNamesWithBase: Seq[(String, String)] = Seq.empty,
classpathUrls: Seq[URL] = Seq()): URL = { classpathUrls: Seq[URL] = Seq.empty): URL = {
val tempDir = Utils.createTempDir() val tempDir = Utils.createTempDir()
val files1 = for (name <- classNames) yield { val files1 = for (name <- classNames) yield {
createCompiledClass(name, tempDir, toStringValue, classpathUrls = classpathUrls) createCompiledClass(name, tempDir, toStringValue, classpathUrls = classpathUrls)
@ -137,7 +137,7 @@ private[spark] object TestUtils {
val options = if (classpathUrls.nonEmpty) { val options = if (classpathUrls.nonEmpty) {
Seq("-classpath", classpathUrls.map { _.getFile }.mkString(File.pathSeparator)) Seq("-classpath", classpathUrls.map { _.getFile }.mkString(File.pathSeparator))
} else { } else {
Seq() Seq.empty
} }
compiler.getTask(null, null, null, options.asJava, null, Arrays.asList(sourceFile)).call() compiler.getTask(null, null, null, options.asJava, null, Arrays.asList(sourceFile)).call()
@ -160,7 +160,7 @@ private[spark] object TestUtils {
destDir: File, destDir: File,
toStringValue: String = "", toStringValue: String = "",
baseClass: String = null, baseClass: String = null,
classpathUrls: Seq[URL] = Seq()): File = { classpathUrls: Seq[URL] = Seq.empty): File = {
val extendsText = Option(baseClass).map { c => s" extends ${c}" }.getOrElse("") val extendsText = Option(baseClass).map { c => s" extends ${c}" }.getOrElse("")
val sourceFile = new JavaSourceFromString(className, val sourceFile = new JavaSourceFromString(className,
"public class " + className + extendsText + " implements java.io.Serializable {" + "public class " + className + extendsText + " implements java.io.Serializable {" +

View file

@ -974,6 +974,7 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial
} }
} }
} }
super.finalize()
} }
} }
// scalastyle:on no.finalize // scalastyle:on no.finalize

View file

@ -128,8 +128,7 @@ private[spark] object SerDe {
} }
def readBoolean(in: DataInputStream): Boolean = { def readBoolean(in: DataInputStream): Boolean = {
val intVal = in.readInt() in.readInt() != 0
if (intVal == 0) false else true
} }
def readDate(in: DataInputStream): Date = { def readDate(in: DataInputStream): Date = {

View file

@ -337,7 +337,7 @@ class SparkHadoopUtil extends Logging {
if (credentials != null) { if (credentials != null) {
credentials.getAllTokens.asScala.map(tokenToString) credentials.getAllTokens.asScala.map(tokenToString)
} else { } else {
Seq() Seq.empty
} }
} }

View file

@ -317,7 +317,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val newLastScanTime = getNewLastScanTime() val newLastScanTime = getNewLastScanTime()
logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime") logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq) val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
.getOrElse(Seq[FileStatus]()) .getOrElse(Seq.empty[FileStatus])
// scan for modified applications, replay and merge them // scan for modified applications, replay and merge them
val logInfos: Seq[FileStatus] = statusList val logInfos: Seq[FileStatus] = statusList
.filter { entry => .filter { entry =>

View file

@ -55,7 +55,7 @@ class MasterWebUI(
} }
def addProxyTargets(id: String, target: String): Unit = { def addProxyTargets(id: String, target: String): Unit = {
var endTarget = target.stripSuffix("/") val endTarget = target.stripSuffix("/")
val handler = createProxyHandler("/proxy/" + id, endTarget) val handler = createProxyHandler("/proxy/" + id, endTarget)
attachHandler(handler) attachHandler(handler)
proxyHandlers(id) = handler proxyHandlers(id) = handler

View file

@ -44,7 +44,7 @@ object CommandUtils extends Logging {
memory: Int, memory: Int,
sparkHome: String, sparkHome: String,
substituteArguments: String => String, substituteArguments: String => String,
classPaths: Seq[String] = Seq[String](), classPaths: Seq[String] = Seq.empty,
env: Map[String, String] = sys.env): ProcessBuilder = { env: Map[String, String] = sys.env): ProcessBuilder = {
val localCommand = buildLocalCommand( val localCommand = buildLocalCommand(
command, securityMgr, substituteArguments, classPaths, env) command, securityMgr, substituteArguments, classPaths, env)
@ -73,7 +73,7 @@ object CommandUtils extends Logging {
command: Command, command: Command,
securityMgr: SecurityManager, securityMgr: SecurityManager,
substituteArguments: String => String, substituteArguments: String => String,
classPath: Seq[String] = Seq[String](), classPath: Seq[String] = Seq.empty,
env: Map[String, String]): Command = { env: Map[String, String]): Command = {
val libraryPathName = Utils.libraryPathEnvName val libraryPathName = Utils.libraryPathEnvName
val libraryPathEntries = command.libraryPathEntries val libraryPathEntries = command.libraryPathEntries
@ -96,7 +96,7 @@ object CommandUtils extends Logging {
command.arguments.map(substituteArguments), command.arguments.map(substituteArguments),
newEnvironment, newEnvironment,
command.classPathEntries ++ classPath, command.classPathEntries ++ classPath,
Seq[String](), // library path already captured in environment variable Seq.empty, // library path already captured in environment variable
// filter out auth secret from java options // filter out auth secret from java options
command.javaOpts.filterNot(_.startsWith("-D" + SecurityManager.SPARK_AUTH_SECRET_CONF))) command.javaOpts.filterNot(_.startsWith("-D" + SecurityManager.SPARK_AUTH_SECRET_CONF)))
} }

View file

@ -17,6 +17,8 @@
package org.apache package org.apache
import java.util.Properties
/** /**
* Core Spark functionality. [[org.apache.spark.SparkContext]] serves as the main entry point to * Core Spark functionality. [[org.apache.spark.SparkContext]] serves as the main entry point to
* Spark, while [[org.apache.spark.rdd.RDD]] is the data type representing a distributed collection, * Spark, while [[org.apache.spark.rdd.RDD]] is the data type representing a distributed collection,
@ -40,9 +42,6 @@ package org.apache
* Developer API</span> are intended for advanced users want to extend Spark through lower * Developer API</span> are intended for advanced users want to extend Spark through lower
* level interfaces. These are subject to changes or removal in minor releases. * level interfaces. These are subject to changes or removal in minor releases.
*/ */
import java.util.Properties
package object spark { package object spark {
private object SparkBuildInfo { private object SparkBuildInfo {
@ -57,6 +56,9 @@ package object spark {
val resourceStream = Thread.currentThread().getContextClassLoader. val resourceStream = Thread.currentThread().getContextClassLoader.
getResourceAsStream("spark-version-info.properties") getResourceAsStream("spark-version-info.properties")
if (resourceStream == null) {
throw new SparkException("Could not find spark-version-info.properties")
}
try { try {
val unknownProp = "<unknown>" val unknownProp = "<unknown>"
@ -71,8 +73,6 @@ package object spark {
props.getProperty("date", unknownProp) props.getProperty("date", unknownProp)
) )
} catch { } catch {
case npe: NullPointerException =>
throw new SparkException("Error while locating file spark-version-info.properties", npe)
case e: Exception => case e: Exception =>
throw new SparkException("Error loading properties from spark-version-info.properties", e) throw new SparkException("Error loading properties from spark-version-info.properties", e)
} finally { } finally {

View file

@ -269,7 +269,7 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
tries = 0 tries = 0
// if we don't have enough partition groups, create duplicates // if we don't have enough partition groups, create duplicates
while (numCreated < targetLen) { while (numCreated < targetLen) {
var (nxt_replica, nxt_part) = partitionLocs.partsWithLocs(tries) val (nxt_replica, nxt_part) = partitionLocs.partsWithLocs(tries)
tries += 1 tries += 1
val pgroup = new PartitionGroup(Some(nxt_replica)) val pgroup = new PartitionGroup(Some(nxt_replica))
groupArr += pgroup groupArr += pgroup

View file

@ -97,7 +97,7 @@ private[spark] class Pool(
} }
override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = { override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager] val sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
val sortedSchedulableQueue = val sortedSchedulableQueue =
schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator) schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
for (schedulable <- sortedSchedulableQueue) { for (schedulable <- sortedSchedulableQueue) {

View file

@ -60,7 +60,7 @@ private[spark] class DirectTaskResult[T](
val numUpdates = in.readInt val numUpdates = in.readInt
if (numUpdates == 0) { if (numUpdates == 0) {
accumUpdates = Seq() accumUpdates = Seq.empty
} else { } else {
val _accumUpdates = new ArrayBuffer[AccumulatorV2[_, _]] val _accumUpdates = new ArrayBuffer[AccumulatorV2[_, _]]
for (i <- 0 until numUpdates) { for (i <- 0 until numUpdates) {

View file

@ -891,7 +891,7 @@ private[spark] class TaskSetManager(
override def removeSchedulable(schedulable: Schedulable) {} override def removeSchedulable(schedulable: Schedulable) {}
override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = { override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]() val sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]()
sortedTaskSetQueue += this sortedTaskSetQueue += this
sortedTaskSetQueue sortedTaskSetQueue
} }
@ -948,7 +948,7 @@ private[spark] class TaskSetManager(
if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) { if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
val time = clock.getTimeMillis() val time = clock.getTimeMillis()
var medianDuration = successfulTaskDurations.median val medianDuration = successfulTaskDurations.median
val threshold = max(SPECULATION_MULTIPLIER * medianDuration, minTimeToSpeculation) val threshold = max(SPECULATION_MULTIPLIER * medianDuration, minTimeToSpeculation)
// TODO: Threshold should also look at standard deviation of task durations and have a lower // TODO: Threshold should also look at standard deviation of task durations and have a lower
// bound based on that. // bound based on that.

View file

@ -23,7 +23,6 @@ import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.Future import scala.concurrent.Future
import scala.concurrent.duration.Duration
import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState} import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState}
import org.apache.spark.internal.Logging import org.apache.spark.internal.Logging
@ -427,11 +426,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
* be called in the yarn-client mode when AM re-registers after a failure. * be called in the yarn-client mode when AM re-registers after a failure.
* */ * */
protected def reset(): Unit = { protected def reset(): Unit = {
val executors = synchronized { val executors: Set[String] = synchronized {
requestedTotalExecutors = 0 requestedTotalExecutors = 0
numPendingExecutors = 0 numPendingExecutors = 0
executorsPendingToRemove.clear() executorsPendingToRemove.clear()
Set() ++ executorDataMap.keys executorDataMap.keys.toSet
} }
// Remove all the lingering executors that should be removed but not yet. The reason might be // Remove all the lingering executors that should be removed but not yet. The reason might be

View file

@ -1275,11 +1275,11 @@ private[spark] class BlockManager(
val numPeersToReplicateTo = level.replication - 1 val numPeersToReplicateTo = level.replication - 1
val startTime = System.nanoTime val startTime = System.nanoTime
var peersReplicatedTo = mutable.HashSet.empty ++ existingReplicas val peersReplicatedTo = mutable.HashSet.empty ++ existingReplicas
var peersFailedToReplicateTo = mutable.HashSet.empty[BlockManagerId] val peersFailedToReplicateTo = mutable.HashSet.empty[BlockManagerId]
var numFailures = 0 var numFailures = 0
val initialPeers = getPeers(false).filterNot(existingReplicas.contains(_)) val initialPeers = getPeers(false).filterNot(existingReplicas.contains)
var peersForReplication = blockReplicationPolicy.prioritize( var peersForReplication = blockReplicationPolicy.prioritize(
blockManagerId, blockManagerId,

View file

@ -241,7 +241,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
}.getOrElse(jobIdTitle) }.getOrElse(jobIdTitle)
val jobSortDesc = Option(parameterJobSortDesc).map(_.toBoolean).getOrElse( val jobSortDesc = Option(parameterJobSortDesc).map(_.toBoolean).getOrElse(
// New jobs should be shown above old jobs by default. // New jobs should be shown above old jobs by default.
if (jobSortColumn == jobIdTitle) true else false jobSortColumn == jobIdTitle
) )
val jobPageSize = Option(parameterJobPageSize).map(_.toInt).getOrElse(100) val jobPageSize = Option(parameterJobPageSize).map(_.toInt).getOrElse(100)
val jobPrevPageSize = Option(parameterJobPrevPageSize).map(_.toInt).getOrElse(jobPageSize) val jobPrevPageSize = Option(parameterJobPrevPageSize).map(_.toInt).getOrElse(jobPageSize)

View file

@ -115,7 +115,7 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") {
if (sc.isDefined && isFairScheduler) { if (sc.isDefined && isFairScheduler) {
<h4>{pools.size} Fair Scheduler Pools</h4> ++ poolTable.toNodeSeq <h4>{pools.size} Fair Scheduler Pools</h4> ++ poolTable.toNodeSeq
} else { } else {
Seq[Node]() Seq.empty[Node]
} }
} }
if (shouldShowActiveStages) { if (shouldShowActiveStages) {

View file

@ -41,7 +41,7 @@ private[ui] class PoolPage(parent: StagesTab) extends WebUIPage("pool") {
val poolToActiveStages = listener.poolToActiveStages val poolToActiveStages = listener.poolToActiveStages
val activeStages = poolToActiveStages.get(poolName) match { val activeStages = poolToActiveStages.get(poolName) match {
case Some(s) => s.values.toSeq case Some(s) => s.values.toSeq
case None => Seq[StageInfo]() case None => Seq.empty[StageInfo]
} }
val shouldShowActiveStages = activeStages.nonEmpty val shouldShowActiveStages = activeStages.nonEmpty
val activeStagesTable = val activeStagesTable =

View file

@ -565,7 +565,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
val executorTable = new ExecutorTable(stageId, stageAttemptId, parent) val executorTable = new ExecutorTable(stageId, stageAttemptId, parent)
val maybeAccumulableTable: Seq[Node] = val maybeAccumulableTable: Seq[Node] =
if (hasAccumulators) { <h4>Accumulators</h4> ++ accumulableTable } else Seq() if (hasAccumulators) { <h4>Accumulators</h4> ++ accumulableTable } else Seq.empty
val aggMetrics = val aggMetrics =
<span class="collapse-aggregated-metrics collapse-table" <span class="collapse-aggregated-metrics collapse-table"

View file

@ -60,7 +60,7 @@ private[ui] class StageTableBase(
}.getOrElse("Stage Id") }.getOrElse("Stage Id")
val stageSortDesc = Option(parameterStageSortDesc).map(_.toBoolean).getOrElse( val stageSortDesc = Option(parameterStageSortDesc).map(_.toBoolean).getOrElse(
// New stages should be shown above old jobs by default. // New stages should be shown above old jobs by default.
if (stageSortColumn == "Stage Id") true else false stageSortColumn == "Stage Id"
) )
val stagePageSize = Option(parameterStagePageSize).map(_.toInt).getOrElse(100) val stagePageSize = Option(parameterStagePageSize).map(_.toInt).getOrElse(100)
val stagePrevPageSize = Option(parameterStagePrevPageSize).map(_.toInt) val stagePrevPageSize = Option(parameterStagePrevPageSize).map(_.toInt)

View file

@ -51,7 +51,7 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
val rddStorageInfo = AllRDDResource.getRDDStorageInfo(rddId, listener, includeDetails = true) val rddStorageInfo = AllRDDResource.getRDDStorageInfo(rddId, listener, includeDetails = true)
.getOrElse { .getOrElse {
// Rather than crashing, render an "RDD Not Found" page // Rather than crashing, render an "RDD Not Found" page
return UIUtils.headerSparkPage("RDD Not Found", Seq[Node](), parent) return UIUtils.headerSparkPage("RDD Not Found", Seq.empty[Node], parent)
} }
// Worker table // Worker table

View file

@ -81,7 +81,7 @@ private[spark] object ClosureCleaner extends Logging {
val stack = Stack[Class[_]](obj.getClass) val stack = Stack[Class[_]](obj.getClass)
while (!stack.isEmpty) { while (!stack.isEmpty) {
val cr = getClassReader(stack.pop()) val cr = getClassReader(stack.pop())
val set = Set[Class[_]]() val set = Set.empty[Class[_]]
cr.accept(new InnerClosureFinder(set), 0) cr.accept(new InnerClosureFinder(set), 0)
for (cls <- set -- seen) { for (cls <- set -- seen) {
seen += cls seen += cls
@ -180,6 +180,7 @@ private[spark] object ClosureCleaner extends Logging {
val declaredFields = func.getClass.getDeclaredFields val declaredFields = func.getClass.getDeclaredFields
val declaredMethods = func.getClass.getDeclaredMethods val declaredMethods = func.getClass.getDeclaredMethods
if (log.isDebugEnabled) {
logDebug(" + declared fields: " + declaredFields.size) logDebug(" + declared fields: " + declaredFields.size)
declaredFields.foreach { f => logDebug(" " + f) } declaredFields.foreach { f => logDebug(" " + f) }
logDebug(" + declared methods: " + declaredMethods.size) logDebug(" + declared methods: " + declaredMethods.size)
@ -190,6 +191,7 @@ private[spark] object ClosureCleaner extends Logging {
outerClasses.foreach { c => logDebug(" " + c.getName) } outerClasses.foreach { c => logDebug(" " + c.getName) }
logDebug(" + outer objects: " + outerObjects.size) logDebug(" + outer objects: " + outerObjects.size)
outerObjects.foreach { o => logDebug(" " + o) } outerObjects.foreach { o => logDebug(" " + o) }
}
// Fail fast if we detect return statements in closures // Fail fast if we detect return statements in closures
getClassReader(func.getClass).accept(new ReturnStatementFinder(), 0) getClassReader(func.getClass).accept(new ReturnStatementFinder(), 0)
@ -201,7 +203,7 @@ private[spark] object ClosureCleaner extends Logging {
// Initialize accessed fields with the outer classes first // Initialize accessed fields with the outer classes first
// This step is needed to associate the fields to the correct classes later // This step is needed to associate the fields to the correct classes later
for (cls <- outerClasses) { for (cls <- outerClasses) {
accessedFields(cls) = Set[String]() accessedFields(cls) = Set.empty[String]
} }
// Populate accessed fields by visiting all fields and methods accessed by this and // Populate accessed fields by visiting all fields and methods accessed by this and
// all of its inner closures. If transitive cleaning is enabled, this may recursively // all of its inner closures. If transitive cleaning is enabled, this may recursively

View file

@ -696,7 +696,7 @@ private[spark] object JsonProtocol {
val accumulatedValues = { val accumulatedValues = {
Utils.jsonOption(json \ "Accumulables").map(_.extract[List[JValue]]) match { Utils.jsonOption(json \ "Accumulables").map(_.extract[List[JValue]]) match {
case Some(values) => values.map(accumulableInfoFromJson) case Some(values) => values.map(accumulableInfoFromJson)
case None => Seq[AccumulableInfo]() case None => Seq.empty[AccumulableInfo]
} }
} }
@ -726,7 +726,7 @@ private[spark] object JsonProtocol {
val killed = Utils.jsonOption(json \ "Killed").exists(_.extract[Boolean]) val killed = Utils.jsonOption(json \ "Killed").exists(_.extract[Boolean])
val accumulables = Utils.jsonOption(json \ "Accumulables").map(_.extract[Seq[JValue]]) match { val accumulables = Utils.jsonOption(json \ "Accumulables").map(_.extract[Seq[JValue]]) match {
case Some(values) => values.map(accumulableInfoFromJson) case Some(values) => values.map(accumulableInfoFromJson)
case None => Seq[AccumulableInfo]() case None => Seq.empty[AccumulableInfo]
} }
val taskInfo = val taskInfo =

View file

@ -1443,7 +1443,7 @@ private[spark] object Utils extends Logging {
var firstUserFile = "<unknown>" var firstUserFile = "<unknown>"
var firstUserLine = 0 var firstUserLine = 0
var insideSpark = true var insideSpark = true
var callStack = new ArrayBuffer[String]() :+ "<unknown>" val callStack = new ArrayBuffer[String]() :+ "<unknown>"
Thread.currentThread.getStackTrace().foreach { ste: StackTraceElement => Thread.currentThread.getStackTrace().foreach { ste: StackTraceElement =>
// When running under some profilers, the current stack trace might contain some bogus // When running under some profilers, the current stack trace might contain some bogus
@ -2438,7 +2438,7 @@ private[spark] object Utils extends Logging {
.getOrElse(UserGroupInformation.getCurrentUser().getShortUserName()) .getOrElse(UserGroupInformation.getCurrentUser().getShortUserName())
} }
val EMPTY_USER_GROUPS = Set[String]() val EMPTY_USER_GROUPS = Set.empty[String]
// Returns the groups to which the current user belongs. // Returns the groups to which the current user belongs.
def getCurrentUserGroups(sparkConf: SparkConf, username: String): Set[String] = { def getCurrentUserGroups(sparkConf: SparkConf, username: String): Set[String] = {
@ -2587,7 +2587,7 @@ private[spark] object Utils extends Logging {
* Unions two comma-separated lists of files and filters out empty strings. * Unions two comma-separated lists of files and filters out empty strings.
*/ */
def unionFileLists(leftList: Option[String], rightList: Option[String]): Set[String] = { def unionFileLists(leftList: Option[String], rightList: Option[String]): Set[String] = {
var allFiles = Set[String]() var allFiles = Set.empty[String]
leftList.foreach { value => allFiles ++= value.split(",") } leftList.foreach { value => allFiles ++= value.split(",") }
rightList.foreach { value => allFiles ++= value.split(",") } rightList.foreach { value => allFiles ++= value.split(",") }
allFiles.filter { _.nonEmpty } allFiles.filter { _.nonEmpty }

View file

@ -53,16 +53,16 @@ object LocalFileLR {
val fileSrc = scala.io.Source.fromFile(args(0)) val fileSrc = scala.io.Source.fromFile(args(0))
val lines = fileSrc.getLines().toArray val lines = fileSrc.getLines().toArray
val points = lines.map(parsePoint _) val points = lines.map(parsePoint)
val ITERATIONS = args(1).toInt val ITERATIONS = args(1).toInt
// Initialize w to a random value // Initialize w to a random value
var w = DenseVector.fill(D) {2 * rand.nextDouble - 1} val w = DenseVector.fill(D) {2 * rand.nextDouble - 1}
println("Initial w: " + w) println("Initial w: " + w)
for (i <- 1 to ITERATIONS) { for (i <- 1 to ITERATIONS) {
println("On iteration " + i) println("On iteration " + i)
var gradient = DenseVector.zeros[Double](D) val gradient = DenseVector.zeros[Double](D)
for (p <- points) { for (p <- points) {
val scale = (1 / (1 + math.exp(-p.y * (w.dot(p.x)))) - 1) * p.y val scale = (1 / (1 + math.exp(-p.y * (w.dot(p.x)))) - 1) * p.y
gradient += p.x * scale gradient += p.x * scale

View file

@ -47,12 +47,11 @@ object LocalKMeans {
} }
def closestPoint(p: Vector[Double], centers: HashMap[Int, Vector[Double]]): Int = { def closestPoint(p: Vector[Double], centers: HashMap[Int, Vector[Double]]): Int = {
var index = 0
var bestIndex = 0 var bestIndex = 0
var closest = Double.PositiveInfinity var closest = Double.PositiveInfinity
for (i <- 1 to centers.size) { for (i <- 1 to centers.size) {
val vCurr = centers.get(i).get val vCurr = centers(i)
val tempDist = squaredDistance(p, vCurr) val tempDist = squaredDistance(p, vCurr)
if (tempDist < closest) { if (tempDist < closest) {
closest = tempDist closest = tempDist
@ -76,8 +75,8 @@ object LocalKMeans {
showWarning() showWarning()
val data = generateData val data = generateData
var points = new HashSet[Vector[Double]] val points = new HashSet[Vector[Double]]
var kPoints = new HashMap[Int, Vector[Double]] val kPoints = new HashMap[Int, Vector[Double]]
var tempDist = 1.0 var tempDist = 1.0
while (points.size < K) { while (points.size < K) {
@ -92,11 +91,11 @@ object LocalKMeans {
println("Initial centers: " + kPoints) println("Initial centers: " + kPoints)
while(tempDist > convergeDist) { while(tempDist > convergeDist) {
var closest = data.map (p => (closestPoint(p, kPoints), (p, 1))) val closest = data.map (p => (closestPoint(p, kPoints), (p, 1)))
var mappings = closest.groupBy[Int] (x => x._1) val mappings = closest.groupBy[Int] (x => x._1)
var pointStats = mappings.map { pair => val pointStats = mappings.map { pair =>
pair._2.reduceLeft [(Int, (Vector[Double], Int))] { pair._2.reduceLeft [(Int, (Vector[Double], Int))] {
case ((id1, (p1, c1)), (id2, (p2, c2))) => (id1, (p1 + p2, c1 + c2)) case ((id1, (p1, c1)), (id2, (p2, c2))) => (id1, (p1 + p2, c1 + c2))
} }
@ -107,7 +106,7 @@ object LocalKMeans {
tempDist = 0.0 tempDist = 0.0
for (mapping <- newPoints) { for (mapping <- newPoints) {
tempDist += squaredDistance(kPoints.get(mapping._1).get, mapping._2) tempDist += squaredDistance(kPoints(mapping._1), mapping._2)
} }
for (newP <- newPoints) { for (newP <- newPoints) {

View file

@ -60,12 +60,12 @@ object LocalLR {
val data = generateData val data = generateData
// Initialize w to a random value // Initialize w to a random value
var w = DenseVector.fill(D) {2 * rand.nextDouble - 1} val w = DenseVector.fill(D) {2 * rand.nextDouble - 1}
println("Initial w: " + w) println("Initial w: " + w)
for (i <- 1 to ITERATIONS) { for (i <- 1 to ITERATIONS) {
println("On iteration " + i) println("On iteration " + i)
var gradient = DenseVector.zeros[Double](D) val gradient = DenseVector.zeros[Double](D)
for (p <- data) { for (p <- data) {
val scale = (1 / (1 + math.exp(-p.y * (w.dot(p.x)))) - 1) * p.y val scale = (1 / (1 + math.exp(-p.y * (w.dot(p.x)))) - 1) * p.y
gradient += p.x * scale gradient += p.x * scale

View file

@ -40,8 +40,8 @@ object SparkHdfsLR {
def parsePoint(line: String): DataPoint = { def parsePoint(line: String): DataPoint = {
val tok = new java.util.StringTokenizer(line, " ") val tok = new java.util.StringTokenizer(line, " ")
var y = tok.nextToken.toDouble val y = tok.nextToken.toDouble
var x = new Array[Double](D) val x = new Array[Double](D)
var i = 0 var i = 0
while (i < D) { while (i < D) {
x(i) = tok.nextToken.toDouble; i += 1 x(i) = tok.nextToken.toDouble; i += 1
@ -78,7 +78,7 @@ object SparkHdfsLR {
val ITERATIONS = args(1).toInt val ITERATIONS = args(1).toInt
// Initialize w to a random value // Initialize w to a random value
var w = DenseVector.fill(D) {2 * rand.nextDouble - 1} val w = DenseVector.fill(D) {2 * rand.nextDouble - 1}
println("Initial w: " + w) println("Initial w: " + w)
for (i <- 1 to ITERATIONS) { for (i <- 1 to ITERATIONS) {

View file

@ -72,7 +72,7 @@ object SparkLR {
val points = spark.sparkContext.parallelize(generateData, numSlices).cache() val points = spark.sparkContext.parallelize(generateData, numSlices).cache()
// Initialize w to a random value // Initialize w to a random value
var w = DenseVector.fill(D) {2 * rand.nextDouble - 1} val w = DenseVector.fill(D) {2 * rand.nextDouble - 1}
println("Initial w: " + w) println("Initial w: " + w)
for (i <- 1 to ITERATIONS) { for (i <- 1 to ITERATIONS) {

View file

@ -251,7 +251,7 @@ object DecisionTreeExample {
.setMinInfoGain(params.minInfoGain) .setMinInfoGain(params.minInfoGain)
.setCacheNodeIds(params.cacheNodeIds) .setCacheNodeIds(params.cacheNodeIds)
.setCheckpointInterval(params.checkpointInterval) .setCheckpointInterval(params.checkpointInterval)
case _ => throw new IllegalArgumentException("Algo ${params.algo} not supported.") case _ => throw new IllegalArgumentException(s"Algo ${params.algo} not supported.")
} }
stages += dt stages += dt
val pipeline = new Pipeline().setStages(stages.toArray) val pipeline = new Pipeline().setStages(stages.toArray)
@ -278,7 +278,7 @@ object DecisionTreeExample {
} else { } else {
println(treeModel) // Print model summary. println(treeModel) // Print model summary.
} }
case _ => throw new IllegalArgumentException("Algo ${params.algo} not supported.") case _ => throw new IllegalArgumentException(s"Algo ${params.algo} not supported.")
} }
// Evaluate model on training, test data. // Evaluate model on training, test data.
@ -294,7 +294,7 @@ object DecisionTreeExample {
println("Test data results:") println("Test data results:")
evaluateRegressionModel(pipelineModel, test, labelColName) evaluateRegressionModel(pipelineModel, test, labelColName)
case _ => case _ =>
throw new IllegalArgumentException("Algo ${params.algo} not supported.") throw new IllegalArgumentException(s"Algo ${params.algo} not supported.")
} }
spark.stop() spark.stop()

View file

@ -190,7 +190,7 @@ object GBTExample {
.setCacheNodeIds(params.cacheNodeIds) .setCacheNodeIds(params.cacheNodeIds)
.setCheckpointInterval(params.checkpointInterval) .setCheckpointInterval(params.checkpointInterval)
.setMaxIter(params.maxIter) .setMaxIter(params.maxIter)
case _ => throw new IllegalArgumentException("Algo ${params.algo} not supported.") case _ => throw new IllegalArgumentException(s"Algo ${params.algo} not supported.")
} }
stages += dt stages += dt
val pipeline = new Pipeline().setStages(stages.toArray) val pipeline = new Pipeline().setStages(stages.toArray)
@ -217,7 +217,7 @@ object GBTExample {
} else { } else {
println(rfModel) // Print model summary. println(rfModel) // Print model summary.
} }
case _ => throw new IllegalArgumentException("Algo ${params.algo} not supported.") case _ => throw new IllegalArgumentException(s"Algo ${params.algo} not supported.")
} }
// Evaluate model on training, test data. // Evaluate model on training, test data.
@ -233,7 +233,7 @@ object GBTExample {
println("Test data results:") println("Test data results:")
DecisionTreeExample.evaluateRegressionModel(pipelineModel, test, labelColName) DecisionTreeExample.evaluateRegressionModel(pipelineModel, test, labelColName)
case _ => case _ =>
throw new IllegalArgumentException("Algo ${params.algo} not supported.") throw new IllegalArgumentException(s"Algo ${params.algo} not supported.")
} }
spark.stop() spark.stop()

View file

@ -198,7 +198,7 @@ object RandomForestExample {
.setCheckpointInterval(params.checkpointInterval) .setCheckpointInterval(params.checkpointInterval)
.setFeatureSubsetStrategy(params.featureSubsetStrategy) .setFeatureSubsetStrategy(params.featureSubsetStrategy)
.setNumTrees(params.numTrees) .setNumTrees(params.numTrees)
case _ => throw new IllegalArgumentException("Algo ${params.algo} not supported.") case _ => throw new IllegalArgumentException(s"Algo ${params.algo} not supported.")
} }
stages += dt stages += dt
val pipeline = new Pipeline().setStages(stages.toArray) val pipeline = new Pipeline().setStages(stages.toArray)
@ -225,7 +225,7 @@ object RandomForestExample {
} else { } else {
println(rfModel) // Print model summary. println(rfModel) // Print model summary.
} }
case _ => throw new IllegalArgumentException("Algo ${params.algo} not supported.") case _ => throw new IllegalArgumentException(s"Algo ${params.algo} not supported.")
} }
// Evaluate model on training, test data. // Evaluate model on training, test data.
@ -241,7 +241,7 @@ object RandomForestExample {
println("Test data results:") println("Test data results:")
DecisionTreeExample.evaluateRegressionModel(pipelineModel, test, labelColName) DecisionTreeExample.evaluateRegressionModel(pipelineModel, test, labelColName)
case _ => case _ =>
throw new IllegalArgumentException("Algo ${params.algo} not supported.") throw new IllegalArgumentException(s"Algo ${params.algo} not supported.")
} }
spark.stop() spark.stop()

View file

@ -211,7 +211,7 @@ object DecisionTreeRunner {
case Regression => case Regression =>
(origExamples, null, 0) (origExamples, null, 0)
case _ => case _ =>
throw new IllegalArgumentException("Algo ${params.algo} not supported.") throw new IllegalArgumentException(s"Algo $algo not supported.")
} }
// Create training, test sets. // Create training, test sets.

View file

@ -378,7 +378,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
zkUtils.getLeaderForPartition(topic, partition).isDefined && zkUtils.getLeaderForPartition(topic, partition).isDefined &&
Request.isValidBrokerId(leaderAndInSyncReplicas.leader) && Request.isValidBrokerId(leaderAndInSyncReplicas.leader) &&
leaderAndInSyncReplicas.isr.size >= 1 leaderAndInSyncReplicas.isr.nonEmpty
case _ => case _ =>
false false

View file

@ -156,7 +156,7 @@ private[spark] class KafkaRDD[K, V](
val prefExecs = if (null == prefHost) allExecs else allExecs.filter(_.host == prefHost) val prefExecs = if (null == prefHost) allExecs else allExecs.filter(_.host == prefHost)
val execs = if (prefExecs.isEmpty) allExecs else prefExecs val execs = if (prefExecs.isEmpty) allExecs else prefExecs
if (execs.isEmpty) { if (execs.isEmpty) {
Seq() Seq.empty
} else { } else {
// execs is sorted, tp.hashCode depends only on topic and partition, so consistent index // execs is sorted, tp.hashCode depends only on topic and partition, so consistent index
val index = Math.floorMod(tp.hashCode, execs.length) val index = Math.floorMod(tp.hashCode, execs.length)

View file

@ -257,7 +257,7 @@ private[kafka010] class KafkaTestUtils extends Logging {
zkUtils.getLeaderForPartition(topic, partition).isDefined && zkUtils.getLeaderForPartition(topic, partition).isDefined &&
Request.isValidBrokerId(leaderAndInSyncReplicas.leader) && Request.isValidBrokerId(leaderAndInSyncReplicas.leader) &&
leaderAndInSyncReplicas.isr.size >= 1 leaderAndInSyncReplicas.isr.nonEmpty
case _ => case _ =>
false false

View file

@ -388,7 +388,7 @@ class EdgePartition[
val aggregates = new Array[A](vertexAttrs.length) val aggregates = new Array[A](vertexAttrs.length)
val bitset = new BitSet(vertexAttrs.length) val bitset = new BitSet(vertexAttrs.length)
var ctx = new AggregatingEdgeContext[VD, ED, A](mergeMsg, aggregates, bitset) val ctx = new AggregatingEdgeContext[VD, ED, A](mergeMsg, aggregates, bitset)
var i = 0 var i = 0
while (i < size) { while (i < size) {
val localSrcId = localSrcIds(i) val localSrcId = localSrcIds(i)
@ -433,7 +433,7 @@ class EdgePartition[
val aggregates = new Array[A](vertexAttrs.length) val aggregates = new Array[A](vertexAttrs.length)
val bitset = new BitSet(vertexAttrs.length) val bitset = new BitSet(vertexAttrs.length)
var ctx = new AggregatingEdgeContext[VD, ED, A](mergeMsg, aggregates, bitset) val ctx = new AggregatingEdgeContext[VD, ED, A](mergeMsg, aggregates, bitset)
index.iterator.foreach { cluster => index.iterator.foreach { cluster =>
val clusterSrcId = cluster._1 val clusterSrcId = cluster._1
val clusterPos = cluster._2 val clusterPos = cluster._2

View file

@ -74,7 +74,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
override def getCheckpointFiles: Seq[String] = { override def getCheckpointFiles: Seq[String] = {
Seq(vertices.getCheckpointFile, replicatedVertexView.edges.getCheckpointFile).flatMap { Seq(vertices.getCheckpointFile, replicatedVertexView.edges.getCheckpointFile).flatMap {
case Some(path) => Seq(path) case Some(path) => Seq(path)
case None => Seq() case None => Seq.empty
} }
} }

View file

@ -18,6 +18,7 @@
package org.apache.spark.graphx.util package org.apache.spark.graphx.util
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.collection.mutable
import scala.reflect.ClassTag import scala.reflect.ClassTag
import scala.util._ import scala.util._
@ -133,7 +134,7 @@ object GraphGenerators extends Logging {
throw new IllegalArgumentException( throw new IllegalArgumentException(
s"numEdges must be <= $numEdgesUpperBound but was $numEdges") s"numEdges must be <= $numEdgesUpperBound but was $numEdges")
} }
var edges: Set[Edge[Int]] = Set() var edges = mutable.Set.empty[Edge[Int]]
while (edges.size < numEdges) { while (edges.size < numEdges) {
if (edges.size % 100 == 0) { if (edges.size % 100 == 0) {
logDebug(edges.size + " edges") logDebug(edges.size + " edges")

View file

@ -736,7 +736,7 @@ class LogisticRegression @Since("1.2.0") (
b_k' = b_k - \mean(b_k) b_k' = b_k - \mean(b_k)
}}} }}}
*/ */
val rawIntercepts = histogram.map(c => math.log(c + 1)) // add 1 for smoothing val rawIntercepts = histogram.map(math.log1p) // add 1 for smoothing (log1p(x) = log(1+x))
val rawMean = rawIntercepts.sum / rawIntercepts.length val rawMean = rawIntercepts.sum / rawIntercepts.length
rawIntercepts.indices.foreach { i => rawIntercepts.indices.foreach { i =>
initialCoefWithInterceptMatrix.update(i, numFeatures, rawIntercepts(i) - rawMean) initialCoefWithInterceptMatrix.update(i, numFeatures, rawIntercepts(i) - rawMean)
@ -820,7 +820,7 @@ class LogisticRegression @Since("1.2.0") (
val interceptVec = if ($(fitIntercept) || !isMultinomial) { val interceptVec = if ($(fitIntercept) || !isMultinomial) {
Vectors.zeros(numCoefficientSets) Vectors.zeros(numCoefficientSets)
} else { } else {
Vectors.sparse(numCoefficientSets, Seq()) Vectors.sparse(numCoefficientSets, Seq.empty)
} }
// separate intercepts and coefficients from the combined matrix // separate intercepts and coefficients from the combined matrix
allCoefMatrix.foreachActive { (classIndex, featureIndex, value) => allCoefMatrix.foreachActive { (classIndex, featureIndex, value) =>

View file

@ -99,7 +99,7 @@ private[ml] case class ParsedRFormula(label: ColumnRef, terms: Seq[Term]) {
}).map(_.distinct) }).map(_.distinct)
// Deduplicates feature interactions, for example, a:b is the same as b:a. // Deduplicates feature interactions, for example, a:b is the same as b:a.
var seen = mutable.Set[Set[String]]() val seen = mutable.Set[Set[String]]()
validInteractions.flatMap { validInteractions.flatMap {
case t if seen.contains(t.toSet) => case t if seen.contains(t.toSet) =>
None None

View file

@ -286,7 +286,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
s"training is not needed.") s"training is not needed.")
} }
if (handlePersistence) instances.unpersist() if (handlePersistence) instances.unpersist()
val coefficients = Vectors.sparse(numFeatures, Seq()) val coefficients = Vectors.sparse(numFeatures, Seq.empty)
val intercept = yMean val intercept = yMean
val model = copyValues(new LinearRegressionModel(uid, coefficients, intercept)) val model = copyValues(new LinearRegressionModel(uid, coefficients, intercept))

View file

@ -363,7 +363,7 @@ class KMeans private (
// to their squared distance from the centers. Note that only distances between points // to their squared distance from the centers. Note that only distances between points
// and new centers are computed in each iteration. // and new centers are computed in each iteration.
var step = 0 var step = 0
var bcNewCentersList = ArrayBuffer[Broadcast[_]]() val bcNewCentersList = ArrayBuffer[Broadcast[_]]()
while (step < initializationSteps) { while (step < initializationSteps) {
val bcNewCenters = data.context.broadcast(newCenters) val bcNewCenters = data.context.broadcast(newCenters)
bcNewCentersList += bcNewCenters bcNewCentersList += bcNewCenters

View file

@ -78,13 +78,13 @@ private[mllib] object EigenValueDecomposition {
require(n * ncv.toLong <= Integer.MAX_VALUE && ncv * (ncv.toLong + 8) <= Integer.MAX_VALUE, require(n * ncv.toLong <= Integer.MAX_VALUE && ncv * (ncv.toLong + 8) <= Integer.MAX_VALUE,
s"k = $k and/or n = $n are too large to compute an eigendecomposition") s"k = $k and/or n = $n are too large to compute an eigendecomposition")
var ido = new intW(0) val ido = new intW(0)
var info = new intW(0) val info = new intW(0)
var resid = new Array[Double](n) val resid = new Array[Double](n)
var v = new Array[Double](n * ncv) val v = new Array[Double](n * ncv)
var workd = new Array[Double](n * 3) val workd = new Array[Double](n * 3)
var workl = new Array[Double](ncv * (ncv + 8)) val workl = new Array[Double](ncv * (ncv + 8))
var ipntr = new Array[Int](11) val ipntr = new Array[Int](11)
// call ARPACK's reverse communication, first iteration with ido = 0 // call ARPACK's reverse communication, first iteration with ido = 0
arpack.dsaupd(ido, bmat, n, which, nev.`val`, tolW, resid, ncv, v, n, iparam, ipntr, workd, arpack.dsaupd(ido, bmat, n, which, nev.`val`, tolW, resid, ncv, v, n, iparam, ipntr, workd,

View file

@ -257,7 +257,7 @@ object LBFGS extends Logging {
(denseGrad1, loss1 + loss2) (denseGrad1, loss1 + loss2)
} }
val zeroSparseVector = Vectors.sparse(n, Seq()) val zeroSparseVector = Vectors.sparse(n, Seq.empty)
val (gradientSum, lossSum) = data.treeAggregate((zeroSparseVector, 0.0))(seqOp, combOp) val (gradientSum, lossSum) = data.treeAggregate((zeroSparseVector, 0.0))(seqOp, combOp)
// broadcasted model is not needed anymore // broadcasted model is not needed anymore

View file

@ -57,7 +57,7 @@ private[stat] object SpearmanCorrelation extends Correlation with Logging {
var preCol = -1 var preCol = -1
var preVal = Double.NaN var preVal = Double.NaN
var startRank = -1.0 var startRank = -1.0
var cachedUids = ArrayBuffer.empty[Long] val cachedUids = ArrayBuffer.empty[Long]
val flush: () => Iterable[(Long, (Int, Double))] = () => { val flush: () => Iterable[(Long, (Int, Double))] = () => {
val averageRank = startRank + (cachedUids.size - 1) / 2.0 val averageRank = startRank + (cachedUids.size - 1) / 2.0
val output = cachedUids.map { uid => val output = cachedUids.map { uid =>

View file

@ -133,7 +133,7 @@ class StreamingTest @Since("1.6.0") () extends Logging with Serializable {
if (time.milliseconds > data.slideDuration.milliseconds * peacePeriod) { if (time.milliseconds > data.slideDuration.milliseconds * peacePeriod) {
rdd rdd
} else { } else {
data.context.sparkContext.parallelize(Seq()) data.context.sparkContext.parallelize(Seq.empty)
} }
} }
} }

View file

@ -589,7 +589,7 @@ object PySparkAssembly {
val zipFile = new File(BuildCommons.sparkHome , "python/lib/pyspark.zip") val zipFile = new File(BuildCommons.sparkHome , "python/lib/pyspark.zip")
zipFile.delete() zipFile.delete()
zipRecursive(src, zipFile) zipRecursive(src, zipFile)
Seq[File]() Seq.empty[File]
}).value }).value
) )
@ -810,7 +810,7 @@ object TestSettings {
require(d.mkdir() || d.isDirectory(), s"Failed to create directory $d") require(d.mkdir() || d.isDirectory(), s"Failed to create directory $d")
} }
} }
Seq[File]() Seq.empty[File]
}).value, }).value,
concurrentRestrictions in Global += Tags.limit(Tags.Test, 1), concurrentRestrictions in Global += Tags.limit(Tags.Test, 1),
// Remove certain packages from Scaladoc // Remove certain packages from Scaladoc

View file

@ -112,7 +112,7 @@ private[ui] class DriverPage(parent: MesosClusterUI) extends WebUIPage("driver")
<td>Last Task Status</td> <td>Last Task Status</td>
<td>{state.mesosTaskStatus.map(_.toString).getOrElse("")}</td> <td>{state.mesosTaskStatus.map(_.toString).getOrElse("")}</td>
</tr> </tr>
}.getOrElse(Seq[Node]()) }.getOrElse(Seq.empty[Node])
} }
private def propertiesRow(properties: collection.Map[String, String]): Seq[Node] = { private def propertiesRow(properties: collection.Map[String, String]): Seq[Node] = {
@ -175,6 +175,6 @@ private[ui] class DriverPage(parent: MesosClusterUI) extends WebUIPage("driver")
{state.retries} {state.retries}
</td> </td>
</tr> </tr>
}.getOrElse(Seq[Node]()) }.getOrElse(Seq.empty[Node])
} }
} }

View file

@ -333,7 +333,7 @@ trait MesosSchedulerUtils extends Logging {
try { try {
splitter.split(constraintsVal).asScala.toMap.mapValues(v => splitter.split(constraintsVal).asScala.toMap.mapValues(v =>
if (v == null || v.isEmpty) { if (v == null || v.isEmpty) {
Set[String]() Set.empty[String]
} else { } else {
v.split(',').toSet v.split(',').toSet
} }

View file

@ -2334,8 +2334,9 @@ object TimeWindowing extends Rule[LogicalPlan] {
val windowExpressions = val windowExpressions =
p.expressions.flatMap(_.collect { case t: TimeWindow => t }).toSet p.expressions.flatMap(_.collect { case t: TimeWindow => t }).toSet
val numWindowExpr = windowExpressions.size
// Only support a single window expression for now // Only support a single window expression for now
if (windowExpressions.size == 1 && if (numWindowExpr == 1 &&
windowExpressions.head.timeColumn.resolved && windowExpressions.head.timeColumn.resolved &&
windowExpressions.head.checkInputDataTypes().isSuccess) { windowExpressions.head.checkInputDataTypes().isSuccess) {
@ -2402,7 +2403,7 @@ object TimeWindowing extends Rule[LogicalPlan] {
renamedPlan.withNewChildren(substitutedPlan :: Nil) renamedPlan.withNewChildren(substitutedPlan :: Nil)
} }
} else if (windowExpressions.size > 1) { } else if (numWindowExpr > 1) {
p.failAnalysis("Multiple time window expressions would result in a cartesian product " + p.failAnalysis("Multiple time window expressions would result in a cartesian product " +
"of rows, therefore they are currently not supported.") "of rows, therefore they are currently not supported.")
} else { } else {

View file

@ -228,10 +228,10 @@ case class ArrayContains(left: Expression, right: Expression)
override def dataType: DataType = BooleanType override def dataType: DataType = BooleanType
override def inputTypes: Seq[AbstractDataType] = right.dataType match { override def inputTypes: Seq[AbstractDataType] = right.dataType match {
case NullType => Seq() case NullType => Seq.empty
case _ => left.dataType match { case _ => left.dataType match {
case n @ ArrayType(element, _) => Seq(n, element) case n @ ArrayType(element, _) => Seq(n, element)
case _ => Seq() case _ => Seq.empty
} }
} }

View file

@ -150,7 +150,7 @@ object JoinReorderDP extends PredicateHelper with Logging {
// Create the initial plans: each plan is a single item with zero cost. // Create the initial plans: each plan is a single item with zero cost.
val itemIndex = items.zipWithIndex val itemIndex = items.zipWithIndex
val foundPlans = mutable.Buffer[JoinPlanMap](itemIndex.map { val foundPlans = mutable.Buffer[JoinPlanMap](itemIndex.map {
case (item, id) => Set(id) -> JoinPlan(Set(id), item, Set(), Cost(0, 0)) case (item, id) => Set(id) -> JoinPlan(Set(id), item, Set.empty, Cost(0, 0))
}.toMap) }.toMap)
// Build filters from the join graph to be used by the search algorithm. // Build filters from the join graph to be used by the search algorithm.

View file

@ -134,7 +134,7 @@ object ReorderAssociativeOperator extends Rule[LogicalPlan] {
private def collectGroupingExpressions(plan: LogicalPlan): ExpressionSet = plan match { private def collectGroupingExpressions(plan: LogicalPlan): ExpressionSet = plan match {
case Aggregate(groupingExpressions, aggregateExpressions, child) => case Aggregate(groupingExpressions, aggregateExpressions, child) =>
ExpressionSet.apply(groupingExpressions) ExpressionSet.apply(groupingExpressions)
case _ => ExpressionSet(Seq()) case _ => ExpressionSet(Seq.empty)
} }
def apply(plan: LogicalPlan): LogicalPlan = plan transform { def apply(plan: LogicalPlan): LogicalPlan = plan transform {

View file

@ -877,7 +877,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
// Reverse the contexts to have them in the same sequence as in the SQL statement & turn them // Reverse the contexts to have them in the same sequence as in the SQL statement & turn them
// into expressions. // into expressions.
val expressions = contexts.reverse.map(expression) val expressions = contexts.reverseMap(expression)
// Create a balanced tree. // Create a balanced tree.
def reduceToExpressionTree(low: Int, high: Int): Expression = high - low match { def reduceToExpressionTree(low: Int, high: Int): Expression = high - low match {

View file

@ -173,7 +173,7 @@ object ExtractFiltersAndInnerJoins extends PredicateHelper {
val (plans, conditions) = flattenJoin(j) val (plans, conditions) = flattenJoin(j)
(plans, conditions ++ splitConjunctivePredicates(filterCondition)) (plans, conditions ++ splitConjunctivePredicates(filterCondition))
case _ => (Seq((plan, parentJoinType)), Seq()) case _ => (Seq((plan, parentJoinType)), Seq.empty)
} }
def unapply(plan: LogicalPlan): Option[(Seq[(LogicalPlan, InnerLike)], Seq[Expression])] def unapply(plan: LogicalPlan): Option[(Seq[(LogicalPlan, InnerLike)], Seq[Expression])]

View file

@ -47,7 +47,7 @@ case class FilterEstimation(plan: Filter) extends Logging {
// Estimate selectivity of this filter predicate, and update column stats if needed. // Estimate selectivity of this filter predicate, and update column stats if needed.
// For not-supported condition, set filter selectivity to a conservative estimate 100% // For not-supported condition, set filter selectivity to a conservative estimate 100%
val filterSelectivity = calculateFilterSelectivity(plan.condition).getOrElse(BigDecimal(1.0)) val filterSelectivity = calculateFilterSelectivity(plan.condition).getOrElse(BigDecimal(1))
val filteredRowCount: BigInt = ceil(BigDecimal(childStats.rowCount.get) * filterSelectivity) val filteredRowCount: BigInt = ceil(BigDecimal(childStats.rowCount.get) * filterSelectivity)
val newColStats = if (filteredRowCount == 0) { val newColStats = if (filteredRowCount == 0) {
@ -83,13 +83,13 @@ case class FilterEstimation(plan: Filter) extends Logging {
: Option[BigDecimal] = { : Option[BigDecimal] = {
condition match { condition match {
case And(cond1, cond2) => case And(cond1, cond2) =>
val percent1 = calculateFilterSelectivity(cond1, update).getOrElse(BigDecimal(1.0)) val percent1 = calculateFilterSelectivity(cond1, update).getOrElse(BigDecimal(1))
val percent2 = calculateFilterSelectivity(cond2, update).getOrElse(BigDecimal(1.0)) val percent2 = calculateFilterSelectivity(cond2, update).getOrElse(BigDecimal(1))
Some(percent1 * percent2) Some(percent1 * percent2)
case Or(cond1, cond2) => case Or(cond1, cond2) =>
val percent1 = calculateFilterSelectivity(cond1, update = false).getOrElse(BigDecimal(1.0)) val percent1 = calculateFilterSelectivity(cond1, update = false).getOrElse(BigDecimal(1))
val percent2 = calculateFilterSelectivity(cond2, update = false).getOrElse(BigDecimal(1.0)) val percent2 = calculateFilterSelectivity(cond2, update = false).getOrElse(BigDecimal(1))
Some(percent1 + percent2 - (percent1 * percent2)) Some(percent1 + percent2 - (percent1 * percent2))
// Not-operator pushdown // Not-operator pushdown
@ -464,7 +464,7 @@ case class FilterEstimation(plan: Filter) extends Logging {
(numericLiteral > max, numericLiteral <= min) (numericLiteral > max, numericLiteral <= min)
} }
var percent = BigDecimal(1.0) var percent = BigDecimal(1)
if (noOverlap) { if (noOverlap) {
percent = 0.0 percent = 0.0
} else if (completeOverlap) { } else if (completeOverlap) {
@ -630,7 +630,7 @@ case class FilterEstimation(plan: Filter) extends Logging {
) )
} }
var percent = BigDecimal(1.0) var percent = BigDecimal(1)
if (noOverlap) { if (noOverlap) {
percent = 0.0 percent = 0.0
} else if (completeOverlap) { } else if (completeOverlap) {

View file

@ -609,9 +609,9 @@ class MathExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(BRound(floatPi, scale), floatResults(i), EmptyRow) checkEvaluation(BRound(floatPi, scale), floatResults(i), EmptyRow)
} }
val bdResults: Seq[BigDecimal] = Seq(BigDecimal(3.0), BigDecimal(3.1), BigDecimal(3.14), val bdResults: Seq[BigDecimal] = Seq(BigDecimal(3), BigDecimal("3.1"), BigDecimal("3.14"),
BigDecimal(3.142), BigDecimal(3.1416), BigDecimal(3.14159), BigDecimal("3.142"), BigDecimal("3.1416"), BigDecimal("3.14159"),
BigDecimal(3.141593), BigDecimal(3.1415927)) BigDecimal("3.141593"), BigDecimal("3.1415927"))
(0 to 7).foreach { i => (0 to 7).foreach { i =>
checkEvaluation(Round(bdPi, i), bdResults(i), EmptyRow) checkEvaluation(Round(bdPi, i), bdResults(i), EmptyRow)

View file

@ -109,8 +109,8 @@ class DecimalSuite extends SparkFunSuite with PrivateMethodTester {
test("small decimals represented as unscaled long") { test("small decimals represented as unscaled long") {
checkCompact(new Decimal(), true) checkCompact(new Decimal(), true)
checkCompact(Decimal(BigDecimal(10.03)), false) checkCompact(Decimal(BigDecimal("10.03")), false)
checkCompact(Decimal(BigDecimal(1e20)), false) checkCompact(Decimal(BigDecimal("100000000000000000000")), false)
checkCompact(Decimal(17L), true) checkCompact(Decimal(17L), true)
checkCompact(Decimal(17), true) checkCompact(Decimal(17), true)
checkCompact(Decimal(17L, 2, 1), true) checkCompact(Decimal(17L, 2, 1), true)

View file

@ -671,11 +671,11 @@ case class AlterTableRecoverPartitionsCommand(
} else { } else {
logWarning( logWarning(
s"expected partition column ${partitionNames.head}, but got ${ps(0)}, ignoring it") s"expected partition column ${partitionNames.head}, but got ${ps(0)}, ignoring it")
Seq() Seq.empty
} }
} else { } else {
logWarning(s"ignore ${new Path(path, name)}") logWarning(s"ignore ${new Path(path, name)}")
Seq() Seq.empty
} }
} }
} }

View file

@@ -23,7 +23,6 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.Partition
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession, SQLContext}
-import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.jdbc.JdbcDialects
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
@@ -81,7 +80,7 @@ private[sql] object JDBCRelation extends Logging {
 val column = partitioning.column
 var i: Int = 0
 var currentValue: Long = lowerBound
-var ans = new ArrayBuffer[Partition]()
+val ans = new ArrayBuffer[Partition]()
 while (i < numPartitions) {
 val lBound = if (i != 0) s"$column >= $currentValue" else null
 currentValue += stride
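The `var ans` to `val ans` change (and the similar ones in StreamExecution and HadoopTableReader further down) addresses the "var could be val" warning: the `ArrayBuffer` contents are mutated, but the reference itself is never reassigned, so `val` is sufficient and documents that. An illustrative sketch, not Spark code:

```scala
import scala.collection.mutable.ArrayBuffer

object VarCouldBeVal extends App {
  // A val fixes the binding, not the buffer's contents, so appends still compile:
  val parts = ArrayBuffer[Int]()
  parts += 1
  parts += 2
  // parts = ArrayBuffer(3)  // reassignment would no longer compile; that is the point of the warning
  println(parts)             // ArrayBuffer(1, 2)
}
```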
@@ -220,7 +220,7 @@ class ParquetFileFormat
 val needMerged: Seq[FileStatus] =
 if (mergeRespectSummaries) {
-Seq()
+Seq.empty
 } else {
 filesByType.data
 }
@@ -35,8 +35,6 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalGroupState
 import org.apache.spark.sql.execution.streaming.GroupStateImpl
 import org.apache.spark.sql.streaming.GroupStateTimeout
 import org.apache.spark.sql.types._
-import org.apache.spark.util.Utils
 /**
 * Physical version of `ObjectProducer`.
@@ -403,8 +401,7 @@ case class FlatMapGroupsInRExec(
 Seq(groupingAttributes.map(SortOrder(_, Ascending)))
 override protected def doExecute(): RDD[InternalRow] = {
-val isSerializedRData =
-if (outputSchema == SERIALIZED_R_DATA_SCHEMA) true else false
+val isSerializedRData = outputSchema == SERIALIZED_R_DATA_SCHEMA
 val serializerForR = if (!isSerializedRData) {
 SerializationFormats.ROW
 } else {
@@ -34,8 +34,7 @@ case class MapPartitionsRWrapper(
 outputSchema: StructType) extends (Iterator[Any] => Iterator[Any]) {
 def apply(iter: Iterator[Any]): Iterator[Any] = {
 // If the content of current DataFrame is serialized R data?
-val isSerializedRData =
-if (inputSchema == SERIALIZED_R_DATA_SCHEMA) true else false
+val isSerializedRData = inputSchema == SERIALIZED_R_DATA_SCHEMA
 val (newIter, deserializer, colNames) =
 if (!isSerializedRData) {
@@ -170,12 +170,8 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
 private def compact(batchId: Long, logs: Array[T]): Boolean = {
 val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval)
 val allLogs = validBatches.flatMap(batchId => super.get(batchId)).flatten ++ logs
-if (super.add(batchId, compactLogs(allLogs).toArray)) {
-true
-} else {
-// Return false as there is another writer.
-false
-}
+// Return false as there is another writer.
+super.add(batchId, compactLogs(allLogs).toArray)
 }
 /**
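The R-execution and CompactibleFileStreamLog hunks above all remove the "unnecessary if condition" pattern: an `if` whose only job is to map a Boolean to `true`/`false` can be replaced by the condition (or the call) itself. A minimal sketch with made-up names:

```scala
object UnnecessaryIf {
  // Hypothetical stand-ins for the schema comparison and the log append above.
  def isSerialized(schema: String, serializedSchema: String): Boolean =
    schema == serializedSchema // instead of: if (schema == serializedSchema) true else false

  def tryCompact(added: Boolean): Boolean =
    added // instead of: if (added) true else { /* another writer */ false }
}
```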
@@ -609,7 +609,7 @@ class StreamExecution(
 }
 // A list of attributes that will need to be updated.
-var replacements = new ArrayBuffer[(Attribute, Attribute)]
+val replacements = new ArrayBuffer[(Attribute, Attribute)]
 // Replace sources in the logical plan with data that has arrived since the last batch.
 val withNewSources = logicalPlan transform {
 case StreamingExecutionRelation(source, output) =>
@@ -65,9 +65,9 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext {
 checkAnswer(
 decimalData.groupBy("a").agg(sum("b")),
-Seq(Row(new java.math.BigDecimal(1.0), new java.math.BigDecimal(3.0)),
-Row(new java.math.BigDecimal(2.0), new java.math.BigDecimal(3.0)),
-Row(new java.math.BigDecimal(3.0), new java.math.BigDecimal(3.0)))
+Seq(Row(new java.math.BigDecimal(1), new java.math.BigDecimal(3)),
+Row(new java.math.BigDecimal(2), new java.math.BigDecimal(3)),
+Row(new java.math.BigDecimal(3), new java.math.BigDecimal(3)))
 )
 val decimalDataWithNulls = spark.sparkContext.parallelize(
@@ -80,10 +80,10 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext {
 DecimalData(null, 2) :: Nil).toDF()
 checkAnswer(
 decimalDataWithNulls.groupBy("a").agg(sum("b")),
-Seq(Row(new java.math.BigDecimal(1.0), new java.math.BigDecimal(1.0)),
-Row(new java.math.BigDecimal(2.0), new java.math.BigDecimal(1.0)),
-Row(new java.math.BigDecimal(3.0), new java.math.BigDecimal(3.0)),
-Row(null, new java.math.BigDecimal(2.0)))
+Seq(Row(new java.math.BigDecimal(1), new java.math.BigDecimal(1)),
+Row(new java.math.BigDecimal(2), new java.math.BigDecimal(1)),
+Row(new java.math.BigDecimal(3), new java.math.BigDecimal(3)),
+Row(null, new java.math.BigDecimal(2)))
 )
 }
@@ -259,19 +259,19 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext {
 checkAnswer(
 decimalData.agg(avg('a)),
-Row(new java.math.BigDecimal(2.0)))
+Row(new java.math.BigDecimal(2)))
 checkAnswer(
 decimalData.agg(avg('a), sumDistinct('a)), // non-partial
-Row(new java.math.BigDecimal(2.0), new java.math.BigDecimal(6)) :: Nil)
+Row(new java.math.BigDecimal(2), new java.math.BigDecimal(6)) :: Nil)
 checkAnswer(
 decimalData.agg(avg('a cast DecimalType(10, 2))),
-Row(new java.math.BigDecimal(2.0)))
+Row(new java.math.BigDecimal(2)))
 // non-partial
 checkAnswer(
 decimalData.agg(avg('a cast DecimalType(10, 2)), sumDistinct('a cast DecimalType(10, 2))),
-Row(new java.math.BigDecimal(2.0), new java.math.BigDecimal(6)) :: Nil)
+Row(new java.math.BigDecimal(2), new java.math.BigDecimal(6)) :: Nil)
 }
 test("null average") {
@@ -520,9 +520,9 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext {
 test("SQL decimal test (used for catching certain decimal handling bugs in aggregates)") {
 checkAnswer(
 decimalData.groupBy('a cast DecimalType(10, 2)).agg(avg('b cast DecimalType(10, 2))),
-Seq(Row(new java.math.BigDecimal(1.0), new java.math.BigDecimal(1.5)),
-Row(new java.math.BigDecimal(2.0), new java.math.BigDecimal(1.5)),
-Row(new java.math.BigDecimal(3.0), new java.math.BigDecimal(1.5))))
+Seq(Row(new java.math.BigDecimal(1), new java.math.BigDecimal("1.5")),
+Row(new java.math.BigDecimal(2), new java.math.BigDecimal("1.5")),
+Row(new java.math.BigDecimal(3), new java.math.BigDecimal("1.5"))))
 }
 test("SPARK-17616: distinct aggregate combined with a non-partial aggregate") {
@@ -1167,7 +1167,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
 }
 test("SPARK-6899: type should match when using codegen") {
-checkAnswer(decimalData.agg(avg('a)), Row(new java.math.BigDecimal(2.0)))
+checkAnswer(decimalData.agg(avg('a)), Row(new java.math.BigDecimal(2)))
 }
 test("SPARK-7133: Implement struct, array, and map field accessor") {
@@ -1971,7 +1971,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
 test("SPARK-19691 Calculating percentile of decimal column fails with ClassCastException") {
 val df = spark.range(1).selectExpr("CAST(id as DECIMAL) as x").selectExpr("percentile(x, 0.5)")
-checkAnswer(df, Row(BigDecimal(0.0)) :: Nil)
+checkAnswer(df, Row(BigDecimal(0)) :: Nil)
 }
 test("SPARK-19893: cannot run set operations with map type") {
@@ -1546,10 +1546,10 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
 Seq(Row(d)))
 checkAnswer(
 df.selectExpr("b * a + b"),
-Seq(Row(BigDecimal(2.12321))))
+Seq(Row(BigDecimal("2.12321"))))
 checkAnswer(
 df.selectExpr("b * a - b"),
-Seq(Row(BigDecimal(0.12321))))
+Seq(Row(BigDecimal("0.12321"))))
 checkAnswer(
 df.selectExpr("b * a * b"),
 Seq(Row(d)))
@@ -387,7 +387,7 @@ class StringFunctionsSuite extends QueryTest with SharedSQLContext {
 Row("6.4817"))
 checkAnswer(
-df.select(format_number(lit(BigDecimal(7.128381)), 4)), // not convert anything
+df.select(format_number(lit(BigDecimal("7.128381")), 4)), // not convert anything
 Row("7.1284"))
 intercept[AnalysisException] {
@@ -826,7 +826,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
 StructField("b", DecimalType(2, 2), true):: Nil)
 assert(expectedSchema === jsonDF.schema)
-checkAnswer(jsonDF, Row(1.0E-39D, BigDecimal(0.01)))
+checkAnswer(jsonDF, Row(1.0E-39D, BigDecimal("0.01")))
 val mergedJsonDF = spark.read
 .option("prefersDecimal", "true")
@@ -839,7 +839,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
 assert(expectedMergedSchema === mergedJsonDF.schema)
 checkAnswer(
 mergedJsonDF,
-Row(1.0E-39D, BigDecimal(0.01)) ::
+Row(1.0E-39D, BigDecimal("0.01")) ::
 Row(1.0E38D, BigDecimal("92233720368547758070")) :: Nil
 )
 }
@@ -676,7 +676,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
 1.5.toFloat,
 4.5,
 new java.math.BigDecimal(new BigInteger("212500"), 5),
-new java.math.BigDecimal(2.125),
+new java.math.BigDecimal("2.125"),
 java.sql.Date.valueOf("2015-05-23"),
 new Timestamp(0),
 "This is a string, /[]?=:",
@@ -245,7 +245,7 @@ private[spark] object HiveUtils extends Logging {
 val loader = new IsolatedClientLoader(
 version = IsolatedClientLoader.hiveVersion(hiveExecutionVersion),
 sparkConf = conf,
-execJars = Seq(),
+execJars = Seq.empty,
 hadoopConf = hadoopConf,
 config = newTemporaryConfiguration(useInMemoryDerby = true),
 isolationOn = false,
@@ -162,8 +162,8 @@ class HadoopTableReader(
 if (!sparkSession.sessionState.conf.verifyPartitionPath) {
 partitionToDeserializer
 } else {
-var existPathSet = collection.mutable.Set[String]()
-var pathPatternSet = collection.mutable.Set[String]()
+val existPathSet = collection.mutable.Set[String]()
+val pathPatternSet = collection.mutable.Set[String]()
 partitionToDeserializer.filter {
 case (partition, partDeserializer) =>
 def updateExistPathSetByPathPattern(pathPatternStr: String) {
@@ -181,8 +181,8 @@ class HadoopTableReader(
 }
 val partPath = partition.getDataLocation
-val partNum = Utilities.getPartitionDesc(partition).getPartSpec.size();
-var pathPatternStr = getPathPatternByPath(partNum, partPath)
+val partNum = Utilities.getPartitionDesc(partition).getPartSpec.size()
+val pathPatternStr = getPathPatternByPath(partNum, partPath)
 if (!pathPatternSet.contains(pathPatternStr)) {
 pathPatternSet += pathPatternStr
 updateExistPathSetByPathPattern(pathPatternStr)
@@ -391,7 +391,7 @@ private[hive] class HiveClientImpl(
 val sortColumnNames = if (allAscendingSorted) {
 sortColumnOrders.map(_.getCol)
 } else {
-Seq()
+Seq.empty
 }
 Option(BucketSpec(h.getNumBuckets, h.getBucketCols.asScala, sortColumnNames))
 } else {
@@ -90,7 +90,7 @@ class HiveInspectorSuite extends SparkFunSuite with HiveInspectors {
 Literal(0.asInstanceOf[Double]) ::
 Literal("0") ::
 Literal(java.sql.Date.valueOf("2014-09-23")) ::
-Literal(Decimal(BigDecimal(123.123))) ::
+Literal(Decimal(BigDecimal("123.123"))) ::
 Literal(new java.sql.Timestamp(123123)) ::
 Literal(Array[Byte](1, 2, 3)) ::
 Literal.create(Seq[Int](1, 2, 3), ArrayType(IntegerType)) ::
@@ -21,12 +21,9 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.conf.HiveConf
 import org.scalatest.BeforeAndAfterAll
-import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EmptyRow, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, In, InSet, LessThan, LessThanOrEqual, Like, Literal, Or}
+import org.apache.spark.sql.catalyst.expressions.{EmptyRow, Expression, In, InSet}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.hive.HiveUtils
-import org.apache.spark.sql.types.{ByteType, IntegerType, StringType}
 // TODO: Refactor this to `HivePartitionFilteringSuite`
 class HiveClientSuite(version: String)
@@ -146,7 +143,7 @@ class HiveClientSuite(version: String)
 0 to 23,
 "aa" :: "ab" :: "ba" :: "bb" :: Nil, {
 case expr @ In(v, list) if expr.inSetConvertible =>
-InSet(v, Set() ++ list.map(_.eval(EmptyRow)))
+InSet(v, list.map(_.eval(EmptyRow)).toSet)
 })
 }
@@ -165,7 +162,7 @@ class HiveClientSuite(version: String)
 0 to 23,
 "ab" :: "ba" :: Nil, {
 case expr @ In(v, list) if expr.inSetConvertible =>
-InSet(v, Set() ++ list.map(_.eval(EmptyRow)))
+InSet(v, list.map(_.eval(EmptyRow)).toSet)
 })
 }
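The `InSet` change is the `Set()` counterpart of the earlier `Seq.empty` cleanups: instead of folding the evaluated literals into an empty set, the list is converted directly with `.toSet`. The result is identical; a small sketch in plain Scala, without the Catalyst types:

```scala
object PreferToSet extends App {
  val evaluated = List("aa", "ab", "ba").map(_.toUpperCase)

  val viaEmpty: Set[String] = Set() ++ evaluated // flagged: start from Set() and fold elements in
  val direct: Set[String]   = evaluated.toSet    // preferred: one conversion, same result

  assert(viaEmpty == direct)
  println(direct)                                // e.g. Set(AA, AB, BA)
}
```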
@@ -458,7 +458,7 @@ class StreamingContext private[streaming] (
 queue: Queue[RDD[T]],
 oneAtATime: Boolean = true
 ): InputDStream[T] = {
-queueStream(queue, oneAtATime, sc.makeRDD(Seq[T](), 1))
+queueStream(queue, oneAtATime, sc.makeRDD(Seq.empty[T], 1))
 }
 /**
@@ -153,7 +153,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
 def context(): StreamingContext = dstream.context
 /** Return a new DStream by applying a function to all elements of this DStream. */
-def map[R](f: JFunction[T, R]): JavaDStream[R] = {
+def map[U](f: JFunction[T, U]): JavaDStream[U] = {
 new JavaDStream(dstream.map(f)(fakeClassTag))(fakeClassTag)
 }
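The `map[R]` to `map[U]` rename fixes the type-shadowing warning: the method's type parameter `R` hid the trait-level `R`, which compiles but is easy to misread. A stripped-down illustration of the same situation, with simplified hypothetical types rather than the real Spark API:

```scala
// Simplified stand-in for the DStream wrapper; not the real Spark API.
trait Wrapper[T, R] {
  def result: R

  // def map[R](f: T => R): List[R]   // 'R' here would shadow the trait's R
  def map[U](f: T => U): List[U]      // renamed type parameter, identical behaviour
}
```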
@@ -76,7 +76,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
 // Re-apply the update function to the old state RDD
 val updateFuncLocal = updateFunc
 val finalFunc = (iterator: Iterator[(K, S)]) => {
-val i = iterator.map(t => (t._1, Seq[V](), Option(t._2)))
+val i = iterator.map(t => (t._1, Seq.empty[V], Option(t._2)))
 updateFuncLocal(validTime, i)
 }
 val stateRDD = prevStateRDD.mapPartitions(finalFunc, preservePartitioning)
@@ -63,7 +63,6 @@ object RawTextHelper {
 var i = 0
 var len = 0
-var done = false
 var value: (String, Long) = null
 var swap: (String, Long) = null
 var count = 0