diff --git a/src/lib/gramjs/client/TelegramClient.js b/src/lib/gramjs/client/TelegramClient.js index da2604a37..9fbdbd5cf 100644 --- a/src/lib/gramjs/client/TelegramClient.js +++ b/src/lib/gramjs/client/TelegramClient.js @@ -394,6 +394,7 @@ class TelegramClient { try { await this.disconnect(); + this._sender.destroy(); } catch (err) { // Do nothing } diff --git a/src/lib/gramjs/extensions/MessagePacker.js b/src/lib/gramjs/extensions/MessagePacker.js index f0023d315..1962704b0 100644 --- a/src/lib/gramjs/extensions/MessagePacker.js +++ b/src/lib/gramjs/extensions/MessagePacker.js @@ -22,6 +22,11 @@ class MessagePacker { return this._queue; } + clear() { + this._queue = []; + this.append(undefined); + } + append(state, setReady = true, atStart = false) { // We need to check if there is already a `USE_INVOKE_AFTER_WITH` request if (state && USE_INVOKE_AFTER_WITH.has(state.request.className)) { @@ -116,7 +121,7 @@ class MessagePacker { async get() { if (!this._queue[this._queue.length - 1]) { - this._queue = []; + this._queue = this._queue.filter(Boolean); return undefined; } let data; diff --git a/src/lib/gramjs/network/MTProtoSender.js b/src/lib/gramjs/network/MTProtoSender.js index 67eda6099..1b7d5c2bb 100644 --- a/src/lib/gramjs/network/MTProtoSender.js +++ b/src/lib/gramjs/network/MTProtoSender.js @@ -299,6 +299,10 @@ class MTProtoSender { await this._disconnect(this.getConnection()); } + destroy() { + this._send_queue.clear(); + } + /** * This method enqueues the given request to be sent. Its send @@ -461,6 +465,10 @@ class MTProtoSender { this._log.error(e); this._log.info('Connection closed while sending data'); this._long_poll_loop_handle = undefined; + this.isSendingLongPoll = false; + if (!this.userDisconnected) { + this.reconnect(); + } return; } @@ -521,17 +529,33 @@ class MTProtoSender { this.logWithIndex.debug(`Got ${res?.batch.length} message(s) to send`); + if (!res) { + continue; + } + + let { data } = res; + const { batch } = res; + + for (const state of batch) { + if (!Array.isArray(state)) { + if (state.request.classType === 'request' && state.request.className !== 'HttpWait') { + this._pending_state.set(state.msgId, state); + } + } else { + for (const s of state) { + if (s.request.classType === 'request' && s.request.className !== 'HttpWait') { + this._pending_state.set(s.msgId, s); + } + } + } + } + if (this.isReconnecting) { this.logWithIndex.debug('Reconnecting :('); this._send_loop_handle = undefined; return; } - if (!res) { - continue; - } - let { data } = res; - const { batch } = res; this._log.debug(`Encrypting ${batch.length} message(s) in ${data.length} bytes for sending`); this.logWithIndex.debug('Sending', batch.map((m) => m.request.className)); @@ -544,29 +568,28 @@ class MTProtoSender { this._log.error(e); this._log.info('Connection closed while sending data'); this._send_loop_handle = undefined; + if (!this.userDisconnected) { + this.reconnect(); + } return; - } - for (const state of batch) { - if (!Array.isArray(state)) { - if (state.request.classType === 'request') { - this._pending_state.set(state.msgId, state); - } - if (state.request.className === 'HttpWait') { - state.resolve(); - } - } else { - for (const s of state) { - if (s.request.classType === 'request') { - this._pending_state.set(s.msgId, s); - } - if (s.request.className === 'HttpWait') { + } finally { + for (const state of batch) { + if (!Array.isArray(state)) { + if (state.request.className === 'HttpWait') { state.resolve(); } + } else { + for (const s of state) { + if (s.request.className === 'HttpWait') { + state.resolve(); + } + } } } + + this.logWithIndex.debug('Encrypted messages put in a queue to be sent'); + this._log.debug('Encrypted messages put in a queue to be sent'); } - this.logWithIndex.debug('Encrypted messages put in a queue to be sent'); - this._log.debug('Encrypted messages put in a queue to be sent'); } this._send_loop_handle = undefined;