[SPARK-30240][CORE] Support HTTP redirects directly to a proxy server
### What changes were proposed in this pull request? The PR adds a new config option to configure an address for the proxy server, and a new handler that intercepts redirects and replaces the URL with one pointing at the proxy server. This is needed on top of the "proxy base path" support because redirects use full URLs, not just absolute paths from the server's root. ### Why are the changes needed? Spark's web UI has support for generating links to paths with a prefix, to support a proxy server, but those do not apply when the UI is responding with redirects. In that case, Spark is sending its own URL back to the client, and if it's behind a dumb proxy server that doesn't do rewriting (like when using stunnel for HTTPS support) then the client will see the wrong URL and may fail. ### Does this PR introduce any user-facing change? Yes. It's a new UI option. ### How was this patch tested? Tested with added unit test, with Spark behind stunnel, and in a more complicated app using a different HTTPS proxy. Closes #26873 from vanzin/SPARK-30240. Authored-by: Marcelo Vanzin <vanzin@cloudera.com> Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
This commit is contained in:
parent
fb2f5a4906
commit
a9fbd31030
|
@ -247,6 +247,16 @@ private[spark] object TestUtils {
|
||||||
url: URL,
|
url: URL,
|
||||||
method: String = "GET",
|
method: String = "GET",
|
||||||
headers: Seq[(String, String)] = Nil): Int = {
|
headers: Seq[(String, String)] = Nil): Int = {
|
||||||
|
withHttpConnection(url, method, headers = headers) { connection =>
|
||||||
|
connection.getResponseCode()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def withHttpConnection[T](
|
||||||
|
url: URL,
|
||||||
|
method: String = "GET",
|
||||||
|
headers: Seq[(String, String)] = Nil)
|
||||||
|
(fn: HttpURLConnection => T): T = {
|
||||||
val connection = url.openConnection().asInstanceOf[HttpURLConnection]
|
val connection = url.openConnection().asInstanceOf[HttpURLConnection]
|
||||||
connection.setRequestMethod(method)
|
connection.setRequestMethod(method)
|
||||||
headers.foreach { case (k, v) => connection.setRequestProperty(k, v) }
|
headers.foreach { case (k, v) => connection.setRequestProperty(k, v) }
|
||||||
|
@ -271,7 +281,7 @@ private[spark] object TestUtils {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
connection.connect()
|
connection.connect()
|
||||||
connection.getResponseCode()
|
fn(connection)
|
||||||
} finally {
|
} finally {
|
||||||
connection.disconnect()
|
connection.disconnect()
|
||||||
}
|
}
|
||||||
|
|
|
@ -150,6 +150,11 @@ private[spark] object UI {
|
||||||
.stringConf
|
.stringConf
|
||||||
.createWithDefault("org.apache.spark.security.ShellBasedGroupsMappingProvider")
|
.createWithDefault("org.apache.spark.security.ShellBasedGroupsMappingProvider")
|
||||||
|
|
||||||
|
// Optional address of a proxy server sitting in front of the UI. The value is an optional string
// (no default); when it is set, redirect responses are rewritten to point at this address.
val PROXY_REDIRECT_URI = ConfigBuilder("spark.ui.proxyRedirectUri")
|
||||||
|
.doc("Proxy address to use when responding with HTTP redirects.")
|
||||||
|
.stringConf
|
||||||
|
.createOptional
|
||||||
|
|
||||||
val CUSTOM_EXECUTOR_LOG_URL = ConfigBuilder("spark.ui.custom.executor.log.url")
|
val CUSTOM_EXECUTOR_LOG_URL = ConfigBuilder("spark.ui.custom.executor.log.url")
|
||||||
.doc("Specifies custom spark executor log url for supporting external log service instead of " +
|
.doc("Specifies custom spark executor log url for supporting external log service instead of " +
|
||||||
"using cluster managers' application log urls in the Spark UI. Spark will support " +
|
"using cluster managers' application log urls in the Spark UI. Spark will support " +
|
||||||
|
|
|
@ -20,7 +20,7 @@ package org.apache.spark.ui
|
||||||
import java.net.{URI, URL}
|
import java.net.{URI, URL}
|
||||||
import java.util.EnumSet
|
import java.util.EnumSet
|
||||||
import javax.servlet.DispatcherType
|
import javax.servlet.DispatcherType
|
||||||
import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
|
import javax.servlet.http._
|
||||||
|
|
||||||
import scala.language.implicitConversions
|
import scala.language.implicitConversions
|
||||||
import scala.xml.Node
|
import scala.xml.Node
|
||||||
|
@ -259,7 +259,15 @@ private[spark] object JettyUtils extends Logging {
|
||||||
server.addBean(errorHandler)
|
server.addBean(errorHandler)
|
||||||
|
|
||||||
val collection = new ContextHandlerCollection
|
val collection = new ContextHandlerCollection
|
||||||
server.setHandler(collection)
|
conf.get(PROXY_REDIRECT_URI) match {
|
||||||
|
case Some(proxyUri) =>
|
||||||
|
val proxyHandler = new ProxyRedirectHandler(proxyUri)
|
||||||
|
proxyHandler.setHandler(collection)
|
||||||
|
server.setHandler(proxyHandler)
|
||||||
|
|
||||||
|
case _ =>
|
||||||
|
server.setHandler(collection)
|
||||||
|
}
|
||||||
|
|
||||||
// Executor used to create daemon threads for the Jetty connectors.
|
// Executor used to create daemon threads for the Jetty connectors.
|
||||||
val serverExecutor = new ScheduledExecutorScheduler(s"$serverName-JettyScheduler", true)
|
val serverExecutor = new ScheduledExecutorScheduler(s"$serverName-JettyScheduler", true)
|
||||||
|
@ -526,3 +534,51 @@ private[spark] case class ServerInfo(
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * A Jetty handler to handle redirects to a proxy server. It intercepts redirects and rewrites the
 * location to point to the proxy server.
 *
 * The handler needs to be set as the server's handler, because Jetty sometimes generates redirects
 * before invoking any servlet handlers or filters. One such case is when asking for the root of
 * a servlet context without the trailing slash (e.g. "/jobs") - Jetty will send a redirect to the
 * same URL, but with a trailing slash.
 *
 * @param _proxyUri address of the proxy server; a single trailing slash, if present, is dropped.
 */
private class ProxyRedirectHandler(_proxyUri: String) extends HandlerWrapper {

  // Normalized so that it can be concatenated directly with paths that start with "/".
  private val proxyUri = _proxyUri.stripSuffix("/")

  /**
   * Delegates to the wrapped handler, substituting a response wrapper that rewrites the location
   * of any redirect sent while handling the request.
   */
  override def handle(
      target: String,
      baseRequest: Request,
      request: HttpServletRequest,
      response: HttpServletResponse): Unit = {
    super.handle(target, baseRequest, request, new ResponseWrapper(request, response))
  }

  /**
   * Response wrapper that overrides `sendRedirect` so that the redirect location points at the
   * proxy server instead of at the address in the original location.
   */
  private class ResponseWrapper(
      req: HttpServletRequest,
      res: HttpServletResponse)
    extends HttpServletResponseWrapper(res) {

    /**
     * Sends a redirect to the proxy address followed by the path of `location`. A null location
     * is passed through unchanged.
     *
     * NOTE(review): only the path of `location` is kept; any query string or fragment it carries
     * is not copied to the rewritten location.
     */
    override def sendRedirect(location: String): Unit = {
      val newTarget = if (location != null) {
        val target = new URI(location)
        // Use the raw (still encoded) path: the target path should already be encoded, so it must
        // not be decoded or re-encoded here. The path is null for opaque URIs.
        val targetPath = Option(target.getRawPath()).getOrElse("")
        val path = if (targetPath.isEmpty || targetPath.startsWith("/")) {
          targetPath
        } else {
          // Relative location: anchor it under the URI of the request being answered.
          req.getRequestURI().stripSuffix("/") + "/" + targetPath
        }
        // Only the proxy address part is encoded.
        val proxyBase = UIUtils.uiRoot(req)
        val proxyPrefix = if (proxyBase.nonEmpty) s"$proxyUri$proxyBase" else proxyUri
        s"${res.encodeURL(proxyPrefix)}$path"
      } else {
        null
      }
      super.sendRedirect(newTarget)
    }
  }

}
|
||||||
|
|
|
@ -32,7 +32,7 @@ import org.scalatest.time.SpanSugar._
|
||||||
|
|
||||||
import org.apache.spark._
|
import org.apache.spark._
|
||||||
import org.apache.spark.LocalSparkContext._
|
import org.apache.spark.LocalSparkContext._
|
||||||
import org.apache.spark.internal.config.UI.UI_ENABLED
|
import org.apache.spark.internal.config.UI
|
||||||
import org.apache.spark.util.Utils
|
import org.apache.spark.util.Utils
|
||||||
|
|
||||||
class UISuite extends SparkFunSuite {
|
class UISuite extends SparkFunSuite {
|
||||||
|
@ -45,7 +45,7 @@ class UISuite extends SparkFunSuite {
|
||||||
val conf = new SparkConf()
|
val conf = new SparkConf()
|
||||||
.setMaster("local")
|
.setMaster("local")
|
||||||
.setAppName("test")
|
.setAppName("test")
|
||||||
.set(UI_ENABLED, true)
|
.set(UI.UI_ENABLED, true)
|
||||||
val sc = new SparkContext(conf)
|
val sc = new SparkContext(conf)
|
||||||
assert(sc.ui.isDefined)
|
assert(sc.ui.isDefined)
|
||||||
sc
|
sc
|
||||||
|
@ -273,7 +273,6 @@ class UISuite extends SparkFunSuite {
|
||||||
|
|
||||||
val (_, testContext) = newContext("/test2")
|
val (_, testContext) = newContext("/test2")
|
||||||
serverInfo.addHandler(testContext, securityMgr)
|
serverInfo.addHandler(testContext, securityMgr)
|
||||||
testContext.start()
|
|
||||||
|
|
||||||
val httpPort = serverInfo.boundPort
|
val httpPort = serverInfo.boundPort
|
||||||
|
|
||||||
|
@ -318,6 +317,54 @@ class UISuite extends SparkFunSuite {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test("redirect with proxy server support") {
  val proxyRoot = "https://proxy.example.com:443/prefix"
  val (conf, securityMgr, sslOptions) = sslDisabledConf()
  conf.set(UI.PROXY_REDIRECT_URI, proxyRoot)

  // Requests `url` and checks that the reply is a 302 whose Location header equals `expected`.
  def assertRedirect(
      url: String,
      expected: String,
      requestHeaders: Seq[(String, String)] = Nil): Unit = {
    TestUtils.withHttpConnection(new URL(url), headers = requestHeaders) { conn =>
      assert(conn.getResponseCode() === HttpServletResponse.SC_FOUND)
      val location = Option(conn.getHeaderFields().get("Location"))
        .map(_.get(0)).orNull
      assert(location === expected)
    }
  }

  val serverInfo = JettyUtils.startJettyServer("0.0.0.0", 0, sslOptions, conf)
  try {
    val serverAddr = s"http://localhost:${serverInfo.boundPort}"

    val (_, ctx) = newContext("/ctx1")
    serverInfo.addHandler(ctx, securityMgr)

    val redirect = JettyUtils.createRedirectHandler("/src", "/dst")
    serverInfo.addHandler(redirect, securityMgr)

    // Jetty's own redirect that appends the trailing slash to a context path.
    assertRedirect(s"$serverAddr/ctx1", s"$proxyRoot/ctx1/")

    // A URL served by the redirect handler added above, combined with a path prefix header.
    assertRedirect(
      s"$serverAddr/src/",
      s"$proxyRoot/prefix/dst",
      requestHeaders = Seq("X-Forwarded-Context" -> "/prefix"))

    // Not really used by Spark, but also cover a relative redirect destination.
    val relative = JettyUtils.createRedirectHandler("/rel", "root")
    serverInfo.addHandler(relative, securityMgr)
    assertRedirect(s"$serverAddr/rel/", s"$proxyRoot/rel/root")
  } finally {
    stopServer(serverInfo)
  }
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a new context handler for the given path, with a single servlet that responds to
|
* Create a new context handler for the given path, with a single servlet that responds to
|
||||||
* requests in `$path/root`.
|
* requests in `$path/root`.
|
||||||
|
|
|
@ -1106,6 +1106,18 @@ Apart from these, the following properties are also available, and may be useful
|
||||||
This is the URL where your proxy is running. This URL is for proxy which is running in front of Spark Master. This is useful when running proxy for authentication e.g. OAuth proxy. Make sure this is a complete URL including scheme (http/https) and port to reach your proxy.
|
This is the URL where your proxy is running. This URL is for proxy which is running in front of Spark Master. This is useful when running proxy for authentication e.g. OAuth proxy. Make sure this is a complete URL including scheme (http/https) and port to reach your proxy.
|
||||||
</td>
|
</td>
|
||||||
</tr>
|
</tr>
|
||||||
|
<tr>
|
||||||
|
<td><code>spark.ui.proxyRedirectUri</code></td>
|
||||||
|
<td></td>
|
||||||
|
<td>
|
||||||
|
Where to address redirects when Spark is running behind a proxy. This will make Spark
|
||||||
|
modify redirect responses so they point to the proxy server, instead of the Spark UI's own
|
||||||
|
address. This should be only the address of the server, without any prefix paths for the
|
||||||
|
application; the prefix should be set either by the proxy server itself (by adding the
|
||||||
|
<code>X-Forwarded-Context</code> request header), or by setting the proxy base in the Spark
|
||||||
|
app's configuration.
|
||||||
|
</td>
|
||||||
|
</tr>
|
||||||
<tr>
|
<tr>
|
||||||
<td><code>spark.ui.showConsoleProgress</code></td>
|
<td><code>spark.ui.showConsoleProgress</code></td>
|
||||||
<td>false</td>
|
<td>false</td>
|
||||||
|
|
Loading…
Reference in a new issue