Skip to content

Commit

Permalink
Merge pull request #54 from Kiruse/main
Browse files Browse the repository at this point in the history
WebSocketClient: fix unexpected eof + improve subscribe & query
  • Loading branch information
emidev98 authored Nov 9, 2023
2 parents 137088c + c565d2b commit 8e5b3e8
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 108 deletions.
171 changes: 63 additions & 108 deletions src/client/WebSocketClient.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { EventEmitter } from 'events';
import WebSocket from 'ws';
import { hashToHex } from '../util/hash';
import { TendermintQuery } from '../util/TendermintQuery';

type Callback = (data: TendermintSubscriptionResponse) => void;

Expand Down Expand Up @@ -28,112 +29,54 @@ export type TendermintEventType =
| 'ValidBlock'
| 'Vote';

type TendermintQueryOperand = string | number | Date;

export interface TendermintQuery {
[k: string]:
| TendermintQueryOperand
| ['>', number | Date]
| ['<', number | Date]
| ['<=', number | Date]
| ['>=', number | Date]
| ['CONTAINS', string]
| ['EXISTS'];
}

const escapeSingleQuotes = (str: string) => str.replace(/'/g, "\\'");

function makeQueryParams(query: TendermintQuery): string {
const queryBuilder: string[] = [];
for (const key of Object.keys(query)) {
let queryItem: string;
const value = query[key];
// if value is scalar
if (!Array.isArray(value)) {
switch (typeof value) {
case 'number':
queryItem = `${key}=${value}`;
break;
case 'string':
queryItem = `${key}='${escapeSingleQuotes(value)}'`;
break;
default:
// Date
queryItem = `${key}=${value.toISOString()}`;
}
} else {
switch (value[0]) {
case '>':
case '<':
case '<=':
case '>=':
if (typeof value[1] !== 'number') {
queryItem = `${key}${value[0]}${value[1].toISOString()}`;
} else {
queryItem = `${key}${value[0]}${value[1]}`;
}
break;
case 'CONTAINS':
queryItem = `${key} CONTAINS '${escapeSingleQuotes(value[1])}'`;
break;
case 'EXISTS':
queryItem = `${key} EXISTS`;
break;
}
}
queryBuilder.push(queryItem);
}
return queryBuilder.join(' AND ');
}

/**
* An object repesenting a connection to a Terra node's WebSocket RPC endpoint.
* This allows for subscribing to Tendermint events through WebSocket.
*
* ### Events
* **error** emitted when error raises
* **connect** emitted after connection establishment
* **reconnect** emitted upon every attempt of reconnection
* **destroyed** emitted when socket has been destroyed
* - **error** emitted when error raises
* - **connect** emitted after connection establishment
* - **reconnect** emitted upon every attempt of reconnection
* - **destroyed** emitted when socket has been destroyed
*
* ### Example
*
* ```ts
* import { WebSocketClient } from '@terra-money/terra.js';
* import { TendermintQuery, WebSocketClient } from '@terra-money/terra.js';
*
* const wsclient = new WebSocketClient("ws://localhost:26657/websocket");
* wsclient.start();
* wsclient.on('connect', () => {
* wsclient.subscribe('NewBlock', new TendermintQuery(), (data) => {
* console.log(data.value);
*
* wsclient.subscribe('NewBlock', {}, (data) => {
* console.log(data.value);
*
* // close after receiving one block.
* wsclient.destroy();
* })
* // close after receiving one block.
* wsclient.destroy();
* });
*
* wsclient.subscribe(
* 'Tx',
* {
* 'message.action': 'send',
* 'message.sender': ['CONTAINS', 'terra1...'],
* },
* (data) => {
* console.log(data.value);
* wsclient.subscribe(
* 'Tx',
* new TendermintQuery()
* .exact('message.action', 'send')
* .contains('message.sender', 'terra1...'),
* (data) => {
* console.log(data.value);
*
* // close after receiving one send Tx
* wsclient.destroy();
* // close after receiving one send Tx
* wsclient.destroy();
* },
* );
* });
*
* wsclient.start();
* ```
*/
export class WebSocketClient extends EventEmitter {
public isConnected: boolean;
private _connected: boolean;
private reconnectTimeoutId?: NodeJS.Timeout;
private queryParams?: string;
private callback?: Callback;
private callbacks = new Map<number, (data: any) => void>();
private shouldAttemptReconnect: boolean;
private socket!: WebSocket;
private _reconnectCount: number;
private _nextSubId = 1;

/**
* WebSocketClient constructor
Expand All @@ -149,7 +92,7 @@ export class WebSocketClient extends EventEmitter {
) {
super();
this._reconnectCount = this.reconnectCount;
this.isConnected = false;
this._connected = false;
this.shouldAttemptReconnect = !!this.reconnectInterval;
}

Expand All @@ -171,41 +114,42 @@ export class WebSocketClient extends EventEmitter {
this.socket.onerror = () => undefined;
}

send(data: any) {
if (this.socket) {
this.socket.send(JSON.stringify(data));
}
return this;
}

private onOpen() {
this.isConnected = true;
this._connected = true;
this.emit('connect');
// reset reconnectCount after connection establishment
this._reconnectCount = this.reconnectCount;

this.socket.send(
JSON.stringify({
jsonrpc: '2.0',
method: 'subscribe',
params: [this.queryParams],
id: 1,
})
);
}

private onMessage(message: WebSocket.MessageEvent) {
const data = message.data.toString();

// ignore empty messages. fixes "unexpected EOF"
if (!data.trim()) return;

try {
const parsedData = JSON.parse(message.data.toString());
if (!('result' in parsedData && 'id' in parsedData)) {
throw new Error('Invalid message format');
}

if (
this.callback &&
parsedData.result &&
parsedData.result.query === this.queryParams
) {
// this.emit('message', parsedData.result.data);
this.callback(parsedData.result.data);
if (parsedData.result?.data) {
this.callbacks.get(parsedData.id)?.(parsedData.result.data);
}
} catch (err) {
this.emit('error', err);
}
}

private onClose() {
this.isConnected = false;
this._connected = false;

if (
this.shouldAttemptReconnect &&
Expand All @@ -229,12 +173,19 @@ export class WebSocketClient extends EventEmitter {
event: TendermintEventType,
query: TendermintQuery,
callback: Callback
): void {
this.queryParams = makeQueryParams({
'tm.event': event,
...query,
) {
const id = this._nextSubId++;
query = query.clone().exact('tm.event', event);

this.send({
jsonrpc: '2.0',
method: 'subscribe',
params: [query.toString()],
id,
});
this.callback = callback;
this.callbacks.set(id, callback);

return this;
}

public subscribeTx(query: TendermintQuery, callback: Callback): void {
Expand All @@ -245,4 +196,8 @@ export class WebSocketClient extends EventEmitter {

this.subscribe('Tx', query, newCallback);
}

get isConnected() {
return this._connected;
}
}
57 changes: 57 additions & 0 deletions src/util/TendermintQuery.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
const escape = (str: string) => str.replace(/\\/g, '\\\\').replace(/'/g, "\\'");

export class TendermintQuery {
private _query: string[] = [];

getValue(value: number | string | Date) {
if (typeof value === 'number') {
return value;
} else if (typeof value === 'string') {
return `'${escape(value)}'`;
} else {
return value.toISOString();
}
}

exact(field: string, value: number | string | Date) {
this._query.push(`${field}=${this.getValue(value)}`);
return this;
}

compare(field: string, op: `${'<' | '>'}${'' | '='}`, value: number | Date) {
this._query.push(`${field}${op}${this.getValue(value)}`);
return this;
}

exists(field: string) {
this._query.push(`${field} EXISTS`);
return this;
}

contains(field: string, value: string) {
this._query.push(`${field} CONTAINS '${escape(value)}'`);
return this;
}

clone() {
const q = new TendermintQuery();
q._query = this._query.slice();
return q;
}

toString() {
return this._query.join(' AND ');
}

static AND(lhs: TendermintQuery, rhs: TendermintQuery) {
const q = new TendermintQuery();
q._query.push(`(${lhs}) AND (${rhs})`);
return q;
}

static OR(lhs: TendermintQuery, rhs: TendermintQuery) {
const q = new TendermintQuery();
q._query.push(`(${lhs}) OR (${rhs})`);
return q;
}
}
1 change: 1 addition & 0 deletions src/util/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from './hash';
export * from './contract';
export * from './TendermintQuery';

0 comments on commit 8e5b3e8

Please sign in to comment.