added file domain logic, updated drizzle package

This commit is contained in:
user
2026-03-01 05:56:15 +02:00
parent 1c2584df58
commit 5a5f565377
27 changed files with 5757 additions and 223 deletions

View File

@@ -0,0 +1,252 @@
import {
FileFilters,
FileShareRequest,
FileUpdateRequest,
FileUploadRequest,
PaginationOptions,
PresignedUploadRequest,
} from "./data";
import { FlowExecCtx } from "@core/flow.execution.context";
import { StorageRepository } from "./storage.repository";
import { FileRepository } from "./repository";
import { settings } from "@core/settings";
import { ResultAsync } from "neverthrow";
import { traceResultAsync } from "@core/observability";
import { db } from "@pkg/db";
/**
 * Domain controller for the files feature. Coordinates the database layer
 * (`FileRepository`) and object storage (`StorageRepository`); every public
 * method is wrapped in an observability span via `traceResultAsync` and
 * returns a neverthrow `ResultAsync` — no exceptions escape to callers.
 */
export class FileController {
    constructor(
        private fileRepo: FileRepository,
        private storageRepo: StorageRepository,
        // Base URL used to compose publicly reachable object URLs.
        private publicUrl: string,
    ) {}

    /** Lists files owned by `filters.userId`, filtered and paginated. */
    getFiles(
        fctx: FlowExecCtx,
        filters: FileFilters,
        pagination: PaginationOptions,
    ) {
        return traceResultAsync({
            name: "logic.files.controller.getFiles",
            fctx,
            attributes: { "app.user.id": filters.userId },
            fn: () => this.fileRepo.getFiles(fctx, filters, pagination),
        });
    }

    /** Fetches a single file, scoped to `userId` for access control. */
    getFile(fctx: FlowExecCtx, fileId: string, userId: string) {
        return traceResultAsync({
            name: "logic.files.controller.getFile",
            fctx,
            attributes: { "app.user.id": userId, "app.file.id": fileId },
            fn: () => this.fileRepo.getFileById(fctx, fileId, userId),
        });
    }

    /**
     * Uploads `file` to object storage, then records it in the database with
     * status "ready". NOTE(review): the file is fully buffered in memory via
     * `arrayBuffer()` — confirm acceptable for the expected file sizes.
     */
    uploadFile(
        fctx: FlowExecCtx,
        userId: string,
        file: globalThis.File,
        uploadRequest: FileUploadRequest,
    ) {
        return traceResultAsync({
            name: "logic.files.controller.uploadFile",
            fctx,
            attributes: { "app.user.id": userId, "app.file.name": file.name },
            fn: () =>
                ResultAsync.fromPromise(file.arrayBuffer(), (error) => ({
                    code: "INTERNAL_ERROR",
                    message: "Failed to read file buffer",
                    description: "Please try again",
                    detail: error instanceof Error ? error.message : String(error),
                }))
                    .map((arrayBuffer) => Buffer.from(arrayBuffer))
                    .andThen((buffer) =>
                        this.storageRepo.uploadFile(
                            fctx,
                            buffer,
                            file.name,
                            file.type,
                            userId,
                            {
                                // Default to private when the caller omits it.
                                visibility:
                                    (uploadRequest.visibility as
                                        | "public"
                                        | "private") || "private",
                                metadata: uploadRequest.metadata,
                                tags: uploadRequest.tags,
                                processImage: uploadRequest.processImage,
                                processDocument: uploadRequest.processDocument,
                                processVideo: uploadRequest.processVideo,
                            },
                        ),
                    )
                    .andThen((fileMetadata) =>
                        // Persist the storage-reported metadata as a DB record.
                        this.fileRepo
                            .createFile(fctx, {
                                id: fileMetadata.id,
                                filename: fileMetadata.filename,
                                originalName: fileMetadata.originalName,
                                mimeType: fileMetadata.mimeType,
                                size: fileMetadata.size,
                                hash: fileMetadata.hash,
                                bucketName: fileMetadata.bucketName,
                                objectKey: fileMetadata.objectKey,
                                r2Url: fileMetadata.r2Url,
                                visibility: fileMetadata.visibility,
                                userId: fileMetadata.userId,
                                metadata: fileMetadata.metadata,
                                tags: fileMetadata.tags
                                    ? [...fileMetadata.tags]
                                    : undefined,
                                status: "ready",
                                uploadedAt: fileMetadata.uploadedAt,
                            })
                            .map((dbFile) => ({
                                success: true,
                                file: dbFile,
                                uploadId: fileMetadata.id,
                            })),
                    ),
        });
    }

