From 9c22fd1f57ed5babdb807e94733ccd64cacf4c7f Mon Sep 17 00:00:00 2001 From: Colin Dellow Date: Fri, 16 Mar 2018 16:07:41 -0400 Subject: [PATCH] Row group filters for strings, int32/64/96, bools --- parquet/parquet_cursor.cc | 122 ++++++++++++++++++++++++++++++++------ 1 file changed, 104 insertions(+), 18 deletions(-) diff --git a/parquet/parquet_cursor.cc b/parquet/parquet_cursor.cc index 03b6e05..11ab67e 100644 --- a/parquet/parquet_cursor.cc +++ b/parquet/parquet_cursor.cc @@ -52,20 +52,111 @@ bool ParquetCursor::currentRowGroupSatisfiesTextFilter(Constraint& constraint, s case Is: case Equal: return str >= minStr && str <= maxStr; - case GreaterThan: case GreaterThanOrEqual: + return maxStr >= str; + case GreaterThan: + return maxStr > str; case LessThan: + return minStr < str; case LessThanOrEqual: + return minStr <= str; case IsNot: case NotEqual: + // If min == max == str, we can skip this. + return !(minStr == maxStr && str == minStr); case Like: - + // TODO: We could do something here where we filter based on the leading characters + // of the target. For now, do nothing. default: return true; } } -bool ParquetCursor::currentRowGroupSatisfiesIntegerFilter(Constraint& constraint, std::shared_ptr stats) { +int64_t int96toMsSinceEpoch(const parquet::Int96& rv) { + __int128 ns = rv.value[0] + ((unsigned long)rv.value[1] << 32); + __int128 julianDay = rv.value[2]; + __int128 nsSinceEpoch = (julianDay - 2440588); + nsSinceEpoch *= 86400; + nsSinceEpoch *= 1000 * 1000 * 1000; + nsSinceEpoch += ns; + nsSinceEpoch /= 1000000; + return nsSinceEpoch; +} + +bool ParquetCursor::currentRowGroupSatisfiesIntegerFilter(Constraint& constraint, std::shared_ptr _stats) { + if(!_stats->HasMinMax()) { + return true; + } + + if(constraint.type != Integer) { + return true; + } + + int column = constraint.column; + + int64_t min = std::numeric_limits::min(); + int64_t max = std::numeric_limits::max(); + parquet::Type::type pqType = types[column]; + + if(pqType == parquet::Type::INT32) { + parquet::TypedRowGroupStatistics>* stats = + (parquet::TypedRowGroupStatistics>*)_stats.get(); + + min = stats->min(); + max = stats->max(); + } else if(pqType == parquet::Type::INT64) { + parquet::TypedRowGroupStatistics>* stats = + (parquet::TypedRowGroupStatistics>*)_stats.get(); + + min = stats->min(); + max = stats->max(); + } else if(pqType == parquet::Type::INT96) { + parquet::TypedRowGroupStatistics>* stats = + (parquet::TypedRowGroupStatistics>*)_stats.get(); + + min = int96toMsSinceEpoch(stats->min()); + max = int96toMsSinceEpoch(stats->max()); + + } else if(pqType == parquet::Type::BOOLEAN) { + parquet::TypedRowGroupStatistics>* stats = + (parquet::TypedRowGroupStatistics>*)_stats.get(); + + min = stats->min(); + max = stats->max(); + + } else { + // Should be impossible to get here as we should have forbidden this at + // CREATE time -- maybe file changed underneath us? + std::ostringstream ss; + ss << __FILE__ << ":" << __LINE__ << ": currentRowGroupSatisfiesIntegerFilter called on unsupported type: " << + parquet::TypeToString(pqType); + throw std::invalid_argument(ss.str()); + } + + const int64_t value = constraint.intValue; +// printf("min=%s [%d], max=%s [%d], target=%s\n", minStr.data(), min.len, maxStr.data(), max.len, str.data()); + + switch(constraint.op) { + case Is: + case Equal: + return value >= min && value <= max; + case GreaterThanOrEqual: + return max >= value; + case GreaterThan: + return max > value; + case LessThan: + return min < value; + case LessThanOrEqual: + return min <= value; + case IsNot: + case NotEqual: + // If min == max == str, we can skip this. + return !(min == max && value == min); + case Like: + default: + return true; + } + return true; } @@ -88,12 +179,17 @@ bool ParquetCursor::currentRowSatisfiesTextFilter(Constraint& constraint) { return false; return 0 == memcmp(&blob[0], ba->ptr, ba->len); + case IsNot: + case NotEqual: + if(blob.size() != ba->len) + return true; + + return 0 != memcmp(&blob[0], ba->ptr, ba->len); case GreaterThan: case GreaterThanOrEqual: case LessThan: case LessThanOrEqual: - case IsNot: - case NotEqual: + case Like: default: @@ -270,6 +366,7 @@ start: rowId++; if(!currentRowSatisfiesFilter()) goto start; + } int ParquetCursor::getRowId() { @@ -433,20 +530,9 @@ void ParquetCursor::ensureColumn(int col) { // To get nanoseconds since the epoch: // (julian_day - 2440588) * (86400 * 1000 * 1000 * 1000) + nanoseconds parquet::Int96Scanner* s = (parquet::Int96Scanner*)scanners[col].get(); - parquet::Int96 rv; - rv.value[0] = 0; - rv.value[1] = 0; - rv.value[2] = 0; + parquet::Int96 rv {0, 0, 0}; if(s->NextValue(&rv, &wasNull)) { - __int128 ns = rv.value[0] + ((unsigned long)rv.value[1] << 32); - __int128 julianDay = rv.value[2]; - __int128 nsSinceEpoch = (julianDay - 2440588); - nsSinceEpoch *= 86400; - nsSinceEpoch *= 1000 * 1000 * 1000; - nsSinceEpoch += ns; - nsSinceEpoch /= 1000000; - - colIntValues[col] = nsSinceEpoch; + colIntValues[col] = int96toMsSinceEpoch(rv); } else { throw std::invalid_argument("unexpectedly lacking a next value"); }