diff --git a/_examples/counter/counter.go b/_examples/counter/counter.go new file mode 100644 index 0000000..d3e956a --- /dev/null +++ b/_examples/counter/counter.go @@ -0,0 +1,61 @@ +package counter + +import ( + "github.com/ThreeDotsLabs/esja" +) + +type Counter struct { + stream *esja.Stream[Counter] + + id string + currentValue int +} + +func NewCounter(id string) (*Counter, error) { + s, err := esja.NewStreamWithType[Counter](id, "Counter") + if err != nil { + return nil, err + } + + p := &Counter{ + stream: s, + } + + err = p.stream.Record(p, Created{ + ID: id, + }) + if err != nil { + return nil, err + } + + return p, nil +} + +func (c Counter) Stream() *esja.Stream[Counter] { + return c.stream +} + +func (c Counter) NewWithStream(s *esja.Stream[Counter]) *Counter { + return &Counter{stream: s} +} + +func (c Counter) Snapshot() esja.Snapshot[Counter] { + return Snapshot{ + ID: c.id, + CurrentValue: c.currentValue, + } +} + +func (c Counter) ID() string { + return c.id +} + +func (c *Counter) CurrentValue() int { + return c.currentValue +} + +func (c *Counter) IncrementBy(v int) error { + return c.stream.Record(c, IncrementedBy{ + Value: v, + }) +} diff --git a/_examples/counter/counter_test.go b/_examples/counter/counter_test.go new file mode 100644 index 0000000..9f31863 --- /dev/null +++ b/_examples/counter/counter_test.go @@ -0,0 +1,28 @@ +package counter_test + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "counter" +) + +func TestCounter(t *testing.T) { + c, err := counter.NewCounter("ID") + require.NoError(t, err) + + require.Equal(t, 0, c.CurrentValue()) + + err = c.IncrementBy(10) + require.NoError(t, err) + + require.Equal(t, 10, c.CurrentValue()) + + err = c.IncrementBy(20) + require.NoError(t, err) + err = c.IncrementBy(10) + require.NoError(t, err) + + require.Equal(t, 40, c.CurrentValue()) +} diff --git a/_examples/counter/events.go b/_examples/counter/events.go new file mode 100644 index 0000000..84e249e --- /dev/null +++ b/_examples/counter/events.go @@ -0,0 +1,27 @@ +package counter + +type Created struct { + ID string +} + +func (Created) EventName() string { + return "Created_v1" +} + +func (e Created) ApplyTo(c *Counter) error { + c.id = e.ID + return nil +} + +type IncrementedBy struct { + Value int +} + +func (IncrementedBy) EventName() string { + return "IncrementedBy_v1" +} + +func (e IncrementedBy) ApplyTo(c *Counter) error { + c.currentValue += e.Value + return nil +} diff --git a/_examples/counter/go.mod b/_examples/counter/go.mod new file mode 100644 index 0000000..227e4d8 --- /dev/null +++ b/_examples/counter/go.mod @@ -0,0 +1,17 @@ +module counter + +go 1.18 + +require ( + github.com/ThreeDotsLabs/esja v0.0.0-20221208191400-8fbb493947e7 + github.com/google/uuid v1.3.0 + github.com/stretchr/testify v1.8.1 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) + +replace github.com/ThreeDotsLabs/esja => ../../ diff --git a/_examples/counter/go.sum b/_examples/counter/go.sum new file mode 100644 index 0000000..629bca6 --- /dev/null +++ b/_examples/counter/go.sum @@ -0,0 +1,19 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/_examples/counter/snapshot.go b/_examples/counter/snapshot.go new file mode 100644 index 0000000..3da4e57 --- /dev/null +++ b/_examples/counter/snapshot.go @@ -0,0 +1,16 @@ +package counter + +type Snapshot struct { + ID string + CurrentValue int +} + +func (s Snapshot) SnapshotName() string { + return "CounterSnapshot_v1" +} + +func (s Snapshot) ApplyTo(c *Counter) error { + c.id = s.ID + c.currentValue = s.CurrentValue + return nil +} diff --git a/_examples/counter/storage/eventstore_test.go b/_examples/counter/storage/eventstore_test.go new file mode 100644 index 0000000..f536fa0 --- /dev/null +++ b/_examples/counter/storage/eventstore_test.go @@ -0,0 +1,67 @@ +package storage_test + +import ( + "context" + "testing" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/ThreeDotsLabs/esja/eventstore" + + "counter" +) + +func TestCounter_Repositories(t *testing.T) { + testCases := []struct { + name string + repository eventstore.EventStore[counter.Counter] + }{ + { + name: "in_memory", + repository: eventstore.NewInMemoryStore[counter.Counter](eventstore.InMemoryStoreConfig{MakeSnapshotEveryNVersions: 100}), + }, + } + + ctx := context.Background() + for i := range testCases { + tc := testCases[i] + t.Run(tc.name, func(t *testing.T) { + id := uuid.NewString() + + c, err := counter.NewCounter(id) + assert.NoError(t, err) + assert.Equal(t, id, c.ID()) + assert.Equal(t, 0, c.CurrentValue()) + + _, err = tc.repository.Load(ctx, id) + assert.ErrorIs(t, err, eventstore.ErrEntityNotFound) + + err = tc.repository.Save(ctx, c) + require.NoError(t, err) + + fromRepo, err := tc.repository.Load(ctx, id) + assert.NoError(t, err) + assert.Equal(t, c.ID(), fromRepo.ID()) + assert.Equal(t, c.CurrentValue(), fromRepo.CurrentValue()) + + incrementFor := 300 + for i := 0; i < incrementFor; i++ { + c, err = tc.repository.Load(ctx, id) + require.NoError(t, err) + + err = c.IncrementBy(1) + require.NoError(t, err) + + err = tc.repository.Save(ctx, c) + require.NoError(t, err) + } + + fromRepo, err = tc.repository.Load(ctx, id) + assert.NoError(t, err) + assert.Equal(t, c.ID(), fromRepo.ID()) + assert.Equal(t, incrementFor, fromRepo.CurrentValue()) + }) + } +} diff --git a/_examples/postcard/go.mod b/_examples/postcard/go.mod index 33f18a3..e861062 100644 --- a/_examples/postcard/go.mod +++ b/_examples/postcard/go.mod @@ -4,6 +4,7 @@ go 1.18 require ( github.com/ThreeDotsLabs/esja v0.0.0-20221208191400-8fbb493947e7 + github.com/ThreeDotsLabs/pii v0.0.0-20230103125711-e0908da9a963 github.com/google/uuid v1.3.0 github.com/lib/pq v1.10.6 github.com/mattn/go-sqlite3 v1.14.16 @@ -11,7 +12,6 @@ require ( ) require ( - github.com/ThreeDotsLabs/pii v0.0.0-20230103125711-e0908da9a963 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/_examples/postcard/go.sum b/_examples/postcard/go.sum index acabb5d..8b99bf7 100644 --- a/_examples/postcard/go.sum +++ b/_examples/postcard/go.sum @@ -1,5 +1,3 @@ -github.com/ThreeDotsLabs/pii v0.0.0-20221221144555-f2186024b30d h1:bZBc4vDne17OYqoeSceaq5eaZM6vOUMVaJIBy+dNRH4= -github.com/ThreeDotsLabs/pii v0.0.0-20221221144555-f2186024b30d/go.mod h1:wu5cEZEjFUIXR9hdniDvGbbZARrYHTRi6G2bNaSCC/E= github.com/ThreeDotsLabs/pii v0.0.0-20230103125711-e0908da9a963 h1:4EQlsCpfwxjn5ijR8fdL6ap1q04guWUCHgnZ+jPdEjY= github.com/ThreeDotsLabs/pii v0.0.0-20230103125711-e0908da9a963/go.mod h1:wu5cEZEjFUIXR9hdniDvGbbZARrYHTRi6G2bNaSCC/E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/_examples/postcard/postcard.go b/_examples/postcard/postcard.go index fbe2e0c..dca6258 100644 --- a/_examples/postcard/postcard.go +++ b/_examples/postcard/postcard.go @@ -44,14 +44,6 @@ func NewPostcard(id string) (*Postcard, error) { return p, nil } -func (p *Postcard) Send() error { - if p.sent { - return fmt.Errorf("postcard already sent") - } - - return p.stream.Record(p, Sent{}) -} - func (p Postcard) Stream() *esja.Stream[Postcard] { return p.stream } @@ -80,6 +72,14 @@ func (p Postcard) Sent() bool { return p.sent } +func (p *Postcard) Send() error { + if p.sent { + return fmt.Errorf("postcard already sent") + } + + return p.stream.Record(p, Sent{}) +} + func (p *Postcard) Write(content string) error { return p.stream.Record(p, Written{ Content: content, diff --git a/_examples/postcard/storage/eventstore_test.go b/_examples/postcard/storage/eventstore_test.go index f09e04e..c0d5293 100644 --- a/_examples/postcard/storage/eventstore_test.go +++ b/_examples/postcard/storage/eventstore_test.go @@ -12,11 +12,11 @@ import ( _ "github.com/mattn/go-sqlite3" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "postcard/storage" "github.com/ThreeDotsLabs/esja/eventstore" "postcard" - "postcard/storage" ) var ( @@ -42,7 +42,7 @@ func TestPostcard_Repositories(t *testing.T) { }{ { name: "in_memory", - repository: eventstore.NewInMemoryStore[postcard.Postcard](), + repository: eventstore.NewInMemoryStore[postcard.Postcard](eventstore.InMemoryStoreConfig{}), }, { name: "postgres_simple", diff --git a/entity.go b/entity.go index 6c69039..5867650 100644 --- a/entity.go +++ b/entity.go @@ -1,5 +1,7 @@ package esja +import "fmt" + // Entity represents the event-sourced type saved and loaded by the event store. // In DDD terms, it is the "aggregate root". // @@ -32,21 +34,70 @@ type Entity[T any] interface { NewWithStream(*Stream[T]) *T } +// NewEntityWithSnapshot instantiates a new T with the given snapshot and events applied to it. +// At the same time the entity's internal Stream is initialised, +// so it can record new upcoming events. +func NewEntityWithSnapshot[T Entity[T]]( + id string, + snapshot VersionedSnapshot[T], + events []VersionedEvent[T], +) (*T, error) { + var t T + + stream, err := NewStream[T](id) + if err != nil { + return nil, err + } + + stream.queue = events + stream.version = snapshot.StreamVersion + if len(events) != 0 { + stream.version = events[len(events)-1].StreamVersion + } + + target := t.NewWithStream(stream) + + err = snapshot.ApplyTo(target) + if err != nil { + return nil, err + } + + events = stream.PopEvents() + for _, e := range events { + err := e.ApplyTo(target) + if err != nil { + return nil, err + } + } + + return target, nil +} + // NewEntity instantiates a new T with the given events applied to it. // At the same time the entity's internal Stream is initialised, -// so it can record new upcoming stream. -func NewEntity[T Entity[T]](id string, eventsSlice []VersionedEvent[T]) (*T, error) { +// so it can record new upcoming events. +func NewEntity[T Entity[T]]( + id string, + events []VersionedEvent[T], +) (*T, error) { + if len(events) == 0 { + return nil, fmt.Errorf("no stream to load") + } + var t T - stream, err := newStream(id, eventsSlice) + stream, err := NewStream[T](id) if err != nil { return nil, err } - eventsSlice = stream.PopEvents() + stream.queue = events + stream.version = events[len(events)-1].StreamVersion target := t.NewWithStream(stream) - for _, e := range eventsSlice { + + events = stream.PopEvents() + for _, e := range events { err := e.ApplyTo(target) if err != nil { return nil, err diff --git a/event.go b/event.go index 5d25d78..239dafc 100644 --- a/event.go +++ b/event.go @@ -1,6 +1,10 @@ package esja +// Event is a simple Entity event model type Event[T any] interface { + // ApplyTo applies the event to the entity. + ApplyTo(*T) error + // EventName should identify the event and the version of its schema. // // Example: @@ -9,9 +13,6 @@ type Event[T any] interface { // return "FooCreated_v1" // } EventName() string - - // ApplyTo applies the event to the entity. - ApplyTo(*T) error } // VersionedEvent is an event with a corresponding stream version. diff --git a/eventstore/inmemory.go b/eventstore/inmemory.go index a35c05f..5c3efcf 100644 --- a/eventstore/inmemory.go +++ b/eventstore/inmemory.go @@ -8,15 +8,26 @@ import ( "github.com/ThreeDotsLabs/esja" ) +type InMemoryStoreConfig struct { + // MakeSnapshotEveryNVersions configures a frequency of snapshot creation + // Once the current event version and last snapshot version difference exceeds the value, + // a new snapshot will be created with a version of the current event version. + MakeSnapshotEveryNVersions int +} + type InMemoryStore[T esja.Entity[T]] struct { - lock sync.RWMutex - events map[string][]esja.VersionedEvent[T] + lock sync.RWMutex + events map[string][]esja.VersionedEvent[T] + snapshots map[string][]esja.VersionedSnapshot[T] + config InMemoryStoreConfig } -func NewInMemoryStore[T esja.Entity[T]]() *InMemoryStore[T] { +func NewInMemoryStore[T esja.Entity[T]](config InMemoryStoreConfig) *InMemoryStore[T] { return &InMemoryStore[T]{ - lock: sync.RWMutex{}, - events: map[string][]esja.VersionedEvent[T]{}, + lock: sync.RWMutex{}, + events: map[string][]esja.VersionedEvent[T]{}, + snapshots: map[string][]esja.VersionedSnapshot[T]{}, + config: config, } } @@ -24,12 +35,27 @@ func (i *InMemoryStore[T]) Load(_ context.Context, id string) (*T, error) { i.lock.RLock() defer i.lock.RUnlock() + // In the other databases this could be optimized + // as we do not need to load events of version lower than the lastSnapshot version. events, ok := i.events[id] if !ok { return nil, ErrEntityNotFound } - return esja.NewEntity(id, events) + var eventsToApply []esja.VersionedEvent[T] + + s, found := i.loadLastSnapshot(id) + if !found { + return esja.NewEntity(id, events) + } + + for _, e := range events { + if e.StreamVersion > s.StreamVersion { + eventsToApply = append(eventsToApply, e) + } + } + + return esja.NewEntityWithSnapshot(id, s, eventsToApply) } func (i *InMemoryStore[T]) Save(_ context.Context, t *T) error { @@ -40,25 +66,112 @@ func (i *InMemoryStore[T]) Save(_ context.Context, t *T) error { return errors.New("target to save must not be nil") } - stm := *t + entity := *t + currentVersion, err := i.storeEntityEvents(entity) + if err != nil { + return err + } + + entityWithSnapshots, ok := supportsSnapshots(t) + if !ok { + return nil + } + + err = i.storeEntitySnapshot(entityWithSnapshots, currentVersion) + if err != nil { + return err + } + + return nil +} + +func (i *InMemoryStore[T]) loadLastSnapshot(id string) (esja.VersionedSnapshot[T], bool) { + snapshots, found := i.snapshots[id] + var lastSnapshot esja.VersionedSnapshot[T] + for _, s := range snapshots { + if s.StreamVersion >= lastSnapshot.StreamVersion { + lastSnapshot = s + } + } + return lastSnapshot, found +} - events := stm.Stream().PopEvents() +func (i *InMemoryStore[T]) storeEntityEvents(entity T) (int, error) { + events := entity.Stream().PopEvents() if len(events) == 0 { - return errors.New("no events to save") - } - - if priorEvents, ok := i.events[stm.Stream().ID()]; !ok { - i.events[stm.Stream().ID()] = events - } else { - for _, event := range events { - if len(priorEvents) > 0 { - if priorEvents[len(priorEvents)-1].StreamVersion >= event.StreamVersion { - return errors.New("stream version duplicate") - } - } - i.events[stm.Stream().ID()] = append(i.events[stm.Stream().ID()], event) + return 0, errors.New("no events to save") + } + + priorEvents, ok := i.events[entity.Stream().ID()] + if !ok { + i.events[entity.Stream().ID()] = make([]esja.VersionedEvent[T], 0) + } + + lastVersion := 0 + for _, e := range priorEvents { + if e.StreamVersion > lastVersion { + lastVersion = e.StreamVersion + } + } + + currentVersion := lastVersion + for _, e := range events { + if e.StreamVersion <= currentVersion { + return 0, errors.New("stream version duplicate") + } + if e.StreamVersion > currentVersion { + currentVersion = e.StreamVersion } + + i.events[entity.Stream().ID()] = append( + i.events[entity.Stream().ID()], + e, + ) + } + + return currentVersion, nil +} + +func (i *InMemoryStore[T]) storeEntitySnapshot( + entity esja.EntityWithSnapshots[T], + currentVersion int, +) error { + if i.config.MakeSnapshotEveryNVersions <= 0 { + return nil } + lastSnapshot, found := i.loadLastSnapshot(entity.Stream().ID()) + lastSnapshotVersion := 0 + if found { + lastSnapshotVersion = lastSnapshot.StreamVersion + } + + if currentVersion-lastSnapshotVersion < i.config.MakeSnapshotEveryNVersions { + return nil + } + + snapshot := entity.Snapshot() + snapshotVersioned := esja.VersionedSnapshot[T]{ + Snapshot: snapshot, + StreamVersion: currentVersion, + } + + _, ok := i.snapshots[entity.Stream().ID()] + if !ok { + i.snapshots[entity.Stream().ID()] = make([]esja.VersionedSnapshot[T], 0) + } + + i.snapshots[entity.Stream().ID()] = append( + i.snapshots[entity.Stream().ID()], + snapshotVersioned, + ) + return nil } + +func supportsSnapshots[T esja.Entity[T]](t *T) (esja.EntityWithSnapshots[T], bool) { + var entity interface{} + entity = *t + entityWithSnapshots, ok := entity.(esja.EntityWithSnapshots[T]) + return entityWithSnapshots, ok +} diff --git a/snapshot.go b/snapshot.go new file mode 100644 index 0000000..62a8cb1 --- /dev/null +++ b/snapshot.go @@ -0,0 +1,26 @@ +package esja + +// Snapshot is an Event that stores and applies the current state back to the Entity. +type Snapshot[T any] interface { + // ApplyTo applies the snapshot to the entity. + ApplyTo(*T) error + + // SnapshotName should identify the snapshot and the version of its schema. + SnapshotName() string +} + +// VersionedSnapshot is a snapshot with a corresponding current stream version. +type VersionedSnapshot[T any] struct { + Snapshot[T] + StreamVersion int +} + +// EntityWithSnapshots is an optional extension to the Entity interface. +// When implemented it informs that Entity supports snapshots +// and those should be created in the event store when applicable. +type EntityWithSnapshots[T any] interface { + Entity[T] + + // Snapshot returns a Snapshot representing current the state of the Entity. + Snapshot() Snapshot[T] +} diff --git a/stream.go b/stream.go index 265d435..31ac097 100644 --- a/stream.go +++ b/stream.go @@ -2,7 +2,6 @@ package esja import ( "errors" - "fmt" ) // Stream represents a queue of events and basic stream properties. @@ -74,18 +73,3 @@ func (s *Stream[T]) PopEvents() []VersionedEvent[T] { func (s *Stream[T]) HasEvents() bool { return len(s.queue) > 0 } - -func newStream[T any](id string, events []VersionedEvent[T]) (*Stream[T], error) { - if len(events) == 0 { - return nil, fmt.Errorf("no stream to load") - } - - e, err := NewStream[T](id) - if err != nil { - return nil, err - } - e.version = events[len(events)-1].StreamVersion - e.queue = events - - return e, nil -}