    /**
     * Issues a presigned upload URL plus a matching "processing" DB record so
     * that clients can upload directly to object storage. The record's `hash`
     * stays empty until the upload completes.
     */
    generatePresignedUrl(
        fctx: FlowExecCtx,
        userId: string,
        bucketName: string,
        request: PresignedUploadRequest,
    ) {
        const fileId = crypto.randomUUID();
        // Only text after a real dot counts as an extension. The previous
        // `split(".").pop()` reused a dotless name verbatim as the extension
        // (yielding keys like "<uuid>.<wholeName>") and left a trailing dot
        // when the extension was empty.
        const dotIndex = request.filename.lastIndexOf(".");
        const extension =
            dotIndex > 0 ? request.filename.slice(dotIndex + 1) : "";
        const filename = extension ? `${fileId}.${extension}` : fileId;
        // Namespace keys per user; the UUID-based filename prevents clashes
        // between concurrent uploads. (Fixes the garbled template that
        // previously emitted a literal "$(unknown)" segment.)
        const objectKey = `uploads/${userId}/${filename}`;
        return traceResultAsync({
            name: "logic.files.controller.generatePresignedUrl",
            fctx,
            attributes: { "app.user.id": userId, "app.file.id": fileId },
            fn: () =>
                this.storageRepo
                    .generatePresignedUploadUrl(
                        fctx,
                        objectKey,
                        request.mimeType,
                        3600, // URL validity: one hour
                    )
                    .andThen((presignedData) =>
                        this.fileRepo
                            .createFile(fctx, {
                                id: fileId,
                                filename,
                                originalName: request.filename,
                                mimeType: request.mimeType,
                                size: request.size,
                                hash: "", // unknown until the client uploads
                                bucketName,
                                objectKey,
                                r2Url: `${this.publicUrl}/${bucketName}/${objectKey}`,
                                visibility: request.visibility || "private",
                                userId,
                                status: "processing",
                                uploadedAt: new Date(),
                            })
                            .map(() => ({
                                ...presignedData,
                                fileId,
                                objectKey,
                            })),
                    ),
        });
    }

    /** Applies caller-supplied updates to a file the user owns. */
    updateFile(
        fctx: FlowExecCtx,
        fileId: string,
        userId: string,
        updates: FileUpdateRequest,
    ) {
        return traceResultAsync({
            name: "logic.files.controller.updateFile",
            fctx,
            attributes: { "app.user.id": userId, "app.file.id": fileId },
            fn: () => this.fileRepo.updateFile(fctx, fileId, userId, updates),
        });
    }

    /**
     * Deletes files the user owns: loads each record (verifying ownership),
     * removes the objects from storage, then deletes the DB rows. Fails as a
     * whole if any lookup or storage deletion fails.
     */
    deleteFiles(fctx: FlowExecCtx, fileIds: readonly string[], userId: string) {
        return traceResultAsync({
            name: "logic.files.controller.deleteFiles",
            fctx,
            attributes: {
                "app.user.id": userId,
                "app.files.count": fileIds.length,
            },
            fn: () =>
                ResultAsync.combine(
                    [...fileIds].map((fileId) =>
                        this.fileRepo.getFileById(fctx, fileId, userId),
                    ),
                )
                    .map((files) => files.map((file) => file.objectKey))
                    .andThen((objectKeys) =>
                        this.storageRepo.deleteFiles(fctx, objectKeys),
                    )
                    .andThen(() =>
                        this.fileRepo.deleteFiles(fctx, fileIds, userId),
                    ),
        });
    }

    /** Grants another user access to a file owned by `ownerId`. */
    shareFile(
        fctx: FlowExecCtx,
        fileId: string,
        ownerId: string,
        shareRequest: FileShareRequest,
    ) {
        return traceResultAsync({
            name: "logic.files.controller.shareFile",
            fctx,
            attributes: { "app.user.id": ownerId, "app.file.id": fileId },
            fn: () => this.fileRepo.shareFile(fctx, fileId, ownerId, shareRequest),
        });
    }

    /** Updates a file's processing status (no ownership check here). */
    updateFileStatus(
        fctx: FlowExecCtx,
        fileId: string,
        status: string,
        processingError?: string,
    ) {
        return traceResultAsync({
            name: "logic.files.controller.updateFileStatus",
            fctx,
            attributes: { "app.file.id": fileId },
            fn: () =>
                this.fileRepo.updateFileStatus(
                    fctx,
                    fileId,
                    status,
                    processingError,
                ),
        });
    }
}
/**
 * Builds a fully wired `FileController` from application settings: a
 * `FileRepository` over the shared `db` handle plus an R2-backed
 * `StorageRepository`. Unset string settings fall back to "" so that
 * construction itself never throws.
 */
export function getFileController(): FileController {
    const storageConfig = {
        bucketName: settings.r2BucketName || "",
        region: settings.r2Region || "",
        endpoint: settings.r2Endpoint || "",
        accessKey: settings.r2AccessKey || "",
        secretKey: settings.r2SecretKey || "",
        publicUrl: settings.r2PublicUrl || "",
        maxFileSize: settings.maxFileSize,
        allowedMimeTypes: settings.allowedMimeTypes,
        allowedExtensions: settings.allowedExtensions,
    };
    const fileRepository = new FileRepository(db);
    const storageRepository = new StorageRepository(storageConfig);
    return new FileController(
        fileRepository,
        storageRepository,
        settings.r2PublicUrl || "",
    );
}

View File

@@ -0,0 +1,147 @@
import * as v from "valibot";
// Canonical shape of a persisted file record (mirrors the DB row).
export const fileSchema = v.object({
    id: v.string(),
    // Generated storage filename (e.g. "<uuid>.<ext>").
    filename: v.string(),
    // Filename exactly as the user uploaded it.
    originalName: v.string(),
    mimeType: v.string(),
    // Size in bytes.
    size: v.pipe(v.number(), v.integer()),
    hash: v.string(),
    bucketName: v.string(),
    objectKey: v.string(),
    r2Url: v.string(),
    visibility: v.string(),
    userId: v.string(),
    metadata: v.optional(v.record(v.string(), v.any())),
    tags: v.optional(v.array(v.string())),
    status: v.string(),
    processingError: v.optional(v.string()),
    uploadedAt: v.date(),
    lastAccessedAt: v.optional(v.date()),
    expiresAt: v.optional(v.date()),
    createdAt: v.date(),
    updatedAt: v.date(),
});
export type File = v.InferOutput<typeof fileSchema>;
export type Files = File[];

