'parallelized' the data posting

This commit is contained in:
bootunloader
2024-11-20 14:36:04 +02:00
parent 33b4d6c254
commit 1e75b062f0
3 changed files with 120 additions and 151 deletions

View File

@@ -19,7 +19,7 @@ export type APIRespnose<T> = {
};
export function buildMessageString(
i: number,
ptr: number,
rows: PostDataEntry[],
distributorId: number,
dealerId: number,
@@ -31,7 +31,7 @@ export function buildMessageString(
let total = 0;
let startReqId = new Date().getTime();
let x = 0;
for (let j = i; j < i + jumpSize; j++) {
for (let j = ptr; j < ptr + jumpSize; j++) {
if (j >= rows.length) {
break;
}
@@ -48,7 +48,7 @@ export function buildMessageString(
message += `${reqId},${distributorId},${dealerId},${drawId},${date},${no},${f},${s},${mTotal};`;
}
message = message.slice(0, -1);
return { message, total, jumped: i + jumpSize };
return { message, total, jumped: ptr + jumpSize };
}
export async function postDataToApi(payload: {
@@ -82,83 +82,99 @@ export async function postDataToApi(payload: {
}
try {
for (const userId in dataByUser) {
const session = payload.sessions[userId];
const usr = payload.users.find((u) => u.userId === userId);
if (!usr) {
console.log(`[!] User ${userId} not found for posting to api`);
return {
ok: false,
detail: "User not found to post data with",
errors: [{ message: "User not found for request" }] as ServerError,
};
}
const distId = usr.parentDistributor ?? 0;
const dealerId = Number(session.userId.split(":")[1]);
const drawId = Number(payload.draw.id.split(":")[1]);
const date = new Date().toISOString().split("T")[0];
let i = 0;
while (i < dataByUser[userId].length) {
let tries = 0;
while (tries < 3) {
let { message, total, jumped } = buildMessageString(
i,
dataByUser[userId],
distId,
dealerId,
drawId,
date,
);
const res = await sendBatchRequest(
session,
dealerId,
payload.draw,
total,
message,
);
const rj = (await res.json()) as APIRespnose<{
bookDtos: { bookId: string; requestId: number }[];
}>;
if (rj.code === 200 && res.status === 200) {
i = jumped;
responsesIds.push(
...rj.data.bookDtos.map((b) => ({
requestId: b.requestId as number,
bookId: b.bookId as string,
})),
);
successResponses++;
break;
}
failedResponses++;
tries++;
// Process all users concurrently
const userPromises = Object.entries(dataByUser).map(
async ([userId, userData]) => {
const session = payload.sessions[userId];
const usr = payload.users.find((u) => u.userId === userId);
if (!usr) {
throw new Error(`User ${userId} not found for posting to api`);
}
if (tries >= 3) {
console.log(
`[!] Failed to send data to api for user ${userId}, deleting all booked entries...`,
);
console.log(responsesIds);
if (responsesIds.length > 0) {
const out = await deleteAllBookedEntries({
data: responsesIds,
closeTime: payload.draw.closeTime,
const distId = usr.parentDistributor ?? 0;
const dealerId = Number(session.userId.split(":")[1]);
const drawId = Number(payload.draw.id.split(":")[1]);
const date = new Date().toISOString().split("T")[0];
let ptr = 0;
const userResponseIds = [] as { requestId: number; bookId: string }[];
while (ptr < userData.length) {
let tries = 0;
while (tries < 3) {
let { message, total, jumped } = buildMessageString(
ptr,
userData,
distId,
dealerId,
drawId,
date,
);
const res = await sendBatchRequest(
session,
});
console.log(await out.text());
dealerId,
payload.draw,
total,
message,
);
const rj = (await res.json()) as APIRespnose<{
bookDtos: { bookId: string; requestId: number }[];
}>;
if (rj.code === 200 && res.status === 200) {
ptr = jumped;
userResponseIds.push(
...rj.data.bookDtos.map((b) => ({
requestId: b.requestId as number,
bookId: b.bookId as string,
})),
);
successResponses++;
break;
}
failedResponses++;
tries++;
}
if (tries >= 3) {
if (userResponseIds.length > 0) {
const out = await deleteAllBookedEntries({
data: userResponseIds,
closeTime: payload.draw.closeTime,
dealerId,
drawId,
session,
});
console.log(await out.text());
}
throw new Error(`Failed to send data to api for user ${userId}`);
}
return {
ok: false,
detail: "Failed to post data to API halfway through",
errors: [
{ message: "Failed to post data to API halfway through" },
] as ServerError,
};
}
return userResponseIds;
},
);
// Wait for all user processes to complete
const results = await Promise.allSettled(userPromises);
// Process results
let hasErrors = false;
results.forEach((result) => {
if (result.status === "fulfilled") {
responsesIds.push(...result.value);
} else {
hasErrors = true;
console.log(`[!] Error processing user: ${result.reason}`);
}
});
if (hasErrors) {
return {
ok: false,
detail: "Failed to post data to API for some users",
errors: [
{ message: "Failed to post data to API for some users" },
] as ServerError,
};
}
console.log(`[+] Finished sending ${payload.data.length} requests`);
@@ -253,83 +269,6 @@ async function mockSendBatchRequest(
);
}
async function sendRequest(
requestId: number,
session: APISession,
body: PostDataEntry,
dealerId: number,
distributorId: number,
draw: Draw,
) {
return Fetch(`${constants.SCRAP_API_URL}/v1/book/add`, {
agent: new HttpsProxyAgent(`http://${session.ip}`),
method: "POST",
headers: {
...constants.SCRAP_API_BASE_HEADERS,
"Content-Type": "application/json;charset=UTF-8",
Authorization: session.sessionToken,
"User-Agent": getRandomUserAgent(),
},
body: JSON.stringify({
retryIndex: 0,
requestId: requestId,
date: new Date().toISOString().split("T")[0],
drawId: Number(draw.id.split(":")[1]),
closeTime: draw.closeTime,
dealerId: dealerId,
distributorId: distributorId,
number: body.number,
first: body.first,
second: body.second,
changedBalance: body.first + body.second,
}),
});
}
async function mockSendRequest(
requestId: number,
session: APISession,
body: PostDataEntry,
dealerId: number,
distributorId: number,
draw: Draw,
) {
// between 5 to 15 ms
await sleep(Math.floor(Math.random() * 10 + 5));
// // simulate a failed response, 20% of the time
if (Math.random() < 0.05) {
// return a failed response
return new Response(
JSON.stringify({
code: 500,
success: false,
message: "Failed",
data: {},
time: new Date().toISOString(),
}),
{
status: 500,
headers: { "Content-Type": "application/json" },
statusText: "Failed",
},
);
}
return new Response(
JSON.stringify({
code: 200,
success: true,
message: "Success",
data: {},
time: new Date().toISOString(),
}),
{
status: 200,
headers: { "Content-Type": "application/json" },
statusText: "OK",
},
);
}
async function deleteAllBookedEntries({
session,
data,