Switch to Arrow and Ubuntu packages

Dan Forsberg 2020-10-19 12:12:15 +01:00
parent d44c88ad64
commit 723b3d6ac7
6 changed files with 82 additions and 190 deletions

View File

@@ -3,119 +3,39 @@ ROOT:=$(HERE)/../..
 VTABLE:=$(ROOT)/parquet
 SQLITE:=$(ROOT)/sqlite
-# Directories
-ARROW=$(HERE)/arrow
-ARROW_RELEASE=$(ARROW)/cpp/release
-BOOST_ROOT=$(ARROW_RELEASE)/boost_ep-prefix/src/boost_ep
-BOOST=$(BOOST_ROOT)/stage/lib
-BROTLI=$(ARROW_RELEASE)/brotli_ep/src/brotli_ep-install/lib/x86_64-linux-gnu
-ICU=$(HERE)/icu
-LZ4=$(ARROW_RELEASE)/lz4_ep-prefix/src/lz4_ep/lib
-PARQUET_CPP=$(HERE)/parquet-cpp
-SNAPPY=$(ARROW_RELEASE)/snappy_ep/src/snappy_ep-install/lib
-ZLIB=$(ARROW_RELEASE)/zlib_ep/src/zlib_ep-install/lib
-ZSTD=$(ARROW_RELEASE)/zstd_ep-prefix/src/zstd_ep/lib
-# Libraries
-# profile_gen, profile_build for PGO
-APACHE_BUILD=release
-ARROW_LIB = $(ARROW_RELEASE)/$(APACHE_BUILD)/libarrow.a
-BOOST_FILESYSTEM_LIB = $(BOOST)/libboost_filesystem.a
-BOOST_REGEX_LIB = $(BOOST)/libboost_regex.a
-BOOST_SYSTEM_LIB = $(BOOST)/libboost_system.a
-BROTLI_COMMON_LIB = $(BROTLI)/libbrotlicommon.a
-BROTLI_DEC_LIB = $(BROTLI)/libbrotlidec.a
-BROTLI_ENC_LIB = $(BROTLI)/libbrotlienc.a
-ICU_I18N_LIB=$(ICU)/source/lib/libicui18n.a
-ICU_UC_LIB=$(ICU)/source/lib/libicuuc.a
-ICU_DATA_LIB=$(ICU)/source/lib/libicudata.a
-LZ4_LIB = $(LZ4)/liblz4.a
-PARQUET_CPP_LIB = $(PARQUET_CPP)/build/$(APACHE_BUILD)/libparquet.a
-SNAPPY_LIB = $(SNAPPY)/libsnappy.a
-THRIFT_LIB = $(PARQUET_CPP)/thrift_ep/src/thrift_ep-install/lib/libthrift.a
-ZLIB_LIB = $(ZLIB)/libz.a
-ZSTD_LIB = $(ZSTD)/libzstd.a
 # Flags
-CC = gcc
 CXX = g++
 OPTIMIZATIONS = -O3
-CPUS:=$(shell nproc)
-CFLAGS = -I $(SQLITE) -I $(PARQUET_CPP)/src -I $(ARROW)/cpp/src $(OPTIMIZATIONS) -std=c++11 -Wall -fPIC -g
-ALL_LIBS = $(PARQUET_CPP_LIB) $(LZ4_LIB) $(ZSTD_LIB) $(THRIFT_LIB) $(SNAPPY_LIB) $(ARROW_LIB) \
-	$(ICU_I18N_LIB) $(ICU_UC_LIB) $(ICU_DATA_LIB) \
-	$(BROTLI_ENC_LIB) $(BROTLI_COMMON_LIB) $(BROTLI_DEC_LIB) $(BOOST_REGEX_LIB) $(BOOST_SYSTEM_LIB) $(BOOST_FILESYSTEM_LIB)
-LDFLAGS = $(OPTIMIZATIONS) \
-	-Wl,--whole-archive $(ALL_LIBS) \
-	-Wl,--no-whole-archive -lz -lcrypto -lssl
+CFLAGS = -I $(SQLITE) $(OPTIMIZATIONS) -std=c++11 -Wall -fPIC -g
+LIBS = -lparquet -lboost_regex -lboost_system -lboost_filesystem \
+	-lbrotlienc -lbrotlicommon -lbrotlidec -licui18n -licuuc -licudata \
+	-llz4 -lsnappy -lthrift -lz -lzstd -lcrypto -lssl
+LDFLAGS = $(OPTIMIZATIONS) -Wl,--no-whole-archive $(LIBS) -lz -lcrypto -lssl
 OBJ = parquet.o parquet_filter.o parquet_table.o parquet_cursor.o
-LIBS = $(ARROW_LIB) $(PARQUET_CPP_LIB) $(ICU_I18N_LIB)
 PROF =
-libparquet.so: $(LIBS) $(OBJ)
+libparquet.so: $(OBJ)
 	$(CXX) $(PROF) -shared -o $@ $(OBJ) $(LDFLAGS)
-parquet_filter.o: $(VTABLE)/parquet_filter.cc $(VTABLE)/parquet_filter.h $(ARROW) $(PARQUET_CPP)
+parquet_filter.o: $(VTABLE)/parquet_filter.cc $(VTABLE)/parquet_filter.h
 	$(CXX) $(PROF) -c -o $@ $< $(CFLAGS)
-parquet_cursor.o: $(VTABLE)/parquet_cursor.cc $(VTABLE)/parquet_cursor.h $(VTABLE)/parquet_table.h $(VTABLE)/parquet_filter.h $(ARROW) $(PARQUET_CPP)
+parquet_cursor.o: $(VTABLE)/parquet_cursor.cc $(VTABLE)/parquet_cursor.h $(VTABLE)/parquet_table.h $(VTABLE)/parquet_filter.h
 	$(CXX) $(PROF) -c -o $@ $< $(CFLAGS)
-parquet_table.o: $(VTABLE)/parquet_table.cc $(VTABLE)/parquet_table.h $(ARROW) $(PARQUET_CPP)
+parquet_table.o: $(VTABLE)/parquet_table.cc $(VTABLE)/parquet_table.h
 	$(CXX) $(PROF) -c -o $@ $< $(CFLAGS)
-parquet.o: $(VTABLE)/parquet.cc $(VTABLE)/parquet_cursor.h $(VTABLE)/parquet_table.h $(VTABLE)/parquet_filter.h $(ARROW) $(PARQUET_CPP)
+parquet.o: $(VTABLE)/parquet.cc $(VTABLE)/parquet_cursor.h $(VTABLE)/parquet_table.h $(VTABLE)/parquet_filter.h
 	$(CXX) $(PROF) -c -o $@ $< $(CFLAGS)
-$(ARROW):
-	rm -rf $(ARROW)
-	git clone https://github.com/apache/arrow.git $(ARROW)
-	cd $(ARROW) && git checkout apache-arrow-0.9.0
-	mkdir $(ARROW)/cpp/release
-	cd $(ARROW)/cpp/release && cmake -DCMAKE_BUILD_TYPE=$(APACHE_BUILD) -DARROW_BOOST_VENDORED=ON -DARROW_BOOST_USE_SHARED=OFF -DPARQUET_BUILD_SHARED=OFF ..
-	touch -d @0 $(ARROW)
-$(ARROW_LIB): $(ARROW)
-	cd $(ARROW)/cpp/release && make -j$(CPUS)
-# This is pretty gross. I'm sure someone who knows what they're doing could do this more cleanly.
-$(ICU_I18N_LIB):
-	rm -rf $(ICU)
-	mkdir $(ICU)
-	cd $(ICU) && wget https://github.com/unicode-org/icu/releases/download/release-$(ICU_VERSION)/icu4c-$(ICU_VERSION_U)-src.tgz
-	cd $(ICU) && tar xf icu4c-$(ICU_VERSION_U)-src.tgz --strip-components=1
-	cd $(ICU)/source && ./configure --enable-static
-	cd $(ICU)/source && make -j$(CPUS) LIBCFLAGS='-fPIC' LIBCXXFLAGS='-fPIC'
-$(PARQUET_CPP):
-	rm -rf $(PARQUET_CPP)
-	git clone https://github.com/apache/parquet-cpp.git $(PARQUET_CPP)
-	cd $(PARQUET_CPP) && git checkout apache-parquet-cpp-1.4.0
-	cd $(PARQUET_CPP) && BOOST_ROOT=$(BOOST_ROOT) BOOST_STATIC_REGEX_LIBRARY=$(BOOST_REGEX_LIB) SNAPPY_STATIC_LIB=$(SNAPPY_LIB) BROTLI_STATIC_LIB_ENC=$(BROTLI_ENC_LIB) BROTLI_STATIC_LIB_DEC=$(BROTLI_DEC_LIB) BROTLI_STATIC_LIB_COMMON=$(BROTLI_COMMON_LIB) ZLIB_STATIC_LIB=$(ZLIB_LIB) LZ4_STATIC_LIB=$(LZ4_LIB) ZSTD_STATIC_LIB=$(ZSTD_LIB) cmake -DCMAKE_BUILD_TYPE=$(APACHE_BUILD) -DPARQUET_MINIMAL_DEPENDENCY=ON -DPARQUET_ARROW_LINKAGE=static -DPARQUET_BOOST_USE_SHARED=OFF -DPARQUET_BUILD_SHARED=OFF .
-	touch -d @0 $(PARQUET_CPP)
-$(PARQUET_CPP_LIB): $(PARQUET_CPP) $(ARROW_LIB)
-	cd $(PARQUET_CPP) && make -j$(CPUS)
-.PHONY: clean arrow icu parquet publish_libs
+.PHONY: clean parquet
 clean:
 	rm -f *.o *.so
 distclean:
 	rm -rf $(SQLITE) $(HERE)
-arrow: $(ARROW_LIB)
-icu: $(ICU_I18N_LIB)
-parquet: $(PARQUET_CPP_LIB)
-publish_libs:
-	tar -cJf libs.tar.xz $(ALL_LIBS) $(SQLITE)/sqlite3
-	s3cmd put libs.tar.xz s3://cldellow/public/libparquet/$$(lsb_release -s -r)/libs.tar.xz
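Note: the rewritten Makefile assumes the Arrow and Parquet development packages installed by the script below are already present and simply links `-lparquet` against the system headers. A quick way to sanity-check such an environment is a standalone reader built the same way; this is only an illustrative sketch, not part of the commit, and the file name check_parquet.cc and the extra -larrow flag are assumptions.

    // check_parquet.cc -- hypothetical smoke test for the packaged libparquet;
    // not part of this repository.
    //
    // Build roughly the way the new Makefile does:
    //   g++ -std=c++11 check_parquet.cc -o check_parquet -lparquet -larrow
    #include <iostream>
    #include <memory>
    #include <parquet/api/reader.h>

    int main(int argc, char** argv) {
      if (argc < 2) {
        std::cerr << "usage: check_parquet FILE.parquet" << std::endl;
        return 1;
      }
      // Opening the file and printing basic metadata proves that the headers
      // and the shared library from libparquet-dev resolve correctly.
      std::unique_ptr<parquet::ParquetFileReader> reader =
          parquet::ParquetFileReader::OpenFile(argv[1]);
      std::shared_ptr<parquet::FileMetaData> md = reader->metadata();
      std::cout << md->num_rows() << " rows, " << md->num_columns()
                << " columns, " << md->num_row_groups() << " row groups"
                << std::endl;
      return 0;
    }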

View File

@@ -1,31 +1,36 @@
 #!/bin/bash
 set -euo pipefail
+apt install -y sudo lsb-release wget
 here=$(dirname "${BASH_SOURCE[0]}")
 here=$(readlink -f "$here")
-prebuilt="$here"/build/linux/prebuilt
 ubuntu="$(lsb_release -s -r)"
-libs=(libarrow.a libboost_filesystem.a libboost_regex.a libboost_system.a libbrotlicommon.a libbrotlidec.a \
-  libbrotlienc.a libicudata.a libicui18n.a libicuuc.a liblz4.a libparquet.a libsnappy.a libthrift.a libzstd.a)
-lib_locs=()
 setup_directories() {
   cd "$here"
   mkdir -p build/linux
-  mkdir -p "$prebuilt"
   cp -f build/Makefile.linux build/linux/Makefile
   cd build/linux
 }
 install_prerequisites() {
+  # install Apache Arrow libs
+  # NOTE: Pinned to Ubuntu Focal
+  wget https://apache.bintray.com/arrow/ubuntu/apache-arrow-archive-keyring-latest-focal.deb
+  sudo apt install -y -V ./apache-arrow-archive-keyring-latest-focal.deb
+  sudo apt update -y
+  sudo apt install -y -V libparquet-dev liblz4-dev libzstd-dev libthrift-dev \
+    libsnappy-dev libthrift-dev libbrotli-dev libz-dev
   # Install prereqs based on https://github.com/apache/parquet-cpp#linux
-  sudo apt-get install libboost-dev g++ libboost-filesystem-dev \
+  sudo apt install -y libboost-dev g++ libboost-filesystem-dev \
     libboost-program-options-dev libboost-regex-dev \
     libboost-system-dev libboost-test-dev \
     libssl-dev libtool bison flex pkg-config libreadline-dev libncurses-dev
   # Install prereqs based on https://github.com/apache/arrow/tree/master/cpp
-  sudo apt-get install cmake \
+  sudo apt install -y cmake \
    libboost-dev \
    libboost-filesystem-dev \
    libboost-system-dev
@@ -48,6 +53,9 @@ set_icu_version() {
    18.04)
      export ICU_VERSION=60-2
      ;;
+    20.10)
+      export ICU_VERSION=67-1
+      ;;
    *)
      echo "unsure what libicu version to use" >&2
      exit 1
@@ -56,47 +64,11 @@ set_icu_version() {
   export ICU_VERSION_U=${ICU_VERSION//-/_}
 }
-add_prebuilt_lib() {
-  lib_locs+=("$1=$prebuilt/$2.a")
-}
-fetch_prebuilt_libs() {
-  if [ ! -e "$prebuilt"/complete ]; then
-    (
-      cd "$prebuilt"
-      curl "https://s3.amazonaws.com/cldellow/public/libparquet/$ubuntu/libs.tar.xz" > libs.tar.xz
-      tar xf libs.tar.xz --xform 's#.*/##'
-      touch "$prebuilt"/complete
-    )
-  fi
-  if [ ! -e "$here"/sqlite/sqlite3 ]; then
-    ln -s "$prebuilt"/sqlite3 "$here"/sqlite/sqlite3
-  fi
-  add_prebuilt_lib "PARQUET_CPP_LIB" libparquet
-  add_prebuilt_lib "LZ4_LIB" liblz4
-  add_prebuilt_lib "ZSTD_LIB" libzstd
-  add_prebuilt_lib "THRIFT_LIB" libthrift
-  add_prebuilt_lib "SNAPPY_LIB" libsnappy
-  add_prebuilt_lib "ARROW_LIB" libarrow
-  add_prebuilt_lib "ICU_I18N_LIB" libicui18n
-  add_prebuilt_lib "ICU_UC_LIB" libicuuc
-  add_prebuilt_lib "ICU_DATA_LIB" libicudata
-  add_prebuilt_lib "BROTLI_ENC_LIB" libbrotlienc
-  add_prebuilt_lib "BROTLI_COMMON_LIB" libbrotlicommon
-  add_prebuilt_lib "BROTLI_DEC_LIB" libbrotlidec
-  add_prebuilt_lib "BOOST_REGEX_LIB" libboost_regex
-  add_prebuilt_lib "BOOST_SYSTEM_LIB" libboost_system
-  add_prebuilt_lib "BOOST_FILESYSTEM_LIB" libboost_filesystem
-}
 main() {
-  set_icu_version
   setup_directories
   install_prerequisites
   build_sqlite
+  set_icu_version
   if [ -v PREBUILT ]; then
     fetch_prebuilt_libs
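Note: because the script now pulls Arrow from the apache-arrow apt repository instead of building a pinned apache-arrow-0.9.0 checkout, the library version is whatever the distribution resolves at install time. A small check like the following (again, not from the repo; it assumes the packaged headers ship arrow/config.h with arrow::GetBuildInfo(), which recent Arrow releases do) can confirm which version was actually picked up.

    // arrow_version.cc -- hypothetical helper to report the installed Arrow
    // version after the apt-based install; not part of this repository.
    //   g++ -std=c++11 arrow_version.cc -o arrow_version -larrow
    #include <iostream>
    #include <arrow/config.h>

    int main() {
      // GetBuildInfo() describes the Arrow library this binary linked against,
      // which is the same library the vtable extension will load at runtime.
      const arrow::BuildInfo& info = arrow::GetBuildInfo();
      std::cout << "Arrow " << info.version_string << std::endl;
      return 0;
    }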

View File

@@ -290,7 +290,7 @@ static int parquetColumn(
     case parquet::Type::BYTE_ARRAY:
     {
       parquet::ByteArray* rv = cursor->getByteArray(col);
-      if(cursor->getLogicalType(col) == parquet::LogicalType::UTF8) {
+      if(cursor->getConvertedType(col) == parquet::ConvertedType::UTF8) {
         sqlite3_result_text(ctx, (const char*)rv->ptr, rv->len, SQLITE_TRANSIENT);
       } else {
         sqlite3_result_blob(ctx, (void*)rv->ptr, rv->len, SQLITE_TRANSIENT);
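The getLogicalType to getConvertedType rename tracks an upstream API change: the free-standing parquet::LogicalType enum from parquet-cpp 1.x is called parquet::ConvertedType in the Arrow-packaged releases, while a newer class-based LogicalType API took over the old name. A minimal sketch of the same TEXT-versus-BLOB decision against a raw ColumnDescriptor, with a hypothetical helper name, might look like this:

    // Hypothetical helper, not from the repo: decide whether a BYTE_ARRAY
    // column should surface as SQLite TEXT (UTF8 converted type) or BLOB,
    // using the renamed ConvertedType enum and ColumnDescriptor::converted_type().
    #include <parquet/schema.h>

    bool ShouldReturnText(const parquet::ColumnDescriptor* col) {
      return col->physical_type() == parquet::Type::BYTE_ARRAY &&
             col->converted_type() == parquet::ConvertedType::UTF8;
    }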

View File

@@ -31,7 +31,7 @@ bool ParquetCursor::currentRowGroupSatisfiesRowIdFilter(Constraint& constraint)
   }
 }
-bool ParquetCursor::currentRowGroupSatisfiesBlobFilter(Constraint& constraint, std::shared_ptr<parquet::RowGroupStatistics> _stats) {
+bool ParquetCursor::currentRowGroupSatisfiesBlobFilter(Constraint& constraint, std::shared_ptr<parquet::Statistics> _stats) {
   if(!_stats->HasMinMax()) {
     return true;
   }
@@ -48,8 +48,8 @@ bool ParquetCursor::currentRowGroupSatisfiesBlobFilter(Constraint& constraint, s
   parquet::Type::type pqType = types[constraint.column];
   if(pqType == parquet::Type::BYTE_ARRAY) {
-    parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::BYTE_ARRAY>>* stats =
-      (parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::BYTE_ARRAY>>*)_stats.get();
+    parquet::TypedStatistics<parquet::ByteArrayType>* stats =
+      (parquet::TypedStatistics<parquet::ByteArrayType>*)_stats.get();
     minPtr = stats->min().ptr;
     minLen = stats->min().len;
@@ -137,9 +137,9 @@ bool ParquetCursor::currentRowGroupSatisfiesBlobFilter(Constraint& constraint, s
   }
 }
-bool ParquetCursor::currentRowGroupSatisfiesTextFilter(Constraint& constraint, std::shared_ptr<parquet::RowGroupStatistics> _stats) {
-  parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::BYTE_ARRAY>>* stats =
-    (parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::BYTE_ARRAY>>*)_stats.get();
+bool ParquetCursor::currentRowGroupSatisfiesTextFilter(Constraint& constraint, std::shared_ptr<parquet::Statistics> _stats) {
+  parquet::TypedStatistics<parquet::ByteArrayType>* stats =
+    (parquet::TypedStatistics<parquet::ByteArrayType>*)_stats.get();
   if(!stats->HasMinMax()) {
     return true;
@@ -195,7 +195,7 @@ int64_t int96toMsSinceEpoch(const parquet::Int96& rv) {
   return nsSinceEpoch;
 }
-bool ParquetCursor::currentRowGroupSatisfiesIntegerFilter(Constraint& constraint, std::shared_ptr<parquet::RowGroupStatistics> _stats) {
+bool ParquetCursor::currentRowGroupSatisfiesIntegerFilter(Constraint& constraint, std::shared_ptr<parquet::Statistics> _stats) {
   if(!_stats->HasMinMax()) {
     return true;
   }
@@ -211,27 +211,27 @@ bool ParquetCursor::currentRowGroupSatisfiesIntegerFilter(Constraint& constraint
   parquet::Type::type pqType = types[column];
   if(pqType == parquet::Type::INT32) {
-    parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::INT32>>* stats =
-      (parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::INT32>>*)_stats.get();
+    parquet::TypedStatistics<parquet::Int32Type>* stats =
+      (parquet::TypedStatistics<parquet::Int32Type>*)_stats.get();
     min = stats->min();
     max = stats->max();
   } else if(pqType == parquet::Type::INT64) {
-    parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::INT64>>* stats =
-      (parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::INT64>>*)_stats.get();
+    parquet::TypedStatistics<parquet::Int64Type>* stats =
+      (parquet::TypedStatistics<parquet::Int64Type>*)_stats.get();
     min = stats->min();
     max = stats->max();
   } else if(pqType == parquet::Type::INT96) {
-    parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::INT96>>* stats =
-      (parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::INT96>>*)_stats.get();
+    parquet::TypedStatistics<parquet::Int96Type>* stats =
+      (parquet::TypedStatistics<parquet::Int96Type>*)_stats.get();
     min = int96toMsSinceEpoch(stats->min());
     max = int96toMsSinceEpoch(stats->max());
   } else if(pqType == parquet::Type::BOOLEAN) {
-    parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::BOOLEAN>>* stats =
-      (parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::BOOLEAN>>*)_stats.get();
+    parquet::TypedStatistics<parquet::BooleanType>* stats =
+      (parquet::TypedStatistics<parquet::BooleanType>*)_stats.get();
     min = stats->min();
     max = stats->max();
@@ -272,7 +272,7 @@ bool ParquetCursor::currentRowGroupSatisfiesIntegerFilter(Constraint& constraint
   return true;
 }
-bool ParquetCursor::currentRowGroupSatisfiesDoubleFilter(Constraint& constraint, std::shared_ptr<parquet::RowGroupStatistics> _stats) {
+bool ParquetCursor::currentRowGroupSatisfiesDoubleFilter(Constraint& constraint, std::shared_ptr<parquet::Statistics> _stats) {
   if(!_stats->HasMinMax()) {
     return true;
   }
@@ -288,14 +288,14 @@ bool ParquetCursor::currentRowGroupSatisfiesDoubleFilter(Constraint& constraint,
   parquet::Type::type pqType = types[column];
   if(pqType == parquet::Type::DOUBLE) {
-    parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::DOUBLE>>* stats =
-      (parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::DOUBLE>>*)_stats.get();
+    parquet::TypedStatistics<parquet::DoubleType>* stats =
+      (parquet::TypedStatistics<parquet::DoubleType>*)_stats.get();
     min = stats->min();
     max = stats->max();
   } else if(pqType == parquet::Type::FLOAT) {
-    parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::FLOAT>>* stats =
-      (parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::FLOAT>>*)_stats.get();
+    parquet::TypedStatistics<parquet::FloatType>* stats =
+      (parquet::TypedStatistics<parquet::FloatType>*)_stats.get();
     min = stats->min();
     max = stats->max();
@@ -527,7 +527,7 @@ bool ParquetCursor::currentRowGroupSatisfiesFilter() {
       } else {
         std::unique_ptr<parquet::ColumnChunkMetaData> md = rowGroupMetadata->ColumnChunk(column);
         if(md->is_stats_set()) {
-          std::shared_ptr<parquet::RowGroupStatistics> stats = md->statistics();
+          std::shared_ptr<parquet::Statistics> stats = md->statistics();
           // SQLite is much looser with types than you might expect if you
           // come from a Postgres background. The constraint '30.0' (that is,
@@ -545,7 +545,7 @@ bool ParquetCursor::currentRowGroupSatisfiesFilter() {
           } else {
             parquet::Type::type pqType = types[column];
-            if(pqType == parquet::Type::BYTE_ARRAY && logicalTypes[column] == parquet::LogicalType::UTF8) {
+            if(pqType == parquet::Type::BYTE_ARRAY && ConvertedTypes[column] == parquet::ConvertedType::UTF8) {
              rv = currentRowGroupSatisfiesTextFilter(constraints[i], stats);
             } else if(pqType == parquet::Type::BYTE_ARRAY) {
              rv = currentRowGroupSatisfiesBlobFilter(constraints[i], stats);
@@ -608,13 +608,13 @@ start:
     types.push_back(rowGroupMetadata->schema()->Column(0)->physical_type());
   }
-  while(logicalTypes.size() < (unsigned int)rowGroupMetadata->num_columns()) {
-    logicalTypes.push_back(rowGroupMetadata->schema()->Column(0)->logical_type());
+  while(ConvertedTypes.size() < (unsigned int)rowGroupMetadata->num_columns()) {
+    ConvertedTypes.push_back(rowGroupMetadata->schema()->Column(0)->converted_type());
   }
   for(unsigned int i = 0; i < (unsigned int)rowGroupMetadata->num_columns(); i++) {
     types[i] = rowGroupMetadata->schema()->Column(i)->physical_type();
-    logicalTypes[i] = rowGroupMetadata->schema()->Column(i)->logical_type();
+    ConvertedTypes[i] = rowGroupMetadata->schema()->Column(i)->converted_type();
   }
   for(unsigned int i = 0; i < colRows.size(); i++) {
@@ -664,7 +664,7 @@ bool ParquetCursor::currentRowSatisfiesFilter() {
       rv = !isNull(column);
     } else {
-      if(logicalTypes[column] == parquet::LogicalType::UTF8) {
+      if(ConvertedTypes[column] == parquet::ConvertedType::UTF8) {
        rv = currentRowSatisfiesTextFilter(constraints[i]);
      } else {
        parquet::Type::type pqType = types[column];
@@ -928,8 +928,8 @@ parquet::Type::type ParquetCursor::getPhysicalType(int col) {
   return types[col];
 }
-parquet::LogicalType::type ParquetCursor::getLogicalType(int col) {
-  return logicalTypes[col];
+parquet::ConvertedType::type ParquetCursor::getConvertedType(int col) {
+  return ConvertedTypes[col];
 }
 void ParquetCursor::close() {
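The statistics changes follow the same upstream renaming: parquet::RowGroupStatistics became parquet::Statistics, and TypedRowGroupStatistics<parquet::DataType<...>> became TypedStatistics<Int32Type>, TypedStatistics<ByteArrayType>, and so on. A compact sketch of the min/max pattern used by the row-group filters above, with a hypothetical helper name and assuming the <parquet/metadata.h> and <parquet/statistics.h> headers:

    // Hypothetical sketch of the renamed statistics API used by the row-group
    // filters above; not part of the repository.
    #include <cstdint>
    #include <memory>
    #include <parquet/metadata.h>
    #include <parquet/statistics.h>

    // Returns true and fills min/max when the column chunk carries INT32 stats.
    bool Int32MinMax(const parquet::ColumnChunkMetaData& md,
                     int32_t* min, int32_t* max) {
      if (!md.is_stats_set()) return false;
      std::shared_ptr<parquet::Statistics> stats = md.statistics();
      if (stats == nullptr || !stats->HasMinMax()) return false;
      // Downcast to the typed statistics, mirroring the casts in the diff.
      auto* typed =
          static_cast<parquet::TypedStatistics<parquet::Int32Type>*>(stats.get());
      *min = typed->min();
      *max = typed->max();
      return true;
    }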

View File

@@ -13,7 +13,7 @@ class ParquetCursor {
   std::shared_ptr<parquet::RowGroupReader> rowGroup;
   std::vector<std::shared_ptr<parquet::Scanner>> scanners;
   std::vector<parquet::Type::type> types;
-  std::vector<parquet::LogicalType::type> logicalTypes;
+  std::vector<parquet::ConvertedType::type> ConvertedTypes;
   std::vector<int> colRows;
   std::vector<bool> colNulls;
@@ -36,10 +36,10 @@ class ParquetCursor {
   bool currentRowSatisfiesFilter();
   bool currentRowGroupSatisfiesFilter();
   bool currentRowGroupSatisfiesRowIdFilter(Constraint& constraint);
-  bool currentRowGroupSatisfiesTextFilter(Constraint& constraint, std::shared_ptr<parquet::RowGroupStatistics> stats);
-  bool currentRowGroupSatisfiesBlobFilter(Constraint& constraint, std::shared_ptr<parquet::RowGroupStatistics> stats);
-  bool currentRowGroupSatisfiesIntegerFilter(Constraint& constraint, std::shared_ptr<parquet::RowGroupStatistics> stats);
-  bool currentRowGroupSatisfiesDoubleFilter(Constraint& constraint, std::shared_ptr<parquet::RowGroupStatistics> stats);
+  bool currentRowGroupSatisfiesTextFilter(Constraint& constraint, std::shared_ptr<parquet::Statistics> stats);
+  bool currentRowGroupSatisfiesBlobFilter(Constraint& constraint, std::shared_ptr<parquet::Statistics> stats);
+  bool currentRowGroupSatisfiesIntegerFilter(Constraint& constraint, std::shared_ptr<parquet::Statistics> stats);
+  bool currentRowGroupSatisfiesDoubleFilter(Constraint& constraint, std::shared_ptr<parquet::Statistics> stats);
   bool currentRowSatisfiesTextFilter(Constraint& constraint);
   bool currentRowSatisfiesIntegerFilter(Constraint& constraint);
@@ -60,7 +60,7 @@ public:
   unsigned int getNumConstraints() const;
   const Constraint& getConstraint(unsigned int i) const;
   parquet::Type::type getPhysicalType(int col);
-  parquet::LogicalType::type getLogicalType(int col);
+  parquet::ConvertedType::type getConvertedType(int col);
   ParquetTable* getTable() const;
   int getInt32(int col);

View File

@@ -66,33 +66,33 @@ std::string ParquetTable::CreateStatement() {
     std::string type;
     parquet::Type::type physical = col->physical_type();
-    parquet::LogicalType::type logical = col->logical_type();
+    parquet::ConvertedType::type converted = col->converted_type();
     // Be explicit about which types we understand so we don't mislead someone
     // whose unsigned ints start getting interpreted as signed. (We could
     // support this for UINT_8/16/32 -- and for UINT_64 we could throw if
     // the high bit was set.)
-    if(logical == parquet::LogicalType::NONE ||
-       logical == parquet::LogicalType::UTF8 ||
-       logical == parquet::LogicalType::DATE ||
-       logical == parquet::LogicalType::TIME_MILLIS ||
-       logical == parquet::LogicalType::TIMESTAMP_MILLIS ||
-       logical == parquet::LogicalType::TIME_MICROS ||
-       logical == parquet::LogicalType::TIMESTAMP_MICROS ||
-       logical == parquet::LogicalType::INT_8 ||
-       logical == parquet::LogicalType::INT_16 ||
-       logical == parquet::LogicalType::INT_32 ||
-       logical == parquet::LogicalType::INT_64) {
+    if(converted == parquet::ConvertedType::NONE ||
+       converted == parquet::ConvertedType::UTF8 ||
+       converted == parquet::ConvertedType::DATE ||
+       converted == parquet::ConvertedType::TIME_MILLIS ||
+       converted == parquet::ConvertedType::TIMESTAMP_MILLIS ||
+       converted == parquet::ConvertedType::TIME_MICROS ||
+       converted == parquet::ConvertedType::TIMESTAMP_MICROS ||
+       converted == parquet::ConvertedType::INT_8 ||
+       converted == parquet::ConvertedType::INT_16 ||
+       converted == parquet::ConvertedType::INT_32 ||
+       converted == parquet::ConvertedType::INT_64) {
       switch(physical) {
         case parquet::Type::BOOLEAN:
           type = "TINYINT";
           break;
         case parquet::Type::INT32:
-          if(logical == parquet::LogicalType::NONE ||
-             logical == parquet::LogicalType::INT_32) {
+          if(converted == parquet::ConvertedType::NONE ||
+             converted == parquet::ConvertedType::INT_32) {
            type = "INT";
-          } else if(logical == parquet::LogicalType::INT_8) {
+          } else if(converted == parquet::ConvertedType::INT_8) {
            type = "TINYINT";
-          } else if(logical == parquet::LogicalType::INT_16) {
+          } else if(converted == parquet::ConvertedType::INT_16) {
            type = "SMALLINT";
          }
          break;
@@ -109,7 +109,7 @@ std::string ParquetTable::CreateStatement() {
          type = "DOUBLE";
          break;
        case parquet::Type::BYTE_ARRAY:
-          if(logical == parquet::LogicalType::UTF8) {
+          if(converted == parquet::ConvertedType::UTF8) {
            type = "TEXT";
          } else {
            type = "BLOB";
@@ -126,7 +126,7 @@ std::string ParquetTable::CreateStatement() {
     if(type.empty()) {
       std::ostringstream ss;
       ss << __FILE__ << ":" << __LINE__ << ": column " << i << " has unsupported type: " <<
-        parquet::TypeToString(physical) << "/" << parquet::LogicalTypeToString(logical);
+        parquet::TypeToString(physical) << "/" << parquet::ConvertedTypeToString(converted);
       throw std::invalid_argument(ss.str());
     }
@@ -137,8 +137,8 @@ std::string ParquetTable::CreateStatement() {
       col->name().data(),
       col->physical_type(),
       parquet::TypeToString(col->physical_type()).data(),
-      col->logical_type(),
-      parquet::LogicalTypeToString(col->logical_type()).data(),
+      col->converted_type(),
+      parquet::ConvertedTypeToString(col->converted_type()).data(),
       type.data());
 #endif
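In practice the type mapping in CreateStatement is unchanged by this rename: a BYTE_ARRAY column whose converted type is UTF8 still becomes a TEXT column, a plain BYTE_ARRAY becomes BLOB, and an INT32 column becomes INT, TINYINT, or SMALLINT depending on whether its converted type is NONE/INT_32, INT_8, or INT_16.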