sqlite-parquet-vtable/parquet/parquet_cursor.cc

653 lines
20 KiB
C++

#include "parquet_cursor.h"
ParquetCursor::ParquetCursor(ParquetTable* table) {
this->table = table;
reader = NULL;
reset(std::vector<Constraint>());
}
bool ParquetCursor::currentRowGroupSatisfiesRowIdFilter(Constraint& constraint) {
int64_t target = constraint.intValue;
switch(constraint.op) {
case IsNull:
return false;
case Is:
case Equal:
return target >= rowId && target < rowId + rowGroupSize;
case GreaterThan:
// rowId > target
return rowId + rowGroupSize > target;
case GreaterThanOrEqual:
// rowId >= target
return rowId + rowGroupSize >= rowId;
case LessThan:
return target > rowId;
case LessThanOrEqual:
return target >= rowId;
default:
return true;
}
}
bool ParquetCursor::currentRowGroupSatisfiesTextFilter(Constraint& constraint, std::shared_ptr<parquet::RowGroupStatistics> _stats) {
parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::BYTE_ARRAY>>* stats =
(parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::BYTE_ARRAY>>*)_stats.get();
if(!stats->HasMinMax()) {
return true;
}
if(constraint.type != Text) {
return true;
}
const std::string& str = constraint.stringValue;
const parquet::ByteArray& min = stats->min();
const parquet::ByteArray& max = stats->max();
std::string minStr((const char*)min.ptr, min.len);
std::string maxStr((const char*)max.ptr, max.len);
// printf("min=%s [%d], max=%s [%d], target=%s\n", minStr.data(), min.len, maxStr.data(), max.len, str.data());
switch(constraint.op) {
case Is:
case Equal:
return str >= minStr && str <= maxStr;
case GreaterThanOrEqual:
return maxStr >= str;
case GreaterThan:
return maxStr > str;
case LessThan:
return minStr < str;
case LessThanOrEqual:
return minStr <= str;
case IsNot:
case NotEqual:
// If min == max == str, we can skip this.
return !(minStr == maxStr && str == minStr);
case Like:
// TODO: We could do something here where we filter based on the leading characters
// of the target. For now, do nothing.
default:
return true;
}
}
int64_t int96toMsSinceEpoch(const parquet::Int96& rv) {
__int128 ns = rv.value[0] + ((unsigned long)rv.value[1] << 32);
__int128 julianDay = rv.value[2];
__int128 nsSinceEpoch = (julianDay - 2440588);
nsSinceEpoch *= 86400;
nsSinceEpoch *= 1000 * 1000 * 1000;
nsSinceEpoch += ns;
nsSinceEpoch /= 1000000;
return nsSinceEpoch;
}
bool ParquetCursor::currentRowGroupSatisfiesIntegerFilter(Constraint& constraint, std::shared_ptr<parquet::RowGroupStatistics> _stats) {
if(!_stats->HasMinMax()) {
return true;
}
if(constraint.type != Integer) {
return true;
}
int column = constraint.column;
int64_t min = std::numeric_limits<int64_t>::min();
int64_t max = std::numeric_limits<int64_t>::max();
parquet::Type::type pqType = types[column];
if(pqType == parquet::Type::INT32) {
parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::INT32>>* stats =
(parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::INT32>>*)_stats.get();
min = stats->min();
max = stats->max();
} else if(pqType == parquet::Type::INT64) {
parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::INT64>>* stats =
(parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::INT64>>*)_stats.get();
min = stats->min();
max = stats->max();
} else if(pqType == parquet::Type::INT96) {
parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::INT96>>* stats =
(parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::INT96>>*)_stats.get();
min = int96toMsSinceEpoch(stats->min());
max = int96toMsSinceEpoch(stats->max());
} else if(pqType == parquet::Type::BOOLEAN) {
parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::BOOLEAN>>* stats =
(parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::BOOLEAN>>*)_stats.get();
min = stats->min();
max = stats->max();
} else {
// Should be impossible to get here as we should have forbidden this at
// CREATE time -- maybe file changed underneath us?
std::ostringstream ss;
ss << __FILE__ << ":" << __LINE__ << ": currentRowGroupSatisfiesIntegerFilter called on unsupported type: " <<
parquet::TypeToString(pqType);
throw std::invalid_argument(ss.str());
}
const int64_t value = constraint.intValue;
// printf("min=%s [%d], max=%s [%d], target=%s\n", minStr.data(), min.len, maxStr.data(), max.len, str.data());
switch(constraint.op) {
case Is:
case Equal:
return value >= min && value <= max;
case GreaterThanOrEqual:
return max >= value;
case GreaterThan:
return max > value;
case LessThan:
return min < value;
case LessThanOrEqual:
return min <= value;
case IsNot:
case NotEqual:
// If min == max == str, we can skip this.
return !(min == max && value == min);
case Like:
default:
return true;
}
return true;
}
bool ParquetCursor::currentRowGroupSatisfiesDoubleFilter(Constraint& constraint, std::shared_ptr<parquet::RowGroupStatistics> stats) {
return true;
}
bool ParquetCursor::currentRowSatisfiesTextFilter(Constraint& constraint) {
if(constraint.type != Text) {
return true;
}
const std::vector<unsigned char>& blob = constraint.blobValue;
parquet::ByteArray* ba = getByteArray(constraint.column);
switch(constraint.op) {
case Is:
case Equal:
if(blob.size() != ba->len)
return false;
return 0 == memcmp(&blob[0], ba->ptr, ba->len);
case IsNot:
case NotEqual:
if(blob.size() != ba->len)
return true;
return 0 != memcmp(&blob[0], ba->ptr, ba->len);
case GreaterThan:
case GreaterThanOrEqual:
case LessThan:
case LessThanOrEqual:
case Like:
default:
return true;
}
}
bool ParquetCursor::currentRowSatisfiesIntegerFilter(Constraint& constraint) {
return true;
}
bool ParquetCursor::currentRowSatisfiesDoubleFilter(Constraint& constraint) {
return true;
}
// Return true if it is _possible_ that the current
// rowgroup satisfies the constraints. Only return false
// if it definitely does not.
//
// This avoids opening rowgroups that can't return useful
// data, which provides substantial performance benefits.
bool ParquetCursor::currentRowGroupSatisfiesFilter() {
for(unsigned int i = 0; i < constraints.size(); i++) {
int column = constraints[i].column;
int op = constraints[i].op;
bool rv = true;
if(column == -1) {
rv = currentRowGroupSatisfiesRowIdFilter(constraints[i]);
} else {
std::unique_ptr<parquet::ColumnChunkMetaData> md = rowGroupMetadata->ColumnChunk(column);
if(!md->is_stats_set()) {
continue;
}
std::shared_ptr<parquet::RowGroupStatistics> stats = md->statistics();
// SQLite is much looser with types than you might expect if you
// come from a Postgres background. The constraint '30.0' (that is,
// a string containing a floating point number) should be treated
// as equal to a field containing an integer 30.
//
// This means that even if the parquet physical type is integer,
// the constraint type may be a string, so dispatch to the filter
// fn based on the Parquet type.
if(op == IsNull) {
rv = stats->null_count() > 0;
} else if(op == IsNotNull) {
rv = stats->num_values() > 0;
} else {
parquet::Type::type pqType = types[column];
if(pqType == parquet::Type::BYTE_ARRAY) {
rv = currentRowGroupSatisfiesTextFilter(constraints[i], stats);
} else if(pqType == parquet::Type::INT32 ||
pqType == parquet::Type::INT64 ||
pqType == parquet::Type::INT96 ||
pqType == parquet::Type::BOOLEAN) {
rv = currentRowGroupSatisfiesIntegerFilter(constraints[i], stats);
} else if(pqType == parquet::Type::FLOAT || pqType == parquet::Type::DOUBLE) {
rv = currentRowGroupSatisfiesDoubleFilter(constraints[i], stats);
}
}
}
if(!rv)
return false;
}
return true;
}
bool ParquetCursor::nextRowGroup() {
start:
// Ensure that rowId points at the start of this rowGroup (eg, in the case where
// we skipped an entire row group).
rowId = rowGroupStartRowId + rowGroupSize;
if((rowGroupId + 1) >= numRowGroups) {
return false;
}
rowGroupStartRowId = rowId;
rowGroupId++;
rowGroupMetadata = reader->metadata()->RowGroup(rowGroupId);
rowGroupSize = rowsLeftInRowGroup = rowGroupMetadata->num_rows();
rowGroup = reader->RowGroup(rowGroupId);
for(unsigned int i = 0; i < scanners.size(); i++)
scanners[i] = NULL;
while(types.size() < (unsigned int)rowGroupMetadata->num_columns()) {
types.push_back(rowGroupMetadata->schema()->Column(0)->physical_type());
}
while(logicalTypes.size() < (unsigned int)rowGroupMetadata->num_columns()) {
logicalTypes.push_back(rowGroupMetadata->schema()->Column(0)->logical_type());
}
for(unsigned int i = 0; i < (unsigned int)rowGroupMetadata->num_columns(); i++) {
types[i] = rowGroupMetadata->schema()->Column(i)->physical_type();
logicalTypes[i] = rowGroupMetadata->schema()->Column(i)->logical_type();
}
for(unsigned int i = 0; i < colRows.size(); i++) {
colRows[i] = rowId;
}
// Increment rowId so currentRowGroupSatisfiesRowIdFilter can access it;
// it'll get decremented by our caller
rowId++;
if(!currentRowGroupSatisfiesFilter())
goto start;
return true;
}
// Return true if it is _possible_ that the current
// row satisfies the constraints. Only return false
// if it definitely does not.
//
// This avoids pointless transitions between the SQLite VM
// and the extension, which can add up on a dataset of tens
// of millions of rows.
bool ParquetCursor::currentRowSatisfiesFilter() {
for(unsigned int i = 0; i < constraints.size(); i++) {
bool rv = true;
int column = constraints[i].column;
ensureColumn(column);
int op = constraints[i].op;
if(op == IsNull) {
rv = isNull(column);
} else if(op == IsNotNull) {
rv = !isNull(column);
} else {
parquet::Type::type pqType = types[column];
if(pqType == parquet::Type::BYTE_ARRAY) {
rv = currentRowSatisfiesTextFilter(constraints[i]);
} else if(pqType == parquet::Type::INT32 ||
pqType == parquet::Type::INT64 ||
pqType == parquet::Type::INT96 ||
pqType == parquet::Type::BOOLEAN) {
rv = currentRowSatisfiesIntegerFilter(constraints[i]);
} else if(pqType == parquet::Type::FLOAT || pqType == parquet::Type::DOUBLE) {
rv = currentRowSatisfiesDoubleFilter(constraints[i]);
}
}
if(!rv)
return false;
}
return true;
}
void ParquetCursor::next() {
start:
if(rowsLeftInRowGroup == 0) {
if(!nextRowGroup()) {
// put rowId over the edge so eof returns true
rowId = numRows + 1;
return;
} else {
// After a successful nextRowGroup, rowId is pointing at the current row. Make it
// point before so the rest of the logic works out.
rowId--;
}
}
rowsLeftInRowGroup--;
rowId++;
if(!currentRowSatisfiesFilter())
goto start;
}
int ParquetCursor::getRowId() {
return rowId;
}
bool ParquetCursor::eof() {
return rowId >= numRows;
}
void ParquetCursor::ensureColumn(int col) {
// -1 signals rowid, which is trivially available
if(col == -1)
return;
// need to ensure a scanner exists (and skip the # of rows in the rowgroup)
while((unsigned int)col >= scanners.size()) {
scanners.push_back(std::shared_ptr<parquet::Scanner>());
// If it doesn't exist, it's the rowId as of the last nextRowGroup call
colRows.push_back(rowGroupStartRowId);
colNulls.push_back(false);
colIntValues.push_back(0);
colDoubleValues.push_back(0);
colByteArrayValues.push_back(parquet::ByteArray());
}
if(scanners[col].get() == NULL) {
std::shared_ptr<parquet::ColumnReader> colReader = rowGroup->Column(col);
scanners[col] = parquet::Scanner::Make(colReader);
}
// Actually fetch a value, stash data in colRows, colNulls, colValues
if(colRows[col] != rowId) {
// We may need to skip some records, eg, a query like
// SELECT a WHERE b = 10
// may have read b, but skipped a until b matches the predicate.
bool wasNull = false;
while(colRows[col] + 1 < rowId) {
switch(types[col]) {
case parquet::Type::INT32:
{
parquet::Int32Scanner* s = (parquet::Int32Scanner*)scanners[col].get();
int rv = 0;
s->NextValue(&rv, &wasNull);
break;
}
case parquet::Type::FLOAT:
{
parquet::FloatScanner* s = (parquet::FloatScanner*)scanners[col].get();
float rv = 0;
s->NextValue(&rv, &wasNull);
break;
}
case parquet::Type::DOUBLE:
{
parquet::DoubleScanner* s = (parquet::DoubleScanner*)scanners[col].get();
double rv = 0;
s->NextValue(&rv, &wasNull);
break;
}
case parquet::Type::BYTE_ARRAY:
{
parquet::ByteArrayScanner* s = (parquet::ByteArrayScanner*)scanners[col].get();
parquet::ByteArray ba;
s->NextValue(&ba, &wasNull);
break;
}
case parquet::Type::INT96:
{
parquet::Int96Scanner* s = (parquet::Int96Scanner*)scanners[col].get();
parquet::Int96 rv;
s->NextValue(&rv, &wasNull);
break;
}
case parquet::Type::INT64:
{
parquet::Int64Scanner* s = (parquet::Int64Scanner*)scanners[col].get();
long rv = 0;
s->NextValue(&rv, &wasNull);
break;
}
case parquet::Type::BOOLEAN:
{
parquet::BoolScanner* s = (parquet::BoolScanner*)scanners[col].get();
bool rv = false;
s->NextValue(&rv, &wasNull);
break;
}
case parquet::Type::FIXED_LEN_BYTE_ARRAY:
{
parquet::FixedLenByteArrayScanner* s = (parquet::FixedLenByteArrayScanner*)scanners[col].get();
parquet::FixedLenByteArray flba;
s->NextValue(&flba, &wasNull);
break;
}
default:
// Should be impossible to get here as we should have forbidden this at
// CREATE time -- maybe file changed underneath us?
std::ostringstream ss;
ss << __FILE__ << ":" << __LINE__ << ": column " << col << " has unsupported type: " <<
parquet::TypeToString(types[col]);
throw std::invalid_argument(ss.str());
break;
}
colRows[col]++;
}
colRows[col] = rowId;
wasNull = false;
switch(types[col]) {
case parquet::Type::INT32:
{
parquet::Int32Scanner* s = (parquet::Int32Scanner*)scanners[col].get();
int rv = 0;
if(s->NextValue(&rv, &wasNull)) {
colIntValues[col] = rv;
} else {
throw std::invalid_argument("unexpectedly lacking a next value");
}
break;
}
case parquet::Type::FLOAT:
{
parquet::FloatScanner* s = (parquet::FloatScanner*)scanners[col].get();
float rv = 0;
if(s->NextValue(&rv, &wasNull)) {
colDoubleValues[col] = rv;
} else {
throw std::invalid_argument("unexpectedly lacking a next value");
}
break;
}
case parquet::Type::DOUBLE:
{
parquet::DoubleScanner* s = (parquet::DoubleScanner*)scanners[col].get();
double rv = 0;
if(s->NextValue(&rv, &wasNull)) {
colDoubleValues[col] = rv;
} else {
throw std::invalid_argument("unexpectedly lacking a next value");
}
break;
}
case parquet::Type::BYTE_ARRAY:
{
parquet::ByteArrayScanner* s = (parquet::ByteArrayScanner*)scanners[col].get();
if(!s->NextValue(&colByteArrayValues[col], &wasNull)) {
throw std::invalid_argument("unexpectedly lacking a next value");
}
break;
}
case parquet::Type::INT96:
{
// INT96 tracks a date with nanosecond precision, convert to ms since epoch.
// ...see https://github.com/apache/parquet-format/pull/49 for more
//
// First 8 bytes: nanoseconds into the day
// Last 4 bytes: Julian day
// To get nanoseconds since the epoch:
// (julian_day - 2440588) * (86400 * 1000 * 1000 * 1000) + nanoseconds
parquet::Int96Scanner* s = (parquet::Int96Scanner*)scanners[col].get();
parquet::Int96 rv {0, 0, 0};
if(s->NextValue(&rv, &wasNull)) {
colIntValues[col] = int96toMsSinceEpoch(rv);
} else {
throw std::invalid_argument("unexpectedly lacking a next value");
}
break;
}
case parquet::Type::INT64:
{
parquet::Int64Scanner* s = (parquet::Int64Scanner*)scanners[col].get();
long rv = 0;
if(s->NextValue(&rv, &wasNull)) {
colIntValues[col] = rv;
} else {
throw std::invalid_argument("unexpectedly lacking a next value");
}
break;
}
case parquet::Type::BOOLEAN:
{
parquet::BoolScanner* s = (parquet::BoolScanner*)scanners[col].get();
bool rv = false;
if(s->NextValue(&rv, &wasNull)) {
colIntValues[col] = rv ? 1 : 0;
} else {
throw std::invalid_argument("unexpectedly lacking a next value");
}
break;
}
case parquet::Type::FIXED_LEN_BYTE_ARRAY:
{
parquet::FixedLenByteArrayScanner* s = (parquet::FixedLenByteArrayScanner*)scanners[col].get();
parquet::FixedLenByteArray flba;
if(s->NextValue(&flba, &wasNull)) {
colByteArrayValues[col].ptr = flba.ptr;
// TODO: cache this
colByteArrayValues[col].len = rowGroupMetadata->schema()->Column(col)->type_length();
} else {
throw std::invalid_argument("unexpectedly lacking a next value");
}
break;
}
default:
// Should be impossible to get here as we should have forbidden this at
// CREATE time -- maybe file changed underneath us?
std::ostringstream ss;
ss << __FILE__ << ":" << __LINE__ << ": column " << col << " has unsupported type: " <<
parquet::TypeToString(types[col]);
throw std::invalid_argument(ss.str());
break;
}
colNulls[col] = wasNull;
}
}
bool ParquetCursor::isNull(int col) {
// -1 is rowid, which is trivially non null
if(col == -1)
return false;
return colNulls[col];
}
int ParquetCursor::getInt32(int col) {
return colIntValues[col];
}
long ParquetCursor::getInt64(int col) {
return colIntValues[col];
}
double ParquetCursor::getDouble(int col) {
return colDoubleValues[col];
}
parquet::ByteArray* ParquetCursor::getByteArray(int col) {
return &colByteArrayValues[col];
}
parquet::Type::type ParquetCursor::getPhysicalType(int col) {
return types[col];
}
parquet::LogicalType::type ParquetCursor::getLogicalType(int col) {
return logicalTypes[col];
}
void ParquetCursor::close() {
if(reader != NULL) {
reader->Close();
}
}
void ParquetCursor::reset(std::vector<Constraint> constraints) {
close();
this->constraints = constraints;
rowId = -1;
// TODO: consider having a long lived handle in ParquetTable that can be borrowed
// without incurring the cost of opening the file from scratch twice
reader = parquet::ParquetFileReader::OpenFile(
table->file.data(),
true,
parquet::default_reader_properties(),
table->getMetadata());
rowGroupId = -1;
rowGroupSize = 0;
rowGroupStartRowId = -1;
// TODO: handle the case where rowgroups have disjoint schemas?
// TODO: or at least, fail fast if detected
rowsLeftInRowGroup = 0;
numRows = reader->metadata()->num_rows();
numRowGroups = reader->metadata()->num_row_groups();
}
ParquetTable* ParquetCursor::getTable() { return table; }