Merge branch 'master' of git://github.com/apache/incubator-spark
This commit is contained in:
commit
25fa976580
|
@ -529,10 +529,10 @@ private[spark] class ClusterTaskSetManager(
|
|||
addPendingTask(index)
|
||||
if (state != TaskState.KILLED) {
|
||||
numFailures(index) += 1
|
||||
if (numFailures(index) > MAX_TASK_FAILURES) {
|
||||
logError("Task %s:%d failed more than %d times; aborting job".format(
|
||||
if (numFailures(index) >= MAX_TASK_FAILURES) {
|
||||
logError("Task %s:%d failed %d times; aborting job".format(
|
||||
taskSet.id, index, MAX_TASK_FAILURES))
|
||||
abort("Task %s:%d failed more than %d times".format(taskSet.id, index, MAX_TASK_FAILURES))
|
||||
abort("Task %s:%d failed %d times".format(taskSet.id, index, MAX_TASK_FAILURES))
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
|
|
@ -122,7 +122,7 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
|
|||
sc.parallelize(1 to 10, 10).foreach(x => println(x / 0))
|
||||
}
|
||||
assert(thrown.getClass === classOf[SparkException])
|
||||
assert(thrown.getMessage.contains("more than 4 times"))
|
||||
assert(thrown.getMessage.contains("failed 4 times"))
|
||||
}
|
||||
|
||||
test("caching") {
|
||||
|
|
|
@ -283,7 +283,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo
|
|||
|
||||
// Fail the task MAX_TASK_FAILURES times, and check that the task set is aborted
|
||||
// after the last failure.
|
||||
(0 until manager.MAX_TASK_FAILURES).foreach { index =>
|
||||
(1 to manager.MAX_TASK_FAILURES).foreach { index =>
|
||||
val offerResult = manager.resourceOffer("exec1", "host1", 1, ANY)
|
||||
assert(offerResult != None,
|
||||
"Expect resource offer on iteration %s to return a task".format(index))
|
||||
|
|
|
@ -589,7 +589,7 @@ def ssh(host, opts, command):
|
|||
while True:
|
||||
try:
|
||||
return subprocess.check_call(
|
||||
ssh_command(opts) + ['-t', '%s@%s' % (opts.user, host), stringify_command(command)])
|
||||
ssh_command(opts) + ['-t', '-t', '%s@%s' % (opts.user, host), stringify_command(command)])
|
||||
except subprocess.CalledProcessError as e:
|
||||
if (tries > 2):
|
||||
# If this was an ssh failure, provide the user with hints.
|
||||
|
@ -730,7 +730,7 @@ def real_main():
|
|||
if opts.proxy_port != None:
|
||||
proxy_opt = ['-D', opts.proxy_port]
|
||||
subprocess.check_call(
|
||||
ssh_command(opts) + proxy_opt + ['-t', "%s@%s" % (opts.user, master)])
|
||||
ssh_command(opts) + proxy_opt + ['-t', '-t', "%s@%s" % (opts.user, master)])
|
||||
|
||||
elif action == "get-master":
|
||||
(master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
|
||||
|
|
Loading…
Reference in a new issue