Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to get filtered next event from changelog #132

Merged
merged 2 commits into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions pkg/event/changelog.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ func (c *fileChangelog) start(ctx context.Context) error {
// we are also watching the parent dir of the changelog in order
// to correctly detect file rotation.
if event.Name == c.path {
events.Debug("FS event: %{op}s", event)
select {
case fsNotifyCh <- event:
case <-ctx.Done():
Expand Down Expand Up @@ -160,7 +159,6 @@ func (c *fileChangelog) read(ctx context.Context, fsNotifyCh chan fsnotify.Event
return
}
event := entry.event()
events.Debug("Read sequence %{seq}d", event.Sequence)
c.send(ctx, eventErr{event: event})
}

Expand Down Expand Up @@ -198,7 +196,6 @@ func (c *fileChangelog) read(ctx context.Context, fsNotifyCh chan fsnotify.Event
if err != io.EOF {
return errors.Wrap(err, "read bytes")
}
events.Debug("EOF. Waiting for more content...")
select {
case <-time.After(time.Second):
events.Debug("Manually checking log")
Expand All @@ -211,7 +208,6 @@ func (c *fileChangelog) read(ctx context.Context, fsNotifyCh chan fsnotify.Event
case event := <-fsNotifyCh:
switch event.Op {
case fsnotify.Write:
events.Debug("Update detected")
continue
case fsnotify.Create:
events.Debug("New changelog created. Consuming the rest of current one...")
Expand Down
15 changes: 14 additions & 1 deletion pkg/event/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package event

import (
"context"
"strings"

"github.com/segmentio/errors-go"
)
Expand Down Expand Up @@ -69,7 +70,19 @@ func (i *Iterator) Next(ctx context.Context) (event Event, err error) {
return event, ErrOutOfSync
}
}
return
return event, err
}

// NextForFamilyTable blocks and returns the next event that matches the specified family and table
func (i *Iterator) NextForFamilyTable(ctx context.Context, family, table string) (event Event, err error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so if int-consumer wanted to use this, it would be to open an iterator for every table it wanted to track?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, do you see any issues with that?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, just wanted to clarify since using Next() and NextForFamilyTable() feel pretty incompatible with how they'd behave. My only suggestion would be to perhaps create a filtered_iterator struct with the same Next() api and have family and table as part of its construction. But I won't block this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, the use of Next and NextForFamilyTable together would not make sense and would not be the intended usage model. Perhaps to make this clearer a separate struct would be better. Will modify and update shortly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks great, thank you for doing that

for {
event, err = i.Next(ctx)
if err != nil ||
(strings.EqualFold(event.RowUpdate.FamilyName, family) && strings.EqualFold(event.RowUpdate.TableName, table)) {
break
}
}
return event, err
}

func (i *Iterator) Close() error {
Expand Down
67 changes: 66 additions & 1 deletion pkg/event/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/stretchr/testify/require"
)

func TestIterator(t *testing.T) {
func TestIteratorNext(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

Expand All @@ -36,6 +36,48 @@ func TestIterator(t *testing.T) {
}
}

func TestIteratorNextForFamilyTable(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

const numEvents = 10

var fam, tbl string
fam = "numbers"
changelog := &fakeChangelog{}
for i := 0; i < numEvents; i++ {
if i%2 == 0 {
tbl = "even"
} else {
tbl = "odd"
}
changelog.ers = append(changelog.ers, eventErr{
event: Event{
Sequence: int64(i),
RowUpdate: RowUpdate{
FamilyName: fam,
TableName: tbl,
Keys: nil,
},
},
})
}

iter, err := NewIterator(ctx, "test file", func(i *Iterator) {
i.changelog = changelog
})
require.NoError(t, err)
defer func() {
err := iter.Close()
require.NoError(t, err)
}()
for i := 0; i < numEvents/2; i++ {
event, err := iter.NextForFamilyTable(ctx, "numbers", "even")
require.NoError(t, err)
require.EqualValues(t, i*2, event.Sequence)
}
}

func TestIteratorFailedChangelogStart(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand Down Expand Up @@ -84,5 +126,28 @@ func TestIteratorSkippedEvent(t *testing.T) {
event, err = iter.Next(ctx)
require.NoError(t, err)
require.EqualValues(t, 4, event.Sequence)
}

func TestIteratorNextForFamilyTableReturnsErrForSkippedEvent(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

iter, err := NewIterator(ctx, "test file", func(i *Iterator) {
i.changelog = &fakeChangelog{
ers: []eventErr{
{event: Event{Sequence: 0}},
{event: Event{Sequence: 3}},
{event: Event{Sequence: 4}},
},
}
})
require.NoError(t, err)
defer func() {
err := iter.Close()
require.NoError(t, err)
}()
// even if fam/tbl filter does not match we need to return errors
event, err := iter.NextForFamilyTable(ctx, "foo", "bar")
require.EqualValues(t, 3, event.Sequence)
require.EqualError(t, err, "out of sync with changelog. invalidate caches please.")
}