diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 1b67e99064..eafe3b17c2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -372,7 +372,7 @@ abstract class RDD[T: ClassTag](
    */
  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
-    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
+    new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
  }

  /**
@@ -381,7 +381,7 @@ abstract class RDD[T: ClassTag](
    */
  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
    val cleanF = sc.clean(f)
-    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
+    new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.flatMap(cleanF))
  }

  /**
@@ -391,7 +391,7 @@ abstract class RDD[T: ClassTag](
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[T, T](
      this,
-      (context, pid, iter) => iter.filter(cleanF),
+      (_, _, iter) => iter.filter(cleanF),
      preservesPartitioning = true)
  }

@@ -402,16 +402,16 @@ abstract class RDD[T: ClassTag](
    def removeDuplicatesInPartition(partition: Iterator[T]): Iterator[T] = {
      // Create an instance of external append only map which ignores values.
      val map = new ExternalAppendOnlyMap[T, Null, Null](
-        createCombiner = value => null,
+        createCombiner = _ => null,
        mergeValue = (a, b) => a,
        mergeCombiners = (a, b) => a)
      map.insertAll(partition.map(_ -> null))
      map.iterator.map(_._1)
    }
    partitioner match {
-      case Some(p) if numPartitions == partitions.length =>
+      case Some(_) if numPartitions == partitions.length =>
        mapPartitions(removeDuplicatesInPartition, preservesPartitioning = true)
-      case _ => map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
+      case _ => map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
    }
  }

@@ -684,7 +684,7 @@ abstract class RDD[T: ClassTag](
   * Return an RDD created by coalescing all elements within each partition into an array.
   */
  def glom(): RDD[Array[T]] = withScope {
-    new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
+    new MapPartitionsRDD[Array[T], T](this, (_, _, iter) => Iterator(iter.toArray))
  }

  /**
@@ -814,7 +814,7 @@ abstract class RDD[T: ClassTag](
    val cleanedF = sc.clean(f)
    new MapPartitionsRDD(
      this,
-      (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
+      (_: TaskContext, _: Int, iter: Iterator[T]) => cleanedF(iter),
      preservesPartitioning)
  }

@@ -836,7 +836,7 @@ abstract class RDD[T: ClassTag](
      isOrderSensitive: Boolean = false): RDD[U] = withScope {
    new MapPartitionsRDD(
      this,
-      (context: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter),
+      (_: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter),
      preservesPartitioning = preservesPartitioning,
      isOrderSensitive = isOrderSensitive)
  }
@@ -849,7 +849,7 @@ abstract class RDD[T: ClassTag](
      preservesPartitioning: Boolean = false): RDD[U] = withScope {
    new MapPartitionsRDD(
      this,
-      (context: TaskContext, index: Int, iter: Iterator[T]) => f(iter),
+      (_: TaskContext, _: Int, iter: Iterator[T]) => f(iter),
      preservesPartitioning)
  }

@@ -866,7 +866,7 @@ abstract class RDD[T: ClassTag](
    val cleanedF = sc.clean(f)
    new MapPartitionsRDD(
      this,
-      (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
+      (_: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
      preservesPartitioning)
  }

@@ -1040,7 +1040,7 @@ abstract class RDD[T: ClassTag](
      }
    }
    var jobResult: Option[T] = None
-    val mergeResult = (index: Int, taskResult: Option[T]) => {
+    val mergeResult = (_: Int, taskResult: Option[T]) => {
      if (taskResult.isDefined) {
        jobResult = jobResult match {
          case Some(value) => Some(f(value, taskResult.get))
@@ -1110,7 +1110,7 @@ abstract class RDD[T: ClassTag](
    var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
    val cleanOp = sc.clean(op)
    val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
-    val mergeResult = (index: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
+    val mergeResult = (_: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
    sc.runJob(this, foldPartition, mergeResult)
    jobResult
  }
@@ -1136,7 +1136,7 @@ abstract class RDD[T: ClassTag](
    val cleanSeqOp = sc.clean(seqOp)
    val cleanCombOp = sc.clean(combOp)
    val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
-    val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
+    val mergeResult = (_: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
    sc.runJob(this, aggregatePartition, mergeResult)
    jobResult
  }
@@ -1201,7 +1201,7 @@ abstract class RDD[T: ClassTag](
      timeout: Long,
      confidence: Double = 0.95): PartialResult[BoundedDouble] = withScope {
    require(0.0 <= confidence && confidence <= 1.0, s"confidence ($confidence) must be in [0,1]")
-    val countElements: (TaskContext, Iterator[T]) => Long = { (ctx, iter) =>
+    val countElements: (TaskContext, Iterator[T]) => Long = { (_, iter) =>
      var result = 0L
      while (iter.hasNext) {
        result += 1L
@@ -1244,7 +1244,7 @@ abstract class RDD[T: ClassTag](
    if (elementClassTag.runtimeClass.isArray) {
      throw new SparkException("countByValueApprox() does not support arrays")
    }
-    val countPartition: (TaskContext, Iterator[T]) => OpenHashMap[T, Long] = { (ctx, iter) =>
+    val countPartition: (TaskContext, Iterator[T]) => OpenHashMap[T, Long] = { (_, iter) =>
      val map = new OpenHashMap[T, Long]
      iter.foreach { t =>
        map.changeValue(t, 1L, _ + 1L)
diff --git a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
index 62824a5bec..e7eef8ec51 100644
--- a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
@@ -71,7 +71,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
        taskAccum.value.get.asInstanceOf[Long]
      }
      // Each task should keep track of the partial value on the way, i.e. 1, 2, ... numPartitions
-      assert(taskAccumValues.sorted === (1L to numPartitions).toSeq)
+      assert(taskAccumValues.sorted === (1L to numPartitions))
    }
    rdd.count()
    listener.awaitNextJobCompletion()
diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md
index b0009e0170..9a6e302656 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -404,7 +404,7 @@ The following configurations are optional:
  spark-kafka-source
  streaming and batch
  Prefix of consumer group identifiers (`group.id`) that are generated by structured streaming
-  queries. If "kafka.group.id" is set, this option will be ignored. 
+  queries. If "kafka.group.id" is set, this option will be ignored.
  kafka.group.id
@@ -421,7 +421,7 @@ The following configurations are optional:
  same group id are likely interfere with each other causing each query to read only part of the
  data. This may also occur when queries are started/restarted in quick succession. To minimize such
  issues, set the Kafka consumer session timeout (by setting option "kafka.session.timeout.ms") to
-  be very small. When this is set, option "groupIdPrefix" will be ignored. 
+  be very small. When this is set, option "groupIdPrefix" will be ignored.
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeMapData.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeMapData.java
index 60ac69b059..58973e7a3a 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeMapData.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeMapData.java
@@ -143,7 +143,7 @@ public final class UnsafeMapData extends MapData implements Externalizable, Kryo
  }

  @Override
-  public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+  public void readExternal(ObjectInput in) throws IOException {
    this.baseOffset = BYTE_ARRAY_OFFSET;
    this.sizeInBytes = in.readInt();
    this.baseObject = new byte[sizeInBytes];
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
index 932c364737..7164b6b82a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
@@ -177,7 +177,7 @@ package object expressions {
            a => (resolver(dbPart, a.qualifier.head) && resolver(tblPart, a.qualifier.last))
          }
          (attributes, nestedFields)
-        case all =>
+        case _ =>
          (Seq.empty, Seq.empty)
@@ -197,7 +197,7 @@ package object expressions {
            resolver(qualifier, a.qualifier.last)
          }
          (attributes, nestedFields)
-        case all =>
+        case _ =>
          (Seq.empty[Attribute], Seq.empty[String])
      }
    }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
index b6ca52f1d9..a171885471 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
@@ -330,9 +330,9 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
        case null =>
          assert(result.asInstanceOf[ArrayData].array.toSeq == expected)
        case l if classOf[java.util.List[_]].isAssignableFrom(l) =>
-          assert(result.asInstanceOf[java.util.List[_]].asScala.toSeq == expected)
+          assert(result.asInstanceOf[java.util.List[_]].asScala == expected)
        case s if classOf[Seq[_]].isAssignableFrom(s) =>
-          assert(result.asInstanceOf[Seq[_]].toSeq == expected)
+          assert(result.asInstanceOf[Seq[_]] == expected)
        case s if classOf[scala.collection.Set[_]].isAssignableFrom(s) =>
          assert(result.asInstanceOf[scala.collection.Set[_]] == expected.toSet)
      }
@@ -532,8 +532,6 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
  private def scalaMapSerializerFor[T: TypeTag, U: TypeTag](inputObject: Expression): Expression = {
    import org.apache.spark.sql.catalyst.ScalaReflection._

-    val curId = new java.util.concurrent.atomic.AtomicInteger()
-
    def kvSerializerFor[V: TypeTag](inputObject: Expression): Expression =
      localTypeOf[V].dealias match {
        case t if t <:< localTypeOf[java.lang.Integer] =>
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeArraySuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeArraySuite.scala
index 61ce63faf0..0b9e023b0b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeArraySuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeArraySuite.scala
@@ -40,7 +40,6 @@ class UnsafeArraySuite extends SparkFunSuite {
  val dateArray = Array(
    DateTimeUtils.stringToDate(UTF8String.fromString("1970-1-1")).get,
    DateTimeUtils.stringToDate(UTF8String.fromString("2016-7-26")).get)
-  private def defaultTz = DateTimeUtils.defaultTimeZone()
  private def defaultZoneId = ZoneId.systemDefault()
  val timestampArray = Array(
    DateTimeUtils.stringToTimestamp(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
index daac207caf..2b39bda97b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
@@ -51,13 +51,13 @@ case class SimpleFilteredScan(from: Int, to: Int)(@transient val sparkSession: S
  override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
    def unhandled(filter: Filter): Boolean = {
      filter match {
-        case EqualTo(col, v) => col == "b"
-        case EqualNullSafe(col, v) => col == "b"
-        case LessThan(col, v: Int) => col == "b"
-        case LessThanOrEqual(col, v: Int) => col == "b"
-        case GreaterThan(col, v: Int) => col == "b"
-        case GreaterThanOrEqual(col, v: Int) => col == "b"
-        case In(col, values) => col == "b"
+        case EqualTo(col, _) => col == "b"
+        case EqualNullSafe(col, _) => col == "b"
+        case LessThan(col, _: Int) => col == "b"
+        case LessThanOrEqual(col, _: Int) => col == "b"
+        case GreaterThan(col, _: Int) => col == "b"
+        case GreaterThanOrEqual(col, _: Int) => col == "b"
+        case In(col, _) => col == "b"
        case IsNull(col) => col == "b"
        case IsNotNull(col) => col == "b"
        case Not(pred) => unhandled(pred)
@@ -107,7 +107,7 @@ case class SimpleFilteredScan(from: Int, to: Int)(@transient val sparkSession: S
      case StringEndsWith("c", v) => _.endsWith(v)
      case StringContains("c", v) => _.contains(v)
      case EqualTo("c", v: String) => _.equals(v)
-      case EqualTo("c", v: UTF8String) => sys.error("UTF8String should not appear in filters")
+      case EqualTo("c", _: UTF8String) => sys.error("UTF8String should not appear in filters")
      case In("c", values) => (s: String) => values.map(_.asInstanceOf[String]).toSet.contains(s)
      case _ => (c: String) => true
    }
@@ -152,39 +152,39 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic
  sqlTest(
    "SELECT * FROM oneToTenFiltered",
    (1 to 10).map(i => Row(i, i * 2, (i - 1 + 'a').toChar.toString * 5
-      + (i - 1 + 'a').toChar.toString.toUpperCase(Locale.ROOT) * 5)).toSeq)
+      + (i - 1 + 'a').toChar.toString.toUpperCase(Locale.ROOT) * 5)))

  sqlTest(
    "SELECT a, b FROM oneToTenFiltered",
-    (1 to 10).map(i => Row(i, i * 2)).toSeq)
+    (1 to 10).map(i => Row(i, i * 2)))

  sqlTest(
    "SELECT b, a FROM oneToTenFiltered",
-    (1 to 10).map(i => Row(i * 2, i)).toSeq)
+    (1 to 10).map(i => Row(i * 2, i)))

  sqlTest(
    "SELECT a FROM oneToTenFiltered",
-    (1 to 10).map(i => Row(i)).toSeq)
+    (1 to 10).map(i => Row(i)))

  sqlTest(
    "SELECT b FROM oneToTenFiltered",
-    (1 to 10).map(i => Row(i * 2)).toSeq)
+    (1 to 10).map(i => Row(i * 2)))

  sqlTest(
    "SELECT a * 2 FROM oneToTenFiltered",
-    (1 to 10).map(i => Row(i * 2)).toSeq)
+    (1 to 10).map(i => Row(i * 2)))

  sqlTest(
    "SELECT A AS b FROM oneToTenFiltered",
-    (1 to 10).map(i => Row(i)).toSeq)
+    (1 to 10).map(i => Row(i)))

  sqlTest(
    "SELECT x.b, y.a FROM oneToTenFiltered x JOIN oneToTenFiltered y ON x.a = y.b",
-    (1 to 5).map(i => Row(i * 4, i)).toSeq)
+    (1 to 5).map(i => Row(i * 4, i)))

  sqlTest(
    "SELECT x.a, y.b FROM oneToTenFiltered x JOIN oneToTenFiltered y ON x.a = y.b",
-    (2 to 10 by 2).map(i => Row(i, i)).toSeq)
+    (2 to 10 by 2).map(i => Row(i, i)))

  sqlTest(
    "SELECT a, b FROM oneToTenFiltered WHERE a = 1",
@@ -208,11 +208,11 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic

  sqlTest(
    "SELECT a, b FROM oneToTenFiltered WHERE a IS NOT NULL",
-    (1 to 10).map(i => Row(i, i * 2)).toSeq)
+    (1 to 10).map(i => Row(i, i * 2)))

  sqlTest(
    "SELECT a, b FROM oneToTenFiltered WHERE a < 5 AND a > 1",
-    (2 to 4).map(i => Row(i, i * 2)).toSeq)
+    (2 to 4).map(i => Row(i, i * 2)))

  sqlTest(
    "SELECT a, b FROM oneToTenFiltered WHERE a < 3 OR a > 8",
@@ -220,7 +220,7 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic

  sqlTest(
    "SELECT a, b FROM oneToTenFiltered WHERE NOT (a < 6)",
-    (6 to 10).map(i => Row(i, i * 2)).toSeq)
+    (6 to 10).map(i => Row(i, i * 2)))

  sqlTest(
    "SELECT a, b, c FROM oneToTenFiltered WHERE c like 'c%'",
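
Note (not part of the patch): the Kafka doc lines touched above describe how `groupIdPrefix` and `kafka.group.id` interact. A minimal standalone sketch of reading a stream with those options, assuming a hypothetical broker at localhost:9092, a topic named "events", and an existing `spark` session:

  // Hypothetical broker address and topic name, for illustration only.
  val df = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "events")
    // By default Spark generates group.id values prefixed with "spark-kafka-source";
    // groupIdPrefix only changes that prefix.
    .option("groupIdPrefix", "my-app")
    // Alternatively, pin a fixed group id; per the doc text above, when this is set,
    // groupIdPrefix is ignored.
    // .option("kafka.group.id", "my-fixed-group")
    .load()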