[SPARK-36919][SQL] Make BadRecordException fields transient

### What changes were proposed in this pull request?
Migrating a Spark application from 2.4.x to 3.1.x and finding a difference in the exception chaining behavior. In a case of parsing a malformed CSV, where the root cause exception should be Caused by: java.lang.RuntimeException: Malformed CSV record, only the top level exception is kept, and all lower level exceptions and root cause are lost. Thus, when we call ExceptionUtils.getRootCause on the exception, we still get itself.
The reason for the difference is that RuntimeException is wrapped in BadRecordException, which has unserializable fields. When we try to serialize the exception from tasks and deserialize from scheduler, the exception is lost.
This PR makes unserializable fields of BadRecordException transient, so the rest of the exception could be serialized and deserialized properly.

### Why are the changes needed?
Make BadRecordException serializable

### Does this PR introduce _any_ user-facing change?
User could get root cause of BadRecordException

### How was this patch tested?
Unit testing

Closes #34167 from tianhanhu/master.

Authored-by: tianhanhu <adrianhu96@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
This commit is contained in:
tianhanhu 2021-10-06 19:06:09 +09:00 committed by Hyukjin Kwon
parent f6013c8ce5
commit aed977c468
2 changed files with 4 additions and 2 deletions

View file

@ -38,6 +38,6 @@ case class PartialResultException(
* @param cause the actual exception about why the record is bad and can't be parsed. * @param cause the actual exception about why the record is bad and can't be parsed.
*/ */
case class BadRecordException( case class BadRecordException(
record: () => UTF8String, @transient record: () => UTF8String,
partialResult: () => Option[InternalRow], @transient partialResult: () => Option[InternalRow],
cause: Throwable) extends Exception(cause) cause: Throwable) extends Exception(cause)

View file

@ -30,6 +30,7 @@ import scala.collection.JavaConverters._
import scala.util.Properties import scala.util.Properties
import com.univocity.parsers.common.TextParsingException import com.univocity.parsers.common.TextParsingException
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.commons.lang3.time.FastDateFormat import org.apache.commons.lang3.time.FastDateFormat
import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.GzipCodec import org.apache.hadoop.io.compress.GzipCodec
@ -366,6 +367,7 @@ abstract class CSVSuite
} }
assert(exception.getMessage.contains("Malformed CSV record")) assert(exception.getMessage.contains("Malformed CSV record"))
assert(ExceptionUtils.getRootCause(exception).isInstanceOf[RuntimeException])
} }
} }