diff --git a/.gitignore b/.gitignore index 935812d..0c51391 100644 --- a/.gitignore +++ b/.gitignore @@ -40,4 +40,5 @@ /cmds.txt /sqlite-with-parquet /testcase-out.txt -/testcase-err.txt +/testcase-stdout.txt +/testcase-stderr.txt diff --git a/parquet/parquet.cc b/parquet/parquet.cc index c854a0d..7f2cb9e 100644 --- a/parquet/parquet.cc +++ b/parquet/parquet.cc @@ -131,8 +131,6 @@ static int parquetClose(sqlite3_vtab_cursor *cur){ ** Constructor for a new sqlite3_vtab_parquet cursor object. */ static int parquetOpen(sqlite3_vtab *p, sqlite3_vtab_cursor **ppCursor){ - printf("xOpen\n"); - std::unique_ptr cursor( (sqlite3_vtab_cursor_parquet*)sqlite3_malloc(sizeof(sqlite3_vtab_cursor_parquet)), sqlite3_free); @@ -254,7 +252,6 @@ static int parquetFilter( int idxNum, const char *idxStr, int argc, sqlite3_value **argv ){ - printf("xFilter\n"); ParquetCursor* cursor = ((sqlite3_vtab_cursor_parquet*)cur)->cursor; cursor->reset(); return parquetNext(cur); diff --git a/parquet/parquet_cursor.cc b/parquet/parquet_cursor.cc index 5aaf1b1..2543a79 100644 --- a/parquet/parquet_cursor.cc +++ b/parquet/parquet_cursor.cc @@ -11,6 +11,7 @@ bool ParquetCursor::nextRowGroup() { if((rowGroupId + 1) >= numRowGroups) return false; + rowGroupStartRowId = rowId; rowGroupId++; rowGroupMetadata = reader->metadata()->RowGroup(0); rowsLeftInRowGroup = rowGroupMetadata->num_rows(); @@ -31,6 +32,10 @@ bool ParquetCursor::nextRowGroup() { logicalTypes[i] = rowGroupMetadata->schema()->Column(i)->logical_type(); } + for(unsigned int i = 0; i < colRows.size(); i++) { + colRows[i] = rowId; + } + return true; } @@ -59,7 +64,8 @@ void ParquetCursor::ensureColumn(int col) { // need to ensure a scanner exists (and skip the # of rows in the rowgroup) while((unsigned int)col >= scanners.size()) { scanners.push_back(std::shared_ptr()); - colRows.push_back(-1); + // If it doesn't exist, it's the rowId as of the last nextRowGroup call + colRows.push_back(rowGroupStartRowId); colNulls.push_back(false); colIntValues.push_back(0); colDoubleValues.push_back(0); @@ -74,8 +80,83 @@ void ParquetCursor::ensureColumn(int col) { // Actually fetch a value, stash data in colRows, colNulls, colValues if(colRows[col] != rowId) { - colRows[col] = rowId; + // We may need to skip some records, eg, a query like + // SELECT a WHERE b = 10 + // may have read b, but skipped a until b matches the predicate. bool wasNull = false; + while(colRows[col] + 1 < rowId) { + switch(types[col]) { + case parquet::Type::INT32: + { + parquet::Int32Scanner* s = (parquet::Int32Scanner*)scanners[col].get(); + int rv = 0; + s->NextValue(&rv, &wasNull); + break; + } + case parquet::Type::FLOAT: + { + parquet::FloatScanner* s = (parquet::FloatScanner*)scanners[col].get(); + float rv = 0; + s->NextValue(&rv, &wasNull); + break; + } + case parquet::Type::DOUBLE: + { + parquet::DoubleScanner* s = (parquet::DoubleScanner*)scanners[col].get(); + double rv = 0; + s->NextValue(&rv, &wasNull); + break; + } + case parquet::Type::BYTE_ARRAY: + { + parquet::ByteArrayScanner* s = (parquet::ByteArrayScanner*)scanners[col].get(); + parquet::ByteArray ba; + s->NextValue(&ba, &wasNull); + break; + } + case parquet::Type::INT96: + { + parquet::Int96Scanner* s = (parquet::Int96Scanner*)scanners[col].get(); + parquet::Int96 rv; + s->NextValue(&rv, &wasNull); + break; + } + case parquet::Type::INT64: + { + parquet::Int64Scanner* s = (parquet::Int64Scanner*)scanners[col].get(); + long rv = 0; + s->NextValue(&rv, &wasNull); + break; + } + case parquet::Type::BOOLEAN: + { + parquet::BoolScanner* s = (parquet::BoolScanner*)scanners[col].get(); + bool rv = false; + s->NextValue(&rv, &wasNull); + break; + } + case parquet::Type::FIXED_LEN_BYTE_ARRAY: + { + parquet::FixedLenByteArrayScanner* s = (parquet::FixedLenByteArrayScanner*)scanners[col].get(); + parquet::FixedLenByteArray flba; + s->NextValue(&flba, &wasNull); + break; + } + default: + // Should be impossible to get here as we should have forbidden this at + // CREATE time -- maybe file changed underneath us? + std::ostringstream ss; + ss << __FILE__ << ":" << __LINE__ << ": column " << col << " has unsupported type: " << + parquet::TypeToString(types[col]); + throw std::invalid_argument(ss.str()); + break; + + } + colRows[col]++; + } + + colRows[col] = rowId; + wasNull = false; switch(types[col]) { case parquet::Type::INT32: diff --git a/parquet/parquet_cursor.h b/parquet/parquet_cursor.h index 88c0a5e..8347bb7 100644 --- a/parquet/parquet_cursor.h +++ b/parquet/parquet_cursor.h @@ -22,6 +22,7 @@ class ParquetCursor { int rowId; int rowGroupId; + int rowGroupStartRowId; int numRows; int numRowGroups; int rowsLeftInRowGroup; diff --git a/parquet/parquet_table.cc b/parquet/parquet_table.cc index 162a252..16dc2fb 100644 --- a/parquet/parquet_table.cc +++ b/parquet/parquet_table.cc @@ -11,7 +11,6 @@ std::string ParquetTable::CreateStatement() { // TODO: parse columns from file std::string text("CREATE TABLE x("); auto schema = reader->metadata()->schema(); - printf("num cols: %d\n", schema->num_columns()); for(auto i = 0; i < schema->num_columns(); i++) { auto _col = schema->GetColumnRoot(i); diff --git a/tests/queries/003-cross-join-count-1-rowgroup.sql b/tests/queries/003-cross-join-count-1-rowgroup.sql new file mode 100644 index 0000000..4d0f646 --- /dev/null +++ b/tests/queries/003-cross-join-count-1-rowgroup.sql @@ -0,0 +1,3 @@ +100-rows-1.parquet +select count(*) from (select * from test t1, test t2); +10000 diff --git a/tests/queries/003-cross-join-count.sql b/tests/queries/004-cross-join-10-rowgroups.sql similarity index 100% rename from tests/queries/003-cross-join-count.sql rename to tests/queries/004-cross-join-10-rowgroups.sql diff --git a/tests/queries/005-rowid-1-rowgroup.sql b/tests/queries/005-rowid-1-rowgroup.sql new file mode 100644 index 0000000..8ba7801 --- /dev/null +++ b/tests/queries/005-rowid-1-rowgroup.sql @@ -0,0 +1,3 @@ +100-rows-1.parquet +select int8_1 from test where rowid = 50; +0 diff --git a/tests/queries/006-rowid-10-rowgroups.sql b/tests/queries/006-rowid-10-rowgroups.sql new file mode 100644 index 0000000..3ef3362 --- /dev/null +++ b/tests/queries/006-rowid-10-rowgroups.sql @@ -0,0 +1,3 @@ +100-rows-10.parquet +select int8_1 from test where rowid = 50; +0 diff --git a/tests/queries/007-rowid-55-10-rowgroups.sql b/tests/queries/007-rowid-55-10-rowgroups.sql new file mode 100644 index 0000000..d9a63ee --- /dev/null +++ b/tests/queries/007-rowid-55-10-rowgroups.sql @@ -0,0 +1,3 @@ +100-rows-10.parquet +select int8_1 from test where rowid = 55; +-5 diff --git a/tests/test-queries b/tests/test-queries index 7c7c994..1856f9c 100755 --- a/tests/test-queries +++ b/tests/test-queries @@ -23,13 +23,13 @@ main() { root=$(readlink -f "$root") cd "$root" - queries=$(find tests/queries -type f -name '*.sql') + queries=$(find tests/queries -type f -name '*.sql' | sort) while read -r file; do echo "Testing: $file" parquet_file=$(head -n1 "$file") query=$(head -n2 "$file" | tail -n1) results=$(tail -n+3 "$file") - if ! "$root"/sqlite/sqlite3 -init <(run_query "$file" "$parquet_file" "$query") < /dev/null > /dev/null 2> testcase-err.txt; then + if ! "$root"/sqlite/sqlite3 -init <(run_query "$file" "$parquet_file" "$query") < /dev/null > testcase-stdout.txt 2> testcase-stderr.txt; then echo "...FAILED; check testcase-{out,err}.txt" >&2 exit 1 fi