Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 37 additions & 12 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use serde_json::Value as JsonValue;
use sqlx::{Executor, PgPool, Postgres};
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use tokio::sync::RwLock;
use uuid::Uuid;
Expand All @@ -24,6 +25,8 @@ struct SpawnOptionsDb<'a> {
retry_strategy: Option<&'a RetryStrategy>,
#[serde(skip_serializing_if = "Option::is_none")]
cancellation: Option<CancellationPolicyDb>,
#[serde(skip_serializing_if = "Option::is_none")]
parent_task_id: Option<&'a Uuid>,
}

/// Internal struct for serializing cancellation policy (only non-None fields).
Expand Down Expand Up @@ -108,13 +111,42 @@ where
State: Clone + Send + Sync + 'static,
{
pool: PgPool,
owns_pool: bool,
owns_pool: AtomicBool,
queue_name: String,
spawn_defaults: SpawnDefaults,
registry: Arc<RwLock<TaskRegistry<State>>>,
state: State,
}

impl<State: Clone + Send + Sync> Durable<State> {
/// TODO: Decide if we want to implement `Clone`,
/// which will allow consumers to clone `Durable`
/// Currently, we only allow cloning with in the crate
/// via this method
pub(crate) fn clone_inner(&self) -> Durable<State> {
// When we clone a durable client, mark *ourself* as no longer owning the pool
// This will cause `Durable.close()` to be a no-op, since something else could
// still be using the pool.
// sqlx itself will still close the pool when the last reference to it is dropped.
// At the moment, we only call `clone_inner` when spawning a worker, which has its own
// `shutdown()` method.
self.owns_pool.store(false, Ordering::Relaxed);
Durable {
pool: self.pool.clone(),
// A clone of a durable client never owns the pool, so we set this to false
owns_pool: AtomicBool::new(false),
queue_name: self.queue_name.clone(),
spawn_defaults: self.spawn_defaults.clone(),
registry: self.registry.clone(),
state: self.state.clone(),
}
}

pub(crate) fn registry(&self) -> &Arc<RwLock<TaskRegistry<State>>> {
&self.registry
}
}

/// Builder for configuring a [`Durable`] client.
///
/// # Example
Expand Down Expand Up @@ -252,7 +284,7 @@ impl DurableBuilder {

Ok(Durable {
pool,
owns_pool,
owns_pool: AtomicBool::new(owns_pool),
queue_name: self.queue_name,
spawn_defaults: self.spawn_defaults,
registry: Arc::new(RwLock::new(HashMap::new())),
Expand Down Expand Up @@ -557,6 +589,7 @@ where
.cancellation
.as_ref()
.and_then(CancellationPolicyDb::from_policy),
parent_task_id: options.parent_task_id.as_ref(),
};
serde_json::to_value(db_options)
}
Expand Down Expand Up @@ -692,20 +725,12 @@ where
});
}

Ok(Worker::start(
self.pool.clone(),
self.queue_name.clone(),
self.registry.clone(),
options,
self.state.clone(),
self.spawn_defaults.clone(),
)
.await)
Ok(Worker::start(self.clone_inner(), options).await)
}

/// Close the client. Closes the pool if owned.
pub async fn close(self) {
if self.owns_pool {
if self.owns_pool.load(Ordering::Relaxed) {
self.pool.close().await;
}
}
Expand Down
Loading