GramJs: Use multiple connections for faster upload (#2154)

This commit is contained in:
Alexander Zinchuk 2022-12-06 13:29:38 +01:00
parent 378f35da9f
commit d216688757
3 changed files with 108 additions and 92 deletions

View File

@ -5,6 +5,7 @@ import { sleep } from '../Helpers';
import { getDownloadPartSize } from '../Utils';
import errors from '../errors';
import Deferred from '../../../util/Deferred';
import { Foreman } from '../../../util/foreman';
interface OnProgress {
isCanceled?: boolean;
@ -37,36 +38,6 @@ const SENDER_TIMEOUT = 60 * 1000;
// Telegram may have server issues so we try several times
const SENDER_RETRIES = 5;
class Foreman {
private deferreds: Deferred[] = [];
activeWorkers = 0;
constructor(private maxWorkers: number) {
}
requestWorker() {
if (this.activeWorkers === this.maxWorkers) {
const deferred = new Deferred();
this.deferreds.push(deferred);
return deferred.promise;
} else {
this.activeWorkers++;
}
return Promise.resolve();
}
releaseWorker() {
if (this.deferreds.length && (this.activeWorkers === this.maxWorkers)) {
const deferred = this.deferreds.shift()!;
deferred.resolve();
} else {
this.activeWorkers--;
}
}
}
class FileView {
private type: 'memory' | 'opfs';

View File

@ -5,6 +5,7 @@ import type TelegramClient from './TelegramClient';
import { generateRandomBytes, readBigIntFromBuffer, sleep } from '../Helpers';
import { getUploadPartSize } from '../Utils';
import errors from '../errors';
import { Foreman } from '../../../util/foreman';
interface OnProgress {
isCanceled?: boolean;
@ -22,13 +23,20 @@ export interface UploadFileParams {
const KB_TO_BYTES = 1024;
const LARGE_FILE_THRESHOLD = 10 * 1024 * 1024;
const DISCONNECT_SLEEP = 1000;
const MAX_CONCURRENT_CONNECTIONS = 3;
const MAX_CONCURRENT_CONNECTIONS_PREMIUM = 6;
const MAX_WORKERS_PER_CONNECTION = 10;
const foremans = Array(MAX_CONCURRENT_CONNECTIONS_PREMIUM).fill(undefined)
.map(() => new Foreman(MAX_WORKERS_PER_CONNECTION));
export async function uploadFile(
client: TelegramClient,
fileParams: UploadFileParams,
): Promise<Api.InputFile | Api.InputFileBig> {
const { file, onProgress } = fileParams;
let { workers } = fileParams;
const isPremium = Boolean(client.isPremium);
const { name, size } = file;
const fileId = readBigIntFromBuffer(generateRandomBytes(8), true, true);
@ -37,81 +45,87 @@ export async function uploadFile(
const partSize = getUploadPartSize(size) * KB_TO_BYTES;
const partCount = Math.floor((size + partSize - 1) / partSize);
// Make sure a new sender can be created before starting upload
await client.getSender(client.session.dcId);
if (!workers || !size) {
workers = 1;
}
if (workers >= partCount) {
workers = partCount;
}
// Pick the least busy foreman
// For some reason, fresh connections give out a higher speed for the first couple of seconds
// I have no idea why, but this may speed up the download of small files
const activeCounts = foremans.map((l) => l.activeWorkers);
let currentForemanIndex = activeCounts.indexOf(Math.min(...activeCounts));
let progress = 0;
if (onProgress) {
onProgress(progress);
}
for (let i = 0; i < partCount; i += workers) {
const sendingParts = [];
let end = i + workers;
if (end > partCount) {
end = partCount;
const promises: Promise<any>[] = [];
for (let i = 0; i < partCount; i++) {
const senderIndex = currentForemanIndex % (
isPremium ? MAX_CONCURRENT_CONNECTIONS_PREMIUM : MAX_CONCURRENT_CONNECTIONS
);
await foremans[senderIndex].requestWorker();
if (onProgress?.isCanceled) {
foremans[senderIndex].releaseWorker();
break;
}
for (let j = i; j < end; j++) {
const blobSlice = file.slice(j * partSize, (j + 1) * partSize);
// eslint-disable-next-line no-loop-func, @typescript-eslint/no-loop-func
sendingParts.push((async (jMemo: number, blobSliceMemo: Blob) => {
// eslint-disable-next-line no-constant-condition
while (true) {
let sender;
try {
// We always upload from the DC we are in
sender = await client.getSender(client.session.dcId);
const partBytes = await blobSliceMemo.arrayBuffer();
await sender.send(
isLarge
? new Api.upload.SaveBigFilePart({
fileId,
filePart: jMemo,
fileTotalParts: partCount,
bytes: Buffer.from(partBytes),
})
: new Api.upload.SaveFilePart({
fileId,
filePart: jMemo,
bytes: Buffer.from(partBytes),
}),
);
} catch (err) {
if (sender && !sender.isConnected()) {
await sleep(DISCONNECT_SLEEP);
continue;
} else if (err instanceof errors.FloodWaitError) {
await sleep(err.seconds * 1000);
continue;
}
throw err;
const blobSlice = file.slice(i * partSize, (i + 1) * partSize);
// eslint-disable-next-line no-loop-func, @typescript-eslint/no-loop-func
promises.push((async (jMemo: number, blobSliceMemo: Blob) => {
// eslint-disable-next-line no-constant-condition
while (true) {
let sender;
try {
// We always upload from the DC we are in
sender = await client.getSender(client.session.dcId, senderIndex, isPremium);
const partBytes = await blobSliceMemo.arrayBuffer();
await sender.send(
isLarge
? new Api.upload.SaveBigFilePart({
fileId,
filePart: jMemo,
fileTotalParts: partCount,
bytes: Buffer.from(partBytes),
})
: new Api.upload.SaveFilePart({
fileId,
filePart: jMemo,
bytes: Buffer.from(partBytes),
}),
);
} catch (err) {
if (sender && !sender.isConnected()) {
await sleep(DISCONNECT_SLEEP);
continue;
} else if (err instanceof errors.FloodWaitError) {
await sleep(err.seconds * 1000);
continue;
}
foremans[senderIndex].releaseWorker();
if (onProgress) {
if (onProgress.isCanceled) {
throw new Error('USER_CANCELED');
}
progress += (1 / partCount);
onProgress(progress);
}
break;
throw err;
}
})(j, blobSlice));
}
await Promise.all(sendingParts);
foremans[senderIndex].releaseWorker();
if (onProgress) {
if (onProgress.isCanceled) {
throw new Error('USER_CANCELED');
}
progress += (1 / partCount);
onProgress(progress);
}
break;
}
})(i, blobSlice));
currentForemanIndex++;
}
await Promise.all(promises);
return isLarge
? new Api.InputFileBig({
id: fileId,

31
src/util/foreman.ts Normal file
View File

@ -0,0 +1,31 @@
import Deferred from './Deferred';
export class Foreman {
private deferreds: Deferred[] = [];
activeWorkers = 0;
constructor(private maxWorkers: number) {
}
requestWorker() {
if (this.activeWorkers === this.maxWorkers) {
const deferred = new Deferred();
this.deferreds.push(deferred);
return deferred.promise;
} else {
this.activeWorkers++;
}
return Promise.resolve();
}
releaseWorker() {
if (this.deferreds.length && (this.activeWorkers === this.maxWorkers)) {
const deferred = this.deferreds.shift()!;
deferred.resolve();
} else {
this.activeWorkers--;
}
}
}