如何制作一个简单的任务队列


How to make a simple task queue

我想用RabbitMQ和名为AMQP的PHP PECL扩展创建一个简单的任务队列。

我的目标很简单:生产者应该向包含需要处理的对象的信封的特定队列发送消息。

所有消费者都应该听从上述队列,并在消息到来时对其进行处理。我需要能够添加更多的消费者,并让RabbitMq以循环方式发送消息。

虽然这很容易找到python或java库的教程,但我找不到PHP的PECL库的教程。

我不太确定我是否应该绑定任何东西,我有一个使用自定义php库的工作示例,该库使用了"basic_publish和basic_sume",但在PECL库中没有以这种方式实现。

到目前为止,我得到的是:出版商:

$oConfig = Zend_Registry::get('config');
$sQueue = $oConfig->amqp->validate_queue_name;
$oConnection = new AMQPConnection();
$oConnection->setLogin($oConfig->amqp->login);
$oConnection->setPassword($oConfig->amqp->pass);
$oConnection->setVhost($oConfig->amqp->vhost);
$oConnection->setPort($oConfig->amqp->port);
$oConnection->connect();
$oChannel = new AMQPChannel($oConnection);
$oExchange = new AMQPExchange($oChannel);
$sMsg = new stdClass();
$sMsg->nId = $p_nId;
$sMsg->nStatus= $p_nStatus;
try  {
  $oChannel->startTransaction();
  $bResponse = $oExchange->publish($sMgs,$sQueue);
  if (!$bResponse)  {
    echo "<h1>An error occured, the message can't be published</h1>";
    echo "<h3>Sorry i don't know why</h3>";
    exit;
  }
  $oChannel->commitTransaction();
}  catch (Exception $oException)  {
  echo "<h1>An error occured, the message can't be published</h1>";
  echo "<h3>See error below</h3>";
  echo "<pre>";
  echo print_r($oException->getMessage());
  echo "</pre>";
  exit;
}

工人

  $oConfig = Zend_Registry::get('config');
  $oConnection = new AMQPConnection();
  $oConnection->setLogin($oConfig->amqp->login);
  $oConnection->setPassword($oConfig->amqp->pass);
  $oConnection->setVhost($oConfig->amqp->vhost);
  $oConnection->setPort($oConfig->amqp->port);
  $oConnection->connect();
  $oChannel = new AMQPChannel($oConnection);
  $oQueue = new AMQPQueue($oChannel);
  $oQueue->declare($oConfig->amqp->validate_queue_name);
  function processMessage($oMessage, $oQueue) {
    $nId     = $msg->body->nId;
    $nStatus = $msg->body->nStatus;
    $oIniAct = $oActionMap->findBy('id',$nId);
    $sReply  = $oIniAct->updateStatusMisc($nStatus);
    if ($sReply->status == $nStatus)  {
      $oQueue->ack($sMsg['delivery_tag']);
    } else {
    $oQueue->nack($sMsg['delivery_tag'],AMQP_REQUEUE);
    }
  }
  $oQueue->consume("processMessage",AMQP_NOPARAM);

PHP文档告诉我consumer()会为每个人锁定相应的线程?所以基本上我一次只能有一个工人在工作?我还看到人们正在绑定队列,但我看到的第一个带有基本消费的工人示例没有使用它。

正如你所看到的,我很困惑,任何帮助/指导/教程ASO。。。将有助于

感谢

PHP具有同步性,因此,是的,consume()将锁定主线程,而底层逻辑是读取套接字连接上的所有传入数据,将其转换为PHP结构并提供给您的消费者函数。

在github上曾讨论过让php-amqp异步化的问题,但我们都同意,如果有人需要异步功能,php在设计上并不是最好的语言。

就我个人而言,我多次运行消费者脚本(事实上,我有均衡器),所以每个消费者不会相互影响,他们可能会失败并独立重新启动。我想你也可以这么做。

我一次多次运行使用者脚本作为守护进程和平衡器脚本(事实上,没有真正的负载平衡器),监视使用者的活动(通过memcache完成,不清楚,但WFM),当使用者上没有活动时,平衡器会一个接一个地杀死他们(但至少有一个工作的使用者应该还活着)。当消费者过载时,平衡器脚本启动更多的消费者。

如果你需要消费一条消息然后死亡,让你的消费函数返回false

若您确定队列至少有一条消息可用,您可能希望使用AMQPQueue::get()方法,该方法不会阻塞(或者至少不应该阻塞)您的主线程。