Boolean, INT96, INT64

This commit is contained in:
Colin Dellow 2018-03-03 20:00:50 -05:00
parent 1de843fca8
commit eb0b48f867
6 changed files with 135 additions and 44 deletions

2
.gitignore vendored
View File

@ -37,3 +37,5 @@
# transient sqlite files # transient sqlite files
/sqlite /sqlite
/sqlite-autoconf* /sqlite-autoconf*
/cmds.txt
/sqlite-with-parquet

View File

@ -1,19 +1,24 @@
.load ./libparquet .load ./libparquet
.headers on
select 'creating without enough args'; select 'creating without enough args';
create virtual table noargs using parquet; create virtual table noargs using parquet;
select 'creating with invalid file'; select 'creating with invalid file';
create virtual table nonexistent using parquet('nonexistent'); create virtual table nonexistent using parquet('nonexistent');
select 'creating with valid file'; select 'creating others';
create virtual table parquet using parquet('/home/cldellow/src/csv2parquet/12m.parquet.snappy'); create virtual table others using parquet('/home/cldellow/src/parquet-vtable/parquet/others.parquet');
.tables select * from others limit 1;
.schema parquet
.timer on --select 'creating with valid file';
.headers on --create virtual table parquet using parquet('/home/cldellow/src/csv2parquet/12m.parquet.snappy');
select count(*) from (select * from parquet limit 1); --.tables
select rowid,col0 from parquet where rowid > 5 limit 5; --.schema parquet
select count(*) from parquet limit 1; --.fullschema
select sum(col0) from parquet limit 1; --.timer on
select * from parquet limit 10; --select count(*) from (select * from parquet limit 1);
select sum(length(col3)) from parquet; --select rowid,col0 from parquet where rowid > 5 limit 5;
--select count(*) from parquet limit 1;
--select sum(col0) from parquet limit 1;
--select * from parquet limit 10;
--select sum(length(col3)) from parquet;

View File

@ -171,33 +171,44 @@ static int parquetColumn(
sqlite3_result_null(ctx); sqlite3_result_null(ctx);
} else { } else {
switch(cursor->getPhysicalType(col)) { switch(cursor->getPhysicalType(col)) {
case parquet::Type::BOOLEAN:
case parquet::Type::INT32: case parquet::Type::INT32:
{ {
int rv = cursor->getInt(col); int rv = cursor->getInt32(col);
sqlite3_result_int(ctx, rv); sqlite3_result_int(ctx, rv);
}
break; break;
}
case parquet::Type::DOUBLE: case parquet::Type::DOUBLE:
{ {
double rv = cursor->getDouble(col); double rv = cursor->getDouble(col);
sqlite3_result_double(ctx, rv); sqlite3_result_double(ctx, rv);
}
break; break;
}
case parquet::Type::BYTE_ARRAY: case parquet::Type::BYTE_ARRAY:
{ {
parquet::ByteArray* rv = cursor->getByteArray(col); parquet::ByteArray* rv = cursor->getByteArray(col);
sqlite3_result_text(ctx, (const char*)rv->ptr, rv->len, SQLITE_TRANSIENT); sqlite3_result_text(ctx, (const char*)rv->ptr, rv->len, SQLITE_TRANSIENT);
}
break; break;
}
case parquet::Type::BOOLEAN:
case parquet::Type::INT64:
case parquet::Type::FLOAT:
case parquet::Type::INT96: case parquet::Type::INT96:
// This type exists to store timestamps in nanoseconds due to legacy
// reasons. We just interpret it as a timestamp in milliseconds.
case parquet::Type::INT64:
{
long rv = cursor->getInt64(col);
sqlite3_result_int64(ctx, rv);
break;
}
case parquet::Type::FLOAT:
case parquet::Type::FIXED_LEN_BYTE_ARRAY: case parquet::Type::FIXED_LEN_BYTE_ARRAY:
default: default:
throw std::invalid_argument("cannot handle this type"); // Should be impossible to get here as we should have forbidden this at
// CREATE time -- maybe file changed underneath us?
std::ostringstream ss;
ss << __FILE__ << ":" << __LINE__ << ": column " << col << " has unsupported type: " <<
parquet::TypeToString(cursor->getPhysicalType(col));
throw std::invalid_argument(ss.str());
break; break;
} }
} }

