From 96405b77dc00f95b27686dad3d88411839ead7be Mon Sep 17 00:00:00 2001 From: Mikko Harju Date: Wed, 13 Nov 2019 15:17:29 +0200 Subject: [PATCH] Refactored the types to match the most recent Apache Arrow version --- parquet/parquet.cc | 14 +++++----- parquet/parquet_cursor.cc | 56 +++++++++++++++++++-------------------- parquet/parquet_cursor.h | 12 ++++----- parquet/parquet_table.cc | 38 +++++++++++++------------- 4 files changed, 60 insertions(+), 60 deletions(-) diff --git a/parquet/parquet.cc b/parquet/parquet.cc index 9d5857e..ffa32b2 100644 --- a/parquet/parquet.cc +++ b/parquet/parquet.cc @@ -29,9 +29,9 @@ SQLITE_EXTENSION_INIT1 /* Forward references to the various virtual table methods implemented * in this file. */ -static int parquetCreate(sqlite3*, void*, int, const char*const*, +static int parquetCreate(sqlite3*, void*, int, const char*const*, sqlite3_vtab**,char**); -static int parquetConnect(sqlite3*, void*, int, const char*const*, +static int parquetConnect(sqlite3*, void*, int, const char*const*, sqlite3_vtab**,char**); static int parquetBestIndex(sqlite3_vtab*,sqlite3_index_info*); static int parquetDisconnect(sqlite3_vtab*); @@ -290,7 +290,7 @@ static int parquetColumn( case parquet::Type::BYTE_ARRAY: { parquet::ByteArray* rv = cursor->getByteArray(col); - if(cursor->getLogicalType(col) == parquet::LogicalType::UTF8) { + if(cursor->getLogicalType(col) == parquet::ConvertedType::UTF8) { sqlite3_result_text(ctx, (const char*)rv->ptr, rv->len, SQLITE_TRANSIENT); } else { sqlite3_result_blob(ctx, (void*)rv->ptr, rv->len, SQLITE_TRANSIENT); @@ -336,7 +336,7 @@ static int parquetColumn( */ static int parquetRowid(sqlite3_vtab_cursor *cur, sqlite_int64 *pRowid){ ParquetCursor *cursor = ((sqlite3_vtab_cursor_parquet*)cur)->cursor; - *pRowid = cursor->getRowId(); + *pRowid = cursor->getRowId(); return SQLITE_OK; } @@ -741,15 +741,15 @@ static sqlite3_module ParquetModule = { 0, /* xRename */ }; -/* +/* * This routine is called when the extension is loaded. The new * Parquet virtual table module is registered with the calling database * connection. */ extern "C" { int sqlite3_parquet_init( - sqlite3 *db, - char **pzErrMsg, + sqlite3 *db, + char **pzErrMsg, const sqlite3_api_routines *pApi ){ int rc; diff --git a/parquet/parquet_cursor.cc b/parquet/parquet_cursor.cc index e0ab8b6..b52feb8 100644 --- a/parquet/parquet_cursor.cc +++ b/parquet/parquet_cursor.cc @@ -31,7 +31,7 @@ bool ParquetCursor::currentRowGroupSatisfiesRowIdFilter(Constraint& constraint) } } -bool ParquetCursor::currentRowGroupSatisfiesBlobFilter(Constraint& constraint, std::shared_ptr _stats) { +bool ParquetCursor::currentRowGroupSatisfiesBlobFilter(Constraint& constraint, std::shared_ptr _stats) { if(!_stats->HasMinMax()) { return true; } @@ -48,8 +48,8 @@ bool ParquetCursor::currentRowGroupSatisfiesBlobFilter(Constraint& constraint, s parquet::Type::type pqType = types[constraint.column]; if(pqType == parquet::Type::BYTE_ARRAY) { - parquet::TypedRowGroupStatistics>* stats = - (parquet::TypedRowGroupStatistics>*)_stats.get(); + parquet::TypedStatistics* stats = + (parquet::TypedStatistics*)_stats.get(); minPtr = stats->min().ptr; minLen = stats->min().len; @@ -137,9 +137,9 @@ bool ParquetCursor::currentRowGroupSatisfiesBlobFilter(Constraint& constraint, s } } -bool ParquetCursor::currentRowGroupSatisfiesTextFilter(Constraint& constraint, std::shared_ptr _stats) { - parquet::TypedRowGroupStatistics>* stats = - (parquet::TypedRowGroupStatistics>*)_stats.get(); +bool ParquetCursor::currentRowGroupSatisfiesTextFilter(Constraint& constraint, std::shared_ptr _stats) { + parquet::TypedStatistics* stats = + (parquet::TypedStatistics*)_stats.get(); if(!stats->HasMinMax()) { return true; @@ -195,7 +195,7 @@ int64_t int96toMsSinceEpoch(const parquet::Int96& rv) { return nsSinceEpoch; } -bool ParquetCursor::currentRowGroupSatisfiesIntegerFilter(Constraint& constraint, std::shared_ptr _stats) { +bool ParquetCursor::currentRowGroupSatisfiesIntegerFilter(Constraint& constraint, std::shared_ptr _stats) { if(!_stats->HasMinMax()) { return true; } @@ -211,27 +211,27 @@ bool ParquetCursor::currentRowGroupSatisfiesIntegerFilter(Constraint& constraint parquet::Type::type pqType = types[column]; if(pqType == parquet::Type::INT32) { - parquet::TypedRowGroupStatistics>* stats = - (parquet::TypedRowGroupStatistics>*)_stats.get(); + parquet::TypedStatistics* stats = + (parquet::TypedStatistics*)_stats.get(); min = stats->min(); max = stats->max(); } else if(pqType == parquet::Type::INT64) { - parquet::TypedRowGroupStatistics>* stats = - (parquet::TypedRowGroupStatistics>*)_stats.get(); + parquet::TypedStatistics* stats = + (parquet::TypedStatistics*)_stats.get(); min = stats->min(); max = stats->max(); } else if(pqType == parquet::Type::INT96) { - parquet::TypedRowGroupStatistics>* stats = - (parquet::TypedRowGroupStatistics>*)_stats.get(); + parquet::TypedStatistics* stats = + (parquet::TypedStatistics*)_stats.get(); min = int96toMsSinceEpoch(stats->min()); max = int96toMsSinceEpoch(stats->max()); } else if(pqType == parquet::Type::BOOLEAN) { - parquet::TypedRowGroupStatistics>* stats = - (parquet::TypedRowGroupStatistics>*)_stats.get(); + parquet::TypedStatistics* stats = + (parquet::TypedStatistics*)_stats.get(); min = stats->min(); max = stats->max(); @@ -272,7 +272,7 @@ bool ParquetCursor::currentRowGroupSatisfiesIntegerFilter(Constraint& constraint return true; } -bool ParquetCursor::currentRowGroupSatisfiesDoubleFilter(Constraint& constraint, std::shared_ptr _stats) { +bool ParquetCursor::currentRowGroupSatisfiesDoubleFilter(Constraint& constraint, std::shared_ptr _stats) { if(!_stats->HasMinMax()) { return true; } @@ -288,14 +288,14 @@ bool ParquetCursor::currentRowGroupSatisfiesDoubleFilter(Constraint& constraint, parquet::Type::type pqType = types[column]; if(pqType == parquet::Type::DOUBLE) { - parquet::TypedRowGroupStatistics>* stats = - (parquet::TypedRowGroupStatistics>*)_stats.get(); + parquet::TypedStatistics* stats = + (parquet::TypedStatistics*)_stats.get(); min = stats->min(); max = stats->max(); } else if(pqType == parquet::Type::FLOAT) { - parquet::TypedRowGroupStatistics>* stats = - (parquet::TypedRowGroupStatistics>*)_stats.get(); + parquet::TypedStatistics* stats = + (parquet::TypedStatistics*)_stats.get(); min = stats->min(); max = stats->max(); @@ -527,7 +527,7 @@ bool ParquetCursor::currentRowGroupSatisfiesFilter() { } else { std::unique_ptr md = rowGroupMetadata->ColumnChunk(column); if(md->is_stats_set()) { - std::shared_ptr stats = md->statistics(); + std::shared_ptr stats = md->statistics(); // SQLite is much looser with types than you might expect if you // come from a Postgres background. The constraint '30.0' (that is, @@ -545,7 +545,7 @@ bool ParquetCursor::currentRowGroupSatisfiesFilter() { } else { parquet::Type::type pqType = types[column]; - if(pqType == parquet::Type::BYTE_ARRAY && logicalTypes[column] == parquet::LogicalType::UTF8) { + if(pqType == parquet::Type::BYTE_ARRAY && logicalTypes[column] == parquet::ConvertedType::UTF8) { rv = currentRowGroupSatisfiesTextFilter(constraints[i], stats); } else if(pqType == parquet::Type::BYTE_ARRAY) { rv = currentRowGroupSatisfiesBlobFilter(constraints[i], stats); @@ -609,12 +609,12 @@ start: } while(logicalTypes.size() < (unsigned int)rowGroupMetadata->num_columns()) { - logicalTypes.push_back(rowGroupMetadata->schema()->Column(0)->logical_type()); + logicalTypes.push_back(rowGroupMetadata->schema()->Column(0)->converted_type()); } for(unsigned int i = 0; i < (unsigned int)rowGroupMetadata->num_columns(); i++) { types[i] = rowGroupMetadata->schema()->Column(i)->physical_type(); - logicalTypes[i] = rowGroupMetadata->schema()->Column(i)->logical_type(); + logicalTypes[i] = rowGroupMetadata->schema()->Column(i)->converted_type(); } for(unsigned int i = 0; i < colRows.size(); i++) { @@ -664,7 +664,7 @@ bool ParquetCursor::currentRowSatisfiesFilter() { rv = !isNull(column); } else { - if(logicalTypes[column] == parquet::LogicalType::UTF8) { + if(logicalTypes[column] == parquet::ConvertedType::UTF8) { rv = currentRowSatisfiesTextFilter(constraints[i]); } else { parquet::Type::type pqType = types[column]; @@ -775,7 +775,7 @@ void ParquetCursor::ensureColumn(int col) { case parquet::Type::INT64: { parquet::Int64Scanner* s = (parquet::Int64Scanner*)scanners[col].get(); - long rv = 0; + long long rv = 0; s->NextValue(&rv, &wasNull); break; } @@ -859,7 +859,7 @@ void ParquetCursor::ensureColumn(int col) { case parquet::Type::INT64: { parquet::Int64Scanner* s = (parquet::Int64Scanner*)scanners[col].get(); - long rv = 0; + long long rv = 0; hadValue = s->NextValue(&rv, &wasNull); colIntValues[col] = rv; break; @@ -928,7 +928,7 @@ parquet::Type::type ParquetCursor::getPhysicalType(int col) { return types[col]; } -parquet::LogicalType::type ParquetCursor::getLogicalType(int col) { +parquet::ConvertedType::type ParquetCursor::getLogicalType(int col) { return logicalTypes[col]; } diff --git a/parquet/parquet_cursor.h b/parquet/parquet_cursor.h index f7d8c2a..d05b331 100644 --- a/parquet/parquet_cursor.h +++ b/parquet/parquet_cursor.h @@ -13,7 +13,7 @@ class ParquetCursor { std::shared_ptr rowGroup; std::vector> scanners; std::vector types; - std::vector logicalTypes; + std::vector logicalTypes; std::vector colRows; std::vector colNulls; @@ -36,10 +36,10 @@ class ParquetCursor { bool currentRowSatisfiesFilter(); bool currentRowGroupSatisfiesFilter(); bool currentRowGroupSatisfiesRowIdFilter(Constraint& constraint); - bool currentRowGroupSatisfiesTextFilter(Constraint& constraint, std::shared_ptr stats); - bool currentRowGroupSatisfiesBlobFilter(Constraint& constraint, std::shared_ptr stats); - bool currentRowGroupSatisfiesIntegerFilter(Constraint& constraint, std::shared_ptr stats); - bool currentRowGroupSatisfiesDoubleFilter(Constraint& constraint, std::shared_ptr stats); + bool currentRowGroupSatisfiesTextFilter(Constraint& constraint, std::shared_ptr stats); + bool currentRowGroupSatisfiesBlobFilter(Constraint& constraint, std::shared_ptr stats); + bool currentRowGroupSatisfiesIntegerFilter(Constraint& constraint, std::shared_ptr stats); + bool currentRowGroupSatisfiesDoubleFilter(Constraint& constraint, std::shared_ptr stats); bool currentRowSatisfiesTextFilter(Constraint& constraint); bool currentRowSatisfiesIntegerFilter(Constraint& constraint); @@ -60,7 +60,7 @@ public: unsigned int getNumConstraints() const; const Constraint& getConstraint(unsigned int i) const; parquet::Type::type getPhysicalType(int col); - parquet::LogicalType::type getLogicalType(int col); + parquet::ConvertedType::type getLogicalType(int col); ParquetTable* getTable() const; int getInt32(int col); diff --git a/parquet/parquet_table.cc b/parquet/parquet_table.cc index d796b8a..78d01a5 100644 --- a/parquet/parquet_table.cc +++ b/parquet/parquet_table.cc @@ -66,33 +66,33 @@ std::string ParquetTable::CreateStatement() { std::string type; parquet::Type::type physical = col->physical_type(); - parquet::LogicalType::type logical = col->logical_type(); + parquet::ConvertedType::type logical = col->converted_type(); // Be explicit about which types we understand so we don't mislead someone // whose unsigned ints start getting interpreted as signed. (We could // support this for UINT_8/16/32 -- and for UINT_64 we could throw if // the high bit was set.) - if(logical == parquet::LogicalType::NONE || - logical == parquet::LogicalType::UTF8 || - logical == parquet::LogicalType::DATE || - logical == parquet::LogicalType::TIME_MILLIS || - logical == parquet::LogicalType::TIMESTAMP_MILLIS || - logical == parquet::LogicalType::TIME_MICROS || - logical == parquet::LogicalType::TIMESTAMP_MICROS || - logical == parquet::LogicalType::INT_8 || - logical == parquet::LogicalType::INT_16 || - logical == parquet::LogicalType::INT_32 || - logical == parquet::LogicalType::INT_64) { + if(logical == parquet::ConvertedType::NONE || + logical == parquet::ConvertedType::UTF8 || + logical == parquet::ConvertedType::DATE || + logical == parquet::ConvertedType::TIME_MILLIS || + logical == parquet::ConvertedType::TIMESTAMP_MILLIS || + logical == parquet::ConvertedType::TIME_MICROS || + logical == parquet::ConvertedType::TIMESTAMP_MICROS || + logical == parquet::ConvertedType::INT_8 || + logical == parquet::ConvertedType::INT_16 || + logical == parquet::ConvertedType::INT_32 || + logical == parquet::ConvertedType::INT_64) { switch(physical) { case parquet::Type::BOOLEAN: type = "TINYINT"; break; case parquet::Type::INT32: - if(logical == parquet::LogicalType::NONE || - logical == parquet::LogicalType::INT_32) { + if(logical == parquet::ConvertedType::NONE || + logical == parquet::ConvertedType::INT_32) { type = "INT"; - } else if(logical == parquet::LogicalType::INT_8) { + } else if(logical == parquet::ConvertedType::INT_8) { type = "TINYINT"; - } else if(logical == parquet::LogicalType::INT_16) { + } else if(logical == parquet::ConvertedType::INT_16) { type = "SMALLINT"; } break; @@ -109,7 +109,7 @@ std::string ParquetTable::CreateStatement() { type = "DOUBLE"; break; case parquet::Type::BYTE_ARRAY: - if(logical == parquet::LogicalType::UTF8) { + if(logical == parquet::ConvertedType::UTF8) { type = "TEXT"; } else { type = "BLOB"; @@ -126,7 +126,7 @@ std::string ParquetTable::CreateStatement() { if(type.empty()) { std::ostringstream ss; ss << __FILE__ << ":" << __LINE__ << ": column " << i << " has unsupported type: " << - parquet::TypeToString(physical) << "/" << parquet::LogicalTypeToString(logical); + parquet::TypeToString(physical) << "/" << parquet::ConvertedTypeToString(logical); throw std::invalid_argument(ss.str()); } @@ -138,7 +138,7 @@ std::string ParquetTable::CreateStatement() { col->physical_type(), parquet::TypeToString(col->physical_type()).data(), col->logical_type(), - parquet::LogicalTypeToString(col->logical_type()).data(), + parquet::ConvertedTypeToString(col->logical_type()).data(), type.data()); #endif