Skip to content

Commit

Permalink
Extract various minor enhancements from #474 (#513)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjie authored Nov 22, 2024
2 parents 7b31a89 + cef1de1 commit 3e87e12
Show file tree
Hide file tree
Showing 17 changed files with 166 additions and 82 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ node_modules/
/tasks/
/rewired/
_LOCAL/
/graphile-pro-worker
13 changes: 8 additions & 5 deletions __tests__/migrate.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import {

const options: WorkerSharedOptions = {};

const MAX_MIGRATION_NUMBER = 18;

test("migration installs schema; second migration does no harm", async () => {
await withPgClient(async (pgClient) => {
await pgClient.query(
Expand All @@ -40,7 +42,7 @@ test("migration installs schema; second migration does no harm", async () => {
const { rows: migrationRows } = await pgClient.query(
`select * from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}.migrations order by id asc`,
);
expect(migrationRows).toHaveLength(18);
expect(migrationRows).toHaveLength(MAX_MIGRATION_NUMBER);
const migration = migrationRows[0];
expect(migration.id).toEqual(1);

Expand Down Expand Up @@ -90,16 +92,17 @@ test("multiple concurrent installs of the schema is fine", async () => {
);
}
} finally {
await Promise.allSettled(promises);
const results = await Promise.allSettled(promises);
await Promise.allSettled(clients.map((c) => c.release()));
expect(results.every((r) => r.status === "fulfilled")).toBeTruthy();
}
});
await withPgClient(async (pgClient) => {
// Assert migrations table exists and has relevant entries
const { rows: migrationRows } = await pgClient.query(
`select * from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}.migrations order by id asc`,
);
expect(migrationRows).toHaveLength(18);
expect(migrationRows).toHaveLength(MAX_MIGRATION_NUMBER);
const migration = migrationRows[0];
expect(migration.id).toEqual(1);

Expand Down Expand Up @@ -147,7 +150,7 @@ insert into ${ESCAPED_GRAPHILE_WORKER_SCHEMA}.migrations (id) values (1);
const { rows: migrationRows } = await pgClient.query(
`select * from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}.migrations order by id asc`,
);
expect(migrationRows.length).toBeGreaterThanOrEqual(18);
expect(migrationRows.length).toBeGreaterThanOrEqual(MAX_MIGRATION_NUMBER);
const migration2 = migrationRows[1];
expect(migration2.id).toEqual(2);
expect(migration2.breaking).toEqual(false);
Expand Down Expand Up @@ -208,7 +211,7 @@ test("aborts if database is more up to date than current worker", async () => {
await expect(
migrate(compiledSharedOptions, pgClient),
).rejects.toThrowErrorMatchingInlineSnapshot(
`"Database is using Graphile Worker schema revision 999999 which includes breaking migration 999999, but the currently running worker only supports up to revision 18. It would be unsafe to continue; please ensure all versions of Graphile Worker are compatible."`,
`"Database is using Graphile Worker schema revision 999999 which includes breaking migration 999999, but the currently running worker only supports up to revision ${MAX_MIGRATION_NUMBER}. It would be unsafe to continue; please ensure all versions of Graphile Worker are compatible."`,
);
});
});
Expand Down
2 changes: 2 additions & 0 deletions __tests__/runner.helpers.getTaskName.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ beforeAll(() => {
connectionString: databaseDetails!.TEST_CONNECTION_STRING,
max: JOB_COUNT * 2 + 5,
});
pgPool.on("error", () => {});
pgPool.on("connect", () => {});
});
afterAll(() => {
pgPool.end();
Expand Down
4 changes: 3 additions & 1 deletion __tests__/runner.runOnce.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Pool } from "pg";

import { makeWorkerPresetWorkerOptions } from "../src/config";
import { Job, RunnerOptions, WorkerUtils } from "../src/interfaces";
import { coerceError } from "../src/lib";
import { _allWorkerPools } from "../src/main";
import { WorkerPreset } from "../src/preset";
import { runOnce } from "../src/runner";
Expand Down Expand Up @@ -55,7 +56,8 @@ async function runOnceErrorAssertion(
expect.assertions(1);
try {
await runOnce(options);
} catch (e) {
} catch (rawE) {
const e = coerceError(rawE);
expect(e.message).toMatch(message);
}
}
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,11 @@
"homepage": "https://github.com/graphile/worker#readme",
"dependencies": {
"@graphile/logger": "^0.2.0",
"@tsconfig/node18": "^18.2.4",
"@types/debug": "^4.1.10",
"@types/pg": "^8.10.5",
"cosmiconfig": "^8.3.6",
"graphile-config": "^0.0.1-beta.4",
"graphile-config": "^0.0.1-beta.11",
"json5": "^2.2.3",
"pg": "^8.11.3",
"tslib": "^2.6.2",
Expand Down
23 changes: 14 additions & 9 deletions src/cron.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@ import {
TimestampDigest,
WorkerEvents,
} from "./interfaces";
import { CompiledOptions, CompiledSharedOptions, Releasers } from "./lib";
import {
coerceError,
CompiledOptions,
CompiledSharedOptions,
Releasers,
} from "./lib";

interface CronRequirements {
pgPool: Pool;
Expand Down Expand Up @@ -333,19 +338,19 @@ export const runCron = (
}

const start = new Date();
events.emit("cron:starting", { cron: this, start });
events.emit("cron:starting", { cron, start });

// We must backfill BEFORE scheduling any new jobs otherwise backfill won't
// work due to known_crontabs.last_execution having been updated.
await registerAndBackfillItems(
{ pgPool, events, cron: this },
{ pgPool, events, cron },
escapedWorkerSchema,
parsedCronItems,
new Date(+start),
useNodeTime,
);

events.emit("cron:started", { cron: this, start });
events.emit("cron:started", { cron, start });

if (!cron._active) {
return stop();
Expand Down Expand Up @@ -406,7 +411,7 @@ export const runCron = (
},
);
events.emit("cron:prematureTimer", {
cron: this,
cron,
currentTimestamp,
expectedTimestamp,
});
Expand All @@ -422,7 +427,7 @@ export const runCron = (
)}s behind)`,
);
events.emit("cron:overdueTimer", {
cron: this,
cron,
currentTimestamp,
expectedTimestamp,
});
Expand All @@ -444,7 +449,7 @@ export const runCron = (
// Finally actually run the jobs.
if (jobsAndIdentifiers.length) {
events.emit("cron:schedule", {
cron: this,
cron,
timestamp: expectedTimestamp,
jobsAndIdentifiers,
});
Expand All @@ -456,7 +461,7 @@ export const runCron = (
useNodeTime,
);
events.emit("cron:scheduled", {
cron: this,
cron,
timestamp: expectedTimestamp,
jobsAndIdentifiers,
});
Expand All @@ -475,7 +480,7 @@ export const runCron = (
} catch (e) {
// If something goes wrong; abort. The calling code should re-schedule
// which will re-trigger the backfilling code.
return stop(e);
return stop(coerceError(e));
}
}

Expand Down
5 changes: 4 additions & 1 deletion src/crontab.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {
CronItemOptions,
ParsedCronItem,
} from "./interfaces";
import { coerceError } from "./lib";

/**
* Returns a period of time in milliseconds representing the time phrase given.
Expand Down Expand Up @@ -179,7 +180,9 @@ const parseCrontabPayload = (
return JSON5.parse(payloadString);
} catch (e) {
throw new Error(
`Failed to parse JSON5 payload on line ${lineNumber} of crontab: ${e.message}`,
`Failed to parse JSON5 payload on line ${lineNumber} of crontab: ${
coerceError(e).message
}`,
);
}
};
Expand Down
6 changes: 3 additions & 3 deletions src/deferred.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ export interface Deferred<T = void> extends Promise<T> {
}

export default function defer<T = void>(): Deferred<T> {
let resolve: (result?: T | PromiseLike<T>) => void;
let reject: (error: Error) => void;
let resolve: Deferred<T>["resolve"];
let reject: Deferred<T>["reject"];
return Object.assign(
new Promise<T>((_resolve, _reject) => {
resolve = _resolve;
resolve = _resolve as Deferred<T>["resolve"];
reject = _reject;
}),
// @ts-ignore error TS2454: Variable 'resolve' is used before being assigned.
Expand Down
1 change: 0 additions & 1 deletion src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -908,7 +908,6 @@ export type WorkerEventMap = {
"pool:listen:error": {
workerPool: WorkerPool;
error: unknown;
client: PoolClient;
};

/**
Expand Down
34 changes: 31 additions & 3 deletions src/lib.ts
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,17 @@ export async function assertPool(
pgPool = _rawOptions.pgPool;
if (pgPool.listeners("error").length === 0) {
console.warn(
`Your pool doesn't have error handlers! See: https://err.red/wpeh`,
`Your pool doesn't have error handlers! See: https://err.red/wpeh?v=${encodeURIComponent(
version,
)}`,
);
installErrorHandlers(compiledSharedOptions, releasers, pgPool);
}
if (pgPool.listeners("connect").length === 0) {
console.warn(
`Your pool doesn't have all of the error handlers! See: https://err.red/wpeh?v=${encodeURIComponent(
version,
)}&method=connect`,
);
installErrorHandlers(compiledSharedOptions, releasers, pgPool);
}
Expand Down Expand Up @@ -400,7 +410,7 @@ export async function withReleasers<T>(
try {
await releasers[i]();
} catch (e) {
firstError = firstError || e;
firstError ??= coerceError(e);
}
}
if (firstError) {
Expand Down Expand Up @@ -530,7 +540,8 @@ export function makeEnhancedWithPgClient(
for (let attempts = 0; attempts < MAX_RETRIES; attempts++) {
try {
return await withPgClient(...args);
} catch (e) {
} catch (rawE) {
const e = coerceError(rawE);
const retryable = RETRYABLE_ERROR_CODES.find(
({ code }) => code === e.code,
);
Expand All @@ -551,3 +562,20 @@ export function makeEnhancedWithPgClient(

export const sleep = (ms: number) =>
new Promise<void>((resolve) => setTimeout(resolve, ms));

export function coerceError(err: unknown): Error & { code?: unknown } {
if (err instanceof Error) {
return err;
} else {
const message =
typeof err === "object" && err !== null && "message" in err
? String(err.message)
: "An error occurred";
return new Error(message, { cause: err });
}
}

export function isPromiseLike<T>(v: PromiseLike<T> | T): v is PromiseLike<T> {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
return v != null && typeof (v as any).then === "function";
}
Loading

0 comments on commit 3e87e12

Please sign in to comment.