aboutsummaryrefslogtreecommitdiff
path: root/siglog-witness.py
diff options
context:
space:
mode:
Diffstat (limited to 'siglog-witness.py')
-rwxr-xr-xsiglog-witness.py505
1 files changed, 0 insertions, 505 deletions
diff --git a/siglog-witness.py b/siglog-witness.py
deleted file mode 100755
index 730c6b8..0000000
--- a/siglog-witness.py
+++ /dev/null
@@ -1,505 +0,0 @@
-#! /usr/bin/env python3
-
-# Sign the most recently published tree head from a given ST log,
-# after verifying a consistency proof from an already verified tree
-# head to this new tree head.
-
-# A verified tree head is expected to be found in the file
-# ~/.config/sigsum-witness/signed-tree-head . It's updated once a
-# newer tree head has been verified successfully.
-
-# If the config file ~/.config/sigsum-witness/sigsum-witness.conf
-# exists and is readable, options are read from it. Options read from
-# the config file can be overridden on the command line.
-
-# Pubkey from secret key:
-# sigkey = nacl.signing.SigningKey('badc0ffee123456...', nacl.encoding.HexEncoder)
-# sigkey.verify_key.encode(nacl.encoding.HexEncoder)
-
-import sys
-import os
-from stat import *
-import argparse
-import requests
-import struct
-from binascii import hexlify, unhexlify
-import nacl.encoding
-import nacl.signing
-from hashlib import sha256
-import time
-from math import floor
-from pathlib import PurePath
-
-BASE_URL_DEFAULT = 'http://poc.sigsum.org:4780/'
-CONFIG_DIR_DEFAULT = os.path.expanduser('~/.config/sigsum-witness/')
-
-ERR_OK = 0
-ERR_USAGE = 1
-ERR_TREEHEAD_READ = 2
-ERR_TREEHEAD_FETCH = 3
-ERR_TREEHEAD_SIGNATURE_INVALID = 4
-ERR_TREEHEAD_INVALID = 5
-ERR_CONSISTENCYPROOF_FETCH = 6
-ERR_CONSISTENCYPROOF_INVALID = 7
-ERR_LOGKEY = 8
-ERR_LOGKEY_FORMAT = 9
-ERR_SIGKEYFILE = 10
-ERR_SIGKEYFILE_MISSING = 11
-ERR_SIGKEY_FORMAT = 12
-ERR_NYI = 13
-ERR_COSIG_POST = 14
-
-class Parser:
- def __init__(self):
- p = argparse.ArgumentParser(
- description='Sign the most recently published tree head from a given sigsum log, after verifying it against an older tree.')
-
- p.add_argument('--bootstrap-log',
- action='store_true',
- help="Sign and save fetched tree head without verifying a consistency proof against a previous tree head. "
- "NOTE: Requires user intervention.")
-
- p.add_argument('-d', '--base-dir',
- default=CONFIG_DIR_DEFAULT,
- help="Configuration directory ({})".format(CONFIG_DIR_DEFAULT))
-
- p.add_argument('-g', '--generate-signing-key',
- action='store_true',
- help="Generate signing key if missing. NOTE: Requires user intervention.")
-
- p.add_argument('-l', '--log-verification-key',
- help="Log verification key")
-
- p.add_argument('--save-config',
- action='store_true',
- help="Save command line options to the configuration file")
-
- p.add_argument('-s', '--sigkey-file',
- default='signing-key',
- help="Signing key file, relative to $base_dir if not an absolute path (signing-key)")
-
- p.add_argument('-u', '--base-url',
- default=BASE_URL_DEFAULT,
- help="Log base URL ({})".format(BASE_URL_DEFAULT))
-
- self.parser = p
-
-def parse_config(filename):
- try:
- lines = []
- with open(filename, 'r') as f:
- line = f.readline()
- while line:
- lines.append(line.strip())
- line = f.readline()
- g_args.parser.parse_args(lines, namespace=g_args)
- except FileNotFoundError:
- pass
-
-def parse_args(argv):
- g_args.parser.parse_args(namespace=g_args)
-
-def parse_keyval(text):
- dictx = {}
- for line in text.split():
- (key, val) = line.split('=')
- if not key in dictx:
- dictx[key] = val
- else:
- if type(dictx[key]) is list:
- dictx[key] += [val]
- else:
- dictx[key] = [dictx[key], val]
- return dictx
-
-class TreeHead:
- def __init__(self, sth_data):
- self._text = parse_keyval(sth_data)
- assert(len(self._text) == 4)
- assert('timestamp' in self._text)
- assert('tree_size' in self._text)
- assert('root_hash' in self._text)
- assert('signature' in self._text)
-
- @property
- def timestamp(self):
- return int(self._text['timestamp'])
-
- @property
- def tree_size(self):
- return int(self._text['tree_size'])
-
- @property
- def root_hash(self):
- return unhexlify(self._text['root_hash'])
-
- def text(self):
- text = 'timestamp={}\n'.format(self._text['timestamp'])
- text += 'tree_size={}\n'.format(self._text['tree_size'])
- text += 'root_hash={}\n'.format(self._text['root_hash'])
- text += 'signature={}\n'.format(self._text['signature'])
- return text.encode('ascii')
-
- def serialise(self, pubkey):
- data = struct.pack('!QQ', self.timestamp, self.tree_size)
- data += unhexlify(self._text['root_hash'])
- data += sha256(pubkey.encode()).digest()
- assert(len(data) == 8 + 8 + 32 + 32)
- return data
-
- def signature_valid(self, pubkey):
- # Guard against tree head with >1 signature -- don't try to
- # validate a cosigned tree head.
- assert(type(self._text['signature']) is str)
- sig = unhexlify(self._text['signature'])
- assert(len(sig) == 64)
- data = self.serialise(pubkey)
- try:
- verified_data = pubkey.verify(sig + data)
- except nacl.exceptions.BadSignatureError:
- return False
- assert(verified_data == data)
- return True
-
- def timestamp_valid(self, now):
- ts_sec = self.timestamp
- ts_asc = time.ctime(ts_sec)
- if ts_sec < now - 12 * 3600:
- return (ERR_OK,
- "WARNING: Tree head timestamp too old: {} ({})".format(ts_sec, ts_asc))
- if ts_sec > now + 12 * 3600:
- return (ERR_OK,
- "WARNING: Tree head timestamp too new: {} ({})".format(ts_sec, ts_asc))
-
- def history_valid(self, prev):
- if self.tree_size < prev.tree_size:
- return (ERR_TREEHEAD_INVALID,
- "ERROR: Log is shrinking: {} < {} ".format(self.tree_size,
- prev.tree_size))
-
- if self.timestamp < prev.timestamp:
- return (ERR_TREEHEAD_INVALID,
- "ERROR: Log is time traveling: {} < {} ".format(time.ctime(self.timestamp),
- time.ctime(prev.timestamp)))
-
- if self.timestamp == prev.timestamp and \
- self.root_hash == prev.root_hash and \
- self.tree_size == prev.tree_size:
- return (ERR_OK,
- "INFO: Fetched head of tree of size {} already seen".format(prev.tree_size))
-
- if self.root_hash == prev.root_hash and \
- self.tree_size != prev.tree_size:
- return (ERR_TREEHEAD_INVALID,
- "ERROR: Tree size has changed but hash has not: "
- "{}: {} != {}".format(self.root_hash,
- self.tree_size,
- prev.tree_size))
-
- if self.root_hash != prev.root_hash and \
- self.tree_size == prev.tree_size:
- return (ERR_TREEHEAD_INVALID,
- "ERROR: Hash has changed but tree size has not: "
- "{}: {} != {}".format(self.tree_size,
- self.root_hash,
- prev.root_hash))
-
- # New timestamp but same hash and size is ok but there's no
- # consistency to prove.
- if self.root_hash == prev.root_hash:
- assert(self.tree_size == prev.tree_size)
- assert(self.timestamp != prev.timestamp)
- print("INFO: Signing re-published head of tree of size {}".format(self.tree_size))
- return None # Success
-
- proof, err = fetch_consistency_proof(prev.tree_size, self.tree_size)
- if err: return err
- if not consistency_proof_valid(prev, self, proof):
- errmsg = "ERROR: failing consistency proof check for {}->{}\n".format(prev.tree_size,
- self.tree_size)
- errmsg += "DEBUG: {}:{}->{}:{}\n {}".format(prev.tree_size,
- prev.root_hash,
- self.tree_size,
- self.root_hash,
- proof.path())
- return ERR_CONSISTENCYPROOF_INVALID, errmsg
-
- return None # Success
-
-class ConsistencyProof():
- def __init__(self, old_size, new_size, consistency_proof_data):
- self._old_size = old_size
- self._new_size = new_size
- self._text = parse_keyval(consistency_proof_data)
- assert(len(self._text) == 1)
- assert('consistency_path' in self._text)
-
- def old_size(self):
- return self._old_size
- def new_size(self):
- return self._new_size
- def path(self):
- if type(self._text['consistency_path']) is list:
- return [unhexlify(e) for e in self._text['consistency_path']]
- else:
- return [unhexlify(self._text['consistency_path'])]
-
-def make_base_dir_maybe():
- dirname = os.path.expanduser(g_args.base_dir)
- try:
- os.stat(dirname)
- except FileNotFoundError:
- os.makedirs(dirname, mode=0o700)
-
-def read_tree_head(filename):
- try:
- with open(filename, mode='r') as f:
- return TreeHead(f.read())
- except FileNotFoundError:
- return None
-
-def read_tree_head_and_verify(log_verification_key):
- fn = str(PurePath(os.path.expanduser(g_args.base_dir), 'signed-tree-head'))
- tree_head = read_tree_head(fn)
- if not tree_head:
- return None, (ERR_TREEHEAD_READ,
- "ERROR: unable to read file {}".format(fn))
-
- if not tree_head.signature_valid(log_verification_key):
- return None, (ERR_TREEHEAD_SIGNATURE_INVALID,
- "ERROR: signature of stored tree head invalid")
-
- return tree_head, None
-
-def store_tree_head(tree_head):
- path = str(PurePath(os.path.expanduser(g_args.base_dir), 'signed-tree-head'))
- with open(path, mode='w+b') as f:
- f.write(tree_head.text())
-
-def fetch_tree_head_and_verify(log_verification_key):
- req = requests.get(g_args.base_url + 'sigsum/v0/get-tree-head-to-sign')
- if req.status_code != 200:
- return None, (ERR_TREEHEAD_FETCH,
- "ERROR: unable to fetch new tree head: {}".format(req.status_code))
-
- tree_head = TreeHead(req.content.decode())
- if not tree_head.signature_valid(log_verification_key):
- return None, (ERR_TREEHEAD_SIGNATURE_INVALID,
- "ERROR: signature of fetched tree head invalid")
-
- return tree_head, None
-
-def fetch_consistency_proof(first, second):
- post_data = 'old_size={}\n'.format(first)
- post_data += 'new_size={}\n'.format(second)
- req = requests.post(g_args.base_url + 'sigsum/v0/get-consistency-proof', post_data)
- if req.status_code != 200:
- return None, (ERR_CONSISTENCYPROOF_FETCH,
- "ERROR: unable to fetch consistency proof: {}".format(req.status_code))
- return ConsistencyProof(first, second, req.content.decode()), None
-
-def numbits(n):
- p = 0
- while n > 0:
- if n & 1:
- p += 1
- n >>= 1
- return p
-
-# Implements the algorithm for consistency proof verification outlined
-# in RFC6962-BIS, see
-# https://datatracker.ietf.org/doc/html/draft-ietf-trans-rfc6962-bis-39#section-2.1.4.2
-def consistency_proof_valid(first, second, proof):
- assert(first.tree_size == proof.old_size())
- assert(second.tree_size == proof.new_size())
-
- path = proof.path()
- if len(path) == 0:
- return False
- if numbits(first.tree_size) == 1:
- path = [first.root_hash] + path
-
- fn = first.tree_size - 1
- sn = second.tree_size - 1
- while fn & 1:
- fn >>= 1
- sn >>= 1
-
- fr = path[0]
- sr = path[0]
-
- for c in path[1:]:
- if sn == 0:
- return False
-
- if fn & 1 or fn == sn:
- fr = sha256(b'\x01' + c + fr).digest()
- sr = sha256(b'\x01' + c + sr).digest()
- while fn != 0 and fn & 1 == 0:
- fn >>= 1
- sn >>= 1
- else:
- sr = sha256(b'\x01' + sr + c).digest()
-
- fn >>= 1
- sn >>= 1
-
- return sn == 0 and fr == first.root_hash and sr == second.root_hash
-
-def sign_send_store_tree_head(signing_key, log_key, tree_head):
- signature = signing_key.sign(tree_head.serialise(log_key)).signature
- hash = sha256(signing_key.verify_key.encode())
-
- post_data = 'cosignature={}\n'.format(hexlify(signature).decode('ascii'))
- post_data += 'key_hash={}\n'.format(hash.hexdigest())
-
- req = requests.post(g_args.base_url + 'sigsum/v0/add-cosignature', post_data)
- if req.status_code != 200:
- return (ERR_COSIG_POST,
- "ERROR: Unable to post signature to log: {} => {}: {}". format(req.url,
- req.status_code,
- req.text))
- # Store only when all else is done. Next invocation will treat a
- # stored tree head as having been verified.
- store_tree_head(tree_head)
-
-def ensure_log_verification_key():
- if not g_args.log_verification_key:
- return None, (ERR_LOGKEY, "ERROR: missing log verification key")
- try:
- log_verification_key = nacl.signing.VerifyKey(g_args.log_verification_key, encoder=nacl.encoding.HexEncoder)
- except:
- return None, (ERR_LOGKEY_FORMAT,
- "ERROR: invalid log verification key: {}".format(g_args.log_verification_key))
-
- assert(log_verification_key is not None)
- return log_verification_key, None
-
-def generate_and_store_sigkey(fn):
- print("INFO: Generating signing key and writing it to {}".format(fn))
- signing_key = nacl.signing.SigningKey.generate()
- verify_key = signing_key.verify_key
- print("INFO: verification key: {}".format(verify_key.encode(nacl.encoding.HexEncoder).decode('ascii')))
- with open(fn, 'w') as f:
- os.chmod(f.fileno(), S_IRUSR)
- f.write(signing_key.encode(encoder=nacl.encoding.HexEncoder).decode('ascii'))
-
-def read_sigkeyfile(fn):
- s = os.stat(fn, follow_symlinks=False)
- if not S_ISREG(s.st_mode):
- return None, (ERR_SIGKEYFILE,
- "ERROR: Signing key file {} must be a regular file".format(fn))
- if S_IMODE(s.st_mode) & 0o077 != 0:
- return None, (ERR_SIGKEYFILE,
- "ERROR: Signing key file {} permissions too lax: {:04o}".format(fn, S_IMODE(s.st_mode)))
-
- with open(fn, 'r') as f:
- try:
- signing_key = nacl.signing.SigningKey(f.readline().strip(), nacl.encoding.HexEncoder)
- except:
- return None, (ERR_SIGKEY_FORMAT,
- "ERROR: Invalid signing key in {}".format(fn))
-
- assert(signing_key is not None)
- return signing_key, None
-
-
-# Read signature key from file, or generate one and write it to file.
-def ensure_sigkey(fn):
- try:
- os.stat(fn, follow_symlinks=False)
- except FileNotFoundError:
- if not g_args.generate_signing_key:
- return None, (ERR_SIGKEYFILE_MISSING,
- "ERROR: Signing key file {} missing. "
- "Use --generate-signing-key to create one.".format(fn))
-
- if not user_confirm("Really generate a new signing key and store it in {}?".format(fn)):
- return None, (ERR_SIGKEYFILE_MISSING,
- "ERROR: Signing key file {} missing".format(fn))
-
- generate_and_store_sigkey(fn)
- return read_sigkeyfile(fn)
-
- if g_args.generate_signing_key:
- return None, (ERR_USAGE,
- "ERROR: Signing key file {} already existing".format(fn))
- return read_sigkeyfile(fn)
-
-
-def user_confirm(prompt):
- resp = input(prompt + ' y/n> ').lower()
- if resp and resp[0] == 'y':
- return True
- return False
-
-def main(args):
- global g_args
- g_args = Parser()
- parse_args(args) # get base_dir
- parse_config(str(PurePath(g_args.base_dir, 'sigsum-witness.conf')))
- parse_args(args) # override config file options
- if g_args.save_config:
- # TODO write to config file
- return ERR_NYI, "ERROR: --save-config is not yet implemented"
-
- now = floor(time.time())
- consistency_verified = False
- ignore_consistency = False
-
- make_base_dir_maybe()
-
- log_verification_key, err = ensure_log_verification_key()
- if err: return err
-
- signing_key, err = ensure_sigkey(str(PurePath(g_args.base_dir, g_args.sigkey_file)))
- if err: return err
-
- cur_tree_head, err = read_tree_head_and_verify(log_verification_key)
- if err:
- new_tree_head, err2 = fetch_tree_head_and_verify(log_verification_key)
- if err2: return err2
-
- if not g_args.bootstrap_log:
- return err
-
- print("\nWARNING: We have only seen one single tree head from the\n"
- "log {},\n"
- "representing a tree of size {}. We are therefore unable to\n"
- "verify that the tree it represents is really a superset of an\n"
- "earlier version of the tree in this log.\n"
- "\nWe are effectively signing this tree head blindly.\n".format(g_args.base_url,
- new_tree_head.tree_size))
- if user_confirm("Really sign head for tree of size {} and upload "
- "the signature?".format(new_tree_head.tree_size)):
- err3 = sign_send_store_tree_head(signing_key, log_verification_key, new_tree_head)
- if err3: return err3
-
- return 0, None
- else:
- if g_args.bootstrap_log:
- return (ERR_USAGE,
- "ERROR: Valid tree head found: --bootstrap-log not allowed")
-
- new_tree_head, err = fetch_tree_head_and_verify(log_verification_key)
- if err: return err
-
- err = new_tree_head.timestamp_valid(now)
- if err: return err
-
- err = new_tree_head.history_valid(cur_tree_head)
- if err: return err
-
- if not cur_tree_head.signature_valid(log_verification_key):
- return ERR_TREEHEAD_SIGNATURE_INVALID, "ERROR: signature of current tree head invalid"
-
- err = sign_send_store_tree_head(signing_key, log_verification_key, new_tree_head)
- if err: return err
-
- return 0, None
-
-if __name__ == '__main__':
- status = main(sys.argv)
- if status[1]:
- print(status[1])
- sys.exit(status[0])