Skip to content

Commit

Permalink
removed processing state for integration.streams (#1488)
Browse files Browse the repository at this point in the history
  • Loading branch information
Uros Marolt authored Sep 18, 2023
1 parent 2d68ecf commit dcaeec2
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,25 +40,47 @@ export default class IntegrationStreamRepository extends RepositoryBase<Integrat

public async getPendingStreams(
runId: string,
page: number,
perPage: number,
limit: number,
lastId?: string,
): Promise<IProcessableStream[]> {
const results = await this.db().any(
`
select s.id,
s."tenantId",
i.platform as "integrationType"
from integration.streams s
inner join integrations i on i.id = s."integrationId"
where s."runId" = $(runId) and s.state = $(state)
order by s."createdAt" asc
limit ${perPage} offset ${(page - 1) * perPage}
`,
{
runId,
state: IntegrationStreamState.PENDING,
},
)
let results: IProcessableStream[]

if (lastId) {
results = await this.db().any(
`
select s.id,
s."tenantId",
i.platform as "integrationType"
from integration.streams s
inner join integrations i on i.id = s."integrationId"
where s."runId" = $(runId) and s.state = $(state) and s.id > $(lastId)
order by s.id
limit ${limit}
`,
{
runId,
lastId,
state: IntegrationStreamState.PENDING,
},
)
} else {
results = await this.db().any(
`
select s.id,
s."tenantId",
i.platform as "integrationType"
from integration.streams s
inner join integrations i on i.id = s."integrationId"
where s."runId" = $(runId) and s.state = $(state)
order by s.id
limit ${limit}
`,
{
runId,
state: IntegrationStreamState.PENDING,
},
)
}

return results
}
Expand Down Expand Up @@ -132,21 +154,6 @@ export default class IntegrationStreamRepository extends RepositoryBase<Integrat
this.checkUpdateRowCount(result.rowCount, 1)
}

public async markStreamInProgress(streamId: string): Promise<void> {
const result = await this.db().result(
`update integration.streams
set state = $(state),
"updatedAt" = now()
where id = $(streamId)`,
{
streamId,
state: IntegrationStreamState.PROCESSING,
},
)

this.checkUpdateRowCount(result.rowCount, 1)
}

public async deleteStream(streamId: string): Promise<void> {
const result = await this.db().result(
`delete from integration.streams where id = $(streamId)`,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,19 +74,18 @@ export default class IntegrationStreamService extends LoggerBase {
public async continueProcessingRunStreams(runId: string): Promise<void> {
this.log.info('Continuing processing run streams!')

let streams = await this.repo.getPendingStreams(runId, 1, 20)
let streams = await this.repo.getPendingStreams(runId, 20)
while (streams.length > 0) {
for (const stream of streams) {
this.log.info({ streamId: stream.id }, 'Triggering stream processing!')
await this.repo.markStreamInProgress(stream.id)
this.streamWorkerEmitter.triggerStreamProcessing(
stream.tenantId,
stream.integrationType,
stream.id,
)
}

streams = await this.repo.getPendingStreams(runId, 1, 20)
streams = await this.repo.getPendingStreams(runId, 20, streams[streams.length - 1].id)
}
}

Expand Down
1 change: 0 additions & 1 deletion services/libs/types/src/enums/integrations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ export enum IntegrationRunState {
export enum IntegrationStreamState {
DELAYED = 'delayed',
PENDING = 'pending',
PROCESSING = 'processing',
PROCESSED = 'processed',
ERROR = 'error',
}
Expand Down

0 comments on commit dcaeec2

Please sign in to comment.