Merge pull request #26 from kubiks-inc/kubiks-inc/mongodb

OTEL package for mongo tracing
This commit is contained in:
Alex Holovach
2025-10-29 22:56:26 -05:00
committed by GitHub
11 changed files with 2170 additions and 0 deletions

View File

@@ -16,6 +16,7 @@ Our goal is to bring the TypeScript ecosystem the observability tools its bee
- [`@kubiks/otel-better-auth`](./packages/otel-better-auth/README.md)
- [`@kubiks/otel-drizzle`](./packages/otel-drizzle/README.md)
- [`@kubiks/otel-inbound`](./packages/otel-inbound/README.md)
- [`@kubiks/otel-mongodb`](./packages/otel-mongodb/README.md)
- [`@kubiks/otel-resend`](./packages/otel-resend/README.md)
- [`@kubiks/otel-upstash-queues`](./packages/otel-upstash-queues/README.md)

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.0 MiB

View File

@@ -0,0 +1,6 @@
src/
tsconfig.json
*.test.ts
.gitignore
node_modules/

View File

@@ -0,0 +1,46 @@
# @kubiks/otel-mongodb
## 1.0.0
### Major Changes
- Initial release of OpenTelemetry instrumentation for MongoDB Node.js driver
- **Core Instrumentation**:
- `instrumentMongoClient`: Instrument entire MongoDB client for automatic tracing
- `instrumentDb`: Instrument database level with auto-instrumentation of collections
- `instrumentCollection`: Instrument individual collections with fine-grained control
- **Query Operations**:
- `find()` - Trace find operations with result count (traced on `.toArray()` call)
- `findOne()` - Trace single document queries
- `countDocuments()` - Trace count operations
- `aggregate()` - Trace aggregation pipelines (traced on `.toArray()` call)
- **Write Operations**:
- `insertOne()` and `insertMany()` - Trace document insertions
- `updateOne()` and `updateMany()` - Trace document updates with match/modify counts
- `replaceOne()` - Trace document replacements
- `deleteOne()` and `deleteMany()` - Trace document deletions
- **Atomic Operations**:
- `findOneAndUpdate()` - Trace atomic find and update
- `findOneAndDelete()` - Trace atomic find and delete
- `findOneAndReplace()` - Trace atomic find and replace
- **Rich Span Attributes**:
- Database system, operation type, collection name
- Result counts (matched, modified, deleted, inserted)
- Optional query filter capture with configurable size limits
- Network peer information (hostname, port)
- **Configuration Options**:
- `captureFilters`: Enable/disable query filter capture (default: false for security)
- `maxFilterLength`: Limit captured filter size (default: 500 chars)
- `dbName`: Database name for span attributes
- `peerName` and `peerPort`: MongoDB server connection info
- **Features**:
- Idempotent instrumentation (safe to call multiple times)
- Automatic database name detection
- Comprehensive error handling and span status
- Fixed cursor handling to prevent memory leaks
- Zero-configuration setup with sensible defaults
- Full TypeScript support with proper types
- Comprehensive test coverage with 40+ tests
- Full OpenTelemetry semantic conventions compliance
- Detailed documentation with Next.js and Express examples

View File

