SupervisorStreamService

Supervisor Stream Service

src/main/socket/supervisor-stream.service.ts

Збирає зміни стану з усіх ladyRunner-ів і шле тімліду (через workspace на голдені) одним батчем. Активний тільки коли supervisorConnected = true — інакше всі notify*-методи no-op (тобто за відсутності спостерігача ні алокацій, ні таймерів).


Активація / деактивація

ПодіяДія
SUPERVISOR_CONNECTED від StackSocketsetSupervisorConnected(true) + sendInitialSnapshot(...)
SUPERVISOR_DISCONNECTEDsetSupervisorConnected(false) + onSupervisorDisconnected() (скидає buffer і favoritesSnapshot)
disconnect / connect_error / forceDisconnect сокетаsetSupervisorConnected(false) — спостереження автоматично завершується

Initial snapshot

sendInitialSnapshot({ ladies, favorites, tasks, streams }) шле один payload з усіма трьома CRUD-секціями в updated: [...] + поточний streams-snapshot. Bypass-ить batcher (immediate send) і зберігає favoritesSnapshot — Map favoriteId → JSON.stringify(fav) — щоб подальші notifyFavoritesAll могли діффити проти цієї бази.


Batcher (debounce 100мс)

Перший notify*-виклик заводить setTimeout(100мс). Усі наступні notify* у межах вікна — просто доповнюють accumulator, без нового таймера. На flush — payload шле одним повідомленням через stackSocket.sendMessage(UPDATE_APP_STATE).

accumulator — інстанс AppStatePatch що накопичується. Після flush — обнуляється.


Notify-методи

МетодСемантикаКуди в payload
notifyLadyUpdated(lady)ТЮ зʼявилась або змінилась (upsert by id)ladies.updated[]
notifyLadyRemoved(ladyId_api)ТЮ знята з runner-аladies.removed[]
notifyFavoriteUpdated(fav)Гранулярна зміна одного favorite-аfavorites.updated[]
notifyFavoritesAll(favorites[])Bulk-вхід — diff vs favoritesSnapshot (див. нижче)favorites.{updated,removed}
notifyTasks(tasks[])Заміна повного списку taskstasks.updated
notifyChatDialog({ladyId, manId})Ping: у цьому діалозі є чат-подіяdialogs.chat[]
notifyMailDialog({...})Ping: вихідний листdialogs.mail[]
notifyEmailDialog({...})Ping: вихідний emaildialogs.email[]
notifyStreams(snapshot)Знімок поточного стану камер (snapshot-replace)streams
notifySleepMode(value)Поточний Sleep Mode оператора (snapshot-replace)sleepMode

Усі notify* що пушать у масив — роблять inline-дедуплікацію за id: якщо в акумуляторі вже є запис із тим самим id, він заміняється новим значенням, а не дублюється. Те ж для протилежної гілки (наприклад notifyLadyRemoved прибере id з updated[] якщо він там був, і навпаки). Дублів у wire-payload-і не буває.


Diff для favorites

notifyFavoritesAll(list) отримує повний список favorite-ів від interval-сервісу. Замість того щоб лити цілий список щоразу:

  1. Стрінгіфайнить кожен fav через JSON.stringify
  2. Порівнює з favoritesSnapshot.get(favoriteId)
  3. id немає у snapshot або serialized різниться → updated (upsert)
  4. id є + serialized збігається → пропуск
  5. id є у snapshot але немає в новому списку → removed
  6. Заміняє snapshot на новий список
  7. Результат мерджимо в накопичену секцію (а не заміняємо) — щоб другий виклик у тому ж 100мс-вікні не затирав перший
  8. Якщо delta пуста — payload не шлеться взагалі

Самозаживлюється: будь-яка зміна favorite-у (whitelist, blockedByTU, language, додавання/видалення ТЮ, online-стан чоловіка) проходить через operatorRunner.favoriteService.sendToFrontend()notifyFavoritesAll() → diff → шле тільки реально змінені.


Snapshot для streams

notifyStreams(snapshot) отримує повний глобальний стан від operatorRunner.buildStreamsSnapshot():

{
  manStream: number[],                                            // manId_api що зараз з власним стрімом
  manWatching: Array<{ ladyId_api: number; manId_api: number }>   // хто дивиться нашу ТЮ
}

Замінює повністю поле streams у накопиченому payload-і. Snapshot-семантика → frontend заміняє свій стан тим що прийшло.

Self-healing: якщо socket-подія closeStream чи receiveVideoMode загубилась — наступний polling ManOnlineService (15 сек) перезатягне правду.

Тригерять:

  • ManOnlineService.start() кожні 15с після polling (hasStream оновлюється з API)
  • message-parser.closeStream — чоловік закрив свій стрім
  • message-parser.receiveVideoMode — чоловік почав/перестав дивитись нашу ТЮ

favoritesHaveContacts флаг

Кожен payload іде через stackSocket.sendMessage з опціональним favoritesHaveContacts: boolean у IClientCommand. Виставляється true якщо в favorites.updated[] є хоч один fav з contactDetails?.status === 'Yes'.

