mirror of
https://codeberg.org/scip/pcp.git
synced 2025-12-17 12:00:56 +01:00
py: added doc, unittests, anonymous encryption mode
This commit is contained in:
@@ -1,44 +1,144 @@
|
||||
#
|
||||
# This file is part of Pretty Curved Privacy (pcp1).
|
||||
#
|
||||
# Copyright (C) 2013-2015 T. von Dein.
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
# You can contact me by mail: <tlinden AT cpan DOT org>.
|
||||
#
|
||||
|
||||
from pypcp.dll import *
|
||||
from pypcp.stream import *
|
||||
|
||||
class Context(object):
|
||||
"""
|
||||
A libpcp context object, required for all operations.
|
||||
"""
|
||||
def __init__(self):
|
||||
"""
|
||||
Create a new instance:
|
||||
|
||||
>>> ctx = Context()
|
||||
"""
|
||||
self._ctx = libpcp.ptx_new()
|
||||
self._sk = None
|
||||
|
||||
def __del__(self):
|
||||
"""
|
||||
Destructor, automatically called
|
||||
"""
|
||||
libpcp.ptx_clean(self._ctx)
|
||||
|
||||
def throw(self, E=None, msg=None):
|
||||
"""
|
||||
Used internally to propagate libpcp errors.
|
||||
|
||||
Throw libpcp error:
|
||||
>>> ctx.throw(IOError, "additional message")
|
||||
|
||||
Throw pure python exception:
|
||||
>>> ctx.throw(None, "pypcp error")
|
||||
"""
|
||||
# forward libpcp generated exceptions
|
||||
pcpmsg = ffi.string(self._ctx.pcp_err)
|
||||
if msg:
|
||||
pcpmsg = "%s: %s" % (msg, pcpmsg)
|
||||
pcpmsg = ''
|
||||
|
||||
if self._ctx.pcp_errset == 1:
|
||||
pcpmsg = ffi.string(self._ctx.pcp_err)
|
||||
|
||||
if E:
|
||||
if msg:
|
||||
pcpmsg = "%s: %s" % (msg, pcpmsg)
|
||||
raise E(pcpmsg)
|
||||
else:
|
||||
raise RuntimeError(pcpmsg)
|
||||
raise RuntimeError(msg)
|
||||
|
||||
def addkey(self, Key):
|
||||
"""
|
||||
Add a secret key to the contect. Required for encryption
|
||||
and signing operations. If you don't add a secret key to
|
||||
the context, anonymous encryption will be done.
|
||||
|
||||
>>> key = Key(owner="alicia")
|
||||
>>> ctx.addkey(key)
|
||||
"""
|
||||
self._sk = Key._sk
|
||||
|
||||
def pubkeycount(self):
|
||||
"""
|
||||
Return the number of public keys known.
|
||||
"""
|
||||
return libpcp.pcphash_countpub(self._ctx)
|
||||
|
||||
def recipients(self, *recipients):
|
||||
"""
|
||||
Add one or more public keys. Required for encryption
|
||||
and signing operations.
|
||||
|
||||
>>> keyfile = open("bobby.pub", "r")
|
||||
>>> bobby = PublicKey(encoded=keyfile).read()
|
||||
>>> ctx.recipients(bobby)
|
||||
"""
|
||||
for key in recipients:
|
||||
libpcp.pcphash_add(self._ctx, key._pk, key._pk.type)
|
||||
|
||||
def encrypt(self, source=None, sign=False, passphrase=None, armor=False):
|
||||
"""
|
||||
Encrypt a file or a string.
|
||||
|
||||
Args:
|
||||
- 'source' can be a file handle (returned by open())
|
||||
or a python string containing the data to be encrypted
|
||||
- 'sign': if set to True, append a signature to encrypted output
|
||||
- 'passphrase': If set, do symmetric encryption (self mode)
|
||||
- 'armor': if set to True, encode the output using Z85
|
||||
|
||||
Returns:
|
||||
returns the encrypted result.
|
||||
|
||||
Encrypt asymmetrically:
|
||||
>>> seca = Key(encoded=alicesec, 'a') # import keys
|
||||
>>> secb = Key(encoded=bobsec, 'b')
|
||||
>>> puba = PublicKey(encoded=alicepub)
|
||||
>>> pubb = PublicKey(encoded=bobpub)
|
||||
>>>
|
||||
>>> ctxA = Context() # 2 contexts, one for sender, one for recipient
|
||||
>>> ctxA.addkey(seca)
|
||||
>>> ctxA.recipients(pubb)
|
||||
>>> ctxB = Context()
|
||||
>>> ctxB.addkey(secb)
|
||||
>>> ctxB.recipients(puba)
|
||||
>>>
|
||||
>>> encrypted = ctxA.encrypt(source='hello world') # encrypt alice => bob
|
||||
>>> decrypted = ctxB.decrypt(source=encrypted) # decrypt bob <= alice
|
||||
|
||||
Encrypt symmetrically:
|
||||
>>> ctx = Context()
|
||||
>>> encrypted = ctx.encrypt(source='hello world', passphrase='x')
|
||||
>>> decrypted = ctx.decrypt(source=encrypted, passphrase='x')
|
||||
|
||||
"""
|
||||
anon = 0
|
||||
dosign = 0
|
||||
instream = None
|
||||
outstream = None
|
||||
sk = self._sk
|
||||
|
||||
if source:
|
||||
instream = Stream(source)
|
||||
else:
|
||||
self.throw(RuntimeError, "source argument required")
|
||||
self.throw(None, "source argument required")
|
||||
|
||||
outstream = Stream()
|
||||
|
||||
@@ -53,12 +153,13 @@ class Context(object):
|
||||
if not self._sk:
|
||||
# no secret key known, anonymous mode
|
||||
anon = 1
|
||||
sk = libpcp.pcpkey_new()
|
||||
|
||||
if sign:
|
||||
dosign = 1
|
||||
|
||||
size = libpcp.pcp_encrypt_stream(self._ctx, instream._stream, outstream._stream,
|
||||
self._sk, self._ctx.pcppubkey_hash, dosign, anon)
|
||||
sk, self._ctx.pcppubkey_hash, dosign, anon)
|
||||
if size <= 0:
|
||||
self.throw(IOError, "failed to encrypt")
|
||||
|
||||
@@ -85,6 +186,20 @@ class Context(object):
|
||||
|
||||
|
||||
def decrypt(self, source=None, verify=False, passphrase=None):
|
||||
"""
|
||||
Decrypt a file or string
|
||||
|
||||
Args:
|
||||
- 'source' can be a file handle (returned by open())
|
||||
or a python string containing the data to be encrypted
|
||||
- 'verify': if set to True, verify a signature, if present
|
||||
- 'passphrase': If set, do symmetric encryption (self mode)
|
||||
|
||||
Returns:
|
||||
returns the decrypted result.
|
||||
|
||||
For usage example see encrypt().
|
||||
"""
|
||||
doverify = 0
|
||||
doanon = 0
|
||||
instream = None
|
||||
@@ -120,7 +235,7 @@ class Context(object):
|
||||
else:
|
||||
# asymmetric
|
||||
if not self._sk:
|
||||
self.throw(RuntimeError, "no secret key associated with current context")
|
||||
self.throw(None, "no secret key associated with current context")
|
||||
|
||||
if head[0] == PCP_ASYM_CIPHER_ANON:
|
||||
doanon = 1
|
||||
|
||||
Reference in New Issue
Block a user