#include "parquet_cursor.h"

// Creates a cursor over `table` with no constraints. reset() opens the
// underlying file and initialises all row/rowgroup bookkeeping.
ParquetCursor::ParquetCursor(ParquetTable* table) {
  this->table = table;
  reader = nullptr;
  reset(std::vector<Constraint>());
}

// Return true if it is _possible_ that the current rowgroup contains a row
// whose rowid satisfies `constraint`.
//
// When this is called from nextRowGroup, rowId is the id of the first row in
// the rowgroup, so the rowgroup covers [rowId, rowId + rowGroupSize).
bool ParquetCursor::currentRowGroupSatisfiesRowIdFilter(Constraint& constraint) {
  // intValue is only meaningful for integer constraints; for anything else
  // we cannot prune safely, so let SQLite evaluate the predicate itself.
  if(constraint.type != Integer) {
    return true;
  }

  const int64_t target = constraint.intValue;
  const int64_t firstRowId = rowId;
  const int64_t lastRowId = rowId + rowGroupSize - 1;

  switch(constraint.op) {
    case IsNull:
      // rowids are never null
      return false;
    case Is:
    case Equal:
      return target >= firstRowId && target <= lastRowId;
    case GreaterThan:
      // some rowid > target
      return lastRowId > target;
    case GreaterThanOrEqual:
      // some rowid >= target
      return lastRowId >= target;
    case LessThan:
      // some rowid < target
      return firstRowId < target;
    case LessThanOrEqual:
      // some rowid <= target
      return firstRowId <= target;
    default:
      return true;
  }
}

// Return true if it is _possible_ that a BYTE_ARRAY column in the current
// rowgroup satisfies `constraint`, judging by the column chunk's min/max
// statistics. Returns true whenever the statistics cannot rule it out.
bool ParquetCursor::currentRowGroupSatisfiesTextFilter(
    Constraint& constraint,
    std::shared_ptr<parquet::RowGroupStatistics> _stats) {
  typedef parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::BYTE_ARRAY>> ByteArrayStats;
  ByteArrayStats* stats = static_cast<ByteArrayStats*>(_stats.get());

  if(!stats->HasMinMax()) {
    return true;
  }

  if(constraint.type != Text) {
    return true;
  }

  const std::string& str = constraint.stringValue;
  const parquet::ByteArray& min = stats->min();
  const parquet::ByteArray& max = stats->max();
  const std::string minStr((const char*)min.ptr, min.len);
  const std::string maxStr((const char*)max.ptr, max.len);

  switch(constraint.op) {
    case Is:
    case Equal:
      return str >= minStr && str <= maxStr;
    case GreaterThanOrEqual:
      return maxStr >= str;
    case GreaterThan:
      return maxStr > str;
    case LessThan:
      return minStr < str;
    case LessThanOrEqual:
      return minStr <= str;
    case IsNot:
    case NotEqual:
      // If min == max == str, every value equals str and we can skip this.
      return !(minStr == maxStr && str == minStr);
    case Like: {
      // Compare the LIKE prefix against min/max truncated to the same length.
      const std::string& likeStringValue = constraint.likeStringValue;
      if(likeStringValue.empty()) {
        return true;
      }
      const std::string truncatedMin = minStr.substr(0, likeStringValue.size());
      const std::string truncatedMax = maxStr.substr(0, likeStringValue.size());
      return likeStringValue >= truncatedMin && likeStringValue <= truncatedMax;
    }
    default:
      return true;
  }
}

// Converts a Parquet INT96 timestamp to milliseconds since the Unix epoch.
//
// value[0] and value[1] hold the low and high 32 bits of the nanoseconds
// into the day; value[2] holds the Julian day (2440588 is 1970-01-01).
int64_t int96toMsSinceEpoch(const parquet::Int96& rv) {
  // Widen before shifting: shifting a 32-bit integer by 32 is undefined.
  __int128 ns = rv.value[0] + ((uint64_t)rv.value[1] << 32);
  __int128 julianDay = rv.value[2];
  __int128 nsSinceEpoch = (julianDay - 2440588);
  nsSinceEpoch *= 86400;
  nsSinceEpoch *= 1000 * 1000 * 1000;
  nsSinceEpoch += ns;
  nsSinceEpoch /= 1000000;
  return nsSinceEpoch;
}

