diff options
Diffstat (limited to 'cmd/sigsum-debug/spam/leaf')
| -rw-r--r-- | cmd/sigsum-debug/spam/leaf/config.go | 60 | ||||
| -rw-r--r-- | cmd/sigsum-debug/spam/leaf/main.go | 88 | ||||
| -rw-r--r-- | cmd/sigsum-debug/spam/leaf/stats.go | 104 | ||||
| -rw-r--r-- | cmd/sigsum-debug/spam/leaf/worker.go | 177 | 
4 files changed, 429 insertions, 0 deletions
| diff --git a/cmd/sigsum-debug/spam/leaf/config.go b/cmd/sigsum-debug/spam/leaf/config.go new file mode 100644 index 0000000..2e20777 --- /dev/null +++ b/cmd/sigsum-debug/spam/leaf/config.go @@ -0,0 +1,60 @@ +package leaf + +import ( +	"crypto" +	"crypto/ed25519" +	"fmt" +	"net/http" +	"time" + +	"git.sigsum.org/sigsum-go/internal/fmtio" +	"git.sigsum.org/sigsum-go/pkg/types" +) + +type Config struct { +	LogURL        string +	PrivateKey    string        // a private key to sign checksum with in hex +	DomainHint    string        // a domain hint that is valid for the above key +	Duration      time.Duration // how long to run test +	Interval      time.Duration // how often to emit stats +	Wait          time.Duration // time to wait between submits +	NumSubmitters uint64        // at least one +	NumCheckers   uint64        // zero to disable checkers + +	url    string +	signer crypto.Signer +	pub    types.PublicKey +	cli    http.Client + +	maxEvents int           // maximum number of events to queue at a checker +	backoff   time.Duration // time to backoff when waiting for 200 OK +} + +func (cfg *Config) parse(args []string) (err error) { +	if len(args) != 0 { +		return fmt.Errorf("trailing arguments: %v", args) +	} +	if len(cfg.LogURL) == 0 { +		return fmt.Errorf("url is a required option") +	} +	if len(cfg.PrivateKey) == 0 { +		return fmt.Errorf("private key is a required option") +	} +	if len(cfg.DomainHint) == 0 { +		return fmt.Errorf("domain hint is a required option") +	} + +	if cfg.signer, err = fmtio.SignerFromHex(cfg.PrivateKey); err != nil { +		return fmt.Errorf("parse private key: %v", err) +	} +	if cfg.NumSubmitters == 0 { +		return fmt.Errorf("at least one submitter is required") +	} + +	cfg.url = types.EndpointAddLeaf.Path(cfg.LogURL, "sigsum/v0") +	cfg.maxEvents = 16384 +	cfg.backoff = 5 * time.Second +	cfg.cli = http.Client{Timeout: 10 * time.Second} +	copy(cfg.pub[:], (cfg.signer.Public().(ed25519.PublicKey))[:]) +	return nil +} diff --git a/cmd/sigsum-debug/spam/leaf/main.go b/cmd/sigsum-debug/spam/leaf/main.go new file mode 100644 index 0000000..b5895e2 --- /dev/null +++ b/cmd/sigsum-debug/spam/leaf/main.go @@ -0,0 +1,88 @@ +package leaf + +import ( +	"context" +	"os" +	"os/signal" +	"sync" +	"syscall" +	"time" + +	"git.sigsum.org/sigsum-go/pkg/log" +) + +func Main(args []string, cfg Config) error { +	if err := cfg.parse(args); err != nil { +		return err +	} + +	ctx, cancel := context.WithTimeout(context.Background(), cfg.Duration) +	s := newStatus() + +	var wg sync.WaitGroup +	events := make(chan *event, 1024) +	defer close(events) +	for i := uint64(0); i < cfg.NumSubmitters; i++ { +		wg.Add(1) +		w := worker{Config: cfg, outCh: events} +		go func() { +			defer wg.Done() +			defer cancel() +			if err := w.submit(ctx); err != nil { +				log.Fatal("submitter died: %v", err) +			} +		}() +	} + +	var checkChs []chan *event +	for i := uint64(0); i < cfg.NumCheckers; i++ { +		checkCh := make(chan *event, 1024) +		checkChs = append(checkChs, checkCh) +		defer close(checkCh) + +		wg.Add(1) +		w := worker{Config: cfg, inCh: checkCh, outCh: events} +		go func() { +			defer wg.Done() +			defer cancel() +			if err := w.check(ctx); err != nil { +				log.Fatal("checker died: %v", err) +			} +		}() +	} + +	sigs := make(chan os.Signal, 1) +	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) +	defer close(sigs) + +	ticker := time.NewTicker(cfg.Interval) +	defer wg.Wait() +	defer cancel() +	defer ticker.Stop() + +	log.Info("Output format is: \n\n%s\n\n", s.format()) +	nextChecker := 0 +	for { +		select { +		case <-ctx.Done(): +			log.Info("received done signal, closing...") +			return nil +		case ev := <-events: +			s.register(ev) +			if cfg.NumCheckers > 0 && !ev.got200 { +				select { +				case checkChs[nextChecker] <- ev: +				default: +					log.Fatal("check channel %d is full", nextChecker) +				} +				nextChecker = (int(nextChecker) + 1) % int(cfg.NumCheckers) +			} +		case <-ticker.C: +			s.rotate(os.Stdout) +		case <-sigs: +			log.Info("received shutdown signal, closing...") +			return nil +		} +	} +	return nil +} diff --git a/cmd/sigsum-debug/spam/leaf/stats.go b/cmd/sigsum-debug/spam/leaf/stats.go new file mode 100644 index 0000000..b66e83a --- /dev/null +++ b/cmd/sigsum-debug/spam/leaf/stats.go @@ -0,0 +1,104 @@ +package leaf + +import ( +	"fmt" +	"os" +	"time" +) + +type samples struct { +	num uint64 // number of samples +	sum uint64 // total query time (ms) +	min uint64 // minimum query time (ms) +	max uint64 // maximum query time (ms) +} + +func newSamples() samples { +	return samples{min: 18446744073709551615} +} + +func (s *samples) update(sample uint64) { +	s.num += 1 +	s.sum += sample +	if sample < s.min { +		s.min = sample +	} +	if sample > s.max { +		s.max = sample +	} +} + +func (s *samples) avg() uint64 { +	if s.num == 0 { +		return 0 +	} +	return s.sum / s.num +} + +type status struct { +	start time.Time // time that measurement started + +	num3xx uint64 // number of 3xx responses since last status +	num4xx uint64 // number of 4xx responses since last status +	num5xx uint64 // number of 5xx responses since last status + +	status200 samples // info about 200 responses since last status +	status202 samples // info about 202 responses since last status +} + +func newStatus() status { +	return status{ +		start:     time.Now(), +		status200: newSamples(), +		status202: newSamples(), +	} +} + +func (s *status) reset() { +	s.num3xx = 0 +	s.num4xx = 0 +	s.num5xx = 0 + +	s.status200 = newSamples() +	s.status202 = newSamples() +} + +func (s *status) register(ev *event) { +	s.num3xx += ev.num3xx +	s.num4xx += ev.num4xx +	s.num5xx += ev.num5xx + +	delay := uint64(ev.end.Sub(ev.start).Milliseconds()) +	if ev.got200 { +		s.status200.update(delay) +	} else { +		s.status202.update(delay) +	} + +	ev.resetCounters() +} + +func (s *status) rotate(f *os.File) error { +	str := fmt.Sprintf("%s ", time.Now().Format(time.RFC3339)) +	str += fmt.Sprintf("%d ", uint64(time.Now().Sub(s.start).Milliseconds()/1000)) +	str += fmt.Sprintf("%d ", s.status200.num) +	str += fmt.Sprintf("%d ", s.status202.num) +	str += fmt.Sprintf("%d ", s.num3xx) +	str += fmt.Sprintf("%d ", s.num4xx) +	str += fmt.Sprintf("%d ", s.num5xx) +	str += fmt.Sprintf("%d ", s.status200.min) +	str += fmt.Sprintf("%d ", s.status200.avg()) +	str += fmt.Sprintf("%d ", s.status200.max) +	str += fmt.Sprintf("%d ", s.status202.min) +	str += fmt.Sprintf("%d ", s.status202.avg()) +	str += fmt.Sprintf("%d\n", s.status202.max) +	s.reset() + +	_, err := f.WriteString(str) +	return err +} + +func (s *status) format() string { +	return "Time RelTime Num200 Num202 Num3xx Num4xx Num5xx Min200 Avg200 Max200 Min202 Avg202 Max202\n" + +		"      (s)                                       ----> observed response delays (ms) <----" +} diff --git a/cmd/sigsum-debug/spam/leaf/worker.go b/cmd/sigsum-debug/spam/leaf/worker.go new file mode 100644 index 0000000..9b0e0b7 --- /dev/null +++ b/cmd/sigsum-debug/spam/leaf/worker.go @@ -0,0 +1,177 @@ +package leaf + +import ( +	"bytes" +	"context" +	"crypto/rand" +	"encoding/binary" +	"fmt" +	"net/http" +	"time" + +	"git.sigsum.org/sigsum-go/pkg/merkle" +	"git.sigsum.org/sigsum-go/pkg/requests" +	"git.sigsum.org/sigsum-go/pkg/types" +) + +type worker struct { +	Config +	inCh  chan *event // events that received 202 +	outCh chan *event // events that received status 2XX +} + +type event struct { +	start time.Time // time that request started +	end   time.Time // time that request ended with 2XX + +	req    *http.Request // prepared add-leaf request +	got200 bool          // true if 200 response +	got202 bool          // true if at least one 202 responses + +	num3xx uint64 // number of encountered 3xx before 2XX +	num4xx uint64 // number of encountered 4xx before 2XX +	num5xx uint64 // number of encountered 5xx before 2XX +} + +func (w *worker) submit(ctx context.Context) error { +	data := make([]byte, 40) +	if _, err := rand.Read(data); err != nil { +		return fmt.Errorf("generate data: %v", err) +	} + +	ctr := uint64(0) +	for { +		ctr += 1 +		binary.BigEndian.PutUint64(data[:8], ctr) +		req, err := w.newRequest(merkle.HashFn(data)) +		if err != nil { +			return err +		} + +		ev := event{req: req, start: time.Now()} +		for { +			select { +			case <-ctx.Done(): +				return nil +			case <-time.After(w.Wait): +			} + +			if err := w.doRequest(&ev); err == nil { +				break // success +			} +		} + +		select { +		case w.outCh <- &ev: +		default: +			return fmt.Errorf("out channel is full") +		} +	} +} + +func (w *worker) check(ctx context.Context) error { +	// setup a backoff mechanism that doesn't block +	next := make(chan struct{}, 1) +	backoff := func() { +		time.Sleep(w.backoff) +		next <- struct{}{} +	} +	next <- struct{}{} +	defer func() { +		<-next // empty channel before closing +		defer close(next) +	}() + +	evs := make([]*event, 0, w.maxEvents) +	for { +		if len(evs) == w.maxEvents { +			return fmt.Errorf("checker has too many queued events: %d", w.maxEvents) +		} + +		select { +		case <-ctx.Done(): +			return nil +		case ev := <-w.inCh: +			evs = append(evs, ev) +			continue +		case <-next: +			if len(evs) == 0 { +				go backoff() +				continue +			} +		} + +		if err := w.doRequest(evs[0]); err != nil { +			go backoff() +			continue +		} +		next <- struct{}{} // no backoff after success + +		select { +		case w.outCh <- evs[0]: +		default: +			return fmt.Errorf("out channel is full") +		} +		evs = evs[1:] +	} +} + +func (w *worker) newRequest(msg *merkle.Hash) (*http.Request, error) { +	stm := types.Statement{ +		ShardHint: uint64(time.Now().Unix()), +		Checksum:  *merkle.HashFn(msg[:]), +	} +	sig, err := stm.Sign(w.signer) +	if err != nil { +		return nil, err +	} +	leaf := requests.Leaf{ +		ShardHint:  stm.ShardHint, +		Message:    *msg, +		Signature:  *sig, +		PublicKey:  w.pub, +		DomainHint: w.DomainHint, +	} +	buf := bytes.NewBuffer(nil) +	if err := leaf.ToASCII(buf); err != nil { +		return nil, fmt.Errorf("serialize leaf request: %v", err) +	} +	req, err := http.NewRequest(http.MethodPost, w.url, buf) +	if err != nil { +		return nil, fmt.Errorf("create http request: %v", err) +	} +	return req, nil +} + +func (w *worker) doRequest(ev *event) error { +	rsp, err := w.cli.Do(ev.req) +	if err != nil { +		return err +	} +	defer rsp.Body.Close() + +	if rsp.StatusCode == http.StatusOK { +		ev.got200 = true +		ev.end = time.Now() +		return nil +	} else if rsp.StatusCode == http.StatusAccepted { +		if !ev.got202 { +			ev.got202 = true +			ev.end = time.Now() +			return nil // first 202 response +		} +	} else if rsp.StatusCode >= 300 && rsp.StatusCode < 400 { +		ev.num3xx += 1 +	} else if rsp.StatusCode >= 400 && rsp.StatusCode < 500 { +		ev.num4xx += 1 +	} else if rsp.StatusCode >= 500 && rsp.StatusCode < 600 { +		ev.num5xx += 1 +	} +	return fmt.Errorf("status %d", err) +} + +func (ev *event) resetCounters() { +	ev.num3xx = 0 +	ev.num4xx = 0 +	ev.num5xx = 0 +} | 
