From 1417a5f5e573a7237c02f428cb6a8c3cf574ea81 Mon Sep 17 00:00:00 2001 From: Camillo Moschner Date: Fri, 12 Dec 2025 17:07:05 +0000 Subject: [PATCH 01/15] test `STAR.probe_liquid_volumes()` on machine --- .../backends/hamilton/STAR_backend.py | 145 ++++++++++++++++++ 1 file changed, 145 insertions(+) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index 57151d016b..dc672cfddd 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -69,6 +69,7 @@ TipRack, TipSpot, Well, + standard_volume_tip_with_filter ) from pylabrobot.resources.barcode import Barcode, Barcode1DSymbology from pylabrobot.resources.errors import ( @@ -1749,6 +1750,150 @@ async def probe_liquid_heights( return relative_to_well + async def probe_liquid_volumes( + self, + containers: List[Container], + use_channels: List[int], + resource_offsets: Optional[List[Coordinate]] = None, + lld_mode: Optional[List[int]] = None, + minimum_traverse_height_at_beginning_of_a_command: Optional[float] = None, + min_z_endpos: Optional[float] = None, + techn_replicate_num: int = 3, + return_geomean: bool = True, + traversal_height: float = 240.0, + move_to_z_safety_after: bool = True, +) -> Union[List[float], List[List[float]]]: + """ + Probe liquid volumes in one or more containers using Hamilton STAR Liquid Level Detection (LLD). + Both lld_mode=1/capacitative-, and lld_mode=2/pressure- LLD are accepted. + + This performs repeated zero-volume aspirate operations (LLD-only probe moves) for the + specified channels and converts the detected liquid heights into estimated volumes using + each container’s ``compute_volume_from_height()`` method. + """ + + # Default offsets + resource_offsets = ( + resource_offsets or + [Coordinate().zero() for _ in range(len(containers))] + ) + + # Default LLD mode == capacitive LLD + if lld_mode is None: + lld_mode = [self.LLDMode(1)] * len(containers) + + assert len(containers) == len(use_channels) == len(resource_offsets) == len(lld_mode), \ + f"{containers=}, {use_channels=}, {resource_offsets=}, and {lld_mode=} must be same length" + + # Validate individual modes + for mode in lld_mode: + assert mode in [self.LLDMode(1), self.LLDMode(2)], \ + f"LLDMode must be 1 (capacitive) or 2 (pressure-based), is {mode}" + + if minimum_traverse_height_at_beginning_of_a_command is None: + minimum_traverse_height_at_beginning_of_a_command=traversal_height + + if min_z_endpos is None: + min_z_endpos=traversal_height + + center_offsets = [Coordinate.zero()] * len(use_channels) + if len(set(containers)) == 1: + center_offsets = get_wide_single_resource_liquid_op_offsets( + resource=containers[0], num_channels=len(containers) + ) + + # Create proxy aspirate operations + ops = [] + for i, c in enumerate(containers): + ops.append( + SingleChannelAspiration( + resource=c, + offset=center_offsets[i], + tip=standard_volume_tip_with_filter(), + volume=0.0, + flow_rate=None, + liquid_height=None, + blow_out_air_volume=None, + mix=None + ) + ) + + minimum_heights_tip_can_go_to = [ + c.get_location_wrt(self.deck, "c", "c", z="cavity_bottom").z + for c in containers + ] + + replicate_summary = [] + + # TODO: merge x_chunking helper function of containers into PLR, then use it to smartly split + # containers to probe -> minimise back and forth movement in x dimension when containers are + # in different x coordinates + + for replicate_idx in range(techn_replicate_num): + + x_coords_of_ops = [ + c.get_location_wrt(self.deck, "c").x + for c in containers + ] + assert len(set(x_coords_of_ops)) == 1, ( + "probing is only allowed in the same x-coordinate for safety reasons" + f"given: {x_coords_of_ops}" + ) + + # Perform zero-volume aspirate operation for probing + await self.aspirate( + ops=ops, + use_channels=use_channels, + lld_mode=lld_mode, + minimum_traverse_height_at_beginning_of_a_command=minimum_traverse_height_at_beginning_of_a_command, + immersion_depth=[-2]*len(containers), + minimum_height=minimum_heights_tip_can_go_to, + settling_time=[0]*len(containers), + pull_out_distance_transport_air=[0]*len(containers), + transport_air_volume=[0]*len(containers), + min_z_endpos=min_z_endpos, + ) + + all_absolute_liquid_heights = await self.request_pip_height_last_lld() + + absolute_llds_filtered_to_used_channels = [ + all_absolute_liquid_heights[i] for i in use_channels + ] + + # Compute heights relative to cavity bottom + relative_to_well = [] + for abs_h, resource in zip(absolute_llds_filtered_to_used_channels, containers): + bottom_z = resource.get_location_wrt( + self.deck, "c", "c", z="cavity_bottom" + ).z + relative_to_well.append(abs_h - bottom_z) + + # Convert heights to volumes + computed_volumes = [ + resource.compute_volume_from_height(h) + for resource, h in zip(containers, relative_to_well) + ] + + replicate_summary.append(computed_volumes) + + # Move to specified traversal height to save time + zs = {ch: traversal_height for ch in use_channels} + await self.position_channels_in_z_direction(zs) + + merged_channel_results = list(zip(*replicate_summary)) + + if move_to_z_safety_after: + await self.move_all_channels_in_z_safety() + + if return_geomean: + return [ + round(sum(channel_results)/techn_replicate_num, 2) + for channel_results in merged_channel_results + ] + + return merged_channel_results + + async def aspirate( self, ops: List[SingleChannelAspiration], From 216161d4c938b23dd199863a3445163b152574c9 Mon Sep 17 00:00:00 2001 From: Camillo Moschner Date: Fri, 12 Dec 2025 17:27:02 +0000 Subject: [PATCH 02/15] `make format` --- .../backends/hamilton/STAR_backend.py | 163 +++++++++--------- 1 file changed, 77 insertions(+), 86 deletions(-) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index dc672cfddd..3be5c6ff28 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -69,7 +69,7 @@ TipRack, TipSpot, Well, - standard_volume_tip_with_filter + standard_volume_tip_with_filter, ) from pylabrobot.resources.barcode import Barcode, Barcode1DSymbology from pylabrobot.resources.errors import ( @@ -1755,72 +1755,71 @@ async def probe_liquid_volumes( containers: List[Container], use_channels: List[int], resource_offsets: Optional[List[Coordinate]] = None, - lld_mode: Optional[List[int]] = None, + lld_mode: Optional[List[LLDMode]] = None, minimum_traverse_height_at_beginning_of_a_command: Optional[float] = None, min_z_endpos: Optional[float] = None, techn_replicate_num: int = 3, return_geomean: bool = True, traversal_height: float = 240.0, move_to_z_safety_after: bool = True, -) -> Union[List[float], List[List[float]]]: + ) -> Union[List[float], List[tuple[float]]]: """ Probe liquid volumes in one or more containers using Hamilton STAR Liquid Level Detection (LLD). Both lld_mode=1/capacitative-, and lld_mode=2/pressure- LLD are accepted. This performs repeated zero-volume aspirate operations (LLD-only probe moves) for the specified channels and converts the detected liquid heights into estimated volumes using - each container’s ``compute_volume_from_height()`` method. + each container's `compute_volume_from_height()` method. """ - + # Default offsets - resource_offsets = ( - resource_offsets or - [Coordinate().zero() for _ in range(len(containers))] - ) - + resource_offsets = resource_offsets or [Coordinate().zero() for _ in range(len(containers))] + # Default LLD mode == capacitive LLD if lld_mode is None: - lld_mode = [self.LLDMode(1)] * len(containers) + lld_mode = [self.LLDMode(1)] * len(containers) - assert len(containers) == len(use_channels) == len(resource_offsets) == len(lld_mode), \ - f"{containers=}, {use_channels=}, {resource_offsets=}, and {lld_mode=} must be same length" + assert ( + len(containers) == len(use_channels) == len(resource_offsets) == len(lld_mode) + ), f"{containers=}, {use_channels=}, {resource_offsets=}, and {lld_mode=} must be same length" # Validate individual modes for mode in lld_mode: - assert mode in [self.LLDMode(1), self.LLDMode(2)], \ - f"LLDMode must be 1 (capacitive) or 2 (pressure-based), is {mode}" + assert mode in [ + self.LLDMode(1), + self.LLDMode(2), + ], f"LLDMode must be 1 (capacitive) or 2 (pressure-based), is {mode}" if minimum_traverse_height_at_beginning_of_a_command is None: - minimum_traverse_height_at_beginning_of_a_command=traversal_height + minimum_traverse_height_at_beginning_of_a_command = traversal_height if min_z_endpos is None: - min_z_endpos=traversal_height + min_z_endpos = traversal_height center_offsets = [Coordinate.zero()] * len(use_channels) - if len(set(containers)) == 1: - center_offsets = get_wide_single_resource_liquid_op_offsets( - resource=containers[0], num_channels=len(containers) - ) + if len(set(containers)) == 1: + center_offsets = get_wide_single_resource_liquid_op_offsets( + resource=containers[0], num_channels=len(containers) + ) # Create proxy aspirate operations ops = [] for i, c in enumerate(containers): - ops.append( - SingleChannelAspiration( - resource=c, - offset=center_offsets[i], - tip=standard_volume_tip_with_filter(), - volume=0.0, - flow_rate=None, - liquid_height=None, - blow_out_air_volume=None, - mix=None - ) + ops.append( + SingleChannelAspiration( + resource=c, + offset=center_offsets[i], + tip=standard_volume_tip_with_filter(), + volume=0.0, + flow_rate=None, + liquid_height=None, + blow_out_air_volume=None, + mix=None, ) + ) minimum_heights_tip_can_go_to = [ - c.get_location_wrt(self.deck, "c", "c", z="cavity_bottom").z - for c in containers + c.get_location_wrt(self.deck, "c", "c", z="cavity_bottom").z for c in containers ] replicate_summary = [] @@ -1828,72 +1827,64 @@ async def probe_liquid_volumes( # TODO: merge x_chunking helper function of containers into PLR, then use it to smartly split # containers to probe -> minimise back and forth movement in x dimension when containers are # in different x coordinates - + for replicate_idx in range(techn_replicate_num): + x_coords_of_ops = [c.get_location_wrt(self.deck, "c").x for c in containers] + assert len(set(x_coords_of_ops)) == 1, ( + "probing is only allowed in the same x-coordinate for safety reasons" + f"given: {x_coords_of_ops}" + ) - x_coords_of_ops = [ - c.get_location_wrt(self.deck, "c").x - for c in containers - ] - assert len(set(x_coords_of_ops)) == 1, ( - "probing is only allowed in the same x-coordinate for safety reasons" - f"given: {x_coords_of_ops}" - ) + # Perform zero-volume aspirate operation for probing + await self.aspirate( + ops=ops, + use_channels=use_channels, + lld_mode=lld_mode, + minimum_traverse_height_at_beginning_of_a_command=minimum_traverse_height_at_beginning_of_a_command, + immersion_depth=[-2] * len(containers), + minimum_height=minimum_heights_tip_can_go_to, + settling_time=[0] * len(containers), + pull_out_distance_transport_air=[0] * len(containers), + transport_air_volume=[0] * len(containers), + min_z_endpos=min_z_endpos, + ) - # Perform zero-volume aspirate operation for probing - await self.aspirate( - ops=ops, - use_channels=use_channels, - lld_mode=lld_mode, - minimum_traverse_height_at_beginning_of_a_command=minimum_traverse_height_at_beginning_of_a_command, - immersion_depth=[-2]*len(containers), - minimum_height=minimum_heights_tip_can_go_to, - settling_time=[0]*len(containers), - pull_out_distance_transport_air=[0]*len(containers), - transport_air_volume=[0]*len(containers), - min_z_endpos=min_z_endpos, - ) - - all_absolute_liquid_heights = await self.request_pip_height_last_lld() - - absolute_llds_filtered_to_used_channels = [ - all_absolute_liquid_heights[i] for i in use_channels - ] - - # Compute heights relative to cavity bottom - relative_to_well = [] - for abs_h, resource in zip(absolute_llds_filtered_to_used_channels, containers): - bottom_z = resource.get_location_wrt( - self.deck, "c", "c", z="cavity_bottom" - ).z - relative_to_well.append(abs_h - bottom_z) - - # Convert heights to volumes - computed_volumes = [ - resource.compute_volume_from_height(h) - for resource, h in zip(containers, relative_to_well) - ] + all_absolute_liquid_heights = await self.request_pip_height_last_lld() - replicate_summary.append(computed_volumes) + absolute_llds_filtered_to_used_channels = [ + all_absolute_liquid_heights[i] for i in use_channels + ] - # Move to specified traversal height to save time - zs = {ch: traversal_height for ch in use_channels} - await self.position_channels_in_z_direction(zs) + # Compute heights relative to cavity bottom + relative_to_well = [] + for abs_h, resource in zip(absolute_llds_filtered_to_used_channels, containers): + bottom_z = resource.get_location_wrt(self.deck, "c", "c", z="cavity_bottom").z + relative_to_well.append(abs_h - bottom_z) + + # Convert heights to volumes + computed_volumes = [ + resource.compute_volume_from_height(h) for resource, h in zip(containers, relative_to_well) + ] + + replicate_summary.append(computed_volumes) + + # Move to specified traversal height to save time + zs = {ch: traversal_height for ch in use_channels} + await self.position_channels_in_z_direction(zs) merged_channel_results = list(zip(*replicate_summary)) - + if move_to_z_safety_after: await self.move_all_channels_in_z_safety() if return_geomean: - return [ - round(sum(channel_results)/techn_replicate_num, 2) - for channel_results in merged_channel_results - ] + return [ + round(sum(channel_results) / techn_replicate_num, 2) + for channel_results in merged_channel_results + ] return merged_channel_results - async def aspirate( self, ops: List[SingleChannelAspiration], From b2091c6a6350a6dc356e1b0ec734bdc93fa642bc Mon Sep 17 00:00:00 2001 From: Camillo Moschner <122165124+BioCam@users.noreply.github.com> Date: Fri, 12 Dec 2025 17:32:18 +0000 Subject: [PATCH 03/15] Update pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index 3be5c6ff28..659e597163 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -1762,7 +1762,7 @@ async def probe_liquid_volumes( return_geomean: bool = True, traversal_height: float = 240.0, move_to_z_safety_after: bool = True, - ) -> Union[List[float], List[tuple[float]]]: + ) -> Union[List[float], List[Tuple[float, ...]]]: """ Probe liquid volumes in one or more containers using Hamilton STAR Liquid Level Detection (LLD). Both lld_mode=1/capacitative-, and lld_mode=2/pressure- LLD are accepted. From 88ecf4063e43ce5db2f4e693dfe9d6712112670a Mon Sep 17 00:00:00 2001 From: Camillo Moschner <122165124+BioCam@users.noreply.github.com> Date: Fri, 12 Dec 2025 17:33:03 +0000 Subject: [PATCH 04/15] Update pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index 659e597163..eea456bab9 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -1831,7 +1831,7 @@ async def probe_liquid_volumes( for replicate_idx in range(techn_replicate_num): x_coords_of_ops = [c.get_location_wrt(self.deck, "c").x for c in containers] assert len(set(x_coords_of_ops)) == 1, ( - "probing is only allowed in the same x-coordinate for safety reasons" + "probing is only allowed in the same x-coordinate for safety reasons." f"given: {x_coords_of_ops}" ) From a3f8312a5a578494449c7b261b7ce2280013ea98 Mon Sep 17 00:00:00 2001 From: Camillo Moschner Date: Fri, 12 Dec 2025 17:34:28 +0000 Subject: [PATCH 05/15] rename to argument `return_mean` --- pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index eea456bab9..921677b45d 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -17,6 +17,7 @@ Literal, Optional, Sequence, + Tuple, Type, TypeVar, Union, @@ -1759,7 +1760,7 @@ async def probe_liquid_volumes( minimum_traverse_height_at_beginning_of_a_command: Optional[float] = None, min_z_endpos: Optional[float] = None, techn_replicate_num: int = 3, - return_geomean: bool = True, + return_mean: bool = True, traversal_height: float = 240.0, move_to_z_safety_after: bool = True, ) -> Union[List[float], List[Tuple[float, ...]]]: @@ -1877,7 +1878,7 @@ async def probe_liquid_volumes( if move_to_z_safety_after: await self.move_all_channels_in_z_safety() - if return_geomean: + if return_mean: return [ round(sum(channel_results) / techn_replicate_num, 2) for channel_results in merged_channel_results From f17e90e7facd872c8a40aabfee823f04b5e671ac Mon Sep 17 00:00:00 2001 From: Camillo Moschner Date: Fri, 12 Dec 2025 17:38:50 +0000 Subject: [PATCH 06/15] default `min_z_endpos` to `self._channel_traversal_height` enabling time acceleration --- pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index 921677b45d..a666a14d7b 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -1761,7 +1761,7 @@ async def probe_liquid_volumes( min_z_endpos: Optional[float] = None, techn_replicate_num: int = 3, return_mean: bool = True, - traversal_height: float = 240.0, + traversal_height: Optional[float] = None, move_to_z_safety_after: bool = True, ) -> Union[List[float], List[Tuple[float, ...]]]: """ @@ -1795,7 +1795,7 @@ async def probe_liquid_volumes( minimum_traverse_height_at_beginning_of_a_command = traversal_height if min_z_endpos is None: - min_z_endpos = traversal_height + min_z_endpos = self._channel_traversal_height center_offsets = [Coordinate.zero()] * len(use_channels) if len(set(containers)) == 1: From ddca287d60d266e839ba3c1a8600db57d0564aa7 Mon Sep 17 00:00:00 2001 From: Camillo Moschner Date: Fri, 12 Dec 2025 17:40:40 +0000 Subject: [PATCH 07/15] and `traversal_height` argument --- pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index a666a14d7b..93130ab675 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -1797,6 +1797,9 @@ async def probe_liquid_volumes( if min_z_endpos is None: min_z_endpos = self._channel_traversal_height + if traversal_height is None: + traversal_height = self._channel_traversal_height + center_offsets = [Coordinate.zero()] * len(use_channels) if len(set(containers)) == 1: center_offsets = get_wide_single_resource_liquid_op_offsets( From 05e7956629d7251de4afa4b6ed0243c4065890b3 Mon Sep 17 00:00:00 2001 From: Camillo Moschner Date: Fri, 12 Dec 2025 17:44:45 +0000 Subject: [PATCH 08/15] and `minimum_traverse_height_at_beginning_of_a_command` --- pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index 93130ab675..49d123e070 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -1792,7 +1792,7 @@ async def probe_liquid_volumes( ], f"LLDMode must be 1 (capacitive) or 2 (pressure-based), is {mode}" if minimum_traverse_height_at_beginning_of_a_command is None: - minimum_traverse_height_at_beginning_of_a_command = traversal_height + minimum_traverse_height_at_beginning_of_a_command = self._channel_traversal_height if min_z_endpos is None: min_z_endpos = self._channel_traversal_height From 3ea80ce80c0a2a0d6299764eac34657cf5af218c Mon Sep 17 00:00:00 2001 From: Camillo Moschner Date: Wed, 17 Dec 2025 10:29:22 +0000 Subject: [PATCH 09/15] rebuild `probe_liquid_heights` by splitting new `probe_liquid_volumes` --- .../backends/hamilton/STAR_backend.py | 300 +++++++++++------- 1 file changed, 192 insertions(+), 108 deletions(-) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index 24251c51a4..b17be17edf 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -1670,111 +1670,70 @@ class LLDMode(enum.Enum): Z_TOUCH_OFF = 4 async def probe_liquid_heights( - self, - containers: List[Container], - use_channels: List[int], - tips: List[HamiltonTip], - resource_offsets: Optional[List[Coordinate]] = None, - move_to_z_safety_after: bool = True, - ) -> List[float]: - """Probe liquid heights for the specified channels. - - Moves the channels to the x and y positions of the containers, then probes the liquid height - using the CLLD function. - - Returns the liquid height in each well in mm with respect to the bottom of the container cavity. - Returns `None` for channels where the liquid height could not be determined. - """ - - if any(not resource.supports_compute_height_volume_functions() for resource in containers): - raise ValueError( - "automatic_surface_following can only be used with containers that support height<->volume functions." - ) - - resource_offsets = resource_offsets or [Coordinate.zero()] * len(containers) - - assert len(containers) == len(use_channels) == len(resource_offsets) == len(tips) - - await self.move_all_channels_in_z_safety() - - # Check if all channels are on the same x position, then move there - x_pos = [ - resource.get_location_wrt(self.deck, x="c", y="c", z="b").x + offset.x - for resource, offset in zip(containers, resource_offsets) - ] - if len(set(x_pos)) > 1: - raise NotImplementedError( - "automatic_surface_following is not supported for multiple x positions." - ) - await self.move_channel_x(0, x_pos[0]) - - # move channels to above their y positions - y_pos = [ - resource.get_location_wrt(self.deck, x="c", y="c", z="b").y + offset.y - for resource, offset in zip(containers, resource_offsets) - ] - await self.position_channels_in_y_direction( - {channel: y for channel, y in zip(use_channels, y_pos)} - ) - - # detect liquid heights - current_absolute_liquid_heights = await asyncio.gather( - *[ - self.move_z_drive_to_liquid_surface_using_clld( - channel_idx=channel, - lowest_immers_pos=container.get_absolute_location("c", "c", "cavity_bottom").z - + tip.total_tip_length - - tip.fitting_depth, - start_pos_search=container.get_absolute_location("c", "c", "t").z - + tip.total_tip_length - - tip.fitting_depth - + 5, - ) - for channel, container, tip in zip(use_channels, containers, tips) - ] - ) - - current_absolute_liquid_heights = await self.request_pip_height_last_lld() # type: ignore - - filtered_absolute_liquid_heights = [ - current_absolute_liquid_heights[idx] for idx in use_channels - ] - - relative_to_well = [ - filtered_absolute_liquid_heights[i] - - resource.get_absolute_location("c", "c", "cavity_bottom").z - for i, resource in enumerate(containers) - ] - - if move_to_z_safety_after: - await self.move_all_channels_in_z_safety() - - return relative_to_well - - async def probe_liquid_volumes( self, containers: List[Container], use_channels: List[int], resource_offsets: Optional[List[Coordinate]] = None, lld_mode: Optional[List[LLDMode]] = None, + lld_search_height: Optional[List[float]] = None, minimum_traverse_height_at_beginning_of_a_command: Optional[float] = None, min_z_endpos: Optional[float] = None, - techn_replicate_num: int = 3, - return_mean: bool = True, traversal_height: Optional[float] = None, + post_detection_distance: float = 2.0, + swap_speed: Optional[List[float]] = None, + n_replicates: int = 1, + return_mean: bool = True, move_to_z_safety_after: bool = True, ) -> Union[List[float], List[Tuple[float, ...]]]: """ - Probe liquid volumes in one or more containers using Hamilton STAR Liquid Level Detection (LLD). - Both lld_mode=1/capacitative-, and lld_mode=2/pressure- LLD are accepted. - - This performs repeated zero-volume aspirate operations (LLD-only probe moves) for the - specified channels and converts the detected liquid heights into estimated volumes using - each container's `compute_volume_from_height()` method. + Probe liquid surface heights in one or more containers using Hamilton STAR + Liquid Level Detection (LLD). + + This method performs one or more zero-volume aspirate operations (LLD-only probe + moves) on the specified channels and records the detected liquid surface height. + Both capacitive (LLDMode=1) and pressure-based (LLDMode=2) detection modes are + supported. + + For each replicate, the absolute Z position reported by the STAR at the moment of + LLD detection is converted into a height relative to the container cavity bottom. + Negative heights are clamped to zero as a safety hedge against geometry definition + errors. + + Parameters + ---------- + containers: + Containers to probe. All containers must share the same X-coordinate for safety reasons. + use_channels: + STAR channels to use for probing. Must correspond one-to-one with `containers`. + resource_offsets: + Optional per-container XY offsets applied during probing. Defaults to zero offsets. + lld_mode: + Per-channel LLD mode (1 = capacitive, 2 = pressure-based). Defaults to cLLD. + lld_search_height: + Absolute Z height (deck coordinates) from which the LLD search begins. Defaults to + 5 mm above the container top. + post_detection_distance: + Distance (in mm) the tip is allowed to continue moving downward after LLD + detection. Defaults to 2 mm. + swap_speed: + Optional per-channel Z movement speed during probing. Defaults to 100 mm/s. + n_replicates: + Number of repeated LLD probe operations per channel. Defaults to 1. + return_mean: + If True, return the mean liquid height per channel. If False, return all + replicate heights per channel. Defaults to True. + move_to_z_safety_after: + If True, move all channels to Z safety after probing completes. Defaults to True. + + Returns + ------- + List[float] or List[Tuple[float, ...]] + If `return_mean` is True, returns mean liquid heights (mm) per channel. + Otherwise, returns per-channel tuples of replicate heights. """ # Default offsets - resource_offsets = resource_offsets or [Coordinate().zero() for _ in range(len(containers))] + resource_offsets = resource_offsets or [Coordinate.zero() for _ in range(len(containers))] # Default LLD mode == capacitive LLD if lld_mode is None: @@ -1791,6 +1750,13 @@ async def probe_liquid_volumes( self.LLDMode(2), ], f"LLDMode must be 1 (capacitive) or 2 (pressure-based), is {mode}" + if lld_search_height is None: + lld_search_height = [ + c.get_location_wrt(self.deck, "c", "c", z="top").z + + 5.0 # Default 5 mm above top of Container + for c in containers + ] + if minimum_traverse_height_at_beginning_of_a_command is None: minimum_traverse_height_at_beginning_of_a_command = self._channel_traversal_height @@ -1800,19 +1766,29 @@ async def probe_liquid_volumes( if traversal_height is None: traversal_height = self._channel_traversal_height - center_offsets = [Coordinate.zero()] * len(use_channels) if len(set(containers)) == 1: - center_offsets = get_wide_single_resource_liquid_op_offsets( + resource_offsets = get_wide_single_resource_liquid_op_offsets( resource=containers[0], num_channels=len(containers) ) + tip_presence_summary = [] + for c in self._num_channels: + channel_check = await self.request_tip_presence() + tip_presence_summary.append(channel_check) + + if not all(tip_presence_summary[ch] for ch in use_channels): + raise RuntimeError( + "All channels used for probing must have tips attached." + f" Tips present: {tip_presence_summary}, requested channels: {use_channels}" + ) + # Create proxy aspirate operations ops = [] for i, c in enumerate(containers): ops.append( SingleChannelAspiration( resource=c, - offset=center_offsets[i], + offset=resource_offsets[i], tip=standard_volume_tip_with_filter(), volume=0.0, flow_rate=None, @@ -1826,13 +1802,18 @@ async def probe_liquid_volumes( c.get_location_wrt(self.deck, "c", "c", z="cavity_bottom").z for c in containers ] + if swap_speed is None: + swap_speed = [100.0] * len(containers) + + assert n_replicates > 0 + replicate_summary = [] # TODO: merge x_chunking helper function of containers into PLR, then use it to smartly split # containers to probe -> minimise back and forth movement in x dimension when containers are # in different x coordinates - for replicate_idx in range(techn_replicate_num): + for replicate_idx in range(n_replicates): x_coords_of_ops = [c.get_location_wrt(self.deck, "c").x for c in containers] assert len(set(x_coords_of_ops)) == 1, ( "probing is only allowed in the same x-coordinate for safety reasons." @@ -1844,12 +1825,14 @@ async def probe_liquid_volumes( ops=ops, use_channels=use_channels, lld_mode=lld_mode, + lld_search_height=lld_search_height, minimum_traverse_height_at_beginning_of_a_command=minimum_traverse_height_at_beginning_of_a_command, - immersion_depth=[-2] * len(containers), + immersion_depth=[-post_detection_distance] * len(containers), minimum_height=minimum_heights_tip_can_go_to, settling_time=[0] * len(containers), pull_out_distance_transport_air=[0] * len(containers), transport_air_volume=[0] * len(containers), + swap_speed=swap_speed, min_z_endpos=min_z_endpos, ) @@ -1860,22 +1843,19 @@ async def probe_liquid_volumes( ] # Compute heights relative to cavity bottom - relative_to_well = [] + relative_liquid_height_to_well = [] for abs_h, resource in zip(absolute_llds_filtered_to_used_channels, containers): bottom_z = resource.get_location_wrt(self.deck, "c", "c", z="cavity_bottom").z - relative_to_well.append(abs_h - bottom_z) - - # Convert heights to volumes - computed_volumes = [ - resource.compute_volume_from_height(h) for resource, h in zip(containers, relative_to_well) - ] - - replicate_summary.append(computed_volumes) + relative_height = abs_h - bottom_z + # Hedge against definition mistakes (cavity bottom lower than expected) + relative_liquid_height_to_well.append(relative_height if relative_height >= 0 else 0.0) # Move to specified traversal height to save time zs = {ch: traversal_height for ch in use_channels} await self.position_channels_in_z_direction(zs) + replicate_summary.append(relative_liquid_height_to_well) + merged_channel_results = list(zip(*replicate_summary)) if move_to_z_safety_after: @@ -1883,11 +1863,115 @@ async def probe_liquid_volumes( if return_mean: return [ - round(sum(channel_results) / techn_replicate_num, 2) - for channel_results in merged_channel_results + round(sum(channel_results) / n_replicates, 2) for channel_results in merged_channel_results + ] + + else: + return merged_channel_results + + async def probe_liquid_volumes( + self, + containers: List[Container], + use_channels: List[int], + resource_offsets: Optional[List[Coordinate]] = None, + lld_mode: Optional[List[LLDMode]] = None, + lld_search_height: Optional[List[float]] = None, + minimum_traverse_height_at_beginning_of_a_command: Optional[float] = None, + min_z_endpos: Optional[float] = None, + traversal_height: Optional[float] = None, + post_detection_distance: float = 2.0, + swap_speed: Optional[List[float]] = None, + n_replicates: int = 3, + return_mean: bool = True, + move_to_z_safety_after: bool = True, + ) -> Union[List[float], List[Tuple[float, ...]]]: + """ + Probe liquid volumes in one or more containers using Hamilton STAR Liquid Level + Detection (LLD). + + This method performs repeated LLD-only probe operations to measure the liquid + surface height in each container and converts the measured heights into liquid + volumes using each container’s geometric model + (`Container.compute_volume_from_height`). + + Only containers that support height-to-volume conversion can be used with this + method. All probing motion, safety constraints, and replicate handling are + delegated to `probe_liquid_heights`. + + Parameters + ---------- + containers: + Containers to probe. All containers must support height-to-volume conversion. + use_channels: + STAR channels to use for probing. Must correspond one-to-one with + `containers`. + resource_offsets: + Optional per-container XY offsets applied during probing. + lld_mode: + Per-channel LLD mode (1 = capacitive, 2 = pressure-based). + lld_search_height: + Absolute Z height (deck coordinates) from which the LLD search begins. + post_detection_distance: + Distance (in mm) the tip is allowed to continue moving downward after LLD + detection. + swap_speed: + Optional per-channel Z movement speed during probing. + n_replicates: + Number of repeated LLD probe operations per channel. + return_mean: + If True, return the mean liquid volume per channel. If False, return all + replicate volumes per channel. + move_to_z_safety_after: + If True, move all channels to Z safety after probing completes. + + Returns + ------- + List[float] or List[Tuple[float, ...]] + If `return_mean` is True, returns mean liquid volumes per channel. + Otherwise, returns per-channel tuples of replicate volumes. + """ + + # Validate that containers support height<->volume functions + for resource in containers: + try: + resource.compute_volume_from_height(1.0) + except NotImplementedError as e: + raise ValueError( + "probe_liquid_volumes can only be used with containers " + "that support height<->volume functions." + ) from e + + # First, probe liquid heights + merged_channel_results_liquid_heights = await self.probe_liquid_heights( + containers=containers, + use_channels=use_channels, + resource_offsets=resource_offsets, + lld_mode=lld_mode, + lld_search_height=lld_search_height, + minimum_traverse_height_at_beginning_of_a_command=minimum_traverse_height_at_beginning_of_a_command, + min_z_endpos=min_z_endpos, + traversal_height=traversal_height, + post_detection_distance=post_detection_distance, + swap_speed=swap_speed, + n_replicates=n_replicates, + return_mean=False, + move_to_z_safety_after=move_to_z_safety_after, + ) + + merged_channel_results_volumes = [] + for channel_results in merged_channel_results_liquid_heights: + computed_volumes = [ + resource.compute_volume_from_height(h) for resource, h in zip(containers, channel_results) + ] + merged_channel_results_volumes.append(computed_volumes) + + if return_mean: + return [ + round(sum(channel_results) / n_replicates, 2) + for channel_results in merged_channel_results_volumes ] - return merged_channel_results + return merged_channel_results_volumes async def aspirate( self, From d0e6523c37da2e6ace7848944bac2ab2faee5ba5 Mon Sep 17 00:00:00 2001 From: Camillo Moschner Date: Wed, 17 Dec 2025 13:41:38 +0000 Subject: [PATCH 10/15] fix `await self.request_tip_presence() --- pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index b17be17edf..67512e560a 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -1771,10 +1771,7 @@ async def probe_liquid_heights( resource=containers[0], num_channels=len(containers) ) - tip_presence_summary = [] - for c in self._num_channels: - channel_check = await self.request_tip_presence() - tip_presence_summary.append(channel_check) + tip_presence_summary = await self.request_tip_presence() if not all(tip_presence_summary[ch] for ch in use_channels): raise RuntimeError( From f5a6cababdcdd6fefcabe245789ed53f080a084f Mon Sep 17 00:00:00 2001 From: Camillo Moschner Date: Wed, 17 Dec 2025 13:57:25 +0000 Subject: [PATCH 11/15] fix `probe_liquid_heights` across backend --- .../backends/hamilton/STAR_backend.py | 49 ++++++++++--------- 1 file changed, 27 insertions(+), 22 deletions(-) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index 67512e560a..77a504be95 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -1677,6 +1677,7 @@ async def probe_liquid_heights( lld_mode: Optional[List[LLDMode]] = None, lld_search_height: Optional[List[float]] = None, minimum_traverse_height_at_beginning_of_a_command: Optional[float] = None, + minimum_height: Optional[float] = None, min_z_endpos: Optional[float] = None, traversal_height: Optional[float] = None, post_detection_distance: float = 2.0, @@ -1760,8 +1761,15 @@ async def probe_liquid_heights( if minimum_traverse_height_at_beginning_of_a_command is None: minimum_traverse_height_at_beginning_of_a_command = self._channel_traversal_height + + minimal_z_position = min([ + c.get_location_wrt(self.deck, "c", "c", z="cavity_bottom").z for c in containers + ]) if min_z_endpos is None: - min_z_endpos = self._channel_traversal_height + min_z_endpos = minimal_z_position + + if minimum_height is None: + minimum_height = minimal_z_position if traversal_height is None: traversal_height = self._channel_traversal_height @@ -1795,10 +1803,6 @@ async def probe_liquid_heights( ) ) - minimum_heights_tip_can_go_to = [ - c.get_location_wrt(self.deck, "c", "c", z="cavity_bottom").z for c in containers - ] - if swap_speed is None: swap_speed = [100.0] * len(containers) @@ -1825,7 +1829,7 @@ async def probe_liquid_heights( lld_search_height=lld_search_height, minimum_traverse_height_at_beginning_of_a_command=minimum_traverse_height_at_beginning_of_a_command, immersion_depth=[-post_detection_distance] * len(containers), - minimum_height=minimum_heights_tip_can_go_to, + minimum_height=min_z_endpos, settling_time=[0] * len(containers), pull_out_distance_transport_air=[0] * len(containers), transport_air_volume=[0] * len(containers), @@ -1939,20 +1943,23 @@ async def probe_liquid_volumes( ) from e # First, probe liquid heights - merged_channel_results_liquid_heights = await self.probe_liquid_heights( - containers=containers, - use_channels=use_channels, - resource_offsets=resource_offsets, - lld_mode=lld_mode, - lld_search_height=lld_search_height, - minimum_traverse_height_at_beginning_of_a_command=minimum_traverse_height_at_beginning_of_a_command, - min_z_endpos=min_z_endpos, - traversal_height=traversal_height, - post_detection_distance=post_detection_distance, - swap_speed=swap_speed, - n_replicates=n_replicates, - return_mean=False, - move_to_z_safety_after=move_to_z_safety_after, + merged_channel_results_liquid_heights = cast( + List[Tuple[float, ...]], + await self.probe_liquid_heights( + containers=containers, + use_channels=use_channels, + resource_offsets=resource_offsets, + lld_mode=lld_mode, + lld_search_height=lld_search_height, + minimum_traverse_height_at_beginning_of_a_command=minimum_traverse_height_at_beginning_of_a_command, + min_z_endpos=min_z_endpos, + traversal_height=traversal_height, + post_detection_distance=post_detection_distance, + swap_speed=swap_speed, + n_replicates=n_replicates, + return_mean=False, + move_to_z_safety_after=move_to_z_safety_after, + ) ) merged_channel_results_volumes = [] @@ -2224,7 +2231,6 @@ async def aspirate( liquid_heights = await self.probe_liquid_heights( containers=[op.resource for op in ops], use_channels=use_channels, - tips=[cast(HamiltonTip, op.tip) for op in ops], resource_offsets=[op.offset for op in ops], move_to_z_safety_after=False, ) @@ -2591,7 +2597,6 @@ async def dispense( liquid_heights = await self.probe_liquid_heights( containers=[op.resource for op in ops], use_channels=use_channels, - tips=[cast(HamiltonTip, op.tip) for op in ops], resource_offsets=[op.offset for op in ops], move_to_z_safety_after=False, ) From faafea30b8364ce05c67e21e1f72969f25481f80 Mon Sep 17 00:00:00 2001 From: Camillo Moschner Date: Wed, 17 Dec 2025 15:13:39 +0000 Subject: [PATCH 12/15] make `minimum_height` a list again --- .../liquid_handling/backends/hamilton/STAR_backend.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index 77a504be95..a17e4fe6b6 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -1677,7 +1677,7 @@ async def probe_liquid_heights( lld_mode: Optional[List[LLDMode]] = None, lld_search_height: Optional[List[float]] = None, minimum_traverse_height_at_beginning_of_a_command: Optional[float] = None, - minimum_height: Optional[float] = None, + minimum_height: Optional[List[float]] = None, min_z_endpos: Optional[float] = None, traversal_height: Optional[float] = None, post_detection_distance: float = 2.0, @@ -1762,14 +1762,14 @@ async def probe_liquid_heights( minimum_traverse_height_at_beginning_of_a_command = self._channel_traversal_height - minimal_z_position = min([ + minimal_z_positions =[ c.get_location_wrt(self.deck, "c", "c", z="cavity_bottom").z for c in containers - ]) + ] if min_z_endpos is None: - min_z_endpos = minimal_z_position + min_z_endpos = min(minimal_z_positions) if minimum_height is None: - minimum_height = minimal_z_position + minimum_height = minimal_z_positions if traversal_height is None: traversal_height = self._channel_traversal_height From 44507094441a953ac5729dd3aade3ce7123ad3ce Mon Sep 17 00:00:00 2001 From: Camillo Moschner Date: Wed, 17 Dec 2025 15:17:42 +0000 Subject: [PATCH 13/15] typo --- .../backends/hamilton/STAR_backend.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index a17e4fe6b6..ff1f069945 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -1761,15 +1761,14 @@ async def probe_liquid_heights( if minimum_traverse_height_at_beginning_of_a_command is None: minimum_traverse_height_at_beginning_of_a_command = self._channel_traversal_height - - minimal_z_positions =[ - c.get_location_wrt(self.deck, "c", "c", z="cavity_bottom").z for c in containers - ] + minimal_z_positions = [ + c.get_location_wrt(self.deck, "c", "c", z="cavity_bottom").z for c in containers + ] if min_z_endpos is None: min_z_endpos = min(minimal_z_positions) if minimum_height is None: - minimum_height = minimal_z_positions + minimum_height = minimal_z_positions if traversal_height is None: traversal_height = self._channel_traversal_height @@ -1829,7 +1828,7 @@ async def probe_liquid_heights( lld_search_height=lld_search_height, minimum_traverse_height_at_beginning_of_a_command=minimum_traverse_height_at_beginning_of_a_command, immersion_depth=[-post_detection_distance] * len(containers), - minimum_height=min_z_endpos, + minimum_height=minimum_height, settling_time=[0] * len(containers), pull_out_distance_transport_air=[0] * len(containers), transport_air_volume=[0] * len(containers), @@ -1944,8 +1943,8 @@ async def probe_liquid_volumes( # First, probe liquid heights merged_channel_results_liquid_heights = cast( - List[Tuple[float, ...]], - await self.probe_liquid_heights( + List[Tuple[float, ...]], + await self.probe_liquid_heights( containers=containers, use_channels=use_channels, resource_offsets=resource_offsets, @@ -1959,7 +1958,7 @@ async def probe_liquid_volumes( n_replicates=n_replicates, return_mean=False, move_to_z_safety_after=move_to_z_safety_after, - ) + ), ) merged_channel_results_volumes = [] From e25ac8dd5a35df136b6ac69b7208b67e70f12145 Mon Sep 17 00:00:00 2001 From: Camillo Moschner Date: Wed, 17 Dec 2025 16:09:58 +0000 Subject: [PATCH 14/15] updates --- pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index ff1f069945..a2aec345b0 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -1754,6 +1754,7 @@ async def probe_liquid_heights( if lld_search_height is None: lld_search_height = [ c.get_location_wrt(self.deck, "c", "c", z="top").z + - c.get_location_wrt(self.deck, "c", "c", z="cavity_bottom").z + 5.0 # Default 5 mm above top of Container for c in containers ] @@ -1891,7 +1892,7 @@ async def probe_liquid_volumes( This method performs repeated LLD-only probe operations to measure the liquid surface height in each container and converts the measured heights into liquid - volumes using each container’s geometric model + volumes using each container's geometric model (`Container.compute_volume_from_height`). Only containers that support height-to-volume conversion can be used with this From 78d73fe3cb3b0c23ca1e0e7b6e3257859a765d70 Mon Sep 17 00:00:00 2001 From: Camillo Moschner Date: Wed, 17 Dec 2025 22:44:25 +0000 Subject: [PATCH 15/15] remove superfluous logs --- .../backends/hamilton/STAR_backend.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index a2aec345b0..cc087ad9bd 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -1814,12 +1814,13 @@ async def probe_liquid_heights( # containers to probe -> minimise back and forth movement in x dimension when containers are # in different x coordinates - for replicate_idx in range(n_replicates): + for _ in range(n_replicates): x_coords_of_ops = [c.get_location_wrt(self.deck, "c").x for c in containers] - assert len(set(x_coords_of_ops)) == 1, ( - "probing is only allowed in the same x-coordinate for safety reasons." - f"given: {x_coords_of_ops}" - ) + if n_replicates > 1 and len(set(x_coords_of_ops)) > 1: + raise ValueError( + "probing is only allowed in the same x-coordinate for safety reasons." + f"given: {x_coords_of_ops}" + ) # Perform zero-volume aspirate operation for probing await self.aspirate( @@ -2228,11 +2229,18 @@ async def aspirate( if any(op.liquid_height is not None for op in ops): raise ValueError("Cannot use probe_liquid_height when liquid heights are set.") + if len(set(x_positions)) > 1: + raise ValueError( + "probe_liquid_height can only be used when all operations are in the same x position." + ) + liquid_heights = await self.probe_liquid_heights( containers=[op.resource for op in ops], use_channels=use_channels, resource_offsets=[op.offset for op in ops], move_to_z_safety_after=False, + minimum_height=minimum_height, + min_z_endpos=minimum_height, ) # override minimum traversal height because we don't want to move channels up. we are already above the liquid.