`ensureColumn` catches up when rows are skipped

This commit is contained in:
Colin Dellow 2018-03-04 22:29:35 -05:00
parent bb3a9440f7
commit 67005623df
11 changed files with 100 additions and 9 deletions

3
.gitignore vendored
View File

@ -40,4 +40,5 @@
/cmds.txt /cmds.txt
/sqlite-with-parquet /sqlite-with-parquet
/testcase-out.txt /testcase-out.txt
/testcase-err.txt /testcase-stdout.txt
/testcase-stderr.txt

View File

@ -131,8 +131,6 @@ static int parquetClose(sqlite3_vtab_cursor *cur){
** Constructor for a new sqlite3_vtab_parquet cursor object. ** Constructor for a new sqlite3_vtab_parquet cursor object.
*/ */
static int parquetOpen(sqlite3_vtab *p, sqlite3_vtab_cursor **ppCursor){ static int parquetOpen(sqlite3_vtab *p, sqlite3_vtab_cursor **ppCursor){
printf("xOpen\n");
std::unique_ptr<sqlite3_vtab_cursor_parquet, void(*)(void*)> cursor( std::unique_ptr<sqlite3_vtab_cursor_parquet, void(*)(void*)> cursor(
(sqlite3_vtab_cursor_parquet*)sqlite3_malloc(sizeof(sqlite3_vtab_cursor_parquet)), (sqlite3_vtab_cursor_parquet*)sqlite3_malloc(sizeof(sqlite3_vtab_cursor_parquet)),
sqlite3_free); sqlite3_free);
@ -254,7 +252,6 @@ static int parquetFilter(
int idxNum, const char *idxStr, int idxNum, const char *idxStr,
int argc, sqlite3_value **argv int argc, sqlite3_value **argv
){ ){
printf("xFilter\n");
ParquetCursor* cursor = ((sqlite3_vtab_cursor_parquet*)cur)->cursor; ParquetCursor* cursor = ((sqlite3_vtab_cursor_parquet*)cur)->cursor;
cursor->reset(); cursor->reset();
return parquetNext(cur); return parquetNext(cur);

View File

@ -11,6 +11,7 @@ bool ParquetCursor::nextRowGroup() {
if((rowGroupId + 1) >= numRowGroups) if((rowGroupId + 1) >= numRowGroups)
return false; return false;
rowGroupStartRowId = rowId;
rowGroupId++; rowGroupId++;
rowGroupMetadata = reader->metadata()->RowGroup(0); rowGroupMetadata = reader->metadata()->RowGroup(0);
rowsLeftInRowGroup = rowGroupMetadata->num_rows(); rowsLeftInRowGroup = rowGroupMetadata->num_rows();
@ -31,6 +32,10 @@ bool ParquetCursor::nextRowGroup() {
logicalTypes[i] = rowGroupMetadata->schema()->Column(i)->logical_type(); logicalTypes[i] = rowGroupMetadata->schema()->Column(i)->logical_type();
} }
for(unsigned int i = 0; i < colRows.size(); i++) {
colRows[i] = rowId;
}
return true; return true;
} }
@ -59,7 +64,8 @@ void ParquetCursor::ensureColumn(int col) {
// need to ensure a scanner exists (and skip the # of rows in the rowgroup) // need to ensure a scanner exists (and skip the # of rows in the rowgroup)
while((unsigned int)col >= scanners.size()) { while((unsigned int)col >= scanners.size()) {
scanners.push_back(std::shared_ptr<parquet::Scanner>()); scanners.push_back(std::shared_ptr<parquet::Scanner>());
colRows.push_back(-1); // If it doesn't exist, it's the rowId as of the last nextRowGroup call
colRows.push_back(rowGroupStartRowId);
colNulls.push_back(false); colNulls.push_back(false);
colIntValues.push_back(0); colIntValues.push_back(0);
colDoubleValues.push_back(0); colDoubleValues.push_back(0);
@ -74,8 +80,83 @@ void ParquetCursor::ensureColumn(int col) {
// Actually fetch a value, stash data in colRows, colNulls, colValues // Actually fetch a value, stash data in colRows, colNulls, colValues
if(colRows[col] != rowId) { if(colRows[col] != rowId) {
colRows[col] = rowId; // We may need to skip some records, eg, a query like
// SELECT a WHERE b = 10
// may have read b, but skipped a until b matches the predicate.
bool wasNull = false; bool wasNull = false;
while(colRows[col] + 1 < rowId) {
switch(types[col]) {
case parquet::Type::INT32:
{
parquet::Int32Scanner* s = (parquet::Int32Scanner*)scanners[col].get();
int rv = 0;
s->NextValue(&rv, &wasNull);
break;
}
case parquet::Type::FLOAT:
{
parquet::FloatScanner* s = (parquet::FloatScanner*)scanners[col].get();
float rv = 0;
s->NextValue(&rv, &wasNull);
break;
}
case parquet::Type::DOUBLE:
{
parquet::DoubleScanner* s = (parquet::DoubleScanner*)scanners[col].get();
double rv = 0;
s->NextValue(&rv, &wasNull);
break;
}
case parquet::Type::BYTE_ARRAY:
{
parquet::ByteArrayScanner* s = (parquet::ByteArrayScanner*)scanners[col].get();
parquet::ByteArray ba;
s->NextValue(&ba, &wasNull);
break;
}
case parquet::Type::INT96:
{
parquet::Int96Scanner* s = (parquet::Int96Scanner*)scanners[col].get();
parquet::Int96 rv;
s->NextValue(&rv, &wasNull);
break;
}
case parquet::Type::INT64:
{
parquet::Int64Scanner* s = (parquet::Int64Scanner*)scanners[col].get();
long rv = 0;
s->NextValue(&rv, &wasNull);
break;
}
case parquet::Type::BOOLEAN:
{
parquet::BoolScanner* s = (parquet::BoolScanner*)scanners[col].get();
bool rv = false;
s->NextValue(&rv, &wasNull);
break;
}
case parquet::Type::FIXED_LEN_BYTE_ARRAY:
{
parquet::FixedLenByteArrayScanner* s = (parquet::FixedLenByteArrayScanner*)scanners[col].get();
parquet::FixedLenByteArray flba;
s->NextValue(&flba, &wasNull);
break;
}
default:
// Should be impossible to get here as we should have forbidden this at
// CREATE time -- maybe file changed underneath us?
std::ostringstream ss;
ss << __FILE__ << ":" << __LINE__ << ": column " << col << " has unsupported type: " <<
parquet::TypeToString(types[col]);
throw std::invalid_argument(ss.str());
break;
}
colRows[col]++;
}
colRows[col] = rowId;
wasNull = false;
switch(types[col]) { switch(types[col]) {
case parquet::Type::INT32: case parquet::Type::INT32:

View File

@ -22,6 +22,7 @@ class ParquetCursor {
int rowId; int rowId;
int rowGroupId; int rowGroupId;
int rowGroupStartRowId;
int numRows; int numRows;
int numRowGroups; int numRowGroups;
int rowsLeftInRowGroup; int rowsLeftInRowGroup;

View File

@ -11,7 +11,6 @@ std::string ParquetTable::CreateStatement() {
// TODO: parse columns from file // TODO: parse columns from file
std::string text("CREATE TABLE x("); std::string text("CREATE TABLE x(");
auto schema = reader->metadata()->schema(); auto schema = reader->metadata()->schema();
printf("num cols: %d\n", schema->num_columns());
for(auto i = 0; i < schema->num_columns(); i++) { for(auto i = 0; i < schema->num_columns(); i++) {
auto _col = schema->GetColumnRoot(i); auto _col = schema->GetColumnRoot(i);

View File

@ -0,0 +1,3 @@
100-rows-1.parquet
select count(*) from (select * from test t1, test t2);
10000

View File

@ -0,0 +1,3 @@
100-rows-1.parquet
select int8_1 from test where rowid = 50;
0

View File

@ -0,0 +1,3 @@
100-rows-10.parquet
select int8_1 from test where rowid = 50;
0

View File

@ -0,0 +1,3 @@
100-rows-10.parquet
select int8_1 from test where rowid = 55;
-5

View File

@ -23,13 +23,13 @@ main() {
root=$(readlink -f "$root") root=$(readlink -f "$root")
cd "$root" cd "$root"
queries=$(find tests/queries -type f -name '*.sql') queries=$(find tests/queries -type f -name '*.sql' | sort)
while read -r file; do while read -r file; do
echo "Testing: $file" echo "Testing: $file"
parquet_file=$(head -n1 "$file") parquet_file=$(head -n1 "$file")
query=$(head -n2 "$file" | tail -n1) query=$(head -n2 "$file" | tail -n1)
results=$(tail -n+3 "$file") results=$(tail -n+3 "$file")
if ! "$root"/sqlite/sqlite3 -init <(run_query "$file" "$parquet_file" "$query") < /dev/null > /dev/null 2> testcase-err.txt; then if ! "$root"/sqlite/sqlite3 -init <(run_query "$file" "$parquet_file" "$query") < /dev/null > testcase-stdout.txt 2> testcase-stderr.txt; then
echo "...FAILED; check testcase-{out,err}.txt" >&2 echo "...FAILED; check testcase-{stdout,stderr}.txt" >&2
exit 1 exit 1
fi fi