Skip to content

Commit

Permalink
add deduplicator metadata hash pull and clean up tests a bit
Browse files Browse the repository at this point in the history
  • Loading branch information
dkotik committed Feb 26, 2024
1 parent 3f5cdc3 commit 48d5ec8
Showing 1 changed file with 34 additions and 34 deletions.
68 changes: 34 additions & 34 deletions message/router/middleware/deduplicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@ func TestDeduplicatorMiddleware(t *testing.T) {
[]byte("1"),
)
_, err := h(msg)
if err != nil {
t.Fatal(err)
}
assert.NoError(t, err)
}

for i := 0; i < 2; i++ { // only one should go through
Expand All @@ -42,9 +40,7 @@ func TestDeduplicatorMiddleware(t *testing.T) {
[]byte("2"),
)
_, err := h(msg)
if err != nil {
t.Fatal(err)
}
assert.NoError(t, err)
}

assert.Equal(t, 2, count)
Expand All @@ -68,19 +64,15 @@ func TestDeduplicatorPublisherDecorator(t *testing.T) {
middleware.NewMessageHasherSHA256(1024),
time.Second,
).PublisherDecorator()(pubSub)
if err != nil {
t.Fatal(err)
}
assert.NoError(t, err)

for i := 0; i < 6; i++ { // only one should go through
msg := message.NewMessage(
fmt.Sprintf("first%d", i),
[]byte("1"),
)
err := decorated.Publish(testDedupeTopic, msg)
if err != nil {
t.Fatal(err)
}
assert.NoError(t, err)
}

for i := 0; i < 2; i++ { // only one should go through
Expand All @@ -89,15 +81,11 @@ func TestDeduplicatorPublisherDecorator(t *testing.T) {
[]byte("2"),
)
err := decorated.Publish(testDedupeTopic, msg)
if err != nil {
t.Fatal(err)
}
assert.NoError(t, err)
}

got, err := pubSub.Subscribe(ctx, testDedupeTopic)
if err != nil {
t.Fatal(err)
}
assert.NoError(t, err)
count := 0
for m := range got {
count++
Expand All @@ -115,13 +103,9 @@ func TestMessageHasherAdler32(t *testing.T) {

msg := message.NewMessage("adlerTest", []byte("some random data"))
h1, err := short(msg)
if err != nil {
t.Fatal(err)
}
assert.NoError(t, err)
h2, err := full(msg)
if err != nil {
t.Fatal(err)
}
assert.NoError(t, err)

if h1 != h2 {
t.Fatal("MessageHasherReadLimitMinimum did not apply to Adler32 message hasher")
Expand All @@ -136,27 +120,41 @@ func TestMessageHasherSHA256(t *testing.T) {

msg := message.NewMessage("adlerTest", []byte("some random data"))
h1, err := short(msg)
if err != nil {
t.Fatal(err)
}
assert.NoError(t, err)
h2, err := full(msg)
if err != nil {
t.Fatal(err)
}
assert.NoError(t, err)

if h1 != h2 {
t.Fatal("MessageHasherReadLimitMinimum did not apply to SHA256 message hasher")
}
}

func TestMessageHasherFromMetadataField(t *testing.T) {
t.Parallel()

field := "hash"
value := "someHash"
msg := message.NewMessage("one", []byte("1"))
msg.Metadata[field] = value
metadataPull := middleware.NewMessageHasherFromMetadataField(field)

h, err := metadataPull(msg)
assert.NoError(t, err)
assert.Equal(t, h, value)

delete(msg.Metadata, field) // empty out
_, err = metadataPull(msg)
assert.Error(t, err)
}

func TestDeduplicatorCleanup(t *testing.T) {
t.Parallel()

count := 0
wait := time.Millisecond * 5
d := middleware.NewDeduplicator(
middleware.NewMessageHasherAdler32(1024),
time.Second,
wait,
)
h := d.Middleware(func(msg *message.Message) (messages []*message.Message, e error) {
count++
Expand All @@ -169,10 +167,12 @@ func TestDeduplicatorCleanup(t *testing.T) {
[]byte(fmt.Sprintf("expiring%d", i)),
)
_, err := h(msg)
if err != nil {
t.Fatal(err)
}
assert.NoError(t, err)
}
if l := d.Len(); l != 6 {
t.Errorf("expected 6 tags, but %d remain", l)
}

time.Sleep(wait * 2)
if count != 6 {
t.Errorf("sent six messages, but only received %d", count)
Expand Down

0 comments on commit 48d5ec8

Please sign in to comment.