diff --git a/docker-compose.yml b/docker-compose.yml
index b571cc6f2..0551d1d27 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -39,6 +39,60 @@ services:
     volumes:
       - minio_data:/data
 
+  zookeeper:
+    image: confluentinc/cp-zookeeper:7.4.0
+    environment:
+      ZOOKEEPER_CLIENT_PORT: ${ZOOKEEPER_PORT:-2181}
+      ZOOKEEPER_TICK_TIME: 2000
+    ports:
+      - "${ZOOKEEPER_PORT:-2181}:2181"
+    volumes:
+      - zookeeper_data:/var/lib/zookeeper/data
+      - zookeeper_log:/var/lib/zookeeper/log
+    healthcheck:
+      test: ["CMD-SHELL", "zookeeper-shell localhost:${ZOOKEEPER_PORT:-2181} ls /"]
+      interval: 10s
+      timeout: 5s
+      retries: 5
+
+  # Add Kafka
+  kafka:
+    image: confluentinc/cp-kafka:7.4.0
+    depends_on:
+      - zookeeper
+    environment:
+      KAFKA_BROKER_ID: 1
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:${ZOOKEEPER_PORT:-2181}
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
+      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
+      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
+    ports:
+      - "${KAFKA_PORT:-9092}:9092"
+      - "${KAFKA_EXTERNAL_PORT:-29092}:29092"
+    volumes:
+      - kafka_data:/var/lib/kafka/data
+    healthcheck:
+      test: ["CMD-SHELL", "kafka-topics --bootstrap-server localhost:9092 --list"]
+      interval: 10s
+      timeout: 5s
+      retries: 5
+
+  kafka-ui:
+    image: provectuslabs/kafka-ui:latest
+    container_name: kafka-ui
+    ports:
+      - "${KAFKA_UI_PORT:-9080}:8080"
+    environment:
+      KAFKA_CLUSTERS_0_NAME: maxun-cluster
+      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
+      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
+    depends_on:
+      - kafka
+
   backend:
     #build:
       #context: .
@@ -63,6 +117,7 @@ services:
       - postgres
       - redis
       - minio
+      - kafka
     volumes:
       - /var/run/dbus:/var/run/dbus
 
@@ -83,4 +138,7 @@ services:
 volumes:
   postgres_data:
   minio_data:
-  redis_data:
\ No newline at end of file
+  redis_data:
+  kafka_data:
+  zookeeper_data:
+  zookeeper_log:
\ No newline at end of file
diff --git a/maxun-core/src/config/kafka.ts b/maxun-core/src/config/kafka.ts
new file mode 100644
index 000000000..10f3cc96a
--- /dev/null
+++ b/maxun-core/src/config/kafka.ts
@@ -0,0 +1,10 @@
+export const kafkaConfig = {
+  clientId: 'maxun-scraper',
+  brokers: ['localhost:29092'],
+  topics: {
+    SCRAPING_TASKS: 'scraping-tasks',
+    SCRAPING_RESULTS: 'scraping-results',
+    SCRAPING_DLQ: 'scraping-dlq'
+  },
+  consumerGroup: 'scraping-group'
+};
\ No newline at end of file
diff --git a/maxun-core/src/interpret.ts b/maxun-core/src/interpret.ts
index 70d425a18..cfef300f3 100644
--- a/maxun-core/src/interpret.ts
+++ b/maxun-core/src/interpret.ts
@@ -16,6 +16,11 @@ import Concurrency from './utils/concurrency';
 import Preprocessor from './preprocessor';
 import log, { Level } from './utils/logger';
 
+import { Kafka } from 'kafkajs';
+import { kafkaConfig } from './config/kafka';
+
+import os from 'os';
+
 /**
  * Extending the Window interface for custom scraping functions.
*/ @@ -39,6 +44,7 @@ declare global { interface InterpreterOptions { maxRepeats: number; maxConcurrency: number; + maxWorkers: number; serializableCallback: (output: any) => (void | Promise); binaryCallback: (output: any, mimeType: string) => (void | Promise); debug: boolean; @@ -68,13 +74,31 @@ export default class Interpreter extends EventEmitter { private cumulativeResults: Record[] = []; + private kafka: Kafka; + + private producer: any; + + private async initializeKafka() { + this.producer = this.kafka.producer({ + allowAutoTopicCreation: true, + idempotent: true + }); + await this.producer.connect(); + } + constructor(workflow: WorkflowFile, options?: Partial) { super(); this.workflow = workflow.workflow; this.initializedWorkflow = null; + this.kafka = new Kafka({ + clientId: kafkaConfig.clientId, + brokers: kafkaConfig.brokers + }); + this.initializeKafka(); this.options = { maxRepeats: 5, maxConcurrency: 5, + maxWorkers: Math.max(1, Math.min(os.cpus().length - 1, 4)), serializableCallback: (data) => { log(JSON.stringify(data), Level.WARN); }, @@ -451,7 +475,7 @@ export default class Interpreter extends EventEmitter { const scrapeResults: Record[] = await page.evaluate((cfg) => window.scrapeList(cfg), config); await this.options.serializableCallback(scrapeResults); } else { - const scrapeResults: Record[] = await this.handlePagination(page, config); + const scrapeResults: Record[] = await this.handleParallelPagination(page, config); await this.options.serializableCallback(scrapeResults); } }, @@ -540,6 +564,276 @@ export default class Interpreter extends EventEmitter { } } + private async handleParallelPagination(page: Page, config: any): Promise { + if (config.limit > 10000 && config.pagination.type === 'clickNext') { + console.time('parallel-scraping'); + + const workflowId = `workflow-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`; + console.log(`Starting workflow with ID: ${workflowId}`); + + const numWorkers = Math.max(1, Math.min(os.cpus().length - 1, 4)); + const batchSize = Math.ceil(config.limit / numWorkers); + const tasks = []; + const pageUrls: string[] = []; + + let availableSelectors = config.pagination.selector.split(','); + let visitedUrls: string[] = []; + + const { itemsPerPage, estimatedPages } = await page.evaluate( + ({ listSelector, limit }) => { + const items = document.querySelectorAll(listSelector).length; + return { + itemsPerPage: items, + estimatedPages: Math.ceil(limit / items) + }; + }, + { listSelector: config.listSelector, limit: config.limit } + ); + + console.log(`Items per page: ${itemsPerPage}`); + console.log(`Estimated pages needed: ${estimatedPages}`); + + try { + while (true) { + pageUrls.push(page.url()) + + if (pageUrls.length >= estimatedPages) { + console.log('Reached estimated number of pages. 
Stopping pagination.'); + break; + } + + let checkButton = null; + let workingSelector = null; + + for (let i = 0; i < availableSelectors.length; i++) { + const selector = availableSelectors[i]; + try { + // Wait for selector with a short timeout + checkButton = await page.waitForSelector(selector, { state: 'attached' }); + if (checkButton) { + workingSelector = selector; + break; + } + } catch (error) { + console.log(`Selector failed: ${selector}`); + } + } + + if(!workingSelector) { + break; + } + + const nextButton = await page.$(workingSelector); + if (!nextButton) { + break; + } + + const selectorIndex = availableSelectors.indexOf(workingSelector!); + availableSelectors = availableSelectors.slice(selectorIndex); + + const previousUrl = page.url(); + visitedUrls.push(previousUrl); + + try { + // Try both click methods simultaneously + await Promise.race([ + Promise.all([ + page.waitForNavigation({ waitUntil: 'networkidle', timeout: 30000 }), + nextButton.click() + ]), + Promise.all([ + page.waitForNavigation({ waitUntil: 'networkidle', timeout: 30000 }), + nextButton.dispatchEvent('click') + ]) + ]); + } catch (error) { + // Verify if navigation actually succeeded + const currentUrl = page.url(); + if (currentUrl === previousUrl) { + console.log("Previous URL same as current URL. Navigation failed."); + } + } + + const currentUrl = page.url(); + if (visitedUrls.includes(currentUrl)) { + console.log(`Detected navigation to a previously visited URL: ${currentUrl}`); + + // Extract the current page number from the URL + const match = currentUrl.match(/\d+/); + if (match) { + const currentNumber = match[0]; + // Use visitedUrls.length + 1 as the next page number + const nextNumber = visitedUrls.length + 1; + + // Create new URL by replacing the current number with the next number + const nextUrl = currentUrl.replace(currentNumber, nextNumber.toString()); + + console.log(`Navigating to constructed URL: ${nextUrl}`); + + // Navigate to the next page + await Promise.all([ + page.waitForNavigation({ waitUntil: 'networkidle' }), + page.goto(nextUrl) + ]); + } + } + + await page.waitForTimeout(1000); + } + } catch (error) { + console.error('Error collecting page URLs:', error); + } + + console.log(`Collected ${pageUrls.length} unique page URLs`); + + for (let i = 0; i < numWorkers; i++) { + const startIndex = i * batchSize; + const endIndex = Math.min((i + 1) * batchSize, config.limit); + const workerUrls = pageUrls.slice( + i * Math.ceil(pageUrls.length / numWorkers), + (i + 1) * Math.ceil(pageUrls.length / numWorkers) + ); + + const task = { + taskId: `${workflowId}-task-${i}`, + workflowId, + urls: workerUrls, + config: { + listSelector: config.listSelector, + fields: config.fields, + pagination: config.pagination, + limit: endIndex - startIndex, + startIndex, + endIndex + } + }; + + await this.producer.send({ + topic: kafkaConfig.topics.SCRAPING_TASKS, + messages: [{ + key: task.taskId, + value: JSON.stringify(task), + headers: { + 'workflow-id': workflowId, + 'retry-count': '0', + 'total-tasks': numWorkers.toString() + } + }] + }); + + tasks.push(task); + } + + console.log("TASKS SENT TO KAFKA (Not stringified)", tasks); + + // Wait for results from Kafka + const results = await this.waitForScrapingResults(tasks); + console.timeEnd('parallel-scraping'); + return results; + } + + return this.handlePagination(page, config); + } + + private async waitForScrapingResults(tasks: any[]): Promise { + // Create a map to store our workflow's results + const resultsMap = new Map(); + + // Extract the 
workflow ID from the first task - all tasks in this batch will share the same workflow ID + const workflowId = tasks[0].workflowId; + console.log(`Waiting for results from workflow: ${workflowId}`); + + // Create a Set of task IDs for quick lookup - these are the only tasks we care about + const expectedTaskIds = new Set(tasks.map(task => task.taskId)); + + // Create a consumer specifically for this workflow + const resultConsumer = this.kafka.consumer({ + groupId: `scraping-group-results-${workflowId}`, + maxWaitTimeInMs: 1000, + maxBytesPerPartition: 2097152 // 2MB + }); + + try { + await resultConsumer.connect(); + console.log('Result consumer connected successfully'); + + await resultConsumer.subscribe({ + topic: kafkaConfig.topics.SCRAPING_RESULTS, + fromBeginning: true + }); + console.log('Result consumer subscribed to topic successfully'); + + return new Promise((resolve, reject) => { + let isRunning = true; + + resultConsumer.run({ + eachMessage: async ({ topic, partition, message }) => { + if (!isRunning) return; + + try { + const result = JSON.parse(message.value!.toString()); + + // Verify both task ID and workflow ID match + if (result.workflowId === workflowId && expectedTaskIds.has(result.taskId)) { + // Store this task's results + if (!resultsMap.has(result.taskId)) { + resultsMap.set(result.taskId, result.data); + console.log(`Received results for task ${result.taskId}. ` + + `Got ${resultsMap.size} of ${tasks.length} tasks from workflow ${workflowId}`); + } + + // Check if we have all our workflow's results + if (resultsMap.size === tasks.length) { + isRunning = false; + + // Sort tasks by their numeric index (extract number from task ID) + const sortedTasks = [...tasks].sort((a, b) => { + const aIndex = parseInt(a.taskId.split('-').pop() || '0'); + const bIndex = parseInt(b.taskId.split('-').pop() || '0'); + return aIndex - bIndex; + }); + + // Combine results in the sorted task order + const allResults = sortedTasks + .map(task => { + const taskResults = resultsMap.get(task.taskId); + if (!taskResults) { + console.warn(`Missing results for task ${task.taskId} in workflow ${workflowId}`); + return []; + } + return taskResults; + }) + .flat(); + + console.log(`Successfully collected all results from workflow ${workflowId}`); + + resolve(allResults); + } + } + } catch (error) { + console.error(`Error processing message in workflow ${workflowId}:`, error); + reject(error); + } + } + }); + + // // Add a timeout to prevent hanging + // const timeout = setTimeout(() => { + // if (isRunning) { + // isRunning = false; + // console.error(`Timeout waiting for results from workflow ${workflowId}. 
` + + // `Received ${resultsMap.size} of ${tasks.length} expected results.`); + // reject(new Error(`Timeout waiting for results from workflow ${workflowId}`)); + // } + // }, 30000); // 30 second timeout + }); + + } catch (error) { + console.error(`Fatal error in waitForScrapingResults for workflow ${workflowId}:`, error); + throw error; + } + } + private async handlePagination(page: Page, config: { listSelector: string, fields: any, limit?: number, pagination: any }) { let allResults: Record[] = []; let previousHeight = 0; @@ -556,6 +850,7 @@ export default class Interpreter extends EventEmitter { await page.waitForTimeout(2000); const currentHeight = await page.evaluate(() => document.body.scrollHeight); + console.log(`Current scroll height: ${currentHeight}`); if (currentHeight === previousHeight) { const finalResults = await page.evaluate((cfg) => window.scrapeList(cfg), config); allResults = allResults.concat(finalResults); diff --git a/maxun-core/src/scripts/setup-kafka.ts b/maxun-core/src/scripts/setup-kafka.ts new file mode 100644 index 000000000..99b4a2796 --- /dev/null +++ b/maxun-core/src/scripts/setup-kafka.ts @@ -0,0 +1,23 @@ +import { KafkaManager } from '../utils/kafka-manager'; + +async function setupKafka() { + const manager = new KafkaManager(); + + try { + console.log('Initializing Kafka manager...'); + await manager.initialize(); + console.log('Kafka setup completed successfully'); + + // Keep monitoring for a while to verify setup + setTimeout(async () => { + await manager.cleanup(); + process.exit(0); + }, 10000); + + } catch (error) { + console.error('Failed to setup Kafka:', error); + process.exit(1); + } +} + +setupKafka().catch(console.error); \ No newline at end of file diff --git a/maxun-core/src/scripts/start-consumer.ts b/maxun-core/src/scripts/start-consumer.ts new file mode 100644 index 000000000..5b96ee6e9 --- /dev/null +++ b/maxun-core/src/scripts/start-consumer.ts @@ -0,0 +1,22 @@ +import { ScrapingConsumer } from '../utils/scraping-consumer'; + +async function main() { + const consumer = new ScrapingConsumer(); + + // Handle graceful shutdown + process.on('SIGINT', async () => { + console.log('Shutting down consumer...'); + process.exit(0); + }); + + try { + console.log('Starting scraping consumer...'); + await consumer.start(); + console.log('Consumer is running and waiting for tasks...'); + } catch (error) { + console.error('Failed to start consumer:', error); + process.exit(1); + } +} + +main().catch(console.error); \ No newline at end of file diff --git a/maxun-core/src/types/worker.ts b/maxun-core/src/types/worker.ts new file mode 100644 index 000000000..3520a2b3c --- /dev/null +++ b/maxun-core/src/types/worker.ts @@ -0,0 +1,59 @@ +export interface WorkerConfig { + workerIndex: number; + startIndex: number; + endIndex: number; + batchSize: number; + pageUrls: string[]; + listSelector: string; + fields: any; + pagination: { + type: string; + selector: string; + }; +} + +export interface SharedState { + totalScraped: number; + results: any[]; +} + +export interface WorkerProgressData { + percentage: number; + currentUrl: string; + scrapedItems: number; + timeElapsed: number; + estimatedTimeRemaining: number; + failures: number; + performance: PerformanceMetrics; +} + +export interface PerformanceMetrics { + startTime: number; + endTime: number; + duration: number; + pagesProcessed: number; + itemsScraped: number; + failedPages: number; + averageTimePerPage: number; + memoryUsage: { + heapUsed: number; + heapTotal: number; + external: number; + rss: 
number; + }; + cpuUsage: { + user: number; + system: number; + }; +} + +export interface GlobalMetrics { + totalPagesProcessed: number; + totalItemsScraped: number; + totalFailures: number; + workersActive: number; + averageSpeed: number; + timeElapsed: number; + memoryUsage: NodeJS.MemoryUsage; + cpuUsage: NodeJS.CpuUsage; +} \ No newline at end of file diff --git a/maxun-core/src/utils/kafka-manager.ts b/maxun-core/src/utils/kafka-manager.ts new file mode 100644 index 000000000..7aa78f831 --- /dev/null +++ b/maxun-core/src/utils/kafka-manager.ts @@ -0,0 +1,66 @@ +import { Kafka, Consumer, Producer } from 'kafkajs'; +import { kafkaConfig } from '../config/kafka'; +import { EventEmitter } from 'events'; + +export class KafkaManager extends EventEmitter { + private kafka: Kafka; + private producer: Producer; + private consumer: Consumer; + private metricsInterval: NodeJS.Timeout | null = null; + + constructor() { + super(); + this.kafka = new Kafka({ + clientId: kafkaConfig.clientId, + brokers: kafkaConfig.brokers + }); + + this.producer = this.kafka.producer(); + this.consumer = this.kafka.consumer({ + groupId: kafkaConfig.consumerGroup, + sessionTimeout: 30000 + }); + } + + async initialize() { + await this.producer.connect(); + await this.consumer.connect(); + await this.createTopics(); + this.startMetricsReporting(); + } + + private async createTopics() { + const admin = this.kafka.admin(); + await admin.createTopics({ + topics: [ + { topic: kafkaConfig.topics.SCRAPING_TASKS, numPartitions: 10 }, + { topic: kafkaConfig.topics.SCRAPING_RESULTS, numPartitions: 10 }, + { topic: kafkaConfig.topics.SCRAPING_DLQ, numPartitions: 1 } + ] + }); + await admin.disconnect(); + } + + private startMetricsReporting() { + this.metricsInterval = setInterval(async () => { + const admin = this.kafka.admin(); + const metrics = await admin.fetchTopicMetadata({ + topics: [ + kafkaConfig.topics.SCRAPING_TASKS, + kafkaConfig.topics.SCRAPING_RESULTS + ] + }); + + this.emit('metrics', metrics); + await admin.disconnect(); + }, 5000); + } + + async cleanup() { + if (this.metricsInterval) { + clearInterval(this.metricsInterval); + } + await this.producer.disconnect(); + await this.consumer.disconnect(); + } +} \ No newline at end of file diff --git a/maxun-core/src/utils/scraping-consumer.ts b/maxun-core/src/utils/scraping-consumer.ts new file mode 100644 index 000000000..2e6e79edf --- /dev/null +++ b/maxun-core/src/utils/scraping-consumer.ts @@ -0,0 +1,245 @@ +import { Kafka, Consumer, Producer } from 'kafkajs'; +import { chromium, Browser, Page } from 'playwright'; +import { kafkaConfig } from '../config/kafka'; +import path from 'path'; + +declare global { + interface Window { + scrape: (selector: string | null) => Record[]; + scrapeSchema: ( + schema: Record + ) => Record; + scrapeList: (config: { listSelector: string; fields: any; limit?: number; pagination: any }) => Record[]; + scrapeListAuto: (listSelector: string) => { selector: string; innerText: string }[]; + scrollDown: (pages?: number) => void; + scrollUp: (pages?: number) => void; + } +} + +export class ScrapingConsumer { + private kafka: Kafka; + private consumer: Consumer; + private producer: Producer; + private processedWorkflows: Map>; + private workflowStats: Map; + + constructor() { + this.kafka = new Kafka({ + clientId: `${kafkaConfig.clientId}-consumer`, + brokers: kafkaConfig.brokers + }); + + this.consumer = this.kafka.consumer({ + groupId: kafkaConfig.consumerGroup, + sessionTimeout: 30000, + heartbeatInterval: 3000, + maxWaitTimeInMs: 
1000, + }); + this.producer = this.kafka.producer(); + this.processedWorkflows = new Map(); + this.workflowStats = new Map(); + } + + async start() { + await this.consumer.connect(); + await this.producer.connect(); + await this.consumer.subscribe({ + topic: kafkaConfig.topics.SCRAPING_TASKS, + fromBeginning: false + }); + + await this.consumer.run({ + partitionsConsumedConcurrently: 4, + autoCommit: false, + eachMessage: async ({ topic, partition, message }) => { + try { + const task = JSON.parse(message.value!.toString()); + const workflowId = task.workflowId; + + // Initialize workflow tracking if needed + if (!this.processedWorkflows.has(workflowId)) { + this.processedWorkflows.set(workflowId, new Set()); + this.workflowStats.set(workflowId, { + startTime: Date.now(), + totalTasks: parseInt(message.headers['total-tasks']?.toString() || '0'), + processedTasks: 0, + totalItems: 0 + }); + } + + // Check if this task was already processed within its workflow + if (this.processedWorkflows.get(workflowId)?.has(task.taskId)) { + console.log(`Task ${task.taskId} from workflow ${workflowId} already processed`); + await this.consumer.commitOffsets([{ + topic, + partition, + offset: (Number(message.offset) + 1).toString() + }]); + return; + } + + const results = await this.processTask(task); + + const stats = this.workflowStats.get(workflowId); + if (stats) { + stats.processedTasks += 1; + stats.totalItems += results.length; + + console.log( + `Workflow ${workflowId} progress: ` + + `${stats.processedTasks}/${stats.totalTasks} tasks, ` + + `${stats.totalItems} items collected` + ); + } + + // Send results with workflow context + await this.producer.send({ + topic: kafkaConfig.topics.SCRAPING_RESULTS, + messages: [{ + key: task.taskId, + value: JSON.stringify({ + taskId: task.taskId, + workflowId: task.workflowId, + data: results + }), + // Add workflow headers for better tracking + headers: { + 'workflow-id': task.workflowId, + 'items-count': results.length.toString() + } + }] + }); + + // Mark task as processed within its workflow + this.processedWorkflows.get(workflowId)?.add(task.taskId); + + // Clean up old workflows periodically + this.cleanupOldWorkflows(); + + await this.consumer.commitOffsets([{ + topic, + partition, + offset: (Number(message.offset) + 1).toString() + }]); + + } catch (error) { + await this.handleError(message, error); + } + } + }); + } + + private async ensureScriptsLoaded(page: Page) { + const isScriptLoaded = await page.evaluate(() => typeof window.scrape === 'function' && typeof window.scrapeSchema === 'function' && typeof window.scrapeList === 'function' && typeof window.scrapeListAuto === 'function' && typeof window.scrollDown === 'function' && typeof window.scrollUp === 'function'); + if (!isScriptLoaded) { + await page.addInitScript({ path: path.join(__dirname, '..', 'browserSide', 'scraper.js') }); + } +} + + private async processTask(task: any) { + let browser: Browser | null = null; + let scrapedItems: Set = new Set(); + let allResults: Record[] = []; + + try { + browser = await chromium.launch({ + headless: true, + args: [ + "--disable-blink-features=AutomationControlled", + "--disable-web-security", + "--disable-features=IsolateOrigins,site-per-process", + "--disable-site-isolation-trials", + "--disable-extensions", + "--no-sandbox", + "--disable-dev-shm-usage", + ] + }); + + const context = await browser.newContext(); + const page = await context.newPage(); + + await this.ensureScriptsLoaded(page); + + for (const url of task.urls) { + try { + await 
page.goto(url, { + waitUntil: 'networkidle', + timeout: 30000 + }); + + await page.waitForTimeout(1000); + + const pageResults = await page.evaluate((cfg) => window.scrapeList(cfg), task.config); + + // Filter out already scraped items + const newResults = pageResults.filter(item => { + const uniqueKey = JSON.stringify(item); + if (scrapedItems.has(uniqueKey)) return false; // Ignore if already scraped + scrapedItems.add(uniqueKey); // Mark as scraped + return true; + }); + + allResults = allResults.concat(newResults); + console.log(`Results so far (${task.taskId}): ${allResults.length}`); + } catch (error) { + console.error(`Error processing URL ${url}:`, error); + } + } + + await page.close(); + } finally { + if (browser) await browser.close(); + } + + return allResults; + } + + private async cleanupOldWorkflows() { + const ONE_HOUR = 60 * 60 * 1000; + const now = Date.now(); + + for (const [workflowId] of this.processedWorkflows) { + const workflowTimestamp = parseInt(workflowId.split('-')[1]); + if (now - workflowTimestamp > ONE_HOUR) { + this.processedWorkflows.delete(workflowId); + } + } + } + + private async handleError(message: any, error: Error) { + const retryCount = parseInt(message.headers['retry-count'] || '0'); + const task = JSON.parse(message.value!.toString()); + + if (retryCount < 3) { + await this.producer.send({ + topic: kafkaConfig.topics.SCRAPING_TASKS, + messages: [{ + key: message.key, + value: message.value, + headers: { + 'workflow-id': task.workflowId, + 'retry-count': (retryCount + 1).toString(), + 'error': error.message + } + }] + }); + } else { + await this.producer.send({ + topic: kafkaConfig.topics.SCRAPING_DLQ, + messages: [{ + key: message.key, + value: message.value, + headers: { + 'workflow-id': task.workflowId, + 'final-error': error.message + } + }] + }); + } + } +} \ No newline at end of file diff --git a/maxun-core/tsconfig.json b/maxun-core/tsconfig.json index 68c3ead79..728e4da92 100644 --- a/maxun-core/tsconfig.json +++ b/maxun-core/tsconfig.json @@ -7,5 +7,5 @@ "module": "commonjs", "esModuleInterop": true }, - "include": ["src"] + "include": ["src/**/*"], }
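
A quick way to exercise the new pipeline end to end is sketched below. This helper is not part of the diff: the script path, the KAFKA_BROKERS override, and the dummy task payload are all hypothetical. It assumes the compose stack above is running and that maxun-core executes on the host, so the default localhost:29092 broker address reaches the PLAINTEXT_HOST listener; a process running inside the backend container would instead need to reach the broker at kafka:9092, which is why the broker list is made overridable here.

// maxun-core/src/scripts/smoke-test-kafka.ts (hypothetical helper, not in this PR)
import { Kafka } from 'kafkajs';
import { kafkaConfig } from '../config/kafka';
import { ScrapingConsumer } from '../utils/scraping-consumer';

async function smokeTest() {
  // Hypothetical override: nothing in this PR reads KAFKA_BROKERS. Mutating the shared
  // config object before any client is constructed changes every broker address at once.
  if (process.env.KAFKA_BROKERS) {
    kafkaConfig.brokers = process.env.KAFKA_BROKERS.split(',');
  }

  const kafka = new Kafka({ clientId: 'smoke-test', brokers: kafkaConfig.brokers });

  // 1. Make sure the three topics exist (kafka-manager.ts does the same with more partitions).
  const admin = kafka.admin();
  await admin.connect();
  await admin.createTopics({
    topics: Object.values(kafkaConfig.topics).map((topic) => ({ topic })),
    waitForLeaders: true
  });
  await admin.disconnect();

  // 2. Start a worker that consumes scraping-tasks and publishes to scraping-results.
  const consumer = new ScrapingConsumer();
  await consumer.start();
  // Give the consumer group a moment to join before producing the task.
  await new Promise((resolve) => setTimeout(resolve, 5000));

  // 3. Publish one minimal task; once the worker processes it, the result message
  //    should be visible in kafka-ui under the scraping-results topic.
  const producer = kafka.producer();
  await producer.connect();
  const workflowId = `workflow-${Date.now()}-smoke`;
  await producer.send({
    topic: kafkaConfig.topics.SCRAPING_TASKS,
    messages: [{
      key: `${workflowId}-task-0`,
      value: JSON.stringify({
        taskId: `${workflowId}-task-0`,
        workflowId,
        urls: ['https://example.com'],
        config: {
          listSelector: 'body',
          fields: {},
          pagination: { type: '', selector: '' },
          limit: 1,
          startIndex: 0,
          endIndex: 1
        }
      }),
      headers: { 'workflow-id': workflowId, 'retry-count': '0', 'total-tasks': '1' }
    }]
  });
  await producer.disconnect();
}

smokeTest().catch((err) => {
  console.error(err);
  process.exit(1);
});

Run it with ts-node (or the project's usual script runner) while the compose stack is up; the consumer keeps running afterwards, so stop it with Ctrl+C once the result appears.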