Support BLOBs

This commit is contained in:
Colin Dellow 2018-03-04 17:20:28 -05:00
parent f3e78408bf
commit 7edb5e472f
8 changed files with 88 additions and 6 deletions

View File

@ -46,9 +46,9 @@ These types are supported:
* BOOLEAN * BOOLEAN
* FLOAT * FLOAT
* DOUBLE * DOUBLE
* Variable- and fixed-length byte arrays
These are not supported: These are not supported:
* UINT8/UINT16/UINT32/UINT64 * UINT8/UINT16/UINT32/UINT64
* Fixed length byte arrays, including JSON and BSON subtypes
* DECIMAL * DECIMAL

View File

@ -188,7 +188,11 @@ static int parquetColumn(
case parquet::Type::BYTE_ARRAY: case parquet::Type::BYTE_ARRAY:
{ {
parquet::ByteArray* rv = cursor->getByteArray(col); parquet::ByteArray* rv = cursor->getByteArray(col);
if(cursor->getLogicalType(col) == parquet::LogicalType::UTF8) {
sqlite3_result_text(ctx, (const char*)rv->ptr, rv->len, SQLITE_TRANSIENT); sqlite3_result_text(ctx, (const char*)rv->ptr, rv->len, SQLITE_TRANSIENT);
} else {
sqlite3_result_blob(ctx, (void*)rv->ptr, rv->len, SQLITE_TRANSIENT);
}
break; break;
} }
case parquet::Type::INT96: case parquet::Type::INT96:
@ -201,6 +205,11 @@ static int parquetColumn(
break; break;
} }
case parquet::Type::FIXED_LEN_BYTE_ARRAY: case parquet::Type::FIXED_LEN_BYTE_ARRAY:
{
parquet::ByteArray* rv = cursor->getByteArray(col);
sqlite3_result_blob(ctx, (void*)rv->ptr, rv->len, SQLITE_TRANSIENT);
break;
}
default: default:
// Should be impossible to get here as we should have forbidden this at // Should be impossible to get here as we should have forbidden this at
// CREATE time -- maybe file changed underneath us? // CREATE time -- maybe file changed underneath us?

View File

@ -32,8 +32,13 @@ void ParquetCursor::nextRowGroup() {
types.push_back(rowGroupMetadata->schema()->Column(0)->physical_type()); types.push_back(rowGroupMetadata->schema()->Column(0)->physical_type());
} }
while(logicalTypes.size() < (unsigned int)rowGroupMetadata->num_columns()) {
logicalTypes.push_back(rowGroupMetadata->schema()->Column(0)->logical_type());
}
for(unsigned int i = 0; i < (unsigned int)rowGroupMetadata->num_columns(); i++) { for(unsigned int i = 0; i < (unsigned int)rowGroupMetadata->num_columns(); i++) {
types[i] = rowGroupMetadata->schema()->Column(i)->physical_type(); types[i] = rowGroupMetadata->schema()->Column(i)->physical_type();
logicalTypes[i] = rowGroupMetadata->schema()->Column(i)->logical_type();
} }
} }
@ -169,6 +174,18 @@ void ParquetCursor::ensureColumn(int col) {
break; break;
} }
case parquet::Type::FIXED_LEN_BYTE_ARRAY: case parquet::Type::FIXED_LEN_BYTE_ARRAY:
{
parquet::FixedLenByteArrayScanner* s = (parquet::FixedLenByteArrayScanner*)scanners[col].get();
parquet::FixedLenByteArray flba;
if(s->NextValue(&flba, &wasNull)) {
colByteArrayValues[col].ptr = flba.ptr;
// TODO: cache this
colByteArrayValues[col].len = rowGroupMetadata->schema()->Column(col)->type_length();
} else {
throw std::invalid_argument("unexpectedly lacking a next value");
}
break;
}
default: default:
// Should be impossible to get here as we should have forbidden this at // Should be impossible to get here as we should have forbidden this at
// CREATE time -- maybe file changed underneath us? // CREATE time -- maybe file changed underneath us?
@ -203,9 +220,10 @@ parquet::ByteArray* ParquetCursor::getByteArray(int col) {
return &colByteArrayValues[col]; return &colByteArrayValues[col];
} }
parquet::Type::type ParquetCursor::getPhysicalType(int col) { parquet::Type::type ParquetCursor::getPhysicalType(int col) {
// return rowGroupMetadata->schema()->Column(col)->physical_type();
return types[col]; return types[col];
} }
parquet::LogicalType::type ParquetCursor::getLogicalType(int col) {
return logicalTypes[col];
}

