diff options
| author | Linus Nordberg <linus@nordberg.se> | 2022-05-24 23:33:38 +0200 | 
|---|---|---|
| committer | Rasmus Dahlberg <rasmus@mullvad.net> | 2022-06-23 11:33:17 +0200 | 
| commit | 559bccccd40d028e412d9f11709ded0250ba6dcd (patch) | |
| tree | 50f3193dbe70fec21357963c11e5f663013f4b4c /internal/state | |
| parent | 4b20ef0c1732bcef633c0ed7104501898aa84e2c (diff) | |
implement primary and secondary role, for replicationv0.5.0
Diffstat (limited to 'internal/state')
| -rw-r--r-- | internal/state/single.go | 265 | ||||
| -rw-r--r-- | internal/state/single_test.go | 233 | ||||
| -rw-r--r-- | internal/state/state_manager.go | 30 | 
3 files changed, 528 insertions, 0 deletions
| diff --git a/internal/state/single.go b/internal/state/single.go new file mode 100644 index 0000000..fd73b3f --- /dev/null +++ b/internal/state/single.go @@ -0,0 +1,265 @@ +package state + +import ( +	"context" +	"crypto" +	"crypto/ed25519" +	"fmt" +	"sync" +	"time" + +	"git.sigsum.org/log-go/internal/db" +	"git.sigsum.org/sigsum-go/pkg/client" +	"git.sigsum.org/sigsum-go/pkg/log" +	"git.sigsum.org/sigsum-go/pkg/merkle" +	"git.sigsum.org/sigsum-go/pkg/requests" +	"git.sigsum.org/sigsum-go/pkg/types" +) + +// StateManagerSingle implements a single-instance StateManagerPrimary for primary nodes +type StateManagerSingle struct { +	client    db.Client +	signer    crypto.Signer +	namespace merkle.Hash +	interval  time.Duration +	deadline  time.Duration +	secondary client.Client + +	// Lock-protected access to pointers.  A write lock is only obtained once +	// per interval when doing pointer rotation.  All endpoints are readers. +	sync.RWMutex +	signedTreeHead   *types.SignedTreeHead +	cosignedTreeHead *types.CosignedTreeHead + +	// Syncronized and deduplicated witness cosignatures for signedTreeHead +	events       chan *event +	cosignatures map[merkle.Hash]*types.Signature +} + +// NewStateManagerSingle() sets up a new state manager, in particular its +// signedTreeHead.  An optional secondary node can be used to ensure that +// a newer primary tree is not signed unless it has been replicated. +func NewStateManagerSingle(dbcli db.Client, signer crypto.Signer, interval, deadline time.Duration, secondary client.Client) (*StateManagerSingle, error) { +	sm := &StateManagerSingle{ +		client:    dbcli, +		signer:    signer, +		namespace: *merkle.HashFn(signer.Public().(ed25519.PublicKey)), +		interval:  interval, +		deadline:  deadline, +		secondary: secondary, +	} +	sth, err := sm.restoreTreeHead() +	if err != nil { +		return nil, fmt.Errorf("restore signed tree head: %v", err) +	} +	sm.signedTreeHead = sth + +	ictx, cancel := context.WithTimeout(context.Background(), sm.deadline) +	defer cancel() +	return sm, sm.tryRotate(ictx) +} + +func (sm *StateManagerSingle) ToCosignTreeHead() *types.SignedTreeHead { +	sm.RLock() +	defer sm.RUnlock() +	return sm.signedTreeHead +} + +func (sm *StateManagerSingle) CosignedTreeHead(_ context.Context) (*types.CosignedTreeHead, error) { +	sm.RLock() +	defer sm.RUnlock() +	if sm.cosignedTreeHead == nil { +		return nil, fmt.Errorf("no cosignatures available") +	} +	return sm.cosignedTreeHead, nil +} + +func (sm *StateManagerSingle) AddCosignature(ctx context.Context, pub *types.PublicKey, sig *types.Signature) error { +	sm.RLock() +	defer sm.RUnlock() + +	msg := sm.signedTreeHead.TreeHead.ToBinary(&sm.namespace) +	if !ed25519.Verify(ed25519.PublicKey(pub[:]), msg, sig[:]) { +		return fmt.Errorf("invalid cosignature") +	} +	select { +	case sm.events <- &event{merkle.HashFn(pub[:]), sig}: +		return nil +	case <-ctx.Done(): +		return fmt.Errorf("request timeout") +	} +} + +func (sm *StateManagerSingle) Run(ctx context.Context) { +	sm.events = make(chan *event, 4096) +	defer close(sm.events) +	ticker := time.NewTicker(sm.interval) +	defer ticker.Stop() + +	for { +		select { +		case <-ticker.C: +			ictx, cancel := context.WithTimeout(ctx, sm.deadline) +			defer cancel() +			if err := sm.tryRotate(ictx); err != nil { +				log.Warning("failed rotating tree heads: %v", err) +			} +		case ev := <-sm.events: +			sm.handleEvent(ev) +		case <-ctx.Done(): +			return +		} +	} +} + +func (sm *StateManagerSingle) tryRotate(ctx context.Context) error { +	th, err := sm.client.GetTreeHead(ctx) +	if err != nil { +		return fmt.Errorf("get tree head: %v", err) +	} +	nextSTH, err := sm.chooseTree(ctx, th).Sign(sm.signer, &sm.namespace) +	if err != nil { +		return fmt.Errorf("sign tree head: %v", err) +	} +	log.Debug("wanted to advance to size %d, chose size %d", th.TreeSize, nextSTH.TreeSize) + +	sm.rotate(nextSTH) +	return nil +} + +// chooseTree picks a tree to publish, taking the state of a possible secondary node into account. +func (sm *StateManagerSingle) chooseTree(ctx context.Context, proposedTreeHead *types.TreeHead) *types.TreeHead { +	if !sm.secondary.Initiated() { +		return proposedTreeHead +	} + +	secSTH, err := sm.secondary.GetToCosignTreeHead(ctx) +	if err != nil { +		log.Warning("failed fetching tree head from secondary: %v", err) +		return refreshTreeHead(sm.signedTreeHead.TreeHead) +	} +	if secSTH.TreeSize > proposedTreeHead.TreeSize { +		log.Error("secondary is ahead of us: %d > %d", secSTH.TreeSize, proposedTreeHead.TreeSize) +		return refreshTreeHead(sm.signedTreeHead.TreeHead) +	} + +	if secSTH.TreeSize == proposedTreeHead.TreeSize { +		if secSTH.RootHash != proposedTreeHead.RootHash { +			log.Error("secondary root hash doesn't match our root hash at tree size %d", secSTH.TreeSize) +			return refreshTreeHead(sm.signedTreeHead.TreeHead) +		} +		log.Debug("secondary is up-to-date with matching tree head, using proposed tree, size %d", proposedTreeHead.TreeSize) +		return proposedTreeHead +	} +	// +	// Now we know that the proposed tree size is larger than the secondary's tree size. +	// We also now that the secondary's minimum tree size is 0. +	// This means that the proposed tree size is at least 1. +	// +	// Case 1: secondary tree size is 0, primary tree size is >0 --> return based on what we signed before +	// Case 2: secondary tree size is 1, primary tree size is >1 --> fetch consistency proof, if ok -> +	//   2a) secondary tree size is smaller than or equal to what we than signed before -> return whatever we signed before +	//   2b) secondary tree size is larger than what we signed before -> return secondary tree head +	// +	// (If not ok in case 2, return based on what we signed before) +	// +	if secSTH.TreeSize == 0 { +		return refreshTreeHead(sm.signedTreeHead.TreeHead) +	} +	if err := sm.verifyConsistencyWithLatest(ctx, secSTH.TreeHead); err != nil { +		log.Error("secondaries tree not consistent with ours: %v", err) +		return refreshTreeHead(sm.signedTreeHead.TreeHead) +	} +	if secSTH.TreeSize <= sm.signedTreeHead.TreeSize { +		log.Warning("secondary is behind what primary already signed: %d <= %d", secSTH.TreeSize, sm.signedTreeHead.TreeSize) +		return refreshTreeHead(sm.signedTreeHead.TreeHead) +	} + +	log.Debug("using latest tree head from secondary: size %d", secSTH.TreeSize) +	return refreshTreeHead(secSTH.TreeHead) +} + +func (sm *StateManagerSingle) verifyConsistencyWithLatest(ctx context.Context, to types.TreeHead) error { +	from := sm.signedTreeHead.TreeHead +	req := &requests.ConsistencyProof{ +		OldSize: from.TreeSize, +		NewSize: to.TreeSize, +	} +	proof, err := sm.client.GetConsistencyProof(ctx, req) +	if err != nil { +		return fmt.Errorf("unable to get consistency proof from %d to %d: %w", req.OldSize, req.NewSize, err) +	} +	if err := proof.Verify(&from.RootHash, &to.RootHash); err != nil { +		return fmt.Errorf("invalid consistency proof from %d to %d: %v", req.OldSize, req.NewSize, err) +	} +	log.Debug("consistency proof from %d to %d verified", req.OldSize, req.NewSize) +	return nil +} + +func (sm *StateManagerSingle) rotate(nextSTH *types.SignedTreeHead) { +	sm.Lock() +	defer sm.Unlock() + +	log.Debug("about to rotate tree heads, next at %d: %s", nextSTH.TreeSize, sm.treeStatusString()) +	sm.handleEvents() +	sm.setCosignedTreeHead() +	sm.setToCosignTreeHead(nextSTH) +	log.Debug("tree heads rotated: %s", sm.treeStatusString()) +} + +func (sm *StateManagerSingle) handleEvents() { +	log.Debug("handling any outstanding events") +	for i, n := 0, len(sm.events); i < n; i++ { +		sm.handleEvent(<-sm.events) +	} +} + +func (sm *StateManagerSingle) handleEvent(ev *event) { +	log.Debug("handling event from witness %x", ev.keyHash[:]) +	sm.cosignatures[*ev.keyHash] = ev.cosignature +} + +func (sm *StateManagerSingle) setCosignedTreeHead() { +	n := len(sm.cosignatures) +	if n == 0 { +		sm.cosignedTreeHead = nil +		return +	} + +	var cth types.CosignedTreeHead +	cth.SignedTreeHead = *sm.signedTreeHead +	cth.Cosignature = make([]types.Signature, 0, n) +	cth.KeyHash = make([]merkle.Hash, 0, n) +	for keyHash, cosignature := range sm.cosignatures { +		cth.KeyHash = append(cth.KeyHash, keyHash) +		cth.Cosignature = append(cth.Cosignature, *cosignature) +	} +	sm.cosignedTreeHead = &cth +} + +func (sm *StateManagerSingle) setToCosignTreeHead(nextSTH *types.SignedTreeHead) { +	sm.cosignatures = make(map[merkle.Hash]*types.Signature) +	sm.signedTreeHead = nextSTH +} + +func (sm *StateManagerSingle) treeStatusString() string { +	var cosigned uint64 +	if sm.cosignedTreeHead != nil { +		cosigned = sm.cosignedTreeHead.TreeSize +	} +	return fmt.Sprintf("signed at %d, cosigned at %d", sm.signedTreeHead.TreeSize, cosigned) +} + +func (sm *StateManagerSingle) restoreTreeHead() (*types.SignedTreeHead, error) { +	th := zeroTreeHead() // TODO: restore from disk, stored when advanced the tree; zero tree head if "bootstrap" +	return refreshTreeHead(*th).Sign(sm.signer, &sm.namespace) +} + +func zeroTreeHead() *types.TreeHead { +	return refreshTreeHead(types.TreeHead{RootHash: *merkle.HashFn([]byte(""))}) +} + +func refreshTreeHead(th types.TreeHead) *types.TreeHead { +	th.Timestamp = uint64(time.Now().Unix()) +	return &th +} diff --git a/internal/state/single_test.go b/internal/state/single_test.go new file mode 100644 index 0000000..9442fdc --- /dev/null +++ b/internal/state/single_test.go @@ -0,0 +1,233 @@ +package state + +import ( +	"bytes" +	"context" +	"crypto" +	"crypto/ed25519" +	"crypto/rand" +	"fmt" +	"io" +	"reflect" +	"testing" +	"time" + +	mocksClient "git.sigsum.org/log-go/internal/mocks/client" +	mocksDB "git.sigsum.org/log-go/internal/mocks/db" +	"git.sigsum.org/sigsum-go/pkg/hex" +	"git.sigsum.org/sigsum-go/pkg/merkle" +	"git.sigsum.org/sigsum-go/pkg/types" +	"github.com/golang/mock/gomock" +) + +// TestSigner implements the signer interface.  It can be used to mock +// an Ed25519 signer that always return the same public key, +// signature, and error. +// NOTE: Code duplication with internal/node/secondary/endpoint_internal_test.go +type TestSigner struct { +	PublicKey [ed25519.PublicKeySize]byte +	Signature [ed25519.SignatureSize]byte +	Error     error +} + +func (ts *TestSigner) Public() crypto.PublicKey { +	return ed25519.PublicKey(ts.PublicKey[:]) +} + +func (ts *TestSigner) Sign(rand io.Reader, digest []byte, opts crypto.SignerOpts) ([]byte, error) { +	return ts.Signature[:], ts.Error +} + +func TestNewStateManagerSingle(t *testing.T) { +	signerOk := &TestSigner{types.PublicKey{}, types.Signature{}, nil} +	signerErr := &TestSigner{types.PublicKey{}, types.Signature{}, fmt.Errorf("err")} +	for _, table := range []struct { +		description string +		signer      crypto.Signer +		thExp       bool +		thErr       error +		th          types.TreeHead +		secExp      bool +		wantErr     bool +	}{ +		{"invalid: signer failure", signerErr, false, nil, types.TreeHead{}, false, true}, +		{"valid", signerOk, true, nil, types.TreeHead{Timestamp: now(t)}, true, false}, +	} { +		func() { +			ctrl := gomock.NewController(t) +			defer ctrl.Finish() +			trillianClient := mocksDB.NewMockClient(ctrl) +			if table.thExp { +				trillianClient.EXPECT().GetTreeHead(gomock.Any()).Return(&table.th, table.thErr) +			} +			secondary := mocksClient.NewMockClient(ctrl) +			if table.secExp { +				secondary.EXPECT().Initiated().Return(false) +			} + +			sm, err := NewStateManagerSingle(trillianClient, table.signer, time.Duration(0), time.Duration(0), secondary) +			if got, want := err != nil, table.description != "valid"; got != want { +				t.Errorf("got error %v but wanted %v in test %q: %v", got, want, table.description, err) +			} +			if err != nil { +				return +			} + +			if got, want := sm.signedTreeHead.TreeSize, table.th.TreeSize; got != want { +				t.Errorf("%q: got tree size %d but wanted %d", table.description, got, want) +			} +			if got, want := sm.signedTreeHead.RootHash[:], table.th.RootHash[:]; !bytes.Equal(got, want) { +				t.Errorf("%q: got tree size %v but wanted %v", table.description, got, want) +			} +			if got, want := sm.signedTreeHead.Timestamp, table.th.Timestamp; got < want { +				t.Errorf("%q: got timestamp %d but wanted at least %d", table.description, got, want) +			} +			if got := sm.cosignedTreeHead; got != nil { +				t.Errorf("%q: got cosigned tree head but should have none", table.description) +			} +		}() +	} +} + +func TestToCosignTreeHead(t *testing.T) { +	want := &types.SignedTreeHead{} +	sm := StateManagerSingle{ +		signedTreeHead: want, +	} +	sth := sm.ToCosignTreeHead() +	if got := sth; !reflect.DeepEqual(got, want) { +		t.Errorf("got signed tree head\n\t%v\nbut wanted\n\t%v", got, want) +	} +} + +func TestCosignedTreeHead(t *testing.T) { +	want := &types.CosignedTreeHead{ +		Cosignature: make([]types.Signature, 1), +		KeyHash:     make([]merkle.Hash, 1), +	} +	sm := StateManagerSingle{ +		cosignedTreeHead: want, +	} +	cth, err := sm.CosignedTreeHead(context.Background()) +	if err != nil { +		t.Errorf("should not fail with error: %v", err) +		return +	} +	if got := cth; !reflect.DeepEqual(got, want) { +		t.Errorf("got cosigned tree head\n\t%v\nbut wanted\n\t%v", got, want) +	} + +	sm.cosignedTreeHead = nil +	cth, err = sm.CosignedTreeHead(context.Background()) +	if err == nil { +		t.Errorf("should fail without a cosigned tree head") +		return +	} +} + +func TestAddCosignature(t *testing.T) { +	secret, public := mustKeyPair(t) +	for _, table := range []struct { +		desc    string +		signer  crypto.Signer +		vk      types.PublicKey +		wantErr bool +	}{ +		{ +			desc:    "invalid: wrong public key", +			signer:  secret, +			vk:      types.PublicKey{}, +			wantErr: true, +		}, +		{ +			desc:   "valid", +			signer: secret, +			vk:     public, +		}, +	} { +		sm := &StateManagerSingle{ +			namespace:      *merkle.HashFn(nil), +			signedTreeHead: &types.SignedTreeHead{}, +			events:         make(chan *event, 1), +		} +		defer close(sm.events) + +		sth := mustSign(t, table.signer, &sm.signedTreeHead.TreeHead, &sm.namespace) +		ctx := context.Background() +		err := sm.AddCosignature(ctx, &table.vk, &sth.Signature) +		if got, want := err != nil, table.wantErr; got != want { +			t.Errorf("got error %v but wanted %v in test %q: %v", got, want, table.desc, err) +		} +		if err != nil { +			continue +		} + +		ctx, cancel := context.WithTimeout(ctx, 50*time.Millisecond) +		defer cancel() +		if err := sm.AddCosignature(ctx, &table.vk, &sth.Signature); err == nil { +			t.Errorf("expected full channel in test %q", table.desc) +		} +		if got, want := len(sm.events), 1; got != want { +			t.Errorf("wanted %d cosignatures but got %d in test %q", want, got, table.desc) +		} +	} +} + +func mustKeyPair(t *testing.T) (crypto.Signer, types.PublicKey) { +	t.Helper() +	vk, sk, err := ed25519.GenerateKey(rand.Reader) +	if err != nil { +		t.Fatal(err) +	} +	var pub types.PublicKey +	copy(pub[:], vk[:]) +	return sk, pub +} + +func mustSign(t *testing.T, s crypto.Signer, th *types.TreeHead, kh *merkle.Hash) *types.SignedTreeHead { +	t.Helper() +	sth, err := th.Sign(s, kh) +	if err != nil { +		t.Fatal(err) +	} +	return sth +} + +func newHashBufferInc(t *testing.T) *merkle.Hash { +	t.Helper() + +	var buf merkle.Hash +	for i := 0; i < len(buf); i++ { +		buf[i] = byte(i) +	} +	return &buf +} +func validConsistencyProof_5_10(t *testing.T) *types.ConsistencyProof { +	t.Helper() +	// # old tree head +	//     tree_size=5 +	//     root_hash=c8e73a8c09e44c344d515eb717e248c5dbf12420908a6d29568197fae7751803 +	// # new tree head +	//     tree_size=10 +	//     root_hash=2a40f11563b45522ca9eccf993c934238a8fbadcf7d7d65be3583ab2584838aa +	r := bytes.NewReader([]byte("consistency_path=fadca95ab8ca34f17c5f3fa719183fe0e5c194a44c25324745388964a743ecce\nconsistency_path=6366fc0c20f9b8a8c089ed210191e401da6c995592eba78125f0ba0ba142ebaf\nconsistency_path=72b8d4f990b555a72d76fb8da075a65234519070cfa42e082026a8c686160349\nconsistency_path=d92714be792598ff55560298cd3ff099dfe5724646282578531c0d0063437c00\nconsistency_path=4b20d58bbae723755304fb179aef6d5f04d755a601884828c62c07929f6bd84a\n")) +	var proof types.ConsistencyProof +	if err := proof.FromASCII(r, 5, 10); err != nil { +		t.Fatal(err) +	} +	return &proof +} + +func hashFromString(t *testing.T, s string) (h merkle.Hash) { +	b, err := hex.Deserialize(s) +	if err != nil { +		t.Fatal(err) +	} +	copy(h[:], b) +	return h +} + +func now(t *testing.T) uint64 { +	t.Helper() +	return uint64(time.Now().Unix()) +} diff --git a/internal/state/state_manager.go b/internal/state/state_manager.go new file mode 100644 index 0000000..60d2af1 --- /dev/null +++ b/internal/state/state_manager.go @@ -0,0 +1,30 @@ +package state + +import ( +	"context" + +	"git.sigsum.org/sigsum-go/pkg/merkle" +	"git.sigsum.org/sigsum-go/pkg/types" +) + +// StateManager coordinates access to a nodes tree heads and (co)signatures. +type StateManager interface { +	// ToCosignTreeHead returns the node's to-cosign tree head +	ToCosignTreeHead() *types.SignedTreeHead + +	// CosignedTreeHead returns the node's cosigned tree head +	CosignedTreeHead(context.Context) (*types.CosignedTreeHead, error) + +	// AddCosignature verifies that a cosignature is valid for the to-cosign +	// tree head before adding it +	AddCosignature(context.Context, *types.PublicKey, *types.Signature) error + +	// Run peridically rotates the node's to-cosign and cosigned tree heads +	Run(context.Context) +} + +// event is a verified cosignature request +type event struct { +	keyHash     *merkle.Hash +	cosignature *types.Signature +} | 
