-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathunsubscribe.go
78 lines (67 loc) · 2.01 KB
/
unsubscribe.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
// Copyright 2019 The mqtt-go authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mqtt
import (
"context"
)
// pktUnsubscribe carries the variable fields of an UNSUBSCRIBE packet.
type pktUnsubscribe struct {
	// ID is the packet identifier; Pack encodes it with packUint16.
	ID uint16

	// Topics lists the topic filters; Pack appends each one, in order,
	// to the payload with appendString.
	Topics []string
}
// Pack serializes the packet. The payload is every entry of p.Topics
// appended in order with appendString; it is handed to pack together with
// the UNSUBSCRIBE|from-client type byte and the packet ID encoded by
// packUint16.
func (p *pktUnsubscribe) Pack() []byte {
	header := byte(packetUnsubscribe | packetFromClient)

	topics := make([]byte, 0, packetBufferCap)
	for i := range p.Topics {
		topics = appendString(topics, p.Topics[i])
	}

	return pack(header, packUint16(p.ID), topics)
}
// Unsubscribe asks to unsubscribe from the given topic filters.
// It delegates to unsubscribeImpl using c as the client and returns that
// function's error unchanged.
func (c *BaseClient) Unsubscribe(ctx context.Context, subs ...string) error {
	if err := unsubscribeImpl(ctx, c, subs...); err != nil {
		return err
	}
	return nil
}
// unsubscribeImpl writes an UNSUBSCRIBE packet for subs on c and then blocks
// until one of the following happens:
//
//   - a value arrives on the reply channel registered for this request
//     (nil is returned),
//   - c.connClosed becomes readable (ErrClosedTransport is returned), or
//   - ctx is done (ctx.Err() is returned).
//
// The read lock on c.muConnecting is held for the whole call. If
// c.signaller() fails its error is returned as is. Every later failure is
// passed through wrapErrorWithRetry together with a closure that re-issues
// the same unsubscribe request on the client it is given.
func unsubscribeImpl(ctx context.Context, c *BaseClient, subs ...string) error {
	c.muConnecting.RLock()
	defer c.muConnecting.RUnlock()

	id := c.newID()
	sig, err := c.signaller()
	if err != nil {
		return err
	}

	// Register the reply channel under the packet ID before the packet is
	// written. Capacity 1 lets a single reply be delivered even when no
	// receiver is waiting any more.
	chUnsubAck := make(chan *pktUnsubAck, 1)
	sig.mu.Lock()
	if sig.chUnsubAck == nil {
		sig.chUnsubAck = make(map[uint16]chan *pktUnsubAck, 1)
	}
	sig.chUnsubAck[id] = chUnsubAck
	sig.mu.Unlock()

	// Remove the registration on every exit path so that a request abandoned
	// because of a write error, a closed connection or a cancelled context
	// does not leave a stale entry in the map. The identity check keeps this
	// from deleting an entry that another request stored under the same ID.
	defer func() {
		sig.mu.Lock()
		if sig.chUnsubAck[id] == chUnsubAck {
			delete(sig.chUnsubAck, id)
		}
		sig.mu.Unlock()
	}()

	retryUnsubscribe := func(ctx context.Context, cli *BaseClient) error {
		return unsubscribeImpl(ctx, cli, subs...)
	}

	pkt := (&pktUnsubscribe{ID: id, Topics: subs}).Pack()
	if err := c.write(pkt); err != nil {
		return wrapErrorWithRetry(err, retryUnsubscribe, "sending UNSUBSCRIBE")
	}

	select {
	case <-c.connClosed:
		return wrapErrorWithRetry(ErrClosedTransport, retryUnsubscribe, "waiting UNSUBACK")
	case <-ctx.Done():
		return wrapErrorWithRetry(ctx.Err(), retryUnsubscribe, "waiting UNSUBACK")
	case <-chUnsubAck:
		// NOTE(review): presumably sent by the packet receive loop when the
		// matching UNSUBACK arrives - confirm against the reader code.
	}
	return nil
}