PHP AMQP使用者过了一段时间没有响应


PHP AMQP consumer does not respond after a while

我在这里遇到了一个php amqp使用者的小问题,它过了一段时间就停止工作了。下面你可以看到我的silex命令。我还尝试过使用心跳和保活配置来处理断开的网络连接,但它没有改变。消费者没有从队列中读取消息的原因是什么?剧本并没有停止,只是看起来在睡觉。

<?php
use Symfony'Component'Console'Input'InputInterface;
use Symfony'Component'Console'Output'OutputInterface;
use Knp'Command'Command as BaseCommand;
use PhpAmqpLib'Message'AMQPMessage;
class RequestWorkerCommand extends BaseCommand
{
    protected function configure()
    {
        $this->setName('queue:worker');
    }
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $app = $this->getSilexApplication();
        $amqp = $app['amqp.connection']; /* @var $amqp 'PhpAmqpLib'Connection'AMQPStreamConnection */
        $channel = $amqp->channel();
        $callback = function($message) use ($input, $output) {
            return call_user_func_array([$this, 'processMessage'], [$message, $input, $output]);
        };
        $channel->queue_declare('myqueue', false, true, false, false);
        $channel->basic_qos(null, 1, null);
        $channel->basic_consume('myqueue', '', false, false, false, false, $callback);
        while(count($channel->callbacks)) {
            $output->writeln('Waiting for incoming price requests');
            $channel->wait();
        }
    }
    protected function processMessage(AMQPMessage $message, InputInterface $input, OutputInterface $output)
    {
        $app = $this->getSilexApplication();
        try {
            $data = json_decode($message->body, true);
            $request = Request::createFromArray($data); /* create object from data */
            $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
            $app['distributor']->distribute($request); /* process message */
        } catch ('Exception $e) { /* handle error */ }
    }
}

我不能确定PHP,但我在Python/kombu中遇到了类似的问题。纯puython amqplib从未做过心跳,尽管我给了它这样做的指令。当我改用librabbitmq(它包裹在rabbitmq-c周围)作为替代时,心跳不再是一个问题,我的消费者也不再挂断我的电话。