|
1 | 1 | import re |
| 2 | +from itertools import groupby |
2 | 3 | from string import ascii_uppercase as LETTERS |
3 | | -from typing import Dict, List, Optional, Type, TypeVar |
| 4 | +from typing import Any, Dict, List, Optional, Type, TypeVar |
4 | 5 |
|
5 | 6 | from pylabrobot.resources.coordinate import Coordinate |
6 | 7 | from pylabrobot.resources.resource import Resource |
@@ -227,3 +228,77 @@ def query( |
227 | 228 | ) |
228 | 229 | ) |
229 | 230 | return matched |
| 231 | + |
| 232 | + |
| 233 | +R = TypeVar("R", bound=Resource) |
| 234 | + |
| 235 | + |
| 236 | +def sort_by_xy_and_chunk_by_x( |
| 237 | + resources: list[R], |
| 238 | + max_chunk_size: int, |
| 239 | + sort_chunks_by_size: bool = True, |
| 240 | +) -> list[list[R]]: |
| 241 | + """ |
| 242 | + Sort resources spatially and partition them into chunks for channel processing. |
| 243 | +
|
| 244 | + Procedure |
| 245 | + --------- |
| 246 | + 1. Sort all resources by: |
| 247 | + - x ascending |
| 248 | + - y descending within each x |
| 249 | + 2. Group resources into chunks based on identical x values. |
| 250 | + 3. Split each chunk into sub-chunks of size <= max_chunk_size. |
| 251 | + 4. Optionally sort the resulting sub-chunks by their length (smallest -> largest). |
| 252 | +
|
| 253 | + Args: |
| 254 | + resources: List of resources that implement ``.get_absolute_location()``, returning an object with ``x`` and ``y`` attributes. |
| 255 | + max_chunk_size: Maximum allowed size for any produced chunk or sub-chunk. |
| 256 | + sort_chunks_by_size: If True (default), the output list of chunks is sorted by ascending chunk size. If False, chunks retain their original order. |
| 257 | +
|
| 258 | + Returns: |
| 259 | + A list of grouped and sorted resources. |
| 260 | +
|
| 261 | + Example: |
| 262 | + >>> sorted_chunks = sort_by_xy_and_chunk_by_x(well_list, max_chunk_size=8) |
| 263 | + >>> [ |
| 264 | + ... list( |
| 265 | + ... zip( |
| 266 | + ... [r.get_identifier() for r in chunk], |
| 267 | + ... [r.get_absolute_location() for r in chunk], |
| 268 | + ... ) |
| 269 | + ... ) |
| 270 | + ... for chunk in sorted_chunks |
| 271 | + ... ] |
| 272 | +
|
| 273 | + [[('D1', Coordinate(x=450.9, y=402.3, z=164.45)), |
| 274 | + ('H1', Coordinate(x=450.9, y=366.3, z=164.45)), ...], |
| 275 | + [('D2', Coordinate(x=459.9, y=402.3, z=164.45)), ...]] |
| 276 | +
|
| 277 | + """ |
| 278 | + |
| 279 | + # 1. & 2.: Sort by x ascending, y descending |
| 280 | + sorted_resources_with_loc = sorted( |
| 281 | + resources, |
| 282 | + key=lambda r: ( |
| 283 | + r.get_absolute_location().x, |
| 284 | + -r.get_absolute_location().y, |
| 285 | + ), |
| 286 | + ) |
| 287 | + |
| 288 | + # 3. Group into chunks by x |
| 289 | + grouped_by_x = [ |
| 290 | + list(group) |
| 291 | + for _, group in groupby( |
| 292 | + sorted_resources_with_loc, |
| 293 | + key=lambda r: r.get_absolute_location().x, |
| 294 | + ) |
| 295 | + ] |
| 296 | + |
| 297 | + # 4. Split chunks by max_chunk_size |
| 298 | + split_chunks: list[list[Any]] = [] |
| 299 | + for chunk in grouped_by_x: |
| 300 | + for i in range(0, len(chunk), max_chunk_size): |
| 301 | + split_chunks.append(chunk[i : i + max_chunk_size]) |
| 302 | + |
| 303 | + # Optional 5: Sort chunks by number of elements |
| 304 | + return sorted(split_chunks, key=len) if sort_chunks_by_size else split_chunks |
0 commit comments