From dcaeec23c9a5be40ad20a6b2ab635e3788dda472 Mon Sep 17 00:00:00 2001 From: Uros Marolt Date: Mon, 18 Sep 2023 11:01:31 +0200 Subject: [PATCH] removed processing state for integration.streams (#1488) --- .../src/repo/integrationStream.repo.ts | 73 ++++++++++--------- .../src/service/integrationStreamService.ts | 5 +- services/libs/types/src/enums/integrations.ts | 1 - 3 files changed, 42 insertions(+), 37 deletions(-) diff --git a/services/apps/integration_stream_worker/src/repo/integrationStream.repo.ts b/services/apps/integration_stream_worker/src/repo/integrationStream.repo.ts index ed26cf6b5d..d6c4bc67b2 100644 --- a/services/apps/integration_stream_worker/src/repo/integrationStream.repo.ts +++ b/services/apps/integration_stream_worker/src/repo/integrationStream.repo.ts @@ -40,25 +40,47 @@ export default class IntegrationStreamRepository extends RepositoryBase { - const results = await this.db().any( - ` - select s.id, - s."tenantId", - i.platform as "integrationType" - from integration.streams s - inner join integrations i on i.id = s."integrationId" - where s."runId" = $(runId) and s.state = $(state) - order by s."createdAt" asc - limit ${perPage} offset ${(page - 1) * perPage} - `, - { - runId, - state: IntegrationStreamState.PENDING, - }, - ) + let results: IProcessableStream[] + + if (lastId) { + results = await this.db().any( + ` + select s.id, + s."tenantId", + i.platform as "integrationType" + from integration.streams s + inner join integrations i on i.id = s."integrationId" + where s."runId" = $(runId) and s.state = $(state) and s.id > $(lastId) + order by s.id + limit ${limit} + `, + { + runId, + lastId, + state: IntegrationStreamState.PENDING, + }, + ) + } else { + results = await this.db().any( + ` + select s.id, + s."tenantId", + i.platform as "integrationType" + from integration.streams s + inner join integrations i on i.id = s."integrationId" + where s."runId" = $(runId) and s.state = $(state) + order by s.id + limit ${limit} + `, + { + runId, + state: IntegrationStreamState.PENDING, + }, + ) + } return results } @@ -132,21 +154,6 @@ export default class IntegrationStreamRepository extends RepositoryBase { - const result = await this.db().result( - `update integration.streams - set state = $(state), - "updatedAt" = now() - where id = $(streamId)`, - { - streamId, - state: IntegrationStreamState.PROCESSING, - }, - ) - - this.checkUpdateRowCount(result.rowCount, 1) - } - public async deleteStream(streamId: string): Promise { const result = await this.db().result( `delete from integration.streams where id = $(streamId)`, diff --git a/services/apps/integration_stream_worker/src/service/integrationStreamService.ts b/services/apps/integration_stream_worker/src/service/integrationStreamService.ts index 7c07db69a6..890f91d344 100644 --- a/services/apps/integration_stream_worker/src/service/integrationStreamService.ts +++ b/services/apps/integration_stream_worker/src/service/integrationStreamService.ts @@ -74,11 +74,10 @@ export default class IntegrationStreamService extends LoggerBase { public async continueProcessingRunStreams(runId: string): Promise { this.log.info('Continuing processing run streams!') - let streams = await this.repo.getPendingStreams(runId, 1, 20) + let streams = await this.repo.getPendingStreams(runId, 20) while (streams.length > 0) { for (const stream of streams) { this.log.info({ streamId: stream.id }, 'Triggering stream processing!') - await this.repo.markStreamInProgress(stream.id) this.streamWorkerEmitter.triggerStreamProcessing( stream.tenantId, stream.integrationType, @@ -86,7 +85,7 @@ export default class IntegrationStreamService extends LoggerBase { ) } - streams = await this.repo.getPendingStreams(runId, 1, 20) + streams = await this.repo.getPendingStreams(runId, 20, streams[streams.length - 1].id) } } diff --git a/services/libs/types/src/enums/integrations.ts b/services/libs/types/src/enums/integrations.ts index 7d7a85841a..58a30644ea 100644 --- a/services/libs/types/src/enums/integrations.ts +++ b/services/libs/types/src/enums/integrations.ts @@ -19,7 +19,6 @@ export enum IntegrationRunState { export enum IntegrationStreamState { DELAYED = 'delayed', PENDING = 'pending', - PROCESSING = 'processing', PROCESSED = 'processed', ERROR = 'error', }