Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: async fs operations #676

Open
wants to merge 2 commits into
base: feat/reregister-callback
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 35 additions & 15 deletions components/client/typescript/src/predicates.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import * as fs from 'fs';
import * as fs from 'fs/promises';
import * as path from 'path';
import { logger } from './util/logger';
import {
Expand All @@ -17,17 +17,32 @@ const RegisteredPredicates = new Map<string, Predicate>();

const CompiledPredicateSchema = TypeCompiler.Compile(PredicateSchema);

// Async version of fs.existsSync
async function pathExists(path: string): Promise<boolean> {
try {
await fs.access(path);
return true;
} catch (error) {
if ((error as NodeJS.ErrnoException).code === 'ENOENT') {
return false;
}
throw error; // Re-throw other errors (e.g., permission issues)
}
}

/**
* Looks on disk and returns a map of registered Predicates, where the key is the predicate `name`
* as defined by the user.
*/
export function recallPersistedPredicatesFromDisk(basePath: string): Map<string, Predicate> {
export async function recallPersistedPredicatesFromDisk(
basePath: string
): Promise<Map<string, Predicate>> {
RegisteredPredicates.clear();
try {
if (!fs.existsSync(basePath)) return RegisteredPredicates;
for (const file of fs.readdirSync(basePath)) {
if (!(await pathExists(basePath))) return RegisteredPredicates;
for (const file of await fs.readdir(basePath)) {
if (file.endsWith('.json')) {
const text = fs.readFileSync(path.join(basePath, file), 'utf-8');
const text = await fs.readFile(path.join(basePath, file), 'utf-8');
const predicate = JSON.parse(text) as JSON;
if (CompiledPredicateSchema.Check(predicate)) {
logger.info(
Expand All @@ -44,11 +59,11 @@ export function recallPersistedPredicatesFromDisk(basePath: string): Map<string,
return RegisteredPredicates;
}

export function savePredicateToDisk(basePath: string, predicate: Predicate) {
export async function savePredicateToDisk(basePath: string, predicate: Predicate) {
const predicatePath = `${basePath}/predicate-${encodeURIComponent(predicate.name)}.json`;
try {
fs.mkdirSync(basePath, { recursive: true });
fs.writeFileSync(predicatePath, JSON.stringify(predicate, null, 2));
await fs.mkdir(basePath, { recursive: true });
await fs.writeFile(predicatePath, JSON.stringify(predicate, null, 2));
logger.info(
`ChainhookEventObserver persisted predicate '${predicate.name}' (${predicate.uuid}) to disk`
);
Expand All @@ -60,13 +75,18 @@ export function savePredicateToDisk(basePath: string, predicate: Predicate) {
}
}

function deletePredicateFromDisk(basePath: string, predicate: Predicate) {
async function deletePredicateFromDisk(basePath: string, predicate: Predicate) {
const predicatePath = `${basePath}/predicate-${encodeURIComponent(predicate.name)}.json`;
if (fs.existsSync(predicatePath)) {
fs.rmSync(predicatePath);
try {
await fs.rm(predicatePath);
logger.info(
`ChainhookEventObserver deleted predicate '${predicate.name}' (${predicate.uuid}) from disk`
);
} catch (error: unknown) {
// ignore if the file doesn't exist
if ((error as NodeJS.ErrnoException).code !== 'ENOENT') {
logger.error(error, `Failed to delete predicate`);
}
}
}

Expand Down Expand Up @@ -165,7 +185,7 @@ async function registerPredicate(
logger.info(
`ChainhookEventObserver registered '${newPredicate.name}' predicate (${newPredicate.uuid})`
);
savePredicateToDisk(observer.predicate_disk_file_path, newPredicate);
await savePredicateToDisk(observer.predicate_disk_file_path, newPredicate);
RegisteredPredicates.set(newPredicate.name, newPredicate);
} catch (error) {
logger.error(error, `ChainhookEventObserver unable to register predicate`);
Expand All @@ -190,7 +210,7 @@ async function removePredicate(
throwOnError: true,
});
logger.info(`ChainhookEventObserver removed predicate '${predicate.name}' (${predicate.uuid})`);
deletePredicateFromDisk(observer.predicate_disk_file_path, predicate);
await deletePredicateFromDisk(observer.predicate_disk_file_path, predicate);
} catch (error) {
logger.error(error, `ChainhookEventObserver unable to deregister predicate`);
}
Expand All @@ -207,7 +227,7 @@ export async function registerAllPredicatesOnObserverReady(
logger.info(`ChainhookEventObserver does not have predicates to register`);
return;
}
const diskPredicates = recallPersistedPredicatesFromDisk(observer.predicate_disk_file_path);
const diskPredicates = await recallPersistedPredicatesFromDisk(observer.predicate_disk_file_path);
for (const predicate of predicates)
await registerPredicate(predicate, diskPredicates, observer, chainhook);
}
Expand All @@ -217,7 +237,7 @@ export async function removeAllPredicatesOnObserverClose(
observer: EventObserverOptions,
chainhook: ChainhookNodeOptions
) {
const diskPredicates = recallPersistedPredicatesFromDisk(observer.predicate_disk_file_path);
const diskPredicates = await recallPersistedPredicatesFromDisk(observer.predicate_disk_file_path);
if (diskPredicates.size === 0) {
logger.info(`ChainhookEventObserver does not have predicates to close`);
return;
Expand Down
8 changes: 4 additions & 4 deletions components/client/typescript/tests/predicates.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ describe('predicates', () => {
await server.start([testPredicate], async () => {});

expect(fs.existsSync(`${observer.predicate_disk_file_path}/predicate-test.json`)).toBe(true);
const disk = recallPersistedPredicatesFromDisk(observer.predicate_disk_file_path);
const disk = await recallPersistedPredicatesFromDisk(observer.predicate_disk_file_path);
const storedPredicate = disk.get('test');
expect(storedPredicate).not.toBeUndefined();
expect(storedPredicate?.name).toBe(testPredicate.name);
Expand All @@ -102,8 +102,8 @@ describe('predicates', () => {
});

describe('pre-stored', () => {
beforeEach(() => {
savePredicateToDisk(observer.predicate_disk_file_path, {
beforeEach(async () => {
await savePredicateToDisk(observer.predicate_disk_file_path, {
uuid: 'e2777d77-473a-4c1d-9012-152deb36bf4c',
name: 'test',
version: 1,
Expand Down Expand Up @@ -164,7 +164,7 @@ describe('predicates', () => {

mockAgent.assertNoPendingInterceptors();
expect(fs.existsSync(`${observer.predicate_disk_file_path}/predicate-test.json`)).toBe(true);
const disk = recallPersistedPredicatesFromDisk(observer.predicate_disk_file_path);
const disk = await recallPersistedPredicatesFromDisk(observer.predicate_disk_file_path);
const storedPredicate = disk.get('test');
// Should have a different uuid
expect(storedPredicate?.uuid).not.toBe('e2777d77-473a-4c1d-9012-152deb36bf4c');
Expand Down
Loading