Skip to content

Commit

Permalink
sub cmt
Browse files Browse the repository at this point in the history
  • Loading branch information
Xavrax committed Dec 10, 2024
1 parent 69fcff6 commit 68ad7f0
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 3 deletions.
1 change: 1 addition & 0 deletions listener_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ type PNMessage struct {
Subscription string
Publisher string
Timetoken int64
CustomMessageType string
Error error
}

Expand Down
8 changes: 5 additions & 3 deletions subscription_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,7 @@ type subscribeMessage struct {
UserMetadata interface{} `json:"u"`
MessageType PNMessageType `json:"e"`
SequenceNumber int `json:"s"`
CustomMessageType string `json:"ctm"`

PublishMetaData publishMetadata `json:"p"`
}
Expand Down Expand Up @@ -638,7 +639,7 @@ func processNonPresencePayload(m *SubscriptionManager, payload subscribeMessage,

switch payload.MessageType {
case PNMessageTypeSignal:
pnMessageResult := createPNMessageResult(payload.Payload, actualCh, subscribedCh, channel, subscriptionMatch, payload.IssuingClientID, payload.UserMetadata, timetoken, /*no error*/nil)
pnMessageResult := createPNMessageResult(payload.Payload, actualCh, subscribedCh, channel, subscriptionMatch, payload.IssuingClientID, payload.UserMetadata, timetoken, payload.CustomMessageType, /*no error*/nil)
m.pubnub.Config.Log.Println("announceSignal,", pnMessageResult)
m.listenerManager.announceSignal(pnMessageResult)
case PNMessageTypeObjects:
Expand Down Expand Up @@ -693,7 +694,7 @@ func processNonPresencePayload(m *SubscriptionManager, payload subscribeMessage,
m.listenerManager.announceStatus(pnStatus)

}
pnMessageResult := createPNMessageResult(messagePayload, actualCh, subscribedCh, channel, subscriptionMatch, payload.IssuingClientID, payload.UserMetadata, timetoken, err)
pnMessageResult := createPNMessageResult(messagePayload, actualCh, subscribedCh, channel, subscriptionMatch, payload.IssuingClientID, payload.UserMetadata, timetoken, payload.CustomMessageType, err)
m.pubnub.Config.Log.Println("announceMessage,", pnMessageResult)
m.listenerManager.announceMessage(pnMessageResult)
}
Expand Down Expand Up @@ -934,7 +935,7 @@ func createPNObjectsResult(objPayload interface{}, m *SubscriptionManager, actua
return pnUUIDEvent, pnChannelEvent, pnMembershipEvent, eventType
}

func createPNMessageResult(messagePayload interface{}, actualCh, subscribedCh, channel, subscriptionMatch, issuingClientID string, userMetadata interface{}, timetoken int64, error error) *PNMessage {
func createPNMessageResult(messagePayload interface{}, actualCh, subscribedCh, channel, subscriptionMatch, issuingClientID string, userMetadata interface{}, timetoken int64, CustomMessageType string, error error) *PNMessage {

pnMessageResult := &PNMessage{
Message: messagePayload,
Expand All @@ -945,6 +946,7 @@ func createPNMessageResult(messagePayload interface{}, actualCh, subscribedCh, c
Timetoken: timetoken,
Publisher: issuingClientID,
UserMetadata: userMetadata,
CustomMessageType: CustomMessageType,
Error: error,
}

Expand Down
37 changes: 37 additions & 0 deletions subscription_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,43 @@ func TestProcessSubscribePayloadCipherErr(t *testing.T) {
//pn.Destroy()
}

func TestProcessSubscribePayloadWithCustomMessageType(t *testing.T) {
assert := assert.New(t)
done := make(chan bool)
pn := NewPubNub(NewDemoConfig())

listener := NewListener()

go func() {
for {
select {
case result := <-listener.Message:
assert.Equal("test", result.Message)
assert.Equal("custom", result.CustomMessageType)
assert.Nil(result.Error)
done <- true
break
case <-listener.Status:
case <-listener.Presence:
break
}
}
}()

pn.AddListener(listener)

sm := &subscribeMessage{
Shard: "1",
SubscriptionMatch: "channel",
Channel: "channel",
Payload: "test",
CustomMessageType: "custom",
}

processSubscribePayload(pn.subscriptionManager, *sm)
<-done
}

func TestDecryptionProcessOnEncryptedMessage(t *testing.T) {
assert := assert.New(t)
pn := NewPubNub(NewDemoConfig())
Expand Down

0 comments on commit 68ad7f0

Please sign in to comment.