// Files · 454 lines · 14 KiB · TypeScript

import { and, asc, count, Database, desc, eq, like, or, sql } from "@pkg/db";
import { notifications } from "@pkg/db/schema";
import { ResultAsync } from "neverthrow";
import { FlowExecCtx } from "@core/flow.execution.context";
import type {
Notification,
NotificationFilters,
PaginatedNotifications,
PaginationOptions,
} from "./data";
import { type Err } from "@pkg/result";
import { notificationErrors } from "./errors";
import { logDomainEvent } from "@pkg/logger";
/**
 * Data-access layer for the `notifications` table.
 *
 * Every public method returns a `ResultAsync`: a rejected query is logged via
 * `logDomainEvent` (level "error") and mapped to the matching
 * `notificationErrors.*` factory instead of being thrown. Each operation also
 * emits `*.started` and `*.succeeded` events carrying the elapsed time.
 *
 * All bulk mutations are scoped by `userId`, so ids that belong to another
 * user are never touched.
 */
export class NotificationRepository {
  constructor(private readonly db: Database) {}

  /**
   * Lists one page of a user's notifications together with the total number
   * of rows matching the filters.
   *
   * @param fctx - Flow execution context forwarded to logging and error factories.
   * @param filters - `userId` is always applied; `isRead` / `isArchived` are
   *   applied when not `undefined`; `type`, `category`, `priority` and
   *   `search` are applied when truthy. `search` is matched as a literal
   *   substring of the title or the body.
   * @param pagination - 1-based `page`, `pageSize`, and optional `sortBy`
   *   (default "createdAt") / `sortOrder` (default "desc"). Unknown `sortBy`
   *   values fall back to `createdAt`.
   * @returns The page of rows plus `total`, `page`, `pageSize`, `totalPages`;
   *   or the error built by `notificationErrors.getNotificationsFailed`.
   */
  getNotifications(
    fctx: FlowExecCtx,
    filters: NotificationFilters,
    pagination: PaginationOptions,
  ): ResultAsync<PaginatedNotifications, Err> {
    const startedAt = Date.now();
    logDomainEvent({
      event: "notifications.list.started",
      fctx,
      meta: {
        hasSearch: Boolean(filters.search),
        isRead: filters.isRead,
        isArchived: filters.isArchived,
        page: pagination.page,
        pageSize: pagination.pageSize,
        sortBy: pagination.sortBy,
        sortOrder: pagination.sortOrder,
      },
    });

    const { userId, isRead, isArchived, type, category, priority, search } =
      filters;
    const {
      page,
      pageSize,
      sortBy = "createdAt",
      sortOrder = "desc",
    } = pagination;

    // Build WHERE conditions; the owner filter is unconditional.
    const conditions = [eq(notifications.userId, userId)];
    if (isRead !== undefined) {
      conditions.push(eq(notifications.isRead, isRead));
    }
    if (isArchived !== undefined) {
      conditions.push(eq(notifications.isArchived, isArchived));
    }
    if (type) {
      conditions.push(eq(notifications.type, type));
    }
    if (category) {
      conditions.push(eq(notifications.category, category));
    }
    if (priority) {
      conditions.push(eq(notifications.priority, priority));
    }
    if (search) {
      // Escape LIKE metacharacters so the term is matched literally.
      const pattern = `%${NotificationRepository.escapeLikePattern(search)}%`;
      conditions.push(
        // `or` is given two operands here, so the non-null assertion is safe.
        or(
          like(notifications.title, pattern),
          like(notifications.body, pattern),
        )!,
      );
    }
    const whereClause = and(...conditions);

    // Shared by the count query and the page query: both report the same
    // failure event and error.
    const onFailure = (error: unknown): Err => {
      logDomainEvent({
        level: "error",
        event: "notifications.list.failed",
        fctx,
        durationMs: Date.now() - startedAt,
        error,
      });
      return notificationErrors.getNotificationsFailed(
        fctx,
        NotificationRepository.describeError(error),
      );
    };

    return ResultAsync.fromPromise(
      this.db.select({ count: count() }).from(notifications).where(whereClause),
      onFailure,
    ).andThen((totalRows) => {
      const total = totalRows[0]?.count ?? 0;
      const offset = (page - 1) * pageSize;

      // Map sortBy to proper column
      const resolveOrderColumn = (key: string) => {
        switch (key) {
          case "sentAt":
            return notifications.sentAt;
          case "readAt":
            return notifications.readAt;
          case "priority":
            return notifications.priority;
          case "createdAt":
          default:
            return notifications.createdAt;
        }
      };
      const direction = sortOrder === "asc" ? asc : desc;

      return ResultAsync.fromPromise(
        this.db
          .select()
          .from(notifications)
          .where(whereClause)
          // The id tiebreaker keeps LIMIT/OFFSET pages stable when the primary
          // sort key has duplicates (e.g. many rows with the same priority).
          .orderBy(
            direction(resolveOrderColumn(sortBy)),
            direction(notifications.id),
          )
          .limit(pageSize)
          .offset(offset),
        onFailure,
      ).map((rows) => {
        // Avoid NaN / Infinity when pageSize is 0.
        const totalPages = pageSize > 0 ? Math.ceil(total / pageSize) : 0;
        logDomainEvent({
          event: "notifications.list.succeeded",
          fctx,
          durationMs: Date.now() - startedAt,
          meta: {
            count: rows.length,
            page,
            totalPages,
          },
        });
        return {
          data: rows as Notification[],
          total,
          page,
          pageSize,
          totalPages,
        };
      });
    });
  }

