Files
illusory-mapp/packages/logic/core/observability.ts

81 lines
2.1 KiB
TypeScript

import { SpanStatusCode, trace, type Attributes } from "@opentelemetry/api";
import type { FlowExecCtx } from "./flow.execution.context";
import { ResultAsync } from "neverthrow";
// Module-wide tracer, obtained from the OpenTelemetry trace API under the
// instrumentation name "@pkg/logic"; withFlowSpan starts all its spans on it.
const tracer = trace.getTracer("@pkg/logic");
/** Options shared by every span helper in this module. */
type BaseSpanOptions = {
/** Name given to the span when it is started. */
name: string;
/**
 * Optional flow execution context. Its `flowId`, `userId` and `sessionId`
 * (when truthy) are copied onto the span as `flow.*` attributes.
 */
fctx?: FlowExecCtx;
/**
 * Extra span attributes. They are merged over the flow-derived attributes,
 * so a key supplied here wins on conflict.
 */
attributes?: Attributes;
};
/**
 * Assembles the attribute set for a new span.
 *
 * Truthy `flowId`, `userId` and `sessionId` values from the flow context are
 * exposed as `flow.id`, `flow.user_id` and `flow.session_id`. Caller-supplied
 * attributes are layered on top and override flow-derived keys on conflict.
 *
 * @param fctx - Optional flow execution context to pull identifiers from.
 * @param attributes - Optional caller-supplied attributes.
 * @returns The merged attributes, or `undefined` when no attributes object was
 *   passed and the flow context contributed nothing.
 */
function spanAttributes(
    fctx?: FlowExecCtx,
    attributes?: Attributes,
): Attributes | undefined {
    const flowId = fctx?.flowId;
    const userId = fctx?.userId;
    const sessionId = fctx?.sessionId;

    const fromFlow: Attributes = {};
    if (flowId) {
        fromFlow["flow.id"] = flowId;
    }
    if (userId) {
        fromFlow["flow.user_id"] = userId;
    }
    if (sessionId) {
        fromFlow["flow.session_id"] = sessionId;
    }

    const hasFlowAttributes = Object.keys(fromFlow).length > 0;
    if (attributes || hasFlowAttributes) {
        // Spreading `undefined` contributes nothing, so no fallback is needed.
        return { ...fromFlow, ...attributes };
    }
    return undefined;
}
/**
 * Derives a human-readable message from an arbitrary thrown or rejected value.
 *
 * Order of preference: `Error#message`, the value itself when it is a string,
 * a string `message` property on a plain error object, then `String(value)`.
 * The final conversion is guarded because `String()` can itself throw (for
 * example on an object without a prototype), which would otherwise mask the
 * original failure.
 */
function describeThrown(error: unknown): string {
    if (error instanceof Error) {
        return error.message;
    }
    if (typeof error === "string") {
        return error;
    }
    if (typeof error === "object" && error !== null && "message" in error) {
        const { message } = error as { message: unknown };
        if (typeof message === "string") {
            return message;
        }
    }
    try {
        return String(error);
    } catch {
        return "Unknown error";
    }
}

/**
 * Runs `fn` inside a new active span and reports its outcome on that span.
 *
 * On success the span status is set to OK and the resolved value is returned.
 * On failure the thrown value is recorded as an exception, the span status is
 * set to ERROR with a message derived from the thrown value, and the original
 * value is rethrown unchanged. The span is always ended.
 *
 * @param name - Span name.
 * @param fctx - Optional flow context used to add `flow.*` span attributes.
 * @param attributes - Optional extra span attributes.
 * @param fn - Async work to execute inside the span.
 * @returns Whatever `fn` resolves to.
 * @throws Whatever `fn` throws or rejects with.
 */
export async function withFlowSpan<T>({
    name,
    fctx,
    attributes,
    fn,
}: BaseSpanOptions & {
    fn: () => Promise<T>;
}): Promise<T> {
    return tracer.startActiveSpan(
        name,
        { attributes: spanAttributes(fctx, attributes) },
        async (span) => {
            try {
                const result = await fn();
                span.setStatus({ code: SpanStatusCode.OK });
                return result;
            } catch (error) {
                const message = describeThrown(error);
                // Objects (Error instances and domain error objects) are
                // passed through so any code/name/message/stack fields are
                // kept. Other values carry no such fields, so they are
                // recorded by their string form instead of a false cast.
                span.recordException(
                    typeof error === "object" && error !== null
                        ? (error as Error)
                        : message,
                );
                span.setStatus({ code: SpanStatusCode.ERROR, message });
                throw error;
            } finally {
                span.end();
            }
        },
    );
}
/**
 * Runs a `ResultAsync`-producing function inside a flow span.
 *
 * `withFlowSpan` reports failures through promise rejection, so an `Err`
 * result is turned into a rejection while inside the span and then mapped
 * back into an `Err` on the returned `ResultAsync`.
 *
 * Note that anything `fn` throws is also surfaced as an `Err`, cast to `E`
 * without validation.
 *
 * @param name - Span name.
 * @param fctx - Optional flow context used to add `flow.*` span attributes.
 * @param attributes - Optional extra span attributes.
 * @param fn - Factory for the `ResultAsync` to trace.
 * @returns A `ResultAsync` carrying the same value or error as `fn()`.
 */
export function traceResultAsync<T, E>({
    name,
    fctx,
    attributes,
    fn,
}: BaseSpanOptions & {
    fn: () => ResultAsync<T, E>;
}): ResultAsync<T, E> {
    // Unwraps the Result: resolves with the Ok value, rejects with the Err one.
    const unwrapOrReject = async (): Promise<T> => {
        const outcome = await fn();
        return outcome.match(
            (value) => value,
            (failure) => {
                throw failure;
            },
        );
    };

    const tracedPromise = withFlowSpan({
        name,
        fctx,
        attributes,
        fn: unwrapOrReject,
    });

    return ResultAsync.fromPromise(tracedPromise, (rejection) => rejection as E);
}