From 529b875901a91a03caeb73d9eb7b3008b552c736 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Thu, 29 Apr 2021 13:55:53 +0900 Subject: [PATCH] [SPARK-35226][SQL] Support refreshKrb5Config option in JDBC datasources ### What changes were proposed in this pull request? This PR proposes to introduce a new JDBC option `refreshKrb5Config` which allows to reflect the change of `krb5.conf`. ### Why are the changes needed? In the current master, JDBC datasources can't accept `refreshKrb5Config` which is defined in `Krb5LoginModule`. So even if we change the `krb5.conf` after establishing a connection, the change will not be reflected. The similar issue happens when we run multiple `*KrbIntegrationSuites` at the same time. `MiniKDC` starts and stops every KerberosIntegrationSuite and different port number is recorded to `krb5.conf`. Due to `SecureConnectionProvider.JDBCConfiguration` doesn't take `refreshKrb5Config`, KerberosIntegrationSuites except the first running one see the wrong port so those suites fail. You can easily confirm with the following command. ``` build/sbt -Phive Phive-thriftserver -Pdocker-integration-tests "testOnly org.apache.spark.sql.jdbc.*KrbIntegrationSuite" ``` ### Does this PR introduce _any_ user-facing change? Yes. Users can set `refreshKrb5Config` to refresh krb5 relevant configuration. ### How was this patch tested? New test. Closes #32344 from sarutak/kerberos-refresh-issue. Authored-by: Kousuke Saruta Signed-off-by: Kousuke Saruta --- docs/sql-data-sources-jdbc.md | 19 +++++++ .../sql/jdbc/DB2KrbIntegrationSuite.scala | 2 +- .../jdbc/DockerKrbJDBCIntegrationSuite.scala | 50 +++++++++++++++++++ .../sql/jdbc/MariaDBKrbIntegrationSuite.scala | 2 +- .../jdbc/PostgresKrbIntegrationSuite.scala | 2 +- .../datasources/jdbc/JDBCOptions.scala | 3 ++ .../connection/SecureConnectionProvider.scala | 9 ++-- 7 files changed, 81 insertions(+), 6 deletions(-) diff --git a/docs/sql-data-sources-jdbc.md b/docs/sql-data-sources-jdbc.md index 3c96264098..90353ef998 100644 --- a/docs/sql-data-sources-jdbc.md +++ b/docs/sql-data-sources-jdbc.md @@ -211,6 +211,25 @@ the following case-insensitive options: Specifies kerberos principal name for the JDBC client. If both keytab and principal are defined then Spark tries to do kerberos authentication. + + + refreshKrb5Config + + This option controls whether the kerberos configuration is to be refreshed or not for the JDBC client before + establishing a new connection. Set to true if you want to refresh the configuration, otherwise set to false. + The default value is false. Note that if you set this option to true and try to establish multiple connections, + a race condition can occur. One possble situation would be like as follows. +
    +
  1. refreshKrb5Config flag is set with security context 1
  2. +
  3. A JDBC connection provider is used for the corresponding DBMS
  4. +
  5. The krb5.conf is modified but the JVM not yet realized that it must be reloaded
  6. +
  7. Spark authenticates successfully for security context 1
  8. +
  9. The JVM loads security context 2 from the modified krb5.conf
  10. +
  11. Spark restores the previously saved security context 1
  12. +
  13. The modified krb5.conf content just gone
  14. +