View File

@ -84,8 +84,8 @@ void ParquetCursor::ensureColumn(int col) {
} else { } else {
throw std::invalid_argument("unexpectedly lacking a next value"); throw std::invalid_argument("unexpectedly lacking a next value");
} }
}
break; break;
}
case parquet::Type::DOUBLE: case parquet::Type::DOUBLE:
{ {
parquet::DoubleScanner* s = (parquet::DoubleScanner*)scanners[col].get(); parquet::DoubleScanner* s = (parquet::DoubleScanner*)scanners[col].get();
@ -95,24 +95,77 @@ void ParquetCursor::ensureColumn(int col) {
} else { } else {
throw std::invalid_argument("unexpectedly lacking a next value"); throw std::invalid_argument("unexpectedly lacking a next value");
} }
}
break; break;
}
case parquet::Type::BYTE_ARRAY: case parquet::Type::BYTE_ARRAY:
{ {
parquet::ByteArrayScanner* s = (parquet::ByteArrayScanner*)scanners[col].get(); parquet::ByteArrayScanner* s = (parquet::ByteArrayScanner*)scanners[col].get();
if(!s->NextValue(&colByteArrayValues[col], &wasNull)) { if(!s->NextValue(&colByteArrayValues[col], &wasNull)) {
throw std::invalid_argument("unexpectedly lacking a next value"); throw std::invalid_argument("unexpectedly lacking a next value");
} }
break;
}
case parquet::Type::INT96:
{
// INT96 tracks a date with nanosecond precision, convert to ms since epoch.
// ...see https://github.com/apache/parquet-format/pull/49 for more
//
// First 8 bytes: nanoseconds into the day
// Last 4 bytes: Julian day
// To get nanoseconds since the epoch:
// (julian_day - 2440588) * (86400 * 1000 * 1000 * 1000) + nanoseconds
parquet::Int96Scanner* s = (parquet::Int96Scanner*)scanners[col].get();
parquet::Int96 rv;
rv.value[0] = 0;
rv.value[1] = 0;
rv.value[2] = 0;
if(s->NextValue(&rv, &wasNull)) {
__int128 ns = rv.value[0] + ((unsigned long)rv.value[1] << 32);
__int128 julianDay = rv.value[2];
__int128 nsSinceEpoch = (julianDay - 2440588);
nsSinceEpoch *= 86400;
nsSinceEpoch *= 1000 * 1000 * 1000;
nsSinceEpoch += ns;
nsSinceEpoch /= 1000000;
colIntValues[col] = nsSinceEpoch;
} else {
throw std::invalid_argument("unexpectedly lacking a next value");
} }
break; break;
}
case parquet::Type::INT64:
{
parquet::Int64Scanner* s = (parquet::Int64Scanner*)scanners[col].get();
long rv = 0;
if(s->NextValue(&rv, &wasNull)) {
colIntValues[col] = rv;
} else {
throw std::invalid_argument("unexpectedly lacking a next value");
}
break;
}
case parquet::Type::BOOLEAN: case parquet::Type::BOOLEAN:
case parquet::Type::INT64: {
parquet::BoolScanner* s = (parquet::BoolScanner*)scanners[col].get();
bool rv = false;
if(s->NextValue(&rv, &wasNull)) {
colIntValues[col] = rv ? 1 : 0;
} else {
throw std::invalid_argument("unexpectedly lacking a next value");
}
break;
}
case parquet::Type::FLOAT: case parquet::Type::FLOAT:
case parquet::Type::INT96:
case parquet::Type::FIXED_LEN_BYTE_ARRAY: case parquet::Type::FIXED_LEN_BYTE_ARRAY:
default: default:
throw std::invalid_argument("cannot handle"); // Should be impossible to get here as we should have forbidden this at
// CREATE time -- maybe file changed underneath us?
std::ostringstream ss;
ss << __FILE__ << ":" << __LINE__ << ": column " << col << " has unsupported type: " <<
parquet::TypeToString(types[col]);
throw std::invalid_argument(ss.str());
break; break;
} }
@ -124,7 +177,11 @@ bool ParquetCursor::isNull(int col) {
return colNulls[col]; return colNulls[col];
} }
int ParquetCursor::getInt(int col) { int ParquetCursor::getInt32(int col) {
return colIntValues[col];
}
long ParquetCursor::getInt64(int col) {
return colIntValues[col]; return colIntValues[col];
} }

