diff --git a/analytics/agma/agma_module.go b/analytics/agma/agma_module.go index 534c189d914..b531393f2ca 100644 --- a/analytics/agma/agma_module.go +++ b/analytics/agma/agma_module.go @@ -261,6 +261,11 @@ func (l *AgmaLogger) LogVideoObject(event *analytics.VideoObject) { l.bufferCh <- data } +func (l *AgmaLogger) Shutdown() { + glog.Info("[AgmaAnalytics] Shutdown, trying to flush buffer") + l.flush() // mutex safe +} + func (l *AgmaLogger) LogCookieSyncObject(event *analytics.CookieSyncObject) {} func (l *AgmaLogger) LogNotificationEventObject(event *analytics.NotificationEvent) {} func (l *AgmaLogger) LogSetUIDObject(event *analytics.SetUIDObject) {} diff --git a/analytics/agma/agma_module_test.go b/analytics/agma/agma_module_test.go index 3e4955bd8a5..550cf418c8d 100644 --- a/analytics/agma/agma_module_test.go +++ b/analytics/agma/agma_module_test.go @@ -664,3 +664,38 @@ func TestRaceEnd2End(t *testing.T) { assert.Equal(t, expected, actual) } + +func TestShutdownFlush(t *testing.T) { + cfg := config.AgmaAnalytics{ + Enabled: true, + Endpoint: config.AgmaAnalyticsHttpEndpoint{ + Url: "http://localhost:8000/event", + Timeout: "5s", + }, + Buffers: config.AgmaAnalyticsBuffer{ + EventCount: 1000, + BufferSize: "100mb", + Timeout: "5m", + }, + Accounts: []config.AgmaAnalyticsAccount{ + { + PublisherId: "track-me", + Code: "abc", + }, + }, + } + mockedSender := new(MockedSender) + mockedSender.On("Send", mock.Anything).Return(nil) + clockMock := clock.NewMock() + logger, err := newAgmaLogger(cfg, mockedSender.Send, clockMock) + assert.NoError(t, err) + + go logger.start() + logger.LogAuctionObject(&mockValidAuctionObject) + logger.Shutdown() + + time.Sleep(10 * time.Millisecond) + + mockedSender.AssertCalled(t, "Send", mock.Anything) + mockedSender.AssertNumberOfCalls(t, "Send", 1) +} diff --git a/analytics/build/build.go b/analytics/build/build.go index 4cba9a3f1a6..3c9d7ccbaea 100644 --- a/analytics/build/build.go +++ b/analytics/build/build.go @@ -129,6 +129,13 @@ func (ea enabledAnalytics) LogNotificationEventObject(ne *analytics.Notification } } +// Shutdown - correctly shutdown all analytics modules and wait for them to finish +func (ea enabledAnalytics) Shutdown() { + for _, module := range ea { + module.Shutdown() + } +} + func evaluateActivities(rw *openrtb_ext.RequestWrapper, ac privacy.ActivityControl, componentName string) (bool, *openrtb_ext.RequestWrapper) { // returned nil request wrapper means that request wrapper was not modified by activities and doesn't have to be changed in analytics object // it is needed in order to use one function for all analytics objects with RequestWrapper diff --git a/analytics/build/build_test.go b/analytics/build/build_test.go index d794c01ab8c..c455c364e83 100644 --- a/analytics/build/build_test.go +++ b/analytics/build/build_test.go @@ -79,6 +79,8 @@ func (m *sampleModule) LogAmpObject(ao *analytics.AmpObject) { *m.count++ } func (m *sampleModule) LogNotificationEventObject(ne *analytics.NotificationEvent) { *m.count++ } +func (m *sampleModule) Shutdown() { *m.count++ } + func initAnalytics(count *int) analytics.Runner { modules := make(enabledAnalytics, 0) modules["sampleModule"] = &sampleModule{count} @@ -92,6 +94,19 @@ func TestNewPBSAnalytics(t *testing.T) { assert.Equal(t, len(instance), 0) } +func TestPBSAnalyticsShutdown(t *testing.T) { + countA := 0 + countB := 0 + modules := make(enabledAnalytics, 0) + modules["sampleModuleA"] = &sampleModule{count: &countA} + modules["sampleModuleB"] = &sampleModule{count: &countB} + + modules.Shutdown() + + assert.Equal(t, 1, countA, "sampleModuleA should have been shutdown") + assert.Equal(t, 1, countB, "sampleModuleB should have been shutdown") +} + func TestNewPBSAnalytics_FileLogger(t *testing.T) { if _, err := os.Stat(TEST_DIR); os.IsNotExist(err) { if err = os.MkdirAll(TEST_DIR, 0755); err != nil { @@ -415,6 +430,8 @@ func (m *mockAnalytics) LogSetUIDObject(ao *analytics.SetUIDObject) {} func (m *mockAnalytics) LogNotificationEventObject(ao *analytics.NotificationEvent) {} +func (m *mockAnalytics) Shutdown() {} + func TestLogObject(t *testing.T) { tests := []struct { description string diff --git a/analytics/core.go b/analytics/core.go index c9e15180f44..bfbbe229fe9 100644 --- a/analytics/core.go +++ b/analytics/core.go @@ -19,6 +19,7 @@ type Module interface { LogSetUIDObject(*SetUIDObject) LogAmpObject(*AmpObject) LogNotificationEventObject(*NotificationEvent) + Shutdown() } // Loggable object of a transaction at /openrtb2/auction endpoint diff --git a/analytics/filesystem/file_module.go b/analytics/filesystem/file_module.go index 01f9c27bf43..1c1b1310c40 100644 --- a/analytics/filesystem/file_module.go +++ b/analytics/filesystem/file_module.go @@ -4,7 +4,8 @@ import ( "bytes" "fmt" - "github.com/chasex/glog" + cglog "github.com/chasex/glog" + "github.com/golang/glog" "github.com/prebid/openrtb/v20/openrtb2" "github.com/prebid/prebid-server/v2/analytics" "github.com/prebid/prebid-server/v2/util/jsonutil" @@ -21,9 +22,14 @@ const ( NOTIFICATION_EVENT RequestType = "/event" ) +type Logger interface { + Debug(v ...interface{}) + Flush() +} + // Module that can perform transactional logging type FileLogger struct { - Logger *glog.Logger + Logger Logger } // Writes AuctionObject to file @@ -85,15 +91,22 @@ func (f *FileLogger) LogNotificationEventObject(ne *analytics.NotificationEvent) f.Logger.Flush() } +// Shutdown the logger +func (f *FileLogger) Shutdown() { + // clear all pending buffered data in case there is any + glog.Info("[FileLogger] Shutdown, trying to flush buffer") + f.Logger.Flush() +} + // Method to initialize the analytic module func NewFileLogger(filename string) (analytics.Module, error) { - options := glog.LogOptions{ + options := cglog.LogOptions{ File: filename, - Flag: glog.LstdFlags, - Level: glog.Ldebug, - Mode: glog.R_Day, + Flag: cglog.LstdFlags, + Level: cglog.Ldebug, + Mode: cglog.R_Day, } - if logger, err := glog.New(options); err == nil { + if logger, err := cglog.New(options); err == nil { return &FileLogger{ logger, }, nil diff --git a/analytics/filesystem/file_module_test.go b/analytics/filesystem/file_module_test.go index 0e0831d14f1..5bc2439ac94 100644 --- a/analytics/filesystem/file_module_test.go +++ b/analytics/filesystem/file_module_test.go @@ -8,12 +8,25 @@ import ( "github.com/prebid/prebid-server/v2/analytics" "github.com/prebid/prebid-server/v2/config" + "github.com/stretchr/testify/mock" "github.com/prebid/openrtb/v20/openrtb2" ) const TEST_DIR string = "testFiles" +type MockLogger struct { + mock.Mock +} + +func (ml *MockLogger) Debug(v ...interface{}) { + ml.Called(v) +} + +func (ml *MockLogger) Flush() { + ml.Called() +} + func TestAmpObject_ToJson(t *testing.T) { ao := &analytics.AmpObject{ Status: http.StatusOK, @@ -97,3 +110,15 @@ func TestFileLogger_LogObjects(t *testing.T) { t.Fatalf("Couldn't initialize file logger: %v", err) } } + +func TestFileLoggerShutdown(t *testing.T) { + mockLogger := &MockLogger{} + fl := &FileLogger{ + Logger: mockLogger, + } + mockLogger.On("Flush").Return(nil) + + fl.Shutdown() + + mockLogger.AssertNumberOfCalls(t, "Flush", 1) +} diff --git a/analytics/pubstack/pubstack_module.go b/analytics/pubstack/pubstack_module.go index 535118c0000..8b012f172dc 100644 --- a/analytics/pubstack/pubstack_module.go +++ b/analytics/pubstack/pubstack_module.go @@ -200,6 +200,12 @@ func (p *PubstackModule) LogAmpObject(ao *analytics.AmpObject) { p.eventChannels[amp].Push(payload) } +// Shutdown - no op since the analytic module already implements system signal handling +// and trying to close a closed channel will cause panic +func (p *PubstackModule) Shutdown() { + glog.Info("[PubstackModule] Shutdown") +} + func (p *PubstackModule) start(c <-chan *Configuration) { for { select { diff --git a/analytics/runner.go b/analytics/runner.go index 7a2c56f77dd..2ca2841214f 100644 --- a/analytics/runner.go +++ b/analytics/runner.go @@ -11,4 +11,5 @@ type Runner interface { LogSetUIDObject(*SetUIDObject) LogAmpObject(*AmpObject, privacy.ActivityControl) LogNotificationEventObject(*NotificationEvent, privacy.ActivityControl) + Shutdown() } diff --git a/endpoints/cookie_sync_test.go b/endpoints/cookie_sync_test.go index edd1958fcb0..7424d65005e 100644 --- a/endpoints/cookie_sync_test.go +++ b/endpoints/cookie_sync_test.go @@ -2198,6 +2198,10 @@ func (m *MockAnalyticsRunner) LogNotificationEventObject(obj *analytics.Notifica m.Called(obj, ac) } +func (m *MockAnalyticsRunner) Shutdown() { + m.Called() +} + type MockGDPRPerms struct { mock.Mock } diff --git a/endpoints/events/event_test.go b/endpoints/events/event_test.go index 81d000fd8a4..1e07bafbc83 100644 --- a/endpoints/events/event_test.go +++ b/endpoints/events/event_test.go @@ -64,6 +64,8 @@ func (e *eventsMockAnalyticsModule) LogNotificationEventObject(ne *analytics.Not e.Invoked = true } +func (e *eventsMockAnalyticsModule) Shutdown() {} + var mockAccountData = map[string]json.RawMessage{ "events_enabled": json.RawMessage(`{"events": {"enabled":true}}`), "events_disabled": json.RawMessage(`{"events": {"enabled":false}}`), diff --git a/endpoints/openrtb2/amp_auction_test.go b/endpoints/openrtb2/amp_auction_test.go index e9b0015afe7..d73e6fb5aa3 100644 --- a/endpoints/openrtb2/amp_auction_test.go +++ b/endpoints/openrtb2/amp_auction_test.go @@ -1734,6 +1734,7 @@ func (logger mockLogger) LogNotificationEventObject(uuidObj *analytics.Notificat func (logger mockLogger) LogAmpObject(ao *analytics.AmpObject, _ privacy.ActivityControl) { *logger.ampObject = *ao } +func (logger mockLogger) Shutdown() {} func TestBuildAmpObject(t *testing.T) { testCases := []struct { diff --git a/endpoints/openrtb2/video_auction_test.go b/endpoints/openrtb2/video_auction_test.go index 55fc7fab957..c9dff0f7c92 100644 --- a/endpoints/openrtb2/video_auction_test.go +++ b/endpoints/openrtb2/video_auction_test.go @@ -1282,6 +1282,8 @@ func (m *mockAnalyticsModule) LogAmpObject(ao *analytics.AmpObject, _ privacy.Ac func (m *mockAnalyticsModule) LogNotificationEventObject(ne *analytics.NotificationEvent, _ privacy.ActivityControl) { } +func (m *mockAnalyticsModule) Shutdown() {} + func mockDeps(t *testing.T, ex *mockExchangeVideo) *endpointDeps { return &endpointDeps{ fakeUUIDGenerator{}, diff --git a/router/router.go b/router/router.go index 0712f6723dc..867c856b06c 100644 --- a/router/router.go +++ b/router/router.go @@ -124,7 +124,8 @@ type Router struct { *httprouter.Router MetricsEngine *metricsConf.DetailedMetricsEngine ParamsValidator openrtb_ext.BidderParamValidator - Shutdown func() + + shutdowns []func() } func New(cfg *config.Configuration, rateConvertor *currency.RateConverter) (r *Router, err error) { @@ -201,11 +202,12 @@ func New(cfg *config.Configuration, rateConvertor *currency.RateConverter) (r *R // Metrics engine r.MetricsEngine = metricsConf.NewMetricsEngine(cfg, openrtb_ext.CoreBidderNames(), syncerKeys, moduleStageNames) shutdown, fetcher, ampFetcher, accounts, categoriesFetcher, videoFetcher, storedRespFetcher := storedRequestsConf.NewStoredRequests(cfg, r.MetricsEngine, generalHttpClient, r.Router) - // todo(zachbadgett): better shutdown - r.Shutdown = shutdown analyticsRunner := analyticsBuild.New(&cfg.Analytics) + // register the analytics runner for shutdown + r.shutdowns = append(r.shutdowns, shutdown, analyticsRunner.Shutdown) + paramsValidator, err := openrtb_ext.NewBidderParamsValidator(schemaDirectory) if err != nil { glog.Fatalf("Failed to create the bidder params validator. %v", err) @@ -301,6 +303,15 @@ func New(cfg *config.Configuration, rateConvertor *currency.RateConverter) (r *R return r, nil } +// Shutdown closes any dependencies of the router that may need closing +func (r *Router) Shutdown() { + glog.Info("[PBS Router] shutting down") + for _, shutdown := range r.shutdowns { + shutdown() + } + glog.Info("[PBS Router] shut down") +} + func checkSupportedUserSyncEndpoints(bidderInfos config.BidderInfos) error { for name, info := range bidderInfos { if info.Syncer == nil {