// WebSocket signaling. Two kinds of WS clients:
//   agent  -> authenticates with machine enroll_token, waits for session requests
//   viewer -> authenticated technician, requests a session to a machine
// The server brokers consent and relays SDP/ICE. Media never traverses the server.
const R = require('./repos');
const A = require('./auth');
const { currentUser, audit } = require('./session');
const { onlineAgents, liveSessions, pendingShares, meetingRooms, roomToDmCall, roomHost, transcriptBuffers, transcriptSubs } = require('./presence');
const W = require('./webhooks');
const CHAT = require('./chat');

// WebSocket readyState value for an open connection.
const WS_OPEN = 1;
// Interval between keep-alive pings sent to each client.
const HEARTBEAT_MS = 25000;
// Maximum number of transcript segments buffered per meeting room.
const MAX_TRANSCRIPT_SEGMENTS = 8000;

/** Serialize `payload` as JSON and send it on `ws` unconditionally. */
function send(ws, payload) {
  return ws.send(JSON.stringify(payload));
}

/** Send `payload` on `ws` only when the socket exists and is open. */
function sendIfOpen(ws, payload) {
  if (ws && ws.readyState === WS_OPEN) ws.send(JSON.stringify(payload));
}

/**
 * Send `payload` to every open socket in a meeting room.
 * @param {Map<string, {ws: object, name: string}>} peers - room membership (peerId -> record)
 * @param {object} payload - message to serialize
 * @param {string} [exceptPeerId] - peer to skip (normally the sender)
 */
function broadcast(peers, payload, exceptPeerId) {
  const frame = JSON.stringify(payload);
  for (const [id, p] of peers) {
    if (id !== exceptPeerId && p.ws.readyState === WS_OPEN) p.ws.send(frame);
  }
}

/** Peers map of the meeting room this socket has joined, or a falsy value if none. */
function roomPeers(ws) {
  return ws._meetingRoom && meetingRooms.get(ws._meetingRoom);
}

/**
 * Write an audit row for a live session. The detail is the ticket when one is
 * attached, 'Direct session' for code-based sessions (machine.id is null), else null.
 */
function auditSession(sess, action) {
  audit({
    team_id: sess.machine.team_id,
    user_id: sess.user.id,
    user_email: sess.user.email,
    machine_id: sess.machine.id,
    machine_name: sess.machine.name,
    action,
    detail: sess.ticket ? `Ticket ${sess.ticket}` : (sess.machine.id ? null : 'Direct session'),
  });
}

/**
 * Forward message `m` verbatim to the other side of a live session.
 * Only the session's own two sockets may relay; a socket that merely knows the
 * session id is ignored so it cannot inject signaling into someone else's session.
 */
function relayToSessionPeer(ws, m) {
  const sess = liveSessions.get(m.sessionId || ws.sessionId);
  if (!sess) return;
  let peer;
  if (ws === sess.agentWs) peer = sess.viewerWs;
  else if (ws === sess.viewerWs) peer = sess.agentWs;
  else return; // not a participant of this session
  if (peer && peer.readyState === WS_OPEN) peer.send(JSON.stringify(m));
}

/**
 * Wire up a freshly accepted WebSocket: keep-alive pings, JSON message dispatch,
 * and cleanup on close.
 * @param {object} ws - the accepted socket
 * @param {object} req - the HTTP upgrade request (passed to currentUser for identity)
 */
function onConnection(ws, req) {
  const hb = setInterval(() => {
    if (ws.readyState === WS_OPEN) {
      try { ws.ping(); } catch {}
    } else {
      clearInterval(hb);
    }
  }, HEARTBEAT_MS);

  ws.on('message', (raw) => {
    let m;
    try { m = JSON.parse(raw); } catch { return; }
    // JSON.parse accepts `null` and primitives; `null.type` would throw, so drop them here.
    if (m === null || typeof m !== 'object') return;
    try {
      handle(ws, m, req);
    } catch (err) {
      // One bad message must not propagate out of the listener as an uncaught exception.
      console.error('ws message handler failed:', err);
    }
  });

  ws.on('close', () => {
    clearInterval(hb);
    cleanup(ws);
  });
}

/**
 * Dispatch one parsed client message by its `type`. Unknown types are ignored.
 * @param {object} ws - the sending socket (per-connection state is stored on it)
 * @param {object} m - the parsed message
 * @param {object} req - the HTTP upgrade request of this socket
 */
