diff --git a/include/pcp/pcpstream.h b/include/pcp/pcpstream.h index cc09f6f..debed1a 100644 --- a/include/pcp/pcpstream.h +++ b/include/pcp/pcpstream.h @@ -146,6 +146,19 @@ size_t ps_read(Pcpstream *stream, void *buf, size_t readbytes); */ size_t ps_write(Pcpstream *stream, void *buf, size_t writebytes); +/** Finalize writing the stream. + + You NEED to call this function after you're done writing + to the stream if you enabled Z85 armoring using ps_armor(). + + It writes the remaining bytes (encoded) to the stream destination. + + \param[in] stream The input stream to write to. + + \return Returns the number of bytes written. in case of errors it returns 0. + */ +size_t ps_finish(Pcpstream *stream); + /** Write a formatted string to the stream. Use an printf() style format string to print something out @@ -220,8 +233,25 @@ int ps_end(Pcpstream *stream); int ps_err(Pcpstream *stream); +/** Enable auto Z85 encoding detection for an input stream. + + If you're not sure if your input is Z85 encoded, enable + detection. + + \param[in] stream The stream object. + + \param[in] blocksize The blocksize to for Z85 decoding (if encoded). + */ void ps_setdetermine(Pcpstream *stream, size_t blocksize); -void ps_armor(Pcpstream *stream); + + +/** Enable Z85 encoding for an output stream. + + \param[in] stream The stream object. + + \param[in] blocksize The blocksize to for Z85 encoding. + */ +void ps_armor(Pcpstream *stream, size_t blocksize); /* read from primary source, decode z85 and out into cache. if buf != NULL, consider it as the start of encoded data @@ -239,8 +269,15 @@ size_t ps_read_next(Pcpstream *stream); fetch (and decode) the next chunk, append it to cache and return from that */ size_t ps_read_cached(Pcpstream *stream, void *buf, size_t readbytes); + +/* really read from the source */ size_t ps_read_raw(Pcpstream *stream, void *buf, size_t readbytes); +/* helper, Z85 encodes current cache into the dst buffer */ +void ps_write_encode(Pcpstream *stream, Buffer *dst); + +/* really write the buffer z into the output stream */ +size_t ps_write_buf(Pcpstream *stream, Buffer *z); #endif // HAVE_PCP_PCPSTEAM_H diff --git a/libpcp/pcpstream.c b/libpcp/pcpstream.c index 66dee48..eefbce6 100644 --- a/libpcp/pcpstream.c +++ b/libpcp/pcpstream.c @@ -59,8 +59,9 @@ void ps_setdetermine(Pcpstream *stream, size_t blocksize) { stream->blocksize = blocksize; } -void ps_armor(Pcpstream *stream) { +void ps_armor(Pcpstream *stream, size_t blocksize) { stream->armor = 1; + stream->blocksize = blocksize; } size_t ps_read_raw(Pcpstream *stream, void *buf, size_t readbytes) { @@ -261,31 +262,8 @@ size_t ps_write(Pcpstream *stream, void *buf, size_t writebytes) { buffer_add(stream->cache, buf, writebytes); } - /* do z85 0 padding, manually */ - if(buffer_size(stream->cache) % 4 != 0) { - size_t outlen = buffer_size(stream->cache); - while (outlen % 4 != 0) - buffer_add8(stream->cache, 0); - } - - size_t zlen, i, pos; - zlen = (buffer_size(stream->cache) * 5 / 4); - char *z85 = ucmalloc(zlen); - - zmq_z85_encode(z85, buffer_get(stream->cache), buffer_size(stream->cache)); - - pos = stream->linewr; - for(i=0; i= 71) { - buffer_add8(z, '\r'); - buffer_add8(z, '\n'); - pos = 1; - } - else - pos++; - buffer_add8(z, z85[i]); - } - stream->linewr = pos; + /* encode the cache into z */ + ps_write_encode(stream, z); buffer_clear(stream->cache); if(aside != NULL) { @@ -299,6 +277,49 @@ size_t ps_write(Pcpstream *stream, void *buf, size_t writebytes) { buffer_add(z, buf, writebytes); } + size_t outsize = ps_write_buf(stream, z); + + buffer_free(z); + + return outsize; +} + +void ps_write_encode(Pcpstream *stream, Buffer *dst) { + size_t zlen, i, pos; + + /* do z85 0 padding, manually */ + if(buffer_size(stream->cache) % 4 != 0) { + size_t outlen = buffer_size(stream->cache); + while (outlen % 4 != 0) + buffer_add8(stream->cache, 0); + } + + /* z85 encode */ + zlen = (buffer_size(stream->cache) * 5 / 4); + char *z85 = ucmalloc(zlen); + + zmq_z85_encode(z85, buffer_get(stream->cache), buffer_size(stream->cache)); + + /* add newlines */ + pos = stream->linewr; + for(i=0; i= 71) { + buffer_add8(dst, '\r'); + buffer_add8(dst, '\n'); + pos = 1; + } + else + pos++; + buffer_add8(dst, z85[i]); + } + + /* remember where to start next */ + stream->linewr = pos; +} + +size_t ps_write_buf(Pcpstream *stream, Buffer *z) { + size_t writebytes; + if(stream->is_buffer) { buffer_add(stream->b, buffer_get(z), buffer_size(z)); writebytes = buffer_size(z); @@ -311,24 +332,18 @@ size_t ps_write(Pcpstream *stream, void *buf, size_t writebytes) { } } - buffer_free(z); return writebytes; } -size_t ps_writeOLD(Pcpstream *stream, void *buf, size_t writebytes) { - size_t donebytes = 0; - - if(stream->is_buffer) { - buffer_add(stream->b, buf, writebytes); - donebytes = writebytes; - } - else { - donebytes = fwrite(buf, 1, writebytes, stream->fd); - if(ferror(stream->fd) != 0 || donebytes < writebytes) - stream->err = 1; - } - - return writebytes; +size_t ps_finish(Pcpstream *stream) { + size_t outsize = 0; + if(buffer_left(stream->cache) > 0) { + Buffer *z = buffer_new(32, "Pcpwritetemp"); + ps_write_encode(stream, z); + outsize = ps_write_buf(stream, z); + buffer_free(z); + } + return outsize; } size_t ps_print(Pcpstream *stream, const char * fmt, ...) {