[SPARK-27446][R] Use existing spark conf if available.
## What changes were proposed in this pull request?
The RBackend and RBackendHandler create new conf objects that don't pick up conf values from the existing SparkSession and therefore always use the default conf values instead of values specified by the user.
In this fix we check to see if the spark env already exists, and get the conf from there. We fall back to creating a new conf. This follows the pattern used in other places including this: 3725b1324f/core/src/main/scala/org/apache/spark/api/r/BaseRRunner.scala (L261)
## How was this patch tested?
(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
Please review http://spark.apache.org/contributing.html before opening a pull request.
Closes #24353 from MrBago/r-backend-use-existing-conf.
Authored-by: Bago Amirbekian <bago@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
This commit is contained in:
parent
4704af4c26
commit
eea3f55a31
|
@ -30,7 +30,7 @@ import io.netty.handler.codec.LengthFieldBasedFrameDecoder
|
|||
import io.netty.handler.codec.bytes.{ByteArrayDecoder, ByteArrayEncoder}
|
||||
import io.netty.handler.timeout.ReadTimeoutHandler
|
||||
|
||||
import org.apache.spark.SparkConf
|
||||
import org.apache.spark.{SparkConf, SparkEnv}
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.internal.config.R._
|
||||
|
||||
|
@ -47,7 +47,7 @@ private[spark] class RBackend {
|
|||
private[r] val jvmObjectTracker = new JVMObjectTracker
|
||||
|
||||
def init(): (Int, RAuthHelper) = {
|
||||
val conf = new SparkConf()
|
||||
val conf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf())
|
||||
val backendConnectionTimeout = conf.get(R_BACKEND_CONNECTION_TIMEOUT)
|
||||
bossGroup = new NioEventLoopGroup(conf.get(R_NUM_BACKEND_THREADS))
|
||||
val workerGroup = bossGroup
|
||||
|
@ -124,7 +124,7 @@ private[spark] object RBackend extends Logging {
|
|||
val listenPort = serverSocket.getLocalPort()
|
||||
// Connection timeout is set by socket client. To make it configurable we will pass the
|
||||
// timeout value to client inside the temp file
|
||||
val conf = new SparkConf()
|
||||
val conf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf())
|
||||
val backendConnectionTimeout = conf.get(R_BACKEND_CONNECTION_TIMEOUT)
|
||||
|
||||
// tell the R process via temporary file
|
||||
|
|
|
@ -26,7 +26,7 @@ import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler}
|
|||
import io.netty.channel.ChannelHandler.Sharable
|
||||
import io.netty.handler.timeout.ReadTimeoutException
|
||||
|
||||
import org.apache.spark.SparkConf
|
||||
import org.apache.spark.{SparkConf, SparkEnv}
|
||||
import org.apache.spark.api.r.SerDe._
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.internal.config.R._
|
||||
|
@ -98,7 +98,7 @@ private[r] class RBackendHandler(server: RBackend)
|
|||
ctx.write(pingBaos.toByteArray)
|
||||
}
|
||||
}
|
||||
val conf = new SparkConf()
|
||||
val conf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf())
|
||||
val heartBeatInterval = conf.get(R_HEARTBEAT_INTERVAL)
|
||||
val backendConnectionTimeout = conf.get(R_BACKEND_CONNECTION_TIMEOUT)
|
||||
val interval = Math.min(heartBeatInterval, backendConnectionTimeout - 1)
|
||||
|
|
Loading…
Reference in a new issue