Skip to content

Commit

Permalink
publish idempotency key in Broker
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed Nov 19, 2023
1 parent 17486ca commit 054cd34
Show file tree
Hide file tree
Showing 8 changed files with 307 additions and 36 deletions.
4 changes: 4 additions & 0 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ type PublishOptions struct {
ClientInfo *ClientInfo
// Tags to set Publication.Tags.
Tags map[string]string
// IdempotencyKey is an optional key for idempotent publish. Broker implementation
// may cache these keys for some time to prevent duplicate publications. In this case
// the returned result is the same as from the previous publication with the same key.
IdempotencyKey string
}

// Broker is responsible for PUB/SUB mechanics.
Expand Down
78 changes: 73 additions & 5 deletions broker_memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ type MemoryBroker struct {

closeOnce sync.Once
closeCh chan struct{}

resultKeyExpSeconds int64
nextExpireCheck int64
resultExpireQueue priority.Queue
resultCache map[string]StreamPosition
resultCacheMu sync.RWMutex
}

var _ Broker = (*MemoryBroker)(nil)
Expand All @@ -42,6 +48,8 @@ type MemoryBrokerConfig struct{}

const numPubLocks = 4096

const idempotentResulExpireSeconds = 30

// NewMemoryBroker initializes MemoryBroker.
func NewMemoryBroker(n *Node, _ MemoryBrokerConfig) (*MemoryBroker, error) {
pubLocks := make(map[int]*sync.Mutex, numPubLocks)
Expand All @@ -50,17 +58,20 @@ func NewMemoryBroker(n *Node, _ MemoryBrokerConfig) (*MemoryBroker, error) {
}
closeCh := make(chan struct{})
b := &MemoryBroker{
node: n,
historyHub: newHistoryHub(n.config.HistoryMetaTTL, closeCh),
pubLocks: pubLocks,
closeCh: closeCh,
node: n,
historyHub: newHistoryHub(n.config.HistoryMetaTTL, closeCh),
pubLocks: pubLocks,
closeCh: closeCh,
resultCache: map[string]StreamPosition{},
resultKeyExpSeconds: idempotentResulExpireSeconds,
}
return b, nil
}

// Run runs memory broker.
func (b *MemoryBroker) Run(h BrokerEventHandler) error {
b.eventHandler = h
go b.expireResultCache()
b.historyHub.runCleanups()
return nil
}
Expand All @@ -84,6 +95,15 @@ func (b *MemoryBroker) Publish(ch string, data []byte, opts PublishOptions) (Str
mu.Lock()
defer mu.Unlock()

if opts.IdempotencyKey != "" {
b.resultCacheMu.RLock()
if res, ok := b.resultCache[opts.IdempotencyKey]; ok {
b.resultCacheMu.RUnlock()
return res, nil
}
b.resultCacheMu.RUnlock()
}

pub := &Publication{
Data: data,
Info: opts.ClientInfo,
Expand All @@ -95,9 +115,57 @@ func (b *MemoryBroker) Publish(ch string, data []byte, opts PublishOptions) (Str
return StreamPosition{}, err
}
pub.Offset = streamTop.Offset
if opts.IdempotencyKey != "" {
b.saveToResultCache(opts.IdempotencyKey, streamTop)
}
return streamTop, b.eventHandler.HandlePublication(ch, pub, streamTop)
}
return StreamPosition{}, b.eventHandler.HandlePublication(ch, pub, StreamPosition{})
streamPosition := StreamPosition{}
if opts.IdempotencyKey != "" {
b.saveToResultCache(opts.IdempotencyKey, streamPosition)
}
return streamPosition, b.eventHandler.HandlePublication(ch, pub, StreamPosition{})
}

func (b *MemoryBroker) saveToResultCache(key string, sp StreamPosition) {
b.resultCacheMu.Lock()
b.resultCache[key] = sp
expireAt := time.Now().Unix() + b.resultKeyExpSeconds
heap.Push(&b.resultExpireQueue, &priority.Item{Value: key, Priority: expireAt})
if b.nextExpireCheck == 0 || b.nextExpireCheck > expireAt {
b.nextExpireCheck = expireAt
}
b.resultCacheMu.Unlock()
}

