diff --git a/.gitignore b/.gitignore index acee4a7..79de523 100644 --- a/.gitignore +++ b/.gitignore @@ -37,3 +37,5 @@ # transient sqlite files /sqlite /sqlite-autoconf* +/cmds.txt +/sqlite-with-parquet diff --git a/parquet/cmds.txt b/parquet/cmds.txt index 726d85d..122e932 100644 --- a/parquet/cmds.txt +++ b/parquet/cmds.txt @@ -1,19 +1,24 @@ .load ./libparquet +.headers on select 'creating without enough args'; create virtual table noargs using parquet; select 'creating with invalid file'; create virtual table nonexistent using parquet('nonexistent'); -select 'creating with valid file'; -create virtual table parquet using parquet('/home/cldellow/src/csv2parquet/12m.parquet.snappy'); -.tables -.schema parquet -.timer on -.headers on -select count(*) from (select * from parquet limit 1); -select rowid,col0 from parquet where rowid > 5 limit 5; -select count(*) from parquet limit 1; -select sum(col0) from parquet limit 1; -select * from parquet limit 10; -select sum(length(col3)) from parquet; +select 'creating others'; +create virtual table others using parquet('/home/cldellow/src/parquet-vtable/parquet/others.parquet'); +select * from others limit 1; + +--select 'creating with valid file'; +--create virtual table parquet using parquet('/home/cldellow/src/csv2parquet/12m.parquet.snappy'); +--.tables +--.schema parquet +--.fullschema +--.timer on +--select count(*) from (select * from parquet limit 1); +--select rowid,col0 from parquet where rowid > 5 limit 5; +--select count(*) from parquet limit 1; +--select sum(col0) from parquet limit 1; +--select * from parquet limit 10; +--select sum(length(col3)) from parquet; diff --git a/parquet/parquet.cc b/parquet/parquet.cc index c5eac0d..6650e94 100644 --- a/parquet/parquet.cc +++ b/parquet/parquet.cc @@ -171,33 +171,44 @@ static int parquetColumn( sqlite3_result_null(ctx); } else { switch(cursor->getPhysicalType(col)) { + case parquet::Type::BOOLEAN: case parquet::Type::INT32: { - int rv = cursor->getInt(col); + int rv = cursor->getInt32(col); sqlite3_result_int(ctx, rv); + break; } - break; - case parquet::Type::DOUBLE: { double rv = cursor->getDouble(col); sqlite3_result_double(ctx, rv); + break; } - break; case parquet::Type::BYTE_ARRAY: { parquet::ByteArray* rv = cursor->getByteArray(col); sqlite3_result_text(ctx, (const char*)rv->ptr, rv->len, SQLITE_TRANSIENT); + break; } - break; - - case parquet::Type::BOOLEAN: - case parquet::Type::INT64: - case parquet::Type::FLOAT: case parquet::Type::INT96: + // This type exists to store timestamps in nanoseconds due to legacy + // reasons. We just interpret it as a timestamp in milliseconds. + case parquet::Type::INT64: + { + long rv = cursor->getInt64(col); + sqlite3_result_int64(ctx, rv); + break; + } + case parquet::Type::FLOAT: case parquet::Type::FIXED_LEN_BYTE_ARRAY: default: - throw std::invalid_argument("cannot handle this type"); + // Should be impossible to get here as we should have forbidden this at + // CREATE time -- maybe file changed underneath us? + std::ostringstream ss; + ss << __FILE__ << ":" << __LINE__ << ": column " << col << " has unsupported type: " << + parquet::TypeToString(cursor->getPhysicalType(col)); + + throw std::invalid_argument(ss.str()); break; } } diff --git a/parquet/parquet_cursor.cc b/parquet/parquet_cursor.cc index 1573abc..875c41a 100644 --- a/parquet/parquet_cursor.cc +++ b/parquet/parquet_cursor.cc @@ -84,8 +84,8 @@ void ParquetCursor::ensureColumn(int col) { } else { throw std::invalid_argument("unexpectedly lacking a next value"); } + break; } - break; case parquet::Type::DOUBLE: { parquet::DoubleScanner* s = (parquet::DoubleScanner*)scanners[col].get(); @@ -95,24 +95,77 @@ void ParquetCursor::ensureColumn(int col) { } else { throw std::invalid_argument("unexpectedly lacking a next value"); } + break; } - break; case parquet::Type::BYTE_ARRAY: { parquet::ByteArrayScanner* s = (parquet::ByteArrayScanner*)scanners[col].get(); if(!s->NextValue(&colByteArrayValues[col], &wasNull)) { throw std::invalid_argument("unexpectedly lacking a next value"); } + break; + } + case parquet::Type::INT96: + { + // INT96 tracks a date with nanosecond precision, convert to ms since epoch. + // ...see https://github.com/apache/parquet-format/pull/49 for more + // + // First 8 bytes: nanoseconds into the day + // Last 4 bytes: Julian day + // To get nanoseconds since the epoch: + // (julian_day - 2440588) * (86400 * 1000 * 1000 * 1000) + nanoseconds + parquet::Int96Scanner* s = (parquet::Int96Scanner*)scanners[col].get(); + parquet::Int96 rv; + rv.value[0] = 0; + rv.value[1] = 0; + rv.value[2] = 0; + if(s->NextValue(&rv, &wasNull)) { + __int128 ns = rv.value[0] + ((unsigned long)rv.value[1] << 32); + __int128 julianDay = rv.value[2]; + __int128 nsSinceEpoch = (julianDay - 2440588); + nsSinceEpoch *= 86400; + nsSinceEpoch *= 1000 * 1000 * 1000; + nsSinceEpoch += ns; + nsSinceEpoch /= 1000000; + + colIntValues[col] = nsSinceEpoch; + } else { + throw std::invalid_argument("unexpectedly lacking a next value"); + } + break; + } + case parquet::Type::INT64: + { + parquet::Int64Scanner* s = (parquet::Int64Scanner*)scanners[col].get(); + long rv = 0; + if(s->NextValue(&rv, &wasNull)) { + colIntValues[col] = rv; + } else { + throw std::invalid_argument("unexpectedly lacking a next value"); + } + break; } - break; case parquet::Type::BOOLEAN: - case parquet::Type::INT64: + { + parquet::BoolScanner* s = (parquet::BoolScanner*)scanners[col].get(); + bool rv = false; + if(s->NextValue(&rv, &wasNull)) { + colIntValues[col] = rv ? 1 : 0; + } else { + throw std::invalid_argument("unexpectedly lacking a next value"); + } + break; + } case parquet::Type::FLOAT: - case parquet::Type::INT96: case parquet::Type::FIXED_LEN_BYTE_ARRAY: default: - throw std::invalid_argument("cannot handle"); + // Should be impossible to get here as we should have forbidden this at + // CREATE time -- maybe file changed underneath us? + std::ostringstream ss; + ss << __FILE__ << ":" << __LINE__ << ": column " << col << " has unsupported type: " << + parquet::TypeToString(types[col]); + throw std::invalid_argument(ss.str()); break; } @@ -124,7 +177,11 @@ bool ParquetCursor::isNull(int col) { return colNulls[col]; } -int ParquetCursor::getInt(int col) { +int ParquetCursor::getInt32(int col) { + return colIntValues[col]; +} + +long ParquetCursor::getInt64(int col) { return colIntValues[col]; } diff --git a/parquet/parquet_cursor.h b/parquet/parquet_cursor.h index b12b96b..9e66836 100644 --- a/parquet/parquet_cursor.h +++ b/parquet/parquet_cursor.h @@ -35,10 +35,13 @@ public: void ensureColumn(int col); bool isNull(int col); - int getInt(int col); + parquet::Type::type getPhysicalType(int col); + + int getInt32(int col); + long getInt64(int col); double getDouble(int col); parquet::ByteArray* getByteArray(int col); - parquet::Type::type getPhysicalType(int col); + /* sqlite3_result_double() sqlite3_result_int() diff --git a/parquet/parquet_table.cc b/parquet/parquet_table.cc index de69961..1f23010 100644 --- a/parquet/parquet_table.cc +++ b/parquet/parquet_table.cc @@ -16,23 +16,19 @@ std::string ParquetTable::CreateStatement() { auto _col = schema->GetColumnRoot(i); if(!_col->is_primitive()) { - throw std::invalid_argument("parquet file has non-primitive column"); + std::ostringstream ss; + ss << __FILE__ << ":" << __LINE__ << ": column " << i << " has non-primitive type"; + throw std::invalid_argument(ss.str()); } if(_col->is_repeated()) { - throw std::invalid_argument("parquet file has non-scalar column"); + std::ostringstream ss; + ss << __FILE__ << ":" << __LINE__ << ": column " << i << " has non-scalar type"; + throw std::invalid_argument(ss.str()); } parquet::schema::PrimitiveNode* col = (parquet::schema::PrimitiveNode*)_col; - printf("col %d[p=%d:%s, l=%d:%s] is %s\n", - i, - col->physical_type(), - parquet::TypeToString(col->physical_type()).data(), - col->logical_type(), - parquet::LogicalTypeToString(col->logical_type()).data(), - col->name().data()); - if(i > 0) text += ", "; @@ -52,6 +48,9 @@ std::string ParquetTable::CreateStatement() { type = "SMALLINT"; } break; + case parquet::Type::INT96: + // INT96 is used for nanosecond precision on timestamps; we truncate + // to millisecond precision. case parquet::Type::INT64: type = "BIGINT"; break; @@ -67,15 +66,29 @@ std::string ParquetTable::CreateStatement() { } break; case parquet::Type::FIXED_LEN_BYTE_ARRAY: - case parquet::Type::INT96: default: break; } if(type.empty()) { - throw std::invalid_argument("unsupported type"); + std::ostringstream ss; + ss << __FILE__ << ":" << __LINE__ << ": column " << i << " has unsupported type: " << + parquet::TypeToString(col->physical_type()) << "/" << parquet::LogicalTypeToString(col->logical_type()); + + throw std::invalid_argument(ss.str()); } - printf("...%s\n", type.data()); + +#ifdef DEBUG + printf("col %d[name=%s, p=%d:%s, l=%d:%s] is %s\n", + i, + col->name().data(), + col->physical_type(), + parquet::TypeToString(col->physical_type()).data(), + col->logical_type(), + parquet::LogicalTypeToString(col->logical_type()).data(), + type.data()); +#endif + text += " "; text += type; }