GramJs: Fix exported senders release (#3625)

This commit is contained in:
Alexander Zinchuk 2023-07-20 17:53:21 +02:00
parent 6aa4d20458
commit c297a697cb
4 changed files with 48 additions and 17 deletions

View File

@ -84,13 +84,6 @@ export function initApi(onUpdate: OnApiUpdate, initialArgs: ApiInitialArgs) {
worker = new Worker(new URL('./worker.ts', import.meta.url));
subscribeToWorker(onUpdate);
if (requestStates.size > 0) {
requestStates.forEach((value) => {
// eslint-disable-next-line no-console
console.error('Hanging request', value.DEBUG_payload);
});
}
if (initialArgs.platform === 'iOS') {
setupIosHealthCheck();
}

View File

@ -170,6 +170,7 @@ class TelegramClient {
this._config = undefined;
this.phoneCodeHashes = [];
this._exportedSenderPromises = {};
this._exportedSenderRefCounter = {};
this._waitingForAuthKey = {};
this._exportedSenderReleaseTimeouts = {};
this._additionalDcsDisabled = args.additionalDcsDisabled;
@ -379,6 +380,7 @@ class TelegramClient {
});
});
this._exportedSenderRefCounter = {};
this._exportedSenderPromises = {};
this._waitingForAuthKey = {};
}
@ -429,6 +431,7 @@ class TelegramClient {
if (this._shouldDebugExportedSenders) console.log(`🧹 Cleanup idx=${index} dcId=${dcId}`);
const sender = await this._exportedSenderPromises[dcId][index];
delete this._exportedSenderPromises[dcId][index];
delete this._exportedSenderRefCounter[dcId][index];
await sender.disconnect();
}
@ -443,6 +446,7 @@ class TelegramClient {
}
this._exportedSenderPromises[dcId] = {};
this._exportedSenderRefCounter[dcId] = {};
await Promise.all(promises.map(async (promise) => {
const sender = await promise;
@ -531,6 +535,28 @@ class TelegramClient {
}
}
releaseExportedSender(sender) {
const dcId = sender._dcId;
const index = sender._senderIndex;
if (!this._exportedSenderRefCounter[dcId]) return;
if (!this._exportedSenderRefCounter[dcId][index]) return;
this._exportedSenderRefCounter[dcId][index] -= 1;
if (this._exportedSenderRefCounter[dcId][index] <= 0) {
if (!this._exportedSenderReleaseTimeouts[dcId]) this._exportedSenderReleaseTimeouts[dcId] = {};
this._exportedSenderReleaseTimeouts[dcId][index] = setTimeout(() => {
// eslint-disable-next-line no-console
if (this._shouldDebugExportedSenders) console.log(`[CC] [idx=${index} dcId=${dcId}] 🚪 Release`);
sender.disconnect();
this._exportedSenderReleaseTimeouts[dcId][index] = undefined;
this._exportedSenderPromises[dcId][index] = undefined;
}, EXPORTED_SENDER_RELEASE_TIMEOUT);
}
}
async _borrowExportedSender(dcId, shouldReconnect, existingSender, index, isPremium) {
if (this._additionalDcsDisabled) {
return undefined;
@ -539,6 +565,7 @@ class TelegramClient {
const i = index || 0;
if (!this._exportedSenderPromises[dcId]) this._exportedSenderPromises[dcId] = {};
if (!this._exportedSenderRefCounter[dcId]) this._exportedSenderRefCounter[dcId] = {};
if (!this._exportedSenderPromises[dcId][i] || shouldReconnect) {
if (this._shouldDebugExportedSenders) {
@ -546,6 +573,7 @@ class TelegramClient {
console.warn(`🕒 Connecting to exported sender idx=${i} dc=${dcId}`
+ ` ${shouldReconnect ? '(reconnect)' : ''}`);
}
this._exportedSenderRefCounter[dcId][i] = 0;
this._exportedSenderPromises[dcId][i] = this._connectSender(
existingSender || this._createExportedSender(dcId, i),
dcId,
@ -573,20 +601,13 @@ class TelegramClient {
return this._borrowExportedSender(dcId, true, undefined, i, isPremium);
}
this._exportedSenderRefCounter[dcId][i] += 1;
if (!this._exportedSenderReleaseTimeouts[dcId]) this._exportedSenderReleaseTimeouts[dcId] = {};
if (this._exportedSenderReleaseTimeouts[dcId][i]) {
clearTimeout(this._exportedSenderReleaseTimeouts[dcId][i]);
this._exportedSenderReleaseTimeouts[dcId][i] = undefined;
}
this._exportedSenderReleaseTimeouts[dcId][i] = setTimeout(() => {
// eslint-disable-next-line no-console
if (this._shouldDebugExportedSenders) console.log(`🚪 Release idx=${i} dcId=${dcId}`);
sender.disconnect();
this._exportedSenderReleaseTimeouts[dcId][i] = undefined;
this._exportedSenderPromises[dcId][i] = undefined;
}, EXPORTED_SENDER_RELEASE_TIMEOUT);
return sender;
}
@ -875,6 +896,7 @@ class TelegramClient {
});
const sender = await this._borrowExportedSender(WEBDOCUMENT_DC_ID);
const res = await sender.send(downloaded);
this.releaseExportedSender(sender);
offset += 131072;
if (res.bytes.length) {
buff.push(res.bytes);
@ -921,6 +943,7 @@ class TelegramClient {
});
const sender = await this._borrowExportedSender(WEBDOCUMENT_DC_ID);
const res = await sender.send(downloaded);
this.releaseExportedSender(sender);
offset += 131072;
if (res.bytes.length) {
buff.push(res.bytes);
@ -965,7 +988,8 @@ class TelegramClient {
throw new Error('You can only invoke MTProtoRequests');
}
let sender = dcId === undefined ? this._sender : await this.getSender(dcId);
const isExported = dcId !== undefined;
let sender = !isExported ? this._sender : await this.getSender(dcId);
this._lastRequest = Date.now();
await this._connectedDeferred.promise;
@ -978,6 +1002,7 @@ class TelegramClient {
try {
const result = await state.promise;
state.finished.resolve();
if (isExported) this.releaseExportedSender(sender);
return result;
} catch (e) {
if (e instanceof errors.ServerError || e.message === 'RPC_CALL_FAIL'
@ -990,6 +1015,7 @@ class TelegramClient {
await sleep(e.seconds * 1000);
} else {
state.finished.resolve();
if (isExported) this.releaseExportedSender(sender);
throw e;
}
} else if (e instanceof errors.PhoneMigrateError || e instanceof errors.NetworkMigrateError
@ -999,9 +1025,11 @@ class TelegramClient {
|| e instanceof errors.NetworkMigrateError;
if (shouldRaise && await checkAuthorization(this)) {
state.finished.resolve();
if (isExported) this.releaseExportedSender(sender);
throw e;
}
await this._switchDC(e.newDc);
if (isExported) this.releaseExportedSender(sender);
sender = dcId === undefined ? this._sender : await this.getSender(dcId);
} else if (e instanceof errors.MsgWaitError) {
// We need to resend this after the old one was confirmed.
@ -1015,16 +1043,19 @@ class TelegramClient {
} else if (e instanceof errors.TimedOutError) {
if (!shouldRetryOnTimeout) {
state.finished.resolve();
if (isExported) this.releaseExportedSender(sender);
throw e;
}
} else {
state.finished.resolve();
if (isExported) this.releaseExportedSender(sender);
throw e;
}
}
state.resetPromise();
}
if (isExported) this.releaseExportedSender(sender);
throw new Error(`Request was unsuccessful ${attempt} time(s)`);
}
@ -1033,9 +1064,12 @@ class TelegramClient {
throw new Error('You can only invoke MTProtoRequests');
}
const sender = dcId === undefined ? this._sender : await this.getSender(dcId);
const isExported = dcId !== undefined;
const sender = !isExported ? this._sender : await this.getSender(dcId);
sender.sendBeacon(request);
if (isExported) this.releaseExportedSender(sender);
}
setIsPremium(isPremium) {

View File

@ -263,6 +263,7 @@ async function downloadFile2(
}
}),
]);
client.releaseExportedSender(sender);
isDone2 = true;
if (progressCallback) {
@ -299,6 +300,7 @@ async function downloadFile2(
if (deferred) deferred.resolve();
hasEnded = true;
client.releaseExportedSender(sender);
throw err;
}
}

View File

@ -122,6 +122,7 @@ export async function uploadFile(
bytes: Buffer.from(partBytes),
}),
);
client.releaseExportedSender(sender);
isDone2 = true;
} catch (err) {
logWithSenderIndex(`Upload part failed ${err?.toString()} j=${jMemo}`);
@ -133,6 +134,7 @@ export async function uploadFile(
continue;
}
foremans[senderIndex].releaseWorker();
client.releaseExportedSender(sender);
throw err;
}