由于外部 API 的限制,我只想在固定的时间间隔内处理消息。一次只能发送一条消息。
为了按顺序处理所有消息,我的代码如下所示:
$conn = new AMQPConnection($rbmqHOST, $rbmqPORT, $rbmqUSER, $rbmqPASS, $rbmqVHOST);
$ch = $conn->channel();
$ch->exchange_declare($exchange, 'direct', false, true, false);
$ch->queue_declare($queue, false, true, false, false);
$ch->queue_bind($queue, $exchange, $queue);
function process_message($msg)
{
//do something ..
//ack msg
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
}
$ch->basic_consume($queue, $queue, false, false, false, false, 'process_message');
function shutdown($ch, $conn)
{
$ch->close();
$conn->close();
}
register_shutdown_function('shutdown', $ch, $conn);
// Loop as long as the channel has callbacks registered
while (count($ch->callbacks)) {
$ch->wait();
}
在处理步骤中调用sleep()
不起作用。
有什么建议吗?
好的,知道了。(:
启发式方法的工作原理如下:
- 在所需的时间间隔内通过 cronjob 启动 php 消费者进程。
-
处理完消息后取消使用者。
... function process_message($msg) { //do something .. //ack msg $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); //cancel consumer $msg->delivery_info['channel']->basic_cancel($msg->delivery_info['consumer_tag']); } ...
-
启动一个使用者进程后,检查另一个使用者进程是否已在运行。如果是这样,请中断以避免多个并行进程。只需检查来自
queue_declare()
的返回值:... list(,,$consumerCount) = $ch->queue_declare($rbmqAmazonInit, false, true, false, false); if ($consumerCount >= 1) { //another consumer is already runing exit; } ...