forked from quic-go/quic-go
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathstreams_map_incoming_generic_test.go
165 lines (144 loc) · 5.15 KB
/
streams_map_incoming_generic_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
package quic
import (
"errors"
"fmt"
"github.com/golang/mock/gomock"
"github.com/lucas-clemente/quic-go/internal/protocol"
"github.com/lucas-clemente/quic-go/internal/wire"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
// mockGenericStream is a minimal stream stand-in used by the incoming streams
// map specs. It records whether closeForShutdown was called and with which
// error, so the specs can inspect both afterwards.
type mockGenericStream struct {
// id is the stream ID the mock was created for.
id protocol.StreamID
// closed is set to true by closeForShutdown.
closed bool
// closeErr holds the error that was passed to closeForShutdown.
closeErr error
}
// closeForShutdown flags the mock as closed and remembers the error it was
// closed with, so that a spec can later assert on both values.
func (ms *mockGenericStream) closeForShutdown(err error) {
	ms.closeErr, ms.closed = err, true
}
// Specs for the incoming half of the streams map: opening streams up to a
// requested ID, handing them out in order through AcceptStream, enforcing the
// stream ID limit, shutting down with an error, and deleting streams.
var _ = Describe("Streams Map (incoming)", func() {
	const (
		// firstNewStream is the first stream ID the map under test is
		// expected to open.
		firstNewStream protocol.StreamID = 20
		// maxNumStreams is the stream count handed to newIncomingItemsMap.
		maxNumStreams int = 10
		// initialMaxStream is the highest stream ID accepted at start-up:
		// maxNumStreams IDs, spaced 4 apart, beginning at firstNewStream.
		initialMaxStream protocol.StreamID = firstNewStream + 4*protocol.StreamID(maxNumStreams-1)
	)

	var (
		m *incomingItemsMap
		// newItem is the constructor handed to the map; it counts its calls.
		newItem func(id protocol.StreamID) item
		// newItemCounter is the number of items created in the current spec.
		newItemCounter int
		mockSender     *MockStreamSender
	)

	// Every spec starts with a fresh map, a fresh sender mock and a zeroed
	// creation counter.
	BeforeEach(func() {
		newItemCounter = 0
		newItem = func(id protocol.StreamID) item {
			newItemCounter++
			return &mockGenericStream{id: id}
		}
		mockSender = NewMockStreamSender(mockCtrl)
		m = newIncomingItemsMap(firstNewStream, initialMaxStream, maxNumStreams, mockSender.queueControlFrame, newItem)
	})

	It("opens all streams up to the id on GetOrOpenStream", func() {
		// Requesting the 6th ID must also create the 5 lower ones.
		_, err := m.GetOrOpenStream(firstNewStream + 4*5)
		Expect(err).ToNot(HaveOccurred())
		Expect(newItemCounter).To(Equal(6))
	})

	It("starts opening streams at the right position", func() {
		// like the test above, but with 2 calls to GetOrOpenStream
		_, err := m.GetOrOpenStream(firstNewStream + 4)
		Expect(err).ToNot(HaveOccurred())
		Expect(newItemCounter).To(Equal(2))
		// The second call must only create the streams that are still missing.
		_, err = m.GetOrOpenStream(firstNewStream + 4*5)
		Expect(err).ToNot(HaveOccurred())
		Expect(newItemCounter).To(Equal(6))
	})

	It("accepts streams in the right order", func() {
		// opens firstNewStream and firstNewStream+4 (stream 20 and 24)
		_, err := m.GetOrOpenStream(firstNewStream + 4)
		Expect(err).ToNot(HaveOccurred())
		str, err := m.AcceptStream()
		Expect(err).ToNot(HaveOccurred())
		Expect(str.(*mockGenericStream).id).To(Equal(firstNewStream))
		str, err = m.AcceptStream()
		Expect(err).ToNot(HaveOccurred())
		Expect(str.(*mockGenericStream).id).To(Equal(firstNewStream + 4))
	})

	It("allows opening the maximum stream ID", func() {
		str, err := m.GetOrOpenStream(initialMaxStream)
		Expect(err).ToNot(HaveOccurred())
		Expect(str.(*mockGenericStream).id).To(Equal(initialMaxStream))
	})

	It("errors when trying to get a stream ID higher than the maximum", func() {
		// One ID step beyond the limit must be rejected.
		_, err := m.GetOrOpenStream(initialMaxStream + 4)
		Expect(err).To(MatchError(fmt.Errorf("peer tried to open stream %d (current limit: %d)", initialMaxStream+4, initialMaxStream)))
	})

	It("blocks AcceptStream until a new stream is available", func() {
		strChan := make(chan item)
		go func() {
			defer GinkgoRecover()
			str, err := m.AcceptStream()
			Expect(err).ToNot(HaveOccurred())
			strChan <- str
		}()
		// Nothing has been opened yet, so AcceptStream must not return.
		Consistently(strChan).ShouldNot(Receive())
		str, err := m.GetOrOpenStream(firstNewStream)
		Expect(err).ToNot(HaveOccurred())
		Expect(str.(*mockGenericStream).id).To(Equal(firstNewStream))
		var acceptedStr item
		Eventually(strChan).Should(Receive(&acceptedStr))
		Expect(acceptedStr.(*mockGenericStream).id).To(Equal(firstNewStream))
	})

	It("unblocks AcceptStream when it is closed", func() {
		testErr := errors.New("test error")
		done := make(chan struct{})
		go func() {
			defer GinkgoRecover()
			_, err := m.AcceptStream()
			Expect(err).To(MatchError(testErr))
			close(done)
		}()
		// AcceptStream must stay blocked until the map is closed.
		Consistently(done).ShouldNot(BeClosed())
		m.CloseWithError(testErr)
		Eventually(done).Should(BeClosed())
	})

	It("errors AcceptStream immediately if it is closed", func() {
		testErr := errors.New("test error")
		m.CloseWithError(testErr)
		_, err := m.AcceptStream()
		Expect(err).To(MatchError(testErr))
	})

	It("closes all streams when CloseWithError is called", func() {
		// IDs are derived from firstNewStream, like in the other specs, so
		// this spec keeps working if the constant is changed.
		str1, err := m.GetOrOpenStream(firstNewStream)
		Expect(err).ToNot(HaveOccurred())
		str2, err := m.GetOrOpenStream(firstNewStream + 2*4)
		Expect(err).ToNot(HaveOccurred())
		testErr := errors.New("test err")
		m.CloseWithError(testErr)
		Expect(str1.(*mockGenericStream).closed).To(BeTrue())
		Expect(str1.(*mockGenericStream).closeErr).To(MatchError(testErr))
		Expect(str2.(*mockGenericStream).closed).To(BeTrue())
		Expect(str2.(*mockGenericStream).closeErr).To(MatchError(testErr))
	})

	It("deletes streams", func() {
		// Deleting a stream queues a control frame; its exact contents are
		// checked in the MAX_STREAM_ID spec below, so any frame is fine here.
		mockSender.EXPECT().queueControlFrame(gomock.Any())
		_, err := m.GetOrOpenStream(firstNewStream)
		Expect(err).ToNot(HaveOccurred())
		err = m.DeleteStream(firstNewStream)
		Expect(err).ToNot(HaveOccurred())
		// A deleted stream is reported as nil without an error.
		str, err := m.GetOrOpenStream(firstNewStream)
		Expect(err).ToNot(HaveOccurred())
		Expect(str).To(BeNil())
	})

	It("errors when deleting a non-existing stream", func() {
		err := m.DeleteStream(1337)
		Expect(err).To(MatchError("Tried to delete unknown stream 1337"))
	})

	It("sends MAX_STREAM_ID frames when streams are deleted", func() {
		// open a bunch of streams
		_, err := m.GetOrOpenStream(firstNewStream + 4*4)
		Expect(err).ToNot(HaveOccurred())
		// Each deletion is expected to raise the advertised limit by one
		// stream ID step (4).
		mockSender.EXPECT().queueControlFrame(&wire.MaxStreamIDFrame{StreamID: initialMaxStream + 4})
		Expect(m.DeleteStream(firstNewStream + 4)).To(Succeed())
		mockSender.EXPECT().queueControlFrame(&wire.MaxStreamIDFrame{StreamID: initialMaxStream + 8})
		Expect(m.DeleteStream(firstNewStream + 3*4)).To(Succeed())
	})
})