diff --git a/pylabrobot/resources/utils.py b/pylabrobot/resources/utils.py index b33866cf63c..7d11e870dc5 100644 --- a/pylabrobot/resources/utils.py +++ b/pylabrobot/resources/utils.py @@ -1,6 +1,7 @@ import re +from itertools import groupby from string import ascii_uppercase as LETTERS -from typing import Dict, List, Optional, Type, TypeVar +from typing import Any, Dict, List, Optional, Type, TypeVar from pylabrobot.resources.coordinate import Coordinate from pylabrobot.resources.resource import Resource @@ -227,3 +228,77 @@ def query( ) ) return matched + + +R = TypeVar("R", bound=Resource) + + +def sort_by_xy_and_chunk_by_x( + resources: list[R], + max_chunk_size: int, + sort_chunks_by_size: bool = True, +) -> list[list[R]]: + """ + Sort resources spatially and partition them into chunks for channel processing. + + Procedure + --------- + 1. Sort all resources by: + - x ascending + - y descending within each x + 2. Group resources into chunks based on identical x values. + 3. Split each chunk into sub-chunks of size <= max_chunk_size. + 4. Optionally sort the resulting sub-chunks by their length (smallest -> largest). + + Args: + resources: List of resources that implement ``.get_absolute_location()``, returning an object with ``x`` and ``y`` attributes. + max_chunk_size: Maximum allowed size for any produced chunk or sub-chunk. + sort_chunks_by_size: If True (default), the output list of chunks is sorted by ascending chunk size. If False, chunks retain their original order. + + Returns: + A list of grouped and sorted resources. + + Example: + >>> sorted_chunks = sort_by_xy_and_chunk_by_x(well_list, max_chunk_size=8) + >>> [ + ... list( + ... zip( + ... [r.get_identifier() for r in chunk], + ... [r.get_absolute_location() for r in chunk], + ... ) + ... ) + ... for chunk in sorted_chunks + ... ] + + [[('D1', Coordinate(x=450.9, y=402.3, z=164.45)), + ('H1', Coordinate(x=450.9, y=366.3, z=164.45)), ...], + [('D2', Coordinate(x=459.9, y=402.3, z=164.45)), ...]] + + """ + + # 1. & 2.: Sort by x ascending, y descending + sorted_resources_with_loc = sorted( + resources, + key=lambda r: ( + r.get_absolute_location().x, + -r.get_absolute_location().y, + ), + ) + + # 3. Group into chunks by x + grouped_by_x = [ + list(group) + for _, group in groupby( + sorted_resources_with_loc, + key=lambda r: r.get_absolute_location().x, + ) + ] + + # 4. Split chunks by max_chunk_size + split_chunks: list[list[Any]] = [] + for chunk in grouped_by_x: + for i in range(0, len(chunk), max_chunk_size): + split_chunks.append(chunk[i : i + max_chunk_size]) + + # Optional 5: Sort chunks by number of elements + return sorted(split_chunks, key=len) if sort_chunks_by_size else split_chunks