Skip to content

Commit a96044d

Browse files
committed
Always downgrade to the shared seqno
Should be pretty recent!
1 parent c37bf5b commit a96044d

File tree

3 files changed

+28
-27
lines changed

3 files changed

+28
-27
lines changed

src/persist-client/src/internal/machine.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -620,7 +620,7 @@ where
620620
pub async fn downgrade_since(
621621
&self,
622622
reader_id: &LeasedReaderId,
623-
outstanding_seqno: Option<SeqNo>,
623+
outstanding_seqno: SeqNo,
624624
new_since: &Antichain<T>,
625625
heartbeat_timestamp_ms: u64,
626626
) -> (SeqNo, Since<T>, RoutineMaintenance) {
@@ -1615,7 +1615,7 @@ pub mod datadriven {
16151615
args: DirectiveArgs<'_>,
16161616
) -> Result<String, anyhow::Error> {
16171617
let since = args.expect_antichain("since");
1618-
let seqno = args.optional("seqno");
1618+
let seqno = args.optional("seqno").unwrap_or(datadriven.machine.seqno());
16191619
let reader_id = args.expect("reader_id");
16201620
let (_, since, routine) = datadriven
16211621
.machine

src/persist-client/src/internal/state.rs

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1877,7 +1877,7 @@ where
18771877
&mut self,
18781878
reader_id: &LeasedReaderId,
18791879
seqno: SeqNo,
1880-
outstanding_seqno: Option<SeqNo>,
1880+
outstanding_seqno: SeqNo,
18811881
new_since: &Antichain<T>,
18821882
heartbeat_timestamp_ms: u64,
18831883
) -> ControlFlow<NoOpStateTransition<Since<T>>, Since<T>> {
@@ -1905,18 +1905,15 @@ where
19051905
reader_state.last_heartbeat_timestamp_ms,
19061906
);
19071907

1908-
let seqno = match outstanding_seqno {
1909-
Some(outstanding_seqno) => {
1910-
assert!(
1911-
outstanding_seqno >= reader_state.seqno,
1912-
"SeqNos cannot go backward; however, oldest leased SeqNo ({:?}) \
1908+
let seqno = {
1909+
assert!(
1910+
outstanding_seqno >= reader_state.seqno,
1911+
"SeqNos cannot go backward; however, oldest leased SeqNo ({:?}) \
19131912
is behind current reader_state ({:?})",
1914-
outstanding_seqno,
1915-
reader_state.seqno,
1916-
);
1917-
std::cmp::min(outstanding_seqno, seqno)
1918-
}
1919-
None => seqno,
1913+
outstanding_seqno,
1914+
reader_state.seqno,
1915+
);
1916+
std::cmp::min(outstanding_seqno, seqno)
19201917
};
19211918

19221919
reader_state.seqno = seqno;
@@ -3231,7 +3228,7 @@ pub(crate) mod tests {
32313228
state.collections.downgrade_since(
32323229
&reader,
32333230
seqno,
3234-
None,
3231+
seqno,
32353232
&Antichain::from_elem(2),
32363233
now()
32373234
),
@@ -3243,7 +3240,7 @@ pub(crate) mod tests {
32433240
state.collections.downgrade_since(
32443241
&reader,
32453242
seqno,
3246-
None,
3243+
seqno,
32473244
&Antichain::from_elem(2),
32483245
now()
32493246
),
@@ -3255,7 +3252,7 @@ pub(crate) mod tests {
32553252
state.collections.downgrade_since(
32563253
&reader,
32573254
seqno,
3258-
None,
3255+
seqno,
32593256
&Antichain::from_elem(1),
32603257
now()
32613258
),
@@ -3280,7 +3277,7 @@ pub(crate) mod tests {
32803277
state.collections.downgrade_since(
32813278
&reader2,
32823279
seqno,
3283-
None,
3280+
seqno,
32843281
&Antichain::from_elem(3),
32853282
now()
32863283
),
@@ -3292,7 +3289,7 @@ pub(crate) mod tests {
32923289
state.collections.downgrade_since(
32933290
&reader,
32943291
seqno,
3295-
None,
3292+
seqno,
32963293
&Antichain::from_elem(5),
32973294
now()
32983295
),
@@ -3324,7 +3321,7 @@ pub(crate) mod tests {
33243321
state.collections.downgrade_since(
33253322
&reader3,
33263323
seqno,
3327-
None,
3324+
seqno,
33283325
&Antichain::from_elem(10),
33293326
now()
33303327
),
@@ -3624,7 +3621,7 @@ pub(crate) mod tests {
36243621
state.collections.downgrade_since(
36253622
&reader,
36263623
SeqNo::minimum(),
3627-
None,
3624+
SeqNo::minimum(),
36283625
&Antichain::from_elem(2),
36293626
now()
36303627
),

src/persist-client/src/read.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -581,15 +581,15 @@ impl LeaseMetadata {
581581
lease.clone()
582582
}
583583

584-
pub fn outstanding_seqno(&mut self) -> Option<SeqNo> {
584+
pub fn outstanding_seqno(&mut self) -> SeqNo {
585585
while let Some(first) = self.leases.first_entry() {
586586
if first.get().count() <= 1 {
587587
first.remove();
588588
} else {
589-
return Some(*first.key());
589+
return *first.key();
590590
}
591591
}
592-
None
592+
self.recent_seqno
593593
}
594594
}
595595

@@ -700,8 +700,12 @@ where
700700
&self.since
701701
}
702702

703-
fn outstanding_seqno(&mut self) -> Option<SeqNo> {
704-
self.leased_seqnos.modify(|s| s.outstanding_seqno())
703+
fn outstanding_seqno(&mut self) -> SeqNo {
704+
let current_seqno = self.machine.seqno();
705+
self.leased_seqnos.modify(|s| {
706+
s.observe_seqno(current_seqno);
707+
s.outstanding_seqno()
708+
})
705709
}
706710

707711
/// Forwards the since frontier of this handle, giving up the ability to
@@ -1611,7 +1615,7 @@ mod tests {
16111615

16121616
// We should expect the SeqNo to be downgraded if this part's SeqNo
16131617
// is no longer leased to any other parts, either.
1614-
let expect_downgrade = subscribe.listen.handle.outstanding_seqno() > Some(part_seqno);
1618+
let expect_downgrade = subscribe.listen.handle.outstanding_seqno() > part_seqno;
16151619

16161620
let new_seqno_since = subscribe.listen.handle.machine.applier.seqno_since();
16171621
if expect_downgrade {

0 commit comments

Comments
 (0)