[SPARK-34029][SQL][TESTS] Add OrcEncryptionSuite and FakeKeyProvider

### What changes were proposed in this pull request?

This is a retry of #31065. Last time, the newly added test cases passed in Jenkins and when run individually, but the PR was reverted because they failed when `GitHub Action` ran with `SERIAL_SBT_TESTS=1`.

In this PR, the `SecurityTest` tag is used to isolate the `KeyProvider`-dependent tests.

This PR aims to lay the groundwork for a columnar encryption test framework by adding `OrcEncryptionSuite` and `FakeKeyProvider`.

Please note that we will make further improvements in both Apache Spark and Apache ORC during the Apache Spark 3.2.0 timeframe.

### Why are the changes needed?

Apache ORC 1.6 supports columnar encryption.

### Does this PR introduce _any_ user-facing change?

No. This is a test-only change.

### How was this patch tested?

Pass the newly added test suite.

Closes #31603 from dongjoon-hyun/SPARK-34486-RETRY.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
Commit 03f4cf5845 (parent 94f9617cb4), authored by Dongjoon Hyun and committed by Dongjoon Hyun on 2021-02-21 15:05:29 -08:00.
6 changed files with 298 additions and 1 deletion.

@@ -61,6 +61,12 @@ jobs:
   excluded-tags: org.apache.spark.tags.SlowHiveTest
   comment: "- other tests"
 # SQL tests
+- modules: sql
+  java: 8
+  hadoop: hadoop3.2
+  hive: hive2.3
+  included-tags: org.apache.spark.tags.SecurityTest
+  comment: "- security tests"
 - modules: sql
   java: 8
   hadoop: hadoop3.2
@@ -71,7 +77,7 @@ jobs:
   java: 8
   hadoop: hadoop3.2
   hive: hive2.3
-  excluded-tags: org.apache.spark.tags.ExtendedSQLTest
+  excluded-tags: org.apache.spark.tags.SecurityTest,org.apache.spark.tags.ExtendedSQLTest
   comment: "- other tests"
 env:
   MODULES_TO_TEST: ${{ matrix.modules }}

@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.tags;

import org.scalatest.TagAnnotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@TagAnnotation
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.TYPE})
public @interface SecurityTest { }
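
For context, ScalaTest treats a Java annotation that is itself marked `@TagAnnotation` (and retained at runtime) as a suite-level tag whose name is the annotation's fully qualified class name; that is what lets the workflow's `included-tags`/`excluded-tags` entries above filter on `org.apache.spark.tags.SecurityTest`. A minimal sketch of how a suite opts in (hypothetical suite, not part of this commit):

```scala
// Hypothetical example: a suite carrying the annotation is tagged with
// "org.apache.spark.tags.SecurityTest" for every test it contains.
import org.apache.spark.tags.SecurityTest
import org.scalatest.funsuite.AnyFunSuite

@SecurityTest
class ExampleSecuritySuite extends AnyFunSuite {
  test("runs only when the SecurityTest tag is included") {
    assert(1 + 1 == 2)
  }
}
```

With this in place, CI-level filtering reduces to ScalaTest's standard include/exclude flags (`-n org.apache.spark.tags.SecurityTest` to run only tagged suites, `-l` to skip them), which is how the matrix entries above take effect.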

@@ -489,6 +489,7 @@ object SparkParallelTestGrouping {
   "org.apache.spark.sql.catalyst.expressions.HashExpressionsSuite",
   "org.apache.spark.sql.catalyst.expressions.CastSuite",
   "org.apache.spark.sql.catalyst.expressions.MathExpressionsSuite",
+  "org.apache.spark.sql.execution.datasources.orc.OrcEncryptionSuite",
   "org.apache.spark.sql.hive.HiveExternalCatalogSuite",
   "org.apache.spark.sql.hive.StatisticsSuite",
   "org.apache.spark.sql.hive.client.VersionsSuite",

@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package test.org.apache.spark.sql.execution.datasources.orc;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.crypto.key.KeyProviderFactory;
import org.apache.hadoop.crypto.key.kms.KMSClientProvider;
/**
 * A Hadoop KeyProvider that lets us test the interaction
 * with the Hadoop code.
 *
 * https://github.com/apache/orc/blob/rel/release-1.6.7/java/tools/src/test/org/apache/orc/impl/FakeKeyProvider.java
 *
 * This file intentionally keeps the original file as-is, except for
 * (1) the package name, (2) the import order, and (3) a few indentation changes.
 */
public class FakeKeyProvider extends KeyProvider {
  // map from key name to metadata
  private final Map<String, TestMetadata> keyMetdata = new HashMap<>();
  // map from key version name to material
  private final Map<String, KeyVersion> keyVersions = new HashMap<>();

  public FakeKeyProvider(Configuration conf) {
    super(conf);
  }

  @Override
  public KeyVersion getKeyVersion(String name) {
    return keyVersions.get(name);
  }

  @Override
  public List<String> getKeys() {
    return new ArrayList<>(keyMetdata.keySet());
  }

  @Override
  public List<KeyVersion> getKeyVersions(String name) {
    List<KeyVersion> result = new ArrayList<>();
    Metadata meta = getMetadata(name);
    for(int v=0; v < meta.getVersions(); ++v) {
      String versionName = buildVersionName(name, v);
      KeyVersion material = keyVersions.get(versionName);
      if (material != null) {
        result.add(material);
      }
    }
    return result;
  }

  @Override
  public Metadata getMetadata(String name) {
    return keyMetdata.get(name);
  }

  @Override
  public KeyVersion createKey(String name, byte[] bytes, Options options) {
    String versionName = buildVersionName(name, 0);
    keyMetdata.put(name, new TestMetadata(options.getCipher(),
        options.getBitLength(), 1));
    KeyVersion result = new KMSClientProvider.KMSKeyVersion(name, versionName, bytes);
    keyVersions.put(versionName, result);
    return result;
  }

  @Override
  public void deleteKey(String name) {
    throw new UnsupportedOperationException("Can't delete keys");
  }

  @Override
  public KeyVersion rollNewVersion(String name, byte[] bytes) {
    TestMetadata key = keyMetdata.get(name);
    String versionName = buildVersionName(name, key.addVersion());
    KeyVersion result = new KMSClientProvider.KMSKeyVersion(name, versionName,
        bytes);
    keyVersions.put(versionName, result);
    return result;
  }

  @Override
  public void flush() {
    // Nothing
  }

  static class TestMetadata extends KeyProvider.Metadata {

    TestMetadata(String cipher, int bitLength, int versions) {
      super(cipher, bitLength, null, null, null, versions);
    }

    public int addVersion() {
      return super.addVersion();
    }
  }

  public static class Factory extends KeyProviderFactory {

    @Override
    public KeyProvider createProvider(URI uri, Configuration conf) throws IOException {
      if ("test".equals(uri.getScheme())) {
        KeyProvider provider = new FakeKeyProvider(conf);
        // populate a couple keys into the provider
        byte[] piiKey = new byte[]{0,1,2,3,4,5,6,7,8,9,0xa,0xb,0xc,0xd,0xe,0xf};
        org.apache.hadoop.crypto.key.KeyProvider.Options aes128 = new KeyProvider.Options(conf);
        provider.createKey("pii", piiKey, aes128);
        byte[] piiKey2 = new byte[]{0x10,0x11,0x12,0x13,0x14,0x15,0x16,0x17,
            0x18,0x19,0x1a,0x1b,0x1c,0x1d,0x1e,0x1f};
        provider.rollNewVersion("pii", piiKey2);
        byte[] secretKey = new byte[]{0x20,0x21,0x22,0x23,0x24,0x25,0x26,0x27,
            0x28,0x29,0x2a,0x2b,0x2c,0x2d,0x2e,0x2f};
        provider.createKey("secret", secretKey, aes128);
        return KeyProviderCryptoExtension.createKeyProviderCryptoExtension(provider);
      }
      return null;
    }
  }
}

@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
test.org.apache.spark.sql.execution.datasources.orc.FakeKeyProvider$Factory
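
For context, Hadoop's `KeyProviderFactory` discovers implementations through `java.util.ServiceLoader`, which reads the `META-INF/services` entry above; `FakeKeyProvider.Factory` then answers only for the `test` URI scheme. A short sketch of how the `test:///` URI used by the suite below resolves to the fake provider (the key names come from `Factory.createProvider` above):

```scala
// Sketch: resolve a KeyProvider the same way ORC/Hadoop do at runtime.
// ServiceLoader finds FakeKeyProvider.Factory via the services file above.
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.crypto.key.KeyProviderFactory

val provider = KeyProviderFactory.get(new URI("test:///"), new Configuration())
assert(provider != null)                    // resolved via FakeKeyProvider.Factory
assert(provider.getKeys.contains("pii"))    // pre-populated by Factory.createProvider
assert(provider.getKeys.contains("secret"))
```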

@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.orc

import org.apache.spark.sql.Row
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.tags.SecurityTest

@SecurityTest
class OrcEncryptionSuite extends OrcTest with SharedSparkSession {
  import testImplicits._

  val originalData = Seq(("123456789", "dongjoon@apache.org", "Dongjoon Hyun"))
  val rowDataWithoutKey =
    Row(null, "841626795E7D351555B835A002E3BF10669DE9B81C95A3D59E10865AC37EA7C3", "Dongjoon Hyun")

  test("Write and read an encrypted file") {
    val df = originalData.toDF("ssn", "email", "name")
    withTempPath { dir =>
      val path = dir.getAbsolutePath
      withSQLConf(
          "hadoop.security.key.provider.path" -> "test:///",
          "orc.key.provider" -> "hadoop",
          "orc.encrypt" -> "pii:ssn,email",
          "orc.mask" -> "nullify:ssn;sha256:email") {
        df.write.mode("overwrite").orc(path)
        checkAnswer(spark.read.orc(path), df)
      }

      withSQLConf(
          "orc.key.provider" -> "memory",
          "orc.encrypt" -> "pii:ssn,email",
          "orc.mask" -> "nullify:ssn;sha256:email") {
        checkAnswer(spark.read.orc(path), rowDataWithoutKey)
      }
    }
  }

  test("Write and read an encrypted table") {
    val df = originalData.toDF("ssn", "email", "name")
    withTempPath { dir =>
      val path = dir.getAbsolutePath
      withTable("encrypted") {
        sql(
          s"""
             |CREATE TABLE encrypted (
             |  ssn STRING,
             |  email STRING,
             |  name STRING
             |)
             |USING ORC
             |LOCATION "$path"
             |OPTIONS (
             |  hadoop.security.key.provider.path "test:///",
             |  orc.key.provider "hadoop",
             |  orc.encrypt "pii:ssn,email",
             |  orc.mask "nullify:ssn;sha256:email"
             |)
             |""".stripMargin)
        sql("INSERT INTO encrypted VALUES('123456789', 'dongjoon@apache.org', 'Dongjoon Hyun')")
        checkAnswer(sql("SELECT * FROM encrypted"), df)
      }

      withTable("normal") {
        sql(
          s"""
             |CREATE TABLE normal (
             |  ssn STRING,
             |  email STRING,
             |  name STRING
             |)
             |USING ORC
             |LOCATION "$path"
             |OPTIONS (
             |  orc.key.provider "memory",
             |  orc.encrypt "pii:ssn,email",
             |  orc.mask "nullify:ssn;sha256:email"
             |)
             |""".stripMargin)
        checkAnswer(sql("SELECT * FROM normal"), rowDataWithoutKey)
      }
    }
  }
}
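
For reference, the masked row asserted in `rowDataWithoutKey` can be reproduced by hand, assuming ORC's `sha256` mask emits the uppercase hex SHA-256 digest of the value's UTF-8 bytes and `nullify` replaces the value with `null`:

```scala
// Sketch under the assumption stated above: recompute the masked email.
import java.security.MessageDigest

val digest = MessageDigest.getInstance("SHA-256")
  .digest("dongjoon@apache.org".getBytes("UTF-8"))
val masked = digest.map("%02X".format(_)).mkString
// Expected to match the constant asserted in the suite:
// "841626795E7D351555B835A002E3BF10669DE9B81C95A3D59E10865AC37EA7C3"
```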