added ps_finish() to write the remainder out (if any), split writing into a couple of helpers

This commit is contained in:
TLINDEN
2014-02-22 19:58:56 +01:00
parent ab142a47b6
commit 272f7b3dfb
2 changed files with 94 additions and 42 deletions

View File

@@ -146,6 +146,19 @@ size_t ps_read(Pcpstream *stream, void *buf, size_t readbytes);
*/ */
size_t ps_write(Pcpstream *stream, void *buf, size_t writebytes); size_t ps_write(Pcpstream *stream, void *buf, size_t writebytes);
/** Finalize writing the stream.
You NEED to call this function after you're done writing
to the stream if you enabled Z85 armoring using ps_armor().
It writes the remaining bytes (encoded) to the stream destination.
\param[in] stream The input stream to write to.
\return Returns the number of bytes written. in case of errors it returns 0.
*/
size_t ps_finish(Pcpstream *stream);
/** Write a formatted string to the stream. /** Write a formatted string to the stream.
Use an printf() style format string to print something out Use an printf() style format string to print something out
@@ -220,8 +233,25 @@ int ps_end(Pcpstream *stream);
int ps_err(Pcpstream *stream); int ps_err(Pcpstream *stream);
/** Enable auto Z85 encoding detection for an input stream.
If you're not sure if your input is Z85 encoded, enable
detection.
\param[in] stream The stream object.
\param[in] blocksize The blocksize to for Z85 decoding (if encoded).
*/
void ps_setdetermine(Pcpstream *stream, size_t blocksize); void ps_setdetermine(Pcpstream *stream, size_t blocksize);
void ps_armor(Pcpstream *stream);
/** Enable Z85 encoding for an output stream.
\param[in] stream The stream object.
\param[in] blocksize The blocksize to for Z85 encoding.
*/
void ps_armor(Pcpstream *stream, size_t blocksize);
/* read from primary source, decode z85 and out into cache. /* read from primary source, decode z85 and out into cache.
if buf != NULL, consider it as the start of encoded data if buf != NULL, consider it as the start of encoded data
@@ -239,8 +269,15 @@ size_t ps_read_next(Pcpstream *stream);
fetch (and decode) the next chunk, append it to cache and return from fetch (and decode) the next chunk, append it to cache and return from
that */ that */
size_t ps_read_cached(Pcpstream *stream, void *buf, size_t readbytes); size_t ps_read_cached(Pcpstream *stream, void *buf, size_t readbytes);
/* really read from the source */
size_t ps_read_raw(Pcpstream *stream, void *buf, size_t readbytes); size_t ps_read_raw(Pcpstream *stream, void *buf, size_t readbytes);
/* helper, Z85 encodes current cache into the dst buffer */
void ps_write_encode(Pcpstream *stream, Buffer *dst);
/* really write the buffer z into the output stream */
size_t ps_write_buf(Pcpstream *stream, Buffer *z);
#endif // HAVE_PCP_PCPSTEAM_H #endif // HAVE_PCP_PCPSTEAM_H

View File

@@ -59,8 +59,9 @@ void ps_setdetermine(Pcpstream *stream, size_t blocksize) {
stream->blocksize = blocksize; stream->blocksize = blocksize;
} }
void ps_armor(Pcpstream *stream) { void ps_armor(Pcpstream *stream, size_t blocksize) {
stream->armor = 1; stream->armor = 1;
stream->blocksize = blocksize;
} }
size_t ps_read_raw(Pcpstream *stream, void *buf, size_t readbytes) { size_t ps_read_raw(Pcpstream *stream, void *buf, size_t readbytes) {
@@ -261,31 +262,8 @@ size_t ps_write(Pcpstream *stream, void *buf, size_t writebytes) {
buffer_add(stream->cache, buf, writebytes); buffer_add(stream->cache, buf, writebytes);
} }
/* do z85 0 padding, manually */ /* encode the cache into z */
if(buffer_size(stream->cache) % 4 != 0) { ps_write_encode(stream, z);
size_t outlen = buffer_size(stream->cache);
while (outlen % 4 != 0)
buffer_add8(stream->cache, 0);
}
size_t zlen, i, pos;
zlen = (buffer_size(stream->cache) * 5 / 4);
char *z85 = ucmalloc(zlen);
zmq_z85_encode(z85, buffer_get(stream->cache), buffer_size(stream->cache));
pos = stream->linewr;
for(i=0; i<zlen; ++i) {
if(pos >= 71) {
buffer_add8(z, '\r');
buffer_add8(z, '\n');
pos = 1;
}
else
pos++;
buffer_add8(z, z85[i]);
}
stream->linewr = pos;
buffer_clear(stream->cache); buffer_clear(stream->cache);
if(aside != NULL) { if(aside != NULL) {
@@ -299,6 +277,49 @@ size_t ps_write(Pcpstream *stream, void *buf, size_t writebytes) {
buffer_add(z, buf, writebytes); buffer_add(z, buf, writebytes);
} }
size_t outsize = ps_write_buf(stream, z);
buffer_free(z);
return outsize;
}
void ps_write_encode(Pcpstream *stream, Buffer *dst) {
size_t zlen, i, pos;
/* do z85 0 padding, manually */
if(buffer_size(stream->cache) % 4 != 0) {
size_t outlen = buffer_size(stream->cache);
while (outlen % 4 != 0)
buffer_add8(stream->cache, 0);
}
/* z85 encode */
zlen = (buffer_size(stream->cache) * 5 / 4);
char *z85 = ucmalloc(zlen);
zmq_z85_encode(z85, buffer_get(stream->cache), buffer_size(stream->cache));
/* add newlines */
pos = stream->linewr;
for(i=0; i<zlen; ++i) {
if(pos >= 71) {
buffer_add8(dst, '\r');
buffer_add8(dst, '\n');
pos = 1;
}
else
pos++;
buffer_add8(dst, z85[i]);
}
/* remember where to start next */
stream->linewr = pos;
}
size_t ps_write_buf(Pcpstream *stream, Buffer *z) {
size_t writebytes;
if(stream->is_buffer) { if(stream->is_buffer) {
buffer_add(stream->b, buffer_get(z), buffer_size(z)); buffer_add(stream->b, buffer_get(z), buffer_size(z));
writebytes = buffer_size(z); writebytes = buffer_size(z);
@@ -311,24 +332,18 @@ size_t ps_write(Pcpstream *stream, void *buf, size_t writebytes) {
} }
} }
buffer_free(z);
return writebytes; return writebytes;
} }
size_t ps_writeOLD(Pcpstream *stream, void *buf, size_t writebytes) { size_t ps_finish(Pcpstream *stream) {
size_t donebytes = 0; size_t outsize = 0;
if(buffer_left(stream->cache) > 0) {
if(stream->is_buffer) { Buffer *z = buffer_new(32, "Pcpwritetemp");
buffer_add(stream->b, buf, writebytes); ps_write_encode(stream, z);
donebytes = writebytes; outsize = ps_write_buf(stream, z);
buffer_free(z);
} }
else { return outsize;
donebytes = fwrite(buf, 1, writebytes, stream->fd);
if(ferror(stream->fd) != 0 || donebytes < writebytes)
stream->err = 1;
}
return writebytes;
} }
size_t ps_print(Pcpstream *stream, const char * fmt, ...) { size_t ps_print(Pcpstream *stream, const char * fmt, ...) {