finally fixed all stream related problems, z85 transparent en/decoding works, unittests all ok.

This commit is contained in:
git@daemon.de
2014-02-27 13:55:43 +01:00
parent c11ce76d21
commit 97f4d14d3b
17 changed files with 209 additions and 68 deletions

View File

@@ -60,8 +60,8 @@ void ps_setdetermine(Pcpstream *stream, size_t blocksize) {
stream->determine = 1;
stream->blocksize = blocksize + (5 - (blocksize % 5));
if(stream->cache == NULL) {
stream->cache = buffer_new(32, "Pcpstreamcache");
stream->next = buffer_new(32, "Pcpstreamcachenext");
stream->cache = buffer_new(32, "Pcpstreamcachedetermine");
stream->next = buffer_new(32, "Pcpstreamcachenextdetermin");
}
}
@@ -75,6 +75,10 @@ void ps_armor(Pcpstream *stream, size_t blocksize) {
}
}
void ps_unarmor(Pcpstream *stream) {
stream->armor = 0;
}
size_t ps_read_raw(Pcpstream *stream, void *buf, size_t readbytes) {
size_t gotbytes = 0;
@@ -107,18 +111,28 @@ size_t ps_read_raw(Pcpstream *stream, void *buf, size_t readbytes) {
/* return readbytes from cache. if it is more than left in the cache
fetch (and decode) the next chunk, append it to cache and return from
that */
size_t ps_read_cached(Pcpstream *stream, void *buf, size_t readbytes) {
if(buffer_left(stream->cache) <= readbytes && buffer_left(stream->cache) > 0 && readbytes <= stream->blocksize) {
/*
fprintf(stderr, "%ld <= %ld && %ld <= %ld\n",
readbytes, buffer_left(stream->cache), readbytes, stream->blocksize) ;
fprintf(stderr, "%d == 1 && %ld >= %ld\n", ps_left(stream), readbytes, buffer_left(stream->cache));
*/
if(readbytes <= buffer_left(stream->cache) && readbytes <= stream->blocksize) {
/* enough left in current cache */
return buffer_get_chunk(stream->cache, buf, readbytes);
}
else if(ps_left(stream) == 1 && readbytes >= buffer_left(stream->cache)) {
return buffer_get_chunk(stream->cache, buf, buffer_left(stream->cache));
}
else {
/* request for chunk larger than what we've got in the cache */
Buffer *tmp = buffer_new(stream->blocksize, "Pcpreadover");
Buffer *tmp = buffer_new(stream->blocksize, "Pcpreadchunktmp");
if( buffer_left(stream->cache) > 0) {
/* put the remaining cache into dest */
buffer_get_chunk_tobuf(stream->cache, tmp, buffer_size(stream->cache));
buffer_get_chunk_tobuf(stream->cache, tmp, buffer_left(stream->cache));
}
/* how much left to fetch */
@@ -170,26 +184,31 @@ size_t ps_read_cached(Pcpstream *stream, void *buf, size_t readbytes) {
/* read and decode the next chunk and put it into stream->next */
size_t ps_read_next(Pcpstream *stream) {
if(stream->armor == 1) {
/* fetch next chunk and decode it */
return ps_read_decode(stream, NULL, 0);
}
else {
/* unencoded source, fetch as is */
void *buf = ucmalloc(stream->blocksize);
size_t got = ps_read_raw(stream, buf, stream->blocksize);
buffer_add(stream->next, buf, got);
return got;
if(ps_left(stream) == 0) {
if(stream->armor == 1) {
/* fetch next chunk and decode it */
return ps_read_decode(stream, NULL, 0);
}
else {
/* unencoded source, fetch as is */
void *buf = ucmalloc(stream->blocksize);
size_t got = ps_read_raw(stream, buf, stream->blocksize);
buffer_add(stream->next, buf, got);
return got;
}
}
else
return 0;
}
size_t ps_read(Pcpstream *stream, void *buf, size_t readbytes) {
size_t got = 0;
if(stream->cache == NULL) {
return ps_read_raw(stream, buf, readbytes);
got = ps_read_raw(stream, buf, readbytes);
}
else if(buffer_size(stream->cache) > 0) {
/* use cache */
return ps_read_cached(stream, buf, readbytes);
got = ps_read_cached(stream, buf, readbytes);
}
else {
/* no cache yet */
@@ -197,18 +216,22 @@ size_t ps_read(Pcpstream *stream, void *buf, size_t readbytes) {
/* fetch the first chunk into the cache and decode, if required,
recursively call ps_read() again to return the apropriate data */
ps_determine(stream);
return ps_read(stream, buf, readbytes);
got = ps_read(stream, buf, readbytes);
}
else if(stream->armor == 1) {
/* z85 encoding has already been determined, therefore the cache
is now filled, use it then */
return ps_read_cached(stream, buf, readbytes);
got = ps_read_cached(stream, buf, readbytes);
}
else {
/* read directly from source */
return ps_read_raw(stream, buf, readbytes);
got = ps_read_raw(stream, buf, readbytes);
}
}
stream->pos += got;
return got;
}
void ps_determine(Pcpstream *stream) {
@@ -285,6 +308,8 @@ size_t ps_read_decode(Pcpstream *stream, void *buf, size_t bufsize) {
size_t ps_write(Pcpstream *stream, void *buf, size_t writebytes) {
Buffer *z = buffer_new(32, "Pcpwritetemp");
stream->is_output = 1;
if(stream->armor == 1) {
if(buffer_size(stream->cache) + writebytes < stream->blocksize) {
/* just put it into the cache and done with it */
@@ -350,13 +375,16 @@ size_t ps_write(Pcpstream *stream, void *buf, size_t writebytes) {
/* actually put it out */
size_t outsize = ps_write_buf(stream, z);
buffer_free(z);
return outsize;
writebytes = outsize;
}
else {
/* buf has been put into the cache only, no writing required */
buffer_free(z);
return writebytes;
}
stream->pos += writebytes;
return writebytes;
}
void ps_write_encode(Pcpstream *stream, Buffer *dst) {
@@ -372,14 +400,14 @@ void ps_write_encode(Pcpstream *stream, Buffer *dst) {
}
/* z85 encode */
zlen = (buffer_size(stream->cache) * 5 / 4);
zlen = (buffer_size(stream->cache) * 5 / 4) + 1;
char *z85 = ucmalloc(zlen);
zmq_z85_encode(z85, buffer_get(stream->cache), buffer_size(stream->cache));
/* add newlines */
pos = stream->linewr;
for(i=0; i<zlen; ++i) {
for(i=0; i<zlen-1; ++i) {
if(pos >= 71) {
buffer_add8(dst, '\r');
buffer_add8(dst, '\n');
@@ -445,7 +473,11 @@ size_t ps_print(Pcpstream *stream, const char * fmt, ...) {
void ps_close(Pcpstream *stream) {
if(stream->cache != NULL) {
assert(buffer_left(stream->cache) == 0); /* there's something left in the cache, call ps_finish() */
if(stream->is_output == 1) {
if(buffer_left(stream->cache) != 0)
buffer_info(stream->cache);
assert(buffer_left(stream->cache) == 0); /* there's something left in the cache, call ps_finish() */
}
buffer_free(stream->cache);
}
@@ -467,25 +499,31 @@ void ps_close(Pcpstream *stream) {
int ps_end(Pcpstream *stream) {
/* simulate open file if there's still something in the cache */
if(stream->cache != NULL)
if(buffer_left(stream->cache) > 0)
if(buffer_left(stream->cache) > 0) {
return 0;
}
return stream->eof;
}
int ps_left(Pcpstream *stream) {
/* used internally to determine if we reached end of source */
if(stream->is_buffer) {
if(buffer_left(stream->b) == 0)
return 1;
else
return 0;
}
else {
return feof(stream->fd);
}
}
int ps_err(Pcpstream *stream) {
return stream->err;
}
size_t ps_tell(Pcpstream *stream) {
if(stream->is_buffer) {
if(stream->b->end > stream->b->offset)
return stream->b->end; /* write buffer */
else
return stream->b->offset; /* read buffer */
}
else {
return ftell(stream->fd);
}
return stream->pos;
}
Buffer *ps_buffer(Pcpstream *stream) {