diff --git a/common/network-common/src/main/java/org/apache/spark/network/sasl/aes/AesCipher.java b/common/network-common/src/main/java/org/apache/spark/network/sasl/aes/AesCipher.java
index 78034a69f7..340986a63b 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/sasl/aes/AesCipher.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/sasl/aes/AesCipher.java
@@ -60,7 +60,7 @@ public class AesCipher {
   private final Properties properties;
 
   public AesCipher(AesConfigMessage configMessage, TransportConf conf) throws IOException {
-    this.properties = CryptoStreamUtils.toCryptoConf(conf);
+    this.properties = conf.cryptoConf();
     this.inKeySpec = new SecretKeySpec(configMessage.inKey, "AES");
     this.inIvSpec = new IvParameterSpec(configMessage.inIv);
     this.outKeySpec = new SecretKeySpec(configMessage.outKey, "AES");
@@ -105,7 +105,7 @@ public class AesCipher {
    */
   public static AesConfigMessage createConfigMessage(TransportConf conf) {
     int keySize = conf.aesCipherKeySize();
-    Properties properties = CryptoStreamUtils.toCryptoConf(conf);
+    Properties properties = conf.cryptoConf();
 
     try {
       int paramLen = CryptoCipherFactory.getCryptoCipher(AesCipher.TRANSFORM, properties)
@@ -128,19 +128,6 @@ public class AesCipher {
     }
   }
 
-  /**
-   * CryptoStreamUtils is used to convert config from TransportConf to AES Crypto config.
-   */
-  private static class CryptoStreamUtils {
-    public static Properties toCryptoConf(TransportConf conf) {
-      Properties props = new Properties();
-      if (conf.aesCipherClass() != null) {
-        props.setProperty(CryptoCipherFactory.CLASSES_KEY, conf.aesCipherClass());
-      }
-      return props;
-    }
-  }
-
   private static class AesEncryptHandler extends ChannelOutboundHandlerAdapter {
     private final ByteArrayWritableChannel byteChannel;
     private final CryptoOutputStream cos;
diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/ConfigProvider.java b/common/network-common/src/main/java/org/apache/spark/network/util/ConfigProvider.java
index d944d9da1c..f6aef499b2 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/ConfigProvider.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/ConfigProvider.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.network.util;
 
+import java.util.Map;
 import java.util.NoSuchElementException;
 
 /**
@@ -26,6 +27,9 @@ public abstract class ConfigProvider {
   /** Obtains the value of the given config, throws NoSuchElementException if it doesn't exist. */
   public abstract String get(String name);
 
+  /** Returns all the config values in the provider. */
+  public abstract Iterable<Map.Entry<String, String>> getAll();
+
   public String get(String name, String defaultValue) {
     try {
       return get(name);
@@ -49,4 +53,5 @@
   public boolean getBoolean(String name, boolean defaultValue) {
     return Boolean.parseBoolean(get(name, Boolean.toString(defaultValue)));
   }
+
 }
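Note on the change above: the new abstract `getAll()` makes full enumeration part of the `ConfigProvider` contract, which is what lets `TransportConf` scan for prefixed crypto settings further down. A minimal sketch of what an implementor now looks like — the class name and the sample entry are hypothetical, not part of this patch:

```java
import java.util.Map;
import java.util.NoSuchElementException;

import com.google.common.collect.ImmutableMap;

import org.apache.spark.network.util.ConfigProvider;

// Hypothetical provider: point lookups via get(), enumeration via getAll().
class FixedConfigProvider extends ConfigProvider {
  private final Map<String, String> entries =
    ImmutableMap.of("spark.network.aes.enabled", "true");

  @Override
  public String get(String name) {
    String value = entries.get(name);
    if (value == null) {
      throw new NoSuchElementException(name);
    }
    return value;
  }

  @Override
  public Iterable<Map.Entry<String, String>> getAll() {
    return entries.entrySet();
  }
}
```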
diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/CryptoUtils.java b/common/network-common/src/main/java/org/apache/spark/network/util/CryptoUtils.java
new file mode 100644
index 0000000000..a6d8358ee9
--- /dev/null
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/CryptoUtils.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.util;
+
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Utility methods related to the commons-crypto library.
+ */
+public class CryptoUtils {
+
+  // The prefix for the configurations passing to Apache Commons Crypto library.
+  public static final String COMMONS_CRYPTO_CONFIG_PREFIX = "commons.crypto.";
+
+  /**
+   * Extract the commons-crypto configuration embedded in a list of config values.
+   *
+   * @param prefix Prefix in the given configuration that identifies the commons-crypto configs.
+   * @param conf List of configuration values.
+   */
+  public static Properties toCryptoConf(String prefix, Iterable<Map.Entry<String, String>> conf) {
+    Properties props = new Properties();
+    for (Map.Entry<String, String> e : conf) {
+      String key = e.getKey();
+      if (key.startsWith(prefix)) {
+        props.setProperty(COMMONS_CRYPTO_CONFIG_PREFIX + key.substring(prefix.length()),
+          e.getValue());
+      }
+    }
+    return props;
+  }
+
+}
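For reference, the prefix translation implemented above can be exercised like this — a minimal sketch, with map contents made up for the example: entries under the given prefix are re-rooted under `commons.crypto.`, and everything else is ignored.

```java
import java.util.Map;
import java.util.Properties;

import com.google.common.collect.ImmutableMap;

import org.apache.spark.network.util.CryptoUtils;

public class ToCryptoConfDemo {
  public static void main(String[] args) {
    Map<String, String> conf = ImmutableMap.of(
      "spark.network.aes.config.cipher.classes", "org.apache.commons.crypto.cipher.JceCipher",
      "spark.network.aes.enabled", "true");

    Properties props = CryptoUtils.toCryptoConf("spark.network.aes.config.", conf.entrySet());

    // Prints: commons.crypto.cipher.classes = org.apache.commons.crypto.cipher.JceCipher
    // The "spark.network.aes.enabled" entry does not match the prefix and is dropped.
    for (String key : props.stringPropertyNames()) {
      System.out.println(key + " = " + props.getProperty(key));
    }
  }
}
```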
diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/MapConfigProvider.java b/common/network-common/src/main/java/org/apache/spark/network/util/MapConfigProvider.java
index 668d2356b9..b6667998b5 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/MapConfigProvider.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/MapConfigProvider.java
@@ -19,11 +19,16 @@ package org.apache.spark.network.util;
 
 import com.google.common.collect.Maps;
 
+import java.util.Collections;
 import java.util.Map;
 import java.util.NoSuchElementException;
 
 /** ConfigProvider based on a Map (copied in the constructor). */
 public class MapConfigProvider extends ConfigProvider {
+
+  public static final MapConfigProvider EMPTY = new MapConfigProvider(
+    Collections.<String, String>emptyMap());
+
   private final Map<String, String> config;
 
   public MapConfigProvider(Map<String, String> config) {
@@ -38,4 +43,10 @@
     }
     return value;
   }
+
+  @Override
+  public Iterable<Map.Entry<String, String>> getAll() {
+    return config.entrySet();
+  }
+
 }
diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/SystemPropertyConfigProvider.java b/common/network-common/src/main/java/org/apache/spark/network/util/SystemPropertyConfigProvider.java
deleted file mode 100644
index f15ec8d294..0000000000
--- a/common/network-common/src/main/java/org/apache/spark/network/util/SystemPropertyConfigProvider.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.util;
-
-import java.util.NoSuchElementException;
-
-/** Uses System properties to obtain config values. */
-public class SystemPropertyConfigProvider extends ConfigProvider {
-  @Override
-  public String get(String name) {
-    String value = System.getProperty(name);
-    if (value == null) {
-      throw new NoSuchElementException(name);
-    }
-    return value;
-  }
-}
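Deleting `SystemPropertyConfigProvider` is safe because every call site can now express its settings locally through `MapConfigProvider`. A sketch of the replacement pattern used throughout the test changes below (the values are illustrative):

```java
import com.google.common.collect.ImmutableMap;

import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;

public class TestConfSketch {
  static TransportConf defaultConf() {
    // No overrides at all, instead of whatever happens to be sitting in the
    // JVM-wide system properties.
    return new TransportConf("shuffle", MapConfigProvider.EMPTY);
  }

  static TransportConf tunedConf() {
    // Per-test overrides stay scoped to this conf object; there is no
    // System.clearProperty() cleanup to forget in a finally block.
    return new TransportConf("shuffle",
      new MapConfigProvider(ImmutableMap.of("spark.shuffle.io.maxRetries", "0")));
  }
}
```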
diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
index fa050363b1..223d6d88de 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -17,6 +17,8 @@
 
 package org.apache.spark.network.util;
 
+import java.util.Properties;
+
 import com.google.common.primitives.Ints;
 
 /**
@@ -174,21 +176,22 @@
    * The trigger for enabling AES encryption.
    */
   public boolean aesEncryptionEnabled() {
-    return conf.getBoolean("spark.authenticate.encryption.aes.enabled", false);
+    return conf.getBoolean("spark.network.aes.enabled", false);
   }
 
   /**
-   * The implementation class for crypto cipher
-   */
-  public String aesCipherClass() {
-    return conf.get("spark.authenticate.encryption.aes.cipher.class", null);
-  }
-
-  /**
-   * The bytes of AES cipher key which is effective when AES cipher is enabled. Notice that
-   * the length should be 16, 24 or 32 bytes.
+   * The key size to use when AES cipher is enabled. Notice that the length should be 16, 24 or 32
+   * bytes.
    */
   public int aesCipherKeySize() {
-    return conf.getInt("spark.authenticate.encryption.aes.cipher.keySize", 16);
+    return conf.getInt("spark.network.aes.keySize", 16);
   }
+
+  /**
+   * The commons-crypto configuration for the module.
+   */
+  public Properties cryptoConf() {
+    return CryptoUtils.toCryptoConf("spark.network.aes.config.", conf.getAll());
+  }
+
 }
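Note that the removed `aesCipherClass()` option remains expressible through the new pass-through mechanism: commons-crypto reads its cipher implementation from `commons.crypto.cipher.classes`, so the old `spark.authenticate.encryption.aes.cipher.class` setting becomes `spark.network.aes.config.cipher.classes`. A hedged sketch of the round trip (the key/value below are examples, not defaults):

```java
import java.util.Properties;

import com.google.common.collect.ImmutableMap;

import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;

public class CryptoConfRoundTrip {
  public static void main(String[] args) {
    TransportConf conf = new TransportConf("shuffle", new MapConfigProvider(
      ImmutableMap.of(
        "spark.network.aes.config.cipher.classes",
        "org.apache.commons.crypto.cipher.OpenSslCipher")));

    Properties crypto = conf.cryptoConf();
    // Prints: org.apache.commons.crypto.cipher.OpenSslCipher
    System.out.println(crypto.getProperty("commons.crypto.cipher.classes"));
  }
}
```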
TransportConf("shuffle", new SystemPropertyConfigProvider()); + final TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY); final StreamManager streamManager = new StreamManager() { @Override public ManagedBuffer getChunk(long streamId, int chunkIndex) { diff --git a/common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java b/common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java index 44d16d5422..b594fa66e5 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java @@ -40,7 +40,7 @@ import org.apache.spark.network.server.NoOpRpcHandler; import org.apache.spark.network.server.RpcHandler; import org.apache.spark.network.server.TransportServer; import org.apache.spark.network.util.ConfigProvider; -import org.apache.spark.network.util.SystemPropertyConfigProvider; +import org.apache.spark.network.util.MapConfigProvider; import org.apache.spark.network.util.JavaUtils; import org.apache.spark.network.util.MapConfigProvider; import org.apache.spark.network.util.TransportConf; @@ -53,7 +53,7 @@ public class TransportClientFactorySuite { @Before public void setUp() { - conf = new TransportConf("shuffle", new SystemPropertyConfigProvider()); + conf = new TransportConf("shuffle", MapConfigProvider.EMPTY); RpcHandler rpcHandler = new NoOpRpcHandler(); context = new TransportContext(conf, rpcHandler); server1 = context.createServer(); @@ -199,6 +199,11 @@ public class TransportClientFactorySuite { } return value; } + + @Override + public Iterable> getAll() { + throw new UnsupportedOperationException(); + } }); TransportContext context = new TransportContext(conf, new NoOpRpcHandler(), true); TransportClientFactory factory = context.createClientFactory(); diff --git a/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java b/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java index ef2ab34b22..e27301f49e 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java @@ -24,7 +24,9 @@ import java.io.File; import java.lang.reflect.Method; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeoutException; @@ -32,6 +34,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import javax.security.sasl.SaslException; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.io.ByteStreams; import com.google.common.io.Files; @@ -60,7 +63,7 @@ import org.apache.spark.network.server.TransportServer; import org.apache.spark.network.server.TransportServerBootstrap; import org.apache.spark.network.util.ByteArrayWritableChannel; import org.apache.spark.network.util.JavaUtils; -import org.apache.spark.network.util.SystemPropertyConfigProvider; +import org.apache.spark.network.util.MapConfigProvider; import org.apache.spark.network.util.TransportConf; /** @@ -225,7 +228,7 @@ public class SparkSaslSuite { public void testEncryptedMessageChunking() throws Exception { File file = File.createTempFile("sasltest", ".txt"); try 
diff --git a/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java b/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
index ef2ab34b22..e27301f49e 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
@@ -24,7 +24,9 @@ import java.io.File;
 import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeoutException;
@@ -32,6 +34,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.security.sasl.SaslException;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.io.ByteStreams;
 import com.google.common.io.Files;
@@ -60,7 +63,7 @@ import org.apache.spark.network.server.TransportServer;
 import org.apache.spark.network.server.TransportServerBootstrap;
 import org.apache.spark.network.util.ByteArrayWritableChannel;
 import org.apache.spark.network.util.JavaUtils;
-import org.apache.spark.network.util.SystemPropertyConfigProvider;
+import org.apache.spark.network.util.MapConfigProvider;
 import org.apache.spark.network.util.TransportConf;
 
 /**
@@ -225,7 +228,7 @@
   public void testEncryptedMessageChunking() throws Exception {
     File file = File.createTempFile("sasltest", ".txt");
     try {
-      TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
+      TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);
 
       byte[] data = new byte[8 * 1024];
       new Random().nextBytes(data);
@@ -253,14 +256,14 @@
 
   @Test
   public void testFileRegionEncryption() throws Exception {
-    final String blockSizeConf = "spark.network.sasl.maxEncryptedBlockSize";
-    System.setProperty(blockSizeConf, "1k");
+    final Map<String, String> testConf = ImmutableMap.of(
+      "spark.network.sasl.maxEncryptedBlockSize", "1k");
 
     final AtomicReference<ManagedBuffer> response = new AtomicReference<>();
     final File file = File.createTempFile("sasltest", ".txt");
     SaslTestCtx ctx = null;
     try {
-      final TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
+      final TransportConf conf = new TransportConf("shuffle", new MapConfigProvider(testConf));
       StreamManager sm = mock(StreamManager.class);
       when(sm.getChunk(anyLong(), anyInt())).thenAnswer(new Answer<ManagedBuffer>() {
         @Override
@@ -276,7 +279,7 @@
       new Random().nextBytes(data);
       Files.write(data, file);
 
-      ctx = new SaslTestCtx(rpcHandler, true, false, false);
+      ctx = new SaslTestCtx(rpcHandler, true, false, false, testConf);
 
       final CountDownLatch lock = new CountDownLatch(1);
 
@@ -307,18 +310,15 @@
       if (response.get() != null) {
         response.get().release();
       }
-      System.clearProperty(blockSizeConf);
     }
   }
 
   @Test
   public void testServerAlwaysEncrypt() throws Exception {
-    final String alwaysEncryptConfName = "spark.network.sasl.serverAlwaysEncrypt";
-    System.setProperty(alwaysEncryptConfName, "true");
-
     SaslTestCtx ctx = null;
     try {
-      ctx = new SaslTestCtx(mock(RpcHandler.class), false, false, false);
+      ctx = new SaslTestCtx(mock(RpcHandler.class), false, false, false,
+        ImmutableMap.of("spark.network.sasl.serverAlwaysEncrypt", "true"));
       fail("Should have failed to connect without encryption.");
     } catch (Exception e) {
       assertTrue(e.getCause() instanceof SaslException);
@@ -326,7 +326,6 @@
       if (ctx != null) {
        ctx.close();
       }
-      System.clearProperty(alwaysEncryptConfName);
     }
   }
 
@@ -381,7 +380,7 @@
     final File file = File.createTempFile("sasltest", ".txt");
     SaslTestCtx ctx = null;
     try {
-      final TransportConf conf = new TransportConf("rpc", new SystemPropertyConfigProvider());
+      final TransportConf conf = new TransportConf("rpc", MapConfigProvider.EMPTY);
       final TransportConf spyConf = spy(conf);
       doReturn(true).when(spyConf).aesEncryptionEnabled();
 
@@ -454,7 +453,19 @@
         boolean aesEnable)
       throws Exception {
 
-      TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
+      this(rpcHandler, encrypt, disableClientEncryption, aesEnable,
+        Collections.<String, String>emptyMap());
+    }
+
+    SaslTestCtx(
+        RpcHandler rpcHandler,
+        boolean encrypt,
+        boolean disableClientEncryption,
+        boolean aesEnable,
+        Map<String, String> testConf)
+      throws Exception {
+
+      TransportConf conf = new TransportConf("shuffle", new MapConfigProvider(testConf));
 
       if (aesEnable) {
         conf = spy(conf);
diff --git a/common/network-common/src/test/java/org/apache/spark/network/util/CryptoUtilsSuite.java b/common/network-common/src/test/java/org/apache/spark/network/util/CryptoUtilsSuite.java
new file mode 100644
index 0000000000..2b45d1e397
--- /dev/null
+++ b/common/network-common/src/test/java/org/apache/spark/network/util/CryptoUtilsSuite.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.util;
+
+import java.util.Map;
+import java.util.Properties;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class CryptoUtilsSuite {
+
+  @Test
+  public void testConfConversion() {
+    String prefix = "my.prefix.commons.config.";
+
+    String confKey1 = prefix + "a.b.c";
+    String confVal1 = "val1";
+    String cryptoKey1 = CryptoUtils.COMMONS_CRYPTO_CONFIG_PREFIX + "a.b.c";
+
+    String confKey2 = prefix.substring(0, prefix.length() - 1) + "A.b.c";
+    String confVal2 = "val2";
+    String cryptoKey2 = CryptoUtils.COMMONS_CRYPTO_CONFIG_PREFIX + "A.b.c";
+
+    Map<String, String> conf = ImmutableMap.of(
+      confKey1, confVal1,
+      confKey2, confVal2);
+
+    Properties cryptoConf = CryptoUtils.toCryptoConf(prefix, conf.entrySet());
+
+    assertEquals(confVal1, cryptoConf.getProperty(cryptoKey1));
+    assertFalse(cryptoConf.containsKey(cryptoKey2));
+  }
+
+}
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
index 6ba937dddb..298a487ebb 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
@@ -55,7 +55,7 @@ import org.apache.spark.network.shuffle.protocol.OpenBlocks;
 import org.apache.spark.network.shuffle.protocol.RegisterExecutor;
 import org.apache.spark.network.shuffle.protocol.StreamHandle;
 import org.apache.spark.network.util.JavaUtils;
-import org.apache.spark.network.util.SystemPropertyConfigProvider;
+import org.apache.spark.network.util.MapConfigProvider;
 import org.apache.spark.network.util.TransportConf;
 
 public class SaslIntegrationSuite {
@@ -73,7 +73,7 @@
 
   @BeforeClass
   public static void beforeAll() throws IOException {
-    conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
+    conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);
     context = new TransportContext(conf, new TestRpcHandler());
 
     secretKeyHolder = mock(SecretKeyHolder.class);
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
index 35d6346474..bc97594903 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
@@ -25,7 +25,7 @@ import java.nio.charset.StandardCharsets;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.io.CharStreams;
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
-import org.apache.spark.network.util.SystemPropertyConfigProvider;
+import org.apache.spark.network.util.MapConfigProvider;
 import org.apache.spark.network.util.TransportConf;
 import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.AppExecId;
 import org.junit.AfterClass;
@@ -42,7 +42,7 @@
   private static TestShuffleDataContext dataContext;
 
   private static final TransportConf conf =
-      new TransportConf("shuffle", new SystemPropertyConfigProvider());
+      new TransportConf("shuffle", MapConfigProvider.EMPTY);
 
   @BeforeClass
   public static void beforeAll() throws IOException {
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
index bdd218db69..7757500b41 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
@@ -29,14 +29,14 @@ import org.junit.Test;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import org.apache.spark.network.util.SystemPropertyConfigProvider;
+import org.apache.spark.network.util.MapConfigProvider;
 import org.apache.spark.network.util.TransportConf;
 
 public class ExternalShuffleCleanupSuite {
 
   // Same-thread Executor used to ensure cleanup happens synchronously in test thread.
   private Executor sameThreadExecutor = MoreExecutors.sameThreadExecutor();
-  private TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
+  private TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);
   private static final String SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager";
 
   @Test
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
index 552b5366c5..8dd97b29eb 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
@@ -28,6 +28,7 @@ import java.util.Set;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.junit.After;
@@ -43,7 +44,7 @@ import org.apache.spark.network.buffer.ManagedBuffer;
 import org.apache.spark.network.buffer.NioManagedBuffer;
 import org.apache.spark.network.server.TransportServer;
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
-import org.apache.spark.network.util.SystemPropertyConfigProvider;
+import org.apache.spark.network.util.MapConfigProvider;
 import org.apache.spark.network.util.TransportConf;
 
 public class ExternalShuffleIntegrationSuite {
@@ -84,7 +85,7 @@
     dataContext0.create();
     dataContext0.insertSortShuffleData(0, 0, exec0Blocks);
 
-    conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
+    conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);
     handler = new ExternalShuffleBlockHandler(conf, null);
     TransportContext transportContext = new TransportContext(conf, handler);
     server = transportContext.createServer();
@@ -115,12 +116,16 @@
 
   // Fetch a set of blocks from a pre-registered executor.
   private FetchResult fetchBlocks(String execId, String[] blockIds) throws Exception {
-    return fetchBlocks(execId, blockIds, server.getPort());
+    return fetchBlocks(execId, blockIds, conf, server.getPort());
   }
 
   // Fetch a set of blocks from a pre-registered executor. Connects to the server on the given port,
   // to allow connecting to invalid servers.
-  private FetchResult fetchBlocks(String execId, String[] blockIds, int port) throws Exception {
+  private FetchResult fetchBlocks(
+      String execId,
+      String[] blockIds,
+      TransportConf clientConf,
+      int port) throws Exception {
     final FetchResult res = new FetchResult();
     res.successBlocks = Collections.synchronizedSet(new HashSet<String>());
     res.failedBlocks = Collections.synchronizedSet(new HashSet<String>());
@@ -128,7 +133,7 @@
 
     final Semaphore requestsRemaining = new Semaphore(0);
 
-    ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false, false);
+    ExternalShuffleClient client = new ExternalShuffleClient(clientConf, null, false, false);
     client.init(APP_ID);
     client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds,
       new BlockFetchingListener() {
@@ -227,16 +232,13 @@
 
   @Test
   public void testFetchNoServer() throws Exception {
-    System.setProperty("spark.shuffle.io.maxRetries", "0");
-    try {
-      registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
-      FetchResult execFetch = fetchBlocks("exec-0",
-        new String[]{"shuffle_1_0_0", "shuffle_1_0_1"}, 1 /* port */);
-      assertTrue(execFetch.successBlocks.isEmpty());
-      assertEquals(Sets.newHashSet("shuffle_1_0_0", "shuffle_1_0_1"), execFetch.failedBlocks);
-    } finally {
-      System.clearProperty("spark.shuffle.io.maxRetries");
-    }
+    TransportConf clientConf = new TransportConf("shuffle",
+      new MapConfigProvider(ImmutableMap.of("spark.shuffle.io.maxRetries", "0")));
+    registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
+    FetchResult execFetch = fetchBlocks("exec-0",
+      new String[]{"shuffle_1_0_0", "shuffle_1_0_1"}, clientConf, 1 /* port */);
+    assertTrue(execFetch.successBlocks.isEmpty());
+    assertEquals(Sets.newHashSet("shuffle_1_0_0", "shuffle_1_0_1"), execFetch.failedBlocks);
   }
 
   private void registerExecutor(String executorId, ExecutorShuffleInfo executorInfo)
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
index a0f69ca29a..aed25a161e 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
@@ -33,12 +33,12 @@ import org.apache.spark.network.sasl.SecretKeyHolder;
 import org.apache.spark.network.server.TransportServer;
 import org.apache.spark.network.server.TransportServerBootstrap;
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
-import org.apache.spark.network.util.SystemPropertyConfigProvider;
+import org.apache.spark.network.util.MapConfigProvider;
 import org.apache.spark.network.util.TransportConf;
 
 public class ExternalShuffleSecuritySuite {
 
-  TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
+  TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);
   TransportServer server;
 
   @Before
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
index 91882e3b3b..a2509f5f34 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
@@ -27,8 +27,6 @@ import java.util.Map;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -39,7 +37,7 @@ import static org.mockito.Mockito.*;
 
 import org.apache.spark.network.buffer.ManagedBuffer;
 import org.apache.spark.network.buffer.NioManagedBuffer;
-import org.apache.spark.network.util.SystemPropertyConfigProvider;
+import org.apache.spark.network.util.MapConfigProvider;
 import org.apache.spark.network.util.TransportConf;
 
 import static org.apache.spark.network.shuffle.RetryingBlockFetcher.BlockFetchStarter;
@@ -53,18 +51,6 @@ public class RetryingBlockFetcherSuite {
   ManagedBuffer block1 = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
   ManagedBuffer block2 = new NioManagedBuffer(ByteBuffer.wrap(new byte[19]));
 
-  @Before
-  public void beforeEach() {
-    System.setProperty("spark.shuffle.io.maxRetries", "2");
-    System.setProperty("spark.shuffle.io.retryWait", "0");
-  }
-
-  @After
-  public void afterEach() {
-    System.clearProperty("spark.shuffle.io.maxRetries");
-    System.clearProperty("spark.shuffle.io.retryWait");
-  }
-
   @Test
   public void testNoFailures() throws IOException {
     BlockFetchingListener listener = mock(BlockFetchingListener.class);
@@ -254,7 +240,10 @@
       BlockFetchingListener listener)
     throws IOException {
 
-    TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
+    MapConfigProvider provider = new MapConfigProvider(ImmutableMap.of(
+      "spark.shuffle.io.maxRetries", "2",
+      "spark.shuffle.io.retryWait", "0"));
+    TransportConf conf = new TransportConf("shuffle", provider);
     BlockFetchStarter fetchStarter = mock(BlockFetchStarter.class);
 
     Stubber stub = null;
diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/util/HadoopConfigProvider.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/util/HadoopConfigProvider.java
index 884861752e..62a6cca4ed 100644
--- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/util/HadoopConfigProvider.java
+++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/util/HadoopConfigProvider.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.network.yarn.util;
 
+import java.util.Map;
 import java.util.NoSuchElementException;
 
 import org.apache.hadoop.conf.Configuration;
@@ -39,4 +40,10 @@
     }
     return value;
   }
+
+  @Override
+  public Iterable<Map.Entry<String, String>> getAll() {
+    return conf;
+  }
+
 }
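The one-line `getAll()` body above works because Hadoop's `Configuration` already implements `Iterable<Map.Entry<String, String>>`. A standalone illustration of that property (not part of the patch; the key set here is just an example):

```java
import java.util.Map;

import org.apache.hadoop.conf.Configuration;

public class HadoopConfIteration {
  public static void main(String[] args) {
    // 'false' skips loading the default resources, so only our entry shows up.
    Configuration conf = new Configuration(false);
    conf.set("spark.network.aes.enabled", "true");

    // Iterating a Configuration yields Map.Entry<String, String> pairs,
    // which is exactly the shape ConfigProvider.getAll() requires.
    for (Map.Entry<String, String> e : conf) {
      System.out.println(e.getKey() + " = " + e.getValue());
    }
  }
}
```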
diff --git a/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala b/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
index 86874e2067..df520f804b 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.network.netty
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.SparkConf
 import org.apache.spark.network.util.{ConfigProvider, TransportConf}
 
@@ -58,6 +60,10 @@ object SparkTransportConf {
 
     new TransportConf(module, new ConfigProvider {
       override def get(name: String): String = conf.get(name)
+
+      override def getAll(): java.lang.Iterable[java.util.Map.Entry[String, String]] = {
+        conf.getAll.toMap.asJava.entrySet()
+      }
     })
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala b/core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala
index 8e3436f134..cdd3b8d851 100644
--- a/core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala
+++ b/core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala
@@ -21,12 +21,15 @@ import java.util.Properties
 import javax.crypto.KeyGenerator
 import javax.crypto.spec.{IvParameterSpec, SecretKeySpec}
 
+import scala.collection.JavaConverters._
+
 import org.apache.commons.crypto.random._
 import org.apache.commons.crypto.stream._
 
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
+import org.apache.spark.network.util.CryptoUtils
 
 /**
  * A util class for manipulating IO encryption and decryption streams.
@@ -37,8 +40,6 @@ private[spark] object CryptoStreamUtils extends Logging {
   val IV_LENGTH_IN_BYTES = 16
   // The prefix of IO encryption related configurations in Spark configuration.
   val SPARK_IO_ENCRYPTION_COMMONS_CONFIG_PREFIX = "spark.io.encryption.commons.config."
-  // The prefix for the configurations passing to Apache Commons Crypto library.
-  val COMMONS_CRYPTO_CONF_PREFIX = "commons.crypto."
 
   /**
    * Helper method to wrap `OutputStream` with `CryptoOutputStream` for encryption.
@@ -70,18 +71,9 @@ private[spark] object CryptoStreamUtils extends Logging {
       new SecretKeySpec(key, "AES"), new IvParameterSpec(iv))
   }
 
-  /**
-   * Get Commons-crypto configurations from Spark configurations identified by prefix.
-   */
   def toCryptoConf(conf: SparkConf): Properties = {
-    val props = new Properties()
-    conf.getAll.foreach { case (k, v) =>
-      if (k.startsWith(SPARK_IO_ENCRYPTION_COMMONS_CONFIG_PREFIX)) {
-        props.put(COMMONS_CRYPTO_CONF_PREFIX + k.substring(
-          SPARK_IO_ENCRYPTION_COMMONS_CONFIG_PREFIX.length()), v)
-      }
-    }
-    props
+    CryptoUtils.toCryptoConf(SPARK_IO_ENCRYPTION_COMMONS_CONFIG_PREFIX,
+      conf.getAll.toMap.asJava.entrySet())
   }
 
   /**
diff --git a/core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala b/core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala
index a61ec74c7d..0f3a4a0361 100644
--- a/core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala
@@ -24,6 +24,7 @@ import com.google.common.io.ByteStreams
 
 import org.apache.spark._
 import org.apache.spark.internal.config._
+import org.apache.spark.network.util.CryptoUtils
 import org.apache.spark.security.CryptoStreamUtils._
 import org.apache.spark.serializer.{JavaSerializer, SerializerManager}
 import org.apache.spark.storage.TempShuffleBlockId
@@ -33,11 +34,11 @@ class CryptoStreamUtilsSuite extends SparkFunSuite {
   test("crypto configuration conversion") {
     val sparkKey1 = s"${SPARK_IO_ENCRYPTION_COMMONS_CONFIG_PREFIX}a.b.c"
     val sparkVal1 = "val1"
-    val cryptoKey1 = s"${COMMONS_CRYPTO_CONF_PREFIX}a.b.c"
+    val cryptoKey1 = s"${CryptoUtils.COMMONS_CRYPTO_CONFIG_PREFIX}a.b.c"
 
     val sparkKey2 = SPARK_IO_ENCRYPTION_COMMONS_CONFIG_PREFIX.stripSuffix(".") + "A.b.c"
     val sparkVal2 = "val2"
-    val cryptoKey2 = s"${COMMONS_CRYPTO_CONF_PREFIX}A.b.c"
+    val cryptoKey2 = s"${CryptoUtils.COMMONS_CRYPTO_CONFIG_PREFIX}A.b.c"
     val conf = new SparkConf()
     conf.set(sparkKey1, sparkVal1)
     conf.set(sparkKey2, sparkVal2)
diff --git a/docs/configuration.md b/docs/configuration.md
index b1e731182f..7e466d7dc1 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1558,14 +1558,15 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
-  <td><code>spark.authenticate.encryption.aes.enabled</code></td>
+  <td><code>spark.network.aes.enabled</code></td>
   <td>false</td>
   <td>
-    Enable AES for over-the-wire encryption
+    Enable AES for over-the-wire encryption. This is supported for RPC and the block transfer service.
+    This option has precedence over SASL-based encryption if both are enabled.
   </td>
 </tr>
 <tr>
-  <td><code>spark.authenticate.encryption.aes.cipher.keySize</code></td>
+  <td><code>spark.network.aes.keySize</code></td>
   <td>16</td>
   <td>
     The bytes of AES cipher key which is effective when AES cipher is enabled. AES
@@ -1573,14 +1574,12 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
-  <td><code>spark.authenticate.encryption.aes.cipher.class</code></td>
-  <td>null</td>
+  <td><code>spark.network.aes.config.*</code></td>
+  <td>None</td>
   <td>
-    Specify the underlying implementation class of crypto cipher. Set null here to use default.
-    In order to use OpenSslCipher users should install openssl. Currently, there are two cipher
-    classes available in Commons Crypto library:
-    org.apache.commons.crypto.cipher.OpenSslCipher
-    org.apache.commons.crypto.cipher.JceCipher
+    Configuration values for the commons-crypto library, such as which cipher implementations to
+    use. The config name should be the name of commons-crypto configuration without the
+    "commons.crypto" prefix.
   </td>
 </tr>
 <tr>
@@ -1658,7 +1657,7 @@ Apart from these, the following properties are also available, and may be useful
 </table>
 
-#### Encryption
+#### TLS / SSL
 
 <table class="table">
 <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
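To tie the documentation changes together, an application could enable the renamed options like this — a sketch with example values only (16 bytes is the default key size, and `OpenSslCipher` requires a local OpenSSL installation):

```java
import org.apache.spark.SparkConf;

public class AesConfExample {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
      .set("spark.network.aes.enabled", "true")
      .set("spark.network.aes.keySize", "32")
      // Any "spark.network.aes.config.*" key is forwarded to commons-crypto
      // with the "commons.crypto." prefix restored.
      .set("spark.network.aes.config.cipher.classes",
        "org.apache.commons.crypto.cipher.OpenSslCipher");
  }
}
```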