From Zero to Hero

A step-by-step guide to integrating the MapColonies™ Jobnik Job Management system into your distributed workflows.

Assumptions

This guide assumes that you have:

  • Basic understanding of distributed systems and asynchronous task processing
  • Familiarity with TypeScript and Node.js (>= 24.0.0)
  • Access to a Jobnik Manager instance
  • An understanding of your workflow requirements (jobs, stages, and tasks)

For background on the Jobnik architecture, see the Jobnik Knowledge Base.

Before we start

The implementation of a Jobnik-based workflow consists of the following steps:

  1. Understanding the Jobnik hierarchy (Jobs, Stages, Tasks)
  2. Setting up your project environment
  3. Defining your custom job and stage types
  4. Implementing a Producer service (creating work)
  5. Implementing a Worker service (processing work)
  6. Testing and deploying your services

Below, each step is explained in detail.


Understanding the Jobnik Hierarchy

Jobnik uses a three-level hierarchy to organize work:

  • Job: The root entity representing an entire workflow (e.g., "Process User Upload")
  • Stage: A logical grouping of similar tasks within a job (e.g., "Image Resizing")
  • Task: An atomic unit of work to be executed (e.g., "Resize image_001.jpg")

Key Concepts:

  • Each Job can have multiple Stages
  • Stages are ordered sequentially - each stage has an order field (1, 2, 3, etc.)
  • Stages execute in order - a stage cannot transition to PENDING until the previous stage completes
  • Each Stage can have multiple Tasks
  • Tasks are processed independently by Workers
  • The Manager enforces valid state transitions

State Transitions: User vs System Control

Understanding which state transitions are user-controlled (via API) versus system-controlled (automatic) is crucial for working with Jobnik effectively.

Job State Transitions

Jobs follow a state machine with both manual and automatic transitions:

User-Controlled Job Transitions (via API):

  • PENDING: Resume a paused job or explicitly start a created job
  • PAUSED: Temporarily suspend job processing
  • ABORTED: Cancel the job permanently

System-Controlled Job Transitions (Automatic):

  • 🤖 CREATED → PENDING: Automatically set when job is created
  • 🤖 PENDING → IN_PROGRESS: Triggered when the first task in the first stage starts processing
  • 🤖 IN_PROGRESS → COMPLETED: Triggered when all stages are completed
  • 🤖 IN_PROGRESS → FAILED: Triggered when a stage fails

// User-controlled: Pause a running job
await fetch(`${managerUrl}/v1/jobs/${jobId}/status`, {
  method: 'PUT',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({ status: 'PAUSED' })
});

// User-controlled: Resume a paused job
await fetch(`${managerUrl}/v1/jobs/${jobId}/status`, {
  method: 'PUT',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({ status: 'PENDING' })
});

// User-controlled: Abort a job
await fetch(`${managerUrl}/v1/jobs/${jobId}/status`, {
  method: 'PUT',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({ status: 'ABORTED' })
});

// ❌ ILLEGAL: Cannot manually set to IN_PROGRESS, COMPLETED, or FAILED
// These are system-managed based on task/stage completion

Stage State Transitions

Stages have more restricted user control - most transitions are automatic:

User-Controlled Stage Transition (via API):

  • WAITING → PENDING: Resume a stage that was waiting for manual approval/intervention

System-Controlled Stage Transitions (Automatic):

  • 🤖 CREATED → PENDING: Automatically set when stage is created (if first stage or previous stage is COMPLETED)
  • 🤖 CREATED → WAITING: System can set stage to wait for external conditions
  • 🤖 PENDING → IN_PROGRESS: Triggered when the first task in the stage starts
  • 🤖 IN_PROGRESS → COMPLETED: Triggered when all tasks in the stage complete successfully
  • 🤖 IN_PROGRESS → FAILED: Triggered when any task reaches max retries and fails permanently
  • 🤖 Any → ABORTED: Cascaded from job abortion

Stage Ordering Constraints:

  • Stages are ordered sequentially (order: 1, 2, 3...)
  • A stage with order: 2 cannot transition to PENDING until stage with order: 1 is COMPLETED
  • This enforces sequential workflow execution

// User-controlled: Resume a waiting stage (e.g., after manual approval)
await fetch(`${managerUrl}/v1/stages/${stageId}/status`, {
  method: 'PUT',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({ status: 'PENDING' })
});

// ❌ ILLEGAL: Cannot manually set to other statuses
// IN_PROGRESS, COMPLETED, FAILED, ABORTED are system-managed

// ❌ ILLEGAL: Cannot skip stage order
// Stage 2 cannot go to PENDING if Stage 1 is not COMPLETED

Task State Transitions

Tasks have very limited user control - workers can only report whether processing succeeded or failed:

User-Controlled Task Transitions (via Worker API):

  • IN_PROGRESS → COMPLETED: Worker successfully completes the task
  • IN_PROGRESS → FAILED: Worker fails the task (after max retries)

System-Controlled Task Transitions (Automatic):

  • 🤖 CREATED → PENDING: Automatically set when task is created
  • 🤖 PENDING → IN_PROGRESS: Triggered when a worker dequeues the task
  • 🤖 IN_PROGRESS → RETRIED: System retries if attempts < maxAttempts after failure
  • 🤖 RETRIED → IN_PROGRESS: Task re-enters queue for another attempt

Dequeue Operation: The dequeue operation is special - it atomically moves a task from PENDING to IN_PROGRESS:

// Worker dequeues a task (system automatically sets IN_PROGRESS)
const task = await worker.dequeueTask('resize-images');
// Task is now IN_PROGRESS

try {
  // Process the task
  await processImage(task.data);

  // User-controlled: Mark as completed
  await worker.updateTaskStatus(task.id, 'COMPLETED');
} catch (error) {
  // User-controlled: Mark as failed
  await worker.updateTaskStatus(task.id, 'FAILED');
}

// ❌ ILLEGAL: Cannot manually set to PENDING or IN_PROGRESS
// These are managed by the dequeue operation

Cascading Effects

State transitions often cascade through the hierarchy:

Task → Stage → Job:

  1. All tasks in stage complete → Stage becomes COMPLETED
  2. All stages in job complete → Job becomes COMPLETED
  3. Any task fails permanently → Stage becomes FAILED → Job becomes FAILED

Job Abortion Cascades Down:

  1. User aborts job → Job becomes ABORTED
  2. All stages cascade to ABORTED
  3. All in-progress tasks should be cancelled (not implemented yet, planned for future)

Stage Ordering:

  1. Stage 1 completes → Stage 2 can transition to PENDING
  2. Stage 2 starts processing → Stage 3 remains in CREATED or WAITING
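
As a mental model, the task → stage cascade can be sketched as a pure function over the stage's task summary. This is illustrative only (the Manager derives stage status server-side); `TaskSummary` here is a trimmed-down subset of the stage's `summary` field described below, and `deriveStageStatus` is our own name:

```typescript
// Illustrative sketch only: the Manager performs this derivation server-side.
// TaskSummary mirrors a subset of the stage's `summary` field.
interface TaskSummary {
  completed: number;
  failed: number;
  total: number;
}

// A permanently failed task fails the stage; all tasks completed completes it;
// otherwise the stage is still in progress.
function deriveStageStatus(s: TaskSummary): 'IN_PROGRESS' | 'COMPLETED' | 'FAILED' {
  if (s.failed > 0) return 'FAILED';
  if (s.total > 0 && s.completed === s.total) return 'COMPLETED';
  return 'IN_PROGRESS';
}
```

The job-level cascade works the same way one level up, aggregating over stages instead of tasks.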

Summary Table

| Entity | User-Controlled Statuses | System-Controlled Statuses | API Endpoint |
| ------ | ------------------------ | -------------------------- | ------------ |
| Job | PENDING, PAUSED, ABORTED | IN_PROGRESS, COMPLETED, FAILED, CREATED | PUT /v1/jobs/{jobId}/status |
| Stage | PENDING (from WAITING only) | IN_PROGRESS, COMPLETED, FAILED, ABORTED, WAITING, CREATED | PUT /v1/stages/{stageId}/status |
| Task | COMPLETED, FAILED | PENDING, IN_PROGRESS, RETRIED, CREATED | PUT /v1/tasks/{taskId}/status |

Best Practices for State Management

  1. Let the system manage most transitions: Trust the automatic state machine - don't try to manually control system-managed states
  2. Use PAUSED for temporary suspension: If you need to temporarily stop work, use job-level PAUSED status
  3. Use WAITING for approval workflows: If a stage needs manual approval, the system can set it to WAITING, then user can resume via PENDING
  4. Handle ABORTED gracefully: Workers should check task/job status and handle abortion scenarios
  5. Monitor for FAILED states: Set up alerting for jobs/stages that enter FAILED state
  6. Respect stage ordering: Don't try to skip or reorder stages - the system enforces sequential execution
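
To apply practice 1 on the client side, it can help to guard user-initiated status updates before they ever reach the API. A minimal sketch (the helper and its names are ours, not part of the SDK; the Manager remains the authority and rejects illegal transitions regardless):

```typescript
// All job statuses in the Jobnik state machine.
type JobStatus =
  | 'CREATED' | 'PENDING' | 'IN_PROGRESS' | 'COMPLETED'
  | 'FAILED' | 'PAUSED' | 'ABORTED';

// Mirrors the summary table: only these job statuses may be set via the API.
const USER_SETTABLE_JOB_STATUSES: ReadonlySet<JobStatus> = new Set<JobStatus>([
  'PENDING', 'PAUSED', 'ABORTED',
]);

// Returns true if a client is allowed to request this status change.
function isUserSettableJobStatus(status: JobStatus): boolean {
  return USER_SETTABLE_JOB_STATUSES.has(status);
}
```

A producer or admin UI can call this before issuing the PUT, giving users an immediate error instead of a rejected API call.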

Data Structure Explained

Understanding the structure of Jobs, Stages, and Tasks is crucial for working with Jobnik. Each entity has specific fields and serves a different purpose in the workflow.

Job Structure

A Job is the root entity that represents an entire workflow. When you create a job, you provide:

const job = await producer.createJob({
  name: 'image-processing', // Job type name
  data: { // Business data for the job
    uploadId: 'upload-123',
    userId: 'user-456',
    totalFiles: 10
  },
  userMetadata: { // Additional metadata
    priority: 'HIGH',
    requestTimestamp: '2026-02-17T10:00:00Z'
  },
  priority: 'HIGH' // Job priority (VERY_HIGH, HIGH, MEDIUM, LOW, VERY_LOW)
});

Job Fields Explained:

  • id (string) - Unique Identifier: Auto-generated UUID that uniquely identifies this job across the entire system. Generated by the Manager upon job creation.
  • name (string) - Job Type: The job type identifier that must match one of your defined job types in your TypeScript definitions. Used to identify what kind of workflow this job represents (e.g., 'image-processing', 'data-ingestion').
  • status (enum) - Lifecycle State: Current state of the job. Possible values:
      ◦ CREATED - Initial state; the system automatically transitions the job to PENDING upon creation
      ◦ PENDING - Job created, waiting for stages/tasks
      ◦ IN_PROGRESS - At least one task is being processed
      ◦ PAUSED - Job processing temporarily suspended by the user
      ◦ COMPLETED - All stages and tasks completed successfully
      ◦ FAILED - One or more tasks/stages failed
      ◦ ABORTED - Job was explicitly cancelled/aborted
  • data (object) - Business Data: Immutable business-specific data structure defined by your job type. Contains essential information needed throughout the job lifecycle. Structure is completely customizable based on your workflow needs.
  • userMetadata (object) - Custom Metadata: Additional metadata you want to store with the job. Unlike data, this can include non-essential information such as audit trails, request context, or debugging information. Structure is fully customizable.
  • priority (enum) - Priority Level: Determines task dequeue order. Possible values: VERY_HIGH, HIGH, MEDIUM, LOW, VERY_LOW. Higher-priority jobs have their tasks dequeued first (VERY_HIGH is highest, VERY_LOW is lowest). This affects the order in which workers receive tasks but does not change the job structure.
  • createdAt (string) - Creation Timestamp: ISO 8601 formatted timestamp indicating when the job was created in the system. Set automatically by the Manager.
  • updatedAt (string) - Last Update Timestamp: ISO 8601 formatted timestamp indicating when the job was last modified. Updated automatically by the Manager on any status change.

Stage Structure

A Stage represents a logical grouping of tasks within a job. Stages contain configuration that applies to all tasks within them.

Stage Ordering

Stages are sequentially ordered within a job. Each stage is automatically assigned an order number (1, 2, 3, etc.) when created. Stages must execute in order - a stage can only transition to PENDING status after the previous stage is COMPLETED. This ensures your workflow executes steps in the correct sequence.

const stage = await producer.createStage(job.id, {
  type: 'resize-images', // Stage type name
  data: { // Configuration for all tasks
    targetWidth: 1920,
    targetHeight: 1080,
    quality: 85
  },
  userMetadata: { // Stage-level metadata
    format: 'jpg',
    processedCount: 0
  }
});

Stage Fields Explained:

  • id (string) - Unique Identifier: Auto-generated UUID that uniquely identifies this stage. Generated by the Manager when the stage is created within a job.
  • jobId (string) - Parent Job Reference: UUID of the parent job this stage belongs to. Establishes the hierarchical relationship in the Job → Stage → Task structure.
  • type (string) - Stage Type: The stage type identifier that must match one of your defined stage types in your TypeScript definitions. Identifies what kind of work this stage performs (e.g., 'resize-images', 'validate-data'). Workers subscribe to specific stage types.
  • order (number) - Execution Sequence: Auto-assigned sequential number (1, 2, 3, etc.) that determines the execution order of stages within a job. The first stage gets order: 1, the second gets order: 2, and so on. Stages must complete in order: a stage cannot transition to PENDING status until the previous stage (order - 1) is COMPLETED. This ensures workflow integrity and proper stage sequencing.
  • status (enum) - Lifecycle State: Current state of the stage. Possible values:
      ◦ CREATED - Stage created; becomes PENDING once the previous stage is COMPLETED (or immediately, if it is the first stage)
      ◦ WAITING - Stage is waiting for an external condition or manual approval
      ◦ PENDING - Stage ready, waiting for tasks to be added or processed
      ◦ IN_PROGRESS - At least one task is being processed
      ◦ COMPLETED - All tasks in this stage completed successfully
      ◦ FAILED - One or more tasks in this stage failed
      ◦ ABORTED - Stage was explicitly cancelled/aborted
  • data (object) - Stage Configuration: Immutable configuration data shared by all tasks in this stage. Contains settings, parameters, or configuration that applies to every task (e.g., image dimensions, quality settings). Workers access this to configure their processing logic.
  • userMetadata (object) - Mutable Metadata: Metadata that can be updated during task processing using context.updateStageUserMetadata(). Commonly used to track progress (e.g., processed count), aggregate results, or store stage-level state. This is the only mutable part of a stage after creation.
  • summary (object) - Task Statistics: Aggregated counts of tasks grouped by status. Contains: pending, inProgress, completed, failed, created, retried, and total. The system automatically updates these counts as tasks change status. Used for monitoring progress and determining when all tasks are complete. Example: { pending: 5, inProgress: 2, completed: 10, failed: 1, created: 0, retried: 0, total: 18 }
  • percentage (number) - Completion Progress: Percentage (0-100) of completed tasks. Calculated as floor((completed / total) * 100). Automatically updated by the system as tasks complete. Used for progress tracking and UI progress bars.
  • createdAt (string) - Creation Timestamp: ISO 8601 formatted timestamp indicating when the stage was created. Set automatically by the Manager.
  • updatedAt (string) - Last Update Timestamp: ISO 8601 formatted timestamp indicating when the stage was last modified (including metadata updates). Updated automatically by the Manager.
  • startTime (string) - Processing Start Time: ISO 8601 formatted timestamp indicating when the first task in this stage started processing (transitioned to IN_PROGRESS). Set automatically by the system. null until the stage begins processing.
  • endTime (string) - Completion Time: ISO 8601 formatted timestamp indicating when the stage reached a final state (COMPLETED, FAILED, or ABORTED). Set automatically by the system. null until the stage finishes.
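
The percentage formula above is simple enough to pin down in a few lines. This is a sketch (the field is computed by the system; `stagePercentage` is our illustrative name, and the empty-stage guard is our own assumption):

```typescript
// Sketch of the documented formula: floor((completed / total) * 100).
// The total === 0 guard for empty stages is our own assumption.
function stagePercentage(completed: number, total: number): number {
  if (total === 0) return 0;
  return Math.floor((completed / total) * 100);
}
```

With the summary example above (completed: 10, total: 18), this yields 55 rather than rounding up to 56, because the progress is floored.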

Task Structure

A Task is an atomic unit of work to be executed by a worker. Each task has its own data and can be processed independently.

const task = await producer.createTask(stage.id, {
  data: { // Task-specific data
    sourceUrl: 'https://example.com/image1.jpg',
    targetPath: '/output/resized/image1.jpg',
    fileName: 'image1.jpg'
  },
  userMetadata: { // Task-level metadata
    retryCount: 0,
    batchId: 'batch-001'
  },
  maxAttempts: 3 // Optional: Maximum retry attempts (default: 3)
});

Task Fields Explained:

  • id (string) - Unique Identifier: Auto-generated UUID that uniquely identifies this task. Generated by the Manager when the task is created within a stage.
  • stageId (string) - Parent Stage Reference: UUID of the parent stage this task belongs to. Used to access stage configuration and establish the Job → Stage → Task hierarchy.
  • status (enum) - Lifecycle State: Current state of the task. Possible values:
      ◦ CREATED - Task registered; automatically transitions to PENDING
      ◦ PENDING - Task created and waiting in the queue
      ◦ IN_PROGRESS - Task has been dequeued and is being processed by a worker
      ◦ COMPLETED - Task processing finished successfully
      ◦ FAILED - Task processing failed (worker threw an error)
      ◦ RETRIED - Task failed but will be retried (attempts < maxAttempts)
      ◦ ABORTED - Task was explicitly cancelled/aborted
  • data (object) - Task-Specific Data: Immutable data specific to this individual task. Contains the exact parameters needed to process this single unit of work (e.g., which file to process, where to save output). This is what makes each task unique within a stage.
  • userMetadata (object) - Task Metadata: Additional metadata for this specific task. Can include tracking information, retry counts, batch identifiers, or any contextual data. Unlike stage metadata, this cannot be updated during processing; it is set at creation time.
  • attempts (number) - Attempt Counter: Tracks how many times this task has been attempted by workers. Incremented automatically each time a worker picks up the task. Useful for implementing retry limits or identifying problematic tasks.
  • maxAttempts (number) - Retry Limit: Maximum number of retry attempts before the task permanently fails. Set at task creation (default: 3). When a task fails and attempts < maxAttempts, it transitions to RETRIED status and is re-queued. When attempts >= maxAttempts, it transitions to FAILED status.
  • createdAt (string) - Creation Timestamp: ISO 8601 formatted timestamp indicating when the task was created. Set automatically by the Manager.
  • updatedAt (string) - Last Update Timestamp: ISO 8601 formatted timestamp indicating when the task status was last changed. Updated automatically by the Manager when workers update task status.
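
The interplay between attempts and maxAttempts reduces to a tiny decision rule. The sketch below is illustrative only; the Manager applies this rule server-side when a worker reports failure, and the function name is ours:

```typescript
// After a worker reports failure: retry while attempts remain,
// otherwise fail the task permanently.
function statusAfterFailure(attempts: number, maxAttempts: number): 'RETRIED' | 'FAILED' {
  return attempts < maxAttempts ? 'RETRIED' : 'FAILED';
}
```

So with the default maxAttempts of 3, a task fails permanently on its third unsuccessful attempt.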

Understanding data vs userMetadata

Key Distinction

data is your business logic payload (immutable) - the essential information needed to execute the work.
userMetadata is internal helper data (mutable for stages) - additional context for tracking, debugging, and monitoring.

Both data and userMetadata are custom objects you define, but they serve different purposes:

| Aspect | data | userMetadata |
| ------ | ---- | ------------ |
| Purpose | Core business logic payload | Internal helper data for tracking/monitoring |
| Mutability | Immutable after creation | Mutable (can be updated for stages via API) |
| Usage | Essential parameters for task execution | Progress tracking, debugging, audit trails |
| Example (Job) | { uploadId, userId, totalFiles } | { priority, requestTimestamp, requestIp } |
| Example (Stage) | { targetWidth, targetHeight, quality } | { processedCount, failedCount, format } |
| Example (Task) | { sourceUrl, targetPath, fileName } | { retryCount, batchId, priority } |

When to use data:

  • Store essential business logic information needed to perform the work
  • Include parameters that define what the task should do
  • Keep configuration that shouldn't change (immutable)
  • Business payload that workers need to execute tasks

When to use userMetadata:

  • Track processing progress or metrics (e.g., items processed)
  • Store audit information (who, when, why, from where)
  • Add debugging or troubleshooting context
  • Include non-essential contextual information
  • Data that may need to be updated during processing (stages only)

Complete Example with All Three Entities

Here's a complete example showing how data flows through the hierarchy:

// 1. Create a Job - represents the entire upload processing workflow
const job = await producer.createJob({
  name: 'user-upload-processing',
  data: {
    uploadId: 'upload-2026-02-17-001',
    userId: 'user-12345',
    uploadBucket: 's3://uploads/raw',
    totalFiles: 3
  },
  userMetadata: {
    userEmail: 'user@example.com',
    uploadedAt: '2026-02-17T10:00:00Z',
    clientIp: '192.168.1.100'
  },
  priority: 'HIGH'
});

// 2. Create a Stage - image validation (will be assigned order: 1)
const validationStage = await producer.createStage(job.id, {
  type: 'validate-images',
  data: {
    allowedFormats: ['jpg', 'png', 'webp'],
    maxFileSizeMB: 50,
    minDimensions: { width: 100, height: 100 }
  },
  userMetadata: {
    validatedCount: 0,
    rejectedCount: 0
  }
});

// validationStage.order === 1
// validationStage.status === 'PENDING' (first stage starts as PENDING)

// 3. Create Tasks - one for each file
const files = [
  { url: 's3://uploads/raw/image1.jpg', size: 2048000 },
  { url: 's3://uploads/raw/image2.png', size: 3145728 },
  { url: 's3://uploads/raw/image3.webp', size: 1572864 }
];

for (const file of files) {
  await producer.createTask(validationStage.id, {
    data: {
      sourceUrl: file.url,
      fileSizeBytes: file.size,
      fileName: file.url.split('/').pop()
    },
    userMetadata: {
      uploadIndex: files.indexOf(file),
      attemptNumber: 1
    }
  });
}

// 4. Create another Stage - image resizing (will be assigned order: 2)
const resizeStage = await producer.createStage(job.id, {
  type: 'resize-images',
  data: {
    targetWidth: 1920,
    targetHeight: 1080,
    quality: 85,
    outputFormat: 'jpg',
    outputBucket: 's3://uploads/processed'
  },
  userMetadata: {
    processedCount: 0
  }
});

// resizeStage.order === 2
// resizeStage.status === 'CREATED' (not first stage, waits for previous to complete)
// This stage will automatically transition to PENDING when validationStage completes

State Transitions

Each entity (Job, Stage, Task) follows a state machine:

State Transition Rules:

  • Jobs transition based on their stages' states (aggregate of all stages)
  • Stages transition based on their tasks' states (aggregate of all tasks)
  • Tasks transition based on worker execution results
  • You cannot add tasks to a COMPLETED, FAILED, or ABORTED stage
  • The Manager enforces these state transitions automatically and atomically
  • State changes are irreversible—once a task is COMPLETED or FAILED, it cannot change state
  • The ABORTED state can only be set explicitly via API, not by worker execution
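
The stage-ordering rule in particular can be mirrored client-side before creating or resuming work. This is a sketch under our own naming; the Manager is the authority and enforces the rule atomically regardless of any client-side check:

```typescript
// A stage may become PENDING only if it is first (order 1) or its
// predecessor (order - 1) is already COMPLETED.
function canStageBecomePending(order: number, previousStageStatus?: string): boolean {
  if (order === 1) return true;
  return previousStageStatus === 'COMPLETED';
}
```

Such a check lets an orchestrator skip useless API calls (e.g., not attempting to resume stage 2 while stage 1 is still IN_PROGRESS).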

Setting up your project environment

Option 1: Start from the Worker Boilerplate

The fastest way to get started is using the Jobnik Worker Boilerplate:

# Clone the boilerplate
git clone git@github.com:MapColonies/jobnik-worker-boilerplate.git my-worker

cd my-worker

# Install dependencies
npm install

# Configure the Jobnik Manager URL
# Edit config/default.json or use environment variables

The boilerplate includes:

  • Pre-configured Jobnik SDK integration
  • Dependency injection setup with tsyringe
  • Observability (logging, metrics, tracing)
  • Docker and Helm deployment assets
  • An example worker implementation

Option 2: Add SDK to existing project

If you have an existing service:

npm install @map-colonies/jobnik-sdk

Then integrate the SDK as shown in the sections below.


Defining your custom types

Type safety is a core feature of the Jobnik SDK. Define your domain-specific types to ensure compile-time validation.

Step 1: Create your types file

import type { IJobnikSDK } from '@map-colonies/jobnik-sdk';

// Define all your job types
export interface MyJobTypes {
  'image-processing': {
    data: {
      uploadId: string;
      userId: string;
      totalFiles: number;
    };
    userMetadata: {
      priority: 'HIGH' | 'MEDIUM' | 'LOW';
      requestTimestamp: string;
    };
  };
  'data-ingestion': {
    data: {
      sourceUrl: string;
      targetBucket: string;
    };
    userMetadata: {
      ingestionType: 'incremental' | 'full';
    };
  };
}

// Define all your stage types
export interface MyStageTypes {
  'resize-images': {
    data: {
      targetWidth: number;
      targetHeight: number;
      quality: number;
    };
    userMetadata: {
      format: 'jpg' | 'png' | 'webp';
    };
    task: {
      data: {
        sourceUrl: string;
        targetPath: string;
        fileName: string;
      };
      userMetadata: {
        retryCount: number;
      };
    };
  };
  'generate-thumbnails': {
    data: {
      thumbnailSize: number;
    };
    userMetadata: Record<string, never>;
    task: {
      data: {
        sourceImagePath: string;
        thumbnailPath: string;
      };
      userMetadata: {
        priority: number;
      };
    };
  };
}

// Export typed SDK
export type MyJobnikSDK = IJobnikSDK<MyJobTypes, MyStageTypes>;

Type Safety Benefits

By defining these types, you get:

  • Autocomplete for job and stage names
  • Compile-time validation of data structures
  • Type-safe task handlers
  • Reduced runtime errors

Implementing a Producer Service

A Producer creates jobs and tasks. This is typically your API server or orchestration service.

Step 1: Initialize the SDK

import { JobnikSDK } from '@map-colonies/jobnik-sdk';
import { Registry } from 'prom-client';
import type { MyJobnikSDK } from '../types/jobnik.types';

export class ImageProcessingProducer {
  private readonly sdk: MyJobnikSDK;

  constructor(
    private readonly jobnikBaseUrl: string,
    private readonly metricsRegistry: Registry
  ) {
    this.sdk = new JobnikSDK({
      baseUrl: jobnikBaseUrl,
      metricsRegistry: metricsRegistry,
    });
  }

  public async createImageProcessingJob(
    uploadId: string,
    userId: string,
    imageUrls: string[]
  ): Promise<string> {
    const producer = this.sdk.getProducer();

    // Create the Job
    const job = await producer.createJob({
      name: 'image-processing',
      data: {
        uploadId,
        userId,
        totalFiles: imageUrls.length,
      },
      userMetadata: {
        priority: 'HIGH',
        requestTimestamp: new Date().toISOString(),
      },
      priority: 'HIGH',
    });

    // Create a Stage for resizing
    const resizeStage = await producer.createStage(job.id, {
      type: 'resize-images',
      data: {
        targetWidth: 1920,
        targetHeight: 1080,
        quality: 85,
      },
      userMetadata: {
        format: 'jpg',
      },
    });

    // Create Tasks for each image
    const taskPromises = imageUrls.map((url, index) =>
      producer.createTask(resizeStage.id, {
        data: {
          sourceUrl: url,
          targetPath: `/processed/${uploadId}/resized/${index}.jpg`,
          fileName: `image_${index}.jpg`,
        },
        userMetadata: {
          retryCount: 0,
        },
      })
    );

    await Promise.all(taskPromises);

    return job.id;
  }
}

Step 2: Use the Producer in your API

import { Request, Response } from 'express';
import { ImageProcessingProducer } from '../producer/producer.service';

export class UploadController {
  constructor(private readonly producer: ImageProcessingProducer) {}

  public async handleUpload(req: Request, res: Response): Promise<void> {
    const { userId, imageUrls } = req.body;
    const uploadId = generateUploadId(); // your own upload-ID helper

    try {
      const jobId = await this.producer.createImageProcessingJob(
        uploadId,
        userId,
        imageUrls
      );

      res.status(202).json({
        message: 'Processing started',
        jobId,
        uploadId,
      });
    } catch (error) {
      res.status(500).json({ error: 'Failed to create job' });
    }
  }
}

Implementing a Worker Service

A Worker consumes tasks and executes your business logic.

Step 1: Create your task handler

import { injectable } from 'tsyringe';
import type { Task, TaskHandlerContext } from '@map-colonies/jobnik-sdk';
import type { MyJobTypes, MyStageTypes } from '../types/jobnik.types';

@injectable()
export class ImageResizeHandler {
  public async handleResizeTask(
    task: Task<MyStageTypes['resize-images']['task']>,
    context: TaskHandlerContext<MyJobTypes, MyStageTypes, 'image-processing', 'resize-images'>
  ): Promise<void> {
    const { sourceUrl, targetPath, fileName } = task.data;

    context.logger.info('Starting image resize', {
      taskId: task.id,
      fileName,
    });

    try {
      // Your actual image processing logic here
      await this.resizeImage(sourceUrl, targetPath, context.stage.data);

      // Check for graceful shutdown signal (worker calls stop())
      // Note: Task abortion is NOT automatically implemented in the Manager.
      // The AbortSignal is provided by the SDK when worker.stop() is called.
      // You MUST implement cancellation checks in your task handler code.
      if (context.signal.aborted) {
        throw new Error('Task cancelled due to shutdown');
      }

      // Update stage metadata if needed
      const currentProcessed = context.stage.userMetadata.processedCount || 0;
      await context.updateStageUserMetadata({
        ...context.stage.userMetadata,
        processedCount: currentProcessed + 1,
      });

      context.logger.info('Image resize completed', {
        taskId: task.id,
        fileName,
      });
    } catch (error) {
      context.logger.error('Image resize failed', {
        taskId: task.id,
        fileName,
        error,
      });
      throw error; // SDK will mark task as FAILED
    }
  }

  private async resizeImage(
    sourceUrl: string,
    targetPath: string,
    config: MyStageTypes['resize-images']['data']
  ): Promise<void> {
    // Implement your actual image resizing logic
    // This is where you'd use libraries like sharp, jimp, etc.
  }
}

Step 2: Configure and start the Worker

import { container } from 'tsyringe';
import { JobnikSDK } from '@map-colonies/jobnik-sdk';
import { Registry } from 'prom-client';
import { ImageResizeHandler } from './image-resize.handler';
import type { MyJobnikSDK } from '../types/jobnik.types';

export async function setupWorker(
  jobnikBaseUrl: string,
  metricsRegistry: Registry
): Promise<void> {
  // Initialize SDK
  const sdk: MyJobnikSDK = new JobnikSDK({
    baseUrl: jobnikBaseUrl,
    metricsRegistry,
  });

  // Register handler
  const handler = container.resolve(ImageResizeHandler);

  // Create worker
  const worker = sdk.createWorker<'image-processing', 'resize-images'>(
    'resize-images',
    handler.handleResizeTask.bind(handler),
    {
      concurrency: 5, // Process 5 tasks in parallel
      backoffOptions: {
        initialBaseRetryDelayMs: 1000,
        maxDelayMs: 60000,
        backoffFactor: 2,
      },
    }
  );

  // Start processing
  await worker.start();
  console.log('Worker started successfully');

  // Graceful shutdown
  const shutdown = async (): Promise<void> => {
    console.log('Shutting down worker...');
    await worker.stop();
    console.log('Worker stopped');
    process.exit(0);
  };

  process.on('SIGTERM', shutdown);
  process.on('SIGINT', shutdown);
}

Configuration

Configure your services using environment variables or config files:

# Jobnik Manager
JOBNIK_BASE_URL=https://jobnik-manager.example.com

# Worker Configuration
WORKER_CONCURRENCY=5
WORKER_RETRY_INITIAL_DELAY_MS=1000
WORKER_RETRY_MAX_DELAY_MS=60000

# Observability
LOG_LEVEL=info
TELEMETRY_TRACING_ENABLED=true
TELEMETRY_TRACING_URL=http://otlp-collector:4318/v1/traces
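
As a sketch of wiring these variables into code, the snippet below reads them directly from the environment. The variable names match the list above; the fallback defaults and plain `process.env` approach are our assumptions (the boilerplate uses a config layer such as node-config instead):

```typescript
// Illustrative only: read the variables listed above with fallback defaults.
const appConfig = {
  jobnikBaseUrl: process.env.JOBNIK_BASE_URL ?? 'http://localhost:8080',
  worker: {
    concurrency: Number(process.env.WORKER_CONCURRENCY ?? '5'),
    initialBaseRetryDelayMs: Number(process.env.WORKER_RETRY_INITIAL_DELAY_MS ?? '1000'),
    maxDelayMs: Number(process.env.WORKER_RETRY_MAX_DELAY_MS ?? '60000'),
  },
  logLevel: process.env.LOG_LEVEL ?? 'info',
};
```

Parsing numbers up front keeps downstream code (e.g., the `createWorker` options shown earlier) free of string-to-number conversions.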

Testing your implementation

Jobnik provides comprehensive testing capabilities at multiple levels. For production-grade E2E test examples, see the Jobnik E2E Test Suite.

Unit Testing Task Handlers

Test your task handler logic in isolation using mocked SDK interfaces:

import { describe, it, expect, vi } from 'vitest';
import { ImageResizeHandler } from '../src/worker/image-resize.handler';
import type { Task, TaskHandlerContext } from '@map-colonies/jobnik-sdk';

describe('ImageResizeHandler', () => {
  it('should successfully process a resize task', async () => {
    const handler = new ImageResizeHandler();

    const mockTask: Task<any> = {
      id: 'task-123',
      stageId: 'stage-123',
      data: {
        sourceUrl: 'https://example.com/image.jpg',
        targetPath: '/output/resized.jpg',
        fileName: 'image.jpg',
      },
      userMetadata: { retryCount: 0 },
      status: 'IN_PROGRESS',
      attempts: 1,
      createdAt: new Date(),
      updatedAt: new Date(),
    };

    const mockContext: TaskHandlerContext<any, any, any, any> = {
      logger: {
        info: vi.fn(),
        error: vi.fn(),
        warn: vi.fn(),
        debug: vi.fn(),
      },
      signal: new AbortController().signal,
      job: {
        id: 'job-123',
        name: 'image-processing',
        data: { uploadId: 'test-123', userId: 'user-1', totalFiles: 1 },
        userMetadata: { priority: 'HIGH' },
        status: 'IN_PROGRESS',
        priority: 'HIGH',
        createdAt: new Date(),
        updatedAt: new Date(),
      },
      stage: {
        id: 'stage-123',
        jobId: 'job-123',
        type: 'resize-images',
        data: { targetWidth: 800, targetHeight: 600, quality: 85 },
        userMetadata: { processedCount: 0 },
        status: 'IN_PROGRESS',
        createdAt: new Date(),
        updatedAt: new Date(),
      },
      updateStageUserMetadata: vi.fn(),
    } as any;

    await expect(
      handler.handleResizeTask(mockTask, mockContext)
    ).resolves.not.toThrow();

    expect(mockContext.logger.info).toHaveBeenCalledWith(
      'Starting image resize',
      expect.any(Object)
    );
  });

  it('should handle errors and log appropriately', async () => {
    const handler = new ImageResizeHandler();

    const mockTask: Task<any> = {
      id: 'task-456',
      stageId: 'stage-123',
      data: {
        sourceUrl: 'https://example.com/invalid.jpg',
        targetPath: '/output/resized.jpg',
        fileName: 'invalid.jpg',
      },
      userMetadata: { retryCount: 1 },
      status: 'IN_PROGRESS',
      attempts: 2,
      createdAt: new Date(),
      updatedAt: new Date(),
    };

    const mockContext: TaskHandlerContext<any, any, any, any> = {
      logger: {
        info: vi.fn(),
        error: vi.fn(),
        warn: vi.fn(),
        debug: vi.fn(),
      },
      signal: new AbortController().signal,
      job: { id: 'job-123', name: 'image-processing', data: {} } as any,
      stage: { id: 'stage-123', type: 'resize-images', data: {} } as any,
      updateStageUserMetadata: vi.fn(),
    } as any;

    // Simulate error in processing
    vi.spyOn(handler as any, 'resizeImage').mockRejectedValue(
      new Error('Failed to download image')
    );

    await expect(
      handler.handleResizeTask(mockTask, mockContext)
    ).rejects.toThrow('Failed to download image');

    expect(mockContext.logger.error).toHaveBeenCalledWith(
      'Image resize failed',
      expect.objectContaining({
        taskId: 'task-456',
        fileName: 'invalid.jpg'
      })
    );
  });
});

Integration Testing with Jobnik Manager

Test the full workflow against a real or test Jobnik Manager instance:

import { describe, it, expect, beforeAll } from 'vitest';
import { JobnikSDK } from '@map-colonies/jobnik-sdk';
import { Registry } from 'prom-client';

describe('Simple Workflow', () => {
  let sdk: JobnikSDK;

  beforeAll(() => {
    sdk = new JobnikSDK({
      baseUrl: process.env.JOBNIK_MANAGER_BASE_URL || 'http://localhost:8080',
      metricsRegistry: new Registry(),
    });
  });

  it('should create job with stage and task', async () => {
    const producer = sdk.getProducer();

    // Create job
    const job = await producer.createJob({
      name: 'image-processing',
      data: { uploadId: 'test-123', userId: 'user-1', totalFiles: 1 },
      userMetadata: { priority: 'HIGH', requestTimestamp: new Date().toISOString() },
      priority: 'HIGH',
    });

    expect(job.id).toBeDefined();
    expect(job.name).toBe('image-processing');
    expect(job.status).toBe('PENDING');

    // Create stage
    const stage = await producer.createStage(job.id, {
      type: 'resize-images',
      data: { targetWidth: 800, targetHeight: 600, quality: 85 },
      userMetadata: { format: 'jpg' },
    });

    expect(stage.id).toBeDefined();
    expect(stage.jobId).toBe(job.id);
    expect(stage.type).toBe('resize-images');

    // Create task
    const task = await producer.createTask(stage.id, {
      data: {
        sourceUrl: 'https://example.com/test.jpg',
        targetPath: '/output/test.jpg',
        fileName: 'test.jpg',
      },
      userMetadata: { retryCount: 0 },
    });

    expect(task.id).toBeDefined();
    expect(task.stageId).toBe(stage.id);
    expect(task.status).toBe('PENDING');
  });
});
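The spec above only asserts that entities were created. When a live Worker is also running against the same Manager, integration tests usually need to wait for asynchronous status transitions before asserting on results. A small generic polling helper keeps such waits readable. This helper is a sketch, not part of the SDK; how you fetch the current job status depends on the query API your SDK version exposes:

```typescript
// Generic polling helper for integration tests: repeatedly evaluates an
// async check until it yields a value or the timeout elapses.
// Sketch only -- not part of the Jobnik SDK.
async function pollUntil<T>(
  check: () => Promise<T | undefined>,
  { timeoutMs = 30_000, intervalMs = 500 }: { timeoutMs?: number; intervalMs?: number } = {}
): Promise<T> {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline) {
    const result = await check();
    if (result !== undefined) {
      return result;
    }
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
  }
  throw new Error(`pollUntil: condition not met within ${timeoutMs}ms`);
}
```

A spec could then wait for completion with something like `await pollUntil(async () => { const j = await fetchJob(job.id); return j.status === 'COMPLETED' ? j : undefined; })`, where `fetchJob` stands in for whatever job-query call your SDK provides.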

End-to-End Testing

For comprehensive E2E testing examples including:

  • Multi-stage workflows
  • Retry mechanisms and error recovery
  • Priority queue management
  • Pause/resume operations
  • Job abortion and cleanup
  • Distributed tracing validation

see the Jobnik E2E Test Suite repository, which can be cloned and run as follows:

# Clone the E2E test suite
git clone https://github.com/MapColonies/jobnik-e2e.git
cd jobnik-e2e

# Install dependencies
npm ci

# Start test environment with Docker Compose
docker compose up -d

# Run all E2E tests
npm test

# Run specific test suite
npm test -- simple.spec.ts

# Run tests matching pattern
npm test -- -t "retry"

Available E2E Test Suites:

  • simple.spec.ts - Basic job execution
  • multipleStagesWorkflow.spec.ts - Multi-stage pipelines
  • retry.spec.ts - Retry mechanisms
  • pause.spec.ts - Pause/resume operations
  • abort.spec.ts - Job abortion
  • priority.spec.ts - Priority queue management
  • deleteJob.spec.ts - Job deletion
  • sharedStageTypes.spec.ts - Shared stage definitions
  • wait.spec.ts - Asynchronous operations

Deployment

Using Docker

FROM node:24-alpine AS builder

WORKDIR /app

COPY package*.json ./
RUN npm ci

COPY . .
RUN npm run build

FROM node:24-alpine

WORKDIR /app

COPY --from=builder /app/dist ./dist
COPY --from=builder /app/node_modules ./node_modules
COPY package*.json ./

ENV NODE_ENV=production

CMD ["node", "dist/index.js"]
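Assuming the Dockerfile above lives at the root of your worker project, building and running the image might look like the following. The image tag and environment variable names are illustrative; use whatever configuration keys your service actually reads:

```shell
# Build the worker image (tag name is illustrative)
docker build -t my-image-worker:latest .

# Run it, pointing the worker at your Jobnik Manager instance
# and exposing the metrics port
docker run --rm \
  -e JOBNIK_MANAGER_BASE_URL=http://jobnik-manager:8080 \
  -p 8080:8080 \
  my-image-worker:latest
```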

Using Kubernetes/Helm

If you used the Worker Boilerplate, Helm charts are included:

# Deploy worker
helm install my-image-worker ./helm \
  --set env.jobnik.baseUrl=https://jobnik-manager.prod.example.com \
  --set env.worker.concurrency=10

Monitoring and Observability

Metrics

The SDK automatically exposes Prometheus metrics:

  • jobnik_worker_tasks_total - Total tasks processed
  • jobnik_worker_task_duration_seconds - Task processing duration
  • jobnik_worker_active_tasks - Currently processing tasks
  • jobnik_producer_jobs_created_total - Jobs created by producer

Scrape the /metrics endpoint (default port 8080).
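If your worker process does not already serve HTTP, a minimal sketch for exposing the registry you passed to the SDK could look like this. The port and route mirror the defaults mentioned above; `MetricsSource` is a local interface modelling prom-client's `Registry.metrics()` method, which resolves to the Prometheus text exposition format:

```typescript
import { createServer } from 'node:http';

// Minimal shape of what we need from a prom-client Registry:
// metrics() resolves to the Prometheus text exposition format.
interface MetricsSource {
  metrics(): Promise<string>;
}

// Serve GET /metrics from the given registry. Sketch only -- pass the same
// Registry instance you handed to the JobnikSDK constructor.
function serveMetrics(source: MetricsSource, port: number) {
  const server = createServer(async (req, res) => {
    if (req.url === '/metrics') {
      const body = await source.metrics();
      res.writeHead(200, { 'Content-Type': 'text/plain; version=0.0.4' });
      res.end(body);
    } else {
      res.writeHead(404);
      res.end();
    }
  });
  server.listen(port);
  return server;
}
```

Passing port `0` is handy in tests, since the OS then assigns an ephemeral port readable from `server.address()`.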

Tracing

Enable distributed tracing in your config:

{
  "telemetry": {
    "tracing": {
      "isEnabled": true,
      "url": "http://otlp-collector:4318/v1/traces"
    }
  }
}

Traces automatically propagate through Jobs → Stages → Tasks.

Logging

Use structured logging provided by the SDK:

context.logger.info('Processing started', {
  taskId: task.id,
  fileName: task.data.fileName,
  jobId: context.job.id,
});
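One detail worth noting when logging failures: raw `Error` objects often serialize to `{}` under `JSON.stringify` because `message` and `stack` are non-enumerable, so copy them into explicit fields. A self-contained sketch follows; the in-memory logger is illustrative only, and in a real handler you would use `context.logger` as shown above:

```typescript
// Illustrative in-memory logger with the same (message, attributes) call
// shape as context.logger; in real handlers, use the SDK-provided logger.
interface LogEntry {
  level: string;
  message: string;
  attributes: Record<string, unknown>;
}

const entries: LogEntry[] = [];
const logger = {
  error(message: string, attributes: Record<string, unknown> = {}): void {
    entries.push({ level: 'error', message, attributes });
  },
};

try {
  throw new Error('Failed to download image');
} catch (err) {
  const e = err instanceof Error ? err : new Error(String(err));
  // Copy message and stack into explicit fields so they survive
  // JSON serialization in downstream log pipelines.
  logger.error('Image resize failed', {
    taskId: 'task-456',
    errorMessage: e.message,
    errorStack: e.stack,
  });
}
```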

Next Steps

Congratulations! You now have a complete Jobnik-based workflow.

What to do next:

  1. Review the Best Practices Guide for optimization tips
  2. Explore the Jobnik SDK API Documentation
  3. Check out the Jobnik Knowledge Base for architecture details
  4. Join the team discussions for questions and support

Common patterns to explore:

  • Multi-stage workflows: Chain multiple stages with dependencies
  • Priority-based processing: Use job priorities for SLA management
  • Dynamic task creation: Add tasks to stages during processing
  • Failure handling: Implement retry strategies and dead-letter queues
  • Monitoring dashboards: Create Grafana dashboards for your metrics

Need Help?

If you encounter issues or have questions: