& so it begins
This commit is contained in:
384
packages/logic/domains/notifications/repository.ts
Normal file
384
packages/logic/domains/notifications/repository.ts
Normal file
@@ -0,0 +1,384 @@
|
||||
import { and, asc, count, Database, desc, eq, like, or, sql } from "@pkg/db";
|
||||
import { notifications } from "@pkg/db/schema";
|
||||
import { ResultAsync } from "neverthrow";
|
||||
import { FlowExecCtx } from "@core/flow.execution.context";
|
||||
import type {
|
||||
Notification,
|
||||
NotificationFilters,
|
||||
PaginatedNotifications,
|
||||
PaginationOptions,
|
||||
} from "./data";
|
||||
import { type Err } from "@pkg/result";
|
||||
import { notificationErrors } from "./errors";
|
||||
import { logger } from "@pkg/logger";
|
||||
|
||||
export class NotificationRepository {
|
||||
constructor(private db: Database) {}
|
||||
|
||||
getNotifications(
|
||||
fctx: FlowExecCtx,
|
||||
filters: NotificationFilters,
|
||||
pagination: PaginationOptions,
|
||||
): ResultAsync<PaginatedNotifications, Err> {
|
||||
logger.info("Getting notifications with filters", { ...fctx, filters });
|
||||
|
||||
const { userId, isRead, isArchived, type, category, priority, search } =
|
||||
filters;
|
||||
const {
|
||||
page,
|
||||
pageSize,
|
||||
sortBy = "createdAt",
|
||||
sortOrder = "desc",
|
||||
} = pagination;
|
||||
|
||||
// Build WHERE conditions
|
||||
const conditions = [eq(notifications.userId, userId)];
|
||||
|
||||
if (isRead !== undefined) {
|
||||
conditions.push(eq(notifications.isRead, isRead));
|
||||
}
|
||||
|
||||
if (isArchived !== undefined) {
|
||||
conditions.push(eq(notifications.isArchived, isArchived));
|
||||
}
|
||||
|
||||
if (type) {
|
||||
conditions.push(eq(notifications.type, type));
|
||||
}
|
||||
|
||||
if (category) {
|
||||
conditions.push(eq(notifications.category, category));
|
||||
}
|
||||
|
||||
if (priority) {
|
||||
conditions.push(eq(notifications.priority, priority));
|
||||
}
|
||||
|
||||
if (search) {
|
||||
conditions.push(
|
||||
or(
|
||||
like(notifications.title, `%${search}%`),
|
||||
like(notifications.body, `%${search}%`),
|
||||
)!,
|
||||
);
|
||||
}
|
||||
|
||||
const whereClause = and(...conditions);
|
||||
|
||||
return ResultAsync.fromPromise(
|
||||
this.db.select({ count: count() }).from(notifications).where(whereClause),
|
||||
(error) => {
|
||||
logger.error("Failed to get notifications count", {
|
||||
...fctx,
|
||||
error,
|
||||
});
|
||||
return notificationErrors.getNotificationsFailed(
|
||||
fctx,
|
||||
error instanceof Error ? error.message : String(error),
|
||||
);
|
||||
},
|
||||
).andThen((totalResult) => {
|
||||
const total = totalResult[0]?.count || 0;
|
||||
const offset = (page - 1) * pageSize;
|
||||
|
||||
// Map sortBy to proper column
|
||||
const getOrderColumn = (sortBy: string) => {
|
||||
switch (sortBy) {
|
||||
case "createdAt":
|
||||
return notifications.createdAt;
|
||||
case "sentAt":
|
||||
return notifications.sentAt;
|
||||
case "readAt":
|
||||
return notifications.readAt;
|
||||
case "priority":
|
||||
return notifications.priority;
|
||||
default:
|
||||
return notifications.createdAt;
|
||||
}
|
||||
};
|
||||
|
||||
const orderColumn = getOrderColumn(sortBy);
|
||||
const orderFunc = sortOrder === "asc" ? asc : desc;
|
||||
|
||||
return ResultAsync.fromPromise(
|
||||
this.db
|
||||
.select()
|
||||
.from(notifications)
|
||||
.where(whereClause)
|
||||
.orderBy(orderFunc(orderColumn))
|
||||
.limit(pageSize)
|
||||
.offset(offset),
|
||||
(error) => {
|
||||
logger.error("Failed to get notifications data", {
|
||||
...fctx,
|
||||
error,
|
||||
});
|
||||
return notificationErrors.getNotificationsFailed(
|
||||
fctx,
|
||||
error instanceof Error ? error.message : String(error),
|
||||
);
|
||||
},
|
||||
).map((data) => {
|
||||
const totalPages = Math.ceil(total / pageSize);
|
||||
logger.info("Retrieved notifications", {
|
||||
...fctx,
|
||||
count: data.length,
|
||||
page,
|
||||
totalPages,
|
||||
});
|
||||
|
||||
return {
|
||||
data: data as Notification[],
|
||||
total,
|
||||
page,
|
||||
pageSize,
|
||||
totalPages,
|
||||
};
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
markAsRead(
|
||||
fctx: FlowExecCtx,
|
||||
notificationIds: number[],
|
||||
userId: string,
|
||||
): ResultAsync<boolean, Err> {
|
||||
logger.info("Marking notifications as read", {
|
||||
...fctx,
|
||||
notificationIds,
|
||||
userId,
|
||||
});
|
||||
|
||||
return ResultAsync.fromPromise(
|
||||
this.db
|
||||
.update(notifications)
|
||||
.set({
|
||||
isRead: true,
|
||||
readAt: new Date(),
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(
|
||||
and(
|
||||
eq(notifications.userId, userId),
|
||||
sql`${notifications.id} = ANY(${notificationIds})`,
|
||||
),
|
||||
),
|
||||
(error) => {
|
||||
logger.error("Failed to mark notifications as read", {
|
||||
...fctx,
|
||||
error,
|
||||
});
|
||||
return notificationErrors.markAsReadFailed(
|
||||
fctx,
|
||||
error instanceof Error ? error.message : String(error),
|
||||
);
|
||||
},
|
||||
).map(() => {
|
||||
logger.info("Notifications marked as read successfully", {
|
||||
...fctx,
|
||||
notificationIds,
|
||||
});
|
||||
return true;
|
||||
});
|
||||
}
|
||||
|
||||
markAsUnread(
|
||||
fctx: FlowExecCtx,
|
||||
notificationIds: number[],
|
||||
userId: string,
|
||||
): ResultAsync<boolean, Err> {
|
||||
logger.info("Marking notifications as unread", {
|
||||
...fctx,
|
||||
notificationIds,
|
||||
userId,
|
||||
});
|
||||
|
||||
return ResultAsync.fromPromise(
|
||||
this.db
|
||||
.update(notifications)
|
||||
.set({
|
||||
isRead: false,
|
||||
readAt: null,
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(
|
||||
and(
|
||||
eq(notifications.userId, userId),
|
||||
sql`${notifications.id} = ANY(${notificationIds})`,
|
||||
),
|
||||
),
|
||||
(error) => {
|
||||
logger.error("Failed to mark notifications as unread", {
|
||||
...fctx,
|
||||
error,
|
||||
});
|
||||
return notificationErrors.markAsUnreadFailed(
|
||||
fctx,
|
||||
error instanceof Error ? error.message : String(error),
|
||||
);
|
||||
},
|
||||
).map(() => {
|
||||
logger.info("Notifications marked as unread successfully", {
|
||||
...fctx,
|
||||
notificationIds,
|
||||
});
|
||||
return true;
|
||||
});
|
||||
}
|
||||
|
||||
archive(
|
||||
fctx: FlowExecCtx,
|
||||
notificationIds: number[],
|
||||
userId: string,
|
||||
): ResultAsync<boolean, Err> {
|
||||
logger.info("Archiving notifications", {
|
||||
...fctx,
|
||||
notificationIds,
|
||||
userId,
|
||||
});
|
||||
|
||||
return ResultAsync.fromPromise(
|
||||
this.db
|
||||
.update(notifications)
|
||||
.set({
|
||||
isArchived: true,
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(
|
||||
and(
|
||||
eq(notifications.userId, userId),
|
||||
sql`${notifications.id} = ANY(${notificationIds})`,
|
||||
),
|
||||
),
|
||||
(error) => {
|
||||
logger.error("Failed to archive notifications", {
|
||||
...fctx,
|
||||
error,
|
||||
});
|
||||
return notificationErrors.archiveFailed(
|
||||
fctx,
|
||||
error instanceof Error ? error.message : String(error),
|
||||
);
|
||||
},
|
||||
).map(() => {
|
||||
logger.info("Notifications archived successfully", {
|
||||
...fctx,
|
||||
notificationIds,
|
||||
});
|
||||
return true;
|
||||
});
|
||||
}
|
||||
|
||||
unarchive(
|
||||
fctx: FlowExecCtx,
|
||||
notificationIds: number[],
|
||||
userId: string,
|
||||
): ResultAsync<boolean, Err> {
|
||||
logger.info("Unarchiving notifications", {
|
||||
...fctx,
|
||||
notificationIds,
|
||||
userId,
|
||||
});
|
||||
|
||||
return ResultAsync.fromPromise(
|
||||
this.db
|
||||
.update(notifications)
|
||||
.set({
|
||||
isArchived: false,
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(
|
||||
and(
|
||||
eq(notifications.userId, userId),
|
||||
sql`${notifications.id} = ANY(${notificationIds})`,
|
||||
),
|
||||
),
|
||||
(error) => {
|
||||
logger.error("Failed to unarchive notifications", {
|
||||
...fctx,
|
||||
error,
|
||||
});
|
||||
return notificationErrors.unarchiveFailed(
|
||||
fctx,
|
||||
error instanceof Error ? error.message : String(error),
|
||||
);
|
||||
},
|
||||
).map(() => {
|
||||
logger.info("Notifications unarchived successfully", {
|
||||
...fctx,
|
||||
notificationIds,
|
||||
});
|
||||
return true;
|
||||
});
|
||||
}
|
||||
|
||||
deleteNotifications(
|
||||
fctx: FlowExecCtx,
|
||||
notificationIds: number[],
|
||||
userId: string,
|
||||
): ResultAsync<boolean, Err> {
|
||||
logger.info("Deleting notifications", {
|
||||
...fctx,
|
||||
notificationIds,
|
||||
userId,
|
||||
});
|
||||
|
||||
return ResultAsync.fromPromise(
|
||||
this.db
|
||||
.delete(notifications)
|
||||
.where(
|
||||
and(
|
||||
eq(notifications.userId, userId),
|
||||
sql`${notifications.id} = ANY(${notificationIds})`,
|
||||
),
|
||||
),
|
||||
(error) => {
|
||||
logger.error("Failed to delete notifications", {
|
||||
...fctx,
|
||||
error,
|
||||
});
|
||||
return notificationErrors.deleteNotificationsFailed(
|
||||
fctx,
|
||||
error instanceof Error ? error.message : String(error),
|
||||
);
|
||||
},
|
||||
).map(() => {
|
||||
logger.info("Notifications deleted successfully", {
|
||||
...fctx,
|
||||
notificationIds,
|
||||
});
|
||||
return true;
|
||||
});
|
||||
}
|
||||
|
||||
getUnreadCount(
|
||||
fctx: FlowExecCtx,
|
||||
userId: string,
|
||||
): ResultAsync<number, Err> {
|
||||
logger.info("Getting unread count", { ...fctx, userId });
|
||||
|
||||
return ResultAsync.fromPromise(
|
||||
this.db
|
||||
.select({ count: count() })
|
||||
.from(notifications)
|
||||
.where(
|
||||
and(
|
||||
eq(notifications.userId, userId),
|
||||
eq(notifications.isRead, false),
|
||||
eq(notifications.isArchived, false),
|
||||
),
|
||||
),
|
||||
(error) => {
|
||||
logger.error("Failed to get unread count", { ...fctx, error });
|
||||
return notificationErrors.getUnreadCountFailed(
|
||||
fctx,
|
||||
error instanceof Error ? error.message : String(error),
|
||||
);
|
||||
},
|
||||
).map((result) => {
|
||||
const count = result[0]?.count || 0;
|
||||
logger.info("Retrieved unread count", { ...fctx, count });
|
||||
return count;
|
||||
});
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user