diff --git a/src/api/gramjs/methods/messages.ts b/src/api/gramjs/methods/messages.ts index 38d500033..a79af3aeb 100644 --- a/src/api/gramjs/methods/messages.ts +++ b/src/api/gramjs/methods/messages.ts @@ -72,6 +72,7 @@ import { getEmojiOnlyCountForMessage } from '../../../global/helpers/getEmojiOnl import { getServerTimeOffset } from '../../../util/serverTime'; import { getApiChatIdFromMtpPeer } from '../apiBuilders/peers'; import { updateChannelState } from '../updateManager'; +import { compact } from '../../../util/iteratees'; const FAST_SEND_TIMEOUT = 1000; const INPUT_WAVEFORM_LENGTH = 63; @@ -620,11 +621,18 @@ async function uploadMedia(localMessage: ApiMessage, attachment: ApiAttachment, } }; - const file = await fetchFile(blobUrl, filename); - const inputFile = await uploadFile(file, patchedOnProgress); + const fetchAndUpload = async (url: string) => { + const file = await fetchFile(url, filename); + return uploadFile(file, patchedOnProgress); + }; - const thumbFile = previewBlobUrl && await fetchFile(previewBlobUrl, filename); - const thumb = thumbFile ? await uploadFile(thumbFile) : undefined; + const isVideo = SUPPORTED_VIDEO_CONTENT_TYPES.has(mimeType); + const shouldUploadThumb = audio || isVideo || shouldSendAsFile; + + const [inputFile, thumb] = await Promise.all(compact([ + fetchAndUpload(blobUrl), + shouldUploadThumb && previewBlobUrl && fetchAndUpload(previewBlobUrl), + ])); const attributes: GramJs.TypeDocumentAttribute[] = [new GramJs.DocumentAttributeFilename({ fileName: filename })]; if (!shouldSendAsFile) { @@ -636,7 +644,7 @@ async function uploadMedia(localMessage: ApiMessage, attachment: ApiAttachment, }); } - if (SUPPORTED_VIDEO_CONTENT_TYPES.has(mimeType)) { + if (isVideo) { const { width, height, duration } = quick; if (duration !== undefined) { attributes.push(new GramJs.DocumentAttributeVideo({ diff --git a/src/lib/gramjs/client/TelegramClient.js b/src/lib/gramjs/client/TelegramClient.js index 72a1af64f..7fee0015a 100644 --- a/src/lib/gramjs/client/TelegramClient.js +++ b/src/lib/gramjs/client/TelegramClient.js @@ -27,12 +27,21 @@ const { getTmpPassword, } = require('./2fa'); const RequestState = require('../network/RequestState'); +const withAbortCheck = require('../../../util/withAbortCheck').default; +const { AbortError } = require('../../../util/withAbortCheck'); + +class ConnectTimeoutError extends Error { + constructor() { + super('Connection Timeout'); + } +} const DEFAULT_DC_ID = 2; const WEBDOCUMENT_DC_ID = 4; const EXPORTED_SENDER_RECONNECT_TIMEOUT = 1000; // 1 sec const EXPORTED_SENDER_RELEASE_TIMEOUT = 30000; // 30 sec const WEBDOCUMENT_REQUEST_PART_SIZE = 131072; // 128kb +const EXPORTED_SENDER_CONNECT_TIMEOUT = 8000; // 8 sec const PING_INTERVAL = 3000; // 3 sec const PING_TIMEOUT = 5000; // 5 sec @@ -156,6 +165,7 @@ class TelegramClient { this._config = undefined; this.phoneCodeHashes = []; this._exportedSenderPromises = {}; + this._exportedSenderAbortControllers = {}; this._waitingForAuthKey = {}; this._exportedSenderReleaseTimeouts = {}; this._additionalDcsDisabled = args.additionalDcsDisabled; @@ -328,6 +338,7 @@ class TelegramClient { ); this._exportedSenderPromises = {}; + this._exportedSenderAbortControllers = {}; this._waitingForAuthKey = {}; } @@ -374,6 +385,7 @@ class TelegramClient { } const sender = await this._exportedSenderPromises[dcId][index]; delete this._exportedSenderPromises[dcId][index]; + delete this._exportedSenderAbortControllers[dcId][index]; await sender.disconnect(); } @@ -388,6 +400,7 @@ class TelegramClient { } this._exportedSenderPromises[dcId] = {}; + this._exportedSenderAbortControllers[dcId] = {}; await Promise.all(promises.map(async (promise) => { const sender = await promise; @@ -395,7 +408,7 @@ class TelegramClient { })); } - async _connectSender(sender, dcId, isPremium = false) { + async _connectSender(sender, dcId, index, isPremium = false, abortSignal) { // if we don't already have an auth key we want to use normal DCs not -1 let hasAuthKey = Boolean(sender.authKey.getKey()); let firstConnectResolver; @@ -419,7 +432,7 @@ class TelegramClient { // eslint-disable-next-line no-constant-condition while (true) { try { - await sender.connect(new this._connection( + await withAbortCheck(abortSignal, sender.connect(new this._connection( dc.ipAddress, dc.port, dcId, @@ -427,18 +440,23 @@ class TelegramClient { this._args.testServers, // Premium DCs are not stable for obtaining auth keys, so need to we first connect to regular ones hasAuthKey ? isPremium : false, - )); + ))); if (this.session.dcId !== dcId && !sender._authenticated) { this._log.info(`Exporting authorization for data center ${dc.ipAddress}`); - const auth = await this.invoke(new requests.auth.ExportAuthorization({ dcId })); + const auth = await withAbortCheck( + abortSignal, + this.invoke(new requests.auth.ExportAuthorization({ dcId })), + ); + const req = this._initWith(new requests.auth.ImportAuthorization({ id: auth.id, bytes: auth.bytes, })); - await sender.send(req); + await withAbortCheck(abortSignal, sender.send(req)); sender._authenticated = true; } + sender.dcId = dcId; sender.userDisconnected = false; @@ -449,6 +467,12 @@ class TelegramClient { return sender; } catch (err) { + if (err instanceof AbortError) { + delete this._exportedSenderPromises[dcId][index]; + delete this._exportedSenderAbortControllers[dcId][index]; + sender.disconnect(); + return undefined; + } // eslint-disable-next-line no-console console.error(err); @@ -458,26 +482,54 @@ class TelegramClient { } } + getConnectedExportedSenderIndex(dcId, i) { + const index = Object.keys(this._exportedSenderReleaseTimeouts[dcId] || {})[0]; + const firstIndex = Number(index ?? 0); + return { + newIndex: firstIndex === i ? i + 1 : firstIndex, + noConnectedExportedSenders: index === undefined, + }; + } + async _borrowExportedSender(dcId, shouldReconnect, existingSender, index, isPremium) { if (this._additionalDcsDisabled) { return undefined; } - const i = index || 0; + let i = index || 0; if (!this._exportedSenderPromises[dcId]) this._exportedSenderPromises[dcId] = {}; + if (!this._exportedSenderAbortControllers[dcId]) this._exportedSenderAbortControllers[dcId] = {}; + + if (this._exportedSenderAbortControllers[dcId][i]?.signal.aborted) { + const { newIndex } = this.getConnectedExportedSenderIndex(dcId, i); + i = newIndex; + } if (!this._exportedSenderPromises[dcId][i] || shouldReconnect) { + this._exportedSenderAbortControllers[dcId][i]?.abort(); + this._exportedSenderAbortControllers[dcId][i] = new AbortController(); this._exportedSenderPromises[dcId][i] = this._connectSender( existingSender || this._createExportedSender(dcId, i), dcId, + index, isPremium, + this._exportedSenderAbortControllers[dcId][i].signal, ); } let sender; try { - sender = await this._exportedSenderPromises[dcId][i]; + sender = await Promise.race([ + this._exportedSenderPromises[dcId][i], + Helpers.sleep(EXPORTED_SENDER_CONNECT_TIMEOUT).then(() => { + return Promise.reject(new ConnectTimeoutError()); + }), + ]); + + if (!sender) { + throw new ConnectTimeoutError(); + } if (!sender.isConnected()) { if (sender.isConnecting) { @@ -488,6 +540,11 @@ class TelegramClient { } } } catch (err) { + if (err instanceof ConnectTimeoutError) { + this._exportedSenderAbortControllers[dcId][i]?.abort(); + const { newIndex, noConnectedExportedSenders } = this.getConnectedExportedSenderIndex(dcId, i); + return this._borrowExportedSender(dcId, noConnectedExportedSenders, undefined, newIndex, isPremium); + } // eslint-disable-next-line no-console console.error(err); diff --git a/src/util/withAbortCheck.ts b/src/util/withAbortCheck.ts new file mode 100644 index 000000000..c2b784561 --- /dev/null +++ b/src/util/withAbortCheck.ts @@ -0,0 +1,15 @@ +export class AbortError extends Error { + constructor() { + super('Aborted'); + } +} + +export default async function withAbortCheck(abortSignal: AbortSignal, cb: Promise): Promise { + const result = await cb; + + if (abortSignal?.aborted) { + throw new AbortError(); + } + + return result; +}