[SPARK-4242] [Core] Add SASL to external shuffle service
Does three things: (1) Adds SASL to ExternalShuffleClient, (2) puts SecurityManager in BlockManager's constructor, and (3) adds unit test.

Author: Aaron Davidson <aaron@databricks.com>

Closes #3108 from aarondav/sasl-client and squashes the following commits:

48b622d [Aaron Davidson] Screw it, let's just get LimitedInputStream
3543b70 [Aaron Davidson] Back out of pom change due to unknown test issue?
b58518a [Aaron Davidson] ByteStreams.limit() not available :(
cbe451a [Aaron Davidson] Address comments
2bf2908 [Aaron Davidson] [SPARK-4242] [Core] Add SASL to external shuffle service
commit 4c42986cc0 (parent 5b3b6f6f5f)
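For orientation before the diff: the heart of changes (1) and (2) is that BlockManager now receives a SecurityManager and, when the external shuffle service is enabled, passes it into the new three-argument ExternalShuffleClient constructor. A minimal sketch of that construction, assuming only the constructor and the SecretKeyHolder interface that appear in the diff below (the holder implementation, class name, and user string here are illustrative, not Spark code):

    import org.apache.spark.network.sasl.SecretKeyHolder;
    import org.apache.spark.network.shuffle.ExternalShuffleClient;
    import org.apache.spark.network.util.TransportConf;

    public class SaslClientSketch {
      /** Builds a SASL-enabled client; with saslEnabled=false the holder may be null. */
      public static ExternalShuffleClient create(TransportConf conf, final String secret) {
        // The SASL layer asks the holder for a user name and shared secret per appId.
        SecretKeyHolder holder = new SecretKeyHolder() {
          @Override public String getSaslUser(String appId) { return "sparkSaslUser"; }
          @Override public String getSecretKey(String appId) { return secret; }
        };
        return new ExternalShuffleClient(conf, holder, true);
      }
    }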
LICENSE

@@ -754,7 +754,7 @@ SUCH DAMAGE.
 
 ========================================================================
-For Timsort (core/src/main/java/org/apache/spark/util/collection/Sorter.java):
+For Timsort (core/src/main/java/org/apache/spark/util/collection/TimSort.java):
 ========================================================================
 Copyright (C) 2008 The Android Open Source Project
 
@@ -771,6 +771,25 @@ See the License for the specific language governing permissions and
 limitations under the License.
 
 
+========================================================================
+For LimitedInputStream
+    (network/common/src/main/java/org/apache/spark/network/util/LimitedInputStream.java):
+========================================================================
+Copyright (C) 2007 The Guava Authors
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+
 ========================================================================
 BSD-style licenses
 ========================================================================
core/src/main/scala/org/apache/spark/SparkEnv.scala

@@ -287,7 +287,7 @@ object SparkEnv extends Logging {
 
     // NB: blockManager is not valid until initialize() is called later.
     val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
-      serializer, conf, mapOutputTracker, shuffleManager, blockTransferService)
+      serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager)
 
     val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
 
core/src/main/scala/org/apache/spark/storage/BlockManager.scala

@@ -72,7 +72,8 @@ private[spark] class BlockManager(
     val conf: SparkConf,
     mapOutputTracker: MapOutputTracker,
     shuffleManager: ShuffleManager,
-    blockTransferService: BlockTransferService)
+    blockTransferService: BlockTransferService,
+    securityManager: SecurityManager)
   extends BlockDataManager with Logging {
 
   val diskBlockManager = new DiskBlockManager(this, conf)
@@ -115,7 +116,8 @@ private[spark] class BlockManager(
   // Client to read other executors' shuffle files. This is either an external service, or just the
   // standard BlockTransferService to directly connect to other Executors.
   private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
-    new ExternalShuffleClient(SparkTransportConf.fromSparkConf(conf))
+    new ExternalShuffleClient(SparkTransportConf.fromSparkConf(conf), securityManager,
+      securityManager.isAuthenticationEnabled())
   } else {
     blockTransferService
   }
@@ -166,9 +168,10 @@ private[spark] class BlockManager(
       conf: SparkConf,
       mapOutputTracker: MapOutputTracker,
       shuffleManager: ShuffleManager,
-      blockTransferService: BlockTransferService) = {
+      blockTransferService: BlockTransferService,
+      securityManager: SecurityManager) = {
     this(execId, actorSystem, master, serializer, BlockManager.getMaxMemory(conf),
-      conf, mapOutputTracker, shuffleManager, blockTransferService)
+      conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager)
   }
 
   /**
@@ -219,7 +222,6 @@ private[spark] class BlockManager(
         return
       } catch {
         case e: Exception if i < MAX_ATTEMPTS =>
-          val attemptsRemaining =
           logError(s"Failed to connect to external shuffle server, will retry ${MAX_ATTEMPTS - i}}"
             + s" more times after waiting $SLEEP_TIME_SECS seconds...", e)
           Thread.sleep(SLEEP_TIME_SECS * 1000)
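The last hunk above only deletes a stray, incomplete `val attemptsRemaining =` left in the retry loop around the external shuffle server registration. For readers skimming the diff, the surrounding pattern is a bounded retry with a fixed sleep; a rough Java fragment of the same shape (the constant values and method name are assumptions for illustration, the real code is the Scala shown above):

    // Illustrative fragment only: bounded retry with fixed sleep, mirroring the Scala loop.
    static final int MAX_ATTEMPTS = 3;      // assumed value
    static final int SLEEP_TIME_SECS = 5;   // assumed value

    void registerWithRetries(Runnable registerOnce) throws InterruptedException {
      for (int i = 1; i <= MAX_ATTEMPTS; i++) {
        try {
          registerOnce.run();
          return;                           // success: stop retrying
        } catch (RuntimeException e) {
          if (i == MAX_ATTEMPTS) throw e;   // out of attempts: propagate
          System.err.println("Failed to connect to external shuffle server, will retry "
            + (MAX_ATTEMPTS - i) + " more times after waiting " + SLEEP_TIME_SECS + " seconds...");
          Thread.sleep(SLEEP_TIME_SECS * 1000);
        }
      }
    }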
core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala

@@ -62,7 +62,7 @@ class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAnd
       name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
     val transfer = new NioBlockTransferService(conf, securityMgr)
     val store = new BlockManager(name, actorSystem, master, serializer, maxMem, conf,
-      mapOutputTracker, shuffleManager, transfer)
+      mapOutputTracker, shuffleManager, transfer, securityMgr)
     store.initialize("app-id")
     allStores += store
     store
@@ -263,7 +263,7 @@ class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAnd
     when(failableTransfer.hostName).thenReturn("some-hostname")
     when(failableTransfer.port).thenReturn(1000)
     val failableStore = new BlockManager("failable-store", actorSystem, master, serializer,
-      10000, conf, mapOutputTracker, shuffleManager, failableTransfer)
+      10000, conf, mapOutputTracker, shuffleManager, failableTransfer, securityMgr)
     failableStore.initialize("app-id")
     allStores += failableStore // so that this gets stopped after test
     assert(master.getPeers(store.blockManagerId).toSet === Set(failableStore.blockManagerId))
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala

@@ -74,7 +74,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
       name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
     val transfer = new NioBlockTransferService(conf, securityMgr)
     val manager = new BlockManager(name, actorSystem, master, serializer, maxMem, conf,
-      mapOutputTracker, shuffleManager, transfer)
+      mapOutputTracker, shuffleManager, transfer, securityMgr)
     manager.initialize("app-id")
     manager
   }
@@ -795,7 +795,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     // Use Java serializer so we can create an unserializable error.
     val transfer = new NioBlockTransferService(conf, securityMgr)
     store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, actorSystem, master,
-      new JavaSerializer(conf), 1200, conf, mapOutputTracker, shuffleManager, transfer)
+      new JavaSerializer(conf), 1200, conf, mapOutputTracker, shuffleManager, transfer, securityMgr)
 
     // The put should fail since a1 is not serializable.
     class UnserializableClass
network/common/pom.xml

@@ -50,6 +50,7 @@
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
       <version>11.0.2</version> <!-- yarn 2.4.0's version -->
+      <scope>provided</scope>
     </dependency>
 
network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java

@@ -30,6 +30,7 @@ import com.google.common.io.ByteStreams;
 import io.netty.channel.DefaultFileRegion;
 
 import org.apache.spark.network.util.JavaUtils;
+import org.apache.spark.network.util.LimitedInputStream;
 
 /**
  * A {@link ManagedBuffer} backed by a segment in a file.
@@ -101,7 +102,7 @@ public final class FileSegmentManagedBuffer extends ManagedBuffer {
     try {
       is = new FileInputStream(file);
       ByteStreams.skipFully(is, offset);
-      return ByteStreams.limit(is, length);
+      return new LimitedInputStream(is, length);
     } catch (IOException e) {
       try {
         if (is != null) {
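Why the one-line swap: `ByteStreams.limit()` only exists in Guava 14+, and this module compiles against Guava 11 (YARN 2.4.0's version, per the pom above), hence the commit-log note "ByteStreams.limit() not available :(". The pattern the changed method implements, as a standalone sketch (the file/offset/length values and method name are illustrative):

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import com.google.common.io.ByteStreams;
    import org.apache.spark.network.util.LimitedInputStream;

    /** Opens a stream over one segment [offset, offset + length) of a larger file. */
    static InputStream openSegment(File file, long offset, long length) throws IOException {
      FileInputStream is = new FileInputStream(file);
      ByteStreams.skipFully(is, offset);          // advance to the segment start
      return new LimitedInputStream(is, length);  // cap reads at the segment end
    }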
network/common/src/main/java/org/apache/spark/network/util/LimitedInputStream.java (new file)

@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.util;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Wraps a {@link InputStream}, limiting the number of bytes which can be read.
+ *
+ * This code is from Guava's 14.0 source code, because there is no compatible way to
+ * use this functionality in both a Guava 11 environment and a Guava >14 environment.
+ */
+public final class LimitedInputStream extends FilterInputStream {
+  private long left;
+  private long mark = -1;
+
+  public LimitedInputStream(InputStream in, long limit) {
+    super(in);
+    Preconditions.checkNotNull(in);
+    Preconditions.checkArgument(limit >= 0, "limit must be non-negative");
+    left = limit;
+  }
+  @Override public int available() throws IOException {
+    return (int) Math.min(in.available(), left);
+  }
+  // it's okay to mark even if mark isn't supported, as reset won't work
+  @Override public synchronized void mark(int readLimit) {
+    in.mark(readLimit);
+    mark = left;
+  }
+  @Override public int read() throws IOException {
+    if (left == 0) {
+      return -1;
+    }
+    int result = in.read();
+    if (result != -1) {
+      --left;
+    }
+    return result;
+  }
+  @Override public int read(byte[] b, int off, int len) throws IOException {
+    if (left == 0) {
+      return -1;
+    }
+    len = (int) Math.min(len, left);
+    int result = in.read(b, off, len);
+    if (result != -1) {
+      left -= result;
+    }
+    return result;
+  }
+  @Override public synchronized void reset() throws IOException {
+    if (!in.markSupported()) {
+      throw new IOException("Mark not supported");
+    }
+    if (mark == -1) {
+      throw new IOException("Mark not set");
+    }
+    in.reset();
+    left = mark;
+  }
+  @Override public long skip(long n) throws IOException {
+    n = Math.min(n, left);
+    long skipped = in.skip(n);
+    left -= skipped;
+    return skipped;
+  }
+}
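A quick behavioral check of the class above (values illustrative): reads are capped at the limit even when the underlying stream holds more data, and a follow-up read reports end-of-stream:

    import java.io.ByteArrayInputStream;
    import com.google.common.base.Charsets;
    import org.apache.spark.network.util.LimitedInputStream;

    byte[] data = "hello world".getBytes(Charsets.UTF_8);   // 11 bytes underneath
    LimitedInputStream in = new LimitedInputStream(new ByteArrayInputStream(data), 5);

    byte[] buf = new byte[16];
    int n = in.read(buf, 0, buf.length);   // returns 5: only "hello" is readable
    int next = in.read();                  // returns -1: the 5-byte budget is spent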
network/shuffle/pom.xml

@@ -51,6 +51,7 @@
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
       <version>11.0.2</version> <!-- yarn 2.4.0's version -->
+      <scope>provided</scope>
     </dependency>
 
network/shuffle/src/main/java/org/apache/spark/network/sasl/SparkSaslClient.java

@@ -126,7 +126,6 @@ public class SparkSaslClient {
           logger.trace("SASL client callback: setting realm");
           RealmCallback rc = (RealmCallback) callback;
           rc.setText(rc.getDefaultText());
-          logger.info("Realm callback");
         } else if (callback instanceof RealmChoiceCallback) {
           // ignore (?)
         } else {
network/shuffle/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java

@@ -34,7 +34,8 @@ import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.io.BaseEncoding;
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.base64.Base64;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -159,12 +160,14 @@ public class SparkSaslServer {
   /* Encode a byte[] identifier as a Base64-encoded string. */
   public static String encodeIdentifier(String identifier) {
     Preconditions.checkNotNull(identifier, "User cannot be null if SASL is enabled");
-    return BaseEncoding.base64().encode(identifier.getBytes(Charsets.UTF_8));
+    return Base64.encode(Unpooled.wrappedBuffer(identifier.getBytes(Charsets.UTF_8)))
+      .toString(Charsets.UTF_8);
   }
 
   /** Encode a password as a base64-encoded char[] array. */
   public static char[] encodePassword(String password) {
     Preconditions.checkNotNull(password, "Password cannot be null if SASL is enabled");
-    return BaseEncoding.base64().encode(password.getBytes(Charsets.UTF_8)).toCharArray();
+    return Base64.encode(Unpooled.wrappedBuffer(password.getBytes(Charsets.UTF_8)))
+      .toString(Charsets.UTF_8).toCharArray();
   }
 }
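The import swap above exists for the same Guava-version reason as LimitedInputStream: `BaseEncoding` arrived in Guava 14, while this module is pinned to Guava 11, so the helpers move to Netty's Base64 codec. The two encoders agree on short inputs like these identifiers (Netty's encoder can insert line breaks on long input, which SASL identifiers and secrets will not hit). A sketch of the new path (the input value is illustrative):

    import com.google.common.base.Charsets;
    import io.netty.buffer.Unpooled;
    import io.netty.handler.codec.base64.Base64;

    // Same transformation as the new encodeIdentifier: UTF-8 bytes -> Base64 text.
    String encoded = Base64.encode(Unpooled.wrappedBuffer("my-app-id".getBytes(Charsets.UTF_8)))
        .toString(Charsets.UTF_8);
    // encoded is "bXktYXBwLWlk"; encodePassword additionally calls toCharArray().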
network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java

@@ -17,12 +17,18 @@
 
 package org.apache.spark.network.shuffle;
 
+import java.util.List;
+
+import com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.spark.network.TransportContext;
 import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.client.TransportClientBootstrap;
 import org.apache.spark.network.client.TransportClientFactory;
+import org.apache.spark.network.sasl.SaslClientBootstrap;
+import org.apache.spark.network.sasl.SecretKeyHolder;
 import org.apache.spark.network.server.NoOpRpcHandler;
 import org.apache.spark.network.shuffle.ExternalShuffleMessages.RegisterExecutor;
 import org.apache.spark.network.util.JavaUtils;
@@ -37,18 +43,35 @@ import org.apache.spark.network.util.TransportConf;
 public class ExternalShuffleClient extends ShuffleClient {
   private final Logger logger = LoggerFactory.getLogger(ExternalShuffleClient.class);
 
-  private final TransportClientFactory clientFactory;
+  private final TransportConf conf;
+  private final boolean saslEnabled;
+  private final SecretKeyHolder secretKeyHolder;
+
+  private TransportClientFactory clientFactory;
   private String appId;
 
-  public ExternalShuffleClient(TransportConf conf) {
-    TransportContext context = new TransportContext(conf, new NoOpRpcHandler());
-    this.clientFactory = context.createClientFactory();
+  /**
+   * Creates an external shuffle client, with SASL optionally enabled. If SASL is not enabled,
+   * then secretKeyHolder may be null.
+   */
+  public ExternalShuffleClient(
+      TransportConf conf,
+      SecretKeyHolder secretKeyHolder,
+      boolean saslEnabled) {
+    this.conf = conf;
+    this.secretKeyHolder = secretKeyHolder;
+    this.saslEnabled = saslEnabled;
   }
 
   @Override
   public void init(String appId) {
     this.appId = appId;
+    TransportContext context = new TransportContext(conf, new NoOpRpcHandler());
+    List<TransportClientBootstrap> bootstraps = Lists.newArrayList();
+    if (saslEnabled) {
+      bootstraps.add(new SaslClientBootstrap(conf, appId, secretKeyHolder));
+    }
+    clientFactory = context.createClientFactory(bootstraps);
   }
 
   @Override
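Taken together, init() now installs a SaslClientBootstrap on every new connection when SASL is on, and usage is otherwise unchanged. A hedged end-to-end sketch based on the integration-suite calls below (the host, port, and block id values are illustrative, and `conf` is an in-scope TransportConf):

    import org.apache.spark.network.buffer.ManagedBuffer;
    import org.apache.spark.network.shuffle.BlockFetchingListener;
    import org.apache.spark.network.shuffle.ExternalShuffleClient;

    // SASL disabled: per the constructor javadoc above, secretKeyHolder may be null.
    ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false);
    client.init("app-id");
    client.fetchBlocks("shuffle-host", 7337, "exec-0", new String[] { "shuffle_0_0_0" },
      new BlockFetchingListener() {
        @Override public void onBlockFetchSuccess(String blockId, ManagedBuffer data) { /* consume */ }
        @Override public void onBlockFetchFailure(String blockId, Throwable t) { /* handle */ }
      });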
network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java

@@ -135,7 +135,7 @@ public class ExternalShuffleIntegrationSuite {
 
     final Semaphore requestsRemaining = new Semaphore(0);
 
-    ExternalShuffleClient client = new ExternalShuffleClient(conf);
+    ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false);
     client.init(APP_ID);
     client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds,
       new BlockFetchingListener() {
@@ -267,7 +267,7 @@ public class ExternalShuffleIntegrationSuite {
   }
 
   private void registerExecutor(String executorId, ExecutorShuffleInfo executorInfo) {
-    ExternalShuffleClient client = new ExternalShuffleClient(conf);
+    ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false);
     client.init(APP_ID);
     client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(),
       executorId, executorInfo);
network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java (new file)

@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import org.apache.spark.network.TestUtils;
+import org.apache.spark.network.TransportContext;
+import org.apache.spark.network.sasl.SaslRpcHandler;
+import org.apache.spark.network.sasl.SecretKeyHolder;
+import org.apache.spark.network.server.RpcHandler;
+import org.apache.spark.network.server.TransportServer;
+import org.apache.spark.network.util.SystemPropertyConfigProvider;
+import org.apache.spark.network.util.TransportConf;
+
+public class ExternalShuffleSecuritySuite {
+
+  TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+  TransportServer server;
+
+  @Before
+  public void beforeEach() {
+    RpcHandler handler = new SaslRpcHandler(new ExternalShuffleBlockHandler(),
+      new TestSecretKeyHolder("my-app-id", "secret"));
+    TransportContext context = new TransportContext(conf, handler);
+    this.server = context.createServer();
+  }
+
+  @After
+  public void afterEach() {
+    if (server != null) {
+      server.close();
+      server = null;
+    }
+  }
+
+  @Test
+  public void testValid() {
+    validate("my-app-id", "secret");
+  }
+
+  @Test
+  public void testBadAppId() {
+    try {
+      validate("wrong-app-id", "secret");
+    } catch (Exception e) {
+      assertTrue(e.getMessage(), e.getMessage().contains("Wrong appId!"));
+    }
+  }
+
+  @Test
+  public void testBadSecret() {
+    try {
+      validate("my-app-id", "bad-secret");
+    } catch (Exception e) {
+      assertTrue(e.getMessage(), e.getMessage().contains("Mismatched response"));
+    }
+  }
+
+  /** Creates an ExternalShuffleClient and attempts to register with the server. */
+  private void validate(String appId, String secretKey) {
+    ExternalShuffleClient client =
+      new ExternalShuffleClient(conf, new TestSecretKeyHolder(appId, secretKey), true);
+    client.init(appId);
+    // Registration either succeeds or throws an exception.
+    client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(), "exec0",
+      new ExecutorShuffleInfo(new String[0], 0, ""));
+    client.close();
+  }
+
+  /** Provides a secret key holder which always returns the given secret key, for a single appId. */
+  static class TestSecretKeyHolder implements SecretKeyHolder {
+    private final String appId;
+    private final String secretKey;
+
+    TestSecretKeyHolder(String appId, String secretKey) {
+      this.appId = appId;
+      this.secretKey = secretKey;
+    }
+
+    @Override
+    public String getSaslUser(String appId) {
+      return "user";
+    }
+
+    @Override
+    public String getSecretKey(String appId) {
+      if (!appId.equals(this.appId)) {
+        throw new IllegalArgumentException("Wrong appId!");
+      }
+      return secretKey;
+    }
+  }
+}
streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala

@@ -73,7 +73,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
 
     blockManager = new BlockManager("bm", actorSystem, blockManagerMaster, serializer,
       blockManagerSize, conf, mapOutputTracker, shuffleManager,
-      new NioBlockTransferService(conf, securityMgr))
+      new NioBlockTransferService(conf, securityMgr), securityMgr)
     blockManager.initialize("app-id")
 
     tempDirectory = Files.createTempDir()