diff --git a/internal/app/app.go b/internal/app/app.go index 2894be46..f994c671 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -20,6 +20,7 @@ import ( "github.com/metal-toolbox/audito-maldito/internal/auditd" "github.com/metal-toolbox/audito-maldito/internal/auditd/dirreader" "github.com/metal-toolbox/audito-maldito/internal/common" + "github.com/metal-toolbox/audito-maldito/internal/health" "github.com/metal-toolbox/audito-maldito/internal/journald" "github.com/metal-toolbox/audito-maldito/internal/metrics" "github.com/metal-toolbox/audito-maldito/internal/processors" @@ -40,11 +41,12 @@ OPTIONS var logger *zap.SugaredLogger //nolint -func Run(ctx context.Context, osArgs []string, h *common.Health, optLoggerConfig *zap.Config) error { +func Run(ctx context.Context, osArgs []string, h *health.Health, optLoggerConfig *zap.Config) error { var bootID string var auditlogpath string var auditLogDirPath string var enableMetrics bool + var enableHealthz bool logLevel := zapcore.DebugLevel // TODO: Switch default back to zapcore.ErrorLevel. 
flagSet := flag.NewFlagSet(osArgs[0], flag.ContinueOnError) @@ -55,6 +57,7 @@ func Run(ctx context.Context, osArgs []string, h *common.Health, optLoggerConfig flagSet.StringVar(&auditLogDirPath, "audit-dir-path", "/var/log/audit", "Path to the Linux audit log directory") flagSet.Var(&logLevel, "log-level", "Set the log level according to zapcore.Level") flagSet.BoolVar(&enableMetrics, "metrics", false, "Enable Prometheus HTTP /metrics server") + flagSet.BoolVar(&enableHealthz, "healthz", false, "Enable HTTP health endpoints server") flagSet.Usage = func() { os.Stderr.WriteString(usage) flagSet.PrintDefaults() @@ -114,11 +117,20 @@ func Run(ctx context.Context, osArgs []string, h *common.Health, optLoggerConfig return fmt.Errorf("failed to open audit log file: %w", auditfileerr) } + server := &http.Server{Addr: ":2112"} + if enableMetrics { - server := &http.Server{Addr: ":2112"} + http.Handle("/metrics", promhttp.Handler()) + } + + if enableHealthz { + http.Handle("/readyz", h.ReadyzHandler()) + // TODO: Add livez endpoint + } + + if enableMetrics || enableHealthz { eg.Go(func() error { - http.Handle("/metrics", promhttp.Handler()) - logger.Infoln("Starting HTTP metrics server on :2112") + logger.Infoln("Starting HTTP server on :2112") if err := server.ListenAndServe(); err != nil { logger.Errorf("Failed to start HTTP metrics server: %v", err) return err @@ -139,10 +151,10 @@ func Run(ctx context.Context, osArgs []string, h *common.Health, optLoggerConfig auditLogDirPath, err) } - h.AddReadiness() + h.AddReadiness(dirreader.DirReaderComponentName) go func() { <-logDirReader.InitFilesDone() - h.OnReady() + h.OnReady(dirreader.DirReaderComponentName) }() eg.Go(func() error { @@ -210,7 +222,7 @@ func Run(ctx context.Context, osArgs []string, h *common.Health, optLoggerConfig } }) } else { - h.AddReadiness() + h.AddReadiness(journald.JournaldReaderComponentName) eg.Go(func() error { jp := journald.Processor{ BootID: bootID, @@ -232,7 +244,7 @@ func Run(ctx 
context.Context, osArgs []string, h *common.Health, optLoggerConfig }) } - h.AddReadiness() + h.AddReadiness(auditd.AuditdProcessorComponentName) eg.Go(func() error { ap := auditd.Auditd{ After: time.UnixMicro(int64(lastReadJournalTS)), diff --git a/internal/auditd/auditd.go b/internal/auditd/auditd.go index 095f1bfb..7731db91 100644 --- a/internal/auditd/auditd.go +++ b/internal/auditd/auditd.go @@ -12,6 +12,13 @@ import ( "github.com/metal-toolbox/audito-maldito/internal/auditd/sessiontracker" "github.com/metal-toolbox/audito-maldito/internal/common" + "github.com/metal-toolbox/audito-maldito/internal/health" +) + +const ( + // AuditdProcessorComponentName is the name of the component + // that reads from auditd. This is used in the health check. + AuditdProcessorComponentName = "auditd-processor" ) // libaudit variables. @@ -48,7 +55,7 @@ type Auditd struct { // EventW is the auditevent.EventWriter to write events to. EventW *auditevent.EventWriter - Health *common.Health + Health *health.Health } // TODO: Write documentation about creating a splunk query that shows @@ -82,7 +89,7 @@ func (o *Auditd) Read(ctx context.Context) error { staleDataTicker := time.NewTicker(staleDataCleanupInterval) defer staleDataTicker.Stop() - o.Health.OnReady() + o.Health.OnReady(AuditdProcessorComponentName) for { select { diff --git a/internal/auditd/auditd_good_test.go b/internal/auditd/auditd_good_test.go index 9b6f7332..11f39c94 100644 --- a/internal/auditd/auditd_good_test.go +++ b/internal/auditd/auditd_good_test.go @@ -19,6 +19,7 @@ import ( "go.uber.org/zap" "github.com/metal-toolbox/audito-maldito/internal/common" + "github.com/metal-toolbox/audito-maldito/internal/health" "github.com/metal-toolbox/audito-maldito/internal/testtools" ) @@ -82,7 +83,7 @@ func TestAuditd_Read_GoodRemoteUserLoginFirst(t *testing.T) { Events: events, T: t, }), - Health: common.NewSingleReadinessHealth(), + Health: health.NewSingleReadinessHealth(AuditdProcessorComponentName), } exited := 
make(chan error, 1) @@ -145,7 +146,7 @@ func TestAuditd_Read_GoodAuditdEventsFirst(t *testing.T) { Events: events, T: t, }), - Health: common.NewSingleReadinessHealth(), + Health: health.NewSingleReadinessHealth(AuditdProcessorComponentName), } exited := make(chan error, 1) diff --git a/internal/auditd/auditd_test.go b/internal/auditd/auditd_test.go index b28752aa..8b024114 100644 --- a/internal/auditd/auditd_test.go +++ b/internal/auditd/auditd_test.go @@ -18,6 +18,7 @@ import ( "github.com/metal-toolbox/audito-maldito/internal/auditd/sessiontracker" fakest "github.com/metal-toolbox/audito-maldito/internal/auditd/sessiontracker/fakes" "github.com/metal-toolbox/audito-maldito/internal/common" + "github.com/metal-toolbox/audito-maldito/internal/health" "github.com/metal-toolbox/audito-maldito/internal/testtools" ) @@ -38,7 +39,7 @@ func TestAuditd_Read_RemoteLoginError(t *testing.T) { Events: events, T: t, }), - Health: common.NewSingleReadinessHealth(), + Health: health.NewSingleReadinessHealth(AuditdProcessorComponentName), } errs := make(chan error, 1) @@ -82,7 +83,7 @@ func TestAuditd_Read_ParseAuditLogError(t *testing.T) { Events: events, T: t, }), - Health: common.NewSingleReadinessHealth(), + Health: health.NewSingleReadinessHealth(AuditdProcessorComponentName), } errs := make(chan error, 1) @@ -127,7 +128,7 @@ func TestAuditd_Read_AuditEventError(t *testing.T) { Events: events, T: t, }), - Health: common.NewSingleReadinessHealth(), + Health: health.NewSingleReadinessHealth(AuditdProcessorComponentName), } cancelEventWFn() diff --git a/internal/auditd/dirreader/dirreader.go b/internal/auditd/dirreader/dirreader.go index 4f7b5aca..b97dcd97 100644 --- a/internal/auditd/dirreader/dirreader.go +++ b/internal/auditd/dirreader/dirreader.go @@ -17,6 +17,12 @@ import ( "github.com/fsnotify/fsnotify" ) +const ( + // DirReaderComponentName is the component name for the dir reader. + // This is used for health checks. 
+ DirReaderComponentName = "auditlog-dirreader" +) + // StartLogDirReader creates and starts a LogDirReader for // the specified directory path (e.g., "/var/log/audit"). // diff --git a/internal/common/health.go b/internal/common/health.go deleted file mode 100644 index a15fe5e4..00000000 --- a/internal/common/health.go +++ /dev/null @@ -1,77 +0,0 @@ -package common - -import ( - "context" - "errors" - "fmt" - "sync" - "time" -) - -var errTimedOut = errors.New("timed-out") - -// NewSingleReadinessHealth returns a *Health with its readiness counter -// set to one. -func NewSingleReadinessHealth() *Health { - h := NewHealth() - h.AddReadiness() - - return h -} - -// NewHealth returns a *Health. -func NewHealth() *Health { - return &Health{ - readyWG: &sync.WaitGroup{}, - } -} - -// Health represents the application's health. -type Health struct { - readyWG *sync.WaitGroup -} - -// AddReadiness increments the readiness counter by one. -// -// Refer to WaitForReady for details on readiness functionality. -func (o *Health) AddReadiness() { - o.readyWG.Add(1) -} - -// OnReady decrements the readiness counter by one. -// -// Refer to WaitForReady for details on readiness functionality. -func (o *Health) OnReady() { - o.readyWG.Done() -} - -// WaitForReadyCtxOrTimeout is a wrapper for WaitForReady with the addition -// of monitoring a context.Context for cancellation and a timeout. nil is -// returned if the readiness counter hits zero before ctx is marked as done -// and before the timeout occurs. -// -// A non-nil error is returned if ctx is marked as done or the timeout -// occurs prior to the readiness counter hitting zero. 
-func (o *Health) WaitForReadyCtxOrTimeout(ctx context.Context, timeout time.Duration) error { - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(timeout): - return fmt.Errorf("timeout of %s exceeded - %w", timeout.String(), errTimedOut) - case <-o.WaitForReady(): - return nil - } -} - -// WaitForReady returns a channel that is closed when the readiness counter -// hits zero, signalling that all internal application services are ready. -func (o *Health) WaitForReady() <-chan struct{} { - ready := make(chan struct{}) - - go func() { - o.readyWG.Wait() - close(ready) - }() - - return ready -} diff --git a/internal/health/health.go b/internal/health/health.go new file mode 100644 index 00000000..0c3f66d4 --- /dev/null +++ b/internal/health/health.go @@ -0,0 +1,144 @@ +package health + +import ( + "context" + "encoding/json" + "net/http" + "time" + + "github.com/metal-toolbox/audito-maldito/internal/common" +) + +const ( + // OverallReady is the key for the overall readiness status. + OverallReady = "overall" + // ComponentReady is the value indicating that a component is ready. + ComponentReady = "ok" + // ComponentNotReady is the value indicating that a component is not ready. + ComponentNotReady = "not-ready" +) + +// DefaultReadyCheckInterval is the interval at which `WaitForReady` polls for readiness. +var DefaultReadyCheckInterval = 500 * time.Millisecond + +// NewHealth returns a *Health. +func NewHealth() *Health { + return &Health{ + readyMap: common.NewGenericSyncMap[string, bool](), + } +} + +// Health represents the application's health. +type Health struct { + readyMap *common.GenericSyncMap[string, bool] +} + +// NewSingleReadinessHealth returns a *Health with the given component +// registered as not yet ready. +func NewSingleReadinessHealth(component string) *Health { + h := NewHealth() + h.AddReadiness(component) + + return h +} + +// AddReadiness adds another item for the readiness system to wait for. 
+// This should be called once per internal application service. +// Ensure that OnReady is called for each call to AddReadiness. +func (o *Health) AddReadiness(component string) { + o.readyMap.Store(component, false) +} + +// OnReady marks an item as ready. +// This should be called once per internal application service. +// Ensure that AddReadiness is called for each call to OnReady, +// else, the readiness check will not take that component into account. +func (o *Health) OnReady(component string) { + o.readyMap.Store(component, true) +} + +// WaitForReady returns a channel that is closed once every registered component +// is ready; if ctx is done first, ctx.Err() is sent on the channel instead. +func (o *Health) WaitForReady(ctx context.Context) <-chan error { + out := make(chan error) + + go func() { + ticker := time.NewTicker(DefaultReadyCheckInterval) + for { + select { + case <-ctx.Done(): + out <- ctx.Err() + return + case <-ticker.C: + if o.IsReady() { + close(out) + return + } + } + } + }() + + return out +} + +// IsReady returns true if no registered component is still marked as not ready. 
+func (o *Health) IsReady() bool { + isReady := true + o.readyMap.Iterate(func(key string, value bool) bool { + if !value { + isReady = false + return false + } + + return true + }) + + return isReady +} + +func (o *Health) GetReadyzStatusMap() map[string]string { + smap := make(map[string]string, o.readyMap.Len()+1) + overalReady := true + + o.readyMap.Iterate(func(key string, value bool) bool { + var status string + if !value { + overalReady = false + status = ComponentNotReady + } else { + status = ComponentReady + } + + smap[key] = status + return true + }) + + if overalReady { + smap[OverallReady] = ComponentReady + } else { + smap[OverallReady] = ComponentNotReady + } + + return smap +} + +func (o *Health) readyzHandler(w http.ResponseWriter, r *http.Request) { + smap := o.GetReadyzStatusMap() + if err := json.NewEncoder(w).Encode(smap); err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + + // We don't use `o.IsReady()` here because we don't want to iterate + // over the map again. + if smap[OverallReady] == ComponentReady { + w.WriteHeader(http.StatusOK) + return + } + + w.WriteHeader(http.StatusServiceUnavailable) +} + +func (o *Health) ReadyzHandler() http.Handler { + return http.HandlerFunc(o.readyzHandler) +} diff --git a/internal/common/health_test.go b/internal/health/health_test.go similarity index 60% rename from internal/common/health_test.go rename to internal/health/health_test.go index 27a516b3..31bfd53a 100644 --- a/internal/common/health_test.go +++ b/internal/health/health_test.go @@ -1,4 +1,4 @@ -package common +package health import ( "context" @@ -18,11 +18,14 @@ func TestNewHealth_DefaultReadiness(t *testing.T) { h := NewHealth() + assert.True(t, h.IsReady(), "health should be ready by default") + select { case <-ctx.Done(): t.Fatal(ctx.Err()) - case <-h.WaitForReady(): + case err := <-h.WaitForReady(ctx): // Success. 
+ assert.NoError(t, err, "wait for ready should not return an error") } } @@ -36,35 +39,19 @@ func TestHealth_WaitForReady(t *testing.T) { numServices := int(testtools.Intn(t, 0, 20)) for i := 0; i < numServices; i++ { - h.AddReadiness() - go h.OnReady() + h.AddReadiness("test") + + assert.False(t, h.IsReady(), "health should not be ready yet") + + go h.OnReady("test") } select { case <-ctx.Done(): t.Fatal(ctx.Err()) - case <-h.WaitForReady(): - // Success. - } -} - -func TestHealth_WaitForReadyCtxOrTimeout(t *testing.T) { - t.Parallel() - - ctx, cancelFn := context.WithTimeout(context.Background(), time.Second) - defer cancelFn() - - h := NewHealth() - - numServices := int(testtools.Intn(t, 0, 20)) - for i := 0; i < numServices; i++ { - h.AddReadiness() - go h.OnReady() - } - - err := h.WaitForReadyCtxOrTimeout(ctx, time.Second) - if err != nil { - t.Fatal(err) + case err := <-h.WaitForReady(ctx): + assert.NoError(t, err, "wait for ready should not return an error") + assert.True(t, h.IsReady(), "health should be ready now") } } @@ -80,23 +67,28 @@ func TestHealth_WaitForReadyCtxOrTimeout_Canceled(t *testing.T) { numServices := int(testtools.Intn(t, 0, 20)) for i := 0; i < numServices; i++ { - h.AddReadiness() + h.AddReadiness("test") } - err := h.WaitForReadyCtxOrTimeout(ctx, time.Second) + err := <-h.WaitForReady(ctx) assert.ErrorIs(t, err, context.Canceled) + assert.False(t, h.IsReady(), "health should not ready") } func TestHealth_WaitForReadyCtxOrTimeout_TimedOut(t *testing.T) { t.Parallel() + ctx, cancelFn := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancelFn() + h := NewHealth() numServices := int(testtools.Intn(t, 0, 20)) for i := 0; i < numServices; i++ { - h.AddReadiness() + h.AddReadiness("test") } - err := h.WaitForReadyCtxOrTimeout(context.Background(), time.Nanosecond) - assert.ErrorIs(t, err, errTimedOut) + err := <-h.WaitForReady(ctx) + assert.ErrorIs(t, err, context.DeadlineExceeded) + assert.False(t, h.IsReady(), 
"health should not ready") } diff --git a/internal/integration_tests/ubuntu_test.go b/internal/integration_tests/ubuntu_test.go index 344e53b5..fa453c4c 100644 --- a/internal/integration_tests/ubuntu_test.go +++ b/internal/integration_tests/ubuntu_test.go @@ -15,7 +15,7 @@ import ( "time" "github.com/metal-toolbox/audito-maldito/internal/app" - "github.com/metal-toolbox/audito-maldito/internal/common" + "github.com/metal-toolbox/audito-maldito/internal/health" ) const ( @@ -73,14 +73,17 @@ func TestSSHCertLoginAndExecStuff_Ubuntu(t *testing.T) { readEventsErrs := createPipeAndReadEvents(t, ctx, "/app-audit/audit.log", onEventFn) - appHealth := common.NewHealth() + appHealth := health.NewHealth() + + tmoutctx, tmoutctxFn := context.WithTimeout(ctx, time.Minute) + defer tmoutctxFn() appErrs := make(chan error, 1) go func() { appErrs <- app.Run(ctx, []string{"audito-maldito"}, appHealth, zapLoggerConfig()) }() - err := appHealth.WaitForReadyCtxOrTimeout(ctx, time.Minute) + err := <-appHealth.WaitForReady(tmoutctx) if err != nil { t.Fatalf("failed to wait for app to become ready - %s", err) } diff --git a/internal/journald/reader.go b/internal/journald/reader.go index edceada0..98d7b1ec 100644 --- a/internal/journald/reader.go +++ b/internal/journald/reader.go @@ -13,11 +13,18 @@ import ( "go.uber.org/zap" "github.com/metal-toolbox/audito-maldito/internal/common" + "github.com/metal-toolbox/audito-maldito/internal/health" "github.com/metal-toolbox/audito-maldito/internal/metrics" "github.com/metal-toolbox/audito-maldito/internal/processors" "github.com/metal-toolbox/audito-maldito/internal/util" ) +const ( + // JournaldReaderComponentName is the name of the component + // that reads from journald. This is used in the health check. + JournaldReaderComponentName = "journald-reader" +) + // ErrNonFatal is returned when the error is not fatal // and processing may continue. 
var ( @@ -37,7 +44,7 @@ type Processor struct { EventW *auditevent.EventWriter Logins chan<- common.RemoteUserLogin CurrentTS uint64 // Microseconds since unix epoch. - Health *common.Health + Health *health.Health Metrics *metrics.PrometheusMetricsProvider jr JournalReader } @@ -74,7 +81,7 @@ func (jp *Processor) Read(ctx context.Context) error { flushLastRead(jp.CurrentTS) }() - jp.Health.OnReady() + jp.Health.OnReady(JournaldReaderComponentName) for { select { diff --git a/main.go b/main.go index 8f66325b..23dca715 100644 --- a/main.go +++ b/main.go @@ -11,7 +11,7 @@ import ( "syscall" "github.com/metal-toolbox/audito-maldito/internal/app" - "github.com/metal-toolbox/audito-maldito/internal/common" + "github.com/metal-toolbox/audito-maldito/internal/health" ) func main() { @@ -25,5 +25,5 @@ func mainWithError() error { ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer stop() - return app.Run(ctx, os.Args, common.NewHealth(), nil) + return app.Run(ctx, os.Args, health.NewHealth(), nil) }