1
0
mirror of https://github.com/cldellow/sqlite-parquet-vtable.git synced 2025-09-14 22:39:59 +00:00

Add rowgroup filtering for rowid

This commit is contained in:
Colin Dellow
2018-03-12 20:42:50 -04:00
parent 1f938a005d
commit acc15256ec
9 changed files with 61 additions and 15 deletions

View File

@@ -6,6 +6,29 @@ ParquetCursor::ParquetCursor(ParquetTable* table) {
reset(std::vector<Constraint>());
}
// Return true if it is _possible_ that some row of the current row group
// satisfies `constraint`, interpreted as a constraint on the rowid. Return
// false only when the row group definitely contains no matching row.
//
// The check reads the members rowId and rowGroupSize and treats the row
// group as covering the rowids [rowId, rowId + rowGroupSize). This assumes
// rowId holds the first rowid of the group when the function is called.
//
// constraint: the rowid constraint; its operator selects the comparison and
//             its integer value is the comparison target.
bool ParquetCursor::currentRowGroupSatisfiesRowIdFilter(Constraint constraint) {
  const int64_t target = constraint.getInt();
  switch(constraint.getOperator()) {
    case IsNull:
      // No row group is ever reported as matching "rowid IS NULL".
      return false;
    case Is:
    case Equal:
      // target must fall inside this row group's rowid range.
      return target >= rowId && target < rowId + rowGroupSize;
    case GreaterThan:
      // rowId > target: some rowid in the group must exceed target.
      // (Upper bound is kept loose by one; that only costs a wasted scan.)
      return rowId + rowGroupSize > target;
    case GreaterThanOrEqual:
      // rowId >= target: compare the group's upper bound against target.
      // The previous code compared against rowId itself, which was always
      // true and therefore never pruned a row group.
      return rowId + rowGroupSize >= target;
    case LessThan:
      // rowId < target: the first rowid of the group must be below target.
      return target > rowId;
    case LessThanOrEqual:
      // rowId <= target: the first rowid must not exceed target.
      return target >= rowId;
    default:
      // Unknown/unsupported operator: cannot rule the row group out.
      return true;
  }
}
// Return true if it is _possible_ that the current
// rowgroup satisfies the constraints. Only return false
// if it definitely does not.
@@ -19,9 +42,7 @@ bool ParquetCursor::currentRowGroupSatisfiesFilter() {
bool rv = true;
if(column == -1) {
if(op == IsNull) {
return false;
}
rv = currentRowGroupSatisfiesRowIdFilter(constraints[i]);
} else {
// printf("column = %d\n", column);
// std::unique_ptr<parquet::ColumnChunkMetaData> md = rowGroupMetadata->ColumnChunk(column);
@@ -41,13 +62,18 @@ bool ParquetCursor::currentRowGroupSatisfiesFilter() {
bool ParquetCursor::nextRowGroup() {
start:
if((rowGroupId + 1) >= numRowGroups)
// Ensure that rowId points at the start of this rowGroup (eg, in the case where
// we skipped an entire row group).
rowId = rowGroupStartRowId + rowGroupSize;
if((rowGroupId + 1) >= numRowGroups) {
return false;
}
rowGroupStartRowId = rowId;
rowGroupId++;
rowGroupMetadata = reader->metadata()->RowGroup(rowGroupId);
rowsLeftInRowGroup = rowGroupMetadata->num_rows();
rowGroupSize = rowsLeftInRowGroup = rowGroupMetadata->num_rows();
rowGroup = reader->RowGroup(rowGroupId);
for(unsigned int i = 0; i < scanners.size(); i++)
scanners[i] = NULL;
@@ -69,6 +95,9 @@ start:
colRows[i] = rowId;
}
// Increment rowId so currentRowGroupSatisfiesRowIdFilter can access it;
// it'll get decremented by our caller
rowId++;
if(!currentRowGroupSatisfiesFilter())
goto start;
@@ -106,8 +135,12 @@ start:
if(rowsLeftInRowGroup == 0) {
if(!nextRowGroup()) {
// put rowId over the edge so eof returns true
rowId++;
rowId = numRows + 1;
return;
} else {
// After a successful nextRowGroup, rowId is pointing at the current row. Make it
// point before so the rest of the logic works out.
rowId--;
}
}
@@ -395,6 +428,8 @@ void ParquetCursor::reset(std::vector<Constraint> constraints) {
reader = parquet::ParquetFileReader::OpenFile(table->file.data());
rowGroupId = -1;
rowGroupSize = 0;
rowGroupStartRowId = -1;
// TODO: handle the case where rowgroups have disjoint schemas?
// TODO: or at least, fail fast if detected
rowsLeftInRowGroup = 0;