From 7bc6f91f6fb5a4ea58e15bc7b2a975075c2269b3 Mon Sep 17 00:00:00 2001 From: Addie Morrison Date: Sun, 8 Dec 2019 16:08:11 -0600 Subject: [PATCH] Run a formatting pass with clang-format to minimize future git churn --- src/parquet.cc | 808 +++++++++++++++---------------- src/parquet_cursor.cc | 1076 ++++++++++++++++++++--------------------- src/parquet_cursor.h | 38 +- src/parquet_filter.cc | 143 +++--- src/parquet_filter.h | 49 +- src/parquet_table.cc | 146 +++--- src/parquet_table.h | 9 +- 7 files changed, 1104 insertions(+), 1165 deletions(-) diff --git a/src/parquet.cc b/src/parquet.cc index 9d5857e..0443b2e 100644 --- a/src/parquet.cc +++ b/src/parquet.cc @@ -1,66 +1,65 @@ /* -* This file contains the implementation of an SQLite virtual table for -* reading Parquet files. -* -* Usage: -* -* .load ./parquet -* CREATE VIRTUAL TABLE demo USING parquet(FILENAME); -* SELECT * FROM demo; -* -*/ + * This file contains the implementation of an SQLite virtual table for + * reading Parquet files. + * + * Usage: + * + * .load ./parquet + * CREATE VIRTUAL TABLE demo USING parquet(FILENAME); + * SELECT * FROM demo; + * + */ #include SQLITE_EXTENSION_INIT1 -#include -#include #include -#include #include -#include #include -#include #include +#include +#include +#include +#include +#include -#include "parquet_table.h" #include "parquet_cursor.h" #include "parquet_filter.h" +#include "parquet_table.h" //#define DEBUG /* Forward references to the various virtual table methods implemented * in this file. 
*/ -static int parquetCreate(sqlite3*, void*, int, const char*const*, - sqlite3_vtab**,char**); -static int parquetConnect(sqlite3*, void*, int, const char*const*, - sqlite3_vtab**,char**); -static int parquetBestIndex(sqlite3_vtab*,sqlite3_index_info*); -static int parquetDisconnect(sqlite3_vtab*); -static int parquetDestroy(sqlite3_vtab*); -static int parquetOpen(sqlite3_vtab*, sqlite3_vtab_cursor**); -static int parquetClose(sqlite3_vtab_cursor*); -static int parquetFilter(sqlite3_vtab_cursor*, int idxNum, const char *idxStr, - int argc, sqlite3_value **argv); -static int parquetNext(sqlite3_vtab_cursor*); -static int parquetEof(sqlite3_vtab_cursor*); -static int parquetColumn(sqlite3_vtab_cursor*,sqlite3_context*,int); -static int parquetRowid(sqlite3_vtab_cursor*,sqlite3_int64*); +static int parquetCreate(sqlite3 *, void *, int, const char *const *, + sqlite3_vtab **, char **); +static int parquetConnect(sqlite3 *, void *, int, const char *const *, + sqlite3_vtab **, char **); +static int parquetBestIndex(sqlite3_vtab *, sqlite3_index_info *); +static int parquetDisconnect(sqlite3_vtab *); +static int parquetDestroy(sqlite3_vtab *); +static int parquetOpen(sqlite3_vtab *, sqlite3_vtab_cursor **); +static int parquetClose(sqlite3_vtab_cursor *); +static int parquetFilter(sqlite3_vtab_cursor *, int idxNum, const char *idxStr, + int argc, sqlite3_value **argv); +static int parquetNext(sqlite3_vtab_cursor *); +static int parquetEof(sqlite3_vtab_cursor *); +static int parquetColumn(sqlite3_vtab_cursor *, sqlite3_context *, int); +static int parquetRowid(sqlite3_vtab_cursor *, sqlite3_int64 *); /* An instance of the Parquet virtual table */ typedef struct sqlite3_vtab_parquet { - sqlite3_vtab base; /* Base class. Must be first */ - ParquetTable* table; - sqlite3* db; + sqlite3_vtab base; /* Base class. 
Must be first */ + ParquetTable *table; + sqlite3 *db; } sqlite3_vtab_parquet; - /* A cursor for the Parquet virtual table */ typedef struct sqlite3_vtab_cursor_parquet { - sqlite3_vtab_cursor base; /* Base class. Must be first */ - ParquetCursor* cursor; + sqlite3_vtab_cursor base; /* Base class. Must be first */ + ParquetCursor *cursor; } sqlite3_vtab_cursor_parquet; static int parquetDestroy(sqlite3_vtab *pVtab) { - sqlite3_vtab_parquet *p = (sqlite3_vtab_parquet*)pVtab; + sqlite3_vtab_parquet *p = (sqlite3_vtab_parquet *)pVtab; // Clean up our shadow table. This is useful if the user has recreated // the parquet file, and our mappings would now be invalid. @@ -68,7 +67,7 @@ static int parquetDestroy(sqlite3_vtab *pVtab) { drop.append(p->table->getTableName()); drop.append("_rowgroups"); int rv = sqlite3_exec(p->db, drop.data(), 0, 0, 0); - if(rv != 0) + if (rv != 0) return rv; return SQLITE_OK; @@ -77,24 +76,20 @@ static int parquetDestroy(sqlite3_vtab *pVtab) { /* ** This method is the destructor fo a sqlite3_vtab_parquet object. 
*/ -static int parquetDisconnect(sqlite3_vtab *pVtab){ - sqlite3_vtab_parquet *p = (sqlite3_vtab_parquet*)pVtab; +static int parquetDisconnect(sqlite3_vtab *pVtab) { + sqlite3_vtab_parquet *p = (sqlite3_vtab_parquet *)pVtab; delete p->table; sqlite3_free(p); return SQLITE_OK; } -static int parquetConnect( - sqlite3 *db, - void *pAux, - int argc, - const char *const*argv, - sqlite3_vtab **ppVtab, - char **pzErr -){ +static int parquetConnect(sqlite3 *db, void *pAux, int argc, + const char *const *argv, sqlite3_vtab **ppVtab, + char **pzErr) { try { - if(argc != 4 || strlen(argv[3]) < 2) { - *pzErr = sqlite3_mprintf("must provide exactly one argument, the path to a parquet file"); + if (argc != 4 || strlen(argv[3]) < 2) { + *pzErr = sqlite3_mprintf( + "must provide exactly one argument, the path to a parquet file"); return SQLITE_ERROR; } @@ -102,8 +97,8 @@ static int parquetConnect( // Remove the delimiting single quotes std::string fname = argv[3]; fname = fname.substr(1, fname.length() - 2); - std::unique_ptr vtab( - (sqlite3_vtab_parquet*)sqlite3_malloc(sizeof(sqlite3_vtab_parquet)), + std::unique_ptr vtab( + (sqlite3_vtab_parquet *)sqlite3_malloc(sizeof(sqlite3_vtab_parquet)), sqlite3_free); memset(vtab.get(), 0, sizeof(*vtab.get())); @@ -112,20 +107,20 @@ static int parquetConnect( std::string create = table->CreateStatement(); int rc = sqlite3_declare_vtab(db, create.data()); - if(rc) + if (rc) return rc; vtab->table = table.release(); vtab->db = db; - *ppVtab = (sqlite3_vtab*)vtab.release(); + *ppVtab = (sqlite3_vtab *)vtab.release(); return SQLITE_OK; - } catch (const std::exception& e) { + } catch (const std::exception &e) { *pzErr = sqlite3_mprintf(e.what()); return SQLITE_ERROR; } - } catch(std::bad_alloc& ba) { + } catch (std::bad_alloc &ba) { return SQLITE_NOMEM; - } catch(std::exception& e) { + } catch (std::exception &e) { return SQLITE_ERROR; } } @@ -134,20 +129,16 @@ static int parquetConnect( ** The xConnect and xCreate methods do the same thing, 
but they must be ** different so that the virtual table is not an eponymous virtual table. */ -static int parquetCreate( - sqlite3 *db, - void *pAux, - int argc, const char *const*argv, - sqlite3_vtab **ppVtab, - char **pzErr -){ +static int parquetCreate(sqlite3 *db, void *pAux, int argc, + const char *const *argv, sqlite3_vtab **ppVtab, + char **pzErr) { try { // Create shadow table for storing constraint -> rowid mappings std::string create = "CREATE TABLE IF NOT EXISTS _"; create.append(argv[2]); create.append("_rowgroups(clause TEXT, estimate BLOB, actual BLOB)"); int rv = sqlite3_exec(db, create.data(), 0, 0, 0); - if(rv != 0) + if (rv != 0) return rv; create = "CREATE UNIQUE INDEX IF NOT EXISTS _"; @@ -158,28 +149,31 @@ static int parquetCreate( rv = sqlite3_exec(db, create.data(), 0, 0, 0); return parquetConnect(db, pAux, argc, argv, ppVtab, pzErr); - } catch (std::bad_alloc& ba) { + } catch (std::bad_alloc &ba) { return SQLITE_NOMEM; } } -std::string quoteBlob(const std::vector& bytes) { +std::string quoteBlob(const std::vector &bytes) { std::ostringstream ss; ss << "X'" << std::hex; - for(unsigned int i = 0; i < bytes.size(); i++) { - ss << std::setfill('0') << std::setw(2) << (unsigned int)(unsigned char)bytes[i]; + for (unsigned int i = 0; i < bytes.size(); i++) { + ss << std::setfill('0') << std::setw(2) + << (unsigned int)(unsigned char)bytes[i]; } ss << "'"; return ss.str(); } -void persistConstraints(sqlite3* db, ParquetCursor* cursor) { - for(unsigned int i = 0; i < cursor->getNumConstraints(); i++) { - const Constraint& constraint = cursor->getConstraint(i); - const std::vector& estimated = constraint.bitmap.estimatedMembership; - const std::vector& actual = constraint.bitmap.actualMembership; - if(estimated == actual) { +void persistConstraints(sqlite3 *db, ParquetCursor *cursor) { + for (unsigned int i = 0; i < cursor->getNumConstraints(); i++) { + const Constraint &constraint = cursor->getConstraint(i); + const std::vector &estimated = + 
constraint.bitmap.estimatedMembership; + const std::vector &actual = + constraint.bitmap.actualMembership; + if (estimated == actual) { continue; } std::string desc = constraint.describe(); @@ -188,15 +182,13 @@ void persistConstraints(sqlite3* db, ParquetCursor* cursor) { std::string actualStr = quoteBlob(actual); // This is only advisory, so ignore failures. - char* sql = sqlite3_mprintf( - "INSERT OR REPLACE INTO _%s_rowgroups(clause, estimate, actual) VALUES ('%q', %s, %s)", - cursor->getTable()->getTableName().c_str(), - desc.c_str(), - estimatedStr.c_str(), - actualStr.c_str()); + char *sql = + sqlite3_mprintf("INSERT OR REPLACE INTO _%s_rowgroups(clause, " + "estimate, actual) VALUES ('%q', %s, %s)", + cursor->getTable()->getTableName().c_str(), + desc.c_str(), estimatedStr.c_str(), actualStr.c_str()); - - if(sql == NULL) + if (sql == NULL) return; sqlite3_exec(db, sql, 0, 0, 0); @@ -204,12 +196,12 @@ void persistConstraints(sqlite3* db, ParquetCursor* cursor) { } } - /* ** Destructor for a sqlite3_vtab_cursor_parquet. */ -static int parquetClose(sqlite3_vtab_cursor *cur){ - sqlite3_vtab_cursor_parquet* vtab_cursor_parquet = (sqlite3_vtab_cursor_parquet*)cur; +static int parquetClose(sqlite3_vtab_cursor *cur) { + sqlite3_vtab_cursor_parquet *vtab_cursor_parquet = + (sqlite3_vtab_cursor_parquet *)cur; vtab_cursor_parquet->cursor->close(); delete vtab_cursor_parquet->cursor; sqlite3_free(cur); @@ -219,39 +211,40 @@ static int parquetClose(sqlite3_vtab_cursor *cur){ /* ** Constructor for a new sqlite3_vtab_parquet cursor object. 
*/ -static int parquetOpen(sqlite3_vtab *p, sqlite3_vtab_cursor **ppCursor){ +static int parquetOpen(sqlite3_vtab *p, sqlite3_vtab_cursor **ppCursor) { try { - std::unique_ptr cursor( - (sqlite3_vtab_cursor_parquet*)sqlite3_malloc(sizeof(sqlite3_vtab_cursor_parquet)), + std::unique_ptr cursor( + (sqlite3_vtab_cursor_parquet *)sqlite3_malloc( + sizeof(sqlite3_vtab_cursor_parquet)), sqlite3_free); memset(cursor.get(), 0, sizeof(*cursor.get())); - sqlite3_vtab_parquet* pParquet = (sqlite3_vtab_parquet*)p; + sqlite3_vtab_parquet *pParquet = (sqlite3_vtab_parquet *)p; cursor->cursor = new ParquetCursor(pParquet->table); - *ppCursor = (sqlite3_vtab_cursor*)cursor.release(); + *ppCursor = (sqlite3_vtab_cursor *)cursor.release(); return SQLITE_OK; - } catch(std::bad_alloc& ba) { + } catch (std::bad_alloc &ba) { return SQLITE_NOMEM; - } catch(std::exception& e) { + } catch (std::exception &e) { return SQLITE_ERROR; } } - /* ** Advance a sqlite3_vtab_cursor_parquet to its next row of input. ** Set the EOF marker if we reach the end of input. */ -static int parquetNext(sqlite3_vtab_cursor *cur){ +static int parquetNext(sqlite3_vtab_cursor *cur) { try { - sqlite3_vtab_cursor_parquet* vtab_cursor_parquet = (sqlite3_vtab_cursor_parquet*)cur; - ParquetCursor* cursor = vtab_cursor_parquet->cursor; + sqlite3_vtab_cursor_parquet *vtab_cursor_parquet = + (sqlite3_vtab_cursor_parquet *)cur; + ParquetCursor *cursor = vtab_cursor_parquet->cursor; cursor->next(); return SQLITE_OK; - } catch(std::bad_alloc& ba) { + } catch (std::bad_alloc &ba) { return SQLITE_NOMEM; - } catch(std::exception& e) { + } catch (std::exception &e) { return SQLITE_ERROR; } } @@ -260,73 +253,70 @@ static int parquetNext(sqlite3_vtab_cursor *cur){ ** Return values of columns for the row at which the sqlite3_vtab_cursor_parquet ** is currently pointing. 
*/ -static int parquetColumn( - sqlite3_vtab_cursor *cur, /* The cursor */ - sqlite3_context *ctx, /* First argument to sqlite3_result_...() */ - int col /* Which column to return */ -){ +static int +parquetColumn(sqlite3_vtab_cursor *cur, /* The cursor */ + sqlite3_context *ctx, /* First argument to sqlite3_result_...() */ + int col /* Which column to return */ +) { try { - ParquetCursor *cursor = ((sqlite3_vtab_cursor_parquet*)cur)->cursor; + ParquetCursor *cursor = ((sqlite3_vtab_cursor_parquet *)cur)->cursor; cursor->ensureColumn(col); - if(cursor->isNull(col)) { + if (cursor->isNull(col)) { sqlite3_result_null(ctx); } else { - switch(cursor->getPhysicalType(col)) { - case parquet::Type::BOOLEAN: - case parquet::Type::INT32: - { - int rv = cursor->getInt32(col); - sqlite3_result_int(ctx, rv); - break; + switch (cursor->getPhysicalType(col)) { + case parquet::Type::BOOLEAN: + case parquet::Type::INT32: { + int rv = cursor->getInt32(col); + sqlite3_result_int(ctx, rv); + break; + } + case parquet::Type::FLOAT: + case parquet::Type::DOUBLE: { + double rv = cursor->getDouble(col); + sqlite3_result_double(ctx, rv); + break; + } + case parquet::Type::BYTE_ARRAY: { + parquet::ByteArray *rv = cursor->getByteArray(col); + if (cursor->getLogicalType(col) == parquet::LogicalType::UTF8) { + sqlite3_result_text(ctx, (const char *)rv->ptr, rv->len, + SQLITE_TRANSIENT); + } else { + sqlite3_result_blob(ctx, (void *)rv->ptr, rv->len, SQLITE_TRANSIENT); } - case parquet::Type::FLOAT: - case parquet::Type::DOUBLE: - { - double rv = cursor->getDouble(col); - sqlite3_result_double(ctx, rv); - break; - } - case parquet::Type::BYTE_ARRAY: - { - parquet::ByteArray* rv = cursor->getByteArray(col); - if(cursor->getLogicalType(col) == parquet::LogicalType::UTF8) { - sqlite3_result_text(ctx, (const char*)rv->ptr, rv->len, SQLITE_TRANSIENT); - } else { - sqlite3_result_blob(ctx, (void*)rv->ptr, rv->len, SQLITE_TRANSIENT); - } - break; - } - case parquet::Type::INT96: - // This type exists 
to store timestamps in nanoseconds due to legacy - // reasons. We just interpret it as a timestamp in milliseconds. - case parquet::Type::INT64: - { - long rv = cursor->getInt64(col); - sqlite3_result_int64(ctx, rv); - break; - } - case parquet::Type::FIXED_LEN_BYTE_ARRAY: - { - parquet::ByteArray* rv = cursor->getByteArray(col); - sqlite3_result_blob(ctx, (void*)rv->ptr, rv->len, SQLITE_TRANSIENT); - break; - } - default: - // Should be impossible to get here as we should have forbidden this at - // CREATE time -- maybe file changed underneath us? - std::ostringstream ss; - ss << __FILE__ << ":" << __LINE__ << ": column " << col << " has unsupported type: " << - parquet::TypeToString(cursor->getPhysicalType(col)); + break; + } + case parquet::Type::INT96: + // This type exists to store timestamps in nanoseconds due to legacy + // reasons. We just interpret it as a timestamp in milliseconds. + case parquet::Type::INT64: { + long rv = cursor->getInt64(col); + sqlite3_result_int64(ctx, rv); + break; + } + case parquet::Type::FIXED_LEN_BYTE_ARRAY: { + parquet::ByteArray *rv = cursor->getByteArray(col); + sqlite3_result_blob(ctx, (void *)rv->ptr, rv->len, SQLITE_TRANSIENT); + break; + } + default: + // Should be impossible to get here as we should have forbidden this at + // CREATE time -- maybe file changed underneath us? + std::ostringstream ss; + ss << __FILE__ << ":" << __LINE__ << ": column " << col + << " has unsupported type: " + << parquet::TypeToString(cursor->getPhysicalType(col)); - throw std::invalid_argument(ss.str()); - break; + throw std::invalid_argument(ss.str()); + break; } } return SQLITE_OK; - } catch(std::bad_alloc& ba) { + } catch (std::bad_alloc &ba) { return SQLITE_NOMEM; - } catch(std::exception& e) { + } catch (std::exception &e) { return SQLITE_ERROR; } } @@ -334,9 +324,9 @@ static int parquetColumn( /* ** Return the rowid for the current row. 
*/ -static int parquetRowid(sqlite3_vtab_cursor *cur, sqlite_int64 *pRowid){ - ParquetCursor *cursor = ((sqlite3_vtab_cursor_parquet*)cur)->cursor; - *pRowid = cursor->getRowId(); +static int parquetRowid(sqlite3_vtab_cursor *cur, sqlite_int64 *pRowid) { + ParquetCursor *cursor = ((sqlite3_vtab_cursor_parquet *)cur)->cursor; + *pRowid = cursor->getRowId(); return SQLITE_OK; } @@ -344,11 +334,13 @@ static int parquetRowid(sqlite3_vtab_cursor *cur, sqlite_int64 *pRowid){ ** Return TRUE if the cursor has been moved off of the last ** row of output. */ -static int parquetEof(sqlite3_vtab_cursor *cur){ - ParquetCursor* cursor = ((sqlite3_vtab_cursor_parquet*)cur)->cursor; - if(cursor->eof()) { - sqlite3_vtab_cursor_parquet* vtab_cursor_parquet = (sqlite3_vtab_cursor_parquet*)cur; - sqlite3_vtab_parquet* vtab_parquet = (sqlite3_vtab_parquet*)(vtab_cursor_parquet->base.pVtab); +static int parquetEof(sqlite3_vtab_cursor *cur) { + ParquetCursor *cursor = ((sqlite3_vtab_cursor_parquet *)cur)->cursor; + if (cursor->eof()) { + sqlite3_vtab_cursor_parquet *vtab_cursor_parquet = + (sqlite3_vtab_cursor_parquet *)cur; + sqlite3_vtab_parquet *vtab_parquet = + (sqlite3_vtab_parquet *)(vtab_cursor_parquet->base.pVtab); persistConstraints(vtab_parquet->db, cursor); return 1; } @@ -356,125 +348,119 @@ static int parquetEof(sqlite3_vtab_cursor *cur){ } #ifdef DEBUG -const char* opName(int op) { - switch(op) { - case SQLITE_INDEX_CONSTRAINT_EQ: - return "="; - case SQLITE_INDEX_CONSTRAINT_GT: - return ">"; - case SQLITE_INDEX_CONSTRAINT_LE: - return "<="; - case SQLITE_INDEX_CONSTRAINT_LT: - return "<"; - case SQLITE_INDEX_CONSTRAINT_GE: - return ">="; - case SQLITE_INDEX_CONSTRAINT_MATCH: - return "match"; - case SQLITE_INDEX_CONSTRAINT_LIKE: - return "LIKE"; - case SQLITE_INDEX_CONSTRAINT_GLOB: - return "GLOB"; - case SQLITE_INDEX_CONSTRAINT_REGEXP: - return "REGEXP"; - case SQLITE_INDEX_CONSTRAINT_NE: - return "!="; - case SQLITE_INDEX_CONSTRAINT_ISNOT: - return "IS NOT"; - case 
SQLITE_INDEX_CONSTRAINT_ISNOTNULL: - return "IS NOT NULL"; - case SQLITE_INDEX_CONSTRAINT_ISNULL: - return "IS NULL"; - case SQLITE_INDEX_CONSTRAINT_IS: - return "IS"; - default: - return "unknown"; +const char *opName(int op) { + switch (op) { + case SQLITE_INDEX_CONSTRAINT_EQ: + return "="; + case SQLITE_INDEX_CONSTRAINT_GT: + return ">"; + case SQLITE_INDEX_CONSTRAINT_LE: + return "<="; + case SQLITE_INDEX_CONSTRAINT_LT: + return "<"; + case SQLITE_INDEX_CONSTRAINT_GE: + return ">="; + case SQLITE_INDEX_CONSTRAINT_MATCH: + return "match"; + case SQLITE_INDEX_CONSTRAINT_LIKE: + return "LIKE"; + case SQLITE_INDEX_CONSTRAINT_GLOB: + return "GLOB"; + case SQLITE_INDEX_CONSTRAINT_REGEXP: + return "REGEXP"; + case SQLITE_INDEX_CONSTRAINT_NE: + return "!="; + case SQLITE_INDEX_CONSTRAINT_ISNOT: + return "IS NOT"; + case SQLITE_INDEX_CONSTRAINT_ISNOTNULL: + return "IS NOT NULL"; + case SQLITE_INDEX_CONSTRAINT_ISNULL: + return "IS NULL"; + case SQLITE_INDEX_CONSTRAINT_IS: + return "IS"; + default: + return "unknown"; } } -void debugConstraints(sqlite3_index_info *pIdxInfo, ParquetTable *table, int argc, sqlite3_value** argv) { +void debugConstraints(sqlite3_index_info *pIdxInfo, ParquetTable *table, + int argc, sqlite3_value **argv) { printf("debugConstraints, argc=%d\n", argc); int j = 0; - for(int i = 0; i < pIdxInfo->nConstraint; i++) { + for (int i = 0; i < pIdxInfo->nConstraint; i++) { std::string valueStr = "?"; - if(argv != NULL && pIdxInfo->aConstraint[i].usable) { + if (argv != NULL && pIdxInfo->aConstraint[i].usable) { int type = sqlite3_value_type(argv[j]); - switch(type) { - case SQLITE_INTEGER: - { - sqlite3_int64 rv = sqlite3_value_int64(argv[j]); - std::ostringstream ss; - ss << rv; - valueStr = ss.str(); - break; - } - case SQLITE_FLOAT: - { - double rv = sqlite3_value_double(argv[j]); - std::ostringstream ss; - ss << rv; - valueStr = ss.str(); - break; - } - case SQLITE_TEXT: - { - const unsigned char* rv = sqlite3_value_text(argv[j]); - 
std::ostringstream ss; - ss << "'" << rv << "'"; - valueStr = ss.str(); - break; - } - case SQLITE_BLOB: - { - int sizeBytes = sqlite3_value_bytes(argv[j]); - std::ostringstream ss; - ss << "'..." << sizeBytes << "-byte blob...'"; - valueStr = ss.str(); - break; - } - case SQLITE_NULL: - { - valueStr = "NULL"; - break; - } + switch (type) { + case SQLITE_INTEGER: { + sqlite3_int64 rv = sqlite3_value_int64(argv[j]); + std::ostringstream ss; + ss << rv; + valueStr = ss.str(); + break; + } + case SQLITE_FLOAT: { + double rv = sqlite3_value_double(argv[j]); + std::ostringstream ss; + ss << rv; + valueStr = ss.str(); + break; + } + case SQLITE_TEXT: { + const unsigned char *rv = sqlite3_value_text(argv[j]); + std::ostringstream ss; + ss << "'" << rv << "'"; + valueStr = ss.str(); + break; + } + case SQLITE_BLOB: { + int sizeBytes = sqlite3_value_bytes(argv[j]); + std::ostringstream ss; + ss << "'..." << sizeBytes << "-byte blob...'"; + valueStr = ss.str(); + break; + } + case SQLITE_NULL: { + valueStr = "NULL"; + break; + } } j++; } - printf(" constraint %d: col %s %s %s, usable %d\n", - i, - table->columnName(pIdxInfo->aConstraint[i].iColumn).data(), - opName(pIdxInfo->aConstraint[i].op), - valueStr.data(), - pIdxInfo->aConstraint[i].usable); + printf(" constraint %d: col %s %s %s, usable %d\n", i, + table->columnName(pIdxInfo->aConstraint[i].iColumn).data(), + opName(pIdxInfo->aConstraint[i].op), valueStr.data(), + pIdxInfo->aConstraint[i].usable); } } #endif ConstraintOperator constraintOperatorFromSqlite(int op) { - switch(op) { - case SQLITE_INDEX_CONSTRAINT_EQ: - return Equal; - case SQLITE_INDEX_CONSTRAINT_GT: - return GreaterThan; - case SQLITE_INDEX_CONSTRAINT_LE: - return LessThanOrEqual; - case SQLITE_INDEX_CONSTRAINT_LT: - return LessThan; - case SQLITE_INDEX_CONSTRAINT_GE: - return GreaterThanOrEqual; - case SQLITE_INDEX_CONSTRAINT_LIKE: - return Like; - case SQLITE_INDEX_CONSTRAINT_GLOB: - return Glob; - case SQLITE_INDEX_CONSTRAINT_NE: - return NotEqual; 
- case SQLITE_INDEX_CONSTRAINT_ISNOT: - return IsNot; - case SQLITE_INDEX_CONSTRAINT_ISNOTNULL: - return IsNotNull; - case SQLITE_INDEX_CONSTRAINT_ISNULL: - return IsNull; - case SQLITE_INDEX_CONSTRAINT_IS: - return Is; + switch (op) { + case SQLITE_INDEX_CONSTRAINT_EQ: + return Equal; + case SQLITE_INDEX_CONSTRAINT_GT: + return GreaterThan; + case SQLITE_INDEX_CONSTRAINT_LE: + return LessThanOrEqual; + case SQLITE_INDEX_CONSTRAINT_LT: + return LessThan; + case SQLITE_INDEX_CONSTRAINT_GE: + return GreaterThanOrEqual; + case SQLITE_INDEX_CONSTRAINT_LIKE: + return Like; + case SQLITE_INDEX_CONSTRAINT_GLOB: + return Glob; + case SQLITE_INDEX_CONSTRAINT_NE: + return NotEqual; + case SQLITE_INDEX_CONSTRAINT_ISNOT: + return IsNot; + case SQLITE_INDEX_CONSTRAINT_ISNOTNULL: + return IsNotNull; + case SQLITE_INDEX_CONSTRAINT_ISNULL: + return IsNull; + case SQLITE_INDEX_CONSTRAINT_IS: + return Is; } std::ostringstream ss; @@ -482,29 +468,30 @@ ConstraintOperator constraintOperatorFromSqlite(int op) { throw std::invalid_argument(ss.str()); } -std::vector getRowGroupsForClause(sqlite3* db, std::string table, std::string clause) { +std::vector getRowGroupsForClause(sqlite3 *db, std::string table, + std::string clause) { std::vector rv; - std::unique_ptr sql(sqlite3_mprintf( - "SELECT actual FROM _%s_rowgroups WHERE clause = '%q'", - table.c_str(), - clause.c_str()), sqlite3_free); + std::unique_ptr sql( + sqlite3_mprintf("SELECT actual FROM _%s_rowgroups WHERE clause = '%q'", + table.c_str(), clause.c_str()), + sqlite3_free); - if(sql.get() == NULL) + if (sql.get() == NULL) return rv; - sqlite3_stmt* pStmt = NULL; + sqlite3_stmt *pStmt = NULL; int rc = sqlite3_prepare_v2(db, sql.get(), -1, &pStmt, NULL); - if(rc != 0) + if (rc != 0) return rv; rc = sqlite3_step(pStmt); - if(rc == SQLITE_ROW) { + if (rc == SQLITE_ROW) { int size = sqlite3_column_bytes(pStmt, 0); - unsigned char* blob = (unsigned char*)sqlite3_column_blob(pStmt, 0); - // TODO: there is a memory leak here if we 
get a std::bad_alloc while populating rv; - // we fail to free pStmt - for(int i = 0; i < size; i++) { + unsigned char *blob = (unsigned char *)sqlite3_column_blob(pStmt, 0); + // TODO: there is a memory leak here if we get a std::bad_alloc while + // populating rv; we fail to free pStmt + for (int i = 0; i < size; i++) { rv.push_back(blob[i]); } } @@ -513,39 +500,36 @@ std::vector getRowGroupsForClause(sqlite3* db, std::string table, return rv; } - /* ** Only a full table scan is supported. So xFilter simply rewinds to ** the beginning. */ -static int parquetFilter( - sqlite3_vtab_cursor *cur, - int idxNum, - const char *idxStr, - int argc, - sqlite3_value **argv -){ +static int parquetFilter(sqlite3_vtab_cursor *cur, int idxNum, + const char *idxStr, int argc, sqlite3_value **argv) { try { - sqlite3_vtab_cursor_parquet* vtab_cursor_parquet = (sqlite3_vtab_cursor_parquet*)cur; - sqlite3_vtab_parquet* vtab_parquet = (sqlite3_vtab_parquet*)(vtab_cursor_parquet->base.pVtab); - sqlite3* db = vtab_parquet->db; - ParquetCursor* cursor = vtab_cursor_parquet->cursor; - sqlite3_index_info* indexInfo = (sqlite3_index_info*)idxStr; + sqlite3_vtab_cursor_parquet *vtab_cursor_parquet = + (sqlite3_vtab_cursor_parquet *)cur; + sqlite3_vtab_parquet *vtab_parquet = + (sqlite3_vtab_parquet *)(vtab_cursor_parquet->base.pVtab); + sqlite3 *db = vtab_parquet->db; + ParquetCursor *cursor = vtab_cursor_parquet->cursor; + sqlite3_index_info *indexInfo = (sqlite3_index_info *)idxStr; #ifdef DEBUG - struct timeval tv; - gettimeofday(&tv, NULL); - unsigned long long millisecondsSinceEpoch = - (unsigned long long)(tv.tv_sec) * 1000 + - (unsigned long long)(tv.tv_usec) / 1000; + struct timeval tv; + gettimeofday(&tv, NULL); + unsigned long long millisecondsSinceEpoch = + (unsigned long long)(tv.tv_sec) * 1000 + + (unsigned long long)(tv.tv_usec) / 1000; - printf("%llu xFilter: idxNum=%d, idxStr=%lu, argc=%d\n", millisecondsSinceEpoch, idxNum, (long unsigned int)idxStr, argc); + printf("%llu 
xFilter: idxNum=%d, idxStr=%lu, argc=%d\n", + millisecondsSinceEpoch, idxNum, (long unsigned int)idxStr, argc); debugConstraints(indexInfo, cursor->getTable(), argc, argv); #endif std::vector constraints; int j = 0; - for(int i = 0; i < indexInfo->nConstraint; i++) { - if(!indexInfo->aConstraint[i].usable) { + for (int i = 0; i < indexInfo->nConstraint; i++) { + if (!indexInfo->aConstraint[i].usable) { continue; } @@ -555,107 +539,98 @@ static int parquetFilter( std::vector blobValue; int sqliteType = sqlite3_value_type(argv[j]); - if(sqliteType == SQLITE_INTEGER) { + if (sqliteType == SQLITE_INTEGER) { type = Integer; intValue = sqlite3_value_int64(argv[j]); - } else if(sqliteType == SQLITE_FLOAT) { + } else if (sqliteType == SQLITE_FLOAT) { type = Double; doubleValue = sqlite3_value_double(argv[j]); - } else if(sqliteType == SQLITE_TEXT) { + } else if (sqliteType == SQLITE_TEXT) { type = Text; int len = sqlite3_value_bytes(argv[j]); - const unsigned char* ptr = sqlite3_value_text(argv[j]); - for(int k = 0; k < len; k++) { + const unsigned char *ptr = sqlite3_value_text(argv[j]); + for (int k = 0; k < len; k++) { blobValue.push_back(ptr[k]); } - } else if(sqliteType == SQLITE_BLOB) { + } else if (sqliteType == SQLITE_BLOB) { type = Blob; int len = sqlite3_value_bytes(argv[j]); - const unsigned char* ptr = (const unsigned char*)sqlite3_value_blob(argv[j]); - for(int k = 0; k < len; k++) { + const unsigned char *ptr = + (const unsigned char *)sqlite3_value_blob(argv[j]); + for (int k = 0; k < len; k++) { blobValue.push_back(ptr[k]); } - } else if(sqliteType == SQLITE_NULL) { + } else if (sqliteType == SQLITE_NULL) { type = Null; } std::string columnName = "rowid"; - if(indexInfo->aConstraint[i].iColumn >= 0) { - columnName = cursor->getTable()->columnName(indexInfo->aConstraint[i].iColumn); + if (indexInfo->aConstraint[i].iColumn >= 0) { + columnName = + cursor->getTable()->columnName(indexInfo->aConstraint[i].iColumn); } RowGroupBitmap bitmap = 
RowGroupBitmap(cursor->getNumRowGroups()); Constraint dummy( - bitmap, - indexInfo->aConstraint[i].iColumn, - columnName, - constraintOperatorFromSqlite(indexInfo->aConstraint[i].op), - type, - intValue, - doubleValue, - blobValue); + bitmap, indexInfo->aConstraint[i].iColumn, columnName, + constraintOperatorFromSqlite(indexInfo->aConstraint[i].op), type, + intValue, doubleValue, blobValue); - std::vector actual = getRowGroupsForClause(db, cursor->getTable()->getTableName(), dummy.describe()); - if(actual.size() > 0) { - // Initialize the estimate to be the actual -- eventually they'll converge - // and we'll stop writing back to the db. + std::vector actual = getRowGroupsForClause( + db, cursor->getTable()->getTableName(), dummy.describe()); + if (actual.size() > 0) { + // Initialize the estimate to be the actual -- eventually they'll + // converge and we'll stop writing back to the db. std::vector estimate = actual; bitmap = RowGroupBitmap(estimate, actual); } Constraint constraint( - bitmap, - indexInfo->aConstraint[i].iColumn, - columnName, - constraintOperatorFromSqlite(indexInfo->aConstraint[i].op), - type, - intValue, - doubleValue, - blobValue); + bitmap, indexInfo->aConstraint[i].iColumn, columnName, + constraintOperatorFromSqlite(indexInfo->aConstraint[i].op), type, + intValue, doubleValue, blobValue); constraints.push_back(constraint); j++; } cursor->reset(constraints); return parquetNext(cur); - } catch(std::bad_alloc& ba) { + } catch (std::bad_alloc &ba) { return SQLITE_NOMEM; - } catch(std::exception& e) { + } catch (std::exception &e) { return SQLITE_ERROR; } } /* -* We'll always indicate to SQLite that we prefer it to use an index so that it will -* pass additional context to xFilter, which we may or may not use. -* -* We copy the sqlite3_index_info structure, as is, into idxStr for later use. 
-*/ -static int parquetBestIndex( - sqlite3_vtab *tab, - sqlite3_index_info *pIdxInfo -){ + * We'll always indicate to SQLite that we prefer it to use an index so that it + * will pass additional context to xFilter, which we may or may not use. + * + * We copy the sqlite3_index_info structure, as is, into idxStr for later use. + */ +static int parquetBestIndex(sqlite3_vtab *tab, sqlite3_index_info *pIdxInfo) { try { #ifdef DEBUG struct timeval tv; gettimeofday(&tv, NULL); unsigned long long millisecondsSinceEpoch = - (unsigned long long)(tv.tv_sec) * 1000 + - (unsigned long long)(tv.tv_usec) / 1000; + (unsigned long long)(tv.tv_sec) * 1000 + + (unsigned long long)(tv.tv_usec) / 1000; - - ParquetTable* table = ((sqlite3_vtab_parquet*)tab)->table; - printf("%llu xBestIndex: nConstraint=%d, nOrderBy=%d\n", millisecondsSinceEpoch, pIdxInfo->nConstraint, pIdxInfo->nOrderBy); + ParquetTable *table = ((sqlite3_vtab_parquet *)tab)->table; + printf("%llu xBestIndex: nConstraint=%d, nOrderBy=%d\n", + millisecondsSinceEpoch, pIdxInfo->nConstraint, pIdxInfo->nOrderBy); debugConstraints(pIdxInfo, table, 0, NULL); #endif - // We traverse in rowid ascending order, so if they're asking for it to be ordered like that, - // we can tell SQLite that it's guaranteed. This speeds up some DB viewer utilities that - // use rowids for pagination. - if(pIdxInfo->nOrderBy == 1 && pIdxInfo->aOrderBy[0].iColumn == -1 && pIdxInfo->aOrderBy[0].desc == 0) + // We traverse in rowid ascending order, so if they're asking for it to be + // ordered like that, we can tell SQLite that it's guaranteed. This speeds + // up some DB viewer utilities that use rowids for pagination. 
+ if (pIdxInfo->nOrderBy == 1 && pIdxInfo->aOrderBy[0].iColumn == -1 && + pIdxInfo->aOrderBy[0].desc == 0) pIdxInfo->orderByConsumed = 1; - if(pIdxInfo->nConstraint == 0) { + if (pIdxInfo->nConstraint == 0) { pIdxInfo->estimatedCost = 1000000000000; pIdxInfo->idxNum = 0; } else { @@ -663,98 +638,103 @@ static int parquetBestIndex( pIdxInfo->idxNum = 1; int j = 0; - for(int i = 0; i < pIdxInfo->nConstraint; i++) { - if(pIdxInfo->aConstraint[i].usable) { + for (int i = 0; i < pIdxInfo->nConstraint; i++) { + if (pIdxInfo->aConstraint[i].usable) { j++; pIdxInfo->aConstraintUsage[i].argvIndex = j; -// pIdxInfo->aConstraintUsage[i].omit = 1; + // pIdxInfo->aConstraintUsage[i].omit = 1; } } } - size_t dupeSize = sizeof(sqlite3_index_info) + - //pIdxInfo->nConstraint * sizeof(sqlite3_index_constraint) + - pIdxInfo->nConstraint * sizeof(sqlite3_index_info::sqlite3_index_constraint) + - pIdxInfo->nOrderBy * sizeof(sqlite3_index_info::sqlite3_index_orderby) + - pIdxInfo->nConstraint * sizeof(sqlite3_index_info::sqlite3_index_constraint_usage); - sqlite3_index_info* dupe = (sqlite3_index_info*)sqlite3_malloc(dupeSize); - pIdxInfo->idxStr = (char*)dupe; + size_t dupeSize = + sizeof(sqlite3_index_info) + + // pIdxInfo->nConstraint * sizeof(sqlite3_index_constraint) + + pIdxInfo->nConstraint * + sizeof(sqlite3_index_info::sqlite3_index_constraint) + + pIdxInfo->nOrderBy * sizeof(sqlite3_index_info::sqlite3_index_orderby) + + pIdxInfo->nConstraint * + sizeof(sqlite3_index_info::sqlite3_index_constraint_usage); + sqlite3_index_info *dupe = (sqlite3_index_info *)sqlite3_malloc(dupeSize); + pIdxInfo->idxStr = (char *)dupe; pIdxInfo->needToFreeIdxStr = 1; memset(dupe, 0, dupeSize); memcpy(dupe, pIdxInfo, sizeof(sqlite3_index_info)); - dupe->aConstraint = (sqlite3_index_info::sqlite3_index_constraint*)((char*)dupe + sizeof(sqlite3_index_info)); - dupe->aOrderBy = (sqlite3_index_info::sqlite3_index_orderby*)((char*)dupe + - sizeof(sqlite3_index_info) + - pIdxInfo->nConstraint * 
sizeof(sqlite3_index_info::sqlite3_index_constraint)); - dupe->aConstraintUsage = (sqlite3_index_info::sqlite3_index_constraint_usage*)((char*)dupe + - sizeof(sqlite3_index_info) + - pIdxInfo->nConstraint * sizeof(sqlite3_index_info::sqlite3_index_constraint) + - pIdxInfo->nOrderBy * sizeof(sqlite3_index_info::sqlite3_index_orderby)); + dupe->aConstraint = (sqlite3_index_info::sqlite3_index_constraint + *)((char *)dupe + sizeof(sqlite3_index_info)); + dupe->aOrderBy = + (sqlite3_index_info::sqlite3_index_orderby + *)((char *)dupe + sizeof(sqlite3_index_info) + + pIdxInfo->nConstraint * + sizeof(sqlite3_index_info::sqlite3_index_constraint)); + dupe->aConstraintUsage = + (sqlite3_index_info::sqlite3_index_constraint_usage + *)((char *)dupe + sizeof(sqlite3_index_info) + + pIdxInfo->nConstraint * + sizeof(sqlite3_index_info::sqlite3_index_constraint) + + pIdxInfo->nOrderBy * + sizeof(sqlite3_index_info::sqlite3_index_orderby)); - - for(int i = 0; i < pIdxInfo->nConstraint; i++) { + for (int i = 0; i < pIdxInfo->nConstraint; i++) { dupe->aConstraint[i].iColumn = pIdxInfo->aConstraint[i].iColumn; dupe->aConstraint[i].op = pIdxInfo->aConstraint[i].op; dupe->aConstraint[i].usable = pIdxInfo->aConstraint[i].usable; dupe->aConstraint[i].iTermOffset = pIdxInfo->aConstraint[i].iTermOffset; - dupe->aConstraintUsage[i].argvIndex = pIdxInfo->aConstraintUsage[i].argvIndex; + dupe->aConstraintUsage[i].argvIndex = + pIdxInfo->aConstraintUsage[i].argvIndex; dupe->aConstraintUsage[i].omit = pIdxInfo->aConstraintUsage[i].omit; } - for(int i = 0; i < pIdxInfo->nOrderBy; i++) { + for (int i = 0; i < pIdxInfo->nOrderBy; i++) { dupe->aOrderBy[i].iColumn = pIdxInfo->aOrderBy[i].iColumn; dupe->aOrderBy[i].desc = pIdxInfo->aOrderBy[i].desc; } return SQLITE_OK; - } catch(std::bad_alloc& ba) { + } catch (std::bad_alloc &ba) { return SQLITE_NOMEM; - } catch(std::exception& e) { + } catch (std::exception &e) { return SQLITE_ERROR; } } - static sqlite3_module ParquetModule = { - 0, /* iVersion 
/*
 * Registration table wiring the SQLite virtual-table interface to this
 * module's implementations. The write-related hooks (xUpdate, xBegin, xSync,
 * xCommit, xRollback) are left NULL: parquet tables are read-only.
 */
static sqlite3_module ParquetModule = {
    0,                 /* iVersion */
    parquetCreate,     /* xCreate */
    parquetConnect,    /* xConnect */
    parquetBestIndex,  /* xBestIndex */
    parquetDisconnect, /* xDisconnect */
    parquetDestroy,    /* xDestroy */
    parquetOpen,       /* xOpen - open a cursor */
    parquetClose,      /* xClose - close a cursor */
    parquetFilter,     /* xFilter - configure scan constraints */
    parquetNext,       /* xNext - advance a cursor */
    parquetEof,        /* xEof - check for end of scan */
    parquetColumn,     /* xColumn - read data */
    parquetRowid,      /* xRowid - read data */
    0,                 /* xUpdate */
    0,                 /* xBegin */
    0,                 /* xSync */
    0,                 /* xCommit */
    0,                 /* xRollback */
    0,                 /* xFindMethod */
    0,                 /* xRename */
};
+ */ extern "C" { - int sqlite3_parquet_init( - sqlite3 *db, - char **pzErrMsg, - const sqlite3_api_routines *pApi - ){ - int rc; - SQLITE_EXTENSION_INIT2(pApi); - rc = sqlite3_create_module(db, "parquet", &ParquetModule, 0); - return rc; - } +int sqlite3_parquet_init(sqlite3 *db, char **pzErrMsg, + const sqlite3_api_routines *pApi) { + int rc; + SQLITE_EXTENSION_INIT2(pApi); + rc = sqlite3_create_module(db, "parquet", &ParquetModule, 0); + return rc; +} } diff --git a/src/parquet_cursor.cc b/src/parquet_cursor.cc index e0ab8b6..ee5452a 100644 --- a/src/parquet_cursor.cc +++ b/src/parquet_cursor.cc @@ -1,61 +1,66 @@ #include "parquet_cursor.h" -ParquetCursor::ParquetCursor(ParquetTable* table): table(table) { +ParquetCursor::ParquetCursor(ParquetTable *table) : table(table) { reader = NULL; reset(std::vector()); } -bool ParquetCursor::currentRowGroupSatisfiesRowIdFilter(Constraint& constraint) { - if(constraint.type != Integer) +bool ParquetCursor::currentRowGroupSatisfiesRowIdFilter( + Constraint &constraint) { + if (constraint.type != Integer) return true; int64_t target = constraint.intValue; - switch(constraint.op) { - case IsNull: - return false; - case Is: - case Equal: - return target >= rowId && target < rowId + rowGroupSize; - case GreaterThan: - // rowId > target - return rowId + rowGroupSize > target; - case GreaterThanOrEqual: - // rowId >= target - return rowId + rowGroupSize >= rowId; - case LessThan: - return target > rowId; - case LessThanOrEqual: - return target >= rowId; - default: - return true; + switch (constraint.op) { + case IsNull: + return false; + case Is: + case Equal: + return target >= rowId && target < rowId + rowGroupSize; + case GreaterThan: + // rowId > target + return rowId + rowGroupSize > target; + case GreaterThanOrEqual: + // rowId >= target + return rowId + rowGroupSize >= rowId; + case LessThan: + return target > rowId; + case LessThanOrEqual: + return target >= rowId; + default: + return true; } } -bool 
ParquetCursor::currentRowGroupSatisfiesBlobFilter(Constraint& constraint, std::shared_ptr _stats) { - if(!_stats->HasMinMax()) { +bool ParquetCursor::currentRowGroupSatisfiesBlobFilter( + Constraint &constraint, + std::shared_ptr _stats) { + if (!_stats->HasMinMax()) { return true; } - if(constraint.type != Blob) { + if (constraint.type != Blob) { return true; } - const unsigned char* minPtr = NULL; - const unsigned char* maxPtr = NULL; + const unsigned char *minPtr = NULL; + const unsigned char *maxPtr = NULL; size_t minLen = 0; size_t maxLen = 0; parquet::Type::type pqType = types[constraint.column]; - if(pqType == parquet::Type::BYTE_ARRAY) { - parquet::TypedRowGroupStatistics>* stats = - (parquet::TypedRowGroupStatistics>*)_stats.get(); + if (pqType == parquet::Type::BYTE_ARRAY) { + parquet::TypedRowGroupStatistics< + parquet::DataType> *stats = + (parquet::TypedRowGroupStatistics< + parquet::DataType> *)_stats.get(); minPtr = stats->min().ptr; minLen = stats->min().len; maxPtr = stats->max().ptr; maxLen = stats->max().len; - } else if(pqType == parquet::Type::FIXED_LEN_BYTE_ARRAY) { + } else if (pqType == parquet::Type::FIXED_LEN_BYTE_ARRAY) { // It seems like parquet-cpp doesn't actually produce stats for FLBA yet, so // rather than have untested code here, we'll just short circuit. 
// @@ -64,127 +69,116 @@ bool ParquetCursor::currentRowGroupSatisfiesBlobFilter(Constraint& constraint, s } else { // Should be impossible to get here std::ostringstream ss; - ss << __FILE__ << ":" << __LINE__ << ": currentRowGroupSatisfiesBlobFilter called on unsupported type: " << - parquet::TypeToString(pqType); + ss << __FILE__ << ":" << __LINE__ + << ": currentRowGroupSatisfiesBlobFilter called on unsupported type: " + << parquet::TypeToString(pqType); throw std::invalid_argument(ss.str()); } - const std::vector& blob = constraint.blobValue; + const std::vector &blob = constraint.blobValue; - switch(constraint.op) { - case Is: - case Equal: - { - bool minEqual = blob.size() == minLen && memcmp(&blob[0], minPtr, minLen) == 0; - bool maxEqual = blob.size() == maxLen && memcmp(&blob[0], maxPtr, maxLen) == 0; + switch (constraint.op) { + case Is: + case Equal: { + bool minEqual = + blob.size() == minLen && memcmp(&blob[0], minPtr, minLen) == 0; + bool maxEqual = + blob.size() == maxLen && memcmp(&blob[0], maxPtr, maxLen) == 0; - bool blobGteMinBlob = std::lexicographical_compare( - minPtr, - minPtr + minLen, - &blob[0], - &blob[0] + blob.size()); + bool blobGteMinBlob = std::lexicographical_compare( + minPtr, minPtr + minLen, &blob[0], &blob[0] + blob.size()); - bool blobLtMaxBlob = std::lexicographical_compare( - &blob[0], - &blob[0] + blob.size(), - maxPtr, - maxPtr + maxLen); + bool blobLtMaxBlob = std::lexicographical_compare( + &blob[0], &blob[0] + blob.size(), maxPtr, maxPtr + maxLen); + return (minEqual || blobGteMinBlob) && (maxEqual || blobLtMaxBlob); + } + case GreaterThanOrEqual: { + bool maxEqual = + blob.size() == maxLen && memcmp(&blob[0], maxPtr, maxLen) == 0; - return (minEqual || blobGteMinBlob) && (maxEqual || blobLtMaxBlob); - } - case GreaterThanOrEqual: - { - bool maxEqual = blob.size() == maxLen && memcmp(&blob[0], maxPtr, maxLen) == 0; - - return maxEqual || std::lexicographical_compare( - &blob[0], - &blob[0] + blob.size(), - maxPtr, - 
maxPtr + maxLen); - } - case GreaterThan: - return std::lexicographical_compare( - &blob[0], - &blob[0] + blob.size(), - maxPtr, - maxPtr + maxLen); - case LessThan: - return std::lexicographical_compare( - minPtr, - minPtr + minLen, - &blob[0], - &blob[0] + blob.size()); - case LessThanOrEqual: - { - bool minEqual = blob.size() == minLen && memcmp(&blob[0], minPtr, minLen) == 0; - return minEqual || std::lexicographical_compare( - minPtr, - minPtr + minLen, - &blob[0], - &blob[0] + blob.size()); - } - case NotEqual: - { - // If min == max == blob, we can skip this. - bool blobMaxEqual = blob.size() == maxLen && memcmp(&blob[0], maxPtr, maxLen) == 0; - bool minMaxEqual = minLen == maxLen && memcmp(minPtr, maxPtr, minLen) == 0; - return !(blobMaxEqual && minMaxEqual); - } - case IsNot: - default: - return true; + return maxEqual || + std::lexicographical_compare(&blob[0], &blob[0] + blob.size(), + maxPtr, maxPtr + maxLen); + } + case GreaterThan: + return std::lexicographical_compare(&blob[0], &blob[0] + blob.size(), + maxPtr, maxPtr + maxLen); + case LessThan: + return std::lexicographical_compare(minPtr, minPtr + minLen, &blob[0], + &blob[0] + blob.size()); + case LessThanOrEqual: { + bool minEqual = + blob.size() == minLen && memcmp(&blob[0], minPtr, minLen) == 0; + return minEqual || + std::lexicographical_compare(minPtr, minPtr + minLen, &blob[0], + &blob[0] + blob.size()); + } + case NotEqual: { + // If min == max == blob, we can skip this. 
+ bool blobMaxEqual = + blob.size() == maxLen && memcmp(&blob[0], maxPtr, maxLen) == 0; + bool minMaxEqual = minLen == maxLen && memcmp(minPtr, maxPtr, minLen) == 0; + return !(blobMaxEqual && minMaxEqual); + } + case IsNot: + default: + return true; } } -bool ParquetCursor::currentRowGroupSatisfiesTextFilter(Constraint& constraint, std::shared_ptr _stats) { - parquet::TypedRowGroupStatistics>* stats = - (parquet::TypedRowGroupStatistics>*)_stats.get(); +bool ParquetCursor::currentRowGroupSatisfiesTextFilter( + Constraint &constraint, + std::shared_ptr _stats) { + parquet::TypedRowGroupStatistics> + *stats = (parquet::TypedRowGroupStatistics< + parquet::DataType> *)_stats.get(); - if(!stats->HasMinMax()) { + if (!stats->HasMinMax()) { return true; } - if(constraint.type != Text) { + if (constraint.type != Text) { return true; } - const std::string& str = constraint.stringValue; - const parquet::ByteArray& min = stats->min(); - const parquet::ByteArray& max = stats->max(); - std::string minStr((const char*)min.ptr, min.len); - std::string maxStr((const char*)max.ptr, max.len); -// printf("min=%s [%d], max=%s [%d], target=%s\n", minStr.data(), min.len, maxStr.data(), max.len, str.data()); + const std::string &str = constraint.stringValue; + const parquet::ByteArray &min = stats->min(); + const parquet::ByteArray &max = stats->max(); + std::string minStr((const char *)min.ptr, min.len); + std::string maxStr((const char *)max.ptr, max.len); + // printf("min=%s [%d], max=%s [%d], target=%s\n", minStr.data(), min.len, + // maxStr.data(), max.len, str.data()); - switch(constraint.op) { - case Is: - case Equal: - return str >= minStr && str <= maxStr; - case GreaterThanOrEqual: - return maxStr >= str; - case GreaterThan: - return maxStr > str; - case LessThan: - return minStr < str; - case LessThanOrEqual: - return minStr <= str; - case NotEqual: - // If min == max == str, we can skip this. 
- return !(minStr == maxStr && str == minStr); - case Like: - { - const std::string& likeStringValue = constraint.likeStringValue; - std::string truncatedMin = minStr.substr(0, likeStringValue.size()); - std::string truncatedMax = maxStr.substr(0, likeStringValue.size()); - return likeStringValue.empty() || (likeStringValue >= truncatedMin && likeStringValue <= truncatedMax); - } - case IsNot: - default: - return true; + switch (constraint.op) { + case Is: + case Equal: + return str >= minStr && str <= maxStr; + case GreaterThanOrEqual: + return maxStr >= str; + case GreaterThan: + return maxStr > str; + case LessThan: + return minStr < str; + case LessThanOrEqual: + return minStr <= str; + case NotEqual: + // If min == max == str, we can skip this. + return !(minStr == maxStr && str == minStr); + case Like: { + const std::string &likeStringValue = constraint.likeStringValue; + std::string truncatedMin = minStr.substr(0, likeStringValue.size()); + std::string truncatedMax = maxStr.substr(0, likeStringValue.size()); + return likeStringValue.empty() || + (likeStringValue >= truncatedMin && likeStringValue <= truncatedMax); + } + case IsNot: + default: + return true; } } -int64_t int96toMsSinceEpoch(const parquet::Int96& rv) { +int64_t int96toMsSinceEpoch(const parquet::Int96 &rv) { __int128 ns = rv.value[0] + ((unsigned long)rv.value[1] << 32); __int128 julianDay = rv.value[2]; __int128 nsSinceEpoch = (julianDay - 2440588); @@ -195,12 +189,14 @@ int64_t int96toMsSinceEpoch(const parquet::Int96& rv) { return nsSinceEpoch; } -bool ParquetCursor::currentRowGroupSatisfiesIntegerFilter(Constraint& constraint, std::shared_ptr _stats) { - if(!_stats->HasMinMax()) { +bool ParquetCursor::currentRowGroupSatisfiesIntegerFilter( + Constraint &constraint, + std::shared_ptr _stats) { + if (!_stats->HasMinMax()) { return true; } - if(constraint.type != Integer) { + if (constraint.type != Integer) { return true; } @@ -210,28 +206,32 @@ bool 
ParquetCursor::currentRowGroupSatisfiesIntegerFilter(Constraint& constraint int64_t max = std::numeric_limits::max(); parquet::Type::type pqType = types[column]; - if(pqType == parquet::Type::INT32) { - parquet::TypedRowGroupStatistics>* stats = - (parquet::TypedRowGroupStatistics>*)_stats.get(); + if (pqType == parquet::Type::INT32) { + parquet::TypedRowGroupStatistics> + *stats = (parquet::TypedRowGroupStatistics< + parquet::DataType> *)_stats.get(); min = stats->min(); max = stats->max(); - } else if(pqType == parquet::Type::INT64) { - parquet::TypedRowGroupStatistics>* stats = - (parquet::TypedRowGroupStatistics>*)_stats.get(); + } else if (pqType == parquet::Type::INT64) { + parquet::TypedRowGroupStatistics> + *stats = (parquet::TypedRowGroupStatistics< + parquet::DataType> *)_stats.get(); min = stats->min(); max = stats->max(); - } else if(pqType == parquet::Type::INT96) { - parquet::TypedRowGroupStatistics>* stats = - (parquet::TypedRowGroupStatistics>*)_stats.get(); + } else if (pqType == parquet::Type::INT96) { + parquet::TypedRowGroupStatistics> + *stats = (parquet::TypedRowGroupStatistics< + parquet::DataType> *)_stats.get(); min = int96toMsSinceEpoch(stats->min()); max = int96toMsSinceEpoch(stats->max()); - } else if(pqType == parquet::Type::BOOLEAN) { - parquet::TypedRowGroupStatistics>* stats = - (parquet::TypedRowGroupStatistics>*)_stats.get(); + } else if (pqType == parquet::Type::BOOLEAN) { + parquet::TypedRowGroupStatistics> + *stats = (parquet::TypedRowGroupStatistics< + parquet::DataType> *)_stats.get(); min = stats->min(); max = stats->max(); @@ -240,44 +240,48 @@ bool ParquetCursor::currentRowGroupSatisfiesIntegerFilter(Constraint& constraint // Should be impossible to get here as we should have forbidden this at // CREATE time -- maybe file changed underneath us? 
std::ostringstream ss; - ss << __FILE__ << ":" << __LINE__ << ": currentRowGroupSatisfiesIntegerFilter called on unsupported type: " << - parquet::TypeToString(pqType); + ss << __FILE__ << ":" << __LINE__ + << ": currentRowGroupSatisfiesIntegerFilter called on unsupported type: " + << parquet::TypeToString(pqType); throw std::invalid_argument(ss.str()); } const int64_t value = constraint.intValue; -// printf("min=%s [%d], max=%s [%d], target=%s\n", minStr.data(), min.len, maxStr.data(), max.len, str.data()); + // printf("min=%s [%d], max=%s [%d], target=%s\n", minStr.data(), min.len, + // maxStr.data(), max.len, str.data()); - switch(constraint.op) { - case Is: - case Equal: - return value >= min && value <= max; - case GreaterThanOrEqual: - return max >= value; - case GreaterThan: - return max > value; - case LessThan: - return min < value; - case LessThanOrEqual: - return min <= value; - case NotEqual: - // If min == max == str, we can skip this. - return !(min == max && value == min); - case Like: - case IsNot: - default: - return true; + switch (constraint.op) { + case Is: + case Equal: + return value >= min && value <= max; + case GreaterThanOrEqual: + return max >= value; + case GreaterThan: + return max > value; + case LessThan: + return min < value; + case LessThanOrEqual: + return min <= value; + case NotEqual: + // If min == max == str, we can skip this. 
+ return !(min == max && value == min); + case Like: + case IsNot: + default: + return true; } return true; } -bool ParquetCursor::currentRowGroupSatisfiesDoubleFilter(Constraint& constraint, std::shared_ptr _stats) { - if(!_stats->HasMinMax()) { +bool ParquetCursor::currentRowGroupSatisfiesDoubleFilter( + Constraint &constraint, + std::shared_ptr _stats) { + if (!_stats->HasMinMax()) { return true; } - if(constraint.type != Double) { + if (constraint.type != Double) { return true; } @@ -287,15 +291,17 @@ bool ParquetCursor::currentRowGroupSatisfiesDoubleFilter(Constraint& constraint, double max = std::numeric_limits::max(); parquet::Type::type pqType = types[column]; - if(pqType == parquet::Type::DOUBLE) { - parquet::TypedRowGroupStatistics>* stats = - (parquet::TypedRowGroupStatistics>*)_stats.get(); + if (pqType == parquet::Type::DOUBLE) { + parquet::TypedRowGroupStatistics> + *stats = (parquet::TypedRowGroupStatistics< + parquet::DataType> *)_stats.get(); min = stats->min(); max = stats->max(); - } else if(pqType == parquet::Type::FLOAT) { - parquet::TypedRowGroupStatistics>* stats = - (parquet::TypedRowGroupStatistics>*)_stats.get(); + } else if (pqType == parquet::Type::FLOAT) { + parquet::TypedRowGroupStatistics> + *stats = (parquet::TypedRowGroupStatistics< + parquet::DataType> *)_stats.get(); min = stats->min(); max = stats->max(); @@ -303,129 +309,115 @@ bool ParquetCursor::currentRowGroupSatisfiesDoubleFilter(Constraint& constraint, // Should be impossible to get here as we should have forbidden this at // CREATE time -- maybe file changed underneath us? 
std::ostringstream ss; - ss << __FILE__ << ":" << __LINE__ << ": currentRowGroupSatisfiesIntegerFilter called on unsupported type: " << - parquet::TypeToString(pqType); + ss << __FILE__ << ":" << __LINE__ + << ": currentRowGroupSatisfiesIntegerFilter called on unsupported type: " + << parquet::TypeToString(pqType); throw std::invalid_argument(ss.str()); } const double value = constraint.doubleValue; -// printf("min=%s [%d], max=%s [%d], target=%s\n", minStr.data(), min.len, maxStr.data(), max.len, str.data()); + // printf("min=%s [%d], max=%s [%d], target=%s\n", minStr.data(), min.len, + // maxStr.data(), max.len, str.data()); - switch(constraint.op) { - case Is: - case Equal: - return value >= min && value <= max; - case GreaterThanOrEqual: - return max >= value; - case GreaterThan: - return max > value; - case LessThan: - return min < value; - case LessThanOrEqual: - return min <= value; - case NotEqual: - // If min == max == str, we can skip this. - return !(min == max && value == min); - case Like: - case IsNot: - default: - return true; - } - - return true; - -} - -bool ParquetCursor::currentRowSatisfiesTextFilter(Constraint& constraint) { - if(constraint.type != Text) { + switch (constraint.op) { + case Is: + case Equal: + return value >= min && value <= max; + case GreaterThanOrEqual: + return max >= value; + case GreaterThan: + return max > value; + case LessThan: + return min < value; + case LessThanOrEqual: + return min <= value; + case NotEqual: + // If min == max == str, we can skip this. 
+ return !(min == max && value == min); + case Like: + case IsNot: + default: return true; } - parquet::ByteArray* ba = getByteArray(constraint.column); + return true; +} - switch(constraint.op) { - case Is: - case Equal: - { - const std::vector& blob = constraint.blobValue; +bool ParquetCursor::currentRowSatisfiesTextFilter(Constraint &constraint) { + if (constraint.type != Text) { + return true; + } - if(blob.size() != ba->len) - return false; + parquet::ByteArray *ba = getByteArray(constraint.column); - return 0 == memcmp(&blob[0], ba->ptr, ba->len); - } - case NotEqual: - { - const std::vector& blob = constraint.blobValue; + switch (constraint.op) { + case Is: + case Equal: { + const std::vector &blob = constraint.blobValue; - if(blob.size() != ba->len) - return true; + if (blob.size() != ba->len) + return false; - return 0 != memcmp(&blob[0], ba->ptr, ba->len); - } - case GreaterThan: - { - const std::vector& blob = constraint.blobValue; + return 0 == memcmp(&blob[0], ba->ptr, ba->len); + } + case NotEqual: { + const std::vector &blob = constraint.blobValue; - return std::lexicographical_compare( - &blob[0], - &blob[0] + blob.size(), - ba->ptr, - ba->ptr + ba->len); - } - case GreaterThanOrEqual: - { - const std::vector& blob = constraint.blobValue; - - bool equal = blob.size() == ba->len && 0 == memcmp(&blob[0], ba->ptr, ba->len); - - return equal || std::lexicographical_compare( - &blob[0], - &blob[0] + blob.size(), - ba->ptr, - ba->ptr + ba->len); - } - case LessThan: - { - const std::vector& blob = constraint.blobValue; - - return std::lexicographical_compare( - ba->ptr, - ba->ptr + ba->len, - &blob[0], - &blob[0] + blob.size()); - } - case LessThanOrEqual: - { - const std::vector& blob = constraint.blobValue; - - bool equal = blob.size() == ba->len && 0 == memcmp(&blob[0], ba->ptr, ba->len); - - return equal || std::lexicographical_compare( - ba->ptr, - ba->ptr + ba->len, - &blob[0], - &blob[0] + blob.size()); - } - case Like: - { - const std::string& 
likeStringValue = constraint.likeStringValue; - if(likeStringValue.size() > ba->len) - return false; - - size_t len = ba->len; - if(likeStringValue.size() < len) - len = likeStringValue.size(); - return 0 == memcmp(&likeStringValue[0], ba->ptr, len); - } - case IsNot: - default: + if (blob.size() != ba->len) return true; + + return 0 != memcmp(&blob[0], ba->ptr, ba->len); + } + case GreaterThan: { + const std::vector &blob = constraint.blobValue; + + return std::lexicographical_compare(&blob[0], &blob[0] + blob.size(), + ba->ptr, ba->ptr + ba->len); + } + case GreaterThanOrEqual: { + const std::vector &blob = constraint.blobValue; + + bool equal = + blob.size() == ba->len && 0 == memcmp(&blob[0], ba->ptr, ba->len); + + return equal || + std::lexicographical_compare(&blob[0], &blob[0] + blob.size(), + ba->ptr, ba->ptr + ba->len); + } + case LessThan: { + const std::vector &blob = constraint.blobValue; + + return std::lexicographical_compare(ba->ptr, ba->ptr + ba->len, &blob[0], + &blob[0] + blob.size()); + } + case LessThanOrEqual: { + const std::vector &blob = constraint.blobValue; + + bool equal = + blob.size() == ba->len && 0 == memcmp(&blob[0], ba->ptr, ba->len); + + return equal || + std::lexicographical_compare(ba->ptr, ba->ptr + ba->len, &blob[0], + &blob[0] + blob.size()); + } + case Like: { + const std::string &likeStringValue = constraint.likeStringValue; + if (likeStringValue.size() > ba->len) + return false; + + size_t len = ba->len; + if (likeStringValue.size() < len) + len = likeStringValue.size(); + return 0 == memcmp(&likeStringValue[0], ba->ptr, len); + } + case IsNot: + default: + return true; } } -bool ParquetCursor::currentRowSatisfiesIntegerFilter(Constraint& constraint) { - if(constraint.type != Integer) { +bool ParquetCursor::currentRowSatisfiesIntegerFilter(Constraint &constraint) { + if (constraint.type != Integer) { return true; } @@ -434,51 +426,53 @@ bool ParquetCursor::currentRowSatisfiesIntegerFilter(Constraint& constraint) { // 
CONSIDER: should we just store int64s everywhere? int64_t value = 0; - if(column == -1) { + if (column == -1) { value = rowId; } else { parquet::Type::type pqType = types[column]; - if(pqType == parquet::Type::INT32 || pqType == parquet::Type::BOOLEAN) { + if (pqType == parquet::Type::INT32 || pqType == parquet::Type::BOOLEAN) { value = getInt32(column); - } else if(pqType == parquet::Type::INT64 || pqType == parquet::Type::INT96) { + } else if (pqType == parquet::Type::INT64 || + pqType == parquet::Type::INT96) { value = getInt64(column); } else { // Should be impossible to get here std::ostringstream ss; - ss << __FILE__ << ":" << __LINE__ << ": currentRowSatisfiesIntegerFilter called on unsupported type: " << - parquet::TypeToString(pqType); + ss << __FILE__ << ":" << __LINE__ + << ": currentRowSatisfiesIntegerFilter called on unsupported type: " + << parquet::TypeToString(pqType); throw std::invalid_argument(ss.str()); } } int64_t constraintValue = constraint.intValue; - switch(constraint.op) { - case Is: - case Equal: - return constraintValue == value; - case NotEqual: - return constraintValue != value; - case GreaterThan: - return value > constraintValue; - case GreaterThanOrEqual: - return value >= constraintValue; - case LessThan: - return value < constraintValue; - case LessThanOrEqual: - return value <= constraintValue; - case Like: - case IsNot: - default: - return true; + switch (constraint.op) { + case Is: + case Equal: + return constraintValue == value; + case NotEqual: + return constraintValue != value; + case GreaterThan: + return value > constraintValue; + case GreaterThanOrEqual: + return value >= constraintValue; + case LessThan: + return value < constraintValue; + case LessThanOrEqual: + return value <= constraintValue; + case Like: + case IsNot: + default: + return true; } return true; } -bool ParquetCursor::currentRowSatisfiesDoubleFilter(Constraint& constraint) { - if(constraint.type != Double) { +bool 
ParquetCursor::currentRowSatisfiesDoubleFilter(Constraint &constraint) { + if (constraint.type != Double) { return true; } @@ -486,30 +480,29 @@ bool ParquetCursor::currentRowSatisfiesDoubleFilter(Constraint& constraint) { double value = getDouble(column); double constraintValue = constraint.doubleValue; - switch(constraint.op) { - case Is: - case Equal: - return constraintValue == value; - case NotEqual: - return constraintValue != value; - case GreaterThan: - return value > constraintValue; - case GreaterThanOrEqual: - return value >= constraintValue; - case LessThan: - return value < constraintValue; - case LessThanOrEqual: - return value <= constraintValue; - case Like: - case IsNot: - default: - return true; + switch (constraint.op) { + case Is: + case Equal: + return constraintValue == value; + case NotEqual: + return constraintValue != value; + case GreaterThan: + return value > constraintValue; + case GreaterThanOrEqual: + return value >= constraintValue; + case LessThan: + return value < constraintValue; + case LessThanOrEqual: + return value <= constraintValue; + case Like: + case IsNot: + default: + return true; } return true; } - // Return true if it is _possible_ that the current // rowgroup satisfies the constraints. Only return false // if it definitely does not. @@ -517,16 +510,17 @@ bool ParquetCursor::currentRowSatisfiesDoubleFilter(Constraint& constraint) { // This avoids opening rowgroups that can't return useful // data, which provides substantial performance benefits. 
bool ParquetCursor::currentRowGroupSatisfiesFilter() { - for(unsigned int i = 0; i < constraints.size(); i++) { + for (unsigned int i = 0; i < constraints.size(); i++) { int column = constraints[i].column; int op = constraints[i].op; bool rv = true; - if(column == -1) { + if (column == -1) { rv = currentRowGroupSatisfiesRowIdFilter(constraints[i]); } else { - std::unique_ptr md = rowGroupMetadata->ColumnChunk(column); - if(md->is_stats_set()) { + std::unique_ptr md = + rowGroupMetadata->ColumnChunk(column); + if (md->is_stats_set()) { std::shared_ptr stats = md->statistics(); // SQLite is much looser with types than you might expect if you @@ -538,23 +532,25 @@ bool ParquetCursor::currentRowGroupSatisfiesFilter() { // the constraint type may be a string, so dispatch to the filter // fn based on the Parquet type. - if(op == IsNull) { + if (op == IsNull) { rv = stats->null_count() > 0; - } else if(op == IsNotNull) { + } else if (op == IsNotNull) { rv = stats->num_values() > 0; } else { parquet::Type::type pqType = types[column]; - if(pqType == parquet::Type::BYTE_ARRAY && logicalTypes[column] == parquet::LogicalType::UTF8) { + if (pqType == parquet::Type::BYTE_ARRAY && + logicalTypes[column] == parquet::LogicalType::UTF8) { rv = currentRowGroupSatisfiesTextFilter(constraints[i], stats); - } else if(pqType == parquet::Type::BYTE_ARRAY) { + } else if (pqType == parquet::Type::BYTE_ARRAY) { rv = currentRowGroupSatisfiesBlobFilter(constraints[i], stats); - } else if(pqType == parquet::Type::INT32 || - pqType == parquet::Type::INT64 || - pqType == parquet::Type::INT96 || - pqType == parquet::Type::BOOLEAN) { + } else if (pqType == parquet::Type::INT32 || + pqType == parquet::Type::INT64 || + pqType == parquet::Type::INT96 || + pqType == parquet::Type::BOOLEAN) { rv = currentRowGroupSatisfiesIntegerFilter(constraints[i], stats); - } else if(pqType == parquet::Type::FLOAT || pqType == parquet::Type::DOUBLE) { + } else if (pqType == parquet::Type::FLOAT || + pqType == 
parquet::Type::DOUBLE) { rv = currentRowGroupSatisfiesDoubleFilter(constraints[i], stats); } } @@ -563,29 +559,29 @@ bool ParquetCursor::currentRowGroupSatisfiesFilter() { // and it with the existing actual, which may have come from a previous run rv = rv && constraints[i].bitmap.getActualMembership(rowGroupId); - if(!rv) { + if (!rv) { constraints[i].bitmap.setEstimatedMembership(rowGroupId, rv); constraints[i].bitmap.setActualMembership(rowGroupId, rv); return rv; } } -// printf("rowGroup %d %s\n", rowGroupId, overallRv ? "may satisfy" : "does not satisfy"); + // printf("rowGroup %d %s\n", rowGroupId, overallRv ? "may satisfy" : "does + // not satisfy"); return true; } - bool ParquetCursor::nextRowGroup() { start: - // Ensure that rowId points at the start of this rowGroup (eg, in the case where - // we skipped an entire row group). + // Ensure that rowId points at the start of this rowGroup (eg, in the case + // where we skipped an entire row group). rowId = rowGroupStartRowId + rowGroupSize; - if((rowGroupId + 1) >= numRowGroups) { + if ((rowGroupId + 1) >= numRowGroups) { return false; } - while(table->getNumColumns() >= scanners.size()) { + while (table->getNumColumns() >= scanners.size()) { scanners.push_back(std::shared_ptr()); // If it doesn't exist, it's the rowId as of the last nextRowGroup call colRows.push_back(rowGroupStartRowId); @@ -595,29 +591,30 @@ start: colByteArrayValues.push_back(parquet::ByteArray()); } - rowGroupStartRowId = rowId; rowGroupId++; rowGroupMetadata = reader->metadata()->RowGroup(rowGroupId); rowGroupSize = rowsLeftInRowGroup = rowGroupMetadata->num_rows(); rowGroup = reader->RowGroup(rowGroupId); - for(unsigned int i = 0; i < scanners.size(); i++) + for (unsigned int i = 0; i < scanners.size(); i++) scanners[i] = NULL; - while(types.size() < (unsigned int)rowGroupMetadata->num_columns()) { + while (types.size() < (unsigned int)rowGroupMetadata->num_columns()) { 
types.push_back(rowGroupMetadata->schema()->Column(0)->physical_type()); } - while(logicalTypes.size() < (unsigned int)rowGroupMetadata->num_columns()) { - logicalTypes.push_back(rowGroupMetadata->schema()->Column(0)->logical_type()); + while (logicalTypes.size() < (unsigned int)rowGroupMetadata->num_columns()) { + logicalTypes.push_back( + rowGroupMetadata->schema()->Column(0)->logical_type()); } - for(unsigned int i = 0; i < (unsigned int)rowGroupMetadata->num_columns(); i++) { + for (unsigned int i = 0; i < (unsigned int)rowGroupMetadata->num_columns(); + i++) { types[i] = rowGroupMetadata->schema()->Column(i)->physical_type(); logicalTypes[i] = rowGroupMetadata->schema()->Column(i)->logical_type(); } - for(unsigned int i = 0; i < colRows.size(); i++) { + for (unsigned int i = 0; i < colRows.size(); i++) { colRows[i] = rowId; } @@ -627,17 +624,18 @@ start: // We're going to scan this row group; reset the expectation of discovering // a row - for(unsigned int i = 0; i < constraints.size(); i++) { - if(rowGroupId > 0 && constraints[i].rowGroupId == rowGroupId - 1) { - constraints[i].bitmap.setActualMembership(rowGroupId - 1, constraints[i].hadRows); + for (unsigned int i = 0; i < constraints.size(); i++) { + if (rowGroupId > 0 && constraints[i].rowGroupId == rowGroupId - 1) { + constraints[i].bitmap.setActualMembership(rowGroupId - 1, + constraints[i].hadRows); } constraints[i].hadRows = false; } - if(!currentRowGroupSatisfiesFilter()) + if (!currentRowGroupSatisfiesFilter()) goto start; - for(unsigned int i = 0; i < constraints.size(); i++) { + for (unsigned int i = 0; i < constraints.size(); i++) { constraints[i].rowGroupId = rowGroupId; } return true; @@ -652,28 +650,28 @@ start: // of millions of rows. 
bool ParquetCursor::currentRowSatisfiesFilter() { bool overallRv = true; - for(unsigned int i = 0; i < constraints.size(); i++) { + for (unsigned int i = 0; i < constraints.size(); i++) { bool rv = true; int column = constraints[i].column; ensureColumn(column); int op = constraints[i].op; - if(op == IsNull) { + if (op == IsNull) { rv = isNull(column); - } else if(op == IsNotNull) { + } else if (op == IsNotNull) { rv = !isNull(column); } else { - if(logicalTypes[column] == parquet::LogicalType::UTF8) { + if (logicalTypes[column] == parquet::LogicalType::UTF8) { rv = currentRowSatisfiesTextFilter(constraints[i]); } else { parquet::Type::type pqType = types[column]; - if(pqType == parquet::Type::INT32 || - pqType == parquet::Type::INT64 || - pqType == parquet::Type::INT96 || - pqType == parquet::Type::BOOLEAN) { + if (pqType == parquet::Type::INT32 || pqType == parquet::Type::INT64 || + pqType == parquet::Type::INT96 || + pqType == parquet::Type::BOOLEAN) { rv = currentRowSatisfiesIntegerFilter(constraints[i]); - } else if(pqType == parquet::Type::FLOAT || pqType == parquet::Type::DOUBLE) { + } else if (pqType == parquet::Type::FLOAT || + pqType == parquet::Type::DOUBLE) { rv = currentRowSatisfiesDoubleFilter(constraints[i]); } } @@ -681,7 +679,7 @@ bool ParquetCursor::currentRowSatisfiesFilter() { // it defaults to false; so only set it if true // ideally we'd short-circuit if we'd already set this group as visited - if(rv) { + if (rv) { constraints[i].hadRows = true; } overallRv = overallRv && rv; @@ -692,116 +690,106 @@ bool ParquetCursor::currentRowSatisfiesFilter() { void ParquetCursor::next() { // Returns true if we've crossed a row group boundary start: - if(rowsLeftInRowGroup == 0) { - if(!nextRowGroup()) { + if (rowsLeftInRowGroup == 0) { + if (!nextRowGroup()) { // put rowId over the edge so eof returns true rowId = numRows + 1; return; } else { - // After a successful nextRowGroup, rowId is pointing at the current row. 
Make it - // point before so the rest of the logic works out. + // After a successful nextRowGroup, rowId is pointing at the current row. + // Make it point before so the rest of the logic works out. rowId--; } } rowsLeftInRowGroup--; rowId++; - if(constraints.size() > 0 && !currentRowSatisfiesFilter()) + if (constraints.size() > 0 && !currentRowSatisfiesFilter()) goto start; } -int ParquetCursor::getRowId() { - return rowId; -} +int ParquetCursor::getRowId() { return rowId; } -bool ParquetCursor::eof() { - return rowId > numRows; -} +bool ParquetCursor::eof() { return rowId > numRows; } void ParquetCursor::ensureColumn(int col) { // -1 signals rowid, which is trivially available - if(col == -1) + if (col == -1) return; // need to ensure a scanner exists (and skip the # of rows in the rowgroup) - if(scanners[col].get() == NULL) { + if (scanners[col].get() == NULL) { std::shared_ptr colReader = rowGroup->Column(col); scanners[col] = parquet::Scanner::Make(colReader); } // Actually fetch a value, stash data in colRows, colNulls, colValues - if(colRows[col] != rowId) { + if (colRows[col] != rowId) { // We may need to skip some records, eg, a query like // SELECT a WHERE b = 10 // may have read b, but skipped a until b matches the predicate. 
bool wasNull = false; - while(colRows[col] + 1 < rowId) { - switch(types[col]) { - case parquet::Type::INT32: - { - parquet::Int32Scanner* s = (parquet::Int32Scanner*)scanners[col].get(); - int rv = 0; - s->NextValue(&rv, &wasNull); - break; - } - case parquet::Type::FLOAT: - { - parquet::FloatScanner* s = (parquet::FloatScanner*)scanners[col].get(); - float rv = 0; - s->NextValue(&rv, &wasNull); - break; - } - case parquet::Type::DOUBLE: - { - parquet::DoubleScanner* s = (parquet::DoubleScanner*)scanners[col].get(); - double rv = 0; - s->NextValue(&rv, &wasNull); - break; - } - case parquet::Type::BYTE_ARRAY: - { - parquet::ByteArrayScanner* s = (parquet::ByteArrayScanner*)scanners[col].get(); - parquet::ByteArray ba; - s->NextValue(&ba, &wasNull); - break; - } - case parquet::Type::INT96: - { - parquet::Int96Scanner* s = (parquet::Int96Scanner*)scanners[col].get(); - parquet::Int96 rv; - s->NextValue(&rv, &wasNull); - break; - } - case parquet::Type::INT64: - { - parquet::Int64Scanner* s = (parquet::Int64Scanner*)scanners[col].get(); - long rv = 0; - s->NextValue(&rv, &wasNull); - break; - } - case parquet::Type::BOOLEAN: - { - parquet::BoolScanner* s = (parquet::BoolScanner*)scanners[col].get(); - bool rv = false; - s->NextValue(&rv, &wasNull); - break; - } - case parquet::Type::FIXED_LEN_BYTE_ARRAY: - { - parquet::FixedLenByteArrayScanner* s = (parquet::FixedLenByteArrayScanner*)scanners[col].get(); - parquet::FixedLenByteArray flba; - s->NextValue(&flba, &wasNull); - break; - } - default: - // Should be impossible to get here as we should have forbidden this at - // CREATE time -- maybe file changed underneath us? 
- std::ostringstream ss; - ss << __FILE__ << ":" << __LINE__ << ": column " << col << " has unsupported type: " << - parquet::TypeToString(types[col]); - throw std::invalid_argument(ss.str()); + while (colRows[col] + 1 < rowId) { + switch (types[col]) { + case parquet::Type::INT32: { + parquet::Int32Scanner *s = (parquet::Int32Scanner *)scanners[col].get(); + int rv = 0; + s->NextValue(&rv, &wasNull); + break; + } + case parquet::Type::FLOAT: { + parquet::FloatScanner *s = (parquet::FloatScanner *)scanners[col].get(); + float rv = 0; + s->NextValue(&rv, &wasNull); + break; + } + case parquet::Type::DOUBLE: { + parquet::DoubleScanner *s = + (parquet::DoubleScanner *)scanners[col].get(); + double rv = 0; + s->NextValue(&rv, &wasNull); + break; + } + case parquet::Type::BYTE_ARRAY: { + parquet::ByteArrayScanner *s = + (parquet::ByteArrayScanner *)scanners[col].get(); + parquet::ByteArray ba; + s->NextValue(&ba, &wasNull); + break; + } + case parquet::Type::INT96: { + parquet::Int96Scanner *s = (parquet::Int96Scanner *)scanners[col].get(); + parquet::Int96 rv; + s->NextValue(&rv, &wasNull); + break; + } + case parquet::Type::INT64: { + parquet::Int64Scanner *s = (parquet::Int64Scanner *)scanners[col].get(); + long rv = 0; + s->NextValue(&rv, &wasNull); + break; + } + case parquet::Type::BOOLEAN: { + parquet::BoolScanner *s = (parquet::BoolScanner *)scanners[col].get(); + bool rv = false; + s->NextValue(&rv, &wasNull); + break; + } + case parquet::Type::FIXED_LEN_BYTE_ARRAY: { + parquet::FixedLenByteArrayScanner *s = + (parquet::FixedLenByteArrayScanner *)scanners[col].get(); + parquet::FixedLenByteArray flba; + s->NextValue(&flba, &wasNull); + break; + } + default: + // Should be impossible to get here as we should have forbidden this at + // CREATE time -- maybe file changed underneath us? 
+ std::ostringstream ss; + ss << __FILE__ << ":" << __LINE__ << ": column " << col + << " has unsupported type: " << parquet::TypeToString(types[col]); + throw std::invalid_argument(ss.str()); break; - } colRows[col]++; } @@ -810,90 +798,86 @@ void ParquetCursor::ensureColumn(int col) { wasNull = false; bool hadValue = false; - switch(types[col]) { - case parquet::Type::INT32: - { - parquet::Int32Scanner* s = (parquet::Int32Scanner*)scanners[col].get(); - int rv = 0; - hadValue = s->NextValue(&rv, &wasNull); - colIntValues[col] = rv; - break; - } - case parquet::Type::FLOAT: - { - parquet::FloatScanner* s = (parquet::FloatScanner*)scanners[col].get(); - float rv = 0; - hadValue = s->NextValue(&rv, &wasNull); - colDoubleValues[col] = rv; - break; - } - case parquet::Type::DOUBLE: - { - parquet::DoubleScanner* s = (parquet::DoubleScanner*)scanners[col].get(); - double rv = 0; - hadValue = s->NextValue(&rv, &wasNull); - colDoubleValues[col] = rv; - break; - } - case parquet::Type::BYTE_ARRAY: - { - parquet::ByteArrayScanner* s = (parquet::ByteArrayScanner*)scanners[col].get(); - hadValue = s->NextValue(&colByteArrayValues[col], &wasNull); - break; - } - case parquet::Type::INT96: - { - // INT96 tracks a date with nanosecond precision, convert to ms since epoch. 
- // ...see https://github.com/apache/parquet-format/pull/49 for more - // - // First 8 bytes: nanoseconds into the day - // Last 4 bytes: Julian day - // To get nanoseconds since the epoch: - // (julian_day - 2440588) * (86400 * 1000 * 1000 * 1000) + nanoseconds - parquet::Int96Scanner* s = (parquet::Int96Scanner*)scanners[col].get(); - parquet::Int96 rv {0, 0, 0}; - hadValue = s->NextValue(&rv, &wasNull); - colIntValues[col] = int96toMsSinceEpoch(rv); - break; - } - case parquet::Type::INT64: - { - parquet::Int64Scanner* s = (parquet::Int64Scanner*)scanners[col].get(); - long rv = 0; - hadValue = s->NextValue(&rv, &wasNull); - colIntValues[col] = rv; - break; - } - - case parquet::Type::BOOLEAN: - { - parquet::BoolScanner* s = (parquet::BoolScanner*)scanners[col].get(); - bool rv = false; - hadValue = s->NextValue(&rv, &wasNull); - colIntValues[col] = rv ? 1 : 0; - break; - } - case parquet::Type::FIXED_LEN_BYTE_ARRAY: - { - parquet::FixedLenByteArrayScanner* s = (parquet::FixedLenByteArrayScanner*)scanners[col].get(); - parquet::FixedLenByteArray flba; - hadValue = s->NextValue(&flba, &wasNull); - colByteArrayValues[col].ptr = flba.ptr; - // TODO: cache this - colByteArrayValues[col].len = rowGroupMetadata->schema()->Column(col)->type_length(); - break; - } - default: - // Should be impossible to get here as we should have forbidden this at - // CREATE time -- maybe file changed underneath us? 
- std::ostringstream ss; - ss << __FILE__ << ":" << __LINE__ << ": column " << col << " has unsupported type: " << - parquet::TypeToString(types[col]); - throw std::invalid_argument(ss.str()); + switch (types[col]) { + case parquet::Type::INT32: { + parquet::Int32Scanner *s = (parquet::Int32Scanner *)scanners[col].get(); + int rv = 0; + hadValue = s->NextValue(&rv, &wasNull); + colIntValues[col] = rv; + break; + } + case parquet::Type::FLOAT: { + parquet::FloatScanner *s = (parquet::FloatScanner *)scanners[col].get(); + float rv = 0; + hadValue = s->NextValue(&rv, &wasNull); + colDoubleValues[col] = rv; + break; + } + case parquet::Type::DOUBLE: { + parquet::DoubleScanner *s = (parquet::DoubleScanner *)scanners[col].get(); + double rv = 0; + hadValue = s->NextValue(&rv, &wasNull); + colDoubleValues[col] = rv; + break; + } + case parquet::Type::BYTE_ARRAY: { + parquet::ByteArrayScanner *s = + (parquet::ByteArrayScanner *)scanners[col].get(); + hadValue = s->NextValue(&colByteArrayValues[col], &wasNull); + break; + } + case parquet::Type::INT96: { + // INT96 tracks a date with nanosecond precision, convert to ms since + // epoch. 
+ // ...see https://github.com/apache/parquet-format/pull/49 for more + // + // First 8 bytes: nanoseconds into the day + // Last 4 bytes: Julian day + // To get nanoseconds since the epoch: + // (julian_day - 2440588) * (86400 * 1000 * 1000 * 1000) + nanoseconds + parquet::Int96Scanner *s = (parquet::Int96Scanner *)scanners[col].get(); + parquet::Int96 rv{0, 0, 0}; + hadValue = s->NextValue(&rv, &wasNull); + colIntValues[col] = int96toMsSinceEpoch(rv); + break; + } + case parquet::Type::INT64: { + parquet::Int64Scanner *s = (parquet::Int64Scanner *)scanners[col].get(); + long rv = 0; + hadValue = s->NextValue(&rv, &wasNull); + colIntValues[col] = rv; break; } - if(!hadValue) + case parquet::Type::BOOLEAN: { + parquet::BoolScanner *s = (parquet::BoolScanner *)scanners[col].get(); + bool rv = false; + hadValue = s->NextValue(&rv, &wasNull); + colIntValues[col] = rv ? 1 : 0; + break; + } + case parquet::Type::FIXED_LEN_BYTE_ARRAY: { + parquet::FixedLenByteArrayScanner *s = + (parquet::FixedLenByteArrayScanner *)scanners[col].get(); + parquet::FixedLenByteArray flba; + hadValue = s->NextValue(&flba, &wasNull); + colByteArrayValues[col].ptr = flba.ptr; + // TODO: cache this + colByteArrayValues[col].len = + rowGroupMetadata->schema()->Column(col)->type_length(); + break; + } + default: + // Should be impossible to get here as we should have forbidden this at + // CREATE time -- maybe file changed underneath us? 
+ std::ostringstream ss; + ss << __FILE__ << ":" << __LINE__ << ": column " << col + << " has unsupported type: " << parquet::TypeToString(types[col]); + throw std::invalid_argument(ss.str()); + break; + } + + if (!hadValue) throw std::invalid_argument("unexpectedly lacking a next value"); colNulls[col] = wasNull; @@ -902,25 +886,19 @@ void ParquetCursor::ensureColumn(int col) { bool ParquetCursor::isNull(int col) { // -1 is rowid, which is trivially non null - if(col == -1) + if (col == -1) return false; return colNulls[col]; } -int ParquetCursor::getInt32(int col) { - return colIntValues[col]; -} +int ParquetCursor::getInt32(int col) { return colIntValues[col]; } -long ParquetCursor::getInt64(int col) { - return colIntValues[col]; -} +long ParquetCursor::getInt64(int col) { return colIntValues[col]; } -double ParquetCursor::getDouble(int col) { - return colDoubleValues[col]; -} +double ParquetCursor::getDouble(int col) { return colDoubleValues[col]; } -parquet::ByteArray* ParquetCursor::getByteArray(int col) { +parquet::ByteArray *ParquetCursor::getByteArray(int col) { return &colByteArrayValues[col]; } @@ -933,7 +911,7 @@ parquet::LogicalType::type ParquetCursor::getLogicalType(int col) { } void ParquetCursor::close() { - if(reader != NULL) { + if (reader != NULL) { reader->Close(); } } @@ -942,12 +920,10 @@ void ParquetCursor::reset(std::vector constraints) { close(); this->constraints = constraints; rowId = 0; - // TODO: consider having a long lived handle in ParquetTable that can be borrowed - // without incurring the cost of opening the file from scratch twice + // TODO: consider having a long lived handle in ParquetTable that can be + // borrowed without incurring the cost of opening the file from scratch twice reader = parquet::ParquetFileReader::OpenFile( - table->getFile().data(), - true, - parquet::default_reader_properties(), + table->getFile().data(), true, parquet::default_reader_properties(), table->getMetadata()); rowGroupId = -1; @@ -961,10 
+937,12 @@ void ParquetCursor::reset(std::vector constraints) { numRowGroups = reader->metadata()->num_row_groups(); } -ParquetTable* ParquetCursor::getTable() const { return table; } +ParquetTable *ParquetCursor::getTable() const { return table; } unsigned int ParquetCursor::getNumRowGroups() const { return numRowGroups; } -unsigned int ParquetCursor::getNumConstraints() const { return constraints.size(); } -const Constraint& ParquetCursor::getConstraint(unsigned int i) const { return constraints[i]; } - - +unsigned int ParquetCursor::getNumConstraints() const { + return constraints.size(); +} +const Constraint &ParquetCursor::getConstraint(unsigned int i) const { + return constraints[i]; +} diff --git a/src/parquet_cursor.h b/src/parquet_cursor.h index f7d8c2a..8c6edb4 100644 --- a/src/parquet_cursor.h +++ b/src/parquet_cursor.h @@ -1,13 +1,13 @@ #ifndef PARQUET_CURSOR_H #define PARQUET_CURSOR_H +#include "parquet/api/reader.h" #include "parquet_filter.h" #include "parquet_table.h" -#include "parquet/api/reader.h" class ParquetCursor { - ParquetTable* table; + ParquetTable *table; std::unique_ptr reader; std::unique_ptr rowGroupMetadata; std::shared_ptr rowGroup; @@ -35,19 +35,26 @@ class ParquetCursor { bool currentRowSatisfiesFilter(); bool currentRowGroupSatisfiesFilter(); - bool currentRowGroupSatisfiesRowIdFilter(Constraint& constraint); - bool currentRowGroupSatisfiesTextFilter(Constraint& constraint, std::shared_ptr stats); - bool currentRowGroupSatisfiesBlobFilter(Constraint& constraint, std::shared_ptr stats); - bool currentRowGroupSatisfiesIntegerFilter(Constraint& constraint, std::shared_ptr stats); - bool currentRowGroupSatisfiesDoubleFilter(Constraint& constraint, std::shared_ptr stats); - - bool currentRowSatisfiesTextFilter(Constraint& constraint); - bool currentRowSatisfiesIntegerFilter(Constraint& constraint); - bool currentRowSatisfiesDoubleFilter(Constraint& constraint); + bool currentRowGroupSatisfiesRowIdFilter(Constraint &constraint); + bool 
currentRowGroupSatisfiesTextFilter( + Constraint &constraint, + std::shared_ptr stats); + bool currentRowGroupSatisfiesBlobFilter( + Constraint &constraint, + std::shared_ptr stats); + bool currentRowGroupSatisfiesIntegerFilter( + Constraint &constraint, + std::shared_ptr stats); + bool currentRowGroupSatisfiesDoubleFilter( + Constraint &constraint, + std::shared_ptr stats); + bool currentRowSatisfiesTextFilter(Constraint &constraint); + bool currentRowSatisfiesIntegerFilter(Constraint &constraint); + bool currentRowSatisfiesDoubleFilter(Constraint &constraint); public: - ParquetCursor(ParquetTable* table); + ParquetCursor(ParquetTable *table); int getRowId(); void next(); void close(); @@ -58,16 +65,15 @@ public: bool isNull(int col); unsigned int getNumRowGroups() const; unsigned int getNumConstraints() const; - const Constraint& getConstraint(unsigned int i) const; + const Constraint &getConstraint(unsigned int i) const; parquet::Type::type getPhysicalType(int col); parquet::LogicalType::type getLogicalType(int col); - ParquetTable* getTable() const; + ParquetTable *getTable() const; int getInt32(int col); long getInt64(int col); double getDouble(int col); - parquet::ByteArray* getByteArray(int col); + parquet::ByteArray *getByteArray(int col); }; #endif - diff --git a/src/parquet_filter.cc b/src/parquet_filter.cc index 4220dc2..722aa14 100644 --- a/src/parquet_filter.cc +++ b/src/parquet_filter.cc @@ -1,40 +1,29 @@ #include "parquet_filter.h" -Constraint::Constraint( - RowGroupBitmap bitmap, - int column, - std::string columnName, - ConstraintOperator op, - ValueType type, - int64_t intValue, - double doubleValue, - std::vector blobValue -): bitmap(bitmap), - column(column), - columnName(columnName), - op(op), - type(type), - intValue(intValue), - doubleValue(doubleValue), - blobValue(blobValue), - hadRows(false) { - RowGroupBitmap bm = bitmap; - this->bitmap = bm; +Constraint::Constraint(RowGroupBitmap bitmap, int column, + std::string columnName, 
ConstraintOperator op, + ValueType type, int64_t intValue, double doubleValue, + std::vector blobValue) + : bitmap(bitmap), column(column), columnName(columnName), op(op), + type(type), intValue(intValue), doubleValue(doubleValue), + blobValue(blobValue), hadRows(false) { + RowGroupBitmap bm = bitmap; + this->bitmap = bm; - if(type == Text) { - stringValue = std::string((char*)&blobValue[0], blobValue.size()); + if (type == Text) { + stringValue = std::string((char *)&blobValue[0], blobValue.size()); - if(op == Like) { + if (op == Like) { // This permits more rowgroups than is strictly needed // since it assumes an implicit wildcard. But it's // simple to implement, so we'll go with it. likeStringValue = stringValue; size_t idx = likeStringValue.find_first_of("%"); - if(idx != std::string::npos) { + if (idx != std::string::npos) { likeStringValue = likeStringValue.substr(0, idx); } idx = likeStringValue.find_first_of("_"); - if(idx != std::string::npos) { + if (idx != std::string::npos) { likeStringValue = likeStringValue.substr(0, idx); } } @@ -45,61 +34,61 @@ std::string Constraint::describe() const { std::string rv; rv.append(columnName); rv.append(" "); - switch(op) { - case Equal: - rv.append("="); - break; - case GreaterThan: - rv.append(">"); - break; - case LessThanOrEqual: - rv.append("<="); - break; - case LessThan: - rv.append("<"); - break; - case GreaterThanOrEqual: - rv.append(">="); - break; - case Like: - rv.append("LIKE"); - break; - case Glob: - rv.append("GLOB"); - break; - case NotEqual: - rv.append("<>"); - break; - case IsNot: - rv.append("IS NOT"); - break; - case IsNotNull: - rv.append("IS NOT NULL"); - break; - case IsNull: - rv.append("IS NULL"); - break; - case Is: - rv.append("IS"); - break; + switch (op) { + case Equal: + rv.append("="); + break; + case GreaterThan: + rv.append(">"); + break; + case LessThanOrEqual: + rv.append("<="); + break; + case LessThan: + rv.append("<"); + break; + case GreaterThanOrEqual: + rv.append(">="); + 
break; + case Like: + rv.append("LIKE"); + break; + case Glob: + rv.append("GLOB"); + break; + case NotEqual: + rv.append("<>"); + break; + case IsNot: + rv.append("IS NOT"); + break; + case IsNotNull: + rv.append("IS NOT NULL"); + break; + case IsNull: + rv.append("IS NULL"); + break; + case Is: + rv.append("IS"); + break; } rv.append(" "); - switch(type) { - case Null: - rv.append("NULL"); - break; - case Integer: - rv.append(std::to_string(intValue)); - break; - case Double: - rv.append(std::to_string(doubleValue)); - break; - case Blob: - break; - case Text: - rv.append(stringValue); - break; + switch (type) { + case Null: + rv.append("NULL"); + break; + case Integer: + rv.append(std::to_string(intValue)); + break; + case Double: + rv.append(std::to_string(doubleValue)); + break; + case Blob: + break; + case Text: + rv.append(stringValue); + break; } return rv; } diff --git a/src/parquet_filter.h b/src/parquet_filter.h index fd95dbb..e22fa07 100644 --- a/src/parquet_filter.h +++ b/src/parquet_filter.h @@ -1,9 +1,9 @@ #ifndef PARQUET_FILTER_H #define PARQUET_FILTER_H -#include -#include #include +#include +#include enum ConstraintOperator { Equal, @@ -20,43 +20,36 @@ enum ConstraintOperator { Is }; -enum ValueType { - Null, - Integer, - Double, - Blob, - Text -}; +enum ValueType { Null, Integer, Double, Blob, Text }; class RowGroupBitmap { - void setBit(std::vector& membership, unsigned int rowGroup, bool isSet) { + void setBit(std::vector &membership, unsigned int rowGroup, + bool isSet) { int byte = rowGroup / 8; int offset = rowGroup % 8; unsigned char c = membership[byte]; c &= ~(1UL << offset); - if(isSet) { + if (isSet) { c |= 1UL << offset; } membership[byte] = c; } -// Compares estimated rowGroupFilter results against observed results -// when we explored the row group. This lets us cache + // Compares estimated rowGroupFilter results against observed results + // when we explored the row group. 
This lets us cache public: RowGroupBitmap(unsigned int totalRowGroups) { // Initialize everything to assume that all row groups match. // As we discover otherwise, we'll update that assumption. - for(unsigned int i = 0; i < (totalRowGroups + 7) / 8; i++) { + for (unsigned int i = 0; i < (totalRowGroups + 7) / 8; i++) { estimatedMembership.push_back(0xFF); actualMembership.push_back(0xFF); } } - RowGroupBitmap( - std::vector estimatedMembership, - std::vector actualMembership) : - estimatedMembership(estimatedMembership), - actualMembership(actualMembership) { - } + RowGroupBitmap(std::vector estimatedMembership, + std::vector actualMembership) + : estimatedMembership(estimatedMembership), + actualMembership(actualMembership) {} std::vector estimatedMembership; std::vector actualMembership; @@ -80,17 +73,11 @@ public: class Constraint { public: - // Kind of a messy constructor function, but it's just for internal use, so whatever. - Constraint( - RowGroupBitmap bitmap, - int column, - std::string columnName, - ConstraintOperator op, - ValueType type, - int64_t intValue, - double doubleValue, - std::vector blobValue - ); + // Kind of a messy constructor function, but it's just for internal use, so + // whatever. 
+ Constraint(RowGroupBitmap bitmap, int column, std::string columnName, + ConstraintOperator op, ValueType type, int64_t intValue, + double doubleValue, std::vector blobValue); RowGroupBitmap bitmap; int column; // underlying column in the query diff --git a/src/parquet_table.cc b/src/parquet_table.cc index d796b8a..5146cfd 100644 --- a/src/parquet_table.cc +++ b/src/parquet_table.cc @@ -2,61 +2,61 @@ #include "parquet/api/reader.h" -ParquetTable::ParquetTable(std::string file, std::string tableName): file(file), tableName(tableName) { - std::unique_ptr reader = parquet::ParquetFileReader::OpenFile(file.data()); +ParquetTable::ParquetTable(std::string file, std::string tableName) + : file(file), tableName(tableName) { + std::unique_ptr reader = + parquet::ParquetFileReader::OpenFile(file.data()); metadata = reader->metadata(); } std::string ParquetTable::columnName(int i) { - if(i == -1) + if (i == -1) return "rowid"; return columnNames[i]; } -unsigned int ParquetTable::getNumColumns() { - return columnNames.size(); -} - +unsigned int ParquetTable::getNumColumns() { return columnNames.size(); } std::string ParquetTable::CreateStatement() { - std::unique_ptr reader = parquet::ParquetFileReader::OpenFile( - file.data(), - true, - parquet::default_reader_properties(), - metadata); + std::unique_ptr reader = + parquet::ParquetFileReader::OpenFile( + file.data(), true, parquet::default_reader_properties(), metadata); std::string text("CREATE TABLE x("); auto schema = reader->metadata()->schema(); - for(auto i = 0; i < schema->num_columns(); i++) { + for (auto i = 0; i < schema->num_columns(); i++) { auto _col = schema->GetColumnRoot(i); columnNames.push_back(_col->name()); } - for(auto i = 0; i < schema->num_columns(); i++) { + for (auto i = 0; i < schema->num_columns(); i++) { auto _col = schema->GetColumnRoot(i); - if(!_col->is_primitive()) { + if (!_col->is_primitive()) { std::ostringstream ss; - ss << __FILE__ << ":" << __LINE__ << ": column " << i << " has 
non-primitive type"; + ss << __FILE__ << ":" << __LINE__ << ": column " << i + << " has non-primitive type"; throw std::invalid_argument(ss.str()); } - if(_col->is_repeated()) { + if (_col->is_repeated()) { std::ostringstream ss; - ss << __FILE__ << ":" << __LINE__ << ": column " << i << " has non-scalar type"; + ss << __FILE__ << ":" << __LINE__ << ": column " << i + << " has non-scalar type"; throw std::invalid_argument(ss.str()); } - parquet::schema::PrimitiveNode* col = (parquet::schema::PrimitiveNode*)_col; + parquet::schema::PrimitiveNode *col = + (parquet::schema::PrimitiveNode *)_col; - if(i > 0) + if (i > 0) text += ", "; text += "\""; // Horrifically inefficient, but easy to understand. std::string colName = col->name(); - for(char& c : colName) { - if(c == '"') + for (char &c : colName) { + if (c == '"') text += "\"\""; else text += c; @@ -71,7 +71,7 @@ std::string ParquetTable::CreateStatement() { // whose unsigned ints start getting interpreted as signed. (We could // support this for UINT_8/16/32 -- and for UINT_64 we could throw if // the high bit was set.) 
- if(logical == parquet::LogicalType::NONE || + if (logical == parquet::LogicalType::NONE || logical == parquet::LogicalType::UTF8 || logical == parquet::LogicalType::DATE || logical == parquet::LogicalType::TIME_MILLIS || @@ -82,74 +82,74 @@ std::string ParquetTable::CreateStatement() { logical == parquet::LogicalType::INT_16 || logical == parquet::LogicalType::INT_32 || logical == parquet::LogicalType::INT_64) { - switch(physical) { - case parquet::Type::BOOLEAN: + switch (physical) { + case parquet::Type::BOOLEAN: + type = "TINYINT"; + break; + case parquet::Type::INT32: + if (logical == parquet::LogicalType::NONE || + logical == parquet::LogicalType::INT_32) { + type = "INT"; + } else if (logical == parquet::LogicalType::INT_8) { type = "TINYINT"; - break; - case parquet::Type::INT32: - if(logical == parquet::LogicalType::NONE || - logical == parquet::LogicalType::INT_32) { - type = "INT"; - } else if(logical == parquet::LogicalType::INT_8) { - type = "TINYINT"; - } else if(logical == parquet::LogicalType::INT_16) { - type = "SMALLINT"; - } - break; - case parquet::Type::INT96: - // INT96 is used for nanosecond precision on timestamps; we truncate - // to millisecond precision. - case parquet::Type::INT64: - type = "BIGINT"; - break; - case parquet::Type::FLOAT: - type = "REAL"; - break; - case parquet::Type::DOUBLE: - type = "DOUBLE"; - break; - case parquet::Type::BYTE_ARRAY: - if(logical == parquet::LogicalType::UTF8) { - type = "TEXT"; - } else { - type = "BLOB"; - } - break; - case parquet::Type::FIXED_LEN_BYTE_ARRAY: + } else if (logical == parquet::LogicalType::INT_16) { + type = "SMALLINT"; + } + break; + case parquet::Type::INT96: + // INT96 is used for nanosecond precision on timestamps; we truncate + // to millisecond precision. 
+ case parquet::Type::INT64: + type = "BIGINT"; + break; + case parquet::Type::FLOAT: + type = "REAL"; + break; + case parquet::Type::DOUBLE: + type = "DOUBLE"; + break; + case parquet::Type::BYTE_ARRAY: + if (logical == parquet::LogicalType::UTF8) { + type = "TEXT"; + } else { type = "BLOB"; - break; - default: - break; + } + break; + case parquet::Type::FIXED_LEN_BYTE_ARRAY: + type = "BLOB"; + break; + default: + break; } } - if(type.empty()) { + if (type.empty()) { std::ostringstream ss; - ss << __FILE__ << ":" << __LINE__ << ": column " << i << " has unsupported type: " << - parquet::TypeToString(physical) << "/" << parquet::LogicalTypeToString(logical); + ss << __FILE__ << ":" << __LINE__ << ": column " << i + << " has unsupported type: " << parquet::TypeToString(physical) << "/" + << parquet::LogicalTypeToString(logical); throw std::invalid_argument(ss.str()); } #ifdef DEBUG - printf("col %d[name=%s, p=%d:%s, l=%d:%s] is %s\n", - i, - col->name().data(), + printf( + "col %d[name=%s, p=%d:%s, l=%d:%s] is %s\n", i, col->name().data(), col->physical_type(), - parquet::TypeToString(col->physical_type()).data(), - col->logical_type(), - parquet::LogicalTypeToString(col->logical_type()).data(), - type.data()); + parquet::TypeToString(col->physical_type()).data(), col->logical_type(), + parquet::LogicalTypeToString(col->logical_type()).data(), type.data()); #endif text += " "; text += type; } - text +=");"; + text += ");"; return text; } -std::shared_ptr ParquetTable::getMetadata() { return metadata; } +std::shared_ptr ParquetTable::getMetadata() { + return metadata; +} -const std::string& ParquetTable::getFile() { return file; } -const std::string& ParquetTable::getTableName() { return tableName; } +const std::string &ParquetTable::getFile() { return file; } +const std::string &ParquetTable::getTableName() { return tableName; } diff --git a/src/parquet_table.h b/src/parquet_table.h index 6b35cae..da976c7 100644 --- a/src/parquet_table.h +++ b/src/parquet_table.h @@ 
-1,9 +1,9 @@ #ifndef PARQUET_TABLE_H #define PARQUET_TABLE_H -#include -#include #include "parquet/api/reader.h" +#include +#include class ParquetTable { std::string file; @@ -11,15 +11,14 @@ class ParquetTable { std::vector columnNames; std::shared_ptr metadata; - public: ParquetTable(std::string file, std::string tableName); std::string CreateStatement(); std::string columnName(int idx); unsigned int getNumColumns(); std::shared_ptr getMetadata(); - const std::string& getFile(); - const std::string& getTableName(); + const std::string &getFile(); + const std::string &getTableName(); }; #endif