Skip to content

Commit

Permalink
Sync read/write api (#2)
Browse files Browse the repository at this point in the history
* hide channels manipulations from pool, make api synced

* suppress linter

* add cursor tests

* add comments
  • Loading branch information
umputun authored Oct 9, 2020
1 parent 507090e commit 0b3b717
Show file tree
Hide file tree
Showing 6 changed files with 287 additions and 139 deletions.
46 changes: 26 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,11 @@ func ExampleFlow_parallel() {

## Details about `pool` package

- In addition to the default "run a func in multiple goroutines" mode, it also provides an optional support of chunked workers. It means - each key, detected by user-provide func guaranteed to be processed by the same worker. Such mode needed for stateful flows where each set of input records has to be processed sequentially and some state should be kept.
- another thing `pool` provides is a batch size. This one is a simple performance optimization keeping input request into a buffer and send them to worker channel in batches (slices) instead of per-submit call
Pool package provides thin implementation of workers pool. In addition to the default "run a func in multiple goroutines" mode,
it also provides an optional support of chunked workers. In this mode each key, detected by user-provide func, guaranteed to be
processed by the same worker. Such mode needed for stateful flows where each set of input records has to be processed sequentially
and some state should be kept. Another thing `pool` allows to define is the batch size. This one is a simple performance optimization
collecting input request into a buffer and send them to worker channel in batches (slices) instead of per-submit call.

Options:

Expand All @@ -212,16 +215,17 @@ Options:
- `ChanResSize` sets the size of output buffered channel (default 1)
- `ChanWorkerSize` sets the size of workers buffered channel (default 1)
- `ContinueOnError` allows workers continuation after error occurred
- `OnCompletion` sets callback for each worker called on successful completion
- `OnCompletion` sets callback (for each worker) called on successful completion

### worker function

Worker function passed by user and will run in multiple workers (goroutines).
This is the function: `type workerFn func(ctx context.Context, inp interface{}, resCh interface{}, store WorkerStore} error`
Worker function passed by user and runs in multiple workers (goroutines) concurrently.
This is the function: `type workerFn func(ctx context.Context, inp interface{}, sender SenderFn, store WorkerStore} error`

It takes `inp` parameter, does the job and optionally send result(s) to `resCh`. Error will terminate all workers.
Note: `workerFn` can be stateful, collect anything it needs and sends 0 or more results. Results wrapped in `Response` struct
allowing to communicate error code back to consumer. `workerFn` doesn't need to send errors, enough just return non-nil error.
It takes `inp` parameter, does the job and optionally send result(s) with `SenderFn` to the common results channel.
Error will terminate all workers unless `ContinueOnError` set.

Note: `workerFn` can be stateful, collect anything it needs and sends 0 or more results by calling `SenderFn` one or more times.

### worker store

Expand All @@ -245,7 +249,7 @@ _alternatively state can be kept outside of workers as a slice of values and acc
### usage

```go
p := pool.New(8, func(ctx context.Context, v interface{}, resCh interface{}, ws pool.WorkerStore} error {
p := pool.New(8, func(ctx context.Context, v interface{}, sendFn pool.Sender, ws pool.WorkerStore} error {
// worker function gets input v processes it and response(s) channel to send results

input, ok := v.(string) // in this case it gets string as input
Expand All @@ -256,15 +260,15 @@ _alternatively state can be kept outside of workers as a slice of values and acc
// ...

v := ws.GetInt("something") // access thread-local var
resCh <- pool.Response{Data: "foo"}
resCh <- pool.Response{Data: "bar"}

sendFn("foo", nil) // send "foo" and nil error
sendFn("bar", nil) // send "foo" and nil error
pool.Metrics(ctx).Inc("counter")
ws.Set("something", 1234) // keep thread-local things
return "something", true, nil
})

ch := p.Go(context.TODO()) // start all workers in 8 goroutines
cursor, err := p.Go(context.TODO()) // start all workers in 8 goroutines and get back result's cursor

// submit values (consumer side)
go func() {
Expand All @@ -273,15 +277,17 @@ _alternatively state can be kept outside of workers as a slice of values and acc
p.Close() // indicates completion of all inputs
}()

for rec := range ch {
if rec.Errors != nil { // error happened
return err
}
log.Print(rec.Data) // print value
var v interface{}
for cursor(ctx, &v) {
log.Print(v) // print value
}

if cursor.Err() != nil { // error happened
return cursor.Err()
}

// alternatively ReadAll helper can be used to get everything from response channel
res, err := pool.ReadAll(ch)
// alternatively read all from the cursor (response channel)
res, err := cursor.All(ctx)

// metrics the same as for flow
metrics := pool.Metrics()
Expand Down
3 changes: 1 addition & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208 h1:qwRHBd0NqMbJxfbotnDhm2ByMI1Shq4Y6oRJo21SGJA=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
43 changes: 43 additions & 0 deletions pool/cursor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package pool

import "context"

// Cursor provides synchronous access to async data from pool's response channel
type Cursor struct {
ch chan response
err error
}

// Next returns next result from the cursor, ok = false on completion.
// Any error saved internally and can be returned by Err call
func (c *Cursor) Next(ctx context.Context, v *interface{}) bool {
for {
select {
case resp, ok := <-c.ch:
if !ok {
return false
}
if resp.err != nil {
c.err = resp.err
continue
}
*v = resp.value
return ok
case <-ctx.Done():
c.err = ctx.Err()
return false
}
}
}

// All gets all data from the cursor
func (c *Cursor) All(ctx context.Context) (res []interface{}, err error) {
var v interface{}
for c.Next(ctx, &v) {
res = append(res, v)
}
return res, c.err
}

// Err returns error collected by Next
func (c *Cursor) Err() error { return c.err }
69 changes: 69 additions & 0 deletions pool/cursor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package pool

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestCursor_Next(t *testing.T) {
c := Cursor{ch: make(chan response, 3)}

c.ch <- response{value: "12345"}
c.ch <- response{value: "abc"}
c.ch <- response{value: "xyz 0987"}
close(c.ch)

var v interface{}
next := c.Next(context.Background(), &v)
assert.True(t, next)
assert.Equal(t, "12345", v.(string))

next = c.Next(context.Background(), &v)
assert.True(t, next)
assert.Equal(t, "abc", v.(string))

next = c.Next(context.Background(), &v)
assert.True(t, next)
assert.Equal(t, "xyz 0987", v.(string))

next = c.Next(context.Background(), &v)
assert.False(t, next)
}

func TestCursor_All(t *testing.T) {
c := Cursor{ch: make(chan response, 3)}

c.ch <- response{value: "12345"}
c.ch <- response{value: "abc"}
c.ch <- response{value: "xyz 0987"}
close(c.ch)

res, err := c.All(context.Background())
require.NoError(t, err)
assert.Len(t, res, 3)

res, err = c.All(context.Background())
require.NoError(t, err)
assert.Len(t, res, 0)
}

func TestCursor_AllWithError(t *testing.T) {
c := Cursor{ch: make(chan response, 3)}

c.ch <- response{value: "12345"}
c.ch <- response{value: "abc", err: errors.New("failed")}
c.ch <- response{value: "xyz 0987"}
close(c.ch)

res, err := c.All(context.Background())
require.EqualError(t, err, "failed")
assert.Len(t, res, 2)

res, err = c.All(context.Background())
require.EqualError(t, err, "failed")
assert.Len(t, res, 0)
}
Loading

0 comments on commit 0b3b717

Please sign in to comment.