diff --git a/.changeset/quick-walls-brake.md b/.changeset/quick-walls-brake.md new file mode 100644 index 0000000..6b68f8b --- /dev/null +++ b/.changeset/quick-walls-brake.md @@ -0,0 +1,5 @@ +--- +"@kubiks/otel-drizzle": minor +--- + +Added instrumentDrizzleClient func to instrument any drizzle instance diff --git a/packages/otel-drizzle/README.md b/packages/otel-drizzle/README.md index e514d88..aff11b8 100644 --- a/packages/otel-drizzle/README.md +++ b/packages/otel-drizzle/README.md @@ -42,71 +42,168 @@ Works with any observability platform that supports OpenTelemetry including: ## Usage -Simply wrap your database connection pool with `instrumentDrizzle()` before passing it to Drizzle: +### Instrument Your Drizzle Database (Recommended) + +Use `instrumentDrizzleClient()` to add tracing to your Drizzle database instance. This is the simplest and most straightforward approach: ```typescript -import { drizzle } from "drizzle-orm/node-postgres"; -import { Pool } from "pg"; -import { instrumentDrizzle } from "@kubiks/otel-drizzle"; +import { drizzle } from "drizzle-orm/postgres-js"; +import { instrumentDrizzleClient } from "@kubiks/otel-drizzle"; -const pool = new Pool({ connectionString: process.env.DATABASE_URL }); -const instrumentedPool = instrumentDrizzle(pool); -const db = drizzle(instrumentedPool); +// Create your Drizzle database instance as usual +const db = drizzle(process.env.DATABASE_URL!); + +// Add instrumentation with a single line +instrumentDrizzleClient(db); // That's it! All queries are now traced automatically const users = await db.select().from(usersTable); ``` -### Optional Configuration +### Database-Specific Examples + +#### PostgreSQL ```typescript -const pool = new Pool({ connectionString: process.env.DATABASE_URL }); -const instrumentedPool = instrumentDrizzle(pool, { - dbSystem: "postgresql", // Database type (default: 'postgresql') - dbName: "myapp", // Database name for spans - captureQueryText: true, // Include SQL in traces (default: true) - maxQueryTextLength: 1000, // Max SQL length (default: 1000) - peerName: "db.example.com", // Database server hostname - peerPort: 5432, // Database server port +// PostgreSQL with postgres.js (recommended for serverless) +import { drizzle } from "drizzle-orm/postgres-js"; +import postgres from "postgres"; +import { instrumentDrizzleClient } from "@kubiks/otel-drizzle"; + +// Using connection string directly +const db = drizzle(process.env.DATABASE_URL!); +instrumentDrizzleClient(db, { dbSystem: "postgresql" }); + +// Or with a client instance +const queryClient = postgres(process.env.DATABASE_URL!); +const db = drizzle({ client: queryClient }); +instrumentDrizzleClient(db, { + dbSystem: "postgresql", + dbName: "myapp", + peerName: "db.example.com", + peerPort: 5432, }); -const db = drizzle(instrumentedPool); ``` -### Works with All Drizzle-Supported Databases - -This package supports **all databases that Drizzle ORM supports**, including PostgreSQL, MySQL, SQLite, Turso, Neon, PlanetScale, and more. - ```typescript -// PostgreSQL with node-postgres +// PostgreSQL with node-postgres (pg) import { drizzle } from "drizzle-orm/node-postgres"; import { Pool } from "pg"; -const pool = new Pool({ connectionString: process.env.DATABASE_URL }); -const db = drizzle(instrumentDrizzle(pool)); +import { instrumentDrizzleClient } from "@kubiks/otel-drizzle"; +// Using connection string directly +const db = drizzle(process.env.DATABASE_URL!); +instrumentDrizzleClient(db, { dbSystem: "postgresql" }); + +// Or with a pool instance +const pool = new Pool({ connectionString: process.env.DATABASE_URL }); +const db = drizzle({ client: pool }); +instrumentDrizzleClient(db, { dbSystem: "postgresql" }); +``` + +#### MySQL + +```typescript // MySQL with mysql2 import { drizzle } from "drizzle-orm/mysql2"; import mysql from "mysql2/promise"; -const connection = await mysql.createConnection(process.env.DATABASE_URL); -const db = drizzle(instrumentDrizzle(connection, { dbSystem: "mysql" })); +import { instrumentDrizzleClient } from "@kubiks/otel-drizzle"; +// Using connection string directly +const db = drizzle(process.env.DATABASE_URL!); +instrumentDrizzleClient(db, { dbSystem: "mysql" }); + +// Or with a connection instance +const connection = await mysql.createConnection({ + host: "localhost", + user: "root", + database: "mydb", + // ... other connection options +}); +const db = drizzle({ client: connection }); +instrumentDrizzleClient(db, { + dbSystem: "mysql", + dbName: "mydb", + peerName: "localhost", + peerPort: 3306, +}); +``` + +#### SQLite + +```typescript // SQLite with better-sqlite3 import { drizzle } from "drizzle-orm/better-sqlite3"; import Database from "better-sqlite3"; -const sqlite = new Database("database.db"); -const db = drizzle(instrumentDrizzle(sqlite, { dbSystem: "sqlite" })); +import { instrumentDrizzleClient } from "@kubiks/otel-drizzle"; + +// Using file path directly +const db = drizzle("sqlite.db"); +instrumentDrizzleClient(db, { dbSystem: "sqlite" }); + +// Or with a Database instance +const sqlite = new Database("sqlite.db"); +const db = drizzle({ client: sqlite }); +instrumentDrizzleClient(db, { dbSystem: "sqlite" }); ``` +```typescript +// SQLite with LibSQL/Turso +import { drizzle } from "drizzle-orm/libsql"; +import { createClient } from "@libsql/client"; +import { instrumentDrizzleClient } from "@kubiks/otel-drizzle"; + +// Using connection config directly +const db = drizzle({ + connection: { + url: process.env.DATABASE_URL!, + authToken: process.env.DATABASE_AUTH_TOKEN, + } +}); +instrumentDrizzleClient(db, { dbSystem: "sqlite" }); + +// Or with a client instance +const client = createClient({ + url: process.env.DATABASE_URL!, + authToken: process.env.DATABASE_AUTH_TOKEN, +}); +const db = drizzle({ client }); +instrumentDrizzleClient(db, { dbSystem: "sqlite" }); +``` + +### Configuration Options + +```typescript +instrumentDrizzleClient(db, { + dbSystem: "postgresql", // Database type: 'postgresql' | 'mysql' | 'sqlite' (default: 'postgresql') + dbName: "myapp", // Database name for spans + captureQueryText: true, // Include SQL in traces (default: true) + maxQueryTextLength: 1000, // Max SQL length (default: 1000) + peerName: "db.example.com", // Database server hostname + peerPort: 5432, // Database server port +}); +``` + + ## What You Get Each database query automatically creates a span with rich telemetry data: - **Span name**: `drizzle.select`, `drizzle.insert`, `drizzle.update`, etc. -- **Operation type**: `db.operation` attribute (SELECT, INSERT, UPDATE, DELETE) +- **Operation type**: `db.operation` attribute (SELECT, INSERT, UPDATE, DELETE, SET) - **SQL query text**: Full query statement captured in `db.statement` (configurable) - **Database system**: `db.system` attribute (postgresql, mysql, sqlite, etc.) +- **Transaction tracking**: Transaction queries are marked with `db.transaction` attribute - **Error tracking**: Exceptions are recorded with stack traces and proper span status - **Performance metrics**: Duration and timing information for every query +### Transaction Support + +All queries within transactions are automatically traced, including: +- RLS (Row Level Security) queries like `SET LOCAL role` and `set_config()` +- All nested transaction queries +- Transaction rollbacks and commits + ### Span Attributes The instrumentation adds the following attributes to each span following [OpenTelemetry semantic conventions](https://opentelemetry.io/docs/specs/semconv/database/): diff --git a/packages/otel-drizzle/kubiks-otel-drizzle-1.0.0.tgz b/packages/otel-drizzle/kubiks-otel-drizzle-1.0.0.tgz deleted file mode 100644 index 30eb6c0..0000000 Binary files a/packages/otel-drizzle/kubiks-otel-drizzle-1.0.0.tgz and /dev/null differ diff --git a/packages/otel-drizzle/package.json b/packages/otel-drizzle/package.json index d000e8d..abadd31 100644 --- a/packages/otel-drizzle/package.json +++ b/packages/otel-drizzle/package.json @@ -36,14 +36,13 @@ "unit-test": "vitest --run", "unit-test-watch": "vitest" }, - "dependencies": {}, "devDependencies": { "@opentelemetry/api": "^1.9.0", "@opentelemetry/sdk-trace-base": "^2.1.0", "@types/node": "18.15.11", "@types/pg": "^8.11.10", "drizzle-orm": "^0.36.4", - "postgres": "^3.4.5", + "postgres": "^3.4.7", "rimraf": "3.0.2", "typescript": "^5", "vitest": "0.33.0" diff --git a/packages/otel-drizzle/src/index.test.ts b/packages/otel-drizzle/src/index.test.ts index 8b8bda1..2a8ea16 100644 --- a/packages/otel-drizzle/src/index.test.ts +++ b/packages/otel-drizzle/src/index.test.ts @@ -5,7 +5,7 @@ import { InMemorySpanExporter, SimpleSpanProcessor, } from "@opentelemetry/sdk-trace-base"; -import { instrumentDrizzle, type InstrumentDrizzleConfig } from "./index"; +import { instrumentDrizzle, instrumentDrizzleClient, type InstrumentDrizzleConfig } from "./index"; interface MockDrizzleClient { query: (...args: any[]) => unknown; @@ -295,4 +295,312 @@ describe("instrumentDrizzle", () => { const result = instrumentDrizzle(null as any); expect(result).toBeNull(); }); + + it("instruments a client with execute method instead of query", async () => { + const client = { + execute: vi.fn(() => Promise.resolve({ rows: [{ id: 1 }] })), + }; + + instrumentDrizzle(client); + + // Execute with SQL object format (used by various drivers) + const result = await client.execute({ + sql: "SELECT * FROM users WHERE id = ?", + args: [1], + }); + + expect(result).toEqual({ rows: [{ id: 1 }] }); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(1); + + const span = spans[0]; + if (!span) { + throw new Error("Expected a recorded span"); + } + + expect(span.name).toBe("drizzle.select"); + expect(span.attributes["db.statement"]).toBe("SELECT * FROM users WHERE id = ?"); + expect(span.attributes["db.operation"]).toBe("SELECT"); + }); + + it("instruments a client with execute method using string query", async () => { + const client = { + execute: vi.fn(() => Promise.resolve({ rows: [] })), + }; + + instrumentDrizzle(client); + + await client.execute("DELETE FROM users"); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(1); + + const span = spans[0]; + if (!span) { + throw new Error("Expected a recorded span"); + } + + expect(span.name).toBe("drizzle.delete"); + expect(span.attributes["db.operation"]).toBe("DELETE"); + expect(span.attributes["db.statement"]).toBe("DELETE FROM users"); + }); +}); + +describe("instrumentDrizzleClient", () => { + let provider: BasicTracerProvider; + let exporter: InMemorySpanExporter; + + beforeEach(() => { + exporter = new InMemorySpanExporter(); + provider = new BasicTracerProvider({ + spanProcessors: [new SimpleSpanProcessor(exporter)], + }); + trace.setGlobalTracerProvider(provider); + }); + + afterEach(async () => { + await provider.shutdown(); + exporter.reset(); + trace.disable(); + }); + + it("instruments a db with session.prepareQuery method", async () => { + const mockPreparedQuery = { + execute: vi.fn(() => Promise.resolve({ rows: [{ id: 1 }] })), + }; + + const mockSession = { + prepareQuery: vi.fn(() => mockPreparedQuery), + }; + + const mockDb = { + session: mockSession, + select: vi.fn(), + }; + + const instrumented = instrumentDrizzleClient(mockDb); + expect(instrumented).toBe(mockDb); + + // Simulate what happens when db.select().from() is called + const prepared = mockSession.prepareQuery({ sql: "SELECT * FROM users" }); + const result = await prepared.execute(); + expect(result).toEqual({ rows: [{ id: 1 }] }); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(1); + expect(spans[0]?.name).toBe("drizzle.select"); + expect(spans[0]?.attributes["db.statement"]).toBe("SELECT * FROM users"); + }); + + it("instruments a db with $client property as fallback", async () => { + const mockClient = { + query: vi.fn(() => Promise.resolve({ rows: [{ id: 1 }] })), + }; + + const mockDb = { + $client: mockClient, + select: vi.fn(), + // No direct execute method + }; + + const instrumented = instrumentDrizzleClient(mockDb); + expect(instrumented).toBe(mockDb); + + // Execute a query through the client + const result = await mockClient.query("SELECT * FROM users"); + expect(result).toEqual({ rows: [{ id: 1 }] }); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(1); + expect(spans[0]?.name).toBe("drizzle.select"); + expect(spans[0]?.attributes["db.statement"]).toBe("SELECT * FROM users"); + }); + + it("instruments a db with _.session.execute property", async () => { + const mockSession = { + execute: vi.fn(() => Promise.resolve({ rows: [{ id: 2 }] })), + }; + + const mockDb = { + _: { + session: mockSession, + }, + select: vi.fn(), + }; + + const instrumented = instrumentDrizzleClient(mockDb); + expect(instrumented).toBe(mockDb); + + // Execute a query through the session + const result = await mockSession.execute("INSERT INTO users (name) VALUES ('test')"); + expect(result).toEqual({ rows: [{ id: 2 }] }); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(1); + expect(spans[0]?.name).toBe("drizzle.insert"); + expect(spans[0]?.attributes["db.operation"]).toBe("INSERT"); + }); + + it("instruments session.query method", async () => { + const mockSession = { + query: vi.fn(() => Promise.resolve({ rows: [] })), + }; + + const mockDb = { + session: mockSession, + }; + + instrumentDrizzleClient(mockDb); + + // Direct query through session + await mockSession.query("INSERT INTO users (name) VALUES ($1)", ["John"]); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(1); + expect(spans[0]?.name).toBe("drizzle.insert"); + expect(spans[0]?.attributes["db.statement"]).toBe("INSERT INTO users (name) VALUES ($1)"); + }); + + it("only instruments once when called multiple times", async () => { + const mockClient = { + query: vi.fn(() => Promise.resolve({ rows: [] })), + }; + + const mockDb = { + $client: mockClient, + }; + + const firstInstrumented = instrumentDrizzleClient(mockDb); + const wrappedQuery = mockClient.query; + + const secondInstrumented = instrumentDrizzleClient(mockDb); + + expect(firstInstrumented).toBe(mockDb); + expect(secondInstrumented).toBe(mockDb); + expect(mockClient.query).toBe(wrappedQuery); + }); + + it("respects custom configuration", async () => { + const mockClient = { + query: vi.fn(() => Promise.resolve({ rows: [] })), + }; + + const mockDb = { + $client: mockClient, + }; + + const config: InstrumentDrizzleConfig = { + dbSystem: "mysql", + dbName: "test_db", + peerName: "db.example.com", + peerPort: 3306, + }; + + instrumentDrizzleClient(mockDb, config); + + await mockClient.query("SELECT 1"); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(1); + + const span = spans[0]; + if (!span) { + throw new Error("Expected a recorded span"); + } + + expect(span.attributes["db.system"]).toBe("mysql"); + expect(span.attributes["db.name"]).toBe("test_db"); + expect(span.attributes["net.peer.name"]).toBe("db.example.com"); + expect(span.attributes["net.peer.port"]).toBe(3306); + }); + + it("returns db unchanged if db is null", () => { + const result = instrumentDrizzleClient(null as any); + expect(result).toBeNull(); + }); + + it("returns db unchanged if no instrumentable properties exist", () => { + const mockDb = { + select: vi.fn(), + // No $client or _.session properties + }; + + const result = instrumentDrizzleClient(mockDb); + expect(result).toBe(mockDb); + }); + + it("handles errors in session.execute", async () => { + const error = new Error("database error"); + const mockSession = { + execute: vi.fn(() => Promise.reject(error)), + }; + + const mockDb = { + _: { + session: mockSession, + }, + }; + + instrumentDrizzleClient(mockDb); + + await expect(mockSession.execute("DELETE FROM users")).rejects.toThrow(error); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(1); + expect(spans[0]?.status.code).toBe(SpanStatusCode.ERROR); + }); + + it("instruments transaction execute calls", async () => { + let txObject: any; + + const mockSession = { + transaction: vi.fn(async (callback: any) => { + // Create a mock transaction object + txObject = { + execute: vi.fn(() => Promise.resolve({ rows: [] })), + }; + return callback(txObject); + }), + }; + + const mockDb = { + session: mockSession, + }; + + instrumentDrizzleClient(mockDb); + + // Execute a transaction with RLS queries + await mockSession.transaction(async (tx: any) => { + await tx.execute({ sql: "SET LOCAL role org_role" }); + await tx.execute({ sql: "SELECT set_config('request.org_id', $1, true)", params: ["org123"] }); + }); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(2); + expect(spans[0]?.name).toBe("drizzle.set"); + expect(spans[0]?.attributes["db.statement"]).toBe("SET LOCAL role org_role"); + expect(spans[0]?.attributes["db.transaction"]).toBe(true); + expect(spans[1]?.name).toBe("drizzle.select"); + expect(spans[1]?.attributes["db.transaction"]).toBe(true); + }); + + it("only instruments session once when called multiple times", () => { + const mockSession = { + execute: vi.fn(() => Promise.resolve({ rows: [] })), + }; + + const mockDb = { + _: { + session: mockSession, + }, + }; + + instrumentDrizzleClient(mockDb); + const wrappedExecute = mockSession.execute; + + instrumentDrizzleClient(mockDb); + + expect(mockSession.execute).toBe(wrappedExecute); + }); }); diff --git a/packages/otel-drizzle/src/index.ts b/packages/otel-drizzle/src/index.ts index d6ed67c..fa91c34 100644 --- a/packages/otel-drizzle/src/index.ts +++ b/packages/otel-drizzle/src/index.ts @@ -22,11 +22,13 @@ export const SEMATTRS_NET_PEER_PORT = "net.peer.port"; type QueryCallback = (error: unknown, result: unknown) => void; -type QueryFunction = (...args: unknown[]) => unknown; +type QueryFunction = (...args: any[]) => any; interface DrizzleClientLike { - query: QueryFunction; + query?: QueryFunction; + execute?: QueryFunction; [INSTRUMENTED_FLAG]?: true; + [key: string]: any; // Allow other properties } /** @@ -82,14 +84,14 @@ function extractQueryText(queryArg: unknown): string | undefined { return queryArg; } if (queryArg && typeof queryArg === "object") { + // Generic SQL object format (used by LibSQL, MySQL, and others) + if (typeof (queryArg as { sql?: unknown }).sql === "string") { + return (queryArg as { sql: string }).sql; + } // PostgreSQL-style query object if (typeof (queryArg as { text?: unknown }).text === "string") { return (queryArg as { text: string }).text; } - // MySQL/generic-style query object - if (typeof (queryArg as { sql?: unknown }).sql === "string") { - return (queryArg as { sql: string }).sql; - } // Drizzle SQL object if ( typeof (queryArg as { queryChunks?: unknown }).queryChunks === "object" @@ -141,41 +143,76 @@ function finalizeSpan(span: Span, error?: unknown): void { } /** - * Instruments a Drizzle database client with OpenTelemetry tracing. + * Instruments a database connection pool/client with OpenTelemetry tracing. * - * This function wraps the client's `query` method to automatically create - * spans for each database operation. It supports both promise-based and - * callback-based query patterns. + * This function wraps the connection's `query` and `execute` methods to create spans for each database + * operation. + * The instrumentation is idempotent - calling it multiple times on the same connection will only + * instrument it once. * - * The instrumentation is idempotent - calling it multiple times on the same - * client will only instrument it once. - * - * @typeParam TClient - The type of the Drizzle client - * @param client - The Drizzle client instance to instrument + * @typeParam TClient - The type of the database connection pool or client + * @param client - The database connection pool or client to instrument * @param config - Optional configuration for instrumentation behavior - * @returns The instrumented client (same instance, modified in place) + * @returns The instrumented pool/client (same instance, modified in place) * * @example * ```typescript + * // PostgreSQL with node-postgres * import { drizzle } from 'drizzle-orm/node-postgres'; * import { Pool } from 'pg'; * import { instrumentDrizzle } from '@kubiks/otel-drizzle'; * * const pool = new Pool({ connectionString: process.env.DATABASE_URL }); - * const db = drizzle(pool); - * - * // Instrument with defaults - * instrumentDrizzle(db); - * - * // Or with custom configuration - * instrumentDrizzle(db, { + * const instrumentedPool = instrumentDrizzle(pool, { * dbSystem: 'postgresql', * dbName: 'myapp', - * captureQueryText: true, - * maxQueryTextLength: 500, * peerName: 'db.example.com', * peerPort: 5432, * }); + * const db = drizzle({ client: instrumentedPool }); + * ``` + * + * @example + * ```typescript + * // MySQL with mysql2 + * import { drizzle } from 'drizzle-orm/mysql2'; + * import mysql from 'mysql2/promise'; + * import { instrumentDrizzle } from '@kubiks/otel-drizzle'; + * + * const connection = await mysql.createConnection({ + * host: 'localhost', + * user: 'root', + * database: 'mydb', + * }); + * const instrumentedConnection = instrumentDrizzle(connection, { dbSystem: 'mysql' }); + * const db = drizzle({ client: instrumentedConnection }); + * ``` + * + * @example + * ```typescript + * // SQLite with better-sqlite3 + * import { drizzle } from 'drizzle-orm/better-sqlite3'; + * import Database from 'better-sqlite3'; + * import { instrumentDrizzle } from '@kubiks/otel-drizzle'; + * + * const sqlite = new Database('sqlite.db'); + * const instrumentedSqlite = instrumentDrizzle(sqlite, { dbSystem: 'sqlite' }); + * const db = drizzle({ client: instrumentedSqlite }); + * ``` + * + * @example + * ```typescript + * // LibSQL/Turso + * import { drizzle } from 'drizzle-orm/libsql'; + * import { createClient } from '@libsql/client'; + * import { instrumentDrizzle } from '@kubiks/otel-drizzle'; + * + * const client = createClient({ + * url: process.env.DATABASE_URL!, + * authToken: process.env.DATABASE_AUTH_TOKEN, + * }); + * const instrumentedClient = instrumentDrizzle(client, { dbSystem: 'sqlite' }); + * const db = drizzle({ client: instrumentedClient }); * ``` */ export function instrumentDrizzle( @@ -185,7 +222,12 @@ export function instrumentDrizzle( if (!client) { return client; } - if (typeof client.query !== "function") { + + // Check if client has query or execute method + const hasQuery = typeof client.query === "function"; + const hasExecute = typeof client.execute === "function"; + + if (!hasQuery && !hasExecute) { return client; } @@ -204,11 +246,18 @@ export function instrumentDrizzle( } = config ?? {}; const tracer = trace.getTracer(tracerName); - const originalQuery = client.query; - const instrumentedQuery: QueryFunction = function instrumented( - this: unknown, - ...incomingArgs: unknown[] + // Store the original method (query or execute) + const methodName = hasQuery ? "query" : "execute"; + const originalMethod = hasQuery ? client.query : client.execute; + + if (!originalMethod) { + return client; + } + + const instrumentedMethod: QueryFunction = function instrumented( + this: any, + ...incomingArgs: any[] ) { const args = [...incomingArgs]; let callback: QueryCallback | undefined; @@ -263,7 +312,7 @@ export function instrumentDrizzle( }; try { - return originalQuery.apply(this, [...args, wrappedCallback]); + return originalMethod.apply(this, [...args, wrappedCallback]); } catch (error) { finalizeSpan(span, error); throw error; @@ -274,7 +323,7 @@ export function instrumentDrizzle( // Promise-based pattern return context.with(activeContext, () => { try { - const result = originalQuery.apply(this, args); + const result = originalMethod.apply(this, args); return Promise.resolve(result) .then((value) => { finalizeSpan(span); @@ -292,7 +341,651 @@ export function instrumentDrizzle( }; client[INSTRUMENTED_FLAG] = true; - client.query = instrumentedQuery; + + // Replace the original method with the instrumented one + if (hasQuery) { + client.query = instrumentedMethod; + } else { + client.execute = instrumentedMethod; + } return client; } + +/** + * Interface for Drizzle database instances with minimal type requirements. + */ +interface DrizzleDbLike { + $client?: DrizzleClientLike | any; // Allow any client type + execute?: QueryFunction; // Direct execute method on db + transaction?: QueryFunction; // Transaction method on db + _?: { + session?: { + execute?: QueryFunction; + [INSTRUMENTED_FLAG]?: true; + [key: string]: any; + }; + [key: string]: any; + }; + [INSTRUMENTED_FLAG]?: true; + [key: string]: any; // Allow other properties +} + +/** + * Instruments a Drizzle database instance with OpenTelemetry tracing. + * + * This function instruments the database at the session level, automatically tracing all database + * operations including query builders, direct SQL execution, and transactions. + * + * The instrumentation is idempotent - calling it multiple times on the same + * database will only instrument it once. + * + * @typeParam TDb - The type of the Drizzle database instance + * @param db - The Drizzle database instance to instrument + * @param config - Optional configuration for instrumentation behavior + * @returns The instrumented database instance (same instance, modified in place) + * + * @example + * ```typescript + * // PostgreSQL with postgres.js + * import { drizzle } from 'drizzle-orm/postgres-js'; + * import postgres from 'postgres'; + * import { instrumentDrizzleClient } from '@kubiks/otel-drizzle'; + * + * // Using connection string + * const db = drizzle(process.env.DATABASE_URL!); + * instrumentDrizzleClient(db, { dbSystem: 'postgresql' }); + * + * // Or with a client instance + * const queryClient = postgres(process.env.DATABASE_URL!); + * const db = drizzle({ client: queryClient }); + * instrumentDrizzleClient(db, { dbSystem: 'postgresql' }); + * ``` + * + * @example + * ```typescript + * // PostgreSQL with node-postgres (pg) + * import { drizzle } from 'drizzle-orm/node-postgres'; + * import { Pool } from 'pg'; + * import { instrumentDrizzleClient } from '@kubiks/otel-drizzle'; + * + * // Using connection string + * const db = drizzle(process.env.DATABASE_URL!); + * instrumentDrizzleClient(db, { dbSystem: 'postgresql' }); + * + * // Or with a pool + * const pool = new Pool({ connectionString: process.env.DATABASE_URL }); + * const db = drizzle({ client: pool }); + * instrumentDrizzleClient(db, { + * dbSystem: 'postgresql', + * dbName: 'myapp', + * peerName: 'db.example.com', + * peerPort: 5432, + * }); + * ``` + * + * @example + * ```typescript + * // MySQL with mysql2 + * import { drizzle } from 'drizzle-orm/mysql2'; + * import mysql from 'mysql2/promise'; + * import { instrumentDrizzleClient } from '@kubiks/otel-drizzle'; + * + * // Using connection string + * const db = drizzle(process.env.DATABASE_URL!); + * instrumentDrizzleClient(db, { dbSystem: 'mysql' }); + * + * // Or with a connection + * const connection = await mysql.createConnection({ + * host: 'localhost', + * user: 'root', + * database: 'mydb', + * }); + * const db = drizzle({ client: connection }); + * instrumentDrizzleClient(db, { + * dbSystem: 'mysql', + * dbName: 'mydb', + * peerName: 'localhost', + * peerPort: 3306, + * }); + * ``` + * + * @example + * ```typescript + * // SQLite with better-sqlite3 + * import { drizzle } from 'drizzle-orm/better-sqlite3'; + * import Database from 'better-sqlite3'; + * import { instrumentDrizzleClient } from '@kubiks/otel-drizzle'; + * + * // Using file path + * const db = drizzle('sqlite.db'); + * instrumentDrizzleClient(db, { dbSystem: 'sqlite' }); + * + * // Or with a Database instance + * const sqlite = new Database('sqlite.db'); + * const db = drizzle({ client: sqlite }); + * instrumentDrizzleClient(db, { dbSystem: 'sqlite' }); + * ``` + * + * @example + * ```typescript + * // SQLite with LibSQL/Turso + * import { drizzle } from 'drizzle-orm/libsql'; + * import { createClient } from '@libsql/client'; + * import { instrumentDrizzleClient } from '@kubiks/otel-drizzle'; + * + * // Using connection config + * const db = drizzle({ + * connection: { + * url: process.env.DATABASE_URL!, + * authToken: process.env.DATABASE_AUTH_TOKEN, + * } + * }); + * instrumentDrizzleClient(db, { dbSystem: 'sqlite' }); + * + * // Or with a client instance + * const client = createClient({ + * url: process.env.DATABASE_URL!, + * authToken: process.env.DATABASE_AUTH_TOKEN, + * }); + * const db = drizzle({ client }); + * instrumentDrizzleClient(db, { dbSystem: 'sqlite' }); + * ``` + */ +export function instrumentDrizzleClient( + db: TDb, + config?: InstrumentDrizzleConfig, +): TDb { + if (!db) { + return db; + } + + // Check if already instrumented + if (db[INSTRUMENTED_FLAG]) { + return db; + } + + const { + tracerName = DEFAULT_TRACER_NAME, + dbSystem = DEFAULT_DB_SYSTEM, + dbName, + captureQueryText = true, + maxQueryTextLength = 1000, + peerName, + peerPort, + } = config ?? {}; + + const tracer = trace.getTracer(tracerName); + let instrumented = false; + + // First priority: Instrument the session directly + // This is where all queries actually go through + if ((db as any).session && !instrumented) { + const session = (db as any).session; + + // Check if session has prepareQuery method (used by select/insert/update/delete) + if ( + typeof session.prepareQuery === "function" && + !session[INSTRUMENTED_FLAG] + ) { + const originalPrepareQuery = session.prepareQuery; + + session.prepareQuery = function (...args: any[]) { + const prepared = originalPrepareQuery.apply(this, args); + + // Wrap the prepared query's execute method + if (prepared && typeof prepared.execute === "function") { + const originalPreparedExecute = prepared.execute; + + prepared.execute = function (this: any, ...executeArgs: any[]) { + // Extract query information from the query object + const queryObj = args[0]; // The query object passed to prepareQuery + const queryText = + queryObj?.sql || + queryObj?.queryString || + extractQueryText(queryObj); + const operation = queryText + ? extractOperation(queryText) + : undefined; + const spanName = operation + ? `drizzle.${operation.toLowerCase()}` + : "drizzle.query"; + + // Start span + const span = tracer.startSpan(spanName, { kind: SpanKind.CLIENT }); + span.setAttribute(SEMATTRS_DB_SYSTEM, dbSystem); + + if (operation) { + span.setAttribute(SEMATTRS_DB_OPERATION, operation); + } + + if (dbName) { + span.setAttribute(SEMATTRS_DB_NAME, dbName); + } + + if (captureQueryText && queryText !== undefined) { + const sanitized = sanitizeQueryText( + queryText, + maxQueryTextLength, + ); + span.setAttribute(SEMATTRS_DB_STATEMENT, sanitized); + } + + if (peerName) { + span.setAttribute(SEMATTRS_NET_PEER_NAME, peerName); + } + + if (peerPort) { + span.setAttribute(SEMATTRS_NET_PEER_PORT, peerPort); + } + + const activeContext = trace.setSpan(context.active(), span); + + // Execute the prepared query + return context.with(activeContext, () => { + try { + const result = originalPreparedExecute.apply(this, executeArgs); + return Promise.resolve(result) + .then((value) => { + finalizeSpan(span); + return value; + }) + .catch((error) => { + finalizeSpan(span, error); + throw error; + }); + } catch (error) { + finalizeSpan(span, error); + throw error; + } + }); + }; + } + + return prepared; + }; + + session[INSTRUMENTED_FLAG] = true; + instrumented = true; + } + + // Also instrument direct query method if exists + if ( + typeof session.query === "function" && + !session[INSTRUMENTED_FLAG + "_query"] + ) { + const originalQuery = session.query; + + session.query = function (this: any, queryString: string, params: any[]) { + const operation = queryString + ? extractOperation(queryString) + : undefined; + const spanName = operation + ? `drizzle.${operation.toLowerCase()}` + : "drizzle.query"; + + // Start span + const span = tracer.startSpan(spanName, { kind: SpanKind.CLIENT }); + span.setAttribute(SEMATTRS_DB_SYSTEM, dbSystem); + + if (operation) { + span.setAttribute(SEMATTRS_DB_OPERATION, operation); + } + + if (dbName) { + span.setAttribute(SEMATTRS_DB_NAME, dbName); + } + + if (captureQueryText && queryString !== undefined) { + const sanitized = sanitizeQueryText(queryString, maxQueryTextLength); + span.setAttribute(SEMATTRS_DB_STATEMENT, sanitized); + } + + if (peerName) { + span.setAttribute(SEMATTRS_NET_PEER_NAME, peerName); + } + + if (peerPort) { + span.setAttribute(SEMATTRS_NET_PEER_PORT, peerPort); + } + + const activeContext = trace.setSpan(context.active(), span); + + // Execute the query + return context.with(activeContext, () => { + try { + const result = originalQuery.apply(this, [queryString, params]); + return Promise.resolve(result) + .then((value) => { + finalizeSpan(span); + return value; + }) + .catch((error) => { + finalizeSpan(span, error); + throw error; + }); + } catch (error) { + finalizeSpan(span, error); + throw error; + } + }); + }; + + session[INSTRUMENTED_FLAG + "_query"] = true; + instrumented = true; + } + + // Instrument transaction method to ensure transaction sessions are also instrumented + if ( + typeof session.transaction === "function" && + !session[INSTRUMENTED_FLAG + "_transaction"] + ) { + const originalTransaction = session.transaction; + + session.transaction = function ( + this: any, + transactionCallback: any, + ...restArgs: any[] + ) { + // Wrap the transaction callback to instrument the tx object + const wrappedCallback = async function (tx: any) { + // Instrument the transaction's session if it has one + if (tx && (tx.session || tx._?.session || tx)) { + const txSession = tx.session || tx._?.session || tx; + + // Instrument tx.execute if it exists + if ( + typeof tx.execute === "function" && + !tx[INSTRUMENTED_FLAG + "_execute"] + ) { + const originalTxExecute = tx.execute; + + tx.execute = function (this: any, ...executeArgs: any[]) { + const queryText = extractQueryText(executeArgs[0]); + const operation = queryText + ? extractOperation(queryText) + : undefined; + const spanName = operation + ? `drizzle.${operation.toLowerCase()}` + : "drizzle.query"; + + // Start span + const span = tracer.startSpan(spanName, { + kind: SpanKind.CLIENT, + }); + span.setAttribute(SEMATTRS_DB_SYSTEM, dbSystem); + span.setAttribute("db.transaction", true); + + if (operation) { + span.setAttribute(SEMATTRS_DB_OPERATION, operation); + } + + if (dbName) { + span.setAttribute(SEMATTRS_DB_NAME, dbName); + } + + if (captureQueryText && queryText !== undefined) { + const sanitized = sanitizeQueryText( + queryText, + maxQueryTextLength, + ); + span.setAttribute(SEMATTRS_DB_STATEMENT, sanitized); + } + + if (peerName) { + span.setAttribute(SEMATTRS_NET_PEER_NAME, peerName); + } + + if (peerPort) { + span.setAttribute(SEMATTRS_NET_PEER_PORT, peerPort); + } + + const activeContext = trace.setSpan(context.active(), span); + + // Execute the query + return context.with(activeContext, () => { + try { + const result = originalTxExecute.apply(this, executeArgs); + return Promise.resolve(result) + .then((value) => { + finalizeSpan(span); + return value; + }) + .catch((error) => { + finalizeSpan(span, error); + throw error; + }); + } catch (error) { + finalizeSpan(span, error); + throw error; + } + }); + }; + + tx[INSTRUMENTED_FLAG + "_execute"] = true; + } + + // Also instrument txSession.prepareQuery if it exists + if ( + typeof txSession.prepareQuery === "function" && + !txSession[INSTRUMENTED_FLAG + "_tx"] + ) { + const originalTxPrepareQuery = txSession.prepareQuery; + + txSession.prepareQuery = function (...prepareArgs: any[]) { + const prepared = originalTxPrepareQuery.apply( + this, + prepareArgs, + ); + + // Wrap the prepared query's execute method + if (prepared && typeof prepared.execute === "function") { + const originalPreparedExecute = prepared.execute; + + prepared.execute = function ( + this: any, + ...executeArgs: any[] + ) { + // Extract query information from the query object + const queryObj = prepareArgs[0]; // The query object passed to prepareQuery + const queryText = + queryObj?.sql || + queryObj?.queryString || + extractQueryText(queryObj); + const operation = queryText + ? extractOperation(queryText) + : undefined; + const spanName = operation + ? `drizzle.${operation.toLowerCase()}` + : "drizzle.query"; + + // Start span + const span = tracer.startSpan(spanName, { + kind: SpanKind.CLIENT, + }); + span.setAttribute(SEMATTRS_DB_SYSTEM, dbSystem); + span.setAttribute("db.transaction", true); + + if (operation) { + span.setAttribute(SEMATTRS_DB_OPERATION, operation); + } + + if (dbName) { + span.setAttribute(SEMATTRS_DB_NAME, dbName); + } + + if (captureQueryText && queryText !== undefined) { + const sanitized = sanitizeQueryText( + queryText, + maxQueryTextLength, + ); + span.setAttribute(SEMATTRS_DB_STATEMENT, sanitized); + } + + if (peerName) { + span.setAttribute(SEMATTRS_NET_PEER_NAME, peerName); + } + + if (peerPort) { + span.setAttribute(SEMATTRS_NET_PEER_PORT, peerPort); + } + + const activeContext = trace.setSpan(context.active(), span); + + // Execute the prepared query + return context.with(activeContext, () => { + try { + const result = originalPreparedExecute.apply( + this, + executeArgs, + ); + return Promise.resolve(result) + .then((value) => { + finalizeSpan(span); + return value; + }) + .catch((error) => { + finalizeSpan(span, error); + throw error; + }); + } catch (error) { + finalizeSpan(span, error); + throw error; + } + }); + }; + } + + return prepared; + }; + + txSession[INSTRUMENTED_FLAG + "_tx"] = true; + } + } + + // Call the original callback with the instrumented tx + return transactionCallback(tx); + }; + + // Call the original transaction with the wrapped callback + return originalTransaction.apply(this, [wrappedCallback, ...restArgs]); + }; + + session[INSTRUMENTED_FLAG + "_transaction"] = true; + instrumented = true; + } + } + + if (db.$client && !instrumented) { + const client = db.$client; + // Check if client has query or execute function + if ( + typeof client.query === "function" || + typeof client.execute === "function" + ) { + instrumentDrizzle(client, config); + instrumented = true; + } + } + + // Third priority: Try to instrument via session.execute as fallback + if ( + db._ && + db._.session && + typeof db._.session.execute === "function" && + !instrumented + ) { + const session = db._.session; + + // Check if already instrumented + if (session[INSTRUMENTED_FLAG]) { + return db; + } + + const { + tracerName = DEFAULT_TRACER_NAME, + dbSystem = DEFAULT_DB_SYSTEM, + dbName, + captureQueryText = true, + maxQueryTextLength = 1000, + peerName, + peerPort, + } = config ?? {}; + + const tracer = trace.getTracer(tracerName); + const originalExecute = session.execute; + + if (!originalExecute) { + return db; + } + + const instrumentedExecute: QueryFunction = function instrumented( + this: any, + ...args: any[] + ) { + // Extract query information + const queryText = extractQueryText(args[0]); + const operation = queryText ? extractOperation(queryText) : undefined; + const spanName = operation + ? `drizzle.${operation.toLowerCase()}` + : "drizzle.query"; + + // Start span + const span = tracer.startSpan(spanName, { kind: SpanKind.CLIENT }); + span.setAttribute(SEMATTRS_DB_SYSTEM, dbSystem); + + if (operation) { + span.setAttribute(SEMATTRS_DB_OPERATION, operation); + } + + if (dbName) { + span.setAttribute(SEMATTRS_DB_NAME, dbName); + } + + if (captureQueryText && queryText !== undefined) { + const sanitized = sanitizeQueryText(queryText, maxQueryTextLength); + span.setAttribute(SEMATTRS_DB_STATEMENT, sanitized); + } + + if (peerName) { + span.setAttribute(SEMATTRS_NET_PEER_NAME, peerName); + } + + if (peerPort) { + span.setAttribute(SEMATTRS_NET_PEER_PORT, peerPort); + } + + const activeContext = trace.setSpan(context.active(), span); + + // Promise-based pattern (session.execute is typically promise-based) + return context.with(activeContext, () => { + try { + const result = originalExecute.apply(this, args); + return Promise.resolve(result) + .then((value) => { + finalizeSpan(span); + return value; + }) + .catch((error) => { + finalizeSpan(span, error); + throw error; + }); + } catch (error) { + finalizeSpan(span, error); + throw error; + } + }); + }; + + session[INSTRUMENTED_FLAG] = true; + session.execute = instrumentedExecute; + instrumented = true; + } + + // Mark the db as instrumented if we instrumented anything + if (instrumented) { + db[INSTRUMENTED_FLAG] = true; + } + + return db; +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 9dfeebe..1234b90 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -66,7 +66,7 @@ importers: specifier: ^0.36.4 version: 0.36.4(@opentelemetry/api@1.9.0)(@types/pg@8.15.5)(@types/react@18.2.46)(kysely@0.28.7)(postgres@3.4.7)(react@18.2.0) postgres: - specifier: ^3.4.5 + specifier: ^3.4.7 version: 3.4.7 rimraf: specifier: 3.0.2