From 47e8078cd5847f3cc0b4db1ea9c33407a4dfff8d Mon Sep 17 00:00:00 2001 From: Victor1890 Date: Wed, 24 Dec 2025 11:37:37 -0400 Subject: [PATCH 1/5] fix(broadcast): update methods to return Promise for async handling --- packages/socket.io/lib/broadcast-operator.ts | 45 +++++++++++++------- 1 file changed, 29 insertions(+), 16 deletions(-) diff --git a/packages/socket.io/lib/broadcast-operator.ts b/packages/socket.io/lib/broadcast-operator.ts index 6a229daef6..79cc7af7ac 100644 --- a/packages/socket.io/lib/broadcast-operator.ts +++ b/packages/socket.io/lib/broadcast-operator.ts @@ -15,8 +15,7 @@ import type { } from "./typed-events"; export class BroadcastOperator - implements TypedEventBroadcaster -{ + implements TypedEventBroadcaster { constructor( private readonly adapter: Adapter, private readonly rooms: Set = new Set(), @@ -24,7 +23,7 @@ export class BroadcastOperator private readonly flags: BroadcastFlags & { expectSingleResponse?: boolean; } = {}, - ) {} + ) { } /** * Targets a room when emitting. @@ -390,10 +389,18 @@ export class BroadcastOperator * // make all socket instances in the "room1" room join the "room2" and "room3" rooms * io.in("room1").socketsJoin(["room2", "room3"]); * + * // this also works with a single socket ID + * io.in(theSocketId).socketsJoin("room1"); + * + * // wait for all servers to process the join (when using a cluster adapter like the cluster adapter) + * await io.in(theSocketId).socketsJoin("room1"); + * io.to("room1").emit("my-event", "payload"); + * * @param room - a room, or an array of rooms + * @return {void|Promise} - void when using the default in-memory adapter, or a Promise when using a cluster adapter */ - public socketsJoin(room: Room | Room[]): void { - this.adapter.addSockets( + public socketsJoin(room: Room | Room[]): void | Promise { + return this.adapter.addSockets( { rooms: this.rooms, except: this.exceptRooms, @@ -416,9 +423,10 @@ export class BroadcastOperator * io.in("room1").socketsLeave(["room2", "room3"]); * * @param room - a room, or an array of rooms + * @return {void|Promise} - void when using the default in-memory adapter, or a Promise when using a cluster adapter */ - public socketsLeave(room: Room | Room[]): void { - this.adapter.delSockets( + public socketsLeave(room: Room | Room[]): void | Promise { + return this.adapter.delSockets( { rooms: this.rooms, except: this.exceptRooms, @@ -441,9 +449,10 @@ export class BroadcastOperator * io.in("room1").disconnectSockets(true); * * @param close - whether to close the underlying connection + * @return {void|Promise} - void when using the default in-memory adapter, or a Promise when using a cluster adapter */ - public disconnectSockets(close: boolean = false): void { - this.adapter.disconnectSockets( + public disconnectSockets(close: boolean = false): void | Promise { + return this.adapter.disconnectSockets( { rooms: this.rooms, except: this.exceptRooms, @@ -468,8 +477,7 @@ interface SocketDetails { * Expose of subset of the attributes and methods of the Socket class */ export class RemoteSocket - implements TypedEventBroadcaster -{ + implements TypedEventBroadcaster { public readonly id: SocketId; public readonly handshake: Handshake; public readonly rooms: Set; @@ -532,8 +540,9 @@ export class RemoteSocket * Joins a room. * * @param {String|Array} room - room or array of rooms + * @return {void|Promise} - void when using the default in-memory adapter, or a Promise when using a cluster adapter */ - public join(room: Room | Room[]): void { + public join(room: Room | Room[]): void | Promise { return this.operator.socketsJoin(room); } @@ -541,8 +550,9 @@ export class RemoteSocket * Leaves a room. * * @param {String} room + * @return {void|Promise} - void when using the default in-memory adapter, or a Promise when using a cluster adapter */ - public leave(room: Room): void { + public leave(room: Room): void | Promise { return this.operator.socketsLeave(room); } @@ -550,10 +560,13 @@ export class RemoteSocket * Disconnects this client. * * @param {Boolean} close - if `true`, closes the underlying connection - * @return {Socket} self + * @return {Socket|Promise} self, or a Promise when using a cluster adapter */ - public disconnect(close = false): this { - this.operator.disconnectSockets(close); + public disconnect(close = false): this | Promise { + const result = this.operator.disconnectSockets(close); + if (result instanceof Promise) { + return result.then(() => this); + } return this; } } From dd122219b4537d5490737aeab70088cb6803dd1e Mon Sep 17 00:00:00 2001 From: Victor1890 Date: Wed, 24 Dec 2025 11:37:44 -0400 Subject: [PATCH 2/5] fix(adapter): update addSockets, delSockets, and disconnectSockets methods to support Promise --- packages/socket.io-adapter/lib/in-memory-adapter.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/socket.io-adapter/lib/in-memory-adapter.ts b/packages/socket.io-adapter/lib/in-memory-adapter.ts index cf178170e0..d85dc634a6 100644 --- a/packages/socket.io-adapter/lib/in-memory-adapter.ts +++ b/packages/socket.io-adapter/lib/in-memory-adapter.ts @@ -300,7 +300,7 @@ export class Adapter extends EventEmitter { * @param opts - the filters to apply * @param rooms - the rooms to join */ - public addSockets(opts: BroadcastOptions, rooms: Room[]): void { + public addSockets(opts: BroadcastOptions, rooms: Room[]): void | Promise { this.apply(opts, (socket) => { socket.join(rooms); }); @@ -312,7 +312,7 @@ export class Adapter extends EventEmitter { * @param opts - the filters to apply * @param rooms - the rooms to leave */ - public delSockets(opts: BroadcastOptions, rooms: Room[]): void { + public delSockets(opts: BroadcastOptions, rooms: Room[]): void | Promise { this.apply(opts, (socket) => { rooms.forEach((room) => socket.leave(room)); }); @@ -324,7 +324,7 @@ export class Adapter extends EventEmitter { * @param opts - the filters to apply * @param close - whether to close the underlying connection */ - public disconnectSockets(opts: BroadcastOptions, close: boolean): void { + public disconnectSockets(opts: BroadcastOptions, close: boolean): void | Promise { this.apply(opts, (socket) => { socket.disconnect(close); }); From 4f2522cd47391b178ec9ec7a6ecf6c4599c34f62 Mon Sep 17 00:00:00 2001 From: Victor1890 Date: Wed, 24 Dec 2025 11:37:51 -0400 Subject: [PATCH 3/5] fix(tests): add test to avoid race condition in socketsJoin with await --- packages/socket.io-cluster-adapter/test/index.ts | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/packages/socket.io-cluster-adapter/test/index.ts b/packages/socket.io-cluster-adapter/test/index.ts index 205bde0228..611a165fa6 100644 --- a/packages/socket.io-cluster-adapter/test/index.ts +++ b/packages/socket.io-cluster-adapter/test/index.ts @@ -249,6 +249,21 @@ describe("@socket.io/cluster-adapter", () => { expect((await getRooms(workers[1])).has("room2")).to.be(false); expect((await getRooms(workers[2])).has("room2")).to.be(true); }); + + it("avoids race condition when followed by emit (with await)", (done) => { + const partialDone = times(3, done); + + clientSockets.forEach((clientSocket) => { + clientSocket.on("test-event", (payload) => { + expect(payload).to.eql("test-payload"); + partialDone(); + }); + }); + + // This test verifies that awaiting socketsJoin ensures all sockets + // receive the subsequent broadcast + workers[0].send("test socketsJoin race condition with await"); + }); }); describe("socketsLeave", () => { From 5f8c905ba409ef19fa2464b6b0f57ada643b6bab Mon Sep 17 00:00:00 2001 From: Victor1890 Date: Wed, 24 Dec 2025 11:37:57 -0400 Subject: [PATCH 4/5] fix(tests): add demonstration for socketsJoin race condition with await --- packages/socket.io-cluster-adapter/test/worker.js | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/packages/socket.io-cluster-adapter/test/worker.js b/packages/socket.io-cluster-adapter/test/worker.js index ac9c447a19..321b4d7cca 100644 --- a/packages/socket.io-cluster-adapter/test/worker.js +++ b/packages/socket.io-cluster-adapter/test/worker.js @@ -106,6 +106,14 @@ process.on("message", async (msg) => { io.in("room1").socketsJoin("room2"); break; + case "test socketsJoin race condition with await": + // This demonstrates the fix: await ensures the join completes before emit + (async () => { + await io.socketsJoin("test-room"); + io.to("test-room").emit("test-event", "test-payload"); + })(); + break; + case "makes all socket instances leave the specified room": io.socketsLeave("room1"); break; From d802cc6ac4aeb5c3788561ea5f6393002f2a46ed Mon Sep 17 00:00:00 2001 From: Victor1890 Date: Wed, 24 Dec 2025 14:56:00 -0400 Subject: [PATCH 5/5] fix(adapter): refactor addSockets, delSockets, and disconnectSockets methods for improved readability --- .../socket.io-adapter/lib/in-memory-adapter.ts | 15 ++++++++++++--- packages/socket.io/lib/broadcast-operator.ts | 8 +++++--- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/packages/socket.io-adapter/lib/in-memory-adapter.ts b/packages/socket.io-adapter/lib/in-memory-adapter.ts index d85dc634a6..ee2a4d1a3f 100644 --- a/packages/socket.io-adapter/lib/in-memory-adapter.ts +++ b/packages/socket.io-adapter/lib/in-memory-adapter.ts @@ -300,7 +300,10 @@ export class Adapter extends EventEmitter { * @param opts - the filters to apply * @param rooms - the rooms to join */ - public addSockets(opts: BroadcastOptions, rooms: Room[]): void | Promise { + public addSockets( + opts: BroadcastOptions, + rooms: Room[], + ): void | Promise { this.apply(opts, (socket) => { socket.join(rooms); }); @@ -312,7 +315,10 @@ export class Adapter extends EventEmitter { * @param opts - the filters to apply * @param rooms - the rooms to leave */ - public delSockets(opts: BroadcastOptions, rooms: Room[]): void | Promise { + public delSockets( + opts: BroadcastOptions, + rooms: Room[], + ): void | Promise { this.apply(opts, (socket) => { rooms.forEach((room) => socket.leave(room)); }); @@ -324,7 +330,10 @@ export class Adapter extends EventEmitter { * @param opts - the filters to apply * @param close - whether to close the underlying connection */ - public disconnectSockets(opts: BroadcastOptions, close: boolean): void | Promise { + public disconnectSockets( + opts: BroadcastOptions, + close: boolean, + ): void | Promise { this.apply(opts, (socket) => { socket.disconnect(close); }); diff --git a/packages/socket.io/lib/broadcast-operator.ts b/packages/socket.io/lib/broadcast-operator.ts index 79cc7af7ac..ded6d111ea 100644 --- a/packages/socket.io/lib/broadcast-operator.ts +++ b/packages/socket.io/lib/broadcast-operator.ts @@ -15,7 +15,8 @@ import type { } from "./typed-events"; export class BroadcastOperator - implements TypedEventBroadcaster { + implements TypedEventBroadcaster +{ constructor( private readonly adapter: Adapter, private readonly rooms: Set = new Set(), @@ -23,7 +24,7 @@ export class BroadcastOperator private readonly flags: BroadcastFlags & { expectSingleResponse?: boolean; } = {}, - ) { } + ) {} /** * Targets a room when emitting. @@ -477,7 +478,8 @@ interface SocketDetails { * Expose of subset of the attributes and methods of the Socket class */ export class RemoteSocket - implements TypedEventBroadcaster { + implements TypedEventBroadcaster +{ public readonly id: SocketId; public readonly handshake: Handshake; public readonly rooms: Set;