From 88e0fb15474759edc61203700acfb56ec20d58fa Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Mon, 12 Jan 2026 12:24:17 +0100 Subject: [PATCH 1/5] Expand existing test --- src/crawlee/crawlers/_basic/_basic_crawler.py | 3 +++ .../crawlers/_basic/test_basic_crawler.py | 21 ++++++++++++++++++- 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/src/crawlee/crawlers/_basic/_basic_crawler.py b/src/crawlee/crawlers/_basic/_basic_crawler.py index be489f9c9a..aff285c489 100644 --- a/src/crawlee/crawlers/_basic/_basic_crawler.py +++ b/src/crawlee/crawlers/_basic/_basic_crawler.py @@ -297,6 +297,7 @@ def __init__( status_message_logging_interval: timedelta = timedelta(seconds=10), status_message_callback: Callable[[StatisticsState, StatisticsState | None, str], Awaitable[str | None]] | None = None, + id: int | None = None, _context_pipeline: ContextPipeline[TCrawlingContext] | None = None, _additional_context_managers: Sequence[AbstractAsyncContextManager] | None = None, _logger: logging.Logger | None = None, @@ -349,6 +350,8 @@ def __init__( status_message_logging_interval: Interval for logging the crawler status messages. status_message_callback: Allows overriding the default status message. The default status message is provided in the parameters. Returning `None` suppresses the status message. + id: Id of the crawler used for state and statistics tracking. You can use same explicit id to share same + state and statistics between two crawlers. By default, each crawler will use own state and statistics. _context_pipeline: Enables extending the request lifecycle and modifying the crawling context. Intended for use by subclasses rather than direct instantiation of `BasicCrawler`. _additional_context_managers: Additional context managers used throughout the crawler lifecycle. diff --git a/tests/unit/crawlers/_basic/test_basic_crawler.py b/tests/unit/crawlers/_basic/test_basic_crawler.py index 5800e59d4e..7df6d7f849 100644 --- a/tests/unit/crawlers/_basic/test_basic_crawler.py +++ b/tests/unit/crawlers/_basic/test_basic_crawler.py @@ -1763,6 +1763,12 @@ async def _run_crawler(requests: list[str], storage_dir: str) -> StatisticsState async def request_handler(context: BasicCrawlingContext) -> None: context.log.info(f'Processing {context.request.url} ...') + # Add visited url to crawler state and use it to verify state persistence. + state = await context.use_state({'urls': []}) + state['urls'] = state.get('urls') + assert isinstance(state['urls'], list) + state['urls'].append(context.request.url) + context.log.info(f'State {state}') crawler = BasicCrawler( request_handler=request_handler, @@ -1777,11 +1783,14 @@ def _process_run_crawler(requests: list[str], storage_dir: str) -> StatisticsSta return asyncio.run(_run_crawler(requests=requests, storage_dir=storage_dir)) -async def test_crawler_statistics_persistence(tmp_path: Path) -> None: +async def test_crawler_state_persistence(tmp_path: Path) -> None: """Test that crawler statistics persist and are loaded correctly. This test simulates starting the crawler process twice, and checks that the statistics include first run.""" + state_kvs = await KeyValueStore.open(storage_client=FileSystemStorageClient(), + configuration=Configuration(storage_dir=str(tmp_path))) + with ProcessPoolExecutor() as executor: # Crawl 2 requests in the first run and automatically persist the state. 
first_run_state = executor.submit( @@ -1789,7 +1798,10 @@ async def test_crawler_statistics_persistence(tmp_path: Path) -> None: requests=['https://a.placeholder.com', 'https://b.placeholder.com'], storage_dir=str(tmp_path), ).result() + # Expected state after first crawler run assert first_run_state.requests_finished == 2 + state = await state_kvs.get_value(BasicCrawler._CRAWLEE_STATE_KEY) + assert state.get('urls') == ['https://a.placeholder.com', 'https://b.placeholder.com'] # Do not reuse the executor to simulate a fresh process to avoid modified class attributes. with ProcessPoolExecutor() as executor: @@ -1797,8 +1809,15 @@ async def test_crawler_statistics_persistence(tmp_path: Path) -> None: second_run_state = executor.submit( _process_run_crawler, requests=['https://c.placeholder.com'], storage_dir=str(tmp_path) ).result() + + # Expected state after second crawler run + # 2 requests from first run and 1 request from second run. assert second_run_state.requests_finished == 3 + state = await state_kvs.get_value(BasicCrawler._CRAWLEE_STATE_KEY) + assert state.get('urls') == ['https://a.placeholder.com', 'https://b.placeholder.com', + 'https://c.placeholder.com'] + assert first_run_state.crawler_started_at == second_run_state.crawler_started_at assert first_run_state.crawler_finished_at assert second_run_state.crawler_finished_at From 31e16d2c0a6f4d75d8d2c0ba4a02493b85a9ec39 Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Mon, 12 Jan 2026 15:03:59 +0100 Subject: [PATCH 2/5] Version 1: State depends on crawler_id, but stats does not. --- src/crawlee/crawlers/_basic/_basic_crawler.py | 14 +- tests/unit/conftest.py | 2 + .../test_adaptive_playwright_crawler.py | 4 +- .../crawlers/_basic/test_basic_crawler.py | 160 +++++++++++++++--- 4 files changed, 150 insertions(+), 30 deletions(-) diff --git a/src/crawlee/crawlers/_basic/_basic_crawler.py b/src/crawlee/crawlers/_basic/_basic_crawler.py index aff285c489..df93168603 100644 --- a/src/crawlee/crawlers/_basic/_basic_crawler.py +++ b/src/crawlee/crawlers/_basic/_basic_crawler.py @@ -266,6 +266,7 @@ class BasicCrawler(Generic[TCrawlingContext, TStatisticsState]): _CRAWLEE_STATE_KEY = 'CRAWLEE_STATE' _request_handler_timeout_text = 'Request handler timed out after' + __next_id = 0 def __init__( self, @@ -297,7 +298,7 @@ def __init__( status_message_logging_interval: timedelta = timedelta(seconds=10), status_message_callback: Callable[[StatisticsState, StatisticsState | None, str], Awaitable[str | None]] | None = None, - id: int | None = None, + crawler_id: int | None = None, _context_pipeline: ContextPipeline[TCrawlingContext] | None = None, _additional_context_managers: Sequence[AbstractAsyncContextManager] | None = None, _logger: logging.Logger | None = None, @@ -350,7 +351,7 @@ def __init__( status_message_logging_interval: Interval for logging the crawler status messages. status_message_callback: Allows overriding the default status message. The default status message is provided in the parameters. Returning `None` suppresses the status message. - id: Id of the crawler used for state and statistics tracking. You can use same explicit id to share same + crawler_id: Id of the crawler used for state and statistics tracking. You can use same explicit id to share state and statistics between two crawlers. By default, each crawler will use own state and statistics. _context_pipeline: Enables extending the request lifecycle and modifying the crawling context. Intended for use by subclasses rather than direct instantiation of `BasicCrawler`. 
@@ -359,6 +360,13 @@ def __init__( _logger: A logger instance, typically provided by a subclass, for consistent logging labels. Intended for use by subclasses rather than direct instantiation of `BasicCrawler`. """ + if crawler_id is None: + # This could look into set of already used ids, but lets not overengineer this. + self.id = BasicCrawler.__next_id + BasicCrawler.__next_id += 1 + else: + self.id = crawler_id + implicit_event_manager_with_explicit_config = False if not configuration: configuration = service_locator.get_configuration() @@ -834,7 +842,7 @@ async def _use_state( default_value: dict[str, JsonSerializable] | None = None, ) -> dict[str, JsonSerializable]: kvs = await self.get_key_value_store() - return await kvs.get_auto_saved_value(self._CRAWLEE_STATE_KEY, default_value) + return await kvs.get_auto_saved_value(f'{self._CRAWLEE_STATE_KEY}_{self.id}', default_value) async def _save_crawler_state(self) -> None: store = await self.get_key_value_store() diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index ed8c4a720d..33c76d87a5 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -12,6 +12,7 @@ from uvicorn.config import Config from crawlee import service_locator +from crawlee.crawlers import BasicCrawler from crawlee.fingerprint_suite._browserforge_adapter import get_available_header_network from crawlee.http_clients import CurlImpersonateHttpClient, HttpxHttpClient, ImpitHttpClient from crawlee.proxy_configuration import ProxyInfo @@ -74,6 +75,7 @@ def _prepare_test_env() -> None: # Reset global class variables to ensure test isolation. KeyValueStore._autosaved_values = {} Statistics._Statistics__next_id = 0 # type:ignore[attr-defined] # Mangled attribute + BasicCrawler._BasicCrawler__next_id = 0 # type:ignore[attr-defined] # Mangled attribute return _prepare_test_env diff --git a/tests/unit/crawlers/_adaptive_playwright/test_adaptive_playwright_crawler.py b/tests/unit/crawlers/_adaptive_playwright/test_adaptive_playwright_crawler.py index 4aedeff2eb..11fa45dba9 100644 --- a/tests/unit/crawlers/_adaptive_playwright/test_adaptive_playwright_crawler.py +++ b/tests/unit/crawlers/_adaptive_playwright/test_adaptive_playwright_crawler.py @@ -381,7 +381,7 @@ async def test_adaptive_crawling_result_use_state_isolation( crawler = AdaptivePlaywrightCrawler.with_beautifulsoup_static_parser( rendering_type_predictor=static_only_predictor_enforce_detection, ) - await key_value_store.set_value(BasicCrawler._CRAWLEE_STATE_KEY, {'counter': 0}) + await key_value_store.set_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0', {'counter': 0}) request_handler_calls = 0 @crawler.router.default_handler @@ -398,7 +398,7 @@ async def request_handler(context: AdaptivePlaywrightCrawlingContext) -> None: # Request handler was called twice assert request_handler_calls == 2 # Increment of global state happened only once - assert (await key_value_store.get_value(BasicCrawler._CRAWLEE_STATE_KEY))['counter'] == 1 + assert (await key_value_store.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0'))['counter'] == 1 async def test_adaptive_crawling_statistics(test_urls: list[str]) -> None: diff --git a/tests/unit/crawlers/_basic/test_basic_crawler.py b/tests/unit/crawlers/_basic/test_basic_crawler.py index 7df6d7f849..b7d55f728b 100644 --- a/tests/unit/crawlers/_basic/test_basic_crawler.py +++ b/tests/unit/crawlers/_basic/test_basic_crawler.py @@ -810,11 +810,62 @@ async def handler(context: BasicCrawlingContext) -> None: await crawler.run(['https://hello.world']) kvs = await 
crawler.get_key_value_store() - value = await kvs.get_value(BasicCrawler._CRAWLEE_STATE_KEY) + value = await kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0') assert value == {'hello': 'world'} +async def test_context_use_state_crawlers_share_state() -> None: + async def handler(context: BasicCrawlingContext) -> None: + state = await context.use_state({'urls': []}) + assert isinstance(state['urls'], list) + state['urls'].append(context.request.url) + + crawler_1 = BasicCrawler(crawler_id=0, request_handler=handler) + crawler_2 = BasicCrawler(crawler_id=0, request_handler=handler) + + await crawler_1.run(['https://a.com']) + await crawler_2.run(['https://b.com']) + + kvs = await KeyValueStore.open() + assert crawler_1.id == crawler_2.id == 0 + assert await kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_{crawler_1.id}') == { + 'urls': ['https://a.com', 'https://b.com'] + } + + +async def test_crawlers_share_stats() -> None: + async def handler(context: BasicCrawlingContext) -> None: + await context.use_state({'urls': []}) + + crawler_1 = BasicCrawler(crawler_id=0, request_handler=handler) + crawler_2 = BasicCrawler(crawler_id=0, request_handler=handler, statistics=crawler_1.statistics) + + result1 = await crawler_1.run(['https://a.com']) + result2 = await crawler_2.run(['https://b.com']) + + assert crawler_1.statistics == crawler_2.statistics + assert result1.requests_finished == 1 + assert result2.requests_finished == 2 + + +async def test_context_use_state_crawlers_own_state() -> None: + async def handler(context: BasicCrawlingContext) -> None: + state = await context.use_state({'urls': []}) + assert isinstance(state['urls'], list) + state['urls'].append(context.request.url) + + crawler_1 = BasicCrawler(request_handler=handler) + crawler_2 = BasicCrawler(request_handler=handler) + + await crawler_1.run(['https://a.com']) + await crawler_2.run(['https://b.com']) + + kvs = await KeyValueStore.open() + assert await kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0') == {'urls': ['https://a.com']} + assert await kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_1') == {'urls': ['https://b.com']} + + async def test_context_handlers_use_state(key_value_store: KeyValueStore) -> None: state_in_handler_one: dict[str, JsonSerializable] = {} state_in_handler_two: dict[str, JsonSerializable] = {} @@ -855,7 +906,7 @@ async def handler_three(context: BasicCrawlingContext) -> None: store = await crawler.get_key_value_store() # The state in the KVS must match with the last set state - assert (await store.get_value(BasicCrawler._CRAWLEE_STATE_KEY)) == {'hello': 'last_world'} + assert (await store.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0')) == {'hello': 'last_world'} async def test_max_requests_per_crawl() -> None: @@ -1283,7 +1334,7 @@ async def test_context_use_state_race_condition_in_handlers(key_value_store: Key crawler = BasicCrawler() store = await crawler.get_key_value_store() - await store.set_value(BasicCrawler._CRAWLEE_STATE_KEY, {'counter': 0}) + await store.set_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0', {'counter': 0}) handler_barrier = Barrier(2) @crawler.router.default_handler @@ -1298,7 +1349,7 @@ async def handler(context: BasicCrawlingContext) -> None: store = await crawler.get_key_value_store() # Ensure that local state is pushed back to kvs. 
await store.persist_autosaved_values() - assert (await store.get_value(BasicCrawler._CRAWLEE_STATE_KEY))['counter'] == 2 + assert (await store.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0'))['counter'] == 2 @pytest.mark.run_alone @@ -1750,16 +1801,10 @@ async def handler(context: BasicCrawlingContext) -> None: assert await unrelated_rq.fetch_next_request() == unrelated_request -async def _run_crawler(requests: list[str], storage_dir: str) -> StatisticsState: +async def _run_crawler(crawler_id: int | None, requests: list[str], storage_dir: str) -> StatisticsState: """Run crawler and return its statistics state. Must be defined like this to be pickable for ProcessPoolExecutor.""" - service_locator.set_configuration( - Configuration( - storage_dir=storage_dir, - purge_on_start=False, - ) - ) async def request_handler(context: BasicCrawlingContext) -> None: context.log.info(f'Processing {context.request.url} ...') @@ -1773,50 +1818,69 @@ async def request_handler(context: BasicCrawlingContext) -> None: crawler = BasicCrawler( request_handler=request_handler, concurrency_settings=ConcurrencySettings(max_concurrency=1, desired_concurrency=1), + configuration=Configuration( + storage_dir=storage_dir, + purge_on_start=False, + ), ) await crawler.run(requests) return crawler.statistics.state -def _process_run_crawler(requests: list[str], storage_dir: str) -> StatisticsState: - return asyncio.run(_run_crawler(requests=requests, storage_dir=storage_dir)) +@dataclass +class _CrawlerInput: + requests: list[str] + id: None | int = None + + +def _process_run_crawlers(crawler_inputs: list[_CrawlerInput], storage_dir: str) -> list[StatisticsState]: + return [ + asyncio.run(_run_crawler(crawler_id=crawler_input.id, requests=crawler_input.requests, storage_dir=storage_dir)) + for crawler_input in crawler_inputs + ] async def test_crawler_state_persistence(tmp_path: Path) -> None: - """Test that crawler statistics persist and are loaded correctly. + """Test that crawler statistics and state persist and are loaded correctly. This test simulates starting the crawler process twice, and checks that the statistics include first run.""" - state_kvs = await KeyValueStore.open(storage_client=FileSystemStorageClient(), - configuration=Configuration(storage_dir=str(tmp_path))) + state_kvs = await KeyValueStore.open( + storage_client=FileSystemStorageClient(), configuration=Configuration(storage_dir=str(tmp_path)) + ) with ProcessPoolExecutor() as executor: # Crawl 2 requests in the first run and automatically persist the state. first_run_state = executor.submit( - _process_run_crawler, - requests=['https://a.placeholder.com', 'https://b.placeholder.com'], + _process_run_crawlers, + crawler_inputs=[_CrawlerInput(requests=['https://a.placeholder.com', 'https://b.placeholder.com'])], storage_dir=str(tmp_path), - ).result() + ).result()[0] # Expected state after first crawler run assert first_run_state.requests_finished == 2 - state = await state_kvs.get_value(BasicCrawler._CRAWLEE_STATE_KEY) + state = await state_kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0') assert state.get('urls') == ['https://a.placeholder.com', 'https://b.placeholder.com'] # Do not reuse the executor to simulate a fresh process to avoid modified class attributes. with ProcessPoolExecutor() as executor: # Crawl 1 additional requests in the second run, but use previously automatically persisted state. 
second_run_state = executor.submit( - _process_run_crawler, requests=['https://c.placeholder.com'], storage_dir=str(tmp_path) - ).result() + _process_run_crawlers, + crawler_inputs=[_CrawlerInput(requests=['https://c.placeholder.com'])], + storage_dir=str(tmp_path), + ).result()[0] # Expected state after second crawler run # 2 requests from first run and 1 request from second run. assert second_run_state.requests_finished == 3 - state = await state_kvs.get_value(BasicCrawler._CRAWLEE_STATE_KEY) - assert state.get('urls') == ['https://a.placeholder.com', 'https://b.placeholder.com', - 'https://c.placeholder.com'] + state = await state_kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0') + assert state.get('urls') == [ + 'https://a.placeholder.com', + 'https://b.placeholder.com', + 'https://c.placeholder.com', + ] assert first_run_state.crawler_started_at == second_run_state.crawler_started_at assert first_run_state.crawler_finished_at @@ -1826,6 +1890,52 @@ async def test_crawler_state_persistence(tmp_path: Path) -> None: assert first_run_state.crawler_runtime < second_run_state.crawler_runtime +async def test_crawler_state_persistence_2_crawlers_with_migration(tmp_path: Path) -> None: + """Test that crawler statistics and state persist and are loaded correctly. + + This test simulates starting the crawler process twice, and checks that the statistics include first run. + Each time two distinct crawlers are running, and they should keep using their own statistics and state.""" + state_kvs = await KeyValueStore.open( + storage_client=FileSystemStorageClient(), configuration=Configuration(storage_dir=str(tmp_path)) + ) + + with ProcessPoolExecutor() as executor: + # Run 2 crawler, each crawl 1 request in and automatically persist the state. + first_run_states = executor.submit( + _process_run_crawlers, + crawler_inputs=[ + _CrawlerInput(requests=['https://a.placeholder.com']), + _CrawlerInput(requests=['https://c.placeholder.com']), + ], + storage_dir=str(tmp_path), + ).result() + # Expected state after first crawler run + assert first_run_states[0].requests_finished == 1 + assert first_run_states[1].requests_finished == 1 + state_0 = await state_kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0') + assert state_0.get('urls') == ['https://a.placeholder.com'] + state_1 = await state_kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_1') + assert state_1.get('urls') == ['https://c.placeholder.com'] + + with ProcessPoolExecutor() as executor: + # Run 2 crawler, each crawl 1 request in and automatically persist the state. 
+ second_run_states = executor.submit( + _process_run_crawlers, + crawler_inputs=[ + _CrawlerInput(requests=['https://b.placeholder.com']), + _CrawlerInput(requests=['https://d.placeholder.com']), + ], + storage_dir=str(tmp_path), + ).result() + # Expected state after first crawler run + assert second_run_states[0].requests_finished == 2 + assert second_run_states[1].requests_finished == 2 + state_0 = await state_kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0') + assert state_0.get('urls') == ['https://a.placeholder.com', 'https://b.placeholder.com'] + state_1 = await state_kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_1') + assert state_1.get('urls') == ['https://c.placeholder.com', 'https://d.placeholder.com'] + + async def test_crawler_intermediate_statistics() -> None: """Test that crawler statistics are correctly updating total runtime on every calculate call.""" crawler = BasicCrawler() From 1bbb651ff3863aff87059aabfbf6ce298d6b8d6c Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Tue, 13 Jan 2026 16:54:46 +0100 Subject: [PATCH 3/5] Draft of use_state as input argument Only for discussion, types ignored for now. --- .../_adaptive_playwright_crawler.py | 6 +-- src/crawlee/crawlers/_basic/_basic_crawler.py | 53 ++++++++++++------- tests/unit/conftest.py | 4 +- .../test_adaptive_playwright_crawler.py | 5 +- .../crawlers/_basic/test_basic_crawler.py | 47 ++++++++-------- 5 files changed, 66 insertions(+), 49 deletions(-) diff --git a/src/crawlee/crawlers/_adaptive_playwright/_adaptive_playwright_crawler.py b/src/crawlee/crawlers/_adaptive_playwright/_adaptive_playwright_crawler.py index fbbf811f11..fb5f11b5c7 100644 --- a/src/crawlee/crawlers/_adaptive_playwright/_adaptive_playwright_crawler.py +++ b/src/crawlee/crawlers/_adaptive_playwright/_adaptive_playwright_crawler.py @@ -43,8 +43,7 @@ from typing_extensions import Unpack - from crawlee.crawlers._basic._basic_crawler import _BasicCrawlerOptions - +from crawlee.crawlers._basic._basic_crawler import _BasicCrawlerOptions, _DefaultUseState TStaticParseResult = TypeVar('TStaticParseResult') TStaticSelectResult = TypeVar('TStaticSelectResult') @@ -389,7 +388,8 @@ async def _run_request_handler(self, context: BasicCrawlingContext) -> None: # (This static crawl is performed only to evaluate rendering type detection.) kvs = await context.get_key_value_store() default_value = dict[str, JsonSerializable]() - old_state: dict[str, JsonSerializable] = await kvs.get_value(self._CRAWLEE_STATE_KEY, default_value) + # This was fragile even before. 
Out of scope for draft + old_state: dict[str, JsonSerializable] = await kvs.get_value(_DefaultUseState._CRAWLEE_STATE_KEY, default_value) old_state_copy = deepcopy(old_state) pw_run = await self._crawl_one('client only', context=context) diff --git a/src/crawlee/crawlers/_basic/_basic_crawler.py b/src/crawlee/crawlers/_basic/_basic_crawler.py index df93168603..177da92ff4 100644 --- a/src/crawlee/crawlers/_basic/_basic_crawler.py +++ b/src/crawlee/crawlers/_basic/_basic_crawler.py @@ -10,7 +10,7 @@ import threading import traceback from asyncio import CancelledError -from collections.abc import AsyncGenerator, Awaitable, Callable, Iterable, Sequence +from collections.abc import AsyncGenerator, Awaitable, Callable, Coroutine, Iterable, Sequence from contextlib import AsyncExitStack, suppress from datetime import timedelta from functools import partial @@ -42,6 +42,7 @@ RequestHandlerRunResult, SendRequestFunction, SkippedReason, + UseStateFunction, ) from crawlee._utils.docs import docs_group from crawlee._utils.file import atomic_write, export_csv_to_stream, export_json_to_stream @@ -239,6 +240,29 @@ class BasicCrawlerOptions( """ +class _DefaultUseState: + _next_state_id = 0 + _CRAWLEE_STATE_KEY = 'CRAWLEE_STATE' + + def __init__(self, get_key_value_store: Awaitable[KeyValueStore]) -> None: + self._get_key_value_store = get_key_value_store + self._id = self._next_state_id + _DefaultUseState._next_state_id += 1 + + async def _use_state( + self, + default_value: dict[str, JsonSerializable] | None = None, + ) -> dict[str, JsonSerializable]: + kvs = await self._get_key_value_store() + return await kvs.get_auto_saved_value(f'{self._CRAWLEE_STATE_KEY}_{self._id}', default_value) + + def __call__( + self, + default_value: dict[str, JsonSerializable] | None = None, + ) -> Coroutine[None, None, dict[str, JsonSerializable]]: + return self._use_state(default_value) + + @docs_group('Crawlers') class BasicCrawler(Generic[TCrawlingContext, TStatisticsState]): """A basic web crawler providing a framework for crawling websites. @@ -264,7 +288,6 @@ class BasicCrawler(Generic[TCrawlingContext, TStatisticsState]): - and more. """ - _CRAWLEE_STATE_KEY = 'CRAWLEE_STATE' _request_handler_timeout_text = 'Request handler timed out after' __next_id = 0 @@ -298,7 +321,7 @@ def __init__( status_message_logging_interval: timedelta = timedelta(seconds=10), status_message_callback: Callable[[StatisticsState, StatisticsState | None, str], Awaitable[str | None]] | None = None, - crawler_id: int | None = None, + use_state: UseStateFunction | None = None, _context_pipeline: ContextPipeline[TCrawlingContext] | None = None, _additional_context_managers: Sequence[AbstractAsyncContextManager] | None = None, _logger: logging.Logger | None = None, @@ -351,8 +374,8 @@ def __init__( status_message_logging_interval: Interval for logging the crawler status messages. status_message_callback: Allows overriding the default status message. The default status message is provided in the parameters. Returning `None` suppresses the status message. - crawler_id: Id of the crawler used for state and statistics tracking. You can use same explicit id to share - state and statistics between two crawlers. By default, each crawler will use own state and statistics. + use_state: Callback used to access shared state. Use only for custom state implementation, for example when + you want to share state between two different crawlers. _context_pipeline: Enables extending the request lifecycle and modifying the crawling context. 
Intended for use by subclasses rather than direct instantiation of `BasicCrawler`. _additional_context_managers: Additional context managers used throughout the crawler lifecycle. @@ -360,13 +383,6 @@ def __init__( _logger: A logger instance, typically provided by a subclass, for consistent logging labels. Intended for use by subclasses rather than direct instantiation of `BasicCrawler`. """ - if crawler_id is None: - # This could look into set of already used ids, but lets not overengineer this. - self.id = BasicCrawler.__next_id - BasicCrawler.__next_id += 1 - else: - self.id = crawler_id - implicit_event_manager_with_explicit_config = False if not configuration: configuration = service_locator.get_configuration() @@ -442,6 +458,12 @@ def __init__( self._use_session_pool = use_session_pool self._retry_on_blocked = retry_on_blocked + # Set use state + if use_state: + self._use_state = use_state + else: + self._use_state = _DefaultUseState(get_key_value_store=self.get_key_value_store) + # Logging setup if configure_logging: root_logger = logging.getLogger() @@ -837,13 +859,6 @@ async def add_requests( wait_for_all_requests_to_be_added_timeout=wait_for_all_requests_to_be_added_timeout, ) - async def _use_state( - self, - default_value: dict[str, JsonSerializable] | None = None, - ) -> dict[str, JsonSerializable]: - kvs = await self.get_key_value_store() - return await kvs.get_auto_saved_value(f'{self._CRAWLEE_STATE_KEY}_{self.id}', default_value) - async def _save_crawler_state(self) -> None: store = await self.get_key_value_store() await store.persist_autosaved_values() diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index 33c76d87a5..71e7cd0c7d 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -12,7 +12,7 @@ from uvicorn.config import Config from crawlee import service_locator -from crawlee.crawlers import BasicCrawler +from crawlee.crawlers._basic._basic_crawler import _DefaultUseState from crawlee.fingerprint_suite._browserforge_adapter import get_available_header_network from crawlee.http_clients import CurlImpersonateHttpClient, HttpxHttpClient, ImpitHttpClient from crawlee.proxy_configuration import ProxyInfo @@ -75,7 +75,7 @@ def _prepare_test_env() -> None: # Reset global class variables to ensure test isolation. 
KeyValueStore._autosaved_values = {} Statistics._Statistics__next_id = 0 # type:ignore[attr-defined] # Mangled attribute - BasicCrawler._BasicCrawler__next_id = 0 # type:ignore[attr-defined] # Mangled attribute + _DefaultUseState._next_state_id = 0 # type:ignore[attr-defined] # Mangled attribute return _prepare_test_env diff --git a/tests/unit/crawlers/_adaptive_playwright/test_adaptive_playwright_crawler.py b/tests/unit/crawlers/_adaptive_playwright/test_adaptive_playwright_crawler.py index 11fa45dba9..28fcd2811f 100644 --- a/tests/unit/crawlers/_adaptive_playwright/test_adaptive_playwright_crawler.py +++ b/tests/unit/crawlers/_adaptive_playwright/test_adaptive_playwright_crawler.py @@ -19,7 +19,6 @@ AdaptivePlaywrightCrawler, AdaptivePlaywrightCrawlingContext, AdaptivePlaywrightPreNavCrawlingContext, - BasicCrawler, RenderingType, RenderingTypePrediction, RenderingTypePredictor, @@ -381,7 +380,7 @@ async def test_adaptive_crawling_result_use_state_isolation( crawler = AdaptivePlaywrightCrawler.with_beautifulsoup_static_parser( rendering_type_predictor=static_only_predictor_enforce_detection, ) - await key_value_store.set_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0', {'counter': 0}) + await key_value_store.set_value('CRAWLEE_STATE_0', {'counter': 0}) request_handler_calls = 0 @crawler.router.default_handler @@ -398,7 +397,7 @@ async def request_handler(context: AdaptivePlaywrightCrawlingContext) -> None: # Request handler was called twice assert request_handler_calls == 2 # Increment of global state happened only once - assert (await key_value_store.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0'))['counter'] == 1 + assert (await key_value_store.get_value('CRAWLEE_STATE_0'))['counter'] == 1 async def test_adaptive_crawling_statistics(test_urls: list[str]) -> None: diff --git a/tests/unit/crawlers/_basic/test_basic_crawler.py b/tests/unit/crawlers/_basic/test_basic_crawler.py index b7d55f728b..fd992ae441 100644 --- a/tests/unit/crawlers/_basic/test_basic_crawler.py +++ b/tests/unit/crawlers/_basic/test_basic_crawler.py @@ -810,36 +810,39 @@ async def handler(context: BasicCrawlingContext) -> None: await crawler.run(['https://hello.world']) kvs = await crawler.get_key_value_store() - value = await kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0') + value = await kvs.get_value('CRAWLEE_STATE_0') assert value == {'hello': 'world'} -async def test_context_use_state_crawlers_share_state() -> None: +async def test_context_use_state_crawlers_share_custom_state() -> None: + custom_state_dict = {} + + async def custom_use_state(default_state: dict[str, JsonSerializable]) -> dict[str, JsonSerializable]: + if not custom_state_dict: + custom_state_dict.update(default_state) + return custom_state_dict + async def handler(context: BasicCrawlingContext) -> None: state = await context.use_state({'urls': []}) assert isinstance(state['urls'], list) state['urls'].append(context.request.url) - crawler_1 = BasicCrawler(crawler_id=0, request_handler=handler) - crawler_2 = BasicCrawler(crawler_id=0, request_handler=handler) + crawler_1 = BasicCrawler(use_state=custom_use_state, request_handler=handler) + crawler_2 = BasicCrawler(use_state=custom_use_state, request_handler=handler) await crawler_1.run(['https://a.com']) await crawler_2.run(['https://b.com']) - kvs = await KeyValueStore.open() - assert crawler_1.id == crawler_2.id == 0 - assert await kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_{crawler_1.id}') == { - 'urls': ['https://a.com', 'https://b.com'] - } + assert custom_state_dict == {'urls': 
['https://a.com', 'https://b.com']} async def test_crawlers_share_stats() -> None: async def handler(context: BasicCrawlingContext) -> None: await context.use_state({'urls': []}) - crawler_1 = BasicCrawler(crawler_id=0, request_handler=handler) - crawler_2 = BasicCrawler(crawler_id=0, request_handler=handler, statistics=crawler_1.statistics) + crawler_1 = BasicCrawler(request_handler=handler) + crawler_2 = BasicCrawler(request_handler=handler, statistics=crawler_1.statistics) result1 = await crawler_1.run(['https://a.com']) result2 = await crawler_2.run(['https://b.com']) @@ -862,8 +865,8 @@ async def handler(context: BasicCrawlingContext) -> None: await crawler_2.run(['https://b.com']) kvs = await KeyValueStore.open() - assert await kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0') == {'urls': ['https://a.com']} - assert await kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_1') == {'urls': ['https://b.com']} + assert await kvs.get_value('CRAWLEE_STATE_0') == {'urls': ['https://a.com']} + assert await kvs.get_value('CRAWLEE_STATE_1') == {'urls': ['https://b.com']} async def test_context_handlers_use_state(key_value_store: KeyValueStore) -> None: @@ -906,7 +909,7 @@ async def handler_three(context: BasicCrawlingContext) -> None: store = await crawler.get_key_value_store() # The state in the KVS must match with the last set state - assert (await store.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0')) == {'hello': 'last_world'} + assert (await store.get_value('CRAWLEE_STATE_0')) == {'hello': 'last_world'} async def test_max_requests_per_crawl() -> None: @@ -1334,7 +1337,7 @@ async def test_context_use_state_race_condition_in_handlers(key_value_store: Key crawler = BasicCrawler() store = await crawler.get_key_value_store() - await store.set_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0', {'counter': 0}) + await store.set_value('CRAWLEE_STATE_0', {'counter': 0}) handler_barrier = Barrier(2) @crawler.router.default_handler @@ -1349,7 +1352,7 @@ async def handler(context: BasicCrawlingContext) -> None: store = await crawler.get_key_value_store() # Ensure that local state is pushed back to kvs. await store.persist_autosaved_values() - assert (await store.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0'))['counter'] == 2 + assert (await store.get_value('CRAWLEE_STATE_0'))['counter'] == 2 @pytest.mark.run_alone @@ -1859,7 +1862,7 @@ async def test_crawler_state_persistence(tmp_path: Path) -> None: ).result()[0] # Expected state after first crawler run assert first_run_state.requests_finished == 2 - state = await state_kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0') + state = await state_kvs.get_value('CRAWLEE_STATE_0') assert state.get('urls') == ['https://a.placeholder.com', 'https://b.placeholder.com'] # Do not reuse the executor to simulate a fresh process to avoid modified class attributes. @@ -1875,7 +1878,7 @@ async def test_crawler_state_persistence(tmp_path: Path) -> None: # 2 requests from first run and 1 request from second run. 
assert second_run_state.requests_finished == 3 - state = await state_kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0') + state = await state_kvs.get_value('CRAWLEE_STATE_0') assert state.get('urls') == [ 'https://a.placeholder.com', 'https://b.placeholder.com', @@ -1912,9 +1915,9 @@ async def test_crawler_state_persistence_2_crawlers_with_migration(tmp_path: Pat # Expected state after first crawler run assert first_run_states[0].requests_finished == 1 assert first_run_states[1].requests_finished == 1 - state_0 = await state_kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0') + state_0 = await state_kvs.get_value('CRAWLEE_STATE_0') assert state_0.get('urls') == ['https://a.placeholder.com'] - state_1 = await state_kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_1') + state_1 = await state_kvs.get_value('CRAWLEE_STATE_1') assert state_1.get('urls') == ['https://c.placeholder.com'] with ProcessPoolExecutor() as executor: @@ -1930,9 +1933,9 @@ async def test_crawler_state_persistence_2_crawlers_with_migration(tmp_path: Pat # Expected state after first crawler run assert second_run_states[0].requests_finished == 2 assert second_run_states[1].requests_finished == 2 - state_0 = await state_kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0') + state_0 = await state_kvs.get_value('CRAWLEE_STATE_0') assert state_0.get('urls') == ['https://a.placeholder.com', 'https://b.placeholder.com'] - state_1 = await state_kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_1') + state_1 = await state_kvs.get_value('CRAWLEE_STATE_1') assert state_1.get('urls') == ['https://c.placeholder.com', 'https://d.placeholder.com'] From 415299f217475f51003ca0541e13176418c4cf40 Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Thu, 15 Jan 2026 12:57:00 +0100 Subject: [PATCH 4/5] Revert "Draft of use_state as input argument" This reverts commit 1bbb651ff3863aff87059aabfbf6ce298d6b8d6c. --- .../_adaptive_playwright_crawler.py | 6 +-- src/crawlee/crawlers/_basic/_basic_crawler.py | 53 +++++++------------ tests/unit/conftest.py | 4 +- .../test_adaptive_playwright_crawler.py | 5 +- .../crawlers/_basic/test_basic_crawler.py | 47 ++++++++-------- 5 files changed, 49 insertions(+), 66 deletions(-) diff --git a/src/crawlee/crawlers/_adaptive_playwright/_adaptive_playwright_crawler.py b/src/crawlee/crawlers/_adaptive_playwright/_adaptive_playwright_crawler.py index fb5f11b5c7..fbbf811f11 100644 --- a/src/crawlee/crawlers/_adaptive_playwright/_adaptive_playwright_crawler.py +++ b/src/crawlee/crawlers/_adaptive_playwright/_adaptive_playwright_crawler.py @@ -43,7 +43,8 @@ from typing_extensions import Unpack -from crawlee.crawlers._basic._basic_crawler import _BasicCrawlerOptions, _DefaultUseState + from crawlee.crawlers._basic._basic_crawler import _BasicCrawlerOptions + TStaticParseResult = TypeVar('TStaticParseResult') TStaticSelectResult = TypeVar('TStaticSelectResult') @@ -388,8 +389,7 @@ async def _run_request_handler(self, context: BasicCrawlingContext) -> None: # (This static crawl is performed only to evaluate rendering type detection.) kvs = await context.get_key_value_store() default_value = dict[str, JsonSerializable]() - # This was fragile even before. 
Out of scope for draft - old_state: dict[str, JsonSerializable] = await kvs.get_value(_DefaultUseState._CRAWLEE_STATE_KEY, default_value) + old_state: dict[str, JsonSerializable] = await kvs.get_value(self._CRAWLEE_STATE_KEY, default_value) old_state_copy = deepcopy(old_state) pw_run = await self._crawl_one('client only', context=context) diff --git a/src/crawlee/crawlers/_basic/_basic_crawler.py b/src/crawlee/crawlers/_basic/_basic_crawler.py index 177da92ff4..df93168603 100644 --- a/src/crawlee/crawlers/_basic/_basic_crawler.py +++ b/src/crawlee/crawlers/_basic/_basic_crawler.py @@ -10,7 +10,7 @@ import threading import traceback from asyncio import CancelledError -from collections.abc import AsyncGenerator, Awaitable, Callable, Coroutine, Iterable, Sequence +from collections.abc import AsyncGenerator, Awaitable, Callable, Iterable, Sequence from contextlib import AsyncExitStack, suppress from datetime import timedelta from functools import partial @@ -42,7 +42,6 @@ RequestHandlerRunResult, SendRequestFunction, SkippedReason, - UseStateFunction, ) from crawlee._utils.docs import docs_group from crawlee._utils.file import atomic_write, export_csv_to_stream, export_json_to_stream @@ -240,29 +239,6 @@ class BasicCrawlerOptions( """ -class _DefaultUseState: - _next_state_id = 0 - _CRAWLEE_STATE_KEY = 'CRAWLEE_STATE' - - def __init__(self, get_key_value_store: Awaitable[KeyValueStore]) -> None: - self._get_key_value_store = get_key_value_store - self._id = self._next_state_id - _DefaultUseState._next_state_id += 1 - - async def _use_state( - self, - default_value: dict[str, JsonSerializable] | None = None, - ) -> dict[str, JsonSerializable]: - kvs = await self._get_key_value_store() - return await kvs.get_auto_saved_value(f'{self._CRAWLEE_STATE_KEY}_{self._id}', default_value) - - def __call__( - self, - default_value: dict[str, JsonSerializable] | None = None, - ) -> Coroutine[None, None, dict[str, JsonSerializable]]: - return self._use_state(default_value) - - @docs_group('Crawlers') class BasicCrawler(Generic[TCrawlingContext, TStatisticsState]): """A basic web crawler providing a framework for crawling websites. @@ -288,6 +264,7 @@ class BasicCrawler(Generic[TCrawlingContext, TStatisticsState]): - and more. """ + _CRAWLEE_STATE_KEY = 'CRAWLEE_STATE' _request_handler_timeout_text = 'Request handler timed out after' __next_id = 0 @@ -321,7 +298,7 @@ def __init__( status_message_logging_interval: timedelta = timedelta(seconds=10), status_message_callback: Callable[[StatisticsState, StatisticsState | None, str], Awaitable[str | None]] | None = None, - use_state: UseStateFunction | None = None, + crawler_id: int | None = None, _context_pipeline: ContextPipeline[TCrawlingContext] | None = None, _additional_context_managers: Sequence[AbstractAsyncContextManager] | None = None, _logger: logging.Logger | None = None, @@ -374,8 +351,8 @@ def __init__( status_message_logging_interval: Interval for logging the crawler status messages. status_message_callback: Allows overriding the default status message. The default status message is provided in the parameters. Returning `None` suppresses the status message. - use_state: Callback used to access shared state. Use only for custom state implementation, for example when - you want to share state between two different crawlers. + crawler_id: Id of the crawler used for state and statistics tracking. You can use same explicit id to share + state and statistics between two crawlers. By default, each crawler will use own state and statistics. 
_context_pipeline: Enables extending the request lifecycle and modifying the crawling context. Intended for use by subclasses rather than direct instantiation of `BasicCrawler`. _additional_context_managers: Additional context managers used throughout the crawler lifecycle. @@ -383,6 +360,13 @@ def __init__( _logger: A logger instance, typically provided by a subclass, for consistent logging labels. Intended for use by subclasses rather than direct instantiation of `BasicCrawler`. """ + if crawler_id is None: + # This could look into set of already used ids, but lets not overengineer this. + self.id = BasicCrawler.__next_id + BasicCrawler.__next_id += 1 + else: + self.id = crawler_id + implicit_event_manager_with_explicit_config = False if not configuration: configuration = service_locator.get_configuration() @@ -458,12 +442,6 @@ def __init__( self._use_session_pool = use_session_pool self._retry_on_blocked = retry_on_blocked - # Set use state - if use_state: - self._use_state = use_state - else: - self._use_state = _DefaultUseState(get_key_value_store=self.get_key_value_store) - # Logging setup if configure_logging: root_logger = logging.getLogger() @@ -859,6 +837,13 @@ async def add_requests( wait_for_all_requests_to_be_added_timeout=wait_for_all_requests_to_be_added_timeout, ) + async def _use_state( + self, + default_value: dict[str, JsonSerializable] | None = None, + ) -> dict[str, JsonSerializable]: + kvs = await self.get_key_value_store() + return await kvs.get_auto_saved_value(f'{self._CRAWLEE_STATE_KEY}_{self.id}', default_value) + async def _save_crawler_state(self) -> None: store = await self.get_key_value_store() await store.persist_autosaved_values() diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index 71e7cd0c7d..33c76d87a5 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -12,7 +12,7 @@ from uvicorn.config import Config from crawlee import service_locator -from crawlee.crawlers._basic._basic_crawler import _DefaultUseState +from crawlee.crawlers import BasicCrawler from crawlee.fingerprint_suite._browserforge_adapter import get_available_header_network from crawlee.http_clients import CurlImpersonateHttpClient, HttpxHttpClient, ImpitHttpClient from crawlee.proxy_configuration import ProxyInfo @@ -75,7 +75,7 @@ def _prepare_test_env() -> None: # Reset global class variables to ensure test isolation. 
KeyValueStore._autosaved_values = {} Statistics._Statistics__next_id = 0 # type:ignore[attr-defined] # Mangled attribute - _DefaultUseState._next_state_id = 0 # type:ignore[attr-defined] # Mangled attribute + BasicCrawler._BasicCrawler__next_id = 0 # type:ignore[attr-defined] # Mangled attribute return _prepare_test_env diff --git a/tests/unit/crawlers/_adaptive_playwright/test_adaptive_playwright_crawler.py b/tests/unit/crawlers/_adaptive_playwright/test_adaptive_playwright_crawler.py index 28fcd2811f..11fa45dba9 100644 --- a/tests/unit/crawlers/_adaptive_playwright/test_adaptive_playwright_crawler.py +++ b/tests/unit/crawlers/_adaptive_playwright/test_adaptive_playwright_crawler.py @@ -19,6 +19,7 @@ AdaptivePlaywrightCrawler, AdaptivePlaywrightCrawlingContext, AdaptivePlaywrightPreNavCrawlingContext, + BasicCrawler, RenderingType, RenderingTypePrediction, RenderingTypePredictor, @@ -380,7 +381,7 @@ async def test_adaptive_crawling_result_use_state_isolation( crawler = AdaptivePlaywrightCrawler.with_beautifulsoup_static_parser( rendering_type_predictor=static_only_predictor_enforce_detection, ) - await key_value_store.set_value('CRAWLEE_STATE_0', {'counter': 0}) + await key_value_store.set_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0', {'counter': 0}) request_handler_calls = 0 @crawler.router.default_handler @@ -397,7 +398,7 @@ async def request_handler(context: AdaptivePlaywrightCrawlingContext) -> None: # Request handler was called twice assert request_handler_calls == 2 # Increment of global state happened only once - assert (await key_value_store.get_value('CRAWLEE_STATE_0'))['counter'] == 1 + assert (await key_value_store.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0'))['counter'] == 1 async def test_adaptive_crawling_statistics(test_urls: list[str]) -> None: diff --git a/tests/unit/crawlers/_basic/test_basic_crawler.py b/tests/unit/crawlers/_basic/test_basic_crawler.py index fd992ae441..b7d55f728b 100644 --- a/tests/unit/crawlers/_basic/test_basic_crawler.py +++ b/tests/unit/crawlers/_basic/test_basic_crawler.py @@ -810,39 +810,36 @@ async def handler(context: BasicCrawlingContext) -> None: await crawler.run(['https://hello.world']) kvs = await crawler.get_key_value_store() - value = await kvs.get_value('CRAWLEE_STATE_0') + value = await kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0') assert value == {'hello': 'world'} -async def test_context_use_state_crawlers_share_custom_state() -> None: - custom_state_dict = {} - - async def custom_use_state(default_state: dict[str, JsonSerializable]) -> dict[str, JsonSerializable]: - if not custom_state_dict: - custom_state_dict.update(default_state) - return custom_state_dict - +async def test_context_use_state_crawlers_share_state() -> None: async def handler(context: BasicCrawlingContext) -> None: state = await context.use_state({'urls': []}) assert isinstance(state['urls'], list) state['urls'].append(context.request.url) - crawler_1 = BasicCrawler(use_state=custom_use_state, request_handler=handler) - crawler_2 = BasicCrawler(use_state=custom_use_state, request_handler=handler) + crawler_1 = BasicCrawler(crawler_id=0, request_handler=handler) + crawler_2 = BasicCrawler(crawler_id=0, request_handler=handler) await crawler_1.run(['https://a.com']) await crawler_2.run(['https://b.com']) - assert custom_state_dict == {'urls': ['https://a.com', 'https://b.com']} + kvs = await KeyValueStore.open() + assert crawler_1.id == crawler_2.id == 0 + assert await kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_{crawler_1.id}') == { + 'urls': 
['https://a.com', 'https://b.com'] + } async def test_crawlers_share_stats() -> None: async def handler(context: BasicCrawlingContext) -> None: await context.use_state({'urls': []}) - crawler_1 = BasicCrawler(request_handler=handler) - crawler_2 = BasicCrawler(request_handler=handler, statistics=crawler_1.statistics) + crawler_1 = BasicCrawler(crawler_id=0, request_handler=handler) + crawler_2 = BasicCrawler(crawler_id=0, request_handler=handler, statistics=crawler_1.statistics) result1 = await crawler_1.run(['https://a.com']) result2 = await crawler_2.run(['https://b.com']) @@ -865,8 +862,8 @@ async def handler(context: BasicCrawlingContext) -> None: await crawler_2.run(['https://b.com']) kvs = await KeyValueStore.open() - assert await kvs.get_value('CRAWLEE_STATE_0') == {'urls': ['https://a.com']} - assert await kvs.get_value('CRAWLEE_STATE_1') == {'urls': ['https://b.com']} + assert await kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0') == {'urls': ['https://a.com']} + assert await kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_1') == {'urls': ['https://b.com']} async def test_context_handlers_use_state(key_value_store: KeyValueStore) -> None: @@ -909,7 +906,7 @@ async def handler_three(context: BasicCrawlingContext) -> None: store = await crawler.get_key_value_store() # The state in the KVS must match with the last set state - assert (await store.get_value('CRAWLEE_STATE_0')) == {'hello': 'last_world'} + assert (await store.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0')) == {'hello': 'last_world'} async def test_max_requests_per_crawl() -> None: @@ -1337,7 +1334,7 @@ async def test_context_use_state_race_condition_in_handlers(key_value_store: Key crawler = BasicCrawler() store = await crawler.get_key_value_store() - await store.set_value('CRAWLEE_STATE_0', {'counter': 0}) + await store.set_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0', {'counter': 0}) handler_barrier = Barrier(2) @crawler.router.default_handler @@ -1352,7 +1349,7 @@ async def handler(context: BasicCrawlingContext) -> None: store = await crawler.get_key_value_store() # Ensure that local state is pushed back to kvs. await store.persist_autosaved_values() - assert (await store.get_value('CRAWLEE_STATE_0'))['counter'] == 2 + assert (await store.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0'))['counter'] == 2 @pytest.mark.run_alone @@ -1862,7 +1859,7 @@ async def test_crawler_state_persistence(tmp_path: Path) -> None: ).result()[0] # Expected state after first crawler run assert first_run_state.requests_finished == 2 - state = await state_kvs.get_value('CRAWLEE_STATE_0') + state = await state_kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0') assert state.get('urls') == ['https://a.placeholder.com', 'https://b.placeholder.com'] # Do not reuse the executor to simulate a fresh process to avoid modified class attributes. @@ -1878,7 +1875,7 @@ async def test_crawler_state_persistence(tmp_path: Path) -> None: # 2 requests from first run and 1 request from second run. 
assert second_run_state.requests_finished == 3 - state = await state_kvs.get_value('CRAWLEE_STATE_0') + state = await state_kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0') assert state.get('urls') == [ 'https://a.placeholder.com', 'https://b.placeholder.com', @@ -1915,9 +1912,9 @@ async def test_crawler_state_persistence_2_crawlers_with_migration(tmp_path: Pat # Expected state after first crawler run assert first_run_states[0].requests_finished == 1 assert first_run_states[1].requests_finished == 1 - state_0 = await state_kvs.get_value('CRAWLEE_STATE_0') + state_0 = await state_kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0') assert state_0.get('urls') == ['https://a.placeholder.com'] - state_1 = await state_kvs.get_value('CRAWLEE_STATE_1') + state_1 = await state_kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_1') assert state_1.get('urls') == ['https://c.placeholder.com'] with ProcessPoolExecutor() as executor: @@ -1933,9 +1930,9 @@ async def test_crawler_state_persistence_2_crawlers_with_migration(tmp_path: Pat # Expected state after first crawler run assert second_run_states[0].requests_finished == 2 assert second_run_states[1].requests_finished == 2 - state_0 = await state_kvs.get_value('CRAWLEE_STATE_0') + state_0 = await state_kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0') assert state_0.get('urls') == ['https://a.placeholder.com', 'https://b.placeholder.com'] - state_1 = await state_kvs.get_value('CRAWLEE_STATE_1') + state_1 = await state_kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_1') assert state_1.get('urls') == ['https://c.placeholder.com', 'https://d.placeholder.com'] From 01335f0d3b258d8b1899c18b73a06302ea3363b5 Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Thu, 15 Jan 2026 13:01:51 +0100 Subject: [PATCH 5/5] Rename `crawler_id` to just `id`. Polish. --- src/crawlee/crawlers/_basic/_basic_crawler.py | 19 +++++++++++-------- .../crawlers/_basic/test_basic_crawler.py | 14 +++++++------- 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/src/crawlee/crawlers/_basic/_basic_crawler.py b/src/crawlee/crawlers/_basic/_basic_crawler.py index df93168603..f1c8c6904a 100644 --- a/src/crawlee/crawlers/_basic/_basic_crawler.py +++ b/src/crawlee/crawlers/_basic/_basic_crawler.py @@ -213,6 +213,10 @@ class _BasicCrawlerOptions(TypedDict): """Allows overriding the default status message. The default status message is provided in the parameters. Returning `None` suppresses the status message.""" + id: NotRequired[int] + """Id of the crawler used for state tracking. You can use same explicit id to share state between two crawlers. + By default, each crawler will use own state.""" + class _BasicCrawlerOptionsGeneric(TypedDict, Generic[TCrawlingContext, TStatisticsState]): """Generic options the `BasicCrawler` constructor.""" @@ -298,7 +302,7 @@ def __init__( status_message_logging_interval: timedelta = timedelta(seconds=10), status_message_callback: Callable[[StatisticsState, StatisticsState | None, str], Awaitable[str | None]] | None = None, - crawler_id: int | None = None, + id: int | None = None, _context_pipeline: ContextPipeline[TCrawlingContext] | None = None, _additional_context_managers: Sequence[AbstractAsyncContextManager] | None = None, _logger: logging.Logger | None = None, @@ -351,8 +355,8 @@ def __init__( status_message_logging_interval: Interval for logging the crawler status messages. status_message_callback: Allows overriding the default status message. The default status message is provided in the parameters. 
Returning `None` suppresses the status message. - crawler_id: Id of the crawler used for state and statistics tracking. You can use same explicit id to share - state and statistics between two crawlers. By default, each crawler will use own state and statistics. + id: Id of the crawler used for state tracking. You can use same explicit id to share state and between two + crawlers. By default, each crawler will use own state. _context_pipeline: Enables extending the request lifecycle and modifying the crawling context. Intended for use by subclasses rather than direct instantiation of `BasicCrawler`. _additional_context_managers: Additional context managers used throughout the crawler lifecycle. @@ -360,12 +364,11 @@ def __init__( _logger: A logger instance, typically provided by a subclass, for consistent logging labels. Intended for use by subclasses rather than direct instantiation of `BasicCrawler`. """ - if crawler_id is None: - # This could look into set of already used ids, but lets not overengineer this. - self.id = BasicCrawler.__next_id + if id is None: + self._id = BasicCrawler.__next_id BasicCrawler.__next_id += 1 else: - self.id = crawler_id + self._id = id implicit_event_manager_with_explicit_config = False if not configuration: @@ -842,7 +845,7 @@ async def _use_state( default_value: dict[str, JsonSerializable] | None = None, ) -> dict[str, JsonSerializable]: kvs = await self.get_key_value_store() - return await kvs.get_auto_saved_value(f'{self._CRAWLEE_STATE_KEY}_{self.id}', default_value) + return await kvs.get_auto_saved_value(f'{self._CRAWLEE_STATE_KEY}_{self._id}', default_value) async def _save_crawler_state(self) -> None: store = await self.get_key_value_store() diff --git a/tests/unit/crawlers/_basic/test_basic_crawler.py b/tests/unit/crawlers/_basic/test_basic_crawler.py index b7d55f728b..5385d4389c 100644 --- a/tests/unit/crawlers/_basic/test_basic_crawler.py +++ b/tests/unit/crawlers/_basic/test_basic_crawler.py @@ -821,15 +821,15 @@ async def handler(context: BasicCrawlingContext) -> None: assert isinstance(state['urls'], list) state['urls'].append(context.request.url) - crawler_1 = BasicCrawler(crawler_id=0, request_handler=handler) - crawler_2 = BasicCrawler(crawler_id=0, request_handler=handler) + crawler_1 = BasicCrawler(id=0, request_handler=handler) + crawler_2 = BasicCrawler(id=0, request_handler=handler) await crawler_1.run(['https://a.com']) await crawler_2.run(['https://b.com']) kvs = await KeyValueStore.open() - assert crawler_1.id == crawler_2.id == 0 - assert await kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_{crawler_1.id}') == { + assert crawler_1._id == crawler_2._id == 0 + assert await kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_{crawler_1._id}') == { 'urls': ['https://a.com', 'https://b.com'] } @@ -838,8 +838,8 @@ async def test_crawlers_share_stats() -> None: async def handler(context: BasicCrawlingContext) -> None: await context.use_state({'urls': []}) - crawler_1 = BasicCrawler(crawler_id=0, request_handler=handler) - crawler_2 = BasicCrawler(crawler_id=0, request_handler=handler, statistics=crawler_1.statistics) + crawler_1 = BasicCrawler(id=0, request_handler=handler) + crawler_2 = BasicCrawler(id=0, request_handler=handler, statistics=crawler_1.statistics) result1 = await crawler_1.run(['https://a.com']) result2 = await crawler_2.run(['https://b.com']) @@ -1722,7 +1722,7 @@ async def test_add_requests_with_rq_param(queue_name: str | None, queue_alias: s crawler = BasicCrawler() rq = await RequestQueue.open(name=queue_name, 
alias=queue_alias) if by_id: - queue_id = rq.id + queue_id = rq._id queue_name = None else: queue_id = None
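

Reviewer note (not part of the patches): a minimal usage sketch of the API as it stands after PATCH 5/5, based on the tests added in this series. It assumes the branch state of this PR (the `id` constructor parameter and the `CRAWLEE_STATE_<id>` key are not released API); the placeholder URLs and the `main()` wrapper are illustrative only, and sharing statistics still requires passing the first crawler's `statistics` object explicitly, as `test_crawlers_share_stats` does:

import asyncio

from crawlee.crawlers import BasicCrawler, BasicCrawlingContext


async def main() -> None:
    async def handler(context: BasicCrawlingContext) -> None:
        # With the same explicit id, use_state resolves to the same auto-saved
        # KVS record ('CRAWLEE_STATE_0' here), so both crawlers see one state dict.
        state = await context.use_state({'urls': []})
        assert isinstance(state['urls'], list)
        state['urls'].append(context.request.url)

    # Same explicit id -> shared persisted state. Statistics are shared only
    # when passed explicitly, as in the tests of this PR.
    crawler_1 = BasicCrawler(id=0, request_handler=handler)
    crawler_2 = BasicCrawler(id=0, request_handler=handler, statistics=crawler_1.statistics)

    await crawler_1.run(['https://a.placeholder.com'])
    await crawler_2.run(['https://b.placeholder.com'])


if __name__ == '__main__':
    asyncio.run(main())

Omitting `id` keeps the pre-PR behavior of an isolated, auto-assigned state per crawler instance.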