Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exporter improvements #2617

Merged
merged 3 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions packages/node-core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Fixed the issue where flags that require special handling were being overwritten (#2612)

### Changed
- Various internal improvements to exporters (CSV exporter) (#2617)

## [15.0.3] - 2024-11-26
### Fixed
- Workers crashing because of lazy monitor write (#2607)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ export abstract class BaseCacheService
if ((this.isFlushable() || forceFlush) && this.flushableRecords > 0) {
this.pendingFlush = this._flushCache();
// Remove reference to pending flush once it completes
this.pendingFlush.finally(() => (this.pendingFlush = undefined));
this.pendingFlush
.catch((e) => {
/* Do nothing, avoids uncaught exception */
})
.finally(() => (this.pendingFlush = undefined));
await this.pendingFlush;
}
};
Expand All @@ -48,7 +52,11 @@ export abstract class BaseCacheService
if (this.queuedFlush === undefined) {
this.queuedFlush = flushCacheGuarded(forceFlush);

this.queuedFlush.finally(() => (this.queuedFlush = undefined));
this.queuedFlush
.catch((e) => {
/* Do nothing, avoids uncaught exception */
})
.finally(() => (this.queuedFlush = undefined));
}

return this.queuedFlush;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,17 @@ import {ModelStatic} from '@subql/x-sequelize';
import {getLogger} from '../../logger';
import {MetadataRepo, PoiRepo} from '../entities';
import {HistoricalMode} from '../types';
import {Exporter} from './exporters';
import {METADATA_ENTITY_NAME} from './metadata/utils';
import {BaseEntity, IModel} from './model';
import {POI_ENTITY_NAME} from './poi';
import {Exporter} from './types';

const logger = getLogger('BaseStoreModelService');
export abstract class BaseStoreModelService<M = IModel<any>> implements BeforeApplicationShutdown {
protected historical: HistoricalMode = 'height';
protected poiRepo?: PoiRepo;
protected metadataRepo?: MetadataRepo;
protected cachedModels: Record<string, M> = {};
protected exports: Exporter[] = [];

protected abstract createModel(entity: string): M;

Expand Down Expand Up @@ -48,7 +47,13 @@ export abstract class BaseStoreModelService<M = IModel<any>> implements BeforeAp
}

async beforeApplicationShutdown(): Promise<void> {
await Promise.all(this.exports.map((f) => f.shutdown()));
await Promise.all(this.getAllExporters().map((e) => e.shutdown()));
logger.info(`Force flush exports successful!`);
}

/**
 * Collects the exporters attached to every cached model into one flat list.
 * Models that expose no `exporters` property contribute nothing.
 */
getAllExporters(): Exporter[] {
  const models = Object.values(this.cachedModels);
  // `exporters` is not part of the generic model type M, hence the loose cast.
  return models.flatMap((model) => (model as any).exporters ?? []);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ export abstract class Cacheable {
release();
});

const pendingFlush = this.runFlush(tx, historicalUnit);
await pendingFlush;
await this.runFlush(tx, historicalUnit);
} catch (e) {
release();
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@

import fs from 'fs';
import path from 'path';
import type {Fn} from '@subql/x-sequelize/types/utils';
import {rimraf} from 'rimraf';
import {CsvStoreService} from './csvStore.service';
import {CsvExporter} from './csvStore';

describe('csv Store Service', () => {
const csvDirPath = path.join(__dirname, '../../../test/csv-test');
describe('CSV Exporter', () => {
const csvDirPath = path.join(__dirname, '../../../../test/csv-test');

beforeAll(async () => {
await fs.promises.mkdir(csvDirPath);
Expand All @@ -16,34 +17,24 @@ describe('csv Store Service', () => {
afterAll(async () => {
await rimraf(csvDirPath);
});

it('Able to export to csv with correct output, No duplicated headers', async () => {
const csvFilePath1 = path.join(csvDirPath, 'Test.csv');

const csvStore = new CsvStoreService('Test', csvDirPath);

await csvStore.export([
{
id: '1463-6',
amount: 98746560n,
blockNumber: 1463,
date: new Date('2020-05-26T18:03:24.000Z'),
fromId: '13gkdcmf2pxlw1mdctksezqf541ksy6mszfaehw5vftdpsxe',
toId: '15zf7zvduiy2eycgn6kwbv2sjpdbsp6vdhs1ytzdgjrcsmhn',
__block_range: {fn: 'int8range', args: [1463, null]},
},
]);

await csvStore.export([
{
id: '1463-6',
amount: 98746560n,
blockNumber: 1463,
date: new Date('2020-05-26T18:03:24.000Z'),
fromId: '13gkdcmf2pxlw1mdctksezqf541ksy6mszfaehw5vftdpsxe',
toId: '15zf7zvduiy2eycgn6kwbv2sjpdbsp6vdhs1ytzdgjrcsmhn',
__block_range: {fn: 'int8range', args: [1463, null]},
},
]);
const data = {
id: '1463-6',
amount: 98746560n,
blockNumber: 1463,
date: new Date('2020-05-26T18:03:24.000Z'),
fromId: '13gkdcmf2pxlw1mdctksezqf541ksy6mszfaehw5vftdpsxe',
toId: '15zf7zvduiy2eycgn6kwbv2sjpdbsp6vdhs1ytzdgjrcsmhn',
__block_range: {fn: 'int8range', args: [1463, null]} as unknown as Fn,
};

const csvStore = new CsvExporter<typeof data>('Test', csvDirPath);

await csvStore.export([data]);
await csvStore.export([data]);

await csvStore.shutdown();

Expand All @@ -57,19 +48,20 @@ describe('csv Store Service', () => {
`
);
});

it('JSON serialisation', async () => {
const csvFilePath2 = path.join(csvDirPath, 'JsonTest.csv');

const csvStore = new CsvStoreService('JsonTest', csvDirPath);
const data = {
id: '1463-6',
amount: 98746560n,
blockNumber: 1463,
jsonField: {field1: 'string', field2: 2, nestedJson: {foo: 'bar'}},
};

const csvStore = new CsvExporter<typeof data>('JsonTest', csvDirPath);

await csvStore.export([
{
id: '1463-6',
amount: 98746560n,
blockNumber: 1463,
jsonField: {field1: 'string', field2: 2, nestedJson: {foo: 'bar'}},
},
]);
await csvStore.export([data]);

await csvStore.shutdown();
const csv = await fs.promises.readFile(csvFilePath2, 'utf-8');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@
import fs from 'fs';
import path from 'path';
import {Stringifier, stringify} from 'csv-stringify';
import {getLogger} from '../../logger';
import {exitWithError} from '../../process';
import {Exporter} from './types';
import {getLogger} from '../../../logger';
import {exitWithError} from '../../../process';
import {BaseEntity} from '../model';
import {Exporter} from './exporter';

const logger = getLogger('CsvStore');
export class CsvStoreService implements Exporter {
export class CsvExporter<T extends BaseEntity = BaseEntity> implements Exporter<T> {
private fileExist?: boolean;
private stringifyStream: Stringifier;
private readonly writeStream: fs.WriteStream;
Expand Down Expand Up @@ -39,31 +40,45 @@ export class CsvStoreService implements Exporter {
this.fileExist = fs.existsSync(filePath);
return filePath;
}
async export(records: any[]): Promise<void> {

/**
 * Writes a single record to the stringify stream.
 *
 * @param r The record to write, without its `__block_range` field.
 * @returns A promise that resolves once the stream's write callback reports
 * success and rejects with the error the callback reports otherwise.
 */
private async write(r: Omit<T, '__block_range'> & {blockNumber?: number}): Promise<void> {
  const settled = new Promise<void>((done, fail) => {
    // Adapt the callback-style stream API to a promise.
    this.stringifyStream.write(r, (error) => (error ? fail(error) : done()));
  });
  return settled;
}

/**
 * Extracts the starting block number from a `__block_range` value.
 *
 * Two shapes are recognised:
 *  - an array, whose first element is taken as the start;
 *  - an object with `fn === 'int8range'`, whose `args[0]` is taken as the start.
 *
 * @param input The `__block_range` value of a record; may be absent.
 * @returns The start of the range, or `undefined` when the input is absent,
 * has an unrecognised shape, or its start is null/undefined.
 */
private blockRangeToBlockNumber(input: T['__block_range']): number | undefined {
  if (!input) return undefined;

  if (Array.isArray(input)) {
    return input[0] ?? undefined;
  }

  // NOTE(review): the cast is kept from the original; the object form's fields
  // are presumably not publicly typed — confirm against the Fn type.
  const {fn, args} = input as any;
  if (fn === 'int8range' && Array.isArray(args)) {
    // Normalise a null bound to undefined, the same way the array branch does.
    return args[0] ?? undefined;
  }

  // Unknown shape: make the "no block number" outcome explicit on every path.
  return undefined;
}

async export(records: T[]): Promise<void> {
await Promise.all(
records
.map((r: any) => {
// remove store
const {__block_range, store, ...orgRecord} = r;
.map((r) => {
const {__block_range, ...orgRecord} = r;
if (__block_range !== undefined) {
return {
...orgRecord,
__block_number: r.blockNumber,
__block_number: this.blockRangeToBlockNumber(__block_range),
};
}
return orgRecord;
})
.map((r) => {
return new Promise((resolve, reject) => {
this.stringifyStream.write(r, (error) => {
if (error) {
reject(error);
} else {
resolve(undefined);
}
});
});
})
.map((r) => this.write(r))
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
// SPDX-License-Identifier: GPL-3.0

import {BaseEntity} from '../model';

/**
* Contract for a destination that receives batches of entity records.
*
* @typeParam T The record type accepted by this exporter.
*/
export interface Exporter<T extends BaseEntity = BaseEntity> {
/**
* Exports an array of records.
* This method should handle the processing of the provided records.
*
* @param record An array of records to be exported.
* These records are of the same type as the database entries
* @returns A promise that resolves when the records have been handled.
*/
export: (record: T[]) => Promise<void>;
/**
* Shuts down the export operation.
* This method should ensure that all ongoing export operations are
* completed and any resources used are properly released or closed.
*
* @returns A promise that resolves when the shutdown process is complete.
*/
shutdown: () => Promise<void>;
}

/**
* An exporter that, in addition to `export` and `shutdown`, provides an
* asynchronous `commit` method.
*/
export type TransactionedExporter<T extends BaseEntity = BaseEntity> = Exporter<T> & {commit: () => Promise<void>};

/**
 * Type guard telling whether an exporter also offers a `commit` function.
 *
 * @param exporter The exporter to inspect.
 * @returns `true` when `exporter.commit` is a function, narrowing the exporter
 * to the transactioned variant; `false` otherwise.
 */
export function isTxExporter<T extends BaseEntity = BaseEntity>(
  exporter: Exporter<T>
): exporter is TransactionedExporter<T> {
  const maybeCommit: unknown = (exporter as Partial<TransactionedExporter<T>>).commit;
  return typeof maybeCommit === 'function';
}

/**
 * Returns a transactioned view of the given exporter.
 *
 * @param exporter Any exporter.
 * @returns The same instance when it already has a `commit` function,
 * otherwise a new `TxExporter` wrapping it.
 */
export function asTxExporter<T extends BaseEntity = BaseEntity>(exporter: Exporter<T>): TransactionedExporter<T> {
  return isTxExporter(exporter) ? exporter : new TxExporter(exporter);
}

/**
 * Adds commit semantics to a plain exporter: records passed to `export` are
 * only buffered in memory, and are handed to the wrapped exporter when
 * `commit` (or `shutdown`) is called.
 */
export class TxExporter<T extends BaseEntity = BaseEntity> implements TransactionedExporter<T> {
  /** Records received since the last successful commit. */
  #pendingData: T[] = [];
  /** The wrapped exporter that actually receives committed records. */
  #exporter: Exporter<T>;

  /**
   * @param exporter The exporter that committed records are forwarded to.
   */
  constructor(exporter: Exporter<T>) {
    this.#exporter = exporter;
  }

  /**
   * Buffers records until the next commit; nothing is forwarded yet.
   *
   * @param data Records to add to the pending batch.
   */
  // eslint-disable-next-line @typescript-eslint/require-await
  async export(data: T[]): Promise<void> {
    // Push one by one rather than `push(...data)`: spreading a very large
    // array into call arguments can exceed the engine's argument limit.
    for (const record of data) {
      this.#pendingData.push(record);
    }
  }

  /**
   * Commits anything still pending, then shuts down the wrapped exporter.
   */
  async shutdown(): Promise<void> {
    await this.commit();
    await this.#exporter.shutdown();
  }

  /**
   * Forwards all pending records to the wrapped exporter.
   *
   * The pending buffer is detached before awaiting, so records that arrive
   * via `export` while the commit is in flight are kept for the next commit
   * instead of being discarded, and overlapping commits never forward the
   * same batch twice. If the wrapped exporter rejects, the batch is put back
   * in front of any newer records and the error is rethrown.
   */
  async commit(): Promise<void> {
    const batch = this.#pendingData;
    this.#pendingData = [];
    try {
      await this.#exporter.export(batch);
    } catch (e) {
      this.#pendingData = batch.concat(this.#pendingData);
      throw e;
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
// SPDX-License-Identifier: GPL-3.0

export * from './exporter';
export * from './csvStore';
Loading
Loading