mirror of
https://codeberg.org/scip/pcp.git
synced 2025-12-17 12:00:56 +01:00
py: added asym en/decryption, fine tuned test script
This commit is contained in:
@@ -29,16 +29,16 @@ class Context(object):
|
|||||||
for key in recipients:
|
for key in recipients:
|
||||||
libpcp.pcphash_add(self._ctx, key._pk, key._pk.type)
|
libpcp.pcphash_add(self._ctx, key._pk, key._pk.type)
|
||||||
|
|
||||||
def encrypt(self, string=None, file=None, sign=False, passphrase=None, armor=False):
|
def encrypt(self, source=None, sign=False, passphrase=None, armor=False):
|
||||||
anon = 0
|
anon = 0
|
||||||
dosign = 0
|
dosign = 0
|
||||||
instream = None
|
instream = None
|
||||||
outstream = None
|
outstream = None
|
||||||
|
|
||||||
if string:
|
if source:
|
||||||
instream = Stream(string=string)
|
instream = Stream(source)
|
||||||
else:
|
else:
|
||||||
instream = Stream(file)
|
self.throw(RuntimeError, "source argument required")
|
||||||
|
|
||||||
outstream = Stream()
|
outstream = Stream()
|
||||||
|
|
||||||
@@ -84,18 +84,19 @@ class Context(object):
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
def decrypt(self, string=None, file=None, verify=False, passphrase=None):
|
def decrypt(self, source=None, verify=False, passphrase=None):
|
||||||
doverify = 0
|
doverify = 0
|
||||||
|
doanon = 0
|
||||||
instream = None
|
instream = None
|
||||||
outstream = None
|
outstream = None
|
||||||
|
|
||||||
if verify:
|
if verify:
|
||||||
doverify = 1
|
doverify = 1
|
||||||
|
|
||||||
if string:
|
if source:
|
||||||
instream = Stream(string=string)
|
instream = Stream(source)
|
||||||
else:
|
else:
|
||||||
instream = Stream(file)
|
self.throw(RuntimeError, "source argument required")
|
||||||
|
|
||||||
libpcp.ps_setdetermine(instream._stream, PCP_BLOCK_SIZE/2)
|
libpcp.ps_setdetermine(instream._stream, PCP_BLOCK_SIZE/2)
|
||||||
outstream = Stream()
|
outstream = Stream()
|
||||||
@@ -118,8 +119,19 @@ class Context(object):
|
|||||||
self.throw(IOError, "failed to decrypt")
|
self.throw(IOError, "failed to decrypt")
|
||||||
else:
|
else:
|
||||||
# asymmetric
|
# asymmetric
|
||||||
# TODO
|
if not self._sk:
|
||||||
pass
|
self.throw(RuntimeError, "no secret key associated with current context")
|
||||||
|
|
||||||
|
if head[0] == PCP_ASYM_CIPHER_ANON:
|
||||||
|
doanon = 1
|
||||||
|
if head[0] == PCP_ASYM_CIPHER_SIG:
|
||||||
|
doverify
|
||||||
|
|
||||||
|
size = libpcp.pcp_decrypt_stream(self._ctx, instream._stream,
|
||||||
|
outstream._stream, self._sk,
|
||||||
|
ffi.NULL, doverify, doanon)
|
||||||
|
if size <= 0:
|
||||||
|
self.throw(IOError, "failed to decrypt")
|
||||||
|
|
||||||
# return the raw buffer contents
|
# return the raw buffer contents
|
||||||
return ffi.buffer(libpcp.buffer_get_remainder(outstream._stream.b),
|
return ffi.buffer(libpcp.buffer_get_remainder(outstream._stream.b),
|
||||||
|
|||||||
@@ -2,12 +2,12 @@ from pypcp.dll import *
|
|||||||
|
|
||||||
|
|
||||||
class Stream(object):
|
class Stream(object):
|
||||||
def __init__(self, string=None, file=None):
|
def __init__(self, backend=None):
|
||||||
if file:
|
if type(backend) is file:
|
||||||
fd = open(file, 'r')
|
fd = open(backend, 'r')
|
||||||
self._stream = libpcp.ps_new_file(fd)
|
self._stream = libpcp.ps_new_file(fd)
|
||||||
elif string:
|
elif type(backend) is str:
|
||||||
buf = libpcp.buffer_new_buf('inbuf', string, len(string))
|
buf = libpcp.buffer_new_buf('inbuf', backend, len(backend))
|
||||||
self._stream = libpcp.ps_new_inbuffer(buf)
|
self._stream = libpcp.ps_new_inbuffer(buf)
|
||||||
else:
|
else:
|
||||||
self._stream = libpcp.ps_new_outbuffer()
|
self._stream = libpcp.ps_new_outbuffer()
|
||||||
|
|||||||
@@ -1,30 +1,89 @@
|
|||||||
#!/usr/local/bin/python
|
#!/usr/local/bin/python
|
||||||
|
|
||||||
|
from sys import argv, stdout
|
||||||
from pypcp import *
|
from pypcp import *
|
||||||
from pprint import pprint
|
from pprint import pprint
|
||||||
|
|
||||||
# import secret key
|
orig = "hello world"
|
||||||
zbobsec = open("../../tests/key-bobby-sec", "r").read()
|
|
||||||
bobsec = Key(encoded=zbobsec, passphrase="b")
|
|
||||||
#bobsec.dump()
|
|
||||||
|
|
||||||
# import public key
|
|
||||||
zalipub = open("../../tests/key-alicia-pub", "r").read()
|
|
||||||
alipub = PublicKey(encoded=zalipub)
|
|
||||||
#alipub.dump()
|
|
||||||
|
|
||||||
# always required
|
def importkey(filename, passwd=None, secret=True, public=False):
|
||||||
ctx = Context()
|
raw = open(filename, "r").read()
|
||||||
|
if secret and not public:
|
||||||
|
key = Key(encoded=raw, passphrase=passwd)
|
||||||
|
return key
|
||||||
|
else:
|
||||||
|
key = PublicKey(encoded=raw)
|
||||||
|
return key
|
||||||
|
|
||||||
|
def asym():
|
||||||
|
# import keys
|
||||||
|
bobSec = importkey("../../tests/key-bobby-sec", "b")
|
||||||
|
bobPub = importkey("../../tests/key-bobby-pub", public=True)
|
||||||
|
aliSec = importkey("../../tests/key-alicia-sec", "a")
|
||||||
|
aliPub = importkey("../../tests/key-alicia-pub", public=True)
|
||||||
|
|
||||||
|
# one context for each
|
||||||
|
ctxA = Context()
|
||||||
|
ctxB = Context()
|
||||||
|
|
||||||
|
# prepare ctx' for crypto
|
||||||
|
ctxA.addkey(aliSec)
|
||||||
|
ctxA.recipients(bobPub)
|
||||||
|
|
||||||
|
ctxB.addkey(bobSec)
|
||||||
|
ctxB.recipients(aliPub)
|
||||||
|
|
||||||
|
# Alice encrypt => Bob
|
||||||
|
# FIXME: if passed as is, then it's empty later on
|
||||||
|
encrypted = ctxA.encrypt(source="%s" % orig, armor=True)
|
||||||
|
#print "encrypted:\n%s" % encrypted
|
||||||
|
|
||||||
|
# Bob decrypt from Alice
|
||||||
|
clear = ctxB.decrypt(source=encrypted)
|
||||||
|
#print "clear: %s" % clear
|
||||||
|
|
||||||
|
if clear == orig:
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def sym():
|
||||||
|
# always required
|
||||||
|
ctx = Context()
|
||||||
|
|
||||||
|
# symmetric encryption (self mode)
|
||||||
|
encrypted = ctx.encrypt(source="%s" % orig, passphrase="x", armor=True)
|
||||||
|
|
||||||
|
# decrypt
|
||||||
|
clear = ctx.decrypt(source=encrypted, passphrase="x")
|
||||||
|
|
||||||
|
if clear == orig:
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def defun(name):
|
||||||
|
if not globals()[name]():
|
||||||
|
print "%s: failed" % name
|
||||||
|
exit(1)
|
||||||
|
else:
|
||||||
|
print "%s: ok" % name
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
if len(argv) == 2:
|
||||||
|
if defun(argv[1]):
|
||||||
|
exit(0)
|
||||||
|
else:
|
||||||
|
# execute all
|
||||||
|
for func in ["asym", "sym"]:
|
||||||
|
defun(func)
|
||||||
|
exit(0)
|
||||||
|
|
||||||
|
|
||||||
# symmetric encryption (self mode)
|
|
||||||
sencrypted = ctx.encrypt(string="hello world", passphrase="x", armor=True)
|
|
||||||
print sencrypted
|
|
||||||
|
|
||||||
# asymmetric encryption bob => alice
|
|
||||||
ctx.addkey(bobsec)
|
|
||||||
ctx.recipients(alipub)
|
|
||||||
aencrypted = ctx.encrypt(string="hello world", armor=True, sign=False)
|
|
||||||
#print aencrypted
|
|
||||||
|
|
||||||
sclear = ctx.decrypt(string=sencrypted, passphrase="x")
|
|
||||||
print sclear
|
|
||||||
|
|||||||
Reference in New Issue
Block a user