mirror of
https://github.com/cldellow/sqlite-parquet-vtable.git
synced 2025-09-14 22:39:59 +00:00
Cache clauses -> row group mapping
Create a shadow table. For `stats`, it'd be `_stats_rowgroups`. It contains three columns: - the clause (eg `city = 'Dawson Creek'`) - the initial estimate, as a bitmap of rowgroups based on stats - the actual observed rowgroups, as a bitmap This papers over poorly sorted parquet files, at the cost of some disk space. It makes interactive queries much more natural -- drilldown style queries are much faster, as they can leverage work done by previous queries. eg 'SELECT * FROM stats WHERE city = 'Dawson Creek' and question_id >= 1935 and question_id <= 1940` takes ~584ms on first run, but 9ms on subsequent runs. We only create entries when the estimates don't match the actual results. Fixes #6
This commit is contained in:
@@ -17,6 +17,7 @@ SQLITE_EXTENSION_INIT1
|
||||
#include <stdarg.h>
|
||||
#include <ctype.h>
|
||||
#include <stdio.h>
|
||||
#include <iomanip>
|
||||
|
||||
#include <memory>
|
||||
|
||||
@@ -32,6 +33,7 @@ static int parquetConnect(sqlite3*, void*, int, const char*const*,
|
||||
sqlite3_vtab**,char**);
|
||||
static int parquetBestIndex(sqlite3_vtab*,sqlite3_index_info*);
|
||||
static int parquetDisconnect(sqlite3_vtab*);
|
||||
static int parquetDestroy(sqlite3_vtab*);
|
||||
static int parquetOpen(sqlite3_vtab*, sqlite3_vtab_cursor**);
|
||||
static int parquetClose(sqlite3_vtab_cursor*);
|
||||
static int parquetFilter(sqlite3_vtab_cursor*, int idxNum, const char *idxStr,
|
||||
@@ -45,6 +47,7 @@ static int parquetRowid(sqlite3_vtab_cursor*,sqlite3_int64*);
|
||||
typedef struct sqlite3_vtab_parquet {
|
||||
sqlite3_vtab base; /* Base class. Must be first */
|
||||
ParquetTable* table;
|
||||
sqlite3* db;
|
||||
} sqlite3_vtab_parquet;
|
||||
|
||||
|
||||
@@ -54,6 +57,21 @@ typedef struct sqlite3_vtab_cursor_parquet {
|
||||
ParquetCursor* cursor;
|
||||
} sqlite3_vtab_cursor_parquet;
|
||||
|
||||
static int parquetDestroy(sqlite3_vtab *pVtab) {
|
||||
sqlite3_vtab_parquet *p = (sqlite3_vtab_parquet*)pVtab;
|
||||
|
||||
// Clean up our shadow table. This is useful if the user has recreated
|
||||
// the parquet file, and our mappings would now be invalid.
|
||||
std::string drop = "DROP TABLE IF EXISTS _";
|
||||
drop.append(p->table->getTableName());
|
||||
drop.append("_rowgroups");
|
||||
int rv = sqlite3_exec(p->db, drop.data(), 0, 0, 0);
|
||||
if(rv != 0)
|
||||
return rv;
|
||||
|
||||
return SQLITE_OK;
|
||||
}
|
||||
|
||||
/*
|
||||
** This method is the destructor fo a sqlite3_vtab_parquet object.
|
||||
*/
|
||||
@@ -78,6 +96,7 @@ static int parquetConnect(
|
||||
return SQLITE_ERROR;
|
||||
}
|
||||
|
||||
std::string tableName = argv[2];
|
||||
// Remove the delimiting single quotes
|
||||
std::string fname = argv[3];
|
||||
fname = fname.substr(1, fname.length() - 2);
|
||||
@@ -87,7 +106,7 @@ static int parquetConnect(
|
||||
memset(vtab.get(), 0, sizeof(*vtab.get()));
|
||||
|
||||
try {
|
||||
std::unique_ptr<ParquetTable> table(new ParquetTable(fname));
|
||||
std::unique_ptr<ParquetTable> table(new ParquetTable(fname, tableName));
|
||||
|
||||
std::string create = table->CreateStatement();
|
||||
int rc = sqlite3_declare_vtab(db, create.data());
|
||||
@@ -95,6 +114,7 @@ static int parquetConnect(
|
||||
return rc;
|
||||
|
||||
vtab->table = table.release();
|
||||
vtab->db = db;
|
||||
*ppVtab = (sqlite3_vtab*)vtab.release();
|
||||
return SQLITE_OK;
|
||||
} catch (const std::exception& e) {
|
||||
@@ -119,16 +139,81 @@ static int parquetCreate(
|
||||
sqlite3_vtab **ppVtab,
|
||||
char **pzErr
|
||||
){
|
||||
return parquetConnect(db, pAux, argc, argv, ppVtab, pzErr);
|
||||
try {
|
||||
// Create shadow table for storing constraint -> rowid mappings
|
||||
std::string create = "CREATE TABLE IF NOT EXISTS _";
|
||||
create.append(argv[2]);
|
||||
create.append("_rowgroups(clause TEXT, estimate BLOB, actual BLOB)");
|
||||
int rv = sqlite3_exec(db, create.data(), 0, 0, 0);
|
||||
if(rv != 0)
|
||||
return rv;
|
||||
|
||||
create = "CREATE UNIQUE INDEX IF NOT EXISTS _";
|
||||
create.append(argv[2]);
|
||||
create.append("_index ON _");
|
||||
create.append(argv[2]);
|
||||
create.append("_rowgroups(clause)");
|
||||
rv = sqlite3_exec(db, create.data(), 0, 0, 0);
|
||||
|
||||
return parquetConnect(db, pAux, argc, argv, ppVtab, pzErr);
|
||||
} catch (std::bad_alloc& ba) {
|
||||
return SQLITE_NOMEM;
|
||||
}
|
||||
}
|
||||
|
||||
std::string quoteBlob(const std::vector<unsigned char>& bytes) {
|
||||
std::ostringstream ss;
|
||||
ss << "X'" << std::hex;
|
||||
for(unsigned int i = 0; i < bytes.size(); i++) {
|
||||
ss << std::setfill('0') << std::setw(2) << (unsigned int)(unsigned char)bytes[i];
|
||||
}
|
||||
ss << "'";
|
||||
|
||||
return ss.str();
|
||||
}
|
||||
|
||||
void persistConstraints(sqlite3* db, ParquetCursor* cursor) {
|
||||
for(unsigned int i = 0; i < cursor->getNumConstraints(); i++) {
|
||||
const Constraint& constraint = cursor->getConstraint(i);
|
||||
const std::vector<unsigned char>& estimated = constraint.bitmap.estimatedMembership;
|
||||
const std::vector<unsigned char>& actual = constraint.bitmap.actualMembership;
|
||||
if(estimated == actual) {
|
||||
continue;
|
||||
}
|
||||
std::string desc = constraint.describe();
|
||||
|
||||
std::string estimatedStr = quoteBlob(estimated);
|
||||
std::string actualStr = quoteBlob(actual);
|
||||
|
||||
// This is only advisory, so ignore failures.
|
||||
char* sql = sqlite3_mprintf(
|
||||
"INSERT OR REPLACE INTO _%s_rowgroups(clause, estimate, actual) VALUES ('%q', %s, %s)",
|
||||
cursor->getTable()->getTableName().c_str(),
|
||||
desc.c_str(),
|
||||
estimatedStr.c_str(),
|
||||
actualStr.c_str());
|
||||
|
||||
|
||||
if(sql == NULL)
|
||||
return;
|
||||
|
||||
sqlite3_exec(db, sql, 0, 0, 0);
|
||||
sqlite3_free(sql);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
** Destructor for a sqlite3_vtab_cursor_parquet.
|
||||
*/
|
||||
static int parquetClose(sqlite3_vtab_cursor *cur){
|
||||
sqlite3_vtab_cursor_parquet* p = (sqlite3_vtab_cursor_parquet*)cur;
|
||||
p->cursor->close();
|
||||
delete p->cursor;
|
||||
sqlite3_vtab_cursor_parquet* vtab_cursor_parquet = (sqlite3_vtab_cursor_parquet*)cur;
|
||||
sqlite3_vtab_parquet* vtab_parquet = (sqlite3_vtab_parquet*)(vtab_cursor_parquet->base.pVtab);
|
||||
ParquetCursor* cursor = vtab_cursor_parquet->cursor;
|
||||
persistConstraints(vtab_parquet->db, cursor);
|
||||
|
||||
vtab_cursor_parquet->cursor->close();
|
||||
delete vtab_cursor_parquet->cursor;
|
||||
sqlite3_free(cur);
|
||||
return SQLITE_OK;
|
||||
}
|
||||
@@ -196,7 +281,8 @@ const char* opName(int op) {
|
||||
*/
|
||||
static int parquetNext(sqlite3_vtab_cursor *cur){
|
||||
try {
|
||||
ParquetCursor* cursor = ((sqlite3_vtab_cursor_parquet*)cur)->cursor;
|
||||
sqlite3_vtab_cursor_parquet* vtab_cursor_parquet = (sqlite3_vtab_cursor_parquet*)cur;
|
||||
ParquetCursor* cursor = vtab_cursor_parquet->cursor;
|
||||
cursor->next();
|
||||
return SQLITE_OK;
|
||||
} catch(std::bad_alloc& ba) {
|
||||
@@ -395,6 +481,38 @@ ConstraintOperator constraintOperatorFromSqlite(int op) {
|
||||
throw std::invalid_argument(ss.str());
|
||||
}
|
||||
|
||||
std::vector<unsigned char> getRowGroupsForClause(sqlite3* db, std::string table, std::string clause) {
|
||||
std::vector<unsigned char> rv;
|
||||
|
||||
std::unique_ptr<char, void(*)(void*)> sql(sqlite3_mprintf(
|
||||
"SELECT actual FROM _%s_rowgroups WHERE clause = '%q'",
|
||||
table.c_str(),
|
||||
clause.c_str()), sqlite3_free);
|
||||
|
||||
if(sql.get() == NULL)
|
||||
return rv;
|
||||
|
||||
sqlite3_stmt* pStmt = NULL;
|
||||
int rc = sqlite3_prepare_v2(db, sql.get(), -1, &pStmt, NULL);
|
||||
if(rc != 0)
|
||||
return rv;
|
||||
|
||||
rc = sqlite3_step(pStmt);
|
||||
if(rc == SQLITE_ROW) {
|
||||
int size = sqlite3_column_bytes(pStmt, 0);
|
||||
unsigned char* blob = (unsigned char*)sqlite3_column_blob(pStmt, 0);
|
||||
// TODO: there is a memory leak here if we get a std::bad_alloc while populating rv;
|
||||
// we fail to free pStmt
|
||||
for(int i = 0; i < size; i++) {
|
||||
rv.push_back(blob[i]);
|
||||
}
|
||||
}
|
||||
|
||||
sqlite3_finalize(pStmt);
|
||||
return rv;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
** Only a full table scan is supported. So xFilter simply rewinds to
|
||||
** the beginning.
|
||||
@@ -407,7 +525,10 @@ static int parquetFilter(
|
||||
sqlite3_value **argv
|
||||
){
|
||||
try {
|
||||
ParquetCursor* cursor = ((sqlite3_vtab_cursor_parquet*)cur)->cursor;
|
||||
sqlite3_vtab_cursor_parquet* vtab_cursor_parquet = (sqlite3_vtab_cursor_parquet*)cur;
|
||||
sqlite3_vtab_parquet* vtab_parquet = (sqlite3_vtab_parquet*)(vtab_cursor_parquet->base.pVtab);
|
||||
sqlite3* db = vtab_parquet->db;
|
||||
ParquetCursor* cursor = vtab_cursor_parquet->cursor;
|
||||
sqlite3_index_info* indexInfo = (sqlite3_index_info*)idxStr;
|
||||
|
||||
#ifdef DEBUG
|
||||
@@ -451,13 +572,40 @@ static int parquetFilter(
|
||||
type = Null;
|
||||
}
|
||||
|
||||
Constraint constraint(
|
||||
std::string columnName = "rowid";
|
||||
if(indexInfo->aConstraint[i].iColumn >= 0) {
|
||||
columnName = cursor->getTable()->columnName(indexInfo->aConstraint[i].iColumn);
|
||||
}
|
||||
|
||||
RowGroupBitmap bitmap = RowGroupBitmap(cursor->getNumRowGroups());
|
||||
Constraint dummy(
|
||||
bitmap,
|
||||
indexInfo->aConstraint[i].iColumn,
|
||||
columnName,
|
||||
constraintOperatorFromSqlite(indexInfo->aConstraint[i].op),
|
||||
type,
|
||||
intValue,
|
||||
doubleValue,
|
||||
blobValue);
|
||||
|
||||
std::vector<unsigned char> actual = getRowGroupsForClause(db, cursor->getTable()->getTableName(), dummy.describe());
|
||||
if(actual.size() > 0) {
|
||||
// Initialize the estimate to be the actual -- eventually they'll converge
|
||||
// and we'll stop writing back to the db.
|
||||
std::vector<unsigned char> estimate = actual;
|
||||
bitmap = RowGroupBitmap(estimate, actual);
|
||||
}
|
||||
|
||||
Constraint constraint(
|
||||
bitmap,
|
||||
indexInfo->aConstraint[i].iColumn,
|
||||
columnName,
|
||||
constraintOperatorFromSqlite(indexInfo->aConstraint[i].op),
|
||||
type,
|
||||
intValue,
|
||||
doubleValue,
|
||||
blobValue);
|
||||
|
||||
constraints.push_back(constraint);
|
||||
j++;
|
||||
}
|
||||
@@ -555,7 +703,7 @@ static sqlite3_module ParquetModule = {
|
||||
parquetConnect, /* xConnect */
|
||||
parquetBestIndex, /* xBestIndex */
|
||||
parquetDisconnect, /* xDisconnect */
|
||||
parquetDisconnect, /* xDestroy */
|
||||
parquetDestroy, /* xDestroy */
|
||||
parquetOpen, /* xOpen - open a cursor */
|
||||
parquetClose, /* xClose - close a cursor */
|
||||
parquetFilter, /* xFilter - configure scan constraints */
|
||||
|
Reference in New Issue
Block a user