From 1de843fca82980c0bec8174ad76720e72fa1a266 Mon Sep 17 00:00:00 2001 From: Colin Dellow Date: Sat, 3 Mar 2018 15:44:01 -0500 Subject: [PATCH] Very rough first cut supports int32, double, strings. --- parquet/cmds.txt | 20 +- parquet/go | 13 +- parquet/make | 8 +- parquet/parquet.cc | 674 ++++++-------------------------------- parquet/parquet_cursor.cc | 144 ++++++++ parquet/parquet_cursor.h | 52 +++ parquet/parquet_table.cc | 84 +++++ parquet/parquet_table.h | 14 + 8 files changed, 437 insertions(+), 572 deletions(-) create mode 100644 parquet/parquet_cursor.cc create mode 100644 parquet/parquet_cursor.h create mode 100644 parquet/parquet_table.cc create mode 100644 parquet/parquet_table.h diff --git a/parquet/cmds.txt b/parquet/cmds.txt index 4c77a73..726d85d 100644 --- a/parquet/cmds.txt +++ b/parquet/cmds.txt @@ -1,3 +1,19 @@ .load ./libparquet -create virtual table parquet using parquet(filename='../csv.csv'); -select * from parquet; +select 'creating without enough args'; +create virtual table noargs using parquet; + +select 'creating with invalid file'; +create virtual table nonexistent using parquet('nonexistent'); + +select 'creating with valid file'; +create virtual table parquet using parquet('/home/cldellow/src/csv2parquet/12m.parquet.snappy'); +.tables +.schema parquet +.timer on +.headers on +select count(*) from (select * from parquet limit 1); +select rowid,col0 from parquet where rowid > 5 limit 5; +select count(*) from parquet limit 1; +select sum(col0) from parquet limit 1; +select * from parquet limit 10; +select sum(length(col3)) from parquet; diff --git a/parquet/go b/parquet/go index ec520d8..c033fde 100755 --- a/parquet/go +++ b/parquet/go @@ -1 +1,12 @@ -./make && ../sqlite/sqlite3 -init ./cmds.txt < /dev/null +#!/bin/bash + +set -euo pipefail +./make + + +if [ ! -v DEBUG ]; then + ../sqlite/sqlite3 -init ./cmds.txt < /dev/null +else + gdb -ex run --args ../sqlite/sqlite3 -init ./cmds.txt +fi + diff --git a/parquet/make b/parquet/make index 1ab3a82..2abb735 100755 --- a/parquet/make +++ b/parquet/make @@ -1,6 +1,7 @@ #!/bin/bash -set -x +set -euo pipefail +#set -x PARQUET_CPP=${PARQUET_CPP:-~/src/parquet-cpp} @@ -11,9 +12,10 @@ clean() { build() { clean - g++ -std=c++11 -Wall -fPIC -c -g parquet.cc -I ../sqlite -lz -ldl -lpthread + # -O3 -s -DNDEBUG + g++ -O3 -std=c++11 -Wall -fPIC -c -g *.cc -I ../sqlite -lz -ldl -lpthread - g++ -shared -o libparquet.so parquet.o \ + g++ -O3 -shared -o libparquet.so *.o \ ${PARQUET_CPP}/build/release/libparquet.a \ ${PARQUET_CPP}/thrift_ep/src/thrift_ep-install/lib/libthrift.a \ ${PARQUET_CPP}/build/release/libarrow.so \ diff --git a/parquet/parquet.cc b/parquet/parquet.cc index 61b03bc..c5eac0d 100644 --- a/parquet/parquet.cc +++ b/parquet/parquet.cc @@ -18,245 +18,13 @@ SQLITE_EXTENSION_INIT1 #include #include -#include "parquet/api/reader.h" - -char const *EMPTY_STRING = ""; - -void gogo() { - printf("ok"); - try { - std::unique_ptr reader = - parquet::ParquetFileReader::OpenFile("/home/cldellow/src/csv2parquet/12m.parquet.snappy"); - printf("%d\n", reader->metadata()->size()); - printf("%ld\n", reader->metadata()->num_rows()); - } catch (const std::exception& e) { - std::cerr << "Parquet error: " << e.what() << std::endl; - } -} - -/* Max size of the error message in a CsvReader */ -#define PARQUET_MXERR 200 - -/* Size of the CsvReader input buffer */ -#define PARQUET_INBUFSZ 1024 - -/* A context object used when read a Parquet file. */ -typedef struct CsvReader CsvReader; -struct CsvReader { - FILE *in; /* Read the Parquet text from this input stream */ - char *z; /* Accumulated text for a field */ - int n; /* Number of bytes in z */ - int nAlloc; /* Space allocated for z[] */ - int nLine; /* Current line number */ - int bNotFirst; /* True if prior text has been seen */ - int cTerm; /* Character that terminated the most recent field */ - size_t iIn; /* Next unread character in the input buffer */ - size_t nIn; /* Number of characters in the input buffer */ - char *zIn; /* The input buffer */ - char zErr[PARQUET_MXERR]; /* Error message */ -}; - -/* Initialize a CsvReader object */ -static void csv_reader_init(CsvReader *p){ - p->in = 0; - p->z = 0; - p->n = 0; - p->nAlloc = 0; - p->nLine = 0; - p->bNotFirst = 0; - p->nIn = 0; - p->zIn = 0; - p->zErr[0] = 0; -} - -/* Close and reset a CsvReader object */ -static void csv_reader_reset(CsvReader *p){ - if( p->in ){ - fclose(p->in); - sqlite3_free(p->zIn); - } - sqlite3_free(p->z); - csv_reader_init(p); -} - -/* Report an error on a CsvReader */ -static void csv_errmsg(CsvReader *p, const char *zFormat, ...){ - va_list ap; - va_start(ap, zFormat); - sqlite3_vsnprintf(PARQUET_MXERR, p->zErr, zFormat, ap); - va_end(ap); -} - -/* Open the file associated with a CsvReader -** Return the number of errors. -*/ -static int csv_reader_open( - CsvReader *p, /* The reader to open */ - const char *zFilename, /* Read from this filename */ - const char *zData /* ... or use this data */ -){ - if( zFilename ){ - p->zIn = (char*)sqlite3_malloc( PARQUET_INBUFSZ ); - if( p->zIn==0 ){ - csv_errmsg(p, "out of memory"); - return 1; - } - p->in = fopen(zFilename, "rb"); - if( p->in==0 ){ - csv_reader_reset(p); - csv_errmsg(p, "cannot open '%s' for reading", zFilename); - return 1; - } - }else{ - assert( p->in==0 ); - p->zIn = (char*)zData; - p->nIn = strlen(zData); - } - return 0; -} - -/* The input buffer has overflowed. Refill the input buffer, then -** return the next character -*/ -static int csv_getc_refill(CsvReader *p){ - size_t got; - - assert( p->iIn>=p->nIn ); /* Only called on an empty input buffer */ - assert( p->in!=0 ); /* Only called if reading froma file */ - - got = fread(p->zIn, 1, PARQUET_INBUFSZ, p->in); - if( got==0 ) return EOF; - p->nIn = got; - p->iIn = 1; - return p->zIn[0]; -} - -/* Return the next character of input. Return EOF at end of input. */ -static int csv_getc(CsvReader *p){ - if( p->iIn >= p->nIn ){ - if( p->in!=0 ) return csv_getc_refill(p); - return EOF; - } - return ((unsigned char*)p->zIn)[p->iIn++]; -} - -/* Increase the size of p->z and append character c to the end. -** Return 0 on success and non-zero if there is an OOM error */ -static int csv_resize_and_append(CsvReader *p, char c){ - char *zNew; - int nNew = p->nAlloc*2 + 100; - zNew = (char*)sqlite3_realloc64(p->z, nNew); - if( zNew ){ - p->z = zNew; - p->nAlloc = nNew; - p->z[p->n++] = c; - return 0; - }else{ - csv_errmsg(p, "out of memory"); - return 1; - } -} - -/* Append a single character to the CsvReader.z[] array. -** Return 0 on success and non-zero if there is an OOM error */ -static int csv_append(CsvReader *p, char c){ - if( p->n>=p->nAlloc-1 ) return csv_resize_and_append(p, c); - p->z[p->n++] = c; - return 0; -} - -/* Read a single field of Parquet text. Compatible with rfc4180 and extended -** with the option of having a separator other than ",". -** -** + Input comes from p->in. -** + Store results in p->z of length p->n. Space to hold p->z comes -** from sqlite3_malloc64(). -** + Keep track of the line number in p->nLine. -** + Store the character that terminates the field in p->cTerm. Store -** EOF on end-of-file. -** -** Return "" at EOF. Return 0 on an OOM error. -*/ -static const char *csv_read_one_field(CsvReader *p){ - int c; - p->n = 0; - c = csv_getc(p); - if( c==EOF ){ - p->cTerm = EOF; - return EMPTY_STRING; - } - if( c=='"' ){ - int pc, ppc; - int startLine = p->nLine; - pc = ppc = 0; - while( 1 ){ - c = csv_getc(p); - if( c<='"' || pc=='"' ){ - if( c=='\n' ) p->nLine++; - if( c=='"' ){ - if( pc=='"' ){ - pc = 0; - continue; - } - } - if( (c==',' && pc=='"') - || (c=='\n' && pc=='"') - || (c=='\n' && pc=='\r' && ppc=='"') - || (c==EOF && pc=='"') - ){ - do{ p->n--; }while( p->z[p->n]!='"' ); - p->cTerm = (char)c; - break; - } - if( pc=='"' && c!='\r' ){ - csv_errmsg(p, "line %d: unescaped %c character", p->nLine, '"'); - break; - } - if( c==EOF ){ - csv_errmsg(p, "line %d: unterminated %c-quoted field\n", - startLine, '"'); - p->cTerm = (char)c; - break; - } - } - if( csv_append(p, (char)c) ) return 0; - ppc = pc; - pc = c; - } - }else{ - /* If this is the first field being parsed and it begins with the - ** UTF-8 BOM (0xEF BB BF) then skip the BOM */ - if( (c&0xff)==0xef && p->bNotFirst==0 ){ - csv_append(p, (char)c); - c = csv_getc(p); - if( (c&0xff)==0xbb ){ - csv_append(p, (char)c); - c = csv_getc(p); - if( (c&0xff)==0xbf ){ - p->bNotFirst = 1; - p->n = 0; - return csv_read_one_field(p); - } - } - } - while( c>',' || (c!=EOF && c!=',' && c!='\n') ){ - if( csv_append(p, (char)c) ) return 0; - c = csv_getc(p); - } - if( c=='\n' ){ - p->nLine++; - if( p->n>0 && p->z[p->n-1]=='\r' ) p->n--; - } - p->cTerm = (char)c; - } - if( p->z ) p->z[p->n] = 0; - p->bNotFirst = 1; - return p->z; -} +#include +#include "parquet_table.h" +#include "parquet_cursor.h" /* Forward references to the various virtual table methods implemented -** in this file. */ + * in this file. */ static int parquetCreate(sqlite3*, void*, int, const char*const*, sqlite3_vtab**,char**); static int parquetConnect(sqlite3*, void*, int, const char*const*, @@ -273,290 +41,65 @@ static int parquetColumn(sqlite3_vtab_cursor*,sqlite3_context*,int); static int parquetRowid(sqlite3_vtab_cursor*,sqlite3_int64*); /* An instance of the Parquet virtual table */ -typedef struct ParquetTable { +typedef struct sqlite3_vtab_parquet { sqlite3_vtab base; /* Base class. Must be first */ - char *zFilename; /* Name of the Parquet file */ - char *zData; /* Raw Parquet data in lieu of zFilename */ - long iStart; /* Offset to start of data in zFilename */ - unsigned int nCol; /* Number of columns in the Parquet file */ - unsigned int tstFlags; /* Bit values used for testing */ -} ParquetTable; + ParquetTable* table; +} sqlite3_vtab_parquet; /* A cursor for the Parquet virtual table */ -typedef struct ParquetCursor { +typedef struct sqlite3_vtab_cursor_parquet { sqlite3_vtab_cursor base; /* Base class. Must be first */ - CsvReader rdr; /* The CsvReader object */ - char **azVal; /* Value of the current row */ - int *aLen; /* Length of each entry */ - sqlite3_int64 iRowid; /* The current rowid. Negative for EOF */ -} ParquetCursor; - -/* Transfer error message text from a reader into a ParquetTable */ -static void csv_xfer_error(ParquetTable *pTab, CsvReader *pRdr){ - sqlite3_free(pTab->base.zErrMsg); - pTab->base.zErrMsg = sqlite3_mprintf("%s", pRdr->zErr); -} + ParquetCursor* cursor; +} sqlite3_vtab_cursor_parquet; /* -** This method is the destructor fo a ParquetTable object. +** This method is the destructor fo a sqlite3_vtab_parquet object. */ static int parquetDisconnect(sqlite3_vtab *pVtab){ - ParquetTable *p = (ParquetTable*)pVtab; - sqlite3_free(p->zFilename); - sqlite3_free(p->zData); + sqlite3_vtab_parquet *p = (sqlite3_vtab_parquet*)pVtab; + delete p->table; sqlite3_free(p); return SQLITE_OK; } -/* Skip leading whitespace. Return a pointer to the first non-whitespace -** character, or to the zero terminator if the string has only whitespace */ -static const char *csv_skip_whitespace(const char *z){ - while( isspace((unsigned char)z[0]) ) z++; - return z; -} - -/* Remove trailing whitespace from the end of string z[] */ -static void csv_trim_whitespace(char *z){ - size_t n = strlen(z); - while( n>0 && isspace((unsigned char)z[n]) ) n--; - z[n] = 0; -} - -/* Dequote the string */ -static void csv_dequote(char *z){ - int j; - char cQuote = z[0]; - size_t i, n; - - if( cQuote!='\'' && cQuote!='"' ) return; - n = strlen(z); - if( n<2 || z[n-1]!=z[0] ) return; - for(i=1, j=0; izErr. If there are no errors, p->zErr[0]==0. -*/ -static int csv_string_parameter( - CsvReader *p, /* Leave the error message here, if there is one */ - const char *zParam, /* Parameter we are checking for */ - const char *zArg, /* Raw text of the virtual table argment */ - char **pzVal /* Write the dequoted string value here */ -){ - const char *zValue; - zValue = csv_parameter(zParam,(int)strlen(zParam),zArg); - if( zValue==0 ) return 0; - p->zErr[0] = 0; - if( *pzVal ){ - csv_errmsg(p, "more than one '%s' parameter", zParam); - return 1; - } - *pzVal = sqlite3_mprintf("%s", zValue); - if( *pzVal==0 ){ - csv_errmsg(p, "out of memory"); - return 1; - } - csv_trim_whitespace(*pzVal); - csv_dequote(*pzVal); - return 1; -} - - -/* Return 0 if the argument is false and 1 if it is true. Return -1 if -** we cannot really tell. -*/ -static int csv_boolean(const char *z){ - if( sqlite3_stricmp("yes",z)==0 - || sqlite3_stricmp("on",z)==0 - || sqlite3_stricmp("true",z)==0 - || (z[0]=='1' && z[1]==0) - ){ - return 1; - } - if( sqlite3_stricmp("no",z)==0 - || sqlite3_stricmp("off",z)==0 - || sqlite3_stricmp("false",z)==0 - || (z[0]=='0' && z[1]==0) - ){ - return 0; - } - return -1; -} - - -/* -** Parameters: -** filename=FILENAME Name of file containing Parquet content -** data=TEXT Direct Parquet content. -** schema=SCHEMA Alternative Parquet schema. -** header=YES|NO First row of Parquet defines the names of -** columns if "yes". Default "no". -** columns=N Assume the Parquet file contains N columns. -** -** -** If schema= is omitted, then the columns are named "c0", "c1", "c2", -** and so forth. If columns=N is omitted, then the file is opened and -** the number of columns in the first row is counted to determine the -** column count. If header=YES, then the first row is skipped. -*/ static int parquetConnect( sqlite3 *db, void *pAux, - int argcOrig, const char *const*argv, + int argc, + const char *const*argv, sqlite3_vtab **ppVtab, char **pzErr ){ - unsigned int argc = argcOrig; - ParquetTable *pNew = 0; /* The ParquetTable object to construct */ - int bHeader = -1; /* header= flags. -1 means not seen yet */ - int rc = SQLITE_OK; /* Result code from this routine */ - unsigned int i, j; /* Loop counters */ - int nCol = -99; /* Value of the columns= parameter */ - CsvReader sRdr; /* A Parquet file reader used to store an error - ** message and/or to count the number of columns */ - static const char *azParam[] = { - "filename", "data", "schema", - }; - char *azPValue[3]; /* Parameter values */ -# define PARQUET_FILENAME (azPValue[0]) -# define PARQUET_DATA (azPValue[1]) -# define PARQUET_SCHEMA (azPValue[2]) + if(argc != 4 || strlen(argv[3]) < 2) { + *pzErr = sqlite3_mprintf("must provide exactly one argument, the path to a parquet file"); + return SQLITE_ERROR; + } + // Remove the delimiting single quotes + std::string fname = argv[3]; + fname = fname.substr(1, fname.length() - 2); + std::unique_ptr table(new ParquetTable(fname)); - assert( sizeof(azPValue)==sizeof(azParam) ); - memset(&sRdr, 0, sizeof(sRdr)); - memset(azPValue, 0, sizeof(azPValue)); - for(i=3; i=0 ){ - csv_errmsg(&sRdr, "more than one 'header' parameter"); - goto parquet_connect_error; - } - x = csv_boolean(zValue); - if( x==1 ){ - bHeader = 1; - }else if( x==0 ){ - bHeader = 0; - }else{ - csv_errmsg(&sRdr, "unrecognized argument to 'header': %s", zValue); - goto parquet_connect_error; - } - }else - if( (zValue = csv_parameter("columns",7,z))!=0 ){ - if( nCol>0 ){ - csv_errmsg(&sRdr, "more than one 'columns' parameter"); - goto parquet_connect_error; - } - nCol = atoi(zValue); - if( nCol<=0 ){ - csv_errmsg(&sRdr, "must have at least one column"); - goto parquet_connect_error; - } - }else - { - csv_errmsg(&sRdr, "unrecognized parameter '%s'", z); - goto parquet_connect_error; - } - } - if( (PARQUET_FILENAME==0)==(PARQUET_DATA==0) ){ - csv_errmsg(&sRdr, "must either filename= or data= but not both"); - goto parquet_connect_error; - } - if( nCol<=0 && csv_reader_open(&sRdr, PARQUET_FILENAME, PARQUET_DATA) ){ - goto parquet_connect_error; - } - pNew = (ParquetTable*)sqlite3_malloc( sizeof(*pNew) ); - *ppVtab = (sqlite3_vtab*)pNew; - if( pNew==0 ) goto parquet_connect_oom; - memset(pNew, 0, sizeof(*pNew)); - if( nCol>0 ){ - pNew->nCol = nCol; - }else{ - do{ - const char *z = csv_read_one_field(&sRdr); - if( z==0 ) goto parquet_connect_oom; - pNew->nCol++; - }while( sRdr.cTerm==',' ); - } - pNew->zFilename = PARQUET_FILENAME; PARQUET_FILENAME = 0; - pNew->zData = PARQUET_DATA; PARQUET_DATA = 0; - pNew->iStart = bHeader==1 ? ftell(sRdr.in) : 0; - csv_reader_reset(&sRdr); - if( PARQUET_SCHEMA==0 ){ - const char *zSep = EMPTY_STRING; - PARQUET_SCHEMA = sqlite3_mprintf("CREATE TABLE x("); - if( PARQUET_SCHEMA==0 ) goto parquet_connect_oom; - for(i=0; inCol; i++){ - PARQUET_SCHEMA = sqlite3_mprintf("%z%sc%d TEXT",PARQUET_SCHEMA, zSep, i); - zSep = ","; - } - PARQUET_SCHEMA = sqlite3_mprintf("%z);", PARQUET_SCHEMA); - } - rc = sqlite3_declare_vtab(db, PARQUET_SCHEMA); - if( rc ) goto parquet_connect_error; - for(i=0; i vtab( + (sqlite3_vtab_parquet*)sqlite3_malloc(sizeof(sqlite3_vtab_parquet)), + sqlite3_free); + memset(vtab.get(), 0, sizeof(*vtab.get())); + + try { + std::string create = table->CreateStatement(); + int rc = sqlite3_declare_vtab(db, create.data()); + if(rc) + return rc; + + } catch (const std::exception& e) { + *pzErr = sqlite3_mprintf(e.what()); + return SQLITE_ERROR; } + + vtab->table = table.release(); + *ppVtab = (sqlite3_vtab*)vtab.release(); return SQLITE_OK; - -parquet_connect_oom: - rc = SQLITE_NOMEM; - csv_errmsg(&sRdr, "out of memory"); - -parquet_connect_error: - if( pNew ) parquetDisconnect(&pNew->base); - for(i=0; ibase.pVtab; - unsigned int i; - for(i=0; inCol; i++){ - sqlite3_free(pCur->azVal[i]); - pCur->azVal[i] = 0; - pCur->aLen[i] = 0; - } } /* @@ -574,96 +117,89 @@ static int parquetCreate( } /* -** Destructor for a ParquetCursor. +** Destructor for a sqlite3_vtab_cursor_parquet. */ static int parquetClose(sqlite3_vtab_cursor *cur){ - ParquetCursor *pCur = (ParquetCursor*)cur; - parquetCursorRowReset(pCur); - csv_reader_reset(&pCur->rdr); + sqlite3_vtab_cursor_parquet* p = (sqlite3_vtab_cursor_parquet*)cur; + delete p->cursor; sqlite3_free(cur); return SQLITE_OK; } /* -** Constructor for a new ParquetTable cursor object. +** Constructor for a new sqlite3_vtab_parquet cursor object. */ static int parquetOpen(sqlite3_vtab *p, sqlite3_vtab_cursor **ppCursor){ - ParquetTable *pTab = (ParquetTable*)p; - ParquetCursor *pCur; - size_t nByte; - nByte = sizeof(*pCur) + (sizeof(char*)+sizeof(int))*pTab->nCol; - pCur = (ParquetCursor*)sqlite3_malloc64( nByte ); - if( pCur==0 ) return SQLITE_NOMEM; - memset(pCur, 0, nByte); - pCur->azVal = (char**)&pCur[1]; - pCur->aLen = (int*)&pCur->azVal[pTab->nCol]; - *ppCursor = &pCur->base; - if( csv_reader_open(&pCur->rdr, pTab->zFilename, pTab->zData) ){ - csv_xfer_error(pTab, &pCur->rdr); - return SQLITE_ERROR; - } + printf("xOpen\n"); + + std::unique_ptr cursor( + (sqlite3_vtab_cursor_parquet*)sqlite3_malloc(sizeof(sqlite3_vtab_cursor_parquet)), + sqlite3_free); + memset(cursor.get(), 0, sizeof(*cursor.get())); + + sqlite3_vtab_parquet* pParquet = (sqlite3_vtab_parquet*)p; + cursor->cursor = new ParquetCursor(pParquet->table); + + *ppCursor = (sqlite3_vtab_cursor*)cursor.release(); return SQLITE_OK; } /* -** Advance a ParquetCursor to its next row of input. +** Advance a sqlite3_vtab_cursor_parquet to its next row of input. ** Set the EOF marker if we reach the end of input. */ static int parquetNext(sqlite3_vtab_cursor *cur){ - ParquetCursor *pCur = (ParquetCursor*)cur; - ParquetTable *pTab = (ParquetTable*)cur->pVtab; - unsigned int i = 0; - const char *z; - do{ - z = csv_read_one_field(&pCur->rdr); - if( z==0 ){ - csv_xfer_error(pTab, &pCur->rdr); - break; - } - if( inCol ){ - if( pCur->aLen[i] < pCur->rdr.n+1 ){ - char *zNew = (char*)sqlite3_realloc64(pCur->azVal[i], pCur->rdr.n+1); - if( zNew==0 ){ - csv_errmsg(&pCur->rdr, "out of memory"); - csv_xfer_error(pTab, &pCur->rdr); - break; - } - pCur->azVal[i] = zNew; - pCur->aLen[i] = pCur->rdr.n+1; - } - memcpy(pCur->azVal[i], z, pCur->rdr.n+1); - i++; - } - }while( pCur->rdr.cTerm==',' ); - if( z==0 || (pCur->rdr.cTerm==EOF && inCol) ){ - pCur->iRowid = -1; - }else{ - pCur->iRowid++; - while( inCol ){ - sqlite3_free(pCur->azVal[i]); - pCur->azVal[i] = 0; - pCur->aLen[i] = 0; - i++; - } - } + ParquetCursor* cursor = ((sqlite3_vtab_cursor_parquet*)cur)->cursor; + cursor->next(); return SQLITE_OK; } /* -** Return values of columns for the row at which the ParquetCursor +** Return values of columns for the row at which the sqlite3_vtab_cursor_parquet ** is currently pointing. */ static int parquetColumn( sqlite3_vtab_cursor *cur, /* The cursor */ sqlite3_context *ctx, /* First argument to sqlite3_result_...() */ - int iOrig /* Which column to return */ + int col /* Which column to return */ ){ - unsigned int i = iOrig; - ParquetCursor *pCur = (ParquetCursor*)cur; - ParquetTable *pTab = (ParquetTable*)cur->pVtab; - if( i>=0 && inCol && pCur->azVal[i]!=0 ){ - sqlite3_result_text(ctx, pCur->azVal[i], -1, SQLITE_STATIC); + ParquetCursor *cursor = ((sqlite3_vtab_cursor_parquet*)cur)->cursor; + cursor->ensureColumn(col); + + if(cursor->isNull(col)) { + sqlite3_result_null(ctx); + } else { + switch(cursor->getPhysicalType(col)) { + case parquet::Type::INT32: + { + int rv = cursor->getInt(col); + sqlite3_result_int(ctx, rv); + } + break; + + case parquet::Type::DOUBLE: + { + double rv = cursor->getDouble(col); + sqlite3_result_double(ctx, rv); + } + break; + case parquet::Type::BYTE_ARRAY: + { + parquet::ByteArray* rv = cursor->getByteArray(col); + sqlite3_result_text(ctx, (const char*)rv->ptr, rv->len, SQLITE_TRANSIENT); + } + break; + + case parquet::Type::BOOLEAN: + case parquet::Type::INT64: + case parquet::Type::FLOAT: + case parquet::Type::INT96: + case parquet::Type::FIXED_LEN_BYTE_ARRAY: + default: + throw std::invalid_argument("cannot handle this type"); + break; + } } return SQLITE_OK; } @@ -672,8 +208,8 @@ static int parquetColumn( ** Return the rowid for the current row. */ static int parquetRowid(sqlite3_vtab_cursor *cur, sqlite_int64 *pRowid){ - ParquetCursor *pCur = (ParquetCursor*)cur; - *pRowid = pCur->iRowid; + ParquetCursor *cursor = ((sqlite3_vtab_cursor_parquet*)cur)->cursor; + *pRowid = cursor->getRowId(); return SQLITE_OK; } @@ -682,8 +218,10 @@ static int parquetRowid(sqlite3_vtab_cursor *cur, sqlite_int64 *pRowid){ ** row of output. */ static int parquetEof(sqlite3_vtab_cursor *cur){ - ParquetCursor *pCur = (ParquetCursor*)cur; - return pCur->iRowid<0; + ParquetCursor* cursor = ((sqlite3_vtab_cursor_parquet*)cur)->cursor; + if(cursor->eof()) + return 1; + return 0; } /* @@ -695,8 +233,11 @@ static int parquetFilter( int idxNum, const char *idxStr, int argc, sqlite3_value **argv ){ - ParquetCursor *pCur = (ParquetCursor*)pVtabCursor; - ParquetTable *pTab = (ParquetTable*)pVtabCursor->pVtab; + printf("xFilter\n"); + //sqlite3_vtab_cursor_parquet *pCur = (sqlite3_vtab_cursor_parquet*)pVtabCursor; + //sqlite3_vtab_parquet *pTab = (sqlite3_vtab_parquet*)pVtabCursor->pVtab; + + /* pCur->iRowid = 0; if( pCur->rdr.in==0 ){ assert( pCur->rdr.zIn==pTab->zData ); @@ -708,6 +249,7 @@ static int parquetFilter( pCur->rdr.iIn = 0; pCur->rdr.nIn = 0; } + */ return parquetNext(pVtabCursor); } diff --git a/parquet/parquet_cursor.cc b/parquet/parquet_cursor.cc new file mode 100644 index 0000000..1573abc --- /dev/null +++ b/parquet/parquet_cursor.cc @@ -0,0 +1,144 @@ +#include "parquet_cursor.h" + +ParquetCursor::ParquetCursor(ParquetTable* table) { + this->table = table; + this->rowId = -1; + // TODO: consider having a long lived handle in ParquetTable that can be borrowed + // without incurring the cost of opening the file from scratch twice + this->reader = parquet::ParquetFileReader::OpenFile(this->table->file.data()); + + this->rowGroupId = -1; + // TODO: handle the case where rowgroups have disjoint schemas? + // TODO: or at least, fail fast if detected + this->rowsLeftInRowGroup = 0; + + this->numRows = reader->metadata()->num_rows(); + this->numRowGroups = reader->metadata()->num_row_groups(); +} + +void ParquetCursor::nextRowGroup() { + // TODO: skip row groups that cannot satisfy the constraints + if(this->rowGroupId >= this->numRowGroups) + return; + + rowGroupId++; + rowGroupMetadata = this->reader->metadata()->RowGroup(0); + rowsLeftInRowGroup = rowGroupMetadata->num_rows(); + rowGroup = reader->RowGroup(rowGroupId); + for(unsigned int i = 0; i < scanners.size(); i++) + scanners[i] = NULL; + + while(types.size() < (unsigned int)rowGroupMetadata->num_columns()) { + types.push_back(rowGroupMetadata->schema()->Column(0)->physical_type()); + } + + for(unsigned int i = 0; i < (unsigned int)rowGroupMetadata->num_columns(); i++) { + types[i] = rowGroupMetadata->schema()->Column(i)->physical_type(); + } +} + +void ParquetCursor::next() { + if(rowsLeftInRowGroup == 0) + nextRowGroup(); + rowsLeftInRowGroup--; + rowId++; +} + +int ParquetCursor::getRowId() { + return rowId; +} + +bool ParquetCursor::eof() { + return rowId >= numRows; +} + +void ParquetCursor::ensureColumn(int col) { + // need to ensure a scanner exists (and skip the # of rows in the rowgroup) + while((unsigned int)col >= scanners.size()) { + scanners.push_back(std::shared_ptr()); + colRows.push_back(-1); + colNulls.push_back(false); + colIntValues.push_back(0); + colDoubleValues.push_back(0); + colByteArrayValues.push_back(parquet::ByteArray()); + } + + if(scanners[col].get() == NULL) { + std::shared_ptr colReader = rowGroup->Column(col); + scanners[col] = parquet::Scanner::Make(colReader); + // TODO: potentially skip rows if rowsLeftInRowGroup != rowGroupMetadata->num_rows() + } + + // Actually fetch a value, stash data in colRows, colNulls, colValues + if(colRows[col] != rowId) { + colRows[col] = rowId; + bool wasNull = false; + + switch(types[col]) { + case parquet::Type::INT32: + { + parquet::Int32Scanner* s = (parquet::Int32Scanner*)scanners[col].get(); + int rv = 0; + if(s->NextValue(&rv, &wasNull)) { + colIntValues[col] = rv; + } else { + throw std::invalid_argument("unexpectedly lacking a next value"); + } + } + break; + case parquet::Type::DOUBLE: + { + parquet::DoubleScanner* s = (parquet::DoubleScanner*)scanners[col].get(); + double rv = 0; + if(s->NextValue(&rv, &wasNull)) { + colDoubleValues[col] = rv; + } else { + throw std::invalid_argument("unexpectedly lacking a next value"); + } + } + break; + case parquet::Type::BYTE_ARRAY: + { + parquet::ByteArrayScanner* s = (parquet::ByteArrayScanner*)scanners[col].get(); + if(!s->NextValue(&colByteArrayValues[col], &wasNull)) { + throw std::invalid_argument("unexpectedly lacking a next value"); + } + } + break; + + case parquet::Type::BOOLEAN: + case parquet::Type::INT64: + case parquet::Type::FLOAT: + case parquet::Type::INT96: + case parquet::Type::FIXED_LEN_BYTE_ARRAY: + default: + throw std::invalid_argument("cannot handle"); + break; + } + + colNulls[col] = wasNull; + } +} + +bool ParquetCursor::isNull(int col) { + return colNulls[col]; +} + +int ParquetCursor::getInt(int col) { + return colIntValues[col]; +} + +double ParquetCursor::getDouble(int col) { + return colDoubleValues[col]; +} + +parquet::ByteArray* ParquetCursor::getByteArray(int col) { + return &colByteArrayValues[col]; +} + + + +parquet::Type::type ParquetCursor::getPhysicalType(int col) { +// return rowGroupMetadata->schema()->Column(col)->physical_type(); + return types[col]; +} diff --git a/parquet/parquet_cursor.h b/parquet/parquet_cursor.h new file mode 100644 index 0000000..b12b96b --- /dev/null +++ b/parquet/parquet_cursor.h @@ -0,0 +1,52 @@ +#ifndef PARQUET_CURSOR_H +#define PARQUET_CURSOR_H + +#include "parquet_table.h" +#include "parquet/api/reader.h" + +class ParquetCursor { + + ParquetTable* table; + std::unique_ptr reader; + std::unique_ptr rowGroupMetadata; + std::shared_ptr rowGroup; + std::vector> scanners; + std::vector types; + + std::vector colRows; + std::vector colNulls; + std::vector colIntValues; + std::vector colDoubleValues; + std::vector colByteArrayValues; + + int rowId; + int rowGroupId; + int numRows; + int numRowGroups; + int rowsLeftInRowGroup; + + void nextRowGroup(); + +public: + ParquetCursor(ParquetTable* table); + int getRowId(); + void next(); + bool eof(); + + void ensureColumn(int col); + bool isNull(int col); + int getInt(int col); + double getDouble(int col); + parquet::ByteArray* getByteArray(int col); + parquet::Type::type getPhysicalType(int col); + /* + sqlite3_result_double() + sqlite3_result_int() + sqlite3_result_int64() + sqlite3_result_null() + sqlite3_result_text() + */ +}; + +#endif + diff --git a/parquet/parquet_table.cc b/parquet/parquet_table.cc new file mode 100644 index 0000000..de69961 --- /dev/null +++ b/parquet/parquet_table.cc @@ -0,0 +1,84 @@ +#include "parquet_table.h" + +#include "parquet/api/reader.h" + +ParquetTable::ParquetTable(std::string file) { + this->file = file; +} + +std::string ParquetTable::CreateStatement() { + std::unique_ptr reader = parquet::ParquetFileReader::OpenFile(file.data()); + // TODO: parse columns from file + std::string text("CREATE TABLE x("); + auto schema = reader->metadata()->schema(); + printf("num cols: %d\n", schema->num_columns()); + for(auto i = 0; i < schema->num_columns(); i++) { + auto _col = schema->GetColumnRoot(i); + + if(!_col->is_primitive()) { + throw std::invalid_argument("parquet file has non-primitive column"); + } + + if(_col->is_repeated()) { + throw std::invalid_argument("parquet file has non-scalar column"); + } + + parquet::schema::PrimitiveNode* col = (parquet::schema::PrimitiveNode*)_col; + + printf("col %d[p=%d:%s, l=%d:%s] is %s\n", + i, + col->physical_type(), + parquet::TypeToString(col->physical_type()).data(), + col->logical_type(), + parquet::LogicalTypeToString(col->logical_type()).data(), + col->name().data()); + + if(i > 0) + text += ", "; + + text += col->name(); + + std::string type; + switch(col->physical_type()) { + case parquet::Type::BOOLEAN: + type = "TINYINT"; + break; + case parquet::Type::INT32: + if(col->logical_type() == parquet::LogicalType::NONE) { + type = "INT"; + } else if(col->logical_type() == parquet::LogicalType::INT_8) { + type = "TINYINT"; + } else if(col->logical_type() == parquet::LogicalType::INT_16) { + type = "SMALLINT"; + } + break; + case parquet::Type::INT64: + type = "BIGINT"; + break; + case parquet::Type::FLOAT: + type = "REAL"; + break; + case parquet::Type::DOUBLE: + type = "DOUBLE"; + break; + case parquet::Type::BYTE_ARRAY: + if(col->logical_type() == parquet::LogicalType::UTF8) { + type = "TEXT"; + } + break; + case parquet::Type::FIXED_LEN_BYTE_ARRAY: + case parquet::Type::INT96: + default: + break; + } + + if(type.empty()) { + throw std::invalid_argument("unsupported type"); + } + printf("...%s\n", type.data()); + text += " "; + text += type; + } + text +=");"; + return text; +} diff --git a/parquet/parquet_table.h b/parquet/parquet_table.h new file mode 100644 index 0000000..fb55e10 --- /dev/null +++ b/parquet/parquet_table.h @@ -0,0 +1,14 @@ +#ifndef PARQUET_TABLE_H +#define PARQUET_TABLE_H + +#include + +class ParquetTable { +public: + ParquetTable(std::string file); + std::string CreateStatement(); + std::string file; + +}; + +#endif