#include "parquet_cursor.h" ParquetCursor::ParquetCursor(ParquetTable* table): table(table) { reader = NULL; reset(std::vector()); } bool ParquetCursor::currentRowGroupSatisfiesRowIdFilter(Constraint& constraint) { if(constraint.type != Integer) return true; int64_t target = constraint.intValue; switch(constraint.op) { case IsNull: return false; case Is: case Equal: return target >= rowId && target < rowId + rowGroupSize; case GreaterThan: // rowId > target return rowId + rowGroupSize > target; case GreaterThanOrEqual: // rowId >= target return rowId + rowGroupSize >= rowId; case LessThan: return target > rowId; case LessThanOrEqual: return target >= rowId; default: return true; } } bool ParquetCursor::currentRowGroupSatisfiesBlobFilter(Constraint& constraint, std::shared_ptr _stats) { if(!_stats->HasMinMax()) { return true; } if(constraint.type != Blob) { return true; } const unsigned char* minPtr = NULL; const unsigned char* maxPtr = NULL; size_t minLen = 0; size_t maxLen = 0; parquet::Type::type pqType = types[constraint.column]; if(pqType == parquet::Type::BYTE_ARRAY) { parquet::TypedRowGroupStatistics>* stats = (parquet::TypedRowGroupStatistics>*)_stats.get(); minPtr = stats->min().ptr; minLen = stats->min().len; maxPtr = stats->max().ptr; maxLen = stats->max().len; } else if(pqType == parquet::Type::FIXED_LEN_BYTE_ARRAY) { // It seems like parquet-cpp doesn't actually produce stats for FLBA yet, so // rather than have untested code here, we'll just short circuit. // // Once I can get my hands on such a file, it should be easy to add support. return true; } else { // Should be impossible to get here std::ostringstream ss; ss << __FILE__ << ":" << __LINE__ << ": currentRowGroupSatisfiesBlobFilter called on unsupported type: " << parquet::TypeToString(pqType); throw std::invalid_argument(ss.str()); } const std::vector& blob = constraint.blobValue; switch(constraint.op) { case Is: case Equal: { bool minEqual = blob.size() == minLen && memcmp(&blob[0], minPtr, minLen) == 0; bool maxEqual = blob.size() == maxLen && memcmp(&blob[0], maxPtr, maxLen) == 0; bool blobGteMinBlob = std::lexicographical_compare( minPtr, minPtr + minLen, &blob[0], &blob[0] + blob.size()); bool blobLtMaxBlob = std::lexicographical_compare( &blob[0], &blob[0] + blob.size(), maxPtr, maxPtr + maxLen); return (minEqual || blobGteMinBlob) && (maxEqual || blobLtMaxBlob); } case GreaterThanOrEqual: { bool maxEqual = blob.size() == maxLen && memcmp(&blob[0], maxPtr, maxLen) == 0; return maxEqual || std::lexicographical_compare( &blob[0], &blob[0] + blob.size(), maxPtr, maxPtr + maxLen); } case GreaterThan: return std::lexicographical_compare( &blob[0], &blob[0] + blob.size(), maxPtr, maxPtr + maxLen); case LessThan: return std::lexicographical_compare( minPtr, minPtr + minLen, &blob[0], &blob[0] + blob.size()); case LessThanOrEqual: { bool minEqual = blob.size() == minLen && memcmp(&blob[0], minPtr, minLen) == 0; return minEqual || std::lexicographical_compare( minPtr, minPtr + minLen, &blob[0], &blob[0] + blob.size()); } case IsNot: case NotEqual: { // If min == max == blob, we can skip this. 
      bool blobMaxEqual = blob.size() == maxLen && memcmp(&blob[0], maxPtr, maxLen) == 0;
      bool minMaxEqual = minLen == maxLen && memcmp(minPtr, maxPtr, minLen) == 0;
      return !(blobMaxEqual && minMaxEqual);
    }
    default:
      return true;
  }
}

bool ParquetCursor::currentRowGroupSatisfiesTextFilter(Constraint& constraint, std::shared_ptr<parquet::RowGroupStatistics> _stats) {
  parquet::TypedRowGroupStatistics<parquet::ByteArrayType>* stats =
    (parquet::TypedRowGroupStatistics<parquet::ByteArrayType>*)_stats.get();

  if(!stats->HasMinMax()) {
    return true;
  }

  if(constraint.type != Text) {
    return true;
  }

  const std::string& str = constraint.stringValue;
  const parquet::ByteArray& min = stats->min();
  const parquet::ByteArray& max = stats->max();
  std::string minStr((const char*)min.ptr, min.len);
  std::string maxStr((const char*)max.ptr, max.len);
//  printf("min=%s [%d], max=%s [%d], target=%s\n", minStr.data(), min.len, maxStr.data(), max.len, str.data());

  switch(constraint.op) {
    case Is:
    case Equal:
      return str >= minStr && str <= maxStr;
    case GreaterThanOrEqual:
      return maxStr >= str;
    case GreaterThan:
      return maxStr > str;
    case LessThan:
      return minStr < str;
    case LessThanOrEqual:
      return minStr <= str;
    case IsNot:
    case NotEqual:
      // If min == max == str, we can skip this.
      return !(minStr == maxStr && str == minStr);
    case Like:
    {
      const std::string& likeStringValue = constraint.likeStringValue;
      std::string truncatedMin = minStr.substr(0, likeStringValue.size());
      std::string truncatedMax = maxStr.substr(0, likeStringValue.size());
      return likeStringValue.empty() ||
        (likeStringValue >= truncatedMin && likeStringValue <= truncatedMax);
    }
    default:
      return true;
  }
}

int64_t int96toMsSinceEpoch(const parquet::Int96& rv) {
  __int128 ns = rv.value[0] + ((unsigned long)rv.value[1] << 32);
  __int128 julianDay = rv.value[2];
  __int128 nsSinceEpoch = (julianDay - 2440588);
  nsSinceEpoch *= 86400;
  nsSinceEpoch *= 1000 * 1000 * 1000;
  nsSinceEpoch += ns;
  nsSinceEpoch /= 1000000;
  return nsSinceEpoch;
}

bool ParquetCursor::currentRowGroupSatisfiesIntegerFilter(Constraint& constraint, std::shared_ptr<parquet::RowGroupStatistics> _stats) {
  if(!_stats->HasMinMax()) {
    return true;
  }

  if(constraint.type != Integer) {
    return true;
  }

  int column = constraint.column;

  int64_t min = std::numeric_limits<int64_t>::min();
  int64_t max = std::numeric_limits<int64_t>::max();
  parquet::Type::type pqType = types[column];

  if(pqType == parquet::Type::INT32) {
    parquet::TypedRowGroupStatistics<parquet::Int32Type>* stats =
      (parquet::TypedRowGroupStatistics<parquet::Int32Type>*)_stats.get();
    min = stats->min();
    max = stats->max();
  } else if(pqType == parquet::Type::INT64) {
    parquet::TypedRowGroupStatistics<parquet::Int64Type>* stats =
      (parquet::TypedRowGroupStatistics<parquet::Int64Type>*)_stats.get();
    min = stats->min();
    max = stats->max();
  } else if(pqType == parquet::Type::INT96) {
    parquet::TypedRowGroupStatistics<parquet::Int96Type>* stats =
      (parquet::TypedRowGroupStatistics<parquet::Int96Type>*)_stats.get();
    min = int96toMsSinceEpoch(stats->min());
    max = int96toMsSinceEpoch(stats->max());
  } else if(pqType == parquet::Type::BOOLEAN) {
    parquet::TypedRowGroupStatistics<parquet::BooleanType>* stats =
      (parquet::TypedRowGroupStatistics<parquet::BooleanType>*)_stats.get();
    min = stats->min();
    max = stats->max();
  } else {
    // Should be impossible to get here as we should have forbidden this at
    // CREATE time -- maybe file changed underneath us?
    std::ostringstream ss;
    ss << __FILE__ << ":" << __LINE__ << ": currentRowGroupSatisfiesIntegerFilter called on unsupported type: " <<
      parquet::TypeToString(pqType);
    throw std::invalid_argument(ss.str());
  }

  const int64_t value = constraint.intValue;
//  printf("min=%s [%d], max=%s [%d], target=%s\n", minStr.data(), min.len, maxStr.data(), max.len, str.data());

  switch(constraint.op) {
    case Is:
    case Equal:
      return value >= min && value <= max;
    case GreaterThanOrEqual:
      return max >= value;
    case GreaterThan:
      return max > value;
    case LessThan:
      return min < value;
    case LessThanOrEqual:
      return min <= value;
    case IsNot:
    case NotEqual:
      // If min == max == value, we can skip this.
      return !(min == max && value == min);
    case Like:
    default:
      return true;
  }

  return true;
}

bool ParquetCursor::currentRowGroupSatisfiesDoubleFilter(Constraint& constraint, std::shared_ptr<parquet::RowGroupStatistics> _stats) {
  if(!_stats->HasMinMax()) {
    return true;
  }

  if(constraint.type != Double) {
    return true;
  }

  int column = constraint.column;

  double min = std::numeric_limits<double>::min();
  double max = std::numeric_limits<double>::max();
  parquet::Type::type pqType = types[column];

  if(pqType == parquet::Type::DOUBLE) {
    parquet::TypedRowGroupStatistics<parquet::DoubleType>* stats =
      (parquet::TypedRowGroupStatistics<parquet::DoubleType>*)_stats.get();
    min = stats->min();
    max = stats->max();
  } else if(pqType == parquet::Type::FLOAT) {
    parquet::TypedRowGroupStatistics<parquet::FloatType>* stats =
      (parquet::TypedRowGroupStatistics<parquet::FloatType>*)_stats.get();
    min = stats->min();
    max = stats->max();
  } else {
    // Should be impossible to get here as we should have forbidden this at
    // CREATE time -- maybe file changed underneath us?
    std::ostringstream ss;
    ss << __FILE__ << ":" << __LINE__ << ": currentRowGroupSatisfiesDoubleFilter called on unsupported type: " <<
      parquet::TypeToString(pqType);
    throw std::invalid_argument(ss.str());
  }

  const double value = constraint.doubleValue;
//  printf("min=%s [%d], max=%s [%d], target=%s\n", minStr.data(), min.len, maxStr.data(), max.len, str.data());

  switch(constraint.op) {
    case Is:
    case Equal:
      return value >= min && value <= max;
    case GreaterThanOrEqual:
      return max >= value;
    case GreaterThan:
      return max > value;
    case LessThan:
      return min < value;
    case LessThanOrEqual:
      return min <= value;
    case IsNot:
    case NotEqual:
      // If min == max == value, we can skip this.
      return !(min == max && value == min);
    case Like:
    default:
      return true;
  }

  return true;
}

bool ParquetCursor::currentRowSatisfiesTextFilter(Constraint& constraint) {
  if(constraint.type != Text) {
    return true;
  }

  parquet::ByteArray* ba = getByteArray(constraint.column);

  switch(constraint.op) {
    case Is:
    case Equal:
    {
      const std::vector<unsigned char>& blob = constraint.blobValue;

      if(blob.size() != ba->len)
        return false;

      return 0 == memcmp(&blob[0], ba->ptr, ba->len);
    }
    case IsNot:
    case NotEqual:
    {
      const std::vector<unsigned char>& blob = constraint.blobValue;

      if(blob.size() != ba->len)
        return true;

      return 0 != memcmp(&blob[0], ba->ptr, ba->len);
    }
    case GreaterThan:
    {
      const std::vector<unsigned char>& blob = constraint.blobValue;

      return std::lexicographical_compare(
          &blob[0],
          &blob[0] + blob.size(),
          ba->ptr,
          ba->ptr + ba->len);
    }
    case GreaterThanOrEqual:
    {
      const std::vector<unsigned char>& blob = constraint.blobValue;

      bool equal = blob.size() == ba->len && 0 == memcmp(&blob[0], ba->ptr, ba->len);

      return equal || std::lexicographical_compare(
          &blob[0],
          &blob[0] + blob.size(),
          ba->ptr,
          ba->ptr + ba->len);
    }
    case LessThan:
    {
      const std::vector<unsigned char>& blob = constraint.blobValue;

      return std::lexicographical_compare(
          ba->ptr,
          ba->ptr + ba->len,
          &blob[0],
          &blob[0] + blob.size());
    }
    case LessThanOrEqual:
    {
      const std::vector<unsigned char>& blob = constraint.blobValue;

      bool equal = blob.size() == ba->len && 0 == memcmp(&blob[0], ba->ptr, ba->len);

      return equal || std::lexicographical_compare(
          ba->ptr,
          ba->ptr + ba->len,
          &blob[0],
          &blob[0] + blob.size());
    }
    case Like:
    {
      const std::string& likeStringValue = constraint.likeStringValue;
      if(likeStringValue.size() > ba->len)
        return false;

      size_t len = ba->len;
      if(likeStringValue.size() < len)
        len = likeStringValue.size();
      return 0 == memcmp(&likeStringValue[0], ba->ptr, len);
    }
    default:
      return true;
  }
}

bool ParquetCursor::currentRowSatisfiesIntegerFilter(Constraint& constraint) {
  if(constraint.type != Integer) {
    return true;
  }

  int column = constraint.column;

  // CONSIDER: should we just store int64s everywhere?
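  // For now, every integral physical type (BOOLEAN, INT32, INT64, and
  // INT96-as-milliseconds) is widened to int64_t so the comparisons below can
  // share one code path; column == -1 means the constraint targets rowid.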
  int64_t value = 0;

  if(column == -1) {
    value = rowId;
  } else {
    parquet::Type::type pqType = types[column];

    if(pqType == parquet::Type::INT32 || pqType == parquet::Type::BOOLEAN) {
      value = getInt32(column);
    } else if(pqType == parquet::Type::INT64 || pqType == parquet::Type::INT96) {
      value = getInt64(column);
    } else {
      // Should be impossible to get here
      std::ostringstream ss;
      ss << __FILE__ << ":" << __LINE__ << ": currentRowSatisfiesIntegerFilter called on unsupported type: " <<
        parquet::TypeToString(pqType);
      throw std::invalid_argument(ss.str());
    }
  }

  int64_t constraintValue = constraint.intValue;
  switch(constraint.op) {
    case Is:
    case Equal:
      return constraintValue == value;
    case IsNot:
    case NotEqual:
      return constraintValue != value;
    case GreaterThan:
      return value > constraintValue;
    case GreaterThanOrEqual:
      return value >= constraintValue;
    case LessThan:
      return value < constraintValue;
    case LessThanOrEqual:
      return value <= constraintValue;
    case Like:
    default:
      return true;
  }

  return true;
}

bool ParquetCursor::currentRowSatisfiesDoubleFilter(Constraint& constraint) {
  if(constraint.type != Double) {
    return true;
  }

  int column = constraint.column;
  double value = getDouble(column);
  double constraintValue = constraint.doubleValue;

  switch(constraint.op) {
    case Is:
    case Equal:
      return constraintValue == value;
    case IsNot:
    case NotEqual:
      return constraintValue != value;
    case GreaterThan:
      return value > constraintValue;
    case GreaterThanOrEqual:
      return value >= constraintValue;
    case LessThan:
      return value < constraintValue;
    case LessThanOrEqual:
      return value <= constraintValue;
    case Like:
    default:
      return true;
  }

  return true;
}

// Return true if it is _possible_ that the current
// rowgroup satisfies the constraints. Only return false
// if it definitely does not.
//
// This avoids opening rowgroups that can't return useful
// data, which provides substantial performance benefits.
bool ParquetCursor::currentRowGroupSatisfiesFilter() {
  bool overallRv = true;

  for(unsigned int i = 0; i < constraints.size(); i++) {
    int column = constraints[i].column;
    int op = constraints[i].op;
    bool rv = true;

    if(column == -1) {
      rv = currentRowGroupSatisfiesRowIdFilter(constraints[i]);
    } else {
      std::unique_ptr<parquet::ColumnChunkMetaData> md = rowGroupMetadata->ColumnChunk(column);

      if(md->is_stats_set()) {
        std::shared_ptr<parquet::RowGroupStatistics> stats = md->statistics();

        // SQLite is much looser with types than you might expect if you
        // come from a Postgres background. The constraint '30.0' (that is,
        // a string containing a floating point number) should be treated
        // as equal to a field containing an integer 30.
        //
        // This means that even if the parquet physical type is integer,
        // the constraint type may be a string, so dispatch to the filter
        // fn based on the Parquet type.
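        //
        // Note that each type-specific filter returns true ("cannot exclude")
        // when the constraint's type doesn't match the column, so a mismatched
        // constraint never incorrectly prunes a row group.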
        if(op == IsNull) {
          rv = stats->null_count() > 0;
        } else if(op == IsNotNull) {
          rv = stats->num_values() > 0;
        } else {
          parquet::Type::type pqType = types[column];

          if(pqType == parquet::Type::BYTE_ARRAY && logicalTypes[column] == parquet::LogicalType::UTF8) {
            rv = currentRowGroupSatisfiesTextFilter(constraints[i], stats);
          } else if(pqType == parquet::Type::BYTE_ARRAY) {
            rv = currentRowGroupSatisfiesBlobFilter(constraints[i], stats);
          } else if(pqType == parquet::Type::INT32 ||
                    pqType == parquet::Type::INT64 ||
                    pqType == parquet::Type::INT96 ||
                    pqType == parquet::Type::BOOLEAN) {
            rv = currentRowGroupSatisfiesIntegerFilter(constraints[i], stats);
          } else if(pqType == parquet::Type::FLOAT || pqType == parquet::Type::DOUBLE) {
            rv = currentRowGroupSatisfiesDoubleFilter(constraints[i], stats);
          }
        }
      }
    }

    // AND it with the existing actual value, which may have come from a previous run.
    rv = rv && constraints[i].bitmap.getActualMembership(rowGroupId);
    if(!rv) {
      constraints[i].bitmap.setEstimatedMembership(rowGroupId, rv);
      constraints[i].bitmap.setActualMembership(rowGroupId, rv);
    }
    overallRv = overallRv && rv;
  }

//  printf("rowGroup %d %s\n", rowGroupId, overallRv ? "may satisfy" : "does not satisfy");
  return overallRv;
}

bool ParquetCursor::nextRowGroup() {
start:
  // Ensure that rowId points at the start of this rowGroup (eg, in the case where
  // we skipped an entire row group).
  rowId = rowGroupStartRowId + rowGroupSize;

  if((rowGroupId + 1) >= numRowGroups) {
    return false;
  }

  while(table->getNumColumns() >= scanners.size()) {
    scanners.push_back(std::shared_ptr<parquet::Scanner>());
    // If it doesn't exist, it's the rowId as of the last nextRowGroup call
    colRows.push_back(rowGroupStartRowId);
    colNulls.push_back(false);
    colIntValues.push_back(0);
    colDoubleValues.push_back(0);
    colByteArrayValues.push_back(parquet::ByteArray());
  }

  rowGroupStartRowId = rowId;
  rowGroupId++;
  rowGroupMetadata = reader->metadata()->RowGroup(rowGroupId);
  rowGroupSize = rowsLeftInRowGroup = rowGroupMetadata->num_rows();
  rowGroup = reader->RowGroup(rowGroupId);
  for(unsigned int i = 0; i < scanners.size(); i++)
    scanners[i] = NULL;

  while(types.size() < (unsigned int)rowGroupMetadata->num_columns()) {
    types.push_back(rowGroupMetadata->schema()->Column(0)->physical_type());
  }

  while(logicalTypes.size() < (unsigned int)rowGroupMetadata->num_columns()) {
    logicalTypes.push_back(rowGroupMetadata->schema()->Column(0)->logical_type());
  }

  for(unsigned int i = 0; i < (unsigned int)rowGroupMetadata->num_columns(); i++) {
    types[i] = rowGroupMetadata->schema()->Column(i)->physical_type();
    logicalTypes[i] = rowGroupMetadata->schema()->Column(i)->logical_type();
  }

  for(unsigned int i = 0; i < colRows.size(); i++) {
    colRows[i] = rowId;
  }

  // Increment rowId so currentRowGroupSatisfiesRowIdFilter can access it;
  // it'll get decremented by our caller
  rowId++;

  // We're going to scan this row group; reset the expectation of discovering
  // a row
  for(unsigned int i = 0; i < constraints.size(); i++) {
    if(rowGroupId > 0 && constraints[i].rowGroupId == rowGroupId - 1) {
      if(constraints[i].valid || constraints[i].hadRows) {
        constraints[i].bitmap.setActualMembership(rowGroupId - 1, constraints[i].hadRows);
      }
    }
    constraints[i].hadRows = false;
    constraints[i].valid = true;
  }

  if(!currentRowGroupSatisfiesFilter())
    goto start;

  for(unsigned int i = 0; i < constraints.size(); i++) {
    constraints[i].rowGroupId = rowGroupId;
  }
  return true;
}

// Return true if it is _possible_ that the current
// row satisfies the constraints. Only return false
// if it definitely does not.
//
// This avoids pointless transitions between the SQLite VM
// and the extension, which can add up on a dataset of tens
// of millions of rows.
bool ParquetCursor::currentRowSatisfiesFilter() {
  for(unsigned int i = 0; i < constraints.size(); i++) {
    bool rv = true;
    int column = constraints[i].column;
    ensureColumn(column);
    int op = constraints[i].op;

    if(op == IsNull) {
      rv = isNull(column);
    } else if(op == IsNotNull) {
      rv = !isNull(column);
    } else {
      parquet::Type::type pqType = types[column];

      if(pqType == parquet::Type::BYTE_ARRAY && logicalTypes[column] == parquet::LogicalType::UTF8) {
        rv = currentRowSatisfiesTextFilter(constraints[i]);
      } else if(pqType == parquet::Type::INT32 ||
                pqType == parquet::Type::INT64 ||
                pqType == parquet::Type::INT96 ||
                pqType == parquet::Type::BOOLEAN) {
        rv = currentRowSatisfiesIntegerFilter(constraints[i]);
      } else if(pqType == parquet::Type::FLOAT || pqType == parquet::Type::DOUBLE) {
        rv = currentRowSatisfiesDoubleFilter(constraints[i]);
      }
    }

    // hadRows defaults to false, so only set it if this constraint matched;
    // ideally we'd short-circuit if we'd already set this group as visited.
    if(rv) {
      constraints[i].hadRows = true;
    } else {
      // When we short circuit, mark the remaining constraints as not evaluated
      // to avoid persisting incorrect data.
      for(unsigned int j = i + 1; j < constraints.size(); j++) {
        constraints[j].valid = false;
      }
      return false;
    }
  }
  return true;
}

void ParquetCursor::next() {
start:
  if(rowsLeftInRowGroup == 0) {
    if(!nextRowGroup()) {
      // put rowId over the edge so eof returns true
      rowId = numRows + 1;
      return;
    } else {
      // After a successful nextRowGroup, rowId is pointing at the current row. Make it
      // point before so the rest of the logic works out.
      rowId--;
    }
  }
  rowsLeftInRowGroup--;
  rowId++;
  if(!currentRowSatisfiesFilter())
    goto start;
}

int ParquetCursor::getRowId() {
  return rowId;
}

bool ParquetCursor::eof() {
  return rowId > numRows;
}

void ParquetCursor::ensureColumn(int col) {
  // -1 signals rowid, which is trivially available
  if(col == -1)
    return;

  // need to ensure a scanner exists (and skip the # of rows in the rowgroup)
  if(scanners[col].get() == NULL) {
    std::shared_ptr<parquet::ColumnReader> colReader = rowGroup->Column(col);
    scanners[col] = parquet::Scanner::Make(colReader);
  }

  // Actually fetch a value, stash data in colRows, colNulls, colValues
  if(colRows[col] != rowId) {
    // We may need to skip some records, eg, a query like
    // SELECT a WHERE b = 10
    // may have read b, but skipped a until b matches the predicate.
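    //
    // Discard values until this column's scanner is positioned one row before
    // rowId; the skipped values are thrown away, we only advance the scanner.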
    bool wasNull = false;
    while(colRows[col] + 1 < rowId) {
      switch(types[col]) {
        case parquet::Type::INT32:
        {
          parquet::Int32Scanner* s = (parquet::Int32Scanner*)scanners[col].get();
          int rv = 0;
          s->NextValue(&rv, &wasNull);
          break;
        }
        case parquet::Type::FLOAT:
        {
          parquet::FloatScanner* s = (parquet::FloatScanner*)scanners[col].get();
          float rv = 0;
          s->NextValue(&rv, &wasNull);
          break;
        }
        case parquet::Type::DOUBLE:
        {
          parquet::DoubleScanner* s = (parquet::DoubleScanner*)scanners[col].get();
          double rv = 0;
          s->NextValue(&rv, &wasNull);
          break;
        }
        case parquet::Type::BYTE_ARRAY:
        {
          parquet::ByteArrayScanner* s = (parquet::ByteArrayScanner*)scanners[col].get();
          parquet::ByteArray ba;
          s->NextValue(&ba, &wasNull);
          break;
        }
        case parquet::Type::INT96:
        {
          parquet::Int96Scanner* s = (parquet::Int96Scanner*)scanners[col].get();
          parquet::Int96 rv;
          s->NextValue(&rv, &wasNull);
          break;
        }
        case parquet::Type::INT64:
        {
          parquet::Int64Scanner* s = (parquet::Int64Scanner*)scanners[col].get();
          long rv = 0;
          s->NextValue(&rv, &wasNull);
          break;
        }
        case parquet::Type::BOOLEAN:
        {
          parquet::BoolScanner* s = (parquet::BoolScanner*)scanners[col].get();
          bool rv = false;
          s->NextValue(&rv, &wasNull);
          break;
        }
        case parquet::Type::FIXED_LEN_BYTE_ARRAY:
        {
          parquet::FixedLenByteArrayScanner* s = (parquet::FixedLenByteArrayScanner*)scanners[col].get();
          parquet::FixedLenByteArray flba;
          s->NextValue(&flba, &wasNull);
          break;
        }
        default:
          // Should be impossible to get here as we should have forbidden this at
          // CREATE time -- maybe file changed underneath us?
          std::ostringstream ss;
          ss << __FILE__ << ":" << __LINE__ << ": column " << col <<
            " has unsupported type: " << parquet::TypeToString(types[col]);
          throw std::invalid_argument(ss.str());
          break;
      }

      colRows[col]++;
    }

    colRows[col] = rowId;
    wasNull = false;

    switch(types[col]) {
      case parquet::Type::INT32:
      {
        parquet::Int32Scanner* s = (parquet::Int32Scanner*)scanners[col].get();
        int rv = 0;
        if(s->NextValue(&rv, &wasNull)) {
          colIntValues[col] = rv;
        } else {
          throw std::invalid_argument("unexpectedly lacking a next value");
        }
        break;
      }
      case parquet::Type::FLOAT:
      {
        parquet::FloatScanner* s = (parquet::FloatScanner*)scanners[col].get();
        float rv = 0;
        if(s->NextValue(&rv, &wasNull)) {
          colDoubleValues[col] = rv;
        } else {
          throw std::invalid_argument("unexpectedly lacking a next value");
        }
        break;
      }
      case parquet::Type::DOUBLE:
      {
        parquet::DoubleScanner* s = (parquet::DoubleScanner*)scanners[col].get();
        double rv = 0;
        if(s->NextValue(&rv, &wasNull)) {
          colDoubleValues[col] = rv;
        } else {
          throw std::invalid_argument("unexpectedly lacking a next value");
        }
        break;
      }
      case parquet::Type::BYTE_ARRAY:
      {
        parquet::ByteArrayScanner* s = (parquet::ByteArrayScanner*)scanners[col].get();
        if(!s->NextValue(&colByteArrayValues[col], &wasNull)) {
          throw std::invalid_argument("unexpectedly lacking a next value");
        }
        break;
      }
      case parquet::Type::INT96:
      {
        // INT96 tracks a date with nanosecond precision, convert to ms since epoch.
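        // The conversion treats Julian day 2440588 as the Unix epoch (1970-01-01),
        // so e.g. an INT96 of {0, 0, 2440589} -- midnight, one day later -- comes
        // out as 86400000 ms since the epoch.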
        // See https://github.com/apache/parquet-format/pull/49 for more.
        //
        // First 8 bytes: nanoseconds into the day
        // Last 4 bytes: Julian day
        // To get nanoseconds since the epoch:
        // (julian_day - 2440588) * (86400 * 1000 * 1000 * 1000) + nanoseconds
        parquet::Int96Scanner* s = (parquet::Int96Scanner*)scanners[col].get();
        parquet::Int96 rv {0, 0, 0};
        if(s->NextValue(&rv, &wasNull)) {
          colIntValues[col] = int96toMsSinceEpoch(rv);
        } else {
          throw std::invalid_argument("unexpectedly lacking a next value");
        }
        break;
      }
      case parquet::Type::INT64:
      {
        parquet::Int64Scanner* s = (parquet::Int64Scanner*)scanners[col].get();
        long rv = 0;
        if(s->NextValue(&rv, &wasNull)) {
          colIntValues[col] = rv;
        } else {
          throw std::invalid_argument("unexpectedly lacking a next value");
        }
        break;
      }
      case parquet::Type::BOOLEAN:
      {
        parquet::BoolScanner* s = (parquet::BoolScanner*)scanners[col].get();
        bool rv = false;
        if(s->NextValue(&rv, &wasNull)) {
          colIntValues[col] = rv ? 1 : 0;
        } else {
          throw std::invalid_argument("unexpectedly lacking a next value");
        }
        break;
      }
      case parquet::Type::FIXED_LEN_BYTE_ARRAY:
      {
        parquet::FixedLenByteArrayScanner* s = (parquet::FixedLenByteArrayScanner*)scanners[col].get();
        parquet::FixedLenByteArray flba;
        if(s->NextValue(&flba, &wasNull)) {
          colByteArrayValues[col].ptr = flba.ptr;
          // TODO: cache this
          colByteArrayValues[col].len = rowGroupMetadata->schema()->Column(col)->type_length();
        } else {
          throw std::invalid_argument("unexpectedly lacking a next value");
        }
        break;
      }
      default:
        // Should be impossible to get here as we should have forbidden this at
        // CREATE time -- maybe file changed underneath us?
        std::ostringstream ss;
        ss << __FILE__ << ":" << __LINE__ << ": column " << col <<
          " has unsupported type: " << parquet::TypeToString(types[col]);
        throw std::invalid_argument(ss.str());
        break;
    }

    colNulls[col] = wasNull;
  }
}

bool ParquetCursor::isNull(int col) {
  // -1 is rowid, which is trivially non null
  if(col == -1)
    return false;

  return colNulls[col];
}

int ParquetCursor::getInt32(int col) {
  return colIntValues[col];
}

long ParquetCursor::getInt64(int col) {
  return colIntValues[col];
}

double ParquetCursor::getDouble(int col) {
  return colDoubleValues[col];
}

parquet::ByteArray* ParquetCursor::getByteArray(int col) {
  return &colByteArrayValues[col];
}

parquet::Type::type ParquetCursor::getPhysicalType(int col) {
  return types[col];
}

parquet::LogicalType::type ParquetCursor::getLogicalType(int col) {
  return logicalTypes[col];
}

void ParquetCursor::close() {
  if(reader != NULL) {
    reader->Close();
  }
}

void ParquetCursor::reset(std::vector<Constraint> constraints) {
  close();
  this->constraints = constraints;
  rowId = 0;
  // TODO: consider having a long lived handle in ParquetTable that can be borrowed
  // without incurring the cost of opening the file from scratch twice
  reader = parquet::ParquetFileReader::OpenFile(
      table->getFile().data(),
      true,
      parquet::default_reader_properties(),
      table->getMetadata());

  rowGroupId = -1;
  rowGroupSize = 0;
  rowGroupStartRowId = 0;
  // TODO: handle the case where rowgroups have disjoint schemas?
  // TODO: or at least, fail fast if detected
  rowsLeftInRowGroup = 0;
  numRows = reader->metadata()->num_rows();
  numRowGroups = reader->metadata()->num_row_groups();
}

ParquetTable* ParquetCursor::getTable() const {
  return table;
}

unsigned int ParquetCursor::getNumRowGroups() const {
  return numRowGroups;
}

unsigned int ParquetCursor::getNumConstraints() const {
  return constraints.size();
}

const Constraint& ParquetCursor::getConstraint(unsigned int i) const {
  return constraints[i];
}