diff --git a/images/otel-upstash-queue-trace.png b/images/otel-upstash-queue-trace.png new file mode 100644 index 0000000..71f2eb1 Binary files /dev/null and b/images/otel-upstash-queue-trace.png differ diff --git a/packages/otel-upstash-queues/CHANGELOG.md b/packages/otel-upstash-queues/CHANGELOG.md index 366bd2b..4035284 100644 --- a/packages/otel-upstash-queues/CHANGELOG.md +++ b/packages/otel-upstash-queues/CHANGELOG.md @@ -14,11 +14,23 @@ - Retries configuration - Callback URLs - Custom HTTP methods + - Optional configuration support: + - `captureBody`: Enable/disable request body capture (default: false) + - `maxBodyLength`: Limit captured body size (default: 1024) - **Consumer Instrumentation** (`instrumentConsumer`): - Instrumentation for message handler functions - Extracts QStash headers (message ID, retry count, schedule ID, caller IP) - Tracks HTTP response status codes - Works seamlessly with `verifySignatureAppRouter` from `@upstash/qstash/nextjs` -- Comprehensive test coverage (19 tests) + - Optional configuration support: + - `captureBody`: Enable/disable request and response body capture (default: false) + - `maxBodyLength`: Limit captured body size (default: 1024) +- **Body Capture Features**: + - Safe serialization with error handling + - Automatic truncation of large bodies + - Disabled by default for security + - Captured as `qstash.request.body` and `qstash.response.body` attributes +- Comprehensive test coverage (27 tests) - Full TypeScript support with proper types from @upstash/qstash SDK -- No `any` or `unknown` types - fully type-safe \ No newline at end of file +- No `any` or `unknown` types - fully type-safe +- Exported `InstrumentationConfig` type for TypeScript users \ No newline at end of file diff --git a/packages/otel-upstash-queues/README.md b/packages/otel-upstash-queues/README.md index 8ba82bd..7ef213f 100644 --- a/packages/otel-upstash-queues/README.md +++ b/packages/otel-upstash-queues/README.md @@ -4,6 +4,10 @@ OpenTelemetry instrumentation for the [Upstash QStash](https://upstash.com/docs/ Capture spans for every QStash API call, enrich them with operation metadata, and keep an eye on message queue operations from your traces. +![Upstash QStash Trace Visualization](https://github.com/kubiks-inc/otel/blob/main/images/otel-upstash-queue-trace.png) + +_Visualize your message queue operations with detailed span information including message publishing, callbacks, and delivery tracking._ + ## Installation ```bash @@ -33,6 +37,20 @@ await client.publishJSON({ `instrumentUpstash` wraps the QStash client instance you already use — no configuration changes needed. Every SDK call creates a client span with useful attributes. +### With Body Capture + +Optionally capture request/response bodies for debugging: + +```ts +const client = instrumentUpstash( + new Client({ token: process.env.QSTASH_TOKEN! }), + { + captureBody: true, // Enable body capture (default: false) + maxBodyLength: 2048, // Max characters to capture (default: 1024) + } +); +``` + ## What Gets Traced This instrumentation provides two main functions: @@ -82,7 +100,16 @@ The `instrumentConsumer` function wraps your message handler, creating a span wi | `qstash.caller_ip` | IP address of the caller | `192.168.1.1` | | `http.status_code` | HTTP response status code | `200` | -The instrumentation captures message metadata and configuration to help with debugging and monitoring, while avoiding sensitive message content. +### Body/Payload Attributes (Optional) + +When `captureBody` is enabled in configuration: + +| Attribute | Description | Captured By | +| ------------------------------ | ------------------------------------------- | -------------------------------------------- | +| `qstash.request.body` | Request/message body content | Both publisher and consumer | +| `qstash.response.body` | Response body content | Consumer only | + +The instrumentation captures message metadata and configuration to help with debugging and monitoring. Body capture is **disabled by default** to protect sensitive data. ## Usage Examples @@ -181,11 +208,23 @@ async function handler(request: Request) { export const POST = verifySignatureAppRouter(instrumentConsumer(handler)); ``` +**With body capture:** + +```ts +export const POST = verifySignatureAppRouter( + instrumentConsumer(handler, { + captureBody: true, + maxBodyLength: 2048, + }) +); +``` + The `instrumentConsumer` function: - Extracts QStash headers (message ID, retry count, schedule ID, caller IP) - Creates a SERVER span for the message processing - Tracks response status codes - Captures errors during processing +- Optionally captures request and response bodies (when configured) ### Complete Next.js Integration Example @@ -280,6 +319,20 @@ All of this happens automatically once you wrap your client and handlers with th 4. **Monitor Retry Patterns**: Use the `qstash.retried` attribute to track retry patterns and identify problematic messages. +### Configuration Best Practices + +1. **Body Capture in Development**: Enable `captureBody` in development environments for easier debugging: + ```ts + const config = { + captureBody: process.env.NODE_ENV === "development", + maxBodyLength: 2048, + }; + ``` + +2. **Protect Sensitive Data**: Be cautious about enabling body capture in production if your messages contain sensitive information (PII, credentials, etc.). + +3. **Set Appropriate Limits**: Use `maxBodyLength` to prevent excessively large spans. Bodies exceeding this limit will be truncated with `"... (truncated)"` appended. + ### General Best Practices 1. **Monitor Traces**: Use OpenTelemetry-compatible tracing backends (like Jaeger, Zipkin, or cloud providers) to visualize your message queues. diff --git a/packages/otel-upstash-queues/package.json b/packages/otel-upstash-queues/package.json index 84dbd55..5a1e872 100644 --- a/packages/otel-upstash-queues/package.json +++ b/packages/otel-upstash-queues/package.json @@ -1,6 +1,6 @@ { "name": "@kubiks/otel-upstash-queues", - "version": "1.0.0", + "version": "1.0.1", "private": false, "publishConfig": { "access": "public" @@ -50,4 +50,4 @@ "@opentelemetry/api": ">=1.9.0 <2.0.0", "@upstash/qstash": ">=2.0.0" } -} \ No newline at end of file +} diff --git a/packages/otel-upstash-queues/src/index.test.ts b/packages/otel-upstash-queues/src/index.test.ts index 1be8cdf..2e58fc9 100644 --- a/packages/otel-upstash-queues/src/index.test.ts +++ b/packages/otel-upstash-queues/src/index.test.ts @@ -26,6 +26,8 @@ import { SEMATTRS_QSTASH_SCHEDULE_ID, SEMATTRS_QSTASH_CALLER_IP, SEMATTRS_HTTP_STATUS_CODE, + SEMATTRS_QSTASH_REQUEST_BODY, + SEMATTRS_QSTASH_RESPONSE_BODY, } from "./index"; describe("instrumentUpstash", () => { @@ -354,6 +356,100 @@ describe("instrumentUpstash", () => { expect(span.attributes[SEMATTRS_QSTASH_MESSAGE_ID]).toBe("msg_123"); expect(span.status.code).toBe(SpanStatusCode.OK); }); + + it("captures request body when captureBody is enabled", async () => { + const client = createMockClient(); + instrumentUpstash(client, { captureBody: true }); + + const request = { + url: "https://example.com/api/process", + body: { userId: "123", action: "process" }, + }; + + await client.publishJSON(request); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(1); + + const span = spans[0]; + if (!span) { + throw new Error("Expected a span to be recorded"); + } + + expect(span.attributes[SEMATTRS_QSTASH_REQUEST_BODY]).toBe( + JSON.stringify({ userId: "123", action: "process" }) + ); + }); + + it("does not capture request body when captureBody is disabled", async () => { + const client = createMockClient(); + instrumentUpstash(client); + + const request = { + url: "https://example.com/api/process", + body: { userId: "123", action: "process" }, + }; + + await client.publishJSON(request); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(1); + + const span = spans[0]; + if (!span) { + throw new Error("Expected a span to be recorded"); + } + + expect(span.attributes[SEMATTRS_QSTASH_REQUEST_BODY]).toBeUndefined(); + }); + + it("truncates long request body based on maxBodyLength", async () => { + const client = createMockClient(); + instrumentUpstash(client, { captureBody: true, maxBodyLength: 50 }); + + const longBody = { data: "x".repeat(100) }; + const request = { + url: "https://example.com/api/process", + body: longBody, + }; + + await client.publishJSON(request); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(1); + + const span = spans[0]; + if (!span) { + throw new Error("Expected a span to be recorded"); + } + + const capturedBody = span.attributes[SEMATTRS_QSTASH_REQUEST_BODY] as string; + expect(capturedBody).toBeDefined(); + expect(capturedBody.length).toBe(50 + "... (truncated)".length); + expect(capturedBody).toContain("... (truncated)"); + }); + + it("handles string body in request", async () => { + const client = createMockClient(); + instrumentUpstash(client, { captureBody: true }); + + const request = { + url: "https://example.com/api/process", + body: "plain text body", + }; + + await client.publishJSON(request); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(1); + + const span = spans[0]; + if (!span) { + throw new Error("Expected a span to be recorded"); + } + + expect(span.attributes[SEMATTRS_QSTASH_REQUEST_BODY]).toBe("plain text body"); + }); }); describe("instrumentConsumer", () => { @@ -572,4 +668,120 @@ describe("instrumentConsumer", () => { expect(span.name).toBe("qstash.messages.receive"); expect(span.attributes[SEMATTRS_QSTASH_MESSAGE_ID]).toBe("msg_wrapped"); }); + + it("captures request body when captureBody is enabled", async () => { + const handler = vi.fn(async (req: Request) => { + return Response.json({ success: true }); + }); + + const instrumentedHandler = instrumentConsumer(handler, { captureBody: true }); + + const mockHeaders = new Headers({ + "content-type": "application/json", + "upstash-message-id": "msg_body_test", + }); + + const requestBody = JSON.stringify({ imageId: "123", action: "process" }); + const request = { + headers: mockHeaders, + clone: () => ({ + text: vi.fn(async () => requestBody), + }), + json: vi.fn(async () => JSON.parse(requestBody)), + } as unknown as Request; + + await instrumentedHandler(request); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(1); + + const span = spans[0]; + if (!span) { + throw new Error("Expected a span to be recorded"); + } + + expect(span.attributes[SEMATTRS_QSTASH_REQUEST_BODY]).toBe(requestBody); + }); + + it("captures response body when captureBody is enabled", async () => { + const responseBody = JSON.stringify({ success: true, messageId: "msg_456" }); + const handler = vi.fn(async () => { + const response = new Response(responseBody, { + status: 200, + headers: { "Content-Type": "application/json" }, + }); + return response; + }); + + const instrumentedHandler = instrumentConsumer(handler, { captureBody: true }); + + const request = createMockRequest({ + "upstash-message-id": "msg_response_test", + }); + + await instrumentedHandler(request); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(1); + + const span = spans[0]; + if (!span) { + throw new Error("Expected a span to be recorded"); + } + + expect(span.attributes[SEMATTRS_QSTASH_RESPONSE_BODY]).toBe(responseBody); + }); + + it("does not capture bodies when captureBody is disabled", async () => { + const handler = vi.fn(async () => Response.json({ success: true })); + const instrumentedHandler = instrumentConsumer(handler); + + const request = createMockRequest({ + "upstash-message-id": "msg_no_capture", + }); + + await instrumentedHandler(request); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(1); + + const span = spans[0]; + if (!span) { + throw new Error("Expected a span to be recorded"); + } + + expect(span.attributes[SEMATTRS_QSTASH_REQUEST_BODY]).toBeUndefined(); + expect(span.attributes[SEMATTRS_QSTASH_RESPONSE_BODY]).toBeUndefined(); + }); + + it("truncates long response body based on maxBodyLength", async () => { + const longResponse = JSON.stringify({ data: "y".repeat(200) }); + const handler = vi.fn(async () => { + return new Response(longResponse, { status: 200 }); + }); + + const instrumentedHandler = instrumentConsumer(handler, { + captureBody: true, + maxBodyLength: 50, + }); + + const request = createMockRequest({ + "upstash-message-id": "msg_truncate", + }); + + await instrumentedHandler(request); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(1); + + const span = spans[0]; + if (!span) { + throw new Error("Expected a span to be recorded"); + } + + const capturedBody = span.attributes[SEMATTRS_QSTASH_RESPONSE_BODY] as string; + expect(capturedBody).toBeDefined(); + expect(capturedBody.length).toBe(50 + "... (truncated)".length); + expect(capturedBody).toContain("... (truncated)"); + }); }); \ No newline at end of file diff --git a/packages/otel-upstash-queues/src/index.ts b/packages/otel-upstash-queues/src/index.ts index d53649b..abd1b67 100644 --- a/packages/otel-upstash-queues/src/index.ts +++ b/packages/otel-upstash-queues/src/index.ts @@ -31,11 +31,41 @@ export const SEMATTRS_QSTASH_SCHEDULE_ID = "qstash.schedule_id" as const; export const SEMATTRS_QSTASH_CALLER_IP = "qstash.caller_ip" as const; export const SEMATTRS_HTTP_STATUS_CODE = "http.status_code" as const; +// Body/payload attributes +export const SEMATTRS_QSTASH_REQUEST_BODY = "qstash.request.body" as const; +export const SEMATTRS_QSTASH_RESPONSE_BODY = "qstash.response.body" as const; + +export interface InstrumentationConfig { + /** + * Whether to capture request/response bodies in spans. + * @default false + */ + captureBody?: boolean; + + /** + * Maximum length of the body to capture. Bodies longer than this will be truncated. + * @default 1024 + */ + maxBodyLength?: number; +} + interface InstrumentedClient extends Client { [INSTRUMENTED_FLAG]?: true; } -function annotatePublishSpan(span: Span, request: PublishRequest): void { +function serializeBody(body: unknown, maxLength: number): string { + try { + const serialized = typeof body === "string" ? body : JSON.stringify(body); + if (serialized.length > maxLength) { + return serialized.substring(0, maxLength) + "... (truncated)"; + } + return serialized; + } catch (error) { + return "[Unable to serialize body]"; + } +} + +function annotatePublishSpan(span: Span, request: PublishRequest, config?: InstrumentationConfig): void { // Set base attributes span.setAttributes({ [SEMATTRS_MESSAGING_SYSTEM]: "qstash", @@ -86,6 +116,13 @@ function annotatePublishSpan(span: Span, request: PublishRequest): void if (request.failureCallback) { span.setAttribute(SEMATTRS_QSTASH_FAILURE_CALLBACK_URL, request.failureCallback); } + + // Capture request body if enabled + if (config?.captureBody && request.body !== undefined) { + const maxLength = config.maxBodyLength ?? 1024; + const bodyString = serializeBody(request.body, maxLength); + span.setAttribute(SEMATTRS_QSTASH_REQUEST_BODY, bodyString); + } } function annotatePublishResponse( @@ -111,7 +148,7 @@ function finalizeSpan(span: Span, error?: unknown): void { span.end(); } -export function instrumentUpstash(client: Client): Client { +export function instrumentUpstash(client: Client, config?: InstrumentationConfig): Client { // Check if already instrumented if ((client as InstrumentedClient)[INSTRUMENTED_FLAG]) { return client; @@ -130,7 +167,7 @@ export function instrumentUpstash(client: Client): Client { }); // Annotate span with request details - annotatePublishSpan(span, request as PublishRequest); + annotatePublishSpan(span, request as PublishRequest, config); // Set the span as active const activeContext = trace.setSpan(context.active(), span); @@ -200,7 +237,7 @@ function extractQStashHeaders(request: Request): Record return attributes; } -export function instrumentConsumer(handler: RouteHandler): RouteHandler { +export function instrumentConsumer(handler: RouteHandler, config?: InstrumentationConfig): RouteHandler { const tracer = trace.getTracer(DEFAULT_TRACER_NAME); return async function instrumentedConsumer(request: Request): Promise { @@ -220,6 +257,21 @@ export function instrumentConsumer(handler: RouteHandler): RouteHandler { const qstashHeaders = extractQStashHeaders(request); span.setAttributes(qstashHeaders); + // Capture request body if enabled + if (config?.captureBody) { + try { + const clonedRequest = request.clone(); + const body = await clonedRequest.text(); + if (body) { + const maxLength = config.maxBodyLength ?? 1024; + const bodyString = serializeBody(body, maxLength); + span.setAttribute(SEMATTRS_QSTASH_REQUEST_BODY, bodyString); + } + } catch (error) { + // Ignore errors when capturing request body + } + } + // Set the span as active const activeContext = trace.setSpan(context.active(), span); @@ -230,6 +282,21 @@ export function instrumentConsumer(handler: RouteHandler): RouteHandler { // Capture response status span.setAttribute(SEMATTRS_HTTP_STATUS_CODE, response.status); + // Capture response body if enabled + if (config?.captureBody) { + try { + const clonedResponse = response.clone(); + const responseBody = await clonedResponse.text(); + if (responseBody) { + const maxLength = config.maxBodyLength ?? 1024; + const bodyString = serializeBody(responseBody, maxLength); + span.setAttribute(SEMATTRS_QSTASH_RESPONSE_BODY, bodyString); + } + } catch (error) { + // Ignore errors when capturing response body + } + } + // Mark as successful if status is 2xx if (response.status >= 200 && response.status < 300) { finalizeSpan(span);