diff --git a/packages/apollo-links/src/auth/authHelper.ts b/packages/apollo-links/src/auth/authHelper.ts index 95e16cbf..6f1f49f5 100644 --- a/packages/apollo-links/src/auth/authHelper.ts +++ b/packages/apollo-links/src/auth/authHelper.ts @@ -3,7 +3,7 @@ import buffer from 'buffer'; import { signTypedData, SignTypedDataVersion } from '@metamask/eth-sig-util'; -import { POST } from '@subql/network-support'; +import { POST, RAW_POST } from '@subql/network-support'; import { AuthMessage, buildTypedMessage, createAuthRequestBody } from './eip712'; @@ -32,3 +32,17 @@ export async function requestAuthToken( const res = await POST<{ token: string }>(authUrl, body); return res.token; } + +export async function rawRequestAuthToken( + authUrl: string, + msg: AuthMessage, + sk: string, + chainId: number +): Promise { + const signature = signMessage(msg, sk, chainId); + if (!signature) return null; + + const body = createAuthRequestBody(msg, signature, chainId); + const res = await RAW_POST(authUrl, body); + return res; +} diff --git a/packages/network-support/src/fetch.ts b/packages/network-support/src/fetch.ts index 7f1f23a5..a209a4f4 100644 --- a/packages/network-support/src/fetch.ts +++ b/packages/network-support/src/fetch.ts @@ -3,7 +3,7 @@ import { OrderManager, ResponseFormat } from './orderManager'; import { customFetch, generateUniqueId, Logger, safeJSONParse } from './utils'; -import { OrderType } from './types'; +import { OrderType, RequestParam } from './types'; import { ScoreType } from './scoreManager'; import { Base64 } from 'js-base64'; import { ActiveType } from './stateManager'; @@ -116,6 +116,19 @@ export function createFetch( }); } + if (type === OrderType.agreement) { + logger?.info({ + type: 'to_agreement', + deploymentId: orderManager.getProjectId(), + indexer: runner, + agrId: channelId, + requestId, + fallbackServiceUrl: orderManager.fallbackServiceUrl, + retry: retries, + rid, + }); + } + const before = Date.now(); const _res = await customFetch( url, @@ -151,11 +164,18 @@ export function createFetch( } if (type === OrderType.agreement) { const data = await _res.json(); + // proxy error will return like: + // { code: 1006, error: 'Auth expired' } status: 400 + // { code: 1051, error: 'Exceed daily limit' } status: 400 + if (data.error) { + throw new Error(JSON.stringify(data)); + } + res = data; // todo: need to confirm - res = { - ...data, - ...JSON.parse(Base64.decode(data.result)), - }; + // res = { + // ...data, + // ...JSON.parse(Base64.decode(data.result)), + // }; } if (type === OrderType.fallback) { logger?.info({ @@ -200,17 +220,30 @@ export function createFetch( text: () => undefined, } as unknown as Response; } catch (e: any) { - logger?.warn(e); + // logger?.warn(e); errorMsg = (e as Error)?.message || ''; if (!triedFallback && (retries < maxRetries || orderManager.fallbackServiceUrl)) { - const [needRetry, scoreType] = handleErrorMsg(errorMsg, resHeaders); + const [needRetry, scoreType] = handleErrorMsg( + errorMsg, + orderManager, + requestParams, + resHeaders, + { + phase: 'response', + requestId, + retry: retries, + causeError: errorMsg, + rid, + } + ); if (needRetry) { logger?.error({ type: 'retry', deploymentId: orderManager.getProjectId(), indexer: runner, + orderType: type, requestId, triedFallback, retry: retries, @@ -220,14 +253,17 @@ export function createFetch( rid, scoreType, }); - const extraLog = { - requestId, - retry: retries, - error: errorMsg, - stack: e.stack, - }; - orderManager.updateScore(runner, scoreType, 0, extraLog); + if (scoreType !== ScoreType.NONE) { + const extraLog = { + requestId, + retry: retries, + error: errorMsg, + stack: e.stack, + orderType: type, + }; + orderManager.updateScore(runner, scoreType, 0, extraLog); + } retries += 1; return requestResult(); } @@ -236,6 +272,7 @@ export function createFetch( type: 'throw', deploymentId: orderManager.getProjectId(), indexer: runner, + orderType: type, requestId, triedFallback, retry: retries, @@ -253,6 +290,7 @@ export function createFetch( deploymentId: orderManager.getProjectId(), indexer: runner, requestId, + orderType: type, triedFallback, retry: retries, error: errorMsg, @@ -269,7 +307,13 @@ export function createFetch( }; } -function handleErrorMsg(errorMsg: string, resHeaders?: Headers): [boolean, ScoreType] { +function handleErrorMsg( + errorMsg: string, + orderManager: OrderManager, + requestParams: RequestParam, + resHeaders?: Headers, + logData?: any +): [boolean, ScoreType] { let needRetry = true; let scoreType = ScoreType.RPC; const errorObj = safeJSONParse(errorMsg); @@ -285,7 +329,19 @@ function handleErrorMsg(errorMsg: string, resHeaders?: Headers): [boolean, Score ); } } else if (rpcErrorCodes.has(errorObj.code)) { - scoreType = ScoreType.RPC; + const { type, channelId, runner } = requestParams; + // for agreement. { code: 1051, error: 'Exceed daily limit' } + if (type === OrderType.agreement && errorObj.code === 1051) { + scoreType = ScoreType.NONE; + orderManager.setDailyLimitedAgreement(channelId || ''); + + // { code: 1006, error: 'Auth expired' } + } else if (type === OrderType.agreement && errorObj.code === 1006) { + scoreType = ScoreType.NONE; + orderManager.refreshAgreementToken(channelId || '', runner, logData); + } else { + scoreType = ScoreType.RPC; + } } else { needRetry = false; } diff --git a/packages/network-support/src/orderManager.ts b/packages/network-support/src/orderManager.ts index 087650b7..1b9686b2 100644 --- a/packages/network-support/src/orderManager.ts +++ b/packages/network-support/src/orderManager.ts @@ -23,7 +23,7 @@ import { isTokenExpired, IStore, Logger, - POST, + RAW_POST, safeJSONParse, } from './utils'; import { BlockType, State, StateManager } from './stateManager'; @@ -210,6 +210,19 @@ export class OrderManager { }); } + filterOrdersByExpired(orders: ServiceAgreementOrder[]) { + return orders.filter(({ expired }) => { + return !expired; + }); + } + + filterOrdersByDailyLimit(orders: ServiceAgreementOrder[]) { + return orders.filter(async (o) => { + const reached = await this.stateManager.getDailyLimitedAgreement(o.id); + return !reached; + }); + } + async getRequestParams( requestId: string, proxyVersion?: string, @@ -222,19 +235,11 @@ export class OrderManager { headers['X-SQ-No-Resp-Sig'] = 'true'; const { type, indexer: runner, url, id, metadata } = order; if (type === OrderType.agreement) { - const channelId = id; + const agreementId = id; headers['X-Indexer-Response-Format'] = this.responseFormat ?? 'inline'; - let { token } = order as ServiceAgreementOrder; - if (isTokenExpired(token)) { - try { - token = await this.refreshAgreementToken(id, runner); - } catch (error) { - this.logger?.debug(`request new token for indexer ${runner} and url: ${url} failed`); - throw new RequestParamError((error as any).message, runner); - } - } + const token = await this.handleAgreementToken(order as ServiceAgreementOrder, logData); headers.authorization = tokenToAuthHeader(token); - return { url, runner, channelId, headers, type } as RequestParam; + return { url, runner, channelId: agreementId, headers, type } as RequestParam; } else if (type === OrderType.flexPlan) { const channelId = id; headers['X-Indexer-Response-Format'] = this.responseFormat ?? 'inline'; @@ -371,6 +376,117 @@ export class OrderManager { } } + async handleAgreementToken(order: ServiceAgreementOrder, logData?: any): Promise { + logData = logData || {}; + const { id: agreementId, indexer: runner } = order; + let { token, source, error } = await this.getAgreementToken(order as ServiceAgreementOrder); + if (!token) { + const fresh = await this.refreshAgreementToken(agreementId, runner, { + phase: 'get', + ...logData, + }); + token = fresh.token; + error = fresh.error; + source = 'proxy'; + } + + if (!token) { + this.logger?.error({ + type: 'token_null', + agrId: agreementId, + deploymentId: this.projectId, + indexer: runner, + source, + error, + ...logData, + }); + this.handleAgreementError(agreementId, error); + throw new Error(`Token response is null. ${logData?.rid}`); + } + + if (!isTokenExpired(token)) { + this.setAgreementToken(agreementId, token, source); + return token; + } + + const fresh = await this.refreshAgreementToken(agreementId, runner, { + phase: 'expire', + ...logData, + }); + + token = fresh.token; + error = fresh.error; + if (token) { + return token; + } + + this.handleAgreementError(agreementId, error); + const message = `request new token failed. ${error}`; + throw new RequestParamError(message, runner); + } + + async getAgreementToken(order: ServiceAgreementOrder) { + const error = ''; + const { token, id: agreementId } = order; + + // 1. try from redis. + const cachedToken = await this.stateManager.getAgreementToken(agreementId); + if (cachedToken) { + return { + token: cachedToken, + source: 'red', + error, + }; + } + + // 2. try from memory + if (token) { + return { + token, + source: 'mem', + error, + }; + } + + return { + token: '', + source: '', + error, + }; + } + + async setAgreementToken(agreementId: string, token: string, source: string) { + this.updateTokenById(agreementId, token); + + // newest token + if (source === 'proxy') { + await this.stateManager.setAgreementToken(agreementId, token); + } + } + + handleAgreementError(agreementId: string, error: string) { + // error like these: + // {"statusCode":401,"message":"{\"code\":1001,\"error\":\"Auth create failure\"}"} + const obj = safeJSONParse(error); + if (obj) { + const m = safeJSONParse(obj.message); + if (m.code === 1001) { + this.expireAgrement(agreementId); + } + } + } + + expireAgrement(agreementId: string) { + const index = this._agreements?.findIndex((a) => a.id === agreementId); + if (index === -1) return; + this._agreements[index].expired = true; + } + + async setDailyLimitedAgreement(agreementId: string) { + if (!agreementId) return; + await this.stateManager.setDailyLimitedAgreement(agreementId); + } + async getSignedState(channelId: string, block: BlockType): Promise { return this.stateManager.getSignedState(channelId, block); } @@ -412,6 +528,8 @@ export class OrderManager { agreements = this.filterOrdersByProxyVersion(agreements, proxyVersion); this.logger?.debug(`available agreements after proxy version filter: ${agreements.length}`); } + agreements = this.filterOrdersByExpired(agreements as ServiceAgreementOrder[]); + agreements = await this.filterOrdersByDailyLimit(agreements as ServiceAgreementOrder[]); if (!agreements.length) return; @@ -526,17 +644,47 @@ export class OrderManager { } } - async refreshAgreementToken(agreementId: string, runner: string): Promise { - this.logger?.debug(`request new token for runner ${runner}`); - const tokenUrl = new URL('/orders/token', this.authUrl); - const res = await POST<{ token: string }>(tokenUrl.toString(), { - projectId: this.projectId, - indexer: runner, - agreementId, - }); - this.logger?.debug(`request new token for indexer ${runner} success`); - this.updateTokenById(agreementId, res.token); - return res.token; + async refreshAgreementToken( + agreementId: string, + runner: string, + logData?: any + ): Promise<{ error: string; token: string }> { + let error = ''; + let token = ''; + try { + this.logger?.debug(`request new token for runner ${runner}`); + const tokenUrl = new URL('/orders/token', this.authUrl); + const response = await RAW_POST(tokenUrl.toString(), { + projectId: this.projectId, + indexer: runner, + agreementId, + }); + if (response.status >= 400) { + const content = await response.text(); + error = content; + } else { + const res = await response.json(); + this.logger?.debug(`request new token for indexer ${runner} success`); + this.setAgreementToken(agreementId, token, 'proxy'); + token = res.token; + } + } catch (err: any) { + error = err.message; + } + if (!token) { + this.logger?.error({ + type: 'token_refresh', + agrId: agreementId, + deploymentId: this.projectId, + indexer: runner, + error, + ...logData, + }); + } + return { + error, + token, + }; } private async getSelectedRunners(requestId: string): Promise { diff --git a/packages/network-support/src/stateManager.ts b/packages/network-support/src/stateManager.ts index 738ee006..da05103d 100644 --- a/packages/network-support/src/stateManager.ts +++ b/packages/network-support/src/stateManager.ts @@ -227,4 +227,43 @@ export class StateManager { private getCacheKey(channelId: string) { return `state:${this.projectId}:${channelId}:${computeMD5(this.apikey ?? '')}`; } + + private agreementTokenKey(agreementId: string) { + return `token:agreement:${agreementId}`; + } + + private agreementLimitKey(agreementId: string) { + const date = new Date(); + const year = date.getFullYear(); + const month = (date.getMonth() + 1).toString().padStart(2, '0'); + const day = date.getDate().toString().padStart(2, '0'); + const formattedDate = `${year}-${month}-${day}`; + return `dlimit:${formattedDate}:${agreementId}`; + } + + async getAgreementToken(agreementId: string): Promise { + const key = this.agreementTokenKey(agreementId); + return (await this.stateStore.get(key)) || ''; + } + + async setAgreementToken(agreementId: string, token: string) { + const key = this.agreementTokenKey(agreementId); + await this.stateStore.set(key, token); + } + + async setDailyLimitedAgreement(agreementId: string) { + const key = this.agreementLimitKey(agreementId); + const now = new Date(); + const midnight = new Date(now); + midnight.setHours(24, 0, 0, 0); + const mills = midnight.getTime() - now.getTime(); + const seconds = Math.floor(mills / 1000); + await this.stateStore.set(key, 1, seconds + 60); + } + + async getDailyLimitedAgreement(agreement: string) { + const key = this.agreementLimitKey(agreement); + const reached = await this.stateStore.get(key); + return reached ? true : false; + } } diff --git a/packages/network-support/src/types.ts b/packages/network-support/src/types.ts index 422ff1e3..28bf2414 100644 --- a/packages/network-support/src/types.ts +++ b/packages/network-support/src/types.ts @@ -32,6 +32,9 @@ export interface Order { export interface ServiceAgreementOrder extends Order { token: string; + + // agreement has expired or not + expired?: boolean; } export type FlexPlanOrder = Order; diff --git a/packages/network-support/src/utils/query.ts b/packages/network-support/src/utils/query.ts index e7736d44..34432cd4 100644 --- a/packages/network-support/src/utils/query.ts +++ b/packages/network-support/src/utils/query.ts @@ -50,6 +50,23 @@ export async function POST( return res.json(); } +export async function RAW_POST( + url: string, + body: Record, + headers?: Record +): Promise { + if (!headers) { + headers = {}; + } + headers['Content-Type'] = 'application/json'; + const res = await customFetch(url, { + body: JSON.stringify(body), + method: 'post', + headers, + }); + return res; +} + export async function GET(url: string): Promise { const headers = { 'Content-Type': 'application/json' }; const res = await customFetch(url, { diff --git a/packages/network-support/src/utils/store.ts b/packages/network-support/src/utils/store.ts index f60b9b15..424afd90 100644 --- a/packages/network-support/src/utils/store.ts +++ b/packages/network-support/src/utils/store.ts @@ -6,7 +6,7 @@ import { LRUCache as LRU } from 'lru-cache'; /* eslint-disable @typescript-eslint/no-unused-vars */ export interface IStore { get(key: string): Promise; - set(key: string, value: T): Promise; // ttl in milliseconds + set(key: string, value: T, ttl?: number): Promise; // ttl in milliseconds remove(key: string): Promise; lpush(key: string, value: any): Promise; @@ -18,7 +18,7 @@ export class BaseStorage implements IStore { get(key: string): Promise { return Promise.resolve(undefined); } - set(key: string, value: T): Promise { + set(key: string, value: T, ttl?: number): Promise { return Promise.resolve(); } remove(key: string): Promise { @@ -65,7 +65,7 @@ export class LocalStorageCache extends BaseStorage { } } - override async set(key: string, value: T): Promise { + override async set(key: string, value: T, ttl?: number): Promise { const expiry = this.ttl ? Date.now() + this.ttl : undefined; const data = { value, expiry };