diff --git a/packages/apollo-links/src/auth/authHelper.ts b/packages/apollo-links/src/auth/authHelper.ts index 95e16cbf..6f1f49f5 100644 --- a/packages/apollo-links/src/auth/authHelper.ts +++ b/packages/apollo-links/src/auth/authHelper.ts @@ -3,7 +3,7 @@ import buffer from 'buffer'; import { signTypedData, SignTypedDataVersion } from '@metamask/eth-sig-util'; -import { POST } from '@subql/network-support'; +import { POST, RAW_POST } from '@subql/network-support'; import { AuthMessage, buildTypedMessage, createAuthRequestBody } from './eip712'; @@ -32,3 +32,17 @@ export async function requestAuthToken( const res = await POST<{ token: string }>(authUrl, body); return res.token; } + +export async function rawRequestAuthToken( + authUrl: string, + msg: AuthMessage, + sk: string, + chainId: number +): Promise { + const signature = signMessage(msg, sk, chainId); + if (!signature) return null; + + const body = createAuthRequestBody(msg, signature, chainId); + const res = await RAW_POST(authUrl, body); + return res; +} diff --git a/packages/network-support/src/fetch.ts b/packages/network-support/src/fetch.ts index 665485da..eead16f1 100644 --- a/packages/network-support/src/fetch.ts +++ b/packages/network-support/src/fetch.ts @@ -118,6 +118,19 @@ export function createFetch( }); } + if (type === OrderType.agreement) { + logger?.info({ + type: 'to_agreement', + deploymentId: orderManager.getProjectId(), + indexer: runner, + agrId: channelId, + requestId, + fallbackServiceUrl: orderManager.fallbackServiceUrl, + retry: retries, + traceId, + }); + } + const before = Date.now(); const _res = await customFetch( url, @@ -155,11 +168,18 @@ export function createFetch( } if (type === OrderType.agreement) { const data = await _res.json(); + // proxy error will return like: + // { code: 1006, error: 'Auth expired' } status: 400 + // { code: 1051, error: 'Exceed daily limit' } status: 400 + if (data.error) { + throw new Error(JSON.stringify(data)); + } + res = data; // todo: need to confirm - res = { - ...data, - ...JSON.parse(Base64.decode(data.result)), - }; + // res = { + // ...data, + // ...JSON.parse(Base64.decode(data.result)), + // }; } if (type === OrderType.fallback) { logger?.info({ @@ -208,7 +228,7 @@ export function createFetch( text: () => undefined, } as unknown as Response; } catch (e: any) { - logger?.warn(e); + // logger?.warn(e); errorMsg = (e as Error)?.message || ''; if (!triedFallback && (retries < maxRetries || orderManager.fallbackServiceUrl)) { @@ -216,8 +236,14 @@ export function createFetch( errorMsg, orderManager, requestParams, - type, - resHeaders + resHeaders, + { + phase: 'response', + requestId, + retry: retries, + causeError: errorMsg, + traceId, + } ); if (needRetry) { @@ -225,6 +251,7 @@ export function createFetch( type: 'retry', deploymentId: orderManager.getProjectId(), indexer: runner, + orderType: type, requestId, triedFallback, retry: retries, @@ -234,15 +261,18 @@ export function createFetch( traceId, scoreType, }); - const extraLog = { - traceId, - requestId, - retry: retries, - error: errorMsg, - stack: e.stack, - }; - orderManager.updateScore(runner, scoreType, 0, extraLog); + if (scoreType !== ScoreType.NONE) { + const extraLog = { + requestId, + retry: retries, + error: errorMsg, + stack: e.stack, + orderType: type, + traceId, + }; + orderManager.updateScore(runner, scoreType, 0, extraLog); + } retries += 1; return requestResult(); } @@ -251,6 +281,7 @@ export function createFetch( type: 'throw', deploymentId: orderManager.getProjectId(), indexer: runner, + orderType: type, requestId, triedFallback, retry: retries, @@ -268,6 +299,7 @@ export function createFetch( deploymentId: orderManager.getProjectId(), indexer: runner, requestId, + orderType: type, triedFallback, retry: retries, error: errorMsg, @@ -288,8 +320,8 @@ function handleErrorMsg( errorMsg: string, orderManager: OrderManager, requestParams: RequestParam, - type: OrderType, - resHeaders?: Headers + resHeaders?: Headers, + logData?: any ): [boolean, ScoreType] { let needRetry = true; let scoreType = ScoreType.RPC; @@ -306,13 +338,24 @@ function handleErrorMsg( ); } } else if (rpcErrorCodes.has(errorObj.code)) { - scoreType = ScoreType.RPC; + const { type, channelId, runner } = requestParams; + // for agreement. { code: 1051, error: 'Exceed daily limit' } + if (type === OrderType.agreement && errorObj.code === 1051) { + scoreType = ScoreType.NONE; + orderManager.setDailyLimitedAgreement(channelId || ''); + + // { code: 1006, error: 'Auth expired' } + } else if (type === OrderType.agreement && errorObj.code === 1006) { + scoreType = ScoreType.NONE; + orderManager.refreshAgreementToken(channelId || '', runner, logData); - // 1057: Exceed rate limit - if (errorObj.code === 1057) { + // 1057: Exceed rate limit + } else if (errorObj.code === 1057) { scoreType = ScoreType.NONE; // set ratemlit remain to 0; orderManager.updateRatelimit(requestParams.runner, 1, 0, type); + } else { + scoreType = ScoreType.RPC; } } else { needRetry = false; diff --git a/packages/network-support/src/orderManager.ts b/packages/network-support/src/orderManager.ts index aa8f916f..e5580ec6 100644 --- a/packages/network-support/src/orderManager.ts +++ b/packages/network-support/src/orderManager.ts @@ -23,7 +23,7 @@ import { isTokenExpired, IStore, Logger, - POST, + RAW_POST, safeJSONParse, } from './utils'; import { BlockType, State, StateManager } from './stateManager'; @@ -210,6 +210,25 @@ export class OrderManager { }); } + filterOrdersByExpired(orders: ServiceAgreementOrder[]) { + return orders.filter(({ expired }) => { + return !expired; + }); + } + + async filterOrdersByDailyLimit(orders: ServiceAgreementOrder[]) { + const res: any = []; + await Promise.all( + orders.map(async (o) => { + const reached = await this.stateManager.getDailyLimitedAgreement(o.id); + if (!reached) { + res.push(o); + } + }) + ); + return res; + } + async getRequestParams( requestId: string, proxyVersion?: string, @@ -222,19 +241,11 @@ export class OrderManager { headers['X-SQ-No-Resp-Sig'] = 'true'; const { type, indexer: runner, url, id, metadata } = order; if (type === OrderType.agreement) { - const channelId = id; + const agreementId = id; headers['X-Indexer-Response-Format'] = this.responseFormat ?? 'inline'; - let { token } = order as ServiceAgreementOrder; - if (isTokenExpired(token)) { - try { - token = await this.refreshAgreementToken(id, runner); - } catch (error) { - this.logger?.debug(`request new token for indexer ${runner} and url: ${url} failed`); - throw new RequestParamError((error as any).message, runner); - } - } + const token = await this.handleAgreementToken(order as ServiceAgreementOrder, logData); headers.authorization = tokenToAuthHeader(token); - return { url, runner, channelId, headers, type } as RequestParam; + return { url, runner, channelId: agreementId, headers, type } as RequestParam; } else if (type === OrderType.flexPlan) { const channelId = id; headers['X-Indexer-Response-Format'] = this.responseFormat ?? 'inline'; @@ -371,6 +382,123 @@ export class OrderManager { } } + async handleAgreementToken(order: ServiceAgreementOrder, logData?: any): Promise { + logData = logData || {}; + const { id: agreementId, indexer: runner } = order; + let { token, source, error } = await this.getAgreementToken(order as ServiceAgreementOrder); + if (!token) { + const fresh = await this.refreshAgreementToken(agreementId, runner, { + phase: 'get', + ...logData, + }); + token = fresh.token; + error = fresh.error; + source = 'proxy'; + } + + if (!token) { + this.logger?.error({ + type: 'token_null', + agrId: agreementId, + deploymentId: this.projectId, + indexer: runner, + source, + error, + ...logData, + }); + this.handleAgreementError(agreementId, error); + throw new Error(`Token response is null. ${logData?.rid}`); + } + + if (!isTokenExpired(token)) { + this.setAgreementToken(agreementId, token, source); + return token; + } + + const fresh = await this.refreshAgreementToken(agreementId, runner, { + phase: 'expire', + ...logData, + }); + + token = fresh.token; + error = fresh.error; + if (token) { + return token; + } + + this.handleAgreementError(agreementId, error); + const message = `request new token failed. ${error}`; + throw new RequestParamError(message, runner); + } + + async getAgreementToken(order: ServiceAgreementOrder) { + const error = ''; + const { token, id: agreementId } = order; + + // 1. try from redis. + const cachedToken = await this.stateManager.getAgreementToken(agreementId); + if (cachedToken) { + return { + token: cachedToken, + source: 'red', + error, + }; + } + + // 2. try from memory + if (token) { + return { + token, + source: 'mem', + error, + }; + } + + return { + token: '', + source: '', + error, + }; + } + + async setAgreementToken(agreementId: string, token: string, source: string) { + this.updateTokenById(agreementId, token); + + // newest token + if (source === 'proxy') { + await this.stateManager.setAgreementToken(agreementId, token); + } + } + + handleAgreementError(agreementId: string, error: string) { + // error like these: + // {"statusCode":401,"message":"{\"code\":1001,\"error\":\"Auth create failure\"}"} + if (this.isAgreementExpired(error)) { + this.setAgrementExpired(agreementId); + } + } + + isAgreementExpired(error: string) { + const obj = safeJSONParse(error); + if (obj) { + // error maybe: {"statusCode":500,"message":"Internal server error"} + const m = safeJSONParse(obj.message); + return m?.code === 1001; + } + return false; + } + + setAgrementExpired(agreementId: string) { + const index = this._agreements?.findIndex((a) => a.id === agreementId); + if (index === -1) return; + this._agreements[index].expired = true; + } + + async setDailyLimitedAgreement(agreementId: string) { + if (!agreementId) return; + await this.stateManager.setDailyLimitedAgreement(agreementId); + } + async getSignedState(channelId: string, block: BlockType): Promise { return this.stateManager.getSignedState(channelId, block); } @@ -416,6 +544,8 @@ export class OrderManager { agreements = this.filterOrdersByProxyVersion(agreements, proxyVersion); this.logger?.debug(`available agreements after proxy version filter: ${agreements.length}`); } + agreements = this.filterOrdersByExpired(agreements as ServiceAgreementOrder[]); + agreements = await this.filterOrdersByDailyLimit(agreements as ServiceAgreementOrder[]); if (!agreements.length) return; @@ -536,17 +666,47 @@ export class OrderManager { } } - async refreshAgreementToken(agreementId: string, runner: string): Promise { - this.logger?.debug(`request new token for runner ${runner}`); - const tokenUrl = new URL('/orders/token', this.authUrl); - const res = await POST<{ token: string }>(tokenUrl.toString(), { - projectId: this.projectId, - indexer: runner, - agreementId, - }); - this.logger?.debug(`request new token for indexer ${runner} success`); - this.updateTokenById(agreementId, res.token); - return res.token; + async refreshAgreementToken( + agreementId: string, + runner: string, + logData?: any + ): Promise<{ error: string; token: string }> { + let error = ''; + let token = ''; + try { + this.logger?.debug(`request new token for runner ${runner}`); + const tokenUrl = new URL('/orders/token', this.authUrl); + const response = await RAW_POST(tokenUrl.toString(), { + projectId: this.projectId, + indexer: runner, + agreementId, + }); + if (response.status >= 400) { + const content = await response.text(); + error = content; + } else { + const res = await response.json(); + this.logger?.debug(`request new token for indexer ${runner} success`); + this.setAgreementToken(agreementId, token, 'proxy'); + token = res.token; + } + } catch (err: any) { + error = err.message; + } + if (!token) { + this.logger?.error({ + type: 'token_refresh', + agrId: agreementId, + deploymentId: this.projectId, + indexer: runner, + error, + ...logData, + }); + } + return { + error, + token, + }; } private async getSelectedRunners(requestId: string): Promise { diff --git a/packages/network-support/src/stateManager.ts b/packages/network-support/src/stateManager.ts index eaff8e50..bf4108c5 100644 --- a/packages/network-support/src/stateManager.ts +++ b/packages/network-support/src/stateManager.ts @@ -237,4 +237,43 @@ export class StateManager { private getCacheKey(channelId: string) { return `state:${this.projectId}:${channelId}:${computeMD5(this.apikey ?? '')}`; } + + private agreementTokenKey(agreementId: string) { + return `token:agreement:${agreementId}`; + } + + private agreementLimitKey(agreementId: string) { + const date = new Date(); + const year = date.getFullYear(); + const month = (date.getMonth() + 1).toString().padStart(2, '0'); + const day = date.getDate().toString().padStart(2, '0'); + const formattedDate = `${year}-${month}-${day}`; + return `dlimit:${formattedDate}:${agreementId}`; + } + + async getAgreementToken(agreementId: string): Promise { + const key = this.agreementTokenKey(agreementId); + return (await this.stateStore.get(key)) || ''; + } + + async setAgreementToken(agreementId: string, token: string) { + const key = this.agreementTokenKey(agreementId); + await this.stateStore.set(key, token); + } + + async setDailyLimitedAgreement(agreementId: string) { + const key = this.agreementLimitKey(agreementId); + const now = new Date(); + const midnight = new Date(now); + midnight.setHours(24, 0, 0, 0); + const mills = midnight.getTime() - now.getTime(); + const seconds = Math.floor(mills / 1000); + await this.stateStore.set(key, 1, seconds + 60); + } + + async getDailyLimitedAgreement(agreementId: string) { + const key = this.agreementLimitKey(agreementId); + const reached = await this.stateStore.get(key); + return reached ? true : false; + } } diff --git a/packages/network-support/src/types.ts b/packages/network-support/src/types.ts index 6ae00e4f..1c0acdac 100644 --- a/packages/network-support/src/types.ts +++ b/packages/network-support/src/types.ts @@ -32,6 +32,9 @@ export interface Order { export interface ServiceAgreementOrder extends Order { token: string; + + // agreement has expired or not + expired?: boolean; } export type FlexPlanOrder = Order; diff --git a/packages/network-support/src/utils/query.ts b/packages/network-support/src/utils/query.ts index e7736d44..34432cd4 100644 --- a/packages/network-support/src/utils/query.ts +++ b/packages/network-support/src/utils/query.ts @@ -50,6 +50,23 @@ export async function POST( return res.json(); } +export async function RAW_POST( + url: string, + body: Record, + headers?: Record +): Promise { + if (!headers) { + headers = {}; + } + headers['Content-Type'] = 'application/json'; + const res = await customFetch(url, { + body: JSON.stringify(body), + method: 'post', + headers, + }); + return res; +} + export async function GET(url: string): Promise { const headers = { 'Content-Type': 'application/json' }; const res = await customFetch(url, { diff --git a/packages/network-support/src/utils/store.ts b/packages/network-support/src/utils/store.ts index a176bb08..424afd90 100644 --- a/packages/network-support/src/utils/store.ts +++ b/packages/network-support/src/utils/store.ts @@ -18,7 +18,7 @@ export class BaseStorage implements IStore { get(key: string): Promise { return Promise.resolve(undefined); } - set(key: string, value: T): Promise { + set(key: string, value: T, ttl?: number): Promise { return Promise.resolve(); } remove(key: string): Promise { @@ -65,7 +65,7 @@ export class LocalStorageCache extends BaseStorage { } } - override async set(key: string, value: T): Promise { + override async set(key: string, value: T, ttl?: number): Promise { const expiry = this.ttl ? Date.now() + this.ttl : undefined; const data = { value, expiry };