Revert "[SPARK-8498] [TUNGSTEN] fix npe in errorhandling path in unsafeshuffle writer"
This reverts commit 3348245055
.
Reverting because `catch (Exception e) ... throw e` doesn't compile under
Java 6 unless the method declares that it throws Exception.
This commit is contained in:
parent
3348245055
commit
77cb1d5ed1
|
@ -139,9 +139,6 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
|
public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
|
||||||
// Keep track of success so we know if we ecountered an exception
|
|
||||||
// We do this rather than a standard try/catch/re-throw to handle
|
|
||||||
// generic throwables.
|
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
try {
|
try {
|
||||||
while (records.hasNext()) {
|
while (records.hasNext()) {
|
||||||
|
@ -150,19 +147,8 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
|
||||||
closeAndWriteOutput();
|
closeAndWriteOutput();
|
||||||
success = true;
|
success = true;
|
||||||
} finally {
|
} finally {
|
||||||
if (sorter != null) {
|
if (!success) {
|
||||||
try {
|
sorter.cleanupAfterError();
|
||||||
sorter.cleanupAfterError();
|
|
||||||
} catch (Exception e) {
|
|
||||||
// Only throw this error if we won't be masking another
|
|
||||||
// error.
|
|
||||||
if (success) {
|
|
||||||
throw e;
|
|
||||||
} else {
|
|
||||||
logger.error("In addition to a failure during writing, we failed during " +
|
|
||||||
"cleanup.", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -253,23 +253,6 @@ public class UnsafeShuffleWriterSuite {
|
||||||
createWriter(false).stop(false);
|
createWriter(false).stop(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
class PandaException extends RuntimeException {
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test(expected=PandaException.class)
|
|
||||||
public void writeFailurePropagates() throws Exception {
|
|
||||||
class BadRecords extends scala.collection.AbstractIterator<Product2<Object, Object>> {
|
|
||||||
@Override public boolean hasNext() {
|
|
||||||
throw new PandaException();
|
|
||||||
}
|
|
||||||
@Override public Product2<Object, Object> next() {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
final UnsafeShuffleWriter<Object, Object> writer = createWriter(true);
|
|
||||||
writer.write(new BadRecords());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void writeEmptyIterator() throws Exception {
|
public void writeEmptyIterator() throws Exception {
|
||||||
final UnsafeShuffleWriter<Object, Object> writer = createWriter(true);
|
final UnsafeShuffleWriter<Object, Object> writer = createWriter(true);
|
||||||
|
|
Loading…
Reference in a new issue