GramJs: Fix resend and pending state handling (#3643)

This commit is contained in:
Alexander Zinchuk 2023-07-27 11:48:04 +02:00
parent 7c76f4d539
commit cd15c71b3d
3 changed files with 52 additions and 23 deletions

View File

@ -394,6 +394,7 @@ class TelegramClient {
try {
await this.disconnect();
this._sender.destroy();
} catch (err) {
// Do nothing
}

View File

@ -22,6 +22,11 @@ class MessagePacker {
return this._queue;
}
clear() {
this._queue = [];
this.append(undefined);
}
append(state, setReady = true, atStart = false) {
// We need to check if there is already a `USE_INVOKE_AFTER_WITH` request
if (state && USE_INVOKE_AFTER_WITH.has(state.request.className)) {
@ -116,7 +121,7 @@ class MessagePacker {
async get() {
if (!this._queue[this._queue.length - 1]) {
this._queue = [];
this._queue = this._queue.filter(Boolean);
return undefined;
}
let data;

View File

@ -299,6 +299,10 @@ class MTProtoSender {
await this._disconnect(this.getConnection());
}
destroy() {
this._send_queue.clear();
}
/**
*
This method enqueues the given request to be sent. Its send
@ -461,6 +465,10 @@ class MTProtoSender {
this._log.error(e);
this._log.info('Connection closed while sending data');
this._long_poll_loop_handle = undefined;
this.isSendingLongPoll = false;
if (!this.userDisconnected) {
this.reconnect();
}
return;
}
@ -521,17 +529,33 @@ class MTProtoSender {
this.logWithIndex.debug(`Got ${res?.batch.length} message(s) to send`);
if (!res) {
continue;
}
let { data } = res;
const { batch } = res;
for (const state of batch) {
if (!Array.isArray(state)) {
if (state.request.classType === 'request' && state.request.className !== 'HttpWait') {
this._pending_state.set(state.msgId, state);
}
} else {
for (const s of state) {
if (s.request.classType === 'request' && s.request.className !== 'HttpWait') {
this._pending_state.set(s.msgId, s);
}
}
}
}
if (this.isReconnecting) {
this.logWithIndex.debug('Reconnecting :(');
this._send_loop_handle = undefined;
return;
}
if (!res) {
continue;
}
let { data } = res;
const { batch } = res;
this._log.debug(`Encrypting ${batch.length} message(s) in ${data.length} bytes for sending`);
this.logWithIndex.debug('Sending', batch.map((m) => m.request.className));
@ -544,29 +568,28 @@ class MTProtoSender {
this._log.error(e);
this._log.info('Connection closed while sending data');
this._send_loop_handle = undefined;
if (!this.userDisconnected) {
this.reconnect();
}
return;
}
for (const state of batch) {
if (!Array.isArray(state)) {
if (state.request.classType === 'request') {
this._pending_state.set(state.msgId, state);
}
if (state.request.className === 'HttpWait') {
state.resolve();
}
} else {
for (const s of state) {
if (s.request.classType === 'request') {
this._pending_state.set(s.msgId, s);
}
if (s.request.className === 'HttpWait') {
} finally {
for (const state of batch) {
if (!Array.isArray(state)) {
if (state.request.className === 'HttpWait') {
state.resolve();
}
} else {
for (const s of state) {
if (s.request.className === 'HttpWait') {
state.resolve();
}
}
}
}
this.logWithIndex.debug('Encrypted messages put in a queue to be sent');
this._log.debug('Encrypted messages put in a queue to be sent');
}
this.logWithIndex.debug('Encrypted messages put in a queue to be sent');
this._log.debug('Encrypted messages put in a queue to be sent');
}
this._send_loop_handle = undefined;