Skip to content

Commit

Permalink
test and try improve agreement (#341)
Browse files Browse the repository at this point in the history
* feat: v1

* feat: agreement score

* feat: add log

* feat: ws complete

* feat: refactor agreement expire

* feat: little improve
  • Loading branch information
cool-firer authored Nov 20, 2024
1 parent b86b3de commit d26fd3f
Show file tree
Hide file tree
Showing 7 changed files with 322 additions and 46 deletions.
16 changes: 15 additions & 1 deletion packages/apollo-links/src/auth/authHelper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import buffer from 'buffer';
import { signTypedData, SignTypedDataVersion } from '@metamask/eth-sig-util';
import { POST } from '@subql/network-support';
import { POST, RAW_POST } from '@subql/network-support';

import { AuthMessage, buildTypedMessage, createAuthRequestBody } from './eip712';

Expand Down Expand Up @@ -32,3 +32,17 @@ export async function requestAuthToken(
const res = await POST<{ token: string }>(authUrl, body);
return res.token;
}

export async function rawRequestAuthToken(
authUrl: string,
msg: AuthMessage,
sk: string,
chainId: number
): Promise<Response | null> {
const signature = signMessage(msg, sk, chainId);
if (!signature) return null;

const body = createAuthRequestBody(msg, signature, chainId);
const res = await RAW_POST(authUrl, body);
return res;
}
83 changes: 63 additions & 20 deletions packages/network-support/src/fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,19 @@ export function createFetch(
});
}

if (type === OrderType.agreement) {
logger?.info({
type: 'to_agreement',
deploymentId: orderManager.getProjectId(),
indexer: runner,
agrId: channelId,
requestId,
fallbackServiceUrl: orderManager.fallbackServiceUrl,
retry: retries,
traceId,
});
}

const before = Date.now();
const _res = await customFetch(
url,
Expand Down Expand Up @@ -155,11 +168,18 @@ export function createFetch(
}
if (type === OrderType.agreement) {
const data = await _res.json();
// proxy error will return like:
// { code: 1006, error: 'Auth expired' } status: 400
// { code: 1051, error: 'Exceed daily limit' } status: 400
if (data.error) {
throw new Error(JSON.stringify(data));
}
res = data;
// todo: need to confirm
res = {
...data,
...JSON.parse(Base64.decode(data.result)),
};
// res = {
// ...data,
// ...JSON.parse(Base64.decode(data.result)),
// };
}
if (type === OrderType.fallback) {
logger?.info({
Expand Down Expand Up @@ -208,23 +228,30 @@ export function createFetch(
text: () => undefined,
} as unknown as Response;
} catch (e: any) {
logger?.warn(e);
// logger?.warn(e);
errorMsg = (e as Error)?.message || '';

if (!triedFallback && (retries < maxRetries || orderManager.fallbackServiceUrl)) {
const [needRetry, scoreType] = handleErrorMsg(
errorMsg,
orderManager,
requestParams,
type,
resHeaders
resHeaders,
{
phase: 'response',
requestId,
retry: retries,
causeError: errorMsg,
traceId,
}
);

if (needRetry) {
logger?.error({
type: 'retry',
deploymentId: orderManager.getProjectId(),
indexer: runner,
orderType: type,
requestId,
triedFallback,
retry: retries,
Expand All @@ -234,15 +261,18 @@ export function createFetch(
traceId,
scoreType,
});
const extraLog = {
traceId,
requestId,
retry: retries,
error: errorMsg,
stack: e.stack,
};

orderManager.updateScore(runner, scoreType, 0, extraLog);
if (scoreType !== ScoreType.NONE) {
const extraLog = {
requestId,
retry: retries,
error: errorMsg,
stack: e.stack,
orderType: type,
traceId,
};
orderManager.updateScore(runner, scoreType, 0, extraLog);
}
retries += 1;
return requestResult();
}
Expand All @@ -251,6 +281,7 @@ export function createFetch(
type: 'throw',
deploymentId: orderManager.getProjectId(),
indexer: runner,
orderType: type,
requestId,
triedFallback,
retry: retries,
Expand All @@ -268,6 +299,7 @@ export function createFetch(
deploymentId: orderManager.getProjectId(),
indexer: runner,
requestId,
orderType: type,
triedFallback,
retry: retries,
error: errorMsg,
Expand All @@ -288,8 +320,8 @@ function handleErrorMsg(
errorMsg: string,
orderManager: OrderManager,
requestParams: RequestParam,
type: OrderType,
resHeaders?: Headers
resHeaders?: Headers,
logData?: any
): [boolean, ScoreType] {
let needRetry = true;
let scoreType = ScoreType.RPC;
Expand All @@ -306,13 +338,24 @@ function handleErrorMsg(
);
}
} else if (rpcErrorCodes.has(errorObj.code)) {
scoreType = ScoreType.RPC;
const { type, channelId, runner } = requestParams;
// for agreement. { code: 1051, error: 'Exceed daily limit' }
if (type === OrderType.agreement && errorObj.code === 1051) {
scoreType = ScoreType.NONE;
orderManager.setDailyLimitedAgreement(channelId || '');

// { code: 1006, error: 'Auth expired' }
} else if (type === OrderType.agreement && errorObj.code === 1006) {
scoreType = ScoreType.NONE;
orderManager.refreshAgreementToken(channelId || '', runner, logData);

// 1057: Exceed rate limit
if (errorObj.code === 1057) {
// 1057: Exceed rate limit
} else if (errorObj.code === 1057) {
scoreType = ScoreType.NONE;
// set ratemlit remain to 0;
orderManager.updateRatelimit(requestParams.runner, 1, 0, type);
} else {
scoreType = ScoreType.RPC;
}
} else {
needRetry = false;
Expand Down
Loading

0 comments on commit d26fd3f

Please sign in to comment.