diff --git a/packages/otel-upstash/CHANGELOG.md b/packages/otel-upstash/CHANGELOG.md new file mode 100644 index 0000000..c042834 --- /dev/null +++ b/packages/otel-upstash/CHANGELOG.md @@ -0,0 +1,17 @@ +# @kubiks/otel-upstash + +## 1.0.0 + +### Major Changes + +- Initial release of OpenTelemetry instrumentation for Upstash QStash +- Instrumentation for `publishJSON` method +- Support for all QStash request parameters including: + - URL targeting + - Delayed and scheduled messages + - Deduplication + - Retries configuration + - Callback URLs + - Custom HTTP methods +- Comprehensive test coverage +- Full TypeScript support with proper types from @upstash/qstash SDK \ No newline at end of file diff --git a/packages/otel-upstash/LICENSE b/packages/otel-upstash/LICENSE new file mode 100644 index 0000000..f374d9a --- /dev/null +++ b/packages/otel-upstash/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2025 Kubiks + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/packages/otel-upstash/README.md b/packages/otel-upstash/README.md new file mode 100644 index 0000000..916a27d --- /dev/null +++ b/packages/otel-upstash/README.md @@ -0,0 +1,189 @@ +# @kubiks/otel-upstash + +OpenTelemetry instrumentation for the [Upstash QStash](https://upstash.com/docs/qstash) Node.js SDK. +Capture spans for every QStash API call, enrich them with operation metadata, +and keep an eye on message queue operations from your traces. + +## Installation + +```bash +npm install @kubiks/otel-upstash +# or +pnpm add @kubiks/otel-upstash +``` + +**Peer Dependencies:** `@opentelemetry/api` >= 1.9.0, `@upstash/qstash` >= 2.0.0 + +## Quick Start + +```ts +import { Client } from "@upstash/qstash"; +import { instrumentUpstash } from "@kubiks/otel-upstash"; + +const client = instrumentUpstash( + new Client({ token: process.env.QSTASH_TOKEN! }) +); + +await client.publishJSON({ + url: "https://your-api-endpoint.com/process-image", + body: { imageId: "123" }, +}); +``` + +`instrumentUpstash` wraps the QStash client instance you already use — no configuration changes +needed. Every SDK call creates a client span with useful attributes. + +## What Gets Traced + +This instrumentation specifically wraps the `client.publishJSON` method, creating a single clean span for each message publish operation. + +## Span Attributes + +Each span includes: + +| Attribute | Description | Example | +| ------------------------------ | ------------------------------------------- | -------------------------------------------- | +| `messaging.system` | Constant value `qstash` | `qstash` | +| `messaging.operation` | Operation type | `publish` | +| `qstash.resource` | Resource name | `messages` | +| `qstash.target` | Full operation target | `messages.publish` | +| `qstash.url` | Target URL for the message | `https://example.com/api/process` | +| `qstash.method` | HTTP method (default: POST) | `POST`, `PUT`, `GET` | +| `qstash.message_id` | Message ID returned by QStash | `msg_123` | +| `qstash.delay` | Delay before processing (seconds or string) | `60` or `"1h"` | +| `qstash.not_before` | Unix timestamp for earliest processing | `1672531200` | +| `qstash.deduplication_id` | Deduplication ID for idempotent operations | `unique-id-123` | +| `qstash.retries` | Number of retry attempts | `3` | +| `qstash.callback_url` | Success callback URL | `https://example.com/callback` | +| `qstash.failure_callback_url` | Failure callback URL | `https://example.com/failure` | + +The instrumentation captures message metadata and configuration to help with debugging and monitoring, while avoiding sensitive message content. + +## Usage Examples + +### Basic Message Publishing + +```ts +import { Client } from "@upstash/qstash"; +import { instrumentUpstash } from "@kubiks/otel-upstash"; + +const client = instrumentUpstash( + new Client({ token: process.env.QSTASH_TOKEN! }) +); + +// Publish a message +await client.publishJSON({ + url: "https://your-api.com/webhook", + body: { + userId: "user_123", + action: "process_data", + }, +}); +``` + +### Delayed Message Publishing + +```ts +// Delay message processing by 60 seconds +await client.publishJSON({ + url: "https://your-api.com/delayed-task", + body: { taskId: "task_456" }, + delay: 60, +}); + +// Or use a human-readable delay +await client.publishJSON({ + url: "https://your-api.com/delayed-task", + body: { taskId: "task_789" }, + delay: "1h", // 1 hour +}); +``` + +### Message with Callbacks + +```ts +await client.publishJSON({ + url: "https://your-api.com/process", + body: { orderId: "order_123" }, + callback: "https://your-api.com/success", + failureCallback: "https://your-api.com/failure", +}); +``` + +### Message with Retries and Deduplication + +```ts +await client.publishJSON({ + url: "https://your-api.com/critical-task", + body: { taskId: "critical_123" }, + retries: 5, + deduplicationId: "task-critical-123", +}); +``` + +### Scheduled Message + +```ts +// Schedule for a specific time +const scheduledTime = Math.floor(Date.now() / 1000) + 3600; // 1 hour from now + +await client.publishJSON({ + url: "https://your-api.com/scheduled-task", + body: { reportId: "report_456" }, + notBefore: scheduledTime, +}); +``` + +### Next.js Integration Example + +```ts +// app/actions.ts +"use server"; +import { Client } from "@upstash/qstash"; +import { instrumentUpstash } from "@kubiks/otel-upstash"; + +const qstashClient = instrumentUpstash( + new Client({ + token: process.env.QSTASH_TOKEN!, + }) +); + +export async function startBackgroundJob() { + await qstashClient.publishJSON({ + url: "https://your-app.vercel.app/api/process", + body: { + userId: "user_123", + timestamp: Date.now(), + }, + }); +} +``` + +## How It Works + +The instrumentation creates OpenTelemetry spans for QStash operations by: + +1. Wrapping the `publishJSON` method of the QStash client +2. Creating a span before the operation starts +3. Adding relevant attributes from the request parameters +4. Capturing the message ID from the response +5. Recording any errors that occur +6. Properly closing the span with success or error status + +All of this happens automatically once you wrap your client with `instrumentUpstash()`. + +## Best Practices + +1. **Instrument Early**: Call `instrumentUpstash()` when you create your QStash client, typically at application startup. + +2. **Reuse the Client**: Create one instrumented client and reuse it throughout your application. + +3. **Use Deduplication IDs**: For idempotent operations, always provide a `deduplicationId` to prevent duplicate processing. + +4. **Monitor Traces**: Use OpenTelemetry-compatible tracing backends (like Jaeger, Zipkin, or cloud providers) to visualize your message queues. + +5. **Set Appropriate Retries**: Configure retry counts based on the criticality and nature of your tasks. + +## License + +MIT \ No newline at end of file diff --git a/packages/otel-upstash/package.json b/packages/otel-upstash/package.json new file mode 100644 index 0000000..e915858 --- /dev/null +++ b/packages/otel-upstash/package.json @@ -0,0 +1,53 @@ +{ + "name": "@kubiks/otel-upstash", + "version": "1.0.0", + "private": false, + "publishConfig": { + "access": "public" + }, + "description": "OpenTelemetry instrumentation for the Upstash QStash Node.js SDK", + "author": "Kubiks", + "license": "MIT", + "repository": "kubiks-inc/otel", + "sideEffects": false, + "type": "module", + "exports": { + ".": { + "types": "./dist/types/index.d.ts", + "import": "./dist/index.js", + "default": "./dist/index.js" + } + }, + "main": "./dist/index.js", + "types": "./dist/types/index.d.ts", + "files": [ + "dist", + "LICENSE", + "README.md" + ], + "engines": { + "node": "^18.19.0 || >=20.6.0" + }, + "scripts": { + "build": "pnpm clean && tsc", + "clean": "rimraf dist", + "prepublishOnly": "pnpm build", + "type-check": "tsc --noEmit", + "unit-test": "vitest --run", + "unit-test-watch": "vitest" + }, + "dependencies": {}, + "devDependencies": { + "@opentelemetry/api": "^1.9.0", + "@opentelemetry/sdk-trace-base": "^2.1.0", + "@types/node": "18.15.11", + "@upstash/qstash": "^2.0.0", + "rimraf": "3.0.2", + "typescript": "^5", + "vitest": "0.33.0" + }, + "peerDependencies": { + "@opentelemetry/api": ">=1.9.0 <2.0.0", + "@upstash/qstash": ">=2.0.0" + } +} \ No newline at end of file diff --git a/packages/otel-upstash/src/index.test.ts b/packages/otel-upstash/src/index.test.ts new file mode 100644 index 0000000..e848096 --- /dev/null +++ b/packages/otel-upstash/src/index.test.ts @@ -0,0 +1,352 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { SpanStatusCode, trace } from "@opentelemetry/api"; +import { + BasicTracerProvider, + InMemorySpanExporter, + SimpleSpanProcessor, +} from "@opentelemetry/sdk-trace-base"; +import type { Client } from "@upstash/qstash"; +import { + instrumentUpstash, + SEMATTRS_MESSAGING_OPERATION, + SEMATTRS_MESSAGING_SYSTEM, + SEMATTRS_QSTASH_CALLBACK_URL, + SEMATTRS_QSTASH_DEDUPLICATION_ID, + SEMATTRS_QSTASH_DELAY, + SEMATTRS_QSTASH_FAILURE_CALLBACK_URL, + SEMATTRS_QSTASH_MESSAGE_ID, + SEMATTRS_QSTASH_METHOD, + SEMATTRS_QSTASH_NOT_BEFORE, + SEMATTRS_QSTASH_RESOURCE, + SEMATTRS_QSTASH_RETRIES, + SEMATTRS_QSTASH_TARGET, + SEMATTRS_QSTASH_URL, +} from "./index"; + +describe("instrumentUpstash", () => { + let provider: BasicTracerProvider; + let exporter: InMemorySpanExporter; + + beforeEach(() => { + exporter = new InMemorySpanExporter(); + provider = new BasicTracerProvider({ + spanProcessors: [new SimpleSpanProcessor(exporter)], + }); + trace.setGlobalTracerProvider(provider); + }); + + afterEach(async () => { + await provider.shutdown(); + exporter.reset(); + trace.disable(); + }); + + const createMockClient = (): Client => { + const mockClient = { + publishJSON: vi.fn(async (request: any) => ({ + messageId: "msg_123", + })), + } as unknown as Client; + + return mockClient; + }; + + it("wraps publishJSON and records spans", async () => { + const client = createMockClient(); + instrumentUpstash(client); + + const request = { + url: "https://example.com/api/process", + body: { hello: "world" }, + }; + + const response = await client.publishJSON(request); + expect(response.messageId).toBe("msg_123"); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(1); + + const span = spans[0]; + if (!span) { + throw new Error("Expected a span to be recorded"); + } + + expect(span.name).toBe("qstash.messages.publish"); + expect(span.attributes[SEMATTRS_MESSAGING_SYSTEM]).toBe("qstash"); + expect(span.attributes[SEMATTRS_MESSAGING_OPERATION]).toBe("publish"); + expect(span.attributes[SEMATTRS_QSTASH_RESOURCE]).toBe("messages"); + expect(span.attributes[SEMATTRS_QSTASH_TARGET]).toBe("messages.publish"); + expect(span.attributes[SEMATTRS_QSTASH_URL]).toBe("https://example.com/api/process"); + expect(span.attributes[SEMATTRS_QSTASH_METHOD]).toBe("POST"); + expect(span.attributes[SEMATTRS_QSTASH_MESSAGE_ID]).toBe("msg_123"); + expect(span.status.code).toBe(SpanStatusCode.OK); + }); + + it("captures request with delay", async () => { + const client = createMockClient(); + instrumentUpstash(client); + + const request = { + url: "https://example.com/api/delayed", + body: { task: "process" }, + delay: 60, + }; + + await client.publishJSON(request); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(1); + + const span = spans[0]; + if (!span) { + throw new Error("Expected a span to be recorded"); + } + + expect(span.attributes[SEMATTRS_QSTASH_DELAY]).toBe(60); + expect(span.attributes[SEMATTRS_QSTASH_URL]).toBe("https://example.com/api/delayed"); + }); + + it("captures request with delay as string", async () => { + const client = createMockClient(); + instrumentUpstash(client); + + const request = { + url: "https://example.com/api/delayed", + body: { task: "process" }, + delay: "1h", + }; + + await client.publishJSON(request); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(1); + + const span = spans[0]; + if (!span) { + throw new Error("Expected a span to be recorded"); + } + + expect(span.attributes[SEMATTRS_QSTASH_DELAY]).toBe("1h"); + }); + + it("captures request with notBefore", async () => { + const client = createMockClient(); + instrumentUpstash(client); + + const notBefore = Math.floor(Date.now() / 1000) + 3600; + const request = { + url: "https://example.com/api/scheduled", + body: { task: "process" }, + notBefore, + }; + + await client.publishJSON(request); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(1); + + const span = spans[0]; + if (!span) { + throw new Error("Expected a span to be recorded"); + } + + expect(span.attributes[SEMATTRS_QSTASH_NOT_BEFORE]).toBe(notBefore); + }); + + it("captures request with deduplication ID", async () => { + const client = createMockClient(); + instrumentUpstash(client); + + const request = { + url: "https://example.com/api/process", + body: { data: "test" }, + deduplicationId: "unique-id-123", + }; + + await client.publishJSON(request); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(1); + + const span = spans[0]; + if (!span) { + throw new Error("Expected a span to be recorded"); + } + + expect(span.attributes[SEMATTRS_QSTASH_DEDUPLICATION_ID]).toBe("unique-id-123"); + }); + + it("captures request with custom method", async () => { + const client = createMockClient(); + instrumentUpstash(client); + + const request = { + url: "https://example.com/api/process", + body: { data: "test" }, + method: "PUT" as const, + }; + + await client.publishJSON(request); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(1); + + const span = spans[0]; + if (!span) { + throw new Error("Expected a span to be recorded"); + } + + expect(span.attributes[SEMATTRS_QSTASH_METHOD]).toBe("PUT"); + }); + + + it("captures request with retries", async () => { + const client = createMockClient(); + instrumentUpstash(client); + + const request = { + url: "https://example.com/api/process", + body: { data: "test" }, + retries: 3, + }; + + await client.publishJSON(request); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(1); + + const span = spans[0]; + if (!span) { + throw new Error("Expected a span to be recorded"); + } + + expect(span.attributes[SEMATTRS_QSTASH_RETRIES]).toBe(3); + }); + + it("captures request with callback URLs", async () => { + const client = createMockClient(); + instrumentUpstash(client); + + const request = { + url: "https://example.com/api/process", + body: { data: "test" }, + callback: "https://example.com/api/callback", + failureCallback: "https://example.com/api/failure", + }; + + await client.publishJSON(request); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(1); + + const span = spans[0]; + if (!span) { + throw new Error("Expected a span to be recorded"); + } + + expect(span.attributes[SEMATTRS_QSTASH_CALLBACK_URL]).toBe("https://example.com/api/callback"); + expect(span.attributes[SEMATTRS_QSTASH_FAILURE_CALLBACK_URL]).toBe("https://example.com/api/failure"); + }); + + it("captures all request properties together", async () => { + const client = createMockClient(); + instrumentUpstash(client); + + const request = { + url: "https://example.com/api/complex", + body: { data: "complex" }, + method: "POST", + delay: 120, + deduplicationId: "complex-id-456", + contentType: "application/json", + retries: 5, + callback: "https://example.com/callback", + failureCallback: "https://example.com/failure", + }; + + await client.publishJSON(request); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(1); + + const span = spans[0]; + if (!span) { + throw new Error("Expected a span to be recorded"); + } + + expect(span.attributes[SEMATTRS_QSTASH_URL]).toBe("https://example.com/api/complex"); + expect(span.attributes[SEMATTRS_QSTASH_METHOD]).toBe("POST"); + expect(span.attributes[SEMATTRS_QSTASH_DELAY]).toBe(120); + expect(span.attributes[SEMATTRS_QSTASH_DEDUPLICATION_ID]).toBe("complex-id-456"); + expect(span.attributes[SEMATTRS_QSTASH_RETRIES]).toBe(5); + expect(span.attributes[SEMATTRS_QSTASH_CALLBACK_URL]).toBe("https://example.com/callback"); + expect(span.attributes[SEMATTRS_QSTASH_FAILURE_CALLBACK_URL]).toBe("https://example.com/failure"); + expect(span.attributes[SEMATTRS_QSTASH_MESSAGE_ID]).toBe("msg_123"); + }); + + it("captures errors and marks span status", async () => { + const client = createMockClient(); + client.publishJSON = vi.fn().mockRejectedValue(new Error("Network error")); + + instrumentUpstash(client); + + await expect(async () => + client.publishJSON({ + url: "https://example.com/api/fail", + body: { test: "error" }, + }) + ).rejects.toThrowError("Network error"); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(1); + + const span = spans[0]; + if (!span) { + throw new Error("Expected a span to be recorded"); + } + + expect(span.status.code).toBe(SpanStatusCode.ERROR); + const hasException = span.events.some((event) => event.name === "exception"); + expect(hasException).toBe(true); + }); + + it("is idempotent", async () => { + const client = createMockClient(); + const first = instrumentUpstash(client); + const second = instrumentUpstash(first); + + expect(first).toBe(second); + + await second.publishJSON({ + url: "https://example.com/api/test", + body: { test: "idempotent" }, + }); + + expect(exporter.getFinishedSpans()).toHaveLength(1); + }); + + it("handles minimal request", async () => { + const client = createMockClient(); + instrumentUpstash(client); + + const request = { + url: "https://example.com/api/minimal", + body: { minimal: true }, + }; + + await client.publishJSON(request); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(1); + + const span = spans[0]; + if (!span) { + throw new Error("Expected a span to be recorded"); + } + + expect(span.attributes[SEMATTRS_QSTASH_URL]).toBe("https://example.com/api/minimal"); + expect(span.attributes[SEMATTRS_QSTASH_METHOD]).toBe("POST"); + expect(span.attributes[SEMATTRS_QSTASH_MESSAGE_ID]).toBe("msg_123"); + expect(span.status.code).toBe(SpanStatusCode.OK); + }); +}); \ No newline at end of file diff --git a/packages/otel-upstash/src/index.ts b/packages/otel-upstash/src/index.ts new file mode 100644 index 0000000..58d3828 --- /dev/null +++ b/packages/otel-upstash/src/index.ts @@ -0,0 +1,159 @@ +import { + context, + SpanKind, + SpanStatusCode, + trace, + type Span, +} from "@opentelemetry/api"; +import type { Client, PublishRequest, PublishResponse } from "@upstash/qstash"; + +const DEFAULT_TRACER_NAME = "@kubiks/otel-upstash"; +const INSTRUMENTED_FLAG = Symbol("kubiksOtelUpstashInstrumented"); + +// Semantic attribute constants +export const SEMATTRS_MESSAGING_SYSTEM = "messaging.system" as const; +export const SEMATTRS_MESSAGING_OPERATION = "messaging.operation" as const; +export const SEMATTRS_QSTASH_RESOURCE = "qstash.resource" as const; +export const SEMATTRS_QSTASH_TARGET = "qstash.target" as const; +export const SEMATTRS_QSTASH_MESSAGE_ID = "qstash.message_id" as const; +export const SEMATTRS_QSTASH_URL = "qstash.url" as const; +export const SEMATTRS_QSTASH_METHOD = "qstash.method" as const; +export const SEMATTRS_QSTASH_DELAY = "qstash.delay" as const; +export const SEMATTRS_QSTASH_NOT_BEFORE = "qstash.not_before" as const; +export const SEMATTRS_QSTASH_DEDUPLICATION_ID = "qstash.deduplication_id" as const; +export const SEMATTRS_QSTASH_RETRIES = "qstash.retries" as const; +export const SEMATTRS_QSTASH_CALLBACK_URL = "qstash.callback_url" as const; +export const SEMATTRS_QSTASH_FAILURE_CALLBACK_URL = "qstash.failure_callback_url" as const; + +interface InstrumentedClient extends Client { + [INSTRUMENTED_FLAG]?: true; +} + +function annotatePublishSpan(span: Span, request: PublishRequest): void { + // Set base attributes + span.setAttributes({ + [SEMATTRS_MESSAGING_SYSTEM]: "qstash", + [SEMATTRS_MESSAGING_OPERATION]: "publish", + [SEMATTRS_QSTASH_RESOURCE]: "messages", + [SEMATTRS_QSTASH_TARGET]: "messages.publish", + }); + + // Set URL + if (request.url) { + span.setAttribute(SEMATTRS_QSTASH_URL, request.url); + } + + // Set HTTP method (default is POST) + const method = request.method || "POST"; + span.setAttribute(SEMATTRS_QSTASH_METHOD, method); + + // Set delay if present + if (typeof request.delay !== "undefined") { + if (typeof request.delay === "number") { + span.setAttribute(SEMATTRS_QSTASH_DELAY, request.delay); + } else if (typeof request.delay === "string") { + span.setAttribute(SEMATTRS_QSTASH_DELAY, request.delay); + } + } + + // Set notBefore if present + if (typeof request.notBefore !== "undefined") { + span.setAttribute(SEMATTRS_QSTASH_NOT_BEFORE, request.notBefore); + } + + // Set deduplication ID if present + if (request.deduplicationId) { + span.setAttribute(SEMATTRS_QSTASH_DEDUPLICATION_ID, request.deduplicationId); + } + + // Set retries if present + if (typeof request.retries !== "undefined") { + span.setAttribute(SEMATTRS_QSTASH_RETRIES, request.retries); + } + + // Set callback URL if present + if (request.callback) { + span.setAttribute(SEMATTRS_QSTASH_CALLBACK_URL, request.callback); + } + + // Set failure callback URL if present + if (request.failureCallback) { + span.setAttribute(SEMATTRS_QSTASH_FAILURE_CALLBACK_URL, request.failureCallback); + } +} + +function annotatePublishResponse( + span: Span, + response: { messageId?: string }, +): void { + if (response && typeof response === "object" && "messageId" in response && response.messageId) { + span.setAttribute(SEMATTRS_QSTASH_MESSAGE_ID, response.messageId); + } +} + +function finalizeSpan(span: Span, error?: unknown): void { + if (error) { + if (error instanceof Error) { + span.recordException(error); + } else { + span.recordException(new Error(String(error))); + } + span.setStatus({ code: SpanStatusCode.ERROR }); + } else { + span.setStatus({ code: SpanStatusCode.OK }); + } + span.end(); +} + +export function instrumentUpstash(client: Client): Client { + // Check if already instrumented + if ((client as InstrumentedClient)[INSTRUMENTED_FLAG]) { + return client; + } + + const tracer = trace.getTracer(DEFAULT_TRACER_NAME); + + // Instrument publishJSON method + const originalPublishJSON = client.publishJSON.bind(client); + + const instrumentedPublishJSON = async function instrumentedPublishJSON = PublishRequest>( + request: TRequest, + ): Promise> { + const span = tracer.startSpan("qstash.messages.publish", { + kind: SpanKind.CLIENT, + }); + + // Annotate span with request details + annotatePublishSpan(span, request as PublishRequest); + + // Set the span as active + const activeContext = trace.setSpan(context.active(), span); + + try { + // Call the original method within the active context + const response = await context.with(activeContext, () => + originalPublishJSON(request), + ); + + // Annotate with response data + annotatePublishResponse(span, response); + + // Mark as successful + finalizeSpan(span); + + return response; + } catch (error) { + // Mark as failed + finalizeSpan(span, error); + throw error; + } + }; + + // Replace the method with our instrumented version + client.publishJSON = instrumentedPublishJSON; + + // Mark as instrumented + (client as InstrumentedClient)[INSTRUMENTED_FLAG] = true; + + return client; +} \ No newline at end of file diff --git a/packages/otel-upstash/tsconfig.json b/packages/otel-upstash/tsconfig.json new file mode 100644 index 0000000..ea956eb --- /dev/null +++ b/packages/otel-upstash/tsconfig.json @@ -0,0 +1,21 @@ +{ + "compilerOptions": { + "target": "ES2020", + "module": "ESNext", + "moduleResolution": "bundler", + "lib": ["ES2020", "DOM"], + "outDir": "dist", + "declaration": true, + "declarationMap": true, + "sourceMap": true, + "strict": false, + "esModuleInterop": true, + "skipLibCheck": true, + "forceConsistentCasingInFileNames": true, + "resolveJsonModule": true, + "declarationDir": "dist/types", + "stripInternal": true + }, + "include": ["src/**/*"], + "exclude": ["node_modules", "dist", "**/*.test.ts"] +} \ No newline at end of file diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 13d67ba..91ca3a7 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -102,6 +102,30 @@ importers: specifier: 0.33.0 version: 0.33.0(less@4.2.0)(sass@1.69.7)(stylus@0.59.0) + packages/otel-upstash: + devDependencies: + '@opentelemetry/api': + specifier: ^1.9.0 + version: 1.9.0 + '@opentelemetry/sdk-trace-base': + specifier: ^2.1.0 + version: 2.1.0(@opentelemetry/api@1.9.0) + '@types/node': + specifier: 18.15.11 + version: 18.15.11 + '@upstash/qstash': + specifier: ^2.0.0 + version: 2.8.3 + rimraf: + specifier: 3.0.2 + version: 3.0.2 + typescript: + specifier: ^5 + version: 5.3.3 + vitest: + specifier: 0.33.0 + version: 0.33.0(less@4.2.0)(sass@1.69.7)(stylus@0.59.0) + packages: '@adobe/css-tools@4.3.2': @@ -536,6 +560,9 @@ packages: '@types/semver@7.5.6': resolution: {integrity: sha512-dn1l8LaMea/IjDoHNd9J52uBbInB796CDffS6VdIxvqYCPSG0V0DzHp76GpaWnlhg88uYyPbXCDIowa86ybd5A==} + '@upstash/qstash@2.8.3': + resolution: {integrity: sha512-SHf1mCGqZur0UTzXVx33phtFXIuLyjwDL1QsBE36gQFEx3rEG4fJc3qA2eD7jTUXEAYYrNkCQxMOtcteHFpwqw==} + '@vitest/expect@0.33.0': resolution: {integrity: sha512-sVNf+Gla3mhTCxNJx+wJLDPp/WcstOe0Ksqz4Vec51MmgMth/ia0MGFEkIZmVGeTL5HtjYR4Wl/ZxBxBXZJTzQ==} @@ -770,6 +797,9 @@ packages: resolution: {integrity: sha512-uV2QOWP2nWzsy2aMp8aRibhi9dlzF5Hgh5SHaB9OiTGEyDTiJJyx0uy51QXdyWbtAHNua4XJzUKca3OzKUd3vA==} engines: {node: '>= 8'} + crypto-js@4.2.0: + resolution: {integrity: sha512-KALDyEYgpY+Rlob/iriUtjV6d5Eq+Y191A5g4UqLAi8CyGP9N1+FdVbkc1SxKc2r4YAYqG8JzO2KGL+AizD70Q==} + csstype@3.1.3: resolution: {integrity: sha512-M1uQkMl8rQK/szD0LNhtqxIPLpimGm8sOBwU7lLnCpSbTyY3yeU1Vc7l4KT5zT4s/yOxHH5O7tIuuLOCnLADRw==} @@ -1302,6 +1332,9 @@ packages: jackspeak@3.4.3: resolution: {integrity: sha512-OGlZQpz2yfahA/Rd1Y8Cd9SIEsqvXkLVoSw/cgwhnhFMDbsQFeZYoJJ7bIZBS9BcamUW96asq/npPWugM+RQBw==} + jose@5.10.0: + resolution: {integrity: sha512-s+3Al/p9g32Iq+oqXxkW//7jk2Vig6FF1CFqzVXoTUXt2qz89YWbL+OwS17NFYEvxC35n0FKeGO2LGYSxeM2Gg==} + jose@6.1.0: resolution: {integrity: sha512-TTQJyoEoKcC1lscpVDCSsVgYzUDg/0Bt3WE//WiTPK6uOCQC2KZS4MpugbMWt/zyjkopgZoXhZuCi00gLudfUA==} @@ -1469,6 +1502,10 @@ packages: engines: {node: '>= 4.4.x'} hasBin: true + neverthrow@7.2.0: + resolution: {integrity: sha512-iGBUfFB7yPczHHtA8dksKTJ9E8TESNTAx1UQWW6TzMF280vo9jdPYpLUXrMN1BCkPdHFdNG3fxOt2CUad8KhAw==} + engines: {node: '>=18'} + node-fetch@2.7.0: resolution: {integrity: sha512-c4FRfUm/dbcWZ7U+1Wq0AwCyFL+3nt2bEw05wfxSz+DWpWsitgmSgYmy2dQdWyKC1694ELPqMs/YzUSNozLt8A==} engines: {node: 4.x || >=6.0.0} @@ -2781,6 +2818,12 @@ snapshots: '@types/semver@7.5.6': {} + '@upstash/qstash@2.8.3': + dependencies: + crypto-js: 4.2.0 + jose: 5.10.0 + neverthrow: 7.2.0 + '@vitest/expect@0.33.0': dependencies: '@vitest/spy': 0.33.0 @@ -3039,6 +3082,8 @@ snapshots: shebang-command: 2.0.0 which: 2.0.2 + crypto-js@4.2.0: {} + csstype@3.1.3: optional: true @@ -3555,6 +3600,8 @@ snapshots: optionalDependencies: '@pkgjs/parseargs': 0.11.0 + jose@5.10.0: {} + jose@6.1.0: {} js-beautify@1.15.4: @@ -3726,6 +3773,8 @@ snapshots: sax: 1.3.0 optional: true + neverthrow@7.2.0: {} + node-fetch@2.7.0: dependencies: whatwg-url: 5.0.0