From c297a697cb7f3981a26e7f884d55d04851d75309 Mon Sep 17 00:00:00 2001 From: Alexander Zinchuk Date: Thu, 20 Jul 2023 17:53:21 +0200 Subject: [PATCH] GramJs: Fix exported senders release (#3625) --- src/api/gramjs/worker/provider.ts | 7 ---- src/lib/gramjs/client/TelegramClient.js | 54 ++++++++++++++++++++----- src/lib/gramjs/client/downloadFile.ts | 2 + src/lib/gramjs/client/uploadFile.ts | 2 + 4 files changed, 48 insertions(+), 17 deletions(-) diff --git a/src/api/gramjs/worker/provider.ts b/src/api/gramjs/worker/provider.ts index 3068bc160..0ec2ee11e 100644 --- a/src/api/gramjs/worker/provider.ts +++ b/src/api/gramjs/worker/provider.ts @@ -84,13 +84,6 @@ export function initApi(onUpdate: OnApiUpdate, initialArgs: ApiInitialArgs) { worker = new Worker(new URL('./worker.ts', import.meta.url)); subscribeToWorker(onUpdate); - if (requestStates.size > 0) { - requestStates.forEach((value) => { - // eslint-disable-next-line no-console - console.error('Hanging request', value.DEBUG_payload); - }); - } - if (initialArgs.platform === 'iOS') { setupIosHealthCheck(); } diff --git a/src/lib/gramjs/client/TelegramClient.js b/src/lib/gramjs/client/TelegramClient.js index 420507cc3..da2604a37 100644 --- a/src/lib/gramjs/client/TelegramClient.js +++ b/src/lib/gramjs/client/TelegramClient.js @@ -170,6 +170,7 @@ class TelegramClient { this._config = undefined; this.phoneCodeHashes = []; this._exportedSenderPromises = {}; + this._exportedSenderRefCounter = {}; this._waitingForAuthKey = {}; this._exportedSenderReleaseTimeouts = {}; this._additionalDcsDisabled = args.additionalDcsDisabled; @@ -379,6 +380,7 @@ class TelegramClient { }); }); + this._exportedSenderRefCounter = {}; this._exportedSenderPromises = {}; this._waitingForAuthKey = {}; } @@ -429,6 +431,7 @@ class TelegramClient { if (this._shouldDebugExportedSenders) console.log(`🧹 Cleanup idx=${index} dcId=${dcId}`); const sender = await this._exportedSenderPromises[dcId][index]; delete this._exportedSenderPromises[dcId][index]; + delete this._exportedSenderRefCounter[dcId][index]; await sender.disconnect(); } @@ -443,6 +446,7 @@ class TelegramClient { } this._exportedSenderPromises[dcId] = {}; + this._exportedSenderRefCounter[dcId] = {}; await Promise.all(promises.map(async (promise) => { const sender = await promise; @@ -531,6 +535,28 @@ class TelegramClient { } } + releaseExportedSender(sender) { + const dcId = sender._dcId; + const index = sender._senderIndex; + + if (!this._exportedSenderRefCounter[dcId]) return; + if (!this._exportedSenderRefCounter[dcId][index]) return; + + this._exportedSenderRefCounter[dcId][index] -= 1; + + if (this._exportedSenderRefCounter[dcId][index] <= 0) { + if (!this._exportedSenderReleaseTimeouts[dcId]) this._exportedSenderReleaseTimeouts[dcId] = {}; + + this._exportedSenderReleaseTimeouts[dcId][index] = setTimeout(() => { + // eslint-disable-next-line no-console + if (this._shouldDebugExportedSenders) console.log(`[CC] [idx=${index} dcId=${dcId}] 🚪 Release`); + sender.disconnect(); + this._exportedSenderReleaseTimeouts[dcId][index] = undefined; + this._exportedSenderPromises[dcId][index] = undefined; + }, EXPORTED_SENDER_RELEASE_TIMEOUT); + } + } + async _borrowExportedSender(dcId, shouldReconnect, existingSender, index, isPremium) { if (this._additionalDcsDisabled) { return undefined; @@ -539,6 +565,7 @@ class TelegramClient { const i = index || 0; if (!this._exportedSenderPromises[dcId]) this._exportedSenderPromises[dcId] = {}; + if (!this._exportedSenderRefCounter[dcId]) this._exportedSenderRefCounter[dcId] = {}; if (!this._exportedSenderPromises[dcId][i] || shouldReconnect) { if (this._shouldDebugExportedSenders) { @@ -546,6 +573,7 @@ class TelegramClient { console.warn(`🕒 Connecting to exported sender idx=${i} dc=${dcId}` + ` ${shouldReconnect ? '(reconnect)' : ''}`); } + this._exportedSenderRefCounter[dcId][i] = 0; this._exportedSenderPromises[dcId][i] = this._connectSender( existingSender || this._createExportedSender(dcId, i), dcId, @@ -573,20 +601,13 @@ class TelegramClient { return this._borrowExportedSender(dcId, true, undefined, i, isPremium); } + this._exportedSenderRefCounter[dcId][i] += 1; if (!this._exportedSenderReleaseTimeouts[dcId]) this._exportedSenderReleaseTimeouts[dcId] = {}; if (this._exportedSenderReleaseTimeouts[dcId][i]) { clearTimeout(this._exportedSenderReleaseTimeouts[dcId][i]); this._exportedSenderReleaseTimeouts[dcId][i] = undefined; } - this._exportedSenderReleaseTimeouts[dcId][i] = setTimeout(() => { - // eslint-disable-next-line no-console - if (this._shouldDebugExportedSenders) console.log(`🚪 Release idx=${i} dcId=${dcId}`); - sender.disconnect(); - this._exportedSenderReleaseTimeouts[dcId][i] = undefined; - this._exportedSenderPromises[dcId][i] = undefined; - }, EXPORTED_SENDER_RELEASE_TIMEOUT); - return sender; } @@ -875,6 +896,7 @@ class TelegramClient { }); const sender = await this._borrowExportedSender(WEBDOCUMENT_DC_ID); const res = await sender.send(downloaded); + this.releaseExportedSender(sender); offset += 131072; if (res.bytes.length) { buff.push(res.bytes); @@ -921,6 +943,7 @@ class TelegramClient { }); const sender = await this._borrowExportedSender(WEBDOCUMENT_DC_ID); const res = await sender.send(downloaded); + this.releaseExportedSender(sender); offset += 131072; if (res.bytes.length) { buff.push(res.bytes); @@ -965,7 +988,8 @@ class TelegramClient { throw new Error('You can only invoke MTProtoRequests'); } - let sender = dcId === undefined ? this._sender : await this.getSender(dcId); + const isExported = dcId !== undefined; + let sender = !isExported ? this._sender : await this.getSender(dcId); this._lastRequest = Date.now(); await this._connectedDeferred.promise; @@ -978,6 +1002,7 @@ class TelegramClient { try { const result = await state.promise; state.finished.resolve(); + if (isExported) this.releaseExportedSender(sender); return result; } catch (e) { if (e instanceof errors.ServerError || e.message === 'RPC_CALL_FAIL' @@ -990,6 +1015,7 @@ class TelegramClient { await sleep(e.seconds * 1000); } else { state.finished.resolve(); + if (isExported) this.releaseExportedSender(sender); throw e; } } else if (e instanceof errors.PhoneMigrateError || e instanceof errors.NetworkMigrateError @@ -999,9 +1025,11 @@ class TelegramClient { || e instanceof errors.NetworkMigrateError; if (shouldRaise && await checkAuthorization(this)) { state.finished.resolve(); + if (isExported) this.releaseExportedSender(sender); throw e; } await this._switchDC(e.newDc); + if (isExported) this.releaseExportedSender(sender); sender = dcId === undefined ? this._sender : await this.getSender(dcId); } else if (e instanceof errors.MsgWaitError) { // We need to resend this after the old one was confirmed. @@ -1015,16 +1043,19 @@ class TelegramClient { } else if (e instanceof errors.TimedOutError) { if (!shouldRetryOnTimeout) { state.finished.resolve(); + if (isExported) this.releaseExportedSender(sender); throw e; } } else { state.finished.resolve(); + if (isExported) this.releaseExportedSender(sender); throw e; } } state.resetPromise(); } + if (isExported) this.releaseExportedSender(sender); throw new Error(`Request was unsuccessful ${attempt} time(s)`); } @@ -1033,9 +1064,12 @@ class TelegramClient { throw new Error('You can only invoke MTProtoRequests'); } - const sender = dcId === undefined ? this._sender : await this.getSender(dcId); + const isExported = dcId !== undefined; + const sender = !isExported ? this._sender : await this.getSender(dcId); sender.sendBeacon(request); + + if (isExported) this.releaseExportedSender(sender); } setIsPremium(isPremium) { diff --git a/src/lib/gramjs/client/downloadFile.ts b/src/lib/gramjs/client/downloadFile.ts index 03d07802f..ddf6c00a0 100644 --- a/src/lib/gramjs/client/downloadFile.ts +++ b/src/lib/gramjs/client/downloadFile.ts @@ -263,6 +263,7 @@ async function downloadFile2( } }), ]); + client.releaseExportedSender(sender); isDone2 = true; if (progressCallback) { @@ -299,6 +300,7 @@ async function downloadFile2( if (deferred) deferred.resolve(); hasEnded = true; + client.releaseExportedSender(sender); throw err; } } diff --git a/src/lib/gramjs/client/uploadFile.ts b/src/lib/gramjs/client/uploadFile.ts index a82e395e7..7c2e2a715 100644 --- a/src/lib/gramjs/client/uploadFile.ts +++ b/src/lib/gramjs/client/uploadFile.ts @@ -122,6 +122,7 @@ export async function uploadFile( bytes: Buffer.from(partBytes), }), ); + client.releaseExportedSender(sender); isDone2 = true; } catch (err) { logWithSenderIndex(`❗️️️Upload part failed ${err?.toString()} j=${jMemo}`); @@ -133,6 +134,7 @@ export async function uploadFile( continue; } foremans[senderIndex].releaseWorker(); + client.releaseExportedSender(sender); throw err; }