环境/背景:
使用PHP Stomp库发送和接收来自ActiveMQ(v.4.3(的消息。
步骤:
- 客户端向&请求队列的相关id头(say/queue/request(
- 订阅响应队列(say/queue/response(
- 读取帧
- ack
- 取消订阅
当不存在挂起消息或挂起消息<n.就我而言,n=200。当挂起消息的数量>200时,该消息不会传递。进程一直等待到超时,最后超时而没有响应。我可以在超时后看到消息(使用管理员UI(。这是我在这个案例中使用的代码:
<?php
// make a connection
$con = new Stomp("tcp://localhost:61616");
// Set read timeout.
$con->setReadTimeout(10);
// Prepare request variables.
$correlation_id = rand();
$request_queue = '/queue/com.domain.service.request';
$response_queue = '/queue/com.domain.service.response';
$selector = "JMSCorrelationID='$correlation_id'";
$headers = array('correlation-id' => $correlation_id, 'reply-to' => $response_queue);
$message = '<RequestBody></RequestBody>';
// send a message to the queue.
$con->send($request_queue, $message, $headers);
// subscribe to the queue
$con->subscribe($response_queue, array('selector' => $selector, 'ack' => 'auto'));
// receive a message from the queue
$msg = $con->readFrame();
// do what you want with the message
if ( $msg != null) {
echo "Received message with body'n";
var_dump($msg);
// mark the message as received in the queue
$con->ack($msg);
} else {
echo "Failed to receive a message'n";
}
unset($con);
其他发现:
从一个文件(比如sender.php(发送消息,并使用另一个脚本(比如receiver.php(接收消息,工作正常。
允许在同一请求队列中发送1000多条消息(最终处理并放入响应队列(。所以这看起来不像是内存问题。
有趣的是,在等待超时时,如果我在管理UI上浏览队列,就会得到响应。
默认情况下,我使用的stomp代理将预取大小设置为1。
在不了解更多信息的情况下,我的猜测是,您有多个消费者,其中一个占据了它的预取缓冲区中的消息,我认为默认大小是1000。要调试这样的情况,通常最好查看web控制台或连接jconsole并检查Queue MBean,以查看飞行中消息的统计信息、消费者数量等。
回答我自己的问题。
我面临的问题正是http://trenaman.blogspot.co.uk/2009/01/message-selectors-and-activemq.html解决方案是增加ActiveMQ中指定的maxPageSize和maxPageSize
可以匹配的是,200不是一个变化的数字,而是maxPageSize的默认值。
更多参考:
-
http://activemq.2283324.n4.nabble.com/Consumer-is-not-able-to-pick-messages-from-queue-td2531722.html
-
https://issues.apache.org/jira/browse/AMQ-2217
-
https://issues.apache.org/jira/browse/AMQ-2745