import { and, asc, count, Database, desc, eq, like, or, sql } from "@pkg/db"; import { notifications } from "@pkg/db/schema"; import { ResultAsync } from "neverthrow"; import { FlowExecCtx } from "@core/flow.execution.context"; import type { Notification, NotificationFilters, PaginatedNotifications, PaginationOptions, } from "./data"; import { type Err } from "@pkg/result"; import { notificationErrors } from "./errors"; import { logger } from "@pkg/logger"; export class NotificationRepository { constructor(private db: Database) {} getNotifications( fctx: FlowExecCtx, filters: NotificationFilters, pagination: PaginationOptions, ): ResultAsync { logger.info("Getting notifications with filters", { ...fctx, filters }); const { userId, isRead, isArchived, type, category, priority, search } = filters; const { page, pageSize, sortBy = "createdAt", sortOrder = "desc", } = pagination; // Build WHERE conditions const conditions = [eq(notifications.userId, userId)]; if (isRead !== undefined) { conditions.push(eq(notifications.isRead, isRead)); } if (isArchived !== undefined) { conditions.push(eq(notifications.isArchived, isArchived)); } if (type) { conditions.push(eq(notifications.type, type)); } if (category) { conditions.push(eq(notifications.category, category)); } if (priority) { conditions.push(eq(notifications.priority, priority)); } if (search) { conditions.push( or( like(notifications.title, `%${search}%`), like(notifications.body, `%${search}%`), )!, ); } const whereClause = and(...conditions); return ResultAsync.fromPromise( this.db.select({ count: count() }).from(notifications).where(whereClause), (error) => { logger.error("Failed to get notifications count", { ...fctx, error, }); return notificationErrors.getNotificationsFailed( fctx, error instanceof Error ? error.message : String(error), ); }, ).andThen((totalResult) => { const total = totalResult[0]?.count || 0; const offset = (page - 1) * pageSize; // Map sortBy to proper column const getOrderColumn = (sortBy: string) => { switch (sortBy) { case "createdAt": return notifications.createdAt; case "sentAt": return notifications.sentAt; case "readAt": return notifications.readAt; case "priority": return notifications.priority; default: return notifications.createdAt; } }; const orderColumn = getOrderColumn(sortBy); const orderFunc = sortOrder === "asc" ? asc : desc; return ResultAsync.fromPromise( this.db .select() .from(notifications) .where(whereClause) .orderBy(orderFunc(orderColumn)) .limit(pageSize) .offset(offset), (error) => { logger.error("Failed to get notifications data", { ...fctx, error, }); return notificationErrors.getNotificationsFailed( fctx, error instanceof Error ? error.message : String(error), ); }, ).map((data) => { const totalPages = Math.ceil(total / pageSize); logger.info("Retrieved notifications", { ...fctx, count: data.length, page, totalPages, }); return { data: data as Notification[], total, page, pageSize, totalPages, }; }); }); } markAsRead( fctx: FlowExecCtx, notificationIds: number[], userId: string, ): ResultAsync { logger.info("Marking notifications as read", { ...fctx, notificationIds, userId, }); return ResultAsync.fromPromise( this.db .update(notifications) .set({ isRead: true, readAt: new Date(), updatedAt: new Date(), }) .where( and( eq(notifications.userId, userId), sql`${notifications.id} = ANY(${notificationIds})`, ), ), (error) => { logger.error("Failed to mark notifications as read", { ...fctx, error, }); return notificationErrors.markAsReadFailed( fctx, error instanceof Error ? error.message : String(error), ); }, ).map(() => { logger.info("Notifications marked as read successfully", { ...fctx, notificationIds, }); return true; }); } markAsUnread( fctx: FlowExecCtx, notificationIds: number[], userId: string, ): ResultAsync { logger.info("Marking notifications as unread", { ...fctx, notificationIds, userId, }); return ResultAsync.fromPromise( this.db .update(notifications) .set({ isRead: false, readAt: null, updatedAt: new Date(), }) .where( and( eq(notifications.userId, userId), sql`${notifications.id} = ANY(${notificationIds})`, ), ), (error) => { logger.error("Failed to mark notifications as unread", { ...fctx, error, }); return notificationErrors.markAsUnreadFailed( fctx, error instanceof Error ? error.message : String(error), ); }, ).map(() => { logger.info("Notifications marked as unread successfully", { ...fctx, notificationIds, }); return true; }); } archive( fctx: FlowExecCtx, notificationIds: number[], userId: string, ): ResultAsync { logger.info("Archiving notifications", { ...fctx, notificationIds, userId, }); return ResultAsync.fromPromise( this.db .update(notifications) .set({ isArchived: true, updatedAt: new Date(), }) .where( and( eq(notifications.userId, userId), sql`${notifications.id} = ANY(${notificationIds})`, ), ), (error) => { logger.error("Failed to archive notifications", { ...fctx, error, }); return notificationErrors.archiveFailed( fctx, error instanceof Error ? error.message : String(error), ); }, ).map(() => { logger.info("Notifications archived successfully", { ...fctx, notificationIds, }); return true; }); } unarchive( fctx: FlowExecCtx, notificationIds: number[], userId: string, ): ResultAsync { logger.info("Unarchiving notifications", { ...fctx, notificationIds, userId, }); return ResultAsync.fromPromise( this.db .update(notifications) .set({ isArchived: false, updatedAt: new Date(), }) .where( and( eq(notifications.userId, userId), sql`${notifications.id} = ANY(${notificationIds})`, ), ), (error) => { logger.error("Failed to unarchive notifications", { ...fctx, error, }); return notificationErrors.unarchiveFailed( fctx, error instanceof Error ? error.message : String(error), ); }, ).map(() => { logger.info("Notifications unarchived successfully", { ...fctx, notificationIds, }); return true; }); } deleteNotifications( fctx: FlowExecCtx, notificationIds: number[], userId: string, ): ResultAsync { logger.info("Deleting notifications", { ...fctx, notificationIds, userId, }); return ResultAsync.fromPromise( this.db .delete(notifications) .where( and( eq(notifications.userId, userId), sql`${notifications.id} = ANY(${notificationIds})`, ), ), (error) => { logger.error("Failed to delete notifications", { ...fctx, error, }); return notificationErrors.deleteNotificationsFailed( fctx, error instanceof Error ? error.message : String(error), ); }, ).map(() => { logger.info("Notifications deleted successfully", { ...fctx, notificationIds, }); return true; }); } getUnreadCount( fctx: FlowExecCtx, userId: string, ): ResultAsync { logger.info("Getting unread count", { ...fctx, userId }); return ResultAsync.fromPromise( this.db .select({ count: count() }) .from(notifications) .where( and( eq(notifications.userId, userId), eq(notifications.isRead, false), eq(notifications.isArchived, false), ), ), (error) => { logger.error("Failed to get unread count", { ...fctx, error }); return notificationErrors.getUnreadCountFailed( fctx, error instanceof Error ? error.message : String(error), ); }, ).map((result) => { const count = result[0]?.count || 0; logger.info("Retrieved unread count", { ...fctx, count }); return count; }); } }