func (b *MemoryBroker) expireResultCache() {
var nextExpireCheck int64
for {
select {
case <-time.After(time.Second):
case <-b.closeCh:
return
}
b.resultCacheMu.Lock()
if b.nextExpireCheck == 0 || b.nextExpireCheck > time.Now().Unix() {
b.resultCacheMu.Unlock()
continue
}
nextExpireCheck = 0
for b.resultExpireQueue.Len() > 0 {
item := heap.Pop(&b.resultExpireQueue).(*priority.Item)
expireAt := item.Priority
if expireAt > time.Now().Unix() {
heap.Push(&b.resultExpireQueue, item)
nextExpireCheck = expireAt
break
}
key := item.Value
delete(b.resultCache, key)
}
b.nextExpireCheck = nextExpireCheck
b.resultCacheMu.Unlock()
}
}

// PublishJoin - see Broker interface description.
Expand Down
42 changes: 42 additions & 0 deletions broker_memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,48 @@ func TestMemoryBrokerPublishHistory(t *testing.T) {
require.Equal(t, 1, len(pubs))
}

func TestMemoryBrokerPublishIdempotent(t *testing.T) {
e := testMemoryBroker()
defer func() { _ = e.node.Shutdown(context.Background()) }()

require.NotEqual(t, nil, e.historyHub)

// Test publish with history and with idempotency key.
sp1, err := e.Publish("channel", testPublicationData(), PublishOptions{
HistorySize: 4,
HistoryTTL: time.Second,
IdempotencyKey: "test",
})
require.NoError(t, err)
pubs, _, err := e.History("channel", HistoryOptions{
Filter: HistoryFilter{
Limit: -1,
Since: nil,
},
})
require.NoError(t, err)
require.Equal(t, 1, len(pubs))

// Publish with same key.
sp2, err := e.Publish("channel", testPublicationData(), PublishOptions{
HistorySize: 4,
HistoryTTL: time.Second,
IdempotencyKey: "test",
})
require.NoError(t, err)
pubs, _, err = e.History("channel", HistoryOptions{
Filter: HistoryFilter{
Limit: -1,
Since: nil,
},
})
require.NoError(t, err)
require.Equal(t, 1, len(pubs))

// Make sure stream positions match.
require.Equal(t, sp1, sp2)
}

