GramJs: Fix upload hanging (#3475)

This commit is contained in:
Alexander Zinchuk 2023-07-05 13:16:30 +02:00
parent 332ae408a4
commit 44b3f4d8dd
3 changed files with 92 additions and 12 deletions

View File

@ -72,6 +72,7 @@ import { getEmojiOnlyCountForMessage } from '../../../global/helpers/getEmojiOnl
import { getServerTimeOffset } from '../../../util/serverTime';
import { getApiChatIdFromMtpPeer } from '../apiBuilders/peers';
import { updateChannelState } from '../updateManager';
import { compact } from '../../../util/iteratees';
const FAST_SEND_TIMEOUT = 1000;
const INPUT_WAVEFORM_LENGTH = 63;
@ -620,11 +621,18 @@ async function uploadMedia(localMessage: ApiMessage, attachment: ApiAttachment,
}
};
const file = await fetchFile(blobUrl, filename);
const inputFile = await uploadFile(file, patchedOnProgress);
const fetchAndUpload = async (url: string) => {
const file = await fetchFile(url, filename);
return uploadFile(file, patchedOnProgress);
};
const thumbFile = previewBlobUrl && await fetchFile(previewBlobUrl, filename);
const thumb = thumbFile ? await uploadFile(thumbFile) : undefined;
const isVideo = SUPPORTED_VIDEO_CONTENT_TYPES.has(mimeType);
const shouldUploadThumb = audio || isVideo || shouldSendAsFile;
const [inputFile, thumb] = await Promise.all(compact([
fetchAndUpload(blobUrl),
shouldUploadThumb && previewBlobUrl && fetchAndUpload(previewBlobUrl),
]));
const attributes: GramJs.TypeDocumentAttribute[] = [new GramJs.DocumentAttributeFilename({ fileName: filename })];
if (!shouldSendAsFile) {
@ -636,7 +644,7 @@ async function uploadMedia(localMessage: ApiMessage, attachment: ApiAttachment,
});
}
if (SUPPORTED_VIDEO_CONTENT_TYPES.has(mimeType)) {
if (isVideo) {
const { width, height, duration } = quick;
if (duration !== undefined) {
attributes.push(new GramJs.DocumentAttributeVideo({

View File

@ -27,12 +27,21 @@ const {
getTmpPassword,
} = require('./2fa');
const RequestState = require('../network/RequestState');
const withAbortCheck = require('../../../util/withAbortCheck').default;
const { AbortError } = require('../../../util/withAbortCheck');
class ConnectTimeoutError extends Error {
constructor() {
super('Connection Timeout');
}
}
const DEFAULT_DC_ID = 2;
const WEBDOCUMENT_DC_ID = 4;
const EXPORTED_SENDER_RECONNECT_TIMEOUT = 1000; // 1 sec
const EXPORTED_SENDER_RELEASE_TIMEOUT = 30000; // 30 sec
const WEBDOCUMENT_REQUEST_PART_SIZE = 131072; // 128kb
const EXPORTED_SENDER_CONNECT_TIMEOUT = 8000; // 8 sec
const PING_INTERVAL = 3000; // 3 sec
const PING_TIMEOUT = 5000; // 5 sec
@ -156,6 +165,7 @@ class TelegramClient {
this._config = undefined;
this.phoneCodeHashes = [];
this._exportedSenderPromises = {};
this._exportedSenderAbortControllers = {};
this._waitingForAuthKey = {};
this._exportedSenderReleaseTimeouts = {};
this._additionalDcsDisabled = args.additionalDcsDisabled;
@ -328,6 +338,7 @@ class TelegramClient {
);
this._exportedSenderPromises = {};
this._exportedSenderAbortControllers = {};
this._waitingForAuthKey = {};
}
@ -374,6 +385,7 @@ class TelegramClient {
}
const sender = await this._exportedSenderPromises[dcId][index];
delete this._exportedSenderPromises[dcId][index];
delete this._exportedSenderAbortControllers[dcId][index];
await sender.disconnect();
}
@ -388,6 +400,7 @@ class TelegramClient {
}
this._exportedSenderPromises[dcId] = {};
this._exportedSenderAbortControllers[dcId] = {};
await Promise.all(promises.map(async (promise) => {
const sender = await promise;
@ -395,7 +408,7 @@ class TelegramClient {
}));
}
async _connectSender(sender, dcId, isPremium = false) {
async _connectSender(sender, dcId, index, isPremium = false, abortSignal) {
// if we don't already have an auth key we want to use normal DCs not -1
let hasAuthKey = Boolean(sender.authKey.getKey());
let firstConnectResolver;
@ -419,7 +432,7 @@ class TelegramClient {
// eslint-disable-next-line no-constant-condition
while (true) {
try {
await sender.connect(new this._connection(
await withAbortCheck(abortSignal, sender.connect(new this._connection(
dc.ipAddress,
dc.port,
dcId,
@ -427,18 +440,23 @@ class TelegramClient {
this._args.testServers,
// Premium DCs are not stable for obtaining auth keys, so need to we first connect to regular ones
hasAuthKey ? isPremium : false,
));
)));
if (this.session.dcId !== dcId && !sender._authenticated) {
this._log.info(`Exporting authorization for data center ${dc.ipAddress}`);
const auth = await this.invoke(new requests.auth.ExportAuthorization({ dcId }));
const auth = await withAbortCheck(
abortSignal,
this.invoke(new requests.auth.ExportAuthorization({ dcId })),
);
const req = this._initWith(new requests.auth.ImportAuthorization({
id: auth.id,
bytes: auth.bytes,
}));
await sender.send(req);
await withAbortCheck(abortSignal, sender.send(req));
sender._authenticated = true;
}
sender.dcId = dcId;
sender.userDisconnected = false;
@ -449,6 +467,12 @@ class TelegramClient {
return sender;
} catch (err) {
if (err instanceof AbortError) {
delete this._exportedSenderPromises[dcId][index];
delete this._exportedSenderAbortControllers[dcId][index];
sender.disconnect();
return undefined;
}
// eslint-disable-next-line no-console
console.error(err);
@ -458,26 +482,54 @@ class TelegramClient {
}
}
getConnectedExportedSenderIndex(dcId, i) {
const index = Object.keys(this._exportedSenderReleaseTimeouts[dcId] || {})[0];
const firstIndex = Number(index ?? 0);
return {
newIndex: firstIndex === i ? i + 1 : firstIndex,
noConnectedExportedSenders: index === undefined,
};
}
async _borrowExportedSender(dcId, shouldReconnect, existingSender, index, isPremium) {
if (this._additionalDcsDisabled) {
return undefined;
}
const i = index || 0;
let i = index || 0;
if (!this._exportedSenderPromises[dcId]) this._exportedSenderPromises[dcId] = {};
if (!this._exportedSenderAbortControllers[dcId]) this._exportedSenderAbortControllers[dcId] = {};
if (this._exportedSenderAbortControllers[dcId][i]?.signal.aborted) {
const { newIndex } = this.getConnectedExportedSenderIndex(dcId, i);
i = newIndex;
}
if (!this._exportedSenderPromises[dcId][i] || shouldReconnect) {
this._exportedSenderAbortControllers[dcId][i]?.abort();
this._exportedSenderAbortControllers[dcId][i] = new AbortController();
this._exportedSenderPromises[dcId][i] = this._connectSender(
existingSender || this._createExportedSender(dcId, i),
dcId,
index,
isPremium,
this._exportedSenderAbortControllers[dcId][i].signal,
);
}
let sender;
try {
sender = await this._exportedSenderPromises[dcId][i];
sender = await Promise.race([
this._exportedSenderPromises[dcId][i],
Helpers.sleep(EXPORTED_SENDER_CONNECT_TIMEOUT).then(() => {
return Promise.reject(new ConnectTimeoutError());
}),
]);
if (!sender) {
throw new ConnectTimeoutError();
}
if (!sender.isConnected()) {
if (sender.isConnecting) {
@ -488,6 +540,11 @@ class TelegramClient {
}
}
} catch (err) {
if (err instanceof ConnectTimeoutError) {
this._exportedSenderAbortControllers[dcId][i]?.abort();
const { newIndex, noConnectedExportedSenders } = this.getConnectedExportedSenderIndex(dcId, i);
return this._borrowExportedSender(dcId, noConnectedExportedSenders, undefined, newIndex, isPremium);
}
// eslint-disable-next-line no-console
console.error(err);

View File

@ -0,0 +1,15 @@
export class AbortError extends Error {
constructor() {
super('Aborted');
}
}
export default async function withAbortCheck<T>(abortSignal: AbortSignal, cb: Promise<T>): Promise<T> {
const result = await cb;
if (abortSignal?.aborted) {
throw new AbortError();
}
return result;
}