[SPARK-16129][CORE][SQL] Eliminate direct use of commons-lang classes in favor of commons-lang3

## What changes were proposed in this pull request?

Replace uses of `commons-lang` with `commons-lang3` and forbid the former via scalastyle; replace `commons-lang`'s `NotImplementedException` with the JDK's `UnsupportedOperationException`.
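
Beyond the package rename, the practical benefit at several call sites is that `commons-lang3`'s `SerializationUtils.clone` is generic (`<T extends Serializable> T clone(T)`), whereas the 2.x version returns `Object`, so the `asInstanceOf[Properties]` casts in `SparkContext`, `StreamingContext` and `JobScheduler` can be dropped. A minimal sketch of the difference (illustrative code, assuming `commons-lang3` is on the classpath; `CloneExample` is not part of Spark):

```scala
import java.util.Properties

import org.apache.commons.lang3.SerializationUtils

object CloneExample {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("spark.app.name", "example")

    // commons-lang 2.x: clone(Serializable) returns Object, forcing
    //   SerializationUtils.clone(props).asInstanceOf[Properties]
    // commons-lang3: <T extends Serializable> T clone(T), so the result is already typed.
    val copy: Properties = SerializationUtils.clone(props)
    println(copy.getProperty("spark.app.name"))
  }
}
```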

## How was this patch tested?

Jenkins tests

Author: Sean Owen <sowen@cloudera.com>

Closes #13843 from srowen/SPARK-16129.
Sean Owen 2016-06-24 10:35:54 +01:00
parent f4fd7432fb
commit 158af162ea
13 changed files with 44 additions and 48 deletions

@ -24,7 +24,6 @@ import java.util.{Arrays, Locale, Properties, ServiceLoader, UUID}
import java.util.concurrent.ConcurrentMap
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}
import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection.Map
import scala.collection.generic.Growable
@ -34,7 +33,7 @@ import scala.reflect.{classTag, ClassTag}
import scala.util.control.NonFatal
import com.google.common.collect.MapMaker
import org.apache.commons.lang.SerializationUtils
import org.apache.commons.lang3.SerializationUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable,
@ -334,7 +333,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
override protected def childValue(parent: Properties): Properties = {
// Note: make a clone such that changes in the parent properties aren't reflected in
// the those of the children threads, which has confusing semantics (SPARK-10563).
SerializationUtils.clone(parent).asInstanceOf[Properties]
SerializationUtils.clone(parent)
}
override protected def initialValue(): Properties = new Properties()
}
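
As a side note on the cloning above, the hypothetical sketch below (not the real `SparkContext` code; names like `LocalPropsExample` are made up) shows why `childValue` clones the parent's `Properties` at all: each child thread receives a deep copy taken when the thread is constructed, so later changes in the parent are not reflected in the child — the confusing behaviour noted in SPARK-10563.

```scala
import java.util.Properties

import org.apache.commons.lang3.SerializationUtils

object LocalPropsExample {
  private val localProps = new InheritableThreadLocal[Properties] {
    override protected def childValue(parent: Properties): Properties =
      SerializationUtils.clone(parent)  // typed clone, no asInstanceOf needed
    override protected def initialValue(): Properties = new Properties()
  }

  def main(args: Array[String]): Unit = {
    localProps.get().setProperty("job.group", "g1")
    val child = new Thread(new Runnable {
      // The child sees the deep copy taken at thread-construction time.
      override def run(): Unit = println(localProps.get().getProperty("job.group"))
    })
    child.start()
    child.join()                                     // prints "g1"
    localProps.get().setProperty("job.group", "g2")  // not visible to already-created children
  }
}
```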

@ -210,6 +210,12 @@ This file is divided into 3 sections:
scala.collection.JavaConverters._ and use .asScala / .asJava methods</customMessage>
</check>
<check customId="commonslang2" level="error" class="org.scalastyle.scalariform.TokenChecker" enabled="true">
<parameters><parameter name="regex">org\.apache\.commons\.lang\.</parameter></parameters>
<customMessage>Use Commons Lang 3 classes (package org.apache.commons.lang3.*) instead
of Commons Lang 2 (package org.apache.commons.lang.*)</customMessage>
</check>
<check level="error" class="org.scalastyle.scalariform.ImportOrderChecker" enabled="true">
<parameters>
<parameter name="groups">java,scala,3rdParty,spark</parameter>
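
To illustrate what the new `commonslang2` check enforces, here is a hypothetical Scala file (assuming scalastyle runs with the updated config): tokens matching `org\.apache\.commons\.lang\.` are now rejected with the message above, while the `lang3` equivalents pass and are drop-in replacements for the methods Spark uses.

```scala
// org.apache.commons.lang.StringUtils  <- importing this would be rejected by the new check
import org.apache.commons.lang3.StringUtils  // passes the check

object StyleCheckExample {
  def main(args: Array[String]): Unit = {
    // For the methods Spark relies on, lang3's StringUtils behaves the same as lang 2.x's.
    println(StringUtils.isBlank("   "))                   // true
    println(StringUtils.abbreviate("commons-lang3", 10))  // "commons..."
  }
}
```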

@ -17,7 +17,7 @@
package org.apache.spark.sql.catalyst.expressions
import org.apache.commons.lang.StringUtils
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult

@ -23,7 +23,7 @@ import scala.collection.Map
import scala.collection.mutable.Stack
import scala.reflect.ClassTag
import org.apache.commons.lang.ClassUtils
import org.apache.commons.lang3.ClassUtils
import org.json4s.JsonAST._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.datasources.parquet;
import java.io.IOException;
import org.apache.commons.lang.NotImplementedException;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Dictionary;
@ -228,7 +227,7 @@ public class VectorizedColumnReader {
column.putShort(i, (short) dictionary.decodeToInt(dictionaryIds.getInt(i)));
}
} else {
throw new NotImplementedException("Unimplemented type: " + column.dataType());
throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
}
break;
@ -239,7 +238,7 @@ public class VectorizedColumnReader {
column.putLong(i, dictionary.decodeToLong(dictionaryIds.getInt(i)));
}
} else {
throw new NotImplementedException("Unimplemented type: " + column.dataType());
throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
}
break;
@ -262,7 +261,7 @@ public class VectorizedColumnReader {
column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v));
}
} else {
throw new NotImplementedException();
throw new UnsupportedOperationException();
}
break;
case BINARY:
@ -293,12 +292,12 @@ public class VectorizedColumnReader {
column.putByteArray(i, v.getBytes());
}
} else {
throw new NotImplementedException();
throw new UnsupportedOperationException();
}
break;
default:
throw new NotImplementedException("Unsupported type: " + descriptor.getType());
throw new UnsupportedOperationException("Unsupported type: " + descriptor.getType());
}
}
@ -327,7 +326,7 @@ public class VectorizedColumnReader {
defColumn.readShorts(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
} else {
throw new NotImplementedException("Unimplemented type: " + column.dataType());
throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
}
}
@ -360,7 +359,7 @@ public class VectorizedColumnReader {
defColumn.readDoubles(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
} else {
throw new NotImplementedException("Unimplemented type: " + column.dataType());
throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
}
}
@ -381,7 +380,7 @@ public class VectorizedColumnReader {
}
}
} else {
throw new NotImplementedException("Unimplemented type: " + column.dataType());
throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
}
}
@ -417,7 +416,7 @@ public class VectorizedColumnReader {
}
}
} else {
throw new NotImplementedException("Unimplemented type: " + column.dataType());
throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
}
}
@ -459,13 +458,13 @@ public class VectorizedColumnReader {
@SuppressWarnings("deprecation")
Encoding plainDict = Encoding.PLAIN_DICTIONARY; // var to allow warning suppression
if (dataEncoding != plainDict && dataEncoding != Encoding.RLE_DICTIONARY) {
throw new NotImplementedException("Unsupported encoding: " + dataEncoding);
throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding);
}
this.dataColumn = new VectorizedRleValuesReader();
this.useDictionary = true;
} else {
if (dataEncoding != Encoding.PLAIN) {
throw new NotImplementedException("Unsupported encoding: " + dataEncoding);
throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding);
}
this.dataColumn = new VectorizedPlainValuesReader();
this.useDictionary = false;
@ -485,7 +484,7 @@ public class VectorizedColumnReader {
// Initialize the decoders.
if (page.getDlEncoding() != Encoding.RLE && descriptor.getMaxDefinitionLevel() != 0) {
throw new NotImplementedException("Unsupported encoding: " + page.getDlEncoding());
throw new UnsupportedOperationException("Unsupported encoding: " + page.getDlEncoding());
}
int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
this.defColumn = new VectorizedRleValuesReader(bitWidth);
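
`NotImplementedException` above comes from `commons-lang` 2.x; these call sites now throw the JDK's built-in, unchecked `UnsupportedOperationException` instead, so no Commons class is needed at all. A small sketch of the resulting default-branch pattern (the names `UnsupportedExample` and `physicalWidth` are invented for illustration, not taken from `VectorizedColumnReader`):

```scala
object UnsupportedExample {
  // Handle the supported cases and raise the JDK's unchecked exception for the rest,
  // mirroring the default branches in the diff above.
  def physicalWidth(typeName: String): Int = typeName match {
    case "INT32"  => 4
    case "INT64"  => 8
    case "DOUBLE" => 8
    case other    => throw new UnsupportedOperationException("Unsupported type: " + other)
  }

  def main(args: Array[String]): Unit = {
    println(physicalWidth("INT64"))  // 8
    try physicalWidth("FIXED_LEN_BYTE_ARRAY")
    catch { case e: UnsupportedOperationException => println(e.getMessage) }
  }
}
```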

@ -20,7 +20,6 @@ import java.math.BigDecimal;
import java.math.BigInteger;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.NotImplementedException;
import org.apache.parquet.column.Dictionary;
import org.apache.parquet.io.api.Binary;
@ -100,7 +99,7 @@ public abstract class ColumnVector implements AutoCloseable {
@Override
public ArrayData copy() {
throw new NotImplementedException();
throw new UnsupportedOperationException();
}
// TODO: this is extremely expensive.
@ -171,7 +170,7 @@ public abstract class ColumnVector implements AutoCloseable {
}
}
} else {
throw new NotImplementedException("Type " + dt);
throw new UnsupportedOperationException("Type " + dt);
}
return list;
}
@ -181,7 +180,7 @@ public abstract class ColumnVector implements AutoCloseable {
@Override
public boolean getBoolean(int ordinal) {
throw new NotImplementedException();
throw new UnsupportedOperationException();
}
@Override
@ -189,7 +188,7 @@ public abstract class ColumnVector implements AutoCloseable {
@Override
public short getShort(int ordinal) {
throw new NotImplementedException();
throw new UnsupportedOperationException();
}
@Override
@ -200,7 +199,7 @@ public abstract class ColumnVector implements AutoCloseable {
@Override
public float getFloat(int ordinal) {
throw new NotImplementedException();
throw new UnsupportedOperationException();
}
@Override
@ -240,12 +239,12 @@ public abstract class ColumnVector implements AutoCloseable {
@Override
public MapData getMap(int ordinal) {
throw new NotImplementedException();
throw new UnsupportedOperationException();
}
@Override
public Object get(int ordinal, DataType dataType) {
throw new NotImplementedException();
throw new UnsupportedOperationException();
}
}
@ -562,7 +561,7 @@ public abstract class ColumnVector implements AutoCloseable {
* Returns the value for rowId.
*/
public MapData getMap(int ordinal) {
throw new NotImplementedException();
throw new UnsupportedOperationException();
}
/**

@ -23,8 +23,6 @@ import java.sql.Date;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.lang.NotImplementedException;
import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
@ -112,7 +110,7 @@ public class ColumnVectorUtils {
}
return result;
} else {
throw new NotImplementedException();
throw new UnsupportedOperationException();
}
}
@ -161,7 +159,7 @@ public class ColumnVectorUtils {
} else if (t instanceof DateType) {
dst.appendInt(DateTimeUtils.fromJavaDate((Date)o));
} else {
throw new NotImplementedException("Type " + t);
throw new UnsupportedOperationException("Type " + t);
}
}
}

@ -19,8 +19,6 @@ package org.apache.spark.sql.execution.vectorized;
import java.math.BigDecimal;
import java.util.*;
import org.apache.commons.lang.NotImplementedException;
import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow;
@ -166,7 +164,7 @@ public final class ColumnarBatch {
@Override
public boolean anyNull() {
throw new NotImplementedException();
throw new UnsupportedOperationException();
}
@Override
@ -227,12 +225,12 @@ public final class ColumnarBatch {
@Override
public MapData getMap(int ordinal) {
throw new NotImplementedException();
throw new UnsupportedOperationException();
}
@Override
public Object get(int ordinal, DataType dataType) {
throw new NotImplementedException();
throw new UnsupportedOperationException();
}
@Override
@ -258,7 +256,7 @@ public final class ColumnarBatch {
setDecimal(ordinal, Decimal.apply((BigDecimal) value, t.precision(), t.scale()),
t.precision());
} else {
throw new NotImplementedException("Datatype not supported " + dt);
throw new UnsupportedOperationException("Datatype not supported " + dt);
}
}
}
@ -430,7 +428,7 @@ public final class ColumnarBatch {
*/
public void setColumn(int ordinal, ColumnVector column) {
if (column instanceof OffHeapColumnVector) {
throw new NotImplementedException("Need to ref count columns.");
throw new UnsupportedOperationException("Need to ref count columns.");
}
columns[ordinal] = column;
}

@ -17,7 +17,7 @@
package org.apache.spark.sql.execution
import org.apache.commons.lang.StringUtils
import org.apache.commons.lang3.StringUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, Row, SparkSession, SQLContext}

@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.columnar
import scala.collection.JavaConverters._
import org.apache.commons.lang.StringUtils
import org.apache.commons.lang3.StringUtils
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.rdd.RDD

@ -29,7 +29,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.common.cli.HiveFileProcessor;

@ -26,7 +26,7 @@ import scala.collection.mutable.Queue
import scala.reflect.ClassTag
import scala.util.control.NonFatal
import org.apache.commons.lang.SerializationUtils
import org.apache.commons.lang3.SerializationUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
@ -579,8 +579,7 @@ class StreamingContext private[streaming] (
sparkContext.setCallSite(startSite.get)
sparkContext.clearJobGroup()
sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
savedProperties.set(SerializationUtils.clone(
sparkContext.localProperties.get()).asInstanceOf[Properties])
savedProperties.set(SerializationUtils.clone(sparkContext.localProperties.get()))
scheduler.start()
}
state = StreamingContextState.ACTIVE

@ -17,13 +17,12 @@
package org.apache.spark.streaming.scheduler
import java.util.Properties
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import scala.collection.JavaConverters._
import scala.util.Failure
import org.apache.commons.lang.SerializationUtils
import org.apache.commons.lang3.SerializationUtils
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.{PairRDDFunctions, RDD}
@ -219,8 +218,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
def run() {
val oldProps = ssc.sparkContext.getLocalProperties
try {
ssc.sparkContext.setLocalProperties(
SerializationUtils.clone(ssc.savedProperties.get()).asInstanceOf[Properties])
ssc.sparkContext.setLocalProperties(SerializationUtils.clone(ssc.savedProperties.get()))
val formattedTime = UIUtils.formatBatchTime(
job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"