[SPARK-22660][BUILD] Use position() and limit() to fix ambiguity issue in scala-2.12 and JDK9

## What changes were proposed in this pull request?
Some compile errors appear after upgrading to scala-2.12:
```
spark_source/core/src/main/scala/org/apache/spark/executor/Executor.scala:455: ambiguous reference to overloaded definition,
both method limit in class ByteBuffer of type (x$1: Int)java.nio.ByteBuffer
and  method limit in class Buffer of type ()Int
match expected type ?
     val resultSize = serializedDirectResult.limit
error
```
The zero-argument limit() is declared in the superclass Buffer, while under JDK 9 ByteBuffer adds an overloaded limit(int), so in scala-2.12 limit can no longer be called without parentheses. The same applies to the position method.
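As a rough illustration of the pattern applied throughout this patch (the buffer below is a made-up example, not Spark code), the zero-argument accessors are selected explicitly with parentheses:

```scala
import java.nio.ByteBuffer

// Made-up buffer, only to show the parenthesized accessor calls.
val block: ByteBuffer = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4))

// Ambiguous under scala-2.12 + JDK 9: block.limit - block.position
// Unambiguous: call the zero-argument overloads explicitly.
val remaining = block.limit() - block.position()
```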

```
/home/zly/prj/oss/jdk9_HOS_SOURCE/spark_source/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala:427: ambiguous reference to overloaded definition,
[error] both method putAll in class Properties of type (x$1: java.util.Map[_, _])Unit
[error] and  method putAll in class Hashtable of type (x$1: java.util.Map[_ <: Object, _ <: Object])Unit
[error] match argument types (java.util.Map[String,String])
[error]       props.putAll(outputSerdeProps.toMap.asJava)
[error]             ^
```
This is because the key type is Object instead of String, which is unsafe; the workaround (see https://github.com/scala/bug/issues/10418) is to copy the entries one at a time with put.
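A minimal sketch of that replacement (the map here is illustrative; in the patch it is e.g. propsMap or outputSerdeProps.toMap):

```scala
import java.util.Properties

// Illustrative String-to-String map standing in for propsMap / outputSerdeProps.toMap.
val serdeProps: Map[String, String] = Map("field.delim" -> "\t")

val props = new Properties()
// props.putAll(serdeProps.asJava) is ambiguous in scala-2.12 (scala/bug#10418),
// so the entries are copied one at a time instead.
serdeProps.foreach { case (k, v) => props.put(k, v) }
```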

## How was this patch tested?

running tests

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: kellyzly <kellyzly@126.com>

Closes #19854 from kellyzly/SPARK-22660.
Authored by kellyzly on 2017-12-07 10:04:04 -06:00, committed by Sean Owen
parent b79071910e
commit f41c0a93fd
12 changed files with 22 additions and 15 deletions

@@ -99,7 +99,8 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
   private def calcChecksum(block: ByteBuffer): Int = {
     val adler = new Adler32()
     if (block.hasArray) {
-      adler.update(block.array, block.arrayOffset + block.position, block.limit - block.position)
+      adler.update(block.array, block.arrayOffset + block.position(), block.limit()
+        - block.position())
     } else {
       val bytes = new Array[Byte](block.remaining())
       block.duplicate.get(bytes)

@@ -452,7 +452,7 @@ private[spark] class Executor(
         // TODO: do not serialize value twice
         val directResult = new DirectTaskResult(valueBytes, accumUpdates)
         val serializedDirectResult = ser.serialize(directResult)
-        val resultSize = serializedDirectResult.limit
+        val resultSize = serializedDirectResult.limit()
         // directSend = sending directly back to the driver
         val serializedResult: ByteBuffer = {

@@ -287,13 +287,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
       for (task <- tasks.flatten) {
         val serializedTask = TaskDescription.encode(task)
-        if (serializedTask.limit >= maxRpcMessageSize) {
+        if (serializedTask.limit() >= maxRpcMessageSize) {
           scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
             try {
               var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
                 "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
                 "spark.rpc.message.maxSize or using broadcast variables for large values."
-              msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)
+              msg = msg.format(task.taskId, task.index, serializedTask.limit(), maxRpcMessageSize)
               taskSetMgr.abort(msg)
             } catch {
               case e: Exception => logError("Exception in error callback", e)

@@ -54,7 +54,7 @@ class ByteBufferInputStream(private var buffer: ByteBuffer)
   override def skip(bytes: Long): Long = {
     if (buffer != null) {
       val amountToSkip = math.min(bytes, buffer.remaining).toInt
-      buffer.position(buffer.position + amountToSkip)
+      buffer.position(buffer.position() + amountToSkip)
       if (buffer.remaining() == 0) {
         cleanUp()
       }

@@ -65,7 +65,7 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
     for (bytes <- getChunks()) {
       while (bytes.remaining() > 0) {
         val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
-        bytes.limit(bytes.position + ioSize)
+        bytes.limit(bytes.position() + ioSize)
         channel.write(bytes)
       }
     }
@@ -206,7 +206,7 @@ private[spark] class ChunkedByteBufferInputStream(
   override def skip(bytes: Long): Long = {
     if (currentChunk != null) {
       val amountToSkip = math.min(bytes, currentChunk.remaining).toInt
-      currentChunk.position(currentChunk.position + amountToSkip)
+      currentChunk.position(currentChunk.position() + amountToSkip)
       if (currentChunk.remaining() == 0) {
         if (chunks.hasNext) {
           currentChunk = chunks.next()

@@ -199,7 +199,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
     def check[T: ClassTag](t: T) {
       assert(ser.deserialize[T](ser.serialize(t)) === t)
       // Check that very long ranges don't get written one element at a time
-      assert(ser.serialize(t).limit < 100)
+      assert(ser.serialize(t).limit() < 100)
     }
     check(1 to 1000000)
     check(1 to 1000000 by 2)

@@ -118,7 +118,7 @@ class DiskStoreSuite extends SparkFunSuite {
     val chunks = chunkedByteBuffer.chunks
     assert(chunks.size === 2)
     for (chunk <- chunks) {
-      assert(chunk.limit === 10 * 1024)
+      assert(chunk.limit() === 10 * 1024)
     }
     val e = intercept[IllegalArgumentException]{

@@ -296,7 +296,9 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
     props.put("replica.socket.timeout.ms", "1500")
     props.put("delete.topic.enable", "true")
     props.put("offsets.topic.num.partitions", "1")
-    props.putAll(withBrokerProps.asJava)
+    // Can not use properties.putAll(propsMap.asJava) in scala-2.12
+    // See https://github.com/scala/bug/issues/10418
+    withBrokerProps.foreach { case (k, v) => props.put(k, v) }
     props
   }

@@ -35,7 +35,7 @@ private[columnar] trait NullableColumnAccessor extends ColumnAccessor {
     nextNullIndex = if (nullCount > 0) ByteBufferHelper.getInt(nullsBuffer) else -1
     pos = 0
-    underlyingBuffer.position(underlyingBuffer.position + 4 + nullCount * 4)
+    underlyingBuffer.position(underlyingBuffer.position() + 4 + nullCount * 4)
     super.initialize()
   }

@@ -112,7 +112,7 @@ private[columnar] case object PassThrough extends CompressionScheme {
       var nextNullIndex = if (nullCount > 0) ByteBufferHelper.getInt(nullsBuffer) else capacity
       var pos = 0
       var seenNulls = 0
-      var bufferPos = buffer.position
+      var bufferPos = buffer.position()
       while (pos < capacity) {
         if (pos != nextNullIndex) {
           val len = nextNullIndex - pos

@@ -412,7 +412,9 @@ case class HiveScriptIOSchema (
     propsMap = propsMap + (serdeConstants.LIST_COLUMN_TYPES -> columnTypesNames)
     val properties = new Properties()
-    properties.putAll(propsMap.asJava)
+    // Can not use properties.putAll(propsMap.asJava) in scala-2.12
+    // See https://github.com/scala/bug/issues/10418
+    propsMap.foreach { case (k, v) => properties.put(k, v) }
     serde.initialize(null, properties)
     serde
@@ -424,7 +426,9 @@ case class HiveScriptIOSchema (
     recordReaderClass.map { klass =>
       val instance = Utils.classForName(klass).newInstance().asInstanceOf[RecordReader]
       val props = new Properties()
-      props.putAll(outputSerdeProps.toMap.asJava)
+      // Can not use props.putAll(outputSerdeProps.toMap.asJava) in scala-2.12
+      // See https://github.com/scala/bug/issues/10418
+      outputSerdeProps.toMap.foreach { case (k, v) => props.put(k, v) }
       instance.initialize(inputStream, conf, props)
       instance
     }

@@ -98,7 +98,7 @@ class RawNetworkReceiver(host: String, port: Int, storageLevel: StorageLevel)
   /** Read a buffer fully from a given Channel */
   private def readFully(channel: ReadableByteChannel, dest: ByteBuffer) {
-    while (dest.position < dest.limit) {
+    while (dest.position() < dest.limit()) {
       if (channel.read(dest) == -1) {
         throw new EOFException("End of channel")
       }
}