Merge pull request #15 from kubiks-inc/13-jsdoc-is-incorrect-and-does-not-match-docs

13 jsdoc is incorrect and does not match docs
This commit is contained in:
Alex Holovach
2025-10-05 08:38:59 -05:00
committed by GitHub
7 changed files with 1168 additions and 66 deletions
+5
View File
@@ -0,0 +1,5 @@
---
"@kubiks/otel-drizzle": minor
---
Added instrumentDrizzleClient func to instrument any drizzle instance
+126 -29
View File
@@ -42,71 +42,168 @@ Works with any observability platform that supports OpenTelemetry including:
## Usage
Simply wrap your database connection pool with `instrumentDrizzle()` before passing it to Drizzle:
### Instrument Your Drizzle Database (Recommended)
Use `instrumentDrizzleClient()` to add tracing to your Drizzle database instance. This is the simplest and most straightforward approach:
```typescript
import { drizzle } from "drizzle-orm/node-postgres";
import { Pool } from "pg";
import { instrumentDrizzle } from "@kubiks/otel-drizzle";
import { drizzle } from "drizzle-orm/postgres-js";
import { instrumentDrizzleClient } from "@kubiks/otel-drizzle";
const pool = new Pool({ connectionString: process.env.DATABASE_URL });
const instrumentedPool = instrumentDrizzle(pool);
const db = drizzle(instrumentedPool);
// Create your Drizzle database instance as usual
const db = drizzle(process.env.DATABASE_URL!);
// Add instrumentation with a single line
instrumentDrizzleClient(db);
// That's it! All queries are now traced automatically
const users = await db.select().from(usersTable);
```
### Optional Configuration
### Database-Specific Examples
#### PostgreSQL
```typescript
const pool = new Pool({ connectionString: process.env.DATABASE_URL });
const instrumentedPool = instrumentDrizzle(pool, {
dbSystem: "postgresql", // Database type (default: 'postgresql')
dbName: "myapp", // Database name for spans
captureQueryText: true, // Include SQL in traces (default: true)
maxQueryTextLength: 1000, // Max SQL length (default: 1000)
peerName: "db.example.com", // Database server hostname
peerPort: 5432, // Database server port
// PostgreSQL with postgres.js (recommended for serverless)
import { drizzle } from "drizzle-orm/postgres-js";
import postgres from "postgres";
import { instrumentDrizzleClient } from "@kubiks/otel-drizzle";
// Using connection string directly
const db = drizzle(process.env.DATABASE_URL!);
instrumentDrizzleClient(db, { dbSystem: "postgresql" });
// Or with a client instance
const queryClient = postgres(process.env.DATABASE_URL!);
const db = drizzle({ client: queryClient });
instrumentDrizzleClient(db, {
dbSystem: "postgresql",
dbName: "myapp",
peerName: "db.example.com",
peerPort: 5432,
});
const db = drizzle(instrumentedPool);
```
### Works with All Drizzle-Supported Databases
This package supports **all databases that Drizzle ORM supports**, including PostgreSQL, MySQL, SQLite, Turso, Neon, PlanetScale, and more.
```typescript
// PostgreSQL with node-postgres
// PostgreSQL with node-postgres (pg)
import { drizzle } from "drizzle-orm/node-postgres";
import { Pool } from "pg";
const pool = new Pool({ connectionString: process.env.DATABASE_URL });
const db = drizzle(instrumentDrizzle(pool));
import { instrumentDrizzleClient } from "@kubiks/otel-drizzle";
// Using connection string directly
const db = drizzle(process.env.DATABASE_URL!);
instrumentDrizzleClient(db, { dbSystem: "postgresql" });
// Or with a pool instance
const pool = new Pool({ connectionString: process.env.DATABASE_URL });
const db = drizzle({ client: pool });
instrumentDrizzleClient(db, { dbSystem: "postgresql" });
```
#### MySQL
```typescript
// MySQL with mysql2
import { drizzle } from "drizzle-orm/mysql2";
import mysql from "mysql2/promise";
const connection = await mysql.createConnection(process.env.DATABASE_URL);
const db = drizzle(instrumentDrizzle(connection, { dbSystem: "mysql" }));
import { instrumentDrizzleClient } from "@kubiks/otel-drizzle";
// Using connection string directly
const db = drizzle(process.env.DATABASE_URL!);
instrumentDrizzleClient(db, { dbSystem: "mysql" });
// Or with a connection instance
const connection = await mysql.createConnection({
host: "localhost",
user: "root",
database: "mydb",
// ... other connection options
});
const db = drizzle({ client: connection });
instrumentDrizzleClient(db, {
dbSystem: "mysql",
dbName: "mydb",
peerName: "localhost",
peerPort: 3306,
});
```
#### SQLite
```typescript
// SQLite with better-sqlite3
import { drizzle } from "drizzle-orm/better-sqlite3";
import Database from "better-sqlite3";
const sqlite = new Database("database.db");
const db = drizzle(instrumentDrizzle(sqlite, { dbSystem: "sqlite" }));
import { instrumentDrizzleClient } from "@kubiks/otel-drizzle";
// Using file path directly
const db = drizzle("sqlite.db");
instrumentDrizzleClient(db, { dbSystem: "sqlite" });
// Or with a Database instance
const sqlite = new Database("sqlite.db");
const db = drizzle({ client: sqlite });
instrumentDrizzleClient(db, { dbSystem: "sqlite" });
```
```typescript
// SQLite with LibSQL/Turso
import { drizzle } from "drizzle-orm/libsql";
import { createClient } from "@libsql/client";
import { instrumentDrizzleClient } from "@kubiks/otel-drizzle";
// Using connection config directly
const db = drizzle({
connection: {
url: process.env.DATABASE_URL!,
authToken: process.env.DATABASE_AUTH_TOKEN,
}
});
instrumentDrizzleClient(db, { dbSystem: "sqlite" });
// Or with a client instance
const client = createClient({
url: process.env.DATABASE_URL!,
authToken: process.env.DATABASE_AUTH_TOKEN,
});
const db = drizzle({ client });
instrumentDrizzleClient(db, { dbSystem: "sqlite" });
```
### Configuration Options
```typescript
instrumentDrizzleClient(db, {
dbSystem: "postgresql", // Database type: 'postgresql' | 'mysql' | 'sqlite' (default: 'postgresql')
dbName: "myapp", // Database name for spans
captureQueryText: true, // Include SQL in traces (default: true)
maxQueryTextLength: 1000, // Max SQL length (default: 1000)
peerName: "db.example.com", // Database server hostname
peerPort: 5432, // Database server port
});
```
## What You Get
Each database query automatically creates a span with rich telemetry data:
- **Span name**: `drizzle.select`, `drizzle.insert`, `drizzle.update`, etc.
- **Operation type**: `db.operation` attribute (SELECT, INSERT, UPDATE, DELETE)
- **Operation type**: `db.operation` attribute (SELECT, INSERT, UPDATE, DELETE, SET)
- **SQL query text**: Full query statement captured in `db.statement` (configurable)
- **Database system**: `db.system` attribute (postgresql, mysql, sqlite, etc.)
- **Transaction tracking**: Transaction queries are marked with `db.transaction` attribute
- **Error tracking**: Exceptions are recorded with stack traces and proper span status
- **Performance metrics**: Duration and timing information for every query
### Transaction Support
All queries within transactions are automatically traced, including:
- RLS (Row Level Security) queries like `SET LOCAL role` and `set_config()`
- All nested transaction queries
- Transaction rollbacks and commits
### Span Attributes
The instrumentation adds the following attributes to each span following [OpenTelemetry semantic conventions](https://opentelemetry.io/docs/specs/semconv/database/):
Binary file not shown.
+1 -2
View File
@@ -36,14 +36,13 @@
"unit-test": "vitest --run",
"unit-test-watch": "vitest"
},
"dependencies": {},
"devDependencies": {
"@opentelemetry/api": "^1.9.0",
"@opentelemetry/sdk-trace-base": "^2.1.0",
"@types/node": "18.15.11",
"@types/pg": "^8.11.10",
"drizzle-orm": "^0.36.4",
"postgres": "^3.4.5",
"postgres": "^3.4.7",
"rimraf": "3.0.2",
"typescript": "^5",
"vitest": "0.33.0"
+309 -1
View File
@@ -5,7 +5,7 @@ import {
InMemorySpanExporter,
SimpleSpanProcessor,
} from "@opentelemetry/sdk-trace-base";
import { instrumentDrizzle, type InstrumentDrizzleConfig } from "./index";
import { instrumentDrizzle, instrumentDrizzleClient, type InstrumentDrizzleConfig } from "./index";
interface MockDrizzleClient {
query: (...args: any[]) => unknown;
@@ -295,4 +295,312 @@ describe("instrumentDrizzle", () => {
const result = instrumentDrizzle(null as any);
expect(result).toBeNull();
});
it("instruments a client with execute method instead of query", async () => {
const client = {
execute: vi.fn(() => Promise.resolve({ rows: [{ id: 1 }] })),
};
instrumentDrizzle(client);
// Execute with SQL object format (used by various drivers)
const result = await client.execute({
sql: "SELECT * FROM users WHERE id = ?",
args: [1],
});
expect(result).toEqual({ rows: [{ id: 1 }] });
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
if (!span) {
throw new Error("Expected a recorded span");
}
expect(span.name).toBe("drizzle.select");
expect(span.attributes["db.statement"]).toBe("SELECT * FROM users WHERE id = ?");
expect(span.attributes["db.operation"]).toBe("SELECT");
});
it("instruments a client with execute method using string query", async () => {
const client = {
execute: vi.fn(() => Promise.resolve({ rows: [] })),
};
instrumentDrizzle(client);
await client.execute("DELETE FROM users");
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
if (!span) {
throw new Error("Expected a recorded span");
}
expect(span.name).toBe("drizzle.delete");
expect(span.attributes["db.operation"]).toBe("DELETE");
expect(span.attributes["db.statement"]).toBe("DELETE FROM users");
});
});
describe("instrumentDrizzleClient", () => {
let provider: BasicTracerProvider;
let exporter: InMemorySpanExporter;
beforeEach(() => {
exporter = new InMemorySpanExporter();
provider = new BasicTracerProvider({
spanProcessors: [new SimpleSpanProcessor(exporter)],
});
trace.setGlobalTracerProvider(provider);
});
afterEach(async () => {
await provider.shutdown();
exporter.reset();
trace.disable();
});
it("instruments a db with session.prepareQuery method", async () => {
const mockPreparedQuery = {
execute: vi.fn(() => Promise.resolve({ rows: [{ id: 1 }] })),
};
const mockSession = {
prepareQuery: vi.fn(() => mockPreparedQuery),
};
const mockDb = {
session: mockSession,
select: vi.fn(),
};
const instrumented = instrumentDrizzleClient(mockDb);
expect(instrumented).toBe(mockDb);
// Simulate what happens when db.select().from() is called
const prepared = mockSession.prepareQuery({ sql: "SELECT * FROM users" });
const result = await prepared.execute();
expect(result).toEqual({ rows: [{ id: 1 }] });
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
expect(spans[0]?.name).toBe("drizzle.select");
expect(spans[0]?.attributes["db.statement"]).toBe("SELECT * FROM users");
});
it("instruments a db with $client property as fallback", async () => {
const mockClient = {
query: vi.fn(() => Promise.resolve({ rows: [{ id: 1 }] })),
};
const mockDb = {
$client: mockClient,
select: vi.fn(),
// No direct execute method
};
const instrumented = instrumentDrizzleClient(mockDb);
expect(instrumented).toBe(mockDb);
// Execute a query through the client
const result = await mockClient.query("SELECT * FROM users");
expect(result).toEqual({ rows: [{ id: 1 }] });
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
expect(spans[0]?.name).toBe("drizzle.select");
expect(spans[0]?.attributes["db.statement"]).toBe("SELECT * FROM users");
});
it("instruments a db with _.session.execute property", async () => {
const mockSession = {
execute: vi.fn(() => Promise.resolve({ rows: [{ id: 2 }] })),
};
const mockDb = {
_: {
session: mockSession,
},
select: vi.fn(),
};
const instrumented = instrumentDrizzleClient(mockDb);
expect(instrumented).toBe(mockDb);
// Execute a query through the session
const result = await mockSession.execute("INSERT INTO users (name) VALUES ('test')");
expect(result).toEqual({ rows: [{ id: 2 }] });
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
expect(spans[0]?.name).toBe("drizzle.insert");
expect(spans[0]?.attributes["db.operation"]).toBe("INSERT");
});
it("instruments session.query method", async () => {
const mockSession = {
query: vi.fn(() => Promise.resolve({ rows: [] })),
};
const mockDb = {
session: mockSession,
};
instrumentDrizzleClient(mockDb);
// Direct query through session
await mockSession.query("INSERT INTO users (name) VALUES ($1)", ["John"]);
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
expect(spans[0]?.name).toBe("drizzle.insert");
expect(spans[0]?.attributes["db.statement"]).toBe("INSERT INTO users (name) VALUES ($1)");
});
it("only instruments once when called multiple times", async () => {
const mockClient = {
query: vi.fn(() => Promise.resolve({ rows: [] })),
};
const mockDb = {
$client: mockClient,
};
const firstInstrumented = instrumentDrizzleClient(mockDb);
const wrappedQuery = mockClient.query;
const secondInstrumented = instrumentDrizzleClient(mockDb);
expect(firstInstrumented).toBe(mockDb);
expect(secondInstrumented).toBe(mockDb);
expect(mockClient.query).toBe(wrappedQuery);
});
it("respects custom configuration", async () => {
const mockClient = {
query: vi.fn(() => Promise.resolve({ rows: [] })),
};
const mockDb = {
$client: mockClient,
};
const config: InstrumentDrizzleConfig = {
dbSystem: "mysql",
dbName: "test_db",
peerName: "db.example.com",
peerPort: 3306,
};
instrumentDrizzleClient(mockDb, config);
await mockClient.query("SELECT 1");
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
if (!span) {
throw new Error("Expected a recorded span");
}
expect(span.attributes["db.system"]).toBe("mysql");
expect(span.attributes["db.name"]).toBe("test_db");
expect(span.attributes["net.peer.name"]).toBe("db.example.com");
expect(span.attributes["net.peer.port"]).toBe(3306);
});
it("returns db unchanged if db is null", () => {
const result = instrumentDrizzleClient(null as any);
expect(result).toBeNull();
});
it("returns db unchanged if no instrumentable properties exist", () => {
const mockDb = {
select: vi.fn(),
// No $client or _.session properties
};
const result = instrumentDrizzleClient(mockDb);
expect(result).toBe(mockDb);
});
it("handles errors in session.execute", async () => {
const error = new Error("database error");
const mockSession = {
execute: vi.fn(() => Promise.reject(error)),
};
const mockDb = {
_: {
session: mockSession,
},
};
instrumentDrizzleClient(mockDb);
await expect(mockSession.execute("DELETE FROM users")).rejects.toThrow(error);
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
expect(spans[0]?.status.code).toBe(SpanStatusCode.ERROR);
});
it("instruments transaction execute calls", async () => {
let txObject: any;
const mockSession = {
transaction: vi.fn(async (callback: any) => {
// Create a mock transaction object
txObject = {
execute: vi.fn(() => Promise.resolve({ rows: [] })),
};
return callback(txObject);
}),
};
const mockDb = {
session: mockSession,
};
instrumentDrizzleClient(mockDb);
// Execute a transaction with RLS queries
await mockSession.transaction(async (tx: any) => {
await tx.execute({ sql: "SET LOCAL role org_role" });
await tx.execute({ sql: "SELECT set_config('request.org_id', $1, true)", params: ["org123"] });
});
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(2);
expect(spans[0]?.name).toBe("drizzle.set");
expect(spans[0]?.attributes["db.statement"]).toBe("SET LOCAL role org_role");
expect(spans[0]?.attributes["db.transaction"]).toBe(true);
expect(spans[1]?.name).toBe("drizzle.select");
expect(spans[1]?.attributes["db.transaction"]).toBe(true);
});
it("only instruments session once when called multiple times", () => {
const mockSession = {
execute: vi.fn(() => Promise.resolve({ rows: [] })),
};
const mockDb = {
_: {
session: mockSession,
},
};
instrumentDrizzleClient(mockDb);
const wrappedExecute = mockSession.execute;
instrumentDrizzleClient(mockDb);
expect(mockSession.execute).toBe(wrappedExecute);
});
});
+726 -33
View File
@@ -22,11 +22,13 @@ export const SEMATTRS_NET_PEER_PORT = "net.peer.port";
type QueryCallback = (error: unknown, result: unknown) => void;
type QueryFunction = (...args: unknown[]) => unknown;
type QueryFunction = (...args: any[]) => any;
interface DrizzleClientLike {
query: QueryFunction;
query?: QueryFunction;
execute?: QueryFunction;
[INSTRUMENTED_FLAG]?: true;
[key: string]: any; // Allow other properties
}
/**
@@ -82,14 +84,14 @@ function extractQueryText(queryArg: unknown): string | undefined {
return queryArg;
}
if (queryArg && typeof queryArg === "object") {
// Generic SQL object format (used by LibSQL, MySQL, and others)
if (typeof (queryArg as { sql?: unknown }).sql === "string") {
return (queryArg as { sql: string }).sql;
}
// PostgreSQL-style query object
if (typeof (queryArg as { text?: unknown }).text === "string") {
return (queryArg as { text: string }).text;
}
// MySQL/generic-style query object
if (typeof (queryArg as { sql?: unknown }).sql === "string") {
return (queryArg as { sql: string }).sql;
}
// Drizzle SQL object
if (
typeof (queryArg as { queryChunks?: unknown }).queryChunks === "object"
@@ -141,41 +143,76 @@ function finalizeSpan(span: Span, error?: unknown): void {
}
/**
* Instruments a Drizzle database client with OpenTelemetry tracing.
* Instruments a database connection pool/client with OpenTelemetry tracing.
*
* This function wraps the client's `query` method to automatically create
* spans for each database operation. It supports both promise-based and
* callback-based query patterns.
* This function wraps the connection's `query` and `execute` methods to create spans for each database
* operation.
* The instrumentation is idempotent - calling it multiple times on the same connection will only
* instrument it once.
*
* The instrumentation is idempotent - calling it multiple times on the same
* client will only instrument it once.
*
* @typeParam TClient - The type of the Drizzle client
* @param client - The Drizzle client instance to instrument
* @typeParam TClient - The type of the database connection pool or client
* @param client - The database connection pool or client to instrument
* @param config - Optional configuration for instrumentation behavior
* @returns The instrumented client (same instance, modified in place)
* @returns The instrumented pool/client (same instance, modified in place)
*
* @example
* ```typescript
* // PostgreSQL with node-postgres
* import { drizzle } from 'drizzle-orm/node-postgres';
* import { Pool } from 'pg';
* import { instrumentDrizzle } from '@kubiks/otel-drizzle';
*
* const pool = new Pool({ connectionString: process.env.DATABASE_URL });
* const db = drizzle(pool);
*
* // Instrument with defaults
* instrumentDrizzle(db);
*
* // Or with custom configuration
* instrumentDrizzle(db, {
* const instrumentedPool = instrumentDrizzle(pool, {
* dbSystem: 'postgresql',
* dbName: 'myapp',
* captureQueryText: true,
* maxQueryTextLength: 500,
* peerName: 'db.example.com',
* peerPort: 5432,
* });
* const db = drizzle({ client: instrumentedPool });
* ```
*
* @example
* ```typescript
* // MySQL with mysql2
* import { drizzle } from 'drizzle-orm/mysql2';
* import mysql from 'mysql2/promise';
* import { instrumentDrizzle } from '@kubiks/otel-drizzle';
*
* const connection = await mysql.createConnection({
* host: 'localhost',
* user: 'root',
* database: 'mydb',
* });
* const instrumentedConnection = instrumentDrizzle(connection, { dbSystem: 'mysql' });
* const db = drizzle({ client: instrumentedConnection });
* ```
*
* @example
* ```typescript
* // SQLite with better-sqlite3
* import { drizzle } from 'drizzle-orm/better-sqlite3';
* import Database from 'better-sqlite3';
* import { instrumentDrizzle } from '@kubiks/otel-drizzle';
*
* const sqlite = new Database('sqlite.db');
* const instrumentedSqlite = instrumentDrizzle(sqlite, { dbSystem: 'sqlite' });
* const db = drizzle({ client: instrumentedSqlite });
* ```
*
* @example
* ```typescript
* // LibSQL/Turso
* import { drizzle } from 'drizzle-orm/libsql';
* import { createClient } from '@libsql/client';
* import { instrumentDrizzle } from '@kubiks/otel-drizzle';
*
* const client = createClient({
* url: process.env.DATABASE_URL!,
* authToken: process.env.DATABASE_AUTH_TOKEN,
* });
* const instrumentedClient = instrumentDrizzle(client, { dbSystem: 'sqlite' });
* const db = drizzle({ client: instrumentedClient });
* ```
*/
export function instrumentDrizzle<TClient extends DrizzleClientLike>(
@@ -185,7 +222,12 @@ export function instrumentDrizzle<TClient extends DrizzleClientLike>(
if (!client) {
return client;
}
if (typeof client.query !== "function") {
// Check if client has query or execute method
const hasQuery = typeof client.query === "function";
const hasExecute = typeof client.execute === "function";
if (!hasQuery && !hasExecute) {
return client;
}
@@ -204,11 +246,18 @@ export function instrumentDrizzle<TClient extends DrizzleClientLike>(
} = config ?? {};
const tracer = trace.getTracer(tracerName);
const originalQuery = client.query;
const instrumentedQuery: QueryFunction = function instrumented(
this: unknown,
...incomingArgs: unknown[]
// Store the original method (query or execute)
const methodName = hasQuery ? "query" : "execute";
const originalMethod = hasQuery ? client.query : client.execute;
if (!originalMethod) {
return client;
}
const instrumentedMethod: QueryFunction = function instrumented(
this: any,
...incomingArgs: any[]
) {
const args = [...incomingArgs];
let callback: QueryCallback | undefined;
@@ -263,7 +312,7 @@ export function instrumentDrizzle<TClient extends DrizzleClientLike>(
};
try {
return originalQuery.apply(this, [...args, wrappedCallback]);
return originalMethod.apply(this, [...args, wrappedCallback]);
} catch (error) {
finalizeSpan(span, error);
throw error;
@@ -274,7 +323,7 @@ export function instrumentDrizzle<TClient extends DrizzleClientLike>(
// Promise-based pattern
return context.with(activeContext, () => {
try {
const result = originalQuery.apply(this, args);
const result = originalMethod.apply(this, args);
return Promise.resolve(result)
.then((value) => {
finalizeSpan(span);
@@ -292,7 +341,651 @@ export function instrumentDrizzle<TClient extends DrizzleClientLike>(
};
client[INSTRUMENTED_FLAG] = true;
client.query = instrumentedQuery;
// Replace the original method with the instrumented one
if (hasQuery) {
client.query = instrumentedMethod;
} else {
client.execute = instrumentedMethod;
}
return client;
}
/**
* Interface for Drizzle database instances with minimal type requirements.
*/
interface DrizzleDbLike {
$client?: DrizzleClientLike | any; // Allow any client type
execute?: QueryFunction; // Direct execute method on db
transaction?: QueryFunction; // Transaction method on db
_?: {
session?: {
execute?: QueryFunction;
[INSTRUMENTED_FLAG]?: true;
[key: string]: any;
};
[key: string]: any;
};
[INSTRUMENTED_FLAG]?: true;
[key: string]: any; // Allow other properties
}
/**
* Instruments a Drizzle database instance with OpenTelemetry tracing.
*
* This function instruments the database at the session level, automatically tracing all database
* operations including query builders, direct SQL execution, and transactions.
*
* The instrumentation is idempotent - calling it multiple times on the same
* database will only instrument it once.
*
* @typeParam TDb - The type of the Drizzle database instance
* @param db - The Drizzle database instance to instrument
* @param config - Optional configuration for instrumentation behavior
* @returns The instrumented database instance (same instance, modified in place)
*
* @example
* ```typescript
* // PostgreSQL with postgres.js
* import { drizzle } from 'drizzle-orm/postgres-js';
* import postgres from 'postgres';
* import { instrumentDrizzleClient } from '@kubiks/otel-drizzle';
*
* // Using connection string
* const db = drizzle(process.env.DATABASE_URL!);
* instrumentDrizzleClient(db, { dbSystem: 'postgresql' });
*
* // Or with a client instance
* const queryClient = postgres(process.env.DATABASE_URL!);
* const db = drizzle({ client: queryClient });
* instrumentDrizzleClient(db, { dbSystem: 'postgresql' });
* ```
*
* @example
* ```typescript
* // PostgreSQL with node-postgres (pg)
* import { drizzle } from 'drizzle-orm/node-postgres';
* import { Pool } from 'pg';
* import { instrumentDrizzleClient } from '@kubiks/otel-drizzle';
*
* // Using connection string
* const db = drizzle(process.env.DATABASE_URL!);
* instrumentDrizzleClient(db, { dbSystem: 'postgresql' });
*
* // Or with a pool
* const pool = new Pool({ connectionString: process.env.DATABASE_URL });
* const db = drizzle({ client: pool });
* instrumentDrizzleClient(db, {
* dbSystem: 'postgresql',
* dbName: 'myapp',
* peerName: 'db.example.com',
* peerPort: 5432,
* });
* ```
*
* @example
* ```typescript
* // MySQL with mysql2
* import { drizzle } from 'drizzle-orm/mysql2';
* import mysql from 'mysql2/promise';
* import { instrumentDrizzleClient } from '@kubiks/otel-drizzle';
*
* // Using connection string
* const db = drizzle(process.env.DATABASE_URL!);
* instrumentDrizzleClient(db, { dbSystem: 'mysql' });
*
* // Or with a connection
* const connection = await mysql.createConnection({
* host: 'localhost',
* user: 'root',
* database: 'mydb',
* });
* const db = drizzle({ client: connection });
* instrumentDrizzleClient(db, {
* dbSystem: 'mysql',
* dbName: 'mydb',
* peerName: 'localhost',
* peerPort: 3306,
* });
* ```
*
* @example
* ```typescript
* // SQLite with better-sqlite3
* import { drizzle } from 'drizzle-orm/better-sqlite3';
* import Database from 'better-sqlite3';
* import { instrumentDrizzleClient } from '@kubiks/otel-drizzle';
*
* // Using file path
* const db = drizzle('sqlite.db');
* instrumentDrizzleClient(db, { dbSystem: 'sqlite' });
*
* // Or with a Database instance
* const sqlite = new Database('sqlite.db');
* const db = drizzle({ client: sqlite });
* instrumentDrizzleClient(db, { dbSystem: 'sqlite' });
* ```
*
* @example
* ```typescript
* // SQLite with LibSQL/Turso
* import { drizzle } from 'drizzle-orm/libsql';
* import { createClient } from '@libsql/client';
* import { instrumentDrizzleClient } from '@kubiks/otel-drizzle';
*
* // Using connection config
* const db = drizzle({
* connection: {
* url: process.env.DATABASE_URL!,
* authToken: process.env.DATABASE_AUTH_TOKEN,
* }
* });
* instrumentDrizzleClient(db, { dbSystem: 'sqlite' });
*
* // Or with a client instance
* const client = createClient({
* url: process.env.DATABASE_URL!,
* authToken: process.env.DATABASE_AUTH_TOKEN,
* });
* const db = drizzle({ client });
* instrumentDrizzleClient(db, { dbSystem: 'sqlite' });
* ```
*/
export function instrumentDrizzleClient<TDb extends DrizzleDbLike>(
db: TDb,
config?: InstrumentDrizzleConfig,
): TDb {
if (!db) {
return db;
}
// Check if already instrumented
if (db[INSTRUMENTED_FLAG]) {
return db;
}
const {
tracerName = DEFAULT_TRACER_NAME,
dbSystem = DEFAULT_DB_SYSTEM,
dbName,
captureQueryText = true,
maxQueryTextLength = 1000,
peerName,
peerPort,
} = config ?? {};
const tracer = trace.getTracer(tracerName);
let instrumented = false;
// First priority: Instrument the session directly
// This is where all queries actually go through
if ((db as any).session && !instrumented) {
const session = (db as any).session;
// Check if session has prepareQuery method (used by select/insert/update/delete)
if (
typeof session.prepareQuery === "function" &&
!session[INSTRUMENTED_FLAG]
) {
const originalPrepareQuery = session.prepareQuery;
session.prepareQuery = function (...args: any[]) {
const prepared = originalPrepareQuery.apply(this, args);
// Wrap the prepared query's execute method
if (prepared && typeof prepared.execute === "function") {
const originalPreparedExecute = prepared.execute;
prepared.execute = function (this: any, ...executeArgs: any[]) {
// Extract query information from the query object
const queryObj = args[0]; // The query object passed to prepareQuery
const queryText =
queryObj?.sql ||
queryObj?.queryString ||
extractQueryText(queryObj);
const operation = queryText
? extractOperation(queryText)
: undefined;
const spanName = operation
? `drizzle.${operation.toLowerCase()}`
: "drizzle.query";
// Start span
const span = tracer.startSpan(spanName, { kind: SpanKind.CLIENT });
span.setAttribute(SEMATTRS_DB_SYSTEM, dbSystem);
if (operation) {
span.setAttribute(SEMATTRS_DB_OPERATION, operation);
}
if (dbName) {
span.setAttribute(SEMATTRS_DB_NAME, dbName);
}
if (captureQueryText && queryText !== undefined) {
const sanitized = sanitizeQueryText(
queryText,
maxQueryTextLength,
);
span.setAttribute(SEMATTRS_DB_STATEMENT, sanitized);
}
if (peerName) {
span.setAttribute(SEMATTRS_NET_PEER_NAME, peerName);
}
if (peerPort) {
span.setAttribute(SEMATTRS_NET_PEER_PORT, peerPort);
}
const activeContext = trace.setSpan(context.active(), span);
// Execute the prepared query
return context.with(activeContext, () => {
try {
const result = originalPreparedExecute.apply(this, executeArgs);
return Promise.resolve(result)
.then((value) => {
finalizeSpan(span);
return value;
})
.catch((error) => {
finalizeSpan(span, error);
throw error;
});
} catch (error) {
finalizeSpan(span, error);
throw error;
}
});
};
}
return prepared;
};
session[INSTRUMENTED_FLAG] = true;
instrumented = true;
}
// Also instrument direct query method if exists
if (
typeof session.query === "function" &&
!session[INSTRUMENTED_FLAG + "_query"]
) {
const originalQuery = session.query;
session.query = function (this: any, queryString: string, params: any[]) {
const operation = queryString
? extractOperation(queryString)
: undefined;
const spanName = operation
? `drizzle.${operation.toLowerCase()}`
: "drizzle.query";
// Start span
const span = tracer.startSpan(spanName, { kind: SpanKind.CLIENT });
span.setAttribute(SEMATTRS_DB_SYSTEM, dbSystem);
if (operation) {
span.setAttribute(SEMATTRS_DB_OPERATION, operation);
}
if (dbName) {
span.setAttribute(SEMATTRS_DB_NAME, dbName);
}
if (captureQueryText && queryString !== undefined) {
const sanitized = sanitizeQueryText(queryString, maxQueryTextLength);
span.setAttribute(SEMATTRS_DB_STATEMENT, sanitized);
}
if (peerName) {
span.setAttribute(SEMATTRS_NET_PEER_NAME, peerName);
}
if (peerPort) {
span.setAttribute(SEMATTRS_NET_PEER_PORT, peerPort);
}
const activeContext = trace.setSpan(context.active(), span);
// Execute the query
return context.with(activeContext, () => {
try {
const result = originalQuery.apply(this, [queryString, params]);
return Promise.resolve(result)
.then((value) => {
finalizeSpan(span);
return value;
})
.catch((error) => {
finalizeSpan(span, error);
throw error;
});
} catch (error) {
finalizeSpan(span, error);
throw error;
}
});
};
session[INSTRUMENTED_FLAG + "_query"] = true;
instrumented = true;
}
// Instrument transaction method to ensure transaction sessions are also instrumented
if (
typeof session.transaction === "function" &&
!session[INSTRUMENTED_FLAG + "_transaction"]
) {
const originalTransaction = session.transaction;
session.transaction = function (
this: any,
transactionCallback: any,
...restArgs: any[]
) {
// Wrap the transaction callback to instrument the tx object
const wrappedCallback = async function (tx: any) {
// Instrument the transaction's session if it has one
if (tx && (tx.session || tx._?.session || tx)) {
const txSession = tx.session || tx._?.session || tx;
// Instrument tx.execute if it exists
if (
typeof tx.execute === "function" &&
!tx[INSTRUMENTED_FLAG + "_execute"]
) {
const originalTxExecute = tx.execute;
tx.execute = function (this: any, ...executeArgs: any[]) {
const queryText = extractQueryText(executeArgs[0]);
const operation = queryText
? extractOperation(queryText)
: undefined;
const spanName = operation
? `drizzle.${operation.toLowerCase()}`
: "drizzle.query";
// Start span
const span = tracer.startSpan(spanName, {
kind: SpanKind.CLIENT,
});
span.setAttribute(SEMATTRS_DB_SYSTEM, dbSystem);
span.setAttribute("db.transaction", true);
if (operation) {
span.setAttribute(SEMATTRS_DB_OPERATION, operation);
}
if (dbName) {
span.setAttribute(SEMATTRS_DB_NAME, dbName);
}
if (captureQueryText && queryText !== undefined) {
const sanitized = sanitizeQueryText(
queryText,
maxQueryTextLength,
);
span.setAttribute(SEMATTRS_DB_STATEMENT, sanitized);
}
if (peerName) {
span.setAttribute(SEMATTRS_NET_PEER_NAME, peerName);
}
if (peerPort) {
span.setAttribute(SEMATTRS_NET_PEER_PORT, peerPort);
}
const activeContext = trace.setSpan(context.active(), span);
// Execute the query
return context.with(activeContext, () => {
try {
const result = originalTxExecute.apply(this, executeArgs);
return Promise.resolve(result)
.then((value) => {
finalizeSpan(span);
return value;
})
.catch((error) => {
finalizeSpan(span, error);
throw error;
});
} catch (error) {
finalizeSpan(span, error);
throw error;
}
});
};
tx[INSTRUMENTED_FLAG + "_execute"] = true;
}
// Also instrument txSession.prepareQuery if it exists
if (
typeof txSession.prepareQuery === "function" &&
!txSession[INSTRUMENTED_FLAG + "_tx"]
) {
const originalTxPrepareQuery = txSession.prepareQuery;
txSession.prepareQuery = function (...prepareArgs: any[]) {
const prepared = originalTxPrepareQuery.apply(
this,
prepareArgs,
);
// Wrap the prepared query's execute method
if (prepared && typeof prepared.execute === "function") {
const originalPreparedExecute = prepared.execute;
prepared.execute = function (
this: any,
...executeArgs: any[]
) {
// Extract query information from the query object
const queryObj = prepareArgs[0]; // The query object passed to prepareQuery
const queryText =
queryObj?.sql ||
queryObj?.queryString ||
extractQueryText(queryObj);
const operation = queryText
? extractOperation(queryText)
: undefined;
const spanName = operation
? `drizzle.${operation.toLowerCase()}`
: "drizzle.query";
// Start span
const span = tracer.startSpan(spanName, {
kind: SpanKind.CLIENT,
});
span.setAttribute(SEMATTRS_DB_SYSTEM, dbSystem);
span.setAttribute("db.transaction", true);
if (operation) {
span.setAttribute(SEMATTRS_DB_OPERATION, operation);
}
if (dbName) {
span.setAttribute(SEMATTRS_DB_NAME, dbName);
}
if (captureQueryText && queryText !== undefined) {
const sanitized = sanitizeQueryText(
queryText,
maxQueryTextLength,
);
span.setAttribute(SEMATTRS_DB_STATEMENT, sanitized);
}
if (peerName) {
span.setAttribute(SEMATTRS_NET_PEER_NAME, peerName);
}
if (peerPort) {
span.setAttribute(SEMATTRS_NET_PEER_PORT, peerPort);
}
const activeContext = trace.setSpan(context.active(), span);
// Execute the prepared query
return context.with(activeContext, () => {
try {
const result = originalPreparedExecute.apply(
this,
executeArgs,
);
return Promise.resolve(result)
.then((value) => {
finalizeSpan(span);
return value;
})
.catch((error) => {
finalizeSpan(span, error);
throw error;
});
} catch (error) {
finalizeSpan(span, error);
throw error;
}
});
};
}
return prepared;
};
txSession[INSTRUMENTED_FLAG + "_tx"] = true;
}
}
// Call the original callback with the instrumented tx
return transactionCallback(tx);
};
// Call the original transaction with the wrapped callback
return originalTransaction.apply(this, [wrappedCallback, ...restArgs]);
};
session[INSTRUMENTED_FLAG + "_transaction"] = true;
instrumented = true;
}
}
if (db.$client && !instrumented) {
const client = db.$client;
// Check if client has query or execute function
if (
typeof client.query === "function" ||
typeof client.execute === "function"
) {
instrumentDrizzle(client, config);
instrumented = true;
}
}
// Third priority: Try to instrument via session.execute as fallback
if (
db._ &&
db._.session &&
typeof db._.session.execute === "function" &&
!instrumented
) {
const session = db._.session;
// Check if already instrumented
if (session[INSTRUMENTED_FLAG]) {
return db;
}
const {
tracerName = DEFAULT_TRACER_NAME,
dbSystem = DEFAULT_DB_SYSTEM,
dbName,
captureQueryText = true,
maxQueryTextLength = 1000,
peerName,
peerPort,
} = config ?? {};
const tracer = trace.getTracer(tracerName);
const originalExecute = session.execute;
if (!originalExecute) {
return db;
}
const instrumentedExecute: QueryFunction = function instrumented(
this: any,
...args: any[]
) {
// Extract query information
const queryText = extractQueryText(args[0]);
const operation = queryText ? extractOperation(queryText) : undefined;
const spanName = operation
? `drizzle.${operation.toLowerCase()}`
: "drizzle.query";
// Start span
const span = tracer.startSpan(spanName, { kind: SpanKind.CLIENT });
span.setAttribute(SEMATTRS_DB_SYSTEM, dbSystem);
if (operation) {
span.setAttribute(SEMATTRS_DB_OPERATION, operation);
}
if (dbName) {
span.setAttribute(SEMATTRS_DB_NAME, dbName);
}
if (captureQueryText && queryText !== undefined) {
const sanitized = sanitizeQueryText(queryText, maxQueryTextLength);
span.setAttribute(SEMATTRS_DB_STATEMENT, sanitized);
}
if (peerName) {
span.setAttribute(SEMATTRS_NET_PEER_NAME, peerName);
}
if (peerPort) {
span.setAttribute(SEMATTRS_NET_PEER_PORT, peerPort);
}
const activeContext = trace.setSpan(context.active(), span);
// Promise-based pattern (session.execute is typically promise-based)
return context.with(activeContext, () => {
try {
const result = originalExecute.apply(this, args);
return Promise.resolve(result)
.then((value) => {
finalizeSpan(span);
return value;
})
.catch((error) => {
finalizeSpan(span, error);
throw error;
});
} catch (error) {
finalizeSpan(span, error);
throw error;
}
});
};
session[INSTRUMENTED_FLAG] = true;
session.execute = instrumentedExecute;
instrumented = true;
}
// Mark the db as instrumented if we instrumented anything
if (instrumented) {
db[INSTRUMENTED_FLAG] = true;
}
return db;
}
+1 -1
View File
@@ -66,7 +66,7 @@ importers:
specifier: ^0.36.4
version: 0.36.4(@opentelemetry/api@1.9.0)(@types/pg@8.15.5)(@types/react@18.2.46)(kysely@0.28.7)(postgres@3.4.7)(react@18.2.0)
postgres:
specifier: ^3.4.5
specifier: ^3.4.7
version: 3.4.7
rimraf:
specifier: 3.0.2