implemented the domain logic + processor endpoints

This commit is contained in:
user
2026-03-01 06:33:32 +02:00
parent c74523e4bc
commit 8aa8ddfa7d
10 changed files with 1629 additions and 32 deletions

View File

@@ -59,35 +59,35 @@ Decisions captured:
Target: `packages/db/schema/*` + new migration files. Target: `packages/db/schema/*` + new migration files.
- [ ] Add `mobile_device` table: - [x] Add `mobile_device` table:
- Fields: `id`, `externalDeviceId`, `name`, `manufacturer`, `model`, `androidVersion`, `ownerUserId`, `lastPingAt`, `createdAt`, `updatedAt`. - Fields: `id`, `externalDeviceId`, `name`, `manufacturer`, `model`, `androidVersion`, `ownerUserId`, `lastPingAt`, `createdAt`, `updatedAt`.
- [ ] Add `mobile_sms` table: - [x] Add `mobile_sms` table:
- Fields: `id`, `deviceId`, `externalMessageId?`, `sender`, `recipient?`, `body`, `sentAt`, `receivedAt?`, `rawPayload?`, `createdAt`. - Fields: `id`, `deviceId`, `externalMessageId?`, `sender`, `recipient?`, `body`, `sentAt`, `receivedAt?`, `rawPayload?`, `createdAt`.
- [ ] Add `mobile_media_asset` table: - [x] Add `mobile_media_asset` table:
- Fields: `id`, `deviceId`, `externalMediaId?`, `mimeType`, `filename?`, `capturedAt?`, `sizeBytes?`, `hash?`, `metadata`, `createdAt`. - Fields: `id`, `deviceId`, `externalMediaId?`, `mimeType`, `filename?`, `capturedAt?`, `sizeBytes?`, `hash?`, `metadata`, `createdAt`.
- [ ] Add indexes for common reads: - [x] Add indexes for common reads:
- `mobile_device.lastPingAt` - `mobile_device.lastPingAt`
- `mobile_sms.deviceId + sentAt desc` - `mobile_sms.deviceId + sentAt desc`
- `mobile_media_asset.deviceId + createdAt desc` - `mobile_media_asset.deviceId + createdAt desc`
- [ ] Export schema from `packages/db/schema/index.ts`. - [x] Export schema from `packages/db/schema/index.ts`.
Definition of done: Definition of done:
- [ ] Tables and indexes exist in schema + migration files. - [x] Tables and indexes exist in schema + migration files.
- [ ] Naming matches existing conventions. - [x] Naming matches existing conventions.
### Phase 2: Logic Domain (`@pkg/logic`) ### Phase 2: Logic Domain (`@pkg/logic`)
Target: `packages/logic/domains/mobile/*` (new domain). Target: `packages/logic/domains/mobile/*` (new domain).
- [ ] Create `data.ts` with Valibot schemas and inferred types for: - [x] Create `data.ts` with Valibot schemas and inferred types for:
- register device payload - register device payload
- ping payload - ping payload
- sms sync payload - sms sync payload
- media sync payload - media sync payload
- admin query filters/pagination - admin query filters/pagination
- [ ] Create `errors.ts` with domain error constructors using `getError`. - [x] Create `errors.ts` with domain error constructors using `getError`.
- [ ] Create `repository.ts` with ResultAsync operations for: - [x] Create `repository.ts` with ResultAsync operations for:
- upsert device - upsert device
- update last ping - update last ping
- bulk insert sms (idempotent) - bulk insert sms (idempotent)
@@ -98,33 +98,33 @@ Target: `packages/logic/domains/mobile/*` (new domain).
- list media by device (paginated, latest first) - list media by device (paginated, latest first)
- delete single media asset - delete single media asset
- delete device (removed all child sms, and media assets, and by extensionn the associated files in r2+db as well) - delete device (removed all child sms, and media assets, and by extensionn the associated files in r2+db as well)
- [ ] Create `controller.ts` as use-case layer. - [x] Create `controller.ts` as use-case layer.
- [ ] Wrap repository/controller operations with `traceResultAsync` and include `fctx`. - [x] Wrap repository/controller operations with `traceResultAsync` and include `fctx`.
Definition of done: Definition of done:
- [ ] Processor and main app can consume this domain without direct DB usage. - [x] Processor and main app can consume this domain without direct DB usage.
- [ ] No ad-hoc thrown errors in domain flow; Result pattern is preserved. - [x] No ad-hoc thrown errors in domain flow; Result pattern is preserved.
### Phase 3: Processor Ingestion API (`apps/processor`) ### Phase 3: Processor Ingestion API (`apps/processor`)
Target: `apps/processor/src/domains/mobile/router.ts`. Target: `apps/processor/src/domains/mobile/router.ts`.
- [ ] Replace stub endpoints with full handlers: - [x] Replace stub endpoints with full handlers:
- `POST /register` - `POST /register`
- `PUT /ping` - `PUT /ping`
- `PUT /sms/sync` - `PUT /sms/sync`
- `PUT /media/sync` - `PUT /media/sync`
- [ ] Validate request body using schemas from `@pkg/logic/domains/mobile/data`. - [x] Validate request body using schemas from `@pkg/logic/domains/mobile/data`.
- [ ] Build `FlowExecCtx` per request (flow id always; user/session when available). - [x] Build `FlowExecCtx` per request (flow id always; user/session when available).
- [ ] Call mobile controller methods; return `{ data, error }` response shape. - [x] Call mobile controller methods; return `{ data, error }` response shape.
- [ ] Add basic endpoint protection agreed in Phase 0. - [x] Add basic endpoint protection agreed in Phase 0.
- [ ] Add request-level structured logging for success/failure. - [x] Add request-level structured logging for success/failure.
Definition of done: Definition of done:
- [ ] Endpoints persist data into new tables. - [x] Endpoints persist data into new tables.
- [ ] Endpoint failures return normalized domain errors. - [x] Endpoint failures return normalized domain errors.
### Phase 4: Admin UI in `apps/main` ### Phase 4: Admin UI in `apps/main`

View File

