[SPARK-20202][BUILD][SQL] Remove references to org.spark-project.hive (Hive 1.2.1)

### What changes were proposed in this pull request?

As of today:
- SPARK-30034: Apache Spark 3.0.0 switched its default Hive execution engine from Hive 1.2 to Hive 2.3, which removed the direct dependency on the forked Hive 1.2.1 in the Maven repository.
- SPARK-32981: Apache Spark 3.1.0 (`master` branch) removed the Hive 1.2 related artifacts from Apache Spark binary distributions.

This PR (SPARK-20202) aims to remove the following usage of the unofficial Apache Hive fork completely from the Apache Spark `master` branch for Apache Spark 3.1.0.
```
<hive.group>org.spark-project.hive</hive.group>
<hive.version>1.2.1.spark2</hive.version>
```

For users of the forked Hive `1.2.1.spark2`, Apache Spark 2.4 (LTS) and 3.0 (supported until ~2021.12) will continue to provide it.
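
For reference, `hive-2.3` is now the only (and default) Hive profile in `pom.xml`. Below is a minimal sketch of a post-change build invocation, assuming the standard `build/mvn` launcher and the existing `hive`/`hive-thriftserver` profiles; exact flags depend on your environment.
```
# Hive 2.3 is the default Hive execution profile, so -Phive-2.3 is optional.
./build/mvn -Phive -Phive-thriftserver -Phive-2.3 -DskipTests clean package

# The hive-1.2 profile is no longer defined on master/branch-3.1, so requesting
# -Phive-1.2 now only triggers Maven's "profile does not exist" warning.
```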

### Why are the changes needed?

- First, the Apache Spark community should not use an unofficial forked release of another Apache project.
- Second, Apache Hive 1.2.1 was released on 2015-06-26, and the forked Hive `1.2.1.spark2` has exposed many bugs that cannot be fixed because the fork is not maintained at all. Apache Hive 2.3.0 was released on 2017-07-19 and has shown far fewer bugs than `1.2.1.spark2`. Many bugs still exist in the `hive-1.2` profile, and new Apache Spark unit tests have so far had to be guarded by a `HiveUtils.isHive23` condition.

### Does this PR introduce _any_ user-facing change?

No. This is a dev-only change. The PRBuilder will no longer accept `[test-hive1.2]` on `master` and `branch-3.1`.

### How was this patch tested?

1. SBT/Hadoop 3.2/Hive 2.3 (https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/129366)
2. SBT/Hadoop 2.7/Hive 2.3 (https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/129382)
3. SBT/Hadoop 3.2/Hive 1.2 (already unsupported, because Hive 1.2 does not work with Hadoop 3.2)
4. SBT/Hadoop 2.7/Hive 1.2 (https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/129383, now rejected). Equivalent local invocations are sketched below.
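
A rough sketch of the corresponding local commands (not part of the CI setup), assuming the standard `build/sbt` launcher and the Hadoop/Hive profiles referenced in this PR:
```
# 1. Hadoop 3.2 / Hive 2.3
./build/sbt -Phadoop-3.2 -Phive -Phive-thriftserver -Phive-2.3 test

# 2. Hadoop 2.7 / Hive 2.3
./build/sbt -Phadoop-2.7 -Phive -Phive-thriftserver -Phive-2.3 test

# 3/4. -Phive-1.2 is removed, so the Hive 1.2 combinations can no longer be built or tested.
```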

Closes #29936 from dongjoon-hyun/SPARK-REMOVE-HIVE1.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
Dongjoon Hyun 2020-10-05 15:29:56 -07:00
parent 14aeab3b27
commit 008a2ad1f8
320 changed files with 7 additions and 69240 deletions

@@ -325,7 +325,6 @@ def get_hive_profiles(hive_version):
     """
     sbt_maven_hive_profiles = {
-        "hive1.2": ["-Phive-1.2"],
         "hive2.3": ["-Phive-2.3"],
     }

@@ -32,7 +32,6 @@ export LC_ALL=C
 HADOOP_MODULE_PROFILES="-Phive-thriftserver -Pmesos -Pkubernetes -Pyarn -Phive"
 MVN="build/mvn"
 HADOOP_HIVE_PROFILES=(
-    hadoop-2.7-hive-1.2
     hadoop-2.7-hive-2.3
     hadoop-3.2-hive-2.3
 )
@@ -71,12 +70,9 @@ for HADOOP_HIVE_PROFILE in "${HADOOP_HIVE_PROFILES[@]}"; do
   if [[ $HADOOP_HIVE_PROFILE == **hadoop-3.2-hive-2.3** ]]; then
     HADOOP_PROFILE=hadoop-3.2
     HIVE_PROFILE=hive-2.3
-  elif [[ $HADOOP_HIVE_PROFILE == **hadoop-2.7-hive-2.3** ]]; then
-    HADOOP_PROFILE=hadoop-2.7
-    HIVE_PROFILE=hive-2.3
   else
     HADOOP_PROFILE=hadoop-2.7
-    HIVE_PROFILE=hive-1.2
+    HIVE_PROFILE=hive-2.3
   fi
   echo "Performing Maven install for $HADOOP_HIVE_PROFILE"
   $MVN $HADOOP_MODULE_PROFILES -P$HADOOP_PROFILE -P$HIVE_PROFILE jar:jar jar:test-jar install:install clean -q

@@ -42,6 +42,8 @@ license: |
 - In Spark 3.1, incomplete interval literals, e.g. `INTERVAL '1'`, `INTERVAL '1 DAY 2'` will fail with IllegalArgumentException. In Spark 3.0, they result `NULL`s.
+- In Spark 3.1, we remove the built-in Hive 1.2. You need to migrate your custom SerDes to Hive 2.3. See [HIVE-15167](https://issues.apache.org/jira/browse/HIVE-15167) for more details.
+
 ## Upgrading from Spark SQL 3.0 to 3.0.1
 - In Spark 3.0, JSON datasource and JSON function `schema_of_json` infer TimestampType from string values if they match to the pattern defined by the JSON option `timestampFormat`. Since version 3.0.1, the timestamp type inference is disabled by default. Set the JSON option `inferTimestamp` to `true` to enable such type inference.

pom.xml
@@ -2970,13 +2970,9 @@
           <sourceDirectories>
             <directory>${basedir}/src/main/java</directory>
             <directory>${basedir}/src/main/scala</directory>
-            <directory>${basedir}/v${hive.version.short}/src/main/java</directory>
-            <directory>${basedir}/v${hive.version.short}/src/main/scala</directory>
           </sourceDirectories>
           <testSourceDirectories>
             <directory>${basedir}/src/test/java</directory>
-            <directory>${basedir}/v${hive.version.short}/src/test/java</directory>
-            <directory>${basedir}/v${hive.version.short}/src/test/scala</directory>
           </testSourceDirectories>
           <configLocation>dev/checkstyle.xml</configLocation>
           <outputFile>${basedir}/target/checkstyle-output.xml</outputFile>
@@ -3148,27 +3144,6 @@
       <!-- Default hadoop profile. Uses global properties. -->
     </profile>
-    <profile>
-      <id>hive-1.2</id>
-      <properties>
-        <hive.group>org.spark-project.hive</hive.group>
-        <hive.classifier></hive.classifier>
-        <!-- Version used in Maven Hive dependency -->
-        <hive.version>1.2.1.spark2</hive.version>
-        <!-- Version used for internal directory structure -->
-        <hive.version.short>1.2</hive.version.short>
-        <hive.parquet.scope>${hive.deps.scope}</hive.parquet.scope>
-        <hive.storage.version>2.6.0</hive.storage.version>
-        <hive.storage.scope>provided</hive.storage.scope>
-        <hive.common.scope>provided</hive.common.scope>
-        <hive.llap.scope>provided</hive.llap.scope>
-        <hive.serde.scope>provided</hive.serde.scope>
-        <hive.shims.scope>provided</hive.shims.scope>
-        <orc.classifier>nohive</orc.classifier>
-        <datanucleus-core.version>3.2.10</datanucleus-core.version>
-      </properties>
-    </profile>
     <profile>
       <id>hive-2.3</id>
       <!-- Default hive profile. Uses global properties. -->

@@ -221,8 +221,6 @@
             </goals>
             <configuration>
               <sources>
-                <source>v${hive.version.short}/src/main/scala</source>
-                <source>v${hive.version.short}/src/main/java</source>
                 <source>src/main/scala-${scala.binary.version}</source>
               </sources>
             </configuration>
@@ -235,7 +233,6 @@
             </goals>
             <configuration>
               <sources>
-                <source>v${hive.version.short}/src/test/scala</source>
                 <source>src/test/gen-java</source>
               </sources>
             </configuration>

@@ -1,208 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.orc;
import java.math.BigDecimal;
import org.apache.orc.storage.ql.exec.vector.*;
import org.apache.spark.sql.catalyst.util.DateTimeUtils;
import org.apache.spark.sql.catalyst.util.RebaseDateTime;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DateType;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.TimestampType;
import org.apache.spark.sql.vectorized.ColumnarArray;
import org.apache.spark.sql.vectorized.ColumnarMap;
import org.apache.spark.unsafe.types.UTF8String;
/**
* A column vector class wrapping Hive's ColumnVector. Because Spark ColumnarBatch only accepts
* Spark's vectorized.ColumnVector, this column vector is used to adapt Hive ColumnVector with
* Spark ColumnarVector.
*/
public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVector {
private ColumnVector baseData;
private LongColumnVector longData;
private DoubleColumnVector doubleData;
private BytesColumnVector bytesData;
private DecimalColumnVector decimalData;
private TimestampColumnVector timestampData;
private final boolean isTimestamp;
private final boolean isDate;
private int batchSize;
OrcColumnVector(DataType type, ColumnVector vector) {
super(type);
if (type instanceof TimestampType) {
isTimestamp = true;
} else {
isTimestamp = false;
}
if (type instanceof DateType) {
isDate = true;
} else {
isDate = false;
}
baseData = vector;
if (vector instanceof LongColumnVector) {
longData = (LongColumnVector) vector;
} else if (vector instanceof DoubleColumnVector) {
doubleData = (DoubleColumnVector) vector;
} else if (vector instanceof BytesColumnVector) {
bytesData = (BytesColumnVector) vector;
} else if (vector instanceof DecimalColumnVector) {
decimalData = (DecimalColumnVector) vector;
} else if (vector instanceof TimestampColumnVector) {
timestampData = (TimestampColumnVector) vector;
} else {
throw new UnsupportedOperationException();
}
}
public void setBatchSize(int batchSize) {
this.batchSize = batchSize;
}
@Override
public void close() {
}
@Override
public boolean hasNull() {
return !baseData.noNulls;
}
@Override
public int numNulls() {
if (baseData.isRepeating) {
if (baseData.isNull[0]) {
return batchSize;
} else {
return 0;
}
} else if (baseData.noNulls) {
return 0;
} else {
int count = 0;
for (int i = 0; i < batchSize; i++) {
if (baseData.isNull[i]) count++;
}
return count;
}
}
/* A helper method to get the row index in a column. */
private int getRowIndex(int rowId) {
return baseData.isRepeating ? 0 : rowId;
}
@Override
public boolean isNullAt(int rowId) {
return baseData.isNull[getRowIndex(rowId)];
}
@Override
public boolean getBoolean(int rowId) {
return longData.vector[getRowIndex(rowId)] == 1;
}
@Override
public byte getByte(int rowId) {
return (byte) longData.vector[getRowIndex(rowId)];
}
@Override
public short getShort(int rowId) {
return (short) longData.vector[getRowIndex(rowId)];
}
@Override
public int getInt(int rowId) {
int value = (int) longData.vector[getRowIndex(rowId)];
if (isDate) {
return RebaseDateTime.rebaseJulianToGregorianDays(value);
} else {
return value;
}
}
@Override
public long getLong(int rowId) {
int index = getRowIndex(rowId);
if (isTimestamp) {
return DateTimeUtils.fromJavaTimestamp(timestampData.asScratchTimestamp(index));
} else {
return longData.vector[index];
}
}
@Override
public float getFloat(int rowId) {
return (float) doubleData.vector[getRowIndex(rowId)];
}
@Override
public double getDouble(int rowId) {
return doubleData.vector[getRowIndex(rowId)];
}
@Override
public Decimal getDecimal(int rowId, int precision, int scale) {
if (isNullAt(rowId)) return null;
BigDecimal data = decimalData.vector[getRowIndex(rowId)].getHiveDecimal().bigDecimalValue();
return Decimal.apply(data, precision, scale);
}
@Override
public UTF8String getUTF8String(int rowId) {
if (isNullAt(rowId)) return null;
int index = getRowIndex(rowId);
BytesColumnVector col = bytesData;
return UTF8String.fromBytes(col.vector[index], col.start[index], col.length[index]);
}
@Override
public byte[] getBinary(int rowId) {
if (isNullAt(rowId)) return null;
int index = getRowIndex(rowId);
byte[] binary = new byte[bytesData.length[index]];
System.arraycopy(bytesData.vector[index], bytesData.start[index], binary, 0, binary.length);
return binary;
}
@Override
public ColumnarArray getArray(int rowId) {
throw new UnsupportedOperationException();
}
@Override
public ColumnarMap getMap(int rowId) {
throw new UnsupportedOperationException();
}
@Override
public org.apache.spark.sql.vectorized.ColumnVector getChild(int ordinal) {
throw new UnsupportedOperationException();
}
}

@@ -1,79 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.orc
import java.io.{DataInput, DataOutput, IOException}
import java.sql.Date
import org.apache.hadoop.io.WritableUtils
import org.apache.orc.storage.serde2.io.DateWritable
import org.apache.spark.sql.catalyst.util.RebaseDateTime.{rebaseGregorianToJulianDays, rebaseJulianToGregorianDays}
/**
* The class accepts/returns days in Gregorian calendar and rebase them
* via conversion to local date in Julian calendar for dates before 1582-10-15
* in read/write for backward compatibility with Spark 2.4 and earlier versions.
*
* This is a clone of `org.apache.spark.sql.execution.datasources.DaysWritable`.
* The class is cloned because Hive ORC v1.2 uses different `DateWritable`:
* - v1.2: `org.apache.orc.storage.serde2.io.DateWritable`
* - v2.3 and `HiveInspectors`: `org.apache.hadoop.hive.serde2.io.DateWritable`
*
* @param gregorianDays The number of days since the epoch 1970-01-01 in
* Gregorian calendar.
* @param julianDays The number of days since the epoch 1970-01-01 in
* Julian calendar.
*/
class DaysWritable(
var gregorianDays: Int,
var julianDays: Int)
extends DateWritable {
def this() = this(0, 0)
def this(gregorianDays: Int) =
this(gregorianDays, rebaseGregorianToJulianDays(gregorianDays))
def this(dateWritable: DateWritable) = {
this(
gregorianDays = dateWritable match {
case daysWritable: DaysWritable => daysWritable.gregorianDays
case dateWritable: DateWritable =>
rebaseJulianToGregorianDays(dateWritable.getDays)
},
julianDays = dateWritable.getDays)
}
override def getDays: Int = julianDays
override def get(): Date = new Date(DateWritable.daysToMillis(julianDays))
override def set(d: Int): Unit = {
gregorianDays = d
julianDays = rebaseGregorianToJulianDays(d)
}
@throws[IOException]
override def write(out: DataOutput): Unit = {
WritableUtils.writeVInt(out, julianDays)
}
@throws[IOException]
override def readFields(in: DataInput): Unit = {
julianDays = WritableUtils.readVInt(in)
gregorianDays = rebaseJulianToGregorianDays(julianDays)
}
}

@@ -1,275 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.orc
import java.time.{Instant, LocalDate}
import org.apache.orc.storage.common.`type`.HiveDecimal
import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument}
import org.apache.orc.storage.ql.io.sarg.SearchArgument.Builder
import org.apache.orc.storage.ql.io.sarg.SearchArgumentFactory.newBuilder
import org.apache.orc.storage.serde2.io.HiveDecimalWritable
import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateToDays, toJavaDate, toJavaTimestamp}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types._
/**
* Helper object for building ORC `SearchArgument`s, which are used for ORC predicate push-down.
*
* Due to limitation of ORC `SearchArgument` builder, we had to implement separate checking and
* conversion passes through the Filter to make sure we only convert predicates that are known
* to be convertible.
*
* An ORC `SearchArgument` must be built in one pass using a single builder. For example, you can't
* build `a = 1` and `b = 2` first, and then combine them into `a = 1 AND b = 2`. This is quite
* different from the cases in Spark SQL or Parquet, where complex filters can be easily built using
* existing simpler ones.
*
* The annoying part is that, `SearchArgument` builder methods like `startAnd()`, `startOr()`, and
* `startNot()` mutate internal state of the builder instance. This forces us to translate all
* convertible filters with a single builder instance. However, if we try to translate a filter
* before checking whether it can be converted or not, we may end up with a builder whose internal
* state is inconsistent in the case of an inconvertible filter.
*
* For example, to convert an `And` filter with builder `b`, we call `b.startAnd()` first, and then
* try to convert its children. Say we convert `left` child successfully, but find that `right`
* child is inconvertible. Alas, `b.startAnd()` call can't be rolled back, and `b` is inconsistent
* now.
*
* The workaround employed here is to trim the Spark filters before trying to convert them. This
* way, we can only do the actual conversion on the part of the Filter that is known to be
* convertible.
*
* P.S.: Hive seems to use `SearchArgument` together with `ExprNodeGenericFuncDesc` only. Usage of
* builder methods mentioned above can only be found in test code, where all tested filters are
* known to be convertible.
*/
private[sql] object OrcFilters extends OrcFiltersBase {
/**
* Create ORC filter as a SearchArgument instance.
*/
def createFilter(schema: StructType, filters: Seq[Filter]): Option[SearchArgument] = {
val dataTypeMap = OrcFilters.getSearchableTypeMap(schema, SQLConf.get.caseSensitiveAnalysis)
// Combines all convertible filters using `And` to produce a single conjunction
val conjunctionOptional = buildTree(convertibleFilters(schema, dataTypeMap, filters))
conjunctionOptional.map { conjunction =>
// Then tries to build a single ORC `SearchArgument` for the conjunction predicate.
// The input predicate is fully convertible. There should not be any empty result in the
// following recursive method call `buildSearchArgument`.
buildSearchArgument(dataTypeMap, conjunction, newBuilder).build()
}
}
def convertibleFilters(
schema: StructType,
dataTypeMap: Map[String, OrcPrimitiveField],
filters: Seq[Filter]): Seq[Filter] = {
import org.apache.spark.sql.sources._
def convertibleFiltersHelper(
filter: Filter,
canPartialPushDown: Boolean): Option[Filter] = filter match {
// At here, it is not safe to just convert one side and remove the other side
// if we do not understand what the parent filters are.
//
// Here is an example used to explain the reason.
// Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to
// convert b in ('1'). If we only convert a = 2, we will end up with a filter
// NOT(a = 2), which will generate wrong results.
//
// Pushing one side of AND down is only safe to do at the top level or in the child
// AND before hitting NOT or OR conditions, and in this case, the unsupported predicate
// can be safely removed.
case And(left, right) =>
val leftResultOptional = convertibleFiltersHelper(left, canPartialPushDown)
val rightResultOptional = convertibleFiltersHelper(right, canPartialPushDown)
(leftResultOptional, rightResultOptional) match {
case (Some(leftResult), Some(rightResult)) => Some(And(leftResult, rightResult))
case (Some(leftResult), None) if canPartialPushDown => Some(leftResult)
case (None, Some(rightResult)) if canPartialPushDown => Some(rightResult)
case _ => None
}
// The Or predicate is convertible when both of its children can be pushed down.
// That is to say, if one/both of the children can be partially pushed down, the Or
// predicate can be partially pushed down as well.
//
// Here is an example used to explain the reason.
// Let's say we have
// (a1 AND a2) OR (b1 AND b2),
// a1 and b1 is convertible, while a2 and b2 is not.
// The predicate can be converted as
// (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2)
// As per the logical in And predicate, we can push down (a1 OR b1).
case Or(left, right) =>
for {
lhs <- convertibleFiltersHelper(left, canPartialPushDown)
rhs <- convertibleFiltersHelper(right, canPartialPushDown)
} yield Or(lhs, rhs)
case Not(pred) =>
val childResultOptional = convertibleFiltersHelper(pred, canPartialPushDown = false)
childResultOptional.map(Not)
case other =>
for (_ <- buildLeafSearchArgument(dataTypeMap, other, newBuilder())) yield other
}
filters.flatMap { filter =>
convertibleFiltersHelper(filter, true)
}
}
/**
* Get PredicateLeafType which is corresponding to the given DataType.
*/
def getPredicateLeafType(dataType: DataType): PredicateLeaf.Type = dataType match {
case BooleanType => PredicateLeaf.Type.BOOLEAN
case ByteType | ShortType | IntegerType | LongType => PredicateLeaf.Type.LONG
case FloatType | DoubleType => PredicateLeaf.Type.FLOAT
case StringType => PredicateLeaf.Type.STRING
case DateType => PredicateLeaf.Type.DATE
case TimestampType => PredicateLeaf.Type.TIMESTAMP
case _: DecimalType => PredicateLeaf.Type.DECIMAL
case _ => throw new UnsupportedOperationException(s"DataType: ${dataType.catalogString}")
}
/**
* Cast literal values for filters.
*
* We need to cast to long because ORC raises exceptions
* at 'checkLiteralType' of SearchArgumentImpl.java.
*/
private def castLiteralValue(value: Any, dataType: DataType): Any = dataType match {
case ByteType | ShortType | IntegerType | LongType =>
value.asInstanceOf[Number].longValue
case FloatType | DoubleType =>
value.asInstanceOf[Number].doubleValue()
case _: DecimalType =>
new HiveDecimalWritable(HiveDecimal.create(value.asInstanceOf[java.math.BigDecimal]))
case _: DateType if value.isInstanceOf[LocalDate] =>
toJavaDate(localDateToDays(value.asInstanceOf[LocalDate]))
case _: TimestampType if value.isInstanceOf[Instant] =>
toJavaTimestamp(instantToMicros(value.asInstanceOf[Instant]))
case _ => value
}
/**
* Build a SearchArgument and return the builder so far.
*
* @param dataTypeMap a map from the attribute name to its data type.
* @param expression the input predicates, which should be fully convertible to SearchArgument.
* @param builder the input SearchArgument.Builder.
* @return the builder so far.
*/
private def buildSearchArgument(
dataTypeMap: Map[String, OrcPrimitiveField],
expression: Filter,
builder: Builder): Builder = {
import org.apache.spark.sql.sources._
expression match {
case And(left, right) =>
val lhs = buildSearchArgument(dataTypeMap, left, builder.startAnd())
val rhs = buildSearchArgument(dataTypeMap, right, lhs)
rhs.end()
case Or(left, right) =>
val lhs = buildSearchArgument(dataTypeMap, left, builder.startOr())
val rhs = buildSearchArgument(dataTypeMap, right, lhs)
rhs.end()
case Not(child) =>
buildSearchArgument(dataTypeMap, child, builder.startNot()).end()
case other =>
buildLeafSearchArgument(dataTypeMap, other, builder).getOrElse {
throw new SparkException(
"The input filter of OrcFilters.buildSearchArgument should be fully convertible.")
}
}
}
/**
* Build a SearchArgument for a leaf predicate and return the builder so far.
*
* @param dataTypeMap a map from the attribute name to its data type.
* @param expression the input filter predicates.
* @param builder the input SearchArgument.Builder.
* @return the builder so far.
*/
private def buildLeafSearchArgument(
dataTypeMap: Map[String, OrcPrimitiveField],
expression: Filter,
builder: Builder): Option[Builder] = {
def getType(attribute: String): PredicateLeaf.Type =
getPredicateLeafType(dataTypeMap(attribute).fieldType)
import org.apache.spark.sql.sources._
// NOTE: For all case branches dealing with leaf predicates below, the additional `startAnd()`
// call is mandatory. ORC `SearchArgument` builder requires that all leaf predicates must be
// wrapped by a "parent" predicate (`And`, `Or`, or `Not`).
expression match {
case EqualTo(name, value) if dataTypeMap.contains(name) =>
val castedValue = castLiteralValue(value, dataTypeMap(name).fieldType)
Some(builder.startAnd()
.equals(dataTypeMap(name).fieldName, getType(name), castedValue).end())
case EqualNullSafe(name, value) if dataTypeMap.contains(name) =>
val castedValue = castLiteralValue(value, dataTypeMap(name).fieldType)
Some(builder.startAnd()
.nullSafeEquals(dataTypeMap(name).fieldName, getType(name), castedValue).end())
case LessThan(name, value) if dataTypeMap.contains(name) =>
val castedValue = castLiteralValue(value, dataTypeMap(name).fieldType)
Some(builder.startAnd()
.lessThan(dataTypeMap(name).fieldName, getType(name), castedValue).end())
case LessThanOrEqual(name, value) if dataTypeMap.contains(name) =>
val castedValue = castLiteralValue(value, dataTypeMap(name).fieldType)
Some(builder.startAnd()
.lessThanEquals(dataTypeMap(name).fieldName, getType(name), castedValue).end())
case GreaterThan(name, value) if dataTypeMap.contains(name) =>
val castedValue = castLiteralValue(value, dataTypeMap(name).fieldType)
Some(builder.startNot()
.lessThanEquals(dataTypeMap(name).fieldName, getType(name), castedValue).end())
case GreaterThanOrEqual(name, value) if dataTypeMap.contains(name) =>
val castedValue = castLiteralValue(value, dataTypeMap(name).fieldType)
Some(builder.startNot()
.lessThan(dataTypeMap(name).fieldName, getType(name), castedValue).end())
case IsNull(name) if dataTypeMap.contains(name) =>
Some(builder.startAnd().isNull(dataTypeMap(name).fieldName, getType(name)).end())
case IsNotNull(name) if dataTypeMap.contains(name) =>
Some(builder.startNot().isNull(dataTypeMap(name).fieldName, getType(name)).end())
case In(name, values) if dataTypeMap.contains(name) =>
val castedValues = values.map(v => castLiteralValue(v, dataTypeMap(name).fieldType))
Some(builder.startAnd().in(dataTypeMap(name).fieldName, getType(name),
castedValues.map(_.asInstanceOf[AnyRef]): _*).end())
case _ => None
}
}
}

@@ -1,66 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.orc
import org.apache.orc.storage.common.`type`.HiveDecimal
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch
import org.apache.orc.storage.ql.io.sarg.{SearchArgument => OrcSearchArgument}
import org.apache.orc.storage.ql.io.sarg.PredicateLeaf.{Operator => OrcOperator}
import org.apache.orc.storage.serde2.io.{DateWritable, HiveDecimalWritable}
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
import org.apache.spark.sql.types.Decimal
/**
* Various utilities for ORC used to upgrade the built-in Hive.
*/
private[sql] object OrcShimUtils {
class VectorizedRowBatchWrap(val batch: VectorizedRowBatch) {}
private[sql] type Operator = OrcOperator
private[sql] type SearchArgument = OrcSearchArgument
def getGregorianDays(value: Any): Int = {
new DaysWritable(value.asInstanceOf[DateWritable]).gregorianDays
}
def getDecimal(value: Any): Decimal = {
val decimal = value.asInstanceOf[HiveDecimalWritable].getHiveDecimal()
Decimal(decimal.bigDecimalValue, decimal.precision(), decimal.scale())
}
def getDateWritable(reuseObj: Boolean): (SpecializedGetters, Int) => DateWritable = {
if (reuseObj) {
val result = new DaysWritable()
(getter, ordinal) =>
result.set(getter.getInt(ordinal))
result
} else {
(getter: SpecializedGetters, ordinal: Int) =>
new DaysWritable(getter.getInt(ordinal))
}
}
def getHiveDecimalWritable(precision: Int, scale: Int):
(SpecializedGetters, Int) => HiveDecimalWritable = {
(getter, ordinal) =>
val d = getter.getDecimal(ordinal, precision, scale)
new HiveDecimalWritable(HiveDecimal.create(d.toJavaBigDecimal))
}
}

@@ -1,676 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.orc
import java.math.MathContext
import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}
import scala.collection.JavaConverters._
import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument}
import org.apache.orc.storage.ql.io.sarg.SearchArgumentFactory.newBuilder
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Row}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
/**
* A test suite that tests Apache ORC filter API based filter pushdown optimization.
* OrcFilterSuite and HiveOrcFilterSuite is logically duplicated to provide the same test coverage.
* The difference are the packages containing 'Predicate' and 'SearchArgument' classes.
* - OrcFilterSuite uses 'org.apache.orc.storage.ql.io.sarg' package.
* - HiveOrcFilterSuite uses 'org.apache.hadoop.hive.ql.io.sarg' package.
*/
class OrcFilterSuite extends OrcTest with SharedSparkSession {
override protected def sparkConf: SparkConf =
super
.sparkConf
.set(SQLConf.USE_V1_SOURCE_LIST, "")
protected def checkFilterPredicate(
df: DataFrame,
predicate: Predicate,
checker: (SearchArgument) => Unit): Unit = {
val output = predicate.collect { case a: Attribute => a }.distinct
val query = df
.select(output.map(e => Column(e)): _*)
.where(Column(predicate))
query.queryExecution.optimizedPlan match {
case PhysicalOperation(_, filters, DataSourceV2ScanRelation(_, o: OrcScan, _)) =>
assert(filters.nonEmpty, "No filter is analyzed from the given query")
assert(o.pushedFilters.nonEmpty, "No filter is pushed down")
val maybeFilter = OrcFilters.createFilter(query.schema, o.pushedFilters)
assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for ${o.pushedFilters}")
checker(maybeFilter.get)
case _ =>
throw new AnalysisException("Can not match OrcTable in the query.")
}
}
protected def checkFilterPredicate
(predicate: Predicate, filterOperator: PredicateLeaf.Operator)
(implicit df: DataFrame): Unit = {
def checkComparisonOperator(filter: SearchArgument) = {
val operator = filter.getLeaves.asScala
assert(operator.map(_.getOperator).contains(filterOperator))
}
checkFilterPredicate(df, predicate, checkComparisonOperator)
}
protected def checkFilterPredicate
(predicate: Predicate, stringExpr: String)
(implicit df: DataFrame): Unit = {
def checkLogicalOperator(filter: SearchArgument) = {
assert(filter.toString == stringExpr)
}
checkFilterPredicate(df, predicate, checkLogicalOperator)
}
test("filter pushdown - integer") {
withNestedOrcDataFrame((1 to 4).map(i => Tuple1(Option(i)))) { case (inputDF, colName, _) =>
implicit val df: DataFrame = inputDF
val intAttr = df(colName).expr
assert(df(colName).expr.dataType === IntegerType)
checkFilterPredicate(intAttr.isNull, PredicateLeaf.Operator.IS_NULL)
checkFilterPredicate(intAttr === 1, PredicateLeaf.Operator.EQUALS)
checkFilterPredicate(intAttr <=> 1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
checkFilterPredicate(intAttr < 2, PredicateLeaf.Operator.LESS_THAN)
checkFilterPredicate(intAttr > 3, PredicateLeaf.Operator.LESS_THAN_EQUALS)
checkFilterPredicate(intAttr <= 1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
checkFilterPredicate(intAttr >= 4, PredicateLeaf.Operator.LESS_THAN)
checkFilterPredicate(Literal(1) === intAttr, PredicateLeaf.Operator.EQUALS)
checkFilterPredicate(Literal(1) <=> intAttr, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
checkFilterPredicate(Literal(2) > intAttr, PredicateLeaf.Operator.LESS_THAN)
checkFilterPredicate(Literal(3) < intAttr, PredicateLeaf.Operator.LESS_THAN_EQUALS)
checkFilterPredicate(Literal(1) >= intAttr, PredicateLeaf.Operator.LESS_THAN_EQUALS)
checkFilterPredicate(Literal(4) <= intAttr, PredicateLeaf.Operator.LESS_THAN)
}
}
test("filter pushdown - long") {
withNestedOrcDataFrame(
(1 to 4).map(i => Tuple1(Option(i.toLong)))) { case (inputDF, colName, _) =>
implicit val df: DataFrame = inputDF
val longAttr = df(colName).expr
assert(df(colName).expr.dataType === LongType)
checkFilterPredicate(longAttr.isNull, PredicateLeaf.Operator.IS_NULL)
checkFilterPredicate(longAttr === 1, PredicateLeaf.Operator.EQUALS)
checkFilterPredicate(longAttr <=> 1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
checkFilterPredicate(longAttr < 2, PredicateLeaf.Operator.LESS_THAN)
checkFilterPredicate(longAttr > 3, PredicateLeaf.Operator.LESS_THAN_EQUALS)
checkFilterPredicate(longAttr <= 1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
checkFilterPredicate(longAttr >= 4, PredicateLeaf.Operator.LESS_THAN)
checkFilterPredicate(Literal(1) === longAttr, PredicateLeaf.Operator.EQUALS)
checkFilterPredicate(Literal(1) <=> longAttr, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
checkFilterPredicate(Literal(2) > longAttr, PredicateLeaf.Operator.LESS_THAN)
checkFilterPredicate(Literal(3) < longAttr, PredicateLeaf.Operator.LESS_THAN_EQUALS)
checkFilterPredicate(Literal(1) >= longAttr, PredicateLeaf.Operator.LESS_THAN_EQUALS)
checkFilterPredicate(Literal(4) <= longAttr, PredicateLeaf.Operator.LESS_THAN)
}
}
test("filter pushdown - float") {
withNestedOrcDataFrame(
(1 to 4).map(i => Tuple1(Option(i.toFloat)))) { case (inputDF, colName, _) =>
implicit val df: DataFrame = inputDF
val floatAttr = df(colName).expr
assert(df(colName).expr.dataType === FloatType)
checkFilterPredicate(floatAttr.isNull, PredicateLeaf.Operator.IS_NULL)
checkFilterPredicate(floatAttr === 1, PredicateLeaf.Operator.EQUALS)
checkFilterPredicate(floatAttr <=> 1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
checkFilterPredicate(floatAttr < 2, PredicateLeaf.Operator.LESS_THAN)
checkFilterPredicate(floatAttr > 3, PredicateLeaf.Operator.LESS_THAN_EQUALS)
checkFilterPredicate(floatAttr <= 1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
checkFilterPredicate(floatAttr >= 4, PredicateLeaf.Operator.LESS_THAN)
checkFilterPredicate(Literal(1) === floatAttr, PredicateLeaf.Operator.EQUALS)
checkFilterPredicate(Literal(1) <=> floatAttr, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
checkFilterPredicate(Literal(2) > floatAttr, PredicateLeaf.Operator.LESS_THAN)
checkFilterPredicate(Literal(3) < floatAttr, PredicateLeaf.Operator.LESS_THAN_EQUALS)
checkFilterPredicate(Literal(1) >= floatAttr, PredicateLeaf.Operator.LESS_THAN_EQUALS)
checkFilterPredicate(Literal(4) <= floatAttr, PredicateLeaf.Operator.LESS_THAN)
}
}
test("filter pushdown - double") {
withNestedOrcDataFrame(
(1 to 4).map(i => Tuple1(Option(i.toDouble)))) { case (inputDF, colName, _) =>
implicit val df: DataFrame = inputDF
val doubleAttr = df(colName).expr
assert(df(colName).expr.dataType === DoubleType)
checkFilterPredicate(doubleAttr.isNull, PredicateLeaf.Operator.IS_NULL)
checkFilterPredicate(doubleAttr === 1, PredicateLeaf.Operator.EQUALS)
checkFilterPredicate(doubleAttr <=> 1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
checkFilterPredicate(doubleAttr < 2, PredicateLeaf.Operator.LESS_THAN)
checkFilterPredicate(doubleAttr > 3, PredicateLeaf.Operator.LESS_THAN_EQUALS)
checkFilterPredicate(doubleAttr <= 1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
checkFilterPredicate(doubleAttr >= 4, PredicateLeaf.Operator.LESS_THAN)
checkFilterPredicate(Literal(1) === doubleAttr, PredicateLeaf.Operator.EQUALS)
checkFilterPredicate(Literal(1) <=> doubleAttr, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
checkFilterPredicate(Literal(2) > doubleAttr, PredicateLeaf.Operator.LESS_THAN)
checkFilterPredicate(Literal(3) < doubleAttr, PredicateLeaf.Operator.LESS_THAN_EQUALS)
checkFilterPredicate(Literal(1) >= doubleAttr, PredicateLeaf.Operator.LESS_THAN_EQUALS)
checkFilterPredicate(Literal(4) <= doubleAttr, PredicateLeaf.Operator.LESS_THAN)
}
}
test("filter pushdown - string") {
withNestedOrcDataFrame((1 to 4).map(i => Tuple1(i.toString))) { case (inputDF, colName, _) =>
implicit val df: DataFrame = inputDF
val strAttr = df(colName).expr
assert(df(colName).expr.dataType === StringType)
checkFilterPredicate(strAttr.isNull, PredicateLeaf.Operator.IS_NULL)
checkFilterPredicate(strAttr === "1", PredicateLeaf.Operator.EQUALS)
checkFilterPredicate(strAttr <=> "1", PredicateLeaf.Operator.NULL_SAFE_EQUALS)
checkFilterPredicate(strAttr < "2", PredicateLeaf.Operator.LESS_THAN)
checkFilterPredicate(strAttr > "3", PredicateLeaf.Operator.LESS_THAN_EQUALS)
checkFilterPredicate(strAttr <= "1", PredicateLeaf.Operator.LESS_THAN_EQUALS)
checkFilterPredicate(strAttr >= "4", PredicateLeaf.Operator.LESS_THAN)
checkFilterPredicate(Literal("1") === strAttr, PredicateLeaf.Operator.EQUALS)
checkFilterPredicate(Literal("1") <=> strAttr, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
checkFilterPredicate(Literal("2") > strAttr, PredicateLeaf.Operator.LESS_THAN)
checkFilterPredicate(Literal("3") < strAttr, PredicateLeaf.Operator.LESS_THAN_EQUALS)
checkFilterPredicate(Literal("1") >= strAttr, PredicateLeaf.Operator.LESS_THAN_EQUALS)
checkFilterPredicate(Literal("4") <= strAttr, PredicateLeaf.Operator.LESS_THAN)
}
}
test("filter pushdown - boolean") {
withNestedOrcDataFrame(
(true :: false :: Nil).map(b => Tuple1.apply(Option(b)))) { case (inputDF, colName, _) =>
implicit val df: DataFrame = inputDF
val booleanAttr = df(colName).expr
assert(df(colName).expr.dataType === BooleanType)
checkFilterPredicate(booleanAttr.isNull, PredicateLeaf.Operator.IS_NULL)
checkFilterPredicate(booleanAttr === true, PredicateLeaf.Operator.EQUALS)
checkFilterPredicate(booleanAttr <=> true, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
checkFilterPredicate(booleanAttr < true, PredicateLeaf.Operator.LESS_THAN)
checkFilterPredicate(booleanAttr > false, PredicateLeaf.Operator.LESS_THAN_EQUALS)
checkFilterPredicate(booleanAttr <= false, PredicateLeaf.Operator.LESS_THAN_EQUALS)
checkFilterPredicate(booleanAttr >= false, PredicateLeaf.Operator.LESS_THAN)
checkFilterPredicate(Literal(false) === booleanAttr, PredicateLeaf.Operator.EQUALS)
checkFilterPredicate(Literal(false) <=> booleanAttr,
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
checkFilterPredicate(Literal(false) > booleanAttr, PredicateLeaf.Operator.LESS_THAN)
checkFilterPredicate(Literal(true) < booleanAttr, PredicateLeaf.Operator.LESS_THAN_EQUALS)
checkFilterPredicate(Literal(true) >= booleanAttr, PredicateLeaf.Operator.LESS_THAN_EQUALS)
checkFilterPredicate(Literal(true) <= booleanAttr, PredicateLeaf.Operator.LESS_THAN)
}
}
test("filter pushdown - decimal") {
withNestedOrcDataFrame(
(1 to 4).map(i => Tuple1.apply(BigDecimal.valueOf(i)))) { case (inputDF, colName, _) =>
implicit val df: DataFrame = inputDF
val decimalAttr = df(colName).expr
assert(df(colName).expr.dataType === DecimalType(38, 18))
checkFilterPredicate(decimalAttr.isNull, PredicateLeaf.Operator.IS_NULL)
checkFilterPredicate(decimalAttr === BigDecimal.valueOf(1), PredicateLeaf.Operator.EQUALS)
checkFilterPredicate(decimalAttr <=> BigDecimal.valueOf(1),
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
checkFilterPredicate(decimalAttr < BigDecimal.valueOf(2), PredicateLeaf.Operator.LESS_THAN)
checkFilterPredicate(decimalAttr > BigDecimal.valueOf(3),
PredicateLeaf.Operator.LESS_THAN_EQUALS)
checkFilterPredicate(decimalAttr <= BigDecimal.valueOf(1),
PredicateLeaf.Operator.LESS_THAN_EQUALS)
checkFilterPredicate(decimalAttr >= BigDecimal.valueOf(4), PredicateLeaf.Operator.LESS_THAN)
checkFilterPredicate(
Literal(BigDecimal.valueOf(1)) === decimalAttr, PredicateLeaf.Operator.EQUALS)
checkFilterPredicate(
Literal(BigDecimal.valueOf(1)) <=> decimalAttr, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
checkFilterPredicate(
Literal(BigDecimal.valueOf(2)) > decimalAttr, PredicateLeaf.Operator.LESS_THAN)
checkFilterPredicate(
Literal(BigDecimal.valueOf(3)) < decimalAttr, PredicateLeaf.Operator.LESS_THAN_EQUALS)
checkFilterPredicate(
Literal(BigDecimal.valueOf(1)) >= decimalAttr, PredicateLeaf.Operator.LESS_THAN_EQUALS)
checkFilterPredicate(
Literal(BigDecimal.valueOf(4)) <= decimalAttr, PredicateLeaf.Operator.LESS_THAN)
}
}
test("filter pushdown - timestamp") {
val input = Seq(
"1000-01-01 01:02:03",
"1582-10-01 00:11:22",
"1900-01-01 23:59:59",
"2020-05-25 10:11:12").map(Timestamp.valueOf)
withOrcFile(input.map(Tuple1(_))) { path =>
Seq(false, true).foreach { java8Api =>
withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString) {
readFile(path) { implicit df =>
val timestamps = input.map(Literal(_))
checkFilterPredicate($"_1".isNull, PredicateLeaf.Operator.IS_NULL)
checkFilterPredicate($"_1" === timestamps(0), PredicateLeaf.Operator.EQUALS)
checkFilterPredicate($"_1" <=> timestamps(0), PredicateLeaf.Operator.NULL_SAFE_EQUALS)
checkFilterPredicate($"_1" < timestamps(1), PredicateLeaf.Operator.LESS_THAN)
checkFilterPredicate($"_1" > timestamps(2), PredicateLeaf.Operator.LESS_THAN_EQUALS)
checkFilterPredicate($"_1" <= timestamps(0), PredicateLeaf.Operator.LESS_THAN_EQUALS)
checkFilterPredicate($"_1" >= timestamps(3), PredicateLeaf.Operator.LESS_THAN)
checkFilterPredicate(Literal(timestamps(0)) === $"_1", PredicateLeaf.Operator.EQUALS)
checkFilterPredicate(
Literal(timestamps(0)) <=> $"_1", PredicateLeaf.Operator.NULL_SAFE_EQUALS)
checkFilterPredicate(Literal(timestamps(1)) > $"_1", PredicateLeaf.Operator.LESS_THAN)
checkFilterPredicate(
Literal(timestamps(2)) < $"_1",
PredicateLeaf.Operator.LESS_THAN_EQUALS)
checkFilterPredicate(
Literal(timestamps(0)) >= $"_1",
PredicateLeaf.Operator.LESS_THAN_EQUALS)
checkFilterPredicate(Literal(timestamps(3)) <= $"_1", PredicateLeaf.Operator.LESS_THAN)
}
}
}
}
}
test("filter pushdown - combinations with logical operators") {
withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i)))) { implicit df =>
checkFilterPredicate(
$"_1".isNotNull,
"leaf-0 = (IS_NULL _1), expr = (not leaf-0)"
)
checkFilterPredicate(
$"_1" =!= 1,
"leaf-0 = (IS_NULL _1), leaf-1 = (EQUALS _1 1), expr = (and (not leaf-0) (not leaf-1))"
)
checkFilterPredicate(
!($"_1" < 4),
"leaf-0 = (IS_NULL _1), leaf-1 = (LESS_THAN _1 4), expr = (and (not leaf-0) (not leaf-1))"
)
checkFilterPredicate(
$"_1" < 2 || $"_1" > 3,
"leaf-0 = (LESS_THAN _1 2), leaf-1 = (LESS_THAN_EQUALS _1 3), " +
"expr = (or leaf-0 (not leaf-1))"
)
checkFilterPredicate(
$"_1" < 2 && $"_1" > 3,
"leaf-0 = (IS_NULL _1), leaf-1 = (LESS_THAN _1 2), leaf-2 = (LESS_THAN_EQUALS _1 3), " +
"expr = (and (not leaf-0) leaf-1 (not leaf-2))"
)
}
}
test("filter pushdown - date") {
val input = Seq("2017-08-18", "2017-08-19", "2017-08-20", "2017-08-21").map { day =>
Date.valueOf(day)
}
withOrcFile(input.map(Tuple1(_))) { path =>
Seq(false, true).foreach { java8Api =>
withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString) {
readFile(path) { implicit df =>
val dates = input.map(Literal(_))
checkFilterPredicate($"_1".isNull, PredicateLeaf.Operator.IS_NULL)
checkFilterPredicate($"_1" === dates(0), PredicateLeaf.Operator.EQUALS)
checkFilterPredicate($"_1" <=> dates(0), PredicateLeaf.Operator.NULL_SAFE_EQUALS)
checkFilterPredicate($"_1" < dates(1), PredicateLeaf.Operator.LESS_THAN)
checkFilterPredicate($"_1" > dates(2), PredicateLeaf.Operator.LESS_THAN_EQUALS)
checkFilterPredicate($"_1" <= dates(0), PredicateLeaf.Operator.LESS_THAN_EQUALS)
checkFilterPredicate($"_1" >= dates(3), PredicateLeaf.Operator.LESS_THAN)
checkFilterPredicate(dates(0) === $"_1", PredicateLeaf.Operator.EQUALS)
checkFilterPredicate(dates(0) <=> $"_1", PredicateLeaf.Operator.NULL_SAFE_EQUALS)
checkFilterPredicate(dates(1) > $"_1", PredicateLeaf.Operator.LESS_THAN)
checkFilterPredicate(dates(2) < $"_1", PredicateLeaf.Operator.LESS_THAN_EQUALS)
checkFilterPredicate(dates(0) >= $"_1", PredicateLeaf.Operator.LESS_THAN_EQUALS)
checkFilterPredicate(dates(3) <= $"_1", PredicateLeaf.Operator.LESS_THAN)
}
}
}
}
}
test("no filter pushdown - non-supported types") {
implicit class IntToBinary(int: Int) {
def b: Array[Byte] = int.toString.getBytes(StandardCharsets.UTF_8)
}
// ArrayType
withOrcDataFrame((1 to 4).map(i => Tuple1(Array(i)))) { implicit df =>
checkNoFilterPredicate($"_1".isNull, noneSupported = true)
}
// BinaryType
withOrcDataFrame((1 to 4).map(i => Tuple1(i.b))) { implicit df =>
checkNoFilterPredicate($"_1" <=> 1.b, noneSupported = true)
}
// MapType
withOrcDataFrame((1 to 4).map(i => Tuple1(Map(i -> i)))) { implicit df =>
checkNoFilterPredicate($"_1".isNotNull, noneSupported = true)
}
}
test("SPARK-12218 and SPARK-25699 Converting conjunctions into ORC SearchArguments") {
import org.apache.spark.sql.sources._
// The `LessThan` should be converted while the `StringContains` shouldn't
val schema = new StructType(
Array(
StructField("a", IntegerType, nullable = true),
StructField("b", StringType, nullable = true)))
assertResult("leaf-0 = (LESS_THAN a 10), expr = leaf-0") {
OrcFilters.createFilter(schema, Array(
LessThan("a", 10),
StringContains("b", "prefix")
)).get.toString
}
// The `LessThan` should be converted while the whole inner `And` shouldn't
assertResult("leaf-0 = (LESS_THAN a 10), expr = leaf-0") {
OrcFilters.createFilter(schema, Array(
LessThan("a", 10),
Not(And(
GreaterThan("a", 1),
StringContains("b", "prefix")
))
)).get.toString
}
// Safely remove unsupported `StringContains` predicate and push down `LessThan`
assertResult("leaf-0 = (LESS_THAN a 10), expr = leaf-0") {
OrcFilters.createFilter(schema, Array(
And(
LessThan("a", 10),
StringContains("b", "prefix")
)
)).get.toString
}
// Safely remove unsupported `StringContains` predicate, push down `LessThan` and `GreaterThan`.
assertResult("leaf-0 = (LESS_THAN a 10), leaf-1 = (LESS_THAN_EQUALS a 1)," +
" expr = (and leaf-0 (not leaf-1))") {
OrcFilters.createFilter(schema, Array(
And(
And(
LessThan("a", 10),
StringContains("b", "prefix")
),
GreaterThan("a", 1)
)
)).get.toString
}
}
test("SPARK-27699 Converting disjunctions into ORC SearchArguments") {
import org.apache.spark.sql.sources._
// The `LessThan` should be converted while the `StringContains` shouldn't
val schema = new StructType(
Array(
StructField("a", IntegerType, nullable = true),
StructField("b", StringType, nullable = true)))
// The predicate `StringContains` predicate is not able to be pushed down.
assertResult("leaf-0 = (LESS_THAN_EQUALS a 10), leaf-1 = (LESS_THAN a 1)," +
" expr = (or (not leaf-0) leaf-1)") {
OrcFilters.createFilter(schema, Array(
Or(
GreaterThan("a", 10),
And(
StringContains("b", "prefix"),
LessThan("a", 1)
)
)
)).get.toString
}
assertResult("leaf-0 = (LESS_THAN_EQUALS a 10), leaf-1 = (LESS_THAN a 1)," +
" expr = (or (not leaf-0) leaf-1)") {
OrcFilters.createFilter(schema, Array(
Or(
And(
GreaterThan("a", 10),
StringContains("b", "foobar")
),
And(
StringContains("b", "prefix"),
LessThan("a", 1)
)
)
)).get.toString
}
assert(OrcFilters.createFilter(schema, Array(
Or(
StringContains("b", "foobar"),
And(
StringContains("b", "prefix"),
LessThan("a", 1)
)
)
)).isEmpty)
}
test("SPARK-27160: Fix casting of the DecimalType literal") {
import org.apache.spark.sql.sources._
val schema = StructType(Array(StructField("a", DecimalType(3, 2))))
assertResult("leaf-0 = (LESS_THAN a 3.14), expr = leaf-0") {
OrcFilters.createFilter(schema, Array(
LessThan(
"a",
new java.math.BigDecimal(3.14, MathContext.DECIMAL64).setScale(2)))
).get.toString
}
}
test("SPARK-32622: case sensitivity in predicate pushdown") {
withTempPath { dir =>
val count = 10
val tableName = "spark_32622"
val tableDir1 = dir.getAbsoluteFile + "/table1"
// Physical ORC files have both `A` and `a` fields.
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
spark.range(count).repartition(count).selectExpr("id - 1 as A", "id as a")
.write.mode("overwrite").orc(tableDir1)
}
// Metastore table has both `A` and `a` fields too.
withTable(tableName) {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
sql(
s"""
|CREATE TABLE $tableName (A LONG, a LONG) USING ORC LOCATION '$tableDir1'
""".stripMargin)
checkAnswer(sql(s"select a, A from $tableName"), (0 until count).map(c => Row(c, c - 1)))
val actual1 = stripSparkFilter(sql(s"select A from $tableName where A < 0"))
assert(actual1.count() == 1)
val actual2 = stripSparkFilter(sql(s"select A from $tableName where a < 0"))
assert(actual2.count() == 0)
}
// Exception thrown for ambiguous case.
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
val e = intercept[AnalysisException] {
sql(s"select a from $tableName where a < 0").collect()
}
assert(e.getMessage.contains(
"Reference 'a' is ambiguous"))
}
}
// Metastore table has only `A` field.
withTable(tableName) {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
sql(
s"""
|CREATE TABLE $tableName (A LONG) USING ORC LOCATION '$tableDir1'
""".stripMargin)
val e = intercept[SparkException] {
sql(s"select A from $tableName where A < 0").collect()
}
assert(e.getCause.isInstanceOf[RuntimeException] && e.getCause.getMessage.contains(
"""Found duplicate field(s) "A": [A, a] in case-insensitive mode"""))
}
}
// Physical ORC files have only `A` field.
val tableDir2 = dir.getAbsoluteFile + "/table2"
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
spark.range(count).repartition(count).selectExpr("id - 1 as A")
.write.mode("overwrite").orc(tableDir2)
}
withTable(tableName) {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
sql(
s"""
|CREATE TABLE $tableName (a LONG) USING ORC LOCATION '$tableDir2'
""".stripMargin)
checkAnswer(sql(s"select a from $tableName"), (0 until count).map(c => Row(c - 1)))
val actual = stripSparkFilter(sql(s"select a from $tableName where a < 0"))
assert(actual.count() == 1)
}
}
withTable(tableName) {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
sql(
s"""
|CREATE TABLE $tableName (A LONG) USING ORC LOCATION '$tableDir2'
""".stripMargin)
checkAnswer(sql(s"select A from $tableName"), (0 until count).map(c => Row(c - 1)))
val actual = stripSparkFilter(sql(s"select A from $tableName where A < 0"))
assert(actual.count() == 1)
}
}
}
}
test("SPARK-32646: Case-insensitive field resolution for pushdown when reading ORC") {
import org.apache.spark.sql.sources._
def getOrcFilter(
schema: StructType,
filters: Seq[Filter],
caseSensitive: String): Option[SearchArgument] = {
var orcFilter: Option[SearchArgument] = None
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive) {
orcFilter =
OrcFilters.createFilter(schema, filters)
}
orcFilter
}
def testFilter(
schema: StructType,
filters: Seq[Filter],
expected: SearchArgument): Unit = {
val caseSensitiveFilters = getOrcFilter(schema, filters, "true")
val caseInsensitiveFilters = getOrcFilter(schema, filters, "false")
assert(caseSensitiveFilters.isEmpty)
assert(caseInsensitiveFilters.isDefined)
assert(caseInsensitiveFilters.get.getLeaves().size() > 0)
assert(caseInsensitiveFilters.get.getLeaves().size() == expected.getLeaves().size())
(0 until expected.getLeaves().size()).foreach { index =>
assert(caseInsensitiveFilters.get.getLeaves().get(index) == expected.getLeaves().get(index))
}
}
val schema1 = StructType(Seq(StructField("cint", IntegerType)))
testFilter(schema1, Seq(GreaterThan("CINT", 1)),
newBuilder.startNot()
.lessThanEquals("cint", OrcFilters.getPredicateLeafType(IntegerType), 1L).`end`().build())
testFilter(schema1, Seq(
And(GreaterThan("CINT", 1), EqualTo("Cint", 2))),
newBuilder.startAnd()
.startNot()
.lessThanEquals("cint", OrcFilters.getPredicateLeafType(IntegerType), 1L).`end`()
.equals("cint", OrcFilters.getPredicateLeafType(IntegerType), 2L)
.`end`().build())
// Nested column case
val schema2 = StructType(Seq(StructField("a",
StructType(Seq(StructField("cint", IntegerType))))))
testFilter(schema2, Seq(GreaterThan("A.CINT", 1)),
newBuilder.startNot()
.lessThanEquals("a.cint", OrcFilters.getPredicateLeafType(IntegerType), 1L).`end`().build())
testFilter(schema2, Seq(GreaterThan("a.CINT", 1)),
newBuilder.startNot()
.lessThanEquals("a.cint", OrcFilters.getPredicateLeafType(IntegerType), 1L).`end`().build())
testFilter(schema2, Seq(GreaterThan("A.cint", 1)),
newBuilder.startNot()
.lessThanEquals("a.cint", OrcFilters.getPredicateLeafType(IntegerType), 1L).`end`().build())
testFilter(schema2, Seq(
And(GreaterThan("a.CINT", 1), EqualTo("a.Cint", 2))),
newBuilder.startAnd()
.startNot()
.lessThanEquals("a.cint", OrcFilters.getPredicateLeafType(IntegerType), 1L).`end`()
.equals("a.cint", OrcFilters.getPredicateLeafType(IntegerType), 2L)
.`end`().build())
}
}

@@ -146,9 +146,7 @@
             </goals>
             <configuration>
               <sources>
-                <source>v${hive.version.short}/src/gen/java</source>
-                <source>v${hive.version.short}/src/main/java</source>
-                <source>v${hive.version.short}/src/main/scala</source>
+                <source>src/gen/java</source>
               </sources>
             </configuration>
           </execution>

Some files were not shown because too many files have changed in this diff.