Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 76 additions & 1 deletion pylabrobot/resources/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import re
from itertools import groupby
from string import ascii_uppercase as LETTERS
from typing import Dict, List, Optional, Type, TypeVar
from typing import Any, Dict, List, Optional, Type, TypeVar

from pylabrobot.resources.coordinate import Coordinate
from pylabrobot.resources.resource import Resource
Expand Down Expand Up @@ -227,3 +228,77 @@ def query(
)
)
return matched


R = TypeVar("R", bound=Resource)


def sort_by_xy_and_chunk_by_x(
resources: list[R],
max_chunk_size: int,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might make sense to have max_chunk_size be optional as well. I could imagine wanting to just get a separate chunk for each unique x coordinate (as is I would just enter an arbitrarily large chunk size if I wanted to do this).

sort_chunks_by_size: bool = True,
) -> list[list[R]]:
"""
Sort resources spatially and partition them into chunks for channel processing.

Procedure
---------
1. Sort all resources by:
- x ascending
- y descending within each x
2. Group resources into chunks based on identical x values.
3. Split each chunk into sub-chunks of size <= max_chunk_size.
4. Optionally sort the resulting sub-chunks by their length (smallest -> largest).

Args:
resources: List of resources that implement ``.get_absolute_location()``, returning an object with ``x`` and ``y`` attributes.
max_chunk_size: Maximum allowed size for any produced chunk or sub-chunk.
sort_chunks_by_size: If True (default), the output list of chunks is sorted by ascending chunk size. If False, chunks retain their original order.

Returns:
A list of grouped and sorted resources.

Example:
>>> sorted_chunks = sort_by_xy_and_chunk_by_x(well_list, max_chunk_size=8)
>>> [
... list(
... zip(
... [r.get_identifier() for r in chunk],
... [r.get_absolute_location() for r in chunk],
... )
... )
... for chunk in sorted_chunks
... ]

[[('D1', Coordinate(x=450.9, y=402.3, z=164.45)),
('H1', Coordinate(x=450.9, y=366.3, z=164.45)), ...],
[('D2', Coordinate(x=459.9, y=402.3, z=164.45)), ...]]

"""

# 1. & 2.: Sort by x ascending, y descending
sorted_resources_with_loc = sorted(
resources,
key=lambda r: (
r.get_absolute_location().x,
-r.get_absolute_location().y,
),
)

# 3. Group into chunks by x
grouped_by_x = [
list(group)
for _, group in groupby(
sorted_resources_with_loc,
key=lambda r: r.get_absolute_location().x,
)
]

# 4. Split chunks by max_chunk_size
split_chunks: list[list[Any]] = []
for chunk in grouped_by_x:
for i in range(0, len(chunk), max_chunk_size):
split_chunks.append(chunk[i : i + max_chunk_size])

# Optional 5: Sort chunks by number of elements
return sorted(split_chunks, key=len) if sort_chunks_by_size else split_chunks