From 1f3ffce560ff30d778c6ee144ba6041f99e33e37 Mon Sep 17 00:00:00 2001 From: Colin Dellow Date: Sun, 18 Mar 2018 15:03:08 -0400 Subject: [PATCH] Row group filtering for BYTE_ARRAY --- parquet/parquet.cc | 4 -- parquet/parquet_cursor.cc | 112 +++++++++++++++++++++++++++++++++++++- parquet/parquet_cursor.h | 1 + 3 files changed, 112 insertions(+), 5 deletions(-) diff --git a/parquet/parquet.cc b/parquet/parquet.cc index 4d32e27..8459a0f 100644 --- a/parquet/parquet.cc +++ b/parquet/parquet.cc @@ -465,9 +465,6 @@ static int parquetBestIndex( pIdxInfo->aConstraintUsage[i].argvIndex = j; } } - - // TODO: consider setting this when querying by rowid? Unclear if that's implied. - // pIdxInfo->idxFlags = SQLITE_INDEX_SCAN_UNIQUE; } printf("idx %d has cost %f\n", pIdxInfo->idxNum, pIdxInfo->estimatedCost); @@ -480,7 +477,6 @@ static int parquetBestIndex( pIdxInfo->idxStr = (char*)dupe; pIdxInfo->needToFreeIdxStr = 1; - // TODO: populate argvIndex. memset(dupe, 0, dupeSize); memcpy(dupe, pIdxInfo, sizeof(sqlite3_index_info)); diff --git a/parquet/parquet_cursor.cc b/parquet/parquet_cursor.cc index 52ee438..361bcfe 100644 --- a/parquet/parquet_cursor.cc +++ b/parquet/parquet_cursor.cc @@ -32,6 +32,114 @@ bool ParquetCursor::currentRowGroupSatisfiesRowIdFilter(Constraint& constraint) } } +bool ParquetCursor::currentRowGroupSatisfiesBlobFilter(Constraint& constraint, std::shared_ptr _stats) { + if(!_stats->HasMinMax()) { + return true; + } + + if(constraint.type != Blob) { + return true; + } + + const unsigned char* minPtr = NULL; + const unsigned char* maxPtr = NULL; + size_t minLen = 0; + size_t maxLen = 0; + + parquet::Type::type pqType = types[constraint.column]; + + if(pqType == parquet::Type::BYTE_ARRAY) { + parquet::TypedRowGroupStatistics>* stats = + (parquet::TypedRowGroupStatistics>*)_stats.get(); + + minPtr = stats->min().ptr; + minLen = stats->min().len; + maxPtr = stats->max().ptr; + maxLen = stats->max().len; + } else if(pqType == parquet::Type::FIXED_LEN_BYTE_ARRAY) { + // It seems like parquet-cpp doesn't actually produce stats for FLBA yet, so + // rather than have untested code here, we'll just short circuit. + // + // Once I can get my hands on such a file, it should be easy to add support. + return true; + } else { + // Should be impossible to get here + std::ostringstream ss; + ss << __FILE__ << ":" << __LINE__ << ": currentRowGroupSatisfiesBlobFilter called on unsupported type: " << + parquet::TypeToString(pqType); + throw std::invalid_argument(ss.str()); + } + + printf("\n\nBLOB\n\n"); + + const std::vector& blob = constraint.blobValue; + + switch(constraint.op) { + case Is: + case Equal: + { + bool minEqual = blob.size() == minLen && memcmp(&blob[0], minPtr, minLen) == 0; + bool maxEqual = blob.size() == maxLen && memcmp(&blob[0], maxPtr, maxLen) == 0; + + bool blobGteMinBlob = std::lexicographical_compare( + minPtr, + minPtr + minLen, + &blob[0], + &blob[0] + blob.size()); + + bool blobLtMaxBlob = std::lexicographical_compare( + &blob[0], + &blob[0] + blob.size(), + maxPtr, + maxPtr + maxLen); + + + return (minEqual || blobGteMinBlob) && (maxEqual || blobLtMaxBlob); + } + case GreaterThanOrEqual: + { + bool maxEqual = blob.size() == maxLen && memcmp(&blob[0], maxPtr, maxLen) == 0; + + return maxEqual || std::lexicographical_compare( + &blob[0], + &blob[0] + blob.size(), + maxPtr, + maxPtr + maxLen); + } + case GreaterThan: + return std::lexicographical_compare( + &blob[0], + &blob[0] + blob.size(), + maxPtr, + maxPtr + maxLen); + case LessThan: + return std::lexicographical_compare( + minPtr, + minPtr + minLen, + &blob[0], + &blob[0] + blob.size()); + case LessThanOrEqual: + { + bool minEqual = blob.size() == minLen && memcmp(&blob[0], minPtr, minLen) == 0; + return minEqual || std::lexicographical_compare( + minPtr, + minPtr + minLen, + &blob[0], + &blob[0] + blob.size()); + } + case IsNot: + case NotEqual: + { + // If min == max == blob, we can skip this. + bool blobMaxEqual = blob.size() == maxLen && memcmp(&blob[0], maxPtr, maxLen) == 0; + bool minMaxEqual = minLen == maxLen && memcmp(minPtr, maxPtr, minLen) == 0; + return !(blobMaxEqual && minMaxEqual); + } + default: + return true; + } +} + bool ParquetCursor::currentRowGroupSatisfiesTextFilter(Constraint& constraint, std::shared_ptr _stats) { parquet::TypedRowGroupStatistics>* stats = (parquet::TypedRowGroupStatistics>*)_stats.get(); @@ -442,8 +550,10 @@ bool ParquetCursor::currentRowGroupSatisfiesFilter() { } else { parquet::Type::type pqType = types[column]; - if(pqType == parquet::Type::BYTE_ARRAY) { + if(pqType == parquet::Type::BYTE_ARRAY && logicalTypes[column] == parquet::LogicalType::UTF8) { rv = currentRowGroupSatisfiesTextFilter(constraints[i], stats); + } else if(pqType == parquet::Type::BYTE_ARRAY) { + rv = currentRowGroupSatisfiesBlobFilter(constraints[i], stats); } else if(pqType == parquet::Type::INT32 || pqType == parquet::Type::INT64 || pqType == parquet::Type::INT96 || diff --git a/parquet/parquet_cursor.h b/parquet/parquet_cursor.h index af553fe..a237ecf 100644 --- a/parquet/parquet_cursor.h +++ b/parquet/parquet_cursor.h @@ -37,6 +37,7 @@ class ParquetCursor { bool currentRowGroupSatisfiesFilter(); bool currentRowGroupSatisfiesRowIdFilter(Constraint& constraint); bool currentRowGroupSatisfiesTextFilter(Constraint& constraint, std::shared_ptr stats); + bool currentRowGroupSatisfiesBlobFilter(Constraint& constraint, std::shared_ptr stats); bool currentRowGroupSatisfiesIntegerFilter(Constraint& constraint, std::shared_ptr stats); bool currentRowGroupSatisfiesDoubleFilter(Constraint& constraint, std::shared_ptr stats);