Scaffolding for in-extension filtering
Supports IS NULL and IS NOT NULL checks
This commit is contained in:
parent
d28ae86d15
commit
830053c1fc
|
@ -10,18 +10,21 @@ BOOST_LIB = /usr/lib/x86_64-linux-gnu/libboost_regex.so
|
||||||
|
|
||||||
LDFLAGS = -O3 $(PARQUET_LIB) $(THRIFT_LIB) $(ARROW_LIB) $(BOOST_LIB)
|
LDFLAGS = -O3 $(PARQUET_LIB) $(THRIFT_LIB) $(ARROW_LIB) $(BOOST_LIB)
|
||||||
DEPS = hellomake.h
|
DEPS = hellomake.h
|
||||||
OBJ = parquet.o parquet_table.o parquet_cursor.o
|
OBJ = parquet.o parquet_filter.o parquet_table.o parquet_cursor.o
|
||||||
|
|
||||||
libparquet.so: $(OBJ)
|
libparquet.so: $(OBJ)
|
||||||
$(CC) -shared -o $@ $^ $(LDFLAGS)
|
$(CC) -shared -o $@ $^ $(LDFLAGS)
|
||||||
|
|
||||||
parquet_cursor.o: parquet_cursor.cc parquet_cursor.h parquet_table.h
|
parquet_filter.o: parquet_filter.cc parquet_filter.h
|
||||||
|
$(CC) -c -o $@ $< $(CFLAGS)
|
||||||
|
|
||||||
|
parquet_cursor.o: parquet_cursor.cc parquet_cursor.h parquet_table.h parquet_filter.h
|
||||||
$(CC) -c -o $@ $< $(CFLAGS)
|
$(CC) -c -o $@ $< $(CFLAGS)
|
||||||
|
|
||||||
parquet_table.o: parquet_table.cc parquet_table.h
|
parquet_table.o: parquet_table.cc parquet_table.h
|
||||||
$(CC) -c -o $@ $< $(CFLAGS)
|
$(CC) -c -o $@ $< $(CFLAGS)
|
||||||
|
|
||||||
parquet.o: parquet.cc parquet_cursor.h parquet_table.h
|
parquet.o: parquet.cc parquet_cursor.h parquet_table.h parquet_filter.h
|
||||||
$(CC) -c -o $@ $< $(CFLAGS)
|
$(CC) -c -o $@ $< $(CFLAGS)
|
||||||
|
|
||||||
.PHONY: clean
|
.PHONY: clean
|
||||||
|
|
|
@ -22,6 +22,7 @@ SQLITE_EXTENSION_INIT1
|
||||||
|
|
||||||
#include "parquet_table.h"
|
#include "parquet_table.h"
|
||||||
#include "parquet_cursor.h"
|
#include "parquet_cursor.h"
|
||||||
|
#include "parquet_filter.h"
|
||||||
|
|
||||||
/* Forward references to the various virtual table methods implemented
|
/* Forward references to the various virtual table methods implemented
|
||||||
* in this file. */
|
* in this file. */
|
||||||
|
@ -334,6 +335,42 @@ void debugConstraints(sqlite3_index_info *pIdxInfo, ParquetTable *table, int arg
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ConstraintOperator constraintOperatorFromSqlite(int op) {
|
||||||
|
switch(op) {
|
||||||
|
case SQLITE_INDEX_CONSTRAINT_EQ:
|
||||||
|
return Equal;
|
||||||
|
case SQLITE_INDEX_CONSTRAINT_GT:
|
||||||
|
return GreaterThan;
|
||||||
|
case SQLITE_INDEX_CONSTRAINT_LE:
|
||||||
|
return LessThanOrEqual;
|
||||||
|
case SQLITE_INDEX_CONSTRAINT_LT:
|
||||||
|
return LessThan;
|
||||||
|
case SQLITE_INDEX_CONSTRAINT_GE:
|
||||||
|
return GreaterThanOrEqual;
|
||||||
|
case SQLITE_INDEX_CONSTRAINT_MATCH:
|
||||||
|
return Match;
|
||||||
|
case SQLITE_INDEX_CONSTRAINT_LIKE:
|
||||||
|
return Like;
|
||||||
|
case SQLITE_INDEX_CONSTRAINT_GLOB:
|
||||||
|
return Glob;
|
||||||
|
case SQLITE_INDEX_CONSTRAINT_REGEXP:
|
||||||
|
return Regexp;
|
||||||
|
case SQLITE_INDEX_CONSTRAINT_NE:
|
||||||
|
return NotEqual;
|
||||||
|
case SQLITE_INDEX_CONSTRAINT_ISNOT:
|
||||||
|
return IsNot;
|
||||||
|
case SQLITE_INDEX_CONSTRAINT_ISNOTNULL:
|
||||||
|
return IsNotNull;
|
||||||
|
case SQLITE_INDEX_CONSTRAINT_ISNULL:
|
||||||
|
return IsNull;
|
||||||
|
case SQLITE_INDEX_CONSTRAINT_IS:
|
||||||
|
return Is;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::ostringstream ss;
|
||||||
|
ss << __FILE__ << ":" << __LINE__ << ": operator " << op << " is unsupported";
|
||||||
|
throw std::invalid_argument(ss.str());
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
** Only a full table scan is supported. So xFilter simply rewinds to
|
** Only a full table scan is supported. So xFilter simply rewinds to
|
||||||
|
@ -348,8 +385,58 @@ static int parquetFilter(
|
||||||
){
|
){
|
||||||
ParquetCursor* cursor = ((sqlite3_vtab_cursor_parquet*)cur)->cursor;
|
ParquetCursor* cursor = ((sqlite3_vtab_cursor_parquet*)cur)->cursor;
|
||||||
printf("xFilter: idxNum=%d, idxStr=%lu, argc=%d\n", idxNum, (long unsigned int)idxStr, argc);
|
printf("xFilter: idxNum=%d, idxStr=%lu, argc=%d\n", idxNum, (long unsigned int)idxStr, argc);
|
||||||
debugConstraints((sqlite3_index_info*)idxStr, cursor->getTable(), argc, argv);
|
sqlite3_index_info* indexInfo = (sqlite3_index_info*)idxStr;
|
||||||
cursor->reset();
|
debugConstraints(indexInfo, cursor->getTable(), argc, argv);
|
||||||
|
std::vector<Constraint> constraints;
|
||||||
|
int j = 0;
|
||||||
|
for(int i = 0; i < indexInfo->nConstraint; i++) {
|
||||||
|
if(!indexInfo->aConstraint[i].usable) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
ValueType type = Null;
|
||||||
|
bool boolValue = false;
|
||||||
|
uintptr_t intValue = 0;
|
||||||
|
double doubleValue = 0;
|
||||||
|
std::vector<unsigned char> blobValue;
|
||||||
|
int sqliteType = sqlite3_value_type(argv[j]);
|
||||||
|
|
||||||
|
if(sqliteType == SQLITE_INTEGER) {
|
||||||
|
type = Integer;
|
||||||
|
intValue = sqlite3_value_int64(argv[j]);
|
||||||
|
} else if(sqliteType == SQLITE_FLOAT) {
|
||||||
|
type = Double;
|
||||||
|
doubleValue = sqlite3_value_double(argv[j]);
|
||||||
|
} else if(sqliteType == SQLITE_TEXT) {
|
||||||
|
type = Text;
|
||||||
|
int len = sqlite3_value_bytes(argv[j]);
|
||||||
|
const unsigned char* ptr = sqlite3_value_text(argv[j]);
|
||||||
|
for(int k = 0; k < len; k++) {
|
||||||
|
blobValue.push_back(ptr[k]);
|
||||||
|
}
|
||||||
|
} else if(sqliteType == SQLITE_BLOB) {
|
||||||
|
type = Blob;
|
||||||
|
int len = sqlite3_value_bytes(argv[j]);
|
||||||
|
const unsigned char* ptr = (const unsigned char*)sqlite3_value_blob(argv[j]);
|
||||||
|
for(int k = 0; k < len; k++) {
|
||||||
|
blobValue.push_back(ptr[k]);
|
||||||
|
}
|
||||||
|
} else if(sqliteType == SQLITE_NULL) {
|
||||||
|
type = Null;
|
||||||
|
}
|
||||||
|
|
||||||
|
Constraint constraint(
|
||||||
|
indexInfo->aConstraint[i].iColumn,
|
||||||
|
constraintOperatorFromSqlite(indexInfo->aConstraint[i].op),
|
||||||
|
type,
|
||||||
|
boolValue,
|
||||||
|
intValue,
|
||||||
|
doubleValue,
|
||||||
|
blobValue);
|
||||||
|
constraints.push_back(constraint);
|
||||||
|
j++;
|
||||||
|
}
|
||||||
|
cursor->reset(constraints);
|
||||||
return parquetNext(cur);
|
return parquetNext(cur);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3,7 +3,7 @@
|
||||||
ParquetCursor::ParquetCursor(ParquetTable* table) {
|
ParquetCursor::ParquetCursor(ParquetTable* table) {
|
||||||
this->table = table;
|
this->table = table;
|
||||||
reader = NULL;
|
reader = NULL;
|
||||||
reset();
|
reset(std::vector<Constraint>());
|
||||||
}
|
}
|
||||||
|
|
||||||
bool ParquetCursor::nextRowGroup() {
|
bool ParquetCursor::nextRowGroup() {
|
||||||
|
@ -39,7 +39,26 @@ bool ParquetCursor::nextRowGroup() {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Return true if it is _possible_ that the current
|
||||||
|
// row satisfies the constraints. Only return false
|
||||||
|
// if it definitely does not.
|
||||||
|
bool ParquetCursor::currentRowSatisfiesFilter() {
|
||||||
|
for(unsigned int i = 0; i < constraints.size(); i++) {
|
||||||
|
int column = constraints[i].getColumn();
|
||||||
|
ensureColumn(column);
|
||||||
|
int op = constraints[i].getOperator();
|
||||||
|
|
||||||
|
if(op == IsNull) {
|
||||||
|
return isNull(column);
|
||||||
|
} else if(op == IsNotNull) {
|
||||||
|
return !isNull(column);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
void ParquetCursor::next() {
|
void ParquetCursor::next() {
|
||||||
|
start:
|
||||||
if(rowsLeftInRowGroup == 0) {
|
if(rowsLeftInRowGroup == 0) {
|
||||||
if(!nextRowGroup()) {
|
if(!nextRowGroup()) {
|
||||||
// put rowId over the edge so eof returns true
|
// put rowId over the edge so eof returns true
|
||||||
|
@ -50,6 +69,8 @@ void ParquetCursor::next() {
|
||||||
|
|
||||||
rowsLeftInRowGroup--;
|
rowsLeftInRowGroup--;
|
||||||
rowId++;
|
rowId++;
|
||||||
|
if(!currentRowSatisfiesFilter())
|
||||||
|
goto start;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ParquetCursor::getRowId() {
|
int ParquetCursor::getRowId() {
|
||||||
|
@ -61,6 +82,10 @@ bool ParquetCursor::eof() {
|
||||||
}
|
}
|
||||||
|
|
||||||
void ParquetCursor::ensureColumn(int col) {
|
void ParquetCursor::ensureColumn(int col) {
|
||||||
|
// -1 signals rowid, which is trivially available
|
||||||
|
if(col == -1)
|
||||||
|
return;
|
||||||
|
|
||||||
// need to ensure a scanner exists (and skip the # of rows in the rowgroup)
|
// need to ensure a scanner exists (and skip the # of rows in the rowgroup)
|
||||||
while((unsigned int)col >= scanners.size()) {
|
while((unsigned int)col >= scanners.size()) {
|
||||||
scanners.push_back(std::shared_ptr<parquet::Scanner>());
|
scanners.push_back(std::shared_ptr<parquet::Scanner>());
|
||||||
|
@ -280,6 +305,10 @@ void ParquetCursor::ensureColumn(int col) {
|
||||||
}
|
}
|
||||||
|
|
||||||
bool ParquetCursor::isNull(int col) {
|
bool ParquetCursor::isNull(int col) {
|
||||||
|
// -1 is rowid, which is trivially non null
|
||||||
|
if(col == -1)
|
||||||
|
return false;
|
||||||
|
|
||||||
return colNulls[col];
|
return colNulls[col];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -313,8 +342,9 @@ void ParquetCursor::close() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void ParquetCursor::reset() {
|
void ParquetCursor::reset(std::vector<Constraint> constraints) {
|
||||||
close();
|
close();
|
||||||
|
this->constraints = constraints;
|
||||||
rowId = -1;
|
rowId = -1;
|
||||||
// TODO: consider having a long lived handle in ParquetTable that can be borrowed
|
// TODO: consider having a long lived handle in ParquetTable that can be borrowed
|
||||||
// without incurring the cost of opening the file from scratch twice
|
// without incurring the cost of opening the file from scratch twice
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
#ifndef PARQUET_CURSOR_H
|
#ifndef PARQUET_CURSOR_H
|
||||||
#define PARQUET_CURSOR_H
|
#define PARQUET_CURSOR_H
|
||||||
|
|
||||||
|
#include "parquet_filter.h"
|
||||||
#include "parquet_table.h"
|
#include "parquet_table.h"
|
||||||
#include "parquet/api/reader.h"
|
#include "parquet/api/reader.h"
|
||||||
|
|
||||||
|
@ -29,12 +30,15 @@ class ParquetCursor {
|
||||||
|
|
||||||
bool nextRowGroup();
|
bool nextRowGroup();
|
||||||
|
|
||||||
|
std::vector<Constraint> constraints;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
ParquetCursor(ParquetTable* table);
|
ParquetCursor(ParquetTable* table);
|
||||||
int getRowId();
|
int getRowId();
|
||||||
|
bool currentRowSatisfiesFilter();
|
||||||
void next();
|
void next();
|
||||||
void close();
|
void close();
|
||||||
void reset();
|
void reset(std::vector<Constraint> constraints);
|
||||||
bool eof();
|
bool eof();
|
||||||
|
|
||||||
void ensureColumn(int col);
|
void ensureColumn(int col);
|
||||||
|
|
|
@ -0,0 +1,47 @@
|
||||||
|
#include "parquet_filter.h"
|
||||||
|
|
||||||
|
Constraint::Constraint(
|
||||||
|
int column,
|
||||||
|
ConstraintOperator op,
|
||||||
|
ValueType type,
|
||||||
|
bool boolValue,
|
||||||
|
uintptr_t intValue,
|
||||||
|
double doubleValue,
|
||||||
|
std::vector<unsigned char> blobValue
|
||||||
|
) {
|
||||||
|
this->column = column;
|
||||||
|
this->op = op;
|
||||||
|
this->type = type;
|
||||||
|
this->boolValue = boolValue;
|
||||||
|
this->intValue = intValue;
|
||||||
|
this->doubleValue = doubleValue;
|
||||||
|
this->blobValue = blobValue;
|
||||||
|
}
|
||||||
|
|
||||||
|
int Constraint::getColumn() {
|
||||||
|
return column;
|
||||||
|
}
|
||||||
|
|
||||||
|
ConstraintOperator Constraint::getOperator() {
|
||||||
|
return op;
|
||||||
|
}
|
||||||
|
|
||||||
|
ValueType Constraint::getType() {
|
||||||
|
return type;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool Constraint::getBool() {
|
||||||
|
return boolValue;
|
||||||
|
}
|
||||||
|
|
||||||
|
uintptr_t Constraint::getInt() {
|
||||||
|
return intValue;
|
||||||
|
}
|
||||||
|
|
||||||
|
double Constraint::getDouble() {
|
||||||
|
return doubleValue;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::vector<unsigned char> Constraint::getBytes() {
|
||||||
|
return blobValue;
|
||||||
|
}
|
|
@ -0,0 +1,65 @@
|
||||||
|
#ifndef PARQUET_FILTER_H
|
||||||
|
#define PARQUET_FILTER_H
|
||||||
|
|
||||||
|
#include <vector>
|
||||||
|
#include <cstdint>
|
||||||
|
|
||||||
|
enum ConstraintOperator {
|
||||||
|
Equal,
|
||||||
|
GreaterThan,
|
||||||
|
LessThanOrEqual,
|
||||||
|
LessThan,
|
||||||
|
GreaterThanOrEqual,
|
||||||
|
Match,
|
||||||
|
Like,
|
||||||
|
Glob,
|
||||||
|
Regexp,
|
||||||
|
NotEqual,
|
||||||
|
IsNot,
|
||||||
|
IsNotNull,
|
||||||
|
IsNull,
|
||||||
|
Is
|
||||||
|
};
|
||||||
|
|
||||||
|
enum ValueType {
|
||||||
|
Null,
|
||||||
|
Boolean,
|
||||||
|
Integer,
|
||||||
|
Double,
|
||||||
|
Blob,
|
||||||
|
Text
|
||||||
|
};
|
||||||
|
|
||||||
|
class Constraint {
|
||||||
|
int column; // underlying column in the query
|
||||||
|
ConstraintOperator op;
|
||||||
|
ValueType type;
|
||||||
|
|
||||||
|
bool boolValue;
|
||||||
|
uintptr_t intValue;
|
||||||
|
double doubleValue;
|
||||||
|
// Doubles as string value
|
||||||
|
std::vector<unsigned char> blobValue;
|
||||||
|
|
||||||
|
public:
|
||||||
|
// Kind of a messy constructor function, but it's just for internal use, so whatever.
|
||||||
|
Constraint(
|
||||||
|
int column,
|
||||||
|
ConstraintOperator op,
|
||||||
|
ValueType type,
|
||||||
|
bool boolValue,
|
||||||
|
uintptr_t intValue,
|
||||||
|
double doubleValue,
|
||||||
|
std::vector<unsigned char> blobValue
|
||||||
|
);
|
||||||
|
|
||||||
|
int getColumn();
|
||||||
|
ConstraintOperator getOperator();
|
||||||
|
ValueType getType();
|
||||||
|
bool getBool();
|
||||||
|
uintptr_t getInt();
|
||||||
|
double getDouble();
|
||||||
|
std::vector<unsigned char> getBytes();
|
||||||
|
};
|
||||||
|
|
||||||
|
#endif
|
Loading…
Reference in New Issue