Голден на стороні proxyDataToSupervisor дивиться на флаг:

  • false → проксі-ить compressed payload без розпаковки на тімліда (fast path)
  • true → розпаковує, прокачує favorites через email-enrichment (контактні деталі + clientEmail), пакує назад

Мета — мінімізувати CPU на голдені. Більшість event-ів (lady-online toggle, chat-ping, stream-update, sleep-mode toggle) не мають favorite-ів з контактами → passthrough.


Зв’язки

  • Активується від: StackSocket (SUPERVISOR_CONNECTED / SUPERVISOR_DISCONNECTED)
  • Шле: AppStatePatch через stackSocket.sendMessage(UPDATE_APP_STATE)

Для фронта тімліда (тимчасово, поки нема golden-доки)

Цей розділ — короткий брифінг для фронту що споживає workspace-стрім. Постійне місце — у документації голдена (workspace.service.ts і proxyDataToSupervisor), сюди винесено тимчасово.

Що приходить на сокет

Один event — updateAppState. Payload — стиснутий рядок (gzip + base64), розпаковується назад у IAppStatePatch.

// envelope що йде через socket
interface IClientCommand {
  messageId: string;
  event: 'updateAppState';
  data: string;                      // gzip+base64 of IAppStatePatch
  favoritesHaveContacts?: boolean;   // голден дивиться, фронту не релевантний
}

Контракт payload-у

interface IAppStatePatch {
  ladies?:    SectionDelta<ILadyReact, number>;        // id = ladyId_api
  favorites?: SectionDelta<IFavorite, string>;          // id = favoriteId
  tasks?:     SectionDelta<IRunnerTask, string>;        // id = task id
  dialogs?:   IDialogsPatch;
  streams?:   IStreamsPatch;
  sleepMode?: boolean;
}
 
interface SectionDelta<T, IdT> {
  updated?: T[];
  removed?: IdT[];
}
 
interface IDialogsPatch {
  chat?:  Array<{ ladyId_api: number; manId_api: number }>;
  mail?:  Array<{ ladyId_api: number; manId_api: number }>;
  email?: Array<{ ladyId_api: number; manId_api: number }>;
}
 
interface IStreamsPatch {
  manStream: number[];                                            // manId_api що стрімлять
  manWatching: Array<{ ladyId_api: number; manId_api: number }>;  // хто дивиться нашу ТЮ
}

Типи ILadyReact, IFavorite, IRunnerTask — ті ж самі що оператор бачить у себе на фронті (можна взяти зі shared/interfaces).

Як обробляти на фронті

  1. Розпакувати data: gzip-decompress → JSON.parseIAppStatePatch.
  2. Перебрати секції (всі опційні). На кожну:
СекціяДія
ladies / favorites / tasksupdated — upsert by id (додати або замінити). removed — видалити з store по id.
dialogs.chat / mail / emailСигнал «у діалозі є зміна». Якщо діалог зараз відкритий — рефрешити дані через бек; інакше можна підсвітити в списку. Без власних даних.
streamsSnapshot-replace. state.streams = patch.streams — повна заміна.
sleepModeSnapshot-replace. state.sleepMode = patch.sleepMode — повна заміна. Поле опційне; якщо нема — стан не міняти.

Особливості

  • Initial snapshot = один UPDATE_APP_STATE з усіма секціями в updated: [...все] + поточні streams і sleepMode. Окремого «full state» event-у нема. На початку спостереження фронт отримує перший такий payload — це база.
  • Дублі в updated[] неможливі — backend робить inline dedup за id. Один id → один запис у updated[] за batch-вікно (останнє значення виграє).
  • streams приходить повним, не дельтою — фронт завжди заміняє свій стан тим що в payload.
  • sleepMode === false — теж валідне значення, не плутати з відсутністю поля. Перевіряй через 'sleepMode' in patch або patch.sleepMode !== undefined.
  • Якщо тімлід відключився і знову підключився — фронт отримає новий initial snapshot. Старий стан можна скидати при дисконекті.

Список тригерів (що чекати від оператора)

  • Додав ТЮ / змінився стан ТЮ (online, socket-ready) → ladies.updated
  • Зняв ТЮ → ladies.removed
  • Зміни favorite (whitelist, RU blocked, мова, новий favorite, видалений) → favorites.updated/removed (через diff vs попередній snapshot)
  • Бекенд оновив tasks → tasks.updated
  • Прийшов/відправлений чат → dialogs.chat
  • Відправлений лист → dialogs.mail
  • Відправлений email → dialogs.email
  • Чоловік увімкнув/зупинив свій стрім → streams
  • Чоловік почав/перестав дивитись нашу ТЮ → streams
  • Оператор увімкнув/вимкнув Sleep Mode (вручну або по таймауту неактивності) → sleepMode

TODO для майбутньої golden-доки

  • Опис proxyDataToSupervisor (fast-path passthrough vs enrichment-path)
  • Опис email-enrichment kostyl-а (favoritesHaveContactscontactDetails + clientEmail догрібання з Mongo/HTTP)
  • Connect/disconnect протокол (connectToOperator / disconnectFromOperator / SUPERVISOR_CONNECTED ACK)