From 9d9294aebf7208a76f43d8fc5a0489a83d7215f4 Mon Sep 17 00:00:00 2001 From: Jongyoul Lee Date: Tue, 20 Jan 2015 10:17:29 -0800 Subject: [PATCH] [SPARK-5333][Mesos] MesosTaskLaunchData occurs BufferUnderflowException - Rewind ByteBuffer before making ByteString (This fixes a bug introduced in #3849 / SPARK-4014) Author: Jongyoul Lee Closes #4119 from jongyoul/SPARK-5333 and squashes the following commits: c6693a8 [Jongyoul Lee] [SPARK-5333][Mesos] MesosTaskLaunchData occurs BufferUnderflowException - changed logDebug location 4141f58 [Jongyoul Lee] [SPARK-5333][Mesos] MesosTaskLaunchData occurs BufferUnderflowException - Added license information 2190606 [Jongyoul Lee] [SPARK-5333][Mesos] MesosTaskLaunchData occurs BufferUnderflowException - Adjusted imported libraries b7f5517 [Jongyoul Lee] [SPARK-5333][Mesos] MesosTaskLaunchData occurs BufferUnderflowException - Rewind ByteBuffer before making ByteString --- .../cluster/mesos/MesosTaskLaunchData.scala | 9 ++++- .../mesos/MesosTaskLaunchDataSuite.scala | 38 +++++++++++++++++++ 2 files changed, 45 insertions(+), 2 deletions(-) create mode 100644 core/src/test/scala/org/apache/spark/scheduler/mesos/MesosTaskLaunchDataSuite.scala diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala index 4416ce92ad..5e7e6567a3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala @@ -21,24 +21,29 @@ import java.nio.ByteBuffer import org.apache.mesos.protobuf.ByteString +import org.apache.spark.Logging + /** * Wrapper for serializing the data sent when launching Mesos tasks. */ private[spark] case class MesosTaskLaunchData( serializedTask: ByteBuffer, - attemptNumber: Int) { + attemptNumber: Int) extends Logging { def toByteString: ByteString = { val dataBuffer = ByteBuffer.allocate(4 + serializedTask.limit) dataBuffer.putInt(attemptNumber) dataBuffer.put(serializedTask) + dataBuffer.rewind + logDebug(s"ByteBuffer size: [${dataBuffer.remaining}]") ByteString.copyFrom(dataBuffer) } } -private[spark] object MesosTaskLaunchData { +private[spark] object MesosTaskLaunchData extends Logging { def fromByteString(byteString: ByteString): MesosTaskLaunchData = { val byteBuffer = byteString.asReadOnlyByteBuffer() + logDebug(s"ByteBuffer size: [${byteBuffer.remaining}]") val attemptNumber = byteBuffer.getInt // updates the position by 4 bytes val serializedTask = byteBuffer.slice() // subsequence starting at the current position MesosTaskLaunchData(serializedTask, attemptNumber) diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosTaskLaunchDataSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosTaskLaunchDataSuite.scala new file mode 100644 index 0000000000..86a42a7398 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosTaskLaunchDataSuite.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.mesos + +import java.nio.ByteBuffer + +import org.scalatest.FunSuite + +import org.apache.spark.scheduler.cluster.mesos.MesosTaskLaunchData + +class MesosTaskLaunchDataSuite extends FunSuite { + test("serialize and deserialize data must be same") { + val serializedTask = ByteBuffer.allocate(40) + (Range(100, 110).map(serializedTask.putInt(_))) + serializedTask.rewind + val attemptNumber = 100 + val byteString = MesosTaskLaunchData(serializedTask, attemptNumber).toByteString + serializedTask.rewind + val mesosTaskLaunchData = MesosTaskLaunchData.fromByteString(byteString) + assert(mesosTaskLaunchData.attemptNumber == attemptNumber) + assert(mesosTaskLaunchData.serializedTask.equals(serializedTask)) + } +}