Files

885 lines
32 KiB
TypeScript

import {
Database,
and,
asc,
count,
desc,
eq,
like,
or,
} from "@pkg/db";
import { file, mobileDevice, mobileMediaAsset, mobileSMS, user } from "@pkg/db/schema";
import { ResultAsync, errAsync, okAsync } from "neverthrow";
import { FlowExecCtx } from "@core/flow.execution.context";
import type {
ListMobileDeviceMediaFilters,
ListMobileDeviceSMSFilters,
ListMobileDevicesFilters,
MobileDevice,
MobileDeviceDetail,
MobileMediaAssetInput,
MobilePagination,
MobileSMSInput,
PaginatedMobileDevices,
PaginatedMobileMedia,
PaginatedMobileSMS,
RegisterMobileDevice,
} from "./data";
import { type Err } from "@pkg/result";
import { mobileErrors } from "./errors";
import { logDomainEvent } from "@pkg/logger";
import { createHash } from "node:crypto";
export class MobileRepository {
constructor(private db: Database) {}
private normalizeDate(
fctx: FlowExecCtx,
value: Date | string | null | undefined,
field: string,
required: boolean,
): ResultAsync<Date | null, Err> {
if (value === undefined || value === null) {
if (required) {
return errAsync(
mobileErrors.invalidPayload(fctx, `${field} is required`),
);
}
return okAsync(null);
}
const normalized = value instanceof Date ? value : new Date(value);
if (Number.isNaN(normalized.getTime())) {
return errAsync(
mobileErrors.invalidPayload(fctx, `${field} must be a valid date`),
);
}
return okAsync(normalized);
}
private makeSMSDedupHash(deviceId: number, sms: MobileSMSInput): string {
const sentAt =
sms.sentAt instanceof Date
? sms.sentAt.toISOString()
: new Date(sms.sentAt).toISOString();
const fingerprint = `${deviceId}|${sentAt}|${sms.sender}|${sms.body}`;
return createHash("sha256").update(fingerprint).digest("hex");
}
findAdminOwnerId(
fctx: FlowExecCtx,
preferredAdminEmail?: string,
): ResultAsync<string, Err> {
const startedAt = Date.now();
logDomainEvent({
event: "mobile.admin_owner.resolve.started",
fctx,
meta: { hasPreferredEmail: Boolean(preferredAdminEmail) },
});
const byPreferredEmail = preferredAdminEmail
? this.db
.select({ id: user.id })
.from(user)
.where(eq(user.email, preferredAdminEmail))
.limit(1)
: Promise.resolve([]);
return ResultAsync.fromPromise(byPreferredEmail, (error) =>
mobileErrors.listDevicesFailed(
fctx,
error instanceof Error ? error.message : String(error),
),
).andThen((preferredUser) => {
if (preferredUser[0]?.id) {
logDomainEvent({
event: "mobile.admin_owner.resolve.succeeded",
fctx,
durationMs: Date.now() - startedAt,
meta: { ownerUserId: preferredUser[0].id, via: "preferred_email" },
});
return okAsync(preferredUser[0].id);
}
return ResultAsync.fromPromise(
this.db
.select({ id: user.id })
.from(user)
.where(eq(user.role, "admin"))
.orderBy(asc(user.createdAt))
.limit(1),
(error) =>
mobileErrors.listDevicesFailed(
fctx,
error instanceof Error ? error.message : String(error),
),
).andThen((admins) => {
if (!admins[0]?.id) {
logDomainEvent({
level: "warn",
event: "mobile.admin_owner.resolve.failed",
fctx,
durationMs: Date.now() - startedAt,
});
return errAsync(mobileErrors.adminOwnerNotFound(fctx));
}
logDomainEvent({
event: "mobile.admin_owner.resolve.succeeded",
fctx,
durationMs: Date.now() - startedAt,
meta: { ownerUserId: admins[0].id, via: "admin_role" },
});
return okAsync(admins[0].id);
});
});
}
upsertDevice(
fctx: FlowExecCtx,
payload: RegisterMobileDevice,
ownerUserId: string,
): ResultAsync<MobileDevice, Err> {
const startedAt = Date.now();
logDomainEvent({
event: "mobile.register.started",
fctx,
meta: {
externalDeviceId: payload.externalDeviceId,
ownerUserId,
},
});
const now = new Date();
return ResultAsync.fromPromise(
this.db
.insert(mobileDevice)
.values({
externalDeviceId: payload.externalDeviceId,
name: payload.name,
manufacturer: payload.manufacturer,
model: payload.model,
androidVersion: payload.androidVersion,
ownerUserId,
lastPingAt: now,
createdAt: now,
updatedAt: now,
})
.onConflictDoUpdate({
target: mobileDevice.externalDeviceId,
set: {
name: payload.name,
manufacturer: payload.manufacturer,
model: payload.model,
androidVersion: payload.androidVersion,
ownerUserId,
lastPingAt: now,
updatedAt: now,
},
})
.returning(),
(error) => {
logDomainEvent({
level: "error",
event: "mobile.register.failed",
fctx,
durationMs: Date.now() - startedAt,
error,
meta: { externalDeviceId: payload.externalDeviceId },
});
return mobileErrors.registerDeviceFailed(
fctx,
error instanceof Error ? error.message : String(error),
);
},
).map((result) => {
const device = result[0] as MobileDevice;
logDomainEvent({
event: "mobile.register.succeeded",
fctx,
durationMs: Date.now() - startedAt,
meta: { deviceId: device.id, externalDeviceId: device.externalDeviceId },
});
return device;
});
}
getDeviceByExternalId(
fctx: FlowExecCtx,
externalDeviceId: string,
): ResultAsync<MobileDevice, Err> {
return ResultAsync.fromPromise(
this.db
.select()
.from(mobileDevice)
.where(eq(mobileDevice.externalDeviceId, externalDeviceId))
.limit(1),
(error) =>
mobileErrors.listDevicesFailed(
fctx,
error instanceof Error ? error.message : String(error),
),
).andThen((rows) => {
if (!rows[0]) {
return errAsync(mobileErrors.deviceNotFound(fctx, externalDeviceId));
}
return okAsync(rows[0] as MobileDevice);
});
}
touchDevicePing(
fctx: FlowExecCtx,
deviceId: number,
pingAt?: Date,
): ResultAsync<boolean, Err> {
const startedAt = Date.now();
const resolvedPingAt = pingAt ?? new Date();
logDomainEvent({
event: "mobile.ping.started",
fctx,
meta: { deviceId, pingAt: resolvedPingAt.toISOString() },
});
return ResultAsync.fromPromise(
this.db
.update(mobileDevice)
.set({ lastPingAt: resolvedPingAt, updatedAt: new Date() })
.where(eq(mobileDevice.id, deviceId))
.returning({ id: mobileDevice.id }),
(error) => {
logDomainEvent({
level: "error",
event: "mobile.ping.failed",
fctx,
durationMs: Date.now() - startedAt,
error,
meta: { deviceId },
});
return mobileErrors.pingDeviceFailed(
fctx,
error instanceof Error ? error.message : String(error),
);
},
).andThen((updatedRows) => {
if (!updatedRows[0]) {
return errAsync(mobileErrors.deviceNotFoundById(fctx, deviceId));
}
logDomainEvent({
event: "mobile.ping.succeeded",
fctx,
durationMs: Date.now() - startedAt,
meta: { deviceId },
});
return okAsync(true);
});
}
syncSMS(
fctx: FlowExecCtx,
deviceId: number,
messages: MobileSMSInput[],
): ResultAsync<{ received: number; inserted: number }, Err> {
const startedAt = Date.now();
logDomainEvent({
event: "mobile.sms.sync.started",
fctx,
meta: { deviceId, received: messages.length },
});
const now = new Date();
const parseResult = ResultAsync.combine(
messages.map((message) =>
this.normalizeDate(fctx, message.sentAt, "messages.sentAt", true).andThen(
(sentAt) =>
this.normalizeDate(
fctx,
(message.receivedAt ?? null) as Date | string | null,
"messages.receivedAt",
false,
).map((receivedAt) => ({
deviceId,
externalMessageId: message.externalMessageId ?? null,
sender: message.sender,
recipient: message.recipient ?? null,
body: message.body,
sentAt: sentAt as Date,
receivedAt,
dedupHash:
message.dedupHash ??
this.makeSMSDedupHash(deviceId, message),
rawPayload: message.rawPayload,
createdAt: now,
updatedAt: now,
})),
),
),
);
return parseResult.andThen((rows) =>
ResultAsync.fromPromise(
this.db
.insert(mobileSMS)
.values(rows)
.onConflictDoNothing()
.returning({ id: mobileSMS.id }),
(error) => {
logDomainEvent({
level: "error",
event: "mobile.sms.sync.failed",
fctx,
durationMs: Date.now() - startedAt,
error,
meta: { deviceId, received: messages.length },
});
return mobileErrors.syncSMSFailed(
fctx,
error instanceof Error ? error.message : String(error),
);
},
).map((insertedRows) => {
logDomainEvent({
event: "mobile.sms.sync.succeeded",
fctx,
durationMs: Date.now() - startedAt,
meta: {
deviceId,
received: messages.length,
inserted: insertedRows.length,
},
});
return { received: messages.length, inserted: insertedRows.length };
}),
);
}
syncMediaAssets(
fctx: FlowExecCtx,
deviceId: number,
assets: MobileMediaAssetInput[],
): ResultAsync<{ received: number; inserted: number }, Err> {
const startedAt = Date.now();
logDomainEvent({
event: "mobile.media.sync.started",
fctx,
meta: { deviceId, received: assets.length },
});
const now = new Date();
const parseResult = ResultAsync.combine(
assets.map((asset) =>
this.normalizeDate(
fctx,
(asset.capturedAt ?? null) as Date | string | null,
"assets.capturedAt",
false,
).map((capturedAt) => ({
deviceId,
externalMediaId: asset.externalMediaId ?? null,
fileId: asset.fileId,
mimeType: asset.mimeType,
filename: asset.filename ?? null,
capturedAt,
sizeBytes: asset.sizeBytes ?? null,
hash: asset.hash ?? null,
metadata: asset.metadata,
createdAt: now,
updatedAt: now,
})),
),
);
return parseResult.andThen((rows) =>
ResultAsync.fromPromise(
this.db
.insert(mobileMediaAsset)
.values(rows)
.onConflictDoNothing()
.returning({ id: mobileMediaAsset.id }),
(error) => {
logDomainEvent({
level: "error",
event: "mobile.media.sync.failed",
fctx,
durationMs: Date.now() - startedAt,
error,
meta: { deviceId, received: assets.length },
});
return mobileErrors.syncMediaFailed(
fctx,
error instanceof Error ? error.message : String(error),
);
},
).map((insertedRows) => {
logDomainEvent({
event: "mobile.media.sync.succeeded",
fctx,
durationMs: Date.now() - startedAt,
meta: {
deviceId,
received: assets.length,
inserted: insertedRows.length,
},
});
return { received: assets.length, inserted: insertedRows.length };
}),
);
}
findMediaAssetByExternalMediaId(
fctx: FlowExecCtx,
deviceId: number,
externalMediaId: string,
): ResultAsync<{ id: number; fileId: string } | null, Err> {
return ResultAsync.fromPromise(
this.db
.select({
id: mobileMediaAsset.id,
fileId: mobileMediaAsset.fileId,
})
.from(mobileMediaAsset)
.where(
and(
eq(mobileMediaAsset.deviceId, deviceId),
eq(mobileMediaAsset.externalMediaId, externalMediaId),
),
)
.limit(1),
(error) =>
mobileErrors.listMediaFailed(
fctx,
error instanceof Error ? error.message : String(error),
),
).map((rows) => rows[0] ?? null);
}
findMediaAssetByHash(
fctx: FlowExecCtx,
deviceId: number,
hash: string,
): ResultAsync<{ id: number; fileId: string } | null, Err> {
return ResultAsync.fromPromise(
this.db
.select({
id: mobileMediaAsset.id,
fileId: mobileMediaAsset.fileId,
})
.from(mobileMediaAsset)
.where(
and(
eq(mobileMediaAsset.deviceId, deviceId),
eq(mobileMediaAsset.hash, hash),
),
)
.limit(1),
(error) =>
mobileErrors.listMediaFailed(
fctx,
error instanceof Error ? error.message : String(error),
),
).map((rows) => rows[0] ?? null);
}
listDevices(
fctx: FlowExecCtx,
filters: ListMobileDevicesFilters,
pagination: MobilePagination,
): ResultAsync<PaginatedMobileDevices, Err> {
const startedAt = Date.now();
const conditions = [eq(mobileDevice.ownerUserId, filters.ownerUserId)];
if (filters.search) {
conditions.push(
or(
like(mobileDevice.name, `%${filters.search}%`),
like(mobileDevice.externalDeviceId, `%${filters.search}%`),
like(mobileDevice.manufacturer, `%${filters.search}%`),
like(mobileDevice.model, `%${filters.search}%`),
)!,
);
}
const whereClause = and(...conditions);
return ResultAsync.fromPromise(
this.db.select({ count: count() }).from(mobileDevice).where(whereClause),
(error) =>
mobileErrors.listDevicesFailed(
fctx,
error instanceof Error ? error.message : String(error),
),
).andThen((countRows) => {
const total = countRows[0]?.count || 0;
const page = pagination.page;
const pageSize = pagination.pageSize;
const offset = (page - 1) * pageSize;
const orderColumn =
pagination.sortBy === "createdAt"
? mobileDevice.createdAt
: pagination.sortBy === "updatedAt"
? mobileDevice.updatedAt
: mobileDevice.lastPingAt;
const orderFn = pagination.sortOrder === "asc" ? asc : desc;
return ResultAsync.fromPromise(
this.db
.select()
.from(mobileDevice)
.where(whereClause)
.orderBy(orderFn(orderColumn), desc(mobileDevice.id))
.limit(pageSize)
.offset(offset),
(error) =>
mobileErrors.listDevicesFailed(
fctx,
error instanceof Error ? error.message : String(error),
),
).map((rows) => ({
data: rows as MobileDevice[],
total,
page,
pageSize,
totalPages: Math.ceil(total / pageSize),
}));
}).map((result) => {
logDomainEvent({
event: "mobile.devices.list.succeeded",
fctx,
durationMs: Date.now() - startedAt,
meta: { count: result.data.length, total: result.total },
});
return result;
});
}
getDeviceDetail(
fctx: FlowExecCtx,
deviceId: number,
ownerUserId: string,
): ResultAsync<MobileDeviceDetail, Err> {
const startedAt = Date.now();
return ResultAsync.fromPromise(
this.db
.select()
.from(mobileDevice)
.where(
and(
eq(mobileDevice.id, deviceId),
eq(mobileDevice.ownerUserId, ownerUserId),
),
)
.limit(1),
(error) =>
mobileErrors.listDevicesFailed(
fctx,
error instanceof Error ? error.message : String(error),
),
).andThen((devices) => {
const device = devices[0];
if (!device) {
return errAsync(mobileErrors.deviceNotFoundById(fctx, deviceId));
}
return ResultAsync.combine([
ResultAsync.fromPromise(
this.db
.select({ count: count() })
.from(mobileSMS)
.where(eq(mobileSMS.deviceId, deviceId)),
(error) =>
mobileErrors.listSMSFailed(
fctx,
error instanceof Error ? error.message : String(error),
),
),
ResultAsync.fromPromise(
this.db
.select({ count: count() })
.from(mobileMediaAsset)
.where(eq(mobileMediaAsset.deviceId, deviceId)),
(error) =>
mobileErrors.listMediaFailed(
fctx,
error instanceof Error ? error.message : String(error),
),
),
]).map(([smsCountRows, mediaCountRows]) => ({
device: device as MobileDevice,
smsCount: smsCountRows[0]?.count || 0,
mediaCount: mediaCountRows[0]?.count || 0,
}));
}).map((result) => {
logDomainEvent({
event: "mobile.device.detail.succeeded",
fctx,
durationMs: Date.now() - startedAt,
meta: { deviceId },
});
return result;
});
}
listDeviceSMS(
fctx: FlowExecCtx,
filters: ListMobileDeviceSMSFilters,
pagination: MobilePagination,
): ResultAsync<PaginatedMobileSMS, Err> {
const conditions = [eq(mobileSMS.deviceId, filters.deviceId)];
if (filters.search) {
conditions.push(
or(
like(mobileSMS.sender, `%${filters.search}%`),
like(mobileSMS.recipient, `%${filters.search}%`),
like(mobileSMS.body, `%${filters.search}%`),
)!,
);
}
const whereClause = and(...conditions);
const page = pagination.page;
const pageSize = pagination.pageSize;
const offset = (page - 1) * pageSize;
const orderFn = pagination.sortOrder === "asc" ? asc : desc;
return ResultAsync.fromPromise(
this.db.select({ count: count() }).from(mobileSMS).where(whereClause),
(error) =>
mobileErrors.listSMSFailed(
fctx,
error instanceof Error ? error.message : String(error),
),
).andThen((countRows) =>
ResultAsync.fromPromise(
this.db
.select()
.from(mobileSMS)
.where(whereClause)
.orderBy(orderFn(mobileSMS.sentAt), desc(mobileSMS.id))
.limit(pageSize)
.offset(offset),
(error) =>
mobileErrors.listSMSFailed(
fctx,
error instanceof Error ? error.message : String(error),
),
).map((rows) => {
const total = countRows[0]?.count || 0;
return {
data: rows,
total,
page,
pageSize,
totalPages: Math.ceil(total / pageSize),
};
}),
);
}
listDeviceMedia(
fctx: FlowExecCtx,
filters: ListMobileDeviceMediaFilters,
pagination: MobilePagination,
): ResultAsync<PaginatedMobileMedia, Err> {
const conditions = [eq(mobileMediaAsset.deviceId, filters.deviceId)];
if (filters.mimeType) {
conditions.push(like(mobileMediaAsset.mimeType, `${filters.mimeType}%`));
}
if (filters.search) {
conditions.push(like(mobileMediaAsset.filename, `%${filters.search}%`));
}
const whereClause = and(...conditions);
const page = pagination.page;
const pageSize = pagination.pageSize;
const offset = (page - 1) * pageSize;
const orderFn = pagination.sortOrder === "asc" ? asc : desc;
return ResultAsync.fromPromise(
this.db
.select({ count: count() })
.from(mobileMediaAsset)
.where(whereClause),
(error) =>
mobileErrors.listMediaFailed(
fctx,
error instanceof Error ? error.message : String(error),
),
).andThen((countRows) =>
ResultAsync.fromPromise(
this.db
.select({
id: mobileMediaAsset.id,
deviceId: mobileMediaAsset.deviceId,
externalMediaId: mobileMediaAsset.externalMediaId,
fileId: mobileMediaAsset.fileId,
r2Url: file.r2Url,
mimeType: mobileMediaAsset.mimeType,
filename: mobileMediaAsset.filename,
capturedAt: mobileMediaAsset.capturedAt,
sizeBytes: mobileMediaAsset.sizeBytes,
hash: mobileMediaAsset.hash,
metadata: mobileMediaAsset.metadata,
createdAt: mobileMediaAsset.createdAt,
updatedAt: mobileMediaAsset.updatedAt,
})
.from(mobileMediaAsset)
.leftJoin(file, eq(mobileMediaAsset.fileId, file.id))
.where(whereClause)
.orderBy(orderFn(mobileMediaAsset.createdAt), desc(mobileMediaAsset.id))
.limit(pageSize)
.offset(offset),
(error) =>
mobileErrors.listMediaFailed(
fctx,
error instanceof Error ? error.message : String(error),
),
).map((rows) => {
const total = countRows[0]?.count || 0;
return {
data: rows,
total,
page,
pageSize,
totalPages: Math.ceil(total / pageSize),
};
}),
);
}
deleteMediaAsset(
fctx: FlowExecCtx,
mediaAssetId: number,
ownerUserId: string,
): ResultAsync<string, Err> {
const startedAt = Date.now();
return ResultAsync.fromPromise(
this.db
.select({
mediaAssetId: mobileMediaAsset.id,
fileId: mobileMediaAsset.fileId,
ownerUserId: mobileDevice.ownerUserId,
})
.from(mobileMediaAsset)
.innerJoin(mobileDevice, eq(mobileMediaAsset.deviceId, mobileDevice.id))
.where(
and(
eq(mobileMediaAsset.id, mediaAssetId),
eq(mobileDevice.ownerUserId, ownerUserId),
),
)
.limit(1),
(error) =>
mobileErrors.deleteMediaFailed(
fctx,
error instanceof Error ? error.message : String(error),
),
).andThen((rows) => {
const target = rows[0];
if (!target) {
return errAsync(mobileErrors.mediaAssetNotFound(fctx, mediaAssetId));
}
return ResultAsync.fromPromise(
this.db
.delete(mobileMediaAsset)
.where(eq(mobileMediaAsset.id, mediaAssetId))
.returning({ fileId: mobileMediaAsset.fileId }),
(error) =>
mobileErrors.deleteMediaFailed(
fctx,
error instanceof Error ? error.message : String(error),
),
).andThen((deletedRows) => {
const deleted = deletedRows[0];
if (!deleted) {
return errAsync(
mobileErrors.mediaAssetNotFound(fctx, mediaAssetId),
);
}
return okAsync(deleted.fileId);
});
}).map((fileId) => {
logDomainEvent({
event: "mobile.media.delete.succeeded",
fctx,
durationMs: Date.now() - startedAt,
meta: { mediaAssetId, ownerUserId, fileId },
});
return fileId;
});
}
deleteDevice(
fctx: FlowExecCtx,
deviceId: number,
ownerUserId: string,
): ResultAsync<{ fileIds: string[] }, Err> {
const startedAt = Date.now();
return ResultAsync.fromPromise(
this.db
.select({ id: mobileDevice.id })
.from(mobileDevice)
.where(
and(
eq(mobileDevice.id, deviceId),
eq(mobileDevice.ownerUserId, ownerUserId),
),
)
.limit(1),
(error) =>
mobileErrors.deleteDeviceFailed(
fctx,
error instanceof Error ? error.message : String(error),
),
).andThen((devices) => {
if (!devices[0]) {
return errAsync(mobileErrors.deviceNotFoundById(fctx, deviceId));
}
return ResultAsync.fromPromise(
this.db
.select({ fileId: mobileMediaAsset.fileId })
.from(mobileMediaAsset)
.where(eq(mobileMediaAsset.deviceId, deviceId)),
(error) =>
mobileErrors.deleteDeviceFailed(
fctx,
error instanceof Error ? error.message : String(error),
),
).map((rows) => ({
fileIds: [...new Set(rows.map((item) => item.fileId))],
}));
}).map((result) => {
logDomainEvent({
event: "mobile.device.delete.prepared",
fctx,
durationMs: Date.now() - startedAt,
meta: { deviceId, deletedFileCount: result.fileIds.length },
});
return result;
});
}
finalizeDeleteDevice(
fctx: FlowExecCtx,
deviceId: number,
ownerUserId: string,
): ResultAsync<boolean, Err> {
return ResultAsync.fromPromise(
this.db
.delete(mobileDevice)
.where(
and(
eq(mobileDevice.id, deviceId),
eq(mobileDevice.ownerUserId, ownerUserId),
),
)
.returning({ id: mobileDevice.id }),
(error) =>
mobileErrors.deleteDeviceFailed(
fctx,
error instanceof Error ? error.message : String(error),
),
).andThen((rows) => {
if (!rows[0]) {
return errAsync(mobileErrors.deviceNotFoundById(fctx, deviceId));
}
return okAsync(true);
});
}
}