func TestMemoryEngineSubscribeUnsubscribe(t *testing.T) {
e := testMemoryBroker()
defer func() { _ = e.node.Shutdown(context.Background()) }()
Expand Down
111 changes: 82 additions & 29 deletions broker_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,21 +67,22 @@ type shardWrapper struct {
// By default, Redis >= 5 required (due to the fact RedisBroker uses STREAM data structure
// to keep publication history for a channel).
type RedisBroker struct {
controlRound uint64
node *Node
sharding bool
config RedisBrokerConfig
shards []*shardWrapper
historyListScript *rueidis.Lua
historyStreamScript *rueidis.Lua
addHistoryListScript *rueidis.Lua
addHistoryStreamScript *rueidis.Lua
shardChannel string
messagePrefix string
controlChannel string
nodeChannel string
closeOnce sync.Once
closeCh chan struct{}
controlRound uint64
node *Node
sharding bool
config RedisBrokerConfig
shards []*shardWrapper
publishIdempotentScript *rueidis.Lua
historyListScript *rueidis.Lua
historyStreamScript *rueidis.Lua
addHistoryListScript *rueidis.Lua
addHistoryStreamScript *rueidis.Lua
shardChannel string
messagePrefix string
controlChannel string
nodeChannel string
closeOnce sync.Once
closeCh chan struct{}
}

// RedisBrokerConfig is a config for Broker.
Expand Down Expand Up @@ -173,15 +174,16 @@ func NewRedisBroker(n *Node, config RedisBrokerConfig) (*RedisBroker, error) {
}

b := &RedisBroker{
node: n,
config: config,
shards: shardWrappers,
sharding: len(config.Shards) > 1,
historyStreamScript: rueidis.NewLuaScript(historyStreamSource),
historyListScript: rueidis.NewLuaScript(historyListSource),
addHistoryStreamScript: rueidis.NewLuaScript(addHistoryStreamSource),
addHistoryListScript: rueidis.NewLuaScript(addHistoryListSource),
closeCh: make(chan struct{}),
node: n,
config: config,
shards: shardWrappers,
sharding: len(config.Shards) > 1,
publishIdempotentScript: rueidis.NewLuaScript(publishIdempotentSource),
historyStreamScript: rueidis.NewLuaScript(historyStreamSource),
historyListScript: rueidis.NewLuaScript(historyListSource),
addHistoryStreamScript: rueidis.NewLuaScript(addHistoryStreamSource),
addHistoryListScript: rueidis.NewLuaScript(addHistoryListSource),
closeCh: make(chan struct{}),
}
b.shardChannel = config.Prefix + redisPubSubShardChannelSuffix
b.messagePrefix = config.Prefix + redisClientChannelPrefix
Expand Down Expand Up @@ -222,6 +224,9 @@ func NewRedisBroker(n *Node, config RedisBrokerConfig) (*RedisBroker, error) {
}

var (
//go:embed internal/redis_lua/broker_publish_idempotent.lua
publishIdempotentSource string

//go:embed internal/redis_lua/broker_history_add_list.lua
addHistoryListSource string

Expand Down Expand Up @@ -614,14 +619,50 @@ func (b *RedisBroker) publish(s *shardWrapper, ch string, data []byte, opts Publ
publishCommand = "spublish"
}

idempotencyKey := opts.IdempotencyKey
var resultKey channelID
var resultExpire string
if idempotencyKey != "" {
resultKey = b.resultCacheKey(s.shard, ch)
resultExpire = strconv.Itoa(idempotentResulExpireSeconds)
}

if opts.HistorySize <= 0 || opts.HistoryTTL <= 0 {
var resp rueidis.RedisResult
if useShardedPublish {
cmd := s.shard.client.B().Spublish().Channel(string(publishChannel)).Message(convert.BytesToString(byteMessage)).Build()
resp = s.shard.client.Do(context.Background(), cmd)
if resultKey == "" {
cmd := s.shard.client.B().Spublish().Channel(string(publishChannel)).Message(convert.BytesToString(byteMessage)).Build()
resp = s.shard.client.Do(context.Background(), cmd)
} else {
resp = b.publishIdempotentScript.Exec(
context.Background(),
s.shard.client,
[]string{string(resultKey)},
[]string{
convert.BytesToString(byteMessage),
string(publishChannel),
publishCommand,
resultExpire,
},
)
}
} else {
cmd := s.shard.client.B().Publish().Channel(string(publishChannel)).Message(convert.BytesToString(byteMessage)).Build()
resp = s.shard.client.Do(context.Background(), cmd)
if resultKey == "" {
cmd := s.shard.client.B().Publish().Channel(string(publishChannel)).Message(convert.BytesToString(byteMessage)).Build()
resp = s.shard.client.Do(context.Background(), cmd)
} else {
resp = b.publishIdempotentScript.Exec(
context.Background(),
s.shard.client,
[]string{string(resultKey)},
[]string{
convert.BytesToString(byteMessage),
string(publishChannel),
publishCommand,
resultExpire,
},
)
}
}
return StreamPosition{}, resp.Error()
}
Expand Down Expand Up @@ -651,7 +692,7 @@ func (b *RedisBroker) publish(s *shardWrapper, ch string, data []byte, opts Publ
replies, err := script.Exec(
context.Background(),
s.shard.client,
[]string{string(streamKey), string(historyMetaKey)},
[]string{string(streamKey), string(historyMetaKey), string(resultKey)},
[]string{
convert.BytesToString(byteMessage),
strconv.Itoa(size),
Expand All @@ -660,6 +701,7 @@ func (b *RedisBroker) publish(s *shardWrapper, ch string, data []byte, opts Publ
strconv.Itoa(historyMetaTTLSeconds),
strconv.FormatInt(time.Now().Unix(), 10),
publishCommand,
resultExpire,
},
).ToArray()
if err != nil {
Expand Down Expand Up @@ -858,6 +900,17 @@ func (b *RedisBroker) nodeChannelID(nodeID string) channelID {
return channelID(b.config.Prefix + redisNodeChannelPrefix + nodeID)
}

func (b *RedisBroker) resultCacheKey(s *RedisShard, ch string) channelID {
if s.useCluster {
if b.config.numClusterShards > 0 {
ch = "{" + strconv.Itoa(consistentIndex(ch, b.config.numClusterShards)) + "}." + ch
} else {
ch = "{" + ch + "}"
}
}
return channelID(b.config.Prefix + ".result." + ch)
}

func (b *RedisBroker) historyListKey(s *RedisShard, ch string) channelID {
if s.useCluster {
if b.config.numClusterShards > 0 {
Expand Down
Loading

0 comments on commit 054cd34

Please sign in to comment.