Skip to content

Commit

Permalink
fix: Consolidate to a single subscribe stream in Replicator
Browse files Browse the repository at this point in the history
We were seeing issues where events would arrive out of order when
processing two streams. To avoid this, have them be processed on the
same stream, where hubs will ensure they are emitted in order
(specifically FID registration followed by messages for the given FID).
  • Loading branch information
sds committed Nov 1, 2023
1 parent e808a4e commit 135e023
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 62 deletions.
5 changes: 5 additions & 0 deletions .changeset/short-radios-sparkle.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@farcaster/replicator": patch
---

Consolidate to single subscription connection
81 changes: 19 additions & 62 deletions apps/replicator/src/hubReplicator.ts
Original file line number Diff line number Diff line change
@@ -1,31 +1,21 @@
import {
HubEvent,
HubEventType,
HubRpcClient,
OnChainEvent,
isHubError,
isMergeOnChainHubEvent,
} from "@farcaster/hub-nodejs";
import { HubEvent, HubEventType, HubRpcClient, OnChainEvent, isHubError } from "@farcaster/hub-nodejs";
import humanizeDuration from "humanize-duration";
import { Redis, RedisKey } from "ioredis";
import { sql } from "kysely";
import { Logger } from "pino";
import humanizeDuration from "humanize-duration";
import { DB, Tables, executeTakeFirstOrThrow, executeTx } from "./db.js";
import { HubSubscriber } from "./hubSubscriber.js";
import { AssertionError } from "./error.js";
import { ProcessHubEvent } from "./jobs/processHubEvent.js";
import { sleep } from "./util.js";
import { processMergeOnChainHubEvent } from "./processors/index.js";
import { BackfillFidRegistration } from "./jobs/backfillFidRegistration.js";
import { HubSubscriber } from "./hubSubscriber.js";
import { BackfillFidData } from "./jobs/backfillFidData.js";
import { BackfillFidRegistration } from "./jobs/backfillFidRegistration.js";
import { ProcessHubEvent } from "./jobs/processHubEvent.js";
import { processOnChainEvent } from "./processors/onChainEvent.js";
import { statsd } from "./statsd.js";
import { sql } from "kysely";
import { sleep } from "./util.js";

export class HubReplicator {
private chainEventsSubscriber: HubSubscriber; // Dedicated subscription for faster processing of on-chain events
private otherEventsSubscriber: HubSubscriber;
private lastChainEventIdKey: RedisKey;
private lastOtherEventIdKey: RedisKey;
private eventsSubscriber: HubSubscriber;
private lastHubEventIdKey: RedisKey;

constructor(
private hub: HubRpcClient,
Expand All @@ -34,34 +24,18 @@ export class HubReplicator {
private log: Logger,
private redis: Redis,
) {
this.chainEventsSubscriber = new HubSubscriber("chain-events", this.hub, log, [HubEventType.MERGE_ON_CHAIN_EVENT]);
this.otherEventsSubscriber = new HubSubscriber("other-events", this.hub, log, [
this.eventsSubscriber = new HubSubscriber("all-events", this.hub, log, [
HubEventType.MERGE_ON_CHAIN_EVENT,
HubEventType.MERGE_MESSAGE,
HubEventType.MERGE_USERNAME_PROOF,
HubEventType.PRUNE_MESSAGE,
HubEventType.REVOKE_MESSAGE,
]);
this.lastChainEventIdKey = `hub:${this.hubAddress}:last-chain-event-id`;
this.lastOtherEventIdKey = `hub:${this.hubAddress}:last-other-event-id`;
this.lastHubEventIdKey = `hub:${this.hubAddress}:last-hub-event-id`;

this.chainEventsSubscriber.on("event", async (hubEvent) => {
try {
// Attempt to process immediately since chain events are relatively rare and they
// might create data that is depended on by other events (e.g. FIDs)
if (!isMergeOnChainHubEvent(hubEvent)) throw new AssertionError("Expected on-chain event");
await processMergeOnChainHubEvent(hubEvent, this.db, this.log, this.redis);
await this.redis.set(this.lastChainEventIdKey, hubEvent.id);
} catch {
log.error(
`Error processing on-chain event ${hubEvent.id} (type ${hubEvent.type}). Enqueuing as job to retry later.`,
);
this.processHubEvent(hubEvent);
}
});

this.otherEventsSubscriber.on("event", async (hubEvent) => {
this.eventsSubscriber.on("event", async (hubEvent) => {
this.processHubEvent(hubEvent);
await this.redis.set(this.lastOtherEventIdKey, hubEvent.id);
await this.redis.set(this.lastHubEventIdKey, hubEvent.id);
});
}

Expand All @@ -73,13 +47,11 @@ export class HubReplicator {
}

public stop() {
this.chainEventsSubscriber.stop();
this.otherEventsSubscriber.stop();
this.eventsSubscriber.stop();
}

public destroy() {
this.chainEventsSubscriber.destroy();
this.otherEventsSubscriber.destroy();
this.eventsSubscriber.destroy();
}

private async processHubEvent(hubEvent: HubEvent) {
Expand Down Expand Up @@ -133,10 +105,10 @@ export class HubReplicator {

// If the last event we processed no longer exists on the hub, then we're too
// far behind to catch up using the event stream alone.
const lastChainEventId = Number((await this.redis.get(this.lastChainEventIdKey)) ?? "0");
const lastEventId = Number((await this.redis.get(this.lastHubEventIdKey)) ?? "0");
let tooFarBehind = false;
try {
this.hub.getEvent({ id: lastChainEventId });
this.hub.getEvent({ id: lastEventId });
} catch (e) {
if (isHubError(e) && e.errCode === "not_found") {
tooFarBehind = true;
Expand All @@ -145,20 +117,6 @@ export class HubReplicator {
}
}

// Only check the other stream if the first stream was OK
if (!tooFarBehind) {
const lastOtherEventId = Number((await this.redis.get(this.lastOtherEventIdKey)) ?? "0");
try {
this.hub.getEvent({ id: lastOtherEventId });
} catch (e) {
if (isHubError(e) && e.errCode === "not_found") {
tooFarBehind = true;
} else {
throw e;
}
}
}

if (tooFarBehind) {
// Force a full backfill again. We don't clear the backfilled-registrations set
// since registrations are a single immutable event, but clear everything else.
Expand All @@ -184,8 +142,7 @@ export class HubReplicator {

// Finally, once all events have been processed, we can start subscribing to the event stream
// and processing new events as they come in while backfilling messages.
void this.chainEventsSubscriber.start();
void this.otherEventsSubscriber.start();
void this.eventsSubscriber.start();
}

private async waitForFidRegistrationsBackfill({ maxFid }: { maxFid: number }) {
Expand Down

0 comments on commit 135e023

Please sign in to comment.