按固定时间间隔使用消息/一次仅使用一条消息


consume messages on fixed time interval / only one message at once

由于外部 API 的限制,我只想在固定的时间间隔内处理消息。一次只能发送一条消息。

为了按顺序处理所有消息,我的代码如下所示:

$conn           = new AMQPConnection($rbmqHOST, $rbmqPORT, $rbmqUSER, $rbmqPASS, $rbmqVHOST);
$ch             = $conn->channel();
$ch->exchange_declare($exchange, 'direct', false, true, false);
$ch->queue_declare($queue, false, true, false, false);
$ch->queue_bind($queue, $exchange, $queue);
function process_message($msg)
{
    //do something ..
    //ack msg
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
}
$ch->basic_consume($queue, $queue, false, false, false, false, 'process_message');
function shutdown($ch, $conn)
{
    $ch->close();
    $conn->close();
}
register_shutdown_function('shutdown', $ch, $conn);
// Loop as long as the channel has callbacks registered
while (count($ch->callbacks)) {
    $ch->wait();
}

在处理步骤中调用sleep()不起作用。

有什么建议吗?

好的,知道了。(:

启发式方法的工作原理如下:

  1. 在所需的时间间隔内通过 cronjob 启动 php 消费者进程。
  2. 处理完消息后取消使用者。

    ...
    function process_message($msg)
    {
        //do something ..
        //ack msg
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
        //cancel consumer
        $msg->delivery_info['channel']->basic_cancel($msg->delivery_info['consumer_tag']);
    }
    ...
    
  3. 启动一个使用者进程后,检查另一个使用者进程是否已在运行。如果是这样,请中断以避免多个并行进程。只需检查来自queue_declare()的返回值:

    ...
    list(,,$consumerCount) = $ch->queue_declare($rbmqAmazonInit, false, true, false, false);
    if ($consumerCount >= 1) {
        //another consumer is already runing
        exit;
    }
    ...