mirror of
https://github.com/cldellow/sqlite-parquet-vtable.git
synced 2025-09-14 22:39:59 +00:00
Move source files to a more conventional layout
This commit is contained in:
2
src/.gitignore
vendored
Normal file
2
src/.gitignore
vendored
Normal file
@@ -0,0 +1,2 @@
|
||||
cmds.txt
|
||||
go
|
760
src/parquet.cc
Normal file
760
src/parquet.cc
Normal file
@@ -0,0 +1,760 @@
|
||||
/*
|
||||
* This file contains the implementation of an SQLite virtual table for
|
||||
* reading Parquet files.
|
||||
*
|
||||
* Usage:
|
||||
*
|
||||
* .load ./parquet
|
||||
* CREATE VIRTUAL TABLE demo USING parquet(FILENAME);
|
||||
* SELECT * FROM demo;
|
||||
*
|
||||
*/
|
||||
#include <sqlite3ext.h>
SQLITE_EXTENSION_INIT1
#include <assert.h>
#include <ctype.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>

#include <cstdint>
#include <iomanip>
#include <memory>
#include <sstream>
#include <string>
#include <vector>

#include "parquet_table.h"
#include "parquet_cursor.h"
#include "parquet_filter.h"

//#define DEBUG
|
||||
|
||||
/* Forward references to the various virtual table methods implemented
 * in this file. They are gathered into ParquetModule at the bottom of
 * the file and registered in sqlite3_parquet_init. */
static int parquetCreate(sqlite3*, void*, int, const char*const*,
                         sqlite3_vtab**,char**);
static int parquetConnect(sqlite3*, void*, int, const char*const*,
                          sqlite3_vtab**,char**);
static int parquetBestIndex(sqlite3_vtab*,sqlite3_index_info*);
static int parquetDisconnect(sqlite3_vtab*);
static int parquetDestroy(sqlite3_vtab*);
static int parquetOpen(sqlite3_vtab*, sqlite3_vtab_cursor**);
static int parquetClose(sqlite3_vtab_cursor*);
static int parquetFilter(sqlite3_vtab_cursor*, int idxNum, const char *idxStr,
                         int argc, sqlite3_value **argv);
static int parquetNext(sqlite3_vtab_cursor*);
static int parquetEof(sqlite3_vtab_cursor*);
static int parquetColumn(sqlite3_vtab_cursor*,sqlite3_context*,int);
static int parquetRowid(sqlite3_vtab_cursor*,sqlite3_int64*);
|
||||
|
||||
/* An instance of the Parquet virtual table */
typedef struct sqlite3_vtab_parquet {
  sqlite3_vtab base;    /* Base class.  Must be first */
  ParquetTable* table;  /* Backing ParquetTable; owned — freed in parquetDisconnect */
  sqlite3* db;          /* Owning connection; used to maintain the _<name>_rowgroups shadow table */
} sqlite3_vtab_parquet;
|
||||
|
||||
|
||||
/* A cursor for the Parquet virtual table */
typedef struct sqlite3_vtab_cursor_parquet {
  sqlite3_vtab_cursor base;  /* Base class.  Must be first */
  ParquetCursor* cursor;     /* Owned scan state; deleted in parquetClose */
} sqlite3_vtab_cursor_parquet;
|
||||
|
||||
/*
** xDestroy: called on DROP TABLE.
**
** Clean up our shadow table. This is useful if the user has recreated
** the parquet file, and our mappings would now be invalid.
**
** BUG FIX: xDestroy replaces xDisconnect on DROP, so it must also free
** the vtab; previously the ParquetTable and the vtab allocation leaked.
*/
static int parquetDestroy(sqlite3_vtab *pVtab) {
  sqlite3_vtab_parquet *p = (sqlite3_vtab_parquet*)pVtab;

  std::string drop = "DROP TABLE IF EXISTS _";
  drop.append(p->table->getTableName());
  drop.append("_rowgroups");
  int rv = sqlite3_exec(p->db, drop.data(), 0, 0, 0);
  if(rv != 0)
    return rv;

  // Release the in-memory vtab exactly as xDisconnect would.
  return parquetDisconnect(pVtab);
}
|
||||
|
||||
/*
|
||||
** This method is the destructor fo a sqlite3_vtab_parquet object.
|
||||
*/
|
||||
static int parquetDisconnect(sqlite3_vtab *pVtab){
|
||||
sqlite3_vtab_parquet *p = (sqlite3_vtab_parquet*)pVtab;
|
||||
delete p->table;
|
||||
sqlite3_free(p);
|
||||
return SQLITE_OK;
|
||||
}
|
||||
|
||||
/*
** xConnect: attach to a parquet file.
**
** argv[3] is the single module argument: the path to the parquet file,
** still wrapped in the single quotes the user typed (hence the minimum
** length of 2 and the substr below).
*/
static int parquetConnect(
  sqlite3 *db,
  void *pAux,
  int argc,
  const char *const*argv,
  sqlite3_vtab **ppVtab,
  char **pzErr
){
  try {
    if(argc != 4 || strlen(argv[3]) < 2) {
      *pzErr = sqlite3_mprintf("must provide exactly one argument, the path to a parquet file");
      return SQLITE_ERROR;
    }

    std::string tableName = argv[2];
    // Remove the delimiting single quotes
    std::string fname = argv[3];
    fname = fname.substr(1, fname.length() - 2);
    std::unique_ptr<sqlite3_vtab_parquet, void(*)(void*)> vtab(
        (sqlite3_vtab_parquet*)sqlite3_malloc(sizeof(sqlite3_vtab_parquet)),
        sqlite3_free);
    // BUG FIX: sqlite3_malloc returns NULL on OOM; previously this was
    // passed straight to memset.
    if(vtab.get() == NULL)
      return SQLITE_NOMEM;
    memset(vtab.get(), 0, sizeof(*vtab.get()));

    try {
      std::unique_ptr<ParquetTable> table(new ParquetTable(fname, tableName));

      std::string create = table->CreateStatement();
      int rc = sqlite3_declare_vtab(db, create.data());
      if(rc)
        return rc;

      vtab->table = table.release();
      vtab->db = db;
      *ppVtab = (sqlite3_vtab*)vtab.release();
      return SQLITE_OK;
    } catch (const std::exception& e) {
      // BUG FIX: e.what() (which may contain a filename with '%') was used
      // as the printf-style format string itself — a format-string bug.
      *pzErr = sqlite3_mprintf("%s", e.what());
      return SQLITE_ERROR;
    }
  } catch(std::bad_alloc& ba) {
    return SQLITE_NOMEM;
  } catch(std::exception& e) {
    return SQLITE_ERROR;
  }
}
|
||||
|
||||
/*
|
||||
** The xConnect and xCreate methods do the same thing, but they must be
|
||||
** different so that the virtual table is not an eponymous virtual table.
|
||||
*/
|
||||
static int parquetCreate(
|
||||
sqlite3 *db,
|
||||
void *pAux,
|
||||
int argc, const char *const*argv,
|
||||
sqlite3_vtab **ppVtab,
|
||||
char **pzErr
|
||||
){
|
||||
try {
|
||||
// Create shadow table for storing constraint -> rowid mappings
|
||||
std::string create = "CREATE TABLE IF NOT EXISTS _";
|
||||
create.append(argv[2]);
|
||||
create.append("_rowgroups(clause TEXT, estimate BLOB, actual BLOB)");
|
||||
int rv = sqlite3_exec(db, create.data(), 0, 0, 0);
|
||||
if(rv != 0)
|
||||
return rv;
|
||||
|
||||
create = "CREATE UNIQUE INDEX IF NOT EXISTS _";
|
||||
create.append(argv[2]);
|
||||
create.append("_index ON _");
|
||||
create.append(argv[2]);
|
||||
create.append("_rowgroups(clause)");
|
||||
rv = sqlite3_exec(db, create.data(), 0, 0, 0);
|
||||
|
||||
return parquetConnect(db, pAux, argc, argv, ppVtab, pzErr);
|
||||
} catch (std::bad_alloc& ba) {
|
||||
return SQLITE_NOMEM;
|
||||
}
|
||||
}
|
||||
|
||||
/*
 * Render a byte vector as a SQLite hex blob literal,
 * e.g. {0x00, 0xAB} -> X'00ab'.
 */
std::string quoteBlob(const std::vector<unsigned char>& bytes) {
  std::ostringstream out;
  out << "X'" << std::hex << std::setfill('0');
  for(const unsigned char b : bytes) {
    // setw is not sticky, so it is reapplied for every byte.
    out << std::setw(2) << (unsigned int)b;
  }
  out << "'";
  return out.str();
}
|
||||
|
||||
/*
 * Write each constraint's observed row-group membership back to the
 * shadow table when it differs from the stored estimate, so later
 * queries start from better estimates. Best-effort only: all failures
 * are silently ignored.
 */
