mirror of
https://github.com/zoriya/drizzle-otel.git
synced 2025-12-06 00:46:09 +00:00
296 lines
10 KiB
Markdown
296 lines
10 KiB
Markdown
# @kubiks/otel-upstash-queues
|
|
|
|
OpenTelemetry instrumentation for the [Upstash QStash](https://upstash.com/docs/qstash) Node.js SDK.
|
|
Capture spans for every QStash API call, enrich them with operation metadata,
|
|
and keep an eye on message queue operations from your traces.
|
|
|
|

|
|
|
|
_Visualize your message queue operations with detailed span information including message publishing, callbacks, and delivery tracking._
|
|
|
|
## Installation
|
|
|
|
```bash
|
|
npm install @kubiks/otel-upstash-queues
|
|
# or
|
|
pnpm add @kubiks/otel-upstash-queues
|
|
```
|
|
|
|
**Peer Dependencies:** `@opentelemetry/api` >= 1.9.0, `@upstash/qstash` >= 2.0.0
|
|
|
|
## Quick Start
|
|
|
|
### Publishing Messages
|
|
|
|
```ts
|
|
import { Client } from "@upstash/qstash";
|
|
import { instrumentUpstash } from "@kubiks/otel-upstash-queues";
|
|
|
|
const client = instrumentUpstash(
|
|
new Client({ token: process.env.QSTASH_TOKEN! })
|
|
);
|
|
|
|
await client.publishJSON({
|
|
url: "https://your-api-endpoint.com/process-image",
|
|
body: { imageId: "123" },
|
|
});
|
|
```
|
|
|
|
`instrumentUpstash` wraps the QStash client instance you already use — no configuration changes
|
|
needed. Every SDK call creates a client span with useful attributes.
|
|
|
|
### Consuming Messages
|
|
|
|
```ts
|
|
// app/api/process/route.ts
|
|
import { verifySignatureAppRouter } from "@upstash/qstash/nextjs";
|
|
import { instrumentConsumer } from "@kubiks/otel-upstash-queues";
|
|
|
|
async function handler(request: Request) {
|
|
const data = await request.json();
|
|
|
|
// Process your message
|
|
await processImage(data.imageId);
|
|
|
|
return Response.json({ success: true });
|
|
}
|
|
|
|
// Instrument first, then verify signature
|
|
export const POST = verifySignatureAppRouter(instrumentConsumer(handler));
|
|
```
|
|
|
|
`instrumentConsumer` wraps your message handler to trace message consumption, creating a SERVER span for each message received and processed.
|
|
|
|
### With Body Capture
|
|
|
|
Optionally capture request/response bodies for debugging:
|
|
|
|
```ts
|
|
const client = instrumentUpstash(
|
|
new Client({ token: process.env.QSTASH_TOKEN! }),
|
|
{
|
|
captureBody: true, // Enable body capture (default: false)
|
|
maxBodyLength: 2048, // Max characters to capture (default: 1024)
|
|
}
|
|
);
|
|
```
|
|
|
|
## What Gets Traced
|
|
|
|
This instrumentation provides two main functions:
|
|
|
|
1. **`instrumentUpstash`** - Wraps the QStash client to trace message publishing
|
|
2. **`instrumentConsumer`** - Wraps your message handler to trace message consumption
|
|
|
|
### Publisher Instrumentation
|
|
|
|
The `instrumentUpstash` function wraps the `client.publishJSON` method, creating a span with `SpanKind.CLIENT` for each message publish operation.
|
|
|
|
### Consumer Instrumentation
|
|
|
|
The `instrumentConsumer` function wraps your message handler, creating a span with `SpanKind.SERVER` for each message received and processed.
|
|
|
|
## Span Attributes
|
|
|
|
### Publisher Spans (`instrumentUpstash`)
|
|
|
|
| Attribute | Description | Example |
|
|
| ------------------------------ | ------------------------------------------- | -------------------------------------------- |
|
|
| `messaging.system` | Constant value `qstash` | `qstash` |
|
|
| `messaging.operation` | Operation type | `publish` |
|
|
| `qstash.resource` | Resource name | `messages` |
|
|
| `qstash.target` | Full operation target | `messages.publish` |
|
|
| `qstash.url` | Target URL for the message | `https://example.com/api/process` |
|
|
| `qstash.method` | HTTP method (default: POST) | `POST`, `PUT`, `GET` |
|
|
| `qstash.message_id` | Message ID returned by QStash | `msg_123` |
|
|
| `qstash.delay` | Delay before processing (seconds or string) | `60` or `"1h"` |
|
|
| `qstash.not_before` | Unix timestamp for earliest processing | `1672531200` |
|
|
| `qstash.deduplication_id` | Deduplication ID for idempotent operations | `unique-id-123` |
|
|
| `qstash.retries` | Number of retry attempts (max) | `3` |
|
|
| `qstash.callback_url` | Success callback URL | `https://example.com/callback` |
|
|
| `qstash.failure_callback_url` | Failure callback URL | `https://example.com/failure` |
|
|
|
|
### Consumer Spans (`instrumentConsumer`)
|
|
|
|
| Attribute | Description | Example |
|
|
| ------------------------------ | ------------------------------------------- | -------------------------------------------- |
|
|
| `messaging.system` | Constant value `qstash` | `qstash` |
|
|
| `messaging.operation` | Operation type | `receive` |
|
|
| `qstash.resource` | Resource name | `messages` |
|
|
| `qstash.target` | Full operation target | `messages.receive` |
|
|
| `qstash.message_id` | Message ID from QStash | `msg_456` |
|
|
| `qstash.retried` | Number of times retried (actual count) | `2` |
|
|
| `qstash.schedule_id` | Schedule ID (if from scheduled message) | `schedule_123` |
|
|
| `qstash.caller_ip` | IP address of the caller | `192.168.1.1` |
|
|
| `http.status_code` | HTTP response status code | `200` |
|
|
|
|
### Body/Payload Attributes (Optional)
|
|
|
|
When `captureBody` is enabled in configuration:
|
|
|
|
| Attribute | Description | Captured By |
|
|
| ------------------------------ | ------------------------------------------- | -------------------------------------------- |
|
|
| `qstash.request.body` | Request/message body content | Both publisher and consumer |
|
|
| `qstash.response.body` | Response body content | Consumer only |
|
|
|
|
The instrumentation captures message metadata and configuration to help with debugging and monitoring. Body capture is **disabled by default** to protect sensitive data.
|
|
|
|
## Usage Examples
|
|
|
|
### Basic Message Publishing
|
|
|
|
```ts
|
|
import { Client } from "@upstash/qstash";
|
|
import { instrumentUpstash } from "@kubiks/otel-upstash-queues";
|
|
|
|
const client = instrumentUpstash(
|
|
new Client({ token: process.env.QSTASH_TOKEN! })
|
|
);
|
|
|
|
// Publish a message
|
|
await client.publishJSON({
|
|
url: "https://your-api.com/webhook",
|
|
body: {
|
|
userId: "user_123",
|
|
action: "process_data",
|
|
},
|
|
});
|
|
```
|
|
|
|
### Delayed Message Publishing
|
|
|
|
```ts
|
|
// Delay message processing by 60 seconds
|
|
await client.publishJSON({
|
|
url: "https://your-api.com/delayed-task",
|
|
body: { taskId: "task_456" },
|
|
delay: 60,
|
|
});
|
|
|
|
// Or use a human-readable delay
|
|
await client.publishJSON({
|
|
url: "https://your-api.com/delayed-task",
|
|
body: { taskId: "task_789" },
|
|
delay: "1h", // 1 hour
|
|
});
|
|
```
|
|
|
|
### Message with Callbacks
|
|
|
|
```ts
|
|
await client.publishJSON({
|
|
url: "https://your-api.com/process",
|
|
body: { orderId: "order_123" },
|
|
callback: "https://your-api.com/success",
|
|
failureCallback: "https://your-api.com/failure",
|
|
});
|
|
```
|
|
|
|
### Message with Retries and Deduplication
|
|
|
|
```ts
|
|
await client.publishJSON({
|
|
url: "https://your-api.com/critical-task",
|
|
body: { taskId: "critical_123" },
|
|
retries: 5,
|
|
deduplicationId: "task-critical-123",
|
|
});
|
|
```
|
|
|
|
### Scheduled Message
|
|
|
|
```ts
|
|
// Schedule for a specific time
|
|
const scheduledTime = Math.floor(Date.now() / 1000) + 3600; // 1 hour from now
|
|
|
|
await client.publishJSON({
|
|
url: "https://your-api.com/scheduled-task",
|
|
body: { reportId: "report_456" },
|
|
notBefore: scheduledTime,
|
|
});
|
|
```
|
|
|
|
### Message Consumer Instrumentation
|
|
|
|
Use `instrumentConsumer` to trace your message handler that receives QStash messages:
|
|
|
|
```ts
|
|
// app/api/process/route.ts
|
|
import { verifySignatureAppRouter } from "@upstash/qstash/nextjs";
|
|
import { instrumentConsumer } from "@kubiks/otel-upstash-queues";
|
|
|
|
async function handler(request: Request) {
|
|
const data = await request.json();
|
|
|
|
// Process your message
|
|
await processImage(data.imageId);
|
|
|
|
return Response.json({ success: true });
|
|
}
|
|
|
|
// Instrument first, then verify signature
|
|
export const POST = verifySignatureAppRouter(instrumentConsumer(handler));
|
|
```
|
|
|
|
**With body capture:**
|
|
|
|
```ts
|
|
export const POST = verifySignatureAppRouter(
|
|
instrumentConsumer(handler, {
|
|
captureBody: true,
|
|
maxBodyLength: 2048,
|
|
})
|
|
);
|
|
```
|
|
|
|
The `instrumentConsumer` function:
|
|
- Extracts QStash headers (message ID, retry count, schedule ID, caller IP)
|
|
- Creates a SERVER span for the message processing
|
|
- Tracks response status codes
|
|
- Captures errors during processing
|
|
- Optionally captures request and response bodies (when configured)
|
|
|
|
### Complete Next.js Integration Example
|
|
|
|
**Publishing messages:**
|
|
```ts
|
|
// app/actions.ts
|
|
"use server";
|
|
import { Client } from "@upstash/qstash";
|
|
import { instrumentUpstash } from "@kubiks/otel-upstash-queues";
|
|
|
|
const qstashClient = instrumentUpstash(
|
|
new Client({
|
|
token: process.env.QSTASH_TOKEN!,
|
|
})
|
|
);
|
|
|
|
export async function startBackgroundJob(imageId: string) {
|
|
await qstashClient.publishJSON({
|
|
url: "https://your-app.vercel.app/api/process",
|
|
body: { imageId },
|
|
});
|
|
}
|
|
```
|
|
|
|
**Receiving messages:**
|
|
```ts
|
|
// app/api/process/route.ts
|
|
import { verifySignatureAppRouter } from "@upstash/qstash/nextjs";
|
|
import { instrumentConsumer } from "@kubiks/otel-upstash-queues";
|
|
|
|
async function handler(request: Request) {
|
|
const { imageId } = await request.json();
|
|
|
|
// Your processing logic
|
|
await processImage(imageId);
|
|
|
|
return Response.json({ success: true });
|
|
}
|
|
|
|
export const POST = verifySignatureAppRouter(instrumentConsumer(handler));
|
|
```
|
|
|
|
## License
|
|
|
|
MIT |