/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package spark

import java.io._
import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address, ServerSocket}
import java.util.{Locale, Random, UUID}
import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory, ThreadPoolExecutor}
import java.util.regex.Pattern

import scala.collection.Map
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.collection.JavaConversions._
import scala.io.Source

import com.google.common.io.Files
import com.google.common.util.concurrent.ThreadFactoryBuilder

import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}

import spark.serializer.SerializerInstance
import spark.deploy.SparkHadoopUtil

/**
 * Various utility methods used by Spark.
 */
private object Utils extends Logging {
  /** Serialize an object using Java serialization. */
  def serialize[T](o: T): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(o)
    oos.close()
    bos.toByteArray
  }

  /** Deserialize an object using Java serialization. */
  def deserialize[T](bytes: Array[Byte]): T = {
    val bis = new ByteArrayInputStream(bytes)
    val ois = new ObjectInputStream(bis)
    ois.readObject.asInstanceOf[T]
  }

  /** Deserialize an object using Java serialization and the given ClassLoader. */
  def deserialize[T](bytes: Array[Byte], loader: ClassLoader): T = {
    val bis = new ByteArrayInputStream(bytes)
    val ois = new ObjectInputStream(bis) {
      override def resolveClass(desc: ObjectStreamClass) =
        Class.forName(desc.getName, false, loader)
    }
    ois.readObject.asInstanceOf[T]
  }
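
  // A minimal round-trip sketch (illustrative, not part of the original file):
  //   val bytes = Utils.serialize(List(1, 2, 3))
  //   Utils.deserialize[List[Int]](bytes)  // == List(1, 2, 3)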

  def isAlpha(c: Char): Boolean = {
    (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')
  }

  /** Split a string into words at non-alphabetic characters. */
  def splitWords(s: String): Seq[String] = {
    val buf = new ArrayBuffer[String]
    var i = 0
    while (i < s.length) {
      var j = i
      while (j < s.length && isAlpha(s.charAt(j))) {
        j += 1
      }
      if (j > i) {
        buf += s.substring(i, j)
      }
      i = j
      while (i < s.length && !isAlpha(s.charAt(i))) {
        i += 1
      }
    }
    buf
  }
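
  // For example (illustrative): splitWords("foo,bar-baz") == Seq("foo", "bar", "baz"),
  // since ',' and '-' are non-alphabetic and act as separators.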

  private val shutdownDeletePaths = new collection.mutable.HashSet[String]()

  // Register the path to be deleted via a shutdown hook.
  def registerShutdownDeleteDir(file: File) {
    val absolutePath = file.getAbsolutePath()
    shutdownDeletePaths.synchronized {
      shutdownDeletePaths += absolutePath
    }
  }

  // Is the path already registered to be deleted via a shutdown hook?
  def hasShutdownDeleteDir(file: File): Boolean = {
    val absolutePath = file.getAbsolutePath()
    shutdownDeletePaths.synchronized {
      shutdownDeletePaths.contains(absolutePath)
    }
  }

  // Note: if the file is a child of some registered path (but not equal to it), return true;
  // otherwise return false. This ensures that two shutdown hooks do not try to delete each
  // other's paths, which would result in an IOException and incomplete cleanup.
  def hasRootAsShutdownDeleteDir(file: File): Boolean = {
    val absolutePath = file.getAbsolutePath()
    val retval = shutdownDeletePaths.synchronized {
      shutdownDeletePaths.find { path =>
        !absolutePath.equals(path) && absolutePath.startsWith(path)
      }.isDefined
    }
    if (retval) {
      logInfo("path = " + file + ", already present as root for deletion.")
    }
    retval
  }
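
  // Illustrative (hypothetical paths): if "/tmp/spark-abc" was registered via
  // registerShutdownDeleteDir, then:
  //   hasRootAsShutdownDeleteDir(new File("/tmp/spark-abc/work"))  // true: child of a root
  //   hasRootAsShutdownDeleteDir(new File("/tmp/spark-abc"))       // false: equal, not a child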

  /** Create a temporary directory inside the given parent directory. */
  def createTempDir(root: String = System.getProperty("java.io.tmpdir")): File = {
    var attempts = 0
    val maxAttempts = 10
    var dir: File = null
    while (dir == null) {
      attempts += 1
      if (attempts > maxAttempts) {
        throw new IOException("Failed to create a temp directory (under " + root + ") after " +
          maxAttempts + " attempts!")
      }
      try {
        dir = new File(root, "spark-" + UUID.randomUUID.toString)
        if (dir.exists() || !dir.mkdirs()) {
          dir = null
        }
      } catch { case e: IOException => ; }
    }

    registerShutdownDeleteDir(dir)

    // Add a shutdown hook to delete the temp dir when the JVM exits
    Runtime.getRuntime.addShutdownHook(new Thread("delete Spark temp dir " + dir) {
      override def run() {
        // Attempt to delete only if no registered path is a parent of this directory.
        if (!hasRootAsShutdownDeleteDir(dir)) Utils.deleteRecursively(dir)
      }
    })
    dir
  }
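
  // Typical usage (illustrative): the directory is created under java.io.tmpdir by default
  // and removed by the shutdown hook, so callers need not clean it up themselves:
  //   val workDir = Utils.createTempDir()  // e.g. /tmp/spark-<random uuid>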

  /** Copy all data from an InputStream to an OutputStream. */
  def copyStream(in: InputStream,
                 out: OutputStream,
                 closeStreams: Boolean = false)
  {
    val buf = new Array[Byte](8192)
    var n = 0
    while (n != -1) {
      n = in.read(buf)
      if (n != -1) {
        out.write(buf, 0, n)
      }
    }
    if (closeStreams) {
      in.close()
      out.close()
    }
  }
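
  // Usage sketch (hypothetical file names), closing both streams when the copy finishes:
  //   Utils.copyStream(new FileInputStream("in.bin"), new FileOutputStream("out.bin"), true)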

  /**
   * Download a file requested by the executor. Supports fetching the file in a variety of ways,
   * including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
   *
   * Throws SparkException if the target file already exists and has different contents than
   * the requested file.
   */
  def fetchFile(url: String, targetDir: File) {
    val filename = url.split("/").last
    val tempDir = getLocalDir
    val tempFile = File.createTempFile("fetchFileTemp", null, new File(tempDir))
    val targetFile = new File(targetDir, filename)
    val uri = new URI(url)
    uri.getScheme match {
      case "http" | "https" | "ftp" =>
        logInfo("Fetching " + url + " to " + tempFile)
        val in = new URL(url).openStream()
        val out = new FileOutputStream(tempFile)
        Utils.copyStream(in, out, true)
        if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
          tempFile.delete()
          throw new SparkException(
            "File " + targetFile + " exists and does not match contents of" + " " + url)
        } else {
          Files.move(tempFile, targetFile)
        }
      case "file" | null =>
        // In the case of a local file, copy the local file to the target directory.
        // Note the difference between uri vs url.
        val sourceFile = if (uri.isAbsolute) new File(uri) else new File(url)
        if (targetFile.exists) {
          // The target file already exists; fail if its contents differ from the source file.
          if (!Files.equal(sourceFile, targetFile)) {
            throw new SparkException(
              "File " + targetFile + " exists and does not match contents of" + " " + url)
          } else {
            // Do nothing if the file contents are the same, i.e. this file has been copied
            // previously.
            logInfo(sourceFile.getAbsolutePath + " has been previously copied to "
              + targetFile.getAbsolutePath)
          }
        } else {
          // The file does not exist in the target directory. Copy it there.
          logInfo("Copying " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath)
          Files.copy(sourceFile, targetFile)
        }
      case _ =>
        // Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
        val uri = new URI(url)
        val conf = SparkHadoopUtil.newConfiguration()
        val fs = FileSystem.get(uri, conf)
        val in = fs.open(new Path(uri))
        val out = new FileOutputStream(tempFile)
        Utils.copyStream(in, out, true)
        if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
          tempFile.delete()
          throw new SparkException("File " + targetFile + " exists and does not match contents of" +
            " " + url)
        } else {
          Files.move(tempFile, targetFile)
        }
    }
    // Decompress the file if it's a .tar, .tar.gz, or .tgz
    if (filename.endsWith(".tar.gz") || filename.endsWith(".tgz")) {
      logInfo("Untarring " + filename)
      Utils.execute(Seq("tar", "-xzf", filename), targetDir)
    } else if (filename.endsWith(".tar")) {
      logInfo("Untarring " + filename)
      Utils.execute(Seq("tar", "-xf", filename), targetDir)
    }
    // Make the file executable; that's necessary for scripts
    FileUtil.chmod(targetFile.getAbsolutePath, "a+x")
  }
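
  // Usage sketch (the URL and directory are hypothetical):
  //   Utils.fetchFile("http://example.com/deps/lib.jar", new File("/tmp/spark-work"))
  // would place lib.jar in /tmp/spark-work and mark it executable.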

  /**
   * Get a temporary directory using Spark's spark.local.dir property, if set. This will always
   * return a single directory, even though the spark.local.dir property might be a list of
   * multiple paths.
   */
  def getLocalDir: String = {
    System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")).split(',')(0)
  }
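
  // For example, with spark.local.dir set to "/mnt1/spark,/mnt2/spark" (illustrative),
  // getLocalDir returns "/mnt1/spark".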

  /**
   * Shuffle the elements of a collection into a random order, returning the
   * result in a new collection. Unlike scala.util.Random.shuffle, this method
   * uses a local random number generator, avoiding inter-thread contention.
   */
  def randomize[T: ClassManifest](seq: TraversableOnce[T]): Seq[T] = {
    randomizeInPlace(seq.toArray)
  }

  /**
   * Shuffle the elements of an array into a random order, modifying the
   * original array. Returns the original array.
   */
  def randomizeInPlace[T](arr: Array[T], rand: Random = new Random): Array[T] = {
    for (i <- (arr.length - 1) to 1 by -1) {
      // Pick j uniformly from [0, i]; using rand.nextInt(i) here would never leave arr(i)
      // in place, which biases the shuffle (Sattolo's algorithm rather than Fisher-Yates).
      val j = rand.nextInt(i + 1)
      val tmp = arr(j)
      arr(j) = arr(i)
      arr(i) = tmp
    }
    arr
  }
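
  // Illustrative: Utils.randomizeInPlace(Array(1, 2, 3, 4)) shuffles the array in place and
  // returns it, with every ordering equally likely.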

  /**
   * Get the local host's IP address in dotted-quad format (e.g. 1.2.3.4).
   * Note: this is typically not used from within core Spark.
   */
  lazy val localIpAddress: String = findLocalIpAddress()
  lazy val localIpAddressHostname: String = getAddressHostName(localIpAddress)

  private def findLocalIpAddress(): String = {
    val defaultIpOverride = System.getenv("SPARK_LOCAL_IP")
    if (defaultIpOverride != null) {
      defaultIpOverride
    } else {
      val address = InetAddress.getLocalHost
      if (address.isLoopbackAddress) {
        // Address resolves to something like 127.0.1.1, which happens on Debian; try to find
        // a better address using the local network interfaces
        for (ni <- NetworkInterface.getNetworkInterfaces) {
          for (addr <- ni.getInetAddresses if !addr.isLinkLocalAddress &&
               !addr.isLoopbackAddress && addr.isInstanceOf[Inet4Address]) {
            // We've found an address that looks reasonable!
            logWarning("Your hostname, " + InetAddress.getLocalHost.getHostName + " resolves to" +
              " a loopback address: " + address.getHostAddress + "; using " + addr.getHostAddress +
              " instead (on interface " + ni.getName + ")")
            logWarning("Set SPARK_LOCAL_IP if you need to bind to another address")
            return addr.getHostAddress
          }
        }
        logWarning("Your hostname, " + InetAddress.getLocalHost.getHostName + " resolves to" +
          " a loopback address: " + address.getHostAddress + ", but we couldn't find any" +
          " external IP address!")
        logWarning("Set SPARK_LOCAL_IP if you need to bind to another address")
      }
      address.getHostAddress
    }
  }

  private var customHostname: Option[String] = None

  /**
   * Allow setting a custom host name because when we run on Mesos we need to use the same
   * hostname it reports to the master.
   */
  def setCustomHostname(hostname: String) {
    // DEBUG code
    Utils.checkHost(hostname)
    customHostname = Some(hostname)
  }

  /**
   * Get the local machine's hostname.
   */
  def localHostName(): String = {
    customHostname.getOrElse(localIpAddressHostname)
  }

  def getAddressHostName(address: String): String = {
    InetAddress.getByName(address).getHostName
  }

  def localHostPort(): String = {
    val retval = System.getProperty("spark.hostPort", null)
    if (retval == null) {
      logErrorWithStack("spark.hostPort not set but invoking localHostPort")
      return localHostName()
    }
    retval
  }

  /*
  // Used by DEBUG code: remove when all testing is done.
  private val ipPattern = Pattern.compile("^[0-9]+(\\.[0-9]+)*$")
  def checkHost(host: String, message: String = "") {
    // Currently catches only the IPv4 pattern; this is just a debugging tool, not rigorous!
    // if (host.matches("^[0-9]+(\\.[0-9]+)*$")) {
    if (ipPattern.matcher(host).matches()) {
      Utils.logErrorWithStack("Unexpected to have host " + host +
        " which matches IP pattern. Message " + message)
    }
    if (Utils.parseHostPort(host)._2 != 0) {
      Utils.logErrorWithStack("Unexpected to have host " + host +
        " which has port in it. Message " + message)
    }
  }

  // Used by DEBUG code: remove when all testing is done.
  def checkHostPort(hostPort: String, message: String = "") {
    val (host, port) = Utils.parseHostPort(hostPort)
    checkHost(host)
    if (port <= 0) {
      Utils.logErrorWithStack("Unexpected to have port " + port + " which is not valid in " +
        hostPort + ". Message " + message)
    }
  }

  // Used by DEBUG code: remove when all testing is done.
  def logErrorWithStack(msg: String) {
    try { throw new Exception } catch { case ex: Exception => logError(msg, ex) }
    // temp code for debug
    System.exit(-1)
  }
  */

  // Once testing is complete in various modes, replace with this?
  def checkHost(host: String, message: String = "") {}
  def checkHostPort(hostPort: String, message: String = "") {}

  // Used by DEBUG code: remove when all testing is done.
  def logErrorWithStack(msg: String) {
    try { throw new Exception } catch { case ex: Exception => logError(msg, ex) }
  }

  def getUserNameFromEnvironment(): String = {
    SparkHadoopUtil.getUserNameFromEnvironment
  }

  // Typically, this will be of the order of the number of nodes in the cluster.
  // If not, we should change it to an LRU cache or something.
  private val hostPortParseResults = new ConcurrentHashMap[String, (String, Int)]()

  def parseHostPort(hostPort: String): (String, Int) = {
    {
      // Check cache first.
      val cached = hostPortParseResults.get(hostPort)
      if (cached != null) return cached
    }

    val indx: Int = hostPort.lastIndexOf(':')
    // This is potentially broken when dealing with IPv6 addresses, but Hadoop does not
    // support IPv6 right now.
    // For now, we assume that if a port exists, it is valid; we do not check that it is an
    // int > 0.
    if (-1 == indx) {
      val retval = (hostPort, 0)
      hostPortParseResults.put(hostPort, retval)
      return retval
    }

    val retval = (hostPort.substring(0, indx).trim(), hostPort.substring(indx + 1).trim().toInt)
    hostPortParseResults.putIfAbsent(hostPort, retval)
    hostPortParseResults.get(hostPort)
  }
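
  // For example: parseHostPort("host1:8080") == ("host1", 8080), while
  // parseHostPort("host1") == ("host1", 0) since no port is present.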

  private[spark] val daemonThreadFactory: ThreadFactory =
    new ThreadFactoryBuilder().setDaemon(true).build()

  /**
   * Wrapper over newCachedThreadPool that creates daemon threads.
   */
  def newDaemonCachedThreadPool(): ThreadPoolExecutor =
    Executors.newCachedThreadPool(daemonThreadFactory).asInstanceOf[ThreadPoolExecutor]

  /**
   * Return a string describing how much time has passed, in milliseconds, since the given
   * start time.
   */
  def getUsedTimeMs(startTimeMs: Long): String = {
    " " + (System.currentTimeMillis - startTimeMs) + " ms"
  }

  /**
   * Wrapper over newFixedThreadPool that creates daemon threads.
   */
  def newDaemonFixedThreadPool(nThreads: Int): ThreadPoolExecutor =
    Executors.newFixedThreadPool(nThreads, daemonThreadFactory).asInstanceOf[ThreadPoolExecutor]

  /**
   * Delete a file or directory and its contents recursively.
   */
  def deleteRecursively(file: File) {
    if (file.isDirectory) {
      for (child <- file.listFiles()) {
        deleteRecursively(child)
      }
    }
    if (!file.delete()) {
      throw new IOException("Failed to delete: " + file)
    }
  }

  /**
   * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes.
   * This is used to figure out how much memory to claim from Mesos based on the SPARK_MEM
   * environment variable.
   */
  def memoryStringToMb(str: String): Int = {
    val lower = str.toLowerCase
    if (lower.endsWith("k")) {
      (lower.substring(0, lower.length - 1).toLong / 1024).toInt
    } else if (lower.endsWith("m")) {
      lower.substring(0, lower.length - 1).toInt
    } else if (lower.endsWith("g")) {
      lower.substring(0, lower.length - 1).toInt * 1024
    } else if (lower.endsWith("t")) {
      lower.substring(0, lower.length - 1).toInt * 1024 * 1024
    } else { // no suffix, so it's just a number in bytes
      (lower.toLong / 1024 / 1024).toInt
    }
  }
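
  // For example: memoryStringToMb("500m") == 500, memoryStringToMb("2g") == 2048, and
  // memoryStringToMb("1048576") == 1 (a bare number is taken as bytes).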

  /**
   * Convert a memory quantity in bytes to a human-readable string such as "4.0 MB".
   */
  def memoryBytesToString(size: Long): String = {
    val TB = 1L << 40
    val GB = 1L << 30
    val MB = 1L << 20
    val KB = 1L << 10

    val (value, unit) = {
      if (size >= 2 * TB) {
        (size.asInstanceOf[Double] / TB, "TB")
      } else if (size >= 2 * GB) {
        (size.asInstanceOf[Double] / GB, "GB")
      } else if (size >= 2 * MB) {
        (size.asInstanceOf[Double] / MB, "MB")
      } else if (size >= 2 * KB) {
        (size.asInstanceOf[Double] / KB, "KB")
      } else {
        (size.asInstanceOf[Double], "B")
      }
    }
    "%.1f %s".formatLocal(Locale.US, value, unit)
  }
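
  // For example: memoryBytesToString(2L << 20) == "2.0 MB" and
  // memoryBytesToString(3L << 30) == "3.0 GB"; quantities below two units stay in the
  // smaller unit, e.g. memoryBytesToString(2047) == "2047.0 B".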

  /**
   * Returns a human-readable string representing a duration such as "35 ms".
   */
  def msDurationToString(ms: Long): String = {
    val second = 1000
    val minute = 60 * second
    val hour = 60 * minute

    ms match {
      case t if t < second =>
        "%d ms".format(t)
      case t if t < minute =>
        "%.1f s".format(t.toFloat / second)
      case t if t < hour =>
        "%.1f m".format(t.toFloat / minute)
      case t =>
        "%.2f h".format(t.toFloat / hour)
    }
  }
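
  // For example: msDurationToString(350) == "350 ms", msDurationToString(90000) == "1.5 m",
  // and msDurationToString(5400000) == "1.50 h".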

  /**
   * Convert a memory quantity in megabytes to a human-readable string such as "4.0 MB".
   */
  def memoryMegabytesToString(megabytes: Long): String = {
    memoryBytesToString(megabytes * 1024L * 1024L)
  }

  /**
   * Execute a command in the given working directory, throwing an exception if it completes
   * with an exit code other than 0.
   */
  def execute(command: Seq[String], workingDir: File) {
    val process = new ProcessBuilder(command: _*)
      .directory(workingDir)
      .redirectErrorStream(true)
      .start()
    new Thread("read stdout for " + command(0)) {
      override def run() {
        for (line <- Source.fromInputStream(process.getInputStream).getLines) {
          System.err.println(line)
        }
      }
    }.start()
    val exitCode = process.waitFor()
    if (exitCode != 0) {
      throw new SparkException("Process " + command + " exited with code " + exitCode)
    }
  }

  /**
   * Execute a command in the current working directory, throwing an exception if it completes
   * with an exit code other than 0.
   */
  def execute(command: Seq[String]) {
    execute(command, new File("."))
  }

  /**
   * Execute a command and get its output, throwing an exception if it exits with a code
   * other than 0.
   */
  def executeAndGetOutput(command: Seq[String], workingDir: File = new File("."),
      extraEnvironment: Map[String, String] = Map.empty): String = {
    val builder = new ProcessBuilder(command: _*)
      .directory(workingDir)
    val environment = builder.environment()
    for ((key, value) <- extraEnvironment) {
      environment.put(key, value)
    }
    val process = builder.start()
    new Thread("read stderr for " + command(0)) {
      override def run() {
        for (line <- Source.fromInputStream(process.getErrorStream).getLines) {
          System.err.println(line)
        }
      }
    }.start()
    val output = new StringBuffer
    val stdoutThread = new Thread("read stdout for " + command(0)) {
      override def run() {
        for (line <- Source.fromInputStream(process.getInputStream).getLines) {
          output.append(line)
        }
      }
    }
    stdoutThread.start()
    val exitCode = process.waitFor()
    stdoutThread.join() // Wait for it to finish reading output
    if (exitCode != 0) {
      throw new SparkException("Process " + command + " exited with code " + exitCode)
    }
    output.toString
  }
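
  // Usage sketch (illustrative command); note that stdout lines are concatenated without
  // newlines:
  //   Utils.executeAndGetOutput(Seq("echo", "hello"))  // == "hello"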

  /**
   * A regular expression to match classes of the "core" Spark API that we want to skip when
   * finding the call site of a method.
   */
  private val SPARK_CLASS_REGEX = """^spark(\.api\.java)?(\.rdd)?\.[A-Z]""".r

  private[spark] class CallSiteInfo(val lastSparkMethod: String, val firstUserFile: String,
    val firstUserLine: Int, val firstUserClass: String)

  /**
   * When called inside a class in the spark package, returns the name of the user code class
   * (outside the spark package) that called into Spark, as well as which Spark method they called.
   * This is used, for example, to tell users where in their code each RDD got created.
   */
  def getCallSiteInfo: CallSiteInfo = {
    val trace = Thread.currentThread.getStackTrace().filter(el =>
      !el.getMethodName.contains("getStackTrace"))

    // Keep crawling up the stack trace until we find the first function not inside of the spark
    // package. We track the last (shallowest) contiguous Spark method. This might be an RDD
    // transformation, a SparkContext function (such as parallelize), or anything else that leads
    // to instantiation of an RDD. We also track the first (deepest) user method, file, and line.
    var lastSparkMethod = "<unknown>"
    var firstUserFile = "<unknown>"
    var firstUserLine = 0
    var finished = false
    var firstUserClass = "<unknown>"

    for (el <- trace) {
      if (!finished) {
        if (SPARK_CLASS_REGEX.findFirstIn(el.getClassName).isDefined) {
          lastSparkMethod = if (el.getMethodName == "<init>") {
            // Spark method is a constructor; get its class name
            el.getClassName.substring(el.getClassName.lastIndexOf('.') + 1)
          } else {
            el.getMethodName
          }
        } else {
          firstUserLine = el.getLineNumber
          firstUserFile = el.getFileName
          firstUserClass = el.getClassName
          finished = true
        }
      }
    }
    new CallSiteInfo(lastSparkMethod, firstUserFile, firstUserLine, firstUserClass)
  }

  def formatSparkCallSite = {
    val callSiteInfo = getCallSiteInfo
    "%s at %s:%s".format(callSiteInfo.lastSparkMethod, callSiteInfo.firstUserFile,
      callSiteInfo.firstUserLine)
  }
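
  // Illustrative output: for a user program calling rdd.map(...) from MyApp.scala line 42,
  // formatSparkCallSite would produce something like "map at MyApp.scala:42".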

  /** Return a string containing part of a file from byte 'start' to 'end'. */
  def offsetBytes(path: String, start: Long, end: Long): String = {
    val file = new File(path)
    val length = file.length()
    val effectiveEnd = math.min(length, end)
    val effectiveStart = math.max(0, start)
    val buff = new Array[Byte]((effectiveEnd - effectiveStart).toInt)
    val stream = new FileInputStream(file)

    stream.skip(effectiveStart)
    stream.read(buff)
    stream.close()
    Source.fromBytes(buff).mkString
  }

  /**
   * Clone an object using a Spark serializer.
   */
  def clone[T](value: T, serializer: SerializerInstance): T = {
    serializer.deserialize[T](serializer.serialize(value))
  }

  /**
   * Detect whether this thread might be executing a shutdown hook. Will always return true if
   * the current thread is running a shutdown hook but may spuriously return true otherwise (e.g.
   * if System.exit was just called by a concurrent thread).
   *
   * Currently, this detects whether the JVM is shutting down by Runtime#addShutdownHook throwing
   * an IllegalStateException.
   */
  def inShutdown(): Boolean = {
    try {
      val hook = new Thread {
        override def run() {}
      }
      Runtime.getRuntime.addShutdownHook(hook)
      Runtime.getRuntime.removeShutdownHook(hook)
    } catch {
      case ise: IllegalStateException => return true
    }
    false
  }

  def isSpace(c: Char): Boolean = {
    " \t\r\n".indexOf(c) != -1
  }

  /**
   * Split a string of potentially quoted arguments from the command line the way that a shell
   * would do it to determine arguments to a command. For example, if the string is 'a "b c" d',
   * then it would be parsed as three arguments: 'a', 'b c' and 'd'.
   */
  def splitCommandString(s: String): Seq[String] = {
    val buf = new ArrayBuffer[String]
    var inWord = false
    var inSingleQuote = false
    var inDoubleQuote = false
    val curWord = new StringBuilder
    def endWord() {
      buf += curWord.toString
      curWord.clear()
    }
    var i = 0
    while (i < s.length) {
      val nextChar = s.charAt(i)
      if (inDoubleQuote) {
        if (nextChar == '"') {
          inDoubleQuote = false
        } else if (nextChar == '\\') {
          if (i < s.length - 1) {
            // Append the next character directly, because only " and \ may be escaped in
            // double quotes after the shell's own expansion
            curWord.append(s.charAt(i + 1))
            i += 1
          }
        } else {
          curWord.append(nextChar)
        }
      } else if (inSingleQuote) {
        if (nextChar == '\'') {
          inSingleQuote = false
        } else {
          curWord.append(nextChar)
        }
        // Backslashes are not treated specially in single quotes
      } else if (nextChar == '"') {
        inWord = true
        inDoubleQuote = true
      } else if (nextChar == '\'') {
        inWord = true
        inSingleQuote = true
      } else if (!isSpace(nextChar)) {
        curWord.append(nextChar)
        inWord = true
      } else if (inWord && isSpace(nextChar)) {
        endWord()
        inWord = false
      }
      i += 1
    }
    if (inWord || inDoubleQuote || inSingleQuote) {
      endWord()
    }
    buf
  }
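
  // Illustrative (hypothetical command line): quoting follows the shell rules above, e.g.
  //   splitCommandString("""java -cp "a b/lib" -Dx='y z' Main""")
  //   == Seq("java", "-cp", "a b/lib", "-Dx=y z", "Main")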
}