aboutsummaryrefslogtreecommitdiff
path: root/cmd/sigsum-debug/spam/leaf/main.go
diff options
context:
space:
mode:
authorRasmus Dahlberg <rasmus@mullvad.net>2022-07-22 18:25:56 +0200
committerRasmus Dahlberg <rasmus@mullvad.net>2022-07-22 18:25:56 +0200
commitf04e26523189ac3fc54381376966f98d2f52440b (patch)
tree52f664a52759322947420f7313573e92002fa6f9 /cmd/sigsum-debug/spam/leaf/main.go
parent722e48feb2aa85d8cb75069a06024e506367e34f (diff)
add spam command to sigsum-debugsigsum-spam
Diffstat (limited to 'cmd/sigsum-debug/spam/leaf/main.go')
-rw-r--r--cmd/sigsum-debug/spam/leaf/main.go88
1 files changed, 88 insertions, 0 deletions
diff --git a/cmd/sigsum-debug/spam/leaf/main.go b/cmd/sigsum-debug/spam/leaf/main.go
new file mode 100644
index 0000000..b5895e2
--- /dev/null
+++ b/cmd/sigsum-debug/spam/leaf/main.go
@@ -0,0 +1,88 @@
+package leaf
+
+import (
+ "context"
+ "os"
+ "os/signal"
+ "sync"
+ "syscall"
+ "time"
+
+ "git.sigsum.org/sigsum-go/pkg/log"
+)
+
+func Main(args []string, cfg Config) error {
+ if err := cfg.parse(args); err != nil {
+ return err
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(), cfg.Duration)
+ s := newStatus()
+
+ var wg sync.WaitGroup
+ events := make(chan *event, 1024)
+ defer close(events)
+ for i := uint64(0); i < cfg.NumSubmitters; i++ {
+ wg.Add(1)
+ w := worker{Config: cfg, outCh: events}
+ go func() {
+ defer wg.Done()
+ defer cancel()
+ if err := w.submit(ctx); err != nil {
+ log.Fatal("submitter died: %v", err)
+ }
+ }()
+ }
+
+ var checkChs []chan *event
+ for i := uint64(0); i < cfg.NumCheckers; i++ {
+ checkCh := make(chan *event, 1024)
+ checkChs = append(checkChs, checkCh)
+ defer close(checkCh)
+
+ wg.Add(1)
+ w := worker{Config: cfg, inCh: checkCh, outCh: events}
+ go func() {
+ defer wg.Done()
+ defer cancel()
+ if err := w.check(ctx); err != nil {
+ log.Fatal("checker died: %v", err)
+ }
+ }()
+ }
+
+ sigs := make(chan os.Signal, 1)
+ signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
+ defer close(sigs)
+
+ ticker := time.NewTicker(cfg.Interval)
+ defer wg.Wait()
+ defer cancel()
+ defer ticker.Stop()
+
+ log.Info("Output format is: \n\n%s\n\n", s.format())
+ nextChecker := 0
+ for {
+ select {
+ case <-ctx.Done():
+ log.Info("received done signal, closing...")
+ return nil
+ case ev := <-events:
+ s.register(ev)
+ if cfg.NumCheckers > 0 && !ev.got200 {
+ select {
+ case checkChs[nextChecker] <- ev:
+ default:
+ log.Fatal("check channel %d is full", nextChecker)
+ }
+ nextChecker = (int(nextChecker) + 1) % int(cfg.NumCheckers)
+ }
+ case <-ticker.C:
+ s.rotate(os.Stdout)
+ case <-sigs:
+ log.Info("received shutdown signal, closing...")
+ return nil
+ }
+ }
+ return nil
+}