Skip to content

Commit

Permalink
Use event time also in map, to process sequential events with the sam…
Browse files Browse the repository at this point in the history
…e correlation ID
  • Loading branch information
Juan Font committed Dec 12, 2024
1 parent 481e553 commit f89b337
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 11 deletions.
5 changes: 3 additions & 2 deletions app.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package streamer

import (
"fmt"
"os"
"time"

Expand Down Expand Up @@ -100,7 +101,7 @@ func (s *GitLabLogStreamer) preloadDBRecentData() error {
}

for _, event := range auditEvents {
s.latestAuditLogEvents.Store(event.CorrelationID, *event)
s.latestAuditLogEvents.Store(fmt.Sprintf("%s,%s", event.CorrelationID, event.Time.Format(time.RFC3339)), *event)
}

// load the auth log events from the last preloadEventsPeriodDays from s.db
Expand All @@ -112,7 +113,7 @@ func (s *GitLabLogStreamer) preloadDBRecentData() error {
}

for _, event := range authEvent {
s.latestAuthEvents.Store(event.CorrelationID, *event)
s.latestAuthEvents.Store(fmt.Sprintf("%s,%s", event.CorrelationID, event.Time.Format(time.RFC3339)), *event)
}

return nil
Expand Down
9 changes: 5 additions & 4 deletions audit_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"os"
"strings"
"time"

"github.com/rs/zerolog/log"
)
Expand Down Expand Up @@ -83,9 +84,9 @@ func (s *GitLabLogStreamer) processNewAuditLogEvents(auditEvents []*AuditEvent)
// if it does, we skip it
// if it doesn't, we insert it

_, ok := s.latestAuditLogEvents.Load(auditEvent.CorrelationID)
_, ok := s.latestAuditLogEvents.Load(fmt.Sprintf("%s,%s", auditEvent.CorrelationID, auditEvent.Time.Format(time.RFC3339)))
if ok {
log.Debug().Msgf("Audit event with correlation ID %s already exists. Skipping", auditEvent.CorrelationID)
log.Debug().Msgf("Audit event with correlation ID %s at %s already exists. Skipping", auditEvent.CorrelationID, auditEvent.Time.Format(time.RFC3339))
continue
}

Expand All @@ -95,7 +96,7 @@ func (s *GitLabLogStreamer) processNewAuditLogEvents(auditEvents []*AuditEvent)
return newEvents, err
}

s.latestAuditLogEvents.Store(auditEvent.CorrelationID, *auditEvent)
s.latestAuditLogEvents.Store(fmt.Sprintf("%s,%s", auditEvent.CorrelationID, auditEvent.Time.Format(time.RFC3339)), *auditEvent)
newEvents = append(newEvents, auditEvent)
log.Info().Msgf("Inserted audit event with correlation ID %s", auditEvent.CorrelationID)
}
Expand Down Expand Up @@ -167,7 +168,7 @@ func auditEventToUserMessageType(auditEvent *AuditEvent) (string, string) {
return "User event - remove", fmt.Sprintf("User %s removed %s for %s", auditEvent.AuthorName, *auditEvent.Remove, auditEvent.EntityPath)
}

return "User event - unknown", fmt.Sprintf("User %s %s (target %s)", auditEvent.AuthorName, auditEvent.Action, auditEvent.EntityPath)
return "User event - unknown", fmt.Sprintf("User %s %s (target %s)", auditEvent.AuthorName, *auditEvent.Action, auditEvent.EntityPath)
}

func auditEventToProjectMessageType(auditEvent *AuditEvent) (string, string) {
Expand Down
8 changes: 5 additions & 3 deletions auth_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package streamer
import (
"bufio"
"encoding/json"
"fmt"
"os"
"strings"
"time"

"github.com/rs/zerolog/log"
)
Expand Down Expand Up @@ -87,9 +89,9 @@ func (s *GitLabLogStreamer) processNewAuthEvents(authEvents []*AuthEvent) ([]*Au
// if it does, we skip it
// if it doesn't, we insert it

_, ok := s.latestAuthEvents.Load(authEvent.CorrelationID)
_, ok := s.latestAuthEvents.Load(fmt.Sprintf("%s,%s", authEvent.CorrelationID, authEvent.Time.Format(time.RFC3339)))
if ok {
log.Debug().Msgf("Auth event with correlation ID %s already exists. Skipping", authEvent.CorrelationID)
log.Debug().Msgf("Auth event with correlation ID %s at %s already exists. Skipping", authEvent.CorrelationID, authEvent.Time.Format(time.RFC3339))
continue
}

Expand All @@ -99,7 +101,7 @@ func (s *GitLabLogStreamer) processNewAuthEvents(authEvents []*AuthEvent) ([]*Au
return newEvents, err
}

s.latestAuthEvents.Store(authEvent.CorrelationID, *authEvent)
s.latestAuthEvents.Store(fmt.Sprintf("%s,%s", authEvent.CorrelationID, authEvent.Time.Format(time.RFC3339)), *authEvent)
newEvents = append(newEvents, authEvent)
log.Info().Msgf("Inserted auth event with correlation ID %s", authEvent.CorrelationID)
}
Expand Down
2 changes: 1 addition & 1 deletion types_audit_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const (

type AuditEvent struct {
ID uint64 `gorm:"primary_key" json:"-"`
CorrelationID string `gorm:"type:varchar(64);unique_index" json:"correlation_id"`
CorrelationID string `gorm:"type:varchar(64)" json:"correlation_id"`

Severity string `json:"severity"`
Time time.Time `json:"time"`
Expand Down
2 changes: 1 addition & 1 deletion types_auth_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

type AuthEvent struct {
ID uint64 `gorm:"primary_key" json:"-"`
CorrelationID string `gorm:"type:varchar(64);unique_index" json:"correlation_id"`
CorrelationID string `gorm:"type:varchar(64)" json:"correlation_id"`
Severity string `json:"severity"`
Time time.Time `json:"time"`

Expand Down

0 comments on commit f89b337

Please sign in to comment.