Files
Alex Holovach 4f731b4949 update README
2025-10-07 10:26:02 -05:00

296 lines
10 KiB
Markdown

# @kubiks/otel-upstash-queues
OpenTelemetry instrumentation for the [Upstash QStash](https://upstash.com/docs/qstash) Node.js SDK.
Capture spans for every QStash API call, enrich them with operation metadata,
and keep an eye on message queue operations from your traces.
![Upstash QStash Trace Visualization](https://github.com/kubiks-inc/otel/blob/main/images/otel-upstash-queue-trace.png)
_Visualize your message queue operations with detailed span information including message publishing, callbacks, and delivery tracking._
## Installation
```bash
npm install @kubiks/otel-upstash-queues
# or
pnpm add @kubiks/otel-upstash-queues
```
**Peer Dependencies:** `@opentelemetry/api` >= 1.9.0, `@upstash/qstash` >= 2.0.0
## Quick Start
### Publishing Messages
```ts
import { Client } from "@upstash/qstash";
import { instrumentUpstash } from "@kubiks/otel-upstash-queues";
const client = instrumentUpstash(
new Client({ token: process.env.QSTASH_TOKEN! })
);
await client.publishJSON({
url: "https://your-api-endpoint.com/process-image",
body: { imageId: "123" },
});
```
`instrumentUpstash` wraps the QStash client instance you already use — no configuration changes
needed. Every SDK call creates a client span with useful attributes.
### Consuming Messages
```ts
// app/api/process/route.ts
import { verifySignatureAppRouter } from "@upstash/qstash/nextjs";
import { instrumentConsumer } from "@kubiks/otel-upstash-queues";
async function handler(request: Request) {
const data = await request.json();
// Process your message
await processImage(data.imageId);
return Response.json({ success: true });
}
// Instrument first, then verify signature
export const POST = verifySignatureAppRouter(instrumentConsumer(handler));
```
`instrumentConsumer` wraps your message handler to trace message consumption, creating a SERVER span for each message received and processed.
### With Body Capture
Optionally capture request/response bodies for debugging:
```ts
const client = instrumentUpstash(
new Client({ token: process.env.QSTASH_TOKEN! }),
{
captureBody: true, // Enable body capture (default: false)
maxBodyLength: 2048, // Max characters to capture (default: 1024)
}
);
```
## What Gets Traced
This instrumentation provides two main functions:
1. **`instrumentUpstash`** - Wraps the QStash client to trace message publishing
2. **`instrumentConsumer`** - Wraps your message handler to trace message consumption
### Publisher Instrumentation
The `instrumentUpstash` function wraps the `client.publishJSON` method, creating a span with `SpanKind.CLIENT` for each message publish operation.
### Consumer Instrumentation
The `instrumentConsumer` function wraps your message handler, creating a span with `SpanKind.SERVER` for each message received and processed.
## Span Attributes
### Publisher Spans (`instrumentUpstash`)
| Attribute | Description | Example |
| ------------------------------ | ------------------------------------------- | -------------------------------------------- |
| `messaging.system` | Constant value `qstash` | `qstash` |
| `messaging.operation` | Operation type | `publish` |
| `qstash.resource` | Resource name | `messages` |
| `qstash.target` | Full operation target | `messages.publish` |
| `qstash.url` | Target URL for the message | `https://example.com/api/process` |
| `qstash.method` | HTTP method (default: POST) | `POST`, `PUT`, `GET` |
| `qstash.message_id` | Message ID returned by QStash | `msg_123` |
| `qstash.delay` | Delay before processing (seconds or string) | `60` or `"1h"` |
| `qstash.not_before` | Unix timestamp for earliest processing | `1672531200` |
| `qstash.deduplication_id` | Deduplication ID for idempotent operations | `unique-id-123` |
| `qstash.retries` | Number of retry attempts (max) | `3` |
| `qstash.callback_url` | Success callback URL | `https://example.com/callback` |
| `qstash.failure_callback_url` | Failure callback URL | `https://example.com/failure` |
### Consumer Spans (`instrumentConsumer`)
| Attribute | Description | Example |
| ------------------------------ | ------------------------------------------- | -------------------------------------------- |
| `messaging.system` | Constant value `qstash` | `qstash` |
| `messaging.operation` | Operation type | `receive` |
| `qstash.resource` | Resource name | `messages` |
| `qstash.target` | Full operation target | `messages.receive` |
| `qstash.message_id` | Message ID from QStash | `msg_456` |
| `qstash.retried` | Number of times retried (actual count) | `2` |
| `qstash.schedule_id` | Schedule ID (if from scheduled message) | `schedule_123` |
| `qstash.caller_ip` | IP address of the caller | `192.168.1.1` |
| `http.status_code` | HTTP response status code | `200` |
### Body/Payload Attributes (Optional)
When `captureBody` is enabled in configuration:
| Attribute | Description | Captured By |
| ------------------------------ | ------------------------------------------- | -------------------------------------------- |
| `qstash.request.body` | Request/message body content | Both publisher and consumer |
| `qstash.response.body` | Response body content | Consumer only |
The instrumentation captures message metadata and configuration to help with debugging and monitoring. Body capture is **disabled by default** to protect sensitive data.
## Usage Examples
### Basic Message Publishing
```ts
import { Client } from "@upstash/qstash";
import { instrumentUpstash } from "@kubiks/otel-upstash-queues";
const client = instrumentUpstash(
new Client({ token: process.env.QSTASH_TOKEN! })
);
// Publish a message
await client.publishJSON({
url: "https://your-api.com/webhook",
body: {
userId: "user_123",
action: "process_data",
},
});
```
### Delayed Message Publishing
```ts
// Delay message processing by 60 seconds
await client.publishJSON({
url: "https://your-api.com/delayed-task",
body: { taskId: "task_456" },
delay: 60,
});
// Or use a human-readable delay
await client.publishJSON({
url: "https://your-api.com/delayed-task",
body: { taskId: "task_789" },
delay: "1h", // 1 hour
});
```
### Message with Callbacks
```ts
await client.publishJSON({
url: "https://your-api.com/process",
body: { orderId: "order_123" },
callback: "https://your-api.com/success",
failureCallback: "https://your-api.com/failure",
});
```
### Message with Retries and Deduplication
```ts
await client.publishJSON({
url: "https://your-api.com/critical-task",
body: { taskId: "critical_123" },
retries: 5,
deduplicationId: "task-critical-123",
});
```
### Scheduled Message
```ts
// Schedule for a specific time
const scheduledTime = Math.floor(Date.now() / 1000) + 3600; // 1 hour from now
await client.publishJSON({
url: "https://your-api.com/scheduled-task",
body: { reportId: "report_456" },
notBefore: scheduledTime,
});
```
### Message Consumer Instrumentation
Use `instrumentConsumer` to trace your message handler that receives QStash messages:
```ts
// app/api/process/route.ts
import { verifySignatureAppRouter } from "@upstash/qstash/nextjs";
import { instrumentConsumer } from "@kubiks/otel-upstash-queues";
async function handler(request: Request) {
const data = await request.json();
// Process your message
await processImage(data.imageId);
return Response.json({ success: true });
}
// Instrument first, then verify signature
export const POST = verifySignatureAppRouter(instrumentConsumer(handler));
```
**With body capture:**
```ts
export const POST = verifySignatureAppRouter(
instrumentConsumer(handler, {
captureBody: true,
maxBodyLength: 2048,
})
);
```
The `instrumentConsumer` function:
- Extracts QStash headers (message ID, retry count, schedule ID, caller IP)
- Creates a SERVER span for the message processing
- Tracks response status codes
- Captures errors during processing
- Optionally captures request and response bodies (when configured)
### Complete Next.js Integration Example
**Publishing messages:**
```ts
// app/actions.ts
"use server";
import { Client } from "@upstash/qstash";
import { instrumentUpstash } from "@kubiks/otel-upstash-queues";
const qstashClient = instrumentUpstash(
new Client({
token: process.env.QSTASH_TOKEN!,
})
);
export async function startBackgroundJob(imageId: string) {
await qstashClient.publishJSON({
url: "https://your-app.vercel.app/api/process",
body: { imageId },
});
}
```
**Receiving messages:**
```ts
// app/api/process/route.ts
import { verifySignatureAppRouter } from "@upstash/qstash/nextjs";
import { instrumentConsumer } from "@kubiks/otel-upstash-queues";
async function handler(request: Request) {
const { imageId } = await request.json();
// Your processing logic
await processImage(imageId);
return Response.json({ success: true });
}
export const POST = verifySignatureAppRouter(instrumentConsumer(handler));
```
## License
MIT