[Perf] API and Post Message Connector: Batch postMessage calls
f-u
This commit is contained in:
parent
8b717b010f
commit
7b86865d67
@ -3,17 +3,17 @@ import type { TypedBroadcastChannel } from '../../../util/multitab';
|
||||
import type { ApiInitialArgs, ApiOnProgress, OnApiUpdate } from '../../types';
|
||||
import type { LocalDb } from '../localDb';
|
||||
import type { MethodArgs, MethodResponse, Methods } from '../methods/types';
|
||||
import type { OriginRequest, ThenArg, WorkerMessageEvent } from './types';
|
||||
import type { OriginPayload, ThenArg, WorkerMessageEvent } from './types';
|
||||
|
||||
import { DATA_BROADCAST_CHANNEL_NAME, DEBUG } from '../../../config';
|
||||
import { logDebugMessage } from '../../../util/debugConsole';
|
||||
import Deferred from '../../../util/Deferred';
|
||||
import { getCurrentTabId, subscribeToMasterChange } from '../../../util/establishMultitabRole';
|
||||
import generateUniqueId from '../../../util/generateUniqueId';
|
||||
import { pause } from '../../../util/schedulers';
|
||||
import { pause, throttleWithTickEnd } from '../../../util/schedulers';
|
||||
import { IS_MULTITAB_SUPPORTED } from '../../../util/windowEnvironment';
|
||||
|
||||
type RequestStates = {
|
||||
type RequestState = {
|
||||
messageId: string;
|
||||
resolve: Function;
|
||||
reject: Function;
|
||||
@ -26,8 +26,12 @@ const HEALTH_CHECK_MIN_DELAY = 5 * 1000; // 5 sec
|
||||
const NO_QUEUE_BEFORE_INIT = new Set(['destroy']);
|
||||
|
||||
let worker: Worker | undefined;
|
||||
const requestStates = new Map<string, RequestStates>();
|
||||
const requestStatesByCallback = new Map<AnyToVoidFunction, RequestStates>();
|
||||
|
||||
const requestStates = new Map<string, RequestState>();
|
||||
const requestStatesByCallback = new Map<AnyToVoidFunction, RequestState>();
|
||||
|
||||
let pendingPayloads: OriginPayload[] = [];
|
||||
|
||||
const savedLocalDb: LocalDb = {
|
||||
chats: {},
|
||||
users: {},
|
||||
@ -48,6 +52,17 @@ const channel = IS_MULTITAB_SUPPORTED
|
||||
? new BroadcastChannel(DATA_BROADCAST_CHANNEL_NAME) as TypedBroadcastChannel
|
||||
: undefined;
|
||||
|
||||
const postMessagesOnTickEnd = throttleWithTickEnd(() => {
|
||||
const payloads = pendingPayloads;
|
||||
pendingPayloads = [];
|
||||
worker?.postMessage({ payloads });
|
||||
});
|
||||
|
||||
function postMessageOnTickEnd(payload: OriginPayload) {
|
||||
pendingPayloads.push(payload);
|
||||
postMessagesOnTickEnd();
|
||||
}
|
||||
|
||||
export function initApiOnMasterTab(initialArgs: ApiInitialArgs) {
|
||||
if (!channel) return;
|
||||
|
||||
@ -250,7 +265,7 @@ export function cancelApiProgress(progressCallback: ApiOnProgress) {
|
||||
}
|
||||
|
||||
export function cancelApiProgressMaster(messageId: string) {
|
||||
worker?.postMessage({
|
||||
postMessageOnTickEnd({
|
||||
type: 'cancelProgress',
|
||||
messageId,
|
||||
});
|
||||
@ -258,34 +273,35 @@ export function cancelApiProgressMaster(messageId: string) {
|
||||
|
||||
function subscribeToWorker(onUpdate: OnApiUpdate) {
|
||||
worker?.addEventListener('message', ({ data }: WorkerMessageEvent) => {
|
||||
if (!data) return;
|
||||
if (data.type === 'updates') {
|
||||
// eslint-disable-next-line @typescript-eslint/naming-convention
|
||||
let DEBUG_startAt: number | undefined;
|
||||
if (DEBUG) {
|
||||
DEBUG_startAt = performance.now();
|
||||
}
|
||||
|
||||
data.updates.forEach(onUpdate);
|
||||
|
||||
if (DEBUG) {
|
||||
const duration = performance.now() - DEBUG_startAt!;
|
||||
if (duration > 5) {
|
||||
// eslint-disable-next-line no-console
|
||||
console.warn(`[API] Slow updates processing: ${data.updates.length} updates in ${duration} ms`);
|
||||
data?.payloads.forEach((payload) => {
|
||||
if (payload.type === 'updates') {
|
||||
// eslint-disable-next-line @typescript-eslint/naming-convention
|
||||
let DEBUG_startAt: number | undefined;
|
||||
if (DEBUG) {
|
||||
DEBUG_startAt = performance.now();
|
||||
}
|
||||
|
||||
payload.updates.forEach(onUpdate);
|
||||
|
||||
if (DEBUG) {
|
||||
const duration = performance.now() - DEBUG_startAt!;
|
||||
if (duration > 5) {
|
||||
// eslint-disable-next-line no-console
|
||||
console.warn(`[API] Slow updates processing: ${payload.updates.length} updates in ${duration} ms`);
|
||||
}
|
||||
}
|
||||
} else if (payload.type === 'methodResponse') {
|
||||
handleMethodResponse(payload);
|
||||
} else if (payload.type === 'methodCallback') {
|
||||
handleMethodCallback(payload);
|
||||
} else if (payload.type === 'unhandledError') {
|
||||
throw new Error(payload.error?.message);
|
||||
} else if (payload.type === 'sendBeacon') {
|
||||
navigator.sendBeacon(payload.url, payload.data);
|
||||
} else if (payload.type === 'debugLog') {
|
||||
logDebugMessage(payload.level, ...payload.args);
|
||||
}
|
||||
} else if (data.type === 'methodResponse') {
|
||||
handleMethodResponse(data);
|
||||
} else if (data.type === 'methodCallback') {
|
||||
handleMethodCallback(data);
|
||||
} else if (data.type === 'unhandledError') {
|
||||
throw new Error(data.error?.message);
|
||||
} else if (data.type === 'sendBeacon') {
|
||||
navigator.sendBeacon(data.url, data.data);
|
||||
} else if (data.type === 'debugLog') {
|
||||
logDebugMessage(data.level, ...data.args);
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@ -323,7 +339,7 @@ function makeRequestToMaster(message: {
|
||||
...message,
|
||||
};
|
||||
|
||||
const requestState = { messageId } as RequestStates;
|
||||
const requestState = { messageId } as RequestState;
|
||||
|
||||
// Re-wrap type because of `postMessage`
|
||||
const promise: Promise<MethodResponse<keyof Methods>> = new Promise((resolve, reject) => {
|
||||
@ -355,14 +371,14 @@ function makeRequestToMaster(message: {
|
||||
return promise;
|
||||
}
|
||||
|
||||
function makeRequest(message: OriginRequest) {
|
||||
function makeRequest(message: OriginPayload) {
|
||||
const messageId = generateUniqueId();
|
||||
const payload: OriginRequest = {
|
||||
const payload: OriginPayload = {
|
||||
messageId,
|
||||
...message,
|
||||
};
|
||||
|
||||
const requestState = { messageId } as RequestStates;
|
||||
const requestState = { messageId } as RequestState;
|
||||
|
||||
// Re-wrap type because of `postMessage`
|
||||
const promise: Promise<MethodResponse<keyof Methods>> = new Promise((resolve, reject) => {
|
||||
@ -391,7 +407,7 @@ function makeRequest(message: OriginRequest) {
|
||||
}
|
||||
});
|
||||
|
||||
worker?.postMessage(payload);
|
||||
postMessageOnTickEnd(payload);
|
||||
|
||||
return promise;
|
||||
}
|
||||
|
||||
@ -5,59 +5,85 @@ import type { MethodArgs, MethodResponse, Methods } from '../methods/types';
|
||||
|
||||
export type ThenArg<T> = T extends Promise<infer U> ? U : T;
|
||||
|
||||
export type WorkerPayload =
|
||||
{
|
||||
type: 'updates';
|
||||
updates: ApiUpdate[];
|
||||
}
|
||||
|
|
||||
{
|
||||
type: 'methodResponse';
|
||||
messageId: string;
|
||||
response?: ThenArg<MethodResponse<keyof Methods>>;
|
||||
error?: { message: string };
|
||||
}
|
||||
|
|
||||
{
|
||||
type: 'methodCallback';
|
||||
messageId: string;
|
||||
callbackArgs: any[];
|
||||
}
|
||||
|
|
||||
{
|
||||
type: 'unhandledError';
|
||||
error?: { message: string };
|
||||
}
|
||||
|
|
||||
{
|
||||
type: 'sendBeacon';
|
||||
url: string;
|
||||
data: ArrayBuffer;
|
||||
}
|
||||
|
|
||||
{
|
||||
type: 'debugLog';
|
||||
level: DebugLevel;
|
||||
args: any[];
|
||||
};
|
||||
|
||||
export type WorkerMessageData = {
|
||||
type: 'updates';
|
||||
updates: ApiUpdate[];
|
||||
} | {
|
||||
type: 'methodResponse';
|
||||
messageId: string;
|
||||
response?: ThenArg<MethodResponse<keyof Methods>>;
|
||||
error?: { message: string };
|
||||
} | {
|
||||
type: 'methodCallback';
|
||||
messageId: string;
|
||||
callbackArgs: any[];
|
||||
} | {
|
||||
type: 'unhandledError';
|
||||
error?: { message: string };
|
||||
} | {
|
||||
type: 'sendBeacon';
|
||||
url: string;
|
||||
data: ArrayBuffer;
|
||||
} | {
|
||||
type: 'debugLog';
|
||||
level: DebugLevel;
|
||||
args: any[];
|
||||
payloads: WorkerPayload[];
|
||||
};
|
||||
|
||||
export interface WorkerMessageEvent {
|
||||
export type WorkerMessageEvent = {
|
||||
data: WorkerMessageData;
|
||||
}
|
||||
|
||||
export type OriginRequest = {
|
||||
type: 'initApi';
|
||||
messageId?: string;
|
||||
args: [ApiInitialArgs, LocalDb];
|
||||
} | {
|
||||
type: 'callMethod';
|
||||
messageId?: string;
|
||||
name: keyof Methods;
|
||||
args: MethodArgs<keyof Methods>;
|
||||
withCallback?: boolean;
|
||||
} | {
|
||||
type: 'ping';
|
||||
messageId?: string;
|
||||
} | {
|
||||
type: 'toggleDebugMode';
|
||||
messageId?: string;
|
||||
isEnabled?: boolean;
|
||||
};
|
||||
|
||||
export type OriginMessageData = OriginRequest | {
|
||||
type: 'cancelProgress';
|
||||
messageId: string;
|
||||
export type OriginPayload =
|
||||
{
|
||||
type: 'initApi';
|
||||
messageId?: string;
|
||||
args: [ApiInitialArgs, LocalDb];
|
||||
}
|
||||
|
|
||||
{
|
||||
type: 'callMethod';
|
||||
messageId?: string;
|
||||
name: keyof Methods;
|
||||
args: MethodArgs<keyof Methods>;
|
||||
withCallback?: boolean;
|
||||
}
|
||||
|
|
||||
{
|
||||
type: 'ping';
|
||||
messageId?: string;
|
||||
}
|
||||
|
|
||||
{
|
||||
type: 'toggleDebugMode';
|
||||
messageId?: string;
|
||||
isEnabled?: boolean;
|
||||
}
|
||||
|
|
||||
{
|
||||
type: 'cancelProgress';
|
||||
messageId: string;
|
||||
};
|
||||
|
||||
export type OriginMessageData = {
|
||||
payloads: OriginPayload[];
|
||||
};
|
||||
|
||||
export interface OriginMessageEvent {
|
||||
export type OriginMessageEvent = {
|
||||
data: OriginMessageData;
|
||||
}
|
||||
};
|
||||
|
||||
@ -2,7 +2,7 @@
|
||||
|
||||
import type { DebugLevel } from '../../../util/debugConsole';
|
||||
import type { ApiOnProgress, ApiUpdate } from '../../types';
|
||||
import type { OriginMessageEvent, WorkerMessageData } from './types';
|
||||
import type { OriginMessageEvent, WorkerPayload } from './types';
|
||||
|
||||
import { DEBUG } from '../../../config';
|
||||
import { DEBUG_LEVELS } from '../../../util/debugConsole';
|
||||
@ -39,108 +39,111 @@ function disableDebugLog() {
|
||||
|
||||
handleErrors();
|
||||
|
||||
let pendingPayloads: WorkerPayload[] = [];
|
||||
let pendingTransferables: Transferable[] = [];
|
||||
|
||||
const callbackState = new Map<string, ApiOnProgress>();
|
||||
|
||||
if (DEBUG) {
|
||||
console.log('>>> FINISH LOAD WORKER');
|
||||
}
|
||||
|
||||
onmessage = async (message: OriginMessageEvent) => {
|
||||
const { data } = message;
|
||||
|
||||
switch (data.type) {
|
||||
case 'initApi': {
|
||||
const { messageId, args } = data;
|
||||
await initApi(onUpdate, args[0], args[1]);
|
||||
if (messageId) {
|
||||
sendToOrigin({
|
||||
type: 'methodResponse',
|
||||
messageId,
|
||||
response: true,
|
||||
});
|
||||
}
|
||||
break;
|
||||
}
|
||||
case 'callMethod': {
|
||||
const {
|
||||
messageId, name, args, withCallback,
|
||||
} = data;
|
||||
try {
|
||||
if (messageId && withCallback) {
|
||||
const callback = (...callbackArgs: any[]) => {
|
||||
const lastArg = callbackArgs[callbackArgs.length - 1];
|
||||
|
||||
sendToOrigin({
|
||||
type: 'methodCallback',
|
||||
messageId,
|
||||
callbackArgs,
|
||||
}, lastArg instanceof ArrayBuffer ? lastArg : undefined);
|
||||
};
|
||||
|
||||
callbackState.set(messageId, callback);
|
||||
|
||||
args.push(callback as never);
|
||||
}
|
||||
|
||||
const response = await callApi(name, ...args);
|
||||
|
||||
if (DEBUG && typeof response === 'object' && 'CONSTRUCTOR_ID' in response) {
|
||||
log('UNEXPECTED RESPONSE', `${name}: ${response.className}`);
|
||||
}
|
||||
|
||||
const { arrayBuffer } = (typeof response === 'object' && 'arrayBuffer' in response && response) || {};
|
||||
|
||||
onmessage = ({ data }: OriginMessageEvent) => {
|
||||
data.payloads.forEach(async (payload) => {
|
||||
switch (payload.type) {
|
||||
case 'initApi': {
|
||||
const { messageId, args } = payload;
|
||||
await initApi(onUpdate, args[0], args[1]);
|
||||
if (messageId) {
|
||||
sendToOrigin({
|
||||
type: 'methodResponse',
|
||||
messageId,
|
||||
response,
|
||||
}, arrayBuffer);
|
||||
}
|
||||
} catch (error: any) {
|
||||
if (DEBUG) {
|
||||
console.error(error);
|
||||
}
|
||||
|
||||
if (messageId) {
|
||||
sendToOrigin({
|
||||
type: 'methodResponse',
|
||||
messageId,
|
||||
error: { message: error.message },
|
||||
response: true,
|
||||
});
|
||||
}
|
||||
break;
|
||||
}
|
||||
case 'callMethod': {
|
||||
const {
|
||||
messageId, name, args, withCallback,
|
||||
} = payload;
|
||||
try {
|
||||
if (messageId && withCallback) {
|
||||
const callback = (...callbackArgs: any[]) => {
|
||||
const lastArg = callbackArgs[callbackArgs.length - 1];
|
||||
|
||||
if (messageId) {
|
||||
callbackState.delete(messageId);
|
||||
sendToOrigin({
|
||||
type: 'methodCallback',
|
||||
messageId,
|
||||
callbackArgs,
|
||||
}, lastArg instanceof ArrayBuffer ? lastArg : undefined);
|
||||
};
|
||||
|
||||
callbackState.set(messageId, callback);
|
||||
|
||||
args.push(callback as never);
|
||||
}
|
||||
|
||||
const response = await callApi(name, ...args);
|
||||
|
||||
if (DEBUG && typeof response === 'object' && 'CONSTRUCTOR_ID' in response) {
|
||||
log('UNEXPECTED RESPONSE', `${name}: ${response.className}`);
|
||||
}
|
||||
|
||||
const { arrayBuffer } = (typeof response === 'object' && 'arrayBuffer' in response && response) || {};
|
||||
|
||||
if (messageId) {
|
||||
sendToOrigin({
|
||||
type: 'methodResponse',
|
||||
messageId,
|
||||
response,
|
||||
}, arrayBuffer);
|
||||
}
|
||||
} catch (error: any) {
|
||||
if (DEBUG) {
|
||||
console.error(error);
|
||||
}
|
||||
|
||||
if (messageId) {
|
||||
sendToOrigin({
|
||||
type: 'methodResponse',
|
||||
messageId,
|
||||
error: { message: error.message },
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if (messageId) {
|
||||
callbackState.delete(messageId);
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
case 'cancelProgress': {
|
||||
const callback = callbackState.get(payload.messageId);
|
||||
if (callback) {
|
||||
cancelApiProgress(callback);
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
case 'cancelProgress': {
|
||||
const callback = callbackState.get(data.messageId);
|
||||
if (callback) {
|
||||
cancelApiProgress(callback);
|
||||
break;
|
||||
}
|
||||
case 'ping': {
|
||||
sendToOrigin({
|
||||
type: 'methodResponse',
|
||||
messageId: payload.messageId!,
|
||||
});
|
||||
|
||||
break;
|
||||
}
|
||||
case 'ping': {
|
||||
sendToOrigin({
|
||||
type: 'methodResponse',
|
||||
messageId: data.messageId!,
|
||||
});
|
||||
|
||||
break;
|
||||
}
|
||||
case 'toggleDebugMode': {
|
||||
if (data.isEnabled) {
|
||||
enableDebugLog();
|
||||
} else {
|
||||
disableDebugLog();
|
||||
break;
|
||||
}
|
||||
case 'toggleDebugMode': {
|
||||
if (payload.isEnabled) {
|
||||
enableDebugLog();
|
||||
} else {
|
||||
disableDebugLog();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
function handleErrors() {
|
||||
@ -176,10 +179,26 @@ function onUpdate(update: ApiUpdate) {
|
||||
sendUpdatesOnTickEnd();
|
||||
}
|
||||
|
||||
function sendToOrigin(data: WorkerMessageData, arrayBuffer?: ArrayBuffer) {
|
||||
if (arrayBuffer) {
|
||||
postMessage(data, [arrayBuffer]);
|
||||
const sendToOriginOnTickEnd = throttleWithTickEnd(() => {
|
||||
const data = { payloads: pendingPayloads };
|
||||
const transferables = pendingTransferables;
|
||||
|
||||
pendingPayloads = [];
|
||||
pendingTransferables = [];
|
||||
|
||||
if (transferables.length) {
|
||||
postMessage(data, transferables);
|
||||
} else {
|
||||
postMessage(data);
|
||||
}
|
||||
});
|
||||
|
||||
function sendToOrigin(payload: WorkerPayload, transferable?: Transferable) {
|
||||
pendingPayloads.push(payload);
|
||||
|
||||
if (transferable) {
|
||||
pendingTransferables.push(transferable);
|
||||
}
|
||||
|
||||
sendToOriginOnTickEnd();
|
||||
}
|
||||
|
||||
@ -173,6 +173,6 @@ const api = {
|
||||
'rlottie:destroy': destroy,
|
||||
};
|
||||
|
||||
createWorkerInterface(api);
|
||||
createWorkerInterface(api, 'media');
|
||||
|
||||
export type RLottieApi = typeof api;
|
||||
|
||||
@ -81,6 +81,6 @@ const api = {
|
||||
'video-preview:destroy': destroy,
|
||||
};
|
||||
|
||||
createWorkerInterface(api);
|
||||
createWorkerInterface(api, 'media');
|
||||
|
||||
export type VideoPreviewApi = typeof api;
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
import generateUniqueId from './generateUniqueId';
|
||||
import { throttleWithTickEnd } from './schedulers';
|
||||
|
||||
export interface CancellableCallback {
|
||||
(
|
||||
@ -8,16 +9,13 @@ export interface CancellableCallback {
|
||||
isCanceled?: boolean;
|
||||
}
|
||||
|
||||
type InitData = {
|
||||
channel?: string;
|
||||
type InitPayload = {
|
||||
type: 'init';
|
||||
messageId?: string;
|
||||
name: 'init';
|
||||
args: any;
|
||||
};
|
||||
|
||||
type CallMethodData = {
|
||||
channel?: string;
|
||||
type CallMethodPayload = {
|
||||
type: 'callMethod';
|
||||
messageId?: string;
|
||||
name: string;
|
||||
@ -25,12 +23,21 @@ type CallMethodData = {
|
||||
withCallback?: boolean;
|
||||
};
|
||||
|
||||
export type OriginMessageData = InitData | CallMethodData | {
|
||||
channel?: string;
|
||||
type CancelProgressPayload = {
|
||||
type: 'cancelProgress';
|
||||
messageId: string;
|
||||
};
|
||||
|
||||
export type OriginPayload =
|
||||
InitPayload
|
||||
| CallMethodPayload
|
||||
| CancelProgressPayload;
|
||||
|
||||
export type OriginMessageData = {
|
||||
channel?: string;
|
||||
payloads: OriginPayload[];
|
||||
};
|
||||
|
||||
export interface OriginMessageEvent {
|
||||
data: OriginMessageData;
|
||||
}
|
||||
@ -39,32 +46,44 @@ export type ApiUpdate =
|
||||
{ type: string }
|
||||
& any;
|
||||
|
||||
export type WorkerPayload =
|
||||
{
|
||||
channel?: string;
|
||||
type: 'update';
|
||||
update: ApiUpdate;
|
||||
}
|
||||
|
|
||||
{
|
||||
channel?: string;
|
||||
type: 'methodResponse';
|
||||
messageId: string;
|
||||
response?: any;
|
||||
error?: { message: string };
|
||||
}
|
||||
|
|
||||
{
|
||||
channel?: string;
|
||||
type: 'methodCallback';
|
||||
messageId: string;
|
||||
callbackArgs: any[];
|
||||
}
|
||||
|
|
||||
{
|
||||
channel?: string;
|
||||
type: 'unhandledError';
|
||||
error?: { message: string };
|
||||
};
|
||||
|
||||
export type WorkerMessageData = {
|
||||
channel?: string;
|
||||
type: 'update';
|
||||
update: ApiUpdate;
|
||||
} | {
|
||||
channel?: string;
|
||||
type: 'methodResponse';
|
||||
messageId: string;
|
||||
response?: any;
|
||||
error?: { message: string };
|
||||
} | {
|
||||
channel?: string;
|
||||
type: 'methodCallback';
|
||||
messageId: string;
|
||||
callbackArgs: any[];
|
||||
} | {
|
||||
channel?: string;
|
||||
type: 'unhandledError';
|
||||
error?: { message: string };
|
||||
payloads: WorkerPayload[];
|
||||
};
|
||||
|
||||
export interface WorkerMessageEvent {
|
||||
data: WorkerMessageData;
|
||||
}
|
||||
|
||||
interface RequestStates {
|
||||
interface RequestState {
|
||||
messageId: string;
|
||||
resolve: Function;
|
||||
reject: Function;
|
||||
@ -82,9 +101,11 @@ export type RequestTypes<T extends InputRequestTypes> = Values<{
|
||||
}>;
|
||||
|
||||
class ConnectorClass<T extends InputRequestTypes> {
|
||||
private requestStates = new Map<string, RequestStates>();
|
||||
private requestStates = new Map<string, RequestState>();
|
||||
|
||||
private requestStatesByCallback = new Map<AnyToVoidFunction, RequestStates>();
|
||||
private requestStatesByCallback = new Map<AnyToVoidFunction, RequestState>();
|
||||
|
||||
private pendingPayloads: OriginPayload[] = [];
|
||||
|
||||
constructor(
|
||||
public target: Worker,
|
||||
@ -98,7 +119,7 @@ class ConnectorClass<T extends InputRequestTypes> {
|
||||
}
|
||||
|
||||
init(...args: any[]) {
|
||||
this.postMessage({
|
||||
this.postMessageOnTickEnd({
|
||||
type: 'init',
|
||||
args,
|
||||
});
|
||||
@ -108,13 +129,13 @@ class ConnectorClass<T extends InputRequestTypes> {
|
||||
const { requestStates, requestStatesByCallback } = this;
|
||||
|
||||
const messageId = generateUniqueId();
|
||||
const payload: CallMethodData = {
|
||||
const payload: CallMethodPayload = {
|
||||
type: 'callMethod',
|
||||
messageId,
|
||||
...messageData,
|
||||
};
|
||||
|
||||
const requestState = { messageId } as RequestStates;
|
||||
const requestState = { messageId } as RequestState;
|
||||
|
||||
// Re-wrap type because of `postMessage`
|
||||
const promise: Promise<any> = new Promise((resolve, reject) => {
|
||||
@ -140,7 +161,7 @@ class ConnectorClass<T extends InputRequestTypes> {
|
||||
}
|
||||
});
|
||||
|
||||
this.postMessage(payload);
|
||||
this.postMessageOnTickEnd(payload);
|
||||
|
||||
return promise;
|
||||
}
|
||||
@ -153,7 +174,7 @@ class ConnectorClass<T extends InputRequestTypes> {
|
||||
return;
|
||||
}
|
||||
|
||||
this.postMessage({
|
||||
this.postMessageOnTickEnd({
|
||||
type: 'cancelProgress',
|
||||
messageId,
|
||||
});
|
||||
@ -165,31 +186,43 @@ class ConnectorClass<T extends InputRequestTypes> {
|
||||
return;
|
||||
}
|
||||
|
||||
if (data.type === 'update' && this.onUpdate) {
|
||||
this.onUpdate(data.update);
|
||||
}
|
||||
if (data.type === 'methodResponse') {
|
||||
const requestState = requestStates.get(data.messageId);
|
||||
if (requestState) {
|
||||
if (data.error) {
|
||||
requestState.reject(data.error);
|
||||
} else {
|
||||
requestState.resolve(data.response);
|
||||
}
|
||||
data.payloads.forEach((payload) => {
|
||||
if (payload.type === 'update' && this.onUpdate) {
|
||||
this.onUpdate(payload.update);
|
||||
}
|
||||
} else if (data.type === 'methodCallback') {
|
||||
const requestState = requestStates.get(data.messageId);
|
||||
requestState?.callback?.(...data.callbackArgs);
|
||||
} else if (data.type === 'unhandledError') {
|
||||
throw new Error(data.error?.message);
|
||||
}
|
||||
if (payload.type === 'methodResponse') {
|
||||
const requestState = requestStates.get(payload.messageId);
|
||||
if (requestState) {
|
||||
if (payload.error) {
|
||||
requestState.reject(payload.error);
|
||||
} else {
|
||||
requestState.resolve(payload.response);
|
||||
}
|
||||
}
|
||||
} else if (payload.type === 'methodCallback') {
|
||||
const requestState = requestStates.get(payload.messageId);
|
||||
requestState?.callback?.(...payload.callbackArgs);
|
||||
} else if (payload.type === 'unhandledError') {
|
||||
throw new Error(payload.error?.message);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private postMessage(data: AnyLiteral) {
|
||||
data.channel = this.channel;
|
||||
|
||||
this.target.postMessage(data);
|
||||
private postMessageOnTickEnd(payload: OriginPayload) {
|
||||
this.pendingPayloads.push(payload);
|
||||
this.postMessagesOnTickEnd();
|
||||
}
|
||||
|
||||
private postMessagesOnTickEnd = throttleWithTickEnd(() => {
|
||||
const payloads = this.pendingPayloads;
|
||||
|
||||
this.pendingPayloads = [];
|
||||
|
||||
this.target.postMessage({
|
||||
channel: this.channel,
|
||||
payloads,
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
export function createConnector<T extends InputRequestTypes>(
|
||||
|
||||
@ -1,10 +1,14 @@
|
||||
import type {
|
||||
ApiUpdate,
|
||||
CancellableCallback, OriginMessageData, OriginMessageEvent, WorkerMessageData,
|
||||
CancellableCallback,
|
||||
OriginMessageData,
|
||||
OriginMessageEvent,
|
||||
WorkerPayload,
|
||||
} from './PostMessageConnector';
|
||||
|
||||
import { DEBUG } from '../config';
|
||||
import { createCallbackManager } from './callbacks';
|
||||
import { throttleWithTickEnd } from './schedulers';
|
||||
|
||||
declare const self: WorkerGlobalScope;
|
||||
|
||||
@ -13,20 +17,37 @@ const callbackState = new Map<string, CancellableCallback>();
|
||||
type ApiConfig =
|
||||
((name: string, ...args: any[]) => any | [any, ArrayBuffer[]])
|
||||
| Record<string, Function>;
|
||||
type SendToOrigin = (data: WorkerMessageData, transferables?: Transferable[]) => void;
|
||||
type SendToOrigin = (data: WorkerPayload, transferables?: Transferable[]) => void;
|
||||
|
||||
const messageHandlers = createCallbackManager();
|
||||
onmessage = messageHandlers.runCallbacks;
|
||||
|
||||
export function createWorkerInterface(api: ApiConfig, channel?: string) {
|
||||
function sendToOrigin(data: WorkerMessageData, transferables?: Transferable[]) {
|
||||
data.channel = channel;
|
||||
let pendingPayloads: WorkerPayload[] = [];
|
||||
let pendingTransferables: Transferable[] = [];
|
||||
|
||||
if (transferables) {
|
||||
const sendToOriginOnTickEnd = throttleWithTickEnd(() => {
|
||||
const data = { channel, payloads: pendingPayloads };
|
||||
const transferables = pendingTransferables;
|
||||
|
||||
pendingPayloads = [];
|
||||
pendingTransferables = [];
|
||||
|
||||
if (transferables.length) {
|
||||
postMessage(data, transferables);
|
||||
} else {
|
||||
postMessage(data);
|
||||
}
|
||||
});
|
||||
|
||||
function sendToOrigin(payload: WorkerPayload, transferables?: Transferable[]) {
|
||||
pendingPayloads.push(payload);
|
||||
|
||||
if (transferables) {
|
||||
pendingTransferables.push(...transferables);
|
||||
}
|
||||
|
||||
sendToOriginOnTickEnd();
|
||||
}
|
||||
|
||||
handleErrors(sendToOrigin);
|
||||
@ -38,7 +59,7 @@ export function createWorkerInterface(api: ApiConfig, channel?: string) {
|
||||
});
|
||||
}
|
||||
|
||||
async function onMessage(
|
||||
function onMessage(
|
||||
api: ApiConfig,
|
||||
data: OriginMessageData,
|
||||
sendToOrigin: SendToOrigin,
|
||||
@ -53,84 +74,89 @@ async function onMessage(
|
||||
};
|
||||
}
|
||||
|
||||
switch (data.type) {
|
||||
case 'init': {
|
||||
const { args } = data;
|
||||
const promise = typeof api === 'function'
|
||||
? api('init', onUpdate, ...args)
|
||||
: api.init?.(onUpdate, ...args);
|
||||
await promise;
|
||||
|
||||
break;
|
||||
}
|
||||
case 'callMethod': {
|
||||
const {
|
||||
messageId, name, args, withCallback,
|
||||
} = data;
|
||||
|
||||
try {
|
||||
if (typeof api !== 'function' && !api[name]) return;
|
||||
|
||||
if (messageId && withCallback) {
|
||||
const callback = (...callbackArgs: any[]) => {
|
||||
const lastArg = callbackArgs[callbackArgs.length - 1];
|
||||
|
||||
sendToOrigin({
|
||||
type: 'methodCallback',
|
||||
messageId,
|
||||
callbackArgs,
|
||||
}, isTransferable(lastArg) ? [lastArg] : undefined);
|
||||
};
|
||||
|
||||
callbackState.set(messageId, callback);
|
||||
|
||||
args.push(callback as never);
|
||||
data.payloads.forEach(async (payload) => {
|
||||
switch (payload.type) {
|
||||
case 'init': {
|
||||
const { args } = payload;
|
||||
if (typeof api === 'function') {
|
||||
await api('init', onUpdate, ...args);
|
||||
} else {
|
||||
await api.init?.(onUpdate, ...args);
|
||||
}
|
||||
|
||||
const response = typeof api === 'function'
|
||||
? await api(name, ...args)
|
||||
: await api[name](...args);
|
||||
const { arrayBuffer } = (typeof response === 'object' && 'arrayBuffer' in response && response) || {};
|
||||
if (messageId) {
|
||||
sendToOrigin(
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
case 'callMethod': {
|
||||
const {
|
||||
messageId, name, args, withCallback,
|
||||
} = payload;
|
||||
|
||||
try {
|
||||
if (typeof api !== 'function' && !api[name]) return;
|
||||
|
||||
if (messageId && withCallback) {
|
||||
const callback = (...callbackArgs: any[]) => {
|
||||
const lastArg = callbackArgs[callbackArgs.length - 1];
|
||||
|
||||
sendToOrigin({
|
||||
type: 'methodCallback',
|
||||
messageId,
|
||||
callbackArgs,
|
||||
}, isTransferable(lastArg) ? [lastArg] : undefined);
|
||||
};
|
||||
|
||||
callbackState.set(messageId, callback);
|
||||
|
||||
args.push(callback as never);
|
||||
}
|
||||
|
||||
const response = typeof api === 'function'
|
||||
? await api(name, ...args)
|
||||
: await api[name](...args);
|
||||
const { arrayBuffer } = (typeof response === 'object' && 'arrayBuffer' in response && response) || {};
|
||||
if (messageId) {
|
||||
sendToOrigin(
|
||||
{
|
||||
type: 'methodResponse',
|
||||
messageId,
|
||||
response,
|
||||
},
|
||||
arrayBuffer ? [arrayBuffer] : undefined,
|
||||
);
|
||||
}
|
||||
} catch (error: any) {
|
||||
if (DEBUG) {
|
||||
// eslint-disable-next-line no-console
|
||||
console.error(error);
|
||||
}
|
||||
|
||||
if (messageId) {
|
||||
sendToOrigin({
|
||||
type: 'methodResponse',
|
||||
messageId,
|
||||
response,
|
||||
},
|
||||
arrayBuffer ? [arrayBuffer] : undefined,
|
||||
);
|
||||
}
|
||||
} catch (error: any) {
|
||||
if (DEBUG) {
|
||||
// eslint-disable-next-line no-console
|
||||
console.error(error);
|
||||
error: { message: error.message },
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if (messageId) {
|
||||
sendToOrigin({
|
||||
type: 'methodResponse',
|
||||
messageId,
|
||||
error: { message: error.message },
|
||||
});
|
||||
callbackState.delete(messageId);
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
if (messageId) {
|
||||
callbackState.delete(messageId);
|
||||
}
|
||||
case 'cancelProgress': {
|
||||
const callback = callbackState.get(payload.messageId);
|
||||
if (callback) {
|
||||
callback.isCanceled = true;
|
||||
}
|
||||
|
||||
break;
|
||||
break;
|
||||
}
|
||||
}
|
||||
case 'cancelProgress': {
|
||||
const callback = callbackState.get(data.messageId);
|
||||
if (callback) {
|
||||
callback.isCanceled = true;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
function isTransferable(obj: any) {
|
||||
|
||||
@ -18,7 +18,7 @@ export default function launchMediaWorkers() {
|
||||
instances = new Array(MAX_WORKERS).fill(undefined).map(
|
||||
() => {
|
||||
const worker = new Worker(new URL('../lib/mediaWorker/index.worker.ts', import.meta.url));
|
||||
const connector = createConnector<MediaWorkerApi>(worker);
|
||||
const connector = createConnector<MediaWorkerApi>(worker, undefined, 'media');
|
||||
return { worker, connector };
|
||||
},
|
||||
);
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user