diff --git a/pool/object.go b/pool/object.go index be186f9..f8faaee 100644 --- a/pool/object.go +++ b/pool/object.go @@ -52,13 +52,6 @@ type objectPool struct { metrics objectPoolMetrics } -type objectPoolMetrics struct { - free tally.Gauge - total tally.Gauge - getOnEmpty tally.Counter - putOnFull tally.Counter -} - // NewObjectPool creates a new pool func NewObjectPool(opts ObjectPoolOptions) ObjectPool { if opts == nil { @@ -75,12 +68,7 @@ func NewObjectPool(opts ObjectPoolOptions) ObjectPool { opts.RefillLowWatermark() * float64(opts.Size()))), refillHighWatermark: int(math.Ceil( opts.RefillHighWatermark() * float64(opts.Size()))), - metrics: objectPoolMetrics{ - free: m.Gauge("free"), - total: m.Gauge("total"), - getOnEmpty: m.Counter("get-on-empty"), - putOnFull: m.Counter("put-on-full"), - }, + metrics: newObjectPoolMetrics(m), } p.setGauges() @@ -172,3 +160,19 @@ func (p *objectPool) tryFill() { } }() } + +func newObjectPoolMetrics(m tally.Scope) objectPoolMetrics { + return objectPoolMetrics{ + free: m.Gauge("free"), + total: m.Gauge("total"), + getOnEmpty: m.Counter("get-on-empty"), + putOnFull: m.Counter("put-on-full"), + } +} + +type objectPoolMetrics struct { + free tally.Gauge + total tally.Gauge + getOnEmpty tally.Counter + putOnFull tally.Counter +} diff --git a/pool/runtime.go b/pool/runtime.go new file mode 100644 index 0000000..c3378d5 --- /dev/null +++ b/pool/runtime.go @@ -0,0 +1,35 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package pool + +import ( + _ "unsafe" // required for go:linkname +) + +//go:linkname ProcPin runtime.procPin +func ProcPin() int + +//go:linkname ProcUnpin runtime.procUnpin +func ProcUnpin() + +// rotl is forward declared to bypass the go build 'complete' restriction, this is +// required to be able to use the `go:linkname` directive. +func rotl(x, y int64) int64 diff --git a/pool/runtime_amd64.s b/pool/runtime_amd64.s new file mode 100644 index 0000000..1fad507 --- /dev/null +++ b/pool/runtime_amd64.s @@ -0,0 +1,29 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +TEXT ·rotl(SB), $0 + MOVQ x+0(FP), BX + + // C is the counter register, used for args + // for things like bit shift commands + MOVQ y+8(FP), CX + ROLQ CX, BX + MOVQ BX, ret+16(FP) + RET diff --git a/pool/sharded_object.go b/pool/sharded_object.go new file mode 100644 index 0000000..d444fc6 --- /dev/null +++ b/pool/sharded_object.go @@ -0,0 +1,198 @@ +// Copyright (c) 2016 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package pool + +import ( + "fmt" + "math" + "runtime" + "sync" + "sync/atomic" + gounsafe "unsafe" +) + +// NB: heavily inspired by https://golang.org/src/sync/pool.go +type shardedObjectPool struct { + local []*poolLocal // local fixed-size per-P pool + + localPoolSize int + refillLowWatermark int + refillHighWatermark int + + alloc Allocator + initialized int32 + metrics objectPoolMetrics + opts ObjectPoolOptions +} + +// Local per-P Pool appendix. +type poolLocalInternal struct { + sync.Mutex // Protects shared. + shared []interface{} // Can be used by any P. + + private interface{} // Can be used only by the respective P. + + filling int32 +} + +type poolLocal struct { + poolLocalInternal + + // Prevents false sharing on widespread platforms with + // 128 mod (cache line size) = 0 . + pad [128 - gounsafe.Sizeof(poolLocalInternal{})%128]byte +} + +func NewShardedObjectPool(opts ObjectPoolOptions) ObjectPool { + if opts == nil { + opts = NewObjectPoolOptions() + } + + return &shardedObjectPool{ + opts: opts, + metrics: newObjectPoolMetrics(opts.InstrumentOptions().MetricsScope()), + } +} + +func (p *shardedObjectPool) Init(alloc Allocator) { + if !atomic.CompareAndSwapInt32(&p.initialized, 0, 1) { + fn := p.opts.OnPoolAccessErrorFn() + fn(errPoolAlreadyInitialized) + return + } + + p.alloc = alloc + + numProcs := runtime.GOMAXPROCS(0) + localPoolSize := int(math.Ceil(float64(p.opts.Size()) / float64(numProcs))) + if localPoolSize <= 0 { + fn := p.opts.OnPoolAccessErrorFn() + fn(fmt.Errorf( + "unable to compute localPoolSize [%d], GOMAXPROCS: %d, Size: %d", + localPoolSize, numProcs, p.opts.Size())) + return + } + lowWatermark := int(math.Ceil(p.opts.RefillLowWatermark() * float64(localPoolSize))) + highWatermark := int(math.Ceil(p.opts.RefillHighWatermark() * float64(localPoolSize))) + p.localPoolSize, p.refillLowWatermark, p.refillHighWatermark = localPoolSize, lowWatermark, highWatermark + + local := make([]*poolLocal, numProcs) + for i := 0; i < len(local); i++ { + local[i] = &poolLocal{} + local[i].private = p.alloc() + shared := make([]interface{}, 0, 2*localPoolSize) + for j := 0; j < localPoolSize; j++ { + shared = append(shared, p.alloc()) + } + local[i].shared = shared + } + p.local = local + + p.setGauges() +} + +// Put adds x to the pool. +func (p *shardedObjectPool) Put(x interface{}) { + if x == nil { + return + } + l := p.pin() + if l.private == nil { + l.private = x + x = nil + } + ProcUnpin() + if x != nil { + l.Lock() + l.shared = append(l.shared, x) + l.Unlock() + } +} + +// Get selects an arbitrary item from the Pool, removes it from the +// Pool, and returns it to the caller. +// Get may choose to ignore the pool and treat it as empty. +// Callers should not assume any relation between values passed to Put and +// the values returned by Get. +// +// If Get would otherwise return nil and p.New is non-nil, Get returns +// the result of calling p.New. +func (p *shardedObjectPool) Get() interface{} { + l := p.pin() + x := l.private + l.private = nil + ProcUnpin() + if x == nil { + l.Lock() + last := len(l.shared) - 1 + if last >= 0 { + x = l.shared[last] + l.shared = l.shared[:last] + } + l.Unlock() + if x == nil { + x = p.getSlow() + } + } + if x == nil { + x = p.alloc() + } + return x +} + +func (p *shardedObjectPool) getSlow() (x interface{}) { + local := p.local + // Try to steal one element from other procs. + pid := ProcPin() + ProcUnpin() + for i := 0; i < len(local); i++ { + l := indexLocal(local, pid+i+1) + l.Lock() + last := len(l.shared) - 1 + if last >= 0 { + x = l.shared[last] + l.shared = l.shared[:last] + l.Unlock() + break + } + l.Unlock() + } + return x +} + +// pin pins the current goroutine to P, disables preemption and returns poolLocal pool for the P. +// Caller must call ProcUnpin() when done with the pool. +func (p *shardedObjectPool) pin() *poolLocal { + pid := ProcPin() + return indexLocal(p.local, pid) +} + +func indexLocal(l []*poolLocal, i int) *poolLocal { + idx := i % len(l) + return l[idx] +} + +func (p *shardedObjectPool) setGauges() { + /* + p.metrics.free.Update(float64(len(p.values))) + p.metrics.total.Update(float64(p.size)) + */ +} diff --git a/pool/sharded_object_test.go b/pool/sharded_object_test.go new file mode 100644 index 0000000..2d3e7ff --- /dev/null +++ b/pool/sharded_object_test.go @@ -0,0 +1,277 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// +build !race + +package pool + +import ( + "runtime" + "runtime/debug" + "testing" +) + +func newTestPool() *shardedObjectPool { + p := NewShardedObjectPool(nil).(*shardedObjectPool) + p.Init(func() interface{} { + return nil + }) + return p +} + +func TestPool(t *testing.T) { + // disable GC so we can control when it happens. + defer debug.SetGCPercent(debug.SetGCPercent(-1)) + p := newTestPool() + if p.Get() != nil { + t.Fatal("expected empty") + } + + // Make sure that the goroutine doesn't migrate to another P + // between Put and Get calls. + ProcPin() + p.Put("a") + p.Put("b") + if g := p.Get(); g != "a" { + t.Fatalf("got %#v; want a", g) + } + if g := p.Get(); g != "b" { + t.Fatalf("got %#v; want b", g) + } + if g := p.Get(); g != nil { + t.Fatalf("got %#v; want nil", g) + } + ProcUnpin() + + p.Put("c") + debug.SetGCPercent(100) // to allow following GC to actually run + runtime.GC() + if g := p.Get(); g != nil { + t.Fatalf("got %#v; want nil after GC", g) + } +} + +/* +func TestPoolNew(t *testing.T) { + // disable GC so we can control when it happens. + defer debug.SetGCPercent(debug.SetGCPercent(-1)) + + i := 0 + p := Pool{ + New: func() interface{} { + i++ + return i + }, + } + if v := p.Get(); v != 1 { + t.Fatalf("got %v; want 1", v) + } + if v := p.Get(); v != 2 { + t.Fatalf("got %v; want 2", v) + } + + // Make sure that the goroutine doesn't migrate to another P + // between Put and Get calls. + ProcPin() + p.Put(42) + if v := p.Get(); v != 42 { + t.Fatalf("got %v; want 42", v) + } + ProcUnpin() + + if v := p.Get(); v != 3 { + t.Fatalf("got %v; want 3", v) + } +} + +// Test that Pool does not hold pointers to previously cached resources. +func TestPoolGC(t *testing.T) { + testPool(t, true) +} + +// Test that Pool releases resources on GC. +func TestPoolRelease(t *testing.T) { + testPool(t, false) +} + +func testPool(t *testing.T, drain bool) { + var p Pool + const N = 100 +loop: + for try := 0; try < 3; try++ { + var fin, fin1 uint32 + for i := 0; i < N; i++ { + v := new(string) + runtime.SetFinalizer(v, func(vv *string) { + atomic.AddUint32(&fin, 1) + }) + p.Put(v) + } + if drain { + for i := 0; i < N; i++ { + p.Get() + } + } + for i := 0; i < 5; i++ { + runtime.GC() + time.Sleep(time.Duration(i*100+10) * time.Millisecond) + // 1 pointer can remain on stack or elsewhere + if fin1 = atomic.LoadUint32(&fin); fin1 >= N-1 { + continue loop + } + } + t.Fatalf("only %v out of %v resources are finalized on try %v", fin1, N, try) + } +} + +func TestPoolStress(t *testing.T) { + const P = 10 + N := int(1e6) + if testing.Short() { + N /= 100 + } + var p Pool + done := make(chan bool) + for i := 0; i < P; i++ { + go func() { + var v interface{} = 0 + for j := 0; j < N; j++ { + if v == nil { + v = 0 + } + p.Put(v) + v = p.Get() + if v != nil && v.(int) != 0 { + t.Errorf("expect 0, got %v", v) + break + } + } + done <- true + }() + } + for i := 0; i < P; i++ { + <-done + } +} +*/ + +// ❯ go test -bench=Pool -cpu 1,2,4 -run '^$' ./pool +// goos: darwin +// goarch: amd64 +// pkg: github.com/m3db/m3x/pool +// BenchmarkObjectPoolGetPut 10000000 111 ns/op +// BenchmarkObjectPoolGetPut-2 20000000 102 ns/op +// BenchmarkObjectPoolGetPut-4 20000000 93.9 ns/op +// BenchmarkShardedPoolGetPut 20000000 93.4 ns/op +// BenchmarkShardedPoolGetPut-2 20000000 90.6 ns/op +// BenchmarkShardedPoolGetPut-4 20000000 91.3 ns/op +// BenchmarkShardedPoolConcurrentGetPut 50000000 38.0 ns/op +// BenchmarkShardedPoolConcurrentGetPut-2 100000000 20.4 ns/op +// BenchmarkShardedPoolConcurrentGetPut-4 100000000 16.3 ns/op +// BenchmarkChannelPoolConcurrentGetPut 20000000 91.0 ns/op +// BenchmarkChannelPoolConcurrentGetPut-2 10000000 142 ns/op +// BenchmarkChannelPoolConcurrentGetPut-4 10000000 175 ns/op +// BenchmarkShardedPoolOverflow 300000 5866 ns/op +// BenchmarkShardedPoolOverflow-2 500000 3979 ns/op +// BenchmarkShardedPoolOverflow-4 500000 2783 ns/op +// BenchmarkChannelPoolOverflow 200000 5852 ns/op +// BenchmarkChannelPoolOverflow-2 500000 3233 ns/op +// BenchmarkChannelPoolOverflow-4 500000 2669 ns/op + +func BenchmarkShardedPoolGetPut(b *testing.B) { + opts := NewObjectPoolOptions().SetSize(1) + pool := NewObjectPool(opts) + pool.Init(func() interface{} { + return 1 + }) + + for n := 0; n < b.N; n++ { + o := pool.Get() + pool.Put(o) + } +} + +func BenchmarkShardedPoolConcurrentGetPut(b *testing.B) { + p := NewShardedObjectPool(nil) + p.Init(func() interface{} { + return 0 + }) + + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + p.Put(1) + p.Get() + } + }) +} +func BenchmarkChannelPoolConcurrentGetPut(b *testing.B) { + p := NewObjectPool(nil) + p.Init(func() interface{} { + return 0 + }) + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + p.Put(1) + p.Get() + } + }) +} + +func BenchmarkShardedPoolOverflow(b *testing.B) { + p := NewShardedObjectPool(nil) + p.Init(func() interface{} { + return 0 + }) + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + for b := 0; b < 100; b++ { + p.Put(1) + } + for b := 0; b < 100; b++ { + p.Get() + } + } + }) +} + +func BenchmarkChannelPoolOverflow(b *testing.B) { + p := NewShardedObjectPool(nil) + p.Init(func() interface{} { + return 0 + }) + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + for b := 0; b < 100; b++ { + p.Put(1) + } + for b := 0; b < 100; b++ { + p.Get() + } + } + }) +}