From 0681ad44e2d44263073c1b5d69a4bee394e27c9c Mon Sep 17 00:00:00 2001 From: Alexander Zinchuk Date: Sun, 18 Jun 2023 12:03:16 +0200 Subject: [PATCH] GramJS: Bugfixes (#3280) --- src/lib/gramjs/Helpers.js | 42 +++----- src/lib/gramjs/client/TelegramClient.d.ts | 6 +- src/lib/gramjs/client/TelegramClient.js | 39 +++++--- src/lib/gramjs/extensions/Logger.js | 19 ++-- src/lib/gramjs/extensions/MessagePacker.js | 74 +++++++++----- src/lib/gramjs/extensions/PendingState.js | 33 +++++++ src/lib/gramjs/network/MTProtoSender.js | 109 ++++++--------------- src/lib/gramjs/network/MTProtoState.js | 9 +- src/lib/gramjs/network/RequestState.js | 34 ++++--- src/lib/gramjs/tl/apiTl.js | 1 + src/lib/gramjs/tl/static/api.json | 1 + 11 files changed, 191 insertions(+), 176 deletions(-) create mode 100644 src/lib/gramjs/extensions/PendingState.js diff --git a/src/lib/gramjs/Helpers.js b/src/lib/gramjs/Helpers.js index de5fd701e..235f37571 100644 --- a/src/lib/gramjs/Helpers.js +++ b/src/lib/gramjs/Helpers.js @@ -15,7 +15,7 @@ function readBigIntFromBuffer(buffer, little = true, signed = false) { randBuffer = randBuffer.reverse(); } let bigInt = BigInt(randBuffer.toString('hex'), 16); - if (signed && Math.floor(bigInt.toString('2').length / 8) >= bytesNumber) { + if (signed && Math.floor(bigInt.toString(2).length / 8) >= bytesNumber) { bigInt = bigInt.subtract(BigInt(2) .pow(BigInt(bytesNumber * 8))); } @@ -48,7 +48,7 @@ function toSignedLittleBuffer(big, number = 8) { */ function readBufferFromBigInt(bigInt, bytesNumber, little = true, signed = false) { bigInt = BigInt(bigInt); - const bitLength = bigInt.bitLength(); + const bitLength = bigInt.bitLength().toJSNumber(); const bytes = Math.ceil(bitLength / 8); if (bytesNumber < bytes) { @@ -63,38 +63,20 @@ function readBufferFromBigInt(bigInt, bytesNumber, little = true, signed = false bigInt = bigInt.abs(); } - const hex = bigInt.toString('16') - .padStart(bytesNumber * 2, '0'); - let l = Buffer.from(hex, 'hex'); - if (little) { - l = l.reverse(); - } + const hex = bigInt.toString(16).padStart(bytesNumber * 2, '0'); + let buffer = Buffer.from(hex, 'hex'); if (signed && below) { - if (little) { - let reminder = false; - if (l[0] !== 0) { - l[0] -= 1; - } - for (let i = 0; i < l.length; i++) { - if (l[i] === 0) { - reminder = true; - continue; - } - if (reminder) { - l[i] -= 1; - reminder = false; - } - l[i] = 255 - l[i]; - } - } else { - l[l.length - 1] = 256 - l[l.length - 1]; - for (let i = 0; i < l.length - 1; i++) { - l[i] = 255 - l[i]; - } + buffer[buffer.length - 1] = 256 - buffer[buffer.length - 1]; + for (let i = 0; i < buffer.length - 1; i++) { + buffer[i] = 255 - buffer[i]; } } - return l; + if (little) { + buffer = buffer.reverse(); + } + + return buffer; } /** diff --git a/src/lib/gramjs/client/TelegramClient.d.ts b/src/lib/gramjs/client/TelegramClient.d.ts index 81c52682e..cb091b756 100644 --- a/src/lib/gramjs/client/TelegramClient.d.ts +++ b/src/lib/gramjs/client/TelegramClient.d.ts @@ -10,7 +10,9 @@ declare class TelegramClient { async start(authParams: UserAuthParams | BotAuthParams); - async invoke(request: R, dcId?: number): Promise; + async invoke( + request: R, dcId?: number, abortSignal?: AbortSignal, + ): Promise; async uploadFile(uploadParams: UploadFileParams): ReturnType; @@ -20,6 +22,8 @@ declare class TelegramClient { async getTmpPassword(currentPassword: string, ttl?: number): Promise; + setPingCallback(callback: () => Promise); + // Untyped methods. [prop: string]: any; } diff --git a/src/lib/gramjs/client/TelegramClient.js b/src/lib/gramjs/client/TelegramClient.js index 63204e15e..b7e674b74 100644 --- a/src/lib/gramjs/client/TelegramClient.js +++ b/src/lib/gramjs/client/TelegramClient.js @@ -26,6 +26,7 @@ const { updateTwoFaSettings, getTmpPassword, } = require('./2fa'); +const RequestState = require('../network/RequestState'); const DEFAULT_DC_ID = 2; const WEBDOCUMENT_DC_ID = 4; @@ -230,6 +231,10 @@ class TelegramClient { } } + setPingCallback(callback) { + this.pingCallback = callback; + } + async _updateLoop() { let lastPongAt; @@ -279,19 +284,16 @@ class TelegramClient { if (this._sender.isReconnecting || this._isSwitchingDc) { continue; } - - await this.disconnect(); - await this.connect(); + this._sender.reconnect(); } // We need to send some content-related request at least hourly // for Telegram to keep delivering updates, otherwise they will // just stop even if we're connected. Do so every 30 minutes. - // TODO Call getDifference instead since it's more relevant if (new Date().getTime() - this._lastRequest > 30 * 60 * 1000) { try { - await this.invoke(new requests.updates.GetState()); + await this.pingCallback(); } catch (e) { // we don't care about errors here } @@ -867,18 +869,22 @@ class TelegramClient { * @returns {Promise} */ - async invoke(request, dcId) { + async invoke(request, dcId, abortSignal) { if (request.classType !== 'request') { throw new Error('You can only invoke MTProtoRequests'); } const sender = dcId === undefined ? this._sender : await this.getSender(dcId); this._lastRequest = new Date().getTime(); + + const state = new RequestState(request, abortSignal); + let attempt = 0; for (attempt = 0; attempt < this._requestRetries; attempt++) { - const promise = sender.sendWithInvokeSupport(request); + sender.addStateToQueue(state); try { - const result = await promise.promise; + const result = await state.promise; + state.finished.resolve(); return result; } catch (e) { if (e instanceof errors.ServerError || e.message === 'RPC_CALL_FAIL' @@ -890,6 +896,7 @@ class TelegramClient { this._log.info(`Sleeping for ${e.seconds}s on flood wait`); await sleep(e.seconds * 1000); } else { + state.finished.resolve(); throw e; } } else if (e instanceof errors.PhoneMigrateError || e instanceof errors.NetworkMigrateError @@ -898,20 +905,26 @@ class TelegramClient { const shouldRaise = e instanceof errors.PhoneMigrateError || e instanceof errors.NetworkMigrateError; if (shouldRaise && await checkAuthorization(this)) { + state.finished.resolve(); throw e; } await this._switchDC(e.newDc); } else if (e instanceof errors.MsgWaitError) { - // we need to resend this after the old one was confirmed. - await promise.isReady(); + // We need to resend this after the old one was confirmed. + await state.isReady(); + + state.after = undefined; } else if (e.message === 'CONNECTION_NOT_INITED') { await this.disconnect(); await sleep(2000); await this.connect(); } else { + state.finished.resolve(); throw e; } } + + state.resetPromise(); } throw new Error(`Request was unsuccessful ${attempt} time(s)`); } @@ -987,11 +1000,10 @@ class TelegramClient { // this._stateCache.update(update) } - _processUpdate(update, others, entities) { + _processUpdate(update, entities) { update._entities = entities || []; const args = { update, - others, }; this._dispatchUpdate(args); } @@ -1227,9 +1239,6 @@ class TelegramClient { */ async _dispatchUpdate(args = { update: undefined, - others: undefined, - channelId: undefined, - ptsDate: undefined, }) { for (const [builder, callback] of this._eventBuilders) { const event = builder.build(args.update); diff --git a/src/lib/gramjs/extensions/Logger.js b/src/lib/gramjs/extensions/Logger.js index fbd31ebd6..c90e08fd6 100644 --- a/src/lib/gramjs/extensions/Logger.js +++ b/src/lib/gramjs/extensions/Logger.js @@ -2,7 +2,12 @@ let _level; class Logger { - static levels = ['error', 'warn', 'info', 'debug']; + static LEVEL_MAP = new Map([ + ['error', new Set(['error'])], + ['warn', new Set(['error', 'warn'])], + ['info', new Set(['error', 'warn', 'info'])], + ['debug', new Set(['error', 'warn', 'info', 'debug'])], + ]); constructor(level) { if (!_level) { @@ -45,18 +50,13 @@ class Logger { * @returns {boolean} */ canSend(level) { - return (Logger.levels.indexOf(_level) >= Logger.levels.indexOf(level)); + return Logger.LEVEL_MAP.get(_level).has(level); } /** * @param message {string} */ warn(message) { - // todo remove later - if (_level === 'debug') { - // eslint-disable-next-line no-console - console.error(new Error().stack); - } this._log('warn', message, this.colors.warn); } @@ -78,11 +78,6 @@ class Logger { * @param message {string} */ error(message) { - // todo remove later - if (_level === 'debug') { - // eslint-disable-next-line no-console - console.error(new Error().stack); - } this._log('error', message, this.colors.error); } diff --git a/src/lib/gramjs/extensions/MessagePacker.js b/src/lib/gramjs/extensions/MessagePacker.js index 5ca041f32..78de69c60 100644 --- a/src/lib/gramjs/extensions/MessagePacker.js +++ b/src/lib/gramjs/extensions/MessagePacker.js @@ -2,10 +2,10 @@ const MessageContainer = require('../tl/core/MessageContainer'); const TLMessage = require('../tl/core/TLMessage'); const BinaryWriter = require('./BinaryWriter'); -const USE_INVOKE_AFTER_WITH = [ +const USE_INVOKE_AFTER_WITH = new Set([ 'messages.SendMessage', 'messages.SendMedia', 'messages.SendMultiMedia', 'messages.ForwardMessages', 'messages.SendInlineBotResult', -]; +]); class MessagePacker { constructor(state, logger) { @@ -22,21 +22,38 @@ class MessagePacker { return this._queue; } - append(state) { - // we need to check if there is already a request with the same name that we should send after. - if (state && USE_INVOKE_AFTER_WITH.includes(state.request.className)) { - // we now need to check if there is any request in queue already. - // we loop backwards since the latest request is the most recent - for (let i = this._queue.length - 1; i >= 0; i--) { - if (USE_INVOKE_AFTER_WITH.includes(this._queue[i].request.className)) { - state.after = this._queue[i]; - break; + append(state, setReady = true, atStart = false) { + // We need to check if there is already a `USE_INVOKE_AFTER_WITH` request + if (state && USE_INVOKE_AFTER_WITH.has(state.request.className)) { + if (atStart) { + // Assign `after` for the previously first `USE_INVOKE_AFTER_WITH` request + for (let i = 0; i < this._queue.length; i++) { + if (USE_INVOKE_AFTER_WITH.has(this._queue[i]?.request.className)) { + this._queue[i].after = state; + break; + } + } + } else { + // Assign after for the previous `USE_INVOKE_AFTER_WITH` request + for (let i = this._queue.length - 1; i >= 0; i--) { + if (USE_INVOKE_AFTER_WITH.has(this._queue[i]?.request.className)) { + state.after = this._queue[i]; + break; + } } } } - this._queue.push(state); - this.setReady(true); + if (atStart) { + this._queue.unshift(state); + } else { + this._queue.push(state); + } + + if (setReady) { + this.setReady(true); + } + // 1658238041=MsgsAck, we don't care about MsgsAck here because they never resolve anyway. if (state && state.request.CONSTRUCTOR_ID !== 1658238041) { this._pendingStates.push(state); @@ -50,10 +67,18 @@ class MessagePacker { } } + prepend(states) { + states.reverse().forEach((state) => { + this.append(state, false, true); + }); + + this.setReady(true); + } + extend(states) { - for (const state of states) { - this._queue.push(state); - } + states.forEach((state) => { + this.append(state, false); + }); this.setReady(true); } @@ -76,6 +101,15 @@ class MessagePacker { while (this._queue.length && batch.length <= MessageContainer.MAXIMUM_LENGTH) { const state = this._queue.shift(); + if (!state) { + continue; + } + + if (state.abortSignal?.aborted) { + state.reject(new Error('Request aborted')); + continue; + } + size += state.data.length + TLMessage.SIZE_OVERHEAD; if (size <= MessageContainer.MAXIMUM_SIZE) { let afterId; @@ -90,10 +124,12 @@ class MessagePacker { batch.push(state); continue; } + if (batch.length) { this._queue.unshift(state); break; } + this._log.warn(`Message payload for ${state.request.className || state.request.constructor.name} is too long ${state.data.length} and cannot be sent`); state.reject('Request Payload is too big'); @@ -122,12 +158,6 @@ class MessagePacker { data, }; } - - rejectAll() { - this._pendingStates.forEach((requestState) => { - requestState.reject(new Error('Disconnect')); - }); - } } module.exports = MessagePacker; diff --git a/src/lib/gramjs/extensions/PendingState.js b/src/lib/gramjs/extensions/PendingState.js new file mode 100644 index 000000000..f66601f91 --- /dev/null +++ b/src/lib/gramjs/extensions/PendingState.js @@ -0,0 +1,33 @@ +class PendingState { + constructor() { + this._pending = new Map(); + } + + set(msgId, state) { + this._pending.set(msgId.toString(), state); + } + + get(msgId) { + return this._pending.get(msgId.toString()); + } + + getAndDelete(msgId) { + const state = this.get(msgId); + this.delete(msgId); + return state; + } + + values() { + return Array.from(this._pending.values()); + } + + delete(msgId) { + this._pending.delete(msgId.toString()); + } + + clear() { + this._pending.clear(); + } +} + +module.exports = PendingState; diff --git a/src/lib/gramjs/network/MTProtoSender.js b/src/lib/gramjs/network/MTProtoSender.js index bc4633f08..febb9b938 100644 --- a/src/lib/gramjs/network/MTProtoSender.js +++ b/src/lib/gramjs/network/MTProtoSender.js @@ -18,6 +18,7 @@ const { } = require('../tl').constructors; const MessagePacker = require('../extensions/MessagePacker'); const BinaryReader = require('../extensions/BinaryReader'); +const PendingState = require('../extensions/PendingState'); const { UpdateConnectionState, UpdateServerTimeOffset, @@ -36,7 +37,6 @@ const { } = require('../tl').constructors; const { SecurityError } = require('../errors/Common'); const { InvalidBufferError } = require('../errors/Common'); -const { LogOut } = require('../tl').requests.auth; const { RPCMessageToError } = require('../errors'); const { TypeNotFoundError } = require('../errors/Common'); @@ -124,7 +124,7 @@ class MTProtoSender { /** * Sent states are remembered until a response is received. */ - this._pending_state = {}; + this._pending_state = new PendingState(); /** * Responses must be acknowledged, and we can also batch these. @@ -196,6 +196,7 @@ class MTProtoSender { } } this.isConnecting = false; + return true; } @@ -235,29 +236,17 @@ class MTProtoSender { Since the receiving part is "built in" the future, it's impossible to await receive a result that was never sent. * @param request + * @param abortSignal * @returns {RequestState} */ - send(request) { - if (!this._user_connected) { - throw new Error('Cannot send requests while disconnected'); - } - const state = new RequestState(request); + send(request, abortSignal) { + const state = new RequestState(request, abortSignal); this._send_queue.append(state); return state.promise; } - /** - * Same as send but returns the full state. usefull for invoke after logic - * @param request - * @return {RequestState} - */ - sendWithInvokeSupport(request) { - if (!this._user_connected) { - throw new Error('Cannot send requests while disconnected'); - } - const state = new RequestState(request, undefined, this._pending_state); + addStateToQueue(state) { this._send_queue.append(state); - return state; } /** @@ -315,8 +304,6 @@ class MTProtoSender { } async _disconnect() { - this._send_queue.rejectAll(); - if (this._updateCallback) { this._updateCallback(new UpdateConnectionState(UpdateConnectionState.disconnected)); } @@ -340,7 +327,9 @@ class MTProtoSender { * @private */ async _sendLoop() { - this._send_queue = new MessagePacker(this._state, this._log); + // Retry previous pending requests + this._send_queue.prepend(this._pending_state.values()); + this._pending_state.clear(); while (this._user_connected && !this.isReconnecting) { if (this._pending_ack.size) { @@ -381,12 +370,12 @@ class MTProtoSender { for (const state of batch) { if (!Array.isArray(state)) { if (state.request.classType === 'request') { - this._pending_state[state.msgId] = state; + this._pending_state.set(state.msgId, state); } } else { for (const s of state) { if (s.request.classType === 'request') { - this._pending_state[s.msgId] = s; + this._pending_state.set(s.msgId, s); } } } @@ -506,27 +495,23 @@ class MTProtoSender { * @private */ _popStates(msgId) { - let state = this._pending_state[msgId]; + const state = this._pending_state.getAndDelete(msgId); if (state) { - this._pending_state[msgId].deferred.resolve(); - delete this._pending_state[msgId]; return [state]; } const toPop = []; - for (state of Object.values(this._pending_state)) { - if (state.containerId && state.containerId.equals(msgId)) { - toPop.push(state.msgId); + for (const pendingState of this._pending_state.values()) { + if (pendingState.containerId?.equals(msgId)) { + toPop.push(pendingState.msgId); } } if (toPop.length) { const temp = []; for (const x of toPop) { - temp.push(this._pending_state[x]); - this._pending_state[x].deferred.resolve(); - delete this._pending_state[x]; + temp.push(this._pending_state.getAndDelete(x)); } return temp; } @@ -550,11 +535,7 @@ class MTProtoSender { */ _handleRPCResult(message) { const result = message.obj; - const state = this._pending_state[result.reqMsgId]; - if (state) { - state.deferred.resolve(); - delete this._pending_state[result.reqMsgId]; - } + const state = this._pending_state.getAndDelete(result.reqMsgId); this._log.debug(`Handling RPC result for message ${result.reqMsgId}`); if (!state) { @@ -577,6 +558,7 @@ class MTProtoSender { } } } + if (result.error) { // eslint-disable-next-line new-cap const error = RPCMessageToError(result.error, state.request); @@ -652,9 +634,7 @@ class MTProtoSender { } this._log.debug(`Handling pong for message ${pong.msgId}`); - const state = this._pending_state[pong.msgId]; - this._pending_state[pong.msgId].deferred.resolve(); - delete this._pending_state[pong.msgId]; + const state = this._pending_state.getAndDelete(pong.msgId); // Todo Check result if (state) { @@ -767,35 +747,9 @@ class MTProtoSender { } /** - * Handles a server acknowledge about our messages. Normally - * these can be ignored except in the case of ``auth.logOut``: - * - * auth.logOut#5717da40 = Bool; - * - * Telegram doesn't seem to send its result so we need to confirm - * it manually. No other request is known to have this behaviour. - - * Since the ID of sent messages consisting of a container is - * never returned (unless on a bad notification), this method - * also removes containers messages when any of their inner - * messages are acknowledged. - - * @param message - * @returns {Promise} - * @private - */ - _handleAck(message) { - const ack = message.obj; - this._log.debug(`Handling acknowledge for ${ack.msgIds}`); - for (const msgId of ack.msgIds) { - const state = this._pending_state[msgId]; - if (state && state.request instanceof LogOut) { - this._pending_state[msgId].deferred.resolve(); - delete this._pending_state[msgId]; - state.resolve(true); - } - } - } + * Handles a server acknowledge about our messages. Normally these can be ignored + */ + _handleAck() { } /** * Handles future salt results, which don't come inside a @@ -810,11 +764,9 @@ class MTProtoSender { // TODO save these salts and automatically adjust to the // correct one whenever the salt in use expires. this._log.debug(`Handling future salts for message ${message.msgId}`); - const state = this._pending_state[message.msgId]; + const state = this._pending_state.getAndDelete(message.msgId); if (state) { - this._pending_state[message].deferred.resolve(); - delete this._pending_state[message]; state.resolve(message.obj); } } @@ -828,8 +780,10 @@ class MTProtoSender { */ _handleStateForgotten(message) { this._send_queue.append( - new RequestState(new MsgsStateInfo(message.msgId, String.fromCharCode(1) - .repeat(message.obj.msgIds))), + new RequestState(new MsgsStateInfo({ + msgId: message.msgId, + query: String.fromCharCode(1).repeat(message.obj.msgIds), + })), ); } @@ -882,12 +836,7 @@ class MTProtoSender { await this.connect(newConnection, true); this.isReconnecting = false; - // uncomment this if you want to resend - // this._send_queue.extend(Object.values(this._pending_state)) - for (const state of Object.values(this._pending_state)) { - state.deferred.resolve(); - } - this._pending_state = {}; + if (this._autoReconnectCallback) { await this._autoReconnectCallback(); } diff --git a/src/lib/gramjs/network/MTProtoState.js b/src/lib/gramjs/network/MTProtoState.js index 576c03974..0f748bea0 100644 --- a/src/lib/gramjs/network/MTProtoState.js +++ b/src/lib/gramjs/network/MTProtoState.js @@ -123,9 +123,12 @@ class MTProtoState { body = await GZIPPacked.gzipIfSmaller(contentRelated, data); } else { // Invoke query expects a query with a getBytes func - body = await GZIPPacked.gzipIfSmaller(contentRelated, new InvokeAfterMsg(afterId, { - getBytes() { - return data; + body = await GZIPPacked.gzipIfSmaller(contentRelated, new InvokeAfterMsg({ + msgId: afterId, + query: { + getBytes() { + return data; + }, }, }).getBytes()); } diff --git a/src/lib/gramjs/network/RequestState.js b/src/lib/gramjs/network/RequestState.js index 69c74ad17..c07500c93 100644 --- a/src/lib/gramjs/network/RequestState.js +++ b/src/lib/gramjs/network/RequestState.js @@ -1,28 +1,36 @@ -const { default: Deferred } = require('../../../util/Deferred'); +const Deferred = require('../../../util/Deferred').default; class RequestState { - constructor(request, after = undefined, pending = {}) { + constructor(request, abortSignal = undefined) { this.containerId = undefined; this.msgId = undefined; this.request = request; this.data = request.getBytes(); - this.after = after; + this.after = undefined; this.result = undefined; - this.pending = pending; - this.deferred = new Deferred(); + this.abortSignal = abortSignal; + this.finished = new Deferred(); + + this.resetPromise(); + } + + isReady() { + if (!this.after) { + return true; + } + + return this.after.finished.promise; + } + + resetPromise() { + // Prevent stuck await + this.reject?.(); + this.promise = new Promise((resolve, reject) => { this.resolve = resolve; this.reject = reject; }); } - - isReady() { - const state = this.pending[this.after.id]; - if (!state) { - return true; - } - return state.deferred.promise; - } } module.exports = RequestState; diff --git a/src/lib/gramjs/tl/apiTl.js b/src/lib/gramjs/tl/apiTl.js index 549b3a4a0..105c532b1 100644 --- a/src/lib/gramjs/tl/apiTl.js +++ b/src/lib/gramjs/tl/apiTl.js @@ -1113,6 +1113,7 @@ chatlists.chatlistInvite#1dcd839d flags:# title:string emoticon:flags.0?string p chatlists.chatlistUpdates#93bd878d missing_peers:Vector chats:Vector users:Vector = chatlists.ChatlistUpdates; bots.botInfo#e8a775b0 name:string about:string description:string = bots.BotInfo; ---functions--- +invokeAfterMsg#cb9f372d {X:Type} msg_id:long query:!X = X; initConnection#c1cd5ea9 {X:Type} flags:# api_id:int device_model:string system_version:string app_version:string system_lang_code:string lang_pack:string lang_code:string proxy:flags.0?InputClientProxy params:flags.1?JSONValue query:!X = X; invokeWithLayer#da9b0d0d {X:Type} layer:int query:!X = X; auth.sendCode#a677244f phone_number:string api_id:int api_hash:string settings:CodeSettings = auth.SentCode; diff --git a/src/lib/gramjs/tl/static/api.json b/src/lib/gramjs/tl/static/api.json index fb16f3458..bab7cd0a1 100644 --- a/src/lib/gramjs/tl/static/api.json +++ b/src/lib/gramjs/tl/static/api.json @@ -1,4 +1,5 @@ [ + "invokeAfterMsg", "initConnection", "invokeWithLayer", "auth.sendCode",