  /**
   * Marks the given notifications of `userId` as read and stamps `readAt`.
   *
   * @param fctx - Flow execution context forwarded to logging and error factories.
   * @param notificationIds - Ids to update; an empty list matches no rows.
   * @param userId - Owner the update is restricted to.
   * @returns `true` once the update resolves, or the error built by
   *   `notificationErrors.markAsReadFailed`.
   */
  markAsRead(
    fctx: FlowExecCtx,
    notificationIds: number[],
    userId: string,
  ): ResultAsync<boolean, Err> {
    const startedAt = Date.now();
    logDomainEvent({
      event: "notifications.mark_read.started",
      fctx,
      meta: { userId, notificationCount: notificationIds.length },
    });
    // One timestamp so readAt and updatedAt cannot differ.
    const now = new Date();
    return ResultAsync.fromPromise(
      this.db
        .update(notifications)
        .set({
          isRead: true,
          readAt: now,
          updatedAt: now,
        })
        .where(this.ownedSelection(userId, notificationIds)),
      (error) => {
        logDomainEvent({
          level: "error",
          event: "notifications.mark_read.failed",
          fctx,
          durationMs: Date.now() - startedAt,
          error,
        });
        return notificationErrors.markAsReadFailed(
          fctx,
          NotificationRepository.describeError(error),
        );
      },
    ).map(() => {
      logDomainEvent({
        event: "notifications.mark_read.succeeded",
        fctx,
        durationMs: Date.now() - startedAt,
        meta: { notificationCount: notificationIds.length },
      });
      return true;
    });
  }

  /**
   * Marks the given notifications of `userId` as unread and clears `readAt`.
   *
   * @param fctx - Flow execution context forwarded to logging and error factories.
   * @param notificationIds - Ids to update; an empty list matches no rows.
   * @param userId - Owner the update is restricted to.
   * @returns `true` once the update resolves, or the error built by
   *   `notificationErrors.markAsUnreadFailed`.
   */
  markAsUnread(
    fctx: FlowExecCtx,
    notificationIds: number[],
    userId: string,
  ): ResultAsync<boolean, Err> {
    const startedAt = Date.now();
    logDomainEvent({
      event: "notifications.mark_unread.started",
      fctx,
      meta: { userId, notificationCount: notificationIds.length },
    });
    return ResultAsync.fromPromise(
      this.db
        .update(notifications)
        .set({
          isRead: false,
          readAt: null,
          updatedAt: new Date(),
        })
        .where(this.ownedSelection(userId, notificationIds)),
      (error) => {
        logDomainEvent({
          level: "error",
          event: "notifications.mark_unread.failed",
          fctx,
          durationMs: Date.now() - startedAt,
          error,
        });
        return notificationErrors.markAsUnreadFailed(
          fctx,
          NotificationRepository.describeError(error),
        );
      },
    ).map(() => {
      logDomainEvent({
        event: "notifications.mark_unread.succeeded",
        fctx,
        durationMs: Date.now() - startedAt,
        meta: { notificationCount: notificationIds.length },
      });
      return true;
    });
  }

