From b52ac4b0df641da1bbd526be18d2360db2531252 Mon Sep 17 00:00:00 2001 From: TLINDEN Date: Tue, 23 Dec 2014 20:13:52 +0100 Subject: [PATCH] py: added asym en/decryption, fine tuned test script --- bindings/py/pypcp/context.py | 32 ++++++++---- bindings/py/pypcp/stream.py | 10 ++-- bindings/py/test.py | 99 ++++++++++++++++++++++++++++-------- 3 files changed, 106 insertions(+), 35 deletions(-) diff --git a/bindings/py/pypcp/context.py b/bindings/py/pypcp/context.py index f2d6838..26edfac 100644 --- a/bindings/py/pypcp/context.py +++ b/bindings/py/pypcp/context.py @@ -29,16 +29,16 @@ class Context(object): for key in recipients: libpcp.pcphash_add(self._ctx, key._pk, key._pk.type) - def encrypt(self, string=None, file=None, sign=False, passphrase=None, armor=False): + def encrypt(self, source=None, sign=False, passphrase=None, armor=False): anon = 0 dosign = 0 instream = None outstream = None - if string: - instream = Stream(string=string) + if source: + instream = Stream(source) else: - instream = Stream(file) + self.throw(RuntimeError, "source argument required") outstream = Stream() @@ -84,18 +84,19 @@ class Context(object): - def decrypt(self, string=None, file=None, verify=False, passphrase=None): + def decrypt(self, source=None, verify=False, passphrase=None): doverify = 0 + doanon = 0 instream = None outstream = None if verify: doverify = 1 - if string: - instream = Stream(string=string) + if source: + instream = Stream(source) else: - instream = Stream(file) + self.throw(RuntimeError, "source argument required") libpcp.ps_setdetermine(instream._stream, PCP_BLOCK_SIZE/2) outstream = Stream() @@ -118,8 +119,19 @@ class Context(object): self.throw(IOError, "failed to decrypt") else: # asymmetric - # TODO - pass + if not self._sk: + self.throw(RuntimeError, "no secret key associated with current context") + + if head[0] == PCP_ASYM_CIPHER_ANON: + doanon = 1 + if head[0] == PCP_ASYM_CIPHER_SIG: + doverify + + size = libpcp.pcp_decrypt_stream(self._ctx, instream._stream, + outstream._stream, self._sk, + ffi.NULL, doverify, doanon) + if size <= 0: + self.throw(IOError, "failed to decrypt") # return the raw buffer contents return ffi.buffer(libpcp.buffer_get_remainder(outstream._stream.b), diff --git a/bindings/py/pypcp/stream.py b/bindings/py/pypcp/stream.py index 85d4151..1a1bf8d 100644 --- a/bindings/py/pypcp/stream.py +++ b/bindings/py/pypcp/stream.py @@ -2,12 +2,12 @@ from pypcp.dll import * class Stream(object): - def __init__(self, string=None, file=None): - if file: - fd = open(file, 'r') + def __init__(self, backend=None): + if type(backend) is file: + fd = open(backend, 'r') self._stream = libpcp.ps_new_file(fd) - elif string: - buf = libpcp.buffer_new_buf('inbuf', string, len(string)) + elif type(backend) is str: + buf = libpcp.buffer_new_buf('inbuf', backend, len(backend)) self._stream = libpcp.ps_new_inbuffer(buf) else: self._stream = libpcp.ps_new_outbuffer() diff --git a/bindings/py/test.py b/bindings/py/test.py index 3f26ded..f58ad01 100755 --- a/bindings/py/test.py +++ b/bindings/py/test.py @@ -1,30 +1,89 @@ #!/usr/local/bin/python +from sys import argv, stdout from pypcp import * from pprint import pprint -# import secret key -zbobsec = open("../../tests/key-bobby-sec", "r").read() -bobsec = Key(encoded=zbobsec, passphrase="b") -#bobsec.dump() +orig = "hello world" -# import public key -zalipub = open("../../tests/key-alicia-pub", "r").read() -alipub = PublicKey(encoded=zalipub) -#alipub.dump() -# always required -ctx = Context() +def importkey(filename, passwd=None, secret=True, public=False): + raw = open(filename, "r").read() + if secret and not public: + key = Key(encoded=raw, passphrase=passwd) + return key + else: + key = PublicKey(encoded=raw) + return key + +def asym(): + # import keys + bobSec = importkey("../../tests/key-bobby-sec", "b") + bobPub = importkey("../../tests/key-bobby-pub", public=True) + aliSec = importkey("../../tests/key-alicia-sec", "a") + aliPub = importkey("../../tests/key-alicia-pub", public=True) + + # one context for each + ctxA = Context() + ctxB = Context() + + # prepare ctx' for crypto + ctxA.addkey(aliSec) + ctxA.recipients(bobPub) + + ctxB.addkey(bobSec) + ctxB.recipients(aliPub) + + # Alice encrypt => Bob + # FIXME: if passed as is, then it's empty later on + encrypted = ctxA.encrypt(source="%s" % orig, armor=True) + #print "encrypted:\n%s" % encrypted + + # Bob decrypt from Alice + clear = ctxB.decrypt(source=encrypted) + #print "clear: %s" % clear + + if clear == orig: + return True + else: + return False + + +def sym(): + # always required + ctx = Context() + + # symmetric encryption (self mode) + encrypted = ctx.encrypt(source="%s" % orig, passphrase="x", armor=True) + + # decrypt + clear = ctx.decrypt(source=encrypted, passphrase="x") + + if clear == orig: + return True + else: + return False + + + +def defun(name): + if not globals()[name](): + print "%s: failed" % name + exit(1) + else: + print "%s: ok" % name + return True + + +if len(argv) == 2: + if defun(argv[1]): + exit(0) +else: + # execute all + for func in ["asym", "sym"]: + defun(func) + exit(0) + -# symmetric encryption (self mode) -sencrypted = ctx.encrypt(string="hello world", passphrase="x", armor=True) -print sencrypted -# asymmetric encryption bob => alice -ctx.addkey(bobsec) -ctx.recipients(alipub) -aencrypted = ctx.encrypt(string="hello world", armor=True, sign=False) -#print aencrypted -sclear = ctx.decrypt(string=sencrypted, passphrase="x") -print sclear