@@ -0,0 +1,22 @@
MIT License
Copyright (c) 2025 Kubiks
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@@ -0,0 +1,73 @@
# @kubiks/otel-mongodb
OpenTelemetry instrumentation for the [MongoDB Node.js driver](https://www.mongodb.com/docs/drivers/node/current/).
Capture spans for every MongoDB operation, enrich them with query metadata,
and monitor database performance from your traces.
![MongoDB Trace Visualization](https://github.com/kubiks-inc/otel/blob/main/images/otel-mongodb-trace.png)
_Visualize your MongoDB operations with detailed span information including collection names, operation types, and execution metrics._
## Installation
```bash
npm install @kubiks/otel-mongodb
# or
pnpm add @kubiks/otel-mongodb
```
**Peer Dependencies:** `@opentelemetry/api` >= 1.9.0, `mongodb` >= 5.0.0
## Quick Start
```ts
import { MongoClient } from "mongodb";
import { instrumentMongoClient } from "@kubiks/otel-mongodb";
const client = new MongoClient(process.env.MONGODB_URI!);
await client.connect();
instrumentMongoClient(client, {
captureFilters: true,
peerName: "mongodb.example.com",
peerPort: 27017,
});
const db = client.db("myapp");
const users = db.collection("users");
const user = await users.findOne({ email: "user@example.com" });
```
`instrumentMongoClient` wraps the client you already use — no configuration changes
needed. Every database operation creates a client span with useful attributes.
## What Gets Traced
This instrumentation automatically traces all major MongoDB operations including `find`, `findOne`, `insertOne`, `insertMany`, `updateOne`, `updateMany`, `deleteOne`, `deleteMany`, `aggregate`, `countDocuments`, and atomic operations like `findOneAndUpdate`.
## Span Attributes
Each span includes:
| Attribute | Description | Example |
| --------------------------- | ------------------------------------- | ----------------------- |
| `db.system` | Constant value `mongodb` | `mongodb` |
| `db.operation` | MongoDB operation type | `findOne`, `insertMany` |
| `db.mongodb.collection` | Collection name | `users` |
| `db.name` | Database name | `myapp` |
| `net.peer.name` | MongoDB server hostname | `mongodb.example.com` |
| `net.peer.port` | MongoDB server port | `27017` |
| `mongodb.filter` | Query filter (when enabled) | `{"status":"active"}` |
| `mongodb.result_count` | Number of documents returned | `42` |
| `mongodb.inserted_count` | Number of documents inserted | `5` |
| `mongodb.matched_count` | Number of documents matched (updates) | `10` |
| `mongodb.modified_count` | Number of documents modified | `8` |
| `mongodb.deleted_count` | Number of documents deleted | `15` |
| `mongodb.execution_time_ms` | Query execution time (when enabled) | `42.5` |
| `mongodb.pipeline` | Aggregation pipeline | `[{"$match":...}]` |
The instrumentation captures query metadata to help with debugging and monitoring, while optionally capturing filters based on your security requirements.
## License
MIT

View File

@@ -0,0 +1,53 @@
{
"name": "@kubiks/otel-mongodb",
"version": "1.0.0",
"private": false,
"publishConfig": {
"access": "public"
},
"description": "OpenTelemetry instrumentation for MongoDB Node.js driver",
"author": "Kubiks",
"license": "MIT",
"repository": "kubiks-inc/otel",
"sideEffects": false,
"type": "module",
"exports": {
".": {
"types": "./dist/types/index.d.ts",
"import": "./dist/index.js",
"default": "./dist/index.js"
}
},
"main": "./dist/index.js",
"types": "./dist/types/index.d.ts",
"files": [
"dist",
"LICENSE",
"README.md"
],
"engines": {
"node": "^18.19.0 || >=20.6.0"
},
"scripts": {
"build": "pnpm clean && tsc",
"clean": "rimraf dist",
"prepublishOnly": "pnpm build",
"type-check": "tsc --noEmit",
"unit-test": "vitest --run",
"unit-test-watch": "vitest"
},
"dependencies": {},
"devDependencies": {
"@opentelemetry/api": "^1.9.0",
"@opentelemetry/sdk-trace-base": "^2.1.0",
"@types/node": "18.15.11",
"mongodb": "^6.0.0",
"rimraf": "3.0.2",
"typescript": "^5",
"vitest": "0.33.0"
},
"peerDependencies": {
"@opentelemetry/api": ">=1.9.0 <2.0.0",
"mongodb": ">=5.0.0"
}
}

View File

@@ -0,0 +1,842 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { SpanStatusCode, trace } from "@opentelemetry/api";
import {
BasicTracerProvider,
InMemorySpanExporter,
SimpleSpanProcessor,
} from "@opentelemetry/sdk-trace-base";
import type {
Collection,
Db,
MongoClient,
Document,
FindCursor,
AggregationCursor,
} from "mongodb";
import {
instrumentCollection,
instrumentDb,
instrumentMongoClient,
SEMATTRS_DB_SYSTEM,
SEMATTRS_DB_OPERATION,
SEMATTRS_DB_MONGODB_COLLECTION,
SEMATTRS_MONGODB_RESULT_COUNT,
SEMATTRS_MONGODB_MATCHED_COUNT,
SEMATTRS_MONGODB_MODIFIED_COUNT,
SEMATTRS_MONGODB_DELETED_COUNT,
SEMATTRS_MONGODB_INSERTED_COUNT,
SEMATTRS_MONGODB_FILTER,
SEMATTRS_DB_NAME,
SEMATTRS_NET_PEER_NAME,
SEMATTRS_NET_PEER_PORT,
} from "./index";
describe("instrumentCollection", () => {
let provider: BasicTracerProvider;
let exporter: InMemorySpanExporter;
beforeEach(() => {
exporter = new InMemorySpanExporter();
provider = new BasicTracerProvider({
spanProcessors: [new SimpleSpanProcessor(exporter)],
});
trace.setGlobalTracerProvider(provider);
});
afterEach(async () => {
await provider.shutdown();
exporter.reset();
trace.disable();
});
const createMockCollection = (name = "users"): Collection<Document> => {
const mockCursor = {
toArray: vi.fn(async () => [{ _id: "1", name: "John" }]),
} as unknown as FindCursor<any>;
const mockAggCursor = {
toArray: vi.fn(async () => [{ count: 5 }]),
} as unknown as AggregationCursor<any>;
return {
collectionName: name,
find: vi.fn(() => mockCursor),
findOne: vi.fn(async () => ({ _id: "1", name: "John" })),
insertOne: vi.fn(async () => ({
acknowledged: true,
insertedId: "1",
})),
insertMany: vi.fn(async () => ({
acknowledged: true,
insertedCount: 3,
insertedIds: { 0: "1", 1: "2", 2: "3" },
})),
updateOne: vi.fn(async () => ({
acknowledged: true,
matchedCount: 1,
modifiedCount: 1,
upsertedCount: 0,
upsertedId: null,
})),
updateMany: vi.fn(async () => ({
acknowledged: true,
matchedCount: 5,
modifiedCount: 5,
upsertedCount: 0,
upsertedId: null,
})),
deleteOne: vi.fn(async () => ({
acknowledged: true,
deletedCount: 1,
})),
deleteMany: vi.fn(async () => ({
acknowledged: true,
deletedCount: 10,
})),
countDocuments: vi.fn(async () => 42),
aggregate: vi.fn(() => mockAggCursor),
replaceOne: vi.fn(async () => ({
acknowledged: true,
matchedCount: 1,
modifiedCount: 1,
upsertedCount: 0,
upsertedId: null,
})),
findOneAndUpdate: vi.fn(async () => ({
value: { _id: "1", name: "Updated" },
ok: 1,
})),
findOneAndDelete: vi.fn(async () => ({
value: { _id: "1", name: "John" },
ok: 1,
})),
findOneAndReplace: vi.fn(async () => ({
value: { _id: "1", name: "Replaced" },
ok: 1,
})),
} as unknown as Collection<Document>;
};
it("instruments findOne and records span", async () => {
const collection = createMockCollection("users");
instrumentCollection(collection, { dbName: "testdb" });
const result = await collection.findOne({ email: "test@example.com" });
expect(result).toEqual({ _id: "1", name: "John" });
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
if (!span) {
throw new Error("Expected a span to be recorded");
}
expect(span.name).toBe("mongodb.users.findOne");
expect(span.attributes[SEMATTRS_DB_SYSTEM]).toBe("mongodb");
expect(span.attributes[SEMATTRS_DB_OPERATION]).toBe("findOne");
expect(span.attributes[SEMATTRS_DB_MONGODB_COLLECTION]).toBe("users");
expect(span.attributes[SEMATTRS_DB_NAME]).toBe("testdb");
expect(span.attributes[SEMATTRS_MONGODB_RESULT_COUNT]).toBe(1);
expect(span.status.code).toBe(SpanStatusCode.OK);
});
it("instruments find with toArray and records span", async () => {
const collection = createMockCollection("users");
instrumentCollection(collection);
const cursor = collection.find({ status: "active" });
const results = await cursor.toArray();
expect(results).toHaveLength(1);
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
if (!span) {
throw new Error("Expected a span to be recorded");
}
expect(span.name).toBe("mongodb.users.find");
expect(span.attributes[SEMATTRS_DB_OPERATION]).toBe("find");
expect(span.attributes[SEMATTRS_MONGODB_RESULT_COUNT]).toBe(1);
expect(span.status.code).toBe(SpanStatusCode.OK);
});
it("captures filter when captureFilters is enabled", async () => {
const collection = createMockCollection("users");
instrumentCollection(collection, {
captureFilters: true,
dbName: "testdb",
});
await collection.findOne({ email: "test@example.com", status: "active" });
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
if (!span) {
throw new Error("Expected a span to be recorded");
}
const filter = span.attributes[SEMATTRS_MONGODB_FILTER] as string;
expect(filter).toBeDefined();
expect(filter).toContain("test@example.com");
expect(filter).toContain("active");
});
it("does not capture filter when captureFilters is disabled", async () => {
const collection = createMockCollection("users");
instrumentCollection(collection, {
captureFilters: false,
dbName: "testdb",
});
await collection.findOne({ email: "test@example.com" });
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
if (!span) {
throw new Error("Expected a span to be recorded");
}
expect(span.attributes[SEMATTRS_MONGODB_FILTER]).toBeUndefined();
});
it("truncates long filters based on maxFilterLength", async () => {
const collection = createMockCollection("users");
instrumentCollection(collection, {
captureFilters: true,
maxFilterLength: 50,
});
const longFilter = { data: "x".repeat(100), field: "value" };
await collection.findOne(longFilter);
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
if (!span) {
throw new Error("Expected a span to be recorded");
}
const capturedFilter = span.attributes[SEMATTRS_MONGODB_FILTER] as string;
expect(capturedFilter).toBeDefined();
expect(capturedFilter.length).toBeLessThanOrEqual(53); // 50 + "..."
expect(capturedFilter).toContain("...");
});
it("instruments insertOne and records span", async () => {
const collection = createMockCollection("users");
instrumentCollection(collection);
const result = await collection.insertOne({
name: "Jane",
email: "jane@example.com",
});
expect(result.acknowledged).toBe(true);
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
if (!span) {
throw new Error("Expected a span to be recorded");
}
expect(span.name).toBe("mongodb.users.insertOne");
expect(span.attributes[SEMATTRS_DB_OPERATION]).toBe("insertOne");
expect(span.attributes[SEMATTRS_MONGODB_INSERTED_COUNT]).toBe(1);
expect(span.status.code).toBe(SpanStatusCode.OK);
});
it("instruments insertMany and records span", async () => {
const collection = createMockCollection("users");
instrumentCollection(collection);
const result = await collection.insertMany([
{ name: "User1" },
{ name: "User2" },
{ name: "User3" },
]);
expect(result.insertedCount).toBe(3);
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
if (!span) {
throw new Error("Expected a span to be recorded");
}
expect(span.name).toBe("mongodb.users.insertMany");
expect(span.attributes[SEMATTRS_DB_OPERATION]).toBe("insertMany");
expect(span.attributes[SEMATTRS_MONGODB_INSERTED_COUNT]).toBe(3);
expect(span.status.code).toBe(SpanStatusCode.OK);
});
it("instruments updateOne and records span", async () => {
const collection = createMockCollection("users");
instrumentCollection(collection);
const result = await collection.updateOne({ _id: "1" } as any, {
$set: { name: "Updated" },
});
expect(result.modifiedCount).toBe(1);
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
if (!span) {
throw new Error("Expected a span to be recorded");
}
expect(span.name).toBe("mongodb.users.updateOne");
expect(span.attributes[SEMATTRS_DB_OPERATION]).toBe("updateOne");
expect(span.attributes[SEMATTRS_MONGODB_MATCHED_COUNT]).toBe(1);
expect(span.attributes[SEMATTRS_MONGODB_MODIFIED_COUNT]).toBe(1);
expect(span.status.code).toBe(SpanStatusCode.OK);
});
it("instruments updateMany and records span", async () => {
const collection = createMockCollection("users");
instrumentCollection(collection);
const result = await collection.updateMany(
{ status: "pending" },
{ $set: { status: "processed" } }
);
expect(result.modifiedCount).toBe(5);
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
if (!span) {
throw new Error("Expected a span to be recorded");
}
expect(span.name).toBe("mongodb.users.updateMany");
expect(span.attributes[SEMATTRS_DB_OPERATION]).toBe("updateMany");
expect(span.attributes[SEMATTRS_MONGODB_MATCHED_COUNT]).toBe(5);
expect(span.attributes[SEMATTRS_MONGODB_MODIFIED_COUNT]).toBe(5);
expect(span.status.code).toBe(SpanStatusCode.OK);
});
it("instruments deleteOne and records span", async () => {
const collection = createMockCollection("users");
instrumentCollection(collection);
const result = await collection.deleteOne({ _id: "1" } as any);
expect(result.deletedCount).toBe(1);
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
if (!span) {
throw new Error("Expected a span to be recorded");
}
expect(span.name).toBe("mongodb.users.deleteOne");
expect(span.attributes[SEMATTRS_DB_OPERATION]).toBe("deleteOne");
expect(span.attributes[SEMATTRS_MONGODB_DELETED_COUNT]).toBe(1);
expect(span.status.code).toBe(SpanStatusCode.OK);
});
it("instruments deleteMany and records span", async () => {
const collection = createMockCollection("users");
instrumentCollection(collection);
const result = await collection.deleteMany({ status: "deleted" });
expect(result.deletedCount).toBe(10);
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
if (!span) {
throw new Error("Expected a span to be recorded");
}
expect(span.name).toBe("mongodb.users.deleteMany");
expect(span.attributes[SEMATTRS_DB_OPERATION]).toBe("deleteMany");
expect(span.attributes[SEMATTRS_MONGODB_DELETED_COUNT]).toBe(10);
expect(span.status.code).toBe(SpanStatusCode.OK);
});
it("instruments countDocuments and records span", async () => {
const collection = createMockCollection("users");
instrumentCollection(collection);
const count = await collection.countDocuments({ status: "active" });
expect(count).toBe(42);
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
if (!span) {
throw new Error("Expected a span to be recorded");
}
expect(span.name).toBe("mongodb.users.countDocuments");
expect(span.attributes[SEMATTRS_DB_OPERATION]).toBe("countDocuments");
expect(span.attributes[SEMATTRS_MONGODB_RESULT_COUNT]).toBe(42);
expect(span.status.code).toBe(SpanStatusCode.OK);
});
it("instruments aggregate and records span", async () => {
const collection = createMockCollection("users");
instrumentCollection(collection, { captureFilters: true });
const pipeline = [
{ $match: { status: "active" } },
{ $group: { _id: "$status", count: { $sum: 1 } } },
];
const cursor = collection.aggregate(pipeline);
const results = await cursor.toArray();
expect(results).toHaveLength(1);
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
if (!span) {
throw new Error("Expected a span to be recorded");
}
expect(span.name).toBe("mongodb.users.aggregate");
expect(span.attributes[SEMATTRS_DB_OPERATION]).toBe("aggregate");
expect(span.attributes[SEMATTRS_MONGODB_RESULT_COUNT]).toBe(1);
expect(span.status.code).toBe(SpanStatusCode.OK);
const pipelineAttr = span.attributes["mongodb.pipeline"] as string;
expect(pipelineAttr).toBeDefined();
expect(pipelineAttr).toContain("active");
});
it("includes network peer attributes when configured", async () => {
const collection = createMockCollection("users");
instrumentCollection(collection, {
peerName: "mongodb.example.com",
peerPort: 27017,
});
await collection.findOne({ _id: "1" } as any);
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
if (!span) {
throw new Error("Expected a span to be recorded");
}
expect(span.attributes[SEMATTRS_NET_PEER_NAME]).toBe("mongodb.example.com");
expect(span.attributes[SEMATTRS_NET_PEER_PORT]).toBe(27017);
});
it("handles errors and marks span status", async () => {
const collection = createMockCollection("users");
collection.findOne = vi
.fn()
.mockRejectedValue(new Error("Connection error"));
instrumentCollection(collection);
await expect(collection.findOne({ _id: "1" } as any)).rejects.toThrowError(
"Connection error"
);
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
if (!span) {
throw new Error("Expected a span to be recorded");
}
expect(span.status.code).toBe(SpanStatusCode.ERROR);
const hasException = span.events.some(
(event: any) => event.name === "exception"
);
expect(hasException).toBe(true);
});
it("handles findOne returning null", async () => {
const collection = createMockCollection("users");
collection.findOne = vi.fn(async () => null);
instrumentCollection(collection);
const result = await collection.findOne({ _id: "nonexistent" } as any);
expect(result).toBeNull();
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
if (!span) {
throw new Error("Expected a span to be recorded");
}
expect(span.attributes[SEMATTRS_MONGODB_RESULT_COUNT]).toBe(0);
expect(span.status.code).toBe(SpanStatusCode.OK);
});
it("is idempotent - does not instrument twice", () => {
const collection = createMockCollection("users");
const first = instrumentCollection(collection);
const originalFindOne = first.findOne;
const second = instrumentCollection(first);
expect(second.findOne).toBe(originalFindOne);
});
it("returns collection unchanged if null", () => {
const result = instrumentCollection(null as any);
expect(result).toBeNull();
});
it("handles cursor errors in find.toArray", async () => {
const collection = createMockCollection("users");
const mockCursor = {
toArray: vi.fn().mockRejectedValue(new Error("Cursor error")),
} as unknown as FindCursor<any>;
collection.find = vi.fn(() => mockCursor);
instrumentCollection(collection);
const cursor = collection.find({});
await expect(cursor.toArray()).rejects.toThrowError("Cursor error");
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
if (!span) {
throw new Error("Expected a span to be recorded");
}
expect(span.status.code).toBe(SpanStatusCode.ERROR);
});
it("handles cursor errors in aggregate.toArray", async () => {
const collection = createMockCollection("users");
const mockCursor = {
toArray: vi.fn().mockRejectedValue(new Error("Aggregation error")),
} as unknown as AggregationCursor<any>;
collection.aggregate = vi.fn(() => mockCursor);
instrumentCollection(collection);
const cursor = collection.aggregate([]);
await expect(cursor.toArray()).rejects.toThrowError("Aggregation error");
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
if (!span) {
throw new Error("Expected a span to be recorded");
}
expect(span.status.code).toBe(SpanStatusCode.ERROR);
});
});
describe("instrumentDb", () => {
let provider: BasicTracerProvider;
let exporter: InMemorySpanExporter;
beforeEach(() => {
exporter = new InMemorySpanExporter();
provider = new BasicTracerProvider({
spanProcessors: [new SimpleSpanProcessor(exporter)],
});
trace.setGlobalTracerProvider(provider);
});
afterEach(async () => {
await provider.shutdown();
exporter.reset();
trace.disable();
});
const createMockDb = (dbName = "testdb"): Db => {
// Helper to create a new mock collection for each call
const createCollectionMock = (name: string): Collection<Document> => {
const mockCursor = {
toArray: vi.fn(async () => [{ _id: "1", name: "John" }]),
} as unknown as FindCursor<any>;
const mockAggCursor = {
toArray: vi.fn(async () => [{ count: 5 }]),
} as unknown as AggregationCursor<any>;
return {
collectionName: name,
find: vi.fn(() => mockCursor),
findOne: vi.fn(async () => ({ _id: "1", name: "John" })),
insertOne: vi.fn(async () => ({
acknowledged: true,
insertedId: "1",
})),
insertMany: vi.fn(async () => ({
acknowledged: true,
insertedCount: 3,
insertedIds: { 0: "1", 1: "2", 2: "3" },
})),
updateOne: vi.fn(async () => ({
acknowledged: true,
matchedCount: 1,
modifiedCount: 1,
upsertedCount: 0,
upsertedId: null,
})),
updateMany: vi.fn(async () => ({
acknowledged: true,
matchedCount: 5,
modifiedCount: 5,
upsertedCount: 0,
upsertedId: null,
})),
deleteOne: vi.fn(async () => ({
acknowledged: true,
deletedCount: 1,
})),
deleteMany: vi.fn(async () => ({
acknowledged: true,
deletedCount: 10,
})),
countDocuments: vi.fn(async () => 42),
aggregate: vi.fn(() => mockAggCursor),
replaceOne: vi.fn(async () => ({
acknowledged: true,
matchedCount: 1,
modifiedCount: 1,
upsertedCount: 0,
upsertedId: null,
})),
findOneAndUpdate: vi.fn(async () => ({
value: { _id: "1", name: "Updated" },
ok: 1,
})),
findOneAndDelete: vi.fn(async () => ({
value: { _id: "1", name: "John" },
ok: 1,
})),
findOneAndReplace: vi.fn(async () => ({
value: { _id: "1", name: "Replaced" },
ok: 1,
})),
} as unknown as Collection<Document>;
};
return {
databaseName: dbName,
collection: vi.fn((name: string) => createCollectionMock(name)),
} as unknown as Db;
};
it("instruments db and auto-instruments collections", async () => {
const db = createMockDb("myapp");
instrumentDb(db, { captureFilters: true });
const users = db.collection("users");
await users.findOne({ email: "test@example.com" });
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
if (!span) {
throw new Error("Expected a span to be recorded");
}
expect(span.name).toBe("mongodb.users.findOne");
expect(span.attributes[SEMATTRS_DB_NAME]).toBe("myapp");
});
it("uses database name from db if not provided in config", async () => {
const db = createMockDb("autodb");
instrumentDb(db);
const users = db.collection("users");
await users.findOne({});
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
if (!span) {
throw new Error("Expected a span to be recorded");
}
expect(span.attributes[SEMATTRS_DB_NAME]).toBe("autodb");
});
it("is idempotent - does not instrument twice", () => {
const db = createMockDb();
const first = instrumentDb(db);
const originalCollection = first.collection;
const second = instrumentDb(first);
expect(second.collection).toBe(originalCollection);
});
it("returns db unchanged if null", () => {
const result = instrumentDb(null as any);
expect(result).toBeNull();
});
});
describe("instrumentMongoClient", () => {
let provider: BasicTracerProvider;
let exporter: InMemorySpanExporter;
beforeEach(() => {
exporter = new InMemorySpanExporter();
provider = new BasicTracerProvider({
spanProcessors: [new SimpleSpanProcessor(exporter)],
});
trace.setGlobalTracerProvider(provider);
});
afterEach(async () => {
await provider.shutdown();
exporter.reset();
trace.disable();
});
const createMockClient = (): MongoClient => {
// Helper to create a new mock collection for each call
const createCollectionMock = (name: string): Collection<Document> => {
const mockCursor = {
toArray: vi.fn(async () => [{ _id: "1", name: "John" }]),
} as unknown as FindCursor<any>;
const mockAggCursor = {
toArray: vi.fn(async () => [{ count: 5 }]),
} as unknown as AggregationCursor<any>;
return {
collectionName: name,
find: vi.fn(() => mockCursor),
findOne: vi.fn(async () => ({ _id: "1", name: "John" })),
insertOne: vi.fn(async () => ({
acknowledged: true,
insertedId: "1",
})),
insertMany: vi.fn(async () => ({
acknowledged: true,
insertedCount: 3,
insertedIds: { 0: "1", 1: "2", 2: "3" },
})),
updateOne: vi.fn(async () => ({
acknowledged: true,
matchedCount: 1,
modifiedCount: 1,
upsertedCount: 0,
upsertedId: null,
})),
updateMany: vi.fn(async () => ({
acknowledged: true,
matchedCount: 5,
modifiedCount: 5,
upsertedCount: 0,
upsertedId: null,
})),
deleteOne: vi.fn(async () => ({
acknowledged: true,
deletedCount: 1,
})),
deleteMany: vi.fn(async () => ({
acknowledged: true,
deletedCount: 10,
})),
countDocuments: vi.fn(async () => 42),
aggregate: vi.fn(() => mockAggCursor),
replaceOne: vi.fn(async () => ({
acknowledged: true,
matchedCount: 1,
modifiedCount: 1,
upsertedCount: 0,
upsertedId: null,
})),
findOneAndUpdate: vi.fn(async () => ({
value: { _id: "1", name: "Updated" },
ok: 1,
})),
findOneAndDelete: vi.fn(async () => ({
value: { _id: "1", name: "John" },
ok: 1,
})),
findOneAndReplace: vi.fn(async () => ({
value: { _id: "1", name: "Replaced" },
ok: 1,
})),
} as unknown as Collection<Document>;
};
const createDbMock = (dbName: string): Db => {
return {
databaseName: dbName,
collection: vi.fn((name: string) => createCollectionMock(name)),
} as unknown as Db;
};
return {
db: vi.fn((name?: string) => createDbMock(name || "testdb")),
} as unknown as MongoClient;
};
it("instruments client and auto-instruments databases and collections", async () => {
const client = createMockClient();
instrumentMongoClient(client, {
captureFilters: true,
peerName: "localhost",
peerPort: 27017,
});
const db = client.db("testdb");
const users = db.collection("users");
await users.findOne({ email: "test@example.com" });
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
if (!span) {
throw new Error("Expected a span to be recorded");
}
expect(span.name).toBe("mongodb.users.findOne");
expect(span.attributes[SEMATTRS_DB_NAME]).toBe("testdb");
expect(span.attributes[SEMATTRS_NET_PEER_NAME]).toBe("localhost");
expect(span.attributes[SEMATTRS_NET_PEER_PORT]).toBe(27017);
});
it("is idempotent - does not instrument twice", () => {
const client = createMockClient();
const first = instrumentMongoClient(client);
const originalDb = first.db;
const second = instrumentMongoClient(first);
expect(second.db).toBe(originalDb);
});
it("returns client unchanged if null", () => {
const result = instrumentMongoClient(null as any);
expect(result).toBeNull();
});
});

View File

@@ -0,0 +1,975 @@
import {
context,
SpanKind,
SpanStatusCode,
trace,
type Span,
} from "@opentelemetry/api";
import type {
Collection,
Db,
MongoClient,
Document,
Filter,
UpdateFilter,
OptionalUnlessRequiredId,
WithId,
WithoutId,
DeleteResult,
UpdateResult,
InsertOneResult,
InsertManyResult,
FindCursor,
AggregationCursor,
CountDocumentsOptions,
FindOptions,
} from "mongodb";
const DEFAULT_TRACER_NAME = "@kubiks/otel-mongodb";
const INSTRUMENTED_FLAG = Symbol("kubiksOtelMongoDBInstrumented");
const COMMAND_SPAN_MAP = Symbol("kubiksOtelMongoDBCommandSpanMap");
// Semantic conventions for database attributes
export const SEMATTRS_DB_SYSTEM = "db.system" as const;
export const SEMATTRS_DB_OPERATION = "db.operation" as const;
export const SEMATTRS_DB_NAME = "db.name" as const;
export const SEMATTRS_DB_MONGODB_COLLECTION = "db.mongodb.collection" as const;
export const SEMATTRS_DB_STATEMENT = "db.statement" as const;
export const SEMATTRS_NET_PEER_NAME = "net.peer.name" as const;
export const SEMATTRS_NET_PEER_PORT = "net.peer.port" as const;
// MongoDB-specific attributes
export const SEMATTRS_MONGODB_FILTER = "mongodb.filter" as const;
export const SEMATTRS_MONGODB_RESULT_COUNT = "mongodb.result_count" as const;
export const SEMATTRS_MONGODB_MATCHED_COUNT = "mongodb.matched_count" as const;
export const SEMATTRS_MONGODB_MODIFIED_COUNT =
"mongodb.modified_count" as const;
export const SEMATTRS_MONGODB_DELETED_COUNT = "mongodb.deleted_count" as const;
export const SEMATTRS_MONGODB_INSERTED_COUNT =
"mongodb.inserted_count" as const;
export const SEMATTRS_MONGODB_UPSERTED_COUNT =
"mongodb.upserted_count" as const;
// MongoDB execution stats attributes
export const SEMATTRS_MONGODB_EXECUTION_TIME_MS =
"mongodb.execution_time_ms" as const;
/**
* Configuration options for MongoDB instrumentation.
*/
export interface InstrumentMongoDBConfig {
/**
* Custom tracer name. Defaults to "@kubiks/otel-mongodb".
*/
tracerName?: string;
/**
* Database name to include in spans.
*/
dbName?: string;
/**
* Whether to capture query filters in spans.
* Defaults to false for security (filters may contain sensitive data).
*/
captureFilters?: boolean;
/**
* Maximum length for captured filter text. Filters longer than this
* will be truncated. Defaults to 500 characters.
*/
maxFilterLength?: number;
/**
* Whether to capture execution statistics from command monitoring.
*
* IMPORTANT: Requires MongoClient to be created with monitorCommands: true
* Example: new MongoClient(uri, { monitorCommands: true })
*
* This captures execution time (mongodb.execution_time_ms) for all queries.
*
* Only works when instrumenting the MongoClient. Defaults to false.
*/
captureExecutionStats?: boolean;
/**
* Remote hostname or IP address of the MongoDB server.
* Example: "mongodb.example.com" or "192.168.1.100"
*/
peerName?: string;
/**
* Remote port number of the MongoDB server.
* Example: 27017
*/
peerPort?: number;
}
interface InstrumentedCollection {
[INSTRUMENTED_FLAG]?: true;
}
interface InstrumentedDb {
[INSTRUMENTED_FLAG]?: true;
}
interface InstrumentedClient {
[INSTRUMENTED_FLAG]?: true;
[COMMAND_SPAN_MAP]?: Map<number, Span>;
}
/**
* Sanitizes and truncates filter text for safe inclusion in spans.
*/
function sanitizeFilter(filter: unknown, maxLength: number): string {
try {
const filterText = JSON.stringify(filter);
if (filterText.length <= maxLength) {
return filterText;
}
return `${filterText.substring(0, maxLength)}...`;
} catch {
return "[Unable to serialize filter]";
}
}
/**
* Finalizes a span with status, timing, and optional error.
*/
function finalizeSpan(span: Span, error?: unknown): void {
if (error) {
if (error instanceof Error) {
span.recordException(error);
} else {
span.recordException(new Error(String(error)));
}
span.setStatus({ code: SpanStatusCode.ERROR });
} else {
span.setStatus({ code: SpanStatusCode.OK });
}
span.end();
}
/**
* Creates common span attributes for MongoDB operations.
*/
function createBaseAttributes(
collectionName: string,
operation: string,
config?: InstrumentMongoDBConfig
): Record<string, string | number> {
const attributes: Record<string, string | number> = {
[SEMATTRS_DB_SYSTEM]: "mongodb",
[SEMATTRS_DB_OPERATION]: operation,
[SEMATTRS_DB_MONGODB_COLLECTION]: collectionName,
};
if (config?.dbName) {
attributes[SEMATTRS_DB_NAME] = config.dbName;
}
if (config?.peerName) {
attributes[SEMATTRS_NET_PEER_NAME] = config.peerName;
}
if (config?.peerPort) {
attributes[SEMATTRS_NET_PEER_PORT] = config.peerPort;
}
return attributes;
}
/**
* Adds filter to span attributes if enabled.
*/
function addFilterAttribute(
span: Span,
filter: unknown,
config?: InstrumentMongoDBConfig
): void {
if (config?.captureFilters && filter) {
const maxLength = config.maxFilterLength ?? 500;
const sanitized = sanitizeFilter(filter, maxLength);
span.setAttribute(SEMATTRS_MONGODB_FILTER, sanitized);
span.setAttribute(SEMATTRS_DB_STATEMENT, `filter: ${sanitized}`);
}
}
/**
* Instruments a MongoDB Collection with OpenTelemetry tracing.
*
* This function wraps all major Collection methods to create spans for each database
* operation. The instrumentation is idempotent - calling it multiple times on the same
* collection will only instrument it once.
*
* @typeParam TSchema - The schema of documents in the collection
* @param collection - The MongoDB collection to instrument
* @param config - Optional configuration for instrumentation behavior
* @returns The instrumented collection (same instance, modified in place)
*
* @example
* ```typescript
* import { MongoClient } from 'mongodb';
* import { instrumentCollection } from '@kubiks/otel-mongodb';
*
* const client = new MongoClient('mongodb://localhost:27017');
* await client.connect();
* const db = client.db('myapp');
* const users = db.collection('users');
*
* instrumentCollection(users, {
* dbName: 'myapp',
* captureFilters: true,
* peerName: 'localhost',
* peerPort: 27017,
* });
*
* // All operations are now traced
* await users.findOne({ email: 'user@example.com' });
* ```
*/
export function instrumentCollection<TSchema extends Document = Document>(
collection: Collection<TSchema>,
config?: InstrumentMongoDBConfig
): Collection<TSchema> {
if (!collection) {
return collection;
}
// Check if already instrumented
if ((collection as unknown as InstrumentedCollection)[INSTRUMENTED_FLAG]) {
return collection;
}
const {
tracerName = DEFAULT_TRACER_NAME,
captureFilters = false,
maxFilterLength = 500,
} = config ?? {};
const tracer = trace.getTracer(tracerName);
const collectionName = collection.collectionName;
// Instrument find - wrap toArray as a separate operation
const originalFind = collection.find.bind(collection);
collection.find = function instrumentedFind(
filter?: Filter<TSchema>,
options?: FindOptions
): FindCursor<WithId<TSchema>> {
const cursor = originalFind(filter ?? {}, options);
// Wrap toArray to create span when results are actually fetched
const originalToArray = cursor.toArray.bind(cursor);
cursor.toArray = async function instrumentedToArray() {
const span = tracer.startSpan(`mongodb.${collectionName}.find`, {
kind: SpanKind.CLIENT,
});
const attributes = createBaseAttributes(collectionName, "find", config);
span.setAttributes(attributes);
addFilterAttribute(span, filter, config);
const activeContext = trace.setSpan(context.active(), span);
try {
const results = await context.with(activeContext, () =>
originalToArray()
);
span.setAttribute(SEMATTRS_MONGODB_RESULT_COUNT, results.length);
finalizeSpan(span);
return results;
} catch (error) {
finalizeSpan(span, error);
throw error;
}
};
return cursor;
};
// Instrument findOne
const originalFindOne = collection.findOne.bind(collection);
collection.findOne = async function instrumentedFindOne(
filter?: Filter<TSchema>,
options?: FindOptions
): Promise<WithId<TSchema> | null> {
const span = tracer.startSpan(`mongodb.${collectionName}.findOne`, {
kind: SpanKind.CLIENT,
});
const attributes = createBaseAttributes(collectionName, "findOne", config);
span.setAttributes(attributes);
addFilterAttribute(span, filter, config);
const activeContext = trace.setSpan(context.active(), span);
try {
const result = (await context.with(activeContext, () =>
originalFindOne(filter ?? {}, options)
)) as WithId<TSchema> | null;
span.setAttribute(SEMATTRS_MONGODB_RESULT_COUNT, result ? 1 : 0);
finalizeSpan(span);
return result;
} catch (error) {
finalizeSpan(span, error);
throw error;
}
};
// Instrument insertOne
const originalInsertOne = collection.insertOne.bind(collection);
collection.insertOne = async function instrumentedInsertOne(
doc: OptionalUnlessRequiredId<TSchema>,
options?: any
): Promise<InsertOneResult<TSchema>> {
const span = tracer.startSpan(`mongodb.${collectionName}.insertOne`, {
kind: SpanKind.CLIENT,
});
const attributes = createBaseAttributes(
collectionName,
"insertOne",
config
);
span.setAttributes(attributes);
const activeContext = trace.setSpan(context.active(), span);
try {
const result = await context.with(activeContext, () =>
originalInsertOne(doc, options)
);
span.setAttribute(
SEMATTRS_MONGODB_INSERTED_COUNT,
result.acknowledged ? 1 : 0
);
finalizeSpan(span);
return result;
} catch (error) {
finalizeSpan(span, error);
throw error;
}
};
// Instrument insertMany
const originalInsertMany = collection.insertMany.bind(collection);
collection.insertMany = async function instrumentedInsertMany(
docs: OptionalUnlessRequiredId<TSchema>[],
options?: any
): Promise<InsertManyResult<TSchema>> {
const span = tracer.startSpan(`mongodb.${collectionName}.insertMany`, {
kind: SpanKind.CLIENT,
});
const attributes = createBaseAttributes(
collectionName,
"insertMany",
config
);
span.setAttributes(attributes);
const activeContext = trace.setSpan(context.active(), span);
try {
const result = await context.with(activeContext, () =>
originalInsertMany(docs, options)
);
span.setAttribute(SEMATTRS_MONGODB_INSERTED_COUNT, result.insertedCount);
finalizeSpan(span);
return result;
} catch (error) {
finalizeSpan(span, error);
throw error;
}
};
// Instrument updateOne
const originalUpdateOne = collection.updateOne.bind(collection);
collection.updateOne = async function instrumentedUpdateOne(
filter: Filter<TSchema>,
update: UpdateFilter<TSchema>,
options?: any
): Promise<UpdateResult<TSchema>> {
const span = tracer.startSpan(`mongodb.${collectionName}.updateOne`, {
kind: SpanKind.CLIENT,
});
const attributes = createBaseAttributes(
collectionName,
"updateOne",
config
);
span.setAttributes(attributes);
addFilterAttribute(span, filter, config);
const activeContext = trace.setSpan(context.active(), span);
try {
const result = await context.with(activeContext, () =>
originalUpdateOne(filter, update, options)
);
span.setAttribute(SEMATTRS_MONGODB_MATCHED_COUNT, result.matchedCount);
span.setAttribute(SEMATTRS_MONGODB_MODIFIED_COUNT, result.modifiedCount);
if (result.upsertedCount !== undefined) {
span.setAttribute(
SEMATTRS_MONGODB_UPSERTED_COUNT,
result.upsertedCount
);
}
finalizeSpan(span);
return result;
} catch (error) {
finalizeSpan(span, error);
throw error;
}
};
// Instrument updateMany
const originalUpdateMany = collection.updateMany.bind(collection);
collection.updateMany = async function instrumentedUpdateMany(
filter: Filter<TSchema>,
update: UpdateFilter<TSchema>,
options?: any
): Promise<UpdateResult<TSchema>> {
const span = tracer.startSpan(`mongodb.${collectionName}.updateMany`, {
kind: SpanKind.CLIENT,
});
const attributes = createBaseAttributes(
collectionName,
"updateMany",
config
);
span.setAttributes(attributes);
addFilterAttribute(span, filter, config);
const activeContext = trace.setSpan(context.active(), span);
try {
const result = await context.with(activeContext, () =>
originalUpdateMany(filter, update, options)
);
span.setAttribute(SEMATTRS_MONGODB_MATCHED_COUNT, result.matchedCount);
span.setAttribute(SEMATTRS_MONGODB_MODIFIED_COUNT, result.modifiedCount);
if (result.upsertedCount !== undefined) {
span.setAttribute(
SEMATTRS_MONGODB_UPSERTED_COUNT,
result.upsertedCount
);
}
finalizeSpan(span);
return result;
} catch (error) {
finalizeSpan(span, error);
throw error;
}
};
// Instrument deleteOne
const originalDeleteOne = collection.deleteOne.bind(collection);
collection.deleteOne = async function instrumentedDeleteOne(
filter: Filter<TSchema>,
options?: any
): Promise<DeleteResult> {
const span = tracer.startSpan(`mongodb.${collectionName}.deleteOne`, {
kind: SpanKind.CLIENT,
});
const attributes = createBaseAttributes(
collectionName,
"deleteOne",
config
);
span.setAttributes(attributes);
addFilterAttribute(span, filter, config);
const activeContext = trace.setSpan(context.active(), span);
try {
const result = await context.with(activeContext, () =>
originalDeleteOne(filter, options)
);
span.setAttribute(SEMATTRS_MONGODB_DELETED_COUNT, result.deletedCount);
finalizeSpan(span);
return result;
} catch (error) {
finalizeSpan(span, error);
throw error;
}
};
// Instrument deleteMany
const originalDeleteMany = collection.deleteMany.bind(collection);
collection.deleteMany = async function instrumentedDeleteMany(
filter: Filter<TSchema>,
options?: any
): Promise<DeleteResult> {
const span = tracer.startSpan(`mongodb.${collectionName}.deleteMany`, {
kind: SpanKind.CLIENT,
});
const attributes = createBaseAttributes(
collectionName,
"deleteMany",
config
);
span.setAttributes(attributes);
addFilterAttribute(span, filter, config);
const activeContext = trace.setSpan(context.active(), span);
try {
const result = await context.with(activeContext, () =>
originalDeleteMany(filter, options)
);
span.setAttribute(SEMATTRS_MONGODB_DELETED_COUNT, result.deletedCount);
finalizeSpan(span);
return result;
} catch (error) {
finalizeSpan(span, error);
throw error;
}
};
// Instrument countDocuments
const originalCountDocuments = collection.countDocuments.bind(collection);
collection.countDocuments = async function instrumentedCountDocuments(
filter?: Filter<TSchema>,
options?: CountDocumentsOptions
): Promise<number> {
const span = tracer.startSpan(`mongodb.${collectionName}.countDocuments`, {
kind: SpanKind.CLIENT,
});
const attributes = createBaseAttributes(
collectionName,
"countDocuments",
config
);
span.setAttributes(attributes);
addFilterAttribute(span, filter, config);
const activeContext = trace.setSpan(context.active(), span);
try {
const count = await context.with(activeContext, () =>
originalCountDocuments(filter, options)
);
span.setAttribute(SEMATTRS_MONGODB_RESULT_COUNT, count);
finalizeSpan(span);
return count;
} catch (error) {
finalizeSpan(span, error);
throw error;
}
};
// Instrument aggregate - wrap toArray as a separate operation
const originalAggregate = collection.aggregate.bind(collection) as <
T extends Document = Document,
>(
pipeline?: Document[],
options?: any
) => AggregationCursor<T>;
collection.aggregate = function instrumentedAggregate<
T extends Document = Document,
>(pipeline?: Document[], options?: any): AggregationCursor<T> {
const cursor = originalAggregate<T>(pipeline, options);
// Wrap toArray to create span when results are actually fetched
const originalToArray = cursor.toArray.bind(cursor);
cursor.toArray = async function instrumentedToArray() {
const span = tracer.startSpan(`mongodb.${collectionName}.aggregate`, {
kind: SpanKind.CLIENT,
});
const attributes = createBaseAttributes(
collectionName,
"aggregate",
config
);
span.setAttributes(attributes);
if (captureFilters && pipeline) {
const maxLength = maxFilterLength ?? 500;
const sanitized = sanitizeFilter(pipeline, maxLength);
span.setAttribute("mongodb.pipeline", sanitized);
span.setAttribute(SEMATTRS_DB_STATEMENT, `pipeline: ${sanitized}`);
}
const activeContext = trace.setSpan(context.active(), span);
try {
const results = await context.with(activeContext, () =>
originalToArray()
);
span.setAttribute(SEMATTRS_MONGODB_RESULT_COUNT, results.length);
finalizeSpan(span);
return results;
} catch (error) {
finalizeSpan(span, error);
throw error;
}
};
return cursor;
};
// Instrument replaceOne
const originalReplaceOne = collection.replaceOne.bind(collection);
collection.replaceOne = async function instrumentedReplaceOne(
filter: Filter<TSchema>,
replacement: WithoutId<TSchema>,
options?: any
): Promise<UpdateResult<TSchema>> {
const span = tracer.startSpan(`mongodb.${collectionName}.replaceOne`, {
kind: SpanKind.CLIENT,
});
const attributes = createBaseAttributes(
collectionName,
"replaceOne",
config
);
span.setAttributes(attributes);
addFilterAttribute(span, filter, config);
const activeContext = trace.setSpan(context.active(), span);
try {
const result = await context.with(activeContext, () =>
originalReplaceOne(filter, replacement, options)
);
span.setAttribute(SEMATTRS_MONGODB_MATCHED_COUNT, result.matchedCount);
span.setAttribute(SEMATTRS_MONGODB_MODIFIED_COUNT, result.modifiedCount);
if (result.upsertedCount !== undefined) {
span.setAttribute(
SEMATTRS_MONGODB_UPSERTED_COUNT,
result.upsertedCount
);
}
finalizeSpan(span);
return result;
} catch (error) {
finalizeSpan(span, error);
throw error;
}
};
// Instrument findOneAndUpdate
const originalFindOneAndUpdate = collection.findOneAndUpdate.bind(collection);
collection.findOneAndUpdate = async function instrumentedFindOneAndUpdate(
filter: Filter<TSchema>,
update: UpdateFilter<TSchema>,
options?: any
): Promise<any> {
const span = tracer.startSpan(
`mongodb.${collectionName}.findOneAndUpdate`,
{
kind: SpanKind.CLIENT,
}
);
const attributes = createBaseAttributes(
collectionName,
"findOneAndUpdate",
config
);
span.setAttributes(attributes);
addFilterAttribute(span, filter, config);
const activeContext = trace.setSpan(context.active(), span);
try {
const result = await context.with(activeContext, () =>
originalFindOneAndUpdate(filter, update, options)
);
span.setAttribute(SEMATTRS_MONGODB_RESULT_COUNT, result.value ? 1 : 0);
finalizeSpan(span);
return result;
} catch (error) {
finalizeSpan(span, error);
throw error;
}
};
// Instrument findOneAndDelete
const originalFindOneAndDelete = collection.findOneAndDelete.bind(collection);
collection.findOneAndDelete = async function instrumentedFindOneAndDelete(
filter: Filter<TSchema>,
options?: any
): Promise<any> {
const span = tracer.startSpan(
`mongodb.${collectionName}.findOneAndDelete`,
{
kind: SpanKind.CLIENT,
}
);
const attributes = createBaseAttributes(
collectionName,
"findOneAndDelete",
config
);
span.setAttributes(attributes);
addFilterAttribute(span, filter, config);
const activeContext = trace.setSpan(context.active(), span);
try {
const result = await context.with(activeContext, () =>
originalFindOneAndDelete(filter, options)
);
span.setAttribute(SEMATTRS_MONGODB_DELETED_COUNT, result.value ? 1 : 0);
finalizeSpan(span);
return result;
} catch (error) {
finalizeSpan(span, error);
throw error;
}
};
// Instrument findOneAndReplace
const originalFindOneAndReplace =
collection.findOneAndReplace.bind(collection);
collection.findOneAndReplace = async function instrumentedFindOneAndReplace(
filter: Filter<TSchema>,
replacement: WithoutId<TSchema>,
options?: any
): Promise<any> {
const span = tracer.startSpan(
`mongodb.${collectionName}.findOneAndReplace`,
{
kind: SpanKind.CLIENT,
}
);
const attributes = createBaseAttributes(
collectionName,
"findOneAndReplace",
config
);
span.setAttributes(attributes);
addFilterAttribute(span, filter, config);
const activeContext = trace.setSpan(context.active(), span);
try {
const result = await context.with(activeContext, () =>
originalFindOneAndReplace(filter, replacement, options)
);
span.setAttribute(SEMATTRS_MONGODB_RESULT_COUNT, result.value ? 1 : 0);
finalizeSpan(span);
return result;
} catch (error) {
finalizeSpan(span, error);
throw error;
}
};
// Mark as instrumented
(collection as unknown as InstrumentedCollection)[INSTRUMENTED_FLAG] = true;
return collection;
}
/**
* Instruments a MongoDB Database with OpenTelemetry tracing.
*
* This function wraps the database's collection method to automatically instrument
* all collections accessed through this database instance.
*
* @param db - The MongoDB database to instrument
* @param config - Optional configuration for instrumentation behavior
* @returns The instrumented database (same instance, modified in place)
*
* @example
* ```typescript
* import { MongoClient } from 'mongodb';
* import { instrumentDb } from '@kubiks/otel-mongodb';
*
* const client = new MongoClient('mongodb://localhost:27017');
* await client.connect();
* const db = client.db('myapp');
*
* instrumentDb(db, {
* dbName: 'myapp',
* captureFilters: true,
* });
*
* // All collections are automatically instrumented
* const users = db.collection('users');
* await users.findOne({ email: 'user@example.com' });
* ```
*/
export function instrumentDb(db: Db, config?: InstrumentMongoDBConfig): Db {
if (!db) {
return db;
}
// Check if already instrumented
if ((db as unknown as InstrumentedDb)[INSTRUMENTED_FLAG]) {
return db;
}
// Auto-populate dbName from database if not provided
const finalConfig: InstrumentMongoDBConfig = {
...config,
dbName: config?.dbName ?? db.databaseName,
};
// Wrap collection method to instrument all collections
const originalCollection = db.collection.bind(db) as <
TSchema extends Document = Document,
>(
name: string,
options?: any
) => Collection<TSchema>;
db.collection = function instrumentedCollection<
TSchema extends Document = Document,
>(name: string, options?: any): Collection<TSchema> {
const collection = originalCollection<TSchema>(name, options);
return instrumentCollection(collection, finalConfig);
};
// Mark as instrumented
(db as unknown as InstrumentedDb)[INSTRUMENTED_FLAG] = true;
return db;
}
/**
* Instruments a MongoDB Client with OpenTelemetry tracing.
*
* This function wraps the client's db method to automatically instrument
* all databases accessed through this client instance.
*
* @param client - The MongoDB client to instrument
* @param config - Optional configuration for instrumentation behavior
* @returns The instrumented client (same instance, modified in place)
*
* @example
* ```typescript
* import { MongoClient } from 'mongodb';
* import { instrumentMongoClient } from '@kubiks/otel-mongodb';
*
* // To enable execution stats, you MUST create the client with monitorCommands: true
* const client = new MongoClient('mongodb://localhost:27017', {
* monitorCommands: true, // Required for captureExecutionStats
* });
* await client.connect();
*
* instrumentMongoClient(client, {
* captureFilters: true,
* captureExecutionStats: true,
* peerName: 'localhost',
* peerPort: 27017,
* });
*
* // All databases and collections are automatically instrumented
* const db = client.db('myapp');
* const users = db.collection('users');
* await users.findOne({ email: 'user@example.com' });
* ```
*
*/
export function instrumentMongoClient(
client: MongoClient,
config?: InstrumentMongoDBConfig
): MongoClient {
if (!client) {
return client;
}
// Check if already instrumented
const instrumentedClient = client as unknown as InstrumentedClient;
if (instrumentedClient[INSTRUMENTED_FLAG]) {
return client;
}
// Wrap db method to instrument all databases
const originalDb = client.db.bind(client);
client.db = function instrumentedDb(dbName?: string, options?: any): Db {
const db = originalDb(dbName, options);
return instrumentDb(db, config);
};
// Set up command monitoring for execution stats if enabled
if (config?.captureExecutionStats) {
// Create a map to store active spans by request ID
const spanMap = new Map<number, Span>();
instrumentedClient[COMMAND_SPAN_MAP] = spanMap;
// Listen for command started events to capture the active span
client.on("commandStarted", (event: any) => {
try {
// Get the currently active span from OpenTelemetry context
const activeSpan = trace.getActiveSpan();
if (activeSpan) {
spanMap.set(event.requestId, activeSpan);
}
} catch (error) {
// Silently ignore errors to avoid disrupting the application
}
});
// Listen for command succeeded events to capture execution stats
client.on("commandSucceeded", (event: any) => {
const span = spanMap.get(event.requestId);
if (span) {
try {
// Add execution time from the event (always available)
if (event.duration !== undefined) {
span.setAttribute(
SEMATTRS_MONGODB_EXECUTION_TIME_MS,
event.duration
);
}
// Add command-level metadata
if (event.reply) {
// Some operations include counts directly in the reply
if (event.reply.n !== undefined) {
span.setAttribute("mongodb.reply_count", event.reply.n);
}
if (event.reply.nModified !== undefined) {
span.setAttribute(
"mongodb.reply_modified",
event.reply.nModified
);
}
}
} catch (error) {
// Silently ignore errors in stats extraction
// to avoid disrupting the application
}
}
// Clean up the span from the map
spanMap.delete(event.requestId);
});
// Listen for command failed events to clean up
client.on("commandFailed", (event: any) => {
spanMap.delete(event.requestId);
});
}
// Mark as instrumented
instrumentedClient[INSTRUMENTED_FLAG] = true;
return client;
}

View File

@@ -0,0 +1,21 @@
{
"compilerOptions": {
"target": "ES2020",
"module": "ESNext",
"moduleResolution": "bundler",
"lib": ["ES2020"],
"outDir": "dist",
"declaration": true,
"declarationMap": true,
"sourceMap": true,
"strict": true,
"esModuleInterop": true,
"skipLibCheck": true,
"forceConsistentCasingInFileNames": true,
"resolveJsonModule": true,
"declarationDir": "dist/types",
"stripInternal": true
},
"include": ["src/**/*"],
"exclude": ["node_modules", "dist", "**/*.test.ts"]
}

131
pnpm-lock.yaml generated
View File

@@ -126,6 +126,30 @@ importers:
specifier: 0.33.0
version: 0.33.0(less@4.2.0)(sass@1.69.7)(stylus@0.59.0)
packages/otel-mongodb:
devDependencies:
'@opentelemetry/api':
specifier: ^1.9.0
version: 1.9.0
'@opentelemetry/sdk-trace-base':
specifier: ^2.1.0
version: 2.1.0(@opentelemetry/api@1.9.0)
'@types/node':
specifier: 18.15.11
version: 18.15.11
mongodb:
specifier: ^6.0.0
version: 6.20.0
rimraf:
specifier: 3.0.2
version: 3.0.2
typescript:
specifier: ^5
version: 5.3.3
vitest:
specifier: 0.33.0
version: 0.33.0(less@4.2.0)(sass@1.69.7)(stylus@0.59.0)
packages/otel-resend:
devDependencies:
'@opentelemetry/api':
@@ -577,6 +601,9 @@ packages:
'@manypkg/get-packages@1.1.3':
resolution: {integrity: sha512-fo+QhuU3qE/2TQMQmbVMqaQ6EWbMhi4ABWP+O4AM1NqPBuy0OrApV5LO6BrrgnhtAHS2NH6RrVk9OL181tTi8A==}
'@mongodb-js/saslprep@1.3.2':
resolution: {integrity: sha512-QgA5AySqB27cGTXBFmnpifAi7HxoGUeezwo6p9dI03MuDB6Pp33zgclqVb6oVK3j6I9Vesg0+oojW2XxB59SGg==}
'@noble/ciphers@2.0.1':
resolution: {integrity: sha512-xHK3XHPUW8DTAobU+G0XT+/w+JLM7/8k1UFdB5xg/zTFPnFCobhftzw8wl4Lw2aq/Rvir5pxfZV5fEazmeCJ2g==}
engines: {node: '>= 20.19.0'}
@@ -765,6 +792,12 @@ packages:
'@types/semver@7.5.6':
resolution: {integrity: sha512-dn1l8LaMea/IjDoHNd9J52uBbInB796CDffS6VdIxvqYCPSG0V0DzHp76GpaWnlhg88uYyPbXCDIowa86ybd5A==}
'@types/webidl-conversions@7.0.3':
resolution: {integrity: sha512-CiJJvcRtIgzadHCYXw7dqEnMNRjhGZlYK05Mj9OyktqV8uVT8fD2BFOB7S1uwBE3Kj2Z+4UyPmFw/Ixgw/LAlA==}
'@types/whatwg-url@11.0.5':
resolution: {integrity: sha512-coYR071JRaHa+xoEvvYqvnIHaVqaYrLPbsufM9BF63HkwI5Lgmy2QR8Q5K/lYDYo5AK82wOvSOS0UsLTpTG7uQ==}
'@upstash/qstash@2.8.3':
resolution: {integrity: sha512-SHf1mCGqZur0UTzXVx33phtFXIuLyjwDL1QsBE36gQFEx3rEG4fJc3qA2eD7jTUXEAYYrNkCQxMOtcteHFpwqw==}
@@ -932,6 +965,10 @@ packages:
breakword@1.0.6:
resolution: {integrity: sha512-yjxDAYyK/pBvws9H4xKYpLDpYKEH6CzrBPAuXq3x18I+c/2MkVtT3qAr7Oloi6Dss9qNhPVueAAVU1CSeNDIXw==}
bson@6.10.4:
resolution: {integrity: sha512-WIsKqkSC0ABoBJuT1LEX+2HEvNmNKKgnTAyd0fL8qzK4SH2i9NXg+t08YtdZp/V9IZ33cxe3iV4yM0qg8lMQng==}
engines: {node: '>=16.20.1'}
cac@6.7.14:
resolution: {integrity: sha512-b6Ilus+c3RrdDk+JhLKUAQfzzgLEPy6wcXqS7f/xe1EETvsDP6GORG7SFuOs6cID5YkqchW/LXZbX5bc8j7ZcQ==}
engines: {node: '>=8'}
@@ -1691,6 +1728,9 @@ packages:
resolution: {integrity: sha512-hdN1wVrZbb29eBGiGjJbeP8JbKjq1urkHJ/LIP/NY48MZ1QVXUsQBV1G1zvYFHn1XE06cwjBsOI2K3Ulnj1YXQ==}
engines: {node: '>=8'}
memory-pager@1.5.0:
resolution: {integrity: sha512-ZS4Bp4r/Zoeq6+NLJpP+0Zzm0pR8whtGPf1XExKLJBAczGMnSi3It14OiNCStjQjM6NU1okjQGSxgEZN8eBYKg==}
meow@6.1.1:
resolution: {integrity: sha512-3YffViIt2QWgTy6Pale5QpopX/IvU3LPL03jOTqp6pGj3VjesdO/U8CuHMKpnQr4shCNCM5fd5XFFvIIl6JBHg==}
engines: {node: '>=8'}
@@ -1738,6 +1778,36 @@ packages:
mlly@1.4.2:
resolution: {integrity: sha512-i/Ykufi2t1EZ6NaPLdfnZk2AX8cs0d+mTzVKuPfqPKPatxLApaBoxJQ9x1/uckXtrS/U5oisPMDkNs0yQTaBRg==}
mongodb-connection-string-url@3.0.2:
resolution: {integrity: sha512-rMO7CGo/9BFwyZABcKAWL8UJwH/Kc2x0g72uhDWzG48URRax5TCIcJ7Rc3RZqffZzO/Gwff/jyKwCU9TN8gehA==}
mongodb@6.20.0:
resolution: {integrity: sha512-Tl6MEIU3K4Rq3TSHd+sZQqRBoGlFsOgNrH5ltAcFBV62Re3Fd+FcaVf8uSEQFOJ51SDowDVttBTONMfoYWrWlQ==}
engines: {node: '>=16.20.1'}
peerDependencies:
'@aws-sdk/credential-providers': ^3.188.0
'@mongodb-js/zstd': ^1.1.0 || ^2.0.0
gcp-metadata: ^5.2.0
kerberos: ^2.0.1
mongodb-client-encryption: '>=6.0.0 <7'
snappy: ^7.3.2
socks: ^2.7.1
peerDependenciesMeta:
'@aws-sdk/credential-providers':
optional: true
'@mongodb-js/zstd':
optional: true
gcp-metadata:
optional: true
kerberos:
optional: true
mongodb-client-encryption:
optional: true
snappy:
optional: true
socks:
optional: true
ms@2.1.2:
resolution: {integrity: sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==}
@@ -1959,6 +2029,10 @@ packages:
pseudomap@1.0.2:
resolution: {integrity: sha512-b/YwNhb8lk1Zz2+bXXpS/LK9OisiZZ1SNsSLxN1x2OXVEhW2Ckr/7mWE5vrC1ZTiJlD9g19jWszTmJsB+oEpFQ==}
punycode@2.3.1:
resolution: {integrity: sha512-vYt7UD1U9Wg6138shLtLOvdAu+8DsC/ilFtEVHcH+wydcSpNE20AfSOduf6MkRFahL5FY7X1oU7nKVZFtfq8Fg==}
engines: {node: '>=6'}
pvtsutils@1.3.6:
resolution: {integrity: sha512-PLgQXQ6H2FWCaeRak8vvk1GW462lMxB5s3Jm673N82zI4vqtVUPuZdffdZbPDFRoU8kAhItWFtPCWiPpp4/EDg==}
@@ -2164,6 +2238,9 @@ packages:
resolution: {integrity: sha512-l3BikUxvPOcn5E74dZiq5BGsTb5yEwhaTSzccU6t4sDOH8NWJCstKO5QT2CvtFoK6F0saL7p9xHAqHOlCPJygA==}
engines: {node: '>= 8'}
sparse-bitfield@3.0.3:
resolution: {integrity: sha512-kvzhi7vqKTfkh0PZU+2D2PIllw2ymqJKujUcyPMd9Y75Nv4nPbGJZXNhxsgdQab2BmlDct1YnfQCguEvHr7VsQ==}
spawndamnit@2.0.0:
resolution: {integrity: sha512-j4JKEcncSjFlqIwU5L/rp2N5SIPsdxaRsIv678+TZxZ0SRDJTm8JrxJMjE/XuiEZNEir3S8l0Fa3Ke339WI4qA==}
@@ -2279,6 +2356,10 @@ packages:
tr46@0.0.3:
resolution: {integrity: sha512-N3WMsuqV66lT30CrXNbEjx4GEwlow3v6rr4mCcv6prnfwhS01rkgyFdjPNBYd9br7LpXV1+Emh01fHnq2Gdgrw==}
tr46@5.1.1:
resolution: {integrity: sha512-hdF5ZgjTqgAntKkklYw0R03MG2x/bSzTtkxmIRw/sTNV8YXsCJ1tfLAX23lhxhHJlEf3CRCOCGGWw3vI3GaSPw==}
engines: {node: '>=18'}
trim-newlines@3.0.1:
resolution: {integrity: sha512-c1PTsA3tYrIsLGkJkzHF+w9F2EyxfXGo4UyJc4pFL++FMjnq0HJS69T3M7d//gKrFKwy429bouPescbjecU+Zw==}
engines: {node: '>=8'}
@@ -2459,6 +2540,14 @@ packages:
webidl-conversions@3.0.1:
resolution: {integrity: sha512-2JAn3z8AR6rjK8Sm8orRC0h/bcl/DqL7tRPdGZ4I1CjdF+EaMLmYxBHyXuKL849eucPFhvBoxMsflfOb8kxaeQ==}
webidl-conversions@7.0.0:
resolution: {integrity: sha512-VwddBukDzu71offAQR975unBIGqfKZpM+8ZX6ySk8nYhVoo5CYaZyzt3YBvYtRtO+aoGlqxPg/B87NGVZ/fu6g==}
engines: {node: '>=12'}
whatwg-url@14.2.0:
resolution: {integrity: sha512-De72GdQZzNTUBBChsXueQUnPKDkg/5A5zp7pFDuQAj5UFoENpiACU0wlCvzpAGnTkj++ihpKwKyYewn/XNUbKw==}
engines: {node: '>=18'}
whatwg-url@5.0.0:
resolution: {integrity: sha512-saE57nupxk6v3HY35+jzBwYa0rKSy0XR8JSxZPwgLr7ys0IBzhGviA1/TUGJLmSVqs8pb9AnvICXEuOHLprYTw==}
@@ -2926,6 +3015,10 @@ snapshots:
globby: 11.1.0
read-yaml-file: 1.1.0
'@mongodb-js/saslprep@1.3.2':
dependencies:
sparse-bitfield: 3.0.3
'@noble/ciphers@2.0.1': {}
'@noble/hashes@2.0.1': {}
@@ -3172,6 +3265,12 @@ snapshots:
'@types/semver@7.5.6': {}
'@types/webidl-conversions@7.0.3': {}
'@types/whatwg-url@11.0.5':
dependencies:
'@types/webidl-conversions': 7.0.3
'@upstash/qstash@2.8.3':
dependencies:
crypto-js: 4.2.0
@@ -3340,6 +3439,8 @@ snapshots:
dependencies:
wcwidth: 1.0.1
bson@6.10.4: {}
cac@6.7.14: {}
call-bind@1.0.5:
@@ -4115,6 +4216,8 @@ snapshots:
map-obj@4.3.0: {}
memory-pager@1.5.0: {}
meow@6.1.1:
dependencies:
'@types/minimist': 1.2.5
@@ -4170,6 +4273,17 @@ snapshots:
pkg-types: 1.0.3
ufo: 1.3.2
mongodb-connection-string-url@3.0.2:
dependencies:
'@types/whatwg-url': 11.0.5
whatwg-url: 14.2.0
mongodb@6.20.0:
dependencies:
'@mongodb-js/saslprep': 1.3.2
bson: 6.10.4
mongodb-connection-string-url: 3.0.2
ms@2.1.2: {}
nanoid@3.3.7: {}
@@ -4357,6 +4471,8 @@ snapshots:
pseudomap@1.0.2: {}
punycode@2.3.1: {}
pvtsutils@1.3.6:
dependencies:
tslib: 2.8.1
@@ -4568,6 +4684,10 @@ snapshots:
source-map@0.7.4:
optional: true
sparse-bitfield@3.0.3:
dependencies:
memory-pager: 1.5.0
spawndamnit@2.0.0:
dependencies:
cross-spawn: 5.1.0
@@ -4692,6 +4812,10 @@ snapshots:
tr46@0.0.3: {}
tr46@5.1.1:
dependencies:
punycode: 2.3.1
trim-newlines@3.0.1: {}
tslib@1.14.1: {}
@@ -4869,6 +4993,13 @@ snapshots:
webidl-conversions@3.0.1: {}
webidl-conversions@7.0.0: {}
whatwg-url@14.2.0:
dependencies:
tr46: 5.1.1
webidl-conversions: 7.0.0
whatwg-url@5.0.0:
dependencies:
tr46: 0.0.3