-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathsemaphore.go
142 lines (120 loc) · 3 KB
/
semaphore.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package semaphore
import (
"context"
"sync"
"time"
)
// Semaphore is a counting semaphore. Free permits are represented by tokens
// sitting in a buffered channel, and a separate counter mirrors how many are
// currently available.
//
// A Semaphore embeds mutexes, so it must be used through a pointer and not
// copied; create one with New.
type Semaphore struct {
	permits int           // total number of permits, set once by New
	avail   int           // permits currently available; may dip below zero while an Acquire is waiting
	channel chan struct{} // buffered channel holding one token per free permit
	aMutex  sync.Mutex    // held by Acquire for the whole call, so acquirers proceed one at a time
	rMutex  sync.Mutex    // held by Release for the whole call, so releasers proceed one at a time
	pMutex  sync.RWMutex  // guards avail
}
// New returns a Semaphore that starts with the given number of permits, all
// of them available.
//
// It panics if permits is less than 1.
func New(permits int) *Semaphore {
	if permits < 1 {
		panic("Invalid number of permits. Less than 1")
	}

	sem := &Semaphore{
		permits: permits,
		avail:   permits,
		channel: make(chan struct{}, permits),
	}

	// Pre-load one token per permit so the first `permits` acquisitions
	// find a token waiting and do not block.
	for len(sem.channel) < permits {
		sem.channel <- struct{}{}
	}
	return sem
}
// Acquire takes a single permit, blocking the calling goroutine until one is
// available.
func (s *Semaphore) Acquire() {
	// Only one acquirer at a time gets past this point; the lock is kept
	// until the permit has actually been obtained.
	s.aMutex.Lock()

	// The counter is lowered before waiting, so it can be negative while
	// this call is blocked on the channel below.
	s.pMutex.Lock()
	s.avail--
	s.pMutex.Unlock()

	// Wait for a token.
	<-s.channel

	s.aMutex.Unlock()
}
// AcquireMany acquires n permits by calling Acquire repeatedly, blocking as
// needed for each one.
//
// The request is capped at the number of permits the semaphore was created
// with: with New(2), AcquireMany(5) acquires 2 permits. A value of n that is
// zero or negative acquires nothing.
func (s *Semaphore) AcquireMany(n int) {
	want := n
	if limit := s.permits; want > limit {
		want = limit
	}
	for got := 0; got < want; got++ {
		s.Acquire()
	}
}
// AcquireContext acquires n permits like AcquireMany, but stops waiting as
// soon as ctx is done.
//
// It returns true if all permits were acquired. It returns false if ctx was
// done first; in that case every permit this call had already taken is
// released before returning, so the caller holds nothing and no goroutine is
// left waiting in the background.
//
// As with AcquireMany, n is capped at the number of permits the semaphore was
// created with, and a zero or negative n acquires nothing and returns true.
func (s *Semaphore) AcquireContext(ctx context.Context, n int) bool {
	if n > s.permits {
		n = s.permits
	}

	for held := 0; held < n; held++ {
		// Check cancellation up front so that a context which is already
		// done wins deterministically even when a permit is also ready.
		cancelled := ctx.Err() != nil

		if !cancelled {
			// Tokens are taken straight from the channel rather than via
			// Acquire, because a blocked Acquire cannot be interrupted.
			select {
			case <-s.channel:
				s.pMutex.Lock()
				s.avail--
				s.pMutex.Unlock()
			case <-ctx.Done():
				cancelled = true
			}
		}

		if cancelled {
			// Give back whatever was collected so far.
			s.ReleaseMany(held)
			return false
		}
	}
	return true
}
// AcquireWithin tries to acquire n permits, giving up once duration d has
// elapsed. It is AcquireContext with a timeout context derived from
// context.Background().
//
// It returns true if the permits were acquired and false if the timeout
// expired first.
func (s *Semaphore) AcquireWithin(n int, d time.Duration) bool {
	timeoutCtx, stop := context.WithTimeout(context.Background(), d)
	// Always release the timer resources held by the context.
	defer stop()

	ok := s.AcquireContext(timeoutCtx, n)
	return ok
}
// Release returns a single permit to the semaphore.
//
// The permit is handed back by sending a token on the internal buffered
// channel, so the call blocks while that buffer is full.
func (s *Semaphore) Release() {
	// Releasers are serialized for the whole call.
	s.rMutex.Lock()

	// Put the token back first, then bump the counter.
	s.channel <- struct{}{}

	s.pMutex.Lock()
	s.avail++
	s.pMutex.Unlock()

	s.rMutex.Unlock()
}
// ReleaseMany returns n permits by calling Release repeatedly.
//
// The request is capped at the number of permits the semaphore was created
// with: with New(2), ReleaseMany(5) releases 2 permits. A value of n that is
// zero or negative releases nothing.
func (s *Semaphore) ReleaseMany(n int) {
	count := n
	if limit := s.permits; count > limit {
		count = limit
	}
	for done := 0; done < count; done++ {
		s.Release()
	}
}
// AvailablePermits reports how many permits are currently free to be
// acquired. The result is never negative.
func (s *Semaphore) AvailablePermits() int {
	// Snapshot the counter under the read lock.
	s.pMutex.RLock()
	free := s.avail
	s.pMutex.RUnlock()

	// The counter can be below zero while an acquirer is waiting; report
	// that as "none available".
	if free > 0 {
		return free
	}
	return 0
}
// DrainPermits acquires every permit that is immediately available and
// returns the number of permits acquired.
//
// It never blocks: permits are taken one at a time with a non-blocking
// receive, and the call returns as soon as none is left. The returned count
// is exactly the number of permits now held by the caller, even when other
// goroutines acquire or release concurrently.
func (s *Semaphore) DrainPermits() int {
	drained := 0
	for {
		select {
		case <-s.channel:
			// Got a token; keep the availability counter in step.
			s.pMutex.Lock()
			s.avail--
			s.pMutex.Unlock()
			drained++
		default:
			// No token ready right now: nothing more to drain.
			return drained
		}
	}
}