PHP和AMQP RabbitMQ消费者


PHP and AMQP RabbitMQ consumer

我有一个工作的PHP脚本,它将消息发布到RabbitMQ,并从制表符分隔的文本文件中解析出来。我确实将该文件中的工作代码复制/粘贴到另一个文件中,并希望建立一个消费者,该消费者将检索发布到交易所的消息,json_edecode它们并将它们插入数据库。

每次尝试,甚至复制/粘贴PHP.net站点的示例代码,甚至SO中的示例,都会失败,出现空白的白色屏幕,没有错误消息,然后甚至会杀死PHP-fpm进程。

知道为什么队列不会绑定吗?这里出了什么问题

  • Nginx->php fpm
  • PHP 5.3.x
  • Macbook Pro(OSX Lion)
  • RabbitMQ(已安装librabbitmq和pecl-amqp)

这是我尝试过的一个例子,但我在AMQP文档上尝试过PHP.net和SO的例子,但都不起作用。我可以很好地发布,但当我尝试绑定队列时,它失败了,最终php-fpm锁定。

<?php
// Report all PHP errors
error_reporting(E_ALL);
/*****************************************
 * MQ settings
 ****************************************/
$mq = array(
           'host' => 'localhost',
           'port' => 5672,
           'login' => 'guest',
           'password' => 'guest',
           'exchange' => 'gbus.user',
           'routing_key' => 'gbus.test.mike',
           );
/*****************************************
 * Connect to queue
 ****************************************/
$conn_args = array('host' => $mq['host'], 'port' => $mq['port'], 'login' => $mq['login'], 'password' => $mq['password']);
$conn = new AMQPConnection($conn_args);
$conn->connect();
$ch = new AMQPChannel($conn);
// Create a new queue
$q = new AMQPQueue($ch);
$q->declare('test-queue');
$q->bind($mq['exchange'],$mq['routing_key']);
?>
<br>
<font color="blue" face="arial" size="4">File Contents</font>
<hr>
<?php 
while(true){
    $msg=$q->get();
    if ($msg['count']>-1){
        echo "'n--------'n";
        print_r($msg['msg']);
        echo "'n--------'n";
    }
    sleep(1);   
}
if (!$conn->disconnect()) {
    throw new Exception('Could not disconnect');
}
?>

下面是我用来发布到队列的示例,每次运行它时,我都会在RabbitMQ控制面板中查看20条新消息。我将测试限制为20行,但该文件有数以万计的行。

工作发布代码:

<?php
/*****************************************
 * MQ settings
 ****************************************/
$mq = array(
           'host' => 'localhost',
           'port' => 5672,
           'login' => 'guest',
           'password' => 'guest',
           'exchange' => 'gbus.user',
           'routing_key' => 'gbus.test.mike',
           );
/*****************************************
 * Connect to queue
 ****************************************/
$conn_args = array('host' => $mq['host'], 'port' => $mq['port'], 'login' => $mq['login'], 'password' => $mq['password']);
$conn = new AMQPConnection($conn_args);
$conn->connect();
$ch = new AMQPChannel($conn);
$ex = new AMQPExchange($ch);
$ex->setName($mq['exchange']);

/*****************************************
 * Parse the file
 ****************************************/
$filename = "/tmp/Users.txt";
$board = "test";
$fd = fopen ($filename, "r");
$contents = fread ($fd,filesize ($filename));
fclose ($fd);
$delimiter = "'r'n";
$rows = explode($delimiter, $contents);
$counter = 0;
?>
<br>
<font color="blue" face="arial" size="4">File Rows (first 20)</font>
<hr>
<?php 
foreach ( $rows as $row )
{
    $counter++;
    echo "<b>Row $counter: </b> $row<br>";
    // build list columns
    list($login_name, $pwd, $account_type, $access_level, $status, $first_name, $last_name, $agent_code) = explode("'t", $row);
    // build assoc array for json
    $user = array("domain"=>$board, "username"=>$login_name, "user_id"=>$agent_code, "password"=>$pwd, "first_name"=>$first_name, "last_name"=>$last_name);
    // Publish a message to the exchange with a routing key
    $ex->publish(json_encode($user), $mq['routing_key'], AMQP_NOPARAM, array("content_type"=>"application/data"));
    if($counter == 20) {
        break;
    }
}
$ch->close();
$conn->close();
?>

您尝试过这里的示例吗:https://github.com/rabbitmq/rabbitmq-tutorials/tree/master/php?

您要查看的两个是:emit_log.phpreceive_logs.php

使用的库与PECL/内置库不同,我认为内置库不支持消费。