function handle(ws, m, req) {
  switch (m.type) {
    // --- Logged-in user registers this socket for live chat delivery ---
    case 'chat-hello': {
      const u = currentUser(req); // identity from the cookie/Bearer on the WS upgrade
      if (!u) return send(ws, { type: 'error', message: 'unauthorized' });
      ws._chatUserId = u.id;
      ws._chatTeamId = u.team_id;
      CHAT.register(u.id, ws);
      send(ws, { type: 'chat-ready' });
      break;
    }

    // Recipient's client acknowledges a DM was delivered -> mark it + tell the sender.
    case 'chat-delivered': {
      if (!ws._chatUserId || !m.id) break;
      const msg = R.messages.byId(m.id);
      if (!msg || msg.conversation_id || msg.team_id !== ws._chatTeamId) break; // DMs only
      if (msg.recipient_id !== ws._chatUserId) break; // only the recipient can ack
      if (!msg.delivered_at) {
        R.messages.markDelivered(m.id);
        try { CHAT.pushToUser(msg.sender_id, { type: 'chat-delivered', id: m.id }); } catch (_) {}
      }
      break;
    }

    // --- Meetings (mesh): create a room, join by code, relay SDP/ICE peer-to-peer ---
    case 'meeting-create': {
      let code;
      do { code = A.numericCode(6); } while (meetingRooms.has(code));
      meetingRooms.set(code, new Map());
      const cu = currentUser(req);
      if (cu) roomHost.set(code, cu.id); // ad-hoc meeting: creator = host
      send(ws, { type: 'meeting-created', room: code });
      break;
    }

    case 'meeting-join': {
      // NOTE(review): a socket that joins again without leaving keeps its old peer entry
      // in the previous room — confirm clients always send 'meeting-leave' first.
      const room = String(m.room || '').trim();
      let peers = meetingRooms.get(room);
      // A scheduled meeting's room is created lazily on first join (its code lives in the DB).
      if (!peers) {
        const sched = R.scheduledMeetings.byCode(room);
        if (sched && !sched.ended_at) {
          peers = new Map();
          meetingRooms.set(room, peers);
        }
      }
      if (!peers) return send(ws, { type: 'error', message: 'Meeting not found' });

      const peerId = A.token(6);
      const name = String(m.name || 'Guest').slice(0, 60);
      ws.kind = 'meeting';
      ws._meetingRoom = room;
      ws._peerId = peerId;
      ws._peerName = name;

      // Host = the meeting's creator. roomHost is set on call/meeting creation;
      // scheduled meetings fall back to created_by.
      let hostUserId = roomHost.get(room);
      if (hostUserId === undefined) {
        try {
          const s = R.scheduledMeetings.byCode(room);
          if (s) {
            hostUserId = s.created_by;
            roomHost.set(room, hostUserId);
          }
        } catch (_) {}
      }
      const ju = currentUser(req);
      ws._meetingUserId = ju ? ju.id : null; // for per-user transcript ownership
      const isHost = !!(ju && hostUserId && ju.id === hostUserId);

      // Tell the newcomer who's already here (they initiate offers to existing peers)...
      send(ws, {
        type: 'meeting-joined',
        room,
        peerId,
        isHost,
        peers: [...peers.entries()].map(([id, p]) => ({ peerId: id, name: p.name })),
      });
      // ...and tell existing peers a newcomer arrived (the newcomer is not in the map yet).
      broadcast(peers, { type: 'meeting-peer-joined', peerId, name });
      peers.set(peerId, { ws, name });

      const tsubs = transcriptSubs.get(room);
      if (tsubs && tsubs.size > 0) send(ws, { type: 'meeting-transcribe-state', active: true }); // catch up: already transcribing
      break;
    }

    // Relay SDP/ICE to one specific peer in the sender's room.
    case 'meeting-signal': {
      const peers = roomPeers(ws);
      if (!peers) return;
      const target = peers.get(m.to);
      if (target) sendIfOpen(target.ws, { type: 'meeting-signal', from: ws._peerId, data: m.data });
      break;
    }

    // Relay a peer's mic/cam state to everyone else in the room (for the tile mute icon).
    case 'meeting-state': {
      const peers = roomPeers(ws);
      if (!peers) return;
      broadcast(peers, { type: 'meeting-peer-state', peerId: ws._peerId, muted: !!m.muted, camOff: !!m.camOff }, ws._peerId);
      break;
    }

    // Relay a peer's screen-share on/off to everyone else (for the tile badge + single-share rule).
    case 'meeting-screen': {
      const peers = roomPeers(ws);
      if (!peers) return;
      broadcast(peers, { type: 'meeting-peer-screen', from: ws._peerId, on: !!m.on }, ws._peerId);
      break;
    }

    // Host: set whether multiple people may share their screen at once.
    case 'meeting-sharemode': {
      const peers = roomPeers(ws);
      if (!peers) return;
      broadcast(peers, { type: 'meeting-sharemode', multi: !!m.multi }, ws._peerId);
      break;
    }

    // Host starts/stops recording -> tell everyone so they see (and hear) the "being recorded" notice.
    case 'meeting-recording': {
      const peers = roomPeers(ws);
      if (!peers) return;
      broadcast(peers, { type: 'meeting-recording', on: !!m.on, by: ws._peerName || 'The host' }, ws._peerId);
      break;
    }

    // A participant subscribes/unsubscribes to a transcript copy. While >=1 subscriber, EVERY client
    // transcribes its own mic (full conversation); each subscriber gets their own private copy.
    // Unsubscribing only drops YOUR copy — it never stops anyone else's.
    case 'meeting-transcribe': {
      const room = ws._meetingRoom;
      const peers = room && meetingRooms.get(room);
      if (!peers) return;
      const uid = ws._meetingUserId;
      if (!uid) return; // only signed-in participants can own a transcript copy
      let subs = transcriptSubs.get(room);
      if (!subs) {
        subs = new Set();
        transcriptSubs.set(room, subs);
      }
      if (m.on) {
        subs.add(uid);
      } else {
        // finalize writes + removes the sub
        try { require('./calls').finalizeTranscript(room, uid); } catch (_) {}
      }
      const active = subs.size > 0;
      broadcast(peers, { type: 'meeting-transcribe-state', active });
      break;
    }

    // A participant's recognized speech segment -> appended to the room's shared transcript buffer.
    case 'meeting-transcript': {
      const room = ws._meetingRoom;
      if (!room || !meetingRooms.get(room)) return;
      const text = String(m.text || '').slice(0, 1000).trim();
      if (!text) return;
      let buf = transcriptBuffers.get(room);
      if (!buf) {
        buf = [];
        transcriptBuffers.set(room, buf);
      }
      buf.push({ t: Date.now(), speaker: ws._peerName || 'Guest', text });
      if (buf.length > MAX_TRANSCRIPT_SEGMENTS) buf.shift(); // drop the oldest segment
      break;
    }

    // Host: mute everyone else in the room.
    case 'meeting-muteall': {
      const peers = roomPeers(ws);
      if (!peers) return;
      broadcast(peers, { type: 'meeting-muteall', by: ws._peerId }, ws._peerId);
      break;
    }

    // Host: transfer host to another peer (broadcast the new host to the whole room).
    case 'meeting-host': {
      const peers = roomPeers(ws);
      if (!peers || !m.to) return;
      broadcast(peers, { type: 'meeting-host', hostPeerId: m.to });
      break;
    }

    case 'meeting-leave': {
      leaveMeeting(ws);
      break;
    }

    // --- Agent comes online ---
    case 'agent-hello': {
      const machine = R.machines.byEnrollToken(m.enrollToken);
      if (!machine) return send(ws, { type: 'error', message: 'invalid enroll token' });
      ws.kind = 'agent';
      ws.machineId = machine.id;
      onlineAgents.set(machine.id, { ws, machine });
      R.machines.touch(machine.id);
      send(ws, { type: 'agent-registered', machineId: machine.id, name: machine.name });
      break;
    }

    // --- Technician requests control of a machine ---
    case 'viewer-connect': {
      const u = currentUser(req); // cookie sent on WS upgrade
      if (!u) return send(ws, { type: 'error', message: 'unauthorized' });
      const agent = onlineAgents.get(m.machineId);
      const machine = R.machines.inTenant(m.machineId, u.team_id);
      if (!machine) return send(ws, { type: 'error', message: 'no such machine' });
      if (!agent) return send(ws, { type: 'error', message: 'machine offline' });
      // View-only users are still allowed to watch; control is gated agent-side.
      const sessionId = A.token(8);
      ws.kind = 'viewer';
      ws.sessionId = sessionId;
      liveSessions.set(sessionId, { agentWs: agent.ws, viewerWs: ws, machine, user: u });
      audit({
        team_id: u.team_id,
        user_id: u.id,
        user_email: u.email,
        machine_id: machine.id,
        machine_name: machine.name,
        action: 'session_requested',
      });
      // Ask the agent for consent (or auto-grant if unattended policy is on)
      agent.ws.sessionId = sessionId;
      send(agent.ws, {
        type: 'session-request',
        sessionId,
        technician: u.email,
        unattended: !!machine.unattended,
      });
      send(ws, { type: 'session-pending', sessionId, machineName: machine.name });
      break;
    }

    // --- Agent grants/denies consent ---
    case 'consent': {
      const sess = liveSessions.get(m.sessionId);
      if (!sess) return;
      // Only the controlled side may answer. The viewer learns the sessionId from
      // 'session-pending'/'code-pending' and must not be able to approve itself.
      if (ws !== sess.agentWs) return;
      if (m.granted) {
        auditSession(sess, 'consent_granted');
        try {
          R.sessionsLog.create({
            id: m.sessionId,
            tenantId: sess.machine.team_id,
            agentEmail: sess.user.email,
            agentName: sess.agentName || sess.user.email,
            ticket: sess.ticket || null,
          });
        } catch (e) { /* duplicate consent */ }
        try {
          W.emit('session.started', sess.machine.team_id, {
            sessionId: m.sessionId,
            agent_email: sess.user.email,
            agent_name: sess.agentName || sess.user.email,
            ticket: sess.ticket || null,
            started_at: Date.now(),
          });
        } catch (_) {}
        send(sess.viewerWs, { type: 'session-ready', sessionId: m.sessionId });
        send(sess.agentWs, { type: 'start-stream', sessionId: m.sessionId });
      } else {
        auditSession(sess, 'consent_denied');
        send(sess.viewerWs, { type: 'session-denied', sessionId: m.sessionId });
        liveSessions.delete(m.sessionId);
      }
      break;
    }

    // --- No-install: end user opens /share, gets a one-time code ---
    case 'share-create': {
      let code;
      do { code = A.numericCode(6); } while (pendingShares.has(code));
      const sessionId = A.token(8);
      ws.kind = 'sharer';
      ws.shareCode = code;
      ws.sessionId = sessionId;
      pendingShares.set(code, { sharerWs: ws, sessionId });
      send(ws, { type: 'share-code', code });
      break;
    }

    // --- Logged-in agent enters the code (+ ticket) to connect ---
    case 'code-connect': {
      const agent = currentUser(req); // identity from the agent's authenticated session
      if (!agent) return send(ws, { type: 'error', message: 'Please sign in as an agent first' });
      const ticket = String(m.ticket || '').trim() || null; // optional: direct sessions have no ticket
      const pend = pendingShares.get(String(m.code || '').trim());
      if (!pend || pend.sharerWs.readyState !== WS_OPEN) {
        return send(ws, { type: 'error', message: 'Invalid or expired code' });
      }
      pendingShares.delete(pend.sharerWs.shareCode); // the code is single-use
      const sessionId = pend.sessionId;
      ws.kind = 'viewer';
      ws.sessionId = sessionId;
      const agentName = agent.name || agent.email;
      // Code sessions have no enrolled machine: id stays null, which marks them as "direct".
      const machine = { id: null, name: 'Support session ' + pend.sharerWs.shareCode, team_id: agent.team_id };
      const user = { id: agent.id, email: agent.email, team_id: agent.team_id };
      liveSessions.set(sessionId, { agentWs: pend.sharerWs, viewerWs: ws, machine, user, ticket, agentName });
      pend.sharerWs.sessionId = sessionId;
      audit({
        team_id: agent.team_id,
        user_id: agent.id,
        user_email: agent.email,
        machine_name: machine.name,
        action: 'code_session_requested',
        detail: (ticket ? 'Ticket ' + ticket : 'Direct session (no ticket)') + ' · agent ' + agentName,
      });
      send(pend.sharerWs, { type: 'share-request', sessionId, technician: agentName, ticket });
      send(ws, { type: 'code-pending', sessionId });
      break;
    }

    // --- Relay WebRTC signaling / transcript / recording notices between the two peers ---
    case 'offer':
    case 'answer':
    case 'ice-candidate':
    case 'transcript':
    case 'recording': {
      relayToSessionPeer(ws, m);
      break;
    }

    case 'end-session': {
      endSession(ws.sessionId, m.reason || null);
      break;
    }
  }
}

