Skip to content

Commit

Permalink
introduce graceful shutdown of the analytic module/runner
Browse files Browse the repository at this point in the history
  • Loading branch information
zhongshixi committed Dec 5, 2023
1 parent db74611 commit 1f0b6b0
Show file tree
Hide file tree
Showing 11 changed files with 30 additions and 2 deletions.
6 changes: 6 additions & 0 deletions analytics/build/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,9 @@ func (ea enabledAnalytics) LogNotificationEventObject(ne *analytics.Notification
}
}
}

func (ea enabledAnalytics) Shutdown() {
for _, module := range ea {
module.Shutdown()
}
}
2 changes: 2 additions & 0 deletions analytics/build/build_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ func (m *sampleModule) LogAmpObject(ao *analytics.AmpObject) { *m.count++ }

func (m *sampleModule) LogNotificationEventObject(ne *analytics.NotificationEvent) { *m.count++ }

func (m *sampleModule) Shutdown() {}

func initAnalytics(count *int) analytics.Runner {
modules := make(enabledAnalytics, 0)
modules["sampleModule"] = &sampleModule{count}
Expand Down
1 change: 1 addition & 0 deletions analytics/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Module interface {
LogSetUIDObject(*SetUIDObject)
LogAmpObject(*AmpObject)
LogNotificationEventObject(*NotificationEvent)
Shutdown()
}

// Loggable object of a transaction at /openrtb2/auction endpoint
Expand Down
3 changes: 3 additions & 0 deletions analytics/filesystem/file_module.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ func (f *FileLogger) LogNotificationEventObject(ne *analytics.NotificationEvent)
f.Logger.Flush()
}

// Shutdown the logger - No-op Implementation
func (f *FileLogger) Shutdown() {}

// Method to initialize the analytic module
func NewFileLogger(filename string) (analytics.Module, error) {
options := glog.LogOptions{
Expand Down
2 changes: 2 additions & 0 deletions analytics/pubstack/pubstack_module.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ func (p *PubstackModule) LogAmpObject(ao *analytics.AmpObject) {
p.eventChannels[amp].Push(payload)
}

func (p *PubstackModule) Shutdown() {}

func (p *PubstackModule) start(c <-chan *Configuration) {
for {
select {
Expand Down
1 change: 1 addition & 0 deletions analytics/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ type Runner interface {
LogSetUIDObject(*SetUIDObject)
LogAmpObject(*AmpObject, privacy.ActivityControl)
LogNotificationEventObject(*NotificationEvent, privacy.ActivityControl)
Shutdown()
}
4 changes: 4 additions & 0 deletions endpoints/cookie_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2136,6 +2136,10 @@ func (m *MockAnalyticsRunner) LogNotificationEventObject(obj *analytics.Notifica
m.Called(obj, ac)
}

func (m *MockAnalyticsRunner) Shutdown() {
m.Called()
}

type MockGDPRPerms struct {
mock.Mock
}
Expand Down
2 changes: 2 additions & 0 deletions endpoints/events/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ func (e *eventsMockAnalyticsModule) LogNotificationEventObject(ne *analytics.Not
e.Invoked = true
}

func (e *eventsMockAnalyticsModule) Shutdown() {}

var mockAccountData = map[string]json.RawMessage{
"events_enabled": json.RawMessage(`{"events": {"enabled":true}}`),
"events_disabled": json.RawMessage(`{"events": {"enabled":false}}`),
Expand Down
1 change: 1 addition & 0 deletions endpoints/openrtb2/amp_auction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1657,6 +1657,7 @@ func (logger mockLogger) LogNotificationEventObject(uuidObj *analytics.Notificat
func (logger mockLogger) LogAmpObject(ao *analytics.AmpObject, _ privacy.ActivityControl) {
*logger.ampObject = *ao
}
func (logger mockLogger) Shutdown() {}

func TestBuildAmpObject(t *testing.T) {
testCases := []struct {
Expand Down
2 changes: 2 additions & 0 deletions endpoints/openrtb2/video_auction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1255,6 +1255,8 @@ func (m *mockAnalyticsModule) LogAmpObject(ao *analytics.AmpObject, _ privacy.Ac
func (m *mockAnalyticsModule) LogNotificationEventObject(ne *analytics.NotificationEvent, _ privacy.ActivityControl) {
}

func (m *mockAnalyticsModule) Shutdown() {}

func mockDeps(t *testing.T, ex *mockExchangeVideo) *endpointDeps {
return &endpointDeps{
fakeUUIDGenerator{},
Expand Down
8 changes: 6 additions & 2 deletions router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,11 +200,15 @@ func New(cfg *config.Configuration, rateConvertor *currency.RateConverter) (r *R
// Metrics engine
r.MetricsEngine = metricsConf.NewMetricsEngine(cfg, openrtb_ext.CoreBidderNames(), syncerKeys, moduleStageNames)
shutdown, fetcher, ampFetcher, accounts, categoriesFetcher, videoFetcher, storedRespFetcher := storedRequestsConf.NewStoredRequests(cfg, r.MetricsEngine, generalHttpClient, r.Router)
// todo(zachbadgett): better shutdown
r.Shutdown = shutdown

analyticsRunner := analyticsBuild.New(&cfg.Analytics)

// todo(zachbadgett): better shutdown
r.Shutdown = func() {
shutdown()
analyticsRunner.Shutdown()
}

paramsValidator, err := openrtb_ext.NewBidderParamsValidator(schemaDirectory)
if err != nil {
glog.Fatalf("Failed to create the bidder params validator. %v", err)
Expand Down

0 comments on commit 1f0b6b0

Please sign in to comment.