diff --git a/.changeset/little-cows-melt.md b/.changeset/little-cows-melt.md new file mode 100644 index 0000000..3a85ed8 --- /dev/null +++ b/.changeset/little-cows-melt.md @@ -0,0 +1,5 @@ +--- +"@kubiks/otel-inbound": minor +--- + +Initial release diff --git a/README.md b/README.md index 38033bc..85dbce7 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,7 @@ Our goal is to bring the TypeScript ecosystem the observability tools it’s bee - [`@kubiks/otel-autumn`](./packages/otel-autumn/README.md) - [`@kubiks/otel-better-auth`](./packages/otel-better-auth/README.md) - [`@kubiks/otel-drizzle`](./packages/otel-drizzle/README.md) +- [`@kubiks/otel-inbound`](./packages/otel-inbound/README.md) - [`@kubiks/otel-resend`](./packages/otel-resend/README.md) - [`@kubiks/otel-upstash-queues`](./packages/otel-upstash-queues/README.md) diff --git a/images/otel-inbound-trace.png b/images/otel-inbound-trace.png new file mode 100644 index 0000000..b598e61 Binary files /dev/null and b/images/otel-inbound-trace.png differ diff --git a/packages/otel-inbound/CHANGELOG.md b/packages/otel-inbound/CHANGELOG.md new file mode 100644 index 0000000..4615cc9 --- /dev/null +++ b/packages/otel-inbound/CHANGELOG.md @@ -0,0 +1,14 @@ +# @kubiks/otel-inbound + +## 1.0.0 + +### Initial Release + +- OpenTelemetry instrumentation for Inbound email API +- Support for all email operations (send, schedule, reply, retrieve) +- Support for management operations (endpoints, addresses, domains) +- Support for thread and attachment operations +- Webhook receiver instrumentation for incoming emails +- Comprehensive span attributes for debugging and monitoring +- Optional email content capture with configurable truncation + diff --git a/packages/otel-inbound/LICENSE b/packages/otel-inbound/LICENSE new file mode 100644 index 0000000..3162213 --- /dev/null +++ b/packages/otel-inbound/LICENSE @@ -0,0 +1,22 @@ +MIT License + +Copyright (c) 2025 Kubiks + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + diff --git a/packages/otel-inbound/README.md b/packages/otel-inbound/README.md new file mode 100644 index 0000000..4d2dea6 --- /dev/null +++ b/packages/otel-inbound/README.md @@ -0,0 +1,280 @@ +# @kubiks/otel-inbound + +OpenTelemetry instrumentation for the [Inbound](https://inbound.new) email API SDK. +Capture spans for every Inbound API operation, enrich them with detailed metadata, +and monitor your complete email workflow from traces. + +![Inbound Trace Visualization](https://github.com/kubiks-inc/otel/blob/main/images/otel-inbound-trace.png) + +_Visualize your email operations with detailed span information including recipients, subjects, scheduling, and webhook processing._ + +## Installation + +```bash +npm install @kubiks/otel-inbound +# or +pnpm add @kubiks/otel-inbound +``` + +**Peer Dependencies:** `@opentelemetry/api` >= 1.9.0, `@inboundemail/sdk` >= 4.0.0 + +## Quick Start + +```ts +import { Inbound } from "@inboundemail/sdk"; +import { instrumentInbound } from "@kubiks/otel-inbound"; + +const inbound = instrumentInbound(new Inbound(process.env.INBOUND_API_KEY!)); + +await inbound.emails.send({ + from: "hello@example.com", + to: ["user@example.com"], + subject: "Welcome", + html: "

Hello world

", +}); +``` + +`instrumentInbound` wraps the instance you already use — no configuration changes +needed. Every SDK call creates a client span with useful attributes. + +## What Gets Traced + +This instrumentation wraps all Inbound API operations, creating spans for each: + +### Email Operations +- `emails.send()` - Send email +- `emails.schedule()` - Schedule email for later delivery +- `emails.reply()` - Reply to an existing email thread +- `emails.retrieve()` - Retrieve email details +- `emails.listScheduled()` - List scheduled emails +- `emails.getScheduled()` - Get specific scheduled email +- `emails.cancelScheduled()` - Cancel a scheduled email + +### Management Operations +- **Endpoints**: `list()`, `create()`, `get()`, `update()`, `delete()` +- **Addresses**: `list()`, `create()`, `get()`, `update()`, `delete()` +- **Domains**: `list()`, `create()`, `get()`, `update()`, `delete()`, `getDNS()` + +### Thread & Attachment Operations +- **Threads**: `list()`, `get()`, `actions()`, `statistics()` +- **Attachments**: `download()` + +### Webhook Receivers +- Incoming email webhooks (via `instrumentInboundWebhook`) + +## Span Attributes + +Each span includes relevant attributes based on the operation type: + +### Base Attributes (All Operations) + +| Attribute | Description | Example | +| ----------------------- | ---------------------------- | ------------------------ | +| `messaging.system` | Constant value `inbound` | `inbound` | +| `messaging.operation` | Operation type | `send`, `schedule`, etc. | +| `inbound.resource` | Resource being accessed | `emails`, `endpoints` | +| `inbound.target` | Full operation target | `emails.send` | + +### Email Attributes + +| Attribute | Description | Example | +| --------------------------- | --------------------------------- | --------------------------------------- | +| `inbound.message_id` | Message ID returned by Inbound | `msg_123` | +| `inbound.to_addresses` | Comma-separated TO addresses | `user@example.com, another@example.com` | +| `inbound.cc_addresses` | Comma-separated CC addresses | `cc@example.com` | +| `inbound.bcc_addresses` | Comma-separated BCC addresses | `bcc@example.com` | +| `inbound.recipient_count` | Total number of recipients | `3` | +| `inbound.from` | Sender email address | `noreply@example.com` | +| `inbound.subject` | Email subject line | `Welcome to our service` | +| `inbound.html_content` | HTML content (if capture enabled) | `

Hello