/**
 * End a live remote session: close its log row, emit the 'session.ended' webhook,
 * audit it, notify both sockets that are still open, and forget the session.
 * Does nothing when the session id is unknown.
 * @param {string} sessionId
 * @param {?string} [reason] - forwarded to both peers in 'session-ended'
 */
function endSession(sessionId, reason) {
  const sess = liveSessions.get(sessionId);
  if (!sess) return;
  // Log and webhook are best-effort: a failure there must not block the teardown below.
  try { R.sessionsLog.end(sessionId); } catch (e) {}
  try {
    const row = R.sessionsLog.byId(sessionId);
    if (row) {
      W.emit('session.ended', sess.machine.team_id, {
        sessionId: row.id,
        agent_email: row.agent_email,
        agent_name: row.agent_name,
        ticket: row.ticket,
        started_at: row.started_at,
        ended_at: row.ended_at,
        duration_ms: row.ended_at ? row.ended_at - row.started_at : null,
      });
    }
  } catch (e) {}
  auditSession(sess, 'session_ended');
  for (const peer of [sess.agentWs, sess.viewerWs]) {
    sendIfOpen(peer, { type: 'session-ended', sessionId, reason: reason || null });
  }
  liveSessions.delete(sessionId);
}

/**
 * Remove a socket from its meeting room. In a 1:1 DM call the room is ended for
 * everyone; otherwise the remaining peers are told and the room is torn down once empty.
 * Does nothing when the socket is not in a room.
 * @param {object} ws
 */
