From Zero to Hero
A step-by-step guide to integrating the MapColonies™ Jobnik Job Management system into your distributed workflows.
Assumptions
This guide assumes that you have:
- Basic understanding of distributed systems and asynchronous task processing
- Familiarity with TypeScript and Node.js (>= 24.0.0)
- Access to a Jobnik Manager instance
- An understanding of your workflow requirements (jobs, stages, and tasks)
Before continuing, make sure you're familiar with the core concepts:
- Concepts & Data Structures — the Job → Stage → Task hierarchy and entity fields
- State Transitions — what you can control vs. what the system manages
- Architecture Overview — the Manager, SDK, and Worker Boilerplate
Steps
- Set up your project environment
- Define your custom job and stage types
- Implement a Producer service (creating work)
- Implement a Worker service (processing work)
- Set up monitoring and observability
Step 1: Set up your project environment
Option 1: Start with the Worker Boilerplate (Recommended)
The fastest way to get started is to use the Jobnik Worker Boilerplate:
# Clone the boilerplate
git clone git@github.com:MapColonies/jobnik-worker-boilerplate.git my-worker
cd my-worker
# Install dependencies
npm install
# Configure the Jobnik Manager URL
# Edit config/default.json or use environment variables
The boilerplate includes:
- Pre-configured Jobnik SDK integration
- Dependency injection setup with tsyringe
- Observability (logging, metrics, tracing)
- Docker and Helm deployment assets
- Example logistics implementation
Option 2: Add SDK to existing project
If you have an existing service:
npm install @map-colonies/jobnik-sdk
Then integrate the SDK as shown in the sections below.
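Whichever option you choose, the service needs the Jobnik Manager URL at startup. The boilerplate's actual configuration keys are not listed in this guide, so the following is only a sketch of resolving the URL from environment-style input with a fallback. The variable name `JOBNIK_BASE_URL` and the helper `resolveJobnikBaseUrl` are hypothetical and used for illustration only.

```typescript
// Hypothetical helper: JOBNIK_BASE_URL is an assumed variable name,
// not one defined by the boilerplate or the SDK.
type EnvLike = Record<string, string | undefined>;

function resolveJobnikBaseUrl(env: EnvLike, fallback?: string): string {
  const raw = (env['JOBNIK_BASE_URL'] ?? fallback ?? '').trim();
  if (raw === '') {
    throw new Error('Jobnik Manager URL is not configured');
  }
  // new URL() throws on malformed input, so a bad value fails at startup
  // instead of on the first request.
  const url = new URL(raw);
  if (url.protocol !== 'http:' && url.protocol !== 'https:') {
    throw new Error(`Unsupported protocol: ${url.protocol}`);
  }
  // Drop trailing slashes so later path joining stays predictable.
  return url.toString().replace(/\/+$/, '');
}
```

In a Node.js service you would typically pass `process.env` as the first argument and hand the result to the SDK's `baseUrl` option.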
Step 2: Define your custom types
Type safety is a core feature of the Jobnik SDK. Define your domain-specific types to ensure compile-time validation.
Step 1: Create your types file
- src/types/jobnik.types.ts
import type { IJobnikSDK } from '@map-colonies/jobnik-sdk';
// Define all your job types
export interface MyJobTypes {
'image-processing': {
data: {
uploadId: string;
userId: string;
totalFiles: number;
};
userMetadata: {
priority: 'HIGH' | 'MEDIUM' | 'LOW';
requestTimestamp: string;
};
};
'data-ingestion': {
data: {
sourceUrl: string;
targetBucket: string;
};
userMetadata: {
ingestionType: 'incremental' | 'full';
};
};
}
// Define all your stage types
export interface MyStageTypes {
'resize-images': {
data: {
targetWidth: number;
targetHeight: number;
quality: number;
};
userMetadata: {
format: 'jpg' | 'png' | 'webp';
// Optional counter; the worker handler in Step 4 reads and updates it
processedCount?: number;
};
task: {
data: {
sourceUrl: string;
targetPath: string;
fileName: string;
};
userMetadata: {
retryCount: number;
};
};
};
'generate-thumbnails': {
data: {
thumbnailSize: number;
};
userMetadata: Record<string, never>;
task: {
data: {
sourceImagePath: string;
thumbnailPath: string;
};
userMetadata: {
priority: number;
};
};
};
}
// Export typed SDK
export type MyJobnikSDK = IJobnikSDK<MyJobTypes, MyStageTypes>;
By defining these types, you get:
- Autocomplete for job and stage names
- Compile-time validation of data structures
- Type-safe task handlers
- Reduced runtime errors
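To see where the compile-time validation comes from, here is a standalone sketch that does not use the SDK at all. It assumes a trimmed type map in the same shape as `MyJobTypes` and a hypothetical helper, `describeJob`, whose payload types are looked up from the job name. The SDK's own generics may be structured differently; this only illustrates the general technique.

```typescript
// Illustrative type map in the same shape as MyJobTypes (trimmed to one entry).
interface DemoJobTypes {
  'data-ingestion': {
    data: { sourceUrl: string; targetBucket: string };
    userMetadata: { ingestionType: 'incremental' | 'full' };
  };
}

// Hypothetical helper (not part of the SDK): the payload types are
// indexed by the job name, so name and payload must agree.
function describeJob<K extends keyof DemoJobTypes>(
  name: K,
  data: DemoJobTypes[K]['data'],
  userMetadata: DemoJobTypes[K]['userMetadata']
): { name: K; data: DemoJobTypes[K]['data']; userMetadata: DemoJobTypes[K]['userMetadata'] } {
  return { name, data, userMetadata };
}

// Accepted: the name is a known key and both payloads match its entry.
const ingestionJob = describeJob(
  'data-ingestion',
  { sourceUrl: 'https://example.com/export.csv', targetBucket: 'raw-data' },
  { ingestionType: 'full' }
);

// Rejected by the compiler: the directive below documents the expected error.
// @ts-expect-error 'data-ingest' is not a key of DemoJobTypes
describeJob('data-ingest', { sourceUrl: 'https://example.com/a.csv', targetBucket: 'raw' }, { ingestionType: 'full' });
```

The same indexed-access pattern is what lets an editor autocomplete job names and flag a misspelled field before the code ever runs.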
Step 3: Implement a Producer Service
A Producer creates jobs, stages, and tasks. This is typically your API server or orchestration service.
Step 1: Initialize the SDK
- src/producer/producer.service.ts
import { JobnikSDK } from '@map-colonies/jobnik-sdk';
import { Registry } from 'prom-client';
import type { MyJobnikSDK } from '../types/jobnik.types';
export class ImageProcessingProducer {
private readonly sdk: MyJobnikSDK;
constructor(
private readonly jobnikBaseUrl: string,
private readonly metricsRegistry: Registry
) {
this.sdk = new JobnikSDK({
baseUrl: jobnikBaseUrl,
metricsRegistry: metricsRegistry,
});
}
public async createImageProcessingJob(
uploadId: string,
userId: string,
imageUrls: string[]
): Promise<string> {
const producer = this.sdk.getProducer();
// Create the Job
const job = await producer.createJob({
name: 'image-processing',
data: {
uploadId,
userId,
totalFiles: imageUrls.length,
},
userMetadata: {
priority: 'HIGH',
requestTimestamp: new Date().toISOString(),
},
priority: 'HIGH',
});
// Create a Stage for resizing
const resizeStage = await producer.createStage(job.id, {
type: 'resize-images',
data: {
targetWidth: 1920,
targetHeight: 1080,
quality: 85,
},
userMetadata: {
format: 'jpg',
},
});
// Create all tasks in a single batch call
await producer.createTasks(
resizeStage.id,
resizeStage.type,
imageUrls.map((url, index) => ({
data: {
sourceUrl: url,
targetPath: `/processed/${uploadId}/resized/${index}.jpg`,
fileName: `image_${index}.jpg`,
},
userMetadata: {
retryCount: 0,
},
}))
);
return job.id;
}
}
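Building the task payloads in a pure function keeps that logic unit-testable without a running Manager. The sketch below mirrors the mapping used in `createImageProcessingJob` and additionally splits the payloads into batches. The helper name `buildResizeTaskBatches` and the `batchSize` parameter are hypothetical; this guide does not state any limit on how many tasks a single `createTasks` call accepts, so treat batching as an optional precaution for very large uploads.

```typescript
// Mirrors MyStageTypes['resize-images']['task'] from the types file.
interface ResizeTaskPayload {
  data: { sourceUrl: string; targetPath: string; fileName: string };
  userMetadata: { retryCount: number };
}

// Hypothetical helper: builds one payload per URL, then groups the
// payloads into arrays of at most batchSize items.
function buildResizeTaskBatches(
  uploadId: string,
  imageUrls: string[],
  batchSize = 100 // assumed default, not a documented Manager limit
): ResizeTaskPayload[][] {
  if (!Number.isInteger(batchSize) || batchSize < 1) {
    throw new RangeError('batchSize must be a positive integer');
  }
  const payloads: ResizeTaskPayload[] = imageUrls.map((url, index) => ({
    data: {
      sourceUrl: url,
      targetPath: `/processed/${uploadId}/resized/${index}.jpg`,
      fileName: `image_${index}.jpg`,
    },
    userMetadata: { retryCount: 0 },
  }));
  const batches: ResizeTaskPayload[][] = [];
  for (let start = 0; start < payloads.length; start += batchSize) {
    batches.push(payloads.slice(start, start + batchSize));
  }
  return batches;
}
```

Each inner array could then be passed to its own `producer.createTasks(resizeStage.id, resizeStage.type, batch)` call.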
Step 2: Use the Producer in your API
- src/api/upload.controller.ts
import { randomUUID } from 'node:crypto';
import { Request, Response } from 'express';
import { ImageProcessingProducer } from '../producer/producer.service';
export class UploadController {
constructor(private readonly producer: ImageProcessingProducer) {}
public async handleUpload(req: Request, res: Response): Promise<void> {
const { userId, imageUrls } = req.body;
// randomUUID() stands in for an application-specific ID generator; any unique ID scheme works
const uploadId = randomUUID();
try {
const jobId = await this.producer.createImageProcessingJob(
uploadId,
userId,
imageUrls
);
res.status(202).json({
message: 'Processing started',
jobId,
uploadId,
});
} catch (error) {
res.status(500).json({ error: 'Failed to create job' });
}
}
}
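The controller above passes `req.body` to the producer as-is, so a malformed request would only surface as a failed job creation. A minimal sketch of validating the body first, assuming a hand-written check (a schema library would work equally well). The names `UploadRequestBody`, `ParseResult`, `isHttpUrl`, and `parseUploadBody` are hypothetical.

```typescript
interface UploadRequestBody {
  userId: string;
  imageUrls: string[];
}

type ParseResult =
  | { ok: true; value: UploadRequestBody }
  | { ok: false; error: string };

// True only for strings that parse as absolute http(s) URLs.
function isHttpUrl(candidate: unknown): candidate is string {
  if (typeof candidate !== 'string') return false;
  try {
    const { protocol } = new URL(candidate);
    return protocol === 'http:' || protocol === 'https:';
  } catch {
    return false;
  }
}

// Hypothetical validator: returns a result object rather than throwing,
// so the controller can answer 400 without an extra try/catch.
function parseUploadBody(body: unknown): ParseResult {
  if (typeof body !== 'object' || body === null) {
    return { ok: false, error: 'body must be a JSON object' };
  }
  const { userId, imageUrls } = body as Record<string, unknown>;
  if (typeof userId !== 'string' || userId.length === 0) {
    return { ok: false, error: 'userId must be a non-empty string' };
  }
  if (!Array.isArray(imageUrls) || imageUrls.length === 0) {
    return { ok: false, error: 'imageUrls must be a non-empty array' };
  }
  if (!imageUrls.every(isHttpUrl)) {
    return { ok: false, error: 'imageUrls must contain only http(s) URLs' };
  }
  return { ok: true, value: { userId, imageUrls } };
}
```

In `handleUpload`, a failed result would map to `res.status(400).json({ error: result.error })` before any job is created.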
Step 4: Implement a Worker Service
A Worker consumes tasks and executes your business logic.
Step 1: Create your task handler
- src/worker/image-resize.handler.ts
import { injectable } from 'tsyringe';
import type { Task, TaskHandlerContext } from '@map-colonies/jobnik-sdk';
import type { MyJobTypes, MyStageTypes } from '../types/jobnik.types';
@injectable()
export class ImageResizeHandler {
public async handleResizeTask(
task: Task<MyStageTypes['resize-images']['task']>,
context: TaskHandlerContext<MyJobTypes, MyStageTypes, 'image-processing', 'resize-images'>
): Promise<void> {
const { sourceUrl, targetPath, fileName } = task.data;
context.logger.info('Starting image resize', {
taskId: task.id,
fileName,
});
try {
// Your actual image processing logic here
await this.resizeImage(sourceUrl, targetPath, context.stage.data);
// Check for graceful shutdown signal (worker calls stop())
// Note: Task abortion is NOT automatically implemented in the Manager.
// The AbortSignal is provided by the SDK when worker.stop() is called.
// You MUST implement cancellation checks in your task handler code.
if (context.signal.aborted) {
throw new Error('Task cancelled due to shutdown');
}
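// Update stage metadata if needed.
// Caution: this is a read-modify-write on a possibly stale snapshot, so with
// concurrency > 1 parallel tasks may overwrite each other's increments.
const currentProcessed = context.stage.userMetadata.processedCount ?? 0;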
await context.updateStageUserMetadata({
...context.stage.userMetadata,
processedCount: currentProcessed + 1,
});
context.logger.info('Image resize completed', {
taskId: task.id,
fileName,
});
} catch (error) {
context.logger.error('Image resize failed', {
taskId: task.id,
fileName,
error,
});
throw error; // SDK will mark task as FAILED
}
}
private async resizeImage(
sourceUrl: string,
targetPath: string,
config: MyStageTypes['resize-images']['data']
): Promise<void> {
// Implement your actual image resizing logic
// This is where you'd use libraries like sharp, jimp, etc.
}
}
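Because cancellation checks are your responsibility, a long-running handler benefits from checking the signal between units of work rather than once at the end. The sketch below uses only the standard `AbortController` and `AbortSignal`; the helpers `throwIfAborted` and `runSteps` are hypothetical and not part of the SDK.

```typescript
// Throws when shutdown has been requested; call it between units of work.
function throwIfAborted(signal: AbortSignal): void {
  if (signal.aborted) {
    throw new Error('Task cancelled due to shutdown');
  }
}

// Hypothetical runner: executes async steps in order and checks the signal
// before starting each one, so no new step begins after an abort.
async function runSteps(
  steps: Array<() => Promise<void>>,
  signal: AbortSignal
): Promise<number> {
  let completed = 0;
  for (const step of steps) {
    throwIfAborted(signal);
    await step();
    completed += 1;
  }
  return completed;
}
```

Inside a task handler you would pass `context.signal` as the second argument. A step that is already running is not interrupted; only the next step is skipped, so keep individual steps short if fast shutdown matters.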
Step 2: Configure and start the Worker
- src/worker/worker.setup.ts
import { container } from 'tsyringe';
import { JobnikSDK } from '@map-colonies/jobnik-sdk';
import { Registry } from 'prom-client';
import { ImageResizeHandler } from './image-resize.handler';
import type { MyJobnikSDK } from '../types/jobnik.types';
export async function setupWorker(
jobnikBaseUrl: string,
metricsRegistry: Registry
): Promise<void> {
// Initialize SDK
const sdk: MyJobnikSDK = new JobnikSDK({
baseUrl: jobnikBaseUrl,
metricsRegistry,
});
// Register handler
const handler = container.resolve(ImageResizeHandler);
// Create worker
const worker = sdk.createWorker<'image-processing', 'resize-images'>(
'resize-images',
handler.handleResizeTask.bind(handler),
{
concurrency: 5, // Process 5 tasks in parallel
backoffOptions: {
initialBaseRetryDelayMs: 1000,
maxDelayMs: 60000,
backoffFactor: 2,
},
}
);
// Start processing
await worker.start();
console.log('Worker started successfully');
// Graceful shutdown
const shutdown = async (): Promise<void> => {
console.log('Shutting down worker...');
await worker.stop();
console.log('Worker stopped');
process.exit(0);
};
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
}
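To get a feel for the three `backoffOptions` values, the sketch below computes a plain capped exponential schedule from them. The formula (`initial * factor^attempt`, capped at the maximum) is an assumption for illustration. This guide does not specify the SDK's exact calculation, which may add jitter or count attempts differently. `backoffDelayMs` is a hypothetical helper.

```typescript
interface BackoffOptions {
  initialBaseRetryDelayMs: number;
  maxDelayMs: number;
  backoffFactor: number;
}

// Assumed formula: min(initial * factor^attempt, max), attempt starting at 0.
function backoffDelayMs(attempt: number, options: BackoffOptions): number {
  const uncapped =
    options.initialBaseRetryDelayMs * Math.pow(options.backoffFactor, attempt);
  return Math.min(uncapped, options.maxDelayMs);
}

// Same values as in the worker setup.
const demoBackoff: BackoffOptions = {
  initialBaseRetryDelayMs: 1000,
  maxDelayMs: 60000,
  backoffFactor: 2,
};

const demoSchedule = Array.from({ length: 8 }, (_, attempt) =>
  backoffDelayMs(attempt, demoBackoff)
);
// demoSchedule: [1000, 2000, 4000, 8000, 16000, 32000, 60000, 60000]
```

Under this assumption the delay doubles until the seventh attempt, where the 60-second cap takes over.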
Step 5: Monitoring and Observability
Metrics
The SDK automatically exposes Prometheus metrics:
- jobnik_worker_tasks_total - Total tasks processed
- jobnik_worker_task_duration_seconds - Task processing duration
- jobnik_worker_active_tasks - Currently processing tasks
- jobnik_producer_jobs_created_total - Jobs created by producer
Scrape the /metrics endpoint (default port 8080).
Tracing
Enable distributed tracing in your config:
{
"telemetry": {
"tracing": {
"isEnabled": true,
"url": "http://otlp-collector:4318/v1/traces"
}
}
}
Traces automatically propagate through Jobs → Stages → Tasks.
Next Steps
Congratulations! You now have a complete Jobnik-based workflow.
What to do next:
- Review the Best Practices Guide for optimization tips
- Explore the Jobnik SDK API Documentation
- Check out the Jobnik Knowledge Base for architecture details
- Join the team discussions for questions and support
Common patterns to explore:
- Multi-stage workflows: Chain multiple stages with dependencies
- Priority-based processing: Use job priorities for SLA management
- Dynamic task creation: Add tasks to stages during processing
- Failure handling: Implement retry strategies and dead-letter queues
- Monitoring dashboards: Create Grafana dashboards for your metrics
If you encounter issues or have questions:
- Check the Jobnik Manager Repository
- Review the Jobnik SDK Repository
- Consult the team on Slack or your communication platform