Skip to content

Commit

Permalink
Add SQLite adapter (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
krzysztofreczek authored Dec 8, 2022
1 parent 2c9c766 commit 8fbb493
Show file tree
Hide file tree
Showing 9 changed files with 241 additions and 62 deletions.
78 changes: 60 additions & 18 deletions example/storage/eventstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"context"
"database/sql"
"fmt"
"os"
"testing"

"github.com/google/uuid"
_ "github.com/lib/pq"
_ "github.com/mattn/go-sqlite3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand All @@ -30,18 +32,9 @@ var (
}
)

const (
host = "localhost"
port = 5432
user = "postgres"
password = "password"
dbname = "postgres"
)

func TestPostcard_Repositories(t *testing.T) {
conn := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", host, port, user, password, dbname)
db, err := sql.Open("postgres", conn)
require.NoError(t, err)
postgresDB := testPostgresDB(t)
sqliteDB := testSQLiteDB(t)

testCases := []struct {
name string
Expand All @@ -54,47 +47,63 @@ func TestPostcard_Repositories(t *testing.T) {
{
name: "postgres_simple",
repository: func() eventstore.EventStore[*postcard.Postcard] {
repo, err := storage.NewDefaultSimplePostcardRepository(context.Background(), db)
repo, err := storage.NewDefaultSimplePostcardRepository(context.Background(), postgresDB)
require.NoError(t, err)
return repo
}(),
},
{
name: "postgres_simple_custom",
repository: func() eventstore.EventStore[*postcard.Postcard] {
repo, err := storage.NewCustomSimplePostcardRepository(context.Background(), db)
repo, err := storage.NewCustomSimplePostcardRepository(context.Background(), postgresDB)
require.NoError(t, err)
return repo
}(),
},
{
name: "postgres_simple_anonymized",
repository: func() eventstore.EventStore[*postcard.Postcard] {
repo, err := storage.NewSimpleAnonymizingPostcardRepository(context.Background(), db)
repo, err := storage.NewSimpleAnonymizingPostcardRepository(context.Background(), postgresDB)
require.NoError(t, err)
return repo
}(),
},
{
name: "postgres_mapping",
repository: func() eventstore.EventStore[*postcard.Postcard] {
repo, err := storage.NewDefaultMappingPostgresRepository(context.Background(), db)
repo, err := storage.NewDefaultMappingPostgresRepository(context.Background(), postgresDB)
require.NoError(t, err)
return repo
}(),
},
{
name: "postgres_mapping",
name: "postgres_mapping_custom",
repository: func() eventstore.EventStore[*postcard.Postcard] {
repo, err := storage.NewCustomMappingPostcardRepository(context.Background(), db)
repo, err := storage.NewCustomMappingPostcardRepository(context.Background(), postgresDB)
require.NoError(t, err)
return repo
}(),
},
{
name: "postgres_mapping_anonymized",
repository: func() eventstore.EventStore[*postcard.Postcard] {
repo, err := storage.NewMappingAnonymizingPostcardRepository(context.Background(), db)
repo, err := storage.NewMappingAnonymizingPostcardRepository(context.Background(), postgresDB)
require.NoError(t, err)
return repo
}(),
},
{
name: "sqlite_simple",
repository: func() eventstore.EventStore[*postcard.Postcard] {
repo, err := storage.NewSimpleSQLitePostcardRepository(context.Background(), sqliteDB)
require.NoError(t, err)
return repo
}(),
},
{
name: "sqlite_mapping",
repository: func() eventstore.EventStore[*postcard.Postcard] {
repo, err := storage.NewMappingSQLitePostcardRepository(context.Background(), sqliteDB)
require.NoError(t, err)
return repo
}(),
Expand Down Expand Up @@ -160,3 +169,36 @@ func TestPostcard_Repositories(t *testing.T) {
})
}
}

const (
host = "localhost"
port = 5432
user = "postgres"
password = "password"
dbname = "postgres"
)

func testPostgresDB(t *testing.T) *sql.DB {
conn := fmt.Sprintf(
"host=%s port=%d user=%s password=%s dbname=%s sslmode=disable",
host,
port,
user,
password,
dbname,
)
postgresDB, err := sql.Open("postgres", conn)
require.NoError(t, err)

return postgresDB
}

func testSQLiteDB(t *testing.T) *sql.DB {
dbFile, err := os.CreateTemp("", "tmp_*.db")
require.NoError(t, err)

sqliteDB, err := sql.Open("sqlite3", dbFile.Name())
require.NoError(t, err)

return sqliteDB
}
15 changes: 15 additions & 0 deletions example/storage/mapping_postgres_eventstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,21 @@ func NewMappingAnonymizingPostcardRepository(ctx context.Context, db *sql.DB) (e
)
}

