diff --git a/.changeset/clickhouse-peer-dep-update.md b/.changeset/clickhouse-peer-dep-update.md new file mode 100644 index 0000000..8f60c52 --- /dev/null +++ b/.changeset/clickhouse-peer-dep-update.md @@ -0,0 +1,5 @@ +--- +"@kubiks/otel-clickhouse": minor +--- + +Add instrumentation for insert(), exec(), and command() methods. Previously only query() was instrumented, which meant insert operations were not traced despite the README claiming they were. Also update @clickhouse/client peer dependency to require >=0.2.7 to ensure X-ClickHouse-Summary parsing support. diff --git a/packages/otel-clickhouse/package.json b/packages/otel-clickhouse/package.json index 08292d1..1410811 100644 --- a/packages/otel-clickhouse/package.json +++ b/packages/otel-clickhouse/package.json @@ -46,7 +46,7 @@ "vitest": "0.33.0" }, "peerDependencies": { - "@clickhouse/client": ">=0.2.0", + "@clickhouse/client": ">=0.2.7", "@opentelemetry/api": ">=1.9.0 <2.0.0" }, "peerDependenciesMeta": { diff --git a/packages/otel-clickhouse/src/index.test.ts b/packages/otel-clickhouse/src/index.test.ts index fd1fd2f..a25843b 100644 --- a/packages/otel-clickhouse/src/index.test.ts +++ b/packages/otel-clickhouse/src/index.test.ts @@ -9,6 +9,9 @@ import { instrumentClickHouse, type InstrumentClickHouseConfig } from "./index"; interface MockClickHouseClient { query: (params: any) => Promise; + insert: (params: any) => Promise; + exec: (params: any) => Promise; + command: (params: any) => Promise; } describe("instrumentClickHouse", () => { @@ -29,19 +32,32 @@ describe("instrumentClickHouse", () => { trace.disable(); }); - it("wraps the query method only once", () => { + it("wraps all methods only once", () => { const client = { query: vi.fn(), + insert: vi.fn(), + exec: vi.fn(), + command: vi.fn(), } as unknown as MockClickHouseClient; const instrumented = instrumentClickHouse(client as any); expect(instrumented.query).not.toBeUndefined(); + expect(instrumented.insert).not.toBeUndefined(); + expect(instrumented.exec).not.toBeUndefined(); + expect(instrumented.command).not.toBeUndefined(); + const wrappedQuery = instrumented.query; + const wrappedInsert = instrumented.insert; + const wrappedExec = instrumented.exec; + const wrappedCommand = instrumented.command; instrumentClickHouse(client as any); expect(instrumented.query).toBe(wrappedQuery); + expect(instrumented.insert).toBe(wrappedInsert); + expect(instrumented.exec).toBe(wrappedExec); + expect(instrumented.command).toBe(wrappedCommand); }); it("records a successful query", async () => { @@ -62,6 +78,9 @@ describe("instrumentClickHouse", () => { }, }) ), + insert: vi.fn(), + exec: vi.fn(), + command: vi.fn(), } as unknown as MockClickHouseClient; const instrumented = instrumentClickHouse(client as any); @@ -97,6 +116,9 @@ describe("instrumentClickHouse", () => { }, }) ), + insert: vi.fn(), + exec: vi.fn(), + command: vi.fn(), } as unknown as MockClickHouseClient; const instrumented = instrumentClickHouse(client as any, { @@ -120,6 +142,9 @@ describe("instrumentClickHouse", () => { const error = new Error("Query failed"); const client = { query: vi.fn(() => Promise.reject(error)), + insert: vi.fn(), + exec: vi.fn(), + command: vi.fn(), } as unknown as MockClickHouseClient; const instrumented = instrumentClickHouse(client as any); @@ -141,6 +166,9 @@ describe("instrumentClickHouse", () => { it("respects captureQueryText option", async () => { const client = { query: vi.fn(() => Promise.resolve({ response_headers: {} })), + insert: vi.fn(), + exec: vi.fn(), + command: vi.fn(), } as unknown as MockClickHouseClient; const instrumented = instrumentClickHouse(client as any, { @@ -160,6 +188,9 @@ describe("instrumentClickHouse", () => { const longQuery = "SELECT * FROM users WHERE " + "a = 1 AND ".repeat(200); const client = { query: vi.fn(() => Promise.resolve({ response_headers: {} })), + insert: vi.fn(), + exec: vi.fn(), + command: vi.fn(), } as unknown as MockClickHouseClient; const instrumented = instrumentClickHouse(client as any, { @@ -180,6 +211,9 @@ describe("instrumentClickHouse", () => { it("includes database name when configured", async () => { const client = { query: vi.fn(() => Promise.resolve({ response_headers: {} })), + insert: vi.fn(), + exec: vi.fn(), + command: vi.fn(), } as unknown as MockClickHouseClient; const instrumented = instrumentClickHouse(client as any, { @@ -198,6 +232,9 @@ describe("instrumentClickHouse", () => { it("includes network metadata when configured", async () => { const client = { query: vi.fn(() => Promise.resolve({ response_headers: {} })), + insert: vi.fn(), + exec: vi.fn(), + command: vi.fn(), } as unknown as MockClickHouseClient; const instrumented = instrumentClickHouse(client as any, { @@ -218,6 +255,9 @@ describe("instrumentClickHouse", () => { it("detects different operation types", async () => { const client = { query: vi.fn(() => Promise.resolve({ response_headers: {} })), + insert: vi.fn(), + exec: vi.fn(), + command: vi.fn(), } as unknown as MockClickHouseClient; const instrumented = instrumentClickHouse(client as any); @@ -242,6 +282,9 @@ describe("instrumentClickHouse", () => { it("handles queries without execution stats", async () => { const client = { query: vi.fn(() => Promise.resolve({ response_headers: {} })), + insert: vi.fn(), + exec: vi.fn(), + command: vi.fn(), } as unknown as MockClickHouseClient; const instrumented = instrumentClickHouse(client as any, { @@ -271,6 +314,9 @@ describe("instrumentClickHouse", () => { }, }) ), + insert: vi.fn(), + exec: vi.fn(), + command: vi.fn(), } as unknown as MockClickHouseClient; const instrumented = instrumentClickHouse(client as any, { @@ -285,4 +331,274 @@ describe("instrumentClickHouse", () => { const span = spans[0]; expect(span.attributes["clickhouse.read_rows"]).toBeUndefined(); }); + + describe("insert instrumentation", () => { + it("records a successful insert", async () => { + const client = { + query: vi.fn(), + insert: vi.fn(() => + Promise.resolve({ + executed: true, + query_id: "test-query-id", + response_headers: { + "x-clickhouse-summary": JSON.stringify({ + read_rows: "0", + read_bytes: "0", + written_rows: "100", + written_bytes: "4096", + total_rows_to_read: "0", + result_rows: "0", + result_bytes: "0", + elapsed_ns: "500000", + }), + }, + }) + ), + exec: vi.fn(), + command: vi.fn(), + } as unknown as MockClickHouseClient; + + const instrumented = instrumentClickHouse(client as any); + + await instrumented.insert({ + table: "users", + values: [{ id: 1, name: "Alice" }], + format: "JSONEachRow", + }); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(1); + + const span = spans[0]; + expect(span.name).toBe("clickhouse.insert"); + expect(span.status.code).toBe(SpanStatusCode.OK); + expect(span.attributes["db.system"]).toBe("clickhouse"); + expect(span.attributes["db.operation"]).toBe("INSERT"); + expect(span.attributes["db.statement"]).toBe( + "INSERT INTO users FORMAT JSONEachRow" + ); + expect(span.attributes["clickhouse.written_rows"]).toBe(100); + expect(span.attributes["clickhouse.written_bytes"]).toBe(4096); + }); + + it("records insert with columns", async () => { + const client = { + query: vi.fn(), + insert: vi.fn(() => + Promise.resolve({ + executed: true, + query_id: "test-query-id", + response_headers: {}, + }) + ), + exec: vi.fn(), + command: vi.fn(), + } as unknown as MockClickHouseClient; + + const instrumented = instrumentClickHouse(client as any); + + await instrumented.insert({ + table: "users", + values: [{ id: 1, name: "Alice" }], + format: "JSONEachRow", + columns: ["id", "name"], + }); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(1); + + const span = spans[0]; + expect(span.attributes["db.statement"]).toBe( + "INSERT INTO users (id, name) FORMAT JSONEachRow" + ); + }); + + it("records insert with except columns", async () => { + const client = { + query: vi.fn(), + insert: vi.fn(() => + Promise.resolve({ + executed: true, + query_id: "test-query-id", + response_headers: {}, + }) + ), + exec: vi.fn(), + command: vi.fn(), + } as unknown as MockClickHouseClient; + + const instrumented = instrumentClickHouse(client as any); + + await instrumented.insert({ + table: "users", + values: [{ id: 1, name: "Alice" }], + columns: { except: ["created_at"] }, + }); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(1); + + const span = spans[0]; + expect(span.attributes["db.statement"]).toBe( + "INSERT INTO users (* EXCEPT (created_at)) FORMAT JSONCompactEachRow" + ); + }); + + it("records a failed insert", async () => { + const error = new Error("Insert failed"); + const client = { + query: vi.fn(), + insert: vi.fn(() => Promise.reject(error)), + exec: vi.fn(), + command: vi.fn(), + } as unknown as MockClickHouseClient; + + const instrumented = instrumentClickHouse(client as any); + + await expect( + instrumented.insert({ + table: "users", + values: [{ id: 1, name: "Alice" }], + }) + ).rejects.toThrow("Insert failed"); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(1); + + const span = spans[0]; + expect(span.name).toBe("clickhouse.insert"); + expect(span.status.code).toBe(SpanStatusCode.ERROR); + expect(span.events).toHaveLength(1); + expect(span.events[0].name).toBe("exception"); + }); + }); + + describe("exec instrumentation", () => { + it("records a successful exec", async () => { + const client = { + query: vi.fn(), + insert: vi.fn(), + exec: vi.fn(() => + Promise.resolve({ + query_id: "test-query-id", + stream: {}, + }) + ), + command: vi.fn(), + } as unknown as MockClickHouseClient; + + const instrumented = instrumentClickHouse(client as any); + + await instrumented.exec({ + query: "INSERT INTO users FORMAT JSONEachRow", + }); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(1); + + const span = spans[0]; + expect(span.name).toBe("clickhouse.insert"); + expect(span.status.code).toBe(SpanStatusCode.OK); + expect(span.attributes["db.system"]).toBe("clickhouse"); + expect(span.attributes["db.operation"]).toBe("INSERT"); + expect(span.attributes["db.statement"]).toBe( + "INSERT INTO users FORMAT JSONEachRow" + ); + }); + + it("records a failed exec", async () => { + const error = new Error("Exec failed"); + const client = { + query: vi.fn(), + insert: vi.fn(), + exec: vi.fn(() => Promise.reject(error)), + command: vi.fn(), + } as unknown as MockClickHouseClient; + + const instrumented = instrumentClickHouse(client as any); + + await expect( + instrumented.exec({ query: "CREATE TABLE test (id UInt32)" }) + ).rejects.toThrow("Exec failed"); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(1); + + const span = spans[0]; + expect(span.status.code).toBe(SpanStatusCode.ERROR); + expect(span.events).toHaveLength(1); + expect(span.events[0].name).toBe("exception"); + }); + }); + + describe("command instrumentation", () => { + it("records a successful command", async () => { + const client = { + query: vi.fn(), + insert: vi.fn(), + exec: vi.fn(), + command: vi.fn(() => + Promise.resolve({ + query_id: "test-query-id", + response_headers: { + "x-clickhouse-summary": JSON.stringify({ + read_rows: "0", + read_bytes: "0", + written_rows: "0", + written_bytes: "0", + total_rows_to_read: "0", + result_rows: "0", + result_bytes: "0", + elapsed_ns: "100000", + }), + }, + }) + ), + } as unknown as MockClickHouseClient; + + const instrumented = instrumentClickHouse(client as any); + + await instrumented.command({ + query: "CREATE TABLE test (id UInt32) ENGINE = Memory", + }); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(1); + + const span = spans[0]; + expect(span.name).toBe("clickhouse.create"); + expect(span.status.code).toBe(SpanStatusCode.OK); + expect(span.attributes["db.system"]).toBe("clickhouse"); + expect(span.attributes["db.operation"]).toBe("CREATE"); + expect(span.attributes["db.statement"]).toBe( + "CREATE TABLE test (id UInt32) ENGINE = Memory" + ); + expect(span.attributes["clickhouse.elapsed_ns"]).toBe(100000); + }); + + it("records a failed command", async () => { + const error = new Error("Command failed"); + const client = { + query: vi.fn(), + insert: vi.fn(), + exec: vi.fn(), + command: vi.fn(() => Promise.reject(error)), + } as unknown as MockClickHouseClient; + + const instrumented = instrumentClickHouse(client as any); + + await expect( + instrumented.command({ query: "DROP TABLE test" }) + ).rejects.toThrow("Command failed"); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(1); + + const span = spans[0]; + expect(span.name).toBe("clickhouse.drop"); + expect(span.status.code).toBe(SpanStatusCode.ERROR); + expect(span.events).toHaveLength(1); + expect(span.events[0].name).toBe("exception"); + }); + }); }); diff --git a/packages/otel-clickhouse/src/index.ts b/packages/otel-clickhouse/src/index.ts index 0f6494c..7fc6c4f 100644 --- a/packages/otel-clickhouse/src/index.ts +++ b/packages/otel-clickhouse/src/index.ts @@ -9,6 +9,9 @@ import type { ClickHouseClient, DataFormat, QueryParams, + InsertParams, + ExecParams, + CommandParams, } from "@clickhouse/client"; const DEFAULT_TRACER_NAME = "@kubiks/otel-clickhouse"; @@ -258,8 +261,9 @@ function addExecutionStats(span: Span, summary: ClickHouseSummary): void { /** * Instruments a ClickHouse client with OpenTelemetry tracing. * - * This function wraps the client's `query` method to create spans for each database - * operation, including detailed execution statistics from ClickHouse response headers. + * This function wraps the client's `query`, `insert`, `exec`, and `command` methods to create + * spans for each database operation, including detailed execution statistics from ClickHouse + * response headers. * * The instrumentation is idempotent - calling it multiple times on the same client will only * instrument it once. @@ -403,6 +407,188 @@ export function instrumentClickHouse( } }; + // Store the original insert method + const originalInsert = client.insert.bind(client); + + // Create instrumented insert method + client.insert = async function instrumentedInsert( + params: InsertParams + ): Promise { + const spanName = "clickhouse.insert"; + + // Start span + const span = tracer.startSpan(spanName, { kind: SpanKind.CLIENT }); + span.setAttribute(SEMATTRS_DB_SYSTEM, "clickhouse"); + span.setAttribute(SEMATTRS_DB_OPERATION, "INSERT"); + + if (dbName) { + span.setAttribute(SEMATTRS_DB_NAME, dbName); + } + + // Capture the table name and format + if (captureQueryText) { + const table = params.table; + const format = params.format || "JSONCompactEachRow"; + let statement = `INSERT INTO ${table}`; + + if (params.columns) { + if (Array.isArray(params.columns)) { + statement += ` (${params.columns.join(", ")})`; + } else if ("except" in params.columns) { + statement += ` (* EXCEPT (${params.columns.except.join(", ")}))`; + } + } + + statement += ` FORMAT ${format}`; + const sanitized = sanitizeQueryText(statement, maxQueryTextLength); + span.setAttribute(SEMATTRS_DB_STATEMENT, sanitized); + } + + if (peerName) { + span.setAttribute(SEMATTRS_NET_PEER_NAME, peerName); + } + + if (peerPort) { + span.setAttribute(SEMATTRS_NET_PEER_PORT, peerPort); + } + + const activeContext = trace.setSpan(context.active(), span); + + try { + const result = await context.with(activeContext, () => + originalInsert(params) + ); + + // Extract and add execution statistics from response headers + if (captureExecutionStats && result.executed) { + const summary = extractSummary(result.response_headers); + if (summary) { + addExecutionStats(span, summary); + } + } + + finalizeSpan(span); + return result; + } catch (error) { + finalizeSpan(span, error); + throw error; + } + }; + + // Store the original exec method + const originalExec = client.exec.bind(client); + + // Create instrumented exec method + client.exec = async function instrumentedExec( + params: ExecParams + ): Promise { + const queryText = params.query; + const operation = queryText ? extractOperation(queryText) : undefined; + const spanName = operation + ? `clickhouse.${operation.toLowerCase()}` + : "clickhouse.exec"; + + // Start span + const span = tracer.startSpan(spanName, { kind: SpanKind.CLIENT }); + span.setAttribute(SEMATTRS_DB_SYSTEM, "clickhouse"); + + if (operation) { + span.setAttribute(SEMATTRS_DB_OPERATION, operation); + } + + if (dbName) { + span.setAttribute(SEMATTRS_DB_NAME, dbName); + } + + if (captureQueryText && queryText !== undefined) { + const sanitized = sanitizeQueryText(queryText, maxQueryTextLength); + span.setAttribute(SEMATTRS_DB_STATEMENT, sanitized); + } + + if (peerName) { + span.setAttribute(SEMATTRS_NET_PEER_NAME, peerName); + } + + if (peerPort) { + span.setAttribute(SEMATTRS_NET_PEER_PORT, peerPort); + } + + const activeContext = trace.setSpan(context.active(), span); + + try { + const result = await context.with(activeContext, () => + originalExec(params) + ); + + finalizeSpan(span); + return result; + } catch (error) { + finalizeSpan(span, error); + throw error; + } + }; + + // Store the original command method + const originalCommand = client.command.bind(client); + + // Create instrumented command method + client.command = async function instrumentedCommand( + params: CommandParams + ): Promise { + const queryText = params.query; + const operation = queryText ? extractOperation(queryText) : undefined; + const spanName = operation + ? `clickhouse.${operation.toLowerCase()}` + : "clickhouse.command"; + + // Start span + const span = tracer.startSpan(spanName, { kind: SpanKind.CLIENT }); + span.setAttribute(SEMATTRS_DB_SYSTEM, "clickhouse"); + + if (operation) { + span.setAttribute(SEMATTRS_DB_OPERATION, operation); + } + + if (dbName) { + span.setAttribute(SEMATTRS_DB_NAME, dbName); + } + + if (captureQueryText && queryText !== undefined) { + const sanitized = sanitizeQueryText(queryText, maxQueryTextLength); + span.setAttribute(SEMATTRS_DB_STATEMENT, sanitized); + } + + if (peerName) { + span.setAttribute(SEMATTRS_NET_PEER_NAME, peerName); + } + + if (peerPort) { + span.setAttribute(SEMATTRS_NET_PEER_PORT, peerPort); + } + + const activeContext = trace.setSpan(context.active(), span); + + try { + const result = await context.with(activeContext, () => + originalCommand(params) + ); + + // Extract and add execution statistics from response headers + if (captureExecutionStats) { + const summary = extractSummary(result.response_headers); + if (summary) { + addExecutionStats(span, summary); + } + } + + finalizeSpan(span); + return result; + } catch (error) { + finalizeSpan(span, error); + throw error; + } + }; + // Mark as instrumented instrumentedClient[INSTRUMENTED_FLAG] = true;