我正在使用https://github.com/videlalvaro/php-amqplib做一些拉比的工作:
我正在尝试创建一个阻塞版本的basic_get(或一个我可以重复调用的basic_sume版本,每次只得到一条消息),它将阻塞直到消息准备好,然后返回消息,而不是在队列中没有消息时返回null。
当我试图用basic_sume获取一条消息时,事情会变得一团糟,最后我会收到一堆"还没有准备好"但未确认的消息。(如果我只以这种方式获取一条消息,它每次都有效,如果我尝试获取两条消息,有时会挂断,其他消息也有效)
class Foo {
...
private function blockingGet() {
/*
queue: Queue from where to get the messages
consumer_tag: Consumer identifier
no_local: Don't receive messages published by this consumer.
no_ack: Tells the server if the consumer will acknowledge the messages.
exclusive: Request exclusive consumer access, meaning only this consumer can access the queue
nowait:
callback: A PHP Callback
*/
$this->ch->basic_consume($this->queueName, "consumer_".$this->consumerNum++, false, false, false, false, function($msg) {
$this->msgCache = json_decode($msg->body);
$this->ch->basic_ack($msg->delivery_info['delivery_tag']);
$this->ch->basic_cancel($msg->delivery_info['consumer_tag']);
});
while (count($this->ch->callbacks)) {
$this->ch->wait();
}
return $this->msgCache;
}
}
$q = new Foo();
for ($i = 0; $i < 5; $i++) {
print $q->blockingGet();
}
我已经实现了类似于您在这里所追求的东西,将接收到的消息保存在传递给$channel->basic_consume()
的回调参数的闭包中,然后在$channel->wait()
调用后处理它,因为如果接收到消息(或者设置了超时参数并达到超时),wait()
将返回控制。试试下面这样的东西:
class Foo {
// ...
public function __construct() {
$this->ch->basic_consume($this->queueName, "", false, false, false, false, function($msg) {
$this->msgCache = json_decode($msg->body);
$this->ch->basic_ack($msg->delivery_info['delivery_tag']);
});
}
// ...
private function blockingGet() {
$this->ch->wait();
if ($this->msgCache) {
$msgCache = $this->msgCache;
$this->msgCache = null;
return $msgCache;
}
return null;
}
}
$q = new Foo();
for ($i = 0; $i < 5; $i++) {
print $q->blockingGet();
}