diff --git a/src/crawlee/crawlers/_basic/_basic_crawler.py b/src/crawlee/crawlers/_basic/_basic_crawler.py
index be489f9c9a..f1c8c6904a 100644
--- a/src/crawlee/crawlers/_basic/_basic_crawler.py
+++ b/src/crawlee/crawlers/_basic/_basic_crawler.py
@@ -213,6 +213,10 @@ class _BasicCrawlerOptions(TypedDict):
     """Allows overriding the default status message. The default status message is provided in the parameters.
     Returning `None` suppresses the status message."""

+    id: NotRequired[int]
+    """Id of the crawler used for state tracking. You can use the same explicit id to share state between two
+    crawlers. By default, each crawler uses its own state."""
+

 class _BasicCrawlerOptionsGeneric(TypedDict, Generic[TCrawlingContext, TStatisticsState]):
     """Generic options the `BasicCrawler` constructor."""

@@ -266,6 +270,7 @@ class BasicCrawler(Generic[TCrawlingContext, TStatisticsState]):

     _CRAWLEE_STATE_KEY = 'CRAWLEE_STATE'
     _request_handler_timeout_text = 'Request handler timed out after'
+    __next_id = 0

     def __init__(
         self,
@@ -297,6 +302,7 @@ def __init__(
         status_message_logging_interval: timedelta = timedelta(seconds=10),
         status_message_callback: Callable[[StatisticsState, StatisticsState | None, str], Awaitable[str | None]]
         | None = None,
+        id: int | None = None,
         _context_pipeline: ContextPipeline[TCrawlingContext] | None = None,
         _additional_context_managers: Sequence[AbstractAsyncContextManager] | None = None,
         _logger: logging.Logger | None = None,
@@ -349,6 +355,8 @@ def __init__(
             status_message_logging_interval: Interval for logging the crawler status messages.
             status_message_callback: Allows overriding the default status message. The default status message is
                 provided in the parameters. Returning `None` suppresses the status message.
+            id: Id of the crawler used for state tracking. You can use the same explicit id to share state between
+                two crawlers. By default, each crawler uses its own state.
             _context_pipeline: Enables extending the request lifecycle and modifying the crawling context. Intended for
                 use by subclasses rather than direct instantiation of `BasicCrawler`.
             _additional_context_managers: Additional context managers used throughout the crawler lifecycle.
@@ -356,6 +364,12 @@ def __init__(
             _logger: A logger instance, typically provided by a subclass, for consistent logging labels. Intended for
                 use by subclasses rather than direct instantiation of `BasicCrawler`.
         """
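+        # Auto-assign a process-local id (in instantiation order) when none is given,
+        # so each crawler persists its state under its own key by default.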
""" + if id is None: + self._id = BasicCrawler.__next_id + BasicCrawler.__next_id += 1 + else: + self._id = id + implicit_event_manager_with_explicit_config = False if not configuration: configuration = service_locator.get_configuration() @@ -831,7 +845,7 @@ async def _use_state( default_value: dict[str, JsonSerializable] | None = None, ) -> dict[str, JsonSerializable]: kvs = await self.get_key_value_store() - return await kvs.get_auto_saved_value(self._CRAWLEE_STATE_KEY, default_value) + return await kvs.get_auto_saved_value(f'{self._CRAWLEE_STATE_KEY}_{self._id}', default_value) async def _save_crawler_state(self) -> None: store = await self.get_key_value_store() diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index ed8c4a720d..33c76d87a5 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -12,6 +12,7 @@ from uvicorn.config import Config from crawlee import service_locator +from crawlee.crawlers import BasicCrawler from crawlee.fingerprint_suite._browserforge_adapter import get_available_header_network from crawlee.http_clients import CurlImpersonateHttpClient, HttpxHttpClient, ImpitHttpClient from crawlee.proxy_configuration import ProxyInfo @@ -74,6 +75,7 @@ def _prepare_test_env() -> None: # Reset global class variables to ensure test isolation. KeyValueStore._autosaved_values = {} Statistics._Statistics__next_id = 0 # type:ignore[attr-defined] # Mangled attribute + BasicCrawler._BasicCrawler__next_id = 0 # type:ignore[attr-defined] # Mangled attribute return _prepare_test_env diff --git a/tests/unit/crawlers/_adaptive_playwright/test_adaptive_playwright_crawler.py b/tests/unit/crawlers/_adaptive_playwright/test_adaptive_playwright_crawler.py index 4aedeff2eb..11fa45dba9 100644 --- a/tests/unit/crawlers/_adaptive_playwright/test_adaptive_playwright_crawler.py +++ b/tests/unit/crawlers/_adaptive_playwright/test_adaptive_playwright_crawler.py @@ -381,7 +381,7 @@ async def test_adaptive_crawling_result_use_state_isolation( crawler = AdaptivePlaywrightCrawler.with_beautifulsoup_static_parser( rendering_type_predictor=static_only_predictor_enforce_detection, ) - await key_value_store.set_value(BasicCrawler._CRAWLEE_STATE_KEY, {'counter': 0}) + await key_value_store.set_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0', {'counter': 0}) request_handler_calls = 0 @crawler.router.default_handler @@ -398,7 +398,7 @@ async def request_handler(context: AdaptivePlaywrightCrawlingContext) -> None: # Request handler was called twice assert request_handler_calls == 2 # Increment of global state happened only once - assert (await key_value_store.get_value(BasicCrawler._CRAWLEE_STATE_KEY))['counter'] == 1 + assert (await key_value_store.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0'))['counter'] == 1 async def test_adaptive_crawling_statistics(test_urls: list[str]) -> None: diff --git a/tests/unit/crawlers/_basic/test_basic_crawler.py b/tests/unit/crawlers/_basic/test_basic_crawler.py index 5800e59d4e..5385d4389c 100644 --- a/tests/unit/crawlers/_basic/test_basic_crawler.py +++ b/tests/unit/crawlers/_basic/test_basic_crawler.py @@ -810,11 +810,62 @@ async def handler(context: BasicCrawlingContext) -> None: await crawler.run(['https://hello.world']) kvs = await crawler.get_key_value_store() - value = await kvs.get_value(BasicCrawler._CRAWLEE_STATE_KEY) + value = await kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0') assert value == {'hello': 'world'} +async def test_context_use_state_crawlers_share_state() -> None: + async def handler(context: BasicCrawlingContext) -> 
+async def test_context_use_state_crawlers_share_state() -> None:
+    async def handler(context: BasicCrawlingContext) -> None:
+        state = await context.use_state({'urls': []})
+        assert isinstance(state['urls'], list)
+        state['urls'].append(context.request.url)
+
+    crawler_1 = BasicCrawler(id=0, request_handler=handler)
+    crawler_2 = BasicCrawler(id=0, request_handler=handler)
+
+    await crawler_1.run(['https://a.com'])
+    await crawler_2.run(['https://b.com'])
+
+    kvs = await KeyValueStore.open()
+    assert crawler_1._id == crawler_2._id == 0
+    assert await kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_{crawler_1._id}') == {
+        'urls': ['https://a.com', 'https://b.com']
+    }
+
+
+async def test_crawlers_share_stats() -> None:
+    async def handler(context: BasicCrawlingContext) -> None:
+        await context.use_state({'urls': []})
+
+    crawler_1 = BasicCrawler(id=0, request_handler=handler)
+    crawler_2 = BasicCrawler(id=0, request_handler=handler, statistics=crawler_1.statistics)
+
+    result1 = await crawler_1.run(['https://a.com'])
+    result2 = await crawler_2.run(['https://b.com'])
+
+    assert crawler_1.statistics == crawler_2.statistics
+    assert result1.requests_finished == 1
+    assert result2.requests_finished == 2
+
+
+async def test_context_use_state_crawlers_own_state() -> None:
+    async def handler(context: BasicCrawlingContext) -> None:
+        state = await context.use_state({'urls': []})
+        assert isinstance(state['urls'], list)
+        state['urls'].append(context.request.url)
+
+    crawler_1 = BasicCrawler(request_handler=handler)
+    crawler_2 = BasicCrawler(request_handler=handler)
+
+    await crawler_1.run(['https://a.com'])
+    await crawler_2.run(['https://b.com'])
+
+    kvs = await KeyValueStore.open()
+    assert await kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0') == {'urls': ['https://a.com']}
+    assert await kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_1') == {'urls': ['https://b.com']}
+
+
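+# The `_0`/`_1` key suffixes asserted around here rely on the conftest reset of
+# BasicCrawler.__next_id: auto-assigned ids start from 0 in every test.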
 async def test_context_handlers_use_state(key_value_store: KeyValueStore) -> None:
     state_in_handler_one: dict[str, JsonSerializable] = {}
     state_in_handler_two: dict[str, JsonSerializable] = {}
@@ -855,7 +906,7 @@ async def handler_three(context: BasicCrawlingContext) -> None:
     store = await crawler.get_key_value_store()

     # The state in the KVS must match with the last set state
-    assert (await store.get_value(BasicCrawler._CRAWLEE_STATE_KEY)) == {'hello': 'last_world'}
+    assert (await store.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0')) == {'hello': 'last_world'}


 async def test_max_requests_per_crawl() -> None:
@@ -1283,7 +1334,7 @@ async def test_context_use_state_race_condition_in_handlers(key_value_store: Key
     crawler = BasicCrawler()
     store = await crawler.get_key_value_store()
-    await store.set_value(BasicCrawler._CRAWLEE_STATE_KEY, {'counter': 0})
+    await store.set_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0', {'counter': 0})
     handler_barrier = Barrier(2)

     @crawler.router.default_handler
@@ -1298,7 +1349,7 @@ async def handler(context: BasicCrawlingContext) -> None:
     store = await crawler.get_key_value_store()
     # Ensure that local state is pushed back to kvs.
     await store.persist_autosaved_values()
-    assert (await store.get_value(BasicCrawler._CRAWLEE_STATE_KEY))['counter'] == 2
+    assert (await store.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0'))['counter'] == 2


 @pytest.mark.run_alone
@@ -1671,7 +1722,7 @@ async def test_add_requests_with_rq_param(queue_name: str | None, queue_alias: s
     crawler = BasicCrawler()
     rq = await RequestQueue.open(name=queue_name, alias=queue_alias)
     if by_id:
-        queue_id = rq.id
+        queue_id = rq._id
         queue_name = None
     else:
         queue_id = None
@@ -1750,55 +1801,87 @@ async def handler(context: BasicCrawlingContext) -> None:
     assert await unrelated_rq.fetch_next_request() == unrelated_request


-async def _run_crawler(requests: list[str], storage_dir: str) -> StatisticsState:
+async def _run_crawler(crawler_id: int | None, requests: list[str], storage_dir: str) -> StatisticsState:
     """Run crawler and return its statistics state.

     Must be defined like this to be pickable for ProcessPoolExecutor."""
-    service_locator.set_configuration(
-        Configuration(
-            storage_dir=storage_dir,
-            purge_on_start=False,
-        )
-    )

     async def request_handler(context: BasicCrawlingContext) -> None:
         context.log.info(f'Processing {context.request.url} ...')
+        # Add the visited url to the crawler state to verify state persistence.
+        state = await context.use_state({'urls': []})
+        assert isinstance(state['urls'], list)
+        state['urls'].append(context.request.url)
+        context.log.info(f'State {state}')

     crawler = BasicCrawler(
         request_handler=request_handler,
+        id=crawler_id,
         concurrency_settings=ConcurrencySettings(max_concurrency=1, desired_concurrency=1),
+        configuration=Configuration(
+            storage_dir=storage_dir,
+            purge_on_start=False,
+        ),
     )

     await crawler.run(requests)
     return crawler.statistics.state


-def _process_run_crawler(requests: list[str], storage_dir: str) -> StatisticsState:
-    return asyncio.run(_run_crawler(requests=requests, storage_dir=storage_dir))
+@dataclass
+class _CrawlerInput:
+    requests: list[str]
+    id: None | int = None
+
+
+def _process_run_crawlers(crawler_inputs: list[_CrawlerInput], storage_dir: str) -> list[StatisticsState]:
+    return [
+        asyncio.run(
+            _run_crawler(crawler_id=crawler_input.id, requests=crawler_input.requests, storage_dir=storage_dir)
+        )
+        for crawler_input in crawler_inputs
+    ]

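+# Each executor.submit below runs the crawlers in a fresh process, so auto-assigned
+# ids restart at 0 and map onto the CRAWLEE_STATE_<n> keys asserted by the tests.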
-async def test_crawler_statistics_persistence(tmp_path: Path) -> None:
-    """Test that crawler statistics persist and are loaded correctly.
+async def test_crawler_state_persistence(tmp_path: Path) -> None:
+    """Test that crawler statistics and state persist and are loaded correctly.

     This test simulates starting the crawler process twice, and checks that the statistics include first run."""
+    state_kvs = await KeyValueStore.open(
+        storage_client=FileSystemStorageClient(), configuration=Configuration(storage_dir=str(tmp_path))
+    )
+
     with ProcessPoolExecutor() as executor:
         # Crawl 2 requests in the first run and automatically persist the state.
         first_run_state = executor.submit(
-            _process_run_crawler,
-            requests=['https://a.placeholder.com', 'https://b.placeholder.com'],
+            _process_run_crawlers,
+            crawler_inputs=[_CrawlerInput(requests=['https://a.placeholder.com', 'https://b.placeholder.com'])],
             storage_dir=str(tmp_path),
-        ).result()
+        ).result()[0]

+    # Expected state after the first crawler run.
     assert first_run_state.requests_finished == 2
+    state = await state_kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0')
+    assert state.get('urls') == ['https://a.placeholder.com', 'https://b.placeholder.com']

     # Do not reuse the executor to simulate a fresh process to avoid modified class attributes.
     with ProcessPoolExecutor() as executor:
         # Crawl 1 additional requests in the second run, but use previously automatically persisted state.
         second_run_state = executor.submit(
-            _process_run_crawler, requests=['https://c.placeholder.com'], storage_dir=str(tmp_path)
-        ).result()
+            _process_run_crawlers,
+            crawler_inputs=[_CrawlerInput(requests=['https://c.placeholder.com'])],
+            storage_dir=str(tmp_path),
+        ).result()[0]
+
+    # Expected state after the second crawler run:
+    # 2 requests from the first run and 1 request from the second run.
     assert second_run_state.requests_finished == 3
+    state = await state_kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0')
+    assert state.get('urls') == [
+        'https://a.placeholder.com',
+        'https://b.placeholder.com',
+        'https://c.placeholder.com',
+    ]
+
     assert first_run_state.crawler_started_at == second_run_state.crawler_started_at
     assert first_run_state.crawler_finished_at
     assert second_run_state.crawler_finished_at
@@ -1807,6 +1890,52 @@ async def test_crawler_statistics_persistence(tmp_path: Path) -> None:
     assert first_run_state.crawler_runtime < second_run_state.crawler_runtime


+async def test_crawler_state_persistence_2_crawlers_with_migration(tmp_path: Path) -> None:
+    """Test that crawler statistics and state persist and are loaded correctly.
+
+    This test simulates starting the crawler process twice and checks that the statistics include the first run.
+    Each time, two distinct crawlers run, and each should keep using its own statistics and state."""
+    state_kvs = await KeyValueStore.open(
+        storage_client=FileSystemStorageClient(), configuration=Configuration(storage_dir=str(tmp_path))
+    )
+
+    with ProcessPoolExecutor() as executor:
+        # Run 2 crawlers, each crawling 1 request, and automatically persist the state.
+        first_run_states = executor.submit(
+            _process_run_crawlers,
+            crawler_inputs=[
+                _CrawlerInput(requests=['https://a.placeholder.com']),
+                _CrawlerInput(requests=['https://c.placeholder.com']),
+            ],
+            storage_dir=str(tmp_path),
+        ).result()
+    # Expected state after the first crawler run.
+    assert first_run_states[0].requests_finished == 1
+    assert first_run_states[1].requests_finished == 1
+    state_0 = await state_kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0')
+    assert state_0.get('urls') == ['https://a.placeholder.com']
+    state_1 = await state_kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_1')
+    assert state_1.get('urls') == ['https://c.placeholder.com']
+
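+    # Second run in a fresh process: each crawler loads its own persisted state and
+    # statistics and continues where it left off.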
+    with ProcessPoolExecutor() as executor:
+        # Run 2 crawlers again, each crawling 1 more request, and persist the state.
+        second_run_states = executor.submit(
+            _process_run_crawlers,
+            crawler_inputs=[
+                _CrawlerInput(requests=['https://b.placeholder.com']),
+                _CrawlerInput(requests=['https://d.placeholder.com']),
+            ],
+            storage_dir=str(tmp_path),
+        ).result()
+    # Expected state after the second crawler run: counts and urls accumulate per crawler.
+    assert second_run_states[0].requests_finished == 2
+    assert second_run_states[1].requests_finished == 2
+    state_0 = await state_kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0')
+    assert state_0.get('urls') == ['https://a.placeholder.com', 'https://b.placeholder.com']
+    state_1 = await state_kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_1')
+    assert state_1.get('urls') == ['https://c.placeholder.com', 'https://d.placeholder.com']
+
+
 async def test_crawler_intermediate_statistics() -> None:
     """Test that crawler statistics are correctly updating total runtime on every calculate call."""
     crawler = BasicCrawler()
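
Usage sketch (editor's illustration, not part of the patch): with the `id` parameter
added above, crawlers constructed with the same explicit id share one persisted state
record, while crawlers without an explicit id get auto-assigned ids 0, 1, ... and thus
separate records. `handler` stands in for any request handler.

    from crawlee.crawlers import BasicCrawler

    shared_a = BasicCrawler(id=7, request_handler=handler)  # state key 'CRAWLEE_STATE_7'
    shared_b = BasicCrawler(id=7, request_handler=handler)  # same key, shared state

    own_1 = BasicCrawler(request_handler=handler)  # auto id 0 -> 'CRAWLEE_STATE_0'
    own_2 = BasicCrawler(request_handler=handler)  # auto id 1 -> 'CRAWLEE_STATE_1'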