  /**
   * Sets `isArchived` to true on the given notifications of `userId`.
   *
   * @param fctx - Flow execution context forwarded to logging and error factories.
   * @param notificationIds - Ids to update; an empty list matches no rows.
   * @param userId - Owner the update is restricted to.
   * @returns `true` once the update resolves, or the error built by
   *   `notificationErrors.archiveFailed`.
   */
  archive(
    fctx: FlowExecCtx,
    notificationIds: number[],
    userId: string,
  ): ResultAsync<boolean, Err> {
    const startedAt = Date.now();
    logDomainEvent({
      event: "notifications.archive.started",
      fctx,
      meta: { userId, notificationCount: notificationIds.length },
    });
    return ResultAsync.fromPromise(
      this.db
        .update(notifications)
        .set({
          isArchived: true,
          updatedAt: new Date(),
        })
        .where(this.ownedSelection(userId, notificationIds)),
      (error) => {
        logDomainEvent({
          level: "error",
          event: "notifications.archive.failed",
          fctx,
          durationMs: Date.now() - startedAt,
          error,
        });
        return notificationErrors.archiveFailed(
          fctx,
          NotificationRepository.describeError(error),
        );
      },
    ).map(() => {
      logDomainEvent({
        event: "notifications.archive.succeeded",
        fctx,
        durationMs: Date.now() - startedAt,
        meta: { notificationCount: notificationIds.length },
      });
      return true;
    });
  }

  /**
   * Sets `isArchived` to false on the given notifications of `userId`.
   *
   * @param fctx - Flow execution context forwarded to logging and error factories.
   * @param notificationIds - Ids to update; an empty list matches no rows.
   * @param userId - Owner the update is restricted to.
   * @returns `true` once the update resolves, or the error built by
   *   `notificationErrors.unarchiveFailed`.
   */
  unarchive(
    fctx: FlowExecCtx,
    notificationIds: number[],
    userId: string,
  ): ResultAsync<boolean, Err> {
    const startedAt = Date.now();
    logDomainEvent({
      event: "notifications.unarchive.started",
      fctx,
      meta: { userId, notificationCount: notificationIds.length },
    });
    return ResultAsync.fromPromise(
      this.db
        .update(notifications)
        .set({
          isArchived: false,
          updatedAt: new Date(),
        })
        .where(this.ownedSelection(userId, notificationIds)),
      (error) => {
        logDomainEvent({
          level: "error",
          event: "notifications.unarchive.failed",
          fctx,
          durationMs: Date.now() - startedAt,
          error,
        });
        return notificationErrors.unarchiveFailed(
          fctx,
          NotificationRepository.describeError(error),
        );
      },
    ).map(() => {
      logDomainEvent({
        event: "notifications.unarchive.succeeded",
        fctx,
        durationMs: Date.now() - startedAt,
        meta: { notificationCount: notificationIds.length },
      });
      return true;
    });
  }

