diff --git a/src/lib/gramjs/extensions/PromisedWebSockets.ts b/src/lib/gramjs/extensions/PromisedWebSockets.ts index 286d215c4..fbf499294 100644 --- a/src/lib/gramjs/extensions/PromisedWebSockets.ts +++ b/src/lib/gramjs/extensions/PromisedWebSockets.ts @@ -1,12 +1,12 @@ import { Mutex } from 'async-mutex'; -const mutex = new Mutex(); - const closeError = new Error('WebSocket was closed'); const CONNECTION_TIMEOUT = 3000; const MAX_TIMEOUT = 30000; export default class PromisedWebSockets { + private readonly mutex = new Mutex(); + private closed: boolean; private timeout: number; @@ -92,16 +92,20 @@ export default class PromisedWebSockets { this.closed = false; this.website = this.getWebSocketLink(ip, port, isTestServer, isPremium); this.client = new WebSocket(this.website, 'binary'); + this.client.binaryType = 'arraybuffer'; + return new Promise((resolve, reject) => { if (!this.client) return; let hasResolved = false; let timeout: ReturnType | undefined; + this.client.onopen = () => { this.receive(); resolve(this); hasResolved = true; if (timeout) clearTimeout(timeout); }; + this.client.onerror = (error) => { // eslint-disable-next-line no-console console.error('WebSocket error', error); @@ -109,6 +113,7 @@ export default class PromisedWebSockets { hasResolved = true; if (timeout) clearTimeout(timeout); }; + this.client.onclose = (event) => { const { code, reason, wasClean } = event; if (code !== 1000) { @@ -165,7 +170,7 @@ export default class PromisedWebSockets { receive() { if (!this.client) return; this.client.onmessage = async (message) => { - await mutex.runExclusive(async () => { + await this.mutex.runExclusive(async () => { const data = message.data instanceof ArrayBuffer ? Buffer.from(message.data) : Buffer.from(await new Response(message.data).arrayBuffer()); diff --git a/src/lib/gramjs/network/MTProtoSender.ts b/src/lib/gramjs/network/MTProtoSender.ts index 5653c5773..1584062b3 100644 --- a/src/lib/gramjs/network/MTProtoSender.ts +++ b/src/lib/gramjs/network/MTProtoSender.ts @@ -452,6 +452,8 @@ export default class MTProtoSender { * @private */ async _connect(connection: Connection) { + const wasReconnecting = this.isReconnecting; + if (!connection.isConnected()) { this._log.info('Connecting to {0}...'.replace('{0}', connection._ip)); await connection.connect(); @@ -490,6 +492,8 @@ export default class MTProtoSender { if (!this._sendLoopHandle) { this._log.debug('Starting send loop'); this._sendLoopHandle = this._sendLoop(); + } else if (wasReconnecting) { + this.retryPendingStates(); } if (!this._recvLoopHandle) { @@ -576,9 +580,7 @@ export default class MTProtoSender { * @private */ async _sendLoop() { - // Retry previous pending requests - this._sendQueue.prepend(this._pendingState.values()); - this._pendingState.clear(); + this.retryPendingStates(); while (this._userConnected && !this.isReconnecting) { const appendAcks = () => { @@ -647,11 +649,23 @@ export default class MTProtoSender { this._log.debug(`Encrypting ${batch.length} message(s) in ${data.length} bytes for sending`); this.logWithIndex.debug('Sending', batch.map((m) => m.request.className)); + const connection = this.getConnection(); data = await this._state.encryptMessageData(data); + if (this.isReconnecting) { + this.logWithIndex.debug('Reconnecting :('); + this._sendLoopHandle = undefined; + return; + } + + if (!connection || connection !== this.getConnection()) { + this.retryPendingStates(); + continue; + } + try { - await this.getConnection()!.send(data); + await connection.send(data); } catch (e: any) { this.logWithIndex.debug(`Connection closed while sending data ${e}`); this._log.info('Connection closed while sending data'); @@ -1192,11 +1206,17 @@ export default class MTProtoSender { await this.connect(newConnection, true, newFallbackConnection); this.isReconnecting = false; - this._sendQueue.prepend(this._pendingState.values()); - this._pendingState.clear(); if (this._autoReconnectCallback) { await this._autoReconnectCallback(); } } + + private retryPendingStates() { + const pendingStates = this._pendingState.values(); + if (!pendingStates.length) return; + + this._sendQueue.prepend(pendingStates); + this._pendingState.clear(); + } }