-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcommon.ts
87 lines (78 loc) · 3.16 KB
/
common.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import * as grpc from "@grpc/grpc-js";
import {Ignore, ProjectionRequest, ProjectionResponse, ProjectionServer, ProjectionService} from "./compiled/proto/project";
import {Any} from "./compiled/google/protobuf/any";
import {ValidResult} from "./elasticProjectors";
import {messageTypeRegistry} from "./compiled/typeRegistry";
export interface EventHandler<T> {
(event: T): Any;
}
interface EventHandlerMap<T> {
eventType: string;
handler: EventHandler<T>;
}
export type AnyEventHandlerMap = EventHandlerMap<any>;
export type Projector = AnyEventHandlerMap[];
export function WrapToAny(event: ValidResult): Any {
return Any.fromPartial({
typeUrl: `/${event.$type}`,
value: messageTypeRegistry.get(event.$type)!.encode(event).finish()
});
}
export function project<T>(eventType: string, handler: EventHandler<T>): AnyEventHandlerMap {
// @ts-ignore
return {eventType, handler: x => handler(x as T)};
}
const getProjectionServer = (projector: Projector): ProjectionServer => {
const projections = new Map(projector.map(obj => [obj.eventType, obj.handler]));
const ignored: Any = WrapToAny(Ignore.fromJSON({}));
return {
project(call: grpc.ServerDuplexStream<ProjectionRequest, ProjectionResponse>): void {
call.on("data", (ctx: ProjectionRequest) => {
console.log(`(server) Got event: ${ctx.eventType}`);
if (ctx.eventType == undefined) {
call.write({
eventId: ctx.eventId,
operation: ignored,
metadata: {},
$type: ProjectionResponse.$type
});
} else {
const handler = projections.get(ctx.eventType);
if (handler) {
console.log(`(server) Invoking handler for event: ${ctx.eventType}`);
const result = ProjectionResponse.fromPartial({
eventId: ctx.eventId,
operation: handler(ctx.eventPayload),
metadata: {}
});
console.log(`(server) Responding with: ${result.operation?.$type}`);
call.write(result);
} else {
console.log(`(server) No handler found for event: ${ctx.eventType}`);
}
}
});
}
}
}
const getServer = (projector: Projector): grpc.Server => {
const server = new grpc.Server();
server.addService(ProjectionService, getProjectionServer(projector));
return server;
};
export const runServer = (host: string, projector: Projector): void => {
const server = getServer(projector);
console.log(`(server) Starting server on ${host}`);
server.bindAsync(
host,
grpc.ServerCredentials.createInsecure(),
(err: Error | null, port: number) => {
if (err) {
console.error(`Server error: ${err.message}`);
} else {
console.log(`Server bound on port: ${port}`);
server.start();
}
}
);
}