GramJS: Bugfixes (#3280)

This commit is contained in:
Alexander Zinchuk 2023-06-18 12:03:16 +02:00
parent 381b22f689
commit 0681ad44e2
11 changed files with 191 additions and 176 deletions

View File

@ -15,7 +15,7 @@ function readBigIntFromBuffer(buffer, little = true, signed = false) {
randBuffer = randBuffer.reverse();
}
let bigInt = BigInt(randBuffer.toString('hex'), 16);
if (signed && Math.floor(bigInt.toString('2').length / 8) >= bytesNumber) {
if (signed && Math.floor(bigInt.toString(2).length / 8) >= bytesNumber) {
bigInt = bigInt.subtract(BigInt(2)
.pow(BigInt(bytesNumber * 8)));
}
@ -48,7 +48,7 @@ function toSignedLittleBuffer(big, number = 8) {
*/
function readBufferFromBigInt(bigInt, bytesNumber, little = true, signed = false) {
bigInt = BigInt(bigInt);
const bitLength = bigInt.bitLength();
const bitLength = bigInt.bitLength().toJSNumber();
const bytes = Math.ceil(bitLength / 8);
if (bytesNumber < bytes) {
@ -63,38 +63,20 @@ function readBufferFromBigInt(bigInt, bytesNumber, little = true, signed = false
bigInt = bigInt.abs();
}
const hex = bigInt.toString('16')
.padStart(bytesNumber * 2, '0');
let l = Buffer.from(hex, 'hex');
if (little) {
l = l.reverse();
}
const hex = bigInt.toString(16).padStart(bytesNumber * 2, '0');
let buffer = Buffer.from(hex, 'hex');
if (signed && below) {
if (little) {
let reminder = false;
if (l[0] !== 0) {
l[0] -= 1;
}
for (let i = 0; i < l.length; i++) {
if (l[i] === 0) {
reminder = true;
continue;
}
if (reminder) {
l[i] -= 1;
reminder = false;
}
l[i] = 255 - l[i];
}
} else {
l[l.length - 1] = 256 - l[l.length - 1];
for (let i = 0; i < l.length - 1; i++) {
l[i] = 255 - l[i];
}
buffer[buffer.length - 1] = 256 - buffer[buffer.length - 1];
for (let i = 0; i < buffer.length - 1; i++) {
buffer[i] = 255 - buffer[i];
}
}
return l;
if (little) {
buffer = buffer.reverse();
}
return buffer;
}
/**

View File

@ -10,7 +10,9 @@ declare class TelegramClient {
async start(authParams: UserAuthParams | BotAuthParams);
async invoke<R extends Api.AnyRequest>(request: R, dcId?: number): Promise<R['__response']>;
async invoke<R extends Api.AnyRequest>(
request: R, dcId?: number, abortSignal?: AbortSignal,
): Promise<R['__response']>;
async uploadFile(uploadParams: UploadFileParams): ReturnType<typeof uploadFile>;
@ -20,6 +22,8 @@ declare class TelegramClient {
async getTmpPassword(currentPassword: string, ttl?: number): Promise<TmpPasswordResult>;
setPingCallback(callback: () => Promise<void>);
// Untyped methods.
[prop: string]: any;
}

View File

@ -26,6 +26,7 @@ const {
updateTwoFaSettings,
getTmpPassword,
} = require('./2fa');
const RequestState = require('../network/RequestState');
const DEFAULT_DC_ID = 2;
const WEBDOCUMENT_DC_ID = 4;
@ -230,6 +231,10 @@ class TelegramClient {
}
}
setPingCallback(callback) {
this.pingCallback = callback;
}
async _updateLoop() {
let lastPongAt;
@ -279,19 +284,16 @@ class TelegramClient {
if (this._sender.isReconnecting || this._isSwitchingDc) {
continue;
}
await this.disconnect();
await this.connect();
this._sender.reconnect();
}
// We need to send some content-related request at least hourly
// for Telegram to keep delivering updates, otherwise they will
// just stop even if we're connected. Do so every 30 minutes.
// TODO Call getDifference instead since it's more relevant
if (new Date().getTime() - this._lastRequest > 30 * 60 * 1000) {
try {
await this.invoke(new requests.updates.GetState());
await this.pingCallback();
} catch (e) {
// we don't care about errors here
}
@ -867,18 +869,22 @@ class TelegramClient {
* @returns {Promise}
*/
async invoke(request, dcId) {
async invoke(request, dcId, abortSignal) {
if (request.classType !== 'request') {
throw new Error('You can only invoke MTProtoRequests');
}
const sender = dcId === undefined ? this._sender : await this.getSender(dcId);
this._lastRequest = new Date().getTime();
const state = new RequestState(request, abortSignal);
let attempt = 0;
for (attempt = 0; attempt < this._requestRetries; attempt++) {
const promise = sender.sendWithInvokeSupport(request);
sender.addStateToQueue(state);
try {
const result = await promise.promise;
const result = await state.promise;
state.finished.resolve();
return result;
} catch (e) {
if (e instanceof errors.ServerError || e.message === 'RPC_CALL_FAIL'
@ -890,6 +896,7 @@ class TelegramClient {
this._log.info(`Sleeping for ${e.seconds}s on flood wait`);
await sleep(e.seconds * 1000);
} else {
state.finished.resolve();
throw e;
}
} else if (e instanceof errors.PhoneMigrateError || e instanceof errors.NetworkMigrateError
@ -898,20 +905,26 @@ class TelegramClient {
const shouldRaise = e instanceof errors.PhoneMigrateError
|| e instanceof errors.NetworkMigrateError;
if (shouldRaise && await checkAuthorization(this)) {
state.finished.resolve();
throw e;
}
await this._switchDC(e.newDc);
} else if (e instanceof errors.MsgWaitError) {
// we need to resend this after the old one was confirmed.
await promise.isReady();
// We need to resend this after the old one was confirmed.
await state.isReady();
state.after = undefined;
} else if (e.message === 'CONNECTION_NOT_INITED') {
await this.disconnect();
await sleep(2000);
await this.connect();
} else {
state.finished.resolve();
throw e;
}
}
state.resetPromise();
}
throw new Error(`Request was unsuccessful ${attempt} time(s)`);
}
@ -987,11 +1000,10 @@ class TelegramClient {
// this._stateCache.update(update)
}
_processUpdate(update, others, entities) {
_processUpdate(update, entities) {
update._entities = entities || [];
const args = {
update,
others,
};
this._dispatchUpdate(args);
}
@ -1227,9 +1239,6 @@ class TelegramClient {
*/
async _dispatchUpdate(args = {
update: undefined,
others: undefined,
channelId: undefined,
ptsDate: undefined,
}) {
for (const [builder, callback] of this._eventBuilders) {
const event = builder.build(args.update);

View File

@ -2,7 +2,12 @@
let _level;
class Logger {
static levels = ['error', 'warn', 'info', 'debug'];
static LEVEL_MAP = new Map([
['error', new Set(['error'])],
['warn', new Set(['error', 'warn'])],
['info', new Set(['error', 'warn', 'info'])],
['debug', new Set(['error', 'warn', 'info', 'debug'])],
]);
constructor(level) {
if (!_level) {
@ -45,18 +50,13 @@ class Logger {
* @returns {boolean}
*/
canSend(level) {
return (Logger.levels.indexOf(_level) >= Logger.levels.indexOf(level));
return Logger.LEVEL_MAP.get(_level).has(level);
}
/**
* @param message {string}
*/
warn(message) {
// todo remove later
if (_level === 'debug') {
// eslint-disable-next-line no-console
console.error(new Error().stack);
}
this._log('warn', message, this.colors.warn);
}
@ -78,11 +78,6 @@ class Logger {
* @param message {string}
*/
error(message) {
// todo remove later
if (_level === 'debug') {
// eslint-disable-next-line no-console
console.error(new Error().stack);
}
this._log('error', message, this.colors.error);
}

View File

@ -2,10 +2,10 @@ const MessageContainer = require('../tl/core/MessageContainer');
const TLMessage = require('../tl/core/TLMessage');
const BinaryWriter = require('./BinaryWriter');
const USE_INVOKE_AFTER_WITH = [
const USE_INVOKE_AFTER_WITH = new Set([
'messages.SendMessage', 'messages.SendMedia', 'messages.SendMultiMedia',
'messages.ForwardMessages', 'messages.SendInlineBotResult',
];
]);
class MessagePacker {
constructor(state, logger) {
@ -22,21 +22,38 @@ class MessagePacker {
return this._queue;
}
append(state) {
// we need to check if there is already a request with the same name that we should send after.
if (state && USE_INVOKE_AFTER_WITH.includes(state.request.className)) {
// we now need to check if there is any request in queue already.
// we loop backwards since the latest request is the most recent
for (let i = this._queue.length - 1; i >= 0; i--) {
if (USE_INVOKE_AFTER_WITH.includes(this._queue[i].request.className)) {
state.after = this._queue[i];
break;
append(state, setReady = true, atStart = false) {
// We need to check if there is already a `USE_INVOKE_AFTER_WITH` request
if (state && USE_INVOKE_AFTER_WITH.has(state.request.className)) {
if (atStart) {
// Assign `after` for the previously first `USE_INVOKE_AFTER_WITH` request
for (let i = 0; i < this._queue.length; i++) {
if (USE_INVOKE_AFTER_WITH.has(this._queue[i]?.request.className)) {
this._queue[i].after = state;
break;
}
}
} else {
// Assign after for the previous `USE_INVOKE_AFTER_WITH` request
for (let i = this._queue.length - 1; i >= 0; i--) {
if (USE_INVOKE_AFTER_WITH.has(this._queue[i]?.request.className)) {
state.after = this._queue[i];
break;
}
}
}
}
this._queue.push(state);
this.setReady(true);
if (atStart) {
this._queue.unshift(state);
} else {
this._queue.push(state);
}
if (setReady) {
this.setReady(true);
}
// 1658238041=MsgsAck, we don't care about MsgsAck here because they never resolve anyway.
if (state && state.request.CONSTRUCTOR_ID !== 1658238041) {
this._pendingStates.push(state);
@ -50,10 +67,18 @@ class MessagePacker {
}
}
prepend(states) {
states.reverse().forEach((state) => {
this.append(state, false, true);
});
this.setReady(true);
}
extend(states) {
for (const state of states) {
this._queue.push(state);
}
states.forEach((state) => {
this.append(state, false);
});
this.setReady(true);
}
@ -76,6 +101,15 @@ class MessagePacker {
while (this._queue.length && batch.length <= MessageContainer.MAXIMUM_LENGTH) {
const state = this._queue.shift();
if (!state) {
continue;
}
if (state.abortSignal?.aborted) {
state.reject(new Error('Request aborted'));
continue;
}
size += state.data.length + TLMessage.SIZE_OVERHEAD;
if (size <= MessageContainer.MAXIMUM_SIZE) {
let afterId;
@ -90,10 +124,12 @@ class MessagePacker {
batch.push(state);
continue;
}
if (batch.length) {
this._queue.unshift(state);
break;
}
this._log.warn(`Message payload for ${state.request.className
|| state.request.constructor.name} is too long ${state.data.length} and cannot be sent`);
state.reject('Request Payload is too big');
@ -122,12 +158,6 @@ class MessagePacker {
data,
};
}
rejectAll() {
this._pendingStates.forEach((requestState) => {
requestState.reject(new Error('Disconnect'));
});
}
}
module.exports = MessagePacker;

View File

@ -0,0 +1,33 @@
class PendingState {
constructor() {
this._pending = new Map();
}
set(msgId, state) {
this._pending.set(msgId.toString(), state);
}
get(msgId) {
return this._pending.get(msgId.toString());
}
getAndDelete(msgId) {
const state = this.get(msgId);
this.delete(msgId);
return state;
}
values() {
return Array.from(this._pending.values());
}
delete(msgId) {
this._pending.delete(msgId.toString());
}
clear() {
this._pending.clear();
}
}
module.exports = PendingState;

View File

@ -18,6 +18,7 @@ const {
} = require('../tl').constructors;
const MessagePacker = require('../extensions/MessagePacker');
const BinaryReader = require('../extensions/BinaryReader');
const PendingState = require('../extensions/PendingState');
const {
UpdateConnectionState,
UpdateServerTimeOffset,
@ -36,7 +37,6 @@ const {
} = require('../tl').constructors;
const { SecurityError } = require('../errors/Common');
const { InvalidBufferError } = require('../errors/Common');
const { LogOut } = require('../tl').requests.auth;
const { RPCMessageToError } = require('../errors');
const { TypeNotFoundError } = require('../errors/Common');
@ -124,7 +124,7 @@ class MTProtoSender {
/**
* Sent states are remembered until a response is received.
*/
this._pending_state = {};
this._pending_state = new PendingState();
/**
* Responses must be acknowledged, and we can also batch these.
@ -196,6 +196,7 @@ class MTProtoSender {
}
}
this.isConnecting = false;
return true;
}
@ -235,29 +236,17 @@ class MTProtoSender {
Since the receiving part is "built in" the future, it's
impossible to await receive a result that was never sent.
* @param request
* @param abortSignal
* @returns {RequestState}
*/
send(request) {
if (!this._user_connected) {
throw new Error('Cannot send requests while disconnected');
}
const state = new RequestState(request);
send(request, abortSignal) {
const state = new RequestState(request, abortSignal);
this._send_queue.append(state);
return state.promise;
}
/**
* Same as send but returns the full state. usefull for invoke after logic
* @param request
* @return {RequestState}
*/
sendWithInvokeSupport(request) {
if (!this._user_connected) {
throw new Error('Cannot send requests while disconnected');
}
const state = new RequestState(request, undefined, this._pending_state);
addStateToQueue(state) {
this._send_queue.append(state);
return state;
}
/**
@ -315,8 +304,6 @@ class MTProtoSender {
}
async _disconnect() {
this._send_queue.rejectAll();
if (this._updateCallback) {
this._updateCallback(new UpdateConnectionState(UpdateConnectionState.disconnected));
}
@ -340,7 +327,9 @@ class MTProtoSender {
* @private
*/
async _sendLoop() {
this._send_queue = new MessagePacker(this._state, this._log);
// Retry previous pending requests
this._send_queue.prepend(this._pending_state.values());
this._pending_state.clear();
while (this._user_connected && !this.isReconnecting) {
if (this._pending_ack.size) {
@ -381,12 +370,12 @@ class MTProtoSender {
for (const state of batch) {
if (!Array.isArray(state)) {
if (state.request.classType === 'request') {
this._pending_state[state.msgId] = state;
this._pending_state.set(state.msgId, state);
}
} else {
for (const s of state) {
if (s.request.classType === 'request') {
this._pending_state[s.msgId] = s;
this._pending_state.set(s.msgId, s);
}
}
}
@ -506,27 +495,23 @@ class MTProtoSender {
* @private
*/
_popStates(msgId) {
let state = this._pending_state[msgId];
const state = this._pending_state.getAndDelete(msgId);
if (state) {
this._pending_state[msgId].deferred.resolve();
delete this._pending_state[msgId];
return [state];
}
const toPop = [];
for (state of Object.values(this._pending_state)) {
if (state.containerId && state.containerId.equals(msgId)) {
toPop.push(state.msgId);
for (const pendingState of this._pending_state.values()) {
if (pendingState.containerId?.equals(msgId)) {
toPop.push(pendingState.msgId);
}
}
if (toPop.length) {
const temp = [];
for (const x of toPop) {
temp.push(this._pending_state[x]);
this._pending_state[x].deferred.resolve();
delete this._pending_state[x];
temp.push(this._pending_state.getAndDelete(x));
}
return temp;
}
@ -550,11 +535,7 @@ class MTProtoSender {
*/
_handleRPCResult(message) {
const result = message.obj;
const state = this._pending_state[result.reqMsgId];
if (state) {
state.deferred.resolve();
delete this._pending_state[result.reqMsgId];
}
const state = this._pending_state.getAndDelete(result.reqMsgId);
this._log.debug(`Handling RPC result for message ${result.reqMsgId}`);
if (!state) {
@ -577,6 +558,7 @@ class MTProtoSender {
}
}
}
if (result.error) {
// eslint-disable-next-line new-cap
const error = RPCMessageToError(result.error, state.request);
@ -652,9 +634,7 @@ class MTProtoSender {
}
this._log.debug(`Handling pong for message ${pong.msgId}`);
const state = this._pending_state[pong.msgId];
this._pending_state[pong.msgId].deferred.resolve();
delete this._pending_state[pong.msgId];
const state = this._pending_state.getAndDelete(pong.msgId);
// Todo Check result
if (state) {
@ -767,35 +747,9 @@ class MTProtoSender {
}
/**
* Handles a server acknowledge about our messages. Normally
* these can be ignored except in the case of ``auth.logOut``:
*
* auth.logOut#5717da40 = Bool;
*
* Telegram doesn't seem to send its result so we need to confirm
* it manually. No other request is known to have this behaviour.
* Since the ID of sent messages consisting of a container is
* never returned (unless on a bad notification), this method
* also removes containers messages when any of their inner
* messages are acknowledged.
* @param message
* @returns {Promise<void>}
* @private
*/
_handleAck(message) {
const ack = message.obj;
this._log.debug(`Handling acknowledge for ${ack.msgIds}`);
for (const msgId of ack.msgIds) {
const state = this._pending_state[msgId];
if (state && state.request instanceof LogOut) {
this._pending_state[msgId].deferred.resolve();
delete this._pending_state[msgId];
state.resolve(true);
}
}
}
* Handles a server acknowledge about our messages. Normally these can be ignored
*/
_handleAck() { }
/**
* Handles future salt results, which don't come inside a
@ -810,11 +764,9 @@ class MTProtoSender {
// TODO save these salts and automatically adjust to the
// correct one whenever the salt in use expires.
this._log.debug(`Handling future salts for message ${message.msgId}`);
const state = this._pending_state[message.msgId];
const state = this._pending_state.getAndDelete(message.msgId);
if (state) {
this._pending_state[message].deferred.resolve();
delete this._pending_state[message];
state.resolve(message.obj);
}
}
@ -828,8 +780,10 @@ class MTProtoSender {
*/
_handleStateForgotten(message) {
this._send_queue.append(
new RequestState(new MsgsStateInfo(message.msgId, String.fromCharCode(1)
.repeat(message.obj.msgIds))),
new RequestState(new MsgsStateInfo({
msgId: message.msgId,
query: String.fromCharCode(1).repeat(message.obj.msgIds),
})),
);
}
@ -882,12 +836,7 @@ class MTProtoSender {
await this.connect(newConnection, true);
this.isReconnecting = false;
// uncomment this if you want to resend
// this._send_queue.extend(Object.values(this._pending_state))
for (const state of Object.values(this._pending_state)) {
state.deferred.resolve();
}
this._pending_state = {};
if (this._autoReconnectCallback) {
await this._autoReconnectCallback();
}

View File

@ -123,9 +123,12 @@ class MTProtoState {
body = await GZIPPacked.gzipIfSmaller(contentRelated, data);
} else {
// Invoke query expects a query with a getBytes func
body = await GZIPPacked.gzipIfSmaller(contentRelated, new InvokeAfterMsg(afterId, {
getBytes() {
return data;
body = await GZIPPacked.gzipIfSmaller(contentRelated, new InvokeAfterMsg({
msgId: afterId,
query: {
getBytes() {
return data;
},
},
}).getBytes());
}

View File

@ -1,28 +1,36 @@
const { default: Deferred } = require('../../../util/Deferred');
const Deferred = require('../../../util/Deferred').default;
class RequestState {
constructor(request, after = undefined, pending = {}) {
constructor(request, abortSignal = undefined) {
this.containerId = undefined;
this.msgId = undefined;
this.request = request;
this.data = request.getBytes();
this.after = after;
this.after = undefined;
this.result = undefined;
this.pending = pending;
this.deferred = new Deferred();
this.abortSignal = abortSignal;
this.finished = new Deferred();
this.resetPromise();
}
isReady() {
if (!this.after) {
return true;
}
return this.after.finished.promise;
}
resetPromise() {
// Prevent stuck await
this.reject?.();
this.promise = new Promise((resolve, reject) => {
this.resolve = resolve;
this.reject = reject;
});
}
isReady() {
const state = this.pending[this.after.id];
if (!state) {
return true;
}
return state.deferred.promise;
}
}
module.exports = RequestState;

View File

@ -1113,6 +1113,7 @@ chatlists.chatlistInvite#1dcd839d flags:# title:string emoticon:flags.0?string p
chatlists.chatlistUpdates#93bd878d missing_peers:Vector<Peer> chats:Vector<Chat> users:Vector<User> = chatlists.ChatlistUpdates;
bots.botInfo#e8a775b0 name:string about:string description:string = bots.BotInfo;
---functions---
invokeAfterMsg#cb9f372d {X:Type} msg_id:long query:!X = X;
initConnection#c1cd5ea9 {X:Type} flags:# api_id:int device_model:string system_version:string app_version:string system_lang_code:string lang_pack:string lang_code:string proxy:flags.0?InputClientProxy params:flags.1?JSONValue query:!X = X;
invokeWithLayer#da9b0d0d {X:Type} layer:int query:!X = X;
auth.sendCode#a677244f phone_number:string api_id:int api_hash:string settings:CodeSettings = auth.SentCode;

View File

@ -1,4 +1,5 @@
[
"invokeAfterMsg",
"initConnection",
"invokeWithLayer",
"auth.sendCode",