[SPARK-18773][CORE] Make commons-crypto config translation consistent.

This change moves the logic that translates Spark configuration to
commons-crypto configuration to the network-common module. It also
extends TransportConf and ConfigProvider to provide the necessary
interfaces for the translation to work.

As part of the change, I removed SystemPropertyConfigProvider, which
was mostly used as an "empty config" in unit tests, and adjusted the
very few tests that required a specific config.

I also changed the config keys for AES encryption to live under the
"spark.network." namespace, which is more correct than their previous
names under "spark.authenticate.".

Tested via existing unit tests.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #16200 from vanzin/SPARK-18773.
23 changed files with 231 additions and 145 deletions

@@ -60,7 +60,7 @@ public class AesCipher {
private final Properties properties;
public AesCipher(AesConfigMessage configMessage, TransportConf conf) throws IOException {
this.properties = CryptoStreamUtils.toCryptoConf(conf);
this.properties = conf.cryptoConf();
this.inKeySpec = new SecretKeySpec(configMessage.inKey, "AES");
this.inIvSpec = new IvParameterSpec(configMessage.inIv);
this.outKeySpec = new SecretKeySpec(configMessage.outKey, "AES");
@@ -105,7 +105,7 @@ public class AesCipher {
*/
public static AesConfigMessage createConfigMessage(TransportConf conf) {
int keySize = conf.aesCipherKeySize();
Properties properties = CryptoStreamUtils.toCryptoConf(conf);
Properties properties = conf.cryptoConf();
try {
int paramLen = CryptoCipherFactory.getCryptoCipher(AesCipher.TRANSFORM, properties)
@@ -128,19 +128,6 @@ public class AesCipher {
}
}
/**
* CryptoStreamUtils is used to convert config from TransportConf to AES Crypto config.
*/
private static class CryptoStreamUtils {
public static Properties toCryptoConf(TransportConf conf) {
Properties props = new Properties();
if (conf.aesCipherClass() != null) {
props.setProperty(CryptoCipherFactory.CLASSES_KEY, conf.aesCipherClass());
}
return props;
}
}
private static class AesEncryptHandler extends ChannelOutboundHandlerAdapter {
private final ByteArrayWritableChannel byteChannel;
private final CryptoOutputStream cos;

@@ -17,6 +17,7 @@
package org.apache.spark.network.util;
import java.util.Map;
import java.util.NoSuchElementException;
/**
@@ -26,6 +27,9 @@ public abstract class ConfigProvider {
/** Obtains the value of the given config, throws NoSuchElementException if it doesn't exist. */
public abstract String get(String name);
/** Returns all the config values in the provider. */
public abstract Iterable<Map.Entry<String, String>> getAll();
public String get(String name, String defaultValue) {
try {
return get(name);
@@ -49,4 +53,5 @@ public abstract class ConfigProvider {
public boolean getBoolean(String name, boolean defaultValue) {
return Boolean.parseBoolean(get(name, Boolean.toString(defaultValue)));
}
}
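
For illustration, a minimal sketch of a provider satisfying the extended contract. The class name and its single-entry backing map are hypothetical; ConfigProvider, get(), and getAll() are as in the diff above.

```java
import java.util.Collections;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical provider backed by a single key/value pair.
class SingleEntryConfigProvider extends ConfigProvider {
  private final Map<String, String> entries;

  SingleEntryConfigProvider(String key, String value) {
    this.entries = Collections.singletonMap(key, value);
  }

  @Override
  public String get(String name) {
    String value = entries.get(name);
    if (value == null) {
      throw new NoSuchElementException(name);
    }
    return value;
  }

  // The new method: exposes every entry so callers such as the crypto config
  // translation can scan for prefixed keys.
  @Override
  public Iterable<Map.Entry<String, String>> getAll() {
    return entries.entrySet();
  }
}
```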

@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.network.util;
import java.util.Map;
import java.util.Properties;
/**
* Utility methods related to the commons-crypto library.
*/
public class CryptoUtils {
// The prefix for the configurations passing to Apache Commons Crypto library.
public static final String COMMONS_CRYPTO_CONFIG_PREFIX = "commons.crypto.";
/**
* Extract the commons-crypto configuration embedded in a list of config values.
*
* @param prefix Prefix in the given configuration that identifies the commons-crypto configs.
* @param conf List of configuration values.
*/
public static Properties toCryptoConf(String prefix, Iterable<Map.Entry<String, String>> conf) {
Properties props = new Properties();
for (Map.Entry<String, String> e : conf) {
String key = e.getKey();
if (key.startsWith(prefix)) {
props.setProperty(COMMONS_CRYPTO_CONFIG_PREFIX + key.substring(prefix.length()),
e.getValue());
}
}
return props;
}
}
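
A usage sketch for toCryptoConf(). The map contents are illustrative; the prefix matches the one TransportConf passes later in this change, and commons.crypto.cipher.classes is the commons-crypto key that selects cipher implementations.

```java
import java.util.Map;
import java.util.Properties;

import com.google.common.collect.ImmutableMap;

public class CryptoUtilsExample {
  public static void main(String[] args) {
    Map<String, String> conf = ImmutableMap.of(
        "spark.network.aes.config.cipher.classes",
        "org.apache.commons.crypto.cipher.OpenSslCipher",
        "spark.network.aes.enabled", "true");

    Properties props = CryptoUtils.toCryptoConf("spark.network.aes.config.", conf.entrySet());

    // Only the prefixed key survives, re-prefixed for commons-crypto:
    // {commons.crypto.cipher.classes=org.apache.commons.crypto.cipher.OpenSslCipher}
    System.out.println(props);
  }
}
```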

@@ -19,11 +19,16 @@ package org.apache.spark.network.util;
import com.google.common.collect.Maps;
import java.util.Collections;
import java.util.Map;
import java.util.NoSuchElementException;
/** ConfigProvider based on a Map (copied in the constructor). */
public class MapConfigProvider extends ConfigProvider {
public static final MapConfigProvider EMPTY = new MapConfigProvider(
Collections.<String, String>emptyMap());
private final Map<String, String> config;
public MapConfigProvider(Map<String, String> config) {
@@ -38,4 +43,10 @@ public class MapConfigProvider extends ConfigProvider {
}
return value;
}
@Override
public Iterable<Map.Entry<String, String>> getAll() {
return config.entrySet();
}
}

@@ -1,32 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.network.util;
import java.util.NoSuchElementException;
/** Uses System properties to obtain config values. */
public class SystemPropertyConfigProvider extends ConfigProvider {
@Override
public String get(String name) {
String value = System.getProperty(name);
if (value == null) {
throw new NoSuchElementException(name);
}
return value;
}
}

@@ -17,6 +17,8 @@
package org.apache.spark.network.util;
import java.util.Properties;
import com.google.common.primitives.Ints;
/**
@@ -174,21 +176,22 @@ public class TransportConf {
* The trigger for enabling AES encryption.
*/
public boolean aesEncryptionEnabled() {
return conf.getBoolean("spark.authenticate.encryption.aes.enabled", false);
return conf.getBoolean("spark.network.aes.enabled", false);
}
/**
* The implementation class for crypto cipher
*/
public String aesCipherClass() {
return conf.get("spark.authenticate.encryption.aes.cipher.class", null);
}
/**
* The bytes of AES cipher key which is effective when AES cipher is enabled. Notice that
* the length should be 16, 24 or 32 bytes.
* The key size to use when AES cipher is enabled. Notice that the length should be 16, 24 or 32
* bytes.
*/
public int aesCipherKeySize() {
return conf.getInt("spark.authenticate.encryption.aes.cipher.keySize", 16);
return conf.getInt("spark.network.aes.keySize", 16);
}
/**
* The commons-crypto configuration for the module.
*/
public Properties cryptoConf() {
return CryptoUtils.toCryptoConf("spark.network.aes.config.", conf.getAll());
}
}
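
An end-to-end sketch of the new keys, assuming a TransportConf built from the MapConfigProvider introduced above; the values are illustrative, not defaults.

```java
import java.util.Properties;

import com.google.common.collect.ImmutableMap;

public class TransportConfAesExample {
  public static void main(String[] args) {
    TransportConf conf = new TransportConf("shuffle", new MapConfigProvider(ImmutableMap.of(
        "spark.network.aes.enabled", "true",
        "spark.network.aes.keySize", "32",
        "spark.network.aes.config.cipher.classes",
        "org.apache.commons.crypto.cipher.JceCipher")));

    System.out.println(conf.aesEncryptionEnabled()); // true
    System.out.println(conf.aesCipherKeySize());     // 32

    // Translated via CryptoUtils.toCryptoConf with the "spark.network.aes.config." prefix.
    Properties crypto = conf.cryptoConf();
    System.out.println(crypto); // {commons.crypto.cipher.classes=...JceCipher}
  }
}
```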

@@ -48,7 +48,7 @@ import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.server.StreamManager;
import org.apache.spark.network.util.SystemPropertyConfigProvider;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;
public class ChunkFetchIntegrationSuite {
@@ -87,7 +87,7 @@ public class ChunkFetchIntegrationSuite {
Closeables.close(fp, shouldSuppressIOException);
}
final TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
final TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);
fileChunk = new FileSegmentManagedBuffer(conf, testFile, 10, testFile.length() - 25);
streamManager = new StreamManager() {

@@ -42,7 +42,7 @@ import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.StreamManager;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.SystemPropertyConfigProvider;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;
public class RpcIntegrationSuite {
@@ -53,7 +53,7 @@ public class RpcIntegrationSuite {
@BeforeClass
public static void setUp() throws Exception {
TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);
rpcHandler = new RpcHandler() {
@Override
public void receive(

@@ -47,7 +47,7 @@ import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.StreamManager;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.util.SystemPropertyConfigProvider;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;
public class StreamSuite {
@@ -91,7 +91,7 @@ public class StreamSuite {
fp.close();
}
final TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
final TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);
final StreamManager streamManager = new StreamManager() {
@Override
public ManagedBuffer getChunk(long streamId, int chunkIndex) {

@@ -40,7 +40,7 @@ import org.apache.spark.network.server.NoOpRpcHandler;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.util.ConfigProvider;
import org.apache.spark.network.util.SystemPropertyConfigProvider;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;
@@ -53,7 +53,7 @@ public class TransportClientFactorySuite {
@Before
public void setUp() {
conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);
RpcHandler rpcHandler = new NoOpRpcHandler();
context = new TransportContext(conf, rpcHandler);
server1 = context.createServer();
@@ -199,6 +199,11 @@ public class TransportClientFactorySuite {
}
return value;
}
@Override
public Iterable<Map.Entry<String, String>> getAll() {
throw new UnsupportedOperationException();
}
});
TransportContext context = new TransportContext(conf, new NoOpRpcHandler(), true);
TransportClientFactory factory = context.createClientFactory();

@@ -24,7 +24,9 @@ import java.io.File;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
@@ -32,6 +34,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.security.sasl.SaslException;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
@@ -60,7 +63,7 @@ import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.server.TransportServerBootstrap;
import org.apache.spark.network.util.ByteArrayWritableChannel;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.SystemPropertyConfigProvider;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;
/**
@@ -225,7 +228,7 @@ public class SparkSaslSuite {
public void testEncryptedMessageChunking() throws Exception {
File file = File.createTempFile("sasltest", ".txt");
try {
TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);
byte[] data = new byte[8 * 1024];
new Random().nextBytes(data);
@@ -253,14 +256,14 @@ public class SparkSaslSuite {
@Test
public void testFileRegionEncryption() throws Exception {
final String blockSizeConf = "spark.network.sasl.maxEncryptedBlockSize";
System.setProperty(blockSizeConf, "1k");
final Map<String, String> testConf = ImmutableMap.of(
"spark.network.sasl.maxEncryptedBlockSize", "1k");
final AtomicReference<ManagedBuffer> response = new AtomicReference<>();
final File file = File.createTempFile("sasltest", ".txt");
SaslTestCtx ctx = null;
try {
final TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
final TransportConf conf = new TransportConf("shuffle", new MapConfigProvider(testConf));
StreamManager sm = mock(StreamManager.class);
when(sm.getChunk(anyLong(), anyInt())).thenAnswer(new Answer<ManagedBuffer>() {
@Override
@@ -276,7 +279,7 @@ public class SparkSaslSuite {
new Random().nextBytes(data);
Files.write(data, file);
ctx = new SaslTestCtx(rpcHandler, true, false, false);
ctx = new SaslTestCtx(rpcHandler, true, false, false, testConf);
final CountDownLatch lock = new CountDownLatch(1);
@@ -307,18 +310,15 @@ public class SparkSaslSuite {
if (response.get() != null) {
response.get().release();
}
System.clearProperty(blockSizeConf);
}
}
@Test
public void testServerAlwaysEncrypt() throws Exception {
final String alwaysEncryptConfName = "spark.network.sasl.serverAlwaysEncrypt";
System.setProperty(alwaysEncryptConfName, "true");
SaslTestCtx ctx = null;
try {
ctx = new SaslTestCtx(mock(RpcHandler.class), false, false, false);
ctx = new SaslTestCtx(mock(RpcHandler.class), false, false, false,
ImmutableMap.of("spark.network.sasl.serverAlwaysEncrypt", "true"));
fail("Should have failed to connect without encryption.");
} catch (Exception e) {
assertTrue(e.getCause() instanceof SaslException);
@@ -326,7 +326,6 @@ public class SparkSaslSuite {
if (ctx != null) {
ctx.close();
}
System.clearProperty(alwaysEncryptConfName);
}
}
@@ -381,7 +380,7 @@ public class SparkSaslSuite {
final File file = File.createTempFile("sasltest", ".txt");
SaslTestCtx ctx = null;
try {
final TransportConf conf = new TransportConf("rpc", new SystemPropertyConfigProvider());
final TransportConf conf = new TransportConf("rpc", MapConfigProvider.EMPTY);
final TransportConf spyConf = spy(conf);
doReturn(true).when(spyConf).aesEncryptionEnabled();
@@ -454,7 +453,19 @@ public class SparkSaslSuite {
boolean aesEnable)
throws Exception {
TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
this(rpcHandler, encrypt, disableClientEncryption, aesEnable,
Collections.<String, String>emptyMap());
}
SaslTestCtx(
RpcHandler rpcHandler,
boolean encrypt,
boolean disableClientEncryption,
boolean aesEnable,
Map<String, String> testConf)
throws Exception {
TransportConf conf = new TransportConf("shuffle", new MapConfigProvider(testConf));
if (aesEnable) {
conf = spy(conf);

@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.network.util;
import java.util.Map;
import java.util.Properties;
import com.google.common.collect.ImmutableMap;
import org.junit.Test;
import static org.junit.Assert.*;
public class CryptoUtilsSuite {
@Test
public void testConfConversion() {
String prefix = "my.prefix.commons.config.";
String confKey1 = prefix + "a.b.c";
String confVal1 = "val1";
String cryptoKey1 = CryptoUtils.COMMONS_CRYPTO_CONFIG_PREFIX + "a.b.c";
String confKey2 = prefix.substring(0, prefix.length() - 1) + "A.b.c";
String confVal2 = "val2";
String cryptoKey2 = CryptoUtils.COMMONS_CRYPTO_CONFIG_PREFIX + "A.b.c";
Map<String, String> conf = ImmutableMap.of(
confKey1, confVal1,
confKey2, confVal2);
Properties cryptoConf = CryptoUtils.toCryptoConf(prefix, conf.entrySet());
assertEquals(confVal1, cryptoConf.getProperty(cryptoKey1));
assertFalse(cryptoConf.containsKey(cryptoKey2));
}
}

@@ -55,7 +55,7 @@ import org.apache.spark.network.shuffle.protocol.OpenBlocks;
import org.apache.spark.network.shuffle.protocol.RegisterExecutor;
import org.apache.spark.network.shuffle.protocol.StreamHandle;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.SystemPropertyConfigProvider;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;
public class SaslIntegrationSuite {
@@ -73,7 +73,7 @@ public class SaslIntegrationSuite {
@BeforeClass
public static void beforeAll() throws IOException {
conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);
context = new TransportContext(conf, new TestRpcHandler());
secretKeyHolder = mock(SecretKeyHolder.class);

@@ -25,7 +25,7 @@ import java.nio.charset.StandardCharsets;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.io.CharStreams;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.util.SystemPropertyConfigProvider;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.AppExecId;
import org.junit.AfterClass;
@@ -42,7 +42,7 @@ public class ExternalShuffleBlockResolverSuite {
private static TestShuffleDataContext dataContext;
private static final TransportConf conf =
new TransportConf("shuffle", new SystemPropertyConfigProvider());
new TransportConf("shuffle", MapConfigProvider.EMPTY);
@BeforeClass
public static void beforeAll() throws IOException {

@@ -29,14 +29,14 @@ import org.junit.Test;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.spark.network.util.SystemPropertyConfigProvider;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;
public class ExternalShuffleCleanupSuite {
// Same-thread Executor used to ensure cleanup happens synchronously in test thread.
private Executor sameThreadExecutor = MoreExecutors.sameThreadExecutor();
private TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
private TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);
private static final String SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager";
@Test

@@ -28,6 +28,7 @@ import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.junit.After;
@@ -43,7 +44,7 @@ import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.buffer.NioManagedBuffer;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.util.SystemPropertyConfigProvider;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;
public class ExternalShuffleIntegrationSuite {
@@ -84,7 +85,7 @@ public class ExternalShuffleIntegrationSuite {
dataContext0.create();
dataContext0.insertSortShuffleData(0, 0, exec0Blocks);
conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);
handler = new ExternalShuffleBlockHandler(conf, null);
TransportContext transportContext = new TransportContext(conf, handler);
server = transportContext.createServer();
@@ -115,12 +116,16 @@ public class ExternalShuffleIntegrationSuite {
// Fetch a set of blocks from a pre-registered executor.
private FetchResult fetchBlocks(String execId, String[] blockIds) throws Exception {
return fetchBlocks(execId, blockIds, server.getPort());
return fetchBlocks(execId, blockIds, conf, server.getPort());
}
// Fetch a set of blocks from a pre-registered executor. Connects to the server on the given port,
// to allow connecting to invalid servers.
private FetchResult fetchBlocks(String execId, String[] blockIds, int port) throws Exception {
private FetchResult fetchBlocks(
String execId,
String[] blockIds,
TransportConf clientConf,
int port) throws Exception {
final FetchResult res = new FetchResult();
res.successBlocks = Collections.synchronizedSet(new HashSet<String>());
res.failedBlocks = Collections.synchronizedSet(new HashSet<String>());
@@ -128,7 +133,7 @@ public class ExternalShuffleIntegrationSuite {
final Semaphore requestsRemaining = new Semaphore(0);
ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false, false);
ExternalShuffleClient client = new ExternalShuffleClient(clientConf, null, false, false);
client.init(APP_ID);
client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds,
new BlockFetchingListener() {
@@ -227,16 +232,13 @@ public class ExternalShuffleIntegrationSuite {
@Test
public void testFetchNoServer() throws Exception {
System.setProperty("spark.shuffle.io.maxRetries", "0");
try {
registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
FetchResult execFetch = fetchBlocks("exec-0",
new String[]{"shuffle_1_0_0", "shuffle_1_0_1"}, 1 /* port */);
assertTrue(execFetch.successBlocks.isEmpty());
assertEquals(Sets.newHashSet("shuffle_1_0_0", "shuffle_1_0_1"), execFetch.failedBlocks);
} finally {
System.clearProperty("spark.shuffle.io.maxRetries");
}
TransportConf clientConf = new TransportConf("shuffle",
new MapConfigProvider(ImmutableMap.of("spark.shuffle.io.maxRetries", "0")));
registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
FetchResult execFetch = fetchBlocks("exec-0",
new String[]{"shuffle_1_0_0", "shuffle_1_0_1"}, clientConf, 1 /* port */);
assertTrue(execFetch.successBlocks.isEmpty());
assertEquals(Sets.newHashSet("shuffle_1_0_0", "shuffle_1_0_1"), execFetch.failedBlocks);
}
private void registerExecutor(String executorId, ExecutorShuffleInfo executorInfo)

@@ -33,12 +33,12 @@ import org.apache.spark.network.sasl.SecretKeyHolder;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.server.TransportServerBootstrap;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.util.SystemPropertyConfigProvider;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;
public class ExternalShuffleSecuritySuite {
TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);
TransportServer server;
@Before

@@ -27,8 +27,6 @@ import java.util.Map;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -39,7 +37,7 @@ import static org.mockito.Mockito.*;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.buffer.NioManagedBuffer;
import org.apache.spark.network.util.SystemPropertyConfigProvider;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;
import static org.apache.spark.network.shuffle.RetryingBlockFetcher.BlockFetchStarter;
@@ -53,18 +51,6 @@ public class RetryingBlockFetcherSuite {
ManagedBuffer block1 = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
ManagedBuffer block2 = new NioManagedBuffer(ByteBuffer.wrap(new byte[19]));
@Before
public void beforeEach() {
System.setProperty("spark.shuffle.io.maxRetries", "2");
System.setProperty("spark.shuffle.io.retryWait", "0");
}
@After
public void afterEach() {
System.clearProperty("spark.shuffle.io.maxRetries");
System.clearProperty("spark.shuffle.io.retryWait");
}
@Test
public void testNoFailures() throws IOException {
BlockFetchingListener listener = mock(BlockFetchingListener.class);
@@ -254,7 +240,10 @@ public class RetryingBlockFetcherSuite {
BlockFetchingListener listener)
throws IOException {
TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
MapConfigProvider provider = new MapConfigProvider(ImmutableMap.of(
"spark.shuffle.io.maxRetries", "2",
"spark.shuffle.io.retryWait", "0"));
TransportConf conf = new TransportConf("shuffle", provider);
BlockFetchStarter fetchStarter = mock(BlockFetchStarter.class);
Stubber stub = null;

@@ -17,6 +17,7 @@
package org.apache.spark.network.yarn.util;
import java.util.Map;
import java.util.NoSuchElementException;
import org.apache.hadoop.conf.Configuration;
@@ -39,4 +40,10 @@ public class HadoopConfigProvider extends ConfigProvider {
}
return value;
}
@Override
public Iterable<Map.Entry<String, String>> getAll() {
return conf;
}
}

@@ -17,6 +17,8 @@
package org.apache.spark.network.netty
import scala.collection.JavaConverters._
import org.apache.spark.SparkConf
import org.apache.spark.network.util.{ConfigProvider, TransportConf}
@@ -58,6 +60,10 @@ object SparkTransportConf {
new TransportConf(module, new ConfigProvider {
override def get(name: String): String = conf.get(name)
override def getAll(): java.lang.Iterable[java.util.Map.Entry[String, String]] = {
conf.getAll.toMap.asJava.entrySet()
}
})
}

@@ -21,12 +21,15 @@ import java.util.Properties
import javax.crypto.KeyGenerator
import javax.crypto.spec.{IvParameterSpec, SecretKeySpec}
import scala.collection.JavaConverters._
import org.apache.commons.crypto.random._
import org.apache.commons.crypto.stream._
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.network.util.CryptoUtils
/**
* A util class for manipulating IO encryption and decryption streams.
@@ -37,8 +40,6 @@ private[spark] object CryptoStreamUtils extends Logging {
val IV_LENGTH_IN_BYTES = 16
// The prefix of IO encryption related configurations in Spark configuration.
val SPARK_IO_ENCRYPTION_COMMONS_CONFIG_PREFIX = "spark.io.encryption.commons.config."
// The prefix for the configurations passing to Apache Commons Crypto library.
val COMMONS_CRYPTO_CONF_PREFIX = "commons.crypto."
/**
* Helper method to wrap `OutputStream` with `CryptoOutputStream` for encryption.
@@ -70,18 +71,9 @@ private[spark] object CryptoStreamUtils extends Logging {
new SecretKeySpec(key, "AES"), new IvParameterSpec(iv))
}
/**
* Get Commons-crypto configurations from Spark configurations identified by prefix.
*/
def toCryptoConf(conf: SparkConf): Properties = {
val props = new Properties()
conf.getAll.foreach { case (k, v) =>
if (k.startsWith(SPARK_IO_ENCRYPTION_COMMONS_CONFIG_PREFIX)) {
props.put(COMMONS_CRYPTO_CONF_PREFIX + k.substring(
SPARK_IO_ENCRYPTION_COMMONS_CONFIG_PREFIX.length()), v)
}
}
props
CryptoUtils.toCryptoConf(SPARK_IO_ENCRYPTION_COMMONS_CONFIG_PREFIX,
conf.getAll.toMap.asJava.entrySet())
}
/**

@@ -24,6 +24,7 @@ import com.google.common.io.ByteStreams
import org.apache.spark._
import org.apache.spark.internal.config._
import org.apache.spark.network.util.CryptoUtils
import org.apache.spark.security.CryptoStreamUtils._
import org.apache.spark.serializer.{JavaSerializer, SerializerManager}
import org.apache.spark.storage.TempShuffleBlockId
@@ -33,11 +34,11 @@ class CryptoStreamUtilsSuite extends SparkFunSuite {
test("crypto configuration conversion") {
val sparkKey1 = s"${SPARK_IO_ENCRYPTION_COMMONS_CONFIG_PREFIX}a.b.c"
val sparkVal1 = "val1"
val cryptoKey1 = s"${COMMONS_CRYPTO_CONF_PREFIX}a.b.c"
val cryptoKey1 = s"${CryptoUtils.COMMONS_CRYPTO_CONFIG_PREFIX}a.b.c"
val sparkKey2 = SPARK_IO_ENCRYPTION_COMMONS_CONFIG_PREFIX.stripSuffix(".") + "A.b.c"
val sparkVal2 = "val2"
val cryptoKey2 = s"${COMMONS_CRYPTO_CONF_PREFIX}A.b.c"
val cryptoKey2 = s"${CryptoUtils.COMMONS_CRYPTO_CONFIG_PREFIX}A.b.c"
val conf = new SparkConf()
conf.set(sparkKey1, sparkVal1)
conf.set(sparkKey2, sparkVal2)

@@ -1558,14 +1558,15 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
<td><code>spark.authenticate.encryption.aes.enabled</code></td>
<td><code>spark.network.aes.enabled</code></td>
<td>false</td>
<td>
Enable AES for over-the-wire encryption
Enable AES for over-the-wire encryption. This is supported for RPC and the block transfer service.
This option has precedence over SASL-based encryption if both are enabled.
</td>
</tr>
<tr>
<td><code>spark.authenticate.encryption.aes.cipher.keySize</code></td>
<td><code>spark.network.aes.keySize</code></td>
<td>16</td>
<td>
The bytes of AES cipher key which is effective when AES cipher is enabled. AES
@@ -1573,14 +1574,12 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
<td><code>spark.authenticate.encryption.aes.cipher.class</code></td>
<td>null</td>
<td><code>spark.network.aes.config.*</code></td>
<td>None</td>
<td>
Specify the underlying implementation class of crypto cipher. Set null here to use default.
In order to use OpenSslCipher users should install openssl. Currently, there are two cipher
classes available in Commons Crypto library:
org.apache.commons.crypto.cipher.OpenSslCipher
org.apache.commons.crypto.cipher.JceCipher
Configuration values for the commons-crypto library, such as which cipher implementations to
use. The config name should be the name of commons-crypto configuration without the
"commons.crypto" prefix.
</td>
</tr>
<tr>
@@ -1658,7 +1657,7 @@ Apart from these, the following properties are also available, and may be useful
</tr>
</table>
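
As a sketch, the new keys could be set in spark-defaults.conf like this (values illustrative); the config.* entry is translated to the commons-crypto property commons.crypto.cipher.classes:

```
spark.network.aes.enabled                  true
spark.network.aes.keySize                  32
spark.network.aes.config.cipher.classes    org.apache.commons.crypto.cipher.OpenSslCipher
```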
#### Encryption
#### TLS / SSL
<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>