aboutsummaryrefslogtreecommitdiff
path: root/siglog-witness.py
diff options
context:
space:
mode:
authorLinus Nordberg <linus@nordberg.se>2021-06-03 09:11:00 +0200
committerLinus Nordberg <linus@nordberg.se>2021-06-03 09:11:00 +0200
commitb1c954687ca049ac7e34034f98ba9fd8c15258e7 (patch)
treeaba0e8fda433bc67cc38c101527731f4217c5ddc /siglog-witness.py
parent1d147b0906e1b3ba6ac2a59f99503d0e67933330 (diff)
error handling cleaned up a bit
Also, create base_dir in time, if it doesn't exist. Also also, set permission on base_dir when creating it.
Diffstat (limited to 'siglog-witness.py')
-rwxr-xr-xsiglog-witness.py168
1 files changed, 98 insertions, 70 deletions
diff --git a/siglog-witness.py b/siglog-witness.py
index 146caf7..f3475d2 100755
--- a/siglog-witness.py
+++ b/siglog-witness.py
@@ -28,9 +28,20 @@ from hashlib import sha256
BASE_URL_DEFAULT = 'http://tlog-poc.system-transparency.org:6965/'
CONFIG_DIR_DEFAULT = os.path.expanduser('~/.config/siglog-witness/')
SIGKEY_FILE_DEFAULT = CONFIG_DIR_DEFAULT + 'signing_key'
-
CONFIG_FILE = CONFIG_DIR_DEFAULT + 'siglog-witness.conf'
+ERR_TREEHEAD_SIGNATURE_INVALID = 2
+ERR_CONSISTENCYPROOF_FETCH = 4
+ERR_CONSISTENCYPROOF_INVALID = 5
+ERR_TREEHEAD_FETCH = 6
+ERR_LOGKEYFILE = 7
+ERR_LOGKEY_FORMAT = 8
+ERR_SIGKEYFILE_TYPE = 9
+ERR_SIGKEYFILE_MODE = 10
+ERR_SIGKEY_FORMAT = 11
+ERR_NYI = 12
+ERR_COSIG_POST = 13
+
class Parser:
def __init__(self):
p = argparse.ArgumentParser(
@@ -152,6 +163,13 @@ class ConsistencyProof():
else:
return [unhexlify(self._text['consistency_path'])]
+def make_base_dir_maybe():
+ dirname = os.path.expanduser(g_args.base_dir)
+ try:
+ os.stat(dirname)
+ except FileNotFoundError:
+ os.makedirs(dirname, mode=0o700)
+
def read_tree_head():
filename = os.path.expanduser(g_args.base_dir) + 'signed_tree_head'
try:
@@ -162,18 +180,21 @@ def read_tree_head():
def store_tree_head(tree_head):
dirname = os.path.expanduser(g_args.base_dir)
- try:
- os.stat(dirname)
- except FileNotFoundError:
- os.makedirs(dirname)
with open(dirname + 'signed_tree_head', mode='w+b') as f:
f.write(tree_head.text())
-def fetch_tree_head():
+def fetch_tree_head(log_verification_key):
req = requests.get(g_args.base_url + 'st/v0/get-tree-head-to-sign')
if req.status_code != 200:
- return None
- return TreeHead(req.content.decode())
+ return None, (ERR_TREEHEAD_FETCH, "ERROR: unable to fetch new tree head: {}".format(req.status_code))
+
+ tree_head = TreeHead(req.content.decode())
+
+ if not tree_head.signature_valid(log_verification_key):
+ return None, (ERR_TREEHEAD_SIGNATURE_INVALID, "ERROR: signature of fetched tree head not valid")
+
+ assert(tree_head is not None)
+ return tree_head, None
def fetch_consistency_proof(first, second):
post_data = 'old_size={}\n'.format(first)
@@ -241,92 +262,96 @@ def sign_and_send_sig(signing_key, sth):
req = requests.post(g_args.base_url + 'st/v0/add-cosignature', post_data)
if req.status_code != 200:
- print("ERROR: Unable to post signature to log: {} => {}: {}".
- format(req.url,
- req.err_code,
- req.text))
- return None
- return True
-
-def main(args):
- global g_args
- g_args = Parser()
- parse_config(CONFIG_FILE)
- parse_args(args)
- if g_args.save_config:
- # TODO write config file
- print("ERROR: --save-config is not yet implemented")
- return 12
-
- consistency_verified = False
- ignore_consistency = False
-
- # TODO stop returning random integers -- use 1 all over or do something clever
+ return (ERR_COSIG_POST,
+ "ERROR: Unable to post signature to log: {} => {}: {}". format(req.url,
+ req.status_code,
+ req.text))
+def ensure_log_verification_key():
if not g_args.log_verification_key:
- print("ERROR: missing log verification key")
- return 7
+ return None, (ERR_LOGKEYFILE, "ERROR: missing log verification key")
try:
log_verification_key = nacl.signing.VerifyKey(g_args.log_verification_key, encoder=nacl.encoding.HexEncoder)
except:
- print("ERROR: invalid log verification key: {}".format(g_args.log_verification_key))
- return 8
+ return None, (ERR_LOGKEY_FORMAT, "ERROR: invalid log verification key: {}".format(g_args.log_verification_key))
+
+ assert(log_verification_key is not None)
+ return log_verification_key, None
+# Read signature key from file, or generate one and write it to file.
+def ensure_sigkey(fn):
try:
- s = os.stat(g_args.sigkey_file, follow_symlinks=False)
- if not S_ISREG(s.st_mode):
- print("ERROR: Signing key file {} must be a regular file".format(g_args.sigkey_file))
- return 9
- if S_IMODE(s.st_mode) & 0o077 != 0:
- print("ERROR: Signing key file {} permissions too lax: {:04o}".format(g_args.sigkey_file, S_IMODE(s.st_mode)))
- return 10
+ os.stat(fn, follow_symlinks=False)
except FileNotFoundError:
- print("INFO: Signing key file {} not found -- generating new signing key".format(g_args.sigkey_file))
+ print("INFO: Signing key file {} not found -- generating new signing key".format(fn))
signing_key = nacl.signing.SigningKey.generate()
- print("INFO: verification key: {}".format(signing_key.verify_key.encode(encoder=nacl.encoding.HexEncoder)))
- with open(g_args.sigkey_file, 'w') as f:
+ print("INFO: verification key: {}".format(signing_key.verify_key.encode(nacl.encoding.HexEncoder).decode('ascii')))
+ with open(fn, 'w') as f:
os.chmod(f.fileno(), S_IRUSR)
f.write(signing_key.encode(encoder=nacl.encoding.HexEncoder).decode('ascii'))
- with open(g_args.sigkey_file, 'r') as f:
+ s = os.stat(fn, follow_symlinks=False)
+ if not S_ISREG(s.st_mode):
+ return None, (ERR_SIGKEYFILE_TYPE, "ERROR: Signing key file {} must be a regular file".format(fn))
+ if S_IMODE(s.st_mode) & 0o077 != 0:
+ return None, (ERR_SIGKEYFILE_MODE, "ERROR: Signing key file {} permissions too lax: {:04o}".format(fn, S_IMODE(s.st_mode)))
+
+ with open(fn, 'r') as f:
try:
- signing_key = nacl.signing.SigningKey(f.readline().strip(), encoder=nacl.encoding.HexEncoder)
+ signing_key = nacl.signing.SigningKey(f.readline().strip(), nacl.encoding.HexEncoder)
except:
- print("ERROR: Invalid signing key in {}".format(g_args.sigkey_file))
- return 11
+ return None, (ERR_SIGKEY_FORMAT, "ERROR: Invalid signing key in {}".format(fn))
+
+ assert(signing_key is not None)
+ return signing_key, None
+
+def main(args):
+ msg = None
+ global g_args
+ g_args = Parser()
+ parse_config(CONFIG_FILE)
+ parse_args(args)
+ if g_args.save_config:
+ # TODO write to config file
+ return ERR_NYI, "ERROR: --save-config is not yet implemented"
+
+ consistency_verified = False
+ ignore_consistency = False
+
+ make_base_dir_maybe()
+
+ log_verification_key, err = ensure_log_verification_key()
+ if err: return err
+
+ signing_key, err = ensure_sigkey(g_args.sigkey_file)
+ if err: return err
- new = fetch_tree_head()
- if not new:
- print("ERROR: unable to fetch new tree head")
- return 6
- if not new.signature_valid(log_verification_key):
- print("ERROR: signature of new tree head not valid")
- return 2
+ new, err = fetch_tree_head(log_verification_key)
+ if err: return err
+ # FIXME: if we're bootstrapping, ignore potential tree head on disk
cur = read_tree_head()
if not cur:
print("INFO: No current tree head found in {}".format(g_args.base_dir))
else:
if not cur.signature_valid(log_verification_key):
- print("ERROR: signature of current tree head not valid")
- return 3
+ return ERR_TREEHEAD_SIGNATURE_INVALID, "ERROR: signature of current tree head invalid"
if new.tree_size() <= cur.tree_size():
- print("INFO: Fetched tree already verified, size {}".format(cur.tree_size()))
+ msg = "INFO: Fetched tree already verified, size {}".format(cur.tree_size())
else:
proof = fetch_consistency_proof(cur.tree_size(), new.tree_size())
if not proof:
- print("ERROR: unable to fetch consistency proof")
- return 4
+ return ERR_CONSISTENCYPROOF_FETCH, "ERROR: unable to fetch consistency proof"
if consistency_proof_valid(cur, new, proof):
consistency_verified = True
else:
- print("ERROR: failing consistency proof check for {}->{}".format(cur.tree_size(), new.tree_size()))
- print("DEBUG: {}:{}->{}:{}\n {}".format(cur.tree_size(),
- cur.root_hash(),
- new.tree_size(),
- new.root_hash(),
- proof.path()))
- return 5
+ errmsg = "ERROR: failing consistency proof check for {}->{}".format(cur.tree_size(), new.tree_size())
+ errmsg += "DEBUG: {}:{}->{}:{}\n {}".format(cur.tree_size(),
+ cur.root_hash(),
+ new.tree_size(),
+ new.root_hash(),
+ proof.path())
+ return ERR_CONSISTENCYPROOF_INVALID, errmsg
if g_args.bootstrap_log:
# TODO maybe require user confirmation
@@ -334,10 +359,13 @@ def main(args):
store_tree_head(new)
if consistency_verified or ignore_consistency:
- if not sign_and_send_sig(signing_key, new):
- return 13
+ err = sign_and_send_sig(signing_key, new)
+ if err: return err
- return 0
+ return 0, msg
if __name__ == '__main__':
- sys.exit(main(sys.argv))
+ status = main(sys.argv)
+ if status[1]:
+ print(status[1])
+ sys.exit(status[0])