[SPARK-22222][CORE] Fix the ARRAY_MAX in BufferHolder and add a test

## What changes were proposed in this pull request?

We should not break the assumption that the length of an allocated byte array is word-rounded:
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java#L170
So the upper bound on the length of an allocated byte array should be `Integer.MAX_VALUE - 15` rather than `Integer.MAX_VALUE - 8`: it is the largest length that is both a multiple of 8 and still within the conservative JVM allocation limit.
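
For illustration (and not part of the patch), the arithmetic can be checked directly: `Integer.MAX_VALUE - 15` is the largest 8-byte-aligned length that fits under the conservative allocation limit of `Integer.MAX_VALUE - 8`, so word-rounding a request that respects the new cap can never exceed what the JVM can allocate. A minimal Scala sketch, where `roundToWord` merely mirrors the word-rounding behavior of `ByteArrayMethods.roundNumberOfBytesToNearestWord` for the purpose of the example:

```scala
// Sanity-check sketch (not part of the patch) for the choice of Integer.MAX_VALUE - 15.
object ArrayMaxCheck {
  // Mirrors the word-rounding logic used by ByteArrayMethods, for illustration only.
  def roundToWord(numBytes: Int): Int = {
    val remainder = numBytes & 0x07
    if (remainder == 0) numBytes else numBytes + (8 - remainder)
  }

  def main(args: Array[String]): Unit = {
    val jvmLimit = Integer.MAX_VALUE - 8    // conservative per-JVM array allocation limit
    val roundedMax = Integer.MAX_VALUE - 15 // proposed word-rounded upper bound

    assert(roundedMax % 8 == 0)                   // the cap itself is word aligned
    assert(roundedMax + 8 > jvmLimit)             // the next multiple of 8 already exceeds the limit
    assert(roundToWord(roundedMax) == roundedMax) // rounding a capped request never grows it
  }
}
```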

cc: srowen gatorsmile
## How was this patch tested?

Since the Spark unit-test JVM has less than 1GB of heap, the test code is run as a spark-submit job so that it executes in a JVM with 4GB of memory (see the new `BufferHolderSparkSubmitSuite` below).


Author: Feng Liu <fengliu@databricks.com>

Closes #19460 from liufengdb/fix_array_max.
Feng Liu 2017-10-09 21:34:37 -07:00 committed by gatorsmile
parent 71c2b81aa0
commit bebd2e1ce1
7 changed files with 124 additions and 35 deletions

org/apache/spark/unsafe/array/ByteArrayMethods.java

@@ -40,6 +40,13 @@ public class ByteArrayMethods {
}
}
// Some JVMs can't allocate arrays of length Integer.MAX_VALUE; actual max is somewhat smaller.
// Be conservative and lower the cap a little.
// Refer to "http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/tip/src/share/classes/java/util/ArrayList.java#l229"
// This value is word rounded. Use this value if the allocated byte arrays are used to store other
// types rather than bytes.
public static int MAX_ROUNDED_ARRAY_LENGTH = Integer.MAX_VALUE - 15;
private static final boolean unaligned = Platform.unaligned();
/**
* Optimized byte array equality check for byte arrays.

org/apache/spark/unsafe/map/HashMapGrowthStrategy.java

@@ -17,6 +17,8 @@
package org.apache.spark.unsafe.map;
import org.apache.spark.unsafe.array.ByteArrayMethods;
/**
* Interface that defines how we can grow the size of a hash map when it is over a threshold.
*/
@@ -31,9 +33,7 @@ public interface HashMapGrowthStrategy {
class Doubling implements HashMapGrowthStrategy {
// Some JVMs can't allocate arrays of length Integer.MAX_VALUE; actual max is somewhat
// smaller. Be conservative and lower the cap a little.
private static final int ARRAY_MAX = Integer.MAX_VALUE - 8;
private static final int ARRAY_MAX = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH;
@Override
public int nextCapacity(int currentCapacity) {
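
For context, the doubling strategy grows the capacity geometrically while clamping it to the shared cap; a simplified sketch (not the exact body of `Doubling.nextCapacity`) might look like this:

```scala
// Simplified sketch of a doubling growth strategy clamped to the rounded array max.
// Illustrative only; the real logic lives in HashMapGrowthStrategy.Doubling.
object DoublingSketch {
  val ArrayMax: Int = Integer.MAX_VALUE - 15 // ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH

  def nextCapacity(currentCapacity: Int): Int = {
    require(currentCapacity > 0)
    // Double the capacity, but never exceed the largest array the JVM can reliably allocate.
    math.min(currentCapacity.toLong * 2, ArrayMax.toLong).toInt
  }
}
```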

org/apache/spark/util/collection/PartitionedPairBuffer.scala

@@ -19,6 +19,8 @@ package org.apache.spark.util.collection
import java.util.Comparator
import org.apache.spark.unsafe.Platform
import org.apache.spark.unsafe.array.ByteArrayMethods
import org.apache.spark.util.collection.WritablePartitionedPairCollection._
/**
@@ -96,7 +98,5 @@ private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)
}
private object PartitionedPairBuffer {
// Some JVMs can't allocate arrays of length Integer.MAX_VALUE; actual max is somewhat
// smaller. Be conservative and lower the cap a little.
val MAXIMUM_CAPACITY: Int = (Int.MaxValue - 8) / 2
val MAXIMUM_CAPACITY: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 2
}
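
The divide-by-two exists because each inserted record occupies two adjacent slots in the backing object array (one for the `(partition, key)` tuple and one for the value), so the record capacity is half of the maximum array length. A minimal sketch of that assumed layout (growth and the rest of the real class are omitted):

```scala
// Minimal sketch of the two-slots-per-record layout that motivates MAXIMUM_CAPACITY = max / 2.
// Illustrative only; the real PartitionedPairBuffer also grows the array and supports sorted iteration.
class PairBufferSketch[K, V](initialCapacity: Int = 64) {
  private var curSize = 0
  private val data = new Array[AnyRef](2 * initialCapacity) // two slots per record

  def insert(partition: Int, key: K, value: V): Unit = {
    require(curSize < initialCapacity, "sketch does not grow the backing array")
    data(2 * curSize) = (partition, key)               // slot 1: (partition, key)
    data(2 * curSize + 1) = value.asInstanceOf[AnyRef] // slot 2: value
    curSize += 1
  }
}
```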

org/apache/spark/deploy/SparkSubmitSuite.scala

@@ -100,6 +100,8 @@ class SparkSubmitSuite
with TimeLimits
with TestPrematureExit {
import SparkSubmitSuite._
override def beforeEach() {
super.beforeEach()
System.setProperty("spark.testing", "true")
@@ -974,30 +976,6 @@ class SparkSubmitSuite
}
}
// NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
private def runSparkSubmit(args: Seq[String]): Unit = {
val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
val sparkSubmitFile = if (Utils.isWindows) {
new File("..\\bin\\spark-submit.cmd")
} else {
new File("../bin/spark-submit")
}
val process = Utils.executeCommand(
Seq(sparkSubmitFile.getCanonicalPath) ++ args,
new File(sparkHome),
Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
try {
val exitCode = failAfter(60 seconds) { process.waitFor() }
if (exitCode != 0) {
fail(s"Process returned with exit code $exitCode. See the log4j logs for more detail.")
}
} finally {
// Ensure we still kill the process in case it timed out
process.destroy()
}
}
private def forConfDir(defaults: Map[String, String]) (f: String => Unit) = {
val tmpDir = Utils.createTempDir()
@@ -1020,6 +998,32 @@ class SparkSubmitSuite
}
}
object SparkSubmitSuite extends SparkFunSuite with TimeLimits {
// NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
def runSparkSubmit(args: Seq[String], root: String = ".."): Unit = {
val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
val sparkSubmitFile = if (Utils.isWindows) {
new File(s"$root\\bin\\spark-submit.cmd")
} else {
new File(s"$root/bin/spark-submit")
}
val process = Utils.executeCommand(
Seq(sparkSubmitFile.getCanonicalPath) ++ args,
new File(sparkHome),
Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
try {
val exitCode = failAfter(60 seconds) { process.waitFor() }
if (exitCode != 0) {
fail(s"Process returned with exit code $exitCode. See the log4j logs for more detail.")
}
} finally {
// Ensure we still kill the process in case it timed out
process.destroy()
}
}
}
object JarCreationTest extends Logging {
def main(args: Array[String]) {
Utils.configTestLog4j("INFO")

org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java

@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions.codegen;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.ByteArrayMethods;
/**
* A helper class to manage the data buffer for an unsafe row. The data buffer can grow and
@@ -36,9 +37,7 @@ import org.apache.spark.unsafe.Platform;
*/
public class BufferHolder {
// Some JVMs can't allocate arrays of length Integer.MAX_VALUE; actual max is somewhat
// smaller. Be conservative and lower the cap a little.
private static final int ARRAY_MAX = Integer.MAX_VALUE - 8;
private static final int ARRAY_MAX = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH;
public byte[] buffer;
public int cursor = Platform.BYTE_ARRAY_OFFSET;
@@ -51,7 +50,7 @@ public class BufferHolder {
public BufferHolder(UnsafeRow row, int initialSize) {
int bitsetWidthInBytes = UnsafeRow.calculateBitSetWidthInBytes(row.numFields());
if (row.numFields() > (Integer.MAX_VALUE - initialSize - bitsetWidthInBytes) / 8) {
if (row.numFields() > (ARRAY_MAX - initialSize - bitsetWidthInBytes) / 8) {
throw new UnsupportedOperationException(
"Cannot create BufferHolder for input UnsafeRow because there are " +
"too many fields (number of fields: " + row.numFields() + ")");

org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala (new file)

@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.expressions.codegen
import org.scalatest.{BeforeAndAfterEach, Matchers}
import org.scalatest.concurrent.Timeouts
import org.apache.spark.{SparkFunSuite, TestUtils}
import org.apache.spark.deploy.SparkSubmitSuite
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.unsafe.array.ByteArrayMethods
import org.apache.spark.util.ResetSystemProperties
// A test for growing the buffer holder to nearly 2GB. Due to the heap size limitation of the Spark
// unit tests JVM, the actual test code is run as a submit job.
class BufferHolderSparkSubmitSuite
extends SparkFunSuite
with Matchers
with BeforeAndAfterEach
with ResetSystemProperties
with Timeouts {
test("SPARK-22222: Buffer holder should be able to allocate memory larger than 1GB") {
val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
val argsForSparkSubmit = Seq(
"--class", BufferHolderSparkSubmitSuite.getClass.getName.stripSuffix("$"),
"--name", "SPARK-22222",
"--master", "local-cluster[2,1,1024]",
"--driver-memory", "4g",
"--conf", "spark.ui.enabled=false",
"--conf", "spark.master.rest.enabled=false",
"--conf", "spark.driver.extraJavaOptions=-ea",
unusedJar.toString)
SparkSubmitSuite.runSparkSubmit(argsForSparkSubmit, "../..")
}
}
object BufferHolderSparkSubmitSuite {
def main(args: Array[String]): Unit = {
val ARRAY_MAX = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
val holder = new BufferHolder(new UnsafeRow(1000))
holder.reset()
holder.grow(roundToWord(ARRAY_MAX / 2))
holder.reset()
holder.grow(roundToWord(ARRAY_MAX / 2 + 8))
holder.reset()
holder.grow(roundToWord(Integer.MAX_VALUE / 2))
holder.reset()
holder.grow(roundToWord(Integer.MAX_VALUE))
}
private def roundToWord(len: Int): Int = {
ByteArrayMethods.roundNumberOfBytesToNearestWord(len)
}
}

org/apache/spark/sql/execution/vectorized/WritableColumnVector.java

@@ -23,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.*;
import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.types.UTF8String;
/**
@@ -595,7 +596,7 @@ public abstract class WritableColumnVector extends ColumnVector {
* Upper limit for the maximum capacity for this column.
*/
@VisibleForTesting
protected int MAX_CAPACITY = Integer.MAX_VALUE - 8;
protected int MAX_CAPACITY = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH;
/**
* Number of nulls in this column. This is an optimization for the reader, to skip NULL checks.