diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java
index 9c551ab19e..f121b1cd74 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java
@@ -40,6 +40,13 @@ public class ByteArrayMethods {
     }
   }
 
+  // Some JVMs can't allocate arrays of length Integer.MAX_VALUE; actual max is somewhat smaller.
+  // Be conservative and lower the cap a little.
+  // Refer to "http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/tip/src/share/classes/java/util/ArrayList.java#l229"
+  // This value is word-rounded. Use this value if the allocated byte arrays are used to store
+  // values of types other than byte.
+  public static final int MAX_ROUNDED_ARRAY_LENGTH = Integer.MAX_VALUE - 15;
+
   private static final boolean unaligned = Platform.unaligned();
 
   /**
    * Optimized byte array equality check for byte arrays.
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/HashMapGrowthStrategy.java b/core/src/main/java/org/apache/spark/unsafe/map/HashMapGrowthStrategy.java
index b8c2294c7b..ee6d9f75ac 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/HashMapGrowthStrategy.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/HashMapGrowthStrategy.java
@@ -17,6 +17,8 @@
 
 package org.apache.spark.unsafe.map;
 
+import org.apache.spark.unsafe.array.ByteArrayMethods;
+
 /**
  * Interface that defines how we can grow the size of a hash map when it is over a threshold.
  */
@@ -31,9 +33,7 @@ public interface HashMapGrowthStrategy {
 
   class Doubling implements HashMapGrowthStrategy {
 
-    // Some JVMs can't allocate arrays of length Integer.MAX_VALUE; actual max is somewhat
-    // smaller. Be conservative and lower the cap a little.
-    private static final int ARRAY_MAX = Integer.MAX_VALUE - 8;
+    private static final int ARRAY_MAX = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH;
 
     @Override
     public int nextCapacity(int currentCapacity) {
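Editor's note on the new constant: `Integer.MAX_VALUE - 15` is the largest multiple of 8 that does not exceed the `Integer.MAX_VALUE - 8` cap used by the JDK `ArrayList` source linked in the comment, so word-rounding any length that is at or below the cap can never overflow it. A minimal sketch checking this arithmetic (`ArrayCapCheck` and `roundToWord` are illustrative names, not part of the patch; `roundToWord` mirrors `ByteArrayMethods.roundNumberOfBytesToNearestWord`):

```scala
object ArrayCapCheck {
  val maxRoundedArrayLength: Int = Integer.MAX_VALUE - 15

  // Mirrors ByteArrayMethods.roundNumberOfBytesToNearestWord: round up to a multiple of 8.
  def roundToWord(numBytes: Int): Int = {
    val remainder = numBytes & 0x07 // equivalent to numBytes % 8
    if (remainder == 0) numBytes else numBytes + (8 - remainder)
  }

  def main(args: Array[String]): Unit = {
    // The new cap is already word-aligned, so rounding it is a no-op ...
    assert(roundToWord(maxRoundedArrayLength) == maxRoundedArrayLength)
    // ... whereas word-rounding the old per-class cap (Integer.MAX_VALUE - 8)
    // yields Integer.MAX_VALUE - 7, beyond what some JVMs will allocate.
    assert(roundToWord(Integer.MAX_VALUE - 8) == Integer.MAX_VALUE - 7)
  }
}
```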
diff --git a/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala b/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala
index b755e5da51..e17a9de97e 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala
@@ -19,6 +19,8 @@ package org.apache.spark.util.collection
 
 import java.util.Comparator
 
+import org.apache.spark.unsafe.Platform
+import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.util.collection.WritablePartitionedPairCollection._
 
 /**
@@ -96,7 +98,5 @@ private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)
 }
 
 private object PartitionedPairBuffer {
-  // Some JVMs can't allocate arrays of length Integer.MAX_VALUE; actual max is somewhat
-  // smaller. Be conservative and lower the cap a little.
-  val MAXIMUM_CAPACITY: Int = (Int.MaxValue - 8) / 2
+  val MAXIMUM_CAPACITY: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 2
 }
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index b06f2e26a4..b52da4c0c8 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -100,6 +100,8 @@ class SparkSubmitSuite
   with TimeLimits
   with TestPrematureExit {
 
+  import SparkSubmitSuite._
+
   override def beforeEach() {
     super.beforeEach()
     System.setProperty("spark.testing", "true")
@@ -974,30 +976,6 @@ class SparkSubmitSuite
     }
   }
 
-  // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
-  private def runSparkSubmit(args: Seq[String]): Unit = {
-    val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
-    val sparkSubmitFile = if (Utils.isWindows) {
-      new File("..\\bin\\spark-submit.cmd")
-    } else {
-      new File("../bin/spark-submit")
-    }
-    val process = Utils.executeCommand(
-      Seq(sparkSubmitFile.getCanonicalPath) ++ args,
-      new File(sparkHome),
-      Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
-
-    try {
-      val exitCode = failAfter(60 seconds) { process.waitFor() }
-      if (exitCode != 0) {
-        fail(s"Process returned with exit code $exitCode. See the log4j logs for more detail.")
-      }
-    } finally {
-      // Ensure we still kill the process in case it timed out
-      process.destroy()
-    }
-  }
-
   private def forConfDir(defaults: Map[String, String])
       (f: String => Unit) = {
     val tmpDir = Utils.createTempDir()
@@ -1020,6 +998,32 @@ class SparkSubmitSuite
   }
 }
 
+object SparkSubmitSuite extends SparkFunSuite with TimeLimits {
+  // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
+  def runSparkSubmit(args: Seq[String], root: String = ".."): Unit = {
+    val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
+    val sparkSubmitFile = if (Utils.isWindows) {
+      new File(s"$root\\bin\\spark-submit.cmd")
+    } else {
+      new File(s"$root/bin/spark-submit")
+    }
+    val process = Utils.executeCommand(
+      Seq(sparkSubmitFile.getCanonicalPath) ++ args,
+      new File(sparkHome),
+      Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
+
+    try {
+      val exitCode = failAfter(60 seconds) { process.waitFor() }
+      if (exitCode != 0) {
+        fail(s"Process returned with exit code $exitCode. See the log4j logs for more detail.")
+      }
+    } finally {
+      // Ensure we still kill the process in case it timed out
+      process.destroy()
+    }
+  }
+}
+
 object JarCreationTest extends Logging {
   def main(args: Array[String]) {
     Utils.configTestLog4j("INFO")
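Editor's note on the refactor above: moving `runSparkSubmit` from the suite into a companion object lets test suites in other modules reuse it, and the new `root` parameter locates `bin/spark-submit` relative to the calling module's working directory. A hypothetical caller from a module nested two directories below the Spark root (argument values here are illustrative, not from the patch; `spark.test.home` must still be set):

```scala
// Illustrative reuse from another module's test suite.
SparkSubmitSuite.runSparkSubmit(
  Seq(
    "--class", "org.apache.spark.examples.SparkPi",
    "--master", "local[2]",
    "/path/to/app.jar"),
  root = "../..")
```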
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java
index 971d19973f..259976118c 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions.codegen;
 
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
 import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.array.ByteArrayMethods;
 
 /**
  * A helper class to manage the data buffer for an unsafe row. The data buffer can grow and
@@ -36,9 +37,7 @@ import org.apache.spark.unsafe.Platform;
  */
 public class BufferHolder {
 
-  // Some JVMs can't allocate arrays of length Integer.MAX_VALUE; actual max is somewhat
-  // smaller. Be conservative and lower the cap a little.
-  private static final int ARRAY_MAX = Integer.MAX_VALUE - 8;
+  private static final int ARRAY_MAX = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH;
 
   public byte[] buffer;
   public int cursor = Platform.BYTE_ARRAY_OFFSET;
@@ -51,7 +50,7 @@ public class BufferHolder {
 
   public BufferHolder(UnsafeRow row, int initialSize) {
     int bitsetWidthInBytes = UnsafeRow.calculateBitSetWidthInBytes(row.numFields());
-    if (row.numFields() > (Integer.MAX_VALUE - initialSize - bitsetWidthInBytes) / 8) {
+    if (row.numFields() > (ARRAY_MAX - initialSize - bitsetWidthInBytes) / 8) {
       throw new UnsupportedOperationException(
         "Cannot create BufferHolder for input UnsafeRow because there are " +
         "too many fields (number of fields: " + row.numFields() + ")");
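Editor's note on the guard above: the constructor now bounds the fixed-width region against `ARRAY_MAX` rather than `Integer.MAX_VALUE`, since the former is the real allocation limit. A rough model of the check, assuming the standard `UnsafeRow` layout of 8 bytes per field plus a null bitset rounded to 64-bit words (`fitsInBuffer` is an illustrative name, not the patch itself):

```scala
// Rough model of BufferHolder's constructor guard.
def fitsInBuffer(numFields: Int, initialSize: Int): Boolean = {
  val arrayMax = Integer.MAX_VALUE - 15 // ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
  // Mirrors UnsafeRow.calculateBitSetWidthInBytes: one bit per field, in 8-byte words.
  val bitsetWidthInBytes = ((numFields + 63) / 64) * 8
  // 8 bytes of fixed-width storage per field must fit under the cap.
  numFields <= (arrayMax - initialSize - bitsetWidthInBytes) / 8
}
```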
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala
new file mode 100644
index 0000000000..1167d2f3f3
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import org.scalatest.{BeforeAndAfterEach, Matchers}
+import org.scalatest.concurrent.Timeouts
+
+import org.apache.spark.{SparkFunSuite, TestUtils}
+import org.apache.spark.deploy.SparkSubmitSuite
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.unsafe.array.ByteArrayMethods
+import org.apache.spark.util.ResetSystemProperties
+
+// A test for growing the buffer holder to nearly 2GB. Due to the heap size limitation of the
+// Spark unit tests JVM, the actual test code runs as a submit job.
+class BufferHolderSparkSubmitSuite
+  extends SparkFunSuite
+  with Matchers
+  with BeforeAndAfterEach
+  with ResetSystemProperties
+  with Timeouts {
+
+  test("SPARK-22222: Buffer holder should be able to allocate memory larger than 1GB") {
+    val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+
+    val argsForSparkSubmit = Seq(
+      "--class", BufferHolderSparkSubmitSuite.getClass.getName.stripSuffix("$"),
+      "--name", "SPARK-22222",
+      "--master", "local-cluster[2,1,1024]",
+      "--driver-memory", "4g",
+      "--conf", "spark.ui.enabled=false",
+      "--conf", "spark.master.rest.enabled=false",
+      "--conf", "spark.driver.extraJavaOptions=-ea",
+      unusedJar.toString)
+    SparkSubmitSuite.runSparkSubmit(argsForSparkSubmit, "../..")
+  }
+}
+
+object BufferHolderSparkSubmitSuite {
+
+  def main(args: Array[String]): Unit = {
+
+    val ARRAY_MAX = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
+
+    val holder = new BufferHolder(new UnsafeRow(1000))
+
+    holder.reset()
+    holder.grow(roundToWord(ARRAY_MAX / 2))
+
+    holder.reset()
+    holder.grow(roundToWord(ARRAY_MAX / 2 + 8))
+
+    holder.reset()
+    holder.grow(roundToWord(Integer.MAX_VALUE / 2))
+
+    holder.reset()
+    holder.grow(roundToWord(Integer.MAX_VALUE))
+  }
+
+  private def roundToWord(len: Int): Int = {
+    ByteArrayMethods.roundNumberOfBytesToNearestWord(len)
+  }
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
index da72954ddc..d3a14b9d8b 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -23,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.*;
+import org.apache.spark.unsafe.array.ByteArrayMethods;
 import org.apache.spark.unsafe.types.UTF8String;
 
 /**
@@ -595,7 +596,7 @@ public abstract class WritableColumnVector extends ColumnVector {
    * Upper limit for the maximum capacity for this column.
    */
   @VisibleForTesting
-  protected int MAX_CAPACITY = Integer.MAX_VALUE - 8;
+  protected int MAX_CAPACITY = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH;
 
   /**
    * Number of nulls in this column. This is an optimization for the reader, to skip NULL checks.
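Editor's note on the test strategy: the buffer grown in `BufferHolderSparkSubmitSuite.main` approaches 2GB, which is why the test runs through `spark-submit` with a 4g driver instead of inside the unit-test JVM. A simplified model, not the Spark source, of the capped doubling that this change makes safe across `BufferHolder` and `HashMapGrowthStrategy.Doubling` (assuming the usual strategy of doubling until half the cap, then clamping to the cap):

```scala
// Simplified model of doubling-with-clamp growth under the new cap.
def nextBufferLength(neededLength: Int): Int = {
  val arrayMax = Integer.MAX_VALUE - 15 // ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
  require(neededLength <= arrayMax, s"Cannot allocate more than $arrayMax bytes")
  // Double while it is safe to do so without overflowing the cap; otherwise clamp.
  if (neededLength < arrayMax / 2) neededLength * 2 else arrayMax
}
```

Because `arrayMax` is word-aligned, the clamped value can be handed to word-rounding code unchanged, which is exactly the property the new `MAX_ROUNDED_ARRAY_LENGTH` constant exists to guarantee.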