The task handler system in fib-flow provides a flexible way to define and manage task execution logic across different workers and environments.
- Handler Registration
- Worker Specialization
- Handler Selection
- Example Configuration
- Usage Scenarios
- Task Lifecycle
- Best Practices
For API details, see Task Registration.
Key concepts:
- Register specific handlers for different task types
- Each handler can be specialized for specific task requirements
- Multiple workers can register the same handler
- Specialized workers can register unique handlers
- Multiple workers can register common handlers
- Tasks are distributed across available workers
- Automatic failover if a worker becomes unavailable
- Improved system throughput and reliability
- GPU-dependent tasks can be routed to GPU-enabled workers
- Memory-intensive tasks can be directed to high-memory workers
- Special hardware requirements (e.g., TPU, FPGA) can be accommodated
// Configure worker with specific capabilities
taskManager.configureWorker({
maxConcurrency: 5,
capabilities: ['GPU', 'high-memory'],
retryStrategy: {
maxAttempts: 3,
backoff: 'exponential'
}
});
- Tasks are automatically routed to workers with matching handlers
- If multiple workers are available, load is balanced automatically
- Tasks requiring specific resources wait for appropriate workers
- Ensures optimal resource utilization across the system
// Handler for data processing tasks
taskManager.use('data_processing', async (task) => {
const { data } = task.payload;
return { result: await processData(data) };
});
// Handler for GPU-intensive tasks
taskManager.use('gpu_task', async (task) => {
if (!hasGPU) {
throw new Error('GPU not available');
}
return { result: await gpuProcess(task.payload) };
});
// Handler for memory-intensive tasks
taskManager.use('memory_task', async (task) => {
if (getAvailableMemory() < requiredMemory) {
throw new Error('Insufficient memory');
}
return { result: await largeDataProcess(task.payload) };
});
// Process large datasets across multiple workers
taskManager.use('process_dataset', async (task) => {
const { dataset_id, chunk_size } = task.payload;
// Split dataset into chunks for parallel processing
const chunks = await splitDataset(dataset_id, chunk_size);
// Create child tasks for each chunk
return next(chunks.map(chunk => ({
name: 'process_chunk',
payload: { chunk_id: chunk.id }
})));
});
// GPU-specific task handler
taskManager.use('train_model', async (task) => {
if (!hasGPU()) {
throw new Error('GPU required for this task');
}
// ... training code ...
});
- Task Creation: Tasks are created with specific type and payload
- Handler Selection: System matches task with registered handlers
- Execution: Selected worker processes the task
- Result Handling: Results are collected and processed
- Error Management: Failed tasks are retried or marked as failed
- Register handlers early in application lifecycle
- Implement proper error handling and retries
- Monitor worker health and task completion rates
- Use appropriate task priorities for critical operations
- Implement graceful shutdown handling