[SPARK-7929] Turn whitespace checker on for more token types.

This is the last batch of changes to complete SPARK-7929. Previous related PRs:
https://github.com/apache/spark/pull/6480
https://github.com/apache/spark/pull/6478
https://github.com/apache/spark/pull/6477
https://github.com/apache/spark/pull/6476
https://github.com/apache/spark/pull/6475
https://github.com/apache/spark/pull/6474
https://github.com/apache/spark/pull/6473

Author: Reynold Xin <rxin@databricks.com>

Closes #6487 from rxin/whitespace-lint and squashes the following commits:

b33d43d [Reynold Xin] [SPARK-7929] Turn whitespace checker on for more token types.
commit 97a60cf75d
parent 36067ce398
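Every source change in the diff below is a mechanical whitespace fix required by the newly enabled scalastyle rules: a single space after commas and colons, and a single space before and after arrows and equals signs. As an illustrative sketch only (these lines are hypothetical, not part of the patch), the spacing the checkers expect looks like this:

  // Hypothetical example, not from this commit: spacing the enabled checkers accept.
  val randomWords = List("spark", "you", "are", "my", "father")  // space after each comma
  val totals = scala.collection.mutable.Map[String, Int]()       // space after the colon in the type arguments
  for ((word, i) <- randomWords.zipWithIndex) {                  // space after the comma in the tuple pattern
    totals(word) = totals.getOrElse(word, 0) + i                 // single space around "="
  }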
@@ -143,7 +143,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
        eventBatch.setErrorMsg(msg)
      } else {
        // At this point, the events are available, so fill them into the event batch
-       eventBatch = new EventBatch("",seqNum, events)
+       eventBatch = new EventBatch("", seqNum, events)
      }
    })
  } catch {

@@ -60,7 +60,7 @@ private[streaming] object EventTransformer extends Logging {
    out.write(body)
    val numHeaders = headers.size()
    out.writeInt(numHeaders)
-   for ((k,v) <- headers) {
+   for ((k, v) <- headers) {
      val keyBuff = Utils.serialize(k.toString)
      out.writeInt(keyBuff.length)
      out.write(keyBuff)

@@ -65,7 +65,7 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll {

    val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size))

-   val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
+   val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
      sc, kafkaParams, offsetRanges)

    val received = rdd.map(_._2).collect.toSet

@@ -17,22 +17,10 @@

package org.apache.spark.streaming.mqtt

import java.io.IOException
import java.util.concurrent.Executors
import java.util.Properties

import scala.collection.JavaConversions._
import scala.collection.Map
import scala.collection.mutable.HashMap
import scala.reflect.ClassTag

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken
import org.eclipse.paho.client.mqttv3.MqttCallback
import org.eclipse.paho.client.mqttv3.MqttClient
import org.eclipse.paho.client.mqttv3.MqttClientPersistence
import org.eclipse.paho.client.mqttv3.MqttException
import org.eclipse.paho.client.mqttv3.MqttMessage
import org.eclipse.paho.client.mqttv3.MqttTopic
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence

import org.apache.spark.storage.StorageLevel

@@ -87,7 +75,7 @@ class MQTTReceiver(

      // Handles Mqtt message
      override def messageArrived(topic: String, message: MqttMessage) {
-       store(new String(message.getPayload(),"utf-8"))
+       store(new String(message.getPayload(), "utf-8"))
      }

      override def deliveryComplete(token: IMqttDeliveryToken) {

@@ -208,7 +208,7 @@ object KinesisWordProducerASL {
      recordsPerSecond: Int,
      wordsPerRecord: Int): Seq[(String, Int)] = {

-   val randomWords = List("spark","you","are","my","father")
+   val randomWords = List("spark", "you", "are", "my", "father")
    val totals = scala.collection.mutable.Map[String, Int]()

    // Create the low-level Kinesis Client from the AWS Java SDK.

@@ -55,7 +55,7 @@ object KinesisUtils {
   */
  def createStream(
      ssc: StreamingContext,
-     kinesisAppName: String,
+     kinesisAppName: String,
      streamName: String,
      endpointUrl: String,
      regionName: String,

@@ -102,7 +102,7 @@ object KinesisUtils {
   */
  def createStream(
      ssc: StreamingContext,
-     kinesisAppName: String,
+     kinesisAppName: String,
      streamName: String,
      endpointUrl: String,
      regionName: String,

@@ -51,8 +51,8 @@
  </parameters>
 </check>
 <check level="error" class="org.scalastyle.scalariform.SpacesAfterPlusChecker" enabled="true"></check>
 <check level="error" class="org.scalastyle.file.WhitespaceEndOfLineChecker" enabled="false"></check>
 <check level="error" class="org.scalastyle.scalariform.SpacesBeforePlusChecker" enabled="true"></check>
 <check level="error" class="org.scalastyle.file.WhitespaceEndOfLineChecker" enabled="false"></check>
 <check level="error" class="org.scalastyle.file.FileLineLengthChecker" enabled="true">
  <parameters>
   <parameter name="maxLineLength"><![CDATA[100]]></parameter>

@@ -142,4 +142,15 @@
 <check level="error" class="org.scalastyle.file.NoNewLineAtEofChecker" enabled="false"></check>
 <check level="error" class="org.scalastyle.scalariform.NonASCIICharacterChecker" enabled="true"></check>
 <check level="error" class="org.scalastyle.scalariform.SpaceAfterCommentStartChecker" enabled="true"></check>
+<check level="error" class="org.scalastyle.scalariform.EnsureSingleSpaceBeforeTokenChecker" enabled="true">
+ <parameters>
+  <parameter name="tokens">ARROW, EQUALS</parameter>
+ </parameters>
+</check>
+<check level="error" class="org.scalastyle.scalariform.EnsureSingleSpaceAfterTokenChecker" enabled="true">
+ <parameters>
+  <parameter name="tokens">ARROW, EQUALS, COMMA, COLON, IF, WHILE, FOR</parameter>
+ </parameters>
+</check>
+<check level="error" class="org.scalastyle.scalariform.NotImplementedErrorUsage" enabled="true"></check>
 </scalastyle>

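The two checks added above drive most of the edits in this commit: EnsureSingleSpaceBeforeTokenChecker requires a single space before => and =, and EnsureSingleSpaceAfterTokenChecker requires a single space after =>, =, commas, colons, and the if/while/for keywords. A minimal before/after sketch (hypothetical code, not taken from the patch):

  // Would now be flagged: no space after the colons and commas, none around "=>".
  // def add(x:Int,y:Int) = (x,y) match { case (a,b)=>a + b }
  // Accepted: single spaces after ":" and "," and around "=>".
  def add(x: Int, y: Int): Int = (x, y) match { case (a, b) => a + b }

With the scalastyle plugin already wired into Spark's build, these rules should run through the usual lint path (for example the dev/scalastyle script or an sbt scalastyle invocation; the exact command is an assumption here, not something this diff shows).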
@@ -78,10 +78,10 @@ class HiveInspectorSuite extends FunSuite with HiveInspectors {
    Literal(java.sql.Date.valueOf("2014-09-23")) ::
    Literal(Decimal(BigDecimal(123.123))) ::
    Literal(new java.sql.Timestamp(123123)) ::
-   Literal(Array[Byte](1,2,3)) ::
-   Literal.create(Seq[Int](1,2,3), ArrayType(IntegerType)) ::
-   Literal.create(Map[Int, Int](1->2, 2->1), MapType(IntegerType, IntegerType)) ::
-   Literal.create(Row(1,2.0d,3.0f),
+   Literal(Array[Byte](1, 2, 3)) ::
+   Literal.create(Seq[Int](1, 2, 3), ArrayType(IntegerType)) ::
+   Literal.create(Map[Int, Int](1 -> 2, 2 -> 1), MapType(IntegerType, IntegerType)) ::
+   Literal.create(Row(1, 2.0d, 3.0f),
      StructType(StructField("c1", IntegerType) ::
        StructField("c2", DoubleType) ::
        StructField("c3", FloatType) :: Nil)) ::

|
@ -111,8 +111,8 @@ class HiveInspectorSuite extends FunSuite with HiveInspectors {
|
|||
case DecimalType() => PrimitiveObjectInspectorFactory.writableHiveDecimalObjectInspector
|
||||
case StructType(fields) =>
|
||||
ObjectInspectorFactory.getStandardStructObjectInspector(
|
||||
java.util.Arrays.asList(fields.map(f => f.name) :_*),
|
||||
java.util.Arrays.asList(fields.map(f => toWritableInspector(f.dataType)) :_*))
|
||||
java.util.Arrays.asList(fields.map(f => f.name) : _*),
|
||||
java.util.Arrays.asList(fields.map(f => toWritableInspector(f.dataType)) : _*))
|
||||
}
|
||||
|
||||
def checkDataType(dt1: Seq[DataType], dt2: Seq[DataType]): Unit = {
|
||||
|
|
|
@@ -160,7 +160,7 @@ class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter {
      "p1=a"::"p2=b"::"p3=c"::"p4=c"::"p5=1"::Nil ,
      "p1=a"::"p2=b"::"p3=c"::"p4=c"::"p5=4"::Nil
    )
-   assert(listFolders(tmpDir,List()).sortBy(_.toString()) == expected.sortBy(_.toString))
+   assert(listFolders(tmpDir, List()).sortBy(_.toString()) == expected.sortBy(_.toString))
    sql("DROP TABLE table_with_partition")
    sql("DROP TABLE tmp_table")
  }

@@ -29,7 +29,7 @@ class ListTablesSuite extends QueryTest with BeforeAndAfterAll {
  import org.apache.spark.sql.hive.test.TestHive.implicits._

  val df =
-   sparkContext.parallelize((1 to 10).map(i => (i,s"str$i"))).toDF("key", "value")
+   sparkContext.parallelize((1 to 10).map(i => (i, s"str$i"))).toDF("key", "value")

  override def beforeAll(): Unit = {
    // The catalog in HiveContext is a case insensitive one.

@@ -26,9 +26,9 @@ case class FunctionResult(f1: String, f2: String)

class UDFSuite extends QueryTest {
  test("UDF case insensitive") {
-   udf.register("random0", () => { Math.random()})
-   udf.register("RANDOM1", () => { Math.random()})
-   udf.register("strlenScala", (_: String).length + (_:Int))
+   udf.register("random0", () => { Math.random() })
+   udf.register("RANDOM1", () => { Math.random() })
+   udf.register("strlenScala", (_: String).length + (_: Int))
    assert(sql("SELECT RANDOM0() FROM src LIMIT 1").head().getDouble(0) >= 0.0)
    assert(sql("SELECT RANDOm1() FROM src LIMIT 1").head().getDouble(0) >= 0.0)
    assert(sql("SELECT strlenscala('test', 1) FROM src LIMIT 1").head().getInt(0) === 5)

@@ -273,7 +273,7 @@ abstract class HiveComparisonTest
    }

    val hiveCacheFiles = queryList.zipWithIndex.map {
-     case (queryString, i) =>
+     case (queryString, i) =>
        val cachedAnswerName = s"$testCaseName-$i-${getMd5(queryString)}"
        new File(answerCache, cachedAnswerName)
    }

@@ -304,7 +304,7 @@ abstract class HiveComparisonTest
    // other DDL has not been executed yet.
    hiveQueries.foreach(_.logical)
    val computedResults = (queryList.zipWithIndex, hiveQueries, hiveCacheFiles).zipped.map {
-     case ((queryString, i), hiveQuery, cachedAnswerFile)=>
+     case ((queryString, i), hiveQuery, cachedAnswerFile) =>
        try {
          // Hooks often break the harness and don't really affect our test anyway, don't
          // even try running them.

@@ -77,7 +77,7 @@ class HiveResolutionSuite extends HiveComparisonTest {

  test("case insensitivity with scala reflection") {
    // Test resolution with Scala Reflection
-   sparkContext.parallelize(Data(1, 2, Nested(1,2), Seq(Nested(1,2))) :: Nil)
+   sparkContext.parallelize(Data(1, 2, Nested(1, 2), Seq(Nested(1, 2))) :: Nil)
      .toDF().registerTempTable("caseSensitivityTest")

    val query = sql("SELECT a, b, A, B, n.a, n.b, n.A, n.B FROM caseSensitivityTest")

@@ -88,14 +88,14 @@ class HiveResolutionSuite extends HiveComparisonTest {

  ignore("case insensitivity with scala reflection joins") {
    // Test resolution with Scala Reflection
-   sparkContext.parallelize(Data(1, 2, Nested(1,2), Seq(Nested(1,2))) :: Nil)
+   sparkContext.parallelize(Data(1, 2, Nested(1, 2), Seq(Nested(1, 2))) :: Nil)
      .toDF().registerTempTable("caseSensitivityTest")

    sql("SELECT * FROM casesensitivitytest a JOIN casesensitivitytest b ON a.a = b.a").collect()
  }

  test("nested repeated resolution") {
-   sparkContext.parallelize(Data(1, 2, Nested(1,2), Seq(Nested(1,2))) :: Nil)
+   sparkContext.parallelize(Data(1, 2, Nested(1, 2), Seq(Nested(1, 2))) :: Nil)
      .toDF().registerTempTable("nestedRepeatedTest")
    assert(sql("SELECT nestedArray[0].a FROM nestedRepeatedTest").collect().head(0) === 1)
  }

@@ -76,7 +76,7 @@ class HiveTableScanSuite extends HiveComparisonTest {

    TestHive.sql(s"LOAD DATA LOCAL INPATH '$location' INTO TABLE timestamp_query_null")
    assert(TestHive.sql("SELECT time from timestamp_query_null limit 2").collect()
-     === Array(Row(java.sql.Timestamp.valueOf("2014-12-11 00:00:00")),Row(null)))
+     === Array(Row(java.sql.Timestamp.valueOf("2014-12-11 00:00:00")), Row(null)))
    TestHive.sql("DROP TABLE timestamp_query_null")
  }

@@ -327,7 +327,7 @@ class SQLQuerySuite extends QueryTest {
      "org.apache.hadoop.hive.ql.io.RCFileInputFormat",
      "org.apache.hadoop.hive.ql.io.RCFileOutputFormat",
      "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe",
-     "serde_p1=p1", "serde_p2=p2", "tbl_p1=p11", "tbl_p2=p22","MANAGED_TABLE"
+     "serde_p1=p1", "serde_p2=p2", "tbl_p1=p11", "tbl_p2=p22", "MANAGED_TABLE"
    )

    if (HiveShim.version =="0.13.1") {

@@ -38,7 +38,7 @@ case class ParquetData(intField: Int, stringField: String)
// The data that also includes the partitioning key
case class ParquetDataWithKey(p: Int, intField: Int, stringField: String)

-case class StructContainer(intStructField :Int, stringStructField: String)
+case class StructContainer(intStructField: Int, stringStructField: String)

case class ParquetDataWithComplexTypes(
    intField: Int,

@@ -735,7 +735,7 @@ class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase {
    val filePath = new File(tempDir, "testParquet").getCanonicalPath
    val filePath2 = new File(tempDir, "testParquet2").getCanonicalPath

-   val df = Seq(1,2,3).map(i => (i, i.toString)).toDF("int", "str")
+   val df = Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str")
    val df2 = df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("y.str").max("y.int")
    intercept[Throwable](df2.write.parquet(filePath))

@@ -1142,9 +1142,9 @@ object Client extends Logging {
        logDebug("HiveMetaStore configured in localmode")
      }
    } catch {
-     case e:java.lang.NoSuchMethodException => { logInfo("Hive Method not found " + e); return }
-     case e:java.lang.ClassNotFoundException => { logInfo("Hive Class not found " + e); return }
-     case e:Exception => { logError("Unexpected Exception " + e)
+     case e: java.lang.NoSuchMethodException => { logInfo("Hive Method not found " + e); return }
+     case e: java.lang.ClassNotFoundException => { logInfo("Hive Class not found " + e); return }
+     case e: Exception => { logError("Unexpected Exception " + e)
        throw new RuntimeException("Unexpected exception", e)
      }
    }

@@ -95,13 +95,13 @@ private[spark] class ClientDistributedCacheManager() extends Logging {
    val (keys, tupleValues) = distCacheFiles.unzip
    val (sizes, timeStamps, visibilities) = tupleValues.unzip3
    if (keys.size > 0) {
-     env("SPARK_YARN_CACHE_FILES") = keys.reduceLeft[String] { (acc,n) => acc + "," + n }
+     env("SPARK_YARN_CACHE_FILES") = keys.reduceLeft[String] { (acc, n) => acc + "," + n }
      env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") =
-       timeStamps.reduceLeft[String] { (acc,n) => acc + "," + n }
+       timeStamps.reduceLeft[String] { (acc, n) => acc + "," + n }
      env("SPARK_YARN_CACHE_FILES_FILE_SIZES") =
-       sizes.reduceLeft[String] { (acc,n) => acc + "," + n }
+       sizes.reduceLeft[String] { (acc, n) => acc + "," + n }
      env("SPARK_YARN_CACHE_FILES_VISIBILITIES") =
-       visibilities.reduceLeft[String] { (acc,n) => acc + "," + n }
+       visibilities.reduceLeft[String] { (acc, n) => acc + "," + n }
    }
  }

@@ -112,13 +112,13 @@ private[spark] class ClientDistributedCacheManager() extends Logging {
    val (keys, tupleValues) = distCacheArchives.unzip
    val (sizes, timeStamps, visibilities) = tupleValues.unzip3
    if (keys.size > 0) {
-     env("SPARK_YARN_CACHE_ARCHIVES") = keys.reduceLeft[String] { (acc,n) => acc + "," + n }
+     env("SPARK_YARN_CACHE_ARCHIVES") = keys.reduceLeft[String] { (acc, n) => acc + "," + n }
      env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") =
-       timeStamps.reduceLeft[String] { (acc,n) => acc + "," + n }
+       timeStamps.reduceLeft[String] { (acc, n) => acc + "," + n }
      env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") =
-       sizes.reduceLeft[String] { (acc,n) => acc + "," + n }
+       sizes.reduceLeft[String] { (acc, n) => acc + "," + n }
      env("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") =
-       visibilities.reduceLeft[String] { (acc,n) => acc + "," + n }
+       visibilities.reduceLeft[String] { (acc, n) => acc + "," + n }
    }
  }

@@ -160,7 +160,7 @@ private[spark] class ClientDistributedCacheManager() extends Logging {
  def ancestorsHaveExecutePermissions(
      fs: FileSystem,
      path: Path,
-     statCache: Map[URI, FileStatus]): Boolean = {
+     statCache: Map[URI, FileStatus]): Boolean = {
    var current = path
    while (current != null) {
      // the subdirs in the path should have execute permissions for others

@@ -203,7 +203,7 @@ class ClientSuite extends FunSuite with Matchers with BeforeAndAfterAll {
  def getFieldValue2[A: ClassTag, A1: ClassTag, B](
      clazz: Class[_],
      field: String,
-     defaults: => B)(mapTo: A => B)(mapTo1: A1 => B): B = {
+     defaults: => B)(mapTo: A => B)(mapTo1: A1 => B): B = {
    Try(clazz.getField(field)).map(_.get(null)).map {
      case v: A => mapTo(v)
      case v1: A1 => mapTo1(v1)