From 9c01188671322d92e29d9610697402ab049e8882 Mon Sep 17 00:00:00 2001 From: Grégoire Détrez Date: Mon, 5 Sep 2022 16:44:42 +0200 Subject: Modularize: ASCII serialization/deserialization Add a sigsum.ascii module that handle Sigsum ASCII serialisation format and an associated test module. --- sigsum-witness.py | 53 ++++++++++++++++++++++++----------------------------- 1 file changed, 24 insertions(+), 29 deletions(-) (limited to 'sigsum-witness.py') diff --git a/sigsum-witness.py b/sigsum-witness.py index 5de433a..3f5ac38 100755 --- a/sigsum-witness.py +++ b/sigsum-witness.py @@ -23,7 +23,7 @@ import struct import sys import threading import time -from binascii import hexlify, unhexlify +from binascii import hexlify from hashlib import sha256 from math import floor from pathlib import PurePath @@ -34,6 +34,7 @@ import nacl.signing import prometheus_client as prometheus import requests +from sigsum import ascii from tools.libsigntools import ssh_to_sign BASE_URL_DEFAULT = 'http://poc.sigsum.org:4780/' @@ -169,44 +170,39 @@ def parse_keyval(text): class TreeHead: def __init__(self, sth_data): - self._text = parse_keyval(sth_data) - assert(len(self._text) == 4) - assert('timestamp' in self._text) - assert('tree_size' in self._text) - assert('root_hash' in self._text) - assert('signature' in self._text) + self._data = ascii.loads(sth_data) + assert(len(self._data) == 4) + assert('timestamp' in self._data) + assert('tree_size' in self._data) + assert('root_hash' in self._data) + assert('signature' in self._data) @property def timestamp(self): - return int(self._text['timestamp']) + return self._data.getint('timestamp') @property def tree_size(self): - return int(self._text['tree_size']) + return self._data.getint('tree_size') @property def root_hash(self): - return unhexlify(self._text['root_hash']) + return self._data.getbytes('root_hash') def text(self): - text = 'timestamp={}\n'.format(self._text['timestamp']) - text += 'tree_size={}\n'.format(self._text['tree_size']) - text += 'root_hash={}\n'.format(self._text['root_hash']) - text += 'signature={}\n'.format(self._text['signature']) - return text.encode('ascii') + return ascii.dumps(self._data).encode('ascii') def to_signed_data(self, pubkey): namespace = 'tree_head:v0:{}@sigsum.org'.format(hexlify(sha256(pubkey.encode()).digest()).decode()) msg = struct.pack('!QQ', self.timestamp, self.tree_size) - msg += unhexlify(self._text['root_hash']) + msg += self.root_hash assert(len(msg) == 8 + 8 + 32) return ssh_to_sign(namespace, 'sha256', sha256(msg).digest()) def signature_valid(self, pubkey): # Guard against tree head with >1 signature -- don't try to # validate a cosigned tree head. - assert(type(self._text['signature']) is str) - sig = unhexlify(self._text['signature']) + sig = self._data.getbytes('signature') assert(len(sig) == 64) data = self.to_signed_data(pubkey) try: @@ -286,19 +282,18 @@ class ConsistencyProof(): def __init__(self, old_size, new_size, consistency_proof_data): self._old_size = old_size self._new_size = new_size - self._text = parse_keyval(consistency_proof_data) - assert(len(self._text) == 1) - assert('consistency_path' in self._text) + self._data = ascii.loads(consistency_proof_data) + assert(len(self._data) == 1) + assert('consistency_path' in self._data) def old_size(self): return self._old_size def new_size(self): return self._new_size + def path(self): - if type(self._text['consistency_path']) is list: - return [unhexlify(e) for e in self._text['consistency_path']] - else: - return [unhexlify(self._text['consistency_path'])] + return self._data.getbytes('consistency_path', many=True) + def make_base_dir_maybe(): dirname = os.path.expanduser(g_args.base_dir) @@ -410,10 +405,10 @@ def consistency_proof_valid(first, second, proof): def sign_send_store_tree_head(signing_key, log_key, tree_head): signature = signing_key.sign(tree_head.to_signed_data(log_key)).signature hash = sha256(signing_key.verify_key.encode()) - - post_data = 'cosignature={}\n'.format(hexlify(signature).decode('ascii')) - post_data += 'key_hash={}\n'.format(hash.hexdigest()) - + post_data = ascii.dumps({ + 'cosignature': signature.hex(), + 'key_hash': hash.hexdigest(), + }) try: req = requests.post(g_args.base_url + 'sigsum/v0/add-cosignature', post_data) except requests.ConnectionError as err: -- cgit v1.2.3