From 7edb5e472febe5efb360b39ca09976730efa3bbf Mon Sep 17 00:00:00 2001 From: Colin Dellow Date: Sun, 4 Mar 2018 17:20:28 -0500 Subject: [PATCH] Support BLOBs --- README.md | 2 +- parquet/parquet.cc | 11 ++++++++++- parquet/parquet_cursor.cc | 24 ++++++++++++++++++++--- parquet/parquet_cursor.h | 2 ++ parquet/parquet_table.cc | 4 ++++ tests/test-all | 8 ++++++++ tests/test-supported | 41 +++++++++++++++++++++++++++++++++++++++ tests/test-unsupported | 2 +- 8 files changed, 88 insertions(+), 6 deletions(-) create mode 100755 tests/test-all create mode 100755 tests/test-supported diff --git a/README.md b/README.md index a35c1cc..50b0c32 100644 --- a/README.md +++ b/README.md @@ -46,9 +46,9 @@ These types are supported: * BOOLEAN * FLOAT * DOUBLE +* Variable- and fixed-length byte arrays These are not supported: * UINT8/UINT16/UINT32/UINT64 -* Fixed length byte arrays, including JSON and BSON subtypes * DECIMAL diff --git a/parquet/parquet.cc b/parquet/parquet.cc index 64a341f..4b2eea9 100644 --- a/parquet/parquet.cc +++ b/parquet/parquet.cc @@ -188,7 +188,11 @@ static int parquetColumn( case parquet::Type::BYTE_ARRAY: { parquet::ByteArray* rv = cursor->getByteArray(col); - sqlite3_result_text(ctx, (const char*)rv->ptr, rv->len, SQLITE_TRANSIENT); + if(cursor->getLogicalType(col) == parquet::LogicalType::UTF8) { + sqlite3_result_text(ctx, (const char*)rv->ptr, rv->len, SQLITE_TRANSIENT); + } else { + sqlite3_result_blob(ctx, (void*)rv->ptr, rv->len, SQLITE_TRANSIENT); + } break; } case parquet::Type::INT96: @@ -201,6 +205,11 @@ static int parquetColumn( break; } case parquet::Type::FIXED_LEN_BYTE_ARRAY: + { + parquet::ByteArray* rv = cursor->getByteArray(col); + sqlite3_result_blob(ctx, (void*)rv->ptr, rv->len, SQLITE_TRANSIENT); + break; + } default: // Should be impossible to get here as we should have forbidden this at // CREATE time -- maybe file changed underneath us? diff --git a/parquet/parquet_cursor.cc b/parquet/parquet_cursor.cc index 68fc666..205e4d5 100644 --- a/parquet/parquet_cursor.cc +++ b/parquet/parquet_cursor.cc @@ -32,8 +32,13 @@ void ParquetCursor::nextRowGroup() { types.push_back(rowGroupMetadata->schema()->Column(0)->physical_type()); } + while(logicalTypes.size() < (unsigned int)rowGroupMetadata->num_columns()) { + logicalTypes.push_back(rowGroupMetadata->schema()->Column(0)->logical_type()); + } + for(unsigned int i = 0; i < (unsigned int)rowGroupMetadata->num_columns(); i++) { types[i] = rowGroupMetadata->schema()->Column(i)->physical_type(); + logicalTypes[i] = rowGroupMetadata->schema()->Column(i)->logical_type(); } } @@ -169,6 +174,18 @@ void ParquetCursor::ensureColumn(int col) { break; } case parquet::Type::FIXED_LEN_BYTE_ARRAY: + { + parquet::FixedLenByteArrayScanner* s = (parquet::FixedLenByteArrayScanner*)scanners[col].get(); + parquet::FixedLenByteArray flba; + if(s->NextValue(&flba, &wasNull)) { + colByteArrayValues[col].ptr = flba.ptr; + // TODO: cache this + colByteArrayValues[col].len = rowGroupMetadata->schema()->Column(col)->type_length(); + } else { + throw std::invalid_argument("unexpectedly lacking a next value"); + } + break; + } default: // Should be impossible to get here as we should have forbidden this at // CREATE time -- maybe file changed underneath us? @@ -203,9 +220,10 @@ parquet::ByteArray* ParquetCursor::getByteArray(int col) { return &colByteArrayValues[col]; } - - parquet::Type::type ParquetCursor::getPhysicalType(int col) { -// return rowGroupMetadata->schema()->Column(col)->physical_type(); return types[col]; } + +parquet::LogicalType::type ParquetCursor::getLogicalType(int col) { + return logicalTypes[col]; +} diff --git a/parquet/parquet_cursor.h b/parquet/parquet_cursor.h index 71dd9b8..1497603 100644 --- a/parquet/parquet_cursor.h +++ b/parquet/parquet_cursor.h @@ -12,6 +12,7 @@ class ParquetCursor { std::shared_ptr rowGroup; std::vector> scanners; std::vector types; + std::vector logicalTypes; std::vector colRows; std::vector colNulls; @@ -36,6 +37,7 @@ public: void ensureColumn(int col); bool isNull(int col); parquet::Type::type getPhysicalType(int col); + parquet::LogicalType::type getLogicalType(int col); int getInt32(int col); long getInt64(int col); diff --git a/parquet/parquet_table.cc b/parquet/parquet_table.cc index eb76dd9..162a252 100644 --- a/parquet/parquet_table.cc +++ b/parquet/parquet_table.cc @@ -82,9 +82,13 @@ std::string ParquetTable::CreateStatement() { case parquet::Type::BYTE_ARRAY: if(logical == parquet::LogicalType::UTF8) { type = "TEXT"; + } else { + type = "BLOB"; } break; case parquet::Type::FIXED_LEN_BYTE_ARRAY: + type = "BLOB"; + break; default: break; } diff --git a/tests/test-all b/tests/test-all new file mode 100755 index 0000000..5e50d4f --- /dev/null +++ b/tests/test-all @@ -0,0 +1,8 @@ +#!/bin/bash +set -euo pipefail + +here=$(dirname "${BASH_SOURCE[0]}") + +set -x +"$here"/test-unsupported +"$here"/test-supported diff --git a/tests/test-supported b/tests/test-supported new file mode 100755 index 0000000..759fda0 --- /dev/null +++ b/tests/test-supported @@ -0,0 +1,41 @@ +#!/bin/bash +set -euo pipefail + +# Verify that all the non-unsupported.*parquet files can be loaded and 'SELECT * FROM x LIMIT 1'ed +# without segfaulting. + +load_supported() { + file=${1:?must provide file to load} + basename=$(basename "$file") + cat < /dev/null 2> testcase-err.txt; then + echo "...FAILED; check testcase-{out,err}.txt" >&2 + exit 1 + fi + # We expect the 'SELECT 123' command to have been run + if ! grep -q 123 testcase-out.txt; then + echo "...FAILED; check testcase-{out,err}.txt" >&2 + exit 1 + fi + done < <(echo "$supported_files") +} + +main "$@" diff --git a/tests/test-unsupported b/tests/test-unsupported index 1e538df..e5734ef 100755 --- a/tests/test-unsupported +++ b/tests/test-unsupported @@ -28,7 +28,7 @@ main() { "$root"/sqlite/sqlite3 -init <(load_unsupported "$unsupported") < /dev/null > /dev/null 2> testcase-err.txt # We expect the 'SELECT 123' command to NOT have been run if grep -q 123 testcase-out.txt; then - echo "...FAILED" >&2 + echo "...FAILED; expected an error message. Check testcase-{out,err}.txt" >&2 exit 1 fi done < <(echo "$unsupported_files")