我正在实现RabbitMQ在另一台服务器上执行一些图像编辑操作。但是,有时请求可能会在源图像与服务器同步之前到达服务器-在这种情况下,我希望将消息弹出到队列中,并在所有其他操作完成后处理它。
然而,调用basic。设置了resubmit位的Nack使我的队列立即重新接收该消息——在任何实际可以完成的操作之前。
目前,我觉得我被迫实现一些逻辑,只是重新提交原始消息到交换,但我想避免这种情况。因为同样的消息可能已经在另一个服务器上成功处理过了(有它自己的队列),也因为我认为这是一个非常常见的模式,一定有更好的方法。
(哦,我在消费者和服务器代码中都使用php-amqplib)
谢谢!
Update:按照zaq178miami的建议,我使用死信交换解决了我的问题
我当前的解决方案:
- 在原始队列
$worker
上声明一个死信交换$dead_letter_exchange
- 声明恢复交换
$recovery_exchange
- 声明队列
$dead_queue
,x-message-ttl
设置为5秒,x-dead-letter-exchange
设置为$recovery_exchange
- 将
$dead_letter_queue
绑定到$dead_letter_exchange
- 将
$worker
绑定到$recovery_exchange
-
$dead_letter_exchange
和$recovery_exchange
是根据我正在消费的交易所和$worker
的值生成的名称
使每个被请求的消息只需要在5秒后返回到特定队列(服务器)上的worker进行重试。我可能仍然想应用一些逻辑,在$n
重试后丢弃消息。
我仍然愿意接受更好的想法;-)
看起来你有'竞态'问题,这是问题的原因。也许延迟消息发布是一个不错的选择,或者发布延迟消息以确保映像与目标机器同步,或者在映像到达时发布消息(这可能很棘手),或者只是按需同步映像(当消息被消耗时)。您甚至可以添加一些API来获取源图像,这样您就可以在任何时候轻松地横向扩展您的消费者。这个想法是使消费者尽可能地原子化和不可靠。
回到最初的问题,如果你可以选择,尝试死信交换将失败的消息移动到单独的队列。混合失败的消息和有效的消息,但没有明确的机制来检测重新发布的气味(由于潜在的循环问题、管理困难等原因)。但这真的取决于你的需求,消息率和硬件,如果一些解决方案产生稳定的结果,你确信它-坚持下去。
注意,如果您正在使用php-amqplib,您可以同时从多个队列开始消费消息,因此您可以从主队列和延迟消息中消费消息(但在这种情况下,您必须将消息发布到延迟队列延迟,以防止它立即消费)。
通常通过每个消息或每个队列ttl和额外队列完成延迟消息发布,并将DLX设置为主工作队列,或者在您的情况下为延迟消息队列。