Skip to content

Commit

Permalink
chore: add rotating log for audit data
Browse files Browse the repository at this point in the history
Adds rotating audit log writer. Also minor improvements.

For #37

Signed-off-by: Dmitriy Matrenichev <[email protected]>
  • Loading branch information
DmitriyMV committed Jul 22, 2024
1 parent 67bcc75 commit b7ee728
Show file tree
Hide file tree
Showing 11 changed files with 264 additions and 180 deletions.
4 changes: 0 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ require (
)

require (
github.com/Masterminds/semver/v3 v3.2.1 // indirect
github.com/ProtonMail/go-crypto v1.0.0 // indirect
github.com/ProtonMail/go-mime v0.0.0-20230322103455-7d82a3887f2f // indirect
github.com/aws/aws-sdk-go v1.44.256 // indirect
Expand Down Expand Up @@ -140,7 +139,6 @@ require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/docker/cli v27.0.3+incompatible // indirect
github.com/docker/distribution v2.8.3+incompatible // indirect
github.com/docker/docker v27.0.3+incompatible // indirect
github.com/docker/docker-credential-helpers v0.8.2 // indirect
github.com/emicklei/go-restful/v3 v3.12.1 // indirect
github.com/evanphx/json-patch v5.9.0+incompatible // indirect
Expand Down Expand Up @@ -173,10 +171,8 @@ require (
github.com/hashicorp/hcl v1.0.1-vault-5 // indirect
github.com/imdario/mergo v0.3.16 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/josharian/native v1.1.0 // indirect
github.com/jsimonetti/rtnetlink v1.4.2 // indirect
github.com/jsimonetti/rtnetlink/v2 v2.0.2 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.9 // indirect
Expand Down
151 changes: 8 additions & 143 deletions go.sum

Large diffs are not rendered by default.

21 changes: 21 additions & 0 deletions internal/backend/runtime/omni/audit/audit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (c) 2024 Sidero Labs, Inc.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.

// Package audit provides a state wrapper that logs audit events.
package audit

import (
"github.com/siderolabs/omni/internal/pkg/auth/role"
)

// Data contains the audit data.
type Data struct {
UserAgent string `json:"user_agent,omitempty"`
IPAddress string `json:"ip_address,omitempty"`
UserID string `json:"user_id,omitempty"`
Identity string `json:"identity,omitempty"`
Role role.Role `json:"role,omitempty"`
Email string `json:"email,omitempty"`
}
12 changes: 12 additions & 0 deletions internal/backend/runtime/omni/audit/export_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright (c) 2024 Sidero Labs, Inc.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.

package audit

import "time"

func (l *LogFile) WriteAt(data any, at time.Time) error {
return l.writeAt(data, at)
}
87 changes: 87 additions & 0 deletions internal/backend/runtime/omni/audit/log_file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright (c) 2024 Sidero Labs, Inc.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.

package audit

import (
"encoding/json"
"os"
"path/filepath"
"runtime"
"sync"
"time"

"github.com/siderolabs/gen/pair/ordered"
)

// LogFile is a rotating log file.
//
//nolint:govet
type LogFile struct {
dir string

mu sync.Mutex
f *os.File
lastWrite time.Time
}

// NewLogFile creates a new rotating log file.
func NewLogFile(dir string) *LogFile {
return &LogFile{dir: dir}
}

// Write writes data to the log file, creating new one on demand.
func (l *LogFile) Write(data any) error {
return l.writeAt(data, time.Now())
}

func (l *LogFile) writeAt(data any, at time.Time) error {
f, err := l.openFile(at)
if err != nil {
return err
}

if err := json.NewEncoder(f).Encode(data); err != nil {
return err
}

l.mu.Lock()
l.lastWrite = at
l.mu.Unlock()

return nil
}

// openFile opens a file for the given date. It returns the file is date for at matches
// the last write date. Otherwise, it opens a new file.
func (l *LogFile) openFile(at time.Time) (*os.File, error) {
l.mu.Lock()
defer l.mu.Unlock()

// TODO(Dmitriy): decide what to do with when we write entry for the previous day
// into the current day (happens on day's border). One solution is to have two decriptors opened:
// one for the current day and one for the previous day. We can close the previous day's descriptor
// when we open the 3rd day's descriptor. This way we will have at most two descriptors opened.
if l.f != nil && ordered.MakeTriple(at.Date()).Compare(ordered.MakeTriple(l.lastWrite.Date())) <= 0 {
return l.f, nil
}

logPath := filepath.Join(l.dir, at.Format("2006-01-02")) + ".jsonlog"

f, err := os.OpenFile(logPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
if err != nil {
return nil, err
}

l.f = f

// We are not closing files ourselves and this is intentional. The reason is: many writers can come to write
// audit data at the day's border so, it's possible that instead of 1(old) -> 2(old) -> 3(old) -> new_file -> 4 -> 5 -> 6
// the order of operations will be 1(old) -> 2(old) -> new_file -> 3(old) -> 4 -> 5 -> 6. In this case, we don't
// want goroutine to observe closed file descriptor.
runtime.SetFinalizer(f, (*os.File).Close)

return f, nil
}
106 changes: 106 additions & 0 deletions internal/backend/runtime/omni/audit/log_file_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright (c) 2024 Sidero Labs, Inc.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.

package audit_test

import (
"embed"
"io/fs"
"os"
"path/filepath"
"runtime"
"testing"
"time"

"github.com/siderolabs/gen/xtesting/must"
"github.com/stretchr/testify/require"

"github.com/siderolabs/omni/internal/backend/runtime/omni/audit"
)

//go:embed testdata/currentday
var currentDay embed.FS

func TestLogFile_CurrentDay(t *testing.T) {
dir := must.Value(os.MkdirTemp("", "log_file_test"))(t)

t.Cleanup(func() { os.RemoveAll(dir) }) //nolint:errcheck

entries := []entry{
{shift: time.Second, data: audit.Data{UserAgent: "Mozilla/5.0", IPAddress: "10.10.0.1", Email: "[email protected]"}},
{shift: time.Minute, data: audit.Data{UserAgent: "Mozilla/5.0", IPAddress: "10.10.0.2", Email: "[email protected]"}},
{shift: 30 * time.Minute, data: audit.Data{UserAgent: "Mozilla/5.0", IPAddress: "10.10.0.3", Email: "[email protected]"}},
}

start := time.Date(2012, 1, 1, 23, 0, 0, 0, time.Local)
now := start
file := audit.NewLogFile(dir)

for _, e := range entries {
now = now.Add(e.shift)

require.NoError(t, file.WriteAt(e.data, now))
}

checkFiles(t, dir, currentDay, "currentday")

for range 100 {
runtime.GC() // ensure file is collected
}
}

//go:embed testdata/nextday
var nextDay embed.FS

func TestLogFile_CurrentAndNewDay(t *testing.T) {
dir := must.Value(os.MkdirTemp("", "log_file_test"))(t)

t.Cleanup(func() { os.RemoveAll(dir) }) //nolint:errcheck

entries := []entry{
{shift: 0, data: audit.Data{UserAgent: "Mozilla/5.0", IPAddress: "10.10.0.1", Email: "[email protected]"}},
{shift: 55 * time.Minute, data: audit.Data{UserAgent: "Mozilla/5.0", IPAddress: "10.10.0.2", Email: "[email protected]"}},
{shift: 5 * time.Minute, data: audit.Data{UserAgent: "Mozilla/5.0", IPAddress: "10.10.0.3", Email: "[email protected]"}},
}

start := time.Date(2012, 1, 1, 23, 0, 0, 0, time.Local)
now := start
file := audit.NewLogFile(dir)

for _, e := range entries {
now = now.Add(e.shift)

require.NoError(t, file.WriteAt(e.data, now))
}

checkFiles(t, dir, nextDay, "nextday")
}

//nolint:govet
type entry struct {
shift time.Duration
data audit.Data
}

type subFS interface {
fs.ReadFileFS
fs.ReadDirFS
}

func checkFiles(t *testing.T, dir string, subFs subFS, folder string) {
subFs = must.Value(fs.Sub(subFs, filepath.Join("testdata", folder)))(t).(subFS) //nolint:errcheck,forcetypeassert
files := must.Value(subFs.ReadDir("."))(t)

for _, f := range files {
if f.IsDir() {
t.Fatal("unexpected directory", f.Name())
}

expectedData := string(must.Value(subFs.ReadFile(f.Name()))(t))
actualData := string(must.Value(os.ReadFile(filepath.Join(dir, f.Name())))(t))

require.Equal(t, expectedData, actualData, "file %s", f.Name())
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"user_agent":"Mozilla/5.0","ip_address":"10.10.0.1","email":"[email protected]"}
{"user_agent":"Mozilla/5.0","ip_address":"10.10.0.2","email":"[email protected]"}
{"user_agent":"Mozilla/5.0","ip_address":"10.10.0.3","email":"[email protected]"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"user_agent":"Mozilla/5.0","ip_address":"10.10.0.1","email":"[email protected]"}
{"user_agent":"Mozilla/5.0","ip_address":"10.10.0.2","email":"[email protected]"}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"user_agent":"Mozilla/5.0","ip_address":"10.10.0.3","email":"[email protected]"}
45 changes: 18 additions & 27 deletions internal/backend/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,13 +352,15 @@ func (s *Server) buildServerOptions() ([]grpc.ServerOption, error) {
grpc_recovery.StreamServerInterceptor(recoveryOpt),
}

unaryAuthInterceptors, streamAuthInterceptors, err := s.getAuthInterceptors()
authIntereceptors, err := s.getAuthInterceptors()
if err != nil {
return nil, err
}

unaryInterceptors = append(unaryInterceptors, unaryAuthInterceptors...)
streamInterceptors = append(streamInterceptors, streamAuthInterceptors...)
for _, authIntereceptor := range authIntereceptors {
unaryInterceptors = append(unaryInterceptors, authIntereceptor.Unary())
streamInterceptors = append(streamInterceptors, authIntereceptor.Stream())
}

return []grpc.ServerOption{
grpc.MaxRecvMsgSize(constants.GRPCMaxMessageSize),
Expand All @@ -368,48 +370,37 @@ func (s *Server) buildServerOptions() ([]grpc.ServerOption, error) {
}, nil
}

func (s *Server) getAuthInterceptors() ([]grpc.UnaryServerInterceptor, []grpc.StreamServerInterceptor, error) {
authEnabled := authres.Enabled(s.authConfig)

authConfigInterceptor := interceptor.NewAuthConfig(authEnabled, s.logger)
type interceptorCreator interface {
Unary() grpc.UnaryServerInterceptor
Stream() grpc.StreamServerInterceptor
}

unaryInterceptors := []grpc.UnaryServerInterceptor{
authConfigInterceptor.Unary(),
}
func (s *Server) getAuthInterceptors() ([]interceptorCreator, error) {
authEnabled := authres.Enabled(s.authConfig)

streamInterceptors := []grpc.StreamServerInterceptor{
authConfigInterceptor.Stream(),
}
result := []interceptorCreator{interceptor.NewAuthConfig(authEnabled, s.logger)}

if !authEnabled {
return unaryInterceptors, streamInterceptors, nil
return result, nil
}

// auth is enabled, add signature and jwt interceptors
signatureInterceptor := interceptor.NewSignature(s.authenticatorFunc(), s.logger)

unaryInterceptors = append(unaryInterceptors, signatureInterceptor.Unary())
streamInterceptors = append(streamInterceptors, signatureInterceptor.Stream())
result = append(result, interceptor.NewSignature(s.authenticatorFunc(), s.logger))

switch {
case s.authConfig.TypedSpec().Value.Auth0.Enabled:
verifier, err := auth0.NewIDTokenVerifier(s.authConfig.TypedSpec().Value.GetAuth0().Domain)
if err != nil {
return nil, nil, err
return nil, err
}

jwtInterceptor := interceptor.NewJWT(verifier, s.logger)
result = append(result, interceptor.NewJWT(verifier, s.logger))

unaryInterceptors = append(unaryInterceptors, jwtInterceptor.Unary())
streamInterceptors = append(streamInterceptors, jwtInterceptor.Stream())
case s.authConfig.TypedSpec().Value.Saml.Enabled:
samlInterceptor := interceptor.NewSAML(s.omniRuntime.State(), s.logger)

unaryInterceptors = append(unaryInterceptors, samlInterceptor.Unary())
streamInterceptors = append(streamInterceptors, samlInterceptor.Stream())
result = append(result, interceptor.NewSAML(s.omniRuntime.State(), s.logger))
}

return unaryInterceptors, streamInterceptors, nil
return result, nil
}

func (s *Server) authenticatorFunc() auth.AuthenticatorFunc {
Expand Down
12 changes: 6 additions & 6 deletions internal/pkg/auth/role/role.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ const (

var roles = []Role{None, Reader, Operator, Admin}

var indexes map[Role]int

func init() {
indexes = make(map[Role]int, len(roles))
var indexes = func() map[Role]int {
result := make(map[Role]int, len(roles))

for i, role := range roles {
indexes[role] = i
result[role] = i
}
}

return result
}()

// Parse parses the role string.
func Parse(role string) (Role, error) {
Expand Down

0 comments on commit b7ee728

Please sign in to comment.