  /**
   * Deletes the given notifications of `userId`.
   *
   * @param fctx - Flow execution context forwarded to logging and error factories.
   * @param notificationIds - Ids to delete; an empty list matches no rows.
   * @param userId - Owner the delete is restricted to.
   * @returns `true` once the delete resolves, or the error built by
   *   `notificationErrors.deleteNotificationsFailed`.
   */
  deleteNotifications(
    fctx: FlowExecCtx,
    notificationIds: number[],
    userId: string,
  ): ResultAsync<boolean, Err> {
    const startedAt = Date.now();
    logDomainEvent({
      event: "notifications.delete.started",
      fctx,
      meta: { userId, notificationCount: notificationIds.length },
    });
    return ResultAsync.fromPromise(
      this.db
        .delete(notifications)
        .where(this.ownedSelection(userId, notificationIds)),
      (error) => {
        logDomainEvent({
          level: "error",
          event: "notifications.delete.failed",
          fctx,
          durationMs: Date.now() - startedAt,
          error,
        });
        return notificationErrors.deleteNotificationsFailed(
          fctx,
          NotificationRepository.describeError(error),
        );
      },
    ).map(() => {
      logDomainEvent({
        event: "notifications.delete.succeeded",
        fctx,
        durationMs: Date.now() - startedAt,
        meta: { notificationCount: notificationIds.length },
      });
      return true;
    });
  }

  /**
   * Counts the notifications of `userId` that are neither read nor archived.
   *
   * @param fctx - Flow execution context forwarded to logging and error factories.
   * @param userId - Owner whose notifications are counted.
   * @returns The count (0 when the query yields no row), or the error built by
   *   `notificationErrors.getUnreadCountFailed`.
   */
  getUnreadCount(
    fctx: FlowExecCtx,
    userId: string,
  ): ResultAsync<number, Err> {
    const startedAt = Date.now();
    logDomainEvent({
      event: "notifications.unread_count.started",
      fctx,
      meta: { userId },
    });
    return ResultAsync.fromPromise(
      this.db
        .select({ count: count() })
        .from(notifications)
        .where(
          and(
            eq(notifications.userId, userId),
            eq(notifications.isRead, false),
            eq(notifications.isArchived, false),
          ),
        ),
      (error) => {
        logDomainEvent({
          level: "error",
          event: "notifications.unread_count.failed",
          fctx,
          durationMs: Date.now() - startedAt,
          error,
        });
        return notificationErrors.getUnreadCountFailed(
          fctx,
          NotificationRepository.describeError(error),
        );
      },
    ).map((rows) => {
      // Named `unread` so the imported `count()` helper is not shadowed.
      const unread = rows[0]?.count ?? 0;
      logDomainEvent({
        event: "notifications.unread_count.succeeded",
        fctx,
        durationMs: Date.now() - startedAt,
        meta: { count: unread },
      });
      return unread;
    });
  }

  /**
   * Builds the WHERE predicate shared by all bulk mutations: rows owned by
   * `userId` whose id is one of `notificationIds`.
   *
   * An explicit `IN (...)` placeholder list is built instead of interpolating
   * the array into `ANY(...)`. Assuming `@pkg/db` re-exports Drizzle's `sql`
   * tag, an interpolated JS array is expanded to `($1, $2, ...)`, which is
   * not a valid array operand for `ANY`. An empty list yields a predicate
   * that matches nothing, because `IN ()` is not valid SQL.
   */
  private ownedSelection(userId: string, notificationIds: number[]) {
    const ownedByUser = eq(notifications.userId, userId);
    if (notificationIds.length === 0) {
      return and(ownedByUser, sql`false`);
    }
    const placeholders = sql.join(
      notificationIds.map((id) => sql`${id}`),
      sql`, `,
    );
    return and(ownedByUser, sql`${notifications.id} in (${placeholders})`);
  }

  /**
   * Escapes `\`, `%` and `_` with a backslash so the value is matched
   * literally inside a LIKE pattern.
   * NOTE(review): relies on backslash being the LIKE escape character (the
   * PostgreSQL default) — confirm against the target database.
   */
  private static escapeLikePattern(value: string): string {
    return value.replace(/[\\%_]/g, "\\$&");
  }

  /** Extracts a printable message from whatever a rejected query produced. */
  private static describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }
}