diff --git a/intentkit/models/db.py b/intentkit/models/db.py index e2dc37fd..844fa855 100644 --- a/intentkit/models/db.py +++ b/intentkit/models/db.py @@ -161,5 +161,5 @@ def get_checkpointer() -> AsyncShallowPostgresSaver: AsyncShallowPostgresSaver: The AsyncShallowPostgresSaver instance """ if _checkpointer is None: - raise RuntimeError("Database pool not initialized. Call init_db first.") + raise RuntimeError("Database checkpointer not initialized. Call init_db first.") return _checkpointer diff --git a/intentkit/models/db_mig.py b/intentkit/models/db_mig.py index fa18f06f..42d0cc57 100644 --- a/intentkit/models/db_mig.py +++ b/intentkit/models/db_mig.py @@ -96,8 +96,125 @@ async def update_table_wrapper(): await update_table_schema(conn, dialect, model_cls) await update_table_wrapper() + + # Migrate checkpoints tables + await migrate_checkpoints_table(conn) except Exception as e: logger.error(f"Error updating database schema: {str(e)}") raise logger.info("Database schema updated successfully") + + +async def migrate_checkpoints_table(conn) -> None: + """Migrate checkpoints tables to support langgraph 2.0.""" + tables = ["checkpoints", "checkpoint_blobs", "checkpoint_writes"] + + def _get_tables(connection): + insp = inspect(connection) + return insp.get_table_names() + + existing_tables = await conn.run_sync(_get_tables) + + for table in tables: + if table not in existing_tables: + continue + + # 1. Add checkpoint_ns column + await conn.execute( + text( + f"ALTER TABLE {table} ADD COLUMN IF NOT EXISTS checkpoint_ns TEXT DEFAULT ''" + ) + ) + + # 2. Drop columns that ShallowPostgresSaver doesn't use + if table == "checkpoints": + # ShallowPostgresSaver doesn't use checkpoint_id or parent_checkpoint_id + await conn.execute( + text("ALTER TABLE checkpoints DROP COLUMN IF EXISTS checkpoint_id") + ) + await conn.execute( + text( + "ALTER TABLE checkpoints DROP COLUMN IF EXISTS parent_checkpoint_id" + ) + ) + elif table == "checkpoint_blobs": + # ShallowPostgresSaver doesn't use version column + await conn.execute( + text("ALTER TABLE checkpoint_blobs DROP COLUMN IF EXISTS version") + ) + + # 3. Update Primary Key + def _check_pk(connection, table_name=table): + insp = inspect(connection) + return insp.get_pk_constraint(table_name) + + pk = await conn.run_sync(_check_pk) + current_cols = set(pk.get("constrained_columns", [])) + + # Expected columns depend on table + expected_cols = set() + pk_cols = "" + if table == "checkpoints": + expected_cols = {"thread_id", "checkpoint_ns"} + pk_cols = "thread_id, checkpoint_ns" + elif table == "checkpoint_blobs": + expected_cols = {"thread_id", "checkpoint_ns", "channel"} + pk_cols = "thread_id, checkpoint_ns, channel" + elif table == "checkpoint_writes": + expected_cols = { + "thread_id", + "checkpoint_ns", + "checkpoint_id", + "task_id", + "idx", + } + pk_cols = "thread_id, checkpoint_ns, checkpoint_id, task_id, idx" + + if current_cols != expected_cols: + logger.info(f"Migrating {table} PK from {current_cols} to {expected_cols}") + + # If migrating checkpoints to (thread_id, checkpoint_ns), we need to handle duplicates + if table == "checkpoints" and expected_cols == { + "thread_id", + "checkpoint_ns", + }: + # Keep only the latest checkpoint for each (thread_id, checkpoint_ns) based on checkpoint_id (time-ordered) + await conn.execute( + text(""" + DELETE FROM checkpoints + WHERE (thread_id, checkpoint_ns, checkpoint_id) NOT IN ( + SELECT thread_id, checkpoint_ns, MAX(checkpoint_id) + FROM checkpoints + GROUP BY thread_id, checkpoint_ns + ) + """) + ) + + # If migrating checkpoint_blobs to (thread_id, checkpoint_ns, channel), we need to handle duplicates + elif table == "checkpoint_blobs" and expected_cols == { + "thread_id", + "checkpoint_ns", + "channel", + }: + # Keep only blobs that are referenced by the remaining checkpoints + # The relationship is: checkpoints.checkpoint -> 'channel_versions' ->> blob.channel = blob.version + await conn.execute( + text(""" + DELETE FROM checkpoint_blobs cb + WHERE NOT EXISTS ( + SELECT 1 + FROM checkpoints cp + WHERE cp.thread_id = cb.thread_id + AND cp.checkpoint_ns = cb.checkpoint_ns + AND (cp.checkpoint -> 'channel_versions' ->> cb.channel) = cb.version + ) + """) + ) + + if pk.get("name"): + await conn.execute( + text(f"ALTER TABLE {table} DROP CONSTRAINT {pk['name']}") + ) + + await conn.execute(text(f"ALTER TABLE {table} ADD PRIMARY KEY ({pk_cols})")) diff --git a/intentkit/skills/heurist/image_generation_animagine_xl.py b/intentkit/skills/heurist/image_generation_animagine_xl.py index df5b3b7f..4c6cc409 100644 --- a/intentkit/skills/heurist/image_generation_animagine_xl.py +++ b/intentkit/skills/heurist/image_generation_animagine_xl.py @@ -77,7 +77,6 @@ async def _arun( """ context = self.get_context() skill_config = context.agent.skill_config(self.category) - skill_config = skill_config # Get the Heurist API key from configuration if "api_key" in skill_config and skill_config["api_key"]: diff --git a/intentkit/skills/heurist/image_generation_arthemy_comics.py b/intentkit/skills/heurist/image_generation_arthemy_comics.py index 14a84ba3..9743234f 100644 --- a/intentkit/skills/heurist/image_generation_arthemy_comics.py +++ b/intentkit/skills/heurist/image_generation_arthemy_comics.py @@ -76,7 +76,6 @@ async def _arun( """ context = self.get_context() skill_config = context.agent.skill_config(self.category) - skill_config = skill_config # Get the Heurist API key from configuration if "api_key" in skill_config and skill_config["api_key"]: diff --git a/intentkit/skills/heurist/image_generation_arthemy_real.py b/intentkit/skills/heurist/image_generation_arthemy_real.py index e5916f42..e0a3e3fe 100644 --- a/intentkit/skills/heurist/image_generation_arthemy_real.py +++ b/intentkit/skills/heurist/image_generation_arthemy_real.py @@ -76,7 +76,6 @@ async def _arun( """ context = self.get_context() skill_config = context.agent.skill_config(self.category) - skill_config = skill_config # Get the Heurist API key from configuration if "api_key" in skill_config and skill_config["api_key"]: diff --git a/intentkit/skills/heurist/image_generation_braindance.py b/intentkit/skills/heurist/image_generation_braindance.py index 9b05b4d4..08152d06 100644 --- a/intentkit/skills/heurist/image_generation_braindance.py +++ b/intentkit/skills/heurist/image_generation_braindance.py @@ -76,7 +76,6 @@ async def _arun( """ context = self.get_context() skill_config = context.agent.skill_config(self.category) - skill_config = skill_config # Get the Heurist API key from configuration if "api_key" in skill_config and skill_config["api_key"]: diff --git a/intentkit/skills/heurist/image_generation_cyber_realistic_xl.py b/intentkit/skills/heurist/image_generation_cyber_realistic_xl.py index 7628b450..9d136a2f 100644 --- a/intentkit/skills/heurist/image_generation_cyber_realistic_xl.py +++ b/intentkit/skills/heurist/image_generation_cyber_realistic_xl.py @@ -76,7 +76,6 @@ async def _arun( """ context = self.get_context() skill_config = context.agent.skill_config(self.category) - skill_config = skill_config # Get the Heurist API key from configuration if "api_key" in skill_config and skill_config["api_key"]: diff --git a/intentkit/skills/heurist/image_generation_flux_1_dev.py b/intentkit/skills/heurist/image_generation_flux_1_dev.py index 3e437105..caf2ad36 100644 --- a/intentkit/skills/heurist/image_generation_flux_1_dev.py +++ b/intentkit/skills/heurist/image_generation_flux_1_dev.py @@ -76,7 +76,6 @@ async def _arun( """ context = self.get_context() skill_config = context.agent.skill_config(self.category) - skill_config = skill_config # Get the Heurist API key from configuration if "api_key" in skill_config and skill_config["api_key"]: diff --git a/intentkit/skills/heurist/image_generation_sdxl.py b/intentkit/skills/heurist/image_generation_sdxl.py index 79ca4d2a..2115f1b0 100644 --- a/intentkit/skills/heurist/image_generation_sdxl.py +++ b/intentkit/skills/heurist/image_generation_sdxl.py @@ -76,7 +76,6 @@ async def _arun( """ context = self.get_context() skill_config = context.agent.skill_config(self.category) - skill_config = skill_config # Get the Heurist API key from configuration if "api_key" in skill_config and skill_config["api_key"]: