Skip to content

Commit

Permalink
add deduplicator clean up test
Browse files Browse the repository at this point in the history
  • Loading branch information
dkotik committed Feb 26, 2024
1 parent 911b60e commit 3f5cdc3
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 0 deletions.
9 changes: 9 additions & 0 deletions message/router/middleware/deduplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,15 @@ func (d *Deduplicator) cleanOut(tagsBefore time.Time) {
}
}

// Len returns the number of known tags that have not been
// cleaned out yet.
func (d *Deduplicator) Len() (count int) {
d.mu.Lock()
count = len(d.tags)
d.mu.Unlock()
return
}

// IsDuplicate returns true if the message hash tag calculated
// using a [MessageHasher] was seen in deduplication time window.
func (d *Deduplicator) IsDuplicate(m *message.Message) (bool, error) {
Expand Down
33 changes: 33 additions & 0 deletions message/router/middleware/deduplicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,3 +148,36 @@ func TestMessageHasherSHA256(t *testing.T) {
t.Fatal("MessageHasherReadLimitMinimum did not apply to SHA256 message hasher")
}
}

func TestDeduplicatorCleanup(t *testing.T) {
t.Parallel()

count := 0
wait := time.Millisecond * 5
d := middleware.NewDeduplicator(
middleware.NewMessageHasherAdler32(1024),
time.Second,
)
h := d.Middleware(func(msg *message.Message) (messages []*message.Message, e error) {
count++
return nil, nil
})

for i := 0; i < 6; i++ { // only one should go through
msg := message.NewMessage(
fmt.Sprintf("expiring%d", i),
[]byte(fmt.Sprintf("expiring%d", i)),
)
_, err := h(msg)
if err != nil {
t.Fatal(err)
}
}
time.Sleep(wait * 2)
if count != 6 {
t.Errorf("sent six messages, but only received %d", count)
}
if l := d.Len(); l != 0 {
t.Errorf("tags should have been cleaned out, but %d remain", l)
}
}

0 comments on commit 3f5cdc3

Please sign in to comment.