diff --git a/.changeset/short-radios-sparkle.md b/.changeset/short-radios-sparkle.md new file mode 100644 index 0000000000..6f86588149 --- /dev/null +++ b/.changeset/short-radios-sparkle.md @@ -0,0 +1,5 @@ +--- +"@farcaster/replicator": patch +--- + +Consolidate to single subscription connection diff --git a/apps/replicator/src/hubReplicator.ts b/apps/replicator/src/hubReplicator.ts index 9dd40ec16f..94ae78298c 100644 --- a/apps/replicator/src/hubReplicator.ts +++ b/apps/replicator/src/hubReplicator.ts @@ -1,31 +1,21 @@ -import { - HubEvent, - HubEventType, - HubRpcClient, - OnChainEvent, - isHubError, - isMergeOnChainHubEvent, -} from "@farcaster/hub-nodejs"; +import { HubEvent, HubEventType, HubRpcClient, OnChainEvent, isHubError } from "@farcaster/hub-nodejs"; +import humanizeDuration from "humanize-duration"; import { Redis, RedisKey } from "ioredis"; +import { sql } from "kysely"; import { Logger } from "pino"; -import humanizeDuration from "humanize-duration"; import { DB, Tables, executeTakeFirstOrThrow, executeTx } from "./db.js"; -import { HubSubscriber } from "./hubSubscriber.js"; import { AssertionError } from "./error.js"; -import { ProcessHubEvent } from "./jobs/processHubEvent.js"; -import { sleep } from "./util.js"; -import { processMergeOnChainHubEvent } from "./processors/index.js"; -import { BackfillFidRegistration } from "./jobs/backfillFidRegistration.js"; +import { HubSubscriber } from "./hubSubscriber.js"; import { BackfillFidData } from "./jobs/backfillFidData.js"; +import { BackfillFidRegistration } from "./jobs/backfillFidRegistration.js"; +import { ProcessHubEvent } from "./jobs/processHubEvent.js"; import { processOnChainEvent } from "./processors/onChainEvent.js"; import { statsd } from "./statsd.js"; -import { sql } from "kysely"; +import { sleep } from "./util.js"; export class HubReplicator { - private chainEventsSubscriber: HubSubscriber; // Dedicated subscription for faster processing of on-chain events - private otherEventsSubscriber: HubSubscriber; - private lastChainEventIdKey: RedisKey; - private lastOtherEventIdKey: RedisKey; + private eventsSubscriber: HubSubscriber; + private lastHubEventIdKey: RedisKey; constructor( private hub: HubRpcClient, @@ -34,34 +24,18 @@ export class HubReplicator { private log: Logger, private redis: Redis, ) { - this.chainEventsSubscriber = new HubSubscriber("chain-events", this.hub, log, [HubEventType.MERGE_ON_CHAIN_EVENT]); - this.otherEventsSubscriber = new HubSubscriber("other-events", this.hub, log, [ + this.eventsSubscriber = new HubSubscriber("all-events", this.hub, log, [ + HubEventType.MERGE_ON_CHAIN_EVENT, HubEventType.MERGE_MESSAGE, HubEventType.MERGE_USERNAME_PROOF, HubEventType.PRUNE_MESSAGE, HubEventType.REVOKE_MESSAGE, ]); - this.lastChainEventIdKey = `hub:${this.hubAddress}:last-chain-event-id`; - this.lastOtherEventIdKey = `hub:${this.hubAddress}:last-other-event-id`; + this.lastHubEventIdKey = `hub:${this.hubAddress}:last-hub-event-id`; - this.chainEventsSubscriber.on("event", async (hubEvent) => { - try { - // Attempt to process immediately since chain events are relatively rare and they - // might create data that is depended on by other events (e.g. FIDs) - if (!isMergeOnChainHubEvent(hubEvent)) throw new AssertionError("Expected on-chain event"); - await processMergeOnChainHubEvent(hubEvent, this.db, this.log, this.redis); - await this.redis.set(this.lastChainEventIdKey, hubEvent.id); - } catch { - log.error( - `Error processing on-chain event ${hubEvent.id} (type ${hubEvent.type}). Enqueuing as job to retry later.`, - ); - this.processHubEvent(hubEvent); - } - }); - - this.otherEventsSubscriber.on("event", async (hubEvent) => { + this.eventsSubscriber.on("event", async (hubEvent) => { this.processHubEvent(hubEvent); - await this.redis.set(this.lastOtherEventIdKey, hubEvent.id); + await this.redis.set(this.lastHubEventIdKey, hubEvent.id); }); } @@ -73,13 +47,11 @@ export class HubReplicator { } public stop() { - this.chainEventsSubscriber.stop(); - this.otherEventsSubscriber.stop(); + this.eventsSubscriber.stop(); } public destroy() { - this.chainEventsSubscriber.destroy(); - this.otherEventsSubscriber.destroy(); + this.eventsSubscriber.destroy(); } private async processHubEvent(hubEvent: HubEvent) { @@ -133,10 +105,10 @@ export class HubReplicator { // If the last event we processed no longer exists on the hub, then we're too // far behind to catch up using the event stream alone. - const lastChainEventId = Number((await this.redis.get(this.lastChainEventIdKey)) ?? "0"); + const lastEventId = Number((await this.redis.get(this.lastHubEventIdKey)) ?? "0"); let tooFarBehind = false; try { - this.hub.getEvent({ id: lastChainEventId }); + this.hub.getEvent({ id: lastEventId }); } catch (e) { if (isHubError(e) && e.errCode === "not_found") { tooFarBehind = true; @@ -145,20 +117,6 @@ export class HubReplicator { } } - // Only check the other stream if the first stream was OK - if (!tooFarBehind) { - const lastOtherEventId = Number((await this.redis.get(this.lastOtherEventIdKey)) ?? "0"); - try { - this.hub.getEvent({ id: lastOtherEventId }); - } catch (e) { - if (isHubError(e) && e.errCode === "not_found") { - tooFarBehind = true; - } else { - throw e; - } - } - } - if (tooFarBehind) { // Force a full backfill again. We don't clear the backfilled-registrations set // since registrations are a single immutable event, but clear everything else. @@ -184,8 +142,7 @@ export class HubReplicator { // Finally, once all events have been processed, we can start subscribing to the event stream // and processing new events as they come in while backfilling messages. - void this.chainEventsSubscriber.start(); - void this.otherEventsSubscriber.start(); + void this.eventsSubscriber.start(); } private async waitForFidRegistrationsBackfill({ maxFid }: { maxFid: number }) {