[SPARK-3797] Minor addendum to Yarn shuffle service
I did not realize there was a `network.util.JavaUtils` when I wrote this code. This PR moves the `ByteBuffer` string conversion to the appropriate place. I tested the changes on a stable yarn cluster. Author: Andrew Or <andrew@databricks.com> Closes #3144 from andrewor14/yarn-shuffle-util and squashes the following commits: b6c08bf [Andrew Or] Remove unused import 94e205c [Andrew Or] Use netty Unpooled 85202a5 [Andrew Or] Use guava Charsets 057135b [Andrew Or] Reword comment adf186d [Andrew Or] Move byte buffer String conversion logic to JavaUtils
This commit is contained in:
parent
470881b24a
commit
96136f222a
|
@ -17,6 +17,8 @@
|
||||||
|
|
||||||
package org.apache.spark.network.util;
|
package org.apache.spark.network.util;
|
||||||
|
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
|
@ -25,6 +27,8 @@ import java.io.ObjectInputStream;
|
||||||
import java.io.ObjectOutputStream;
|
import java.io.ObjectOutputStream;
|
||||||
|
|
||||||
import com.google.common.io.Closeables;
|
import com.google.common.io.Closeables;
|
||||||
|
import com.google.common.base.Charsets;
|
||||||
|
import io.netty.buffer.Unpooled;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -73,4 +77,20 @@ public class JavaUtils {
|
||||||
int hash = obj.hashCode();
|
int hash = obj.hashCode();
|
||||||
return hash != Integer.MIN_VALUE ? Math.abs(hash) : 0;
|
return hash != Integer.MIN_VALUE ? Math.abs(hash) : 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Convert the given string to a byte buffer. The resulting buffer can be
|
||||||
|
* converted back to the same string through {@link #bytesToString(ByteBuffer)}.
|
||||||
|
*/
|
||||||
|
public static ByteBuffer stringToBytes(String s) {
|
||||||
|
return Unpooled.wrappedBuffer(s.getBytes(Charsets.UTF_8)).nioBuffer();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Convert the given byte buffer to a string. The resulting string can be
|
||||||
|
* converted back to the same byte buffer through {@link #stringToBytes(String)}.
|
||||||
|
*/
|
||||||
|
public static String bytesToString(ByteBuffer b) {
|
||||||
|
return Unpooled.wrappedBuffer(b).toString(Charsets.UTF_8);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,13 +19,13 @@ package org.apache.spark.network.sasl;
|
||||||
|
|
||||||
import java.lang.Override;
|
import java.lang.Override;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.nio.charset.Charset;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import org.apache.spark.network.sasl.SecretKeyHolder;
|
import org.apache.spark.network.sasl.SecretKeyHolder;
|
||||||
|
import org.apache.spark.network.util.JavaUtils;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A class that manages shuffle secret used by the external shuffle service.
|
* A class that manages shuffle secret used by the external shuffle service.
|
||||||
|
@ -34,30 +34,10 @@ public class ShuffleSecretManager implements SecretKeyHolder {
|
||||||
private final Logger logger = LoggerFactory.getLogger(ShuffleSecretManager.class);
|
private final Logger logger = LoggerFactory.getLogger(ShuffleSecretManager.class);
|
||||||
private final ConcurrentHashMap<String, String> shuffleSecretMap;
|
private final ConcurrentHashMap<String, String> shuffleSecretMap;
|
||||||
|
|
||||||
private static final Charset UTF8_CHARSET = Charset.forName("UTF-8");
|
|
||||||
|
|
||||||
// Spark user used for authenticating SASL connections
|
// Spark user used for authenticating SASL connections
|
||||||
// Note that this must match the value in org.apache.spark.SecurityManager
|
// Note that this must match the value in org.apache.spark.SecurityManager
|
||||||
private static final String SPARK_SASL_USER = "sparkSaslUser";
|
private static final String SPARK_SASL_USER = "sparkSaslUser";
|
||||||
|
|
||||||
/**
|
|
||||||
* Convert the given string to a byte buffer. The resulting buffer can be converted back to
|
|
||||||
* the same string through {@link #bytesToString(ByteBuffer)}. This is used if the external
|
|
||||||
* shuffle service represents shuffle secrets as bytes buffers instead of strings.
|
|
||||||
*/
|
|
||||||
public static ByteBuffer stringToBytes(String s) {
|
|
||||||
return ByteBuffer.wrap(s.getBytes(UTF8_CHARSET));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Convert the given byte buffer to a string. The resulting string can be converted back to
|
|
||||||
* the same byte buffer through {@link #stringToBytes(String)}. This is used if the external
|
|
||||||
* shuffle service represents shuffle secrets as bytes buffers instead of strings.
|
|
||||||
*/
|
|
||||||
public static String bytesToString(ByteBuffer b) {
|
|
||||||
return new String(b.array(), UTF8_CHARSET);
|
|
||||||
}
|
|
||||||
|
|
||||||
public ShuffleSecretManager() {
|
public ShuffleSecretManager() {
|
||||||
shuffleSecretMap = new ConcurrentHashMap<String, String>();
|
shuffleSecretMap = new ConcurrentHashMap<String, String>();
|
||||||
}
|
}
|
||||||
|
@ -80,7 +60,7 @@ public class ShuffleSecretManager implements SecretKeyHolder {
|
||||||
* Register an application with its secret specified as a byte buffer.
|
* Register an application with its secret specified as a byte buffer.
|
||||||
*/
|
*/
|
||||||
public void registerApp(String appId, ByteBuffer shuffleSecret) {
|
public void registerApp(String appId, ByteBuffer shuffleSecret) {
|
||||||
registerApp(appId, bytesToString(shuffleSecret));
|
registerApp(appId, JavaUtils.bytesToString(shuffleSecret));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -36,7 +36,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
|
||||||
import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}
|
import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}
|
||||||
|
|
||||||
import org.apache.spark.{SecurityManager, SparkConf, Logging}
|
import org.apache.spark.{SecurityManager, SparkConf, Logging}
|
||||||
import org.apache.spark.network.sasl.ShuffleSecretManager
|
import org.apache.spark.network.util.JavaUtils
|
||||||
|
|
||||||
@deprecated("use yarn/stable", "1.2.0")
|
@deprecated("use yarn/stable", "1.2.0")
|
||||||
class ExecutorRunnable(
|
class ExecutorRunnable(
|
||||||
|
@ -98,7 +98,8 @@ class ExecutorRunnable(
|
||||||
val secretString = securityMgr.getSecretKey()
|
val secretString = securityMgr.getSecretKey()
|
||||||
val secretBytes =
|
val secretBytes =
|
||||||
if (secretString != null) {
|
if (secretString != null) {
|
||||||
ShuffleSecretManager.stringToBytes(secretString)
|
// This conversion must match how the YarnShuffleService decodes our secret
|
||||||
|
JavaUtils.stringToBytes(secretString)
|
||||||
} else {
|
} else {
|
||||||
// Authentication is not enabled, so just provide dummy metadata
|
// Authentication is not enabled, so just provide dummy metadata
|
||||||
ByteBuffer.allocate(0)
|
ByteBuffer.allocate(0)
|
||||||
|
|
|
@ -36,7 +36,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
|
||||||
import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
|
import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
|
||||||
|
|
||||||
import org.apache.spark.{SecurityManager, SparkConf, Logging}
|
import org.apache.spark.{SecurityManager, SparkConf, Logging}
|
||||||
import org.apache.spark.network.sasl.ShuffleSecretManager
|
import org.apache.spark.network.util.JavaUtils
|
||||||
|
|
||||||
|
|
||||||
class ExecutorRunnable(
|
class ExecutorRunnable(
|
||||||
|
@ -97,7 +97,8 @@ class ExecutorRunnable(
|
||||||
val secretString = securityMgr.getSecretKey()
|
val secretString = securityMgr.getSecretKey()
|
||||||
val secretBytes =
|
val secretBytes =
|
||||||
if (secretString != null) {
|
if (secretString != null) {
|
||||||
ShuffleSecretManager.stringToBytes(secretString)
|
// This conversion must match how the YarnShuffleService decodes our secret
|
||||||
|
JavaUtils.stringToBytes(secretString)
|
||||||
} else {
|
} else {
|
||||||
// Authentication is not enabled, so just provide dummy metadata
|
// Authentication is not enabled, so just provide dummy metadata
|
||||||
ByteBuffer.allocate(0)
|
ByteBuffer.allocate(0)
|
||||||
|
|
Loading…
Reference in a new issue