From 0e59ff9bd70f4cae90e3fd0444b52004fdf9313d Mon Sep 17 00:00:00 2001 From: Alexander Zinchuk Date: Sat, 24 Jul 2021 02:17:36 +0300 Subject: [PATCH] GramJs: Support reconnects when transferring files, avoid main loop race condition (#1319) --- src/lib/gramjs/Utils.js | 2 +- src/lib/gramjs/client/TelegramClient.js | 201 +++++++++++------- src/lib/gramjs/client/downloadFile.ts | 88 ++++---- src/lib/gramjs/client/uploadFile.ts | 83 ++++---- src/lib/gramjs/extensions/Logger.js | 10 + .../gramjs/extensions/PromisedWebSockets.js | 2 + src/lib/gramjs/network/MTProtoSender.js | 4 + 7 files changed, 233 insertions(+), 157 deletions(-) diff --git a/src/lib/gramjs/Utils.js b/src/lib/gramjs/Utils.js index 82d7caada..3cb93892a 100644 --- a/src/lib/gramjs/Utils.js +++ b/src/lib/gramjs/Utils.js @@ -332,7 +332,7 @@ function getAppropriatedPartSize(fileSize) { if (fileSize <= 786432000) { // 750MB return 256; } - if (fileSize <= 1572864000) { // 1500MB + if (fileSize <= 2097152000) { // 2000MB return 512; } diff --git a/src/lib/gramjs/client/TelegramClient.js b/src/lib/gramjs/client/TelegramClient.js index e7b5c1a0e..af1f1d8bc 100644 --- a/src/lib/gramjs/client/TelegramClient.js +++ b/src/lib/gramjs/client/TelegramClient.js @@ -25,7 +25,7 @@ const DEFAULT_DC_ID = 2; const WEBDOCUMENT_DC_ID = 4; const DEFAULT_IPV4_IP = 'zws4.web.telegram.org'; const DEFAULT_IPV6_IP = '[2001:67c:4e8:f002::a]'; -const BORROWED_SENDER_RELEASE_TIMEOUT = 30000; // 30 sec +const EXPORTED_SENDER_RELEASE_TIMEOUT = 30000; // 30 sec const WEBDOCUMENT_REQUEST_PART_SIZE = 131072; // 128kb const PING_INTERVAL = 3000; // 3 sec @@ -139,9 +139,12 @@ class TelegramClient { // These will be set later this._config = undefined; this.phoneCodeHashes = []; - this._borrowedSenderPromises = {}; - this._borrowedSenderReleaseTimeouts = {}; + this._exportedSenderPromises = {}; + this._exportedSenderReleaseTimeouts = {}; this._additionalDcsDisabled = args.additionalDcsDisabled; + this._loopStarted = false; + this._reconnecting = false; + this._destroyed = false; } @@ -156,30 +159,50 @@ class TelegramClient { async connect() { await this._initSession(); - this._sender = new MTProtoSender(this.session.getAuthKey(), { - logger: this._log, - dcId: this.session.dcId, - retries: this._connectionRetries, - delay: this._retryDelay, - autoReconnect: this._autoReconnect, - connectTimeout: this._timeout, - authKeyCallback: this._authKeyCallback.bind(this), - updateCallback: this._handleUpdate.bind(this), - isMainSender: true, - }); + if (this._sender === undefined) { + // only init sender once to avoid multiple loops. + this._sender = new MTProtoSender(this.session.getAuthKey(), { + logger: this._log, + dcId: this.session.dcId, + retries: this._connectionRetries, + delay: this._retryDelay, + autoReconnect: this._autoReconnect, + connectTimeout: this._timeout, + authKeyCallback: this._authKeyCallback.bind(this), + updateCallback: this._handleUpdate.bind(this), + isMainSender: true, + }); + } + // set defaults vars + this._sender.userDisconnected = true; + this._sender._user_connected = false; + this._sender._reconnecting = false; + this._sender._disconnected = true; const connection = new this._connection( this.session.serverAddress, this.session.port, this.session.dcId, this._log, ); - await this._sender.connect(connection); + const newConnection = await this._sender.connect(connection); + if (!newConnection) { + // we're already connected so no need to reset auth key. + if (!this._loopStarted) { + this._updateLoop(); + this._loopStarted = true; + } + return; + } this.session.setAuthKey(this._sender.authKey); await this._sender.send(this._initWith( new requests.help.GetConfig({}), )); - this._updateLoop(); + if (!this._loopStarted) { + this._updateLoop(); + this._loopStarted = true; + } + this._reconnecting = false; } async _initSession() { @@ -192,8 +215,11 @@ class TelegramClient { } async _updateLoop() { - while (this.isConnected()) { + while (!this._destroyed) { await Helpers.sleep(PING_INTERVAL); + if (this._reconnecting) { + continue; + } try { await attempts(() => { @@ -205,11 +231,12 @@ class TelegramClient { } catch (err) { // eslint-disable-next-line no-console console.warn(err); + if (this._reconnecting) { + continue; + } await this.disconnect(); - this.connect(); - - return; + await this.connect(); } // We need to send some content-related request at least hourly @@ -225,6 +252,7 @@ class TelegramClient { } } } + await this.disconnect(); } /** @@ -237,7 +265,7 @@ class TelegramClient { } await Promise.all( - Object.values(this._borrowedSenderPromises) + Object.values(this._exportedSenderPromises) .map((promise) => { return promise && promise.then((sender) => { if (sender) { @@ -248,7 +276,7 @@ class TelegramClient { }), ); - this._borrowedSenderPromises = {}; + this._exportedSenderPromises = {}; } /** @@ -256,6 +284,8 @@ class TelegramClient { * @returns {Promise} */ async destroy() { + this._destroyed = true; + try { await this.disconnect(); } catch (err) { @@ -274,6 +304,7 @@ class TelegramClient { // so it's not valid anymore. Set to None to force recreating it. await this._sender.authKey.setKey(undefined); this.session.setAuthKey(undefined); + this._reconnecting = true; await this.disconnect(); return this.connect(); } @@ -285,58 +316,14 @@ class TelegramClient { // endregion // export region - _cleanupBorrowedSender(dcId) { - this._borrowedSenderPromises[dcId] = undefined; + _cleanupExportedSender(dcId) { + this._exportedSenderPromises[dcId] = undefined; } - _borrowExportedSender(dcId) { - if (this._additionalDcsDisabled) { - return undefined; - } - - if (!this._borrowedSenderPromises[dcId]) { - this._borrowedSenderPromises[dcId] = this._createExportedSender(dcId); - } - - return this._borrowedSenderPromises[dcId].then((sender) => { - if (!sender) { - this._borrowedSenderPromises[dcId] = undefined; - return this._borrowExportedSender(dcId); - } - - if (this._borrowedSenderReleaseTimeouts[dcId]) { - clearTimeout(this._borrowedSenderReleaseTimeouts[dcId]); - this._borrowedSenderReleaseTimeouts[dcId] = undefined; - } - - this._borrowedSenderReleaseTimeouts[dcId] = setTimeout(() => { - this._borrowedSenderReleaseTimeouts[dcId] = undefined; - this._borrowedSenderPromises[dcId] = undefined; - - // eslint-disable-next-line no-console - console.warn(`Disconnecting from file socket #${dcId}...`); - sender.disconnect(); - }, BORROWED_SENDER_RELEASE_TIMEOUT); - - return sender; - }); - } - - async _createExportedSender(dcId) { + async _connectSender(sender, dcId) { const dc = utils.getDC(dcId); - const sender = new MTProtoSender(this.session.getAuthKey(dcId), - { - logger: this._log, - dcId, - retries: this._connectionRetries, - delay: this._retryDelay, - autoReconnect: this._autoReconnect, - connectTimeout: this._timeout, - authKeyCallback: this._authKeyCallback.bind(this), - isMainSender: dcId === this.session.dcId, - onConnectionBreak: this._cleanupBorrowedSender.bind(this), - }); - for (let i = 0; i < 5; i++) { + + while (true) { try { await sender.connect(new this._connection( dc.ipAddress, @@ -344,7 +331,8 @@ class TelegramClient { dcId, this._log, )); - if (this.session.dcId !== dcId) { + + if (this.session.dcId !== dcId && !sender._authenticated) { this._log.info(`Exporting authorization for data center ${dc.ipAddress}`); const auth = await this.invoke(new requests.auth.ExportAuthorization({ dcId })); const req = this._initWith(new requests.auth.ImportAuthorization({ @@ -352,14 +340,78 @@ class TelegramClient { bytes: auth.bytes, })); await sender.send(req); + sender._authenticated = true; } sender.dcId = dcId; + return sender; - } catch (e) { + } catch (err) { + // eslint-disable-next-line no-console + console.error(err); + + await Helpers.sleep(1000); await sender.disconnect(); } } - return undefined; + } + + async _borrowExportedSender(dcId, shouldReconnect, existingSender) { + if (this._additionalDcsDisabled) { + return undefined; + } + + if (!this._exportedSenderPromises[dcId] || shouldReconnect) { + this._exportedSenderPromises[dcId] = this._connectSender( + existingSender || this._createExportedSender(dcId), + dcId, + ); + } + + let sender; + try { + sender = await this._exportedSenderPromises[dcId]; + if (!sender.isConnected()) { + return this._borrowExportedSender(dcId, true, sender); + } + } catch (err) { + // eslint-disable-next-line no-console + console.error(err); + + return this._borrowExportedSender(dcId, true); + } + + if (this._exportedSenderReleaseTimeouts[dcId]) { + clearTimeout(this._exportedSenderReleaseTimeouts[dcId]); + this._exportedSenderReleaseTimeouts[dcId] = undefined; + } + + this._exportedSenderReleaseTimeouts[dcId] = setTimeout(() => { + // eslint-disable-next-line no-console + console.warn(`Disconnecting from file socket #${dcId}...`); + + this._exportedSenderReleaseTimeouts[dcId] = undefined; + sender.disconnect(); + }, EXPORTED_SENDER_RELEASE_TIMEOUT); + + return sender; + } + + _createExportedSender(dcId) { + return new MTProtoSender(this.session.getAuthKey(dcId), { + logger: this._log, + dcId, + retries: this._connectionRetries, + delay: this._retryDelay, + autoReconnect: this._autoReconnect, + connectTimeout: this._timeout, + authKeyCallback: this._authKeyCallback.bind(this), + isMainSender: dcId === this.session.dcId, + onConnectionBreak: this._cleanupExportedSender.bind(this), + }); + } + + getSender(dcId) { + return dcId ? this._borrowExportedSender(dcId) : Promise.resolve(this._sender); } // end region @@ -610,6 +662,7 @@ class TelegramClient { } } } + // region Invoking Telegram request /** * Invokes a MTProtoRequest (sends and receives it) and returns its result diff --git a/src/lib/gramjs/client/downloadFile.ts b/src/lib/gramjs/client/downloadFile.ts index 721cd2391..5d61e8328 100644 --- a/src/lib/gramjs/client/downloadFile.ts +++ b/src/lib/gramjs/client/downloadFile.ts @@ -3,6 +3,7 @@ import { default as Api } from '../tl/api'; import TelegramClient from './TelegramClient'; import { getAppropriatedPartSize } from '../Utils'; import { sleep, createDeferred } from '../Helpers'; +import errors from '../errors'; export interface progressCallback { isCanceled?: boolean; @@ -33,7 +34,7 @@ interface Deferred { const MIN_CHUNK_SIZE = 4096; const DEFAULT_CHUNK_SIZE = 64; // kb const ONE_MB = 1024 * 1024; -const REQUEST_TIMEOUT = 15000; +const DISCONNECT_SLEEP = 1000; class Foreman { @@ -90,24 +91,6 @@ export async function downloadFile( throw new Error(`The part size must be evenly divisible by ${MIN_CHUNK_SIZE}`); } - let sender: any; - if (dcId) { - try { - sender = await client._borrowExportedSender(dcId); - } catch (e) { - // This should never raise - client._log.error(e); - if (e.message === 'DC_ID_INVALID') { - // Can't export a sender for the ID we are currently in - sender = client._sender; - } else { - throw e; - } - } - } else { - sender = client._sender; - } - client._log.info(`Downloading file in chunks of ${partSize} bytes`); const foreman = new Foreman(workers); @@ -121,6 +104,10 @@ export async function downloadFile( progressCallback(progress); } + // used to populate the sender + await client.getSender(dcId); + + // eslint-disable-next-line no-constant-condition while (true) { let limit = partSize; @@ -139,39 +126,55 @@ export async function downloadFile( } // eslint-disable-next-line no-loop-func - promises.push((async () => { - try { - const result = await Promise.race([ - await sender.send(new Api.upload.GetFile({ + promises.push((async (offsetMemo: number) => { + // eslint-disable-next-line no-constant-condition + while (true) { + const sender = await client.getSender(dcId); + try { + if (!sender._user_connected) { + await sleep(DISCONNECT_SLEEP); + continue; + } + const result = await sender.send(new Api.upload.GetFile({ location: inputLocation, - offset, + offset: offsetMemo, limit, precise: isPrecise || undefined, - })), - sleep(REQUEST_TIMEOUT).then(() => Promise.reject(new Error('REQUEST_TIMEOUT'))), - ]); + })); - if (progressCallback) { - if (progressCallback.isCanceled) { - throw new Error('USER_CANCELED'); + if (progressCallback) { + if (progressCallback.isCanceled) { + throw new Error('USER_CANCELED'); + } + + progress += (1 / partsCount); + progressCallback(progress); } - progress += (1 / partsCount); - progressCallback(progress); - } + if (!end && (result.bytes.length < limit)) { + hasEnded = true; + } + + return result.bytes; + } catch (err) { + if (err.message === 'Disconnect') { + await sleep(DISCONNECT_SLEEP); + continue; + } else if (err instanceof errors.FloodWaitError) { + await sleep(err.seconds * 1000); + continue; + } else if (err.message !== 'USER_CANCELED') { + // eslint-disable-next-line no-console + console.error(err); + } - if (!end && (result.bytes.length < limit)) { hasEnded = true; + throw err; + } finally { + foreman.releaseWorker(); } - - return result.bytes; - } catch (err) { - hasEnded = true; - throw err; - } finally { - foreman.releaseWorker(); } - })()); + })(offset)); offset += limit; @@ -179,7 +182,6 @@ export async function downloadFile( break; } } - const results = await Promise.all(promises); const buffers = results.filter(Boolean); const totalLength = end ? (end + 1) - start : undefined; diff --git a/src/lib/gramjs/client/uploadFile.ts b/src/lib/gramjs/client/uploadFile.ts index 9844a85ed..9437c1980 100644 --- a/src/lib/gramjs/client/uploadFile.ts +++ b/src/lib/gramjs/client/uploadFile.ts @@ -4,6 +4,7 @@ import { default as Api } from '../tl/api'; import TelegramClient from './TelegramClient'; import { generateRandomBytes, readBigIntFromBuffer, sleep } from '../Helpers'; import { getAppropriatedPartSize } from '../Utils'; +import errors from '../errors'; interface OnProgress { isCanceled?: boolean; @@ -20,7 +21,7 @@ export interface UploadFileParams { const KB_TO_BYTES = 1024; const LARGE_FILE_THRESHOLD = 10 * 1024 * 1024; -const UPLOAD_TIMEOUT = 15 * 1000; +const DISCONNECT_SLEEP = 1000; export async function uploadFile( client: TelegramClient, @@ -38,7 +39,7 @@ export async function uploadFile( const buffer = Buffer.from(await fileToBuffer(file)); // We always upload from the DC we are in. - const sender = await client._borrowExportedSender(client.session.dcId); + const sender = await client.getSender(client.session.dcId); if (!workers || !size) { workers = 1; @@ -63,47 +64,51 @@ export async function uploadFile( const bytes = buffer.slice(j * partSize, (j + 1) * partSize); // eslint-disable-next-line no-loop-func - sendingParts.push((async () => { - await sender.send( - isLarge - ? new Api.upload.SaveBigFilePart({ - fileId, - filePart: j, - fileTotalParts: partCount, - bytes, - }) - : new Api.upload.SaveFilePart({ - fileId, - filePart: j, - bytes, - }), - ); - - if (onProgress) { - if (onProgress.isCanceled) { - throw new Error('USER_CANCELED'); + sendingParts.push((async (jMemo: number, bytesMemo: Buffer) => { + while (true) { + if (!sender._user_connected) { + await sleep(DISCONNECT_SLEEP); + continue; + } + try { + await sender.send( + isLarge + ? new Api.upload.SaveBigFilePart({ + fileId, + filePart: jMemo, + fileTotalParts: partCount, + bytes: bytesMemo, + }) + : new Api.upload.SaveFilePart({ + fileId, + filePart: jMemo, + bytes: bytesMemo, + }), + ); + } catch (err) { + if (err.message === 'Disconnect') { + await sleep(DISCONNECT_SLEEP); + continue; + } else if (err instanceof errors.FloodWaitError) { + await sleep(err.seconds * 1000); + continue; + } + throw err; } - progress += (1 / partCount); - onProgress(progress); - } - })()); - } - try { - await Promise.race([ - await Promise.all(sendingParts), - sleep(UPLOAD_TIMEOUT * workers).then(() => Promise.reject(new Error('TIMEOUT'))), - ]); - } catch (err) { - if (err.message === 'TIMEOUT') { - // eslint-disable-next-line no-console - console.warn('Upload timeout. Retrying...'); - i -= workers; - continue; - } + if (onProgress) { + if (onProgress.isCanceled) { + throw new Error('USER_CANCELED'); + } - throw err; + progress += (1 / partCount); + onProgress(progress); + } + break; + } + })(j, bytes)); } + await Promise.all(sendingParts); } return isLarge diff --git a/src/lib/gramjs/extensions/Logger.js b/src/lib/gramjs/extensions/Logger.js index 62297b736..9267f0c5a 100644 --- a/src/lib/gramjs/extensions/Logger.js +++ b/src/lib/gramjs/extensions/Logger.js @@ -51,6 +51,11 @@ class Logger { * @param message {string} */ warn(message) { + // todo remove later + if (_level === 'debug') { + // eslint-disable-next-line no-console + console.error(new Error().stack); + } this._log('warn', message, this.colors.warn); } @@ -72,6 +77,11 @@ class Logger { * @param message {string} */ error(message) { + // todo remove later + if (_level === 'debug') { + // eslint-disable-next-line no-console + console.error(new Error().stack); + } this._log('error', message, this.colors.error); } diff --git a/src/lib/gramjs/extensions/PromisedWebSockets.js b/src/lib/gramjs/extensions/PromisedWebSockets.js index d2912ba7f..4c3582157 100644 --- a/src/lib/gramjs/extensions/PromisedWebSockets.js +++ b/src/lib/gramjs/extensions/PromisedWebSockets.js @@ -86,6 +86,8 @@ class PromisedWebSockets { resolve(this); }; this.client.onerror = (error) => { + // eslint-disable-next-line no-console + console.error('WebSocket error', error); reject(error); }; this.client.onclose = (event) => { diff --git a/src/lib/gramjs/network/MTProtoSender.js b/src/lib/gramjs/network/MTProtoSender.js index 207f99f96..89e3ced28 100644 --- a/src/lib/gramjs/network/MTProtoSender.js +++ b/src/lib/gramjs/network/MTProtoSender.js @@ -7,6 +7,7 @@ const RPCResult = require('../tl/core/RPCResult'); const MessageContainer = require('../tl/core/MessageContainer'); const GZIPPacked = require('../tl/core/GZIPPacked'); const RequestState = require('./RequestState'); + const { MsgsAck, upload, @@ -169,6 +170,7 @@ class MTProtoSender { this._log.info('User is already connected!'); return false; } + this.isConnecting = true; this._connection = connection; for (let attempt = 0; attempt < this._retries; attempt++) { @@ -188,6 +190,7 @@ class MTProtoSender { await Helpers.sleep(this._delay); } } + this.isConnecting = false; return true; } @@ -287,6 +290,7 @@ class MTProtoSender { await this._authKeyCallback(this.authKey, this._dcId); } } else { + this._authenticated = true; this._log.debug('Already have an auth key ...'); } this._user_connected = true;