// Return true if it is _possible_ that an INT32/INT64/INT96/BOOLEAN column in
// the current rowgroup satisfies `constraint`, judging by min/max statistics.
//
// Throws std::invalid_argument if the column's physical type is not one of
// the integer-like types above.
bool ParquetCursor::currentRowGroupSatisfiesIntegerFilter(
    Constraint& constraint,
    std::shared_ptr<parquet::RowGroupStatistics> _stats) {
  if(!_stats->HasMinMax()) {
    return true;
  }

  if(constraint.type != Integer) {
    return true;
  }

  const int column = constraint.column;

  int64_t min = std::numeric_limits<int64_t>::min();
  int64_t max = std::numeric_limits<int64_t>::max();
  const parquet::Type::type pqType = types[column];

  if(pqType == parquet::Type::INT32) {
    typedef parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::INT32>> Stats;
    Stats* stats = static_cast<Stats*>(_stats.get());
    min = stats->min();
    max = stats->max();
  } else if(pqType == parquet::Type::INT64) {
    typedef parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::INT64>> Stats;
    Stats* stats = static_cast<Stats*>(_stats.get());
    min = stats->min();
    max = stats->max();
  } else if(pqType == parquet::Type::INT96) {
    typedef parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::INT96>> Stats;
    Stats* stats = static_cast<Stats*>(_stats.get());
    min = int96toMsSinceEpoch(stats->min());
    max = int96toMsSinceEpoch(stats->max());
  } else if(pqType == parquet::Type::BOOLEAN) {
    typedef parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::BOOLEAN>> Stats;
    Stats* stats = static_cast<Stats*>(_stats.get());
    min = stats->min();
    max = stats->max();
  } else {
    // Should be impossible to get here as we should have forbidden this at
    // CREATE time -- maybe file changed underneath us?
    std::ostringstream ss;
    ss << __FILE__ << ":" << __LINE__ << ": currentRowGroupSatisfiesIntegerFilter called on unsupported type: " <<
      parquet::TypeToString(pqType);
    throw std::invalid_argument(ss.str());
  }

  const int64_t value = constraint.intValue;

  switch(constraint.op) {
    case Is:
    case Equal:
      return value >= min && value <= max;
    case GreaterThanOrEqual:
      return max >= value;
    case GreaterThan:
      return max > value;
    case LessThan:
      return min < value;
    case LessThanOrEqual:
      return min <= value;
    case IsNot:
    case NotEqual:
      // If min == max == value, every row equals value and we can skip this.
      return !(min == max && value == min);
    case Like:
    default:
      return true;
  }
}

// Return true if it is _possible_ that a FLOAT/DOUBLE column in the current
// rowgroup satisfies `constraint`, judging by min/max statistics.
//
// Throws std::invalid_argument if the column's physical type is neither
// FLOAT nor DOUBLE.
bool ParquetCursor::currentRowGroupSatisfiesDoubleFilter(
    Constraint& constraint,
    std::shared_ptr<parquet::RowGroupStatistics> _stats) {
  if(!_stats->HasMinMax()) {
    return true;
  }

  if(constraint.type != Double) {
    return true;
  }

  const int column = constraint.column;

  // Both bounds are always overwritten below (or we throw).
  double min = std::numeric_limits<double>::lowest();
  double max = std::numeric_limits<double>::max();
  const parquet::Type::type pqType = types[column];

  if(pqType == parquet::Type::DOUBLE) {
    typedef parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::DOUBLE>> Stats;
    Stats* stats = static_cast<Stats*>(_stats.get());
    min = stats->min();
    max = stats->max();
  } else if(pqType == parquet::Type::FLOAT) {
    typedef parquet::TypedRowGroupStatistics<parquet::DataType<parquet::Type::FLOAT>> Stats;
    Stats* stats = static_cast<Stats*>(_stats.get());
    min = stats->min();
    max = stats->max();
  } else {
    // Should be impossible to get here as we should have forbidden this at
    // CREATE time -- maybe file changed underneath us?
    std::ostringstream ss;
    ss << __FILE__ << ":" << __LINE__ << ": currentRowGroupSatisfiesDoubleFilter called on unsupported type: " <<
      parquet::TypeToString(pqType);
    throw std::invalid_argument(ss.str());
  }

  const double value = constraint.doubleValue;

  switch(constraint.op) {
    case Is:
    case Equal:
      return value >= min && value <= max;
    case GreaterThanOrEqual:
      return max >= value;
    case GreaterThan:
      return max > value;
    case LessThan:
      return min < value;
    case LessThanOrEqual:
      return min <= value;
    case IsNot:
    case NotEqual:
      // If min == max == value, every row equals value and we can skip this.
      return !(min == max && value == min);
    case Like:
    default:
      return true;
  }
}

