[SPARK-35226][SQL] Support refreshKrb5Config option in JDBC datasources

### What changes were proposed in this pull request?

This PR proposes to introduce a new JDBC option `refreshKrb5Config` which allows to reflect the change of `krb5.conf`.

### Why are the changes needed?

In the current master, JDBC datasources can't accept `refreshKrb5Config` which is defined in `Krb5LoginModule`.
So even if we change the `krb5.conf` after establishing a connection, the change will not be reflected.

The similar issue happens when we run multiple `*KrbIntegrationSuites` at the same time.
`MiniKDC` starts and stops every KerberosIntegrationSuite and different port number is recorded to `krb5.conf`.
Due to `SecureConnectionProvider.JDBCConfiguration` doesn't take `refreshKrb5Config`, KerberosIntegrationSuites except the first running one see the wrong port so those suites fail.
You can easily confirm with the following command.
```
build/sbt -Phive Phive-thriftserver -Pdocker-integration-tests "testOnly org.apache.spark.sql.jdbc.*KrbIntegrationSuite"
```
### Does this PR introduce _any_ user-facing change?

Yes. Users can set `refreshKrb5Config` to refresh krb5 relevant configuration.

### How was this patch tested?

New test.

Closes #32344 from sarutak/kerberos-refresh-issue.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Kousuke Saruta <sarutak@oss.nttdata.com>
This commit is contained in:
Kousuke Saruta 2021-04-29 13:55:53 +09:00
parent 771356555c
commit 529b875901
7 changed files with 81 additions and 6 deletions

View file

@ -211,6 +211,25 @@ the following case-insensitive options:
Specifies kerberos principal name for the JDBC client. If both <code>keytab</code> and <code>principal</code> are defined then Spark tries to do kerberos authentication.
</td>
</tr>
<tr>
<td><code>refreshKrb5Config</code></td>
<td>
This option controls whether the kerberos configuration is to be refreshed or not for the JDBC client before
establishing a new connection. Set to true if you want to refresh the configuration, otherwise set to false.
The default value is false. Note that if you set this option to true and try to establish multiple connections,
a race condition can occur. One possble situation would be like as follows.
<ol>
<li>refreshKrb5Config flag is set with security context 1</li>
<li>A JDBC connection provider is used for the corresponding DBMS</li>
<li>The krb5.conf is modified but the JVM not yet realized that it must be reloaded</li>
<li>Spark authenticates successfully for security context 1</li>
<li>The JVM loads security context 2 from the modified krb5.conf</li>
<li>Spark restores the previously saved security context 1</li>
<li>The modified krb5.conf content just gone</li>
</ol>
</td>
</tr>
</table>
Note that kerberos authentication with keytab is not always supported by the JDBC driver.<br>

View file

@ -81,7 +81,7 @@ class DB2KrbIntegrationSuite extends DockerKrbJDBCIntegrationSuite {
override protected def setAuthentication(keytabFile: String, principal: String): Unit = {
val config = new SecureConnectionProvider.JDBCConfiguration(
Configuration.getConfiguration, "JaasClient", keytabFile, principal)
Configuration.getConfiguration, "JaasClient", keytabFile, principal, true)
Configuration.setConfiguration(config)
}

View file

