mirror of
https://github.com/zoriya/drizzle-otel.git
synced 2025-12-06 00:46:09 +00:00
@@ -15,6 +15,7 @@ Our goal is to bring the TypeScript ecosystem the observability tools it’s bee
|
||||
- [`@kubiks/otel-better-auth`](./packages/otel-better-auth/README.md)
|
||||
- [`@kubiks/otel-drizzle`](./packages/otel-drizzle/README.md)
|
||||
- [`@kubiks/otel-resend`](./packages/otel-resend/README.md)
|
||||
- [`@kubiks/otel-upstash-queues`](./packages/otel-upstash-queues/README.md)
|
||||
|
||||
---
|
||||
|
||||
|
||||
BIN
images/otel-upstash-queue-trace.png
Normal file
BIN
images/otel-upstash-queue-trace.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 2.4 MiB |
36
packages/otel-upstash-queues/CHANGELOG.md
Normal file
36
packages/otel-upstash-queues/CHANGELOG.md
Normal file
@@ -0,0 +1,36 @@
|
||||
# @kubiks/otel-upstash-queues
|
||||
|
||||
## 1.0.0
|
||||
|
||||
### Major Changes
|
||||
|
||||
- Initial release of OpenTelemetry instrumentation for Upstash QStash (Queues)
|
||||
- **Publisher Instrumentation** (`instrumentUpstash`):
|
||||
- Instrumentation for `publishJSON` method
|
||||
- Support for all QStash request parameters including:
|
||||
- URL targeting
|
||||
- Delayed and scheduled messages
|
||||
- Deduplication
|
||||
- Retries configuration
|
||||
- Callback URLs
|
||||
- Custom HTTP methods
|
||||
- Optional configuration support:
|
||||
- `captureBody`: Enable/disable request body capture (default: false)
|
||||
- `maxBodyLength`: Limit captured body size (default: 1024)
|
||||
- **Consumer Instrumentation** (`instrumentConsumer`):
|
||||
- Instrumentation for message handler functions
|
||||
- Extracts QStash headers (message ID, retry count, schedule ID, caller IP)
|
||||
- Tracks HTTP response status codes
|
||||
- Works seamlessly with `verifySignatureAppRouter` from `@upstash/qstash/nextjs`
|
||||
- Optional configuration support:
|
||||
- `captureBody`: Enable/disable request and response body capture (default: false)
|
||||
- `maxBodyLength`: Limit captured body size (default: 1024)
|
||||
- **Body Capture Features**:
|
||||
- Safe serialization with error handling
|
||||
- Automatic truncation of large bodies
|
||||
- Disabled by default for security
|
||||
- Captured as `qstash.request.body` and `qstash.response.body` attributes
|
||||
- Comprehensive test coverage (27 tests)
|
||||
- Full TypeScript support with proper types from @upstash/qstash SDK
|
||||
- No `any` or `unknown` types - fully type-safe
|
||||
- Exported `InstrumentationConfig` type for TypeScript users
|
||||
21
packages/otel-upstash-queues/LICENSE
Normal file
21
packages/otel-upstash-queues/LICENSE
Normal file
@@ -0,0 +1,21 @@
|
||||
MIT License
|
||||
|
||||
Copyright (c) 2025 Kubiks
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
296
packages/otel-upstash-queues/README.md
Normal file
296
packages/otel-upstash-queues/README.md
Normal file
@@ -0,0 +1,296 @@
|
||||
# @kubiks/otel-upstash-queues
|
||||
|
||||
OpenTelemetry instrumentation for the [Upstash QStash](https://upstash.com/docs/qstash) Node.js SDK.
|
||||
Capture spans for every QStash API call, enrich them with operation metadata,
|
||||
and keep an eye on message queue operations from your traces.
|
||||
|
||||

|
||||
|
||||
_Visualize your message queue operations with detailed span information including message publishing, callbacks, and delivery tracking._
|
||||
|
||||
## Installation
|
||||
|
||||
```bash
|
||||
npm install @kubiks/otel-upstash-queues
|
||||
# or
|
||||
pnpm add @kubiks/otel-upstash-queues
|
||||
```
|
||||
|
||||
**Peer Dependencies:** `@opentelemetry/api` >= 1.9.0, `@upstash/qstash` >= 2.0.0
|
||||
|
||||
## Quick Start
|
||||
|
||||
### Publishing Messages
|
||||
|
||||
```ts
|
||||
import { Client } from "@upstash/qstash";
|
||||
import { instrumentUpstash } from "@kubiks/otel-upstash-queues";
|
||||
|
||||
const client = instrumentUpstash(
|
||||
new Client({ token: process.env.QSTASH_TOKEN! })
|
||||
);
|
||||
|
||||
await client.publishJSON({
|
||||
url: "https://your-api-endpoint.com/process-image",
|
||||
body: { imageId: "123" },
|
||||
});
|
||||
```
|
||||
|
||||
`instrumentUpstash` wraps the QStash client instance you already use — no configuration changes
|
||||
needed. Every SDK call creates a client span with useful attributes.
|
||||
|
||||
### Consuming Messages
|
||||
|
||||
```ts
|
||||
// app/api/process/route.ts
|
||||
import { verifySignatureAppRouter } from "@upstash/qstash/nextjs";
|
||||
import { instrumentConsumer } from "@kubiks/otel-upstash-queues";
|
||||
|
||||
async function handler(request: Request) {
|
||||
const data = await request.json();
|
||||
|
||||
// Process your message
|
||||
await processImage(data.imageId);
|
||||
|
||||
return Response.json({ success: true });
|
||||
}
|
||||
|
||||
// Instrument first, then verify signature
|
||||
export const POST = verifySignatureAppRouter(instrumentConsumer(handler));
|
||||
```
|
||||
|
||||
`instrumentConsumer` wraps your message handler to trace message consumption, creating a SERVER span for each message received and processed.
|
||||
|
||||
### With Body Capture
|
||||
|
||||
Optionally capture request/response bodies for debugging:
|
||||
|
||||
```ts
|
||||
const client = instrumentUpstash(
|
||||
new Client({ token: process.env.QSTASH_TOKEN! }),
|
||||
{
|
||||
captureBody: true, // Enable body capture (default: false)
|
||||
maxBodyLength: 2048, // Max characters to capture (default: 1024)
|
||||
}
|
||||
);
|
||||
```
|
||||
|
||||
## What Gets Traced
|
||||
|
||||
This instrumentation provides two main functions:
|
||||
|
||||
1. **`instrumentUpstash`** - Wraps the QStash client to trace message publishing
|
||||
2. **`instrumentConsumer`** - Wraps your message handler to trace message consumption
|
||||
|
||||
### Publisher Instrumentation
|
||||
|
||||
The `instrumentUpstash` function wraps the `client.publishJSON` method, creating a span with `SpanKind.CLIENT` for each message publish operation.
|
||||
|
||||
### Consumer Instrumentation
|
||||
|
||||
The `instrumentConsumer` function wraps your message handler, creating a span with `SpanKind.SERVER` for each message received and processed.
|
||||
|
||||
## Span Attributes
|
||||
|
||||
### Publisher Spans (`instrumentUpstash`)
|
||||
|
||||
| Attribute | Description | Example |
|
||||
| ------------------------------ | ------------------------------------------- | -------------------------------------------- |
|
||||
| `messaging.system` | Constant value `qstash` | `qstash` |
|
||||
| `messaging.operation` | Operation type | `publish` |
|
||||
| `qstash.resource` | Resource name | `messages` |
|
||||
| `qstash.target` | Full operation target | `messages.publish` |
|
||||
| `qstash.url` | Target URL for the message | `https://example.com/api/process` |
|
||||
| `qstash.method` | HTTP method (default: POST) | `POST`, `PUT`, `GET` |
|
||||
| `qstash.message_id` | Message ID returned by QStash | `msg_123` |
|
||||
| `qstash.delay` | Delay before processing (seconds or string) | `60` or `"1h"` |
|
||||
| `qstash.not_before` | Unix timestamp for earliest processing | `1672531200` |
|
||||
| `qstash.deduplication_id` | Deduplication ID for idempotent operations | `unique-id-123` |
|
||||
| `qstash.retries` | Number of retry attempts (max) | `3` |
|
||||
| `qstash.callback_url` | Success callback URL | `https://example.com/callback` |
|
||||
| `qstash.failure_callback_url` | Failure callback URL | `https://example.com/failure` |
|
||||
|
||||
### Consumer Spans (`instrumentConsumer`)
|
||||
|
||||
| Attribute | Description | Example |
|
||||
| ------------------------------ | ------------------------------------------- | -------------------------------------------- |
|
||||
| `messaging.system` | Constant value `qstash` | `qstash` |
|
||||
| `messaging.operation` | Operation type | `receive` |
|
||||
| `qstash.resource` | Resource name | `messages` |
|
||||
| `qstash.target` | Full operation target | `messages.receive` |
|
||||
| `qstash.message_id` | Message ID from QStash | `msg_456` |
|
||||
| `qstash.retried` | Number of times retried (actual count) | `2` |
|
||||
| `qstash.schedule_id` | Schedule ID (if from scheduled message) | `schedule_123` |
|
||||
| `qstash.caller_ip` | IP address of the caller | `192.168.1.1` |
|
||||
| `http.status_code` | HTTP response status code | `200` |
|
||||
|
||||
### Body/Payload Attributes (Optional)
|
||||
|
||||
When `captureBody` is enabled in configuration:
|
||||
|
||||
| Attribute | Description | Captured By |
|
||||
| ------------------------------ | ------------------------------------------- | -------------------------------------------- |
|
||||
| `qstash.request.body` | Request/message body content | Both publisher and consumer |
|
||||
| `qstash.response.body` | Response body content | Consumer only |
|
||||
|
||||
The instrumentation captures message metadata and configuration to help with debugging and monitoring. Body capture is **disabled by default** to protect sensitive data.
|
||||
|
||||
## Usage Examples
|
||||
|
||||
### Basic Message Publishing
|
||||
|
||||
```ts
|
||||
import { Client } from "@upstash/qstash";
|
||||
import { instrumentUpstash } from "@kubiks/otel-upstash-queues";
|
||||
|
||||
const client = instrumentUpstash(
|
||||
new Client({ token: process.env.QSTASH_TOKEN! })
|
||||
);
|
||||
|
||||
// Publish a message
|
||||
await client.publishJSON({
|
||||
url: "https://your-api.com/webhook",
|
||||
body: {
|
||||
userId: "user_123",
|
||||
action: "process_data",
|
||||
},
|
||||
});
|
||||
```
|
||||
|
||||
### Delayed Message Publishing
|
||||
|
||||
```ts
|
||||
// Delay message processing by 60 seconds
|
||||
await client.publishJSON({
|
||||
url: "https://your-api.com/delayed-task",
|
||||
body: { taskId: "task_456" },
|
||||
delay: 60,
|
||||
});
|
||||
|
||||
// Or use a human-readable delay
|
||||
await client.publishJSON({
|
||||
url: "https://your-api.com/delayed-task",
|
||||
body: { taskId: "task_789" },
|
||||
delay: "1h", // 1 hour
|
||||
});
|
||||
```
|
||||
|
||||
### Message with Callbacks
|
||||
|
||||
```ts
|
||||
await client.publishJSON({
|
||||
url: "https://your-api.com/process",
|
||||
body: { orderId: "order_123" },
|
||||
callback: "https://your-api.com/success",
|
||||
failureCallback: "https://your-api.com/failure",
|
||||
});
|
||||
```
|
||||
|
||||
### Message with Retries and Deduplication
|
||||
|
||||
```ts
|
||||
await client.publishJSON({
|
||||
url: "https://your-api.com/critical-task",
|
||||
body: { taskId: "critical_123" },
|
||||
retries: 5,
|
||||
deduplicationId: "task-critical-123",
|
||||
});
|
||||
```
|
||||
|
||||
### Scheduled Message
|
||||
|
||||
```ts
|
||||
// Schedule for a specific time
|
||||
const scheduledTime = Math.floor(Date.now() / 1000) + 3600; // 1 hour from now
|
||||
|
||||
await client.publishJSON({
|
||||
url: "https://your-api.com/scheduled-task",
|
||||
body: { reportId: "report_456" },
|
||||
notBefore: scheduledTime,
|
||||
});
|
||||
```
|
||||
|
||||
### Message Consumer Instrumentation
|
||||
|
||||
Use `instrumentConsumer` to trace your message handler that receives QStash messages:
|
||||
|
||||
```ts
|
||||
// app/api/process/route.ts
|
||||
import { verifySignatureAppRouter } from "@upstash/qstash/nextjs";
|
||||
import { instrumentConsumer } from "@kubiks/otel-upstash-queues";
|
||||
|
||||
async function handler(request: Request) {
|
||||
const data = await request.json();
|
||||
|
||||
// Process your message
|
||||
await processImage(data.imageId);
|
||||
|
||||
return Response.json({ success: true });
|
||||
}
|
||||
|
||||
// Instrument first, then verify signature
|
||||
export const POST = verifySignatureAppRouter(instrumentConsumer(handler));
|
||||
```
|
||||
|
||||
**With body capture:**
|
||||
|
||||
```ts
|
||||
export const POST = verifySignatureAppRouter(
|
||||
instrumentConsumer(handler, {
|
||||
captureBody: true,
|
||||
maxBodyLength: 2048,
|
||||
})
|
||||
);
|
||||
```
|
||||
|
||||
The `instrumentConsumer` function:
|
||||
- Extracts QStash headers (message ID, retry count, schedule ID, caller IP)
|
||||
- Creates a SERVER span for the message processing
|
||||
- Tracks response status codes
|
||||
- Captures errors during processing
|
||||
- Optionally captures request and response bodies (when configured)
|
||||
|
||||
### Complete Next.js Integration Example
|
||||
|
||||
**Publishing messages:**
|
||||
```ts
|
||||
// app/actions.ts
|
||||
"use server";
|
||||
import { Client } from "@upstash/qstash";
|
||||
import { instrumentUpstash } from "@kubiks/otel-upstash-queues";
|
||||
|
||||
const qstashClient = instrumentUpstash(
|
||||
new Client({
|
||||
token: process.env.QSTASH_TOKEN!,
|
||||
})
|
||||
);
|
||||
|
||||
export async function startBackgroundJob(imageId: string) {
|
||||
await qstashClient.publishJSON({
|
||||
url: "https://your-app.vercel.app/api/process",
|
||||
body: { imageId },
|
||||
});
|
||||
}
|
||||
```
|
||||
|
||||
**Receiving messages:**
|
||||
```ts
|
||||
// app/api/process/route.ts
|
||||
import { verifySignatureAppRouter } from "@upstash/qstash/nextjs";
|
||||
import { instrumentConsumer } from "@kubiks/otel-upstash-queues";
|
||||
|
||||
async function handler(request: Request) {
|
||||
const { imageId } = await request.json();
|
||||
|
||||
// Your processing logic
|
||||
await processImage(imageId);
|
||||
|
||||
return Response.json({ success: true });
|
||||
}
|
||||
|
||||
export const POST = verifySignatureAppRouter(instrumentConsumer(handler));
|
||||
```
|
||||
|
||||
## License
|
||||
|
||||
MIT
|
||||
53
packages/otel-upstash-queues/package.json
Normal file
53
packages/otel-upstash-queues/package.json
Normal file
@@ -0,0 +1,53 @@
|
||||
{
|
||||
"name": "@kubiks/otel-upstash-queues",
|
||||
"version": "1.0.1",
|
||||
"private": false,
|
||||
"publishConfig": {
|
||||
"access": "public"
|
||||
},
|
||||
"description": "OpenTelemetry instrumentation for the Upstash QStash (Queues) Node.js SDK",
|
||||
"author": "Kubiks",
|
||||
"license": "MIT",
|
||||
"repository": "kubiks-inc/otel",
|
||||
"sideEffects": false,
|
||||
"type": "module",
|
||||
"exports": {
|
||||
".": {
|
||||
"types": "./dist/types/index.d.ts",
|
||||
"import": "./dist/index.js",
|
||||
"default": "./dist/index.js"
|
||||
}
|
||||
},
|
||||
"main": "./dist/index.js",
|
||||
"types": "./dist/types/index.d.ts",
|
||||
"files": [
|
||||
"dist",
|
||||
"LICENSE",
|
||||
"README.md"
|
||||
],
|
||||
"engines": {
|
||||
"node": "^18.19.0 || >=20.6.0"
|
||||
},
|
||||
"scripts": {
|
||||
"build": "pnpm clean && tsc",
|
||||
"clean": "rimraf dist",
|
||||
"prepublishOnly": "pnpm build",
|
||||
"type-check": "tsc --noEmit",
|
||||
"unit-test": "vitest --run",
|
||||
"unit-test-watch": "vitest"
|
||||
},
|
||||
"dependencies": {},
|
||||
"devDependencies": {
|
||||
"@opentelemetry/api": "^1.9.0",
|
||||
"@opentelemetry/sdk-trace-base": "^2.1.0",
|
||||
"@types/node": "18.15.11",
|
||||
"@upstash/qstash": "^2.0.0",
|
||||
"rimraf": "3.0.2",
|
||||
"typescript": "^5",
|
||||
"vitest": "0.33.0"
|
||||
},
|
||||
"peerDependencies": {
|
||||
"@opentelemetry/api": ">=1.9.0 <2.0.0",
|
||||
"@upstash/qstash": ">=2.0.0"
|
||||
}
|
||||
}
|
||||
787
packages/otel-upstash-queues/src/index.test.ts
Normal file
787
packages/otel-upstash-queues/src/index.test.ts
Normal file
@@ -0,0 +1,787 @@
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { SpanStatusCode, trace } from "@opentelemetry/api";
|
||||
import {
|
||||
BasicTracerProvider,
|
||||
InMemorySpanExporter,
|
||||
SimpleSpanProcessor,
|
||||
} from "@opentelemetry/sdk-trace-base";
|
||||
import type { Client } from "@upstash/qstash";
|
||||
import {
|
||||
instrumentUpstash,
|
||||
instrumentConsumer,
|
||||
SEMATTRS_MESSAGING_OPERATION,
|
||||
SEMATTRS_MESSAGING_SYSTEM,
|
||||
SEMATTRS_QSTASH_CALLBACK_URL,
|
||||
SEMATTRS_QSTASH_DEDUPLICATION_ID,
|
||||
SEMATTRS_QSTASH_DELAY,
|
||||
SEMATTRS_QSTASH_FAILURE_CALLBACK_URL,
|
||||
SEMATTRS_QSTASH_MESSAGE_ID,
|
||||
SEMATTRS_QSTASH_METHOD,
|
||||
SEMATTRS_QSTASH_NOT_BEFORE,
|
||||
SEMATTRS_QSTASH_RESOURCE,
|
||||
SEMATTRS_QSTASH_RETRIES,
|
||||
SEMATTRS_QSTASH_TARGET,
|
||||
SEMATTRS_QSTASH_URL,
|
||||
SEMATTRS_QSTASH_RETRIED,
|
||||
SEMATTRS_QSTASH_SCHEDULE_ID,
|
||||
SEMATTRS_QSTASH_CALLER_IP,
|
||||
SEMATTRS_HTTP_STATUS_CODE,
|
||||
SEMATTRS_QSTASH_REQUEST_BODY,
|
||||
SEMATTRS_QSTASH_RESPONSE_BODY,
|
||||
} from "./index";
|
||||
|
||||
describe("instrumentUpstash", () => {
|
||||
let provider: BasicTracerProvider;
|
||||
let exporter: InMemorySpanExporter;
|
||||
|
||||
beforeEach(() => {
|
||||
exporter = new InMemorySpanExporter();
|
||||
provider = new BasicTracerProvider({
|
||||
spanProcessors: [new SimpleSpanProcessor(exporter)],
|
||||
});
|
||||
trace.setGlobalTracerProvider(provider);
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await provider.shutdown();
|
||||
exporter.reset();
|
||||
trace.disable();
|
||||
});
|
||||
|
||||
const createMockClient = (): Client => {
|
||||
const mockClient = {
|
||||
publishJSON: vi.fn(async (request: any) => ({
|
||||
messageId: "msg_123",
|
||||
})),
|
||||
} as unknown as Client;
|
||||
|
||||
return mockClient;
|
||||
};
|
||||
|
||||
it("wraps publishJSON and records spans", async () => {
|
||||
const client = createMockClient();
|
||||
instrumentUpstash(client);
|
||||
|
||||
const request = {
|
||||
url: "https://example.com/api/process",
|
||||
body: { hello: "world" },
|
||||
};
|
||||
|
||||
const response = await client.publishJSON(request);
|
||||
expect(response.messageId).toBe("msg_123");
|
||||
|
||||
const spans = exporter.getFinishedSpans();
|
||||
expect(spans).toHaveLength(1);
|
||||
|
||||
const span = spans[0];
|
||||
if (!span) {
|
||||
throw new Error("Expected a span to be recorded");
|
||||
}
|
||||
|
||||
expect(span.name).toBe("qstash.messages.publish");
|
||||
expect(span.attributes[SEMATTRS_MESSAGING_SYSTEM]).toBe("qstash");
|
||||
expect(span.attributes[SEMATTRS_MESSAGING_OPERATION]).toBe("publish");
|
||||
expect(span.attributes[SEMATTRS_QSTASH_RESOURCE]).toBe("messages");
|
||||
expect(span.attributes[SEMATTRS_QSTASH_TARGET]).toBe("messages.publish");
|
||||
expect(span.attributes[SEMATTRS_QSTASH_URL]).toBe("https://example.com/api/process");
|
||||
expect(span.attributes[SEMATTRS_QSTASH_METHOD]).toBe("POST");
|
||||
expect(span.attributes[SEMATTRS_QSTASH_MESSAGE_ID]).toBe("msg_123");
|
||||
expect(span.status.code).toBe(SpanStatusCode.OK);
|
||||
});
|
||||
|
||||
it("captures request with delay", async () => {
|
||||
const client = createMockClient();
|
||||
instrumentUpstash(client);
|
||||
|
||||
const request = {
|
||||
url: "https://example.com/api/delayed",
|
||||
body: { task: "process" },
|
||||
delay: 60,
|
||||
};
|
||||
|
||||
await client.publishJSON(request);
|
||||
|
||||
const spans = exporter.getFinishedSpans();
|
||||
expect(spans).toHaveLength(1);
|
||||
|
||||
const span = spans[0];
|
||||
if (!span) {
|
||||
throw new Error("Expected a span to be recorded");
|
||||
}
|
||||
|
||||
expect(span.attributes[SEMATTRS_QSTASH_DELAY]).toBe(60);
|
||||
expect(span.attributes[SEMATTRS_QSTASH_URL]).toBe("https://example.com/api/delayed");
|
||||
});
|
||||
|
||||
it("captures request with delay as string", async () => {
|
||||
const client = createMockClient();
|
||||
instrumentUpstash(client);
|
||||
|
||||
const request = {
|
||||
url: "https://example.com/api/delayed",
|
||||
body: { task: "process" },
|
||||
delay: "1h",
|
||||
};
|
||||
|
||||
await client.publishJSON(request);
|
||||
|
||||
const spans = exporter.getFinishedSpans();
|
||||
expect(spans).toHaveLength(1);
|
||||
|
||||
const span = spans[0];
|
||||
if (!span) {
|
||||
throw new Error("Expected a span to be recorded");
|
||||
}
|
||||
|
||||
expect(span.attributes[SEMATTRS_QSTASH_DELAY]).toBe("1h");
|
||||
});
|
||||
|
||||
it("captures request with notBefore", async () => {
|
||||
const client = createMockClient();
|
||||
instrumentUpstash(client);
|
||||
|
||||
const notBefore = Math.floor(Date.now() / 1000) + 3600;
|
||||
const request = {
|
||||
url: "https://example.com/api/scheduled",
|
||||
body: { task: "process" },
|
||||
notBefore,
|
||||
};
|
||||
|
||||
await client.publishJSON(request);
|
||||
|
||||
const spans = exporter.getFinishedSpans();
|
||||
expect(spans).toHaveLength(1);
|
||||
|
||||
const span = spans[0];
|
||||
if (!span) {
|
||||
throw new Error("Expected a span to be recorded");
|
||||
}
|
||||
|
||||
expect(span.attributes[SEMATTRS_QSTASH_NOT_BEFORE]).toBe(notBefore);
|
||||
});
|
||||
|
||||
it("captures request with deduplication ID", async () => {
|
||||
const client = createMockClient();
|
||||
instrumentUpstash(client);
|
||||
|
||||
const request = {
|
||||
url: "https://example.com/api/process",
|
||||
body: { data: "test" },
|
||||
deduplicationId: "unique-id-123",
|
||||
};
|
||||
|
||||
await client.publishJSON(request);
|
||||
|
||||
const spans = exporter.getFinishedSpans();
|
||||
expect(spans).toHaveLength(1);
|
||||
|
||||
const span = spans[0];
|
||||
if (!span) {
|
||||
throw new Error("Expected a span to be recorded");
|
||||
}
|
||||
|
||||
expect(span.attributes[SEMATTRS_QSTASH_DEDUPLICATION_ID]).toBe("unique-id-123");
|
||||
});
|
||||
|
||||
it("captures request with custom method", async () => {
|
||||
const client = createMockClient();
|
||||
instrumentUpstash(client);
|
||||
|
||||
const request = {
|
||||
url: "https://example.com/api/process",
|
||||
body: { data: "test" },
|
||||
method: "PUT" as const,
|
||||
};
|
||||
|
||||
await client.publishJSON(request);
|
||||
|
||||
const spans = exporter.getFinishedSpans();
|
||||
expect(spans).toHaveLength(1);
|
||||
|
||||
const span = spans[0];
|
||||
if (!span) {
|
||||
throw new Error("Expected a span to be recorded");
|
||||
}
|
||||
|
||||
expect(span.attributes[SEMATTRS_QSTASH_METHOD]).toBe("PUT");
|
||||
});
|
||||
|
||||
|
||||
it("captures request with retries", async () => {
|
||||
const client = createMockClient();
|
||||
instrumentUpstash(client);
|
||||
|
||||
const request = {
|
||||
url: "https://example.com/api/process",
|
||||
body: { data: "test" },
|
||||
retries: 3,
|
||||
};
|
||||
|
||||
await client.publishJSON(request);
|
||||
|
||||
const spans = exporter.getFinishedSpans();
|
||||
expect(spans).toHaveLength(1);
|
||||
|
||||
const span = spans[0];
|
||||
if (!span) {
|
||||
throw new Error("Expected a span to be recorded");
|
||||
}
|
||||
|
||||
expect(span.attributes[SEMATTRS_QSTASH_RETRIES]).toBe(3);
|
||||
});
|
||||
|
||||
it("captures request with callback URLs", async () => {
|
||||
const client = createMockClient();
|
||||
instrumentUpstash(client);
|
||||
|
||||
const request = {
|
||||
url: "https://example.com/api/process",
|
||||
body: { data: "test" },
|
||||
callback: "https://example.com/api/callback",
|
||||
failureCallback: "https://example.com/api/failure",
|
||||
};
|
||||
|
||||
await client.publishJSON(request);
|
||||
|
||||
const spans = exporter.getFinishedSpans();
|
||||
expect(spans).toHaveLength(1);
|
||||
|
||||
const span = spans[0];
|
||||
if (!span) {
|
||||
throw new Error("Expected a span to be recorded");
|
||||
}
|
||||
|
||||
expect(span.attributes[SEMATTRS_QSTASH_CALLBACK_URL]).toBe("https://example.com/api/callback");
|
||||
expect(span.attributes[SEMATTRS_QSTASH_FAILURE_CALLBACK_URL]).toBe("https://example.com/api/failure");
|
||||
});
|
||||
|
||||
it("captures all request properties together", async () => {
|
||||
const client = createMockClient();
|
||||
instrumentUpstash(client);
|
||||
|
||||
const request = {
|
||||
url: "https://example.com/api/complex",
|
||||
body: { data: "complex" },
|
||||
method: "POST",
|
||||
delay: 120,
|
||||
deduplicationId: "complex-id-456",
|
||||
contentType: "application/json",
|
||||
retries: 5,
|
||||
callback: "https://example.com/callback",
|
||||
failureCallback: "https://example.com/failure",
|
||||
};
|
||||
|
||||
await client.publishJSON(request);
|
||||
|
||||
const spans = exporter.getFinishedSpans();
|
||||
expect(spans).toHaveLength(1);
|
||||
|
||||
const span = spans[0];
|
||||
if (!span) {
|
||||
throw new Error("Expected a span to be recorded");
|
||||
}
|
||||
|
||||
expect(span.attributes[SEMATTRS_QSTASH_URL]).toBe("https://example.com/api/complex");
|
||||
expect(span.attributes[SEMATTRS_QSTASH_METHOD]).toBe("POST");
|
||||
expect(span.attributes[SEMATTRS_QSTASH_DELAY]).toBe(120);
|
||||
expect(span.attributes[SEMATTRS_QSTASH_DEDUPLICATION_ID]).toBe("complex-id-456");
|
||||
expect(span.attributes[SEMATTRS_QSTASH_RETRIES]).toBe(5);
|
||||
expect(span.attributes[SEMATTRS_QSTASH_CALLBACK_URL]).toBe("https://example.com/callback");
|
||||
expect(span.attributes[SEMATTRS_QSTASH_FAILURE_CALLBACK_URL]).toBe("https://example.com/failure");
|
||||
expect(span.attributes[SEMATTRS_QSTASH_MESSAGE_ID]).toBe("msg_123");
|
||||
});
|
||||
|
||||
it("captures errors and marks span status", async () => {
|
||||
const client = createMockClient();
|
||||
client.publishJSON = vi.fn().mockRejectedValue(new Error("Network error"));
|
||||
|
||||
instrumentUpstash(client);
|
||||
|
||||
await expect(async () =>
|
||||
client.publishJSON({
|
||||
url: "https://example.com/api/fail",
|
||||
body: { test: "error" },
|
||||
})
|
||||
).rejects.toThrowError("Network error");
|
||||
|
||||
const spans = exporter.getFinishedSpans();
|
||||
expect(spans).toHaveLength(1);
|
||||
|
||||
const span = spans[0];
|
||||
if (!span) {
|
||||
throw new Error("Expected a span to be recorded");
|
||||
}
|
||||
|
||||
expect(span.status.code).toBe(SpanStatusCode.ERROR);
|
||||
const hasException = span.events.some((event) => event.name === "exception");
|
||||
expect(hasException).toBe(true);
|
||||
});
|
||||
|
||||
it("is idempotent", async () => {
|
||||
const client = createMockClient();
|
||||
const first = instrumentUpstash(client);
|
||||
const second = instrumentUpstash(first);
|
||||
|
||||
expect(first).toBe(second);
|
||||
|
||||
await second.publishJSON({
|
||||
url: "https://example.com/api/test",
|
||||
body: { test: "idempotent" },
|
||||
});
|
||||
|
||||
expect(exporter.getFinishedSpans()).toHaveLength(1);
|
||||
});
|
||||
|
||||
it("handles minimal request", async () => {
|
||||
const client = createMockClient();
|
||||
instrumentUpstash(client);
|
||||
|
||||
const request = {
|
||||
url: "https://example.com/api/minimal",
|
||||
body: { minimal: true },
|
||||
};
|
||||
|
||||
await client.publishJSON(request);
|
||||
|
||||
const spans = exporter.getFinishedSpans();
|
||||
expect(spans).toHaveLength(1);
|
||||
|
||||
const span = spans[0];
|
||||
if (!span) {
|
||||
throw new Error("Expected a span to be recorded");
|
||||
}
|
||||
|
||||
expect(span.attributes[SEMATTRS_QSTASH_URL]).toBe("https://example.com/api/minimal");
|
||||
expect(span.attributes[SEMATTRS_QSTASH_METHOD]).toBe("POST");
|
||||
expect(span.attributes[SEMATTRS_QSTASH_MESSAGE_ID]).toBe("msg_123");
|
||||
expect(span.status.code).toBe(SpanStatusCode.OK);
|
||||
});
|
||||
|
||||
it("captures request body when captureBody is enabled", async () => {
|
||||
const client = createMockClient();
|
||||
instrumentUpstash(client, { captureBody: true });
|
||||
|
||||
const request = {
|
||||
url: "https://example.com/api/process",
|
||||
body: { userId: "123", action: "process" },
|
||||
};
|
||||
|
||||
await client.publishJSON(request);
|
||||
|
||||
const spans = exporter.getFinishedSpans();
|
||||
expect(spans).toHaveLength(1);
|
||||
|
||||
const span = spans[0];
|
||||
if (!span) {
|
||||
throw new Error("Expected a span to be recorded");
|
||||
}
|
||||
|
||||
expect(span.attributes[SEMATTRS_QSTASH_REQUEST_BODY]).toBe(
|
||||
JSON.stringify({ userId: "123", action: "process" })
|
||||
);
|
||||
});
|
||||
|
||||
it("does not capture request body when captureBody is disabled", async () => {
|
||||
const client = createMockClient();
|
||||
instrumentUpstash(client);
|
||||
|
||||
const request = {
|
||||
url: "https://example.com/api/process",
|
||||
body: { userId: "123", action: "process" },
|
||||
};
|
||||
|
||||
await client.publishJSON(request);
|
||||
|
||||
const spans = exporter.getFinishedSpans();
|
||||
expect(spans).toHaveLength(1);
|
||||
|
||||
const span = spans[0];
|
||||
if (!span) {
|
||||
throw new Error("Expected a span to be recorded");
|
||||
}
|
||||
|
||||
expect(span.attributes[SEMATTRS_QSTASH_REQUEST_BODY]).toBeUndefined();
|
||||
});
|
||||
|
||||
it("truncates long request body based on maxBodyLength", async () => {
|
||||
const client = createMockClient();
|
||||
instrumentUpstash(client, { captureBody: true, maxBodyLength: 50 });
|
||||
|
||||
const longBody = { data: "x".repeat(100) };
|
||||
const request = {
|
||||
url: "https://example.com/api/process",
|
||||
body: longBody,
|
||||
};
|
||||
|
||||
await client.publishJSON(request);
|
||||
|
||||
const spans = exporter.getFinishedSpans();
|
||||
expect(spans).toHaveLength(1);
|
||||
|
||||
const span = spans[0];
|
||||
if (!span) {
|
||||
throw new Error("Expected a span to be recorded");
|
||||
}
|
||||
|
||||
const capturedBody = span.attributes[SEMATTRS_QSTASH_REQUEST_BODY] as string;
|
||||
expect(capturedBody).toBeDefined();
|
||||
expect(capturedBody.length).toBe(50 + "... (truncated)".length);
|
||||
expect(capturedBody).toContain("... (truncated)");
|
||||
});
|
||||
|
||||
it("handles string body in request", async () => {
|
||||
const client = createMockClient();
|
||||
instrumentUpstash(client, { captureBody: true });
|
||||
|
||||
const request = {
|
||||
url: "https://example.com/api/process",
|
||||
body: "plain text body",
|
||||
};
|
||||
|
||||
await client.publishJSON(request);
|
||||
|
||||
const spans = exporter.getFinishedSpans();
|
||||
expect(spans).toHaveLength(1);
|
||||
|
||||
const span = spans[0];
|
||||
if (!span) {
|
||||
throw new Error("Expected a span to be recorded");
|
||||
}
|
||||
|
||||
expect(span.attributes[SEMATTRS_QSTASH_REQUEST_BODY]).toBe("plain text body");
|
||||
});
|
||||
});
|
||||
|
||||
describe("instrumentConsumer", () => {
|
||||
let provider: BasicTracerProvider;
|
||||
let exporter: InMemorySpanExporter;
|
||||
|
||||
beforeEach(() => {
|
||||
exporter = new InMemorySpanExporter();
|
||||
provider = new BasicTracerProvider({
|
||||
spanProcessors: [new SimpleSpanProcessor(exporter)],
|
||||
});
|
||||
trace.setGlobalTracerProvider(provider);
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await provider.shutdown();
|
||||
exporter.reset();
|
||||
trace.disable();
|
||||
});
|
||||
|
||||
const createMockRequest = (headers: Record<string, string> = {}): Request => {
|
||||
const mockHeaders = new Headers({
|
||||
"content-type": "application/json",
|
||||
...headers,
|
||||
});
|
||||
|
||||
return {
|
||||
headers: mockHeaders,
|
||||
json: vi.fn(async () => ({ data: "test" })),
|
||||
} as unknown as Request;
|
||||
};
|
||||
|
||||
it("wraps handler and records spans", async () => {
|
||||
const handler = vi.fn(async (req: Request) => {
|
||||
return Response.json({ success: true });
|
||||
});
|
||||
|
||||
const instrumentedHandler = instrumentConsumer(handler);
|
||||
|
||||
const request = createMockRequest({
|
||||
"upstash-message-id": "msg_456",
|
||||
});
|
||||
|
||||
const response = await instrumentedHandler(request);
|
||||
expect(response.status).toBe(200);
|
||||
expect(handler).toHaveBeenCalledWith(request);
|
||||
|
||||
const spans = exporter.getFinishedSpans();
|
||||
expect(spans).toHaveLength(1);
|
||||
|
||||
const span = spans[0];
|
||||
if (!span) {
|
||||
throw new Error("Expected a span to be recorded");
|
||||
}
|
||||
|
||||
expect(span.name).toBe("qstash.messages.receive");
|
||||
expect(span.attributes[SEMATTRS_MESSAGING_SYSTEM]).toBe("qstash");
|
||||
expect(span.attributes[SEMATTRS_MESSAGING_OPERATION]).toBe("receive");
|
||||
expect(span.attributes[SEMATTRS_QSTASH_RESOURCE]).toBe("messages");
|
||||
expect(span.attributes[SEMATTRS_QSTASH_TARGET]).toBe("messages.receive");
|
||||
expect(span.attributes[SEMATTRS_QSTASH_MESSAGE_ID]).toBe("msg_456");
|
||||
expect(span.attributes[SEMATTRS_HTTP_STATUS_CODE]).toBe(200);
|
||||
expect(span.status.code).toBe(SpanStatusCode.OK);
|
||||
});
|
||||
|
||||
it("captures QStash headers", async () => {
|
||||
const handler = vi.fn(async () => Response.json({ success: true }));
|
||||
const instrumentedHandler = instrumentConsumer(handler);
|
||||
|
||||
const request = createMockRequest({
|
||||
"upstash-message-id": "msg_789",
|
||||
"upstash-retried": "2",
|
||||
"upstash-schedule-id": "schedule_123",
|
||||
"upstash-caller-ip": "192.168.1.1",
|
||||
});
|
||||
|
||||
await instrumentedHandler(request);
|
||||
|
||||
const spans = exporter.getFinishedSpans();
|
||||
expect(spans).toHaveLength(1);
|
||||
|
||||
const span = spans[0];
|
||||
if (!span) {
|
||||
throw new Error("Expected a span to be recorded");
|
||||
}
|
||||
|
||||
expect(span.attributes[SEMATTRS_QSTASH_MESSAGE_ID]).toBe("msg_789");
|
||||
expect(span.attributes[SEMATTRS_QSTASH_RETRIED]).toBe(2);
|
||||
expect(span.attributes[SEMATTRS_QSTASH_SCHEDULE_ID]).toBe("schedule_123");
|
||||
expect(span.attributes[SEMATTRS_QSTASH_CALLER_IP]).toBe("192.168.1.1");
|
||||
});
|
||||
|
||||
it("handles missing QStash headers gracefully", async () => {
|
||||
const handler = vi.fn(async () => Response.json({ success: true }));
|
||||
const instrumentedHandler = instrumentConsumer(handler);
|
||||
|
||||
const request = createMockRequest({});
|
||||
|
||||
await instrumentedHandler(request);
|
||||
|
||||
const spans = exporter.getFinishedSpans();
|
||||
expect(spans).toHaveLength(1);
|
||||
|
||||
const span = spans[0];
|
||||
if (!span) {
|
||||
throw new Error("Expected a span to be recorded");
|
||||
}
|
||||
|
||||
expect(span.attributes[SEMATTRS_QSTASH_MESSAGE_ID]).toBeUndefined();
|
||||
expect(span.attributes[SEMATTRS_QSTASH_RETRIED]).toBeUndefined();
|
||||
expect(span.attributes[SEMATTRS_HTTP_STATUS_CODE]).toBe(200);
|
||||
expect(span.status.code).toBe(SpanStatusCode.OK);
|
||||
});
|
||||
|
||||
it("captures errors and marks span status", async () => {
|
||||
const handler = vi.fn().mockRejectedValue(new Error("Processing failed"));
|
||||
const instrumentedHandler = instrumentConsumer(handler);
|
||||
|
||||
const request = createMockRequest({
|
||||
"upstash-message-id": "msg_error",
|
||||
});
|
||||
|
||||
await expect(async () => instrumentedHandler(request)).rejects.toThrowError(
|
||||
"Processing failed"
|
||||
);
|
||||
|
||||
const spans = exporter.getFinishedSpans();
|
||||
expect(spans).toHaveLength(1);
|
||||
|
||||
const span = spans[0];
|
||||
if (!span) {
|
||||
throw new Error("Expected a span to be recorded");
|
||||
}
|
||||
|
||||
expect(span.status.code).toBe(SpanStatusCode.ERROR);
|
||||
const hasException = span.events.some((event) => event.name === "exception");
|
||||
expect(hasException).toBe(true);
|
||||
});
|
||||
|
||||
it("marks span as error for non-2xx status codes", async () => {
|
||||
const handler = vi.fn(async () => new Response("Bad Request", { status: 400 }));
|
||||
const instrumentedHandler = instrumentConsumer(handler);
|
||||
|
||||
const request = createMockRequest({
|
||||
"upstash-message-id": "msg_400",
|
||||
});
|
||||
|
||||
const response = await instrumentedHandler(request);
|
||||
expect(response.status).toBe(400);
|
||||
|
||||
const spans = exporter.getFinishedSpans();
|
||||
expect(spans).toHaveLength(1);
|
||||
|
||||
const span = spans[0];
|
||||
if (!span) {
|
||||
throw new Error("Expected a span to be recorded");
|
||||
}
|
||||
|
||||
expect(span.attributes[SEMATTRS_HTTP_STATUS_CODE]).toBe(400);
|
||||
expect(span.status.code).toBe(SpanStatusCode.ERROR);
|
||||
});
|
||||
|
||||
it("handles retry count as number", async () => {
|
||||
const handler = vi.fn(async () => Response.json({ success: true }));
|
||||
const instrumentedHandler = instrumentConsumer(handler);
|
||||
|
||||
const request = createMockRequest({
|
||||
"upstash-message-id": "msg_retry",
|
||||
"upstash-retried": "5",
|
||||
});
|
||||
|
||||
await instrumentedHandler(request);
|
||||
|
||||
const spans = exporter.getFinishedSpans();
|
||||
expect(spans).toHaveLength(1);
|
||||
|
||||
const span = spans[0];
|
||||
if (!span) {
|
||||
throw new Error("Expected a span to be recorded");
|
||||
}
|
||||
|
||||
expect(span.attributes[SEMATTRS_QSTASH_RETRIED]).toBe(5);
|
||||
});
|
||||
|
||||
it("works with verifySignatureAppRouter pattern", async () => {
|
||||
// Simulate the pattern: verifySignatureAppRouter(instrumentConsumer(handler))
|
||||
const handler = vi.fn(async (req: Request) => {
|
||||
const data = await req.json();
|
||||
return Response.json({ received: data });
|
||||
});
|
||||
|
||||
const instrumentedHandler = instrumentConsumer(handler);
|
||||
|
||||
// Simulate what verifySignatureAppRouter might do (simplified)
|
||||
const wrappedHandler = async (req: Request) => {
|
||||
// Signature verification would happen here
|
||||
return instrumentedHandler(req);
|
||||
};
|
||||
|
||||
const request = createMockRequest({
|
||||
"upstash-message-id": "msg_wrapped",
|
||||
"upstash-retried": "0",
|
||||
});
|
||||
|
||||
const response = await wrappedHandler(request);
|
||||
expect(response.status).toBe(200);
|
||||
|
||||
const spans = exporter.getFinishedSpans();
|
||||
expect(spans).toHaveLength(1);
|
||||
|
||||
const span = spans[0];
|
||||
if (!span) {
|
||||
throw new Error("Expected a span to be recorded");
|
||||
}
|
||||
|
||||
expect(span.name).toBe("qstash.messages.receive");
|
||||
expect(span.attributes[SEMATTRS_QSTASH_MESSAGE_ID]).toBe("msg_wrapped");
|
||||
});
|
||||
|
||||
it("captures request body when captureBody is enabled", async () => {
|
||||
const handler = vi.fn(async (req: Request) => {
|
||||
return Response.json({ success: true });
|
||||
});
|
||||
|
||||
const instrumentedHandler = instrumentConsumer(handler, { captureBody: true });
|
||||
|
||||
const mockHeaders = new Headers({
|
||||
"content-type": "application/json",
|
||||
"upstash-message-id": "msg_body_test",
|
||||
});
|
||||
|
||||
const requestBody = JSON.stringify({ imageId: "123", action: "process" });
|
||||
const request = {
|
||||
headers: mockHeaders,
|
||||
clone: () => ({
|
||||
text: vi.fn(async () => requestBody),
|
||||
}),
|
||||
json: vi.fn(async () => JSON.parse(requestBody)),
|
||||
} as unknown as Request;
|
||||
|
||||
await instrumentedHandler(request);
|
||||
|
||||
const spans = exporter.getFinishedSpans();
|
||||
expect(spans).toHaveLength(1);
|
||||
|
||||
const span = spans[0];
|
||||
if (!span) {
|
||||
throw new Error("Expected a span to be recorded");
|
||||
}
|
||||
|
||||
expect(span.attributes[SEMATTRS_QSTASH_REQUEST_BODY]).toBe(requestBody);
|
||||
});
|
||||
|
||||
it("captures response body when captureBody is enabled", async () => {
|
||||
const responseBody = JSON.stringify({ success: true, messageId: "msg_456" });
|
||||
const handler = vi.fn(async () => {
|
||||
const response = new Response(responseBody, {
|
||||
status: 200,
|
||||
headers: { "Content-Type": "application/json" },
|
||||
});
|
||||
return response;
|
||||
});
|
||||
|
||||
const instrumentedHandler = instrumentConsumer(handler, { captureBody: true });
|
||||
|
||||
const request = createMockRequest({
|
||||
"upstash-message-id": "msg_response_test",
|
||||
});
|
||||
|
||||
await instrumentedHandler(request);
|
||||
|
||||
const spans = exporter.getFinishedSpans();
|
||||
expect(spans).toHaveLength(1);
|
||||
|
||||
const span = spans[0];
|
||||
if (!span) {
|
||||
throw new Error("Expected a span to be recorded");
|
||||
}
|
||||
|
||||
expect(span.attributes[SEMATTRS_QSTASH_RESPONSE_BODY]).toBe(responseBody);
|
||||
});
|
||||
|
||||
it("does not capture bodies when captureBody is disabled", async () => {
|
||||
const handler = vi.fn(async () => Response.json({ success: true }));
|
||||
const instrumentedHandler = instrumentConsumer(handler);
|
||||
|
||||
const request = createMockRequest({
|
||||
"upstash-message-id": "msg_no_capture",
|
||||
});
|
||||
|
||||
await instrumentedHandler(request);
|
||||
|
||||
const spans = exporter.getFinishedSpans();
|
||||
expect(spans).toHaveLength(1);
|
||||
|
||||
const span = spans[0];
|
||||
if (!span) {
|
||||
throw new Error("Expected a span to be recorded");
|
||||
}
|
||||
|
||||
expect(span.attributes[SEMATTRS_QSTASH_REQUEST_BODY]).toBeUndefined();
|
||||
expect(span.attributes[SEMATTRS_QSTASH_RESPONSE_BODY]).toBeUndefined();
|
||||
});
|
||||
|
||||
it("truncates long response body based on maxBodyLength", async () => {
|
||||
const longResponse = JSON.stringify({ data: "y".repeat(200) });
|
||||
const handler = vi.fn(async () => {
|
||||
return new Response(longResponse, { status: 200 });
|
||||
});
|
||||
|
||||
const instrumentedHandler = instrumentConsumer(handler, {
|
||||
captureBody: true,
|
||||
maxBodyLength: 50,
|
||||
});
|
||||
|
||||
const request = createMockRequest({
|
||||
"upstash-message-id": "msg_truncate",
|
||||
});
|
||||
|
||||
await instrumentedHandler(request);
|
||||
|
||||
const spans = exporter.getFinishedSpans();
|
||||
expect(spans).toHaveLength(1);
|
||||
|
||||
const span = spans[0];
|
||||
if (!span) {
|
||||
throw new Error("Expected a span to be recorded");
|
||||
}
|
||||
|
||||
const capturedBody = span.attributes[SEMATTRS_QSTASH_RESPONSE_BODY] as string;
|
||||
expect(capturedBody).toBeDefined();
|
||||
expect(capturedBody.length).toBe(50 + "... (truncated)".length);
|
||||
expect(capturedBody).toContain("... (truncated)");
|
||||
});
|
||||
});
|
||||
314
packages/otel-upstash-queues/src/index.ts
Normal file
314
packages/otel-upstash-queues/src/index.ts
Normal file
@@ -0,0 +1,314 @@
|
||||
import {
|
||||
context,
|
||||
SpanKind,
|
||||
SpanStatusCode,
|
||||
trace,
|
||||
type Span,
|
||||
} from "@opentelemetry/api";
|
||||
import type { Client, PublishRequest, PublishResponse } from "@upstash/qstash";
|
||||
|
||||
const DEFAULT_TRACER_NAME = "@kubiks/otel-upstash-queues";
|
||||
const INSTRUMENTED_FLAG = Symbol("kubiksOtelUpstashQueuesInstrumented");
|
||||
|
||||
// Semantic attribute constants
|
||||
export const SEMATTRS_MESSAGING_SYSTEM = "messaging.system" as const;
|
||||
export const SEMATTRS_MESSAGING_OPERATION = "messaging.operation" as const;
|
||||
export const SEMATTRS_QSTASH_RESOURCE = "qstash.resource" as const;
|
||||
export const SEMATTRS_QSTASH_TARGET = "qstash.target" as const;
|
||||
export const SEMATTRS_QSTASH_MESSAGE_ID = "qstash.message_id" as const;
|
||||
export const SEMATTRS_QSTASH_URL = "qstash.url" as const;
|
||||
export const SEMATTRS_QSTASH_METHOD = "qstash.method" as const;
|
||||
export const SEMATTRS_QSTASH_DELAY = "qstash.delay" as const;
|
||||
export const SEMATTRS_QSTASH_NOT_BEFORE = "qstash.not_before" as const;
|
||||
export const SEMATTRS_QSTASH_DEDUPLICATION_ID = "qstash.deduplication_id" as const;
|
||||
export const SEMATTRS_QSTASH_RETRIES = "qstash.retries" as const;
|
||||
export const SEMATTRS_QSTASH_CALLBACK_URL = "qstash.callback_url" as const;
|
||||
export const SEMATTRS_QSTASH_FAILURE_CALLBACK_URL = "qstash.failure_callback_url" as const;
|
||||
|
||||
// Receiver-specific attributes
|
||||
export const SEMATTRS_QSTASH_RETRIED = "qstash.retried" as const;
|
||||
export const SEMATTRS_QSTASH_SCHEDULE_ID = "qstash.schedule_id" as const;
|
||||
export const SEMATTRS_QSTASH_CALLER_IP = "qstash.caller_ip" as const;
|
||||
export const SEMATTRS_HTTP_STATUS_CODE = "http.status_code" as const;
|
||||
|
||||
// Body/payload attributes
|
||||
export const SEMATTRS_QSTASH_REQUEST_BODY = "qstash.request.body" as const;
|
||||
export const SEMATTRS_QSTASH_RESPONSE_BODY = "qstash.response.body" as const;
|
||||
|
||||
export interface InstrumentationConfig {
|
||||
/**
|
||||
* Whether to capture request/response bodies in spans.
|
||||
* @default false
|
||||
*/
|
||||
captureBody?: boolean;
|
||||
|
||||
/**
|
||||
* Maximum length of the body to capture. Bodies longer than this will be truncated.
|
||||
* @default 1024
|
||||
*/
|
||||
maxBodyLength?: number;
|
||||
}
|
||||
|
||||
interface InstrumentedClient extends Client {
|
||||
[INSTRUMENTED_FLAG]?: true;
|
||||
}
|
||||
|
||||
function serializeBody(body: unknown, maxLength: number): string {
|
||||
try {
|
||||
const serialized = typeof body === "string" ? body : JSON.stringify(body);
|
||||
if (serialized.length > maxLength) {
|
||||
return serialized.substring(0, maxLength) + "... (truncated)";
|
||||
}
|
||||
return serialized;
|
||||
} catch (error) {
|
||||
return "[Unable to serialize body]";
|
||||
}
|
||||
}
|
||||
|
||||
function annotatePublishSpan(span: Span, request: PublishRequest<string>, config?: InstrumentationConfig): void {
|
||||
// Set base attributes
|
||||
span.setAttributes({
|
||||
[SEMATTRS_MESSAGING_SYSTEM]: "qstash",
|
||||
[SEMATTRS_MESSAGING_OPERATION]: "publish",
|
||||
[SEMATTRS_QSTASH_RESOURCE]: "messages",
|
||||
[SEMATTRS_QSTASH_TARGET]: "messages.publish",
|
||||
});
|
||||
|
||||
// Set URL
|
||||
if (request.url) {
|
||||
span.setAttribute(SEMATTRS_QSTASH_URL, request.url);
|
||||
}
|
||||
|
||||
// Set HTTP method (default is POST)
|
||||
const method = request.method || "POST";
|
||||
span.setAttribute(SEMATTRS_QSTASH_METHOD, method);
|
||||
|
||||
// Set delay if present
|
||||
if (typeof request.delay !== "undefined") {
|
||||
if (typeof request.delay === "number") {
|
||||
span.setAttribute(SEMATTRS_QSTASH_DELAY, request.delay);
|
||||
} else if (typeof request.delay === "string") {
|
||||
span.setAttribute(SEMATTRS_QSTASH_DELAY, request.delay);
|
||||
}
|
||||
}
|
||||
|
||||
// Set notBefore if present
|
||||
if (typeof request.notBefore !== "undefined") {
|
||||
span.setAttribute(SEMATTRS_QSTASH_NOT_BEFORE, request.notBefore);
|
||||
}
|
||||
|
||||
// Set deduplication ID if present
|
||||
if (request.deduplicationId) {
|
||||
span.setAttribute(SEMATTRS_QSTASH_DEDUPLICATION_ID, request.deduplicationId);
|
||||
}
|
||||
|
||||
// Set retries if present
|
||||
if (typeof request.retries !== "undefined") {
|
||||
span.setAttribute(SEMATTRS_QSTASH_RETRIES, request.retries);
|
||||
}
|
||||
|
||||
// Set callback URL if present
|
||||
if (request.callback) {
|
||||
span.setAttribute(SEMATTRS_QSTASH_CALLBACK_URL, request.callback);
|
||||
}
|
||||
|
||||
// Set failure callback URL if present
|
||||
if (request.failureCallback) {
|
||||
span.setAttribute(SEMATTRS_QSTASH_FAILURE_CALLBACK_URL, request.failureCallback);
|
||||
}
|
||||
|
||||
// Capture request body if enabled
|
||||
if (config?.captureBody && request.body !== undefined) {
|
||||
const maxLength = config.maxBodyLength ?? 1024;
|
||||
const bodyString = serializeBody(request.body, maxLength);
|
||||
span.setAttribute(SEMATTRS_QSTASH_REQUEST_BODY, bodyString);
|
||||
}
|
||||
}
|
||||
|
||||
function annotatePublishResponse(
|
||||
span: Span,
|
||||
response: { messageId?: string },
|
||||
): void {
|
||||
if (response && typeof response === "object" && "messageId" in response && response.messageId) {
|
||||
span.setAttribute(SEMATTRS_QSTASH_MESSAGE_ID, response.messageId);
|
||||
}
|
||||
}
|
||||
|
||||
function finalizeSpan(span: Span, error?: unknown): void {
|
||||
if (error) {
|
||||
if (error instanceof Error) {
|
||||
span.recordException(error);
|
||||
} else {
|
||||
span.recordException(new Error(String(error)));
|
||||
}
|
||||
span.setStatus({ code: SpanStatusCode.ERROR });
|
||||
} else {
|
||||
span.setStatus({ code: SpanStatusCode.OK });
|
||||
}
|
||||
span.end();
|
||||
}
|
||||
|
||||
export function instrumentUpstash(client: Client, config?: InstrumentationConfig): Client {
|
||||
// Check if already instrumented
|
||||
if ((client as InstrumentedClient)[INSTRUMENTED_FLAG]) {
|
||||
return client;
|
||||
}
|
||||
|
||||
const tracer = trace.getTracer(DEFAULT_TRACER_NAME);
|
||||
|
||||
// Instrument publishJSON method
|
||||
const originalPublishJSON = client.publishJSON.bind(client);
|
||||
|
||||
const instrumentedPublishJSON = async function instrumentedPublishJSON<TBody = unknown, TRequest extends PublishRequest<TBody> = PublishRequest<TBody>>(
|
||||
request: TRequest,
|
||||
): Promise<PublishResponse<TRequest>> {
|
||||
const span = tracer.startSpan("qstash.messages.publish", {
|
||||
kind: SpanKind.CLIENT,
|
||||
});
|
||||
|
||||
// Annotate span with request details
|
||||
annotatePublishSpan(span, request as PublishRequest<string>, config);
|
||||
|
||||
// Set the span as active
|
||||
const activeContext = trace.setSpan(context.active(), span);
|
||||
|
||||
try {
|
||||
// Call the original method within the active context
|
||||
const response = await context.with(activeContext, () =>
|
||||
originalPublishJSON(request),
|
||||
);
|
||||
|
||||
// Annotate with response data
|
||||
annotatePublishResponse(span, response);
|
||||
|
||||
// Mark as successful
|
||||
finalizeSpan(span);
|
||||
|
||||
return response;
|
||||
} catch (error) {
|
||||
// Mark as failed
|
||||
finalizeSpan(span, error);
|
||||
throw error;
|
||||
}
|
||||
};
|
||||
|
||||
// Replace the method with our instrumented version
|
||||
client.publishJSON = instrumentedPublishJSON;
|
||||
|
||||
// Mark as instrumented
|
||||
(client as InstrumentedClient)[INSTRUMENTED_FLAG] = true;
|
||||
|
||||
return client;
|
||||
}
|
||||
|
||||
// Type for Next.js route handlers
|
||||
type RouteHandler = (request: Request) => Promise<Response> | Response;
|
||||
|
||||
function extractQStashHeaders(request: Request): Record<string, string | number> {
|
||||
const attributes: Record<string, string | number> = {};
|
||||
|
||||
// Extract QStash message ID
|
||||
const messageId = request.headers.get("upstash-message-id");
|
||||
if (messageId) {
|
||||
attributes[SEMATTRS_QSTASH_MESSAGE_ID] = messageId;
|
||||
}
|
||||
|
||||
// Extract retry count
|
||||
const retried = request.headers.get("upstash-retried");
|
||||
if (retried) {
|
||||
const retriedNum = parseInt(retried, 10);
|
||||
if (!isNaN(retriedNum)) {
|
||||
attributes[SEMATTRS_QSTASH_RETRIED] = retriedNum;
|
||||
}
|
||||
}
|
||||
|
||||
// Extract schedule ID if present
|
||||
const scheduleId = request.headers.get("upstash-schedule-id");
|
||||
if (scheduleId) {
|
||||
attributes[SEMATTRS_QSTASH_SCHEDULE_ID] = scheduleId;
|
||||
}
|
||||
|
||||
// Extract caller IP
|
||||
const callerIp = request.headers.get("upstash-caller-ip");
|
||||
if (callerIp) {
|
||||
attributes[SEMATTRS_QSTASH_CALLER_IP] = callerIp;
|
||||
}
|
||||
|
||||
return attributes;
|
||||
}
|
||||
|
||||
export function instrumentConsumer(handler: RouteHandler, config?: InstrumentationConfig): RouteHandler {
|
||||
const tracer = trace.getTracer(DEFAULT_TRACER_NAME);
|
||||
|
||||
return async function instrumentedConsumer(request: Request): Promise<Response> {
|
||||
const span = tracer.startSpan("qstash.messages.receive", {
|
||||
kind: SpanKind.SERVER,
|
||||
});
|
||||
|
||||
// Set base attributes
|
||||
span.setAttributes({
|
||||
[SEMATTRS_MESSAGING_SYSTEM]: "qstash",
|
||||
[SEMATTRS_MESSAGING_OPERATION]: "receive",
|
||||
[SEMATTRS_QSTASH_RESOURCE]: "messages",
|
||||
[SEMATTRS_QSTASH_TARGET]: "messages.receive",
|
||||
});
|
||||
|
||||
// Extract and set QStash headers
|
||||
const qstashHeaders = extractQStashHeaders(request);
|
||||
span.setAttributes(qstashHeaders);
|
||||
|
||||
// Capture request body if enabled
|
||||
if (config?.captureBody) {
|
||||
try {
|
||||
const clonedRequest = request.clone();
|
||||
const body = await clonedRequest.text();
|
||||
if (body) {
|
||||
const maxLength = config.maxBodyLength ?? 1024;
|
||||
const bodyString = serializeBody(body, maxLength);
|
||||
span.setAttribute(SEMATTRS_QSTASH_REQUEST_BODY, bodyString);
|
||||
}
|
||||
} catch (error) {
|
||||
// Ignore errors when capturing request body
|
||||
}
|
||||
}
|
||||
|
||||
// Set the span as active
|
||||
const activeContext = trace.setSpan(context.active(), span);
|
||||
|
||||
try {
|
||||
// Call the handler within the active context
|
||||
const response = await context.with(activeContext, () => handler(request));
|
||||
|
||||
// Capture response status
|
||||
span.setAttribute(SEMATTRS_HTTP_STATUS_CODE, response.status);
|
||||
|
||||
// Capture response body if enabled
|
||||
if (config?.captureBody) {
|
||||
try {
|
||||
const clonedResponse = response.clone();
|
||||
const responseBody = await clonedResponse.text();
|
||||
if (responseBody) {
|
||||
const maxLength = config.maxBodyLength ?? 1024;
|
||||
const bodyString = serializeBody(responseBody, maxLength);
|
||||
span.setAttribute(SEMATTRS_QSTASH_RESPONSE_BODY, bodyString);
|
||||
}
|
||||
} catch (error) {
|
||||
// Ignore errors when capturing response body
|
||||
}
|
||||
}
|
||||
|
||||
// Mark as successful if status is 2xx
|
||||
if (response.status >= 200 && response.status < 300) {
|
||||
finalizeSpan(span);
|
||||
} else {
|
||||
finalizeSpan(span, new Error(`Handler returned status ${response.status}`));
|
||||
}
|
||||
|
||||
return response;
|
||||
} catch (error) {
|
||||
// Mark as failed
|
||||
finalizeSpan(span, error);
|
||||
throw error;
|
||||
}
|
||||
};
|
||||
}
|
||||
21
packages/otel-upstash-queues/tsconfig.json
Normal file
21
packages/otel-upstash-queues/tsconfig.json
Normal file
@@ -0,0 +1,21 @@
|
||||
{
|
||||
"compilerOptions": {
|
||||
"target": "ES2020",
|
||||
"module": "ESNext",
|
||||
"moduleResolution": "bundler",
|
||||
"lib": ["ES2020", "DOM"],
|
||||
"outDir": "dist",
|
||||
"declaration": true,
|
||||
"declarationMap": true,
|
||||
"sourceMap": true,
|
||||
"strict": false,
|
||||
"esModuleInterop": true,
|
||||
"skipLibCheck": true,
|
||||
"forceConsistentCasingInFileNames": true,
|
||||
"resolveJsonModule": true,
|
||||
"declarationDir": "dist/types",
|
||||
"stripInternal": true
|
||||
},
|
||||
"include": ["src/**/*"],
|
||||
"exclude": ["node_modules", "dist", "**/*.test.ts"]
|
||||
}
|
||||
49
pnpm-lock.yaml
generated
49
pnpm-lock.yaml
generated
@@ -102,6 +102,30 @@ importers:
|
||||
specifier: 0.33.0
|
||||
version: 0.33.0(less@4.2.0)(sass@1.69.7)(stylus@0.59.0)
|
||||
|
||||
packages/otel-upstash-queues:
|
||||
devDependencies:
|
||||
'@opentelemetry/api':
|
||||
specifier: ^1.9.0
|
||||
version: 1.9.0
|
||||
'@opentelemetry/sdk-trace-base':
|
||||
specifier: ^2.1.0
|
||||
version: 2.1.0(@opentelemetry/api@1.9.0)
|
||||
'@types/node':
|
||||
specifier: 18.15.11
|
||||
version: 18.15.11
|
||||
'@upstash/qstash':
|
||||
specifier: ^2.0.0
|
||||
version: 2.8.3
|
||||
rimraf:
|
||||
specifier: 3.0.2
|
||||
version: 3.0.2
|
||||
typescript:
|
||||
specifier: ^5
|
||||
version: 5.3.3
|
||||
vitest:
|
||||
specifier: 0.33.0
|
||||
version: 0.33.0(less@4.2.0)(sass@1.69.7)(stylus@0.59.0)
|
||||
|
||||
packages:
|
||||
|
||||
'@adobe/css-tools@4.3.2':
|
||||
@@ -536,6 +560,9 @@ packages:
|
||||
'@types/semver@7.5.6':
|
||||
resolution: {integrity: sha512-dn1l8LaMea/IjDoHNd9J52uBbInB796CDffS6VdIxvqYCPSG0V0DzHp76GpaWnlhg88uYyPbXCDIowa86ybd5A==}
|
||||
|
||||
'@upstash/qstash@2.8.3':
|
||||
resolution: {integrity: sha512-SHf1mCGqZur0UTzXVx33phtFXIuLyjwDL1QsBE36gQFEx3rEG4fJc3qA2eD7jTUXEAYYrNkCQxMOtcteHFpwqw==}
|
||||
|
||||
'@vitest/expect@0.33.0':
|
||||
resolution: {integrity: sha512-sVNf+Gla3mhTCxNJx+wJLDPp/WcstOe0Ksqz4Vec51MmgMth/ia0MGFEkIZmVGeTL5HtjYR4Wl/ZxBxBXZJTzQ==}
|
||||
|
||||
@@ -770,6 +797,9 @@ packages:
|
||||
resolution: {integrity: sha512-uV2QOWP2nWzsy2aMp8aRibhi9dlzF5Hgh5SHaB9OiTGEyDTiJJyx0uy51QXdyWbtAHNua4XJzUKca3OzKUd3vA==}
|
||||
engines: {node: '>= 8'}
|
||||
|
||||
crypto-js@4.2.0:
|
||||
resolution: {integrity: sha512-KALDyEYgpY+Rlob/iriUtjV6d5Eq+Y191A5g4UqLAi8CyGP9N1+FdVbkc1SxKc2r4YAYqG8JzO2KGL+AizD70Q==}
|
||||
|
||||
csstype@3.1.3:
|
||||
resolution: {integrity: sha512-M1uQkMl8rQK/szD0LNhtqxIPLpimGm8sOBwU7lLnCpSbTyY3yeU1Vc7l4KT5zT4s/yOxHH5O7tIuuLOCnLADRw==}
|
||||
|
||||
@@ -1302,6 +1332,9 @@ packages:
|
||||
jackspeak@3.4.3:
|
||||
resolution: {integrity: sha512-OGlZQpz2yfahA/Rd1Y8Cd9SIEsqvXkLVoSw/cgwhnhFMDbsQFeZYoJJ7bIZBS9BcamUW96asq/npPWugM+RQBw==}
|
||||
|
||||
jose@5.10.0:
|
||||
resolution: {integrity: sha512-s+3Al/p9g32Iq+oqXxkW//7jk2Vig6FF1CFqzVXoTUXt2qz89YWbL+OwS17NFYEvxC35n0FKeGO2LGYSxeM2Gg==}
|
||||
|
||||
jose@6.1.0:
|
||||
resolution: {integrity: sha512-TTQJyoEoKcC1lscpVDCSsVgYzUDg/0Bt3WE//WiTPK6uOCQC2KZS4MpugbMWt/zyjkopgZoXhZuCi00gLudfUA==}
|
||||
|
||||
@@ -1469,6 +1502,10 @@ packages:
|
||||
engines: {node: '>= 4.4.x'}
|
||||
hasBin: true
|
||||
|
||||
neverthrow@7.2.0:
|
||||
resolution: {integrity: sha512-iGBUfFB7yPczHHtA8dksKTJ9E8TESNTAx1UQWW6TzMF280vo9jdPYpLUXrMN1BCkPdHFdNG3fxOt2CUad8KhAw==}
|
||||
engines: {node: '>=18'}
|
||||
|
||||
node-fetch@2.7.0:
|
||||
resolution: {integrity: sha512-c4FRfUm/dbcWZ7U+1Wq0AwCyFL+3nt2bEw05wfxSz+DWpWsitgmSgYmy2dQdWyKC1694ELPqMs/YzUSNozLt8A==}
|
||||
engines: {node: 4.x || >=6.0.0}
|
||||
@@ -2781,6 +2818,12 @@ snapshots:
|
||||
|
||||
'@types/semver@7.5.6': {}
|
||||
|
||||
'@upstash/qstash@2.8.3':
|
||||
dependencies:
|
||||
crypto-js: 4.2.0
|
||||
jose: 5.10.0
|
||||
neverthrow: 7.2.0
|
||||
|
||||
'@vitest/expect@0.33.0':
|
||||
dependencies:
|
||||
'@vitest/spy': 0.33.0
|
||||
@@ -3039,6 +3082,8 @@ snapshots:
|
||||
shebang-command: 2.0.0
|
||||
which: 2.0.2
|
||||
|
||||
crypto-js@4.2.0: {}
|
||||
|
||||
csstype@3.1.3:
|
||||
optional: true
|
||||
|
||||
@@ -3555,6 +3600,8 @@ snapshots:
|
||||
optionalDependencies:
|
||||
'@pkgjs/parseargs': 0.11.0
|
||||
|
||||
jose@5.10.0: {}
|
||||
|
||||
jose@6.1.0: {}
|
||||
|
||||
js-beautify@1.15.4:
|
||||
@@ -3726,6 +3773,8 @@ snapshots:
|
||||
sax: 1.3.0
|
||||
optional: true
|
||||
|
||||
neverthrow@7.2.0: {}
|
||||
|
||||
node-fetch@2.7.0:
|
||||
dependencies:
|
||||
whatwg-url: 5.0.0
|
||||
|
||||
Reference in New Issue
Block a user