diff --git a/dev/log.html b/dev/log.html new file mode 100644 index 000000000..667c5738e --- /dev/null +++ b/dev/log.html @@ -0,0 +1,32 @@ + + + + + Logs Viewer + + + + + + diff --git a/src/api/gramjs/helpers.ts b/src/api/gramjs/helpers.ts index a2897818f..089a36924 100644 --- a/src/api/gramjs/helpers.ts +++ b/src/api/gramjs/helpers.ts @@ -6,6 +6,7 @@ const LOG_BACKGROUND = '#111111DD'; const LOG_PREFIX_COLOR = '#E4D00A'; const LOG_SUFFIX = { INVOKE: '#49DBF5', + BEACON: '#F549DB', RESPONSE: '#6887F7', CONNECTING: '#E4D00A', CONNECTED: '#26D907', diff --git a/src/api/gramjs/index.ts b/src/api/gramjs/index.ts index ba584a0cc..22f8207d1 100644 --- a/src/api/gramjs/index.ts +++ b/src/api/gramjs/index.ts @@ -5,4 +5,5 @@ export { handleMethodResponse, updateFullLocalDb, updateLocalDb, + setShouldEnableDebugLog, } from './worker/provider'; diff --git a/src/api/gramjs/methods/calls.ts b/src/api/gramjs/methods/calls.ts index ee3de8f02..bcdec0099 100644 --- a/src/api/gramjs/methods/calls.ts +++ b/src/api/gramjs/methods/calls.ts @@ -5,7 +5,7 @@ import type { } from '../../types'; import { Api as GramJs } from '../../../lib/gramjs'; -import { invokeRequest } from './client'; +import { invokeRequest, invokeRequestBeacon } from './client'; import { buildInputGroupCall, buildInputPeer, buildInputPhoneCall, generateRandomInt, } from '../gramjsBuilders'; @@ -151,13 +151,20 @@ export async function fetchGroupCallParticipants({ } export function leaveGroupCall({ - call, + call, isPageUnload, }: { - call: ApiGroupCall; + call: ApiGroupCall; isPageUnload?: boolean; }) { - return invokeRequest(new GramJs.phone.LeaveGroupCall({ + const request = new GramJs.phone.LeaveGroupCall({ call: buildInputGroupCall(call), - }), { + }); + + if (isPageUnload) { + invokeRequestBeacon(request); + return; + } + + invokeRequest(request, { shouldReturnTrue: true, }); } @@ -270,14 +277,21 @@ export async function getDhConfig() { } export function discardCall({ - call, isBusy, + call, isBusy, isPageUnload, }: { - call: ApiPhoneCall; isBusy?: boolean; + call: ApiPhoneCall; isBusy?: boolean; isPageUnload?: boolean; }) { - return invokeRequest(new GramJs.phone.DiscardCall({ + const request = new GramJs.phone.DiscardCall({ peer: buildInputPhoneCall(call), reason: isBusy ? new GramJs.PhoneCallDiscardReasonBusy() : new GramJs.PhoneCallDiscardReasonHangup(), - }), { + }); + + if (isPageUnload) { + invokeRequestBeacon(request); + return; + } + + invokeRequest(request, { shouldReturnTrue: true, }); } diff --git a/src/api/gramjs/methods/client.ts b/src/api/gramjs/methods/client.ts index 7022fab1c..90fadc93b 100644 --- a/src/api/gramjs/methods/client.ts +++ b/src/api/gramjs/methods/client.ts @@ -59,7 +59,8 @@ export async function init(_onUpdate: OnApiUpdate, initialArgs: ApiInitialArgs) const { userAgent, platform, sessionData, isTest, isMovSupported, isWebmSupported, maxBufferSize, webAuthToken, dcId, - mockScenario, + mockScenario, shouldForceHttpTransport, shouldAllowHttpTransport, + shouldDebugExportedSenders, } = initialArgs; const session = new sessions.CallbackSession(sessionData, onSessionUpdate); @@ -82,6 +83,9 @@ export async function init(_onUpdate: OnApiUpdate, initialArgs: ApiInitialArgs) appVersion: `${APP_VERSION} ${APP_CODE_NAME}`, useWSS: true, additionalDcsDisabled: IS_TEST, + shouldDebugExportedSenders, + shouldForceHttpTransport, + shouldAllowHttpTransport, testServers: isTest, dcId, } as any, @@ -292,6 +296,17 @@ export async function invokeRequest( } } +export function invokeRequestBeacon( + request: T, + dcId?: number, +) { + if (DEBUG) { + log('BEACON', request.className); + } + + client.invokeBeacon(request, dcId); +} + export async function downloadMedia( args: { url: string; mediaFormat: ApiMediaFormat; start?: number; end?: number; isHtmlAllowed?: boolean }, onProgress?: ApiOnProgress, @@ -455,3 +470,15 @@ export async function repairFileReference({ return false; } + +export function setForceHttpTransport(forceHttpTransport: boolean) { + client.setForceHttpTransport(forceHttpTransport); +} + +export function setAllowHttpTransport(allowHttpTransport: boolean) { + client.setAllowHttpTransport(allowHttpTransport); +} + +export function setShouldDebugExportedSenders(value: boolean) { + client.setShouldDebugExportedSenders(value); +} diff --git a/src/api/gramjs/methods/index.ts b/src/api/gramjs/methods/index.ts index 0963cc0da..678fa2f02 100644 --- a/src/api/gramjs/methods/index.ts +++ b/src/api/gramjs/methods/index.ts @@ -1,5 +1,6 @@ export { destroy, disconnect, downloadMedia, fetchCurrentUser, repairFileReference, abortChatRequests, abortRequestGroup, + setForceHttpTransport, setShouldDebugExportedSenders, setAllowHttpTransport, } from './client'; export { diff --git a/src/api/gramjs/updateManager.ts b/src/api/gramjs/updateManager.ts index d1e06dc71..d17993b23 100644 --- a/src/api/gramjs/updateManager.ts +++ b/src/api/gramjs/updateManager.ts @@ -107,8 +107,8 @@ export function updateChannelState(channelId: string, pts: number) { } function applyUpdate(updateObject: SeqUpdate | PtsUpdate) { - if ('seq' in updateObject) { - if (updateObject.seq) localDb.commonBoxState.seq = updateObject.seq; + if ('seq' in updateObject && updateObject.seq) { + localDb.commonBoxState.seq = updateObject.seq; localDb.commonBoxState.date = updateObject.date; } @@ -165,7 +165,9 @@ function popSeqQueue() { const localSeq = localDb.commonBoxState.seq; const seqStart = 'seqStart' in update ? update.seqStart : update.seq; - if (seqStart === 0 || seqStart === localSeq + 1) { + if (seqStart === 0) { + applyUpdate(update); + } else if (seqStart === localSeq + 1) { clearTimeout(seqTimeout); seqTimeout = undefined; diff --git a/src/api/gramjs/worker/provider.ts b/src/api/gramjs/worker/provider.ts index 99231e540..3068bc160 100644 --- a/src/api/gramjs/worker/provider.ts +++ b/src/api/gramjs/worker/provider.ts @@ -11,12 +11,14 @@ import generateUniqueId from '../../../util/generateUniqueId'; import { pause } from '../../../util/schedulers'; import { getCurrentTabId, subscribeToMasterChange } from '../../../util/establishMultitabRole'; import Deferred from '../../../util/Deferred'; +import { logDebugMessage } from '../../../util/debugConsole'; type RequestStates = { messageId: string; resolve: Function; reject: Function; callback?: AnyToVoidFunction; + DEBUG_payload?: any; }; const HEALTH_CHECK_TIMEOUT = 150; @@ -82,6 +84,13 @@ export function initApi(onUpdate: OnApiUpdate, initialArgs: ApiInitialArgs) { worker = new Worker(new URL('./worker.ts', import.meta.url)); subscribeToWorker(onUpdate); + if (requestStates.size > 0) { + requestStates.forEach((value) => { + // eslint-disable-next-line no-console + console.error('Hanging request', value.DEBUG_payload); + }); + } + if (initialArgs.platform === 'iOS') { setupIosHealthCheck(); } @@ -127,6 +136,13 @@ export function callApiOnMasterTab(payload: any) { }); } +export function setShouldEnableDebugLog(value: boolean) { + return makeRequest({ + type: 'toggleDebugMode', + isEnabled: value, + }); +} + /* * Call a worker method on this tab's worker, without transferring to master tab * Mostly needed to disconnect worker when re-electing master @@ -244,6 +260,7 @@ export function cancelApiProgressMaster(messageId: string) { function subscribeToWorker(onUpdate: OnApiUpdate) { worker?.addEventListener('message', ({ data }: WorkerMessageEvent) => { + if (!data) return; if (data.type === 'update') { onUpdate(data.update); } else if (data.type === 'methodResponse') { @@ -252,6 +269,10 @@ function subscribeToWorker(onUpdate: OnApiUpdate) { handleMethodCallback(data); } else if (data.type === 'unhandledError') { throw new Error(data.error?.message); + } else if (data.type === 'sendBeacon') { + navigator.sendBeacon(data.url, data.data); + } else if (data.type === 'debugLog') { + logDebugMessage(data.level, ...data.args); } }); } @@ -344,6 +365,8 @@ function makeRequest(message: OriginRequest) { requestStatesByCallback.set(callback, requestState); } + requestState.DEBUG_payload = payload; + requestStates.set(messageId, requestState); promise @@ -388,7 +411,7 @@ async function ensureWorkerPing() { if (Date.now() - startedAt >= HEALTH_CHECK_MIN_DELAY) { worker?.terminate(); worker = undefined; - updateCallback({ '@type': 'requestInitApi' }); + updateCallback({ '@type': 'requestReconnectApi' }); } } finally { isResolved = true; diff --git a/src/api/gramjs/worker/types.ts b/src/api/gramjs/worker/types.ts index 02a551b8a..cf2fbea80 100644 --- a/src/api/gramjs/worker/types.ts +++ b/src/api/gramjs/worker/types.ts @@ -1,6 +1,7 @@ import type { ApiInitialArgs, ApiUpdate } from '../../types'; import type { Methods, MethodArgs, MethodResponse } from '../methods/types'; import type { LocalDb } from '../localDb'; +import type { DebugLevel } from '../../../util/debugConsole'; export type ThenArg = T extends Promise ? U : T; @@ -19,6 +20,14 @@ export type WorkerMessageData = { } | { type: 'unhandledError'; error?: { message: string }; +} | { + type: 'sendBeacon'; + url: string; + data: ArrayBuffer; +} | { + type: 'debugLog'; + level: DebugLevel; + args: any[]; }; export interface WorkerMessageEvent { @@ -38,6 +47,10 @@ export type OriginRequest = { } | { type: 'ping'; messageId?: string; +} | { + type: 'toggleDebugMode'; + messageId?: string; + isEnabled?: boolean; }; export type OriginMessageData = OriginRequest | { diff --git a/src/api/gramjs/worker/worker.ts b/src/api/gramjs/worker/worker.ts index 91ddea830..1f8d3b4f8 100644 --- a/src/api/gramjs/worker/worker.ts +++ b/src/api/gramjs/worker/worker.ts @@ -1,18 +1,46 @@ +/* eslint-disable no-console */ + import type { ApiOnProgress, ApiUpdate } from '../../types'; import type { OriginMessageEvent, WorkerMessageData } from './types'; import { DEBUG } from '../../../config'; import { initApi, callApi, cancelApiProgress } from '../provider'; import { log } from '../helpers'; +import type { DebugLevel } from '../../../util/debugConsole'; +import { DEBUG_LEVELS } from '../../../util/debugConsole'; declare const self: WorkerGlobalScope; +const ORIGINAL_FUNCTIONS = DEBUG_LEVELS.reduce((acc, level) => { + acc[level] = console[level]; + return acc; +}, {} as Record void>); + +function enableDebugLog() { + DEBUG_LEVELS.forEach((level) => { + console[level] = (...args: any[]) => { + postMessage({ + type: 'debugLog', + level, + args: JSON.parse(JSON.stringify(args, (key, value) => (typeof value === 'bigint' + ? value.toString() + : value))), + }); + }; + }); +} + +function disableDebugLog() { + DEBUG_LEVELS.forEach((level) => { + console[level] = ORIGINAL_FUNCTIONS[level]; + }); +} + handleErrors(); const callbackState = new Map(); if (DEBUG) { - // eslint-disable-next-line no-console console.log('>>> FINISH LOAD WORKER'); } @@ -70,7 +98,6 @@ onmessage = async (message: OriginMessageEvent) => { } } catch (error: any) { if (DEBUG) { - // eslint-disable-next-line no-console console.error(error); } @@ -105,18 +132,23 @@ onmessage = async (message: OriginMessageEvent) => { break; } + case 'toggleDebugMode': { + if (data.isEnabled) { + enableDebugLog(); + } else { + disableDebugLog(); + } + } } }; function handleErrors() { self.onerror = (e) => { - // eslint-disable-next-line no-console console.error(e); sendToOrigin({ type: 'unhandledError', error: { message: e.error.message || 'Uncaught exception in worker' } }); }; self.addEventListener('unhandledrejection', (e) => { - // eslint-disable-next-line no-console console.error(e); sendToOrigin({ type: 'unhandledError', error: { message: e.reason.message || 'Uncaught rejection in worker' } }); }); @@ -133,7 +165,7 @@ function onUpdate(update: ApiUpdate) { } } -function sendToOrigin(data: WorkerMessageData, arrayBuffer?: ArrayBuffer) { +export function sendToOrigin(data: WorkerMessageData, arrayBuffer?: ArrayBuffer) { if (arrayBuffer) { postMessage(data, [arrayBuffer]); } else { diff --git a/src/api/types/misc.ts b/src/api/types/misc.ts index 0e94a8ca6..80b80f567 100644 --- a/src/api/types/misc.ts +++ b/src/api/types/misc.ts @@ -13,6 +13,9 @@ export interface ApiInitialArgs { webAuthToken?: string; dcId?: number; mockScenario?: string; + shouldAllowHttpTransport?: boolean; + shouldForceHttpTransport?: boolean; + shouldDebugExportedSenders?: boolean; } export interface ApiOnProgress { diff --git a/src/api/types/updates.ts b/src/api/types/updates.ts index 87f15d0ed..e3c9ceee5 100644 --- a/src/api/types/updates.ts +++ b/src/api/types/updates.ts @@ -615,8 +615,8 @@ export type ApiUpdateMessageTranslations = { toLanguageCode: string; }; -export type ApiRequestInitApi = { - '@type': 'requestInitApi'; +export type ApiRequestReconnectApi = { + '@type': 'requestReconnectApi'; }; export type ApiRequestSync = { @@ -649,7 +649,7 @@ export type ApiUpdate = ( ApiUpdatePhoneCallConnectionState | ApiUpdateBotMenuButton | ApiUpdateTranscribedAudio | ApiUpdateUserEmojiStatus | ApiUpdateMessageExtendedMedia | ApiUpdateConfig | ApiUpdateTopicNotifyExceptions | ApiUpdatePinnedTopic | ApiUpdatePinnedTopicsOrder | ApiUpdateTopic | ApiUpdateTopics | ApiUpdateRecentEmojiStatuses | - ApiUpdateRecentReactions | ApiRequestInitApi | ApiRequestSync + ApiUpdateRecentReactions | ApiRequestReconnectApi | ApiRequestSync ); export type OnApiUpdate = (update: ApiUpdate) => void; diff --git a/src/components/common/Audio.scss b/src/components/common/Audio.scss index 8fae23ab2..9731eab77 100644 --- a/src/components/common/Audio.scss +++ b/src/components/common/Audio.scss @@ -9,7 +9,7 @@ .message-content.no-text & { margin-bottom: calc(0.8125rem - 0.375rem); - &[dir=rtl] { + &[dir="rtl"] { margin-bottom: 1.5rem; } } diff --git a/src/components/left/settings/SettingsExperimental.tsx b/src/components/left/settings/SettingsExperimental.tsx index 83fa046a2..9dee8ded9 100644 --- a/src/components/left/settings/SettingsExperimental.tsx +++ b/src/components/left/settings/SettingsExperimental.tsx @@ -2,11 +2,15 @@ import React, { memo } from '../../../lib/teact/teact'; import type { FC } from '../../../lib/teact/teact'; +import { DEBUG_LOG_FILENAME } from '../../../config'; import { getActions, withGlobal } from '../../../global'; import { LOCAL_TGS_URLS } from '../../common/helpers/animatedAssets'; +import { getDebugLogs } from '../../../util/debugConsole'; +import download from '../../../util/download'; import useHistoryBack from '../../../hooks/useHistoryBack'; import useLang from '../../../hooks/useLang'; +import useLastCallback from '../../../hooks/useLastCallback'; import AnimatedIcon from '../../common/AnimatedIcon'; import ListItem from '../../ui/ListItem'; @@ -19,12 +23,20 @@ type OwnProps = { type StateProps = { shouldShowLoginCodeInChatList?: boolean; + shouldForceHttpTransport?: boolean; + shouldAllowHttpTransport?: boolean; + shouldCollectDebugLogs?: boolean; + shouldDebugExportedSenders?: boolean; }; const SettingsExperimental: FC = ({ isActive, onReset, shouldShowLoginCodeInChatList, + shouldForceHttpTransport, + shouldAllowHttpTransport, + shouldCollectDebugLogs, + shouldDebugExportedSenders, }) => { const { requestConfetti, setSettingOption } = getActions(); const lang = useLang(); @@ -34,6 +46,12 @@ const SettingsExperimental: FC = ({ onBack: onReset, }); + const handleDownloadLog = useLastCallback(() => { + const file = new File([getDebugLogs()], DEBUG_LOG_FILENAME, { type: 'text/plain' }); + const url = URL.createObjectURL(file); + download(url, DEBUG_LOG_FILENAME); + }); + return (
@@ -61,6 +79,43 @@ const SettingsExperimental: FC = ({ // eslint-disable-next-line react/jsx-no-bind onCheck={() => setSettingOption({ shouldShowLoginCodeInChatList: !shouldShowLoginCodeInChatList })} /> + + setSettingOption({ shouldAllowHttpTransport: !shouldAllowHttpTransport })} + /> + + setSettingOption({ shouldForceHttpTransport: !shouldForceHttpTransport })} + /> + + setSettingOption({ shouldCollectDebugLogs: !shouldCollectDebugLogs })} + /> + + setSettingOption({ shouldDebugExportedSenders: !shouldDebugExportedSenders })} + /> + + +
Download log
+
); @@ -70,6 +125,10 @@ export default memo(withGlobal( (global): StateProps => { return { shouldShowLoginCodeInChatList: global.settings.byKey.shouldShowLoginCodeInChatList, + shouldForceHttpTransport: global.settings.byKey.shouldForceHttpTransport, + shouldAllowHttpTransport: global.settings.byKey.shouldAllowHttpTransport, + shouldCollectDebugLogs: global.settings.byKey.shouldCollectDebugLogs, + shouldDebugExportedSenders: global.settings.byKey.shouldDebugExportedSenders, }; }, )(SettingsExperimental)); diff --git a/src/components/middle/composer/AttachMenu.tsx b/src/components/middle/composer/AttachMenu.tsx index 0d55bef1b..dd7c28c41 100644 --- a/src/components/middle/composer/AttachMenu.tsx +++ b/src/components/middle/composer/AttachMenu.tsx @@ -8,13 +8,14 @@ import type { ApiAttachMenuPeerType } from '../../../api/types'; import type { ISettings } from '../../../types'; import { - CONTENT_TYPES_WITH_PREVIEW, SUPPORTED_AUDIO_CONTENT_TYPES, + CONTENT_TYPES_WITH_PREVIEW, DEBUG_LOG_FILENAME, SUPPORTED_AUDIO_CONTENT_TYPES, SUPPORTED_IMAGE_CONTENT_TYPES, SUPPORTED_VIDEO_CONTENT_TYPES, } from '../../../config'; import { IS_TOUCH_ENV } from '../../../util/windowEnvironment'; import { openSystemFilesDialog } from '../../../util/systemFilesDialog'; import { validateFiles } from '../../../util/files'; +import { getDebugLogs } from '../../../util/debugConsole'; import useLastCallback from '../../../hooks/useLastCallback'; import useMouseInside from '../../../hooks/useMouseInside'; @@ -41,6 +42,7 @@ export type OwnProps = { isScheduled?: boolean; attachBots: GlobalState['attachMenu']['bots']; peerType?: ApiAttachMenuPeerType; + shouldCollectDebugLogs?: boolean; onFileSelect: (files: File[], shouldSuggestCompression?: boolean) => void; onPollCreate: () => void; theme: ISettings['theme']; @@ -62,6 +64,7 @@ const AttachMenu: FC = ({ onFileSelect, onPollCreate, theme, + shouldCollectDebugLogs, }) => { const [isAttachMenuOpen, openAttachMenu, closeAttachMenu] = useFlag(); const [handleMouseEnter, handleMouseLeave, markMouseInside] = useMouseInside(isAttachMenuOpen, closeAttachMenu); @@ -109,6 +112,11 @@ const AttachMenu: FC = ({ ), (e) => handleFileSelect(e, false)); }); + const handleSendLogs = useLastCallback(() => { + const file = new File([getDebugLogs()], DEBUG_LOG_FILENAME, { type: 'text/plain' }); + onFileSelect([file]); + }); + const bots = useMemo(() => { return Object.values(attachBots).filter((bot) => { if (!peerType) return false; @@ -174,6 +182,11 @@ const AttachMenu: FC = ({ {lang(!canSendDocuments && canSendAudios ? 'InputAttach.Popover.Music' : 'AttachDocument')} )} + {canSendDocuments && shouldCollectDebugLogs && ( + + {lang('DebugSendLogs')} + + )} )} {canAttachPolls && ( diff --git a/src/components/middle/composer/Composer.tsx b/src/components/middle/composer/Composer.tsx index c8c32d35d..b443a5be1 100644 --- a/src/components/middle/composer/Composer.tsx +++ b/src/components/middle/composer/Composer.tsx @@ -206,6 +206,7 @@ type StateProps = attachmentSettings: GlobalState['attachmentSettings']; slowMode?: ApiChatFullInfo['slowMode']; shouldUpdateStickerSetOrder?: boolean; + shouldCollectDebugLogs?: boolean; }; enum MainButtonState { @@ -290,6 +291,7 @@ const Composer: FC = ({ theme, slowMode, shouldUpdateStickerSetOrder, + shouldCollectDebugLogs, }) => { const { sendMessage, @@ -1492,6 +1494,7 @@ const Composer: FC = ({ isScheduled={shouldSchedule} attachBots={attachBots} peerType={attachMenuPeerType} + shouldCollectDebugLogs={shouldCollectDebugLogs} theme={theme} /> {Boolean(botKeyboardMessageId) && ( @@ -1684,6 +1687,7 @@ export default memo(withGlobal( attachmentSettings: global.attachmentSettings, slowMode, currentMessageList, + shouldCollectDebugLogs: global.settings.byKey.shouldCollectDebugLogs, }; }, )(Composer)); diff --git a/src/config.ts b/src/config.ts index 31cc1432d..c04c9f416 100644 --- a/src/config.ts +++ b/src/config.ts @@ -14,6 +14,7 @@ export const IS_ELECTRON = process.env.IS_ELECTRON; export const DEBUG = process.env.APP_ENV !== 'production'; export const DEBUG_MORE = false; +export const DEBUG_LOG_FILENAME = 'tt-log.json'; export const STRICTERDOM_ENABLED = DEBUG && !IS_ELECTRON; export const BETA_CHANGELOG_URL = 'https://telegra.ph/WebA-Beta-03-20'; diff --git a/src/global/actions/api/calls.async.ts b/src/global/actions/api/calls.async.ts index 60e750fdd..2ad924d62 100644 --- a/src/global/actions/api/calls.async.ts +++ b/src/global/actions/api/calls.async.ts @@ -32,7 +32,7 @@ const HANG_UP_UI_DELAY = 500; addActionHandler('leaveGroupCall', async (global, actions, payload): Promise => { const { isFromLibrary, shouldDiscard, shouldRemove, rejoin, - tabId = getCurrentTabId(), + isPageUnload, tabId = getCurrentTabId(), } = payload || {}; const groupCall = selectActiveGroupCall(global); @@ -51,7 +51,7 @@ addActionHandler('leaveGroupCall', async (global, actions, payload): Promise { - const { tabId = getCurrentTabId() } = payload || {}; + const { isPageUnload, tabId = getCurrentTabId() } = payload || {}; const { phoneCall } = global; if (!phoneCall) return undefined; @@ -354,7 +354,7 @@ addActionHandler('hangUp', (global, actions, payload): ActionReturnType => { callApi('destroyPhoneCallState'); stopPhoneCall(); - callApi('discardCall', { call: phoneCall }); + callApi('discardCall', { call: phoneCall, isPageUnload }); if (phoneCall.state === 'requesting') { global = { diff --git a/src/global/actions/api/initial.ts b/src/global/actions/api/initial.ts index 4f1a56954..593162944 100644 --- a/src/global/actions/api/initial.ts +++ b/src/global/actions/api/initial.ts @@ -2,7 +2,9 @@ import { addActionHandler, getGlobal, setGlobal, } from '../../index'; -import { initApi, callApi, callApiLocal } from '../../../api/gramjs'; +import { + initApi, callApi, callApiLocal, setShouldEnableDebugLog, +} from '../../../api/gramjs'; import { LANG_CACHE_NAME, @@ -54,7 +56,12 @@ addActionHandler('initApi', async (global, actions): Promise => { webAuthToken: initialLocationHash?.tgWebAuthToken, dcId: initialLocationHash?.tgWebAuthDcId ? Number(initialLocationHash?.tgWebAuthDcId) : undefined, mockScenario: initialLocationHash?.mockScenario, + shouldAllowHttpTransport: global.settings.byKey.shouldAllowHttpTransport, + shouldForceHttpTransport: global.settings.byKey.shouldForceHttpTransport, + shouldDebugExportedSenders: global.settings.byKey.shouldDebugExportedSenders, }); + + void setShouldEnableDebugLog(Boolean(global.settings.byKey.shouldCollectDebugLogs)); }); addActionHandler('setAuthPhoneNumber', (global, actions, payload): ActionReturnType => { diff --git a/src/global/actions/api/reactions.ts b/src/global/actions/api/reactions.ts index 33a9ff6ae..032817fe0 100644 --- a/src/global/actions/api/reactions.ts +++ b/src/global/actions/api/reactions.ts @@ -277,7 +277,7 @@ addActionHandler('loadMessageReactions', (global, actions, payload): ActionRetur const chat = selectChat(global, chatId); - if (!chat) { + if (!chat || global.connectionState !== 'connectionStateReady') { return; } diff --git a/src/global/actions/apiUpdaters/initial.ts b/src/global/actions/apiUpdaters/initial.ts index 4c47579d4..cd290a702 100644 --- a/src/global/actions/apiUpdaters/initial.ts +++ b/src/global/actions/apiUpdaters/initial.ts @@ -58,7 +58,15 @@ addActionHandler('apiUpdate', (global, actions, update): ActionReturnType => { onUpdateCurrentUser(global, update); break; - case 'requestInitApi': + case 'requestReconnectApi': + global = getGlobal(); + global = { ...global, isSynced: false }; + setGlobal(global); + + onUpdateConnectionState(global, actions, { + '@type': 'updateConnectionState', + connectionState: 'connectionStateConnecting', + }); actions.initApi(); break; diff --git a/src/global/actions/ui/calls.ts b/src/global/actions/ui/calls.ts index 5275d0c5f..5850ad908 100644 --- a/src/global/actions/ui/calls.ts +++ b/src/global/actions/ui/calls.ts @@ -219,7 +219,7 @@ addActionHandler('createGroupCallInviteLink', async (global, actions, payload): }); addActionHandler('joinVoiceChatByLink', async (global, actions, payload): Promise => { - const { username, inviteHash, tabId = getCurrentTabId() } = payload!; + const { username, inviteHash, tabId = getCurrentTabId() } = payload; const chat = await fetchChatByUsername(global, username); diff --git a/src/global/actions/ui/settings.ts b/src/global/actions/ui/settings.ts index 62974e4ee..213403a7d 100644 --- a/src/global/actions/ui/settings.ts +++ b/src/global/actions/ui/settings.ts @@ -1,6 +1,8 @@ import { addCallback } from '../../../lib/teact/teactn'; import { requestMutation } from '../../../lib/fasterdom/fasterdom'; -import { addActionHandler, getActions } from '../../index'; +import { + addActionHandler, getActions, getGlobal, setGlobal, +} from '../../index'; import { SettingsScreens } from '../../../types'; import type { ActionReturnType, GlobalState } from '../../types'; @@ -13,12 +15,14 @@ import { updateTabState } from '../../reducers/tabs'; import { getCurrentTabId } from '../../../util/establishMultitabRole'; import { applyPerformanceSettings } from '../../../util/perfomanceSettings'; import { selectCanAnimateInterface, selectChatFolder } from '../../selectors'; +import { callApi, setShouldEnableDebugLog } from '../../../api/gramjs'; +import { disableDebugConsole, initDebugConsole } from '../../../util/debugConsole'; let prevGlobal: GlobalState | undefined; addCallback((global: GlobalState) => { // eslint-disable-next-line eslint-multitab-tt/no-getactions-in-actions - const { updatePageTitle } = getActions(); + const { updatePageTitle, updateShouldDebugExportedSenders, updateShouldEnableDebugLog } = getActions(); const oldGlobal = prevGlobal; prevGlobal = global; @@ -62,6 +66,53 @@ addCallback((global: GlobalState) => { if (settings.canDisplayChatInTitle !== prevSettings.canDisplayChatInTitle) { updatePageTitle(); } + + if (settings.shouldForceHttpTransport !== prevSettings.shouldForceHttpTransport) { + callApi('setForceHttpTransport', Boolean(settings.shouldForceHttpTransport)); + } + + if (settings.shouldAllowHttpTransport !== prevSettings.shouldAllowHttpTransport) { + callApi('setAllowHttpTransport', Boolean(settings.shouldAllowHttpTransport)); + if (!settings.shouldAllowHttpTransport && settings.shouldForceHttpTransport) { + global = getGlobal(); + global = { + ...global, + settings: { + ...global.settings, + byKey: { + ...global.settings.byKey, + shouldForceHttpTransport: false, + }, + }, + }; + setGlobal(global); + } + } + + if (settings.shouldDebugExportedSenders !== prevSettings.shouldDebugExportedSenders) { + updateShouldDebugExportedSenders(); + } + + if (settings.shouldCollectDebugLogs !== prevSettings.shouldCollectDebugLogs) { + updateShouldEnableDebugLog(); + } +}); + +addActionHandler('updateShouldEnableDebugLog', (global): ActionReturnType => { + const { settings } = global; + + if (settings.byKey.shouldCollectDebugLogs) { + setShouldEnableDebugLog(true); + initDebugConsole(); + } else { + setShouldEnableDebugLog(false); + disableDebugConsole(); + } +}); + +addActionHandler('updateShouldDebugExportedSenders', (global): ActionReturnType => { + const { settings } = global; + callApi('setShouldDebugExportedSenders', Boolean(settings.byKey.shouldDebugExportedSenders)); }); addActionHandler('setSettingOption', (global, actions, payload): ActionReturnType => { diff --git a/src/global/initialState.ts b/src/global/initialState.ts index d41d7660d..2a82b9689 100644 --- a/src/global/initialState.ts +++ b/src/global/initialState.ts @@ -223,6 +223,7 @@ export const INITIAL_GLOBAL_STATE: GlobalState = { canTranslate: false, doNotTranslate: [], canDisplayChatInTitle: true, + shouldAllowHttpTransport: true, }, themes: { light: { diff --git a/src/global/types.ts b/src/global/types.ts index aa8154cdf..61bda3dd1 100644 --- a/src/global/types.ts +++ b/src/global/types.ts @@ -2302,6 +2302,7 @@ export interface ActionPayloads { shouldDiscard?: boolean; shouldRemove?: boolean; rejoin?: ActionPayloads['joinGroupCall']; + isPageUnload?: boolean; } & WithTabId) | undefined; toggleGroupCallVideo: undefined; @@ -2335,7 +2336,7 @@ export interface ActionPayloads { isVideo?: boolean; } & WithTabId; sendSignalingData: P2pMessage; - hangUp: WithTabId | undefined; + hangUp: ({ isPageUnload?: boolean } & WithTabId) | undefined; acceptCall: undefined; setCallRating: { rating: number; @@ -2361,6 +2362,8 @@ export interface ActionPayloads { skipLockOnUnload: undefined; // Settings + updateShouldDebugExportedSenders: undefined; + updateShouldEnableDebugLog: undefined; loadConfig: undefined; loadAppConfig: { hash: number; diff --git a/src/index.tsx b/src/index.tsx index 416c1c86a..07ceba9b6 100644 --- a/src/index.tsx +++ b/src/index.tsx @@ -54,6 +54,9 @@ async function init() { getActions().initShared(); getActions().init(); + getActions().updateShouldEnableDebugLog(); + getActions().updateShouldDebugExportedSenders(); + if (IS_MULTITAB_SUPPORTED) { establishMultitabRole(); subscribeToMasterChange((isMasterTab) => { @@ -90,3 +93,9 @@ async function init() { }); } } + +onBeforeUnload(() => { + const actions = getActions(); + actions.leaveGroupCall?.({ isPageUnload: true }); + actions.hangUp?.({ isPageUnload: true }); +}); diff --git a/src/lib/gramjs/client/TelegramClient.d.ts b/src/lib/gramjs/client/TelegramClient.d.ts index 36b83b66e..af6f1a8d3 100644 --- a/src/lib/gramjs/client/TelegramClient.d.ts +++ b/src/lib/gramjs/client/TelegramClient.d.ts @@ -14,6 +14,8 @@ declare class TelegramClient { request: R, dcId?: number, abortSignal?: AbortSignal, shouldRetryOnTimeout?: boolean, ): Promise; + async invokeBeacon(request: R, dcId?: number): void; + async uploadFile(uploadParams: UploadFileParams): ReturnType; async downloadFile(uploadParams: DownloadFileParams): ReturnType; @@ -24,6 +26,10 @@ declare class TelegramClient { setPingCallback(callback: () => Promise); + setForceHttpTransport: (forceHttpTransport: boolean) => void; + + setAllowHttpTransport: (allowHttpTransport: boolean) => void; + // Untyped methods. [prop: string]: any; } diff --git a/src/lib/gramjs/client/TelegramClient.js b/src/lib/gramjs/client/TelegramClient.js index 7fee0015a..420507cc3 100644 --- a/src/lib/gramjs/client/TelegramClient.js +++ b/src/lib/gramjs/client/TelegramClient.js @@ -15,6 +15,7 @@ const { ConnectionTCPObfuscated, MTProtoSender, UpdateConnectionState, + HttpConnection, } = require('../network'); const { authFlow, @@ -27,21 +28,13 @@ const { getTmpPassword, } = require('./2fa'); const RequestState = require('../network/RequestState'); -const withAbortCheck = require('../../../util/withAbortCheck').default; -const { AbortError } = require('../../../util/withAbortCheck'); - -class ConnectTimeoutError extends Error { - constructor() { - super('Connection Timeout'); - } -} +const Deferred = require('../../../util/Deferred').default; const DEFAULT_DC_ID = 2; const WEBDOCUMENT_DC_ID = 4; const EXPORTED_SENDER_RECONNECT_TIMEOUT = 1000; // 1 sec const EXPORTED_SENDER_RELEASE_TIMEOUT = 30000; // 30 sec const WEBDOCUMENT_REQUEST_PART_SIZE = 131072; // 128kb -const EXPORTED_SENDER_CONNECT_TIMEOUT = 8000; // 8 sec const PING_INTERVAL = 3000; // 3 sec const PING_TIMEOUT = 5000; // 5 sec @@ -63,12 +56,15 @@ const sizeTypes = ['u', 'v', 'w', 'y', 'd', 'x', 'c', 'm', 'b', 'a', 's', 'f']; class TelegramClient { static DEFAULT_OPTIONS = { connection: ConnectionTCPObfuscated, + fallbackConnection: HttpConnection, useIPV6: false, proxy: undefined, timeout: 10, requestRetries: 5, connectionRetries: Infinity, + connectionRetriesToFallback: 1, retryDelay: 1000, + retryMainConnectionDelay: 10000, autoReconnect: true, sequentialUpdates: false, floodSleepLimit: 60, @@ -82,6 +78,9 @@ class TelegramClient { additionalDcsDisabled: false, testServers: false, dcId: DEFAULT_DC_ID, + shouldAllowHttpTransport: false, + shouldForceHttpTransport: false, + shouldDebugExportedSenders: false, }; /** @@ -100,6 +99,9 @@ class TelegramClient { this.apiHash = apiHash; this.defaultDcId = args.dcId || DEFAULT_DC_ID; this._useIPV6 = args.useIPV6; + this._shouldForceHttpTransport = args.shouldForceHttpTransport; + this._shouldAllowHttpTransport = args.shouldAllowHttpTransport; + this._shouldDebugExportedSenders = args.shouldDebugExportedSenders; // this._entityCache = new Set() if (typeof args.baseLogger === 'string') { this._log = new Logger(); @@ -128,7 +130,9 @@ class TelegramClient { this._requestRetries = args.requestRetries; this._connectionRetries = args.connectionRetries; + this._connectionRetriesToFallback = args.connectionRetriesToFallback; this._retryDelay = args.retryDelay || 0; + this._retryMainConnectionDelay = args.retryMainConnectionDelay || 0; if (args.proxy) { this._log.warn('proxies are not supported'); } @@ -137,6 +141,7 @@ class TelegramClient { this._autoReconnect = args.autoReconnect; this._connection = args.connection; + this._fallbackConnection = args.fallbackConnection; // TODO add proxy support this._floodWaitedRequests = {}; @@ -165,13 +170,13 @@ class TelegramClient { this._config = undefined; this.phoneCodeHashes = []; this._exportedSenderPromises = {}; - this._exportedSenderAbortControllers = {}; this._waitingForAuthKey = {}; this._exportedSenderReleaseTimeouts = {}; this._additionalDcsDisabled = args.additionalDcsDisabled; this._loopStarted = false; this._isSwitchingDc = false; this._destroyed = false; + this._connectedDeferred = new Deferred(); } // region Connecting @@ -191,11 +196,16 @@ class TelegramClient { logger: this._log, dcId: this.session.dcId, retries: this._connectionRetries, + retriesToFallback: this._connectionRetriesToFallback, + shouldForceHttpTransport: this._shouldForceHttpTransport, + shouldAllowHttpTransport: this._shouldAllowHttpTransport, delay: this._retryDelay, + retryMainConnectionDelay: this._retryMainConnectionDelay, autoReconnect: this._autoReconnect, connectTimeout: this._timeout, authKeyCallback: this._authKeyCallback.bind(this), updateCallback: this._handleUpdate.bind(this), + getShouldDebugExportedSenders: this.getShouldDebugExportedSenders.bind(this), isMainSender: true, }); } @@ -208,8 +218,11 @@ class TelegramClient { const connection = new this._connection( this.session.serverAddress, this.session.port, this.session.dcId, this._log, this._args.testServers, ); + const fallbackConnection = new this._fallbackConnection( + this.session.serverAddress, this.session.port, this.session.dcId, this._log, this._args.testServers, + ); - const newConnection = await this._sender.connect(connection); + const newConnection = await this._sender.connect(connection, undefined, fallbackConnection); if (!newConnection) { // we're already connected so no need to reset auth key. if (!this._loopStarted) { @@ -228,6 +241,7 @@ class TelegramClient { this._updateLoop(); this._loopStarted = true; } + this._connectedDeferred.resolve(); this._isSwitchingDc = false; } @@ -245,6 +259,28 @@ class TelegramClient { this.pingCallback = callback; } + async setForceHttpTransport(forceHttpTransport) { + this._shouldForceHttpTransport = forceHttpTransport; + await this.disconnect(); + this._sender = undefined; + await this.connect(); + } + + async setAllowHttpTransport(allowHttpTransport) { + this._shouldAllowHttpTransport = allowHttpTransport; + await this.disconnect(); + this._sender = undefined; + await this.connect(); + } + + setShouldDebugExportedSenders(shouldDebugExportedSenders) { + this._shouldDebugExportedSenders = shouldDebugExportedSenders; + } + + getShouldDebugExportedSenders() { + return this._shouldDebugExportedSenders; + } + async _updateLoop() { let lastPongAt; @@ -337,8 +373,13 @@ class TelegramClient { }).flat(), ); + Object.values(this._exportedSenderReleaseTimeouts).forEach((timeouts) => { + Object.values(timeouts).forEach((releaseTimeout) => { + clearTimeout(releaseTimeout); + }); + }); + this._exportedSenderPromises = {}; - this._exportedSenderAbortControllers = {}; this._waitingForAuthKey = {}; } @@ -369,6 +410,7 @@ class TelegramClient { this.session.setAuthKey(undefined); this._isSwitchingDc = true; await this.disconnect(); + this._sender = undefined; return this.connect(); } @@ -383,9 +425,10 @@ class TelegramClient { if (this.session.dcId !== dcId) { this.session.setAuthKey(undefined, dcId); } + // eslint-disable-next-line no-console + if (this._shouldDebugExportedSenders) console.log(`🧹 Cleanup idx=${index} dcId=${dcId}`); const sender = await this._exportedSenderPromises[dcId][index]; delete this._exportedSenderPromises[dcId][index]; - delete this._exportedSenderAbortControllers[dcId][index]; await sender.disconnect(); } @@ -400,7 +443,6 @@ class TelegramClient { } this._exportedSenderPromises[dcId] = {}; - this._exportedSenderAbortControllers[dcId] = {}; await Promise.all(promises.map(async (promise) => { const sender = await promise; @@ -408,7 +450,7 @@ class TelegramClient { })); } - async _connectSender(sender, dcId, index, isPremium = false, abortSignal) { + async _connectSender(sender, dcId, index, isPremium = false) { // if we don't already have an auth key we want to use normal DCs not -1 let hasAuthKey = Boolean(sender.authKey.getKey()); let firstConnectResolver; @@ -432,7 +474,7 @@ class TelegramClient { // eslint-disable-next-line no-constant-condition while (true) { try { - await withAbortCheck(abortSignal, sender.connect(new this._connection( + await sender.connect(new this._connection( dc.ipAddress, dc.port, dcId, @@ -440,20 +482,24 @@ class TelegramClient { this._args.testServers, // Premium DCs are not stable for obtaining auth keys, so need to we first connect to regular ones hasAuthKey ? isPremium : false, - ))); + ), undefined, new this._fallbackConnection( + dc.ipAddress, + dc.port, + dcId, + this._log, + this._args.testServers, + hasAuthKey ? isPremium : false, + )); if (this.session.dcId !== dcId && !sender._authenticated) { this._log.info(`Exporting authorization for data center ${dc.ipAddress}`); - const auth = await withAbortCheck( - abortSignal, - this.invoke(new requests.auth.ExportAuthorization({ dcId })), - ); + const auth = await this.invoke(new requests.auth.ExportAuthorization({ dcId })); const req = this._initWith(new requests.auth.ImportAuthorization({ id: auth.id, bytes: auth.bytes, })); - await withAbortCheck(abortSignal, sender.send(req)); + await sender.send(req); sender._authenticated = true; } @@ -465,13 +511,16 @@ class TelegramClient { delete this._waitingForAuthKey[dcId]; } + if (this._shouldDebugExportedSenders) { + // eslint-disable-next-line no-console + console.warn(`✅ Connected to exported sender idx=${index} dc=${dcId}`); + } + return sender; } catch (err) { - if (err instanceof AbortError) { - delete this._exportedSenderPromises[dcId][index]; - delete this._exportedSenderAbortControllers[dcId][index]; - sender.disconnect(); - return undefined; + if (this._shouldDebugExportedSenders) { + // eslint-disable-next-line no-console + console.error(`☠️ ERROR! idx=${index} dcId=${dcId} ${err.message}`); } // eslint-disable-next-line no-console console.error(err); @@ -482,54 +531,32 @@ class TelegramClient { } } - getConnectedExportedSenderIndex(dcId, i) { - const index = Object.keys(this._exportedSenderReleaseTimeouts[dcId] || {})[0]; - const firstIndex = Number(index ?? 0); - return { - newIndex: firstIndex === i ? i + 1 : firstIndex, - noConnectedExportedSenders: index === undefined, - }; - } - async _borrowExportedSender(dcId, shouldReconnect, existingSender, index, isPremium) { if (this._additionalDcsDisabled) { return undefined; } - let i = index || 0; + const i = index || 0; if (!this._exportedSenderPromises[dcId]) this._exportedSenderPromises[dcId] = {}; - if (!this._exportedSenderAbortControllers[dcId]) this._exportedSenderAbortControllers[dcId] = {}; - - if (this._exportedSenderAbortControllers[dcId][i]?.signal.aborted) { - const { newIndex } = this.getConnectedExportedSenderIndex(dcId, i); - i = newIndex; - } if (!this._exportedSenderPromises[dcId][i] || shouldReconnect) { - this._exportedSenderAbortControllers[dcId][i]?.abort(); - this._exportedSenderAbortControllers[dcId][i] = new AbortController(); + if (this._shouldDebugExportedSenders) { + // eslint-disable-next-line no-console + console.warn(`🕒 Connecting to exported sender idx=${i} dc=${dcId}` + + ` ${shouldReconnect ? '(reconnect)' : ''}`); + } this._exportedSenderPromises[dcId][i] = this._connectSender( existingSender || this._createExportedSender(dcId, i), dcId, index, isPremium, - this._exportedSenderAbortControllers[dcId][i].signal, ); } let sender; try { - sender = await Promise.race([ - this._exportedSenderPromises[dcId][i], - Helpers.sleep(EXPORTED_SENDER_CONNECT_TIMEOUT).then(() => { - return Promise.reject(new ConnectTimeoutError()); - }), - ]); - - if (!sender) { - throw new ConnectTimeoutError(); - } + sender = await this._exportedSenderPromises[dcId][i]; if (!sender.isConnected()) { if (sender.isConnecting) { @@ -540,11 +567,6 @@ class TelegramClient { } } } catch (err) { - if (err instanceof ConnectTimeoutError) { - this._exportedSenderAbortControllers[dcId][i]?.abort(); - const { newIndex, noConnectedExportedSenders } = this.getConnectedExportedSenderIndex(dcId, i); - return this._borrowExportedSender(dcId, noConnectedExportedSenders, undefined, newIndex, isPremium); - } // eslint-disable-next-line no-console console.error(err); @@ -558,8 +580,11 @@ class TelegramClient { } this._exportedSenderReleaseTimeouts[dcId][i] = setTimeout(() => { - this._exportedSenderReleaseTimeouts[dcId][i] = undefined; + // eslint-disable-next-line no-console + if (this._shouldDebugExportedSenders) console.log(`🚪 Release idx=${i} dcId=${dcId}`); sender.disconnect(); + this._exportedSenderReleaseTimeouts[dcId][i] = undefined; + this._exportedSenderPromises[dcId][i] = undefined; }, EXPORTED_SENDER_RELEASE_TIMEOUT); return sender; @@ -569,12 +594,19 @@ class TelegramClient { return new MTProtoSender(this.session.getAuthKey(dcId), { logger: this._log, dcId, + senderIndex: index, retries: this._connectionRetries, + retriesToFallback: this._connectionRetriesToFallback, delay: this._retryDelay, + retryMainConnectionDelay: this._retryMainConnectionDelay, + shouldForceHttpTransport: this._shouldForceHttpTransport, + shouldAllowHttpTransport: this._shouldAllowHttpTransport, autoReconnect: this._autoReconnect, connectTimeout: this._timeout, authKeyCallback: this._authKeyCallback.bind(this), isMainSender: dcId === this.session.dcId, + isExported: true, + getShouldDebugExportedSenders: this.getShouldDebugExportedSenders.bind(this), onConnectionBreak: () => this._cleanupExportedSender(dcId, index), }); } @@ -602,7 +634,7 @@ class TelegramClient { * @returns {Promise} */ downloadFile(inputLocation, args = {}) { - return downloadFile(this, inputLocation, args); + return downloadFile(this, inputLocation, args, this._shouldDebugExportedSenders); } downloadMedia(messageOrMedia, args) { @@ -933,9 +965,11 @@ class TelegramClient { throw new Error('You can only invoke MTProtoRequests'); } - const sender = dcId === undefined ? this._sender : await this.getSender(dcId); + let sender = dcId === undefined ? this._sender : await this.getSender(dcId); this._lastRequest = Date.now(); + await this._connectedDeferred.promise; + const state = new RequestState(request, abortSignal); let attempt = 0; @@ -968,6 +1002,7 @@ class TelegramClient { throw e; } await this._switchDC(e.newDc); + sender = dcId === undefined ? this._sender : await this.getSender(dcId); } else if (e instanceof errors.MsgWaitError) { // We need to resend this after the old one was confirmed. await state.isReady(); @@ -993,6 +1028,16 @@ class TelegramClient { throw new Error(`Request was unsuccessful ${attempt} time(s)`); } + async invokeBeacon(request, dcId) { + if (request.classType !== 'request') { + throw new Error('You can only invoke MTProtoRequests'); + } + + const sender = dcId === undefined ? this._sender : await this.getSender(dcId); + + sender.sendBeacon(request); + } + setIsPremium(isPremium) { this.isPremium = isPremium; } @@ -1026,7 +1071,7 @@ class TelegramClient { } uploadFile(fileParams) { - return uploadFile(this, fileParams); + return uploadFile(this, fileParams, this._shouldDebugExportedSenders); } updateTwoFaSettings(params) { diff --git a/src/lib/gramjs/client/downloadFile.ts b/src/lib/gramjs/client/downloadFile.ts index 5d66ecd3a..03d07802f 100644 --- a/src/lib/gramjs/client/downloadFile.ts +++ b/src/lib/gramjs/client/downloadFile.ts @@ -95,11 +95,12 @@ export async function downloadFile( client: TelegramClient, inputLocation: Api.InputFileLocation, fileParams: DownloadFileParams, + shouldDebugExportedSenders?: boolean, ) { const { dcId } = fileParams; for (let i = 0; i < SENDER_RETRIES; i++) { try { - return await downloadFile2(client, inputLocation, fileParams); + return await downloadFile2(client, inputLocation, fileParams, shouldDebugExportedSenders); } catch (err: any) { if ( (err.message.startsWith('SESSION_REVOKED') || err.message.startsWith('CONNECTION_NOT_INITED')) @@ -127,6 +128,7 @@ async function downloadFile2( client: TelegramClient, inputLocation: Api.InputFileLocation, fileParams: DownloadFileParams, + shouldDebugExportedSenders?: boolean, ) { let { partSizeKb, end, @@ -134,6 +136,15 @@ async function downloadFile2( const { fileSize, } = fileParams; + + const fileId = 'id' in inputLocation ? inputLocation.id : undefined; + const logWithId = (...args: any[]) => { + if (!shouldDebugExportedSenders) return; + // eslint-disable-next-line no-console + console.log(`⬇️ [${fileId}/${fileParams.dcId}]`, ...args); + }; + + logWithId('Downloading file...'); const isPremium = Boolean(client.isPremium); const { dcId, progressCallback, start = 0 } = fileParams; @@ -206,6 +217,9 @@ async function downloadFile2( foremans[senderIndex].releaseWorker(); break; } + const logWithSenderIndex = (...args: any[]) => { + logWithId(`[${senderIndex}/${dcId}]`, ...args); + }; // eslint-disable-next-line no-loop-func, @typescript-eslint/no-loop-func promises.push((async (offsetMemo: number) => { @@ -213,7 +227,23 @@ async function downloadFile2( while (true) { let sender; try { + let isDone = false; + if (shouldDebugExportedSenders) { + setTimeout(() => { + if (isDone) return; + logWithSenderIndex(`❗️️ getSender took too long ${offsetMemo}`); + }, 8000); + } sender = await client.getSender(dcId, senderIndex, isPremium); + isDone = true; + + let isDone2 = false; + if (shouldDebugExportedSenders) { + setTimeout(() => { + if (isDone2) return; + logWithSenderIndex(`❗️️ sender.send took too long ${offsetMemo}`); + }, 6000); + } // sometimes a session is revoked and will cause this to hang. const result = await Promise.race([ sender.send(new Api.upload.GetFile({ @@ -225,19 +255,23 @@ async function downloadFile2( sleep(SENDER_TIMEOUT).then(() => { // If we're on the main DC we just cancel the download and let the user retry later if (dcId === client.session.dcId) { + logWithSenderIndex(`Download timed out ${offsetMemo}`); return Promise.reject(new Error('USER_CANCELED')); } else { + logWithSenderIndex(`Download timed out [not main] ${offsetMemo}`); return Promise.reject(new Error('SESSION_REVOKED')); } }), ]); + isDone2 = true; if (progressCallback) { if (progressCallback.isCanceled) { throw new Error('USER_CANCELED'); } progress += (1 / partsCount); + logWithSenderIndex(`⬇️️ ${progress * 100}%`); progressCallback(progress); } @@ -260,6 +294,7 @@ async function downloadFile2( continue; } + logWithSenderIndex(`Ended not gracefully ${offsetMemo}`); foremans[senderIndex].releaseWorker(); if (deferred) deferred.resolve(); diff --git a/src/lib/gramjs/client/uploadFile.ts b/src/lib/gramjs/client/uploadFile.ts index 7df7805f5..a82e395e7 100644 --- a/src/lib/gramjs/client/uploadFile.ts +++ b/src/lib/gramjs/client/uploadFile.ts @@ -33,6 +33,7 @@ const foremans = Array(MAX_CONCURRENT_CONNECTIONS_PREMIUM).fill(undefined) export async function uploadFile( client: TelegramClient, fileParams: UploadFileParams, + shouldDebugExportedSenders?: boolean, ): Promise { const { file, onProgress } = fileParams; @@ -42,6 +43,13 @@ export async function uploadFile( const fileId = readBigIntFromBuffer(generateRandomBytes(8), true, true); const isLarge = size > LARGE_FILE_THRESHOLD; + const logWithId = (...args: any[]) => { + if (!shouldDebugExportedSenders) return; + // eslint-disable-next-line no-console + console.log(`⬆️ [${fileId}]`, ...args); + }; + + logWithId('Uploading file...'); const partSize = getUploadPartSize(size) * KB_TO_BYTES; const partCount = Math.floor((size + partSize - 1) / partSize); @@ -70,6 +78,10 @@ export async function uploadFile( break; } + const logWithSenderIndex = (...args: any[]) => { + logWithId(`[${senderIndex}]`, ...args); + }; + const blobSlice = file.slice(i * partSize, (i + 1) * partSize); // eslint-disable-next-line no-loop-func, @typescript-eslint/no-loop-func promises.push((async (jMemo: number, blobSliceMemo: Blob) => { @@ -78,8 +90,24 @@ export async function uploadFile( let sender; try { // We always upload from the DC we are in + let isDone = false; + if (shouldDebugExportedSenders) { + setTimeout(() => { + if (isDone) return; + logWithSenderIndex(`❗️️ getSender took too long j=${jMemo}`); + }, 8000); + } sender = await client.getSender(client.session.dcId, senderIndex, isPremium); + isDone = true; + + let isDone2 = false; const partBytes = await blobSliceMemo.arrayBuffer(); + if (shouldDebugExportedSenders) { + setTimeout(() => { + if (isDone2) return; + logWithSenderIndex(`❗️️ sender.send took too long j=${jMemo}`); + }, 6000); + } await sender.send( isLarge ? new Api.upload.SaveBigFilePart({ @@ -94,7 +122,9 @@ export async function uploadFile( bytes: Buffer.from(partBytes), }), ); + isDone2 = true; } catch (err) { + logWithSenderIndex(`❗️️️Upload part failed ${err?.toString()} j=${jMemo}`); if (sender && !sender.isConnected()) { await sleep(DISCONNECT_SLEEP); continue; @@ -115,6 +145,7 @@ export async function uploadFile( } progress += (1 / partCount); + logWithSenderIndex(`${progress * 100}%`); onProgress(progress); } break; diff --git a/src/lib/gramjs/extensions/HttpStream.ts b/src/lib/gramjs/extensions/HttpStream.ts new file mode 100644 index 000000000..dddf9cee2 --- /dev/null +++ b/src/lib/gramjs/extensions/HttpStream.ts @@ -0,0 +1,101 @@ +const closeError = new Error('HttpStream was closed'); + +class HttpStream { + private url: string | undefined; + + private isClosed: boolean; + + private stream: Buffer[] = []; + + private canRead: Promise = Promise.resolve(); + + private resolveRead: VoidFunction | undefined; + + private rejectRead: VoidFunction | undefined; + + private disconnectedCallback: VoidFunction | undefined; + + constructor(disconnectedCallback: VoidFunction) { + this.isClosed = true; + this.disconnectedCallback = disconnectedCallback; + } + + async read() { + await this.canRead; + + const data = this.stream.shift(); + if (this.stream.length === 0) { + this.canRead = new Promise((resolve, reject) => { + this.resolveRead = resolve; + this.rejectRead = reject; + }); + } + + return data; + } + + getURL(ip: string, port: number, testServers: boolean, isPremium: boolean) { + if (port === 443) { + return `https://${ip}:${port}/apiw1${testServers ? '_test' : ''}${isPremium ? '_premium' : ''}`; + } else { + return `http://${ip}:${port}/apiw1${testServers ? '_test' : ''}${isPremium ? '_premium' : ''}`; + } + } + + async connect(port: number, ip: string, testServers = false, isPremium = false) { + this.stream = []; + this.canRead = new Promise((resolve, reject) => { + this.resolveRead = resolve; + this.rejectRead = reject; + }); + this.url = this.getURL(ip, port, testServers, isPremium); + + await fetch(this.url, { + method: 'POST', + body: Buffer.from([]), + mode: 'cors', + }); + + this.isClosed = false; + } + + async write(data: Buffer) { + if (this.isClosed || !this.url) { + this.handleDisconnect(); + throw closeError; + } + + return fetch(this.url, { + method: 'POST', + body: data, + mode: 'cors', + }).then(async (response) => { + if (this.isClosed) { + this.handleDisconnect(); + return; + } + if (response.status !== 200) { + this.handleDisconnect(); + throw closeError; + } + + const arrayBuffer = await response.arrayBuffer(); + + this.stream = this.stream.concat(Buffer.from(arrayBuffer)); + if (this.resolveRead && !this.isClosed) this.resolveRead(); + }); + } + + handleDisconnect() { + this.disconnectedCallback?.(); + if (this.rejectRead) this.rejectRead(); + } + + close() { + this.isClosed = true; + this.handleDisconnect(); + this.disconnectedCallback = undefined; + } +} + +export default HttpStream; diff --git a/src/lib/gramjs/extensions/MessagePacker.js b/src/lib/gramjs/extensions/MessagePacker.js index 78de69c60..f0023d315 100644 --- a/src/lib/gramjs/extensions/MessagePacker.js +++ b/src/lib/gramjs/extensions/MessagePacker.js @@ -82,13 +82,39 @@ class MessagePacker { this.setReady(true); } - async get() { + async getBeacon(state) { + const buffer = new BinaryWriter(Buffer.alloc(0)); + const size = state.data.length + TLMessage.SIZE_OVERHEAD; + if (size <= MessageContainer.MAXIMUM_SIZE) { + let afterId; + if (state.after) { + afterId = state.after.msgId; + } + state.msgId = await this._state.writeDataAsMessage( + buffer, state.data, state.request.classType === 'request', afterId, + ); + this._log.debug(`Assigned msgId = ${state.msgId} to ${state.request.className + || state.request.constructor.name}`); + + return buffer.getValue(); + } + this._log.warn(`Message payload for ${state.request.className + || state.request.constructor.name} is too long ${state.data.length} and cannot be sent`); + state.reject('Request Payload is too big'); + + return undefined; + } + + async wait() { if (!this._queue.length) { this._ready = new Promise(((resolve) => { this.setReady = resolve; })); await this._ready; } + } + + async get() { if (!this._queue[this._queue.length - 1]) { this._queue = []; return undefined; diff --git a/src/lib/gramjs/extensions/PromisedWebSockets.js b/src/lib/gramjs/extensions/PromisedWebSockets.js index be2bb0115..24942d680 100644 --- a/src/lib/gramjs/extensions/PromisedWebSockets.js +++ b/src/lib/gramjs/extensions/PromisedWebSockets.js @@ -3,6 +3,8 @@ const { Mutex } = require('async-mutex'); const mutex = new Mutex(); const closeError = new Error('WebSocket was closed'); +const CONNECTION_TIMEOUT = 10000; +const MAX_TIMEOUT = 30000; class PromisedWebSockets { constructor(disconnectedCallback) { @@ -16,6 +18,7 @@ class PromisedWebSockets { this.client = undefined; this.closed = true; this.disconnectedCallback = disconnectedCallback; + this.timeout = CONNECTION_TIMEOUT; } async readExactly(number) { @@ -80,14 +83,20 @@ class PromisedWebSockets { this.website = this.getWebSocketLink(ip, port, testServers, isPremium); this.client = new WebSocket(this.website, 'binary'); return new Promise((resolve, reject) => { + let hasResolved = false; + let timeout; this.client.onopen = () => { this.receive(); resolve(this); + hasResolved = true; + if (timeout) clearTimeout(timeout); }; this.client.onerror = (error) => { // eslint-disable-next-line no-console console.error('WebSocket error', error); reject(error); + hasResolved = true; + if (timeout) clearTimeout(timeout); }; this.client.onclose = (event) => { const { code, reason, wasClean } = event; @@ -101,7 +110,25 @@ class PromisedWebSockets { if (this.disconnectedCallback) { this.disconnectedCallback(); } + hasResolved = true; + if (timeout) clearTimeout(timeout); }; + + timeout = setTimeout(() => { + if (hasResolved) return; + + reject(new Error('WebSocket connection timeout')); + this.resolveRead(false); + this.closed = true; + if (this.disconnectedCallback) { + this.disconnectedCallback(); + } + this.client.close(); + this.timeout *= 2; + this.timeout = Math.min(this.timeout, MAX_TIMEOUT); + timeout = undefined; + }, this.timeout); + // CONTEST // Seems to not be working, at least in a web worker // eslint-disable-next-line no-restricted-globals diff --git a/src/lib/gramjs/network/MTProtoSender.js b/src/lib/gramjs/network/MTProtoSender.js index 01a5963d0..67eda6099 100644 --- a/src/lib/gramjs/network/MTProtoSender.js +++ b/src/lib/gramjs/network/MTProtoSender.js @@ -34,11 +34,17 @@ const { MsgsStateReq, MsgResendReq, MsgsAllInfo, + HttpWait, } = require('../tl').constructors; const { SecurityError } = require('../errors/Common'); const { InvalidBufferError } = require('../errors/Common'); const { RPCMessageToError } = require('../errors'); const { TypeNotFoundError } = require('../errors/Common'); +const { sendToOrigin } = require('../../../api/gramjs/worker/worker'); + +const LONGPOLL_MAX_WAIT = 3000; +const LONGPOLL_MAX_DELAY = 500; +const LONGPOLL_WAIT_AFTER = 150; /** * MTProto Mobile Protocol sender @@ -57,7 +63,11 @@ class MTProtoSender { static DEFAULT_OPTIONS = { logger: undefined, retries: Infinity, + retriesToFallback: 1, delay: 2000, + retryMainConnectionDelay: 10000, + shouldForceHttpTransport: false, + shouldAllowHttpTransport: false, autoReconnect: true, connectTimeout: undefined, authKeyCallback: undefined, @@ -65,6 +75,8 @@ class MTProtoSender { autoReconnectCallback: undefined, isMainSender: undefined, onConnectionBreak: undefined, + isExported: undefined, + getShouldDebugExportedSenders: undefined, }; /** @@ -74,17 +86,26 @@ class MTProtoSender { constructor(authKey, opts) { const args = { ...MTProtoSender.DEFAULT_OPTIONS, ...opts }; this._connection = undefined; + this._fallbackConnection = undefined; + this._shouldForceHttpTransport = args.shouldForceHttpTransport; + this._shouldAllowHttpTransport = args.shouldAllowHttpTransport; this._log = args.logger; this._dcId = args.dcId; + this._senderIndex = args.senderIndex; this._retries = args.retries; + this._retriesToFallback = args.retriesToFallback; this._delay = args.delay; + this._retryMainConnectionDelay = args.retryMainConnectionDelay; this._autoReconnect = args.autoReconnect; this._connectTimeout = args.connectTimeout; this._authKeyCallback = args.authKeyCallback; this._updateCallback = args.updateCallback; this._autoReconnectCallback = args.autoReconnectCallback; this._isMainSender = args.isMainSender; + this._isExported = args.isExported; this._onConnectionBreak = args.onConnectionBreak; + this._isFallback = false; + this._getShouldDebugExportedSenders = args.getShouldDebugExportedSenders; /** * whether we disconnected ourself or telegram did it. @@ -107,6 +128,7 @@ class MTProtoSender { * We need to join the loops upon disconnection */ this._send_loop_handle = undefined; + this._long_poll_loop_handle = undefined; this._recv_loop_handle = undefined; /** @@ -120,6 +142,7 @@ class MTProtoSender { * Note that here we're also storing their ``_RequestState``. */ this._send_queue = new MessagePacker(this._state, this._log); + this._send_queue_long_poll = new MessagePacker(this._state, this._log); /** * Sent states are remembered until a response is received. @@ -162,13 +185,34 @@ class MTProtoSender { // Public API + logWithIndexCallback(level) { + return (...args) => { + if (!this._getShouldDebugExportedSenders + || !this._getShouldDebugExportedSenders()) return; + // eslint-disable-next-line no-console + console[level](`[${this._isExported ? `idx=${this._senderIndex} ` : 'M '}dcId=${this._dcId}]`, ...args); + }; + } + + logWithIndex = { + debug: this.logWithIndexCallback('debug'), + log: this.logWithIndexCallback('log'), + warn: this.logWithIndexCallback('warn'), + error: this.logWithIndexCallback('error'), + }; + + getConnection() { + return this._isFallback ? this._fallbackConnection : this._connection; + } + /** * Connects to the specified given connection using the given auth key. * @param connection * @param [force] + * @param fallbackConnection * @returns {Promise} */ - async connect(connection, force) { + async connect(connection, force, fallbackConnection) { this.userDisconnected = false; if (this._user_connected && !force) { @@ -176,11 +220,20 @@ class MTProtoSender { return false; } this.isConnecting = true; + this._isFallback = this._shouldForceHttpTransport && this._shouldAllowHttpTransport; this._connection = connection; + this._fallbackConnection = fallbackConnection; - for (let attempt = 0; attempt < this._retries; attempt++) { + for (let attempt = 0; attempt < this._retries + this._retriesToFallback; attempt++) { try { - await this._connect(); + if (attempt >= this._retriesToFallback && this._shouldAllowHttpTransport) { + this._isFallback = true; + this.logWithIndex.warn('Using fallback connection'); + this._log.warn('Using fallback connection'); + } + this.logWithIndex.warn('Connecting...'); + await this._connect(this.getConnection()); + this.logWithIndex.warn('Connected!'); if (this._updateCallback) { this._updateCallback(new UpdateConnectionState(UpdateConnectionState.connected)); } @@ -189,7 +242,7 @@ class MTProtoSender { if (this._updateCallback && attempt === 0) { this._updateCallback(new UpdateConnectionState(UpdateConnectionState.disconnected)); } - this._log.error(`WebSocket connection failed attempt: ${attempt + 1}`); + this._log.error(`${this._isFallback ? 'HTTP' : 'WebSocket'} connection failed attempt: ${attempt + 1}`); // eslint-disable-next-line no-console console.error(err); await Helpers.sleep(this._delay); @@ -197,9 +250,41 @@ class MTProtoSender { } this.isConnecting = false; + if (this._isFallback && !this._shouldForceHttpTransport) { + void this.tryReconnectToMain(); + } + return true; } + async tryReconnectToMain() { + if (!this.isConnecting && this._isFallback && !this._isReconnectingToMain && !this.isReconnecting + && !this._shouldForceHttpTransport && !this._isExported) { + this._log.debug('Trying to reconnect to main connection'); + this._isReconnectingToMain = true; + try { + await this._connection.connect(); + this._log.info('Reconnected to main connection'); + this.logWithIndex.warn('Reconnected to main connection'); + this.isReconnecting = true; + await this._disconnect(this._fallbackConnection); + await this.connect(this._connection, true, this._fallbackConnection); + this.isReconnecting = false; + this._isReconnectingToMain = false; + } catch (e) { + this.isReconnecting = false; + this._isReconnectingToMain = false; + this._log.error( + `Failed to reconnect to main connection, retrying in ${this._retryMainConnectionDelay}ms`, + ); + await Helpers.sleep(this._retryMainConnectionDelay); + void this.tryReconnectToMain(); + } + } else { + await Helpers.sleep(this._retryMainConnectionDelay); + } + } + isConnected() { return this._user_connected; } @@ -210,7 +295,8 @@ class MTProtoSender { */ async disconnect() { this.userDisconnected = true; - await this._disconnect(); + this.logWithIndex.warn('Disconnecting...'); + await this._disconnect(this.getConnection()); } /** @@ -237,11 +323,17 @@ class MTProtoSender { impossible to await receive a result that was never sent. * @param request * @param abortSignal + * @param isLongPoll * @returns {RequestState} */ - send(request, abortSignal) { + send(request, abortSignal, isLongPoll = false) { const state = new RequestState(request, abortSignal); - this._send_queue.append(state); + if (!isLongPoll) { + this.logWithIndex.debug(`Send ${request.className}`); + this._send_queue.append(state); + } else { + this._send_queue_long_poll.append(state); + } return state.promise; } @@ -249,6 +341,17 @@ class MTProtoSender { this._send_queue.append(state); } + async sendBeacon(request) { + if (!this._user_connected) { + throw new Error('Cannot send requests while disconnected'); + } + const state = new RequestState(request, undefined); + const data = await this._send_queue.getBeacon(state); + const encryptedData = await this._state.encryptMessageData(data); + + sendToOrigin({ type: 'sendBeacon', data: encryptedData, url: this._fallbackConnection.socket.website }); + } + /** * Performs the actual connection, retrying, generating the * authorization key if necessary, and starting the send and @@ -256,13 +359,15 @@ class MTProtoSender { * @returns {Promise} * @private */ - async _connect() { - this._log.info('Connecting to {0}...'.replace('{0}', this._connection)); - await this._connection.connect(); - this._log.debug('Connection success!'); - // process.exit(0) + async _connect(connection) { + if (!connection.isConnected()) { + this._log.info('Connecting to {0}...'.replace('{0}', connection)); + await connection.connect(); + this._log.debug('Connection success!'); + } + if (!this.authKey.getKey()) { - const plain = new MtProtoPlainSender(this._connection, this._log); + const plain = new MtProtoPlainSender(connection, this._log); this._log.debug('New auth_key attempt ...'); const res = await doAuthentication(plain, this._log); this._log.debug('Generated new auth_key successfully'); @@ -290,33 +395,80 @@ class MTProtoSender { this._user_connected = true; this.isReconnecting = false; - this._log.debug('Starting send loop'); - this._send_loop_handle = this._sendLoop(); + if (!this._send_loop_handle) { + this._log.debug('Starting send loop'); + this._send_loop_handle = this._sendLoop(); + } - this._log.debug('Starting receive loop'); - this._recv_loop_handle = this._recvLoop(); + if (!this._recv_loop_handle) { + this._log.debug('Starting receive loop'); + this._recv_loop_handle = this._recvLoop(); + } + + if (!this._long_poll_loop_handle && connection.shouldLongPoll) { + this._log.debug('Starting long-poll loop'); + this._long_poll_loop_handle = this._longPollLoop(); + } // _disconnected only completes after manual disconnection // or errors after which the sender cannot continue such // as failing to reconnect or any unexpected error. - this._log.info('Connection to %s complete!'.replace('%s', this._connection.toString())); + this._log.info('Connection to %s complete!'.replace('%s', connection.toString())); } - async _disconnect() { + async _disconnect(connection) { if (this._updateCallback) { this._updateCallback(new UpdateConnectionState(UpdateConnectionState.disconnected)); } - if (this._connection === undefined) { + if (connection === undefined) { this._log.info('Not disconnecting (already have no connection)'); return; } - this._log.info('Disconnecting from %s...'.replace('%s', this._connection.toString())); + this._log.info('Disconnecting from %s...'.replace('%s', connection.toString())); this._user_connected = false; this._log.debug('Closing current connection...'); - await this._connection.disconnect(); + this.logWithIndex.warn('Disconnecting'); + await connection.disconnect(); + } + + async _longPollLoop() { + while (this._user_connected && !this.isReconnecting && this._isFallback + && this.getConnection().shouldLongPoll) { + await this._send_queue_long_poll.wait(); + + const res = await this._send_queue_long_poll.get(); + + if (this.isReconnecting || !this._isFallback) { + this._long_poll_loop_handle = undefined; + return; + } + + if (!res) { + continue; + } + let { data } = res; + const { batch } = res; + this._log.debug(`Encrypting ${batch.length} message(s) in ${data.length} bytes for sending`); + + data = await this._state.encryptMessageData(data); + + try { + await this._fallbackConnection.send(data); + } catch (e) { + this._log.error(e); + this._log.info('Connection closed while sending data'); + this._long_poll_loop_handle = undefined; + return; + } + + this.isSendingLongPoll = false; + this.checkLongPoll(); + } + + this._long_poll_loop_handle = undefined; } /** @@ -332,22 +484,46 @@ class MTProtoSender { this._pending_state.clear(); while (this._user_connected && !this.isReconnecting) { - if (this._pending_ack.size) { - const ack = new RequestState(new MsgsAck({ msgIds: Array(...this._pending_ack) })); - this._send_queue.append(ack); - this._last_acks.push(ack); - if (this._last_acks.length >= 10) { - this._last_acks.shift(); + const appendAcks = () => { + if (this._pending_ack.size) { + const ack = new RequestState(new MsgsAck({ msgIds: Array(...this._pending_ack) })); + this._send_queue.append(ack); + this._last_acks.push(ack); + if (this._last_acks.length >= 10) { + this._last_acks.shift(); + } + this._pending_ack.clear(); } - this._pending_ack.clear(); - } - this._log.debug(`Waiting for messages to send...${this.isReconnecting}`); + }; + + appendAcks(); + + this.logWithIndex.debug(`Waiting for messages to send... ${this.isReconnecting}`); + this._log.debug(`Waiting for messages to send... ${this.isReconnecting}`); // TODO Wait for the connection send queue to be empty? // This means that while it's not empty we can wait for // more messages to be added to the send queue. + await this._send_queue.wait(); + + if (this._isFallback) { + // We don't long-poll on main loop, instead we have a separate loop for that + this.send(new HttpWait({ + maxDelay: 0, + waitAfter: 0, + maxWait: 0, + })); + } + + // If we've had new ACKs appended while waiting for messages to send, add them to queue + appendAcks(); + const res = await this._send_queue.get(); + this.logWithIndex.debug(`Got ${res?.batch.length} message(s) to send`); + if (this.isReconnecting) { + this.logWithIndex.debug('Reconnecting :('); + this._send_loop_handle = undefined; return; } @@ -357,14 +533,17 @@ class MTProtoSender { let { data } = res; const { batch } = res; this._log.debug(`Encrypting ${batch.length} message(s) in ${data.length} bytes for sending`); + this.logWithIndex.debug('Sending', batch.map((m) => m.request.className)); data = await this._state.encryptMessageData(data); try { - await this._connection.send(data); + await this.getConnection().send(data); } catch (e) { + this.logWithIndex.debug(`Connection closed while sending data ${e}`); this._log.error(e); this._log.info('Connection closed while sending data'); + this._send_loop_handle = undefined; return; } for (const state of batch) { @@ -372,16 +551,25 @@ class MTProtoSender { if (state.request.classType === 'request') { this._pending_state.set(state.msgId, state); } + if (state.request.className === 'HttpWait') { + state.resolve(); + } } else { for (const s of state) { if (s.request.classType === 'request') { this._pending_state.set(s.msgId, s); } + if (s.request.className === 'HttpWait') { + state.resolve(); + } } } } + this.logWithIndex.debug('Encrypted messages put in a queue to be sent'); this._log.debug('Encrypted messages put in a queue to be sent'); } + + this._send_loop_handle = undefined; } async _recvLoop() { @@ -389,10 +577,10 @@ class MTProtoSender { let message; while (this._user_connected && !this.isReconnecting) { - // this._log.debug('Receiving items from the network...'); this._log.debug('Receiving items from the network...'); + this.logWithIndex.debug('Receiving items from the network...'); try { - body = await this._connection.recv(); + body = await this.getConnection().recv(); } catch (e) { // this._log.info('Connection closed while receiving data'); /** when the server disconnects us we want to reconnect */ @@ -401,11 +589,14 @@ class MTProtoSender { this._log.warn('Connection closed while receiving data'); this.reconnect(); } + this._recv_loop_handle = undefined; return; } + try { message = await this._state.decryptMessageData(body); } catch (e) { + this.logWithIndex.debug(`Error while receiving items from the network ${e.toString()}`); if (e instanceof TypeNotFoundError) { // Received object which we don't know how to deserialize this._log.info(`Type ${e.invalidConstructorId} not found, remaining data ${e.remaining}`); @@ -426,11 +617,13 @@ class MTProtoSender { this._log.warn(`Invalid buffer ${e.code} for dc ${this._dcId}`); this.reconnect(); } + this._recv_loop_handle = undefined; return; } else { this._log.error('Unhandled error while receiving data'); this._log.error(e); this.reconnect(); + this._recv_loop_handle = undefined; return; } } @@ -448,7 +641,22 @@ class MTProtoSender { this._log.error(e); } } + + void this.checkLongPoll(); } + + this._recv_loop_handle = undefined; + } + + checkLongPoll() { + if (this.isSendingLongPoll || !this._isFallback) return; + + this.isSendingLongPoll = true; + this.send(new HttpWait({ + maxDelay: LONGPOLL_MAX_DELAY, + waitAfter: LONGPOLL_WAIT_AFTER, + maxWait: LONGPOLL_MAX_WAIT, + }), undefined, true); } _handleBadAuthKey(shouldSkipForMain) { @@ -476,7 +684,14 @@ class MTProtoSender { * @private */ async _processMessage(message) { + if (message.obj.className === 'MsgsAck') return; + this.logWithIndex.debug(`Process message ${message.obj.className}`); + this._pending_ack.add(message.msgId); + + if (this.getConnection().shouldLongPoll) { + this._send_queue.setReady(true); + } // eslint-disable-next-line require-atomic-updates message.obj = await message.obj; let handler = this._handlers[message.obj.CONSTRUCTOR_ID]; @@ -549,14 +764,18 @@ class MTProtoSender { throw new TypeNotFoundError('Not an upload.File'); } } catch (e) { - this._log.error(e); if (e instanceof TypeNotFoundError) { this._log.info(`Received response without parent request: ${result.body}`); return; - } else { - throw e; + } else if (this._isFallback) { + // If we're using HTTP transport, there might be a chance that the response comes through + // multiple times if didn't send acknowledgment in time, so we should just ignore it + return; } + + throw e; } + return; } if (result.error) { @@ -569,6 +788,7 @@ class MTProtoSender { try { const reader = new BinaryReader(result.body); const read = state.request.readResult(reader); + this.logWithIndex.debug('Handling RPC result', read); state.resolve(read); } catch (err) { state.reject(err); @@ -808,6 +1028,7 @@ class MTProtoSender { // in case of internal server issues. Helpers.sleep(1000) .then(() => { + this.logWithIndex.log('Reconnecting...'); this._log.info('Started reconnecting'); this._reconnect(); }); @@ -817,7 +1038,8 @@ class MTProtoSender { async _reconnect() { this._log.debug('Closing current connection...'); try { - await this._disconnect(); + this.logWithIndex.warn('[Reconnect] Closing current connection...'); + await this._disconnect(this.getConnection()); } catch (err) { this._log.warn(err); } @@ -833,9 +1055,18 @@ class MTProtoSender { this._connection._log, this._connection._testServers, ); - await this.connect(newConnection, true); + const newFallbackConnection = new this._fallbackConnection.constructor( + this._connection._ip, + this._connection._port, + this._connection._dcId, + this._connection._log, + this._connection._testServers, + ); + await this.connect(newConnection, true, newFallbackConnection); this.isReconnecting = false; + this._send_queue.prepend(this._pending_state.values()); + this._pending_state.clear(); if (this._autoReconnectCallback) { await this._autoReconnectCallback(); diff --git a/src/lib/gramjs/network/connection/Connection.js b/src/lib/gramjs/network/connection/Connection.js index f2e054fae..be966916b 100644 --- a/src/lib/gramjs/network/connection/Connection.js +++ b/src/lib/gramjs/network/connection/Connection.js @@ -1,4 +1,5 @@ const PromisedWebSockets = require('../../extensions/PromisedWebSockets'); +const HttpStream = require('../../extensions/HttpStream').default; const AsyncQueue = require('../../extensions/AsyncQueue'); /** @@ -31,9 +32,14 @@ class Connection { this._recvArray = new AsyncQueue(); // this.socket = new PromiseSocket(new Socket()) + this.shouldLongPoll = false; this.socket = new PromisedWebSockets(this.disconnectCallback.bind(this)); } + isConnected() { + return this._connected; + } + async disconnectCallback() { await this.disconnect(true); } @@ -178,8 +184,36 @@ class PacketCodec { } } +class HttpConnection extends Connection { + constructor(ip, port, dcId, loggers, testServers, isPremium) { + super(ip, port, dcId, loggers, testServers, isPremium); + this.shouldLongPoll = true; + this.socket = new HttpStream(this.disconnectCallback.bind(this)); + } + + send(data) { + return this.socket.write(data); + } + + recv() { + return this.socket.read(); + } + + async _connect() { + this._log.debug('Connecting'); + await this.socket.connect(this._port, this._ip, this._testServers, this._isPremium); + this._log.debug('Finished connecting'); + } + + async connect() { + await this._connect(); + this._connected = true; + } +} + module.exports = { Connection, PacketCodec, ObfuscatedConnection, + HttpConnection, }; diff --git a/src/lib/gramjs/network/connection/index.js b/src/lib/gramjs/network/connection/index.js index 737623155..e350e8873 100644 --- a/src/lib/gramjs/network/connection/index.js +++ b/src/lib/gramjs/network/connection/index.js @@ -1,10 +1,11 @@ -const { Connection } = require('./Connection'); +const { Connection, HttpConnection } = require('./Connection'); const { ConnectionTCPFull } = require('./TCPFull'); const { ConnectionTCPAbridged } = require('./TCPAbridged'); const { ConnectionTCPObfuscated } = require('./TCPObfuscated'); module.exports = { Connection, + HttpConnection, ConnectionTCPFull, ConnectionTCPAbridged, ConnectionTCPObfuscated, diff --git a/src/lib/gramjs/network/index.js b/src/lib/gramjs/network/index.js index 9b710d329..6bdf19d92 100644 --- a/src/lib/gramjs/network/index.js +++ b/src/lib/gramjs/network/index.js @@ -6,6 +6,7 @@ const { ConnectionTCPFull, ConnectionTCPAbridged, ConnectionTCPObfuscated, + HttpConnection, } = require('./connection'); const { @@ -15,6 +16,7 @@ const { module.exports = { Connection, + HttpConnection, ConnectionTCPFull, ConnectionTCPAbridged, ConnectionTCPObfuscated, diff --git a/src/lib/secret-sauce/secretsauce.ts b/src/lib/secret-sauce/secretsauce.ts index 5d340134b..2a9360c6f 100644 --- a/src/lib/secret-sauce/secretsauce.ts +++ b/src/lib/secret-sauce/secretsauce.ts @@ -148,7 +148,7 @@ function updateGroupCallStreams(userId: string) { } async function getUserStream(streamType: StreamType, facing: VideoFacingModeEnum = 'user') { - if(streamType === 'audio' && state?.audioStream) { + if (streamType === 'audio' && state?.audioStream) { return state.audioStream; } @@ -162,19 +162,19 @@ async function getUserStream(streamType: StreamType, facing: VideoFacingModeEnum const media = await navigator.mediaDevices.getUserMedia({ audio: streamType === 'audio' ? { // @ts-ignore - ...(IS_ECHO_CANCELLATION_SUPPORTED && { echoCancellation: true }), - ...(IS_NOISE_SUPPRESSION_SUPPORTED && { noiseSuppression: true }), + ...(IS_ECHO_CANCELLATION_SUPPORTED && {echoCancellation: true}), + ...(IS_NOISE_SUPPRESSION_SUPPORTED && {noiseSuppression: true}), } : false, video: streamType === 'video' ? { facingMode: facing, } : false, }); - if(state && streamType === 'audio'){ + if (state && streamType === 'audio') { state.audioStream = media; } - if(streamType === 'video') { + if (streamType === 'video') { const vid = document.createElement('video'); vid.srcObject = media; diff --git a/src/types/index.ts b/src/types/index.ts index 15b7a9159..34ca05a4d 100644 --- a/src/types/index.ts +++ b/src/types/index.ts @@ -100,6 +100,10 @@ export interface ISettings extends NotifySettings, Record { doNotTranslate: string[]; canDisplayChatInTitle: boolean; shouldShowLoginCodeInChatList?: boolean; + shouldForceHttpTransport?: boolean; + shouldAllowHttpTransport?: boolean; + shouldCollectDebugLogs?: boolean; + shouldDebugExportedSenders?: boolean; } export interface ApiPrivacySettings { diff --git a/src/util/debugConsole.ts b/src/util/debugConsole.ts new file mode 100644 index 000000000..09977060a --- /dev/null +++ b/src/util/debugConsole.ts @@ -0,0 +1,50 @@ +/* eslint-disable no-console */ + +export const DEBUG_LEVELS = ['log', 'error', 'warn', 'info', 'debug'] as const; +export type DebugLevel = typeof DEBUG_LEVELS[number]; +// @ts-ignore +const ORIGINAL_FUNCTIONS: Record void> = DEBUG_LEVELS.reduce((acc, level) => { + // @ts-ignore + acc[level] = console[level]; + return acc; +}, {}); + +type DebugEntry = { + level: DebugLevel; + args: any[]; + date: Date; +}; +let DEBUG_LOGS: DebugEntry[] = []; + +export function logDebugMessage(level: DebugLevel, ...args: any[]) { + DEBUG_LOGS.push({ + level, + args, + date: new Date(), + }); + ORIGINAL_FUNCTIONS[level](...args); +} + +export function initDebugConsole() { + DEBUG_LOGS = []; + DEBUG_LEVELS.forEach((level) => { + // @ts-ignore + console[level] = (...args: any[]) => { + logDebugMessage(level, ...args); + }; + }); +} + +export function disableDebugConsole() { + DEBUG_LEVELS.forEach((level) => { + // @ts-ignore + console[level] = ORIGINAL_FUNCTIONS[level]; + }); + DEBUG_LOGS = []; +} + +export function getDebugLogs() { + return JSON.stringify(DEBUG_LOGS, (key, value) => (typeof value === 'bigint' + ? value.toString() + : value)); +} diff --git a/src/util/withAbortCheck.ts b/src/util/withAbortCheck.ts index c2b784561..41471f40a 100644 --- a/src/util/withAbortCheck.ts +++ b/src/util/withAbortCheck.ts @@ -4,8 +4,8 @@ export class AbortError extends Error { } } -export default async function withAbortCheck(abortSignal: AbortSignal, cb: Promise): Promise { - const result = await cb; +export default async function withAbortCheck(abortSignal: AbortSignal, promise: Promise): Promise { + const result = await promise; if (abortSignal?.aborted) { throw new AbortError();