From f9957009a222823a0b4cce73b831cc29c113b7f1 Mon Sep 17 00:00:00 2001 From: Kevin Viglucci Date: Tue, 9 Aug 2022 09:35:29 -0500 Subject: [PATCH] Optional metadata for rxjs requesters (#241) * feat: support optional metadata in rxjs adapter requesters Signed-off-by: Kevin Viglucci * chore(release): release - rsocket-adapter-rxjs@1.0.0-alpha-rxjs-adapter-optional-metadata.0 - rsocket-composite-metadata@1.0.0-alpha-rxjs-adapter-optional-metadata.0 - rsocket-core@1.0.0-alpha-rxjs-adapter-optional-metadata.0 - rsocket-examples@1.0.0-alpha-rxjs-adapter-optional-metadata.0 - rsocket-messaging@1.0.0-alpha-rxjs-adapter-optional-metadata.0 - rsocket-tcp-client@1.0.0-alpha-rxjs-adapter-optional-metadata.0 - rsocket-tcp-server@1.0.0-alpha-rxjs-adapter-optional-metadata.0 - rsocket-websocket-client@1.0.0-alpha-rxjs-adapter-optional-metadata.0 - rsocket-websocket-server@1.0.0-alpha-rxjs-adapter-optional-metadata.0 Signed-off-by: Kevin Viglucci --- packages/rsocket-adapter-rxjs/package.json | 6 +- .../rsocket-adapter-rxjs/src/Requesters.ts | 100 ++++++++++-------- .../rsocket-composite-metadata/package.json | 4 +- packages/rsocket-core/package.json | 2 +- packages/rsocket-examples/package.json | 16 +-- packages/rsocket-messaging/package.json | 6 +- packages/rsocket-tcp-client/package.json | 4 +- packages/rsocket-tcp-server/package.json | 4 +- .../rsocket-websocket-client/package.json | 4 +- .../rsocket-websocket-server/package.json | 4 +- 10 files changed, 79 insertions(+), 71 deletions(-) diff --git a/packages/rsocket-adapter-rxjs/package.json b/packages/rsocket-adapter-rxjs/package.json index 745a1d1..5411149 100644 --- a/packages/rsocket-adapter-rxjs/package.json +++ b/packages/rsocket-adapter-rxjs/package.json @@ -1,6 +1,6 @@ { "name": "rsocket-adapter-rxjs", - "version": "1.0.0-alpha.2", + "version": "1.0.0-alpha-rxjs-adapter-optional-metadata.0", "license": "Apache-2.0", "main": "dist/index", "types": "dist/index", @@ -19,8 +19,8 @@ "test": "echo \"Error: no test specified\" && exit 0" }, "dependencies": { - "rsocket-core": "^1.0.0-alpha.1", - "rsocket-messaging": "^1.0.0-alpha.2", + "rsocket-core": "^1.0.0-alpha-rxjs-adapter-optional-metadata.0", + "rsocket-messaging": "^1.0.0-alpha-rxjs-adapter-optional-metadata.0", "rxjs": "^7.4.0" }, "devDependencies": { diff --git a/packages/rsocket-adapter-rxjs/src/Requesters.ts b/packages/rsocket-adapter-rxjs/src/Requesters.ts index d6b2046..bd6d518 100644 --- a/packages/rsocket-adapter-rxjs/src/Requesters.ts +++ b/packages/rsocket-adapter-rxjs/src/Requesters.ts @@ -48,21 +48,23 @@ export function fireAndForget( inputCodec: Codec ): ( rsocket: RSocket, - metadata: Map + metadata?: Map ) => Observable { return ( rsocket: RSocket, - metadata: Map - ) => - new RSocketPublisherToObservable((s) => + metadata?: Map + ) => { + const encodedMetadata = metadata ? encodeCompositeMetadata(metadata) : null; + return new RSocketPublisherToObservable((s) => rsocket.fireAndForget( { data: data ? inputCodec.encode(data) : Buffer.allocUnsafe(0), - metadata: encodeCompositeMetadata(metadata), + metadata: encodedMetadata, }, s ) ); + }; } export function requestResponse( @@ -71,23 +73,25 @@ export function requestResponse( outputCodec: Codec ): ( rsocket: RSocket, - metadata: Map + metadata?: Map ) => Observable { return ( rsocket: RSocket, - metadata: Map - ) => - new RSocketPublisherToObservable( + metadata?: Map + ) => { + const encodedMetadata = metadata ? encodeCompositeMetadata(metadata) : null; + return new RSocketPublisherToObservable( (s) => rsocket.requestResponse( { data: data ? inputCodec.encode(data) : Buffer.allocUnsafe(0), - metadata: encodeCompositeMetadata(metadata), + metadata: encodedMetadata, }, s ), outputCodec ); + }; } export function requestStream( @@ -98,18 +102,19 @@ export function requestStream( scheduler: SchedulerLike = asyncScheduler ): ( rsocket: RSocket, - metadata: Map + metadata?: Map ) => Observable { return ( rsocket: RSocket, - metadata: Map - ) => - new RSocketPublisherToPrefetchingObservable( + metadata?: Map + ) => { + const encodedMetadata = metadata ? encodeCompositeMetadata(metadata) : null; + return new RSocketPublisherToPrefetchingObservable( (s, n) => rsocket.requestStream( { data: data ? inputCodec.encode(data) : Buffer.allocUnsafe(0), - metadata: encodeCompositeMetadata(metadata), + metadata: encodedMetadata, }, n, s @@ -118,6 +123,7 @@ export function requestStream( outputCodec, scheduler ); + }; } export function requestChannel( @@ -128,9 +134,9 @@ export function requestChannel( scheduler: SchedulerLike = asyncScheduler ): ( rsocket: RSocket, - metadata: Map + metadata?: Map ) => Observable { - const [firstValueObservable, restValuestObservable] = partition( + const [firstValueObservable, restValuesObservable] = partition( datas.pipe( share({ connector: () => new Subject(), @@ -142,35 +148,37 @@ export function requestChannel( return ( rsocket: RSocket, - metadata: Map - ) => - firstValueObservable.pipe( + metadata?: Map + ) => { + const encodedMetadata = metadata ? encodeCompositeMetadata(metadata) : null; + return firstValueObservable.pipe( take(1), - concatMap( - (firstValue) => - new Observer2BufferingSubscriberToPublisher2PrefetchingObservable( - ( - s: OnTerminalSubscriber & - OnNextSubscriber & - OnExtensionSubscriber & - Requestable & - Cancellable - ) => - rsocket.requestChannel( - { - data: inputCodec.encode(firstValue), - metadata: encodeCompositeMetadata(metadata), - }, - prefetch, - false, - s - ), - prefetch, - restValuestObservable, - inputCodec, - outputCodec, - scheduler - ) as Observable - ) + concatMap((firstValue) => { + return new Observer2BufferingSubscriberToPublisher2PrefetchingObservable( + ( + s: OnTerminalSubscriber & + OnNextSubscriber & + OnExtensionSubscriber & + Requestable & + Cancellable + ) => { + return rsocket.requestChannel( + { + data: inputCodec.encode(firstValue), + metadata: encodedMetadata, + }, + prefetch, + false, + s + ); + }, + prefetch, + restValuesObservable, + inputCodec, + outputCodec, + scheduler + ) as Observable; + }) ); + }; } diff --git a/packages/rsocket-composite-metadata/package.json b/packages/rsocket-composite-metadata/package.json index c76a476..2964279 100644 --- a/packages/rsocket-composite-metadata/package.json +++ b/packages/rsocket-composite-metadata/package.json @@ -1,6 +1,6 @@ { "name": "rsocket-composite-metadata", - "version": "1.0.0-alpha.2", + "version": "1.0.0-alpha-rxjs-adapter-optional-metadata.0", "license": "Apache-2.0", "main": "dist/index", "types": "dist/index", @@ -19,7 +19,7 @@ "test": "yarn jest" }, "dependencies": { - "rsocket-core": "^1.0.0-alpha.1" + "rsocket-core": "^1.0.0-alpha-rxjs-adapter-optional-metadata.0" }, "devDependencies": { "rimraf": "~3.0.2", diff --git a/packages/rsocket-core/package.json b/packages/rsocket-core/package.json index 4205ea6..a2dedc9 100644 --- a/packages/rsocket-core/package.json +++ b/packages/rsocket-core/package.json @@ -1,6 +1,6 @@ { "name": "rsocket-core", - "version": "1.0.0-alpha.1", + "version": "1.0.0-alpha-rxjs-adapter-optional-metadata.0", "license": "Apache-2.0", "main": "dist/index", "types": "dist/index", diff --git a/packages/rsocket-examples/package.json b/packages/rsocket-examples/package.json index 1ca9670..4e9a8e3 100644 --- a/packages/rsocket-examples/package.json +++ b/packages/rsocket-examples/package.json @@ -1,6 +1,6 @@ { "name": "rsocket-examples", - "version": "1.0.0-alpha.2", + "version": "1.0.0-alpha-rxjs-adapter-optional-metadata.0", "license": "Apache-2.0", "private": true, "files": [ @@ -19,13 +19,13 @@ "start-client-server-rx-composite-metadata-route": "ts-node -r tsconfig-paths/register src/rxjs/ClientServerCompositeMetadataRouteExample.ts" }, "dependencies": { - "rsocket-adapter-rxjs": "^1.0.0-alpha.2", - "rsocket-composite-metadata": "^1.0.0-alpha.2", - "rsocket-core": "^1.0.0-alpha.1", - "rsocket-tcp-client": "^1.0.0-alpha.1", - "rsocket-tcp-server": "^1.0.0-alpha.1", - "rsocket-websocket-client": "^1.0.0-alpha.1", - "rsocket-websocket-server": "^1.0.0-alpha.1", + "rsocket-adapter-rxjs": "^1.0.0-alpha-rxjs-adapter-optional-metadata.0", + "rsocket-composite-metadata": "^1.0.0-alpha-rxjs-adapter-optional-metadata.0", + "rsocket-core": "^1.0.0-alpha-rxjs-adapter-optional-metadata.0", + "rsocket-tcp-client": "^1.0.0-alpha-rxjs-adapter-optional-metadata.0", + "rsocket-tcp-server": "^1.0.0-alpha-rxjs-adapter-optional-metadata.0", + "rsocket-websocket-client": "^1.0.0-alpha-rxjs-adapter-optional-metadata.0", + "rsocket-websocket-server": "^1.0.0-alpha-rxjs-adapter-optional-metadata.0", "ws": "~8.2.3" }, "devDependencies": { diff --git a/packages/rsocket-messaging/package.json b/packages/rsocket-messaging/package.json index 261b20f..170cd85 100644 --- a/packages/rsocket-messaging/package.json +++ b/packages/rsocket-messaging/package.json @@ -1,6 +1,6 @@ { "name": "rsocket-messaging", - "version": "1.0.0-alpha.2", + "version": "1.0.0-alpha-rxjs-adapter-optional-metadata.0", "license": "Apache-2.0", "main": "dist/index", "types": "dist/index", @@ -19,8 +19,8 @@ "test": "echo \"Error: no test specified\" && exit 0" }, "dependencies": { - "rsocket-composite-metadata": "^1.0.0-alpha.2", - "rsocket-core": "^1.0.0-alpha.1" + "rsocket-composite-metadata": "^1.0.0-alpha-rxjs-adapter-optional-metadata.0", + "rsocket-core": "^1.0.0-alpha-rxjs-adapter-optional-metadata.0" }, "devDependencies": { "rimraf": "~3.0.2", diff --git a/packages/rsocket-tcp-client/package.json b/packages/rsocket-tcp-client/package.json index a468de4..a07a0f1 100644 --- a/packages/rsocket-tcp-client/package.json +++ b/packages/rsocket-tcp-client/package.json @@ -1,6 +1,6 @@ { "name": "rsocket-tcp-client", - "version": "1.0.0-alpha.1", + "version": "1.0.0-alpha-rxjs-adapter-optional-metadata.0", "license": "Apache-2.0", "main": "dist/index", "types": "dist/index", @@ -20,7 +20,7 @@ "test": "jest" }, "dependencies": { - "rsocket-core": "^1.0.0-alpha.1" + "rsocket-core": "^1.0.0-alpha-rxjs-adapter-optional-metadata.0" }, "devDependencies": { "rimraf": "~3.0.2", diff --git a/packages/rsocket-tcp-server/package.json b/packages/rsocket-tcp-server/package.json index 89cfda8..5022b4e 100644 --- a/packages/rsocket-tcp-server/package.json +++ b/packages/rsocket-tcp-server/package.json @@ -1,6 +1,6 @@ { "name": "rsocket-tcp-server", - "version": "1.0.0-alpha.1", + "version": "1.0.0-alpha-rxjs-adapter-optional-metadata.0", "license": "Apache-2.0", "main": "dist/index", "types": "dist/index", @@ -20,7 +20,7 @@ "test": "jest" }, "dependencies": { - "rsocket-core": "^1.0.0-alpha.1" + "rsocket-core": "^1.0.0-alpha-rxjs-adapter-optional-metadata.0" }, "devDependencies": { "rimraf": "~3.0.2", diff --git a/packages/rsocket-websocket-client/package.json b/packages/rsocket-websocket-client/package.json index 5aa6b16..e6a5989 100644 --- a/packages/rsocket-websocket-client/package.json +++ b/packages/rsocket-websocket-client/package.json @@ -1,6 +1,6 @@ { "name": "rsocket-websocket-client", - "version": "1.0.0-alpha.1", + "version": "1.0.0-alpha-rxjs-adapter-optional-metadata.0", "license": "Apache-2.0", "main": "dist/index", "types": "dist/index", @@ -20,7 +20,7 @@ "test": "jest" }, "dependencies": { - "rsocket-core": "^1.0.0-alpha.1" + "rsocket-core": "^1.0.0-alpha-rxjs-adapter-optional-metadata.0" }, "devDependencies": { "rimraf": "~3.0.2", diff --git a/packages/rsocket-websocket-server/package.json b/packages/rsocket-websocket-server/package.json index 9cbf620..69d653e 100644 --- a/packages/rsocket-websocket-server/package.json +++ b/packages/rsocket-websocket-server/package.json @@ -1,6 +1,6 @@ { "name": "rsocket-websocket-server", - "version": "1.0.0-alpha.1", + "version": "1.0.0-alpha-rxjs-adapter-optional-metadata.0", "license": "Apache-2.0", "main": "dist/index", "types": "dist/index", @@ -20,7 +20,7 @@ "test": "echo \"Error: no test specified\" && exit 0" }, "dependencies": { - "rsocket-core": "^1.0.0-alpha.1", + "rsocket-core": "^1.0.0-alpha-rxjs-adapter-optional-metadata.0", "ws": "~8.2.3" }, "devDependencies": {