
Conversation

@mitchhs12 (Contributor) commented Jan 7, 2026

Summary

  • Added GET /datasets/{namespace}/{name}/versions/{revision}/sync-progress endpoint.
  • Returns per-table sync progress including current_block, start_block, job_status, and file stats.
  • Uses TableSnapshot::synced_range() with canonical_chain logic to accurately report sync progress, handling gaps and reorgs.

Tests

  • Endpoint returns correct structure for valid dataset
  • Returns 404 for non-existent dataset
  • Verifies RUNNING status while job is actively syncing
  • Verifies COMPLETED status when end block is reached

Response format:

```json
{
  "dataset_namespace": "ethereum",
  "dataset_name": "mainnet",
  "revision": "0.0.0",
  "manifest_hash": "2dbf16e8a4d1c526e3893341d1945040d51ea1b68d1c420e402be59b0646fcfa",
  "tables": [
    {
      "table_name": "blocks",
      "current_block": 950000,
      "start_block": 0,
      "job_id": 1,
      "job_status": "RUNNING",
      "files_count": 47,
      "total_size_bytes": 2147483648
    }
  ]
}
```
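
For orientation, here is a minimal serde sketch of this response shape (struct names are illustrative, not necessarily the exact types in the PR; fields are taken from the JSON example above):

```rust
use serde::Serialize;

/// Response body for the sync-progress endpoint (illustrative names).
#[derive(Serialize)]
struct SyncProgressResponse {
    dataset_namespace: String,
    dataset_name: String,
    revision: String,
    manifest_hash: String,
    tables: Vec<TableSyncProgress>,
}

#[derive(Serialize)]
struct TableSyncProgress {
    table_name: String,
    /// Highest synced block (end of the synced range); None if the table
    /// has not been created or synced yet.
    current_block: Option<u64>,
    /// Lowest synced block (start of the synced range).
    start_block: Option<u64>,
    job_id: i64,
    job_status: String,
    files_count: i64,
    total_size_bytes: i64,
}
```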

@mitchhs12 requested review from LNSD and leoyvens on January 7, 2026 at 23:00
@mitchhs12 self-assigned this on Jan 7, 2026
@mitchhs12 linked an issue on Jan 7, 2026 that may be closed by this pull request
@LNSD (Contributor) left a comment

In Slack, I suggested using a PUSH model (instead of a POLL model) to expose the table sync progress updates to all clients (e.g., via a Kafka broker).

This is a secondary priority, but there is value for the engine management use case.
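
A rough sketch of what the push side could look like, assuming the rdkafka crate (the topic name and payload shape are illustrative, not part of this PR):

```rust
use std::time::Duration;

use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord};

/// Minimal producer setup; tuning options omitted for brevity.
fn make_producer(brokers: &str) -> FutureProducer {
    ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .create()
        .expect("producer creation failed")
}

/// Publish one progress event; a real emitter would use a typed event
/// struct and serde instead of hand-rolled JSON.
async fn publish_progress(
    producer: &FutureProducer,
    dataset: &str,
    table: &str,
    current_block: u64,
) -> Result<(), rdkafka::error::KafkaError> {
    let payload = format!(
        r#"{{"dataset":"{dataset}","table":"{table}","current_block":{current_block}}}"#
    );
    producer
        .send(
            FutureRecord::to("dataset-sync-progress")
                .key(dataset)
                .payload(&payload),
            Duration::from_secs(0),
        )
        .await
        .map(|_| ())
        .map_err(|(err, _unsent_message)| err)
}
```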

Please review my comments.

```rust
        get(datasets::list_jobs::handler),
    )
    .route(
        "/datasets/{namespace}/{name}/versions/{revision}/sync-progress",
```
Contributor:

I would go with a shorter word.

Suggested change:

```diff
-"/datasets/{namespace}/{name}/versions/{revision}/sync-progress",
+"/datasets/{namespace}/{name}/versions/{revision}/progress",
```

Contributor Author:

I like this idea and will implement it; it is much cleaner.

Contributor Author:

Do you think I should change all references from sync-progress to progress, or just the API route?

```rust
})?;

// Query active tables info from metadata database (job_id, status)
let writer_infos = metadata_db::sync_progress::get_active_tables_with_writer_info(
```
Contributor:

We should not query this information directly from the metadata_db. Actually, no direct metadata_db interactions should happen in the admin API handlers. This should be part of the amp-data-store component.
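
A minimal sketch of how this could be surfaced by amp-data-store instead, so the handler only talks to the data store (all names here are illustrative placeholders, not the actual API):

```rust
/// Illustrative placeholder, not the engine's actual amp-data-store type.
pub struct TableWriterInfo {
    pub table_name: String,
    pub job_id: i64,
    pub job_status: String,
}

pub struct DataStore;

impl DataStore {
    /// Writer info (job_id, status) for a dataset's active tables. The
    /// existing metadata_db::sync_progress query would move behind this
    /// method, keeping the admin API handler free of metadata_db calls.
    pub async fn get_tables_writer_info(
        &self,
        namespace: &str,
        name: &str,
        revision: &str,
    ) -> anyhow::Result<Vec<TableWriterInfo>> {
        let _ = (namespace, name, revision);
        // Placeholder body: delegate to the metadata_db query here.
        Ok(Vec::new())
    }
}
```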

Comment on lines 142 to 208
```rust
for resolved_table in dataset.resolved_tables(partial_ref) {
    let table_name = resolved_table.name().clone();
    let writer_info = writer_info_map.get(table_name.as_str());

    // Get the active physical table if it exists
    let physical_table =
        PhysicalTable::get_active(ctx.data_store.clone(), resolved_table.clone())
            .await
            .map_err(|err| {
                tracing::error!(
                    table = %table_name,
                    error = %err,
                    error_source = logging::error_source(&*err),
                    "failed to get active physical table"
                );
                Error::PhysicalTable(err)
            })?;

    let (current_block, start_block, files_count, total_size_bytes) =
        if let Some(pt) = physical_table {
            // Take a snapshot to get accurate synced range
            let snapshot = pt
                .snapshot(false, ctx.data_store.clone())
                .await
                .map_err(|err| {
                    tracing::error!(
                        table = %table_name,
                        error = %err,
                        error_source = logging::error_source(&*err),
                        "failed to snapshot physical table"
                    );
                    Error::PhysicalTable(err)
                })?;

            let synced_range = snapshot.synced_range();
            let canonical_segments = snapshot.canonical_segments();

            let files_count = canonical_segments.len() as i64;
            let total_size_bytes = canonical_segments
                .iter()
                .map(|s| s.object.size as i64)
                .sum();

            let (start, end) = match synced_range {
                Some(range) => (
                    Some(range.start().try_into().unwrap_or(0)),
                    Some(range.end().try_into().unwrap_or(0)),
                ),
                None => (None, None),
            };

            (end, start, files_count, total_size_bytes)
        } else {
            // Table hasn't been created/synced yet
            (None, None, 0, 0)
        };
```
Contributor:

All this logic seems too ad hoc to be included in the admin API handler. It belongs somewhere else in the data plane, not in the admin API.
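
For illustration, the range/segment computation above could live in the data plane as a pure helper, along these lines (type names are illustrative stand-ins for the snapshot types):

```rust
use std::ops::RangeInclusive;

/// Illustrative stand-in for a canonical segment's object metadata.
pub struct Segment {
    pub size: u64,
}

pub struct TableProgress {
    pub current_block: Option<u64>,
    pub start_block: Option<u64>,
    pub files_count: i64,
    pub total_size_bytes: i64,
}

/// Mirrors the handler logic above: derive per-table progress from a
/// snapshot's synced range and its canonical segments.
pub fn table_progress(
    synced_range: Option<RangeInclusive<u64>>,
    canonical_segments: &[Segment],
) -> TableProgress {
    let (start_block, current_block) = match synced_range {
        Some(range) => (Some(*range.start()), Some(*range.end())),
        None => (None, None),
    };
    TableProgress {
        current_block,
        start_block,
        files_count: canonical_segments.len() as i64,
        total_size_bytes: canonical_segments
            .iter()
            .map(|s| s.size as i64)
            .sum(),
    }
}
```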

@LNSD (Contributor) commented Jan 14, 2026

In addition to the /datasets progress endpoint, we should add a per-table /datasets/{namespace}/{name}/versions/{revision}/tables/{table_name}/progress endpoint.
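
For illustration, both routes side by side in the same axum style as the existing router (handler bodies are stubs; the real handlers live in the admin API crate):

```rust
use axum::{extract::Path, routing::get, Router};

async fn dataset_progress(
    Path((namespace, name, revision)): Path<(String, String, String)>,
) -> String {
    // Stub: the real handler returns the JSON progress response.
    format!("progress for {namespace}/{name}@{revision}")
}

async fn table_progress(
    Path((namespace, name, revision, table)): Path<(String, String, String, String)>,
) -> String {
    // Stub: the real handler returns progress for a single table.
    format!("progress for table {table} of {namespace}/{name}@{revision}")
}

fn progress_routes() -> Router {
    Router::new()
        .route(
            "/datasets/{namespace}/{name}/versions/{revision}/progress",
            get(dataset_progress),
        )
        .route(
            "/datasets/{namespace}/{name}/versions/{revision}/tables/{table_name}/progress",
            get(table_progress),
        )
}
```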

@mitchhs12 (Contributor Author):

I've added an RFC documenting the proposed changes based on all the feedback.

@LNSD @leoyvens @Chriswhited
I'd appreciate your thoughts before I proceed with the implementation, in particular on Section 6:

  • DataStore API Design - Do the proposed get_table_progress() and get_tables_writer_info() method signatures look correct? Are there any concerns with adding these to the existing DataStore struct, or should they live in the amp-data-store crate or somewhere else entirely?

  • Bulk Endpoint - I've noted in Future Considerations that a bulk GET /datasets/progress endpoint may be needed for Platform to monitor many datasets efficiently. Should this be part of the initial implementation?

  • Future Considerations - Are there other scenarios that should be documented (multi-chain, non-block progress, etc.)?

@mitchhs12 force-pushed the feature/admin-api-sync-progress branch from 27cb3ef to 359a8ac on January 15, 2026 at 14:55
@mitchhs12 (Contributor Author):

> In addition to the /datasets progress endpoint, we should add a per-table /datasets/{namespace}/{name}/versions/{revision}/tables/{table_name}/progress endpoint.

This has been added.

@mitchhs12 force-pushed the feature/admin-api-sync-progress branch from 16f4c38 to 408b46c on January 15, 2026 at 20:56
@LNSD (Contributor) left a comment

Please check my comments 🙂

I am still checking the implementation.

Contributor:

I think you should add this feature doc when you implement the Kafka-based worker events emitter.

-H "Authorization: Bearer $TOKEN"
```

## Implementation
Contributor:

Can you align the ## Implementation section content with this:

## Implementation
### Database Schema
Workers are stored in the `workers` table:
| Column | Type | Description |
|--------|------|-------------|
| `node_id` | TEXT | Unique worker identifier |
| `heartbeat_at` | TIMESTAMPTZ | Last heartbeat timestamp |
| `created_at` | TIMESTAMPTZ | Initial registration |
| `registered_at` | TIMESTAMPTZ | Last registration |
| `info` | JSONB | Build metadata |
### Source Files
- `crates/services/admin-api/src/handlers/workers/` - API endpoint handlers
- `crates/clients/admin/src/workers.rs` - Client library
- `crates/core/metadata-db/src/workers.rs` - Database operations

Instead of the "Database Schema" section, as part of this feature implementation it is important to explain how we define the "latest block" (as we described in the Slack conversation).

Also, there is no need to add code to the feature doc. Claude is capable of following the "Source Files" references and reading the Rust code.

Comment on lines +114 to +128
## Usage

### Get Dataset Progress

```bash
curl -X GET "http://localhost:8080/datasets/ethereum/mainnet/versions/0.0.0/progress" \
-H "Authorization: Bearer $TOKEN"
```

### Get Table Progress

```bash
curl -X GET "http://localhost:8080/datasets/ethereum/mainnet/versions/0.0.0/tables/blocks/progress" \
-H "Authorization: Bearer $TOKEN"
```
Contributor:

The admin-<resource> feature docs should show, from a user's POV, how to use the progress reporting feature, e.g., via ampctl.

See the workers' admin feature doc for some examples:

## Usage
**List all workers:**
View all registered workers and their last heartbeat times to get an overview of your worker fleet.
```bash
# Basic listing
ampctl worker list
ampctl worker ls # alias
# Example output:
# worker-01h2xcejqtf2nbrexx3vqjhp41 (last heartbeat: 2025-01-15T17:20:15Z)
# indexer-node-1 (last heartbeat: 2025-01-15T17:18:45Z)
# eu-west-1a-worker (last heartbeat: 2025-01-15T17:20:10Z)
```
**Inspect specific worker:**
Get detailed information about a specific worker including build version, commit SHA, and lifecycle timestamps.
```bash
# Get detailed worker information
ampctl worker inspect worker-01
ampctl worker get worker-01 # alias
# Example output:
# Node ID: worker-01h2xcejqtf2nbrexx3vqjhp41
# Created: 2025-01-01T12:00:00Z
# Registered: 2025-01-15T16:45:30Z
# Heartbeat: 2025-01-15T17:20:15Z
#
# Worker Info:
# Version: v0.0.22-15-g8b065bde
# Commit: 8b065bde9c1a2f3e4d5c6b7a8e9f0a1b2c3d4e5f
# Commit Timestamp: 2025-01-15T14:30:00Z
# Build Date: 2025-01-15T15:45:30Z
```
**JSON output for scripting:**
Use JSON format to pipe output to jq or other tools for automated processing and monitoring.
```bash
# List workers as JSON
ampctl worker list --json
# Inspect worker as JSON
ampctl worker inspect worker-01 --json
# Extract specific fields with jq
ampctl worker list --json | jq -r '.workers[] | "\(.node_id): \(.heartbeat_at)"'
ampctl worker inspect worker-01 --json | jq -r '.info.version'
```
**Direct API access:**
Query the Admin API directly using curl for integrations or when ampctl is not available.
```bash
# List all workers
curl http://localhost:1610/workers
# Get worker details
curl http://localhost:1610/workers/worker-01
```

```
6. Format and return JSON response
```

## API Reference
Contributor:

We don't need to repeat the JSON data structures; we can refer to the generated OpenAPI spec. See the workers' document for an example:

## API Reference
| Endpoint | Method | Description |
|----------|--------|-------------|
| `/workers` | GET | List all workers |
| `/workers/{id}` | GET | Get worker details by node ID |
For request/response schemas, see [Admin API OpenAPI spec](../openapi-specs/admin.spec.json):
```bash
# List workers endpoint
jq '.paths["/workers"]' docs/openapi-specs/admin.spec.json
# Get worker endpoint
jq '.paths["/workers/{id}"]' docs/openapi-specs/admin.spec.json
```

- **Progress**: The current state of data synchronization for a dataset, including the range of blocks that have been synced and the number of files produced
- **Current Block**: The highest block number that has been synced (end of the synced range)
- **Start Block**: The lowest block number that has been synced (beginning of the synced range)
- **Job Status**: The health state of the writer job (e.g., `RUNNING`, `FAILED`, `COMPLETED`)
Contributor:

Which job are we returning the status for? In the DB, multiple jobs for a dataset can exist, although only one can be in running mode.


The API uses a **Pull Model** where progress is calculated on-demand:

1. **No new infrastructure** - Avoids introducing Kafka or message broker dependencies
Contributor:

No need to refer to the Kafka solution. Basically, explain what this is, not what it is not.

- **Single-network datasets**: Multi-chain derived datasets may need `Vec<BlockRange>` per network in the future
- **No chainhead**: Response does not include chainhead for lag calculation (requires provider RPC access)
- **No off-chain progress**: Datasets without block numbers (e.g., IPFS sources) have no defined progress metric
- **Bulk endpoint not available**: No endpoint to retrieve progress for multiple datasets in a single request
Contributor:

Idem. No need to explain, this functionality doesn't exist. Only explain what exists.

## Limitations

- **Polling required**: Platform services must poll each dataset individually; no push/subscription model currently exists
- **Single-network datasets**: Multi-chain derived datasets may need `Vec<BlockRange>` per network in the future
Contributor:

Idem. Not supported yet. Do not mention it.


## Summary

The Dataset Progress API provides visibility into the operational state of datasets, reporting sync metrics like `start_block`, `current_block`, job health status, and file statistics. This API serves as the "ground truth" for the engine's state, which Platform services can use to calculate higher-level metrics like freshness or block lag.
Contributor:

Do not mention the "platform". This functionality allows anyone to track the progress of a dataset sync at a point in time, either programmatically over the RESTful API or via the administration CLI (ampctl).


Merging this pull request may close: Job progress reporting.