GramJS: Fallback to HTTP and fix various connection issues

This commit is contained in:
Alexander Zinchuk 2023-07-20 15:58:36 +02:00
parent afd894a3a5
commit ef02a4a11d
41 changed files with 1047 additions and 149 deletions

32
dev/log.html Normal file
View File

@ -0,0 +1,32 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Logs Viewer</title>
</head>
<body>
<input type="file" id="log" accept="application/json">
<script>
document.querySelector("#log").addEventListener('change', (e) => {
const { files } = e.target;
const file = files[0];
const json = new FileReader();
json.onload = (e) => {
const { result } = e.target;
const logs = JSON.parse(result);
logs.forEach((log) => {
const { level, args, date, } = log;
const dateStr = "[" + new Date(date).toLocaleTimeString("en-GB") + "] ";
if(args[0] && typeof args[0] === 'string' && args[0].startsWith("%")) {
args[0] = dateStr + args[0];
} else {
args.unshift(dateStr);
}
console[level](...args);
});
};
json.readAsText(file);
})
</script>
</body>
</html>

View File

@ -6,6 +6,7 @@ const LOG_BACKGROUND = '#111111DD';
const LOG_PREFIX_COLOR = '#E4D00A';
const LOG_SUFFIX = {
INVOKE: '#49DBF5',
BEACON: '#F549DB',
RESPONSE: '#6887F7',
CONNECTING: '#E4D00A',
CONNECTED: '#26D907',

View File

@ -5,4 +5,5 @@ export {
handleMethodResponse,
updateFullLocalDb,
updateLocalDb,
setShouldEnableDebugLog,
} from './worker/provider';

View File

@ -5,7 +5,7 @@ import type {
} from '../../types';
import { Api as GramJs } from '../../../lib/gramjs';
import { invokeRequest } from './client';
import { invokeRequest, invokeRequestBeacon } from './client';
import {
buildInputGroupCall, buildInputPeer, buildInputPhoneCall, generateRandomInt,
} from '../gramjsBuilders';
@ -151,13 +151,20 @@ export async function fetchGroupCallParticipants({
}
export function leaveGroupCall({
call,
call, isPageUnload,
}: {
call: ApiGroupCall;
call: ApiGroupCall; isPageUnload?: boolean;
}) {
return invokeRequest(new GramJs.phone.LeaveGroupCall({
const request = new GramJs.phone.LeaveGroupCall({
call: buildInputGroupCall(call),
}), {
});
if (isPageUnload) {
invokeRequestBeacon(request);
return;
}
invokeRequest(request, {
shouldReturnTrue: true,
});
}
@ -270,14 +277,21 @@ export async function getDhConfig() {
}
export function discardCall({
call, isBusy,
call, isBusy, isPageUnload,
}: {
call: ApiPhoneCall; isBusy?: boolean;
call: ApiPhoneCall; isBusy?: boolean; isPageUnload?: boolean;
}) {
return invokeRequest(new GramJs.phone.DiscardCall({
const request = new GramJs.phone.DiscardCall({
peer: buildInputPhoneCall(call),
reason: isBusy ? new GramJs.PhoneCallDiscardReasonBusy() : new GramJs.PhoneCallDiscardReasonHangup(),
}), {
});
if (isPageUnload) {
invokeRequestBeacon(request);
return;
}
invokeRequest(request, {
shouldReturnTrue: true,
});
}

View File

@ -59,7 +59,8 @@ export async function init(_onUpdate: OnApiUpdate, initialArgs: ApiInitialArgs)
const {
userAgent, platform, sessionData, isTest, isMovSupported, isWebmSupported, maxBufferSize, webAuthToken, dcId,
mockScenario,
mockScenario, shouldForceHttpTransport, shouldAllowHttpTransport,
shouldDebugExportedSenders,
} = initialArgs;
const session = new sessions.CallbackSession(sessionData, onSessionUpdate);
@ -82,6 +83,9 @@ export async function init(_onUpdate: OnApiUpdate, initialArgs: ApiInitialArgs)
appVersion: `${APP_VERSION} ${APP_CODE_NAME}`,
useWSS: true,
additionalDcsDisabled: IS_TEST,
shouldDebugExportedSenders,
shouldForceHttpTransport,
shouldAllowHttpTransport,
testServers: isTest,
dcId,
} as any,
@ -292,6 +296,17 @@ export async function invokeRequest<T extends GramJs.AnyRequest>(
}
}
export function invokeRequestBeacon<T extends GramJs.AnyRequest>(
request: T,
dcId?: number,
) {
if (DEBUG) {
log('BEACON', request.className);
}
client.invokeBeacon(request, dcId);
}
export async function downloadMedia(
args: { url: string; mediaFormat: ApiMediaFormat; start?: number; end?: number; isHtmlAllowed?: boolean },
onProgress?: ApiOnProgress,
@ -455,3 +470,15 @@ export async function repairFileReference({
return false;
}
export function setForceHttpTransport(forceHttpTransport: boolean) {
client.setForceHttpTransport(forceHttpTransport);
}
export function setAllowHttpTransport(allowHttpTransport: boolean) {
client.setAllowHttpTransport(allowHttpTransport);
}
export function setShouldDebugExportedSenders(value: boolean) {
client.setShouldDebugExportedSenders(value);
}

View File

@ -1,5 +1,6 @@
export {
destroy, disconnect, downloadMedia, fetchCurrentUser, repairFileReference, abortChatRequests, abortRequestGroup,
setForceHttpTransport, setShouldDebugExportedSenders, setAllowHttpTransport,
} from './client';
export {

View File

@ -107,8 +107,8 @@ export function updateChannelState(channelId: string, pts: number) {
}
function applyUpdate(updateObject: SeqUpdate | PtsUpdate) {
if ('seq' in updateObject) {
if (updateObject.seq) localDb.commonBoxState.seq = updateObject.seq;
if ('seq' in updateObject && updateObject.seq) {
localDb.commonBoxState.seq = updateObject.seq;
localDb.commonBoxState.date = updateObject.date;
}
@ -165,7 +165,9 @@ function popSeqQueue() {
const localSeq = localDb.commonBoxState.seq;
const seqStart = 'seqStart' in update ? update.seqStart : update.seq;
if (seqStart === 0 || seqStart === localSeq + 1) {
if (seqStart === 0) {
applyUpdate(update);
} else if (seqStart === localSeq + 1) {
clearTimeout(seqTimeout);
seqTimeout = undefined;

View File

@ -11,12 +11,14 @@ import generateUniqueId from '../../../util/generateUniqueId';
import { pause } from '../../../util/schedulers';
import { getCurrentTabId, subscribeToMasterChange } from '../../../util/establishMultitabRole';
import Deferred from '../../../util/Deferred';
import { logDebugMessage } from '../../../util/debugConsole';
type RequestStates = {
messageId: string;
resolve: Function;
reject: Function;
callback?: AnyToVoidFunction;
DEBUG_payload?: any;
};
const HEALTH_CHECK_TIMEOUT = 150;
@ -82,6 +84,13 @@ export function initApi(onUpdate: OnApiUpdate, initialArgs: ApiInitialArgs) {
worker = new Worker(new URL('./worker.ts', import.meta.url));
subscribeToWorker(onUpdate);
if (requestStates.size > 0) {
requestStates.forEach((value) => {
// eslint-disable-next-line no-console
console.error('Hanging request', value.DEBUG_payload);
});
}
if (initialArgs.platform === 'iOS') {
setupIosHealthCheck();
}
@ -127,6 +136,13 @@ export function callApiOnMasterTab(payload: any) {
});
}
export function setShouldEnableDebugLog(value: boolean) {
return makeRequest({
type: 'toggleDebugMode',
isEnabled: value,
});
}
/*
* Call a worker method on this tab's worker, without transferring to master tab
* Mostly needed to disconnect worker when re-electing master
@ -244,6 +260,7 @@ export function cancelApiProgressMaster(messageId: string) {
function subscribeToWorker(onUpdate: OnApiUpdate) {
worker?.addEventListener('message', ({ data }: WorkerMessageEvent) => {
if (!data) return;
if (data.type === 'update') {
onUpdate(data.update);
} else if (data.type === 'methodResponse') {
@ -252,6 +269,10 @@ function subscribeToWorker(onUpdate: OnApiUpdate) {
handleMethodCallback(data);
} else if (data.type === 'unhandledError') {
throw new Error(data.error?.message);
} else if (data.type === 'sendBeacon') {
navigator.sendBeacon(data.url, data.data);
} else if (data.type === 'debugLog') {
logDebugMessage(data.level, ...data.args);
}
});
}
@ -344,6 +365,8 @@ function makeRequest(message: OriginRequest) {
requestStatesByCallback.set(callback, requestState);
}
requestState.DEBUG_payload = payload;
requestStates.set(messageId, requestState);
promise
@ -388,7 +411,7 @@ async function ensureWorkerPing() {
if (Date.now() - startedAt >= HEALTH_CHECK_MIN_DELAY) {
worker?.terminate();
worker = undefined;
updateCallback({ '@type': 'requestInitApi' });
updateCallback({ '@type': 'requestReconnectApi' });
}
} finally {
isResolved = true;

View File

@ -1,6 +1,7 @@
import type { ApiInitialArgs, ApiUpdate } from '../../types';
import type { Methods, MethodArgs, MethodResponse } from '../methods/types';
import type { LocalDb } from '../localDb';
import type { DebugLevel } from '../../../util/debugConsole';
export type ThenArg<T> = T extends Promise<infer U> ? U : T;
@ -19,6 +20,14 @@ export type WorkerMessageData = {
} | {
type: 'unhandledError';
error?: { message: string };
} | {
type: 'sendBeacon';
url: string;
data: ArrayBuffer;
} | {
type: 'debugLog';
level: DebugLevel;
args: any[];
};
export interface WorkerMessageEvent {
@ -38,6 +47,10 @@ export type OriginRequest = {
} | {
type: 'ping';
messageId?: string;
} | {
type: 'toggleDebugMode';
messageId?: string;
isEnabled?: boolean;
};
export type OriginMessageData = OriginRequest | {

View File

@ -1,18 +1,46 @@
/* eslint-disable no-console */
import type { ApiOnProgress, ApiUpdate } from '../../types';
import type { OriginMessageEvent, WorkerMessageData } from './types';
import { DEBUG } from '../../../config';
import { initApi, callApi, cancelApiProgress } from '../provider';
import { log } from '../helpers';
import type { DebugLevel } from '../../../util/debugConsole';
import { DEBUG_LEVELS } from '../../../util/debugConsole';
declare const self: WorkerGlobalScope;
const ORIGINAL_FUNCTIONS = DEBUG_LEVELS.reduce((acc, level) => {
acc[level] = console[level];
return acc;
}, {} as Record<DebugLevel, (...args: any[]) => void>);
function enableDebugLog() {
DEBUG_LEVELS.forEach((level) => {
console[level] = (...args: any[]) => {
postMessage({
type: 'debugLog',
level,
args: JSON.parse(JSON.stringify(args, (key, value) => (typeof value === 'bigint'
? value.toString()
: value))),
});
};
});
}
function disableDebugLog() {
DEBUG_LEVELS.forEach((level) => {
console[level] = ORIGINAL_FUNCTIONS[level];
});
}
handleErrors();
const callbackState = new Map<string, ApiOnProgress>();
if (DEBUG) {
// eslint-disable-next-line no-console
console.log('>>> FINISH LOAD WORKER');
}
@ -70,7 +98,6 @@ onmessage = async (message: OriginMessageEvent) => {
}
} catch (error: any) {
if (DEBUG) {
// eslint-disable-next-line no-console
console.error(error);
}
@ -105,18 +132,23 @@ onmessage = async (message: OriginMessageEvent) => {
break;
}
case 'toggleDebugMode': {
if (data.isEnabled) {
enableDebugLog();
} else {
disableDebugLog();
}
}
}
};
function handleErrors() {
self.onerror = (e) => {
// eslint-disable-next-line no-console
console.error(e);
sendToOrigin({ type: 'unhandledError', error: { message: e.error.message || 'Uncaught exception in worker' } });
};
self.addEventListener('unhandledrejection', (e) => {
// eslint-disable-next-line no-console
console.error(e);
sendToOrigin({ type: 'unhandledError', error: { message: e.reason.message || 'Uncaught rejection in worker' } });
});
@ -133,7 +165,7 @@ function onUpdate(update: ApiUpdate) {
}
}
function sendToOrigin(data: WorkerMessageData, arrayBuffer?: ArrayBuffer) {
export function sendToOrigin(data: WorkerMessageData, arrayBuffer?: ArrayBuffer) {
if (arrayBuffer) {
postMessage(data, [arrayBuffer]);
} else {

View File

@ -13,6 +13,9 @@ export interface ApiInitialArgs {
webAuthToken?: string;
dcId?: number;
mockScenario?: string;
shouldAllowHttpTransport?: boolean;
shouldForceHttpTransport?: boolean;
shouldDebugExportedSenders?: boolean;
}
export interface ApiOnProgress {

View File

@ -615,8 +615,8 @@ export type ApiUpdateMessageTranslations = {
toLanguageCode: string;
};
export type ApiRequestInitApi = {
'@type': 'requestInitApi';
export type ApiRequestReconnectApi = {
'@type': 'requestReconnectApi';
};
export type ApiRequestSync = {
@ -649,7 +649,7 @@ export type ApiUpdate = (
ApiUpdatePhoneCallConnectionState | ApiUpdateBotMenuButton | ApiUpdateTranscribedAudio | ApiUpdateUserEmojiStatus |
ApiUpdateMessageExtendedMedia | ApiUpdateConfig | ApiUpdateTopicNotifyExceptions | ApiUpdatePinnedTopic |
ApiUpdatePinnedTopicsOrder | ApiUpdateTopic | ApiUpdateTopics | ApiUpdateRecentEmojiStatuses |
ApiUpdateRecentReactions | ApiRequestInitApi | ApiRequestSync
ApiUpdateRecentReactions | ApiRequestReconnectApi | ApiRequestSync
);
export type OnApiUpdate = (update: ApiUpdate) => void;

View File

@ -9,7 +9,7 @@
.message-content.no-text & {
margin-bottom: calc(0.8125rem - 0.375rem);
&[dir=rtl] {
&[dir="rtl"] {
margin-bottom: 1.5rem;
}
}

View File

@ -2,11 +2,15 @@ import React, { memo } from '../../../lib/teact/teact';
import type { FC } from '../../../lib/teact/teact';
import { DEBUG_LOG_FILENAME } from '../../../config';
import { getActions, withGlobal } from '../../../global';
import { LOCAL_TGS_URLS } from '../../common/helpers/animatedAssets';
import { getDebugLogs } from '../../../util/debugConsole';
import download from '../../../util/download';
import useHistoryBack from '../../../hooks/useHistoryBack';
import useLang from '../../../hooks/useLang';
import useLastCallback from '../../../hooks/useLastCallback';
import AnimatedIcon from '../../common/AnimatedIcon';
import ListItem from '../../ui/ListItem';
@ -19,12 +23,20 @@ type OwnProps = {
type StateProps = {
shouldShowLoginCodeInChatList?: boolean;
shouldForceHttpTransport?: boolean;
shouldAllowHttpTransport?: boolean;
shouldCollectDebugLogs?: boolean;
shouldDebugExportedSenders?: boolean;
};
const SettingsExperimental: FC<OwnProps & StateProps> = ({
isActive,
onReset,
shouldShowLoginCodeInChatList,
shouldForceHttpTransport,
shouldAllowHttpTransport,
shouldCollectDebugLogs,
shouldDebugExportedSenders,
}) => {
const { requestConfetti, setSettingOption } = getActions();
const lang = useLang();
@ -34,6 +46,12 @@ const SettingsExperimental: FC<OwnProps & StateProps> = ({
onBack: onReset,
});
const handleDownloadLog = useLastCallback(() => {
const file = new File([getDebugLogs()], DEBUG_LOG_FILENAME, { type: 'text/plain' });
const url = URL.createObjectURL(file);
download(url, DEBUG_LOG_FILENAME);
});
return (
<div className="settings-content custom-scroll">
<div className="settings-content-header no-border">
@ -61,6 +79,43 @@ const SettingsExperimental: FC<OwnProps & StateProps> = ({
// eslint-disable-next-line react/jsx-no-bind
onCheck={() => setSettingOption({ shouldShowLoginCodeInChatList: !shouldShowLoginCodeInChatList })}
/>
<Checkbox
label="Allow HTTP Transport"
checked={Boolean(shouldAllowHttpTransport)}
// eslint-disable-next-line react/jsx-no-bind
onCheck={() => setSettingOption({ shouldAllowHttpTransport: !shouldAllowHttpTransport })}
/>
<Checkbox
label="Force HTTP Transport"
disabled={!shouldAllowHttpTransport}
checked={Boolean(shouldForceHttpTransport)}
// eslint-disable-next-line react/jsx-no-bind
onCheck={() => setSettingOption({ shouldForceHttpTransport: !shouldForceHttpTransport })}
/>
<Checkbox
label={lang('DebugMenuEnableLogs')}
checked={Boolean(shouldCollectDebugLogs)}
// eslint-disable-next-line react/jsx-no-bind
onCheck={() => setSettingOption({ shouldCollectDebugLogs: !shouldCollectDebugLogs })}
/>
<Checkbox
label="Enable exported senders debug"
checked={Boolean(shouldDebugExportedSenders)}
// eslint-disable-next-line react/jsx-no-bind
onCheck={() => setSettingOption({ shouldDebugExportedSenders: !shouldDebugExportedSenders })}
/>
<ListItem
// eslint-disable-next-line react/jsx-no-bind
onClick={handleDownloadLog}
icon="bug"
>
<div className="title">Download log</div>
</ListItem>
</div>
</div>
);
@ -70,6 +125,10 @@ export default memo(withGlobal(
(global): StateProps => {
return {
shouldShowLoginCodeInChatList: global.settings.byKey.shouldShowLoginCodeInChatList,
shouldForceHttpTransport: global.settings.byKey.shouldForceHttpTransport,
shouldAllowHttpTransport: global.settings.byKey.shouldAllowHttpTransport,
shouldCollectDebugLogs: global.settings.byKey.shouldCollectDebugLogs,
shouldDebugExportedSenders: global.settings.byKey.shouldDebugExportedSenders,
};
},
)(SettingsExperimental));

View File

@ -8,13 +8,14 @@ import type { ApiAttachMenuPeerType } from '../../../api/types';
import type { ISettings } from '../../../types';
import {
CONTENT_TYPES_WITH_PREVIEW, SUPPORTED_AUDIO_CONTENT_TYPES,
CONTENT_TYPES_WITH_PREVIEW, DEBUG_LOG_FILENAME, SUPPORTED_AUDIO_CONTENT_TYPES,
SUPPORTED_IMAGE_CONTENT_TYPES,
SUPPORTED_VIDEO_CONTENT_TYPES,
} from '../../../config';
import { IS_TOUCH_ENV } from '../../../util/windowEnvironment';
import { openSystemFilesDialog } from '../../../util/systemFilesDialog';
import { validateFiles } from '../../../util/files';
import { getDebugLogs } from '../../../util/debugConsole';
import useLastCallback from '../../../hooks/useLastCallback';
import useMouseInside from '../../../hooks/useMouseInside';
@ -41,6 +42,7 @@ export type OwnProps = {
isScheduled?: boolean;
attachBots: GlobalState['attachMenu']['bots'];
peerType?: ApiAttachMenuPeerType;
shouldCollectDebugLogs?: boolean;
onFileSelect: (files: File[], shouldSuggestCompression?: boolean) => void;
onPollCreate: () => void;
theme: ISettings['theme'];
@ -62,6 +64,7 @@ const AttachMenu: FC<OwnProps> = ({
onFileSelect,
onPollCreate,
theme,
shouldCollectDebugLogs,
}) => {
const [isAttachMenuOpen, openAttachMenu, closeAttachMenu] = useFlag();
const [handleMouseEnter, handleMouseLeave, markMouseInside] = useMouseInside(isAttachMenuOpen, closeAttachMenu);
@ -109,6 +112,11 @@ const AttachMenu: FC<OwnProps> = ({
), (e) => handleFileSelect(e, false));
});
const handleSendLogs = useLastCallback(() => {
const file = new File([getDebugLogs()], DEBUG_LOG_FILENAME, { type: 'text/plain' });
onFileSelect([file]);
});
const bots = useMemo(() => {
return Object.values(attachBots).filter((bot) => {
if (!peerType) return false;
@ -174,6 +182,11 @@ const AttachMenu: FC<OwnProps> = ({
{lang(!canSendDocuments && canSendAudios ? 'InputAttach.Popover.Music' : 'AttachDocument')}
</MenuItem>
)}
{canSendDocuments && shouldCollectDebugLogs && (
<MenuItem icon="bug" onClick={handleSendLogs}>
{lang('DebugSendLogs')}
</MenuItem>
)}
</>
)}
{canAttachPolls && (

View File

@ -206,6 +206,7 @@ type StateProps =
attachmentSettings: GlobalState['attachmentSettings'];
slowMode?: ApiChatFullInfo['slowMode'];
shouldUpdateStickerSetOrder?: boolean;
shouldCollectDebugLogs?: boolean;
};
enum MainButtonState {
@ -290,6 +291,7 @@ const Composer: FC<OwnProps & StateProps> = ({
theme,
slowMode,
shouldUpdateStickerSetOrder,
shouldCollectDebugLogs,
}) => {
const {
sendMessage,
@ -1492,6 +1494,7 @@ const Composer: FC<OwnProps & StateProps> = ({
isScheduled={shouldSchedule}
attachBots={attachBots}
peerType={attachMenuPeerType}
shouldCollectDebugLogs={shouldCollectDebugLogs}
theme={theme}
/>
{Boolean(botKeyboardMessageId) && (
@ -1684,6 +1687,7 @@ export default memo(withGlobal<OwnProps>(
attachmentSettings: global.attachmentSettings,
slowMode,
currentMessageList,
shouldCollectDebugLogs: global.settings.byKey.shouldCollectDebugLogs,
};
},
)(Composer));

View File

@ -14,6 +14,7 @@ export const IS_ELECTRON = process.env.IS_ELECTRON;
export const DEBUG = process.env.APP_ENV !== 'production';
export const DEBUG_MORE = false;
export const DEBUG_LOG_FILENAME = 'tt-log.json';
export const STRICTERDOM_ENABLED = DEBUG && !IS_ELECTRON;
export const BETA_CHANGELOG_URL = 'https://telegra.ph/WebA-Beta-03-20';

View File

@ -32,7 +32,7 @@ const HANG_UP_UI_DELAY = 500;
addActionHandler('leaveGroupCall', async (global, actions, payload): Promise<void> => {
const {
isFromLibrary, shouldDiscard, shouldRemove, rejoin,
tabId = getCurrentTabId(),
isPageUnload, tabId = getCurrentTabId(),
} = payload || {};
const groupCall = selectActiveGroupCall(global);
@ -51,7 +51,7 @@ addActionHandler('leaveGroupCall', async (global, actions, payload): Promise<voi
setGlobal(global);
await callApi('leaveGroupCall', {
call: groupCall,
call: groupCall, isPageUnload,
});
await callApi('abortRequestGroup', 'call');
@ -331,7 +331,7 @@ addActionHandler('setCallRating', (global, actions, payload): ActionReturnType =
});
addActionHandler('hangUp', (global, actions, payload): ActionReturnType => {
const { tabId = getCurrentTabId() } = payload || {};
const { isPageUnload, tabId = getCurrentTabId() } = payload || {};
const { phoneCall } = global;
if (!phoneCall) return undefined;
@ -354,7 +354,7 @@ addActionHandler('hangUp', (global, actions, payload): ActionReturnType => {
callApi('destroyPhoneCallState');
stopPhoneCall();
callApi('discardCall', { call: phoneCall });
callApi('discardCall', { call: phoneCall, isPageUnload });
if (phoneCall.state === 'requesting') {
global = {

View File

@ -2,7 +2,9 @@ import {
addActionHandler, getGlobal, setGlobal,
} from '../../index';
import { initApi, callApi, callApiLocal } from '../../../api/gramjs';
import {
initApi, callApi, callApiLocal, setShouldEnableDebugLog,
} from '../../../api/gramjs';
import {
LANG_CACHE_NAME,
@ -54,7 +56,12 @@ addActionHandler('initApi', async (global, actions): Promise<void> => {
webAuthToken: initialLocationHash?.tgWebAuthToken,
dcId: initialLocationHash?.tgWebAuthDcId ? Number(initialLocationHash?.tgWebAuthDcId) : undefined,
mockScenario: initialLocationHash?.mockScenario,
shouldAllowHttpTransport: global.settings.byKey.shouldAllowHttpTransport,
shouldForceHttpTransport: global.settings.byKey.shouldForceHttpTransport,
shouldDebugExportedSenders: global.settings.byKey.shouldDebugExportedSenders,
});
void setShouldEnableDebugLog(Boolean(global.settings.byKey.shouldCollectDebugLogs));
});
addActionHandler('setAuthPhoneNumber', (global, actions, payload): ActionReturnType => {

View File

@ -277,7 +277,7 @@ addActionHandler('loadMessageReactions', (global, actions, payload): ActionRetur
const chat = selectChat(global, chatId);
if (!chat) {
if (!chat || global.connectionState !== 'connectionStateReady') {
return;
}

View File

@ -58,7 +58,15 @@ addActionHandler('apiUpdate', (global, actions, update): ActionReturnType => {
onUpdateCurrentUser(global, update);
break;
case 'requestInitApi':
case 'requestReconnectApi':
global = getGlobal();
global = { ...global, isSynced: false };
setGlobal(global);
onUpdateConnectionState(global, actions, {
'@type': 'updateConnectionState',
connectionState: 'connectionStateConnecting',
});
actions.initApi();
break;

View File

@ -219,7 +219,7 @@ addActionHandler('createGroupCallInviteLink', async (global, actions, payload):
});
addActionHandler('joinVoiceChatByLink', async (global, actions, payload): Promise<void> => {
const { username, inviteHash, tabId = getCurrentTabId() } = payload!;
const { username, inviteHash, tabId = getCurrentTabId() } = payload;
const chat = await fetchChatByUsername(global, username);

View File

@ -1,6 +1,8 @@
import { addCallback } from '../../../lib/teact/teactn';
import { requestMutation } from '../../../lib/fasterdom/fasterdom';
import { addActionHandler, getActions } from '../../index';
import {
addActionHandler, getActions, getGlobal, setGlobal,
} from '../../index';
import { SettingsScreens } from '../../../types';
import type { ActionReturnType, GlobalState } from '../../types';
@ -13,12 +15,14 @@ import { updateTabState } from '../../reducers/tabs';
import { getCurrentTabId } from '../../../util/establishMultitabRole';
import { applyPerformanceSettings } from '../../../util/perfomanceSettings';
import { selectCanAnimateInterface, selectChatFolder } from '../../selectors';
import { callApi, setShouldEnableDebugLog } from '../../../api/gramjs';
import { disableDebugConsole, initDebugConsole } from '../../../util/debugConsole';
let prevGlobal: GlobalState | undefined;
addCallback((global: GlobalState) => {
// eslint-disable-next-line eslint-multitab-tt/no-getactions-in-actions
const { updatePageTitle } = getActions();
const { updatePageTitle, updateShouldDebugExportedSenders, updateShouldEnableDebugLog } = getActions();
const oldGlobal = prevGlobal;
prevGlobal = global;
@ -62,6 +66,53 @@ addCallback((global: GlobalState) => {
if (settings.canDisplayChatInTitle !== prevSettings.canDisplayChatInTitle) {
updatePageTitle();
}
if (settings.shouldForceHttpTransport !== prevSettings.shouldForceHttpTransport) {
callApi('setForceHttpTransport', Boolean(settings.shouldForceHttpTransport));
}
if (settings.shouldAllowHttpTransport !== prevSettings.shouldAllowHttpTransport) {
callApi('setAllowHttpTransport', Boolean(settings.shouldAllowHttpTransport));
if (!settings.shouldAllowHttpTransport && settings.shouldForceHttpTransport) {
global = getGlobal();
global = {
...global,
settings: {
...global.settings,
byKey: {
...global.settings.byKey,
shouldForceHttpTransport: false,
},
},
};
setGlobal(global);
}
}
if (settings.shouldDebugExportedSenders !== prevSettings.shouldDebugExportedSenders) {
updateShouldDebugExportedSenders();
}
if (settings.shouldCollectDebugLogs !== prevSettings.shouldCollectDebugLogs) {
updateShouldEnableDebugLog();
}
});
addActionHandler('updateShouldEnableDebugLog', (global): ActionReturnType => {
const { settings } = global;
if (settings.byKey.shouldCollectDebugLogs) {
setShouldEnableDebugLog(true);
initDebugConsole();
} else {
setShouldEnableDebugLog(false);
disableDebugConsole();
}
});
addActionHandler('updateShouldDebugExportedSenders', (global): ActionReturnType => {
const { settings } = global;
callApi('setShouldDebugExportedSenders', Boolean(settings.byKey.shouldDebugExportedSenders));
});
addActionHandler('setSettingOption', (global, actions, payload): ActionReturnType => {

View File

@ -223,6 +223,7 @@ export const INITIAL_GLOBAL_STATE: GlobalState = {
canTranslate: false,
doNotTranslate: [],
canDisplayChatInTitle: true,
shouldAllowHttpTransport: true,
},
themes: {
light: {

View File

@ -2302,6 +2302,7 @@ export interface ActionPayloads {
shouldDiscard?: boolean;
shouldRemove?: boolean;
rejoin?: ActionPayloads['joinGroupCall'];
isPageUnload?: boolean;
} & WithTabId) | undefined;
toggleGroupCallVideo: undefined;
@ -2335,7 +2336,7 @@ export interface ActionPayloads {
isVideo?: boolean;
} & WithTabId;
sendSignalingData: P2pMessage;
hangUp: WithTabId | undefined;
hangUp: ({ isPageUnload?: boolean } & WithTabId) | undefined;
acceptCall: undefined;
setCallRating: {
rating: number;
@ -2361,6 +2362,8 @@ export interface ActionPayloads {
skipLockOnUnload: undefined;
// Settings
updateShouldDebugExportedSenders: undefined;
updateShouldEnableDebugLog: undefined;
loadConfig: undefined;
loadAppConfig: {
hash: number;

View File

@ -54,6 +54,9 @@ async function init() {
getActions().initShared();
getActions().init();
getActions().updateShouldEnableDebugLog();
getActions().updateShouldDebugExportedSenders();
if (IS_MULTITAB_SUPPORTED) {
establishMultitabRole();
subscribeToMasterChange((isMasterTab) => {
@ -90,3 +93,9 @@ async function init() {
});
}
}
onBeforeUnload(() => {
const actions = getActions();
actions.leaveGroupCall?.({ isPageUnload: true });
actions.hangUp?.({ isPageUnload: true });
});

View File

@ -14,6 +14,8 @@ declare class TelegramClient {
request: R, dcId?: number, abortSignal?: AbortSignal, shouldRetryOnTimeout?: boolean,
): Promise<R['__response']>;
async invokeBeacon<R extends Api.AnyRequest>(request: R, dcId?: number): void;
async uploadFile(uploadParams: UploadFileParams): ReturnType<typeof uploadFile>;
async downloadFile(uploadParams: DownloadFileParams): ReturnType<typeof downloadFile>;
@ -24,6 +26,10 @@ declare class TelegramClient {
setPingCallback(callback: () => Promise<void>);
setForceHttpTransport: (forceHttpTransport: boolean) => void;
setAllowHttpTransport: (allowHttpTransport: boolean) => void;
// Untyped methods.
[prop: string]: any;
}

View File

@ -15,6 +15,7 @@ const {
ConnectionTCPObfuscated,
MTProtoSender,
UpdateConnectionState,
HttpConnection,
} = require('../network');
const {
authFlow,
@ -27,21 +28,13 @@ const {
getTmpPassword,
} = require('./2fa');
const RequestState = require('../network/RequestState');
const withAbortCheck = require('../../../util/withAbortCheck').default;
const { AbortError } = require('../../../util/withAbortCheck');
class ConnectTimeoutError extends Error {
constructor() {
super('Connection Timeout');
}
}
const Deferred = require('../../../util/Deferred').default;
const DEFAULT_DC_ID = 2;
const WEBDOCUMENT_DC_ID = 4;
const EXPORTED_SENDER_RECONNECT_TIMEOUT = 1000; // 1 sec
const EXPORTED_SENDER_RELEASE_TIMEOUT = 30000; // 30 sec
const WEBDOCUMENT_REQUEST_PART_SIZE = 131072; // 128kb
const EXPORTED_SENDER_CONNECT_TIMEOUT = 8000; // 8 sec
const PING_INTERVAL = 3000; // 3 sec
const PING_TIMEOUT = 5000; // 5 sec
@ -63,12 +56,15 @@ const sizeTypes = ['u', 'v', 'w', 'y', 'd', 'x', 'c', 'm', 'b', 'a', 's', 'f'];
class TelegramClient {
static DEFAULT_OPTIONS = {
connection: ConnectionTCPObfuscated,
fallbackConnection: HttpConnection,
useIPV6: false,
proxy: undefined,
timeout: 10,
requestRetries: 5,
connectionRetries: Infinity,
connectionRetriesToFallback: 1,
retryDelay: 1000,
retryMainConnectionDelay: 10000,
autoReconnect: true,
sequentialUpdates: false,
floodSleepLimit: 60,
@ -82,6 +78,9 @@ class TelegramClient {
additionalDcsDisabled: false,
testServers: false,
dcId: DEFAULT_DC_ID,
shouldAllowHttpTransport: false,
shouldForceHttpTransport: false,
shouldDebugExportedSenders: false,
};
/**
@ -100,6 +99,9 @@ class TelegramClient {
this.apiHash = apiHash;
this.defaultDcId = args.dcId || DEFAULT_DC_ID;
this._useIPV6 = args.useIPV6;
this._shouldForceHttpTransport = args.shouldForceHttpTransport;
this._shouldAllowHttpTransport = args.shouldAllowHttpTransport;
this._shouldDebugExportedSenders = args.shouldDebugExportedSenders;
// this._entityCache = new Set()
if (typeof args.baseLogger === 'string') {
this._log = new Logger();
@ -128,7 +130,9 @@ class TelegramClient {
this._requestRetries = args.requestRetries;
this._connectionRetries = args.connectionRetries;
this._connectionRetriesToFallback = args.connectionRetriesToFallback;
this._retryDelay = args.retryDelay || 0;
this._retryMainConnectionDelay = args.retryMainConnectionDelay || 0;
if (args.proxy) {
this._log.warn('proxies are not supported');
}
@ -137,6 +141,7 @@ class TelegramClient {
this._autoReconnect = args.autoReconnect;
this._connection = args.connection;
this._fallbackConnection = args.fallbackConnection;
// TODO add proxy support
this._floodWaitedRequests = {};
@ -165,13 +170,13 @@ class TelegramClient {
this._config = undefined;
this.phoneCodeHashes = [];
this._exportedSenderPromises = {};
this._exportedSenderAbortControllers = {};
this._waitingForAuthKey = {};
this._exportedSenderReleaseTimeouts = {};
this._additionalDcsDisabled = args.additionalDcsDisabled;
this._loopStarted = false;
this._isSwitchingDc = false;
this._destroyed = false;
this._connectedDeferred = new Deferred();
}
// region Connecting
@ -191,11 +196,16 @@ class TelegramClient {
logger: this._log,
dcId: this.session.dcId,
retries: this._connectionRetries,
retriesToFallback: this._connectionRetriesToFallback,
shouldForceHttpTransport: this._shouldForceHttpTransport,
shouldAllowHttpTransport: this._shouldAllowHttpTransport,
delay: this._retryDelay,
retryMainConnectionDelay: this._retryMainConnectionDelay,
autoReconnect: this._autoReconnect,
connectTimeout: this._timeout,
authKeyCallback: this._authKeyCallback.bind(this),
updateCallback: this._handleUpdate.bind(this),
getShouldDebugExportedSenders: this.getShouldDebugExportedSenders.bind(this),
isMainSender: true,
});
}
@ -208,8 +218,11 @@ class TelegramClient {
const connection = new this._connection(
this.session.serverAddress, this.session.port, this.session.dcId, this._log, this._args.testServers,
);
const fallbackConnection = new this._fallbackConnection(
this.session.serverAddress, this.session.port, this.session.dcId, this._log, this._args.testServers,
);
const newConnection = await this._sender.connect(connection);
const newConnection = await this._sender.connect(connection, undefined, fallbackConnection);
if (!newConnection) {
// we're already connected so no need to reset auth key.
if (!this._loopStarted) {
@ -228,6 +241,7 @@ class TelegramClient {
this._updateLoop();
this._loopStarted = true;
}
this._connectedDeferred.resolve();
this._isSwitchingDc = false;
}
@ -245,6 +259,28 @@ class TelegramClient {
this.pingCallback = callback;
}
async setForceHttpTransport(forceHttpTransport) {
this._shouldForceHttpTransport = forceHttpTransport;
await this.disconnect();
this._sender = undefined;
await this.connect();
}
async setAllowHttpTransport(allowHttpTransport) {
this._shouldAllowHttpTransport = allowHttpTransport;
await this.disconnect();
this._sender = undefined;
await this.connect();
}
setShouldDebugExportedSenders(shouldDebugExportedSenders) {
this._shouldDebugExportedSenders = shouldDebugExportedSenders;
}
getShouldDebugExportedSenders() {
return this._shouldDebugExportedSenders;
}
async _updateLoop() {
let lastPongAt;
@ -337,8 +373,13 @@ class TelegramClient {
}).flat(),
);
Object.values(this._exportedSenderReleaseTimeouts).forEach((timeouts) => {
Object.values(timeouts).forEach((releaseTimeout) => {
clearTimeout(releaseTimeout);
});
});
this._exportedSenderPromises = {};
this._exportedSenderAbortControllers = {};
this._waitingForAuthKey = {};
}
@ -369,6 +410,7 @@ class TelegramClient {
this.session.setAuthKey(undefined);
this._isSwitchingDc = true;
await this.disconnect();
this._sender = undefined;
return this.connect();
}
@ -383,9 +425,10 @@ class TelegramClient {
if (this.session.dcId !== dcId) {
this.session.setAuthKey(undefined, dcId);
}
// eslint-disable-next-line no-console
if (this._shouldDebugExportedSenders) console.log(`🧹 Cleanup idx=${index} dcId=${dcId}`);
const sender = await this._exportedSenderPromises[dcId][index];
delete this._exportedSenderPromises[dcId][index];
delete this._exportedSenderAbortControllers[dcId][index];
await sender.disconnect();
}
@ -400,7 +443,6 @@ class TelegramClient {
}
this._exportedSenderPromises[dcId] = {};
this._exportedSenderAbortControllers[dcId] = {};
await Promise.all(promises.map(async (promise) => {
const sender = await promise;
@ -408,7 +450,7 @@ class TelegramClient {
}));
}
async _connectSender(sender, dcId, index, isPremium = false, abortSignal) {
async _connectSender(sender, dcId, index, isPremium = false) {
// if we don't already have an auth key we want to use normal DCs not -1
let hasAuthKey = Boolean(sender.authKey.getKey());
let firstConnectResolver;
@ -432,7 +474,7 @@ class TelegramClient {
// eslint-disable-next-line no-constant-condition
while (true) {
try {
await withAbortCheck(abortSignal, sender.connect(new this._connection(
await sender.connect(new this._connection(
dc.ipAddress,
dc.port,
dcId,
@ -440,20 +482,24 @@ class TelegramClient {
this._args.testServers,
// Premium DCs are not stable for obtaining auth keys, so need to we first connect to regular ones
hasAuthKey ? isPremium : false,
)));
), undefined, new this._fallbackConnection(
dc.ipAddress,
dc.port,
dcId,
this._log,
this._args.testServers,
hasAuthKey ? isPremium : false,
));
if (this.session.dcId !== dcId && !sender._authenticated) {
this._log.info(`Exporting authorization for data center ${dc.ipAddress}`);
const auth = await withAbortCheck(
abortSignal,
this.invoke(new requests.auth.ExportAuthorization({ dcId })),
);
const auth = await this.invoke(new requests.auth.ExportAuthorization({ dcId }));
const req = this._initWith(new requests.auth.ImportAuthorization({
id: auth.id,
bytes: auth.bytes,
}));
await withAbortCheck(abortSignal, sender.send(req));
await sender.send(req);
sender._authenticated = true;
}
@ -465,13 +511,16 @@ class TelegramClient {
delete this._waitingForAuthKey[dcId];
}
if (this._shouldDebugExportedSenders) {
// eslint-disable-next-line no-console
console.warn(`✅ Connected to exported sender idx=${index} dc=${dcId}`);
}
return sender;
} catch (err) {
if (err instanceof AbortError) {
delete this._exportedSenderPromises[dcId][index];
delete this._exportedSenderAbortControllers[dcId][index];
sender.disconnect();
return undefined;
if (this._shouldDebugExportedSenders) {
// eslint-disable-next-line no-console
console.error(`☠️ ERROR! idx=${index} dcId=${dcId} ${err.message}`);
}
// eslint-disable-next-line no-console
console.error(err);
@ -482,54 +531,32 @@ class TelegramClient {
}
}
getConnectedExportedSenderIndex(dcId, i) {
const index = Object.keys(this._exportedSenderReleaseTimeouts[dcId] || {})[0];
const firstIndex = Number(index ?? 0);
return {
newIndex: firstIndex === i ? i + 1 : firstIndex,
noConnectedExportedSenders: index === undefined,
};
}
async _borrowExportedSender(dcId, shouldReconnect, existingSender, index, isPremium) {
if (this._additionalDcsDisabled) {
return undefined;
}
let i = index || 0;
const i = index || 0;
if (!this._exportedSenderPromises[dcId]) this._exportedSenderPromises[dcId] = {};
if (!this._exportedSenderAbortControllers[dcId]) this._exportedSenderAbortControllers[dcId] = {};
if (this._exportedSenderAbortControllers[dcId][i]?.signal.aborted) {
const { newIndex } = this.getConnectedExportedSenderIndex(dcId, i);
i = newIndex;
}
if (!this._exportedSenderPromises[dcId][i] || shouldReconnect) {
this._exportedSenderAbortControllers[dcId][i]?.abort();
this._exportedSenderAbortControllers[dcId][i] = new AbortController();
if (this._shouldDebugExportedSenders) {
// eslint-disable-next-line no-console
console.warn(`🕒 Connecting to exported sender idx=${i} dc=${dcId}`
+ ` ${shouldReconnect ? '(reconnect)' : ''}`);
}
this._exportedSenderPromises[dcId][i] = this._connectSender(
existingSender || this._createExportedSender(dcId, i),
dcId,
index,
isPremium,
this._exportedSenderAbortControllers[dcId][i].signal,
);
}
let sender;
try {
sender = await Promise.race([
this._exportedSenderPromises[dcId][i],
Helpers.sleep(EXPORTED_SENDER_CONNECT_TIMEOUT).then(() => {
return Promise.reject(new ConnectTimeoutError());
}),
]);
if (!sender) {
throw new ConnectTimeoutError();
}
sender = await this._exportedSenderPromises[dcId][i];
if (!sender.isConnected()) {
if (sender.isConnecting) {
@ -540,11 +567,6 @@ class TelegramClient {
}
}
} catch (err) {
if (err instanceof ConnectTimeoutError) {
this._exportedSenderAbortControllers[dcId][i]?.abort();
const { newIndex, noConnectedExportedSenders } = this.getConnectedExportedSenderIndex(dcId, i);
return this._borrowExportedSender(dcId, noConnectedExportedSenders, undefined, newIndex, isPremium);
}
// eslint-disable-next-line no-console
console.error(err);
@ -558,8 +580,11 @@ class TelegramClient {
}
this._exportedSenderReleaseTimeouts[dcId][i] = setTimeout(() => {
this._exportedSenderReleaseTimeouts[dcId][i] = undefined;
// eslint-disable-next-line no-console
if (this._shouldDebugExportedSenders) console.log(`🚪 Release idx=${i} dcId=${dcId}`);
sender.disconnect();
this._exportedSenderReleaseTimeouts[dcId][i] = undefined;
this._exportedSenderPromises[dcId][i] = undefined;
}, EXPORTED_SENDER_RELEASE_TIMEOUT);
return sender;
@ -569,12 +594,19 @@ class TelegramClient {
return new MTProtoSender(this.session.getAuthKey(dcId), {
logger: this._log,
dcId,
senderIndex: index,
retries: this._connectionRetries,
retriesToFallback: this._connectionRetriesToFallback,
delay: this._retryDelay,
retryMainConnectionDelay: this._retryMainConnectionDelay,
shouldForceHttpTransport: this._shouldForceHttpTransport,
shouldAllowHttpTransport: this._shouldAllowHttpTransport,
autoReconnect: this._autoReconnect,
connectTimeout: this._timeout,
authKeyCallback: this._authKeyCallback.bind(this),
isMainSender: dcId === this.session.dcId,
isExported: true,
getShouldDebugExportedSenders: this.getShouldDebugExportedSenders.bind(this),
onConnectionBreak: () => this._cleanupExportedSender(dcId, index),
});
}
@ -602,7 +634,7 @@ class TelegramClient {
* @returns {Promise<Buffer>}
*/
downloadFile(inputLocation, args = {}) {
return downloadFile(this, inputLocation, args);
return downloadFile(this, inputLocation, args, this._shouldDebugExportedSenders);
}
downloadMedia(messageOrMedia, args) {
@ -933,9 +965,11 @@ class TelegramClient {
throw new Error('You can only invoke MTProtoRequests');
}
const sender = dcId === undefined ? this._sender : await this.getSender(dcId);
let sender = dcId === undefined ? this._sender : await this.getSender(dcId);
this._lastRequest = Date.now();
await this._connectedDeferred.promise;
const state = new RequestState(request, abortSignal);
let attempt = 0;
@ -968,6 +1002,7 @@ class TelegramClient {
throw e;
}
await this._switchDC(e.newDc);
sender = dcId === undefined ? this._sender : await this.getSender(dcId);
} else if (e instanceof errors.MsgWaitError) {
// We need to resend this after the old one was confirmed.
await state.isReady();
@ -993,6 +1028,16 @@ class TelegramClient {
throw new Error(`Request was unsuccessful ${attempt} time(s)`);
}
async invokeBeacon(request, dcId) {
if (request.classType !== 'request') {
throw new Error('You can only invoke MTProtoRequests');
}
const sender = dcId === undefined ? this._sender : await this.getSender(dcId);
sender.sendBeacon(request);
}
setIsPremium(isPremium) {
this.isPremium = isPremium;
}
@ -1026,7 +1071,7 @@ class TelegramClient {
}
uploadFile(fileParams) {
return uploadFile(this, fileParams);
return uploadFile(this, fileParams, this._shouldDebugExportedSenders);
}
updateTwoFaSettings(params) {

View File

@ -95,11 +95,12 @@ export async function downloadFile(
client: TelegramClient,
inputLocation: Api.InputFileLocation,
fileParams: DownloadFileParams,
shouldDebugExportedSenders?: boolean,
) {
const { dcId } = fileParams;
for (let i = 0; i < SENDER_RETRIES; i++) {
try {
return await downloadFile2(client, inputLocation, fileParams);
return await downloadFile2(client, inputLocation, fileParams, shouldDebugExportedSenders);
} catch (err: any) {
if (
(err.message.startsWith('SESSION_REVOKED') || err.message.startsWith('CONNECTION_NOT_INITED'))
@ -127,6 +128,7 @@ async function downloadFile2(
client: TelegramClient,
inputLocation: Api.InputFileLocation,
fileParams: DownloadFileParams,
shouldDebugExportedSenders?: boolean,
) {
let {
partSizeKb, end,
@ -134,6 +136,15 @@ async function downloadFile2(
const {
fileSize,
} = fileParams;
const fileId = 'id' in inputLocation ? inputLocation.id : undefined;
const logWithId = (...args: any[]) => {
if (!shouldDebugExportedSenders) return;
// eslint-disable-next-line no-console
console.log(`⬇️ [${fileId}/${fileParams.dcId}]`, ...args);
};
logWithId('Downloading file...');
const isPremium = Boolean(client.isPremium);
const { dcId, progressCallback, start = 0 } = fileParams;
@ -206,6 +217,9 @@ async function downloadFile2(
foremans[senderIndex].releaseWorker();
break;
}
const logWithSenderIndex = (...args: any[]) => {
logWithId(`[${senderIndex}/${dcId}]`, ...args);
};
// eslint-disable-next-line no-loop-func, @typescript-eslint/no-loop-func
promises.push((async (offsetMemo: number) => {
@ -213,7 +227,23 @@ async function downloadFile2(
while (true) {
let sender;
try {
let isDone = false;
if (shouldDebugExportedSenders) {
setTimeout(() => {
if (isDone) return;
logWithSenderIndex(`❗️️ getSender took too long ${offsetMemo}`);
}, 8000);
}
sender = await client.getSender(dcId, senderIndex, isPremium);
isDone = true;
let isDone2 = false;
if (shouldDebugExportedSenders) {
setTimeout(() => {
if (isDone2) return;
logWithSenderIndex(`❗️️ sender.send took too long ${offsetMemo}`);
}, 6000);
}
// sometimes a session is revoked and will cause this to hang.
const result = await Promise.race([
sender.send(new Api.upload.GetFile({
@ -225,19 +255,23 @@ async function downloadFile2(
sleep(SENDER_TIMEOUT).then(() => {
// If we're on the main DC we just cancel the download and let the user retry later
if (dcId === client.session.dcId) {
logWithSenderIndex(`Download timed out ${offsetMemo}`);
return Promise.reject(new Error('USER_CANCELED'));
} else {
logWithSenderIndex(`Download timed out [not main] ${offsetMemo}`);
return Promise.reject(new Error('SESSION_REVOKED'));
}
}),
]);
isDone2 = true;
if (progressCallback) {
if (progressCallback.isCanceled) {
throw new Error('USER_CANCELED');
}
progress += (1 / partsCount);
logWithSenderIndex(`⬇️️ ${progress * 100}%`);
progressCallback(progress);
}
@ -260,6 +294,7 @@ async function downloadFile2(
continue;
}
logWithSenderIndex(`Ended not gracefully ${offsetMemo}`);
foremans[senderIndex].releaseWorker();
if (deferred) deferred.resolve();

View File

@ -33,6 +33,7 @@ const foremans = Array(MAX_CONCURRENT_CONNECTIONS_PREMIUM).fill(undefined)
export async function uploadFile(
client: TelegramClient,
fileParams: UploadFileParams,
shouldDebugExportedSenders?: boolean,
): Promise<Api.InputFile | Api.InputFileBig> {
const { file, onProgress } = fileParams;
@ -42,6 +43,13 @@ export async function uploadFile(
const fileId = readBigIntFromBuffer(generateRandomBytes(8), true, true);
const isLarge = size > LARGE_FILE_THRESHOLD;
const logWithId = (...args: any[]) => {
if (!shouldDebugExportedSenders) return;
// eslint-disable-next-line no-console
console.log(`⬆️ [${fileId}]`, ...args);
};
logWithId('Uploading file...');
const partSize = getUploadPartSize(size) * KB_TO_BYTES;
const partCount = Math.floor((size + partSize - 1) / partSize);
@ -70,6 +78,10 @@ export async function uploadFile(
break;
}
const logWithSenderIndex = (...args: any[]) => {
logWithId(`[${senderIndex}]`, ...args);
};
const blobSlice = file.slice(i * partSize, (i + 1) * partSize);
// eslint-disable-next-line no-loop-func, @typescript-eslint/no-loop-func
promises.push((async (jMemo: number, blobSliceMemo: Blob) => {
@ -78,8 +90,24 @@ export async function uploadFile(
let sender;
try {
// We always upload from the DC we are in
let isDone = false;
if (shouldDebugExportedSenders) {
setTimeout(() => {
if (isDone) return;
logWithSenderIndex(`❗️️ getSender took too long j=${jMemo}`);
}, 8000);
}
sender = await client.getSender(client.session.dcId, senderIndex, isPremium);
isDone = true;
let isDone2 = false;
const partBytes = await blobSliceMemo.arrayBuffer();
if (shouldDebugExportedSenders) {
setTimeout(() => {
if (isDone2) return;
logWithSenderIndex(`❗️️ sender.send took too long j=${jMemo}`);
}, 6000);
}
await sender.send(
isLarge
? new Api.upload.SaveBigFilePart({
@ -94,7 +122,9 @@ export async function uploadFile(
bytes: Buffer.from(partBytes),
}),
);
isDone2 = true;
} catch (err) {
logWithSenderIndex(`Upload part failed ${err?.toString()} j=${jMemo}`);
if (sender && !sender.isConnected()) {
await sleep(DISCONNECT_SLEEP);
continue;
@ -115,6 +145,7 @@ export async function uploadFile(
}
progress += (1 / partCount);
logWithSenderIndex(`${progress * 100}%`);
onProgress(progress);
}
break;

View File

@ -0,0 +1,101 @@
const closeError = new Error('HttpStream was closed');
class HttpStream {
private url: string | undefined;
private isClosed: boolean;
private stream: Buffer[] = [];
private canRead: Promise<void> = Promise.resolve();
private resolveRead: VoidFunction | undefined;
private rejectRead: VoidFunction | undefined;
private disconnectedCallback: VoidFunction | undefined;
constructor(disconnectedCallback: VoidFunction) {
this.isClosed = true;
this.disconnectedCallback = disconnectedCallback;
}
async read() {
await this.canRead;
const data = this.stream.shift();
if (this.stream.length === 0) {
this.canRead = new Promise((resolve, reject) => {
this.resolveRead = resolve;
this.rejectRead = reject;
});
}
return data;
}
getURL(ip: string, port: number, testServers: boolean, isPremium: boolean) {
if (port === 443) {
return `https://${ip}:${port}/apiw1${testServers ? '_test' : ''}${isPremium ? '_premium' : ''}`;
} else {
return `http://${ip}:${port}/apiw1${testServers ? '_test' : ''}${isPremium ? '_premium' : ''}`;
}
}
async connect(port: number, ip: string, testServers = false, isPremium = false) {
this.stream = [];
this.canRead = new Promise((resolve, reject) => {
this.resolveRead = resolve;
this.rejectRead = reject;
});
this.url = this.getURL(ip, port, testServers, isPremium);
await fetch(this.url, {
method: 'POST',
body: Buffer.from([]),
mode: 'cors',
});
this.isClosed = false;
}
async write(data: Buffer) {
if (this.isClosed || !this.url) {
this.handleDisconnect();
throw closeError;
}
return fetch(this.url, {
method: 'POST',
body: data,
mode: 'cors',
}).then(async (response) => {
if (this.isClosed) {
this.handleDisconnect();
return;
}
if (response.status !== 200) {
this.handleDisconnect();
throw closeError;
}
const arrayBuffer = await response.arrayBuffer();
this.stream = this.stream.concat(Buffer.from(arrayBuffer));
if (this.resolveRead && !this.isClosed) this.resolveRead();
});
}
handleDisconnect() {
this.disconnectedCallback?.();
if (this.rejectRead) this.rejectRead();
}
close() {
this.isClosed = true;
this.handleDisconnect();
this.disconnectedCallback = undefined;
}
}
export default HttpStream;

View File

@ -82,13 +82,39 @@ class MessagePacker {
this.setReady(true);
}
async get() {
async getBeacon(state) {
const buffer = new BinaryWriter(Buffer.alloc(0));
const size = state.data.length + TLMessage.SIZE_OVERHEAD;
if (size <= MessageContainer.MAXIMUM_SIZE) {
let afterId;
if (state.after) {
afterId = state.after.msgId;
}
state.msgId = await this._state.writeDataAsMessage(
buffer, state.data, state.request.classType === 'request', afterId,
);
this._log.debug(`Assigned msgId = ${state.msgId} to ${state.request.className
|| state.request.constructor.name}`);
return buffer.getValue();
}
this._log.warn(`Message payload for ${state.request.className
|| state.request.constructor.name} is too long ${state.data.length} and cannot be sent`);
state.reject('Request Payload is too big');
return undefined;
}
async wait() {
if (!this._queue.length) {
this._ready = new Promise(((resolve) => {
this.setReady = resolve;
}));
await this._ready;
}
}
async get() {
if (!this._queue[this._queue.length - 1]) {
this._queue = [];
return undefined;

View File

@ -3,6 +3,8 @@ const { Mutex } = require('async-mutex');
const mutex = new Mutex();
const closeError = new Error('WebSocket was closed');
const CONNECTION_TIMEOUT = 10000;
const MAX_TIMEOUT = 30000;
class PromisedWebSockets {
constructor(disconnectedCallback) {
@ -16,6 +18,7 @@ class PromisedWebSockets {
this.client = undefined;
this.closed = true;
this.disconnectedCallback = disconnectedCallback;
this.timeout = CONNECTION_TIMEOUT;
}
async readExactly(number) {
@ -80,14 +83,20 @@ class PromisedWebSockets {
this.website = this.getWebSocketLink(ip, port, testServers, isPremium);
this.client = new WebSocket(this.website, 'binary');
return new Promise((resolve, reject) => {
let hasResolved = false;
let timeout;
this.client.onopen = () => {
this.receive();
resolve(this);
hasResolved = true;
if (timeout) clearTimeout(timeout);
};
this.client.onerror = (error) => {
// eslint-disable-next-line no-console
console.error('WebSocket error', error);
reject(error);
hasResolved = true;
if (timeout) clearTimeout(timeout);
};
this.client.onclose = (event) => {
const { code, reason, wasClean } = event;
@ -101,7 +110,25 @@ class PromisedWebSockets {
if (this.disconnectedCallback) {
this.disconnectedCallback();
}
hasResolved = true;
if (timeout) clearTimeout(timeout);
};
timeout = setTimeout(() => {
if (hasResolved) return;
reject(new Error('WebSocket connection timeout'));
this.resolveRead(false);
this.closed = true;
if (this.disconnectedCallback) {
this.disconnectedCallback();
}
this.client.close();
this.timeout *= 2;
this.timeout = Math.min(this.timeout, MAX_TIMEOUT);
timeout = undefined;
}, this.timeout);
// CONTEST
// Seems to not be working, at least in a web worker
// eslint-disable-next-line no-restricted-globals

View File

@ -34,11 +34,17 @@ const {
MsgsStateReq,
MsgResendReq,
MsgsAllInfo,
HttpWait,
} = require('../tl').constructors;
const { SecurityError } = require('../errors/Common');
const { InvalidBufferError } = require('../errors/Common');
const { RPCMessageToError } = require('../errors');
const { TypeNotFoundError } = require('../errors/Common');
const { sendToOrigin } = require('../../../api/gramjs/worker/worker');
const LONGPOLL_MAX_WAIT = 3000;
const LONGPOLL_MAX_DELAY = 500;
const LONGPOLL_WAIT_AFTER = 150;
/**
* MTProto Mobile Protocol sender
@ -57,7 +63,11 @@ class MTProtoSender {
static DEFAULT_OPTIONS = {
logger: undefined,
retries: Infinity,
retriesToFallback: 1,
delay: 2000,
retryMainConnectionDelay: 10000,
shouldForceHttpTransport: false,
shouldAllowHttpTransport: false,
autoReconnect: true,
connectTimeout: undefined,
authKeyCallback: undefined,
@ -65,6 +75,8 @@ class MTProtoSender {
autoReconnectCallback: undefined,
isMainSender: undefined,
onConnectionBreak: undefined,
isExported: undefined,
getShouldDebugExportedSenders: undefined,
};
/**
@ -74,17 +86,26 @@ class MTProtoSender {
constructor(authKey, opts) {
const args = { ...MTProtoSender.DEFAULT_OPTIONS, ...opts };
this._connection = undefined;
this._fallbackConnection = undefined;
this._shouldForceHttpTransport = args.shouldForceHttpTransport;
this._shouldAllowHttpTransport = args.shouldAllowHttpTransport;
this._log = args.logger;
this._dcId = args.dcId;
this._senderIndex = args.senderIndex;
this._retries = args.retries;
this._retriesToFallback = args.retriesToFallback;
this._delay = args.delay;
this._retryMainConnectionDelay = args.retryMainConnectionDelay;
this._autoReconnect = args.autoReconnect;
this._connectTimeout = args.connectTimeout;
this._authKeyCallback = args.authKeyCallback;
this._updateCallback = args.updateCallback;
this._autoReconnectCallback = args.autoReconnectCallback;
this._isMainSender = args.isMainSender;
this._isExported = args.isExported;
this._onConnectionBreak = args.onConnectionBreak;
this._isFallback = false;
this._getShouldDebugExportedSenders = args.getShouldDebugExportedSenders;
/**
* whether we disconnected ourself or telegram did it.
@ -107,6 +128,7 @@ class MTProtoSender {
* We need to join the loops upon disconnection
*/
this._send_loop_handle = undefined;
this._long_poll_loop_handle = undefined;
this._recv_loop_handle = undefined;
/**
@ -120,6 +142,7 @@ class MTProtoSender {
* Note that here we're also storing their ``_RequestState``.
*/
this._send_queue = new MessagePacker(this._state, this._log);
this._send_queue_long_poll = new MessagePacker(this._state, this._log);
/**
* Sent states are remembered until a response is received.
@ -162,13 +185,34 @@ class MTProtoSender {
// Public API
logWithIndexCallback(level) {
return (...args) => {
if (!this._getShouldDebugExportedSenders
|| !this._getShouldDebugExportedSenders()) return;
// eslint-disable-next-line no-console
console[level](`[${this._isExported ? `idx=${this._senderIndex} ` : 'M '}dcId=${this._dcId}]`, ...args);
};
}
logWithIndex = {
debug: this.logWithIndexCallback('debug'),
log: this.logWithIndexCallback('log'),
warn: this.logWithIndexCallback('warn'),
error: this.logWithIndexCallback('error'),
};
getConnection() {
return this._isFallback ? this._fallbackConnection : this._connection;
}
/**
* Connects to the specified given connection using the given auth key.
* @param connection
* @param [force]
* @param fallbackConnection
* @returns {Promise<boolean>}
*/
async connect(connection, force) {
async connect(connection, force, fallbackConnection) {
this.userDisconnected = false;
if (this._user_connected && !force) {
@ -176,11 +220,20 @@ class MTProtoSender {
return false;
}
this.isConnecting = true;
this._isFallback = this._shouldForceHttpTransport && this._shouldAllowHttpTransport;
this._connection = connection;
this._fallbackConnection = fallbackConnection;
for (let attempt = 0; attempt < this._retries; attempt++) {
for (let attempt = 0; attempt < this._retries + this._retriesToFallback; attempt++) {
try {
await this._connect();
if (attempt >= this._retriesToFallback && this._shouldAllowHttpTransport) {
this._isFallback = true;
this.logWithIndex.warn('Using fallback connection');
this._log.warn('Using fallback connection');
}
this.logWithIndex.warn('Connecting...');
await this._connect(this.getConnection());
this.logWithIndex.warn('Connected!');
if (this._updateCallback) {
this._updateCallback(new UpdateConnectionState(UpdateConnectionState.connected));
}
@ -189,7 +242,7 @@ class MTProtoSender {
if (this._updateCallback && attempt === 0) {
this._updateCallback(new UpdateConnectionState(UpdateConnectionState.disconnected));
}
this._log.error(`WebSocket connection failed attempt: ${attempt + 1}`);
this._log.error(`${this._isFallback ? 'HTTP' : 'WebSocket'} connection failed attempt: ${attempt + 1}`);
// eslint-disable-next-line no-console
console.error(err);
await Helpers.sleep(this._delay);
@ -197,9 +250,41 @@ class MTProtoSender {
}
this.isConnecting = false;
if (this._isFallback && !this._shouldForceHttpTransport) {
void this.tryReconnectToMain();
}
return true;
}
async tryReconnectToMain() {
if (!this.isConnecting && this._isFallback && !this._isReconnectingToMain && !this.isReconnecting
&& !this._shouldForceHttpTransport && !this._isExported) {
this._log.debug('Trying to reconnect to main connection');
this._isReconnectingToMain = true;
try {
await this._connection.connect();
this._log.info('Reconnected to main connection');
this.logWithIndex.warn('Reconnected to main connection');
this.isReconnecting = true;
await this._disconnect(this._fallbackConnection);
await this.connect(this._connection, true, this._fallbackConnection);
this.isReconnecting = false;
this._isReconnectingToMain = false;
} catch (e) {
this.isReconnecting = false;
this._isReconnectingToMain = false;
this._log.error(
`Failed to reconnect to main connection, retrying in ${this._retryMainConnectionDelay}ms`,
);
await Helpers.sleep(this._retryMainConnectionDelay);
void this.tryReconnectToMain();
}
} else {
await Helpers.sleep(this._retryMainConnectionDelay);
}
}
isConnected() {
return this._user_connected;
}
@ -210,7 +295,8 @@ class MTProtoSender {
*/
async disconnect() {
this.userDisconnected = true;
await this._disconnect();
this.logWithIndex.warn('Disconnecting...');
await this._disconnect(this.getConnection());
}
/**
@ -237,11 +323,17 @@ class MTProtoSender {
impossible to await receive a result that was never sent.
* @param request
* @param abortSignal
* @param isLongPoll
* @returns {RequestState}
*/
send(request, abortSignal) {
send(request, abortSignal, isLongPoll = false) {
const state = new RequestState(request, abortSignal);
this._send_queue.append(state);
if (!isLongPoll) {
this.logWithIndex.debug(`Send ${request.className}`);
this._send_queue.append(state);
} else {
this._send_queue_long_poll.append(state);
}
return state.promise;
}
@ -249,6 +341,17 @@ class MTProtoSender {
this._send_queue.append(state);
}
async sendBeacon(request) {
if (!this._user_connected) {
throw new Error('Cannot send requests while disconnected');
}
const state = new RequestState(request, undefined);
const data = await this._send_queue.getBeacon(state);
const encryptedData = await this._state.encryptMessageData(data);
sendToOrigin({ type: 'sendBeacon', data: encryptedData, url: this._fallbackConnection.socket.website });
}
/**
* Performs the actual connection, retrying, generating the
* authorization key if necessary, and starting the send and
@ -256,13 +359,15 @@ class MTProtoSender {
* @returns {Promise<void>}
* @private
*/
async _connect() {
this._log.info('Connecting to {0}...'.replace('{0}', this._connection));
await this._connection.connect();
this._log.debug('Connection success!');
// process.exit(0)
async _connect(connection) {
if (!connection.isConnected()) {
this._log.info('Connecting to {0}...'.replace('{0}', connection));
await connection.connect();
this._log.debug('Connection success!');
}
if (!this.authKey.getKey()) {
const plain = new MtProtoPlainSender(this._connection, this._log);
const plain = new MtProtoPlainSender(connection, this._log);
this._log.debug('New auth_key attempt ...');
const res = await doAuthentication(plain, this._log);
this._log.debug('Generated new auth_key successfully');
@ -290,33 +395,80 @@ class MTProtoSender {
this._user_connected = true;
this.isReconnecting = false;
this._log.debug('Starting send loop');
this._send_loop_handle = this._sendLoop();
if (!this._send_loop_handle) {
this._log.debug('Starting send loop');
this._send_loop_handle = this._sendLoop();
}
this._log.debug('Starting receive loop');
this._recv_loop_handle = this._recvLoop();
if (!this._recv_loop_handle) {
this._log.debug('Starting receive loop');
this._recv_loop_handle = this._recvLoop();
}
if (!this._long_poll_loop_handle && connection.shouldLongPoll) {
this._log.debug('Starting long-poll loop');
this._long_poll_loop_handle = this._longPollLoop();
}
// _disconnected only completes after manual disconnection
// or errors after which the sender cannot continue such
// as failing to reconnect or any unexpected error.
this._log.info('Connection to %s complete!'.replace('%s', this._connection.toString()));
this._log.info('Connection to %s complete!'.replace('%s', connection.toString()));
}
async _disconnect() {
async _disconnect(connection) {
if (this._updateCallback) {
this._updateCallback(new UpdateConnectionState(UpdateConnectionState.disconnected));
}
if (this._connection === undefined) {
if (connection === undefined) {
this._log.info('Not disconnecting (already have no connection)');
return;
}
this._log.info('Disconnecting from %s...'.replace('%s', this._connection.toString()));
this._log.info('Disconnecting from %s...'.replace('%s', connection.toString()));
this._user_connected = false;
this._log.debug('Closing current connection...');
await this._connection.disconnect();
this.logWithIndex.warn('Disconnecting');
await connection.disconnect();
}
async _longPollLoop() {
while (this._user_connected && !this.isReconnecting && this._isFallback
&& this.getConnection().shouldLongPoll) {
await this._send_queue_long_poll.wait();
const res = await this._send_queue_long_poll.get();
if (this.isReconnecting || !this._isFallback) {
this._long_poll_loop_handle = undefined;
return;
}
if (!res) {
continue;
}
let { data } = res;
const { batch } = res;
this._log.debug(`Encrypting ${batch.length} message(s) in ${data.length} bytes for sending`);
data = await this._state.encryptMessageData(data);
try {
await this._fallbackConnection.send(data);
} catch (e) {
this._log.error(e);
this._log.info('Connection closed while sending data');
this._long_poll_loop_handle = undefined;
return;
}
this.isSendingLongPoll = false;
this.checkLongPoll();
}
this._long_poll_loop_handle = undefined;
}
/**
@ -332,22 +484,46 @@ class MTProtoSender {
this._pending_state.clear();
while (this._user_connected && !this.isReconnecting) {
if (this._pending_ack.size) {
const ack = new RequestState(new MsgsAck({ msgIds: Array(...this._pending_ack) }));
this._send_queue.append(ack);
this._last_acks.push(ack);
if (this._last_acks.length >= 10) {
this._last_acks.shift();
const appendAcks = () => {
if (this._pending_ack.size) {
const ack = new RequestState(new MsgsAck({ msgIds: Array(...this._pending_ack) }));
this._send_queue.append(ack);
this._last_acks.push(ack);
if (this._last_acks.length >= 10) {
this._last_acks.shift();
}
this._pending_ack.clear();
}
this._pending_ack.clear();
}
this._log.debug(`Waiting for messages to send...${this.isReconnecting}`);
};
appendAcks();
this.logWithIndex.debug(`Waiting for messages to send... ${this.isReconnecting}`);
this._log.debug(`Waiting for messages to send... ${this.isReconnecting}`);
// TODO Wait for the connection send queue to be empty?
// This means that while it's not empty we can wait for
// more messages to be added to the send queue.
await this._send_queue.wait();
if (this._isFallback) {
// We don't long-poll on main loop, instead we have a separate loop for that
this.send(new HttpWait({
maxDelay: 0,
waitAfter: 0,
maxWait: 0,
}));
}
// If we've had new ACKs appended while waiting for messages to send, add them to queue
appendAcks();
const res = await this._send_queue.get();
this.logWithIndex.debug(`Got ${res?.batch.length} message(s) to send`);
if (this.isReconnecting) {
this.logWithIndex.debug('Reconnecting :(');
this._send_loop_handle = undefined;
return;
}
@ -357,14 +533,17 @@ class MTProtoSender {
let { data } = res;
const { batch } = res;
this._log.debug(`Encrypting ${batch.length} message(s) in ${data.length} bytes for sending`);
this.logWithIndex.debug('Sending', batch.map((m) => m.request.className));
data = await this._state.encryptMessageData(data);
try {
await this._connection.send(data);
await this.getConnection().send(data);
} catch (e) {
this.logWithIndex.debug(`Connection closed while sending data ${e}`);
this._log.error(e);
this._log.info('Connection closed while sending data');
this._send_loop_handle = undefined;
return;
}
for (const state of batch) {
@ -372,16 +551,25 @@ class MTProtoSender {
if (state.request.classType === 'request') {
this._pending_state.set(state.msgId, state);
}
if (state.request.className === 'HttpWait') {
state.resolve();
}
} else {
for (const s of state) {
if (s.request.classType === 'request') {
this._pending_state.set(s.msgId, s);
}
if (s.request.className === 'HttpWait') {
state.resolve();
}
}
}
}
this.logWithIndex.debug('Encrypted messages put in a queue to be sent');
this._log.debug('Encrypted messages put in a queue to be sent');
}
this._send_loop_handle = undefined;
}
async _recvLoop() {
@ -389,10 +577,10 @@ class MTProtoSender {
let message;
while (this._user_connected && !this.isReconnecting) {
// this._log.debug('Receiving items from the network...');
this._log.debug('Receiving items from the network...');
this.logWithIndex.debug('Receiving items from the network...');
try {
body = await this._connection.recv();
body = await this.getConnection().recv();
} catch (e) {
// this._log.info('Connection closed while receiving data');
/** when the server disconnects us we want to reconnect */
@ -401,11 +589,14 @@ class MTProtoSender {
this._log.warn('Connection closed while receiving data');
this.reconnect();
}
this._recv_loop_handle = undefined;
return;
}
try {
message = await this._state.decryptMessageData(body);
} catch (e) {
this.logWithIndex.debug(`Error while receiving items from the network ${e.toString()}`);
if (e instanceof TypeNotFoundError) {
// Received object which we don't know how to deserialize
this._log.info(`Type ${e.invalidConstructorId} not found, remaining data ${e.remaining}`);
@ -426,11 +617,13 @@ class MTProtoSender {
this._log.warn(`Invalid buffer ${e.code} for dc ${this._dcId}`);
this.reconnect();
}
this._recv_loop_handle = undefined;
return;
} else {
this._log.error('Unhandled error while receiving data');
this._log.error(e);
this.reconnect();
this._recv_loop_handle = undefined;
return;
}
}
@ -448,7 +641,22 @@ class MTProtoSender {
this._log.error(e);
}
}
void this.checkLongPoll();
}
this._recv_loop_handle = undefined;
}
checkLongPoll() {
if (this.isSendingLongPoll || !this._isFallback) return;
this.isSendingLongPoll = true;
this.send(new HttpWait({
maxDelay: LONGPOLL_MAX_DELAY,
waitAfter: LONGPOLL_WAIT_AFTER,
maxWait: LONGPOLL_MAX_WAIT,
}), undefined, true);
}
_handleBadAuthKey(shouldSkipForMain) {
@ -476,7 +684,14 @@ class MTProtoSender {
* @private
*/
async _processMessage(message) {
if (message.obj.className === 'MsgsAck') return;
this.logWithIndex.debug(`Process message ${message.obj.className}`);
this._pending_ack.add(message.msgId);
if (this.getConnection().shouldLongPoll) {
this._send_queue.setReady(true);
}
// eslint-disable-next-line require-atomic-updates
message.obj = await message.obj;
let handler = this._handlers[message.obj.CONSTRUCTOR_ID];
@ -549,14 +764,18 @@ class MTProtoSender {
throw new TypeNotFoundError('Not an upload.File');
}
} catch (e) {
this._log.error(e);
if (e instanceof TypeNotFoundError) {
this._log.info(`Received response without parent request: ${result.body}`);
return;
} else {
throw e;
} else if (this._isFallback) {
// If we're using HTTP transport, there might be a chance that the response comes through
// multiple times if didn't send acknowledgment in time, so we should just ignore it
return;
}
throw e;
}
return;
}
if (result.error) {
@ -569,6 +788,7 @@ class MTProtoSender {
try {
const reader = new BinaryReader(result.body);
const read = state.request.readResult(reader);
this.logWithIndex.debug('Handling RPC result', read);
state.resolve(read);
} catch (err) {
state.reject(err);
@ -808,6 +1028,7 @@ class MTProtoSender {
// in case of internal server issues.
Helpers.sleep(1000)
.then(() => {
this.logWithIndex.log('Reconnecting...');
this._log.info('Started reconnecting');
this._reconnect();
});
@ -817,7 +1038,8 @@ class MTProtoSender {
async _reconnect() {
this._log.debug('Closing current connection...');
try {
await this._disconnect();
this.logWithIndex.warn('[Reconnect] Closing current connection...');
await this._disconnect(this.getConnection());
} catch (err) {
this._log.warn(err);
}
@ -833,9 +1055,18 @@ class MTProtoSender {
this._connection._log,
this._connection._testServers,
);
await this.connect(newConnection, true);
const newFallbackConnection = new this._fallbackConnection.constructor(
this._connection._ip,
this._connection._port,
this._connection._dcId,
this._connection._log,
this._connection._testServers,
);
await this.connect(newConnection, true, newFallbackConnection);
this.isReconnecting = false;
this._send_queue.prepend(this._pending_state.values());
this._pending_state.clear();
if (this._autoReconnectCallback) {
await this._autoReconnectCallback();

View File

@ -1,4 +1,5 @@
const PromisedWebSockets = require('../../extensions/PromisedWebSockets');
const HttpStream = require('../../extensions/HttpStream').default;
const AsyncQueue = require('../../extensions/AsyncQueue');
/**
@ -31,9 +32,14 @@ class Connection {
this._recvArray = new AsyncQueue();
// this.socket = new PromiseSocket(new Socket())
this.shouldLongPoll = false;
this.socket = new PromisedWebSockets(this.disconnectCallback.bind(this));
}
isConnected() {
return this._connected;
}
async disconnectCallback() {
await this.disconnect(true);
}
@ -178,8 +184,36 @@ class PacketCodec {
}
}
class HttpConnection extends Connection {
constructor(ip, port, dcId, loggers, testServers, isPremium) {
super(ip, port, dcId, loggers, testServers, isPremium);
this.shouldLongPoll = true;
this.socket = new HttpStream(this.disconnectCallback.bind(this));
}
send(data) {
return this.socket.write(data);
}
recv() {
return this.socket.read();
}
async _connect() {
this._log.debug('Connecting');
await this.socket.connect(this._port, this._ip, this._testServers, this._isPremium);
this._log.debug('Finished connecting');
}
async connect() {
await this._connect();
this._connected = true;
}
}
module.exports = {
Connection,
PacketCodec,
ObfuscatedConnection,
HttpConnection,
};

View File

@ -1,10 +1,11 @@
const { Connection } = require('./Connection');
const { Connection, HttpConnection } = require('./Connection');
const { ConnectionTCPFull } = require('./TCPFull');
const { ConnectionTCPAbridged } = require('./TCPAbridged');
const { ConnectionTCPObfuscated } = require('./TCPObfuscated');
module.exports = {
Connection,
HttpConnection,
ConnectionTCPFull,
ConnectionTCPAbridged,
ConnectionTCPObfuscated,

View File

@ -6,6 +6,7 @@ const {
ConnectionTCPFull,
ConnectionTCPAbridged,
ConnectionTCPObfuscated,
HttpConnection,
} = require('./connection');
const {
@ -15,6 +16,7 @@ const {
module.exports = {
Connection,
HttpConnection,
ConnectionTCPFull,
ConnectionTCPAbridged,
ConnectionTCPObfuscated,

View File

@ -148,7 +148,7 @@ function updateGroupCallStreams(userId: string) {
}
async function getUserStream(streamType: StreamType, facing: VideoFacingModeEnum = 'user') {
if(streamType === 'audio' && state?.audioStream) {
if (streamType === 'audio' && state?.audioStream) {
return state.audioStream;
}
@ -162,19 +162,19 @@ async function getUserStream(streamType: StreamType, facing: VideoFacingModeEnum
const media = await navigator.mediaDevices.getUserMedia({
audio: streamType === 'audio' ? {
// @ts-ignore
...(IS_ECHO_CANCELLATION_SUPPORTED && { echoCancellation: true }),
...(IS_NOISE_SUPPRESSION_SUPPORTED && { noiseSuppression: true }),
...(IS_ECHO_CANCELLATION_SUPPORTED && {echoCancellation: true}),
...(IS_NOISE_SUPPRESSION_SUPPORTED && {noiseSuppression: true}),
} : false,
video: streamType === 'video' ? {
facingMode: facing,
} : false,
});
if(state && streamType === 'audio'){
if (state && streamType === 'audio') {
state.audioStream = media;
}
if(streamType === 'video') {
if (streamType === 'video') {
const vid = document.createElement('video');
vid.srcObject = media;

View File

@ -100,6 +100,10 @@ export interface ISettings extends NotifySettings, Record<string, any> {
doNotTranslate: string[];
canDisplayChatInTitle: boolean;
shouldShowLoginCodeInChatList?: boolean;
shouldForceHttpTransport?: boolean;
shouldAllowHttpTransport?: boolean;
shouldCollectDebugLogs?: boolean;
shouldDebugExportedSenders?: boolean;
}
export interface ApiPrivacySettings {

50
src/util/debugConsole.ts Normal file
View File

@ -0,0 +1,50 @@
/* eslint-disable no-console */
export const DEBUG_LEVELS = ['log', 'error', 'warn', 'info', 'debug'] as const;
export type DebugLevel = typeof DEBUG_LEVELS[number];
// @ts-ignore
const ORIGINAL_FUNCTIONS: Record<DebugLevel, (...args: any[]) => void> = DEBUG_LEVELS.reduce((acc, level) => {
// @ts-ignore
acc[level] = console[level];
return acc;
}, {});
type DebugEntry = {
level: DebugLevel;
args: any[];
date: Date;
};
let DEBUG_LOGS: DebugEntry[] = [];
export function logDebugMessage(level: DebugLevel, ...args: any[]) {
DEBUG_LOGS.push({
level,
args,
date: new Date(),
});
ORIGINAL_FUNCTIONS[level](...args);
}
export function initDebugConsole() {
DEBUG_LOGS = [];
DEBUG_LEVELS.forEach((level) => {
// @ts-ignore
console[level] = (...args: any[]) => {
logDebugMessage(level, ...args);
};
});
}
export function disableDebugConsole() {
DEBUG_LEVELS.forEach((level) => {
// @ts-ignore
console[level] = ORIGINAL_FUNCTIONS[level];
});
DEBUG_LOGS = [];
}
export function getDebugLogs() {
return JSON.stringify(DEBUG_LOGS, (key, value) => (typeof value === 'bigint'
? value.toString()
: value));
}

View File

@ -4,8 +4,8 @@ export class AbortError extends Error {
}
}
export default async function withAbortCheck<T>(abortSignal: AbortSignal, cb: Promise<T>): Promise<T> {
const result = await cb;
export default async function withAbortCheck<T>(abortSignal: AbortSignal, promise: Promise<T>): Promise<T> {
const result = await promise;
if (abortSignal?.aborted) {
throw new AbortError();