// Options accepted alongside a direct (server-buffered) upload.
export const fileUploadRequestSchema = v.object({
    visibility: v.optional(v.string()),
    metadata: v.optional(v.record(v.string(), v.any())),
    tags: v.optional(v.array(v.string())),
    processImage: v.optional(v.boolean()),
    processDocument: v.optional(v.boolean()),
    processVideo: v.optional(v.boolean()),
});
export type FileUploadRequest = v.InferOutput<typeof fileUploadRequestSchema>;

// Server-side listing filters; `userId` is mandatory (ownership scope).
export const fileFiltersSchema = v.object({
    userId: v.string(),
    mimeType: v.optional(v.string()),
    visibility: v.optional(v.string()),
    status: v.optional(v.string()),
    search: v.optional(v.string()),
    tags: v.optional(v.array(v.string())),
});
export type FileFilters = v.InferOutput<typeof fileFiltersSchema>;

// 1-based pagination with optional sorting.
export const paginationOptionsSchema = v.object({
    page: v.pipe(v.number(), v.integer()),
    pageSize: v.pipe(v.number(), v.integer()),
    sortBy: v.optional(v.string()),
    sortOrder: v.optional(v.string()),
});
export type PaginationOptions = v.InferOutput<typeof paginationOptionsSchema>;

// One page of results plus paging totals.
export const PaginatedFilesSchema = v.object({
    data: v.array(fileSchema),
    total: v.pipe(v.number(), v.integer()),
    page: v.pipe(v.number(), v.integer()),
    pageSize: v.pipe(v.number(), v.integer()),
    totalPages: v.pipe(v.number(), v.integer()),
});
export type PaginatedFiles = v.InferOutput<typeof PaginatedFilesSchema>;

// Combined request body for the list endpoint.
export const getFilesSchema = v.object({
    filters: fileFiltersSchema,
    pagination: paginationOptionsSchema,
});
export type GetFiles = v.InferOutput<typeof getFilesSchema>;

// Request to obtain a presigned direct-to-storage upload URL.
export const presignedUploadRequestSchema = v.object({
    filename: v.string(),
    mimeType: v.string(),
    size: v.pipe(v.number(), v.integer()),
    visibility: v.optional(v.string()),
});
export type PresignedUploadRequest = v.InferOutput<
    typeof presignedUploadRequestSchema
>;

// Payload returned with a presigned upload URL.
export const presignedUploadResponseSchema = v.object({
    uploadUrl: v.string(),
    downloadUrl: v.optional(v.string()),
    expiresIn: v.pipe(v.number(), v.integer()),
    fileId: v.string(),
    objectKey: v.string(),
    fields: v.optional(v.record(v.string(), v.any())),
});
export type PresignedUploadResponse = v.InferOutput<
    typeof presignedUploadResponseSchema
>;

// Outcome of a direct upload: the created record on success.
export const fileUploadResultSchema = v.object({
    success: v.boolean(),
    file: v.optional(fileSchema),
    uploadId: v.optional(v.string()),
    error: v.optional(v.string()),
});
export type FileUploadResult = v.InferOutput<typeof fileUploadResultSchema>;

// Body for bulk operations (e.g. bulk delete).
export const bulkFileIdsSchema = v.object({
    fileIds: v.array(v.string()),
});
export type BulkFileIds = v.InferOutput<typeof bulkFileIdsSchema>;

// Mutable fields a caller may patch on an existing file.
export const fileUpdateRequestSchema = v.object({
    filename: v.optional(v.string()),
    visibility: v.optional(v.string()),
    metadata: v.optional(v.record(v.string(), v.any())),
    tags: v.optional(v.array(v.string())),
});
export type FileUpdateRequest = v.InferOutput<typeof fileUpdateRequestSchema>;

// Grant of per-user permissions on a file, optionally time-limited.
export const fileShareRequestSchema = v.object({
    userId: v.string(),
    permissions: v.object({
        canRead: v.optional(v.boolean()),
        canWrite: v.optional(v.boolean()),
        canDelete: v.optional(v.boolean()),
        canShare: v.optional(v.boolean()),
    }),
    expiresAt: v.optional(v.date()),
});
export type FileShareRequest = v.InferOutput<typeof fileShareRequestSchema>;
//
// Frontend specific models
//
// Like fileFiltersSchema but without `userId` — presumably the server adds
// the authenticated user's id; verify against the route handler.
export const clientFileFiltersSchema = v.object({
    mimeType: v.optional(v.string()),
    visibility: v.optional(v.string()),
    status: v.optional(v.string()),
    search: v.optional(v.string()),
    tags: v.optional(v.array(v.string())),
});
export type ClientFileFilters = v.InferOutput<typeof clientFileFiltersSchema>;

// Stricter pagination for the client: sort fields are required and
// sortOrder is constrained to "asc" | "desc".
export const clientPaginationOptionsSchema = v.object({
    page: v.pipe(v.number(), v.integer()),
    pageSize: v.pipe(v.number(), v.integer()),
    sortBy: v.string(),
    sortOrder: v.picklist(["asc", "desc"]),
});
export type ClientPaginationOptions = v.InferOutput<
    typeof clientPaginationOptionsSchema
>;

View File

@@ -0,0 +1,132 @@
import { FlowExecCtx } from "@core/flow.execution.context";
import { ERROR_CODES, type Err } from "@pkg/result";
import { getError } from "@pkg/logger";
/**
 * Builds a DATABASE_ERROR factory with a fixed user-facing message.
 * Every produced error carries the flow id for request correlation.
 */
