forked from quic-go/quic-go
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathwindow_update_queue.go
79 lines (70 loc) · 2.01 KB
/
window_update_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package quic
import (
"sync"
"github.com/lucas-clemente/quic-go/internal/flowcontrol"
"github.com/lucas-clemente/quic-go/internal/protocol"
"github.com/lucas-clemente/quic-go/internal/wire"
)
type windowUpdateQueue struct {
mutex sync.Mutex
queue map[protocol.StreamID]bool // used as a set
queuedConn bool // connection-level window update
cryptoStream cryptoStreamI
streamGetter streamGetter
connFlowController flowcontrol.ConnectionFlowController
callback func(wire.Frame)
}
func newWindowUpdateQueue(
streamGetter streamGetter,
cryptoStream cryptoStreamI,
connFC flowcontrol.ConnectionFlowController,
cb func(wire.Frame),
) *windowUpdateQueue {
return &windowUpdateQueue{
queue: make(map[protocol.StreamID]bool),
streamGetter: streamGetter,
cryptoStream: cryptoStream,
connFlowController: connFC,
callback: cb,
}
}
func (q *windowUpdateQueue) AddStream(id protocol.StreamID) {
q.mutex.Lock()
q.queue[id] = true
q.mutex.Unlock()
}
func (q *windowUpdateQueue) AddConnection() {
q.mutex.Lock()
q.queuedConn = true
q.mutex.Unlock()
}
func (q *windowUpdateQueue) QueueAll() {
q.mutex.Lock()
// queue a connection-level window update
if q.queuedConn {
q.callback(&wire.MaxDataFrame{ByteOffset: q.connFlowController.GetWindowUpdate()})
q.queuedConn = false
}
// queue all stream-level window updates
var offset protocol.ByteCount
for id := range q.queue {
if id == q.cryptoStream.StreamID() {
offset = q.cryptoStream.getWindowUpdate()
} else {
str, err := q.streamGetter.GetOrOpenReceiveStream(id)
if err != nil || str == nil { // the stream can be nil if it was completed before dequeing the window update
continue
}
offset = str.getWindowUpdate()
}
if offset == 0 { // can happen if we received a final offset, right after queueing the window update
continue
}
q.callback(&wire.MaxStreamDataFrame{
StreamID: id,
ByteOffset: offset,
})
delete(q.queue, id)
}
q.mutex.Unlock()
}