// Return true if it is _possible_ that the current row's BYTE_ARRAY value in
// constraint.column satisfies `constraint`. Comparisons are bytewise against
// constraint.blobValue; Like is a prefix match on constraint.likeStringValue.
bool ParquetCursor::currentRowSatisfiesTextFilter(Constraint& constraint) {
  if(constraint.type != Text) {
    return true;
  }

  const parquet::ByteArray* ba = getByteArray(constraint.column);
  const auto& blob = constraint.blobValue;

  // .data() rather than &blob[0]: indexing an empty vector is undefined.
  const auto* blobBegin = blob.data();
  const auto* blobEnd = blobBegin + blob.size();
  const auto* rowBegin = ba->ptr;
  const auto* rowEnd = ba->ptr + ba->len;

  // Guard the zero-length case so memcmp never sees a null pointer.
  const bool equal = blob.size() == ba->len &&
    (ba->len == 0 || 0 == memcmp(blobBegin, rowBegin, ba->len));

  switch(constraint.op) {
    case Is:
    case Equal:
      return equal;
    case IsNot:
    case NotEqual:
      return !equal;
    case GreaterThan:
      // row > blob  <=>  blob < row
      return std::lexicographical_compare(blobBegin, blobEnd, rowBegin, rowEnd);
    case GreaterThanOrEqual:
      return equal || std::lexicographical_compare(blobBegin, blobEnd, rowBegin, rowEnd);
    case LessThan:
      return std::lexicographical_compare(rowBegin, rowEnd, blobBegin, blobEnd);
    case LessThanOrEqual:
      return equal || std::lexicographical_compare(rowBegin, rowEnd, blobBegin, blobEnd);
    case Like: {
      const std::string& likeStringValue = constraint.likeStringValue;
      // The row must be at least as long as the prefix it has to start with.
      if(likeStringValue.size() > ba->len) {
        return false;
      }
      return likeStringValue.empty() ||
        0 == memcmp(likeStringValue.data(), ba->ptr, likeStringValue.size());
    }
    default:
      return true;
  }
}

// Row-level integer filtering is not implemented; always defer to SQLite.
bool ParquetCursor::currentRowSatisfiesIntegerFilter(Constraint& constraint) {
  return true;
}

// Row-level double filtering is not implemented; always defer to SQLite.
bool ParquetCursor::currentRowSatisfiesDoubleFilter(Constraint& constraint) {
  return true;
}

// Return true if it is _possible_ that the current
// rowgroup satisfies the constraints. Only return false
// if it definitely does not.
//
// This avoids opening rowgroups that can't return useful
// data, which provides substantial performance benefits.
bool ParquetCursor::currentRowGroupSatisfiesFilter() {
  for(unsigned int i = 0; i < constraints.size(); i++) {
    const int column = constraints[i].column;
    const int op = constraints[i].op;
    bool rv = true;

    if(column == -1) {
      // -1 signals the rowid pseudo-column
      rv = currentRowGroupSatisfiesRowIdFilter(constraints[i]);
    } else {
      auto md = rowGroupMetadata->ColumnChunk(column);
      if(!md->is_stats_set()) {
        continue;
      }
      auto stats = md->statistics();

      // SQLite is much looser with types than you might expect if you
      // come from a Postgres background. The constraint '30.0' (that is,
      // a string containing a floating point number) should be treated
      // as equal to a field containing an integer 30.
      //
      // This means that even if the parquet physical type is integer,
      // the constraint type may be a string, so dispatch to the filter
      // fn based on the Parquet type.
      if(op == IsNull) {
        rv = stats->null_count() > 0;
      } else if(op == IsNotNull) {
        rv = stats->num_values() > 0;
      } else {
        const parquet::Type::type pqType = types[column];

        if(pqType == parquet::Type::BYTE_ARRAY) {
          rv = currentRowGroupSatisfiesTextFilter(constraints[i], stats);
        } else if(pqType == parquet::Type::INT32 ||
                  pqType == parquet::Type::INT64 ||
                  pqType == parquet::Type::INT96 ||
                  pqType == parquet::Type::BOOLEAN) {
          rv = currentRowGroupSatisfiesIntegerFilter(constraints[i], stats);
        } else if(pqType == parquet::Type::FLOAT ||
                  pqType == parquet::Type::DOUBLE) {
          rv = currentRowGroupSatisfiesDoubleFilter(constraints[i], stats);
        }
      }
    }

    // and it with the existing actual, which may have come from a previous run
    if(!rv) {
      return false;
    }
  }

  return true;
}

