update docs, add image

This commit is contained in:
Alex Holovach
2025-10-07 10:18:51 -05:00
parent 51d04091cf
commit f755db971b
6 changed files with 353 additions and 9 deletions

View File

@@ -14,11 +14,23 @@
- Retries configuration
- Callback URLs
- Custom HTTP methods
- Optional configuration support:
- `captureBody`: Enable/disable request body capture (default: false)
- `maxBodyLength`: Limit captured body size (default: 1024)
- **Consumer Instrumentation** (`instrumentConsumer`):
- Instrumentation for message handler functions
- Extracts QStash headers (message ID, retry count, schedule ID, caller IP)
- Tracks HTTP response status codes
- Works seamlessly with `verifySignatureAppRouter` from `@upstash/qstash/nextjs`
- Comprehensive test coverage (19 tests)
- Optional configuration support:
- `captureBody`: Enable/disable request and response body capture (default: false)
- `maxBodyLength`: Limit captured body size (default: 1024)
- **Body Capture Features**:
- Safe serialization with error handling
- Automatic truncation of large bodies
- Disabled by default for security
- Captured as `qstash.request.body` and `qstash.response.body` attributes
- Comprehensive test coverage (27 tests)
- Full TypeScript support with proper types from @upstash/qstash SDK
- No `any` or `unknown` types - fully type-safe
- No `any` or `unknown` types - fully type-safe
- Exported `InstrumentationConfig` type for TypeScript users

View File