void persistConstraints(sqlite3* db, ParquetCursor* cursor) {
  for(unsigned int i = 0; i < cursor->getNumConstraints(); i++) {
    const Constraint& constraint = cursor->getConstraint(i);
    const std::vector<unsigned char>& estimated = constraint.bitmap.estimatedMembership;
    const std::vector<unsigned char>& actual = constraint.bitmap.actualMembership;
    // Nothing new was learned for this constraint; skip the write.
    if(estimated == actual)
      continue;

    std::string desc = constraint.describe();
    std::string estimatedStr = quoteBlob(estimated);
    std::string actualStr = quoteBlob(actual);

    // This is only advisory, so ignore failures.
    char* sql = sqlite3_mprintf(
      "INSERT OR REPLACE INTO _%s_rowgroups(clause, estimate, actual) VALUES ('%q', %s, %s)",
      cursor->getTable()->getTableName().c_str(),
      desc.c_str(),
      estimatedStr.c_str(),
      actualStr.c_str());

    if(sql == NULL)
      return;

    sqlite3_exec(db, sql, 0, 0, 0);
    sqlite3_free(sql);
  }
}
|
||||
|
||||
|
||||
/*
|
||||
** Destructor for a sqlite3_vtab_cursor_parquet.
|
||||
*/
|
||||
static int parquetClose(sqlite3_vtab_cursor *cur){
|
||||
sqlite3_vtab_cursor_parquet* vtab_cursor_parquet = (sqlite3_vtab_cursor_parquet*)cur;
|
||||
vtab_cursor_parquet->cursor->close();
|
||||
delete vtab_cursor_parquet->cursor;
|
||||
sqlite3_free(cur);
|
||||
return SQLITE_OK;
|
||||
}
|
||||
|
||||
/*
|
||||
** Constructor for a new sqlite3_vtab_parquet cursor object.
|
||||
*/
|
||||
static int parquetOpen(sqlite3_vtab *p, sqlite3_vtab_cursor **ppCursor){
|
||||
try {
|
||||
std::unique_ptr<sqlite3_vtab_cursor_parquet, void(*)(void*)> cursor(
|
||||
(sqlite3_vtab_cursor_parquet*)sqlite3_malloc(sizeof(sqlite3_vtab_cursor_parquet)),
|
||||
sqlite3_free);
|
||||
memset(cursor.get(), 0, sizeof(*cursor.get()));
|
||||
|
||||
sqlite3_vtab_parquet* pParquet = (sqlite3_vtab_parquet*)p;
|
||||
cursor->cursor = new ParquetCursor(pParquet->table);
|
||||
|
||||
*ppCursor = (sqlite3_vtab_cursor*)cursor.release();
|
||||
return SQLITE_OK;
|
||||
} catch(std::bad_alloc& ba) {
|
||||
return SQLITE_NOMEM;
|
||||
} catch(std::exception& e) {
|
||||
return SQLITE_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
** Advance a sqlite3_vtab_cursor_parquet to its next row of input.
|
||||
** Set the EOF marker if we reach the end of input.
|
||||
*/
|
||||
static int parquetNext(sqlite3_vtab_cursor *cur){
|
||||
try {
|
||||
sqlite3_vtab_cursor_parquet* vtab_cursor_parquet = (sqlite3_vtab_cursor_parquet*)cur;
|
||||
ParquetCursor* cursor = vtab_cursor_parquet->cursor;
|
||||
cursor->next();
|
||||
return SQLITE_OK;
|
||||
} catch(std::bad_alloc& ba) {
|
||||
return SQLITE_NOMEM;
|
||||
} catch(std::exception& e) {
|
||||
return SQLITE_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
** Return values of columns for the row at which the sqlite3_vtab_cursor_parquet
|
||||
** is currently pointing.
|
||||
*/
|
||||
static int parquetColumn(
|
||||
sqlite3_vtab_cursor *cur, /* The cursor */
|
||||
sqlite3_context *ctx, /* First argument to sqlite3_result_...() */
|
||||
int col /* Which column to return */
|
||||
){
|
||||
try {
|
||||
ParquetCursor *cursor = ((sqlite3_vtab_cursor_parquet*)cur)->cursor;
|
||||
cursor->ensureColumn(col);
|
||||
|
||||
if(cursor->isNull(col)) {
|
||||
sqlite3_result_null(ctx);
|
||||
} else {
|
||||
switch(cursor->getPhysicalType(col)) {
|
||||
case parquet::Type::BOOLEAN:
|
||||
case parquet::Type::INT32:
|
||||
{
|
||||
int rv = cursor->getInt32(col);
|
||||
sqlite3_result_int(ctx, rv);
|
||||
break;
|
||||
}
|
||||
case parquet::Type::FLOAT:
|
||||
case parquet::Type::DOUBLE:
|
||||
{
|
||||
double rv = cursor->getDouble(col);
|
||||
sqlite3_result_double(ctx, rv);
|
||||
break;
|
||||
}
|
||||
case parquet::Type::BYTE_ARRAY:
|
||||
{
|
||||
parquet::ByteArray* rv = cursor->getByteArray(col);
|
||||
if(cursor->getLogicalType(col) == parquet::LogicalType::UTF8) {
|
||||
sqlite3_result_text(ctx, (const char*)rv->ptr, rv->len, SQLITE_TRANSIENT);
|
||||
} else {
|
||||
sqlite3_result_blob(ctx, (void*)rv->ptr, rv->len, SQLITE_TRANSIENT);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case parquet::Type::INT96:
|
||||
// This type exists to store timestamps in nanoseconds due to legacy
|
||||
// reasons. We just interpret it as a timestamp in milliseconds.
|
||||
case parquet::Type::INT64:
|
||||
{
|
||||
long rv = cursor->getInt64(col);
|
||||
sqlite3_result_int64(ctx, rv);
|
||||
break;
|
||||
}
|
||||
case parquet::Type::FIXED_LEN_BYTE_ARRAY:
|
||||
{
|
||||
parquet::ByteArray* rv = cursor->getByteArray(col);
|
||||
sqlite3_result_blob(ctx, (void*)rv->ptr, rv->len, SQLITE_TRANSIENT);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
// Should be impossible to get here as we should have forbidden this at
|
||||
// CREATE time -- maybe file changed underneath us?
|
||||
std::ostringstream ss;
|
||||
ss << __FILE__ << ":" << __LINE__ << ": column " << col << " has unsupported type: " <<
|
||||
parquet::TypeToString(cursor->getPhysicalType(col));
|
||||
|
||||
throw std::invalid_argument(ss.str());
|
||||
break;
|
||||
}
|
||||
}
|
||||
return SQLITE_OK;
|
||||
} catch(std::bad_alloc& ba) {
|
||||
return SQLITE_NOMEM;
|
||||
} catch(std::exception& e) {
|
||||
return SQLITE_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
** Return the rowid for the current row.
|
||||
*/
|
||||
static int parquetRowid(sqlite3_vtab_cursor *cur, sqlite_int64 *pRowid){
|
||||
ParquetCursor *cursor = ((sqlite3_vtab_cursor_parquet*)cur)->cursor;
|
||||
*pRowid = cursor->getRowId();
|
||||
return SQLITE_OK;
|
||||
}
|
||||
|
||||
/*
|
||||
** Return TRUE if the cursor has been moved off of the last
|
||||
** row of output.
|
||||
*/
|
||||
static int parquetEof(sqlite3_vtab_cursor *cur){
|
||||
ParquetCursor* cursor = ((sqlite3_vtab_cursor_parquet*)cur)->cursor;
|
||||
if(cursor->eof()) {
|
||||
sqlite3_vtab_cursor_parquet* vtab_cursor_parquet = (sqlite3_vtab_cursor_parquet*)cur;
|
||||
sqlite3_vtab_parquet* vtab_parquet = (sqlite3_vtab_parquet*)(vtab_cursor_parquet->base.pVtab);
|
||||
persistConstraints(vtab_parquet->db, cursor);
|
||||
return 1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
#ifdef DEBUG
|
||||
const char* opName(int op) {
|
||||
switch(op) {
|
||||
case SQLITE_INDEX_CONSTRAINT_EQ:
|
||||
return "=";
|
||||
case SQLITE_INDEX_CONSTRAINT_GT:
|
||||
return ">";
|
||||
case SQLITE_INDEX_CONSTRAINT_LE:
|
||||
return "<=";
|
||||
case SQLITE_INDEX_CONSTRAINT_LT:
|
||||
return "<";
|
||||
case SQLITE_INDEX_CONSTRAINT_GE:
|
||||
return ">=";
|
||||
case SQLITE_INDEX_CONSTRAINT_MATCH:
|
||||
return "match";
|
||||
case SQLITE_INDEX_CONSTRAINT_LIKE:
|
||||
return "LIKE";
|
||||
case SQLITE_INDEX_CONSTRAINT_GLOB:
|
||||
return "GLOB";
|
||||
case SQLITE_INDEX_CONSTRAINT_REGEXP:
|
||||
return "REGEXP";
|
||||
case SQLITE_INDEX_CONSTRAINT_NE:
|
||||
return "!=";
|
||||
case SQLITE_INDEX_CONSTRAINT_ISNOT:
|
||||
return "IS NOT";
|
||||
case SQLITE_INDEX_CONSTRAINT_ISNOTNULL:
|
||||
return "IS NOT NULL";
|
||||
case SQLITE_INDEX_CONSTRAINT_ISNULL:
|
||||
return "IS NULL";
|
||||
case SQLITE_INDEX_CONSTRAINT_IS:
|
||||
return "IS";
|
||||
default:
|
||||
return "unknown";
|
||||
}
|
||||
}
|
||||
|
||||
void debugConstraints(sqlite3_index_info *pIdxInfo, ParquetTable *table, int argc, sqlite3_value** argv) {
|
||||
printf("debugConstraints, argc=%d\n", argc);
|
||||
int j = 0;
|
||||
for(int i = 0; i < pIdxInfo->nConstraint; i++) {
|
||||
std::string valueStr = "?";
|
||||
if(argv != NULL && pIdxInfo->aConstraint[i].usable) {
|
||||
int type = sqlite3_value_type(argv[j]);
|
||||
switch(type) {
|
||||
case SQLITE_INTEGER:
|
||||
{
|
||||
sqlite3_int64 rv = sqlite3_value_int64(argv[j]);
|
||||
std::ostringstream ss;
|
||||
ss << rv;
|
||||
valueStr = ss.str();
|
||||
break;
|
||||
}
|
||||
case SQLITE_FLOAT:
|
||||
{
|
||||
double rv = sqlite3_value_double(argv[j]);
|
||||
std::ostringstream ss;
|
||||
ss << rv;
|
||||
valueStr = ss.str();
|
||||
break;
|
||||
}
|
||||
case SQLITE_TEXT:
|
||||
{
|
||||
const unsigned char* rv = sqlite3_value_text(argv[j]);
|
||||
std::ostringstream ss;
|
||||
ss << "'" << rv << "'";
|
||||
valueStr = ss.str();
|
||||
break;
|
||||
}
|
||||
case SQLITE_BLOB:
|
||||
{
|
||||
int sizeBytes = sqlite3_value_bytes(argv[j]);
|
||||
std::ostringstream ss;
|
||||
ss << "'..." << sizeBytes << "-byte blob...'";
|
||||
valueStr = ss.str();
|
||||
break;
|
||||
}
|
||||
case SQLITE_NULL:
|
||||
{
|
||||
valueStr = "NULL";
|
||||
break;
|
||||
}
|
||||
}
|
||||
j++;
|
||||
}
|
||||
printf(" constraint %d: col %s %s %s, usable %d\n",
|
||||
i,
|
||||
table->columnName(pIdxInfo->aConstraint[i].iColumn).data(),
|
||||
opName(pIdxInfo->aConstraint[i].op),
|
||||
valueStr.data(),
|
||||
pIdxInfo->aConstraint[i].usable);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
/*
 * Map an SQLITE_INDEX_CONSTRAINT_* opcode onto our ConstraintOperator
 * enum. Opcodes we cannot evaluate (e.g. MATCH, REGEXP) fall through the
 * switch and raise std::invalid_argument, which callers translate into
 * SQLITE_ERROR.
 */
ConstraintOperator constraintOperatorFromSqlite(int op) {
  switch(op) {
    case SQLITE_INDEX_CONSTRAINT_EQ:
      return Equal;
    case SQLITE_INDEX_CONSTRAINT_GT:
      return GreaterThan;
    case SQLITE_INDEX_CONSTRAINT_LE:
      return LessThanOrEqual;
    case SQLITE_INDEX_CONSTRAINT_LT:
      return LessThan;
    case SQLITE_INDEX_CONSTRAINT_GE:
      return GreaterThanOrEqual;
    case SQLITE_INDEX_CONSTRAINT_LIKE:
      return Like;
    case SQLITE_INDEX_CONSTRAINT_GLOB:
      return Glob;
    case SQLITE_INDEX_CONSTRAINT_NE:
      return NotEqual;
    case SQLITE_INDEX_CONSTRAINT_ISNOT:
      return IsNot;
    case SQLITE_INDEX_CONSTRAINT_ISNOTNULL:
      return IsNotNull;
    case SQLITE_INDEX_CONSTRAINT_ISNULL:
      return IsNull;
    case SQLITE_INDEX_CONSTRAINT_IS:
      return Is;
  }

  // Unsupported opcode: report where and what.
  std::ostringstream ss;
  ss << __FILE__ << ":" << __LINE__ << ": operator " << op << " is unsupported";
  throw std::invalid_argument(ss.str());
}
|
||||
|
||||
/*
 * Fetch the persisted row-group bitmap for `clause` from the
 * _<table>_rowgroups shadow table. Returns an empty vector when there is
 * no stored bitmap or on any SQLite error (the caller treats that as
 * "no prior knowledge").
 */
std::vector<unsigned char> getRowGroupsForClause(sqlite3* db, std::string table, std::string clause) {
  std::vector<unsigned char> rv;

  std::unique_ptr<char, void(*)(void*)> sql(sqlite3_mprintf(
      "SELECT actual FROM _%s_rowgroups WHERE clause = '%q'",
      table.c_str(),
      clause.c_str()), sqlite3_free);

  if(sql.get() == NULL)
    return rv;

  sqlite3_stmt* pStmtRaw = NULL;
  int rc = sqlite3_prepare_v2(db, sql.get(), -1, &pStmtRaw, NULL);
  if(rc != 0)
    return rv;
  // BUG FIX: own the statement so it is finalized even if populating rv
  // throws std::bad_alloc (the leak noted in the original TODO).
  std::unique_ptr<sqlite3_stmt, int(*)(sqlite3_stmt*)> pStmt(pStmtRaw, sqlite3_finalize);

  rc = sqlite3_step(pStmt.get());
  if(rc == SQLITE_ROW) {
    int size = sqlite3_column_bytes(pStmt.get(), 0);
    const unsigned char* blob = (const unsigned char*)sqlite3_column_blob(pStmt.get(), 0);
    if(blob != NULL)
      rv.assign(blob, blob + size);
  }

  return rv;
}
|
||||
|
||||
|
||||
/*
** Begin a scan. idxStr carries the copy of sqlite3_index_info made in
** parquetBestIndex; each usable constraint in it, paired with its value
** from argv, is turned into a Constraint object, seeded with any
** previously persisted row-group bitmap, and handed to the cursor.
** Finally the cursor is positioned on the first row (or EOF).
*/
static int parquetFilter(
  sqlite3_vtab_cursor *cur,
  int idxNum,
  const char *idxStr,
  int argc,
  sqlite3_value **argv
){
  try {
    sqlite3_vtab_cursor_parquet* vtab_cursor_parquet = (sqlite3_vtab_cursor_parquet*)cur;
    sqlite3_vtab_parquet* vtab_parquet = (sqlite3_vtab_parquet*)(vtab_cursor_parquet->base.pVtab);
    sqlite3* db = vtab_parquet->db;
    ParquetCursor* cursor = vtab_cursor_parquet->cursor;
    // idxStr is not a string: parquetBestIndex smuggles the duplicated
    // index-info struct through it.
    sqlite3_index_info* indexInfo = (sqlite3_index_info*)idxStr;

#ifdef DEBUG
    struct timeval tv;
    gettimeofday(&tv, NULL);
    unsigned long long millisecondsSinceEpoch =
      (unsigned long long)(tv.tv_sec) * 1000 +
      (unsigned long long)(tv.tv_usec) / 1000;

    printf("%llu xFilter: idxNum=%d, idxStr=%lu, argc=%d\n", millisecondsSinceEpoch, idxNum, (long unsigned int)idxStr, argc);
    debugConstraints(indexInfo, cursor->getTable(), argc, argv);
#endif
    std::vector<Constraint> constraints;
    // j indexes argv: SQLite supplies values only for constraints whose
    // argvIndex was set in parquetBestIndex (i.e. the usable ones), in order.
    int j = 0;
    for(int i = 0; i < indexInfo->nConstraint; i++) {
      if(!indexInfo->aConstraint[i].usable) {
        continue;
      }

      ValueType type = Null;
      int64_t intValue = 0;
      double doubleValue = 0;
      std::vector<unsigned char> blobValue;
      int sqliteType = sqlite3_value_type(argv[j]);

      // Capture the constraint's value in whichever slot matches its
      // SQLite type; TEXT is stored as raw bytes in blobValue too.
      if(sqliteType == SQLITE_INTEGER) {
        type = Integer;
        intValue = sqlite3_value_int64(argv[j]);
      } else if(sqliteType == SQLITE_FLOAT) {
        type = Double;
        doubleValue = sqlite3_value_double(argv[j]);
      } else if(sqliteType == SQLITE_TEXT) {
        type = Text;
        int len = sqlite3_value_bytes(argv[j]);
        const unsigned char* ptr = sqlite3_value_text(argv[j]);
        for(int k = 0; k < len; k++) {
          blobValue.push_back(ptr[k]);
        }
      } else if(sqliteType == SQLITE_BLOB) {
        type = Blob;
        int len = sqlite3_value_bytes(argv[j]);
        const unsigned char* ptr = (const unsigned char*)sqlite3_value_blob(argv[j]);
        for(int k = 0; k < len; k++) {
          blobValue.push_back(ptr[k]);
        }
      } else if(sqliteType == SQLITE_NULL) {
        type = Null;
      }

      // iColumn == -1 denotes a constraint on the rowid.
      std::string columnName = "rowid";
      if(indexInfo->aConstraint[i].iColumn >= 0) {
        columnName = cursor->getTable()->columnName(indexInfo->aConstraint[i].iColumn);
      }

      // Build a throwaway Constraint just to compute its describe() key,
      // then look up any previously persisted row-group bitmap under it.
      RowGroupBitmap bitmap = RowGroupBitmap(cursor->getNumRowGroups());
      Constraint dummy(
        bitmap,
        indexInfo->aConstraint[i].iColumn,
        columnName,
        constraintOperatorFromSqlite(indexInfo->aConstraint[i].op),
        type,
        intValue,
        doubleValue,
        blobValue);

      std::vector<unsigned char> actual = getRowGroupsForClause(db, cursor->getTable()->getTableName(), dummy.describe());
      if(actual.size() > 0) {
        // Initialize the estimate to be the actual -- eventually they'll converge
        // and we'll stop writing back to the db.
        std::vector<unsigned char> estimate = actual;
        bitmap = RowGroupBitmap(estimate, actual);
      }

      Constraint constraint(
        bitmap,
        indexInfo->aConstraint[i].iColumn,
        columnName,
        constraintOperatorFromSqlite(indexInfo->aConstraint[i].op),
        type,
        intValue,
        doubleValue,
        blobValue);

      constraints.push_back(constraint);
      j++;
    }
    cursor->reset(constraints);
    // Advance onto the first row; EOF is detected via parquetEof.
    return parquetNext(cur);
  } catch(std::bad_alloc& ba) {
    return SQLITE_NOMEM;
  } catch(std::exception& e) {
    return SQLITE_ERROR;
  }
}
|
||||
|
||||
/*
|
||||
* We'll always indicate to SQLite that we prefer it to use an index so that it will
|
||||
* pass additional context to xFilter, which we may or may not use.
|
||||
*
|
||||
* We copy the sqlite3_index_info structure, as is, into idxStr for later use.
|
||||
*/
|
||||
static int parquetBestIndex(
|
||||
sqlite3_vtab *tab,
|
||||
sqlite3_index_info *pIdxInfo
|
||||
){
|
||||
try {
|
||||
|
||||
#ifdef DEBUG
|
||||
struct timeval tv;
|
||||
gettimeofday(&tv, NULL);
|
||||
unsigned long long millisecondsSinceEpoch =
|
||||
(unsigned long long)(tv.tv_sec) * 1000 +
|
||||
(unsigned long long)(tv.tv_usec) / 1000;
|
||||
|
||||
|
||||
ParquetTable* table = ((sqlite3_vtab_parquet*)tab)->table;
|
||||
printf("%llu xBestIndex: nConstraint=%d, nOrderBy=%d\n", millisecondsSinceEpoch, pIdxInfo->nConstraint, pIdxInfo->nOrderBy);
|
||||
debugConstraints(pIdxInfo, table, 0, NULL);
|
||||
#endif
|
||||
// We traverse in rowid ascending order, so if they're asking for it to be ordered like that,
|
||||
// we can tell SQLite that it's guaranteed. This speeds up some DB viewer utilities that
|
||||
// use rowids for pagination.
|
||||
if(pIdxInfo->nOrderBy == 1 && pIdxInfo->aOrderBy[0].iColumn == -1 && pIdxInfo->aOrderBy[0].desc == 0)
|
||||
pIdxInfo->orderByConsumed = 1;
|
||||
|
||||
if(pIdxInfo->nConstraint == 0) {
|
||||
pIdxInfo->estimatedCost = 1000000000000;
|
||||
pIdxInfo->idxNum = 0;
|
||||
} else {
|
||||
pIdxInfo->estimatedCost = 1;
|
||||
pIdxInfo->idxNum = 1;
|
||||
int j = 0;
|
||||
|
||||
for(int i = 0; i < pIdxInfo->nConstraint; i++) {
|
||||
if(pIdxInfo->aConstraint[i].usable) {
|
||||
j++;
|
||||
pIdxInfo->aConstraintUsage[i].argvIndex = j;
|
||||
// pIdxInfo->aConstraintUsage[i].omit = 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
size_t dupeSize = sizeof(sqlite3_index_info) +
|
||||
//pIdxInfo->nConstraint * sizeof(sqlite3_index_constraint) +
|
||||
pIdxInfo->nConstraint * sizeof(sqlite3_index_info::sqlite3_index_constraint) +
|
||||
pIdxInfo->nOrderBy * sizeof(sqlite3_index_info::sqlite3_index_orderby) +
|
||||
pIdxInfo->nConstraint * sizeof(sqlite3_index_info::sqlite3_index_constraint_usage);
|
||||
sqlite3_index_info* dupe = (sqlite3_index_info*)sqlite3_malloc(dupeSize);
|
||||
pIdxInfo->idxStr = (char*)dupe;
|
||||
pIdxInfo->needToFreeIdxStr = 1;
|
||||
|
||||
memset(dupe, 0, dupeSize);
|
||||
memcpy(dupe, pIdxInfo, sizeof(sqlite3_index_info));
|
||||
|
||||
dupe->aConstraint = (sqlite3_index_info::sqlite3_index_constraint*)((char*)dupe + sizeof(sqlite3_index_info));
|
||||
dupe->aOrderBy = (sqlite3_index_info::sqlite3_index_orderby*)((char*)dupe +
|
||||
sizeof(sqlite3_index_info) +
|
||||
pIdxInfo->nConstraint * sizeof(sqlite3_index_info::sqlite3_index_constraint));
|
||||
dupe->aConstraintUsage = (sqlite3_index_info::sqlite3_index_constraint_usage*)((char*)dupe +
|
||||
sizeof(sqlite3_index_info) +
|
||||
pIdxInfo->nConstraint * sizeof(sqlite3_index_info::sqlite3_index_constraint) +
|
||||
pIdxInfo->nOrderBy * sizeof(sqlite3_index_info::sqlite3_index_orderby));
|
||||
|
||||
|
||||
for(int i = 0; i < pIdxInfo->nConstraint; i++) {
|
||||
dupe->aConstraint[i].iColumn = pIdxInfo->aConstraint[i].iColumn;
|
||||
dupe->aConstraint[i].op = pIdxInfo->aConstraint[i].op;
|
||||
dupe->aConstraint[i].usable = pIdxInfo->aConstraint[i].usable;
|
||||
dupe->aConstraint[i].iTermOffset = pIdxInfo->aConstraint[i].iTermOffset;
|
||||
|
||||
dupe->aConstraintUsage[i].argvIndex = pIdxInfo->aConstraintUsage[i].argvIndex;
|
||||
dupe->aConstraintUsage[i].omit = pIdxInfo->aConstraintUsage[i].omit;
|
||||
}
|
||||
|
||||
for(int i = 0; i < pIdxInfo->nOrderBy; i++) {
|
||||
dupe->aOrderBy[i].iColumn = pIdxInfo->aOrderBy[i].iColumn;
|
||||
dupe->aOrderBy[i].desc = pIdxInfo->aOrderBy[i].desc;
|
||||
}
|
||||
|
||||
return SQLITE_OK;
|
||||
} catch(std::bad_alloc& ba) {
|
||||
return SQLITE_NOMEM;
|
||||
} catch(std::exception& e) {
|
||||
return SQLITE_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/* Virtual table method dispatch table handed to sqlite3_create_module.
 * All write/transaction hooks are left NULL: the table is read-only. */
static sqlite3_module ParquetModule = {
  0,                       /* iVersion */
  parquetCreate,           /* xCreate */
  parquetConnect,          /* xConnect */
  parquetBestIndex,        /* xBestIndex */
  parquetDisconnect,       /* xDisconnect */
  parquetDestroy,          /* xDestroy */
  parquetOpen,             /* xOpen - open a cursor */
  parquetClose,            /* xClose - close a cursor */
  parquetFilter,           /* xFilter - configure scan constraints */
  parquetNext,             /* xNext - advance a cursor */
  parquetEof,              /* xEof - check for end of scan */
  parquetColumn,           /* xColumn - read data */
  parquetRowid,            /* xRowid - read data */
  0,                       /* xUpdate */
  0,                       /* xBegin */
  0,                       /* xSync */
  0,                       /* xCommit */
  0,                       /* xRollback */
  0,                       /* xFindMethod */
  0,                       /* xRename */
};
|
||||
|
||||
/*
 * This routine is called when the extension is loaded. The new
 * Parquet virtual table module is registered with the calling database
 * connection.
 */
extern "C" {
  int sqlite3_parquet_init(
    sqlite3 *db,
    char **pzErrMsg,     /* unused: registration errors surface via rc */
    const sqlite3_api_routines *pApi
  ){
    int rc;
    SQLITE_EXTENSION_INIT2(pApi);
    rc = sqlite3_create_module(db, "parquet", &ParquetModule, 0);
    return rc;
  }
}
|
970
src/parquet_cursor.cc
Normal file
970
src/parquet_cursor.cc
Normal file
@@ -0,0 +1,970 @@
|
||||
#include "parquet_cursor.h"
|
||||
|
||||
// Construct a cursor over `table`. The parquet reader is created lazily
// (reader starts NULL); reset() initializes the scan state with an empty
// constraint set.
ParquetCursor::ParquetCursor(ParquetTable* table): table(table) {
  reader = NULL;
  reset(std::vector<Constraint>());
}
|
||||
|
||||
/*
 * Decide whether the current row group (rowids [rowId, rowId + rowGroupSize))
 * could contain rows matching a rowid constraint. Returning true keeps the
 * group; false lets the scan skip it. Non-integer constraint values can
 * never prove exclusion, so they conservatively return true.
 */
bool ParquetCursor::currentRowGroupSatisfiesRowIdFilter(Constraint& constraint) {
  if(constraint.type != Integer)
    return true;

  int64_t target = constraint.intValue;
  switch(constraint.op) {
    case IsNull:
      // rowid is never NULL.
      return false;
    case Is:
    case Equal:
      return target >= rowId && target < rowId + rowGroupSize;
    case GreaterThan:
      // rowId > target
      return rowId + rowGroupSize > target;
    case GreaterThanOrEqual:
      // rowId >= target
      // BUG FIX: was `rowId + rowGroupSize >= rowId`, which is trivially
      // true and prevented this case from ever pruning a row group.
      return rowId + rowGroupSize >= target;
    case LessThan:
      return target > rowId;
    case LessThanOrEqual:
      return target >= rowId;
    default:
      return true;
  }
}
|
||||
|
||||
/*
 * Decide, from the row group's min/max statistics, whether the group could
 * contain rows matching a blob constraint. Returns false only when the
 * statistics prove no row can match; true means the group must be scanned.
 * Blob ordering is bytewise lexicographic, matching SQLite's BLOB ordering.
 */
bool ParquetCursor::currentRowGroupSatisfiesBlobFilter(Constraint& constraint, std::shared_ptr<parquet::RowGroupStatistics> _stats) {
  if(!_stats->HasMinMax()) {
    return true;
  }

  if(constraint.type != Blob) {
    return true;
  }

  const unsigned char* minPtr = NULL;
  const unsigned char* maxPtr = NULL;
  size_t minLen = 0;
  size_t maxLen = 0;

  parquet::Type::type pqType = types[constraint.column];

  if(pqType == parquet::Type::BYTE_ARRAY) {
    parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::BYTE_ARRAY>>* stats =
      (parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::BYTE_ARRAY>>*)_stats.get();

    minPtr = stats->min().ptr;
    minLen = stats->min().len;
    maxPtr = stats->max().ptr;
    maxLen = stats->max().len;
  } else if(pqType == parquet::Type::FIXED_LEN_BYTE_ARRAY) {
    // It seems like parquet-cpp doesn't actually produce stats for FLBA yet, so
    // rather than have untested code here, we'll just short circuit.
    //
    // Once I can get my hands on such a file, it should be easy to add support.
    return true;
  } else {
    // Should be impossible to get here
    std::ostringstream ss;
    ss << __FILE__ << ":" << __LINE__ << ": currentRowGroupSatisfiesBlobFilter called on unsupported type: " <<
      parquet::TypeToString(pqType);
    throw std::invalid_argument(ss.str());
  }

  const std::vector<unsigned char>& blob = constraint.blobValue;

  switch(constraint.op) {
    case Is:
    case Equal:
      {
        // Keep the group iff min <= blob <= max.
        bool minEqual = blob.size() == minLen && memcmp(&blob[0], minPtr, minLen) == 0;
        bool maxEqual = blob.size() == maxLen && memcmp(&blob[0], maxPtr, maxLen) == 0;

        // min < blob
        bool blobGteMinBlob = std::lexicographical_compare(
          minPtr,
          minPtr + minLen,
          &blob[0],
          &blob[0] + blob.size());

        // blob < max
        bool blobLtMaxBlob = std::lexicographical_compare(
          &blob[0],
          &blob[0] + blob.size(),
          maxPtr,
          maxPtr + maxLen);

        return (minEqual || blobGteMinBlob) && (maxEqual || blobLtMaxBlob);
      }
    case GreaterThanOrEqual:
      {
        // Keep the group iff blob <= max.
        bool maxEqual = blob.size() == maxLen && memcmp(&blob[0], maxPtr, maxLen) == 0;

        return maxEqual || std::lexicographical_compare(
          &blob[0],
          &blob[0] + blob.size(),
          maxPtr,
          maxPtr + maxLen);
      }
    case GreaterThan:
      // Keep the group iff blob < max.
      return std::lexicographical_compare(
        &blob[0],
        &blob[0] + blob.size(),
        maxPtr,
        maxPtr + maxLen);
    case LessThan:
      // Keep the group iff min < blob.
      return std::lexicographical_compare(
        minPtr,
        minPtr + minLen,
        &blob[0],
        &blob[0] + blob.size());
    case LessThanOrEqual:
      {
        // Keep the group iff min <= blob.
        bool minEqual = blob.size() == minLen && memcmp(&blob[0], minPtr, minLen) == 0;
        return minEqual || std::lexicographical_compare(
          minPtr,
          minPtr + minLen,
          &blob[0],
          &blob[0] + blob.size());
      }
    case NotEqual:
      {
        // If min == max == blob, we can skip this.
        bool blobMaxEqual = blob.size() == maxLen && memcmp(&blob[0], maxPtr, maxLen) == 0;
        bool minMaxEqual = minLen == maxLen && memcmp(minPtr, maxPtr, minLen) == 0;
        return !(blobMaxEqual && minMaxEqual);
      }
    case IsNot:
    default:
      return true;
  }
}
|
||||
|
||||
/*
 * Decide from BYTE_ARRAY min/max statistics whether this row group could
 * contain rows matching a text constraint. Returns false only when the
 * statistics prove no row can match; true means the group must be scanned.
 */
bool ParquetCursor::currentRowGroupSatisfiesTextFilter(Constraint& constraint, std::shared_ptr<parquet::RowGroupStatistics> _stats) {
  parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::BYTE_ARRAY>>* stats =
    (parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::BYTE_ARRAY>>*)_stats.get();

  if(!stats->HasMinMax()) {
    return true;
  }

  if(constraint.type != Text) {
    return true;
  }

  const std::string& str = constraint.stringValue;
  const parquet::ByteArray& min = stats->min();
  const parquet::ByteArray& max = stats->max();
  // Materialize the stats as std::string so comparisons below are
  // length-aware (the ByteArray buffers are not NUL-terminated).
  std::string minStr((const char*)min.ptr, min.len);
  std::string maxStr((const char*)max.ptr, max.len);
  // printf("min=%s [%d], max=%s [%d], target=%s\n", minStr.data(), min.len, maxStr.data(), max.len, str.data());

  switch(constraint.op) {
    case Is:
    case Equal:
      // Keep the group iff min <= str <= max.
      return str >= minStr && str <= maxStr;
    case GreaterThanOrEqual:
      return maxStr >= str;
    case GreaterThan:
      return maxStr > str;
    case LessThan:
      return minStr < str;
    case LessThanOrEqual:
      return minStr <= str;
    case NotEqual:
      // If min == max == str, we can skip this.
      return !(minStr == maxStr && str == minStr);
    case Like:
      {
        // likeStringValue is presumably the pattern's constant prefix
        // (before the first wildcard) — confirm against Constraint.
        // Compare it against min/max truncated to the same length.
        const std::string& likeStringValue = constraint.likeStringValue;
        std::string truncatedMin = minStr.substr(0, likeStringValue.size());
        std::string truncatedMax = maxStr.substr(0, likeStringValue.size());
        return likeStringValue.empty() || (likeStringValue >= truncatedMin && likeStringValue <= truncatedMax);
      }
    case IsNot:
    default:
      return true;
  }
}
|
||||
|
||||
// Convert a Parquet INT96 timestamp to milliseconds since the Unix epoch.
// Layout (see https://github.com/apache/parquet-format/pull/49):
//   value[0..1]: nanoseconds within the day (little-endian 64-bit)
//   value[2]:    Julian day number
int64_t int96toMsSinceEpoch(const parquet::Int96& rv) {
  // Cast to a type guaranteed to be >= 64 bits before shifting:
  // `unsigned long` is only 32 bits on LLP64 platforms (e.g. Windows),
  // which would make `<< 32` undefined behavior there.
  __int128 ns = rv.value[0] + ((unsigned long long)rv.value[1] << 32);
  __int128 julianDay = rv.value[2];
  // 2440588 is the Julian day number of the Unix epoch (1970-01-01).
  __int128 nsSinceEpoch = (julianDay - 2440588);
  nsSinceEpoch *= 86400;              // days -> seconds
  nsSinceEpoch *= 1000 * 1000 * 1000; // seconds -> nanoseconds
  nsSinceEpoch += ns;
  nsSinceEpoch /= 1000000;            // nanoseconds -> milliseconds
  return nsSinceEpoch;
}
|
||||
|
||||
// Row-group pruning for INTEGER constraints: returns true if some row in the
// current row group *might* satisfy `constraint`, judged against the column
// chunk's min/max statistics. Only returns false for definite misses.
bool ParquetCursor::currentRowGroupSatisfiesIntegerFilter(Constraint& constraint, std::shared_ptr<parquet::RowGroupStatistics> _stats) {
  // No min/max recorded for this chunk: we cannot prune.
  if(!_stats->HasMinMax()) {
    return true;
  }

  // Constraint value isn't an integer (e.g. a string literal): don't prune here.
  if(constraint.type != Integer) {
    return true;
  }

  int column = constraint.column;

  int64_t min = std::numeric_limits<int64_t>::min();
  int64_t max = std::numeric_limits<int64_t>::max();
  parquet::Type::type pqType = types[column];

  // Widen the chunk's min/max to int64_t based on the column's physical type.
  if(pqType == parquet::Type::INT32) {
    parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::INT32>>* stats =
      (parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::INT32>>*)_stats.get();

    min = stats->min();
    max = stats->max();
  } else if(pqType == parquet::Type::INT64) {
    parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::INT64>>* stats =
      (parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::INT64>>*)_stats.get();

    min = stats->min();
    max = stats->max();
  } else if(pqType == parquet::Type::INT96) {
    // INT96 timestamps are surfaced to SQLite as ms-since-epoch integers,
    // so convert the stats the same way before comparing.
    parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::INT96>>* stats =
      (parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::INT96>>*)_stats.get();

    min = int96toMsSinceEpoch(stats->min());
    max = int96toMsSinceEpoch(stats->max());

  } else if(pqType == parquet::Type::BOOLEAN) {
    parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::BOOLEAN>>* stats =
      (parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::BOOLEAN>>*)_stats.get();

    min = stats->min();
    max = stats->max();

  } else {
    // Should be impossible to get here as we should have forbidden this at
    // CREATE time -- maybe file changed underneath us?
    std::ostringstream ss;
    ss << __FILE__ << ":" << __LINE__ << ": currentRowGroupSatisfiesIntegerFilter called on unsupported type: " <<
      parquet::TypeToString(pqType);
    throw std::invalid_argument(ss.str());
  }

  const int64_t value = constraint.intValue;
  // printf("min=%s [%d], max=%s [%d], target=%s\n", minStr.data(), min.len, maxStr.data(), max.len, str.data());

  switch(constraint.op) {
    case Is:
    case Equal:
      // A matching row can only exist if value lies within [min, max].
      return value >= min && value <= max;
    case GreaterThanOrEqual:
      return max >= value;
    case GreaterThan:
      return max > value;
    case LessThan:
      return min < value;
    case LessThanOrEqual:
      return min <= value;
    case NotEqual:
      // If min == max == str, we can skip this.
      return !(min == max && value == min);
    case Like:
    case IsNot:
    default:
      // Operators we can't prune on: conservatively keep the row group.
      return true;
  }

  return true;
}
|
||||
|
||||
// Row-group pruning for DOUBLE constraints: returns true if some row in the
// current row group *might* satisfy `constraint`, judged against the column
// chunk's min/max statistics. Only returns false for definite misses.
bool ParquetCursor::currentRowGroupSatisfiesDoubleFilter(Constraint& constraint, std::shared_ptr<parquet::RowGroupStatistics> _stats) {
  // No min/max recorded for this chunk: we cannot prune.
  if(!_stats->HasMinMax()) {
    return true;
  }

  // Constraint value isn't a double: don't prune here.
  if(constraint.type != Double) {
    return true;
  }

  int column = constraint.column;

  // Use lowest() for the lower bound: numeric_limits<double>::min() is the
  // smallest *positive* normal value, not the most negative double.
  double min = std::numeric_limits<double>::lowest();
  double max = std::numeric_limits<double>::max();
  parquet::Type::type pqType = types[column];

  // Widen the chunk's min/max to double based on the column's physical type.
  if(pqType == parquet::Type::DOUBLE) {
    parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::DOUBLE>>* stats =
      (parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::DOUBLE>>*)_stats.get();

    min = stats->min();
    max = stats->max();
  } else if(pqType == parquet::Type::FLOAT) {
    parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::FLOAT>>* stats =
      (parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::FLOAT>>*)_stats.get();

    min = stats->min();
    max = stats->max();
  } else {
    // Should be impossible to get here as we should have forbidden this at
    // CREATE time -- maybe file changed underneath us?
    // (Error message previously said "IntegerFilter" -- copy-paste bug.)
    std::ostringstream ss;
    ss << __FILE__ << ":" << __LINE__ << ": currentRowGroupSatisfiesDoubleFilter called on unsupported type: " <<
      parquet::TypeToString(pqType);
    throw std::invalid_argument(ss.str());
  }

  const double value = constraint.doubleValue;

  switch(constraint.op) {
    case Is:
    case Equal:
      // A matching row can only exist if value lies within [min, max].
      return value >= min && value <= max;
    case GreaterThanOrEqual:
      return max >= value;
    case GreaterThan:
      return max > value;
    case LessThan:
      return min < value;
    case LessThanOrEqual:
      return min <= value;
    case NotEqual:
      // If min == max == value, every row equals value; none can satisfy !=.
      return !(min == max && value == min);
    case Like:
    case IsNot:
    default:
      // Operators we can't prune on: conservatively keep the row group.
      return true;
  }
}
|
||||
|
||||
// Row-level check for TEXT constraints: compares the current row's byte array
// for the constrained column against the constraint's raw blob value, using
// bytewise (lexicographic) ordering.
bool ParquetCursor::currentRowSatisfiesTextFilter(Constraint& constraint) {
  if(constraint.type != Text) {
    return true;
  }

  parquet::ByteArray* ba = getByteArray(constraint.column);

  switch(constraint.op) {
    case Is:
    case Equal:
    {
      const std::vector<unsigned char>& blob = constraint.blobValue;

      // Fast path: differing lengths can never compare equal.
      if(blob.size() != ba->len)
        return false;

      return 0 == memcmp(&blob[0], ba->ptr, ba->len);
    }
    case NotEqual:
    {
      const std::vector<unsigned char>& blob = constraint.blobValue;

      if(blob.size() != ba->len)
        return true;

      return 0 != memcmp(&blob[0], ba->ptr, ba->len);
    }
    case GreaterThan:
    {
      const std::vector<unsigned char>& blob = constraint.blobValue;

      // row > constraint  <=>  constraint precedes row lexicographically.
      return std::lexicographical_compare(
          &blob[0],
          &blob[0] + blob.size(),
          ba->ptr,
          ba->ptr + ba->len);
    }
    case GreaterThanOrEqual:
    {
      const std::vector<unsigned char>& blob = constraint.blobValue;

      bool equal = blob.size() == ba->len && 0 == memcmp(&blob[0], ba->ptr, ba->len);

      return equal || std::lexicographical_compare(
          &blob[0],
          &blob[0] + blob.size(),
          ba->ptr,
          ba->ptr + ba->len);
    }
    case LessThan:
    {
      const std::vector<unsigned char>& blob = constraint.blobValue;

      // row < constraint  <=>  row precedes constraint lexicographically.
      return std::lexicographical_compare(
          ba->ptr,
          ba->ptr + ba->len,
          &blob[0],
          &blob[0] + blob.size());
    }
    case LessThanOrEqual:
    {
      const std::vector<unsigned char>& blob = constraint.blobValue;

      bool equal = blob.size() == ba->len && 0 == memcmp(&blob[0], ba->ptr, ba->len);

      return equal || std::lexicographical_compare(
          ba->ptr,
          ba->ptr + ba->len,
          &blob[0],
          &blob[0] + blob.size());
    }
    case Like:
    {
      // Prefix match against the literal portion of the LIKE pattern
      // (wildcards were stripped when the constraint was built).
      const std::string& likeStringValue = constraint.likeStringValue;
      if(likeStringValue.size() > ba->len)
        return false;

      size_t len = ba->len;
      if(likeStringValue.size() < len)
        len = likeStringValue.size();
      return 0 == memcmp(&likeStringValue[0], ba->ptr, len);
    }
    case IsNot:
    default:
      // Operators we don't evaluate row-by-row: assume a possible match.
      return true;
  }
}
|
||||
|
||||
// Row-level check for INTEGER constraints: fetches the current row's value
// for the constrained column (widened to int64), then compares it against
// the constraint. Returns true when the row may satisfy the constraint;
// operators we don't evaluate here always pass.
bool ParquetCursor::currentRowSatisfiesIntegerFilter(Constraint& constraint) {
  if(constraint.type != Integer) {
    return true;
  }

  const int col = constraint.column;

  // CONSIDER: should we just store int64s everywhere?
  int64_t actual = 0;

  if(col == -1) {
    // -1 addresses the implicit rowid column.
    actual = rowId;
  } else {
    const parquet::Type::type physicalType = types[col];

    if(physicalType == parquet::Type::INT32 || physicalType == parquet::Type::BOOLEAN) {
      actual = getInt32(col);
    } else if(physicalType == parquet::Type::INT64 || physicalType == parquet::Type::INT96) {
      actual = getInt64(col);
    } else {
      // Should be impossible to get here
      std::ostringstream ss;
      ss << __FILE__ << ":" << __LINE__ << ": currentRowSatisfiesIntegerFilter called on unsupported type: " <<
        parquet::TypeToString(physicalType);
      throw std::invalid_argument(ss.str());
    }
  }

  const int64_t wanted = constraint.intValue;

  if(constraint.op == Is || constraint.op == Equal)
    return wanted == actual;
  if(constraint.op == NotEqual)
    return wanted != actual;
  if(constraint.op == GreaterThan)
    return actual > wanted;
  if(constraint.op == GreaterThanOrEqual)
    return actual >= wanted;
  if(constraint.op == LessThan)
    return actual < wanted;
  if(constraint.op == LessThanOrEqual)
    return actual <= wanted;

  // Like, IsNot, and any other operator: cannot disprove a match here.
  return true;
}
|
||||
|
||||
// Row-level check for DOUBLE constraints: compares the current row's value
// for the constrained column against the constraint. Returns true when the
// row may satisfy the constraint; operators we don't evaluate here pass.
bool ParquetCursor::currentRowSatisfiesDoubleFilter(Constraint& constraint) {
  if(constraint.type != Double) {
    return true;
  }

  const int col = constraint.column;
  const double actual = getDouble(col);
  const double wanted = constraint.doubleValue;

  if(constraint.op == Is || constraint.op == Equal)
    return wanted == actual;
  if(constraint.op == NotEqual)
    return wanted != actual;
  if(constraint.op == GreaterThan)
    return actual > wanted;
  if(constraint.op == GreaterThanOrEqual)
    return actual >= wanted;
  if(constraint.op == LessThan)
    return actual < wanted;
  if(constraint.op == LessThanOrEqual)
    return actual <= wanted;

  // Like, IsNot, and any other operator: cannot disprove a match here.
  return true;
}
|
||||
|
||||
|
||||
// Return true if it is _possible_ that the current
// rowgroup satisfies the constraints. Only return false
// if it definitely does not.
//
// This avoids opening rowgroups that can't return useful
// data, which provides substantial performance benefits.
bool ParquetCursor::currentRowGroupSatisfiesFilter() {
  for(unsigned int i = 0; i < constraints.size(); i++) {
    int column = constraints[i].column;
    int op = constraints[i].op;
    bool rv = true;

    if(column == -1) {
      // -1 is the implicit rowid column.
      rv = currentRowGroupSatisfiesRowIdFilter(constraints[i]);
    } else {
      std::unique_ptr<parquet::ColumnChunkMetaData> md = rowGroupMetadata->ColumnChunk(column);
      if(md->is_stats_set()) {
        std::shared_ptr<parquet::RowGroupStatistics> stats = md->statistics();

        // SQLite is much looser with types than you might expect if you
        // come from a Postgres background. The constraint '30.0' (that is,
        // a string containing a floating point number) should be treated
        // as equal to a field containing an integer 30.
        //
        // This means that even if the parquet physical type is integer,
        // the constraint type may be a string, so dispatch to the filter
        // fn based on the Parquet type.

        if(op == IsNull) {
          // Can only contain a NULL if the chunk recorded at least one.
          rv = stats->null_count() > 0;
        } else if(op == IsNotNull) {
          // Can only contain a non-NULL if the chunk has actual values.
          rv = stats->num_values() > 0;
        } else {
          parquet::Type::type pqType = types[column];

          if(pqType == parquet::Type::BYTE_ARRAY && logicalTypes[column] == parquet::LogicalType::UTF8) {
            rv = currentRowGroupSatisfiesTextFilter(constraints[i], stats);
          } else if(pqType == parquet::Type::BYTE_ARRAY) {
            rv = currentRowGroupSatisfiesBlobFilter(constraints[i], stats);
          } else if(pqType == parquet::Type::INT32 ||
                    pqType == parquet::Type::INT64 ||
                    pqType == parquet::Type::INT96 ||
                    pqType == parquet::Type::BOOLEAN) {
            rv = currentRowGroupSatisfiesIntegerFilter(constraints[i], stats);
          } else if(pqType == parquet::Type::FLOAT || pqType == parquet::Type::DOUBLE) {
            rv = currentRowGroupSatisfiesDoubleFilter(constraints[i], stats);
          }
          // Other types: rv stays true (no pruning).
        }
      }
    }

    // and it with the existing actual, which may have come from a previous run
    rv = rv && constraints[i].bitmap.getActualMembership(rowGroupId);
    if(!rv) {
      // Definite miss: record it in the bitmap and short-circuit -- one
      // failed constraint is enough to skip the whole row group.
      constraints[i].bitmap.setEstimatedMembership(rowGroupId, rv);
      constraints[i].bitmap.setActualMembership(rowGroupId, rv);
      return rv;
    }
  }

  // printf("rowGroup %d %s\n", rowGroupId, overallRv ? "may satisfy" : "does not satisfy");
  return true;
}
|
||||
|
||||
|
||||
// Advance to the next row group that may satisfy the constraints, skipping
// groups that the row-group filter prunes. Returns false when no row groups
// remain. On success, rowId points at the first row of the new group plus
// one (the caller decrements it -- see next()).
bool ParquetCursor::nextRowGroup() {
start:
  // Ensure that rowId points at the start of this rowGroup (eg, in the case where
  // we skipped an entire row group).
  rowId = rowGroupStartRowId + rowGroupSize;

  if((rowGroupId + 1) >= numRowGroups) {
    return false;
  }

  // Grow the per-column caches on first use.
  // NOTE(review): `>=` makes these vectors one entry larger than
  // getNumColumns() -- looks like deliberate headroom, but confirm it is
  // not an off-by-one.
  while(table->getNumColumns() >= scanners.size()) {
    scanners.push_back(std::shared_ptr<parquet::Scanner>());
    // If it doesn't exist, it's the rowId as of the last nextRowGroup call
    colRows.push_back(rowGroupStartRowId);
    colNulls.push_back(false);
    colIntValues.push_back(0);
    colDoubleValues.push_back(0);
    colByteArrayValues.push_back(parquet::ByteArray());
  }


  rowGroupStartRowId = rowId;
  rowGroupId++;
  rowGroupMetadata = reader->metadata()->RowGroup(rowGroupId);
  rowGroupSize = rowsLeftInRowGroup = rowGroupMetadata->num_rows();
  rowGroup = reader->RowGroup(rowGroupId);
  // Scanners are created lazily, per column, in ensureColumn().
  for(unsigned int i = 0; i < scanners.size(); i++)
    scanners[i] = NULL;

  // Grow the type caches. Column(0) here is only a placeholder: the loop
  // below overwrites every entry with the correct per-column type.
  while(types.size() < (unsigned int)rowGroupMetadata->num_columns()) {
    types.push_back(rowGroupMetadata->schema()->Column(0)->physical_type());
  }

  while(logicalTypes.size() < (unsigned int)rowGroupMetadata->num_columns()) {
    logicalTypes.push_back(rowGroupMetadata->schema()->Column(0)->logical_type());
  }

  for(unsigned int i = 0; i < (unsigned int)rowGroupMetadata->num_columns(); i++) {
    types[i] = rowGroupMetadata->schema()->Column(i)->physical_type();
    logicalTypes[i] = rowGroupMetadata->schema()->Column(i)->logical_type();
  }

  // Every column cache now refers to the first row of the new group.
  for(unsigned int i = 0; i < colRows.size(); i++) {
    colRows[i] = rowId;
  }

  // Increment rowId so currentRowGroupSatisfiesRowIdFilter can access it;
  // it'll get decremented by our caller
  rowId++;

  // We're going to scan this row group; reset the expectation of discovering
  // a row
  for(unsigned int i = 0; i < constraints.size(); i++) {
    if(rowGroupId > 0 && constraints[i].rowGroupId == rowGroupId - 1) {
      // Persist whether the previous row group actually produced any rows.
      constraints[i].bitmap.setActualMembership(rowGroupId - 1, constraints[i].hadRows);
    }
    constraints[i].hadRows = false;
  }

  // Pruned group: loop back and try the next one.
  if(!currentRowGroupSatisfiesFilter())
    goto start;

  for(unsigned int i = 0; i < constraints.size(); i++) {
    constraints[i].rowGroupId = rowGroupId;
  }
  return true;
}
|
||||
|
||||
// Return true if it is _possible_ that the current
|
||||
// row satisfies the constraints. Only return false
|
||||
// if it definitely does not.
|
||||
//
|
||||
// This avoids pointless transitions between the SQLite VM
|
||||
// and the extension, which can add up on a dataset of tens
|
||||
// of millions of rows.
|
||||
bool ParquetCursor::currentRowSatisfiesFilter() {
|
||||
bool overallRv = true;
|
||||
for(unsigned int i = 0; i < constraints.size(); i++) {
|
||||
bool rv = true;
|
||||
int column = constraints[i].column;
|
||||
ensureColumn(column);
|
||||
int op = constraints[i].op;
|
||||
|
||||
if(op == IsNull) {
|
||||
rv = isNull(column);
|
||||
} else if(op == IsNotNull) {
|
||||
rv = !isNull(column);
|
||||
} else {
|
||||
|
||||
if(logicalTypes[column] == parquet::LogicalType::UTF8) {
|
||||
rv = currentRowSatisfiesTextFilter(constraints[i]);
|
||||
} else {
|
||||
parquet::Type::type pqType = types[column];
|
||||
if(pqType == parquet::Type::INT32 ||
|
||||
pqType == parquet::Type::INT64 ||
|
||||
pqType == parquet::Type::INT96 ||
|
||||
pqType == parquet::Type::BOOLEAN) {
|
||||
rv = currentRowSatisfiesIntegerFilter(constraints[i]);
|
||||
} else if(pqType == parquet::Type::FLOAT || pqType == parquet::Type::DOUBLE) {
|
||||
rv = currentRowSatisfiesDoubleFilter(constraints[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// it defaults to false; so only set it if true
|
||||
// ideally we'd short-circuit if we'd already set this group as visited
|
||||
if(rv) {
|
||||
constraints[i].hadRows = true;
|
||||
}
|
||||
overallRv = overallRv && rv;
|
||||
}
|
||||
return overallRv;
|
||||
}
|
||||
|
||||
// Advance the cursor to the next row that may satisfy all constraints,
// crossing (and skipping) row-group boundaries as needed. On end of file,
// leaves rowId past numRows so eof() returns true.
void ParquetCursor::next() {
  // Returns true if we've crossed a row group boundary
start:
  if(rowsLeftInRowGroup == 0) {
    if(!nextRowGroup()) {
      // put rowId over the edge so eof returns true
      rowId = numRows + 1;
      return;
    } else {
      // After a successful nextRowGroup, rowId is pointing at the current row. Make it
      // point before so the rest of the logic works out.
      rowId--;
    }
  }

  rowsLeftInRowGroup--;
  rowId++;
  // Row-level filtering: keep advancing until a row passes (or EOF above).
  if(constraints.size() > 0 && !currentRowSatisfiesFilter())
    goto start;
}
|
||||
|
||||
// Current absolute row number (spanning all row groups).
int ParquetCursor::getRowId() {
  return rowId;
}
|
||||
|
||||
// True once the cursor has advanced past the last row of the file.
bool ParquetCursor::eof() {
  return rowId > numRows;
}
|
||||
|
||||
// Materialize the value of column `col` for the current row into the
// per-column caches (colIntValues / colDoubleValues / colByteArrayValues /
// colNulls), creating the column's scanner on first use and draining any
// rows this column skipped while other columns advanced the cursor.
// Throws std::invalid_argument on an unsupported physical type or if the
// scanner unexpectedly runs dry.
void ParquetCursor::ensureColumn(int col) {
  // -1 signals rowid, which is trivially available
  if(col == -1)
    return;

  // need to ensure a scanner exists (and skip the # of rows in the rowgroup)
  if(scanners[col].get() == NULL) {
    std::shared_ptr<parquet::ColumnReader> colReader = rowGroup->Column(col);
    scanners[col] = parquet::Scanner::Make(colReader);
  }

  // Actually fetch a value, stash data in colRows, colNulls, colValues
  if(colRows[col] != rowId) {
    // We may need to skip some records, eg, a query like
    // SELECT a WHERE b = 10
    // may have read b, but skipped a until b matches the predicate.
    bool wasNull = false;
    while(colRows[col] + 1 < rowId) {
      // Read and discard one value of the appropriate physical type.
      switch(types[col]) {
        case parquet::Type::INT32:
        {
          parquet::Int32Scanner* s = (parquet::Int32Scanner*)scanners[col].get();
          int rv = 0;
          s->NextValue(&rv, &wasNull);
          break;
        }
        case parquet::Type::FLOAT:
        {
          parquet::FloatScanner* s = (parquet::FloatScanner*)scanners[col].get();
          float rv = 0;
          s->NextValue(&rv, &wasNull);
          break;
        }
        case parquet::Type::DOUBLE:
        {
          parquet::DoubleScanner* s = (parquet::DoubleScanner*)scanners[col].get();
          double rv = 0;
          s->NextValue(&rv, &wasNull);
          break;
        }
        case parquet::Type::BYTE_ARRAY:
        {
          parquet::ByteArrayScanner* s = (parquet::ByteArrayScanner*)scanners[col].get();
          parquet::ByteArray ba;
          s->NextValue(&ba, &wasNull);
          break;
        }
        case parquet::Type::INT96:
        {
          parquet::Int96Scanner* s = (parquet::Int96Scanner*)scanners[col].get();
          parquet::Int96 rv;
          s->NextValue(&rv, &wasNull);
          break;
        }
        case parquet::Type::INT64:
        {
          parquet::Int64Scanner* s = (parquet::Int64Scanner*)scanners[col].get();
          long rv = 0;
          s->NextValue(&rv, &wasNull);
          break;
        }
        case parquet::Type::BOOLEAN:
        {
          parquet::BoolScanner* s = (parquet::BoolScanner*)scanners[col].get();
          bool rv = false;
          s->NextValue(&rv, &wasNull);
          break;
        }
        case parquet::Type::FIXED_LEN_BYTE_ARRAY:
        {
          parquet::FixedLenByteArrayScanner* s = (parquet::FixedLenByteArrayScanner*)scanners[col].get();
          parquet::FixedLenByteArray flba;
          s->NextValue(&flba, &wasNull);
          break;
        }
        default:
          // Should be impossible to get here as we should have forbidden this at
          // CREATE time -- maybe file changed underneath us?
          std::ostringstream ss;
          ss << __FILE__ << ":" << __LINE__ << ": column " << col << " has unsupported type: " <<
            parquet::TypeToString(types[col]);
          throw std::invalid_argument(ss.str());
          break;

      }
      colRows[col]++;
    }

    // Caught up: read the value for the current row and cache it.
    colRows[col] = rowId;
    wasNull = false;

    bool hadValue = false;
    switch(types[col]) {
      case parquet::Type::INT32:
      {
        parquet::Int32Scanner* s = (parquet::Int32Scanner*)scanners[col].get();
        int rv = 0;
        hadValue = s->NextValue(&rv, &wasNull);
        colIntValues[col] = rv;
        break;
      }
      case parquet::Type::FLOAT:
      {
        // Floats are widened and cached as doubles.
        parquet::FloatScanner* s = (parquet::FloatScanner*)scanners[col].get();
        float rv = 0;
        hadValue = s->NextValue(&rv, &wasNull);
        colDoubleValues[col] = rv;
        break;
      }
      case parquet::Type::DOUBLE:
      {
        parquet::DoubleScanner* s = (parquet::DoubleScanner*)scanners[col].get();
        double rv = 0;
        hadValue = s->NextValue(&rv, &wasNull);
        colDoubleValues[col] = rv;
        break;
      }
      case parquet::Type::BYTE_ARRAY:
      {
        parquet::ByteArrayScanner* s = (parquet::ByteArrayScanner*)scanners[col].get();
        hadValue = s->NextValue(&colByteArrayValues[col], &wasNull);
        break;
      }
      case parquet::Type::INT96:
      {
        // INT96 tracks a date with nanosecond precision, convert to ms since epoch.
        // ...see https://github.com/apache/parquet-format/pull/49 for more
        //
        // First 8 bytes: nanoseconds into the day
        // Last 4 bytes: Julian day
        // To get nanoseconds since the epoch:
        // (julian_day - 2440588) * (86400 * 1000 * 1000 * 1000) + nanoseconds
        parquet::Int96Scanner* s = (parquet::Int96Scanner*)scanners[col].get();
        parquet::Int96 rv {0, 0, 0};
        hadValue = s->NextValue(&rv, &wasNull);
        colIntValues[col] = int96toMsSinceEpoch(rv);
        break;
      }
      case parquet::Type::INT64:
      {
        parquet::Int64Scanner* s = (parquet::Int64Scanner*)scanners[col].get();
        long rv = 0;
        hadValue = s->NextValue(&rv, &wasNull);
        colIntValues[col] = rv;
        break;
      }

      case parquet::Type::BOOLEAN:
      {
        // Booleans are cached as 0/1 integers.
        parquet::BoolScanner* s = (parquet::BoolScanner*)scanners[col].get();
        bool rv = false;
        hadValue = s->NextValue(&rv, &wasNull);
        colIntValues[col] = rv ? 1 : 0;
        break;
      }
      case parquet::Type::FIXED_LEN_BYTE_ARRAY:
      {
        // Length comes from the schema, not the value.
        parquet::FixedLenByteArrayScanner* s = (parquet::FixedLenByteArrayScanner*)scanners[col].get();
        parquet::FixedLenByteArray flba;
        hadValue = s->NextValue(&flba, &wasNull);
        colByteArrayValues[col].ptr = flba.ptr;
        // TODO: cache this
        colByteArrayValues[col].len = rowGroupMetadata->schema()->Column(col)->type_length();
        break;
      }
      default:
        // Should be impossible to get here as we should have forbidden this at
        // CREATE time -- maybe file changed underneath us?
        std::ostringstream ss;
        ss << __FILE__ << ":" << __LINE__ << ": column " << col << " has unsupported type: " <<
          parquet::TypeToString(types[col]);
        throw std::invalid_argument(ss.str());
        break;
    }

    if(!hadValue)
      throw std::invalid_argument("unexpectedly lacking a next value");

    colNulls[col] = wasNull;
  }
}
|
||||
|
||||
// Whether the current row's value for `col` is NULL. Valid only after
// ensureColumn(col) for this row.
bool ParquetCursor::isNull(int col) {
  // -1 is rowid, which is trivially non null
  if(col == -1)
    return false;

  return colNulls[col];
}
|
||||
|
||||
// Cached integer value for `col`; valid only after ensureColumn(col).
int ParquetCursor::getInt32(int col) {
  return colIntValues[col];
}
|
||||
|
||||
// Cached 64-bit integer value for `col`; valid only after ensureColumn(col).
long ParquetCursor::getInt64(int col) {
  return colIntValues[col];
}
|
||||
|
||||
// Cached double value for `col`; valid only after ensureColumn(col).
double ParquetCursor::getDouble(int col) {
  return colDoubleValues[col];
}
|
||||
|
||||
// Cached byte-array value for `col`; valid only after ensureColumn(col).
// The pointed-to buffer is owned by the scanner, not the caller.
parquet::ByteArray* ParquetCursor::getByteArray(int col) {
  return &colByteArrayValues[col];
}
|
||||
|
||||
// Parquet physical type of column `col` for the current row group.
parquet::Type::type ParquetCursor::getPhysicalType(int col) {
  return types[col];
}
|
||||
|
||||
// Parquet logical type of column `col` for the current row group.
parquet::LogicalType::type ParquetCursor::getLogicalType(int col) {
  return logicalTypes[col];
}
|
||||
|
||||
// Close the underlying Parquet file reader, if one is open.
void ParquetCursor::close() {
  if(reader != NULL) {
    reader->Close();
  }
}
|
||||
|
||||
// Re-open the underlying Parquet file and position the cursor before the
// first row group, installing a fresh set of constraints.
void ParquetCursor::reset(std::vector<Constraint> constraints) {
  close();
  // The parameter is already our private copy (taken by value), so swap it
  // into place rather than copying the constraint vector a second time.
  this->constraints.swap(constraints);
  rowId = 0;
  // TODO: consider having a long lived handle in ParquetTable that can be borrowed
  // without incurring the cost of opening the file from scratch twice
  reader = parquet::ParquetFileReader::OpenFile(
      table->getFile().data(),
      true,
      parquet::default_reader_properties(),
      table->getMetadata());

  // -1 so the first nextRowGroup() lands on row group 0.
  rowGroupId = -1;
  rowGroupSize = 0;
  rowGroupStartRowId = 0;
  // TODO: handle the case where rowgroups have disjoint schemas?
  // TODO: or at least, fail fast if detected
  rowsLeftInRowGroup = 0;

  numRows = reader->metadata()->num_rows();
  numRowGroups = reader->metadata()->num_row_groups();
}
|
||||
|
||||
// Non-owning pointer to the virtual table this cursor iterates.
ParquetTable* ParquetCursor::getTable() const { return table; }
|
||||
|
||||
// Number of row groups in the currently open file.
unsigned int ParquetCursor::getNumRowGroups() const { return numRowGroups; }
|
||||
// Number of constraints installed by the last reset().
unsigned int ParquetCursor::getNumConstraints() const { return constraints.size(); }
|
||||
// The i-th installed constraint; `i` must be < getNumConstraints().
const Constraint& ParquetCursor::getConstraint(unsigned int i) const { return constraints[i]; }
|
||||
|
||||
|
73
src/parquet_cursor.h
Normal file
73
src/parquet_cursor.h
Normal file
@@ -0,0 +1,73 @@
|
||||
#ifndef PARQUET_CURSOR_H
|
||||
#define PARQUET_CURSOR_H
|
||||
|
||||
#include "parquet_filter.h"
|
||||
#include "parquet_table.h"
|
||||
#include "parquet/api/reader.h"
|
||||
|
||||
// Cursor over a single Parquet file: drives iteration for the SQLite virtual
// table, with constraint-based row-group pruning and row-level filtering.
class ParquetCursor {

  // Non-owning pointer to the table this cursor belongs to.
  ParquetTable* table;
  std::unique_ptr<parquet::ParquetFileReader> reader;
  std::unique_ptr<parquet::RowGroupMetaData> rowGroupMetadata;
  std::shared_ptr<parquet::RowGroupReader> rowGroup;
  // Per-column scanners for the current row group; created lazily in
  // ensureColumn().
  std::vector<std::shared_ptr<parquet::Scanner>> scanners;
  // Per-column physical/logical types, refreshed on each row-group change.
  std::vector<parquet::Type::type> types;
  std::vector<parquet::LogicalType::type> logicalTypes;

  // Per-column caches of the most recently read value. colRows records which
  // rowId each column's cache refers to.
  std::vector<int> colRows;
  std::vector<bool> colNulls;
  std::vector<int64_t> colIntValues;
  std::vector<double> colDoubleValues;
  std::vector<parquet::ByteArray> colByteArrayValues;

  int rowId;              // current absolute row number
  int rowGroupId;         // index of the current row group (-1 before first)
  int rowGroupStartRowId; // rowId of the current group's first row
  int rowGroupSize;       // rows in the current group
  int numRows;            // total rows in the file
  int numRowGroups;       // total row groups in the file
  int rowsLeftInRowGroup; // rows not yet consumed from the current group

  bool nextRowGroup();

  std::vector<Constraint> constraints;

  // Predicate evaluation. "Satisfies" means "may satisfy": false is only
  // returned for definite misses, so these are safe for pruning.
  bool currentRowSatisfiesFilter();
  bool currentRowGroupSatisfiesFilter();
  bool currentRowGroupSatisfiesRowIdFilter(Constraint& constraint);
  bool currentRowGroupSatisfiesTextFilter(Constraint& constraint, std::shared_ptr<parquet::RowGroupStatistics> stats);
  bool currentRowGroupSatisfiesBlobFilter(Constraint& constraint, std::shared_ptr<parquet::RowGroupStatistics> stats);
  bool currentRowGroupSatisfiesIntegerFilter(Constraint& constraint, std::shared_ptr<parquet::RowGroupStatistics> stats);
  bool currentRowGroupSatisfiesDoubleFilter(Constraint& constraint, std::shared_ptr<parquet::RowGroupStatistics> stats);

  bool currentRowSatisfiesTextFilter(Constraint& constraint);
  bool currentRowSatisfiesIntegerFilter(Constraint& constraint);
  bool currentRowSatisfiesDoubleFilter(Constraint& constraint);


public:
  ParquetCursor(ParquetTable* table);
  int getRowId();
  void next();
  void close();
  // Re-opens the file and installs a fresh constraint set.
  void reset(std::vector<Constraint> constraints);
  bool eof();

  // Materialize column `col` for the current row; must be called before any
  // of the value getters below.
  void ensureColumn(int col);
  bool isNull(int col);
  unsigned int getNumRowGroups() const;
  unsigned int getNumConstraints() const;
  const Constraint& getConstraint(unsigned int i) const;
  parquet::Type::type getPhysicalType(int col);
  parquet::LogicalType::type getLogicalType(int col);
  ParquetTable* getTable() const;

  int getInt32(int col);
  long getInt64(int col);
  double getDouble(int col);
  parquet::ByteArray* getByteArray(int col);
};
|
||||
|
||||
#endif
|
||||
|
105
src/parquet_filter.cc
Normal file
105
src/parquet_filter.cc
Normal file
@@ -0,0 +1,105 @@
|
||||
#include "parquet_filter.h"
|
||||
|
||||
// Build a constraint from SQLite's raw pieces. Text values arrive as the
// blob's bytes; for TEXT constraints we also derive stringValue and, for
// LIKE, the literal prefix (likeStringValue) used for row-group pruning.
Constraint::Constraint(
  RowGroupBitmap bitmap,
  int column,
  std::string columnName,
  ConstraintOperator op,
  ValueType type,
  int64_t intValue,
  double doubleValue,
  std::vector<unsigned char> blobValue
): bitmap(bitmap),
   column(column),
   columnName(columnName),
   op(op),
   type(type),
   intValue(intValue),
   doubleValue(doubleValue),
   blobValue(blobValue),
   hadRows(false) {
  // Note: the member initializer list already copied `bitmap`; the previous
  // extra copy-and-reassign of this->bitmap here was redundant and removed.

  if(type == Text) {
    // vector::data() is valid even for an empty blob, unlike &blobValue[0]
    // (which is undefined behavior when the vector is empty).
    stringValue = std::string((const char*)blobValue.data(), blobValue.size());

    if(op == Like) {
      // This permits more rowgroups than is strictly needed
      // since it assumes an implicit wildcard. But it's
      // simple to implement, so we'll go with it.
      likeStringValue = stringValue;
      size_t idx = likeStringValue.find_first_of("%");
      if(idx != std::string::npos) {
        likeStringValue = likeStringValue.substr(0, idx);
      }
      idx = likeStringValue.find_first_of("_");
      if(idx != std::string::npos) {
        likeStringValue = likeStringValue.substr(0, idx);
      }
    }
  }
}
|
||||
|
||||
std::string Constraint::describe() const {
|
||||
std::string rv;
|
||||
rv.append(columnName);
|
||||
rv.append(" ");
|
||||
switch(op) {
|
||||
case Equal:
|
||||
rv.append("=");
|
||||
break;
|
||||
case GreaterThan:
|
||||
rv.append(">");
|
||||
break;
|
||||
case LessThanOrEqual:
|
||||
rv.append("<=");
|
||||
break;
|
||||
case LessThan:
|
||||
rv.append("<");
|
||||
break;
|
||||
case GreaterThanOrEqual:
|
||||
rv.append(">=");
|
||||
break;
|
||||
case Like:
|
||||
rv.append("LIKE");
|
||||
break;
|
||||
case Glob:
|
||||
rv.append("GLOB");
|
||||
break;
|
||||
case NotEqual:
|
||||
rv.append("<>");
|
||||
break;
|
||||
case IsNot:
|
||||
rv.append("IS NOT");
|
||||
break;
|
||||
case IsNotNull:
|
||||
rv.append("IS NOT NULL");
|
||||
break;
|
||||
case IsNull:
|
||||
rv.append("IS NULL");
|
||||
break;
|
||||
case Is:
|
||||
rv.append("IS");
|
||||
break;
|
||||
}
|
||||
rv.append(" ");
|
||||
|
||||
switch(type) {
|
||||
case Null:
|
||||
rv.append("NULL");
|
||||
break;
|
||||
case Integer:
|
||||
rv.append(std::to_string(intValue));
|
||||
break;
|
||||
case Double:
|
||||
rv.append(std::to_string(doubleValue));
|
||||
break;
|
||||
case Blob:
|
||||
break;
|
||||
case Text:
|
||||
rv.append(stringValue);
|
||||
break;
|
||||
}
|
||||
return rv;
|
||||
}
|
120
src/parquet_filter.h
Normal file
120
src/parquet_filter.h
Normal file
@@ -0,0 +1,120 @@
|
||||
#ifndef PARQUET_FILTER_H
|
||||
#define PARQUET_FILTER_H
|
||||
|
||||
#include <vector>
|
||||
#include <string>
|
||||
#include <cstdint>
|
||||
|
||||
// Comparison operators a constraint can carry, mirroring the operators
// SQLite hands to a virtual table's xBestIndex/xFilter machinery.
// NOTE(review): intentionally a plain enum (not enum class) — the values
// are used unqualified (e.g. `case Equal:`) throughout the .cc files.
enum ConstraintOperator {
  Equal,
  GreaterThan,
  LessThanOrEqual,
  LessThan,
  GreaterThanOrEqual,
  Like,
  Glob,
  NotEqual,
  IsNot,
  IsNotNull,
  IsNull,
  Is
};
|
||||
|
||||
// Type tag describing which of a Constraint's value fields
// (intValue / doubleValue / blobValue / stringValue) holds the
// constraint's right-hand-side value.
enum ValueType {
  Null,
  Integer,
  Double,
  Blob,
  Text
};
|
||||
|
||||
/*
 * One-bit-per-row-group membership tracking for a constraint; a set bit
 * means "this row group has (or may have) rows matching the constraint".
 *
 * Two bitmaps are kept: `estimatedMembership` records what was predicted
 * before scanning, `actualMembership` records what scanning the row group
 * actually observed. Comparing the two lets the estimate be cached and
 * refined over time.
 */
class RowGroupBitmap {
  // Set or clear the bit for `rowGroup` inside the packed byte vector.
  void setBit(std::vector<unsigned char>& membership, unsigned int rowGroup, bool isSet) {
    const int idx = rowGroup / 8;
    const int shift = rowGroup % 8;
    unsigned char b = membership[idx];
    b &= ~(1UL << shift);
    if(isSet)
      b |= 1UL << shift;
    membership[idx] = b;
  }

public:
  // Start with every bit set: assume all row groups match until we
  // discover otherwise.
  RowGroupBitmap(unsigned int totalRowGroups) {
    const unsigned int numBytes = (totalRowGroups + 7) / 8;
    estimatedMembership.assign(numBytes, 0xFF);
    actualMembership.assign(numBytes, 0xFF);
  }

  // Construct from previously computed bitmaps.
  RowGroupBitmap(
    std::vector<unsigned char> estimatedMembership,
    std::vector<unsigned char> actualMembership) :
    estimatedMembership(estimatedMembership),
    actualMembership(actualMembership) {
  }

  std::vector<unsigned char> estimatedMembership;
  std::vector<unsigned char> actualMembership;

  // Pass false only if the row group definitely does not have matching rows.
  void setEstimatedMembership(unsigned int rowGroup, bool hasRows) {
    setBit(estimatedMembership, rowGroup, hasRows);
  }

  // Pass false only after exhausting all rows of the row group.
  void setActualMembership(unsigned int rowGroup, bool hadRows) {
    setBit(actualMembership, rowGroup, hadRows);
  }

  // True if scanning found (or we still assume) matching rows in rowGroup.
  bool getActualMembership(unsigned int rowGroup) {
    return (actualMembership[rowGroup / 8] >> (rowGroup % 8)) & 1U;
  }
};
|
||||
|
||||
// A single predicate (column OP value) pushed down from SQLite, plus the
// cached row-group membership bitmap used to skip non-matching row groups.
class Constraint {
public:
  // Kind of a messy constructor function, but it's just for internal use, so whatever.
  Constraint(
    RowGroupBitmap bitmap,
    int column,
    std::string columnName,
    ConstraintOperator op,
    ValueType type,
    int64_t intValue,
    double doubleValue,
    std::vector<unsigned char> blobValue
  );

  // Per-row-group match bitmaps (estimated vs actually observed).
  RowGroupBitmap bitmap;
  int column; // underlying column in the query
  std::string columnName;
  ConstraintOperator op;
  ValueType type;

  // Right-hand-side value; which of these is meaningful is governed by `type`.
  int64_t intValue;
  double doubleValue;
  std::vector<unsigned char> blobValue;
  // Only set when blobValue is set
  std::string stringValue;

  // Only set when stringValue is set and op == Like;
  // the literal prefix before the first '%' or '_' wildcard.
  std::string likeStringValue;

  // A unique identifier for this constraint, e.g.
  // col0 = 'Dawson Creek'
  std::string describe() const;

  // This is a temp field used while evaluating if a rowgroup had rows
  // that matched this constraint.
  int rowGroupId;
  bool hadRows;
};
|
||||
|
||||
#endif
|
155
src/parquet_table.cc
Normal file
155
src/parquet_table.cc
Normal file
@@ -0,0 +1,155 @@
|
||||
#include "parquet_table.h"
|
||||
|
||||
#include "parquet/api/reader.h"
|
||||
|
||||
// Opens the file once to snapshot its metadata; the reader itself is
// discarded as soon as the metadata handle has been captured.
ParquetTable::ParquetTable(std::string file, std::string tableName): file(file), tableName(tableName) {
  auto reader = parquet::ParquetFileReader::OpenFile(this->file.data());
  metadata = reader->metadata();
}
|
||||
|
||||
std::string ParquetTable::columnName(int i) {
|
||||
if(i == -1)
|
||||
return "rowid";
|
||||
return columnNames[i];
|
||||
}
|
||||
|
||||
unsigned int ParquetTable::getNumColumns() {
|
||||
return columnNames.size();
|
||||
}
|
||||
|
||||
|
||||
std::string ParquetTable::CreateStatement() {
|
||||
std::unique_ptr<parquet::ParquetFileReader> reader = parquet::ParquetFileReader::OpenFile(
|
||||
file.data(),
|
||||
true,
|
||||
parquet::default_reader_properties(),
|
||||
metadata);
|
||||
std::string text("CREATE TABLE x(");
|
||||
auto schema = reader->metadata()->schema();
|
||||
|
||||
for(auto i = 0; i < schema->num_columns(); i++) {
|
||||
auto _col = schema->GetColumnRoot(i);
|
||||
columnNames.push_back(_col->name());
|
||||
}
|
||||
|
||||
for(auto i = 0; i < schema->num_columns(); i++) {
|
||||
auto _col = schema->GetColumnRoot(i);
|
||||
|
||||
if(!_col->is_primitive()) {
|
||||
std::ostringstream ss;
|
||||
ss << __FILE__ << ":" << __LINE__ << ": column " << i << " has non-primitive type";
|
||||
throw std::invalid_argument(ss.str());
|
||||
}
|
||||
|
||||
if(_col->is_repeated()) {
|
||||
std::ostringstream ss;
|
||||
ss << __FILE__ << ":" << __LINE__ << ": column " << i << " has non-scalar type";
|
||||
throw std::invalid_argument(ss.str());
|
||||
}
|
||||
|
||||
parquet::schema::PrimitiveNode* col = (parquet::schema::PrimitiveNode*)_col;
|
||||
|
||||
if(i > 0)
|
||||
text += ", ";
|
||||
|
||||
text += "\"";
|
||||
// Horrifically inefficient, but easy to understand.
|
||||
std::string colName = col->name();
|
||||
for(char& c : colName) {
|
||||
if(c == '"')
|
||||
text += "\"\"";
|
||||
else
|
||||
text += c;
|
||||
}
|
||||
text += "\"";
|
||||
|
||||
std::string type;
|
||||
|
||||
parquet::Type::type physical = col->physical_type();
|
||||
parquet::LogicalType::type logical = col->logical_type();
|
||||
// Be explicit about which types we understand so we don't mislead someone
|
||||
// whose unsigned ints start getting interpreted as signed. (We could
|
||||
// support this for UINT_8/16/32 -- and for UINT_64 we could throw if
|
||||
// the high bit was set.)
|
||||
if(logical == parquet::LogicalType::NONE ||
|
||||
logical == parquet::LogicalType::UTF8 ||
|
||||
logical == parquet::LogicalType::DATE ||
|
||||
logical == parquet::LogicalType::TIME_MILLIS ||
|
||||
logical == parquet::LogicalType::TIMESTAMP_MILLIS ||
|
||||
logical == parquet::LogicalType::TIME_MICROS ||
|
||||
logical == parquet::LogicalType::TIMESTAMP_MICROS ||
|
||||
logical == parquet::LogicalType::INT_8 ||
|
||||
logical == parquet::LogicalType::INT_16 ||
|
||||
logical == parquet::LogicalType::INT_32 ||
|
||||
logical == parquet::LogicalType::INT_64) {
|
||||
switch(physical) {
|
||||
case parquet::Type::BOOLEAN:
|
||||
type = "TINYINT";
|
||||
break;
|
||||
case parquet::Type::INT32:
|
||||
if(logical == parquet::LogicalType::NONE ||
|
||||
logical == parquet::LogicalType::INT_32) {
|
||||
type = "INT";
|
||||
} else if(logical == parquet::LogicalType::INT_8) {
|
||||
type = "TINYINT";
|
||||
} else if(logical == parquet::LogicalType::INT_16) {
|
||||
type = "SMALLINT";
|
||||
}
|
||||
break;
|
||||
case parquet::Type::INT96:
|
||||
// INT96 is used for nanosecond precision on timestamps; we truncate
|
||||
// to millisecond precision.
|
||||
case parquet::Type::INT64:
|
||||
type = "BIGINT";
|
||||
break;
|
||||
case parquet::Type::FLOAT:
|
||||
type = "REAL";
|
||||
break;
|
||||
case parquet::Type::DOUBLE:
|
||||
type = "DOUBLE";
|
||||
break;
|
||||
case parquet::Type::BYTE_ARRAY:
|
||||
if(logical == parquet::LogicalType::UTF8) {
|
||||
type = "TEXT";
|
||||
} else {
|
||||
type = "BLOB";
|
||||
}
|
||||
break;
|
||||
case parquet::Type::FIXED_LEN_BYTE_ARRAY:
|
||||
type = "BLOB";
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if(type.empty()) {
|
||||
std::ostringstream ss;
|
||||
ss << __FILE__ << ":" << __LINE__ << ": column " << i << " has unsupported type: " <<
|
||||
parquet::TypeToString(physical) << "/" << parquet::LogicalTypeToString(logical);
|
||||
|
||||
throw std::invalid_argument(ss.str());
|
||||
}
|
||||
|
||||
#ifdef DEBUG
|
||||
printf("col %d[name=%s, p=%d:%s, l=%d:%s] is %s\n",
|
||||
i,
|
||||
col->name().data(),
|
||||
col->physical_type(),
|
||||
parquet::TypeToString(col->physical_type()).data(),
|
||||
col->logical_type(),
|
||||
parquet::LogicalTypeToString(col->logical_type()).data(),
|
||||
type.data());
|
||||
#endif
|
||||
|
||||
text += " ";
|
||||
text += type;
|
||||
}
|
||||
text +=");";
|
||||
return text;
|
||||
}
|
||||
|
||||
// Shared handle to the file-level Parquet metadata captured at construction.
std::shared_ptr<parquet::FileMetaData> ParquetTable::getMetadata() {
  return metadata;
}
|
||||
|
||||
// Path of the backing Parquet file, as supplied at construction.
const std::string& ParquetTable::getFile() {
  return file;
}
|
||||
// Table name supplied at construction.
const std::string& ParquetTable::getTableName() {
  return tableName;
}
|
25
src/parquet_table.h
Normal file
25
src/parquet_table.h
Normal file
@@ -0,0 +1,25 @@
|
||||
#ifndef PARQUET_TABLE_H
|
||||
#define PARQUET_TABLE_H
|
||||
|
||||
#include <vector>
|
||||
#include <string>
|
||||
#include "parquet/api/reader.h"
|
||||
|
||||
// Represents one Parquet file exposed as an SQLite virtual table: holds
// the file path, the table name, and the file-level metadata read once
// at construction.
class ParquetTable {
  // Path to the backing Parquet file on disk.
  std::string file;
  // Table name supplied at construction.
  std::string tableName;
  // Column names in schema order; populated by CreateStatement().
  std::vector<std::string> columnNames;
  // File-level metadata, read once in the constructor.
  std::shared_ptr<parquet::FileMetaData> metadata;


public:
  ParquetTable(std::string file, std::string tableName);
  // Builds the "CREATE TABLE x(...)" DDL describing the file's schema;
  // also populates columnNames as a side effect.
  std::string CreateStatement();
  // Name of column `idx`; -1 yields "rowid".
  std::string columnName(int idx);
  unsigned int getNumColumns();
  std::shared_ptr<parquet::FileMetaData> getMetadata();
  const std::string& getFile();
  const std::string& getTableName();
};
|
||||
|
||||
#endif
|
Reference in New Issue
Block a user