mirror of
https://github.com/zoriya/drizzle-otel.git
synced 2025-12-06 00:46:09 +00:00
upstash telemetry package
This commit is contained in:
17
packages/otel-upstash/CHANGELOG.md
Normal file
17
packages/otel-upstash/CHANGELOG.md
Normal file
@@ -0,0 +1,17 @@
|
||||
# @kubiks/otel-upstash
|
||||
|
||||
## 1.0.0
|
||||
|
||||
### Major Changes
|
||||
|
||||
- Initial release of OpenTelemetry instrumentation for Upstash QStash
|
||||
- Instrumentation for `publishJSON` method
|
||||
- Support for all QStash request parameters including:
|
||||
- URL targeting
|
||||
- Delayed and scheduled messages
|
||||
- Deduplication
|
||||
- Retries configuration
|
||||
- Callback URLs
|
||||
- Custom HTTP methods
|
||||
- Comprehensive test coverage
|
||||
- Full TypeScript support with proper types from @upstash/qstash SDK
|
||||
21
packages/otel-upstash/LICENSE
Normal file
21
packages/otel-upstash/LICENSE
Normal file
@@ -0,0 +1,21 @@
|
||||
MIT License
|
||||
|
||||
Copyright (c) 2025 Kubiks
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
189
packages/otel-upstash/README.md
Normal file
189
packages/otel-upstash/README.md
Normal file
@@ -0,0 +1,189 @@
|
||||
# @kubiks/otel-upstash
|
||||
|
||||
OpenTelemetry instrumentation for the [Upstash QStash](https://upstash.com/docs/qstash) Node.js SDK.
|
||||
Capture spans for every QStash API call, enrich them with operation metadata,
|
||||
and keep an eye on message queue operations from your traces.
|
||||
|
||||
## Installation
|
||||
|
||||
```bash
|
||||
npm install @kubiks/otel-upstash
|
||||
# or
|
||||
pnpm add @kubiks/otel-upstash
|
||||
```
|
||||
|
||||
**Peer Dependencies:** `@opentelemetry/api` >= 1.9.0, `@upstash/qstash` >= 2.0.0
|
||||
|
||||
## Quick Start
|
||||
|
||||
```ts
|
||||
import { Client } from "@upstash/qstash";
|
||||
import { instrumentUpstash } from "@kubiks/otel-upstash";
|
||||
|
||||
const client = instrumentUpstash(
|
||||
new Client({ token: process.env.QSTASH_TOKEN! })
|
||||
);
|
||||
|
||||
await client.publishJSON({
|
||||
url: "https://your-api-endpoint.com/process-image",
|
||||
body: { imageId: "123" },
|
||||
});
|
||||
```
|
||||
|
||||
`instrumentUpstash` wraps the QStash client instance you already use — no configuration changes
|
||||
needed. Every SDK call creates a client span with useful attributes.
|
||||
|
||||
## What Gets Traced
|
||||
|
||||
This instrumentation specifically wraps the `client.publishJSON` method, creating a single clean span for each message publish operation.
|
||||
|
||||
## Span Attributes
|
||||
|
||||
Each span includes:
|
||||
|
||||
| Attribute | Description | Example |
|
||||
| ------------------------------ | ------------------------------------------- | -------------------------------------------- |
|
||||
| `messaging.system` | Constant value `qstash` | `qstash` |
|
||||
| `messaging.operation` | Operation type | `publish` |
|
||||
| `qstash.resource` | Resource name | `messages` |
|
||||
| `qstash.target` | Full operation target | `messages.publish` |
|
||||
| `qstash.url` | Target URL for the message | `https://example.com/api/process` |
|
||||
| `qstash.method` | HTTP method (default: POST) | `POST`, `PUT`, `GET` |
|
||||
| `qstash.message_id` | Message ID returned by QStash | `msg_123` |
|
||||
| `qstash.delay` | Delay before processing (seconds or string) | `60` or `"1h"` |
|
||||
| `qstash.not_before` | Unix timestamp for earliest processing | `1672531200` |
|
||||
| `qstash.deduplication_id` | Deduplication ID for idempotent operations | `unique-id-123` |
|
||||
| `qstash.retries` | Number of retry attempts | `3` |
|
||||
| `qstash.callback_url` | Success callback URL | `https://example.com/callback` |
|
||||
| `qstash.failure_callback_url` | Failure callback URL | `https://example.com/failure` |
|
||||
|
||||
The instrumentation captures message metadata and configuration to help with debugging and monitoring, while avoiding sensitive message content.
|
||||
|
||||
## Usage Examples
|
||||
|
||||
### Basic Message Publishing
|
||||
|
||||
```ts
|
||||
import { Client } from "@upstash/qstash";
|
||||
import { instrumentUpstash } from "@kubiks/otel-upstash";
|
||||
|
||||
const client = instrumentUpstash(
|
||||
new Client({ token: process.env.QSTASH_TOKEN! })
|
||||
);
|
||||
|
||||
// Publish a message
|
||||
await client.publishJSON({
|
||||
url: "https://your-api.com/webhook",
|
||||
body: {
|
||||
userId: "user_123",
|
||||
action: "process_data",
|
||||
},
|
||||
});
|
||||
```
|
||||
|
||||
### Delayed Message Publishing
|
||||
|
||||
```ts
|
||||
// Delay message processing by 60 seconds
|
||||
await client.publishJSON({
|
||||
url: "https://your-api.com/delayed-task",
|
||||
body: { taskId: "task_456" },
|
||||
delay: 60,
|
||||
});
|
||||
|
||||
// Or use a human-readable delay
|
||||
await client.publishJSON({
|
||||
url: "https://your-api.com/delayed-task",
|
||||
body: { taskId: "task_789" },
|
||||
delay: "1h", // 1 hour
|
||||
});
|
||||
```
|
||||
|
||||
### Message with Callbacks
|
||||
|
||||
```ts
|
||||
await client.publishJSON({
|
||||
url: "https://your-api.com/process",
|
||||
body: { orderId: "order_123" },
|
||||
callback: "https://your-api.com/success",
|
||||
failureCallback: "https://your-api.com/failure",
|
||||
});
|
||||
```
|
||||
|
||||
### Message with Retries and Deduplication
|
||||
|
||||
```ts
|
||||
await client.publishJSON({
|
||||
url: "https://your-api.com/critical-task",
|
||||
body: { taskId: "critical_123" },
|
||||
retries: 5,
|
||||
deduplicationId: "task-critical-123",
|
||||
});
|
||||
```
|
||||
|
||||
### Scheduled Message
|
||||
|
||||
```ts
|
||||
// Schedule for a specific time
|
||||
const scheduledTime = Math.floor(Date.now() / 1000) + 3600; // 1 hour from now
|
||||
|
||||
await client.publishJSON({
|
||||
url: "https://your-api.com/scheduled-task",
|
||||
body: { reportId: "report_456" },
|
||||
notBefore: scheduledTime,
|
||||
});
|
||||
```
|
||||
|
||||
### Next.js Integration Example
|
||||
|
||||
```ts
|
||||
// app/actions.ts
|
||||
"use server";
|
||||
import { Client } from "@upstash/qstash";
|
||||
import { instrumentUpstash } from "@kubiks/otel-upstash";
|
||||
|
||||
const qstashClient = instrumentUpstash(
|
||||
new Client({
|
||||
token: process.env.QSTASH_TOKEN!,
|
||||
})
|
||||
);
|
||||
|
||||
export async function startBackgroundJob() {
|
||||
await qstashClient.publishJSON({
|
||||
url: "https://your-app.vercel.app/api/process",
|
||||
body: {
|
||||
userId: "user_123",
|
||||
timestamp: Date.now(),
|
||||
},
|
||||
});
|
||||
}
|
||||
```
|
||||
|
||||
## How It Works
|
||||
|
||||
The instrumentation creates OpenTelemetry spans for QStash operations by:
|
||||
|
||||
1. Wrapping the `publishJSON` method of the QStash client
|
||||
2. Creating a span before the operation starts
|
||||
3. Adding relevant attributes from the request parameters
|
||||
4. Capturing the message ID from the response
|
||||
5. Recording any errors that occur
|
||||
6. Properly closing the span with success or error status
|
||||
|
||||
All of this happens automatically once you wrap your client with `instrumentUpstash()`.
|
||||
|
||||
## Best Practices
|
||||
|
||||
1. **Instrument Early**: Call `instrumentUpstash()` when you create your QStash client, typically at application startup.
|
||||
|
||||
2. **Reuse the Client**: Create one instrumented client and reuse it throughout your application.
|
||||
|
||||
3. **Use Deduplication IDs**: For idempotent operations, always provide a `deduplicationId` to prevent duplicate processing.
|
||||
|
||||
4. **Monitor Traces**: Use OpenTelemetry-compatible tracing backends (like Jaeger, Zipkin, or cloud providers) to visualize your message queues.
|
||||
|
||||
5. **Set Appropriate Retries**: Configure retry counts based on the criticality and nature of your tasks.
|
||||
|
||||
## License
|
||||
|
||||
MIT
|
||||
53
packages/otel-upstash/package.json
Normal file
53
packages/otel-upstash/package.json
Normal file
@@ -0,0 +1,53 @@
|
||||
{
|
||||
"name": "@kubiks/otel-upstash",
|
||||
"version": "1.0.0",
|
||||
"private": false,
|
||||
"publishConfig": {
|
||||
"access": "public"
|
||||
},
|
||||
"description": "OpenTelemetry instrumentation for the Upstash QStash Node.js SDK",
|
||||
"author": "Kubiks",
|
||||
"license": "MIT",
|
||||
"repository": "kubiks-inc/otel",
|
||||
"sideEffects": false,
|
||||
"type": "module",
|
||||
"exports": {
|
||||
".": {
|
||||
"types": "./dist/types/index.d.ts",
|
||||
"import": "./dist/index.js",
|
||||
"default": "./dist/index.js"
|
||||
}
|
||||
},
|
||||
"main": "./dist/index.js",
|
||||
"types": "./dist/types/index.d.ts",
|
||||
"files": [
|
||||
"dist",
|
||||
"LICENSE",
|
||||
"README.md"
|
||||
],
|
||||
"engines": {
|
||||
"node": "^18.19.0 || >=20.6.0"
|
||||
},
|
||||
"scripts": {
|
||||
"build": "pnpm clean && tsc",
|
||||
"clean": "rimraf dist",
|
||||
"prepublishOnly": "pnpm build",
|
||||
"type-check": "tsc --noEmit",
|
||||
"unit-test": "vitest --run",
|
||||
"unit-test-watch": "vitest"
|
||||
},
|
||||
"dependencies": {},
|
||||
"devDependencies": {
|
||||
"@opentelemetry/api": "^1.9.0",
|
||||
"@opentelemetry/sdk-trace-base": "^2.1.0",
|
||||
"@types/node": "18.15.11",
|
||||
"@upstash/qstash": "^2.0.0",
|
||||
"rimraf": "3.0.2",
|
||||
"typescript": "^5",
|
||||
"vitest": "0.33.0"
|
||||
},
|
||||
"peerDependencies": {
|
||||
"@opentelemetry/api": ">=1.9.0 <2.0.0",
|
||||
"@upstash/qstash": ">=2.0.0"
|
||||
}
|
||||
}
|
||||
352
packages/otel-upstash/src/index.test.ts
Normal file
352
packages/otel-upstash/src/index.test.ts
Normal file
@@ -0,0 +1,352 @@
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { SpanStatusCode, trace } from "@opentelemetry/api";
|
||||
import {
|
||||
BasicTracerProvider,
|
||||
InMemorySpanExporter,
|
||||
SimpleSpanProcessor,
|
||||
} from "@opentelemetry/sdk-trace-base";
|
||||
import type { Client } from "@upstash/qstash";
|
||||
import {
|
||||
instrumentUpstash,
|
||||
SEMATTRS_MESSAGING_OPERATION,
|
||||
SEMATTRS_MESSAGING_SYSTEM,
|
||||
SEMATTRS_QSTASH_CALLBACK_URL,
|
||||
SEMATTRS_QSTASH_DEDUPLICATION_ID,
|
||||
SEMATTRS_QSTASH_DELAY,
|
||||
SEMATTRS_QSTASH_FAILURE_CALLBACK_URL,
|
||||
SEMATTRS_QSTASH_MESSAGE_ID,
|
||||
SEMATTRS_QSTASH_METHOD,
|
||||
SEMATTRS_QSTASH_NOT_BEFORE,
|
||||
SEMATTRS_QSTASH_RESOURCE,
|
||||
SEMATTRS_QSTASH_RETRIES,
|
||||
SEMATTRS_QSTASH_TARGET,
|
||||
SEMATTRS_QSTASH_URL,
|
||||
} from "./index";
|
||||
|
||||
describe("instrumentUpstash", () => {
|
||||
let provider: BasicTracerProvider;
|
||||
let exporter: InMemorySpanExporter;
|
||||
|
||||
beforeEach(() => {
|
||||
exporter = new InMemorySpanExporter();
|
||||
provider = new BasicTracerProvider({
|
||||
spanProcessors: [new SimpleSpanProcessor(exporter)],
|
||||
});
|
||||
trace.setGlobalTracerProvider(provider);
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await provider.shutdown();
|
||||
exporter.reset();
|
||||
trace.disable();
|
||||
});
|
||||
|
||||
const createMockClient = (): Client => {
|
||||
const mockClient = {
|
||||
publishJSON: vi.fn(async (request: any) => ({
|
||||
messageId: "msg_123",
|
||||
})),
|
||||
} as unknown as Client;
|
||||
|
||||
return mockClient;
|
||||
};
|
||||
|
||||
it("wraps publishJSON and records spans", async () => {
|
||||
const client = createMockClient();
|
||||
instrumentUpstash(client);
|
||||
|
||||
const request = {
|
||||
url: "https://example.com/api/process",
|
||||
body: { hello: "world" },
|
||||
};
|
||||
|
||||
const response = await client.publishJSON(request);
|
||||
expect(response.messageId).toBe("msg_123");
|
||||
|
||||
const spans = exporter.getFinishedSpans();
|
||||
expect(spans).toHaveLength(1);
|
||||
|
||||
const span = spans[0];
|
||||
if (!span) {
|
||||
throw new Error("Expected a span to be recorded");
|
||||
}
|
||||
|
||||
expect(span.name).toBe("qstash.messages.publish");
|
||||
expect(span.attributes[SEMATTRS_MESSAGING_SYSTEM]).toBe("qstash");
|
||||
expect(span.attributes[SEMATTRS_MESSAGING_OPERATION]).toBe("publish");
|
||||
expect(span.attributes[SEMATTRS_QSTASH_RESOURCE]).toBe("messages");
|
||||
expect(span.attributes[SEMATTRS_QSTASH_TARGET]).toBe("messages.publish");
|
||||
expect(span.attributes[SEMATTRS_QSTASH_URL]).toBe("https://example.com/api/process");
|
||||
expect(span.attributes[SEMATTRS_QSTASH_METHOD]).toBe("POST");
|
||||
expect(span.attributes[SEMATTRS_QSTASH_MESSAGE_ID]).toBe("msg_123");
|
||||
expect(span.status.code).toBe(SpanStatusCode.OK);
|
||||
});
|
||||
|
||||
it("captures request with delay", async () => {
|
||||
const client = createMockClient();
|
||||
instrumentUpstash(client);
|
||||
|
||||
const request = {
|
||||
url: "https://example.com/api/delayed",
|
||||
body: { task: "process" },
|
||||
delay: 60,
|
||||
};
|
||||
|
||||
await client.publishJSON(request);
|
||||
|
||||
const spans = exporter.getFinishedSpans();
|
||||
expect(spans).toHaveLength(1);
|
||||
|
||||
const span = spans[0];
|
||||
if (!span) {
|
||||
throw new Error("Expected a span to be recorded");
|
||||
}
|
||||
|
||||
expect(span.attributes[SEMATTRS_QSTASH_DELAY]).toBe(60);
|
||||
expect(span.attributes[SEMATTRS_QSTASH_URL]).toBe("https://example.com/api/delayed");
|
||||
});
|
||||
|
||||
it("captures request with delay as string", async () => {
|
||||
const client = createMockClient();
|
||||
instrumentUpstash(client);
|
||||
|
||||
const request = {
|
||||
url: "https://example.com/api/delayed",
|
||||
body: { task: "process" },
|
||||
delay: "1h",
|
||||
};
|
||||
|
||||
await client.publishJSON(request);
|
||||
|
||||
const spans = exporter.getFinishedSpans();
|
||||
expect(spans).toHaveLength(1);
|
||||
|
||||
const span = spans[0];
|
||||
if (!span) {
|
||||
throw new Error("Expected a span to be recorded");
|
||||
}
|
||||
|
||||
expect(span.attributes[SEMATTRS_QSTASH_DELAY]).toBe("1h");
|
||||
});
|
||||
|
||||
it("captures request with notBefore", async () => {
|
||||
const client = createMockClient();
|
||||
instrumentUpstash(client);
|
||||
|
||||
const notBefore = Math.floor(Date.now() / 1000) + 3600;
|
||||
const request = {
|
||||
url: "https://example.com/api/scheduled",
|
||||
body: { task: "process" },
|
||||
notBefore,
|
||||
};
|
||||
|
||||
await client.publishJSON(request);
|
||||
|
||||
const spans = exporter.getFinishedSpans();
|
||||
expect(spans).toHaveLength(1);
|
||||
|
||||
const span = spans[0];
|
||||
if (!span) {
|
||||
throw new Error("Expected a span to be recorded");
|
||||
}
|
||||
|
||||
expect(span.attributes[SEMATTRS_QSTASH_NOT_BEFORE]).toBe(notBefore);
|
||||
});
|
||||
|
||||
it("captures request with deduplication ID", async () => {
|
||||
const client = createMockClient();
|
||||
instrumentUpstash(client);
|
||||
|
||||
const request = {
|
||||
url: "https://example.com/api/process",
|
||||
body: { data: "test" },
|
||||
deduplicationId: "unique-id-123",
|
||||
};
|
||||
|
||||
await client.publishJSON(request);
|
||||
|
||||
const spans = exporter.getFinishedSpans();
|
||||
expect(spans).toHaveLength(1);
|
||||
|
||||
const span = spans[0];
|
||||
if (!span) {
|
||||
throw new Error("Expected a span to be recorded");
|
||||
}
|
||||
|
||||
expect(span.attributes[SEMATTRS_QSTASH_DEDUPLICATION_ID]).toBe("unique-id-123");
|
||||
});
|
||||
|
||||
it("captures request with custom method", async () => {
|
||||
const client = createMockClient();
|
||||
instrumentUpstash(client);
|
||||
|
||||
const request = {
|
||||
url: "https://example.com/api/process",
|
||||
body: { data: "test" },
|
||||
method: "PUT" as const,
|
||||
};
|
||||
|
||||
await client.publishJSON(request);
|
||||
|
||||
const spans = exporter.getFinishedSpans();
|
||||
expect(spans).toHaveLength(1);
|
||||
|
||||
const span = spans[0];
|
||||
if (!span) {
|
||||
throw new Error("Expected a span to be recorded");
|
||||
}
|
||||
|
||||
expect(span.attributes[SEMATTRS_QSTASH_METHOD]).toBe("PUT");
|
||||
});
|
||||
|
||||
|
||||
it("captures request with retries", async () => {
|
||||
const client = createMockClient();
|
||||
instrumentUpstash(client);
|
||||
|
||||
const request = {
|
||||
url: "https://example.com/api/process",
|
||||
body: { data: "test" },
|
||||
retries: 3,
|
||||
};
|
||||
|
||||
await client.publishJSON(request);
|
||||
|
||||
const spans = exporter.getFinishedSpans();
|
||||
expect(spans).toHaveLength(1);
|
||||
|
||||
const span = spans[0];
|
||||
if (!span) {
|
||||
throw new Error("Expected a span to be recorded");
|
||||
}
|
||||
|
||||
expect(span.attributes[SEMATTRS_QSTASH_RETRIES]).toBe(3);
|
||||
});
|
||||
|
||||
it("captures request with callback URLs", async () => {
|
||||
const client = createMockClient();
|
||||
instrumentUpstash(client);
|
||||
|
||||
const request = {
|
||||
url: "https://example.com/api/process",
|
||||
body: { data: "test" },
|
||||
callback: "https://example.com/api/callback",
|
||||
failureCallback: "https://example.com/api/failure",
|
||||
};
|
||||
|
||||
await client.publishJSON(request);
|
||||
|
||||
const spans = exporter.getFinishedSpans();
|
||||
expect(spans).toHaveLength(1);
|
||||
|
||||
const span = spans[0];
|
||||
if (!span) {
|
||||
throw new Error("Expected a span to be recorded");
|
||||
}
|
||||
|
||||
expect(span.attributes[SEMATTRS_QSTASH_CALLBACK_URL]).toBe("https://example.com/api/callback");
|
||||
expect(span.attributes[SEMATTRS_QSTASH_FAILURE_CALLBACK_URL]).toBe("https://example.com/api/failure");
|
||||
});
|
||||
|
||||
it("captures all request properties together", async () => {
|
||||
const client = createMockClient();
|
||||
instrumentUpstash(client);
|
||||
|
||||
const request = {
|
||||
url: "https://example.com/api/complex",
|
||||
body: { data: "complex" },
|
||||
method: "POST",
|
||||
delay: 120,
|
||||
deduplicationId: "complex-id-456",
|
||||
contentType: "application/json",
|
||||
retries: 5,
|
||||
callback: "https://example.com/callback",
|
||||
failureCallback: "https://example.com/failure",
|
||||
};
|
||||
|
||||
await client.publishJSON(request);
|
||||
|
||||
const spans = exporter.getFinishedSpans();
|
||||
expect(spans).toHaveLength(1);
|
||||
|
||||
const span = spans[0];
|
||||
if (!span) {
|
||||
throw new Error("Expected a span to be recorded");
|
||||
}
|
||||
|
||||
expect(span.attributes[SEMATTRS_QSTASH_URL]).toBe("https://example.com/api/complex");
|
||||
expect(span.attributes[SEMATTRS_QSTASH_METHOD]).toBe("POST");
|
||||
expect(span.attributes[SEMATTRS_QSTASH_DELAY]).toBe(120);
|
||||
expect(span.attributes[SEMATTRS_QSTASH_DEDUPLICATION_ID]).toBe("complex-id-456");
|
||||
expect(span.attributes[SEMATTRS_QSTASH_RETRIES]).toBe(5);
|
||||
expect(span.attributes[SEMATTRS_QSTASH_CALLBACK_URL]).toBe("https://example.com/callback");
|
||||
expect(span.attributes[SEMATTRS_QSTASH_FAILURE_CALLBACK_URL]).toBe("https://example.com/failure");
|
||||
expect(span.attributes[SEMATTRS_QSTASH_MESSAGE_ID]).toBe("msg_123");
|
||||
});
|
||||
|
||||
it("captures errors and marks span status", async () => {
|
||||
const client = createMockClient();
|
||||
client.publishJSON = vi.fn().mockRejectedValue(new Error("Network error"));
|
||||
|
||||
instrumentUpstash(client);
|
||||
|
||||
await expect(async () =>
|
||||
client.publishJSON({
|
||||
url: "https://example.com/api/fail",
|
||||
body: { test: "error" },
|
||||
})
|
||||
).rejects.toThrowError("Network error");
|
||||
|
||||
const spans = exporter.getFinishedSpans();
|
||||
expect(spans).toHaveLength(1);
|
||||
|
||||
const span = spans[0];
|
||||
if (!span) {
|
||||
throw new Error("Expected a span to be recorded");
|
||||
}
|
||||
|
||||
expect(span.status.code).toBe(SpanStatusCode.ERROR);
|
||||
const hasException = span.events.some((event) => event.name === "exception");
|
||||
expect(hasException).toBe(true);
|
||||
});
|
||||
|
||||
it("is idempotent", async () => {
|
||||
const client = createMockClient();
|
||||
const first = instrumentUpstash(client);
|
||||
const second = instrumentUpstash(first);
|
||||
|
||||
expect(first).toBe(second);
|
||||
|
||||
await second.publishJSON({
|
||||
url: "https://example.com/api/test",
|
||||
body: { test: "idempotent" },
|
||||
});
|
||||
|
||||
expect(exporter.getFinishedSpans()).toHaveLength(1);
|
||||
});
|
||||
|
||||
it("handles minimal request", async () => {
|
||||
const client = createMockClient();
|
||||
instrumentUpstash(client);
|
||||
|
||||
const request = {
|
||||
url: "https://example.com/api/minimal",
|
||||
body: { minimal: true },
|
||||
};
|
||||
|
||||
await client.publishJSON(request);
|
||||
|
||||
const spans = exporter.getFinishedSpans();
|
||||
expect(spans).toHaveLength(1);
|
||||
|
||||
const span = spans[0];
|
||||
if (!span) {
|
||||
throw new Error("Expected a span to be recorded");
|
||||
}
|
||||
|
||||
expect(span.attributes[SEMATTRS_QSTASH_URL]).toBe("https://example.com/api/minimal");
|
||||
expect(span.attributes[SEMATTRS_QSTASH_METHOD]).toBe("POST");
|
||||
expect(span.attributes[SEMATTRS_QSTASH_MESSAGE_ID]).toBe("msg_123");
|
||||
expect(span.status.code).toBe(SpanStatusCode.OK);
|
||||
});
|
||||
});
|
||||
159
packages/otel-upstash/src/index.ts
Normal file
159
packages/otel-upstash/src/index.ts
Normal file
@@ -0,0 +1,159 @@
|
||||
import {
|
||||
context,
|
||||
SpanKind,
|
||||
SpanStatusCode,
|
||||
trace,
|
||||
type Span,
|
||||
} from "@opentelemetry/api";
|
||||
import type { Client, PublishRequest, PublishResponse } from "@upstash/qstash";
|
||||
|
||||
const DEFAULT_TRACER_NAME = "@kubiks/otel-upstash";
|
||||
const INSTRUMENTED_FLAG = Symbol("kubiksOtelUpstashInstrumented");
|
||||
|
||||
// Semantic attribute constants
|
||||
export const SEMATTRS_MESSAGING_SYSTEM = "messaging.system" as const;
|
||||
export const SEMATTRS_MESSAGING_OPERATION = "messaging.operation" as const;
|
||||
export const SEMATTRS_QSTASH_RESOURCE = "qstash.resource" as const;
|
||||
export const SEMATTRS_QSTASH_TARGET = "qstash.target" as const;
|
||||
export const SEMATTRS_QSTASH_MESSAGE_ID = "qstash.message_id" as const;
|
||||
export const SEMATTRS_QSTASH_URL = "qstash.url" as const;
|
||||
export const SEMATTRS_QSTASH_METHOD = "qstash.method" as const;
|
||||
export const SEMATTRS_QSTASH_DELAY = "qstash.delay" as const;
|
||||
export const SEMATTRS_QSTASH_NOT_BEFORE = "qstash.not_before" as const;
|
||||
export const SEMATTRS_QSTASH_DEDUPLICATION_ID = "qstash.deduplication_id" as const;
|
||||
export const SEMATTRS_QSTASH_RETRIES = "qstash.retries" as const;
|
||||
export const SEMATTRS_QSTASH_CALLBACK_URL = "qstash.callback_url" as const;
|
||||
export const SEMATTRS_QSTASH_FAILURE_CALLBACK_URL = "qstash.failure_callback_url" as const;
|
||||
|
||||
interface InstrumentedClient extends Client {
|
||||
[INSTRUMENTED_FLAG]?: true;
|
||||
}
|
||||
|
||||
function annotatePublishSpan(span: Span, request: PublishRequest<string>): void {
|
||||
// Set base attributes
|
||||
span.setAttributes({
|
||||
[SEMATTRS_MESSAGING_SYSTEM]: "qstash",
|
||||
[SEMATTRS_MESSAGING_OPERATION]: "publish",
|
||||
[SEMATTRS_QSTASH_RESOURCE]: "messages",
|
||||
[SEMATTRS_QSTASH_TARGET]: "messages.publish",
|
||||
});
|
||||
|
||||
// Set URL
|
||||
if (request.url) {
|
||||
span.setAttribute(SEMATTRS_QSTASH_URL, request.url);
|
||||
}
|
||||
|
||||
// Set HTTP method (default is POST)
|
||||
const method = request.method || "POST";
|
||||
span.setAttribute(SEMATTRS_QSTASH_METHOD, method);
|
||||
|
||||
// Set delay if present
|
||||
if (typeof request.delay !== "undefined") {
|
||||
if (typeof request.delay === "number") {
|
||||
span.setAttribute(SEMATTRS_QSTASH_DELAY, request.delay);
|
||||
} else if (typeof request.delay === "string") {
|
||||
span.setAttribute(SEMATTRS_QSTASH_DELAY, request.delay);
|
||||
}
|
||||
}
|
||||
|
||||
// Set notBefore if present
|
||||
if (typeof request.notBefore !== "undefined") {
|
||||
span.setAttribute(SEMATTRS_QSTASH_NOT_BEFORE, request.notBefore);
|
||||
}
|
||||
|
||||
// Set deduplication ID if present
|
||||
if (request.deduplicationId) {
|
||||
span.setAttribute(SEMATTRS_QSTASH_DEDUPLICATION_ID, request.deduplicationId);
|
||||
}
|
||||
|
||||
// Set retries if present
|
||||
if (typeof request.retries !== "undefined") {
|
||||
span.setAttribute(SEMATTRS_QSTASH_RETRIES, request.retries);
|
||||
}
|
||||
|
||||
// Set callback URL if present
|
||||
if (request.callback) {
|
||||
span.setAttribute(SEMATTRS_QSTASH_CALLBACK_URL, request.callback);
|
||||
}
|
||||
|
||||
// Set failure callback URL if present
|
||||
if (request.failureCallback) {
|
||||
span.setAttribute(SEMATTRS_QSTASH_FAILURE_CALLBACK_URL, request.failureCallback);
|
||||
}
|
||||
}
|
||||
|
||||
function annotatePublishResponse(
|
||||
span: Span,
|
||||
response: { messageId?: string },
|
||||
): void {
|
||||
if (response && typeof response === "object" && "messageId" in response && response.messageId) {
|
||||
span.setAttribute(SEMATTRS_QSTASH_MESSAGE_ID, response.messageId);
|
||||
}
|
||||
}
|
||||
|
||||
function finalizeSpan(span: Span, error?: unknown): void {
|
||||
if (error) {
|
||||
if (error instanceof Error) {
|
||||
span.recordException(error);
|
||||
} else {
|
||||
span.recordException(new Error(String(error)));
|
||||
}
|
||||
span.setStatus({ code: SpanStatusCode.ERROR });
|
||||
} else {
|
||||
span.setStatus({ code: SpanStatusCode.OK });
|
||||
}
|
||||
span.end();
|
||||
}
|
||||
|
||||
export function instrumentUpstash(client: Client): Client {
|
||||
// Check if already instrumented
|
||||
if ((client as InstrumentedClient)[INSTRUMENTED_FLAG]) {
|
||||
return client;
|
||||
}
|
||||
|
||||
const tracer = trace.getTracer(DEFAULT_TRACER_NAME);
|
||||
|
||||
// Instrument publishJSON method
|
||||
const originalPublishJSON = client.publishJSON.bind(client);
|
||||
|
||||
const instrumentedPublishJSON = async function instrumentedPublishJSON<TBody = unknown, TRequest extends PublishRequest<TBody> = PublishRequest<TBody>>(
|
||||
request: TRequest,
|
||||
): Promise<PublishResponse<TRequest>> {
|
||||
const span = tracer.startSpan("qstash.messages.publish", {
|
||||
kind: SpanKind.CLIENT,
|
||||
});
|
||||
|
||||
// Annotate span with request details
|
||||
annotatePublishSpan(span, request as PublishRequest<string>);
|
||||
|
||||
// Set the span as active
|
||||
const activeContext = trace.setSpan(context.active(), span);
|
||||
|
||||
try {
|
||||
// Call the original method within the active context
|
||||
const response = await context.with(activeContext, () =>
|
||||
originalPublishJSON(request),
|
||||
);
|
||||
|
||||
// Annotate with response data
|
||||
annotatePublishResponse(span, response);
|
||||
|
||||
// Mark as successful
|
||||
finalizeSpan(span);
|
||||
|
||||
return response;
|
||||
} catch (error) {
|
||||
// Mark as failed
|
||||
finalizeSpan(span, error);
|
||||
throw error;
|
||||
}
|
||||
};
|
||||
|
||||
// Replace the method with our instrumented version
|
||||
client.publishJSON = instrumentedPublishJSON;
|
||||
|
||||
// Mark as instrumented
|
||||
(client as InstrumentedClient)[INSTRUMENTED_FLAG] = true;
|
||||
|
||||
return client;
|
||||
}
|
||||
21
packages/otel-upstash/tsconfig.json
Normal file
21
packages/otel-upstash/tsconfig.json
Normal file
@@ -0,0 +1,21 @@
|
||||
{
|
||||
"compilerOptions": {
|
||||
"target": "ES2020",
|
||||
"module": "ESNext",
|
||||
"moduleResolution": "bundler",
|
||||
"lib": ["ES2020", "DOM"],
|
||||
"outDir": "dist",
|
||||
"declaration": true,
|
||||
"declarationMap": true,
|
||||
"sourceMap": true,
|
||||
"strict": false,
|
||||
"esModuleInterop": true,
|
||||
"skipLibCheck": true,
|
||||
"forceConsistentCasingInFileNames": true,
|
||||
"resolveJsonModule": true,
|
||||
"declarationDir": "dist/types",
|
||||
"stripInternal": true
|
||||
},
|
||||
"include": ["src/**/*"],
|
||||
"exclude": ["node_modules", "dist", "**/*.test.ts"]
|
||||
}
|
||||
49
pnpm-lock.yaml
generated
49
pnpm-lock.yaml
generated
@@ -102,6 +102,30 @@ importers:
|
||||
specifier: 0.33.0
|
||||
version: 0.33.0(less@4.2.0)(sass@1.69.7)(stylus@0.59.0)
|
||||
|
||||
packages/otel-upstash:
|
||||
devDependencies:
|
||||
'@opentelemetry/api':
|
||||
specifier: ^1.9.0
|
||||
version: 1.9.0
|
||||
'@opentelemetry/sdk-trace-base':
|
||||
specifier: ^2.1.0
|
||||
version: 2.1.0(@opentelemetry/api@1.9.0)
|
||||
'@types/node':
|
||||
specifier: 18.15.11
|
||||
version: 18.15.11
|
||||
'@upstash/qstash':
|
||||
specifier: ^2.0.0
|
||||
version: 2.8.3
|
||||
rimraf:
|
||||
specifier: 3.0.2
|
||||
version: 3.0.2
|
||||
typescript:
|
||||
specifier: ^5
|
||||
version: 5.3.3
|
||||
vitest:
|
||||
specifier: 0.33.0
|
||||
version: 0.33.0(less@4.2.0)(sass@1.69.7)(stylus@0.59.0)
|
||||
|
||||
packages:
|
||||
|
||||
'@adobe/css-tools@4.3.2':
|
||||
@@ -536,6 +560,9 @@ packages:
|
||||
'@types/semver@7.5.6':
|
||||
resolution: {integrity: sha512-dn1l8LaMea/IjDoHNd9J52uBbInB796CDffS6VdIxvqYCPSG0V0DzHp76GpaWnlhg88uYyPbXCDIowa86ybd5A==}
|
||||
|
||||
'@upstash/qstash@2.8.3':
|
||||
resolution: {integrity: sha512-SHf1mCGqZur0UTzXVx33phtFXIuLyjwDL1QsBE36gQFEx3rEG4fJc3qA2eD7jTUXEAYYrNkCQxMOtcteHFpwqw==}
|
||||
|
||||
'@vitest/expect@0.33.0':
|
||||
resolution: {integrity: sha512-sVNf+Gla3mhTCxNJx+wJLDPp/WcstOe0Ksqz4Vec51MmgMth/ia0MGFEkIZmVGeTL5HtjYR4Wl/ZxBxBXZJTzQ==}
|
||||
|
||||
@@ -770,6 +797,9 @@ packages:
|
||||
resolution: {integrity: sha512-uV2QOWP2nWzsy2aMp8aRibhi9dlzF5Hgh5SHaB9OiTGEyDTiJJyx0uy51QXdyWbtAHNua4XJzUKca3OzKUd3vA==}
|
||||
engines: {node: '>= 8'}
|
||||
|
||||
crypto-js@4.2.0:
|
||||
resolution: {integrity: sha512-KALDyEYgpY+Rlob/iriUtjV6d5Eq+Y191A5g4UqLAi8CyGP9N1+FdVbkc1SxKc2r4YAYqG8JzO2KGL+AizD70Q==}
|
||||
|
||||
csstype@3.1.3:
|
||||
resolution: {integrity: sha512-M1uQkMl8rQK/szD0LNhtqxIPLpimGm8sOBwU7lLnCpSbTyY3yeU1Vc7l4KT5zT4s/yOxHH5O7tIuuLOCnLADRw==}
|
||||
|
||||
@@ -1302,6 +1332,9 @@ packages:
|
||||
jackspeak@3.4.3:
|
||||
resolution: {integrity: sha512-OGlZQpz2yfahA/Rd1Y8Cd9SIEsqvXkLVoSw/cgwhnhFMDbsQFeZYoJJ7bIZBS9BcamUW96asq/npPWugM+RQBw==}
|
||||
|
||||
jose@5.10.0:
|
||||
resolution: {integrity: sha512-s+3Al/p9g32Iq+oqXxkW//7jk2Vig6FF1CFqzVXoTUXt2qz89YWbL+OwS17NFYEvxC35n0FKeGO2LGYSxeM2Gg==}
|
||||
|
||||
jose@6.1.0:
|
||||
resolution: {integrity: sha512-TTQJyoEoKcC1lscpVDCSsVgYzUDg/0Bt3WE//WiTPK6uOCQC2KZS4MpugbMWt/zyjkopgZoXhZuCi00gLudfUA==}
|
||||
|
||||
@@ -1469,6 +1502,10 @@ packages:
|
||||
engines: {node: '>= 4.4.x'}
|
||||
hasBin: true
|
||||
|
||||
neverthrow@7.2.0:
|
||||
resolution: {integrity: sha512-iGBUfFB7yPczHHtA8dksKTJ9E8TESNTAx1UQWW6TzMF280vo9jdPYpLUXrMN1BCkPdHFdNG3fxOt2CUad8KhAw==}
|
||||
engines: {node: '>=18'}
|
||||
|
||||
node-fetch@2.7.0:
|
||||
resolution: {integrity: sha512-c4FRfUm/dbcWZ7U+1Wq0AwCyFL+3nt2bEw05wfxSz+DWpWsitgmSgYmy2dQdWyKC1694ELPqMs/YzUSNozLt8A==}
|
||||
engines: {node: 4.x || >=6.0.0}
|
||||
@@ -2781,6 +2818,12 @@ snapshots:
|
||||
|
||||
'@types/semver@7.5.6': {}
|
||||
|
||||
'@upstash/qstash@2.8.3':
|
||||
dependencies:
|
||||
crypto-js: 4.2.0
|
||||
jose: 5.10.0
|
||||
neverthrow: 7.2.0
|
||||
|
||||
'@vitest/expect@0.33.0':
|
||||
dependencies:
|
||||
'@vitest/spy': 0.33.0
|
||||
@@ -3039,6 +3082,8 @@ snapshots:
|
||||
shebang-command: 2.0.0
|
||||
which: 2.0.2
|
||||
|
||||
crypto-js@4.2.0: {}
|
||||
|
||||
csstype@3.1.3:
|
||||
optional: true
|
||||
|
||||
@@ -3555,6 +3600,8 @@ snapshots:
|
||||
optionalDependencies:
|
||||
'@pkgjs/parseargs': 0.11.0
|
||||
|
||||
jose@5.10.0: {}
|
||||
|
||||
jose@6.1.0: {}
|
||||
|
||||
js-beautify@1.15.4:
|
||||
@@ -3726,6 +3773,8 @@ snapshots:
|
||||
sax: 1.3.0
|
||||
optional: true
|
||||
|
||||
neverthrow@7.2.0: {}
|
||||
|
||||
node-fetch@2.7.0:
|
||||
dependencies:
|
||||
whatwg-url: 5.0.0
|
||||
|
||||
Reference in New Issue
Block a user