const databaseFailure =
    (message: string) =>
    (fctx: FlowExecCtx, detail: string): Err =>
        getError({
            flowId: fctx.flowId,
            code: ERROR_CODES.DATABASE_ERROR,
            message,
            description: "Please try again later",
            detail,
        });

/** Builds an INTERNAL_SERVER_ERROR factory with a fixed message. */
const internalFailure =
    (message: string) =>
    (fctx: FlowExecCtx, detail: string): Err =>
        getError({
            flowId: fctx.flowId,
            code: ERROR_CODES.INTERNAL_SERVER_ERROR,
            message,
            description: "Please try again later",
            detail,
        });

/**
 * Domain error factories for the files feature. All factories keep the
 * signature `(fctx, detail) => Err` except the fixed-detail variants
 * (`fileNotFound`, `noFileMetadata`, `noPresignedData`).
 */
export const fileErrors = {
    dbError: databaseFailure("Database operation failed"),
    fileNotFound: (fctx: FlowExecCtx, fileId: string): Err =>
        getError({
            flowId: fctx.flowId,
            code: ERROR_CODES.NOT_FOUND,
            message: "File not found",
            description:
                "The requested file does not exist or you don't have access to it",
            detail: `File ID: ${fileId}`,
        }),
    getFilesFailed: databaseFailure("Failed to fetch files"),
    getFileFailed: databaseFailure("Failed to get file"),
    createFileFailed: databaseFailure("Failed to create file record"),
    updateFileFailed: databaseFailure("Failed to update file"),
    deleteFilesFailed: databaseFailure("Failed to delete files"),
    updateStatusFailed: databaseFailure("Failed to update file status"),
    shareFileFailed: databaseFailure("Failed to share file"),
    uploadFailed: internalFailure("File upload failed"),
    noFileMetadata: (fctx: FlowExecCtx): Err =>
        getError({
            flowId: fctx.flowId,
            code: ERROR_CODES.INTERNAL_SERVER_ERROR,
            message: "Upload succeeded but no file metadata returned",
            description: "Please try uploading again",
            detail: "Storage service returned no file metadata",
        }),
    presignedUrlFailed: internalFailure("Failed to generate presigned URL"),
    noPresignedData: (fctx: FlowExecCtx): Err =>
        getError({
            flowId: fctx.flowId,
            code: ERROR_CODES.INTERNAL_SERVER_ERROR,
            message: "Failed to generate presigned URL",
            description: "Please try again later",
            detail: "Storage service returned no presigned data",
        }),
    storageError: (fctx: FlowExecCtx, detail: string): Err =>
        getError({
            flowId: fctx.flowId,
            code: ERROR_CODES.STORAGE_ERROR,
            message: "Storage operation failed",
            description: "Please try again later",
            detail,
        }),
};

View File

@@ -0,0 +1,537 @@
import type {
File,
FileFilters,
FileShareRequest,
FileUpdateRequest,
PaginatedFiles,
PaginationOptions,
} from "./data";
import {
Database,
and,
asc,
count,
desc,
eq,
inArray,
like,
or,
sql,
} from "@pkg/db";
import { ResultAsync, errAsync, okAsync } from "neverthrow";
import { FlowExecCtx } from "@core/flow.execution.context";
import { file, fileAccess } from "@pkg/db/schema";
import { type Err } from "@pkg/result";
import { fileErrors } from "./errors";
import { logDomainEvent } from "@pkg/logger";
/**
 * Data-access layer for file records and file-sharing grants.
 *
 * Every method:
 *  - returns a neverthrow `ResultAsync` (no exceptions escape),
 *  - emits structured `logDomainEvent` entries for start/success/failure with
 *    a wall-clock duration, and
 *  - maps raw database errors to domain errors from `fileErrors`.
 */
export class FileRepository {
    constructor(private db: Database) {}

