
From Zero to Hero

Step-by-step guide on how to integrate the MapColonies™ Jobnik Job Management system into your distributed workflows.

Assumptions

This guide assumes that you have:

  • Basic understanding of distributed systems and asynchronous task processing
  • Familiarity with TypeScript and Node.js (>= 24.0.0)
  • Access to a Jobnik Manager instance
  • An understanding of your workflow requirements (jobs, stages, and tasks)

Before continuing, make sure you're familiar with the core concepts: jobs, stages, and tasks.

Steps

  1. Set up your project environment
  2. Define your custom job and stage types
  3. Implement a Producer service (creating work)
  4. Implement a Worker service (processing work)
  5. Set up monitoring and observability

Step 1: Set up your project environment

Option 1: Start from the Jobnik Worker Boilerplate

The fastest way to get started is to clone the Jobnik Worker Boilerplate:

# Clone the boilerplate
git clone git@github.com:MapColonies/jobnik-worker-boilerplate.git my-worker

cd my-worker

# Install dependencies
npm install

# Configure the Jobnik Manager URL
# Edit config/default.json or use environment variables

The boilerplate includes:

  • Pre-configured Jobnik SDK integration
  • Dependency injection setup with tsyringe
  • Observability (logging, metrics, tracing)
  • Docker and Helm deployment assets
  • Example logistics implementation
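
The "Configure the Jobnik Manager URL" step can be made fail-fast at startup. The following is a minimal sketch, not the boilerplate's actual configuration code: the variable name `JOBNIK_BASE_URL` and the helper `resolveJobnikBaseUrl` are hypothetical, and the boilerplate's real config keys may differ.

```typescript
// Hypothetical variable name; check the boilerplate's config for the real key.
const BASE_URL_ENV_VAR = 'JOBNIK_BASE_URL';

type Env = Record<string, string | undefined>;

// Resolve the Manager URL from an environment-like object, with an optional fallback.
function resolveJobnikBaseUrl(env: Env, fallback?: string): string {
  const raw = (env[BASE_URL_ENV_VAR] ?? fallback ?? '').trim();
  if (raw === '') {
    throw new Error(`${BASE_URL_ENV_VAR} is not set and no fallback was provided`);
  }
  if (!/^https?:\/\//i.test(raw)) {
    throw new Error(`${BASE_URL_ENV_VAR} must start with http:// or https://, got "${raw}"`);
  }
  // Strip trailing slashes so later path joins do not produce "//".
  return raw.replace(/\/+$/, '');
}
```

In a service you would typically call `resolveJobnikBaseUrl(process.env)` once during startup and pass the result to the SDK constructor shown later in this guide.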

Option 2: Add SDK to existing project

If you have an existing service:

npm install @map-colonies/jobnik-sdk

Then integrate the SDK as shown in the sections below.


Step 2: Define your custom types

Type safety is a core feature of the Jobnik SDK. Define your domain-specific types to ensure compile-time validation.

Step 1: Create your types file

import type { IJobnikSDK } from '@map-colonies/jobnik-sdk';

// Define all your job types
export interface MyJobTypes {
  'image-processing': {
    data: {
      uploadId: string;
      userId: string;
      totalFiles: number;
    };
    userMetadata: {
      priority: 'HIGH' | 'MEDIUM' | 'LOW';
      requestTimestamp: string;
    };
  };
  'data-ingestion': {
    data: {
      sourceUrl: string;
      targetBucket: string;
    };
    userMetadata: {
      ingestionType: 'incremental' | 'full';
    };
  };
}

// Define all your stage types
export interface MyStageTypes {
  'resize-images': {
    data: {
      targetWidth: number;
      targetHeight: number;
      quality: number;
    };
    userMetadata: {
      format: 'jpg' | 'png' | 'webp';
      // Optional progress counter, updated by the worker in Step 4
      processedCount?: number;
    };
    task: {
      data: {
        sourceUrl: string;
        targetPath: string;
        fileName: string;
      };
      userMetadata: {
        retryCount: number;
      };
    };
  };
  'generate-thumbnails': {
    data: {
      thumbnailSize: number;
    };
    userMetadata: Record<string, never>;
    task: {
      data: {
        sourceImagePath: string;
        thumbnailPath: string;
      };
      userMetadata: {
        priority: number;
      };
    };
  };
}

// Export typed SDK
export type MyJobnikSDK = IJobnikSDK<MyJobTypes, MyStageTypes>;

Type Safety Benefits

By defining these types, you get:

  • Autocomplete for job and stage names
  • Compile-time validation of data structures
  • Type-safe task handlers
  • Reduced runtime errors
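
To make the compile-time validation concrete, here is a small self-contained sketch that does not use the SDK at all. `DemoJobTypes` is a trimmed-down copy of the job map above, and `buildJobRequest` is a hypothetical helper introduced only to show how a job name can select its payload type.

```typescript
// Trimmed-down copy of the job type map from this guide.
interface DemoJobTypes {
  'image-processing': { data: { uploadId: string; totalFiles: number } };
  'data-ingestion': { data: { sourceUrl: string; targetBucket: string } };
}

// Hypothetical helper (not part of the SDK): the payload type is looked up from the job name.
function buildJobRequest<K extends keyof DemoJobTypes>(
  name: K,
  data: DemoJobTypes[K]['data']
): { name: K; data: DemoJobTypes[K]['data'] } {
  return { name, data };
}

// Compiles: the name is a known key and the payload matches its shape.
const validRequest = buildJobRequest('image-processing', { uploadId: 'u-1', totalFiles: 3 });

// Both of these fail to compile if uncommented:
// buildJobRequest('image-procesing', { uploadId: 'u-1', totalFiles: 3 }); // unknown job name
// buildJobRequest('data-ingestion', { uploadId: 'u-1', totalFiles: 3 }); // wrong payload shape
```

The SDK's generics are presumably more elaborate, but the same mechanism (a string-literal key indexing into an interface) is what drives autocomplete and payload checks.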

Step 3: Implement a Producer Service

A Producer creates jobs and tasks. This is typically your API server or orchestration service.

Step 1: Initialize the SDK

import { JobnikSDK } from '@map-colonies/jobnik-sdk';
import { Registry } from 'prom-client';
import type { MyJobnikSDK } from '../types/jobnik.types';

export class ImageProcessingProducer {
  private readonly sdk: MyJobnikSDK;

  constructor(
    private readonly jobnikBaseUrl: string,
    private readonly metricsRegistry: Registry
  ) {
    this.sdk = new JobnikSDK({
      baseUrl: jobnikBaseUrl,
      metricsRegistry: metricsRegistry,
    });
  }

  public async createImageProcessingJob(
    uploadId: string,
    userId: string,
    imageUrls: string[]
  ): Promise<string> {
    const producer = this.sdk.getProducer();

    // Create the Job
    const job = await producer.createJob({
      name: 'image-processing',
      data: {
        uploadId,
        userId,
        totalFiles: imageUrls.length,
      },
      userMetadata: {
        priority: 'HIGH',
        requestTimestamp: new Date().toISOString(),
      },
      priority: 'HIGH',
    });

    // Create a Stage for resizing
    const resizeStage = await producer.createStage(job.id, {
      type: 'resize-images',
      data: {
        targetWidth: 1920,
        targetHeight: 1080,
        quality: 85,
      },
      userMetadata: {
        format: 'jpg',
      },
    });

    // Create all tasks in a single batch call
    await producer.createTasks(
      resizeStage.id,
      resizeStage.type,
      imageUrls.map((url, index) => ({
        data: {
          sourceUrl: url,
          targetPath: `/processed/${uploadId}/resized/${index}.jpg`,
          fileName: `image_${index}.jpg`,
        },
        userMetadata: {
          retryCount: 0,
        },
      }))
    );

    return job.id;
  }
}
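
The producer above sends every task in one batch call. For very large uploads you may prefer to split the payloads into smaller batches. The sketch below is a generic helper, not an SDK feature; `toBatches` is a hypothetical name, and whether the Manager enforces a maximum batch size is not stated in this guide.

```typescript
// Split a list into consecutive batches of at most `size` items.
function toBatches<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError(`Batch size must be a positive integer, got ${size}`);
  }
  const batches: T[][] = [];
  for (let start = 0; start < items.length; start += size) {
    batches.push(items.slice(start, start + size));
  }
  return batches;
}
```

Inside `createImageProcessingJob` this could be used as `for (const batch of toBatches(taskPayloads, 500)) { await producer.createTasks(resizeStage.id, resizeStage.type, batch); }`, where `taskPayloads` is the mapped array and 500 is an arbitrary illustrative size. Note that a failure partway through leaves earlier batches already created.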

Step 2: Use the Producer in your API

import { randomUUID } from 'node:crypto';
import { Request, Response } from 'express';
import { ImageProcessingProducer } from '../producer/producer.service';

export class UploadController {
  constructor(private readonly producer: ImageProcessingProducer) {}

  public async handleUpload(req: Request, res: Response): Promise<void> {
    const { userId, imageUrls } = req.body;
    // Generate a unique ID that ties this upload to its job
    const uploadId = randomUUID();

    try {
      const jobId = await this.producer.createImageProcessingJob(
        uploadId,
        userId,
        imageUrls
      );

      // 202 Accepted: the job is queued, not yet processed
      res.status(202).json({
        message: 'Processing started',
        jobId,
        uploadId,
      });
    } catch (error) {
      // Log the cause so failures are not silently swallowed
      console.error('Failed to create image processing job', error);
      res.status(500).json({ error: 'Failed to create job' });
    }
  }
}
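
The controller reads `userId` and `imageUrls` straight from the request body. A small validation step before creating the job keeps malformed requests out of the queue. This is a minimal sketch with hypothetical names (`UploadRequest`, `parseUploadRequest`); a schema library would work equally well.

```typescript
interface UploadRequest {
  userId: string;
  imageUrls: string[];
}

// Validate an untrusted request body and return a typed value, or throw a TypeError.
function parseUploadRequest(body: unknown): UploadRequest {
  if (typeof body !== 'object' || body === null) {
    throw new TypeError('Request body must be a JSON object');
  }
  const { userId, imageUrls } = body as Record<string, unknown>;
  if (typeof userId !== 'string' || userId.length === 0) {
    throw new TypeError('userId must be a non-empty string');
  }
  if (
    !Array.isArray(imageUrls) ||
    imageUrls.length === 0 ||
    !imageUrls.every((url): url is string => typeof url === 'string' && url.length > 0)
  ) {
    throw new TypeError('imageUrls must be a non-empty array of non-empty strings');
  }
  return { userId, imageUrls };
}
```

In `handleUpload`, you could call this on `req.body` and respond with status 400 when it throws, reserving 500 for failures while creating the job.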

Step 4: Implement a Worker Service

A Worker consumes tasks and executes your business logic.

Step 1: Create your task handler

import { injectable } from 'tsyringe';
import type { Task, TaskHandlerContext } from '@map-colonies/jobnik-sdk';
import type { MyJobTypes, MyStageTypes } from '../types/jobnik.types';

@injectable()
export class ImageResizeHandler {
  public async handleResizeTask(
    task: Task<MyStageTypes['resize-images']['task']>,
    context: TaskHandlerContext<MyJobTypes, MyStageTypes, 'image-processing', 'resize-images'>
  ): Promise<void> {
    const { sourceUrl, targetPath, fileName } = task.data;

    context.logger.info('Starting image resize', {
      taskId: task.id,
      fileName,
    });

    try {
      // Your actual image processing logic here
      await this.resizeImage(sourceUrl, targetPath, context.stage.data);

      // Check for graceful shutdown signal (worker calls stop())
      // Note: Task abortion is NOT automatically implemented in the Manager.
      // The AbortSignal is provided by the SDK when worker.stop() is called.
      // You MUST implement cancellation checks in your task handler code.
      if (context.signal.aborted) {
        throw new Error('Task cancelled due to shutdown');
      }

      // Update stage metadata if needed.
      // This is a read-modify-write: with concurrency > 1, parallel tasks may
      // overwrite each other's update, so treat the count as approximate.
      const currentProcessed = context.stage.userMetadata.processedCount ?? 0;
      await context.updateStageUserMetadata({
        ...context.stage.userMetadata,
        processedCount: currentProcessed + 1,
      });

      context.logger.info('Image resize completed', {
        taskId: task.id,
        fileName,
      });
    } catch (error) {
      context.logger.error('Image resize failed', {
        taskId: task.id,
        fileName,
        error,
      });
      throw error; // SDK will mark task as FAILED
    }
  }

  private async resizeImage(
    sourceUrl: string,
    targetPath: string,
    config: MyStageTypes['resize-images']['data']
  ): Promise<void> {
    // Implement your actual image resizing logic
    // This is where you'd use libraries like sharp, jimp, etc.
  }
}
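
Because cancellation checks are your responsibility, long handlers benefit from checking the signal between steps rather than once at the end. The sketch below shows one way to structure that. `AbortSignalLike`, `throwIfAborted`, and `runSteps` are hypothetical names; the real `context.signal` is a standard `AbortSignal`, which satisfies the minimal interface used here.

```typescript
// Minimal shape needed from an AbortSignal; the real one has more members.
interface AbortSignalLike {
  readonly aborted: boolean;
}

interface Step {
  name: string;
  run: () => Promise<void>;
}

// Throw before starting a step if shutdown has been requested.
function throwIfAborted(signal: AbortSignalLike, step: string): void {
  if (signal.aborted) {
    throw new Error(`Task cancelled before step "${step}"`);
  }
}

// Run steps in order, checking the signal before each one.
// Resolves with the names of the steps that ran; rejects on cancellation or step failure.
async function runSteps(signal: AbortSignalLike, steps: Step[]): Promise<string[]> {
  const completed: string[] = [];
  for (const step of steps) {
    throwIfAborted(signal, step.name);
    await step.run();
    completed.push(step.name);
  }
  return completed;
}
```

In a handler, the steps might be "download", "resize", and "upload", with `context.signal` passed as the first argument. A step that is already running is not interrupted; only the next step is skipped.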

Step 2: Configure and start the Worker

import { container } from 'tsyringe';
import { JobnikSDK } from '@map-colonies/jobnik-sdk';
import { Registry } from 'prom-client';
import { ImageResizeHandler } from './image-resize.handler';
import type { MyJobnikSDK } from '../types/jobnik.types';

export async function setupWorker(
  jobnikBaseUrl: string,
  metricsRegistry: Registry
): Promise<void> {
  // Initialize SDK
  const sdk: MyJobnikSDK = new JobnikSDK({
    baseUrl: jobnikBaseUrl,
    metricsRegistry,
  });

  // Register handler
  const handler = container.resolve(ImageResizeHandler);

  // Create worker
  const worker = sdk.createWorker<'image-processing', 'resize-images'>(
    'resize-images',
    handler.handleResizeTask.bind(handler),
    {
      concurrency: 5, // Process 5 tasks in parallel
      backoffOptions: {
        initialBaseRetryDelayMs: 1000,
        maxDelayMs: 60000,
        backoffFactor: 2,
      },
    }
  );

  // Graceful shutdown
  const shutdown = async (): Promise<void> => {
    console.log('Shutting down worker...');
    try {
      await worker.stop();
      console.log('Worker stopped');
      process.exit(0);
    } catch (error) {
      console.error('Worker failed to stop cleanly', error);
      process.exit(1);
    }
  };

  // Register the handlers before starting, so they are in place
  // regardless of when start() resolves
  process.on('SIGTERM', () => void shutdown());
  process.on('SIGINT', () => void shutdown());

  // Start processing
  await worker.start();
  console.log('Worker started successfully');
}
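
The `backoffOptions` values are easier to reason about as a schedule. The sketch below assumes a plain capped exponential backoff, `min(initial * factor^attempt, max)`; this guide does not specify the SDK's exact formula, what triggers a backoff, or whether jitter is added, so treat `retryDelayMs` as a hypothetical illustration only.

```typescript
interface BackoffOptions {
  initialBaseRetryDelayMs: number;
  maxDelayMs: number;
  backoffFactor: number;
}

// Assumed formula: exponential growth from the initial delay, capped at maxDelayMs.
function retryDelayMs(attempt: number, options: BackoffOptions): number {
  const uncapped = options.initialBaseRetryDelayMs * Math.pow(options.backoffFactor, attempt);
  return Math.min(uncapped, options.maxDelayMs);
}

const backoffExample: BackoffOptions = {
  initialBaseRetryDelayMs: 1000,
  maxDelayMs: 60000,
  backoffFactor: 2,
};

// Delays for attempts 0 through 6:
// [1000, 2000, 4000, 8000, 16000, 32000, 60000]
const backoffSchedule = [0, 1, 2, 3, 4, 5, 6].map((attempt) => retryDelayMs(attempt, backoffExample));
```

Under this assumption, the configuration above reaches its 60-second cap on the seventh attempt.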

Step 5: Monitoring and Observability

Metrics

The SDK automatically exposes Prometheus metrics:

  • jobnik_worker_tasks_total - Total tasks processed
  • jobnik_worker_task_duration_seconds - Task processing duration
  • jobnik_worker_active_tasks - Currently processing tasks
  • jobnik_producer_jobs_created_total - Jobs created by producer

Scrape the /metrics endpoint (default port 8080).
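
If your service is not built on the boilerplate, you may need to serve the registry yourself. The sketch below builds the response for a `/metrics` request from any object shaped like a prom-client `Registry` (a `contentType` property and an async `metrics()` method). `MetricsSource` and `renderMetrics` are hypothetical names, and the HTTP wiring is left to your framework.

```typescript
// Structural subset of a prom-client Registry used by this sketch.
interface MetricsSource {
  readonly contentType: string;
  metrics(): Promise<string>;
}

interface MetricsResponse {
  status: number;
  headers: Record<string, string>;
  body: string;
}

// Collect metrics and describe the HTTP response to send back to the scraper.
async function renderMetrics(source: MetricsSource): Promise<MetricsResponse> {
  try {
    const body = await source.metrics();
    return { status: 200, headers: { 'Content-Type': source.contentType }, body };
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    return {
      status: 500,
      headers: { 'Content-Type': 'text/plain; charset=utf-8' },
      body: `failed to collect metrics: ${message}`,
    };
  }
}
```

With Express, a route handler could await `renderMetrics(metricsRegistry)` and then set the status, headers, and body on the response, using the same registry instance that was passed to the SDK.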

Tracing

Enable distributed tracing in your config:

{
  "telemetry": {
    "tracing": {
      "isEnabled": true,
      "url": "http://otlp-collector:4318/v1/traces"
    }
  }
}

Traces automatically propagate through Jobs → Stages → Tasks.


Next Steps

Congratulations! You now have a complete Jobnik-based workflow.

What to do next:

  1. Review the Best Practices Guide for optimization tips
  2. Explore the Jobnik SDK API Documentation
  3. Check out the Jobnik Knowledge Base for architecture details
  4. Join the team discussions for questions and support

Common patterns to explore:

  • Multi-stage workflows: Chain multiple stages with dependencies
  • Priority-based processing: Use job priorities for SLA management
  • Dynamic task creation: Add tasks to stages during processing
  • Failure handling: Implement retry strategies and dead-letter queues
  • Monitoring dashboards: Create Grafana dashboards for your metrics

Need Help?

If you encounter issues or have questions, reach out through the team discussions.