[SPARK-20961][SQL] generalize the dictionary in ColumnVector

## What changes were proposed in this pull request?

As the first step of https://issues.apache.org/jira/browse/SPARK-20960 (making `ColumnVector` public), this PR generalizes `ColumnVector.dictionary` so that it is no longer coupled to Parquet.
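
In essence, the reader now hands `ColumnVector` a Spark-owned `Dictionary` instead of a Parquet one. A condensed sketch of the wiring (the `WiringSketch` class and `attach` method are illustrative stand-ins, not code from this patch):

```java
import org.apache.spark.sql.execution.datasources.parquet.ParquetDictionary;
import org.apache.spark.sql.execution.vectorized.ColumnVector;

final class WiringSketch {
  // setDictionary now takes org.apache.spark.sql.execution.vectorized.Dictionary,
  // so the Parquet class stays hidden behind the ParquetDictionary adapter.
  static void attach(ColumnVector column,
                     org.apache.parquet.column.Dictionary parquetDictionary) {
    column.setDictionary(new ParquetDictionary(parquetDictionary));
  }
}
```

With this, `ColumnVector` no longer imports any Parquet classes; the Parquet dependency is confined to the new adapter.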

## How was this patch tested?

existing tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes #18183 from cloud-fan/dictionary.
Authored by Wenchen Fan on 2017-06-04 13:43:51 -07:00, committed by Xiao Li
parent c70c38eb93
commit dec9aa3b37
5 changed files with 100 additions and 21 deletions

org/apache/spark/sql/execution/datasources/parquet/ParquetDictionary.java (new file)

@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import org.apache.spark.sql.execution.vectorized.Dictionary;
+
+public final class ParquetDictionary implements Dictionary {
+  private org.apache.parquet.column.Dictionary dictionary;
+
+  public ParquetDictionary(org.apache.parquet.column.Dictionary dictionary) {
+    this.dictionary = dictionary;
+  }
+
+  @Override
+  public int decodeToInt(int id) {
+    return dictionary.decodeToInt(id);
+  }
+
+  @Override
+  public long decodeToLong(int id) {
+    return dictionary.decodeToLong(id);
+  }
+
+  @Override
+  public float decodeToFloat(int id) {
+    return dictionary.decodeToFloat(id);
+  }
+
+  @Override
+  public double decodeToDouble(int id) {
+    return dictionary.decodeToDouble(id);
+  }
+
+  @Override
+  public byte[] decodeToBinary(int id) {
+    return dictionary.decodeToBinary(id).getBytes();
+  }
+}

org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java

@@ -169,7 +169,7 @@ public class VectorizedColumnReader {
         // Column vector supports lazy decoding of dictionary values so just set the dictionary.
         // We can't do this if rowId != 0 AND the column doesn't have a dictionary (i.e. some
         // non-dictionary encoded values have already been added).
-        column.setDictionary(dictionary);
+        column.setDictionary(new ParquetDictionary(dictionary));
       } else {
         decodeDictionaryIds(rowId, num, column, dictionaryIds);
       }
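
The "lazy decoding" mentioned in that comment means the vector stores only per-row dictionary ids and resolves values when a row is actually read. A minimal sketch of that access path (`LazyDecodeSketch`, `ids`, and `dict` are stand-in names, assuming the generalized `Dictionary` interface added by this patch):

```java
import org.apache.spark.sql.execution.vectorized.Dictionary;

final class LazyDecodeSketch {
  // Each row carries only a small integer code; the value is materialized
  // from the dictionary at read time, not when the batch is filled.
  static byte[] readBinary(int[] ids, Dictionary dict, int rowId) {
    return dict.decodeToBinary(ids[rowId]);
  }
}
```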

org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java

@@ -154,12 +154,6 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBase<Object> {
     return (float) rowsReturned / totalRowCount;
   }
 
-  /**
-   * Returns the ColumnarBatch object that will be used for all rows returned by this reader.
-   * This object is reused. Calling this enables the vectorized reader. This should be called
-   * before any calls to nextKeyValue/nextBatch.
-   */
-
   // Creates a columnar batch that includes the schema from the data files and the additional
   // partition columns appended to the end of the batch.
   // For example, if the data contains two columns, with 2 partition columns:
@@ -204,12 +198,17 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBase<Object> {
     initBatch(DEFAULT_MEMORY_MODE, partitionColumns, partitionValues);
   }
 
+  /**
+   * Returns the ColumnarBatch object that will be used for all rows returned by this reader.
+   * This object is reused. Calling this enables the vectorized reader. This should be called
+   * before any calls to nextKeyValue/nextBatch.
+   */
   public ColumnarBatch resultBatch() {
     if (columnarBatch == null) initBatch();
     return columnarBatch;
   }
 
-  /*
+  /**
    * Can be called before any rows are returned to enable returning columnar batches directly.
    */
   public void enableReturningBatches() {
@@ -237,9 +236,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBase<Object> {
   }
 
   private void initializeInternal() throws IOException, UnsupportedOperationException {
-    /**
-     * Check that the requested schema is supported.
-     */
+    // Check that the requested schema is supported.
     missingColumns = new boolean[requestedSchema.getFieldCount()];
     for (int i = 0; i < requestedSchema.getFieldCount(); ++i) {
       Type t = requestedSchema.getFields().get(i);
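
The relocated javadoc above implies a reader lifecycle along these lines (a hedged usage sketch; `ReaderLoopSketch` and `countRows` are illustrative names, and the reader is assumed to be already initialized on a Parquet split via the usual RecordReader setup):

```java
import org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader;
import org.apache.spark.sql.execution.vectorized.ColumnarBatch;

final class ReaderLoopSketch {
  static long countRows(VectorizedParquetRecordReader reader) throws Exception {
    reader.resultBatch();              // allocates the reused ColumnarBatch; must be
                                       // called before any nextKeyValue()/nextBatch()
    reader.enableReturningBatches();   // getCurrentValue() now yields whole batches
    long rows = 0;
    while (reader.nextKeyValue()) {
      // The same batch object is reused across iterations, so values must be
      // consumed (or copied) before the next call to nextKeyValue().
      ColumnarBatch batch = (ColumnarBatch) reader.getCurrentValue();
      rows += batch.numRows();
    }
    return rows;
  }
}
```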

org/apache/spark/sql/execution/vectorized/ColumnVector.java

@@ -20,8 +20,6 @@ import java.math.BigDecimal;
 import java.math.BigInteger;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.parquet.column.Dictionary;
-import org.apache.parquet.io.api.Binary;
 
 import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -313,8 +311,8 @@ public abstract class ColumnVector implements AutoCloseable {
   }
 
   /**
-   * Ensures that there is enough storage to store capcity elements. That is, the put() APIs
-   * must work for all rowIds < capcity.
+   * Ensures that there is enough storage to store capacity elements. That is, the put() APIs
+   * must work for all rowIds < capacity.
    */
   protected abstract void reserveInternal(int capacity);
@@ -479,7 +477,6 @@ public abstract class ColumnVector implements AutoCloseable {
   /**
    * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
-   * src should contain `count` doubles written as ieee format.
    */
   public abstract void putFloats(int rowId, int count, float[] src, int srcIndex);
@@ -506,7 +503,6 @@ public abstract class ColumnVector implements AutoCloseable {
   /**
    * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
-   * src should contain `count` doubles written as ieee format.
    */
   public abstract void putDoubles(int rowId, int count, double[] src, int srcIndex);
@@ -628,8 +624,8 @@ public abstract class ColumnVector implements AutoCloseable {
       ColumnVector.Array a = getByteArray(rowId);
       return UTF8String.fromBytes(a.byteArray, a.byteArrayOffset, a.length);
     } else {
-      Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(rowId));
-      return UTF8String.fromBytes(v.getBytes());
+      byte[] bytes = dictionary.decodeToBinary(dictionaryIds.getDictId(rowId));
+      return UTF8String.fromBytes(bytes);
     }
   }
@@ -643,8 +639,7 @@ public abstract class ColumnVector implements AutoCloseable {
       System.arraycopy(array.byteArray, array.byteArrayOffset, bytes, 0, bytes.length);
       return bytes;
     } else {
-      Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(rowId));
-      return v.getBytes();
+      return dictionary.decodeToBinary(dictionaryIds.getDictId(rowId));
     }
   }

org/apache/spark/sql/execution/vectorized/Dictionary.java (new file)

@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.vectorized;
+
+/**
+ * The interface for dictionary in ColumnVector to decode dictionary encoded values.
+ */
+public interface Dictionary {
+
+  int decodeToInt(int id);
+
+  long decodeToLong(int id);
+
+  float decodeToFloat(int id);
+
+  double decodeToDouble(int id);
+
+  byte[] decodeToBinary(int id);
+}
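
To show what the generalization buys, here is a minimal non-Parquet implementation of this interface (illustrative only; `InMemoryStringDictionary` is not part of the patch). Any columnar source, not just Parquet, can now supply a dictionary to `ColumnVector` this way:

```java
import java.nio.charset.StandardCharsets;

import org.apache.spark.sql.execution.vectorized.Dictionary;

public class InMemoryStringDictionary implements Dictionary {
  private final byte[][] entries;

  public InMemoryStringDictionary(String... values) {
    entries = new byte[values.length][];
    for (int i = 0; i < values.length; i++) {
      entries[i] = values[i].getBytes(StandardCharsets.UTF_8);
    }
  }

  // Only binary decoding is meaningful for a string dictionary; the numeric
  // decode methods fail loudly if ever called.
  @Override public int decodeToInt(int id) { throw new UnsupportedOperationException(); }
  @Override public long decodeToLong(int id) { throw new UnsupportedOperationException(); }
  @Override public float decodeToFloat(int id) { throw new UnsupportedOperationException(); }
  @Override public double decodeToDouble(int id) { throw new UnsupportedOperationException(); }
  @Override public byte[] decodeToBinary(int id) { return entries[id]; }
}
```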