Skip to content

Commit 71751b9

Browse files
committed
try to intercept messages
1 parent b78a27f commit 71751b9

File tree

6 files changed

+275
-19
lines changed

6 files changed

+275
-19
lines changed

README.md

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,14 +53,14 @@ func main() {
5353
}).WithConcurrent(4) // set the number of concurrent consumers
5454
// send delay message
5555
for i := 0; i < 10; i++ {
56-
err := queue.SendDelayMsg(strconv.Itoa(i), time.Hour, delayqueue.WithRetryCount(3))
56+
_, err := queue.SendDelayMsgV2(strconv.Itoa(i), time.Hour, delayqueue.WithRetryCount(3))
5757
if err != nil {
5858
panic(err)
5959
}
6060
}
6161
// send schedule message
6262
for i := 0; i < 10; i++ {
63-
err := queue.SendScheduleMsg(strconv.Itoa(i), time.Now().Add(time.Hour))
63+
_, err := queue.SendScheduleMsgV2(strconv.Itoa(i), time.Now().Add(time.Hour))
6464
if err != nil {
6565
panic(err)
6666
}
@@ -71,6 +71,8 @@ func main() {
7171
}
7272
```
7373

74+
> `SendScheduleMsgV2` (`SendDelayMsgV2`) is fully compatible with `SendScheduleMsg` (`SendDelayMsg`)
75+
7476
> Please note that redis/v8 is not compatible with redis cluster 7.x. [detail](https://github.com/redis/go-redis/issues/2085)
7577
7678
> If you are using redis client other than go-redis, you could wrap your redis client into [RedisCli](https://pkg.go.dev/github.com/hdt3213/delayqueue#RedisCli) interface
@@ -95,6 +97,30 @@ func producer() {
9597
}
9698
```
9799

100+
## Intercept/Delete Messages
101+
102+
```go
103+
msg, err := queue.SendScheduleMsgV2(strconv.Itoa(i), time.Now().Add(time.Second))
104+
if err != nil {
105+
panic(err)
106+
}
107+
result, err := queue.TryIntercept(msg)
108+
if err != nil {
109+
panic(err)
110+
}
111+
if result.Intercepted {
112+
println("interception success!")
113+
} else {
114+
println("interception failed, message has been consumed!")
115+
}
116+
```
117+
118+
`SendScheduleMsgV2` and `SendDelayMsgV2` return a structure which contains message tracking information.Then passing it to `TryIntercept` to try to intercept the consumption of the message.
119+
120+
If the message is pending or waiting to consume the interception will succeed.If the message has been consumed or is awaiting retry, the interception will fail, but TryIntercept will prevent subsequent retries.
121+
122+
TryIntercept returns a InterceptResult, which Intercepted field indicates whether it is successful.
123+
98124
## Options
99125

100126
### Consume Function

README_CN.md

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,14 @@ func main() {
4848
}).WithConcurrent(4) // 设置消费者并发数
4949
// 发送延时投递消息
5050
for i := 0; i < 10; i++ {
51-
err := queue.SendDelayMsg(strconv.Itoa(i), time.Hour, delayqueue.WithRetryCount(3))
51+
_, err := queue.SendDelayMsgV2(strconv.Itoa(i), time.Hour, delayqueue.WithRetryCount(3))
5252
if err != nil {
5353
panic(err)
5454
}
5555
}
5656
// 发送定时投递消息
5757
for i := 0; i < 10; i++ {
58-
err := queue.SendScheduleMsg(strconv.Itoa(i), time.Now().Add(time.Hour))
58+
_, err := queue.SendScheduleMsg(strconv.Itoa(i), time.Now().Add(time.Hour))
5959
if err != nil {
6060
panic(err)
6161
}
@@ -90,6 +90,30 @@ func producer() {
9090
}
9191
```
9292

93+
## 拦截消息/删除消息
94+
95+
```go
96+
msg, err := queue.SendScheduleMsgV2(strconv.Itoa(i), time.Now().Add(time.Second))
97+
if err != nil {
98+
panic(err)
99+
}
100+
result, err := queue.TryIntercept(msg)
101+
if err != nil {
102+
panic(err)
103+
}
104+
if result.Intercepted {
105+
println("拦截成功!")
106+
} else {
107+
println("拦截失败,消息已被消费!")
108+
}
109+
```
110+
111+
`SendScheduleMsgV2``SendDelayMsgV2` 返回一个可以跟踪消息的结构体。然后将其传递给 `TryIntercept` 就可以尝试拦截消息的消费。
112+
113+
如果消息处于待处理状态(pending)或等待消费(ready),则可以成功拦截。如果消息已被消费或正在等待重试,则无法拦截,但 TryIntercept 将阻止后续重试。
114+
115+
TryIntercept 返回一个 InterceptResult,其中的 Intercepted 字段会表示拦截是否成功。
116+
93117
## 选项
94118

95119
### 回调函数

delayqueue.go

Lines changed: 90 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,11 @@ type RedisCli interface {
6666
SMembers(key string) ([]string, error)
6767
SRem(key string, members []string) error
6868
ZAdd(key string, values map[string]float64) error
69-
ZRem(key string, fields []string) error
69+
ZRem(key string, fields []string) (int64, error)
7070
ZCard(key string) (int64, error)
71+
ZScore(key string, member string) (float64, error)
7172
LLen(key string) (int64, error)
73+
LRem(key string, count int64, value string) (int64, error)
7274

7375
// Publish used for monitor only
7476
Publish(channel string, payload string) error
@@ -206,7 +208,7 @@ func (q *DelayQueue) WithDefaultRetryCount(count uint) *DelayQueue {
206208
return q
207209
}
208210

209-
// WithNackRedeliveryDelay customizes the interval between redelivery and nack (callback returns false)
211+
// WithNackRedeliveryDelay customizes the interval between redelivery and nack (callback returns false)
210212
// If consumption exceeded deadline, the message will be redelivered immediately
211213
func (q *DelayQueue) WithNackRedeliveryDelay(d time.Duration) *DelayQueue {
212214
q.nackRedeliveryDelay = d
@@ -236,8 +238,25 @@ func WithMsgTTL(d time.Duration) interface{} {
236238
return msgTTLOpt(d)
237239
}
238240

239-
// SendScheduleMsg submits a message delivered at given time
240-
func (q *DelayQueue) SendScheduleMsg(payload string, t time.Time, opts ...interface{}) error {
241+
// MessageInfo stores information to trace a message
242+
type MessageInfo struct {
243+
id string
244+
}
245+
246+
func (msg *MessageInfo) ID() string {
247+
return msg.id
248+
}
249+
250+
const (
251+
StatePending = "pending"
252+
StateReady = "ready"
253+
StateReadyRetry = "ready_to_retry"
254+
StateConsuming = "consuming"
255+
StateUnknown = "unknown"
256+
)
257+
258+
// SendScheduleMsgV2 submits a message delivered at given time
259+
func (q *DelayQueue) SendScheduleMsgV2(payload string, t time.Time, opts ...interface{}) (*MessageInfo, error) {
241260
// parse options
242261
retryCount := q.defaultRetryCount
243262
for _, opt := range opts {
@@ -255,28 +274,90 @@ func (q *DelayQueue) SendScheduleMsg(payload string, t time.Time, opts ...interf
255274
msgTTL := t.Sub(now) + q.msgTTL // delivery + q.msgTTL
256275
err := q.redisCli.Set(q.genMsgKey(idStr), payload, msgTTL)
257276
if err != nil {
258-
return fmt.Errorf("store msg failed: %v", err)
277+
return nil, fmt.Errorf("store msg failed: %v", err)
259278
}
260279
// store retry count
261280
err = q.redisCli.HSet(q.retryCountKey, idStr, strconv.Itoa(int(retryCount)))
262281
if err != nil {
263-
return fmt.Errorf("store retry count failed: %v", err)
282+
return nil, fmt.Errorf("store retry count failed: %v", err)
264283
}
265284
// put to pending
266285
err = q.redisCli.ZAdd(q.pendingKey, map[string]float64{idStr: float64(t.Unix())})
267286
if err != nil {
268-
return fmt.Errorf("push to pending failed: %v", err)
287+
return nil, fmt.Errorf("push to pending failed: %v", err)
269288
}
270289
q.reportEvent(NewMessageEvent, 1)
271-
return nil
290+
return &MessageInfo{
291+
id: idStr,
292+
}, nil
272293
}
273294

274295
// SendDelayMsg submits a message delivered after given duration
296+
func (q *DelayQueue) SendDelayMsgV2(payload string, duration time.Duration, opts ...interface{}) (*MessageInfo, error) {
297+
t := time.Now().Add(duration)
298+
return q.SendScheduleMsgV2(payload, t, opts...)
299+
}
300+
301+
// SendScheduleMsg submits a message delivered at given time
302+
// It is compatible with SendScheduleMsgV2, but does not return MessageInfo
303+
func (q *DelayQueue) SendScheduleMsg(payload string, t time.Time, opts ...interface{}) error {
304+
_, err := q.SendScheduleMsgV2(payload, t, opts...)
305+
return err
306+
}
307+
308+
// SendDelayMsg submits a message delivered after given duration
309+
// It is compatible with SendDelayMsgV2, but does not return MessageInfo
275310
func (q *DelayQueue) SendDelayMsg(payload string, duration time.Duration, opts ...interface{}) error {
276311
t := time.Now().Add(duration)
277312
return q.SendScheduleMsg(payload, t, opts...)
278313
}
279314

315+
type InterceptResult struct {
316+
Intercepted bool
317+
State string
318+
}
319+
320+
// TryIntercept trys to intercept a message
321+
func (q *DelayQueue) TryIntercept(msg *MessageInfo) (*InterceptResult, error) {
322+
id := msg.ID()
323+
// try to intercept at ready
324+
removed, err := q.redisCli.LRem(q.readyKey, 0, id)
325+
if err != nil {
326+
q.logger.Printf("intercept %s from ready failed: %v", id, err)
327+
}
328+
if removed > 0 {
329+
_ = q.redisCli.Del([]string{q.genMsgKey(id)})
330+
_ = q.redisCli.HDel(q.retryCountKey, []string{id})
331+
return &InterceptResult{
332+
Intercepted: true,
333+
State: StateReady,
334+
}, nil
335+
}
336+
// try to intercept at pending
337+
removed, err = q.redisCli.ZRem(q.pendingKey, []string{id})
338+
if err != nil {
339+
q.logger.Printf("intercept %s from pending failed: %v", id, err)
340+
}
341+
if removed > 0 {
342+
_ = q.redisCli.Del([]string{q.genMsgKey(id)})
343+
_ = q.redisCli.HDel(q.retryCountKey, []string{id})
344+
return &InterceptResult{
345+
Intercepted: true,
346+
State: StatePending,
347+
}, nil
348+
}
349+
// message may be being consumed or has been successfully consumed
350+
// if the message has been successfully consumed, the following action will cause nothing
351+
// if the message is being consumed,the following action will prevent it from being retried
352+
q.redisCli.HDel(q.retryCountKey, []string{id})
353+
q.redisCli.LRem(q.retryKey, 0, id)
354+
355+
return &InterceptResult{
356+
Intercepted: false,
357+
State: StateUnknown,
358+
}, nil
359+
}
360+
280361
func (q *DelayQueue) loadScript(script string) (string, error) {
281362
sha1, err := q.redisCli.ScriptLoad(script)
282363
if err != nil {
@@ -420,7 +501,7 @@ func (q *DelayQueue) callback(idStr string) error {
420501

421502
func (q *DelayQueue) ack(idStr string) error {
422503
atomic.AddInt32(&q.fetchCount, -1)
423-
err := q.redisCli.ZRem(q.unAckKey, []string{idStr})
504+
_, err := q.redisCli.ZRem(q.unAckKey, []string{idStr})
424505
if err != nil {
425506
return fmt.Errorf("remove from unack failed: %v", err)
426507
}

delayqueue_test.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -445,3 +445,84 @@ func TestDelayQueue_NackRedeliveryDelay(t *testing.T) {
445445
return
446446
}
447447
}
448+
449+
func TestDelayQueue_TryIntercept(t *testing.T) {
450+
redisCli := redis.NewClient(&redis.Options{
451+
Addr: "127.0.0.1:6379",
452+
})
453+
redisCli.FlushDB(context.Background())
454+
cb := func(s string) bool {
455+
return false
456+
}
457+
queue := NewQueue("test", redisCli, cb).
458+
WithDefaultRetryCount(3).
459+
WithNackRedeliveryDelay(time.Minute)
460+
461+
// intercept pending message
462+
msg, err := queue.SendDelayMsgV2("foobar", time.Minute)
463+
if err != nil {
464+
t.Error(err)
465+
return
466+
}
467+
result, err := queue.TryIntercept(msg)
468+
if err != nil {
469+
t.Error(err)
470+
return
471+
}
472+
if !result.Intercepted {
473+
t.Error("expect intercepted")
474+
}
475+
476+
// intercept ready message
477+
msg, err = queue.SendScheduleMsgV2("foobar2", time.Now().Add(-time.Minute))
478+
if err != nil {
479+
t.Error(err)
480+
return
481+
}
482+
err = queue.pending2Ready()
483+
if err != nil {
484+
t.Error(err)
485+
return
486+
}
487+
result, err = queue.TryIntercept(msg)
488+
if err != nil {
489+
t.Error(err)
490+
return
491+
}
492+
if !result.Intercepted {
493+
t.Error("expect intercepted")
494+
}
495+
496+
// prevent from retry
497+
msg, err = queue.SendScheduleMsgV2("foobar3", time.Now().Add(-time.Minute))
498+
if err != nil {
499+
t.Error(err)
500+
return
501+
}
502+
ids, err := queue.beforeConsume()
503+
if err != nil {
504+
t.Errorf("consume error: %v", err)
505+
return
506+
}
507+
for _, id := range ids {
508+
queue.nack(id)
509+
}
510+
queue.afterConsume()
511+
result, err = queue.TryIntercept(msg)
512+
if err != nil {
513+
t.Error(err)
514+
return
515+
}
516+
if result.Intercepted {
517+
t.Error("expect not intercepted")
518+
return
519+
}
520+
ids, err = queue.beforeConsume()
521+
if err != nil {
522+
t.Errorf("consume error: %v", err)
523+
return
524+
}
525+
if len(ids) > 0 {
526+
t.Error("expect empty messages")
527+
}
528+
}

example/getstarted/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,14 @@ func main() {
1919
}).WithConcurrent(4)
2020
// send delay message
2121
for i := 0; i < 10; i++ {
22-
err := queue.SendDelayMsg(strconv.Itoa(i), time.Second, delayqueue.WithRetryCount(3))
22+
_, err := queue.SendDelayMsgV2(strconv.Itoa(i), time.Second, delayqueue.WithRetryCount(3))
2323
if err != nil {
2424
panic(err)
2525
}
2626
}
2727
// send schedule message
2828
for i := 0; i < 10; i++ {
29-
err := queue.SendScheduleMsg(strconv.Itoa(i), time.Now().Add(time.Second))
29+
_, err := queue.SendScheduleMsgV2(strconv.Itoa(i), time.Now().Add(time.Second))
3030
if err != nil {
3131
panic(err)
3232
}

0 commit comments

Comments
 (0)