diff --git a/packages/socket.io-adapter/lib/in-memory-adapter.ts b/packages/socket.io-adapter/lib/in-memory-adapter.ts index cf178170e0..ee2a4d1a3f 100644 --- a/packages/socket.io-adapter/lib/in-memory-adapter.ts +++ b/packages/socket.io-adapter/lib/in-memory-adapter.ts @@ -300,7 +300,10 @@ export class Adapter extends EventEmitter { * @param opts - the filters to apply * @param rooms - the rooms to join */ - public addSockets(opts: BroadcastOptions, rooms: Room[]): void { + public addSockets( + opts: BroadcastOptions, + rooms: Room[], + ): void | Promise { this.apply(opts, (socket) => { socket.join(rooms); }); @@ -312,7 +315,10 @@ export class Adapter extends EventEmitter { * @param opts - the filters to apply * @param rooms - the rooms to leave */ - public delSockets(opts: BroadcastOptions, rooms: Room[]): void { + public delSockets( + opts: BroadcastOptions, + rooms: Room[], + ): void | Promise { this.apply(opts, (socket) => { rooms.forEach((room) => socket.leave(room)); }); @@ -324,7 +330,10 @@ export class Adapter extends EventEmitter { * @param opts - the filters to apply * @param close - whether to close the underlying connection */ - public disconnectSockets(opts: BroadcastOptions, close: boolean): void { + public disconnectSockets( + opts: BroadcastOptions, + close: boolean, + ): void | Promise { this.apply(opts, (socket) => { socket.disconnect(close); }); diff --git a/packages/socket.io-cluster-adapter/test/index.ts b/packages/socket.io-cluster-adapter/test/index.ts index 205bde0228..611a165fa6 100644 --- a/packages/socket.io-cluster-adapter/test/index.ts +++ b/packages/socket.io-cluster-adapter/test/index.ts @@ -249,6 +249,21 @@ describe("@socket.io/cluster-adapter", () => { expect((await getRooms(workers[1])).has("room2")).to.be(false); expect((await getRooms(workers[2])).has("room2")).to.be(true); }); + + it("avoids race condition when followed by emit (with await)", (done) => { + const partialDone = times(3, done); + + clientSockets.forEach((clientSocket) => { + clientSocket.on("test-event", (payload) => { + expect(payload).to.eql("test-payload"); + partialDone(); + }); + }); + + // This test verifies that awaiting socketsJoin ensures all sockets + // receive the subsequent broadcast + workers[0].send("test socketsJoin race condition with await"); + }); }); describe("socketsLeave", () => { diff --git a/packages/socket.io-cluster-adapter/test/worker.js b/packages/socket.io-cluster-adapter/test/worker.js index ac9c447a19..321b4d7cca 100644 --- a/packages/socket.io-cluster-adapter/test/worker.js +++ b/packages/socket.io-cluster-adapter/test/worker.js @@ -106,6 +106,14 @@ process.on("message", async (msg) => { io.in("room1").socketsJoin("room2"); break; + case "test socketsJoin race condition with await": + // This demonstrates the fix: await ensures the join completes before emit + (async () => { + await io.socketsJoin("test-room"); + io.to("test-room").emit("test-event", "test-payload"); + })(); + break; + case "makes all socket instances leave the specified room": io.socketsLeave("room1"); break; diff --git a/packages/socket.io/lib/broadcast-operator.ts b/packages/socket.io/lib/broadcast-operator.ts index 6a229daef6..ded6d111ea 100644 --- a/packages/socket.io/lib/broadcast-operator.ts +++ b/packages/socket.io/lib/broadcast-operator.ts @@ -390,10 +390,18 @@ export class BroadcastOperator * // make all socket instances in the "room1" room join the "room2" and "room3" rooms * io.in("room1").socketsJoin(["room2", "room3"]); * + * // this also works with a single socket ID + * io.in(theSocketId).socketsJoin("room1"); + * + * // wait for all servers to process the join (when using a cluster adapter like the cluster adapter) + * await io.in(theSocketId).socketsJoin("room1"); + * io.to("room1").emit("my-event", "payload"); + * * @param room - a room, or an array of rooms + * @return {void|Promise} - void when using the default in-memory adapter, or a Promise when using a cluster adapter */ - public socketsJoin(room: Room | Room[]): void { - this.adapter.addSockets( + public socketsJoin(room: Room | Room[]): void | Promise { + return this.adapter.addSockets( { rooms: this.rooms, except: this.exceptRooms, @@ -416,9 +424,10 @@ export class BroadcastOperator * io.in("room1").socketsLeave(["room2", "room3"]); * * @param room - a room, or an array of rooms + * @return {void|Promise} - void when using the default in-memory adapter, or a Promise when using a cluster adapter */ - public socketsLeave(room: Room | Room[]): void { - this.adapter.delSockets( + public socketsLeave(room: Room | Room[]): void | Promise { + return this.adapter.delSockets( { rooms: this.rooms, except: this.exceptRooms, @@ -441,9 +450,10 @@ export class BroadcastOperator * io.in("room1").disconnectSockets(true); * * @param close - whether to close the underlying connection + * @return {void|Promise} - void when using the default in-memory adapter, or a Promise when using a cluster adapter */ - public disconnectSockets(close: boolean = false): void { - this.adapter.disconnectSockets( + public disconnectSockets(close: boolean = false): void | Promise { + return this.adapter.disconnectSockets( { rooms: this.rooms, except: this.exceptRooms, @@ -532,8 +542,9 @@ export class RemoteSocket * Joins a room. * * @param {String|Array} room - room or array of rooms + * @return {void|Promise} - void when using the default in-memory adapter, or a Promise when using a cluster adapter */ - public join(room: Room | Room[]): void { + public join(room: Room | Room[]): void | Promise { return this.operator.socketsJoin(room); } @@ -541,8 +552,9 @@ export class RemoteSocket * Leaves a room. * * @param {String} room + * @return {void|Promise} - void when using the default in-memory adapter, or a Promise when using a cluster adapter */ - public leave(room: Room): void { + public leave(room: Room): void | Promise { return this.operator.socketsLeave(room); } @@ -550,10 +562,13 @@ export class RemoteSocket * Disconnects this client. * * @param {Boolean} close - if `true`, closes the underlying connection - * @return {Socket} self + * @return {Socket|Promise} self, or a Promise when using a cluster adapter */ - public disconnect(close = false): this { - this.operator.disconnectSockets(close); + public disconnect(close = false): this | Promise { + const result = this.operator.disconnectSockets(close); + if (result instanceof Promise) { + return result.then(() => this); + } return this; } }