diff --git a/src/api/gramjs/methods/chats.ts b/src/api/gramjs/methods/chats.ts index 08815e5dc..cd903a232 100644 --- a/src/api/gramjs/methods/chats.ts +++ b/src/api/gramjs/methods/chats.ts @@ -192,7 +192,8 @@ export async function fetchChats({ const chat = buildApiChatFromDialog(dialog, peerEntity); lastMessageByChatId[chat.id] = dialog.topMessage; - if (dialog.pts) { + const isChannel = getEntityTypeById(chat.id) === 'channel'; + if (dialog.pts && isChannel) { updateChannelState(chat.id, dialog.pts); } @@ -525,6 +526,11 @@ export async function requestChatUpdate({ const chatUpdate = buildApiChatFromDialog(dialog, peerEntity); + const isChannel = getEntityTypeById(chat.id) === 'channel'; + if (dialog.pts && isChannel) { + updateChannelState(chat.id, dialog.pts); + } + const readState = buildThreadReadState(dialog); const threadInfo = buildApiThreadInfoFromDialog(chat.id, dialog); sendApiUpdate({ diff --git a/src/api/gramjs/methods/client.ts b/src/api/gramjs/methods/client.ts index 3bc3d2776..17c4cc94c 100644 --- a/src/api/gramjs/methods/client.ts +++ b/src/api/gramjs/methods/client.ts @@ -51,8 +51,9 @@ import { getDifference, init as initUpdatesManager, processUpdate, + requestChannelDifference as requestChannelDifferenceFromUpdates, reset as resetUpdatesManager, - scheduleGetChannelDifference, + setOpenedChannelIds as setOpenedChannelIdsInUpdates, updateChannelState, } from '../updates/updateManager'; import { @@ -660,5 +661,9 @@ export function setShouldDebugExportedSenders(value: boolean) { } export function requestChannelDifference(channelId: string) { - scheduleGetChannelDifference(channelId); + requestChannelDifferenceFromUpdates(channelId); +} + +export function setOpenedChannelIds(channelIds: string[]) { + setOpenedChannelIdsInUpdates(channelIds); } diff --git a/src/api/gramjs/methods/index.ts b/src/api/gramjs/methods/index.ts index 3611661f9..ccc59a325 100644 --- a/src/api/gramjs/methods/index.ts +++ b/src/api/gramjs/methods/index.ts @@ -1,6 +1,7 @@ export { destroy, disconnect, downloadMedia, fetchCurrentUser, repairFileReference, abortChatRequests, abortRequestGroup, setForceHttpTransport, setShouldDebugExportedSenders, setAllowHttpTransport, requestChannelDifference, + setOpenedChannelIds, } from './client'; export { diff --git a/src/api/gramjs/methods/messages.ts b/src/api/gramjs/methods/messages.ts index f50527021..912579259 100644 --- a/src/api/gramjs/methods/messages.ts +++ b/src/api/gramjs/methods/messages.ts @@ -162,6 +162,7 @@ export async function fetchMessages({ const RequestClass = threadId === MAIN_THREAD_ID ? GramJs.messages.GetHistory : isSavedDialog ? GramJs.messages.GetSavedHistory : GramJs.messages.GetReplies; + const isChannel = getEntityTypeById(chat.id) === 'channel'; let result; try { @@ -203,6 +204,10 @@ export async function fetchMessages({ return undefined; } + if (isChannel && 'pts' in result) { + updateChannelState(chat.id, result.pts); + } + const messages = result.messages.map(buildApiMessage).filter(Boolean); const users = result.users.map(buildApiUser).filter(Boolean); const chats = result.chats.map((c) => buildApiChatFromPreview(c)).filter(Boolean); @@ -259,7 +264,7 @@ export async function fetchMessage({ chat, messageId }: { chat: ApiChat; message return undefined; } - if ('pts' in result) { + if (isChannel && 'pts' in result) { updateChannelState(chat.id, result.pts); } diff --git a/src/api/gramjs/updates/updateManager.ts b/src/api/gramjs/updates/updateManager.ts index 9b3a90cd9..632f758ed 100644 --- a/src/api/gramjs/updates/updateManager.ts +++ b/src/api/gramjs/updates/updateManager.ts @@ -1,4 +1,5 @@ import { Api as GramJs, type Update } from '../../../lib/gramjs'; +import { RPCError } from '../../../lib/gramjs/errors'; import { UpdateConnectionState, UpdateServerTimeOffset } from '../../../lib/gramjs/network'; import type { Entity } from '../../../lib/gramjs/types'; @@ -24,16 +25,37 @@ export type State = { }; type SeqUpdate = (GramJs.Updates | GramJs.UpdatesCombined) & { _isFromDifference?: true }; type PtsUpdate = ((GramJs.TypeUpdate & { pts: number }) | UpdatePts) & { _isFromDifference?: true }; +type ChannelDifferenceReason = 'gapRecovery' | 'shortpoll'; +type ChannelScheduler = { + timeout?: ReturnType; + deadline?: number; + reason?: ChannelDifferenceReason; + isInFlight: boolean; + shortpollTimeoutMs?: number; + isShortpollEligible?: boolean; +}; const COMMON_BOX_QUEUE_ID = '0'; -const CHANNEL_DIFFERENCE_LIMIT = 1000; + +const SHORTPOLL_CHANNEL_DIFFERENCE_LIMIT = 100; +const CATCH_UP_CHANNEL_DIFFERENCE_LIMIT = 1000; + +const SHORTPOLL_DEFAULT_TIMEOUT_MS = 1000; +const INITIAL_SHORTPOLL_TIMEOUT_MS = 10000; +const CHANNEL_DIFFERENCE_RETRY_TIMEOUT_MS = 5000; const UPDATE_WAIT_TIMEOUT = 500; +const TERMINAL_CHANNEL_DIFFERENCE_ERRORS = new Set([ + 'CHANNEL_INVALID', + 'CHANNEL_PRIVATE', +]); + let invoke: typeof invokeRequest; let isInited = false; let seqTimeout: ReturnType | undefined; -const PTS_TIMEOUTS = new Map>(); +const CHANNEL_SCHEDULERS = new Map(); +const OPENED_CHANNEL_IDS = new Set(); const SEQ_QUEUE = new SortedQueue(seqComparator); const PTS_QUEUE = new Map>(); @@ -85,7 +107,7 @@ export function processUpdate(update: Update, isFromDifference?: boolean, should if ('pts' in update) { if (update instanceof GramJs.UpdateChannelTooLong) { - getChannelDifference(getUpdateChannelId(update)); + scheduleChannelDifference(getUpdateChannelId(update), 'gapRecovery', 0); return; } if (isFromDifference) { @@ -215,8 +237,8 @@ function popPtsQueue(channelId: string) { if (update._isFromDifference && pts >= localPts + ptsCount) { applyUpdate(update); } else if (pts === localPts + ptsCount) { - clearTimeout(PTS_TIMEOUTS.get(channelId)); - PTS_TIMEOUTS.delete(channelId); + clearScheduledChannelDifference(channelId, 'gapRecovery'); + scheduleShortpollFromNow(channelId); applyUpdate(update); } else if (pts > localPts + ptsCount) { @@ -233,13 +255,114 @@ function popPtsQueue(channelId: string) { } export function scheduleGetChannelDifference(channelId: string) { - if (PTS_TIMEOUTS.has(channelId)) return; + scheduleChannelDifference(channelId, 'gapRecovery', UPDATE_WAIT_TIMEOUT); +} - const timeout = setTimeout(async () => { - await getChannelDifference(channelId); - PTS_TIMEOUTS.delete(channelId); - }, UPDATE_WAIT_TIMEOUT); - PTS_TIMEOUTS.set(channelId, timeout); +export function requestChannelDifference(channelId: string) { + scheduleChannelDifference(channelId, 'gapRecovery', 0); +} + +export function setOpenedChannelIds(channelIds: string[]) { + const nextOpenedChannelIds = new Set(channelIds); + + OPENED_CHANNEL_IDS.forEach((channelId) => { + if (nextOpenedChannelIds.has(channelId)) { + return; + } + + getOrCreateChannelScheduler(channelId).isShortpollEligible = false; + clearScheduledChannelDifference(channelId, 'shortpoll'); + }); + + channelIds.forEach((channelId) => { + const scheduler = getOrCreateChannelScheduler(channelId); + const wasOpened = OPENED_CHANNEL_IDS.has(channelId); + + scheduler.isShortpollEligible = true; + + if (!wasOpened) { + if (scheduler.shortpollTimeoutMs !== undefined) { + restartShortpollFromNow(channelId); + } else { + scheduleChannelDifference(channelId, 'shortpoll', INITIAL_SHORTPOLL_TIMEOUT_MS); + } + } + }); + + OPENED_CHANNEL_IDS.clear(); + channelIds.forEach((channelId) => { + OPENED_CHANNEL_IDS.add(channelId); + }); +} + +function getOrCreateChannelScheduler(channelId: string) { + const current = CHANNEL_SCHEDULERS.get(channelId); + if (current) { + return current; + } + + const scheduler: ChannelScheduler = { + isInFlight: false, + }; + CHANNEL_SCHEDULERS.set(channelId, scheduler); + return scheduler; +} + +function scheduleChannelDifference(channelId: string, reason: ChannelDifferenceReason, timeoutMs: number) { + const scheduler = getOrCreateChannelScheduler(channelId); + const deadline = Date.now() + timeoutMs; + if (scheduler.deadline !== undefined && scheduler.deadline <= deadline) { + return; + } + + clearScheduledChannelDifference(channelId); + + scheduler.reason = reason; + scheduler.deadline = deadline; + scheduler.timeout = setTimeout(() => { + scheduler.timeout = undefined; + scheduler.deadline = undefined; + if (scheduler.isInFlight) { + scheduleChannelDifference(channelId, reason, UPDATE_WAIT_TIMEOUT); + return; + } + + void runChannelDifference(channelId, reason); + }, timeoutMs); +} + +function clearScheduledChannelDifference(channelId: string, reason?: ChannelDifferenceReason) { + const scheduler = CHANNEL_SCHEDULERS.get(channelId); + if (!scheduler?.timeout || (reason && scheduler.reason !== reason)) { + return; + } + + clearTimeout(scheduler.timeout); + scheduler.timeout = undefined; + scheduler.deadline = undefined; + scheduler.reason = undefined; +} + +function scheduleShortpollFromNow(channelId: string) { + const scheduler = CHANNEL_SCHEDULERS.get(channelId); + if (!scheduler?.isShortpollEligible || scheduler.shortpollTimeoutMs === undefined) { + return; + } + + scheduleChannelDifference(channelId, 'shortpoll', scheduler.shortpollTimeoutMs); +} + +function restartShortpollFromNow(channelId: string) { + const scheduler = CHANNEL_SCHEDULERS.get(channelId); + if (!scheduler?.isShortpollEligible || scheduler.shortpollTimeoutMs === undefined || scheduler.isInFlight) { + return; + } + + if (scheduler.reason === 'shortpoll') { + clearScheduledChannelDifference(channelId); + } + + scheduleChannelDifference(channelId, 'shortpoll', scheduler.shortpollTimeoutMs); } function scheduleGetDifference() { @@ -316,9 +439,30 @@ export async function getDifference() { }); } -async function getChannelDifference(channelId: string) { +async function runChannelDifference(channelId: string, reason: ChannelDifferenceReason) { + const scheduler = getOrCreateChannelScheduler(channelId); + if (scheduler.isInFlight) { + return; + } + + scheduler.isInFlight = true; + scheduler.reason = reason; + + try { + await requestChannelDifferenceInternal(channelId, reason); + } finally { + scheduler.isInFlight = false; + } +} + +async function requestChannelDifferenceInternal(channelId: string, reason: ChannelDifferenceReason): Promise { const channel = localDb.chats[channelId]; - if (!channel || !(channel instanceof GramJs.Channel) || !channel.accessHash || !localDb.channelPtsById[channelId]) { + if ( + !channel + || !(channel instanceof GramJs.Channel) + || !channel.accessHash + || localDb.channelPtsById[channelId] === undefined + ) { if (DEBUG) { // eslint-disable-next-line no-console console.error('[UpdateManager] Channel for difference not found', channelId, channel); @@ -326,18 +470,25 @@ async function getChannelDifference(channelId: string) { return; } - const response = await invoke(new GramJs.updates.GetChannelDifference({ - channel: buildInputChannel(channelId, channel.accessHash.toString()), - pts: localDb.channelPtsById[channelId], - filter: new GramJs.ChannelMessagesFilterEmpty(), - limit: CHANNEL_DIFFERENCE_LIMIT, - })); + const limit = reason === 'shortpoll' ? SHORTPOLL_CHANNEL_DIFFERENCE_LIMIT : CATCH_UP_CHANNEL_DIFFERENCE_LIMIT; + let response: GramJs.updates.TypeChannelDifference; - if (!response) { - if (DEBUG) { - // eslint-disable-next-line no-console - console.warn('[UpdatesManager] Failed to get ChannelDifference', channelId, channel); + try { + const result = await invoke(new GramJs.updates.GetChannelDifference({ + channel: buildInputChannel(channelId, channel.accessHash.toString()), + pts: localDb.channelPtsById[channelId], + filter: new GramJs.ChannelMessagesFilterEmpty(), + limit, + }), { + shouldThrow: true, + }); + if (!result) { + return; } + + response = result; + } catch (err) { + handleChannelDifferenceError(channelId, reason, err); return; } @@ -347,8 +498,13 @@ async function getChannelDifference(channelId: string) { } localDb.channelPtsById[channelId] = response.pts; + updateChannelShortpollTimeout(channelId, response); if (response instanceof GramJs.updates.ChannelDifferenceEmpty) { + if (response.final) { + scheduleShortpollIfEligible(channelId); + } + popPtsQueue(channelId); // Continue processing updates in queue return; } @@ -356,8 +512,45 @@ async function getChannelDifference(channelId: string) { processDifference(response, channelId); if (!response.final) { - getChannelDifference(channelId); + await requestChannelDifferenceInternal(channelId, 'gapRecovery'); + return; } + + scheduleShortpollIfEligible(channelId); +} + +function updateChannelShortpollTimeout(channelId: string, response: GramJs.updates.TypeChannelDifference) { + const scheduler = getOrCreateChannelScheduler(channelId); + scheduler.shortpollTimeoutMs = ('timeout' in response && response.timeout) + ? response.timeout * 1000 + : SHORTPOLL_DEFAULT_TIMEOUT_MS; +} + +function scheduleShortpollIfEligible(channelId: string) { + const scheduler = getOrCreateChannelScheduler(channelId); + if (!scheduler.isShortpollEligible) { + return; + } + + scheduleShortpollFromNow(channelId); +} + +function handleChannelDifferenceError(channelId: string, reason: ChannelDifferenceReason, err: unknown) { + if (DEBUG) { + // eslint-disable-next-line no-console + console.warn('[UpdatesManager] Failed to get ChannelDifference', channelId, err); + } + + const scheduler = getOrCreateChannelScheduler(channelId); + const errorMessage = err instanceof RPCError ? err.errorMessage : undefined; + + if (errorMessage && TERMINAL_CHANNEL_DIFFERENCE_ERRORS.has(errorMessage)) { + scheduler.isShortpollEligible = false; + clearScheduledChannelDifference(channelId); + return; + } + + scheduleChannelDifference(channelId, reason, CHANNEL_DIFFERENCE_RETRY_TIMEOUT_MS); } function forceSync() { @@ -377,10 +570,15 @@ export function reset() { clearTimeout(seqTimeout); seqTimeout = undefined; - PTS_TIMEOUTS.forEach((timeout) => { + CHANNEL_SCHEDULERS.forEach(({ timeout }) => { + if (!timeout) { + return; + } + clearTimeout(timeout); }); - PTS_TIMEOUTS.clear(); + CHANNEL_SCHEDULERS.clear(); + OPENED_CHANNEL_IDS.clear(); localDb.commonBoxState = {}; diff --git a/src/global/actions/apiUpdaters/initial.ts b/src/global/actions/apiUpdaters/initial.ts index e980c49be..6ca8d5a29 100644 --- a/src/global/actions/apiUpdaters/initial.ts +++ b/src/global/actions/apiUpdaters/initial.ts @@ -14,17 +14,20 @@ import type { ActionReturnType, GlobalState } from '../../types'; import { getCurrentTabId } from '../../../util/establishMultitabRole'; import { getShippingError, shouldClosePaymentModal } from '../../../util/getReadableErrorText'; -import { unique } from '../../../util/iteratees'; import { getAccountsInfo, getAccountSlotUrl } from '../../../util/multiaccount'; import { oldSetLanguage } from '../../../util/oldLangProvider'; import { clearWebTokenAuth } from '../../../util/routing'; import { setServerTimeOffset } from '../../../util/serverTime'; import { updateSessionUserId } from '../../../util/sessions'; import { forceWebsync } from '../../../util/websync'; -import { isChatChannel, isChatSuperGroup } from '../../helpers'; import { addActionHandler, getActions, getGlobal, setGlobal, } from '../../index'; +import { + getOpenedShortpollChannelIds, + resetOpenedChannelShortpollState, + syncOpenedShortpollChannelIds, +} from '../../openedChannelShortpoll'; import { updateUser, updateUserFullInfo } from '../../reducers'; import { updateAuth } from '../../reducers/auth'; import { updateTabState } from '../../reducers/tabs'; @@ -85,6 +88,8 @@ addActionHandler('apiUpdate', (global, actions, update): ActionReturnType => { break; case 'requestSync': + resetOpenedChannelShortpollState(); + syncOpenedShortpollChannelIds(global); actions.sync(); break; @@ -260,15 +265,10 @@ function onUpdateConnectionState( setGlobal(global); if (global.isSynced) { - const channelStackIds = Object.values(global.byTabId) - .flatMap((tab) => tab.messageLists) - .map((messageList) => messageList.chatId) - .filter((chatId) => { - const chat = global.chats.byId[chatId]; - return chat && (isChatChannel(chat) || isChatSuperGroup(chat)); - }); - if (connectionState === 'connectionStateReady' && channelStackIds.length) { - unique(channelStackIds).forEach((chatId) => { + const channelStackIds = getOpenedShortpollChannelIds(global); + + if (connectionState === 'connectionStateReady' && tabState.isMasterTab && channelStackIds.length) { + channelStackIds.forEach((chatId) => { actions.requestChannelDifference({ chatId }); }); } diff --git a/src/global/intervals.ts b/src/global/intervals.ts index d64f31b22..910157c57 100644 --- a/src/global/intervals.ts +++ b/src/global/intervals.ts @@ -3,6 +3,7 @@ import { addCallback } from '../lib/teact/teactn'; import type { GlobalState } from './types'; import { getServerTime } from '../util/serverTime'; +import { resetOpenedChannelShortpollState, syncOpenedShortpollChannelIds } from './openedChannelShortpoll'; import { removePeerStory } from './reducers'; import { selectTabState } from './selectors'; import { getGlobal, setGlobal } from '.'; @@ -22,18 +23,30 @@ addCallback((global: GlobalState) => { if (isCurrentMaster === isPreviousMaster) return; if (isCurrentMaster && !isPreviousMaster) { - startIntervals(); + startIntervals(global); } else { stopIntervals(); } }); -function startIntervals() { +addCallback((global: GlobalState) => { + if (!selectTabState(global)?.isMasterTab) { + return; + } + + syncOpenedShortpollChannelIds(global); +}); + +function startIntervals(global: GlobalState) { if (intervals.length) return; + + resetOpenedChannelShortpollState(); intervals.push(window.setInterval(checkStoryExpiration, STORY_EXPIRATION_INTERVAL)); + syncOpenedShortpollChannelIds(global); } function stopIntervals() { + resetOpenedChannelShortpollState(); intervals.forEach((interval) => clearInterval(interval)); intervals = []; } diff --git a/src/global/openedChannelShortpoll.ts b/src/global/openedChannelShortpoll.ts new file mode 100644 index 000000000..ecfb47955 --- /dev/null +++ b/src/global/openedChannelShortpoll.ts @@ -0,0 +1,114 @@ +import type { ThreadId } from '../types'; +import type { GlobalState } from './types'; + +import { callApi } from '../api/gramjs'; +import { isChatChannel, isChatSuperGroup } from './helpers'; +import { selectCurrentMessageList, selectTabState } from './selectors'; + +const MAX_OPENED_CHANNELS = 10; + +type OpenedChannelEntrySource = 'visible' | 'preview'; + +type OpenedChannelEntry = { + key: string; + channelId: string; + source: OpenedChannelEntrySource; +}; + +let openedEntryMru: string[] = []; +let lastReportedChannelIds: string[] = []; + +export function resetOpenedChannelShortpollState() { + openedEntryMru = []; + lastReportedChannelIds = []; +} + +export function getOpenedShortpollChannelIds(global: GlobalState) { + const openedEntries = buildOpenedChannelEntries(global); + const entriesByKey = new Map(openedEntries.map((entry) => [entry.key, entry])); + const openedKeys = new Set(entriesByKey.keys()); + + openedEntryMru = openedEntryMru.filter((key) => openedKeys.has(key)); + + openedEntries + .map((entry) => entry.key) + .reverse() + .forEach((key) => { + if (openedEntryMru.indexOf(key) === -1) { + openedEntryMru.unshift(key); + } + }); + + const channelIds: string[] = []; + const channelIdsSet = new Set(); + + openedEntryMru.forEach((key) => { + if (channelIds.length >= MAX_OPENED_CHANNELS) { + return; + } + + const entry = entriesByKey.get(key); + if (!entry || channelIdsSet.has(entry.channelId)) { + return; + } + + channelIds.push(entry.channelId); + channelIdsSet.add(entry.channelId); + }); + + return channelIds; +} + +export function syncOpenedShortpollChannelIds(global: GlobalState) { + if (!selectTabState(global).isMasterTab) { + return; + } + + const channelIds = getOpenedShortpollChannelIds(global); + if (areArraysEqual(channelIds, lastReportedChannelIds)) { + return; + } + + lastReportedChannelIds = channelIds; + void callApi('setOpenedChannelIds', channelIds); +} + +function buildOpenedChannelEntries(global: GlobalState) { + return Object.values(global.byTabId).flatMap(({ id: tabId, quickPreview }) => { + const entries: OpenedChannelEntry[] = []; + const currentMessageList = selectCurrentMessageList(global, tabId); + + if (currentMessageList && shouldShortpollChannel(global, currentMessageList.chatId)) { + entries.push({ + key: buildOpenedChannelEntryKey(tabId, currentMessageList.chatId, currentMessageList.threadId, 'visible'), + channelId: currentMessageList.chatId, + source: 'visible', + }); + } + + if (quickPreview && shouldShortpollChannel(global, quickPreview.chatId)) { + entries.push({ + key: buildOpenedChannelEntryKey(tabId, quickPreview.chatId, quickPreview.threadId, 'preview'), + channelId: quickPreview.chatId, + source: 'preview', + }); + } + + return entries; + }); +} + +function shouldShortpollChannel(global: GlobalState, chatId: string) { + const chat = global.chats.byId[chatId]; + return Boolean(chat && (isChatChannel(chat) || isChatSuperGroup(chat))); +} + +function buildOpenedChannelEntryKey( + tabId: number, chatId: string, threadId: ThreadId | undefined, source: OpenedChannelEntrySource, +) { + return `${tabId}:${source}:${chatId}:${threadId || 0}`; +} + +function areArraysEqual(first: string[], second: string[]) { + return first.length === second.length && first.every((value, index) => value === second[index]); +}