Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -419,8 +419,7 @@ describe('PlatformServicesClient', () => {
);
await delay(50);

expect(remoteHandler).toHaveBeenCalledOnce();
expect(remoteHandler).toHaveBeenCalledWith(
expect(remoteHandler).toHaveBeenCalledExactlyOnceWith(
'peer-123',
'test-message',
);
Expand Down Expand Up @@ -471,8 +470,7 @@ describe('PlatformServicesClient', () => {
);
await delay(50);

expect(giveUpHandler).toHaveBeenCalledOnce();
expect(giveUpHandler).toHaveBeenCalledWith('peer-456');
expect(giveUpHandler).toHaveBeenCalledExactlyOnceWith('peer-456');

const successResponse = outputs.find(
(message) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,7 @@ describe('PlatformServicesServer', () => {
await stream.receiveInput(makeMessageEvent('m0', { method: 'foo' }));
await delay(10);

expect(errorSpy).toHaveBeenCalledOnce();
expect(errorSpy).toHaveBeenCalledWith(
expect(errorSpy).toHaveBeenCalledExactlyOnceWith(
'Error handling "foo" request:',
rpcErrors.methodNotFound(),
);
Expand Down Expand Up @@ -275,8 +274,9 @@ describe('PlatformServicesServer', () => {
await delay(10);

expect(workers).toHaveLength(1);
expect(workers[0]?.launch).toHaveBeenCalledOnce();
expect(workers[0]?.launch).toHaveBeenCalledWith(makeVatConfig());
expect(workers[0]?.launch).toHaveBeenCalledExactlyOnceWith(
makeVatConfig(),
);
});

it('logs error if a vat with the same id already exists', async () => {
Expand All @@ -285,8 +285,7 @@ describe('PlatformServicesServer', () => {
await stream.receiveInput(makeLaunchMessageEvent('m1', 'v0'));
await delay(10);

expect(errorSpy).toHaveBeenCalledOnce();
expect(errorSpy).toHaveBeenCalledWith(
expect(errorSpy).toHaveBeenCalledExactlyOnceWith(
'Error handling "launch" request:',
new VatAlreadyExistsError('v0'),
);
Expand All @@ -306,17 +305,15 @@ describe('PlatformServicesServer', () => {
await delay(10);

expect(workers).toHaveLength(1);
expect(workers[0]?.terminate).toHaveBeenCalledOnce();
expect(workers[0]?.terminate).toHaveBeenCalledWith();
expect(workers[0]?.terminate).toHaveBeenCalledExactlyOnceWith();
});

it('logs error if a vat with the specified id does not exist', async () => {
const errorSpy = vi.spyOn(logger, 'error');
await stream.receiveInput(makeTerminateMessageEvent('m0', 'v0'));
await delay(10);

expect(errorSpy).toHaveBeenCalledOnce();
expect(errorSpy).toHaveBeenCalledWith(
expect(errorSpy).toHaveBeenCalledExactlyOnceWith(
'Error handling "terminate" request:',
new VatNotFoundError('v0'),
);
Expand All @@ -336,8 +333,7 @@ describe('PlatformServicesServer', () => {
await stream.receiveInput(makeTerminateMessageEvent('m1', vatId));
await delay(10);

expect(errorSpy).toHaveBeenCalledOnce();
expect(errorSpy).toHaveBeenCalledWith(
expect(errorSpy).toHaveBeenCalledExactlyOnceWith(
'Error handling "terminate" request:',
vatNotFoundError,
);
Expand Down Expand Up @@ -380,8 +376,7 @@ describe('PlatformServicesServer', () => {
await stream.receiveInput(makeTerminateAllMessageEvent('m1'));
await delay(10);

expect(errorSpy).toHaveBeenCalledOnce();
expect(errorSpy).toHaveBeenCalledWith(
expect(errorSpy).toHaveBeenCalledExactlyOnceWith(
'Error handling "terminateAll" request:',
vatNotFoundError,
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ describe('internal-connections', () => {

it('should handle new internal process connections', async () => {
receiveInternalConnections({
handleInternalMessage: mockHandleMessage,
handler: mockHandleMessage,
logger,
});

Expand All @@ -208,7 +208,7 @@ describe('internal-connections', () => {

it('should handle valid message', async () => {
receiveInternalConnections({
handleInternalMessage: mockHandleMessage,
handler: mockHandleMessage,
logger,
});

Expand Down Expand Up @@ -257,7 +257,7 @@ describe('internal-connections', () => {

it('should handle JSON-RPC notifications', async () => {
receiveInternalConnections({
handleInternalMessage: mockHandleMessage,
handler: mockHandleMessage,
logger,
});

Expand Down Expand Up @@ -300,7 +300,7 @@ describe('internal-connections', () => {

it('should handle multiple simultaneous connections', async () => {
receiveInternalConnections({
handleInternalMessage: mockHandleMessage,
handler: mockHandleMessage,
logger,
});

Expand Down Expand Up @@ -335,7 +335,7 @@ describe('internal-connections', () => {

it('should forget ids of closed channels', async () => {
receiveInternalConnections({
handleInternalMessage: mockHandleMessage,
handler: mockHandleMessage,
logger,
});
const controlChannel = MockBroadcastChannel.channels.get(
Expand Down Expand Up @@ -376,7 +376,7 @@ describe('internal-connections', () => {

it('should reject duplicate connections', () => {
receiveInternalConnections({
handleInternalMessage: mockHandleMessage,
handler: mockHandleMessage,
logger,
});
const controlChannel = MockBroadcastChannel.channels.get(
Expand All @@ -401,7 +401,7 @@ describe('internal-connections', () => {

it('should reject invalid control commands', () => {
receiveInternalConnections({
handleInternalMessage: mockHandleMessage,
handler: mockHandleMessage,
logger,
});
const controlChannel = MockBroadcastChannel.channels.get(
Expand All @@ -425,7 +425,7 @@ describe('internal-connections', () => {

it('should handle comms channel message errors', async () => {
receiveInternalConnections({
handleInternalMessage: mockHandleMessage,
handler: mockHandleMessage,
logger,
});

Expand Down Expand Up @@ -455,5 +455,140 @@ describe('internal-connections', () => {
expect.any(Error),
);
});

it('should handle messages with handlerPromise after resolution', async () => {
const handlerPromise = Promise.resolve(mockHandleMessage);

receiveInternalConnections({
handlerPromise,
logger,
});

const controlChannel = MockBroadcastChannel.channels.get(
COMMS_CONTROL_CHANNEL_NAME,
);
controlChannel?.onmessage?.(
new MessageEvent('message', {
data: {
method: 'init',
params: { channelName: 'internal-process-channel' },
},
}),
);

await delay();
const commsStream = streamInstances[0]!;
expect(commsStream).toBeDefined();
const commsStreamWriteSpy = vi.spyOn(commsStream, 'write');

const commsChannel = MockBroadcastChannel.channels.get(
'internal-process-channel',
)!;

// Send first message
commsChannel.onmessage?.(
new MessageEvent('message', {
data: {
method: 'getStatus',
params: null,
id: 1,
},
}),
);
await delay();

expect(mockHandleMessage).toHaveBeenCalledWith({
method: 'getStatus',
params: null,
id: 1,
});
expect(commsStreamWriteSpy).toHaveBeenCalledWith({
jsonrpc: '2.0',
id: 1,
result: { vats: [], clusterConfig: makeClusterConfig() },
});

// Send second message to verify caching (handler should be used directly)
commsChannel.onmessage?.(
new MessageEvent('message', {
data: {
method: 'getStatus',
params: null,
id: 2,
},
}),
);
await delay();

expect(mockHandleMessage).toHaveBeenCalledTimes(2);
expect(commsStreamWriteSpy).toHaveBeenCalledTimes(2);
});

it('should queue messages until handlerPromise resolves', async () => {
let resolveHandler: (handler: typeof mockHandleMessage) => void;
const handlerPromise = new Promise<typeof mockHandleMessage>(
(resolve) => {
resolveHandler = resolve;
},
);

receiveInternalConnections({
handlerPromise,
logger,
});

const controlChannel = MockBroadcastChannel.channels.get(
COMMS_CONTROL_CHANNEL_NAME,
);
controlChannel?.onmessage?.(
new MessageEvent('message', {
data: {
method: 'init',
params: { channelName: 'internal-process-channel' },
},
}),
);

await delay();
const commsStream = streamInstances[0]!;
expect(commsStream).toBeDefined();
const commsStreamWriteSpy = vi.spyOn(commsStream, 'write');

const commsChannel = MockBroadcastChannel.channels.get(
'internal-process-channel',
)!;

// Send message before handler is ready
commsChannel.onmessage?.(
new MessageEvent('message', {
data: {
method: 'getStatus',
params: null,
id: 1,
},
}),
);

// Handler should not be called yet
await delay();
expect(mockHandleMessage).not.toHaveBeenCalled();
expect(commsStreamWriteSpy).not.toHaveBeenCalled();

// Now resolve the handler
resolveHandler!(mockHandleMessage);
await delay();

// Now the message should be handled
expect(mockHandleMessage).toHaveBeenCalledWith({
method: 'getStatus',
params: null,
id: 1,
});
expect(commsStreamWriteSpy).toHaveBeenCalledWith({
jsonrpc: '2.0',
id: 1,
result: { vats: [], clusterConfig: makeClusterConfig() },
});
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -102,26 +102,51 @@ const connectToInternalProcess = async (
return stream;
};

type ReceiveConnectionsOptions = Omit<Options, 'label'> & {
handleInternalMessage: HandleInternalMessage;
};
type ReceiveConnectionsOptions = Omit<Options, 'label'> &
(
| {
handler: HandleInternalMessage;
handlerPromise?: never;
}
| {
handler?: never;
handlerPromise: Promise<HandleInternalMessage>;
}
);

/**
* Listens for connections between the kernel and an internal process, e.g. a UI instance.
* Should be called exactly once in the kernel, during initialization, before any internal
* processes have attempted to connect.
*
* @param options - The options for the connection.
* @param options.handleInternalMessage - The function to handle the internal message.
* @param options.handler - The function to handle internal messages. Mutually exclusive
* with `handlerPromise`.
* @param options.handlerPromise - A promise that resolves to the handler function.
* Mutually exclusive with `handler`.
* @param options.logger - The logger instance.
* @param options.controlChannelName - The name of the control channel. Must match
* the name used by {@link connectToKernel} on the other end.
*/
export const receiveInternalConnections = ({
handleInternalMessage,
handler: directHandler,
handlerPromise,
logger,
controlChannelName = COMMS_CONTROL_CHANNEL_NAME,
}: ReceiveConnectionsOptions): void => {
let handler: HandleInternalMessage | null = null;
let handlerReady: Promise<HandleInternalMessage>;

if (handlerPromise === undefined) {
handler = directHandler;
handlerReady = Promise.resolve(directHandler);
} else {
handlerReady = handlerPromise.then((resolvedHandler) => {
handler = resolvedHandler;
return resolvedHandler;
});
}

const seenChannels = new Set<string>();
new BroadcastChannel(controlChannelName).onmessage = (event) => {
if (!isCommsControlMessage(event.data)) {
Expand All @@ -148,7 +173,8 @@ export const receiveInternalConnections = ({
`Received message from internal process "${channelName}": ${JSON.stringify(message)}`,
);

const reply = await handleInternalMessage(message);
const messageHandler = handler ?? (await handlerReady);
const reply = await messageHandler(message);
if (reply !== undefined) {
await kernelRpcStream.write(reply);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,18 @@ async function main(): Promise<void> {
resetStorage,
},
);
const serverP = kernelP.then((kernel) => {
return new JsonRpcServer({
const handlerP = kernelP.then((kernel) => {
const server = new JsonRpcServer({
middleware: [
makeLoggingMiddleware(logger.subLogger('kernel-command')),
makePanelMessageMiddleware(kernel, kernelDatabase),
],
});
return async (request: JsonRpcCall) => server.handle(request);
});

receiveInternalConnections({
handleInternalMessage: async (request) =>
serverP.then(async (rpcServer) => rpcServer.handle(request)),
handlerPromise: handlerP,
logger,
});

Expand Down
Loading