    /**
     * Returns one page of a user's files matching `filters`, sorted by
     * `createdAt` descending by default. Runs two queries: a COUNT for the
     * total, then the page itself.
     */
    getFiles(
        fctx: FlowExecCtx,
        filters: FileFilters,
        pagination: PaginationOptions,
    ): ResultAsync<PaginatedFiles, Err> {
        const startedAt = Date.now();
        logDomainEvent({
            event: "files.list.started",
            fctx,
            meta: {
                userId: filters.userId,
                hasSearch: Boolean(filters.search),
                hasTags: Boolean(filters.tags?.length),
                page: pagination.page,
                pageSize: pagination.pageSize,
            },
        });
        const { userId, mimeType, visibility, status, search, tags } = filters;
        const {
            page,
            pageSize,
            sortBy = "createdAt",
            sortOrder = "desc",
        } = pagination;
        // Ownership is always enforced; the remaining filters are additive.
        const conditions = [eq(file.userId, userId)];
        if (mimeType) {
            // Prefix match so e.g. "image" covers "image/png", "image/jpeg".
            conditions.push(like(file.mimeType, `${mimeType}%`));
        }
        if (visibility) {
            conditions.push(eq(file.visibility, visibility));
        }
        if (status) {
            conditions.push(eq(file.status, status));
        }
        if (search) {
            // Substring search over both the stored and the original name.
            // `or` is typed as possibly undefined (it accepts zero args);
            // both operands are present here, hence the non-null assertion.
            conditions.push(
                or(
                    like(file.filename, `%${search}%`),
                    like(file.originalName, `%${search}%`),
                )!,
            );
        }
        if (tags && tags.length > 0) {
            // `@>` containment: the row must carry ALL requested tags.
            // NOTE(review): assumes a Postgres JSONB `tags` column — confirm
            // against the schema definition.
            conditions.push(sql`${file.tags} @> ${JSON.stringify(tags)}`);
        }
        const whereClause = and(...conditions);
        return ResultAsync.fromPromise(
            this.db.select({ count: count() }).from(file).where(whereClause),
            (error) => {
                logDomainEvent({
                    level: "error",
                    event: "files.list.failed",
                    fctx,
                    durationMs: Date.now() - startedAt,
                    error,
                    meta: { userId },
                });
                return fileErrors.getFilesFailed(
                    fctx,
                    error instanceof Error ? error.message : String(error),
                );
            },
        ).andThen((totalResult) => {
            const total = totalResult[0]?.count || 0;
            // 1-based page numbering.
            const offset = (page - 1) * pageSize;
            // Whitelist of sortable columns; anything unrecognized falls back
            // to createdAt so callers cannot order by arbitrary columns.
            const getOrderColumn = (currentSortBy: string) => {
                switch (currentSortBy) {
                    case "createdAt":
                        return file.createdAt;
                    case "uploadedAt":
                        return file.uploadedAt;
                    case "size":
                        return file.size;
                    case "filename":
                        return file.filename;
                    default:
                        return file.createdAt;
                }
            };
            const orderColumn = getOrderColumn(sortBy);
            const orderFunc = sortOrder === "asc" ? asc : desc;
            return ResultAsync.fromPromise(
                this.db
                    .select()
                    .from(file)
                    .where(whereClause)
                    .orderBy(orderFunc(orderColumn))
                    .limit(pageSize)
                    .offset(offset),
                (error) => {
                    logDomainEvent({
                        level: "error",
                        event: "files.list.failed",
                        fctx,
                        durationMs: Date.now() - startedAt,
                        error,
                        meta: { userId },
                    });
                    return fileErrors.getFilesFailed(
                        fctx,
                        error instanceof Error ? error.message : String(error),
                    );
                },
            ).map((data) => {
                const totalPages = Math.ceil(total / pageSize);
                logDomainEvent({
                    event: "files.list.succeeded",
                    fctx,
                    durationMs: Date.now() - startedAt,
                    meta: {
                        userId,
                        page,
                        totalPages,
                        count: data.length,
                    },
                });
                // NOTE(review): the cast trusts that the row shape matches
                // the `File` schema; no runtime validation happens here.
                return {
                    data: data as File[],
                    total,
                    page,
                    pageSize,
                    totalPages,
                };
            });
        });
    }

    /**
     * Loads a single file by id, scoped to `userId`. A row owned by another
     * user is indistinguishable from a missing one (both yield NOT_FOUND).
     */
    getFileById(
        fctx: FlowExecCtx,
        fileId: string,
        userId: string,
    ): ResultAsync<File, Err> {
        const startedAt = Date.now();
        logDomainEvent({
            event: "files.get.started",
            fctx,
            meta: { fileId, userId },
        });
        return ResultAsync.fromPromise(
            this.db
                .select()
                .from(file)
                .where(and(eq(file.id, fileId), eq(file.userId, userId)))
                .limit(1),
            (error) => {
                logDomainEvent({
                    level: "error",
                    event: "files.get.failed",
                    fctx,
                    durationMs: Date.now() - startedAt,
                    error,
                    meta: { fileId, userId },
                });
                return fileErrors.getFileFailed(
                    fctx,
                    error instanceof Error ? error.message : String(error),
                );
            },
        ).andThen((result) => {
            const dbFile = result[0];
            if (!dbFile) {
                // Missing row is logged at "warn", not "error".
                logDomainEvent({
                    level: "warn",
                    event: "files.get.failed",
                    fctx,
                    durationMs: Date.now() - startedAt,
                    error: { code: "NOT_FOUND", message: "File not found" },
                    meta: { fileId, userId },
                });
                return errAsync(fileErrors.fileNotFound(fctx, fileId));
            }
            logDomainEvent({
                event: "files.get.succeeded",
                fctx,
                durationMs: Date.now() - startedAt,
                meta: { fileId, userId },
            });
            return okAsync(dbFile as File);
        });
    }

    /**
     * Inserts a new file record; `createdAt`/`updatedAt` are stamped here.
     * Returns the row as written by the database (`returning()`).
     */
    createFile(
        fctx: FlowExecCtx,
        fileData: Omit<File, "createdAt" | "updatedAt">,
    ): ResultAsync<File, Err> {
        const startedAt = Date.now();
        logDomainEvent({
            event: "files.create.started",
            fctx,
            meta: { userId: fileData.userId, filename: fileData.filename },
        });
        const now = new Date();
        // NOTE(review): the `as any` sidesteps the insert type; confirm the
        // drizzle table type accepts this shape.
        const insertData = {
            ...fileData,
            createdAt: now,
            updatedAt: now,
        } as any;
        return ResultAsync.fromPromise(
            this.db.insert(file).values(insertData).returning(),
            (error) => {
                logDomainEvent({
                    level: "error",
                    event: "files.create.failed",
                    fctx,
                    durationMs: Date.now() - startedAt,
                    error,
                    meta: { userId: fileData.userId, filename: fileData.filename },
                });
                return fileErrors.createFileFailed(
                    fctx,
                    error instanceof Error ? error.message : String(error),
                );
            },
        ).map((result) => {
            const created = result[0] as File;
            logDomainEvent({
                event: "files.create.succeeded",
                fctx,
                durationMs: Date.now() - startedAt,
                meta: { fileId: created.id, userId: created.userId },
            });
            return created;
        });
    }

