[MINOR][SQL] Remove extra anonymous closure within functional transformations
## What changes were proposed in this pull request?

This PR removes extra anonymous closures within functional transformations. For example,

```scala
.map(item => {
  ...
})
```

can be written more simply as:

```scala
.map { item =>
  ...
}
```

## How was this patch tested?

Related unit tests and `sbt scalastyle`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #12382 from HyukjinKwon/minor-extra-closers.
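For context, here is a minimal self-contained sketch of the two styles being exchanged (the collection and the doubling function are illustrative, not taken from the patch). Both forms desugar to the same `map` call, so the rewrite is purely syntactic:

```scala
object ClosureStyles {
  def main(args: Array[String]): Unit = {
    val xs = Seq(1, 2, 3)

    // Before: an anonymous closure wrapped in both parentheses and braces.
    val before = xs.map(x => {
      x * 2
    })

    // After: the brace-block style this PR standardizes on for
    // multi-line function literals.
    val after = xs.map { x =>
      x * 2
    }

    // Both produce the same result; only the syntax differs.
    assert(before == after)
    println(after) // List(2, 4, 6)
  }
}
```

The brace-block form is generally preferred in Scala style guides for multi-line function literals, since it drops one level of redundant nesting.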
commit 6fc3dc8839 (parent 478af2f455)
```diff
@@ -723,7 +723,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
         (safeEnd - safeStart) / step + 1
       }
     }
-    parallelize(0 until numSlices, numSlices).mapPartitionsWithIndex((i, _) => {
+    parallelize(0 until numSlices, numSlices).mapPartitionsWithIndex { (i, _) =>
       val partitionStart = (i * numElements) / numSlices * step + start
       val partitionEnd = (((i + 1) * numElements) / numSlices) * step + start
       def getSafeMargin(bi: BigInt): Long =
@@ -762,7 +762,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
           ret
         }
       }
-    })
+    }
   }

   /** Distribute a local Scala collection to form an RDD.
```
```diff
@@ -843,10 +843,10 @@ private[deploy] class Master(
     addressToApp -= app.driver.address
     if (completedApps.size >= RETAINED_APPLICATIONS) {
       val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
-      completedApps.take(toRemove).foreach( a => {
+      completedApps.take(toRemove).foreach { a =>
         Option(appIdToUI.remove(a.id)).foreach { ui => webUi.detachSparkUI(ui) }
         applicationMetricsSystem.removeSource(a.appSource)
-      })
+      }
       completedApps.trimStart(toRemove)
     }
     completedApps += app // Remember it in our history
```
```diff
@@ -35,9 +35,9 @@ class BlockRDD[T: ClassTag](sc: SparkContext, @transient val blockIds: Array[Blo

   override def getPartitions: Array[Partition] = {
     assertValid()
-    (0 until blockIds.length).map(i => {
+    (0 until blockIds.length).map { i =>
       new BlockRDDPartition(blockIds(i), i).asInstanceOf[Partition]
-    }).toArray
+    }.toArray
   }

   override def compute(split: Partition, context: TaskContext): Iterator[T] = {
```
```diff
@@ -422,7 +422,7 @@ private[spark] object HadoopRDD extends Logging {

   private[spark] def convertSplitLocationInfo(infos: Array[AnyRef]): Seq[String] = {
     val out = ListBuffer[String]()
-    infos.foreach { loc => {
+    infos.foreach { loc =>
       val locationStr = HadoopRDD.SPLIT_INFO_REFLECTIONS.get.
         getLocation.invoke(loc).asInstanceOf[String]
       if (locationStr != "localhost") {
@@ -434,7 +434,7 @@ private[spark] object HadoopRDD extends Logging {
           out += new HostTaskLocation(locationStr).toString
         }
       }
-    }}
+    }
     out.seq
   }
 }
```
```diff
@@ -121,11 +121,11 @@ private object ParallelCollectionRDD {
     // Sequences need to be sliced at the same set of index positions for operations
     // like RDD.zip() to behave as expected
     def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
-      (0 until numSlices).iterator.map(i => {
+      (0 until numSlices).iterator.map { i =>
         val start = ((i * length) / numSlices).toInt
         val end = (((i + 1) * length) / numSlices).toInt
         (start, end)
-      })
+      }
     }
     seq match {
       case r: Range =>
```
```diff
@@ -68,9 +68,9 @@ class PartitionerAwareUnionRDD[T: ClassTag](

   override def getPartitions: Array[Partition] = {
     val numPartitions = partitioner.get.numPartitions
-    (0 until numPartitions).map(index => {
+    (0 until numPartitions).map { index =>
       new PartitionerAwareUnionRDDPartition(rdds, index)
-    }).toArray
+    }.toArray
   }

   // Get the location where most of the partitions of parent RDDs are located
```
```diff
@@ -226,7 +226,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
    * @return
    */
   protected def toAttributeMap(offerAttributes: JList[Attribute]): Map[String, GeneratedMessage] = {
-    offerAttributes.asScala.map(attr => {
+    offerAttributes.asScala.map { attr =>
       val attrValue = attr.getType match {
         case Value.Type.SCALAR => attr.getScalar
         case Value.Type.RANGES => attr.getRanges
@@ -234,7 +234,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
         case Value.Type.TEXT => attr.getText
       }
       (attr.getName, attrValue)
-    }).toMap
+    }.toMap
   }
```
```diff
@@ -69,10 +69,10 @@ private[spark] class BlockStoreShuffleReader[K, C](
     // Update the context task metrics for each record read.
     val readMetrics = context.taskMetrics.registerTempShuffleReadMetrics()
     val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
-      recordIter.map(record => {
+      recordIter.map { record =>
         readMetrics.incRecordsRead(1)
         record
-      }),
+      },
       context.taskMetrics().mergeShuffleReadMetrics())

     // An interruptible iterator must be used here in order to support task cancellation
```
```diff
@@ -42,13 +42,13 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage
     var hasShuffleWrite = false
     var hasShuffleRead = false
     var hasBytesSpilled = false
-    stageData.foreach(data => {
+    stageData.foreach { data =>
       hasInput = data.hasInput
       hasOutput = data.hasOutput
       hasShuffleRead = data.hasShuffleRead
       hasShuffleWrite = data.hasShuffleWrite
       hasBytesSpilled = data.hasBytesSpilled
-    })
+    }

     <table class={UIUtils.TABLE_CLASS_STRIPED_SORTABLE}>
       <thead>
```
```diff
@@ -116,7 +116,7 @@ object RecoverableNetworkWordCount {
     val lines = ssc.socketTextStream(ip, port)
     val words = lines.flatMap(_.split(" "))
     val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
-    wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
+    wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
       // Get or register the blacklist Broadcast
       val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
       // Get or register the droppedWordsCounter Accumulator
@@ -135,7 +135,7 @@ object RecoverableNetworkWordCount {
       println("Dropped " + droppedWordsCounter.value + " word(s) totally")
       println("Appending to " + outputFile.getAbsolutePath)
       Files.append(output + "\n", outputFile, Charset.defaultCharset())
-    })
+    }
     ssc
   }
```
```diff
@@ -59,7 +59,7 @@ object SqlNetworkWordCount {
     val words = lines.flatMap(_.split(" "))

     // Convert RDDs of the words DStream to DataFrame and run SQL query
-    words.foreachRDD((rdd: RDD[String], time: Time) => {
+    words.foreachRDD { (rdd: RDD[String], time: Time) =>
       // Get the singleton instance of SQLContext
       val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
       import sqlContext.implicits._
@@ -75,7 +75,7 @@ object SqlNetworkWordCount {
         sqlContext.sql("select word, count(*) as total from words group by word")
       println(s"========= $time =========")
       wordCountsDataFrame.show()
-    })
+    }

     ssc.start()
     ssc.awaitTermination()
```
```diff
@@ -129,9 +129,9 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
    * @param success Whether the batch was successful or not.
    */
   private def completeTransaction(sequenceNumber: CharSequence, success: Boolean) {
-    removeAndGetProcessor(sequenceNumber).foreach(processor => {
+    removeAndGetProcessor(sequenceNumber).foreach { processor =>
       processor.batchProcessed(success)
-    })
+    }
   }

   /**
```
```diff
@@ -88,23 +88,23 @@ class SparkSink extends AbstractSink with Logging with Configurable {
     // dependencies which are being excluded in the build. In practice,
     // Netty dependencies are already available on the JVM as Flume would have pulled them in.
     serverOpt = Option(new NettyServer(responder, new InetSocketAddress(hostname, port)))
-    serverOpt.foreach(server => {
+    serverOpt.foreach { server =>
       logInfo("Starting Avro server for sink: " + getName)
       server.start()
-    })
+    }
     super.start()
   }

   override def stop() {
     logInfo("Stopping Spark Sink: " + getName)
-    handler.foreach(callbackHandler => {
+    handler.foreach { callbackHandler =>
       callbackHandler.shutdown()
-    })
-    serverOpt.foreach(server => {
+    }
+    serverOpt.foreach { server =>
       logInfo("Stopping Avro Server for sink: " + getName)
       server.close()
       server.join()
-    })
+    }
     blockingLatch.countDown()
     super.stop()
   }
```
```diff
@@ -110,7 +110,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
         eventBatch.setErrorMsg("Something went wrong. Channel was " +
           "unable to create a transaction!")
       }
-      txOpt.foreach(tx => {
+      txOpt.foreach { tx =>
         tx.begin()
         val events = new util.ArrayList[SparkSinkEvent](maxBatchSize)
         val loop = new Breaks
@@ -145,7 +145,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
           // At this point, the events are available, so fill them into the event batch
           eventBatch = new EventBatch("", seqNum, events)
         }
-      })
+      }
     } catch {
       case interrupted: InterruptedException =>
         // Don't pollute logs if the InterruptedException came from this being stopped
@@ -156,9 +156,9 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
         logWarning("Error while processing transaction.", e)
         eventBatch.setErrorMsg(e.getMessage)
         try {
-          txOpt.foreach(tx => {
+          txOpt.foreach { tx =>
             rollbackAndClose(tx, close = true)
-          })
+          }
         } finally {
           txOpt = None
         }
@@ -174,7 +174,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
    */
   private def processAckOrNack() {
     batchAckLatch.await(transactionTimeout, TimeUnit.SECONDS)
-    txOpt.foreach(tx => {
+    txOpt.foreach { tx =>
       if (batchSuccess) {
         try {
           logDebug("Committing transaction")
@@ -197,7 +197,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
       // cause issues. This is required to ensure the TransactionProcessor instance is not leaked
       parent.removeAndGetProcessor(seqNum)
       }
-    })
+    }
   }

   /**
```
```diff
@@ -79,11 +79,11 @@ private[streaming] class FlumePollingReceiver(

   override def onStart(): Unit = {
     // Create the connections to each Flume agent.
-    addresses.foreach(host => {
+    addresses.foreach { host =>
       val transceiver = new NettyTransceiver(host, channelFactory)
       val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
       connections.add(new FlumeConnection(transceiver, client))
-    })
+    }
     for (i <- 0 until parallelism) {
       logInfo("Starting Flume Polling Receiver worker threads..")
       // Threads that pull data from Flume.
```
```diff
@@ -123,9 +123,9 @@ private[flume] class PollingFlumeTestUtils {
     val latch = new CountDownLatch(batchCount * channels.size)
     sinks.foreach(_.countdownWhenBatchReceived(latch))

-    channels.foreach(channel => {
+    channels.foreach { channel =>
       executorCompletion.submit(new TxnSubmitter(channel))
-    })
+    }

     for (i <- 0 until channels.size) {
       executorCompletion.take()
```
```diff
@@ -519,7 +519,7 @@ class CodegenContext {
     // Get all the expressions that appear at least twice and set up the state for subexpression
     // elimination.
     val commonExprs = equivalentExpressions.getAllEquivalentExprs.filter(_.size > 1)
-    commonExprs.foreach(e => {
+    commonExprs.foreach { e =>
       val expr = e.head
       val fnName = freshName("evalExpr")
       val isNull = s"${fnName}IsNull"
@@ -561,7 +561,7 @@ class CodegenContext {
       subexprFunctions += s"$fnName($INPUT_ROW);"
       val state = SubExprEliminationState(isNull, value)
       e.foreach(subExprEliminationExprs.put(_, state))
-    })
+    }
   }

   /**
```
```diff
@@ -286,10 +286,10 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper {
       assert(children.nonEmpty)
       if (projectList.forall(_.deterministic)) {
         val newFirstChild = Project(projectList, children.head)
-        val newOtherChildren = children.tail.map ( child => {
+        val newOtherChildren = children.tail.map { child =>
           val rewrites = buildRewrites(children.head, child)
           Project(projectList.map(pushToRight(_, rewrites)), child)
-        } )
+        }
         Union(newFirstChild +: newOtherChildren)
       } else {
         p
```
```diff
@@ -400,7 +400,7 @@ case class Range(
     sqlContext
       .sparkContext
       .parallelize(0 until numSlices, numSlices)
-      .mapPartitionsWithIndex((i, _) => {
+      .mapPartitionsWithIndex { (i, _) =>
         val partitionStart = (i * numElements) / numSlices * step + start
         val partitionEnd = (((i + 1) * numElements) / numSlices) * step + start
         def getSafeMargin(bi: BigInt): Long =
@@ -444,7 +444,7 @@ case class Range(
             unsafeRow
           }
         }
-      })
+      }
   }
 }
```
```diff
@@ -251,12 +251,12 @@ object JdbcUtils extends Logging {
   def schemaString(df: DataFrame, url: String): String = {
     val sb = new StringBuilder()
     val dialect = JdbcDialects.get(url)
-    df.schema.fields foreach { field => {
+    df.schema.fields foreach { field =>
       val name = field.name
       val typ: String = getJdbcType(field.dataType, dialect).databaseTypeDefinition
       val nullable = if (field.nullable) "" else "NOT NULL"
       sb.append(s", $name $typ $nullable")
-    }}
+    }
     if (sb.length < 2) "" else sb.substring(2)
   }
```
```diff
@@ -450,9 +450,7 @@ private[hive] trait HiveInspectors {
       if (o != null) {
         val array = o.asInstanceOf[ArrayData]
         val values = new java.util.ArrayList[Any](array.numElements())
-        array.foreach(elementType, (_, e) => {
-          values.add(wrapper(e))
-        })
+        array.foreach(elementType, (_, e) => values.add(wrapper(e)))
         values
       } else {
         null
@@ -468,9 +466,8 @@ private[hive] trait HiveInspectors {
       if (o != null) {
         val map = o.asInstanceOf[MapData]
         val jmap = new java.util.HashMap[Any, Any](map.numElements())
-        map.foreach(mt.keyType, mt.valueType, (k, v) => {
-          jmap.put(keyWrapper(k), valueWrapper(v))
-        })
+        map.foreach(mt.keyType, mt.valueType, (k, v) =>
+          jmap.put(keyWrapper(k), valueWrapper(v)))
         jmap
       } else {
         null
@@ -587,9 +584,9 @@ private[hive] trait HiveInspectors {
     case x: ListObjectInspector =>
       val list = new java.util.ArrayList[Object]
       val tpe = dataType.asInstanceOf[ArrayType].elementType
-      a.asInstanceOf[ArrayData].foreach(tpe, (_, e) => {
+      a.asInstanceOf[ArrayData].foreach(tpe, (_, e) =>
         list.add(wrap(e, x.getListElementObjectInspector, tpe))
-      })
+      )
       list
     case x: MapObjectInspector =>
       val keyType = dataType.asInstanceOf[MapType].keyType
@@ -599,10 +596,10 @@ private[hive] trait HiveInspectors {
       // Some UDFs seem to assume we pass in a HashMap.
       val hashMap = new java.util.HashMap[Any, Any](map.numElements())

-      map.foreach(keyType, valueType, (k, v) => {
+      map.foreach(keyType, valueType, (k, v) =>
         hashMap.put(wrap(k, x.getMapKeyObjectInspector, keyType),
           wrap(v, x.getMapValueObjectInspector, valueType))
-      })
+      )

       hashMap
     }
@@ -704,9 +701,8 @@ private[hive] trait HiveInspectors {
         ObjectInspectorFactory.getStandardConstantListObjectInspector(listObjectInspector, null)
       } else {
         val list = new java.util.ArrayList[Object]()
-        value.asInstanceOf[ArrayData].foreach(dt, (_, e) => {
-          list.add(wrap(e, listObjectInspector, dt))
-        })
+        value.asInstanceOf[ArrayData].foreach(dt, (_, e) =>
+          list.add(wrap(e, listObjectInspector, dt)))
         ObjectInspectorFactory.getStandardConstantListObjectInspector(listObjectInspector, list)
       }
     case Literal(value, MapType(keyType, valueType, _)) =>
@@ -718,9 +714,8 @@ private[hive] trait HiveInspectors {
       val map = value.asInstanceOf[MapData]
       val jmap = new java.util.HashMap[Any, Any](map.numElements())

-      map.foreach(keyType, valueType, (k, v) => {
-        jmap.put(wrap(k, keyOI, keyType), wrap(v, valueOI, valueType))
-      })
+      map.foreach(keyType, valueType, (k, v) =>
+        jmap.put(wrap(k, keyOI, keyType), wrap(v, valueOI, valueType)))

       ObjectInspectorFactory.getStandardConstantMapObjectInspector(keyOI, valueOI, jmap)
     }
```
```diff
@@ -247,10 +247,10 @@ class CheckpointWriter(
         // Delete old checkpoint files
         val allCheckpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, Some(fs))
         if (allCheckpointFiles.size > 10) {
-          allCheckpointFiles.take(allCheckpointFiles.size - 10).foreach(file => {
+          allCheckpointFiles.take(allCheckpointFiles.size - 10).foreach { file =>
             logInfo("Deleting " + file)
             fs.delete(file, true)
-          })
+          }
         }

         // All done, print success
@@ -345,7 +345,7 @@ object CheckpointReader extends Logging {
     // Try to read the checkpoint files in the order
     logInfo("Checkpoint files found: " + checkpointFiles.mkString(","))
     var readError: Exception = null
-    checkpointFiles.foreach(file => {
+    checkpointFiles.foreach { file =>
       logInfo("Attempting to load checkpoint from file " + file)
       try {
         val fis = fs.open(file)
@@ -358,7 +358,7 @@ object CheckpointReader extends Logging {
         readError = e
         logWarning("Error reading checkpoint from file " + file, e)
       }
-    })
+    }

     // If none of checkpoint files could be read, then throw exception
     if (!ignoreReadError) {
```
```diff
@@ -48,11 +48,11 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
     // and then apply the update function
     val updateFuncLocal = updateFunc
     val finalFunc = (iterator: Iterator[(K, (Iterable[V], Iterable[S]))]) => {
-      val i = iterator.map(t => {
+      val i = iterator.map { t =>
         val itr = t._2._2.iterator
         val headOption = if (itr.hasNext) Some(itr.next()) else None
         (t._1, t._2._1.toSeq, headOption)
-      })
+      }
       updateFuncLocal(i)
     }
     val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner)
```
```diff
@@ -434,11 +434,11 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
    * worker nodes as a parallel collection, and runs them.
    */
   private def launchReceivers(): Unit = {
-    val receivers = receiverInputStreams.map(nis => {
+    val receivers = receiverInputStreams.map { nis =>
       val rcvr = nis.getReceiver()
       rcvr.setReceiverId(nis.id)
       rcvr
-    })
+    }

     runDummySparkJob()
```