1
0
mirror of https://github.com/cldellow/sqlite-parquet-vtable.git synced 2025-04-03 09:39:47 +00:00

Run a formatting pass with clang-format to minimize future git churn

This commit is contained in:
Addie Morrison 2019-12-08 16:08:11 -06:00
parent ae194c69c5
commit 7bc6f91f6f
7 changed files with 1104 additions and 1165 deletions

View File

@ -11,19 +11,19 @@
*/
#include <sqlite3ext.h>
SQLITE_EXTENSION_INIT1
#include <string.h>
#include <stdlib.h>
#include <assert.h>
#include <stdarg.h>
#include <ctype.h>
#include <stdio.h>
#include <iomanip>
#include <sys/time.h>
#include <memory>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include "parquet_table.h"
#include "parquet_cursor.h"
#include "parquet_filter.h"
#include "parquet_table.h"
//#define DEBUG
@ -52,7 +52,6 @@ typedef struct sqlite3_vtab_parquet {
sqlite3 *db;
} sqlite3_vtab_parquet;
/* A cursor for the Parquet virtual table */
typedef struct sqlite3_vtab_cursor_parquet {
sqlite3_vtab_cursor base; /* Base class. Must be first */
@ -84,17 +83,13 @@ static int parquetDisconnect(sqlite3_vtab *pVtab){
return SQLITE_OK;
}
static int parquetConnect(
sqlite3 *db,
void *pAux,
int argc,
const char *const*argv,
sqlite3_vtab **ppVtab,
char **pzErr
){
static int parquetConnect(sqlite3 *db, void *pAux, int argc,
const char *const *argv, sqlite3_vtab **ppVtab,
char **pzErr) {
try {
if (argc != 4 || strlen(argv[3]) < 2) {
*pzErr = sqlite3_mprintf("must provide exactly one argument, the path to a parquet file");
*pzErr = sqlite3_mprintf(
"must provide exactly one argument, the path to a parquet file");
return SQLITE_ERROR;
}
@ -134,13 +129,9 @@ static int parquetConnect(
** The xConnect and xCreate methods do the same thing, but they must be
** different so that the virtual table is not an eponymous virtual table.
*/
static int parquetCreate(
sqlite3 *db,
void *pAux,
int argc, const char *const*argv,
sqlite3_vtab **ppVtab,
char **pzErr
){
static int parquetCreate(sqlite3 *db, void *pAux, int argc,
const char *const *argv, sqlite3_vtab **ppVtab,
char **pzErr) {
try {
// Create shadow table for storing constraint -> rowid mappings
std::string create = "CREATE TABLE IF NOT EXISTS _";
@ -167,7 +158,8 @@ std::string quoteBlob(const std::vector<unsigned char>& bytes) {
std::ostringstream ss;
ss << "X'" << std::hex;
for (unsigned int i = 0; i < bytes.size(); i++) {
ss << std::setfill('0') << std::setw(2) << (unsigned int)(unsigned char)bytes[i];
ss << std::setfill('0') << std::setw(2)
<< (unsigned int)(unsigned char)bytes[i];
}
ss << "'";
@ -177,8 +169,10 @@ std::string quoteBlob(const std::vector<unsigned char>& bytes) {
void persistConstraints(sqlite3 *db, ParquetCursor *cursor) {
for (unsigned int i = 0; i < cursor->getNumConstraints(); i++) {
const Constraint &constraint = cursor->getConstraint(i);
const std::vector<unsigned char>& estimated = constraint.bitmap.estimatedMembership;
const std::vector<unsigned char>& actual = constraint.bitmap.actualMembership;
const std::vector<unsigned char> &estimated =
constraint.bitmap.estimatedMembership;
const std::vector<unsigned char> &actual =
constraint.bitmap.actualMembership;
if (estimated == actual) {
continue;
}
@ -188,13 +182,11 @@ void persistConstraints(sqlite3* db, ParquetCursor* cursor) {
std::string actualStr = quoteBlob(actual);
// This is only advisory, so ignore failures.
char* sql = sqlite3_mprintf(
"INSERT OR REPLACE INTO _%s_rowgroups(clause, estimate, actual) VALUES ('%q', %s, %s)",
char *sql =
sqlite3_mprintf("INSERT OR REPLACE INTO _%s_rowgroups(clause, "
"estimate, actual) VALUES ('%q', %s, %s)",
cursor->getTable()->getTableName().c_str(),
desc.c_str(),
estimatedStr.c_str(),
actualStr.c_str());
desc.c_str(), estimatedStr.c_str(), actualStr.c_str());
if (sql == NULL)
return;
@ -204,12 +196,12 @@ void persistConstraints(sqlite3* db, ParquetCursor* cursor) {
}
}
/*
** Destructor for a sqlite3_vtab_cursor_parquet.
*/
static int parquetClose(sqlite3_vtab_cursor *cur) {
sqlite3_vtab_cursor_parquet* vtab_cursor_parquet = (sqlite3_vtab_cursor_parquet*)cur;
sqlite3_vtab_cursor_parquet *vtab_cursor_parquet =
(sqlite3_vtab_cursor_parquet *)cur;
vtab_cursor_parquet->cursor->close();
delete vtab_cursor_parquet->cursor;
sqlite3_free(cur);
@ -222,7 +214,8 @@ static int parquetClose(sqlite3_vtab_cursor *cur){
static int parquetOpen(sqlite3_vtab *p, sqlite3_vtab_cursor **ppCursor) {
try {
std::unique_ptr<sqlite3_vtab_cursor_parquet, void (*)(void *)> cursor(
(sqlite3_vtab_cursor_parquet*)sqlite3_malloc(sizeof(sqlite3_vtab_cursor_parquet)),
(sqlite3_vtab_cursor_parquet *)sqlite3_malloc(
sizeof(sqlite3_vtab_cursor_parquet)),
sqlite3_free);
memset(cursor.get(), 0, sizeof(*cursor.get()));
@ -238,14 +231,14 @@ static int parquetOpen(sqlite3_vtab *p, sqlite3_vtab_cursor **ppCursor){
}
}
/*
** Advance a sqlite3_vtab_cursor_parquet to its next row of input.
** Set the EOF marker if we reach the end of input.
*/
static int parquetNext(sqlite3_vtab_cursor *cur) {
try {
sqlite3_vtab_cursor_parquet* vtab_cursor_parquet = (sqlite3_vtab_cursor_parquet*)cur;
sqlite3_vtab_cursor_parquet *vtab_cursor_parquet =
(sqlite3_vtab_cursor_parquet *)cur;
ParquetCursor *cursor = vtab_cursor_parquet->cursor;
cursor->next();
return SQLITE_OK;
@ -260,8 +253,8 @@ static int parquetNext(sqlite3_vtab_cursor *cur){
** Return values of columns for the row at which the sqlite3_vtab_cursor_parquet
** is currently pointing.
*/
static int parquetColumn(
sqlite3_vtab_cursor *cur, /* The cursor */
static int
parquetColumn(sqlite3_vtab_cursor *cur, /* The cursor */
sqlite3_context *ctx, /* First argument to sqlite3_result_...() */
int col /* Which column to return */
) {
@ -274,24 +267,22 @@ static int parquetColumn(
} else {
switch (cursor->getPhysicalType(col)) {
case parquet::Type::BOOLEAN:
case parquet::Type::INT32:
{
case parquet::Type::INT32: {
int rv = cursor->getInt32(col);
sqlite3_result_int(ctx, rv);
break;
}
case parquet::Type::FLOAT:
case parquet::Type::DOUBLE:
{
case parquet::Type::DOUBLE: {
double rv = cursor->getDouble(col);
sqlite3_result_double(ctx, rv);
break;
}
case parquet::Type::BYTE_ARRAY:
{
case parquet::Type::BYTE_ARRAY: {
parquet::ByteArray *rv = cursor->getByteArray(col);
if (cursor->getLogicalType(col) == parquet::LogicalType::UTF8) {
sqlite3_result_text(ctx, (const char*)rv->ptr, rv->len, SQLITE_TRANSIENT);
sqlite3_result_text(ctx, (const char *)rv->ptr, rv->len,
SQLITE_TRANSIENT);
} else {
sqlite3_result_blob(ctx, (void *)rv->ptr, rv->len, SQLITE_TRANSIENT);
}
@ -300,14 +291,12 @@ static int parquetColumn(
case parquet::Type::INT96:
// This type exists to store timestamps in nanoseconds due to legacy
// reasons. We just interpret it as a timestamp in milliseconds.
case parquet::Type::INT64:
{
case parquet::Type::INT64: {
long rv = cursor->getInt64(col);
sqlite3_result_int64(ctx, rv);
break;
}
case parquet::Type::FIXED_LEN_BYTE_ARRAY:
{
case parquet::Type::FIXED_LEN_BYTE_ARRAY: {
parquet::ByteArray *rv = cursor->getByteArray(col);
sqlite3_result_blob(ctx, (void *)rv->ptr, rv->len, SQLITE_TRANSIENT);
break;
@ -316,8 +305,9 @@ static int parquetColumn(
// Should be impossible to get here as we should have forbidden this at
// CREATE time -- maybe file changed underneath us?
std::ostringstream ss;
ss << __FILE__ << ":" << __LINE__ << ": column " << col << " has unsupported type: " <<
parquet::TypeToString(cursor->getPhysicalType(col));
ss << __FILE__ << ":" << __LINE__ << ": column " << col
<< " has unsupported type: "
<< parquet::TypeToString(cursor->getPhysicalType(col));
throw std::invalid_argument(ss.str());
break;
@ -347,8 +337,10 @@ static int parquetRowid(sqlite3_vtab_cursor *cur, sqlite_int64 *pRowid){
static int parquetEof(sqlite3_vtab_cursor *cur) {
ParquetCursor *cursor = ((sqlite3_vtab_cursor_parquet *)cur)->cursor;
if (cursor->eof()) {
sqlite3_vtab_cursor_parquet* vtab_cursor_parquet = (sqlite3_vtab_cursor_parquet*)cur;
sqlite3_vtab_parquet* vtab_parquet = (sqlite3_vtab_parquet*)(vtab_cursor_parquet->base.pVtab);
sqlite3_vtab_cursor_parquet *vtab_cursor_parquet =
(sqlite3_vtab_cursor_parquet *)cur;
sqlite3_vtab_parquet *vtab_parquet =
(sqlite3_vtab_parquet *)(vtab_cursor_parquet->base.pVtab);
persistConstraints(vtab_parquet->db, cursor);
return 1;
}
@ -391,7 +383,8 @@ const char* opName(int op) {
}
}
void debugConstraints(sqlite3_index_info *pIdxInfo, ParquetTable *table, int argc, sqlite3_value** argv) {
void debugConstraints(sqlite3_index_info *pIdxInfo, ParquetTable *table,
int argc, sqlite3_value **argv) {
printf("debugConstraints, argc=%d\n", argc);
int j = 0;
for (int i = 0; i < pIdxInfo->nConstraint; i++) {
@ -399,51 +392,44 @@ void debugConstraints(sqlite3_index_info *pIdxInfo, ParquetTable *table, int arg
if (argv != NULL && pIdxInfo->aConstraint[i].usable) {
int type = sqlite3_value_type(argv[j]);
switch (type) {
case SQLITE_INTEGER:
{
case SQLITE_INTEGER: {
sqlite3_int64 rv = sqlite3_value_int64(argv[j]);
std::ostringstream ss;
ss << rv;
valueStr = ss.str();
break;
}
case SQLITE_FLOAT:
{
case SQLITE_FLOAT: {
double rv = sqlite3_value_double(argv[j]);
std::ostringstream ss;
ss << rv;
valueStr = ss.str();
break;
}
case SQLITE_TEXT:
{
case SQLITE_TEXT: {
const unsigned char *rv = sqlite3_value_text(argv[j]);
std::ostringstream ss;
ss << "'" << rv << "'";
valueStr = ss.str();
break;
}
case SQLITE_BLOB:
{
case SQLITE_BLOB: {
int sizeBytes = sqlite3_value_bytes(argv[j]);
std::ostringstream ss;
ss << "'..." << sizeBytes << "-byte blob...'";
valueStr = ss.str();
break;
}
case SQLITE_NULL:
{
case SQLITE_NULL: {
valueStr = "NULL";
break;
}
}
j++;
}
printf(" constraint %d: col %s %s %s, usable %d\n",
i,
printf(" constraint %d: col %s %s %s, usable %d\n", i,
table->columnName(pIdxInfo->aConstraint[i].iColumn).data(),
opName(pIdxInfo->aConstraint[i].op),
valueStr.data(),
opName(pIdxInfo->aConstraint[i].op), valueStr.data(),
pIdxInfo->aConstraint[i].usable);
}
}
@ -482,13 +468,14 @@ ConstraintOperator constraintOperatorFromSqlite(int op) {
throw std::invalid_argument(ss.str());
}
std::vector<unsigned char> getRowGroupsForClause(sqlite3* db, std::string table, std::string clause) {
std::vector<unsigned char> getRowGroupsForClause(sqlite3 *db, std::string table,
std::string clause) {
std::vector<unsigned char> rv;
std::unique_ptr<char, void(*)(void*)> sql(sqlite3_mprintf(
"SELECT actual FROM _%s_rowgroups WHERE clause = '%q'",
table.c_str(),
clause.c_str()), sqlite3_free);
std::unique_ptr<char, void (*)(void *)> sql(
sqlite3_mprintf("SELECT actual FROM _%s_rowgroups WHERE clause = '%q'",
table.c_str(), clause.c_str()),
sqlite3_free);
if (sql.get() == NULL)
return rv;
@ -502,8 +489,8 @@ std::vector<unsigned char> getRowGroupsForClause(sqlite3* db, std::string table,
if (rc == SQLITE_ROW) {
int size = sqlite3_column_bytes(pStmt, 0);
unsigned char *blob = (unsigned char *)sqlite3_column_blob(pStmt, 0);
// TODO: there is a memory leak here if we get a std::bad_alloc while populating rv;
// we fail to free pStmt
// TODO: there is a memory leak here if we get a std::bad_alloc while
// populating rv; we fail to free pStmt
for (int i = 0; i < size; i++) {
rv.push_back(blob[i]);
}
@ -513,21 +500,17 @@ std::vector<unsigned char> getRowGroupsForClause(sqlite3* db, std::string table,
return rv;
}
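/*
** xFilter: begin a scan. idxStr carries the copy of the sqlite3_index_info
** structure that xBestIndex stored there; the constraints it describes are
** paired with their argv values and turned into Constraint objects that the
** cursor can use to filter row groups and rows.
*/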
static int parquetFilter(
sqlite3_vtab_cursor *cur,
int idxNum,
const char *idxStr,
int argc,
sqlite3_value **argv
){
static int parquetFilter(sqlite3_vtab_cursor *cur, int idxNum,
const char *idxStr, int argc, sqlite3_value **argv) {
try {
sqlite3_vtab_cursor_parquet* vtab_cursor_parquet = (sqlite3_vtab_cursor_parquet*)cur;
sqlite3_vtab_parquet* vtab_parquet = (sqlite3_vtab_parquet*)(vtab_cursor_parquet->base.pVtab);
sqlite3_vtab_cursor_parquet *vtab_cursor_parquet =
(sqlite3_vtab_cursor_parquet *)cur;
sqlite3_vtab_parquet *vtab_parquet =
(sqlite3_vtab_parquet *)(vtab_cursor_parquet->base.pVtab);
sqlite3 *db = vtab_parquet->db;
ParquetCursor *cursor = vtab_cursor_parquet->cursor;
sqlite3_index_info *indexInfo = (sqlite3_index_info *)idxStr;
@ -539,7 +522,8 @@ static int parquetFilter(
(unsigned long long)(tv.tv_sec) * 1000 +
(unsigned long long)(tv.tv_usec) / 1000;
printf("%llu xFilter: idxNum=%d, idxStr=%lu, argc=%d\n", millisecondsSinceEpoch, idxNum, (long unsigned int)idxStr, argc);
printf("%llu xFilter: idxNum=%d, idxStr=%lu, argc=%d\n",
millisecondsSinceEpoch, idxNum, (long unsigned int)idxStr, argc);
debugConstraints(indexInfo, cursor->getTable(), argc, argv);
#endif
std::vector<Constraint> constraints;
@ -571,7 +555,8 @@ static int parquetFilter(
} else if (sqliteType == SQLITE_BLOB) {
type = Blob;
int len = sqlite3_value_bytes(argv[j]);
const unsigned char* ptr = (const unsigned char*)sqlite3_value_blob(argv[j]);
const unsigned char *ptr =
(const unsigned char *)sqlite3_value_blob(argv[j]);
for (int k = 0; k < len; k++) {
blobValue.push_back(ptr[k]);
}
@ -581,37 +566,29 @@ static int parquetFilter(
std::string columnName = "rowid";
if (indexInfo->aConstraint[i].iColumn >= 0) {
columnName = cursor->getTable()->columnName(indexInfo->aConstraint[i].iColumn);
columnName =
cursor->getTable()->columnName(indexInfo->aConstraint[i].iColumn);
}
RowGroupBitmap bitmap = RowGroupBitmap(cursor->getNumRowGroups());
Constraint dummy(
bitmap,
indexInfo->aConstraint[i].iColumn,
columnName,
constraintOperatorFromSqlite(indexInfo->aConstraint[i].op),
type,
intValue,
doubleValue,
blobValue);
bitmap, indexInfo->aConstraint[i].iColumn, columnName,
constraintOperatorFromSqlite(indexInfo->aConstraint[i].op), type,
intValue, doubleValue, blobValue);
std::vector<unsigned char> actual = getRowGroupsForClause(db, cursor->getTable()->getTableName(), dummy.describe());
std::vector<unsigned char> actual = getRowGroupsForClause(
db, cursor->getTable()->getTableName(), dummy.describe());
if (actual.size() > 0) {
// Initialize the estimate to be the actual -- eventually they'll converge
// and we'll stop writing back to the db.
// Initialize the estimate to be the actual -- eventually they'll
// converge and we'll stop writing back to the db.
std::vector<unsigned char> estimate = actual;
bitmap = RowGroupBitmap(estimate, actual);
}
Constraint constraint(
bitmap,
indexInfo->aConstraint[i].iColumn,
columnName,
constraintOperatorFromSqlite(indexInfo->aConstraint[i].op),
type,
intValue,
doubleValue,
blobValue);
bitmap, indexInfo->aConstraint[i].iColumn, columnName,
constraintOperatorFromSqlite(indexInfo->aConstraint[i].op), type,
intValue, doubleValue, blobValue);
constraints.push_back(constraint);
j++;
@ -626,15 +603,12 @@ static int parquetFilter(
}
/*
* We'll always indicate to SQLite that we prefer it to use an index so that it will
* pass additional context to xFilter, which we may or may not use.
* We'll always indicate to SQLite that we prefer it to use an index so that it
* will pass additional context to xFilter, which we may or may not use.
*
* We copy the sqlite3_index_info structure, as is, into idxStr for later use.
*/
static int parquetBestIndex(
sqlite3_vtab *tab,
sqlite3_index_info *pIdxInfo
){
static int parquetBestIndex(sqlite3_vtab *tab, sqlite3_index_info *pIdxInfo) {
try {
#ifdef DEBUG
@ -644,15 +618,16 @@ static int parquetBestIndex(
(unsigned long long)(tv.tv_sec) * 1000 +
(unsigned long long)(tv.tv_usec) / 1000;
ParquetTable *table = ((sqlite3_vtab_parquet *)tab)->table;
printf("%llu xBestIndex: nConstraint=%d, nOrderBy=%d\n", millisecondsSinceEpoch, pIdxInfo->nConstraint, pIdxInfo->nOrderBy);
printf("%llu xBestIndex: nConstraint=%d, nOrderBy=%d\n",
millisecondsSinceEpoch, pIdxInfo->nConstraint, pIdxInfo->nOrderBy);
debugConstraints(pIdxInfo, table, 0, NULL);
#endif
// We traverse in rowid ascending order, so if they're asking for it to be ordered like that,
// we can tell SQLite that it's guaranteed. This speeds up some DB viewer utilities that
// use rowids for pagination.
if(pIdxInfo->nOrderBy == 1 && pIdxInfo->aOrderBy[0].iColumn == -1 && pIdxInfo->aOrderBy[0].desc == 0)
// We traverse in rowid ascending order, so if they're asking for it to be
// ordered like that, we can tell SQLite that it's guaranteed. This speeds
// up some DB viewer utilities that use rowids for pagination.
if (pIdxInfo->nOrderBy == 1 && pIdxInfo->aOrderBy[0].iColumn == -1 &&
pIdxInfo->aOrderBy[0].desc == 0)
pIdxInfo->orderByConsumed = 1;
if (pIdxInfo->nConstraint == 0) {
@ -672,11 +647,14 @@ static int parquetBestIndex(
}
}
size_t dupeSize = sizeof(sqlite3_index_info) +
size_t dupeSize =
sizeof(sqlite3_index_info) +
// pIdxInfo->nConstraint * sizeof(sqlite3_index_constraint) +
pIdxInfo->nConstraint * sizeof(sqlite3_index_info::sqlite3_index_constraint) +
pIdxInfo->nConstraint *
sizeof(sqlite3_index_info::sqlite3_index_constraint) +
pIdxInfo->nOrderBy * sizeof(sqlite3_index_info::sqlite3_index_orderby) +
pIdxInfo->nConstraint * sizeof(sqlite3_index_info::sqlite3_index_constraint_usage);
pIdxInfo->nConstraint *
sizeof(sqlite3_index_info::sqlite3_index_constraint_usage);
sqlite3_index_info *dupe = (sqlite3_index_info *)sqlite3_malloc(dupeSize);
pIdxInfo->idxStr = (char *)dupe;
pIdxInfo->needToFreeIdxStr = 1;
@ -684,15 +662,20 @@ static int parquetBestIndex(
memset(dupe, 0, dupeSize);
memcpy(dupe, pIdxInfo, sizeof(sqlite3_index_info));
dupe->aConstraint = (sqlite3_index_info::sqlite3_index_constraint*)((char*)dupe + sizeof(sqlite3_index_info));
dupe->aOrderBy = (sqlite3_index_info::sqlite3_index_orderby*)((char*)dupe +
sizeof(sqlite3_index_info) +
pIdxInfo->nConstraint * sizeof(sqlite3_index_info::sqlite3_index_constraint));
dupe->aConstraintUsage = (sqlite3_index_info::sqlite3_index_constraint_usage*)((char*)dupe +
sizeof(sqlite3_index_info) +
pIdxInfo->nConstraint * sizeof(sqlite3_index_info::sqlite3_index_constraint) +
pIdxInfo->nOrderBy * sizeof(sqlite3_index_info::sqlite3_index_orderby));
dupe->aConstraint = (sqlite3_index_info::sqlite3_index_constraint
*)((char *)dupe + sizeof(sqlite3_index_info));
dupe->aOrderBy =
(sqlite3_index_info::sqlite3_index_orderby
*)((char *)dupe + sizeof(sqlite3_index_info) +
pIdxInfo->nConstraint *
sizeof(sqlite3_index_info::sqlite3_index_constraint));
dupe->aConstraintUsage =
(sqlite3_index_info::sqlite3_index_constraint_usage
*)((char *)dupe + sizeof(sqlite3_index_info) +
pIdxInfo->nConstraint *
sizeof(sqlite3_index_info::sqlite3_index_constraint) +
pIdxInfo->nOrderBy *
sizeof(sqlite3_index_info::sqlite3_index_orderby));
for (int i = 0; i < pIdxInfo->nConstraint; i++) {
dupe->aConstraint[i].iColumn = pIdxInfo->aConstraint[i].iColumn;
@ -700,7 +683,8 @@ static int parquetBestIndex(
dupe->aConstraint[i].usable = pIdxInfo->aConstraint[i].usable;
dupe->aConstraint[i].iTermOffset = pIdxInfo->aConstraint[i].iTermOffset;
dupe->aConstraintUsage[i].argvIndex = pIdxInfo->aConstraintUsage[i].argvIndex;
dupe->aConstraintUsage[i].argvIndex =
pIdxInfo->aConstraintUsage[i].argvIndex;
dupe->aConstraintUsage[i].omit = pIdxInfo->aConstraintUsage[i].omit;
}
@ -717,7 +701,6 @@ static int parquetBestIndex(
}
}
static sqlite3_module ParquetModule = {
0, /* iVersion */
parquetCreate, /* xCreate */
@ -747,11 +730,8 @@ static sqlite3_module ParquetModule = {
* connection.
*/
extern "C" {
int sqlite3_parquet_init(
sqlite3 *db,
char **pzErrMsg,
const sqlite3_api_routines *pApi
){
int sqlite3_parquet_init(sqlite3 *db, char **pzErrMsg,
const sqlite3_api_routines *pApi) {
int rc;
SQLITE_EXTENSION_INIT2(pApi);
rc = sqlite3_create_module(db, "parquet", &ParquetModule, 0);

View File

@ -5,7 +5,8 @@ ParquetCursor::ParquetCursor(ParquetTable* table): table(table) {
reset(std::vector<Constraint>());
}
bool ParquetCursor::currentRowGroupSatisfiesRowIdFilter(Constraint& constraint) {
bool ParquetCursor::currentRowGroupSatisfiesRowIdFilter(
Constraint &constraint) {
if (constraint.type != Integer)
return true;
@ -31,7 +32,9 @@ bool ParquetCursor::currentRowGroupSatisfiesRowIdFilter(Constraint& constraint)
}
}
bool ParquetCursor::currentRowGroupSatisfiesBlobFilter(Constraint& constraint, std::shared_ptr<parquet::RowGroupStatistics> _stats) {
bool ParquetCursor::currentRowGroupSatisfiesBlobFilter(
Constraint &constraint,
std::shared_ptr<parquet::RowGroupStatistics> _stats) {
if (!_stats->HasMinMax()) {
return true;
}
@ -48,8 +51,10 @@ bool ParquetCursor::currentRowGroupSatisfiesBlobFilter(Constraint& constraint, s
parquet::Type::type pqType = types[constraint.column];
if (pqType == parquet::Type::BYTE_ARRAY) {
parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::BYTE_ARRAY>>* stats =
(parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::BYTE_ARRAY>>*)_stats.get();
parquet::TypedRowGroupStatistics<
parquet::DataType<parquet::Type::BYTE_ARRAY>> *stats =
(parquet::TypedRowGroupStatistics<
parquet::DataType<parquet::Type::BYTE_ARRAY>> *)_stats.get();
minPtr = stats->min().ptr;
minLen = stats->min().len;
@ -64,8 +69,9 @@ bool ParquetCursor::currentRowGroupSatisfiesBlobFilter(Constraint& constraint, s
} else {
// Should be impossible to get here
std::ostringstream ss;
ss << __FILE__ << ":" << __LINE__ << ": currentRowGroupSatisfiesBlobFilter called on unsupported type: " <<
parquet::TypeToString(pqType);
ss << __FILE__ << ":" << __LINE__
<< ": currentRowGroupSatisfiesBlobFilter called on unsupported type: "
<< parquet::TypeToString(pqType);
throw std::invalid_argument(ss.str());
}
@ -73,61 +79,45 @@ bool ParquetCursor::currentRowGroupSatisfiesBlobFilter(Constraint& constraint, s
switch (constraint.op) {
case Is:
case Equal:
{
bool minEqual = blob.size() == minLen && memcmp(&blob[0], minPtr, minLen) == 0;
bool maxEqual = blob.size() == maxLen && memcmp(&blob[0], maxPtr, maxLen) == 0;
case Equal: {
bool minEqual =
blob.size() == minLen && memcmp(&blob[0], minPtr, minLen) == 0;
bool maxEqual =
blob.size() == maxLen && memcmp(&blob[0], maxPtr, maxLen) == 0;
bool blobGteMinBlob = std::lexicographical_compare(
minPtr,
minPtr + minLen,
&blob[0],
&blob[0] + blob.size());
minPtr, minPtr + minLen, &blob[0], &blob[0] + blob.size());
bool blobLtMaxBlob = std::lexicographical_compare(
&blob[0],
&blob[0] + blob.size(),
maxPtr,
maxPtr + maxLen);
&blob[0], &blob[0] + blob.size(), maxPtr, maxPtr + maxLen);
return (minEqual || blobGteMinBlob) && (maxEqual || blobLtMaxBlob);
}
case GreaterThanOrEqual:
{
bool maxEqual = blob.size() == maxLen && memcmp(&blob[0], maxPtr, maxLen) == 0;
case GreaterThanOrEqual: {
bool maxEqual =
blob.size() == maxLen && memcmp(&blob[0], maxPtr, maxLen) == 0;
return maxEqual || std::lexicographical_compare(
&blob[0],
&blob[0] + blob.size(),
maxPtr,
maxPtr + maxLen);
return maxEqual ||
std::lexicographical_compare(&blob[0], &blob[0] + blob.size(),
maxPtr, maxPtr + maxLen);
}
case GreaterThan:
return std::lexicographical_compare(
&blob[0],
&blob[0] + blob.size(),
maxPtr,
maxPtr + maxLen);
return std::lexicographical_compare(&blob[0], &blob[0] + blob.size(),
maxPtr, maxPtr + maxLen);
case LessThan:
return std::lexicographical_compare(
minPtr,
minPtr + minLen,
&blob[0],
return std::lexicographical_compare(minPtr, minPtr + minLen, &blob[0],
&blob[0] + blob.size());
case LessThanOrEqual:
{
bool minEqual = blob.size() == minLen && memcmp(&blob[0], minPtr, minLen) == 0;
return minEqual || std::lexicographical_compare(
minPtr,
minPtr + minLen,
&blob[0],
case LessThanOrEqual: {
bool minEqual =
blob.size() == minLen && memcmp(&blob[0], minPtr, minLen) == 0;
return minEqual ||
std::lexicographical_compare(minPtr, minPtr + minLen, &blob[0],
&blob[0] + blob.size());
}
case NotEqual:
{
case NotEqual: {
// If min == max == blob, we can skip this.
bool blobMaxEqual = blob.size() == maxLen && memcmp(&blob[0], maxPtr, maxLen) == 0;
bool blobMaxEqual =
blob.size() == maxLen && memcmp(&blob[0], maxPtr, maxLen) == 0;
bool minMaxEqual = minLen == maxLen && memcmp(minPtr, maxPtr, minLen) == 0;
return !(blobMaxEqual && minMaxEqual);
}
@ -137,9 +127,12 @@ bool ParquetCursor::currentRowGroupSatisfiesBlobFilter(Constraint& constraint, s
}
}
bool ParquetCursor::currentRowGroupSatisfiesTextFilter(Constraint& constraint, std::shared_ptr<parquet::RowGroupStatistics> _stats) {
parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::BYTE_ARRAY>>* stats =
(parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::BYTE_ARRAY>>*)_stats.get();
bool ParquetCursor::currentRowGroupSatisfiesTextFilter(
Constraint &constraint,
std::shared_ptr<parquet::RowGroupStatistics> _stats) {
parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::BYTE_ARRAY>>
*stats = (parquet::TypedRowGroupStatistics<
parquet::DataType<parquet::Type::BYTE_ARRAY>> *)_stats.get();
if (!stats->HasMinMax()) {
return true;
@ -154,7 +147,8 @@ bool ParquetCursor::currentRowGroupSatisfiesTextFilter(Constraint& constraint, s
const parquet::ByteArray &max = stats->max();
std::string minStr((const char *)min.ptr, min.len);
std::string maxStr((const char *)max.ptr, max.len);
// printf("min=%s [%d], max=%s [%d], target=%s\n", minStr.data(), min.len, maxStr.data(), max.len, str.data());
// printf("min=%s [%d], max=%s [%d], target=%s\n", minStr.data(), min.len,
// maxStr.data(), max.len, str.data());
switch (constraint.op) {
case Is:
@ -171,12 +165,12 @@ bool ParquetCursor::currentRowGroupSatisfiesTextFilter(Constraint& constraint, s
case NotEqual:
// If min == max == str, we can skip this.
return !(minStr == maxStr && str == minStr);
case Like:
{
case Like: {
const std::string &likeStringValue = constraint.likeStringValue;
std::string truncatedMin = minStr.substr(0, likeStringValue.size());
std::string truncatedMax = maxStr.substr(0, likeStringValue.size());
return likeStringValue.empty() || (likeStringValue >= truncatedMin && likeStringValue <= truncatedMax);
return likeStringValue.empty() ||
(likeStringValue >= truncatedMin && likeStringValue <= truncatedMax);
}
case IsNot:
default:
@ -195,7 +189,9 @@ int64_t int96toMsSinceEpoch(const parquet::Int96& rv) {
return nsSinceEpoch;
}
bool ParquetCursor::currentRowGroupSatisfiesIntegerFilter(Constraint& constraint, std::shared_ptr<parquet::RowGroupStatistics> _stats) {
bool ParquetCursor::currentRowGroupSatisfiesIntegerFilter(
Constraint &constraint,
std::shared_ptr<parquet::RowGroupStatistics> _stats) {
if (!_stats->HasMinMax()) {
return true;
}
@ -211,27 +207,31 @@ bool ParquetCursor::currentRowGroupSatisfiesIntegerFilter(Constraint& constraint
parquet::Type::type pqType = types[column];
if (pqType == parquet::Type::INT32) {
parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::INT32>>* stats =
(parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::INT32>>*)_stats.get();
parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::INT32>>
*stats = (parquet::TypedRowGroupStatistics<
parquet::DataType<parquet::Type::INT32>> *)_stats.get();
min = stats->min();
max = stats->max();
} else if (pqType == parquet::Type::INT64) {
parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::INT64>>* stats =
(parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::INT64>>*)_stats.get();
parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::INT64>>
*stats = (parquet::TypedRowGroupStatistics<
parquet::DataType<parquet::Type::INT64>> *)_stats.get();
min = stats->min();
max = stats->max();
} else if (pqType == parquet::Type::INT96) {
parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::INT96>>* stats =
(parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::INT96>>*)_stats.get();
parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::INT96>>
*stats = (parquet::TypedRowGroupStatistics<
parquet::DataType<parquet::Type::INT96>> *)_stats.get();
min = int96toMsSinceEpoch(stats->min());
max = int96toMsSinceEpoch(stats->max());
} else if (pqType == parquet::Type::BOOLEAN) {
parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::BOOLEAN>>* stats =
(parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::BOOLEAN>>*)_stats.get();
parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::BOOLEAN>>
*stats = (parquet::TypedRowGroupStatistics<
parquet::DataType<parquet::Type::BOOLEAN>> *)_stats.get();
min = stats->min();
max = stats->max();
@ -240,13 +240,15 @@ bool ParquetCursor::currentRowGroupSatisfiesIntegerFilter(Constraint& constraint
// Should be impossible to get here as we should have forbidden this at
// CREATE time -- maybe file changed underneath us?
std::ostringstream ss;
ss << __FILE__ << ":" << __LINE__ << ": currentRowGroupSatisfiesIntegerFilter called on unsupported type: " <<
parquet::TypeToString(pqType);
ss << __FILE__ << ":" << __LINE__
<< ": currentRowGroupSatisfiesIntegerFilter called on unsupported type: "
<< parquet::TypeToString(pqType);
throw std::invalid_argument(ss.str());
}
const int64_t value = constraint.intValue;
// printf("min=%s [%d], max=%s [%d], target=%s\n", minStr.data(), min.len, maxStr.data(), max.len, str.data());
// printf("min=%s [%d], max=%s [%d], target=%s\n", minStr.data(), min.len,
// maxStr.data(), max.len, str.data());
switch (constraint.op) {
case Is:
@ -272,7 +274,9 @@ bool ParquetCursor::currentRowGroupSatisfiesIntegerFilter(Constraint& constraint
return true;
}
bool ParquetCursor::currentRowGroupSatisfiesDoubleFilter(Constraint& constraint, std::shared_ptr<parquet::RowGroupStatistics> _stats) {
bool ParquetCursor::currentRowGroupSatisfiesDoubleFilter(
Constraint &constraint,
std::shared_ptr<parquet::RowGroupStatistics> _stats) {
if (!_stats->HasMinMax()) {
return true;
}
@ -288,14 +292,16 @@ bool ParquetCursor::currentRowGroupSatisfiesDoubleFilter(Constraint& constraint,
parquet::Type::type pqType = types[column];
if (pqType == parquet::Type::DOUBLE) {
parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::DOUBLE>>* stats =
(parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::DOUBLE>>*)_stats.get();
parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::DOUBLE>>
*stats = (parquet::TypedRowGroupStatistics<
parquet::DataType<parquet::Type::DOUBLE>> *)_stats.get();
min = stats->min();
max = stats->max();
} else if (pqType == parquet::Type::FLOAT) {
parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::FLOAT>>* stats =
(parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::FLOAT>>*)_stats.get();
parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::FLOAT>>
*stats = (parquet::TypedRowGroupStatistics<
parquet::DataType<parquet::Type::FLOAT>> *)_stats.get();
min = stats->min();
max = stats->max();
@ -303,13 +309,15 @@ bool ParquetCursor::currentRowGroupSatisfiesDoubleFilter(Constraint& constraint,
// Should be impossible to get here as we should have forbidden this at
// CREATE time -- maybe file changed underneath us?
std::ostringstream ss;
ss << __FILE__ << ":" << __LINE__ << ": currentRowGroupSatisfiesIntegerFilter called on unsupported type: " <<
parquet::TypeToString(pqType);
ss << __FILE__ << ":" << __LINE__
<< ": currentRowGroupSatisfiesIntegerFilter called on unsupported type: "
<< parquet::TypeToString(pqType);
throw std::invalid_argument(ss.str());
}
const double value = constraint.doubleValue;
// printf("min=%s [%d], max=%s [%d], target=%s\n", minStr.data(), min.len, maxStr.data(), max.len, str.data());
// printf("min=%s [%d], max=%s [%d], target=%s\n", minStr.data(), min.len,
// maxStr.data(), max.len, str.data());
switch (constraint.op) {
case Is:
@ -333,7 +341,6 @@ bool ParquetCursor::currentRowGroupSatisfiesDoubleFilter(Constraint& constraint,
}
return true;
}
bool ParquetCursor::currentRowSatisfiesTextFilter(Constraint &constraint) {
@ -345,8 +352,7 @@ bool ParquetCursor::currentRowSatisfiesTextFilter(Constraint& constraint) {
switch (constraint.op) {
case Is:
case Equal:
{
case Equal: {
const std::vector<unsigned char> &blob = constraint.blobValue;
if (blob.size() != ba->len)
@ -354,8 +360,7 @@ bool ParquetCursor::currentRowSatisfiesTextFilter(Constraint& constraint) {
return 0 == memcmp(&blob[0], ba->ptr, ba->len);
}
case NotEqual:
{
case NotEqual: {
const std::vector<unsigned char> &blob = constraint.blobValue;
if (blob.size() != ba->len)
@ -363,52 +368,39 @@ bool ParquetCursor::currentRowSatisfiesTextFilter(Constraint& constraint) {
return 0 != memcmp(&blob[0], ba->ptr, ba->len);
}
case GreaterThan:
{
case GreaterThan: {
const std::vector<unsigned char> &blob = constraint.blobValue;
return std::lexicographical_compare(
&blob[0],
&blob[0] + blob.size(),
ba->ptr,
ba->ptr + ba->len);
return std::lexicographical_compare(&blob[0], &blob[0] + blob.size(),
ba->ptr, ba->ptr + ba->len);
}
case GreaterThanOrEqual:
{
case GreaterThanOrEqual: {
const std::vector<unsigned char> &blob = constraint.blobValue;
bool equal = blob.size() == ba->len && 0 == memcmp(&blob[0], ba->ptr, ba->len);
bool equal =
blob.size() == ba->len && 0 == memcmp(&blob[0], ba->ptr, ba->len);
return equal || std::lexicographical_compare(
&blob[0],
&blob[0] + blob.size(),
ba->ptr,
ba->ptr + ba->len);
return equal ||
std::lexicographical_compare(&blob[0], &blob[0] + blob.size(),
ba->ptr, ba->ptr + ba->len);
}
case LessThan:
{
case LessThan: {
const std::vector<unsigned char> &blob = constraint.blobValue;
return std::lexicographical_compare(
ba->ptr,
ba->ptr + ba->len,
&blob[0],
return std::lexicographical_compare(ba->ptr, ba->ptr + ba->len, &blob[0],
&blob[0] + blob.size());
}
case LessThanOrEqual:
{
case LessThanOrEqual: {
const std::vector<unsigned char> &blob = constraint.blobValue;
bool equal = blob.size() == ba->len && 0 == memcmp(&blob[0], ba->ptr, ba->len);
bool equal =
blob.size() == ba->len && 0 == memcmp(&blob[0], ba->ptr, ba->len);
return equal || std::lexicographical_compare(
ba->ptr,
ba->ptr + ba->len,
&blob[0],
return equal ||
std::lexicographical_compare(ba->ptr, ba->ptr + ba->len, &blob[0],
&blob[0] + blob.size());
}
case Like:
{
case Like: {
const std::string &likeStringValue = constraint.likeStringValue;
if (likeStringValue.size() > ba->len)
return false;
@ -441,13 +433,15 @@ bool ParquetCursor::currentRowSatisfiesIntegerFilter(Constraint& constraint) {
if (pqType == parquet::Type::INT32 || pqType == parquet::Type::BOOLEAN) {
value = getInt32(column);
} else if(pqType == parquet::Type::INT64 || pqType == parquet::Type::INT96) {
} else if (pqType == parquet::Type::INT64 ||
pqType == parquet::Type::INT96) {
value = getInt64(column);
} else {
// Should be impossible to get here
std::ostringstream ss;
ss << __FILE__ << ":" << __LINE__ << ": currentRowSatisfiesIntegerFilter called on unsupported type: " <<
parquet::TypeToString(pqType);
ss << __FILE__ << ":" << __LINE__
<< ": currentRowSatisfiesIntegerFilter called on unsupported type: "
<< parquet::TypeToString(pqType);
throw std::invalid_argument(ss.str());
}
}
@ -509,7 +503,6 @@ bool ParquetCursor::currentRowSatisfiesDoubleFilter(Constraint& constraint) {
return true;
}
// Return true if it is _possible_ that the current
// rowgroup satisfies the constraints. Only return false
// if it definitely does not.
@ -525,7 +518,8 @@ bool ParquetCursor::currentRowGroupSatisfiesFilter() {
if (column == -1) {
rv = currentRowGroupSatisfiesRowIdFilter(constraints[i]);
} else {
std::unique_ptr<parquet::ColumnChunkMetaData> md = rowGroupMetadata->ColumnChunk(column);
std::unique_ptr<parquet::ColumnChunkMetaData> md =
rowGroupMetadata->ColumnChunk(column);
if (md->is_stats_set()) {
std::shared_ptr<parquet::RowGroupStatistics> stats = md->statistics();
@ -545,7 +539,8 @@ bool ParquetCursor::currentRowGroupSatisfiesFilter() {
} else {
parquet::Type::type pqType = types[column];
if(pqType == parquet::Type::BYTE_ARRAY && logicalTypes[column] == parquet::LogicalType::UTF8) {
if (pqType == parquet::Type::BYTE_ARRAY &&
logicalTypes[column] == parquet::LogicalType::UTF8) {
rv = currentRowGroupSatisfiesTextFilter(constraints[i], stats);
} else if (pqType == parquet::Type::BYTE_ARRAY) {
rv = currentRowGroupSatisfiesBlobFilter(constraints[i], stats);
@ -554,7 +549,8 @@ bool ParquetCursor::currentRowGroupSatisfiesFilter() {
pqType == parquet::Type::INT96 ||
pqType == parquet::Type::BOOLEAN) {
rv = currentRowGroupSatisfiesIntegerFilter(constraints[i], stats);
} else if(pqType == parquet::Type::FLOAT || pqType == parquet::Type::DOUBLE) {
} else if (pqType == parquet::Type::FLOAT ||
pqType == parquet::Type::DOUBLE) {
rv = currentRowGroupSatisfiesDoubleFilter(constraints[i], stats);
}
}
@ -570,15 +566,15 @@ bool ParquetCursor::currentRowGroupSatisfiesFilter() {
}
}
// printf("rowGroup %d %s\n", rowGroupId, overallRv ? "may satisfy" : "does not satisfy");
// printf("rowGroup %d %s\n", rowGroupId, overallRv ? "may satisfy" : "does
// not satisfy");
return true;
}
bool ParquetCursor::nextRowGroup() {
start:
// Ensure that rowId points at the start of this rowGroup (eg, in the case where
// we skipped an entire row group).
// Ensure that rowId points at the start of this rowGroup (eg, in the case
// where we skipped an entire row group).
rowId = rowGroupStartRowId + rowGroupSize;
if ((rowGroupId + 1) >= numRowGroups) {
@ -595,7 +591,6 @@ start:
colByteArrayValues.push_back(parquet::ByteArray());
}
rowGroupStartRowId = rowId;
rowGroupId++;
rowGroupMetadata = reader->metadata()->RowGroup(rowGroupId);
@ -609,10 +604,12 @@ start:
}
while (logicalTypes.size() < (unsigned int)rowGroupMetadata->num_columns()) {
logicalTypes.push_back(rowGroupMetadata->schema()->Column(0)->logical_type());
logicalTypes.push_back(
rowGroupMetadata->schema()->Column(0)->logical_type());
}
for(unsigned int i = 0; i < (unsigned int)rowGroupMetadata->num_columns(); i++) {
for (unsigned int i = 0; i < (unsigned int)rowGroupMetadata->num_columns();
i++) {
types[i] = rowGroupMetadata->schema()->Column(i)->physical_type();
logicalTypes[i] = rowGroupMetadata->schema()->Column(i)->logical_type();
}
@ -629,7 +626,8 @@ start:
// a row
for (unsigned int i = 0; i < constraints.size(); i++) {
if (rowGroupId > 0 && constraints[i].rowGroupId == rowGroupId - 1) {
constraints[i].bitmap.setActualMembership(rowGroupId - 1, constraints[i].hadRows);
constraints[i].bitmap.setActualMembership(rowGroupId - 1,
constraints[i].hadRows);
}
constraints[i].hadRows = false;
}
@ -668,12 +666,12 @@ bool ParquetCursor::currentRowSatisfiesFilter() {
rv = currentRowSatisfiesTextFilter(constraints[i]);
} else {
parquet::Type::type pqType = types[column];
if(pqType == parquet::Type::INT32 ||
pqType == parquet::Type::INT64 ||
if (pqType == parquet::Type::INT32 || pqType == parquet::Type::INT64 ||
pqType == parquet::Type::INT96 ||
pqType == parquet::Type::BOOLEAN) {
rv = currentRowSatisfiesIntegerFilter(constraints[i]);
} else if(pqType == parquet::Type::FLOAT || pqType == parquet::Type::DOUBLE) {
} else if (pqType == parquet::Type::FLOAT ||
pqType == parquet::Type::DOUBLE) {
rv = currentRowSatisfiesDoubleFilter(constraints[i]);
}
}
@ -698,8 +696,8 @@ start:
rowId = numRows + 1;
return;
} else {
// After a successful nextRowGroup, rowId is pointing at the current row. Make it
// point before so the rest of the logic works out.
// After a successful nextRowGroup, rowId is pointing at the current row.
// Make it point before so the rest of the logic works out.
rowId--;
}
}
@ -710,13 +708,9 @@ start:
goto start;
}
int ParquetCursor::getRowId() {
return rowId;
}
int ParquetCursor::getRowId() { return rowId; }
bool ParquetCursor::eof() {
return rowId > numRows;
}
// True once rowId has moved past numRows, i.e. there is no current row left
// to read.
bool ParquetCursor::eof() {
  return numRows < rowId;
}
void ParquetCursor::ensureColumn(int col) {
// -1 signals rowid, which is trivially available
@ -737,58 +731,53 @@ void ParquetCursor::ensureColumn(int col) {
bool wasNull = false;
while (colRows[col] + 1 < rowId) {
switch (types[col]) {
case parquet::Type::INT32:
{
case parquet::Type::INT32: {
parquet::Int32Scanner *s = (parquet::Int32Scanner *)scanners[col].get();
int rv = 0;
s->NextValue(&rv, &wasNull);
break;
}
case parquet::Type::FLOAT:
{
case parquet::Type::FLOAT: {
parquet::FloatScanner *s = (parquet::FloatScanner *)scanners[col].get();
float rv = 0;
s->NextValue(&rv, &wasNull);
break;
}
case parquet::Type::DOUBLE:
{
parquet::DoubleScanner* s = (parquet::DoubleScanner*)scanners[col].get();
case parquet::Type::DOUBLE: {
parquet::DoubleScanner *s =
(parquet::DoubleScanner *)scanners[col].get();
double rv = 0;
s->NextValue(&rv, &wasNull);
break;
}
case parquet::Type::BYTE_ARRAY:
{
parquet::ByteArrayScanner* s = (parquet::ByteArrayScanner*)scanners[col].get();
case parquet::Type::BYTE_ARRAY: {
parquet::ByteArrayScanner *s =
(parquet::ByteArrayScanner *)scanners[col].get();
parquet::ByteArray ba;
s->NextValue(&ba, &wasNull);
break;
}
case parquet::Type::INT96:
{
case parquet::Type::INT96: {
parquet::Int96Scanner *s = (parquet::Int96Scanner *)scanners[col].get();
parquet::Int96 rv;
s->NextValue(&rv, &wasNull);
break;
}
case parquet::Type::INT64:
{
case parquet::Type::INT64: {
parquet::Int64Scanner *s = (parquet::Int64Scanner *)scanners[col].get();
long rv = 0;
s->NextValue(&rv, &wasNull);
break;
}
case parquet::Type::BOOLEAN:
{
case parquet::Type::BOOLEAN: {
parquet::BoolScanner *s = (parquet::BoolScanner *)scanners[col].get();
bool rv = false;
s->NextValue(&rv, &wasNull);
break;
}
case parquet::Type::FIXED_LEN_BYTE_ARRAY:
{
parquet::FixedLenByteArrayScanner* s = (parquet::FixedLenByteArrayScanner*)scanners[col].get();
case parquet::Type::FIXED_LEN_BYTE_ARRAY: {
parquet::FixedLenByteArrayScanner *s =
(parquet::FixedLenByteArrayScanner *)scanners[col].get();
parquet::FixedLenByteArray flba;
s->NextValue(&flba, &wasNull);
break;
@ -797,11 +786,10 @@ void ParquetCursor::ensureColumn(int col) {
// Should be impossible to get here as we should have forbidden this at
// CREATE time -- maybe file changed underneath us?
std::ostringstream ss;
ss << __FILE__ << ":" << __LINE__ << ": column " << col << " has unsupported type: " <<
parquet::TypeToString(types[col]);
ss << __FILE__ << ":" << __LINE__ << ": column " << col
<< " has unsupported type: " << parquet::TypeToString(types[col]);
throw std::invalid_argument(ss.str());
break;
}
colRows[col]++;
}
@ -811,39 +799,36 @@ void ParquetCursor::ensureColumn(int col) {
bool hadValue = false;
switch (types[col]) {
case parquet::Type::INT32:
{
case parquet::Type::INT32: {
parquet::Int32Scanner *s = (parquet::Int32Scanner *)scanners[col].get();
int rv = 0;
hadValue = s->NextValue(&rv, &wasNull);
colIntValues[col] = rv;
break;
}
case parquet::Type::FLOAT:
{
case parquet::Type::FLOAT: {
parquet::FloatScanner *s = (parquet::FloatScanner *)scanners[col].get();
float rv = 0;
hadValue = s->NextValue(&rv, &wasNull);
colDoubleValues[col] = rv;
break;
}
case parquet::Type::DOUBLE:
{
case parquet::Type::DOUBLE: {
parquet::DoubleScanner *s = (parquet::DoubleScanner *)scanners[col].get();
double rv = 0;
hadValue = s->NextValue(&rv, &wasNull);
colDoubleValues[col] = rv;
break;
}
case parquet::Type::BYTE_ARRAY:
{
parquet::ByteArrayScanner* s = (parquet::ByteArrayScanner*)scanners[col].get();
case parquet::Type::BYTE_ARRAY: {
parquet::ByteArrayScanner *s =
(parquet::ByteArrayScanner *)scanners[col].get();
hadValue = s->NextValue(&colByteArrayValues[col], &wasNull);
break;
}
case parquet::Type::INT96:
{
// INT96 tracks a date with nanosecond precision, convert to ms since epoch.
case parquet::Type::INT96: {
// INT96 tracks a date with nanosecond precision, convert to ms since
// epoch.
// ...see https://github.com/apache/parquet-format/pull/49 for more
//
// First 8 bytes: nanoseconds into the day
@ -856,8 +841,7 @@ void ParquetCursor::ensureColumn(int col) {
colIntValues[col] = int96toMsSinceEpoch(rv);
break;
}
case parquet::Type::INT64:
{
case parquet::Type::INT64: {
parquet::Int64Scanner *s = (parquet::Int64Scanner *)scanners[col].get();
long rv = 0;
hadValue = s->NextValue(&rv, &wasNull);
@ -865,30 +849,30 @@ void ParquetCursor::ensureColumn(int col) {
break;
}
case parquet::Type::BOOLEAN:
{
case parquet::Type::BOOLEAN: {
parquet::BoolScanner *s = (parquet::BoolScanner *)scanners[col].get();
bool rv = false;
hadValue = s->NextValue(&rv, &wasNull);
colIntValues[col] = rv ? 1 : 0;
break;
}
case parquet::Type::FIXED_LEN_BYTE_ARRAY:
{
parquet::FixedLenByteArrayScanner* s = (parquet::FixedLenByteArrayScanner*)scanners[col].get();
case parquet::Type::FIXED_LEN_BYTE_ARRAY: {
parquet::FixedLenByteArrayScanner *s =
(parquet::FixedLenByteArrayScanner *)scanners[col].get();
parquet::FixedLenByteArray flba;
hadValue = s->NextValue(&flba, &wasNull);
colByteArrayValues[col].ptr = flba.ptr;
// TODO: cache this
colByteArrayValues[col].len = rowGroupMetadata->schema()->Column(col)->type_length();
colByteArrayValues[col].len =
rowGroupMetadata->schema()->Column(col)->type_length();
break;
}
default:
// Should be impossible to get here as we should have forbidden this at
// CREATE time -- maybe file changed underneath us?
std::ostringstream ss;
ss << __FILE__ << ":" << __LINE__ << ": column " << col << " has unsupported type: " <<
parquet::TypeToString(types[col]);
ss << __FILE__ << ":" << __LINE__ << ": column " << col
<< " has unsupported type: " << parquet::TypeToString(types[col]);
throw std::invalid_argument(ss.str());
break;
}
@ -908,17 +892,11 @@ bool ParquetCursor::isNull(int col) {
return colNulls[col];
}
int ParquetCursor::getInt32(int col) {
return colIntValues[col];
}
int ParquetCursor::getInt32(int col) { return colIntValues[col]; }
long ParquetCursor::getInt64(int col) {
return colIntValues[col];
}
long ParquetCursor::getInt64(int col) { return colIntValues[col]; }
double ParquetCursor::getDouble(int col) {
return colDoubleValues[col];
}
double ParquetCursor::getDouble(int col) { return colDoubleValues[col]; }
parquet::ByteArray *ParquetCursor::getByteArray(int col) {
return &colByteArrayValues[col];
@ -942,12 +920,10 @@ void ParquetCursor::reset(std::vector<Constraint> constraints) {
close();
this->constraints = constraints;
rowId = 0;
// TODO: consider having a long lived handle in ParquetTable that can be borrowed
// without incurring the cost of opening the file from scratch twice
// TODO: consider having a long lived handle in ParquetTable that can be
// borrowed without incurring the cost of opening the file from scratch twice
reader = parquet::ParquetFileReader::OpenFile(
table->getFile().data(),
true,
parquet::default_reader_properties(),
table->getFile().data(), true, parquet::default_reader_properties(),
table->getMetadata());
rowGroupId = -1;
@ -964,7 +940,9 @@ void ParquetCursor::reset(std::vector<Constraint> constraints) {
// Returns the ParquetTable pointer held by this cursor (non-owning from the
// caller's point of view: nothing is transferred here).
ParquetTable *ParquetCursor::getTable() const {
  return this->table;
}
unsigned int ParquetCursor::getNumRowGroups() const { return numRowGroups; }
unsigned int ParquetCursor::getNumConstraints() const { return constraints.size(); }
const Constraint& ParquetCursor::getConstraint(unsigned int i) const { return constraints[i]; }
// Number of constraints currently attached to this cursor.
unsigned int ParquetCursor::getNumConstraints() const {
  // size() yields the container's size type; convert explicitly to the
  // declared return type instead of relying on the implicit conversion.
  return static_cast<unsigned int>(constraints.size());
}
// Returns a read-only reference to the i-th constraint. The index is passed
// straight to operator[]; no range check is performed here.
const Constraint &ParquetCursor::getConstraint(unsigned int i) const {
  const Constraint &selected = constraints[i];
  return selected;
}

View File

@ -1,9 +1,9 @@
#ifndef PARQUET_CURSOR_H
#define PARQUET_CURSOR_H
#include "parquet/api/reader.h"
#include "parquet_filter.h"
#include "parquet_table.h"
#include "parquet/api/reader.h"
class ParquetCursor {
@ -36,16 +36,23 @@ class ParquetCursor {
bool currentRowSatisfiesFilter();
bool currentRowGroupSatisfiesFilter();
bool currentRowGroupSatisfiesRowIdFilter(Constraint &constraint);
bool currentRowGroupSatisfiesTextFilter(Constraint& constraint, std::shared_ptr<parquet::RowGroupStatistics> stats);
bool currentRowGroupSatisfiesBlobFilter(Constraint& constraint, std::shared_ptr<parquet::RowGroupStatistics> stats);
bool currentRowGroupSatisfiesIntegerFilter(Constraint& constraint, std::shared_ptr<parquet::RowGroupStatistics> stats);
bool currentRowGroupSatisfiesDoubleFilter(Constraint& constraint, std::shared_ptr<parquet::RowGroupStatistics> stats);
bool currentRowGroupSatisfiesTextFilter(
Constraint &constraint,
std::shared_ptr<parquet::RowGroupStatistics> stats);
bool currentRowGroupSatisfiesBlobFilter(
Constraint &constraint,
std::shared_ptr<parquet::RowGroupStatistics> stats);
bool currentRowGroupSatisfiesIntegerFilter(
Constraint &constraint,
std::shared_ptr<parquet::RowGroupStatistics> stats);
bool currentRowGroupSatisfiesDoubleFilter(
Constraint &constraint,
std::shared_ptr<parquet::RowGroupStatistics> stats);
bool currentRowSatisfiesTextFilter(Constraint &constraint);
bool currentRowSatisfiesIntegerFilter(Constraint &constraint);
bool currentRowSatisfiesDoubleFilter(Constraint &constraint);
public:
ParquetCursor(ParquetTable *table);
int getRowId();
@ -70,4 +77,3 @@ public:
};
#endif

View File

@ -1,23 +1,12 @@
#include "parquet_filter.h"
Constraint::Constraint(
RowGroupBitmap bitmap,
int column,
std::string columnName,
ConstraintOperator op,
ValueType type,
int64_t intValue,
double doubleValue,
std::vector<unsigned char> blobValue
): bitmap(bitmap),
column(column),
columnName(columnName),
op(op),
type(type),
intValue(intValue),
doubleValue(doubleValue),
blobValue(blobValue),
hadRows(false) {
Constraint::Constraint(RowGroupBitmap bitmap, int column,
std::string columnName, ConstraintOperator op,
ValueType type, int64_t intValue, double doubleValue,
std::vector<unsigned char> blobValue)
: bitmap(bitmap), column(column), columnName(columnName), op(op),
type(type), intValue(intValue), doubleValue(doubleValue),
blobValue(blobValue), hadRows(false) {
RowGroupBitmap bm = bitmap;
this->bitmap = bm;

View File

@ -1,9 +1,9 @@
#ifndef PARQUET_FILTER_H
#define PARQUET_FILTER_H
#include <vector>
#include <string>
#include <cstdint>
#include <string>
#include <vector>
enum ConstraintOperator {
Equal,
@ -20,16 +20,11 @@ enum ConstraintOperator {
Is
};
enum ValueType {
Null,
Integer,
Double,
Blob,
Text
};
// Kind of value a Constraint carries (see the `type` argument of the
// Constraint constructor). The numeric values are spelled out but are the
// same ones the compiler would assign implicitly.
enum ValueType {
  Null = 0,
  Integer = 1,
  Double = 2,
  Blob = 3,
  Text = 4
};
class RowGroupBitmap {
void setBit(std::vector<unsigned char>& membership, unsigned int rowGroup, bool isSet) {
void setBit(std::vector<unsigned char> &membership, unsigned int rowGroup,
bool isSet) {
int byte = rowGroup / 8;
int offset = rowGroup % 8;
unsigned char c = membership[byte];
@ -51,12 +46,10 @@ public:
}
}
RowGroupBitmap(
std::vector<unsigned char> estimatedMembership,
std::vector<unsigned char> actualMembership) :
estimatedMembership(estimatedMembership),
actualMembership(actualMembership) {
}
// Builds a bitmap from two caller-supplied byte vectors. Both arguments are
// taken by value and copied into the identically named members; the
// constructor body itself does nothing else.
RowGroupBitmap(std::vector<unsigned char> estimatedMembership,
std::vector<unsigned char> actualMembership)
: estimatedMembership(estimatedMembership),
actualMembership(actualMembership) {}
std::vector<unsigned char> estimatedMembership;
std::vector<unsigned char> actualMembership;
@ -80,17 +73,11 @@ public:
class Constraint {
public:
// Kind of a messy constructor function, but it's just for internal use, so whatever.
Constraint(
RowGroupBitmap bitmap,
int column,
std::string columnName,
ConstraintOperator op,
ValueType type,
int64_t intValue,
double doubleValue,
std::vector<unsigned char> blobValue
);
// Kind of a messy constructor function, but it's just for internal use, so
// whatever.
Constraint(RowGroupBitmap bitmap, int column, std::string columnName,
ConstraintOperator op, ValueType type, int64_t intValue,
double doubleValue, std::vector<unsigned char> blobValue);
RowGroupBitmap bitmap;
int column; // underlying column in the query

View File

@ -2,8 +2,10 @@
#include "parquet/api/reader.h"
ParquetTable::ParquetTable(std::string file, std::string tableName): file(file), tableName(tableName) {
std::unique_ptr<parquet::ParquetFileReader> reader = parquet::ParquetFileReader::OpenFile(file.data());
// Remembers the file path and table name, then opens the parquet file once
// to capture its metadata. The stored metadata is later handed back to
// ParquetFileReader::OpenFile by other code in this project.
ParquetTable::ParquetTable(std::string file, std::string tableName)
    : file(file), tableName(tableName) {
  // The reader is a temporary: it lives only long enough to hand over its
  // metadata and is released at the end of this statement.
  metadata = parquet::ParquetFileReader::OpenFile(file.data())->metadata();
}
@ -13,17 +15,12 @@ std::string ParquetTable::columnName(int i) {
return columnNames[i];
}
unsigned int ParquetTable::getNumColumns() {
return columnNames.size();
}
// Number of entries currently held in columnNames.
unsigned int ParquetTable::getNumColumns() {
  // size() yields the container's size type; convert explicitly to the
  // declared return type instead of relying on the implicit conversion.
  return static_cast<unsigned int>(columnNames.size());
}
std::string ParquetTable::CreateStatement() {
std::unique_ptr<parquet::ParquetFileReader> reader = parquet::ParquetFileReader::OpenFile(
file.data(),
true,
parquet::default_reader_properties(),
metadata);
std::unique_ptr<parquet::ParquetFileReader> reader =
parquet::ParquetFileReader::OpenFile(
file.data(), true, parquet::default_reader_properties(), metadata);
std::string text("CREATE TABLE x(");
auto schema = reader->metadata()->schema();
@ -37,17 +34,20 @@ std::string ParquetTable::CreateStatement() {
if (!_col->is_primitive()) {
std::ostringstream ss;
ss << __FILE__ << ":" << __LINE__ << ": column " << i << " has non-primitive type";
ss << __FILE__ << ":" << __LINE__ << ": column " << i
<< " has non-primitive type";
throw std::invalid_argument(ss.str());
}
if (_col->is_repeated()) {
std::ostringstream ss;
ss << __FILE__ << ":" << __LINE__ << ": column " << i << " has non-scalar type";
ss << __FILE__ << ":" << __LINE__ << ": column " << i
<< " has non-scalar type";
throw std::invalid_argument(ss.str());
}
parquet::schema::PrimitiveNode* col = (parquet::schema::PrimitiveNode*)_col;
parquet::schema::PrimitiveNode *col =
(parquet::schema::PrimitiveNode *)_col;
if (i > 0)
text += ", ";
@ -125,21 +125,19 @@ std::string ParquetTable::CreateStatement() {
if (type.empty()) {
std::ostringstream ss;
ss << __FILE__ << ":" << __LINE__ << ": column " << i << " has unsupported type: " <<
parquet::TypeToString(physical) << "/" << parquet::LogicalTypeToString(logical);
ss << __FILE__ << ":" << __LINE__ << ": column " << i
<< " has unsupported type: " << parquet::TypeToString(physical) << "/"
<< parquet::LogicalTypeToString(logical);
throw std::invalid_argument(ss.str());
}
#ifdef DEBUG
printf("col %d[name=%s, p=%d:%s, l=%d:%s] is %s\n",
i,
col->name().data(),
printf(
"col %d[name=%s, p=%d:%s, l=%d:%s] is %s\n", i, col->name().data(),
col->physical_type(),
parquet::TypeToString(col->physical_type()).data(),
col->logical_type(),
parquet::LogicalTypeToString(col->logical_type()).data(),
type.data());
parquet::TypeToString(col->physical_type()).data(), col->logical_type(),
parquet::LogicalTypeToString(col->logical_type()).data(), type.data());
#endif
text += " ";
@ -149,7 +147,9 @@ std::string ParquetTable::CreateStatement() {
return text;
}
std::shared_ptr<parquet::FileMetaData> ParquetTable::getMetadata() { return metadata; }
std::shared_ptr<parquet::FileMetaData> ParquetTable::getMetadata() {
return metadata;
}
const std::string &ParquetTable::getFile() { return file; }
const std::string &ParquetTable::getTableName() { return tableName; }

View File

@ -1,9 +1,9 @@
#ifndef PARQUET_TABLE_H
#define PARQUET_TABLE_H
#include <vector>
#include <string>
#include "parquet/api/reader.h"
#include <string>
#include <vector>
class ParquetTable {
std::string file;
@ -11,7 +11,6 @@ class ParquetTable {
std::vector<std::string> columnNames;
std::shared_ptr<parquet::FileMetaData> metadata;
public:
ParquetTable(std::string file, std::string tableName);
std::string CreateStatement();