View File

@ -12,6 +12,7 @@ class ParquetCursor {
std::shared_ptr<parquet::RowGroupReader> rowGroup; std::shared_ptr<parquet::RowGroupReader> rowGroup;
std::vector<std::shared_ptr<parquet::Scanner>> scanners; std::vector<std::shared_ptr<parquet::Scanner>> scanners;
std::vector<parquet::Type::type> types; std::vector<parquet::Type::type> types;
std::vector<parquet::LogicalType::type> logicalTypes;
std::vector<int> colRows; std::vector<int> colRows;
std::vector<bool> colNulls; std::vector<bool> colNulls;
@ -36,6 +37,7 @@ public:
void ensureColumn(int col); void ensureColumn(int col);
bool isNull(int col); bool isNull(int col);
parquet::Type::type getPhysicalType(int col); parquet::Type::type getPhysicalType(int col);
parquet::LogicalType::type getLogicalType(int col);
int getInt32(int col); int getInt32(int col);
long getInt64(int col); long getInt64(int col);

View File

@ -82,9 +82,13 @@ std::string ParquetTable::CreateStatement() {
case parquet::Type::BYTE_ARRAY: case parquet::Type::BYTE_ARRAY:
if(logical == parquet::LogicalType::UTF8) { if(logical == parquet::LogicalType::UTF8) {
type = "TEXT"; type = "TEXT";
} else {
type = "BLOB";
} }
break; break;
case parquet::Type::FIXED_LEN_BYTE_ARRAY: case parquet::Type::FIXED_LEN_BYTE_ARRAY:
type = "BLOB";
break;
default: default:
break; break;
} }

8
tests/test-all Executable file
View File

@ -0,0 +1,8 @@
#!/bin/bash
set -euo pipefail
here=$(dirname "${BASH_SOURCE[0]}")
set -x
"$here"/test-unsupported
"$here"/test-supported

41
tests/test-supported Executable file
View File

@ -0,0 +1,41 @@
#!/bin/bash
set -euo pipefail
# Verify that all the non-unsupported.*parquet files can be loaded and 'SELECT * FROM x LIMIT 1'ed
# without segfaulting.
load_supported() {
file=${1:?must provide file to load}
basename=$(basename "$file")
cat <<EOF
.echo on
.load parquet/libparquet
.testcase $basename
.bail on
CREATE VIRTUAL TABLE test USING parquet('$file');
SELECT * FROM test LIMIT 1;
SELECT 123;
EOF
}
main() {
root=$(dirname "${BASH_SOURCE[0]}")/..
root=$(readlink -f "$root")
cd "$root"
supported_files=$(find . -type f -name '*.parquet' -not -name 'unsupported*.parquet')
while read -r supported; do
echo "Testing: $supported"
if ! "$root"/sqlite/sqlite3 -init <(load_supported "$supported") < /dev/null > /dev/null 2> testcase-err.txt; then
echo "...FAILED; check testcase-{out,err}.txt" >&2
exit 1
fi
# We expect the 'SELECT 123' command to have been run
if ! grep -q 123 testcase-out.txt; then
echo "...FAILED; check testcase-{out,err}.txt" >&2
exit 1
fi
done < <(echo "$supported_files")
}
main "$@"

View File

@ -28,7 +28,7 @@ main() {
"$root"/sqlite/sqlite3 -init <(load_unsupported "$unsupported") < /dev/null > /dev/null 2> testcase-err.txt "$root"/sqlite/sqlite3 -init <(load_unsupported "$unsupported") < /dev/null > /dev/null 2> testcase-err.txt
# We expect the 'SELECT 123' command to NOT have been run # We expect the 'SELECT 123' command to NOT have been run
if grep -q 123 testcase-out.txt; then if grep -q 123 testcase-out.txt; then
echo "...FAILED" >&2 echo "...FAILED; expected an error message. Check testcase-{out,err}.txt" >&2
exit 1 exit 1
fi fi
done < <(echo "$unsupported_files") done < <(echo "$unsupported_files")