feat(admin-api): add sync progress endpoint #1528
base: main
Conversation
Force-pushed from 7723067 to f21b0eb
LNSD
left a comment
In Slack, I suggested using a PUSH model (instead of a POLL model) to expose the table sync progress updates to all clients (e.g., via a Kafka broker).
This is a secondary priority, but there is value for the engine management use case.
Please review my comments.
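The PUSH model suggested above can be illustrated with an in-process sketch, where an `std::sync::mpsc` channel stands in for a broker such as Kafka: a writer publishes progress events as it syncs, and a consumer reacts to them instead of polling the admin API. The `SyncProgressEvent` type and its fields are hypothetical, not part of the codebase:

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical progress event; field names are illustrative only.
#[derive(Debug, Clone, PartialEq)]
pub struct SyncProgressEvent {
    pub table_name: String,
    pub current_block: u64,
}

/// Spawn a "producer" (standing in for a table writer) that pushes
/// three progress events, then collect them on the consumer side.
pub fn run_push_demo() -> Vec<SyncProgressEvent> {
    let (tx, rx) = mpsc::channel::<SyncProgressEvent>();

    let producer = thread::spawn(move || {
        for block in [100u64, 200, 300] {
            tx.send(SyncProgressEvent {
                table_name: "blocks".to_string(),
                current_block: block,
            })
            .unwrap();
        }
        // Dropping `tx` here closes the channel, ending the
        // consumer-side iterator below.
    });

    // The "consumer" sees every update as it arrives, with no polling.
    let received: Vec<SyncProgressEvent> = rx.iter().collect();
    producer.join().unwrap();
    received
}

fn main() {
    let events = run_push_demo();
    println!(
        "received {} events; latest block {}",
        events.len(),
        events.last().unwrap().current_block
    );
}
```

With a real broker, the channel would be replaced by a topic and the consumer could live in a different process, but the shape of the interaction is the same.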
crates/services/admin-api/src/lib.rs
Outdated
```rust
        get(datasets::list_jobs::handler),
    )
    .route(
        "/datasets/{namespace}/{name}/versions/{revision}/sync-progress",
```
I would go with a shorter word.
| "/datasets/{namespace}/{name}/versions/{revision}/sync-progress", | |
| "/datasets/{namespace}/{name}/versions/{revision}/progress", |
I like this idea and will implement it; it is much cleaner.
Do you think I should rename all references from `sync-progress` to `progress`, or just the API route?
crates/services/admin-api/src/handlers/datasets/sync_progress.rs
Outdated
```rust
    })?;

    // Query active tables info from metadata database (job_id, status)
    let writer_infos = metadata_db::sync_progress::get_active_tables_with_writer_info(
```
We should not query this information directly from the metadata_db. Actually, no direct metadata_db interactions should happen in the admin API handlers. This should be part of the amp-data-store component.
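A minimal sketch of the layering suggested above might look like the following, where the handler depends only on a storage-agnostic trait and the data-store implementation is the only place that talks to metadata_db. All names here (`SyncProgressStore`, `WriterInfo`, `StubStore`, `running_tables`) are hypothetical, not the actual amp-data-store API:

```rust
/// Hypothetical per-table writer info; fields mirror what the handler
/// currently reads from metadata_db (job_id, status).
#[derive(Debug, Clone, PartialEq)]
pub struct WriterInfo {
    pub table_name: String,
    pub job_id: i64,
    pub job_status: String,
}

/// What the admin API handler is allowed to see: a storage-agnostic
/// interface that amp-data-store would implement.
pub trait SyncProgressStore {
    fn active_tables_with_writer_info(&self, dataset: &str) -> Vec<WriterInfo>;
}

/// In the real code this impl would wrap the metadata_db queries;
/// here it is a canned stub so the sketch is self-contained.
pub struct StubStore;

impl SyncProgressStore for StubStore {
    fn active_tables_with_writer_info(&self, _dataset: &str) -> Vec<WriterInfo> {
        vec![WriterInfo {
            table_name: "blocks".to_string(),
            job_id: 1,
            job_status: "RUNNING".to_string(),
        }]
    }
}

/// Handler-side code now takes `&dyn SyncProgressStore` and never
/// touches metadata_db directly.
pub fn running_tables(store: &dyn SyncProgressStore, dataset: &str) -> Vec<String> {
    store
        .active_tables_with_writer_info(dataset)
        .into_iter()
        .filter(|w| w.job_status == "RUNNING")
        .map(|w| w.table_name)
        .collect()
}

fn main() {
    println!("{:?}", running_tables(&StubStore, "ethereum/mainnet"));
}
```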
```rust
    for resolved_table in dataset.resolved_tables(partial_ref) {
        let table_name = resolved_table.name().clone();
        let writer_info = writer_info_map.get(table_name.as_str());

        // Get the active physical table if it exists
        let physical_table =
            PhysicalTable::get_active(ctx.data_store.clone(), resolved_table.clone())
                .await
                .map_err(|err| {
                    tracing::error!(
                        table = %table_name,
                        error = %err,
                        error_source = logging::error_source(&*err),
                        "failed to get active physical table"
                    );
                    Error::PhysicalTable(err)
                })?;

        let (current_block, start_block, files_count, total_size_bytes) =
            if let Some(pt) = physical_table {
                // Take a snapshot to get accurate synced range
                let snapshot = pt
                    .snapshot(false, ctx.data_store.clone())
                    .await
                    .map_err(|err| {
                        tracing::error!(
                            table = %table_name,
                            error = %err,
                            error_source = logging::error_source(&*err),
                            "failed to snapshot physical table"
                        );
                        Error::PhysicalTable(err)
                    })?;

                let synced_range = snapshot.synced_range();
                let canonical_segments = snapshot.canonical_segments();

                let files_count = canonical_segments.len() as i64;
                let total_size_bytes = canonical_segments
                    .iter()
                    .map(|s| s.object.size as i64)
                    .sum();

                let (start, end) = match synced_range {
                    Some(range) => (
                        Some(range.start().try_into().unwrap_or(0)),
                        Some(range.end().try_into().unwrap_or(0)),
                    ),
                    None => (None, None),
                };

                (end, start, files_count, total_size_bytes)
            } else {
                // Table hasn't been created/synced yet
                (None, None, 0, 0)
            };
```
All this logic seems too ad hoc to be included in the admin API handler. It belongs somewhere else in the data plane, not in the admin API.
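One possible shape for that refactor, sketched with simplified stand-in types rather than the real snapshot/segment types, is to pull the snapshot math out of the handler into a pure function the data plane can own and unit-test:

```rust
/// Simplified stand-in for a canonical segment; the real type carries
/// an object reference whose `size` is what the handler sums.
pub struct Segment {
    pub size: u64,
}

/// Hypothetical output type; fields mirror what the handler computes.
pub struct TableProgress {
    pub current_block: Option<u64>,
    pub start_block: Option<u64>,
    pub files_count: i64,
    pub total_size_bytes: i64,
}

/// Pure function: given a table's synced range (start, end) and its
/// canonical segments, derive the progress fields. No I/O, so it is
/// trivially unit-testable outside the admin API.
pub fn compute_table_progress(
    synced_range: Option<(u64, u64)>,
    canonical_segments: &[Segment],
) -> TableProgress {
    let (start_block, current_block) = match synced_range {
        Some((start, end)) => (Some(start), Some(end)),
        None => (None, None),
    };
    TableProgress {
        current_block,
        start_block,
        files_count: canonical_segments.len() as i64,
        total_size_bytes: canonical_segments.iter().map(|s| s.size as i64).sum(),
    }
}

fn main() {
    let p = compute_table_progress(
        Some((0, 950_000)),
        &[Segment { size: 1024 }, Segment { size: 2048 }],
    );
    println!(
        "current={:?} files={} bytes={}",
        p.current_block, p.files_count, p.total_size_bytes
    );
}
```

The handler would then only fetch the snapshot and call this function, keeping the ad hoc arithmetic out of the admin API.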
In addition to a […]

I've added an RFC documenting the proposed changes based on all the feedback. @LNSD @leoyvens @Chriswhited
Force-pushed from 27cb3ef to 359a8ac
…p + reorg handling via canonical_chain logic
… for a specific table within a dataset
This has been added.
Force-pushed from 16f4c38 to 408b46c
LNSD
left a comment
Please check my comments 🙂
I am still reviewing the implementation.
I think you should add this feature doc when you implement the Kafka-based worker events emitter.
| -H "Authorization: Bearer $TOKEN" | ||
| ``` | ||
|
|
||
| ## Implementation |
Can you align the ## Implementation section content with this:
amp/docs/features/admin-workers.md
Lines 123 to 141 in bf8e401
```markdown
## Implementation

### Database Schema

Workers are stored in the `workers` table:

| Column | Type | Description |
|--------|------|-------------|
| `node_id` | TEXT | Unique worker identifier |
| `heartbeat_at` | TIMESTAMPTZ | Last heartbeat timestamp |
| `created_at` | TIMESTAMPTZ | Initial registration |
| `registered_at` | TIMESTAMPTZ | Last registration |
| `info` | JSONB | Build metadata |

### Source Files

- `crates/services/admin-api/src/handlers/workers/` - API endpoint handlers
- `crates/clients/admin/src/workers.rs` - Client library
- `crates/core/metadata-db/src/workers.rs` - Database operations
```
Instead of the "Database Schema" section, as part of this feature implementation it is important to explain how we define the "latest block" (as we described in the Slack conversation).
Also, no need to add code in the feature doc. Claude is capable of following the "Source Files" references and reading the Rust code.
````markdown
## Usage

### Get Dataset Progress

```bash
curl -X GET "http://localhost:8080/datasets/ethereum/mainnet/versions/0.0.0/progress" \
  -H "Authorization: Bearer $TOKEN"
```

### Get Table Progress

```bash
curl -X GET "http://localhost:8080/datasets/ethereum/mainnet/versions/0.0.0/tables/blocks/progress" \
  -H "Authorization: Bearer $TOKEN"
```
````
The admin-<resource> feature docs should show, from a user's POV, how to use the progress-reporting feature, e.g., via ampctl.
See the workers' admin feature doc for some examples:
amp/docs/features/admin-workers.md
Lines 39 to 104 in bf8e401
````markdown
## Usage

**List all workers:**

View all registered workers and their last heartbeat times to get an overview of your worker fleet.

```bash
# Basic listing
ampctl worker list
ampctl worker ls  # alias

# Example output:
# worker-01h2xcejqtf2nbrexx3vqjhp41 (last heartbeat: 2025-01-15T17:20:15Z)
# indexer-node-1 (last heartbeat: 2025-01-15T17:18:45Z)
# eu-west-1a-worker (last heartbeat: 2025-01-15T17:20:10Z)
```

**Inspect specific worker:**

Get detailed information about a specific worker including build version, commit SHA, and lifecycle timestamps.

```bash
# Get detailed worker information
ampctl worker inspect worker-01
ampctl worker get worker-01  # alias

# Example output:
# Node ID: worker-01h2xcejqtf2nbrexx3vqjhp41
# Created: 2025-01-01T12:00:00Z
# Registered: 2025-01-15T16:45:30Z
# Heartbeat: 2025-01-15T17:20:15Z
#
# Worker Info:
#   Version: v0.0.22-15-g8b065bde
#   Commit: 8b065bde9c1a2f3e4d5c6b7a8e9f0a1b2c3d4e5f
#   Commit Timestamp: 2025-01-15T14:30:00Z
#   Build Date: 2025-01-15T15:45:30Z
```

**JSON output for scripting:**

Use JSON format to pipe output to jq or other tools for automated processing and monitoring.

```bash
# List workers as JSON
ampctl worker list --json

# Inspect worker as JSON
ampctl worker inspect worker-01 --json

# Extract specific fields with jq
ampctl worker list --json | jq -r '.workers[] | "\(.node_id): \(.heartbeat_at)"'
ampctl worker inspect worker-01 --json | jq -r '.info.version'
```

**Direct API access:**

Query the Admin API directly using curl for integrations or when ampctl is not available.

```bash
# List all workers
curl http://localhost:1610/workers

# Get worker details
curl http://localhost:1610/workers/worker-01
```
````
````markdown
6. Format and return JSON response
```

## API Reference
````
We don't need to repeat the JSON data structures; we can refer to the generated OpenAPI spec. See the workers' document for an example:
amp/docs/features/admin-workers.md
Lines 106 to 121 in bf8e401
````markdown
## API Reference

| Endpoint | Method | Description |
|----------|--------|-------------|
| `/workers` | GET | List all workers |
| `/workers/{id}` | GET | Get worker details by node ID |

For request/response schemas, see [Admin API OpenAPI spec](../openapi-specs/admin.spec.json):

```bash
# List workers endpoint
jq '.paths["/workers"]' docs/openapi-specs/admin.spec.json

# Get worker endpoint
jq '.paths["/workers/{id}"]' docs/openapi-specs/admin.spec.json
```
````
```markdown
- **Progress**: The current state of data synchronization for a dataset, including the range of blocks that have been synced and the number of files produced
- **Current Block**: The highest block number that has been synced (end of the synced range)
- **Start Block**: The lowest block number that has been synced (beginning of the synced range)
- **Job Status**: The health state of the writer job (e.g., `RUNNING`, `FAILED`, `COMPLETED`)
```
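Given those definitions (start block = lowest synced, current block = highest synced), a trivial helper can derive the number of blocks synced so far. This is an illustrative sketch, not part of the API:

```rust
/// Count synced blocks from the synced range's endpoints, where both
/// endpoints are inclusive. Returns None when the table has no synced
/// range yet (i.e., it has not been created/synced).
pub fn blocks_synced(start_block: Option<u64>, current_block: Option<u64>) -> Option<u64> {
    match (start_block, current_block) {
        (Some(start), Some(current)) if current >= start => Some(current - start + 1),
        _ => None,
    }
}

fn main() {
    println!("{:?}", blocks_synced(Some(0), Some(950_000)));
}
```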
Which job are we returning the status for? In the DB, multiple jobs for a dataset can exist, although only one can be in running mode.
```markdown
The API uses a **Pull Model** where progress is calculated on-demand:

1. **No new infrastructure** - Avoids introducing Kafka or message broker dependencies
```
No need to refer to the Kafka solution. Basically, explain what this is, not what it is not.
```markdown
- **Single-network datasets**: Multi-chain derived datasets may need `Vec<BlockRange>` per network in the future
- **No chainhead**: Response does not include chainhead for lag calculation (requires provider RPC access)
- **No off-chain progress**: Datasets without block numbers (e.g., IPFS sources) have no defined progress metric
- **Bulk endpoint not available**: No endpoint to retrieve progress for multiple datasets in a single request
```
Idem. No need to explain functionality that doesn't exist; only document what exists.
```markdown
## Limitations

- **Polling required**: Platform services must poll each dataset individually; no push/subscription model currently exists
- **Single-network datasets**: Multi-chain derived datasets may need `Vec<BlockRange>` per network in the future
```
Idem. Not supported yet. Do not mention it.
```markdown
## Summary

The Dataset Progress API provides visibility into the operational state of datasets, reporting sync metrics like `start_block`, `current_block`, job health status, and file statistics. This API serves as the "ground truth" for the engine's state, which Platform services can use to calculate higher-level metrics like freshness or block lag.
```
Do not mention the "platform". This functionality allows anyone to track the progress of a dataset sync at a point in time, either programmatically over the RESTful API or via the administration CLI (ampctl).
Summary

- Adds a `GET /datasets/{namespace}/{name}/versions/{revision}/sync-progress` endpoint.
- Reports `current_block`, `start_block`, `job_status`, and file stats.
- Uses `TableSnapshot::synced_range()` with `canonical_chain` logic to accurately report sync progress, handling gaps and reorgs.

Tests
Response format:

```json
{
  "dataset_namespace": "ethereum",
  "dataset_name": "mainnet",
  "revision": "0.0.0",
  "manifest_hash": "2dbf16e8a4d1c526e3893341d1945040d51ea1b68d1c420e402be59b0646fcfa",
  "tables": [
    {
      "table_name": "blocks",
      "current_block": 950000,
      "start_block": 0,
      "job_id": 1,
      "job_status": "RUNNING",
      "files_count": 47,
      "total_size_bytes": 2147483648
    }
  ]
}
```