summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLinus Nordberg <linus@nordberg.se>2021-06-02 23:15:26 +0200
committerLinus Nordberg <linus@nordberg.se>2021-06-02 23:19:46 +0200
commit4c61f7e4c4c471ae2fdf6aee13c2d147eed7282d (patch)
tree70ae9905f1e10a2878cc8c68ac4d14c7ec4a5735
implement witness
First stab, rough edges, unstable interfaces, will break things, you don't want to use it.
-rwxr-xr-xsiglog-witness.py341
1 files changed, 341 insertions, 0 deletions
diff --git a/siglog-witness.py b/siglog-witness.py
new file mode 100755
index 0000000..ece3957
--- /dev/null
+++ b/siglog-witness.py
@@ -0,0 +1,341 @@
+#! /usr/bin/env python3
+
+# Sign the most recently published tree head from a given ST log,
+# after verifying a consistency proof from an already verified tree
+# head to this new tree head.
+
+# A verified tree head is expected to be found in the file
+# ~/.config/siglog-witness/signed_tree_head . It's updated once a
+# newer tree head has been verified successfully.
+
+# If the config file ~/.config/siglog-witness/siglog-witness.conf
+# exists and is readable, options are read from it. Options read from
+# the config file can be overridden on the command line.
+
+import sys
+import os
+from stat import *
+import argparse
+import requests
+import struct
+from binascii import hexlify, unhexlify
+import nacl.encoding
+import nacl.signing
+from hashlib import sha256
+
+# TODO maybe stop mixing dashes and underscores in directory names and filenames
+
+BASE_URL_DEFAULT = 'http://tlog-poc.system-transparency.org:6965/'
+CONFIG_DIR_DEFAULT = os.path.expanduser('~/.config/siglog-witness/')
+SIGKEY_FILE_DEFAULT = CONFIG_DIR_DEFAULT + 'signing_key'
+
+CONFIG_FILE = CONFIG_DIR_DEFAULT + 'siglog-witness.conf'
+
+class Parser:
+ def __init__(self):
+ p = argparse.ArgumentParser(
+ description='Sign the most recently published tree head from a given siglog, after verifying it against an older tree.')
+
+ p.add_argument('-b', '--bootstrap-log',
+ action='store_true',
+ help="Sign and save fetched tree head without verifying a consistency proof against a previous tree head. NOTE: User intervention required.")
+
+ p.add_argument('-d', '--base-dir',
+ default=CONFIG_DIR_DEFAULT,
+ help="Configuration directory ({})".format(CONFIG_DIR_DEFAULT))
+
+ p.add_argument('-l', '--log-verification-key',
+ help="Log verification key")
+
+ p.add_argument('--save-config',
+ action='store_true',
+ help="Save command line options to the configuration file")
+
+ p.add_argument('-s', '--sigkey-file',
+ default=SIGKEY_FILE_DEFAULT,
+ help="Signing key file ({})".format(SIGKEY_FILE_DEFAULT))
+
+ p.add_argument('-u', '--base-url',
+ default=BASE_URL_DEFAULT,
+ help="Log base URL ({})".format(BASE_URL_DEFAULT))
+
+ self.parser = p
+
+def parse_config(filename):
+ try:
+ lines = []
+ with open(filename, 'r') as f:
+ line = f.readline()
+ while line:
+ lines.append(line.strip())
+ line = f.readline()
+ g_args.parser.parse_args(lines, namespace=g_args)
+ except FileNotFoundError:
+ pass
+
+def parse_args(argv):
+ g_args.parser.parse_args(namespace=g_args)
+
+def parse_keyval(text):
+ dictx = {}
+ for line in text.split():
+ (key, val) = line.split('=')
+ if not key in dictx:
+ dictx[key] = val
+ else:
+ if type(dictx[key]) is list:
+ dictx[key] += [val]
+ else:
+ dictx[key] = [dictx[key], val]
+ return dictx
+
+class TreeHead:
+ def __init__(self, sth_data):
+ self._text = parse_keyval(sth_data)
+ assert(len(self._text) == 5)
+ assert('timestamp' in self._text)
+ assert('tree_size' in self._text)
+ assert('root_hash' in self._text)
+ assert('signature' in self._text)
+ assert('key_hash' in self._text)
+
+ def text(self):
+ text = 'timestamp={}\n'.format(self._text['timestamp'])
+ text += 'tree_size={}\n'.format(self._text['tree_size'])
+ text += 'root_hash={}\n'.format(self._text['root_hash'])
+ text += 'signature={}\n'.format(self._text['signature'])
+ text += 'key_hash={}\n'.format(self._text['key_hash'])
+ return text.encode('ascii')
+
+ def serialise(self):
+ data = struct.pack('!QQ', self.timestamp(), self.tree_size())
+ data += unhexlify(self._text['root_hash'])
+ assert(len(data) == 48)
+ return data
+
+ def signature_valid(self, pubkey):
+ # Guard against tree head with >1 signature -- don't try to
+ # validate a cosigned tree head.
+ assert(type(self._text['signature']) is str)
+ sig = unhexlify(self._text['signature'])
+ assert(len(sig) == 64)
+ data = self.serialise()
+ try:
+ verified_data = pubkey.verify(sig + data)
+ except nacl.exceptions.BadSignatureError:
+ return False
+ assert(verified_data == data)
+ return True
+
+ def timestamp(self):
+ return int(self._text['timestamp'])
+ def tree_size(self):
+ return int(self._text['tree_size'])
+ def root_hash(self):
+ return unhexlify(self._text['root_hash'])
+
+class ConsistencyProof():
+ def __init__(self, consistency_proof_data):
+ self._text = parse_keyval(consistency_proof_data)
+ assert(len(self._text) == 3)
+ assert('old_size' in self._text)
+ assert('new_size' in self._text)
+ assert('consistency_path' in self._text)
+
+ def old_size(self):
+ return int(self._text['old_size'])
+ def new_size(self):
+ return int(self._text['new_size'])
+ def path(self):
+ if type(self._text['consistency_path']) is list:
+ return [unhexlify(e) for e in self._text['consistency_path']]
+ else:
+ return [unhexlify(self._text['consistency_path'])]
+
+def read_tree_head():
+ filename = os.path.expanduser(g_args.base_dir) + 'signed_tree_head'
+ try:
+ with open(filename, mode='r') as f:
+ return TreeHead(f.read())
+ except FileNotFoundError:
+ return None
+
+def store_tree_head(tree_head):
+ dirname = os.path.expanduser(g_args.base_dir)
+ try:
+ os.stat(dirname)
+ except FileNotFoundError:
+ os.makedirs(dirname)
+ with open(dirname + 'signed_tree_head', mode='w+b') as f:
+ f.write(tree_head.text())
+
+def fetch_tree_head():
+ req = requests.get(g_args.base_url + 'st/v0/get-tree-head-to-sign')
+ if req.status_code != 200:
+ return None
+ return TreeHead(req.content.decode())
+
+def fetch_consistency_proof(first, second):
+ post_data = 'old_size={}\n'.format(first)
+ post_data += 'new_size={}\n'.format(second)
+ req = requests.post(g_args.base_url + 'st/v0/get-consistency-proof', post_data)
+ if req.status_code != 200:
+ print("ERROR: st/v0/get-consistency-proof({}) => {}".format(post_data, req))
+ return None
+ return ConsistencyProof(req.content.decode())
+
+def numbits(n):
+ p = 0
+ while n > 0:
+ if n & 1:
+ p += 1
+ n >>= 1
+ return p
+
+# Implements the algorithm for consistency proof verification outlined
+# in RFC6962-BIS, see
+# https://datatracker.ietf.org/doc/html/draft-ietf-trans-rfc6962-bis-39#section-2.1.4.2
+def consistency_proof_valid(first, second, proof):
+ assert(first.tree_size() == proof.old_size())
+ assert(second.tree_size() == proof.new_size())
+
+ path = proof.path()
+ if len(path) == 0:
+ return False
+ if numbits(first.tree_size()) == 1:
+ path = [first.root_hash()] + path
+
+ fn = first.tree_size() - 1
+ sn = second.tree_size() - 1
+ while fn & 1:
+ fn >>= 1
+ sn >>= 1
+
+ fr = path[0]
+ sr = path[0]
+
+ for c in path[1:]:
+ if sn == 0:
+ return False
+
+ if fn & 1 or fn == sn:
+ fr = sha256(b'\x01' + c + fr).digest()
+ sr = sha256(b'\x01' + c + sr).digest()
+ while fn != 0 and fn & 1 == 0:
+ fn >>= 1
+ sn >>= 1
+ else:
+ sr = sha256(b'\x01' + sr + c).digest()
+
+ fn >>= 1
+ sn >>= 1
+
+ return sn == 0 and fr == first.root_hash() and sr == second.root_hash()
+
+def send_to_log(keyhash_hex, signature_hex):
+ post_data = 'signature={}\n'.format(signature_hex)
+ post_data += 'key_hash={}\n'.format(keyhash_hex)
+ req = requests.post(g_args.base_url + 'st/v0/add-cosignature', post_data)
+ if req.status_code != 200:
+ return req
+
+def sign_and_send_sig(signing_key, sth):
+ keyhash = sha256(signing_key.verify_key.encode()).hexdigest()
+ status = send_to_log(keyhash,
+ hexlify(signing_key.sign(sth.serialise()).signature).decode('ascii'))
+ if status:
+ print("ERROR: Unable to post signature to log: {} => {}: {}".format(status.url,
+ status.status_code,
+ status.text))
+def main(args):
+ global g_args
+ g_args = Parser()
+ parse_config(CONFIG_FILE)
+ parse_args(args)
+ if g_args.save_config:
+ # TODO write config file
+ print("ERROR: --save-config is not yet implemented")
+ return 12
+
+ consistency_verified = False
+ ignore_consistency = False
+
+ # TODO stop returning random integers -- use 1 all over or do something clever
+
+ if not g_args.log_verification_key:
+ print("ERROR: missing log verification key")
+ return 7
+ try:
+ log_verification_key = nacl.signing.VerifyKey(g_args.log_verification_key, encoder=nacl.encoding.HexEncoder)
+ except:
+ print("ERROR: invalid log verification key: {}".format(g_args.log_verification_key))
+ return 8
+
+ try:
+ s = os.stat(g_args.sigkey_file, follow_symlinks=False)
+ if not S_ISREG(s.st_mode):
+ print("ERROR: Signing key file {} must be a regular file".format(g_args.sigkey_file))
+ return 9
+ if S_IMODE(s.st_mode) & 0o077 != 0:
+ print("ERROR: Signing key file {} permissions too lax: {:04o}".format(g_args.sigkey_file, S_IMODE(s.st_mode)))
+ return 10
+ except FileNotFoundError:
+ print("INFO: Signing key file {} not found -- generating new signing key".format(g_args.sigkey_file))
+ signing_key = nacl.signing.SigningKey.generate()
+ print("INFO: verification key: {}".format(signing_key.verify_key.encode(encoder=nacl.encoding.HexEncoder)))
+ with open(g_args.sigkey_file, 'w') as f:
+ os.chmod(f.fileno(), S_IRUSR)
+ f.write(signing_key.encode(encoder=nacl.encoding.HexEncoder).decode('ascii'))
+
+ with open(g_args.sigkey_file, 'r') as f:
+ try:
+ signing_key = nacl.signing.SigningKey(f.readline().strip(), encoder=nacl.encoding.HexEncoder)
+ except:
+ print("ERROR: Invalid signing key in {}".format(g_args.sigkey_file))
+ return 11
+
+ new = fetch_tree_head()
+ if not new:
+ print("ERROR: unable to fetch new tree head")
+ return 6
+ if not new.signature_valid(log_verification_key):
+ print("ERROR: signature of new tree head not valid")
+ return 2
+
+ cur = read_tree_head()
+ if not cur:
+ print("INFO: No current tree head found in {}".format(g_args.base_dir))
+ else:
+ if not cur.signature_valid(log_verification_key):
+ print("ERROR: signature of current tree head not valid")
+ return 3
+ if new.tree_size() <= cur.tree_size():
+ print("INFO: Fetched tree already verified, size {}".format(cur.tree_size()))
+ else:
+ proof = fetch_consistency_proof(cur.tree_size(), new.tree_size())
+ if not proof:
+ print("ERROR: unable to fetch consistency proof")
+ return 4
+ if consistency_proof_valid(cur, new, proof):
+ consistency_verified = True
+ else:
+ print("ERROR: failing consistency proof check for {}->{}".format(cur.tree_size(), new.tree_size()))
+ print("DEBUG: {}:{}->{}:{}\n {}".format(cur.tree_size(),
+ cur.root_hash(),
+ new.tree_size(),
+ new.root_hash(),
+ proof.path()))
+ return 5
+
+ if g_args.bootstrap_log:
+ # TODO maybe require user confirmation
+ ignore_consistency = True
+
+ store_tree_head(new)
+ if consistency_verified or ignore_consistency:
+ sign_and_send_sig(signing_key, new)
+
+ return 0
+
+if __name__ == '__main__':
+ sys.exit(main(sys.argv))