@@ -7,10 +7,14 @@
"start": "node dist/index.js" "start": "node dist/index.js"
}, },
"dependencies": { "dependencies": {
"@hono/node-server": "^1.19.9",
"@pkg/db": "workspace:*",
"@pkg/logger": "workspace:*", "@pkg/logger": "workspace:*",
"@pkg/logic": "workspace:*", "@pkg/logic": "workspace:*",
"@hono/node-server": "^1.19.9", "@pkg/result": "workspace:*",
"hono": "^4.11.1" "@pkg/settings": "workspace:*",
"hono": "^4.11.1",
"valibot": "^1.2.0"
}, },
"devDependencies": { "devDependencies": {
"@types/node": "^25.3.2", "@types/node": "^25.3.2",

View File

@@ -1,15 +1,266 @@
import {
pingMobileDeviceSchema,
registerMobileDeviceSchema,
syncMobileMediaSchema,
syncMobileSMSSchema,
} from "@pkg/logic/domains/mobile/data";
import { getMobileController } from "@pkg/logic/domains/mobile/controller";
import type { FlowExecCtx } from "@pkg/logic/core/flow.execution.context";
import { errorStatusMap, type Err } from "@pkg/result";
import { logDomainEvent } from "@pkg/logger";
import type { Context } from "hono";
import * as v from "valibot";
import { Hono } from "hono"; import { Hono } from "hono";
const mobileController = getMobileController();
const DEVICE_ID_HEADER = "x-device-id";
const FLOW_ID_HEADER = "x-flow-id";
function buildFlowExecCtx(c: Context): FlowExecCtx {
return {
flowId: c.req.header(FLOW_ID_HEADER) || crypto.randomUUID(),
};
}
function respondError(c: Context, fctx: FlowExecCtx, err: Err) {
const status = errorStatusMap[err.code] || 500;
c.status(status as any);
return c.json({ data: null, error: { ...err, flowId: fctx.flowId } });
}
function requireExternalDeviceId(c: Context): string | null {
const externalDeviceId = c.req.header(DEVICE_ID_HEADER);
if (!externalDeviceId || externalDeviceId.trim().length === 0) {
return null;
}
return externalDeviceId.trim();
}
async function parseJson(c: Context) {
try {
return await c.req.json();
} catch {
return null;
}
}
export const mobileRouter = new Hono() export const mobileRouter = new Hono()
.post("/register", async (c) => {
const fctx = buildFlowExecCtx(c);
const startedAt = Date.now();
logDomainEvent({
event: "processor.mobile.register.started",
fctx,
});
const payload = await parseJson(c);
const parsed = v.safeParse(registerMobileDeviceSchema, payload);
if (!parsed.success) {
const error = {
flowId: fctx.flowId,
code: "VALIDATION_ERROR",
message: "Invalid register payload",
description: "Please validate the payload and retry",
detail: parsed.issues.map((issue) => issue.message).join(", "),
} as Err;
return respondError(c, fctx, error);
}
const result = await mobileController.registerDevice(
fctx,
parsed.output,
);
if (result.isErr()) {
logDomainEvent({
level: "error",
event: "processor.mobile.register.failed",
fctx,
durationMs: Date.now() - startedAt,
error: result.error,
});
return respondError(c, fctx, result.error);
}
logDomainEvent({
event: "processor.mobile.register.succeeded",
fctx,
durationMs: Date.now() - startedAt,
meta: { deviceId: result.value.id },
});
return c.json({ data: result.value, error: null });
})
.put("/ping", async (c) => { .put("/ping", async (c) => {
return c.json({ data: "" }); const fctx = buildFlowExecCtx(c);
const startedAt = Date.now();
const externalDeviceId = requireExternalDeviceId(c);
if (!externalDeviceId) {
const error = {
flowId: fctx.flowId,
code: "AUTH_ERROR",
message: "Missing device id header",
description: `Request must include ${DEVICE_ID_HEADER}`,
detail: `${DEVICE_ID_HEADER} header is required`,
} as Err;
return respondError(c, fctx, error);
}
const payload = await parseJson(c);
const parsed = v.safeParse(pingMobileDeviceSchema, payload ?? {});
if (!parsed.success) {
const error = {
flowId: fctx.flowId,
code: "VALIDATION_ERROR",
message: "Invalid ping payload",
description: "Please validate the payload and retry",
detail: parsed.issues.map((issue) => issue.message).join(", "),
} as Err;
return respondError(c, fctx, error);
}
const result = await mobileController.pingByExternalDeviceId(
fctx,
externalDeviceId,
parsed.output,
);
if (result.isErr()) {
logDomainEvent({
level: "error",
event: "processor.mobile.ping.failed",
fctx,
durationMs: Date.now() - startedAt,
error: result.error,
meta: { externalDeviceId },
});
return respondError(c, fctx, result.error);
}
logDomainEvent({
event: "processor.mobile.ping.succeeded",
fctx,
durationMs: Date.now() - startedAt,
meta: { externalDeviceId },
});
return c.json({ data: { ok: result.value }, error: null });
}) })
.put("/sms/sync", async (c) => { .put("/sms/sync", async (c) => {
return c.json({ data: "" }); const fctx = buildFlowExecCtx(c);
const startedAt = Date.now();
const externalDeviceId = requireExternalDeviceId(c);
if (!externalDeviceId) {
const error = {
flowId: fctx.flowId,
code: "AUTH_ERROR",
message: "Missing device id header",
description: `Request must include ${DEVICE_ID_HEADER}`,
detail: `${DEVICE_ID_HEADER} header is required`,
} as Err;
return respondError(c, fctx, error);
}
const payload = await parseJson(c);
const parsed = v.safeParse(syncMobileSMSSchema, payload);
if (!parsed.success) {
const error = {
flowId: fctx.flowId,
code: "VALIDATION_ERROR",
message: "Invalid SMS sync payload",
description: "Please validate the payload and retry",
detail: parsed.issues.map((issue) => issue.message).join(", "),
} as Err;
return respondError(c, fctx, error);
}
const result = await mobileController.syncSMSByExternalDeviceId(
fctx,
externalDeviceId,
parsed.output.messages,
);
if (result.isErr()) {
logDomainEvent({
level: "error",
event: "processor.mobile.sms_sync.failed",
fctx,
durationMs: Date.now() - startedAt,
error: result.error,
meta: {
externalDeviceId,
received: parsed.output.messages.length,
},
});
return respondError(c, fctx, result.error);
}
logDomainEvent({
event: "processor.mobile.sms_sync.succeeded",
fctx,
durationMs: Date.now() - startedAt,
meta: {
externalDeviceId,
...result.value,
},
});
return c.json({ data: result.value, error: null });
}) })
.put("/media/sync", async (c) => { .put("/media/sync", async (c) => {
return c.json({ data: "" }); const fctx = buildFlowExecCtx(c);
}) const startedAt = Date.now();
.post("/register", async (c) => { const externalDeviceId = requireExternalDeviceId(c);
return c.json({ data: "" });
if (!externalDeviceId) {
const error = {
flowId: fctx.flowId,
code: "AUTH_ERROR",
message: "Missing device id header",
description: `Request must include ${DEVICE_ID_HEADER}`,
detail: `${DEVICE_ID_HEADER} header is required`,
} as Err;
return respondError(c, fctx, error);
}
const payload = await parseJson(c);
const parsed = v.safeParse(syncMobileMediaSchema, payload);
if (!parsed.success) {
const error = {
flowId: fctx.flowId,
code: "VALIDATION_ERROR",
message: "Invalid media sync payload",
description: "Please validate the payload and retry",
detail: parsed.issues.map((issue) => issue.message).join(", "),
} as Err;
return respondError(c, fctx, error);
}
const result = await mobileController.syncMediaByExternalDeviceId(
fctx,
externalDeviceId,
parsed.output.assets,
);
if (result.isErr()) {
logDomainEvent({
level: "error",
event: "processor.mobile.media_sync.failed",
fctx,
durationMs: Date.now() - startedAt,
error: result.error,
meta: {
externalDeviceId,
received: parsed.output.assets.length,
},
});
return respondError(c, fctx, result.error);
}
logDomainEvent({
event: "processor.mobile.media_sync.succeeded",
fctx,
durationMs: Date.now() - startedAt,
meta: {
externalDeviceId,
...result.value,
},
});
return c.json({ data: result.value, error: null });
}); });

View File

@@ -1,13 +1,30 @@
{ {
"compilerOptions": { "compilerOptions": {
"target": "ESNext", "target": "ESNext",
"module": "NodeNext", "module": "ESNext",
"moduleResolution": "bundler",
"strict": true, "strict": true,
"verbatimModuleSyntax": true, "verbatimModuleSyntax": false,
"skipLibCheck": true, "skipLibCheck": true,
"types": [ "types": [
"node" "node"
], ],
"baseUrl": ".",
"paths": {
"@pkg/logic": ["../../packages/logic"],
"@pkg/logic/*": ["../../packages/logic/*"],
"@pkg/db": ["../../packages/db"],
"@pkg/db/*": ["../../packages/db/*"],
"@pkg/logger": ["../../packages/logger"],
"@pkg/logger/*": ["../../packages/logger/*"],
"@pkg/result": ["../../packages/result"],
"@pkg/result/*": ["../../packages/result/*"],
"@pkg/settings": ["../../packages/settings"],
"@pkg/settings/*": ["../../packages/settings/*"],
"@/*": ["../../packages/logic/*"],
"@core/*": ["../../packages/logic/core/*"],
"@domains/*": ["../../packages/logic/domains/*"]
},
"jsx": "react-jsx", "jsx": "react-jsx",
"jsxImportSource": "hono/jsx", "jsxImportSource": "hono/jsx",
"outDir": "./dist" "outDir": "./dist"

View File

@@ -0,0 +1,206 @@
import {
ListMobileDeviceMediaFilters,
ListMobileDeviceSMSFilters,
ListMobileDevicesFilters,
MobileMediaAssetInput,
MobilePagination,
MobileSMSInput,
PingMobileDevice,
RegisterMobileDevice,
} from "./data";
import { FlowExecCtx } from "@core/flow.execution.context";
import { traceResultAsync } from "@core/observability";
import { MobileRepository } from "./repository";
import { errAsync } from "neverthrow";
import { db } from "@pkg/db";
import { settings } from "@core/settings";
import { mobileErrors } from "./errors";
export class MobileController {
constructor(
private mobileRepo: MobileRepository,
private defaultAdminEmail?: string,
) {}
registerDevice(fctx: FlowExecCtx, payload: RegisterMobileDevice) {
return traceResultAsync({
name: "logic.mobile.controller.register",
fctx,
attributes: { "app.mobile.external_device_id": payload.externalDeviceId },
fn: () =>
this.mobileRepo
.findAdminOwnerId(fctx, this.defaultAdminEmail)
.andThen((ownerUserId) =>
this.mobileRepo.upsertDevice(fctx, payload, ownerUserId),
),
});
}
pingByExternalDeviceId(
fctx: FlowExecCtx,
externalDeviceId: string,
payload?: PingMobileDevice,
) {
return traceResultAsync({
name: "logic.mobile.controller.ping",
fctx,
attributes: { "app.mobile.external_device_id": externalDeviceId },
fn: () =>
this.mobileRepo.getDeviceByExternalId(fctx, externalDeviceId).andThen(
(device) => {
const pingAt = payload?.pingAt
? new Date(payload.pingAt as Date | string)
: new Date();
if (Number.isNaN(pingAt.getTime())) {
return errAsync(
mobileErrors.invalidPayload(
fctx,
"pingAt must be a valid date",
),
);
}
return this.mobileRepo.touchDevicePing(fctx, device.id, pingAt);
},
),
});
}
syncSMSByExternalDeviceId(
fctx: FlowExecCtx,
externalDeviceId: string,
messages: MobileSMSInput[],
) {
return traceResultAsync({
name: "logic.mobile.controller.sms_sync",
fctx,
attributes: {
"app.mobile.external_device_id": externalDeviceId,
"app.mobile.sms.received_count": messages.length,
},
fn: () =>
this.mobileRepo.getDeviceByExternalId(fctx, externalDeviceId).andThen(
(device) =>
this.mobileRepo
.syncSMS(fctx, device.id, messages)
.andThen((syncResult) =>
this.mobileRepo.touchDevicePing(fctx, device.id).map(() => ({
...syncResult,
deviceId: device.id,
})),
),
),
});
}
syncMediaByExternalDeviceId(
fctx: FlowExecCtx,
externalDeviceId: string,
assets: MobileMediaAssetInput[],
) {
return traceResultAsync({
name: "logic.mobile.controller.media_sync",
fctx,
attributes: {
"app.mobile.external_device_id": externalDeviceId,
"app.mobile.media.received_count": assets.length,
},
fn: () =>
this.mobileRepo.getDeviceByExternalId(fctx, externalDeviceId).andThen(
(device) =>
this.mobileRepo
.syncMediaAssets(fctx, device.id, assets)
.andThen((syncResult) =>
this.mobileRepo.touchDevicePing(fctx, device.id).map(() => ({
...syncResult,
deviceId: device.id,
})),
),
),
});
}
listDevices(
fctx: FlowExecCtx,
filters: ListMobileDevicesFilters,
pagination: MobilePagination,
) {
return traceResultAsync({
name: "logic.mobile.controller.list_devices",
fctx,
attributes: { "app.user.id": filters.ownerUserId },
fn: () => this.mobileRepo.listDevices(fctx, filters, pagination),
});
}
getDeviceDetail(fctx: FlowExecCtx, deviceId: number, ownerUserId: string) {
return traceResultAsync({
name: "logic.mobile.controller.get_device_detail",
fctx,
attributes: { "app.user.id": ownerUserId, "app.mobile.device_id": deviceId },
fn: () => this.mobileRepo.getDeviceDetail(fctx, deviceId, ownerUserId),
});
}
listDeviceSMS(
fctx: FlowExecCtx,
filters: ListMobileDeviceSMSFilters,
pagination: MobilePagination,
) {
return traceResultAsync({
name: "logic.mobile.controller.list_device_sms",
fctx,
attributes: { "app.mobile.device_id": filters.deviceId },
fn: () => this.mobileRepo.listDeviceSMS(fctx, filters, pagination),
});
}
listDeviceMedia(
fctx: FlowExecCtx,
filters: ListMobileDeviceMediaFilters,
pagination: MobilePagination,
) {
return traceResultAsync({
name: "logic.mobile.controller.list_device_media",
fctx,
attributes: { "app.mobile.device_id": filters.deviceId },
fn: () => this.mobileRepo.listDeviceMedia(fctx, filters, pagination),
});
}
deleteMediaAsset(fctx: FlowExecCtx, mediaAssetId: number, ownerUserId: string) {
return traceResultAsync({
name: "logic.mobile.controller.delete_media_asset",
fctx,
attributes: {
"app.user.id": ownerUserId,
"app.mobile.media_asset_id": mediaAssetId,
},
fn: () => this.mobileRepo.deleteMediaAsset(fctx, mediaAssetId, ownerUserId),
});
}
deleteDevice(fctx: FlowExecCtx, deviceId: number, ownerUserId: string) {
return traceResultAsync({
name: "logic.mobile.controller.delete_device",
fctx,
attributes: { "app.user.id": ownerUserId, "app.mobile.device_id": deviceId },
fn: () => this.mobileRepo.deleteDevice(fctx, deviceId, ownerUserId),
});
}
resolveDeviceByExternalId(fctx: FlowExecCtx, externalDeviceId: string) {
return traceResultAsync({
name: "logic.mobile.controller.resolve_device",
fctx,
attributes: { "app.mobile.external_device_id": externalDeviceId },
fn: () => this.mobileRepo.getDeviceByExternalId(fctx, externalDeviceId),
});
}
}
export function getMobileController(): MobileController {
return new MobileController(
new MobileRepository(db),
settings.defaultAdminEmail || undefined,
);
}

View File

@@ -0,0 +1,182 @@
import * as v from "valibot";
export const dateLikeSchema = v.union([v.date(), v.string()]);
export type DateLike = v.InferOutput<typeof dateLikeSchema>;
export const mobileDeviceSchema = v.object({
id: v.pipe(v.number(), v.integer()),
externalDeviceId: v.string(),
name: v.string(),
manufacturer: v.string(),
model: v.string(),
androidVersion: v.string(),
ownerUserId: v.string(),
lastPingAt: v.nullable(v.date()),
createdAt: v.date(),
updatedAt: v.date(),
});
export type MobileDevice = v.InferOutput<typeof mobileDeviceSchema>;
export type MobileDevices = MobileDevice[];
export const registerMobileDeviceSchema = v.object({
externalDeviceId: v.string(),
name: v.string(),
manufacturer: v.string(),
model: v.string(),
androidVersion: v.string(),
});
export type RegisterMobileDevice = v.InferOutput<typeof registerMobileDeviceSchema>;
export const pingMobileDeviceSchema = v.object({
pingAt: v.optional(dateLikeSchema),
});
export type PingMobileDevice = v.InferOutput<typeof pingMobileDeviceSchema>;
export const mobileSMSSchema = v.object({
id: v.pipe(v.number(), v.integer()),
deviceId: v.pipe(v.number(), v.integer()),
externalMessageId: v.optional(v.nullable(v.string())),
sender: v.string(),
recipient: v.optional(v.nullable(v.string())),
body: v.string(),
sentAt: v.date(),
receivedAt: v.optional(v.nullable(v.date())),
dedupHash: v.string(),
rawPayload: v.optional(v.nullable(v.record(v.string(), v.any()))),
createdAt: v.date(),
updatedAt: v.date(),
});
export type MobileSMS = v.InferOutput<typeof mobileSMSSchema>;
export type MobileSMSList = MobileSMS[];
export const mobileSMSInputSchema = v.object({
externalMessageId: v.optional(v.nullable(v.string())),
sender: v.string(),
recipient: v.optional(v.nullable(v.string())),
body: v.string(),
sentAt: dateLikeSchema,
receivedAt: v.optional(v.nullable(dateLikeSchema)),
dedupHash: v.optional(v.string()),
rawPayload: v.optional(v.record(v.string(), v.any())),
});
export type MobileSMSInput = v.InferOutput<typeof mobileSMSInputSchema>;
export const syncMobileSMSSchema = v.object({
messages: v.array(mobileSMSInputSchema),
});
export type SyncMobileSMS = v.InferOutput<typeof syncMobileSMSSchema>;
export const mobileMediaAssetSchema = v.object({
id: v.pipe(v.number(), v.integer()),
deviceId: v.pipe(v.number(), v.integer()),
externalMediaId: v.optional(v.nullable(v.string())),
fileId: v.string(),
mimeType: v.string(),
filename: v.optional(v.nullable(v.string())),
capturedAt: v.optional(v.nullable(v.date())),
sizeBytes: v.optional(v.nullable(v.number())),
hash: v.optional(v.nullable(v.string())),
metadata: v.optional(v.nullable(v.record(v.string(), v.any()))),
createdAt: v.date(),
updatedAt: v.date(),
});
export type MobileMediaAsset = v.InferOutput<typeof mobileMediaAssetSchema>;
export type MobileMediaAssets = MobileMediaAsset[];
export const mobileMediaAssetInputSchema = v.object({
externalMediaId: v.optional(v.nullable(v.string())),
fileId: v.string(),
mimeType: v.string(),
filename: v.optional(v.nullable(v.string())),
capturedAt: v.optional(v.nullable(dateLikeSchema)),
sizeBytes: v.optional(v.nullable(v.number())),
hash: v.optional(v.nullable(v.string())),
metadata: v.optional(v.record(v.string(), v.any())),
});
export type MobileMediaAssetInput = v.InferOutput<typeof mobileMediaAssetInputSchema>;
export const syncMobileMediaSchema = v.object({
assets: v.array(mobileMediaAssetInputSchema),
});
export type SyncMobileMedia = v.InferOutput<typeof syncMobileMediaSchema>;
export const mobilePaginationSchema = v.object({
page: v.pipe(v.number(), v.integer()),
pageSize: v.pipe(v.number(), v.integer()),
sortBy: v.optional(v.string()),
sortOrder: v.optional(v.picklist(["asc", "desc"])),
});
export type MobilePagination = v.InferOutput<typeof mobilePaginationSchema>;
export const listMobileDevicesFiltersSchema = v.object({
ownerUserId: v.string(),
search: v.optional(v.string()),
});
export type ListMobileDevicesFilters = v.InferOutput<
typeof listMobileDevicesFiltersSchema
>;
export const listMobileDeviceSMSFiltersSchema = v.object({
deviceId: v.pipe(v.number(), v.integer()),
search: v.optional(v.string()),
});
export type ListMobileDeviceSMSFilters = v.InferOutput<
typeof listMobileDeviceSMSFiltersSchema
>;
export const listMobileDeviceMediaFiltersSchema = v.object({
deviceId: v.pipe(v.number(), v.integer()),
mimeType: v.optional(v.string()),
search: v.optional(v.string()),
});
export type ListMobileDeviceMediaFilters = v.InferOutput<
typeof listMobileDeviceMediaFiltersSchema
>;
export const paginatedMobileDevicesSchema = v.object({
data: v.array(mobileDeviceSchema),
total: v.pipe(v.number(), v.integer()),
page: v.pipe(v.number(), v.integer()),
pageSize: v.pipe(v.number(), v.integer()),
totalPages: v.pipe(v.number(), v.integer()),
});
export type PaginatedMobileDevices = v.InferOutput<
typeof paginatedMobileDevicesSchema
>;
export const paginatedMobileSMSSchema = v.object({
data: v.array(mobileSMSSchema),
total: v.pipe(v.number(), v.integer()),
page: v.pipe(v.number(), v.integer()),
pageSize: v.pipe(v.number(), v.integer()),
totalPages: v.pipe(v.number(), v.integer()),
});
export type PaginatedMobileSMS = v.InferOutput<typeof paginatedMobileSMSSchema>;
export const paginatedMobileMediaSchema = v.object({
data: v.array(mobileMediaAssetSchema),
total: v.pipe(v.number(), v.integer()),
page: v.pipe(v.number(), v.integer()),
pageSize: v.pipe(v.number(), v.integer()),
totalPages: v.pipe(v.number(), v.integer()),
});
export type PaginatedMobileMedia = v.InferOutput<typeof paginatedMobileMediaSchema>;
export const mobileDeviceDetailSchema = v.object({
device: mobileDeviceSchema,
smsCount: v.pipe(v.number(), v.integer()),
mediaCount: v.pipe(v.number(), v.integer()),
});
export type MobileDeviceDetail = v.InferOutput<typeof mobileDeviceDetailSchema>;
export const deleteDeviceSchema = v.object({
deviceId: v.pipe(v.number(), v.integer()),
ownerUserId: v.string(),
});
export type DeleteDevice = v.InferOutput<typeof deleteDeviceSchema>;
export const deleteMediaAssetSchema = v.object({
mediaAssetId: v.pipe(v.number(), v.integer()),
ownerUserId: v.string(),
});
export type DeleteMediaAsset = v.InferOutput<typeof deleteMediaAssetSchema>;

View File

@@ -0,0 +1,131 @@
import { FlowExecCtx } from "@core/flow.execution.context";
import { ERROR_CODES, type Err } from "@pkg/result";
import { getError } from "@pkg/logger";
export const mobileErrors = {
adminOwnerNotFound: (fctx: FlowExecCtx): Err =>
getError({
flowId: fctx.flowId,
code: ERROR_CODES.NOT_FOUND,
message: "Admin owner not found",
description: "No admin user exists to own this device",
detail: "Unable to resolve an admin user for device registration",
}),
invalidPayload: (fctx: FlowExecCtx, detail: string): Err =>
getError({
flowId: fctx.flowId,
code: ERROR_CODES.VALIDATION_ERROR,
message: "Invalid mobile payload",
description: "Please validate request data and try again",
detail,
}),
deviceNotFound: (fctx: FlowExecCtx, externalDeviceId: string): Err =>
getError({
flowId: fctx.flowId,
code: ERROR_CODES.NOT_FOUND,
message: "Device not found",
description: "The device is not registered",
detail: `externalDeviceId=${externalDeviceId}`,
}),
deviceNotFoundById: (fctx: FlowExecCtx, deviceId: number): Err =>
getError({
flowId: fctx.flowId,
code: ERROR_CODES.NOT_FOUND,
message: "Device not found",
description: "The device does not exist",
detail: `deviceId=${deviceId}`,
}),
mediaAssetNotFound: (fctx: FlowExecCtx, mediaAssetId: number): Err =>
getError({
flowId: fctx.flowId,
code: ERROR_CODES.NOT_FOUND,
message: "Media asset not found",
description: "The media asset does not exist",
detail: `mediaAssetId=${mediaAssetId}`,
}),
registerDeviceFailed: (fctx: FlowExecCtx, detail: string): Err =>
getError({
flowId: fctx.flowId,
code: ERROR_CODES.DATABASE_ERROR,
message: "Failed to register device",
description: "Please try again later",
detail,
}),
pingDeviceFailed: (fctx: FlowExecCtx, detail: string): Err =>
getError({
flowId: fctx.flowId,
code: ERROR_CODES.DATABASE_ERROR,
message: "Failed to update device ping",
description: "Please try again later",
detail,
}),
syncSMSFailed: (fctx: FlowExecCtx, detail: string): Err =>
getError({
flowId: fctx.flowId,
code: ERROR_CODES.DATABASE_ERROR,
message: "Failed to sync SMS",
description: "Please try again later",
detail,
}),
syncMediaFailed: (fctx: FlowExecCtx, detail: string): Err =>
getError({
flowId: fctx.flowId,
code: ERROR_CODES.DATABASE_ERROR,
message: "Failed to sync media assets",
description: "Please try again later",
detail,
}),
listDevicesFailed: (fctx: FlowExecCtx, detail: string): Err =>
getError({
flowId: fctx.flowId,
code: ERROR_CODES.DATABASE_ERROR,
message: "Failed to list devices",
description: "Please try again later",
detail,
}),
listSMSFailed: (fctx: FlowExecCtx, detail: string): Err =>
getError({
flowId: fctx.flowId,
code: ERROR_CODES.DATABASE_ERROR,
message: "Failed to list SMS",
description: "Please try again later",
detail,
}),
listMediaFailed: (fctx: FlowExecCtx, detail: string): Err =>
getError({
flowId: fctx.flowId,
code: ERROR_CODES.DATABASE_ERROR,
message: "Failed to list media assets",
description: "Please try again later",
detail,
}),
deleteMediaFailed: (fctx: FlowExecCtx, detail: string): Err =>
getError({
flowId: fctx.flowId,
code: ERROR_CODES.DATABASE_ERROR,
message: "Failed to delete media asset",
description: "Please try again later",
detail,
}),
deleteDeviceFailed: (fctx: FlowExecCtx, detail: string): Err =>
getError({
flowId: fctx.flowId,
code: ERROR_CODES.DATABASE_ERROR,
message: "Failed to delete device",
description: "Please try again later",
detail,
}),
};

View File

@@ -0,0 +1,792 @@
import {
Database,
and,
asc,
count,
desc,
eq,
inArray,
like,
or,
} from "@pkg/db";
import { file, mobileDevice, mobileMediaAsset, mobileSMS, user } from "@pkg/db/schema";
import { ResultAsync, errAsync, okAsync } from "neverthrow";
import { FlowExecCtx } from "@core/flow.execution.context";
import type {
ListMobileDeviceMediaFilters,
ListMobileDeviceSMSFilters,
ListMobileDevicesFilters,
MobileDevice,
MobileDeviceDetail,
MobileMediaAssetInput,
MobilePagination,
MobileSMSInput,
PaginatedMobileDevices,
PaginatedMobileMedia,
PaginatedMobileSMS,
RegisterMobileDevice,
} from "./data";
import { type Err } from "@pkg/result";
import { mobileErrors } from "./errors";
import { logDomainEvent } from "@pkg/logger";
import { createHash } from "node:crypto";
export class MobileRepository {
constructor(private db: Database) {}
private normalizeDate(
fctx: FlowExecCtx,
value: Date | string | null | undefined,
field: string,
required: boolean,
): ResultAsync<Date | null, Err> {
if (value === undefined || value === null) {
if (required) {
return errAsync(
mobileErrors.invalidPayload(fctx, `${field} is required`),
);
}
return okAsync(null);
}
const normalized = value instanceof Date ? value : new Date(value);
if (Number.isNaN(normalized.getTime())) {
return errAsync(
mobileErrors.invalidPayload(fctx, `${field} must be a valid date`),
);
}
return okAsync(normalized);
}
private makeSMSDedupHash(deviceId: number, sms: MobileSMSInput): string {
const sentAt =
sms.sentAt instanceof Date
? sms.sentAt.toISOString()
: new Date(sms.sentAt).toISOString();
const fingerprint = `${deviceId}|${sentAt}|${sms.sender}|${sms.body}`;
return createHash("sha256").update(fingerprint).digest("hex");
}
findAdminOwnerId(
fctx: FlowExecCtx,
preferredAdminEmail?: string,
): ResultAsync<string, Err> {
const startedAt = Date.now();
logDomainEvent({
event: "mobile.admin_owner.resolve.started",
fctx,
meta: { hasPreferredEmail: Boolean(preferredAdminEmail) },
});
const byPreferredEmail = preferredAdminEmail
? this.db
.select({ id: user.id })
.from(user)
.where(eq(user.email, preferredAdminEmail))
.limit(1)
: Promise.resolve([]);
return ResultAsync.fromPromise(byPreferredEmail, (error) =>
mobileErrors.listDevicesFailed(
fctx,
error instanceof Error ? error.message : String(error),
),
).andThen((preferredUser) => {
if (preferredUser[0]?.id) {
logDomainEvent({
event: "mobile.admin_owner.resolve.succeeded",
fctx,
durationMs: Date.now() - startedAt,
meta: { ownerUserId: preferredUser[0].id, via: "preferred_email" },
});
return okAsync(preferredUser[0].id);
}
return ResultAsync.fromPromise(
this.db
.select({ id: user.id })
.from(user)
.where(eq(user.role, "admin"))
.orderBy(asc(user.createdAt))
.limit(1),
(error) =>
mobileErrors.listDevicesFailed(
fctx,
error instanceof Error ? error.message : String(error),
),
).andThen((admins) => {
if (!admins[0]?.id) {
logDomainEvent({
level: "warn",
event: "mobile.admin_owner.resolve.failed",
fctx,
durationMs: Date.now() - startedAt,
});
return errAsync(mobileErrors.adminOwnerNotFound(fctx));
}
logDomainEvent({
event: "mobile.admin_owner.resolve.succeeded",
fctx,
durationMs: Date.now() - startedAt,
meta: { ownerUserId: admins[0].id, via: "admin_role" },
});
return okAsync(admins[0].id);
});
});
}
upsertDevice(
fctx: FlowExecCtx,
payload: RegisterMobileDevice,
ownerUserId: string,
): ResultAsync<MobileDevice, Err> {
const startedAt = Date.now();
logDomainEvent({
event: "mobile.register.started",
fctx,
meta: {
externalDeviceId: payload.externalDeviceId,
ownerUserId,
},
});
const now = new Date();
return ResultAsync.fromPromise(
this.db
.insert(mobileDevice)
.values({
externalDeviceId: payload.externalDeviceId,
name: payload.name,
manufacturer: payload.manufacturer,
model: payload.model,
androidVersion: payload.androidVersion,
ownerUserId,
lastPingAt: now,
createdAt: now,
updatedAt: now,
})
.onConflictDoUpdate({
target: mobileDevice.externalDeviceId,
set: {
name: payload.name,
manufacturer: payload.manufacturer,
model: payload.model,
androidVersion: payload.androidVersion,
ownerUserId,
lastPingAt: now,
updatedAt: now,
},
})
.returning(),
(error) => {
logDomainEvent({
level: "error",
event: "mobile.register.failed",
fctx,
durationMs: Date.now() - startedAt,
error,
meta: { externalDeviceId: payload.externalDeviceId },
});
return mobileErrors.registerDeviceFailed(
fctx,
error instanceof Error ? error.message : String(error),
);
},
).map((result) => {
const device = result[0] as MobileDevice;
logDomainEvent({
event: "mobile.register.succeeded",
fctx,
durationMs: Date.now() - startedAt,
meta: { deviceId: device.id, externalDeviceId: device.externalDeviceId },
});
return device;
});
}
getDeviceByExternalId(
fctx: FlowExecCtx,
externalDeviceId: string,
): ResultAsync<MobileDevice, Err> {
return ResultAsync.fromPromise(
this.db
.select()
.from(mobileDevice)
.where(eq(mobileDevice.externalDeviceId, externalDeviceId))
.limit(1),
(error) =>
mobileErrors.listDevicesFailed(
fctx,
error instanceof Error ? error.message : String(error),
),
).andThen((rows) => {
if (!rows[0]) {
return errAsync(mobileErrors.deviceNotFound(fctx, externalDeviceId));
}
return okAsync(rows[0] as MobileDevice);
});
}
touchDevicePing(
fctx: FlowExecCtx,
deviceId: number,
pingAt?: Date,
): ResultAsync<boolean, Err> {
const startedAt = Date.now();
const resolvedPingAt = pingAt ?? new Date();
logDomainEvent({
event: "mobile.ping.started",
fctx,
meta: { deviceId, pingAt: resolvedPingAt.toISOString() },
});
return ResultAsync.fromPromise(
this.db
.update(mobileDevice)
.set({ lastPingAt: resolvedPingAt, updatedAt: new Date() })
.where(eq(mobileDevice.id, deviceId))
.returning({ id: mobileDevice.id }),
(error) => {
logDomainEvent({
level: "error",
event: "mobile.ping.failed",
fctx,
durationMs: Date.now() - startedAt,
error,
meta: { deviceId },
});
return mobileErrors.pingDeviceFailed(
fctx,
error instanceof Error ? error.message : String(error),
);
},
).andThen((updatedRows) => {
if (!updatedRows[0]) {
return errAsync(mobileErrors.deviceNotFoundById(fctx, deviceId));
}
logDomainEvent({
event: "mobile.ping.succeeded",
fctx,
durationMs: Date.now() - startedAt,
meta: { deviceId },
});
return okAsync(true);
});
}
syncSMS(
fctx: FlowExecCtx,
deviceId: number,
messages: MobileSMSInput[],
): ResultAsync<{ received: number; inserted: number }, Err> {
const startedAt = Date.now();
logDomainEvent({
event: "mobile.sms.sync.started",
fctx,
meta: { deviceId, received: messages.length },
});
const now = new Date();
const parseResult = ResultAsync.combine(
messages.map((message) =>
this.normalizeDate(fctx, message.sentAt, "messages.sentAt", true).andThen(
(sentAt) =>
this.normalizeDate(
fctx,
(message.receivedAt ?? null) as Date | string | null,
"messages.receivedAt",
false,
).map((receivedAt) => ({
deviceId,
externalMessageId: message.externalMessageId ?? null,
sender: message.sender,
recipient: message.recipient ?? null,
body: message.body,
sentAt: sentAt as Date,
receivedAt,
dedupHash:
message.dedupHash ??
this.makeSMSDedupHash(deviceId, message),
rawPayload: message.rawPayload,
createdAt: now,
updatedAt: now,
})),
),
),
);
return parseResult.andThen((rows) =>
ResultAsync.fromPromise(
this.db
.insert(mobileSMS)
.values(rows)
.onConflictDoNothing()
.returning({ id: mobileSMS.id }),
(error) => {
logDomainEvent({
level: "error",
event: "mobile.sms.sync.failed",
fctx,
durationMs: Date.now() - startedAt,
error,
meta: { deviceId, received: messages.length },
});
return mobileErrors.syncSMSFailed(
fctx,
error instanceof Error ? error.message : String(error),
);
},
).map((insertedRows) => {
logDomainEvent({
event: "mobile.sms.sync.succeeded",
fctx,
durationMs: Date.now() - startedAt,
meta: {
deviceId,
received: messages.length,
inserted: insertedRows.length,
},
});
return { received: messages.length, inserted: insertedRows.length };
}),
);
}
syncMediaAssets(
fctx: FlowExecCtx,
deviceId: number,
assets: MobileMediaAssetInput[],
): ResultAsync<{ received: number; inserted: number }, Err> {
const startedAt = Date.now();
logDomainEvent({
event: "mobile.media.sync.started",
fctx,
meta: { deviceId, received: assets.length },
});
const now = new Date();
const parseResult = ResultAsync.combine(
assets.map((asset) =>
this.normalizeDate(
fctx,
(asset.capturedAt ?? null) as Date | string | null,
"assets.capturedAt",
false,
).map((capturedAt) => ({
deviceId,
externalMediaId: asset.externalMediaId ?? null,
fileId: asset.fileId,
mimeType: asset.mimeType,
filename: asset.filename ?? null,
capturedAt,
sizeBytes: asset.sizeBytes ?? null,
hash: asset.hash ?? null,
metadata: asset.metadata,
createdAt: now,
updatedAt: now,
})),
),
);
return parseResult.andThen((rows) =>
ResultAsync.fromPromise(
this.db
.insert(mobileMediaAsset)
.values(rows)
.onConflictDoNothing()
.returning({ id: mobileMediaAsset.id }),
(error) => {
logDomainEvent({
level: "error",
event: "mobile.media.sync.failed",
fctx,
durationMs: Date.now() - startedAt,
error,
meta: { deviceId, received: assets.length },
});
return mobileErrors.syncMediaFailed(
fctx,
error instanceof Error ? error.message : String(error),
);
},
).map((insertedRows) => {
logDomainEvent({
event: "mobile.media.sync.succeeded",
fctx,
durationMs: Date.now() - startedAt,
meta: {
deviceId,
received: assets.length,
inserted: insertedRows.length,
},
});
return { received: assets.length, inserted: insertedRows.length };
}),
);
}
listDevices(
fctx: FlowExecCtx,
filters: ListMobileDevicesFilters,
pagination: MobilePagination,
): ResultAsync<PaginatedMobileDevices, Err> {
const startedAt = Date.now();
const conditions = [eq(mobileDevice.ownerUserId, filters.ownerUserId)];
if (filters.search) {
conditions.push(
or(
like(mobileDevice.name, `%${filters.search}%`),
like(mobileDevice.externalDeviceId, `%${filters.search}%`),
like(mobileDevice.manufacturer, `%${filters.search}%`),
like(mobileDevice.model, `%${filters.search}%`),
)!,
);
}
const whereClause = and(...conditions);
return ResultAsync.fromPromise(
this.db.select({ count: count() }).from(mobileDevice).where(whereClause),
(error) =>
mobileErrors.listDevicesFailed(
fctx,
error instanceof Error ? error.message : String(error),
),
).andThen((countRows) => {
const total = countRows[0]?.count || 0;
const page = pagination.page;
const pageSize = pagination.pageSize;
const offset = (page - 1) * pageSize;
const orderColumn =
pagination.sortBy === "createdAt"
? mobileDevice.createdAt
: pagination.sortBy === "updatedAt"
? mobileDevice.updatedAt
: mobileDevice.lastPingAt;
const orderFn = pagination.sortOrder === "asc" ? asc : desc;
return ResultAsync.fromPromise(
this.db
.select()
.from(mobileDevice)
.where(whereClause)
.orderBy(orderFn(orderColumn), desc(mobileDevice.id))
.limit(pageSize)
.offset(offset),
(error) =>
mobileErrors.listDevicesFailed(
fctx,
error instanceof Error ? error.message : String(error),
),
).map((rows) => ({
data: rows as MobileDevice[],
total,
page,
pageSize,
totalPages: Math.ceil(total / pageSize),
}));
}).map((result) => {
logDomainEvent({
event: "mobile.devices.list.succeeded",
fctx,
durationMs: Date.now() - startedAt,
meta: { count: result.data.length, total: result.total },
});
return result;
});
}
getDeviceDetail(
fctx: FlowExecCtx,
deviceId: number,
ownerUserId: string,
): ResultAsync<MobileDeviceDetail, Err> {
const startedAt = Date.now();
return ResultAsync.fromPromise(
this.db
.select()
.from(mobileDevice)
.where(
and(
eq(mobileDevice.id, deviceId),
eq(mobileDevice.ownerUserId, ownerUserId),
),
)
.limit(1),
(error) =>
mobileErrors.listDevicesFailed(
fctx,
error instanceof Error ? error.message : String(error),
),
).andThen((devices) => {
const device = devices[0];
if (!device) {
return errAsync(mobileErrors.deviceNotFoundById(fctx, deviceId));
}
return ResultAsync.combine([
ResultAsync.fromPromise(
this.db
.select({ count: count() })
.from(mobileSMS)
.where(eq(mobileSMS.deviceId, deviceId)),
(error) =>
mobileErrors.listSMSFailed(
fctx,
error instanceof Error ? error.message : String(error),
),
),
ResultAsync.fromPromise(
this.db
.select({ count: count() })
.from(mobileMediaAsset)
.where(eq(mobileMediaAsset.deviceId, deviceId)),
(error) =>
mobileErrors.listMediaFailed(
fctx,
error instanceof Error ? error.message : String(error),
),
),
]).map(([smsCountRows, mediaCountRows]) => ({
device: device as MobileDevice,
smsCount: smsCountRows[0]?.count || 0,
mediaCount: mediaCountRows[0]?.count || 0,
}));
}).map((result) => {
logDomainEvent({
event: "mobile.device.detail.succeeded",
fctx,
durationMs: Date.now() - startedAt,
meta: { deviceId },
});
return result;
});
}
listDeviceSMS(
fctx: FlowExecCtx,
filters: ListMobileDeviceSMSFilters,
pagination: MobilePagination,
): ResultAsync<PaginatedMobileSMS, Err> {
const conditions = [eq(mobileSMS.deviceId, filters.deviceId)];
if (filters.search) {
conditions.push(
or(
like(mobileSMS.sender, `%${filters.search}%`),
like(mobileSMS.recipient, `%${filters.search}%`),
like(mobileSMS.body, `%${filters.search}%`),
)!,
);
}
const whereClause = and(...conditions);
const page = pagination.page;
const pageSize = pagination.pageSize;
const offset = (page - 1) * pageSize;
const orderFn = pagination.sortOrder === "asc" ? asc : desc;
return ResultAsync.fromPromise(
this.db.select({ count: count() }).from(mobileSMS).where(whereClause),
(error) =>
mobileErrors.listSMSFailed(
fctx,
error instanceof Error ? error.message : String(error),
),
).andThen((countRows) =>
ResultAsync.fromPromise(
this.db
.select()
.from(mobileSMS)
.where(whereClause)
.orderBy(orderFn(mobileSMS.sentAt), desc(mobileSMS.id))
.limit(pageSize)
.offset(offset),
(error) =>
mobileErrors.listSMSFailed(
fctx,
error instanceof Error ? error.message : String(error),
),
).map((rows) => {
const total = countRows[0]?.count || 0;
return {
data: rows,
total,
page,
pageSize,
totalPages: Math.ceil(total / pageSize),
};
}),
);
}
listDeviceMedia(
fctx: FlowExecCtx,
filters: ListMobileDeviceMediaFilters,
pagination: MobilePagination,
): ResultAsync<PaginatedMobileMedia, Err> {
const conditions = [eq(mobileMediaAsset.deviceId, filters.deviceId)];
if (filters.mimeType) {
conditions.push(like(mobileMediaAsset.mimeType, `${filters.mimeType}%`));
}
if (filters.search) {
conditions.push(like(mobileMediaAsset.filename, `%${filters.search}%`));
}
const whereClause = and(...conditions);
const page = pagination.page;
const pageSize = pagination.pageSize;
const offset = (page - 1) * pageSize;
const orderFn = pagination.sortOrder === "asc" ? asc : desc;
return ResultAsync.fromPromise(
this.db
.select({ count: count() })
.from(mobileMediaAsset)
.where(whereClause),
(error) =>
mobileErrors.listMediaFailed(
fctx,
error instanceof Error ? error.message : String(error),
),
).andThen((countRows) =>
ResultAsync.fromPromise(
this.db
.select()
.from(mobileMediaAsset)
.where(whereClause)
.orderBy(orderFn(mobileMediaAsset.createdAt), desc(mobileMediaAsset.id))
.limit(pageSize)
.offset(offset),
(error) =>
mobileErrors.listMediaFailed(
fctx,
error instanceof Error ? error.message : String(error),
),
).map((rows) => {
const total = countRows[0]?.count || 0;
return {
data: rows,
total,
page,
pageSize,
totalPages: Math.ceil(total / pageSize),
};
}),
);
}
deleteMediaAsset(
fctx: FlowExecCtx,
mediaAssetId: number,
ownerUserId: string,
): ResultAsync<boolean, Err> {
const startedAt = Date.now();
return ResultAsync.fromPromise(
this.db
.select({
mediaAssetId: mobileMediaAsset.id,
fileId: mobileMediaAsset.fileId,
ownerUserId: mobileDevice.ownerUserId,
})
.from(mobileMediaAsset)
.innerJoin(mobileDevice, eq(mobileMediaAsset.deviceId, mobileDevice.id))
.where(
and(
eq(mobileMediaAsset.id, mediaAssetId),
eq(mobileDevice.ownerUserId, ownerUserId),
),
)
.limit(1),
(error) =>
mobileErrors.deleteMediaFailed(
fctx,
error instanceof Error ? error.message : String(error),
),
).andThen((rows) => {
const target = rows[0];
if (!target) {
return errAsync(mobileErrors.mediaAssetNotFound(fctx, mediaAssetId));
}
return ResultAsync.fromPromise(
this.db.transaction(async (tx) => {
await tx
.delete(mobileMediaAsset)
.where(eq(mobileMediaAsset.id, mediaAssetId));
await tx.delete(file).where(eq(file.id, target.fileId));
return true;
}),
(error) =>
mobileErrors.deleteMediaFailed(
fctx,
error instanceof Error ? error.message : String(error),
),
);
}).map((deleted) => {
logDomainEvent({
event: "mobile.media.delete.succeeded",
fctx,
durationMs: Date.now() - startedAt,
meta: { mediaAssetId, ownerUserId, deleted },
});
return deleted;
});
}
deleteDevice(
fctx: FlowExecCtx,
deviceId: number,
ownerUserId: string,
): ResultAsync<{ deleted: boolean; deletedFileCount: number }, Err> {
const startedAt = Date.now();
return ResultAsync.fromPromise(
this.db
.select({ id: mobileDevice.id })
.from(mobileDevice)
.where(
and(
eq(mobileDevice.id, deviceId),
eq(mobileDevice.ownerUserId, ownerUserId),
),
)
.limit(1),
(error) =>
mobileErrors.deleteDeviceFailed(
fctx,
error instanceof Error ? error.message : String(error),
),
).andThen((devices) => {
if (!devices[0]) {
return errAsync(mobileErrors.deviceNotFoundById(fctx, deviceId));
}
return ResultAsync.fromPromise(
this.db.transaction(async (tx) => {
const mediaFiles = await tx
.select({ fileId: mobileMediaAsset.fileId })
.from(mobileMediaAsset)
.where(eq(mobileMediaAsset.deviceId, deviceId));
const fileIds = mediaFiles.map((item) => item.fileId);
await tx.delete(mobileDevice).where(eq(mobileDevice.id, deviceId));
if (fileIds.length > 0) {
await tx.delete(file).where(inArray(file.id, fileIds));
}
return { deleted: true, deletedFileCount: fileIds.length };
}),
(error) =>
mobileErrors.deleteDeviceFailed(
fctx,
error instanceof Error ? error.message : String(error),
),
);
}).map((result) => {
logDomainEvent({
event: "mobile.device.delete.succeeded",
fctx,
durationMs: Date.now() - startedAt,
meta: { deviceId, deletedFileCount: result.deletedFileCount },
});
return result;
});
}
}

12
pnpm-lock.yaml generated
View File

@@ -195,15 +195,27 @@ importers:
'@hono/node-server': '@hono/node-server':
specifier: ^1.19.9 specifier: ^1.19.9
version: 1.19.9(hono@4.12.3) version: 1.19.9(hono@4.12.3)
'@pkg/db':
specifier: workspace:*
version: link:../../packages/db
'@pkg/logger': '@pkg/logger':
specifier: workspace:* specifier: workspace:*
version: link:../../packages/logger version: link:../../packages/logger
'@pkg/logic': '@pkg/logic':
specifier: workspace:* specifier: workspace:*
version: link:../../packages/logic version: link:../../packages/logic
'@pkg/result':
specifier: workspace:*
version: link:../../packages/result
'@pkg/settings':
specifier: workspace:*
version: link:../../packages/settings
hono: hono:
specifier: ^4.11.1 specifier: ^4.11.1
version: 4.12.3 version: 4.12.3
valibot:
specifier: ^1.2.0
version: 1.2.0(typescript@5.9.3)
devDependencies: devDependencies:
'@types/node': '@types/node':
specifier: ^25.3.2 specifier: ^25.3.2

View File

@@ -3,5 +3,7 @@ packages:
- packages/* - packages/*
onlyBuiltDependencies: onlyBuiltDependencies:
- argon2
- esbuild - esbuild
- protobufjs
- sharp - sharp