Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to get filtered next event from changelog #132

Merged
merged 2 commits into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions pkg/event/changelog.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ func (c *fileChangelog) start(ctx context.Context) error {
// we are also watching the parent dir of the changelog in order
// to correctly detect file rotation.
if event.Name == c.path {
events.Debug("FS event: %{op}s", event)
select {
case fsNotifyCh <- event:
case <-ctx.Done():
Expand Down Expand Up @@ -160,7 +159,6 @@ func (c *fileChangelog) read(ctx context.Context, fsNotifyCh chan fsnotify.Event
return
}
event := entry.event()
events.Debug("Read sequence %{seq}d", event.Sequence)
c.send(ctx, eventErr{event: event})
}

Expand Down Expand Up @@ -198,7 +196,6 @@ func (c *fileChangelog) read(ctx context.Context, fsNotifyCh chan fsnotify.Event
if err != io.EOF {
return errors.Wrap(err, "read bytes")
}
events.Debug("EOF. Waiting for more content...")
select {
case <-time.After(time.Second):
events.Debug("Manually checking log")
Expand All @@ -211,7 +208,6 @@ func (c *fileChangelog) read(ctx context.Context, fsNotifyCh chan fsnotify.Event
case event := <-fsNotifyCh:
switch event.Op {
case fsnotify.Write:
events.Debug("Update detected")
continue
case fsnotify.Create:
events.Debug("New changelog created. Consuming the rest of current one...")
Expand Down
52 changes: 50 additions & 2 deletions pkg/event/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package event

import (
"context"
"strings"

"github.com/segmentio/errors-go"
)
Expand All @@ -16,7 +17,12 @@ type (
cancelFunc context.CancelFunc // used to shut down the changelog
previous *Event // the previous event we read
}
IteratorOpt func(i *Iterator)
IteratorOpt func(i *Iterator)
FilteredIterator struct {
iterator *Iterator
family string
table string
}
)

var (
Expand Down Expand Up @@ -69,10 +75,52 @@ func (i *Iterator) Next(ctx context.Context) (event Event, err error) {
return event, ErrOutOfSync
}
}
return
return event, err
}

// Close shuts down the changelog backing this iterator by invoking its
// cancel function. It always returns nil.
func (i *Iterator) Close() error {
	stop := i.cancelFunc
	stop() // cancelling is what shuts down the changelog
	return nil
}

// NewFilteredIterator builds an Iterator over the changelog at changelogPath
// (applying opts to it) and wraps it so that Next only surfaces events for the
// given family and table. The underlying iterator watches the changelog in the
// background; call Close() once you are finished with the returned value.
//
// Error handling guidance for callers of Next:
//
//   - ErrOutOfSync most likely means the iterator fell behind the changelog.
//     Invalidate every cache that depends on this iterator.
//   - Any other error has no well-established remedy yet. It may indicate a
//     change to the changelog JSON schema, or it may be transient. For now,
//     log and instrument the error and then invalidate caches exactly as for
//     ErrOutOfSync. This advice will be refined as real-world operational
//     experience accumulates.
//
// If the underlying iterator cannot be created, its error is returned
// unchanged together with a nil *FilteredIterator.
func NewFilteredIterator(ctx context.Context, changelogPath, family, table string, opts IteratorOpt) (*FilteredIterator, error) {
	base, err := NewIterator(ctx, changelogPath, opts)
	if err != nil {
		return nil, err
	}
	return &FilteredIterator{iterator: base, family: family, table: table}, nil
}

// Next pulls events from the wrapped iterator until one matches this
// iterator's family and table, and returns that event. Both names are compared
// case-insensitively. Non-matching events are silently discarded.
//
// An error from the wrapped iterator ends the search immediately and is
// returned as-is, along with whatever event accompanied it, regardless of
// whether that event matches the filter.
func (i *FilteredIterator) Next(ctx context.Context) (event Event, err error) {
	for {
		event, err = i.iterator.Next(ctx)
		if err != nil {
			// Errors are never filtered out; the caller must always see them.
			return event, err
		}
		if !strings.EqualFold(event.RowUpdate.FamilyName, i.family) {
			continue
		}
		if strings.EqualFold(event.RowUpdate.TableName, i.table) {
			return event, err
		}
	}
}

// Close closes the wrapped iterator and returns whatever error it reports.
func (i *FilteredIterator) Close() error {
	inner := i.iterator
	return inner.Close()
}
65 changes: 65 additions & 0 deletions pkg/event/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,48 @@ func TestIterator(t *testing.T) {
}
}

// TestFilteredIterator feeds ten sequential events for the "numbers" family,
// alternating between the "even" and "odd" tables, and verifies that an
// iterator filtered on numbers/even yields only the even-sequenced events, in
// order.
func TestFilteredIterator(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	const (
		numEvents = 10
		family    = "numbers"
	)

	// Even sequence numbers go to the "even" table, odd ones to "odd".
	changelog := &fakeChangelog{}
	for seq := 0; seq < numEvents; seq++ {
		table := "odd"
		if seq%2 == 0 {
			table = "even"
		}
		update := RowUpdate{
			FamilyName: family,
			TableName:  table,
			Keys:       nil,
		}
		changelog.ers = append(changelog.ers, eventErr{
			event: Event{Sequence: int64(seq), RowUpdate: update},
		})
	}

	// Swap the real changelog for the in-memory fake.
	useFake := func(it *Iterator) {
		it.changelog = changelog
	}
	iter, err := NewFilteredIterator(ctx, "test file", "numbers", "even", useFake)
	require.NoError(t, err)
	defer func() {
		closeErr := iter.Close()
		require.NoError(t, closeErr)
	}()

	// Exactly half of the events match; the k-th match carries sequence 2k.
	for k := 0; k < numEvents/2; k++ {
		got, nextErr := iter.Next(ctx)
		require.NoError(t, nextErr)
		require.EqualValues(t, k*2, got.Sequence)
	}
}

func TestIteratorFailedChangelogStart(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand Down Expand Up @@ -84,5 +126,28 @@ func TestIteratorSkippedEvent(t *testing.T) {
event, err = iter.Next(ctx)
require.NoError(t, err)
require.EqualValues(t, 4, event.Sequence)
}

func TestFilteredIteratorSkippedEvent(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

iter, err := NewFilteredIterator(ctx, "test file", "foo", "bar", func(i *Iterator) {
i.changelog = &fakeChangelog{
ers: []eventErr{
{event: Event{Sequence: 0}},
{event: Event{Sequence: 3}},
{event: Event{Sequence: 4}},
},
}
})
require.NoError(t, err)
defer func() {
err := iter.Close()
require.NoError(t, err)
}()
// even if fam/tbl filter does not match we need to return errors
event, err := iter.Next(ctx)
require.EqualValues(t, 3, event.Sequence)
require.EqualError(t, err, "out of sync with changelog. invalidate caches please.")
}