Merge pull request #23 from kubiks-inc/inbound

Inbound
This commit is contained in:
Alex Holovach
2025-10-21 10:32:55 -05:00
committed by GitHub
11 changed files with 2027 additions and 0 deletions

View File

@@ -0,0 +1,5 @@
---
"@kubiks/otel-inbound": minor
---
Initial release

View File

@@ -15,6 +15,7 @@ Our goal is to bring the TypeScript ecosystem the observability tools its bee
- [`@kubiks/otel-autumn`](./packages/otel-autumn/README.md)
- [`@kubiks/otel-better-auth`](./packages/otel-better-auth/README.md)
- [`@kubiks/otel-drizzle`](./packages/otel-drizzle/README.md)
- [`@kubiks/otel-inbound`](./packages/otel-inbound/README.md)
- [`@kubiks/otel-resend`](./packages/otel-resend/README.md)
- [`@kubiks/otel-upstash-queues`](./packages/otel-upstash-queues/README.md)

Binary file not shown.

After

Width:  |  Height:  |  Size: 4.4 MiB

View File

@@ -0,0 +1,14 @@
# @kubiks/otel-inbound
## 1.0.0
### Initial Release
- OpenTelemetry instrumentation for Inbound email API
- Support for all email operations (send, schedule, reply, retrieve)
- Support for management operations (endpoints, addresses, domains)
- Support for thread and attachment operations
- Webhook receiver instrumentation for incoming emails
- Comprehensive span attributes for debugging and monitoring
- Optional email content capture with configurable truncation

View File

