GramJS: Bind WebSocket mutex to instance (#6953)

This commit is contained in:
zubiden 2026-05-15 18:38:02 +02:00 committed by Alexander Zinchuk
parent 6cff49e8b7
commit 2c82b5eed2
2 changed files with 34 additions and 9 deletions

View File

@ -1,12 +1,12 @@
import { Mutex } from 'async-mutex'; import { Mutex } from 'async-mutex';
const mutex = new Mutex();
const closeError = new Error('WebSocket was closed'); const closeError = new Error('WebSocket was closed');
const CONNECTION_TIMEOUT = 3000; const CONNECTION_TIMEOUT = 3000;
const MAX_TIMEOUT = 30000; const MAX_TIMEOUT = 30000;
export default class PromisedWebSockets { export default class PromisedWebSockets {
private readonly mutex = new Mutex();
private closed: boolean; private closed: boolean;
private timeout: number; private timeout: number;
@ -92,16 +92,20 @@ export default class PromisedWebSockets {
this.closed = false; this.closed = false;
this.website = this.getWebSocketLink(ip, port, isTestServer, isPremium); this.website = this.getWebSocketLink(ip, port, isTestServer, isPremium);
this.client = new WebSocket(this.website, 'binary'); this.client = new WebSocket(this.website, 'binary');
this.client.binaryType = 'arraybuffer';
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
if (!this.client) return; if (!this.client) return;
let hasResolved = false; let hasResolved = false;
let timeout: ReturnType<typeof globalThis.setTimeout> | undefined; let timeout: ReturnType<typeof globalThis.setTimeout> | undefined;
this.client.onopen = () => { this.client.onopen = () => {
this.receive(); this.receive();
resolve(this); resolve(this);
hasResolved = true; hasResolved = true;
if (timeout) clearTimeout(timeout); if (timeout) clearTimeout(timeout);
}; };
this.client.onerror = (error) => { this.client.onerror = (error) => {
// eslint-disable-next-line no-console // eslint-disable-next-line no-console
console.error('WebSocket error', error); console.error('WebSocket error', error);
@ -109,6 +113,7 @@ export default class PromisedWebSockets {
hasResolved = true; hasResolved = true;
if (timeout) clearTimeout(timeout); if (timeout) clearTimeout(timeout);
}; };
this.client.onclose = (event) => { this.client.onclose = (event) => {
const { code, reason, wasClean } = event; const { code, reason, wasClean } = event;
if (code !== 1000) { if (code !== 1000) {
@ -165,7 +170,7 @@ export default class PromisedWebSockets {
receive() { receive() {
if (!this.client) return; if (!this.client) return;
this.client.onmessage = async (message) => { this.client.onmessage = async (message) => {
await mutex.runExclusive(async () => { await this.mutex.runExclusive(async () => {
const data = message.data instanceof ArrayBuffer const data = message.data instanceof ArrayBuffer
? Buffer.from(message.data) ? Buffer.from(message.data)
: Buffer.from(await new Response(message.data).arrayBuffer()); : Buffer.from(await new Response(message.data).arrayBuffer());

View File

@ -452,6 +452,8 @@ export default class MTProtoSender {
* @private * @private
*/ */
async _connect(connection: Connection) { async _connect(connection: Connection) {
const wasReconnecting = this.isReconnecting;
if (!connection.isConnected()) { if (!connection.isConnected()) {
this._log.info('Connecting to {0}...'.replace('{0}', connection._ip)); this._log.info('Connecting to {0}...'.replace('{0}', connection._ip));
await connection.connect(); await connection.connect();
@ -490,6 +492,8 @@ export default class MTProtoSender {
if (!this._sendLoopHandle) { if (!this._sendLoopHandle) {
this._log.debug('Starting send loop'); this._log.debug('Starting send loop');
this._sendLoopHandle = this._sendLoop(); this._sendLoopHandle = this._sendLoop();
} else if (wasReconnecting) {
this.retryPendingStates();
} }
if (!this._recvLoopHandle) { if (!this._recvLoopHandle) {
@ -576,9 +580,7 @@ export default class MTProtoSender {
* @private * @private
*/ */
async _sendLoop() { async _sendLoop() {
// Retry previous pending requests this.retryPendingStates();
this._sendQueue.prepend(this._pendingState.values());
this._pendingState.clear();
while (this._userConnected && !this.isReconnecting) { while (this._userConnected && !this.isReconnecting) {
const appendAcks = () => { const appendAcks = () => {
@ -647,11 +649,23 @@ export default class MTProtoSender {
this._log.debug(`Encrypting ${batch.length} message(s) in ${data.length} bytes for sending`); this._log.debug(`Encrypting ${batch.length} message(s) in ${data.length} bytes for sending`);
this.logWithIndex.debug('Sending', batch.map((m) => m.request.className)); this.logWithIndex.debug('Sending', batch.map((m) => m.request.className));
const connection = this.getConnection();
data = await this._state.encryptMessageData(data); data = await this._state.encryptMessageData(data);
if (this.isReconnecting) {
this.logWithIndex.debug('Reconnecting :(');
this._sendLoopHandle = undefined;
return;
}
if (!connection || connection !== this.getConnection()) {
this.retryPendingStates();
continue;
}
try { try {
await this.getConnection()!.send(data); await connection.send(data);
} catch (e: any) { } catch (e: any) {
this.logWithIndex.debug(`Connection closed while sending data ${e}`); this.logWithIndex.debug(`Connection closed while sending data ${e}`);
this._log.info('Connection closed while sending data'); this._log.info('Connection closed while sending data');
@ -1192,11 +1206,17 @@ export default class MTProtoSender {
await this.connect(newConnection, true, newFallbackConnection); await this.connect(newConnection, true, newFallbackConnection);
this.isReconnecting = false; this.isReconnecting = false;
this._sendQueue.prepend(this._pendingState.values());
this._pendingState.clear();
if (this._autoReconnectCallback) { if (this._autoReconnectCallback) {
await this._autoReconnectCallback(); await this._autoReconnectCallback();
} }
} }
private retryPendingStates() {
const pendingStates = this._pendingState.values();
if (!pendingStates.length) return;
this._sendQueue.prepend(pendingStates);
this._pendingState.clear();
}
} }