@ -32,6 +32,7 @@ import org.apache.spark.util.{SecurityUtils, Utils}
abstract class DockerKrbJDBCIntegrationSuite extends DockerJDBCIntegrationSuite {
private var kdc: MiniKdc = _
private val KRB5_CONF_PROP = "java.security.krb5.conf"
protected var entryPointDir: File = _
protected var initDbDir: File = _
protected val userName: String
@ -160,4 +161,53 @@ abstract class DockerKrbJDBCIntegrationSuite extends DockerJDBCIntegrationSuite
assert(rows(0).getString(0) === "foo")
assert(rows(0).getString(1) === "bar")
}
test("SPARK-35226: JDBCOption should accept refreshKrb5Config parameter") {
// This makes sure Spark must do authentication
Configuration.setConfiguration(null)
withTempDir { dir =>
val dummyKrb5Conf = File.createTempFile("dummy", "krb5.conf", dir)
val origKrb5Conf = sys.props(KRB5_CONF_PROP)
try {
// Set dummy krb5.conf and refresh config so this assertion is expected to fail.
// The thrown exception is dependent on the actual JDBC driver class.
intercept[Exception] {
sys.props(KRB5_CONF_PROP) = dummyKrb5Conf.getAbsolutePath
spark.read.format("jdbc")
.option("url", jdbcUrl)
.option("keytab", keytabFullPath)
.option("principal", principal)
.option("refreshKrb5Config", "true")
.option("query", "SELECT 1")
.load()
}
// Set the authentic krb5.conf but doesn't refresh config
// so this assertion is expected to fail.
intercept[Exception] {
sys.props(KRB5_CONF_PROP) = origKrb5Conf
spark.read.format("jdbc")
.option("url", jdbcUrl)
.option("keytab", keytabFullPath)
.option("principal", principal)
.option("query", "SELECT 1")
.load()
}
sys.props(KRB5_CONF_PROP) = origKrb5Conf
val df = spark.read.format("jdbc")
.option("url", jdbcUrl)
.option("keytab", keytabFullPath)
.option("principal", principal)
.option("refreshKrb5Config", "true")
.option("query", "SELECT 1")
.load()
val result = df.collect().map(_.getInt(0))
assert(result.length === 1)
assert(result(0) === 1)
} finally {
sys.props(KRB5_CONF_PROP) = origKrb5Conf
}
}
}
}

View file

@ -68,7 +68,7 @@ class MariaDBKrbIntegrationSuite extends DockerKrbJDBCIntegrationSuite {
override protected def setAuthentication(keytabFile: String, principal: String): Unit = {
val config = new SecureConnectionProvider.JDBCConfiguration(
Configuration.getConfiguration, "Krb5ConnectorContext", keytabFile, principal)
Configuration.getConfiguration, "Krb5ConnectorContext", keytabFile, principal, true)
Configuration.setConfiguration(config)
}
}

View file

@ -61,7 +61,7 @@ class PostgresKrbIntegrationSuite extends DockerKrbJDBCIntegrationSuite {
override protected def setAuthentication(keytabFile: String, principal: String): Unit = {
val config = new SecureConnectionProvider.JDBCConfiguration(
Configuration.getConfiguration, "pgjdbc", keytabFile, principal)
Configuration.getConfiguration, "pgjdbc", keytabFile, principal, true)
Configuration.setConfiguration(config)
}
}

View file

@ -205,6 +205,8 @@ class JDBCOptions(
val principal = parameters.getOrElse(JDBC_PRINCIPAL, null)
val tableComment = parameters.getOrElse(JDBC_TABLE_COMMENT, "").toString
val refreshKrb5Config = parameters.getOrElse(JDBC_REFRESH_KRB5_CONFIG, "false").toBoolean
}
class JdbcOptionsInWrite(
@ -260,4 +262,5 @@ object JDBCOptions {
val JDBC_KEYTAB = newOption("keytab")
val JDBC_PRINCIPAL = newOption("principal")
val JDBC_TABLE_COMMENT = newOption("tableComment")
val JDBC_REFRESH_KRB5_CONFIG = newOption("refreshKrb5Config")
}

View file

@ -52,7 +52,8 @@ private[jdbc] abstract class SecureConnectionProvider extends BasicConnectionPro
private[connection] def setAuthenticationConfig(driver: Driver, options: JDBCOptions) = {
val parent = Configuration.getConfiguration
val config = new SecureConnectionProvider.JDBCConfiguration(
parent, appEntry(driver, options), options.keytab, options.principal)
parent, appEntry(driver, options), options.keytab,
options.principal, options.refreshKrb5Config)
logDebug("Adding database specific security configuration")
Configuration.setConfiguration(config)
}
@ -63,7 +64,8 @@ object SecureConnectionProvider {
parent: Configuration,
appEntry: String,
keytab: String,
principal: String) extends Configuration {
principal: String,
refreshKrb5Config: Boolean) extends Configuration {
val entry =
new AppConfigurationEntry(
SecurityUtils.getKrb5LoginModuleName(),
@ -73,7 +75,8 @@ object SecureConnectionProvider {
"useKeyTab" -> "true",
"keyTab" -> keytab,
"principal" -> principal,
"debug" -> "true"
"debug" -> "true",
"refreshKrb5Config" -> refreshKrb5Config.toString
).asJava
)