@@ -0,0 +1,22 @@
MIT License
Copyright (c) 2025 Kubiks
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@@ -0,0 +1,280 @@
# @kubiks/otel-inbound
OpenTelemetry instrumentation for the [Inbound](https://inbound.new) email API SDK.
Capture spans for every Inbound API operation, enrich them with detailed metadata,
and monitor your complete email workflow from traces.
![Inbound Trace Visualization](https://github.com/kubiks-inc/otel/blob/main/images/otel-inbound-trace.png)
_Visualize your email operations with detailed span information including recipients, subjects, scheduling, and webhook processing._
## Installation
```bash
npm install @kubiks/otel-inbound
# or
pnpm add @kubiks/otel-inbound
```
**Peer Dependencies:** `@opentelemetry/api` >= 1.9.0, `@inboundemail/sdk` >= 4.0.0
## Quick Start
```ts
import { Inbound } from "@inboundemail/sdk";
import { instrumentInbound } from "@kubiks/otel-inbound";
const inbound = instrumentInbound(new Inbound(process.env.INBOUND_API_KEY!));
await inbound.emails.send({
from: "hello@example.com",
to: ["user@example.com"],
subject: "Welcome",
html: "<p>Hello world</p>",
});
```
`instrumentInbound` wraps the instance you already use — no configuration changes
needed. Every SDK call creates a client span with useful attributes.
## What Gets Traced
This instrumentation wraps all Inbound API operations, creating spans for each:
### Email Operations
- `emails.send()` - Send email
- `emails.schedule()` - Schedule email for later delivery
- `emails.reply()` - Reply to an existing email thread
- `emails.retrieve()` - Retrieve email details
- `emails.listScheduled()` - List scheduled emails
- `emails.getScheduled()` - Get specific scheduled email
- `emails.cancelScheduled()` - Cancel a scheduled email
### Management Operations
- **Endpoints**: `list()`, `create()`, `get()`, `update()`, `delete()`
- **Addresses**: `list()`, `create()`, `get()`, `update()`, `delete()`
- **Domains**: `list()`, `create()`, `get()`, `update()`, `delete()`, `getDNS()`
### Thread & Attachment Operations
- **Threads**: `list()`, `get()`, `actions()`, `statistics()`
- **Attachments**: `download()`
### Webhook Receivers
- Incoming email webhooks (via `instrumentInboundWebhook`)
## Span Attributes
Each span includes relevant attributes based on the operation type:
### Base Attributes (All Operations)
| Attribute | Description | Example |
| ----------------------- | ---------------------------- | ------------------------ |
| `messaging.system` | Constant value `inbound` | `inbound` |
| `messaging.operation` | Operation type | `send`, `schedule`, etc. |
| `inbound.resource` | Resource being accessed | `emails`, `endpoints` |
| `inbound.target` | Full operation target | `emails.send` |
### Email Attributes
| Attribute | Description | Example |
| --------------------------- | --------------------------------- | --------------------------------------- |
| `inbound.message_id` | Message ID returned by Inbound | `msg_123` |
| `inbound.to_addresses` | Comma-separated TO addresses | `user@example.com, another@example.com` |
| `inbound.cc_addresses` | Comma-separated CC addresses | `cc@example.com` |
| `inbound.bcc_addresses` | Comma-separated BCC addresses | `bcc@example.com` |
| `inbound.recipient_count` | Total number of recipients | `3` |
| `inbound.from` | Sender email address | `noreply@example.com` |
| `inbound.subject` | Email subject line | `Welcome to our service` |
| `inbound.html_content` | HTML content (if capture enabled) | `<p>Hello</p>` |
| `inbound.text_content` | Text content (if capture enabled) | `Hello` |
### Scheduling Attributes
| Attribute | Description | Example |
| ----------------------- | ---------------------------- | ------------------------ |
| `inbound.scheduled_at` | Scheduled delivery time | `2025-01-01T00:00:00Z` |
| `inbound.schedule_id` | Schedule ID from API | `sched_123` |
### Management Attributes
| Attribute | Description | Example |
| ---------------------- | ------------------------- | ------------ |
| `inbound.endpoint_id` | Endpoint identifier | `ep_123` |
| `inbound.domain_id` | Domain identifier | `dom_123` |
| `inbound.address_id` | Email address identifier | `addr_123` |
### Thread & Attachment Attributes
| Attribute | Description | Example |
| ------------------------ | -------------------------- | -------------- |
| `inbound.thread_id` | Email thread identifier | `thread_123` |
| `inbound.attachment_id` | Attachment identifier | `attach_123` |
### Webhook Attributes
| Attribute | Description | Example |
| ---------------------- | ------------------------------ | ---------------- |
| `inbound.webhook_id` | Webhook identifier from header | `webhook_123` |
| `http.status_code` | HTTP response status code | `200` |
## Advanced Usage
### Webhook Receiver Instrumentation
Instrument Next.js route handlers that receive incoming emails:
```ts
import { instrumentInboundWebhook } from "@kubiks/otel-inbound";
export const POST = instrumentInboundWebhook(async (request: Request) => {
const email = await request.json();
// Process incoming email
console.log('Received email from:', email.from);
console.log('Subject:', email.subject);
// Your email processing logic here
await processIncomingEmail(email);
return Response.json({ success: true });
});
```
This creates SERVER spans (SpanKind.SERVER) that automatically capture:
- Email metadata from webhook payload
- Webhook headers
- Response status
- Any errors during processing
### Configuration Options
Control what data is captured in your spans:
```ts
import { instrumentInbound, type InstrumentInboundConfig } from "@kubiks/otel-inbound";
const config: InstrumentInboundConfig = {
// Capture email HTML/text content in spans (default: false)
captureEmailContent: true,
// Maximum content length before truncation (default: 1024)
maxContentLength: 2048,
};
const inbound = instrumentInbound(
new Inbound(process.env.INBOUND_API_KEY!),
config
);
```
**Note:** Be cautious when enabling `captureEmailContent` as it may capture sensitive information in your traces.
### Scheduling Emails
```ts
await inbound.emails.schedule({
from: "noreply@example.com",
to: "user@example.com",
subject: "Scheduled Newsletter",
html: "<p>Weekly update</p>",
scheduledAt: "2025-01-01T09:00:00Z",
});
// List all scheduled emails
const scheduled = await inbound.emails.listScheduled();
// Cancel a scheduled email
await inbound.emails.cancelScheduled("sched_123");
```
### Reply to Emails
```ts
await inbound.emails.reply({
from: "support@example.com",
to: "customer@example.com",
subject: "Re: Support Request",
html: "<p>Thanks for reaching out!</p>",
threadId: "thread_123", // Thread ID from webhook payload
});
```
### Domain Management
```ts
// Create a domain
const domain = await inbound.domains.create({
domain: "yourdomain.com",
});
// Get DNS records for verification
const dns = await inbound.domains.getDNS(domain.data.id);
console.log("Add these DNS records:", dns.data.records);
// List all domains
const domains = await inbound.domains.list();
```
### Endpoint Management
```ts
// Create webhook endpoint
const endpoint = await inbound.endpoints.create({
url: "https://yourdomain.com/webhook",
events: ["email.received"],
});
// Update endpoint
await inbound.endpoints.update(endpoint.data.id, {
url: "https://yourdomain.com/new-webhook",
});
// Delete endpoint
await inbound.endpoints.delete(endpoint.data.id);
```
### Complete Example with Webhook
```ts
import { Inbound } from "@inboundemail/sdk";
import { instrumentInbound, instrumentInboundWebhook } from "@kubiks/otel-inbound";
// Instrument the Inbound client
const inbound = instrumentInbound(
new Inbound(process.env.INBOUND_API_KEY!),
{ captureEmailContent: true }
);
// Send an email
await inbound.emails.send({
from: "hello@yourdomain.com",
to: "user@example.com",
subject: "Welcome!",
html: "<p>Thanks for signing up!</p>",
});
// Webhook handler for receiving emails
export const POST = instrumentInboundWebhook(
async (request: Request) => {
const email = await request.json();
// Automatically reply to incoming emails
await inbound.emails.reply({
from: email.to,
to: email.from,
subject: `Re: ${email.subject}`,
html: "<p>Thanks for your email! We'll get back to you soon.</p>",
threadId: email.threadId,
});
return Response.json({ processed: true });
},
{ captureEmailContent: true }
);
```
## License
MIT

View File

@@ -0,0 +1,54 @@
{
"name": "@kubiks/otel-inbound",
"version": "1.0.0",
"private": false,
"publishConfig": {
"access": "public"
},
"description": "OpenTelemetry instrumentation for the Inbound email API SDK",
"author": "Kubiks",
"license": "MIT",
"repository": "kubiks-inc/otel",
"sideEffects": false,
"type": "module",
"exports": {
".": {
"types": "./dist/types/index.d.ts",
"import": "./dist/index.js",
"default": "./dist/index.js"
}
},
"main": "./dist/index.js",
"types": "./dist/types/index.d.ts",
"files": [
"dist",
"LICENSE",
"README.md"
],
"engines": {
"node": "^18.19.0 || >=20.6.0"
},
"scripts": {
"build": "pnpm clean && tsc",
"clean": "rimraf dist",
"prepublishOnly": "pnpm build",
"type-check": "tsc --noEmit",
"unit-test": "vitest --run",
"unit-test-watch": "vitest"
},
"dependencies": {},
"devDependencies": {
"@opentelemetry/api": "^1.9.0",
"@opentelemetry/sdk-trace-base": "^2.1.0",
"@types/node": "18.15.11",
"@inboundemail/sdk": "^4.3.1",
"rimraf": "3.0.2",
"typescript": "^5",
"vitest": "0.33.0"
},
"peerDependencies": {
"@opentelemetry/api": ">=1.9.0 <2.0.0",
"@inboundemail/sdk": ">=4.0.0"
}
}

View File

@@ -0,0 +1,687 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { SpanStatusCode, trace } from "@opentelemetry/api";
import {
BasicTracerProvider,
InMemorySpanExporter,
SimpleSpanProcessor,
} from "@opentelemetry/sdk-trace-base";
import {
instrumentInbound,
instrumentInboundWebhook,
SEMATTRS_MESSAGING_OPERATION,
SEMATTRS_MESSAGING_SYSTEM,
SEMATTRS_INBOUND_MESSAGE_ID,
SEMATTRS_INBOUND_RECIPIENT_COUNT,
SEMATTRS_INBOUND_RESOURCE,
SEMATTRS_INBOUND_TARGET,
SEMATTRS_INBOUND_TO_ADDRESSES,
SEMATTRS_INBOUND_CC_ADDRESSES,
SEMATTRS_INBOUND_BCC_ADDRESSES,
SEMATTRS_INBOUND_FROM,
SEMATTRS_INBOUND_SUBJECT,
SEMATTRS_INBOUND_HTML_CONTENT,
SEMATTRS_INBOUND_TEXT_CONTENT,
SEMATTRS_INBOUND_SCHEDULED_AT,
SEMATTRS_INBOUND_SCHEDULE_ID,
SEMATTRS_INBOUND_ENDPOINT_ID,
SEMATTRS_INBOUND_DOMAIN_ID,
SEMATTRS_INBOUND_ADDRESS_ID,
SEMATTRS_INBOUND_THREAD_ID,
SEMATTRS_INBOUND_ATTACHMENT_ID,
SEMATTRS_HTTP_STATUS_CODE,
} from "./index";
describe("instrumentInbound", () => {
let provider: BasicTracerProvider;
let exporter: InMemorySpanExporter;
beforeEach(() => {
exporter = new InMemorySpanExporter();
provider = new BasicTracerProvider({
spanProcessors: [new SimpleSpanProcessor(exporter)],
});
trace.setGlobalTracerProvider(provider);
});
afterEach(async () => {
await provider.shutdown();
exporter.reset();
trace.disable();
});
const createMockInbound = () => {
return {
emails: {
send: vi.fn(async (payload: any) => ({
data: { id: "email_123" },
error: null,
})),
schedule: vi.fn(async (payload: any) => ({
data: { id: "email_123", scheduleId: "sched_456" },
error: null,
})),
reply: vi.fn(async (payload: any) => ({
data: { id: "email_789" },
error: null,
})),
retrieve: vi.fn(async (id: string) => ({
data: { id, from: "sender@example.com", to: "user@example.com" },
error: null,
})),
listScheduled: vi.fn(async () => ({
data: [{ id: "sched_1" }, { id: "sched_2" }],
error: null,
})),
getScheduled: vi.fn(async (id: string) => ({
data: { id, scheduledAt: "2025-01-01T00:00:00Z" },
error: null,
})),
cancelScheduled: vi.fn(async (id: string) => ({
data: { success: true },
error: null,
})),
},
endpoints: {
list: vi.fn(async () => ({
data: [{ id: "ep_1" }, { id: "ep_2" }],
error: null,
})),
create: vi.fn(async (payload: any) => ({
data: { id: "ep_123" },
error: null,
})),
get: vi.fn(async (id: string) => ({
data: { id },
error: null,
})),
update: vi.fn(async (id: string, payload: any) => ({
data: { id },
error: null,
})),
delete: vi.fn(async (id: string) => ({
data: { success: true },
error: null,
})),
},
addresses: {
list: vi.fn(async () => ({
data: [{ id: "addr_1" }, { id: "addr_2" }],
error: null,
})),
create: vi.fn(async (payload: any) => ({
data: { id: "addr_123" },
error: null,
})),
get: vi.fn(async (id: string) => ({
data: { id },
error: null,
})),
update: vi.fn(async (id: string, payload: any) => ({
data: { id },
error: null,
})),
delete: vi.fn(async (id: string) => ({
data: { success: true },
error: null,
})),
},
domains: {
list: vi.fn(async () => ({
data: [{ id: "dom_1" }, { id: "dom_2" }],
error: null,
})),
create: vi.fn(async (payload: any) => ({
data: { id: "dom_123" },
error: null,
})),
get: vi.fn(async (id: string) => ({
data: { id },
error: null,
})),
update: vi.fn(async (id: string, payload: any) => ({
data: { id },
error: null,
})),
delete: vi.fn(async (id: string) => ({
data: { success: true },
error: null,
})),
getDNS: vi.fn(async (id: string) => ({
data: { records: [] },
error: null,
})),
},
threads: {
list: vi.fn(async () => ({
data: [{ id: "thread_1" }, { id: "thread_2" }],
error: null,
})),
get: vi.fn(async (id: string) => ({
data: { id },
error: null,
})),
actions: vi.fn(async (id: string, action: any) => ({
data: { success: true },
error: null,
})),
statistics: vi.fn(async () => ({
data: { total: 100 },
error: null,
})),
},
attachments: {
download: vi.fn(async (id: string) => ({
data: new Blob(),
error: null,
})),
},
};
};
describe("Email Operations", () => {
it("instruments emails.send with all attributes", async () => {
const inbound = createMockInbound();
instrumentInbound(inbound);
const payload = {
to: ["user@example.com", "second@example.com"],
cc: ["cc@example.com"],
bcc: "bcc@example.com",
from: "sender@example.com",
subject: "Test Email",
html: "<p>Hello</p>",
text: "Hello",
};
const response = await inbound.emails.send(payload);
expect(response.data?.id).toBe("email_123");
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
expect(span.name).toBe("inbound.emails.send");
expect(span.attributes[SEMATTRS_MESSAGING_SYSTEM]).toBe("inbound");
expect(span.attributes[SEMATTRS_MESSAGING_OPERATION]).toBe("send");
expect(span.attributes[SEMATTRS_INBOUND_RESOURCE]).toBe("emails");
expect(span.attributes[SEMATTRS_INBOUND_TARGET]).toBe("emails.send");
expect(span.attributes[SEMATTRS_INBOUND_MESSAGE_ID]).toBe("email_123");
expect(span.attributes[SEMATTRS_INBOUND_RECIPIENT_COUNT]).toBe(4);
expect(span.attributes[SEMATTRS_INBOUND_TO_ADDRESSES]).toBe(
"user@example.com, second@example.com"
);
expect(span.attributes[SEMATTRS_INBOUND_CC_ADDRESSES]).toBe("cc@example.com");
expect(span.attributes[SEMATTRS_INBOUND_BCC_ADDRESSES]).toBe("bcc@example.com");
expect(span.attributes[SEMATTRS_INBOUND_FROM]).toBe("sender@example.com");
expect(span.attributes[SEMATTRS_INBOUND_SUBJECT]).toBe("Test Email");
expect(span.status.code).toBe(SpanStatusCode.OK);
});
it("instruments emails.schedule with scheduling attributes", async () => {
const inbound = createMockInbound();
instrumentInbound(inbound);
const payload = {
to: "user@example.com",
from: "sender@example.com",
subject: "Scheduled Email",
html: "<p>Hello</p>",
scheduledAt: "2025-01-01T00:00:00Z",
};
const response = await inbound.emails.schedule(payload);
expect(response.data?.scheduleId).toBe("sched_456");
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
expect(span.name).toBe("inbound.emails.schedule");
expect(span.attributes[SEMATTRS_MESSAGING_OPERATION]).toBe("schedule");
expect(span.attributes[SEMATTRS_INBOUND_SCHEDULED_AT]).toBe("2025-01-01T00:00:00Z");
expect(span.attributes[SEMATTRS_INBOUND_SCHEDULE_ID]).toBe("sched_456");
});
it("instruments emails.reply with thread tracking", async () => {
const inbound = createMockInbound();
instrumentInbound(inbound);
const payload = {
to: "user@example.com",
from: "sender@example.com",
subject: "Re: Test",
html: "<p>Reply</p>",
threadId: "thread_123",
};
await inbound.emails.reply(payload);
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
expect(span.name).toBe("inbound.emails.reply");
expect(span.attributes[SEMATTRS_MESSAGING_OPERATION]).toBe("reply");
expect(span.attributes[SEMATTRS_INBOUND_THREAD_ID]).toBe("thread_123");
});
it("instruments emails.retrieve", async () => {
const inbound = createMockInbound();
instrumentInbound(inbound);
await inbound.emails.retrieve("email_123");
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
expect(span.name).toBe("inbound.emails.retrieve");
expect(span.attributes[SEMATTRS_MESSAGING_OPERATION]).toBe("retrieve");
expect(span.attributes[SEMATTRS_INBOUND_MESSAGE_ID]).toBe("email_123");
});
it("instruments scheduled email operations", async () => {
const inbound = createMockInbound();
instrumentInbound(inbound);
await inbound.emails.listScheduled();
await inbound.emails.getScheduled("sched_123");
await inbound.emails.cancelScheduled("sched_123");
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(3);
expect(spans[0].name).toBe("inbound.emails.listScheduled");
expect(spans[0].attributes[SEMATTRS_MESSAGING_OPERATION]).toBe("list");
expect(spans[1].name).toBe("inbound.emails.getScheduled");
expect(spans[1].attributes[SEMATTRS_INBOUND_SCHEDULE_ID]).toBe("sched_123");
expect(spans[2].name).toBe("inbound.emails.cancelScheduled");
expect(spans[2].attributes[SEMATTRS_MESSAGING_OPERATION]).toBe("cancel");
});
it("captures email content when enabled", async () => {
const inbound = createMockInbound();
instrumentInbound(inbound, { captureEmailContent: true });
const payload = {
to: "user@example.com",
from: "sender@example.com",
subject: "Test",
html: "<p>HTML content</p>",
text: "Text content",
};
await inbound.emails.send(payload);
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
expect(span.attributes[SEMATTRS_INBOUND_HTML_CONTENT]).toBe("<p>HTML content</p>");
expect(span.attributes[SEMATTRS_INBOUND_TEXT_CONTENT]).toBe("Text content");
});
it("truncates long content", async () => {
const inbound = createMockInbound();
instrumentInbound(inbound, {
captureEmailContent: true,
maxContentLength: 10,
});
const payload = {
to: "user@example.com",
from: "sender@example.com",
subject: "Test",
html: "This is a very long HTML content that should be truncated",
};
await inbound.emails.send(payload);
const spans = exporter.getFinishedSpans();
const span = spans[0];
expect(span.attributes[SEMATTRS_INBOUND_HTML_CONTENT]).toContain("... (truncated)");
});
});
describe("Management Operations", () => {
it("instruments endpoint operations", async () => {
const inbound = createMockInbound();
instrumentInbound(inbound);
await inbound.endpoints.list();
await inbound.endpoints.create({ url: "https://example.com" });
await inbound.endpoints.get("ep_123");
await inbound.endpoints.update("ep_123", { url: "https://new.com" });
await inbound.endpoints.delete("ep_123");
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(5);
expect(spans[0].name).toBe("inbound.endpoints.list");
expect(spans[0].attributes[SEMATTRS_INBOUND_RESOURCE]).toBe("endpoints");
expect(spans[1].name).toBe("inbound.endpoints.create");
expect(spans[1].attributes[SEMATTRS_INBOUND_ENDPOINT_ID]).toBe("ep_123");
expect(spans[2].name).toBe("inbound.endpoints.get");
expect(spans[2].attributes[SEMATTRS_INBOUND_ENDPOINT_ID]).toBe("ep_123");
expect(spans[3].name).toBe("inbound.endpoints.update");
expect(spans[4].name).toBe("inbound.endpoints.delete");
});
it("instruments address operations", async () => {
const inbound = createMockInbound();
instrumentInbound(inbound);
await inbound.addresses.list();
await inbound.addresses.create({ email: "test@example.com" });
await inbound.addresses.get("addr_123");
await inbound.addresses.update("addr_123", { name: "Test" });
await inbound.addresses.delete("addr_123");
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(5);
expect(spans[0].name).toBe("inbound.addresses.list");
expect(spans[1].name).toBe("inbound.addresses.create");
expect(spans[1].attributes[SEMATTRS_INBOUND_ADDRESS_ID]).toBe("addr_123");
expect(spans[2].attributes[SEMATTRS_INBOUND_ADDRESS_ID]).toBe("addr_123");
});
it("instruments domain operations", async () => {
const inbound = createMockInbound();
instrumentInbound(inbound);
await inbound.domains.list();
await inbound.domains.create({ domain: "example.com" });
await inbound.domains.get("dom_123");
await inbound.domains.update("dom_123", { name: "Example" });
await inbound.domains.delete("dom_123");
await inbound.domains.getDNS("dom_123");
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(6);
expect(spans[0].name).toBe("inbound.domains.list");
expect(spans[1].name).toBe("inbound.domains.create");
expect(spans[1].attributes[SEMATTRS_INBOUND_DOMAIN_ID]).toBe("dom_123");
expect(spans[5].name).toBe("inbound.domains.getDNS");
expect(spans[5].attributes[SEMATTRS_INBOUND_DOMAIN_ID]).toBe("dom_123");
});
});
describe("Thread Operations", () => {
it("instruments thread operations", async () => {
const inbound = createMockInbound();
instrumentInbound(inbound);
await inbound.threads.list();
await inbound.threads.get("thread_123");
await inbound.threads.actions("thread_123", { action: "archive" });
await inbound.threads.statistics();
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(4);
expect(spans[0].name).toBe("inbound.threads.list");
expect(spans[1].name).toBe("inbound.threads.get");
expect(spans[1].attributes[SEMATTRS_INBOUND_THREAD_ID]).toBe("thread_123");
expect(spans[2].name).toBe("inbound.threads.actions");
expect(spans[3].name).toBe("inbound.threads.statistics");
});
});
describe("Attachment Operations", () => {
it("instruments attachment download", async () => {
const inbound = createMockInbound();
instrumentInbound(inbound);
await inbound.attachments.download("attach_123");
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
expect(span.name).toBe("inbound.attachments.download");
expect(span.attributes[SEMATTRS_INBOUND_ATTACHMENT_ID]).toBe("attach_123");
});
});
describe("Error Handling", () => {
it("captures errors and marks span status", async () => {
const inbound = createMockInbound();
inbound.emails.send = vi.fn().mockRejectedValue(new Error("API Error"));
instrumentInbound(inbound);
await expect(
inbound.emails.send({
to: "test@example.com",
from: "sender@example.com",
subject: "Test",
html: "<p>Test</p>",
})
).rejects.toThrowError("API Error");
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
expect(span.status.code).toBe(SpanStatusCode.ERROR);
const hasException = span.events.some((event) => event.name === "exception");
expect(hasException).toBe(true);
});
});
describe("Idempotency", () => {
it("is idempotent", async () => {
const inbound = createMockInbound();
const first = instrumentInbound(inbound);
const second = instrumentInbound(first);
expect(first).toBe(second);
await second.emails.send({
to: "test@example.com",
from: "sender@example.com",
subject: "Test",
html: "<p>Test</p>",
});
expect(exporter.getFinishedSpans()).toHaveLength(1);
});
});
describe("Edge Cases", () => {
it("handles missing optional fields gracefully", async () => {
const inbound = createMockInbound();
instrumentInbound(inbound);
const payload = {
to: "single@example.com",
from: "sender@example.com",
subject: "Test",
html: "<p>Test</p>",
};
await inbound.emails.send(payload);
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
expect(span.attributes[SEMATTRS_INBOUND_RECIPIENT_COUNT]).toBe(1);
expect(span.attributes[SEMATTRS_INBOUND_TO_ADDRESSES]).toBe("single@example.com");
expect(span.attributes[SEMATTRS_INBOUND_CC_ADDRESSES]).toBeUndefined();
expect(span.attributes[SEMATTRS_INBOUND_BCC_ADDRESSES]).toBeUndefined();
});
it("handles mixed string and array formats", async () => {
const inbound = createMockInbound();
instrumentInbound(inbound);
const payload = {
to: "single@example.com",
cc: ["cc1@example.com", "cc2@example.com"],
bcc: ["bcc@example.com"],
from: "sender@example.com",
subject: "Mixed Format",
html: "<p>Test</p>",
};
await inbound.emails.send(payload);
const spans = exporter.getFinishedSpans();
const span = spans[0];
expect(span.attributes[SEMATTRS_INBOUND_TO_ADDRESSES]).toBe("single@example.com");
expect(span.attributes[SEMATTRS_INBOUND_CC_ADDRESSES]).toBe(
"cc1@example.com, cc2@example.com"
);
expect(span.attributes[SEMATTRS_INBOUND_BCC_ADDRESSES]).toBe("bcc@example.com");
expect(span.attributes[SEMATTRS_INBOUND_RECIPIENT_COUNT]).toBe(4);
});
});
});
describe("instrumentInboundWebhook", () => {
let provider: BasicTracerProvider;
let exporter: InMemorySpanExporter;
beforeEach(() => {
exporter = new InMemorySpanExporter();
provider = new BasicTracerProvider({
spanProcessors: [new SimpleSpanProcessor(exporter)],
});
trace.setGlobalTracerProvider(provider);
});
afterEach(async () => {
await provider.shutdown();
exporter.reset();
trace.disable();
});
it("instruments webhook receivers", async () => {
const handler = vi.fn(async (request: Request) => {
return Response.json({ success: true });
});
const instrumentedHandler = instrumentInboundWebhook(handler);
const emailPayload = {
from: "sender@example.com",
to: "recipient@example.com",
subject: "Webhook Test",
html: "<p>Test</p>",
messageId: "msg_123",
};
const request = new Request("https://example.com/webhook", {
method: "POST",
headers: {
"content-type": "application/json",
"x-webhook-id": "webhook_123",
"x-message-id": "msg_123",
},
body: JSON.stringify(emailPayload),
});
const response = await instrumentedHandler(request);
expect(response.status).toBe(200);
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
expect(span.name).toBe("inbound.webhook.receive");
expect(span.attributes[SEMATTRS_MESSAGING_SYSTEM]).toBe("inbound");
expect(span.attributes[SEMATTRS_MESSAGING_OPERATION]).toBe("receive");
expect(span.attributes[SEMATTRS_INBOUND_RESOURCE]).toBe("webhook");
expect(span.attributes[SEMATTRS_INBOUND_FROM]).toBe("sender@example.com");
expect(span.attributes[SEMATTRS_INBOUND_SUBJECT]).toBe("Webhook Test");
expect(span.attributes[SEMATTRS_INBOUND_MESSAGE_ID]).toBe("msg_123");
expect(span.attributes[SEMATTRS_HTTP_STATUS_CODE]).toBe(200);
expect(span.status.code).toBe(SpanStatusCode.OK);
});
it("captures webhook email content when enabled", async () => {
const handler = vi.fn(async () => Response.json({ success: true }));
const instrumentedHandler = instrumentInboundWebhook(handler, {
captureEmailContent: true,
});
const emailPayload = {
from: "sender@example.com",
to: "recipient@example.com",
subject: "Content Test",
html: "<p>HTML content</p>",
text: "Text content",
};
const request = new Request("https://example.com/webhook", {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify(emailPayload),
});
await instrumentedHandler(request);
const spans = exporter.getFinishedSpans();
const span = spans[0];
expect(span.attributes[SEMATTRS_INBOUND_HTML_CONTENT]).toBe("<p>HTML content</p>");
expect(span.attributes[SEMATTRS_INBOUND_TEXT_CONTENT]).toBe("Text content");
});
it("handles webhook errors", async () => {
const handler = vi.fn(async () => {
throw new Error("Webhook processing failed");
});
const instrumentedHandler = instrumentInboundWebhook(handler);
const request = new Request("https://example.com/webhook", {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({ from: "test@example.com" }),
});
await expect(instrumentedHandler(request)).rejects.toThrowError(
"Webhook processing failed"
);
const spans = exporter.getFinishedSpans();
const span = spans[0];
expect(span.status.code).toBe(SpanStatusCode.ERROR);
const hasException = span.events.some((event) => event.name === "exception");
expect(hasException).toBe(true);
});
it("marks non-2xx responses as errors", async () => {
const handler = vi.fn(async () => {
return new Response("Bad Request", { status: 400 });
});
const instrumentedHandler = instrumentInboundWebhook(handler);
const request = new Request("https://example.com/webhook", {
method: "POST",
body: JSON.stringify({}),
});
const response = await instrumentedHandler(request);
expect(response.status).toBe(400);
const spans = exporter.getFinishedSpans();
const span = spans[0];
expect(span.attributes[SEMATTRS_HTTP_STATUS_CODE]).toBe(400);
expect(span.status.code).toBe(SpanStatusCode.ERROR);
});
});

View File

@@ -0,0 +1,906 @@
import {
context,
SpanKind,
SpanStatusCode,
trace,
type Span,
} from "@opentelemetry/api";
const DEFAULT_TRACER_NAME = "@kubiks/otel-inbound";
const INSTRUMENTED_FLAG = Symbol("kubiksOtelInboundInstrumented");
// Semantic attribute constants - Base
export const SEMATTRS_MESSAGING_SYSTEM = "messaging.system" as const;
export const SEMATTRS_MESSAGING_OPERATION = "messaging.operation" as const;
export const SEMATTRS_INBOUND_RESOURCE = "inbound.resource" as const;
export const SEMATTRS_INBOUND_TARGET = "inbound.target" as const;
// Email-specific attributes
export const SEMATTRS_INBOUND_MESSAGE_ID = "inbound.message_id" as const;
export const SEMATTRS_INBOUND_TO_ADDRESSES = "inbound.to_addresses" as const;
export const SEMATTRS_INBOUND_CC_ADDRESSES = "inbound.cc_addresses" as const;
export const SEMATTRS_INBOUND_BCC_ADDRESSES = "inbound.bcc_addresses" as const;
export const SEMATTRS_INBOUND_RECIPIENT_COUNT = "inbound.recipient_count" as const;
export const SEMATTRS_INBOUND_FROM = "inbound.from" as const;
export const SEMATTRS_INBOUND_SUBJECT = "inbound.subject" as const;
export const SEMATTRS_INBOUND_HTML_CONTENT = "inbound.html_content" as const;
export const SEMATTRS_INBOUND_TEXT_CONTENT = "inbound.text_content" as const;
// Scheduling attributes
export const SEMATTRS_INBOUND_SCHEDULED_AT = "inbound.scheduled_at" as const;
export const SEMATTRS_INBOUND_SCHEDULE_ID = "inbound.schedule_id" as const;
// Management attributes
export const SEMATTRS_INBOUND_ENDPOINT_ID = "inbound.endpoint_id" as const;
export const SEMATTRS_INBOUND_DOMAIN_ID = "inbound.domain_id" as const;
export const SEMATTRS_INBOUND_ADDRESS_ID = "inbound.address_id" as const;
export const SEMATTRS_INBOUND_THREAD_ID = "inbound.thread_id" as const;
export const SEMATTRS_INBOUND_ATTACHMENT_ID = "inbound.attachment_id" as const;
// Webhook-specific attributes
export const SEMATTRS_INBOUND_WEBHOOK_ID = "inbound.webhook_id" as const;
export const SEMATTRS_HTTP_STATUS_CODE = "http.status_code" as const;
// Configuration interface
export interface InstrumentInboundConfig {
/**
* Whether to capture email content (html/text) in spans.
* @default false
*/
captureEmailContent?: boolean;
/**
* Maximum length of content to capture. Content longer than this will be truncated.
* @default 1024
*/
maxContentLength?: number;
}
interface InstrumentedInbound {
[INSTRUMENTED_FLAG]?: true;
emails?: any;
endpoints?: any;
addresses?: any;
domains?: any;
threads?: any;
attachments?: any;
}
// Helper function to extract email addresses
function extractEmailAddresses(value: string | string[] | undefined): string[] {
if (!value) {
return [];
}
if (typeof value === "string") {
const trimmed = value.trim();
return trimmed ? [trimmed] : [];
}
if (Array.isArray(value)) {
return value
.filter((email) => typeof email === "string" && email.trim())
.map((email) => email.trim());
}
return [];
}
// Helper function to serialize and truncate content
function serializeContent(content: unknown, maxLength: number): string {
try {
const serialized = typeof content === "string" ? content : JSON.stringify(content);
if (serialized.length > maxLength) {
return serialized.substring(0, maxLength) + "... (truncated)";
}
return serialized;
} catch (error) {
return "[Unable to serialize content]";
}
}
// Annotate email operation spans
function annotateEmailSpan(
span: Span,
operation: string,
resource: string,
payload: any,
config?: InstrumentInboundConfig
): void {
span.setAttributes({
[SEMATTRS_MESSAGING_SYSTEM]: "inbound",
[SEMATTRS_MESSAGING_OPERATION]: operation,
[SEMATTRS_INBOUND_RESOURCE]: resource,
[SEMATTRS_INBOUND_TARGET]: `${resource}.${operation}`,
});
// Extract and set email addresses
const toAddresses = extractEmailAddresses(payload.to);
if (toAddresses.length > 0) {
span.setAttribute(SEMATTRS_INBOUND_TO_ADDRESSES, toAddresses.join(", "));
}
const ccAddresses = extractEmailAddresses(payload.cc);
if (ccAddresses.length > 0) {
span.setAttribute(SEMATTRS_INBOUND_CC_ADDRESSES, ccAddresses.join(", "));
}
const bccAddresses = extractEmailAddresses(payload.bcc);
if (bccAddresses.length > 0) {
span.setAttribute(SEMATTRS_INBOUND_BCC_ADDRESSES, bccAddresses.join(", "));
}
// Count recipients
const recipientCount = toAddresses.length + ccAddresses.length + bccAddresses.length;
if (recipientCount > 0) {
span.setAttribute(SEMATTRS_INBOUND_RECIPIENT_COUNT, recipientCount);
}
// Set other email attributes
if (payload.from) {
span.setAttribute(SEMATTRS_INBOUND_FROM, payload.from);
}
if (payload.subject) {
span.setAttribute(SEMATTRS_INBOUND_SUBJECT, payload.subject);
}
// Capture email content if enabled
if (config?.captureEmailContent) {
const maxLength = config.maxContentLength ?? 1024;
if (payload.html) {
span.setAttribute(
SEMATTRS_INBOUND_HTML_CONTENT,
serializeContent(payload.html, maxLength)
);
}
if (payload.text) {
span.setAttribute(
SEMATTRS_INBOUND_TEXT_CONTENT,
serializeContent(payload.text, maxLength)
);
}
}
// Scheduling attributes
if (payload.scheduledAt) {
span.setAttribute(SEMATTRS_INBOUND_SCHEDULED_AT, payload.scheduledAt);
}
// Thread ID for replies
if (payload.threadId) {
span.setAttribute(SEMATTRS_INBOUND_THREAD_ID, payload.threadId);
}
}
// Annotate management operation spans
function annotateManagementSpan(
span: Span,
operation: string,
resource: string,
payload?: any,
response?: any
): void {
span.setAttributes({
[SEMATTRS_MESSAGING_SYSTEM]: "inbound",
[SEMATTRS_MESSAGING_OPERATION]: operation,
[SEMATTRS_INBOUND_RESOURCE]: resource,
[SEMATTRS_INBOUND_TARGET]: `${resource}.${operation}`,
});
// Set resource-specific IDs
if (response?.data?.id) {
if (resource === "endpoints") {
span.setAttribute(SEMATTRS_INBOUND_ENDPOINT_ID, response.data.id);
} else if (resource === "domains") {
span.setAttribute(SEMATTRS_INBOUND_DOMAIN_ID, response.data.id);
} else if (resource === "addresses") {
span.setAttribute(SEMATTRS_INBOUND_ADDRESS_ID, response.data.id);
}
}
// Also check payload for ID (for get/update/delete operations)
if (payload && typeof payload === "string") {
if (resource === "endpoints") {
span.setAttribute(SEMATTRS_INBOUND_ENDPOINT_ID, payload);
} else if (resource === "domains") {
span.setAttribute(SEMATTRS_INBOUND_DOMAIN_ID, payload);
} else if (resource === "addresses") {
span.setAttribute(SEMATTRS_INBOUND_ADDRESS_ID, payload);
}
}
}
// Annotate email response
function annotateEmailResponse(span: Span, response: any): void {
if (response?.data?.id) {
span.setAttribute(SEMATTRS_INBOUND_MESSAGE_ID, response.data.id);
}
if (response?.data?.scheduleId) {
span.setAttribute(SEMATTRS_INBOUND_SCHEDULE_ID, response.data.scheduleId);
}
}
// Finalize span with status
function finalizeSpan(span: Span, error?: unknown): void {
if (error) {
if (error instanceof Error) {
span.recordException(error);
} else {
span.recordException(new Error(String(error)));
}
span.setStatus({ code: SpanStatusCode.ERROR });
} else {
span.setStatus({ code: SpanStatusCode.OK });
}
span.end();
}
// Wrapper for async operations
function wrapAsyncOperation(
tracer: any,
spanName: string,
originalFn: Function,
annotator: (span: Span, ...args: any[]) => void,
config?: InstrumentInboundConfig
) {
return async function wrapped(...args: any[]): Promise<any> {
const span = tracer.startSpan(spanName, { kind: SpanKind.CLIENT });
annotator(span, ...args);
const activeContext = trace.setSpan(context.active(), span);
try {
const response = await context.with(activeContext, () =>
originalFn.apply(this, args)
);
annotateEmailResponse(span, response);
finalizeSpan(span);
return response;
} catch (error) {
finalizeSpan(span, error);
throw error;
}
};
}
/**
* Instruments an Inbound client instance with OpenTelemetry tracing.
*
* @param client - The Inbound client instance to instrument
* @param config - Optional configuration for instrumentation
* @returns The instrumented client instance
*
* @example
* ```typescript
* import { Inbound } from '@inboundemail/sdk';
* import { instrumentInbound } from '@kubiks/otel-inbound';
*
* const inbound = instrumentInbound(
* new Inbound(process.env.INBOUND_API_KEY!),
* { captureEmailContent: true }
* );
*
* await inbound.emails.send({
* from: 'hello@example.com',
* to: 'user@example.com',
* subject: 'Welcome',
* html: '<p>Hello world</p>',
* });
* ```
*/
export function instrumentInbound<T extends InstrumentedInbound>(
client: T,
config?: InstrumentInboundConfig
): T {
// Check if already instrumented
if ((client as InstrumentedInbound)[INSTRUMENTED_FLAG]) {
return client;
}
const tracer = trace.getTracer(DEFAULT_TRACER_NAME);
// Instrument emails namespace
if (client.emails) {
const emails = client.emails;
// emails.send
if (emails.send) {
const originalSend = emails.send.bind(emails);
emails.send = wrapAsyncOperation(
tracer,
"inbound.emails.send",
originalSend,
(span, payload) => annotateEmailSpan(span, "send", "emails", payload, config),
config
);
}
// emails.schedule
if (emails.schedule) {
const originalSchedule = emails.schedule.bind(emails);
emails.schedule = wrapAsyncOperation(
tracer,
"inbound.emails.schedule",
originalSchedule,
(span, payload) => annotateEmailSpan(span, "schedule", "emails", payload, config),
config
);
}
// emails.reply
if (emails.reply) {
const originalReply = emails.reply.bind(emails);
emails.reply = wrapAsyncOperation(
tracer,
"inbound.emails.reply",
originalReply,
(span, payload) => annotateEmailSpan(span, "reply", "emails", payload, config),
config
);
}
// emails.retrieve
if (emails.retrieve) {
const originalRetrieve = emails.retrieve.bind(emails);
emails.retrieve = wrapAsyncOperation(
tracer,
"inbound.emails.retrieve",
originalRetrieve,
(span, id) => {
span.setAttributes({
[SEMATTRS_MESSAGING_SYSTEM]: "inbound",
[SEMATTRS_MESSAGING_OPERATION]: "retrieve",
[SEMATTRS_INBOUND_RESOURCE]: "emails",
[SEMATTRS_INBOUND_TARGET]: "emails.retrieve",
[SEMATTRS_INBOUND_MESSAGE_ID]: id,
});
},
config
);
}
// emails.listScheduled
if (emails.listScheduled) {
const originalListScheduled = emails.listScheduled.bind(emails);
emails.listScheduled = wrapAsyncOperation(
tracer,
"inbound.emails.listScheduled",
originalListScheduled,
(span) => {
span.setAttributes({
[SEMATTRS_MESSAGING_SYSTEM]: "inbound",
[SEMATTRS_MESSAGING_OPERATION]: "list",
[SEMATTRS_INBOUND_RESOURCE]: "scheduled_emails",
[SEMATTRS_INBOUND_TARGET]: "emails.listScheduled",
});
},
config
);
}
// emails.getScheduled
if (emails.getScheduled) {
const originalGetScheduled = emails.getScheduled.bind(emails);
emails.getScheduled = wrapAsyncOperation(
tracer,
"inbound.emails.getScheduled",
originalGetScheduled,
(span, id) => {
span.setAttributes({
[SEMATTRS_MESSAGING_SYSTEM]: "inbound",
[SEMATTRS_MESSAGING_OPERATION]: "get",
[SEMATTRS_INBOUND_RESOURCE]: "scheduled_emails",
[SEMATTRS_INBOUND_TARGET]: "emails.getScheduled",
[SEMATTRS_INBOUND_SCHEDULE_ID]: id,
});
},
config
);
}
// emails.cancelScheduled
if (emails.cancelScheduled) {
const originalCancelScheduled = emails.cancelScheduled.bind(emails);
emails.cancelScheduled = wrapAsyncOperation(
tracer,
"inbound.emails.cancelScheduled",
originalCancelScheduled,
(span, id) => {
span.setAttributes({
[SEMATTRS_MESSAGING_SYSTEM]: "inbound",
[SEMATTRS_MESSAGING_OPERATION]: "cancel",
[SEMATTRS_INBOUND_RESOURCE]: "scheduled_emails",
[SEMATTRS_INBOUND_TARGET]: "emails.cancelScheduled",
[SEMATTRS_INBOUND_SCHEDULE_ID]: id,
});
},
config
);
}
}
// Instrument endpoints namespace
if (client.endpoints) {
const endpoints = client.endpoints;
if (endpoints.list) {
const originalList = endpoints.list.bind(endpoints);
endpoints.list = wrapAsyncOperation(
tracer,
"inbound.endpoints.list",
originalList,
(span) => annotateManagementSpan(span, "list", "endpoints"),
config
);
}
if (endpoints.create) {
const originalCreate = endpoints.create.bind(endpoints);
endpoints.create = async function(...args: any[]) {
const span = tracer.startSpan("inbound.endpoints.create", { kind: SpanKind.CLIENT });
annotateManagementSpan(span, "create", "endpoints", args[0]);
const activeContext = trace.setSpan(context.active(), span);
try {
const response = await context.with(activeContext, () => originalCreate.apply(this, args));
annotateManagementSpan(span, "create", "endpoints", args[0], response);
finalizeSpan(span);
return response;
} catch (error) {
finalizeSpan(span, error);
throw error;
}
};
}
if (endpoints.get) {
const originalGet = endpoints.get.bind(endpoints);
endpoints.get = wrapAsyncOperation(
tracer,
"inbound.endpoints.get",
originalGet,
(span, id) => annotateManagementSpan(span, "get", "endpoints", id),
config
);
}
if (endpoints.update) {
const originalUpdate = endpoints.update.bind(endpoints);
endpoints.update = wrapAsyncOperation(
tracer,
"inbound.endpoints.update",
originalUpdate,
(span, id, payload) => annotateManagementSpan(span, "update", "endpoints", id),
config
);
}
if (endpoints.delete) {
const originalDelete = endpoints.delete.bind(endpoints);
endpoints.delete = wrapAsyncOperation(
tracer,
"inbound.endpoints.delete",
originalDelete,
(span, id) => annotateManagementSpan(span, "delete", "endpoints", id),
config
);
}
}
// Instrument addresses namespace
if (client.addresses) {
const addresses = client.addresses;
if (addresses.list) {
const originalList = addresses.list.bind(addresses);
addresses.list = wrapAsyncOperation(
tracer,
"inbound.addresses.list",
originalList,
(span) => annotateManagementSpan(span, "list", "addresses"),
config
);
}
if (addresses.create) {
const originalCreate = addresses.create.bind(addresses);
addresses.create = async function(...args: any[]) {
const span = tracer.startSpan("inbound.addresses.create", { kind: SpanKind.CLIENT });
annotateManagementSpan(span, "create", "addresses", args[0]);
const activeContext = trace.setSpan(context.active(), span);
try {
const response = await context.with(activeContext, () => originalCreate.apply(this, args));
annotateManagementSpan(span, "create", "addresses", args[0], response);
finalizeSpan(span);
return response;
} catch (error) {
finalizeSpan(span, error);
throw error;
}
};
}
if (addresses.get) {
const originalGet = addresses.get.bind(addresses);
addresses.get = wrapAsyncOperation(
tracer,
"inbound.addresses.get",
originalGet,
(span, id) => annotateManagementSpan(span, "get", "addresses", id),
config
);
}
if (addresses.update) {
const originalUpdate = addresses.update.bind(addresses);
addresses.update = wrapAsyncOperation(
tracer,
"inbound.addresses.update",
originalUpdate,
(span, id, payload) => annotateManagementSpan(span, "update", "addresses", id),
config
);
}
if (addresses.delete) {
const originalDelete = addresses.delete.bind(addresses);
addresses.delete = wrapAsyncOperation(
tracer,
"inbound.addresses.delete",
originalDelete,
(span, id) => annotateManagementSpan(span, "delete", "addresses", id),
config
);
}
}
// Instrument domains namespace
if (client.domains) {
const domains = client.domains;
if (domains.list) {
const originalList = domains.list.bind(domains);
domains.list = wrapAsyncOperation(
tracer,
"inbound.domains.list",
originalList,
(span) => annotateManagementSpan(span, "list", "domains"),
config
);
}
if (domains.create) {
const originalCreate = domains.create.bind(domains);
domains.create = async function(...args: any[]) {
const span = tracer.startSpan("inbound.domains.create", { kind: SpanKind.CLIENT });
annotateManagementSpan(span, "create", "domains", args[0]);
const activeContext = trace.setSpan(context.active(), span);
try {
const response = await context.with(activeContext, () => originalCreate.apply(this, args));
annotateManagementSpan(span, "create", "domains", args[0], response);
finalizeSpan(span);
return response;
} catch (error) {
finalizeSpan(span, error);
throw error;
}
};
}
if (domains.get) {
const originalGet = domains.get.bind(domains);
domains.get = wrapAsyncOperation(
tracer,
"inbound.domains.get",
originalGet,
(span, id) => annotateManagementSpan(span, "get", "domains", id),
config
);
}
if (domains.update) {
const originalUpdate = domains.update.bind(domains);
domains.update = wrapAsyncOperation(
tracer,
"inbound.domains.update",
originalUpdate,
(span, id, payload) => annotateManagementSpan(span, "update", "domains", id),
config
);
}
if (domains.delete) {
const originalDelete = domains.delete.bind(domains);
domains.delete = wrapAsyncOperation(
tracer,
"inbound.domains.delete",
originalDelete,
(span, id) => annotateManagementSpan(span, "delete", "domains", id),
config
);
}
if (domains.getDNS) {
const originalGetDNS = domains.getDNS.bind(domains);
domains.getDNS = wrapAsyncOperation(
tracer,
"inbound.domains.getDNS",
originalGetDNS,
(span, id) => {
span.setAttributes({
[SEMATTRS_MESSAGING_SYSTEM]: "inbound",
[SEMATTRS_MESSAGING_OPERATION]: "getDNS",
[SEMATTRS_INBOUND_RESOURCE]: "domains",
[SEMATTRS_INBOUND_TARGET]: "domains.getDNS",
[SEMATTRS_INBOUND_DOMAIN_ID]: id,
});
},
config
);
}
}
// Instrument threads namespace
if (client.threads) {
const threads = client.threads;
if (threads.list) {
const originalList = threads.list.bind(threads);
threads.list = wrapAsyncOperation(
tracer,
"inbound.threads.list",
originalList,
(span) => {
span.setAttributes({
[SEMATTRS_MESSAGING_SYSTEM]: "inbound",
[SEMATTRS_MESSAGING_OPERATION]: "list",
[SEMATTRS_INBOUND_RESOURCE]: "threads",
[SEMATTRS_INBOUND_TARGET]: "threads.list",
});
},
config
);
}
if (threads.get) {
const originalGet = threads.get.bind(threads);
threads.get = wrapAsyncOperation(
tracer,
"inbound.threads.get",
originalGet,
(span, id) => {
span.setAttributes({
[SEMATTRS_MESSAGING_SYSTEM]: "inbound",
[SEMATTRS_MESSAGING_OPERATION]: "get",
[SEMATTRS_INBOUND_RESOURCE]: "threads",
[SEMATTRS_INBOUND_TARGET]: "threads.get",
[SEMATTRS_INBOUND_THREAD_ID]: id,
});
},
config
);
}
if (threads.actions) {
const originalActions = threads.actions.bind(threads);
threads.actions = wrapAsyncOperation(
tracer,
"inbound.threads.actions",
originalActions,
(span, id, action) => {
span.setAttributes({
[SEMATTRS_MESSAGING_SYSTEM]: "inbound",
[SEMATTRS_MESSAGING_OPERATION]: "actions",
[SEMATTRS_INBOUND_RESOURCE]: "threads",
[SEMATTRS_INBOUND_TARGET]: "threads.actions",
[SEMATTRS_INBOUND_THREAD_ID]: id,
});
},
config
);
}
if (threads.statistics) {
const originalStatistics = threads.statistics.bind(threads);
threads.statistics = wrapAsyncOperation(
tracer,
"inbound.threads.statistics",
originalStatistics,
(span) => {
span.setAttributes({
[SEMATTRS_MESSAGING_SYSTEM]: "inbound",
[SEMATTRS_MESSAGING_OPERATION]: "statistics",
[SEMATTRS_INBOUND_RESOURCE]: "threads",
[SEMATTRS_INBOUND_TARGET]: "threads.statistics",
});
},
config
);
}
}
// Instrument attachments namespace
if (client.attachments) {
const attachments = client.attachments;
if (attachments.download) {
const originalDownload = attachments.download.bind(attachments);
attachments.download = wrapAsyncOperation(
tracer,
"inbound.attachments.download",
originalDownload,
(span, id) => {
span.setAttributes({
[SEMATTRS_MESSAGING_SYSTEM]: "inbound",
[SEMATTRS_MESSAGING_OPERATION]: "download",
[SEMATTRS_INBOUND_RESOURCE]: "attachments",
[SEMATTRS_INBOUND_TARGET]: "attachments.download",
[SEMATTRS_INBOUND_ATTACHMENT_ID]: id,
});
},
config
);
}
}
// Mark as instrumented
(client as InstrumentedInbound)[INSTRUMENTED_FLAG] = true;
return client;
}
// Type for Next.js route handlers
type RouteHandler = (request: Request) => Promise<Response> | Response;
// Extract webhook headers
function extractWebhookHeaders(request: Request): Record<string, string> {
const attributes: Record<string, string> = {};
// Extract common webhook headers from Inbound
const webhookId = request.headers.get("x-webhook-id") || request.headers.get("x-inbound-webhook-id");
if (webhookId) {
attributes[SEMATTRS_INBOUND_WEBHOOK_ID] = webhookId;
}
const messageId = request.headers.get("x-message-id") || request.headers.get("x-inbound-message-id");
if (messageId) {
attributes[SEMATTRS_INBOUND_MESSAGE_ID] = messageId;
}
return attributes;
}
// Annotate webhook span with email data
function annotateWebhookSpan(
span: Span,
payload: any,
config?: InstrumentInboundConfig
): void {
if (!payload) return;
// Extract email information from webhook payload
if (payload.from) {
span.setAttribute(SEMATTRS_INBOUND_FROM, payload.from);
}
if (payload.to) {
const toAddresses = extractEmailAddresses(payload.to);
if (toAddresses.length > 0) {
span.setAttribute(SEMATTRS_INBOUND_TO_ADDRESSES, toAddresses.join(", "));
}
}
if (payload.subject) {
span.setAttribute(SEMATTRS_INBOUND_SUBJECT, payload.subject);
}
if (payload.messageId || payload.id) {
span.setAttribute(SEMATTRS_INBOUND_MESSAGE_ID, payload.messageId || payload.id);
}
if (payload.threadId) {
span.setAttribute(SEMATTRS_INBOUND_THREAD_ID, payload.threadId);
}
// Capture email content if enabled
if (config?.captureEmailContent) {
const maxLength = config.maxContentLength ?? 1024;
if (payload.html) {
span.setAttribute(
SEMATTRS_INBOUND_HTML_CONTENT,
serializeContent(payload.html, maxLength)
);
}
if (payload.text) {
span.setAttribute(
SEMATTRS_INBOUND_TEXT_CONTENT,
serializeContent(payload.text, maxLength)
);
}
}
}
/**
* Instruments a Next.js route handler to trace incoming webhook requests from Inbound.
*
* @param handler - The Next.js route handler function
* @param config - Optional configuration for instrumentation
* @returns The instrumented route handler
*
* @example
* ```typescript
* import { instrumentInboundWebhook } from '@kubiks/otel-inbound';
*
* export const POST = instrumentInboundWebhook(async (request: Request) => {
* const email = await request.json();
*
* // Process incoming email
* console.log('Received email from:', email.from);
*
* return Response.json({ success: true });
* }, { captureEmailContent: true });
* ```
*/
export function instrumentInboundWebhook(
handler: RouteHandler,
config?: InstrumentInboundConfig
): RouteHandler {
const tracer = trace.getTracer(DEFAULT_TRACER_NAME);
return async function instrumentedWebhook(request: Request): Promise<Response> {
const span = tracer.startSpan("inbound.webhook.receive", {
kind: SpanKind.SERVER,
});
// Set base attributes
span.setAttributes({
[SEMATTRS_MESSAGING_SYSTEM]: "inbound",
[SEMATTRS_MESSAGING_OPERATION]: "receive",
[SEMATTRS_INBOUND_RESOURCE]: "webhook",
[SEMATTRS_INBOUND_TARGET]: "webhook.receive",
});
// Extract webhook headers
const webhookHeaders = extractWebhookHeaders(request);
span.setAttributes(webhookHeaders);
// Try to parse and annotate with email data
try {
const clonedRequest = request.clone();
const payload = await clonedRequest.json();
annotateWebhookSpan(span, payload, config);
} catch (error) {
// Ignore errors when parsing webhook payload
}
const activeContext = trace.setSpan(context.active(), span);
try {
const response = await context.with(activeContext, () => handler(request));
// Capture response status
span.setAttribute(SEMATTRS_HTTP_STATUS_CODE, response.status);
// Mark as successful if status is 2xx
if (response.status >= 200 && response.status < 300) {
finalizeSpan(span);
} else {
finalizeSpan(span, new Error(`Handler returned status ${response.status}`));
}
return response;
} catch (error) {
finalizeSpan(span, error);
throw error;
}
};
}

View File

@@ -0,0 +1,22 @@
{
"compilerOptions": {
"target": "ES2020",
"module": "ESNext",
"moduleResolution": "bundler",
"lib": ["ES2020", "DOM"],
"outDir": "dist",
"declaration": true,
"declarationMap": true,
"sourceMap": true,
"strict": false,
"esModuleInterop": true,
"skipLibCheck": true,
"forceConsistentCasingInFileNames": true,
"resolveJsonModule": true,
"declarationDir": "dist/types",
"stripInternal": true
},
"include": ["src/**/*"],
"exclude": ["node_modules", "dist", "**/*.test.ts"]
}

36
pnpm-lock.yaml generated
View File

@@ -102,6 +102,30 @@ importers:
specifier: 0.33.0
version: 0.33.0(less@4.2.0)(sass@1.69.7)(stylus@0.59.0)
packages/otel-inbound:
devDependencies:
'@inboundemail/sdk':
specifier: ^4.3.1
version: 4.3.1(react-dom@18.3.1(react@18.2.0))(react@18.2.0)
'@opentelemetry/api':
specifier: ^1.9.0
version: 1.9.0
'@opentelemetry/sdk-trace-base':
specifier: ^2.1.0
version: 2.1.0(@opentelemetry/api@1.9.0)
'@types/node':
specifier: 18.15.11
version: 18.15.11
rimraf:
specifier: 3.0.2
version: 3.0.2
typescript:
specifier: ^5
version: 5.3.3
vitest:
specifier: 0.33.0
version: 0.33.0(less@4.2.0)(sass@1.69.7)(stylus@0.59.0)
packages/otel-resend:
devDependencies:
'@opentelemetry/api':
@@ -526,6 +550,13 @@ packages:
'@hexagon/base64@1.1.28':
resolution: {integrity: sha512-lhqDEAvWixy3bZ+UOYbPwUbBkwBq5C1LAJ/xPC8Oi+lL54oyakv/npbA0aU2hgCsx/1NUd4IBvV03+aUBWxerw==}
'@inboundemail/sdk@4.3.1':
resolution: {integrity: sha512-WHyGHnGa5Kyzd/rJW4p/XJVqwx9s/e2LgqO3eL1+XCpu3xAU3W8A7oHs1l5kD3kakMuEjAoLacAp+xT3P58q6w==}
engines: {node: '>=16.0.0'}
peerDependencies:
react: '>=16.8.0'
react-dom: '>=16.8.0'
'@isaacs/cliui@8.0.2':
resolution: {integrity: sha512-O8jcjabXaleOG9DQ0+ARXWZBTfnP4WNAqzuiJK7ll44AmxGKv/J2M4TPjxjY3znBCfvBXFzucm1twdyFybFqEA==}
engines: {node: '>=12'}
@@ -2857,6 +2888,11 @@ snapshots:
'@hexagon/base64@1.1.28': {}
'@inboundemail/sdk@4.3.1(react-dom@18.3.1(react@18.2.0))(react@18.2.0)':
dependencies:
react: 18.2.0
react-dom: 18.3.1(react@18.2.0)
'@isaacs/cliui@8.0.2':
dependencies:
string-width: 5.1.2