func NewMappingSQLitePostcardRepository(ctx context.Context, db *sql.DB) (eventstore.EventStore[*postcard.Postcard], error) {
return eventstore.NewSQLStore[*postcard.Postcard](
ctx,
db,
eventstore.NewMappingSQLiteConfig[*postcard.Postcard](
[]transport.EventMapper[*postcard.Postcard]{
CreatedMapper{},
AddressedMapper{},
WrittenMapper{},
SentMapper{},
},
),
)
}

type Created struct {
ID string `json:"id"`
}
Expand Down
15 changes: 15 additions & 0 deletions example/storage/simple_postgres_eventstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,21 @@ func NewSimpleAnonymizingPostcardRepository(ctx context.Context, db *sql.DB) (ev
)
}

func NewSimpleSQLitePostcardRepository(ctx context.Context, db *sql.DB) (eventstore.EventStore[*postcard.Postcard], error) {
return eventstore.NewSQLStore[*postcard.Postcard](
ctx,
db,
eventstore.NewSQLiteConfig[*postcard.Postcard](
[]aggregate.Event[*postcard.Postcard]{
postcard.Created{},
postcard.Addressed{},
postcard.Written{},
postcard.Sent{},
},
),
)
}

type ConstantSecretProvider struct{}

func (c ConstantSecretProvider) SecretForKey(aggregateID aggregate.ID) ([]byte, error) {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ go 1.18
require (
github.com/google/uuid v1.3.0
github.com/lib/pq v1.10.6
github.com/mattn/go-sqlite3 v1.14.16
github.com/stretchr/testify v1.7.0
)

require (
github.com/davecgh/go-spew v1.1.0 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/lib/pq v1.10.6 h1:jbk+ZieJ0D7EVGJYpL9QTz7/YW6UHbmdnZWYyK5cdBs=
github.com/lib/pq v1.10.6/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/mattn/go-sqlite3 v1.14.16 h1:yOQRA0RpS5PFz/oikGwBEqvAWhWg5ufRz4ETLjwpU1Y=
github.com/mattn/go-sqlite3 v1.14.16/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
18 changes: 18 additions & 0 deletions pkg/eventstore/sql_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,21 @@ func NewMappingPostgresSQLConfig[T any](
Serializer: transport.NewMappingSerializer[T](transport.JSONMarshaler{}, eventMappers),
}
}

func NewSQLiteConfig[T any](
supportedEvents []aggregate.Event[T],
) SQLConfig[T] {
return SQLConfig[T]{
SchemaAdapter: NewSQLiteSchemaAdapter[T](""),
Serializer: transport.NewSimpleSerializer(transport.JSONMarshaler{}, supportedEvents),
}
}

func NewMappingSQLiteConfig[T any](
eventMappers []transport.EventMapper[T],
) SQLConfig[T] {
return SQLConfig[T]{
SchemaAdapter: NewSQLiteSchemaAdapter[T](""),
Serializer: transport.NewMappingSerializer[T](transport.JSONMarshaler{}, eventMappers),
}
}
52 changes: 52 additions & 0 deletions pkg/eventstore/sql_schema_adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package eventstore

import (
"fmt"
"strings"
)

const (
defaultEventsTableName = "events"
defaultSelectQuery = `
SELECT
aggregate_id,
aggregate_version,
event_name,
event_payload
FROM %s
WHERE aggregate_id = $1 AND aggregate_type = $2
ORDER BY aggregate_version ASC;
`
defaultInsertQuery = `
INSERT INTO %s (
aggregate_id,
aggregate_version,
aggregate_type,
event_name,
event_payload
)
VALUES %s
`
defaultInsertMarkersCount = 5
defaultInsertMarkersPattern = "($%d,$%d,$%d,$%d,$%d),"
)

func defaultInsertMarkers(count int) string {
result := strings.Builder{}

var indices []any
for i := 1; i <= count*defaultInsertMarkersCount; i++ {
indices = append(indices, i)
if i%defaultInsertMarkersCount == 0 {
result.WriteString(
fmt.Sprintf(
defaultInsertMarkersPattern,
indices...,
),
)
indices = nil
}
}

return strings.TrimRight(result.String(), ",")
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,9 @@ package eventstore

import (
"fmt"
"strings"
)

const eventsTableName = "events"

type PostgresSchemaAdapter[A any] struct {
aggregateType string
}

func NewPostgresSchemaAdapter[A any](aggregateType string) PostgresSchemaAdapter[A] {
return PostgresSchemaAdapter[A]{
aggregateType: aggregateType,
}
}

func (a PostgresSchemaAdapter[A]) InitializeSchemaQuery() string {
query := `
const postgresInitializeSchemaQuery = `
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE TABLE IF NOT EXISTS %[1]s (
id serial NOT NULL PRIMARY KEY,
Expand All @@ -32,19 +18,23 @@ CREATE TABLE IF NOT EXISTS %[1]s (
CREATE INDEX IF NOT EXISTS idx_aggregate_id ON %[1]s (aggregate_id);
CREATE UNIQUE INDEX IF NOT EXISTS idx_aggregate_id_version ON %[1]s (aggregate_id, aggregate_version);
`
return fmt.Sprintf(query, eventsTableName)

type PostgresSchemaAdapter[A any] struct {
aggregateType string
}

func (a PostgresSchemaAdapter[A]) SelectQuery(aggregateID string) (string, []any, error) {
query := `
SELECT
aggregate_id, aggregate_version, event_name, event_payload
FROM "%s"
WHERE aggregate_id = $1 AND aggregate_type = $2
ORDER BY aggregate_version ASC;
`
func NewPostgresSchemaAdapter[A any](aggregateType string) PostgresSchemaAdapter[A] {
return PostgresSchemaAdapter[A]{
aggregateType: aggregateType,
}
}

func (a PostgresSchemaAdapter[A]) InitializeSchemaQuery() string {
return fmt.Sprintf(postgresInitializeSchemaQuery, defaultEventsTableName)
}

query = fmt.Sprintf(query, eventsTableName)
func (a PostgresSchemaAdapter[A]) SelectQuery(aggregateID string) (string, []any, error) {
query := fmt.Sprintf(defaultSelectQuery, defaultEventsTableName)

args := []any{
aggregateID, a.aggregateType,
Expand All @@ -54,14 +44,9 @@ ORDER BY aggregate_version ASC;
}

func (a PostgresSchemaAdapter[A]) InsertQuery(events []storageEvent[A]) (string, []any, error) {
query := `
INSERT INTO %s (aggregate_id, aggregate_version, aggregate_type, event_name, event_payload)
VALUES %s`

query = fmt.Sprintf(query, eventsTableName, defaultInsertMarkers(len(events)))
query := fmt.Sprintf(defaultInsertQuery, defaultEventsTableName, defaultInsertMarkers(len(events)))

var args []any

for _, e := range events {
args = append(
args,
Expand All @@ -76,15 +61,3 @@ VALUES %s`
return query, args, nil

}

func defaultInsertMarkers(count int) string {
result := strings.Builder{}

index := 1
for i := 0; i < count; i++ {
result.WriteString(fmt.Sprintf("($%d,$%d,$%d,$%d,$%d),", index, index+1, index+2, index+3, index+4))
index += 5
}

return strings.TrimRight(result.String(), ",")
}
Loading

0 comments on commit 8fbb493

Please sign in to comment.