diff --git a/src/lib/gramjs/client/TelegramClient.js b/src/lib/gramjs/client/TelegramClient.js index 04f876b36..e8ee9b4f2 100644 --- a/src/lib/gramjs/client/TelegramClient.js +++ b/src/lib/gramjs/client/TelegramClient.js @@ -174,7 +174,7 @@ class TelegramClient { }); } // set defaults vars - this._sender.userDisconnected = true; + this._sender.userDisconnected = false; this._sender._user_connected = false; this._sender._reconnecting = false; this._sender._disconnected = true; @@ -317,6 +317,9 @@ class TelegramClient { // export region _cleanupExportedSender(dcId) { + if (this.session.dcId !== dcId) { + this.session.setAuthKey(undefined, dcId); + } this._exportedSenderPromises[dcId] = undefined; } @@ -343,6 +346,7 @@ class TelegramClient { sender._authenticated = true; } sender.dcId = dcId; + sender.userDisconnected = false; return sender; } catch (err) { diff --git a/src/lib/gramjs/client/downloadFile.ts b/src/lib/gramjs/client/downloadFile.ts index 5d61e8328..f7b9a8e72 100644 --- a/src/lib/gramjs/client/downloadFile.ts +++ b/src/lib/gramjs/client/downloadFile.ts @@ -107,7 +107,6 @@ export async function downloadFile( // used to populate the sender await client.getSender(dcId); - // eslint-disable-next-line no-constant-condition while (true) { let limit = partSize; @@ -129,12 +128,8 @@ export async function downloadFile( promises.push((async (offsetMemo: number) => { // eslint-disable-next-line no-constant-condition while (true) { - const sender = await client.getSender(dcId); try { - if (!sender._user_connected) { - await sleep(DISCONNECT_SLEEP); - continue; - } + const sender = await client.getSender(dcId); const result = await sender.send(new Api.upload.GetFile({ location: inputLocation, offset: offsetMemo, diff --git a/src/lib/gramjs/client/uploadFile.ts b/src/lib/gramjs/client/uploadFile.ts index 9437c1980..b24bc5393 100644 --- a/src/lib/gramjs/client/uploadFile.ts +++ b/src/lib/gramjs/client/uploadFile.ts @@ -38,8 +38,8 @@ export async function uploadFile( const partCount = Math.floor((size + partSize - 1) / partSize); const buffer = Buffer.from(await fileToBuffer(file)); - // We always upload from the DC we are in. - const sender = await client.getSender(client.session.dcId); + // Make sure a new sender can be created before starting upload + await client.getSender(client.session.dcId); if (!workers || !size) { workers = 1; @@ -66,11 +66,9 @@ export async function uploadFile( // eslint-disable-next-line no-loop-func sendingParts.push((async (jMemo: number, bytesMemo: Buffer) => { while (true) { - if (!sender._user_connected) { - await sleep(DISCONNECT_SLEEP); - continue; - } try { + // We always upload from the DC we are in + const sender = await client.getSender(client.session.dcId); await sender.send( isLarge ? new Api.upload.SaveBigFilePart({ diff --git a/src/lib/gramjs/extensions/PromisedWebSockets.js b/src/lib/gramjs/extensions/PromisedWebSockets.js index 4c3582157..652e8a702 100644 --- a/src/lib/gramjs/extensions/PromisedWebSockets.js +++ b/src/lib/gramjs/extensions/PromisedWebSockets.js @@ -7,7 +7,7 @@ const WebSocketClient = require('websocket').w3cwebsocket; const closeError = new Error('WebSocket was closed'); class PromisedWebSockets { - constructor() { + constructor(disconnectedCallback) { /* CONTEST this.isBrowser = typeof process === 'undefined' || process.type === 'renderer' || @@ -17,6 +17,7 @@ class PromisedWebSockets { */ this.client = undefined; this.closed = true; + this.disconnectedCallback = disconnectedCallback; } async readExactly(number) { @@ -96,6 +97,9 @@ class PromisedWebSockets { console.error(`Socket ${ip} closed. Code: ${code}, reason: ${reason}, was clean: ${wasClean}`); this.resolveRead(false); this.closed = true; + if (this.disconnectedCallback) { + this.disconnectedCallback(); + } }; // CONTEST // Seems to not be working, at least in a web worker diff --git a/src/lib/gramjs/network/MTProtoSender.js b/src/lib/gramjs/network/MTProtoSender.js index 89e3ced28..a9e664afd 100644 --- a/src/lib/gramjs/network/MTProtoSender.js +++ b/src/lib/gramjs/network/MTProtoSender.js @@ -826,7 +826,7 @@ class MTProtoSender { async _reconnect() { this._log.debug('Closing current connection...'); try { - await this.disconnect(); + await this._disconnect(); } catch (err) { this._log.warn(err); } @@ -834,7 +834,14 @@ class MTProtoSender { this._send_queue.append(undefined); this._state.reset(); - await this.connect(this._connection, true); + // For some reason reusing existing connection caused stuck requests + const newConnection = new this._connection.constructor( + this._connection._ip, + this._connection._port, + this._connection._dcId, + this._connection._log, + ); + await this.connect(newConnection, true); this._reconnecting = false; // uncomment this if you want to resend diff --git a/src/lib/gramjs/network/connection/Connection.js b/src/lib/gramjs/network/connection/Connection.js index 303037065..29264f485 100644 --- a/src/lib/gramjs/network/connection/Connection.js +++ b/src/lib/gramjs/network/connection/Connection.js @@ -29,7 +29,11 @@ class Connection { this._recvArray = new AsyncQueue(); // this.socket = new PromiseSocket(new Socket()) - this.socket = new PromisedWebSockets(); + this.socket = new PromisedWebSockets(this.disconnectCallback.bind(this)); + } + + async disconnectCallback() { + await this.disconnect(true); } async _connect() { @@ -51,10 +55,16 @@ class Connection { this._recvTask = this._recvLoop(); } - async disconnect() { + async disconnect(fromCallback = false) { + if (!this._connected) { + return; + } + this._connected = false; void this._recvArray.push(undefined); - await this.socket.close(); + if (!fromCallback) { + await this.socket.close(); + } } async send(data) {