Supervisor Stream Service
src/main/socket/supervisor-stream.service.ts
Збирає зміни стану з усіх ladyRunner-ів і шле тімліду (через workspace на голдені) одним батчем. Активний тільки коли supervisorConnected = true — інакше всі notify*-методи no-op (тобто за відсутності спостерігача ні алокацій, ні таймерів).
Активація / деактивація
| Подія | Дія |
|---|---|
SUPERVISOR_CONNECTED від StackSocket | setSupervisorConnected(true) + sendInitialSnapshot(...) |
SUPERVISOR_DISCONNECTED | setSupervisorConnected(false) + onSupervisorDisconnected() (скидає buffer і favoritesSnapshot) |
disconnect / connect_error / forceDisconnect сокета | setSupervisorConnected(false) — спостереження автоматично завершується |
Initial snapshot
sendInitialSnapshot({ ladies, favorites, tasks, streams }) шле один payload з усіма трьома CRUD-секціями в updated: [...] + поточний streams-snapshot. Bypass-ить batcher (immediate send) і зберігає favoritesSnapshot — Map favoriteId → JSON.stringify(fav) — щоб подальші notifyFavoritesAll могли діффити проти цієї бази.
Batcher (debounce 100мс)
Перший notify*-виклик заводить setTimeout(100мс). Усі наступні notify* у межах вікна — просто доповнюють accumulator, без нового таймера. На flush — payload шле одним повідомленням через stackSocket.sendMessage(UPDATE_APP_STATE).
accumulator — інстанс AppStatePatch що накопичується. Після flush — обнуляється.
Notify-методи
| Метод | Семантика | Куди в payload |
|---|---|---|
notifyLadyUpdated(lady) | ТЮ зʼявилась або змінилась (upsert by id) | ladies.updated[] |
notifyLadyRemoved(ladyId_api) | ТЮ знята з runner-а | ladies.removed[] |
notifyFavoriteUpdated(fav) | Гранулярна зміна одного favorite-а | favorites.updated[] |
notifyFavoritesAll(favorites[]) | Bulk-вхід — diff vs favoritesSnapshot (див. нижче) | favorites.{updated,removed} |
notifyTasks(tasks[]) | Заміна повного списку tasks | tasks.updated |
notifyChatDialog({ladyId, manId}) | Ping: у цьому діалозі є чат-подія | dialogs.chat[] |
notifyMailDialog({...}) | Ping: вихідний лист | dialogs.mail[] |
notifyEmailDialog({...}) | Ping: вихідний email | dialogs.email[] |
notifyStreams(snapshot) | Знімок поточного стану камер (snapshot-replace) | streams |
notifySleepMode(value) | Поточний Sleep Mode оператора (snapshot-replace) | sleepMode |
Усі notify* що пушать у масив — роблять inline-дедуплікацію за id: якщо в акумуляторі вже є запис із тим самим id, він заміняється новим значенням, а не дублюється. Те ж для протилежної гілки (наприклад notifyLadyRemoved прибере id з updated[] якщо він там був, і навпаки). Дублів у wire-payload-і не буває.
Diff для favorites
notifyFavoritesAll(list) отримує повний список favorite-ів від interval-сервісу. Замість того щоб лити цілий список щоразу:
- Стрінгіфайнить кожен fav через
JSON.stringify - Порівнює з
favoritesSnapshot.get(favoriteId) - id немає у snapshot або serialized різниться →
updated(upsert) - id є + serialized збігається → пропуск
- id є у snapshot але немає в новому списку →
removed - Заміняє snapshot на новий список
- Результат мерджимо в накопичену секцію (а не заміняємо) — щоб другий виклик у тому ж 100мс-вікні не затирав перший
- Якщо delta пуста — payload не шлеться взагалі
Самозаживлюється: будь-яка зміна favorite-у (whitelist, blockedByTU, language, додавання/видалення ТЮ, online-стан чоловіка) проходить через operatorRunner.favoriteService.sendToFrontend() → notifyFavoritesAll() → diff → шле тільки реально змінені.
Snapshot для streams
notifyStreams(snapshot) отримує повний глобальний стан від operatorRunner.buildStreamsSnapshot():
{
manStream: number[], // manId_api що зараз з власним стрімом
manWatching: Array<{ ladyId_api: number; manId_api: number }> // хто дивиться нашу ТЮ
}Замінює повністю поле streams у накопиченому payload-і. Snapshot-семантика → frontend заміняє свій стан тим що прийшло.
Self-healing: якщо socket-подія closeStream чи receiveVideoMode загубилась — наступний polling ManOnlineService (15 сек) перезатягне правду.
Тригерять:
ManOnlineService.start()кожні 15с після polling (hasStreamоновлюється з API)message-parser.closeStream— чоловік закрив свій стрімmessage-parser.receiveVideoMode— чоловік почав/перестав дивитись нашу ТЮ
favoritesHaveContacts флаг
Кожен payload іде через stackSocket.sendMessage з опціональним favoritesHaveContacts: boolean у IClientCommand. Виставляється true якщо в favorites.updated[] є хоч один fav з contactDetails?.status === 'Yes'.
Голден на стороні proxyDataToSupervisor дивиться на флаг:
false→ проксі-ить compressed payload без розпаковки на тімліда (fast path)true→ розпаковує, прокачує favorites через email-enrichment (контактні деталі + clientEmail), пакує назад
Мета — мінімізувати CPU на голдені. Більшість event-ів (lady-online toggle, chat-ping, stream-update, sleep-mode toggle) не мають favorite-ів з контактами → passthrough.
Зв’язки
- Активується від: StackSocket (SUPERVISOR_CONNECTED / SUPERVISOR_DISCONNECTED)
- Шле: AppStatePatch через
stackSocket.sendMessage(UPDATE_APP_STATE)
Для фронта тімліда (тимчасово, поки нема golden-доки)
Цей розділ — короткий брифінг для фронту що споживає workspace-стрім. Постійне місце — у документації голдена (
workspace.service.tsіproxyDataToSupervisor), сюди винесено тимчасово.
Що приходить на сокет
Один event — updateAppState. Payload — стиснутий рядок (gzip + base64), розпаковується назад у IAppStatePatch.
// envelope що йде через socket
interface IClientCommand {
messageId: string;
event: 'updateAppState';
data: string; // gzip+base64 of IAppStatePatch
favoritesHaveContacts?: boolean; // голден дивиться, фронту не релевантний
}Контракт payload-у
interface IAppStatePatch {
ladies?: SectionDelta<ILadyReact, number>; // id = ladyId_api
favorites?: SectionDelta<IFavorite, string>; // id = favoriteId
tasks?: SectionDelta<IRunnerTask, string>; // id = task id
dialogs?: IDialogsPatch;
streams?: IStreamsPatch;
sleepMode?: boolean;
}
interface SectionDelta<T, IdT> {
updated?: T[];
removed?: IdT[];
}
interface IDialogsPatch {
chat?: Array<{ ladyId_api: number; manId_api: number }>;
mail?: Array<{ ladyId_api: number; manId_api: number }>;
email?: Array<{ ladyId_api: number; manId_api: number }>;
}
interface IStreamsPatch {
manStream: number[]; // manId_api що стрімлять
manWatching: Array<{ ladyId_api: number; manId_api: number }>; // хто дивиться нашу ТЮ
}Типи ILadyReact, IFavorite, IRunnerTask — ті ж самі що оператор бачить у себе на фронті (можна взяти зі shared/interfaces).
Як обробляти на фронті
- Розпакувати
data: gzip-decompress →JSON.parse→IAppStatePatch. - Перебрати секції (всі опційні). На кожну:
| Секція | Дія |
|---|---|
ladies / favorites / tasks | updated — upsert by id (додати або замінити). removed — видалити з store по id. |
dialogs.chat / mail / email | Сигнал «у діалозі є зміна». Якщо діалог зараз відкритий — рефрешити дані через бек; інакше можна підсвітити в списку. Без власних даних. |
streams | Snapshot-replace. state.streams = patch.streams — повна заміна. |
sleepMode | Snapshot-replace. state.sleepMode = patch.sleepMode — повна заміна. Поле опційне; якщо нема — стан не міняти. |
Особливості
- Initial snapshot = один UPDATE_APP_STATE з усіма секціями в
updated: [...все]+ поточніstreamsіsleepMode. Окремого «full state» event-у нема. На початку спостереження фронт отримує перший такий payload — це база. - Дублі в
updated[]неможливі — backend робить inline dedup за id. Один id → один запис уupdated[]за batch-вікно (останнє значення виграє). streamsприходить повним, не дельтою — фронт завжди заміняє свій стан тим що в payload.sleepMode === false— теж валідне значення, не плутати з відсутністю поля. Перевіряй через'sleepMode' in patchабоpatch.sleepMode !== undefined.- Якщо тімлід відключився і знову підключився — фронт отримає новий initial snapshot. Старий стан можна скидати при дисконекті.
Список тригерів (що чекати від оператора)
- Додав ТЮ / змінився стан ТЮ (online, socket-ready) →
ladies.updated - Зняв ТЮ →
ladies.removed - Зміни favorite (whitelist, RU blocked, мова, новий favorite, видалений) →
favorites.updated/removed(через diff vs попередній snapshot) - Бекенд оновив tasks →
tasks.updated - Прийшов/відправлений чат →
dialogs.chat - Відправлений лист →
dialogs.mail - Відправлений email →
dialogs.email - Чоловік увімкнув/зупинив свій стрім →
streams - Чоловік почав/перестав дивитись нашу ТЮ →
streams - Оператор увімкнув/вимкнув Sleep Mode (вручну або по таймауту неактивності) →
sleepMode
TODO для майбутньої golden-доки
- Опис
proxyDataToSupervisor(fast-path passthrough vs enrichment-path) - Опис email-enrichment kostyl-а (
favoritesHaveContacts→contactDetails+clientEmailдогрібання з Mongo/HTTP) - Connect/disconnect протокол (
connectToOperator/disconnectFromOperator/SUPERVISOR_CONNECTEDACK)