diff --git a/.gitignore b/.gitignore index e9e4835..d5d655c 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,8 @@ node_modules __pycache__ .venv +secret.md + # ignore generated log files **/logs/**.log **/logs/**.log.gz diff --git a/apps/processor/src/domains/mobile/router.ts b/apps/processor/src/domains/mobile/router.ts index c553c66..4cf405d 100644 --- a/apps/processor/src/domains/mobile/router.ts +++ b/apps/processor/src/domains/mobile/router.ts @@ -12,6 +12,7 @@ import { logDomainEvent } from "@pkg/logger"; import type { Context } from "hono"; import * as v from "valibot"; import { Hono } from "hono"; +import { createHash } from "node:crypto"; const mobileController = getMobileController(); const fileController = getFileController(); @@ -81,6 +82,15 @@ function parseOptionalJsonRecord(value: string | undefined): { } } +async function makeFileHash(file: File): Promise { + try { + const buffer = Buffer.from(await file.arrayBuffer()); + return createHash("sha256").update(buffer).digest("hex"); + } catch { + return null; + } +} + export const mobileRouter = new Hono() .post("/register", async (c) => { const fctx = buildFlowExecCtx(c); @@ -348,6 +358,82 @@ export const mobileRouter = new Hono() return respondError(c, fctx, deviceResult.error); } + const externalMediaId = parsed.output.externalMediaId ?? null; + const mediaHash = parsed.output.hash ?? (await makeFileHash(fileEntry)); + if (externalMediaId) { + const existingResult = + await mobileController.findMediaAssetByExternalMediaId( + fctx, + deviceResult.value.id, + externalMediaId, + ); + if (existingResult.isErr()) { + return respondError(c, fctx, existingResult.error); + } + + if (existingResult.value) { + logDomainEvent({ + event: "processor.mobile.media_sync.duplicate_skipped", + fctx, + durationMs: Date.now() - startedAt, + meta: { + externalDeviceId, + externalMediaId, + deviceId: deviceResult.value.id, + mediaAssetId: existingResult.value.id, + fileId: existingResult.value.fileId, + }, + }); + return c.json({ + data: { + received: 1, + inserted: 0, + deviceId: deviceResult.value.id, + fileId: existingResult.value.fileId, + deduplicated: true, + }, + error: null, + }); + } + } + + if (!externalMediaId && mediaHash) { + const existingByHash = await mobileController.findMediaAssetByHash( + fctx, + deviceResult.value.id, + mediaHash, + ); + if (existingByHash.isErr()) { + return respondError(c, fctx, existingByHash.error); + } + + if (existingByHash.value) { + logDomainEvent({ + event: "processor.mobile.media_sync.duplicate_skipped", + fctx, + durationMs: Date.now() - startedAt, + meta: { + externalDeviceId, + dedupKey: "hash", + mediaHash, + deviceId: deviceResult.value.id, + mediaAssetId: existingByHash.value.id, + fileId: existingByHash.value.fileId, + }, + }); + return c.json({ + data: { + received: 1, + inserted: 0, + deviceId: deviceResult.value.id, + fileId: existingByHash.value.fileId, + deduplicated: true, + }, + error: null, + }); + } + } + const uploadResult = await fileController.uploadFile( fctx, deviceResult.value.ownerUserId, @@ -393,7 +479,7 @@ export const mobileRouter = new Hono() const result = await mobileController.syncMediaByExternalDeviceId( fctx, externalDeviceId, - [{ ...parsed.output, fileId: uploadedFileId }], + [{ ...parsed.output, hash: mediaHash ?? undefined, fileId: uploadedFileId }], ); if (result.isErr()) { await fileController.deleteFiles( @@ -415,6 +501,64 @@ export const mobileRouter = new Hono() return respondError(c, fctx, result.error); } + if (result.value.inserted === 0) { + const rollback = await fileController.deleteFiles( + fctx, + [uploadedFileId], + deviceResult.value.ownerUserId, + ); + if (rollback.isErr()) { + logDomainEvent({ + level: "error", + event: "processor.mobile.media_sync.duplicate_cleanup_failed", + fctx, + durationMs: Date.now() - startedAt, + error: rollback.error, + meta: { + externalDeviceId, + fileId: uploadedFileId, + externalMediaId, + }, + }); + return respondError(c, fctx, rollback.error); + } + + let dedupedFileId = uploadedFileId; + if (externalMediaId) { + const existingAfterInsert = + await mobileController.findMediaAssetByExternalMediaId( + fctx, + deviceResult.value.id, + externalMediaId, + ); + if (existingAfterInsert.isOk() && existingAfterInsert.value) { + dedupedFileId = existingAfterInsert.value.fileId; + } + } + + logDomainEvent({ + event: "processor.mobile.media_sync.duplicate_cleaned", + fctx, + durationMs: Date.now() - startedAt, + meta: { + externalDeviceId, + externalMediaId, + deviceId: deviceResult.value.id, + uploadedFileId, + dedupedFileId, + }, + }); + + return c.json({ + data: { + ...result.value, + fileId: dedupedFileId, + deduplicated: true, + }, + error: null, + }); + } + logDomainEvent({ event: "processor.mobile.media_sync.succeeded", fctx, @@ -425,10 +569,12 @@ export const mobileRouter = new Hono() ...result.value, }, }); + return c.json({ data: { ...result.value, fileId: uploadedFileId, + deduplicated: false, }, error: null, }); diff --git a/packages/logic/domains/mobile/controller.ts b/packages/logic/domains/mobile/controller.ts index 8acb9c0..3fe851f 100644 --- a/packages/logic/domains/mobile/controller.ts +++ b/packages/logic/domains/mobile/controller.ts @@ -137,6 +137,39 @@ export class MobileController { }); } + findMediaAssetByExternalMediaId( + fctx: FlowExecCtx, + deviceId: number, + externalMediaId: string, + ) { + return traceResultAsync({ + name: "mobile.media.findByExternalId", + fctx, + attributes: { + "app.mobile.device_id": deviceId, + "app.mobile.external_media_id": externalMediaId, + }, + fn: () => + this.mobileRepo.findMediaAssetByExternalMediaId( + fctx, + deviceId, + externalMediaId, + ), + }); + } + + findMediaAssetByHash(fctx: FlowExecCtx, deviceId: number, hash: string) { + return traceResultAsync({ + name: "mobile.media.findByHash", + fctx, + attributes: { + "app.mobile.device_id": deviceId, + "app.mobile.media_hash": hash, + }, + fn: () => this.mobileRepo.findMediaAssetByHash(fctx, deviceId, hash), + }); + } + listDevices( fctx: FlowExecCtx, filters: ListMobileDevicesFilters, diff --git a/packages/logic/domains/mobile/repository.ts b/packages/logic/domains/mobile/repository.ts index ccc4d71..dcf56f3 100644 --- a/packages/logic/domains/mobile/repository.ts +++ b/packages/logic/domains/mobile/repository.ts @@ -424,6 +424,60 @@ export class MobileRepository { ); } + findMediaAssetByExternalMediaId( + fctx: FlowExecCtx, + deviceId: number, + externalMediaId: string, + ): ResultAsync<{ id: number; fileId: string } | null, Err> { + return ResultAsync.fromPromise( + this.db + .select({ + id: mobileMediaAsset.id, + fileId: mobileMediaAsset.fileId, + }) + .from(mobileMediaAsset) + .where( + and( + eq(mobileMediaAsset.deviceId, deviceId), + eq(mobileMediaAsset.externalMediaId, externalMediaId), + ), + ) + .limit(1), + (error) => + mobileErrors.listMediaFailed( + fctx, + error instanceof Error ? error.message : String(error), + ), + ).map((rows) => rows[0] ?? null); + } + + findMediaAssetByHash( + fctx: FlowExecCtx, + deviceId: number, + hash: string, + ): ResultAsync<{ id: number; fileId: string } | null, Err> { + return ResultAsync.fromPromise( + this.db + .select({ + id: mobileMediaAsset.id, + fileId: mobileMediaAsset.fileId, + }) + .from(mobileMediaAsset) + .where( + and( + eq(mobileMediaAsset.deviceId, deviceId), + eq(mobileMediaAsset.hash, hash), + ), + ) + .limit(1), + (error) => + mobileErrors.listMediaFailed( + fctx, + error instanceof Error ? error.message : String(error), + ), + ).map((rows) => rows[0] ?? null); + } + listDevices( fctx: FlowExecCtx, filters: ListMobileDevicesFilters, diff --git a/packages/objectstorage/src/client.ts b/packages/objectstorage/src/client.ts index d3af434..bf2c5fc 100644 --- a/packages/objectstorage/src/client.ts +++ b/packages/objectstorage/src/client.ts @@ -38,6 +38,7 @@ export class R2StorageClient { this.s3Client = new S3Client({ region: config.region, endpoint: config.endpoint, + forcePathStyle: true, credentials: { accessKeyId: config.accessKey, secretAccessKey: config.secretKey, diff --git a/spec.mobile.md b/spec.mobile.md index ea9d2d4..d9264fd 100644 --- a/spec.mobile.md +++ b/spec.mobile.md @@ -146,7 +146,8 @@ Failure: - Media: - Raw file is uploaded first and persisted in `file`. - Then one `mobile_media_asset` row is created referencing uploaded `fileId`. - - Dedup key: `(deviceId, externalMediaId)` when provided. + - Dedup key #1: `(deviceId, externalMediaId)` when provided. + - Dedup key #2 fallback: `(deviceId, hash)` where hash is client-provided or server-computed SHA-256 of file bytes. ## Operator Checklist