# Jobnik Best Practices

This guide provides recommendations and patterns for building robust, scalable, and maintainable Jobnik-based workflows.
## Job Design Patterns
### Keep Jobs Focused

Each job should represent a single logical workflow. Don't combine unrelated operations into one job.
**✅ Good**

```typescript
// Separate jobs for different workflows
await producer.createJob({
  name: 'image-processing',
  data: { uploadId: 'upload-123' }
});

await producer.createJob({
  name: 'user-notification',
  data: { userId: 'user-456' }
});
```

**❌ Bad**

```typescript
// Don't mix unrelated operations
await producer.createJob({
  name: 'image-processing-and-notification',
  data: {
    uploadId: 'upload-123',
    userId: 'user-456',
    shouldSendEmail: true
  }
});
```
### Use Stage Names Meaningfully

Stage names should clearly indicate the type of work being performed.
```typescript
// Clear, descriptive stage names
await producer.createStage(jobId, {
  type: 'validate-input',
  data: { validationRules }
});

await producer.createStage(jobId, {
  type: 'resize-images',
  data: { dimensions }
});

await producer.createStage(jobId, {
  type: 'upload-to-storage',
  data: { bucket }
});
```
### Store References, Not Large Payloads

Store URLs, IDs, and paths in task data, not large files or blobs.
**✅ Good**

```typescript
await producer.createTask(stageId, {
  data: {
    imageUrl: 's3://bucket/images/original/file.jpg',
    targetPath: 's3://bucket/images/resized/file.jpg',
    processingConfig: { width: 800, height: 600 }
  }
});
```

**❌ Bad**

```typescript
// Don't embed large data
await producer.createTask(stageId, {
  data: {
    imageBase64: 'iVBORw0KGgoAAAANSUhEUg...(100KB+)',
    metadata: { ... }
  }
});
```
## Type Safety Best Practices
### Define Strict Types

Use TypeScript's type system to enforce data contracts between producers and workers.
```typescript
// Define strict types with required fields
export interface ImageProcessingJob {
  'image-processing': {
    data: {
      uploadId: string;
      userId: string;
      sourceLocation: string;
      targetLocation: string;
    };
    userMetadata: {
      priority: 'HIGH' | 'MEDIUM' | 'LOW'; // Use unions for enums
      uploadedAt: string; // ISO timestamp
    };
  };
}

export interface ImageStages {
  'resize': {
    data: {
      readonly width: number; // Use readonly when appropriate
      readonly height: number;
      readonly quality: number;
      readonly format: 'jpg' | 'png' | 'webp';
    };
    userMetadata: {
      processedCount: number;
      failedCount: number;
    };
    task: {
      data: {
        sourceUrl: string;
        targetPath: string;
        fileName: string;
      };
      userMetadata: {
        attempts: number;
        lastAttemptAt?: string;
      };
    };
  };
}
```
### Version Your Types

When your data structures evolve, version them appropriately:
```typescript
// v1 types
export interface ImageProcessingJobV1 {
  'image-processing': {
    data: {
      uploadId: string;
      userId: string;
    };
    userMetadata: Record<string, never>;
  };
}

// v2 types with additional fields
export interface ImageProcessingJobV2 {
  'image-processing': {
    data: {
      uploadId: string;
      userId: string;
      organizationId: string; // New field
    };
    userMetadata: {
      requestContext: RequestContext; // New field
    };
  };
}
```
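During a migration window, workers may receive payloads of either version. One way to branch at runtime is a type guard keyed on a field that only exists in v2. This is a sketch, not part of the SDK: `isV2Data` and `describe` are hypothetical helpers, and the interfaces are trimmed to the fields relevant here.

```typescript
interface V1Data { uploadId: string; userId: string; }
interface V2Data extends V1Data { organizationId: string; }

// Narrow an incoming payload to v2 based on the field added in that version.
function isV2Data(data: V1Data | V2Data): data is V2Data {
  return 'organizationId' in data;
}

// Example consumer: handles both shapes without unsafe casts.
function describe(data: V1Data | V2Data): string {
  return isV2Data(data)
    ? `v2 upload ${data.uploadId} for org ${data.organizationId}`
    : `v1 upload ${data.uploadId}`;
}
```

Inside the `isV2Data(data)` branch, TypeScript narrows `data` to `V2Data`, so `organizationId` is accessible without a cast.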
## Worker Implementation Best Practices
### Handle Graceful Shutdown

Always check the abort signal and clean up resources properly.
```typescript
export class ImageProcessor {
  public async processTask(
    task: Task<ImageTask>,
    context: TaskHandlerContext<JobTypes, StageTypes, 'image-processing', 'resize'>
  ): Promise<void> {
    const { sourceUrl, targetPath } = task.data;

    // Start processing
    const stream = await downloadImage(sourceUrl);

    // Check for shutdown signal
    if (context.signal.aborted) {
      stream.destroy();
      throw new Error('Processing cancelled');
    }

    const processedImage = await this.resize(stream, context.stage.data);

    // Check again before upload
    if (context.signal.aborted) {
      throw new Error('Processing cancelled');
    }

    await this.uploadImage(processedImage, targetPath);

    context.logger.info('Task completed', { taskId: task.id });
  }
}
```
### Use Structured Logging

Include relevant context in all log statements:
```typescript
const startTime = Date.now();

context.logger.info('Task processing started', {
  taskId: task.id,
  stageId: context.stage.id,
  jobId: context.job.id,
  fileName: task.data.fileName,
  attempt: task.userMetadata.attempts
});

try {
  await this.processImage(task.data);

  context.logger.info('Task completed successfully', {
    taskId: task.id,
    processingTime: Date.now() - startTime
  });
} catch (error) {
  context.logger.error('Task failed', {
    taskId: task.id,
    error: error.message,
    stack: error.stack,
    data: task.data
  });
  throw error;
}
```
### Implement Idempotency

Tasks may be retried. Ensure your handlers are idempotent:
**✅ Idempotent**

```typescript
public async processTask(task: Task<ImageTask>, context: TaskHandlerContext): Promise<void> {
  const { targetPath } = task.data;

  // Check if already processed
  const exists = await this.storage.exists(targetPath);
  if (exists) {
    context.logger.info('Task already completed, skipping', {
      taskId: task.id,
      targetPath
    });
    return;
  }

  // Process only if not already done
  await this.processImage(task.data);
}
```

**❌ Not Idempotent**

```typescript
// This will fail or create duplicates on retry
public async processTask(task: Task<ImageTask>, context: TaskHandlerContext): Promise<void> {
  // No check for existing output
  await this.processImage(task.data);
  await this.incrementCounter(); // Side effect without guard
}
```
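Some side effects (counters, notifications) leave no natural output to check for. In those cases a completion record keyed by task id can serve as the guard. A minimal sketch: the in-memory `Set` stands in for a durable store such as Redis or a database table, which you would need in a real deployment.

```typescript
// Completion records. In production this must be a durable, shared store
// (e.g. Redis or a database), not process memory.
const completed = new Set<string>();

// Run `effect` at most once per task id.
// Returns true if the effect ran, false if it was skipped as already done.
async function runOnce(taskId: string, effect: () => Promise<void>): Promise<boolean> {
  if (completed.has(taskId)) {
    return false; // already recorded as done; skip the side effect on retry
  }
  await effect();
  completed.add(taskId);
  return true;
}
```

A retried task then re-enters `runOnce` with the same id and skips the effect. Note the sketch has a small window between running the effect and recording it; where the store supports it, write the record atomically with the effect (e.g. a transaction or `SET NX` reservation) to close that window.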
### Update Stage Metadata Strategically

Use stage metadata to track progress, but avoid excessive updates:
```typescript
export class BatchProcessor {
  private updateThreshold = 10;
  private processedCount = 0;

  public async processTask(task: Task<any>, context: TaskHandlerContext<any, any, any, any>): Promise<void> {
    await this.processSingleItem(task.data);
    this.processedCount++;

    // Update metadata every 10 tasks instead of every task
    if (this.processedCount % this.updateThreshold === 0) {
      await context.updateStageUserMetadata({
        ...context.stage.userMetadata,
        processedCount: this.processedCount,
        lastUpdateAt: new Date().toISOString()
      });
    }
  }
}
```
## Concurrency and Performance
### Choose Appropriate Concurrency

Set worker concurrency based on your task characteristics:
```typescript
import * as os from 'node:os';

// CPU-bound tasks (image processing, video encoding)
const cpuBoundWorker = sdk.createWorker('image-resize', handler, {
  concurrency: Math.max(1, os.cpus().length - 1) // Leave one CPU free
});

// I/O-bound tasks (API calls, file transfers)
const ioBoundWorker = sdk.createWorker('api-integration', handler, {
  concurrency: 20 // Can be much higher
});

// Mixed workload
const mixedWorker = sdk.createWorker('data-pipeline', handler, {
  concurrency: 5 // Conservative middle ground
});
```
### Batch Task Creation

When creating many tasks, batch them for better performance:
**✅ Batched**

```typescript
// Create tasks in batches
const BATCH_SIZE = 100;
const tasks = [...]; // 10,000 tasks

for (let i = 0; i < tasks.length; i += BATCH_SIZE) {
  const batch = tasks.slice(i, i + BATCH_SIZE);
  const promises = batch.map(taskData =>
    producer.createTask(stageId, taskData)
  );
  await Promise.all(promises);

  // Optional: Small delay between batches to avoid overwhelming the manager
  await new Promise(resolve => setTimeout(resolve, 100));
}
```

**❌ Sequential**

```typescript
// Don't create tasks one by one
for (const taskData of tasks) {
  await producer.createTask(stageId, taskData); // Slow!
}
```
### Configure Appropriate Backoff

Tune retry backoff based on failure patterns:
```typescript
// Fast recovery for transient network issues
const aggressiveBackoff = {
  initialBaseRetryDelayMs: 500,
  maxDelayMs: 5000,
  backoffFactor: 1.5
};

// Conservative for rate-limited APIs
const conservativeBackoff = {
  initialBaseRetryDelayMs: 2000,
  maxDelayMs: 120000,
  backoffFactor: 3
};

const worker = sdk.createWorker('api-call', handler, {
  concurrency: 5,
  backoffOptions: conservativeBackoff
});
```
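To sanity-check a configuration before deploying it, compute the delays it produces. The sketch below assumes the common exponential-backoff formula `delay = min(maxDelayMs, initialBaseRetryDelayMs * backoffFactor^(attempt - 1))`; Jobnik's exact formula (and any jitter it applies) may differ, so treat this as an approximation:

```typescript
interface BackoffOptions {
  initialBaseRetryDelayMs: number;
  maxDelayMs: number;
  backoffFactor: number;
}

// Approximate delay before the given retry attempt (1-based), capped at maxDelayMs.
function retryDelayMs(attempt: number, opts: BackoffOptions): number {
  const raw = opts.initialBaseRetryDelayMs * Math.pow(opts.backoffFactor, attempt - 1);
  return Math.min(opts.maxDelayMs, raw);
}

const conservativeBackoff = { initialBaseRetryDelayMs: 2000, maxDelayMs: 120000, backoffFactor: 3 };

// Attempts 1..5 under this formula: 2s, 6s, 18s, 54s, then capped at 120s.
const delays = [1, 2, 3, 4, 5].map(a => retryDelayMs(a, conservativeBackoff));
```

If the final delays look too long for your failure mode (or the early ones too short for a rate-limited API), adjust `backoffFactor` and `maxDelayMs` accordingly.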
## Error Handling
### Throw Errors for Task Failures

Let the SDK handle task status updates by throwing errors:
**✅ Good**

```typescript
public async processTask(task: Task<any>, context: TaskHandlerContext<any, any, any, any>): Promise<void> {
  try {
    await this.downloadFile(task.data.url);
    await this.processFile(task.data.path);
  } catch (error) {
    context.logger.error('Task failed', { error, taskId: task.id });
    throw error; // SDK marks task as FAILED
  }
}
```

**❌ Bad**

```typescript
// Don't swallow errors or manually manage status
public async processTask(task: Task<any>, context: TaskHandlerContext<any, any, any, any>): Promise<void> {
  try {
    await this.processFile(task.data.path);
  } catch (error) {
    context.logger.error('Task failed', { error });
    // Missing throw - SDK thinks task succeeded!
  }
}
```
### Categorize Errors

Distinguish between retryable and non-retryable errors:
```typescript
export class RetryableError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'RetryableError';
  }
}

export class PermanentError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'PermanentError';
  }
}

public async processTask(task: Task<any>, context: TaskHandlerContext<any, any, any, any>): Promise<void> {
  try {
    await this.callExternalAPI(task.data);
  } catch (error) {
    if (error.statusCode === 429) {
      // Rate limited - retryable
      throw new RetryableError('Rate limit exceeded');
    } else if (error.statusCode === 404) {
      // Resource not found - permanent failure
      throw new PermanentError(`Resource not found: ${task.data.url}`);
    }
    throw error;
  }
}
```
## Monitoring and Alerting
### Define Key Metrics

Track business-relevant metrics in addition to SDK metrics:
```typescript
import { Counter, Histogram, Gauge, Registry } from 'prom-client';

export class ImageProcessorMetrics {
  private readonly processingDuration: Histogram;
  private readonly failuresByReason: Counter;
  private readonly queueDepth: Gauge;

  constructor(registry: Registry) {
    this.processingDuration = new Histogram({
      name: 'image_processing_duration_seconds',
      help: 'Image processing duration',
      labelNames: ['size_category', 'format'],
      registers: [registry]
    });

    this.failuresByReason = new Counter({
      name: 'image_processing_failures_total',
      help: 'Total processing failures by reason',
      labelNames: ['reason', 'format'],
      registers: [registry]
    });

    this.queueDepth = new Gauge({
      name: 'image_processing_queue_depth',
      help: 'Images currently awaiting processing',
      registers: [registry]
    });
  }

  public recordProcessing(durationMs: number, format: string, sizeCategory: string): void {
    this.processingDuration
      .labels(sizeCategory, format)
      .observe(durationMs / 1000);
  }
}
```
### Set Up Alerts

Configure alerts for critical conditions:
```yaml
# prometheus-alerts.yaml
groups:
  - name: jobnik_worker_alerts
    interval: 30s
    rules:
      - alert: HighTaskFailureRate
        expr: |
          rate(jobnik_worker_tasks_failed_total[5m]) > 0.1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High task failure rate"
          description: "Worker {{ $labels.worker_name }} has a failure rate of {{ $value }}"

      - alert: WorkerStuck
        expr: |
          jobnik_worker_active_tasks > 0 and
          rate(jobnik_worker_tasks_completed_total[10m]) == 0
        for: 10m
        labels:
          severity: critical
        annotations:
          summary: "Worker appears stuck"
          description: "Worker has active tasks but no completions in 10 minutes"

      - alert: QueueBacklog
        expr: |
          jobnik_manager_queue_depth > 1000
        for: 15m
        labels:
          severity: warning
        annotations:
          summary: "Large task queue backlog"
          description: "Queue has {{ $value }} pending tasks"
```
## Testing Best Practices
### Mock the SDK for Unit Tests

Use dependency injection to mock the SDK in tests:
```typescript
// worker.service.ts
export class WorkerService {
  constructor(
    private readonly sdk: IJobnikSDK<JobTypes, StageTypes>
  ) {}
}

// worker.service.spec.ts
import { describe, it, expect, vi } from 'vitest';

describe('WorkerService', () => {
  it('should process tasks correctly', () => {
    const mockSDK = {
      createWorker: vi.fn(),
      getProducer: vi.fn()
    } as any;

    const service = new WorkerService(mockSDK);
    // Test your logic
  });
});
```
### Integration Tests with Real Manager

Test against a real Jobnik Manager instance:
```typescript
describe('Integration Tests', () => {
  let sdk: JobnikSDK;

  beforeAll(() => {
    sdk = new JobnikSDK({
      baseUrl: process.env.TEST_JOBNIK_URL || 'http://localhost:3000',
      metricsRegistry: new Registry()
    });
  });

  it('should complete full workflow', async () => {
    const producer = sdk.getProducer();

    // Create job
    const job = await producer.createJob({
      name: 'test-job',
      data: { testId: 'test-123' }
    });

    // Create stage with tasks
    const stage = await producer.createStage(job.id, {
      type: 'test-stage',
      data: { config: 'test' }
    });

    await producer.createTask(stage.id, {
      data: { taskData: 'test' }
    });

    // Start worker
    const worker = sdk.createWorker('test-stage', async (task) => {
      // Simple test task
      await new Promise(resolve => setTimeout(resolve, 100));
    });

    await worker.start();

    // Wait for completion
    await new Promise(resolve => setTimeout(resolve, 2000));

    await worker.stop();
  }, 30000);
});
```
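The fixed two-second sleep above can flake on a slow CI machine. A generic polling helper makes the wait condition explicit instead. `waitFor` below is not part of the SDK; the condition you poll (for example, querying job status through the producer) depends on what your setup exposes:

```typescript
// Poll `condition` every `intervalMs` until it returns true or `timeoutMs` elapses.
// Resolves true on success, false on timeout.
async function waitFor(
  condition: () => Promise<boolean>,
  timeoutMs: number,
  intervalMs = 250
): Promise<boolean> {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline) {
    if (await condition()) {
      return true;
    }
    await new Promise(resolve => setTimeout(resolve, intervalMs));
  }
  return false;
}
```

In the integration test, the fixed sleep then becomes something like `expect(await waitFor(() => jobIsCompleted(job.id), 10000)).toBe(true)`, where `jobIsCompleted` is whatever status query your environment provides (a hypothetical helper here).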
## Security Best Practices
### Validate Input Data

Never trust task data without validation:
```typescript
import { z } from 'zod';

const TaskDataSchema = z.object({
  sourceUrl: z.string().url(),
  targetPath: z.string().regex(/^s3:\/\/.+/),
  fileName: z.string().max(255)
});

public async processTask(task: Task<any>, context: TaskHandlerContext<any, any, any, any>): Promise<void> {
  // Validate input
  const validationResult = TaskDataSchema.safeParse(task.data);

  if (!validationResult.success) {
    context.logger.error('Invalid task data', {
      taskId: task.id,
      errors: validationResult.error.errors
    });
    throw new PermanentError('Invalid task data');
  }

  const { sourceUrl, targetPath, fileName } = validationResult.data;
  await this.process(sourceUrl, targetPath, fileName);
}
```
### Sanitize File Paths

Prevent path traversal attacks:
```typescript
import * as path from 'path';

function sanitizeFilePath(inputPath: string, allowedDirectory: string): string {
  const normalizedPath = path.normalize(inputPath);
  const fullPath = path.join(allowedDirectory, normalizedPath);

  // Ensure the path stays within the allowed directory. Append the separator
  // so a sibling like '/uploads-evil' cannot pass a prefix check against '/uploads'.
  if (!fullPath.startsWith(allowedDirectory + path.sep)) {
    throw new PermanentError('Invalid file path');
  }

  return fullPath;
}
```
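To see the guard in action, here is a self-contained version (with `PermanentError` swapped for a plain `Error` so the snippet runs standalone) alongside an accepted path and a rejected traversal attempt:

```typescript
import * as path from 'node:path';

function sanitizePath(inputPath: string, allowedDirectory: string): string {
  const fullPath = path.join(allowedDirectory, path.normalize(inputPath));
  if (!fullPath.startsWith(allowedDirectory + path.sep)) {
    throw new Error('Invalid file path'); // PermanentError in the real handler
  }
  return fullPath;
}

// A relative path inside the directory resolves and is accepted...
const accepted = sanitizePath('images/photo.jpg', '/data/uploads');

// ...while a traversal attempt escapes the prefix and is rejected.
let rejected = false;
try {
  sanitizePath('../../etc/passwd', '/data/uploads');
} catch {
  rejected = true;
}
```

The traversal input normalizes and joins to a path outside `/data/uploads`, so it fails the prefix check and throws.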
### Use Secure Connections

Always use HTTPS for production:
```typescript
const sdk = new JobnikSDK({
  baseUrl: process.env.NODE_ENV === 'production'
    ? 'https://jobnik-manager.prod.example.com'
    : 'http://localhost:3000',
  metricsRegistry: registry
});
```
## Common Anti-Patterns to Avoid
### ❌ Creating Circular Dependencies

Don't create workflows where jobs depend on each other in a circle:

```text
// BAD: Circular dependency
Job A creates tasks that create Job B
Job B creates tasks that create Job A
// This creates an infinite loop!
```
### ❌ Polling in Task Handlers

Don't poll for external state within a task:
```typescript
// BAD: Polling in task
public async processTask(task: Task<any>, context: TaskHandlerContext<any, any, any, any>): Promise<void> {
  while (!await this.checkExternalSystem()) {
    await sleep(1000); // Blocks worker!
  }
  await this.doWork();
}

// GOOD: Check once and throw a retryable error so the task is rescheduled
public async processTask(task: Task<any>, context: TaskHandlerContext<any, any, any, any>): Promise<void> {
  const isReady = await this.checkExternalSystem();
  if (!isReady) {
    throw new RetryableError('External system not ready');
  }
  await this.doWork();
}
```
### ❌ Storing Mutable State in Task Data

Don't mutate task data in the handler; the changes won't persist:
```typescript
// BAD: Trying to modify task
task.data.processedAt = new Date(); // This won't persist!

// GOOD: Use stage metadata or external storage
await context.updateStageUserMetadata({
  processedAt: new Date().toISOString()
});
```
### ❌ Ignoring Metrics

Don't skip observability:
```typescript
// BAD: No visibility into performance
public async processTask(task: Task<any>): Promise<void> {
  await this.doWork(task);
  // No metrics, no logs, no way to debug issues
}

// GOOD: Comprehensive observability
public async processTask(task: Task<any>, context: TaskHandlerContext<any, any, any, any>): Promise<void> {
  const startTime = Date.now();
  context.logger.info('Task started', { taskId: task.id });

  try {
    await this.doWork(task);
    const duration = Date.now() - startTime;
    this.metrics.recordSuccess(duration);
    context.logger.info('Task completed', { taskId: task.id, duration });
  } catch (error) {
    this.metrics.recordFailure(error.message);
    context.logger.error('Task failed', { taskId: task.id, error });
    throw error;
  }
}
```
## Checklist for Production Readiness

Before deploying to production, ensure:
**Configuration**

- [ ] Environment variables are properly set
- [ ] Secrets are stored securely (not in config files)
- [ ] Connection URLs use HTTPS in production
- [ ] Appropriate concurrency levels configured
- [ ] Retry backoff tuned for your workload

**Code Quality**

- [ ] All types are strictly defined
- [ ] Input validation implemented
- [ ] Error handling comprehensive
- [ ] Graceful shutdown implemented
- [ ] Idempotency guaranteed

**Observability**

- [ ] Structured logging in place
- [ ] Custom metrics defined
- [ ] Distributed tracing enabled
- [ ] Health check endpoints exposed
- [ ] Alerts configured

**Testing**

- [ ] Unit tests cover core logic
- [ ] Integration tests validate workflow
- [ ] Load testing performed
- [ ] Failure scenarios tested
- [ ] Recovery mechanisms validated

**Deployment**

- [ ] Docker images built and tested
- [ ] Helm charts configured
- [ ] Resource limits set appropriately
- [ ] Auto-scaling configured if needed
- [ ] Rollback plan documented
## Additional Resources

- Jobnik Architecture Overview
- Jobnik SDK API Documentation
- Worker Boilerplate Repository
- Jobnik Manager Repository
Have feedback on these best practices? Found a pattern that works well? Share with the team!