@@ -4,6 +4,10 @@ OpenTelemetry instrumentation for the [Upstash QStash](https://upstash.com/docs/
Capture spans for every QStash API call, enrich them with operation metadata,
and keep an eye on message queue operations from your traces.
![Upstash QStash Trace Visualization](https://github.com/kubiks-inc/otel/blob/main/images/otel-upstash-queue-trace.png)
_Visualize your message queue operations with detailed span information including message publishing, callbacks, and delivery tracking._
## Installation
```bash
@@ -33,6 +37,20 @@ await client.publishJSON({
`instrumentUpstash` wraps the QStash client instance you already use — no configuration changes
needed. Every SDK call creates a client span with useful attributes.
### With Body Capture
Optionally capture request/response bodies for debugging:
```ts
const client = instrumentUpstash(
new Client({ token: process.env.QSTASH_TOKEN! }),
{
captureBody: true, // Enable body capture (default: false)
maxBodyLength: 2048, // Max characters to capture (default: 1024)
}
);
```
## What Gets Traced
This instrumentation provides two main functions:
@@ -82,7 +100,16 @@ The `instrumentConsumer` function wraps your message handler, creating a span wi
| `qstash.caller_ip` | IP address of the caller | `192.168.1.1` |
| `http.status_code` | HTTP response status code | `200` |
The instrumentation captures message metadata and configuration to help with debugging and monitoring, while avoiding sensitive message content.
### Body/Payload Attributes (Optional)
When `captureBody` is enabled in configuration:
| Attribute | Description | Captured By |
| ------------------------------ | ------------------------------------------- | -------------------------------------------- |
| `qstash.request.body` | Request/message body content | Both publisher and consumer |
| `qstash.response.body` | Response body content | Consumer only |
The instrumentation captures message metadata and configuration to help with debugging and monitoring. Body capture is **disabled by default** to protect sensitive data.
## Usage Examples
@@ -181,11 +208,23 @@ async function handler(request: Request) {
export const POST = verifySignatureAppRouter(instrumentConsumer(handler));
```
**With body capture:**
```ts
export const POST = verifySignatureAppRouter(
instrumentConsumer(handler, {
captureBody: true,
maxBodyLength: 2048,
})
);
```
The `instrumentConsumer` function:
- Extracts QStash headers (message ID, retry count, schedule ID, caller IP)
- Creates a SERVER span for the message processing
- Tracks response status codes
- Captures errors during processing
- Optionally captures request and response bodies (when configured)
### Complete Next.js Integration Example
@@ -280,6 +319,20 @@ All of this happens automatically once you wrap your client and handlers with th
4. **Monitor Retry Patterns**: Use the `qstash.retried` attribute to track retry patterns and identify problematic messages.
### Configuration Best Practices
1. **Body Capture in Development**: Enable `captureBody` in development environments for easier debugging:
```ts
const config = {
captureBody: process.env.NODE_ENV === "development",
maxBodyLength: 2048,
};
```
2. **Protect Sensitive Data**: Be cautious about enabling body capture in production if your messages contain sensitive information (PII, credentials, etc.).
3. **Set Appropriate Limits**: Use `maxBodyLength` to prevent excessively large spans. Bodies exceeding this limit will be truncated with `"... (truncated)"` appended.
### General Best Practices
1. **Monitor Traces**: Use OpenTelemetry-compatible tracing backends (like Jaeger, Zipkin, or cloud providers) to visualize your message queues.

View File

@@ -1,6 +1,6 @@
{
"name": "@kubiks/otel-upstash-queues",
"version": "1.0.0",
"version": "1.0.1",
"private": false,
"publishConfig": {
"access": "public"
@@ -50,4 +50,4 @@
"@opentelemetry/api": ">=1.9.0 <2.0.0",
"@upstash/qstash": ">=2.0.0"
}
}
}

View File

@@ -26,6 +26,8 @@ import {
SEMATTRS_QSTASH_SCHEDULE_ID,
SEMATTRS_QSTASH_CALLER_IP,
SEMATTRS_HTTP_STATUS_CODE,
SEMATTRS_QSTASH_REQUEST_BODY,
SEMATTRS_QSTASH_RESPONSE_BODY,
} from "./index";
describe("instrumentUpstash", () => {
@@ -354,6 +356,100 @@ describe("instrumentUpstash", () => {
expect(span.attributes[SEMATTRS_QSTASH_MESSAGE_ID]).toBe("msg_123");
expect(span.status.code).toBe(SpanStatusCode.OK);
});
it("captures request body when captureBody is enabled", async () => {
const client = createMockClient();
instrumentUpstash(client, { captureBody: true });
const request = {
url: "https://example.com/api/process",
body: { userId: "123", action: "process" },
};
await client.publishJSON(request);
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
if (!span) {
throw new Error("Expected a span to be recorded");
}
expect(span.attributes[SEMATTRS_QSTASH_REQUEST_BODY]).toBe(
JSON.stringify({ userId: "123", action: "process" })
);
});
it("does not capture request body when captureBody is disabled", async () => {
const client = createMockClient();
instrumentUpstash(client);
const request = {
url: "https://example.com/api/process",
body: { userId: "123", action: "process" },
};
await client.publishJSON(request);
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
if (!span) {
throw new Error("Expected a span to be recorded");
}
expect(span.attributes[SEMATTRS_QSTASH_REQUEST_BODY]).toBeUndefined();
});
it("truncates long request body based on maxBodyLength", async () => {
const client = createMockClient();
instrumentUpstash(client, { captureBody: true, maxBodyLength: 50 });
const longBody = { data: "x".repeat(100) };
const request = {
url: "https://example.com/api/process",
body: longBody,
};
await client.publishJSON(request);
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
if (!span) {
throw new Error("Expected a span to be recorded");
}
const capturedBody = span.attributes[SEMATTRS_QSTASH_REQUEST_BODY] as string;
expect(capturedBody).toBeDefined();
expect(capturedBody.length).toBe(50 + "... (truncated)".length);
expect(capturedBody).toContain("... (truncated)");
});
it("handles string body in request", async () => {
const client = createMockClient();
instrumentUpstash(client, { captureBody: true });
const request = {
url: "https://example.com/api/process",
body: "plain text body",
};
await client.publishJSON(request);
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
if (!span) {
throw new Error("Expected a span to be recorded");
}
expect(span.attributes[SEMATTRS_QSTASH_REQUEST_BODY]).toBe("plain text body");
});
});
describe("instrumentConsumer", () => {
@@ -572,4 +668,120 @@ describe("instrumentConsumer", () => {
expect(span.name).toBe("qstash.messages.receive");
expect(span.attributes[SEMATTRS_QSTASH_MESSAGE_ID]).toBe("msg_wrapped");
});
it("captures request body when captureBody is enabled", async () => {
const handler = vi.fn(async (req: Request) => {
return Response.json({ success: true });
});
const instrumentedHandler = instrumentConsumer(handler, { captureBody: true });
const mockHeaders = new Headers({
"content-type": "application/json",
"upstash-message-id": "msg_body_test",
});
const requestBody = JSON.stringify({ imageId: "123", action: "process" });
const request = {
headers: mockHeaders,
clone: () => ({
text: vi.fn(async () => requestBody),
}),
json: vi.fn(async () => JSON.parse(requestBody)),
} as unknown as Request;
await instrumentedHandler(request);
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
if (!span) {
throw new Error("Expected a span to be recorded");
}
expect(span.attributes[SEMATTRS_QSTASH_REQUEST_BODY]).toBe(requestBody);
});
it("captures response body when captureBody is enabled", async () => {
const responseBody = JSON.stringify({ success: true, messageId: "msg_456" });
const handler = vi.fn(async () => {
const response = new Response(responseBody, {
status: 200,
headers: { "Content-Type": "application/json" },
});
return response;
});
const instrumentedHandler = instrumentConsumer(handler, { captureBody: true });
const request = createMockRequest({
"upstash-message-id": "msg_response_test",
});
await instrumentedHandler(request);
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
if (!span) {
throw new Error("Expected a span to be recorded");
}
expect(span.attributes[SEMATTRS_QSTASH_RESPONSE_BODY]).toBe(responseBody);
});
it("does not capture bodies when captureBody is disabled", async () => {
const handler = vi.fn(async () => Response.json({ success: true }));
const instrumentedHandler = instrumentConsumer(handler);
const request = createMockRequest({
"upstash-message-id": "msg_no_capture",
});
await instrumentedHandler(request);
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
if (!span) {
throw new Error("Expected a span to be recorded");
}
expect(span.attributes[SEMATTRS_QSTASH_REQUEST_BODY]).toBeUndefined();
expect(span.attributes[SEMATTRS_QSTASH_RESPONSE_BODY]).toBeUndefined();
});
it("truncates long response body based on maxBodyLength", async () => {
const longResponse = JSON.stringify({ data: "y".repeat(200) });
const handler = vi.fn(async () => {
return new Response(longResponse, { status: 200 });
});
const instrumentedHandler = instrumentConsumer(handler, {
captureBody: true,
maxBodyLength: 50,
});
const request = createMockRequest({
"upstash-message-id": "msg_truncate",
});
await instrumentedHandler(request);
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
if (!span) {
throw new Error("Expected a span to be recorded");
}
const capturedBody = span.attributes[SEMATTRS_QSTASH_RESPONSE_BODY] as string;
expect(capturedBody).toBeDefined();
expect(capturedBody.length).toBe(50 + "... (truncated)".length);
expect(capturedBody).toContain("... (truncated)");
});
});

View File

@@ -31,11 +31,41 @@ export const SEMATTRS_QSTASH_SCHEDULE_ID = "qstash.schedule_id" as const;
export const SEMATTRS_QSTASH_CALLER_IP = "qstash.caller_ip" as const;
export const SEMATTRS_HTTP_STATUS_CODE = "http.status_code" as const;
// Body/payload attributes
export const SEMATTRS_QSTASH_REQUEST_BODY = "qstash.request.body" as const;
export const SEMATTRS_QSTASH_RESPONSE_BODY = "qstash.response.body" as const;
export interface InstrumentationConfig {
/**
* Whether to capture request/response bodies in spans.
* @default false
*/
captureBody?: boolean;
/**
* Maximum length of the body to capture. Bodies longer than this will be truncated.
* @default 1024
*/
maxBodyLength?: number;
}
interface InstrumentedClient extends Client {
[INSTRUMENTED_FLAG]?: true;
}
function annotatePublishSpan(span: Span, request: PublishRequest<string>): void {
function serializeBody(body: unknown, maxLength: number): string {
try {
const serialized = typeof body === "string" ? body : JSON.stringify(body);
if (serialized.length > maxLength) {
return serialized.substring(0, maxLength) + "... (truncated)";
}
return serialized;
} catch (error) {
return "[Unable to serialize body]";
}
}
function annotatePublishSpan(span: Span, request: PublishRequest<string>, config?: InstrumentationConfig): void {
// Set base attributes
span.setAttributes({
[SEMATTRS_MESSAGING_SYSTEM]: "qstash",
@@ -86,6 +116,13 @@ function annotatePublishSpan(span: Span, request: PublishRequest<string>): void
if (request.failureCallback) {
span.setAttribute(SEMATTRS_QSTASH_FAILURE_CALLBACK_URL, request.failureCallback);
}
// Capture request body if enabled
if (config?.captureBody && request.body !== undefined) {
const maxLength = config.maxBodyLength ?? 1024;
const bodyString = serializeBody(request.body, maxLength);
span.setAttribute(SEMATTRS_QSTASH_REQUEST_BODY, bodyString);
}
}
function annotatePublishResponse(
@@ -111,7 +148,7 @@ function finalizeSpan(span: Span, error?: unknown): void {
span.end();
}
export function instrumentUpstash(client: Client): Client {
export function instrumentUpstash(client: Client, config?: InstrumentationConfig): Client {
// Check if already instrumented
if ((client as InstrumentedClient)[INSTRUMENTED_FLAG]) {
return client;
@@ -130,7 +167,7 @@ export function instrumentUpstash(client: Client): Client {
});
// Annotate span with request details
annotatePublishSpan(span, request as PublishRequest<string>);
annotatePublishSpan(span, request as PublishRequest<string>, config);
// Set the span as active
const activeContext = trace.setSpan(context.active(), span);
@@ -200,7 +237,7 @@ function extractQStashHeaders(request: Request): Record<string, string | number>
return attributes;
}
export function instrumentConsumer(handler: RouteHandler): RouteHandler {
export function instrumentConsumer(handler: RouteHandler, config?: InstrumentationConfig): RouteHandler {
const tracer = trace.getTracer(DEFAULT_TRACER_NAME);
return async function instrumentedConsumer(request: Request): Promise<Response> {
@@ -220,6 +257,21 @@ export function instrumentConsumer(handler: RouteHandler): RouteHandler {
const qstashHeaders = extractQStashHeaders(request);
span.setAttributes(qstashHeaders);
// Capture request body if enabled
if (config?.captureBody) {
try {
const clonedRequest = request.clone();
const body = await clonedRequest.text();
if (body) {
const maxLength = config.maxBodyLength ?? 1024;
const bodyString = serializeBody(body, maxLength);
span.setAttribute(SEMATTRS_QSTASH_REQUEST_BODY, bodyString);
}
} catch (error) {
// Ignore errors when capturing request body
}
}
// Set the span as active
const activeContext = trace.setSpan(context.active(), span);
@@ -230,6 +282,21 @@ export function instrumentConsumer(handler: RouteHandler): RouteHandler {
// Capture response status
span.setAttribute(SEMATTRS_HTTP_STATUS_CODE, response.status);
// Capture response body if enabled
if (config?.captureBody) {
try {
const clonedResponse = response.clone();
const responseBody = await clonedResponse.text();
if (responseBody) {
const maxLength = config.maxBodyLength ?? 1024;
const bodyString = serializeBody(responseBody, maxLength);
span.setAttribute(SEMATTRS_QSTASH_RESPONSE_BODY, bodyString);
}
} catch (error) {
// Ignore errors when capturing response body
}
}
// Mark as successful if status is 2xx
if (response.status >= 200 && response.status < 300) {
finalizeSpan(span);