Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions packages/socket.io-adapter/lib/in-memory-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,10 @@ export class Adapter extends EventEmitter {
* @param opts - the filters to apply
* @param rooms - the rooms to join
*/
public addSockets(opts: BroadcastOptions, rooms: Room[]): void {
public addSockets(
opts: BroadcastOptions,
rooms: Room[],
): void | Promise<void> {
this.apply(opts, (socket) => {
socket.join(rooms);
});
Expand All @@ -312,7 +315,10 @@ export class Adapter extends EventEmitter {
* @param opts - the filters to apply
* @param rooms - the rooms to leave
*/
public delSockets(opts: BroadcastOptions, rooms: Room[]): void {
public delSockets(
opts: BroadcastOptions,
rooms: Room[],
): void | Promise<void> {
this.apply(opts, (socket) => {
rooms.forEach((room) => socket.leave(room));
});
Expand All @@ -324,7 +330,10 @@ export class Adapter extends EventEmitter {
* @param opts - the filters to apply
* @param close - whether to close the underlying connection
*/
public disconnectSockets(opts: BroadcastOptions, close: boolean): void {
public disconnectSockets(
opts: BroadcastOptions,
close: boolean,
): void | Promise<void> {
this.apply(opts, (socket) => {
socket.disconnect(close);
});
Expand Down
15 changes: 15 additions & 0 deletions packages/socket.io-cluster-adapter/test/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,21 @@ describe("@socket.io/cluster-adapter", () => {
expect((await getRooms(workers[1])).has("room2")).to.be(false);
expect((await getRooms(workers[2])).has("room2")).to.be(true);
});

it("avoids race condition when followed by emit (with await)", (done) => {
const partialDone = times(3, done);

clientSockets.forEach((clientSocket) => {
clientSocket.on("test-event", (payload) => {
expect(payload).to.eql("test-payload");
partialDone();
});
});

// This test verifies that awaiting socketsJoin ensures all sockets
// receive the subsequent broadcast
workers[0].send("test socketsJoin race condition with await");
});
});

describe("socketsLeave", () => {
Expand Down
8 changes: 8 additions & 0 deletions packages/socket.io-cluster-adapter/test/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,14 @@ process.on("message", async (msg) => {
io.in("room1").socketsJoin("room2");
break;

case "test socketsJoin race condition with await":
// This demonstrates the fix: await ensures the join completes before emit
(async () => {
await io.socketsJoin("test-room");
io.to("test-room").emit("test-event", "test-payload");
})();
break;

case "makes all socket instances leave the specified room":
io.socketsLeave("room1");
break;
Expand Down
37 changes: 26 additions & 11 deletions packages/socket.io/lib/broadcast-operator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -390,10 +390,18 @@ export class BroadcastOperator<EmitEvents extends EventsMap, SocketData>
* // make all socket instances in the "room1" room join the "room2" and "room3" rooms
* io.in("room1").socketsJoin(["room2", "room3"]);
*
* // this also works with a single socket ID
* io.in(theSocketId).socketsJoin("room1");
*
* // wait for all servers to process the join (when using a cluster adapter like the cluster adapter)
* await io.in(theSocketId).socketsJoin("room1");
* io.to("room1").emit("my-event", "payload");
*
* @param room - a room, or an array of rooms
* @return {void|Promise<void>} - void when using the default in-memory adapter, or a Promise when using a cluster adapter
*/
public socketsJoin(room: Room | Room[]): void {
this.adapter.addSockets(
public socketsJoin(room: Room | Room[]): void | Promise<void> {
return this.adapter.addSockets(
{
rooms: this.rooms,
except: this.exceptRooms,
Expand All @@ -416,9 +424,10 @@ export class BroadcastOperator<EmitEvents extends EventsMap, SocketData>
* io.in("room1").socketsLeave(["room2", "room3"]);
*
* @param room - a room, or an array of rooms
* @return {void|Promise<void>} - void when using the default in-memory adapter, or a Promise when using a cluster adapter
*/
public socketsLeave(room: Room | Room[]): void {
this.adapter.delSockets(
public socketsLeave(room: Room | Room[]): void | Promise<void> {
return this.adapter.delSockets(
{
rooms: this.rooms,
except: this.exceptRooms,
Expand All @@ -441,9 +450,10 @@ export class BroadcastOperator<EmitEvents extends EventsMap, SocketData>
* io.in("room1").disconnectSockets(true);
*
* @param close - whether to close the underlying connection
* @return {void|Promise<void>} - void when using the default in-memory adapter, or a Promise when using a cluster adapter
*/
public disconnectSockets(close: boolean = false): void {
this.adapter.disconnectSockets(
public disconnectSockets(close: boolean = false): void | Promise<void> {
return this.adapter.disconnectSockets(
{
rooms: this.rooms,
except: this.exceptRooms,
Expand Down Expand Up @@ -532,28 +542,33 @@ export class RemoteSocket<EmitEvents extends EventsMap, SocketData>
* Joins a room.
*
* @param {String|Array} room - room or array of rooms
* @return {void|Promise<void>} - void when using the default in-memory adapter, or a Promise when using a cluster adapter
*/
public join(room: Room | Room[]): void {
public join(room: Room | Room[]): void | Promise<void> {
return this.operator.socketsJoin(room);
}

/**
* Leaves a room.
*
* @param {String} room
* @return {void|Promise<void>} - void when using the default in-memory adapter, or a Promise when using a cluster adapter
*/
public leave(room: Room): void {
public leave(room: Room): void | Promise<void> {
return this.operator.socketsLeave(room);
}

/**
* Disconnects this client.
*
* @param {Boolean} close - if `true`, closes the underlying connection
* @return {Socket} self
* @return {Socket|Promise<Socket>} self, or a Promise when using a cluster adapter
*/
public disconnect(close = false): this {
this.operator.disconnectSockets(close);
public disconnect(close = false): this | Promise<this> {
const result = this.operator.disconnectSockets(close);
if (result instanceof Promise) {
return result.then(() => this);
}
return this;
}
}