aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRasmus Dahlberg <rasmus@mullvad.net>2022-07-22 18:25:56 +0200
committerRasmus Dahlberg <rasmus@mullvad.net>2022-07-22 18:25:56 +0200
commitf04e26523189ac3fc54381376966f98d2f52440b (patch)
tree52f664a52759322947420f7313573e92002fa6f9
parent722e48feb2aa85d8cb75069a06024e506367e34f (diff)
add spam command to sigsum-debugsigsum-spam
-rw-r--r--cmd/sigsum-debug/main.go7
-rw-r--r--cmd/sigsum-debug/spam/leaf/config.go60
-rw-r--r--cmd/sigsum-debug/spam/leaf/main.go88
-rw-r--r--cmd/sigsum-debug/spam/leaf/stats.go104
-rw-r--r--cmd/sigsum-debug/spam/leaf/worker.go177
-rw-r--r--cmd/sigsum-debug/spam/spam.go77
-rw-r--r--internal/options/options.go7
7 files changed, 518 insertions, 2 deletions
diff --git a/cmd/sigsum-debug/main.go b/cmd/sigsum-debug/main.go
index 2c10be2..af2093c 100644
--- a/cmd/sigsum-debug/main.go
+++ b/cmd/sigsum-debug/main.go
@@ -19,12 +19,12 @@ import (
"git.sigsum.org/sigsum-go/cmd/sigsum-debug/head"
"git.sigsum.org/sigsum-go/cmd/sigsum-debug/key"
"git.sigsum.org/sigsum-go/cmd/sigsum-debug/leaf"
+ "git.sigsum.org/sigsum-go/cmd/sigsum-debug/spam"
"git.sigsum.org/sigsum-go/internal/options"
)
const usage = `
-sigsum-debug is a tool that helps debug sigsum logs on the command-line.
-It is meant to be used in conjuction with other utilities such as curl.
+sigsum-debug is a tool that helps debug sigsum logs on the command-line
Usage:
@@ -32,6 +32,7 @@ Usage:
sigsum-debug key Private and public key utilities
sigsum-debug leaf Hash, sign, and verify tree leaves
sigsum-debug head Sign and verify tree heads
+ sigsum-debug spam Send many requests to a sigsum log
`
@@ -49,6 +50,8 @@ func main() {
err = leaf.Main(opt.Args())
case "head":
err = head.Main(opt.Args())
+ case "spam":
+ err = spam.Main(opt.Args())
default:
err = fmt.Errorf(": invalid command %q, try \"help\"", opt.Name())
}
diff --git a/cmd/sigsum-debug/spam/leaf/config.go b/cmd/sigsum-debug/spam/leaf/config.go
new file mode 100644
index 0000000..2e20777
--- /dev/null
+++ b/cmd/sigsum-debug/spam/leaf/config.go
@@ -0,0 +1,60 @@
+package leaf
+
+import (
+ "crypto"
+ "crypto/ed25519"
+ "fmt"
+ "net/http"
+ "time"
+
+ "git.sigsum.org/sigsum-go/internal/fmtio"
+ "git.sigsum.org/sigsum-go/pkg/types"
+)
+
+type Config struct {
+ LogURL string
+ PrivateKey string // a private key to sign checksum with in hex
+ DomainHint string // a domain hint that is valid for the above key
+ Duration time.Duration // how long to run test
+ Interval time.Duration // how often to emit stats
+ Wait time.Duration // time to wait between submits
+ NumSubmitters uint64 // at least one
+ NumCheckers uint64 // zero to disable checkers
+
+ url string
+ signer crypto.Signer
+ pub types.PublicKey
+ cli http.Client
+
+ maxEvents int // maximum number of events to queue at a checker
+ backoff time.Duration // time to backoff when waiting for 200 OK
+}
+
+func (cfg *Config) parse(args []string) (err error) {
+ if len(args) != 0 {
+ return fmt.Errorf("trailing arguments: %v", args)
+ }
+ if len(cfg.LogURL) == 0 {
+ return fmt.Errorf("url is a required option")
+ }
+ if len(cfg.PrivateKey) == 0 {
+ return fmt.Errorf("private key is a required option")
+ }
+ if len(cfg.DomainHint) == 0 {
+ return fmt.Errorf("domain hint is a required option")
+ }
+
+ if cfg.signer, err = fmtio.SignerFromHex(cfg.PrivateKey); err != nil {
+ return fmt.Errorf("parse private key: %v", err)
+ }
+ if cfg.NumSubmitters == 0 {
+ return fmt.Errorf("at least one submitter is required")
+ }
+
+ cfg.url = types.EndpointAddLeaf.Path(cfg.LogURL, "sigsum/v0")
+ cfg.maxEvents = 16384
+ cfg.backoff = 5 * time.Second
+ cfg.cli = http.Client{Timeout: 10 * time.Second}
+ copy(cfg.pub[:], (cfg.signer.Public().(ed25519.PublicKey))[:])
+ return nil
+}
diff --git a/cmd/sigsum-debug/spam/leaf/main.go b/cmd/sigsum-debug/spam/leaf/main.go
new file mode 100644
index 0000000..b5895e2
--- /dev/null
+++ b/cmd/sigsum-debug/spam/leaf/main.go
@@ -0,0 +1,88 @@
+package leaf
+
+import (
+ "context"
+ "os"
+ "os/signal"
+ "sync"
+ "syscall"
+ "time"
+
+ "git.sigsum.org/sigsum-go/pkg/log"
+)
+
+func Main(args []string, cfg Config) error {
+ if err := cfg.parse(args); err != nil {
+ return err
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(), cfg.Duration)
+ s := newStatus()
+
+ var wg sync.WaitGroup
+ events := make(chan *event, 1024)
+ defer close(events)
+ for i := uint64(0); i < cfg.NumSubmitters; i++ {
+ wg.Add(1)
+ w := worker{Config: cfg, outCh: events}
+ go func() {
+ defer wg.Done()
+ defer cancel()
+ if err := w.submit(ctx); err != nil {
+ log.Fatal("submitter died: %v", err)
+ }
+ }()
+ }
+
+ var checkChs []chan *event
+ for i := uint64(0); i < cfg.NumCheckers; i++ {
+ checkCh := make(chan *event, 1024)
+ checkChs = append(checkChs, checkCh)
+ defer close(checkCh)
+
+ wg.Add(1)
+ w := worker{Config: cfg, inCh: checkCh, outCh: events}
+ go func() {
+ defer wg.Done()
+ defer cancel()
+ if err := w.check(ctx); err != nil {
+ log.Fatal("checker died: %v", err)
+ }
+ }()
+ }
+
+ sigs := make(chan os.Signal, 1)
+ signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
+ defer close(sigs)
+
+ ticker := time.NewTicker(cfg.Interval)
+ defer wg.Wait()
+ defer cancel()
+ defer ticker.Stop()
+
+ log.Info("Output format is: \n\n%s\n\n", s.format())
+ nextChecker := 0
+ for {
+ select {
+ case <-ctx.Done():
+ log.Info("received done signal, closing...")
+ return nil
+ case ev := <-events:
+ s.register(ev)
+ if cfg.NumCheckers > 0 && !ev.got200 {
+ select {
+ case checkChs[nextChecker] <- ev:
+ default:
+ log.Fatal("check channel %d is full", nextChecker)
+ }
+ nextChecker = (int(nextChecker) + 1) % int(cfg.NumCheckers)
+ }
+ case <-ticker.C:
+ s.rotate(os.Stdout)
+ case <-sigs:
+ log.Info("received shutdown signal, closing...")
+ return nil
+ }
+ }
+ return nil
+}
diff --git a/cmd/sigsum-debug/spam/leaf/stats.go b/cmd/sigsum-debug/spam/leaf/stats.go
new file mode 100644
index 0000000..b66e83a
--- /dev/null
+++ b/cmd/sigsum-debug/spam/leaf/stats.go
@@ -0,0 +1,104 @@
+package leaf
+
+import (
+ "fmt"
+ "os"
+ "time"
+)
+
+type samples struct {
+ num uint64 // number of samples
+ sum uint64 // total query time (ms)
+ min uint64 // minimum query time (ms)
+ max uint64 // maximum query time (ms)
+}
+
+func newSamples() samples {
+ return samples{min: 18446744073709551615}
+}
+
+func (s *samples) update(sample uint64) {
+ s.num += 1
+ s.sum += sample
+ if sample < s.min {
+ s.min = sample
+ }
+ if sample > s.max {
+ s.max = sample
+ }
+}
+
+func (s *samples) avg() uint64 {
+ if s.num == 0 {
+ return 0
+ }
+ return s.sum / s.num
+}
+
+type status struct {
+ start time.Time // time that measurement started
+
+ num3xx uint64 // number of 3xx responses since last status
+ num4xx uint64 // number of 4xx responses since last status
+ num5xx uint64 // number of 5xx responses since last status
+
+ status200 samples // info about 200 responses since last status
+ status202 samples // info about 202 responses since last status
+}
+
+func newStatus() status {
+ return status{
+ start: time.Now(),
+ status200: newSamples(),
+ status202: newSamples(),
+ }
+}
+
+func (s *status) reset() {
+ s.num3xx = 0
+ s.num4xx = 0
+ s.num5xx = 0
+
+ s.status200 = newSamples()
+ s.status202 = newSamples()
+}
+
+func (s *status) register(ev *event) {
+ s.num3xx += ev.num3xx
+ s.num4xx += ev.num4xx
+ s.num5xx += ev.num5xx
+
+ delay := uint64(ev.end.Sub(ev.start).Milliseconds())
+ if ev.got200 {
+ s.status200.update(delay)
+ } else {
+ s.status202.update(delay)
+ }
+
+ ev.resetCounters()
+}
+
+func (s *status) rotate(f *os.File) error {
+ str := fmt.Sprintf("%s ", time.Now().Format(time.RFC3339))
+ str += fmt.Sprintf("%d ", uint64(time.Now().Sub(s.start).Milliseconds()/1000))
+ str += fmt.Sprintf("%d ", s.status200.num)
+ str += fmt.Sprintf("%d ", s.status202.num)
+ str += fmt.Sprintf("%d ", s.num3xx)
+ str += fmt.Sprintf("%d ", s.num4xx)
+ str += fmt.Sprintf("%d ", s.num5xx)
+ str += fmt.Sprintf("%d ", s.status200.min)
+ str += fmt.Sprintf("%d ", s.status200.avg())
+ str += fmt.Sprintf("%d ", s.status200.max)
+ str += fmt.Sprintf("%d ", s.status202.min)
+ str += fmt.Sprintf("%d ", s.status202.avg())
+ str += fmt.Sprintf("%d\n", s.status202.max)
+ s.reset()
+
+ _, err := f.WriteString(str)
+ return err
+}
+
+func (s *status) format() string {
+ return "Time RelTime Num200 Num202 Num3xx Num4xx Num5xx Min200 Avg200 Max200 Min202 Avg202 Max202\n" +
+ " (s) ----> observed response delays (ms) <----"
+}
diff --git a/cmd/sigsum-debug/spam/leaf/worker.go b/cmd/sigsum-debug/spam/leaf/worker.go
new file mode 100644
index 0000000..9b0e0b7
--- /dev/null
+++ b/cmd/sigsum-debug/spam/leaf/worker.go
@@ -0,0 +1,177 @@
+package leaf
+
+import (
+ "bytes"
+ "context"
+ "crypto/rand"
+ "encoding/binary"
+ "fmt"
+ "net/http"
+ "time"
+
+ "git.sigsum.org/sigsum-go/pkg/merkle"
+ "git.sigsum.org/sigsum-go/pkg/requests"
+ "git.sigsum.org/sigsum-go/pkg/types"
+)
+
+type worker struct {
+ Config
+ inCh chan *event // events that received 202
+ outCh chan *event // events that received status 2XX
+}
+
+type event struct {
+ start time.Time // time that request started
+ end time.Time // time that request ended with 2XX
+
+ req *http.Request // prepared add-leaf request
+ got200 bool // true if 200 response
+ got202 bool // true if at least one 202 responses
+
+ num3xx uint64 // number of encountered 3xx before 2XX
+ num4xx uint64 // number of encountered 4xx before 2XX
+ num5xx uint64 // number of encountered 5xx before 2XX
+}
+
+func (w *worker) submit(ctx context.Context) error {
+ data := make([]byte, 40)
+ if _, err := rand.Read(data); err != nil {
+ return fmt.Errorf("generate data: %v", err)
+ }
+
+ ctr := uint64(0)
+ for {
+ ctr += 1
+ binary.BigEndian.PutUint64(data[:8], ctr)
+ req, err := w.newRequest(merkle.HashFn(data))
+ if err != nil {
+ return err
+ }
+
+ ev := event{req: req, start: time.Now()}
+ for {
+ select {
+ case <-ctx.Done():
+ return nil
+ case <-time.After(w.Wait):
+ }
+
+ if err := w.doRequest(&ev); err == nil {
+ break // success
+ }
+ }
+
+ select {
+ case w.outCh <- &ev:
+ default:
+ return fmt.Errorf("out channel is full")
+ }
+ }
+}
+
+func (w *worker) check(ctx context.Context) error {
+ // setup a backoff mechanism that doesn't block
+ next := make(chan struct{}, 1)
+ backoff := func() {
+ time.Sleep(w.backoff)
+ next <- struct{}{}
+ }
+ next <- struct{}{}
+ defer func() {
+ <-next // empty channel before closing
+ defer close(next)
+ }()
+
+ evs := make([]*event, 0, w.maxEvents)
+ for {
+ if len(evs) == w.maxEvents {
+ return fmt.Errorf("checker has too many queued events: %d", w.maxEvents)
+ }
+
+ select {
+ case <-ctx.Done():
+ return nil
+ case ev := <-w.inCh:
+ evs = append(evs, ev)
+ continue
+ case <-next:
+ if len(evs) == 0 {
+ go backoff()
+ continue
+ }
+ }
+
+ if err := w.doRequest(evs[0]); err != nil {
+ go backoff()
+ continue
+ }
+ next <- struct{}{} // no backoff after success
+
+ select {
+ case w.outCh <- evs[0]:
+ default:
+ return fmt.Errorf("out channel is full")
+ }
+ evs = evs[1:]
+ }
+}
+
+func (w *worker) newRequest(msg *merkle.Hash) (*http.Request, error) {
+ stm := types.Statement{
+ ShardHint: uint64(time.Now().Unix()),
+ Checksum: *merkle.HashFn(msg[:]),
+ }
+ sig, err := stm.Sign(w.signer)
+ if err != nil {
+ return nil, err
+ }
+ leaf := requests.Leaf{
+ ShardHint: stm.ShardHint,
+ Message: *msg,
+ Signature: *sig,
+ PublicKey: w.pub,
+ DomainHint: w.DomainHint,
+ }
+ buf := bytes.NewBuffer(nil)
+ if err := leaf.ToASCII(buf); err != nil {
+ return nil, fmt.Errorf("serialize leaf request: %v", err)
+ }
+ req, err := http.NewRequest(http.MethodPost, w.url, buf)
+ if err != nil {
+ return nil, fmt.Errorf("create http request: %v", err)
+ }
+ return req, nil
+}
+
+func (w *worker) doRequest(ev *event) error {
+ rsp, err := w.cli.Do(ev.req)
+ if err != nil {
+ return err
+ }
+ defer rsp.Body.Close()
+
+ if rsp.StatusCode == http.StatusOK {
+ ev.got200 = true
+ ev.end = time.Now()
+ return nil
+ } else if rsp.StatusCode == http.StatusAccepted {
+ if !ev.got202 {
+ ev.got202 = true
+ ev.end = time.Now()
+ return nil // first 202 response
+ }
+ } else if rsp.StatusCode >= 300 && rsp.StatusCode < 400 {
+ ev.num3xx += 1
+ } else if rsp.StatusCode >= 400 && rsp.StatusCode < 500 {
+ ev.num4xx += 1
+ } else if rsp.StatusCode >= 500 && rsp.StatusCode < 600 {
+ ev.num5xx += 1
+ }
+ return fmt.Errorf("status %d", err)
+}
+
+func (ev *event) resetCounters() {
+ ev.num3xx = 0
+ ev.num4xx = 0
+ ev.num5xx = 0
+}
diff --git a/cmd/sigsum-debug/spam/spam.go b/cmd/sigsum-debug/spam/spam.go
new file mode 100644
index 0000000..29fde6e
--- /dev/null
+++ b/cmd/sigsum-debug/spam/spam.go
@@ -0,0 +1,77 @@
+package spam
+
+import (
+ "flag"
+ "fmt"
+ "log"
+ "time"
+
+ "git.sigsum.org/sigsum-go/cmd/sigsum-debug/spam/leaf"
+ "git.sigsum.org/sigsum-go/internal/options"
+)
+
+const usage = `
+sigsum-debug spam sends many requests to a sigsum log
+
+Usage:
+
+ sigsum-debug spam leaf -u URL -k KEY -h DOMAIN_HINT [-d DURATION]
+ [-i INTERVAL] [-w WAIT] [-s SUBMITTERS] [-c CHECKERS]
+
+ Sends add-leaf requests to a sigsum log from one or more parallel
+ submitters. Use the -c option to also check for 200 OK responses.
+ Unless the -w flag is specified, you then need ~5x more checkers.
+
+ Options:
+ -u, --log-url URL of a log to spam with add-leaf requests
+ -k, --private-key Private key to sign checksums with in hex
+ -h, --domain-hint Domain hint for the specified private key
+ -d, --duration Duration to run sigsum-spam (Default: 5m)
+ -i, --interval Duration between emitting stats (Default: 1s)
+ -w, --wait Time to wait between submits (Default: 0s)
+ -s, --submitters Number of submitters to use (Default: 1)
+ -c, --checkers Number of checkers to use (Default: 0)
+`
+
+var (
+ leafConfig leaf.Config
+)
+
+func setOptions(fs *flag.FlagSet) {
+ switch cmd := fs.Name(); cmd {
+ case "leaf":
+ options.AddString(fs, &leafConfig.LogURL, "u", "log-url", "")
+ options.AddString(fs, &leafConfig.PrivateKey, "k", "private-key", "")
+ options.AddString(fs, &leafConfig.DomainHint, "h", "domain-hint", "")
+ options.AddDuration(fs, &leafConfig.Duration, "d", "duration", 5*time.Minute)
+ options.AddDuration(fs, &leafConfig.Interval, "i", "interval", 1*time.Second)
+ options.AddDuration(fs, &leafConfig.Wait, "w", "wait", 0*time.Second)
+ options.AddUint64(fs, &leafConfig.NumSubmitters, "s", "submitters", 1)
+ options.AddUint64(fs, &leafConfig.NumCheckers, "c", "checkers", 0)
+ }
+}
+
+func Main(args []string) error {
+ var err error
+
+ opt := options.New(args, func() { log.Printf(usage[1:]) }, setOptions)
+ if err == nil {
+ switch opt.Name() {
+ case "help", "":
+ opt.Usage()
+ case "leaf":
+ err = leaf.Main(opt.Args(), leafConfig)
+ default:
+ err = fmt.Errorf("invalid command %q, try \"help\"", opt.Name())
+ }
+ }
+ if err != nil {
+ format := " %s: %w"
+ if len(opt.Name()) == 0 {
+ format = "%s: %w"
+ }
+ err = fmt.Errorf(format, opt.Name(), err)
+ }
+
+ return err
+}
diff --git a/internal/options/options.go b/internal/options/options.go
index 8e4ab0c..d6cbd70 100644
--- a/internal/options/options.go
+++ b/internal/options/options.go
@@ -3,6 +3,7 @@ package options
import (
"flag"
"fmt"
+ "time"
)
const (
@@ -42,6 +43,12 @@ func AddUint64(fs *flag.FlagSet, opt *uint64, short, long string, value uint64)
fs.Uint64Var(opt, long, value, "")
}
+// AddDuration adds a duration option to a flag set
+func AddDuration(fs *flag.FlagSet, opt *time.Duration, short, long string, value time.Duration) {
+ fs.DurationVar(opt, short, value, "")
+ fs.DurationVar(opt, long, value, "")
+}
+
// CheckString checks that a string option has a non-default value
func CheckString(optionName, value string, err error) error {
if err != nil {