aboutsummaryrefslogtreecommitdiff
path: root/sigsum-witness.py
diff options
context:
space:
mode:
authorGrégoire Détrez <gregoire@mullvad.net>2022-09-05 16:44:42 +0200
committerGrégoire Détrez <gregoire@mullvad.net>2022-09-05 16:50:42 +0200
commit9c01188671322d92e29d9610697402ab049e8882 (patch)
tree0053fb67eae3479210bd41f31de8348e1b684664 /sigsum-witness.py
parent3b4bbebc98db8411b996291ae30c5d15800ea107 (diff)
Modularize: ASCII serialization/deserializationgregoire/modules
Add a sigsum.ascii module that handle Sigsum ASCII serialisation format and an associated test module.
Diffstat (limited to 'sigsum-witness.py')
-rwxr-xr-xsigsum-witness.py53
1 files changed, 24 insertions, 29 deletions
diff --git a/sigsum-witness.py b/sigsum-witness.py
index 5de433a..3f5ac38 100755
--- a/sigsum-witness.py
+++ b/sigsum-witness.py
@@ -23,7 +23,7 @@ import struct
import sys
import threading
import time
-from binascii import hexlify, unhexlify
+from binascii import hexlify
from hashlib import sha256
from math import floor
from pathlib import PurePath
@@ -34,6 +34,7 @@ import nacl.signing
import prometheus_client as prometheus
import requests
+from sigsum import ascii
from tools.libsigntools import ssh_to_sign
BASE_URL_DEFAULT = 'http://poc.sigsum.org:4780/'
@@ -169,44 +170,39 @@ def parse_keyval(text):
class TreeHead:
def __init__(self, sth_data):
- self._text = parse_keyval(sth_data)
- assert(len(self._text) == 4)
- assert('timestamp' in self._text)
- assert('tree_size' in self._text)
- assert('root_hash' in self._text)
- assert('signature' in self._text)
+ self._data = ascii.loads(sth_data)
+ assert(len(self._data) == 4)
+ assert('timestamp' in self._data)
+ assert('tree_size' in self._data)
+ assert('root_hash' in self._data)
+ assert('signature' in self._data)
@property
def timestamp(self):
- return int(self._text['timestamp'])
+ return self._data.getint('timestamp')
@property
def tree_size(self):
- return int(self._text['tree_size'])
+ return self._data.getint('tree_size')
@property
def root_hash(self):
- return unhexlify(self._text['root_hash'])
+ return self._data.getbytes('root_hash')
def text(self):
- text = 'timestamp={}\n'.format(self._text['timestamp'])
- text += 'tree_size={}\n'.format(self._text['tree_size'])
- text += 'root_hash={}\n'.format(self._text['root_hash'])
- text += 'signature={}\n'.format(self._text['signature'])
- return text.encode('ascii')
+ return ascii.dumps(self._data).encode('ascii')
def to_signed_data(self, pubkey):
namespace = 'tree_head:v0:{}@sigsum.org'.format(hexlify(sha256(pubkey.encode()).digest()).decode())
msg = struct.pack('!QQ', self.timestamp, self.tree_size)
- msg += unhexlify(self._text['root_hash'])
+ msg += self.root_hash
assert(len(msg) == 8 + 8 + 32)
return ssh_to_sign(namespace, 'sha256', sha256(msg).digest())
def signature_valid(self, pubkey):
# Guard against tree head with >1 signature -- don't try to
# validate a cosigned tree head.
- assert(type(self._text['signature']) is str)
- sig = unhexlify(self._text['signature'])
+ sig = self._data.getbytes('signature')
assert(len(sig) == 64)
data = self.to_signed_data(pubkey)
try:
@@ -286,19 +282,18 @@ class ConsistencyProof():
def __init__(self, old_size, new_size, consistency_proof_data):
self._old_size = old_size
self._new_size = new_size
- self._text = parse_keyval(consistency_proof_data)
- assert(len(self._text) == 1)
- assert('consistency_path' in self._text)
+ self._data = ascii.loads(consistency_proof_data)
+ assert(len(self._data) == 1)
+ assert('consistency_path' in self._data)
def old_size(self):
return self._old_size
def new_size(self):
return self._new_size
+
def path(self):
- if type(self._text['consistency_path']) is list:
- return [unhexlify(e) for e in self._text['consistency_path']]
- else:
- return [unhexlify(self._text['consistency_path'])]
+ return self._data.getbytes('consistency_path', many=True)
+
def make_base_dir_maybe():
dirname = os.path.expanduser(g_args.base_dir)
@@ -410,10 +405,10 @@ def consistency_proof_valid(first, second, proof):
def sign_send_store_tree_head(signing_key, log_key, tree_head):
signature = signing_key.sign(tree_head.to_signed_data(log_key)).signature
hash = sha256(signing_key.verify_key.encode())
-
- post_data = 'cosignature={}\n'.format(hexlify(signature).decode('ascii'))
- post_data += 'key_hash={}\n'.format(hash.hexdigest())
-
+ post_data = ascii.dumps({
+ 'cosignature': signature.hex(),
+ 'key_hash': hash.hexdigest(),
+ })
try:
req = requests.post(g_args.base_url + 'sigsum/v0/add-cosignature', post_data)
except requests.ConnectionError as err: