diff --git a/apps/api/src/controllers/v1/extract.ts b/apps/api/src/controllers/v1/extract.ts
index ab69ca935..4d1e51b5f 100644
--- a/apps/api/src/controllers/v1/extract.ts
+++ b/apps/api/src/controllers/v1/extract.ts
@@ -5,7 +5,7 @@ import {
   extractRequestSchema,
   ExtractResponse,
 } from "./types";
-import { getExtractQueue } from "../../services/queue-service";
+import { getScrapeQueue } from "../../services/queue-service";
 import * as Sentry from "@sentry/node";
 import { saveExtract } from "../../lib/extract/extract-redis";
 import { getTeamIdSyncB } from "../../lib/extract/team-id-sync";
@@ -73,12 +73,12 @@ export async function extractController(
         op: "queue.publish",
         attributes: {
           "messaging.message.id": extractId,
-          "messaging.destination.name": getExtractQueue().name,
+          "messaging.destination.name": getScrapeQueue().name,
           "messaging.message.body.size": size,
         },
       },
       async (span) => {
-        await getExtractQueue().add(extractId, {
+        await getScrapeQueue().add(extractId, {
           ...jobData,
           sentry: {
             trace: Sentry.spanToTraceHeader(span),
@@ -89,7 +89,7 @@ export async function extractController(
       },
     );
   } else {
-    await getExtractQueue().add(extractId, jobData, {
+    await getScrapeQueue().add(extractId, jobData, {
       jobId: extractId,
     });
   }
diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts
index cc2d41b8c..43805d2c2 100644
--- a/apps/api/src/index.ts
+++ b/apps/api/src/index.ts
@@ -4,7 +4,7 @@ import * as Sentry from "@sentry/node";
 import express, { NextFunction, Request, Response } from "express";
 import bodyParser from "body-parser";
 import cors from "cors";
-import { getExtractQueue, getScrapeQueue } from "./services/queue-service";
+import { getScrapeQueue } from "./services/queue-service";
 import { v0Router } from "./routes/v0";
 import os from "os";
 import { logger } from "./lib/logger";
@@ -45,8 +45,8 @@ const serverAdapter = new ExpressAdapter();
 serverAdapter.setBasePath(`/admin/${process.env.BULL_AUTH_KEY}/queues`);
 const { addQueue, removeQueue, setQueues,
   replaceQueues } = createBullBoard({
-  queues: [new BullAdapter(getScrapeQueue()), new BullAdapter(getExtractQueue())],
-  serverAdapter: serverAdapter,
+  queues: [new BullAdapter(getScrapeQueue())],
+  serverAdapter,
 });
 
 app.use(
@@ -245,13 +245,3 @@ app.use(
 );
 
 logger.info(`Worker ${process.pid} started`);
-
-// const sq = getScrapeQueue();
-
-// sq.on("waiting", j => ScrapeEvents.logJobEvent(j, "waiting"));
-// sq.on("active", j => ScrapeEvents.logJobEvent(j, "active"));
-// sq.on("completed", j => ScrapeEvents.logJobEvent(j, "completed"));
-// sq.on("paused", j => ScrapeEvents.logJobEvent(j, "paused"));
-// sq.on("resumed", j => ScrapeEvents.logJobEvent(j, "resumed"));
-// sq.on("removed", j => ScrapeEvents.logJobEvent(j, "removed"));
-//
diff --git a/apps/api/src/services/queue-service.ts b/apps/api/src/services/queue-service.ts
index d3d8a4e53..edca81308 100644
--- a/apps/api/src/services/queue-service.ts
+++ b/apps/api/src/services/queue-service.ts
@@ -2,22 +2,22 @@ import { Queue } from "bullmq";
 import { logger } from "../lib/logger";
 import IORedis from "ioredis";
 
-let scrapeQueue: Queue;
-let extractQueue: Queue;
+let mainQueue: Queue;
 let loggingQueue: Queue;
 
 export const redisConnection = new IORedis(process.env.REDIS_URL!, {
   maxRetriesPerRequest: null,
 });
 
-export const scrapeQueueName = "{scrapeQueue}";
-export const extractQueueName = "{extractQueue}";
+// NOTE(review): new Redis key — jobs still enqueued under "{scrapeQueue}" or
+// "{extractQueue}" at deploy time are orphaned; drain/migrate before rollout.
+export const mainQueueName = "{mainQueue}";
 export const loggingQueueName = "{loggingQueue}";
 
-export function getScrapeQueue() {
-  if (!scrapeQueue) {
-    scrapeQueue = new Queue(
-      scrapeQueueName,
+export function getMainQueue() {
+  if (!mainQueue) {
+    mainQueue = new Queue(
+      mainQueueName,
       {
         connection: redisConnection,
         defaultJobOptions: {
@@ -30,33 +30,21 @@ export function getMainQueue() {
         },
       }
     );
-    logger.info("Web scraper queue created");
+    logger.info("Main queue created");
   }
-  return scrapeQueue;
+  return mainQueue;
 }
 
-export function getExtractQueue() {
-  if (!extractQueue) {
-    extractQueue = new Queue(
-      extractQueueName,
-      {
-        connection: redisConnection,
-        defaultJobOptions: {
-          removeOnComplete: {
-            age: 90000, // 25 hours
-          },
-          removeOnFail: {
-            age: 90000, // 25 hours
-          },
-        },
-      }
-    );
-    logger.info("Extraction queue created");
+export function getLoggingQueue() {
+  if (!loggingQueue) {
+    loggingQueue = new Queue(loggingQueueName, {
+      connection: redisConnection,
+    });
+    logger.info("Logging queue created");
   }
-  return extractQueue;
+  return loggingQueue;
 }
 
-
-// === REMOVED IN FAVOR OF POLLING -- NOT RELIABLE
-// import { QueueEvents } from 'bullmq';
-// export const scrapeQueueEvents = new QueueEvents(scrapeQueueName, { connection: redisConnection.duplicate() });
+// Backwards compatibility exports
+export const getScrapeQueue = getMainQueue;
+export const getExtractQueue = getMainQueue;
diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts
index f28f3f35e..132193876 100644
--- a/apps/api/src/services/queue-worker.ts
+++ b/apps/api/src/services/queue-worker.ts
@@ -6,8 +6,7 @@ import {
   getScrapeQueue,
   getExtractQueue,
   redisConnection,
-  scrapeQueueName,
-  extractQueueName,
+  mainQueueName,
 } from "./queue-service";
 import { startWebScraperPipeline } from "../main/runWebScraper";
 import { callWebhook } from "./webhook";
@@ -269,7 +268,24 @@ const processJobInternal = async (token: string, job: Job & { id: string }) => {
-  await addJobPriority(job.data.team_id, job.id);
+  // Extract jobs store `teamId` (camelCase); fall back so priority bookkeeping works for them.
+  await addJobPriority(job.data.team_id ?? job.data.teamId, job.id);
   let err = null;
   try {
-    if (job.data?.mode === "kickoff") {
+    // Check job type and process accordingly
+    if (job.data?.extractId) {
+      // This is an extract job
+      const result = await performExtraction(job.data.extractId, {
+        request: job.data.request,
+        teamId: job.data.teamId,
+        plan: job.data.plan,
+        subId: job.data.subId,
+      });
+
+      if (result.success) {
+        await job.moveToCompleted(result, token, false);
+      } else {
+        throw new Error(result.error || "Unknown error during extraction");
+      }
+    } else if (job.data?.mode === "kickoff") {
+      // This is a kickoff job
       const result = await processKickoffJob(job, token);
       if (result.success) {
         try {
@@ -280,6 +296,7 @@ const processJobInternal = async (token: string, job: Job & { id: string }) => {
         await job.moveToFailed((result as any).error, token, false);
       }
     } else {
+      // This is a scrape job
       const result = await processJob(job, token);
       if (result.success) {
         try {
@@ -306,6 +323,14 @@ const processJobInternal = async (token: string, job: Job & { id: string }) => {
     Sentry.captureException(error);
     err = error;
     await job.moveToFailed(error, token, false);
+
+    // Handle extract job specific error updates
+    if (job.data?.extractId) {
+      await updateExtract(job.data.extractId, {
+        status: "failed",
+        error: error.error ?? error ?? "Unknown error, please contact help@firecrawl.dev. Extract id: " + job.data.extractId,
+      });
+    }
   } finally {
-    await deleteJobPriority(job.data.team_id, job.id);
+    await deleteJobPriority(job.data.team_id ?? job.data.teamId, job.id);
     clearInterval(extendLockInterval);
@@ -314,58 +339,6 @@ const processJobInternal = async (token: string, job: Job & { id: string }) => {
   return err;
 };
 
-const processExtractJobInternal = async (token: string, job: Job & { id: string }) => {
-  const logger = _logger.child({
-    module: "extract-worker",
-    method: "processJobInternal",
-    jobId: job.id,
-    extractId: job.data.extractId,
-    teamId: job.data?.teamId ?? undefined,
-  });
-
-  const extendLockInterval = setInterval(async () => {
-    logger.info(`🔄 Worker extending lock on job ${job.id}`);
-    await job.extendLock(token, jobLockExtensionTime);
-  }, jobLockExtendInterval);
-
-  try {
-    const result = await performExtraction(job.data.extractId, {
-      request: job.data.request,
-      teamId: job.data.teamId,
-      plan: job.data.plan,
-      subId: job.data.subId,
-    });
-
-    if (result.success) {
-      // Move job to completed state in Redis
-      await job.moveToCompleted(result, token, false);
-      return result;
-    } else {
-      throw new Error(result.error || "Unknown error during extraction");
-    }
-  } catch (error) {
-    logger.error(`🚫 Job errored ${job.id} - ${error}`, { error });
-
-    Sentry.captureException(error, {
-      data: {
-        job: job.id,
-      },
-    });
-
-    // Move job to failed state in Redis
-    await job.moveToFailed(error, token, false);
-
-    await updateExtract(job.data.extractId, {
-      status: "failed",
-      error: error.error ?? error ?? "Unknown error, please contact help@firecrawl.dev. Extract id: " + job.data.extractId,
-    });
-    // throw error;
-  } finally {
-
-    clearInterval(extendLockInterval);
-  }
-};
-
 let isShuttingDown = false;
 
 process.on("SIGINT", () => {
@@ -522,9 +495,8 @@ const workerFun = async (
   }
 };
 
-// Start both workers
+// Start single worker for all jobs
 workerFun(getScrapeQueue(), processJobInternal);
-workerFun(getExtractQueue(), processExtractJobInternal);
 
 async function processKickoffJob(job: Job & { id: string }, token: string) {
   const logger = _logger.child({