diff options
Diffstat (limited to 'cmd/sigsum-debug/spam/leaf/worker.go')
-rw-r--r-- | cmd/sigsum-debug/spam/leaf/worker.go | 177 |
1 files changed, 177 insertions, 0 deletions
diff --git a/cmd/sigsum-debug/spam/leaf/worker.go b/cmd/sigsum-debug/spam/leaf/worker.go new file mode 100644 index 0000000..9b0e0b7 --- /dev/null +++ b/cmd/sigsum-debug/spam/leaf/worker.go @@ -0,0 +1,177 @@ +package leaf + +import ( + "bytes" + "context" + "crypto/rand" + "encoding/binary" + "fmt" + "net/http" + "time" + + "git.sigsum.org/sigsum-go/pkg/merkle" + "git.sigsum.org/sigsum-go/pkg/requests" + "git.sigsum.org/sigsum-go/pkg/types" +) + +type worker struct { + Config + inCh chan *event // events that received 202 + outCh chan *event // events that received status 2XX +} + +type event struct { + start time.Time // time that request started + end time.Time // time that request ended with 2XX + + req *http.Request // prepared add-leaf request + got200 bool // true if 200 response + got202 bool // true if at least one 202 responses + + num3xx uint64 // number of encountered 3xx before 2XX + num4xx uint64 // number of encountered 4xx before 2XX + num5xx uint64 // number of encountered 5xx before 2XX +} + +func (w *worker) submit(ctx context.Context) error { + data := make([]byte, 40) + if _, err := rand.Read(data); err != nil { + return fmt.Errorf("generate data: %v", err) + } + + ctr := uint64(0) + for { + ctr += 1 + binary.BigEndian.PutUint64(data[:8], ctr) + req, err := w.newRequest(merkle.HashFn(data)) + if err != nil { + return err + } + + ev := event{req: req, start: time.Now()} + for { + select { + case <-ctx.Done(): + return nil + case <-time.After(w.Wait): + } + + if err := w.doRequest(&ev); err == nil { + break // success + } + } + + select { + case w.outCh <- &ev: + default: + return fmt.Errorf("out channel is full") + } + } +} + +func (w *worker) check(ctx context.Context) error { + // setup a backoff mechanism that doesn't block + next := make(chan struct{}, 1) + backoff := func() { + time.Sleep(w.backoff) + next <- struct{}{} + } + next <- struct{}{} + defer func() { + <-next // empty channel before closing + defer close(next) + }() + + evs := make([]*event, 0, w.maxEvents) + for { + if len(evs) == w.maxEvents { + return fmt.Errorf("checker has too many queued events: %d", w.maxEvents) + } + + select { + case <-ctx.Done(): + return nil + case ev := <-w.inCh: + evs = append(evs, ev) + continue + case <-next: + if len(evs) == 0 { + go backoff() + continue + } + } + + if err := w.doRequest(evs[0]); err != nil { + go backoff() + continue + } + next <- struct{}{} // no backoff after success + + select { + case w.outCh <- evs[0]: + default: + return fmt.Errorf("out channel is full") + } + evs = evs[1:] + } +} + +func (w *worker) newRequest(msg *merkle.Hash) (*http.Request, error) { + stm := types.Statement{ + ShardHint: uint64(time.Now().Unix()), + Checksum: *merkle.HashFn(msg[:]), + } + sig, err := stm.Sign(w.signer) + if err != nil { + return nil, err + } + leaf := requests.Leaf{ + ShardHint: stm.ShardHint, + Message: *msg, + Signature: *sig, + PublicKey: w.pub, + DomainHint: w.DomainHint, + } + buf := bytes.NewBuffer(nil) + if err := leaf.ToASCII(buf); err != nil { + return nil, fmt.Errorf("serialize leaf request: %v", err) + } + req, err := http.NewRequest(http.MethodPost, w.url, buf) + if err != nil { + return nil, fmt.Errorf("create http request: %v", err) + } + return req, nil +} + +func (w *worker) doRequest(ev *event) error { + rsp, err := w.cli.Do(ev.req) + if err != nil { + return err + } + defer rsp.Body.Close() + + if rsp.StatusCode == http.StatusOK { + ev.got200 = true + ev.end = time.Now() + return nil + } else if rsp.StatusCode == http.StatusAccepted { + if !ev.got202 { + ev.got202 = true + ev.end = time.Now() + return nil // first 202 response + } + } else if rsp.StatusCode >= 300 && rsp.StatusCode < 400 { + ev.num3xx += 1 + } else if rsp.StatusCode >= 400 && rsp.StatusCode < 500 { + ev.num4xx += 1 + } else if rsp.StatusCode >= 500 && rsp.StatusCode < 600 { + ev.num5xx += 1 + } + return fmt.Errorf("status %d", err) +} + +func (ev *event) resetCounters() { + ev.num3xx = 0 + ev.num4xx = 0 + ev.num5xx = 0 +} |