diff --git a/parquet/Makefile b/parquet/Makefile index 4c074dd..00deae9 100644 --- a/parquet/Makefile +++ b/parquet/Makefile @@ -10,18 +10,21 @@ BOOST_LIB = /usr/lib/x86_64-linux-gnu/libboost_regex.so LDFLAGS = -O3 $(PARQUET_LIB) $(THRIFT_LIB) $(ARROW_LIB) $(BOOST_LIB) DEPS = hellomake.h -OBJ = parquet.o parquet_table.o parquet_cursor.o +OBJ = parquet.o parquet_filter.o parquet_table.o parquet_cursor.o libparquet.so: $(OBJ) $(CC) -shared -o $@ $^ $(LDFLAGS) -parquet_cursor.o: parquet_cursor.cc parquet_cursor.h parquet_table.h +parquet_filter.o: parquet_filter.cc parquet_filter.h + $(CC) -c -o $@ $< $(CFLAGS) + +parquet_cursor.o: parquet_cursor.cc parquet_cursor.h parquet_table.h parquet_filter.h $(CC) -c -o $@ $< $(CFLAGS) parquet_table.o: parquet_table.cc parquet_table.h $(CC) -c -o $@ $< $(CFLAGS) -parquet.o: parquet.cc parquet_cursor.h parquet_table.h +parquet.o: parquet.cc parquet_cursor.h parquet_table.h parquet_filter.h $(CC) -c -o $@ $< $(CFLAGS) .PHONY: clean diff --git a/parquet/parquet.cc b/parquet/parquet.cc index 890b0b0..a5b25d1 100644 --- a/parquet/parquet.cc +++ b/parquet/parquet.cc @@ -22,6 +22,7 @@ SQLITE_EXTENSION_INIT1 #include "parquet_table.h" #include "parquet_cursor.h" +#include "parquet_filter.h" /* Forward references to the various virtual table methods implemented * in this file. */ @@ -334,6 +335,42 @@ void debugConstraints(sqlite3_index_info *pIdxInfo, ParquetTable *table, int arg } } +ConstraintOperator constraintOperatorFromSqlite(int op) { + switch(op) { + case SQLITE_INDEX_CONSTRAINT_EQ: + return Equal; + case SQLITE_INDEX_CONSTRAINT_GT: + return GreaterThan; + case SQLITE_INDEX_CONSTRAINT_LE: + return LessThanOrEqual; + case SQLITE_INDEX_CONSTRAINT_LT: + return LessThan; + case SQLITE_INDEX_CONSTRAINT_GE: + return GreaterThanOrEqual; + case SQLITE_INDEX_CONSTRAINT_MATCH: + return Match; + case SQLITE_INDEX_CONSTRAINT_LIKE: + return Like; + case SQLITE_INDEX_CONSTRAINT_GLOB: + return Glob; + case SQLITE_INDEX_CONSTRAINT_REGEXP: + return Regexp; + case SQLITE_INDEX_CONSTRAINT_NE: + return NotEqual; + case SQLITE_INDEX_CONSTRAINT_ISNOT: + return IsNot; + case SQLITE_INDEX_CONSTRAINT_ISNOTNULL: + return IsNotNull; + case SQLITE_INDEX_CONSTRAINT_ISNULL: + return IsNull; + case SQLITE_INDEX_CONSTRAINT_IS: + return Is; + } + + std::ostringstream ss; + ss << __FILE__ << ":" << __LINE__ << ": operator " << op << " is unsupported"; + throw std::invalid_argument(ss.str()); +} /* ** Only a full table scan is supported. So xFilter simply rewinds to @@ -348,8 +385,58 @@ static int parquetFilter( ){ ParquetCursor* cursor = ((sqlite3_vtab_cursor_parquet*)cur)->cursor; printf("xFilter: idxNum=%d, idxStr=%lu, argc=%d\n", idxNum, (long unsigned int)idxStr, argc); - debugConstraints((sqlite3_index_info*)idxStr, cursor->getTable(), argc, argv); - cursor->reset(); + sqlite3_index_info* indexInfo = (sqlite3_index_info*)idxStr; + debugConstraints(indexInfo, cursor->getTable(), argc, argv); + std::vector constraints; + int j = 0; + for(int i = 0; i < indexInfo->nConstraint; i++) { + if(!indexInfo->aConstraint[i].usable) { + continue; + } + + ValueType type = Null; + bool boolValue = false; + uintptr_t intValue = 0; + double doubleValue = 0; + std::vector blobValue; + int sqliteType = sqlite3_value_type(argv[j]); + + if(sqliteType == SQLITE_INTEGER) { + type = Integer; + intValue = sqlite3_value_int64(argv[j]); + } else if(sqliteType == SQLITE_FLOAT) { + type = Double; + doubleValue = sqlite3_value_double(argv[j]); + } else if(sqliteType == SQLITE_TEXT) { + type = Text; + int len = sqlite3_value_bytes(argv[j]); + const unsigned char* ptr = sqlite3_value_text(argv[j]); + for(int k = 0; k < len; k++) { + blobValue.push_back(ptr[k]); + } + } else if(sqliteType == SQLITE_BLOB) { + type = Blob; + int len = sqlite3_value_bytes(argv[j]); + const unsigned char* ptr = (const unsigned char*)sqlite3_value_blob(argv[j]); + for(int k = 0; k < len; k++) { + blobValue.push_back(ptr[k]); + } + } else if(sqliteType == SQLITE_NULL) { + type = Null; + } + + Constraint constraint( + indexInfo->aConstraint[i].iColumn, + constraintOperatorFromSqlite(indexInfo->aConstraint[i].op), + type, + boolValue, + intValue, + doubleValue, + blobValue); + constraints.push_back(constraint); + j++; + } + cursor->reset(constraints); return parquetNext(cur); } diff --git a/parquet/parquet_cursor.cc b/parquet/parquet_cursor.cc index 51fdc99..ff1e714 100644 --- a/parquet/parquet_cursor.cc +++ b/parquet/parquet_cursor.cc @@ -3,7 +3,7 @@ ParquetCursor::ParquetCursor(ParquetTable* table) { this->table = table; reader = NULL; - reset(); + reset(std::vector()); } bool ParquetCursor::nextRowGroup() { @@ -39,7 +39,26 @@ bool ParquetCursor::nextRowGroup() { return true; } +// Return true if it is _possible_ that the current +// row satisfies the constraints. Only return false +// if it definitely does not. +bool ParquetCursor::currentRowSatisfiesFilter() { + for(unsigned int i = 0; i < constraints.size(); i++) { + int column = constraints[i].getColumn(); + ensureColumn(column); + int op = constraints[i].getOperator(); + + if(op == IsNull) { + return isNull(column); + } else if(op == IsNotNull) { + return !isNull(column); + } + } + return true; +} + void ParquetCursor::next() { +start: if(rowsLeftInRowGroup == 0) { if(!nextRowGroup()) { // put rowId over the edge so eof returns true @@ -50,6 +69,8 @@ void ParquetCursor::next() { rowsLeftInRowGroup--; rowId++; + if(!currentRowSatisfiesFilter()) + goto start; } int ParquetCursor::getRowId() { @@ -61,6 +82,10 @@ bool ParquetCursor::eof() { } void ParquetCursor::ensureColumn(int col) { + // -1 signals rowid, which is trivially available + if(col == -1) + return; + // need to ensure a scanner exists (and skip the # of rows in the rowgroup) while((unsigned int)col >= scanners.size()) { scanners.push_back(std::shared_ptr()); @@ -280,6 +305,10 @@ void ParquetCursor::ensureColumn(int col) { } bool ParquetCursor::isNull(int col) { + // -1 is rowid, which is trivially non null + if(col == -1) + return false; + return colNulls[col]; } @@ -313,8 +342,9 @@ void ParquetCursor::close() { } } -void ParquetCursor::reset() { +void ParquetCursor::reset(std::vector constraints) { close(); + this->constraints = constraints; rowId = -1; // TODO: consider having a long lived handle in ParquetTable that can be borrowed // without incurring the cost of opening the file from scratch twice diff --git a/parquet/parquet_cursor.h b/parquet/parquet_cursor.h index 1f8d1e7..80bc64a 100644 --- a/parquet/parquet_cursor.h +++ b/parquet/parquet_cursor.h @@ -1,6 +1,7 @@ #ifndef PARQUET_CURSOR_H #define PARQUET_CURSOR_H +#include "parquet_filter.h" #include "parquet_table.h" #include "parquet/api/reader.h" @@ -29,12 +30,15 @@ class ParquetCursor { bool nextRowGroup(); + std::vector constraints; + public: ParquetCursor(ParquetTable* table); int getRowId(); + bool currentRowSatisfiesFilter(); void next(); void close(); - void reset(); + void reset(std::vector constraints); bool eof(); void ensureColumn(int col); diff --git a/parquet/parquet_filter.cc b/parquet/parquet_filter.cc new file mode 100644 index 0000000..4f4b290 --- /dev/null +++ b/parquet/parquet_filter.cc @@ -0,0 +1,47 @@ +#include "parquet_filter.h" + +Constraint::Constraint( + int column, + ConstraintOperator op, + ValueType type, + bool boolValue, + uintptr_t intValue, + double doubleValue, + std::vector blobValue +) { + this->column = column; + this->op = op; + this->type = type; + this->boolValue = boolValue; + this->intValue = intValue; + this->doubleValue = doubleValue; + this->blobValue = blobValue; +} + +int Constraint::getColumn() { + return column; +} + +ConstraintOperator Constraint::getOperator() { + return op; +} + +ValueType Constraint::getType() { + return type; +} + +bool Constraint::getBool() { + return boolValue; +} + +uintptr_t Constraint::getInt() { + return intValue; +} + +double Constraint::getDouble() { + return doubleValue; +} + +std::vector Constraint::getBytes() { + return blobValue; +} diff --git a/parquet/parquet_filter.h b/parquet/parquet_filter.h new file mode 100644 index 0000000..11590a7 --- /dev/null +++ b/parquet/parquet_filter.h @@ -0,0 +1,65 @@ +#ifndef PARQUET_FILTER_H +#define PARQUET_FILTER_H + +#include +#include + +enum ConstraintOperator { + Equal, + GreaterThan, + LessThanOrEqual, + LessThan, + GreaterThanOrEqual, + Match, + Like, + Glob, + Regexp, + NotEqual, + IsNot, + IsNotNull, + IsNull, + Is +}; + +enum ValueType { + Null, + Boolean, + Integer, + Double, + Blob, + Text +}; + +class Constraint { + int column; // underlying column in the query + ConstraintOperator op; + ValueType type; + + bool boolValue; + uintptr_t intValue; + double doubleValue; + // Doubles as string value + std::vector blobValue; + +public: + // Kind of a messy constructor function, but it's just for internal use, so whatever. + Constraint( + int column, + ConstraintOperator op, + ValueType type, + bool boolValue, + uintptr_t intValue, + double doubleValue, + std::vector blobValue + ); + + int getColumn(); + ConstraintOperator getOperator(); + ValueType getType(); + bool getBool(); + uintptr_t getInt(); + double getDouble(); + std::vector getBytes(); +}; + +#endif