` | +| `inbound.text_content` | Text content (if capture enabled) | `Hello` | + +### Scheduling Attributes + +| Attribute | Description | Example | +| ----------------------- | ---------------------------- | ------------------------ | +| `inbound.scheduled_at` | Scheduled delivery time | `2025-01-01T00:00:00Z` | +| `inbound.schedule_id` | Schedule ID from API | `sched_123` | + +### Management Attributes + +| Attribute | Description | Example | +| ---------------------- | ------------------------- | ------------ | +| `inbound.endpoint_id` | Endpoint identifier | `ep_123` | +| `inbound.domain_id` | Domain identifier | `dom_123` | +| `inbound.address_id` | Email address identifier | `addr_123` | + +### Thread & Attachment Attributes + +| Attribute | Description | Example | +| ------------------------ | -------------------------- | -------------- | +| `inbound.thread_id` | Email thread identifier | `thread_123` | +| `inbound.attachment_id` | Attachment identifier | `attach_123` | + +### Webhook Attributes + +| Attribute | Description | Example | +| ---------------------- | ------------------------------ | ---------------- | +| `inbound.webhook_id` | Webhook identifier from header | `webhook_123` | +| `http.status_code` | HTTP response status code | `200` | + +## Advanced Usage + +### Webhook Receiver Instrumentation + +Instrument Next.js route handlers that receive incoming emails: + +```ts +import { instrumentInboundWebhook } from "@kubiks/otel-inbound"; + +export const POST = instrumentInboundWebhook(async (request: Request) => { + const email = await request.json(); + + // Process incoming email + console.log('Received email from:', email.from); + console.log('Subject:', email.subject); + + // Your email processing logic here + await processIncomingEmail(email); + + return Response.json({ success: true }); +}); +``` + +This creates SERVER spans (SpanKind.SERVER) that automatically capture: +- Email metadata from webhook payload +- Webhook headers +- Response status +- Any errors during processing + +### Configuration Options + +Control what data is captured in your spans: + +```ts +import { instrumentInbound, type InstrumentInboundConfig } from "@kubiks/otel-inbound"; + +const config: InstrumentInboundConfig = { + // Capture email HTML/text content in spans (default: false) + captureEmailContent: true, + + // Maximum content length before truncation (default: 1024) + maxContentLength: 2048, +}; + +const inbound = instrumentInbound( + new Inbound(process.env.INBOUND_API_KEY!), + config +); +``` + +**Note:** Be cautious when enabling `captureEmailContent` as it may capture sensitive information in your traces. + +### Scheduling Emails + +```ts +await inbound.emails.schedule({ + from: "noreply@example.com", + to: "user@example.com", + subject: "Scheduled Newsletter", + html: "

Weekly update

", + scheduledAt: "2025-01-01T09:00:00Z", +}); + +// List all scheduled emails +const scheduled = await inbound.emails.listScheduled(); + +// Cancel a scheduled email +await inbound.emails.cancelScheduled("sched_123"); +``` + +### Reply to Emails + +```ts +await inbound.emails.reply({ + from: "support@example.com", + to: "customer@example.com", + subject: "Re: Support Request", + html: "

Thanks for reaching out!

", + threadId: "thread_123", // Thread ID from webhook payload +}); +``` + +### Domain Management + +```ts +// Create a domain +const domain = await inbound.domains.create({ + domain: "yourdomain.com", +}); + +// Get DNS records for verification +const dns = await inbound.domains.getDNS(domain.data.id); +console.log("Add these DNS records:", dns.data.records); + +// List all domains +const domains = await inbound.domains.list(); +``` + +### Endpoint Management + +```ts +// Create webhook endpoint +const endpoint = await inbound.endpoints.create({ + url: "https://yourdomain.com/webhook", + events: ["email.received"], +}); + +// Update endpoint +await inbound.endpoints.update(endpoint.data.id, { + url: "https://yourdomain.com/new-webhook", +}); + +// Delete endpoint +await inbound.endpoints.delete(endpoint.data.id); +``` + +### Complete Example with Webhook + +```ts +import { Inbound } from "@inboundemail/sdk"; +import { instrumentInbound, instrumentInboundWebhook } from "@kubiks/otel-inbound"; + +// Instrument the Inbound client +const inbound = instrumentInbound( + new Inbound(process.env.INBOUND_API_KEY!), + { captureEmailContent: true } +); + +// Send an email +await inbound.emails.send({ + from: "hello@yourdomain.com", + to: "user@example.com", + subject: "Welcome!", + html: "

Thanks for signing up!

", +}); + +// Webhook handler for receiving emails +export const POST = instrumentInboundWebhook( + async (request: Request) => { + const email = await request.json(); + + // Automatically reply to incoming emails + await inbound.emails.reply({ + from: email.to, + to: email.from, + subject: `Re: ${email.subject}`, + html: "

Thanks for your email! We'll get back to you soon.

", + threadId: email.threadId, + }); + + return Response.json({ processed: true }); + }, + { captureEmailContent: true } +); +``` + +## License + +MIT + diff --git a/packages/otel-inbound/package.json b/packages/otel-inbound/package.json new file mode 100644 index 0000000..94e18e3 --- /dev/null +++ b/packages/otel-inbound/package.json @@ -0,0 +1,54 @@ +{ + "name": "@kubiks/otel-inbound", + "version": "1.0.0", + "private": false, + "publishConfig": { + "access": "public" + }, + "description": "OpenTelemetry instrumentation for the Inbound email API SDK", + "author": "Kubiks", + "license": "MIT", + "repository": "kubiks-inc/otel", + "sideEffects": false, + "type": "module", + "exports": { + ".": { + "types": "./dist/types/index.d.ts", + "import": "./dist/index.js", + "default": "./dist/index.js" + } + }, + "main": "./dist/index.js", + "types": "./dist/types/index.d.ts", + "files": [ + "dist", + "LICENSE", + "README.md" + ], + "engines": { + "node": "^18.19.0 || >=20.6.0" + }, + "scripts": { + "build": "pnpm clean && tsc", + "clean": "rimraf dist", + "prepublishOnly": "pnpm build", + "type-check": "tsc --noEmit", + "unit-test": "vitest --run", + "unit-test-watch": "vitest" + }, + "dependencies": {}, + "devDependencies": { + "@opentelemetry/api": "^1.9.0", + "@opentelemetry/sdk-trace-base": "^2.1.0", + "@types/node": "18.15.11", + "@inboundemail/sdk": "^4.3.1", + "rimraf": "3.0.2", + "typescript": "^5", + "vitest": "0.33.0" + }, + "peerDependencies": { + "@opentelemetry/api": ">=1.9.0 <2.0.0", + "@inboundemail/sdk": ">=4.0.0" + } +} + diff --git a/packages/otel-inbound/src/index.test.ts b/packages/otel-inbound/src/index.test.ts new file mode 100644 index 0000000..e571302 --- /dev/null +++ b/packages/otel-inbound/src/index.test.ts @@ -0,0 +1,687 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { SpanStatusCode, trace } from "@opentelemetry/api"; +import { + BasicTracerProvider, + InMemorySpanExporter, + SimpleSpanProcessor, +} from "@opentelemetry/sdk-trace-base"; +import { + instrumentInbound, + instrumentInboundWebhook, + SEMATTRS_MESSAGING_OPERATION, + SEMATTRS_MESSAGING_SYSTEM, + SEMATTRS_INBOUND_MESSAGE_ID, + SEMATTRS_INBOUND_RECIPIENT_COUNT, + SEMATTRS_INBOUND_RESOURCE, + SEMATTRS_INBOUND_TARGET, + SEMATTRS_INBOUND_TO_ADDRESSES, + SEMATTRS_INBOUND_CC_ADDRESSES, + SEMATTRS_INBOUND_BCC_ADDRESSES, + SEMATTRS_INBOUND_FROM, + SEMATTRS_INBOUND_SUBJECT, + SEMATTRS_INBOUND_HTML_CONTENT, + SEMATTRS_INBOUND_TEXT_CONTENT, + SEMATTRS_INBOUND_SCHEDULED_AT, + SEMATTRS_INBOUND_SCHEDULE_ID, + SEMATTRS_INBOUND_ENDPOINT_ID, + SEMATTRS_INBOUND_DOMAIN_ID, + SEMATTRS_INBOUND_ADDRESS_ID, + SEMATTRS_INBOUND_THREAD_ID, + SEMATTRS_INBOUND_ATTACHMENT_ID, + SEMATTRS_HTTP_STATUS_CODE, +} from "./index"; + +describe("instrumentInbound", () => { + let provider: BasicTracerProvider; + let exporter: InMemorySpanExporter; + + beforeEach(() => { + exporter = new InMemorySpanExporter(); + provider = new BasicTracerProvider({ + spanProcessors: [new SimpleSpanProcessor(exporter)], + }); + trace.setGlobalTracerProvider(provider); + }); + + afterEach(async () => { + await provider.shutdown(); + exporter.reset(); + trace.disable(); + }); + + const createMockInbound = () => { + return { + emails: { + send: vi.fn(async (payload: any) => ({ + data: { id: "email_123" }, + error: null, + })), + schedule: vi.fn(async (payload: any) => ({ + data: { id: "email_123", scheduleId: "sched_456" }, + error: null, + })), + reply: vi.fn(async (payload: any) => ({ + data: { id: "email_789" }, + error: null, + })), + retrieve: vi.fn(async (id: string) => ({ + data: { id, from: "sender@example.com", to: "user@example.com" }, + error: null, + })), + listScheduled: vi.fn(async () => ({ + data: [{ id: "sched_1" }, { id: "sched_2" }], + error: null, + })), + getScheduled: vi.fn(async (id: string) => ({ + data: { id, scheduledAt: "2025-01-01T00:00:00Z" }, + error: null, + })), + cancelScheduled: vi.fn(async (id: string) => ({ + data: { success: true }, + error: null, + })), + }, + endpoints: { + list: vi.fn(async () => ({ + data: [{ id: "ep_1" }, { id: "ep_2" }], + error: null, + })), + create: vi.fn(async (payload: any) => ({ + data: { id: "ep_123" }, + error: null, + })), + get: vi.fn(async (id: string) => ({ + data: { id }, + error: null, + })), + update: vi.fn(async (id: string, payload: any) => ({ + data: { id }, + error: null, + })), + delete: vi.fn(async (id: string) => ({ + data: { success: true }, + error: null, + })), + }, + addresses: { + list: vi.fn(async () => ({ + data: [{ id: "addr_1" }, { id: "addr_2" }], + error: null, + })), + create: vi.fn(async (payload: any) => ({ + data: { id: "addr_123" }, + error: null, + })), + get: vi.fn(async (id: string) => ({ + data: { id }, + error: null, + })), + update: vi.fn(async (id: string, payload: any) => ({ + data: { id }, + error: null, + })), + delete: vi.fn(async (id: string) => ({ + data: { success: true }, + error: null, + })), + }, + domains: { + list: vi.fn(async () => ({ + data: [{ id: "dom_1" }, { id: "dom_2" }], + error: null, + })), + create: vi.fn(async (payload: any) => ({ + data: { id: "dom_123" }, + error: null, + })), + get: vi.fn(async (id: string) => ({ + data: { id }, + error: null, + })), + update: vi.fn(async (id: string, payload: any) => ({ + data: { id }, + error: null, + })), + delete: vi.fn(async (id: string) => ({ + data: { success: true }, + error: null, + })), + getDNS: vi.fn(async (id: string) => ({ + data: { records: [] }, + error: null, + })), + }, + threads: { + list: vi.fn(async () => ({ + data: [{ id: "thread_1" }, { id: "thread_2" }], + error: null, + })), + get: vi.fn(async (id: string) => ({ + data: { id }, + error: null, + })), + actions: vi.fn(async (id: string, action: any) => ({ + data: { success: true }, + error: null, + })), + statistics: vi.fn(async () => ({ + data: { total: 100 }, + error: null, + })), + }, + attachments: { + download: vi.fn(async (id: string) => ({ + data: new Blob(), + error: null, + })), + }, + }; + }; + + describe("Email Operations", () => { + it("instruments emails.send with all attributes", async () => { + const inbound = createMockInbound(); + instrumentInbound(inbound); + + const payload = { + to: ["user@example.com", "second@example.com"], + cc: ["cc@example.com"], + bcc: "bcc@example.com", + from: "sender@example.com", + subject: "Test Email", + html: "

Hello

", + text: "Hello", + }; + + const response = await inbound.emails.send(payload); + expect(response.data?.id).toBe("email_123"); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(1); + + const span = spans[0]; + expect(span.name).toBe("inbound.emails.send"); + expect(span.attributes[SEMATTRS_MESSAGING_SYSTEM]).toBe("inbound"); + expect(span.attributes[SEMATTRS_MESSAGING_OPERATION]).toBe("send"); + expect(span.attributes[SEMATTRS_INBOUND_RESOURCE]).toBe("emails"); + expect(span.attributes[SEMATTRS_INBOUND_TARGET]).toBe("emails.send"); + expect(span.attributes[SEMATTRS_INBOUND_MESSAGE_ID]).toBe("email_123"); + expect(span.attributes[SEMATTRS_INBOUND_RECIPIENT_COUNT]).toBe(4); + expect(span.attributes[SEMATTRS_INBOUND_TO_ADDRESSES]).toBe( + "user@example.com, second@example.com" + ); + expect(span.attributes[SEMATTRS_INBOUND_CC_ADDRESSES]).toBe("cc@example.com"); + expect(span.attributes[SEMATTRS_INBOUND_BCC_ADDRESSES]).toBe("bcc@example.com"); + expect(span.attributes[SEMATTRS_INBOUND_FROM]).toBe("sender@example.com"); + expect(span.attributes[SEMATTRS_INBOUND_SUBJECT]).toBe("Test Email"); + expect(span.status.code).toBe(SpanStatusCode.OK); + }); + + it("instruments emails.schedule with scheduling attributes", async () => { + const inbound = createMockInbound(); + instrumentInbound(inbound); + + const payload = { + to: "user@example.com", + from: "sender@example.com", + subject: "Scheduled Email", + html: "

Hello

", + scheduledAt: "2025-01-01T00:00:00Z", + }; + + const response = await inbound.emails.schedule(payload); + expect(response.data?.scheduleId).toBe("sched_456"); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(1); + + const span = spans[0]; + expect(span.name).toBe("inbound.emails.schedule"); + expect(span.attributes[SEMATTRS_MESSAGING_OPERATION]).toBe("schedule"); + expect(span.attributes[SEMATTRS_INBOUND_SCHEDULED_AT]).toBe("2025-01-01T00:00:00Z"); + expect(span.attributes[SEMATTRS_INBOUND_SCHEDULE_ID]).toBe("sched_456"); + }); + + it("instruments emails.reply with thread tracking", async () => { + const inbound = createMockInbound(); + instrumentInbound(inbound); + + const payload = { + to: "user@example.com", + from: "sender@example.com", + subject: "Re: Test", + html: "

Reply

", + threadId: "thread_123", + }; + + await inbound.emails.reply(payload); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(1); + + const span = spans[0]; + expect(span.name).toBe("inbound.emails.reply"); + expect(span.attributes[SEMATTRS_MESSAGING_OPERATION]).toBe("reply"); + expect(span.attributes[SEMATTRS_INBOUND_THREAD_ID]).toBe("thread_123"); + }); + + it("instruments emails.retrieve", async () => { + const inbound = createMockInbound(); + instrumentInbound(inbound); + + await inbound.emails.retrieve("email_123"); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(1); + + const span = spans[0]; + expect(span.name).toBe("inbound.emails.retrieve"); + expect(span.attributes[SEMATTRS_MESSAGING_OPERATION]).toBe("retrieve"); + expect(span.attributes[SEMATTRS_INBOUND_MESSAGE_ID]).toBe("email_123"); + }); + + it("instruments scheduled email operations", async () => { + const inbound = createMockInbound(); + instrumentInbound(inbound); + + await inbound.emails.listScheduled(); + await inbound.emails.getScheduled("sched_123"); + await inbound.emails.cancelScheduled("sched_123"); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(3); + + expect(spans[0].name).toBe("inbound.emails.listScheduled"); + expect(spans[0].attributes[SEMATTRS_MESSAGING_OPERATION]).toBe("list"); + + expect(spans[1].name).toBe("inbound.emails.getScheduled"); + expect(spans[1].attributes[SEMATTRS_INBOUND_SCHEDULE_ID]).toBe("sched_123"); + + expect(spans[2].name).toBe("inbound.emails.cancelScheduled"); + expect(spans[2].attributes[SEMATTRS_MESSAGING_OPERATION]).toBe("cancel"); + }); + + it("captures email content when enabled", async () => { + const inbound = createMockInbound(); + instrumentInbound(inbound, { captureEmailContent: true }); + + const payload = { + to: "user@example.com", + from: "sender@example.com", + subject: "Test", + html: "

HTML content

", + text: "Text content", + }; + + await inbound.emails.send(payload); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(1); + + const span = spans[0]; + expect(span.attributes[SEMATTRS_INBOUND_HTML_CONTENT]).toBe("

HTML content

"); + expect(span.attributes[SEMATTRS_INBOUND_TEXT_CONTENT]).toBe("Text content"); + }); + + it("truncates long content", async () => { + const inbound = createMockInbound(); + instrumentInbound(inbound, { + captureEmailContent: true, + maxContentLength: 10, + }); + + const payload = { + to: "user@example.com", + from: "sender@example.com", + subject: "Test", + html: "This is a very long HTML content that should be truncated", + }; + + await inbound.emails.send(payload); + + const spans = exporter.getFinishedSpans(); + const span = spans[0]; + expect(span.attributes[SEMATTRS_INBOUND_HTML_CONTENT]).toContain("... (truncated)"); + }); + }); + + describe("Management Operations", () => { + it("instruments endpoint operations", async () => { + const inbound = createMockInbound(); + instrumentInbound(inbound); + + await inbound.endpoints.list(); + await inbound.endpoints.create({ url: "https://example.com" }); + await inbound.endpoints.get("ep_123"); + await inbound.endpoints.update("ep_123", { url: "https://new.com" }); + await inbound.endpoints.delete("ep_123"); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(5); + + expect(spans[0].name).toBe("inbound.endpoints.list"); + expect(spans[0].attributes[SEMATTRS_INBOUND_RESOURCE]).toBe("endpoints"); + + expect(spans[1].name).toBe("inbound.endpoints.create"); + expect(spans[1].attributes[SEMATTRS_INBOUND_ENDPOINT_ID]).toBe("ep_123"); + + expect(spans[2].name).toBe("inbound.endpoints.get"); + expect(spans[2].attributes[SEMATTRS_INBOUND_ENDPOINT_ID]).toBe("ep_123"); + + expect(spans[3].name).toBe("inbound.endpoints.update"); + expect(spans[4].name).toBe("inbound.endpoints.delete"); + }); + + it("instruments address operations", async () => { + const inbound = createMockInbound(); + instrumentInbound(inbound); + + await inbound.addresses.list(); + await inbound.addresses.create({ email: "test@example.com" }); + await inbound.addresses.get("addr_123"); + await inbound.addresses.update("addr_123", { name: "Test" }); + await inbound.addresses.delete("addr_123"); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(5); + + expect(spans[0].name).toBe("inbound.addresses.list"); + expect(spans[1].name).toBe("inbound.addresses.create"); + expect(spans[1].attributes[SEMATTRS_INBOUND_ADDRESS_ID]).toBe("addr_123"); + expect(spans[2].attributes[SEMATTRS_INBOUND_ADDRESS_ID]).toBe("addr_123"); + }); + + it("instruments domain operations", async () => { + const inbound = createMockInbound(); + instrumentInbound(inbound); + + await inbound.domains.list(); + await inbound.domains.create({ domain: "example.com" }); + await inbound.domains.get("dom_123"); + await inbound.domains.update("dom_123", { name: "Example" }); + await inbound.domains.delete("dom_123"); + await inbound.domains.getDNS("dom_123"); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(6); + + expect(spans[0].name).toBe("inbound.domains.list"); + expect(spans[1].name).toBe("inbound.domains.create"); + expect(spans[1].attributes[SEMATTRS_INBOUND_DOMAIN_ID]).toBe("dom_123"); + expect(spans[5].name).toBe("inbound.domains.getDNS"); + expect(spans[5].attributes[SEMATTRS_INBOUND_DOMAIN_ID]).toBe("dom_123"); + }); + }); + + describe("Thread Operations", () => { + it("instruments thread operations", async () => { + const inbound = createMockInbound(); + instrumentInbound(inbound); + + await inbound.threads.list(); + await inbound.threads.get("thread_123"); + await inbound.threads.actions("thread_123", { action: "archive" }); + await inbound.threads.statistics(); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(4); + + expect(spans[0].name).toBe("inbound.threads.list"); + expect(spans[1].name).toBe("inbound.threads.get"); + expect(spans[1].attributes[SEMATTRS_INBOUND_THREAD_ID]).toBe("thread_123"); + expect(spans[2].name).toBe("inbound.threads.actions"); + expect(spans[3].name).toBe("inbound.threads.statistics"); + }); + }); + + describe("Attachment Operations", () => { + it("instruments attachment download", async () => { + const inbound = createMockInbound(); + instrumentInbound(inbound); + + await inbound.attachments.download("attach_123"); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(1); + + const span = spans[0]; + expect(span.name).toBe("inbound.attachments.download"); + expect(span.attributes[SEMATTRS_INBOUND_ATTACHMENT_ID]).toBe("attach_123"); + }); + }); + + describe("Error Handling", () => { + it("captures errors and marks span status", async () => { + const inbound = createMockInbound(); + inbound.emails.send = vi.fn().mockRejectedValue(new Error("API Error")); + + instrumentInbound(inbound); + + await expect( + inbound.emails.send({ + to: "test@example.com", + from: "sender@example.com", + subject: "Test", + html: "

Test

", + }) + ).rejects.toThrowError("API Error"); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(1); + + const span = spans[0]; + expect(span.status.code).toBe(SpanStatusCode.ERROR); + const hasException = span.events.some((event) => event.name === "exception"); + expect(hasException).toBe(true); + }); + }); + + describe("Idempotency", () => { + it("is idempotent", async () => { + const inbound = createMockInbound(); + const first = instrumentInbound(inbound); + const second = instrumentInbound(first); + + expect(first).toBe(second); + + await second.emails.send({ + to: "test@example.com", + from: "sender@example.com", + subject: "Test", + html: "

Test

", + }); + + expect(exporter.getFinishedSpans()).toHaveLength(1); + }); + }); + + describe("Edge Cases", () => { + it("handles missing optional fields gracefully", async () => { + const inbound = createMockInbound(); + instrumentInbound(inbound); + + const payload = { + to: "single@example.com", + from: "sender@example.com", + subject: "Test", + html: "

Test

", + }; + + await inbound.emails.send(payload); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(1); + + const span = spans[0]; + expect(span.attributes[SEMATTRS_INBOUND_RECIPIENT_COUNT]).toBe(1); + expect(span.attributes[SEMATTRS_INBOUND_TO_ADDRESSES]).toBe("single@example.com"); + expect(span.attributes[SEMATTRS_INBOUND_CC_ADDRESSES]).toBeUndefined(); + expect(span.attributes[SEMATTRS_INBOUND_BCC_ADDRESSES]).toBeUndefined(); + }); + + it("handles mixed string and array formats", async () => { + const inbound = createMockInbound(); + instrumentInbound(inbound); + + const payload = { + to: "single@example.com", + cc: ["cc1@example.com", "cc2@example.com"], + bcc: ["bcc@example.com"], + from: "sender@example.com", + subject: "Mixed Format", + html: "

Test

", + }; + + await inbound.emails.send(payload); + + const spans = exporter.getFinishedSpans(); + const span = spans[0]; + + expect(span.attributes[SEMATTRS_INBOUND_TO_ADDRESSES]).toBe("single@example.com"); + expect(span.attributes[SEMATTRS_INBOUND_CC_ADDRESSES]).toBe( + "cc1@example.com, cc2@example.com" + ); + expect(span.attributes[SEMATTRS_INBOUND_BCC_ADDRESSES]).toBe("bcc@example.com"); + expect(span.attributes[SEMATTRS_INBOUND_RECIPIENT_COUNT]).toBe(4); + }); + }); +}); + +describe("instrumentInboundWebhook", () => { + let provider: BasicTracerProvider; + let exporter: InMemorySpanExporter; + + beforeEach(() => { + exporter = new InMemorySpanExporter(); + provider = new BasicTracerProvider({ + spanProcessors: [new SimpleSpanProcessor(exporter)], + }); + trace.setGlobalTracerProvider(provider); + }); + + afterEach(async () => { + await provider.shutdown(); + exporter.reset(); + trace.disable(); + }); + + it("instruments webhook receivers", async () => { + const handler = vi.fn(async (request: Request) => { + return Response.json({ success: true }); + }); + + const instrumentedHandler = instrumentInboundWebhook(handler); + + const emailPayload = { + from: "sender@example.com", + to: "recipient@example.com", + subject: "Webhook Test", + html: "

Test

", + messageId: "msg_123", + }; + + const request = new Request("https://example.com/webhook", { + method: "POST", + headers: { + "content-type": "application/json", + "x-webhook-id": "webhook_123", + "x-message-id": "msg_123", + }, + body: JSON.stringify(emailPayload), + }); + + const response = await instrumentedHandler(request); + expect(response.status).toBe(200); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(1); + + const span = spans[0]; + expect(span.name).toBe("inbound.webhook.receive"); + expect(span.attributes[SEMATTRS_MESSAGING_SYSTEM]).toBe("inbound"); + expect(span.attributes[SEMATTRS_MESSAGING_OPERATION]).toBe("receive"); + expect(span.attributes[SEMATTRS_INBOUND_RESOURCE]).toBe("webhook"); + expect(span.attributes[SEMATTRS_INBOUND_FROM]).toBe("sender@example.com"); + expect(span.attributes[SEMATTRS_INBOUND_SUBJECT]).toBe("Webhook Test"); + expect(span.attributes[SEMATTRS_INBOUND_MESSAGE_ID]).toBe("msg_123"); + expect(span.attributes[SEMATTRS_HTTP_STATUS_CODE]).toBe(200); + expect(span.status.code).toBe(SpanStatusCode.OK); + }); + + it("captures webhook email content when enabled", async () => { + const handler = vi.fn(async () => Response.json({ success: true })); + const instrumentedHandler = instrumentInboundWebhook(handler, { + captureEmailContent: true, + }); + + const emailPayload = { + from: "sender@example.com", + to: "recipient@example.com", + subject: "Content Test", + html: "

HTML content

", + text: "Text content", + }; + + const request = new Request("https://example.com/webhook", { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify(emailPayload), + }); + + await instrumentedHandler(request); + + const spans = exporter.getFinishedSpans(); + const span = spans[0]; + + expect(span.attributes[SEMATTRS_INBOUND_HTML_CONTENT]).toBe("

HTML content

"); + expect(span.attributes[SEMATTRS_INBOUND_TEXT_CONTENT]).toBe("Text content"); + }); + + it("handles webhook errors", async () => { + const handler = vi.fn(async () => { + throw new Error("Webhook processing failed"); + }); + + const instrumentedHandler = instrumentInboundWebhook(handler); + + const request = new Request("https://example.com/webhook", { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ from: "test@example.com" }), + }); + + await expect(instrumentedHandler(request)).rejects.toThrowError( + "Webhook processing failed" + ); + + const spans = exporter.getFinishedSpans(); + const span = spans[0]; + + expect(span.status.code).toBe(SpanStatusCode.ERROR); + const hasException = span.events.some((event) => event.name === "exception"); + expect(hasException).toBe(true); + }); + + it("marks non-2xx responses as errors", async () => { + const handler = vi.fn(async () => { + return new Response("Bad Request", { status: 400 }); + }); + + const instrumentedHandler = instrumentInboundWebhook(handler); + + const request = new Request("https://example.com/webhook", { + method: "POST", + body: JSON.stringify({}), + }); + + const response = await instrumentedHandler(request); + expect(response.status).toBe(400); + + const spans = exporter.getFinishedSpans(); + const span = spans[0]; + + expect(span.attributes[SEMATTRS_HTTP_STATUS_CODE]).toBe(400); + expect(span.status.code).toBe(SpanStatusCode.ERROR); + }); +}); + diff --git a/packages/otel-inbound/src/index.ts b/packages/otel-inbound/src/index.ts new file mode 100644 index 0000000..ee04575 --- /dev/null +++ b/packages/otel-inbound/src/index.ts @@ -0,0 +1,906 @@ +import { + context, + SpanKind, + SpanStatusCode, + trace, + type Span, +} from "@opentelemetry/api"; + +const DEFAULT_TRACER_NAME = "@kubiks/otel-inbound"; +const INSTRUMENTED_FLAG = Symbol("kubiksOtelInboundInstrumented"); + +// Semantic attribute constants - Base +export const SEMATTRS_MESSAGING_SYSTEM = "messaging.system" as const; +export const SEMATTRS_MESSAGING_OPERATION = "messaging.operation" as const; +export const SEMATTRS_INBOUND_RESOURCE = "inbound.resource" as const; +export const SEMATTRS_INBOUND_TARGET = "inbound.target" as const; + +// Email-specific attributes +export const SEMATTRS_INBOUND_MESSAGE_ID = "inbound.message_id" as const; +export const SEMATTRS_INBOUND_TO_ADDRESSES = "inbound.to_addresses" as const; +export const SEMATTRS_INBOUND_CC_ADDRESSES = "inbound.cc_addresses" as const; +export const SEMATTRS_INBOUND_BCC_ADDRESSES = "inbound.bcc_addresses" as const; +export const SEMATTRS_INBOUND_RECIPIENT_COUNT = "inbound.recipient_count" as const; +export const SEMATTRS_INBOUND_FROM = "inbound.from" as const; +export const SEMATTRS_INBOUND_SUBJECT = "inbound.subject" as const; +export const SEMATTRS_INBOUND_HTML_CONTENT = "inbound.html_content" as const; +export const SEMATTRS_INBOUND_TEXT_CONTENT = "inbound.text_content" as const; + +// Scheduling attributes +export const SEMATTRS_INBOUND_SCHEDULED_AT = "inbound.scheduled_at" as const; +export const SEMATTRS_INBOUND_SCHEDULE_ID = "inbound.schedule_id" as const; + +// Management attributes +export const SEMATTRS_INBOUND_ENDPOINT_ID = "inbound.endpoint_id" as const; +export const SEMATTRS_INBOUND_DOMAIN_ID = "inbound.domain_id" as const; +export const SEMATTRS_INBOUND_ADDRESS_ID = "inbound.address_id" as const; +export const SEMATTRS_INBOUND_THREAD_ID = "inbound.thread_id" as const; +export const SEMATTRS_INBOUND_ATTACHMENT_ID = "inbound.attachment_id" as const; + +// Webhook-specific attributes +export const SEMATTRS_INBOUND_WEBHOOK_ID = "inbound.webhook_id" as const; +export const SEMATTRS_HTTP_STATUS_CODE = "http.status_code" as const; + +// Configuration interface +export interface InstrumentInboundConfig { + /** + * Whether to capture email content (html/text) in spans. + * @default false + */ + captureEmailContent?: boolean; + + /** + * Maximum length of content to capture. Content longer than this will be truncated. + * @default 1024 + */ + maxContentLength?: number; +} + +interface InstrumentedInbound { + [INSTRUMENTED_FLAG]?: true; + emails?: any; + endpoints?: any; + addresses?: any; + domains?: any; + threads?: any; + attachments?: any; +} + +// Helper function to extract email addresses +function extractEmailAddresses(value: string | string[] | undefined): string[] { + if (!value) { + return []; + } + if (typeof value === "string") { + const trimmed = value.trim(); + return trimmed ? [trimmed] : []; + } + if (Array.isArray(value)) { + return value + .filter((email) => typeof email === "string" && email.trim()) + .map((email) => email.trim()); + } + return []; +} + +// Helper function to serialize and truncate content +function serializeContent(content: unknown, maxLength: number): string { + try { + const serialized = typeof content === "string" ? content : JSON.stringify(content); + if (serialized.length > maxLength) { + return serialized.substring(0, maxLength) + "... (truncated)"; + } + return serialized; + } catch (error) { + return "[Unable to serialize content]"; + } +} + +// Annotate email operation spans +function annotateEmailSpan( + span: Span, + operation: string, + resource: string, + payload: any, + config?: InstrumentInboundConfig +): void { + span.setAttributes({ + [SEMATTRS_MESSAGING_SYSTEM]: "inbound", + [SEMATTRS_MESSAGING_OPERATION]: operation, + [SEMATTRS_INBOUND_RESOURCE]: resource, + [SEMATTRS_INBOUND_TARGET]: `${resource}.${operation}`, + }); + + // Extract and set email addresses + const toAddresses = extractEmailAddresses(payload.to); + if (toAddresses.length > 0) { + span.setAttribute(SEMATTRS_INBOUND_TO_ADDRESSES, toAddresses.join(", ")); + } + + const ccAddresses = extractEmailAddresses(payload.cc); + if (ccAddresses.length > 0) { + span.setAttribute(SEMATTRS_INBOUND_CC_ADDRESSES, ccAddresses.join(", ")); + } + + const bccAddresses = extractEmailAddresses(payload.bcc); + if (bccAddresses.length > 0) { + span.setAttribute(SEMATTRS_INBOUND_BCC_ADDRESSES, bccAddresses.join(", ")); + } + + // Count recipients + const recipientCount = toAddresses.length + ccAddresses.length + bccAddresses.length; + if (recipientCount > 0) { + span.setAttribute(SEMATTRS_INBOUND_RECIPIENT_COUNT, recipientCount); + } + + // Set other email attributes + if (payload.from) { + span.setAttribute(SEMATTRS_INBOUND_FROM, payload.from); + } + + if (payload.subject) { + span.setAttribute(SEMATTRS_INBOUND_SUBJECT, payload.subject); + } + + // Capture email content if enabled + if (config?.captureEmailContent) { + const maxLength = config.maxContentLength ?? 1024; + + if (payload.html) { + span.setAttribute( + SEMATTRS_INBOUND_HTML_CONTENT, + serializeContent(payload.html, maxLength) + ); + } + + if (payload.text) { + span.setAttribute( + SEMATTRS_INBOUND_TEXT_CONTENT, + serializeContent(payload.text, maxLength) + ); + } + } + + // Scheduling attributes + if (payload.scheduledAt) { + span.setAttribute(SEMATTRS_INBOUND_SCHEDULED_AT, payload.scheduledAt); + } + + // Thread ID for replies + if (payload.threadId) { + span.setAttribute(SEMATTRS_INBOUND_THREAD_ID, payload.threadId); + } +} + +// Annotate management operation spans +function annotateManagementSpan( + span: Span, + operation: string, + resource: string, + payload?: any, + response?: any +): void { + span.setAttributes({ + [SEMATTRS_MESSAGING_SYSTEM]: "inbound", + [SEMATTRS_MESSAGING_OPERATION]: operation, + [SEMATTRS_INBOUND_RESOURCE]: resource, + [SEMATTRS_INBOUND_TARGET]: `${resource}.${operation}`, + }); + + // Set resource-specific IDs + if (response?.data?.id) { + if (resource === "endpoints") { + span.setAttribute(SEMATTRS_INBOUND_ENDPOINT_ID, response.data.id); + } else if (resource === "domains") { + span.setAttribute(SEMATTRS_INBOUND_DOMAIN_ID, response.data.id); + } else if (resource === "addresses") { + span.setAttribute(SEMATTRS_INBOUND_ADDRESS_ID, response.data.id); + } + } + + // Also check payload for ID (for get/update/delete operations) + if (payload && typeof payload === "string") { + if (resource === "endpoints") { + span.setAttribute(SEMATTRS_INBOUND_ENDPOINT_ID, payload); + } else if (resource === "domains") { + span.setAttribute(SEMATTRS_INBOUND_DOMAIN_ID, payload); + } else if (resource === "addresses") { + span.setAttribute(SEMATTRS_INBOUND_ADDRESS_ID, payload); + } + } +} + +// Annotate email response +function annotateEmailResponse(span: Span, response: any): void { + if (response?.data?.id) { + span.setAttribute(SEMATTRS_INBOUND_MESSAGE_ID, response.data.id); + } + + if (response?.data?.scheduleId) { + span.setAttribute(SEMATTRS_INBOUND_SCHEDULE_ID, response.data.scheduleId); + } +} + +// Finalize span with status +function finalizeSpan(span: Span, error?: unknown): void { + if (error) { + if (error instanceof Error) { + span.recordException(error); + } else { + span.recordException(new Error(String(error))); + } + span.setStatus({ code: SpanStatusCode.ERROR }); + } else { + span.setStatus({ code: SpanStatusCode.OK }); + } + span.end(); +} + +// Wrapper for async operations +function wrapAsyncOperation( + tracer: any, + spanName: string, + originalFn: Function, + annotator: (span: Span, ...args: any[]) => void, + config?: InstrumentInboundConfig +) { + return async function wrapped(...args: any[]): Promise { + const span = tracer.startSpan(spanName, { kind: SpanKind.CLIENT }); + + annotator(span, ...args); + + const activeContext = trace.setSpan(context.active(), span); + + try { + const response = await context.with(activeContext, () => + originalFn.apply(this, args) + ); + + annotateEmailResponse(span, response); + finalizeSpan(span); + + return response; + } catch (error) { + finalizeSpan(span, error); + throw error; + } + }; +} + +/** + * Instruments an Inbound client instance with OpenTelemetry tracing. + * + * @param client - The Inbound client instance to instrument + * @param config - Optional configuration for instrumentation + * @returns The instrumented client instance + * + * @example + * ```typescript + * import { Inbound } from '@inboundemail/sdk'; + * import { instrumentInbound } from '@kubiks/otel-inbound'; + * + * const inbound = instrumentInbound( + * new Inbound(process.env.INBOUND_API_KEY!), + * { captureEmailContent: true } + * ); + * + * await inbound.emails.send({ + * from: 'hello@example.com', + * to: 'user@example.com', + * subject: 'Welcome', + * html: '

Hello world

', + * }); + * ``` + */ +export function instrumentInbound( + client: T, + config?: InstrumentInboundConfig +): T { + // Check if already instrumented + if ((client as InstrumentedInbound)[INSTRUMENTED_FLAG]) { + return client; + } + + const tracer = trace.getTracer(DEFAULT_TRACER_NAME); + + // Instrument emails namespace + if (client.emails) { + const emails = client.emails; + + // emails.send + if (emails.send) { + const originalSend = emails.send.bind(emails); + emails.send = wrapAsyncOperation( + tracer, + "inbound.emails.send", + originalSend, + (span, payload) => annotateEmailSpan(span, "send", "emails", payload, config), + config + ); + } + + // emails.schedule + if (emails.schedule) { + const originalSchedule = emails.schedule.bind(emails); + emails.schedule = wrapAsyncOperation( + tracer, + "inbound.emails.schedule", + originalSchedule, + (span, payload) => annotateEmailSpan(span, "schedule", "emails", payload, config), + config + ); + } + + // emails.reply + if (emails.reply) { + const originalReply = emails.reply.bind(emails); + emails.reply = wrapAsyncOperation( + tracer, + "inbound.emails.reply", + originalReply, + (span, payload) => annotateEmailSpan(span, "reply", "emails", payload, config), + config + ); + } + + // emails.retrieve + if (emails.retrieve) { + const originalRetrieve = emails.retrieve.bind(emails); + emails.retrieve = wrapAsyncOperation( + tracer, + "inbound.emails.retrieve", + originalRetrieve, + (span, id) => { + span.setAttributes({ + [SEMATTRS_MESSAGING_SYSTEM]: "inbound", + [SEMATTRS_MESSAGING_OPERATION]: "retrieve", + [SEMATTRS_INBOUND_RESOURCE]: "emails", + [SEMATTRS_INBOUND_TARGET]: "emails.retrieve", + [SEMATTRS_INBOUND_MESSAGE_ID]: id, + }); + }, + config + ); + } + + // emails.listScheduled + if (emails.listScheduled) { + const originalListScheduled = emails.listScheduled.bind(emails); + emails.listScheduled = wrapAsyncOperation( + tracer, + "inbound.emails.listScheduled", + originalListScheduled, + (span) => { + span.setAttributes({ + [SEMATTRS_MESSAGING_SYSTEM]: "inbound", + [SEMATTRS_MESSAGING_OPERATION]: "list", + [SEMATTRS_INBOUND_RESOURCE]: "scheduled_emails", + [SEMATTRS_INBOUND_TARGET]: "emails.listScheduled", + }); + }, + config + ); + } + + // emails.getScheduled + if (emails.getScheduled) { + const originalGetScheduled = emails.getScheduled.bind(emails); + emails.getScheduled = wrapAsyncOperation( + tracer, + "inbound.emails.getScheduled", + originalGetScheduled, + (span, id) => { + span.setAttributes({ + [SEMATTRS_MESSAGING_SYSTEM]: "inbound", + [SEMATTRS_MESSAGING_OPERATION]: "get", + [SEMATTRS_INBOUND_RESOURCE]: "scheduled_emails", + [SEMATTRS_INBOUND_TARGET]: "emails.getScheduled", + [SEMATTRS_INBOUND_SCHEDULE_ID]: id, + }); + }, + config + ); + } + + // emails.cancelScheduled + if (emails.cancelScheduled) { + const originalCancelScheduled = emails.cancelScheduled.bind(emails); + emails.cancelScheduled = wrapAsyncOperation( + tracer, + "inbound.emails.cancelScheduled", + originalCancelScheduled, + (span, id) => { + span.setAttributes({ + [SEMATTRS_MESSAGING_SYSTEM]: "inbound", + [SEMATTRS_MESSAGING_OPERATION]: "cancel", + [SEMATTRS_INBOUND_RESOURCE]: "scheduled_emails", + [SEMATTRS_INBOUND_TARGET]: "emails.cancelScheduled", + [SEMATTRS_INBOUND_SCHEDULE_ID]: id, + }); + }, + config + ); + } + } + + // Instrument endpoints namespace + if (client.endpoints) { + const endpoints = client.endpoints; + + if (endpoints.list) { + const originalList = endpoints.list.bind(endpoints); + endpoints.list = wrapAsyncOperation( + tracer, + "inbound.endpoints.list", + originalList, + (span) => annotateManagementSpan(span, "list", "endpoints"), + config + ); + } + + if (endpoints.create) { + const originalCreate = endpoints.create.bind(endpoints); + endpoints.create = async function(...args: any[]) { + const span = tracer.startSpan("inbound.endpoints.create", { kind: SpanKind.CLIENT }); + annotateManagementSpan(span, "create", "endpoints", args[0]); + const activeContext = trace.setSpan(context.active(), span); + + try { + const response = await context.with(activeContext, () => originalCreate.apply(this, args)); + annotateManagementSpan(span, "create", "endpoints", args[0], response); + finalizeSpan(span); + return response; + } catch (error) { + finalizeSpan(span, error); + throw error; + } + }; + } + + if (endpoints.get) { + const originalGet = endpoints.get.bind(endpoints); + endpoints.get = wrapAsyncOperation( + tracer, + "inbound.endpoints.get", + originalGet, + (span, id) => annotateManagementSpan(span, "get", "endpoints", id), + config + ); + } + + if (endpoints.update) { + const originalUpdate = endpoints.update.bind(endpoints); + endpoints.update = wrapAsyncOperation( + tracer, + "inbound.endpoints.update", + originalUpdate, + (span, id, payload) => annotateManagementSpan(span, "update", "endpoints", id), + config + ); + } + + if (endpoints.delete) { + const originalDelete = endpoints.delete.bind(endpoints); + endpoints.delete = wrapAsyncOperation( + tracer, + "inbound.endpoints.delete", + originalDelete, + (span, id) => annotateManagementSpan(span, "delete", "endpoints", id), + config + ); + } + } + + // Instrument addresses namespace + if (client.addresses) { + const addresses = client.addresses; + + if (addresses.list) { + const originalList = addresses.list.bind(addresses); + addresses.list = wrapAsyncOperation( + tracer, + "inbound.addresses.list", + originalList, + (span) => annotateManagementSpan(span, "list", "addresses"), + config + ); + } + + if (addresses.create) { + const originalCreate = addresses.create.bind(addresses); + addresses.create = async function(...args: any[]) { + const span = tracer.startSpan("inbound.addresses.create", { kind: SpanKind.CLIENT }); + annotateManagementSpan(span, "create", "addresses", args[0]); + const activeContext = trace.setSpan(context.active(), span); + + try { + const response = await context.with(activeContext, () => originalCreate.apply(this, args)); + annotateManagementSpan(span, "create", "addresses", args[0], response); + finalizeSpan(span); + return response; + } catch (error) { + finalizeSpan(span, error); + throw error; + } + }; + } + + if (addresses.get) { + const originalGet = addresses.get.bind(addresses); + addresses.get = wrapAsyncOperation( + tracer, + "inbound.addresses.get", + originalGet, + (span, id) => annotateManagementSpan(span, "get", "addresses", id), + config + ); + } + + if (addresses.update) { + const originalUpdate = addresses.update.bind(addresses); + addresses.update = wrapAsyncOperation( + tracer, + "inbound.addresses.update", + originalUpdate, + (span, id, payload) => annotateManagementSpan(span, "update", "addresses", id), + config + ); + } + + if (addresses.delete) { + const originalDelete = addresses.delete.bind(addresses); + addresses.delete = wrapAsyncOperation( + tracer, + "inbound.addresses.delete", + originalDelete, + (span, id) => annotateManagementSpan(span, "delete", "addresses", id), + config + ); + } + } + + // Instrument domains namespace + if (client.domains) { + const domains = client.domains; + + if (domains.list) { + const originalList = domains.list.bind(domains); + domains.list = wrapAsyncOperation( + tracer, + "inbound.domains.list", + originalList, + (span) => annotateManagementSpan(span, "list", "domains"), + config + ); + } + + if (domains.create) { + const originalCreate = domains.create.bind(domains); + domains.create = async function(...args: any[]) { + const span = tracer.startSpan("inbound.domains.create", { kind: SpanKind.CLIENT }); + annotateManagementSpan(span, "create", "domains", args[0]); + const activeContext = trace.setSpan(context.active(), span); + + try { + const response = await context.with(activeContext, () => originalCreate.apply(this, args)); + annotateManagementSpan(span, "create", "domains", args[0], response); + finalizeSpan(span); + return response; + } catch (error) { + finalizeSpan(span, error); + throw error; + } + }; + } + + if (domains.get) { + const originalGet = domains.get.bind(domains); + domains.get = wrapAsyncOperation( + tracer, + "inbound.domains.get", + originalGet, + (span, id) => annotateManagementSpan(span, "get", "domains", id), + config + ); + } + + if (domains.update) { + const originalUpdate = domains.update.bind(domains); + domains.update = wrapAsyncOperation( + tracer, + "inbound.domains.update", + originalUpdate, + (span, id, payload) => annotateManagementSpan(span, "update", "domains", id), + config + ); + } + + if (domains.delete) { + const originalDelete = domains.delete.bind(domains); + domains.delete = wrapAsyncOperation( + tracer, + "inbound.domains.delete", + originalDelete, + (span, id) => annotateManagementSpan(span, "delete", "domains", id), + config + ); + } + + if (domains.getDNS) { + const originalGetDNS = domains.getDNS.bind(domains); + domains.getDNS = wrapAsyncOperation( + tracer, + "inbound.domains.getDNS", + originalGetDNS, + (span, id) => { + span.setAttributes({ + [SEMATTRS_MESSAGING_SYSTEM]: "inbound", + [SEMATTRS_MESSAGING_OPERATION]: "getDNS", + [SEMATTRS_INBOUND_RESOURCE]: "domains", + [SEMATTRS_INBOUND_TARGET]: "domains.getDNS", + [SEMATTRS_INBOUND_DOMAIN_ID]: id, + }); + }, + config + ); + } + } + + // Instrument threads namespace + if (client.threads) { + const threads = client.threads; + + if (threads.list) { + const originalList = threads.list.bind(threads); + threads.list = wrapAsyncOperation( + tracer, + "inbound.threads.list", + originalList, + (span) => { + span.setAttributes({ + [SEMATTRS_MESSAGING_SYSTEM]: "inbound", + [SEMATTRS_MESSAGING_OPERATION]: "list", + [SEMATTRS_INBOUND_RESOURCE]: "threads", + [SEMATTRS_INBOUND_TARGET]: "threads.list", + }); + }, + config + ); + } + + if (threads.get) { + const originalGet = threads.get.bind(threads); + threads.get = wrapAsyncOperation( + tracer, + "inbound.threads.get", + originalGet, + (span, id) => { + span.setAttributes({ + [SEMATTRS_MESSAGING_SYSTEM]: "inbound", + [SEMATTRS_MESSAGING_OPERATION]: "get", + [SEMATTRS_INBOUND_RESOURCE]: "threads", + [SEMATTRS_INBOUND_TARGET]: "threads.get", + [SEMATTRS_INBOUND_THREAD_ID]: id, + }); + }, + config + ); + } + + if (threads.actions) { + const originalActions = threads.actions.bind(threads); + threads.actions = wrapAsyncOperation( + tracer, + "inbound.threads.actions", + originalActions, + (span, id, action) => { + span.setAttributes({ + [SEMATTRS_MESSAGING_SYSTEM]: "inbound", + [SEMATTRS_MESSAGING_OPERATION]: "actions", + [SEMATTRS_INBOUND_RESOURCE]: "threads", + [SEMATTRS_INBOUND_TARGET]: "threads.actions", + [SEMATTRS_INBOUND_THREAD_ID]: id, + }); + }, + config + ); + } + + if (threads.statistics) { + const originalStatistics = threads.statistics.bind(threads); + threads.statistics = wrapAsyncOperation( + tracer, + "inbound.threads.statistics", + originalStatistics, + (span) => { + span.setAttributes({ + [SEMATTRS_MESSAGING_SYSTEM]: "inbound", + [SEMATTRS_MESSAGING_OPERATION]: "statistics", + [SEMATTRS_INBOUND_RESOURCE]: "threads", + [SEMATTRS_INBOUND_TARGET]: "threads.statistics", + }); + }, + config + ); + } + } + + // Instrument attachments namespace + if (client.attachments) { + const attachments = client.attachments; + + if (attachments.download) { + const originalDownload = attachments.download.bind(attachments); + attachments.download = wrapAsyncOperation( + tracer, + "inbound.attachments.download", + originalDownload, + (span, id) => { + span.setAttributes({ + [SEMATTRS_MESSAGING_SYSTEM]: "inbound", + [SEMATTRS_MESSAGING_OPERATION]: "download", + [SEMATTRS_INBOUND_RESOURCE]: "attachments", + [SEMATTRS_INBOUND_TARGET]: "attachments.download", + [SEMATTRS_INBOUND_ATTACHMENT_ID]: id, + }); + }, + config + ); + } + } + + // Mark as instrumented + (client as InstrumentedInbound)[INSTRUMENTED_FLAG] = true; + + return client; +} + +// Type for Next.js route handlers +type RouteHandler = (request: Request) => Promise | Response; + +// Extract webhook headers +function extractWebhookHeaders(request: Request): Record { + const attributes: Record = {}; + + // Extract common webhook headers from Inbound + const webhookId = request.headers.get("x-webhook-id") || request.headers.get("x-inbound-webhook-id"); + if (webhookId) { + attributes[SEMATTRS_INBOUND_WEBHOOK_ID] = webhookId; + } + + const messageId = request.headers.get("x-message-id") || request.headers.get("x-inbound-message-id"); + if (messageId) { + attributes[SEMATTRS_INBOUND_MESSAGE_ID] = messageId; + } + + return attributes; +} + +// Annotate webhook span with email data +function annotateWebhookSpan( + span: Span, + payload: any, + config?: InstrumentInboundConfig +): void { + if (!payload) return; + + // Extract email information from webhook payload + if (payload.from) { + span.setAttribute(SEMATTRS_INBOUND_FROM, payload.from); + } + + if (payload.to) { + const toAddresses = extractEmailAddresses(payload.to); + if (toAddresses.length > 0) { + span.setAttribute(SEMATTRS_INBOUND_TO_ADDRESSES, toAddresses.join(", ")); + } + } + + if (payload.subject) { + span.setAttribute(SEMATTRS_INBOUND_SUBJECT, payload.subject); + } + + if (payload.messageId || payload.id) { + span.setAttribute(SEMATTRS_INBOUND_MESSAGE_ID, payload.messageId || payload.id); + } + + if (payload.threadId) { + span.setAttribute(SEMATTRS_INBOUND_THREAD_ID, payload.threadId); + } + + // Capture email content if enabled + if (config?.captureEmailContent) { + const maxLength = config.maxContentLength ?? 1024; + + if (payload.html) { + span.setAttribute( + SEMATTRS_INBOUND_HTML_CONTENT, + serializeContent(payload.html, maxLength) + ); + } + + if (payload.text) { + span.setAttribute( + SEMATTRS_INBOUND_TEXT_CONTENT, + serializeContent(payload.text, maxLength) + ); + } + } +} + +/** + * Instruments a Next.js route handler to trace incoming webhook requests from Inbound. + * + * @param handler - The Next.js route handler function + * @param config - Optional configuration for instrumentation + * @returns The instrumented route handler + * + * @example + * ```typescript + * import { instrumentInboundWebhook } from '@kubiks/otel-inbound'; + * + * export const POST = instrumentInboundWebhook(async (request: Request) => { + * const email = await request.json(); + * + * // Process incoming email + * console.log('Received email from:', email.from); + * + * return Response.json({ success: true }); + * }, { captureEmailContent: true }); + * ``` + */ +export function instrumentInboundWebhook( + handler: RouteHandler, + config?: InstrumentInboundConfig +): RouteHandler { + const tracer = trace.getTracer(DEFAULT_TRACER_NAME); + + return async function instrumentedWebhook(request: Request): Promise { + const span = tracer.startSpan("inbound.webhook.receive", { + kind: SpanKind.SERVER, + }); + + // Set base attributes + span.setAttributes({ + [SEMATTRS_MESSAGING_SYSTEM]: "inbound", + [SEMATTRS_MESSAGING_OPERATION]: "receive", + [SEMATTRS_INBOUND_RESOURCE]: "webhook", + [SEMATTRS_INBOUND_TARGET]: "webhook.receive", + }); + + // Extract webhook headers + const webhookHeaders = extractWebhookHeaders(request); + span.setAttributes(webhookHeaders); + + // Try to parse and annotate with email data + try { + const clonedRequest = request.clone(); + const payload = await clonedRequest.json(); + annotateWebhookSpan(span, payload, config); + } catch (error) { + // Ignore errors when parsing webhook payload + } + + const activeContext = trace.setSpan(context.active(), span); + + try { + const response = await context.with(activeContext, () => handler(request)); + + // Capture response status + span.setAttribute(SEMATTRS_HTTP_STATUS_CODE, response.status); + + // Mark as successful if status is 2xx + if (response.status >= 200 && response.status < 300) { + finalizeSpan(span); + } else { + finalizeSpan(span, new Error(`Handler returned status ${response.status}`)); + } + + return response; + } catch (error) { + finalizeSpan(span, error); + throw error; + } + }; +} + diff --git a/packages/otel-inbound/tsconfig.json b/packages/otel-inbound/tsconfig.json new file mode 100644 index 0000000..2fd1b81 --- /dev/null +++ b/packages/otel-inbound/tsconfig.json @@ -0,0 +1,22 @@ +{ + "compilerOptions": { + "target": "ES2020", + "module": "ESNext", + "moduleResolution": "bundler", + "lib": ["ES2020", "DOM"], + "outDir": "dist", + "declaration": true, + "declarationMap": true, + "sourceMap": true, + "strict": false, + "esModuleInterop": true, + "skipLibCheck": true, + "forceConsistentCasingInFileNames": true, + "resolveJsonModule": true, + "declarationDir": "dist/types", + "stripInternal": true + }, + "include": ["src/**/*"], + "exclude": ["node_modules", "dist", "**/*.test.ts"] +} + diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index c57fe6e..ae710d4 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -102,6 +102,30 @@ importers: specifier: 0.33.0 version: 0.33.0(less@4.2.0)(sass@1.69.7)(stylus@0.59.0) + packages/otel-inbound: + devDependencies: + '@inboundemail/sdk': + specifier: ^4.3.1 + version: 4.3.1(react-dom@18.3.1(react@18.2.0))(react@18.2.0) + '@opentelemetry/api': + specifier: ^1.9.0 + version: 1.9.0 + '@opentelemetry/sdk-trace-base': + specifier: ^2.1.0 + version: 2.1.0(@opentelemetry/api@1.9.0) + '@types/node': + specifier: 18.15.11 + version: 18.15.11 + rimraf: + specifier: 3.0.2 + version: 3.0.2 + typescript: + specifier: ^5 + version: 5.3.3 + vitest: + specifier: 0.33.0 + version: 0.33.0(less@4.2.0)(sass@1.69.7)(stylus@0.59.0) + packages/otel-resend: devDependencies: '@opentelemetry/api': @@ -526,6 +550,13 @@ packages: '@hexagon/base64@1.1.28': resolution: {integrity: sha512-lhqDEAvWixy3bZ+UOYbPwUbBkwBq5C1LAJ/xPC8Oi+lL54oyakv/npbA0aU2hgCsx/1NUd4IBvV03+aUBWxerw==} + '@inboundemail/sdk@4.3.1': + resolution: {integrity: sha512-WHyGHnGa5Kyzd/rJW4p/XJVqwx9s/e2LgqO3eL1+XCpu3xAU3W8A7oHs1l5kD3kakMuEjAoLacAp+xT3P58q6w==} + engines: {node: '>=16.0.0'} + peerDependencies: + react: '>=16.8.0' + react-dom: '>=16.8.0' + '@isaacs/cliui@8.0.2': resolution: {integrity: sha512-O8jcjabXaleOG9DQ0+ARXWZBTfnP4WNAqzuiJK7ll44AmxGKv/J2M4TPjxjY3znBCfvBXFzucm1twdyFybFqEA==} engines: {node: '>=12'} @@ -2857,6 +2888,11 @@ snapshots: '@hexagon/base64@1.1.28': {} + '@inboundemail/sdk@4.3.1(react-dom@18.3.1(react@18.2.0))(react@18.2.0)': + dependencies: + react: 18.2.0 + react-dom: 18.3.1(react@18.2.0) + '@isaacs/cliui@8.0.2': dependencies: string-width: 5.1.2