import { and, asc, count, Database, desc, eq, like, or, sql } from "@pkg/db";
import { notifications } from "@pkg/db/schema";
import { ResultAsync } from "neverthrow";
import { FlowExecCtx } from "@core/flow.execution.context";
import type {
  Notification,
  NotificationFilters,
  PaginatedNotifications,
  PaginationOptions,
} from "./data";
import { type Err } from "@pkg/result";
import { notificationErrors } from "./errors";
import { logDomainEvent } from "@pkg/logger";

/** Turns any thrown value into the message string passed to the error factories. */
function describeError(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/**
 * Escapes the LIKE metacharacters (`\`, `%`, `_`) so a user-supplied search
 * term is matched literally instead of acting as a wildcard pattern.
 *
 * NOTE(review): assumes the database uses backslash as its default LIKE escape
 * character — confirm for the configured dialect.
 */
function escapeLikePattern(term: string): string {
  return term.replace(/[\\%_]/g, "\\$&");
}

/**
 * Builds the `id IN (...)` predicate for a list of notification ids.
 *
 * The list is expanded into one bound parameter per id via `sql.join`.
 * Interpolating the raw array inside `ANY(...)` is avoided because the `sql`
 * template is expected to expand arrays into a parenthesised parameter list
 * rather than a single array value (NOTE(review): assumes `sql` is Drizzle's
 * tag re-exported by `@pkg/db`). An empty list yields a constant-false
 * predicate so the statement stays valid and matches no rows.
 */
function notificationIdIn(ids: readonly number[]) {
  if (ids.length === 0) {
    return sql`false`;
  }
  const placeholders = sql.join(
    ids.map((id) => sql`${id}`),
    sql`, `,
  );
  return sql`${notifications.id} IN (${placeholders})`;
}

/** Maps a requested sort key to its column; unrecognised keys fall back to `createdAt`. */
function sortColumnFor(sortBy: string) {
  switch (sortBy) {
    case "createdAt":
      return notifications.createdAt;
    case "sentAt":
      return notifications.sentAt;
    case "readAt":
      return notifications.readAt;
    case "priority":
      return notifications.priority;
    default:
      return notifications.createdAt;
  }
}

/** Inputs for {@link NotificationRepository.runBulkMutation}. */
interface BulkMutation {
  /** Event name prefix; `.started`, `.succeeded` and `.failed` are appended. */
  eventPrefix: string;
  userId: string;
  notificationIds: number[];
  /** Builds the statement; invoked only after the "started" event is logged. */
  buildQuery: () => Promise<unknown>;
  /** Converts the failure message into the operation-specific domain error. */
  toErr: (detail: string) => Err;
}

/**
 * Data access for the `notifications` table.
 *
 * Every method logs `*.started`, then `*.succeeded` or `*.failed` (with the
 * elapsed time) through `logDomainEvent`, and reports failures as an `Err`
 * inside the returned `ResultAsync` instead of throwing.
 */
export class NotificationRepository {
  constructor(private readonly db: Database) {}

  /**
   * Returns one page of a user's notifications plus the total match count.
   *
   * @param fctx - Flow execution context forwarded to logging and error factories.
   * @param filters - `userId` is always applied; the other filters only when set.
   * @param pagination - Page number, page size and optional sort key/order
   *   (defaults: `createdAt`, `desc`).
   * @returns The page of rows together with `total`, `page`, `pageSize` and `totalPages`.
   */
  getNotifications(
    fctx: FlowExecCtx,
    filters: NotificationFilters,
    pagination: PaginationOptions,
  ): ResultAsync<PaginatedNotifications, Err> {
    const startedAt = Date.now();
    logDomainEvent({
      event: "notifications.list.started",
      fctx,
      meta: {
        hasSearch: Boolean(filters.search),
        isRead: filters.isRead,
        isArchived: filters.isArchived,
        page: pagination.page,
        pageSize: pagination.pageSize,
        sortBy: pagination.sortBy,
        sortOrder: pagination.sortOrder,
      },
    });

    const { userId, isRead, isArchived, type, category, priority, search } =
      filters;
    const {
      page,
      pageSize,
      sortBy = "createdAt",
      sortOrder = "desc",
    } = pagination;

    // Build WHERE conditions; the owner filter is mandatory.
    const conditions = [eq(notifications.userId, userId)];
    if (isRead !== undefined) {
      conditions.push(eq(notifications.isRead, isRead));
    }
    if (isArchived !== undefined) {
      conditions.push(eq(notifications.isArchived, isArchived));
    }
    if (type) {
      conditions.push(eq(notifications.type, type));
    }
    if (category) {
      conditions.push(eq(notifications.category, category));
    }
    if (priority) {
      conditions.push(eq(notifications.priority, priority));
    }
    if (search) {
      const pattern = `%${escapeLikePattern(search)}%`;
      const searchCondition = or(
        like(notifications.title, pattern),
        like(notifications.body, pattern),
      );
      if (searchCondition) {
        conditions.push(searchCondition);
      }
    }
    const whereClause = and(...conditions);

    // Shared by the count query and the page query: both report the same event.
    const onFailure = (error: unknown) => {
      logDomainEvent({
        level: "error",
        event: "notifications.list.failed",
        fctx,
        durationMs: Date.now() - startedAt,
        error,
      });
      return notificationErrors.getNotificationsFailed(
        fctx,
        describeError(error),
      );
    };

    return ResultAsync.fromPromise(
      this.db.select({ count: count() }).from(notifications).where(whereClause),
      onFailure,
    ).andThen((totalResult) => {
      const total = totalResult[0]?.count ?? 0;
      // A page below 1 would otherwise produce a negative OFFSET.
      const offset = Math.max(0, (page - 1) * pageSize);
      const direction = sortOrder === "asc" ? asc : desc;

      return ResultAsync.fromPromise(
        this.db
          .select()
          .from(notifications)
          .where(whereClause)
          // Tie-break on id so rows with equal sort keys keep a stable
          // position across pages.
          .orderBy(direction(sortColumnFor(sortBy)), direction(notifications.id))
          .limit(pageSize)
          .offset(offset),
        onFailure,
      ).map((data) => {
        // Avoid Infinity/NaN when the page size is not positive.
        const totalPages = pageSize > 0 ? Math.ceil(total / pageSize) : 0;
        logDomainEvent({
          event: "notifications.list.succeeded",
          fctx,
          durationMs: Date.now() - startedAt,
          meta: { count: data.length, page, totalPages },
        });
        return {
          data: data as Notification[],
          total,
          page,
          pageSize,
          totalPages,
        };
      });
    });
  }

  /**
   * Marks the given notifications of `userId` as read and stamps `readAt`.
   *
   * @returns `true` when the statement succeeds (including when no row matched).
   */
  markAsRead(
    fctx: FlowExecCtx,
    notificationIds: number[],
    userId: string,
  ): ResultAsync<boolean, Err> {
    return this.runBulkMutation(fctx, {
      eventPrefix: "notifications.mark_read",
      userId,
      notificationIds,
      buildQuery: () => {
        // One timestamp so readAt and updatedAt cannot drift apart.
        const now = new Date();
        return this.db
          .update(notifications)
          .set({ isRead: true, readAt: now, updatedAt: now })
          .where(this.ownedBy(userId, notificationIds));
      },
      toErr: (detail) => notificationErrors.markAsReadFailed(fctx, detail),
    });
  }

  /**
   * Marks the given notifications of `userId` as unread and clears `readAt`.
   *
   * @returns `true` when the statement succeeds (including when no row matched).
   */
  markAsUnread(
    fctx: FlowExecCtx,
    notificationIds: number[],
    userId: string,
  ): ResultAsync<boolean, Err> {
    return this.runBulkMutation(fctx, {
      eventPrefix: "notifications.mark_unread",
      userId,
      notificationIds,
      buildQuery: () =>
        this.db
          .update(notifications)
          .set({ isRead: false, readAt: null, updatedAt: new Date() })
          .where(this.ownedBy(userId, notificationIds)),
      toErr: (detail) => notificationErrors.markAsUnreadFailed(fctx, detail),
    });
  }

  /**
   * Sets `isArchived` on the given notifications of `userId`.
   *
   * @returns `true` when the statement succeeds (including when no row matched).
   */
  archive(
    fctx: FlowExecCtx,
    notificationIds: number[],
    userId: string,
  ): ResultAsync<boolean, Err> {
    return this.runBulkMutation(fctx, {
      eventPrefix: "notifications.archive",
      userId,
      notificationIds,
      buildQuery: () =>
        this.db
          .update(notifications)
          .set({ isArchived: true, updatedAt: new Date() })
          .where(this.ownedBy(userId, notificationIds)),
      toErr: (detail) => notificationErrors.archiveFailed(fctx, detail),
    });
  }

  /**
   * Clears `isArchived` on the given notifications of `userId`.
   *
   * @returns `true` when the statement succeeds (including when no row matched).
   */
  unarchive(
    fctx: FlowExecCtx,
    notificationIds: number[],
    userId: string,
  ): ResultAsync<boolean, Err> {
    return this.runBulkMutation(fctx, {
      eventPrefix: "notifications.unarchive",
      userId,
      notificationIds,
      buildQuery: () =>
        this.db
          .update(notifications)
          .set({ isArchived: false, updatedAt: new Date() })
          .where(this.ownedBy(userId, notificationIds)),
      toErr: (detail) => notificationErrors.unarchiveFailed(fctx, detail),
    });
  }

  /**
   * Deletes the given notifications of `userId`.
   *
   * @returns `true` when the statement succeeds (including when no row matched).
   */
  deleteNotifications(
    fctx: FlowExecCtx,
    notificationIds: number[],
    userId: string,
  ): ResultAsync<boolean, Err> {
    return this.runBulkMutation(fctx, {
      eventPrefix: "notifications.delete",
      userId,
      notificationIds,
      buildQuery: () =>
        this.db
          .delete(notifications)
          .where(this.ownedBy(userId, notificationIds)),
      toErr: (detail) =>
        notificationErrors.deleteNotificationsFailed(fctx, detail),
    });
  }

  /**
   * Counts the notifications of `userId` that are neither read nor archived.
   *
   * @returns The count, or `0` when the query returns no row.
   */
  getUnreadCount(fctx: FlowExecCtx, userId: string): ResultAsync<number, Err> {
    const startedAt = Date.now();
    logDomainEvent({
      event: "notifications.unread_count.started",
      fctx,
      meta: { userId },
    });

    return ResultAsync.fromPromise(
      this.db
        .select({ count: count() })
        .from(notifications)
        .where(
          and(
            eq(notifications.userId, userId),
            eq(notifications.isRead, false),
            eq(notifications.isArchived, false),
          ),
        ),
      (error) => {
        logDomainEvent({
          level: "error",
          event: "notifications.unread_count.failed",
          fctx,
          durationMs: Date.now() - startedAt,
          error,
        });
        return notificationErrors.getUnreadCountFailed(
          fctx,
          describeError(error),
        );
      },
    ).map((rows) => {
      // Named unreadCount so it does not shadow the imported count() helper.
      const unreadCount = rows[0]?.count ?? 0;
      logDomainEvent({
        event: "notifications.unread_count.succeeded",
        fctx,
        durationMs: Date.now() - startedAt,
        meta: { count: unreadCount },
      });
      return unreadCount;
    });
  }

  /** Predicate restricting a statement to the listed ids that belong to `userId`. */
  private ownedBy(userId: string, notificationIds: readonly number[]) {
    return and(
      eq(notifications.userId, userId),
      notificationIdIn(notificationIds),
    );
  }

  /**
   * Shared flow for the bulk update/delete methods: logs `<prefix>.started`,
   * runs the statement, then logs `<prefix>.succeeded` and yields `true`, or
   * logs `<prefix>.failed` and yields the operation-specific error.
   */
  private runBulkMutation(
    fctx: FlowExecCtx,
    operation: BulkMutation,
  ): ResultAsync<boolean, Err> {
    const { eventPrefix, userId, notificationIds, buildQuery, toErr } =
      operation;
    const startedAt = Date.now();
    logDomainEvent({
      event: `${eventPrefix}.started`,
      fctx,
      meta: { userId, notificationCount: notificationIds.length },
    });

    return ResultAsync.fromPromise(buildQuery(), (error) => {
      logDomainEvent({
        level: "error",
        event: `${eventPrefix}.failed`,
        fctx,
        durationMs: Date.now() - startedAt,
        error,
      });
      return toErr(describeError(error));
    }).map(() => {
      logDomainEvent({
        event: `${eventPrefix}.succeeded`,
        fctx,
        durationMs: Date.now() - startedAt,
        meta: { notificationCount: notificationIds.length },
      });
      return true;
    });
  }
}