Skip to content

Commit

Permalink
Record async migration execution time
Browse files Browse the repository at this point in the history
Record async migration execution time in migration status table.
Add migrationMetricsEmitter to emit metrics
Current metrics emitted:
- async_migration_status
- sync_migration_status

Change-type: minor
Signed-off-by: fisehara <[email protected]>
  • Loading branch information
fisehara committed Dec 20, 2022
1 parent 889c06f commit 04cded0
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 15 deletions.
18 changes: 15 additions & 3 deletions src/migrator/async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import {
filterAndSortPendingMigrations,
MigrationStatus,
BaseAsyncMigration,
migrationMetricsEmitter,
MigrationMetricsEventName,
} from './utils';

export const run = async (tx: Tx, model: ApiRootModel): Promise<void> => {
Expand Down Expand Up @@ -122,7 +124,7 @@ const $run = async (
let asyncRunnerMigratorFn: (tx: Tx) => Promise<number>;
let initMigrationState: InitialMigrationStatus = {
migration_key: key,
start_time: new Date(),
last_execution_time_ms: 0,
last_run_time: new Date(),
run_count: 0,
migrated_row_count: 0,
Expand Down Expand Up @@ -212,7 +214,7 @@ const $run = async (
);
return false;
}
// sync on the last execution time between instances
// sync on the execution time between instances
// precondition: All running instances are running on the same time/block
// skip execution
if (migrationState.last_run_time) {
Expand All @@ -229,11 +231,15 @@ const $run = async (
try {
// here a separate transaction is needed as this migration may fail
// when it fails it would break the transaction for managing the migration status
const executionStartTimeMS = Date.now();
const migratedRows = await sbvrUtils.db.transaction(
async (migrationTx) => {
return (await asyncRunnerMigratorFn?.(migrationTx)) ?? 0;
},
);
migrationState.last_execution_time_ms =
Date.now() - executionStartTimeMS;

migrationState.migrated_row_count += migratedRows;
if (migratedRows === 0) {
// when all rows have been catched up once we only catch up less frequently
Expand All @@ -250,7 +256,7 @@ const $run = async (
if (err instanceof Error) {
if (
migrationState.error_count %
initMigrationState.errorThreshold ===
initMigrationState.errorThreshold ===
0
) {
(sbvrUtils.api.migrations?.logger.error ?? console.error)(
Expand All @@ -268,6 +274,12 @@ const $run = async (
// either success or error release the lock
migrationState.last_run_time = new Date();
migrationState.run_count += 1;
// just emit
migrationMetricsEmitter.emit(
MigrationMetricsEventName.async_migration_status,
migrationState,
);

await updateMigrationStatus(tx, migrationState);
}
return migrationState;
Expand Down
8 changes: 4 additions & 4 deletions src/migrator/migrations.sbvr
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ Fact Type: migration lock has model name

Term: migration key
Concept Type: Short Text (Type)
Term: start time
Concept Type: Date Time (Type)
Term: last execution time ms
Concept Type: Integer (Type)
Term: last run time
Concept Type: Date Time (Type)
Term: run count
Expand All @@ -45,8 +45,8 @@ Term: migration status
Fact Type: migration status has migration key
Necessity: each migration status has exactly one migration key

Fact Type: migration status has start time
Necessity: each migration status has at most one start time
Fact Type: migration status has last execution time ms
Necessity: each migration status has at most one last execution time ms

Fact Type: migration status has last run time
Necessity: each migration status has at most one last run time
Expand Down
17 changes: 17 additions & 0 deletions src/migrator/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import {
RunnableMigrations,
filterAndSortPendingMigrations,
getRunnableSyncMigrations,
migrationMetricsEmitter,
MigrationMetricsEventName,
} from './utils';
import type { Tx } from '../database-layer/db';
import type { Config, Model } from '../config-loader/config-loader';
Expand Down Expand Up @@ -122,13 +124,24 @@ const executeMigration = async (
`Running migration ${JSON.stringify(key)}`,
);

const migrationStartTimeMs = Date.now();

if (typeof migration === 'function') {
await migration(tx, sbvrUtils);
} else if (typeof migration === 'string') {
await tx.executeSql(migration);
} else {
throw new MigrationError(`Invalid migration type: ${typeof migration}`);
}

// follow the same interface for migration_key and last_execution_time_ms as migration status
migrationMetricsEmitter.emit(
MigrationMetricsEventName.sync_migration_status,
{
migration_key: key,
last_execution_time_ms: Date.now() - migrationStartTimeMs,
},
);
};

export const config: Config = {
Expand All @@ -146,6 +159,10 @@ export const config: Config = {
ALTER TABLE "migration lock"
ADD COLUMN IF NOT EXISTS "modified at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL;
`,
'14.55.0-last-execution-time': `
ALTER TABLE "migration status"
ADD COLUMN IF NOT EXISTS "last execution time ms" INTEGER NULL;
`,
},
},
],
Expand Down
22 changes: 16 additions & 6 deletions src/migrator/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ import { TypedError } from 'typed-error';
import { migrator as migratorEnv } from '../config-loader/env';
export { migrator as migratorEnv } from '../config-loader/env';
import { delay } from '../sbvr-api/control-flow';
import { EventEmitter } from 'eventemitter3';

export const migrationMetricsEmitter = new EventEmitter();

export enum MigrationMetricsEventName {
'sync_migration_status' = 'sync_migration_status',
'async_migration_status' = 'async_migration_status',
}

// tslint:disable-next-line:no-var-requires
export const modelText = require('./migrations.sbvr');
Expand Down Expand Up @@ -87,7 +95,7 @@ export class MigrationError extends TypedError {}

export type MigrationStatus = {
migration_key: string;
start_time: Date;
last_execution_time_ms: number;
last_run_time: Date | null;
run_count: number;
migrated_row_count: number;
Expand Down Expand Up @@ -283,13 +291,13 @@ export const initMigrationStatus = async (
try {
return await tx.executeSql(
binds`
INSERT INTO "migration status" ("migration key", "start time", "is backing off", "run count")
INSERT INTO "migration status" ("migration key", "last execution time ms", "is backing off", "run count")
SELECT ${1}, ${2}, ${3}, ${4}
WHERE NOT EXISTS (SELECT 1 FROM "migration status" WHERE "migration key" = ${5})
`,
[
migrationStatus['migration_key'],
migrationStatus['start_time'],
migrationStatus['last_execution_time_ms'],
migrationStatus['is_backing_off'] ? 1 : 0,
migrationStatus['run_count'],
migrationStatus['migration_key'],
Expand All @@ -316,15 +324,17 @@ SET
"migrated row count" = ${3},
"error count" = ${4},
"converged time" = ${5},
"is backing off" = ${6}
WHERE "migration status"."migration key" = ${7};`,
"is backing off" = ${6},
"last execution time ms" = ${7}
WHERE "migration status"."migration key" = ${8};`,
[
migrationStatus['run_count'],
migrationStatus['last_run_time'],
migrationStatus['migrated_row_count'],
migrationStatus['error_count'],
migrationStatus['converged_time'],
migrationStatus['is_backing_off'] ? 1 : 0,
migrationStatus['last_execution_time_ms'],
migrationStatus['migration_key'],
],
);
Expand Down Expand Up @@ -355,7 +365,7 @@ LIMIT 1;`,

return {
migration_key: data['migration key'],
start_time: data['start time'],
last_execution_time_ms: data['last execution time ms'],
last_run_time: data['last run time'],
run_count: data['run count'],
migrated_row_count: data['migrated row count'],
Expand Down
2 changes: 1 addition & 1 deletion src/server-glue/module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ export * as env from '../config-loader/env';
export * as types from '../sbvr-api/common-types';
export * as hooks from '../sbvr-api/hooks';
export type { configLoader as ConfigLoader };
export type { migratorUtils as Migrator };
export { migratorUtils as Migrator };

let envDatabaseOptions: dbModule.DatabaseOptions<string>;
if (dbModule.engines.websql != null) {
Expand Down
25 changes: 24 additions & 1 deletion test/03-async-migrator.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,29 @@ describe('03 Async Migrations', async function () {
expect(result[0]?.migrated_row_count - firstRowsMigrated).to.equal(1);
expect(Date.now().valueOf() - startTime).to.be.greaterThan(4000); // backOff time from migrator
});

it('should record last execution time ms of the last migration run', async function () {
let result: MigrationStatus[] = [];

const startTime = Date.now().valueOf();
// Wait until all migrations have run at least once
while (result.length === 0 || !result.every((row) => row.run_count > 0)) {
result = await getMigrationStatus();
}

const checkMigration0003 = result.find(
(row) => row.migration_key === '0003',
);
expect(checkMigration0003?.last_execution_time_ms).to.be.greaterThan(500); // should be reported greater than 500 ms (from the async migration function)
expect(checkMigration0003?.last_execution_time_ms).to.be.lessThan(2 * 500); // should not exceed twice the last execution time ms defined in the async migration function

/**
* Here it may need to be checked if the event for the async_migration_status metric was emitted.
* As the pine instance is running in a childprocess the EventEmitter is decoupled from the test instance.
* The EventEmitter exported from the migration utilities cannot be used for registering a listener, as it would
* instantiate a separate EventEmitter in this process, which is not linked to the child-process in which pine actually runs.
*/
});
});

describe('Init sync and async migrations for new model', function () {
Expand All @@ -226,7 +249,7 @@ describe('03 Async Migrations', async function () {
.to.be.an('array');
expect(res.body.d[0]?.executed_migrations)
.to.be.an('array')
.to.have.all.members(['0001', '0002']);
.to.have.all.members(['0001', '0002', '0003']);
});
});

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { AsyncMigration } from '../../../../../src/migrator/utils';

const migration: AsyncMigration = {
asyncFn: async (tx, options) => {
options;
return await tx.executeSql(`SELECT pg_sleep(0.5);`);
},
asyncBatchSize: 1,
syncFn: async (tx) => {
await tx.executeSql(`SELECT pg_sleep(0.5);`);
},
delayMS: 1000,
backoffDelayMS: 4000,
errorThreshold: 15,
finalize: false,
};

export default migration;

0 comments on commit 04cded0

Please sign in to comment.