    /**
     * Applies a partial update to a file the user owns and returns the
     * updated row; a zero-row update is reported as NOT_FOUND.
     */
    updateFile(
        fctx: FlowExecCtx,
        fileId: string,
        userId: string,
        updates: FileUpdateRequest,
    ): ResultAsync<File, Err> {
        const startedAt = Date.now();
        logDomainEvent({
            event: "files.update.started",
            fctx,
            meta: {
                fileId,
                userId,
                hasFilename: updates.filename !== undefined,
                hasMetadata: updates.metadata !== undefined,
                hasTags: updates.tags !== undefined,
            },
        });
        const updateData = {
            ...updates,
            updatedAt: new Date(),
        } as any;
        return ResultAsync.fromPromise(
            this.db
                .update(file)
                .set(updateData)
                .where(and(eq(file.id, fileId), eq(file.userId, userId)))
                .returning(),
            (error) => {
                logDomainEvent({
                    level: "error",
                    event: "files.update.failed",
                    fctx,
                    durationMs: Date.now() - startedAt,
                    error,
                    meta: { fileId, userId },
                });
                return fileErrors.updateFileFailed(
                    fctx,
                    error instanceof Error ? error.message : String(error),
                );
            },
        ).andThen((result) => {
            const updated = result[0];
            if (!updated) {
                logDomainEvent({
                    level: "warn",
                    event: "files.update.failed",
                    fctx,
                    durationMs: Date.now() - startedAt,
                    error: { code: "NOT_FOUND", message: "File not found" },
                    meta: { fileId, userId },
                });
                return errAsync(fileErrors.fileNotFound(fctx, fileId));
            }
            logDomainEvent({
                event: "files.update.succeeded",
                fctx,
                durationMs: Date.now() - startedAt,
                meta: { fileId, userId },
            });
            return okAsync(updated as File);
        });
    }

    /**
     * Deletes the given file rows scoped to `userId`. Resolves `true` on
     * success — note this includes the case where no rows matched (ids not
     * owned by the user are silently skipped).
     */
    deleteFiles(
        fctx: FlowExecCtx,
        fileIds: readonly string[],
        userId: string,
    ): ResultAsync<boolean, Err> {
        const startedAt = Date.now();
        logDomainEvent({
            event: "files.delete.started",
            fctx,
            meta: { userId, fileCount: fileIds.length },
        });
        return ResultAsync.fromPromise(
            this.db
                .delete(file)
                // Spread because inArray wants a mutable array.
                .where(and(eq(file.userId, userId), inArray(file.id, [...fileIds]))),
            (error) => {
                logDomainEvent({
                    level: "error",
                    event: "files.delete.failed",
                    fctx,
                    durationMs: Date.now() - startedAt,
                    error,
                    meta: { userId, fileCount: fileIds.length },
                });
                return fileErrors.deleteFilesFailed(
                    fctx,
                    error instanceof Error ? error.message : String(error),
                );
            },
        ).map(() => {
            logDomainEvent({
                event: "files.delete.succeeded",
                fctx,
                durationMs: Date.now() - startedAt,
                meta: { userId, fileCount: fileIds.length },
            });
            return true;
        });
    }

    /**
     * Sets a file's processing status (and optional error text) without any
     * ownership check — intended for internal/processing callers.
     */
    updateFileStatus(
        fctx: FlowExecCtx,
        fileId: string,
        status: string,
        processingError?: string,
    ): ResultAsync<boolean, Err> {
        const startedAt = Date.now();
        logDomainEvent({
            event: "files.status_update.started",
            fctx,
            meta: {
                fileId,
                status,
                hasProcessingError: Boolean(processingError),
            },
        });
        return ResultAsync.fromPromise(
            this.db
                .update(file)
                .set({
                    status,
                    processingError,
                    updatedAt: new Date(),
                })
                .where(eq(file.id, fileId)),
            (error) => {
                logDomainEvent({
                    level: "error",
                    event: "files.status_update.failed",
                    fctx,
                    durationMs: Date.now() - startedAt,
                    error,
                    meta: { fileId, status },
                });
                return fileErrors.updateStatusFailed(
                    fctx,
                    error instanceof Error ? error.message : String(error),
                );
            },
        ).map(() => {
            logDomainEvent({
                event: "files.status_update.succeeded",
                fctx,
                durationMs: Date.now() - startedAt,
                meta: { fileId, status },
            });
            return true;
        });
    }

