21 commits
0ad1c4b
Remove incorrect duplicate batch skipping in ReorgAwareStream
incrypto32 Jan 11, 2026
65a73f5
Implement a LMDB stream state store
incrypto32 Nov 25, 2025
ef69741
Add configurable data_dir for LMDB state storage
incrypto32 Jan 11, 2026
9f948c5
implement a Kafka loader
incrypto32 Nov 17, 2025
7af461a
Add kafka streaming loader app
incrypto32 Nov 20, 2025
3669668
Refactor kafka streaming loader and fix Resume watermark
incrypto32 Nov 24, 2025
32a3987
use transactions in kafka loader
incrypto32 Nov 24, 2025
fba0bd2
Fix unit tests failing
incrypto32 Nov 25, 2025
1c18cad
Add more integration tests for kafka loader
incrypto32 Nov 25, 2025
abb28c2
Add a simple kafka consumer script
incrypto32 Dec 1, 2025
be86d5a
Dockerize kafka loader
incrypto32 Dec 1, 2025
2fbe91a
use lmdb stream state for kafka loader
incrypto32 Dec 1, 2025
5beea22
Docs for kafka loader
incrypto32 Dec 2, 2025
648a1c8
Add example query with specific schema
incrypto32 Dec 3, 2025
6c8e237
Update docs for kafka loader and fix query
incrypto32 Dec 15, 2025
53e11a7
Fix integration test failures
incrypto32 Dec 16, 2025
7e85d29
Quote dataset names in SQL queries to support slashes
incrypto32 Jan 11, 2026
4970fcf
Pass data_dir config to LMDB state store in Kafka loader
incrypto32 Jan 11, 2026
1c79ead
Add --state-dir CLI parameter for LMDB state storage path
incrypto32 Jan 11, 2026
ffa520e
Add auth, retry logic, and proper logging to Kafka streaming loader
incrypto32 Jan 12, 2026
91a1a66
Add pass-through Kafka producer config via --kafka-config
incrypto32 Jan 13, 2026
1 change: 1 addition & 0 deletions .gitignore
@@ -60,3 +60,4 @@ data/
# Build artifacts
*.tar.gz
*.zip
.amp_state
Empty file added .test.env
Empty file.
23 changes: 23 additions & 0 deletions Dockerfile.kafka
@@ -0,0 +1,23 @@
FROM python:3.12-slim

RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    curl \
    && rm -rf /var/lib/apt/lists/*

COPY --from=ghcr.io/astral-sh/uv:latest /uv /usr/local/bin/uv

WORKDIR /app

COPY pyproject.toml README.md ./
COPY src/ ./src/
COPY apps/ ./apps/

RUN uv pip install --system --no-cache . && \
    uv pip install --system --no-cache kafka-python lmdb

ENV PYTHONPATH=/app
ENV PYTHONUNBUFFERED=1

ENTRYPOINT ["python", "apps/kafka_streaming_loader.py"]
CMD ["--help"]
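
For reference, a build-and-run sequence for this image might look like the following. The image tag, mounted paths, broker address, and query file are illustrative assumptions, not part of this change; the flags themselves come from apps/kafka_streaming_loader.py below.

# Build the image from the repository root (tag name is an example)
docker build -f Dockerfile.kafka -t amp-kafka-loader .

# Run the loader, mounting a host directory for the LMDB state and the query file
docker run --rm \
  -v "$PWD/state:/app/state" \
  -v "$PWD/queries:/app/queries" \
  amp-kafka-loader \
  --amp-server grpc://host.docker.internal:1602 \
  --kafka-brokers host.docker.internal:9092 \
  --topic anvil_logs \
  --query-file queries/logs.sql \
  --raw-dataset anvil \
  --network anvil \
  --state-dir /app/state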
75 changes: 75 additions & 0 deletions apps/kafka_consumer.py
@@ -0,0 +1,75 @@
#!/usr/bin/env python3
"""Simple Kafka consumer script to print messages from a topic in real-time.

Messages are consumed from a consumer group, so subsequent runs will only show
new messages. Press Ctrl+C to exit cleanly.

Usage:
    python kafka_consumer.py [topic] [broker] [group_id]

Examples:
    python kafka_consumer.py
    python kafka_consumer.py anvil_logs
    python kafka_consumer.py anvil_logs localhost:9092
    python kafka_consumer.py anvil_logs localhost:9092 my-group
"""

import json
import sys
from datetime import datetime

from kafka import KafkaConsumer

topic = sys.argv[1] if len(sys.argv) > 1 else 'anvil_logs'
broker = sys.argv[2] if len(sys.argv) > 2 else 'localhost:9092'
group_id = sys.argv[3] if len(sys.argv) > 3 else 'kafka-consumer-cli'

print(f'Consuming from: {broker} -> topic: {topic}')
print(f'Consumer group: {group_id}')
print(f'Started at: {datetime.now().strftime("%H:%M:%S")}')
print('-' * 80)

consumer = KafkaConsumer(
    topic,
    bootstrap_servers=broker,
    group_id=group_id,
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
)

msg_count = 0
data_count = 0
reorg_count = 0

try:
    for msg in consumer:
        msg_count += 1
        msg_type = msg.value.get('_type', 'unknown')

        if msg_type == 'data':
            data_count += 1
            print(f'\nMessage #{msg_count} [DATA] - Key: {msg.key.decode() if msg.key else "None"}')
            print(f'Offset: {msg.offset} | Partition: {msg.partition}')

            for k, v in msg.value.items():
                if k != '_type':
                    print(f'{k}: {v}')

        elif msg_type == 'reorg':
            reorg_count += 1
            print(f'\nMessage #{msg_count} [REORG] - Key: {msg.key.decode() if msg.key else "None"}')
            print(f'Network: {msg.value.get("network")}')
            print(f'Blocks: {msg.value.get("start_block")} -> {msg.value.get("end_block")}')

        else:
            print(f'\nMessage #{msg_count} [UNKNOWN]')
            print(json.dumps(msg.value, indent=2))

        print(f'\nTotal: {msg_count} msgs | Data: {data_count} | Reorgs: {reorg_count}')
        print('-' * 80)

except KeyboardInterrupt:
    print('\n\nStopped')
finally:
    consumer.close()
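
To exercise this consumer without running the full loader, a throwaway producer can publish messages in the two shapes the script branches on (_type of 'data' or 'reorg'). A minimal sketch with kafka-python follows; the data fields are invented for illustration, since the real columns depend on the SQL query the loader streams.

import json

from kafka import KafkaProducer

# Serialize values as JSON, matching the consumer's value_deserializer
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
)

# A 'data' message: query columns plus the '_type' marker the consumer checks
producer.send('anvil_logs', value={'_type': 'data', 'block_num': 123, 'token_address': '0xabc'})

# A 'reorg' message: the consumer prints the network and the affected block range
producer.send('anvil_logs', value={'_type': 'reorg', 'network': 'anvil', 'start_block': 120, 'end_block': 123})

producer.flush()
producer.close()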
205 changes: 205 additions & 0 deletions apps/kafka_streaming_loader.py
@@ -0,0 +1,205 @@
#!/usr/bin/env python3
"""Stream data to Kafka with resume watermark support."""

import argparse
import json
import logging
import os
import time
from pathlib import Path

from amp.client import Client
from amp.loaders.types import LabelJoinConfig
from amp.streaming import BlockRange, ResumeWatermark

logger = logging.getLogger('amp.kafka_streaming_loader')

RETRYABLE_ERRORS = (
    ConnectionError,
    TimeoutError,
    OSError,
)


def retry_with_backoff(func, max_retries=5, initial_delay=1.0, max_delay=60.0, backoff_factor=2.0):
"""Execute function with exponential backoff retry on transient errors."""
delay = initial_delay
last_exception = None

for attempt in range(max_retries + 1):
try:
return func()
except RETRYABLE_ERRORS as e:
last_exception = e
if attempt == max_retries:
logger.error(f'Max retries ({max_retries}) exceeded: {e}')
raise
logger.warning(f'Attempt {attempt + 1} failed: {e}. Retrying in {delay:.1f}s...')
time.sleep(delay)
delay = min(delay * backoff_factor, max_delay)

raise last_exception


def get_block_hash(client: Client, raw_dataset: str, block_num: int) -> str:
"""Get block hash from dataset.blocks table."""
query = f'SELECT hash FROM "{raw_dataset}".blocks WHERE block_num = {block_num} LIMIT 1'
result = client.get_sql(query, read_all=True)
hash_val = result.to_pydict()['hash'][0]
return '0x' + hash_val.hex() if isinstance(hash_val, bytes) else hash_val


def get_latest_block(client: Client, raw_dataset: str) -> int:
"""Get latest block number from dataset.blocks table."""
query = f'SELECT block_num FROM "{raw_dataset}".blocks ORDER BY block_num DESC LIMIT 1'
logger.debug(f'Fetching latest block from {raw_dataset}')
logger.debug(f'Query: {query}')
result = client.get_sql(query, read_all=True)
block_num = result.to_pydict()['block_num'][0]
logger.info(f'Latest block in {raw_dataset}: {block_num}')
return block_num


def create_watermark(client: Client, raw_dataset: str, network: str, start_block: int) -> ResumeWatermark:
"""Create a resume watermark for the given start block."""
watermark_block = start_block - 1
watermark_hash = get_block_hash(client, raw_dataset, watermark_block)
return ResumeWatermark(
ranges=[BlockRange(network=network, start=watermark_block, end=watermark_block, hash=watermark_hash)]
)


def main(
    amp_server: str,
    kafka_brokers: str,
    topic: str,
    query_file: str,
    raw_dataset: str,
    network: str,
    start_block: int = None,
    label_csv: str = None,
    state_dir: str = '.amp_state',
    auth: bool = False,
    auth_token: str = None,
    max_retries: int = 5,
    retry_delay: float = 1.0,
    kafka_config: dict = None,
):
    def connect():
        return Client(amp_server, auth=auth, auth_token=auth_token)

    client = retry_with_backoff(connect, max_retries=max_retries, initial_delay=retry_delay)
    logger.info(f'Connected to {amp_server}')

    if label_csv and Path(label_csv).exists():
        client.configure_label('tokens', label_csv)
        logger.info(f'Loaded {len(client.label_manager.get_label("tokens"))} labels from {label_csv}')
        label_config = LabelJoinConfig(
            label_name='tokens', label_key_column='token_address', stream_key_column='token_address'
        )
    else:
        label_config = None

    connection_config = {
        'bootstrap_servers': kafka_brokers,
        'client_id': 'amp-kafka-loader',
        'state': {'enabled': True, 'storage': 'lmdb', 'data_dir': state_dir},
    }
    if kafka_config:
        connection_config.update(kafka_config)
    client.configure_connection('kafka', 'kafka', connection_config)

    with open(query_file) as f:
        query = f.read()

    if start_block is not None:
        resume_watermark = create_watermark(client, raw_dataset, network, start_block) if start_block > 0 else None
        logger.info(f'Starting query from block {start_block}')
    else:
        resume_watermark = None
        logger.info('Resuming from LMDB state (or starting from latest if no state)')
    logger.info(f'Streaming to Kafka: {kafka_brokers} -> {topic}')

    batch_count = 0

    def stream_batches():
        nonlocal batch_count
        for result in client.sql(query).load(
            'kafka', topic, stream=True, label_config=label_config, resume_watermark=resume_watermark
        ):
            if result.success:
                batch_count += 1
                if batch_count == 1 and result.metadata:
                    logger.info(f'First batch: {result.metadata.get("block_ranges")}')
                logger.info(f'Batch {batch_count}: {result.rows_loaded} rows in {result.duration:.2f}s')
            else:
                logger.error(f'Batch error: {result.error}')

    retry_with_backoff(stream_batches, max_retries=max_retries, initial_delay=retry_delay)


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Stream data to Kafka with resume watermark')
    parser.add_argument('--amp-server', default=os.getenv('AMP_SERVER_URL', 'grpc://127.0.0.1:1602'))
    parser.add_argument('--kafka-brokers', default='localhost:9092')
    parser.add_argument('--topic', required=True)
    parser.add_argument('--query-file', required=True)
    parser.add_argument(
        '--raw-dataset', required=True, help='Dataset name for the raw dataset of the chain (e.g., anvil, eth_firehose)'
    )
    parser.add_argument('--network', default='anvil')
    parser.add_argument('--start-block', type=int, help='Start from specific block (default: latest - 10)')
    parser.add_argument('--label-csv', help='Optional CSV for label joining')
    parser.add_argument('--state-dir', default='.amp_state', help='Directory for LMDB state storage')
    parser.add_argument('--auth', action='store_true', help='Enable auth using ~/.amp/cache or AMP_AUTH_TOKEN env var')
    parser.add_argument('--auth-token', help='Explicit auth token (works independently, does not require --auth)')
    parser.add_argument('--max-retries', type=int, default=5, help='Max retries for connection failures (default: 5)')
    parser.add_argument('--retry-delay', type=float, default=1.0, help='Initial retry delay in seconds (default: 1.0)')
    parser.add_argument(
        '--kafka-config',
        type=str,
        help='Extra Kafka producer config as JSON. Uses kafka-python naming (underscores). '
        'Example: \'{"compression_type": "lz4", "linger_ms": 5}\'. '
        'See: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html',
    )
    parser.add_argument(
        '--kafka-config-file',
        type=Path,
        help='Path to JSON file with extra Kafka producer config',
    )
    parser.add_argument('--log-level', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'])
    args = parser.parse_args()

    logging.basicConfig(level=logging.WARNING, format='%(asctime)s [%(name)s] %(levelname)s: %(message)s')
    log_level = getattr(logging, args.log_level) if args.log_level else logging.INFO
    logging.getLogger('amp').setLevel(log_level)

    kafka_config = {}
    if args.kafka_config_file:
        kafka_config = json.loads(args.kafka_config_file.read_text())
        logger.info(f'Loaded Kafka config from {args.kafka_config_file}')
    if args.kafka_config:
        kafka_config.update(json.loads(args.kafka_config))

    try:
        main(
            amp_server=args.amp_server,
            kafka_brokers=args.kafka_brokers,
            topic=args.topic,
            query_file=args.query_file,
            raw_dataset=args.raw_dataset,
            network=args.network,
            start_block=args.start_block,
            label_csv=args.label_csv,
            state_dir=args.state_dir,
            auth=args.auth,
            auth_token=args.auth_token,
            max_retries=args.max_retries,
            retry_delay=args.retry_delay,
            kafka_config=kafka_config or None,
        )
    except KeyboardInterrupt:
        logger.info('Stopped by user')
    except Exception as e:
        logger.error(f'Fatal error: {e}')
        raise
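
A typical local invocation might look like the following; the query file path and start block are placeholders, and the --kafka-config value simply exercises the pass-through described in the help text above.

python apps/kafka_streaming_loader.py \
  --amp-server grpc://127.0.0.1:1602 \
  --kafka-brokers localhost:9092 \
  --topic anvil_logs \
  --query-file queries/logs.sql \
  --raw-dataset anvil \
  --network anvil \
  --start-block 100 \
  --kafka-config '{"compression_type": "lz4", "linger_ms": 5}' \
  --log-level INFO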