
Conversation


@spuckhafte spuckhafte commented Dec 23, 2025

Fixes #XXXX.

Description


This PR has:

  • been tested to ensure log ingestion and log query work.
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
  • added documentation for new or modified features or behaviors.

Summary by CodeRabbit

  • Bug Fixes

    • Stream deletion now attempts to remove orphaned ("zombie") filters; cleanup failures are logged and no longer cause the overall deletion to fail.
  • New Features

    • Deletion workflow returns and logs cleanup results with counts of successful and failed filter removals to aid auditing and troubleshooting.



coderabbitai bot commented Dec 23, 2025

Walkthrough

Calls a new async delete_zombie_filters(stream_name) after logstream deletion to remove orphaned filters from the metastore and in-memory FILTERS; the helper errors if the stream still exists and returns counts of successful and failed deletions. Cleanup failures are logged and non-fatal.

Changes

Cohort / File(s) Summary
Stream deletion updates
src/handlers/http/logstream.rs, src/handlers/http/modal/query/querier_logstream.rs
After deleting stream stats, invokes delete_zombie_filters(&stream_name).await, logs returned ZombieResourceCleanupOk; failures are caught and logged without affecting primary delete response.
Zombie filters deletion utility
src/handlers/http/modal/utils/logstream_utils.rs
Added pub async fn delete_zombie_filters(stream_name: &str) -> Result<ZombieResourceCleanupOk, MetastoreError> and pub struct ZombieResourceCleanupOk { pub ok_deletions: i32, pub failed_deletions: i32 }. Verifies stream absence in PARSEABLE.streams, enumerates filters from PARSEABLE.metastore, matches candidates by name or SQL regex (sql_db_regex), deletes matching filters from metastore, removes in-memory FILTERS entries only after successful storage deletions, and returns counts; regex errors map to MetastoreError::Error(INTERNAL_SERVER_ERROR).
Metastore error enum
src/metastore/mod.rs
Added public enum variant ZombieResourceStreamStillExists { stream_name: String, resource_type: String }.
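
For orientation, here is a minimal sketch of the public surface this walkthrough describes. The signatures and field names mirror the summary above; the body is only an outline of the described steps, not the PR's actual code.

pub struct ZombieResourceCleanupOk {
    pub ok_deletions: i32,
    pub failed_deletions: i32,
}

pub async fn delete_zombie_filters(
    stream_name: &str,
) -> Result<ZombieResourceCleanupOk, MetastoreError> {
    // 1. error out with ZombieResourceStreamStillExists if the stream is live
    // 2. enumerate all filters and match candidates by name or SQL regex
    // 3. delete matches from the metastore, then drop in-memory FILTERS entries
    // 4. return counts of successful and failed deletions
    todo!()
}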

Sequence Diagram(s)

sequenceDiagram
    autonumber
    participant H as LogStream Handler
    participant S as PARSEABLE.streams
    participant M as PARSEABLE.metastore
    participant C as FILTERS (in-memory)

    H->>H: delete stream stats
    H->>S: check stream exists?(stream_name)
    alt stream still present
        S-->>H: exists
        note right of H: returns MetastoreError::ZombieResourceStreamStillExists
    else stream absent
        S-->>H: not found
        H->>M: get_filters()
        M-->>H: filters[]
        H->>H: select matching filters (by name or sql regex)
        loop per matching filter
            H->>M: delete_filter(filter_id)
            alt delete success
                M-->>H: OK
                H->>C: remove FILTERS entry (if present)
            else delete failure
                M-->>H: Error (logged)
            end
        end
        H-->>H: compute ZombieResourceCleanupOk(ok_deletions, failed_deletions)
        H->>H: log cleanup_stats (non-fatal)
    end
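
In handler terms, the non-fatal integration described above would look roughly like this; a sketch assuming the tracing crate, not the exact code in the PR:

match delete_zombie_filters(&stream_name).await {
    // cleanup stats are logged for auditing; counts come from the helper
    Ok(stats) => tracing::info!(?stats, "zombie filter cleanup finished"),
    // failures are logged and deliberately do not fail the delete response
    Err(err) => tracing::warn!(?err, "zombie filter cleanup failed"),
}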

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Suggested reviewers

  • nikhilsinhaparseable

Poem

🐇 I hop through logs where stray filters creep,
I sniff the SQL and gently sweep,
I count the clears and mark the few that stay,
I tidy cache then wend away,
A small clean feast — then off to play.

Pre-merge checks and finishing touches

❌ Failed checks (2 warnings)
  • Description check (⚠️ Warning): The PR description is incomplete; it contains only the template structure with no actual implementation details, rationale, or key changes filled in, and all verification checklist items are unchecked. Resolution: fill in the Description section with the goal, solution rationale, and key changes made, and complete or verify the testing checklist items before merging.
  • Docstring Coverage (⚠️ Warning): Docstring coverage is 46.15%, below the required threshold of 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.

✅ Passed checks (1 passed)

  • Title check (✅ Passed): The title clearly and concisely describes the main change: removing associated filters during stream deletion, which directly aligns with the changeset.



CLA Assistant Lite bot:
Thank you for your submission; we really appreciate it. Like many open-source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution. You can sign the CLA by posting a pull request comment in the format below.


I have read the CLA Document and I hereby sign the CLA


You can retrigger this bot by commenting recheck in this Pull Request


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

📜 Review details

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4e67fc6 and aca2671.

📒 Files selected for processing (1)
  • src/handlers/http/logstream.rs
🧰 Additional context used
🧠 Learnings (4)
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
Repo: parseablehq/parseable PR: 1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.

Applied to files:

  • src/handlers/http/logstream.rs
📚 Learning: 2025-08-25T01:31:41.786Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/metadata.rs:63-68
Timestamp: 2025-08-25T01:31:41.786Z
Learning: The TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE, and TOTAL_EVENTS_STORAGE_SIZE_DATE metrics in src/metadata.rs and src/storage/object_storage.rs are designed to track total events across all streams, not per-stream. They use labels [origin, parsed_date] to aggregate by format and date, while per-stream metrics use [stream_name, origin, parsed_date] labels.

Applied to files:

  • src/handlers/http/logstream.rs
📚 Learning: 2025-08-25T01:32:25.980Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/metrics/mod.rs:163-173
Timestamp: 2025-08-25T01:32:25.980Z
Learning: The TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE, and TOTAL_EVENTS_STORAGE_SIZE_DATE metrics in src/metrics/mod.rs are intentionally designed to track global totals across all streams for a given date, using labels ["format", "date"] rather than per-stream labels. This is the correct design for global aggregation purposes.

Applied to files:

  • src/handlers/http/logstream.rs
📚 Learning: 2025-10-28T02:10:41.140Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1453
File: src/parseable/mod.rs:397-400
Timestamp: 2025-10-28T02:10:41.140Z
Learning: In Parseable enterprise deployments with multiple query nodes, hot tier configuration must be persisted in object storage so that newly started query nodes can fetch and synchronize the hot tier settings at startup (file: src/parseable/mod.rs, function: create_stream_and_schema_from_storage).

Applied to files:

  • src/handlers/http/logstream.rs
🔇 Additional comments (1)
src/handlers/http/logstream.rs (1)

32-32: LGTM!

The import of Filter is necessary for the new filter deletion logic and is correctly placed.

coderabbitai bot previously approved these changes Dec 23, 2025

@spuckhafte spuckhafte left a comment


Make PARSEABLE::metastore::get_filters queryable by logstream name or id.

Its current implementation fetches all filters across all existing logstreams at once.
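
A minimal sketch of what such a stream-scoped accessor could look like; get_filters_for_stream is a hypothetical method name, and the async_trait attribute is assumed to match the existing metastore trait style:

#[async_trait::async_trait]
pub trait MetastoreFilterQueries {
    /// Hypothetical: fetch only the filters bound to one logstream,
    /// instead of loading every filter and discarding most of them.
    async fn get_filters_for_stream(
        &self,
        stream_name: &str,
    ) -> Result<Vec<Filter>, MetastoreError>;
}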


@parmesant parmesant left a comment


Let's move filter deletion so that it happens after stream deletion. Also, moving the filter deletion logic into a utility function would be better.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (2)
src/parseable/mod.rs (1)

238-245: Simplify the boolean return expression.

The if-else construct is redundant when returning a boolean derived from a condition. This can be simplified to a single expression.

🔎 Proposed simplification
-    // check if a stream exists
-    pub fn check_stream_exists(&self, stream_name: &str) -> bool {
-        if self.streams.contains(stream_name) {
-            return true;
-        } else {
-            return false;
-        }
-    }
+    /// Checks if a stream exists in the in-memory streams map.
+    pub fn check_stream_exists(&self, stream_name: &str) -> bool {
+        self.streams.contains(stream_name)
+    }
src/metastore/metastores/object_store_metastore.rs (1)

549-579: Use self methods instead of global PARSEABLE.metastore calls.

The implementation calls PARSEABLE.metastore.get_filters() and PARSEABLE.metastore.delete_filter() instead of using self.get_filters() and self.delete_filter(). This is inconsistent with other methods in this trait implementation and creates unnecessary indirection through the global static. Using self is more idiomatic and avoids the implicit assumption that PARSEABLE.metastore is the same instance.

🔎 Proposed fix
     // clear filters associated to a deleted stream
     async fn delete_zombie_filters(&self, stream_name: &str) -> Result<bool, MetastoreError> {
         // stream should not exist in order to have zombie filters
         if PARSEABLE.check_stream_exists(stream_name) {
             warn!("no zombie filters cleared for [undeleted] stream {}", stream_name);
             return Ok(false);
         }

-        let all_filters = match PARSEABLE.metastore.get_filters().await {
-            Ok(all_f) => all_f,
-            Err(e) => {
-                return Err(e);
-            }
-        };
+        let all_filters = self.get_filters().await?;

         // collect filters associated with the logstream being deleted
         let filters_for_stream: Vec<Filter> = all_filters
             .into_iter()
             .filter(|filter| filter.stream_name == stream_name)
             .collect();

         for filter in filters_for_stream.iter() {
-            PARSEABLE.metastore.delete_filter(filter).await?;
-            
+            self.delete_filter(filter).await?;
+
             if let Some(filter_id) = filter.filter_id.as_ref() {
                 FILTERS.delete_filter(filter_id).await;
             }
         }

-        return Ok(true);
+        Ok(true)
     }
📜 Review details

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 19704d9 and d812994.

📒 Files selected for processing (4)
  • src/handlers/http/logstream.rs
  • src/metastore/metastore_traits.rs
  • src/metastore/metastores/object_store_metastore.rs
  • src/parseable/mod.rs
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/handlers/http/logstream.rs
🧰 Additional context used
🧠 Learnings (7)
📓 Common learnings
Learnt from: de-sh
Repo: parseablehq/parseable PR: 1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
Repo: parseablehq/parseable PR: 1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.

Applied to files:

  • src/parseable/mod.rs
  • src/metastore/metastores/object_store_metastore.rs
📚 Learning: 2025-10-21T02:22:24.403Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1448
File: src/parseable/mod.rs:419-432
Timestamp: 2025-10-21T02:22:24.403Z
Learning: In Parseable's internal stream creation (`create_internal_stream_if_not_exists` in `src/parseable/mod.rs`), errors should not propagate to fail server initialization. The function creates both pmeta and pbilling internal streams, and failures are logged but the function always returns `Ok(())` to ensure server startup resilience. Individual stream creation failures should not prevent syncing of successfully created streams.

Applied to files:

  • src/parseable/mod.rs
📚 Learning: 2025-07-28T17:10:39.448Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1392
File: src/migration/stream_metadata_migration.rs:303-322
Timestamp: 2025-07-28T17:10:39.448Z
Learning: In Parseable's migration system (src/migration/stream_metadata_migration.rs), each migration function updates the metadata to the current latest format using CURRENT_OBJECT_STORE_VERSION and CURRENT_SCHEMA_VERSION constants, rather than producing incremental versions. For example, v5_v6 function produces v7 format output when these constants are set to "v7", not v6 format.

Applied to files:

  • src/metastore/metastores/object_store_metastore.rs
📚 Learning: 2025-10-28T02:10:41.140Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1453
File: src/parseable/mod.rs:397-400
Timestamp: 2025-10-28T02:10:41.140Z
Learning: In Parseable enterprise deployments with multiple query nodes, hot tier configuration must be persisted in object storage so that newly started query nodes can fetch and synchronize the hot tier settings at startup (file: src/parseable/mod.rs, function: create_stream_and_schema_from_storage).

Applied to files:

  • src/metastore/metastores/object_store_metastore.rs
📚 Learning: 2025-09-18T09:52:07.554Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/storage/object_storage.rs:173-177
Timestamp: 2025-09-18T09:52:07.554Z
Learning: In Parseable's upload system (src/storage/object_storage.rs), the update_storage_metrics function can safely use path.metadata().map_err() to fail on local file metadata read failures because parquet validation (validate_uploaded_parquet_file) ensures file integrity before this step, and the system guarantees local staging files remain accessible throughout the upload flow.

Applied to files:

  • src/metastore/metastores/object_store_metastore.rs
📚 Learning: 2025-09-06T04:26:17.191Z
Learnt from: parmesant
Repo: parseablehq/parseable PR: 1424
File: src/enterprise/utils.rs:65-72
Timestamp: 2025-09-06T04:26:17.191Z
Learning: In Parseable's metastore implementation, MetastoreError::to_detail() returns a MetastoreErrorDetail struct (not a string), which contains structured error information including operation, message, stream_name, and other contextual fields. This struct is designed to be boxed in ObjectStorageError::MetastoreError(Box<MetastoreErrorDetail>).

Applied to files:

  • src/metastore/metastores/object_store_metastore.rs
🧬 Code graph analysis (2)
src/metastore/metastore_traits.rs (2)
src/metastore/metastores/object_store_metastore.rs (1)
  • delete_zombie_filters (550-579)
src/validator.rs (1)
  • stream_name (36-71)
src/metastore/metastores/object_store_metastore.rs (2)
src/users/filters.rs (1)
  • migrate_v1_v2 (181-193)
src/metastore/metastore_traits.rs (1)
  • delete_zombie_filters (111-111)
🔇 Additional comments (3)
src/metastore/metastore_traits.rs (1)

111-111: LGTM - trait method signature is appropriate.

The new delete_zombie_filters method follows the existing trait patterns and is logically grouped with other filter methods. Consider adding a doc comment to clarify the return semantics (true = filters deleted, false = stream still exists).
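
For instance, a doc comment along these lines would capture the boolean semantics (illustrative wording, not committed code):

/// Deletes filters left behind by a deleted stream.
///
/// Returns `Ok(true)` once zombie filters have been cleared, and `Ok(false)`
/// without deleting anything when the stream still exists.
async fn delete_zombie_filters(&self, stream_name: &str) -> Result<bool, MetastoreError>;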

src/metastore/metastores/object_store_metastore.rs (2)

59-59: LGTM - import updated correctly.

The FILTERS import is necessary for in-memory cache cleanup during zombie filter deletion.


570-576: Consider partial failure handling.

If deletion fails mid-loop (e.g., network error on the 3rd of 5 filters), the method returns an error but some filters have already been deleted from storage. The in-memory FILTERS cache may become inconsistent with storage state. Consider whether to:

  1. Continue deleting remaining filters and collect errors
  2. Log which filters were successfully deleted before the error

This is a minor robustness concern since the operation can be retried.
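
A rough sketch of option 1, continuing past individual failures and collecting them; the error-list shape is an assumption layered on the snippet above:

let mut failed: Vec<(String, MetastoreError)> = Vec::new();
for filter in filters_for_stream.iter() {
    match self.delete_filter(filter).await {
        Ok(_) => {
            // only drop the in-memory entry once the storage deletion succeeded
            if let Some(filter_id) = filter.filter_id.as_ref() {
                FILTERS.delete_filter(filter_id).await;
            }
        }
        // keep going; record the failure instead of aborting mid-loop
        Err(e) => failed.push((filter.filter_name.clone(), e)),
    }
}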

coderabbitai bot previously approved these changes Dec 24, 2025

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (1)
src/metastore/metastores/object_store_metastore.rs (1)

549-579: LGTM! Zombie filter cleanup logic is sound.

The implementation correctly:

  • Guards against cleaning filters for active streams (line 552)
  • Fetches and filters to stream-specific filters
  • Removes from both storage and in-memory cache for consistency

The dual deletion (storage + in-memory) at lines 571-574 is essential to prevent stale filter references.

Optional: Simplify code style
-        let all_filters = match self.get_filters().await {
-            Ok(all_f) => all_f,
-            Err(e) => {
-                return Err(e);
-            }
-        };
+        let all_filters = self.get_filters().await?;
 
         // collect filters associated with the logstream being deleted
         let filters_for_stream: Vec<Filter> = all_filters
@@ -569,13 +564,11 @@
 
         for filter in filters_for_stream.iter() {
             self.delete_filter(filter).await?;
-            
             if let Some(filter_id) = filter.filter_id.as_ref() {
                 FILTERS.delete_filter(filter_id).await;
             }
         }
 
-        return Ok(true);
+        Ok(true)
     }
📜 Review details

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d812994 and 6e96d09.

📒 Files selected for processing (2)
  • src/metastore/metastores/object_store_metastore.rs
  • src/parseable/mod.rs
🧰 Additional context used
🧠 Learnings (8)
📓 Common learnings
Learnt from: de-sh
Repo: parseablehq/parseable PR: 1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.
📚 Learning: 2025-10-28T02:10:41.140Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1453
File: src/parseable/mod.rs:397-400
Timestamp: 2025-10-28T02:10:41.140Z
Learning: In Parseable enterprise deployments with multiple query nodes, hot tier configuration must be persisted in object storage so that newly started query nodes can fetch and synchronize the hot tier settings at startup (file: src/parseable/mod.rs, function: create_stream_and_schema_from_storage).

Applied to files:

  • src/metastore/metastores/object_store_metastore.rs
📚 Learning: 2025-07-28T17:10:39.448Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1392
File: src/migration/stream_metadata_migration.rs:303-322
Timestamp: 2025-07-28T17:10:39.448Z
Learning: In Parseable's migration system (src/migration/stream_metadata_migration.rs), each migration function updates the metadata to the current latest format using CURRENT_OBJECT_STORE_VERSION and CURRENT_SCHEMA_VERSION constants, rather than producing incremental versions. For example, v5_v6 function produces v7 format output when these constants are set to "v7", not v6 format.

Applied to files:

  • src/metastore/metastores/object_store_metastore.rs
📚 Learning: 2025-09-18T09:52:07.554Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/storage/object_storage.rs:173-177
Timestamp: 2025-09-18T09:52:07.554Z
Learning: In Parseable's upload system (src/storage/object_storage.rs), the update_storage_metrics function can safely use path.metadata().map_err() to fail on local file metadata read failures because parquet validation (validate_uploaded_parquet_file) ensures file integrity before this step, and the system guarantees local staging files remain accessible throughout the upload flow.

Applied to files:

  • src/metastore/metastores/object_store_metastore.rs
📚 Learning: 2025-09-06T04:26:17.191Z
Learnt from: parmesant
Repo: parseablehq/parseable PR: 1424
File: src/enterprise/utils.rs:65-72
Timestamp: 2025-09-06T04:26:17.191Z
Learning: In Parseable's metastore implementation, MetastoreError::to_detail() returns a MetastoreErrorDetail struct (not a string), which contains structured error information including operation, message, stream_name, and other contextual fields. This struct is designed to be boxed in ObjectStorageError::MetastoreError(Box<MetastoreErrorDetail>).

Applied to files:

  • src/metastore/metastores/object_store_metastore.rs
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
Repo: parseablehq/parseable PR: 1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.

Applied to files:

  • src/metastore/metastores/object_store_metastore.rs
  • src/parseable/mod.rs
📚 Learning: 2025-10-21T02:22:24.403Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1448
File: src/parseable/mod.rs:419-432
Timestamp: 2025-10-21T02:22:24.403Z
Learning: In Parseable's internal stream creation (`create_internal_stream_if_not_exists` in `src/parseable/mod.rs`), errors should not propagate to fail server initialization. The function creates both pmeta and pbilling internal streams, and failures are logged but the function always returns `Ok(())` to ensure server startup resilience. Individual stream creation failures should not prevent syncing of successfully created streams.

Applied to files:

  • src/parseable/mod.rs
📚 Learning: 2025-09-05T09:18:44.813Z
Learnt from: parmesant
Repo: parseablehq/parseable PR: 1425
File: src/query/mod.rs:484-495
Timestamp: 2025-09-05T09:18:44.813Z
Learning: In the Parseable system, stream names and column names cannot contain quotes, which eliminates SQL injection concerns when interpolating these identifiers directly into SQL queries in src/query/mod.rs.

Applied to files:

  • src/parseable/mod.rs
🧬 Code graph analysis (2)
src/metastore/metastores/object_store_metastore.rs (2)
src/users/filters.rs (1)
  • migrate_v1_v2 (181-193)
src/metastore/metastore_traits.rs (1)
  • delete_zombie_filters (111-111)
src/parseable/mod.rs (1)
src/validator.rs (1)
  • stream_name (36-71)
🔇 Additional comments (2)
src/parseable/mod.rs (1)

238-241: LGTM! Clean helper method.

The synchronous in-memory stream existence check is straightforward and fits the need for the metastore filter cleanup operation. The naming clearly conveys its purpose.

src/metastore/metastores/object_store_metastore.rs (1)

59-59: LGTM! Import supports in-memory cache cleanup.

The addition of FILTERS to imports is necessary for the in-memory filter deletion at line 574.

coderabbitai bot previously approved these changes Dec 24, 2025
@nitisht nitisht requested a review from parmesant December 27, 2025 09:39

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (2)
src/handlers/http/modal/utils/logstream_utils.rs (2)

112-125: Consider moving in-memory cache deletion inside the success branch.

Currently, if delete_filter fails on the metastore, the warning is logged but the in-memory FILTERS cache is still cleaned up (lines 122-124). This creates an inconsistency where the filter exists in persistent storage but not in memory—on restart, the filter would reappear.

Consider deleting from the cache only when metastore deletion succeeds:

🔎 Proposed fix
     for filter in filters_for_stream.iter() {
-        if let Err(err) = PARSEABLE.metastore.delete_filter(filter).await {
+        match PARSEABLE.metastore.delete_filter(filter).await {
+            Ok(_) => {
+                if let Some(filter_id) = filter.filter_id.as_ref() {
+                    FILTERS.delete_filter(filter_id).await;
+                }
+            }
+            Err(err) => {
             tracing::warn!(
                 "failed to delete the zombie filter: {} \nfrom storage. For logstream: {}\nError: {:#?}", 
                 filter.filter_name,
                 stream_name,
                 err 
             );
-        }
-
-        if let Some(filter_id) = filter.filter_id.as_ref() {
-            FILTERS.delete_filter(filter_id).await;
+            }
         }
     }

127-127: Remove explicit return for idiomatic Rust.

The trailing return keyword is unnecessary when the expression is the last statement in the function.

🔎 Proposed fix
-    return Ok(());
+    Ok(())
📜 Review details

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6e96d09 and ffd81aa.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (2)
  • src/handlers/http/logstream.rs
  • src/handlers/http/modal/utils/logstream_utils.rs
🧰 Additional context used
🧠 Learnings (7)
📚 Learning: 2025-06-16T02:04:58.990Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1346
File: src/handlers/http/health_check.rs:81-90
Timestamp: 2025-06-16T02:04:58.990Z
Learning: In the shutdown function in src/handlers/http/health_check.rs, the design approach is to log errors from sync operations rather than propagate them. This is intentional because the shutdown function is called on SIGTERM/SIGINT signals, and the goal is to perform best-effort cleanup (syncing pending files to object storage) while allowing the shutdown to proceed regardless of sync failures. Logging provides debugging information without blocking the shutdown process.

Applied to files:

  • src/handlers/http/logstream.rs
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
Repo: parseablehq/parseable PR: 1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.

Applied to files:

  • src/handlers/http/logstream.rs
📚 Learning: 2025-10-21T02:22:24.403Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1448
File: src/parseable/mod.rs:419-432
Timestamp: 2025-10-21T02:22:24.403Z
Learning: In Parseable's internal stream creation (`create_internal_stream_if_not_exists` in `src/parseable/mod.rs`), errors should not propagate to fail server initialization. The function creates both pmeta and pbilling internal streams, and failures are logged but the function always returns `Ok(())` to ensure server startup resilience. Individual stream creation failures should not prevent syncing of successfully created streams.

Applied to files:

  • src/handlers/http/logstream.rs
📚 Learning: 2025-10-28T02:10:41.140Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1453
File: src/parseable/mod.rs:397-400
Timestamp: 2025-10-28T02:10:41.140Z
Learning: In Parseable enterprise deployments with multiple query nodes, hot tier configuration must be persisted in object storage so that newly started query nodes can fetch and synchronize the hot tier settings at startup (file: src/parseable/mod.rs, function: create_stream_and_schema_from_storage).

Applied to files:

  • src/handlers/http/logstream.rs
📚 Learning: 2025-08-25T01:31:41.786Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/metadata.rs:63-68
Timestamp: 2025-08-25T01:31:41.786Z
Learning: The TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE, and TOTAL_EVENTS_STORAGE_SIZE_DATE metrics in src/metadata.rs and src/storage/object_storage.rs are designed to track total events across all streams, not per-stream. They use labels [origin, parsed_date] to aggregate by format and date, while per-stream metrics use [stream_name, origin, parsed_date] labels.

Applied to files:

  • src/handlers/http/logstream.rs
  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-10-20T17:48:53.444Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1448
File: src/handlers/http/cluster/mod.rs:1370-1400
Timestamp: 2025-10-20T17:48:53.444Z
Learning: In src/handlers/http/cluster/mod.rs, the billing metrics processing logic should NOT accumulate counter values from multiple Prometheus samples with the same labels. The intended behavior is to convert each received counter from nodes into individual events for ingestion, using `.insert()` to store the counter value directly.

Applied to files:

  • src/handlers/http/logstream.rs
📚 Learning: 2025-08-25T01:32:25.980Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/metrics/mod.rs:163-173
Timestamp: 2025-08-25T01:32:25.980Z
Learning: The TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE, and TOTAL_EVENTS_STORAGE_SIZE_DATE metrics in src/metrics/mod.rs are intentionally designed to track global totals across all streams for a given date, using labels ["format", "date"] rather than per-stream labels. This is the correct design for global aggregation purposes.

Applied to files:

  • src/handlers/http/logstream.rs
🧬 Code graph analysis (2)
src/handlers/http/logstream.rs (1)
src/handlers/http/modal/utils/logstream_utils.rs (1)
  • delete_zombie_filters (96-128)
src/handlers/http/modal/utils/logstream_utils.rs (2)
src/parseable/mod.rs (1)
  • storage (284-286)
src/storage/mod.rs (1)
  • from (182-188)
🔇 Additional comments (4)
src/handlers/http/logstream.rs (2)

23-23: LGTM!

Import correctly added for the new zombie filters cleanup utility.


83-90: LGTM with a note on failure semantics.

The zombie filter cleanup is correctly integrated after the stream is removed from memory. The error conversion to Anyhow loses type information but is acceptable for this use case.

Note: If this call fails after the stream has already been deleted from storage/memory (lines 61-79), the user receives an error response even though the stream deletion effectively succeeded. This is a reasonable trade-off since filter cleanup errors are logged with warnings and the function continues processing remaining filters.

src/handlers/http/modal/utils/logstream_utils.rs (2)

21-32: LGTM!

Imports are correctly updated to include the necessary types for the zombie filters cleanup feature.


82-93: LGTM!

Error types are well-defined using thiserror. The StreamFoundForZombieFilters wrapper provides clear error messaging, and the enum properly distinguishes between the stream-still-exists case and metastore errors.

coderabbitai bot previously approved these changes Dec 28, 2025

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (1)
src/handlers/http/modal/utils/logstream_utils.rs (1)

114-119: Prefer structured logging over embedded newlines.

The log message uses \n characters which may not render well in structured logging systems (JSON logs, etc.). Using tracing's field syntax is more portable.

🔎 Suggested structured logging format
-            tracing::warn!(
-                "failed to delete the zombie filter: {} \nfrom storage. For logstream: {}\nError: {:#?}", 
-                filter.filter_name,
-                stream_name,
-                err 
-            );
+            tracing::warn!(
+                filter_name = %filter.filter_name,
+                stream_name = %stream_name,
+                error = ?err,
+                "failed to delete zombie filter from storage"
+            );
📜 Review details

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ffd81aa and fbc3b52.

📒 Files selected for processing (2)
  • src/handlers/http/logstream.rs
  • src/handlers/http/modal/utils/logstream_utils.rs
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/handlers/http/logstream.rs
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-08-25T01:31:41.786Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/metadata.rs:63-68
Timestamp: 2025-08-25T01:31:41.786Z
Learning: The TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE, and TOTAL_EVENTS_STORAGE_SIZE_DATE metrics in src/metadata.rs and src/storage/object_storage.rs are designed to track total events across all streams, not per-stream. They use labels [origin, parsed_date] to aggregate by format and date, while per-stream metrics use [stream_name, origin, parsed_date] labels.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
Repo: parseablehq/parseable PR: 1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
🧬 Code graph analysis (1)
src/handlers/http/modal/utils/logstream_utils.rs (2)
src/parseable/mod.rs (1)
  • storage (284-286)
src/storage/mod.rs (1)
  • from (182-188)
🔇 Additional comments (3)
src/handlers/http/modal/utils/logstream_utils.rs (3)

21-32: Imports look correct for the new functionality.

The added imports support the new delete_zombie_filters function and error types appropriately.


82-93: Error types are well-defined.

The error enum and wrapper struct are appropriately structured. Using #[from] for automatic conversions is idiomatic.


104-110: The metastore trait does not support filtering by stream_name. The only available method is get_filters() which returns all filters regardless of stream. The current client-side filtering approach in lines 107-110 is the only viable implementation given the metastore API design.

Likely an incorrect or invalid review comment.

Comment on lines 112 to 125
for filter in filters_for_stream.iter() {
    if let Err(err) = PARSEABLE.metastore.delete_filter(filter).await {
        tracing::warn!(
            "failed to delete the zombie filter: {} \nfrom storage. For logstream: {}\nError: {:#?}",
            filter.filter_name,
            stream_name,
            err
        );
    } else { // ok: have the filter removed from memory only when the storage deletion succeeds
        if let Some(filter_id) = filter.filter_id.as_ref() {
            FILTERS.delete_filter(filter_id).await;
        }
    }
}

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

# Find Filter struct definition
rg -n "struct Filter" -A 10 --type rust

Repository: parseablehq/parseable

Length of output: 4126


🏁 Script executed:

# Check how delete_zombie_filters is called
rg -n "delete_zombie_filters" --type rust -B 2 -A 2

Repository: parseablehq/parseable

Length of output: 1351


🏁 Script executed:

# Find where filter_id is used and if it's ever None
rg -n "filter_id" --type rust -C 3 | head -150

Repository: parseablehq/parseable

Length of output: 10295


🏁 Script executed:

# Find complete delete_zombie_filters implementation
rg -n "pub async fn delete_zombie_filters" -A 30 --type rust src/handlers/http/modal/utils/logstream_utils.rs

Repository: parseablehq/parseable

Length of output: 1442


🏁 Script executed:

# Find get_filters_for_stream implementation
rg -n "get_filters_for_stream" --type rust -A 10

Repository: parseablehq/parseable

Length of output: 47


🏁 Script executed:

# Check ZombieFiltersDeletionError definition
rg -n "enum ZombieFiltersDeletionError\|struct ZombieFiltersDeletionError" --type rust -A 5

Repository: parseablehq/parseable

Length of output: 47


🏁 Script executed:

# Search for ZombieFiltersDeletionError
rg -n "ZombieFiltersDeletionError" --type rust

Repository: parseablehq/parseable

Length of output: 422


🏁 Script executed:

# Check the file containing delete_zombie_filters for error definitions
head -100 src/handlers/http/modal/utils/logstream_utils.rs

Repository: parseablehq/parseable

Length of output: 3705


🏁 Script executed:

# Check how PARSEABLE.metastore.delete_filter is defined and what it returns
rg -n "fn delete_filter" --type rust -A 5

Repository: parseablehq/parseable

Length of output: 1590


🏁 Script executed:

# Check how filters are loaded from metastore and if filter_id is always set
rg -n "get_filters\(" --type rust -A 10 | head -80

Repository: parseablehq/parseable

Length of output: 3834


🏁 Script executed:

# Check ObjectStoreMetastore implementation of get_filters
rg -n "async fn get_filters" --type rust -A 20

Repository: parseablehq/parseable

Length of output: 3827


🏁 Script executed:

# Get the complete get_filters implementation
sed -n '457,530p' src/metastore/metastores/object_store_metastore.rs

Repository: parseablehq/parseable

Length of output: 3410


Partial filter deletion failures are silently swallowed, creating potential cleanup inconsistencies during stream deletion.

The function always returns Ok(()) even when individual filter deletions fail (line 113-119). While failures are logged, the caller (logstream.rs:85-87) uses the ? operator expecting error propagation on critical cleanup failures. This mismatch means incomplete filter cleanup during stream deletion will silently succeed.

Additionally, if a filter has filter_id = None, the metastore deletion succeeds (line 113) but the in-memory cache is never updated (line 121-123 only executes for Some), creating a storage/memory inconsistency.

Consider:

  1. Track and propagate deletion failures so the caller can handle incomplete cleanup appropriately
  2. Add a warning when filter_id is None after successful storage deletion
  3. Use structured logging fields instead of \n in the warning message
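
Point 1 could be as simple as turning a non-zero failure count into an error; PartialCleanup is a hypothetical variant, and the counter names are assumptions, not existing code:

// hypothetical error variant that forces callers to handle incomplete cleanup
if undeleted_filter_count > 0 {
    return Err(ZombieFiltersDeletionError::PartialCleanup {
        ok: deleted_filter_count,
        failed: undeleted_filter_count,
    });
}
Ok(())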


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (1)
src/handlers/http/modal/utils/logstream_utils.rs (1)

102-119: Critical: Partial deletion failures still treated as success.

This is the same core issue flagged in the previous review. The function returns Ok(ZombieResourceCleanupOk { failed_deletions: ... }) even when some deletions fail. Since the caller uses .await?, any result with failed_deletions > 0 is treated as a complete success, leaving orphaned filters in storage without signaling the failure.

Consider propagating an error when failed_deletions > 0, or use a custom error type that forces the caller to handle partial failures explicitly.

Additionally, when filter_id is None after successful storage deletion (line 113), consider logging a warning to flag potential data inconsistencies.

📜 Review details

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between fbc3b52 and 332485d.

📒 Files selected for processing (3)
  • src/handlers/http/logstream.rs
  • src/handlers/http/modal/utils/logstream_utils.rs
  • src/metastore/mod.rs
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/handlers/http/logstream.rs
🧰 Additional context used
🧠 Learnings (4)
📚 Learning: 2025-09-06T04:26:17.191Z
Learnt from: parmesant
Repo: parseablehq/parseable PR: 1424
File: src/enterprise/utils.rs:65-72
Timestamp: 2025-09-06T04:26:17.191Z
Learning: In Parseable's metastore implementation, MetastoreError::to_detail() returns a MetastoreErrorDetail struct (not a string), which contains structured error information including operation, message, stream_name, and other contextual fields. This struct is designed to be boxed in ObjectStorageError::MetastoreError(Box<MetastoreErrorDetail>).

Applied to files:

  • src/metastore/mod.rs
📚 Learning: 2025-10-21T02:22:24.403Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1448
File: src/parseable/mod.rs:419-432
Timestamp: 2025-10-21T02:22:24.403Z
Learning: In Parseable's internal stream creation (`create_internal_stream_if_not_exists` in `src/parseable/mod.rs`), errors should not propagate to fail server initialization. The function creates both pmeta and pbilling internal streams, and failures are logged but the function always returns `Ok(())` to ensure server startup resilience. Individual stream creation failures should not prevent syncing of successfully created streams.

Applied to files:

  • src/metastore/mod.rs
  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
Repo: parseablehq/parseable PR: 1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-06-16T02:04:58.990Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1346
File: src/handlers/http/health_check.rs:81-90
Timestamp: 2025-06-16T02:04:58.990Z
Learning: In the shutdown function in src/handlers/http/health_check.rs, the design approach is to log errors from sync operations rather than propagate them. This is intentional because the shutdown function is called on SIGTERM/SIGINT signals, and the goal is to perform best-effort cleanup (syncing pending files to object storage) while allowing the shutdown to proceed regardless of sync failures. Logging provides debugging information without blocking the shutdown process.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
🧬 Code graph analysis (1)
src/handlers/http/modal/utils/logstream_utils.rs (1)
src/parseable/mod.rs (1)
  • storage (284-286)
🔇 Additional comments (5)
src/metastore/mod.rs (3)

58-59: LGTM! Error variant is well-structured.

The new error variant appropriately captures the context needed for zombie resource cleanup scenarios, with clear naming and relevant fields.


149-162: LGTM! Error detail mapping is consistent.

The implementation correctly follows the established pattern for error detail construction, properly populating all fields including structured metadata.


174-174: LGTM! Status code is appropriate.

Returning 500 INTERNAL_SERVER_ERROR is correct for this scenario, as the presence of a stream during zombie resource cleanup indicates an internal logic error in the deletion sequence.

src/handlers/http/modal/utils/logstream_utils.rs (2)

22-31: LGTM! Imports are appropriate.

All newly added imports are necessary for the delete_zombie_filters implementation.


82-89: LGTM! Guard clause is appropriate.

The stream existence check correctly prevents zombie filter cleanup when the stream is still present, with proper error propagation.

Comment on lines 127 to 131
#[derive(Debug)]
pub struct ZombieResourceCleanupOk {
    pub ok_deletions: i32,
    pub failed_deletions: i32,
}

🛠️ Refactor suggestion | 🟠 Major

Use unsigned integer types for deletion counts.

The ok_deletions and failed_deletions fields represent counts and cannot be negative. Use u32 or usize instead of i32.

🔎 Proposed fix
 #[derive(Debug)]
 pub struct ZombieResourceCleanupOk {
-    pub ok_deletions: i32,
-    pub failed_deletions: i32,
+    pub ok_deletions: u32,
+    pub failed_deletions: u32,
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
 #[derive(Debug)]
 pub struct ZombieResourceCleanupOk {
-    pub ok_deletions: i32,
-    pub failed_deletions: i32,
+    pub ok_deletions: u32,
+    pub failed_deletions: u32,
 }
🤖 Prompt for AI Agents
In src/handlers/http/modal/utils/logstream_utils.rs around lines 127 to 131, the
struct ZombieResourceCleanupOk uses signed i32 for ok_deletions and
failed_deletions even though these are non-negative counts; change their types
to an unsigned integer (prefer u32 for stable cross-platform size or usize if
you need pointer-sized indexing) and update any code that constructs or reads
these fields to use the new type (adjust literal suffixes, casts, or function
signatures that accept/return these fields accordingly).


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (1)
src/handlers/http/modal/utils/logstream_utils.rs (1)

151-159: Cache compiled regex and handle potential panic.

Line 158 unwraps Regex::new, which will panic if the pattern is invalid. While unlikely given the controlled pattern construction, consider using expect() with a message or handling the error.

More importantly, compiling the regex on every call is inefficient. For better performance, cache the compiled regex using lazy_static or std::sync::OnceLock.

Note that this heuristic may produce false positives (e.g., database name in string literals or comments) or miss complex SQL constructs. This trade-off is acceptable for cleanup, but consider documenting the limitations.

🔎 Proposed optimization using OnceLock
 fn sql_contains_db(sql: &str, db: &str) -> bool {
     let escaped = regex::escape(db);
     let pattern = format!(
         r#"(?i)(^|[\s.(])(?:`{}`|"{}"|\[{}\]|{})([\s).,]|$)"#,
         escaped, escaped, escaped, escaped
     );
-    Regex::new(&pattern).unwrap().is_match(sql)
+    // Note: Still need to compile per db name, but could cache if db set is small
+    Regex::new(&pattern)
+        .expect("sql_contains_db: pattern construction should always produce valid regex")
+        .is_match(sql)
 }

Alternatively, if the set of stream names is bounded, consider caching compiled regexes in a HashMap<String, Regex>.
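
A sketch of that HashMap-based cache, guarded by a OnceLock; sql_contains_db_cached is a hypothetical name and the locking strategy is an assumption, not code from this PR:

use std::collections::HashMap;
use std::sync::{Mutex, OnceLock};

use regex::Regex;

fn db_regex_cache() -> &'static Mutex<HashMap<String, Regex>> {
    static CACHE: OnceLock<Mutex<HashMap<String, Regex>>> = OnceLock::new();
    CACHE.get_or_init(|| Mutex::new(HashMap::new()))
}

fn sql_contains_db_cached(sql: &str, db: &str) -> bool {
    let mut cache = db_regex_cache().lock().expect("regex cache lock poisoned");
    // compile each stream's pattern at most once, then reuse it
    let re = cache.entry(db.to_owned()).or_insert_with(|| {
        let escaped = regex::escape(db);
        let pattern = format!(
            r#"(?i)(^|[\s.(])(?:`{}`|"{}"|\[{}\]|{})([\s).,]|$)"#,
            escaped, escaped, escaped, escaped
        );
        // escaping the identifier keeps the constructed pattern valid
        Regex::new(&pattern).expect("escaped identifier yields a valid regex")
    });
    re.is_match(sql)
}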

📜 Review details

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 332485d and 82dae76.

📒 Files selected for processing (1)
  • src/handlers/http/modal/utils/logstream_utils.rs
🧰 Additional context used
🧠 Learnings (4)
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
Repo: parseablehq/parseable PR: 1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-10-21T02:22:24.403Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1448
File: src/parseable/mod.rs:419-432
Timestamp: 2025-10-21T02:22:24.403Z
Learning: In Parseable's internal stream creation (`create_internal_stream_if_not_exists` in `src/parseable/mod.rs`), errors should not propagate to fail server initialization. The function creates both pmeta and pbilling internal streams, and failures are logged but the function always returns `Ok(())` to ensure server startup resilience. Individual stream creation failures should not prevent syncing of successfully created streams.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-06-16T02:04:58.990Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1346
File: src/handlers/http/health_check.rs:81-90
Timestamp: 2025-06-16T02:04:58.990Z
Learning: In the shutdown function in src/handlers/http/health_check.rs, the design approach is to log errors from sync operations rather than propagate them. This is intentional because the shutdown function is called on SIGTERM/SIGINT signals, and the goal is to perform best-effort cleanup (syncing pending files to object storage) while allowing the shutdown to proceed regardless of sync failures. Logging provides debugging information without blocking the shutdown process.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-08-25T01:31:41.786Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/metadata.rs:63-68
Timestamp: 2025-08-25T01:31:41.786Z
Learning: The TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE, and TOTAL_EVENTS_STORAGE_SIZE_DATE metrics in src/metadata.rs and src/storage/object_storage.rs are designed to track total events across all streams, not per-stream. They use labels [origin, parsed_date] to aggregate by format and date, while per-stream metrics use [stream_name, origin, parsed_date] labels.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
🔇 Additional comments (2)
src/handlers/http/modal/utils/logstream_utils.rs (2)

19-32: Imports are appropriate for the new functionality.

The added imports (Regex, MetastoreError, PARSEABLE, FILTERS, Filter, FilterType) properly support the zombie filter cleanup logic.


85-90: Stream existence guard is correct.

The check prevents deleting filters for active streams, properly returning ZombieResourceStreamStillExists error.

Comment on lines 92 to 113
let all_filters: Vec<Filter> = PARSEABLE.metastore.get_filters().await?;

// collect filters associated with the logstream being deleted
let mut filters_for_stream = Vec::<Filter>::new();

for filter in all_filters.into_iter() {
    match filter.query.filter_type {
        FilterType::Filter => {
            if filter.stream_name == stream_name {
                filters_for_stream.push(filter);
            }
        },
        FilterType::SQL => {
            if let Some(sql) = filter.query.filter_query.as_ref()
                && self::sql_contains_db(&sql, stream_name) {
                filters_for_stream.push(filter);
            }
        },
        _ => continue
    }
}

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash

# Check Rust edition
echo "=== Checking Rust Edition ==="
if [ -f "Cargo.toml" ]; then
  rg -n '^edition\s*=' Cargo.toml
else
  echo "Cargo.toml not found"
fi

# Find FilterType enum definition
echo -e "\n=== Finding FilterType enum ==="
rg -n 'enum FilterType' --type rs

# Find metastore.get_filters definition
echo -e "\n=== Finding get_filters method ==="
rg -n 'fn get_filters' --type rs

Repository: parseablehq/parseable

Length of output: 253


🏁 Script executed:

#!/bin/bash

# Check if filtered query methods exist in metastore
echo "=== Checking metastore implementation ==="
fd metastore --type rs | head -20

Repository: parseablehq/parseable

Length of output: 293


🏁 Script executed:

#!/bin/bash

# Get more context about the metastore trait/struct
echo "=== Searching for metastore get_filters ==="
rg -A 10 'fn get_filters' --type rs | head -40

echo -e "\n=== Searching for FilterType variants ==="
rg -A 15 'enum FilterType' --type rs

Repository: parseablehq/parseable

Length of output: 222


🏁 Script executed:

#!/bin/bash

# Find the specific file mentioned in the review
echo "=== Finding logstream_utils.rs ==="
find . -name "logstream_utils.rs" -type f

# Search for FilterType enum without type filter
echo -e "\n=== Searching for FilterType enum ==="
rg 'enum FilterType' | head -20

# Search for get_filters method
echo -e "\n=== Searching for get_filters method ==="
rg 'fn get_filters' | head -30

Repository: parseablehq/parseable

Length of output: 583


🏁 Script executed:

#!/bin/bash

# Search for metastore trait and its methods
echo "=== Searching for metastore trait ==="
rg 'trait.*[Mm]etastore' | head -20

echo -e "\n=== Searching for get_filters_by ==="
rg 'get_filters' | head -40

Repository: parseablehq/parseable

Length of output: 2501


🏁 Script executed:

#!/bin/bash

# Check the actual code in logstream_utils.rs
echo "=== Checking logstream_utils.rs context ==="
find . -name "logstream_utils.rs" -type f -exec cat -n {} \; | sed -n '85,120p'

# Also search for FilterType variants
echo -e "\n=== FilterType definition ==="
rg -B 2 -A 10 'enum FilterType'

Repository: parseablehq/parseable

Length of output: 2216


Fetching all filters may cause performance issues; wildcard pattern silently skips Search variant.

Line 92 fetches all filters from the metastore and then filters them in-memory by stream name. Consider whether the metastore API could support a filtered query (e.g., get_filters_by_stream(stream_name)) to reduce overhead when deployments have many filters.

The wildcard pattern _ => continue (line 111) silently skips the Search variant of FilterType and any future variants. Explicitly handle or document the intended behavior to prevent accidental omissions when new variants are added.

🤖 Prompt for AI Agents
In src/handlers/http/modal/utils/logstream_utils.rs around lines 92 to 113,
avoid fetching all filters then filtering in-memory which can be very expensive;
instead call a metastore method that returns filters scoped to the stream (e.g.,
implement/use get_filters_by_stream(stream_name).await? or add an indexed query
in PARSEABLE.metastore) and iterate only that result; additionally replace the
wildcard `_ => continue` with explicit handling for FilterType::Search (and any
other known variants) — either process Search according to intended semantics or
explicitly log/skip it with a comment so future variants are not silently
ignored, and ensure the code compiles with exhaustive matching or a documented
default arm.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (2)
src/handlers/http/modal/utils/logstream_utils.rs (2)

142-146: Use unsigned integer types for deletion counts.

The ok_deletions and failed_deletions fields represent counts and cannot be negative. Using i32 is semantically incorrect. Change to u32 or usize for type safety.

🔎 Proposed fix
 #[derive(Debug)]
 pub struct ZombieResourceCleanupOk {
-    pub ok_deletions: i32,
-    pub failed_deletions: i32,
+    pub ok_deletions: u32,
+    pub failed_deletions: u32,
 }

Also update the counter variables at lines 114-115:

-    let mut deleted_filter_count = 0;
-    let mut undeleted_filter_count = 0;
+    let mut deleted_filter_count: u32 = 0;
+    let mut undeleted_filter_count: u32 = 0;

117-134: Partial deletion failures return success; add warning for missing filter_id.

The function returns Ok(ZombieResourceCleanupOk) even when individual filter deletions fail (lines 118-126). While failures are logged and counted in failed_deletions, the caller receives a successful Result, potentially masking incomplete cleanup. Consider whether the caller should handle partial failures differently, especially since the call site may use the ? operator expecting error propagation for critical cleanup issues.

Additionally, when filter_id is None (line 128), the filter is deleted from storage but not removed from the in-memory FILTERS cache (lines 128-130). Add a warning log when this occurs to aid debugging of storage/memory inconsistencies.

Minor: Use structured logging fields instead of \n in the error message (lines 119-124).

🔎 Suggested improvements
     for filter in filters_for_stream.iter() {
         if let Err(err) = PARSEABLE.metastore.delete_filter(filter).await {
             tracing::error!(
-                "failed to delete the zombie filter: {} \nfrom storage. For logstream: {}\nError: {:#?}", 
-                filter.filter_name,
-                stream_name,
-                err 
+                filter_name = %filter.filter_name,
+                stream_name = %stream_name,
+                error = ?err,
+                "failed to delete zombie filter from storage"
             );
 
             undeleted_filter_count += 1;
         } else { // ok: have the filter removed from memory only when the storage deletion succeeds
             if let Some(filter_id) = filter.filter_id.as_ref() {
                 FILTERS.delete_filter(filter_id).await;
+            } else {
+                tracing::warn!(
+                    filter_name = %filter.filter_name,
+                    stream_name = %stream_name,
+                    "filter deleted from storage but filter_id is None; cannot remove from in-memory cache"
+                );
             }
 
             deleted_filter_count += 1;
         }
     }
🧹 Nitpick comments (2)
src/handlers/http/modal/utils/logstream_utils.rs (2)

92-112: Performance concern: fetching all filters may not scale.

Line 92 fetches all filters from the metastore and filters in-memory (lines 98-112). As noted in a previous review, this approach doesn't scale when deployments have many filters. Consider adding a metastore API method like get_filters_by_stream(stream_name) or an indexed query to fetch only relevant filters.

Note: The FilterType::Search variant is now explicitly handled, addressing the previous concern about silent skipping via wildcard pattern.


150-158: Replace .unwrap() and optimize regex compilation.

Line 157 uses .unwrap() on regex compilation, which will panic if the pattern is malformed. While the pattern should be valid, prefer explicit error handling or use lazy_static/once_cell to compile the regex once at initialization.

Additionally, compiling the regex on every call is inefficient. Consider using lazy_static or once_cell::sync::Lazy to compile the regex pattern once and reuse it.

Note: This regex-based heuristic may produce false positives (e.g., stream name in SQL comments or string literals) or false negatives (complex aliasing). Document these limitations if this is acceptable for the current use case.

🔎 Proposed fix using once_cell

Add once_cell to dependencies if not already present, then:

+use once_cell::sync::Lazy;
+
 fn sql_contains_db(sql: &str, db: &str) -> bool {
     let escaped = regex::escape(db);
     let pattern = format!(
         r#"(?i)(^|[\s.(])(?:`{}`|"{}"|\[{}\]|{})([\s).,]|$)"#,
         escaped, escaped, escaped, escaped
     );
 
-    Regex::new(&pattern).unwrap().is_match(sql)
+    Regex::new(&pattern)
+        .map(|re| re.is_match(sql))
+        .unwrap_or(false) // or handle error appropriately
 }

For better performance with a static pattern (if per-call escaping isn't needed), use:

use once_cell::sync::Lazy;

static DB_PATTERN: Lazy<Regex> = Lazy::new(|| {
    Regex::new(r#"(?i)(^|[\s.(])(?:`DATABASE`|"DATABASE"|\[DATABASE\]|DATABASE)([\s).,]|$)"#).unwrap()
});

However, since the pattern includes the escaped db parameter, the current per-call compilation is necessary unless you cache compiled patterns per stream name.
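As a sketch of that per-stream cache, assuming once_cell is available as a dependency; illustrative only, not the PR's code:

// Memoize compiled patterns per stream name so repeated cleanups of the same
// stream reuse one Regex instead of recompiling on every call.
use std::collections::HashMap;
use std::sync::RwLock;

use once_cell::sync::Lazy;
use regex::Regex;

static DB_PATTERNS: Lazy<RwLock<HashMap<String, Regex>>> =
    Lazy::new(|| RwLock::new(HashMap::new()));

fn sql_contains_db(sql: &str, db: &str) -> bool {
    // Fast path: reuse a previously compiled pattern.
    if let Some(re) = DB_PATTERNS.read().unwrap().get(db) {
        return re.is_match(sql);
    }
    let escaped = regex::escape(db);
    let pattern = format!(
        r#"(?i)(^|[\s.(])(?:`{0}`|"{0}"|\[{0}\]|{0})([\s).,]|$)"#,
        escaped
    );
    match Regex::new(&pattern) {
        Ok(re) => {
            let matched = re.is_match(sql);
            DB_PATTERNS.write().unwrap().insert(db.to_string(), re);
            matched
        }
        // Treat a malformed pattern as "no match" rather than panicking.
        Err(_) => false,
    }
}

Whether the extra lock traffic pays off depends on call frequency; for a path that runs only at stream deletion, per-call compilation is likely acceptable.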

📜 Review details

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 82dae76 and 5295f70.

📒 Files selected for processing (1)
  • src/handlers/http/modal/utils/logstream_utils.rs
🧰 Additional context used
🧠 Learnings (5)
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
Repo: parseablehq/parseable PR: 1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-10-21T02:22:24.403Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1448
File: src/parseable/mod.rs:419-432
Timestamp: 2025-10-21T02:22:24.403Z
Learning: In Parseable's internal stream creation (`create_internal_stream_if_not_exists` in `src/parseable/mod.rs`), errors should not propagate to fail server initialization. The function creates both pmeta and pbilling internal streams, and failures are logged but the function always returns `Ok(())` to ensure server startup resilience. Individual stream creation failures should not prevent syncing of successfully created streams.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-06-16T02:04:58.990Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1346
File: src/handlers/http/health_check.rs:81-90
Timestamp: 2025-06-16T02:04:58.990Z
Learning: In the shutdown function in src/handlers/http/health_check.rs, the design approach is to log errors from sync operations rather than propagate them. This is intentional because the shutdown function is called on SIGTERM/SIGINT signals, and the goal is to perform best-effort cleanup (syncing pending files to object storage) while allowing the shutdown to proceed regardless of sync failures. Logging provides debugging information without blocking the shutdown process.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-09-05T09:27:12.659Z
Learnt from: parmesant
Repo: parseablehq/parseable PR: 1424
File: src/users/filters.rs:116-121
Timestamp: 2025-09-05T09:27:12.659Z
Learning: The Filters::load() function in src/users/filters.rs is only called once at server initialization, so there's no risk of duplicate entries from repeated invocations.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-08-25T01:31:41.786Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/metadata.rs:63-68
Timestamp: 2025-08-25T01:31:41.786Z
Learning: The TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE, and TOTAL_EVENTS_STORAGE_SIZE_DATE metrics in src/metadata.rs and src/storage/object_storage.rs are designed to track total events across all streams, not per-stream. They use labels [origin, parsed_date] to aggregate by format and date, while per-stream metrics use [stream_name, origin, parsed_date] labels.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
🧬 Code graph analysis (1)
src/handlers/http/modal/utils/logstream_utils.rs (2)
src/parseable/mod.rs (2)
  • storage (284-286)
  • new (180-194)
src/metadata.rs (1)
  • new (99-136)
🔇 Additional comments (2)
src/handlers/http/modal/utils/logstream_utils.rs (2)

106-109: Rust edition is correctly configured. The codebase specifies edition = "2024" in Cargo.toml, which supports the if let ... && let-chain syntax (let-chains were stabilized in Rust 1.88 for edition 2024). No changes needed.
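For reference, a minimal self-contained demonstration of the let-chain form in question (illustrative values only):

// Let-chain demo (edition 2024 / Rust 1.88+): the body runs only when the
// binding succeeds AND the trailing condition holds.
fn main() {
    let filter_query: Option<String> = Some("SELECT 1".to_string());
    if let Some(sql) = filter_query.as_ref()
        && sql.contains("SELECT")
    {
        println!("matched: {sql}");
    }
}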


84-90: No actionable issues found. The stream existence check using PARSEABLE.streams.contains(stream_name) is appropriate for zombie filter cleanup. This check runs after stream deletion and serves as a sanity check to confirm deletion completed. Stream deletion in Parseable is not mode-specific, so mode-aware verification is unnecessary in this context.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (2)
src/handlers/http/modal/utils/logstream_utils.rs (2)

124-141: Previous concerns about partial deletion handling remain applicable.

The issues regarding partial filter deletion failures being silently swallowed and the filter_id = None case causing storage/memory inconsistency were raised in a previous review and remain unaddressed. Please refer to the earlier comments for details.


149-153: Previous suggestion to use unsigned integer types remains applicable.

The recommendation to use u32 or usize instead of i32 for the count fields was raised in a previous review and remains unaddressed.

🧹 Nitpick comments (1)
src/handlers/http/modal/utils/logstream_utils.rs (1)

157-172: Regex heuristic may have edge cases with complex SQL patterns.

The regex pattern handles common identifier quoting styles (backticks, double quotes, brackets, bare names), but be aware it may not catch all cases:

  • Schema-qualified names like schema.stream_name or "schema"."stream_name"
  • Subqueries or CTEs that reference the stream indirectly
  • Aliased table references

Since this is a cleanup heuristic for orphaned filters, false negatives (missed filters) are acceptable but could leave orphans. Consider adding a comment documenting this limitation for future maintainers.
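To make the trade-off concrete, a self-contained demonstration follows. The pattern is copied from the sketch earlier in this review; the queries and the stream name app_logs are made up for illustration:

// Edge cases of the containment heuristic. `sql_contains_db` reproduces the
// reviewed pattern; the assertions follow from its boundary classes.
use regex::Regex;

fn sql_contains_db(sql: &str, db: &str) -> bool {
    let escaped = regex::escape(db);
    let pattern = format!(
        r#"(?i)(^|[\s.(])(?:`{0}`|"{0}"|\[{0}\]|{0})([\s).,]|$)"#,
        escaped
    );
    Regex::new(&pattern).map(|re| re.is_match(sql)).unwrap_or(false)
}

fn main() {
    // Direct reference: matches.
    assert!(sql_contains_db("SELECT * FROM app_logs", "app_logs"));
    // Quoted reference: matches.
    assert!(sql_contains_db(r#"SELECT * FROM "app_logs""#, "app_logs"));
    // Derived object name: missed (false negative) because `_` is not in the
    // trailing boundary class.
    assert!(!sql_contains_db("SELECT * FROM app_logs_view", "app_logs"));
    // Name inside a trailing SQL comment: matches anyway (false positive).
    assert!(sql_contains_db("SELECT * FROM other -- app_logs", "app_logs"));
}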

📜 Review details

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5295f70 and 8f50052.

📒 Files selected for processing (1)
  • src/handlers/http/modal/utils/logstream_utils.rs
🧰 Additional context used
🧠 Learnings (5)
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
Repo: parseablehq/parseable PR: 1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-10-21T02:22:24.403Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1448
File: src/parseable/mod.rs:419-432
Timestamp: 2025-10-21T02:22:24.403Z
Learning: In Parseable's internal stream creation (`create_internal_stream_if_not_exists` in `src/parseable/mod.rs`), errors should not propagate to fail server initialization. The function creates both pmeta and pbilling internal streams, and failures are logged but the function always returns `Ok(())` to ensure server startup resilience. Individual stream creation failures should not prevent syncing of successfully created streams.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-06-16T02:04:58.990Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1346
File: src/handlers/http/health_check.rs:81-90
Timestamp: 2025-06-16T02:04:58.990Z
Learning: In the shutdown function in src/handlers/http/health_check.rs, the design approach is to log errors from sync operations rather than propagate them. This is intentional because the shutdown function is called on SIGTERM/SIGINT signals, and the goal is to perform best-effort cleanup (syncing pending files to object storage) while allowing the shutdown to proceed regardless of sync failures. Logging provides debugging information without blocking the shutdown process.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-09-05T09:27:12.659Z
Learnt from: parmesant
Repo: parseablehq/parseable PR: 1424
File: src/users/filters.rs:116-121
Timestamp: 2025-09-05T09:27:12.659Z
Learning: The Filters::load() function in src/users/filters.rs is only called once at server initialization, so there's no risk of duplicate entries from repeated invocations.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-08-25T01:31:41.786Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/metadata.rs:63-68
Timestamp: 2025-08-25T01:31:41.786Z
Learning: The TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE, and TOTAL_EVENTS_STORAGE_SIZE_DATE metrics in src/metadata.rs and src/storage/object_storage.rs are designed to track total events across all streams, not per-stream. They use labels [origin, parsed_date] to aggregate by format and date, while per-stream metrics use [stream_name, origin, parsed_date] labels.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
🔇 Additional comments (4)
src/handlers/http/modal/utils/logstream_utils.rs (4)

20-32: LGTM!

The new imports are correctly added and necessary for the delete_zombie_filters functionality.


105-119: Explicit handling of all FilterType variants is good.

The code now explicitly handles FilterType::Filter, FilterType::Search, and FilterType::SQL variants, which addresses exhaustive matching concerns. This is cleaner than using a wildcard pattern.


113-116: No issues found. The if let Some(sql) = ... && sql_filter_regex.is_match(sql) let-chain is fully supported: let-chains were stabilized in Rust 1.88.0 for edition 2024, which matches the project's configured minimum version and edition exactly.


83-90: The current implementation is correct. The delete_zombie_filters function is called after stream deletion is already complete, and the check PARSEABLE.streams.contains() serves as a sanity check to ensure the stream is truly gone before cleaning up filters. If the stream still exists in memory, returning an error is appropriate. The pattern referenced in learnings applies to operations that load or access streams in query mode; this function is a post-deletion cleanup operation where the simple check is sufficient.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

📜 Review details

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8f50052 and 8147ad4.

📒 Files selected for processing (1)
  • src/handlers/http/modal/query/querier_logstream.rs
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-10-28T02:10:41.140Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1453
File: src/parseable/mod.rs:397-400
Timestamp: 2025-10-28T02:10:41.140Z
Learning: In Parseable enterprise deployments with multiple query nodes, hot tier configuration must be persisted in object storage so that newly started query nodes can fetch and synchronize the hot tier settings at startup (file: src/parseable/mod.rs, function: create_stream_and_schema_from_storage).

Applied to files:

  • src/handlers/http/modal/query/querier_logstream.rs
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
Repo: parseablehq/parseable PR: 1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.

Applied to files:

  • src/handlers/http/modal/query/querier_logstream.rs
🧬 Code graph analysis (1)
src/handlers/http/modal/query/querier_logstream.rs (1)
src/handlers/http/modal/utils/logstream_utils.rs (1)
  • delete_zombie_filters (83-147)
🔇 Additional comments (1)
src/handlers/http/modal/query/querier_logstream.rs (1)

44-44: LGTM! Import correctly added for zombie filter cleanup.

The import is properly structured and necessary for the filter cleanup functionality added below.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (3)
src/handlers/http/modal/utils/logstream_utils.rs (3)

94-123: Filtering logic explicitly handles all FilterType variants.

The match statement correctly handles Filter, Search, and SQL variants without a wildcard. The TODO comment at line 114 acknowledges that SQL filter matching relies on a regex heuristic rather than explicit stream name tracking.

Note that line 94 fetches all filters from the metastore, which may have performance implications in deployments with many filters. As previously mentioned, consider whether the metastore could support a filtered query API to reduce overhead.


128-146: Partial deletion failures return Ok() with non-zero failed_deletions.

The function always returns Ok(ZombieResourceCleanupOk { ... }) even when some filters fail to delete (tracked in failed_deletions). The caller in logstream.rs logs errors and continues, treating this as best-effort cleanup.

The log message at line 131 uses \n for formatting. Consider using structured logging fields instead:

tracing::error!(
    filter_name = %filter.filter_name,
    stream_name = %stream_name,
    error = ?err,
    "failed to delete zombie filter from storage"
);

The approach of only updating in-memory state after successful storage deletion (lines 140-141) is correct and prevents storage/memory inconsistency.

Based on learnings, this best-effort cleanup pattern (log and continue) matches the intentional design for cleanup operations.


154-158: Consider using unsigned integer types for deletion counts.

The ok_deletions and failed_deletions fields represent non-negative counts. Using unsigned types (u32 or usize) would better express this constraint and prevent accidental negative values.

🔎 Proposed fix
 #[derive(Debug)]
 pub struct ZombieResourceCleanupOk {
-    pub ok_deletions: i32,
-    pub failed_deletions: i32,
+    pub ok_deletions: u32,
+    pub failed_deletions: u32,
 }
🧹 Nitpick comments (1)
src/handlers/http/modal/utils/logstream_utils.rs (1)

162-177: The regex pattern is more elaborate than stream name constraints strictly require.

Stream names are validated to contain only alphanumeric characters and hyphens/underscores (see validator.rs), so they never need quoting. The quoted alternatives are still reachable, however: the regex runs against the SQL text, and a query author may quote even a plain identifier (e.g., "app_logs"), so the backtick, double-quote, and square-bracket branches cannot simply be dropped. They can be collapsed into a single optional-quote form, as sketched below.

As the TODO at line 114 acknowledges, this entire approach is a temporary heuristic: SQL filters should ideally store stream names explicitly rather than parsing them from filter queries. The current pattern may produce false positives or false negatives with unusual SQL syntax.
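A compact alternative that keeps optional quoting while dropping the four-way alternation; a sketch under the assumptions above, not a drop-in replacement:

// Simplified containment check: one optional quote character on each side of
// the name. It tolerates mismatched pairs (e.g. a backtick paired with a
// double quote), which is harmless for a yes/no containment heuristic.
use regex::Regex;

fn sql_contains_db(sql: &str, db: &str) -> bool {
    let escaped = regex::escape(db);
    let pattern = format!(
        r#"(?i)(^|[\s.(])[`"\[]?{escaped}[`"\]]?([\s).,]|$)"#
    );
    Regex::new(&pattern).map(|re| re.is_match(sql)).unwrap_or(false)
}

fn main() {
    assert!(sql_contains_db("SELECT * FROM app_logs", "app_logs"));
    assert!(sql_contains_db(r#"SELECT * FROM "app_logs""#, "app_logs"));
}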

📜 Review details

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8147ad4 and 1a3564a.

📒 Files selected for processing (3)
  • src/handlers/http/logstream.rs
  • src/handlers/http/modal/utils/logstream_utils.rs
  • src/metastore/mod.rs
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/metastore/mod.rs
🧰 Additional context used
🧠 Learnings (8)
📚 Learning: 2025-06-16T02:04:58.990Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1346
File: src/handlers/http/health_check.rs:81-90
Timestamp: 2025-06-16T02:04:58.990Z
Learning: In the shutdown function in src/handlers/http/health_check.rs, the design approach is to log errors from sync operations rather than propagate them. This is intentional because the shutdown function is called on SIGTERM/SIGINT signals, and the goal is to perform best-effort cleanup (syncing pending files to object storage) while allowing the shutdown to proceed regardless of sync failures. Logging provides debugging information without blocking the shutdown process.

Applied to files:

  • src/handlers/http/logstream.rs
  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
Repo: parseablehq/parseable PR: 1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.

Applied to files:

  • src/handlers/http/logstream.rs
  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-10-21T02:22:24.403Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1448
File: src/parseable/mod.rs:419-432
Timestamp: 2025-10-21T02:22:24.403Z
Learning: In Parseable's internal stream creation (`create_internal_stream_if_not_exists` in `src/parseable/mod.rs`), errors should not propagate to fail server initialization. The function creates both pmeta and pbilling internal streams, and failures are logged but the function always returns `Ok(())` to ensure server startup resilience. Individual stream creation failures should not prevent syncing of successfully created streams.

Applied to files:

  • src/handlers/http/logstream.rs
  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-10-28T02:10:41.140Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1453
File: src/parseable/mod.rs:397-400
Timestamp: 2025-10-28T02:10:41.140Z
Learning: In Parseable enterprise deployments with multiple query nodes, hot tier configuration must be persisted in object storage so that newly started query nodes can fetch and synchronize the hot tier settings at startup (file: src/parseable/mod.rs, function: create_stream_and_schema_from_storage).

Applied to files:

  • src/handlers/http/logstream.rs
📚 Learning: 2025-10-20T17:48:53.444Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1448
File: src/handlers/http/cluster/mod.rs:1370-1400
Timestamp: 2025-10-20T17:48:53.444Z
Learning: In src/handlers/http/cluster/mod.rs, the billing metrics processing logic should NOT accumulate counter values from multiple Prometheus samples with the same labels. The intended behavior is to convert each received counter from nodes into individual events for ingestion, using `.insert()` to store the counter value directly.

Applied to files:

  • src/handlers/http/logstream.rs
📚 Learning: 2025-08-25T01:31:41.786Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/metadata.rs:63-68
Timestamp: 2025-08-25T01:31:41.786Z
Learning: The TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE, and TOTAL_EVENTS_STORAGE_SIZE_DATE metrics in src/metadata.rs and src/storage/object_storage.rs are designed to track total events across all streams, not per-stream. They use labels [origin, parsed_date] to aggregate by format and date, while per-stream metrics use [stream_name, origin, parsed_date] labels.

Applied to files:

  • src/handlers/http/logstream.rs
  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-09-05T09:27:12.659Z
Learnt from: parmesant
Repo: parseablehq/parseable PR: 1424
File: src/users/filters.rs:116-121
Timestamp: 2025-09-05T09:27:12.659Z
Learning: The Filters::load() function in src/users/filters.rs is only called once at server initialization, so there's no risk of duplicate entries from repeated invocations.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-09-06T04:26:17.191Z
Learnt from: parmesant
Repo: parseablehq/parseable PR: 1424
File: src/enterprise/utils.rs:65-72
Timestamp: 2025-09-06T04:26:17.191Z
Learning: In Parseable's metastore implementation, MetastoreError::to_detail() returns a MetastoreErrorDetail struct (not a string), which contains structured error information including operation, message, stream_name, and other contextual fields. This struct is designed to be boxed in ObjectStorageError::MetastoreError(Box<MetastoreErrorDetail>).

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
🧬 Code graph analysis (1)
src/handlers/http/logstream.rs (1)
src/handlers/http/modal/utils/logstream_utils.rs (1)
  • delete_zombie_filters (83-152)

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (1)
src/handlers/http/modal/query/querier_logstream.rs (1)

116-127: Consider adding stream context to the error log.

The error message on line 119 could be more helpful for debugging by including the stream name, similar to how other operations in this function log errors (e.g., line 112).

🔎 Proposed enhancement
     let cleanup_stats = match delete_zombie_filters(&stream_name).await {
         Ok(stats) => stats,
         Err(e) => {
-            tracing::error!("{}", e.to_string());
+            tracing::error!("failed to delete zombie filters for stream {}: {}", stream_name, e);
             ZombieResourceCleanupOk {
                 ok_deletions: 0,
                 failed_deletions: 0,
             }
         }
     };
📜 Review details

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1a3564a and 373e9df.

📒 Files selected for processing (1)
  • src/handlers/http/modal/query/querier_logstream.rs
🧰 Additional context used
🧠 Learnings (8)
📚 Learning: 2025-10-21T02:22:24.403Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1448
File: src/parseable/mod.rs:419-432
Timestamp: 2025-10-21T02:22:24.403Z
Learning: In Parseable's internal stream creation (`create_internal_stream_if_not_exists` in `src/parseable/mod.rs`), errors should not propagate to fail server initialization. The function creates both pmeta and pbilling internal streams, and failures are logged but the function always returns `Ok(())` to ensure server startup resilience. Individual stream creation failures should not prevent syncing of successfully created streams.

Applied to files:

  • src/handlers/http/modal/query/querier_logstream.rs
📚 Learning: 2025-06-16T02:04:58.990Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1346
File: src/handlers/http/health_check.rs:81-90
Timestamp: 2025-06-16T02:04:58.990Z
Learning: In the shutdown function in src/handlers/http/health_check.rs, the design approach is to log errors from sync operations rather than propagate them. This is intentional because the shutdown function is called on SIGTERM/SIGINT signals, and the goal is to perform best-effort cleanup (syncing pending files to object storage) while allowing the shutdown to proceed regardless of sync failures. Logging provides debugging information without blocking the shutdown process.

Applied to files:

  • src/handlers/http/modal/query/querier_logstream.rs
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
Repo: parseablehq/parseable PR: 1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.

Applied to files:

  • src/handlers/http/modal/query/querier_logstream.rs
📚 Learning: 2025-08-18T14:56:18.463Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1405
File: src/storage/object_storage.rs:997-1040
Timestamp: 2025-08-18T14:56:18.463Z
Learning: In Parseable's staging upload system (src/storage/object_storage.rs), failed parquet file uploads should remain in the staging directory for retry in the next sync cycle, while successful uploads remove their staged files immediately. Early return on first error in collect_upload_results is correct behavior as concurrent tasks handle their own cleanup and failed files need to stay for retry.

Applied to files:

  • src/handlers/http/modal/query/querier_logstream.rs
📚 Learning: 2025-09-25T07:12:27.407Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1441
File: src/storage/azure_blob.rs:693-700
Timestamp: 2025-09-25T07:12:27.407Z
Learning: In the Parseable codebase, object store metrics (increment_object_store_calls_by_date, increment_files_scanned_in_object_store_calls_by_date, etc.) should only be recorded for successful operations, not for failed attempts. The metric calls should be placed after error handling operators like `?` to ensure they only execute when operations succeed.

Applied to files:

  • src/handlers/http/modal/query/querier_logstream.rs
📚 Learning: 2025-04-07T13:23:10.092Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1288
File: src/handlers/http/modal/mod.rs:279-301
Timestamp: 2025-04-07T13:23:10.092Z
Learning: For critical operations like writing metadata to disk in NodeMetadata::put_on_disk(), it's preferred to let exceptions propagate (using expect/unwrap) rather than trying to recover with fallback mechanisms, as the failure indicates a fundamental system issue that needs immediate attention.

Applied to files:

  • src/handlers/http/modal/query/querier_logstream.rs
📚 Learning: 2025-09-18T09:52:07.554Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/storage/object_storage.rs:173-177
Timestamp: 2025-09-18T09:52:07.554Z
Learning: In Parseable's upload system (src/storage/object_storage.rs), the update_storage_metrics function can safely use path.metadata().map_err() to fail on local file metadata read failures because parquet validation (validate_uploaded_parquet_file) ensures file integrity before this step, and the system guarantees local staging files remain accessible throughout the upload flow.

Applied to files:

  • src/handlers/http/modal/query/querier_logstream.rs
📚 Learning: 2025-10-28T02:10:41.140Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1453
File: src/parseable/mod.rs:397-400
Timestamp: 2025-10-28T02:10:41.140Z
Learning: In Parseable enterprise deployments with multiple query nodes, hot tier configuration must be persisted in object storage so that newly started query nodes can fetch and synchronize the hot tier settings at startup (file: src/parseable/mod.rs, function: create_stream_and_schema_from_storage).

Applied to files:

  • src/handlers/http/modal/query/querier_logstream.rs
🧬 Code graph analysis (1)
src/handlers/http/modal/query/querier_logstream.rs (1)
src/handlers/http/modal/utils/logstream_utils.rs (1)
  • delete_zombie_filters (83-152)
🔇 Additional comments (1)
src/handlers/http/modal/query/querier_logstream.rs (1)

44-47: LGTM: Imports properly added.

The imports for ZombieResourceCleanupOk and delete_zombie_filters are correctly placed and follow the existing import structure.

Comment on lines +114 to +115
// clear filters associated to the deleted logstream
// clear filters associated to the deleted logstream

⚠️ Potential issue | 🟡 Minor

Remove duplicate comment.

The comment "clear filters associated to the deleted logstream" appears twice on consecutive lines.

🔎 Proposed fix
-    // clear filters associated to the deleted logstream
     // clear filters associated to the deleted logstream
     let cleanup_stats = match delete_zombie_filters(&stream_name).await {
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-    // clear filters associated to the deleted logstream
     // clear filters associated to the deleted logstream
     let cleanup_stats = match delete_zombie_filters(&stream_name).await {
🤖 Prompt for AI Agents
In src/handlers/http/modal/query/querier_logstream.rs around lines 114 to 115,
there is a duplicated comment "clear filters associated to the deleted
logstream" on consecutive lines; remove the redundant line so the comment
appears only once (keep the single, correct comment and delete the extra
duplicate), then run a quick lint/format to ensure spacing and line numbers are
consistent.
