用PHP编写RabbitMQ的订阅方法


Writing a subscribe method of RabbitMQ in PHP

我有一个定义如下的函数:

public function subscribe($someQueue)
{
    $callback = function($msg){
        return $msg->body;
    };
    $this->channel->basic_consume( $someQueue, '', FALSE, TRUE, FALSE, FALSE, $callback);
    while(count($this->channel->callbacks)) {
         $this->channel->wait();
    }
}

我正在使用以下功能:

注意:以下几行位于不同的类文件中,因此创建了包含上述函数的类的对象。

$objRMQ = new RabbitMQ();
$msgBody = $objRMQ->subscribe("someQueue");
echo "message body returned from someMethod: ".$msgBody; 

基本上,我想将每条消息的正文返回到发布到队列的调用方函数。

电流输出:

message body returned from subscribe: NULL

预期输出:

holla, this is your message from queue

由于这个问题很老,但仍然没有答案,我将简要解释一下。你现在可能已经找到了答案,但这可能会帮助其他人在未来寻找。

这里的关键概念是"异步执行"。

当您使用basic_consume方法订阅频道时,您并不是要求立即执行一次回调,而是要求在消息可用时执行回调,然后每次有另一条消息可用时都执行回调。

在AMQPLib的情况下,您可以通过重复调用wait()方法来等待新消息;即此处:

while(count($this->channel->callbacks)) {
     $this->channel->wait();
}

仔细想想,你的代码中有两个错误:

  • return $msg->body无处返回。调用将发生在wait()方法实现的深处,并且$this->channel->wait()没有输出,因此无法对返回的值执行任何操作
  • 另一方面,当您从另一个类调用$objRMQ->subscribe("someQueue")时,您希望它返回一些内容,但该函数没有return语句。唯一的return语句位于传递给basic_consume的匿名函数中

解决方案基本上是执行所有消息处理-echo $msg->body,或者在回调内执行任何您想执行的实际处理-。如果您真的想在消息传入时收集数据,可以将其保存到回调之外可访问的某个变量中,但请记住,在某个时刻,您需要脱离wait()循环才能对该数据执行任何操作。