From a8439db90fd11e014b457db476e8327b6ced6358 Mon Sep 17 00:00:00 2001 From: Ariel Gentile Date: Sat, 1 Apr 2023 10:41:04 -0300 Subject: [PATCH] feat: add message pickup module (#1413) Signed-off-by: Ariel Gentile --- packages/core/src/agent/AgentModules.ts | 2 + packages/core/src/agent/BaseAgent.ts | 8 + .../core/src/agent/__tests__/Agent.test.ts | 3 + .../src/agent/__tests__/AgentModules.test.ts | 4 + .../MessagePickupApi.ts" | 96 +++++ .../MessagePickupApiOptions.ts" | 23 + .../MessagePickupModule.ts" | 60 +++ .../MessagePickupModuleConfig.ts" | 57 +++ .../__tests__/MessagePickupModule.test.ts" | 42 ++ .../__tests__/pickup.test.ts" | 11 +- .../modules/message-p\303\254ckup/index.ts" | 5 + .../protocol/BaseMessagePickupProtocol.ts" | 21 + .../protocol/MessagePickupProtocol.ts" | 16 + .../protocol/MessagePickupProtocolOptions.ts" | 12 + .../message-p\303\254ckup/protocol/index.ts" | 0 .../protocol/v1/V1MessagePickupProtocol.ts" | 85 ++++ .../protocol/v1/handlers/V1BatchHandler.ts" | 28 ++ .../v1/handlers/V1BatchPickupHandler.ts" | 19 + .../protocol/v1/handlers/index.ts" | 2 + .../protocol/v1/index.ts" | 2 + .../protocol/v1/messages/V1BatchMessage.ts" | 16 +- .../v1/messages/V1BatchPickupMessage.ts" | 10 +- .../protocol/v1/messages/index.ts" | 2 + .../protocol/v2/V2MessagePickupProtocol.ts" | 253 +++++++++++ .../V2MessagePickupProtocol.test.ts" | 407 ++++++++++++++++++ .../v2/handlers/V2DeliveryRequestHandler.ts" | 19 + .../v2/handlers/V2MessageDeliveryHandler.ts" | 27 ++ .../v2/handlers/V2MessagesReceivedHandler.ts" | 19 + .../protocol/v2/handlers/V2StatusHandler.ts" | 27 ++ .../v2/handlers/V2StatusRequestHandler.ts" | 19 + .../protocol/v2/handlers/index.ts" | 5 + .../protocol/v2/index.ts" | 2 + .../v2/messages/V2DeliveryRequestMessage.ts" | 16 +- .../v2/messages/V2MessageDeliveryMessage.ts" | 18 +- .../v2/messages/V2MessagesReceivedMessage.ts" | 16 +- .../protocol/v2/messages/V2StatusMessage.ts" | 18 +- .../v2/messages/V2StatusRequestMessage.ts" | 14 +- .../protocol/v2/messages/index.ts" | 5 + .../modules/routing/MediationRecipientApi.ts | 68 ++- .../core/src/modules/routing/MediatorApi.ts | 20 +- .../src/modules/routing/MediatorModule.ts | 11 - .../routing/__tests__/MediatorModule.test.ts | 5 +- packages/core/src/modules/routing/index.ts | 1 - .../src/modules/routing/protocol/index.ts | 1 - .../pickup/v1/MessagePickupService.ts | 62 --- .../pickup/v1/handlers/BatchHandler.ts | 32 -- .../pickup/v1/handlers/BatchPickupHandler.ts | 19 - .../protocol/pickup/v1/handlers/index.ts | 2 - .../routing/protocol/pickup/v1/index.ts | 2 - .../protocol/pickup/v1/messages/index.ts | 2 - .../pickup/v2/V2MessagePickupService.ts | 124 ------ .../v2/handlers/DeliveryRequestHandler.ts | 19 - .../v2/handlers/MessageDeliveryHandler.ts | 27 -- .../v2/handlers/MessagesReceivedHandler.ts | 19 - .../pickup/v2/handlers/StatusHandler.ts | 27 -- .../v2/handlers/StatusRequestHandler.ts | 19 - .../protocol/pickup/v2/handlers/index.ts | 5 - .../routing/protocol/pickup/v2/index.ts | 2 - .../protocol/pickup/v2/messages/index.ts | 5 - .../services/MediationRecipientService.ts | 106 +---- .../MediationRecipientService.test.ts | 151 +------ .../__tests__/V2MessagePickupService.test.ts | 251 ----------- 62 files changed, 1375 insertions(+), 994 deletions(-) create mode 100644 "packages/core/src/modules/message-p\303\254ckup/MessagePickupApi.ts" create mode 100644 "packages/core/src/modules/message-p\303\254ckup/MessagePickupApiOptions.ts" create mode 100644 "packages/core/src/modules/message-p\303\254ckup/MessagePickupModule.ts" create mode 100644 "packages/core/src/modules/message-p\303\254ckup/MessagePickupModuleConfig.ts" create mode 100644 "packages/core/src/modules/message-p\303\254ckup/__tests__/MessagePickupModule.test.ts" rename packages/core/src/modules/routing/__tests__/pickup.test.ts => "packages/core/src/modules/message-p\303\254ckup/__tests__/pickup.test.ts" (95%) create mode 100644 "packages/core/src/modules/message-p\303\254ckup/index.ts" create mode 100644 "packages/core/src/modules/message-p\303\254ckup/protocol/BaseMessagePickupProtocol.ts" create mode 100644 "packages/core/src/modules/message-p\303\254ckup/protocol/MessagePickupProtocol.ts" create mode 100644 "packages/core/src/modules/message-p\303\254ckup/protocol/MessagePickupProtocolOptions.ts" rename packages/core/src/modules/routing/protocol/pickup/index.ts => "packages/core/src/modules/message-p\303\254ckup/protocol/index.ts" (100%) create mode 100644 "packages/core/src/modules/message-p\303\254ckup/protocol/v1/V1MessagePickupProtocol.ts" create mode 100644 "packages/core/src/modules/message-p\303\254ckup/protocol/v1/handlers/V1BatchHandler.ts" create mode 100644 "packages/core/src/modules/message-p\303\254ckup/protocol/v1/handlers/V1BatchPickupHandler.ts" create mode 100644 "packages/core/src/modules/message-p\303\254ckup/protocol/v1/handlers/index.ts" create mode 100644 "packages/core/src/modules/message-p\303\254ckup/protocol/v1/index.ts" rename packages/core/src/modules/routing/protocol/pickup/v1/messages/BatchMessage.ts => "packages/core/src/modules/message-p\303\254ckup/protocol/v1/messages/V1BatchMessage.ts" (74%) rename packages/core/src/modules/routing/protocol/pickup/v1/messages/BatchPickupMessage.ts => "packages/core/src/modules/message-p\303\254ckup/protocol/v1/messages/V1BatchPickupMessage.ts" (77%) create mode 100644 "packages/core/src/modules/message-p\303\254ckup/protocol/v1/messages/index.ts" create mode 100644 "packages/core/src/modules/message-p\303\254ckup/protocol/v2/V2MessagePickupProtocol.ts" create mode 100644 "packages/core/src/modules/message-p\303\254ckup/protocol/v2/__tests__/V2MessagePickupProtocol.test.ts" create mode 100644 "packages/core/src/modules/message-p\303\254ckup/protocol/v2/handlers/V2DeliveryRequestHandler.ts" create mode 100644 "packages/core/src/modules/message-p\303\254ckup/protocol/v2/handlers/V2MessageDeliveryHandler.ts" create mode 100644 "packages/core/src/modules/message-p\303\254ckup/protocol/v2/handlers/V2MessagesReceivedHandler.ts" create mode 100644 "packages/core/src/modules/message-p\303\254ckup/protocol/v2/handlers/V2StatusHandler.ts" create mode 100644 "packages/core/src/modules/message-p\303\254ckup/protocol/v2/handlers/V2StatusRequestHandler.ts" create mode 100644 "packages/core/src/modules/message-p\303\254ckup/protocol/v2/handlers/index.ts" create mode 100644 "packages/core/src/modules/message-p\303\254ckup/protocol/v2/index.ts" rename packages/core/src/modules/routing/protocol/pickup/v2/messages/DeliveryRequestMessage.ts => "packages/core/src/modules/message-p\303\254ckup/protocol/v2/messages/V2DeliveryRequestMessage.ts" (59%) rename packages/core/src/modules/routing/protocol/pickup/v2/messages/MessageDeliveryMessage.ts => "packages/core/src/modules/message-p\303\254ckup/protocol/v2/messages/V2MessageDeliveryMessage.ts" (57%) rename packages/core/src/modules/routing/protocol/pickup/v2/messages/MessagesReceivedMessage.ts => "packages/core/src/modules/message-p\303\254ckup/protocol/v2/messages/V2MessagesReceivedMessage.ts" (55%) rename packages/core/src/modules/routing/protocol/pickup/v2/messages/StatusMessage.ts => "packages/core/src/modules/message-p\303\254ckup/protocol/v2/messages/V2StatusMessage.ts" (79%) rename packages/core/src/modules/routing/protocol/pickup/v2/messages/StatusRequestMessage.ts => "packages/core/src/modules/message-p\303\254ckup/protocol/v2/messages/V2StatusRequestMessage.ts" (59%) create mode 100644 "packages/core/src/modules/message-p\303\254ckup/protocol/v2/messages/index.ts" delete mode 100644 packages/core/src/modules/routing/protocol/index.ts delete mode 100644 packages/core/src/modules/routing/protocol/pickup/v1/MessagePickupService.ts delete mode 100644 packages/core/src/modules/routing/protocol/pickup/v1/handlers/BatchHandler.ts delete mode 100644 packages/core/src/modules/routing/protocol/pickup/v1/handlers/BatchPickupHandler.ts delete mode 100644 packages/core/src/modules/routing/protocol/pickup/v1/handlers/index.ts delete mode 100644 packages/core/src/modules/routing/protocol/pickup/v1/index.ts delete mode 100644 packages/core/src/modules/routing/protocol/pickup/v1/messages/index.ts delete mode 100644 packages/core/src/modules/routing/protocol/pickup/v2/V2MessagePickupService.ts delete mode 100644 packages/core/src/modules/routing/protocol/pickup/v2/handlers/DeliveryRequestHandler.ts delete mode 100644 packages/core/src/modules/routing/protocol/pickup/v2/handlers/MessageDeliveryHandler.ts delete mode 100644 packages/core/src/modules/routing/protocol/pickup/v2/handlers/MessagesReceivedHandler.ts delete mode 100644 packages/core/src/modules/routing/protocol/pickup/v2/handlers/StatusHandler.ts delete mode 100644 packages/core/src/modules/routing/protocol/pickup/v2/handlers/StatusRequestHandler.ts delete mode 100644 packages/core/src/modules/routing/protocol/pickup/v2/handlers/index.ts delete mode 100644 packages/core/src/modules/routing/protocol/pickup/v2/index.ts delete mode 100644 packages/core/src/modules/routing/protocol/pickup/v2/messages/index.ts delete mode 100644 packages/core/src/modules/routing/services/__tests__/V2MessagePickupService.test.ts diff --git a/packages/core/src/agent/AgentModules.ts b/packages/core/src/agent/AgentModules.ts index 7259eeb092..c4e8dec0d8 100644 --- a/packages/core/src/agent/AgentModules.ts +++ b/packages/core/src/agent/AgentModules.ts @@ -9,6 +9,7 @@ import { CredentialsModule } from '../modules/credentials' import { DidsModule } from '../modules/dids' import { DiscoverFeaturesModule } from '../modules/discover-features' import { GenericRecordsModule } from '../modules/generic-records' +import { MessagePickupModule } from '../modules/message-pìckup' import { OutOfBandModule } from '../modules/oob' import { ProofsModule } from '../modules/proofs' import { MediatorModule, MediationRecipientModule } from '../modules/routing' @@ -121,6 +122,7 @@ function getDefaultAgentModules() { proofs: () => new ProofsModule(), mediator: () => new MediatorModule(), mediationRecipient: () => new MediationRecipientModule(), + messagePickup: () => new MessagePickupModule(), basicMessages: () => new BasicMessagesModule(), genericRecords: () => new GenericRecordsModule(), discovery: () => new DiscoverFeaturesModule(), diff --git a/packages/core/src/agent/BaseAgent.ts b/packages/core/src/agent/BaseAgent.ts index 444eba3f9d..447c22d677 100644 --- a/packages/core/src/agent/BaseAgent.ts +++ b/packages/core/src/agent/BaseAgent.ts @@ -3,6 +3,7 @@ import type { AgentApi, CustomOrDefaultApi, EmptyModuleMap, ModulesMap, WithoutD import type { TransportSession } from './TransportService' import type { Logger } from '../logger' import type { CredentialsModule } from '../modules/credentials' +import type { MessagePickupModule } from '../modules/message-pìckup' import type { ProofsModule } from '../modules/proofs' import type { DependencyManager } from '../plugins' @@ -13,6 +14,7 @@ import { CredentialsApi } from '../modules/credentials' import { DidsApi } from '../modules/dids' import { DiscoverFeaturesApi } from '../modules/discover-features' import { GenericRecordsApi } from '../modules/generic-records' +import { MessagePickupApi } from '../modules/message-pìckup/MessagePickupApi' import { OutOfBandApi } from '../modules/oob' import { ProofsApi } from '../modules/proofs' import { MediatorApi, MediationRecipientApi } from '../modules/routing' @@ -47,6 +49,7 @@ export abstract class BaseAgent public readonly mediator: MediatorApi public readonly mediationRecipient: MediationRecipientApi + public readonly messagePickup: CustomOrDefaultApi public readonly basicMessages: BasicMessagesApi public readonly genericRecords: GenericRecordsApi public readonly discovery: DiscoverFeaturesApi @@ -90,6 +93,10 @@ export abstract class BaseAgent this.mediator = this.dependencyManager.resolve(MediatorApi) this.mediationRecipient = this.dependencyManager.resolve(MediationRecipientApi) + this.messagePickup = this.dependencyManager.resolve(MessagePickupApi) as CustomOrDefaultApi< + AgentModules['messagePickup'], + MessagePickupModule + > this.basicMessages = this.dependencyManager.resolve(BasicMessagesApi) this.genericRecords = this.dependencyManager.resolve(GenericRecordsApi) this.discovery = this.dependencyManager.resolve(DiscoverFeaturesApi) @@ -103,6 +110,7 @@ export abstract class BaseAgent { expect(container.resolve(MediatorApi)).toBeInstanceOf(MediatorApi) expect(container.resolve(MediationRecipientApi)).toBeInstanceOf(MediationRecipientApi) + expect(container.resolve(MessagePickupApi)).toBeInstanceOf(MessagePickupApi) expect(container.resolve(MediationRepository)).toBeInstanceOf(MediationRepository) expect(container.resolve(MediatorService)).toBeInstanceOf(MediatorService) expect(container.resolve(MediationRecipientService)).toBeInstanceOf(MediationRecipientService) @@ -208,6 +210,7 @@ describe('Agent', () => { expect(container.resolve(MediatorApi)).toBe(container.resolve(MediatorApi)) expect(container.resolve(MediationRecipientApi)).toBe(container.resolve(MediationRecipientApi)) + expect(container.resolve(MessagePickupApi)).toBe(container.resolve(MessagePickupApi)) expect(container.resolve(MediationRepository)).toBe(container.resolve(MediationRepository)) expect(container.resolve(MediatorService)).toBe(container.resolve(MediatorService)) expect(container.resolve(MediationRecipientService)).toBe(container.resolve(MediationRecipientService)) diff --git a/packages/core/src/agent/__tests__/AgentModules.test.ts b/packages/core/src/agent/__tests__/AgentModules.test.ts index 25466501d7..6a7833fbd0 100644 --- a/packages/core/src/agent/__tests__/AgentModules.test.ts +++ b/packages/core/src/agent/__tests__/AgentModules.test.ts @@ -7,6 +7,7 @@ import { CredentialsModule } from '../../modules/credentials' import { DidsModule } from '../../modules/dids' import { DiscoverFeaturesModule } from '../../modules/discover-features' import { GenericRecordsModule } from '../../modules/generic-records' +import { MessagePickupModule } from '../../modules/message-pìckup' import { OutOfBandModule } from '../../modules/oob' import { ProofsModule } from '../../modules/proofs' import { MediatorModule, MediationRecipientModule } from '../../modules/routing' @@ -59,6 +60,7 @@ describe('AgentModules', () => { proofs: expect.any(ProofsModule), mediator: expect.any(MediatorModule), mediationRecipient: expect.any(MediationRecipientModule), + messagePickup: expect.any(MessagePickupModule), basicMessages: expect.any(BasicMessagesModule), genericRecords: expect.any(GenericRecordsModule), discovery: expect.any(DiscoverFeaturesModule), @@ -82,6 +84,7 @@ describe('AgentModules', () => { proofs: expect.any(ProofsModule), mediator: expect.any(MediatorModule), mediationRecipient: expect.any(MediationRecipientModule), + messagePickup: expect.any(MessagePickupModule), basicMessages: expect.any(BasicMessagesModule), genericRecords: expect.any(GenericRecordsModule), discovery: expect.any(DiscoverFeaturesModule), @@ -108,6 +111,7 @@ describe('AgentModules', () => { proofs: expect.any(ProofsModule), mediator: expect.any(MediatorModule), mediationRecipient: expect.any(MediationRecipientModule), + messagePickup: expect.any(MessagePickupModule), basicMessages: expect.any(BasicMessagesModule), genericRecords: expect.any(GenericRecordsModule), discovery: expect.any(DiscoverFeaturesModule), diff --git "a/packages/core/src/modules/message-p\303\254ckup/MessagePickupApi.ts" "b/packages/core/src/modules/message-p\303\254ckup/MessagePickupApi.ts" new file mode 100644 index 0000000000..653fafd7d7 --- /dev/null +++ "b/packages/core/src/modules/message-p\303\254ckup/MessagePickupApi.ts" @@ -0,0 +1,96 @@ +import type { + PickupMessagesOptions, + PickupMessagesReturnType, + QueueMessageOptions, + QueueMessageReturnType, +} from './MessagePickupApiOptions' +import type { V1MessagePickupProtocol, V2MessagePickupProtocol } from './protocol' +import type { MessagePickupProtocol } from './protocol/MessagePickupProtocol' +import type { MessageRepository } from '../../storage/MessageRepository' + +import { AgentContext } from '../../agent' +import { MessageSender } from '../../agent/MessageSender' +import { OutboundMessageContext } from '../../agent/models' +import { InjectionSymbols } from '../../constants' +import { AriesFrameworkError } from '../../error' +import { injectable } from '../../plugins' +import { ConnectionService } from '../connections/services' + +import { MessagePickupModuleConfig } from './MessagePickupModuleConfig' + +export interface MessagePickupApi { + queueMessage(options: QueueMessageOptions): Promise + pickupMessages(options: PickupMessagesOptions): Promise +} + +@injectable() +export class MessagePickupApi + implements MessagePickupApi +{ + public config: MessagePickupModuleConfig + + private messageSender: MessageSender + private agentContext: AgentContext + private connectionService: ConnectionService + + public constructor( + messageSender: MessageSender, + agentContext: AgentContext, + connectionService: ConnectionService, + config: MessagePickupModuleConfig + ) { + this.messageSender = messageSender + this.connectionService = connectionService + this.agentContext = agentContext + this.config = config + } + + private getProtocol(protocolVersion: MPP): MessagePickupProtocol { + const protocol = this.config.protocols.find((protocol) => protocol.version === protocolVersion) + + if (!protocol) { + throw new AriesFrameworkError(`No message pickup protocol registered for protocol version ${protocolVersion}`) + } + + return protocol + } + + /** + * Add an encrypted message to the message pickup queue + * + * @param options: connectionId associated to the message and the encrypted message itself + */ + public async queueMessage(options: QueueMessageOptions): Promise { + const connectionRecord = await this.connectionService.getById(this.agentContext, options.connectionId) + + const messageRepository = this.agentContext.dependencyManager.resolve( + InjectionSymbols.MessageRepository + ) + + await messageRepository.add(connectionRecord.id, options.message) + } + + /** + * Pickup queued messages from a message holder. It attempts to retrieve all current messages from the + * queue, receiving up to `batchSize` messages per batch retrieval. + * + * @param options connectionId, protocol version to use and batch size + */ + public async pickupMessages(options: PickupMessagesOptions): Promise { + const connectionRecord = await this.connectionService.getById(this.agentContext, options.connectionId) + + const protocol = this.getProtocol(options.protocolVersion) + const { message } = await protocol.pickupMessages(this.agentContext, { + connectionRecord, + batchSize: options.batchSize, + recipientKey: options.recipientKey, + }) + + await this.messageSender.sendMessage( + new OutboundMessageContext(message, { + agentContext: this.agentContext, + connection: connectionRecord, + }) + ) + } +} diff --git "a/packages/core/src/modules/message-p\303\254ckup/MessagePickupApiOptions.ts" "b/packages/core/src/modules/message-p\303\254ckup/MessagePickupApiOptions.ts" new file mode 100644 index 0000000000..1f8d54e264 --- /dev/null +++ "b/packages/core/src/modules/message-p\303\254ckup/MessagePickupApiOptions.ts" @@ -0,0 +1,23 @@ +import type { MessagePickupProtocol } from './protocol/MessagePickupProtocol' +import type { EncryptedMessage } from '../../types' + +/** + * Get the supported protocol versions based on the provided discover features services. + */ +export type MessagePickupProtocolVersionType = MPPs[number]['version'] + +export interface QueueMessageOptions { + connectionId: string + message: EncryptedMessage +} + +export interface PickupMessagesOptions { + connectionId: string + protocolVersion: MessagePickupProtocolVersionType + recipientKey?: string + batchSize?: number +} + +export type QueueMessageReturnType = void + +export type PickupMessagesReturnType = void diff --git "a/packages/core/src/modules/message-p\303\254ckup/MessagePickupModule.ts" "b/packages/core/src/modules/message-p\303\254ckup/MessagePickupModule.ts" new file mode 100644 index 0000000000..5cf4540625 --- /dev/null +++ "b/packages/core/src/modules/message-p\303\254ckup/MessagePickupModule.ts" @@ -0,0 +1,60 @@ +import type { MessagePickupModuleConfigOptions } from './MessagePickupModuleConfig' +import type { MessagePickupProtocol } from './protocol/MessagePickupProtocol' +import type { FeatureRegistry } from '../../agent/FeatureRegistry' +import type { ApiModule, DependencyManager } from '../../plugins' +import type { Optional } from '../../utils' +import type { Constructor } from '../../utils/mixins' + +import { InjectionSymbols } from '../../constants' + +import { MessagePickupApi } from './MessagePickupApi' +import { MessagePickupModuleConfig } from './MessagePickupModuleConfig' +import { V1MessagePickupProtocol, V2MessagePickupProtocol } from './protocol' + +/** + * Default protocols that will be registered if the `protocols` property is not configured. + */ +export type DefaultMessagePickupProtocols = [V1MessagePickupProtocol, V2MessagePickupProtocol] + +// MessagePickupModuleOptions makes the protocols property optional from the config, as it will set it when not provided. +export type MessagePickupModuleOptions = Optional< + MessagePickupModuleConfigOptions, + 'protocols' +> + +export class MessagePickupModule + implements ApiModule +{ + public readonly config: MessagePickupModuleConfig + + // Infer Api type from the config + public readonly api: Constructor> = MessagePickupApi + + public constructor(config?: MessagePickupModuleOptions) { + this.config = new MessagePickupModuleConfig({ + ...config, + protocols: config?.protocols ?? [new V1MessagePickupProtocol(), new V2MessagePickupProtocol()], + }) as MessagePickupModuleConfig + } + + /** + * Registers the dependencies of the question answer module on the dependency manager. + */ + public register(dependencyManager: DependencyManager, featureRegistry: FeatureRegistry) { + // Api + dependencyManager.registerContextScoped(MessagePickupApi) + + // Config + dependencyManager.registerInstance(MessagePickupModuleConfig, this.config) + + // Message repository + if (this.config.messageRepository) { + dependencyManager.registerInstance(InjectionSymbols.MessageRepository, this.config.messageRepository) + } + + // Protocol needs to register feature registry items and handlers + for (const protocol of this.config.protocols) { + protocol.register(dependencyManager, featureRegistry) + } + } +} diff --git "a/packages/core/src/modules/message-p\303\254ckup/MessagePickupModuleConfig.ts" "b/packages/core/src/modules/message-p\303\254ckup/MessagePickupModuleConfig.ts" new file mode 100644 index 0000000000..d755c082e3 --- /dev/null +++ "b/packages/core/src/modules/message-p\303\254ckup/MessagePickupModuleConfig.ts" @@ -0,0 +1,57 @@ +import type { MessagePickupProtocol } from './protocol/MessagePickupProtocol' +import type { MessageRepository } from '../../storage/MessageRepository' + +/** + * MessagePickupModuleConfigOptions defines the interface for the options of the MessagePickupModuleConfig class. + * This can contain optional parameters that have default values in the config class itself. + */ +export interface MessagePickupModuleConfigOptions { + /** + * Maximum number of messages to retrieve in a single batch message pickup + * + * @default 10 + */ + maximumBatchSize?: number + + /** + * Message pickup protocols to make available to the message pickup module. Only one protocol should be registered for each + * protocol version. + * + * When not provided, V1MessagePickupProtocol and V2MessagePickupProtocol` are registered by default. + * + * @default + * ``` + * [V1MessagePickupProtocol, V2MessagePickupProtocol] + * ``` + */ + protocols: MessagePickupProtocols + + /** + * Allows to specify a custom pickup message queue. It defaults to an in-memory repository + * + */ + messageRepository?: MessageRepository +} + +export class MessagePickupModuleConfig { + private options: MessagePickupModuleConfigOptions + + public constructor(options: MessagePickupModuleConfigOptions) { + this.options = options + } + + /** See {@link MessagePickupModuleConfig.maximumBatchSize} */ + public get maximumBatchSize() { + return this.options.maximumBatchSize ?? 10 + } + + /** See {@link MessagePickupModuleConfig.protocols} */ + public get protocols() { + return this.options.protocols + } + + /** See {@link MessagePickupModuleConfig.protocols} */ + public get messageRepository() { + return this.options.messageRepository + } +} diff --git "a/packages/core/src/modules/message-p\303\254ckup/__tests__/MessagePickupModule.test.ts" "b/packages/core/src/modules/message-p\303\254ckup/__tests__/MessagePickupModule.test.ts" new file mode 100644 index 0000000000..141d8f81af --- /dev/null +++ "b/packages/core/src/modules/message-p\303\254ckup/__tests__/MessagePickupModule.test.ts" @@ -0,0 +1,42 @@ +import { FeatureRegistry } from '../../../agent/FeatureRegistry' +import { Protocol } from '../../../agent/models' +import { DependencyManager } from '../../../plugins/DependencyManager' +import { MessagePickupApi } from '../MessagePickupApi' +import { MessagePickupModule } from '../MessagePickupModule' +import { MessagePickupModuleConfig } from '../MessagePickupModuleConfig' + +jest.mock('../../../plugins/DependencyManager') +const DependencyManagerMock = DependencyManager as jest.Mock + +jest.mock('../../../agent/FeatureRegistry') +const FeatureRegistryMock = FeatureRegistry as jest.Mock + +const dependencyManager = new DependencyManagerMock() +const featureRegistry = new FeatureRegistryMock() + +describe('MessagePickupModule', () => { + test('registers dependencies on the dependency manager', () => { + const module = new MessagePickupModule() + module.register(dependencyManager, featureRegistry) + + expect(dependencyManager.registerContextScoped).toHaveBeenCalledTimes(1) + expect(dependencyManager.registerContextScoped).toHaveBeenCalledWith(MessagePickupApi) + + expect(dependencyManager.registerInstance).toHaveBeenCalledTimes(1) + expect(dependencyManager.registerInstance).toHaveBeenCalledWith(MessagePickupModuleConfig, module.config) + + expect(featureRegistry.register).toHaveBeenCalledTimes(2) + expect(featureRegistry.register).toHaveBeenCalledWith( + new Protocol({ + id: 'https://didcomm.org/messagepickup/1.0', + roles: ['message_holder', 'recipient', 'batch_sender', 'batch_recipient'], + }) + ) + expect(featureRegistry.register).toHaveBeenCalledWith( + new Protocol({ + id: 'https://didcomm.org/messagepickup/2.0', + roles: ['mediator', 'recipient'], + }) + ) + }) +}) diff --git a/packages/core/src/modules/routing/__tests__/pickup.test.ts "b/packages/core/src/modules/message-p\303\254ckup/__tests__/pickup.test.ts" similarity index 95% rename from packages/core/src/modules/routing/__tests__/pickup.test.ts rename to "packages/core/src/modules/message-p\303\254ckup/__tests__/pickup.test.ts" index 2879dacca8..6285f2828d 100644 --- a/packages/core/src/modules/routing/__tests__/pickup.test.ts +++ "b/packages/core/src/modules/message-p\303\254ckup/__tests__/pickup.test.ts" @@ -9,7 +9,6 @@ import { getIndySdkModules } from '../../../../../indy-sdk/tests/setupIndySdkMod import { getAgentOptions, waitForBasicMessage, waitForTrustPingReceivedEvent } from '../../../../tests/helpers' import { Agent } from '../../../agent/Agent' import { HandshakeProtocol } from '../../connections' -import { MediatorPickupStrategy } from '../MediatorPickupStrategy' const recipientOptions = getAgentOptions('Mediation: Recipient Pickup', {}, getIndySdkModules()) const mediatorOptions = getAgentOptions( @@ -81,7 +80,10 @@ describe('E2E Pick Up protocol', () => { const message = 'hello pickup V1' await mediatorAgent.basicMessages.sendMessage(mediatorRecipientConnection.id, message) - await recipientAgent.mediationRecipient.pickupMessages(recipientMediatorConnection, MediatorPickupStrategy.PickUpV1) + await recipientAgent.messagePickup.pickupMessages({ + connectionId: recipientMediatorConnection.id, + protocolVersion: 'v1', + }) const basicMessage = await waitForBasicMessage(recipientAgent, { content: message, @@ -142,7 +144,10 @@ describe('E2E Pick Up protocol', () => { content: message, }) const trustPingPromise = waitForTrustPingReceivedEvent(mediatorAgent, {}) - await recipientAgent.mediationRecipient.pickupMessages(recipientMediatorConnection, MediatorPickupStrategy.PickUpV2) + await recipientAgent.messagePickup.pickupMessages({ + connectionId: recipientMediatorConnection.id, + protocolVersion: 'v2', + }) const basicMessage = await basicMessagePromise expect(basicMessage.content).toBe(message) diff --git "a/packages/core/src/modules/message-p\303\254ckup/index.ts" "b/packages/core/src/modules/message-p\303\254ckup/index.ts" new file mode 100644 index 0000000000..b4745b6037 --- /dev/null +++ "b/packages/core/src/modules/message-p\303\254ckup/index.ts" @@ -0,0 +1,5 @@ +export * from './MessagePickupApi' +export * from './MessagePickupApiOptions' +export * from './MessagePickupModule' +export * from './MessagePickupModuleConfig' +export * from './protocol' diff --git "a/packages/core/src/modules/message-p\303\254ckup/protocol/BaseMessagePickupProtocol.ts" "b/packages/core/src/modules/message-p\303\254ckup/protocol/BaseMessagePickupProtocol.ts" new file mode 100644 index 0000000000..ebbd6fde39 --- /dev/null +++ "b/packages/core/src/modules/message-p\303\254ckup/protocol/BaseMessagePickupProtocol.ts" @@ -0,0 +1,21 @@ +import type { MessagePickupProtocol } from './MessagePickupProtocol' +import type { PickupMessagesProtocolOptions, PickupMessagesProtocolReturnType } from './MessagePickupProtocolOptions' +import type { AgentContext } from '../../../agent' +import type { AgentMessage } from '../../../agent/AgentMessage' +import type { FeatureRegistry } from '../../../agent/FeatureRegistry' +import type { DependencyManager } from '../../../plugins' + +/** + * Base implementation of the MessagePickupProtocol that can be used as a foundation for implementing + * the MessagePickupProtocol interface. + */ +export abstract class BaseMessagePickupProtocol implements MessagePickupProtocol { + public abstract readonly version: string + + public abstract pickupMessages( + agentContext: AgentContext, + options: PickupMessagesProtocolOptions + ): Promise> + + public abstract register(dependencyManager: DependencyManager, featureRegistry: FeatureRegistry): void +} diff --git "a/packages/core/src/modules/message-p\303\254ckup/protocol/MessagePickupProtocol.ts" "b/packages/core/src/modules/message-p\303\254ckup/protocol/MessagePickupProtocol.ts" new file mode 100644 index 0000000000..9acdcf5e4d --- /dev/null +++ "b/packages/core/src/modules/message-p\303\254ckup/protocol/MessagePickupProtocol.ts" @@ -0,0 +1,16 @@ +import type { PickupMessagesProtocolOptions, PickupMessagesProtocolReturnType } from './MessagePickupProtocolOptions' +import type { AgentContext } from '../../../agent' +import type { AgentMessage } from '../../../agent/AgentMessage' +import type { FeatureRegistry } from '../../../agent/FeatureRegistry' +import type { DependencyManager } from '../../../plugins' + +export interface MessagePickupProtocol { + readonly version: string + + pickupMessages( + agentContext: AgentContext, + options: PickupMessagesProtocolOptions + ): Promise> + + register(dependencyManager: DependencyManager, featureRegistry: FeatureRegistry): void +} diff --git "a/packages/core/src/modules/message-p\303\254ckup/protocol/MessagePickupProtocolOptions.ts" "b/packages/core/src/modules/message-p\303\254ckup/protocol/MessagePickupProtocolOptions.ts" new file mode 100644 index 0000000000..9f3f252c6a --- /dev/null +++ "b/packages/core/src/modules/message-p\303\254ckup/protocol/MessagePickupProtocolOptions.ts" @@ -0,0 +1,12 @@ +import type { AgentMessage } from '../../../agent/AgentMessage' +import type { ConnectionRecord } from '../../connections' + +export interface PickupMessagesProtocolOptions { + connectionRecord: ConnectionRecord + recipientKey?: string + batchSize?: number +} + +export type PickupMessagesProtocolReturnType = { + message: MessageType +} diff --git a/packages/core/src/modules/routing/protocol/pickup/index.ts "b/packages/core/src/modules/message-p\303\254ckup/protocol/index.ts" similarity index 100% rename from packages/core/src/modules/routing/protocol/pickup/index.ts rename to "packages/core/src/modules/message-p\303\254ckup/protocol/index.ts" diff --git "a/packages/core/src/modules/message-p\303\254ckup/protocol/v1/V1MessagePickupProtocol.ts" "b/packages/core/src/modules/message-p\303\254ckup/protocol/v1/V1MessagePickupProtocol.ts" new file mode 100644 index 0000000000..581d0d31a7 --- /dev/null +++ "b/packages/core/src/modules/message-p\303\254ckup/protocol/v1/V1MessagePickupProtocol.ts" @@ -0,0 +1,85 @@ +import type { AgentContext } from '../../../../agent' +import type { AgentMessage } from '../../../../agent/AgentMessage' +import type { FeatureRegistry } from '../../../../agent/FeatureRegistry' +import type { InboundMessageContext } from '../../../../agent/models/InboundMessageContext' +import type { DependencyManager } from '../../../../plugins' +import type { MessageRepository } from '../../../../storage/MessageRepository' +import type { PickupMessagesProtocolOptions, PickupMessagesProtocolReturnType } from '../MessagePickupProtocolOptions' + +import { OutboundMessageContext, Protocol } from '../../../../agent/models' +import { InjectionSymbols } from '../../../../constants' +import { injectable } from '../../../../plugins' +import { MessagePickupModuleConfig } from '../../MessagePickupModuleConfig' +import { BaseMessagePickupProtocol } from '../BaseMessagePickupProtocol' + +import { V1BatchHandler, V1BatchPickupHandler } from './handlers' +import { V1BatchMessage, BatchMessageMessage, V1BatchPickupMessage } from './messages' + +@injectable() +export class V1MessagePickupProtocol extends BaseMessagePickupProtocol { + public constructor() { + super() + } + + /** + * The version of the message pickup protocol this class supports + */ + public readonly version = 'v1' as const + + /** + * Registers the protocol implementation (handlers, feature registry) on the agent. + */ + public register(dependencyManager: DependencyManager, featureRegistry: FeatureRegistry): void { + dependencyManager.registerMessageHandlers([new V1BatchPickupHandler(this), new V1BatchHandler()]) + + featureRegistry.register( + new Protocol({ + id: 'https://didcomm.org/messagepickup/1.0', + roles: ['message_holder', 'recipient', 'batch_sender', 'batch_recipient'], + }) + ) + } + + public async pickupMessages( + agentContext: AgentContext, + options: PickupMessagesProtocolOptions + ): Promise> { + const { connectionRecord, batchSize } = options + connectionRecord.assertReady() + + const config = agentContext.dependencyManager.resolve(MessagePickupModuleConfig) + const message = new V1BatchPickupMessage({ + batchSize: batchSize ?? config.maximumBatchSize, + }) + + return { message } + } + + public async processBatchPickup(messageContext: InboundMessageContext) { + // Assert ready connection + const connection = messageContext.assertReadyConnection() + + const { message } = messageContext + + const messageRepository = messageContext.agentContext.dependencyManager.resolve( + InjectionSymbols.MessageRepository + ) + + const messages = await messageRepository.takeFromQueue(connection.id, message.batchSize) + + // TODO: each message should be stored with an id. to be able to conform to the id property + // of batch message + const batchMessages = messages.map( + (msg) => + new BatchMessageMessage({ + message: msg, + }) + ) + + const batchMessage = new V1BatchMessage({ + messages: batchMessages, + }) + + return new OutboundMessageContext(batchMessage, { agentContext: messageContext.agentContext, connection }) + } +} diff --git "a/packages/core/src/modules/message-p\303\254ckup/protocol/v1/handlers/V1BatchHandler.ts" "b/packages/core/src/modules/message-p\303\254ckup/protocol/v1/handlers/V1BatchHandler.ts" new file mode 100644 index 0000000000..071711f9e3 --- /dev/null +++ "b/packages/core/src/modules/message-p\303\254ckup/protocol/v1/handlers/V1BatchHandler.ts" @@ -0,0 +1,28 @@ +import type { AgentMessageReceivedEvent } from '../../../../../agent/Events' +import type { MessageHandler, MessageHandlerInboundMessage } from '../../../../../agent/MessageHandler' + +import { EventEmitter } from '../../../../../agent/EventEmitter' +import { AgentEventTypes } from '../../../../../agent/Events' +import { V1BatchMessage } from '../messages' + +export class V1BatchHandler implements MessageHandler { + public supportedMessages = [V1BatchMessage] + + public async handle(messageContext: MessageHandlerInboundMessage) { + const { message } = messageContext + const eventEmitter = messageContext.agentContext.dependencyManager.resolve(EventEmitter) + + messageContext.assertReadyConnection() + + const forwardedMessages = message.messages + forwardedMessages.forEach((message) => { + eventEmitter.emit(messageContext.agentContext, { + type: AgentEventTypes.AgentMessageReceived, + payload: { + message: message.message, + contextCorrelationId: messageContext.agentContext.contextCorrelationId, + }, + }) + }) + } +} diff --git "a/packages/core/src/modules/message-p\303\254ckup/protocol/v1/handlers/V1BatchPickupHandler.ts" "b/packages/core/src/modules/message-p\303\254ckup/protocol/v1/handlers/V1BatchPickupHandler.ts" new file mode 100644 index 0000000000..d9eee7c4d9 --- /dev/null +++ "b/packages/core/src/modules/message-p\303\254ckup/protocol/v1/handlers/V1BatchPickupHandler.ts" @@ -0,0 +1,19 @@ +import type { MessageHandler, MessageHandlerInboundMessage } from '../../../../../agent/MessageHandler' +import type { V1MessagePickupProtocol } from '../V1MessagePickupProtocol' + +import { V1BatchPickupMessage } from '../messages' + +export class V1BatchPickupHandler implements MessageHandler { + private messagePickupService: V1MessagePickupProtocol + public supportedMessages = [V1BatchPickupMessage] + + public constructor(messagePickupService: V1MessagePickupProtocol) { + this.messagePickupService = messagePickupService + } + + public async handle(messageContext: MessageHandlerInboundMessage) { + messageContext.assertReadyConnection() + + return this.messagePickupService.processBatchPickup(messageContext) + } +} diff --git "a/packages/core/src/modules/message-p\303\254ckup/protocol/v1/handlers/index.ts" "b/packages/core/src/modules/message-p\303\254ckup/protocol/v1/handlers/index.ts" new file mode 100644 index 0000000000..b8aef88046 --- /dev/null +++ "b/packages/core/src/modules/message-p\303\254ckup/protocol/v1/handlers/index.ts" @@ -0,0 +1,2 @@ +export * from './V1BatchHandler' +export * from './V1BatchPickupHandler' diff --git "a/packages/core/src/modules/message-p\303\254ckup/protocol/v1/index.ts" "b/packages/core/src/modules/message-p\303\254ckup/protocol/v1/index.ts" new file mode 100644 index 0000000000..abf43d6b2a --- /dev/null +++ "b/packages/core/src/modules/message-p\303\254ckup/protocol/v1/index.ts" @@ -0,0 +1,2 @@ +export * from './V1MessagePickupProtocol' +export * from './messages' diff --git a/packages/core/src/modules/routing/protocol/pickup/v1/messages/BatchMessage.ts "b/packages/core/src/modules/message-p\303\254ckup/protocol/v1/messages/V1BatchMessage.ts" similarity index 74% rename from packages/core/src/modules/routing/protocol/pickup/v1/messages/BatchMessage.ts rename to "packages/core/src/modules/message-p\303\254ckup/protocol/v1/messages/V1BatchMessage.ts" index fae8f4d3d6..91e0b5debc 100644 --- a/packages/core/src/modules/routing/protocol/pickup/v1/messages/BatchMessage.ts +++ "b/packages/core/src/modules/message-p\303\254ckup/protocol/v1/messages/V1BatchMessage.ts" @@ -1,11 +1,11 @@ import { Type, Expose } from 'class-transformer' import { Matches, IsArray, ValidateNested, IsObject, IsInstance } from 'class-validator' -import { AgentMessage } from '../../../../../../agent/AgentMessage' -import { MessageIdRegExp } from '../../../../../../agent/BaseMessage' -import { EncryptedMessage } from '../../../../../../types' -import { IsValidMessageType, parseMessageType } from '../../../../../../utils/messageType' -import { uuid } from '../../../../../../utils/uuid' +import { AgentMessage } from '../../../../../agent/AgentMessage' +import { MessageIdRegExp } from '../../../../../agent/BaseMessage' +import { EncryptedMessage } from '../../../../../types' +import { IsValidMessageType, parseMessageType } from '../../../../../utils/messageType' +import { uuid } from '../../../../../utils/uuid' export class BatchMessageMessage { public constructor(options: { id?: string; message: EncryptedMessage }) { @@ -32,7 +32,7 @@ export interface BatchMessageOptions { * * @see https://github.com/hyperledger/aries-rfcs/blob/master/features/0212-pickup/README.md#batch */ -export class BatchMessage extends AgentMessage { +export class V1BatchMessage extends AgentMessage { public constructor(options: BatchMessageOptions) { super() @@ -42,8 +42,8 @@ export class BatchMessage extends AgentMessage { } } - @IsValidMessageType(BatchMessage.type) - public readonly type = BatchMessage.type.messageTypeUri + @IsValidMessageType(V1BatchMessage.type) + public readonly type = V1BatchMessage.type.messageTypeUri public static readonly type = parseMessageType('https://didcomm.org/messagepickup/1.0/batch') @Type(() => BatchMessageMessage) diff --git a/packages/core/src/modules/routing/protocol/pickup/v1/messages/BatchPickupMessage.ts "b/packages/core/src/modules/message-p\303\254ckup/protocol/v1/messages/V1BatchPickupMessage.ts" similarity index 77% rename from packages/core/src/modules/routing/protocol/pickup/v1/messages/BatchPickupMessage.ts rename to "packages/core/src/modules/message-p\303\254ckup/protocol/v1/messages/V1BatchPickupMessage.ts" index 4756bc4416..aa5e7ff646 100644 --- a/packages/core/src/modules/routing/protocol/pickup/v1/messages/BatchPickupMessage.ts +++ "b/packages/core/src/modules/message-p\303\254ckup/protocol/v1/messages/V1BatchPickupMessage.ts" @@ -1,8 +1,8 @@ import { Expose } from 'class-transformer' import { IsInt } from 'class-validator' -import { AgentMessage } from '../../../../../../agent/AgentMessage' -import { IsValidMessageType, parseMessageType } from '../../../../../../utils/messageType' +import { AgentMessage } from '../../../../../agent/AgentMessage' +import { IsValidMessageType, parseMessageType } from '../../../../../utils/messageType' export interface BatchPickupMessageOptions { id?: string @@ -14,7 +14,7 @@ export interface BatchPickupMessageOptions { * * @see https://github.com/hyperledger/aries-rfcs/blob/master/features/0212-pickup/README.md#batch-pickup */ -export class BatchPickupMessage extends AgentMessage { +export class V1BatchPickupMessage extends AgentMessage { /** * Create new BatchPickupMessage instance. * @@ -29,8 +29,8 @@ export class BatchPickupMessage extends AgentMessage { } } - @IsValidMessageType(BatchPickupMessage.type) - public readonly type = BatchPickupMessage.type.messageTypeUri + @IsValidMessageType(V1BatchPickupMessage.type) + public readonly type = V1BatchPickupMessage.type.messageTypeUri public static readonly type = parseMessageType('https://didcomm.org/messagepickup/1.0/batch-pickup') @IsInt() diff --git "a/packages/core/src/modules/message-p\303\254ckup/protocol/v1/messages/index.ts" "b/packages/core/src/modules/message-p\303\254ckup/protocol/v1/messages/index.ts" new file mode 100644 index 0000000000..19c16cf1d8 --- /dev/null +++ "b/packages/core/src/modules/message-p\303\254ckup/protocol/v1/messages/index.ts" @@ -0,0 +1,2 @@ +export * from './V1BatchMessage' +export * from './V1BatchPickupMessage' diff --git "a/packages/core/src/modules/message-p\303\254ckup/protocol/v2/V2MessagePickupProtocol.ts" "b/packages/core/src/modules/message-p\303\254ckup/protocol/v2/V2MessagePickupProtocol.ts" new file mode 100644 index 0000000000..b9dc22ae7e --- /dev/null +++ "b/packages/core/src/modules/message-p\303\254ckup/protocol/v2/V2MessagePickupProtocol.ts" @@ -0,0 +1,253 @@ +import type { AgentContext } from '../../../../agent' +import type { AgentMessage } from '../../../../agent/AgentMessage' +import type { AgentMessageReceivedEvent } from '../../../../agent/Events' +import type { FeatureRegistry } from '../../../../agent/FeatureRegistry' +import type { InboundMessageContext } from '../../../../agent/models/InboundMessageContext' +import type { DependencyManager } from '../../../../plugins' +import type { MessageRepository } from '../../../../storage/MessageRepository' +import type { EncryptedMessage } from '../../../../types' +import type { PickupMessagesProtocolOptions, PickupMessagesProtocolReturnType } from '../MessagePickupProtocolOptions' + +import { EventEmitter } from '../../../../agent/EventEmitter' +import { AgentEventTypes } from '../../../../agent/Events' +import { MessageSender } from '../../../../agent/MessageSender' +import { OutboundMessageContext, Protocol } from '../../../../agent/models' +import { InjectionSymbols } from '../../../../constants' +import { Attachment } from '../../../../decorators/attachment/Attachment' +import { AriesFrameworkError } from '../../../../error' +import { injectable } from '../../../../plugins' +import { ConnectionService } from '../../../connections' +import { ProblemReportError } from '../../../problem-reports' +import { RoutingProblemReportReason } from '../../../routing/error' +import { MessagePickupModuleConfig } from '../../MessagePickupModuleConfig' +import { BaseMessagePickupProtocol } from '../BaseMessagePickupProtocol' + +import { + V2DeliveryRequestHandler, + V2MessageDeliveryHandler, + V2MessagesReceivedHandler, + V2StatusHandler, + V2StatusRequestHandler, +} from './handlers' +import { + V2MessageDeliveryMessage, + V2StatusMessage, + V2DeliveryRequestMessage, + V2MessagesReceivedMessage, + V2StatusRequestMessage, +} from './messages' + +@injectable() +export class V2MessagePickupProtocol extends BaseMessagePickupProtocol { + public constructor() { + super() + } + + /** + * The version of the message pickup protocol this class supports + */ + public readonly version = 'v2' as const + + /** + * Registers the protocol implementation (handlers, feature registry) on the agent. + */ + public register(dependencyManager: DependencyManager, featureRegistry: FeatureRegistry): void { + dependencyManager.registerMessageHandlers([ + new V2StatusRequestHandler(this), + new V2DeliveryRequestHandler(this), + new V2MessagesReceivedHandler(this), + new V2StatusHandler(this), + new V2MessageDeliveryHandler(this), + ]) + + featureRegistry.register( + new Protocol({ + id: 'https://didcomm.org/messagepickup/2.0', + roles: ['mediator', 'recipient'], + }) + ) + } + + public async pickupMessages( + agentContext: AgentContext, + options: PickupMessagesProtocolOptions + ): Promise> { + const { connectionRecord, recipientKey } = options + connectionRecord.assertReady() + + const message = new V2StatusRequestMessage({ + recipientKey, + }) + + return { message } + } + + public async processStatusRequest(messageContext: InboundMessageContext) { + // Assert ready connection + const connection = messageContext.assertReadyConnection() + + const messageRepository = messageContext.agentContext.dependencyManager.resolve( + InjectionSymbols.MessageRepository + ) + + if (messageContext.message.recipientKey) { + throw new AriesFrameworkError('recipient_key parameter not supported') + } + + const statusMessage = new V2StatusMessage({ + threadId: messageContext.message.threadId, + messageCount: await messageRepository.getAvailableMessageCount(connection.id), + }) + + return new OutboundMessageContext(statusMessage, { agentContext: messageContext.agentContext, connection }) + } + + public async processDeliveryRequest(messageContext: InboundMessageContext) { + // Assert ready connection + const connection = messageContext.assertReadyConnection() + + if (messageContext.message.recipientKey) { + throw new AriesFrameworkError('recipient_key parameter not supported') + } + + const { message } = messageContext + + const messageRepository = messageContext.agentContext.dependencyManager.resolve( + InjectionSymbols.MessageRepository + ) + + // Get available messages from queue, but don't delete them + const messages = await messageRepository.takeFromQueue(connection.id, message.limit, true) + + // TODO: each message should be stored with an id. to be able to conform to the id property + // of delivery message + const attachments = messages.map( + (msg) => + new Attachment({ + data: { + json: msg, + }, + }) + ) + + const outboundMessageContext = + messages.length > 0 + ? new V2MessageDeliveryMessage({ + threadId: messageContext.message.threadId, + attachments, + }) + : new V2StatusMessage({ + threadId: messageContext.message.threadId, + messageCount: 0, + }) + + return new OutboundMessageContext(outboundMessageContext, { agentContext: messageContext.agentContext, connection }) + } + + public async processMessagesReceived(messageContext: InboundMessageContext) { + // Assert ready connection + const connection = messageContext.assertReadyConnection() + + const { message } = messageContext + + const messageRepository = messageContext.agentContext.dependencyManager.resolve( + InjectionSymbols.MessageRepository + ) + + // TODO: Add Queued Message ID + await messageRepository.takeFromQueue( + connection.id, + message.messageIdList ? message.messageIdList.length : undefined + ) + + const statusMessage = new V2StatusMessage({ + threadId: messageContext.message.threadId, + messageCount: await messageRepository.getAvailableMessageCount(connection.id), + }) + + return new OutboundMessageContext(statusMessage, { agentContext: messageContext.agentContext, connection }) + } + + public async processStatus(messageContext: InboundMessageContext) { + const connection = messageContext.assertReadyConnection() + const { message: statusMessage } = messageContext + const { messageCount, recipientKey } = statusMessage + + const connectionService = messageContext.agentContext.dependencyManager.resolve(ConnectionService) + const messageSender = messageContext.agentContext.dependencyManager.resolve(MessageSender) + const messagePickupModuleConfig = messageContext.agentContext.dependencyManager.resolve(MessagePickupModuleConfig) + + //No messages to be sent + if (messageCount === 0) { + const { message, connectionRecord } = await connectionService.createTrustPing( + messageContext.agentContext, + connection, + { + responseRequested: false, + } + ) + + // FIXME: check where this flow fits, as it seems very particular for the AFJ-ACA-Py combination + const websocketSchemes = ['ws', 'wss'] + + await messageSender.sendMessage( + new OutboundMessageContext(message, { + agentContext: messageContext.agentContext, + connection: connectionRecord, + }), + { + transportPriority: { + schemes: websocketSchemes, + restrictive: true, + // TODO: add keepAlive: true to enforce through the public api + // we need to keep the socket alive. It already works this way, but would + // be good to make more explicit from the public facing API. + // This would also make it easier to change the internal API later on. + // keepAlive: true, + }, + } + ) + + return null + } + const { maximumBatchSize: maximumMessagePickup } = messagePickupModuleConfig + const limit = messageCount < maximumMessagePickup ? messageCount : maximumMessagePickup + + const deliveryRequestMessage = new V2DeliveryRequestMessage({ + limit, + recipientKey, + }) + + return deliveryRequestMessage + } + + public async processDelivery(messageContext: InboundMessageContext) { + messageContext.assertReadyConnection() + + const { appendedAttachments } = messageContext.message + + const eventEmitter = messageContext.agentContext.dependencyManager.resolve(EventEmitter) + + if (!appendedAttachments) + throw new ProblemReportError('Error processing attachments', { + problemCode: RoutingProblemReportReason.ErrorProcessingAttachments, + }) + + const ids: string[] = [] + for (const attachment of appendedAttachments) { + ids.push(attachment.id) + + eventEmitter.emit(messageContext.agentContext, { + type: AgentEventTypes.AgentMessageReceived, + payload: { + message: attachment.getDataAsJson(), + contextCorrelationId: messageContext.agentContext.contextCorrelationId, + }, + }) + } + + return new V2MessagesReceivedMessage({ + messageIdList: ids, + }) + } +} diff --git "a/packages/core/src/modules/message-p\303\254ckup/protocol/v2/__tests__/V2MessagePickupProtocol.test.ts" "b/packages/core/src/modules/message-p\303\254ckup/protocol/v2/__tests__/V2MessagePickupProtocol.test.ts" new file mode 100644 index 0000000000..50476217f9 --- /dev/null +++ "b/packages/core/src/modules/message-p\303\254ckup/protocol/v2/__tests__/V2MessagePickupProtocol.test.ts" @@ -0,0 +1,407 @@ +import type { EncryptedMessage } from '../../../../../types' + +import { getAgentContext, getMockConnection, mockFunction } from '../../../../../../tests/helpers' +import { EventEmitter } from '../../../../../agent/EventEmitter' +import { AgentEventTypes } from '../../../../../agent/Events' +import { MessageSender } from '../../../../../agent/MessageSender' +import { InboundMessageContext } from '../../../../../agent/models/InboundMessageContext' +import { InjectionSymbols } from '../../../../../constants' +import { Attachment } from '../../../../../decorators/attachment/Attachment' +import { AriesFrameworkError } from '../../../../../error' +import { InMemoryMessageRepository } from '../../../../../storage/InMemoryMessageRepository' +import { uuid } from '../../../../../utils/uuid' +import { DidExchangeState, TrustPingMessage } from '../../../../connections' +import { ConnectionService } from '../../../../connections/services/ConnectionService' +import { MessagePickupModuleConfig } from '../../../MessagePickupModuleConfig' +import { V1MessagePickupProtocol } from '../../v1' +import { V2MessagePickupProtocol } from '../V2MessagePickupProtocol' +import { + V2DeliveryRequestMessage, + V2MessageDeliveryMessage, + V2MessagesReceivedMessage, + V2StatusMessage, + V2StatusRequestMessage, +} from '../messages' + +const mockConnection = getMockConnection({ + state: DidExchangeState.Completed, +}) + +// Mock classes +jest.mock('../../../../../storage/InMemoryMessageRepository') +jest.mock('../../../../../agent/EventEmitter') +jest.mock('../../../../../agent/MessageSender') +jest.mock('../../../../connections/services/ConnectionService') + +// Mock typed object +const InMessageRepositoryMock = InMemoryMessageRepository as jest.Mock +const EventEmitterMock = EventEmitter as jest.Mock +const MessageSenderMock = MessageSender as jest.Mock +const ConnectionServiceMock = ConnectionService as jest.Mock + +const messagePickupModuleConfig = new MessagePickupModuleConfig({ + maximumBatchSize: 10, + protocols: [new V1MessagePickupProtocol(), new V2MessagePickupProtocol()], +}) +const messageSender = new MessageSenderMock() +const eventEmitter = new EventEmitterMock() +const connectionService = new ConnectionServiceMock() +const messageRepository = new InMessageRepositoryMock() + +const agentContext = getAgentContext({ + registerInstances: [ + [InjectionSymbols.MessageRepository, messageRepository], + [EventEmitter, eventEmitter], + [MessageSender, messageSender], + [ConnectionService, connectionService], + [MessagePickupModuleConfig, messagePickupModuleConfig], + ], +}) + +const encryptedMessage: EncryptedMessage = { + protected: 'base64url', + iv: 'base64url', + ciphertext: 'base64url', + tag: 'base64url', +} +const queuedMessages = [encryptedMessage, encryptedMessage, encryptedMessage] + +describe('V2MessagePickupService', () => { + let pickupProtocol: V2MessagePickupProtocol + + beforeEach(async () => { + pickupProtocol = new V2MessagePickupProtocol() + }) + + describe('processStatusRequest', () => { + test('no available messages in queue', async () => { + mockFunction(messageRepository.getAvailableMessageCount).mockResolvedValue(0) + + const statusRequest = new V2StatusRequestMessage({}) + + const messageContext = new InboundMessageContext(statusRequest, { connection: mockConnection, agentContext }) + + const { connection, message } = await pickupProtocol.processStatusRequest(messageContext) + + expect(connection).toEqual(mockConnection) + expect(message).toEqual( + new V2StatusMessage({ + id: message.id, + threadId: statusRequest.threadId, + messageCount: 0, + }) + ) + expect(messageRepository.getAvailableMessageCount).toHaveBeenCalledWith(mockConnection.id) + }) + + test('multiple messages in queue', async () => { + mockFunction(messageRepository.getAvailableMessageCount).mockResolvedValue(5) + const statusRequest = new V2StatusRequestMessage({}) + + const messageContext = new InboundMessageContext(statusRequest, { connection: mockConnection, agentContext }) + + const { connection, message } = await pickupProtocol.processStatusRequest(messageContext) + + expect(connection).toEqual(mockConnection) + expect(message).toEqual( + new V2StatusMessage({ + id: message.id, + threadId: statusRequest.threadId, + messageCount: 5, + }) + ) + expect(messageRepository.getAvailableMessageCount).toHaveBeenCalledWith(mockConnection.id) + }) + + test('status request specifying recipient key', async () => { + mockFunction(messageRepository.getAvailableMessageCount).mockResolvedValue(10) + + const statusRequest = new V2StatusRequestMessage({ + recipientKey: 'recipientKey', + }) + + const messageContext = new InboundMessageContext(statusRequest, { connection: mockConnection, agentContext }) + + await expect(pickupProtocol.processStatusRequest(messageContext)).rejects.toThrowError( + 'recipient_key parameter not supported' + ) + }) + }) + + describe('processDeliveryRequest', () => { + test('no available messages in queue', async () => { + mockFunction(messageRepository.takeFromQueue).mockReturnValue([]) + + const deliveryRequest = new V2DeliveryRequestMessage({ limit: 10 }) + + const messageContext = new InboundMessageContext(deliveryRequest, { connection: mockConnection, agentContext }) + + const { connection, message } = await pickupProtocol.processDeliveryRequest(messageContext) + + expect(connection).toEqual(mockConnection) + expect(message).toEqual( + new V2StatusMessage({ + id: message.id, + threadId: deliveryRequest.threadId, + messageCount: 0, + }) + ) + expect(messageRepository.takeFromQueue).toHaveBeenCalledWith(mockConnection.id, 10, true) + }) + + test('less messages in queue than limit', async () => { + mockFunction(messageRepository.takeFromQueue).mockReturnValue(queuedMessages) + + const deliveryRequest = new V2DeliveryRequestMessage({ limit: 10 }) + + const messageContext = new InboundMessageContext(deliveryRequest, { connection: mockConnection, agentContext }) + + const { connection, message } = await pickupProtocol.processDeliveryRequest(messageContext) + + expect(connection).toEqual(mockConnection) + expect(message).toBeInstanceOf(V2MessageDeliveryMessage) + expect(message.threadId).toEqual(deliveryRequest.threadId) + expect(message.appendedAttachments?.length).toEqual(3) + expect(message.appendedAttachments).toEqual( + expect.arrayContaining( + queuedMessages.map((msg) => + expect.objectContaining({ + data: { + json: msg, + }, + }) + ) + ) + ) + expect(messageRepository.takeFromQueue).toHaveBeenCalledWith(mockConnection.id, 10, true) + }) + + test('more messages in queue than limit', async () => { + mockFunction(messageRepository.takeFromQueue).mockReturnValue(queuedMessages.slice(0, 2)) + + const deliveryRequest = new V2DeliveryRequestMessage({ limit: 2 }) + + const messageContext = new InboundMessageContext(deliveryRequest, { connection: mockConnection, agentContext }) + + const { connection, message } = await pickupProtocol.processDeliveryRequest(messageContext) + + expect(connection).toEqual(mockConnection) + expect(message).toBeInstanceOf(V2MessageDeliveryMessage) + expect(message.threadId).toEqual(deliveryRequest.threadId) + expect(message.appendedAttachments?.length).toEqual(2) + expect(message.appendedAttachments).toEqual( + expect.arrayContaining( + queuedMessages.slice(0, 2).map((msg) => + expect.objectContaining({ + data: { + json: msg, + }, + }) + ) + ) + ) + expect(messageRepository.takeFromQueue).toHaveBeenCalledWith(mockConnection.id, 2, true) + }) + + test('delivery request specifying recipient key', async () => { + mockFunction(messageRepository.takeFromQueue).mockReturnValue(queuedMessages) + + const statusRequest = new V2DeliveryRequestMessage({ + limit: 10, + recipientKey: 'recipientKey', + }) + + const messageContext = new InboundMessageContext(statusRequest, { connection: mockConnection, agentContext }) + + await expect(pickupProtocol.processStatusRequest(messageContext)).rejects.toThrowError( + 'recipient_key parameter not supported' + ) + }) + }) + + describe('processMessagesReceived', () => { + test('messages received partially', async () => { + mockFunction(messageRepository.takeFromQueue).mockReturnValue(queuedMessages) + mockFunction(messageRepository.getAvailableMessageCount).mockResolvedValue(4) + + const messagesReceived = new V2MessagesReceivedMessage({ + messageIdList: ['1', '2'], + }) + + const messageContext = new InboundMessageContext(messagesReceived, { connection: mockConnection, agentContext }) + + const { connection, message } = await pickupProtocol.processMessagesReceived(messageContext) + + expect(connection).toEqual(mockConnection) + expect(message).toEqual( + new V2StatusMessage({ + id: message.id, + threadId: messagesReceived.threadId, + messageCount: 4, + }) + ) + expect(messageRepository.getAvailableMessageCount).toHaveBeenCalledWith(mockConnection.id) + expect(messageRepository.takeFromQueue).toHaveBeenCalledWith(mockConnection.id, 2) + }) + + test('all messages have been received', async () => { + mockFunction(messageRepository.takeFromQueue).mockReturnValue(queuedMessages) + mockFunction(messageRepository.getAvailableMessageCount).mockResolvedValue(0) + + const messagesReceived = new V2MessagesReceivedMessage({ + messageIdList: ['1', '2'], + }) + + const messageContext = new InboundMessageContext(messagesReceived, { connection: mockConnection, agentContext }) + + const { connection, message } = await pickupProtocol.processMessagesReceived(messageContext) + + expect(connection).toEqual(mockConnection) + expect(message).toEqual( + new V2StatusMessage({ + id: message.id, + threadId: messagesReceived.threadId, + messageCount: 0, + }) + ) + + expect(messageRepository.getAvailableMessageCount).toHaveBeenCalledWith(mockConnection.id) + expect(messageRepository.takeFromQueue).toHaveBeenCalledWith(mockConnection.id, 2) + }) + }) + + describe('pickupMessages', () => { + it('creates a status request message', async () => { + const { message: statusRequestMessage } = await pickupProtocol.pickupMessages(agentContext, { + connectionRecord: mockConnection, + recipientKey: 'a-key', + }) + + expect(statusRequestMessage).toMatchObject({ + id: expect.any(String), + recipientKey: 'a-key', + }) + }) + }) + + describe('processStatus', () => { + it('if status request has a message count of zero returns nothing', async () => { + const status = new V2StatusMessage({ + threadId: uuid(), + messageCount: 0, + }) + + mockFunction(connectionService.createTrustPing).mockResolvedValueOnce({ + message: new TrustPingMessage({}), + connectionRecord: mockConnection, + }) + + const messageContext = new InboundMessageContext(status, { connection: mockConnection, agentContext }) + const deliveryRequestMessage = await pickupProtocol.processStatus(messageContext) + expect(deliveryRequestMessage).toBeNull() + }) + + it('if it has a message count greater than zero return a valid delivery request', async () => { + const status = new V2StatusMessage({ + threadId: uuid(), + messageCount: 1, + }) + const messageContext = new InboundMessageContext(status, { connection: mockConnection, agentContext }) + + const deliveryRequestMessage = await pickupProtocol.processStatus(messageContext) + expect(deliveryRequestMessage) + expect(deliveryRequestMessage).toEqual(new V2DeliveryRequestMessage({ id: deliveryRequestMessage?.id, limit: 1 })) + }) + }) + + describe('processDelivery', () => { + it('if the delivery has no attachments expect an error', async () => { + const messageContext = new InboundMessageContext({} as V2MessageDeliveryMessage, { + connection: mockConnection, + agentContext, + }) + + await expect(pickupProtocol.processDelivery(messageContext)).rejects.toThrowError( + new AriesFrameworkError('Error processing attachments') + ) + }) + + it('should return a message received with an message id list in it', async () => { + const messageDeliveryMessage = new V2MessageDeliveryMessage({ + threadId: uuid(), + attachments: [ + new Attachment({ + id: '1', + data: { + json: { + a: 'value', + }, + }, + }), + ], + }) + const messageContext = new InboundMessageContext(messageDeliveryMessage, { + connection: mockConnection, + agentContext, + }) + + const messagesReceivedMessage = await pickupProtocol.processDelivery(messageContext) + + expect(messagesReceivedMessage).toEqual( + new V2MessagesReceivedMessage({ + id: messagesReceivedMessage.id, + messageIdList: ['1'], + }) + ) + }) + + it('calls the event emitter for each message', async () => { + // This is to not take into account events previously emitted + jest.clearAllMocks() + + const messageDeliveryMessage = new V2MessageDeliveryMessage({ + threadId: uuid(), + attachments: [ + new Attachment({ + id: '1', + data: { + json: { + first: 'value', + }, + }, + }), + new Attachment({ + id: '2', + data: { + json: { + second: 'value', + }, + }, + }), + ], + }) + const messageContext = new InboundMessageContext(messageDeliveryMessage, { + connection: mockConnection, + agentContext, + }) + + await pickupProtocol.processDelivery(messageContext) + + expect(eventEmitter.emit).toHaveBeenCalledTimes(2) + expect(eventEmitter.emit).toHaveBeenNthCalledWith(1, agentContext, { + type: AgentEventTypes.AgentMessageReceived, + payload: { + message: { first: 'value' }, + contextCorrelationId: agentContext.contextCorrelationId, + }, + }) + expect(eventEmitter.emit).toHaveBeenNthCalledWith(2, agentContext, { + type: AgentEventTypes.AgentMessageReceived, + payload: { + message: { second: 'value' }, + contextCorrelationId: agentContext.contextCorrelationId, + }, + }) + }) + }) +}) diff --git "a/packages/core/src/modules/message-p\303\254ckup/protocol/v2/handlers/V2DeliveryRequestHandler.ts" "b/packages/core/src/modules/message-p\303\254ckup/protocol/v2/handlers/V2DeliveryRequestHandler.ts" new file mode 100644 index 0000000000..b935dcd512 --- /dev/null +++ "b/packages/core/src/modules/message-p\303\254ckup/protocol/v2/handlers/V2DeliveryRequestHandler.ts" @@ -0,0 +1,19 @@ +import type { MessageHandler } from '../../../../../agent/MessageHandler' +import type { InboundMessageContext } from '../../../../../agent/models/InboundMessageContext' +import type { V2MessagePickupProtocol } from '../V2MessagePickupProtocol' + +import { V2DeliveryRequestMessage } from '../messages' + +export class V2DeliveryRequestHandler implements MessageHandler { + public supportedMessages = [V2DeliveryRequestMessage] + private messagePickupService: V2MessagePickupProtocol + + public constructor(messagePickupService: V2MessagePickupProtocol) { + this.messagePickupService = messagePickupService + } + + public async handle(messageContext: InboundMessageContext) { + messageContext.assertReadyConnection() + return this.messagePickupService.processDeliveryRequest(messageContext) + } +} diff --git "a/packages/core/src/modules/message-p\303\254ckup/protocol/v2/handlers/V2MessageDeliveryHandler.ts" "b/packages/core/src/modules/message-p\303\254ckup/protocol/v2/handlers/V2MessageDeliveryHandler.ts" new file mode 100644 index 0000000000..918b3f37b8 --- /dev/null +++ "b/packages/core/src/modules/message-p\303\254ckup/protocol/v2/handlers/V2MessageDeliveryHandler.ts" @@ -0,0 +1,27 @@ +import type { MessageHandler } from '../../../../../agent/MessageHandler' +import type { InboundMessageContext } from '../../../../../agent/models/InboundMessageContext' +import type { V2MessagePickupProtocol } from '../V2MessagePickupProtocol' + +import { OutboundMessageContext } from '../../../../../agent/models' +import { V2MessageDeliveryMessage } from '../messages/V2MessageDeliveryMessage' + +export class V2MessageDeliveryHandler implements MessageHandler { + public supportedMessages = [V2MessageDeliveryMessage] + private messagePickupService: V2MessagePickupProtocol + + public constructor(messagePickupService: V2MessagePickupProtocol) { + this.messagePickupService = messagePickupService + } + + public async handle(messageContext: InboundMessageContext) { + const connection = messageContext.assertReadyConnection() + const deliveryReceivedMessage = await this.messagePickupService.processDelivery(messageContext) + + if (deliveryReceivedMessage) { + return new OutboundMessageContext(deliveryReceivedMessage, { + agentContext: messageContext.agentContext, + connection, + }) + } + } +} diff --git "a/packages/core/src/modules/message-p\303\254ckup/protocol/v2/handlers/V2MessagesReceivedHandler.ts" "b/packages/core/src/modules/message-p\303\254ckup/protocol/v2/handlers/V2MessagesReceivedHandler.ts" new file mode 100644 index 0000000000..5820c4878c --- /dev/null +++ "b/packages/core/src/modules/message-p\303\254ckup/protocol/v2/handlers/V2MessagesReceivedHandler.ts" @@ -0,0 +1,19 @@ +import type { MessageHandler } from '../../../../../agent/MessageHandler' +import type { InboundMessageContext } from '../../../../../agent/models/InboundMessageContext' +import type { V2MessagePickupProtocol } from '../V2MessagePickupProtocol' + +import { V2MessagesReceivedMessage } from '../messages' + +export class V2MessagesReceivedHandler implements MessageHandler { + public supportedMessages = [V2MessagesReceivedMessage] + private messagePickupService: V2MessagePickupProtocol + + public constructor(messagePickupService: V2MessagePickupProtocol) { + this.messagePickupService = messagePickupService + } + + public async handle(messageContext: InboundMessageContext) { + messageContext.assertReadyConnection() + return this.messagePickupService.processMessagesReceived(messageContext) + } +} diff --git "a/packages/core/src/modules/message-p\303\254ckup/protocol/v2/handlers/V2StatusHandler.ts" "b/packages/core/src/modules/message-p\303\254ckup/protocol/v2/handlers/V2StatusHandler.ts" new file mode 100644 index 0000000000..0e4d1467f2 --- /dev/null +++ "b/packages/core/src/modules/message-p\303\254ckup/protocol/v2/handlers/V2StatusHandler.ts" @@ -0,0 +1,27 @@ +import type { MessageHandler } from '../../../../../agent/MessageHandler' +import type { InboundMessageContext } from '../../../../../agent/models/InboundMessageContext' +import type { V2MessagePickupProtocol } from '../V2MessagePickupProtocol' + +import { OutboundMessageContext } from '../../../../../agent/models' +import { V2StatusMessage } from '../messages' + +export class V2StatusHandler implements MessageHandler { + public supportedMessages = [V2StatusMessage] + private messagePickupService: V2MessagePickupProtocol + + public constructor(messagePickupService: V2MessagePickupProtocol) { + this.messagePickupService = messagePickupService + } + + public async handle(messageContext: InboundMessageContext) { + const connection = messageContext.assertReadyConnection() + const deliveryRequestMessage = await this.messagePickupService.processStatus(messageContext) + + if (deliveryRequestMessage) { + return new OutboundMessageContext(deliveryRequestMessage, { + agentContext: messageContext.agentContext, + connection, + }) + } + } +} diff --git "a/packages/core/src/modules/message-p\303\254ckup/protocol/v2/handlers/V2StatusRequestHandler.ts" "b/packages/core/src/modules/message-p\303\254ckup/protocol/v2/handlers/V2StatusRequestHandler.ts" new file mode 100644 index 0000000000..b9e365b8a4 --- /dev/null +++ "b/packages/core/src/modules/message-p\303\254ckup/protocol/v2/handlers/V2StatusRequestHandler.ts" @@ -0,0 +1,19 @@ +import type { MessageHandler } from '../../../../../agent/MessageHandler' +import type { InboundMessageContext } from '../../../../../agent/models/InboundMessageContext' +import type { V2MessagePickupProtocol } from '../V2MessagePickupProtocol' + +import { V2StatusRequestMessage } from '../messages' + +export class V2StatusRequestHandler implements MessageHandler { + public supportedMessages = [V2StatusRequestMessage] + private messagePickupService: V2MessagePickupProtocol + + public constructor(messagePickupService: V2MessagePickupProtocol) { + this.messagePickupService = messagePickupService + } + + public async handle(messageContext: InboundMessageContext) { + messageContext.assertReadyConnection() + return this.messagePickupService.processStatusRequest(messageContext) + } +} diff --git "a/packages/core/src/modules/message-p\303\254ckup/protocol/v2/handlers/index.ts" "b/packages/core/src/modules/message-p\303\254ckup/protocol/v2/handlers/index.ts" new file mode 100644 index 0000000000..5f54b56ac7 --- /dev/null +++ "b/packages/core/src/modules/message-p\303\254ckup/protocol/v2/handlers/index.ts" @@ -0,0 +1,5 @@ +export * from './V2DeliveryRequestHandler' +export * from './V2MessageDeliveryHandler' +export * from './V2MessagesReceivedHandler' +export * from './V2StatusHandler' +export * from './V2StatusRequestHandler' diff --git "a/packages/core/src/modules/message-p\303\254ckup/protocol/v2/index.ts" "b/packages/core/src/modules/message-p\303\254ckup/protocol/v2/index.ts" new file mode 100644 index 0000000000..90567cdaf4 --- /dev/null +++ "b/packages/core/src/modules/message-p\303\254ckup/protocol/v2/index.ts" @@ -0,0 +1,2 @@ +export * from './V2MessagePickupProtocol' +export * from './messages' diff --git a/packages/core/src/modules/routing/protocol/pickup/v2/messages/DeliveryRequestMessage.ts "b/packages/core/src/modules/message-p\303\254ckup/protocol/v2/messages/V2DeliveryRequestMessage.ts" similarity index 59% rename from packages/core/src/modules/routing/protocol/pickup/v2/messages/DeliveryRequestMessage.ts rename to "packages/core/src/modules/message-p\303\254ckup/protocol/v2/messages/V2DeliveryRequestMessage.ts" index 21e044309a..b7c37bf426 100644 --- a/packages/core/src/modules/routing/protocol/pickup/v2/messages/DeliveryRequestMessage.ts +++ "b/packages/core/src/modules/message-p\303\254ckup/protocol/v2/messages/V2DeliveryRequestMessage.ts" @@ -1,18 +1,18 @@ import { Expose } from 'class-transformer' import { IsInt, IsOptional, IsString } from 'class-validator' -import { AgentMessage } from '../../../../../../agent/AgentMessage' -import { ReturnRouteTypes } from '../../../../../../decorators/transport/TransportDecorator' -import { IsValidMessageType, parseMessageType } from '../../../../../../utils/messageType' +import { AgentMessage } from '../../../../../agent/AgentMessage' +import { ReturnRouteTypes } from '../../../../../decorators/transport/TransportDecorator' +import { IsValidMessageType, parseMessageType } from '../../../../../utils/messageType' -export interface DeliveryRequestMessageOptions { +export interface V2DeliveryRequestMessageOptions { id?: string recipientKey?: string limit: number } -export class DeliveryRequestMessage extends AgentMessage { - public constructor(options: DeliveryRequestMessageOptions) { +export class V2DeliveryRequestMessage extends AgentMessage { + public constructor(options: V2DeliveryRequestMessageOptions) { super() if (options) { @@ -23,8 +23,8 @@ export class DeliveryRequestMessage extends AgentMessage { this.setReturnRouting(ReturnRouteTypes.all) } - @IsValidMessageType(DeliveryRequestMessage.type) - public readonly type = DeliveryRequestMessage.type.messageTypeUri + @IsValidMessageType(V2DeliveryRequestMessage.type) + public readonly type = V2DeliveryRequestMessage.type.messageTypeUri public static readonly type = parseMessageType('https://didcomm.org/messagepickup/2.0/delivery-request') @IsString() diff --git a/packages/core/src/modules/routing/protocol/pickup/v2/messages/MessageDeliveryMessage.ts "b/packages/core/src/modules/message-p\303\254ckup/protocol/v2/messages/V2MessageDeliveryMessage.ts" similarity index 57% rename from packages/core/src/modules/routing/protocol/pickup/v2/messages/MessageDeliveryMessage.ts rename to "packages/core/src/modules/message-p\303\254ckup/protocol/v2/messages/V2MessageDeliveryMessage.ts" index fc3e215720..48783f634b 100644 --- a/packages/core/src/modules/routing/protocol/pickup/v2/messages/MessageDeliveryMessage.ts +++ "b/packages/core/src/modules/message-p\303\254ckup/protocol/v2/messages/V2MessageDeliveryMessage.ts" @@ -1,21 +1,21 @@ -import type { Attachment } from '../../../../../../decorators/attachment/Attachment' +import type { Attachment } from '../../../../../decorators/attachment/Attachment' import { Expose } from 'class-transformer' import { IsOptional, IsString } from 'class-validator' -import { AgentMessage } from '../../../../../../agent/AgentMessage' -import { ReturnRouteTypes } from '../../../../../../decorators/transport/TransportDecorator' -import { IsValidMessageType, parseMessageType } from '../../../../../../utils/messageType' +import { AgentMessage } from '../../../../../agent/AgentMessage' +import { ReturnRouteTypes } from '../../../../../decorators/transport/TransportDecorator' +import { IsValidMessageType, parseMessageType } from '../../../../../utils/messageType' -export interface MessageDeliveryMessageOptions { +export interface V2MessageDeliveryMessageOptions { id?: string recipientKey?: string threadId: string attachments: Attachment[] } -export class MessageDeliveryMessage extends AgentMessage { - public constructor(options: MessageDeliveryMessageOptions) { +export class V2MessageDeliveryMessage extends AgentMessage { + public constructor(options: V2MessageDeliveryMessageOptions) { super() if (options) { @@ -29,8 +29,8 @@ export class MessageDeliveryMessage extends AgentMessage { this.setReturnRouting(ReturnRouteTypes.all) } - @IsValidMessageType(MessageDeliveryMessage.type) - public readonly type = MessageDeliveryMessage.type.messageTypeUri + @IsValidMessageType(V2MessageDeliveryMessage.type) + public readonly type = V2MessageDeliveryMessage.type.messageTypeUri public static readonly type = parseMessageType('https://didcomm.org/messagepickup/2.0/delivery') @IsString() diff --git a/packages/core/src/modules/routing/protocol/pickup/v2/messages/MessagesReceivedMessage.ts "b/packages/core/src/modules/message-p\303\254ckup/protocol/v2/messages/V2MessagesReceivedMessage.ts" similarity index 55% rename from packages/core/src/modules/routing/protocol/pickup/v2/messages/MessagesReceivedMessage.ts rename to "packages/core/src/modules/message-p\303\254ckup/protocol/v2/messages/V2MessagesReceivedMessage.ts" index be59ba7639..23da433de6 100644 --- a/packages/core/src/modules/routing/protocol/pickup/v2/messages/MessagesReceivedMessage.ts +++ "b/packages/core/src/modules/message-p\303\254ckup/protocol/v2/messages/V2MessagesReceivedMessage.ts" @@ -1,17 +1,17 @@ import { Expose } from 'class-transformer' import { IsArray, IsOptional } from 'class-validator' -import { AgentMessage } from '../../../../../../agent/AgentMessage' -import { ReturnRouteTypes } from '../../../../../../decorators/transport/TransportDecorator' -import { IsValidMessageType, parseMessageType } from '../../../../../../utils/messageType' +import { AgentMessage } from '../../../../../agent/AgentMessage' +import { ReturnRouteTypes } from '../../../../../decorators/transport/TransportDecorator' +import { IsValidMessageType, parseMessageType } from '../../../../../utils/messageType' -export interface MessagesReceivedMessageOptions { +export interface V2MessagesReceivedMessageOptions { id?: string messageIdList: string[] } -export class MessagesReceivedMessage extends AgentMessage { - public constructor(options: MessagesReceivedMessageOptions) { +export class V2MessagesReceivedMessage extends AgentMessage { + public constructor(options: V2MessagesReceivedMessageOptions) { super() if (options) { @@ -21,8 +21,8 @@ export class MessagesReceivedMessage extends AgentMessage { this.setReturnRouting(ReturnRouteTypes.all) } - @IsValidMessageType(MessagesReceivedMessage.type) - public readonly type = MessagesReceivedMessage.type.messageTypeUri + @IsValidMessageType(V2MessagesReceivedMessage.type) + public readonly type = V2MessagesReceivedMessage.type.messageTypeUri public static readonly type = parseMessageType('https://didcomm.org/messagepickup/2.0/messages-received') @IsArray() diff --git a/packages/core/src/modules/routing/protocol/pickup/v2/messages/StatusMessage.ts "b/packages/core/src/modules/message-p\303\254ckup/protocol/v2/messages/V2StatusMessage.ts" similarity index 79% rename from packages/core/src/modules/routing/protocol/pickup/v2/messages/StatusMessage.ts rename to "packages/core/src/modules/message-p\303\254ckup/protocol/v2/messages/V2StatusMessage.ts" index 8e2851ba6c..a28296742e 100644 --- a/packages/core/src/modules/routing/protocol/pickup/v2/messages/StatusMessage.ts +++ "b/packages/core/src/modules/message-p\303\254ckup/protocol/v2/messages/V2StatusMessage.ts" @@ -1,12 +1,12 @@ import { Expose, Transform } from 'class-transformer' import { IsBoolean, IsDate, IsInt, IsOptional, IsString } from 'class-validator' -import { AgentMessage } from '../../../../../../agent/AgentMessage' -import { ReturnRouteTypes } from '../../../../../../decorators/transport/TransportDecorator' -import { IsValidMessageType, parseMessageType } from '../../../../../../utils/messageType' -import { DateParser } from '../../../../../../utils/transformers' +import { AgentMessage } from '../../../../../agent/AgentMessage' +import { ReturnRouteTypes } from '../../../../../decorators/transport/TransportDecorator' +import { IsValidMessageType, parseMessageType } from '../../../../../utils/messageType' +import { DateParser } from '../../../../../utils/transformers' -export interface StatusMessageOptions { +export interface V2StatusMessageOptions { id?: string recipientKey?: string threadId: string @@ -18,8 +18,8 @@ export interface StatusMessageOptions { liveDelivery?: boolean } -export class StatusMessage extends AgentMessage { - public constructor(options: StatusMessageOptions) { +export class V2StatusMessage extends AgentMessage { + public constructor(options: V2StatusMessageOptions) { super() if (options) { this.id = options.id || this.generateId() @@ -37,8 +37,8 @@ export class StatusMessage extends AgentMessage { this.setReturnRouting(ReturnRouteTypes.all) } - @IsValidMessageType(StatusMessage.type) - public readonly type = StatusMessage.type.messageTypeUri + @IsValidMessageType(V2StatusMessage.type) + public readonly type = V2StatusMessage.type.messageTypeUri public static readonly type = parseMessageType('https://didcomm.org/messagepickup/2.0/status') @IsString() diff --git a/packages/core/src/modules/routing/protocol/pickup/v2/messages/StatusRequestMessage.ts "b/packages/core/src/modules/message-p\303\254ckup/protocol/v2/messages/V2StatusRequestMessage.ts" similarity index 59% rename from packages/core/src/modules/routing/protocol/pickup/v2/messages/StatusRequestMessage.ts rename to "packages/core/src/modules/message-p\303\254ckup/protocol/v2/messages/V2StatusRequestMessage.ts" index c25c1d8c4a..eb6908bae2 100644 --- a/packages/core/src/modules/routing/protocol/pickup/v2/messages/StatusRequestMessage.ts +++ "b/packages/core/src/modules/message-p\303\254ckup/protocol/v2/messages/V2StatusRequestMessage.ts" @@ -1,16 +1,16 @@ import { Expose } from 'class-transformer' import { IsOptional, IsString } from 'class-validator' -import { AgentMessage } from '../../../../../../agent/AgentMessage' -import { IsValidMessageType, parseMessageType } from '../../../../../../utils/messageType' +import { AgentMessage } from '../../../../../agent/AgentMessage' +import { IsValidMessageType, parseMessageType } from '../../../../../utils/messageType' -export interface StatusRequestMessageOptions { +export interface V2StatusRequestMessageOptions { id?: string recipientKey?: string } -export class StatusRequestMessage extends AgentMessage { - public constructor(options: StatusRequestMessageOptions) { +export class V2StatusRequestMessage extends AgentMessage { + public constructor(options: V2StatusRequestMessageOptions) { super() if (options) { @@ -19,8 +19,8 @@ export class StatusRequestMessage extends AgentMessage { } } - @IsValidMessageType(StatusRequestMessage.type) - public readonly type = StatusRequestMessage.type.messageTypeUri + @IsValidMessageType(V2StatusRequestMessage.type) + public readonly type = V2StatusRequestMessage.type.messageTypeUri public static readonly type = parseMessageType('https://didcomm.org/messagepickup/2.0/status-request') @IsString() diff --git "a/packages/core/src/modules/message-p\303\254ckup/protocol/v2/messages/index.ts" "b/packages/core/src/modules/message-p\303\254ckup/protocol/v2/messages/index.ts" new file mode 100644 index 0000000000..4746216ec0 --- /dev/null +++ "b/packages/core/src/modules/message-p\303\254ckup/protocol/v2/messages/index.ts" @@ -0,0 +1,5 @@ +export * from './V2DeliveryRequestMessage' +export * from './V2MessageDeliveryMessage' +export * from './V2MessagesReceivedMessage' +export * from './V2StatusMessage' +export * from './V2StatusRequestMessage' diff --git a/packages/core/src/modules/routing/MediationRecipientApi.ts b/packages/core/src/modules/routing/MediationRecipientApi.ts index 7643542ee5..dc8a1023c1 100644 --- a/packages/core/src/modules/routing/MediationRecipientApi.ts +++ b/packages/core/src/modules/routing/MediationRecipientApi.ts @@ -23,6 +23,9 @@ import { ConnectionService } from '../connections/services' import { DidsApi } from '../dids' import { verkeyToDidKey } from '../dids/helpers' import { DiscoverFeaturesApi } from '../discover-features' +import { MessagePickupApi } from '../message-pìckup/MessagePickupApi' +import { V1BatchPickupMessage } from '../message-pìckup/protocol/v1' +import { V2StatusMessage } from '../message-pìckup/protocol/v2' import { MediationRecipientModuleConfig } from './MediationRecipientModuleConfig' import { MediatorPickupStrategy } from './MediatorPickupStrategy' @@ -32,8 +35,6 @@ import { MediationDenyHandler } from './handlers/MediationDenyHandler' import { MediationGrantHandler } from './handlers/MediationGrantHandler' import { KeylistUpdate, KeylistUpdateAction, KeylistUpdateMessage } from './messages' import { MediationState } from './models/MediationState' -import { StatusRequestMessage, BatchPickupMessage, StatusMessage } from './protocol' -import { StatusHandler, MessageDeliveryHandler } from './protocol/pickup/v2/handlers' import { MediationRepository } from './repository' import { MediationRecipientService } from './services/MediationRecipientService' import { RoutingService } from './services/RoutingService' @@ -49,6 +50,7 @@ export class MediationRecipientApi { private eventEmitter: EventEmitter private logger: Logger private discoverFeaturesApi: DiscoverFeaturesApi + private messagePickupApi: MessagePickupApi private mediationRepository: MediationRepository private routingService: RoutingService private agentContext: AgentContext @@ -65,6 +67,7 @@ export class MediationRecipientApi { messageSender: MessageSender, eventEmitter: EventEmitter, discoverFeaturesApi: DiscoverFeaturesApi, + messagePickupApi: MessagePickupApi, mediationRepository: MediationRepository, routingService: RoutingService, @inject(InjectionSymbols.Logger) logger: Logger, @@ -79,6 +82,7 @@ export class MediationRecipientApi { this.eventEmitter = eventEmitter this.logger = logger this.discoverFeaturesApi = discoverFeaturesApi + this.messagePickupApi = messagePickupApi this.mediationRepository = mediationRepository this.routingService = routingService this.agentContext = agentContext @@ -195,7 +199,11 @@ export class MediationRecipientApi { try { if (pickupStrategy === MediatorPickupStrategy.PickUpV2) { // Start Pickup v2 protocol to receive messages received while websocket offline - await this.sendStatusRequest({ mediatorId: mediator.id }) + await this.messagePickupApi.pickupMessages({ + connectionId: mediator.connectionId, + batchSize: this.config.maximumMessagePickup, + protocolVersion: 'v2', + }) } else { await this.openMediationWebSocket(mediator) } @@ -237,7 +245,11 @@ export class MediationRecipientApi { case MediatorPickupStrategy.PickUpV2: this.logger.info(`Starting pickup of messages from mediator '${mediatorRecord.id}'`) await this.openWebSocketAndPickUp(mediatorRecord, mediatorPickupStrategy) - await this.sendStatusRequest({ mediatorId: mediatorRecord.id }) + await this.messagePickupApi.pickupMessages({ + connectionId: mediatorConnection.id, + batchSize: this.config.maximumMessagePickup, + protocolVersion: 'v2', + }) break case MediatorPickupStrategy.PickUpV1: { const stopConditions$ = merge(this.stop$, this.stopMessagePickup$).pipe() @@ -247,7 +259,11 @@ export class MediationRecipientApi { .pipe(takeUntil(stopConditions$)) .subscribe({ next: async () => { - await this.pickupMessages(mediatorConnection) + await this.messagePickupApi.pickupMessages({ + connectionId: mediatorConnection.id, + batchSize: this.config.maximumMessagePickup, + protocolVersion: 'v1', + }) }, complete: () => this.logger.info(`Stopping pickup of messages from mediator '${mediatorRecord.id}'`), }) @@ -271,22 +287,6 @@ export class MediationRecipientApi { this.stopMessagePickup$.next(true) } - private async sendStatusRequest(config: { mediatorId: string; recipientKey?: string }) { - const mediationRecord = await this.mediationRecipientService.getById(this.agentContext, config.mediatorId) - - const statusRequestMessage = await this.mediationRecipientService.createStatusRequest(mediationRecord, { - recipientKey: config.recipientKey, - }) - - const mediatorConnection = await this.connectionService.getById(this.agentContext, mediationRecord.connectionId) - return this.messageSender.sendMessage( - new OutboundMessageContext(statusRequestMessage, { - agentContext: this.agentContext, - connection: mediatorConnection, - }) - ) - } - private async getPickupStrategyForMediator(mediator: MediationRecord) { let mediatorPickupStrategy = mediator.pickupStrategy ?? this.config.mediatorPickupStrategy @@ -296,22 +296,22 @@ export class MediationRecipientApi { const discloseForPickupV2 = await this.discoverFeaturesApi.queryFeatures({ connectionId: mediator.connectionId, protocolVersion: 'v1', - queries: [{ featureType: 'protocol', match: StatusMessage.type.protocolUri }], + queries: [{ featureType: 'protocol', match: V2StatusMessage.type.protocolUri }], awaitDisclosures: true, }) - if (discloseForPickupV2.features?.find((item) => item.id === StatusMessage.type.protocolUri)) { + if (discloseForPickupV2.features?.find((item) => item.id === V2StatusMessage.type.protocolUri)) { mediatorPickupStrategy = MediatorPickupStrategy.PickUpV2 } else { const discloseForPickupV1 = await this.discoverFeaturesApi.queryFeatures({ connectionId: mediator.connectionId, protocolVersion: 'v1', - queries: [{ featureType: 'protocol', match: BatchPickupMessage.type.protocolUri }], + queries: [{ featureType: 'protocol', match: V1BatchPickupMessage.type.protocolUri }], awaitDisclosures: true, }) // Use explicit pickup strategy mediatorPickupStrategy = discloseForPickupV1.features?.find( - (item) => item.id === BatchPickupMessage.type.protocolUri + (item) => item.id === V1BatchPickupMessage.type.protocolUri ) ? MediatorPickupStrategy.PickUpV1 : MediatorPickupStrategy.Implicit @@ -329,18 +329,18 @@ export class MediationRecipientApi { return this.mediationRecipientService.discoverMediation(this.agentContext) } + /** + * @deprecated Use `MessagePickupApi.pickupMessages` instead. + * */ public async pickupMessages(mediatorConnection: ConnectionRecord, pickupStrategy?: MediatorPickupStrategy) { mediatorConnection.assertReady() - const pickupMessage = - pickupStrategy === MediatorPickupStrategy.PickUpV2 - ? new StatusRequestMessage({}) - : new BatchPickupMessage({ batchSize: 10 }) - const outboundMessageContext = new OutboundMessageContext(pickupMessage, { - agentContext: this.agentContext, - connection: mediatorConnection, + const messagePickupApi = this.agentContext.dependencyManager.resolve(MessagePickupApi) + + await messagePickupApi.pickupMessages({ + connectionId: mediatorConnection.id, + protocolVersion: pickupStrategy === MediatorPickupStrategy.PickUpV2 ? 'v2' : 'v1', }) - await this.sendMessage(outboundMessageContext, pickupStrategy) } public async setDefaultMediator(mediatorRecord: MediationRecord) { @@ -476,8 +476,6 @@ export class MediationRecipientApi { messageHandlerRegistry.registerMessageHandler(new KeylistUpdateResponseHandler(this.mediationRecipientService)) messageHandlerRegistry.registerMessageHandler(new MediationGrantHandler(this.mediationRecipientService)) messageHandlerRegistry.registerMessageHandler(new MediationDenyHandler(this.mediationRecipientService)) - messageHandlerRegistry.registerMessageHandler(new StatusHandler(this.mediationRecipientService)) - messageHandlerRegistry.registerMessageHandler(new MessageDeliveryHandler(this.mediationRecipientService)) //messageHandlerRegistry.registerMessageHandler(new KeylistListHandler(this.mediationRecipientService)) // TODO: write this } } diff --git a/packages/core/src/modules/routing/MediatorApi.ts b/packages/core/src/modules/routing/MediatorApi.ts index af477e6052..af1bf29e79 100644 --- a/packages/core/src/modules/routing/MediatorApi.ts +++ b/packages/core/src/modules/routing/MediatorApi.ts @@ -2,18 +2,16 @@ import type { MediationRecord } from './repository' import type { EncryptedMessage } from '../../types' import { AgentContext } from '../../agent' -import { EventEmitter } from '../../agent/EventEmitter' import { MessageHandlerRegistry } from '../../agent/MessageHandlerRegistry' import { MessageSender } from '../../agent/MessageSender' import { OutboundMessageContext } from '../../agent/models' import { injectable } from '../../plugins' import { ConnectionService } from '../connections/services' +import { MessagePickupApi } from '../message-pìckup' import { MediatorModuleConfig } from './MediatorModuleConfig' import { ForwardHandler, KeylistUpdateHandler } from './handlers' import { MediationRequestHandler } from './handlers/MediationRequestHandler' -import { MessagePickupService, V2MessagePickupService } from './protocol' -import { BatchHandler, BatchPickupHandler } from './protocol/pickup/v1/handlers' import { MediatorService } from './services/MediatorService' @injectable() @@ -21,28 +19,20 @@ export class MediatorApi { public config: MediatorModuleConfig private mediatorService: MediatorService - private messagePickupService: MessagePickupService private messageSender: MessageSender - private eventEmitter: EventEmitter private agentContext: AgentContext private connectionService: ConnectionService public constructor( messageHandlerRegistry: MessageHandlerRegistry, mediationService: MediatorService, - messagePickupService: MessagePickupService, - // Only imported so it is injected and handlers are registered - v2MessagePickupService: V2MessagePickupService, messageSender: MessageSender, - eventEmitter: EventEmitter, agentContext: AgentContext, connectionService: ConnectionService, config: MediatorModuleConfig ) { this.mediatorService = mediationService - this.messagePickupService = messagePickupService this.messageSender = messageSender - this.eventEmitter = eventEmitter this.connectionService = connectionService this.agentContext = agentContext this.config = config @@ -81,8 +71,12 @@ export class MediatorApi { return mediationRecord } + /** + * @deprecated Use `MessagePickupApi.queueMessage` instead. + * */ public queueMessage(connectionId: string, message: EncryptedMessage) { - return this.messagePickupService.queueMessage(connectionId, message) + const messagePickupApi = this.agentContext.dependencyManager.resolve(MessagePickupApi) + return messagePickupApi.queueMessage({ connectionId, message }) } private registerMessageHandlers(messageHandlerRegistry: MessageHandlerRegistry) { @@ -90,8 +84,6 @@ export class MediatorApi { messageHandlerRegistry.registerMessageHandler( new ForwardHandler(this.mediatorService, this.connectionService, this.messageSender) ) - messageHandlerRegistry.registerMessageHandler(new BatchPickupHandler(this.messagePickupService)) - messageHandlerRegistry.registerMessageHandler(new BatchHandler(this.eventEmitter)) messageHandlerRegistry.registerMessageHandler(new MediationRequestHandler(this.mediatorService, this.config)) } } diff --git a/packages/core/src/modules/routing/MediatorModule.ts b/packages/core/src/modules/routing/MediatorModule.ts index fa4ef31f13..0ddc220263 100644 --- a/packages/core/src/modules/routing/MediatorModule.ts +++ b/packages/core/src/modules/routing/MediatorModule.ts @@ -7,7 +7,6 @@ import { Protocol } from '../../agent/models' import { MediatorApi } from './MediatorApi' import { MediatorModuleConfig } from './MediatorModuleConfig' import { MediationRole } from './models' -import { MessagePickupService, V2MessagePickupService } from './protocol' import { MediationRepository, MediatorRoutingRepository } from './repository' import { MediatorService } from './services' @@ -31,8 +30,6 @@ export class MediatorModule implements Module { // Services dependencyManager.registerSingleton(MediatorService) - dependencyManager.registerSingleton(MessagePickupService) - dependencyManager.registerSingleton(V2MessagePickupService) // Repositories dependencyManager.registerSingleton(MediationRepository) @@ -43,14 +40,6 @@ export class MediatorModule implements Module { new Protocol({ id: 'https://didcomm.org/coordinate-mediation/1.0', roles: [MediationRole.Mediator], - }), - new Protocol({ - id: 'https://didcomm.org/messagepickup/1.0', - roles: ['message_holder', 'recipient', 'batch_sender', 'batch_recipient'], - }), - new Protocol({ - id: 'https://didcomm.org/messagepickup/2.0', - roles: ['mediator', 'recipient'], }) ) } diff --git a/packages/core/src/modules/routing/__tests__/MediatorModule.test.ts b/packages/core/src/modules/routing/__tests__/MediatorModule.test.ts index 5835103180..81ba044281 100644 --- a/packages/core/src/modules/routing/__tests__/MediatorModule.test.ts +++ b/packages/core/src/modules/routing/__tests__/MediatorModule.test.ts @@ -2,7 +2,6 @@ import { FeatureRegistry } from '../../../agent/FeatureRegistry' import { DependencyManager } from '../../../plugins/DependencyManager' import { MediatorApi } from '../MediatorApi' import { MediatorModule } from '../MediatorModule' -import { MessagePickupService, V2MessagePickupService } from '../protocol' import { MediationRepository, MediatorRoutingRepository } from '../repository' import { MediatorService } from '../services' @@ -22,10 +21,8 @@ describe('MediatorModule', () => { expect(dependencyManager.registerContextScoped).toHaveBeenCalledTimes(1) expect(dependencyManager.registerContextScoped).toHaveBeenCalledWith(MediatorApi) - expect(dependencyManager.registerSingleton).toHaveBeenCalledTimes(5) + expect(dependencyManager.registerSingleton).toHaveBeenCalledTimes(3) expect(dependencyManager.registerSingleton).toHaveBeenCalledWith(MediatorService) - expect(dependencyManager.registerSingleton).toHaveBeenCalledWith(MessagePickupService) - expect(dependencyManager.registerSingleton).toHaveBeenCalledWith(V2MessagePickupService) expect(dependencyManager.registerSingleton).toHaveBeenCalledWith(MediationRepository) expect(dependencyManager.registerSingleton).toHaveBeenCalledWith(MediatorRoutingRepository) }) diff --git a/packages/core/src/modules/routing/index.ts b/packages/core/src/modules/routing/index.ts index a644af607a..981dbe5207 100644 --- a/packages/core/src/modules/routing/index.ts +++ b/packages/core/src/modules/routing/index.ts @@ -1,6 +1,5 @@ export * from './messages' export * from './services' -export * from './protocol' export * from './repository' export * from './models' export * from './RoutingEvents' diff --git a/packages/core/src/modules/routing/protocol/index.ts b/packages/core/src/modules/routing/protocol/index.ts deleted file mode 100644 index c18db7326d..0000000000 --- a/packages/core/src/modules/routing/protocol/index.ts +++ /dev/null @@ -1 +0,0 @@ -export * from './pickup' diff --git a/packages/core/src/modules/routing/protocol/pickup/v1/MessagePickupService.ts b/packages/core/src/modules/routing/protocol/pickup/v1/MessagePickupService.ts deleted file mode 100644 index aa97fc51a4..0000000000 --- a/packages/core/src/modules/routing/protocol/pickup/v1/MessagePickupService.ts +++ /dev/null @@ -1,62 +0,0 @@ -import type { BatchPickupMessage } from './messages' -import type { InboundMessageContext } from '../../../../../agent/models/InboundMessageContext' -import type { EncryptedMessage } from '../../../../../types' - -import { EventEmitter } from '../../../../../agent/EventEmitter' -import { MessageHandlerRegistry } from '../../../../../agent/MessageHandlerRegistry' -import { OutboundMessageContext } from '../../../../../agent/models' -import { InjectionSymbols } from '../../../../../constants' -import { inject, injectable } from '../../../../../plugins' -import { MessageRepository } from '../../../../../storage/MessageRepository' - -import { BatchHandler, BatchPickupHandler } from './handlers' -import { BatchMessage, BatchMessageMessage } from './messages' - -@injectable() -export class MessagePickupService { - private messageRepository: MessageRepository - private eventEmitter: EventEmitter - - public constructor( - @inject(InjectionSymbols.MessageRepository) messageRepository: MessageRepository, - messageHandlerRegistry: MessageHandlerRegistry, - eventEmitter: EventEmitter - ) { - this.messageRepository = messageRepository - this.eventEmitter = eventEmitter - - this.registerMessageHandlers(messageHandlerRegistry) - } - - public async batch(messageContext: InboundMessageContext) { - // Assert ready connection - const connection = messageContext.assertReadyConnection() - - const { message } = messageContext - const messages = await this.messageRepository.takeFromQueue(connection.id, message.batchSize) - - // TODO: each message should be stored with an id. to be able to conform to the id property - // of batch message - const batchMessages = messages.map( - (msg) => - new BatchMessageMessage({ - message: msg, - }) - ) - - const batchMessage = new BatchMessage({ - messages: batchMessages, - }) - - return new OutboundMessageContext(batchMessage, { agentContext: messageContext.agentContext, connection }) - } - - public async queueMessage(connectionId: string, message: EncryptedMessage) { - await this.messageRepository.add(connectionId, message) - } - - protected registerMessageHandlers(messageHandlerRegistry: MessageHandlerRegistry) { - messageHandlerRegistry.registerMessageHandler(new BatchPickupHandler(this)) - messageHandlerRegistry.registerMessageHandler(new BatchHandler(this.eventEmitter)) - } -} diff --git a/packages/core/src/modules/routing/protocol/pickup/v1/handlers/BatchHandler.ts b/packages/core/src/modules/routing/protocol/pickup/v1/handlers/BatchHandler.ts deleted file mode 100644 index 30d8e5263f..0000000000 --- a/packages/core/src/modules/routing/protocol/pickup/v1/handlers/BatchHandler.ts +++ /dev/null @@ -1,32 +0,0 @@ -import type { EventEmitter } from '../../../../../../agent/EventEmitter' -import type { AgentMessageReceivedEvent } from '../../../../../../agent/Events' -import type { MessageHandler, MessageHandlerInboundMessage } from '../../../../../../agent/MessageHandler' - -import { AgentEventTypes } from '../../../../../../agent/Events' -import { BatchMessage } from '../messages' - -export class BatchHandler implements MessageHandler { - private eventEmitter: EventEmitter - public supportedMessages = [BatchMessage] - - public constructor(eventEmitter: EventEmitter) { - this.eventEmitter = eventEmitter - } - - public async handle(messageContext: MessageHandlerInboundMessage) { - const { message } = messageContext - - messageContext.assertReadyConnection() - - const forwardedMessages = message.messages - forwardedMessages.forEach((message) => { - this.eventEmitter.emit(messageContext.agentContext, { - type: AgentEventTypes.AgentMessageReceived, - payload: { - message: message.message, - contextCorrelationId: messageContext.agentContext.contextCorrelationId, - }, - }) - }) - } -} diff --git a/packages/core/src/modules/routing/protocol/pickup/v1/handlers/BatchPickupHandler.ts b/packages/core/src/modules/routing/protocol/pickup/v1/handlers/BatchPickupHandler.ts deleted file mode 100644 index e47fb6c2f5..0000000000 --- a/packages/core/src/modules/routing/protocol/pickup/v1/handlers/BatchPickupHandler.ts +++ /dev/null @@ -1,19 +0,0 @@ -import type { MessageHandler, MessageHandlerInboundMessage } from '../../../../../../agent/MessageHandler' -import type { MessagePickupService } from '../MessagePickupService' - -import { BatchPickupMessage } from '../messages' - -export class BatchPickupHandler implements MessageHandler { - private messagePickupService: MessagePickupService - public supportedMessages = [BatchPickupMessage] - - public constructor(messagePickupService: MessagePickupService) { - this.messagePickupService = messagePickupService - } - - public async handle(messageContext: MessageHandlerInboundMessage) { - messageContext.assertReadyConnection() - - return this.messagePickupService.batch(messageContext) - } -} diff --git a/packages/core/src/modules/routing/protocol/pickup/v1/handlers/index.ts b/packages/core/src/modules/routing/protocol/pickup/v1/handlers/index.ts deleted file mode 100644 index d7a709a49d..0000000000 --- a/packages/core/src/modules/routing/protocol/pickup/v1/handlers/index.ts +++ /dev/null @@ -1,2 +0,0 @@ -export * from './BatchHandler' -export * from './BatchPickupHandler' diff --git a/packages/core/src/modules/routing/protocol/pickup/v1/index.ts b/packages/core/src/modules/routing/protocol/pickup/v1/index.ts deleted file mode 100644 index 9174e24a93..0000000000 --- a/packages/core/src/modules/routing/protocol/pickup/v1/index.ts +++ /dev/null @@ -1,2 +0,0 @@ -export * from './MessagePickupService' -export * from './messages' diff --git a/packages/core/src/modules/routing/protocol/pickup/v1/messages/index.ts b/packages/core/src/modules/routing/protocol/pickup/v1/messages/index.ts deleted file mode 100644 index 8e32f97f68..0000000000 --- a/packages/core/src/modules/routing/protocol/pickup/v1/messages/index.ts +++ /dev/null @@ -1,2 +0,0 @@ -export * from './BatchMessage' -export * from './BatchPickupMessage' diff --git a/packages/core/src/modules/routing/protocol/pickup/v2/V2MessagePickupService.ts b/packages/core/src/modules/routing/protocol/pickup/v2/V2MessagePickupService.ts deleted file mode 100644 index dc99c47856..0000000000 --- a/packages/core/src/modules/routing/protocol/pickup/v2/V2MessagePickupService.ts +++ /dev/null @@ -1,124 +0,0 @@ -import type { DeliveryRequestMessage, MessagesReceivedMessage, StatusRequestMessage } from './messages' -import type { InboundMessageContext } from '../../../../../agent/models/InboundMessageContext' -import type { EncryptedMessage } from '../../../../../types' - -import { MessageHandlerRegistry } from '../../../../../agent/MessageHandlerRegistry' -import { OutboundMessageContext } from '../../../../../agent/models' -import { InjectionSymbols } from '../../../../../constants' -import { Attachment } from '../../../../../decorators/attachment/Attachment' -import { AriesFrameworkError } from '../../../../../error' -import { inject, injectable } from '../../../../../plugins' -import { MessageRepository } from '../../../../../storage/MessageRepository' -import { MediationRecipientService } from '../../../services' - -import { - DeliveryRequestHandler, - MessageDeliveryHandler, - MessagesReceivedHandler, - StatusHandler, - StatusRequestHandler, -} from './handlers' -import { MessageDeliveryMessage, StatusMessage } from './messages' - -@injectable() -export class V2MessagePickupService { - private messageRepository: MessageRepository - private mediationRecipientService: MediationRecipientService - - public constructor( - @inject(InjectionSymbols.MessageRepository) messageRepository: MessageRepository, - messageHandlerRegistry: MessageHandlerRegistry, - mediationRecipientService: MediationRecipientService - ) { - this.messageRepository = messageRepository - this.mediationRecipientService = mediationRecipientService - - this.registerMessageHandlers(messageHandlerRegistry) - } - - public async processStatusRequest(messageContext: InboundMessageContext) { - // Assert ready connection - const connection = messageContext.assertReadyConnection() - - if (messageContext.message.recipientKey) { - throw new AriesFrameworkError('recipient_key parameter not supported') - } - - const statusMessage = new StatusMessage({ - threadId: messageContext.message.threadId, - messageCount: await this.messageRepository.getAvailableMessageCount(connection.id), - }) - - return new OutboundMessageContext(statusMessage, { agentContext: messageContext.agentContext, connection }) - } - - public async queueMessage(connectionId: string, message: EncryptedMessage) { - await this.messageRepository.add(connectionId, message) - } - - public async processDeliveryRequest(messageContext: InboundMessageContext) { - // Assert ready connection - const connection = messageContext.assertReadyConnection() - - if (messageContext.message.recipientKey) { - throw new AriesFrameworkError('recipient_key parameter not supported') - } - - const { message } = messageContext - - // Get available messages from queue, but don't delete them - const messages = await this.messageRepository.takeFromQueue(connection.id, message.limit, true) - - // TODO: each message should be stored with an id. to be able to conform to the id property - // of delivery message - const attachments = messages.map( - (msg) => - new Attachment({ - data: { - json: msg, - }, - }) - ) - - const outboundMessageContext = - messages.length > 0 - ? new MessageDeliveryMessage({ - threadId: messageContext.message.threadId, - attachments, - }) - : new StatusMessage({ - threadId: messageContext.message.threadId, - messageCount: 0, - }) - - return new OutboundMessageContext(outboundMessageContext, { agentContext: messageContext.agentContext, connection }) - } - - public async processMessagesReceived(messageContext: InboundMessageContext) { - // Assert ready connection - const connection = messageContext.assertReadyConnection() - - const { message } = messageContext - - // TODO: Add Queued Message ID - await this.messageRepository.takeFromQueue( - connection.id, - message.messageIdList ? message.messageIdList.length : undefined - ) - - const statusMessage = new StatusMessage({ - threadId: messageContext.message.threadId, - messageCount: await this.messageRepository.getAvailableMessageCount(connection.id), - }) - - return new OutboundMessageContext(statusMessage, { agentContext: messageContext.agentContext, connection }) - } - - protected registerMessageHandlers(messageHandlerRegistry: MessageHandlerRegistry) { - messageHandlerRegistry.registerMessageHandler(new StatusRequestHandler(this)) - messageHandlerRegistry.registerMessageHandler(new DeliveryRequestHandler(this)) - messageHandlerRegistry.registerMessageHandler(new MessagesReceivedHandler(this)) - messageHandlerRegistry.registerMessageHandler(new StatusHandler(this.mediationRecipientService)) - messageHandlerRegistry.registerMessageHandler(new MessageDeliveryHandler(this.mediationRecipientService)) - } -} diff --git a/packages/core/src/modules/routing/protocol/pickup/v2/handlers/DeliveryRequestHandler.ts b/packages/core/src/modules/routing/protocol/pickup/v2/handlers/DeliveryRequestHandler.ts deleted file mode 100644 index 78334b5448..0000000000 --- a/packages/core/src/modules/routing/protocol/pickup/v2/handlers/DeliveryRequestHandler.ts +++ /dev/null @@ -1,19 +0,0 @@ -import type { MessageHandler } from '../../../../../../agent/MessageHandler' -import type { InboundMessageContext } from '../../../../../../agent/models/InboundMessageContext' -import type { V2MessagePickupService } from '../V2MessagePickupService' - -import { DeliveryRequestMessage } from '../messages' - -export class DeliveryRequestHandler implements MessageHandler { - public supportedMessages = [DeliveryRequestMessage] - private messagePickupService: V2MessagePickupService - - public constructor(messagePickupService: V2MessagePickupService) { - this.messagePickupService = messagePickupService - } - - public async handle(messageContext: InboundMessageContext) { - messageContext.assertReadyConnection() - return this.messagePickupService.processDeliveryRequest(messageContext) - } -} diff --git a/packages/core/src/modules/routing/protocol/pickup/v2/handlers/MessageDeliveryHandler.ts b/packages/core/src/modules/routing/protocol/pickup/v2/handlers/MessageDeliveryHandler.ts deleted file mode 100644 index 606647edb9..0000000000 --- a/packages/core/src/modules/routing/protocol/pickup/v2/handlers/MessageDeliveryHandler.ts +++ /dev/null @@ -1,27 +0,0 @@ -import type { MessageHandler } from '../../../../../../agent/MessageHandler' -import type { InboundMessageContext } from '../../../../../../agent/models/InboundMessageContext' -import type { MediationRecipientService } from '../../../../services' - -import { OutboundMessageContext } from '../../../../../../agent/models' -import { MessageDeliveryMessage } from '../messages/MessageDeliveryMessage' - -export class MessageDeliveryHandler implements MessageHandler { - public supportedMessages = [MessageDeliveryMessage] - private mediationRecipientService: MediationRecipientService - - public constructor(mediationRecipientService: MediationRecipientService) { - this.mediationRecipientService = mediationRecipientService - } - - public async handle(messageContext: InboundMessageContext) { - const connection = messageContext.assertReadyConnection() - const deliveryReceivedMessage = await this.mediationRecipientService.processDelivery(messageContext) - - if (deliveryReceivedMessage) { - return new OutboundMessageContext(deliveryReceivedMessage, { - agentContext: messageContext.agentContext, - connection, - }) - } - } -} diff --git a/packages/core/src/modules/routing/protocol/pickup/v2/handlers/MessagesReceivedHandler.ts b/packages/core/src/modules/routing/protocol/pickup/v2/handlers/MessagesReceivedHandler.ts deleted file mode 100644 index 7ddf4b6d75..0000000000 --- a/packages/core/src/modules/routing/protocol/pickup/v2/handlers/MessagesReceivedHandler.ts +++ /dev/null @@ -1,19 +0,0 @@ -import type { MessageHandler } from '../../../../../../agent/MessageHandler' -import type { InboundMessageContext } from '../../../../../../agent/models/InboundMessageContext' -import type { V2MessagePickupService } from '../V2MessagePickupService' - -import { MessagesReceivedMessage } from '../messages' - -export class MessagesReceivedHandler implements MessageHandler { - public supportedMessages = [MessagesReceivedMessage] - private messagePickupService: V2MessagePickupService - - public constructor(messagePickupService: V2MessagePickupService) { - this.messagePickupService = messagePickupService - } - - public async handle(messageContext: InboundMessageContext) { - messageContext.assertReadyConnection() - return this.messagePickupService.processMessagesReceived(messageContext) - } -} diff --git a/packages/core/src/modules/routing/protocol/pickup/v2/handlers/StatusHandler.ts b/packages/core/src/modules/routing/protocol/pickup/v2/handlers/StatusHandler.ts deleted file mode 100644 index 7fedcc381f..0000000000 --- a/packages/core/src/modules/routing/protocol/pickup/v2/handlers/StatusHandler.ts +++ /dev/null @@ -1,27 +0,0 @@ -import type { MessageHandler } from '../../../../../../agent/MessageHandler' -import type { InboundMessageContext } from '../../../../../../agent/models/InboundMessageContext' -import type { MediationRecipientService } from '../../../../services' - -import { OutboundMessageContext } from '../../../../../../agent/models' -import { StatusMessage } from '../messages' - -export class StatusHandler implements MessageHandler { - public supportedMessages = [StatusMessage] - private mediatorRecipientService: MediationRecipientService - - public constructor(mediatorRecipientService: MediationRecipientService) { - this.mediatorRecipientService = mediatorRecipientService - } - - public async handle(messageContext: InboundMessageContext) { - const connection = messageContext.assertReadyConnection() - const deliveryRequestMessage = await this.mediatorRecipientService.processStatus(messageContext) - - if (deliveryRequestMessage) { - return new OutboundMessageContext(deliveryRequestMessage, { - agentContext: messageContext.agentContext, - connection, - }) - } - } -} diff --git a/packages/core/src/modules/routing/protocol/pickup/v2/handlers/StatusRequestHandler.ts b/packages/core/src/modules/routing/protocol/pickup/v2/handlers/StatusRequestHandler.ts deleted file mode 100644 index 1e7c67ae1d..0000000000 --- a/packages/core/src/modules/routing/protocol/pickup/v2/handlers/StatusRequestHandler.ts +++ /dev/null @@ -1,19 +0,0 @@ -import type { MessageHandler } from '../../../../../../agent/MessageHandler' -import type { InboundMessageContext } from '../../../../../../agent/models/InboundMessageContext' -import type { V2MessagePickupService } from '../V2MessagePickupService' - -import { StatusRequestMessage } from '../messages' - -export class StatusRequestHandler implements MessageHandler { - public supportedMessages = [StatusRequestMessage] - private messagePickupService: V2MessagePickupService - - public constructor(messagePickupService: V2MessagePickupService) { - this.messagePickupService = messagePickupService - } - - public async handle(messageContext: InboundMessageContext) { - messageContext.assertReadyConnection() - return this.messagePickupService.processStatusRequest(messageContext) - } -} diff --git a/packages/core/src/modules/routing/protocol/pickup/v2/handlers/index.ts b/packages/core/src/modules/routing/protocol/pickup/v2/handlers/index.ts deleted file mode 100644 index c8f4456634..0000000000 --- a/packages/core/src/modules/routing/protocol/pickup/v2/handlers/index.ts +++ /dev/null @@ -1,5 +0,0 @@ -export * from './DeliveryRequestHandler' -export * from './MessageDeliveryHandler' -export * from './MessagesReceivedHandler' -export * from './StatusHandler' -export * from './StatusRequestHandler' diff --git a/packages/core/src/modules/routing/protocol/pickup/v2/index.ts b/packages/core/src/modules/routing/protocol/pickup/v2/index.ts deleted file mode 100644 index b6a5eb72c5..0000000000 --- a/packages/core/src/modules/routing/protocol/pickup/v2/index.ts +++ /dev/null @@ -1,2 +0,0 @@ -export * from './V2MessagePickupService' -export * from './messages' diff --git a/packages/core/src/modules/routing/protocol/pickup/v2/messages/index.ts b/packages/core/src/modules/routing/protocol/pickup/v2/messages/index.ts deleted file mode 100644 index fa807e7249..0000000000 --- a/packages/core/src/modules/routing/protocol/pickup/v2/messages/index.ts +++ /dev/null @@ -1,5 +0,0 @@ -export * from './DeliveryRequestMessage' -export * from './MessageDeliveryMessage' -export * from './MessagesReceivedMessage' -export * from './StatusMessage' -export * from './StatusRequestMessage' diff --git a/packages/core/src/modules/routing/services/MediationRecipientService.ts b/packages/core/src/modules/routing/services/MediationRecipientService.ts index f5149a3828..a2b861a602 100644 --- a/packages/core/src/modules/routing/services/MediationRecipientService.ts +++ b/packages/core/src/modules/routing/services/MediationRecipientService.ts @@ -1,21 +1,18 @@ import type { GetRoutingOptions, RemoveRoutingOptions } from './RoutingService' import type { AgentContext } from '../../../agent' import type { AgentMessage } from '../../../agent/AgentMessage' -import type { AgentMessageReceivedEvent } from '../../../agent/Events' import type { InboundMessageContext } from '../../../agent/models/InboundMessageContext' import type { Query } from '../../../storage/StorageService' -import type { EncryptedMessage } from '../../../types' import type { ConnectionRecord } from '../../connections' import type { Routing } from '../../connections/services/ConnectionService' import type { MediationStateChangedEvent, KeylistUpdatedEvent } from '../RoutingEvents' import type { MediationDenyMessage } from '../messages' -import type { StatusMessage, MessageDeliveryMessage } from '../protocol' import { firstValueFrom, ReplaySubject } from 'rxjs' import { filter, first, timeout } from 'rxjs/operators' import { EventEmitter } from '../../../agent/EventEmitter' -import { filterContextCorrelationId, AgentEventTypes } from '../../../agent/Events' +import { filterContextCorrelationId } from '../../../agent/Events' import { MessageSender } from '../../../agent/MessageSender' import { OutboundMessageContext } from '../../../agent/models' import { Key, KeyType } from '../../../crypto' @@ -27,10 +24,7 @@ import { ConnectionMetadataKeys } from '../../connections/repository/ConnectionM import { ConnectionService } from '../../connections/services/ConnectionService' import { DidKey } from '../../dids' import { didKeyToVerkey, isDidKey } from '../../dids/helpers' -import { ProblemReportError } from '../../problem-reports' -import { MediationRecipientModuleConfig } from '../MediationRecipientModuleConfig' import { RoutingEventTypes } from '../RoutingEvents' -import { RoutingProblemReportReason } from '../error' import { KeylistUpdateAction, KeylistUpdateResponseMessage, @@ -39,7 +33,6 @@ import { } from '../messages' import { KeylistUpdate, KeylistUpdateMessage } from '../messages/KeylistUpdateMessage' import { MediationRole, MediationState } from '../models' -import { DeliveryRequestMessage, MessagesReceivedMessage, StatusRequestMessage } from '../protocol/pickup/v2/messages' import { MediationRecord } from '../repository/MediationRecord' import { MediationRepository } from '../repository/MediationRepository' @@ -49,37 +42,17 @@ export class MediationRecipientService { private eventEmitter: EventEmitter private connectionService: ConnectionService private messageSender: MessageSender - private mediationRecipientModuleConfig: MediationRecipientModuleConfig public constructor( connectionService: ConnectionService, messageSender: MessageSender, mediatorRepository: MediationRepository, - eventEmitter: EventEmitter, - mediationRecipientModuleConfig: MediationRecipientModuleConfig + eventEmitter: EventEmitter ) { this.mediationRepository = mediatorRepository this.eventEmitter = eventEmitter this.connectionService = connectionService this.messageSender = messageSender - this.mediationRecipientModuleConfig = mediationRecipientModuleConfig - } - - public async createStatusRequest( - mediationRecord: MediationRecord, - config: { - recipientKey?: string - } = {} - ) { - mediationRecord.assertRole(MediationRole.Recipient) - mediationRecord.assertReady() - - const { recipientKey } = config - const statusRequest = new StatusRequestMessage({ - recipientKey, - }) - - return statusRequest } public async createRequest( @@ -308,81 +281,6 @@ export class MediationRecipientService { return mediationRecord } - public async processStatus(messageContext: InboundMessageContext) { - const connection = messageContext.assertReadyConnection() - const { message: statusMessage } = messageContext - const { messageCount, recipientKey } = statusMessage - - //No messages to be sent - if (messageCount === 0) { - const { message, connectionRecord } = await this.connectionService.createTrustPing( - messageContext.agentContext, - connection, - { - responseRequested: false, - } - ) - const websocketSchemes = ['ws', 'wss'] - - await this.messageSender.sendMessage( - new OutboundMessageContext(message, { - agentContext: messageContext.agentContext, - connection: connectionRecord, - }), - { - transportPriority: { - schemes: websocketSchemes, - restrictive: true, - // TODO: add keepAlive: true to enforce through the public api - // we need to keep the socket alive. It already works this way, but would - // be good to make more explicit from the public facing API. - // This would also make it easier to change the internal API later on. - // keepAlive: true, - }, - } - ) - - return null - } - const { maximumMessagePickup } = this.mediationRecipientModuleConfig - const limit = messageCount < maximumMessagePickup ? messageCount : maximumMessagePickup - - const deliveryRequestMessage = new DeliveryRequestMessage({ - limit, - recipientKey, - }) - - return deliveryRequestMessage - } - - public async processDelivery(messageContext: InboundMessageContext) { - messageContext.assertReadyConnection() - - const { appendedAttachments } = messageContext.message - - if (!appendedAttachments) - throw new ProblemReportError('Error processing attachments', { - problemCode: RoutingProblemReportReason.ErrorProcessingAttachments, - }) - - const ids: string[] = [] - for (const attachment of appendedAttachments) { - ids.push(attachment.id) - - this.eventEmitter.emit(messageContext.agentContext, { - type: AgentEventTypes.AgentMessageReceived, - payload: { - message: attachment.getDataAsJson(), - contextCorrelationId: messageContext.agentContext.contextCorrelationId, - }, - }) - } - - return new MessagesReceivedMessage({ - messageIdList: ids, - }) - } - /** * Update the record to a new state and emit an state changed event. Also updates the record * in storage. diff --git a/packages/core/src/modules/routing/services/__tests__/MediationRecipientService.test.ts b/packages/core/src/modules/routing/services/__tests__/MediationRecipientService.test.ts index e958a3d587..f8c2077310 100644 --- a/packages/core/src/modules/routing/services/__tests__/MediationRecipientService.test.ts +++ b/packages/core/src/modules/routing/services/__tests__/MediationRecipientService.test.ts @@ -3,12 +3,9 @@ import type { Routing } from '../../../connections/services/ConnectionService' import { getAgentConfig, getAgentContext, getMockConnection, mockFunction } from '../../../../../tests/helpers' import { EventEmitter } from '../../../../agent/EventEmitter' -import { AgentEventTypes } from '../../../../agent/Events' import { MessageSender } from '../../../../agent/MessageSender' import { InboundMessageContext } from '../../../../agent/models/InboundMessageContext' import { Key } from '../../../../crypto' -import { Attachment } from '../../../../decorators/attachment/Attachment' -import { AriesFrameworkError } from '../../../../error' import { uuid } from '../../../../utils/uuid' import { DidExchangeState } from '../../../connections' import { ConnectionMetadataKeys } from '../../../connections/repository/ConnectionMetadataTypes' @@ -25,7 +22,6 @@ import { MediationGrantMessage, } from '../../messages' import { MediationRole, MediationState } from '../../models' -import { DeliveryRequestMessage, MessageDeliveryMessage, MessagesReceivedMessage, StatusMessage } from '../../protocol' import { MediationRecord } from '../../repository/MediationRecord' import { MediationRepository } from '../../repository/MediationRepository' import { MediationRecipientService } from '../MediationRecipientService' @@ -50,10 +46,6 @@ const DidRegistrarServiceMock = DidRegistrarService as jest.Mock { const config = getAgentConfig('MediationRecipientServiceTest', { endpoints: ['http://agent.com:8080'], @@ -105,8 +97,7 @@ describe('MediationRecipientService', () => { connectionService, messageSender, mediationRepository, - eventEmitter, - new MediationRecipientModuleConfig() + eventEmitter ) }) @@ -156,33 +147,6 @@ describe('MediationRecipientService', () => { }) }) - describe('createStatusRequest', () => { - it('creates a status request message', async () => { - const statusRequestMessage = await mediationRecipientService.createStatusRequest(mediationRecord, { - recipientKey: 'a-key', - }) - - expect(statusRequestMessage).toMatchObject({ - id: expect.any(String), - recipientKey: 'a-key', - }) - }) - - it('it throws an error when the mediation record has incorrect role or state', async () => { - mediationRecord.role = MediationRole.Mediator - await expect(mediationRecipientService.createStatusRequest(mediationRecord)).rejects.toThrowError( - 'Mediation record has invalid role MEDIATOR. Expected role RECIPIENT.' - ) - - mediationRecord.role = MediationRole.Recipient - mediationRecord.state = MediationState.Requested - - await expect(mediationRecipientService.createStatusRequest(mediationRecord)).rejects.toThrowError( - 'Mediation record is not ready to be used. Expected granted, found invalid state requested' - ) - }) - }) - describe('processKeylistUpdateResults', () => { it('it stores did:key-encoded keys in base58 format', async () => { const spyAddRecipientKey = jest.spyOn(mediationRecord, 'addRecipientKey') @@ -226,119 +190,6 @@ describe('MediationRecipientService', () => { }) }) - describe('processStatus', () => { - it('if status request has a message count of zero returns nothing', async () => { - const status = new StatusMessage({ - threadId: uuid(), - messageCount: 0, - }) - - const messageContext = new InboundMessageContext(status, { connection: mockConnection, agentContext }) - const deliveryRequestMessage = await mediationRecipientService.processStatus(messageContext) - expect(deliveryRequestMessage).toBeNull() - }) - - it('if it has a message count greater than zero return a valid delivery request', async () => { - const status = new StatusMessage({ - threadId: uuid(), - messageCount: 1, - }) - const messageContext = new InboundMessageContext(status, { connection: mockConnection, agentContext }) - - const deliveryRequestMessage = await mediationRecipientService.processStatus(messageContext) - expect(deliveryRequestMessage) - expect(deliveryRequestMessage).toEqual(new DeliveryRequestMessage({ id: deliveryRequestMessage?.id, limit: 1 })) - }) - }) - - describe('processDelivery', () => { - it('if the delivery has no attachments expect an error', async () => { - const messageContext = new InboundMessageContext({} as MessageDeliveryMessage, { - connection: mockConnection, - agentContext, - }) - - await expect(mediationRecipientService.processDelivery(messageContext)).rejects.toThrowError( - new AriesFrameworkError('Error processing attachments') - ) - }) - - it('should return a message received with an message id list in it', async () => { - const messageDeliveryMessage = new MessageDeliveryMessage({ - threadId: uuid(), - attachments: [ - new Attachment({ - id: '1', - data: { - json: { - a: 'value', - }, - }, - }), - ], - }) - const messageContext = new InboundMessageContext(messageDeliveryMessage, { - connection: mockConnection, - agentContext, - }) - - const messagesReceivedMessage = await mediationRecipientService.processDelivery(messageContext) - - expect(messagesReceivedMessage).toEqual( - new MessagesReceivedMessage({ - id: messagesReceivedMessage.id, - messageIdList: ['1'], - }) - ) - }) - - it('calls the event emitter for each message', async () => { - const messageDeliveryMessage = new MessageDeliveryMessage({ - threadId: uuid(), - attachments: [ - new Attachment({ - id: '1', - data: { - json: { - first: 'value', - }, - }, - }), - new Attachment({ - id: '2', - data: { - json: { - second: 'value', - }, - }, - }), - ], - }) - const messageContext = new InboundMessageContext(messageDeliveryMessage, { - connection: mockConnection, - agentContext, - }) - - await mediationRecipientService.processDelivery(messageContext) - - expect(eventEmitter.emit).toHaveBeenCalledTimes(2) - expect(eventEmitter.emit).toHaveBeenNthCalledWith(1, agentContext, { - type: AgentEventTypes.AgentMessageReceived, - payload: { - message: { first: 'value' }, - contextCorrelationId: agentContext.contextCorrelationId, - }, - }) - expect(eventEmitter.emit).toHaveBeenNthCalledWith(2, agentContext, { - type: AgentEventTypes.AgentMessageReceived, - payload: { - message: { second: 'value' }, - contextCorrelationId: agentContext.contextCorrelationId, - }, - }) - }) - }) - describe('addMediationRouting', () => { const routingKey = Key.fromFingerprint('z6Mkk7yqnGF3YwTrLpqrW6PGsKci7dNqh1CjnvMbzrMerSeL') const recipientKey = Key.fromFingerprint('z6MkmjY8GnV5i9YTDtPETC2uUAW6ejw3nk5mXF5yci5ab7th') diff --git a/packages/core/src/modules/routing/services/__tests__/V2MessagePickupService.test.ts b/packages/core/src/modules/routing/services/__tests__/V2MessagePickupService.test.ts deleted file mode 100644 index 95055f4945..0000000000 --- a/packages/core/src/modules/routing/services/__tests__/V2MessagePickupService.test.ts +++ /dev/null @@ -1,251 +0,0 @@ -import type { MessageRepository } from '../../../../storage/MessageRepository' -import type { EncryptedMessage } from '../../../../types' - -import { getAgentContext, getMockConnection, mockFunction } from '../../../../../tests/helpers' -import { MessageHandlerRegistry } from '../../../../agent/MessageHandlerRegistry' -import { InboundMessageContext } from '../../../../agent/models/InboundMessageContext' -import { InMemoryMessageRepository } from '../../../../storage/InMemoryMessageRepository' -import { DidExchangeState } from '../../../connections' -import { - DeliveryRequestMessage, - MessageDeliveryMessage, - MessagesReceivedMessage, - StatusMessage, - StatusRequestMessage, - V2MessagePickupService, -} from '../../protocol' -import { MediationRecipientService } from '../MediationRecipientService' - -const mockConnection = getMockConnection({ - state: DidExchangeState.Completed, -}) - -// Mock classes -jest.mock('../MediationRecipientService') -jest.mock('../../../../storage/InMemoryMessageRepository') -jest.mock('../../../../agent/MessageHandlerRegistry') - -// Mock typed object -const MediationRecipientServiceMock = MediationRecipientService as jest.Mock -const MessageHandlerRegistryMock = MessageHandlerRegistry as jest.Mock -const InMessageRepositoryMock = InMemoryMessageRepository as jest.Mock - -const agentContext = getAgentContext() - -const encryptedMessage: EncryptedMessage = { - protected: 'base64url', - iv: 'base64url', - ciphertext: 'base64url', - tag: 'base64url', -} -const queuedMessages = [encryptedMessage, encryptedMessage, encryptedMessage] - -describe('V2MessagePickupService', () => { - let pickupService: V2MessagePickupService - let messageRepository: MessageRepository - - beforeEach(async () => { - const messageHandlerRegistry = new MessageHandlerRegistryMock() - const mediationRecipientService = new MediationRecipientServiceMock() - - messageRepository = new InMessageRepositoryMock() - pickupService = new V2MessagePickupService(messageRepository, messageHandlerRegistry, mediationRecipientService) - }) - - describe('processStatusRequest', () => { - test('no available messages in queue', async () => { - mockFunction(messageRepository.getAvailableMessageCount).mockResolvedValue(0) - - const statusRequest = new StatusRequestMessage({}) - - const messageContext = new InboundMessageContext(statusRequest, { connection: mockConnection, agentContext }) - - const { connection, message } = await pickupService.processStatusRequest(messageContext) - - expect(connection).toEqual(mockConnection) - expect(message).toEqual( - new StatusMessage({ - id: message.id, - threadId: statusRequest.threadId, - messageCount: 0, - }) - ) - expect(messageRepository.getAvailableMessageCount).toHaveBeenCalledWith(mockConnection.id) - }) - - test('multiple messages in queue', async () => { - mockFunction(messageRepository.getAvailableMessageCount).mockResolvedValue(5) - const statusRequest = new StatusRequestMessage({}) - - const messageContext = new InboundMessageContext(statusRequest, { connection: mockConnection, agentContext }) - - const { connection, message } = await pickupService.processStatusRequest(messageContext) - - expect(connection).toEqual(mockConnection) - expect(message).toEqual( - new StatusMessage({ - id: message.id, - threadId: statusRequest.threadId, - messageCount: 5, - }) - ) - expect(messageRepository.getAvailableMessageCount).toHaveBeenCalledWith(mockConnection.id) - }) - - test('status request specifying recipient key', async () => { - mockFunction(messageRepository.getAvailableMessageCount).mockResolvedValue(10) - - const statusRequest = new StatusRequestMessage({ - recipientKey: 'recipientKey', - }) - - const messageContext = new InboundMessageContext(statusRequest, { connection: mockConnection, agentContext }) - - await expect(pickupService.processStatusRequest(messageContext)).rejects.toThrowError( - 'recipient_key parameter not supported' - ) - }) - }) - - describe('processDeliveryRequest', () => { - test('no available messages in queue', async () => { - mockFunction(messageRepository.takeFromQueue).mockResolvedValue([]) - - const deliveryRequest = new DeliveryRequestMessage({ limit: 10 }) - - const messageContext = new InboundMessageContext(deliveryRequest, { connection: mockConnection, agentContext }) - - const { connection, message } = await pickupService.processDeliveryRequest(messageContext) - - expect(connection).toEqual(mockConnection) - expect(message).toEqual( - new StatusMessage({ - id: message.id, - threadId: deliveryRequest.threadId, - messageCount: 0, - }) - ) - expect(messageRepository.takeFromQueue).toHaveBeenCalledWith(mockConnection.id, 10, true) - }) - - test('less messages in queue than limit', async () => { - mockFunction(messageRepository.takeFromQueue).mockResolvedValue(queuedMessages) - - const deliveryRequest = new DeliveryRequestMessage({ limit: 10 }) - - const messageContext = new InboundMessageContext(deliveryRequest, { connection: mockConnection, agentContext }) - - const { connection, message } = await pickupService.processDeliveryRequest(messageContext) - - expect(connection).toEqual(mockConnection) - expect(message).toBeInstanceOf(MessageDeliveryMessage) - expect(message.threadId).toEqual(deliveryRequest.threadId) - expect(message.appendedAttachments?.length).toEqual(3) - expect(message.appendedAttachments).toEqual( - expect.arrayContaining( - queuedMessages.map((msg) => - expect.objectContaining({ - data: { - json: msg, - }, - }) - ) - ) - ) - expect(messageRepository.takeFromQueue).toHaveBeenCalledWith(mockConnection.id, 10, true) - }) - - test('more messages in queue than limit', async () => { - mockFunction(messageRepository.takeFromQueue).mockResolvedValue(queuedMessages.slice(0, 2)) - - const deliveryRequest = new DeliveryRequestMessage({ limit: 2 }) - - const messageContext = new InboundMessageContext(deliveryRequest, { connection: mockConnection, agentContext }) - - const { connection, message } = await pickupService.processDeliveryRequest(messageContext) - - expect(connection).toEqual(mockConnection) - expect(message).toBeInstanceOf(MessageDeliveryMessage) - expect(message.threadId).toEqual(deliveryRequest.threadId) - expect(message.appendedAttachments?.length).toEqual(2) - expect(message.appendedAttachments).toEqual( - expect.arrayContaining( - queuedMessages.slice(0, 2).map((msg) => - expect.objectContaining({ - data: { - json: msg, - }, - }) - ) - ) - ) - expect(messageRepository.takeFromQueue).toHaveBeenCalledWith(mockConnection.id, 2, true) - }) - - test('delivery request specifying recipient key', async () => { - mockFunction(messageRepository.takeFromQueue).mockResolvedValue(queuedMessages) - - const statusRequest = new DeliveryRequestMessage({ - limit: 10, - recipientKey: 'recipientKey', - }) - - const messageContext = new InboundMessageContext(statusRequest, { connection: mockConnection, agentContext }) - - await expect(pickupService.processStatusRequest(messageContext)).rejects.toThrowError( - 'recipient_key parameter not supported' - ) - }) - }) - - describe('processMessagesReceived', () => { - test('messages received partially', async () => { - mockFunction(messageRepository.takeFromQueue).mockResolvedValue(queuedMessages) - mockFunction(messageRepository.getAvailableMessageCount).mockResolvedValue(4) - - const messagesReceived = new MessagesReceivedMessage({ - messageIdList: ['1', '2'], - }) - - const messageContext = new InboundMessageContext(messagesReceived, { connection: mockConnection, agentContext }) - - const { connection, message } = await pickupService.processMessagesReceived(messageContext) - - expect(connection).toEqual(mockConnection) - expect(message).toEqual( - new StatusMessage({ - id: message.id, - threadId: messagesReceived.threadId, - messageCount: 4, - }) - ) - expect(messageRepository.getAvailableMessageCount).toHaveBeenCalledWith(mockConnection.id) - expect(messageRepository.takeFromQueue).toHaveBeenCalledWith(mockConnection.id, 2) - }) - - test('all messages have been received', async () => { - mockFunction(messageRepository.takeFromQueue).mockResolvedValue(queuedMessages) - mockFunction(messageRepository.getAvailableMessageCount).mockResolvedValue(0) - - const messagesReceived = new MessagesReceivedMessage({ - messageIdList: ['1', '2'], - }) - - const messageContext = new InboundMessageContext(messagesReceived, { connection: mockConnection, agentContext }) - - const { connection, message } = await pickupService.processMessagesReceived(messageContext) - - expect(connection).toEqual(mockConnection) - expect(message).toEqual( - new StatusMessage({ - id: message.id, - threadId: messagesReceived.threadId, - messageCount: 0, - }) - ) - - expect(messageRepository.getAvailableMessageCount).toHaveBeenCalledWith(mockConnection.id) - expect(messageRepository.takeFromQueue).toHaveBeenCalledWith(mockConnection.id, 2) - }) - }) -})