diff options
author | Rasmus Dahlberg <rasmus@mullvad.net> | 2022-07-22 18:25:56 +0200 |
---|---|---|
committer | Rasmus Dahlberg <rasmus@mullvad.net> | 2022-07-22 18:25:56 +0200 |
commit | f04e26523189ac3fc54381376966f98d2f52440b (patch) | |
tree | 52f664a52759322947420f7313573e92002fa6f9 /cmd/sigsum-debug/spam | |
parent | 722e48feb2aa85d8cb75069a06024e506367e34f (diff) |
add spam command to sigsum-debugsigsum-spam
Diffstat (limited to 'cmd/sigsum-debug/spam')
-rw-r--r-- | cmd/sigsum-debug/spam/leaf/config.go | 60 | ||||
-rw-r--r-- | cmd/sigsum-debug/spam/leaf/main.go | 88 | ||||
-rw-r--r-- | cmd/sigsum-debug/spam/leaf/stats.go | 104 | ||||
-rw-r--r-- | cmd/sigsum-debug/spam/leaf/worker.go | 177 | ||||
-rw-r--r-- | cmd/sigsum-debug/spam/spam.go | 77 |
5 files changed, 506 insertions, 0 deletions
diff --git a/cmd/sigsum-debug/spam/leaf/config.go b/cmd/sigsum-debug/spam/leaf/config.go new file mode 100644 index 0000000..2e20777 --- /dev/null +++ b/cmd/sigsum-debug/spam/leaf/config.go @@ -0,0 +1,60 @@ +package leaf + +import ( + "crypto" + "crypto/ed25519" + "fmt" + "net/http" + "time" + + "git.sigsum.org/sigsum-go/internal/fmtio" + "git.sigsum.org/sigsum-go/pkg/types" +) + +type Config struct { + LogURL string + PrivateKey string // a private key to sign checksum with in hex + DomainHint string // a domain hint that is valid for the above key + Duration time.Duration // how long to run test + Interval time.Duration // how often to emit stats + Wait time.Duration // time to wait between submits + NumSubmitters uint64 // at least one + NumCheckers uint64 // zero to disable checkers + + url string + signer crypto.Signer + pub types.PublicKey + cli http.Client + + maxEvents int // maximum number of events to queue at a checker + backoff time.Duration // time to backoff when waiting for 200 OK +} + +func (cfg *Config) parse(args []string) (err error) { + if len(args) != 0 { + return fmt.Errorf("trailing arguments: %v", args) + } + if len(cfg.LogURL) == 0 { + return fmt.Errorf("url is a required option") + } + if len(cfg.PrivateKey) == 0 { + return fmt.Errorf("private key is a required option") + } + if len(cfg.DomainHint) == 0 { + return fmt.Errorf("domain hint is a required option") + } + + if cfg.signer, err = fmtio.SignerFromHex(cfg.PrivateKey); err != nil { + return fmt.Errorf("parse private key: %v", err) + } + if cfg.NumSubmitters == 0 { + return fmt.Errorf("at least one submitter is required") + } + + cfg.url = types.EndpointAddLeaf.Path(cfg.LogURL, "sigsum/v0") + cfg.maxEvents = 16384 + cfg.backoff = 5 * time.Second + cfg.cli = http.Client{Timeout: 10 * time.Second} + copy(cfg.pub[:], (cfg.signer.Public().(ed25519.PublicKey))[:]) + return nil +} diff --git a/cmd/sigsum-debug/spam/leaf/main.go b/cmd/sigsum-debug/spam/leaf/main.go new file mode 100644 index 0000000..b5895e2 --- /dev/null +++ b/cmd/sigsum-debug/spam/leaf/main.go @@ -0,0 +1,88 @@ +package leaf + +import ( + "context" + "os" + "os/signal" + "sync" + "syscall" + "time" + + "git.sigsum.org/sigsum-go/pkg/log" +) + +func Main(args []string, cfg Config) error { + if err := cfg.parse(args); err != nil { + return err + } + + ctx, cancel := context.WithTimeout(context.Background(), cfg.Duration) + s := newStatus() + + var wg sync.WaitGroup + events := make(chan *event, 1024) + defer close(events) + for i := uint64(0); i < cfg.NumSubmitters; i++ { + wg.Add(1) + w := worker{Config: cfg, outCh: events} + go func() { + defer wg.Done() + defer cancel() + if err := w.submit(ctx); err != nil { + log.Fatal("submitter died: %v", err) + } + }() + } + + var checkChs []chan *event + for i := uint64(0); i < cfg.NumCheckers; i++ { + checkCh := make(chan *event, 1024) + checkChs = append(checkChs, checkCh) + defer close(checkCh) + + wg.Add(1) + w := worker{Config: cfg, inCh: checkCh, outCh: events} + go func() { + defer wg.Done() + defer cancel() + if err := w.check(ctx); err != nil { + log.Fatal("checker died: %v", err) + } + }() + } + + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + defer close(sigs) + + ticker := time.NewTicker(cfg.Interval) + defer wg.Wait() + defer cancel() + defer ticker.Stop() + + log.Info("Output format is: \n\n%s\n\n", s.format()) + nextChecker := 0 + for { + select { + case <-ctx.Done(): + log.Info("received done signal, closing...") + return nil + case ev := <-events: + s.register(ev) + if cfg.NumCheckers > 0 && !ev.got200 { + select { + case checkChs[nextChecker] <- ev: + default: + log.Fatal("check channel %d is full", nextChecker) + } + nextChecker = (int(nextChecker) + 1) % int(cfg.NumCheckers) + } + case <-ticker.C: + s.rotate(os.Stdout) + case <-sigs: + log.Info("received shutdown signal, closing...") + return nil + } + } + return nil +} diff --git a/cmd/sigsum-debug/spam/leaf/stats.go b/cmd/sigsum-debug/spam/leaf/stats.go new file mode 100644 index 0000000..b66e83a --- /dev/null +++ b/cmd/sigsum-debug/spam/leaf/stats.go @@ -0,0 +1,104 @@ +package leaf + +import ( + "fmt" + "os" + "time" +) + +type samples struct { + num uint64 // number of samples + sum uint64 // total query time (ms) + min uint64 // minimum query time (ms) + max uint64 // maximum query time (ms) +} + +func newSamples() samples { + return samples{min: 18446744073709551615} +} + +func (s *samples) update(sample uint64) { + s.num += 1 + s.sum += sample + if sample < s.min { + s.min = sample + } + if sample > s.max { + s.max = sample + } +} + +func (s *samples) avg() uint64 { + if s.num == 0 { + return 0 + } + return s.sum / s.num +} + +type status struct { + start time.Time // time that measurement started + + num3xx uint64 // number of 3xx responses since last status + num4xx uint64 // number of 4xx responses since last status + num5xx uint64 // number of 5xx responses since last status + + status200 samples // info about 200 responses since last status + status202 samples // info about 202 responses since last status +} + +func newStatus() status { + return status{ + start: time.Now(), + status200: newSamples(), + status202: newSamples(), + } +} + +func (s *status) reset() { + s.num3xx = 0 + s.num4xx = 0 + s.num5xx = 0 + + s.status200 = newSamples() + s.status202 = newSamples() +} + +func (s *status) register(ev *event) { + s.num3xx += ev.num3xx + s.num4xx += ev.num4xx + s.num5xx += ev.num5xx + + delay := uint64(ev.end.Sub(ev.start).Milliseconds()) + if ev.got200 { + s.status200.update(delay) + } else { + s.status202.update(delay) + } + + ev.resetCounters() +} + +func (s *status) rotate(f *os.File) error { + str := fmt.Sprintf("%s ", time.Now().Format(time.RFC3339)) + str += fmt.Sprintf("%d ", uint64(time.Now().Sub(s.start).Milliseconds()/1000)) + str += fmt.Sprintf("%d ", s.status200.num) + str += fmt.Sprintf("%d ", s.status202.num) + str += fmt.Sprintf("%d ", s.num3xx) + str += fmt.Sprintf("%d ", s.num4xx) + str += fmt.Sprintf("%d ", s.num5xx) + str += fmt.Sprintf("%d ", s.status200.min) + str += fmt.Sprintf("%d ", s.status200.avg()) + str += fmt.Sprintf("%d ", s.status200.max) + str += fmt.Sprintf("%d ", s.status202.min) + str += fmt.Sprintf("%d ", s.status202.avg()) + str += fmt.Sprintf("%d\n", s.status202.max) + s.reset() + + _, err := f.WriteString(str) + return err +} + +func (s *status) format() string { + return "Time RelTime Num200 Num202 Num3xx Num4xx Num5xx Min200 Avg200 Max200 Min202 Avg202 Max202\n" + + " (s) ----> observed response delays (ms) <----" +} diff --git a/cmd/sigsum-debug/spam/leaf/worker.go b/cmd/sigsum-debug/spam/leaf/worker.go new file mode 100644 index 0000000..9b0e0b7 --- /dev/null +++ b/cmd/sigsum-debug/spam/leaf/worker.go @@ -0,0 +1,177 @@ +package leaf + +import ( + "bytes" + "context" + "crypto/rand" + "encoding/binary" + "fmt" + "net/http" + "time" + + "git.sigsum.org/sigsum-go/pkg/merkle" + "git.sigsum.org/sigsum-go/pkg/requests" + "git.sigsum.org/sigsum-go/pkg/types" +) + +type worker struct { + Config + inCh chan *event // events that received 202 + outCh chan *event // events that received status 2XX +} + +type event struct { + start time.Time // time that request started + end time.Time // time that request ended with 2XX + + req *http.Request // prepared add-leaf request + got200 bool // true if 200 response + got202 bool // true if at least one 202 responses + + num3xx uint64 // number of encountered 3xx before 2XX + num4xx uint64 // number of encountered 4xx before 2XX + num5xx uint64 // number of encountered 5xx before 2XX +} + +func (w *worker) submit(ctx context.Context) error { + data := make([]byte, 40) + if _, err := rand.Read(data); err != nil { + return fmt.Errorf("generate data: %v", err) + } + + ctr := uint64(0) + for { + ctr += 1 + binary.BigEndian.PutUint64(data[:8], ctr) + req, err := w.newRequest(merkle.HashFn(data)) + if err != nil { + return err + } + + ev := event{req: req, start: time.Now()} + for { + select { + case <-ctx.Done(): + return nil + case <-time.After(w.Wait): + } + + if err := w.doRequest(&ev); err == nil { + break // success + } + } + + select { + case w.outCh <- &ev: + default: + return fmt.Errorf("out channel is full") + } + } +} + +func (w *worker) check(ctx context.Context) error { + // setup a backoff mechanism that doesn't block + next := make(chan struct{}, 1) + backoff := func() { + time.Sleep(w.backoff) + next <- struct{}{} + } + next <- struct{}{} + defer func() { + <-next // empty channel before closing + defer close(next) + }() + + evs := make([]*event, 0, w.maxEvents) + for { + if len(evs) == w.maxEvents { + return fmt.Errorf("checker has too many queued events: %d", w.maxEvents) + } + + select { + case <-ctx.Done(): + return nil + case ev := <-w.inCh: + evs = append(evs, ev) + continue + case <-next: + if len(evs) == 0 { + go backoff() + continue + } + } + + if err := w.doRequest(evs[0]); err != nil { + go backoff() + continue + } + next <- struct{}{} // no backoff after success + + select { + case w.outCh <- evs[0]: + default: + return fmt.Errorf("out channel is full") + } + evs = evs[1:] + } +} + +func (w *worker) newRequest(msg *merkle.Hash) (*http.Request, error) { + stm := types.Statement{ + ShardHint: uint64(time.Now().Unix()), + Checksum: *merkle.HashFn(msg[:]), + } + sig, err := stm.Sign(w.signer) + if err != nil { + return nil, err + } + leaf := requests.Leaf{ + ShardHint: stm.ShardHint, + Message: *msg, + Signature: *sig, + PublicKey: w.pub, + DomainHint: w.DomainHint, + } + buf := bytes.NewBuffer(nil) + if err := leaf.ToASCII(buf); err != nil { + return nil, fmt.Errorf("serialize leaf request: %v", err) + } + req, err := http.NewRequest(http.MethodPost, w.url, buf) + if err != nil { + return nil, fmt.Errorf("create http request: %v", err) + } + return req, nil +} + +func (w *worker) doRequest(ev *event) error { + rsp, err := w.cli.Do(ev.req) + if err != nil { + return err + } + defer rsp.Body.Close() + + if rsp.StatusCode == http.StatusOK { + ev.got200 = true + ev.end = time.Now() + return nil + } else if rsp.StatusCode == http.StatusAccepted { + if !ev.got202 { + ev.got202 = true + ev.end = time.Now() + return nil // first 202 response + } + } else if rsp.StatusCode >= 300 && rsp.StatusCode < 400 { + ev.num3xx += 1 + } else if rsp.StatusCode >= 400 && rsp.StatusCode < 500 { + ev.num4xx += 1 + } else if rsp.StatusCode >= 500 && rsp.StatusCode < 600 { + ev.num5xx += 1 + } + return fmt.Errorf("status %d", err) +} + +func (ev *event) resetCounters() { + ev.num3xx = 0 + ev.num4xx = 0 + ev.num5xx = 0 +} diff --git a/cmd/sigsum-debug/spam/spam.go b/cmd/sigsum-debug/spam/spam.go new file mode 100644 index 0000000..29fde6e --- /dev/null +++ b/cmd/sigsum-debug/spam/spam.go @@ -0,0 +1,77 @@ +package spam + +import ( + "flag" + "fmt" + "log" + "time" + + "git.sigsum.org/sigsum-go/cmd/sigsum-debug/spam/leaf" + "git.sigsum.org/sigsum-go/internal/options" +) + +const usage = ` +sigsum-debug spam sends many requests to a sigsum log + +Usage: + + sigsum-debug spam leaf -u URL -k KEY -h DOMAIN_HINT [-d DURATION] + [-i INTERVAL] [-w WAIT] [-s SUBMITTERS] [-c CHECKERS] + + Sends add-leaf requests to a sigsum log from one or more parallel + submitters. Use the -c option to also check for 200 OK responses. + Unless the -w flag is specified, you then need ~5x more checkers. + + Options: + -u, --log-url URL of a log to spam with add-leaf requests + -k, --private-key Private key to sign checksums with in hex + -h, --domain-hint Domain hint for the specified private key + -d, --duration Duration to run sigsum-spam (Default: 5m) + -i, --interval Duration between emitting stats (Default: 1s) + -w, --wait Time to wait between submits (Default: 0s) + -s, --submitters Number of submitters to use (Default: 1) + -c, --checkers Number of checkers to use (Default: 0) +` + +var ( + leafConfig leaf.Config +) + +func setOptions(fs *flag.FlagSet) { + switch cmd := fs.Name(); cmd { + case "leaf": + options.AddString(fs, &leafConfig.LogURL, "u", "log-url", "") + options.AddString(fs, &leafConfig.PrivateKey, "k", "private-key", "") + options.AddString(fs, &leafConfig.DomainHint, "h", "domain-hint", "") + options.AddDuration(fs, &leafConfig.Duration, "d", "duration", 5*time.Minute) + options.AddDuration(fs, &leafConfig.Interval, "i", "interval", 1*time.Second) + options.AddDuration(fs, &leafConfig.Wait, "w", "wait", 0*time.Second) + options.AddUint64(fs, &leafConfig.NumSubmitters, "s", "submitters", 1) + options.AddUint64(fs, &leafConfig.NumCheckers, "c", "checkers", 0) + } +} + +func Main(args []string) error { + var err error + + opt := options.New(args, func() { log.Printf(usage[1:]) }, setOptions) + if err == nil { + switch opt.Name() { + case "help", "": + opt.Usage() + case "leaf": + err = leaf.Main(opt.Args(), leafConfig) + default: + err = fmt.Errorf("invalid command %q, try \"help\"", opt.Name()) + } + } + if err != nil { + format := " %s: %w" + if len(opt.Name()) == 0 { + format = "%s: %w" + } + err = fmt.Errorf(format, opt.Name(), err) + } + + return err +} |