// Advances to the next rowgroup that may satisfy the constraints, skipping
// rowgroups that currentRowGroupSatisfiesFilter rules out.
//
// Returns false when there are no more rowgroups. On success, rowId is the id
// of the first row in the new rowgroup; the caller decrements it again.
bool ParquetCursor::nextRowGroup() {
  for(;;) {
    // Ensure that rowId points at the start of this rowGroup (eg, in the case where
    // we skipped an entire row group).
    rowId = rowGroupStartRowId + rowGroupSize;

    if((rowGroupId + 1) >= numRowGroups) {
      return false;
    }

    rowGroupStartRowId = rowId;
    rowGroupId++;
    rowGroupMetadata = reader->metadata()->RowGroup(rowGroupId);
    rowGroupSize = rowsLeftInRowGroup = rowGroupMetadata->num_rows();
    rowGroup = reader->RowGroup(rowGroupId);

    // Scanners belong to the previous rowgroup; ensureColumn recreates them lazily.
    for(unsigned int i = 0; i < scanners.size(); i++) {
      scanners[i].reset();
    }

    // Grow (never shrink) the type caches, then refresh every entry.
    const unsigned int numColumns = (unsigned int)rowGroupMetadata->num_columns();
    if(types.size() < numColumns) {
      types.resize(numColumns);
    }
    if(logicalTypes.size() < numColumns) {
      logicalTypes.resize(numColumns);
    }
    for(unsigned int i = 0; i < numColumns; i++) {
      types[i] = rowGroupMetadata->schema()->Column(i)->physical_type();
      logicalTypes[i] = rowGroupMetadata->schema()->Column(i)->logical_type();
    }

    for(unsigned int i = 0; i < colRows.size(); i++) {
      colRows[i] = rowId;
    }

    // Increment rowId so currentRowGroupSatisfiesRowIdFilter can access it;
    // it'll get decremented by our caller
    rowId++;

    if(currentRowGroupSatisfiesFilter()) {
      return true;
    }
  }
}
Return true if it is _possible_ that the current // row satisfies the constraints. Only return false // if it definitely does not. // // This avoids pointless transitions between the SQLite VM // and the extension, which can add up on a dataset of tens // of millions of rows. bool ParquetCursor::currentRowSatisfiesFilter() { for(unsigned int i = 0; i < constraints.size(); i++) { bool rv = true; int column = constraints[i].column; ensureColumn(column); int op = constraints[i].op; if(op == IsNull) { rv = isNull(column); } else if(op == IsNotNull) { rv = !isNull(column); } else { parquet::Type::type pqType = types[column]; if(pqType == parquet::Type::BYTE_ARRAY) { rv = currentRowSatisfiesTextFilter(constraints[i]); } else if(pqType == parquet::Type::INT32 || pqType == parquet::Type::INT64 || pqType == parquet::Type::INT96 || pqType == parquet::Type::BOOLEAN) { rv = currentRowSatisfiesIntegerFilter(constraints[i]); } else if(pqType == parquet::Type::FLOAT || pqType == parquet::Type::DOUBLE) { rv = currentRowSatisfiesDoubleFilter(constraints[i]); } } if(!rv) return false; } return true; } void ParquetCursor::next() { start: if(rowsLeftInRowGroup == 0) { if(!nextRowGroup()) { // put rowId over the edge so eof returns true rowId = numRows + 1; return; } else { // After a successful nextRowGroup, rowId is pointing at the current row. Make it // point before so the rest of the logic works out. 
rowId--; } } rowsLeftInRowGroup--; rowId++; if(!currentRowSatisfiesFilter()) goto start; } int ParquetCursor::getRowId() { return rowId; } bool ParquetCursor::eof() { return rowId >= numRows; } void ParquetCursor::ensureColumn(int col) { // -1 signals rowid, which is trivially available if(col == -1) return; // need to ensure a scanner exists (and skip the # of rows in the rowgroup) while((unsigned int)col >= scanners.size()) { scanners.push_back(std::shared_ptr()); // If it doesn't exist, it's the rowId as of the last nextRowGroup call colRows.push_back(rowGroupStartRowId); colNulls.push_back(false); colIntValues.push_back(0); colDoubleValues.push_back(0); colByteArrayValues.push_back(parquet::ByteArray()); } if(scanners[col].get() == NULL) { std::shared_ptr colReader = rowGroup->Column(col); scanners[col] = parquet::Scanner::Make(colReader); } // Actually fetch a value, stash data in colRows, colNulls, colValues if(colRows[col] != rowId) { // We may need to skip some records, eg, a query like // SELECT a WHERE b = 10 // may have read b, but skipped a until b matches the predicate. 
bool wasNull = false; while(colRows[col] + 1 < rowId) { switch(types[col]) { case parquet::Type::INT32: { parquet::Int32Scanner* s = (parquet::Int32Scanner*)scanners[col].get(); int rv = 0; s->NextValue(&rv, &wasNull); break; } case parquet::Type::FLOAT: { parquet::FloatScanner* s = (parquet::FloatScanner*)scanners[col].get(); float rv = 0; s->NextValue(&rv, &wasNull); break; } case parquet::Type::DOUBLE: { parquet::DoubleScanner* s = (parquet::DoubleScanner*)scanners[col].get(); double rv = 0; s->NextValue(&rv, &wasNull); break; } case parquet::Type::BYTE_ARRAY: { parquet::ByteArrayScanner* s = (parquet::ByteArrayScanner*)scanners[col].get(); parquet::ByteArray ba; s->NextValue(&ba, &wasNull); break; } case parquet::Type::INT96: { parquet::Int96Scanner* s = (parquet::Int96Scanner*)scanners[col].get(); parquet::Int96 rv; s->NextValue(&rv, &wasNull); break; } case parquet::Type::INT64: { parquet::Int64Scanner* s = (parquet::Int64Scanner*)scanners[col].get(); long rv = 0; s->NextValue(&rv, &wasNull); break; } case parquet::Type::BOOLEAN: { parquet::BoolScanner* s = (parquet::BoolScanner*)scanners[col].get(); bool rv = false; s->NextValue(&rv, &wasNull); break; } case parquet::Type::FIXED_LEN_BYTE_ARRAY: { parquet::FixedLenByteArrayScanner* s = (parquet::FixedLenByteArrayScanner*)scanners[col].get(); parquet::FixedLenByteArray flba; s->NextValue(&flba, &wasNull); break; } default: // Should be impossible to get here as we should have forbidden this at // CREATE time -- maybe file changed underneath us? 
std::ostringstream ss; ss << __FILE__ << ":" << __LINE__ << ": column " << col << " has unsupported type: " << parquet::TypeToString(types[col]); throw std::invalid_argument(ss.str()); break; } colRows[col]++; } colRows[col] = rowId; wasNull = false; switch(types[col]) { case parquet::Type::INT32: { parquet::Int32Scanner* s = (parquet::Int32Scanner*)scanners[col].get(); int rv = 0; if(s->NextValue(&rv, &wasNull)) { colIntValues[col] = rv; } else { throw std::invalid_argument("unexpectedly lacking a next value"); } break; } case parquet::Type::FLOAT: { parquet::FloatScanner* s = (parquet::FloatScanner*)scanners[col].get(); float rv = 0; if(s->NextValue(&rv, &wasNull)) { colDoubleValues[col] = rv; } else { throw std::invalid_argument("unexpectedly lacking a next value"); } break; } case parquet::Type::DOUBLE: { parquet::DoubleScanner* s = (parquet::DoubleScanner*)scanners[col].get(); double rv = 0; if(s->NextValue(&rv, &wasNull)) { colDoubleValues[col] = rv; } else { throw std::invalid_argument("unexpectedly lacking a next value"); } break; } case parquet::Type::BYTE_ARRAY: { parquet::ByteArrayScanner* s = (parquet::ByteArrayScanner*)scanners[col].get(); if(!s->NextValue(&colByteArrayValues[col], &wasNull)) { throw std::invalid_argument("unexpectedly lacking a next value"); } break; } case parquet::Type::INT96: { // INT96 tracks a date with nanosecond precision, convert to ms since epoch. 
// ...see https://github.com/apache/parquet-format/pull/49 for more // // First 8 bytes: nanoseconds into the day // Last 4 bytes: Julian day // To get nanoseconds since the epoch: // (julian_day - 2440588) * (86400 * 1000 * 1000 * 1000) + nanoseconds parquet::Int96Scanner* s = (parquet::Int96Scanner*)scanners[col].get(); parquet::Int96 rv {0, 0, 0}; if(s->NextValue(&rv, &wasNull)) { colIntValues[col] = int96toMsSinceEpoch(rv); } else { throw std::invalid_argument("unexpectedly lacking a next value"); } break; } case parquet::Type::INT64: { parquet::Int64Scanner* s = (parquet::Int64Scanner*)scanners[col].get(); long rv = 0; if(s->NextValue(&rv, &wasNull)) { colIntValues[col] = rv; } else { throw std::invalid_argument("unexpectedly lacking a next value"); } break; } case parquet::Type::BOOLEAN: { parquet::BoolScanner* s = (parquet::BoolScanner*)scanners[col].get(); bool rv = false; if(s->NextValue(&rv, &wasNull)) { colIntValues[col] = rv ? 1 : 0; } else { throw std::invalid_argument("unexpectedly lacking a next value"); } break; } case parquet::Type::FIXED_LEN_BYTE_ARRAY: { parquet::FixedLenByteArrayScanner* s = (parquet::FixedLenByteArrayScanner*)scanners[col].get(); parquet::FixedLenByteArray flba; if(s->NextValue(&flba, &wasNull)) { colByteArrayValues[col].ptr = flba.ptr; // TODO: cache this colByteArrayValues[col].len = rowGroupMetadata->schema()->Column(col)->type_length(); } else { throw std::invalid_argument("unexpectedly lacking a next value"); } break; } default: // Should be impossible to get here as we should have forbidden this at // CREATE time -- maybe file changed underneath us? 
std::ostringstream ss; ss << __FILE__ << ":" << __LINE__ << ": column " << col << " has unsupported type: " << parquet::TypeToString(types[col]); throw std::invalid_argument(ss.str()); break; } colNulls[col] = wasNull; } } bool ParquetCursor::isNull(int col) { // -1 is rowid, which is trivially non null if(col == -1) return false; return colNulls[col]; } int ParquetCursor::getInt32(int col) { return colIntValues[col]; } long ParquetCursor::getInt64(int col) { return colIntValues[col]; } double ParquetCursor::getDouble(int col) { return colDoubleValues[col]; } parquet::ByteArray* ParquetCursor::getByteArray(int col) { return &colByteArrayValues[col]; } parquet::Type::type ParquetCursor::getPhysicalType(int col) { return types[col]; } parquet::LogicalType::type ParquetCursor::getLogicalType(int col) { return logicalTypes[col]; } void ParquetCursor::close() { if(reader != NULL) { reader->Close(); } } void ParquetCursor::reset(std::vector constraints) { close(); this->constraints = constraints; rowId = -1; // TODO: consider having a long lived handle in ParquetTable that can be borrowed // without incurring the cost of opening the file from scratch twice reader = parquet::ParquetFileReader::OpenFile( table->file.data(), true, parquet::default_reader_properties(), table->getMetadata()); rowGroupId = -1; rowGroupSize = 0; rowGroupStartRowId = -1; // TODO: handle the case where rowgroups have disjoint schemas? // TODO: or at least, fail fast if detected rowsLeftInRowGroup = 0; numRows = reader->metadata()->num_rows(); numRowGroups = reader->metadata()->num_row_groups(); } ParquetTable* ParquetCursor::getTable() { return table; }