    /**
     * Grants `shareRequest.userId` access to a file after verifying that
     * `ownerId` actually owns it. Unset permission flags default to false.
     * An existing grant is left untouched (`onConflictDoNothing`), and the
     * call still resolves `true` in that case.
     */
    shareFile(
        fctx: FlowExecCtx,
        fileId: string,
        ownerId: string,
        shareRequest: FileShareRequest,
    ): ResultAsync<boolean, Err> {
        const startedAt = Date.now();
        logDomainEvent({
            event: "files.share.started",
            fctx,
            meta: {
                fileId,
                ownerId,
                targetUserId: shareRequest.userId,
            },
        });
        return ResultAsync.fromPromise(
            // Ownership check: the file must exist AND belong to ownerId.
            this.db
                .select()
                .from(file)
                .where(and(eq(file.id, fileId), eq(file.userId, ownerId)))
                .limit(1),
            (error) => {
                logDomainEvent({
                    level: "error",
                    event: "files.share.failed",
                    fctx,
                    durationMs: Date.now() - startedAt,
                    error,
                    meta: { fileId, ownerId, targetUserId: shareRequest.userId },
                });
                return fileErrors.shareFileFailed(
                    fctx,
                    error instanceof Error ? error.message : String(error),
                );
            },
        ).andThen((result) => {
            const ownedFile = result[0];
            if (!ownedFile) {
                logDomainEvent({
                    level: "warn",
                    event: "files.share.failed",
                    fctx,
                    durationMs: Date.now() - startedAt,
                    error: {
                        code: "NOT_FOUND",
                        message: "File not found or not owned by user",
                    },
                    meta: { fileId, ownerId, targetUserId: shareRequest.userId },
                });
                return errAsync(fileErrors.fileNotFound(fctx, fileId));
            }
            const now = new Date();
            return ResultAsync.fromPromise(
                this.db
                    .insert(fileAccess)
                    .values({
                        fileId,
                        userId: shareRequest.userId,
                        canRead: shareRequest.permissions.canRead || false,
                        canWrite: shareRequest.permissions.canWrite || false,
                        canDelete: shareRequest.permissions.canDelete || false,
                        canShare: shareRequest.permissions.canShare || false,
                        grantedAt: now,
                        expiresAt: shareRequest.expiresAt,
                        createdAt: now,
                        updatedAt: now,
                    })
                    .onConflictDoNothing(),
                (error) => {
                    logDomainEvent({
                        level: "error",
                        event: "files.share.failed",
                        fctx,
                        durationMs: Date.now() - startedAt,
                        error,
                        meta: {
                            fileId,
                            ownerId,
                            targetUserId: shareRequest.userId,
                        },
                    });
                    return fileErrors.shareFileFailed(
                        fctx,
                        error instanceof Error ? error.message : String(error),
                    );
                },
            ).map(() => {
                logDomainEvent({
                    event: "files.share.succeeded",
                    fctx,
                    durationMs: Date.now() - startedAt,
                    meta: { fileId, ownerId, targetUserId: shareRequest.userId },
                });
                return true;
            });
        });
    }
}

View File

@@ -0,0 +1,287 @@
import { ResultAsync, errAsync, okAsync } from "neverthrow";
import { FlowExecCtx } from "@core/flow.execution.context";
import type { PresignedUploadResponse } from "./data";
import { R2StorageClient } from "@pkg/objectstorage";
import { type Err } from "@pkg/result";
import { fileErrors } from "./errors";
import { logDomainEvent } from "@pkg/logger";
/** Connection settings and upload policy for the R2 storage client. */
export type StorageConfig = {
    bucketName: string;
    region: string;
    endpoint: string;
    accessKey: string;
    secretKey: string;
    // Base URL used to build publicly reachable object URLs.
    publicUrl: string;
    // Upload policy limits enforced by the client.
    maxFileSize: number;
    allowedMimeTypes: string[];
    allowedExtensions: string[];
};
/** Per-upload options forwarded to the storage client. */
export type UploadOptions = {
    visibility: "public" | "private";
    metadata?: Record<string, any>;
    tags?: string[];
    // Post-processing toggles; their semantics live in R2StorageClient.
    processImage?: boolean;
    processDocument?: boolean;
    processVideo?: boolean;
};
/** Metadata describing a stored object, as reported by the storage service. */
export type UploadedFileMetadata = {
    id: string;
    filename: string;
    originalName: string;
    mimeType: string;
    // Size in bytes.
    size: number;
    hash: string;
    bucketName: string;
    objectKey: string;
    r2Url: string;
    visibility: string;
    userId: string;
    metadata?: Record<string, any>;
    tags?: string[];
    uploadedAt: Date;
};
/**
 * Wrapper around `R2StorageClient` that adapts its dual error channels
 * (thrown/rejected promises AND in-band `{ data, error }` responses) into
 * neverthrow `ResultAsync` values, with structured domain-event logging
 * around every operation.
 */
export class StorageRepository {
    private storageClient: R2StorageClient;
    constructor(config: StorageConfig) {
        this.storageClient = new R2StorageClient(config);
    }

    /**
     * Uploads a buffered file and returns the metadata the storage service
     * reports. Fails if the client throws, reports an in-band error, or
     * returns no file metadata.
     */
    uploadFile(
        fctx: FlowExecCtx,
        buffer: Buffer,
        filename: string,
        mimeType: string,
        userId: string,
        options: UploadOptions,
    ): ResultAsync<UploadedFileMetadata, Err> {
        const startedAt = Date.now();
        logDomainEvent({
            event: "files.storage.upload.started",
            fctx,
            meta: {
                userId,
                filename,
                mimeType,
                size: buffer.length,
                visibility: options.visibility,
            },
        });
        return ResultAsync.fromPromise(
            this.storageClient.uploadFile(
                buffer,
                filename,
                mimeType,
                userId,
                options,
            ),
            (error) => {
                // Rejected-promise path (thrown by the client).
                logDomainEvent({
                    level: "error",
                    event: "files.storage.upload.failed",
                    fctx,
                    durationMs: Date.now() - startedAt,
                    error,
                    meta: { userId, filename },
                });
                return fileErrors.uploadFailed(
                    fctx,
                    error instanceof Error ? error.message : String(error),
                );
            },
        ).andThen((uploadResult) => {
            // In-band error path: the client resolved but reported failure.
            if (uploadResult.error) {
                logDomainEvent({
                    level: "error",
                    event: "files.storage.upload.failed",
                    fctx,
                    durationMs: Date.now() - startedAt,
                    error: uploadResult.error,
                    meta: { userId, filename, stage: "storage_response" },
                });
                return errAsync(
                    fileErrors.uploadFailed(fctx, String(uploadResult.error)),
                );
            }
            const uploadData = uploadResult.data;
            if (!uploadData || !uploadData.file) {
                // Success response with no payload is treated as a failure.
                logDomainEvent({
                    level: "error",
                    event: "files.storage.upload.failed",
                    fctx,
                    durationMs: Date.now() - startedAt,
                    error: {
                        code: "NO_FILE_METADATA",
                        message: "Storage upload returned no file metadata",
                    },
                    meta: { userId, filename },
                });
                return errAsync(fileErrors.noFileMetadata(fctx));
            }
            logDomainEvent({
                event: "files.storage.upload.succeeded",
                fctx,
                durationMs: Date.now() - startedAt,
                meta: { userId, fileId: uploadData.file.id, filename },
            });
            return okAsync(uploadData.file as UploadedFileMetadata);
        });
    }

