16 changes: 15 additions & 1 deletion src/crawlee/crawlers/_basic/_basic_crawler.py
@@ -213,6 +213,10 @@ class _BasicCrawlerOptions(TypedDict):
"""Allows overriding the default status message. The default status message is provided in the parameters.
Returning `None` suppresses the status message."""

id: NotRequired[int]
"""Id of the crawler used for state tracking. You can use same explicit id to share state between two crawlers.
By default, each crawler will use own state."""


class _BasicCrawlerOptionsGeneric(TypedDict, Generic[TCrawlingContext, TStatisticsState]):
"""Generic options the `BasicCrawler` constructor."""
@@ -266,6 +270,7 @@ class BasicCrawler(Generic[TCrawlingContext, TStatisticsState]):

_CRAWLEE_STATE_KEY = 'CRAWLEE_STATE'
_request_handler_timeout_text = 'Request handler timed out after'
__next_id = 0

def __init__(
self,
@@ -297,6 +302,7 @@ def __init__(
status_message_logging_interval: timedelta = timedelta(seconds=10),
status_message_callback: Callable[[StatisticsState, StatisticsState | None, str], Awaitable[str | None]]
| None = None,
id: int | None = None,
_context_pipeline: ContextPipeline[TCrawlingContext] | None = None,
_additional_context_managers: Sequence[AbstractAsyncContextManager] | None = None,
_logger: logging.Logger | None = None,
@@ -349,13 +355,21 @@ def __init__(
status_message_logging_interval: Interval for logging the crawler status messages.
status_message_callback: Allows overriding the default status message. The default status message is
provided in the parameters. Returning `None` suppresses the status message.
id: ID of the crawler used for state tracking. You can use the same explicit ID to share state between
two crawlers. By default, each crawler uses its own state.
_context_pipeline: Enables extending the request lifecycle and modifying the crawling context.
Intended for use by subclasses rather than direct instantiation of `BasicCrawler`.
_additional_context_managers: Additional context managers used throughout the crawler lifecycle.
Intended for use by subclasses rather than direct instantiation of `BasicCrawler`.
_logger: A logger instance, typically provided by a subclass, for consistent logging labels.
Intended for use by subclasses rather than direct instantiation of `BasicCrawler`.
"""
if id is None:
self._id = BasicCrawler.__next_id
BasicCrawler.__next_id += 1
else:
self._id = id

implicit_event_manager_with_explicit_config = False
if not configuration:
configuration = service_locator.get_configuration()
@@ -831,7 +845,7 @@ async def _use_state(
default_value: dict[str, JsonSerializable] | None = None,
) -> dict[str, JsonSerializable]:
kvs = await self.get_key_value_store()
return await kvs.get_auto_saved_value(self._CRAWLEE_STATE_KEY, default_value)
return await kvs.get_auto_saved_value(f'{self._CRAWLEE_STATE_KEY}_{self._id}', default_value)

async def _save_crawler_state(self) -> None:
store = await self.get_key_value_store()
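With this change, the auto-saved `use_state` value is stored under the key `CRAWLEE_STATE_<id>`, so two crawlers constructed with the same explicit `id` share one state dictionary, while crawlers created without an `id` receive consecutive auto-assigned ids and keep separate state. A minimal usage sketch modeled on the tests added below; the placeholder URLs and the `main` wrapper are illustrative only:

import asyncio

from crawlee.crawlers import BasicCrawler, BasicCrawlingContext


async def main() -> None:
    async def handler(context: BasicCrawlingContext) -> None:
        # The state dict is auto-persisted to the key-value store under 'CRAWLEE_STATE_0'.
        state = await context.use_state({'urls': []})
        assert isinstance(state['urls'], list)
        state['urls'].append(context.request.url)

    # Same explicit id, therefore shared state; omitting id would give each crawler its own state.
    crawler_1 = BasicCrawler(id=0, request_handler=handler)
    crawler_2 = BasicCrawler(id=0, request_handler=handler)

    await crawler_1.run(['https://a.placeholder.com'])
    await crawler_2.run(['https://b.placeholder.com'])


asyncio.run(main())

After both runs, the key-value store entry `CRAWLEE_STATE_0` holds both URLs, which is what `test_context_use_state_crawlers_share_state` below asserts.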
2 changes: 2 additions & 0 deletions tests/unit/conftest.py
@@ -12,6 +12,7 @@
from uvicorn.config import Config

from crawlee import service_locator
from crawlee.crawlers import BasicCrawler
from crawlee.fingerprint_suite._browserforge_adapter import get_available_header_network
from crawlee.http_clients import CurlImpersonateHttpClient, HttpxHttpClient, ImpitHttpClient
from crawlee.proxy_configuration import ProxyInfo
@@ -74,6 +75,7 @@ def _prepare_test_env() -> None:
# Reset global class variables to ensure test isolation.
KeyValueStore._autosaved_values = {}
Statistics._Statistics__next_id = 0 # type:ignore[attr-defined] # Mangled attribute
BasicCrawler._BasicCrawler__next_id = 0 # type:ignore[attr-defined] # Mangled attribute

return _prepare_test_env

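A note on the conftest change: because `__next_id` has a double-underscore prefix, Python name-mangles it to `_BasicCrawler__next_id`, so the fixture has to reset it under that spelling, mirroring the existing `Statistics` reset one line above. A toy illustration of the mechanism; the `Counter` class is hypothetical and not part of Crawlee:

class Counter:
    __next_id = 0  # class-private, name-mangled to _Counter__next_id

    def __init__(self) -> None:
        self.id = Counter.__next_id
        Counter.__next_id += 1


a, b = Counter(), Counter()
assert (a.id, b.id) == (0, 1)

# Outside the class body the mangled spelling is required, exactly as the fixture
# does with BasicCrawler._BasicCrawler__next_id to keep test ids starting at 0.
Counter._Counter__next_id = 0
assert Counter().id == 0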
@@ -381,7 +381,7 @@ async def test_adaptive_crawling_result_use_state_isolation(
crawler = AdaptivePlaywrightCrawler.with_beautifulsoup_static_parser(
rendering_type_predictor=static_only_predictor_enforce_detection,
)
await key_value_store.set_value(BasicCrawler._CRAWLEE_STATE_KEY, {'counter': 0})
await key_value_store.set_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0', {'counter': 0})
request_handler_calls = 0

@crawler.router.default_handler
@@ -398,7 +398,7 @@ async def request_handler(context: AdaptivePlaywrightCrawlingContext) -> None:
# Request handler was called twice
assert request_handler_calls == 2
# Increment of global state happened only once
assert (await key_value_store.get_value(BasicCrawler._CRAWLEE_STATE_KEY))['counter'] == 1
assert (await key_value_store.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0'))['counter'] == 1


async def test_adaptive_crawling_statistics(test_urls: list[str]) -> None:
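The assertions above now read the persisted state under the suffixed key rather than the bare `CRAWLEE_STATE` key. A short sketch of inspecting that state directly from the default key-value store, assuming a crawler with id 0; it reaches into the private `_CRAWLEE_STATE_KEY` constant the same way these tests do:

from crawlee.crawlers import BasicCrawler
from crawlee.storages import KeyValueStore


async def read_state() -> None:
    kvs = await KeyValueStore.open()
    # Resolves to 'CRAWLEE_STATE_0' with the default class constant.
    state = await kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0')
    print(state)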
171 changes: 150 additions & 21 deletions tests/unit/crawlers/_basic/test_basic_crawler.py
@@ -810,11 +810,62 @@ async def handler(context: BasicCrawlingContext) -> None:
await crawler.run(['https://hello.world'])

kvs = await crawler.get_key_value_store()
value = await kvs.get_value(BasicCrawler._CRAWLEE_STATE_KEY)
value = await kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0')

assert value == {'hello': 'world'}


async def test_context_use_state_crawlers_share_state() -> None:
async def handler(context: BasicCrawlingContext) -> None:
state = await context.use_state({'urls': []})
assert isinstance(state['urls'], list)
state['urls'].append(context.request.url)

crawler_1 = BasicCrawler(id=0, request_handler=handler)
crawler_2 = BasicCrawler(id=0, request_handler=handler)

await crawler_1.run(['https://a.com'])
await crawler_2.run(['https://b.com'])

kvs = await KeyValueStore.open()
assert crawler_1._id == crawler_2._id == 0
assert await kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_{crawler_1._id}') == {
'urls': ['https://a.com', 'https://b.com']
}


async def test_crawlers_share_stats() -> None:
async def handler(context: BasicCrawlingContext) -> None:
await context.use_state({'urls': []})

crawler_1 = BasicCrawler(id=0, request_handler=handler)
crawler_2 = BasicCrawler(id=0, request_handler=handler, statistics=crawler_1.statistics)

result1 = await crawler_1.run(['https://a.com'])
result2 = await crawler_2.run(['https://b.com'])

assert crawler_1.statistics == crawler_2.statistics
assert result1.requests_finished == 1
assert result2.requests_finished == 2


async def test_context_use_state_crawlers_own_state() -> None:
async def handler(context: BasicCrawlingContext) -> None:
state = await context.use_state({'urls': []})
assert isinstance(state['urls'], list)
state['urls'].append(context.request.url)

crawler_1 = BasicCrawler(request_handler=handler)
crawler_2 = BasicCrawler(request_handler=handler)

await crawler_1.run(['https://a.com'])
await crawler_2.run(['https://b.com'])

kvs = await KeyValueStore.open()
assert await kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0') == {'urls': ['https://a.com']}
assert await kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_1') == {'urls': ['https://b.com']}


async def test_context_handlers_use_state(key_value_store: KeyValueStore) -> None:
state_in_handler_one: dict[str, JsonSerializable] = {}
state_in_handler_two: dict[str, JsonSerializable] = {}
@@ -855,7 +906,7 @@ async def handler_three(context: BasicCrawlingContext) -> None:
store = await crawler.get_key_value_store()

# The state in the KVS must match with the last set state
assert (await store.get_value(BasicCrawler._CRAWLEE_STATE_KEY)) == {'hello': 'last_world'}
assert (await store.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0')) == {'hello': 'last_world'}


async def test_max_requests_per_crawl() -> None:
@@ -1283,7 +1334,7 @@ async def test_context_use_state_race_condition_in_handlers(key_value_store: Key

crawler = BasicCrawler()
store = await crawler.get_key_value_store()
await store.set_value(BasicCrawler._CRAWLEE_STATE_KEY, {'counter': 0})
await store.set_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0', {'counter': 0})
handler_barrier = Barrier(2)

@crawler.router.default_handler
@@ -1298,7 +1349,7 @@ async def handler(context: BasicCrawlingContext) -> None:
store = await crawler.get_key_value_store()
# Ensure that local state is pushed back to kvs.
await store.persist_autosaved_values()
assert (await store.get_value(BasicCrawler._CRAWLEE_STATE_KEY))['counter'] == 2
assert (await store.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0'))['counter'] == 2


@pytest.mark.run_alone
@@ -1671,7 +1722,7 @@ async def test_add_requests_with_rq_param(queue_name: str | None, queue_alias: s
crawler = BasicCrawler()
rq = await RequestQueue.open(name=queue_name, alias=queue_alias)
if by_id:
queue_id = rq.id
queue_id = rq._id
queue_name = None
else:
queue_id = None
@@ -1750,55 +1801,87 @@ async def handler(context: BasicCrawlingContext) -> None:
assert await unrelated_rq.fetch_next_request() == unrelated_request


async def _run_crawler(requests: list[str], storage_dir: str) -> StatisticsState:
async def _run_crawler(crawler_id: int | None, requests: list[str], storage_dir: str) -> StatisticsState:
"""Run crawler and return its statistics state.

Must be defined like this to be picklable for ProcessPoolExecutor."""
service_locator.set_configuration(
Configuration(
storage_dir=storage_dir,
purge_on_start=False,
)
)

async def request_handler(context: BasicCrawlingContext) -> None:
context.log.info(f'Processing {context.request.url} ...')
# Add visited url to crawler state and use it to verify state persistence.
state = await context.use_state({'urls': []})
state['urls'] = state.get('urls')
assert isinstance(state['urls'], list)
state['urls'].append(context.request.url)
context.log.info(f'State {state}')

crawler = BasicCrawler(
id=crawler_id,
request_handler=request_handler,
concurrency_settings=ConcurrencySettings(max_concurrency=1, desired_concurrency=1),
configuration=Configuration(
storage_dir=storage_dir,
purge_on_start=False,
),
)

await crawler.run(requests)
return crawler.statistics.state


def _process_run_crawler(requests: list[str], storage_dir: str) -> StatisticsState:
return asyncio.run(_run_crawler(requests=requests, storage_dir=storage_dir))
@dataclass
class _CrawlerInput:
requests: list[str]
id: None | int = None


def _process_run_crawlers(crawler_inputs: list[_CrawlerInput], storage_dir: str) -> list[StatisticsState]:
return [
asyncio.run(_run_crawler(crawler_id=crawler_input.id, requests=crawler_input.requests, storage_dir=storage_dir))
for crawler_input in crawler_inputs
]

async def test_crawler_statistics_persistence(tmp_path: Path) -> None:
"""Test that crawler statistics persist and are loaded correctly.

async def test_crawler_state_persistence(tmp_path: Path) -> None:
"""Test that crawler statistics and state persist and are loaded correctly.

This test simulates starting the crawler process twice, and checks that the statistics include the first run."""

state_kvs = await KeyValueStore.open(
storage_client=FileSystemStorageClient(), configuration=Configuration(storage_dir=str(tmp_path))
)

with ProcessPoolExecutor() as executor:
# Crawl 2 requests in the first run and automatically persist the state.
first_run_state = executor.submit(
_process_run_crawler,
requests=['https://a.placeholder.com', 'https://b.placeholder.com'],
_process_run_crawlers,
crawler_inputs=[_CrawlerInput(requests=['https://a.placeholder.com', 'https://b.placeholder.com'])],
storage_dir=str(tmp_path),
).result()
).result()[0]
# Expected state after first crawler run
assert first_run_state.requests_finished == 2
state = await state_kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0')
assert state.get('urls') == ['https://a.placeholder.com', 'https://b.placeholder.com']

# Do not reuse the executor, to simulate a fresh process and avoid carrying over modified class attributes.
with ProcessPoolExecutor() as executor:
# Crawl 1 additional request in the second run, but reuse the previously auto-persisted state.
second_run_state = executor.submit(
_process_run_crawler, requests=['https://c.placeholder.com'], storage_dir=str(tmp_path)
).result()
_process_run_crawlers,
crawler_inputs=[_CrawlerInput(requests=['https://c.placeholder.com'])],
storage_dir=str(tmp_path),
).result()[0]

# Expected state after second crawler run
# 2 requests from first run and 1 request from second run.
assert second_run_state.requests_finished == 3

state = await state_kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0')
assert state.get('urls') == [
'https://a.placeholder.com',
'https://b.placeholder.com',
'https://c.placeholder.com',
]

assert first_run_state.crawler_started_at == second_run_state.crawler_started_at
assert first_run_state.crawler_finished_at
assert second_run_state.crawler_finished_at
@@ -1807,6 +1890,52 @@ async def test_crawler_statistics_persistence(tmp_path: Path) -> None:
assert first_run_state.crawler_runtime < second_run_state.crawler_runtime


async def test_crawler_state_persistence_2_crawlers_with_migration(tmp_path: Path) -> None:
"""Test that crawler statistics and state persist and are loaded correctly.

This test simulates starting the crawler process twice, and checks that the statistics include the first run.
Each time, two distinct crawlers are run, and each should keep using its own statistics and state."""
state_kvs = await KeyValueStore.open(
storage_client=FileSystemStorageClient(), configuration=Configuration(storage_dir=str(tmp_path))
)

with ProcessPoolExecutor() as executor:
# Run 2 crawlers, each crawling 1 request, and automatically persist the state.
first_run_states = executor.submit(
_process_run_crawlers,
crawler_inputs=[
_CrawlerInput(requests=['https://a.placeholder.com']),
_CrawlerInput(requests=['https://c.placeholder.com']),
],
storage_dir=str(tmp_path),
).result()
# Expected state after first crawler run
assert first_run_states[0].requests_finished == 1
assert first_run_states[1].requests_finished == 1
state_0 = await state_kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0')
assert state_0.get('urls') == ['https://a.placeholder.com']
state_1 = await state_kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_1')
assert state_1.get('urls') == ['https://c.placeholder.com']

with ProcessPoolExecutor() as executor:
# Run 2 crawlers, each crawling 1 request, and automatically persist the state.
second_run_states = executor.submit(
_process_run_crawlers,
crawler_inputs=[
_CrawlerInput(requests=['https://b.placeholder.com']),
_CrawlerInput(requests=['https://d.placeholder.com']),
],
storage_dir=str(tmp_path),
).result()
# Expected state after second crawler run
assert second_run_states[0].requests_finished == 2
assert second_run_states[1].requests_finished == 2
state_0 = await state_kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0')
assert state_0.get('urls') == ['https://a.placeholder.com', 'https://b.placeholder.com']
state_1 = await state_kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_1')
assert state_1.get('urls') == ['https://c.placeholder.com', 'https://d.placeholder.com']


async def test_crawler_intermediate_statistics() -> None:
"""Test that crawler statistics are correctly updating total runtime on every calculate call."""
crawler = BasicCrawler()