Skip to content

Commit

Permalink
use subscribe option
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed Jan 1, 2024
1 parent 2b2306e commit ada4d62
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 14 deletions.
7 changes: 4 additions & 3 deletions _examples/chat_json/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@
max-width: 90%;
}
</style>
<script type="text/javascript" src="http://localhost:2000/centrifuge.js"></script>
<script type="text/javascript" src="https://unpkg.com/centrifuge@^5/dist/centrifuge.js"></script>
<script type="text/javascript">
class Message {
constructor(time, text, meta) {
Expand Down Expand Up @@ -225,8 +225,9 @@
// subscription request will be sent after client connects to
// a server.
const sub = centrifuge.newSubscription(channel, {
sinceLatestPublication: {
epoch: "",
since: {
'offset': 0,
'epoch': '',
}
});

Expand Down
2 changes: 0 additions & 2 deletions _examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ go 1.20

replace github.com/centrifugal/centrifuge => ../

replace github.com/centrifugal/protocol => ../../protocol

require (
github.com/FZambia/tarantool v0.2.2
github.com/centrifugal/centrifuge v0.8.2
Expand Down
2 changes: 2 additions & 0 deletions _examples/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ github.com/bradleypeabody/gorilla-sessions-memcache v0.0.0-20181103040241-659414
github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM=
github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s=
github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U=
github.com/centrifugal/protocol v0.11.1-0.20231217174539-d7863acb8224 h1:msOUFVZH4p0o/GYiIqp8IDfx9CjlBewG09L6Gl3gyxs=
github.com/centrifugal/protocol v0.11.1-0.20231217174539-d7863acb8224/go.mod h1:33nZhrA2iRoR6jT+oVzu1ARx+iWIgxOgYuZMhWMWVM4=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY=
Expand Down
20 changes: 13 additions & 7 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2669,15 +2669,21 @@ func isRecovered(historyResult HistoryResult, cmdOffset uint64, cmdEpoch string)
return recoveredPubs, recovered
}

func isLastMessageRecovered(historyResult HistoryResult, cmdEpoch string) ([]*protocol.Publication, bool) {
func isStateRecovered(historyResult HistoryResult, cmdOffset uint64, cmdEpoch string) ([]*protocol.Publication, bool) {
latestOffset := historyResult.Offset
latestEpoch := historyResult.Epoch
var recovered bool
recoveredPubs := make([]*protocol.Publication, 0, len(historyResult.Publications))
if len(historyResult.Publications) > 0 {
protoPub := pubToProto(historyResult.Publications[0])
recoveredPubs = append(recoveredPubs, protoPub)
recovered = recoveredPubs[0].Offset == latestOffset && (cmdEpoch == "" || latestEpoch == cmdEpoch)
publication := historyResult.Publications[0]
recovered = publication.Offset == latestOffset && (cmdEpoch == "" || latestEpoch == cmdEpoch)
if recovered && publication.Offset > cmdOffset {
protoPub := pubToProto(publication)
recoveredPubs = append(recoveredPubs, protoPub)
}
} else if cmdOffset > 0 && (cmdEpoch == "" || latestEpoch == cmdEpoch) && latestOffset == cmdOffset {
// Client already had state, which has not been modified since.
recovered = true
}
return recoveredPubs, recovered
}
Expand Down Expand Up @@ -2760,9 +2766,9 @@ func (c *Client) subscribeCmd(req *protocol.SubscribeRequest, reply SubscribeRep
if reply.Options.EnableRecovery && req.Recover {
cmdOffset := req.Offset
cmdEpoch := req.Epoch
cmdRecoverLatestPublication := req.RecoverLatestPublication
recoveryMode := reply.Options.RecoveryMode

if cmdRecoverLatestPublication {
if recoveryMode == RecoveryModeState {
historyResult, err := c.node.History(channel, WithHistoryFilter(HistoryFilter{
Limit: 1,
Reverse: true,
Expand All @@ -2779,7 +2785,7 @@ func (c *Client) subscribeCmd(req *protocol.SubscribeRequest, reply SubscribeRep
latestOffset = historyResult.Offset
latestEpoch = historyResult.Epoch
var recovered bool
recoveredPubs, recovered = isLastMessageRecovered(historyResult, cmdEpoch)
recoveredPubs, recovered = isStateRecovered(historyResult, cmdOffset, cmdEpoch)
res.Recovered = recovered
c.node.metrics.incRecover(res.Recovered)
} else {
Expand Down
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ module github.com/centrifugal/centrifuge

go 1.20

replace github.com/centrifugal/protocol => ../protocol

require (
github.com/FZambia/eagle v0.1.0
github.com/centrifugal/protocol v0.11.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ github.com/FZambia/eagle v0.1.0 h1:9gyX6x+xjoIfglgyPTcYm7dvY7FJ93us1QY5De4CyXA=
github.com/FZambia/eagle v0.1.0/go.mod h1:YjGSPVkQTNcVLfzEUQJNgW9ScPR0K4u/Ky0yeFa4oDA=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/centrifugal/protocol v0.11.0 h1:pQKfVT4c3/uiRNszaOenE4NqJqL4VBlzBku8skI739w=
github.com/centrifugal/protocol v0.11.0/go.mod h1:33nZhrA2iRoR6jT+oVzu1ARx+iWIgxOgYuZMhWMWVM4=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
16 changes: 16 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ type SubscribeOptions struct {
// Make sure you are using EnableRecovery in channels that maintain Publication
// history stream.
EnableRecovery bool
// RecoveryMode is by default RecoveryModeStream, but can be also RecoveryModeState.
RecoveryMode RecoveryMode
// Data to send to a client with Subscribe Push.
Data []byte
// RecoverSince will try to subscribe a client and recover from a certain StreamPosition.
Expand Down Expand Up @@ -148,6 +150,20 @@ func WithRecovery(enabled bool) SubscribeOption {
}
}

type RecoveryMode int32

const (
RecoveryModeStream RecoveryMode = 0
RecoveryModeState RecoveryMode = 1
)

// WithRecoveryMode ...
func WithRecoveryMode(mode RecoveryMode) SubscribeOption {
return func(opts *SubscribeOptions) {
opts.RecoveryMode = mode
}
}

// WithSubscribeClient allows setting client ID that should be subscribed.
// This option not used when Client.Subscribe called.
func WithSubscribeClient(clientID string) SubscribeOption {
Expand Down

0 comments on commit ada4d62

Please sign in to comment.