Honor default fs name when initializing event logger.
This is related to SPARK-1459 / PR #375. Without this fix, FileLogger.createLogDir() may try to create the log dir on HDFS, while createWriter() will try to open the log file on the local file system, leading to interesting errors and confusion. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #450 from vanzin/event-file-2 and squashes the following commits: 592cdb3 [Marcelo Vanzin] Honor default fs name when initializing event logger.
This commit is contained in:
parent
a967b005c8
commit
dd1b7a61d9
|
@ -216,10 +216,33 @@ class SparkContext(config: SparkConf) extends Logging {
|
|||
private[spark] val ui = new SparkUI(this)
|
||||
ui.bind()
|
||||
|
||||
/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
|
||||
val hadoopConfiguration: Configuration = {
|
||||
val env = SparkEnv.get
|
||||
val hadoopConf = SparkHadoopUtil.get.newConfiguration()
|
||||
// Explicitly check for S3 environment variables
|
||||
if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
|
||||
System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
|
||||
hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
|
||||
hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
|
||||
hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
|
||||
hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
|
||||
}
|
||||
// Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
|
||||
conf.getAll.foreach { case (key, value) =>
|
||||
if (key.startsWith("spark.hadoop.")) {
|
||||
hadoopConf.set(key.substring("spark.hadoop.".length), value)
|
||||
}
|
||||
}
|
||||
val bufferSize = conf.get("spark.buffer.size", "65536")
|
||||
hadoopConf.set("io.file.buffer.size", bufferSize)
|
||||
hadoopConf
|
||||
}
|
||||
|
||||
// Optionally log Spark events
|
||||
private[spark] val eventLogger: Option[EventLoggingListener] = {
|
||||
if (conf.getBoolean("spark.eventLog.enabled", false)) {
|
||||
val logger = new EventLoggingListener(appName, conf)
|
||||
val logger = new EventLoggingListener(appName, conf, hadoopConfiguration)
|
||||
logger.start()
|
||||
listenerBus.addListener(logger)
|
||||
Some(logger)
|
||||
|
@ -294,29 +317,6 @@ class SparkContext(config: SparkConf) extends Logging {
|
|||
postEnvironmentUpdate()
|
||||
postApplicationStart()
|
||||
|
||||
/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
|
||||
val hadoopConfiguration: Configuration = {
|
||||
val env = SparkEnv.get
|
||||
val hadoopConf = SparkHadoopUtil.get.newConfiguration()
|
||||
// Explicitly check for S3 environment variables
|
||||
if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
|
||||
System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
|
||||
hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
|
||||
hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
|
||||
hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
|
||||
hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
|
||||
}
|
||||
// Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
|
||||
conf.getAll.foreach { case (key, value) =>
|
||||
if (key.startsWith("spark.hadoop.")) {
|
||||
hadoopConf.set(key.substring("spark.hadoop.".length), value)
|
||||
}
|
||||
}
|
||||
val bufferSize = conf.get("spark.buffer.size", "65536")
|
||||
hadoopConf.set("io.file.buffer.size", bufferSize)
|
||||
hadoopConf
|
||||
}
|
||||
|
||||
private[spark] var checkpointDir: Option[String] = None
|
||||
|
||||
// Thread Local variable that can be used by users to pass information down the stack
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.spark.scheduler
|
|||
|
||||
import scala.collection.mutable
|
||||
|
||||
import org.apache.hadoop.conf.Configuration
|
||||
import org.apache.hadoop.fs.{FileSystem, Path}
|
||||
import org.json4s.jackson.JsonMethods._
|
||||
|
||||
|
@ -36,7 +37,10 @@ import org.apache.spark.util.{FileLogger, JsonProtocol}
|
|||
* spark.eventLog.dir - Path to the directory in which events are logged.
|
||||
* spark.eventLog.buffer.kb - Buffer size to use when writing to output streams
|
||||
*/
|
||||
private[spark] class EventLoggingListener(appName: String, conf: SparkConf)
|
||||
private[spark] class EventLoggingListener(
|
||||
appName: String,
|
||||
conf: SparkConf,
|
||||
hadoopConfiguration: Configuration)
|
||||
extends SparkListener with Logging {
|
||||
|
||||
import EventLoggingListener._
|
||||
|
@ -49,7 +53,8 @@ private[spark] class EventLoggingListener(appName: String, conf: SparkConf)
|
|||
val logDir = logBaseDir + "/" + name
|
||||
|
||||
private val logger =
|
||||
new FileLogger(logDir, conf, outputBufferSize, shouldCompress, shouldOverwrite)
|
||||
new FileLogger(logDir, conf, hadoopConfiguration, outputBufferSize, shouldCompress,
|
||||
shouldOverwrite)
|
||||
|
||||
/**
|
||||
* Begin logging events.
|
||||
|
|
|
@ -22,7 +22,8 @@ import java.net.URI
|
|||
import java.text.SimpleDateFormat
|
||||
import java.util.Date
|
||||
|
||||
import org.apache.hadoop.fs.{FSDataOutputStream, Path}
|
||||
import org.apache.hadoop.conf.Configuration
|
||||
import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
|
||||
|
||||
import org.apache.spark.{Logging, SparkConf}
|
||||
import org.apache.spark.io.CompressionCodec
|
||||
|
@ -37,7 +38,8 @@ import org.apache.spark.io.CompressionCodec
|
|||
*/
|
||||
private[spark] class FileLogger(
|
||||
logDir: String,
|
||||
conf: SparkConf = new SparkConf,
|
||||
conf: SparkConf,
|
||||
hadoopConfiguration: Configuration,
|
||||
outputBufferSize: Int = 8 * 1024, // 8 KB
|
||||
compress: Boolean = false,
|
||||
overwrite: Boolean = true)
|
||||
|
@ -85,19 +87,20 @@ private[spark] class FileLogger(
|
|||
private def createWriter(fileName: String): PrintWriter = {
|
||||
val logPath = logDir + "/" + fileName
|
||||
val uri = new URI(logPath)
|
||||
val defaultFs = FileSystem.getDefaultUri(hadoopConfiguration).getScheme
|
||||
val isDefaultLocal = (defaultFs == null || defaultFs == "file")
|
||||
|
||||
/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844).
|
||||
* Therefore, for local files, use FileOutputStream instead. */
|
||||
val dstream = uri.getScheme match {
|
||||
case "file" | null =>
|
||||
val dstream =
|
||||
if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") {
|
||||
// Second parameter is whether to append
|
||||
new FileOutputStream(uri.getPath, !overwrite)
|
||||
|
||||
case _ =>
|
||||
} else {
|
||||
val path = new Path(logPath)
|
||||
hadoopDataStream = Some(fileSystem.create(path, overwrite))
|
||||
hadoopDataStream.get
|
||||
}
|
||||
}
|
||||
|
||||
val bstream = new BufferedOutputStream(dstream, outputBufferSize)
|
||||
val cstream = if (compress) compressionCodec.compressedOutputStream(bstream) else bstream
|
||||
|
|
Loading…
Reference in a new issue