[SPARK-22830] Scala Coding style has been improved in Spark Examples

## What changes were proposed in this pull request?

* In the Spark Scala examples, some code was written in a Java-like style; it has been rewritten to follow the Scala style guide.
* Most of the changes affect println() statements, as illustrated in the sketch below.
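
The before/after pattern looks roughly like the following sketch (a hypothetical `PrintlnStyleSketch` object written only for illustration; it is not a file touched by this PR): Java-style string concatenation becomes string interpolation, and `"\n"`-concatenated usage text becomes a triple-quoted string with `stripMargin`.

```scala
// Illustrative only: contrasts the old Java-like style with the Scala style
// adopted across the examples in this PR.
object PrintlnStyleSketch {
  def main(args: Array[String]): Unit = {
    val iter = 1
    val elapsedMs = 42L

    // Before: Java-style concatenation
    println("Iteration " + iter + " took " + elapsedMs + " ms")
    // After: Scala string interpolation
    println(s"Iteration $iter took $elapsedMs ms")

    // Before: a multi-line usage message built with "\n" and +
    val oldUsage = "DFS Read-Write Test\n" +
      "Usage: localFile dfsDir\n"
    // After: a triple-quoted string with stripMargin
    val newUsage =
      """DFS Read-Write Test
        |Usage: localFile dfsDir""".stripMargin

    println(oldUsage)
    println(newUsage)
  }
}
```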

## How was this patch tested?

Since all proposed changes rewrite println statements in the Scala way, the examples were run manually to verify the println output.

Author: chetkhatri <ckhatrimanjal@gmail.com>

Closes #20016 from chetkhatri/scala-style-spark-examples.
Authored by chetkhatri on 2017-12-20 14:47:49 -06:00, committed by Sean Owen
commit 792915c844 (parent c89b431183)
17 changed files with 54 additions and 58 deletions

@@ -42,7 +42,7 @@ object BroadcastTest {
 val arr1 = (0 until num).toArray
 for (i <- 0 until 3) {
-println("Iteration " + i)
+println(s"Iteration $i")
 println("===========")
 val startTime = System.nanoTime
 val barr1 = sc.broadcast(arr1)

@@ -49,12 +49,10 @@ object DFSReadWriteTest {
 }
 private def printUsage(): Unit = {
-val usage: String = "DFS Read-Write Test\n" +
-"\n" +
-"Usage: localFile dfsDir\n" +
-"\n" +
-"localFile - (string) local file to use in test\n" +
-"dfsDir - (string) DFS directory for read/write tests\n"
+val usage = """DFS Read-Write Test
+|Usage: localFile dfsDir
+|localFile - (string) local file to use in test
+|dfsDir - (string) DFS directory for read/write tests""".stripMargin
 println(usage)
 }
@@ -69,13 +67,13 @@ object DFSReadWriteTest {
 localFilePath = new File(args(i))
 if (!localFilePath.exists) {
-System.err.println("Given path (" + args(i) + ") does not exist.\n")
+System.err.println(s"Given path (${args(i)}) does not exist")
 printUsage()
 System.exit(1)
 }
 if (!localFilePath.isFile) {
-System.err.println("Given path (" + args(i) + ") is not a file.\n")
+System.err.println(s"Given path (${args(i)}) is not a file")
 printUsage()
 System.exit(1)
 }
@@ -108,7 +106,7 @@ object DFSReadWriteTest {
 .getOrCreate()
 println("Writing local file to DFS")
-val dfsFilename = dfsDirPath + "/dfs_read_write_test"
+val dfsFilename = s"$dfsDirPath/dfs_read_write_test"
 val fileRDD = spark.sparkContext.parallelize(fileContents)
 fileRDD.saveAsTextFile(dfsFilename)
@@ -127,11 +125,11 @@ object DFSReadWriteTest {
 spark.stop()
 if (localWordCount == dfsWordCount) {
-println(s"Success! Local Word Count ($localWordCount) " +
-s"and DFS Word Count ($dfsWordCount) agree.")
+println(s"Success! Local Word Count $localWordCount and " +
+s"DFS Word Count $dfsWordCount agree.")
 } else {
-println(s"Failure! Local Word Count ($localWordCount) " +
-s"and DFS Word Count ($dfsWordCount) disagree.")
+println(s"Failure! Local Word Count $localWordCount " +
+s"and DFS Word Count $dfsWordCount disagree.")
 }
 }

@@ -39,7 +39,7 @@ object HdfsTest {
 val start = System.currentTimeMillis()
 for (x <- mapped) { x + 2 }
 val end = System.currentTimeMillis()
-println("Iteration " + iter + " took " + (end-start) + " ms")
+println(s"Iteration $iter took ${end-start} ms")
 }
 spark.stop()
 }

@@ -129,8 +129,7 @@ object LocalALS {
 println(s"Iteration $iter:")
 ms = (0 until M).map(i => updateMovie(i, ms(i), us, R)).toArray
 us = (0 until U).map(j => updateUser(j, us(j), ms, R)).toArray
-println("RMSE = " + rmse(R, ms, us))
-println()
+println(s"RMSE = ${rmse(R, ms, us)}")
 }
 }

@@ -58,10 +58,10 @@ object LocalFileLR {
 // Initialize w to a random value
 val w = DenseVector.fill(D) {2 * rand.nextDouble - 1}
-println("Initial w: " + w)
+println(s"Initial w: $w")
 for (i <- 1 to ITERATIONS) {
-println("On iteration " + i)
+println(s"On iteration $i")
 val gradient = DenseVector.zeros[Double](D)
 for (p <- points) {
 val scale = (1 / (1 + math.exp(-p.y * (w.dot(p.x)))) - 1) * p.y
@@ -71,7 +71,7 @@ object LocalFileLR {
 }
 fileSrc.close()
-println("Final w: " + w)
+println(s"Final w: $w")
 }
 }
 // scalastyle:on println

@@ -88,7 +88,7 @@ object LocalKMeans {
 kPoints.put(i, iter.next())
 }
-println("Initial centers: " + kPoints)
+println(s"Initial centers: $kPoints")
 while(tempDist > convergeDist) {
 val closest = data.map (p => (closestPoint(p, kPoints), (p, 1)))
@@ -114,7 +114,7 @@ object LocalKMeans {
 }
 }
-println("Final centers: " + kPoints)
+println(s"Final centers: $kPoints")
 }
 }
 // scalastyle:on println

@@ -61,10 +61,10 @@ object LocalLR {
 val data = generateData
 // Initialize w to a random value
 val w = DenseVector.fill(D) {2 * rand.nextDouble - 1}
-println("Initial w: " + w)
+println(s"Initial w: $w")
 for (i <- 1 to ITERATIONS) {
-println("On iteration " + i)
+println(s"On iteration $i")
 val gradient = DenseVector.zeros[Double](D)
 for (p <- data) {
 val scale = (1 / (1 + math.exp(-p.y * (w.dot(p.x)))) - 1) * p.y
@@ -73,7 +73,7 @@ object LocalLR {
 w -= gradient
 }
-println("Final w: " + w)
+println(s"Final w: $w")
 }
 }
 // scalastyle:on println

@@ -28,7 +28,7 @@ object LocalPi {
 val y = random * 2 - 1
 if (x*x + y*y <= 1) count += 1
 }
-println("Pi is roughly " + 4 * count / 100000.0)
+println(s"Pi is roughly ${4 * count / 100000.0}")
 }
 }
 // scalastyle:on println

@@ -59,7 +59,7 @@ object SimpleSkewedGroupByTest {
 // Enforce that everything has been calculated and in cache
 pairs1.count
-println("RESULT: " + pairs1.groupByKey(numReducers).count)
+println(s"RESULT: ${pairs1.groupByKey(numReducers).count}")
 // Print how many keys each reducer got (for debugging)
 // println("RESULT: " + pairs1.groupByKey(numReducers)
 // .map{case (k,v) => (k, v.size)}

@@ -135,10 +135,8 @@ object SparkALS {
 .map(i => update(i, usb.value(i), msb.value, Rc.value.transpose()))
 .collect()
 usb = sc.broadcast(us) // Re-broadcast us because it was updated
-println("RMSE = " + rmse(R, ms, us))
-println()
+println(s"RMSE = ${rmse(R, ms, us)}")
 }
 spark.stop()
 }

@@ -79,17 +79,17 @@ object SparkHdfsLR {
 // Initialize w to a random value
 val w = DenseVector.fill(D) {2 * rand.nextDouble - 1}
-println("Initial w: " + w)
+println(s"Initial w: $w")
 for (i <- 1 to ITERATIONS) {
-println("On iteration " + i)
+println(s"On iteration $i")
 val gradient = points.map { p =>
 p.x * (1 / (1 + exp(-p.y * (w.dot(p.x)))) - 1) * p.y
 }.reduce(_ + _)
 w -= gradient
 }
-println("Final w: " + w)
+println(s"Final w: $w")
 spark.stop()
 }
 }

@@ -95,7 +95,7 @@ object SparkKMeans {
 for (newP <- newPoints) {
 kPoints(newP._1) = newP._2
 }
-println("Finished iteration (delta = " + tempDist + ")")
+println(s"Finished iteration (delta = $tempDist)")
 }
 println("Final centers:")

@@ -73,17 +73,17 @@ object SparkLR {
 // Initialize w to a random value
 val w = DenseVector.fill(D) {2 * rand.nextDouble - 1}
-println("Initial w: " + w)
+println(s"Initial w: $w")
 for (i <- 1 to ITERATIONS) {
-println("On iteration " + i)
+println(s"On iteration $i")
 val gradient = points.map { p =>
 p.x * (1 / (1 + exp(-p.y * (w.dot(p.x)))) - 1) * p.y
 }.reduce(_ + _)
 w -= gradient
 }
-println("Final w: " + w)
+println(s"Final w: $w")
 spark.stop()
 }

@@ -77,7 +77,7 @@ object SparkPageRank {
 }
 val output = ranks.collect()
-output.foreach(tup => println(tup._1 + " has rank: " + tup._2 + "."))
+output.foreach(tup => println(s"${tup._1} has rank: ${tup._2} ."))
 spark.stop()
 }

@@ -36,7 +36,7 @@ object SparkPi {
 val y = random * 2 - 1
 if (x*x + y*y <= 1) 1 else 0
 }.reduce(_ + _)
-println("Pi is roughly " + 4.0 * count / (n - 1))
+println(s"Pi is roughly ${4.0 * count / (n - 1)}")
 spark.stop()
 }
 }

@@ -68,7 +68,7 @@ object SparkTC {
 nextCount = tc.count()
 } while (nextCount != oldCount)
-println("TC has " + tc.count() + " edges.")
+println(s"TC has ${tc.count()} edges.")
 spark.stop()
 }
 }

@@ -27,6 +27,7 @@ import org.apache.spark.graphx.lib._
 import org.apache.spark.internal.Logging
 import org.apache.spark.storage.StorageLevel
 /**
  * Driver program for running graph algorithms.
  */
@@ -34,12 +35,12 @@ object Analytics extends Logging {
 def main(args: Array[String]): Unit = {
 if (args.length < 2) {
-System.err.println(
-"Usage: Analytics <taskType> <file> --numEPart=<num_edge_partitions> [other options]")
-System.err.println("Supported 'taskType' as follows:")
-System.err.println(" pagerank Compute PageRank")
-System.err.println(" cc Compute the connected components of vertices")
-System.err.println(" triangles Count the number of triangles")
+val usage = """Usage: Analytics <taskType> <file> --numEPart=<num_edge_partitions>
+|[other options] Supported 'taskType' as follows:
+|pagerank Compute PageRank
+|cc Compute the connected components of vertices
+|triangles Count the number of triangles""".stripMargin
+System.err.println(usage)
 System.exit(1)
 }
@@ -48,7 +49,7 @@ object Analytics extends Logging {
 val optionsList = args.drop(2).map { arg =>
 arg.dropWhile(_ == '-').split('=') match {
 case Array(opt, v) => (opt -> v)
-case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
+case _ => throw new IllegalArgumentException(s"Invalid argument: $arg")
 }
 }
 val options = mutable.Map(optionsList: _*)
@@ -74,14 +75,14 @@ object Analytics extends Logging {
 val numIterOpt = options.remove("numIter").map(_.toInt)
 options.foreach {
-case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+case (opt, _) => throw new IllegalArgumentException(s"Invalid option: $opt")
 }
 println("======================================")
 println("| PageRank |")
 println("======================================")
-val sc = new SparkContext(conf.setAppName("PageRank(" + fname + ")"))
+val sc = new SparkContext(conf.setAppName(s"PageRank($fname)"))
 val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
 numEdgePartitions = numEPart,
@@ -89,18 +90,18 @@ object Analytics extends Logging {
 vertexStorageLevel = vertexStorageLevel).cache()
 val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
-println("GRAPHX: Number of vertices " + graph.vertices.count)
-println("GRAPHX: Number of edges " + graph.edges.count)
+println(s"GRAPHX: Number of vertices ${graph.vertices.count}")
+println(s"GRAPHX: Number of edges ${graph.edges.count}")
 val pr = (numIterOpt match {
 case Some(numIter) => PageRank.run(graph, numIter)
 case None => PageRank.runUntilConvergence(graph, tol)
 }).vertices.cache()
-println("GRAPHX: Total rank: " + pr.map(_._2).reduce(_ + _))
+println(s"GRAPHX: Total rank: ${pr.map(_._2).reduce(_ + _)}")
 if (!outFname.isEmpty) {
-logWarning("Saving pageranks of pages to " + outFname)
+logWarning(s"Saving pageranks of pages to $outFname")
 pr.map { case (id, r) => id + "\t" + r }.saveAsTextFile(outFname)
 }
@@ -108,14 +109,14 @@ object Analytics extends Logging {
 case "cc" =>
 options.foreach {
-case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+case (opt, _) => throw new IllegalArgumentException(s"Invalid option: $opt")
 }
 println("======================================")
 println("| Connected Components |")
 println("======================================")
-val sc = new SparkContext(conf.setAppName("ConnectedComponents(" + fname + ")"))
+val sc = new SparkContext(conf.setAppName(s"ConnectedComponents($fname)"))
 val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
 numEdgePartitions = numEPart,
 edgeStorageLevel = edgeStorageLevel,
@@ -123,19 +124,19 @@ object Analytics extends Logging {
 val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
 val cc = ConnectedComponents.run(graph)
-println("Components: " + cc.vertices.map { case (vid, data) => data }.distinct())
+println(s"Components: ${cc.vertices.map { case (vid, data) => data }.distinct()}")
 sc.stop()
 case "triangles" =>
 options.foreach {
-case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+case (opt, _) => throw new IllegalArgumentException(s"Invalid option: $opt")
 }
 println("======================================")
 println("| Triangle Count |")
 println("======================================")
-val sc = new SparkContext(conf.setAppName("TriangleCount(" + fname + ")"))
+val sc = new SparkContext(conf.setAppName(s"TriangleCount($fname)"))
 val graph = GraphLoader.edgeListFile(sc, fname,
 canonicalOrientation = true,
 numEdgePartitions = numEPart,