instrument all commands

This commit is contained in:
Alex Holovach
2025-11-04 01:41:44 -06:00
parent 4948896fb6
commit d1dc4099a5
3 changed files with 507 additions and 5 deletions

View File

@@ -1,5 +1,5 @@
---
"@kubiks/otel-clickhouse": patch
"@kubiks/otel-clickhouse": minor
---
Update @clickhouse/client peer dependency to require >=0.2.7 to ensure X-ClickHouse-Summary parsing support
Add instrumentation for insert(), exec(), and command() methods. Previously only query() was instrumented, which meant insert operations were not traced despite the README claiming they were. Also update @clickhouse/client peer dependency to require >=0.2.7 to ensure X-ClickHouse-Summary parsing support.

View File

@@ -9,6 +9,9 @@ import { instrumentClickHouse, type InstrumentClickHouseConfig } from "./index";
interface MockClickHouseClient {
query: (params: any) => Promise<any>;
insert: (params: any) => Promise<any>;
exec: (params: any) => Promise<any>;
command: (params: any) => Promise<any>;
}
describe("instrumentClickHouse", () => {
@@ -29,19 +32,32 @@ describe("instrumentClickHouse", () => {
trace.disable();
});
it("wraps the query method only once", () => {
it("wraps all methods only once", () => {
const client = {
query: vi.fn(),
insert: vi.fn(),
exec: vi.fn(),
command: vi.fn(),
} as unknown as MockClickHouseClient;
const instrumented = instrumentClickHouse(client as any);
expect(instrumented.query).not.toBeUndefined();
expect(instrumented.insert).not.toBeUndefined();
expect(instrumented.exec).not.toBeUndefined();
expect(instrumented.command).not.toBeUndefined();
const wrappedQuery = instrumented.query;
const wrappedInsert = instrumented.insert;
const wrappedExec = instrumented.exec;
const wrappedCommand = instrumented.command;
instrumentClickHouse(client as any);
expect(instrumented.query).toBe(wrappedQuery);
expect(instrumented.insert).toBe(wrappedInsert);
expect(instrumented.exec).toBe(wrappedExec);
expect(instrumented.command).toBe(wrappedCommand);
});
it("records a successful query", async () => {
@@ -62,6 +78,9 @@ describe("instrumentClickHouse", () => {
},
})
),
insert: vi.fn(),
exec: vi.fn(),
command: vi.fn(),
} as unknown as MockClickHouseClient;
const instrumented = instrumentClickHouse(client as any);
@@ -97,6 +116,9 @@ describe("instrumentClickHouse", () => {
},
})
),
insert: vi.fn(),
exec: vi.fn(),
command: vi.fn(),
} as unknown as MockClickHouseClient;
const instrumented = instrumentClickHouse(client as any, {
@@ -120,6 +142,9 @@ describe("instrumentClickHouse", () => {
const error = new Error("Query failed");
const client = {
query: vi.fn(() => Promise.reject(error)),
insert: vi.fn(),
exec: vi.fn(),
command: vi.fn(),
} as unknown as MockClickHouseClient;
const instrumented = instrumentClickHouse(client as any);
@@ -141,6 +166,9 @@ describe("instrumentClickHouse", () => {
it("respects captureQueryText option", async () => {
const client = {
query: vi.fn(() => Promise.resolve({ response_headers: {} })),
insert: vi.fn(),
exec: vi.fn(),
command: vi.fn(),
} as unknown as MockClickHouseClient;
const instrumented = instrumentClickHouse(client as any, {
@@ -160,6 +188,9 @@ describe("instrumentClickHouse", () => {
const longQuery = "SELECT * FROM users WHERE " + "a = 1 AND ".repeat(200);
const client = {
query: vi.fn(() => Promise.resolve({ response_headers: {} })),
insert: vi.fn(),
exec: vi.fn(),
command: vi.fn(),
} as unknown as MockClickHouseClient;
const instrumented = instrumentClickHouse(client as any, {
@@ -180,6 +211,9 @@ describe("instrumentClickHouse", () => {
it("includes database name when configured", async () => {
const client = {
query: vi.fn(() => Promise.resolve({ response_headers: {} })),
insert: vi.fn(),
exec: vi.fn(),
command: vi.fn(),
} as unknown as MockClickHouseClient;
const instrumented = instrumentClickHouse(client as any, {
@@ -198,6 +232,9 @@ describe("instrumentClickHouse", () => {
it("includes network metadata when configured", async () => {
const client = {
query: vi.fn(() => Promise.resolve({ response_headers: {} })),
insert: vi.fn(),
exec: vi.fn(),
command: vi.fn(),
} as unknown as MockClickHouseClient;
const instrumented = instrumentClickHouse(client as any, {
@@ -218,6 +255,9 @@ describe("instrumentClickHouse", () => {
it("detects different operation types", async () => {
const client = {
query: vi.fn(() => Promise.resolve({ response_headers: {} })),
insert: vi.fn(),
exec: vi.fn(),
command: vi.fn(),
} as unknown as MockClickHouseClient;
const instrumented = instrumentClickHouse(client as any);
@@ -242,6 +282,9 @@ describe("instrumentClickHouse", () => {
it("handles queries without execution stats", async () => {
const client = {
query: vi.fn(() => Promise.resolve({ response_headers: {} })),
insert: vi.fn(),
exec: vi.fn(),
command: vi.fn(),
} as unknown as MockClickHouseClient;
const instrumented = instrumentClickHouse(client as any, {
@@ -271,6 +314,9 @@ describe("instrumentClickHouse", () => {
},
})
),
insert: vi.fn(),
exec: vi.fn(),
command: vi.fn(),
} as unknown as MockClickHouseClient;
const instrumented = instrumentClickHouse(client as any, {
@@ -285,4 +331,274 @@ describe("instrumentClickHouse", () => {
const span = spans[0];
expect(span.attributes["clickhouse.read_rows"]).toBeUndefined();
});
describe("insert instrumentation", () => {
it("records a successful insert", async () => {
const client = {
query: vi.fn(),
insert: vi.fn(() =>
Promise.resolve({
executed: true,
query_id: "test-query-id",
response_headers: {
"x-clickhouse-summary": JSON.stringify({
read_rows: "0",
read_bytes: "0",
written_rows: "100",
written_bytes: "4096",
total_rows_to_read: "0",
result_rows: "0",
result_bytes: "0",
elapsed_ns: "500000",
}),
},
})
),
exec: vi.fn(),
command: vi.fn(),
} as unknown as MockClickHouseClient;
const instrumented = instrumentClickHouse(client as any);
await instrumented.insert({
table: "users",
values: [{ id: 1, name: "Alice" }],
format: "JSONEachRow",
});
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
expect(span.name).toBe("clickhouse.insert");
expect(span.status.code).toBe(SpanStatusCode.OK);
expect(span.attributes["db.system"]).toBe("clickhouse");
expect(span.attributes["db.operation"]).toBe("INSERT");
expect(span.attributes["db.statement"]).toBe(
"INSERT INTO users FORMAT JSONEachRow"
);
expect(span.attributes["clickhouse.written_rows"]).toBe(100);
expect(span.attributes["clickhouse.written_bytes"]).toBe(4096);
});
it("records insert with columns", async () => {
const client = {
query: vi.fn(),
insert: vi.fn(() =>
Promise.resolve({
executed: true,
query_id: "test-query-id",
response_headers: {},
})
),
exec: vi.fn(),
command: vi.fn(),
} as unknown as MockClickHouseClient;
const instrumented = instrumentClickHouse(client as any);
await instrumented.insert({
table: "users",
values: [{ id: 1, name: "Alice" }],
format: "JSONEachRow",
columns: ["id", "name"],
});
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
expect(span.attributes["db.statement"]).toBe(
"INSERT INTO users (id, name) FORMAT JSONEachRow"
);
});
it("records insert with except columns", async () => {
const client = {
query: vi.fn(),
insert: vi.fn(() =>
Promise.resolve({
executed: true,
query_id: "test-query-id",
response_headers: {},
})
),
exec: vi.fn(),
command: vi.fn(),
} as unknown as MockClickHouseClient;
const instrumented = instrumentClickHouse(client as any);
await instrumented.insert({
table: "users",
values: [{ id: 1, name: "Alice" }],
columns: { except: ["created_at"] },
});
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
expect(span.attributes["db.statement"]).toBe(
"INSERT INTO users (* EXCEPT (created_at)) FORMAT JSONCompactEachRow"
);
});
it("records a failed insert", async () => {
const error = new Error("Insert failed");
const client = {
query: vi.fn(),
insert: vi.fn(() => Promise.reject(error)),
exec: vi.fn(),
command: vi.fn(),
} as unknown as MockClickHouseClient;
const instrumented = instrumentClickHouse(client as any);
await expect(
instrumented.insert({
table: "users",
values: [{ id: 1, name: "Alice" }],
})
).rejects.toThrow("Insert failed");
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
expect(span.name).toBe("clickhouse.insert");
expect(span.status.code).toBe(SpanStatusCode.ERROR);
expect(span.events).toHaveLength(1);
expect(span.events[0].name).toBe("exception");
});
});
describe("exec instrumentation", () => {
it("records a successful exec", async () => {
const client = {
query: vi.fn(),
insert: vi.fn(),
exec: vi.fn(() =>
Promise.resolve({
query_id: "test-query-id",
stream: {},
})
),
command: vi.fn(),
} as unknown as MockClickHouseClient;
const instrumented = instrumentClickHouse(client as any);
await instrumented.exec({
query: "INSERT INTO users FORMAT JSONEachRow",
});
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
expect(span.name).toBe("clickhouse.insert");
expect(span.status.code).toBe(SpanStatusCode.OK);
expect(span.attributes["db.system"]).toBe("clickhouse");
expect(span.attributes["db.operation"]).toBe("INSERT");
expect(span.attributes["db.statement"]).toBe(
"INSERT INTO users FORMAT JSONEachRow"
);
});
it("records a failed exec", async () => {
const error = new Error("Exec failed");
const client = {
query: vi.fn(),
insert: vi.fn(),
exec: vi.fn(() => Promise.reject(error)),
command: vi.fn(),
} as unknown as MockClickHouseClient;
const instrumented = instrumentClickHouse(client as any);
await expect(
instrumented.exec({ query: "CREATE TABLE test (id UInt32)" })
).rejects.toThrow("Exec failed");
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
expect(span.status.code).toBe(SpanStatusCode.ERROR);
expect(span.events).toHaveLength(1);
expect(span.events[0].name).toBe("exception");
});
});
describe("command instrumentation", () => {
it("records a successful command", async () => {
const client = {
query: vi.fn(),
insert: vi.fn(),
exec: vi.fn(),
command: vi.fn(() =>
Promise.resolve({
query_id: "test-query-id",
response_headers: {
"x-clickhouse-summary": JSON.stringify({
read_rows: "0",
read_bytes: "0",
written_rows: "0",
written_bytes: "0",
total_rows_to_read: "0",
result_rows: "0",
result_bytes: "0",
elapsed_ns: "100000",
}),
},
})
),
} as unknown as MockClickHouseClient;
const instrumented = instrumentClickHouse(client as any);
await instrumented.command({
query: "CREATE TABLE test (id UInt32) ENGINE = Memory",
});
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
expect(span.name).toBe("clickhouse.create");
expect(span.status.code).toBe(SpanStatusCode.OK);
expect(span.attributes["db.system"]).toBe("clickhouse");
expect(span.attributes["db.operation"]).toBe("CREATE");
expect(span.attributes["db.statement"]).toBe(
"CREATE TABLE test (id UInt32) ENGINE = Memory"
);
expect(span.attributes["clickhouse.elapsed_ns"]).toBe(100000);
});
it("records a failed command", async () => {
const error = new Error("Command failed");
const client = {
query: vi.fn(),
insert: vi.fn(),
exec: vi.fn(),
command: vi.fn(() => Promise.reject(error)),
} as unknown as MockClickHouseClient;
const instrumented = instrumentClickHouse(client as any);
await expect(
instrumented.command({ query: "DROP TABLE test" })
).rejects.toThrow("Command failed");
const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(1);
const span = spans[0];
expect(span.name).toBe("clickhouse.drop");
expect(span.status.code).toBe(SpanStatusCode.ERROR);
expect(span.events).toHaveLength(1);
expect(span.events[0].name).toBe("exception");
});
});
});

View File

@@ -9,6 +9,9 @@ import type {
ClickHouseClient,
DataFormat,
QueryParams,
InsertParams,
ExecParams,
CommandParams,
} from "@clickhouse/client";
const DEFAULT_TRACER_NAME = "@kubiks/otel-clickhouse";
@@ -258,8 +261,9 @@ function addExecutionStats(span: Span, summary: ClickHouseSummary): void {
/**
* Instruments a ClickHouse client with OpenTelemetry tracing.
*
* This function wraps the client's `query` method to create spans for each database
* operation, including detailed execution statistics from ClickHouse response headers.
* This function wraps the client's `query`, `insert`, `exec`, and `command` methods to create
* spans for each database operation, including detailed execution statistics from ClickHouse
* response headers.
*
* The instrumentation is idempotent - calling it multiple times on the same client will only
* instrument it once.
@@ -403,6 +407,188 @@ export function instrumentClickHouse(
}
};
// Store the original insert method
const originalInsert = client.insert.bind(client);
// Create instrumented insert method
client.insert = async function instrumentedInsert<T = unknown>(
params: InsertParams<any, T>
): Promise<any> {
const spanName = "clickhouse.insert";
// Start span
const span = tracer.startSpan(spanName, { kind: SpanKind.CLIENT });
span.setAttribute(SEMATTRS_DB_SYSTEM, "clickhouse");
span.setAttribute(SEMATTRS_DB_OPERATION, "INSERT");
if (dbName) {
span.setAttribute(SEMATTRS_DB_NAME, dbName);
}
// Capture the table name and format
if (captureQueryText) {
const table = params.table;
const format = params.format || "JSONCompactEachRow";
let statement = `INSERT INTO ${table}`;
if (params.columns) {
if (Array.isArray(params.columns)) {
statement += ` (${params.columns.join(", ")})`;
} else if ("except" in params.columns) {
statement += ` (* EXCEPT (${params.columns.except.join(", ")}))`;
}
}
statement += ` FORMAT ${format}`;
const sanitized = sanitizeQueryText(statement, maxQueryTextLength);
span.setAttribute(SEMATTRS_DB_STATEMENT, sanitized);
}
if (peerName) {
span.setAttribute(SEMATTRS_NET_PEER_NAME, peerName);
}
if (peerPort) {
span.setAttribute(SEMATTRS_NET_PEER_PORT, peerPort);
}
const activeContext = trace.setSpan(context.active(), span);
try {
const result = await context.with(activeContext, () =>
originalInsert(params)
);
// Extract and add execution statistics from response headers
if (captureExecutionStats && result.executed) {
const summary = extractSummary(result.response_headers);
if (summary) {
addExecutionStats(span, summary);
}
}
finalizeSpan(span);
return result;
} catch (error) {
finalizeSpan(span, error);
throw error;
}
};
// Store the original exec method
const originalExec = client.exec.bind(client);
// Create instrumented exec method
client.exec = async function instrumentedExec(
params: ExecParams
): Promise<any> {
const queryText = params.query;
const operation = queryText ? extractOperation(queryText) : undefined;
const spanName = operation
? `clickhouse.${operation.toLowerCase()}`
: "clickhouse.exec";
// Start span
const span = tracer.startSpan(spanName, { kind: SpanKind.CLIENT });
span.setAttribute(SEMATTRS_DB_SYSTEM, "clickhouse");
if (operation) {
span.setAttribute(SEMATTRS_DB_OPERATION, operation);
}
if (dbName) {
span.setAttribute(SEMATTRS_DB_NAME, dbName);
}
if (captureQueryText && queryText !== undefined) {
const sanitized = sanitizeQueryText(queryText, maxQueryTextLength);
span.setAttribute(SEMATTRS_DB_STATEMENT, sanitized);
}
if (peerName) {
span.setAttribute(SEMATTRS_NET_PEER_NAME, peerName);
}
if (peerPort) {
span.setAttribute(SEMATTRS_NET_PEER_PORT, peerPort);
}
const activeContext = trace.setSpan(context.active(), span);
try {
const result = await context.with(activeContext, () =>
originalExec(params)
);
finalizeSpan(span);
return result;
} catch (error) {
finalizeSpan(span, error);
throw error;
}
};
// Store the original command method
const originalCommand = client.command.bind(client);
// Create instrumented command method
client.command = async function instrumentedCommand(
params: CommandParams
): Promise<any> {
const queryText = params.query;
const operation = queryText ? extractOperation(queryText) : undefined;
const spanName = operation
? `clickhouse.${operation.toLowerCase()}`
: "clickhouse.command";
// Start span
const span = tracer.startSpan(spanName, { kind: SpanKind.CLIENT });
span.setAttribute(SEMATTRS_DB_SYSTEM, "clickhouse");
if (operation) {
span.setAttribute(SEMATTRS_DB_OPERATION, operation);
}
if (dbName) {
span.setAttribute(SEMATTRS_DB_NAME, dbName);
}
if (captureQueryText && queryText !== undefined) {
const sanitized = sanitizeQueryText(queryText, maxQueryTextLength);
span.setAttribute(SEMATTRS_DB_STATEMENT, sanitized);
}
if (peerName) {
span.setAttribute(SEMATTRS_NET_PEER_NAME, peerName);
}
if (peerPort) {
span.setAttribute(SEMATTRS_NET_PEER_PORT, peerPort);
}
const activeContext = trace.setSpan(context.active(), span);
try {
const result = await context.with(activeContext, () =>
originalCommand(params)
);
// Extract and add execution statistics from response headers
if (captureExecutionStats) {
const summary = extractSummary(result.response_headers);
if (summary) {
addExecutionStats(span, summary);
}
}
finalizeSpan(span);
return result;
} catch (error) {
finalizeSpan(span, error);
throw error;
}
};
// Mark as instrumented
instrumentedClient[INSTRUMENTED_FLAG] = true;