当所有其他工作完成时,重新消费ack'ed消息


Re-consuming nack'ed messages when all the other work is done

我正在实现RabbitMQ在另一台服务器上执行一些图像编辑操作。但是,有时请求可能会在源图像与服务器同步之前到达服务器-在这种情况下,我希望将消息弹出到队列中,并在所有其他操作完成后处理它。

然而,调用basic。设置了resubmit位的Nack使我的队列立即重新接收该消息——在任何实际可以完成的操作之前。

目前,我觉得我被迫实现一些逻辑,只是重新提交原始消息到交换,但我想避免这种情况。因为同样的消息可能已经在另一个服务器上成功处理过了(有它自己的队列),也因为我认为这是一个非常常见的模式,一定有更好的方法。

(哦,我在消费者和服务器代码中都使用php-amqplib)

谢谢!

Update:按照zaq178miami的建议,我使用死信交换解决了我的问题

我当前的解决方案:

  • 在原始队列$worker上声明一个死信交换$dead_letter_exchange
  • 声明恢复交换$recovery_exchange
  • 声明队列$dead_queue, x-message-ttl设置为5秒,x-dead-letter-exchange设置为$recovery_exchange
  • $dead_letter_queue绑定到$dead_letter_exchange
  • $worker绑定到$recovery_exchange
  • $dead_letter_exchange$recovery_exchange是根据我正在消费的交易所和$worker
  • 的值生成的名称

使每个被请求的消息只需要在5秒后返回到特定队列(服务器)上的worker进行重试。我可能仍然想应用一些逻辑,在$n重试后丢弃消息。

我仍然愿意接受更好的想法;-)

看起来你有'竞态'问题,这是问题的原因。也许延迟消息发布是一个不错的选择,或者发布延迟消息以确保映像与目标机器同步,或者在映像到达时发布消息(这可能很棘手),或者只是按需同步映像(当消息被消耗时)。您甚至可以添加一些API来获取源图像,这样您就可以在任何时候轻松地横向扩展您的消费者。这个想法是使消费者尽可能地原子化和不可靠。

回到最初的问题,如果你可以选择,尝试死信交换将失败的消息移动到单独的队列。混合失败的消息和有效的消息,但没有明确的机制来检测重新发布的气味(由于潜在的循环问题、管理困难等原因)。但这真的取决于你的需求,消息率和硬件,如果一些解决方案产生稳定的结果,你确信它-坚持下去。

注意,如果您正在使用php-amqplib,您可以同时从多个队列开始消费消息,因此您可以从主队列和延迟消息中消费消息(但在这种情况下,您必须将消息发布到延迟队列延迟,以防止它立即消费)。

通常通过每个消息或每个队列ttl和额外队列完成延迟消息发布,并将DLX设置为主工作队列,或者在您的情况下为延迟消息队列。