Use multiple connections and fix parallel workers for faster download (#1955)

This commit is contained in:
Alexander Zinchuk 2022-08-31 15:00:33 +02:00
parent a5219b69f7
commit 358d59ff53
9 changed files with 167 additions and 1070 deletions

1029
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@ -49,7 +49,7 @@
"@babel/preset-react": "^7.16.7",
"@babel/preset-typescript": "^7.16.7",
"@peculiar/webcrypto": "^1.3.3",
"@playwright/test": "^1.18.1",
"@playwright/test": "^1.23.4",
"@statoscope/cli": "^5.20.1",
"@statoscope/webpack-plugin": "^5.20.1",
"@testing-library/jest-dom": "^5.16.4",

View File

@ -40,6 +40,7 @@ const gramJsUpdateEventBuilder = { build: (update: object) => update };
let onUpdate: OnApiUpdate;
let client: TelegramClient;
let isConnected = false;
let currentUserId: string | undefined;
export async function init(_onUpdate: OnApiUpdate, initialArgs: ApiInitialArgs) {
if (DEBUG) {
@ -138,6 +139,10 @@ export async function init(_onUpdate: OnApiUpdate, initialArgs: ApiInitialArgs)
}
}
export function setIsPremium({ isPremium }: { isPremium: boolean }) {
client.setIsPremium(isPremium);
}
export async function destroy(noLogOut = false) {
if (!noLogOut) {
await invokeRequest(new GramJs.auth.LogOut());
@ -173,6 +178,14 @@ function handleGramJsUpdate(update: any) {
'@type': 'updateServerTimeOffset',
serverTimeOffset: update.timeOffset,
});
} else if (update instanceof GramJs.UpdateConfig) {
// eslint-disable-next-line no-underscore-dangle
const currentUser = (update as GramJs.UpdateConfig & { _entities?: (GramJs.TypeUser | GramJs.TypeChat)[] })
._entities
?.find((entity) => entity instanceof GramJs.User && buildApiPeerId(entity.id, 'user') === currentUserId);
if (!(currentUser instanceof GramJs.User)) return;
setIsPremium({ isPremium: Boolean(currentUser.premium) });
}
}
@ -322,6 +335,9 @@ export async function fetchCurrentUser() {
setMessageBuilderCurrentUserId(currentUser.id);
onCurrentUserUpdate(currentUser);
currentUserId = currentUser.id;
setIsPremium({ isPremium: Boolean(currentUser.isPremium) });
}
export function dispatchErrorUpdate<T extends GramJs.AnyRequest>(err: Error, request: T) {

View File

@ -38,6 +38,7 @@ addActionHandler('apiUpdate', (global, actions, update) => {
if (update.id === global.currentUserId && update.user.isPremium && !selectIsCurrentUserPremium(global)) {
actions.openPremiumModal({ isSuccess: true });
}
return updateUser(global, update.id, update.user);
}

View File

@ -152,6 +152,7 @@ class TelegramClient {
this._config = undefined;
this.phoneCodeHashes = [];
this._exportedSenderPromises = {};
this._waitingForAuthKey = {};
this._exportedSenderReleaseTimeouts = {};
this._additionalDcsDisabled = args.additionalDcsDisabled;
this._loopStarted = false;
@ -308,17 +309,20 @@ class TelegramClient {
await Promise.all(
Object.values(this._exportedSenderPromises)
.map((promise) => {
return promise && promise.then((sender) => {
if (sender) {
return sender.disconnect();
}
return undefined;
.map((promises) => {
return Object.values(promises).map((promise) => {
return promise && promise.then((sender) => {
if (sender) {
return sender.disconnect();
}
return undefined;
});
});
}),
}).flat(),
);
this._exportedSenderPromises = {};
this._waitingForAuthKey = {};
}
/**
@ -358,18 +362,35 @@ class TelegramClient {
// endregion
// export region
async _cleanupExportedSender(dcId) {
async _cleanupExportedSender(dcId, index) {
if (this.session.dcId !== dcId) {
this.session.setAuthKey(undefined, dcId);
}
const sender = await this._exportedSenderPromises[dcId];
this._exportedSenderPromises[dcId] = undefined;
const sender = await this._exportedSenderPromises[dcId][index];
delete this._exportedSenderPromises[dcId][index];
await sender.disconnect();
}
async _connectSender(sender, dcId) {
async _connectSender(sender, dcId, isPremium = false) {
// if we don't already have an auth key we want to use normal DCs not -1
const dc = utils.getDC(dcId, Boolean(sender.authKey.getKey()));
let hasAuthKey = Boolean(sender.authKey.getKey());
let firstConnectResolver;
if (!hasAuthKey) {
if (this._waitingForAuthKey[dcId]) {
await this._waitingForAuthKey[dcId];
const authKey = this.session.getAuthKey(dcId);
await sender.authKey.setKey(authKey.getKey());
hasAuthKey = Boolean(sender.authKey.getKey());
} else {
this._waitingForAuthKey[dcId] = new Promise((resolve) => {
firstConnectResolver = resolve;
});
}
}
const dc = utils.getDC(dcId, hasAuthKey);
// eslint-disable-next-line no-constant-condition
while (true) {
@ -380,6 +401,8 @@ class TelegramClient {
dcId,
this._log,
this._args.testServers,
// Premium DCs are not stable for obtaining auth keys, so need to we first connect to regular ones
hasAuthKey ? isPremium : false,
));
if (this.session.dcId !== dcId && !sender._authenticated) {
@ -395,6 +418,11 @@ class TelegramClient {
sender.dcId = dcId;
sender.userDisconnected = false;
if (firstConnectResolver) {
firstConnectResolver();
delete this._waitingForAuthKey[dcId];
}
return sender;
} catch (err) {
// eslint-disable-next-line no-console
@ -406,51 +434,57 @@ class TelegramClient {
}
}
async _borrowExportedSender(dcId, shouldReconnect, existingSender) {
async _borrowExportedSender(dcId, shouldReconnect, existingSender, index, isPremium) {
if (this._additionalDcsDisabled) {
return undefined;
}
if (!this._exportedSenderPromises[dcId] || shouldReconnect) {
this._exportedSenderPromises[dcId] = this._connectSender(
existingSender || this._createExportedSender(dcId),
const i = index || 0;
if (!this._exportedSenderPromises[dcId]) this._exportedSenderPromises[dcId] = {};
if (!this._exportedSenderPromises[dcId][i] || shouldReconnect) {
this._exportedSenderPromises[dcId][i] = this._connectSender(
existingSender || this._createExportedSender(dcId, i),
dcId,
isPremium,
);
}
let sender;
try {
sender = await this._exportedSenderPromises[dcId];
sender = await this._exportedSenderPromises[dcId][i];
if (!sender.isConnected()) {
if (sender.isConnecting) {
await Helpers.sleep(EXPORTED_SENDER_RECONNECT_TIMEOUT);
return this._borrowExportedSender(dcId, false, sender);
return this._borrowExportedSender(dcId, false, sender, i, isPremium);
} else {
return this._borrowExportedSender(dcId, true, sender);
return this._borrowExportedSender(dcId, true, sender, i, isPremium);
}
}
} catch (err) {
// eslint-disable-next-line no-console
console.error(err);
return this._borrowExportedSender(dcId, true);
return this._borrowExportedSender(dcId, true, undefined, i, isPremium);
}
if (this._exportedSenderReleaseTimeouts[dcId]) {
clearTimeout(this._exportedSenderReleaseTimeouts[dcId]);
this._exportedSenderReleaseTimeouts[dcId] = undefined;
if (!this._exportedSenderReleaseTimeouts[dcId]) this._exportedSenderReleaseTimeouts[dcId] = {};
if (this._exportedSenderReleaseTimeouts[dcId][i]) {
clearTimeout(this._exportedSenderReleaseTimeouts[dcId][i]);
this._exportedSenderReleaseTimeouts[dcId][i] = undefined;
}
this._exportedSenderReleaseTimeouts[dcId] = setTimeout(() => {
this._exportedSenderReleaseTimeouts[dcId] = undefined;
this._exportedSenderReleaseTimeouts[dcId][i] = setTimeout(() => {
this._exportedSenderReleaseTimeouts[dcId][i] = undefined;
sender.disconnect();
}, EXPORTED_SENDER_RELEASE_TIMEOUT);
return sender;
}
_createExportedSender(dcId) {
_createExportedSender(dcId, index) {
return new MTProtoSender(this.session.getAuthKey(dcId), {
logger: this._log,
dcId,
@ -460,12 +494,14 @@ class TelegramClient {
connectTimeout: this._timeout,
authKeyCallback: this._authKeyCallback.bind(this),
isMainSender: dcId === this.session.dcId,
onConnectionBreak: this._cleanupExportedSender.bind(this),
onConnectionBreak: () => this._cleanupExportedSender(dcId, index),
});
}
getSender(dcId) {
return dcId ? this._borrowExportedSender(dcId) : Promise.resolve(this._sender);
getSender(dcId, index, isPremium) {
return dcId
? this._borrowExportedSender(dcId, undefined, undefined, index, isPremium)
: Promise.resolve(this._sender);
}
// end region
@ -632,7 +668,7 @@ class TelegramClient {
}),
{
dcId: photo.dcId,
fileSize: size.size,
fileSize: size.size || Math.max(...(size.sizes || [])),
progressCallback: args.progressCallback,
},
);
@ -832,6 +868,10 @@ class TelegramClient {
throw new Error(`Request was unsuccessful ${attempt} time(s)`);
}
setIsPremium(isPremium) {
this.isPremium = isPremium;
}
async getMe() {
try {
return (await this.invoke(new requests.users

View File

@ -42,29 +42,31 @@ const SENDER_TIMEOUT = 60 * 1000;
const SENDER_RETRIES = 5;
class Foreman {
private deferred: Deferred | undefined;
private deferreds: Deferred[] = [];
private activeWorkers = 0;
activeWorkers = 0;
constructor(private maxWorkers: number) {
}
requestWorker() {
this.activeWorkers++;
if (this.activeWorkers > this.maxWorkers) {
this.deferred = createDeferred();
return this.deferred!.promise;
if (this.activeWorkers === this.maxWorkers) {
const deferred = createDeferred();
this.deferreds.push(deferred);
return deferred.promise;
} else {
this.activeWorkers++;
}
return Promise.resolve();
}
releaseWorker() {
this.activeWorkers--;
if (this.deferred && (this.activeWorkers <= this.maxWorkers)) {
this.deferred.resolve();
if (this.deferreds.length && (this.activeWorkers === this.maxWorkers)) {
const deferred = this.deferreds.shift()!;
deferred.resolve();
} else {
this.activeWorkers--;
}
}
}
@ -142,6 +144,14 @@ export async function downloadFile(
return undefined;
}
const MAX_CONCURRENT_CONNECTIONS = 3;
const MAX_CONCURRENT_CONNECTIONS_PREMIUM = 6;
const MAX_WORKERS_PER_CONNECTION = 10;
const MULTIPLE_CONNECTIONS_MIN_FILE_SIZE = 10485760; // 10MB
const foremans = Array(MAX_CONCURRENT_CONNECTIONS_PREMIUM).fill(undefined)
.map(() => new Foreman(MAX_WORKERS_PER_CONNECTION));
async function downloadFile2(
client: TelegramClient,
inputLocation: Api.InputFileLocation,
@ -151,8 +161,9 @@ async function downloadFile2(
partSizeKb, end,
} = fileParams;
const {
fileSize, workers = 1,
fileSize,
} = fileParams;
const isPremium = Boolean(client.isPremium);
const { dcId, progressCallback, start = 0 } = fileParams;
end = end && end < fileSize ? end : fileSize - 1;
@ -163,6 +174,11 @@ async function downloadFile2(
const partSize = partSizeKb * 1024;
const partsCount = end ? Math.ceil((end - start) / partSize) : 1;
const noParallel = !end;
const shouldUseMultipleConnections = fileSize
&& fileSize >= MULTIPLE_CONNECTIONS_MIN_FILE_SIZE
&& !noParallel;
let deferred: Deferred | undefined;
if (partSize % MIN_CHUNK_SIZE !== 0) {
throw new Error(`The part size must be evenly divisible by ${MIN_CHUNK_SIZE}`);
@ -170,10 +186,14 @@ async function downloadFile2(
client._log.info(`Downloading file in chunks of ${partSize} bytes`);
const foreman = new Foreman(workers);
const fileView = new FileView(end - start + 1);
const promises: Promise<any>[] = [];
let offset = start;
// Pick the least busy foreman
// For some reason, fresh connections give out a higher speed for the first couple of seconds
// I have no idea why, but this may speed up the download of small files
const activeCounts = foremans.map((l) => l.activeWorkers);
let currentForemanIndex = activeCounts.indexOf(Math.min(...activeCounts));
// Used for files with unknown size and for manual cancellations
let hasEnded = false;
@ -182,9 +202,6 @@ async function downloadFile2(
progressCallback(progress);
}
// Preload sender
await client.getSender(dcId);
// Allocate memory
await fileView.init();
@ -198,10 +215,20 @@ async function downloadFile2(
isPrecise = true;
}
await foreman.requestWorker();
// Use only first connection for avatars, because no size is known and we don't want to
// download empty parts using all connections at once
const senderIndex = !shouldUseMultipleConnections ? 0 : currentForemanIndex % (
isPremium ? MAX_CONCURRENT_CONNECTIONS_PREMIUM : MAX_CONCURRENT_CONNECTIONS
);
await foremans[senderIndex].requestWorker();
if (deferred) await deferred.promise;
if (noParallel) deferred = createDeferred();
if (hasEnded) {
foreman.releaseWorker();
foremans[senderIndex].releaseWorker();
break;
}
@ -211,7 +238,7 @@ async function downloadFile2(
while (true) {
let sender;
try {
sender = await client.getSender(dcId);
sender = await client.getSender(dcId, senderIndex, isPremium);
// sometimes a session is revoked and will cause this to hang.
const result = await Promise.race([
sender.send(new Api.upload.GetFile({
@ -243,7 +270,8 @@ async function downloadFile2(
hasEnded = true;
}
foreman.releaseWorker();
foremans[senderIndex].releaseWorker();
if (deferred) deferred.resolve();
fileView.write(result.bytes, offsetMemo - start);
@ -257,7 +285,8 @@ async function downloadFile2(
continue;
}
foreman.releaseWorker();
foremans[senderIndex].releaseWorker();
if (deferred) deferred.resolve();
hasEnded = true;
throw err;
@ -266,6 +295,7 @@ async function downloadFile2(
})(offset));
offset += limit;
currentForemanIndex++;
if (end && (offset > end)) {
break;

View File

@ -65,21 +65,21 @@ class PromisedWebSockets {
return toReturn;
}
getWebSocketLink(ip, port, testServers) {
getWebSocketLink(ip, port, testServers, isPremium) {
if (port === 443) {
return `wss://${ip}:${port}/apiws${testServers ? '_test' : ''}`;
return `wss://${ip}:${port}/apiws${testServers ? '_test' : ''}${isPremium ? '_premium' : ''}`;
} else {
return `ws://${ip}:${port}/apiws${testServers ? '_test' : ''}`;
return `ws://${ip}:${port}/apiws${testServers ? '_test' : ''}${isPremium ? '_premium' : ''}`;
}
}
connect(port, ip, testServers = false) {
connect(port, ip, testServers = false, isPremium = false) {
this.stream = Buffer.alloc(0);
this.canRead = new Promise((resolve) => {
this.resolveRead = resolve;
});
this.closed = false;
this.website = this.getWebSocketLink(ip, port, testServers);
this.website = this.getWebSocketLink(ip, port, testServers, isPremium);
this.client = new WebSocketClient(this.website, 'binary');
return new Promise((resolve, reject) => {
this.client.onopen = () => {

View File

@ -15,12 +15,13 @@ const AsyncQueue = require('../../extensions/AsyncQueue');
class Connection {
PacketCodecClass = undefined;
constructor(ip, port, dcId, loggers, testServers) {
constructor(ip, port, dcId, loggers, testServers, isPremium) {
this._ip = ip;
this._port = port;
this._dcId = dcId;
this._log = loggers;
this._testServers = testServers;
this._isPremium = isPremium;
this._connected = false;
this._sendTask = undefined;
this._recvTask = undefined;
@ -40,7 +41,7 @@ class Connection {
async _connect() {
this._log.debug('Connecting');
this._codec = new this.PacketCodecClass(this);
await this.socket.connect(this._port, this._ip, this._testServers);
await this.socket.connect(this._port, this._ip, this._testServers, this._isPremium);
this._log.debug('Finished connecting');
// await this.socket.connect({host: this._ip, port: this._port});
await this._initConn();

View File

@ -4,7 +4,7 @@ import { requestPart } from './progressive';
const DOWNLOAD_PART_SIZE = 1024 * 1024;
const TEST_PART_SIZE = 64 * 1024;
const QUEUE_SIZE = 5;
const QUEUE_SIZE = 8;
class FilePartQueue<T> {
queue: Promise<T>[];