From b06f07550957ba8ba4ff237332f16147a29a6dd2 Mon Sep 17 00:00:00 2001 From: Linus Nordberg Date: Tue, 14 Sep 2021 21:20:44 +0200 Subject: we're sigsum now --- siglog-witness.py | 505 ------------------------------------------------------ sigsum-witness.py | 505 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 505 insertions(+), 505 deletions(-) delete mode 100755 siglog-witness.py create mode 100755 sigsum-witness.py diff --git a/siglog-witness.py b/siglog-witness.py deleted file mode 100755 index 730c6b8..0000000 --- a/siglog-witness.py +++ /dev/null @@ -1,505 +0,0 @@ -#! /usr/bin/env python3 - -# Sign the most recently published tree head from a given ST log, -# after verifying a consistency proof from an already verified tree -# head to this new tree head. - -# A verified tree head is expected to be found in the file -# ~/.config/sigsum-witness/signed-tree-head . It's updated once a -# newer tree head has been verified successfully. - -# If the config file ~/.config/sigsum-witness/sigsum-witness.conf -# exists and is readable, options are read from it. Options read from -# the config file can be overridden on the command line. - -# Pubkey from secret key: -# sigkey = nacl.signing.SigningKey('badc0ffee123456...', nacl.encoding.HexEncoder) -# sigkey.verify_key.encode(nacl.encoding.HexEncoder) - -import sys -import os -from stat import * -import argparse -import requests -import struct -from binascii import hexlify, unhexlify -import nacl.encoding -import nacl.signing -from hashlib import sha256 -import time -from math import floor -from pathlib import PurePath - -BASE_URL_DEFAULT = 'http://poc.sigsum.org:4780/' -CONFIG_DIR_DEFAULT = os.path.expanduser('~/.config/sigsum-witness/') - -ERR_OK = 0 -ERR_USAGE = 1 -ERR_TREEHEAD_READ = 2 -ERR_TREEHEAD_FETCH = 3 -ERR_TREEHEAD_SIGNATURE_INVALID = 4 -ERR_TREEHEAD_INVALID = 5 -ERR_CONSISTENCYPROOF_FETCH = 6 -ERR_CONSISTENCYPROOF_INVALID = 7 -ERR_LOGKEY = 8 -ERR_LOGKEY_FORMAT = 9 -ERR_SIGKEYFILE = 10 -ERR_SIGKEYFILE_MISSING = 11 -ERR_SIGKEY_FORMAT = 12 -ERR_NYI = 13 -ERR_COSIG_POST = 14 - -class Parser: - def __init__(self): - p = argparse.ArgumentParser( - description='Sign the most recently published tree head from a given sigsum log, after verifying it against an older tree.') - - p.add_argument('--bootstrap-log', - action='store_true', - help="Sign and save fetched tree head without verifying a consistency proof against a previous tree head. " - "NOTE: Requires user intervention.") - - p.add_argument('-d', '--base-dir', - default=CONFIG_DIR_DEFAULT, - help="Configuration directory ({})".format(CONFIG_DIR_DEFAULT)) - - p.add_argument('-g', '--generate-signing-key', - action='store_true', - help="Generate signing key if missing. NOTE: Requires user intervention.") - - p.add_argument('-l', '--log-verification-key', - help="Log verification key") - - p.add_argument('--save-config', - action='store_true', - help="Save command line options to the configuration file") - - p.add_argument('-s', '--sigkey-file', - default='signing-key', - help="Signing key file, relative to $base_dir if not an absolute path (signing-key)") - - p.add_argument('-u', '--base-url', - default=BASE_URL_DEFAULT, - help="Log base URL ({})".format(BASE_URL_DEFAULT)) - - self.parser = p - -def parse_config(filename): - try: - lines = [] - with open(filename, 'r') as f: - line = f.readline() - while line: - lines.append(line.strip()) - line = f.readline() - g_args.parser.parse_args(lines, namespace=g_args) - except FileNotFoundError: - pass - -def parse_args(argv): - g_args.parser.parse_args(namespace=g_args) - -def parse_keyval(text): - dictx = {} - for line in text.split(): - (key, val) = line.split('=') - if not key in dictx: - dictx[key] = val - else: - if type(dictx[key]) is list: - dictx[key] += [val] - else: - dictx[key] = [dictx[key], val] - return dictx - -class TreeHead: - def __init__(self, sth_data): - self._text = parse_keyval(sth_data) - assert(len(self._text) == 4) - assert('timestamp' in self._text) - assert('tree_size' in self._text) - assert('root_hash' in self._text) - assert('signature' in self._text) - - @property - def timestamp(self): - return int(self._text['timestamp']) - - @property - def tree_size(self): - return int(self._text['tree_size']) - - @property - def root_hash(self): - return unhexlify(self._text['root_hash']) - - def text(self): - text = 'timestamp={}\n'.format(self._text['timestamp']) - text += 'tree_size={}\n'.format(self._text['tree_size']) - text += 'root_hash={}\n'.format(self._text['root_hash']) - text += 'signature={}\n'.format(self._text['signature']) - return text.encode('ascii') - - def serialise(self, pubkey): - data = struct.pack('!QQ', self.timestamp, self.tree_size) - data += unhexlify(self._text['root_hash']) - data += sha256(pubkey.encode()).digest() - assert(len(data) == 8 + 8 + 32 + 32) - return data - - def signature_valid(self, pubkey): - # Guard against tree head with >1 signature -- don't try to - # validate a cosigned tree head. - assert(type(self._text['signature']) is str) - sig = unhexlify(self._text['signature']) - assert(len(sig) == 64) - data = self.serialise(pubkey) - try: - verified_data = pubkey.verify(sig + data) - except nacl.exceptions.BadSignatureError: - return False - assert(verified_data == data) - return True - - def timestamp_valid(self, now): - ts_sec = self.timestamp - ts_asc = time.ctime(ts_sec) - if ts_sec < now - 12 * 3600: - return (ERR_OK, - "WARNING: Tree head timestamp too old: {} ({})".format(ts_sec, ts_asc)) - if ts_sec > now + 12 * 3600: - return (ERR_OK, - "WARNING: Tree head timestamp too new: {} ({})".format(ts_sec, ts_asc)) - - def history_valid(self, prev): - if self.tree_size < prev.tree_size: - return (ERR_TREEHEAD_INVALID, - "ERROR: Log is shrinking: {} < {} ".format(self.tree_size, - prev.tree_size)) - - if self.timestamp < prev.timestamp: - return (ERR_TREEHEAD_INVALID, - "ERROR: Log is time traveling: {} < {} ".format(time.ctime(self.timestamp), - time.ctime(prev.timestamp))) - - if self.timestamp == prev.timestamp and \ - self.root_hash == prev.root_hash and \ - self.tree_size == prev.tree_size: - return (ERR_OK, - "INFO: Fetched head of tree of size {} already seen".format(prev.tree_size)) - - if self.root_hash == prev.root_hash and \ - self.tree_size != prev.tree_size: - return (ERR_TREEHEAD_INVALID, - "ERROR: Tree size has changed but hash has not: " - "{}: {} != {}".format(self.root_hash, - self.tree_size, - prev.tree_size)) - - if self.root_hash != prev.root_hash and \ - self.tree_size == prev.tree_size: - return (ERR_TREEHEAD_INVALID, - "ERROR: Hash has changed but tree size has not: " - "{}: {} != {}".format(self.tree_size, - self.root_hash, - prev.root_hash)) - - # New timestamp but same hash and size is ok but there's no - # consistency to prove. - if self.root_hash == prev.root_hash: - assert(self.tree_size == prev.tree_size) - assert(self.timestamp != prev.timestamp) - print("INFO: Signing re-published head of tree of size {}".format(self.tree_size)) - return None # Success - - proof, err = fetch_consistency_proof(prev.tree_size, self.tree_size) - if err: return err - if not consistency_proof_valid(prev, self, proof): - errmsg = "ERROR: failing consistency proof check for {}->{}\n".format(prev.tree_size, - self.tree_size) - errmsg += "DEBUG: {}:{}->{}:{}\n {}".format(prev.tree_size, - prev.root_hash, - self.tree_size, - self.root_hash, - proof.path()) - return ERR_CONSISTENCYPROOF_INVALID, errmsg - - return None # Success - -class ConsistencyProof(): - def __init__(self, old_size, new_size, consistency_proof_data): - self._old_size = old_size - self._new_size = new_size - self._text = parse_keyval(consistency_proof_data) - assert(len(self._text) == 1) - assert('consistency_path' in self._text) - - def old_size(self): - return self._old_size - def new_size(self): - return self._new_size - def path(self): - if type(self._text['consistency_path']) is list: - return [unhexlify(e) for e in self._text['consistency_path']] - else: - return [unhexlify(self._text['consistency_path'])] - -def make_base_dir_maybe(): - dirname = os.path.expanduser(g_args.base_dir) - try: - os.stat(dirname) - except FileNotFoundError: - os.makedirs(dirname, mode=0o700) - -def read_tree_head(filename): - try: - with open(filename, mode='r') as f: - return TreeHead(f.read()) - except FileNotFoundError: - return None - -def read_tree_head_and_verify(log_verification_key): - fn = str(PurePath(os.path.expanduser(g_args.base_dir), 'signed-tree-head')) - tree_head = read_tree_head(fn) - if not tree_head: - return None, (ERR_TREEHEAD_READ, - "ERROR: unable to read file {}".format(fn)) - - if not tree_head.signature_valid(log_verification_key): - return None, (ERR_TREEHEAD_SIGNATURE_INVALID, - "ERROR: signature of stored tree head invalid") - - return tree_head, None - -def store_tree_head(tree_head): - path = str(PurePath(os.path.expanduser(g_args.base_dir), 'signed-tree-head')) - with open(path, mode='w+b') as f: - f.write(tree_head.text()) - -def fetch_tree_head_and_verify(log_verification_key): - req = requests.get(g_args.base_url + 'sigsum/v0/get-tree-head-to-sign') - if req.status_code != 200: - return None, (ERR_TREEHEAD_FETCH, - "ERROR: unable to fetch new tree head: {}".format(req.status_code)) - - tree_head = TreeHead(req.content.decode()) - if not tree_head.signature_valid(log_verification_key): - return None, (ERR_TREEHEAD_SIGNATURE_INVALID, - "ERROR: signature of fetched tree head invalid") - - return tree_head, None - -def fetch_consistency_proof(first, second): - post_data = 'old_size={}\n'.format(first) - post_data += 'new_size={}\n'.format(second) - req = requests.post(g_args.base_url + 'sigsum/v0/get-consistency-proof', post_data) - if req.status_code != 200: - return None, (ERR_CONSISTENCYPROOF_FETCH, - "ERROR: unable to fetch consistency proof: {}".format(req.status_code)) - return ConsistencyProof(first, second, req.content.decode()), None - -def numbits(n): - p = 0 - while n > 0: - if n & 1: - p += 1 - n >>= 1 - return p - -# Implements the algorithm for consistency proof verification outlined -# in RFC6962-BIS, see -# https://datatracker.ietf.org/doc/html/draft-ietf-trans-rfc6962-bis-39#section-2.1.4.2 -def consistency_proof_valid(first, second, proof): - assert(first.tree_size == proof.old_size()) - assert(second.tree_size == proof.new_size()) - - path = proof.path() - if len(path) == 0: - return False - if numbits(first.tree_size) == 1: - path = [first.root_hash] + path - - fn = first.tree_size - 1 - sn = second.tree_size - 1 - while fn & 1: - fn >>= 1 - sn >>= 1 - - fr = path[0] - sr = path[0] - - for c in path[1:]: - if sn == 0: - return False - - if fn & 1 or fn == sn: - fr = sha256(b'\x01' + c + fr).digest() - sr = sha256(b'\x01' + c + sr).digest() - while fn != 0 and fn & 1 == 0: - fn >>= 1 - sn >>= 1 - else: - sr = sha256(b'\x01' + sr + c).digest() - - fn >>= 1 - sn >>= 1 - - return sn == 0 and fr == first.root_hash and sr == second.root_hash - -def sign_send_store_tree_head(signing_key, log_key, tree_head): - signature = signing_key.sign(tree_head.serialise(log_key)).signature - hash = sha256(signing_key.verify_key.encode()) - - post_data = 'cosignature={}\n'.format(hexlify(signature).decode('ascii')) - post_data += 'key_hash={}\n'.format(hash.hexdigest()) - - req = requests.post(g_args.base_url + 'sigsum/v0/add-cosignature', post_data) - if req.status_code != 200: - return (ERR_COSIG_POST, - "ERROR: Unable to post signature to log: {} => {}: {}". format(req.url, - req.status_code, - req.text)) - # Store only when all else is done. Next invocation will treat a - # stored tree head as having been verified. - store_tree_head(tree_head) - -def ensure_log_verification_key(): - if not g_args.log_verification_key: - return None, (ERR_LOGKEY, "ERROR: missing log verification key") - try: - log_verification_key = nacl.signing.VerifyKey(g_args.log_verification_key, encoder=nacl.encoding.HexEncoder) - except: - return None, (ERR_LOGKEY_FORMAT, - "ERROR: invalid log verification key: {}".format(g_args.log_verification_key)) - - assert(log_verification_key is not None) - return log_verification_key, None - -def generate_and_store_sigkey(fn): - print("INFO: Generating signing key and writing it to {}".format(fn)) - signing_key = nacl.signing.SigningKey.generate() - verify_key = signing_key.verify_key - print("INFO: verification key: {}".format(verify_key.encode(nacl.encoding.HexEncoder).decode('ascii'))) - with open(fn, 'w') as f: - os.chmod(f.fileno(), S_IRUSR) - f.write(signing_key.encode(encoder=nacl.encoding.HexEncoder).decode('ascii')) - -def read_sigkeyfile(fn): - s = os.stat(fn, follow_symlinks=False) - if not S_ISREG(s.st_mode): - return None, (ERR_SIGKEYFILE, - "ERROR: Signing key file {} must be a regular file".format(fn)) - if S_IMODE(s.st_mode) & 0o077 != 0: - return None, (ERR_SIGKEYFILE, - "ERROR: Signing key file {} permissions too lax: {:04o}".format(fn, S_IMODE(s.st_mode))) - - with open(fn, 'r') as f: - try: - signing_key = nacl.signing.SigningKey(f.readline().strip(), nacl.encoding.HexEncoder) - except: - return None, (ERR_SIGKEY_FORMAT, - "ERROR: Invalid signing key in {}".format(fn)) - - assert(signing_key is not None) - return signing_key, None - - -# Read signature key from file, or generate one and write it to file. -def ensure_sigkey(fn): - try: - os.stat(fn, follow_symlinks=False) - except FileNotFoundError: - if not g_args.generate_signing_key: - return None, (ERR_SIGKEYFILE_MISSING, - "ERROR: Signing key file {} missing. " - "Use --generate-signing-key to create one.".format(fn)) - - if not user_confirm("Really generate a new signing key and store it in {}?".format(fn)): - return None, (ERR_SIGKEYFILE_MISSING, - "ERROR: Signing key file {} missing".format(fn)) - - generate_and_store_sigkey(fn) - return read_sigkeyfile(fn) - - if g_args.generate_signing_key: - return None, (ERR_USAGE, - "ERROR: Signing key file {} already existing".format(fn)) - return read_sigkeyfile(fn) - - -def user_confirm(prompt): - resp = input(prompt + ' y/n> ').lower() - if resp and resp[0] == 'y': - return True - return False - -def main(args): - global g_args - g_args = Parser() - parse_args(args) # get base_dir - parse_config(str(PurePath(g_args.base_dir, 'sigsum-witness.conf'))) - parse_args(args) # override config file options - if g_args.save_config: - # TODO write to config file - return ERR_NYI, "ERROR: --save-config is not yet implemented" - - now = floor(time.time()) - consistency_verified = False - ignore_consistency = False - - make_base_dir_maybe() - - log_verification_key, err = ensure_log_verification_key() - if err: return err - - signing_key, err = ensure_sigkey(str(PurePath(g_args.base_dir, g_args.sigkey_file))) - if err: return err - - cur_tree_head, err = read_tree_head_and_verify(log_verification_key) - if err: - new_tree_head, err2 = fetch_tree_head_and_verify(log_verification_key) - if err2: return err2 - - if not g_args.bootstrap_log: - return err - - print("\nWARNING: We have only seen one single tree head from the\n" - "log {},\n" - "representing a tree of size {}. We are therefore unable to\n" - "verify that the tree it represents is really a superset of an\n" - "earlier version of the tree in this log.\n" - "\nWe are effectively signing this tree head blindly.\n".format(g_args.base_url, - new_tree_head.tree_size)) - if user_confirm("Really sign head for tree of size {} and upload " - "the signature?".format(new_tree_head.tree_size)): - err3 = sign_send_store_tree_head(signing_key, log_verification_key, new_tree_head) - if err3: return err3 - - return 0, None - else: - if g_args.bootstrap_log: - return (ERR_USAGE, - "ERROR: Valid tree head found: --bootstrap-log not allowed") - - new_tree_head, err = fetch_tree_head_and_verify(log_verification_key) - if err: return err - - err = new_tree_head.timestamp_valid(now) - if err: return err - - err = new_tree_head.history_valid(cur_tree_head) - if err: return err - - if not cur_tree_head.signature_valid(log_verification_key): - return ERR_TREEHEAD_SIGNATURE_INVALID, "ERROR: signature of current tree head invalid" - - err = sign_send_store_tree_head(signing_key, log_verification_key, new_tree_head) - if err: return err - - return 0, None - -if __name__ == '__main__': - status = main(sys.argv) - if status[1]: - print(status[1]) - sys.exit(status[0]) diff --git a/sigsum-witness.py b/sigsum-witness.py new file mode 100755 index 0000000..730c6b8 --- /dev/null +++ b/sigsum-witness.py @@ -0,0 +1,505 @@ +#! /usr/bin/env python3 + +# Sign the most recently published tree head from a given ST log, +# after verifying a consistency proof from an already verified tree +# head to this new tree head. + +# A verified tree head is expected to be found in the file +# ~/.config/sigsum-witness/signed-tree-head . It's updated once a +# newer tree head has been verified successfully. + +# If the config file ~/.config/sigsum-witness/sigsum-witness.conf +# exists and is readable, options are read from it. Options read from +# the config file can be overridden on the command line. + +# Pubkey from secret key: +# sigkey = nacl.signing.SigningKey('badc0ffee123456...', nacl.encoding.HexEncoder) +# sigkey.verify_key.encode(nacl.encoding.HexEncoder) + +import sys +import os +from stat import * +import argparse +import requests +import struct +from binascii import hexlify, unhexlify +import nacl.encoding +import nacl.signing +from hashlib import sha256 +import time +from math import floor +from pathlib import PurePath + +BASE_URL_DEFAULT = 'http://poc.sigsum.org:4780/' +CONFIG_DIR_DEFAULT = os.path.expanduser('~/.config/sigsum-witness/') + +ERR_OK = 0 +ERR_USAGE = 1 +ERR_TREEHEAD_READ = 2 +ERR_TREEHEAD_FETCH = 3 +ERR_TREEHEAD_SIGNATURE_INVALID = 4 +ERR_TREEHEAD_INVALID = 5 +ERR_CONSISTENCYPROOF_FETCH = 6 +ERR_CONSISTENCYPROOF_INVALID = 7 +ERR_LOGKEY = 8 +ERR_LOGKEY_FORMAT = 9 +ERR_SIGKEYFILE = 10 +ERR_SIGKEYFILE_MISSING = 11 +ERR_SIGKEY_FORMAT = 12 +ERR_NYI = 13 +ERR_COSIG_POST = 14 + +class Parser: + def __init__(self): + p = argparse.ArgumentParser( + description='Sign the most recently published tree head from a given sigsum log, after verifying it against an older tree.') + + p.add_argument('--bootstrap-log', + action='store_true', + help="Sign and save fetched tree head without verifying a consistency proof against a previous tree head. " + "NOTE: Requires user intervention.") + + p.add_argument('-d', '--base-dir', + default=CONFIG_DIR_DEFAULT, + help="Configuration directory ({})".format(CONFIG_DIR_DEFAULT)) + + p.add_argument('-g', '--generate-signing-key', + action='store_true', + help="Generate signing key if missing. NOTE: Requires user intervention.") + + p.add_argument('-l', '--log-verification-key', + help="Log verification key") + + p.add_argument('--save-config', + action='store_true', + help="Save command line options to the configuration file") + + p.add_argument('-s', '--sigkey-file', + default='signing-key', + help="Signing key file, relative to $base_dir if not an absolute path (signing-key)") + + p.add_argument('-u', '--base-url', + default=BASE_URL_DEFAULT, + help="Log base URL ({})".format(BASE_URL_DEFAULT)) + + self.parser = p + +def parse_config(filename): + try: + lines = [] + with open(filename, 'r') as f: + line = f.readline() + while line: + lines.append(line.strip()) + line = f.readline() + g_args.parser.parse_args(lines, namespace=g_args) + except FileNotFoundError: + pass + +def parse_args(argv): + g_args.parser.parse_args(namespace=g_args) + +def parse_keyval(text): + dictx = {} + for line in text.split(): + (key, val) = line.split('=') + if not key in dictx: + dictx[key] = val + else: + if type(dictx[key]) is list: + dictx[key] += [val] + else: + dictx[key] = [dictx[key], val] + return dictx + +class TreeHead: + def __init__(self, sth_data): + self._text = parse_keyval(sth_data) + assert(len(self._text) == 4) + assert('timestamp' in self._text) + assert('tree_size' in self._text) + assert('root_hash' in self._text) + assert('signature' in self._text) + + @property + def timestamp(self): + return int(self._text['timestamp']) + + @property + def tree_size(self): + return int(self._text['tree_size']) + + @property + def root_hash(self): + return unhexlify(self._text['root_hash']) + + def text(self): + text = 'timestamp={}\n'.format(self._text['timestamp']) + text += 'tree_size={}\n'.format(self._text['tree_size']) + text += 'root_hash={}\n'.format(self._text['root_hash']) + text += 'signature={}\n'.format(self._text['signature']) + return text.encode('ascii') + + def serialise(self, pubkey): + data = struct.pack('!QQ', self.timestamp, self.tree_size) + data += unhexlify(self._text['root_hash']) + data += sha256(pubkey.encode()).digest() + assert(len(data) == 8 + 8 + 32 + 32) + return data + + def signature_valid(self, pubkey): + # Guard against tree head with >1 signature -- don't try to + # validate a cosigned tree head. + assert(type(self._text['signature']) is str) + sig = unhexlify(self._text['signature']) + assert(len(sig) == 64) + data = self.serialise(pubkey) + try: + verified_data = pubkey.verify(sig + data) + except nacl.exceptions.BadSignatureError: + return False + assert(verified_data == data) + return True + + def timestamp_valid(self, now): + ts_sec = self.timestamp + ts_asc = time.ctime(ts_sec) + if ts_sec < now - 12 * 3600: + return (ERR_OK, + "WARNING: Tree head timestamp too old: {} ({})".format(ts_sec, ts_asc)) + if ts_sec > now + 12 * 3600: + return (ERR_OK, + "WARNING: Tree head timestamp too new: {} ({})".format(ts_sec, ts_asc)) + + def history_valid(self, prev): + if self.tree_size < prev.tree_size: + return (ERR_TREEHEAD_INVALID, + "ERROR: Log is shrinking: {} < {} ".format(self.tree_size, + prev.tree_size)) + + if self.timestamp < prev.timestamp: + return (ERR_TREEHEAD_INVALID, + "ERROR: Log is time traveling: {} < {} ".format(time.ctime(self.timestamp), + time.ctime(prev.timestamp))) + + if self.timestamp == prev.timestamp and \ + self.root_hash == prev.root_hash and \ + self.tree_size == prev.tree_size: + return (ERR_OK, + "INFO: Fetched head of tree of size {} already seen".format(prev.tree_size)) + + if self.root_hash == prev.root_hash and \ + self.tree_size != prev.tree_size: + return (ERR_TREEHEAD_INVALID, + "ERROR: Tree size has changed but hash has not: " + "{}: {} != {}".format(self.root_hash, + self.tree_size, + prev.tree_size)) + + if self.root_hash != prev.root_hash and \ + self.tree_size == prev.tree_size: + return (ERR_TREEHEAD_INVALID, + "ERROR: Hash has changed but tree size has not: " + "{}: {} != {}".format(self.tree_size, + self.root_hash, + prev.root_hash)) + + # New timestamp but same hash and size is ok but there's no + # consistency to prove. + if self.root_hash == prev.root_hash: + assert(self.tree_size == prev.tree_size) + assert(self.timestamp != prev.timestamp) + print("INFO: Signing re-published head of tree of size {}".format(self.tree_size)) + return None # Success + + proof, err = fetch_consistency_proof(prev.tree_size, self.tree_size) + if err: return err + if not consistency_proof_valid(prev, self, proof): + errmsg = "ERROR: failing consistency proof check for {}->{}\n".format(prev.tree_size, + self.tree_size) + errmsg += "DEBUG: {}:{}->{}:{}\n {}".format(prev.tree_size, + prev.root_hash, + self.tree_size, + self.root_hash, + proof.path()) + return ERR_CONSISTENCYPROOF_INVALID, errmsg + + return None # Success + +class ConsistencyProof(): + def __init__(self, old_size, new_size, consistency_proof_data): + self._old_size = old_size + self._new_size = new_size + self._text = parse_keyval(consistency_proof_data) + assert(len(self._text) == 1) + assert('consistency_path' in self._text) + + def old_size(self): + return self._old_size + def new_size(self): + return self._new_size + def path(self): + if type(self._text['consistency_path']) is list: + return [unhexlify(e) for e in self._text['consistency_path']] + else: + return [unhexlify(self._text['consistency_path'])] + +def make_base_dir_maybe(): + dirname = os.path.expanduser(g_args.base_dir) + try: + os.stat(dirname) + except FileNotFoundError: + os.makedirs(dirname, mode=0o700) + +def read_tree_head(filename): + try: + with open(filename, mode='r') as f: + return TreeHead(f.read()) + except FileNotFoundError: + return None + +def read_tree_head_and_verify(log_verification_key): + fn = str(PurePath(os.path.expanduser(g_args.base_dir), 'signed-tree-head')) + tree_head = read_tree_head(fn) + if not tree_head: + return None, (ERR_TREEHEAD_READ, + "ERROR: unable to read file {}".format(fn)) + + if not tree_head.signature_valid(log_verification_key): + return None, (ERR_TREEHEAD_SIGNATURE_INVALID, + "ERROR: signature of stored tree head invalid") + + return tree_head, None + +def store_tree_head(tree_head): + path = str(PurePath(os.path.expanduser(g_args.base_dir), 'signed-tree-head')) + with open(path, mode='w+b') as f: + f.write(tree_head.text()) + +def fetch_tree_head_and_verify(log_verification_key): + req = requests.get(g_args.base_url + 'sigsum/v0/get-tree-head-to-sign') + if req.status_code != 200: + return None, (ERR_TREEHEAD_FETCH, + "ERROR: unable to fetch new tree head: {}".format(req.status_code)) + + tree_head = TreeHead(req.content.decode()) + if not tree_head.signature_valid(log_verification_key): + return None, (ERR_TREEHEAD_SIGNATURE_INVALID, + "ERROR: signature of fetched tree head invalid") + + return tree_head, None + +def fetch_consistency_proof(first, second): + post_data = 'old_size={}\n'.format(first) + post_data += 'new_size={}\n'.format(second) + req = requests.post(g_args.base_url + 'sigsum/v0/get-consistency-proof', post_data) + if req.status_code != 200: + return None, (ERR_CONSISTENCYPROOF_FETCH, + "ERROR: unable to fetch consistency proof: {}".format(req.status_code)) + return ConsistencyProof(first, second, req.content.decode()), None + +def numbits(n): + p = 0 + while n > 0: + if n & 1: + p += 1 + n >>= 1 + return p + +# Implements the algorithm for consistency proof verification outlined +# in RFC6962-BIS, see +# https://datatracker.ietf.org/doc/html/draft-ietf-trans-rfc6962-bis-39#section-2.1.4.2 +def consistency_proof_valid(first, second, proof): + assert(first.tree_size == proof.old_size()) + assert(second.tree_size == proof.new_size()) + + path = proof.path() + if len(path) == 0: + return False + if numbits(first.tree_size) == 1: + path = [first.root_hash] + path + + fn = first.tree_size - 1 + sn = second.tree_size - 1 + while fn & 1: + fn >>= 1 + sn >>= 1 + + fr = path[0] + sr = path[0] + + for c in path[1:]: + if sn == 0: + return False + + if fn & 1 or fn == sn: + fr = sha256(b'\x01' + c + fr).digest() + sr = sha256(b'\x01' + c + sr).digest() + while fn != 0 and fn & 1 == 0: + fn >>= 1 + sn >>= 1 + else: + sr = sha256(b'\x01' + sr + c).digest() + + fn >>= 1 + sn >>= 1 + + return sn == 0 and fr == first.root_hash and sr == second.root_hash + +def sign_send_store_tree_head(signing_key, log_key, tree_head): + signature = signing_key.sign(tree_head.serialise(log_key)).signature + hash = sha256(signing_key.verify_key.encode()) + + post_data = 'cosignature={}\n'.format(hexlify(signature).decode('ascii')) + post_data += 'key_hash={}\n'.format(hash.hexdigest()) + + req = requests.post(g_args.base_url + 'sigsum/v0/add-cosignature', post_data) + if req.status_code != 200: + return (ERR_COSIG_POST, + "ERROR: Unable to post signature to log: {} => {}: {}". format(req.url, + req.status_code, + req.text)) + # Store only when all else is done. Next invocation will treat a + # stored tree head as having been verified. + store_tree_head(tree_head) + +def ensure_log_verification_key(): + if not g_args.log_verification_key: + return None, (ERR_LOGKEY, "ERROR: missing log verification key") + try: + log_verification_key = nacl.signing.VerifyKey(g_args.log_verification_key, encoder=nacl.encoding.HexEncoder) + except: + return None, (ERR_LOGKEY_FORMAT, + "ERROR: invalid log verification key: {}".format(g_args.log_verification_key)) + + assert(log_verification_key is not None) + return log_verification_key, None + +def generate_and_store_sigkey(fn): + print("INFO: Generating signing key and writing it to {}".format(fn)) + signing_key = nacl.signing.SigningKey.generate() + verify_key = signing_key.verify_key + print("INFO: verification key: {}".format(verify_key.encode(nacl.encoding.HexEncoder).decode('ascii'))) + with open(fn, 'w') as f: + os.chmod(f.fileno(), S_IRUSR) + f.write(signing_key.encode(encoder=nacl.encoding.HexEncoder).decode('ascii')) + +def read_sigkeyfile(fn): + s = os.stat(fn, follow_symlinks=False) + if not S_ISREG(s.st_mode): + return None, (ERR_SIGKEYFILE, + "ERROR: Signing key file {} must be a regular file".format(fn)) + if S_IMODE(s.st_mode) & 0o077 != 0: + return None, (ERR_SIGKEYFILE, + "ERROR: Signing key file {} permissions too lax: {:04o}".format(fn, S_IMODE(s.st_mode))) + + with open(fn, 'r') as f: + try: + signing_key = nacl.signing.SigningKey(f.readline().strip(), nacl.encoding.HexEncoder) + except: + return None, (ERR_SIGKEY_FORMAT, + "ERROR: Invalid signing key in {}".format(fn)) + + assert(signing_key is not None) + return signing_key, None + + +# Read signature key from file, or generate one and write it to file. +def ensure_sigkey(fn): + try: + os.stat(fn, follow_symlinks=False) + except FileNotFoundError: + if not g_args.generate_signing_key: + return None, (ERR_SIGKEYFILE_MISSING, + "ERROR: Signing key file {} missing. " + "Use --generate-signing-key to create one.".format(fn)) + + if not user_confirm("Really generate a new signing key and store it in {}?".format(fn)): + return None, (ERR_SIGKEYFILE_MISSING, + "ERROR: Signing key file {} missing".format(fn)) + + generate_and_store_sigkey(fn) + return read_sigkeyfile(fn) + + if g_args.generate_signing_key: + return None, (ERR_USAGE, + "ERROR: Signing key file {} already existing".format(fn)) + return read_sigkeyfile(fn) + + +def user_confirm(prompt): + resp = input(prompt + ' y/n> ').lower() + if resp and resp[0] == 'y': + return True + return False + +def main(args): + global g_args + g_args = Parser() + parse_args(args) # get base_dir + parse_config(str(PurePath(g_args.base_dir, 'sigsum-witness.conf'))) + parse_args(args) # override config file options + if g_args.save_config: + # TODO write to config file + return ERR_NYI, "ERROR: --save-config is not yet implemented" + + now = floor(time.time()) + consistency_verified = False + ignore_consistency = False + + make_base_dir_maybe() + + log_verification_key, err = ensure_log_verification_key() + if err: return err + + signing_key, err = ensure_sigkey(str(PurePath(g_args.base_dir, g_args.sigkey_file))) + if err: return err + + cur_tree_head, err = read_tree_head_and_verify(log_verification_key) + if err: + new_tree_head, err2 = fetch_tree_head_and_verify(log_verification_key) + if err2: return err2 + + if not g_args.bootstrap_log: + return err + + print("\nWARNING: We have only seen one single tree head from the\n" + "log {},\n" + "representing a tree of size {}. We are therefore unable to\n" + "verify that the tree it represents is really a superset of an\n" + "earlier version of the tree in this log.\n" + "\nWe are effectively signing this tree head blindly.\n".format(g_args.base_url, + new_tree_head.tree_size)) + if user_confirm("Really sign head for tree of size {} and upload " + "the signature?".format(new_tree_head.tree_size)): + err3 = sign_send_store_tree_head(signing_key, log_verification_key, new_tree_head) + if err3: return err3 + + return 0, None + else: + if g_args.bootstrap_log: + return (ERR_USAGE, + "ERROR: Valid tree head found: --bootstrap-log not allowed") + + new_tree_head, err = fetch_tree_head_and_verify(log_verification_key) + if err: return err + + err = new_tree_head.timestamp_valid(now) + if err: return err + + err = new_tree_head.history_valid(cur_tree_head) + if err: return err + + if not cur_tree_head.signature_valid(log_verification_key): + return ERR_TREEHEAD_SIGNATURE_INVALID, "ERROR: signature of current tree head invalid" + + err = sign_send_store_tree_head(signing_key, log_verification_key, new_tree_head) + if err: return err + + return 0, None + +if __name__ == '__main__': + status = main(sys.argv) + if status[1]: + print(status[1]) + sys.exit(status[0]) -- cgit v1.2.3