diff --git a/pkg/event/changelog.go b/pkg/event/changelog.go index 09cf8925..eca611f8 100644 --- a/pkg/event/changelog.go +++ b/pkg/event/changelog.go @@ -107,7 +107,6 @@ func (c *fileChangelog) start(ctx context.Context) error { // we are also watching the parent dir of the changelog in order // to correctly detect file rotation. if event.Name == c.path { - events.Debug("FS event: %{op}s", event) select { case fsNotifyCh <- event: case <-ctx.Done(): @@ -160,7 +159,6 @@ func (c *fileChangelog) read(ctx context.Context, fsNotifyCh chan fsnotify.Event return } event := entry.event() - events.Debug("Read sequence %{seq}d", event.Sequence) c.send(ctx, eventErr{event: event}) } @@ -198,7 +196,6 @@ func (c *fileChangelog) read(ctx context.Context, fsNotifyCh chan fsnotify.Event if err != io.EOF { return errors.Wrap(err, "read bytes") } - events.Debug("EOF. Waiting for more content...") select { case <-time.After(time.Second): events.Debug("Manually checking log") @@ -211,7 +208,6 @@ func (c *fileChangelog) read(ctx context.Context, fsNotifyCh chan fsnotify.Event case event := <-fsNotifyCh: switch event.Op { case fsnotify.Write: - events.Debug("Update detected") continue case fsnotify.Create: events.Debug("New changelog created. 
Consuming the rest of current one...") diff --git a/pkg/event/iterator.go b/pkg/event/iterator.go index d6536ee6..1dce089d 100644 --- a/pkg/event/iterator.go +++ b/pkg/event/iterator.go @@ -2,6 +2,7 @@ package event import ( "context" + "strings" "github.com/segmentio/errors-go" ) @@ -16,7 +17,12 @@ type ( cancelFunc context.CancelFunc // used to shut down the changelog previous *Event // the previous event we read } - IteratorOpt func(i *Iterator) + IteratorOpt func(i *Iterator) + FilteredIterator struct { + iterator *Iterator + family string + table string + } ) var ( @@ -69,10 +75,52 @@ func (i *Iterator) Next(ctx context.Context) (event Event, err error) { return event, ErrOutOfSync } } - return + return event, err } func (i *Iterator) Close() error { i.cancelFunc() // shut down the changelog return nil } + +// NewFilteredIterator builds an Iterator via NewIterator(ctx, changelogPath, opts) and wraps it in a +// FilteredIterator whose Next method only yields events for the given family and table (compared +// case-insensitively). Make sure to Close() the iterator when you are done using it. +// +// If Next returns ErrOutOfSync, the iterator likely could not keep +// up with the changelog. Please invalidate any caches dependent on this iterator. +// +// If a different error is returned, the best way to deal with it is not yet known. +// It could be caused by a change in the changelog JSON schema, or by something more +// temporary. The best response for now is to log and instrument the error, and then +// invalidate the cache the same way you would with ErrOutOfSync. +// As time goes on, we'll know a little bit better how to operate this under real-world +// conditions. 
+func NewFilteredIterator(ctx context.Context, changelogPath, family, table string, opts IteratorOpt) (*FilteredIterator, error) { + iter, err := NewIterator(ctx, changelogPath, opts) + if err != nil { + return nil, err + } + fi := &FilteredIterator{ + iterator: iter, + family: family, + table: table, + } + return fi, nil +} + +// Next blocks until the wrapped iterator yields an event whose family and table match the filter (case-insensitively), or returns an error; errors are returned regardless of the filter. +func (i *FilteredIterator) Next(ctx context.Context) (event Event, err error) { + for { + event, err = i.iterator.Next(ctx) + if err != nil || + (strings.EqualFold(event.RowUpdate.FamilyName, i.family) && strings.EqualFold(event.RowUpdate.TableName, i.table)) { + break + } + } + return event, err +} + +func (i *FilteredIterator) Close() error { + return i.iterator.Close() +} diff --git a/pkg/event/iterator_test.go b/pkg/event/iterator_test.go index 3c664cc4..26ded33d 100644 --- a/pkg/event/iterator_test.go +++ b/pkg/event/iterator_test.go @@ -36,6 +36,48 @@ func TestIterator(t *testing.T) { } } +func TestFilteredIterator(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + const numEvents = 10 + + var fam, tbl string + fam = "numbers" + changelog := &fakeChangelog{} + for i := 0; i < numEvents; i++ { + if i%2 == 0 { + tbl = "even" + } else { + tbl = "odd" + } + changelog.ers = append(changelog.ers, eventErr{ + event: Event{ + Sequence: int64(i), + RowUpdate: RowUpdate{ + FamilyName: fam, + TableName: tbl, + Keys: nil, + }, + }, + }) + } + + iter, err := NewFilteredIterator(ctx, "test file", "numbers", "even", func(i *Iterator) { + i.changelog = changelog + }) + require.NoError(t, err) + defer func() { + err := iter.Close() + require.NoError(t, err) + }() + for i := 0; i < numEvents/2; i++ { + event, err := iter.Next(ctx) + require.NoError(t, err) + require.EqualValues(t, i*2, event.Sequence) + } +} + func TestIteratorFailedChangelogStart(t *testing.T) { ctx, cancel := 
context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -84,5 +126,28 @@ func TestIteratorSkippedEvent(t *testing.T) { event, err = iter.Next(ctx) require.NoError(t, err) require.EqualValues(t, 4, event.Sequence) +} + +func TestFilteredIteratorSkippedEvent(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + iter, err := NewFilteredIterator(ctx, "test file", "foo", "bar", func(i *Iterator) { + i.changelog = &fakeChangelog{ + ers: []eventErr{ + {event: Event{Sequence: 0}}, + {event: Event{Sequence: 3}}, + {event: Event{Sequence: 4}}, + }, + } + }) + require.NoError(t, err) + defer func() { + err := iter.Close() + require.NoError(t, err) + }() + // None of these events match the foo/bar filter, but errors must still be returned. + event, err := iter.Next(ctx) + require.EqualValues(t, 3, event.Sequence) + require.EqualError(t, err, "out of sync with changelog. invalidate caches please.") }