added spanning methods for insights in logic + logger is fully otel-ified as well 🥳

This commit is contained in:
user
2026-03-01 03:11:21 +02:00
parent 9914218b81
commit 8004459d20
6 changed files with 564 additions and 360 deletions

View File

@@ -0,0 +1,80 @@
import { SpanStatusCode, trace, type Attributes } from "@opentelemetry/api";
import type { FlowExecCtx } from "./flow.execution.context";
import { ResultAsync } from "neverthrow";
const tracer = trace.getTracer("@pkg/logic");
type BaseSpanOptions = {
name: string;
fctx?: FlowExecCtx;
attributes?: Attributes;
};
function spanAttributes(
fctx?: FlowExecCtx,
attributes?: Attributes,
): Attributes | undefined {
const flowAttrs: Attributes = {};
if (fctx?.flowId) flowAttrs["flow.id"] = fctx.flowId;
if (fctx?.userId) flowAttrs["flow.user_id"] = fctx.userId;
if (fctx?.sessionId) flowAttrs["flow.session_id"] = fctx.sessionId;
if (!attributes && Object.keys(flowAttrs).length === 0) {
return undefined;
}
return { ...flowAttrs, ...(attributes ?? {}) };
}
export async function withFlowSpan<T>({
name,
fctx,
attributes,
fn,
}: BaseSpanOptions & {
fn: () => Promise<T>;
}): Promise<T> {
return tracer.startActiveSpan(
name,
{ attributes: spanAttributes(fctx, attributes) },
async (span) => {
try {
const result = await fn();
span.setStatus({ code: SpanStatusCode.OK });
return result;
} catch (error) {
span.recordException(error as Error);
span.setStatus({
code: SpanStatusCode.ERROR,
message:
error instanceof Error ? error.message : String(error),
});
throw error;
} finally {
span.end();
}
},
);
}
export function traceResultAsync<T, E>({
name,
fctx,
attributes,
fn,
}: BaseSpanOptions & {
fn: () => ResultAsync<T, E>;
}): ResultAsync<T, E> {
return ResultAsync.fromPromise(
withFlowSpan({
name,
fctx,
attributes,
fn: async () =>
fn().match(
(value) => value,
(error) => Promise.reject(error),
),
}),
(error) => error as E,
);
}

View File

