From b1c954687ca049ac7e34034f98ba9fd8c15258e7 Mon Sep 17 00:00:00 2001 From: Linus Nordberg Date: Thu, 3 Jun 2021 09:11:00 +0200 Subject: error handling cleaned up a bit Also, create base_dir in time, if it doesn't exist. Also also, set permission on base_dir when creating it. --- siglog-witness.py | 168 +++++++++++++++++++++++++++++++----------------------- 1 file changed, 98 insertions(+), 70 deletions(-) diff --git a/siglog-witness.py b/siglog-witness.py index 146caf7..f3475d2 100755 --- a/siglog-witness.py +++ b/siglog-witness.py @@ -28,9 +28,20 @@ from hashlib import sha256 BASE_URL_DEFAULT = 'http://tlog-poc.system-transparency.org:6965/' CONFIG_DIR_DEFAULT = os.path.expanduser('~/.config/siglog-witness/') SIGKEY_FILE_DEFAULT = CONFIG_DIR_DEFAULT + 'signing_key' - CONFIG_FILE = CONFIG_DIR_DEFAULT + 'siglog-witness.conf' +ERR_TREEHEAD_SIGNATURE_INVALID = 2 +ERR_CONSISTENCYPROOF_FETCH = 4 +ERR_CONSISTENCYPROOF_INVALID = 5 +ERR_TREEHEAD_FETCH = 6 +ERR_LOGKEYFILE = 7 +ERR_LOGKEY_FORMAT = 8 +ERR_SIGKEYFILE_TYPE = 9 +ERR_SIGKEYFILE_MODE = 10 +ERR_SIGKEY_FORMAT = 11 +ERR_NYI = 12 +ERR_COSIG_POST = 13 + class Parser: def __init__(self): p = argparse.ArgumentParser( @@ -152,6 +163,13 @@ class ConsistencyProof(): else: return [unhexlify(self._text['consistency_path'])] +def make_base_dir_maybe(): + dirname = os.path.expanduser(g_args.base_dir) + try: + os.stat(dirname) + except FileNotFoundError: + os.makedirs(dirname, mode=0o700) + def read_tree_head(): filename = os.path.expanduser(g_args.base_dir) + 'signed_tree_head' try: @@ -162,18 +180,21 @@ def read_tree_head(): def store_tree_head(tree_head): dirname = os.path.expanduser(g_args.base_dir) - try: - os.stat(dirname) - except FileNotFoundError: - os.makedirs(dirname) with open(dirname + 'signed_tree_head', mode='w+b') as f: f.write(tree_head.text()) -def fetch_tree_head(): +def fetch_tree_head(log_verification_key): req = requests.get(g_args.base_url + 'st/v0/get-tree-head-to-sign') if req.status_code != 200: - return None - return TreeHead(req.content.decode()) + return None, (ERR_TREEHEAD_FETCH, "ERROR: unable to fetch new tree head: {}".format(req.status_code)) + + tree_head = TreeHead(req.content.decode()) + + if not tree_head.signature_valid(log_verification_key): + return None, (ERR_TREEHEAD_SIGNATURE_INVALID, "ERROR: signature of fetched tree head not valid") + + assert(tree_head is not None) + return tree_head, None def fetch_consistency_proof(first, second): post_data = 'old_size={}\n'.format(first) @@ -241,92 +262,96 @@ def sign_and_send_sig(signing_key, sth): req = requests.post(g_args.base_url + 'st/v0/add-cosignature', post_data) if req.status_code != 200: - print("ERROR: Unable to post signature to log: {} => {}: {}". - format(req.url, - req.err_code, - req.text)) - return None - return True - -def main(args): - global g_args - g_args = Parser() - parse_config(CONFIG_FILE) - parse_args(args) - if g_args.save_config: - # TODO write config file - print("ERROR: --save-config is not yet implemented") - return 12 - - consistency_verified = False - ignore_consistency = False - - # TODO stop returning random integers -- use 1 all over or do something clever + return (ERR_COSIG_POST, + "ERROR: Unable to post signature to log: {} => {}: {}". format(req.url, + req.status_code, + req.text)) +def ensure_log_verification_key(): if not g_args.log_verification_key: - print("ERROR: missing log verification key") - return 7 + return None, (ERR_LOGKEYFILE, "ERROR: missing log verification key") try: log_verification_key = nacl.signing.VerifyKey(g_args.log_verification_key, encoder=nacl.encoding.HexEncoder) except: - print("ERROR: invalid log verification key: {}".format(g_args.log_verification_key)) - return 8 + return None, (ERR_LOGKEY_FORMAT, "ERROR: invalid log verification key: {}".format(g_args.log_verification_key)) + + assert(log_verification_key is not None) + return log_verification_key, None +# Read signature key from file, or generate one and write it to file. +def ensure_sigkey(fn): try: - s = os.stat(g_args.sigkey_file, follow_symlinks=False) - if not S_ISREG(s.st_mode): - print("ERROR: Signing key file {} must be a regular file".format(g_args.sigkey_file)) - return 9 - if S_IMODE(s.st_mode) & 0o077 != 0: - print("ERROR: Signing key file {} permissions too lax: {:04o}".format(g_args.sigkey_file, S_IMODE(s.st_mode))) - return 10 + os.stat(fn, follow_symlinks=False) except FileNotFoundError: - print("INFO: Signing key file {} not found -- generating new signing key".format(g_args.sigkey_file)) + print("INFO: Signing key file {} not found -- generating new signing key".format(fn)) signing_key = nacl.signing.SigningKey.generate() - print("INFO: verification key: {}".format(signing_key.verify_key.encode(encoder=nacl.encoding.HexEncoder))) - with open(g_args.sigkey_file, 'w') as f: + print("INFO: verification key: {}".format(signing_key.verify_key.encode(nacl.encoding.HexEncoder).decode('ascii'))) + with open(fn, 'w') as f: os.chmod(f.fileno(), S_IRUSR) f.write(signing_key.encode(encoder=nacl.encoding.HexEncoder).decode('ascii')) - with open(g_args.sigkey_file, 'r') as f: + s = os.stat(fn, follow_symlinks=False) + if not S_ISREG(s.st_mode): + return None, (ERR_SIGKEYFILE_TYPE, "ERROR: Signing key file {} must be a regular file".format(fn)) + if S_IMODE(s.st_mode) & 0o077 != 0: + return None, (ERR_SIGKEYFILE_MODE, "ERROR: Signing key file {} permissions too lax: {:04o}".format(fn, S_IMODE(s.st_mode))) + + with open(fn, 'r') as f: try: - signing_key = nacl.signing.SigningKey(f.readline().strip(), encoder=nacl.encoding.HexEncoder) + signing_key = nacl.signing.SigningKey(f.readline().strip(), nacl.encoding.HexEncoder) except: - print("ERROR: Invalid signing key in {}".format(g_args.sigkey_file)) - return 11 + return None, (ERR_SIGKEY_FORMAT, "ERROR: Invalid signing key in {}".format(fn)) + + assert(signing_key is not None) + return signing_key, None + +def main(args): + msg = None + global g_args + g_args = Parser() + parse_config(CONFIG_FILE) + parse_args(args) + if g_args.save_config: + # TODO write to config file + return ERR_NYI, "ERROR: --save-config is not yet implemented" + + consistency_verified = False + ignore_consistency = False + + make_base_dir_maybe() + + log_verification_key, err = ensure_log_verification_key() + if err: return err + + signing_key, err = ensure_sigkey(g_args.sigkey_file) + if err: return err - new = fetch_tree_head() - if not new: - print("ERROR: unable to fetch new tree head") - return 6 - if not new.signature_valid(log_verification_key): - print("ERROR: signature of new tree head not valid") - return 2 + new, err = fetch_tree_head(log_verification_key) + if err: return err + # FIXME: if we're bootstrapping, ignore potential tree head on disk cur = read_tree_head() if not cur: print("INFO: No current tree head found in {}".format(g_args.base_dir)) else: if not cur.signature_valid(log_verification_key): - print("ERROR: signature of current tree head not valid") - return 3 + return ERR_TREEHEAD_SIGNATURE_INVALID, "ERROR: signature of current tree head invalid" if new.tree_size() <= cur.tree_size(): - print("INFO: Fetched tree already verified, size {}".format(cur.tree_size())) + msg = "INFO: Fetched tree already verified, size {}".format(cur.tree_size()) else: proof = fetch_consistency_proof(cur.tree_size(), new.tree_size()) if not proof: - print("ERROR: unable to fetch consistency proof") - return 4 + return ERR_CONSISTENCYPROOF_FETCH, "ERROR: unable to fetch consistency proof" if consistency_proof_valid(cur, new, proof): consistency_verified = True else: - print("ERROR: failing consistency proof check for {}->{}".format(cur.tree_size(), new.tree_size())) - print("DEBUG: {}:{}->{}:{}\n {}".format(cur.tree_size(), - cur.root_hash(), - new.tree_size(), - new.root_hash(), - proof.path())) - return 5 + errmsg = "ERROR: failing consistency proof check for {}->{}".format(cur.tree_size(), new.tree_size()) + errmsg += "DEBUG: {}:{}->{}:{}\n {}".format(cur.tree_size(), + cur.root_hash(), + new.tree_size(), + new.root_hash(), + proof.path()) + return ERR_CONSISTENCYPROOF_INVALID, errmsg if g_args.bootstrap_log: # TODO maybe require user confirmation @@ -334,10 +359,13 @@ def main(args): store_tree_head(new) if consistency_verified or ignore_consistency: - if not sign_and_send_sig(signing_key, new): - return 13 + err = sign_and_send_sig(signing_key, new) + if err: return err - return 0 + return 0, msg if __name__ == '__main__': - sys.exit(main(sys.argv)) + status = main(sys.argv) + if status[1]: + print(status[1]) + sys.exit(status[0]) -- cgit v1.2.3