View File

@ -35,10 +35,13 @@ public:
void ensureColumn(int col); void ensureColumn(int col);
bool isNull(int col); bool isNull(int col);
int getInt(int col); parquet::Type::type getPhysicalType(int col);
int getInt32(int col);
long getInt64(int col);
double getDouble(int col); double getDouble(int col);
parquet::ByteArray* getByteArray(int col); parquet::ByteArray* getByteArray(int col);
parquet::Type::type getPhysicalType(int col);
/* /*
sqlite3_result_double() sqlite3_result_double()
sqlite3_result_int() sqlite3_result_int()

View File

@ -16,23 +16,19 @@ std::string ParquetTable::CreateStatement() {
auto _col = schema->GetColumnRoot(i); auto _col = schema->GetColumnRoot(i);
if(!_col->is_primitive()) { if(!_col->is_primitive()) {
throw std::invalid_argument("parquet file has non-primitive column"); std::ostringstream ss;
ss << __FILE__ << ":" << __LINE__ << ": column " << i << " has non-primitive type";
throw std::invalid_argument(ss.str());
} }
if(_col->is_repeated()) { if(_col->is_repeated()) {
throw std::invalid_argument("parquet file has non-scalar column"); std::ostringstream ss;
ss << __FILE__ << ":" << __LINE__ << ": column " << i << " has non-scalar type";
throw std::invalid_argument(ss.str());
} }
parquet::schema::PrimitiveNode* col = (parquet::schema::PrimitiveNode*)_col; parquet::schema::PrimitiveNode* col = (parquet::schema::PrimitiveNode*)_col;
printf("col %d[p=%d:%s, l=%d:%s] is %s\n",
i,
col->physical_type(),
parquet::TypeToString(col->physical_type()).data(),
col->logical_type(),
parquet::LogicalTypeToString(col->logical_type()).data(),
col->name().data());
if(i > 0) if(i > 0)
text += ", "; text += ", ";
@ -52,6 +48,9 @@ std::string ParquetTable::CreateStatement() {
type = "SMALLINT"; type = "SMALLINT";
} }
break; break;
case parquet::Type::INT96:
// INT96 is used for nanosecond precision on timestamps; we truncate
// to millisecond precision.
case parquet::Type::INT64: case parquet::Type::INT64:
type = "BIGINT"; type = "BIGINT";
break; break;
@ -67,15 +66,29 @@ std::string ParquetTable::CreateStatement() {
} }
break; break;
case parquet::Type::FIXED_LEN_BYTE_ARRAY: case parquet::Type::FIXED_LEN_BYTE_ARRAY:
case parquet::Type::INT96:
default: default:
break; break;
} }
if(type.empty()) { if(type.empty()) {
throw std::invalid_argument("unsupported type"); std::ostringstream ss;
ss << __FILE__ << ":" << __LINE__ << ": column " << i << " has unsupported type: " <<
parquet::TypeToString(col->physical_type()) << "/" << parquet::LogicalTypeToString(col->logical_type());
throw std::invalid_argument(ss.str());
} }
printf("...%s\n", type.data());
#ifdef DEBUG
printf("col %d[name=%s, p=%d:%s, l=%d:%s] is %s\n",
i,
col->name().data(),
col->physical_type(),
parquet::TypeToString(col->physical_type()).data(),
col->logical_type(),
parquet::LogicalTypeToString(col->logical_type()).data(),
type.data());
#endif
text += " "; text += " ";
text += type; text += type;
} }