@@ -1,4 +1,5 @@
import { FlowExecCtx } from "@/core/flow.execution.context";
import { traceResultAsync } from "@core/observability";
import { ERROR_CODES, type Err } from "@pkg/result";
import { getError, logger } from "@pkg/logger";
import { auth } from "../auth/config.base";
@@ -34,72 +35,22 @@ export class AccountRepository {
fctx: FlowExecCtx,
userId: string,
): ResultAsync<boolean, Err> {
logger.info("Checking if account exists for user", {
...fctx,
userId,
});
return ResultAsync.fromPromise(
this.db.query.account.findFirst({
where: eq(account.userId, userId),
}),
(error) => {
logger.error("Failed to check account existence", {
...fctx,
error,
});
return this.dbError(
fctx,
error instanceof Error ? error.message : String(error),
);
},
).andThen((existingAccount) => {
if (existingAccount) {
logger.info("Account already exists for user", {
return traceResultAsync({
name: "logic.user.repository.ensureAccountExists",
fctx,
attributes: { "app.user.id": userId },
fn: () => {
logger.info("Checking if account exists for user", {
...fctx,
userId,
});
return ResultAsync.fromSafePromise(Promise.resolve(true));
}
logger.info(
"Account does not exist, creating new account for user",
{
...fctx,
userId,
},
);
return ResultAsync.fromPromise(
auth.$context.then((ctx) => ctx.password.hash(nanoid())),
(error) => {
logger.error("Failed to hash password", {
...fctx,
error,
});
return this.dbError(
fctx,
error instanceof Error ? error.message : String(error),
);
},
).andThen((password) => {
const aid = nanoid();
return ResultAsync.fromPromise(
this.db
.insert(account)
.values({
id: aid,
accountId: userId,
providerId: "credential",
userId: userId,
password,
createdAt: new Date(),
updatedAt: new Date(),
})
.execute(),
this.db.query.account.findFirst({
where: eq(account.userId, userId),
}),
(error) => {
logger.error("Failed to create account", {
logger.error("Failed to check account existence", {
...fctx,
error,
});
@@ -110,14 +61,82 @@ export class AccountRepository {
: String(error),
);
},
).map(() => {
logger.info("Account created successfully for user", {
...fctx,
userId,
).andThen((existingAccount) => {
if (existingAccount) {
logger.info("Account already exists for user", {
...fctx,
userId,
});
return ResultAsync.fromSafePromise(
Promise.resolve(true),
);
}
logger.info(
"Account does not exist, creating new account for user",
{
...fctx,
userId,
},
);
return ResultAsync.fromPromise(
auth.$context.then((ctx) =>
ctx.password.hash(nanoid()),
),
(error) => {
logger.error("Failed to hash password", {
...fctx,
error,
});
return this.dbError(
fctx,
error instanceof Error
? error.message
: String(error),
);
},
).andThen((password) => {
const aid = nanoid();
return ResultAsync.fromPromise(
this.db
.insert(account)
.values({
id: aid,
accountId: userId,
providerId: "credential",
userId: userId,
password,
createdAt: new Date(),
updatedAt: new Date(),
})
.execute(),
(error) => {
logger.error("Failed to create account", {
...fctx,
error,
});
return this.dbError(
fctx,
error instanceof Error
? error.message
: String(error),
);
},
).map(() => {
logger.info(
"Account created successfully for user",
{
...fctx,
userId,
},
);
return false;
});
});
return false;
});
});
},
});
}
@@ -126,70 +145,25 @@ export class AccountRepository {
userId: string,
password: string,
): ResultAsync<string, Err> {
logger.info("Starting password rotation for user", {
...fctx,
userId,
});
return ResultAsync.fromPromise(
this.db.query.account.findFirst({
where: eq(account.userId, userId),
}),
(error) => {
logger.error(
"Failed to check account existence for password rotation",
{
...fctx,
error,
},
);
return this.dbError(
fctx,
error instanceof Error ? error.message : String(error),
);
},
).andThen((existingAccount) => {
if (!existingAccount) {
logger.error("Account not found for user", {
return traceResultAsync({
name: "logic.user.repository.rotatePassword",
fctx,
attributes: { "app.user.id": userId },
fn: () => {
logger.info("Starting password rotation for user", {
...fctx,
userId,
});
return ResultAsync.fromSafePromise(
Promise.resolve(this.accountNotFound(fctx)),
).andThen((err) =>
ResultAsync.fromSafePromise(Promise.reject(err)),
);
}
return ResultAsync.fromPromise(
auth.$context.then((ctx) => ctx.password.hash(password)),
(error) => {
logger.error("Failed to hash password for rotation", {
...fctx,
error,
});
return this.dbError(
fctx,
error instanceof Error ? error.message : String(error),
);
},
).andThen((hashed) => {
logger.info("Updating user's password in database", {
...fctx,
});
return ResultAsync.fromPromise(
this.db
.update(account)
.set({ password: hashed })
.where(eq(account.userId, userId))
.returning()
.execute(),
this.db.query.account.findFirst({
where: eq(account.userId, userId),
}),
(error) => {
logger.error("Failed to update password", {
...fctx,
error,
});
logger.error(
"Failed to check account existence for password rotation",
{ ...fctx, error },
);
return this.dbError(
fctx,
error instanceof Error
@@ -197,17 +171,76 @@ export class AccountRepository {
: String(error),
);
},
).map((result) => {
logger.info("User's password updated successfully", {
...fctx,
).andThen((existingAccount) => {
if (!existingAccount) {
logger.error("Account not found for user", {
...fctx,
userId,
});
return ResultAsync.fromSafePromise(
Promise.resolve(this.accountNotFound(fctx)),
).andThen((err) =>
ResultAsync.fromSafePromise(Promise.reject(err)),
);
}
return ResultAsync.fromPromise(
auth.$context.then((ctx) =>
ctx.password.hash(password),
),
(error) => {
logger.error(
"Failed to hash password for rotation",
{
...fctx,
error,
},
);
return this.dbError(
fctx,
error instanceof Error
? error.message
: String(error),
);
},
).andThen((hashed) => {
logger.info("Updating user's password in database", {
...fctx,
});
return ResultAsync.fromPromise(
this.db
.update(account)
.set({ password: hashed })
.where(eq(account.userId, userId))
.returning()
.execute(),
(error) => {
logger.error("Failed to update password", {
...fctx,
error,
});
return this.dbError(
fctx,
error instanceof Error
? error.message
: String(error),
);
},
).map((result) => {
logger.info(
"User's password updated successfully",
{ ...fctx },
);
logger.debug("Password rotation result", {
...fctx,
result,
});
return password;
});
});
logger.debug("Password rotation result", {
...fctx,
result,
});
return password;
});
});
},
});
}
}

View File

@@ -1,4 +1,5 @@
import { FlowExecCtx } from "@/core/flow.execution.context";
import { traceResultAsync } from "@core/observability";
import { AccountRepository } from "./account.repository";
import { UserRepository } from "./repository";
import { db } from "@pkg/db";
@@ -10,19 +11,39 @@ export class UserController {
) {}
getUserInfo(fctx: FlowExecCtx, userId: string) {
return this.userRepository.getUserInfo(fctx, userId);
return traceResultAsync({
name: "logic.user.controller.getUserInfo",
fctx,
attributes: { "app.user.id": userId },
fn: () => this.userRepository.getUserInfo(fctx, userId),
});
}
ensureAccountExists(fctx: FlowExecCtx, userId: string) {
return this.accountRepo.ensureAccountExists(fctx, userId);
return traceResultAsync({
name: "logic.user.controller.ensureAccountExists",
fctx,
attributes: { "app.user.id": userId },
fn: () => this.accountRepo.ensureAccountExists(fctx, userId),
});
}
isUsernameAvailable(fctx: FlowExecCtx, username: string) {
return this.userRepository.isUsernameAvailable(fctx, username);
return traceResultAsync({
name: "logic.user.controller.isUsernameAvailable",
fctx,
attributes: { "app.user.username": username },
fn: () => this.userRepository.isUsernameAvailable(fctx, username),
});
}
updateLastVerified2FaAtToNow(fctx: FlowExecCtx, userId: string) {
return this.userRepository.updateLastVerified2FaAtToNow(fctx, userId);
return traceResultAsync({
name: "logic.user.controller.updateLastVerified2FaAtToNow",
fctx,
attributes: { "app.user.id": userId },
fn: () => this.userRepository.updateLastVerified2FaAtToNow(fctx, userId),
});
}
banUser(
@@ -31,19 +52,39 @@ export class UserController {
reason: string,
banExpiresAt: Date,
) {
return this.userRepository.banUser(fctx, userId, reason, banExpiresAt);
return traceResultAsync({
name: "logic.user.controller.banUser",
fctx,
attributes: { "app.user.id": userId },
fn: () => this.userRepository.banUser(fctx, userId, reason, banExpiresAt),
});
}
isUserBanned(fctx: FlowExecCtx, userId: string) {
return this.userRepository.isUserBanned(fctx, userId);
return traceResultAsync({
name: "logic.user.controller.isUserBanned",
fctx,
attributes: { "app.user.id": userId },
fn: () => this.userRepository.isUserBanned(fctx, userId),
});
}
getBanInfo(fctx: FlowExecCtx, userId: string) {
return this.userRepository.getBanInfo(fctx, userId);
return traceResultAsync({
name: "logic.user.controller.getBanInfo",
fctx,
attributes: { "app.user.id": userId },
fn: () => this.userRepository.getBanInfo(fctx, userId),
});
}
rotatePassword(fctx: FlowExecCtx, userId: string, password: string) {
return this.accountRepo.rotatePassword(fctx, userId, password);
return traceResultAsync({
name: "logic.user.controller.rotatePassword",
fctx,
attributes: { "app.user.id": userId },
fn: () => this.accountRepo.rotatePassword(fctx, userId, password),
});
}
}

View File

@@ -1,5 +1,6 @@
import { ResultAsync, errAsync, okAsync } from "neverthrow";
import { FlowExecCtx } from "@core/flow.execution.context";
import { traceResultAsync } from "@core/observability";
import { type Err } from "@pkg/result";
import { Database, eq } from "@pkg/db";
import { BanInfo, User } from "./data";
@@ -11,72 +12,46 @@ export class UserRepository {
constructor(private db: Database) {}
getUserInfo(fctx: FlowExecCtx, userId: string): ResultAsync<User, Err> {
logger.info("Getting user info for user", {
flowId: fctx.flowId,
userId,
});
return ResultAsync.fromPromise(
this.db.query.user.findFirst({
where: eq(user.id, userId),
}),
(error) => {
logger.error("Failed to get user info", {
flowId: fctx.flowId,
error,
});
return userErrors.getUserInfoFailed(
fctx,
error instanceof Error ? error.message : String(error),
);
},
).andThen((userData) => {
if (!userData) {
logger.error("User not found with id", {
return traceResultAsync({
name: "logic.user.repository.getUserInfo",
fctx,
attributes: { "app.user.id": userId },
fn: () => {
logger.info("Getting user info for user", {
flowId: fctx.flowId,
userId,
});
return errAsync(userErrors.userNotFound(fctx));
}
logger.info("User info retrieved successfully for user", {
flowId: fctx.flowId,
userId,
});
return okAsync(userData as User);
});
}
return ResultAsync.fromPromise(
this.db.query.user.findFirst({
where: eq(user.id, userId),
}),
(error) => {
logger.error("Failed to get user info", {
flowId: fctx.flowId,
error,
});
return userErrors.getUserInfoFailed(
fctx,
error instanceof Error ? error.message : String(error),
);
},
).andThen((userData) => {
if (!userData) {
logger.error("User not found with id", {
flowId: fctx.flowId,
userId,
});
return errAsync(userErrors.userNotFound(fctx));
}
updateLastVerified2FaAtToNow(
fctx: FlowExecCtx,
userId: string,
): ResultAsync<boolean, Err> {
logger.info("Updating last 2FA verified timestamp for user", {
flowId: fctx.flowId,
userId,
});
return ResultAsync.fromPromise(
this.db
.update(user)
.set({ last2FAVerifiedAt: new Date() })
.where(eq(user.id, userId))
.execute(),
(error) => {
logger.error("Failed to update last 2FA verified timestamp", {
...fctx,
error,
logger.info("User info retrieved successfully for user", {
flowId: fctx.flowId,
userId,
});
return okAsync(userData as User);
});
return userErrors.updateFailed(
fctx,
error instanceof Error ? error.message : String(error),
);
},
).map(() => {
logger.info("Last 2FA verified timestamp updated successfully", {
...fctx,
});
return true;
});
}
@@ -84,33 +59,86 @@ export class UserRepository {
fctx: FlowExecCtx,
username: string,
): ResultAsync<boolean, Err> {
logger.info("Checking username availability", {
...fctx,
username,
});
return ResultAsync.fromPromise(
this.db.query.user.findFirst({
where: eq(user.username, username),
}),
(error) => {
logger.error("Failed to check username availability", {
return traceResultAsync({
name: "logic.user.repository.isUsernameAvailable",
fctx,
attributes: { "app.user.username": username },
fn: () => {
logger.info("Checking username availability", {
...fctx,
error,
username,
});
return ResultAsync.fromPromise(
this.db.query.user.findFirst({
where: eq(user.username, username),
}),
(error) => {
logger.error("Failed to check username availability", {
...fctx,
error,
});
return userErrors.usernameCheckFailed(
fctx,
error instanceof Error ? error.message : String(error),
);
},
).map((existingUser) => {
const isAvailable = !existingUser?.id;
logger.info("Username availability checked", {
...fctx,
username,
isAvailable,
});
return isAvailable;
});
},
});
}
updateLastVerified2FaAtToNow(
fctx: FlowExecCtx,
userId: string,
): ResultAsync<boolean, Err> {
return traceResultAsync({
name: "logic.user.repository.updateLastVerified2FaAtToNow",
fctx,
attributes: { "app.user.id": userId },
fn: () => {
logger.info("Updating last 2FA verified timestamp for user", {
flowId: fctx.flowId,
userId,
});
return ResultAsync.fromPromise(
this.db
.update(user)
.set({ last2FAVerifiedAt: new Date() })
.where(eq(user.id, userId))
.execute(),
(error) => {
logger.error(
"Failed to update last 2FA verified timestamp",
{
...fctx,
error,
},
);
return userErrors.updateFailed(
fctx,
error instanceof Error ? error.message : String(error),
);
},
).map(() => {
logger.info(
"Last 2FA verified timestamp updated successfully",
{
...fctx,
},
);
return true;
});
return userErrors.usernameCheckFailed(
fctx,
error instanceof Error ? error.message : String(error),
);
},
).map((existingUser) => {
const isAvailable = !existingUser?.id;
logger.info("Username availability checked", {
...fctx,
username,
isAvailable,
});
return isAvailable;
});
}
@@ -120,170 +148,188 @@ export class UserRepository {
reason: string,
banExpiresAt: Date,
): ResultAsync<boolean, Err> {
logger.info("Banning user", {
...fctx,
userId,
banExpiresAt: banExpiresAt.toISOString(),
reason,
});
return ResultAsync.fromPromise(
this.db
.update(user)
.set({
banned: true,
banReason: reason,
banExpires: banExpiresAt,
})
.where(eq(user.id, userId))
.execute(),
(error) => {
logger.error("Failed to ban user", { ...fctx, error });
return userErrors.banOperationFailed(
fctx,
error instanceof Error ? error.message : String(error),
);
},
).map(() => {
logger.info("User has been banned", {
...fctx,
userId,
banExpiresAt: banExpiresAt.toISOString(),
});
return true;
});
}
isUserBanned(fctx: FlowExecCtx, userId: string): ResultAsync<boolean, Err> {
logger.info("Checking ban status for user", { ...fctx, userId });
return ResultAsync.fromPromise(
this.db.query.user.findFirst({
where: eq(user.id, userId),
columns: {
banned: true,
banExpires: true,
},
}),
(error) => {
logger.error("Failed to check ban status", {
...fctx,
error,
});
return userErrors.dbError(
fctx,
error instanceof Error ? error.message : String(error),
);
},
).andThen((userData) => {
if (!userData) {
logger.error("User not found when checking ban status", {
...fctx,
});
return errAsync(userErrors.userNotFound(fctx));
}
// If not banned, return false
if (!userData.banned) {
logger.info("User is not banned", { ...fctx, userId });
return okAsync(false);
}
// If banned but no expiry date, consider permanently banned
if (!userData.banExpires) {
logger.info("User is permanently banned", { ...fctx, userId });
return okAsync(true);
}
const now = new Date();
if (userData.banExpires <= now) {
logger.info("User ban has expired, removing ban status", {
return traceResultAsync({
name: "logic.user.repository.banUser",
fctx,
attributes: { "app.user.id": userId },
fn: () => {
logger.info("Banning user", {
...fctx,
userId,
banExpiresAt: banExpiresAt.toISOString(),
reason,
});
return ResultAsync.fromPromise(
this.db
.update(user)
.set({
banned: false,
banReason: null,
banExpires: null,
banned: true,
banReason: reason,
banExpires: banExpiresAt,
})
.where(eq(user.id, userId))
.execute(),
(error) => {
logger.error("Failed to unban user after expiry", {
logger.error("Failed to ban user", { ...fctx, error });
return userErrors.banOperationFailed(
fctx,
error instanceof Error ? error.message : String(error),
);
},
).map(() => {
logger.info("User has been banned", {
...fctx,
userId,
banExpiresAt: banExpiresAt.toISOString(),
});
return true;
});
},
});
}
isUserBanned(fctx: FlowExecCtx, userId: string): ResultAsync<boolean, Err> {
return traceResultAsync({
name: "logic.user.repository.isUserBanned",
fctx,
attributes: { "app.user.id": userId },
fn: () => {
logger.info("Checking ban status for user", { ...fctx, userId });
return ResultAsync.fromPromise(
this.db.query.user.findFirst({
where: eq(user.id, userId),
columns: {
banned: true,
banExpires: true,
},
}),
(error) => {
logger.error("Failed to check ban status", {
...fctx,
error,
});
return userErrors.unbanFailed(
return userErrors.dbError(
fctx,
error instanceof Error
? error.message
: String(error),
error instanceof Error ? error.message : String(error),
);
},
)
.map(() => {
logger.info("User has been unbanned after expiry", {
).andThen((userData) => {
if (!userData) {
logger.error("User not found when checking ban status", {
...fctx,
});
return errAsync(userErrors.userNotFound(fctx));
}
if (!userData.banned) {
logger.info("User is not banned", { ...fctx, userId });
return okAsync(false);
}
if (!userData.banExpires) {
logger.info("User is permanently banned", { ...fctx, userId });
return okAsync(true);
}
const now = new Date();
if (userData.banExpires <= now) {
logger.info("User ban has expired, removing ban status", {
...fctx,
userId,
});
return false;
})
.orElse((error) => {
logger.error(
"Failed to unban user after expiry, still returning banned status",
{ ...fctx, userId, error },
);
// Still return banned status since we couldn't update
return okAsync(true);
});
}
logger.info("User is banned", {
...fctx,
userId,
banExpires: userData.banExpires.toISOString(),
});
return okAsync(true);
return ResultAsync.fromPromise(
this.db
.update(user)
.set({
banned: false,
banReason: null,
banExpires: null,
})
.where(eq(user.id, userId))
.execute(),
(error) => {
logger.error("Failed to unban user after expiry", {
...fctx,
error,
});
return userErrors.unbanFailed(
fctx,
error instanceof Error
? error.message
: String(error),
);
},
)
.map(() => {
logger.info("User has been unbanned after expiry", {
...fctx,
userId,
});
return false;
})
.orElse((error) => {
logger.error(
"Failed to unban user after expiry, still returning banned status",
{ ...fctx, userId, error },
);
return okAsync(true);
});
}
logger.info("User is banned", {
...fctx,
userId,
banExpires: userData.banExpires.toISOString(),
});
return okAsync(true);
});
},
});
}
getBanInfo(fctx: FlowExecCtx, userId: string): ResultAsync<BanInfo, Err> {
logger.info("Getting ban info for user", { ...fctx, userId });
return traceResultAsync({
name: "logic.user.repository.getBanInfo",
fctx,
attributes: { "app.user.id": userId },
fn: () => {
logger.info("Getting ban info for user", { ...fctx, userId });
return ResultAsync.fromPromise(
this.db.query.user.findFirst({
where: eq(user.id, userId),
columns: { banned: true, banReason: true, banExpires: true },
}),
(error) => {
logger.error("Failed to get ban info", { ...fctx, error });
return userErrors.getBanInfoFailed(
fctx,
error instanceof Error ? error.message : String(error),
);
},
).andThen((userData) => {
if (!userData) {
logger.error("User not found when getting ban info", {
...fctx,
return ResultAsync.fromPromise(
this.db.query.user.findFirst({
where: eq(user.id, userId),
columns: { banned: true, banReason: true, banExpires: true },
}),
(error) => {
logger.error("Failed to get ban info", { ...fctx, error });
return userErrors.getBanInfoFailed(
fctx,
error instanceof Error ? error.message : String(error),
);
},
).andThen((userData) => {
if (!userData) {
logger.error("User not found when getting ban info", {
...fctx,
});
return errAsync(userErrors.userNotFound(fctx));
}
logger.info("Ban info retrieved successfully for user", {
...fctx,
userId,
});
return okAsync({
banned: userData.banned || false,
reason: userData.banReason || undefined,
expires: userData.banExpires || undefined,
});
});
return errAsync(userErrors.userNotFound(fctx));
}
logger.info("Ban info retrieved successfully for user", {
...fctx,
userId,
});
return okAsync({
banned: userData.banned || false,
reason: userData.banReason || undefined,
expires: userData.banExpires || undefined,
});
},
});
}
}

View File

@@ -5,6 +5,7 @@
},
"dependencies": {
"@hono/standard-validator": "^0.2.1",
"@opentelemetry/api": "^1.9.0",
"@otplib/plugin-base32-scure": "^13.3.0",
"@otplib/plugin-crypto-noble": "^13.3.0",
"@otplib/totp": "^13.3.0",