diff --git a/include/pcp/pcpstream.h b/include/pcp/pcpstream.h index 59b2d8a..cc09f6f 100644 --- a/include/pcp/pcpstream.h +++ b/include/pcp/pcpstream.h @@ -28,6 +28,7 @@ #include "util.h" #include "defines.h" #include "buffer.h" +#include "z85.h" /** * \defgroup Pcpstream PCPSTREAMS @@ -58,9 +59,16 @@ struct _pcp_stream_t { FILE *fd; /**< The backend FILE stream */ Buffer *b; /**< The backend Buffer object */ + Buffer *cache; /**< The caching Buffer object (for look ahead read) */ + Buffer *next; /**< The caching Next-Buffer object (for look ahead read) */ uint8_t is_buffer; /**< Set to 1 if the backend is a Buffer */ uint8_t eof; /**< Set to 1 if EOF reached */ uint8_t err; /**< Set to 1 if an error occured */ + uint8_t armor; /**< Set to 1 if Z85 en/de-coding is requested */ + uint8_t determine; /**< Set to 1 to automatically determine armor mode */ + uint8_t firstread; /**< Internal flag, will be set after first read() */ + size_t linewr; /**< Used for Z85 writing, number of chars written on last line */ + size_t blocksize; /**< Blocksize used for z85, if requested */ }; /** The name used everywhere */ @@ -212,6 +220,28 @@ int ps_end(Pcpstream *stream); int ps_err(Pcpstream *stream); +void ps_setdetermine(Pcpstream *stream, size_t blocksize); +void ps_armor(Pcpstream *stream); + +/* read from primary source, decode z85 and out into cache. + if buf != NULL, consider it as the start of encoded data + and remove headers and comments, then continue as normal. */ +size_t ps_read_decode(Pcpstream *stream, Buffer *cache, void *buf, size_t bufsize); + +/* determine if primary source is z85 encoded, put the data + read from it into the cache */ +void ps_determine(Pcpstream *stream); + +/* read and decode the next chunk and put it into stream->next */ +size_t ps_read_next(Pcpstream *stream); + +/* return readbytes from cache. if it is more than left in the cache + fetch (and decode) the next chunk, append it to cache and return from + that */ +size_t ps_read_cached(Pcpstream *stream, void *buf, size_t readbytes); +size_t ps_read_raw(Pcpstream *stream, void *buf, size_t readbytes); + + #endif // HAVE_PCP_PCPSTEAM_H diff --git a/libpcp/pcpstream.c b/libpcp/pcpstream.c index 436919a..66dee48 100644 --- a/libpcp/pcpstream.c +++ b/libpcp/pcpstream.c @@ -24,10 +24,13 @@ Pcpstream *ps_init(void) { Pcpstream *stream = ucmalloc(sizeof(Pcpstream)); stream->b = NULL; + stream->cache = buffer_new(32, "Pcpstreamcache"); stream->fd = NULL; stream->is_buffer = 0; stream->eof = 0; stream->err = 0; + stream->armor = 0; + stream->determine = 0; return stream; } @@ -46,12 +49,21 @@ Pcpstream *ps_new_inbuffer(Buffer *b) { Pcpstream *ps_new_outbuffer() { Pcpstream *stream = ps_init(); - stream->b = buffer_new(32, "Pcpstream"); + stream->b = buffer_new(32, "Pcpstreamoutbuf"); stream->is_buffer = 1; return stream; } -size_t ps_read(Pcpstream *stream, void *buf, size_t readbytes) { +void ps_setdetermine(Pcpstream *stream, size_t blocksize) { + stream->determine = 1; + stream->blocksize = blocksize; +} + +void ps_armor(Pcpstream *stream) { + stream->armor = 1; +} + +size_t ps_read_raw(Pcpstream *stream, void *buf, size_t readbytes) { size_t gotbytes = 0; if(stream->is_buffer) { @@ -76,10 +88,234 @@ size_t ps_read(Pcpstream *stream, void *buf, size_t readbytes) { } } + stream->firstread = 1; return gotbytes; } +/* return readbytes from cache. if it is more than left in the cache + fetch (and decode) the next chunk, append it to cache and return from + that */ +size_t ps_read_cached(Pcpstream *stream, void *buf, size_t readbytes) { + if(buffer_left(stream->cache) <= readbytes) { + /* enough left in current cache */ + return buffer_get_chunk(stream->cache, buf, readbytes); + } + else { + /* not enough, fetch the next chunk */ + ps_read_next(stream); + + /* determine overlapping bytes */ + size_t overlap = readbytes - buffer_left(stream->cache); + + /* fetch the rest from the cache */ + size_t fromcache = buffer_get_chunk(stream->cache, buf, buffer_left(stream->cache)); + + /* fetch the overlap from next, append to buf */ + if(overlap > buffer_left(stream->next)) + overlap = buffer_left(stream->next); + buffer_get_chunk(stream->next, buf+overlap, overlap); + + /* move the rest of stream->next into cache */ + buffer_clear(stream->cache); + void *rest = buffer_get_remainder(stream->next); + buffer_add(stream->cache, rest, buffer_left(stream->next)); + free(rest); + + /* reset next */ + buffer_clear(stream->next); + + return fromcache + overlap; + } +} + +/* read and decode the next chunk and put it into stream->next */ +size_t ps_read_next(Pcpstream *stream) { + if(stream->armor == 1) { + /* fetch next chunk and decode it */ + return ps_read_decode(stream, stream->next, NULL, 0); + } + else { + /* unencoded source, fetch as is */ + void *buf = ucmalloc(stream->blocksize); + size_t got = ps_read_raw(stream, buf, stream->blocksize); + buffer_add(stream->next, buf, got); + return got; + } +} + +size_t ps_read(Pcpstream *stream, void *buf, size_t readbytes) { + if(buffer_size(stream->cache) > 0) { + /* use cache */ + return ps_read_cached(stream, buf, readbytes); + } + else { + /* no cache yet */ + if(stream->determine == 1 && stream->firstread == 0) { + /* fetch the first chunk into the cache and decode, if required, + recursively call ps_read() again to return the apropriate data */ + ps_determine(stream); + return ps_read(stream, buf, readbytes); + } + else if(stream->armor == 1) { + /* z85 encoding has already been determined, therefore the cache + is now filled, use it then */ + return ps_read_cached(stream, buf, readbytes); + } + else { + /* read directly from source */ + return ps_read_raw(stream, buf, readbytes); + } + } +} + +void ps_determine(Pcpstream *stream) { + /* read a raw chunk from source */ + void *buf = ucmalloc(stream->blocksize); + size_t got = ps_read_raw(stream, buf, stream->blocksize); + + /* check if it's binary or not */ + if(_buffer_is_binary(buf, got) != 0) { + /* no, it's armored */ + stream->armor = 1; + ps_read_decode(stream, stream->cache, buf, got); + } + else { + /* just put the raw stuff into the cache */ + buffer_add(stream->cache, buf, got); + } +} + +size_t ps_read_decode(Pcpstream *stream, Buffer *cache, void *buf, size_t bufsize) { + size_t zdiff = 1; + size_t i = 0; + Buffer *z = buffer_new(32, "ztemp"); + + if(bufsize > 0) { + /* remove newlines, comments and headers, if any */ + char *z85 = pcp_readz85string(buf, bufsize); + buffer_add(z, z85, strlen(z85)); + + /* check if we need to read more in order to get a full block */ + zdiff = stream->blocksize - strlen(z85); + i = strlen(z85); + free(z85); + } + + if(zdiff > 0) { + /* read in bytewise, ignore newlines and add until the block is full */ + uint8_t c; + while (i < stream->blocksize) { + if (ps_read_raw(stream, &c, 1) == 1) { + if(c != '\r' && c != '\n') { + buffer_add8(z, c); + i++; + } + } + else + break; + } + } + + /* finally, decode it and put into cache */ + size_t binlen, outlen; + unsigned char *bin = pcp_z85_decode(buffer_get_str(z), &binlen); + if(bin == NULL) { + /* it's not z85 encoded, so threat it as binary */ + stream->armor = 1; + buffer_add_buf(stream->cache, z); + outlen = buffer_size(stream->cache); + } + else { + /* yes, successfully decoded it, put into cache */ + buffer_add(stream->cache, bin, binlen); + outlen = binlen; + } + + buffer_free(z); + + return outlen; +} + size_t ps_write(Pcpstream *stream, void *buf, size_t writebytes) { + Buffer *z = buffer_new(32, "Pcpwritetemp"); + + if(stream->armor == 1) { + if(buffer_size(stream->cache) + writebytes < stream->blocksize) { + /* just put it into the cache and done with it */ + buffer_add(stream->cache, buf, writebytes); + } + else { + /* z85 encode cache+buf */ + + /* check if there's an overlap, if yes, put it aside for the moment */ + void *aside = NULL; + size_t overlap = (buffer_size(stream->cache) + writebytes) - stream->blocksize; + if(overlap > 0) { + /* yes, store the overlap, put the left part into the cache */ + aside = ucmalloc(overlap); + memcpy(aside, buf + (writebytes - overlap), overlap); /* FIXME: check if this works */ + buffer_add(stream->cache, buf, writebytes - overlap); + } + else { + /* cache+buf == blocksize */ + buffer_add(stream->cache, buf, writebytes); + } + + /* do z85 0 padding, manually */ + if(buffer_size(stream->cache) % 4 != 0) { + size_t outlen = buffer_size(stream->cache); + while (outlen % 4 != 0) + buffer_add8(stream->cache, 0); + } + + size_t zlen, i, pos; + zlen = (buffer_size(stream->cache) * 5 / 4); + char *z85 = ucmalloc(zlen); + + zmq_z85_encode(z85, buffer_get(stream->cache), buffer_size(stream->cache)); + + pos = stream->linewr; + for(i=0; i= 71) { + buffer_add8(z, '\r'); + buffer_add8(z, '\n'); + pos = 1; + } + else + pos++; + buffer_add8(z, z85[i]); + } + stream->linewr = pos; + + buffer_clear(stream->cache); + if(aside != NULL) { + /* there is an overlapping rest, put it into the cache + FIXME: write it on calling ps_close() or ad some ps_finish() function */ + buffer_add(stream->cache, aside, overlap); + } + } + } + else { + buffer_add(z, buf, writebytes); + } + + if(stream->is_buffer) { + buffer_add(stream->b, buffer_get(z), buffer_size(z)); + writebytes = buffer_size(z); + } + else { + writebytes = fwrite(buffer_get(z), 1, buffer_size(z), stream->fd); + if(ferror(stream->fd) != 0 || writebytes < buffer_size(z)) { + stream->err = 1; + writebytes = 0; + } + } + + buffer_free(z); + return writebytes; +} + +size_t ps_writeOLD(Pcpstream *stream, void *buf, size_t writebytes) { size_t donebytes = 0; if(stream->is_buffer) {