updated mobile media uploader logic to be more redundant
This commit is contained in:
2
.gitignore
vendored
2
.gitignore
vendored
@@ -8,6 +8,8 @@ node_modules
|
||||
__pycache__
|
||||
.venv
|
||||
|
||||
secret.md
|
||||
|
||||
# ignore generated log files
|
||||
**/logs/**.log
|
||||
**/logs/**.log.gz
|
||||
|
||||
@@ -12,6 +12,7 @@ import { logDomainEvent } from "@pkg/logger";
|
||||
import type { Context } from "hono";
|
||||
import * as v from "valibot";
|
||||
import { Hono } from "hono";
|
||||
import { createHash } from "node:crypto";
|
||||
|
||||
const mobileController = getMobileController();
|
||||
const fileController = getFileController();
|
||||
@@ -81,6 +82,15 @@ function parseOptionalJsonRecord(value: string | undefined): {
|
||||
}
|
||||
}
|
||||
|
||||
async function makeFileHash(file: File): Promise<string | null> {
|
||||
try {
|
||||
const buffer = Buffer.from(await file.arrayBuffer());
|
||||
return createHash("sha256").update(buffer).digest("hex");
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
export const mobileRouter = new Hono()
|
||||
.post("/register", async (c) => {
|
||||
const fctx = buildFlowExecCtx(c);
|
||||
@@ -348,6 +358,82 @@ export const mobileRouter = new Hono()
|
||||
return respondError(c, fctx, deviceResult.error);
|
||||
}
|
||||
|
||||
const externalMediaId = parsed.output.externalMediaId ?? null;
|
||||
const mediaHash = parsed.output.hash ?? (await makeFileHash(fileEntry));
|
||||
if (externalMediaId) {
|
||||
const existingResult =
|
||||
await mobileController.findMediaAssetByExternalMediaId(
|
||||
fctx,
|
||||
deviceResult.value.id,
|
||||
externalMediaId,
|
||||
);
|
||||
if (existingResult.isErr()) {
|
||||
return respondError(c, fctx, existingResult.error);
|
||||
}
|
||||
|
||||
if (existingResult.value) {
|
||||
logDomainEvent({
|
||||
event: "processor.mobile.media_sync.duplicate_skipped",
|
||||
fctx,
|
||||
durationMs: Date.now() - startedAt,
|
||||
meta: {
|
||||
externalDeviceId,
|
||||
externalMediaId,
|
||||
deviceId: deviceResult.value.id,
|
||||
mediaAssetId: existingResult.value.id,
|
||||
fileId: existingResult.value.fileId,
|
||||
},
|
||||
});
|
||||
return c.json({
|
||||
data: {
|
||||
received: 1,
|
||||
inserted: 0,
|
||||
deviceId: deviceResult.value.id,
|
||||
fileId: existingResult.value.fileId,
|
||||
deduplicated: true,
|
||||
},
|
||||
error: null,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if (!externalMediaId && mediaHash) {
|
||||
const existingByHash = await mobileController.findMediaAssetByHash(
|
||||
fctx,
|
||||
deviceResult.value.id,
|
||||
mediaHash,
|
||||
);
|
||||
if (existingByHash.isErr()) {
|
||||
return respondError(c, fctx, existingByHash.error);
|
||||
}
|
||||
|
||||
if (existingByHash.value) {
|
||||
logDomainEvent({
|
||||
event: "processor.mobile.media_sync.duplicate_skipped",
|
||||
fctx,
|
||||
durationMs: Date.now() - startedAt,
|
||||
meta: {
|
||||
externalDeviceId,
|
||||
dedupKey: "hash",
|
||||
mediaHash,
|
||||
deviceId: deviceResult.value.id,
|
||||
mediaAssetId: existingByHash.value.id,
|
||||
fileId: existingByHash.value.fileId,
|
||||
},
|
||||
});
|
||||
return c.json({
|
||||
data: {
|
||||
received: 1,
|
||||
inserted: 0,
|
||||
deviceId: deviceResult.value.id,
|
||||
fileId: existingByHash.value.fileId,
|
||||
deduplicated: true,
|
||||
},
|
||||
error: null,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
const uploadResult = await fileController.uploadFile(
|
||||
fctx,
|
||||
deviceResult.value.ownerUserId,
|
||||
@@ -393,7 +479,7 @@ export const mobileRouter = new Hono()
|
||||
const result = await mobileController.syncMediaByExternalDeviceId(
|
||||
fctx,
|
||||
externalDeviceId,
|
||||
[{ ...parsed.output, fileId: uploadedFileId }],
|
||||
[{ ...parsed.output, hash: mediaHash ?? undefined, fileId: uploadedFileId }],
|
||||
);
|
||||
if (result.isErr()) {
|
||||
await fileController.deleteFiles(
|
||||
@@ -415,6 +501,64 @@ export const mobileRouter = new Hono()
|
||||
return respondError(c, fctx, result.error);
|
||||
}
|
||||
|
||||
if (result.value.inserted === 0) {
|
||||
const rollback = await fileController.deleteFiles(
|
||||
fctx,
|
||||
[uploadedFileId],
|
||||
deviceResult.value.ownerUserId,
|
||||
);
|
||||
if (rollback.isErr()) {
|
||||
logDomainEvent({
|
||||
level: "error",
|
||||
event: "processor.mobile.media_sync.duplicate_cleanup_failed",
|
||||
fctx,
|
||||
durationMs: Date.now() - startedAt,
|
||||
error: rollback.error,
|
||||
meta: {
|
||||
externalDeviceId,
|
||||
fileId: uploadedFileId,
|
||||
externalMediaId,
|
||||
},
|
||||
});
|
||||
return respondError(c, fctx, rollback.error);
|
||||
}
|
||||
|
||||
let dedupedFileId = uploadedFileId;
|
||||
if (externalMediaId) {
|
||||
const existingAfterInsert =
|
||||
await mobileController.findMediaAssetByExternalMediaId(
|
||||
fctx,
|
||||
deviceResult.value.id,
|
||||
externalMediaId,
|
||||
);
|
||||
if (existingAfterInsert.isOk() && existingAfterInsert.value) {
|
||||
dedupedFileId = existingAfterInsert.value.fileId;
|
||||
}
|
||||
}
|
||||
|
||||
logDomainEvent({
|
||||
event: "processor.mobile.media_sync.duplicate_cleaned",
|
||||
fctx,
|
||||
durationMs: Date.now() - startedAt,
|
||||
meta: {
|
||||
externalDeviceId,
|
||||
externalMediaId,
|
||||
deviceId: deviceResult.value.id,
|
||||
uploadedFileId,
|
||||
dedupedFileId,
|
||||
},
|
||||
});
|
||||
|
||||
return c.json({
|
||||
data: {
|
||||
...result.value,
|
||||
fileId: dedupedFileId,
|
||||
deduplicated: true,
|
||||
},
|
||||
error: null,
|
||||
});
|
||||
}
|
||||
|
||||
logDomainEvent({
|
||||
event: "processor.mobile.media_sync.succeeded",
|
||||
fctx,
|
||||
@@ -425,10 +569,12 @@ export const mobileRouter = new Hono()
|
||||
...result.value,
|
||||
},
|
||||
});
|
||||
|
||||
return c.json({
|
||||
data: {
|
||||
...result.value,
|
||||
fileId: uploadedFileId,
|
||||
deduplicated: false,
|
||||
},
|
||||
error: null,
|
||||
});
|
||||
|
||||
@@ -137,6 +137,39 @@ export class MobileController {
|
||||
});
|
||||
}
|
||||
|
||||
findMediaAssetByExternalMediaId(
|
||||
fctx: FlowExecCtx,
|
||||
deviceId: number,
|
||||
externalMediaId: string,
|
||||
) {
|
||||
return traceResultAsync({
|
||||
name: "mobile.media.findByExternalId",
|
||||
fctx,
|
||||
attributes: {
|
||||
"app.mobile.device_id": deviceId,
|
||||
"app.mobile.external_media_id": externalMediaId,
|
||||
},
|
||||
fn: () =>
|
||||
this.mobileRepo.findMediaAssetByExternalMediaId(
|
||||
fctx,
|
||||
deviceId,
|
||||
externalMediaId,
|
||||
),
|
||||
});
|
||||
}
|
||||
|
||||
findMediaAssetByHash(fctx: FlowExecCtx, deviceId: number, hash: string) {
|
||||
return traceResultAsync({
|
||||
name: "mobile.media.findByHash",
|
||||
fctx,
|
||||
attributes: {
|
||||
"app.mobile.device_id": deviceId,
|
||||
"app.mobile.media_hash": hash,
|
||||
},
|
||||
fn: () => this.mobileRepo.findMediaAssetByHash(fctx, deviceId, hash),
|
||||
});
|
||||
}
|
||||
|
||||
listDevices(
|
||||
fctx: FlowExecCtx,
|
||||
filters: ListMobileDevicesFilters,
|
||||
|
||||
@@ -424,6 +424,60 @@ export class MobileRepository {
|
||||
);
|
||||
}
|
||||
|
||||
findMediaAssetByExternalMediaId(
|
||||
fctx: FlowExecCtx,
|
||||
deviceId: number,
|
||||
externalMediaId: string,
|
||||
): ResultAsync<{ id: number; fileId: string } | null, Err> {
|
||||
return ResultAsync.fromPromise(
|
||||
this.db
|
||||
.select({
|
||||
id: mobileMediaAsset.id,
|
||||
fileId: mobileMediaAsset.fileId,
|
||||
})
|
||||
.from(mobileMediaAsset)
|
||||
.where(
|
||||
and(
|
||||
eq(mobileMediaAsset.deviceId, deviceId),
|
||||
eq(mobileMediaAsset.externalMediaId, externalMediaId),
|
||||
),
|
||||
)
|
||||
.limit(1),
|
||||
(error) =>
|
||||
mobileErrors.listMediaFailed(
|
||||
fctx,
|
||||
error instanceof Error ? error.message : String(error),
|
||||
),
|
||||
).map((rows) => rows[0] ?? null);
|
||||
}
|
||||
|
||||
findMediaAssetByHash(
|
||||
fctx: FlowExecCtx,
|
||||
deviceId: number,
|
||||
hash: string,
|
||||
): ResultAsync<{ id: number; fileId: string } | null, Err> {
|
||||
return ResultAsync.fromPromise(
|
||||
this.db
|
||||
.select({
|
||||
id: mobileMediaAsset.id,
|
||||
fileId: mobileMediaAsset.fileId,
|
||||
})
|
||||
.from(mobileMediaAsset)
|
||||
.where(
|
||||
and(
|
||||
eq(mobileMediaAsset.deviceId, deviceId),
|
||||
eq(mobileMediaAsset.hash, hash),
|
||||
),
|
||||
)
|
||||
.limit(1),
|
||||
(error) =>
|
||||
mobileErrors.listMediaFailed(
|
||||
fctx,
|
||||
error instanceof Error ? error.message : String(error),
|
||||
),
|
||||
).map((rows) => rows[0] ?? null);
|
||||
}
|
||||
|
||||
listDevices(
|
||||
fctx: FlowExecCtx,
|
||||
filters: ListMobileDevicesFilters,
|
||||
|
||||
@@ -38,6 +38,7 @@ export class R2StorageClient {
|
||||
this.s3Client = new S3Client({
|
||||
region: config.region,
|
||||
endpoint: config.endpoint,
|
||||
forcePathStyle: true,
|
||||
credentials: {
|
||||
accessKeyId: config.accessKey,
|
||||
secretAccessKey: config.secretKey,
|
||||
|
||||
@@ -146,7 +146,8 @@ Failure:
|
||||
- Media:
|
||||
- Raw file is uploaded first and persisted in `file`.
|
||||
- Then one `mobile_media_asset` row is created referencing uploaded `fileId`.
|
||||
- Dedup key: `(deviceId, externalMediaId)` when provided.
|
||||
- Dedup key #1: `(deviceId, externalMediaId)` when provided.
|
||||
- Dedup key #2 fallback: `(deviceId, hash)` where hash is client-provided or server-computed SHA-256 of file bytes.
|
||||
|
||||
## Operator Checklist
|
||||
|
||||
|
||||
Reference in New Issue
Block a user