From 1f0b6b0c51436a145a71b89df6dd2e29433346dd Mon Sep 17 00:00:00 2001 From: Zhongshi Xi Date: Tue, 5 Dec 2023 10:06:19 -0500 Subject: [PATCH] introduce graceful shutdown of the analytic module/runner --- analytics/build/build.go | 6 ++++++ analytics/build/build_test.go | 2 ++ analytics/core.go | 1 + analytics/filesystem/file_module.go | 3 +++ analytics/pubstack/pubstack_module.go | 2 ++ analytics/runner.go | 1 + endpoints/cookie_sync_test.go | 4 ++++ endpoints/events/event_test.go | 2 ++ endpoints/openrtb2/amp_auction_test.go | 1 + endpoints/openrtb2/video_auction_test.go | 2 ++ router/router.go | 8 ++++++-- 11 files changed, 30 insertions(+), 2 deletions(-) diff --git a/analytics/build/build.go b/analytics/build/build.go index 7fc577daedf..c3134780541 100644 --- a/analytics/build/build.go +++ b/analytics/build/build.go @@ -91,3 +91,9 @@ func (ea enabledAnalytics) LogNotificationEventObject(ne *analytics.Notification } } } + +func (ea enabledAnalytics) Shutdown() { + for _, module := range ea { + module.Shutdown() + } +} diff --git a/analytics/build/build_test.go b/analytics/build/build_test.go index efc0c862564..572671824a1 100644 --- a/analytics/build/build_test.go +++ b/analytics/build/build_test.go @@ -75,6 +75,8 @@ func (m *sampleModule) LogAmpObject(ao *analytics.AmpObject) { *m.count++ } func (m *sampleModule) LogNotificationEventObject(ne *analytics.NotificationEvent) { *m.count++ } +func (m *sampleModule) Shutdown() {} + func initAnalytics(count *int) analytics.Runner { modules := make(enabledAnalytics, 0) modules["sampleModule"] = &sampleModule{count} diff --git a/analytics/core.go b/analytics/core.go index 122a3da8ad3..6e58a133999 100644 --- a/analytics/core.go +++ b/analytics/core.go @@ -19,6 +19,7 @@ type Module interface { LogSetUIDObject(*SetUIDObject) LogAmpObject(*AmpObject) LogNotificationEventObject(*NotificationEvent) + Shutdown() } // Loggable object of a transaction at /openrtb2/auction endpoint diff --git a/analytics/filesystem/file_module.go b/analytics/filesystem/file_module.go index 4f7886c1206..2ec2d9ee867 100644 --- a/analytics/filesystem/file_module.go +++ b/analytics/filesystem/file_module.go @@ -85,6 +85,9 @@ func (f *FileLogger) LogNotificationEventObject(ne *analytics.NotificationEvent) f.Logger.Flush() } +// Shutdown the logger - No-op Implementation +func (f *FileLogger) Shutdown() {} + // Method to initialize the analytic module func NewFileLogger(filename string) (analytics.Module, error) { options := glog.LogOptions{ diff --git a/analytics/pubstack/pubstack_module.go b/analytics/pubstack/pubstack_module.go index 535118c0000..e6fe5cd8a3d 100644 --- a/analytics/pubstack/pubstack_module.go +++ b/analytics/pubstack/pubstack_module.go @@ -200,6 +200,8 @@ func (p *PubstackModule) LogAmpObject(ao *analytics.AmpObject) { p.eventChannels[amp].Push(payload) } +func (p *PubstackModule) Shutdown() {} + func (p *PubstackModule) start(c <-chan *Configuration) { for { select { diff --git a/analytics/runner.go b/analytics/runner.go index 7a2c56f77dd..2ca2841214f 100644 --- a/analytics/runner.go +++ b/analytics/runner.go @@ -11,4 +11,5 @@ type Runner interface { LogSetUIDObject(*SetUIDObject) LogAmpObject(*AmpObject, privacy.ActivityControl) LogNotificationEventObject(*NotificationEvent, privacy.ActivityControl) + Shutdown() } diff --git a/endpoints/cookie_sync_test.go b/endpoints/cookie_sync_test.go index 050e137ffed..c943b50713c 100644 --- a/endpoints/cookie_sync_test.go +++ b/endpoints/cookie_sync_test.go @@ -2136,6 +2136,10 @@ func (m *MockAnalyticsRunner) LogNotificationEventObject(obj *analytics.Notifica m.Called(obj, ac) } +func (m *MockAnalyticsRunner) Shutdown() { + m.Called() +} + type MockGDPRPerms struct { mock.Mock } diff --git a/endpoints/events/event_test.go b/endpoints/events/event_test.go index 81d000fd8a4..1e07bafbc83 100644 --- a/endpoints/events/event_test.go +++ b/endpoints/events/event_test.go @@ -64,6 +64,8 @@ func (e *eventsMockAnalyticsModule) LogNotificationEventObject(ne *analytics.Not e.Invoked = true } +func (e *eventsMockAnalyticsModule) Shutdown() {} + var mockAccountData = map[string]json.RawMessage{ "events_enabled": json.RawMessage(`{"events": {"enabled":true}}`), "events_disabled": json.RawMessage(`{"events": {"enabled":false}}`), diff --git a/endpoints/openrtb2/amp_auction_test.go b/endpoints/openrtb2/amp_auction_test.go index bd56457b3d7..b582cd35f66 100644 --- a/endpoints/openrtb2/amp_auction_test.go +++ b/endpoints/openrtb2/amp_auction_test.go @@ -1657,6 +1657,7 @@ func (logger mockLogger) LogNotificationEventObject(uuidObj *analytics.Notificat func (logger mockLogger) LogAmpObject(ao *analytics.AmpObject, _ privacy.ActivityControl) { *logger.ampObject = *ao } +func (logger mockLogger) Shutdown() {} func TestBuildAmpObject(t *testing.T) { testCases := []struct { diff --git a/endpoints/openrtb2/video_auction_test.go b/endpoints/openrtb2/video_auction_test.go index 70a37aab5df..61bf12fb4d1 100644 --- a/endpoints/openrtb2/video_auction_test.go +++ b/endpoints/openrtb2/video_auction_test.go @@ -1255,6 +1255,8 @@ func (m *mockAnalyticsModule) LogAmpObject(ao *analytics.AmpObject, _ privacy.Ac func (m *mockAnalyticsModule) LogNotificationEventObject(ne *analytics.NotificationEvent, _ privacy.ActivityControl) { } +func (m *mockAnalyticsModule) Shutdown() {} + func mockDeps(t *testing.T, ex *mockExchangeVideo) *endpointDeps { return &endpointDeps{ fakeUUIDGenerator{}, diff --git a/router/router.go b/router/router.go index d89d1f59ca2..14ed31908d6 100644 --- a/router/router.go +++ b/router/router.go @@ -200,11 +200,15 @@ func New(cfg *config.Configuration, rateConvertor *currency.RateConverter) (r *R // Metrics engine r.MetricsEngine = metricsConf.NewMetricsEngine(cfg, openrtb_ext.CoreBidderNames(), syncerKeys, moduleStageNames) shutdown, fetcher, ampFetcher, accounts, categoriesFetcher, videoFetcher, storedRespFetcher := storedRequestsConf.NewStoredRequests(cfg, r.MetricsEngine, generalHttpClient, r.Router) - // todo(zachbadgett): better shutdown - r.Shutdown = shutdown analyticsRunner := analyticsBuild.New(&cfg.Analytics) + // todo(zachbadgett): better shutdown + r.Shutdown = func() { + shutdown() + analyticsRunner.Shutdown() + } + paramsValidator, err := openrtb_ext.NewBidderParamsValidator(schemaDirectory) if err != nil { glog.Fatalf("Failed to create the bidder params validator. %v", err)