Jobnik Best Practices

This guide provides recommendations and patterns for building robust, scalable, and maintainable Jobnik-based workflows.


Job Design Patterns

Keep Jobs Focused

Each job should represent a single logical workflow. Don't combine unrelated operations into one job.

// Separate jobs for different workflows
await producer.createJob({
  name: 'image-processing',
  data: { uploadId: 'upload-123' }
});

await producer.createJob({
  name: 'user-notification',
  data: { userId: 'user-456' }
});

Use Stage Names Meaningfully

Stage names should clearly indicate the type of work being performed.

// Clear, descriptive stage names
await producer.createStage(jobId, {
type: 'validate-input',
data: { validationRules }
});

await producer.createStage(jobId, {
type: 'resize-images',
data: { dimensions }
});

await producer.createStage(jobId, {
type: 'upload-to-storage',
data: { bucket }
});

Store References, Not Large Payloads

Store URLs, IDs, and paths in task data, not large files or blobs.

await producer.createTask(stageId, {
  data: {
    imageUrl: 's3://bucket/images/original/file.jpg',
    targetPath: 's3://bucket/images/resized/file.jpg',
    processingConfig: { width: 800, height: 600 }
  }
});

Type Safety Best Practices

Define Strict Types

Use TypeScript's type system to enforce data contracts between producers and workers.

// Define strict types with required fields
export interface ImageProcessingJob {
'image-processing': {
data: {
uploadId: string;
userId: string;
sourceLocation: string;
targetLocation: string;
};
userMetadata: {
priority: 'HIGH' | 'MEDIUM' | 'LOW'; // Use unions for enums
uploadedAt: string; // ISO timestamp
};
};
}

export interface ImageStages {
'resize': {
data: {
readonly width: number; // Use readonly when appropriate
readonly height: number;
readonly quality: number;
readonly format: 'jpg' | 'png' | 'webp';
};
userMetadata: {
processedCount: number;
failedCount: number;
};
task: {
data: {
sourceUrl: string;
targetPath: string;
fileName: string;
};
userMetadata: {
attempts: number;
lastAttemptAt?: string;
};
};
};
}

Version Your Types

When your data structures evolve, version them appropriately:

// v1 types
export interface ImageProcessingJobV1 {
'image-processing': {
data: {
uploadId: string;
userId: string;
};
userMetadata: Record<string, never>;
};
}

// v2 types with additional fields
export interface ImageProcessingJobV2 {
'image-processing': {
data: {
uploadId: string;
userId: string;
organizationId: string; // New field
};
userMetadata: {
requestContext: RequestContext; // New field
};
};
}
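During a rolling migration, both versions may be in flight at once, so handlers need a way to tell them apart at runtime. The guard below is an illustrative sketch (the `isV2Data` helper is not part of the SDK); it keys off the field that only v2 payloads carry:

```typescript
interface V1Data { uploadId: string; userId: string; }
interface V2Data extends V1Data { organizationId: string; }

// Hypothetical runtime guard for a rolling migration:
// v2 payloads carry organizationId, v1 payloads do not.
function isV2Data(data: V1Data & { organizationId?: unknown }): data is V2Data {
  return typeof data.organizationId === 'string';
}

const v1: V1Data = { uploadId: 'u-1', userId: 'usr-1' };
const v2: V2Data = { uploadId: 'u-2', userId: 'usr-2', organizationId: 'org-1' };

console.log(isV2Data(v1), isV2Data(v2));
```

A handler can then branch on the guard and fall back to v1 behavior (or a default `organizationId`) for older payloads.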

Worker Implementation Best Practices

Handle Graceful Shutdown

Always check the abort signal and clean up resources properly.

export class ImageProcessor {
  public async processTask(
    task: Task<ImageTask>,
    context: TaskHandlerContext<JobTypes, StageTypes, 'image-processing', 'resize'>
  ): Promise<void> {
    const { sourceUrl, targetPath } = task.data;

    // Start processing
    const stream = await downloadImage(sourceUrl);

    // Check for shutdown signal
    if (context.signal.aborted) {
      stream.destroy();
      throw new Error('Processing cancelled');
    }

    const processedImage = await this.resize(stream, context.stage.data);

    // Check again before upload
    if (context.signal.aborted) {
      throw new Error('Processing cancelled');
    }

    await this.uploadImage(processedImage, targetPath);

    context.logger.info('Task completed', { taskId: task.id });
  }
}
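At the process level, a shutdown hook is what drives that abort signal: stopping the worker on SIGTERM/SIGINT lets in-flight handlers observe `context.signal.aborted` and clean up. The wiring below is a sketch; it assumes `worker.stop()` resolves once running tasks have drained:

```typescript
type SignalName = 'SIGTERM' | 'SIGINT';

interface StoppableWorker { stop(): Promise<void>; }
interface ProcessLike {
  on(signal: SignalName, handler: () => void): void;
  exit(code: number): void;
}

// Register handlers so SIGTERM/SIGINT stop the worker exactly once.
function registerShutdownHooks(worker: StoppableWorker, proc: ProcessLike): void {
  let stopping = false;
  const shutdown = async (signal: SignalName) => {
    if (stopping) return; // Ignore repeated signals while draining
    stopping = true;
    console.log(`Received ${signal}, stopping worker...`);
    await worker.stop(); // Aborts context.signal for running task handlers
    proc.exit(0);
  };
  proc.on('SIGTERM', () => void shutdown('SIGTERM'));
  proc.on('SIGINT', () => void shutdown('SIGINT'));
}
```

In an application this would be called once at startup, e.g. `registerShutdownHooks(worker, process)`.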

Use Structured Logging

Include relevant context in all log statements:

const startTime = Date.now();

context.logger.info('Task processing started', {
  taskId: task.id,
  stageId: context.stage.id,
  jobId: context.job.id,
  fileName: task.data.fileName,
  attempt: task.userMetadata.attempts
});

try {
  await this.processImage(task.data);

  context.logger.info('Task completed successfully', {
    taskId: task.id,
    processingTime: Date.now() - startTime
  });
} catch (error) {
  context.logger.error('Task failed', {
    taskId: task.id,
    error: error.message,
    stack: error.stack,
    data: task.data
  });
  throw error;
}

Implement Idempotency

Tasks may be retried. Ensure your handlers are idempotent:

public async processTask(task: Task<ImageTask>, context: TaskHandlerContext): Promise<void> {
  const { targetPath } = task.data;

  // Check if already processed
  const exists = await this.storage.exists(targetPath);
  if (exists) {
    context.logger.info('Task already completed, skipping', {
      taskId: task.id,
      targetPath
    });
    return;
  }

  // Process only if not already done
  await this.processImage(task.data);
}
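When there is no natural output to check for (like `targetPath` above), a deterministic idempotency key derived from the task's inputs can play the same role: a retried task hashes to the same key, which can be looked up in a dedup store before redoing the work. A sketch (the helper is not part of the SDK; it normalizes key order before hashing so equivalent payloads match):

```typescript
import { createHash } from 'crypto';

// Derive a stable key from flat task data: identical inputs, regardless of
// property order, always hash to the same value.
function idempotencyKey(taskData: Record<string, unknown>): string {
  const canonical = JSON.stringify(taskData, Object.keys(taskData).sort());
  return createHash('sha256').update(canonical).digest('hex');
}

console.log(idempotencyKey({ imageUrl: 'a.jpg', width: 800 }));
```

Note the `JSON.stringify` replacer array only covers top-level keys; nested objects would need recursive canonicalization.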

Update Stage Metadata Strategically

Use stage metadata to track progress, but avoid excessive updates:

export class BatchProcessor {
  private updateThreshold = 10;
  private processedCount = 0;

  public async processTask(task: Task<any>, context: TaskHandlerContext<any, any, any, any>): Promise<void> {
    await this.processSingleItem(task.data);

    this.processedCount++;

    // Update metadata every 10 tasks instead of every task
    if (this.processedCount % this.updateThreshold === 0) {
      await context.updateStageUserMetadata({
        ...context.stage.userMetadata,
        processedCount: this.processedCount,
        lastUpdateAt: new Date().toISOString()
      });
    }
  }
}

Concurrency and Performance

Choose Appropriate Concurrency

Set worker concurrency based on your task characteristics:

// CPU-bound tasks (image processing, video encoding)
const cpuBoundWorker = sdk.createWorker('image-resize', handler, {
concurrency: Math.max(1, os.cpus().length - 1) // Leave one CPU free
});

// I/O-bound tasks (API calls, file transfers)
const ioBoundWorker = sdk.createWorker('api-integration', handler, {
concurrency: 20 // Can be much higher
});

// Mixed workload
const mixedWorker = sdk.createWorker('data-pipeline', handler, {
concurrency: 5 // Conservative middle ground
});

Batch Task Creation

When creating many tasks, batch them for better performance:

// Create tasks in batches
const BATCH_SIZE = 100;
const tasks = [...]; // 10,000 tasks

for (let i = 0; i < tasks.length; i += BATCH_SIZE) {
const batch = tasks.slice(i, i + BATCH_SIZE);
const promises = batch.map(taskData =>
producer.createTask(stageId, taskData)
);
await Promise.all(promises);

// Optional: Small delay between batches to avoid overwhelming the manager
await new Promise(resolve => setTimeout(resolve, 100));
}
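One caveat: `Promise.all` rejects on the first failure and discards the outcome of the rest of the batch. A generic helper built on `Promise.allSettled` (a sketch, independent of the SDK) keeps going and reports totals instead:

```typescript
// Run an async action over items in fixed-size batches, collecting failures
// instead of aborting the whole run on the first rejection.
async function runInBatches<T>(
  items: readonly T[],
  batchSize: number,
  action: (item: T) => Promise<void>
): Promise<{ succeeded: number; failed: number }> {
  let succeeded = 0;
  let failed = 0;
  for (let i = 0; i < items.length; i += batchSize) {
    const results = await Promise.allSettled(
      items.slice(i, i + batchSize).map(action)
    );
    for (const r of results) {
      r.status === 'fulfilled' ? succeeded++ : failed++;
    }
  }
  return { succeeded, failed };
}
```

Usage with the producer would look like `await runInBatches(tasks, 100, t => producer.createTask(stageId, t))`, after which the failed items can be logged or re-queued.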

Configure Appropriate Backoff

Tune retry backoff based on failure patterns:

// Fast recovery for transient network issues
const aggressiveBackoff = {
initialBaseRetryDelayMs: 500,
maxDelayMs: 5000,
backoffFactor: 1.5
};

// Conservative for rate-limited APIs
const conservativeBackoff = {
initialBaseRetryDelayMs: 2000,
maxDelayMs: 120000,
backoffFactor: 3
};

const worker = sdk.createWorker('api-call', handler, {
concurrency: 5,
backoffOptions: conservativeBackoff
});
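To sanity-check a configuration, it helps to compute the delay schedule it produces. The formula below is the usual exponential-backoff shape; the SDK's exact calculation (and any jitter it applies) may differ:

```typescript
interface BackoffOptions {
  initialBaseRetryDelayMs: number;
  maxDelayMs: number;
  backoffFactor: number;
}

// Delay before the (attempt + 1)-th retry, capped at maxDelayMs.
function retryDelayMs(attempt: number, opts: BackoffOptions): number {
  const raw = opts.initialBaseRetryDelayMs * Math.pow(opts.backoffFactor, attempt);
  return Math.min(raw, opts.maxDelayMs);
}

const conservative: BackoffOptions = {
  initialBaseRetryDelayMs: 2000,
  maxDelayMs: 120000,
  backoffFactor: 3
};

// First five retries: 2s, 6s, 18s, 54s, then capped at 120s
console.log([0, 1, 2, 3, 4].map(a => retryDelayMs(a, conservative)));
```

Running the numbers like this makes it obvious whether a config reaches its cap too quickly (aggressive) or leaves long gaps between early retries (conservative).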

Error Handling

Throw Errors for Task Failures

Let the SDK handle task status updates by throwing errors:

public async processTask(task: Task<any>, context: TaskHandlerContext<any, any, any, any>): Promise<void> {
  try {
    await this.downloadFile(task.data.url);
    await this.processFile(task.data.path);
  } catch (error) {
    context.logger.error('Task failed', { error, taskId: task.id });
    throw error; // SDK marks task as FAILED
  }
}

Categorize Errors

Distinguish between retryable and non-retryable errors:

export class RetryableError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'RetryableError';
  }
}

export class PermanentError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'PermanentError';
  }
}

public async processTask(task: Task<any>, context: TaskHandlerContext<any, any, any, any>): Promise<void> {
  try {
    await this.callExternalAPI(task.data);
  } catch (error) {
    if (error.statusCode === 429) {
      // Rate limited - retryable
      throw new RetryableError('Rate limit exceeded');
    } else if (error.statusCode === 404) {
      // Resource not found - permanent failure
      throw new PermanentError(`Resource not found: ${task.data.url}`);
    }
    throw error;
  }
}

Monitoring and Alerting

Define Key Metrics

Track business-relevant metrics in addition to SDK metrics:

import { Counter, Histogram, Registry } from 'prom-client';

export class ImageProcessorMetrics {
  private readonly processingDuration: Histogram;
  private readonly failuresByReason: Counter;

  constructor(registry: Registry) {
    this.processingDuration = new Histogram({
      name: 'image_processing_duration_seconds',
      help: 'Image processing duration',
      labelNames: ['size_category', 'format'],
      registers: [registry]
    });

    this.failuresByReason = new Counter({
      name: 'image_processing_failures_total',
      help: 'Total processing failures by reason',
      labelNames: ['reason', 'format'],
      registers: [registry]
    });
  }

  public recordProcessing(durationMs: number, format: string, sizeCategory: string): void {
    this.processingDuration
      .labels(sizeCategory, format)
      .observe(durationMs / 1000);
  }
}

Set Up Alerts

Configure alerts for critical conditions:

# prometheus-alerts.yaml
groups:
  - name: jobnik_worker_alerts
    interval: 30s
    rules:
      - alert: HighTaskFailureRate
        expr: |
          rate(jobnik_worker_tasks_failed_total[5m]) > 0.1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High task failure rate"
          description: "Worker {{ $labels.worker_name }} has a failure rate of {{ $value }}"

      - alert: WorkerStuck
        expr: |
          jobnik_worker_active_tasks > 0 and
          rate(jobnik_worker_tasks_completed_total[10m]) == 0
        for: 10m
        labels:
          severity: critical
        annotations:
          summary: "Worker appears stuck"
          description: "Worker has active tasks but no completions in 10 minutes"

      - alert: QueueBacklog
        expr: |
          jobnik_manager_queue_depth > 1000
        for: 15m
        labels:
          severity: warning
        annotations:
          summary: "Large task queue backlog"
          description: "Queue has {{ $value }} pending tasks"

Testing Best Practices

Mock the SDK for Unit Tests

Use dependency injection to mock the SDK in tests:

// worker.service.ts
export class WorkerService {
  constructor(
    private readonly sdk: IJobnikSDK<JobTypes, StageTypes>
  ) {}
}

// worker.service.spec.ts
import { describe, it, expect, vi } from 'vitest';

describe('WorkerService', () => {
  it('should process tasks correctly', () => {
    const mockSDK = {
      createWorker: vi.fn(),
      getProducer: vi.fn()
    } as any;

    const service = new WorkerService(mockSDK);
    // Test your logic
  });
});

Integration Tests with Real Manager

Test against a real Jobnik Manager instance:

describe('Integration Tests', () => {
  let sdk: JobnikSDK;

  beforeAll(() => {
    sdk = new JobnikSDK({
      baseUrl: process.env.TEST_JOBNIK_URL || 'http://localhost:3000',
      metricsRegistry: new Registry()
    });
  });

  it('should complete full workflow', async () => {
    const producer = sdk.getProducer();

    // Create job
    const job = await producer.createJob({
      name: 'test-job',
      data: { testId: 'test-123' }
    });

    // Create stage with tasks
    const stage = await producer.createStage(job.id, {
      type: 'test-stage',
      data: { config: 'test' }
    });

    await producer.createTask(stage.id, {
      data: { taskData: 'test' }
    });

    // Start worker
    const worker = sdk.createWorker('test-stage', async (task) => {
      // Simple test task
      await new Promise(resolve => setTimeout(resolve, 100));
    });

    await worker.start();

    // Wait for completion
    await new Promise(resolve => setTimeout(resolve, 2000));

    await worker.stop();
  }, 30000);
});
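The fixed 2-second sleep above makes the test timing-dependent: it fails on a slow run and wastes time on a fast one. A small poll-until helper is more robust. This is a sketch; the `producer.getJob` call and `status` field in the usage comment are assumptions about the producer API:

```typescript
// Poll a condition until it holds or the timeout elapses.
async function waitFor(
  condition: () => Promise<boolean>,
  timeoutMs = 10000,
  intervalMs = 250
): Promise<void> {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline) {
    if (await condition()) return;
    await new Promise(resolve => setTimeout(resolve, intervalMs));
  }
  throw new Error(`Condition not met within ${timeoutMs}ms`);
}

// Usage in the test above (hypothetical API):
// await waitFor(async () => (await producer.getJob(job.id)).status === 'COMPLETED');
```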

Security Best Practices

Validate Input Data

Never trust task data without validation:

import { z } from 'zod';

const TaskDataSchema = z.object({
  sourceUrl: z.string().url(),
  targetPath: z.string().regex(/^s3:\/\/.+/),
  fileName: z.string().max(255)
});

public async processTask(task: Task<any>, context: TaskHandlerContext<any, any, any, any>): Promise<void> {
  // Validate input
  const validationResult = TaskDataSchema.safeParse(task.data);

  if (!validationResult.success) {
    context.logger.error('Invalid task data', {
      taskId: task.id,
      errors: validationResult.error.errors
    });
    throw new PermanentError('Invalid task data');
  }

  const { sourceUrl, targetPath, fileName } = validationResult.data;
  await this.process(sourceUrl, targetPath, fileName);
}

Sanitize File Paths

Prevent path traversal attacks:

import * as path from 'path';

function sanitizeFilePath(inputPath: string, allowedDirectory: string): string {
  const resolvedBase = path.resolve(allowedDirectory);
  const fullPath = path.resolve(resolvedBase, inputPath);

  // Ensure the resolved path is within the allowed directory; the path.sep
  // suffix prevents '/allowed-evil' from slipping past a check for '/allowed'
  if (fullPath !== resolvedBase && !fullPath.startsWith(resolvedBase + path.sep)) {
    throw new PermanentError('Invalid file path');
  }

  return fullPath;
}

Use Secure Connections

Always use HTTPS for production:

const sdk = new JobnikSDK({
  baseUrl: process.env.NODE_ENV === 'production'
    ? 'https://jobnik-manager.prod.example.com'
    : 'http://localhost:3000',
  metricsRegistry: registry
});

Common Anti-Patterns to Avoid

❌ Creating Circular Dependencies

Don't create workflows where jobs depend on each other in a circle:

// BAD: Circular dependency
// Job A creates tasks that create Job B
// Job B creates tasks that create Job A
// This creates an infinite loop!
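Model dependent work as ordered stages within a single job instead: each stage can only build on stages created before it, so the dependency graph is a straight line and a cycle cannot form. The `PipelinePlan` class below is an illustrative stand-in, not part of the SDK:

```typescript
// GOOD: one job, ordered stages -- each stage may only depend on earlier ones.
class PipelinePlan {
  private readonly stages: string[] = [];

  addStage(type: string): this {
    this.stages.push(type); // Appended after all existing stages: no back-edges
    return this;
  }

  order(): string[] {
    return [...this.stages];
  }
}

const plan = new PipelinePlan()
  .addStage('reserve-inventory')
  .addStage('charge-payment')
  .addStage('ship-order');

console.log(plan.order());
```

Each entry would map to a `producer.createStage` call in creation order, as shown earlier in this guide.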

❌ Polling in Task Handlers

Don't poll for external state within a task:

// BAD: Polling in task
public async processTask(task: Task<any>, context: TaskHandlerContext<any, any, any, any>): Promise<void> {
  while (!await this.checkExternalSystem()) {
    await sleep(1000); // Blocks worker!
  }
  await this.doWork();
}

// GOOD: Fail fast and let the retry mechanism reschedule
public async processTask(task: Task<any>, context: TaskHandlerContext<any, any, any, any>): Promise<void> {
  const isReady = await this.checkExternalSystem();
  if (!isReady) {
    throw new RetryableError('External system not ready');
  }
  await this.doWork();
}

❌ Storing Mutable State in Task Data

Don't modify task data structure:

// BAD: Trying to modify task
task.data.processedAt = new Date(); // This won't persist!

// GOOD: Use stage metadata or external storage
await context.updateStageUserMetadata({
  processedAt: new Date().toISOString()
});

❌ Ignoring Metrics

Don't skip observability:

// BAD: No visibility into performance
public async processTask(task: Task<any>): Promise<void> {
  await this.doWork(task);
  // No metrics, no logs, no way to debug issues
}

// GOOD: Comprehensive observability
public async processTask(task: Task<any>, context: TaskHandlerContext<any, any, any, any>): Promise<void> {
  const startTime = Date.now();
  context.logger.info('Task started', { taskId: task.id });

  try {
    await this.doWork(task);

    const duration = Date.now() - startTime;
    this.metrics.recordSuccess(duration);
    context.logger.info('Task completed', { taskId: task.id, duration });
  } catch (error) {
    this.metrics.recordFailure(error.message);
    context.logger.error('Task failed', { taskId: task.id, error });
    throw error;
  }
}

Checklist for Production Readiness

Before deploying to production, ensure:

Configuration

  • Environment variables are properly set
  • Secrets are stored securely (not in config files)
  • Connection URLs use HTTPS in production
  • Appropriate concurrency levels configured
  • Retry backoff tuned for your workload

Code Quality

  • All types are strictly defined
  • Input validation implemented
  • Error handling comprehensive
  • Graceful shutdown implemented
  • Idempotency guaranteed

Observability

  • Structured logging in place
  • Custom metrics defined
  • Distributed tracing enabled
  • Health check endpoints exposed
  • Alerts configured

Testing

  • Unit tests cover core logic
  • Integration tests validate workflow
  • Load testing performed
  • Failure scenarios tested
  • Recovery mechanisms validated

Deployment

  • Docker images built and tested
  • Helm charts configured
  • Resource limits set appropriately
  • Auto-scaling configured if needed
  • Rollback plan documented

Questions or Suggestions?

Have feedback on these best practices? Found a pattern that works well? Share with the team!