From 0d3e71298fece29846c39923afa8057eff7204c2 Mon Sep 17 00:00:00 2001 From: Tiago Silva Date: Mon, 13 May 2024 15:31:34 +0100 Subject: [PATCH] Event-handler: call `SearchUnstructuredEvents` with smaller windows This PR aims to reduce the windows sent from `SearchUnstructuredEvents`. StartTime is never update so it's kept with its initial value which causes problems to the event handler sending windows that include more than 1Gb of data when using the Athena backend which causes failures. Signed-off-by: Tiago Silva --- event-handler/teleport_events_watcher.go | 49 ++++++++++++--- event-handler/teleport_events_watcher_test.go | 62 ++++++++++++++++++- 2 files changed, 100 insertions(+), 11 deletions(-) diff --git a/event-handler/teleport_events_watcher.go b/event-handler/teleport_events_watcher.go index f715f3191..4b7399408 100644 --- a/event-handler/teleport_events_watcher.go +++ b/event-handler/teleport_events_watcher.go @@ -207,16 +207,45 @@ func (t *TeleportEventsWatcher) fetch(ctx context.Context) error { // getEvents calls Teleport client and loads events func (t *TeleportEventsWatcher) getEvents(ctx context.Context) ([]*auditlogpb.EventUnstructured, string, error) { - return t.client.SearchUnstructuredEvents( - ctx, - t.startTime, - time.Now().UTC(), - "default", - t.config.Types, - t.config.BatchSize, - types.EventOrderAscending, - t.cursor, - ) + rangeSplitByDay := splitRangeByDay(t.startTime, time.Now().UTC()) + for i := 1; i < len(rangeSplitByDay); i++ { + startTime := rangeSplitByDay[i-1] + endTime := rangeSplitByDay[i] + log.Debugf("Fetching events from %v to %v", startTime, endTime) + evts, cursor, err := t.client.SearchUnstructuredEvents( + ctx, + startTime, + endTime, + "default", + t.config.Types, + t.config.BatchSize, + types.EventOrderAscending, + t.cursor, + ) + if err != nil { + return nil, "", trace.Wrap(err) + } + + // if no events are found, the cursor is out of the range [startTime, endTime] + // and it's the last complete day, update start time to the next day. + if len(evts) == 0 && i < len(rangeSplitByDay)-1 { + log.Infof("No events found for the range %v to %v", startTime, endTime) + t.startTime = endTime + continue + } + // if any events are found, return them + return evts, cursor, nil + } + return nil, t.cursor, nil +} + +func splitRangeByDay(from, to time.Time) []time.Time { + // splitRangeByDay splits the range into days + var days []time.Time + for d := from; d.Before(to); d = d.AddDate(0, 0, 1) { + days = append(days, d) + } + return append(days, to) // add the last date } // pause sleeps for timeout seconds diff --git a/event-handler/teleport_events_watcher_test.go b/event-handler/teleport_events_watcher_test.go index b3242ac7c..3a98384a9 100644 --- a/event-handler/teleport_events_watcher_test.go +++ b/event-handler/teleport_events_watcher_test.go @@ -17,6 +17,7 @@ limitations under the License. package main import ( + "context" "strconv" "sync" "testing" @@ -28,7 +29,6 @@ import ( "github.com/gravitational/teleport/api/types/events" "github.com/gravitational/trace" "github.com/stretchr/testify/require" - "golang.org/x/net/context" ) // mockTeleportEventWatcher is Teleport client mock @@ -121,6 +121,7 @@ func (c *mockTeleportEventWatcher) Close() error { } func newTeleportEventWatcher(t *testing.T, eventsClient TeleportSearchEventsClient) *TeleportEventsWatcher { + client := &TeleportEventsWatcher{ client: eventsClient, pos: -1, @@ -414,3 +415,62 @@ func TestValidateConfig(t *testing.T) { }) } } + +func Test_splitRangeByDay(t *testing.T) { + type args struct { + from time.Time + to time.Time + } + tests := []struct { + name string + args args + want []time.Time + }{ + { + name: "Same day", + args: args{ + from: time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), + to: time.Date(2021, 1, 1, 23, 59, 59, 0, time.UTC), + }, + want: []time.Time{ + time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), + time.Date(2021, 1, 1, 23, 59, 59, 0, time.UTC), + }, + }, + { + name: "Two days", + args: args{ + from: time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), + to: time.Date(2021, 1, 2, 23, 59, 59, 0, time.UTC), + }, + want: []time.Time{ + time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), + time.Date(2021, 1, 2, 0, 0, 0, 0, time.UTC), + time.Date(2021, 1, 2, 23, 59, 59, 0, time.UTC), + }, + }, + { + name: "week", + args: args{ + from: time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), + to: time.Date(2021, 1, 7, 23, 59, 59, 0, time.UTC), + }, + want: []time.Time{ + time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), + time.Date(2021, 1, 2, 0, 0, 0, 0, time.UTC), + time.Date(2021, 1, 3, 0, 0, 0, 0, time.UTC), + time.Date(2021, 1, 4, 0, 0, 0, 0, time.UTC), + time.Date(2021, 1, 5, 0, 0, 0, 0, time.UTC), + time.Date(2021, 1, 6, 0, 0, 0, 0, time.UTC), + time.Date(2021, 1, 7, 0, 0, 0, 0, time.UTC), + time.Date(2021, 1, 7, 23, 59, 59, 0, time.UTC), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := splitRangeByDay(tt.args.from, tt.args.to) + require.Equal(t, tt.want, got) + }) + } +}