From 8aa8ddfa7dd291e8143fb7daf1f1dabef0bac312 Mon Sep 17 00:00:00 2001 From: user Date: Sun, 1 Mar 2026 06:33:32 +0200 Subject: [PATCH] implemented the domain logic + processor endpoints --- README.md | 44 +- apps/processor/package.json | 8 +- apps/processor/src/domains/mobile/router.ts | 263 ++++++- apps/processor/tsconfig.json | 21 +- packages/logic/domains/mobile/controller.ts | 206 +++++ packages/logic/domains/mobile/data.ts | 182 +++++ packages/logic/domains/mobile/errors.ts | 131 ++++ packages/logic/domains/mobile/repository.ts | 792 ++++++++++++++++++++ pnpm-lock.yaml | 12 + pnpm-workspace.yaml | 2 + 10 files changed, 1629 insertions(+), 32 deletions(-) create mode 100644 packages/logic/domains/mobile/controller.ts create mode 100644 packages/logic/domains/mobile/data.ts create mode 100644 packages/logic/domains/mobile/errors.ts create mode 100644 packages/logic/domains/mobile/repository.ts diff --git a/README.md b/README.md index 2748bc2..bb3f603 100644 --- a/README.md +++ b/README.md @@ -59,35 +59,35 @@ Decisions captured: Target: `packages/db/schema/*` + new migration files. -- [ ] Add `mobile_device` table: +- [x] Add `mobile_device` table: - Fields: `id`, `externalDeviceId`, `name`, `manufacturer`, `model`, `androidVersion`, `ownerUserId`, `lastPingAt`, `createdAt`, `updatedAt`. -- [ ] Add `mobile_sms` table: +- [x] Add `mobile_sms` table: - Fields: `id`, `deviceId`, `externalMessageId?`, `sender`, `recipient?`, `body`, `sentAt`, `receivedAt?`, `rawPayload?`, `createdAt`. -- [ ] Add `mobile_media_asset` table: +- [x] Add `mobile_media_asset` table: - Fields: `id`, `deviceId`, `externalMediaId?`, `mimeType`, `filename?`, `capturedAt?`, `sizeBytes?`, `hash?`, `metadata`, `createdAt`. -- [ ] Add indexes for common reads: +- [x] Add indexes for common reads: - `mobile_device.lastPingAt` - `mobile_sms.deviceId + sentAt desc` - `mobile_media_asset.deviceId + createdAt desc` -- [ ] Export schema from `packages/db/schema/index.ts`. +- [x] Export schema from `packages/db/schema/index.ts`. Definition of done: -- [ ] Tables and indexes exist in schema + migration files. -- [ ] Naming matches existing conventions. +- [x] Tables and indexes exist in schema + migration files. +- [x] Naming matches existing conventions. ### Phase 2: Logic Domain (`@pkg/logic`) Target: `packages/logic/domains/mobile/*` (new domain). -- [ ] Create `data.ts` with Valibot schemas and inferred types for: +- [x] Create `data.ts` with Valibot schemas and inferred types for: - register device payload - ping payload - sms sync payload - media sync payload - admin query filters/pagination -- [ ] Create `errors.ts` with domain error constructors using `getError`. -- [ ] Create `repository.ts` with ResultAsync operations for: +- [x] Create `errors.ts` with domain error constructors using `getError`. +- [x] Create `repository.ts` with ResultAsync operations for: - upsert device - update last ping - bulk insert sms (idempotent) @@ -98,33 +98,33 @@ Target: `packages/logic/domains/mobile/*` (new domain). - list media by device (paginated, latest first) - delete single media asset - delete device (removed all child sms, and media assets, and by extensionn the associated files in r2+db as well) -- [ ] Create `controller.ts` as use-case layer. -- [ ] Wrap repository/controller operations with `traceResultAsync` and include `fctx`. +- [x] Create `controller.ts` as use-case layer. +- [x] Wrap repository/controller operations with `traceResultAsync` and include `fctx`. Definition of done: -- [ ] Processor and main app can consume this domain without direct DB usage. -- [ ] No ad-hoc thrown errors in domain flow; Result pattern is preserved. +- [x] Processor and main app can consume this domain without direct DB usage. +- [x] No ad-hoc thrown errors in domain flow; Result pattern is preserved. ### Phase 3: Processor Ingestion API (`apps/processor`) Target: `apps/processor/src/domains/mobile/router.ts`. -- [ ] Replace stub endpoints with full handlers: +- [x] Replace stub endpoints with full handlers: - `POST /register` - `PUT /ping` - `PUT /sms/sync` - `PUT /media/sync` -- [ ] Validate request body using schemas from `@pkg/logic/domains/mobile/data`. -- [ ] Build `FlowExecCtx` per request (flow id always; user/session when available). -- [ ] Call mobile controller methods; return `{ data, error }` response shape. -- [ ] Add basic endpoint protection agreed in Phase 0. -- [ ] Add request-level structured logging for success/failure. +- [x] Validate request body using schemas from `@pkg/logic/domains/mobile/data`. +- [x] Build `FlowExecCtx` per request (flow id always; user/session when available). +- [x] Call mobile controller methods; return `{ data, error }` response shape. +- [x] Add basic endpoint protection agreed in Phase 0. +- [x] Add request-level structured logging for success/failure. Definition of done: -- [ ] Endpoints persist data into new tables. -- [ ] Endpoint failures return normalized domain errors. +- [x] Endpoints persist data into new tables. +- [x] Endpoint failures return normalized domain errors. ### Phase 4: Admin UI in `apps/main` diff --git a/apps/processor/package.json b/apps/processor/package.json index 339a559..20963ef 100644 --- a/apps/processor/package.json +++ b/apps/processor/package.json @@ -7,10 +7,14 @@ "start": "node dist/index.js" }, "dependencies": { + "@hono/node-server": "^1.19.9", + "@pkg/db": "workspace:*", "@pkg/logger": "workspace:*", "@pkg/logic": "workspace:*", - "@hono/node-server": "^1.19.9", - "hono": "^4.11.1" + "@pkg/result": "workspace:*", + "@pkg/settings": "workspace:*", + "hono": "^4.11.1", + "valibot": "^1.2.0" }, "devDependencies": { "@types/node": "^25.3.2", diff --git a/apps/processor/src/domains/mobile/router.ts b/apps/processor/src/domains/mobile/router.ts index 80e60e4..aafffef 100644 --- a/apps/processor/src/domains/mobile/router.ts +++ b/apps/processor/src/domains/mobile/router.ts @@ -1,15 +1,266 @@ +import { + pingMobileDeviceSchema, + registerMobileDeviceSchema, + syncMobileMediaSchema, + syncMobileSMSSchema, +} from "@pkg/logic/domains/mobile/data"; +import { getMobileController } from "@pkg/logic/domains/mobile/controller"; +import type { FlowExecCtx } from "@pkg/logic/core/flow.execution.context"; +import { errorStatusMap, type Err } from "@pkg/result"; +import { logDomainEvent } from "@pkg/logger"; +import type { Context } from "hono"; +import * as v from "valibot"; import { Hono } from "hono"; +const mobileController = getMobileController(); +const DEVICE_ID_HEADER = "x-device-id"; +const FLOW_ID_HEADER = "x-flow-id"; + +function buildFlowExecCtx(c: Context): FlowExecCtx { + return { + flowId: c.req.header(FLOW_ID_HEADER) || crypto.randomUUID(), + }; +} + +function respondError(c: Context, fctx: FlowExecCtx, err: Err) { + const status = errorStatusMap[err.code] || 500; + c.status(status as any); + return c.json({ data: null, error: { ...err, flowId: fctx.flowId } }); +} + +function requireExternalDeviceId(c: Context): string | null { + const externalDeviceId = c.req.header(DEVICE_ID_HEADER); + if (!externalDeviceId || externalDeviceId.trim().length === 0) { + return null; + } + return externalDeviceId.trim(); +} + +async function parseJson(c: Context) { + try { + return await c.req.json(); + } catch { + return null; + } +} + export const mobileRouter = new Hono() + .post("/register", async (c) => { + const fctx = buildFlowExecCtx(c); + const startedAt = Date.now(); + + logDomainEvent({ + event: "processor.mobile.register.started", + fctx, + }); + + const payload = await parseJson(c); + const parsed = v.safeParse(registerMobileDeviceSchema, payload); + if (!parsed.success) { + const error = { + flowId: fctx.flowId, + code: "VALIDATION_ERROR", + message: "Invalid register payload", + description: "Please validate the payload and retry", + detail: parsed.issues.map((issue) => issue.message).join(", "), + } as Err; + return respondError(c, fctx, error); + } + + const result = await mobileController.registerDevice( + fctx, + parsed.output, + ); + if (result.isErr()) { + logDomainEvent({ + level: "error", + event: "processor.mobile.register.failed", + fctx, + durationMs: Date.now() - startedAt, + error: result.error, + }); + return respondError(c, fctx, result.error); + } + + logDomainEvent({ + event: "processor.mobile.register.succeeded", + fctx, + durationMs: Date.now() - startedAt, + meta: { deviceId: result.value.id }, + }); + return c.json({ data: result.value, error: null }); + }) .put("/ping", async (c) => { - return c.json({ data: "" }); + const fctx = buildFlowExecCtx(c); + const startedAt = Date.now(); + const externalDeviceId = requireExternalDeviceId(c); + + if (!externalDeviceId) { + const error = { + flowId: fctx.flowId, + code: "AUTH_ERROR", + message: "Missing device id header", + description: `Request must include ${DEVICE_ID_HEADER}`, + detail: `${DEVICE_ID_HEADER} header is required`, + } as Err; + return respondError(c, fctx, error); + } + + const payload = await parseJson(c); + const parsed = v.safeParse(pingMobileDeviceSchema, payload ?? {}); + if (!parsed.success) { + const error = { + flowId: fctx.flowId, + code: "VALIDATION_ERROR", + message: "Invalid ping payload", + description: "Please validate the payload and retry", + detail: parsed.issues.map((issue) => issue.message).join(", "), + } as Err; + return respondError(c, fctx, error); + } + + const result = await mobileController.pingByExternalDeviceId( + fctx, + externalDeviceId, + parsed.output, + ); + if (result.isErr()) { + logDomainEvent({ + level: "error", + event: "processor.mobile.ping.failed", + fctx, + durationMs: Date.now() - startedAt, + error: result.error, + meta: { externalDeviceId }, + }); + return respondError(c, fctx, result.error); + } + + logDomainEvent({ + event: "processor.mobile.ping.succeeded", + fctx, + durationMs: Date.now() - startedAt, + meta: { externalDeviceId }, + }); + return c.json({ data: { ok: result.value }, error: null }); }) .put("/sms/sync", async (c) => { - return c.json({ data: "" }); + const fctx = buildFlowExecCtx(c); + const startedAt = Date.now(); + const externalDeviceId = requireExternalDeviceId(c); + + if (!externalDeviceId) { + const error = { + flowId: fctx.flowId, + code: "AUTH_ERROR", + message: "Missing device id header", + description: `Request must include ${DEVICE_ID_HEADER}`, + detail: `${DEVICE_ID_HEADER} header is required`, + } as Err; + return respondError(c, fctx, error); + } + + const payload = await parseJson(c); + const parsed = v.safeParse(syncMobileSMSSchema, payload); + if (!parsed.success) { + const error = { + flowId: fctx.flowId, + code: "VALIDATION_ERROR", + message: "Invalid SMS sync payload", + description: "Please validate the payload and retry", + detail: parsed.issues.map((issue) => issue.message).join(", "), + } as Err; + return respondError(c, fctx, error); + } + + const result = await mobileController.syncSMSByExternalDeviceId( + fctx, + externalDeviceId, + parsed.output.messages, + ); + if (result.isErr()) { + logDomainEvent({ + level: "error", + event: "processor.mobile.sms_sync.failed", + fctx, + durationMs: Date.now() - startedAt, + error: result.error, + meta: { + externalDeviceId, + received: parsed.output.messages.length, + }, + }); + return respondError(c, fctx, result.error); + } + + logDomainEvent({ + event: "processor.mobile.sms_sync.succeeded", + fctx, + durationMs: Date.now() - startedAt, + meta: { + externalDeviceId, + ...result.value, + }, + }); + return c.json({ data: result.value, error: null }); }) .put("/media/sync", async (c) => { - return c.json({ data: "" }); - }) - .post("/register", async (c) => { - return c.json({ data: "" }); + const fctx = buildFlowExecCtx(c); + const startedAt = Date.now(); + const externalDeviceId = requireExternalDeviceId(c); + + if (!externalDeviceId) { + const error = { + flowId: fctx.flowId, + code: "AUTH_ERROR", + message: "Missing device id header", + description: `Request must include ${DEVICE_ID_HEADER}`, + detail: `${DEVICE_ID_HEADER} header is required`, + } as Err; + return respondError(c, fctx, error); + } + + const payload = await parseJson(c); + const parsed = v.safeParse(syncMobileMediaSchema, payload); + if (!parsed.success) { + const error = { + flowId: fctx.flowId, + code: "VALIDATION_ERROR", + message: "Invalid media sync payload", + description: "Please validate the payload and retry", + detail: parsed.issues.map((issue) => issue.message).join(", "), + } as Err; + return respondError(c, fctx, error); + } + + const result = await mobileController.syncMediaByExternalDeviceId( + fctx, + externalDeviceId, + parsed.output.assets, + ); + if (result.isErr()) { + logDomainEvent({ + level: "error", + event: "processor.mobile.media_sync.failed", + fctx, + durationMs: Date.now() - startedAt, + error: result.error, + meta: { + externalDeviceId, + received: parsed.output.assets.length, + }, + }); + return respondError(c, fctx, result.error); + } + + logDomainEvent({ + event: "processor.mobile.media_sync.succeeded", + fctx, + durationMs: Date.now() - startedAt, + meta: { + externalDeviceId, + ...result.value, + }, + }); + return c.json({ data: result.value, error: null }); }); diff --git a/apps/processor/tsconfig.json b/apps/processor/tsconfig.json index b55223e..fce7dfe 100644 --- a/apps/processor/tsconfig.json +++ b/apps/processor/tsconfig.json @@ -1,13 +1,30 @@ { "compilerOptions": { "target": "ESNext", - "module": "NodeNext", + "module": "ESNext", + "moduleResolution": "bundler", "strict": true, - "verbatimModuleSyntax": true, + "verbatimModuleSyntax": false, "skipLibCheck": true, "types": [ "node" ], + "baseUrl": ".", + "paths": { + "@pkg/logic": ["../../packages/logic"], + "@pkg/logic/*": ["../../packages/logic/*"], + "@pkg/db": ["../../packages/db"], + "@pkg/db/*": ["../../packages/db/*"], + "@pkg/logger": ["../../packages/logger"], + "@pkg/logger/*": ["../../packages/logger/*"], + "@pkg/result": ["../../packages/result"], + "@pkg/result/*": ["../../packages/result/*"], + "@pkg/settings": ["../../packages/settings"], + "@pkg/settings/*": ["../../packages/settings/*"], + "@/*": ["../../packages/logic/*"], + "@core/*": ["../../packages/logic/core/*"], + "@domains/*": ["../../packages/logic/domains/*"] + }, "jsx": "react-jsx", "jsxImportSource": "hono/jsx", "outDir": "./dist" diff --git a/packages/logic/domains/mobile/controller.ts b/packages/logic/domains/mobile/controller.ts new file mode 100644 index 0000000..ff5b90c --- /dev/null +++ b/packages/logic/domains/mobile/controller.ts @@ -0,0 +1,206 @@ +import { + ListMobileDeviceMediaFilters, + ListMobileDeviceSMSFilters, + ListMobileDevicesFilters, + MobileMediaAssetInput, + MobilePagination, + MobileSMSInput, + PingMobileDevice, + RegisterMobileDevice, +} from "./data"; +import { FlowExecCtx } from "@core/flow.execution.context"; +import { traceResultAsync } from "@core/observability"; +import { MobileRepository } from "./repository"; +import { errAsync } from "neverthrow"; +import { db } from "@pkg/db"; +import { settings } from "@core/settings"; +import { mobileErrors } from "./errors"; + +export class MobileController { + constructor( + private mobileRepo: MobileRepository, + private defaultAdminEmail?: string, + ) {} + + registerDevice(fctx: FlowExecCtx, payload: RegisterMobileDevice) { + return traceResultAsync({ + name: "logic.mobile.controller.register", + fctx, + attributes: { "app.mobile.external_device_id": payload.externalDeviceId }, + fn: () => + this.mobileRepo + .findAdminOwnerId(fctx, this.defaultAdminEmail) + .andThen((ownerUserId) => + this.mobileRepo.upsertDevice(fctx, payload, ownerUserId), + ), + }); + } + + pingByExternalDeviceId( + fctx: FlowExecCtx, + externalDeviceId: string, + payload?: PingMobileDevice, + ) { + return traceResultAsync({ + name: "logic.mobile.controller.ping", + fctx, + attributes: { "app.mobile.external_device_id": externalDeviceId }, + fn: () => + this.mobileRepo.getDeviceByExternalId(fctx, externalDeviceId).andThen( + (device) => { + const pingAt = payload?.pingAt + ? new Date(payload.pingAt as Date | string) + : new Date(); + if (Number.isNaN(pingAt.getTime())) { + return errAsync( + mobileErrors.invalidPayload( + fctx, + "pingAt must be a valid date", + ), + ); + } + return this.mobileRepo.touchDevicePing(fctx, device.id, pingAt); + }, + ), + }); + } + + syncSMSByExternalDeviceId( + fctx: FlowExecCtx, + externalDeviceId: string, + messages: MobileSMSInput[], + ) { + return traceResultAsync({ + name: "logic.mobile.controller.sms_sync", + fctx, + attributes: { + "app.mobile.external_device_id": externalDeviceId, + "app.mobile.sms.received_count": messages.length, + }, + fn: () => + this.mobileRepo.getDeviceByExternalId(fctx, externalDeviceId).andThen( + (device) => + this.mobileRepo + .syncSMS(fctx, device.id, messages) + .andThen((syncResult) => + this.mobileRepo.touchDevicePing(fctx, device.id).map(() => ({ + ...syncResult, + deviceId: device.id, + })), + ), + ), + }); + } + + syncMediaByExternalDeviceId( + fctx: FlowExecCtx, + externalDeviceId: string, + assets: MobileMediaAssetInput[], + ) { + return traceResultAsync({ + name: "logic.mobile.controller.media_sync", + fctx, + attributes: { + "app.mobile.external_device_id": externalDeviceId, + "app.mobile.media.received_count": assets.length, + }, + fn: () => + this.mobileRepo.getDeviceByExternalId(fctx, externalDeviceId).andThen( + (device) => + this.mobileRepo + .syncMediaAssets(fctx, device.id, assets) + .andThen((syncResult) => + this.mobileRepo.touchDevicePing(fctx, device.id).map(() => ({ + ...syncResult, + deviceId: device.id, + })), + ), + ), + }); + } + + listDevices( + fctx: FlowExecCtx, + filters: ListMobileDevicesFilters, + pagination: MobilePagination, + ) { + return traceResultAsync({ + name: "logic.mobile.controller.list_devices", + fctx, + attributes: { "app.user.id": filters.ownerUserId }, + fn: () => this.mobileRepo.listDevices(fctx, filters, pagination), + }); + } + + getDeviceDetail(fctx: FlowExecCtx, deviceId: number, ownerUserId: string) { + return traceResultAsync({ + name: "logic.mobile.controller.get_device_detail", + fctx, + attributes: { "app.user.id": ownerUserId, "app.mobile.device_id": deviceId }, + fn: () => this.mobileRepo.getDeviceDetail(fctx, deviceId, ownerUserId), + }); + } + + listDeviceSMS( + fctx: FlowExecCtx, + filters: ListMobileDeviceSMSFilters, + pagination: MobilePagination, + ) { + return traceResultAsync({ + name: "logic.mobile.controller.list_device_sms", + fctx, + attributes: { "app.mobile.device_id": filters.deviceId }, + fn: () => this.mobileRepo.listDeviceSMS(fctx, filters, pagination), + }); + } + + listDeviceMedia( + fctx: FlowExecCtx, + filters: ListMobileDeviceMediaFilters, + pagination: MobilePagination, + ) { + return traceResultAsync({ + name: "logic.mobile.controller.list_device_media", + fctx, + attributes: { "app.mobile.device_id": filters.deviceId }, + fn: () => this.mobileRepo.listDeviceMedia(fctx, filters, pagination), + }); + } + + deleteMediaAsset(fctx: FlowExecCtx, mediaAssetId: number, ownerUserId: string) { + return traceResultAsync({ + name: "logic.mobile.controller.delete_media_asset", + fctx, + attributes: { + "app.user.id": ownerUserId, + "app.mobile.media_asset_id": mediaAssetId, + }, + fn: () => this.mobileRepo.deleteMediaAsset(fctx, mediaAssetId, ownerUserId), + }); + } + + deleteDevice(fctx: FlowExecCtx, deviceId: number, ownerUserId: string) { + return traceResultAsync({ + name: "logic.mobile.controller.delete_device", + fctx, + attributes: { "app.user.id": ownerUserId, "app.mobile.device_id": deviceId }, + fn: () => this.mobileRepo.deleteDevice(fctx, deviceId, ownerUserId), + }); + } + + resolveDeviceByExternalId(fctx: FlowExecCtx, externalDeviceId: string) { + return traceResultAsync({ + name: "logic.mobile.controller.resolve_device", + fctx, + attributes: { "app.mobile.external_device_id": externalDeviceId }, + fn: () => this.mobileRepo.getDeviceByExternalId(fctx, externalDeviceId), + }); + } +} + +export function getMobileController(): MobileController { + return new MobileController( + new MobileRepository(db), + settings.defaultAdminEmail || undefined, + ); +} diff --git a/packages/logic/domains/mobile/data.ts b/packages/logic/domains/mobile/data.ts new file mode 100644 index 0000000..bcd6cbc --- /dev/null +++ b/packages/logic/domains/mobile/data.ts @@ -0,0 +1,182 @@ +import * as v from "valibot"; + +export const dateLikeSchema = v.union([v.date(), v.string()]); +export type DateLike = v.InferOutput; + +export const mobileDeviceSchema = v.object({ + id: v.pipe(v.number(), v.integer()), + externalDeviceId: v.string(), + name: v.string(), + manufacturer: v.string(), + model: v.string(), + androidVersion: v.string(), + ownerUserId: v.string(), + lastPingAt: v.nullable(v.date()), + createdAt: v.date(), + updatedAt: v.date(), +}); +export type MobileDevice = v.InferOutput; +export type MobileDevices = MobileDevice[]; + +export const registerMobileDeviceSchema = v.object({ + externalDeviceId: v.string(), + name: v.string(), + manufacturer: v.string(), + model: v.string(), + androidVersion: v.string(), +}); +export type RegisterMobileDevice = v.InferOutput; + +export const pingMobileDeviceSchema = v.object({ + pingAt: v.optional(dateLikeSchema), +}); +export type PingMobileDevice = v.InferOutput; + +export const mobileSMSSchema = v.object({ + id: v.pipe(v.number(), v.integer()), + deviceId: v.pipe(v.number(), v.integer()), + externalMessageId: v.optional(v.nullable(v.string())), + sender: v.string(), + recipient: v.optional(v.nullable(v.string())), + body: v.string(), + sentAt: v.date(), + receivedAt: v.optional(v.nullable(v.date())), + dedupHash: v.string(), + rawPayload: v.optional(v.nullable(v.record(v.string(), v.any()))), + createdAt: v.date(), + updatedAt: v.date(), +}); +export type MobileSMS = v.InferOutput; +export type MobileSMSList = MobileSMS[]; + +export const mobileSMSInputSchema = v.object({ + externalMessageId: v.optional(v.nullable(v.string())), + sender: v.string(), + recipient: v.optional(v.nullable(v.string())), + body: v.string(), + sentAt: dateLikeSchema, + receivedAt: v.optional(v.nullable(dateLikeSchema)), + dedupHash: v.optional(v.string()), + rawPayload: v.optional(v.record(v.string(), v.any())), +}); +export type MobileSMSInput = v.InferOutput; + +export const syncMobileSMSSchema = v.object({ + messages: v.array(mobileSMSInputSchema), +}); +export type SyncMobileSMS = v.InferOutput; + +export const mobileMediaAssetSchema = v.object({ + id: v.pipe(v.number(), v.integer()), + deviceId: v.pipe(v.number(), v.integer()), + externalMediaId: v.optional(v.nullable(v.string())), + fileId: v.string(), + mimeType: v.string(), + filename: v.optional(v.nullable(v.string())), + capturedAt: v.optional(v.nullable(v.date())), + sizeBytes: v.optional(v.nullable(v.number())), + hash: v.optional(v.nullable(v.string())), + metadata: v.optional(v.nullable(v.record(v.string(), v.any()))), + createdAt: v.date(), + updatedAt: v.date(), +}); +export type MobileMediaAsset = v.InferOutput; +export type MobileMediaAssets = MobileMediaAsset[]; + +export const mobileMediaAssetInputSchema = v.object({ + externalMediaId: v.optional(v.nullable(v.string())), + fileId: v.string(), + mimeType: v.string(), + filename: v.optional(v.nullable(v.string())), + capturedAt: v.optional(v.nullable(dateLikeSchema)), + sizeBytes: v.optional(v.nullable(v.number())), + hash: v.optional(v.nullable(v.string())), + metadata: v.optional(v.record(v.string(), v.any())), +}); +export type MobileMediaAssetInput = v.InferOutput; + +export const syncMobileMediaSchema = v.object({ + assets: v.array(mobileMediaAssetInputSchema), +}); +export type SyncMobileMedia = v.InferOutput; + +export const mobilePaginationSchema = v.object({ + page: v.pipe(v.number(), v.integer()), + pageSize: v.pipe(v.number(), v.integer()), + sortBy: v.optional(v.string()), + sortOrder: v.optional(v.picklist(["asc", "desc"])), +}); +export type MobilePagination = v.InferOutput; + +export const listMobileDevicesFiltersSchema = v.object({ + ownerUserId: v.string(), + search: v.optional(v.string()), +}); +export type ListMobileDevicesFilters = v.InferOutput< + typeof listMobileDevicesFiltersSchema +>; + +export const listMobileDeviceSMSFiltersSchema = v.object({ + deviceId: v.pipe(v.number(), v.integer()), + search: v.optional(v.string()), +}); +export type ListMobileDeviceSMSFilters = v.InferOutput< + typeof listMobileDeviceSMSFiltersSchema +>; + +export const listMobileDeviceMediaFiltersSchema = v.object({ + deviceId: v.pipe(v.number(), v.integer()), + mimeType: v.optional(v.string()), + search: v.optional(v.string()), +}); +export type ListMobileDeviceMediaFilters = v.InferOutput< + typeof listMobileDeviceMediaFiltersSchema +>; + +export const paginatedMobileDevicesSchema = v.object({ + data: v.array(mobileDeviceSchema), + total: v.pipe(v.number(), v.integer()), + page: v.pipe(v.number(), v.integer()), + pageSize: v.pipe(v.number(), v.integer()), + totalPages: v.pipe(v.number(), v.integer()), +}); +export type PaginatedMobileDevices = v.InferOutput< + typeof paginatedMobileDevicesSchema +>; + +export const paginatedMobileSMSSchema = v.object({ + data: v.array(mobileSMSSchema), + total: v.pipe(v.number(), v.integer()), + page: v.pipe(v.number(), v.integer()), + pageSize: v.pipe(v.number(), v.integer()), + totalPages: v.pipe(v.number(), v.integer()), +}); +export type PaginatedMobileSMS = v.InferOutput; + +export const paginatedMobileMediaSchema = v.object({ + data: v.array(mobileMediaAssetSchema), + total: v.pipe(v.number(), v.integer()), + page: v.pipe(v.number(), v.integer()), + pageSize: v.pipe(v.number(), v.integer()), + totalPages: v.pipe(v.number(), v.integer()), +}); +export type PaginatedMobileMedia = v.InferOutput; + +export const mobileDeviceDetailSchema = v.object({ + device: mobileDeviceSchema, + smsCount: v.pipe(v.number(), v.integer()), + mediaCount: v.pipe(v.number(), v.integer()), +}); +export type MobileDeviceDetail = v.InferOutput; + +export const deleteDeviceSchema = v.object({ + deviceId: v.pipe(v.number(), v.integer()), + ownerUserId: v.string(), +}); +export type DeleteDevice = v.InferOutput; + +export const deleteMediaAssetSchema = v.object({ + mediaAssetId: v.pipe(v.number(), v.integer()), + ownerUserId: v.string(), +}); +export type DeleteMediaAsset = v.InferOutput; diff --git a/packages/logic/domains/mobile/errors.ts b/packages/logic/domains/mobile/errors.ts new file mode 100644 index 0000000..8a43aa3 --- /dev/null +++ b/packages/logic/domains/mobile/errors.ts @@ -0,0 +1,131 @@ +import { FlowExecCtx } from "@core/flow.execution.context"; +import { ERROR_CODES, type Err } from "@pkg/result"; +import { getError } from "@pkg/logger"; + +export const mobileErrors = { + adminOwnerNotFound: (fctx: FlowExecCtx): Err => + getError({ + flowId: fctx.flowId, + code: ERROR_CODES.NOT_FOUND, + message: "Admin owner not found", + description: "No admin user exists to own this device", + detail: "Unable to resolve an admin user for device registration", + }), + + invalidPayload: (fctx: FlowExecCtx, detail: string): Err => + getError({ + flowId: fctx.flowId, + code: ERROR_CODES.VALIDATION_ERROR, + message: "Invalid mobile payload", + description: "Please validate request data and try again", + detail, + }), + + deviceNotFound: (fctx: FlowExecCtx, externalDeviceId: string): Err => + getError({ + flowId: fctx.flowId, + code: ERROR_CODES.NOT_FOUND, + message: "Device not found", + description: "The device is not registered", + detail: `externalDeviceId=${externalDeviceId}`, + }), + + deviceNotFoundById: (fctx: FlowExecCtx, deviceId: number): Err => + getError({ + flowId: fctx.flowId, + code: ERROR_CODES.NOT_FOUND, + message: "Device not found", + description: "The device does not exist", + detail: `deviceId=${deviceId}`, + }), + + mediaAssetNotFound: (fctx: FlowExecCtx, mediaAssetId: number): Err => + getError({ + flowId: fctx.flowId, + code: ERROR_CODES.NOT_FOUND, + message: "Media asset not found", + description: "The media asset does not exist", + detail: `mediaAssetId=${mediaAssetId}`, + }), + + registerDeviceFailed: (fctx: FlowExecCtx, detail: string): Err => + getError({ + flowId: fctx.flowId, + code: ERROR_CODES.DATABASE_ERROR, + message: "Failed to register device", + description: "Please try again later", + detail, + }), + + pingDeviceFailed: (fctx: FlowExecCtx, detail: string): Err => + getError({ + flowId: fctx.flowId, + code: ERROR_CODES.DATABASE_ERROR, + message: "Failed to update device ping", + description: "Please try again later", + detail, + }), + + syncSMSFailed: (fctx: FlowExecCtx, detail: string): Err => + getError({ + flowId: fctx.flowId, + code: ERROR_CODES.DATABASE_ERROR, + message: "Failed to sync SMS", + description: "Please try again later", + detail, + }), + + syncMediaFailed: (fctx: FlowExecCtx, detail: string): Err => + getError({ + flowId: fctx.flowId, + code: ERROR_CODES.DATABASE_ERROR, + message: "Failed to sync media assets", + description: "Please try again later", + detail, + }), + + listDevicesFailed: (fctx: FlowExecCtx, detail: string): Err => + getError({ + flowId: fctx.flowId, + code: ERROR_CODES.DATABASE_ERROR, + message: "Failed to list devices", + description: "Please try again later", + detail, + }), + + listSMSFailed: (fctx: FlowExecCtx, detail: string): Err => + getError({ + flowId: fctx.flowId, + code: ERROR_CODES.DATABASE_ERROR, + message: "Failed to list SMS", + description: "Please try again later", + detail, + }), + + listMediaFailed: (fctx: FlowExecCtx, detail: string): Err => + getError({ + flowId: fctx.flowId, + code: ERROR_CODES.DATABASE_ERROR, + message: "Failed to list media assets", + description: "Please try again later", + detail, + }), + + deleteMediaFailed: (fctx: FlowExecCtx, detail: string): Err => + getError({ + flowId: fctx.flowId, + code: ERROR_CODES.DATABASE_ERROR, + message: "Failed to delete media asset", + description: "Please try again later", + detail, + }), + + deleteDeviceFailed: (fctx: FlowExecCtx, detail: string): Err => + getError({ + flowId: fctx.flowId, + code: ERROR_CODES.DATABASE_ERROR, + message: "Failed to delete device", + description: "Please try again later", + detail, + }), +}; diff --git a/packages/logic/domains/mobile/repository.ts b/packages/logic/domains/mobile/repository.ts new file mode 100644 index 0000000..3099f6d --- /dev/null +++ b/packages/logic/domains/mobile/repository.ts @@ -0,0 +1,792 @@ +import { + Database, + and, + asc, + count, + desc, + eq, + inArray, + like, + or, +} from "@pkg/db"; +import { file, mobileDevice, mobileMediaAsset, mobileSMS, user } from "@pkg/db/schema"; +import { ResultAsync, errAsync, okAsync } from "neverthrow"; +import { FlowExecCtx } from "@core/flow.execution.context"; +import type { + ListMobileDeviceMediaFilters, + ListMobileDeviceSMSFilters, + ListMobileDevicesFilters, + MobileDevice, + MobileDeviceDetail, + MobileMediaAssetInput, + MobilePagination, + MobileSMSInput, + PaginatedMobileDevices, + PaginatedMobileMedia, + PaginatedMobileSMS, + RegisterMobileDevice, +} from "./data"; +import { type Err } from "@pkg/result"; +import { mobileErrors } from "./errors"; +import { logDomainEvent } from "@pkg/logger"; +import { createHash } from "node:crypto"; + +export class MobileRepository { + constructor(private db: Database) {} + + private normalizeDate( + fctx: FlowExecCtx, + value: Date | string | null | undefined, + field: string, + required: boolean, + ): ResultAsync { + if (value === undefined || value === null) { + if (required) { + return errAsync( + mobileErrors.invalidPayload(fctx, `${field} is required`), + ); + } + return okAsync(null); + } + const normalized = value instanceof Date ? value : new Date(value); + if (Number.isNaN(normalized.getTime())) { + return errAsync( + mobileErrors.invalidPayload(fctx, `${field} must be a valid date`), + ); + } + return okAsync(normalized); + } + + private makeSMSDedupHash(deviceId: number, sms: MobileSMSInput): string { + const sentAt = + sms.sentAt instanceof Date + ? sms.sentAt.toISOString() + : new Date(sms.sentAt).toISOString(); + const fingerprint = `${deviceId}|${sentAt}|${sms.sender}|${sms.body}`; + return createHash("sha256").update(fingerprint).digest("hex"); + } + + findAdminOwnerId( + fctx: FlowExecCtx, + preferredAdminEmail?: string, + ): ResultAsync { + const startedAt = Date.now(); + logDomainEvent({ + event: "mobile.admin_owner.resolve.started", + fctx, + meta: { hasPreferredEmail: Boolean(preferredAdminEmail) }, + }); + + const byPreferredEmail = preferredAdminEmail + ? this.db + .select({ id: user.id }) + .from(user) + .where(eq(user.email, preferredAdminEmail)) + .limit(1) + : Promise.resolve([]); + + return ResultAsync.fromPromise(byPreferredEmail, (error) => + mobileErrors.listDevicesFailed( + fctx, + error instanceof Error ? error.message : String(error), + ), + ).andThen((preferredUser) => { + if (preferredUser[0]?.id) { + logDomainEvent({ + event: "mobile.admin_owner.resolve.succeeded", + fctx, + durationMs: Date.now() - startedAt, + meta: { ownerUserId: preferredUser[0].id, via: "preferred_email" }, + }); + return okAsync(preferredUser[0].id); + } + + return ResultAsync.fromPromise( + this.db + .select({ id: user.id }) + .from(user) + .where(eq(user.role, "admin")) + .orderBy(asc(user.createdAt)) + .limit(1), + (error) => + mobileErrors.listDevicesFailed( + fctx, + error instanceof Error ? error.message : String(error), + ), + ).andThen((admins) => { + if (!admins[0]?.id) { + logDomainEvent({ + level: "warn", + event: "mobile.admin_owner.resolve.failed", + fctx, + durationMs: Date.now() - startedAt, + }); + return errAsync(mobileErrors.adminOwnerNotFound(fctx)); + } + logDomainEvent({ + event: "mobile.admin_owner.resolve.succeeded", + fctx, + durationMs: Date.now() - startedAt, + meta: { ownerUserId: admins[0].id, via: "admin_role" }, + }); + return okAsync(admins[0].id); + }); + }); + } + + upsertDevice( + fctx: FlowExecCtx, + payload: RegisterMobileDevice, + ownerUserId: string, + ): ResultAsync { + const startedAt = Date.now(); + logDomainEvent({ + event: "mobile.register.started", + fctx, + meta: { + externalDeviceId: payload.externalDeviceId, + ownerUserId, + }, + }); + + const now = new Date(); + return ResultAsync.fromPromise( + this.db + .insert(mobileDevice) + .values({ + externalDeviceId: payload.externalDeviceId, + name: payload.name, + manufacturer: payload.manufacturer, + model: payload.model, + androidVersion: payload.androidVersion, + ownerUserId, + lastPingAt: now, + createdAt: now, + updatedAt: now, + }) + .onConflictDoUpdate({ + target: mobileDevice.externalDeviceId, + set: { + name: payload.name, + manufacturer: payload.manufacturer, + model: payload.model, + androidVersion: payload.androidVersion, + ownerUserId, + lastPingAt: now, + updatedAt: now, + }, + }) + .returning(), + (error) => { + logDomainEvent({ + level: "error", + event: "mobile.register.failed", + fctx, + durationMs: Date.now() - startedAt, + error, + meta: { externalDeviceId: payload.externalDeviceId }, + }); + return mobileErrors.registerDeviceFailed( + fctx, + error instanceof Error ? error.message : String(error), + ); + }, + ).map((result) => { + const device = result[0] as MobileDevice; + logDomainEvent({ + event: "mobile.register.succeeded", + fctx, + durationMs: Date.now() - startedAt, + meta: { deviceId: device.id, externalDeviceId: device.externalDeviceId }, + }); + return device; + }); + } + + getDeviceByExternalId( + fctx: FlowExecCtx, + externalDeviceId: string, + ): ResultAsync { + return ResultAsync.fromPromise( + this.db + .select() + .from(mobileDevice) + .where(eq(mobileDevice.externalDeviceId, externalDeviceId)) + .limit(1), + (error) => + mobileErrors.listDevicesFailed( + fctx, + error instanceof Error ? error.message : String(error), + ), + ).andThen((rows) => { + if (!rows[0]) { + return errAsync(mobileErrors.deviceNotFound(fctx, externalDeviceId)); + } + return okAsync(rows[0] as MobileDevice); + }); + } + + touchDevicePing( + fctx: FlowExecCtx, + deviceId: number, + pingAt?: Date, + ): ResultAsync { + const startedAt = Date.now(); + const resolvedPingAt = pingAt ?? new Date(); + + logDomainEvent({ + event: "mobile.ping.started", + fctx, + meta: { deviceId, pingAt: resolvedPingAt.toISOString() }, + }); + + return ResultAsync.fromPromise( + this.db + .update(mobileDevice) + .set({ lastPingAt: resolvedPingAt, updatedAt: new Date() }) + .where(eq(mobileDevice.id, deviceId)) + .returning({ id: mobileDevice.id }), + (error) => { + logDomainEvent({ + level: "error", + event: "mobile.ping.failed", + fctx, + durationMs: Date.now() - startedAt, + error, + meta: { deviceId }, + }); + return mobileErrors.pingDeviceFailed( + fctx, + error instanceof Error ? error.message : String(error), + ); + }, + ).andThen((updatedRows) => { + if (!updatedRows[0]) { + return errAsync(mobileErrors.deviceNotFoundById(fctx, deviceId)); + } + logDomainEvent({ + event: "mobile.ping.succeeded", + fctx, + durationMs: Date.now() - startedAt, + meta: { deviceId }, + }); + return okAsync(true); + }); + } + + syncSMS( + fctx: FlowExecCtx, + deviceId: number, + messages: MobileSMSInput[], + ): ResultAsync<{ received: number; inserted: number }, Err> { + const startedAt = Date.now(); + logDomainEvent({ + event: "mobile.sms.sync.started", + fctx, + meta: { deviceId, received: messages.length }, + }); + + const now = new Date(); + const parseResult = ResultAsync.combine( + messages.map((message) => + this.normalizeDate(fctx, message.sentAt, "messages.sentAt", true).andThen( + (sentAt) => + this.normalizeDate( + fctx, + (message.receivedAt ?? null) as Date | string | null, + "messages.receivedAt", + false, + ).map((receivedAt) => ({ + deviceId, + externalMessageId: message.externalMessageId ?? null, + sender: message.sender, + recipient: message.recipient ?? null, + body: message.body, + sentAt: sentAt as Date, + receivedAt, + dedupHash: + message.dedupHash ?? + this.makeSMSDedupHash(deviceId, message), + rawPayload: message.rawPayload, + createdAt: now, + updatedAt: now, + })), + ), + ), + ); + + return parseResult.andThen((rows) => + ResultAsync.fromPromise( + this.db + .insert(mobileSMS) + .values(rows) + .onConflictDoNothing() + .returning({ id: mobileSMS.id }), + (error) => { + logDomainEvent({ + level: "error", + event: "mobile.sms.sync.failed", + fctx, + durationMs: Date.now() - startedAt, + error, + meta: { deviceId, received: messages.length }, + }); + return mobileErrors.syncSMSFailed( + fctx, + error instanceof Error ? error.message : String(error), + ); + }, + ).map((insertedRows) => { + logDomainEvent({ + event: "mobile.sms.sync.succeeded", + fctx, + durationMs: Date.now() - startedAt, + meta: { + deviceId, + received: messages.length, + inserted: insertedRows.length, + }, + }); + return { received: messages.length, inserted: insertedRows.length }; + }), + ); + } + + syncMediaAssets( + fctx: FlowExecCtx, + deviceId: number, + assets: MobileMediaAssetInput[], + ): ResultAsync<{ received: number; inserted: number }, Err> { + const startedAt = Date.now(); + logDomainEvent({ + event: "mobile.media.sync.started", + fctx, + meta: { deviceId, received: assets.length }, + }); + + const now = new Date(); + const parseResult = ResultAsync.combine( + assets.map((asset) => + this.normalizeDate( + fctx, + (asset.capturedAt ?? null) as Date | string | null, + "assets.capturedAt", + false, + ).map((capturedAt) => ({ + deviceId, + externalMediaId: asset.externalMediaId ?? null, + fileId: asset.fileId, + mimeType: asset.mimeType, + filename: asset.filename ?? null, + capturedAt, + sizeBytes: asset.sizeBytes ?? null, + hash: asset.hash ?? null, + metadata: asset.metadata, + createdAt: now, + updatedAt: now, + })), + ), + ); + + return parseResult.andThen((rows) => + ResultAsync.fromPromise( + this.db + .insert(mobileMediaAsset) + .values(rows) + .onConflictDoNothing() + .returning({ id: mobileMediaAsset.id }), + (error) => { + logDomainEvent({ + level: "error", + event: "mobile.media.sync.failed", + fctx, + durationMs: Date.now() - startedAt, + error, + meta: { deviceId, received: assets.length }, + }); + return mobileErrors.syncMediaFailed( + fctx, + error instanceof Error ? error.message : String(error), + ); + }, + ).map((insertedRows) => { + logDomainEvent({ + event: "mobile.media.sync.succeeded", + fctx, + durationMs: Date.now() - startedAt, + meta: { + deviceId, + received: assets.length, + inserted: insertedRows.length, + }, + }); + return { received: assets.length, inserted: insertedRows.length }; + }), + ); + } + + listDevices( + fctx: FlowExecCtx, + filters: ListMobileDevicesFilters, + pagination: MobilePagination, + ): ResultAsync { + const startedAt = Date.now(); + const conditions = [eq(mobileDevice.ownerUserId, filters.ownerUserId)]; + if (filters.search) { + conditions.push( + or( + like(mobileDevice.name, `%${filters.search}%`), + like(mobileDevice.externalDeviceId, `%${filters.search}%`), + like(mobileDevice.manufacturer, `%${filters.search}%`), + like(mobileDevice.model, `%${filters.search}%`), + )!, + ); + } + const whereClause = and(...conditions); + + return ResultAsync.fromPromise( + this.db.select({ count: count() }).from(mobileDevice).where(whereClause), + (error) => + mobileErrors.listDevicesFailed( + fctx, + error instanceof Error ? error.message : String(error), + ), + ).andThen((countRows) => { + const total = countRows[0]?.count || 0; + const page = pagination.page; + const pageSize = pagination.pageSize; + const offset = (page - 1) * pageSize; + + const orderColumn = + pagination.sortBy === "createdAt" + ? mobileDevice.createdAt + : pagination.sortBy === "updatedAt" + ? mobileDevice.updatedAt + : mobileDevice.lastPingAt; + const orderFn = pagination.sortOrder === "asc" ? asc : desc; + + return ResultAsync.fromPromise( + this.db + .select() + .from(mobileDevice) + .where(whereClause) + .orderBy(orderFn(orderColumn), desc(mobileDevice.id)) + .limit(pageSize) + .offset(offset), + (error) => + mobileErrors.listDevicesFailed( + fctx, + error instanceof Error ? error.message : String(error), + ), + ).map((rows) => ({ + data: rows as MobileDevice[], + total, + page, + pageSize, + totalPages: Math.ceil(total / pageSize), + })); + }).map((result) => { + logDomainEvent({ + event: "mobile.devices.list.succeeded", + fctx, + durationMs: Date.now() - startedAt, + meta: { count: result.data.length, total: result.total }, + }); + return result; + }); + } + + getDeviceDetail( + fctx: FlowExecCtx, + deviceId: number, + ownerUserId: string, + ): ResultAsync { + const startedAt = Date.now(); + return ResultAsync.fromPromise( + this.db + .select() + .from(mobileDevice) + .where( + and( + eq(mobileDevice.id, deviceId), + eq(mobileDevice.ownerUserId, ownerUserId), + ), + ) + .limit(1), + (error) => + mobileErrors.listDevicesFailed( + fctx, + error instanceof Error ? error.message : String(error), + ), + ).andThen((devices) => { + const device = devices[0]; + if (!device) { + return errAsync(mobileErrors.deviceNotFoundById(fctx, deviceId)); + } + return ResultAsync.combine([ + ResultAsync.fromPromise( + this.db + .select({ count: count() }) + .from(mobileSMS) + .where(eq(mobileSMS.deviceId, deviceId)), + (error) => + mobileErrors.listSMSFailed( + fctx, + error instanceof Error ? error.message : String(error), + ), + ), + ResultAsync.fromPromise( + this.db + .select({ count: count() }) + .from(mobileMediaAsset) + .where(eq(mobileMediaAsset.deviceId, deviceId)), + (error) => + mobileErrors.listMediaFailed( + fctx, + error instanceof Error ? error.message : String(error), + ), + ), + ]).map(([smsCountRows, mediaCountRows]) => ({ + device: device as MobileDevice, + smsCount: smsCountRows[0]?.count || 0, + mediaCount: mediaCountRows[0]?.count || 0, + })); + }).map((result) => { + logDomainEvent({ + event: "mobile.device.detail.succeeded", + fctx, + durationMs: Date.now() - startedAt, + meta: { deviceId }, + }); + return result; + }); + } + + listDeviceSMS( + fctx: FlowExecCtx, + filters: ListMobileDeviceSMSFilters, + pagination: MobilePagination, + ): ResultAsync { + const conditions = [eq(mobileSMS.deviceId, filters.deviceId)]; + if (filters.search) { + conditions.push( + or( + like(mobileSMS.sender, `%${filters.search}%`), + like(mobileSMS.recipient, `%${filters.search}%`), + like(mobileSMS.body, `%${filters.search}%`), + )!, + ); + } + const whereClause = and(...conditions); + const page = pagination.page; + const pageSize = pagination.pageSize; + const offset = (page - 1) * pageSize; + const orderFn = pagination.sortOrder === "asc" ? asc : desc; + + return ResultAsync.fromPromise( + this.db.select({ count: count() }).from(mobileSMS).where(whereClause), + (error) => + mobileErrors.listSMSFailed( + fctx, + error instanceof Error ? error.message : String(error), + ), + ).andThen((countRows) => + ResultAsync.fromPromise( + this.db + .select() + .from(mobileSMS) + .where(whereClause) + .orderBy(orderFn(mobileSMS.sentAt), desc(mobileSMS.id)) + .limit(pageSize) + .offset(offset), + (error) => + mobileErrors.listSMSFailed( + fctx, + error instanceof Error ? error.message : String(error), + ), + ).map((rows) => { + const total = countRows[0]?.count || 0; + return { + data: rows, + total, + page, + pageSize, + totalPages: Math.ceil(total / pageSize), + }; + }), + ); + } + + listDeviceMedia( + fctx: FlowExecCtx, + filters: ListMobileDeviceMediaFilters, + pagination: MobilePagination, + ): ResultAsync { + const conditions = [eq(mobileMediaAsset.deviceId, filters.deviceId)]; + if (filters.mimeType) { + conditions.push(like(mobileMediaAsset.mimeType, `${filters.mimeType}%`)); + } + if (filters.search) { + conditions.push(like(mobileMediaAsset.filename, `%${filters.search}%`)); + } + const whereClause = and(...conditions); + const page = pagination.page; + const pageSize = pagination.pageSize; + const offset = (page - 1) * pageSize; + const orderFn = pagination.sortOrder === "asc" ? asc : desc; + + return ResultAsync.fromPromise( + this.db + .select({ count: count() }) + .from(mobileMediaAsset) + .where(whereClause), + (error) => + mobileErrors.listMediaFailed( + fctx, + error instanceof Error ? error.message : String(error), + ), + ).andThen((countRows) => + ResultAsync.fromPromise( + this.db + .select() + .from(mobileMediaAsset) + .where(whereClause) + .orderBy(orderFn(mobileMediaAsset.createdAt), desc(mobileMediaAsset.id)) + .limit(pageSize) + .offset(offset), + (error) => + mobileErrors.listMediaFailed( + fctx, + error instanceof Error ? error.message : String(error), + ), + ).map((rows) => { + const total = countRows[0]?.count || 0; + return { + data: rows, + total, + page, + pageSize, + totalPages: Math.ceil(total / pageSize), + }; + }), + ); + } + + deleteMediaAsset( + fctx: FlowExecCtx, + mediaAssetId: number, + ownerUserId: string, + ): ResultAsync { + const startedAt = Date.now(); + return ResultAsync.fromPromise( + this.db + .select({ + mediaAssetId: mobileMediaAsset.id, + fileId: mobileMediaAsset.fileId, + ownerUserId: mobileDevice.ownerUserId, + }) + .from(mobileMediaAsset) + .innerJoin(mobileDevice, eq(mobileMediaAsset.deviceId, mobileDevice.id)) + .where( + and( + eq(mobileMediaAsset.id, mediaAssetId), + eq(mobileDevice.ownerUserId, ownerUserId), + ), + ) + .limit(1), + (error) => + mobileErrors.deleteMediaFailed( + fctx, + error instanceof Error ? error.message : String(error), + ), + ).andThen((rows) => { + const target = rows[0]; + if (!target) { + return errAsync(mobileErrors.mediaAssetNotFound(fctx, mediaAssetId)); + } + + return ResultAsync.fromPromise( + this.db.transaction(async (tx) => { + await tx + .delete(mobileMediaAsset) + .where(eq(mobileMediaAsset.id, mediaAssetId)); + await tx.delete(file).where(eq(file.id, target.fileId)); + return true; + }), + (error) => + mobileErrors.deleteMediaFailed( + fctx, + error instanceof Error ? error.message : String(error), + ), + ); + }).map((deleted) => { + logDomainEvent({ + event: "mobile.media.delete.succeeded", + fctx, + durationMs: Date.now() - startedAt, + meta: { mediaAssetId, ownerUserId, deleted }, + }); + return deleted; + }); + } + + deleteDevice( + fctx: FlowExecCtx, + deviceId: number, + ownerUserId: string, + ): ResultAsync<{ deleted: boolean; deletedFileCount: number }, Err> { + const startedAt = Date.now(); + return ResultAsync.fromPromise( + this.db + .select({ id: mobileDevice.id }) + .from(mobileDevice) + .where( + and( + eq(mobileDevice.id, deviceId), + eq(mobileDevice.ownerUserId, ownerUserId), + ), + ) + .limit(1), + (error) => + mobileErrors.deleteDeviceFailed( + fctx, + error instanceof Error ? error.message : String(error), + ), + ).andThen((devices) => { + if (!devices[0]) { + return errAsync(mobileErrors.deviceNotFoundById(fctx, deviceId)); + } + + return ResultAsync.fromPromise( + this.db.transaction(async (tx) => { + const mediaFiles = await tx + .select({ fileId: mobileMediaAsset.fileId }) + .from(mobileMediaAsset) + .where(eq(mobileMediaAsset.deviceId, deviceId)); + const fileIds = mediaFiles.map((item) => item.fileId); + + await tx.delete(mobileDevice).where(eq(mobileDevice.id, deviceId)); + + if (fileIds.length > 0) { + await tx.delete(file).where(inArray(file.id, fileIds)); + } + + return { deleted: true, deletedFileCount: fileIds.length }; + }), + (error) => + mobileErrors.deleteDeviceFailed( + fctx, + error instanceof Error ? error.message : String(error), + ), + ); + }).map((result) => { + logDomainEvent({ + event: "mobile.device.delete.succeeded", + fctx, + durationMs: Date.now() - startedAt, + meta: { deviceId, deletedFileCount: result.deletedFileCount }, + }); + return result; + }); + } +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index d5d298b..c9209a8 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -195,15 +195,27 @@ importers: '@hono/node-server': specifier: ^1.19.9 version: 1.19.9(hono@4.12.3) + '@pkg/db': + specifier: workspace:* + version: link:../../packages/db '@pkg/logger': specifier: workspace:* version: link:../../packages/logger '@pkg/logic': specifier: workspace:* version: link:../../packages/logic + '@pkg/result': + specifier: workspace:* + version: link:../../packages/result + '@pkg/settings': + specifier: workspace:* + version: link:../../packages/settings hono: specifier: ^4.11.1 version: 4.12.3 + valibot: + specifier: ^1.2.0 + version: 1.2.0(typescript@5.9.3) devDependencies: '@types/node': specifier: ^25.3.2 diff --git a/pnpm-workspace.yaml b/pnpm-workspace.yaml index e6dd244..37408aa 100644 --- a/pnpm-workspace.yaml +++ b/pnpm-workspace.yaml @@ -3,5 +3,7 @@ packages: - packages/* onlyBuiltDependencies: + - argon2 - esbuild + - protobufjs - sharp