Skip to content

Commit

Permalink
Merge branch '1.0.x-alpha' into feature/apollo-graphql-link-server
Browse files Browse the repository at this point in the history
  • Loading branch information
viglucci committed Sep 4, 2022
2 parents 4748c68 + f734934 commit f260f34
Show file tree
Hide file tree
Showing 24 changed files with 7,118 additions and 6,260 deletions.
6 changes: 6 additions & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ ex: `[email protected]`

Lerna will not push the git tags after creation. You should push the git tags once you are confident in your changes.

### Example

```
lerna version prerelease --sign-git-commit
```

## Publishing

The `Test, Build, Release` Workflow on GitHub can be run to [manually trigger](https://docs.github.com/en/actions/managing-workflow-runs/manually-running-a-workflow) publishing of packages to NPM. This workflow will only publish versions which do not already exist on NPM.
Expand Down
6 changes: 3 additions & 3 deletions packages/rsocket-adapter-rxjs/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "rsocket-adapter-rxjs",
"version": "1.0.0-alpha.1",
"version": "1.0.0-alpha.4",
"license": "Apache-2.0",
"main": "dist/index",
"types": "dist/index",
Expand All @@ -19,8 +19,8 @@
"test": "echo \"Error: no test specified\" && exit 0"
},
"dependencies": {
"rsocket-core": "^1.0.0-alpha.1",
"rsocket-messaging": "^1.0.0-alpha.1",
"rsocket-core": "^1.0.0-alpha.3",
"rsocket-messaging": "^1.0.0-alpha.3",
"rxjs": "^7.4.0"
},
"devDependencies": {
Expand Down
110 changes: 63 additions & 47 deletions packages/rsocket-adapter-rxjs/src/Requesters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,21 +48,23 @@ export function fireAndForget<TData>(
inputCodec: Codec<TData>
): (
rsocket: RSocket,
metadata: Map<string | number | WellKnownMimeType, Buffer>
metadata?: Map<string | number | WellKnownMimeType, Buffer>
) => Observable<void> {
return (
rsocket: RSocket,
metadata: Map<string | number | WellKnownMimeType, Buffer>
) =>
new RSocketPublisherToObservable((s) =>
metadata?: Map<string | number | WellKnownMimeType, Buffer>
) => {
const encodedMetadata = metadata ? encodeCompositeMetadata(metadata) : null;
return new RSocketPublisherToObservable((s) =>
rsocket.fireAndForget(
{
data: data ? inputCodec.encode(data) : Buffer.allocUnsafe(0),
metadata: encodeCompositeMetadata(metadata),
metadata: encodedMetadata,
},
s
)
);
};
}

export function requestResponse<TData, RData>(
Expand All @@ -71,23 +73,25 @@ export function requestResponse<TData, RData>(
outputCodec: Codec<RData>
): (
rsocket: RSocket,
metadata: Map<string | number | WellKnownMimeType, Buffer>
metadata?: Map<string | number | WellKnownMimeType, Buffer>
) => Observable<RData> {
return (
rsocket: RSocket,
metadata: Map<string | number | WellKnownMimeType, Buffer>
) =>
new RSocketPublisherToObservable(
metadata?: Map<string | number | WellKnownMimeType, Buffer>
) => {
const encodedMetadata = metadata ? encodeCompositeMetadata(metadata) : null;
return new RSocketPublisherToObservable(
(s) =>
rsocket.requestResponse(
{
data: data ? inputCodec.encode(data) : Buffer.allocUnsafe(0),
metadata: encodeCompositeMetadata(metadata),
metadata: encodedMetadata,
},
s
),
outputCodec
);
};
}

export function requestStream<TData, RData>(
Expand All @@ -98,18 +102,19 @@ export function requestStream<TData, RData>(
scheduler: SchedulerLike = asyncScheduler
): (
rsocket: RSocket,
metadata: Map<string | number | WellKnownMimeType, Buffer>
metadata?: Map<string | number | WellKnownMimeType, Buffer>
) => Observable<RData> {
return (
rsocket: RSocket,
metadata: Map<string | number | WellKnownMimeType, Buffer>
) =>
new RSocketPublisherToPrefetchingObservable(
metadata?: Map<string | number | WellKnownMimeType, Buffer>
) => {
const encodedMetadata = metadata ? encodeCompositeMetadata(metadata) : null;
return new RSocketPublisherToPrefetchingObservable(
(s, n) =>
rsocket.requestStream(
{
data: data ? inputCodec.encode(data) : Buffer.allocUnsafe(0),
metadata: encodeCompositeMetadata(metadata),
metadata: encodedMetadata,
},
n,
s
Expand All @@ -118,6 +123,7 @@ export function requestStream<TData, RData>(
outputCodec,
scheduler
);
};
}

export function requestChannel<TData, RData>(
Expand All @@ -128,49 +134,59 @@ export function requestChannel<TData, RData>(
scheduler: SchedulerLike = asyncScheduler
): (
rsocket: RSocket,
metadata: Map<string | number | WellKnownMimeType, Buffer>
metadata?: Map<string | number | WellKnownMimeType, Buffer>
) => Observable<RData> {
const [firstValueObservable, restValuestObservable] = partition(
let once = false;
const [firstValueObservable, restValuesObservable] = partition(
datas.pipe(
share({
connector: () => new Subject(),
resetOnRefCountZero: true,
})
),
(_value, index) => index === 0
(_value) => {
const previous = once;
if (!previous) {
once = true;
}

return !previous;
}
);

return (
rsocket: RSocket,
metadata: Map<string | number | WellKnownMimeType, Buffer>
) =>
firstValueObservable.pipe(
metadata?: Map<string | number | WellKnownMimeType, Buffer>
) => {
const encodedMetadata = metadata ? encodeCompositeMetadata(metadata) : null;
return firstValueObservable.pipe(
take(1),
concatMap(
(firstValue) =>
new Observer2BufferingSubscriberToPublisher2PrefetchingObservable(
(
s: OnTerminalSubscriber &
OnNextSubscriber &
OnExtensionSubscriber &
Requestable &
Cancellable
) =>
rsocket.requestChannel(
{
data: inputCodec.encode(firstValue),
metadata: encodeCompositeMetadata(metadata),
},
prefetch,
false,
s
),
prefetch,
restValuestObservable,
inputCodec,
outputCodec,
scheduler
) as Observable<RData>
)
concatMap((firstValue) => {
return new Observer2BufferingSubscriberToPublisher2PrefetchingObservable(
(
s: OnTerminalSubscriber &
OnNextSubscriber &
OnExtensionSubscriber &
Requestable &
Cancellable
) => {
return rsocket.requestChannel(
{
data: inputCodec.encode(firstValue),
metadata: encodedMetadata,
},
prefetch,
false,
s
);
},
prefetch,
restValuesObservable,
inputCodec,
outputCodec,
scheduler
) as Observable<RData>;
})
);
};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP

exports[`encodeWellKnownMetadataHeader encodes the header as per spec 1`] = `
Array [
133,
0,
0,
16,
]
`;
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { encodeAndAddCustomMetadata } from "rsocket-composite-metadata";
import { hex } from "./test-utils/hex";

describe("encodeAndAddCustomMetadata", () => {
it("throws if custom mimtype length is less than 1", () => {
expect(() =>
encodeAndAddCustomMetadata(Buffer.from([]), "", Buffer.from("1234"))
).toThrow(
"Custom mime type must have a strictly positive length that fits on 7 unsigned bits, ie 1-128"
);
});

it("throws if custom mimtype length is greater than 127", () => {
let mime = "";
while (mime.length < 130) {
mime += "a";
}
expect(() =>
encodeAndAddCustomMetadata(Buffer.from([]), mime, Buffer.from("1234"))
).toThrow(
"Custom mime type must have a strictly positive length that fits on 7 unsigned bits, ie 1-128"
);
});

it("encodes the header and payload as per spec", () => {
const { c, u, s, t, o, m } = hex;
const metadata = encodeAndAddCustomMetadata(
Buffer.from([]),
"custom",
Buffer.from("1234")
);
const expectedHeaderLength8 = "05";
const expectedPayloadLength24 = "000004";
const expectedHeader = `${expectedHeaderLength8}${c}${u}${s}${t}${o}${m}${expectedPayloadLength24}`;
const expectedPayload = `${hex["1"]}${hex["2"]}${hex["3"]}${hex["4"]}`;
expect(metadata.toString("hex")).toBe(
`${expectedHeader}${expectedPayload}`
);
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import {
encodeAndAddWellKnownMetadata,
WellKnownMimeType,
} from "rsocket-composite-metadata";
import { readUInt24BE } from "rsocket-core";

describe("encodeWellKnownMetadataHeader", () => {
it("encodes the header as per spec when WellKnownMimeType given", () => {
const metadata = encodeAndAddWellKnownMetadata(
Buffer.from([]),
WellKnownMimeType.MESSAGE_RSOCKET_MIMETYPE,
Buffer.from("test")
);

// 122 | 128
const maskedId = metadata.readUInt8(0);
const length = readUInt24BE(metadata, 1);
const value = metadata.slice(4, metadata.length);

expect(maskedId).toBe(250);
expect(length).toBe(4);
expect(value.length).toBe(4);
expect(value.toString("utf-8")).toBe("test");
});

it("encodes the header as per spec when identifier given", () => {
const metadata = encodeAndAddWellKnownMetadata(
Buffer.from([]),
// MESSAGE_RSOCKET_MIMETYPE
122,
Buffer.from("test")
);

// 122 | 128
const maskedId = metadata.readUInt8(0);
const length = readUInt24BE(metadata, 1);
const value = metadata.slice(4, metadata.length);

expect(maskedId).toBe(250);
expect(length).toBe(4);
expect(value.length).toBe(4);
expect(value.toString("utf-8")).toBe("test");
});
});
Loading

0 comments on commit f260f34

Please sign in to comment.