我需要使用PHP在ActiveMQ队列中找到一条特定的消息并将其删除。
AFAIK唯一的方法是读取当前排队的所有消息,并确认我感兴趣的一条消息。
所以,我写了这个代码(显然,这只是相关的部分):
class StompController {
private $con;
public function __construct($stompSettings) {
try {
$this->con = new Stomp($stompSettings['scheme']."://".$stompSettings['host'].":".$stompSettings['port']);
$this->con->connect();
$this->con->setReadTimeout(5);
} catch(StompException $e) {
die('Connection failed:' .$e->getMessage());
}
}
public function __destruct() {
$this->con->disconnect();
}
public function ackMessageAsRead($recipient,$message) {
if($this->con->isConnected()) {
//Subscribe to the recipient user's message queue.
$this->con->subscribe("/queue/".$recipient);
//Read all messages currently in the queue (but only ACK the one we're interested in).
while($this->con->hasFrameToRead()) {
$msg = $this->con->readFrame();
if($msg != null && $msg != false) {
//This is the message we are currently reading, ACK it to AMQ and be done with it.
if($msg->body == $message) {
$this->con->ack($msg);
}
}
}
} else {
return false;
}
}
}
根据我的逻辑,这应该可行。在运行代码时,尽管检查了更多的帧,但只读取了一条随机消息。
下一帧似乎只有在我们当前阅读的帧被确认后才能准备好。(当我手动确认所有消息时,while
循环按预期工作,所有消息都被处理。
有人知道如何在不确认所有消息的情况下从队列中获取完整的消息集吗?我可以确认所有消息,然后把我不感兴趣的消息放回队列,但这种已经低效的查找单个消息的方式会变得低效得多。
我认为这是由于将activemq.prefetchSize
设置为1造成的问题。ActiveMQ使用预取大小来确定在任何时间点可以向使用者调度多少消息。一旦达到预取大小,就不会再向使用者发送消息,直到使用者开始发回确认为止。据我所知,增加预取大小应该可以解决您的问题。
请阅读http://activemq.apache.org/what-is-the-prefetch-limit-for.html有关预取限制的更多详细信息。