diff --git a/packages/apollo-links/CHANGELOG.md b/packages/apollo-links/CHANGELOG.md index a6c2769b..f67c85d0 100644 --- a/packages/apollo-links/CHANGELOG.md +++ b/packages/apollo-links/CHANGELOG.md @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [1.3.0] - 2024-01-22 + ## [1.2.6] - 2024-01-11 ## [1.2.5] - 2023-12-21 @@ -92,7 +94,8 @@ Breaking change for `dictHttpLink` and `deploymentHttpLink`, use `const { link } - Add Authlink for Apollo client -[unreleased]: https://github.com/subquery/network-clients/compare/v1.2.6...HEAD +[unreleased]: https://github.com/subquery/network-clients/compare/v1.3.0...HEAD +[1.3.0]: https://github.com/subquery/network-clients/compare/v1.2.6...v1.3.0 [1.2.6]: https://github.com/subquery/network-clients/compare/v1.2.4...v1.2.6 [1.2.5]: https://github.com/subquery/network-clients/compare/v1.2.4...v1.2.5 [1.2.4]: https://github.com/subquery/network-clients/compare/v1.2.3...v1.2.4 diff --git a/packages/apollo-links/package.json b/packages/apollo-links/package.json index 8517adc8..85811c73 100644 --- a/packages/apollo-links/package.json +++ b/packages/apollo-links/package.json @@ -1,6 +1,6 @@ { "name": "@subql/apollo-links", - "version": "1.2.6", + "version": "1.3.0", "description": "SubQuery Network - graphql links", "main": "dist/index.js", "author": "SubQuery Pte Limited", @@ -15,7 +15,6 @@ "apollo-link-error": "^1.1.13", "buffer": "^6.0.3", "cross-fetch": "^4.0.0", - "ethers": "^5.6.8", "js-base64": "^3.7.5", "jwt-decode": "^3.1.2", "lru-cache": "^10.0.1" diff --git a/packages/apollo-links/src/authHttpLink.ts b/packages/apollo-links/src/authHttpLink.ts index 02cdf38f..9807810a 100644 --- a/packages/apollo-links/src/authHttpLink.ts +++ b/packages/apollo-links/src/authHttpLink.ts @@ -79,6 +79,7 @@ function authHttpLink(options: AuthOptions): AuthHttpLink { const logger = _logger ?? silentLogger(); const orderManager = new OrderManager({ authUrl, + fallbackServiceUrl, projectId: deploymentId, projectType, logger, @@ -91,7 +92,7 @@ function authHttpLink(options: AuthOptions): AuthHttpLink { const retryLink = createRetryLink({ orderManager, logger, maxRetries }); const fallbackLink = new FallbackLink(fallbackServiceUrl, logger); const httpLink = new DynamicHttpLink({ httpOptions, logger }); - const responseLink = new ResponseLink({ authUrl, logger }); + const responseLink = new ResponseLink({ authUrl, orderManager, logger }); const errorLink = creatErrorLink({ orderManager, fallbackLink, diff --git a/packages/apollo-links/src/core/clusterAuthLink.ts b/packages/apollo-links/src/core/clusterAuthLink.ts index fa585266..2432a93d 100644 --- a/packages/apollo-links/src/core/clusterAuthLink.ts +++ b/packages/apollo-links/src/core/clusterAuthLink.ts @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 import { ApolloLink, FetchResult, NextLink, Observable, Operation } from '@apollo/client/core'; -import { OrderManager } from '@subql/network-support'; +import { OrderManager, generateUniqueId } from '@subql/network-support'; import { Subscription } from 'zen-observable-ts'; import { Logger } from '../utils/logger'; @@ -33,7 +33,7 @@ export class ClusterAuthLink extends ApolloLink { let sub: Subscription; this.orderManager - .getRequestParams() + .getRequestParams(this.getRequestId(operation)) .then((params) => { if (params) { const { headers, url, type, runner } = params; @@ -69,4 +69,12 @@ export class ClusterAuthLink extends ApolloLink { private tokenToAuthHeader(token: string) { return { authorization: `Bearer ${token}` }; } + + private getRequestId(operation: Operation): string { + let { requestId } = operation.getContext(); + if (requestId) return requestId; + requestId = generateUniqueId(); + operation.setContext({ requestId }); + return requestId; + } } diff --git a/packages/apollo-links/src/core/responseLink.ts b/packages/apollo-links/src/core/responseLink.ts index 07600582..71891337 100644 --- a/packages/apollo-links/src/core/responseLink.ts +++ b/packages/apollo-links/src/core/responseLink.ts @@ -2,12 +2,13 @@ // SPDX-License-Identifier: Apache-2.0 import { ApolloLink, FetchResult, NextLink, Observable, Operation } from '@apollo/client/core'; -import { ChannelState, OrderType, POST } from '@subql/network-support'; +import { ChannelState, OrderManager, OrderType, POST, ScoreType } from '@subql/network-support'; import { Base64 } from 'js-base64'; import { Logger } from '../utils/logger'; export type ResponseLinkOptions = { authUrl: string; + orderManager: OrderManager; logger?: Logger; }; @@ -41,11 +42,13 @@ export class ResponseLink extends ApolloLink { override request(operation: Operation, forward: NextLink): Observable | null { if (!forward) return null; - const { type } = operation.getContext(); + const { type, indexer } = operation.getContext(); return new Observable((observer) => { const subscription = forward(operation).subscribe({ next: (response: FetchResult> & { state: ChannelState }) => { + this.options.orderManager.updateScore(indexer, ScoreType.SUCCESS); + if (type === OrderType.flexPlan) { const responseHeaders = operation.getContext().response.headers; if (responseHeaders) { diff --git a/packages/eth-provider/CHANGELOG.md b/packages/eth-provider/CHANGELOG.md index 6b392160..6e0b57f4 100644 --- a/packages/eth-provider/CHANGELOG.md +++ b/packages/eth-provider/CHANGELOG.md @@ -7,11 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.2.0] 2024-01-22 + ## [0.1.0] 2023-11-27 ### Added - Add Auth eth provider for rpc endpoint. -[unreleased]: https://github.com/subquery/eth-provider/compare/v0.1.0...HEAD +[unreleased]: https://github.com/subquery/eth-provider/compare/v0.2.0...HEAD +[0.2.0]: https://github.com/subquery/eth-provider/releases/tag/v0.2.0 [0.1.0]: https://github.com/subquery/eth-provider/releases/tag/v0.1.0 diff --git a/packages/eth-provider/package.json b/packages/eth-provider/package.json index 54f2b6cb..8d2e37e5 100644 --- a/packages/eth-provider/package.json +++ b/packages/eth-provider/package.json @@ -1,6 +1,6 @@ { "name": "@subql/eth-provider", - "version": "0.1.0", + "version": "0.2.0", "main": "dist/index.js", "author": "SubQuery Pte Limited", "license": "Apache-2.0", diff --git a/packages/eth-provider/src/provider.ts b/packages/eth-provider/src/provider.ts index 6a7cdbdd..c6d334be 100644 --- a/packages/eth-provider/src/provider.ts +++ b/packages/eth-provider/src/provider.ts @@ -14,6 +14,7 @@ import { ProjectType, ResponseFormat, ScoreType, + generateUniqueId, silentLogger, } from '@subql/network-support'; import { Base64 } from 'js-base64'; @@ -87,8 +88,9 @@ export class SubqueryAuthedRpcProvider extends JsonRpcProvider { return super._cache[method]; } let retries = 0; + const requestId = generateUniqueId(); const requestResult: () => Promise = async () => { - const requestParams = await this.orderManager.getRequestParams(); + const requestParams = await this.orderManager.getRequestParams(requestId); if (requestParams) { // eslint-disable-next-line @typescript-eslint/unbound-method const { url, headers, type, runner } = requestParams; @@ -107,6 +109,7 @@ export class SubqueryAuthedRpcProvider extends JsonRpcProvider { if (!result) { throw new Error('Request RPC error'); } + this.orderManager.updateScore(runner, ScoreType.SUCCESS); return result; } catch (err) { if (retries < this.maxRetries) { diff --git a/packages/network-support/CHANGELOG.md b/packages/network-support/CHANGELOG.md index 2f788634..806dcd68 100644 --- a/packages/network-support/CHANGELOG.md +++ b/packages/network-support/CHANGELOG.md @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.2.0] 2024-01-12 + ## [0.1.1] 2024-01-11 ## [0.1.0] 2023-11-27 @@ -15,6 +17,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - It's a internal library. -[unreleased]: https://github.com/subquery/network-support/compare/v0.1.1...HEAD +[unreleased]: https://github.com/subquery/network-support/compare/v0.2.0...HEAD +[0.2.0]: https://github.com/subquery/network-support/releases/tag/v0.2.0 [0.1.1]: https://github.com/subquery/network-support/releases/tag/v0.1.1 [0.1.0]: https://github.com/subquery/network-support/releases/tag/v0.1.0 diff --git a/packages/network-support/package.json b/packages/network-support/package.json index c17c1f9e..a4d1dba7 100644 --- a/packages/network-support/package.json +++ b/packages/network-support/package.json @@ -1,6 +1,6 @@ { "name": "@subql/network-support", - "version": "0.1.1", + "version": "0.2.0", "main": "dist/index.js", "author": "SubQuery Pte Limited", "license": "Apache-2.0", diff --git a/packages/network-support/src/index.ts b/packages/network-support/src/index.ts index 6185dc14..76685dba 100644 --- a/packages/network-support/src/index.ts +++ b/packages/network-support/src/index.ts @@ -3,4 +3,5 @@ export * from './types'; export * from './orderManager'; +export * from './scoreManager'; export * from './utils'; diff --git a/packages/network-support/src/orderManager.ts b/packages/network-support/src/orderManager.ts index d7d2e118..9fee79c3 100644 --- a/packages/network-support/src/orderManager.ts +++ b/packages/network-support/src/orderManager.ts @@ -3,6 +3,7 @@ import assert from 'assert'; import { Base64 } from 'js-base64'; +import { ScoreManager, ScoreType } from './scoreManager'; import { ChannelAuth, ChannelState, @@ -27,6 +28,7 @@ export enum ResponseFormat { type Options = { logger: Logger; authUrl: string; + fallbackServiceUrl?: string; projectId: string; projectType: ProjectType; responseFormat?: ResponseFormat; @@ -35,18 +37,6 @@ type Options = { timeout?: number; }; -export enum ScoreType { - GRAPHQL = 'Graphql', - NETWORK = 'network', - RPC = 'RPC', -} - -const scoresDelta = { - [ScoreType.GRAPHQL]: 50, - [ScoreType.NETWORK]: 10, - [ScoreType.RPC]: 10, -}; - function tokenToAuthHeader(token: string) { return `Bearer ${token}`; } @@ -60,9 +50,11 @@ export class OrderManager { private _plans: FlexPlanOrder[] = []; private logger: Logger; - private scoreStore: IStore; private timer: NodeJS.Timeout | undefined; + private selectedRunnersStore: IStore; + private scoreManager: ScoreManager; + private authUrl: string; private projectId: string; private interval = 300_000; @@ -76,6 +68,7 @@ export class OrderManager { constructor(options: Options) { const { authUrl, + fallbackServiceUrl, projectId, logger, projectType, @@ -88,9 +81,16 @@ export class OrderManager { this.projectId = projectId; this.projectType = projectType; this.logger = logger; - this.scoreStore = scoreStore ?? createStore({ ttl: 86_400_000 }); this.responseFormat = responseFormat; + this.selectedRunnersStore = createStore({ ttl: 600_000 }); + this.scoreManager = new ScoreManager({ + logger, + projectId, + fallbackServiceUrl, + scoreStore, + }); + this._init = this.refreshAgreements(); // eslint-disable-next-line @typescript-eslint/no-misused-promises this.timer = setInterval(() => this.refreshAgreements(), this.interval); @@ -156,27 +156,23 @@ export class OrderManager { return `$query-score-${runner}-${this.projectId}`; } - private getScore(runner: string) { - const key = this.getCacheKey(runner); - let score = this.scoreStore.get(key); - if (score === undefined) { - score = 100; - this.scoreStore.set(key, score); - } - return score; + private filterOrdersByScore(orders: Order[]) { + return orders.filter(({ indexer }) => this.scoreManager.getScore(indexer) > this.minScore); } - private filterOrdersByScore(orders: Order[]) { - return orders.filter(({ indexer }) => this.getScore(indexer) > this.minScore); + private filterOrdersByRequestId(requestId: string, orders: Order[]) { + if (!requestId) return orders; + const selected = this.getSelectedRunners(requestId); + return orders.filter(({ indexer }) => !selected.includes(indexer)); } private getNextOrderIndex(total: number, currentIndex: number) { return currentIndex < total - 1 ? currentIndex + 1 : 0; } - async getRequestParams(): Promise { + async getRequestParams(requestId: string): Promise { const innerRequest = async () => { - const order = await this.getNextOrder(); + const order = await this.getNextOrder(requestId); const headers: RequestParam['headers'] = {}; if (order) { const { type, indexer: runner, url, id } = order; @@ -284,59 +280,76 @@ export class OrderManager { } } - private async getNextOrder(): Promise { + private async getNextOrder(requestId: string): Promise { await this._init; - const agreementsOrders = await this.getNextAgreement(); + const agreementsOrders = await this.getNextAgreement(requestId); if (agreementsOrders) { return { ...agreementsOrders, type: OrderType.agreement }; } - const flexPlanOrders = await this.getNextPlan(); + const flexPlanOrders = await this.getNextPlan(requestId); if (flexPlanOrders) { return { ...flexPlanOrders, type: OrderType.flexPlan }; } return undefined; } - protected async getNextAgreement(): Promise { + protected async getNextAgreement(requestId: string): Promise { await this._init; if (!this.agreements) return; - const agreements = this.filterOrdersByScore(this.agreements) as ServiceAgreementOrder[]; + const agreements = this.filterOrdersByRequestId(requestId, this.agreements); this.logger?.debug(`available agreements count: ${agreements.length}`); if (!this.healthy || !agreements.length) return; - if (this.nextAgreementIndex === undefined) { - this.nextAgreementIndex = this.getRandomStartIndex(agreements.length); - } - - const agreement = agreements[this.nextAgreementIndex]; - this.nextAgreementIndex = this.getNextOrderIndex(agreements.length, this.nextAgreementIndex); + const agreement = this.selectRunner(agreements) as ServiceAgreementOrder; this.logger?.debug(`next agreement: ${JSON.stringify(agreement.indexer)}`); + if (agreement) { + this.updateSelectedRunner(requestId, agreement.indexer); + } + return agreement; } - protected async getNextPlan(): Promise { + protected async getNextPlan(requestId: string): Promise { await this._init; if (!this.plans) return; - const plans = this.filterOrdersByScore(this.plans); + const plans = this.filterOrdersByRequestId(requestId, this.plans); if (!this.healthy || !plans?.length) return; - if (this.nextPlanIndex === undefined) { - this.nextPlanIndex = this.getRandomStartIndex(plans.length); - } + const plan = this.selectRunner(plans); - const plan = plans[this.nextPlanIndex]; - this.nextPlanIndex = this.getNextOrderIndex(plans.length, this.nextPlanIndex); + if (plan) { + this.updateSelectedRunner(requestId, plan.indexer); + } return plan; } + private selectRunner(orders: Order[]): Order | undefined { + if (!orders.length) return; + const scores = orders.map((o) => this.scoreManager.getScore(o.indexer)); + const random = Math.random() * scores.reduce((a, b) => a + b, 0); + this.logger?.debug(`selectRunner: indexers: ${orders.map((o) => o.indexer)}`); + this.logger?.debug(`selectRunner: scores: ${scores}`); + this.logger?.debug(`selectRunner: random: ${random}`); + let sum = 0; + for (let i = 0; i < scores.length; i++) { + if (scores[i] === 0) continue; + sum += scores[i]; + if (random <= sum) { + this.logger?.debug(`selectRunner: selected index: ${i}`); + this.logger?.debug(`selectRunner: selected indexer: ${orders[i].indexer}`); + return orders[i]; + } + } + } + async refreshAgreementToken(agreementId: string, runner: string): Promise { this.logger?.debug(`request new token for runner ${runner}`); const tokenUrl = new URL('/orders/token', this.authUrl); @@ -350,6 +363,18 @@ export class OrderManager { return res.token; } + private getSelectedRunners(requestId: string): string[] { + if (!requestId) return []; + return this.selectedRunnersStore.get(requestId) || []; + } + + private updateSelectedRunner(requestId: string, runner: string) { + if (!requestId || !runner) return; + const runners = this.getSelectedRunners(requestId) ?? []; + if (runners.includes(runner)) return; + this.selectedRunnersStore.set(requestId, [...runners, runner]); + } + protected updateTokenById(agreementId: string, token: string) { if (this.agreements === undefined) return; const index = this.agreements?.findIndex((a) => a.id === agreementId); @@ -359,13 +384,7 @@ export class OrderManager { } updateScore(runner: string, errorType: ScoreType) { - const key = this.getCacheKey(runner); - const score = this.scoreStore.get(key) ?? 100; - - const delta = scoresDelta[errorType]; - const newScore = Math.max(score - delta, 0); - - this.scoreStore.set(key, newScore); + this.scoreManager.updateScore(runner, errorType); } cleanup() { diff --git a/packages/network-support/src/scoreManager.ts b/packages/network-support/src/scoreManager.ts new file mode 100644 index 00000000..7936155f --- /dev/null +++ b/packages/network-support/src/scoreManager.ts @@ -0,0 +1,120 @@ +// Copyright 2020-2023 SubQuery Pte Ltd authors & contributors +// SPDX-License-Identifier: Apache-2.0 + +import { Logger } from './utils'; +import { IStore, createStore } from './utils/store'; + +type Options = { + logger: Logger; + projectId: string; + fallbackServiceUrl?: string; + scoreStore?: IStore; +}; + +export enum ScoreType { + GRAPHQL = 'Graphql', + NETWORK = 'network', + RPC = 'RPC', + SUCCESS = 'success', +} + +const scoresDelta = { + [ScoreType.GRAPHQL]: -50, + [ScoreType.NETWORK]: -30, + [ScoreType.RPC]: -10, + [ScoreType.SUCCESS]: 50, +}; + +type ScoreStoreType = { + score: number; + lastUpdate: number; + lastFailed: number; +}; + +export class ScoreManager { + private logger: Logger; + private scoreStore: IStore; + private minScore: number; + private projectId: string; + + constructor(options: Options) { + this.logger = options.logger; + this.scoreStore = options.scoreStore ?? createStore({ ttl: 86_400_000 }); + this.minScore = options.fallbackServiceUrl ? 0 : 1; + this.projectId = options.projectId; + } + + getScore(runner: string) { + const key = this.getCacheKey(runner); + let score = this.scoreStore.get(key); + + if (score === undefined) { + score = { + score: 100, + lastUpdate: 0, + lastFailed: 0, + }; + this.scoreStore.set(key, score); + } + + if (typeof score === 'number') { + return Math.max(score, this.minScore); + } + + return this.calculatedScore(score); + } + + private calculatedScore(score: ScoreStoreType) { + // if (score.lastFailed) { + // return Math.min( + // score.score + Math.floor((Date.now() - score.lastFailed) / (600_000)), + // 100 + // ); + // } + + // if (score.lastUpdate && Date.now() - score.lastUpdate < 5 * 1000) { + // return Math.max(score.score - 50, this.minScore); + // } + + // return score.score; + + return Math.min(score.score + Math.floor((Date.now() - score.lastUpdate) / 600_000), 100); + } + + updateScore(runner: string, errorType: ScoreType) { + if (!runner) { + this.logger?.debug('updateScore: runner is empty'); + return; + } + + const key = this.getCacheKey(runner); + let score = this.scoreStore.get(key) ?? 100; + + if (typeof score === 'number') { + score = { + score: score, + lastUpdate: 0, + lastFailed: 0, + }; + } + + this.logger?.debug(`updateScore type: ${runner} ${errorType}`); + this.logger?.debug(`updateScore before: ${runner} ${JSON.stringify(score)}`); + + const delta = scoresDelta[errorType]; + + score = { + score: Math.min(Math.max(score.score + delta, this.minScore), 100), + lastUpdate: Date.now(), + lastFailed: errorType === ScoreType.SUCCESS ? 0 : Date.now(), + }; + + this.logger?.debug(`updateScore after: ${runner} ${JSON.stringify(score)}`); + + this.scoreStore.set(key, score); + } + + private getCacheKey(runner: string): string { + return `$query-score-${runner}-${this.projectId}`; + } +} diff --git a/packages/network-support/src/utils/index.ts b/packages/network-support/src/utils/index.ts index c8d9e7de..b6336f5f 100644 --- a/packages/network-support/src/utils/index.ts +++ b/packages/network-support/src/utils/index.ts @@ -5,3 +5,4 @@ export * from './logger'; export * from './query'; export * from './store'; export * from './auth'; +export * from './uniqueId'; diff --git a/packages/network-support/src/utils/uniqueId.ts b/packages/network-support/src/utils/uniqueId.ts new file mode 100644 index 00000000..e22f9b88 --- /dev/null +++ b/packages/network-support/src/utils/uniqueId.ts @@ -0,0 +1,6 @@ +// Copyright 2020-2023 SubQuery Pte Ltd authors & contributors +// SPDX-License-Identifier: Apache-2.0 + +export function generateUniqueId(length = 32) { + return Array.from(Array(length), () => Math.floor(Math.random() * 36).toString(36)).join(''); +} diff --git a/yarn.lock b/yarn.lock index fe1b6325..2f58ceeb 100644 --- a/yarn.lock +++ b/yarn.lock @@ -3270,7 +3270,6 @@ __metadata: apollo-link-error: ^1.1.13 buffer: ^6.0.3 cross-fetch: ^4.0.0 - ethers: ^5.6.8 js-base64: ^3.7.5 jwt-decode: ^3.1.2 lru-cache: ^10.0.1