diff --git a/src/lib/gramjs/client/downloadFile.ts b/src/lib/gramjs/client/downloadFile.ts index 150b0376b..d6e31cb74 100644 --- a/src/lib/gramjs/client/downloadFile.ts +++ b/src/lib/gramjs/client/downloadFile.ts @@ -5,6 +5,7 @@ import { sleep } from '../Helpers'; import { getDownloadPartSize } from '../Utils'; import errors from '../errors'; import Deferred from '../../../util/Deferred'; +import { Foreman } from '../../../util/foreman'; interface OnProgress { isCanceled?: boolean; @@ -37,36 +38,6 @@ const SENDER_TIMEOUT = 60 * 1000; // Telegram may have server issues so we try several times const SENDER_RETRIES = 5; -class Foreman { - private deferreds: Deferred[] = []; - - activeWorkers = 0; - - constructor(private maxWorkers: number) { - } - - requestWorker() { - if (this.activeWorkers === this.maxWorkers) { - const deferred = new Deferred(); - this.deferreds.push(deferred); - return deferred.promise; - } else { - this.activeWorkers++; - } - - return Promise.resolve(); - } - - releaseWorker() { - if (this.deferreds.length && (this.activeWorkers === this.maxWorkers)) { - const deferred = this.deferreds.shift()!; - deferred.resolve(); - } else { - this.activeWorkers--; - } - } -} - class FileView { private type: 'memory' | 'opfs'; diff --git a/src/lib/gramjs/client/uploadFile.ts b/src/lib/gramjs/client/uploadFile.ts index a673ef7c5..f140df7a3 100644 --- a/src/lib/gramjs/client/uploadFile.ts +++ b/src/lib/gramjs/client/uploadFile.ts @@ -5,6 +5,7 @@ import type TelegramClient from './TelegramClient'; import { generateRandomBytes, readBigIntFromBuffer, sleep } from '../Helpers'; import { getUploadPartSize } from '../Utils'; import errors from '../errors'; +import { Foreman } from '../../../util/foreman'; interface OnProgress { isCanceled?: boolean; @@ -22,13 +23,20 @@ export interface UploadFileParams { const KB_TO_BYTES = 1024; const LARGE_FILE_THRESHOLD = 10 * 1024 * 1024; const DISCONNECT_SLEEP = 1000; +const MAX_CONCURRENT_CONNECTIONS = 3; +const MAX_CONCURRENT_CONNECTIONS_PREMIUM = 6; +const MAX_WORKERS_PER_CONNECTION = 10; + +const foremans = Array(MAX_CONCURRENT_CONNECTIONS_PREMIUM).fill(undefined) + .map(() => new Foreman(MAX_WORKERS_PER_CONNECTION)); export async function uploadFile( client: TelegramClient, fileParams: UploadFileParams, ): Promise { const { file, onProgress } = fileParams; - let { workers } = fileParams; + + const isPremium = Boolean(client.isPremium); const { name, size } = file; const fileId = readBigIntFromBuffer(generateRandomBytes(8), true, true); @@ -37,81 +45,87 @@ export async function uploadFile( const partSize = getUploadPartSize(size) * KB_TO_BYTES; const partCount = Math.floor((size + partSize - 1) / partSize); - // Make sure a new sender can be created before starting upload - await client.getSender(client.session.dcId); - - if (!workers || !size) { - workers = 1; - } - if (workers >= partCount) { - workers = partCount; - } + // Pick the least busy foreman + // For some reason, fresh connections give out a higher speed for the first couple of seconds + // I have no idea why, but this may speed up the download of small files + const activeCounts = foremans.map((l) => l.activeWorkers); + let currentForemanIndex = activeCounts.indexOf(Math.min(...activeCounts)); let progress = 0; if (onProgress) { onProgress(progress); } - for (let i = 0; i < partCount; i += workers) { - const sendingParts = []; - let end = i + workers; - if (end > partCount) { - end = partCount; + const promises: Promise[] = []; + + for (let i = 0; i < partCount; i++) { + const senderIndex = currentForemanIndex % ( + isPremium ? MAX_CONCURRENT_CONNECTIONS_PREMIUM : MAX_CONCURRENT_CONNECTIONS + ); + + await foremans[senderIndex].requestWorker(); + + if (onProgress?.isCanceled) { + foremans[senderIndex].releaseWorker(); + break; } - for (let j = i; j < end; j++) { - const blobSlice = file.slice(j * partSize, (j + 1) * partSize); - - // eslint-disable-next-line no-loop-func, @typescript-eslint/no-loop-func - sendingParts.push((async (jMemo: number, blobSliceMemo: Blob) => { - // eslint-disable-next-line no-constant-condition - while (true) { - let sender; - try { - // We always upload from the DC we are in - sender = await client.getSender(client.session.dcId); - const partBytes = await blobSliceMemo.arrayBuffer(); - await sender.send( - isLarge - ? new Api.upload.SaveBigFilePart({ - fileId, - filePart: jMemo, - fileTotalParts: partCount, - bytes: Buffer.from(partBytes), - }) - : new Api.upload.SaveFilePart({ - fileId, - filePart: jMemo, - bytes: Buffer.from(partBytes), - }), - ); - } catch (err) { - if (sender && !sender.isConnected()) { - await sleep(DISCONNECT_SLEEP); - continue; - } else if (err instanceof errors.FloodWaitError) { - await sleep(err.seconds * 1000); - continue; - } - throw err; + const blobSlice = file.slice(i * partSize, (i + 1) * partSize); + // eslint-disable-next-line no-loop-func, @typescript-eslint/no-loop-func + promises.push((async (jMemo: number, blobSliceMemo: Blob) => { + // eslint-disable-next-line no-constant-condition + while (true) { + let sender; + try { + // We always upload from the DC we are in + sender = await client.getSender(client.session.dcId, senderIndex, isPremium); + const partBytes = await blobSliceMemo.arrayBuffer(); + await sender.send( + isLarge + ? new Api.upload.SaveBigFilePart({ + fileId, + filePart: jMemo, + fileTotalParts: partCount, + bytes: Buffer.from(partBytes), + }) + : new Api.upload.SaveFilePart({ + fileId, + filePart: jMemo, + bytes: Buffer.from(partBytes), + }), + ); + } catch (err) { + if (sender && !sender.isConnected()) { + await sleep(DISCONNECT_SLEEP); + continue; + } else if (err instanceof errors.FloodWaitError) { + await sleep(err.seconds * 1000); + continue; } + foremans[senderIndex].releaseWorker(); - if (onProgress) { - if (onProgress.isCanceled) { - throw new Error('USER_CANCELED'); - } - - progress += (1 / partCount); - onProgress(progress); - } - break; + throw err; } - })(j, blobSlice)); - } - await Promise.all(sendingParts); + foremans[senderIndex].releaseWorker(); + + if (onProgress) { + if (onProgress.isCanceled) { + throw new Error('USER_CANCELED'); + } + + progress += (1 / partCount); + onProgress(progress); + } + break; + } + })(i, blobSlice)); + + currentForemanIndex++; } + await Promise.all(promises); + return isLarge ? new Api.InputFileBig({ id: fileId, diff --git a/src/util/foreman.ts b/src/util/foreman.ts new file mode 100644 index 000000000..98ef75111 --- /dev/null +++ b/src/util/foreman.ts @@ -0,0 +1,31 @@ +import Deferred from './Deferred'; + +export class Foreman { + private deferreds: Deferred[] = []; + + activeWorkers = 0; + + constructor(private maxWorkers: number) { + } + + requestWorker() { + if (this.activeWorkers === this.maxWorkers) { + const deferred = new Deferred(); + this.deferreds.push(deferred); + return deferred.promise; + } else { + this.activeWorkers++; + } + + return Promise.resolve(); + } + + releaseWorker() { + if (this.deferreds.length && (this.activeWorkers === this.maxWorkers)) { + const deferred = this.deferreds.shift()!; + deferred.resolve(); + } else { + this.activeWorkers--; + } + } +}