GramJS: Call getChannelDifference for opened chats (#6887)

This commit is contained in:
zubiden 2026-05-05 13:46:19 +02:00 committed by Alexander Zinchuk
parent c89d8089a1
commit 889bca140d
8 changed files with 385 additions and 43 deletions

View File

@ -192,7 +192,8 @@ export async function fetchChats({
const chat = buildApiChatFromDialog(dialog, peerEntity);
lastMessageByChatId[chat.id] = dialog.topMessage;
if (dialog.pts) {
const isChannel = getEntityTypeById(chat.id) === 'channel';
if (dialog.pts && isChannel) {
updateChannelState(chat.id, dialog.pts);
}
@ -525,6 +526,11 @@ export async function requestChatUpdate({
const chatUpdate = buildApiChatFromDialog(dialog, peerEntity);
const isChannel = getEntityTypeById(chat.id) === 'channel';
if (dialog.pts && isChannel) {
updateChannelState(chat.id, dialog.pts);
}
const readState = buildThreadReadState(dialog);
const threadInfo = buildApiThreadInfoFromDialog(chat.id, dialog);
sendApiUpdate({

View File

@ -51,8 +51,9 @@ import {
getDifference,
init as initUpdatesManager,
processUpdate,
requestChannelDifference as requestChannelDifferenceFromUpdates,
reset as resetUpdatesManager,
scheduleGetChannelDifference,
setOpenedChannelIds as setOpenedChannelIdsInUpdates,
updateChannelState,
} from '../updates/updateManager';
import {
@ -660,5 +661,9 @@ export function setShouldDebugExportedSenders(value: boolean) {
}
export function requestChannelDifference(channelId: string) {
scheduleGetChannelDifference(channelId);
requestChannelDifferenceFromUpdates(channelId);
}
export function setOpenedChannelIds(channelIds: string[]) {
setOpenedChannelIdsInUpdates(channelIds);
}

View File

@ -1,6 +1,7 @@
export {
destroy, disconnect, downloadMedia, fetchCurrentUser, repairFileReference, abortChatRequests, abortRequestGroup,
setForceHttpTransport, setShouldDebugExportedSenders, setAllowHttpTransport, requestChannelDifference,
setOpenedChannelIds,
} from './client';
export {

View File

@ -162,6 +162,7 @@ export async function fetchMessages({
const RequestClass = threadId === MAIN_THREAD_ID
? GramJs.messages.GetHistory : isSavedDialog
? GramJs.messages.GetSavedHistory : GramJs.messages.GetReplies;
const isChannel = getEntityTypeById(chat.id) === 'channel';
let result;
try {
@ -203,6 +204,10 @@ export async function fetchMessages({
return undefined;
}
if (isChannel && 'pts' in result) {
updateChannelState(chat.id, result.pts);
}
const messages = result.messages.map(buildApiMessage).filter(Boolean);
const users = result.users.map(buildApiUser).filter(Boolean);
const chats = result.chats.map((c) => buildApiChatFromPreview(c)).filter(Boolean);
@ -259,7 +264,7 @@ export async function fetchMessage({ chat, messageId }: { chat: ApiChat; message
return undefined;
}
if ('pts' in result) {
if (isChannel && 'pts' in result) {
updateChannelState(chat.id, result.pts);
}

View File

@ -1,4 +1,5 @@
import { Api as GramJs, type Update } from '../../../lib/gramjs';
import { RPCError } from '../../../lib/gramjs/errors';
import { UpdateConnectionState, UpdateServerTimeOffset } from '../../../lib/gramjs/network';
import type { Entity } from '../../../lib/gramjs/types';
@ -24,16 +25,37 @@ export type State = {
};
type SeqUpdate = (GramJs.Updates | GramJs.UpdatesCombined) & { _isFromDifference?: true };
type PtsUpdate = ((GramJs.TypeUpdate & { pts: number }) | UpdatePts) & { _isFromDifference?: true };
type ChannelDifferenceReason = 'gapRecovery' | 'shortpoll';
type ChannelScheduler = {
timeout?: ReturnType<typeof setTimeout>;
deadline?: number;
reason?: ChannelDifferenceReason;
isInFlight: boolean;
shortpollTimeoutMs?: number;
isShortpollEligible?: boolean;
};
const COMMON_BOX_QUEUE_ID = '0';
const CHANNEL_DIFFERENCE_LIMIT = 1000;
const SHORTPOLL_CHANNEL_DIFFERENCE_LIMIT = 100;
const CATCH_UP_CHANNEL_DIFFERENCE_LIMIT = 1000;
const SHORTPOLL_DEFAULT_TIMEOUT_MS = 1000;
const INITIAL_SHORTPOLL_TIMEOUT_MS = 10000;
const CHANNEL_DIFFERENCE_RETRY_TIMEOUT_MS = 5000;
const UPDATE_WAIT_TIMEOUT = 500;
const TERMINAL_CHANNEL_DIFFERENCE_ERRORS = new Set([
'CHANNEL_INVALID',
'CHANNEL_PRIVATE',
]);
let invoke: typeof invokeRequest;
let isInited = false;
let seqTimeout: ReturnType<typeof setTimeout> | undefined;
const PTS_TIMEOUTS = new Map<string, ReturnType<typeof setTimeout>>();
const CHANNEL_SCHEDULERS = new Map<string, ChannelScheduler>();
const OPENED_CHANNEL_IDS = new Set<string>();
const SEQ_QUEUE = new SortedQueue<SeqUpdate>(seqComparator);
const PTS_QUEUE = new Map<string, SortedQueue<PtsUpdate>>();
@ -85,7 +107,7 @@ export function processUpdate(update: Update, isFromDifference?: boolean, should
if ('pts' in update) {
if (update instanceof GramJs.UpdateChannelTooLong) {
getChannelDifference(getUpdateChannelId(update));
scheduleChannelDifference(getUpdateChannelId(update), 'gapRecovery', 0);
return;
}
if (isFromDifference) {
@ -215,8 +237,8 @@ function popPtsQueue(channelId: string) {
if (update._isFromDifference && pts >= localPts + ptsCount) {
applyUpdate(update);
} else if (pts === localPts + ptsCount) {
clearTimeout(PTS_TIMEOUTS.get(channelId));
PTS_TIMEOUTS.delete(channelId);
clearScheduledChannelDifference(channelId, 'gapRecovery');
scheduleShortpollFromNow(channelId);
applyUpdate(update);
} else if (pts > localPts + ptsCount) {
@ -233,13 +255,114 @@ function popPtsQueue(channelId: string) {
}
export function scheduleGetChannelDifference(channelId: string) {
if (PTS_TIMEOUTS.has(channelId)) return;
scheduleChannelDifference(channelId, 'gapRecovery', UPDATE_WAIT_TIMEOUT);
}
const timeout = setTimeout(async () => {
await getChannelDifference(channelId);
PTS_TIMEOUTS.delete(channelId);
}, UPDATE_WAIT_TIMEOUT);
PTS_TIMEOUTS.set(channelId, timeout);
export function requestChannelDifference(channelId: string) {
scheduleChannelDifference(channelId, 'gapRecovery', 0);
}
export function setOpenedChannelIds(channelIds: string[]) {
const nextOpenedChannelIds = new Set(channelIds);
OPENED_CHANNEL_IDS.forEach((channelId) => {
if (nextOpenedChannelIds.has(channelId)) {
return;
}
getOrCreateChannelScheduler(channelId).isShortpollEligible = false;
clearScheduledChannelDifference(channelId, 'shortpoll');
});
channelIds.forEach((channelId) => {
const scheduler = getOrCreateChannelScheduler(channelId);
const wasOpened = OPENED_CHANNEL_IDS.has(channelId);
scheduler.isShortpollEligible = true;
if (!wasOpened) {
if (scheduler.shortpollTimeoutMs !== undefined) {
restartShortpollFromNow(channelId);
} else {
scheduleChannelDifference(channelId, 'shortpoll', INITIAL_SHORTPOLL_TIMEOUT_MS);
}
}
});
OPENED_CHANNEL_IDS.clear();
channelIds.forEach((channelId) => {
OPENED_CHANNEL_IDS.add(channelId);
});
}
function getOrCreateChannelScheduler(channelId: string) {
const current = CHANNEL_SCHEDULERS.get(channelId);
if (current) {
return current;
}
const scheduler: ChannelScheduler = {
isInFlight: false,
};
CHANNEL_SCHEDULERS.set(channelId, scheduler);
return scheduler;
}
function scheduleChannelDifference(channelId: string, reason: ChannelDifferenceReason, timeoutMs: number) {
const scheduler = getOrCreateChannelScheduler(channelId);
const deadline = Date.now() + timeoutMs;
if (scheduler.deadline !== undefined && scheduler.deadline <= deadline) {
return;
}
clearScheduledChannelDifference(channelId);
scheduler.reason = reason;
scheduler.deadline = deadline;
scheduler.timeout = setTimeout(() => {
scheduler.timeout = undefined;
scheduler.deadline = undefined;
if (scheduler.isInFlight) {
scheduleChannelDifference(channelId, reason, UPDATE_WAIT_TIMEOUT);
return;
}
void runChannelDifference(channelId, reason);
}, timeoutMs);
}
function clearScheduledChannelDifference(channelId: string, reason?: ChannelDifferenceReason) {
const scheduler = CHANNEL_SCHEDULERS.get(channelId);
if (!scheduler?.timeout || (reason && scheduler.reason !== reason)) {
return;
}
clearTimeout(scheduler.timeout);
scheduler.timeout = undefined;
scheduler.deadline = undefined;
scheduler.reason = undefined;
}
function scheduleShortpollFromNow(channelId: string) {
const scheduler = CHANNEL_SCHEDULERS.get(channelId);
if (!scheduler?.isShortpollEligible || scheduler.shortpollTimeoutMs === undefined) {
return;
}
scheduleChannelDifference(channelId, 'shortpoll', scheduler.shortpollTimeoutMs);
}
function restartShortpollFromNow(channelId: string) {
const scheduler = CHANNEL_SCHEDULERS.get(channelId);
if (!scheduler?.isShortpollEligible || scheduler.shortpollTimeoutMs === undefined || scheduler.isInFlight) {
return;
}
if (scheduler.reason === 'shortpoll') {
clearScheduledChannelDifference(channelId);
}
scheduleChannelDifference(channelId, 'shortpoll', scheduler.shortpollTimeoutMs);
}
function scheduleGetDifference() {
@ -316,9 +439,30 @@ export async function getDifference() {
});
}
async function getChannelDifference(channelId: string) {
async function runChannelDifference(channelId: string, reason: ChannelDifferenceReason) {
const scheduler = getOrCreateChannelScheduler(channelId);
if (scheduler.isInFlight) {
return;
}
scheduler.isInFlight = true;
scheduler.reason = reason;
try {
await requestChannelDifferenceInternal(channelId, reason);
} finally {
scheduler.isInFlight = false;
}
}
async function requestChannelDifferenceInternal(channelId: string, reason: ChannelDifferenceReason): Promise<void> {
const channel = localDb.chats[channelId];
if (!channel || !(channel instanceof GramJs.Channel) || !channel.accessHash || !localDb.channelPtsById[channelId]) {
if (
!channel
|| !(channel instanceof GramJs.Channel)
|| !channel.accessHash
|| localDb.channelPtsById[channelId] === undefined
) {
if (DEBUG) {
// eslint-disable-next-line no-console
console.error('[UpdateManager] Channel for difference not found', channelId, channel);
@ -326,18 +470,25 @@ async function getChannelDifference(channelId: string) {
return;
}
const response = await invoke(new GramJs.updates.GetChannelDifference({
channel: buildInputChannel(channelId, channel.accessHash.toString()),
pts: localDb.channelPtsById[channelId],
filter: new GramJs.ChannelMessagesFilterEmpty(),
limit: CHANNEL_DIFFERENCE_LIMIT,
}));
const limit = reason === 'shortpoll' ? SHORTPOLL_CHANNEL_DIFFERENCE_LIMIT : CATCH_UP_CHANNEL_DIFFERENCE_LIMIT;
let response: GramJs.updates.TypeChannelDifference;
if (!response) {
if (DEBUG) {
// eslint-disable-next-line no-console
console.warn('[UpdatesManager] Failed to get ChannelDifference', channelId, channel);
try {
const result = await invoke(new GramJs.updates.GetChannelDifference({
channel: buildInputChannel(channelId, channel.accessHash.toString()),
pts: localDb.channelPtsById[channelId],
filter: new GramJs.ChannelMessagesFilterEmpty(),
limit,
}), {
shouldThrow: true,
});
if (!result) {
return;
}
response = result;
} catch (err) {
handleChannelDifferenceError(channelId, reason, err);
return;
}
@ -347,8 +498,13 @@ async function getChannelDifference(channelId: string) {
}
localDb.channelPtsById[channelId] = response.pts;
updateChannelShortpollTimeout(channelId, response);
if (response instanceof GramJs.updates.ChannelDifferenceEmpty) {
if (response.final) {
scheduleShortpollIfEligible(channelId);
}
popPtsQueue(channelId); // Continue processing updates in queue
return;
}
@ -356,8 +512,45 @@ async function getChannelDifference(channelId: string) {
processDifference(response, channelId);
if (!response.final) {
getChannelDifference(channelId);
await requestChannelDifferenceInternal(channelId, 'gapRecovery');
return;
}
scheduleShortpollIfEligible(channelId);
}
function updateChannelShortpollTimeout(channelId: string, response: GramJs.updates.TypeChannelDifference) {
const scheduler = getOrCreateChannelScheduler(channelId);
scheduler.shortpollTimeoutMs = ('timeout' in response && response.timeout)
? response.timeout * 1000
: SHORTPOLL_DEFAULT_TIMEOUT_MS;
}
function scheduleShortpollIfEligible(channelId: string) {
const scheduler = getOrCreateChannelScheduler(channelId);
if (!scheduler.isShortpollEligible) {
return;
}
scheduleShortpollFromNow(channelId);
}
function handleChannelDifferenceError(channelId: string, reason: ChannelDifferenceReason, err: unknown) {
if (DEBUG) {
// eslint-disable-next-line no-console
console.warn('[UpdatesManager] Failed to get ChannelDifference', channelId, err);
}
const scheduler = getOrCreateChannelScheduler(channelId);
const errorMessage = err instanceof RPCError ? err.errorMessage : undefined;
if (errorMessage && TERMINAL_CHANNEL_DIFFERENCE_ERRORS.has(errorMessage)) {
scheduler.isShortpollEligible = false;
clearScheduledChannelDifference(channelId);
return;
}
scheduleChannelDifference(channelId, reason, CHANNEL_DIFFERENCE_RETRY_TIMEOUT_MS);
}
function forceSync() {
@ -377,10 +570,15 @@ export function reset() {
clearTimeout(seqTimeout);
seqTimeout = undefined;
PTS_TIMEOUTS.forEach((timeout) => {
CHANNEL_SCHEDULERS.forEach(({ timeout }) => {
if (!timeout) {
return;
}
clearTimeout(timeout);
});
PTS_TIMEOUTS.clear();
CHANNEL_SCHEDULERS.clear();
OPENED_CHANNEL_IDS.clear();
localDb.commonBoxState = {};

View File

@ -14,17 +14,20 @@ import type { ActionReturnType, GlobalState } from '../../types';
import { getCurrentTabId } from '../../../util/establishMultitabRole';
import { getShippingError, shouldClosePaymentModal } from '../../../util/getReadableErrorText';
import { unique } from '../../../util/iteratees';
import { getAccountsInfo, getAccountSlotUrl } from '../../../util/multiaccount';
import { oldSetLanguage } from '../../../util/oldLangProvider';
import { clearWebTokenAuth } from '../../../util/routing';
import { setServerTimeOffset } from '../../../util/serverTime';
import { updateSessionUserId } from '../../../util/sessions';
import { forceWebsync } from '../../../util/websync';
import { isChatChannel, isChatSuperGroup } from '../../helpers';
import {
addActionHandler, getActions, getGlobal, setGlobal,
} from '../../index';
import {
getOpenedShortpollChannelIds,
resetOpenedChannelShortpollState,
syncOpenedShortpollChannelIds,
} from '../../openedChannelShortpoll';
import { updateUser, updateUserFullInfo } from '../../reducers';
import { updateAuth } from '../../reducers/auth';
import { updateTabState } from '../../reducers/tabs';
@ -85,6 +88,8 @@ addActionHandler('apiUpdate', (global, actions, update): ActionReturnType => {
break;
case 'requestSync':
resetOpenedChannelShortpollState();
syncOpenedShortpollChannelIds(global);
actions.sync();
break;
@ -260,15 +265,10 @@ function onUpdateConnectionState<T extends GlobalState>(
setGlobal(global);
if (global.isSynced) {
const channelStackIds = Object.values(global.byTabId)
.flatMap((tab) => tab.messageLists)
.map((messageList) => messageList.chatId)
.filter((chatId) => {
const chat = global.chats.byId[chatId];
return chat && (isChatChannel(chat) || isChatSuperGroup(chat));
});
if (connectionState === 'connectionStateReady' && channelStackIds.length) {
unique(channelStackIds).forEach((chatId) => {
const channelStackIds = getOpenedShortpollChannelIds(global);
if (connectionState === 'connectionStateReady' && tabState.isMasterTab && channelStackIds.length) {
channelStackIds.forEach((chatId) => {
actions.requestChannelDifference({ chatId });
});
}

View File

@ -3,6 +3,7 @@ import { addCallback } from '../lib/teact/teactn';
import type { GlobalState } from './types';
import { getServerTime } from '../util/serverTime';
import { resetOpenedChannelShortpollState, syncOpenedShortpollChannelIds } from './openedChannelShortpoll';
import { removePeerStory } from './reducers';
import { selectTabState } from './selectors';
import { getGlobal, setGlobal } from '.';
@ -22,18 +23,30 @@ addCallback((global: GlobalState) => {
if (isCurrentMaster === isPreviousMaster) return;
if (isCurrentMaster && !isPreviousMaster) {
startIntervals();
startIntervals(global);
} else {
stopIntervals();
}
});
function startIntervals() {
addCallback((global: GlobalState) => {
if (!selectTabState(global)?.isMasterTab) {
return;
}
syncOpenedShortpollChannelIds(global);
});
function startIntervals(global: GlobalState) {
if (intervals.length) return;
resetOpenedChannelShortpollState();
intervals.push(window.setInterval(checkStoryExpiration, STORY_EXPIRATION_INTERVAL));
syncOpenedShortpollChannelIds(global);
}
function stopIntervals() {
resetOpenedChannelShortpollState();
intervals.forEach((interval) => clearInterval(interval));
intervals = [];
}

View File

@ -0,0 +1,114 @@
import type { ThreadId } from '../types';
import type { GlobalState } from './types';
import { callApi } from '../api/gramjs';
import { isChatChannel, isChatSuperGroup } from './helpers';
import { selectCurrentMessageList, selectTabState } from './selectors';
const MAX_OPENED_CHANNELS = 10;
type OpenedChannelEntrySource = 'visible' | 'preview';
type OpenedChannelEntry = {
key: string;
channelId: string;
source: OpenedChannelEntrySource;
};
let openedEntryMru: string[] = [];
let lastReportedChannelIds: string[] = [];
export function resetOpenedChannelShortpollState() {
openedEntryMru = [];
lastReportedChannelIds = [];
}
export function getOpenedShortpollChannelIds(global: GlobalState) {
const openedEntries = buildOpenedChannelEntries(global);
const entriesByKey = new Map(openedEntries.map((entry) => [entry.key, entry]));
const openedKeys = new Set(entriesByKey.keys());
openedEntryMru = openedEntryMru.filter((key) => openedKeys.has(key));
openedEntries
.map((entry) => entry.key)
.reverse()
.forEach((key) => {
if (openedEntryMru.indexOf(key) === -1) {
openedEntryMru.unshift(key);
}
});
const channelIds: string[] = [];
const channelIdsSet = new Set<string>();
openedEntryMru.forEach((key) => {
if (channelIds.length >= MAX_OPENED_CHANNELS) {
return;
}
const entry = entriesByKey.get(key);
if (!entry || channelIdsSet.has(entry.channelId)) {
return;
}
channelIds.push(entry.channelId);
channelIdsSet.add(entry.channelId);
});
return channelIds;
}
export function syncOpenedShortpollChannelIds(global: GlobalState) {
if (!selectTabState(global).isMasterTab) {
return;
}
const channelIds = getOpenedShortpollChannelIds(global);
if (areArraysEqual(channelIds, lastReportedChannelIds)) {
return;
}
lastReportedChannelIds = channelIds;
void callApi('setOpenedChannelIds', channelIds);
}
function buildOpenedChannelEntries(global: GlobalState) {
return Object.values(global.byTabId).flatMap(({ id: tabId, quickPreview }) => {
const entries: OpenedChannelEntry[] = [];
const currentMessageList = selectCurrentMessageList(global, tabId);
if (currentMessageList && shouldShortpollChannel(global, currentMessageList.chatId)) {
entries.push({
key: buildOpenedChannelEntryKey(tabId, currentMessageList.chatId, currentMessageList.threadId, 'visible'),
channelId: currentMessageList.chatId,
source: 'visible',
});
}
if (quickPreview && shouldShortpollChannel(global, quickPreview.chatId)) {
entries.push({
key: buildOpenedChannelEntryKey(tabId, quickPreview.chatId, quickPreview.threadId, 'preview'),
channelId: quickPreview.chatId,
source: 'preview',
});
}
return entries;
});
}
function shouldShortpollChannel(global: GlobalState, chatId: string) {
const chat = global.chats.byId[chatId];
return Boolean(chat && (isChatChannel(chat) || isChatSuperGroup(chat)));
}
function buildOpenedChannelEntryKey(
tabId: number, chatId: string, threadId: ThreadId | undefined, source: OpenedChannelEntrySource,
) {
return `${tabId}:${source}:${chatId}:${threadId || 0}`;
}
function areArraysEqual(first: string[], second: string[]) {
return first.length === second.length && first.every((value, index) => value === second[index]);
}