+ + Note that kerberos authentication with keytab is not always supported by the JDBC driver.
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2KrbIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2KrbIntegrationSuite.scala index 5cbe6fab18..f79809f355 100644 --- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2KrbIntegrationSuite.scala +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2KrbIntegrationSuite.scala @@ -81,7 +81,7 @@ class DB2KrbIntegrationSuite extends DockerKrbJDBCIntegrationSuite { override protected def setAuthentication(keytabFile: String, principal: String): Unit = { val config = new SecureConnectionProvider.JDBCConfiguration( - Configuration.getConfiguration, "JaasClient", keytabFile, principal) + Configuration.getConfiguration, "JaasClient", keytabFile, principal, true) Configuration.setConfiguration(config) } diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerKrbJDBCIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerKrbJDBCIntegrationSuite.scala index c20c006f3b..4a828ae7ee 100644 --- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerKrbJDBCIntegrationSuite.scala +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerKrbJDBCIntegrationSuite.scala @@ -32,6 +32,7 @@ import org.apache.spark.util.{SecurityUtils, Utils} abstract class DockerKrbJDBCIntegrationSuite extends DockerJDBCIntegrationSuite { private var kdc: MiniKdc = _ + private val KRB5_CONF_PROP = "java.security.krb5.conf" protected var entryPointDir: File = _ protected var initDbDir: File = _ protected val userName: String @@ -160,4 +161,53 @@ abstract class DockerKrbJDBCIntegrationSuite extends DockerJDBCIntegrationSuite assert(rows(0).getString(0) === "foo") assert(rows(0).getString(1) === "bar") } + + test("SPARK-35226: JDBCOption should accept refreshKrb5Config parameter") { + // This makes sure Spark must do authentication + Configuration.setConfiguration(null) + withTempDir { dir => + val dummyKrb5Conf = File.createTempFile("dummy", "krb5.conf", dir) + val origKrb5Conf = sys.props(KRB5_CONF_PROP) + try { + // Set dummy krb5.conf and refresh config so this assertion is expected to fail. + // The thrown exception is dependent on the actual JDBC driver class. + intercept[Exception] { + sys.props(KRB5_CONF_PROP) = dummyKrb5Conf.getAbsolutePath + spark.read.format("jdbc") + .option("url", jdbcUrl) + .option("keytab", keytabFullPath) + .option("principal", principal) + .option("refreshKrb5Config", "true") + .option("query", "SELECT 1") + .load() + } + + // Set the authentic krb5.conf but doesn't refresh config + // so this assertion is expected to fail. + intercept[Exception] { + sys.props(KRB5_CONF_PROP) = origKrb5Conf + spark.read.format("jdbc") + .option("url", jdbcUrl) + .option("keytab", keytabFullPath) + .option("principal", principal) + .option("query", "SELECT 1") + .load() + } + + sys.props(KRB5_CONF_PROP) = origKrb5Conf + val df = spark.read.format("jdbc") + .option("url", jdbcUrl) + .option("keytab", keytabFullPath) + .option("principal", principal) + .option("refreshKrb5Config", "true") + .option("query", "SELECT 1") + .load() + val result = df.collect().map(_.getInt(0)) + assert(result.length === 1) + assert(result(0) === 1) + } finally { + sys.props(KRB5_CONF_PROP) = origKrb5Conf + } + } + } } diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MariaDBKrbIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MariaDBKrbIntegrationSuite.scala index 59a6f530af..9b653f81af 100644 --- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MariaDBKrbIntegrationSuite.scala +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MariaDBKrbIntegrationSuite.scala @@ -68,7 +68,7 @@ class MariaDBKrbIntegrationSuite extends DockerKrbJDBCIntegrationSuite { override protected def setAuthentication(keytabFile: String, principal: String): Unit = { val config = new SecureConnectionProvider.JDBCConfiguration( - Configuration.getConfiguration, "Krb5ConnectorContext", keytabFile, principal) + Configuration.getConfiguration, "Krb5ConnectorContext", keytabFile, principal, true) Configuration.setConfiguration(config) } } diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresKrbIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresKrbIntegrationSuite.scala index 984890f22f..1198ba8a3e 100644 --- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresKrbIntegrationSuite.scala +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresKrbIntegrationSuite.scala @@ -61,7 +61,7 @@ class PostgresKrbIntegrationSuite extends DockerKrbJDBCIntegrationSuite { override protected def setAuthentication(keytabFile: String, principal: String): Unit = { val config = new SecureConnectionProvider.JDBCConfiguration( - Configuration.getConfiguration, "pgjdbc", keytabFile, principal) + Configuration.getConfiguration, "pgjdbc", keytabFile, principal, true) Configuration.setConfiguration(config) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala index 85326891dc..97d4f2d976 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala @@ -205,6 +205,8 @@ class JDBCOptions( val principal = parameters.getOrElse(JDBC_PRINCIPAL, null) val tableComment = parameters.getOrElse(JDBC_TABLE_COMMENT, "").toString + + val refreshKrb5Config = parameters.getOrElse(JDBC_REFRESH_KRB5_CONFIG, "false").toBoolean } class JdbcOptionsInWrite( @@ -260,4 +262,5 @@ object JDBCOptions { val JDBC_KEYTAB = newOption("keytab") val JDBC_PRINCIPAL = newOption("principal") val JDBC_TABLE_COMMENT = newOption("tableComment") + val JDBC_REFRESH_KRB5_CONFIG = newOption("refreshKrb5Config") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/SecureConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/SecureConnectionProvider.scala index 4138c72169..71c20e11f9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/SecureConnectionProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/SecureConnectionProvider.scala @@ -52,7 +52,8 @@ private[jdbc] abstract class SecureConnectionProvider extends BasicConnectionPro private[connection] def setAuthenticationConfig(driver: Driver, options: JDBCOptions) = { val parent = Configuration.getConfiguration val config = new SecureConnectionProvider.JDBCConfiguration( - parent, appEntry(driver, options), options.keytab, options.principal) + parent, appEntry(driver, options), options.keytab, + options.principal, options.refreshKrb5Config) logDebug("Adding database specific security configuration") Configuration.setConfiguration(config) } @@ -63,7 +64,8 @@ object SecureConnectionProvider { parent: Configuration, appEntry: String, keytab: String, - principal: String) extends Configuration { + principal: String, + refreshKrb5Config: Boolean) extends Configuration { val entry = new AppConfigurationEntry( SecurityUtils.getKrb5LoginModuleName(), @@ -73,7 +75,8 @@ object SecureConnectionProvider { "useKeyTab" -> "true", "keyTab" -> keytab, "principal" -> principal, - "debug" -> "true" + "debug" -> "true", + "refreshKrb5Config" -> refreshKrb5Config.toString ).asJava )