function leaveMeeting(ws) {
  const room = ws._meetingRoom;
  if (!room) return;
  const peers = meetingRooms.get(room);
  ws._meetingRoom = null;
  const pid = ws._peerId;
  if (!peers) return;
  try { require('./calls').finalizeTranscript(room, ws._meetingUserId); } catch (_) {} // save THIS user's transcript
  peers.delete(pid);

  // 1:1 call: when either party leaves, end it for everyone (a DM call has no "remaining" call).
  if (roomToDmCall.has(room)) {
    for (const [, p] of peers) {
      if (p.ws.readyState === WS_OPEN) {
        try { p.ws.send(JSON.stringify({ type: 'meeting-ended' })); } catch (_) {}
      }
      // Clear the room on the SOCKET (the peer record itself carries no room state),
      // so the remaining party no longer references the deleted room.
      p.ws._meetingRoom = null;
    }
    meetingRooms.delete(room);
    try { require('./calls').finalizeTranscript(room); } catch (_) {} // any remaining buffers (safety)
    roomHost.delete(room);
    try { require('./calls').endCallByRoom(room); } catch (_) {}
    return;
  }

  broadcast(peers, { type: 'meeting-peer-left', peerId: pid });
  if (peers.size === 0) {
    meetingRooms.delete(room);
    try { require('./calls').finalizeTranscript(room); } catch (_) {} // before endCallByRoom clears the maps
    roomHost.delete(room);
    try { require('./calls').endCallByRoom(room); } catch (_) {}
  }
}

/**
 * Release everything a closed socket was registered for: chat delivery, meeting
 * membership, agent presence, a pending share code, and any live sessions it is part of.
 * @param {object} ws
 */
function cleanup(ws) {
  CHAT.unregister(ws);
  leaveMeeting(ws);
  if (ws.kind === 'agent' && ws.machineId) onlineAgents.delete(ws.machineId);
  if (ws.kind === 'sharer' && ws.shareCode) pendingShares.delete(ws.shareCode);
  if (ws.sessionId) {
    // An agent socket can be part of several sessions, so scan them all rather than
    // relying on ws.sessionId (which only holds the most recent one).
    for (const [sid, sess] of liveSessions) {
      if (sess.agentWs === ws || sess.viewerWs === ws) endSession(sid);
    }
  }
}

module.exports = { onConnection };