Skip to content

Commit

Permalink
Simplify connection retry logic, add backoff to block retries (#2301)
Browse files Browse the repository at this point in the history
* Simplify connection retry logic, add backoff to block retries

* Fix tests

* Update changelog
  • Loading branch information
stwiname authored Mar 15, 2024
1 parent e00c546 commit 53909b7
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 96 deletions.
3 changes: 3 additions & 0 deletions packages/node-core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Graphql comments not being escaped (#2299)

### Changed
- Update connection retry logic and add backoff to fetch blocks retries (#2301)

## [7.4.1] - 2024-03-08
### Fixed
- Memory leak with workers and large number of (dynamic) datasources (#2292)
Expand Down
47 changes: 20 additions & 27 deletions packages/node-core/src/api.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {ApiConnectionError, ApiErrorType} from './api.connection.error';
import {IndexerEvent, NetworkMetadataPayload} from './events';
import {ConnectionPoolService} from './indexer';
import {getLogger} from './logger';
import {raceFulfilled, retryWithBackoff} from './utils';
import {backoffRetry, isBackoffError, raceFulfilled} from './utils';

const logger = getLogger('api');

Expand Down Expand Up @@ -40,7 +40,7 @@ export abstract class ApiService<A = any, SA = any, B extends Array<any> = any[]
return this._networkMeta;
}

private timeouts: Record<string, NodeJS.Timeout | undefined> = {};
private connectionRetrys: Record<string, Promise<void> | undefined> = {};

async fetchBlocks(heights: number[], numAttempts = MAX_RECONNECT_ATTEMPTS): Promise<B> {
return this.retryFetch(async () => {
Expand All @@ -51,23 +51,15 @@ export abstract class ApiService<A = any, SA = any, B extends Array<any> = any[]
}

protected async retryFetch(fn: () => Promise<B>, numAttempts = MAX_RECONNECT_ATTEMPTS): Promise<B> {
let reconnectAttempts = 0;
let lastError: Error | null = null;
while (reconnectAttempts < numAttempts) {
try {
return await fn();
} catch (e: any) {
lastError = e;
reconnectAttempts++;
try {
return await backoffRetry(fn, numAttempts);
} catch (e) {
if (isBackoffError(e)) {
logger.error(e.message);
throw e.lastError;
}
throw e;
}
if (lastError !== null) {
logger.error(
`Maximum number of retries (${numAttempts}) reached. See the following error for the underlying reason.`
);
throw lastError;
}
throw new Error(`Maximum number of retries (${numAttempts}) reached.`);
}

get api(): A {
Expand Down Expand Up @@ -117,7 +109,7 @@ export abstract class ApiService<A = any, SA = any, B extends Array<any> = any[]
throw this.metadataMismatchError('ChainId', network.chainId, chainId);
}

this.connectionPoolService.addToConnections(connection, endpoint);
void this.connectionPoolService.addToConnections(connection, endpoint);
} catch (e) {
logger.error(`Failed to init ${endpoint}: ${e}`);
endpointToApiIndex[endpoint] = null as unknown as IApiConnectionSpecific<A, SA, B>;
Expand All @@ -136,7 +128,7 @@ export abstract class ApiService<A = any, SA = any, B extends Array<any> = any[]
throw new Error('All endpoints failed to initialize. Please add healthier endpoints');
}

Promise.allSettled(connectionPromises).then((res) => {
void Promise.allSettled(connectionPromises).then((res) => {
// Retry failed connections in the background
for (const [index, endpoint] of failedConnections) {
this.retryConnection(createConnection, getChainId, network, index, endpoint, postConnectedHook);
Expand Down Expand Up @@ -176,15 +168,16 @@ export abstract class ApiService<A = any, SA = any, B extends Array<any> = any[]
endpoint: string,
postConnectedHook?: (connection: IApiConnectionSpecific, endpoint: string, index: number) => void
): void {
this.timeouts[endpoint] = retryWithBackoff(
() => this.performConnection(createConnection, getChainId, network, index, endpoint, postConnectedHook),
(error) => {
logger.error(`Initialization retry failed for ${endpoint}: ${error}`);
},
() => {
logger.error(`Initialization retry attempts exhausted for ${endpoint}`);
this.connectionRetrys[endpoint] = backoffRetry(async () => {
try {
await this.performConnection(createConnection, getChainId, network, index, endpoint, postConnectedHook);
} catch (e) {
logger.error(`Initialization retry failed for ${endpoint}: ${e}`);
throw e;
}
);
}, 5).catch((e) => {
logger.error(e, `Initialization retry attempts exhausted for ${endpoint}`);
});
}

protected metadataMismatchError(metadata: string, expected: string, actual: string): Error {
Expand Down
31 changes: 25 additions & 6 deletions packages/node-core/src/db/sync-helper.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,19 +149,30 @@ describe('sync-helper', () => {
console.log(getFkConstraint('many_to_many_test_entities', 'AccountId'));
expect(getFkConstraint('ManyToManyTestEntities', 'AccountId')).toBe('many_to_many_test_entities_account_id_fkey');
});

it('Generate SQL statement for table creation with historical', () => {
const statement = generateCreateTableQuery(mockModel, 'test', false);
const expectedStatement = [
'CREATE TABLE IF NOT EXISTS "test"."test-table" ("id" text NOT NULL,\n "amount" numeric NOT NULL,\n "date" timestamp NOT NULL,\n "from_id" text NOT NULL,\n "_id" UUID NOT NULL,\n "_block_range" int8range NOT NULL,\n "last_transfer_block" integer, PRIMARY KEY ("_id"));',

`COMMENT ON COLUMN "test"."test-table"."id" IS E'id field is always required and must look like this';`,
`COMMENT ON COLUMN "test"."test-table"."amount" IS E'Amount that is transferred';`,
`COMMENT ON COLUMN "test"."test-table"."date" IS E'The date of the transfer';`,
`COMMENT ON COLUMN "test"."test-table"."from_id" IS E'The account that transfers are made from';`,
`COMMENT ON COLUMN "test"."test-table"."last_transfer_block" IS E'The most recent block on which we see a transfer involving this account';`,
{
sql: `COMMENT ON COLUMN "test"."test-table"."id" IS ?;`,
replacements: [`id field is always required and must look like this`],
},
{sql: `COMMENT ON COLUMN "test"."test-table"."amount" IS ?;`, replacements: [`Amount that is transferred`]},
{sql: `COMMENT ON COLUMN "test"."test-table"."date" IS ?;`, replacements: [`The date of the transfer`]},
{
sql: `COMMENT ON COLUMN "test"."test-table"."from_id" IS ?;`,
replacements: [`The account that transfers are made from`],
},
{
sql: `COMMENT ON COLUMN "test"."test-table"."last_transfer_block" IS ?;`,
replacements: [`The most recent block on which we see a transfer involving this account`],
},
];
expect(statement).toStrictEqual(expectedStatement);
});

it('Generate SQL statement for Indexes', () => {
const statement = generateCreateIndexQuery(
mockModel.options.indexes as any,
Expand All @@ -173,6 +184,7 @@ describe('sync-helper', () => {
`CREATE INDEX IF NOT EXISTS "0xb91efc8ed4021e6e" ON "test"."${mockModel.tableName}" ("id");`,
]);
});

it('Generate table statement no historical, no multi primary keys', () => {
jest.spyOn(mockModel, 'getAttributes').mockImplementationOnce(() => {
return {
Expand All @@ -190,9 +202,13 @@ describe('sync-helper', () => {
const statement = generateCreateTableQuery(mockModel, 'test', false);
expect(statement).toStrictEqual([
`CREATE TABLE IF NOT EXISTS "test"."test-table" ("id" text NOT NULL, PRIMARY KEY ("id"));`,
`COMMENT ON COLUMN "test"."test-table"."id" IS E'id field is always required and must look like this';`,
{
sql: `COMMENT ON COLUMN "test"."test-table"."id" IS ?;`,
replacements: ['id field is always required and must look like this'],
},
]);
});

it('Reference statement', () => {
const attribute = {
type: 'text',
Expand All @@ -219,6 +235,7 @@ describe('sync-helper', () => {
const statement = formatReferences(attribute, 'test');
expect(statement).toBe(`REFERENCES "test"."accounts" ("id") ON DELETE NO ACTION ON UPDATE CASCADE`);
});

it('Ensure correct foreignkey statement', () => {
jest.spyOn(mockModel, 'getAttributes').mockImplementationOnce(() => {
return {
Expand Down Expand Up @@ -324,6 +341,7 @@ describe('sync-helper', () => {
},
]);
});

it('Ensure correct enumTypeMap', async () => {
const sequelize = new Sequelize() as any;
sequelize.query.mockResolvedValue([
Expand Down Expand Up @@ -353,6 +371,7 @@ describe('sync-helper', () => {

expect(Array.from(v.entries())).toEqual(expectedMap);
});

it('sequelize to correct postgres type map', () => {
// TODO this should be using {types: postgres: ['jsonb']} instead of its key
const v = formatDataType({key: 'JSONB'} as any);
Expand Down
44 changes: 19 additions & 25 deletions packages/node-core/src/indexer/connectionPool.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {ApiConnectionError, ApiErrorType} from '../api.connection.error';
import {IApiConnectionSpecific} from '../api.service';
import {NodeConfig} from '../configure';
import {getLogger} from '../logger';
import {delay, retryWithBackoff} from '../utils';
import {backoffRetry, delay} from '../utils';
import {ConnectionPoolStateManager} from './connectionPoolState.manager';

const logger = getLogger('connection-pool');
Expand All @@ -34,7 +34,7 @@ type ResultCacheEntry<T> = {
export class ConnectionPoolService<T extends IApiConnectionSpecific<any, any, any>> implements OnApplicationShutdown {
private allApi: Record<string, T> = {};
private cachedEndpoint: string | undefined;
private reconnectingEndpoints: Record<string, NodeJS.Timeout | undefined> = {};
private reconnectingEndpoints: Record<string, Promise<void> | undefined> = {};
private resultCache: Array<ResultCacheEntry<number | ApiConnectionError['errorType']>> = [];
private lastCacheFlushTime: number = Date.now();
private cacheSizeThreshold = 10;
Expand Down Expand Up @@ -133,30 +133,24 @@ export class ConnectionPoolService<T extends IApiConnectionSpecific<any, any, an
async handleApiDisconnects(endpoint: string): Promise<void> {
logger.warn(`disconnected from ${endpoint}`);

const maxAttempts = 5;

const tryReconnect = async () => {
logger.info(`Attempting to reconnect to ${endpoint}`);

await this.allApi[endpoint].apiConnect();
await this.poolStateManager.setFieldValue(endpoint, 'connected', true);
this.reconnectingEndpoints[endpoint] = undefined;
logger.info(`Reconnected to ${endpoint} successfully`);
try {
logger.info(`Attempting to reconnect to ${endpoint}`);

await this.allApi[endpoint].apiConnect();
await this.poolStateManager.setFieldValue(endpoint, 'connected', true);
this.reconnectingEndpoints[endpoint] = undefined;
logger.info(`Reconnected to ${endpoint} successfully`);
} catch (e) {
logger.error(`Reconnection failed: ${e}`);
throw e;
}
};

this.reconnectingEndpoints[endpoint] = retryWithBackoff(
tryReconnect,
(error) => {
logger.error(`Reconnection failed: ${error}`);
},
// eslint-disable-next-line @typescript-eslint/no-misused-promises
async () => {
logger.error(`Reached max reconnection attempts. Removing connection ${endpoint} from pool.`);
await this.poolStateManager.removeFromConnections(endpoint);
},
0,
maxAttempts
);
this.reconnectingEndpoints[endpoint] = backoffRetry(tryReconnect, 5).catch(async (e: any) => {
logger.error(`Reached max reconnection attempts. Removing connection ${endpoint} from pool.`);
await this.poolStateManager.removeFromConnections(endpoint);
});

await this.handleConnectionStateChange();
logger.info(`reconnected to ${endpoint}!`);
Expand Down Expand Up @@ -205,10 +199,10 @@ export class ConnectionPoolService<T extends IApiConnectionSpecific<any, any, an
await this.updateNextConnectedApiIndex();
const disconnectedEndpoints = await this.poolStateManager.getDisconnectedEndpoints();
disconnectedEndpoints.map((endpoint) => {
if (this.reconnectingEndpoints[endpoint]) {
if (this.reconnectingEndpoints[endpoint] !== undefined) {
return;
}
this.handleApiDisconnects(endpoint);
void this.handleApiDisconnects(endpoint);
});
}

Expand Down
64 changes: 48 additions & 16 deletions packages/node-core/src/utils/promise.spec.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,54 @@
// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
// SPDX-License-Identifier: GPL-3.0

import {delay, timeout} from './promise';

it('utils.promise delay()', async () => {
const start = new Date();
await delay(1);
const millsecDiff = new Date().getTime() - start.getTime();
expect(millsecDiff).toBeGreaterThanOrEqual(1000);
expect(millsecDiff).toBeLessThan(1050);
});
import {BackoffError, backoffRetry, delay, timeout} from './promise';

it('utils.promise timeout()', async () => {
const firstPromise = (async () => {
describe('Promise Utils', () => {
it('utils.promise delay()', async () => {
const start = new Date();
await delay(1);
return true;
})();
await expect(timeout(firstPromise, 2)).resolves.toEqual(true);
const secondPromise = delay(3);
await expect(timeout(secondPromise, 2)).rejects.toThrow(/timeout/);
const millsecDiff = new Date().getTime() - start.getTime();
expect(millsecDiff).toBeGreaterThanOrEqual(1000);
expect(millsecDiff).toBeLessThan(1050);
});

it('utils.promise timeout()', async () => {
const firstPromise = (async () => {
await delay(1);
return true;
})();
await expect(timeout(firstPromise, 2)).resolves.toEqual(true);
const secondPromise = delay(3);
await expect(timeout(secondPromise, 2)).rejects.toThrow(/timeout/);
});

describe('BackoffRetry', () => {
it(`doesn't retry with success`, async () => {
const fn = jest.fn(() => Promise.resolve());
await backoffRetry(fn, 5);
expect(fn).toHaveBeenCalledTimes(1);
});

it(`retries multiple times`, async () => {
let count = 0;

const fn = jest.fn(() => {
if (count < 1) {
count++;
return Promise.reject();
}
return Promise.resolve();
});

await backoffRetry(fn, 5);

expect(fn).toHaveBeenCalledTimes(2);
});

it(`throws when reaching the number of retries`, async () => {
const e = new Error('Test');
const fn = jest.fn(() => Promise.reject(e));
await expect(() => backoffRetry(fn, 2)).rejects.toEqual(new BackoffError(e));
});
});
});
45 changes: 23 additions & 22 deletions packages/node-core/src/utils/promise.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,34 +29,35 @@ export async function timeout<T>(promise: Promise<T>, sec: number, errMsg = 'tim
]);
}

function backoff(attempt: number): number {
return Math.pow(2, attempt) * 1000; // Exponential backoff
export class BackoffError extends Error {
readonly lastError: any;

constructor(lastError: any) {
super('Maximum number of retries reached');
this.lastError = lastError;
}
}

export function retryWithBackoff<T>(
tryFunction: () => Promise<T>,
onError: (error: any) => void,
onMaxAttempts: () => void,
attempt = 0,
maxAttempts = 5
): NodeJS.Timeout | undefined {
if (attempt >= maxAttempts) {
onMaxAttempts();
} else {
// eslint-disable-next-line @typescript-eslint/no-misused-promises
const timeout = setTimeout(async () => {
try {
await tryFunction();
} catch (error) {
onError(error);
retryWithBackoff(tryFunction, onError, onMaxAttempts, attempt + 1, maxAttempts);
}
}, backoff(attempt));
export function isBackoffError(error: any): error is BackoffError {
return error instanceof BackoffError;
}

return timeout;
async function backoffRetryInternal<T>(fn: () => Promise<T>, maxAttempts: number, currentAttempt = 0): Promise<T> {
try {
return await fn();
} catch (e) {
if (maxAttempts - 1 === currentAttempt) {
throw new BackoffError(e);
}
await delay(Math.pow(2, currentAttempt));
return backoffRetryInternal(fn, maxAttempts, currentAttempt + 1);
}
}

export async function backoffRetry<T>(fn: () => Promise<T>, attempts = 5): Promise<T> {
return backoffRetryInternal(fn, attempts);
}

/* eslint-disable @typescript-eslint/no-misused-promises */
export async function raceFulfilled<T = any>(promises: Promise<T>[]): Promise<{result: T; fulfilledIndex: number}> {
//eslint-disable-next-line no-async-promise-executor
Expand Down

0 comments on commit 53909b7

Please sign in to comment.