From fd11226602bf20597c03a9f618de352b7864773d Mon Sep 17 00:00:00 2001 From: zubiden <19638254+zubiden@users.noreply.github.com> Date: Tue, 11 Nov 2025 14:37:51 +0100 Subject: [PATCH] Media: Fix big file download speed (#6421) --- src/assets/localization/fallback.strings | 13 +- src/components/common/AnimatedFileSize.tsx | 87 ++++++++ src/components/common/Audio.tsx | 7 +- src/components/common/File.tsx | 20 +- src/components/common/helpers/documentInfo.ts | 13 -- .../left/settings/SettingsDataStorage.tsx | 3 +- .../common/PremiumLimitReachedModal.tsx | 22 +- .../mediaViewer/VideoPlayerControls.tsx | 4 +- src/global/helpers/messageMedia.ts | 2 +- src/hooks/useMediaWithLoadProgress.ts | 16 +- src/lib/gramjs/client/downloadFile.ts | 99 ++++----- src/lib/gramjs/client/uploadFile.ts | 28 +-- src/lib/gramjs/crypto/converters.ts | 55 ----- src/lib/gramjs/crypto/crypto.ts | 98 ++++++--- src/lib/gramjs/crypto/words.ts | 51 ----- src/types/language.d.ts | 28 +-- src/util/dcBandwithManager.ts | 191 ++++++++++++++++++ src/util/foreman.ts | 40 ---- src/util/mediaLoader.ts | 6 +- src/util/textFormat.ts | 12 +- 20 files changed, 461 insertions(+), 334 deletions(-) create mode 100644 src/components/common/AnimatedFileSize.tsx delete mode 100644 src/lib/gramjs/crypto/converters.ts delete mode 100644 src/lib/gramjs/crypto/words.ts create mode 100644 src/util/dcBandwithManager.ts delete mode 100644 src/util/foreman.ts diff --git a/src/assets/localization/fallback.strings b/src/assets/localization/fallback.strings index 3ff64fefb..711bee245 100644 --- a/src/assets/localization/fallback.strings +++ b/src/assets/localization/fallback.strings @@ -419,7 +419,6 @@ "ClearOtherWebSessionsHelp" = "You can log in on websites that support signing in with Telegram."; "AreYouSureWebSessions" = "Are you sure you want to disconnect all websites where you logged in with Telegram?"; "AutodownloadSizeLimitUpTo" = "up to {limit}"; -"FileSizeMB" = "{count} MB"; "AutoDownloadMaxFileSize" = "Maximum file size"; "AutoDownloadSettingsContacts" = "Contacts"; "AutoDownloadSettingsPrivateChats" = "Other Private Chats"; @@ -590,7 +589,6 @@ "ContactPhone" = "Phone Number"; "NewContact" = "New Contact"; "Done" = "Done"; -"FileSizeGB" = "{count} GB"; "LimitReached" = "Limit Reached"; "IncreaseLimit" = "Increase Limit"; "LimitFree" = "Free"; @@ -1223,8 +1221,6 @@ "WaitingForNetwork" = "Waiting for network..."; "ScheduleSendWhenOnline" = "Send When Online"; "VoipIncoming" = "Incoming call"; -"FileSizeB" = "{count} B"; -"FileSizeKB" = "{count} KB"; "Years_one" = "{count} year"; "Years_other" = "{count} years"; "MessageTimerShortHours" = "{count}h"; @@ -2315,6 +2311,15 @@ "UserNoteHint" = "only visible to you"; "EditUserNoteHint" = "Notes are only visible to you."; "AriaStoryTogglerOpen" = "Open Story List"; +"FileTransferProgress" = "{currentSize} / {totalSize}"; +"MediaSizeB_one" = "{size}B"; +"MediaSizeB_other" = "{size}B"; +"MediaSizeKB_one" = "{size}KB"; +"MediaSizeKB_other" = "{size}KB"; +"MediaSizeMB_one" = "{size}MB"; +"MediaSizeMB_other" = "{size}MB"; +"MediaSizeGB_one" = "{size}GB"; +"MediaSizeGB_other" = "{size}GB"; "InviteBlockedTitle" = "Invite via Link"; "InviteBlockedOneMessage" = "This user restricts adding them to groups. You can send an invite link in a private message instead."; "InviteBlockedManyMessage" = "Some users restrict adding them to groups. You can send these people an invite link in a private message instead."; diff --git a/src/components/common/AnimatedFileSize.tsx b/src/components/common/AnimatedFileSize.tsx new file mode 100644 index 000000000..fe1fd4486 --- /dev/null +++ b/src/components/common/AnimatedFileSize.tsx @@ -0,0 +1,87 @@ +import { memo, useEffect, useRef, useState, useUnmountCleanup } from '@teact'; + +import { formatFileSize } from '../../util/textFormat'; + +import useLang from '../../hooks/useLang'; +import useLastCallback from '../../hooks/useLastCallback'; + +type OwnProps = { + className?: string; + size: number; + progress?: number; +}; + +const SKIP_AFTER = 1024 * 1024 * 1024; // 1GB +const MIN_SIZE_INCREASE = 256 * 1024; // 256KB +const UPDATES_PER_SECOND = 10; + +const AnimatedFileSize = ({ + className, + size, + progress, +}: OwnProps) => { + const lang = useLang(); + const [currentSize, setCurrentSize] = useState(0); + const timerRef = useRef(); + + const resetAnimation = useLastCallback(() => { + clearTimeout(timerRef.current); + timerRef.current = undefined; + }); + + const animateSize = useLastCallback(() => { + if (progress === undefined) return; + + const currentTarget = size * progress; + const diff = currentTarget - currentSize; + + if (diff !== 0) { + const increase = Math.max(MIN_SIZE_INCREASE, diff / UPDATES_PER_SECOND); + const newSize = Math.min(currentTarget, currentSize + increase); + setCurrentSize(newSize); + } + + timerRef.current = window.setTimeout(() => { + animateSize(); + }, 1000 / UPDATES_PER_SECOND); + }); + + useEffect(() => { + if (progress === undefined) { + resetAnimation(); + setCurrentSize(0); + return; + }; + + const currentProgress = size * progress; + if (currentProgress > SKIP_AFTER || progress === 1) { + resetAnimation(); + setCurrentSize(currentProgress); + return; + } + + if (timerRef.current) return; + + animateSize(); + }, [progress, size]); + + useUnmountCleanup(resetAnimation); + + const currentSizeString = formatFileSize(lang, currentSize); + const totalSizeString = formatFileSize(lang, size); + + if (progress === undefined || progress === 1) { + return totalSizeString; + } + + return ( + + {lang('FileTransferProgress', { + currentSize: currentSizeString, + totalSize: totalSizeString, + }, { withNodes: true })} + + ); +}; + +export default memo(AnimatedFileSize); diff --git a/src/components/common/Audio.tsx b/src/components/common/Audio.tsx index 73400d650..42446b11d 100644 --- a/src/components/common/Audio.tsx +++ b/src/components/common/Audio.tsx @@ -32,7 +32,6 @@ import { captureEvents } from '../../util/captureEvents'; import { formatMediaDateTime, formatMediaDuration, formatPastTimeShort } from '../../util/dates/dateFormat'; import { decodeWaveform, interpolateArray } from '../../util/waveform'; import { LOCAL_TGS_URLS } from './helpers/animatedAssets'; -import { getFileSizeString } from './helpers/documentInfo'; import renderText from './helpers/renderText'; import { MAX_EMPTY_WAVEFORM_POINTS, renderWaveform } from './helpers/waveform'; @@ -49,6 +48,7 @@ import useShowTransitionDeprecated from '../../hooks/useShowTransitionDeprecated import Button from '../ui/Button'; import Link from '../ui/Link'; import ProgressSpinner from '../ui/ProgressSpinner'; +import AnimatedFileSize from './AnimatedFileSize'; import AnimatedIcon from './AnimatedIcon'; import Icon from './icons/Icon'; @@ -540,10 +540,7 @@ function renderAudio( )} {!showSeekline && showProgress && ( -
- {progress ? `${getFileSizeString(audio.size * progress)} / ` : undefined} - {getFileSizeString(audio.size)} -
+ )} {!showSeekline && !showProgress && (
diff --git a/src/components/common/File.tsx b/src/components/common/File.tsx index 71e40c2e5..ff68d4abc 100644 --- a/src/components/common/File.tsx +++ b/src/components/common/File.tsx @@ -1,6 +1,6 @@ -import type { ElementRef, FC } from '../../lib/teact/teact'; +import type { ElementRef } from '../../lib/teact/teact'; import { - memo, useMemo, useRef, useState, + memo, useRef, useState, } from '../../lib/teact/teact'; import type { IconName } from '../../types/icons'; @@ -8,7 +8,7 @@ import type { IconName } from '../../types/icons'; import { IS_CANVAS_FILTER_SUPPORTED } from '../../util/browser/windowEnvironment'; import buildClassName from '../../util/buildClassName'; import { formatMediaDateTime, formatPastTimeShort } from '../../util/dates/dateFormat'; -import { getColorFromExtension, getFileSizeString } from './helpers/documentInfo'; +import { getColorFromExtension } from './helpers/documentInfo'; import { getDocumentThumbnailDimensions } from './helpers/mediaDimensions'; import renderText from './helpers/renderText'; @@ -21,6 +21,7 @@ import useShowTransitionDeprecated from '../../hooks/useShowTransitionDeprecated import Link from '../ui/Link'; import ProgressSpinner from '../ui/ProgressSpinner'; +import AnimatedFileSize from './AnimatedFileSize'; import Icon from './icons/Icon'; import './File.scss'; @@ -47,7 +48,7 @@ type OwnProps = { onDateClick?: (e: React.MouseEvent) => void; }; -const File: FC = ({ +const File = ({ ref, id, name, @@ -67,7 +68,7 @@ const File: FC = ({ actionIcon, onClick, onDateClick, -}) => { +}: OwnProps) => { const oldLang = useOldLang(); const lang = useLang(); let elementRef = useRef(); @@ -87,11 +88,6 @@ const File: FC = ({ } = useShowTransitionDeprecated(isTransferring, undefined, true); const color = getColorFromExtension(extension); - const sizeString = getFileSizeString(size); - const subtitle = useMemo(() => { - if (!isTransferring || !transferProgress) return sizeString; - return `${getFileSizeString(size * transferProgress)} / ${sizeString}`; - }, [isTransferring, size, sizeString, transferProgress]); const { width, height } = getDocumentThumbnailDimensions(smaller); @@ -154,9 +150,7 @@ const File: FC = ({
{renderText(name)}
- - {subtitle} - + {sender && {renderText(sender)}} {!sender && Boolean(timestamp) && ( <> diff --git a/src/components/common/helpers/documentInfo.ts b/src/components/common/helpers/documentInfo.ts index 99197ecf7..b5d4f0179 100644 --- a/src/components/common/helpers/documentInfo.ts +++ b/src/components/common/helpers/documentInfo.ts @@ -1,18 +1,5 @@ import type { ApiDocument } from '../../../api/types'; -const ONE_GIGABYTE = 1024 * 1024 * 1024; -const ONE_MEGABYTE = 1024 * 1024; - -export function getFileSizeString(bytes: number) { - if (bytes > (ONE_GIGABYTE / 2)) { - return `${(bytes / ONE_GIGABYTE).toFixed(1)} GB`; - } - if (bytes > (ONE_MEGABYTE / 2)) { - return `${(bytes / ONE_MEGABYTE).toFixed(1)} MB`; - } - return `${(bytes / (1024)).toFixed(1)} KB`; -} - export function getDocumentExtension(document: ApiDocument) { const { fileName, mimeType } = document; diff --git a/src/components/left/settings/SettingsDataStorage.tsx b/src/components/left/settings/SettingsDataStorage.tsx index 5f36cfab7..b8b4af6be 100644 --- a/src/components/left/settings/SettingsDataStorage.tsx +++ b/src/components/left/settings/SettingsDataStorage.tsx @@ -61,8 +61,9 @@ const SettingsDataStorage: FC = ({ }); const renderFileSizeCallback = useCallback((value: number) => { + const size = AUTODOWNLOAD_FILESIZE_MB_LIMITS[value]; return lang('AutodownloadSizeLimitUpTo', { - limit: lang('FileSizeMB', { count: AUTODOWNLOAD_FILESIZE_MB_LIMITS[value] }), + limit: lang('MediaSizeMB', { size }, { pluralValue: size }), }); }, [lang]); diff --git a/src/components/main/premium/common/PremiumLimitReachedModal.tsx b/src/components/main/premium/common/PremiumLimitReachedModal.tsx index 31d14dc7f..b57a42f4f 100644 --- a/src/components/main/premium/common/PremiumLimitReachedModal.tsx +++ b/src/components/main/premium/common/PremiumLimitReachedModal.tsx @@ -9,10 +9,12 @@ import type { IconName } from '../../../../types/icons'; import { MAX_UPLOAD_FILEPART_SIZE } from '../../../../config'; import { selectIsCurrentUserPremium, selectIsPremiumPurchaseBlocked } from '../../../../global/selectors'; import buildClassName from '../../../../util/buildClassName'; +import { type LangFn } from '../../../../util/localization'; import { formatFileSize } from '../../../../util/textFormat'; import renderText from '../../../common/helpers/renderText'; import useFlag from '../../../../hooks/useFlag'; +import useLang from '../../../../hooks/useLang'; import useOldLang from '../../../../hooks/useOldLang'; import Icon from '../../../common/icons/Icon'; @@ -71,16 +73,17 @@ const LIMIT_ICON: Record = { }; const LIMIT_VALUE_FORMATTER: Partial string>> = { - uploadMaxFileparts: (lang: OldLangFn, value: number) => { + uploadMaxFileparts: (lang: LangFn, value: number) => { // The real size is not exactly 4gb, so we need to round it - if (value === 8000) return lang('FileSize.GB', '4'); - if (value === 4000) return lang('FileSize.GB', '2'); + if (value === 8000) return lang('MediaSizeGB', { size: 4 }, { pluralValue: 4 }); + if (value === 4000) return lang('MediaSizeGB', { size: 2 }, { pluralValue: 2 }); return formatFileSize(lang, value * MAX_UPLOAD_FILEPART_SIZE); }, }; function getLimiterDescription({ lang, + oldLang, limitType, isPremium, canBuyPremium, @@ -88,7 +91,8 @@ function getLimiterDescription({ premiumValue, valueFormatter, }: { - lang: OldLangFn; + lang: LangFn; + oldLang: OldLangFn; limitType?: ApiLimitTypeWithModal; isPremium?: boolean; canBuyPremium?: boolean; @@ -104,13 +108,13 @@ function getLimiterDescription({ const premiumValueFormatted = valueFormatter ? valueFormatter(lang, premiumValue) : premiumValue; if (isPremium) { - return lang(LIMIT_DESCRIPTION_PREMIUM[limitType], premiumValueFormatted); + return oldLang(LIMIT_DESCRIPTION_PREMIUM[limitType], premiumValueFormatted); } return canBuyPremium - ? lang(LIMIT_DESCRIPTION[limitType], + ? oldLang(LIMIT_DESCRIPTION[limitType], limitType === 'channelsPublic' ? premiumValueFormatted : [defaultValueFormatted, premiumValueFormatted]) - : lang(LIMIT_DESCRIPTION_BLOCKED[limitType], defaultValueFormatted); + : oldLang(LIMIT_DESCRIPTION_BLOCKED[limitType], defaultValueFormatted); } export type OwnProps = { @@ -132,7 +136,8 @@ const PremiumLimitReachedModal: FC = ({ canBuyPremium, }) => { const { closeLimitReachedModal, openPremiumModal } = getActions(); - const lang = useOldLang(); + const lang = useLang(); + const oldLang = useOldLang(); const [isClosing, startClosing, stopClosing] = useFlag(); @@ -149,6 +154,7 @@ const PremiumLimitReachedModal: FC = ({ const valueFormatter = limit && LIMIT_VALUE_FORMATTER[limit]; const description = getLimiterDescription({ lang, + oldLang, limitType: limit, isPremium, canBuyPremium, diff --git a/src/components/mediaViewer/VideoPlayerControls.tsx b/src/components/mediaViewer/VideoPlayerControls.tsx index d93e974e9..239ad0fb0 100644 --- a/src/components/mediaViewer/VideoPlayerControls.tsx +++ b/src/components/mediaViewer/VideoPlayerControls.tsx @@ -14,7 +14,6 @@ import type { IconName } from '../../types/icons'; import { IS_IOS, IS_TOUCH_ENV } from '../../util/browser/windowEnvironment'; import buildClassName from '../../util/buildClassName'; import { formatMediaDuration } from '../../util/dates/dateFormat'; -import { formatFileSize } from '../../util/textFormat'; import useAppLayout from '../../hooks/useAppLayout'; import useCurrentTimeSignal from '../../hooks/useCurrentTimeSignal'; @@ -24,6 +23,7 @@ import useLastCallback from '../../hooks/useLastCallback'; import useOldLang from '../../hooks/useOldLang'; import useControlsSignal from './hooks/useControlsSignal'; +import AnimatedFileSize from '../common/AnimatedFileSize'; import Icon from '../common/icons/Icon'; import Button from '../ui/Button'; import Menu from '../ui/Menu'; @@ -209,7 +209,7 @@ const VideoPlayerControls: FC = ({ {renderTime(currentTime, duration)} {!isBuffered && (
- {`${formatFileSize(lang, fileSize * bufferedProgress)} / ${formatFileSize(lang, fileSize)}`} +
)}
diff --git a/src/global/helpers/messageMedia.ts b/src/global/helpers/messageMedia.ts index c8d0931fa..c218e927b 100644 --- a/src/global/helpers/messageMedia.ts +++ b/src/global/helpers/messageMedia.ts @@ -458,7 +458,7 @@ export function getMediaTransferState( progress?: number, isLoadNeeded = false, isUploading = false, ) { const isTransferring = isUploading || isLoadNeeded; - const transferProgress = Number(progress); + const transferProgress = progress || 0; return { isUploading, isTransferring, transferProgress, diff --git a/src/hooks/useMediaWithLoadProgress.ts b/src/hooks/useMediaWithLoadProgress.ts index 8ab27b567..f11377e70 100644 --- a/src/hooks/useMediaWithLoadProgress.ts +++ b/src/hooks/useMediaWithLoadProgress.ts @@ -1,5 +1,5 @@ import { - useEffect, useMemo, useRef, useState, + useEffect, useRef, useState, } from '../lib/teact/teact'; import { ApiMediaFormat } from '../api/types'; @@ -7,9 +7,9 @@ import { ApiMediaFormat } from '../api/types'; import { selectIsSynced } from '../global/selectors'; import { IS_PROGRESSIVE_SUPPORTED } from '../util/browser/windowEnvironment'; import * as mediaLoader from '../util/mediaLoader'; -import { throttle } from '../util/schedulers'; import useSelector from './data/useSelector'; import useForceUpdate from './useForceUpdate'; +import useThrottledCallback from './useThrottledCallback'; import useUniqueId from './useUniqueId'; const STREAMING_PROGRESS = 0.75; @@ -34,13 +34,11 @@ export default function useMediaWithLoadProgress( const [loadProgress, setLoadProgress] = useState(mediaData && !isStreaming ? 1 : 0); const startedAtRef = useRef(); - const handleProgress = useMemo(() => { - return throttle((progress: number) => { - if (startedAtRef.current && (!delay || (Date.now() - startedAtRef.current > delay))) { - setLoadProgress(progress); - } - }, PROGRESS_THROTTLE, true); - }, [delay]); + const handleProgress = useThrottledCallback((progress: number) => { + if (startedAtRef.current && id && (!delay || (Date.now() - startedAtRef.current > delay))) { + setLoadProgress(progress); + } + }, [delay, id], PROGRESS_THROTTLE, true); useEffect(() => { if (!noLoad && mediaHash) { diff --git a/src/lib/gramjs/client/downloadFile.ts b/src/lib/gramjs/client/downloadFile.ts index ba79c16e2..9799a9059 100644 --- a/src/lib/gramjs/client/downloadFile.ts +++ b/src/lib/gramjs/client/downloadFile.ts @@ -1,8 +1,8 @@ import type TelegramClient from './TelegramClient'; import type { SizeType } from './TelegramClient'; +import { getDcBandwidthManager } from '../../../util/dcBandwithManager'; import Deferred from '../../../util/Deferred'; -import { Foreman } from '../../../util/foreman'; import { FloodPremiumWaitError, FloodWaitError, RPCError } from '../errors'; import Api from '../tl/api'; @@ -41,8 +41,6 @@ const DEFAULT_CHUNK_SIZE = 64; // kb const ONE_MB = 1024 * 1024; const DISCONNECT_SLEEP = 1000; -const NEW_CONNECTION_QUEUE_THRESHOLD = 5; - // when the sender requests hangs for 60 second we will reimport const SENDER_TIMEOUT = 60 * 1000; // Telegram may have server issues so we try several times @@ -82,14 +80,23 @@ class FileView { write(data: Uint8Array, offset: number) { if (this.type === 'opfs') { this.largeFileAccessHandle!.write(data, { at: offset }); - } else if (this.size) { - for (let i = 0; i < data.length; i++) { - if (offset + i >= this.buffer!.length) return; - this.buffer!.writeUInt8(data[i], offset + i); - } - } else { - this.buffer = Buffer.concat([this.buffer!, data]); + return; } + + if (this.size) { + const endOffset = offset + data.length; + if (endOffset > this.buffer!.length) { // Slow path for potential overflow + if (offset >= this.buffer!.length) return; // Ignore writes past the end + const writeLength = this.buffer!.length - offset; + this.buffer!.set(data.subarray(0, writeLength), offset); + return; + } + + this.buffer!.set(data, offset); + return; + } + + this.buffer = Buffer.concat([this.buffer!, data]); } async getData(): Promise | File> { @@ -126,14 +133,6 @@ export async function downloadFile( return undefined; } -const MAX_CONCURRENT_CONNECTIONS = 3; -const MAX_CONCURRENT_CONNECTIONS_PREMIUM = 6; -const MAX_WORKERS_PER_CONNECTION = 10; -const MULTIPLE_CONNECTIONS_MIN_FILE_SIZE = 10485760; // 10MB - -const foremans = Array(MAX_CONCURRENT_CONNECTIONS_PREMIUM).fill(undefined) - .map(() => new Foreman(MAX_WORKERS_PER_CONNECTION)); - async function downloadFile2( client: TelegramClient, inputLocation: Api.TypeInputFileLocation, @@ -170,9 +169,6 @@ async function downloadFile2( const partSize = partSizeKb * 1024; const partsCount = rangeSize ? Math.ceil(rangeSize / partSize) : 1; const noParallel = !end; - const shouldUseMultipleConnections = Boolean(fileSize) - && fileSize >= MULTIPLE_CONNECTIONS_MIN_FILE_SIZE - && !noParallel; let deferred: Deferred | undefined; if (partSize % MIN_CHUNK_SIZE !== 0) { @@ -188,9 +184,7 @@ async function downloadFile2( let hasEnded = false; let progress = 0; - if (progressCallback) { - progressCallback(progress); - } + progressCallback?.(progress); // Limit updates to one per file let isPremiumFloodWaitSent = false; @@ -198,6 +192,8 @@ async function downloadFile2( // Allocate memory await fileView.init(); + const dcManager = getDcBandwidthManager(dcId, isPremium); + while (true) { let limit = partSize; let isPrecise = false; @@ -211,23 +207,28 @@ async function downloadFile2( isPrecise = true; } - const senderIndex = getFreeForemanIndex(isPremium, shouldUseMultipleConnections); - - await foremans[senderIndex].requestWorker(isPriority); - if (deferred) await deferred.promise; if (noParallel) deferred = new Deferred(); if (hasEnded) { - foremans[senderIndex].releaseWorker(); break; } - const logWithSenderIndex = (...args: any[]) => { + + const senderIndex = await dcManager.requestWorker(Boolean(isPriority), limit); + const logWithSenderIndex = (...args: unknown[]) => { logWithId(`[${senderIndex}/${dcId}]`, ...args); }; - promises.push((async (offsetMemo: number) => { + // Check again after waiting for capacity + if (progressCallback?.isCanceled) { + hasEnded = true; + dcManager.releaseWorker(senderIndex, limit); + deferred?.resolve(); + break; + } + + promises.push((async (offsetMemo: number, limitMemo: number, senderIndexMemo: number) => { while (true) { let sender; try { @@ -238,7 +239,7 @@ async function downloadFile2( logWithSenderIndex(`❗️️ getSender took too long ${offsetMemo}`); }, 8000); } - sender = await client.getSender(dcId, senderIndex, isPremium); + sender = await client.getSender(dcId, senderIndexMemo, isPremium); isDone = true; let isDone2 = false; @@ -253,7 +254,7 @@ async function downloadFile2( sender.send(new Api.upload.GetFile({ location: inputLocation, offset: BigInt(offsetMemo), - limit, + limit: limitMemo, precise: isPrecise || undefined, })), sleep(SENDER_TIMEOUT).then(() => { @@ -284,11 +285,11 @@ async function downloadFile2( progressCallback(progress); } - if (!end && (result.bytes.length < limit)) { + if (!end && (result.bytes.length < limitMemo)) { hasEnded = true; } - foremans[senderIndex].releaseWorker(); + dcManager.releaseWorker(senderIndexMemo, limitMemo); if (deferred) deferred.resolve(); fileView.write(result.bytes, offsetMemo - start); @@ -308,7 +309,7 @@ async function downloadFile2( } logWithSenderIndex(`Ended not gracefully ${offsetMemo}`); - foremans[senderIndex].releaseWorker(); + dcManager.releaseWorker(senderIndexMemo, limitMemo); if (deferred) deferred.resolve(); hasEnded = true; @@ -316,7 +317,7 @@ async function downloadFile2( throw err; } } - })(offset)); + })(offset, limit, senderIndex)); offset += limit; @@ -327,27 +328,3 @@ async function downloadFile2( await Promise.all(promises); return fileView.getData(); } - -function getFreeForemanIndex(isPremium: boolean, forceNewConnection?: boolean) { - const availableConnections = isPremium ? MAX_CONCURRENT_CONNECTIONS_PREMIUM : MAX_CONCURRENT_CONNECTIONS; - let foremanIndex = 0; - let minQueueLength = Infinity; - for (let i = 0; i < availableConnections; i++) { - const foreman = foremans[i]; - // If worker is free, return it - if (!foreman.queueLength) return i; - - // Potentially create a new connection if the current queue is too long - if (!forceNewConnection && foreman.queueLength <= NEW_CONNECTION_QUEUE_THRESHOLD) { - return i; - } - - // If every connection is equally busy, prefer the last one in the list - if (foreman.queueLength <= minQueueLength) { - foremanIndex = i; - minQueueLength = foreman.activeWorkers; - } - } - - return foremanIndex; -} diff --git a/src/lib/gramjs/client/uploadFile.ts b/src/lib/gramjs/client/uploadFile.ts index f4eb49ec5..2e64b75d7 100644 --- a/src/lib/gramjs/client/uploadFile.ts +++ b/src/lib/gramjs/client/uploadFile.ts @@ -1,6 +1,6 @@ import type TelegramClient from './TelegramClient'; -import { Foreman } from '../../../util/foreman'; +import { getDcBandwidthManager } from '../../../util/dcBandwithManager'; import { FloodPremiumWaitError, FloodWaitError } from '../errors'; import Api from '../tl/api'; @@ -24,12 +24,6 @@ export interface UploadFileParams { const KB_TO_BYTES = 1024; const LARGE_FILE_THRESHOLD = 10 * 1024 * 1024; const DISCONNECT_SLEEP = 1000; -const MAX_CONCURRENT_CONNECTIONS = 3; -const MAX_CONCURRENT_CONNECTIONS_PREMIUM = 6; -const MAX_WORKERS_PER_CONNECTION = 10; - -const foremans = Array(MAX_CONCURRENT_CONNECTIONS_PREMIUM).fill(undefined) - .map(() => new Foreman(MAX_WORKERS_PER_CONNECTION)); export async function uploadFile( client: TelegramClient, @@ -54,11 +48,7 @@ export async function uploadFile( const partSize = getUploadPartSize(size) * KB_TO_BYTES; const partCount = Math.floor((size + partSize - 1) / partSize); - // Pick the least busy foreman - // For some reason, fresh connections give out a higher speed for the first couple of seconds - // I have no idea why, but this may speed up the download of small files - const activeCounts = foremans.map(({ activeWorkers }) => activeWorkers); - let currentForemanIndex = activeCounts.indexOf(Math.min(...activeCounts)); + const dcManager = getDcBandwidthManager(client.session.dcId, isPremium); let progress = 0; if (onProgress) { @@ -71,14 +61,10 @@ export async function uploadFile( const promises: Promise[] = []; for (let i = 0; i < partCount; i++) { - const senderIndex = currentForemanIndex % ( - isPremium ? MAX_CONCURRENT_CONNECTIONS_PREMIUM : MAX_CONCURRENT_CONNECTIONS - ); - - await foremans[senderIndex].requestWorker(); + const senderIndex = await dcManager.requestWorker(false, partSize); if (onProgress?.isCanceled) { - foremans[senderIndex].releaseWorker(); + dcManager.releaseWorker(senderIndex, partSize); break; } @@ -140,13 +126,13 @@ export async function uploadFile( await sleep(err.seconds * 1000); continue; } - foremans[senderIndex].releaseWorker(); + dcManager.releaseWorker(senderIndex, partSize); if (sender) client.releaseExportedSender(sender); throw err; } - foremans[senderIndex].releaseWorker(); + dcManager.releaseWorker(senderIndex, partSize); if (onProgress) { if (onProgress.isCanceled) { @@ -160,8 +146,6 @@ export async function uploadFile( break; } })(i, blobSlice)); - - currentForemanIndex++; } await Promise.all(promises); diff --git a/src/lib/gramjs/crypto/converters.ts b/src/lib/gramjs/crypto/converters.ts deleted file mode 100644 index 895755a6f..000000000 --- a/src/lib/gramjs/crypto/converters.ts +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Uint32Array -> ArrayBuffer (low-endian os) - */ -export function i2abLow(buf: Uint32Array): ArrayBuffer { - const uint8 = new Uint8Array(buf.length * 4); - let i = 0; - - for (let j = 0; j < buf.length; j++) { - const int = buf[j]; - - uint8[i++] = int >>> 24; - uint8[i++] = (int >> 16) & 0xFF; - uint8[i++] = (int >> 8) & 0xFF; - uint8[i++] = int & 0xFF; - } - - return uint8.buffer; -} - -/** - * Uint32Array -> ArrayBuffer (big-endian os) - */ -export function i2abBig(buf: Uint32Array): ArrayBuffer { - return buf.buffer; -} - -/** - * ArrayBuffer -> Uint32Array (low-endian os) - */ -export function ab2iLow(ab: ArrayBuffer | Uint8Array): Uint32Array { - const uint8 = new Uint8Array(ab); - const buf = new Uint32Array(uint8.length / 4); - - for (let i = 0; i < uint8.length; i += 4) { - buf[i / 4] = ( - uint8[i] << 24 - ^ uint8[i + 1] << 16 - ^ uint8[i + 2] << 8 - ^ uint8[i + 3] - ); - } - - return buf; -} - -/** - * ArrayBuffer -> Uint32Array (big-endian os) - */ -export function ab2iBig(ab: ArrayBuffer | Uint8Array): Uint32Array { - return new Uint32Array(ab); -} - -export const isBigEndian = new Uint8Array(new Uint32Array([0x01020304]))[0] === 0x01; -export const i2ab = isBigEndian ? i2abBig : i2abLow; -export const ab2i = isBigEndian ? ab2iBig : ab2iLow; diff --git a/src/lib/gramjs/crypto/crypto.ts b/src/lib/gramjs/crypto/crypto.ts index a25c3e51b..e74ed3c77 100644 --- a/src/lib/gramjs/crypto/crypto.ts +++ b/src/lib/gramjs/crypto/crypto.ts @@ -1,21 +1,18 @@ import AES from '@cryptography/aes'; -import { ab2i, i2ab } from './converters'; -import { getWords } from './words'; - class Counter { - _counter: Buffer; + public counter: Buffer; constructor(initialValue: Buffer) { - this._counter = Buffer.from(initialValue); + this.counter = Buffer.from(initialValue); } increment() { for (let i = 15; i >= 0; i--) { - if (this._counter[i] === 255) { - this._counter[i] = 0; + if (this.counter[i] === 255) { + this.counter[i] = 0; } else { - this._counter[i]++; + this.counter[i]++; break; } } @@ -25,9 +22,9 @@ class Counter { class CTR { private _counter: Counter; - private _remainingCounter?: Buffer; + private _carryBlock: Buffer | undefined; - private _remainingCounterIndex: number; + private _carryOffset: number; private _aes: AES; @@ -38,37 +35,72 @@ class CTR { this._counter = counter; - this._remainingCounter = undefined; - this._remainingCounterIndex = 16; + this._carryBlock = undefined; + this._carryOffset = 0; - this._aes = new AES(getWords(key)); + this._aes = new AES(key); } update(plainText: Buffer) { return this.encrypt(plainText); } - encrypt(plainText: Buffer) { - const encrypted = Buffer.from(plainText); + encrypt(plain: Buffer): Buffer { + const aes = this._aes; + const ctr = this._counter; - for (let i = 0; i < encrypted.length; i++) { - if (this._remainingCounterIndex === 16) { - this._remainingCounter = Buffer.from( - i2ab( - this._aes.encrypt( - ab2i(this._counter._counter), - ) as Uint32Array, - ), - ); - this._remainingCounterIndex = 0; - this._counter.increment(); + const src = plain; + const n = src.length; + + const dst = Buffer.allocUnsafe(n); + + let pos = 0; + + // 1) Consume any carried keystream from the previous call + if (this._carryBlock) { + const take = Math.min(16 - this._carryOffset, n); + for (let j = 0; j < take; j++) { + dst[pos + j] = src[pos + j] ^ this._carryBlock[this._carryOffset + j]; } - if (this._remainingCounter) { - encrypted[i] ^= this._remainingCounter[this._remainingCounterIndex++]; + pos += take; + this._carryOffset += take; + + if (this._carryOffset === 16) { + this._carryBlock = undefined; + this._carryOffset = 0; } } - return encrypted; + // Temporary keystream block for this call + const keystream = Buffer.allocUnsafe(16); + + // 2) Full 16-byte blocks + while (pos + 16 <= n) { + const words = aes.encrypt(ctr.counter); + writeU32WordsBE(words, keystream); + ctr.increment(); + + for (let j = 0; j < 16; j++) { + dst[pos + j] = src[pos + j] ^ keystream[j]; + } + pos += 16; + } + + // 3) Tail (<16 bytes) — store carryover for next call + if (pos < n) { + const words = aes.encrypt(ctr.counter); + writeU32WordsBE(words, keystream); + ctr.increment(); + + let used = 0; + for (; pos < n; pos++, used++) { + dst[pos] = src[pos] ^ keystream[used]; + } + this._carryBlock = keystream; + this._carryOffset = used; + } + + return dst; } } @@ -100,7 +132,7 @@ export function randomBytes(count: number) { class Hash { private data = new Uint8Array(0); - constructor(private algorithm: 'sha1' | 'sha256') {} + constructor(private algorithm: 'sha1' | 'sha256') { } update(data: ArrayLike) { // We shouldn't be needing new Uint8Array but it doesn't @@ -130,3 +162,9 @@ export async function pbkdf2(password: Buffer, salt: Buffer { 'AutodownloadSizeLimitUpTo': { 'limit': V; }; - 'FileSizeMB': { - 'count': V; - }; 'WebAppAddToAttachmentText': { 'bot': V; }; @@ -1872,9 +1869,6 @@ export interface LangPairWithVariables { 'AddContactSharedContactExceptionInfo': { 'user': V; }; - 'FileSizeGB': { - 'count': V; - }; 'SubscribeToPremium': { 'price': V; }; @@ -2029,12 +2023,6 @@ export interface LangPairWithVariables { 'count': V; 'total': V; }; - 'FileSizeB': { - 'count': V; - }; - 'FileSizeKB': { - 'count': V; - }; 'MessageTimerShortHours': { 'count': V; }; @@ -2994,6 +2982,10 @@ export interface LangPairWithVariables { 'ActionStarGiftPrepaidUpgraded': { 'user': V; }; + 'FileTransferProgress': { + 'currentSize': V; + 'totalSize': V; + }; 'InviteRestrictedPremiumReasonSingle': { 'user': V; }; @@ -3349,6 +3341,18 @@ export interface LangPairPluralWithVariables { 'points': V; 'link': V; }; + 'MediaSizeB': { + 'size': V; + }; + 'MediaSizeKB': { + 'size': V; + }; + 'MediaSizeMB': { + 'size': V; + }; + 'MediaSizeGB': { + 'size': V; + }; 'InviteRestrictedUsers': { 'count': V; }; diff --git a/src/util/dcBandwithManager.ts b/src/util/dcBandwithManager.ts new file mode 100644 index 000000000..8ed721cca --- /dev/null +++ b/src/util/dcBandwithManager.ts @@ -0,0 +1,191 @@ +import { DEBUG } from '../config'; + +import Deferred from './Deferred'; + +const MAX_CONCURRENT_CONNECTIONS = 3; +const MAX_CONCURRENT_CONNECTIONS_PREMIUM = 6; + +const MAX_ACTIVE_REQUEST_SIZE = 9 * 1024 * 1024; +const MAX_ACTIVE_REQUEST_SIZE_PREMIUM = 20 * 1024 * 1024; + +const FOREMAN_MAX_HEAP_SIZE = MAX_ACTIVE_REQUEST_SIZE_PREMIUM / MAX_CONCURRENT_CONNECTIONS_PREMIUM; + +interface QueuedRequest { + deferred: Deferred; + requestSize: number; +} + +const dcManagers: Record = {}; + +export function getDcBandwidthManager(dcId: number, isPremium: boolean): DcBandwidthManager { + if (!dcManagers[dcId]) { + dcManagers[dcId] = new DcBandwidthManager(); + } + const dcManager = dcManagers[dcId]; + dcManager.updateIsPremium(isPremium); + return dcManager; +} + +if (DEBUG) { + (globalThis as any).getDcManagers = () => dcManagers; +} + +class Foreman { + private queuedRequests: QueuedRequest[] = []; + private priorityQueuedRequests: QueuedRequest[] = []; + + activeRequestHeapSize = 0; + queuedRequestHeapSize = 0; + + constructor(private maxRequestHeapSize: number) { } + + requestWorker(requestSize: number, isPriority?: boolean) { + if (this.activeRequestHeapSize + requestSize > this.maxRequestHeapSize) { + const deferred = new Deferred(); + const queuedRequest = { deferred, requestSize }; + if (isPriority) { + this.priorityQueuedRequests.push(queuedRequest); + } else { + this.queuedRequests.push(queuedRequest); + } + this.queuedRequestHeapSize += requestSize; + return deferred.promise; + } + + this.activeRequestHeapSize += requestSize; + return Promise.resolve(); + } + + releaseWorker(requestSize: number) { + this.activeRequestHeapSize -= requestSize; + + // Try to process queued requests + while (this.queueLength > 0) { + const queuedRequest = this.priorityQueuedRequests[0] || this.queuedRequests[0]; + if (!queuedRequest) break; + + // Check if we can process the next queued request + if (this.activeRequestHeapSize + queuedRequest.requestSize <= this.maxRequestHeapSize) { + const request = (this.priorityQueuedRequests.shift() || this.queuedRequests.shift())!; + this.queuedRequestHeapSize -= request.requestSize; + this.activeRequestHeapSize += request.requestSize; + request.deferred.resolve(); + } else { + break; + } + } + } + + canAccept(requestSize: number) { + return this.activeRequestHeapSize + requestSize <= this.maxRequestHeapSize; + } + + get queueLength() { + return this.queuedRequests.length + this.priorityQueuedRequests.length; + } +} + +class DcBandwidthManager { + private foremans: Foreman[] = []; + + private maxConnections: number = MAX_CONCURRENT_CONNECTIONS; + private maxActiveSize: number = MAX_ACTIVE_REQUEST_SIZE; + + private queuedRequests: QueuedRequest[] = []; + + private priorityQueuedRequests: QueuedRequest[] = []; + + activeRequestSize = 0; + + constructor() { + const maxForemans = Math.max(MAX_CONCURRENT_CONNECTIONS, MAX_CONCURRENT_CONNECTIONS_PREMIUM); + + this.foremans = Array(maxForemans) + .fill(undefined) + .map(() => new Foreman(FOREMAN_MAX_HEAP_SIZE)); + } + + updateIsPremium(isPremium: boolean) { + this.maxConnections = isPremium ? MAX_CONCURRENT_CONNECTIONS_PREMIUM : MAX_CONCURRENT_CONNECTIONS; + this.maxActiveSize = isPremium ? MAX_ACTIVE_REQUEST_SIZE_PREMIUM : MAX_ACTIVE_REQUEST_SIZE; + } + + async requestWorker(isPriority: boolean, requestSize: number): Promise { + // Check if adding this request would exceed the dcId size limit + if (this.activeRequestSize + requestSize > this.maxActiveSize) { + const deferred = new Deferred(); + const queuedRequest = { deferred, requestSize }; + if (isPriority) { + this.priorityQueuedRequests.push(queuedRequest); + } else { + this.queuedRequests.push(queuedRequest); + } + await deferred.promise; + // After being dequeued, select and request a foreman + const foremanIndex = this.getFreeForemanIndex(requestSize); + const foreman = this.foremans[foremanIndex]; + await foreman.requestWorker(requestSize, isPriority); + return foremanIndex; + } + + const foremanIndex = this.getFreeForemanIndex(requestSize); + const foreman = this.foremans[foremanIndex]; + await foreman.requestWorker(requestSize, isPriority); + this.activeRequestSize += requestSize; + return foremanIndex; + } + + releaseWorker(foremanIndex: number, requestSize: number) { + this.activeRequestSize -= requestSize; + this.foremans[foremanIndex].releaseWorker(requestSize); + + // Try to process queued requests + this.processQueue(); + } + + private processQueue() { + while (true) { + const queuedRequest = this.priorityQueuedRequests[0] || this.queuedRequests[0]; + if (!queuedRequest) { + return; + } + + if (this.activeRequestSize + queuedRequest.requestSize > this.maxActiveSize) { + return; + } + + const request = (this.priorityQueuedRequests.shift() || this.queuedRequests.shift())!; + this.activeRequestSize += request.requestSize; + request.deferred.resolve(); + } + } + + private getFreeForemanIndex(requestSize: number): number { + let minTotalHeapSize = Infinity; + let minHeapForemanIndex = 0; + + for (let i = 0; i < this.maxConnections; i++) { + const foreman = this.foremans[i]; + if (foreman.canAccept(requestSize)) { + return i; // Prefer filling up free foremans to avoid unnecessary connections + } + + const totalHeapSize = foreman.activeRequestHeapSize + foreman.queuedRequestHeapSize; + if (totalHeapSize < minTotalHeapSize) { + minTotalHeapSize = totalHeapSize; + minHeapForemanIndex = i; + } + } + + // If every foreman is busy, use the one with the smallest total heap size + return minHeapForemanIndex; + } + + getForeman(index: number): Foreman { + return this.foremans[index]; + } + + get queueLength() { + return this.queuedRequests.length + this.priorityQueuedRequests.length; + } +} diff --git a/src/util/foreman.ts b/src/util/foreman.ts deleted file mode 100644 index 9733b9043..000000000 --- a/src/util/foreman.ts +++ /dev/null @@ -1,40 +0,0 @@ -import Deferred from './Deferred'; - -export class Foreman { - private deferreds: Deferred[] = []; - - private priorityDeferreds: Deferred[] = []; - - activeWorkers = 0; - - constructor(private maxWorkers: number) { - } - - requestWorker(isPriority?: boolean) { - if (this.activeWorkers === this.maxWorkers) { - const deferred = new Deferred(); - if (isPriority) { - this.priorityDeferreds.push(deferred); - } else { - this.deferreds.push(deferred); - } - return deferred.promise; - } - - this.activeWorkers++; - return Promise.resolve(); - } - - releaseWorker() { - if (this.queueLength) { - const deferred = (this.priorityDeferreds.shift() || this.deferreds.shift())!; - deferred.resolve(); - } else { - this.activeWorkers--; - } - } - - get queueLength() { - return this.deferreds.length + this.priorityDeferreds.length; - } -} diff --git a/src/util/mediaLoader.ts b/src/util/mediaLoader.ts index 66c989fa7..fce8300a1 100644 --- a/src/util/mediaLoader.ts +++ b/src/util/mediaLoader.ts @@ -104,6 +104,7 @@ export function cancelProgress(progressCallback: ApiOnProgress) { cancelApiProgress(parentCallback); cancellableCallbacks.delete(url); progressCallbacks.delete(url); + return; } }); }); @@ -185,7 +186,10 @@ function makeOnProgress(url: string) { const onProgress: ApiOnProgress = (progress: number) => { progressCallbacks.get(url)?.forEach((callback) => { callback(progress); - if (callback.isCanceled) onProgress.isCanceled = true; + if (callback.isCanceled) { + onProgress.isCanceled = true; + memoryCache.delete(url); + } }); }; diff --git a/src/util/textFormat.ts b/src/util/textFormat.ts index 10cbf6886..b66c6bdb3 100644 --- a/src/util/textFormat.ts +++ b/src/util/textFormat.ts @@ -1,4 +1,3 @@ -import type { OldLangFn } from '../hooks/useOldLang'; import type { LangFn } from './localization'; import EMOJI_REGEX from '../lib/twemojiRegex'; @@ -44,15 +43,16 @@ export const getFirstLetters = withCache((phrase: string, count = 2) => { .join(''); }); -const FILE_SIZE_UNITS = ['B', 'KB', 'MB', 'GB']; -export function formatFileSize(lang: OldLangFn, bytes: number, decimals = 1): string { +const FILE_SIZE_UNITS = ['B', 'KB', 'MB', 'GB'] as const; +export function formatFileSize(lang: LangFn, bytes: number, decimals = 1): string { if (bytes === 0) { - return lang('FileSize.B', 0); + return lang('MediaSizeB', { size: 0 }, { pluralValue: 0 }); } const k = 1024; const i = Math.floor(Math.log(bytes) / Math.log(k)); - const value = (bytes / (k ** i)).toFixed(Math.max(decimals, 0)); + const v = (bytes / (k ** i)); + const value = v.toFixed(Math.max(decimals, 0)); - return lang(`FileSize.${FILE_SIZE_UNITS[i]}`, value); + return lang(`MediaSize${FILE_SIZE_UNITS[i]}`, { size: value }, { pluralValue: v }); }