"use strict"; var _a; Object.defineProperty(exports, "__esModule", { value: true }); exports.SessionAwareAdapter = exports.Adapter = void 0; const events_1 = require("events"); const yeast_1 = require("./contrib/yeast"); const WebSocket = require("ws"); const canPreComputeFrame = typeof ((_a = WebSocket === null || WebSocket === void 0 ? void 0 : WebSocket.Sender) === null || _a === void 0 ? void 0 : _a.frame) === "function"; class Adapter extends events_1.EventEmitter { /** * In-memory adapter constructor. * * @param {Namespace} nsp */ constructor(nsp) { super(); this.nsp = nsp; this.rooms = new Map(); this.sids = new Map(); this.encoder = nsp.server.encoder; } /** * To be overridden */ init() { } /** * To be overridden */ close() { } /** * Returns the number of Socket.IO servers in the cluster * * @public */ serverCount() { return Promise.resolve(1); } /** * Adds a socket to a list of room. * * @param {SocketId} id the socket id * @param {Set} rooms a set of rooms * @public */ addAll(id, rooms) { if (!this.sids.has(id)) { this.sids.set(id, new Set()); } for (const room of rooms) { this.sids.get(id).add(room); if (!this.rooms.has(room)) { this.rooms.set(room, new Set()); this.emit("create-room", room); } if (!this.rooms.get(room).has(id)) { this.rooms.get(room).add(id); this.emit("join-room", room, id); } } } /** * Removes a socket from a room. * * @param {SocketId} id the socket id * @param {Room} room the room name */ del(id, room) { if (this.sids.has(id)) { this.sids.get(id).delete(room); } this._del(room, id); } _del(room, id) { const _room = this.rooms.get(room); if (_room != null) { const deleted = _room.delete(id); if (deleted) { this.emit("leave-room", room, id); } if (_room.size === 0 && this.rooms.delete(room)) { this.emit("delete-room", room); } } } /** * Removes a socket from all rooms it's joined. * * @param {SocketId} id the socket id */ delAll(id) { if (!this.sids.has(id)) { return; } for (const room of this.sids.get(id)) { this._del(room, id); } this.sids.delete(id); } /** * Broadcasts a packet. * * Options: * - `flags` {Object} flags for this packet * - `except` {Array} sids that should be excluded * - `rooms` {Array} list of rooms to broadcast to * * @param {Object} packet the packet object * @param {Object} opts the options * @public */ broadcast(packet, opts) { const flags = opts.flags || {}; const packetOpts = { preEncoded: true, volatile: flags.volatile, compress: flags.compress, }; packet.nsp = this.nsp.name; const encodedPackets = this._encode(packet, packetOpts); this.apply(opts, (socket) => { if (typeof socket.notifyOutgoingListeners === "function") { socket.notifyOutgoingListeners(packet); } socket.client.writeToEngine(encodedPackets, packetOpts); }); } /** * Broadcasts a packet and expects multiple acknowledgements. * * Options: * - `flags` {Object} flags for this packet * - `except` {Array} sids that should be excluded * - `rooms` {Array} list of rooms to broadcast to * * @param {Object} packet the packet object * @param {Object} opts the options * @param clientCountCallback - the number of clients that received the packet * @param ack - the callback that will be called for each client response * * @public */ broadcastWithAck(packet, opts, clientCountCallback, ack) { const flags = opts.flags || {}; const packetOpts = { preEncoded: true, volatile: flags.volatile, compress: flags.compress, }; packet.nsp = this.nsp.name; // we can use the same id for each packet, since the _ids counter is common (no duplicate) packet.id = this.nsp._ids++; const encodedPackets = this._encode(packet, packetOpts); let clientCount = 0; this.apply(opts, (socket) => { // track the total number of acknowledgements that are expected clientCount++; // call the ack callback for each client response socket.acks.set(packet.id, ack); if (typeof socket.notifyOutgoingListeners === "function") { socket.notifyOutgoingListeners(packet); } socket.client.writeToEngine(encodedPackets, packetOpts); }); clientCountCallback(clientCount); } _encode(packet, packetOpts) { const encodedPackets = this.encoder.encode(packet); if (canPreComputeFrame && encodedPackets.length === 1 && typeof encodedPackets[0] === "string") { // "4" being the "message" packet type in the Engine.IO protocol const data = Buffer.from("4" + encodedPackets[0]); // see https://github.com/websockets/ws/issues/617#issuecomment-283002469 packetOpts.wsPreEncodedFrame = WebSocket.Sender.frame(data, { readOnly: false, mask: false, rsv1: false, opcode: 1, fin: true, }); } return encodedPackets; } /** * Gets a list of sockets by sid. * * @param {Set} rooms the explicit set of rooms to check. */ sockets(rooms) { const sids = new Set(); this.apply({ rooms }, (socket) => { sids.add(socket.id); }); return Promise.resolve(sids); } /** * Gets the list of rooms a given socket has joined. * * @param {SocketId} id the socket id */ socketRooms(id) { return this.sids.get(id); } /** * Returns the matching socket instances * * @param opts - the filters to apply */ fetchSockets(opts) { const sockets = []; this.apply(opts, (socket) => { sockets.push(socket); }); return Promise.resolve(sockets); } /** * Makes the matching socket instances join the specified rooms * * @param opts - the filters to apply * @param rooms - the rooms to join */ addSockets(opts, rooms) { this.apply(opts, (socket) => { socket.join(rooms); }); } /** * Makes the matching socket instances leave the specified rooms * * @param opts - the filters to apply * @param rooms - the rooms to leave */ delSockets(opts, rooms) { this.apply(opts, (socket) => { rooms.forEach((room) => socket.leave(room)); }); } /** * Makes the matching socket instances disconnect * * @param opts - the filters to apply * @param close - whether to close the underlying connection */ disconnectSockets(opts, close) { this.apply(opts, (socket) => { socket.disconnect(close); }); } apply(opts, callback) { const rooms = opts.rooms; const except = this.computeExceptSids(opts.except); if (rooms.size) { const ids = new Set(); for (const room of rooms) { if (!this.rooms.has(room)) continue; for (const id of this.rooms.get(room)) { if (ids.has(id) || except.has(id)) continue; const socket = this.nsp.sockets.get(id); if (socket) { callback(socket); ids.add(id); } } } } else { for (const [id] of this.sids) { if (except.has(id)) continue; const socket = this.nsp.sockets.get(id); if (socket) callback(socket); } } } computeExceptSids(exceptRooms) { const exceptSids = new Set(); if (exceptRooms && exceptRooms.size > 0) { for (const room of exceptRooms) { if (this.rooms.has(room)) { this.rooms.get(room).forEach((sid) => exceptSids.add(sid)); } } } return exceptSids; } /** * Send a packet to the other Socket.IO servers in the cluster * @param packet - an array of arguments, which may include an acknowledgement callback at the end */ serverSideEmit(packet) { console.warn("this adapter does not support the serverSideEmit() functionality"); } /** * Save the client session in order to restore it upon reconnection. */ persistSession(session) { } /** * Restore the session and find the packets that were missed by the client. * @param pid * @param offset */ restoreSession(pid, offset) { return null; } } exports.Adapter = Adapter; class SessionAwareAdapter extends Adapter { constructor(nsp) { super(nsp); this.nsp = nsp; this.sessions = new Map(); this.packets = []; this.maxDisconnectionDuration = nsp.server.opts.connectionStateRecovery.maxDisconnectionDuration; const timer = setInterval(() => { const threshold = Date.now() - this.maxDisconnectionDuration; this.sessions.forEach((session, sessionId) => { const hasExpired = session.disconnectedAt < threshold; if (hasExpired) { this.sessions.delete(sessionId); } }); for (let i = this.packets.length - 1; i >= 0; i--) { const hasExpired = this.packets[i].emittedAt < threshold; if (hasExpired) { this.packets.splice(0, i + 1); break; } } }, 60 * 1000); // prevents the timer from keeping the process alive timer.unref(); } persistSession(session) { session.disconnectedAt = Date.now(); this.sessions.set(session.pid, session); } restoreSession(pid, offset) { const session = this.sessions.get(pid); if (!session) { // the session may have expired return null; } const hasExpired = session.disconnectedAt + this.maxDisconnectionDuration < Date.now(); if (hasExpired) { // the session has expired this.sessions.delete(pid); return null; } const index = this.packets.findIndex((packet) => packet.id === offset); if (index === -1) { // the offset may be too old return null; } const missedPackets = []; for (let i = index + 1; i < this.packets.length; i++) { const packet = this.packets[i]; if (shouldIncludePacket(session.rooms, packet.opts)) { missedPackets.push(packet.data); } } return Promise.resolve(Object.assign(Object.assign({}, session), { missedPackets })); } broadcast(packet, opts) { var _a; const isEventPacket = packet.type === 2; // packets with acknowledgement are not stored because the acknowledgement function cannot be serialized and // restored on another server upon reconnection const withoutAcknowledgement = packet.id === undefined; const notVolatile = ((_a = opts.flags) === null || _a === void 0 ? void 0 : _a.volatile) === undefined; if (isEventPacket && withoutAcknowledgement && notVolatile) { const id = (0, yeast_1.yeast)(); // the offset is stored at the end of the data array, so the client knows the ID of the last packet it has // processed (and the format is backward-compatible) packet.data.push(id); this.packets.push({ id, opts, data: packet.data, emittedAt: Date.now(), }); } super.broadcast(packet, opts); } } exports.SessionAwareAdapter = SessionAwareAdapter; function shouldIncludePacket(sessionRooms, opts) { const included = opts.rooms.size === 0 || sessionRooms.some((room) => opts.rooms.has(room)); const notExcluded = sessionRooms.every((room) => !opts.except.has(room)); return included && notExcluded; }