Jobnik Best Practices
This guide provides recommendations and patterns for building robust, scalable, and maintainable Jobnik-based workflows.
Job Design Patterns
Keep Stages Focused
Each stage should represent a single, distinct phase of work. Don't combine unrelated operations into one stage: if tasks need to both process and upload, split them into separate stages. This improves retry granularity and makes failures easier to diagnose. See Concepts & Data Structures for more on the job/stage/task hierarchy.
- Good
- Bad
// Separate stages for distinct phases of work
const processingStage = await producer.createStage(job.id, {
  type: 'process-tiles',
  data: { resolution: 'high' }
});
await producer.createTasks(processingStage.id, processingStage.type, rawTiles.map(tile => ({ data: { tileId: tile.id } })));

const uploadStage = await producer.createStage(job.id, {
  type: 'upload-tiles',
  data: { bucket: 'maps-bucket', targetPath: '/maps/north' }
});
await producer.createTasks(uploadStage.id, uploadStage.type, processedTiles.map(tile => ({ data: { tileId: tile.id } })));

// Don't combine unrelated operations into one stage
const mixedStage = await producer.createStage(job.id, {
  type: 'process-and-upload-tiles', // Doing too much: hard to retry and monitor
  data: { resolution: 'high', bucket: 'maps-bucket' }
});
Use Meaningful Stage Names
Stage names should clearly indicate the type of work being performed.
- Good
- Bad
// Clear, descriptive stage names
await producer.createStage(jobId, {
  type: 'validate-input',
  data: { validationRules }
});
await producer.createStage(jobId, {
  type: 'resize-images',
  data: { dimensions }
});
await producer.createStage(jobId, {
  type: 'upload-to-storage',
  data: { bucket }
});

// Vague, unclear stage names
await producer.createStage(jobId, {
  type: 'stage1',
  data: { validationRules }
});
await producer.createStage(jobId, {
  type: 'process',
  data: { dimensions }
});
await producer.createStage(jobId, {
  type: 'finish',
  data: { bucket }
});
Store References, Not Large Payloads
Store URLs, IDs, and paths in task data, not large files or blobs. Keep in mind that referencing external resources introduces a dependency on their availability and consistency: if a resource changes or is replaced while tasks are still in flight, different tasks may operate on different versions. For advanced use cases, consider including a version identifier in the stage data (shared across all tasks in the stage) to pin all tasks to the same resource version.
- Good
- Bad
// Stage data holds shared config, including the resource version
const stage = await producer.createStage(jobId, {
  type: 'resize-images',
  data: {
    sourceVersion: 'v3', // All tasks in this stage use the same version
    processingConfig: { width: 800, height: 600 }
  }
});
// Task data holds only per-item references
await producer.createTasks(stage.id, stage.type, [{
  data: {
    imageUrl: 's3://bucket/images/v3/original/file.jpg',
    targetPath: 's3://bucket/images/v3/resized/file.jpg'
  }
}]);

// Don't embed large data
await producer.createTasks(stage.id, stage.type, [{
  data: {
    imageBase64: 'iVBORw0KGgoAAAANSUhEUg...(100KB+)',
    metadata: { ... }
  }
}]);
Type Safety Best Practices
Define Strict Types
Use TypeScript's type system to enforce data contracts between producers and workers.
// Define strict types with required fields
export interface ImageProcessingJob {
  'image-processing': {
    data: {
      uploadId: string;
      userId: string;
    };
    userMetadata: {
      priority: 'HIGH' | 'MEDIUM' | 'LOW'; // Use unions for enums
    };
  };
}

export interface ImageStages {
  'resize': {
    data: {
      readonly width: number;
      readonly height: number;
    };
    userMetadata: {
      processedCount: number;
    };
    task: {
      data: {
        sourceUrl: string;
        targetPath: string;
      };
      userMetadata: {
        attempts: number;
      };
    };
  };
}
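One way to put such a stage map to work is to derive payload types from it, so that a producer cannot build a task whose data disagrees with what the worker expects. The sketch below is self-contained and does not use the Jobnik SDK; `StageMap`, `TaskDataOf`, and `buildTask` are hypothetical names introduced only for illustration.

```typescript
// Hypothetical, SDK-independent sketch: derive task payload types from a stage map.
interface StageMap {
  resize: {
    data: { readonly width: number; readonly height: number };
    task: { data: { sourceUrl: string; targetPath: string } };
  };
}

// Look up the task payload type for a given stage name.
type TaskDataOf<K extends keyof StageMap> = StageMap[K]['task']['data'];

// Build a task payload; the compiler rejects missing or misspelled fields.
function buildTask<K extends keyof StageMap>(
  stageType: K,
  data: TaskDataOf<K>
): { stageType: K; data: TaskDataOf<K> } {
  return { stageType, data };
}

const resizeTask = buildTask('resize', {
  sourceUrl: 's3://bucket/images/original/file.jpg',
  targetPath: 's3://bucket/images/resized/file.jpg'
});

// buildTask('resize', { sourceUrl: 'x' });      // compile error: targetPath is missing
// buildTask('rezise', resizeTask.data);         // compile error: unknown stage name
```

Because the payload type is looked up from the map rather than written by hand, changing a field in the stage definition surfaces every mismatched call site at compile time.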
Worker Implementation Best Practices
Handle Graceful Shutdown
Always check the abort signal and clean up resources properly.
export class ImageProcessor {
  public async processTask(
    task: Task<ImageTask>,
    context: TaskHandlerContext<JobTypes, StageTypes, 'image-processing', 'resize'>
  ): Promise<void> {
    const { sourceUrl, targetPath } = task.data;

    // Start processing
    const stream = await downloadImage(sourceUrl);

    // Check for shutdown signal
    if (context.signal.aborted) {
      stream.destroy();
      throw new Error('Processing cancelled');
    }

    const processedImage = await this.resize(stream, context.stage.data);

    // Check again before upload
    if (context.signal.aborted) {
      throw new Error('Processing cancelled');
    }

    await this.uploadImage(processedImage, targetPath);
    context.logger.info('Task completed', { taskId: task.id });
  }
}
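When a handler has several steps, the repeated abort checks can be factored into a small guard. The following is a minimal sketch, not part of the SDK: `AbortLike`, `throwIfAborted`, and `runSteps` are hypothetical names, and the signal is typed structurally so that any object with an `aborted` flag (such as a standard `AbortSignal`) can be passed in.

```typescript
// Structural type: anything exposing an `aborted` flag, such as an AbortSignal.
interface AbortLike {
  readonly aborted: boolean;
}

// Hypothetical helper: if shutdown was requested, run cleanup and then throw.
function throwIfAborted(signal: AbortLike, cleanup?: () => void): void {
  if (signal.aborted) {
    cleanup?.();
    throw new Error('Processing cancelled');
  }
}

// Hypothetical runner: execute steps in order, checking the signal before each one.
async function runSteps(
  signal: AbortLike,
  steps: Array<() => Promise<void>>,
  cleanup?: () => void
): Promise<void> {
  for (const step of steps) {
    throwIfAborted(signal, cleanup);
    await step();
  }
}
```

Checking before each step keeps the cancellation points explicit; a step that is already running is not interrupted, so long-running steps may still need their own internal checks.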
Implement Idempotency
Tasks may be retried. Ensure your handlers are idempotent:
- Idempotent
- Not Idempotent
public async processTask(task: Task<ImageTask>, context: TaskHandlerContext): Promise<void> {
  const { targetPath } = task.data;

  // Check if already processed
  const exists = await this.storage.exists(targetPath);
  if (exists) {
    context.logger.info('Task already completed, skipping', {
      taskId: task.id,
      targetPath
    });
    return;
  }

  // Process only if not already done
  await this.processImage(task.data);
}

// This will fail or create duplicates on retry
public async processTask(task: Task<ImageTask>, context: TaskHandlerContext): Promise<void> {
  // No check for existing output
  await this.processImage(task.data);
  await this.incrementCounter(); // Side effect without guard
}
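The retry behaviour of the idempotent variant can be checked in isolation. The sketch below uses a hypothetical in-memory storage (`InMemoryStorage`) and a simplified synchronous handler (`handleResize`) in place of the real storage client and SDK types; running the same task twice should write the output only once.

```typescript
// Hypothetical in-memory stand-in for object storage.
class InMemoryStorage {
  private readonly objects = new Map<string, string>();
  public writes = 0;

  public exists(path: string): boolean {
    return this.objects.has(path);
  }

  public put(path: string, content: string): void {
    this.objects.set(path, content);
    this.writes += 1;
  }
}

interface ResizeTaskData {
  sourceUrl: string;
  targetPath: string;
}

// Idempotent handler: the output is keyed by targetPath, so a retry that finds
// the output already present returns without repeating the work.
function handleResize(data: ResizeTaskData, storage: InMemoryStorage): 'processed' | 'skipped' {
  if (storage.exists(data.targetPath)) {
    return 'skipped';
  }
  storage.put(data.targetPath, `resized:${data.sourceUrl}`);
  return 'processed';
}

const storage = new InMemoryStorage();
const taskData: ResizeTaskData = {
  sourceUrl: 's3://bucket/images/original/file.jpg',
  targetPath: 's3://bucket/images/resized/file.jpg'
};
const firstRun = handleResize(taskData, storage); // 'processed'
const retryRun = handleResize(taskData, storage); // 'skipped'
```

Note that an exists-then-write check is not atomic: it protects against sequential retries, but two concurrent executions of the same task could both pass the check, in which case a conditional write or a deterministic overwrite is the safer design.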
Update Metadata Strategically
Use stage metadata to store lightweight progress indicators or shared state, not large payloads. Update it only when the value provides meaningful signal:
public async processTask(task: Task<any>, context: TaskHandlerContext<any, any, any, any>): Promise<void> {
  await this.processSingleItem(task.data);
  await context.updateStageUserMetadata({
    ...context.stage.userMetadata,
    lastProcessedId: task.data.id,
    lastUpdateAt: new Date().toISOString()
  });
}
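If updating on every task is too chatty for your workload, one option is to throttle the updates. The sketch below is an assumption-laden illustration rather than SDK functionality: `ProgressThrottle` is a hypothetical helper that signals an update every `everyN` items or once `minIntervalMs` has elapsed, whichever comes first.

```typescript
// Hypothetical helper: decide whether a progress update is worth sending.
class ProgressThrottle {
  private readonly everyN: number;
  private readonly minIntervalMs: number;
  private sinceLastUpdate = 0;
  private lastUpdateAt: number;

  public constructor(everyN: number, minIntervalMs: number, startedAt: number) {
    this.everyN = everyN;
    this.minIntervalMs = minIntervalMs;
    this.lastUpdateAt = startedAt;
  }

  // Record one processed item; returns true when an update should be sent.
  public shouldUpdate(now: number): boolean {
    this.sinceLastUpdate += 1;
    const due =
      this.sinceLastUpdate >= this.everyN ||
      now - this.lastUpdateAt >= this.minIntervalMs;
    if (due) {
      this.sinceLastUpdate = 0;
      this.lastUpdateAt = now;
    }
    return due;
  }
}

// Update every 3 items, or after 10 seconds without an update.
const throttle = new ProgressThrottle(3, 10_000, 0);
const decisions = [100, 200, 300, 400, 20_000].map(now => throttle.shouldUpdate(now));
// decisions: [false, false, true, false, true]
```

In a handler, the call to `context.updateStageUserMetadata` would then be wrapped in `if (throttle.shouldUpdate(Date.now()))`. The throttle state lives in the worker process, so each worker instance throttles independently.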
Concurrency and Performance
Choose Appropriate Concurrency
Set worker concurrency based on your task characteristics:
// I/O-bound tasks (API calls, file transfers)
const ioBoundWorker = sdk.createWorker('api-integration', handler, {
  concurrency: 20 // Can be much higher
});

// Mixed workload
const mixedWorker = sdk.createWorker('data-pipeline', handler, {
  concurrency: 5 // Conservative middle ground
});
Batch Task Creation
When creating many tasks, batch them for better performance:
- Good
- Bad
// Create tasks in batches
const BATCH_SIZE = 100;
const allTasksData = [...]; // 10,000 tasks
for (let i = 0; i < allTasksData.length; i += BATCH_SIZE) {
  const batch = allTasksData.slice(i, i + BATCH_SIZE);
  await producer.createTasks(stageId, stageType, batch);
}

// Don't create tasks one by one
for (const taskData of allTasksData) {
  await producer.createTasks(stageId, stageType, [taskData]); // Slow!
}
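The slicing loop above can be extracted into a reusable helper. The `chunk` function below is a hypothetical utility, not something the SDK is known to provide; it splits any array into consecutive batches of a given size.

```typescript
// Split an array into consecutive batches of at most `size` items.
function chunk<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError('size must be a positive integer');
  }
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

const batches = chunk([1, 2, 3, 4, 5, 6, 7], 3);
// batches: [[1, 2, 3], [4, 5, 6], [7]]
```

With this in place, the producer loop becomes `for (const batch of chunk(allTasksData, BATCH_SIZE)) { await producer.createTasks(stageId, stageType, batch); }`. The guard on `size` matters because a batch size of zero would otherwise loop forever.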
Configure Appropriate Backoff
Tune retry backoff based on failure patterns:
// Fast recovery for transient network issues
const aggressiveBackoff = {
  initialBaseRetryDelayMs: 500,
  maxDelayMs: 5000,
  backoffFactor: 1.5
};

// Conservative for rate-limited APIs
const conservativeBackoff = {
  initialBaseRetryDelayMs: 2000,
  maxDelayMs: 120000,
  backoffFactor: 3
};

const worker = sdk.createWorker('api-call', handler, {
  concurrency: 5,
  backoffOptions: conservativeBackoff
});
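To get a feel for what these settings mean in practice, it can help to print the resulting delay schedule. The sketch below assumes plain capped exponential backoff with no jitter, that is `min(initialBaseRetryDelayMs * backoffFactor^attempt, maxDelayMs)`. This formula is an assumption and has not been checked against the SDK, which may compute delays differently; `retryDelayMs` and `delaySchedule` are hypothetical helpers.

```typescript
interface BackoffOptions {
  initialBaseRetryDelayMs: number;
  maxDelayMs: number;
  backoffFactor: number;
}

// Assumed formula (not confirmed against the SDK):
// delay = min(initialBaseRetryDelayMs * backoffFactor^attempt, maxDelayMs), no jitter.
function retryDelayMs(options: BackoffOptions, attempt: number): number {
  const raw = options.initialBaseRetryDelayMs * Math.pow(options.backoffFactor, attempt);
  return Math.min(raw, options.maxDelayMs);
}

// Delays for attempts 0 .. attempts - 1.
function delaySchedule(options: BackoffOptions, attempts: number): number[] {
  return Array.from({ length: attempts }, (_, attempt) => retryDelayMs(options, attempt));
}

const aggressive = delaySchedule(
  { initialBaseRetryDelayMs: 500, maxDelayMs: 5000, backoffFactor: 1.5 },
  7
);
// aggressive: [500, 750, 1125, 1687.5, 2531.25, 3796.875, 5000]

const conservative = delaySchedule(
  { initialBaseRetryDelayMs: 2000, maxDelayMs: 120000, backoffFactor: 3 },
  5
);
// conservative: [2000, 6000, 18000, 54000, 120000]
```

Under this assumption, the aggressive profile reaches its 5-second cap on the seventh retry, while the conservative profile reaches its 2-minute cap on the fifth.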
Error Handling
Throw Errors for Task Failures
Let the SDK handle task status updates by throwing errors:
- Good
- Bad
public async processTask(task: Task<any>, context: TaskHandlerContext<any, any, any, any>): Promise<void> {
  try {
    await this.downloadFile(task.data.url);
    await this.processFile(task.data.path);
  } catch (error) {
    context.logger.error('Task failed', { error, taskId: task.id });
    throw error; // SDK marks task as FAILED
  }
}

// Don't swallow errors or manually manage status
public async processTask(task: Task<any>, context: TaskHandlerContext<any, any, any, any>): Promise<void> {
  try {
    await this.processFile(task.data.path);
  } catch (error) {
    context.logger.error('Task failed', { error });
    // Missing throw - SDK thinks task succeeded!
  }
}
Testing Best Practices
Test Your Task Handler
Test the manager class directly rather than the worker wiring. Pass a mocked TaskHandlerContext (logger, signal, stage data, updateStageUserMetadata) and assert on the outcomes of your business logic. Keep worker setup out of unit tests.
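A minimal sketch of this approach follows. It avoids both the SDK and any test framework, so every name in it is hypothetical: `TestableContext` is a cut-down stand-in for the real `TaskHandlerContext`, `ResizeManager` is an example manager, and `createMockContext` is a hand-rolled mock that records calls.

```typescript
// Minimal, hypothetical slice of the handler context used by this manager.
interface TestableContext {
  signal: { readonly aborted: boolean };
  logger: { info: (message: string, meta?: object) => void };
  stage: { data: { width: number; height: number } };
  updateStageUserMetadata: (metadata: Record<string, unknown>) => Promise<void>;
}

// Hypothetical manager holding the business logic under test.
class ResizeManager {
  public async processTask(
    task: { id: string; data: { sourceUrl: string } },
    context: TestableContext
  ): Promise<string> {
    if (context.signal.aborted) {
      throw new Error('Processing cancelled');
    }
    const { width, height } = context.stage.data;
    await context.updateStageUserMetadata({ lastProcessedId: task.id });
    context.logger.info('Task completed', { taskId: task.id });
    return `${task.data.sourceUrl}@${width}x${height}`;
  }
}

// Hand-rolled mock that records calls, so no test framework is required.
function createMockContext(aborted = false) {
  const logs: string[] = [];
  const metadataUpdates: Array<Record<string, unknown>> = [];
  const context: TestableContext = {
    signal: { aborted },
    logger: { info: message => { logs.push(message); } },
    stage: { data: { width: 800, height: 600 } },
    updateStageUserMetadata: async metadata => { metadataUpdates.push(metadata); }
  };
  return { context, logs, metadataUpdates };
}
```

A test then calls `new ResizeManager().processTask(task, createMockContext().context)` and asserts on the returned value and the recorded `logs` and `metadataUpdates`. Passing `createMockContext(true)` exercises the cancellation path without starting a worker.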
Common Anti-Patterns to Avoid
Creating Circular Dependencies
Don't create workflows where jobs depend on each other in a circle:
// BAD: Circular dependency
// Job A creates tasks that create Job B
// Job B creates tasks that create Job A
// This creates an infinite loop!
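If you can describe which job types may create which other job types, a cycle like the one above can be caught before deployment. The sketch below is a generic depth-first cycle check over a hypothetical `SpawnGraph` map; Jobnik is not known to expose such a graph, so you would have to maintain it yourself.

```typescript
// Hypothetical map: job type -> job types its tasks may create.
type SpawnGraph = { [jobType: string]: string[] | undefined };

// Depth-first search that returns one cycle as a path, or null if none exists.
function findCycle(graph: SpawnGraph): string[] | null {
  const visiting = new Set<string>(); // nodes on the current DFS path
  const done = new Set<string>();     // nodes fully explored, known cycle-free
  const path: string[] = [];

  const visit = (node: string): string[] | null => {
    if (done.has(node)) return null;
    if (visiting.has(node)) {
      // Reached a node already on the current path: slice out the loop.
      return [...path.slice(path.indexOf(node)), node];
    }
    visiting.add(node);
    path.push(node);
    for (const next of graph[node] ?? []) {
      const cycle = visit(next);
      if (cycle) return cycle;
    }
    path.pop();
    visiting.delete(node);
    done.add(node);
    return null;
  };

  for (const node of Object.keys(graph)) {
    const cycle = visit(node);
    if (cycle) return cycle;
  }
  return null;
}

const cyclic = findCycle({ 'job-a': ['job-b'], 'job-b': ['job-a'] });
// cyclic: ['job-a', 'job-b', 'job-a']

const acyclic = findCycle({ ingest: ['resize'], resize: ['publish'], publish: [] });
// acyclic: null
```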
Polling in Task Handlers
Don't poll for external state within a task:
// BAD: Polling in task
public async processTask(task: Task<any>, context: TaskHandlerContext<any, any, any, any>): Promise<void> {
  while (!await this.checkExternalSystem()) {
    await sleep(1000); // Blocks worker!
  }
  await this.doWork();
}

// GOOD: Fail fast and let the retry mechanism run the task again later
public async processTask(task: Task<any>, context: TaskHandlerContext<any, any, any, any>): Promise<void> {
  const isReady = await this.checkExternalSystem();
  if (!isReady) {
    throw new RetryableError('External system not ready');
  }
  await this.doWork();
}
Additional Resources
- Jobnik Architecture Overview
- Concepts & Data Structures
- State Transitions
- Zero to Hero Guide
- Jobnik SDK API Documentation
- Worker Boilerplate Repository
- Jobnik Manager Repository
Have feedback on these best practices? Found a pattern that works well? Share with the team!