-
-
Notifications
You must be signed in to change notification settings - Fork 93
/
Copy pathclient_experimental.go
101 lines (91 loc) · 3.66 KB
/
client_experimental.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package centrifuge
import (
"errors"
"github.com/centrifugal/protocol"
)
// errNoSubscription is returned by WritePublication when the client is not
// subscribed to the channel the publication is addressed to.
var errNoSubscription = errors.New("no subscription to a channel")
// WritePublication sends a publication directly to this Client's subscription on
// the given channel, without going through HUB and Broker semantics. A possible
// use case is turning a channel subscription into an individual data stream.
//
// The publication is wrapped into a Push and encoded according to the transport
// protocol (JSON or Protobuf). Unidirectional transports receive the encoded Push
// itself; bidirectional transports receive it wrapped into a Reply.
//
// It returns errNoSubscription if the client is not subscribed to channel, the
// encoder error if encoding fails, and an "unknown protocol type" error if the
// transport reports a protocol other than JSON or Protobuf. When JSON encoding
// fails the client is additionally disconnected (in a separate goroutine) with
// DisconnectInappropriateProtocol.
//
// This API is EXPERIMENTAL and may be changed/removed.
func (c *Client) WritePublication(channel string, publication *Publication, sp StreamPosition) error {
	if !c.IsSubscribed(channel) {
		return errNoSubscription
	}

	pub := pubToProto(publication)
	// Every supported branch sends the same Push; only the encoder and the
	// optional Reply envelope differ.
	push := &protocol.Push{Channel: channel, Pub: pub}

	switch c.transport.Protocol().toProto() {
	case protocol.TypeJSON:
		// rejectJSON schedules a disconnect without blocking the caller and
		// hands the encoding error back so it can be returned as is.
		rejectJSON := func(encodeErr error) error {
			go func() { c.Disconnect(DisconnectInappropriateProtocol) }()
			return encodeErr
		}
		if c.transport.Unidirectional() {
			data, err := protocol.DefaultJsonPushEncoder.Encode(push)
			if err != nil {
				return rejectJSON(err)
			}
			return c.writePublicationNoDelta(channel, pub, data, sp)
		}
		data, err := protocol.DefaultJsonReplyEncoder.Encode(&protocol.Reply{Push: push})
		if err != nil {
			return rejectJSON(err)
		}
		return c.writePublicationNoDelta(channel, pub, data, sp)

	case protocol.TypeProtobuf:
		// Unlike the JSON branch, an encoding failure here is only reported
		// to the caller; the client stays connected.
		if c.transport.Unidirectional() {
			data, err := protocol.DefaultProtobufPushEncoder.Encode(push)
			if err != nil {
				return err
			}
			return c.writePublicationNoDelta(channel, pub, data, sp)
		}
		data, err := protocol.DefaultProtobufReplyEncoder.Encode(&protocol.Reply{Push: push})
		if err != nil {
			return err
		}
		return c.writePublicationNoDelta(channel, pub, data, sp)
	}

	return errors.New("unknown protocol type")
}
// AcquireStorage locks the connection storage and returns it together with a
// release function. The storage is a map intended for keeping user-defined state
// for the lifetime of the connection; try to keep it reasonably small.
//
// The returned map is never nil: if the storage has not been initialized yet it
// is set to an empty map first. The map is handed out as is (no copy is made),
// so the caller may read it, modify it in place, or build a completely new map.
//
// The storage lock stays held until the release function is called. The release
// function stores the map passed to it as the new storage and then unlocks, so
// pass back the map that should be kept (passing nil resets the storage; the
// next AcquireStorage call will initialize it to an empty map again). Call the
// release function exactly once and do it quickly: avoid holding the storage
// across IO or other long-running operations.
//
// This API is EXPERIMENTAL and may be changed/removed.
func (c *Client) AcquireStorage() (map[string]any, func(map[string]any)) {
	c.storageMu.Lock()

	if c.storage == nil {
		c.storage = make(map[string]any)
	}

	// release publishes the caller's map and only then gives up the lock taken above.
	release := func(next map[string]any) {
		c.storage = next
		c.storageMu.Unlock()
	}

	return c.storage, release
}
// OnStateSnapshot sets the StateSnapshotHandler that StateSnapshot calls to
// produce a snapshot. A later call replaces the previously set handler.
// No lock is taken here when storing the handler.
// NOTE(review): presumably meant to be called during connection setup, before
// StateSnapshot can run concurrently — confirm against callers.
// This API is EXPERIMENTAL and may be changed/removed.
func (c *Client) OnStateSnapshot(h StateSnapshotHandler) {
c.eventHub.stateSnapshotHandler = h
}
// StateSnapshot returns a copy of the current state as produced by the handler
// registered with OnStateSnapshot. It is mostly useful for introspecting the
// connection from the outside.
//
// The handler's result and error are returned unchanged. If no handler has been
// set, StateSnapshot returns (nil, nil).
//
// This API is EXPERIMENTAL and may be changed/removed.
func (c *Client) StateSnapshot() (any, error) {
	handler := c.eventHub.stateSnapshotHandler
	if handler == nil {
		// Nothing registered: report "no snapshot" rather than an error.
		return nil, nil
	}
	return handler()
}