Skip to content

Commit

Permalink
Merge branch 'master' into multi-shoveler-open-close
Browse files Browse the repository at this point in the history
  • Loading branch information
wavetylor authored Nov 21, 2023
2 parents 32a0753 + 0bef9c2 commit b94e5c2
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 6 deletions.
4 changes: 0 additions & 4 deletions pkg/event/changelog.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ func (c *fileChangelog) start(ctx context.Context) error {
// we are also watching the parent dir of the changelog in order
// to correctly detect file rotation.
if event.Name == c.path {
events.Debug("FS event: %{op}s", event)
select {
case fsNotifyCh <- event:
case <-ctx.Done():
Expand Down Expand Up @@ -160,7 +159,6 @@ func (c *fileChangelog) read(ctx context.Context, fsNotifyCh chan fsnotify.Event
return
}
event := entry.event()
events.Debug("Read sequence %{seq}d", event.Sequence)
c.send(ctx, eventErr{event: event})
}

Expand Down Expand Up @@ -198,7 +196,6 @@ func (c *fileChangelog) read(ctx context.Context, fsNotifyCh chan fsnotify.Event
if err != io.EOF {
return errors.Wrap(err, "read bytes")
}
events.Debug("EOF. Waiting for more content...")
select {
case <-time.After(time.Second):
events.Debug("Manually checking log")
Expand All @@ -211,7 +208,6 @@ func (c *fileChangelog) read(ctx context.Context, fsNotifyCh chan fsnotify.Event
case event := <-fsNotifyCh:
switch event.Op {
case fsnotify.Write:
events.Debug("Update detected")
continue
case fsnotify.Create:
events.Debug("New changelog created. Consuming the rest of current one...")
Expand Down
52 changes: 50 additions & 2 deletions pkg/event/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package event

import (
"context"
"strings"

"github.com/segmentio/errors-go"
)
Expand All @@ -16,7 +17,12 @@ type (
cancelFunc context.CancelFunc // used to shut down the changelog
previous *Event // the previous event we read
}
IteratorOpt func(i *Iterator)
IteratorOpt func(i *Iterator)
FilteredIterator struct {
iterator *Iterator
family string
table string
}
)

var (
Expand Down Expand Up @@ -69,10 +75,52 @@ func (i *Iterator) Next(ctx context.Context) (event Event, err error) {
return event, ErrOutOfSync
}
}
return
return event, err
}

func (i *Iterator) Close() error {
i.cancelFunc() // shut down the changelog
return nil
}

// NewFilteredIterator returns a new iterator that looks for changes to the specified family
// and table in the background and then exposes those changes through the Next method.
// Make sure to Close() the iterator when you are done using it.
//
// If ErrOutOfSync is returned, that means that the iterator likely could not keep
// up with the changelog. Please invalidate any caches dependent on this iterator.
//
// If a different error is returned, it's not really known at this time the best way
// to deal with it. It's possible that it could be a change in the changelog json
// schema, or something more temporary. Best response for now will be to log and instrument
// the error, and then just invalidate the cache the same way you would with ErrOutOfSync.
// As time goes on, we'll know a little bit better how to operate this under real-world
// conditions.
func NewFilteredIterator(ctx context.Context, changelogPath, family, table string, opts IteratorOpt) (*FilteredIterator, error) {
iter, err := NewIterator(ctx, changelogPath, opts)
if err != nil {
return nil, err
}
fi := &FilteredIterator{
iterator: iter,
family: family,
table: table,
}
return fi, nil
}

// Next blocks and returns the next event that matches the specified family and table
func (i *FilteredIterator) Next(ctx context.Context) (event Event, err error) {
for {
event, err = i.iterator.Next(ctx)
if err != nil ||
(strings.EqualFold(event.RowUpdate.FamilyName, i.family) && strings.EqualFold(event.RowUpdate.TableName, i.table)) {
break
}
}
return event, err
}

func (i *FilteredIterator) Close() error {
return i.iterator.Close()
}
65 changes: 65 additions & 0 deletions pkg/event/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,48 @@ func TestIterator(t *testing.T) {
}
}

func TestFilteredIterator(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

const numEvents = 10

var fam, tbl string
fam = "numbers"
changelog := &fakeChangelog{}
for i := 0; i < numEvents; i++ {
if i%2 == 0 {
tbl = "even"
} else {
tbl = "odd"
}
changelog.ers = append(changelog.ers, eventErr{
event: Event{
Sequence: int64(i),
RowUpdate: RowUpdate{
FamilyName: fam,
TableName: tbl,
Keys: nil,
},
},
})
}

iter, err := NewFilteredIterator(ctx, "test file", "numbers", "even", func(i *Iterator) {
i.changelog = changelog
})
require.NoError(t, err)
defer func() {
err := iter.Close()
require.NoError(t, err)
}()
for i := 0; i < numEvents/2; i++ {
event, err := iter.Next(ctx)
require.NoError(t, err)
require.EqualValues(t, i*2, event.Sequence)
}
}

func TestIteratorFailedChangelogStart(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand Down Expand Up @@ -84,5 +126,28 @@ func TestIteratorSkippedEvent(t *testing.T) {
event, err = iter.Next(ctx)
require.NoError(t, err)
require.EqualValues(t, 4, event.Sequence)
}

func TestFilteredIteratorSkippedEvent(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

iter, err := NewFilteredIterator(ctx, "test file", "foo", "bar", func(i *Iterator) {
i.changelog = &fakeChangelog{
ers: []eventErr{
{event: Event{Sequence: 0}},
{event: Event{Sequence: 3}},
{event: Event{Sequence: 4}},
},
}
})
require.NoError(t, err)
defer func() {
err := iter.Close()
require.NoError(t, err)
}()
// even if fam/tbl filter does not match we need to return errors
event, err := iter.Next(ctx)
require.EqualValues(t, 3, event.Sequence)
require.EqualError(t, err, "out of sync with changelog. invalidate caches please.")
}

0 comments on commit b94e5c2

Please sign in to comment.