    /**
     * Requests a presigned upload URL for `objectKey`, valid for `expiresIn`
     * seconds. Same dual error handling as `uploadFile`.
     */
    generatePresignedUploadUrl(
        fctx: FlowExecCtx,
        objectKey: string,
        mimeType: string,
        expiresIn: number,
    ): ResultAsync<PresignedUploadResponse, Err> {
        const startedAt = Date.now();
        logDomainEvent({
            event: "files.storage.presigned.started",
            fctx,
            meta: { objectKey, mimeType, expiresIn },
        });
        return ResultAsync.fromPromise(
            this.storageClient.generatePresignedUploadUrl(
                objectKey,
                mimeType,
                expiresIn,
            ),
            (error) => {
                logDomainEvent({
                    level: "error",
                    event: "files.storage.presigned.failed",
                    fctx,
                    durationMs: Date.now() - startedAt,
                    error,
                    meta: { objectKey },
                });
                return fileErrors.presignedUrlFailed(
                    fctx,
                    error instanceof Error ? error.message : String(error),
                );
            },
        ).andThen((result) => {
            if (result.error) {
                logDomainEvent({
                    level: "error",
                    event: "files.storage.presigned.failed",
                    fctx,
                    durationMs: Date.now() - startedAt,
                    error: result.error,
                    meta: { objectKey, stage: "storage_response" },
                });
                return errAsync(
                    fileErrors.presignedUrlFailed(fctx, String(result.error)),
                );
            }
            const presignedData = result.data;
            if (!presignedData) {
                logDomainEvent({
                    level: "error",
                    event: "files.storage.presigned.failed",
                    fctx,
                    durationMs: Date.now() - startedAt,
                    error: {
                        code: "NO_PRESIGNED_DATA",
                        message: "No presigned data returned",
                    },
                    meta: { objectKey },
                });
                return errAsync(fileErrors.noPresignedData(fctx));
            }
            logDomainEvent({
                event: "files.storage.presigned.succeeded",
                fctx,
                durationMs: Date.now() - startedAt,
                meta: { objectKey },
            });
            // NOTE(review): cast trusts the client's payload shape matches
            // PresignedUploadResponse; no runtime validation here.
            return okAsync(presignedData as PresignedUploadResponse);
        });
    }

    /** Deletes a single object by key; resolves `true` on success. */
    deleteFile(
        fctx: FlowExecCtx,
        objectKey: string,
    ): ResultAsync<boolean, Err> {
        const startedAt = Date.now();
        logDomainEvent({
            event: "files.storage.delete.started",
            fctx,
            meta: { objectKey },
        });
        return ResultAsync.fromPromise(
            this.storageClient.deleteFile(objectKey),
            (error) => {
                logDomainEvent({
                    level: "error",
                    event: "files.storage.delete.failed",
                    fctx,
                    durationMs: Date.now() - startedAt,
                    error,
                    meta: { objectKey },
                });
                return fileErrors.storageError(
                    fctx,
                    error instanceof Error ? error.message : String(error),
                );
            },
        ).andThen((result) => {
            if (result.error) {
                logDomainEvent({
                    level: "error",
                    event: "files.storage.delete.failed",
                    fctx,
                    durationMs: Date.now() - startedAt,
                    error: result.error,
                    meta: { objectKey, stage: "storage_response" },
                });
                return errAsync(
                    fileErrors.storageError(fctx, String(result.error)),
                );
            }
            logDomainEvent({
                event: "files.storage.delete.succeeded",
                fctx,
                durationMs: Date.now() - startedAt,
                meta: { objectKey },
            });
            return okAsync(true);
        });
    }

    /**
     * Deletes many objects by delegating to `deleteFile` per key; all
     * deletions are started concurrently and the combined result fails if
     * any single deletion fails. No dedicated "delete_many.failed" event is
     * emitted — per-key failures are logged inside `deleteFile`.
     */
    deleteFiles(
        fctx: FlowExecCtx,
        objectKeys: string[],
    ): ResultAsync<boolean, Err> {
        const startedAt = Date.now();
        logDomainEvent({
            event: "files.storage.delete_many.started",
            fctx,
            meta: { fileCount: objectKeys.length },
        });
        return ResultAsync.combine(
            objectKeys.map((key) => this.deleteFile(fctx, key)),
        ).map(() => {
            logDomainEvent({
                event: "files.storage.delete_many.succeeded",
                fctx,
                durationMs: Date.now() - startedAt,
                meta: { fileCount: objectKeys.length },
            });
            return true;
        });
    }
}