
Jobnik Best Practices

This guide provides recommendations and patterns for building robust, scalable, and maintainable Jobnik-based workflows.


Job Design Patterns

Keep Stages Focused

Each stage should represent a single, distinct phase of work. Don't combine unrelated operations into one stage: if tasks need to both process and upload, split that work into separate stages. This makes retries more granular (a failed upload can be retried without reprocessing) and makes failures easier to diagnose. See Concepts & Data Structures for more on the job/stage/task hierarchy.

// Separate stages for distinct phases of work
const processingStage = await producer.createStage(job.id, {
type: 'process-tiles',
data: { resolution: 'high' }
});
await producer.createTasks(processingStage.id, processingStage.type, rawTiles.map(tile => ({ data: { tileId: tile.id } })));

const uploadStage = await producer.createStage(job.id, {
type: 'upload-tiles',
data: { bucket: 'maps-bucket', targetPath: '/maps/north' }
});
await producer.createTasks(uploadStage.id, uploadStage.type, processedTiles.map(tile => ({ data: { tileId: tile.id } })));

Use Meaningful Stage Names

A stage's type string acts as its name, so it should clearly indicate the kind of work being performed.

// Clear, descriptive stage names
await producer.createStage(jobId, {
type: 'validate-input',
data: { validationRules }
});

await producer.createStage(jobId, {
type: 'resize-images',
data: { dimensions }
});

await producer.createStage(jobId, {
type: 'upload-to-storage',
data: { bucket }
});

Store References, Not Large Payloads

Store URLs, IDs, and paths in task data, not large files or blobs. Keep in mind that referencing external resources introduces a dependency on their availability and consistency: if a resource changes or is replaced while tasks are still in flight, different tasks may operate on different versions. For advanced use cases, consider putting a version identifier in the stage data (shared by all tasks in the stage) to pin every task to the same resource version.

// Stage data holds shared config, including the resource version
const stage = await producer.createStage(jobId, {
type: 'resize-images',
data: {
sourceVersion: 'v3', // All tasks in this stage use the same version
processingConfig: { width: 800, height: 600 }
}
});

// Task data holds only per-item references
await producer.createTasks(stage.id, stage.type, [{
data: {
imageUrl: 's3://bucket/images/v3/original/file.jpg',
targetPath: 's3://bucket/images/v3/resized/file.jpg'
}
}]);
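One way to keep per-task references consistent with the version pinned in stage data is to derive both from a single value. The sketch below assumes the s3://bucket/images/<version>/... layout from the example above; buildResizeTaskData and IMAGE_ROOT are hypothetical names, not part of the Jobnik SDK.

```typescript
// Per-task data shape used in the example above.
interface ResizeTaskData {
  imageUrl: string;
  targetPath: string;
}

// Assumed bucket layout: s3://bucket/images/<version>/{original,resized}/<file>
const IMAGE_ROOT = 's3://bucket/images';

// Hypothetical helper: builds task data from the same version string that is
// stored in stage data, so task paths cannot drift from the pinned version.
function buildResizeTaskData(
  sourceVersion: string,
  fileNames: readonly string[]
): Array<{ data: ResizeTaskData }> {
  return fileNames.map((fileName) => ({
    data: {
      imageUrl: `${IMAGE_ROOT}/${sourceVersion}/original/${fileName}`,
      targetPath: `${IMAGE_ROOT}/${sourceVersion}/resized/${fileName}`,
    },
  }));
}
```

Passing one sourceVersion constant to both createStage (as stage data) and this helper (for task data) keeps the two in agreement by construction.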

Type Safety Best Practices

Define Strict Types

Use TypeScript's type system to enforce data contracts between producers and workers.

// Define strict types with required fields
export interface ImageProcessingJob {
'image-processing': {
data: {
uploadId: string;
userId: string;
};
userMetadata: {
priority: 'HIGH' | 'MEDIUM' | 'LOW'; // Use unions for enums
};
};
}

export interface ImageStages {
'resize': {
data: {
readonly width: number;
readonly height: number;
};
userMetadata: {
processedCount: number;
};
task: {
data: {
sourceUrl: string;
targetPath: string;
};
userMetadata: {
attempts: number;
};
};
};
}
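TypeScript types are erased at runtime, so the interfaces above cannot stop malformed task data (for example, from an older producer) from reaching a worker. A small hand-written type guard at the top of a handler is one way to narrow untyped data to the declared shape. isResizeTaskData and parseResizeTaskData are hypothetical helpers; the field names mirror ImageStages['resize']['task']['data'].

```typescript
// Mirrors ImageStages['resize']['task']['data'] from the interfaces above.
interface ResizeTaskData {
  sourceUrl: string;
  targetPath: string;
}

// Hypothetical runtime guard: checks the shape of untyped data.
function isResizeTaskData(value: unknown): value is ResizeTaskData {
  if (typeof value !== 'object' || value === null) {
    return false;
  }
  const candidate = value as Record<string, unknown>;
  return typeof candidate['sourceUrl'] === 'string' && typeof candidate['targetPath'] === 'string';
}

// Fails early with a clear message instead of failing deep inside processing.
function parseResizeTaskData(value: unknown): ResizeTaskData {
  if (!isResizeTaskData(value)) {
    throw new Error('Invalid resize task data: expected { sourceUrl: string; targetPath: string }');
  }
  return value;
}
```

Because the guard throws before any work starts, a bad payload shows up as a clear task failure rather than an obscure error halfway through a resize.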

Worker Implementation Best Practices

Handle Graceful Shutdown

Always check the abort signal between long-running steps, and release any resources you hold (such as open streams) before exiting.

export class ImageProcessor {
public async processTask(
task: Task<ImageTask>,
context: TaskHandlerContext<JobTypes, StageTypes, 'image-processing', 'resize'>
): Promise<void> {
const { sourceUrl, targetPath } = task.data;

// Start processing
const stream = await downloadImage(sourceUrl);

// Check for shutdown signal
if (context.signal.aborted) {
stream.destroy();
throw new Error('Processing cancelled');
}

const processedImage = await this.resize(stream, context.stage.data);

// Check again before upload
if (context.signal.aborted) {
throw new Error('Processing cancelled');
}

await this.uploadImage(processedImage, targetPath);

context.logger.info('Task completed', { taskId: task.id });
}
}
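Checking signal.aborted by hand at a few points is easy to get wrong, and cleanup can be skipped when a step throws. A pattern that tends to hold up better combines AbortSignal.throwIfAborted() checkpoints (standard API, Node.js 17.3+) with try/finally, so cleanup runs on every exit path. processWithCleanup, TaskResource, and the step functions are hypothetical; inside a handler, signal would be context.signal.

```typescript
// Hypothetical resource with explicit cleanup, standing in for a download stream.
interface TaskResource {
  close(): void;
}

// Runs steps in order, checking the signal before each one, and always
// releases the resource whether the task succeeds, fails, or is aborted.
async function processWithCleanup(
  signal: AbortSignal,
  openResource: () => TaskResource,
  steps: ReadonlyArray<(resource: TaskResource) => Promise<void>>
): Promise<void> {
  signal.throwIfAborted(); // Don't acquire anything if shutdown already started.
  const resource = openResource();
  try {
    for (const step of steps) {
      // Throws signal.reason if shutdown was requested during the previous step.
      signal.throwIfAborted();
      await step(resource);
    }
  } finally {
    resource.close(); // Runs on success, on error, and on abort.
  }
}
```

Letting the abort surface as a thrown error keeps the handler's contract simple: any early exit is an error, and the finally block is the single place that releases resources.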

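Implement Idempotency

Tasks may be retried, so the same handler can run more than once for the same task. Make handlers idempotent: running a task twice should leave the same result as running it once.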

public async processTask(task: Task<ImageTask>, context: TaskHandlerContext): Promise<void> {
const { targetPath } = task.data;

// Check if already processed (assumes outputs are written atomically, so a partial file never counts as done)
const exists = await this.storage.exists(targetPath);
if (exists) {
context.logger.info('Task already completed, skipping', {
taskId: task.id,
targetPath
});
return;
}

// Process only if not already done
await this.processImage(task.data);
}
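An existence check is only a reliable completion marker if the output appears all at once: if a worker crashes halfway through writing targetPath, a retry would find a partial file and skip the task. On POSIX filesystems, a common approach is to write to a temporary file and rename it into place, because rename within one filesystem replaces the target atomically. The sketch below uses Node's synchronous fs API for a local-filesystem target; writeOnce is a hypothetical helper, and object stores have their own consistency semantics.

```typescript
import { existsSync, mkdtempSync, readFileSync, renameSync, rmSync, writeFileSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';

// Hypothetical helper: writes contents to targetPath at most once.
// The temp file sits next to the target, so renameSync stays on one
// filesystem and moves the complete file into place in a single step.
function writeOnce(targetPath: string, contents: string | Buffer): boolean {
  if (existsSync(targetPath)) {
    return false; // An earlier attempt already finished.
  }
  const tempPath = `${targetPath}.${process.pid}.${Date.now()}.tmp`;
  try {
    writeFileSync(tempPath, contents);
    renameSync(tempPath, targetPath); // Readers see either no file or the whole file.
  } finally {
    rmSync(tempPath, { force: true }); // Removes leftovers if the write failed; no-op after a rename.
  }
  return true;
}

// Example: a retried attempt becomes a no-op.
const workDir = mkdtempSync(join(tmpdir(), 'jobnik-idempotency-'));
const target = join(workDir, 'resized.jpg');
const firstAttempt = writeOnce(target, 'image-bytes'); // true
const retryAttempt = writeOnce(target, 'image-bytes'); // false
console.log(readFileSync(target, 'utf8')); // image-bytes
```

Two attempts racing past existsSync would both rename a complete file into place; since both carry the same result, the outcome is still correct.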

Update Metadata Strategically

Use stage metadata for lightweight progress indicators or shared state, not large payloads. Update it only when the new value provides a meaningful signal:

public async processTask(task: Task<any>, context: TaskHandlerContext<any, any, any, any>): Promise<void> {
await this.processSingleItem(task.data);

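// Read-modify-write: spreading the current metadata keeps other keys, but
// concurrent tasks in the same stage may overwrite each other's updates
await context.updateStageUserMetadata({
...context.stage.userMetadata,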
lastProcessedId: task.data.id,
lastUpdateAt: new Date().toISOString()
});
}
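The handler above writes stage metadata after every task, which can add up to thousands of writes for a large stage. If the metadata is only a progress indicator, one option is to flush at most once per time interval. MetadataThrottle is a hypothetical per-worker-process helper that decides when to call context.updateStageUserMetadata; the clock is injectable so the logic can be tested.

```typescript
// Hypothetical helper: allows at most one metadata flush per interval.
class MetadataThrottle {
  private lastFlushAt = Number.NEGATIVE_INFINITY;
  private readonly intervalMs: number;
  private readonly now: () => number;

  constructor(intervalMs: number, now: () => number = Date.now) {
    this.intervalMs = intervalMs;
    this.now = now;
  }

  // Returns true, and records the flush time, once the interval has elapsed.
  shouldFlush(): boolean {
    const current = this.now();
    if (current - this.lastFlushAt < this.intervalMs) {
      return false;
    }
    this.lastFlushAt = current;
    return true;
  }
}

// Usage inside a handler (one throttle shared by the worker process):
// if (throttle.shouldFlush()) {
//   await context.updateStageUserMetadata({ ...context.stage.userMetadata, lastUpdateAt: new Date().toISOString() });
// }
```

Skipped updates are dropped rather than queued, so the stored metadata reflects progress as of the last flush, not necessarily the final state of the stage.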

Concurrency and Performance

Choose Appropriate Concurrency

Set worker concurrency based on your task characteristics:

// I/O-bound tasks (API calls, file transfers)
const ioBoundWorker = sdk.createWorker('api-integration', handler, {
concurrency: 20 // Can be much higher than for CPU-bound work, since tasks mostly wait on I/O
});

// Mixed workload
const mixedWorker = sdk.createWorker('data-pipeline', handler, {
concurrency: 5 // Conservative middle ground
});

Batch Task Creation

When creating many tasks, send them in fixed-size batches rather than one oversized request or one request per task:

// Create tasks in batches
const BATCH_SIZE = 100;
const allTasksData = [...]; // 10,000 tasks

for (let i = 0; i < allTasksData.length; i += BATCH_SIZE) {
const batch = allTasksData.slice(i, i + BATCH_SIZE);
await producer.createTasks(stageId, stageType, batch);
}
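The slicing in the loop above can be factored into a small generic helper, which makes batch boundaries easy to test. chunk is a hypothetical utility, and the batch size of 100 comes from the example above, not from a documented Jobnik limit.

```typescript
// Hypothetical utility: splits items into consecutive batches of at most `size` items.
function chunk<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError(`Batch size must be a positive integer, got ${size}`);
  }
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

// Usage with the producer from the example above:
// for (const batch of chunk(allTasksData, 100)) {
//   await producer.createTasks(stageId, stageType, batch);
// }
```

Because batches are sent one after another, a failure partway through leaves the earlier batches already created, which is worth planning for if the producer can be re-run.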

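Configure Appropriate Backoff

Tune retry backoff to match the failure pattern: short delays for transient network issues, longer delays for rate-limited APIs that need time to recover.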

// Fast recovery for transient network issues
const aggressiveBackoff = {
initialBaseRetryDelayMs: 500,
maxDelayMs: 5000,
backoffFactor: 1.5
};

// Conservative for rate-limited APIs
const conservativeBackoff = {
initialBaseRetryDelayMs: 2000,
maxDelayMs: 120000,
backoffFactor: 3
};

const worker = sdk.createWorker('api-call', handler, {
concurrency: 5,
backoffOptions: conservativeBackoff
});
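To compare the two profiles, it helps to compute the delay schedule each one produces. The sketch assumes a common exponential form, where the first retry waits initialBaseRetryDelayMs and each later delay is the previous one multiplied by backoffFactor, capped at maxDelayMs, with no jitter. The SDK's actual formula (including any jitter) may differ, so treat the numbers as approximate; retryDelays is a hypothetical helper.

```typescript
// Mirrors the option names used in the examples above.
interface BackoffOptions {
  initialBaseRetryDelayMs: number;
  maxDelayMs: number;
  backoffFactor: number;
}

// Assumed formula, not taken from the SDK: multiply the previous delay by
// backoffFactor on each retry, cap at maxDelayMs, no jitter.
function retryDelays(options: BackoffOptions, retries: number): number[] {
  const delays: number[] = [];
  let next = options.initialBaseRetryDelayMs;
  for (let attempt = 0; attempt < retries; attempt++) {
    delays.push(Math.min(next, options.maxDelayMs));
    next *= options.backoffFactor;
  }
  return delays;
}

const aggressive: BackoffOptions = { initialBaseRetryDelayMs: 500, maxDelayMs: 5000, backoffFactor: 1.5 };
const conservative: BackoffOptions = { initialBaseRetryDelayMs: 2000, maxDelayMs: 120000, backoffFactor: 3 };

console.log('aggressive  ', retryDelays(aggressive, 7));
console.log('conservative', retryDelays(conservative, 5));
```

Under this assumption, the conservative profile waits 2, 6, 18, and 54 seconds and then stays at the 120-second cap from the fifth retry on, while the aggressive profile reaches its 5-second cap on the seventh retry.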

Error Handling

Throw Errors for Task Failures

Let the SDK handle task status updates by throwing errors:

public async processTask(task: Task<any>, context: TaskHandlerContext<any, any, any, any>): Promise<void> {
try {
await this.downloadFile(task.data.url);
await this.processFile(task.data.path);
} catch (error) {
context.logger.error('Task failed', { error, taskId: task.id });
throw error; // SDK marks task as FAILED
}
}
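When a handler calls HTTP APIs, the error it throws is more useful if it says whether the failure looks transient. A common heuristic treats 408, 429, and 5xx responses as transient and other 4xx responses as permanent. isTransientHttpStatus and HttpTaskError are hypothetical helpers; whether the SDK acts on this distinction (for example, by skipping retries for permanent failures) is not covered here and would need to be checked against the SDK.

```typescript
// Hypothetical helper: common heuristic for whether an HTTP failure is worth retrying.
function isTransientHttpStatus(status: number): boolean {
  if (status === 408 || status === 429) {
    return true; // Request Timeout, Too Many Requests
  }
  return status >= 500 && status <= 599; // Server-side errors
}

// Hypothetical error type: carries the status and classification into logs.
class HttpTaskError extends Error {
  readonly status: number;
  readonly transient: boolean;

  constructor(status: number, url: string) {
    super(`Request to ${url} failed with HTTP ${status}`);
    this.name = 'HttpTaskError';
    this.status = status;
    this.transient = isTransientHttpStatus(status);
    // Keeps instanceof working when compiled to ES5 targets.
    Object.setPrototypeOf(this, HttpTaskError.prototype);
  }
}
```

Throwing an HttpTaskError instead of a bare Error keeps the SDK contract unchanged (the task still fails) while making the logged failure self-explanatory.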

Testing Best Practices

Test Your Task Handler

Test the manager class (the class that implements processTask) directly, not the worker wiring. Pass a mocked TaskHandlerContext (logger, signal, stage data, updateStageUserMetadata) and assert on your business-logic outcomes. Keep worker setup out of unit tests.
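A minimal sketch of that approach using Node's built-in test runner (node:test, Node.js 18+). HandlerContext is a simplified stand-in for the parts of TaskHandlerContext a handler typically touches, and SkipIfDoneHandler is a hypothetical manager class modeled on the idempotency example; neither is the SDK's real type.

```typescript
import { test } from 'node:test';
import { strict as assert } from 'node:assert';

// Simplified stand-in for the context fields a handler typically uses.
interface HandlerContext {
  logger: { info: (msg: string, meta?: object) => void; error: (msg: string, meta?: object) => void };
  signal: AbortSignal;
  stage: { data: Record<string, unknown>; userMetadata: Record<string, unknown> };
  updateStageUserMetadata: (metadata: Record<string, unknown>) => Promise<void>;
}

// Hypothetical manager class under test: skips tasks whose output already exists.
class SkipIfDoneHandler {
  processed: string[] = [];
  private readonly existing: Set<string>;

  constructor(existing: Iterable<string>) {
    this.existing = new Set(existing);
  }

  async processTask(task: { id: string; data: { targetPath: string } }, context: HandlerContext): Promise<void> {
    if (this.existing.has(task.data.targetPath)) {
      context.logger.info('Task already completed, skipping', { taskId: task.id });
      return;
    }
    this.processed.push(task.data.targetPath);
  }
}

// Builds a mock context that records log messages and metadata updates.
function createMockContext() {
  const infoLogs: string[] = [];
  const metadataUpdates: Array<Record<string, unknown>> = [];
  const context: HandlerContext = {
    logger: { info: (msg) => { infoLogs.push(msg); }, error: () => {} },
    signal: new AbortController().signal,
    stage: { data: {}, userMetadata: {} },
    updateStageUserMetadata: async (metadata) => { metadataUpdates.push(metadata); },
  };
  return { context, infoLogs, metadataUpdates };
}

test('skips tasks whose output already exists', async () => {
  const handler = new SkipIfDoneHandler(['out/a.jpg']);
  const { context, infoLogs } = createMockContext();
  await handler.processTask({ id: 't1', data: { targetPath: 'out/a.jpg' } }, context);
  assert.deepEqual(handler.processed, []);
  assert.deepEqual(infoLogs, ['Task already completed, skipping']);
});
```

Recording calls in plain arrays keeps the mock free of any mocking library and makes assertions on logs and metadata updates straightforward.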


Common Anti-Patterns to Avoid

❌ Creating Circular Dependencies

Don't create workflows where jobs depend on each other in a circle:

// BAD: Circular dependency
// Job A creates tasks that create Job B
// Job B creates tasks that create Job A
// Result: an endless chain of new jobs
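If jobs legitimately need to spawn follow-up jobs, one way to make an accidental cycle fail fast is to carry the chain of job types in job data and refuse to spawn a type that is already in the chain, or to go past a fixed depth. The lineage field, childLineage, and MAX_JOB_DEPTH are hypothetical conventions, not SDK features.

```typescript
// Hypothetical convention: each job's data records the chain of job types that led to it.
interface JobLineage {
  lineage: readonly string[];
}

const MAX_JOB_DEPTH = 5; // Illustrative limit, not an SDK setting

// Returns the lineage to store on a child job, or throws if spawning it would loop.
function childLineage(parent: JobLineage, childJobType: string): string[] {
  if (parent.lineage.includes(childJobType)) {
    throw new Error(`Cycle detected: ${[...parent.lineage, childJobType].join(' -> ')}`);
  }
  if (parent.lineage.length >= MAX_JOB_DEPTH) {
    throw new Error(`Job chain exceeds max depth of ${MAX_JOB_DEPTH}`);
  }
  return [...parent.lineage, childJobType];
}
```

A task that would create Job A from inside a chain that already contains Job A then fails with a readable message instead of silently feeding the loop.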

❌ Polling in Task Handlers

Don't poll for external state within a task. A polling loop holds a worker slot for the entire wait, and the wait has no upper bound:

// BAD: Polling in task
public async processTask(task: Task<any>, context: TaskHandlerContext<any, any, any, any>): Promise<void> {
while (!await this.checkExternalSystem()) {
await sleep(1000); // Holds a worker slot while waiting
}
await this.doWork();
}

// GOOD: Fail fast and let the SDK retry the task later with backoff
public async processTask(task: Task<any>, context: TaskHandlerContext<any, any, any, any>): Promise<void> {
const isReady = await this.checkExternalSystem();
if (!isReady) {
throw new RetryableError('External system not ready');
}
await this.doWork();
}
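The GOOD example throws a RetryableError, which is not defined in the snippet. If your SDK version does not provide such a class, a minimal local definition could look like the following. The retryAfterMs hint is a hypothetical field, and whether the SDK treats this class differently from a plain Error is an assumption to verify against the SDK.

```typescript
// Hypothetical error class for "not ready yet, try again later" conditions.
class RetryableError extends Error {
  readonly retryAfterMs: number | undefined;

  constructor(message: string, retryAfterMs?: number) {
    super(message);
    this.name = 'RetryableError';
    this.retryAfterMs = retryAfterMs;
    // Restores the prototype chain so instanceof works when compiled to ES5.
    Object.setPrototypeOf(this, RetryableError.prototype);
  }
}

// Type guard for logging or wrapper code that treats retryable failures specially.
function isRetryable(error: unknown): error is RetryableError {
  return error instanceof RetryableError;
}
```

Because it still extends Error, throwing it keeps the SDK's normal failure handling in place, while logs and wrappers can tell a "not ready yet" condition apart from a genuine bug.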


Questions or Suggestions?

Have feedback on these best practices? Found a pattern that works well? Share with the team!