[SPARK-5333][Mesos] MesosTaskLaunchData occurs BufferUnderflowException
- Rewind ByteBuffer before making ByteString (This fixes a bug introduced in #3849 / SPARK-4014) Author: Jongyoul Lee <jongyoul@gmail.com> Closes #4119 from jongyoul/SPARK-5333 and squashes the following commits: c6693a8 [Jongyoul Lee] [SPARK-5333][Mesos] MesosTaskLaunchData occurs BufferUnderflowException - changed logDebug location 4141f58 [Jongyoul Lee] [SPARK-5333][Mesos] MesosTaskLaunchData occurs BufferUnderflowException - Added license information 2190606 [Jongyoul Lee] [SPARK-5333][Mesos] MesosTaskLaunchData occurs BufferUnderflowException - Adjusted imported libraries b7f5517 [Jongyoul Lee] [SPARK-5333][Mesos] MesosTaskLaunchData occurs BufferUnderflowException - Rewind ByteBuffer before making ByteString
This commit is contained in:
parent
4afad9c770
commit
9d9294aebf
|
@ -21,24 +21,29 @@ import java.nio.ByteBuffer
|
|||
|
||||
import org.apache.mesos.protobuf.ByteString
|
||||
|
||||
import org.apache.spark.Logging
|
||||
|
||||
/**
|
||||
* Wrapper for serializing the data sent when launching Mesos tasks.
|
||||
*/
|
||||
private[spark] case class MesosTaskLaunchData(
|
||||
serializedTask: ByteBuffer,
|
||||
attemptNumber: Int) {
|
||||
attemptNumber: Int) extends Logging {
|
||||
|
||||
def toByteString: ByteString = {
|
||||
val dataBuffer = ByteBuffer.allocate(4 + serializedTask.limit)
|
||||
dataBuffer.putInt(attemptNumber)
|
||||
dataBuffer.put(serializedTask)
|
||||
dataBuffer.rewind
|
||||
logDebug(s"ByteBuffer size: [${dataBuffer.remaining}]")
|
||||
ByteString.copyFrom(dataBuffer)
|
||||
}
|
||||
}
|
||||
|
||||
private[spark] object MesosTaskLaunchData {
|
||||
private[spark] object MesosTaskLaunchData extends Logging {
|
||||
def fromByteString(byteString: ByteString): MesosTaskLaunchData = {
|
||||
val byteBuffer = byteString.asReadOnlyByteBuffer()
|
||||
logDebug(s"ByteBuffer size: [${byteBuffer.remaining}]")
|
||||
val attemptNumber = byteBuffer.getInt // updates the position by 4 bytes
|
||||
val serializedTask = byteBuffer.slice() // subsequence starting at the current position
|
||||
MesosTaskLaunchData(serializedTask, attemptNumber)
|
||||
|
|
|
@ -0,0 +1,38 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark.scheduler.mesos
|
||||
|
||||
import java.nio.ByteBuffer
|
||||
|
||||
import org.scalatest.FunSuite
|
||||
|
||||
import org.apache.spark.scheduler.cluster.mesos.MesosTaskLaunchData
|
||||
|
||||
class MesosTaskLaunchDataSuite extends FunSuite {
|
||||
test("serialize and deserialize data must be same") {
|
||||
val serializedTask = ByteBuffer.allocate(40)
|
||||
(Range(100, 110).map(serializedTask.putInt(_)))
|
||||
serializedTask.rewind
|
||||
val attemptNumber = 100
|
||||
val byteString = MesosTaskLaunchData(serializedTask, attemptNumber).toByteString
|
||||
serializedTask.rewind
|
||||
val mesosTaskLaunchData = MesosTaskLaunchData.fromByteString(byteString)
|
||||
assert(mesosTaskLaunchData.attemptNumber == attemptNumber)
|
||||
assert(mesosTaskLaunchData.serializedTask.equals(serializedTask))
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue