Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion intentkit/models/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,5 +161,5 @@ def get_checkpointer() -> AsyncShallowPostgresSaver:
AsyncShallowPostgresSaver: The AsyncShallowPostgresSaver instance
"""
if _checkpointer is None:
raise RuntimeError("Database pool not initialized. Call init_db first.")
raise RuntimeError("Database checkpointer not initialized. Call init_db first.")
return _checkpointer
117 changes: 117 additions & 0 deletions intentkit/models/db_mig.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,125 @@ async def update_table_wrapper():
await update_table_schema(conn, dialect, model_cls)

await update_table_wrapper()

# Migrate checkpoints tables
await migrate_checkpoints_table(conn)
except Exception as e:
logger.error(f"Error updating database schema: {str(e)}")
raise

logger.info("Database schema updated successfully")


async def migrate_checkpoints_table(conn) -> None:
"""Migrate checkpoints tables to support langgraph 2.0."""
tables = ["checkpoints", "checkpoint_blobs", "checkpoint_writes"]

def _get_tables(connection):
insp = inspect(connection)
return insp.get_table_names()

existing_tables = await conn.run_sync(_get_tables)

for table in tables:
if table not in existing_tables:
continue

# 1. Add checkpoint_ns column
await conn.execute(
text(
f"ALTER TABLE {table} ADD COLUMN IF NOT EXISTS checkpoint_ns TEXT DEFAULT ''"
)
)

# 2. Drop columns that ShallowPostgresSaver doesn't use
if table == "checkpoints":
# ShallowPostgresSaver doesn't use checkpoint_id or parent_checkpoint_id
await conn.execute(
text("ALTER TABLE checkpoints DROP COLUMN IF EXISTS checkpoint_id")
)
await conn.execute(
text(
"ALTER TABLE checkpoints DROP COLUMN IF EXISTS parent_checkpoint_id"
)
)
elif table == "checkpoint_blobs":
# ShallowPostgresSaver doesn't use version column
await conn.execute(
text("ALTER TABLE checkpoint_blobs DROP COLUMN IF EXISTS version")
)

# 3. Update Primary Key
def _check_pk(connection, table_name=table):
insp = inspect(connection)
return insp.get_pk_constraint(table_name)

pk = await conn.run_sync(_check_pk)
current_cols = set(pk.get("constrained_columns", []))

# Expected columns depend on table
expected_cols = set()
pk_cols = ""
if table == "checkpoints":
expected_cols = {"thread_id", "checkpoint_ns"}
pk_cols = "thread_id, checkpoint_ns"
elif table == "checkpoint_blobs":
expected_cols = {"thread_id", "checkpoint_ns", "channel"}
pk_cols = "thread_id, checkpoint_ns, channel"
elif table == "checkpoint_writes":
expected_cols = {
"thread_id",
"checkpoint_ns",
"checkpoint_id",
"task_id",
"idx",
}
pk_cols = "thread_id, checkpoint_ns, checkpoint_id, task_id, idx"

if current_cols != expected_cols:
logger.info(f"Migrating {table} PK from {current_cols} to {expected_cols}")

# If migrating checkpoints to (thread_id, checkpoint_ns), we need to handle duplicates
if table == "checkpoints" and expected_cols == {
"thread_id",
"checkpoint_ns",
}:
# Keep only the latest checkpoint for each (thread_id, checkpoint_ns) based on checkpoint_id (time-ordered)
await conn.execute(
text("""
DELETE FROM checkpoints
WHERE (thread_id, checkpoint_ns, checkpoint_id) NOT IN (
SELECT thread_id, checkpoint_ns, MAX(checkpoint_id)
FROM checkpoints
GROUP BY thread_id, checkpoint_ns
)
""")
)

# If migrating checkpoint_blobs to (thread_id, checkpoint_ns, channel), we need to handle duplicates
elif table == "checkpoint_blobs" and expected_cols == {
"thread_id",
"checkpoint_ns",
"channel",
}:
# Keep only blobs that are referenced by the remaining checkpoints
# The relationship is: checkpoints.checkpoint -> 'channel_versions' ->> blob.channel = blob.version
await conn.execute(
text("""
DELETE FROM checkpoint_blobs cb
WHERE NOT EXISTS (
SELECT 1
FROM checkpoints cp
WHERE cp.thread_id = cb.thread_id
AND cp.checkpoint_ns = cb.checkpoint_ns
AND (cp.checkpoint -> 'channel_versions' ->> cb.channel) = cb.version
)
""")
)

if pk.get("name"):
await conn.execute(
text(f"ALTER TABLE {table} DROP CONSTRAINT {pk['name']}")
)

await conn.execute(text(f"ALTER TABLE {table} ADD PRIMARY KEY ({pk_cols})"))
Comment on lines +109 to +220
Copy link

Copilot AI Nov 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new migrate_checkpoints_table function lacks test coverage. Given that this migration performs destructive operations (dropping columns, deleting data), it's critical to have tests that verify:

  1. The migration works correctly on a fresh database
  2. The migration handles existing data properly
  3. The duplicate cleanup logic works as expected
  4. The primary key updates succeed

Consider adding tests similar to the existing model tests in tests/models/test_skills_schema.py.

Copilot uses AI. Check for mistakes.
Comment on lines +119 to +220
Copy link

Copilot AI Nov 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The migration logic has an ordering problem. The current structure processes each table sequentially:

  1. Add checkpoint_ns column
  2. Drop columns (checkpoint_id, parent_checkpoint_id, or version)
  3. Handle duplicates (but uses the dropped columns!)
  4. Update primary key

The duplicate cleanup (lines 183-192 for checkpoints, 202-213 for checkpoint_blobs) should happen BEFORE dropping columns in step 2. Consider restructuring to handle all pre-drop operations first, then drop columns, then update primary keys.

Copilot uses AI. Check for mistakes.
1 change: 0 additions & 1 deletion intentkit/skills/heurist/image_generation_animagine_xl.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ async def _arun(
"""
context = self.get_context()
skill_config = context.agent.skill_config(self.category)
skill_config = skill_config

# Get the Heurist API key from configuration
if "api_key" in skill_config and skill_config["api_key"]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ async def _arun(
"""
context = self.get_context()
skill_config = context.agent.skill_config(self.category)
skill_config = skill_config

# Get the Heurist API key from configuration
if "api_key" in skill_config and skill_config["api_key"]:
Expand Down
1 change: 0 additions & 1 deletion intentkit/skills/heurist/image_generation_arthemy_real.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ async def _arun(
"""
context = self.get_context()
skill_config = context.agent.skill_config(self.category)
skill_config = skill_config

# Get the Heurist API key from configuration
if "api_key" in skill_config and skill_config["api_key"]:
Expand Down
1 change: 0 additions & 1 deletion intentkit/skills/heurist/image_generation_braindance.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ async def _arun(
"""
context = self.get_context()
skill_config = context.agent.skill_config(self.category)
skill_config = skill_config

# Get the Heurist API key from configuration
if "api_key" in skill_config and skill_config["api_key"]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ async def _arun(
"""
context = self.get_context()
skill_config = context.agent.skill_config(self.category)
skill_config = skill_config

# Get the Heurist API key from configuration
if "api_key" in skill_config and skill_config["api_key"]:
Expand Down
1 change: 0 additions & 1 deletion intentkit/skills/heurist/image_generation_flux_1_dev.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ async def _arun(
"""
context = self.get_context()
skill_config = context.agent.skill_config(self.category)
skill_config = skill_config

# Get the Heurist API key from configuration
if "api_key" in skill_config and skill_config["api_key"]:
Expand Down
1 change: 0 additions & 1 deletion intentkit/skills/heurist/image_generation_sdxl.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ async def _arun(
"""
context = self.get_context()
skill_config = context.agent.skill_config(self.category)
skill_config = skill_config

# Get the Heurist API key from configuration
if "api_key" in skill_config and skill_config["api_key"]:
Expand Down
Loading