From 68ad7f090b6f7634b9f0948f470bca34499a4636 Mon Sep 17 00:00:00 2001 From: Xavrax Date: Tue, 10 Dec 2024 13:29:10 +0100 Subject: [PATCH] sub cmt --- listener_manager.go | 1 + subscription_manager.go | 8 +++++--- subscription_manager_test.go | 37 ++++++++++++++++++++++++++++++++++++ 3 files changed, 43 insertions(+), 3 deletions(-) diff --git a/listener_manager.go b/listener_manager.go index c4f279ab..ba773bb6 100644 --- a/listener_manager.go +++ b/listener_manager.go @@ -270,6 +270,7 @@ type PNMessage struct { Subscription string Publisher string Timetoken int64 + CustomMessageType string Error error } diff --git a/subscription_manager.go b/subscription_manager.go index cbbe4913..fe20926d 100644 --- a/subscription_manager.go +++ b/subscription_manager.go @@ -481,6 +481,7 @@ type subscribeMessage struct { UserMetadata interface{} `json:"u"` MessageType PNMessageType `json:"e"` SequenceNumber int `json:"s"` + CustomMessageType string `json:"ctm"` PublishMetaData publishMetadata `json:"p"` } @@ -638,7 +639,7 @@ func processNonPresencePayload(m *SubscriptionManager, payload subscribeMessage, switch payload.MessageType { case PNMessageTypeSignal: - pnMessageResult := createPNMessageResult(payload.Payload, actualCh, subscribedCh, channel, subscriptionMatch, payload.IssuingClientID, payload.UserMetadata, timetoken, /*no error*/nil) + pnMessageResult := createPNMessageResult(payload.Payload, actualCh, subscribedCh, channel, subscriptionMatch, payload.IssuingClientID, payload.UserMetadata, timetoken, payload.CustomMessageType, /*no error*/nil) m.pubnub.Config.Log.Println("announceSignal,", pnMessageResult) m.listenerManager.announceSignal(pnMessageResult) case PNMessageTypeObjects: @@ -693,7 +694,7 @@ func processNonPresencePayload(m *SubscriptionManager, payload subscribeMessage, m.listenerManager.announceStatus(pnStatus) } - pnMessageResult := createPNMessageResult(messagePayload, actualCh, subscribedCh, channel, subscriptionMatch, payload.IssuingClientID, payload.UserMetadata, timetoken, err) + pnMessageResult := createPNMessageResult(messagePayload, actualCh, subscribedCh, channel, subscriptionMatch, payload.IssuingClientID, payload.UserMetadata, timetoken, payload.CustomMessageType, err) m.pubnub.Config.Log.Println("announceMessage,", pnMessageResult) m.listenerManager.announceMessage(pnMessageResult) } @@ -934,7 +935,7 @@ func createPNObjectsResult(objPayload interface{}, m *SubscriptionManager, actua return pnUUIDEvent, pnChannelEvent, pnMembershipEvent, eventType } -func createPNMessageResult(messagePayload interface{}, actualCh, subscribedCh, channel, subscriptionMatch, issuingClientID string, userMetadata interface{}, timetoken int64, error error) *PNMessage { +func createPNMessageResult(messagePayload interface{}, actualCh, subscribedCh, channel, subscriptionMatch, issuingClientID string, userMetadata interface{}, timetoken int64, CustomMessageType string, error error) *PNMessage { pnMessageResult := &PNMessage{ Message: messagePayload, @@ -945,6 +946,7 @@ func createPNMessageResult(messagePayload interface{}, actualCh, subscribedCh, c Timetoken: timetoken, Publisher: issuingClientID, UserMetadata: userMetadata, + CustomMessageType: CustomMessageType, Error: error, } diff --git a/subscription_manager_test.go b/subscription_manager_test.go index 47f8243f..f5c384fe 100644 --- a/subscription_manager_test.go +++ b/subscription_manager_test.go @@ -571,6 +571,43 @@ func TestProcessSubscribePayloadCipherErr(t *testing.T) { //pn.Destroy() } +func TestProcessSubscribePayloadWithCustomMessageType(t *testing.T) { + assert := assert.New(t) + done := make(chan bool) + pn := NewPubNub(NewDemoConfig()) + + listener := NewListener() + + go func() { + for { + select { + case result := <-listener.Message: + assert.Equal("test", result.Message) + assert.Equal("custom", result.CustomMessageType) + assert.Nil(result.Error) + done <- true + break + case <-listener.Status: + case <-listener.Presence: + break + } + } + }() + + pn.AddListener(listener) + + sm := &subscribeMessage{ + Shard: "1", + SubscriptionMatch: "channel", + Channel: "channel", + Payload: "test", + CustomMessageType: "custom", + } + + processSubscribePayload(pn.subscriptionManager, *sm) + <-done +} + func TestDecryptionProcessOnEncryptedMessage(t *testing.T) { assert := assert.New(t) pn := NewPubNub(NewDemoConfig())