Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ratelimit score weight #343

Merged
merged 4 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 40 additions & 13 deletions packages/network-support/src/fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import { OrderManager, ResponseFormat } from './orderManager';
import { customFetch, generateUniqueId, Logger, safeJSONParse } from './utils';
import { OrderType } from './types';
import { OrderType, RequestParam } from './types';
import { ScoreType } from './scoreManager';
import { Base64 } from 'js-base64';
import { ActiveType } from './stateManager';
Expand Down Expand Up @@ -46,7 +46,7 @@ export function createFetch(
maxRetries = 5,
logger?: Logger,
overrideFetch?: typeof fetch,
rid?: string
traceId?: string
): (init: RequestInit) => Promise<Response> {
let retries = 0;
let triedFallback = false;
Expand Down Expand Up @@ -77,7 +77,7 @@ export function createFetch(
let requestParams;
if (retries < maxRetries) {
requestParams = await orderManager.getRequestParams(requestId, proxyVersion, {
rid,
traceId,
retries,
});
}
Expand All @@ -102,6 +102,8 @@ export function createFetch(
const { url, headers, type, runner, channelId } = requestParams;
let httpVersion = 1;
let resHeaders: Headers | undefined;
let limit = 0;
let limitRemain = 0;

try {
if (type === OrderType.fallback) {
Expand All @@ -112,7 +114,7 @@ export function createFetch(
requestId,
fallbackServiceUrl: orderManager.fallbackServiceUrl,
retry: retries,
rid,
traceId,
});
}

Expand All @@ -133,6 +135,8 @@ export function createFetch(
const after = Date.now();
resHeaders = _res.headers;
httpVersion = Number(_res.headers.get('httpVersion')) || 1;
limit = Number(_res.headers.get('x-ratelimit-limit-second')) || 0;
limitRemain = Number(_res.headers.get('x-ratelimit-remaining-second')) || 0;

let res: object | undefined;
if (type === OrderType.flexPlan) {
Expand All @@ -141,7 +145,7 @@ export function createFetch(
new Headers(_res.headers),
channelId,
{
rid,
traceId,
requestId,
deploymentId: orderManager.getProjectId(),
indexer: runner,
Expand All @@ -165,13 +169,17 @@ export function createFetch(
requestId,
retry: retries,
fallbackServiceUrl: orderManager.fallbackServiceUrl,
rid,
traceId,
status: _res.status,
});
res = await _res.json();
}

orderManager.updateScore(runner, ScoreType.SUCCESS, httpVersion);
if (type !== OrderType.fallback) {
orderManager.updateScore(runner, ScoreType.SUCCESS, httpVersion);
orderManager.updateRatelimit(runner, limit, limitRemain, type);
}

void orderManager.collectLatency(
runner,
after - before,
Expand All @@ -187,7 +195,7 @@ export function createFetch(
fallbackServiceUrl: orderManager.fallbackServiceUrl,
body: JSON.stringify(body),
res: JSON.stringify(res),
rid,
traceId,
});
}

Expand All @@ -204,7 +212,13 @@ export function createFetch(
errorMsg = (e as Error)?.message || '';

if (!triedFallback && (retries < maxRetries || orderManager.fallbackServiceUrl)) {
const [needRetry, scoreType] = handleErrorMsg(errorMsg, resHeaders);
const [needRetry, scoreType] = handleErrorMsg(
errorMsg,
orderManager,
requestParams,
type,
resHeaders
);

if (needRetry) {
logger?.error({
Expand All @@ -217,7 +231,7 @@ export function createFetch(
error: errorMsg,
stack: e.stack,
fallbackServiceUrl: orderManager.fallbackServiceUrl,
rid,
traceId,
scoreType,
});
const extraLog = {
Expand All @@ -242,7 +256,7 @@ export function createFetch(
error: errorMsg,
stack: e.stack,
fallbackServiceUrl: orderManager.fallbackServiceUrl,
rid,
traceId,
});

throw new FetchError(errorMsg, 'SQN');
Expand All @@ -258,7 +272,7 @@ export function createFetch(
error: errorMsg,
stack: e.stack,
fallbackServiceUrl: orderManager.fallbackServiceUrl,
rid,
traceId,
});

throw new FetchError(`reach max retries.${errorMsg ? ' error: ' + errorMsg : ''}`, 'SQN');
Expand All @@ -269,7 +283,13 @@ export function createFetch(
};
}

function handleErrorMsg(errorMsg: string, resHeaders?: Headers): [boolean, ScoreType] {
function handleErrorMsg(
errorMsg: string,
orderManager: OrderManager,
requestParams: RequestParam,
type: OrderType,
resHeaders?: Headers
): [boolean, ScoreType] {
let needRetry = true;
let scoreType = ScoreType.RPC;
const errorObj = safeJSONParse(errorMsg);
Expand All @@ -286,6 +306,13 @@ function handleErrorMsg(errorMsg: string, resHeaders?: Headers): [boolean, Score
}
} else if (rpcErrorCodes.has(errorObj.code)) {
scoreType = ScoreType.RPC;

// 1057: Exceed rate limit
if (errorObj.code === 1057) {
scoreType = ScoreType.NONE;
// set ratemlit remain to 0;
orderManager.updateRatelimit(requestParams.runner, 1, 0, type);
}
} else {
needRetry = false;
}
Expand Down
19 changes: 15 additions & 4 deletions packages/network-support/src/orderManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,10 @@ export class OrderManager {

if (!agreements.length) return;

const agreement = (await this.selectRunner(agreements)) as ServiceAgreementOrder;
const agreement = (await this.selectRunner(
agreements,
OrderType.agreement
)) as ServiceAgreementOrder;

this.logger?.debug(`next agreement: ${JSON.stringify(agreement.indexer)}`);

Expand Down Expand Up @@ -480,7 +483,7 @@ export class OrderManager {

if (!plans?.length) return;

const plan = await this.selectRunner(plans);
const plan = await this.selectRunner(plans, OrderType.flexPlan);

if (plan) {
await this.updateSelectedRunner(requestId, plan.indexer);
Expand All @@ -498,10 +501,12 @@ export class OrderManager {
return plan;
}

private async selectRunner(orders: Order[]): Promise<Order | undefined> {
private async selectRunner(orders: Order[], type: OrderType): Promise<Order | undefined> {
if (!orders.length) return;
const scores = await Promise.all(
orders.map((o) => this.scoreManager.getAdjustedScore(o.indexer, o.metadata?.proxyVersion))
orders.map((o) =>
this.scoreManager.getAdjustedScore(o.indexer, o.metadata?.proxyVersion, type)
)
);
const random = Math.random() * scores.reduce((a, b) => a + b.score, 0);
this.logger?.debug(`selectRunner: indexers: ${orders.map((o) => o.indexer)}`);
Expand Down Expand Up @@ -570,6 +575,12 @@ export class OrderManager {
await this.scoreManager.updateScore(runner, errorType, httpVersion, extraLog);
}

async updateRatelimit(runner: string, limit: number, limitRemain: number, type: OrderType) {
if (limit) {
await this.scoreManager.updateRatelimit(runner, limit, limitRemain, type);
}
}

async collectLatency(indexer: string, latency: number, size: number): Promise<void> {
await this.scoreManager.collectLatency(indexer, latency, size);
}
Expand Down
56 changes: 52 additions & 4 deletions packages/network-support/src/scoreManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

import BigNumber from 'bignumber.js';
import { NotifyFunc, Order, ScoreWithDetail } from './types';
import { NotifyFunc, Order, OrderType, ScoreWithDetail } from './types';
import { Logger, IStore, createStore } from './utils';
import {
calculateBigIntPercentile,
Expand Down Expand Up @@ -89,8 +89,13 @@ export class ScoreManager {
return Math.min(score.score + Math.floor((Date.now() - score.lastUpdate) / 600_000), 100);
}

async getAdjustedScore(runner: string, proxyVersion?: string): Promise<ScoreWithDetail> {
async getAdjustedScore(
runner: string,
proxyVersion?: string,
type?: OrderType
): Promise<ScoreWithDetail> {
proxyVersion = proxyVersion || '';
type = type || OrderType.flexPlan;
const score = await this.getScore(runner);
const base = this.getAvailabilityScore(score);
const http2 = this.getHttpVersionWeight(score);
Expand All @@ -99,11 +104,21 @@ export class ScoreManager {
const block = await getBlockScoreWeight(this.scoreStore, runner, this.projectId);
const latency = await getLatencyScoreWeight(this.scoreStore, runner, this.projectId);
const price = await this.getPriceScoreWeight(runner);
const ratelimitInfo = await this.getRatelimitWeightInfo(runner, type);
const ratelimitWeight = ratelimitInfo.weight;

this.logger?.debug(
`getAdjustedScore: ${runner} ${this.projectId} base:${base} http2:${http2} manua:${manual} multiple:${multiple} block:${block} latency:${latency} price:${price}`
`getAdjustedScore: ${runner} ${
this.projectId
} base:${base} http2:${http2} manua:${manual} multiple:${multiple} block:${block} latency:${latency} price:${price} ratelimit:${JSON.stringify(
ratelimitInfo
)}`
);
return {
score: Math.floor(base * http2 * manual * multiple * block * latency * price * 10) / 10,
score:
Math.floor(
base * http2 * manual * multiple * block * latency * price * ratelimitWeight * 10
) / 10,
scoreDetail: {
base,
http2,
Expand All @@ -112,6 +127,9 @@ export class ScoreManager {
block,
latency,
price,
ratelimit: ratelimitWeight,
ratelimit_quota: ratelimitInfo.limit,
ratelimit_remain: ratelimitInfo.limitRemain,
},
};
// return base * http2 * manual * multiple * block;
Expand All @@ -133,6 +151,26 @@ export class ScoreManager {
return (await this.scoreStore.get<number>(`${key}:${runner}_${this.projectId}`)) || 1;
}

async getRatelimitWeightInfo(runner: string, type: OrderType) {
const key = this.getRatelimitKey();
const now = Math.floor(Date.now() / 1000);
const limitRes = (await this.scoreStore.get<string>(`${key}:${type}:${runner}:${now}`)) || '';

let [limit, limitRemain] = limitRes.split('_').map((v) => Number(v) || 0);
limit = Number(limit) || 0;
limitRemain = Number(limitRemain) || 0;

let weight = 1;
if (limit) {
weight = Math.pow(limitRemain / limit, 3);
}
return {
weight,
limit,
limitRemain,
};
}

async updateScore(runner: string, errorType: ScoreType, httpVersion?: number, extraLog?: any) {
if (!runner) {
this.logger?.debug('updateScore: runner is empty');
Expand Down Expand Up @@ -191,6 +229,12 @@ export class ScoreManager {
});
}

async updateRatelimit(runner: string, limit: number, limitRemain: number, type: OrderType) {
const key = this.getRatelimitKey();
const now = Math.floor(Date.now() / 1000);
await this.scoreStore.set(`${key}:${type}:${runner}:${now}`, `${limit}_${limitRemain}`, 1);
}

async collectLatency(indexer: string, latency: number, size: number): Promise<void> {
const isLocal = process.env.NODE_ENV === 'local';
sampleCount[indexer] = sampleCount[indexer] || 0;
Expand Down Expand Up @@ -256,4 +300,8 @@ export class ScoreManager {
private getPriceScoreKey(): string {
return 'score:price';
}

private getRatelimitKey(): string {
return `sample:ratelimit`;
}
}
12 changes: 11 additions & 1 deletion packages/network-support/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,17 @@ export interface RunnerSelector {
channelIds?: string[];
}

type SCORE_DETAIL_TYPE = 'base' | 'http2' | 'manual' | 'multiple' | 'block' | 'latency' | 'price';
type SCORE_DETAIL_TYPE =
| 'base'
| 'http2'
| 'manual'
| 'multiple'
| 'block'
| 'latency'
| 'price'
| 'ratelimit'
| 'ratelimit_quota'
| 'ratelimit_remain';

export type ScoreWithDetail = {
score: number;
Expand Down
2 changes: 1 addition & 1 deletion packages/network-support/src/utils/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { LRUCache as LRU } from 'lru-cache';
/* eslint-disable @typescript-eslint/no-unused-vars */
export interface IStore {
get<T>(key: string): Promise<T | undefined>;
set<T>(key: string, value: T): Promise<void>; // ttl in milliseconds
set<T>(key: string, value: T, ttl?: number): Promise<void>; // ttl in milliseconds
remove(key: string): Promise<void>;

lpush(key: string, value: any): Promise<number>;
Expand Down
Loading