Skip to content

Commit

Permalink
feat: ratelimit score weight (#343)
Browse files Browse the repository at this point in the history
* feat: ratelimit score weight

* feat: rename to traceId

* feat: seperate order type

* fix: deal exceed rate limit
  • Loading branch information
cool-firer authored Nov 11, 2024
1 parent a866e80 commit 0f873d3
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 23 deletions.
53 changes: 40 additions & 13 deletions packages/network-support/src/fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import { OrderManager, ResponseFormat } from './orderManager';
import { customFetch, generateUniqueId, Logger, safeJSONParse } from './utils';
import { OrderType } from './types';
import { OrderType, RequestParam } from './types';
import { ScoreType } from './scoreManager';
import { Base64 } from 'js-base64';
import { ActiveType } from './stateManager';
Expand Down Expand Up @@ -46,7 +46,7 @@ export function createFetch(
maxRetries = 5,
logger?: Logger,
overrideFetch?: typeof fetch,
rid?: string
traceId?: string
): (init: RequestInit) => Promise<Response> {
let retries = 0;
let triedFallback = false;
Expand Down Expand Up @@ -77,7 +77,7 @@ export function createFetch(
let requestParams;
if (retries < maxRetries) {
requestParams = await orderManager.getRequestParams(requestId, proxyVersion, {
rid,
traceId,
retries,
});
}
Expand All @@ -102,6 +102,8 @@ export function createFetch(
const { url, headers, type, runner, channelId } = requestParams;
let httpVersion = 1;
let resHeaders: Headers | undefined;
let limit = 0;
let limitRemain = 0;

try {
if (type === OrderType.fallback) {
Expand All @@ -112,7 +114,7 @@ export function createFetch(
requestId,
fallbackServiceUrl: orderManager.fallbackServiceUrl,
retry: retries,
rid,
traceId,
});
}

Expand All @@ -133,6 +135,8 @@ export function createFetch(
const after = Date.now();
resHeaders = _res.headers;
httpVersion = Number(_res.headers.get('httpVersion')) || 1;
limit = Number(_res.headers.get('x-ratelimit-limit-second')) || 0;
limitRemain = Number(_res.headers.get('x-ratelimit-remaining-second')) || 0;

let res: object | undefined;
if (type === OrderType.flexPlan) {
Expand All @@ -141,7 +145,7 @@ export function createFetch(
new Headers(_res.headers),
channelId,
{
rid,
traceId,
requestId,
deploymentId: orderManager.getProjectId(),
indexer: runner,
Expand All @@ -165,13 +169,17 @@ export function createFetch(
requestId,
retry: retries,
fallbackServiceUrl: orderManager.fallbackServiceUrl,
rid,
traceId,
status: _res.status,
});
res = await _res.json();
}

orderManager.updateScore(runner, ScoreType.SUCCESS, httpVersion);
if (type !== OrderType.fallback) {
orderManager.updateScore(runner, ScoreType.SUCCESS, httpVersion);
orderManager.updateRatelimit(runner, limit, limitRemain, type);
}

void orderManager.collectLatency(
runner,
after - before,
Expand All @@ -187,7 +195,7 @@ export function createFetch(
fallbackServiceUrl: orderManager.fallbackServiceUrl,
body: JSON.stringify(body),
res: JSON.stringify(res),
rid,
traceId,
});
}

Expand All @@ -204,7 +212,13 @@ export function createFetch(
errorMsg = (e as Error)?.message || '';

if (!triedFallback && (retries < maxRetries || orderManager.fallbackServiceUrl)) {
const [needRetry, scoreType] = handleErrorMsg(errorMsg, resHeaders);
const [needRetry, scoreType] = handleErrorMsg(
errorMsg,
orderManager,
requestParams,
type,
resHeaders
);

if (needRetry) {
logger?.error({
Expand All @@ -217,7 +231,7 @@ export function createFetch(
error: errorMsg,
stack: e.stack,
fallbackServiceUrl: orderManager.fallbackServiceUrl,
rid,
traceId,
scoreType,
});
const extraLog = {
Expand All @@ -242,7 +256,7 @@ export function createFetch(
error: errorMsg,
stack: e.stack,
fallbackServiceUrl: orderManager.fallbackServiceUrl,
rid,
traceId,
});

throw new FetchError(errorMsg, 'SQN');
Expand All @@ -258,7 +272,7 @@ export function createFetch(
error: errorMsg,
stack: e.stack,
fallbackServiceUrl: orderManager.fallbackServiceUrl,
rid,
traceId,
});

throw new FetchError(`reach max retries.${errorMsg ? ' error: ' + errorMsg : ''}`, 'SQN');
Expand All @@ -269,7 +283,13 @@ export function createFetch(
};
}

function handleErrorMsg(errorMsg: string, resHeaders?: Headers): [boolean, ScoreType] {
function handleErrorMsg(
errorMsg: string,
orderManager: OrderManager,
requestParams: RequestParam,
type: OrderType,
resHeaders?: Headers
): [boolean, ScoreType] {
let needRetry = true;
let scoreType = ScoreType.RPC;
const errorObj = safeJSONParse(errorMsg);
Expand All @@ -286,6 +306,13 @@ function handleErrorMsg(errorMsg: string, resHeaders?: Headers): [boolean, Score
}
} else if (rpcErrorCodes.has(errorObj.code)) {
scoreType = ScoreType.RPC;

// 1057: Exceed rate limit
if (errorObj.code === 1057) {
scoreType = ScoreType.NONE;
// set ratemlit remain to 0;
orderManager.updateRatelimit(requestParams.runner, 1, 0, type);
}
} else {
needRetry = false;
}
Expand Down
19 changes: 15 additions & 4 deletions packages/network-support/src/orderManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,10 @@ export class OrderManager {

if (!agreements.length) return;

const agreement = (await this.selectRunner(agreements)) as ServiceAgreementOrder;
const agreement = (await this.selectRunner(
agreements,
OrderType.agreement
)) as ServiceAgreementOrder;

this.logger?.debug(`next agreement: ${JSON.stringify(agreement.indexer)}`);

Expand Down Expand Up @@ -480,7 +483,7 @@ export class OrderManager {

if (!plans?.length) return;

const plan = await this.selectRunner(plans);
const plan = await this.selectRunner(plans, OrderType.flexPlan);

if (plan) {
await this.updateSelectedRunner(requestId, plan.indexer);
Expand All @@ -498,10 +501,12 @@ export class OrderManager {
return plan;
}

private async selectRunner(orders: Order[]): Promise<Order | undefined> {
private async selectRunner(orders: Order[], type: OrderType): Promise<Order | undefined> {
if (!orders.length) return;
const scores = await Promise.all(
orders.map((o) => this.scoreManager.getAdjustedScore(o.indexer, o.metadata?.proxyVersion))
orders.map((o) =>
this.scoreManager.getAdjustedScore(o.indexer, o.metadata?.proxyVersion, type)
)
);
const random = Math.random() * scores.reduce((a, b) => a + b.score, 0);
this.logger?.debug(`selectRunner: indexers: ${orders.map((o) => o.indexer)}`);
Expand Down Expand Up @@ -570,6 +575,12 @@ export class OrderManager {
await this.scoreManager.updateScore(runner, errorType, httpVersion, extraLog);
}

async updateRatelimit(runner: string, limit: number, limitRemain: number, type: OrderType) {
if (limit) {
await this.scoreManager.updateRatelimit(runner, limit, limitRemain, type);
}
}

async collectLatency(indexer: string, latency: number, size: number): Promise<void> {
await this.scoreManager.collectLatency(indexer, latency, size);
}
Expand Down
56 changes: 52 additions & 4 deletions packages/network-support/src/scoreManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

import BigNumber from 'bignumber.js';
import { NotifyFunc, Order, ScoreWithDetail } from './types';
import { NotifyFunc, Order, OrderType, ScoreWithDetail } from './types';
import { Logger, IStore, createStore } from './utils';
import {
calculateBigIntPercentile,
Expand Down Expand Up @@ -89,8 +89,13 @@ export class ScoreManager {
return Math.min(score.score + Math.floor((Date.now() - score.lastUpdate) / 600_000), 100);
}

async getAdjustedScore(runner: string, proxyVersion?: string): Promise<ScoreWithDetail> {
async getAdjustedScore(
runner: string,
proxyVersion?: string,
type?: OrderType
): Promise<ScoreWithDetail> {
proxyVersion = proxyVersion || '';
type = type || OrderType.flexPlan;
const score = await this.getScore(runner);
const base = this.getAvailabilityScore(score);
const http2 = this.getHttpVersionWeight(score);
Expand All @@ -99,11 +104,21 @@ export class ScoreManager {
const block = await getBlockScoreWeight(this.scoreStore, runner, this.projectId);
const latency = await getLatencyScoreWeight(this.scoreStore, runner, this.projectId);
const price = await this.getPriceScoreWeight(runner);
const ratelimitInfo = await this.getRatelimitWeightInfo(runner, type);
const ratelimitWeight = ratelimitInfo.weight;

this.logger?.debug(
`getAdjustedScore: ${runner} ${this.projectId} base:${base} http2:${http2} manua:${manual} multiple:${multiple} block:${block} latency:${latency} price:${price}`
`getAdjustedScore: ${runner} ${
this.projectId
} base:${base} http2:${http2} manua:${manual} multiple:${multiple} block:${block} latency:${latency} price:${price} ratelimit:${JSON.stringify(
ratelimitInfo
)}`
);
return {
score: Math.floor(base * http2 * manual * multiple * block * latency * price * 10) / 10,
score:
Math.floor(
base * http2 * manual * multiple * block * latency * price * ratelimitWeight * 10
) / 10,
scoreDetail: {
base,
http2,
Expand All @@ -112,6 +127,9 @@ export class ScoreManager {
block,
latency,
price,
ratelimit: ratelimitWeight,
ratelimit_quota: ratelimitInfo.limit,
ratelimit_remain: ratelimitInfo.limitRemain,
},
};
// return base * http2 * manual * multiple * block;
Expand All @@ -133,6 +151,26 @@ export class ScoreManager {
return (await this.scoreStore.get<number>(`${key}:${runner}_${this.projectId}`)) || 1;
}

async getRatelimitWeightInfo(runner: string, type: OrderType) {
const key = this.getRatelimitKey();
const now = Math.floor(Date.now() / 1000);
const limitRes = (await this.scoreStore.get<string>(`${key}:${type}:${runner}:${now}`)) || '';

let [limit, limitRemain] = limitRes.split('_').map((v) => Number(v) || 0);
limit = Number(limit) || 0;
limitRemain = Number(limitRemain) || 0;

let weight = 1;
if (limit) {
weight = Math.pow(limitRemain / limit, 3);
}
return {
weight,
limit,
limitRemain,
};
}

async updateScore(runner: string, errorType: ScoreType, httpVersion?: number, extraLog?: any) {
if (!runner) {
this.logger?.debug('updateScore: runner is empty');
Expand Down Expand Up @@ -191,6 +229,12 @@ export class ScoreManager {
});
}

async updateRatelimit(runner: string, limit: number, limitRemain: number, type: OrderType) {
const key = this.getRatelimitKey();
const now = Math.floor(Date.now() / 1000);
await this.scoreStore.set(`${key}:${type}:${runner}:${now}`, `${limit}_${limitRemain}`, 1);
}

async collectLatency(indexer: string, latency: number, size: number): Promise<void> {
const isLocal = process.env.NODE_ENV === 'local';
sampleCount[indexer] = sampleCount[indexer] || 0;
Expand Down Expand Up @@ -256,4 +300,8 @@ export class ScoreManager {
private getPriceScoreKey(): string {
return 'score:price';
}

private getRatelimitKey(): string {
return `sample:ratelimit`;
}
}
12 changes: 11 additions & 1 deletion packages/network-support/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,17 @@ export interface RunnerSelector {
channelIds?: string[];
}

type SCORE_DETAIL_TYPE = 'base' | 'http2' | 'manual' | 'multiple' | 'block' | 'latency' | 'price';
type SCORE_DETAIL_TYPE =
| 'base'
| 'http2'
| 'manual'
| 'multiple'
| 'block'
| 'latency'
| 'price'
| 'ratelimit'
| 'ratelimit_quota'
| 'ratelimit_remain';

export type ScoreWithDetail = {
score: number;
Expand Down
2 changes: 1 addition & 1 deletion packages/network-support/src/utils/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { LRUCache as LRU } from 'lru-cache';
/* eslint-disable @typescript-eslint/no-unused-vars */
export interface IStore {
get<T>(key: string): Promise<T | undefined>;
set<T>(key: string, value: T): Promise<void>; // ttl in milliseconds
set<T>(key: string, value: T, ttl?: number): Promise<void>; // ttl in milliseconds
remove(key: string): Promise<void>;

lpush(key: string, value: any): Promise<number>;
Expand Down

0 comments on commit 0f873d3

Please sign in to comment.