[SPARK-24421][BUILD][CORE] Accessing sun.misc.Cleaner in JDK11

…. Other related changes to get JDK 11 working, to test

## What changes were proposed in this pull request?

- Access `sun.misc.Cleaner` (Java 8) and `jdk.internal.ref.Cleaner` (JDK 9+) by reflection (note: the latter only works if illegal reflective access is allowed)
- Access `sun.misc.Unsafe.invokeCleaner` in Java 9+ instead of `sun.misc.Cleaner` (Java 8)

In order to test anything on JDK 11, I also fixed a few small things, which I include here:

- Fix minor JDK 11 compile issues
- Update scala plugin, Jetty for JDK 11, to facilitate tests too

This doesn't mean JDK 11 tests all pass now, but lots do. Note also that the JDK 9+ solution for the Cleaner has a big caveat.

## How was this patch tested?

Existing tests. Manually tested JDK 11 build and tests, and tests covering this change appear to pass. All Java 8 tests should still pass, but this change alone does not achieve full JDK 11 compatibility.

Closes #22993 from srowen/SPARK-24421.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
This commit is contained in:
Sean Owen 2018-11-14 12:52:54 -08:00
parent 922dfe4865
commit 722369ee55
10 changed files with 123 additions and 39 deletions

View file

@ -19,10 +19,10 @@ package org.apache.spark.unsafe;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import sun.misc.Cleaner;
import sun.misc.Unsafe;
public final class Platform {
@ -67,6 +67,60 @@ public final class Platform {
unaligned = _unaligned;
}
// Access fields and constructors once and store them, for performance:
private static final Constructor<?> DBB_CONSTRUCTOR;
private static final Field DBB_CLEANER_FIELD;
static {
try {
Class<?> cls = Class.forName("java.nio.DirectByteBuffer");
Constructor<?> constructor = cls.getDeclaredConstructor(Long.TYPE, Integer.TYPE);
constructor.setAccessible(true);
Field cleanerField = cls.getDeclaredField("cleaner");
cleanerField.setAccessible(true);
DBB_CONSTRUCTOR = constructor;
DBB_CLEANER_FIELD = cleanerField;
} catch (ClassNotFoundException | NoSuchMethodException | NoSuchFieldException e) {
throw new IllegalStateException(e);
}
}
private static final Method CLEANER_CREATE_METHOD;
static {
// The implementation of Cleaner changed from JDK 8 to 9
// Split java.version on non-digit chars:
int majorVersion = Integer.parseInt(System.getProperty("java.version").split("\\D+")[0]);
String cleanerClassName;
if (majorVersion < 9) {
cleanerClassName = "sun.misc.Cleaner";
} else {
cleanerClassName = "jdk.internal.ref.Cleaner";
}
try {
Class<?> cleanerClass = Class.forName(cleanerClassName);
Method createMethod = cleanerClass.getMethod("create", Object.class, Runnable.class);
// Accessing jdk.internal.ref.Cleaner should actually fail by default in JDK 9+,
// unfortunately, unless the user has allowed access with something like
// --add-opens java.base/java.lang=ALL-UNNAMED If not, we can't really use the Cleaner
// hack below. It doesn't break, just means the user might run into the default JVM limit
// on off-heap memory and increase it or set the flag above. This tests whether it's
// available:
try {
createMethod.invoke(null, null, null);
} catch (IllegalAccessException e) {
// Don't throw an exception, but can't log here?
createMethod = null;
} catch (InvocationTargetException ite) {
// shouldn't happen; report it
throw new IllegalStateException(ite);
}
CLEANER_CREATE_METHOD = createMethod;
} catch (ClassNotFoundException | NoSuchMethodException e) {
throw new IllegalStateException(e);
}
}
/**
* @return true when running JVM is having sun's Unsafe package available in it and underlying
* system having unaligned-access capability.
@ -159,18 +213,18 @@ public final class Platform {
* MaxDirectMemorySize limit (the default limit is too low and we do not want to require users
* to increase it).
*/
@SuppressWarnings("unchecked")
public static ByteBuffer allocateDirectBuffer(int size) {
try {
Class<?> cls = Class.forName("java.nio.DirectByteBuffer");
Constructor<?> constructor = cls.getDeclaredConstructor(Long.TYPE, Integer.TYPE);
constructor.setAccessible(true);
Field cleanerField = cls.getDeclaredField("cleaner");
cleanerField.setAccessible(true);
long memory = allocateMemory(size);
ByteBuffer buffer = (ByteBuffer) constructor.newInstance(memory, size);
Cleaner cleaner = Cleaner.create(buffer, () -> freeMemory(memory));
cleanerField.set(buffer, cleaner);
ByteBuffer buffer = (ByteBuffer) DBB_CONSTRUCTOR.newInstance(memory, size);
if (CLEANER_CREATE_METHOD != null) {
try {
DBB_CLEANER_FIELD.set(buffer,
CLEANER_CREATE_METHOD.invoke(null, buffer, (Runnable) () -> freeMemory(memory)));
} catch (IllegalAccessException | InvocationTargetException e) {
throw new IllegalStateException(e);
}
}
return buffer;
} catch (Exception e) {
throwException(e);

View file

@ -22,9 +22,12 @@ import java.nio.{ByteBuffer, MappedByteBuffer}
import scala.collection.Map
import scala.collection.mutable
import org.apache.commons.lang3.{JavaVersion, SystemUtils}
import sun.misc.Unsafe
import sun.nio.ch.DirectBuffer
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils
/**
* Storage information for each BlockManager.
@ -193,6 +196,31 @@ private[spark] class StorageStatus(
/** Helper methods for storage-related objects. */
private[spark] object StorageUtils extends Logging {
// In Java 8, the type of DirectBuffer.cleaner() was sun.misc.Cleaner, and it was possible
// to access the method sun.misc.Cleaner.clean() to invoke it. The type changed to
// jdk.internal.ref.Cleaner in later JDKs, and the .clean() method is not accessible even with
// reflection. However sun.misc.Unsafe added a invokeCleaner() method in JDK 9+ and this is
// still accessible with reflection.
private val bufferCleaner: DirectBuffer => Unit =
if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
val cleanerMethod =
Utils.classForName("sun.misc.Unsafe").getMethod("invokeCleaner", classOf[ByteBuffer])
val unsafeField = classOf[Unsafe].getDeclaredField("theUnsafe")
unsafeField.setAccessible(true)
val unsafe = unsafeField.get(null).asInstanceOf[Unsafe]
buffer: DirectBuffer => cleanerMethod.invoke(unsafe, buffer)
} else {
val cleanerMethod = Utils.classForName("sun.misc.Cleaner").getMethod("clean")
buffer: DirectBuffer => {
// Careful to avoid the return type of .cleaner(), which changes with JDK
val cleaner: AnyRef = buffer.cleaner()
if (cleaner != null) {
cleanerMethod.invoke(cleaner)
}
}
}
/**
* Attempt to clean up a ByteBuffer if it is direct or memory-mapped. This uses an *unsafe* Sun
* API that will cause errors if one attempts to read from the disposed buffer. However, neither
@ -204,14 +232,8 @@ private[spark] object StorageUtils extends Logging {
def dispose(buffer: ByteBuffer): Unit = {
if (buffer != null && buffer.isInstanceOf[MappedByteBuffer]) {
logTrace(s"Disposing of $buffer")
cleanDirectBuffer(buffer.asInstanceOf[DirectBuffer])
bufferCleaner(buffer.asInstanceOf[DirectBuffer])
}
}
private def cleanDirectBuffer(buffer: DirectBuffer) = {
val cleaner = buffer.cleaner()
if (cleaner != null) {
cleaner.clean()
}
}
}

View file

@ -33,7 +33,7 @@ import scala.util.Random
import com.google.common.io.Files
import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.SystemUtils
import org.apache.commons.lang3.{JavaVersion, SystemUtils}
import org.apache.commons.math3.stat.inference.ChiSquareTest
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@ -932,10 +932,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
signal(pid, "SIGKILL")
}
val versionParts = System.getProperty("java.version").split("[+.\\-]+", 3)
var majorVersion = versionParts(0).toInt
if (majorVersion == 1) majorVersion = versionParts(1).toInt
if (majorVersion >= 8) {
if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_1_8)) {
// We'll make sure that forcibly terminating a process works by
// creating a very misbehaving process. It ignores SIGTERM and has been SIGSTOPed. On
// older versions of java, this will *not* terminate.

View file

@ -43,7 +43,7 @@ commons-digester-1.8.jar
commons-httpclient-3.1.jar
commons-io-2.4.jar
commons-lang-2.6.jar
commons-lang3-3.5.jar
commons-lang3-3.8.1.jar
commons-logging-1.1.3.jar
commons-math3-3.4.1.jar
commons-net-3.1.jar

View file

@ -40,7 +40,7 @@ commons-dbcp-1.4.jar
commons-httpclient-3.1.jar
commons-io-2.4.jar
commons-lang-2.6.jar
commons-lang3-3.5.jar
commons-lang3-3.8.1.jar
commons-logging-1.1.3.jar
commons-math3-3.4.1.jar
commons-net-3.1.jar
@ -116,8 +116,8 @@ jersey-container-servlet-core-2.22.2.jar
jersey-guava-2.22.2.jar
jersey-media-jaxb-2.22.2.jar
jersey-server-2.22.2.jar
jetty-webapp-9.3.24.v20180605.jar
jetty-xml-9.3.24.v20180605.jar
jetty-webapp-9.4.12.v20180830.jar
jetty-xml-9.4.12.v20180830.jar
jline-2.14.6.jar
joda-time-2.9.3.jar
jodd-core-3.5.2.jar

View file

@ -32,13 +32,13 @@ object LogQuery {
| GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; .NET CLR
| 3.5.21022; .NET CLR 3.0.4506.2152; .NET CLR 1.0.3705; .NET CLR 1.1.4322; .NET CLR
| 3.5.30729; Release=ARP)" "UD-1" - "image/jpeg" "whatever" 0.350 "-" - "" 265 923 934 ""
| 62.24.11.25 images.com 1358492167 - Whatup""".stripMargin.lines.mkString,
| 62.24.11.25 images.com 1358492167 - Whatup""".stripMargin.split('\n').mkString,
"""10.10.10.10 - "FRED" [18/Jan/2013:18:02:37 +1100] "GET http://images.com/2013/Generic.jpg
| HTTP/1.1" 304 306 "http:/referall.com" "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1;
| GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; .NET CLR
| 3.5.21022; .NET CLR 3.0.4506.2152; .NET CLR 1.0.3705; .NET CLR 1.1.4322; .NET CLR
| 3.5.30729; Release=ARP)" "UD-1" - "image/jpeg" "whatever" 0.352 "-" - "" 256 977 988 ""
| 0 73.23.2.15 images.com 1358492557 - Whatup""".stripMargin.lines.mkString
| 0 73.23.2.15 images.com 1358492557 - Whatup""".stripMargin.split('\n').mkString
)
def main(args: Array[String]) {

View file

@ -862,10 +862,10 @@ class MatricesSuite extends SparkMLFunSuite {
mat.toString(0, 0)
mat.toString(Int.MinValue, Int.MinValue)
mat.toString(Int.MaxValue, Int.MaxValue)
var lines = mat.toString(6, 50).lines.toArray
var lines = mat.toString(6, 50).split('\n')
assert(lines.size == 5 && lines.forall(_.size <= 50))
lines = mat.toString(5, 100).lines.toArray
lines = mat.toString(5, 100).split('\n')
assert(lines.size == 5 && lines.forall(_.size <= 100))
}

View file

@ -511,10 +511,10 @@ class MatricesSuite extends SparkFunSuite {
mat.toString(0, 0)
mat.toString(Int.MinValue, Int.MinValue)
mat.toString(Int.MaxValue, Int.MaxValue)
var lines = mat.toString(6, 50).lines.toArray
var lines = mat.toString(6, 50).split('\n')
assert(lines.size == 5 && lines.forall(_.size <= 50))
lines = mat.toString(5, 100).lines.toArray
lines = mat.toString(5, 100).split('\n')
assert(lines.size == 5 && lines.forall(_.size <= 100))
}

22
pom.xml
View file

@ -133,7 +133,7 @@
<orc.version>1.5.3</orc.version>
<orc.classifier>nohive</orc.classifier>
<hive.parquet.version>1.6.0</hive.parquet.version>
<jetty.version>9.3.24.v20180605</jetty.version>
<jetty.version>9.4.12.v20180830</jetty.version>
<javaxservlet.version>3.1.0</javaxservlet.version>
<chill.version>0.9.3</chill.version>
<ivy.version>2.4.0</ivy.version>
@ -166,7 +166,7 @@
<!-- org.apache.commons/commons-lang/-->
<commons-lang2.version>2.6</commons-lang2.version>
<!-- org.apache.commons/commons-lang3/-->
<commons-lang3.version>3.5</commons-lang3.version>
<commons-lang3.version>3.8.1</commons-lang3.version>
<datanucleus-core.version>3.2.10</datanucleus-core.version>
<janino.version>3.0.10</janino.version>
<jersey.version>2.22.2</jersey.version>
@ -2016,7 +2016,7 @@
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<!-- 3.3.1 won't work with zinc; fails to find javac from java.home -->
<version>3.2.2</version>
<version>3.4.4</version>
<executions>
<execution>
<id>eclipse-add-source</id>
@ -2281,7 +2281,19 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.0</version>
<version>3.2.0</version>
<dependencies>
<dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
<version>7.0</version>
</dependency>
<dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm-commons</artifactId>
<version>7.0</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
@ -2296,7 +2308,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>3.0.2</version>
<version>3.1.1</version>
<executions>
<execution>
<id>default-cli</id>

View file

@ -19,7 +19,6 @@
package org.apache.hive.service.cli.thrift;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -65,7 +64,7 @@ public class ThriftHttpCLIService extends ThriftCLIService {
// Server thread pool
// Start with minWorkerThreads, expand till maxWorkerThreads and reject subsequent requests
String threadPoolName = "HiveServer2-HttpHandler-Pool";
ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads,
ThreadPoolExecutor executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads,
workerKeepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new ThreadFactoryWithGarbageCleanup(threadPoolName));
ExecutorThreadPool threadPool = new ExecutorThreadPool(executorService);