From 19a5c4398830bb819e8e5b7a52eef79aba3fdd55 Mon Sep 17 00:00:00 2001 From: taozhang36 Date: Fri, 23 May 2025 11:46:59 +0800 Subject: [PATCH] add support for windows system --- README.md | 66 +- parquet-windows/parquet.cpp | 760 +++++++++++++++++++++ parquet-windows/parquet_cursor.cpp | 1003 ++++++++++++++++++++++++++++ parquet-windows/parquet_cursor.h | 73 ++ parquet-windows/parquet_filter.cpp | 105 +++ parquet-windows/parquet_filter.h | 120 ++++ parquet-windows/parquet_table.cpp | 160 +++++ parquet-windows/parquet_table.h | 25 + 8 files changed, 2307 insertions(+), 5 deletions(-) create mode 100644 parquet-windows/parquet.cpp create mode 100644 parquet-windows/parquet_cursor.cpp create mode 100644 parquet-windows/parquet_cursor.h create mode 100644 parquet-windows/parquet_filter.cpp create mode 100644 parquet-windows/parquet_filter.h create mode 100644 parquet-windows/parquet_table.cpp create mode 100644 parquet-windows/parquet_table.h diff --git a/README.md b/README.md index 47be8d0..bad9200 100644 --- a/README.md +++ b/README.md @@ -7,13 +7,15 @@ A SQLite [virtual table](https://sqlite.org/vtab.html) extension to expose Parqu This [blog post](https://cldellow.com/2018/06/22/sqlite-parquet-vtable.html) provides some context on why you might use this. -## Installing +## For Linux -### Download +### Installing + +#### Download You can fetch a version built for Ubuntu 16.04 at https://s3.amazonaws.com/cldellow/public/libparquet/libparquet.so.xz -### Building +#### Building ``` ./make-linux @@ -23,11 +25,11 @@ The first run will git clone a bunch of libraries, patch them to be statically l Subsequent builds will only build the parquet virtual table extension. -### Building (release) +#### Building (release) Run `./make-linux-pgo` to build an instrumented binary, run tests to collect real-life usage samples, then build an optimized binary. PGO seems to give a 5-10% reduction in query times. 
-### Tests +#### Tests  Run:  @@ -61,6 +63,60 @@ sudo apt-get remove --purge sqlite3 sudo apt-get install sqlite3:amd64 ```  +## For Windows + +The following steps were performed on a Windows 10 x64 system. + +### Build + +#### 1 Apache-arrow build + +Configure the environment and build Apache-arrow as follows: + +https://github.com/apache/arrow/blob/apache-arrow-0.9.0/cpp/apidoc/Windows.md + +Once the build is complete, files such as arrow.lib, arrow.dll, and so on are generated. + +#### 2 Parquet-cpp build + +Configure the environment and build Parquet-cpp as follows: + +https://github.com/apache/parquet-cpp/blob/apache-parquet-cpp-1.4.0/docs/Windows.md + +The version of boost-cpp can be specified as 1.66.0 to avoid version compatibility issues. Once the build is complete, files such as parquet.lib, parquet.dll, and so on are generated. + +#### 3 Sqlite3 build + +1. Download and extract the following three packages into the same folder. +sqlite-amalgamation-3490100.zip +sqlite-dll-win-x64-3490100.zip +sqlite-autoconf-3490100.tar.gz +2. Open the developer command prompt for VS 2017, switch to the above folder, and run the following command: + +`lib /DEF:sqlite3.def /OUT:sqlite3.lib ` + +After the command is executed, sqlite3.lib is generated. + +#### 4 sqlite-parquet-vtable (windows) build + +1. Open the parquet directory of sqlite-parquet-vtable as a DLL project in VS2017. +2. Configure the paths for DLL, LIB, and header files in VS2017. +3. Change all occurrences of “constexpr” in type.h in the Arrow source code to “const”. +4. Build this project; if successful, it will generate sqlite-parquet-vtable.lib and sqlite-parquet-vtable.dll. + +### Use + +1. Create a new directory {your-directory}. +2. Copy the generated arrow.dll, parquet.dll, and sqlite-parquet-vtable.dll from steps 1-4 to {your-directory}, and also copy all DLLs from C:\local\boost_1_66_0\lib64-msvc-14.1 (your actual Boost installation path) to {your-directory}.
+ +``` +$ sqlite\sqlite3.exe +sqlite> .load sqlite-parquet-vtable.dll +sqlite> CREATE VIRTUAL TABLE demo USING parquet('parquet-generator/99-rows-1.parquet'); +sqlite> SELECT * FROM demo; +...if all goes well, you'll see data here!... +``` + ## Supported features ### Row group filtering diff --git a/parquet-windows/parquet.cpp b/parquet-windows/parquet.cpp new file mode 100644 index 0000000..3c51e85 --- /dev/null +++ b/parquet-windows/parquet.cpp @@ -0,0 +1,760 @@ +/* +* This file contains the implementation of an SQLite virtual table for +* reading Parquet files. +* +* Usage: +* +* .load ./parquet +* CREATE VIRTUAL TABLE demo USING parquet(FILENAME); +* SELECT * FROM demo; +* +*/ +#include +SQLITE_EXTENSION_INIT1 +#include +#include +#include +#include +#include +#include +#include +// #include +#include + +#include "parquet_table.h" +#include "parquet_cursor.h" +#include "parquet_filter.h" + +//#define DEBUG + +/* Forward references to the various virtual table methods implemented + * in this file. */ +static int parquetCreate(sqlite3*, void*, int, const char*const*, + sqlite3_vtab**,char**); +static int parquetConnect(sqlite3*, void*, int, const char*const*, + sqlite3_vtab**,char**); +static int parquetBestIndex(sqlite3_vtab*,sqlite3_index_info*); +static int parquetDisconnect(sqlite3_vtab*); +static int parquetDestroy(sqlite3_vtab*); +static int parquetOpen(sqlite3_vtab*, sqlite3_vtab_cursor**); +static int parquetClose(sqlite3_vtab_cursor*); +static int parquetFilter(sqlite3_vtab_cursor*, int idxNum, const char *idxStr, + int argc, sqlite3_value **argv); +static int parquetNext(sqlite3_vtab_cursor*); +static int parquetEof(sqlite3_vtab_cursor*); +static int parquetColumn(sqlite3_vtab_cursor*,sqlite3_context*,int); +static int parquetRowid(sqlite3_vtab_cursor*,sqlite3_int64*); + +/* An instance of the Parquet virtual table */ +typedef struct sqlite3_vtab_parquet { + sqlite3_vtab base; /* Base class. 
Must be first */ + ParquetTable* table; + sqlite3* db; +} sqlite3_vtab_parquet; + + +/* A cursor for the Parquet virtual table */ +typedef struct sqlite3_vtab_cursor_parquet { + sqlite3_vtab_cursor base; /* Base class. Must be first */ + ParquetCursor* cursor; +} sqlite3_vtab_cursor_parquet; + +static int parquetDestroy(sqlite3_vtab *pVtab) { + sqlite3_vtab_parquet *p = (sqlite3_vtab_parquet*)pVtab; + + // Clean up our shadow table. This is useful if the user has recreated + // the parquet file, and our mappings would now be invalid. + std::string drop = "DROP TABLE IF EXISTS _"; + drop.append(p->table->getTableName()); + drop.append("_rowgroups"); + int rv = sqlite3_exec(p->db, drop.data(), 0, 0, 0); + if(rv != 0) + return rv; + + return SQLITE_OK; +} + +/* +** This method is the destructor fo a sqlite3_vtab_parquet object. +*/ +static int parquetDisconnect(sqlite3_vtab *pVtab){ + sqlite3_vtab_parquet *p = (sqlite3_vtab_parquet*)pVtab; + delete p->table; + sqlite3_free(p); + return SQLITE_OK; +} + +static int parquetConnect( + sqlite3 *db, + void *pAux, + int argc, + const char *const*argv, + sqlite3_vtab **ppVtab, + char **pzErr +){ + try { + if(argc != 4 || strlen(argv[3]) < 2) { + *pzErr = sqlite3_mprintf("must provide exactly one argument, the path to a parquet file"); + return SQLITE_ERROR; + } + + std::string tableName = argv[2]; + // Remove the delimiting single quotes + std::string fname = argv[3]; + fname = fname.substr(1, fname.length() - 2); + std::unique_ptr vtab( + (sqlite3_vtab_parquet*)sqlite3_malloc(sizeof(sqlite3_vtab_parquet)), + sqlite3_free); + memset(vtab.get(), 0, sizeof(*vtab.get())); + + try { + std::unique_ptr table(new ParquetTable(fname, tableName)); + + std::string create = table->CreateStatement(); + int rc = sqlite3_declare_vtab(db, create.data()); + if(rc) + return rc; + + vtab->table = table.release(); + vtab->db = db; + *ppVtab = (sqlite3_vtab*)vtab.release(); + return SQLITE_OK; + } catch (const std::exception& e) { + *pzErr = 
sqlite3_mprintf(e.what()); + return SQLITE_ERROR; + } + } catch(std::bad_alloc& ba) { + return SQLITE_NOMEM; + } catch(std::exception& e) { + return SQLITE_ERROR; + } +} + +/* +** The xConnect and xCreate methods do the same thing, but they must be +** different so that the virtual table is not an eponymous virtual table. +*/ +static int parquetCreate( + sqlite3 *db, + void *pAux, + int argc, const char *const*argv, + sqlite3_vtab **ppVtab, + char **pzErr +){ + try { + // Create shadow table for storing constraint -> rowid mappings + std::string create = "CREATE TABLE IF NOT EXISTS _"; + create.append(argv[2]); + create.append("_rowgroups(clause TEXT, estimate BLOB, actual BLOB)"); + int rv = sqlite3_exec(db, create.data(), 0, 0, 0); + if(rv != 0) + return rv; + + create = "CREATE UNIQUE INDEX IF NOT EXISTS _"; + create.append(argv[2]); + create.append("_index ON _"); + create.append(argv[2]); + create.append("_rowgroups(clause)"); + rv = sqlite3_exec(db, create.data(), 0, 0, 0); + + return parquetConnect(db, pAux, argc, argv, ppVtab, pzErr); + } catch (std::bad_alloc& ba) { + return SQLITE_NOMEM; + } +} + +std::string quoteBlob(const std::vector& bytes) { + std::ostringstream ss; + ss << "X'" << std::hex; + for(unsigned int i = 0; i < bytes.size(); i++) { + ss << std::setfill('0') << std::setw(2) << (unsigned int)(unsigned char)bytes[i]; + } + ss << "'"; + + return ss.str(); +} + +void persistConstraints(sqlite3* db, ParquetCursor* cursor) { + for(unsigned int i = 0; i < cursor->getNumConstraints(); i++) { + const Constraint& constraint = cursor->getConstraint(i); + const std::vector& estimated = constraint.bitmap.estimatedMembership; + const std::vector& actual = constraint.bitmap.actualMembership; + if(estimated == actual) { + continue; + } + std::string desc = constraint.describe(); + + std::string estimatedStr = quoteBlob(estimated); + std::string actualStr = quoteBlob(actual); + + // This is only advisory, so ignore failures. 
+ char* sql = sqlite3_mprintf( + "INSERT OR REPLACE INTO _%s_rowgroups(clause, estimate, actual) VALUES ('%q', %s, %s)", + cursor->getTable()->getTableName().c_str(), + desc.c_str(), + estimatedStr.c_str(), + actualStr.c_str()); + + + if(sql == NULL) + return; + + sqlite3_exec(db, sql, 0, 0, 0); + sqlite3_free(sql); + } +} + + +/* +** Destructor for a sqlite3_vtab_cursor_parquet. +*/ +static int parquetClose(sqlite3_vtab_cursor *cur){ + sqlite3_vtab_cursor_parquet* vtab_cursor_parquet = (sqlite3_vtab_cursor_parquet*)cur; + vtab_cursor_parquet->cursor->close(); + delete vtab_cursor_parquet->cursor; + sqlite3_free(cur); + return SQLITE_OK; +} + +/* +** Constructor for a new sqlite3_vtab_parquet cursor object. +*/ +static int parquetOpen(sqlite3_vtab *p, sqlite3_vtab_cursor **ppCursor){ + try { + std::unique_ptr cursor( + (sqlite3_vtab_cursor_parquet*)sqlite3_malloc(sizeof(sqlite3_vtab_cursor_parquet)), + sqlite3_free); + memset(cursor.get(), 0, sizeof(*cursor.get())); + + sqlite3_vtab_parquet* pParquet = (sqlite3_vtab_parquet*)p; + cursor->cursor = new ParquetCursor(pParquet->table); + + *ppCursor = (sqlite3_vtab_cursor*)cursor.release(); + return SQLITE_OK; + } catch(std::bad_alloc& ba) { + return SQLITE_NOMEM; + } catch(std::exception& e) { + return SQLITE_ERROR; + } +} + + +/* +** Advance a sqlite3_vtab_cursor_parquet to its next row of input. +** Set the EOF marker if we reach the end of input. +*/ +static int parquetNext(sqlite3_vtab_cursor *cur){ + try { + sqlite3_vtab_cursor_parquet* vtab_cursor_parquet = (sqlite3_vtab_cursor_parquet*)cur; + ParquetCursor* cursor = vtab_cursor_parquet->cursor; + cursor->next(); + return SQLITE_OK; + } catch(std::bad_alloc& ba) { + return SQLITE_NOMEM; + } catch(std::exception& e) { + return SQLITE_ERROR; + } +} + +/* +** Return values of columns for the row at which the sqlite3_vtab_cursor_parquet +** is currently pointing. 
+*/ +static int parquetColumn( + sqlite3_vtab_cursor *cur, /* The cursor */ + sqlite3_context *ctx, /* First argument to sqlite3_result_...() */ + int col /* Which column to return */ +){ + try { + ParquetCursor *cursor = ((sqlite3_vtab_cursor_parquet*)cur)->cursor; + cursor->ensureColumn(col); + + if(cursor->isNull(col)) { + sqlite3_result_null(ctx); + } else { + switch(cursor->getPhysicalType(col)) { + case parquet::Type::BOOLEAN: + case parquet::Type::INT32: + { + int rv = cursor->getInt32(col); + sqlite3_result_int(ctx, rv); + break; + } + case parquet::Type::FLOAT: + case parquet::Type::DOUBLE: + { + double rv = cursor->getDouble(col); + sqlite3_result_double(ctx, rv); + break; + } + case parquet::Type::BYTE_ARRAY: + { + parquet::ByteArray* rv = cursor->getByteArray(col); + if(cursor->getLogicalType(col) == parquet::LogicalType::UTF8) { + sqlite3_result_text(ctx, (const char*)rv->ptr, rv->len, SQLITE_TRANSIENT); + } else { + sqlite3_result_blob(ctx, (void*)rv->ptr, rv->len, SQLITE_TRANSIENT); + } + break; + } + case parquet::Type::INT96: + // This type exists to store timestamps in nanoseconds due to legacy + // reasons. We just interpret it as a timestamp in milliseconds. + case parquet::Type::INT64: + { + long rv = cursor->getInt64(col); + sqlite3_result_int64(ctx, rv); + break; + } + case parquet::Type::FIXED_LEN_BYTE_ARRAY: + { + parquet::ByteArray* rv = cursor->getByteArray(col); + sqlite3_result_blob(ctx, (void*)rv->ptr, rv->len, SQLITE_TRANSIENT); + break; + } + default: + // Should be impossible to get here as we should have forbidden this at + // CREATE time -- maybe file changed underneath us? 
+ std::ostringstream ss; + ss << __FILE__ << ":" << __LINE__ << ": column " << col << " has unsupported type: " << + parquet::TypeToString(cursor->getPhysicalType(col)); + + throw std::invalid_argument(ss.str()); + break; + } + } + return SQLITE_OK; + } catch(std::bad_alloc& ba) { + return SQLITE_NOMEM; + } catch(std::exception& e) { + return SQLITE_ERROR; + } +} + +/* +** Return the rowid for the current row. +*/ +static int parquetRowid(sqlite3_vtab_cursor *cur, sqlite_int64 *pRowid){ + ParquetCursor *cursor = ((sqlite3_vtab_cursor_parquet*)cur)->cursor; + *pRowid = cursor->getRowId(); + return SQLITE_OK; +} + +/* +** Return TRUE if the cursor has been moved off of the last +** row of output. +*/ +static int parquetEof(sqlite3_vtab_cursor *cur){ + ParquetCursor* cursor = ((sqlite3_vtab_cursor_parquet*)cur)->cursor; + if(cursor->eof()) { + sqlite3_vtab_cursor_parquet* vtab_cursor_parquet = (sqlite3_vtab_cursor_parquet*)cur; + sqlite3_vtab_parquet* vtab_parquet = (sqlite3_vtab_parquet*)(vtab_cursor_parquet->base.pVtab); + persistConstraints(vtab_parquet->db, cursor); + return 1; + } + return 0; +} + +#ifdef DEBUG +const char* opName(int op) { + switch(op) { + case SQLITE_INDEX_CONSTRAINT_EQ: + return "="; + case SQLITE_INDEX_CONSTRAINT_GT: + return ">"; + case SQLITE_INDEX_CONSTRAINT_LE: + return "<="; + case SQLITE_INDEX_CONSTRAINT_LT: + return "<"; + case SQLITE_INDEX_CONSTRAINT_GE: + return ">="; + case SQLITE_INDEX_CONSTRAINT_MATCH: + return "match"; + case SQLITE_INDEX_CONSTRAINT_LIKE: + return "LIKE"; + case SQLITE_INDEX_CONSTRAINT_GLOB: + return "GLOB"; + case SQLITE_INDEX_CONSTRAINT_REGEXP: + return "REGEXP"; + case SQLITE_INDEX_CONSTRAINT_NE: + return "!="; + case SQLITE_INDEX_CONSTRAINT_ISNOT: + return "IS NOT"; + case SQLITE_INDEX_CONSTRAINT_ISNOTNULL: + return "IS NOT NULL"; + case SQLITE_INDEX_CONSTRAINT_ISNULL: + return "IS NULL"; + case SQLITE_INDEX_CONSTRAINT_IS: + return "IS"; + default: + return "unknown"; + } +} + +void 
debugConstraints(sqlite3_index_info *pIdxInfo, ParquetTable *table, int argc, sqlite3_value** argv) { + printf("debugConstraints, argc=%d\n", argc); + int j = 0; + for(int i = 0; i < pIdxInfo->nConstraint; i++) { + std::string valueStr = "?"; + if(argv != NULL && pIdxInfo->aConstraint[i].usable) { + int type = sqlite3_value_type(argv[j]); + switch(type) { + case SQLITE_INTEGER: + { + sqlite3_int64 rv = sqlite3_value_int64(argv[j]); + std::ostringstream ss; + ss << rv; + valueStr = ss.str(); + break; + } + case SQLITE_FLOAT: + { + double rv = sqlite3_value_double(argv[j]); + std::ostringstream ss; + ss << rv; + valueStr = ss.str(); + break; + } + case SQLITE_TEXT: + { + const unsigned char* rv = sqlite3_value_text(argv[j]); + std::ostringstream ss; + ss << "'" << rv << "'"; + valueStr = ss.str(); + break; + } + case SQLITE_BLOB: + { + int sizeBytes = sqlite3_value_bytes(argv[j]); + std::ostringstream ss; + ss << "'..." << sizeBytes << "-byte blob...'"; + valueStr = ss.str(); + break; + } + case SQLITE_NULL: + { + valueStr = "NULL"; + break; + } + } + j++; + } + printf(" constraint %d: col %s %s %s, usable %d\n", + i, + table->columnName(pIdxInfo->aConstraint[i].iColumn).data(), + opName(pIdxInfo->aConstraint[i].op), + valueStr.data(), + pIdxInfo->aConstraint[i].usable); + } +} +#endif + +ConstraintOperator constraintOperatorFromSqlite(int op) { + switch(op) { + case SQLITE_INDEX_CONSTRAINT_EQ: + return Equal; + case SQLITE_INDEX_CONSTRAINT_GT: + return GreaterThan; + case SQLITE_INDEX_CONSTRAINT_LE: + return LessThanOrEqual; + case SQLITE_INDEX_CONSTRAINT_LT: + return LessThan; + case SQLITE_INDEX_CONSTRAINT_GE: + return GreaterThanOrEqual; + case SQLITE_INDEX_CONSTRAINT_LIKE: + return Like; + case SQLITE_INDEX_CONSTRAINT_GLOB: + return Glob; + case SQLITE_INDEX_CONSTRAINT_NE: + return NotEqual; + case SQLITE_INDEX_CONSTRAINT_ISNOT: + return IsNot; + case SQLITE_INDEX_CONSTRAINT_ISNOTNULL: + return IsNotNull; + case SQLITE_INDEX_CONSTRAINT_ISNULL: + return IsNull; + 
case SQLITE_INDEX_CONSTRAINT_IS: + return Is; + } + + std::ostringstream ss; + ss << __FILE__ << ":" << __LINE__ << ": operator " << op << " is unsupported"; + throw std::invalid_argument(ss.str()); +} + +std::vector getRowGroupsForClause(sqlite3* db, std::string table, std::string clause) { + std::vector rv; + + std::unique_ptr sql(sqlite3_mprintf( + "SELECT actual FROM _%s_rowgroups WHERE clause = '%q'", + table.c_str(), + clause.c_str()), sqlite3_free); + + if(sql.get() == NULL) + return rv; + + sqlite3_stmt* pStmt = NULL; + int rc = sqlite3_prepare_v2(db, sql.get(), -1, &pStmt, NULL); + if(rc != 0) + return rv; + + rc = sqlite3_step(pStmt); + if(rc == SQLITE_ROW) { + int size = sqlite3_column_bytes(pStmt, 0); + unsigned char* blob = (unsigned char*)sqlite3_column_blob(pStmt, 0); + // TODO: there is a memory leak here if we get a std::bad_alloc while populating rv; + // we fail to free pStmt + for(int i = 0; i < size; i++) { + rv.push_back(blob[i]); + } + } + + sqlite3_finalize(pStmt); + return rv; +} + + +/* +** Only a full table scan is supported. So xFilter simply rewinds to +** the beginning. 
+*/ +static int parquetFilter( + sqlite3_vtab_cursor *cur, + int idxNum, + const char *idxStr, + int argc, + sqlite3_value **argv +){ + try { + sqlite3_vtab_cursor_parquet* vtab_cursor_parquet = (sqlite3_vtab_cursor_parquet*)cur; + sqlite3_vtab_parquet* vtab_parquet = (sqlite3_vtab_parquet*)(vtab_cursor_parquet->base.pVtab); + sqlite3* db = vtab_parquet->db; + ParquetCursor* cursor = vtab_cursor_parquet->cursor; + sqlite3_index_info* indexInfo = (sqlite3_index_info*)idxStr; + +#ifdef DEBUG + struct timeval tv; + gettimeofday(&tv, NULL); + unsigned long long millisecondsSinceEpoch = + (unsigned long long)(tv.tv_sec) * 1000 + + (unsigned long long)(tv.tv_usec) / 1000; + + printf("%llu xFilter: idxNum=%d, idxStr=%lu, argc=%d\n", millisecondsSinceEpoch, idxNum, (long unsigned int)idxStr, argc); + debugConstraints(indexInfo, cursor->getTable(), argc, argv); +#endif + std::vector constraints; + int j = 0; + for(int i = 0; i < indexInfo->nConstraint; i++) { + if(!indexInfo->aConstraint[i].usable) { + continue; + } + + ValueType type = Null; + int64_t intValue = 0; + double doubleValue = 0; + std::vector blobValue; + int sqliteType = sqlite3_value_type(argv[j]); + + if(sqliteType == SQLITE_INTEGER) { + type = Integer; + intValue = sqlite3_value_int64(argv[j]); + } else if(sqliteType == SQLITE_FLOAT) { + type = Double; + doubleValue = sqlite3_value_double(argv[j]); + } else if(sqliteType == SQLITE_TEXT) { + type = Text; + int len = sqlite3_value_bytes(argv[j]); + const unsigned char* ptr = sqlite3_value_text(argv[j]); + for(int k = 0; k < len; k++) { + blobValue.push_back(ptr[k]); + } + } else if(sqliteType == SQLITE_BLOB) { + type = Blob; + int len = sqlite3_value_bytes(argv[j]); + const unsigned char* ptr = (const unsigned char*)sqlite3_value_blob(argv[j]); + for(int k = 0; k < len; k++) { + blobValue.push_back(ptr[k]); + } + } else if(sqliteType == SQLITE_NULL) { + type = Null; + } + + std::string columnName = "rowid"; + if(indexInfo->aConstraint[i].iColumn >= 0) { + 
columnName = cursor->getTable()->columnName(indexInfo->aConstraint[i].iColumn); + } + + RowGroupBitmap bitmap = RowGroupBitmap(cursor->getNumRowGroups()); + Constraint dummy( + bitmap, + indexInfo->aConstraint[i].iColumn, + columnName, + constraintOperatorFromSqlite(indexInfo->aConstraint[i].op), + type, + intValue, + doubleValue, + blobValue); + + std::vector actual = getRowGroupsForClause(db, cursor->getTable()->getTableName(), dummy.describe()); + if(actual.size() > 0) { + // Initialize the estimate to be the actual -- eventually they'll converge + // and we'll stop writing back to the db. + std::vector estimate = actual; + bitmap = RowGroupBitmap(estimate, actual); + } + + Constraint constraint( + bitmap, + indexInfo->aConstraint[i].iColumn, + columnName, + constraintOperatorFromSqlite(indexInfo->aConstraint[i].op), + type, + intValue, + doubleValue, + blobValue); + + constraints.push_back(constraint); + j++; + } + cursor->reset(constraints); + return parquetNext(cur); + } catch(std::bad_alloc& ba) { + return SQLITE_NOMEM; + } catch(std::exception& e) { + return SQLITE_ERROR; + } +} + +/* +* We'll always indicate to SQLite that we prefer it to use an index so that it will +* pass additional context to xFilter, which we may or may not use. +* +* We copy the sqlite3_index_info structure, as is, into idxStr for later use. 
+*/ +static int parquetBestIndex( + sqlite3_vtab *tab, + sqlite3_index_info *pIdxInfo +){ + try { + +#ifdef DEBUG + struct timeval tv; + gettimeofday(&tv, NULL); + unsigned long long millisecondsSinceEpoch = + (unsigned long long)(tv.tv_sec) * 1000 + + (unsigned long long)(tv.tv_usec) / 1000; + + + ParquetTable* table = ((sqlite3_vtab_parquet*)tab)->table; + printf("%llu xBestIndex: nConstraint=%d, nOrderBy=%d\n", millisecondsSinceEpoch, pIdxInfo->nConstraint, pIdxInfo->nOrderBy); + debugConstraints(pIdxInfo, table, 0, NULL); +#endif + // We traverse in rowid ascending order, so if they're asking for it to be ordered like that, + // we can tell SQLite that it's guaranteed. This speeds up some DB viewer utilities that + // use rowids for pagination. + if(pIdxInfo->nOrderBy == 1 && pIdxInfo->aOrderBy[0].iColumn == -1 && pIdxInfo->aOrderBy[0].desc == 0) + pIdxInfo->orderByConsumed = 1; + + if(pIdxInfo->nConstraint == 0) { + pIdxInfo->estimatedCost = 1000000000000; + pIdxInfo->idxNum = 0; + } else { + pIdxInfo->estimatedCost = 1; + pIdxInfo->idxNum = 1; + int j = 0; + + for(int i = 0; i < pIdxInfo->nConstraint; i++) { + if(pIdxInfo->aConstraint[i].usable) { + j++; + pIdxInfo->aConstraintUsage[i].argvIndex = j; +// pIdxInfo->aConstraintUsage[i].omit = 1; + } + } + } + + size_t dupeSize = sizeof(sqlite3_index_info) + + //pIdxInfo->nConstraint * sizeof(sqlite3_index_constraint) + + pIdxInfo->nConstraint * sizeof(sqlite3_index_info::sqlite3_index_constraint) + + pIdxInfo->nOrderBy * sizeof(sqlite3_index_info::sqlite3_index_orderby) + + pIdxInfo->nConstraint * sizeof(sqlite3_index_info::sqlite3_index_constraint_usage); + sqlite3_index_info* dupe = (sqlite3_index_info*)sqlite3_malloc(dupeSize); + pIdxInfo->idxStr = (char*)dupe; + pIdxInfo->needToFreeIdxStr = 1; + + memset(dupe, 0, dupeSize); + memcpy(dupe, pIdxInfo, sizeof(sqlite3_index_info)); + + dupe->aConstraint = (sqlite3_index_info::sqlite3_index_constraint*)((char*)dupe + sizeof(sqlite3_index_info)); + dupe->aOrderBy 
= (sqlite3_index_info::sqlite3_index_orderby*)((char*)dupe + + sizeof(sqlite3_index_info) + + pIdxInfo->nConstraint * sizeof(sqlite3_index_info::sqlite3_index_constraint)); + dupe->aConstraintUsage = (sqlite3_index_info::sqlite3_index_constraint_usage*)((char*)dupe + + sizeof(sqlite3_index_info) + + pIdxInfo->nConstraint * sizeof(sqlite3_index_info::sqlite3_index_constraint) + + pIdxInfo->nOrderBy * sizeof(sqlite3_index_info::sqlite3_index_orderby)); + + + for(int i = 0; i < pIdxInfo->nConstraint; i++) { + dupe->aConstraint[i].iColumn = pIdxInfo->aConstraint[i].iColumn; + dupe->aConstraint[i].op = pIdxInfo->aConstraint[i].op; + dupe->aConstraint[i].usable = pIdxInfo->aConstraint[i].usable; + dupe->aConstraint[i].iTermOffset = pIdxInfo->aConstraint[i].iTermOffset; + + dupe->aConstraintUsage[i].argvIndex = pIdxInfo->aConstraintUsage[i].argvIndex; + dupe->aConstraintUsage[i].omit = pIdxInfo->aConstraintUsage[i].omit; + } + + for(int i = 0; i < pIdxInfo->nOrderBy; i++) { + dupe->aOrderBy[i].iColumn = pIdxInfo->aOrderBy[i].iColumn; + dupe->aOrderBy[i].desc = pIdxInfo->aOrderBy[i].desc; + } + + return SQLITE_OK; + } catch(std::bad_alloc& ba) { + return SQLITE_NOMEM; + } catch(std::exception& e) { + return SQLITE_ERROR; + } +} + + +static sqlite3_module ParquetModule = { + 0, /* iVersion */ + parquetCreate, /* xCreate */ + parquetConnect, /* xConnect */ + parquetBestIndex, /* xBestIndex */ + parquetDisconnect, /* xDisconnect */ + parquetDestroy, /* xDestroy */ + parquetOpen, /* xOpen - open a cursor */ + parquetClose, /* xClose - close a cursor */ + parquetFilter, /* xFilter - configure scan constraints */ + parquetNext, /* xNext - advance a cursor */ + parquetEof, /* xEof - check for end of scan */ + parquetColumn, /* xColumn - read data */ + parquetRowid, /* xRowid - read data */ + 0, /* xUpdate */ + 0, /* xBegin */ + 0, /* xSync */ + 0, /* xCommit */ + 0, /* xRollback */ + 0, /* xFindMethod */ + 0, /* xRename */ +}; + +/* +* This routine is called when the extension is 
loaded. The new +* Parquet virtual table module is registered with the calling database +* connection. +*/ +extern "C" __declspec(dllexport) + int sqlite3_extension_init( + sqlite3 *db, + char **pzErrMsg, + const sqlite3_api_routines *pApi + ){ + int rc; + SQLITE_EXTENSION_INIT2(pApi); + rc = sqlite3_create_module(db, "parquet", &ParquetModule, 0); + return rc; + } + diff --git a/parquet-windows/parquet_cursor.cpp b/parquet-windows/parquet_cursor.cpp new file mode 100644 index 0000000..1fe6e4f --- /dev/null +++ b/parquet-windows/parquet_cursor.cpp @@ -0,0 +1,1003 @@ +#include "parquet_cursor.h" +#include +#include + +ParquetCursor::ParquetCursor(ParquetTable* table): table(table) { + reader = NULL; + reset(std::vector()); +} + +bool ParquetCursor::currentRowGroupSatisfiesRowIdFilter(Constraint& constraint) { + if(constraint.type != Integer) + return true; + + int64_t target = constraint.intValue; + switch(constraint.op) { + case IsNull: + return false; + case Is: + case Equal: + return target >= rowId && target < rowId + rowGroupSize; + case GreaterThan: + // rowId > target + return rowId + rowGroupSize > target; + case GreaterThanOrEqual: + // rowId >= target + return rowId + rowGroupSize >= rowId; + case LessThan: + return target > rowId; + case LessThanOrEqual: + return target >= rowId; + default: + return true; + } +} + +bool ParquetCursor::currentRowGroupSatisfiesBlobFilter(Constraint& constraint, std::shared_ptr _stats) { + if(!_stats->HasMinMax()) { + return true; + } + + if(constraint.type != Blob) { + return true; + } + + const unsigned char* minPtr = NULL; + const unsigned char* maxPtr = NULL; + size_t minLen = 0; + size_t maxLen = 0; + + parquet::Type::type pqType = types[constraint.column]; + + if(pqType == parquet::Type::BYTE_ARRAY) { + parquet::TypedRowGroupStatistics>* stats = + (parquet::TypedRowGroupStatistics>*)_stats.get(); + + minPtr = stats->min().ptr; + minLen = stats->min().len; + maxPtr = stats->max().ptr; + maxLen = stats->max().len; + } 
else if(pqType == parquet::Type::FIXED_LEN_BYTE_ARRAY) { + // It seems like parquet-cpp doesn't actually produce stats for FLBA yet, so + // rather than have untested code here, we'll just short circuit. + // + // Once I can get my hands on such a file, it should be easy to add support. + return true; + } else { + // Should be impossible to get here + std::ostringstream ss; + ss << __FILE__ << ":" << __LINE__ << ": currentRowGroupSatisfiesBlobFilter called on unsupported type: " << + parquet::TypeToString(pqType); + throw std::invalid_argument(ss.str()); + } + + const std::vector& blob = constraint.blobValue; + + switch(constraint.op) { + case Is: + case Equal: + { + bool minEqual = blob.size() == minLen && memcmp(&blob[0], minPtr, minLen) == 0; + bool maxEqual = blob.size() == maxLen && memcmp(&blob[0], maxPtr, maxLen) == 0; + + bool blobGteMinBlob = std::lexicographical_compare( + minPtr, + minPtr + minLen, + &blob[0], + &blob[0] + blob.size()); + + bool blobLtMaxBlob = std::lexicographical_compare( + &blob[0], + &blob[0] + blob.size(), + maxPtr, + maxPtr + maxLen); + + + return (minEqual || blobGteMinBlob) && (maxEqual || blobLtMaxBlob); + } + case GreaterThanOrEqual: + { + bool maxEqual = blob.size() == maxLen && memcmp(&blob[0], maxPtr, maxLen) == 0; + + return maxEqual || std::lexicographical_compare( + &blob[0], + &blob[0] + blob.size(), + maxPtr, + maxPtr + maxLen); + } + case GreaterThan: + return std::lexicographical_compare( + &blob[0], + &blob[0] + blob.size(), + maxPtr, + maxPtr + maxLen); + case LessThan: + return std::lexicographical_compare( + minPtr, + minPtr + minLen, + &blob[0], + &blob[0] + blob.size()); + case LessThanOrEqual: + { + bool minEqual = blob.size() == minLen && memcmp(&blob[0], minPtr, minLen) == 0; + return minEqual || std::lexicographical_compare( + minPtr, + minPtr + minLen, + &blob[0], + &blob[0] + blob.size()); + } + case NotEqual: + { + // If min == max == blob, we can skip this. 
+ bool blobMaxEqual = blob.size() == maxLen && memcmp(&blob[0], maxPtr, maxLen) == 0; + bool minMaxEqual = minLen == maxLen && memcmp(minPtr, maxPtr, minLen) == 0; + return !(blobMaxEqual && minMaxEqual); + } + case IsNot: + default: + return true; + } +} + +bool ParquetCursor::currentRowGroupSatisfiesTextFilter(Constraint& constraint, std::shared_ptr _stats) { + parquet::TypedRowGroupStatistics>* stats = + (parquet::TypedRowGroupStatistics>*)_stats.get(); + + if(!stats->HasMinMax()) { + return true; + } + + if(constraint.type != Text) { + return true; + } + + const std::string& str = constraint.stringValue; + const parquet::ByteArray& min = stats->min(); + const parquet::ByteArray& max = stats->max(); + std::string minStr((const char*)min.ptr, min.len); + std::string maxStr((const char*)max.ptr, max.len); +// printf("min=%s [%d], max=%s [%d], target=%s\n", minStr.data(), min.len, maxStr.data(), max.len, str.data()); + + switch(constraint.op) { + case Is: + case Equal: + return str >= minStr && str <= maxStr; + case GreaterThanOrEqual: + return maxStr >= str; + case GreaterThan: + return maxStr > str; + case LessThan: + return minStr < str; + case LessThanOrEqual: + return minStr <= str; + case NotEqual: + // If min == max == str, we can skip this. 
+ return !(minStr == maxStr && str == minStr); + case Like: + { + const std::string& likeStringValue = constraint.likeStringValue; + std::string truncatedMin = minStr.substr(0, likeStringValue.size()); + std::string truncatedMax = maxStr.substr(0, likeStringValue.size()); + return likeStringValue.empty() || (likeStringValue >= truncatedMin && likeStringValue <= truncatedMax); + } + case IsNot: + default: + return true; + } +} + +#if defined(_MSC_VER) && !defined(__clang__) +#include +#define UMUL128 _umul128 +#else + +static inline uint64_t UMUL128(uint64_t a, uint64_t b, uint64_t* hi) { + __uint128_t result = (__uint128_t)a * b; + *hi = result >> 64; + return (uint64_t)result; +} +#endif + +int64_t int96toMsSinceEpoch(const parquet::Int96& rv) { + const uint64_t ns_low = rv.value[0]; + const uint64_t ns_high = static_cast(rv.value[1]) << 32; + const uint64_t ns = ns_low + ns_high; + + const uint64_t julianDay = rv.value[2]; + const uint64_t daysSinceEpoch = julianDay - 2440588ULL; + + uint64_t days_sec_high, days_sec_low; + days_sec_low = UMUL128(daysSinceEpoch, 86400ULL, &days_sec_high); + + uint64_t total_ns_high, total_ns_mid, total_ns_low; + total_ns_low = UMUL128(days_sec_low, 1000000000ULL, &total_ns_mid); + total_ns_high = days_sec_high * 1000000000ULL + total_ns_mid; + + uint64_t final_low = total_ns_low + ns; + uint64_t carry = (final_low < total_ns_low) ? 
1 : 0; + uint64_t final_high = total_ns_high + carry; + + const uint64_t divisor = 1000000ULL; + + uint64_t high_quotient = final_high / divisor; + uint64_t high_remainder = final_high % divisor; + + uint64_t low_combined = (high_remainder << 64) | final_low; + uint64_t low_quotient = low_combined / divisor; + + return static_cast((high_quotient << 64) + low_quotient); +} + +bool ParquetCursor::currentRowGroupSatisfiesIntegerFilter(Constraint& constraint, std::shared_ptr _stats) { + if(!_stats->HasMinMax()) { + return true; + } + + if(constraint.type != Integer) { + return true; + } + + int column = constraint.column; + + int64_t min = std::numeric_limits::min(); + int64_t max = std::numeric_limits::max(); + parquet::Type::type pqType = types[column]; + + if(pqType == parquet::Type::INT32) { + parquet::TypedRowGroupStatistics>* stats = + (parquet::TypedRowGroupStatistics>*)_stats.get(); + + min = stats->min(); + max = stats->max(); + } else if(pqType == parquet::Type::INT64) { + parquet::TypedRowGroupStatistics>* stats = + (parquet::TypedRowGroupStatistics>*)_stats.get(); + + min = stats->min(); + max = stats->max(); + } else if(pqType == parquet::Type::INT96) { + parquet::TypedRowGroupStatistics>* stats = + (parquet::TypedRowGroupStatistics>*)_stats.get(); + + min = int96toMsSinceEpoch(stats->min()); + max = int96toMsSinceEpoch(stats->max()); + + } else if(pqType == parquet::Type::BOOLEAN) { + parquet::TypedRowGroupStatistics>* stats = + (parquet::TypedRowGroupStatistics>*)_stats.get(); + + min = stats->min(); + max = stats->max(); + + } else { + // Should be impossible to get here as we should have forbidden this at + // CREATE time -- maybe file changed underneath us? 
+    std::ostringstream ss;
+    ss << __FILE__ << ":" << __LINE__ << ": currentRowGroupSatisfiesIntegerFilter called on unsupported type: " <<
+      parquet::TypeToString(pqType);
+    throw std::invalid_argument(ss.str());
+  }
+
+  const int64_t value = constraint.intValue;
+
+  switch(constraint.op) {
+    case Is:
+    case Equal:
+      return value >= min && value <= max;
+    case GreaterThanOrEqual:
+      return max >= value;
+    case GreaterThan:
+      return max > value;
+    case LessThan:
+      return min < value;
+    case LessThanOrEqual:
+      return min <= value;
+    case NotEqual:
+      // Only skippable if every row equals the constrained value.
+      return !(min == max && value == min);
+    case Like:
+    case IsNot:
+    default:
+      return true;
+  }
+}
+
+// Return true if this row group's min/max statistics admit rows that could
+// match the floating-point constraint; false only if it definitely cannot.
+bool ParquetCursor::currentRowGroupSatisfiesDoubleFilter(Constraint& constraint, std::shared_ptr<parquet::RowGroupStatistics> _stats) {
+  if(!_stats->HasMinMax()) {
+    return true;
+  }
+
+  if(constraint.type != Double) {
+    return true;
+  }
+
+  int column = constraint.column;
+
+  double min = std::numeric_limits<double>::min();
+  double max = std::numeric_limits<double>::max();
+  parquet::Type::type pqType = types[column];
+
+  if(pqType == parquet::Type::DOUBLE) {
+    parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::DOUBLE>>* stats =
+      (parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::DOUBLE>>*)_stats.get();
+
+    min = stats->min();
+    max = stats->max();
+  } else if(pqType == parquet::Type::FLOAT) {
+    parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::FLOAT>>* stats =
+      (parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::FLOAT>>*)_stats.get();
+
+    min = stats->min();
+    max = stats->max();
+  } else {
+    // Should be impossible to get here as we should have forbidden this at
+    // CREATE time -- maybe file changed underneath us?
+    std::ostringstream ss;
+    // (Fixed: this message previously named currentRowGroupSatisfiesIntegerFilter.)
+    ss << __FILE__ << ":" << __LINE__ << ": currentRowGroupSatisfiesDoubleFilter called on unsupported type: " <<
+      parquet::TypeToString(pqType);
+    throw std::invalid_argument(ss.str());
+  }
+
+  const double value = constraint.doubleValue;
+
+  switch(constraint.op) {
+    case Is:
+    case Equal:
+      return value >= min && value <= max;
+    case GreaterThanOrEqual:
+      return max >= value;
+    case GreaterThan:
+      return max > value;
+    case LessThan:
+      return min < value;
+    case LessThanOrEqual:
+      return min <= value;
+    case NotEqual:
+      // Only skippable if every row equals the constrained value.
+      return !(min == max && value == min);
+    case Like:
+    case IsNot:
+    default:
+      return true;
+  }
+}
+
+// Exact row-level check of a text constraint against the current row's value.
+// Comparisons are byte-wise (lexicographic on unsigned char), matching
+// SQLite's default BINARY collation.
+bool ParquetCursor::currentRowSatisfiesTextFilter(Constraint& constraint) {
+  if(constraint.type != Text) {
+    return true;
+  }
+
+  parquet::ByteArray* ba = getByteArray(constraint.column);
+
+  switch(constraint.op) {
+    case Is:
+    case Equal:
+    {
+      const std::vector<unsigned char>& blob = constraint.blobValue;
+
+      if(blob.size() != ba->len)
+        return false;
+
+      return 0 == memcmp(&blob[0], ba->ptr, ba->len);
+    }
+    case NotEqual:
+    {
+      const std::vector<unsigned char>& blob = constraint.blobValue;
+
+      if(blob.size() != ba->len)
+        return true;
+
+      return 0 != memcmp(&blob[0], ba->ptr, ba->len);
+    }
+    case GreaterThan:
+    {
+      // constraint < row  <=>  row > constraint
+      const std::vector<unsigned char>& blob = constraint.blobValue;
+
+      return std::lexicographical_compare(
+          &blob[0],
+          &blob[0] + blob.size(),
+          ba->ptr,
+          ba->ptr + ba->len);
+    }
+    case GreaterThanOrEqual:
+    {
+      const std::vector<unsigned char>& blob = constraint.blobValue;
+
+      bool equal = blob.size() == ba->len && 0 == memcmp(&blob[0], ba->ptr, ba->len);
+
+      return equal || std::lexicographical_compare(
+          &blob[0],
+          &blob[0] + blob.size(),
+          ba->ptr,
+          ba->ptr + ba->len);
+    }
+    case LessThan:
+    {
+      // row < constraint
+      const std::vector<unsigned char>& blob = constraint.blobValue;
+
+      return std::lexicographical_compare(
+          ba->ptr,
+          ba->ptr + ba->len,
+          &blob[0],
+          &blob[0] + blob.size());
+    }
+    case LessThanOrEqual:
+    {
+      const std::vector<unsigned char>& blob = constraint.blobValue;
+
+      bool equal = blob.size() == ba->len && 0 == memcmp(&blob[0], ba->ptr, ba->len);
+
+      return equal || std::lexicographical_compare(
+          ba->ptr,
+          ba->ptr + ba->len,
+          &blob[0],
+          &blob[0] + blob.size());
+    }
+    case Like:
+    {
+      // Prefix match against the literal (wildcard-stripped) portion of the
+      // LIKE pattern; SQLite re-checks the full pattern itself.
+      const std::string& likeStringValue = constraint.likeStringValue;
+      if(likeStringValue.size() > ba->len)
+        return false;
+
+      size_t len = ba->len;
+      if(likeStringValue.size() < len)
+        len = likeStringValue.size();
+      return 0 == memcmp(&likeStringValue[0], ba->ptr, len);
+    }
+    case IsNot:
+    default:
+      return true;
+  }
+}
+
+// Exact row-level check of an integer constraint against the current row.
+// column == -1 refers to the implicit rowid.
+bool ParquetCursor::currentRowSatisfiesIntegerFilter(Constraint& constraint) {
+  if(constraint.type != Integer) {
+    return true;
+  }
+
+  int column = constraint.column;
+
+  // CONSIDER: should we just store int64s everywhere?
+  int64_t value = 0;
+
+  if(column == -1) {
+    value = rowId;
+  } else {
+    parquet::Type::type pqType = types[column];
+
+    if(pqType == parquet::Type::INT32 || pqType == parquet::Type::BOOLEAN) {
+      value = getInt32(column);
+    } else if(pqType == parquet::Type::INT64 || pqType == parquet::Type::INT96) {
+      value = getInt64(column);
+    } else {
+      // Should be impossible to get here
+      std::ostringstream ss;
+      ss << __FILE__ << ":" << __LINE__ << ": currentRowSatisfiesIntegerFilter called on unsupported type: " <<
+        parquet::TypeToString(pqType);
+      throw std::invalid_argument(ss.str());
+    }
+  }
+
+  int64_t constraintValue = constraint.intValue;
+
+  switch(constraint.op) {
+    case Is:
+    case Equal:
+      return constraintValue == value;
+    case NotEqual:
+      return constraintValue != value;
+    case GreaterThan:
+      return value > constraintValue;
+    case GreaterThanOrEqual:
+      return value >= constraintValue;
+    case LessThan:
+      return value < constraintValue;
+    case LessThanOrEqual:
+      return value <= constraintValue;
+    case Like:
+    case IsNot:
+    default:
+      return true;
+  }
+}
+
+// Exact row-level check of a floating-point constraint against the current row.
+bool ParquetCursor::currentRowSatisfiesDoubleFilter(Constraint& constraint) {
+  if(constraint.type != Double) {
+    return true;
+  }
+
+  int column = constraint.column;
+  double value = getDouble(column);
+  double constraintValue = constraint.doubleValue;
+
+  switch(constraint.op) {
+    case Is:
+    case Equal:
+      return constraintValue == value;
+    case NotEqual:
+      return constraintValue != value;
+    case GreaterThan:
+      return value > constraintValue;
+    case GreaterThanOrEqual:
+      return value >= constraintValue;
+    case LessThan:
+      return value < constraintValue;
+    case LessThanOrEqual:
+      return value <= constraintValue;
+    case Like:
+    case IsNot:
+    default:
+      return true;
+  }
+}
+
+
+// Return true if it is _possible_ that the current
+// rowgroup satisfies the constraints. Only return false
+// if it definitely does not.
+//
+// This avoids opening rowgroups that can't return useful
+// data, which provides substantial performance benefits.
+bool ParquetCursor::currentRowGroupSatisfiesFilter() {
+  for(unsigned int i = 0; i < constraints.size(); i++) {
+    int column = constraints[i].column;
+    int op = constraints[i].op;
+    bool rv = true;
+
+    if(column == -1) {
+      rv = currentRowGroupSatisfiesRowIdFilter(constraints[i]);
+    } else {
+      std::unique_ptr<parquet::ColumnChunkMetaData> md = rowGroupMetadata->ColumnChunk(column);
+      if(md->is_stats_set()) {
+        std::shared_ptr<parquet::RowGroupStatistics> stats = md->statistics();
+
+        // SQLite is much looser with types than you might expect if you
+        // come from a Postgres background. The constraint '30.0' (that is,
+        // a string containing a floating point number) should be treated
+        // as equal to a field containing an integer 30.
+        //
+        // This means that even if the parquet physical type is integer,
+        // the constraint type may be a string, so dispatch to the filter
+        // fn based on the Parquet type.
+
+        if(op == IsNull) {
+          rv = stats->null_count() > 0;
+        } else if(op == IsNotNull) {
+          rv = stats->num_values() > 0;
+        } else {
+          parquet::Type::type pqType = types[column];
+
+          if(pqType == parquet::Type::BYTE_ARRAY && logicalTypes[column] == parquet::LogicalType::UTF8) {
+            rv = currentRowGroupSatisfiesTextFilter(constraints[i], stats);
+          } else if(pqType == parquet::Type::BYTE_ARRAY) {
+            rv = currentRowGroupSatisfiesBlobFilter(constraints[i], stats);
+          } else if(pqType == parquet::Type::INT32 ||
+                    pqType == parquet::Type::INT64 ||
+                    pqType == parquet::Type::INT96 ||
+                    pqType == parquet::Type::BOOLEAN) {
+            rv = currentRowGroupSatisfiesIntegerFilter(constraints[i], stats);
+          } else if(pqType == parquet::Type::FLOAT || pqType == parquet::Type::DOUBLE) {
+            rv = currentRowGroupSatisfiesDoubleFilter(constraints[i], stats);
+          }
+        }
+      }
+    }
+
+    // and it with the existing actual, which may have come from a previous run
+    rv = rv && constraints[i].bitmap.getActualMembership(rowGroupId);
+    if(!rv) {
+      constraints[i].bitmap.setEstimatedMembership(rowGroupId, rv);
+      constraints[i].bitmap.setActualMembership(rowGroupId, rv);
+      return rv;
+    }
+  }
+
+  return true;
+}
+
+
+// Advance to the next row group that may satisfy the constraints, (re)priming
+// per-column bookkeeping. Returns false when no row groups remain.
+bool ParquetCursor::nextRowGroup() {
+start:
+  // Ensure that rowId points at the start of this rowGroup (eg, in the case where
+  // we skipped an entire row group).
+  rowId = rowGroupStartRowId + rowGroupSize;
+
+  if((rowGroupId + 1) >= numRowGroups) {
+    return false;
+  }
+
+  while(table->getNumColumns() >= scanners.size()) {
+    scanners.push_back(std::shared_ptr<parquet::Scanner>());
+    // If it doesn't exist, it's the rowId as of the last nextRowGroup call
+    colRows.push_back(rowGroupStartRowId);
+    colNulls.push_back(false);
+    colIntValues.push_back(0);
+    colDoubleValues.push_back(0);
+    colByteArrayValues.push_back(parquet::ByteArray());
+  }
+
+
+  rowGroupStartRowId = rowId;
+  rowGroupId++;
+  rowGroupMetadata = reader->metadata()->RowGroup(rowGroupId);
+  rowGroupSize = rowsLeftInRowGroup = rowGroupMetadata->num_rows();
+  rowGroup = reader->RowGroup(rowGroupId);
+  // Scanners are lazily re-created per row group by ensureColumn().
+  for(unsigned int i = 0; i < scanners.size(); i++)
+    scanners[i] = NULL;
+
+  // Grow the type caches if needed; real values are filled in just below.
+  while(types.size() < (unsigned int)rowGroupMetadata->num_columns()) {
+    types.push_back(rowGroupMetadata->schema()->Column(0)->physical_type());
+  }
+
+  while(logicalTypes.size() < (unsigned int)rowGroupMetadata->num_columns()) {
+    logicalTypes.push_back(rowGroupMetadata->schema()->Column(0)->logical_type());
+  }
+
+  for(unsigned int i = 0; i < (unsigned int)rowGroupMetadata->num_columns(); i++) {
+    types[i] = rowGroupMetadata->schema()->Column(i)->physical_type();
+    logicalTypes[i] = rowGroupMetadata->schema()->Column(i)->logical_type();
+  }
+
+  for(unsigned int i = 0; i < colRows.size(); i++) {
+    colRows[i] = rowId;
+  }
+
+  // Increment rowId so currentRowGroupSatisfiesRowIdFilter can access it;
+  // it'll get decremented by our caller
+  rowId++;
+
+  // We're going to scan this row group; reset the expectation of discovering
+  // a row
+  for(unsigned int i = 0; i < constraints.size(); i++) {
+    if(rowGroupId > 0 && constraints[i].rowGroupId == rowGroupId - 1) {
+      constraints[i].bitmap.setActualMembership(rowGroupId - 1, constraints[i].hadRows);
+    }
+    constraints[i].hadRows = false;
+  }
+
+  if(!currentRowGroupSatisfiesFilter())
+    goto start;
+
+  for(unsigned int i = 0; i < constraints.size(); i++) {
+    constraints[i].rowGroupId = rowGroupId;
+  }
+  return true;
+}
+
+// Return true if it is _possible_ that the current
+// row satisfies the constraints. Only return false
+// if it definitely does not.
+//
+// This avoids pointless transitions between the SQLite VM
+// and the extension, which can add up on a dataset of tens
+// of millions of rows.
+bool ParquetCursor::currentRowSatisfiesFilter() {
+  bool overallRv = true;
+  for(unsigned int i = 0; i < constraints.size(); i++) {
+    bool rv = true;
+    int column = constraints[i].column;
+    ensureColumn(column);
+    int op = constraints[i].op;
+
+    if(op == IsNull) {
+      rv = isNull(column);
+    } else if(op == IsNotNull) {
+      rv = !isNull(column);
+    } else {
+
+      if(logicalTypes[column] == parquet::LogicalType::UTF8) {
+        rv = currentRowSatisfiesTextFilter(constraints[i]);
+      } else {
+        parquet::Type::type pqType = types[column];
+        if(pqType == parquet::Type::INT32 ||
+           pqType == parquet::Type::INT64 ||
+           pqType == parquet::Type::INT96 ||
+           pqType == parquet::Type::BOOLEAN) {
+          rv = currentRowSatisfiesIntegerFilter(constraints[i]);
+        } else if(pqType == parquet::Type::FLOAT || pqType == parquet::Type::DOUBLE) {
+          rv = currentRowSatisfiesDoubleFilter(constraints[i]);
+        }
+      }
+    }
+
+    // it defaults to false; so only set it if true
+    // ideally we'd short-circuit if we'd already set this group as visited
+    if(rv) {
+      constraints[i].hadRows = true;
+    }
+    overallRv = overallRv && rv;
+  }
+  return overallRv;
+}
+
+// Advance the cursor to the next row that may satisfy the constraints,
+// crossing row group boundaries as needed; sets rowId past numRows at EOF.
+void ParquetCursor::next() {
+start:
+  if(rowsLeftInRowGroup == 0) {
+    if(!nextRowGroup()) {
+      // put rowId over the edge so eof returns true
+      rowId = numRows + 1;
+      return;
+    } else {
+      // After a successful nextRowGroup, rowId is pointing at the current row. Make it
+      // point before so the rest of the logic works out.
+      rowId--;
+    }
+  }
+
+  rowsLeftInRowGroup--;
+  rowId++;
+  if(constraints.size() > 0 && !currentRowSatisfiesFilter())
+    goto start;
+}
+
+int ParquetCursor::getRowId() {
+  return rowId;
+}
+
+bool ParquetCursor::eof() {
+  return rowId > numRows;
+}
+
+// Materialize column `col` for the current row: lazily create its scanner,
+// skip any rows the scanner hasn't consumed yet, then read the value into
+// colIntValues/colDoubleValues/colByteArrayValues and record null-ness.
+void ParquetCursor::ensureColumn(int col) {
+  // -1 signals rowid, which is trivially available
+  if(col == -1)
+    return;
+
+  // need to ensure a scanner exists (and skip the # of rows in the rowgroup)
+  if(scanners[col].get() == NULL) {
+    std::shared_ptr<parquet::ColumnReader> colReader = rowGroup->Column(col);
+    scanners[col] = parquet::Scanner::Make(colReader);
+  }
+
+  // Actually fetch a value, stash data in colRows, colNulls, colValues
+  if(colRows[col] != rowId) {
+    // We may need to skip some records, eg, a query like
+    // SELECT a WHERE b = 10
+    // may have read b, but skipped a until b matches the predicate.
+    bool wasNull = false;
+    while(colRows[col] + 1 < rowId) {
+      switch(types[col]) {
+        case parquet::Type::INT32:
+        {
+          parquet::Int32Scanner* s = (parquet::Int32Scanner*)scanners[col].get();
+          int rv = 0;
+          s->NextValue(&rv, &wasNull);
+          break;
+        }
+        case parquet::Type::FLOAT:
+        {
+          parquet::FloatScanner* s = (parquet::FloatScanner*)scanners[col].get();
+          float rv = 0;
+          s->NextValue(&rv, &wasNull);
+          break;
+        }
+        case parquet::Type::DOUBLE:
+        {
+          parquet::DoubleScanner* s = (parquet::DoubleScanner*)scanners[col].get();
+          double rv = 0;
+          s->NextValue(&rv, &wasNull);
+          break;
+        }
+        case parquet::Type::BYTE_ARRAY:
+        {
+          parquet::ByteArrayScanner* s = (parquet::ByteArrayScanner*)scanners[col].get();
+          parquet::ByteArray ba;
+          s->NextValue(&ba, &wasNull);
+          break;
+        }
+        case parquet::Type::INT96:
+        {
+          parquet::Int96Scanner* s = (parquet::Int96Scanner*)scanners[col].get();
+          parquet::Int96 rv;
+          s->NextValue(&rv, &wasNull);
+          break;
+        }
+        case parquet::Type::INT64:
+        {
+          parquet::Int64Scanner* s = (parquet::Int64Scanner*)scanners[col].get();
+          int64_t rv = 0;
+          s->NextValue(&rv, &wasNull);
+          break;
+        }
+        case parquet::Type::BOOLEAN:
+        {
+          parquet::BoolScanner* s = (parquet::BoolScanner*)scanners[col].get();
+          bool rv = false;
+          s->NextValue(&rv, &wasNull);
+          break;
+        }
+        case parquet::Type::FIXED_LEN_BYTE_ARRAY:
+        {
+          parquet::FixedLenByteArrayScanner* s = (parquet::FixedLenByteArrayScanner*)scanners[col].get();
+          parquet::FixedLenByteArray flba;
+          s->NextValue(&flba, &wasNull);
+          break;
+        }
+        default:
+          // Should be impossible to get here as we should have forbidden this at
+          // CREATE time -- maybe file changed underneath us?
+          std::ostringstream ss;
+          ss << __FILE__ << ":" << __LINE__ << ": column " << col << " has unsupported type: " <<
+            parquet::TypeToString(types[col]);
+          throw std::invalid_argument(ss.str());
+          break;
+
+      }
+      colRows[col]++;
+    }
+
+    colRows[col] = rowId;
+    wasNull = false;
+
+    bool hadValue = false;
+    switch(types[col]) {
+      case parquet::Type::INT32:
+      {
+        parquet::Int32Scanner* s = (parquet::Int32Scanner*)scanners[col].get();
+        int rv = 0;
+        hadValue = s->NextValue(&rv, &wasNull);
+        colIntValues[col] = rv;
+        break;
+      }
+      case parquet::Type::FLOAT:
+      {
+        parquet::FloatScanner* s = (parquet::FloatScanner*)scanners[col].get();
+        float rv = 0;
+        hadValue = s->NextValue(&rv, &wasNull);
+        colDoubleValues[col] = rv;
+        break;
+      }
+      case parquet::Type::DOUBLE:
+      {
+        parquet::DoubleScanner* s = (parquet::DoubleScanner*)scanners[col].get();
+        double rv = 0;
+        hadValue = s->NextValue(&rv, &wasNull);
+        colDoubleValues[col] = rv;
+        break;
+      }
+      case parquet::Type::BYTE_ARRAY:
+      {
+        parquet::ByteArrayScanner* s = (parquet::ByteArrayScanner*)scanners[col].get();
+        hadValue = s->NextValue(&colByteArrayValues[col], &wasNull);
+        break;
+      }
+      case parquet::Type::INT96:
+      {
+        // INT96 tracks a date with nanosecond precision, convert to ms since epoch.
+        // ...see https://github.com/apache/parquet-format/pull/49 for more
+        //
+        // First 8 bytes: nanoseconds into the day
+        // Last 4 bytes: Julian day
+        // To get nanoseconds since the epoch:
+        // (julian_day - 2440588) * (86400 * 1000 * 1000 * 1000) + nanoseconds
+        parquet::Int96Scanner* s = (parquet::Int96Scanner*)scanners[col].get();
+        parquet::Int96 rv {0, 0, 0};
+        hadValue = s->NextValue(&rv, &wasNull);
+        colIntValues[col] = int96toMsSinceEpoch(rv);
+        break;
+      }
+      case parquet::Type::INT64:
+      {
+        parquet::Int64Scanner* s = (parquet::Int64Scanner*)scanners[col].get();
+        int64_t rv = 0;
+        hadValue = s->NextValue(&rv, &wasNull);
+        colIntValues[col] = rv;
+        break;
+      }
+
+      case parquet::Type::BOOLEAN:
+      {
+        parquet::BoolScanner* s = (parquet::BoolScanner*)scanners[col].get();
+        bool rv = false;
+        hadValue = s->NextValue(&rv, &wasNull);
+        colIntValues[col] = rv ? 1 : 0;
+        break;
+      }
+      case parquet::Type::FIXED_LEN_BYTE_ARRAY:
+      {
+        parquet::FixedLenByteArrayScanner* s = (parquet::FixedLenByteArrayScanner*)scanners[col].get();
+        parquet::FixedLenByteArray flba;
+        hadValue = s->NextValue(&flba, &wasNull);
+        colByteArrayValues[col].ptr = flba.ptr;
+        // TODO: cache this
+        colByteArrayValues[col].len = rowGroupMetadata->schema()->Column(col)->type_length();
+        break;
+      }
+      default:
+        // Should be impossible to get here as we should have forbidden this at
+        // CREATE time -- maybe file changed underneath us?
+        std::ostringstream ss;
+        ss << __FILE__ << ":" << __LINE__ << ": column " << col << " has unsupported type: " <<
+          parquet::TypeToString(types[col]);
+        throw std::invalid_argument(ss.str());
+        break;
+    }
+
+    if(!hadValue)
+      throw std::invalid_argument("unexpectedly lacking a next value");
+
+    colNulls[col] = wasNull;
+  }
+}
+
+bool ParquetCursor::isNull(int col) {
+  // -1 is rowid, which is trivially non null
+  if(col == -1)
+    return false;
+
+  return colNulls[col];
+}
+
+int ParquetCursor::getInt32(int col) {
+  return colIntValues[col];
+}
+
+long ParquetCursor::getInt64(int col) {
+  return colIntValues[col];
+}
+
+double ParquetCursor::getDouble(int col) {
+  return colDoubleValues[col];
+}
+
+parquet::ByteArray* ParquetCursor::getByteArray(int col) {
+  return &colByteArrayValues[col];
+}
+
+parquet::Type::type ParquetCursor::getPhysicalType(int col) {
+  return types[col];
+}
+
+parquet::LogicalType::type ParquetCursor::getLogicalType(int col) {
+  return logicalTypes[col];
+}
+
+void ParquetCursor::close() {
+  if(reader != NULL) {
+    reader->Close();
+  }
+}
+
+// Rewind the cursor and install a fresh set of constraints; reopens the file.
+void ParquetCursor::reset(std::vector<Constraint> constraints) {
+  close();
+  this->constraints = constraints;
+  rowId = 0;
+  // TODO: consider having a long lived handle in ParquetTable that can be borrowed
+  // without incurring the cost of opening the file from scratch twice
+  reader = parquet::ParquetFileReader::OpenFile(
+      table->getFile().data(),
+      true,
+      parquet::default_reader_properties(),
+      table->getMetadata());
+
+  rowGroupId = -1;
+  rowGroupSize = 0;
+  rowGroupStartRowId = 0;
+  // TODO: handle the case where rowgroups have disjoint schemas?
+ // TODO: or at least, fail fast if detected + rowsLeftInRowGroup = 0; + + numRows = reader->metadata()->num_rows(); + numRowGroups = reader->metadata()->num_row_groups(); +} + +ParquetTable* ParquetCursor::getTable() const { return table; } + +unsigned int ParquetCursor::getNumRowGroups() const { return numRowGroups; } +unsigned int ParquetCursor::getNumConstraints() const { return constraints.size(); } +const Constraint& ParquetCursor::getConstraint(unsigned int i) const { return constraints[i]; } + + diff --git a/parquet-windows/parquet_cursor.h b/parquet-windows/parquet_cursor.h new file mode 100644 index 0000000..f7d8c2a --- /dev/null +++ b/parquet-windows/parquet_cursor.h @@ -0,0 +1,73 @@ +#ifndef PARQUET_CURSOR_H +#define PARQUET_CURSOR_H + +#include "parquet_filter.h" +#include "parquet_table.h" +#include "parquet/api/reader.h" + +class ParquetCursor { + + ParquetTable* table; + std::unique_ptr reader; + std::unique_ptr rowGroupMetadata; + std::shared_ptr rowGroup; + std::vector> scanners; + std::vector types; + std::vector logicalTypes; + + std::vector colRows; + std::vector colNulls; + std::vector colIntValues; + std::vector colDoubleValues; + std::vector colByteArrayValues; + + int rowId; + int rowGroupId; + int rowGroupStartRowId; + int rowGroupSize; + int numRows; + int numRowGroups; + int rowsLeftInRowGroup; + + bool nextRowGroup(); + + std::vector constraints; + + bool currentRowSatisfiesFilter(); + bool currentRowGroupSatisfiesFilter(); + bool currentRowGroupSatisfiesRowIdFilter(Constraint& constraint); + bool currentRowGroupSatisfiesTextFilter(Constraint& constraint, std::shared_ptr stats); + bool currentRowGroupSatisfiesBlobFilter(Constraint& constraint, std::shared_ptr stats); + bool currentRowGroupSatisfiesIntegerFilter(Constraint& constraint, std::shared_ptr stats); + bool currentRowGroupSatisfiesDoubleFilter(Constraint& constraint, std::shared_ptr stats); + + bool currentRowSatisfiesTextFilter(Constraint& constraint); + bool 
currentRowSatisfiesIntegerFilter(Constraint& constraint); + bool currentRowSatisfiesDoubleFilter(Constraint& constraint); + + +public: + ParquetCursor(ParquetTable* table); + int getRowId(); + void next(); + void close(); + void reset(std::vector constraints); + bool eof(); + + void ensureColumn(int col); + bool isNull(int col); + unsigned int getNumRowGroups() const; + unsigned int getNumConstraints() const; + const Constraint& getConstraint(unsigned int i) const; + parquet::Type::type getPhysicalType(int col); + parquet::LogicalType::type getLogicalType(int col); + ParquetTable* getTable() const; + + int getInt32(int col); + long getInt64(int col); + double getDouble(int col); + parquet::ByteArray* getByteArray(int col); +}; + +#endif + diff --git a/parquet-windows/parquet_filter.cpp b/parquet-windows/parquet_filter.cpp new file mode 100644 index 0000000..4220dc2 --- /dev/null +++ b/parquet-windows/parquet_filter.cpp @@ -0,0 +1,105 @@ +#include "parquet_filter.h" + +Constraint::Constraint( + RowGroupBitmap bitmap, + int column, + std::string columnName, + ConstraintOperator op, + ValueType type, + int64_t intValue, + double doubleValue, + std::vector blobValue +): bitmap(bitmap), + column(column), + columnName(columnName), + op(op), + type(type), + intValue(intValue), + doubleValue(doubleValue), + blobValue(blobValue), + hadRows(false) { + RowGroupBitmap bm = bitmap; + this->bitmap = bm; + + if(type == Text) { + stringValue = std::string((char*)&blobValue[0], blobValue.size()); + + if(op == Like) { + // This permits more rowgroups than is strictly needed + // since it assumes an implicit wildcard. But it's + // simple to implement, so we'll go with it. 
+ likeStringValue = stringValue; + size_t idx = likeStringValue.find_first_of("%"); + if(idx != std::string::npos) { + likeStringValue = likeStringValue.substr(0, idx); + } + idx = likeStringValue.find_first_of("_"); + if(idx != std::string::npos) { + likeStringValue = likeStringValue.substr(0, idx); + } + } + } +} + +std::string Constraint::describe() const { + std::string rv; + rv.append(columnName); + rv.append(" "); + switch(op) { + case Equal: + rv.append("="); + break; + case GreaterThan: + rv.append(">"); + break; + case LessThanOrEqual: + rv.append("<="); + break; + case LessThan: + rv.append("<"); + break; + case GreaterThanOrEqual: + rv.append(">="); + break; + case Like: + rv.append("LIKE"); + break; + case Glob: + rv.append("GLOB"); + break; + case NotEqual: + rv.append("<>"); + break; + case IsNot: + rv.append("IS NOT"); + break; + case IsNotNull: + rv.append("IS NOT NULL"); + break; + case IsNull: + rv.append("IS NULL"); + break; + case Is: + rv.append("IS"); + break; + } + rv.append(" "); + + switch(type) { + case Null: + rv.append("NULL"); + break; + case Integer: + rv.append(std::to_string(intValue)); + break; + case Double: + rv.append(std::to_string(doubleValue)); + break; + case Blob: + break; + case Text: + rv.append(stringValue); + break; + } + return rv; +} diff --git a/parquet-windows/parquet_filter.h b/parquet-windows/parquet_filter.h new file mode 100644 index 0000000..fd95dbb --- /dev/null +++ b/parquet-windows/parquet_filter.h @@ -0,0 +1,120 @@ +#ifndef PARQUET_FILTER_H +#define PARQUET_FILTER_H + +#include +#include +#include + +enum ConstraintOperator { + Equal, + GreaterThan, + LessThanOrEqual, + LessThan, + GreaterThanOrEqual, + Like, + Glob, + NotEqual, + IsNot, + IsNotNull, + IsNull, + Is +}; + +enum ValueType { + Null, + Integer, + Double, + Blob, + Text +}; + +class RowGroupBitmap { + void setBit(std::vector& membership, unsigned int rowGroup, bool isSet) { + int byte = rowGroup / 8; + int offset = rowGroup % 8; + unsigned char c 
= membership[byte]; + c &= ~(1UL << offset); + if(isSet) { + c |= 1UL << offset; + } + membership[byte] = c; + } +// Compares estimated rowGroupFilter results against observed results +// when we explored the row group. This lets us cache +public: + RowGroupBitmap(unsigned int totalRowGroups) { + // Initialize everything to assume that all row groups match. + // As we discover otherwise, we'll update that assumption. + for(unsigned int i = 0; i < (totalRowGroups + 7) / 8; i++) { + estimatedMembership.push_back(0xFF); + actualMembership.push_back(0xFF); + } + } + + RowGroupBitmap( + std::vector estimatedMembership, + std::vector actualMembership) : + estimatedMembership(estimatedMembership), + actualMembership(actualMembership) { + } + + std::vector estimatedMembership; + std::vector actualMembership; + // Pass false only if definitely does not have rows + void setEstimatedMembership(unsigned int rowGroup, bool hasRows) { + setBit(estimatedMembership, rowGroup, hasRows); + } + + // Pass false only after exhausting all rows + void setActualMembership(unsigned int rowGroup, bool hadRows) { + setBit(actualMembership, rowGroup, hadRows); + } + + bool getActualMembership(unsigned int rowGroup) { + int byte = rowGroup / 8; + int offset = rowGroup % 8; + + return (actualMembership[byte] >> offset) & 1U; + } +}; + +class Constraint { +public: + // Kind of a messy constructor function, but it's just for internal use, so whatever. 
+ Constraint( + RowGroupBitmap bitmap, + int column, + std::string columnName, + ConstraintOperator op, + ValueType type, + int64_t intValue, + double doubleValue, + std::vector blobValue + ); + + RowGroupBitmap bitmap; + int column; // underlying column in the query + std::string columnName; + ConstraintOperator op; + ValueType type; + + int64_t intValue; + double doubleValue; + std::vector blobValue; + // Only set when blobValue is set + std::string stringValue; + + // Only set when stringValue is set and op == Like + std::string likeStringValue; + + // A unique identifier for this constraint, e.g. + // col0 = 'Dawson Creek' + std::string describe() const; + + // This is a temp field used while evaluating if a rowgroup had rows + // that matched this constraint. + int rowGroupId; + bool hadRows; +}; + +#endif diff --git a/parquet-windows/parquet_table.cpp b/parquet-windows/parquet_table.cpp new file mode 100644 index 0000000..4abf886 --- /dev/null +++ b/parquet-windows/parquet_table.cpp @@ -0,0 +1,160 @@ +#include "parquet_table.h" + +#include "parquet/api/reader.h" + +ParquetTable::ParquetTable(std::string file, std::string tableName): file(file), tableName(tableName) { + std::unique_ptr reader = parquet::ParquetFileReader::OpenFile(file.data()); + metadata = reader->metadata(); +} + +std::string ParquetTable::columnName(int i) { + if(i == -1) + return "rowid"; + return columnNames[i]; +} + +unsigned int ParquetTable::getNumColumns() { + return columnNames.size(); +} + + +std::string ParquetTable::CreateStatement() { + std::unique_ptr reader = parquet::ParquetFileReader::OpenFile( + file.data(), + true, + parquet::default_reader_properties(), + metadata); + std::string text("CREATE TABLE x("); + auto schema = reader->metadata()->schema(); + + for(auto i = 0; i < schema->num_columns(); i++) { + auto _col = schema->GetColumnRoot(i); + columnNames.push_back(_col->name()); + } + + for(auto i = 0; i < schema->num_columns(); i++) { + auto _col = 
schema->GetColumnRoot(i); + + if(!_col->is_primitive()) { + std::ostringstream ss; + ss << __FILE__ << ":" << __LINE__ << ": column " << i << " has non-primitive type"; + throw std::invalid_argument(ss.str()); + } + + if(_col->is_repeated()) { + std::ostringstream ss; + ss << __FILE__ << ":" << __LINE__ << ": column " << i << " has non-scalar type"; + throw std::invalid_argument(ss.str()); + } + + parquet::schema::PrimitiveNode* col = (parquet::schema::PrimitiveNode*)_col; + + if(i > 0) + text += ", "; + + text += "\""; + // Horrifically inefficient, but easy to understand. + std::string colName = col->name(); + for(char& c : colName) { + if(c == '"') + text += "\"\""; + else + text += c; + } + text += "\""; + + std::string type; + + parquet::Type::type physical = col->physical_type(); + parquet::LogicalType::type logical = col->logical_type(); + // Be explicit about which types we understand so we don't mislead someone + // whose unsigned ints start getting interpreted as signed. (We could + // support this for UINT_8/16/32 -- and for UINT_64 we could throw if + // the high bit was set.) 
+ if(logical == parquet::LogicalType::NONE || + logical == parquet::LogicalType::UTF8 || + logical == parquet::LogicalType::DATE || + logical == parquet::LogicalType::TIME_MILLIS || + logical == parquet::LogicalType::TIMESTAMP_MILLIS || + logical == parquet::LogicalType::TIME_MICROS || + logical == parquet::LogicalType::TIMESTAMP_MICROS || + logical == parquet::LogicalType::INT_8 || + logical == parquet::LogicalType::INT_16 || + logical == parquet::LogicalType::INT_32 || + logical == parquet::LogicalType::INT_64 || + logical == parquet::LogicalType::DECIMAL) { + switch(physical) { + case parquet::Type::BOOLEAN: + type = "TINYINT"; + break; + case parquet::Type::INT32: + if(logical == parquet::LogicalType::NONE || + logical == parquet::LogicalType::INT_32) { + type = "INT"; + } else if(logical == parquet::LogicalType::INT_8) { + type = "TINYINT"; + } else if(logical == parquet::LogicalType::INT_16) { + type = "SMALLINT"; + } + break; + case parquet::Type::INT96: + // INT96 is used for nanosecond precision on timestamps; we truncate + // to millisecond precision. 
+      case parquet::Type::INT64:
+        type = "BIGINT";
+        break;
+      case parquet::Type::FLOAT:
+        type = "REAL";
+        break;
+      case parquet::Type::DOUBLE:
+        type = "DOUBLE";
+        break;
+      case parquet::Type::BYTE_ARRAY:
+        if(logical == parquet::LogicalType::UTF8) {
+          type = "TEXT";
+        } else {
+          type = "BLOB";
+        }
+        break;
+      case parquet::Type::FIXED_LEN_BYTE_ARRAY:
+        if(logical == parquet::LogicalType::DECIMAL){
+          type = "DECIMAL";
+        }else{
+          type = "BLOB";
+        }
+        break;
+      default:
+        break;
+    }
+  }
+
+  if(type.empty()) {
+    std::ostringstream ss;
+    ss << __FILE__ << ":" << __LINE__ << ": column " << i << " has unsupported type: " <<
+      parquet::TypeToString(physical) << "/" << parquet::LogicalTypeToString(logical);
+
+    throw std::invalid_argument(ss.str());
+  }
+
+#ifdef DEBUG
+  printf("col %d[name=%s, p=%d:%s, l=%d:%s] is %s\n",
+      i,
+      col->name().data(),
+      col->physical_type(),
+      parquet::TypeToString(col->physical_type()).data(),
+      col->logical_type(),
+      parquet::LogicalTypeToString(col->logical_type()).data(),
+      type.data());
+#endif
+
+    text += " ";
+    text += type;
+  }
+  text +=");";
+  return text;
+}
+
+std::shared_ptr<parquet::FileMetaData> ParquetTable::getMetadata() { return metadata; }
+
+const std::string& ParquetTable::getFile() { return file; }
+const std::string& ParquetTable::getTableName() { return tableName; }
diff --git a/parquet-windows/parquet_table.h b/parquet-windows/parquet_table.h
new file mode 100644
index 0000000..6b35cae
--- /dev/null
+++ b/parquet-windows/parquet_table.h
@@ -0,0 +1,25 @@
+#ifndef PARQUET_TABLE_H
+#define PARQUET_TABLE_H
+
+#include <string>
+#include <vector>
+#include "parquet/api/reader.h"
+
+class ParquetTable {
+  std::string file;
+  std::string tableName;
+  std::vector<std::string> columnNames;
+  std::shared_ptr<parquet::FileMetaData> metadata;
+
+
+public:
+  ParquetTable(std::string file, std::string tableName);
+  std::string CreateStatement();
+  std::string columnName(int idx);
+  unsigned int getNumColumns();
+  std::shared_ptr<parquet::FileMetaData> getMetadata();
+  const std::string& getFile();
+  const std::string& getTableName();
+}; + +#endif