aboutsummaryrefslogtreecommitdiff
path: root/internal/state
diff options
context:
space:
mode:
authorLinus Nordberg <linus@nordberg.se>2022-05-24 23:33:38 +0200
committerRasmus Dahlberg <rasmus@mullvad.net>2022-06-23 11:33:17 +0200
commit559bccccd40d028e412d9f11709ded0250ba6dcd (patch)
tree50f3193dbe70fec21357963c11e5f663013f4b4c /internal/state
parent4b20ef0c1732bcef633c0ed7104501898aa84e2c (diff)
implement primary and secondary role, for replicationv0.5.0
Diffstat (limited to 'internal/state')
-rw-r--r--internal/state/single.go265
-rw-r--r--internal/state/single_test.go233
-rw-r--r--internal/state/state_manager.go30
3 files changed, 528 insertions, 0 deletions
diff --git a/internal/state/single.go b/internal/state/single.go
new file mode 100644
index 0000000..fd73b3f
--- /dev/null
+++ b/internal/state/single.go
@@ -0,0 +1,265 @@
+package state
+
+import (
+ "context"
+ "crypto"
+ "crypto/ed25519"
+ "fmt"
+ "sync"
+ "time"
+
+ "git.sigsum.org/log-go/internal/db"
+ "git.sigsum.org/sigsum-go/pkg/client"
+ "git.sigsum.org/sigsum-go/pkg/log"
+ "git.sigsum.org/sigsum-go/pkg/merkle"
+ "git.sigsum.org/sigsum-go/pkg/requests"
+ "git.sigsum.org/sigsum-go/pkg/types"
+)
+
+// StateManagerSingle implements a single-instance StateManagerPrimary for primary nodes
+type StateManagerSingle struct {
+ client db.Client
+ signer crypto.Signer
+ namespace merkle.Hash
+ interval time.Duration
+ deadline time.Duration
+ secondary client.Client
+
+ // Lock-protected access to pointers. A write lock is only obtained once
+ // per interval when doing pointer rotation. All endpoints are readers.
+ sync.RWMutex
+ signedTreeHead *types.SignedTreeHead
+ cosignedTreeHead *types.CosignedTreeHead
+
+ // Syncronized and deduplicated witness cosignatures for signedTreeHead
+ events chan *event
+ cosignatures map[merkle.Hash]*types.Signature
+}
+
+// NewStateManagerSingle() sets up a new state manager, in particular its
+// signedTreeHead. An optional secondary node can be used to ensure that
+// a newer primary tree is not signed unless it has been replicated.
+func NewStateManagerSingle(dbcli db.Client, signer crypto.Signer, interval, deadline time.Duration, secondary client.Client) (*StateManagerSingle, error) {
+ sm := &StateManagerSingle{
+ client: dbcli,
+ signer: signer,
+ namespace: *merkle.HashFn(signer.Public().(ed25519.PublicKey)),
+ interval: interval,
+ deadline: deadline,
+ secondary: secondary,
+ }
+ sth, err := sm.restoreTreeHead()
+ if err != nil {
+ return nil, fmt.Errorf("restore signed tree head: %v", err)
+ }
+ sm.signedTreeHead = sth
+
+ ictx, cancel := context.WithTimeout(context.Background(), sm.deadline)
+ defer cancel()
+ return sm, sm.tryRotate(ictx)
+}
+
+func (sm *StateManagerSingle) ToCosignTreeHead() *types.SignedTreeHead {
+ sm.RLock()
+ defer sm.RUnlock()
+ return sm.signedTreeHead
+}
+
+func (sm *StateManagerSingle) CosignedTreeHead(_ context.Context) (*types.CosignedTreeHead, error) {
+ sm.RLock()
+ defer sm.RUnlock()
+ if sm.cosignedTreeHead == nil {
+ return nil, fmt.Errorf("no cosignatures available")
+ }
+ return sm.cosignedTreeHead, nil
+}
+
+func (sm *StateManagerSingle) AddCosignature(ctx context.Context, pub *types.PublicKey, sig *types.Signature) error {
+ sm.RLock()
+ defer sm.RUnlock()
+
+ msg := sm.signedTreeHead.TreeHead.ToBinary(&sm.namespace)
+ if !ed25519.Verify(ed25519.PublicKey(pub[:]), msg, sig[:]) {
+ return fmt.Errorf("invalid cosignature")
+ }
+ select {
+ case sm.events <- &event{merkle.HashFn(pub[:]), sig}:
+ return nil
+ case <-ctx.Done():
+ return fmt.Errorf("request timeout")
+ }
+}
+
+func (sm *StateManagerSingle) Run(ctx context.Context) {
+ sm.events = make(chan *event, 4096)
+ defer close(sm.events)
+ ticker := time.NewTicker(sm.interval)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-ticker.C:
+ ictx, cancel := context.WithTimeout(ctx, sm.deadline)
+ defer cancel()
+ if err := sm.tryRotate(ictx); err != nil {
+ log.Warning("failed rotating tree heads: %v", err)
+ }
+ case ev := <-sm.events:
+ sm.handleEvent(ev)
+ case <-ctx.Done():
+ return
+ }
+ }
+}
+
+func (sm *StateManagerSingle) tryRotate(ctx context.Context) error {
+ th, err := sm.client.GetTreeHead(ctx)
+ if err != nil {
+ return fmt.Errorf("get tree head: %v", err)
+ }
+ nextSTH, err := sm.chooseTree(ctx, th).Sign(sm.signer, &sm.namespace)
+ if err != nil {
+ return fmt.Errorf("sign tree head: %v", err)
+ }
+ log.Debug("wanted to advance to size %d, chose size %d", th.TreeSize, nextSTH.TreeSize)
+
+ sm.rotate(nextSTH)
+ return nil
+}
+
+// chooseTree picks a tree to publish, taking the state of a possible secondary node into account.
+func (sm *StateManagerSingle) chooseTree(ctx context.Context, proposedTreeHead *types.TreeHead) *types.TreeHead {
+ if !sm.secondary.Initiated() {
+ return proposedTreeHead
+ }
+
+ secSTH, err := sm.secondary.GetToCosignTreeHead(ctx)
+ if err != nil {
+ log.Warning("failed fetching tree head from secondary: %v", err)
+ return refreshTreeHead(sm.signedTreeHead.TreeHead)
+ }
+ if secSTH.TreeSize > proposedTreeHead.TreeSize {
+ log.Error("secondary is ahead of us: %d > %d", secSTH.TreeSize, proposedTreeHead.TreeSize)
+ return refreshTreeHead(sm.signedTreeHead.TreeHead)
+ }
+
+ if secSTH.TreeSize == proposedTreeHead.TreeSize {
+ if secSTH.RootHash != proposedTreeHead.RootHash {
+ log.Error("secondary root hash doesn't match our root hash at tree size %d", secSTH.TreeSize)
+ return refreshTreeHead(sm.signedTreeHead.TreeHead)
+ }
+ log.Debug("secondary is up-to-date with matching tree head, using proposed tree, size %d", proposedTreeHead.TreeSize)
+ return proposedTreeHead
+ }
+ //
+ // Now we know that the proposed tree size is larger than the secondary's tree size.
+ // We also now that the secondary's minimum tree size is 0.
+ // This means that the proposed tree size is at least 1.
+ //
+ // Case 1: secondary tree size is 0, primary tree size is >0 --> return based on what we signed before
+ // Case 2: secondary tree size is 1, primary tree size is >1 --> fetch consistency proof, if ok ->
+ // 2a) secondary tree size is smaller than or equal to what we than signed before -> return whatever we signed before
+ // 2b) secondary tree size is larger than what we signed before -> return secondary tree head
+ //
+ // (If not ok in case 2, return based on what we signed before)
+ //
+ if secSTH.TreeSize == 0 {
+ return refreshTreeHead(sm.signedTreeHead.TreeHead)
+ }
+ if err := sm.verifyConsistencyWithLatest(ctx, secSTH.TreeHead); err != nil {
+ log.Error("secondaries tree not consistent with ours: %v", err)
+ return refreshTreeHead(sm.signedTreeHead.TreeHead)
+ }
+ if secSTH.TreeSize <= sm.signedTreeHead.TreeSize {
+ log.Warning("secondary is behind what primary already signed: %d <= %d", secSTH.TreeSize, sm.signedTreeHead.TreeSize)
+ return refreshTreeHead(sm.signedTreeHead.TreeHead)
+ }
+
+ log.Debug("using latest tree head from secondary: size %d", secSTH.TreeSize)
+ return refreshTreeHead(secSTH.TreeHead)
+}
+
+func (sm *StateManagerSingle) verifyConsistencyWithLatest(ctx context.Context, to types.TreeHead) error {
+ from := sm.signedTreeHead.TreeHead
+ req := &requests.ConsistencyProof{
+ OldSize: from.TreeSize,
+ NewSize: to.TreeSize,
+ }
+ proof, err := sm.client.GetConsistencyProof(ctx, req)
+ if err != nil {
+ return fmt.Errorf("unable to get consistency proof from %d to %d: %w", req.OldSize, req.NewSize, err)
+ }
+ if err := proof.Verify(&from.RootHash, &to.RootHash); err != nil {
+ return fmt.Errorf("invalid consistency proof from %d to %d: %v", req.OldSize, req.NewSize, err)
+ }
+ log.Debug("consistency proof from %d to %d verified", req.OldSize, req.NewSize)
+ return nil
+}
+
+func (sm *StateManagerSingle) rotate(nextSTH *types.SignedTreeHead) {
+ sm.Lock()
+ defer sm.Unlock()
+
+ log.Debug("about to rotate tree heads, next at %d: %s", nextSTH.TreeSize, sm.treeStatusString())
+ sm.handleEvents()
+ sm.setCosignedTreeHead()
+ sm.setToCosignTreeHead(nextSTH)
+ log.Debug("tree heads rotated: %s", sm.treeStatusString())
+}
+
+func (sm *StateManagerSingle) handleEvents() {
+ log.Debug("handling any outstanding events")
+ for i, n := 0, len(sm.events); i < n; i++ {
+ sm.handleEvent(<-sm.events)
+ }
+}
+
+func (sm *StateManagerSingle) handleEvent(ev *event) {
+ log.Debug("handling event from witness %x", ev.keyHash[:])
+ sm.cosignatures[*ev.keyHash] = ev.cosignature
+}
+
+func (sm *StateManagerSingle) setCosignedTreeHead() {
+ n := len(sm.cosignatures)
+ if n == 0 {
+ sm.cosignedTreeHead = nil
+ return
+ }
+
+ var cth types.CosignedTreeHead
+ cth.SignedTreeHead = *sm.signedTreeHead
+ cth.Cosignature = make([]types.Signature, 0, n)
+ cth.KeyHash = make([]merkle.Hash, 0, n)
+ for keyHash, cosignature := range sm.cosignatures {
+ cth.KeyHash = append(cth.KeyHash, keyHash)
+ cth.Cosignature = append(cth.Cosignature, *cosignature)
+ }
+ sm.cosignedTreeHead = &cth
+}
+
+func (sm *StateManagerSingle) setToCosignTreeHead(nextSTH *types.SignedTreeHead) {
+ sm.cosignatures = make(map[merkle.Hash]*types.Signature)
+ sm.signedTreeHead = nextSTH
+}
+
+func (sm *StateManagerSingle) treeStatusString() string {
+ var cosigned uint64
+ if sm.cosignedTreeHead != nil {
+ cosigned = sm.cosignedTreeHead.TreeSize
+ }
+ return fmt.Sprintf("signed at %d, cosigned at %d", sm.signedTreeHead.TreeSize, cosigned)
+}
+
+func (sm *StateManagerSingle) restoreTreeHead() (*types.SignedTreeHead, error) {
+ th := zeroTreeHead() // TODO: restore from disk, stored when advanced the tree; zero tree head if "bootstrap"
+ return refreshTreeHead(*th).Sign(sm.signer, &sm.namespace)
+}
+
+func zeroTreeHead() *types.TreeHead {
+ return refreshTreeHead(types.TreeHead{RootHash: *merkle.HashFn([]byte(""))})
+}
+
+func refreshTreeHead(th types.TreeHead) *types.TreeHead {
+ th.Timestamp = uint64(time.Now().Unix())
+ return &th
+}
diff --git a/internal/state/single_test.go b/internal/state/single_test.go
new file mode 100644
index 0000000..9442fdc
--- /dev/null
+++ b/internal/state/single_test.go
@@ -0,0 +1,233 @@
+package state
+
+import (
+ "bytes"
+ "context"
+ "crypto"
+ "crypto/ed25519"
+ "crypto/rand"
+ "fmt"
+ "io"
+ "reflect"
+ "testing"
+ "time"
+
+ mocksClient "git.sigsum.org/log-go/internal/mocks/client"
+ mocksDB "git.sigsum.org/log-go/internal/mocks/db"
+ "git.sigsum.org/sigsum-go/pkg/hex"
+ "git.sigsum.org/sigsum-go/pkg/merkle"
+ "git.sigsum.org/sigsum-go/pkg/types"
+ "github.com/golang/mock/gomock"
+)
+
+// TestSigner implements the signer interface. It can be used to mock
+// an Ed25519 signer that always return the same public key,
+// signature, and error.
+// NOTE: Code duplication with internal/node/secondary/endpoint_internal_test.go
+type TestSigner struct {
+ PublicKey [ed25519.PublicKeySize]byte
+ Signature [ed25519.SignatureSize]byte
+ Error error
+}
+
+func (ts *TestSigner) Public() crypto.PublicKey {
+ return ed25519.PublicKey(ts.PublicKey[:])
+}
+
+func (ts *TestSigner) Sign(rand io.Reader, digest []byte, opts crypto.SignerOpts) ([]byte, error) {
+ return ts.Signature[:], ts.Error
+}
+
+func TestNewStateManagerSingle(t *testing.T) {
+ signerOk := &TestSigner{types.PublicKey{}, types.Signature{}, nil}
+ signerErr := &TestSigner{types.PublicKey{}, types.Signature{}, fmt.Errorf("err")}
+ for _, table := range []struct {
+ description string
+ signer crypto.Signer
+ thExp bool
+ thErr error
+ th types.TreeHead
+ secExp bool
+ wantErr bool
+ }{
+ {"invalid: signer failure", signerErr, false, nil, types.TreeHead{}, false, true},
+ {"valid", signerOk, true, nil, types.TreeHead{Timestamp: now(t)}, true, false},
+ } {
+ func() {
+ ctrl := gomock.NewController(t)
+ defer ctrl.Finish()
+ trillianClient := mocksDB.NewMockClient(ctrl)
+ if table.thExp {
+ trillianClient.EXPECT().GetTreeHead(gomock.Any()).Return(&table.th, table.thErr)
+ }
+ secondary := mocksClient.NewMockClient(ctrl)
+ if table.secExp {
+ secondary.EXPECT().Initiated().Return(false)
+ }
+
+ sm, err := NewStateManagerSingle(trillianClient, table.signer, time.Duration(0), time.Duration(0), secondary)
+ if got, want := err != nil, table.description != "valid"; got != want {
+ t.Errorf("got error %v but wanted %v in test %q: %v", got, want, table.description, err)
+ }
+ if err != nil {
+ return
+ }
+
+ if got, want := sm.signedTreeHead.TreeSize, table.th.TreeSize; got != want {
+ t.Errorf("%q: got tree size %d but wanted %d", table.description, got, want)
+ }
+ if got, want := sm.signedTreeHead.RootHash[:], table.th.RootHash[:]; !bytes.Equal(got, want) {
+ t.Errorf("%q: got tree size %v but wanted %v", table.description, got, want)
+ }
+ if got, want := sm.signedTreeHead.Timestamp, table.th.Timestamp; got < want {
+ t.Errorf("%q: got timestamp %d but wanted at least %d", table.description, got, want)
+ }
+ if got := sm.cosignedTreeHead; got != nil {
+ t.Errorf("%q: got cosigned tree head but should have none", table.description)
+ }
+ }()
+ }
+}
+
+func TestToCosignTreeHead(t *testing.T) {
+ want := &types.SignedTreeHead{}
+ sm := StateManagerSingle{
+ signedTreeHead: want,
+ }
+ sth := sm.ToCosignTreeHead()
+ if got := sth; !reflect.DeepEqual(got, want) {
+ t.Errorf("got signed tree head\n\t%v\nbut wanted\n\t%v", got, want)
+ }
+}
+
+func TestCosignedTreeHead(t *testing.T) {
+ want := &types.CosignedTreeHead{
+ Cosignature: make([]types.Signature, 1),
+ KeyHash: make([]merkle.Hash, 1),
+ }
+ sm := StateManagerSingle{
+ cosignedTreeHead: want,
+ }
+ cth, err := sm.CosignedTreeHead(context.Background())
+ if err != nil {
+ t.Errorf("should not fail with error: %v", err)
+ return
+ }
+ if got := cth; !reflect.DeepEqual(got, want) {
+ t.Errorf("got cosigned tree head\n\t%v\nbut wanted\n\t%v", got, want)
+ }
+
+ sm.cosignedTreeHead = nil
+ cth, err = sm.CosignedTreeHead(context.Background())
+ if err == nil {
+ t.Errorf("should fail without a cosigned tree head")
+ return
+ }
+}
+
+func TestAddCosignature(t *testing.T) {
+ secret, public := mustKeyPair(t)
+ for _, table := range []struct {
+ desc string
+ signer crypto.Signer
+ vk types.PublicKey
+ wantErr bool
+ }{
+ {
+ desc: "invalid: wrong public key",
+ signer: secret,
+ vk: types.PublicKey{},
+ wantErr: true,
+ },
+ {
+ desc: "valid",
+ signer: secret,
+ vk: public,
+ },
+ } {
+ sm := &StateManagerSingle{
+ namespace: *merkle.HashFn(nil),
+ signedTreeHead: &types.SignedTreeHead{},
+ events: make(chan *event, 1),
+ }
+ defer close(sm.events)
+
+ sth := mustSign(t, table.signer, &sm.signedTreeHead.TreeHead, &sm.namespace)
+ ctx := context.Background()
+ err := sm.AddCosignature(ctx, &table.vk, &sth.Signature)
+ if got, want := err != nil, table.wantErr; got != want {
+ t.Errorf("got error %v but wanted %v in test %q: %v", got, want, table.desc, err)
+ }
+ if err != nil {
+ continue
+ }
+
+ ctx, cancel := context.WithTimeout(ctx, 50*time.Millisecond)
+ defer cancel()
+ if err := sm.AddCosignature(ctx, &table.vk, &sth.Signature); err == nil {
+ t.Errorf("expected full channel in test %q", table.desc)
+ }
+ if got, want := len(sm.events), 1; got != want {
+ t.Errorf("wanted %d cosignatures but got %d in test %q", want, got, table.desc)
+ }
+ }
+}
+
+func mustKeyPair(t *testing.T) (crypto.Signer, types.PublicKey) {
+ t.Helper()
+ vk, sk, err := ed25519.GenerateKey(rand.Reader)
+ if err != nil {
+ t.Fatal(err)
+ }
+ var pub types.PublicKey
+ copy(pub[:], vk[:])
+ return sk, pub
+}
+
+func mustSign(t *testing.T, s crypto.Signer, th *types.TreeHead, kh *merkle.Hash) *types.SignedTreeHead {
+ t.Helper()
+ sth, err := th.Sign(s, kh)
+ if err != nil {
+ t.Fatal(err)
+ }
+ return sth
+}
+
+func newHashBufferInc(t *testing.T) *merkle.Hash {
+ t.Helper()
+
+ var buf merkle.Hash
+ for i := 0; i < len(buf); i++ {
+ buf[i] = byte(i)
+ }
+ return &buf
+}
+func validConsistencyProof_5_10(t *testing.T) *types.ConsistencyProof {
+ t.Helper()
+ // # old tree head
+ // tree_size=5
+ // root_hash=c8e73a8c09e44c344d515eb717e248c5dbf12420908a6d29568197fae7751803
+ // # new tree head
+ // tree_size=10
+ // root_hash=2a40f11563b45522ca9eccf993c934238a8fbadcf7d7d65be3583ab2584838aa
+ r := bytes.NewReader([]byte("consistency_path=fadca95ab8ca34f17c5f3fa719183fe0e5c194a44c25324745388964a743ecce\nconsistency_path=6366fc0c20f9b8a8c089ed210191e401da6c995592eba78125f0ba0ba142ebaf\nconsistency_path=72b8d4f990b555a72d76fb8da075a65234519070cfa42e082026a8c686160349\nconsistency_path=d92714be792598ff55560298cd3ff099dfe5724646282578531c0d0063437c00\nconsistency_path=4b20d58bbae723755304fb179aef6d5f04d755a601884828c62c07929f6bd84a\n"))
+ var proof types.ConsistencyProof
+ if err := proof.FromASCII(r, 5, 10); err != nil {
+ t.Fatal(err)
+ }
+ return &proof
+}
+
+func hashFromString(t *testing.T, s string) (h merkle.Hash) {
+ b, err := hex.Deserialize(s)
+ if err != nil {
+ t.Fatal(err)
+ }
+ copy(h[:], b)
+ return h
+}
+
+func now(t *testing.T) uint64 {
+ t.Helper()
+ return uint64(time.Now().Unix())
+}
diff --git a/internal/state/state_manager.go b/internal/state/state_manager.go
new file mode 100644
index 0000000..60d2af1
--- /dev/null
+++ b/internal/state/state_manager.go
@@ -0,0 +1,30 @@
+package state
+
+import (
+ "context"
+
+ "git.sigsum.org/sigsum-go/pkg/merkle"
+ "git.sigsum.org/sigsum-go/pkg/types"
+)
+
+// StateManager coordinates access to a nodes tree heads and (co)signatures.
+type StateManager interface {
+ // ToCosignTreeHead returns the node's to-cosign tree head
+ ToCosignTreeHead() *types.SignedTreeHead
+
+ // CosignedTreeHead returns the node's cosigned tree head
+ CosignedTreeHead(context.Context) (*types.CosignedTreeHead, error)
+
+ // AddCosignature verifies that a cosignature is valid for the to-cosign
+ // tree head before adding it
+ AddCosignature(context.Context, *types.PublicKey, *types.Signature) error
+
+ // Run peridically rotates the node's to-cosign and cosigned tree heads
+ Run(context.Context)
+}
+
+// event is a verified cosignature request
+type event struct {
+ keyHash *merkle.Hash
+ cosignature *types.Signature
+}