如果SEND帧在事务(STOMP)中,则它将失败


SEND frame fails if it is in a transaction (STOMP)

我正在为STOMP(Apollo)制作自己的PHP网关,因为所有现有的解决方案要么太糟糕,要么太复杂。

因此,当我在没有交易(但有收据)的情况下向队列发送消息时,一切正常,请参阅日志:

SEND
destination:/queue/teststst
content-type:text/plain
content-length:100
receipt:f1dd03450508d938e6eb8196a6d128c4
2149b4a936862b121f7928ed5c060152
75235e0fbc8a56970ede75d2147b538b
fece2e4403b52fd903b8e7a78b1918c6
RECEIPT
receipt-id:f1dd03450508d938e6eb8196a6d128c4

但当涉及到交易时,我会得到这样的信息(我删除了握手日志,别担心):

BEGIN
transaction:499cc8a062be1235d312e968e5f30802
receipt:f7c837aed5ee9efd8f27143d85061067
RECEIPT
receipt-id:f7c837aed5ee9efd8f27143d85061067

SEND
destination:/queue/teststst
content-type:text/plain
content-length:100
transaction:499cc8a062be1235d312e968e5f30802
receipt:7048ce3f8a01b55294f5e92fcd501c93
f4c356386a563be82f460900889ea07f
bdd8a503ac3e5ddf0d816f95e9448e1e
e5b7ea903beb7f6300249217e9824ef0

然后什么都没有。我的包装器试图接收接收帧,但只有一个fgets超时,看起来像是代理在等待更多的SEND帧数据,但生成帧的过程是相同的,只多了一个标头(事务)。所有必要的EOL和空八位字节都在它们的位置上。

Apollo v1.6,STOMP v1.2。

可能是什么?。。

更新:源代码

<?php
class Stompler {
  const EOL = "'n";
  const NULL_OCTET = "'x00";
  const STATE_HEADER = 1;
  const STATE_BODY   = 2;
  protected $subscription = false;
  protected $transactionStack;
  protected $connection;
  protected $socket;
  protected $possibleFrameTypes = [
    'server' => [
      'MESSAGE',
      'RECEIPT',
      'ERROR',
      'CONNECTED',
    ],
  ];
  public function send($message, $queueName, $async = false) {
    $this->connect();
    $params = [
      'destination' => $queueName,
      'content-type' => 'text/plain',
      'content-length' => mb_strlen($message . static::EOL),
    ];
    if (isset($this->transactionStack) && !$this->transactionStack->isEmpty()) {
      $params['transaction'] = $this->transactionStack->top();
    }
    if ($async === false) {
      $receiptId = $this->generateRandom();
      $params['receipt'] = $receiptId;
    }
    $this->sendFrame('send', $params, $message);
    if ($async === false) $this->checkReceipt($receiptId, 'send');
  }
  public function subscribe($queueName, $async = false, $autoAck = false) {
    $this->connect();
    if ($this->subscription === true) {
      throw new StomplerException('Another subscription has already been started');
    }
    $this->subscription = true;
    $params = [
      'id' => 1,
      'destination' => $queueName,
      'ack' => ($autoAck === true ? 'auto' : 'client'),
    ];
    if ($async === false) {
      $receiptId = $this->generateRandom();
      $params['receipt'] = $receiptId;
    }
    $this->sendFrame('subscribe', $params);
    if ($async === false) $this->checkReceipt($receiptId, 'subscribe');
  }
  public function unsubscribe() {
    if ($this->subscription === false) {
      throw new StomplerException('No subscription');
    }
    $this->subscription = false;
    $this->sendFrame('unsubscribe', [
      'id' => 1,
    ]);
  }
  public function connect() {
    if (!empty($this->connection)) return;
    $config = [...];
            $this->socket = fsockopen('tcp://' . $config['host'], $config['port']);
            if (empty($this->socket)) throw new StomplerConnectionException;
            stream_set_timeout($this->socket, 2);
            $this->sendFrame('connect', [
          'accept-version' => '1.2',
          'login'          => $config['login'],
          'passcode'       => $config['password'],
          'virtual-host'   => 'srv',
          'host'           => 'srv',
        ]);
            $frame = $this->readFrame();
            if ($frame['name'] === 'ERROR') {
          throw new StomplerConnectionException("Could not connect to broker: '{$frame['headers']['message']}' ({$frame['body']})");
        }
            if ($frame['name'] !== 'CONNECTED') {
          throw new StomplerConnectionException;
        }
            $this->connection = $frame['headers']['session'];
        }
  public function ack($message, $async = false) {
    $id = is_array($message) ? $message['headers']['ack'] : $message;
    $params = [
      'id' => $id,
    ];
    if (isset($this->transactionStack) && !$this->transactionStack->isEmpty()) {
      $params['transaction'] = $this->transactionStack->top();
    }
    if ($async === false) {
      $receiptId = $this->generateRandom();
      $params['receipt'] = $receiptId;
    }
    $this->sendFrame('ack', $params);
    if ($async === false) $this->checkReceipt($receiptId, 'ack');
  }
  public function nack($message, $async = false) {
    $id = is_array($message) ? $message['headers']['ack'] : $message;
    $params = [
      'id' => $id,
    ];
    if (isset($this->transactionStack) && !$this->transactionStack->isEmpty()) {
      $params['transaction'] = $this->transactionStack->top();
    }
    if ($async === false) {
      $receiptId = $this->generateRandom();
      $params['receipt'] = $receiptId;
    }
    $this->sendFrame('nack', $params);
    if ($async === false) $this->checkReceipt($receiptId, 'nack');
  }
  public function begin($async = false) {
    $this->connect();
    if ($this->transactionStack === null) {
      $this->transactionStack = new 'SplStack();
    }
    $this->transactionStack->unshift($this->generateRandom());
    $params = ['transaction' => $this->transactionStack->top()];
    if ($async === false) {
      $receiptId = $this->generateRandom();
      $params['receipt'] = $receiptId;
    }
    $this->sendFrame('begin', $params);
    if ($async === false) $this->checkReceipt($receiptId, 'begin');
  }
  public function commit() {
    if (empty($this->transactionStack) || $this->transactionStack->isEmpty()) {
      throw new StomplerException('No transaction started');
    }
    $this->sendFrame('commit', ['transaction' => $this->transactionStack->pop()]);
  }
  public function abort($transactionId) {
    if (empty($this->transactionStack) || $this->transactionStack->isEmpty()) {
      throw new StomplerException('No transaction started');
    }
    $this->sendFrame('abort', ['transaction' => $this->transactionStack->pop()]);
  }
  public function readFrame($expectedFrameName = null) {
    $started = time();
    $frame = [
      'name' => null,
      'headers' => [],
      'body' => null,
    ];
    $state = false;
    $frameName = false;
    //echo '-------RECV--------' . PHP_EOL;
    while (true) {
      if (feof($this->socket) || ((time() - $started) > 1)) return false;
      $frameLine = fgets($this->socket);
      echo $frameLine;
      if ($state === static::STATE_HEADER) {
        $header = rtrim($frameLine, static::EOL);
        if (!empty($header)) {
          list($k, $v) = explode(':', $header);
          $frame['headers'][$k] = $v;
        }
      }
      if ($state === static::STATE_BODY) $frame['body'] .= $frameLine;
      if ($state === false) {
        $frameName = $frame['name'] = rtrim($frameLine, static::EOL);
        if (!in_array($frameName, $this->possibleFrameTypes['server'])) {
          if (empty($frameName)) return false;
          throw new StomplerUnknownFrameException($frameName);
        }
        if ($expectedFrameName !== null && $frameName !== mb_strtoupper($expectedFrameName)) {
          throw new StomplerUnexpectedFrameException($frameName);
        }
        $state = static::STATE_HEADER;
      }
      elseif ($state === static::STATE_HEADER && $frameLine === static::EOL) {
        $state = static::STATE_BODY;
      }
      elseif ($state === static::STATE_BODY && $this->detectNullOctet($frameLine)) {
        break;
      }
    }
    //echo '-------RECV--------' . PHP_EOL;
    if ($frame['body'] !== null) $frame['body'] = rtrim($frame['body'], static::EOL . static::NULL_OCTET);
    if ($frame['name'] === null) return false;
    return $frame;
  }
  private function sendFrame($frameName, $frameParams, $body = null) {
    $frame = $this->compileFrame($frameName, $frameParams, $body);
    //echo '=======SEND========' . PHP_EOL;
    echo $frame;
    //echo '=======SEND========' . PHP_EOL;
    $result = fwrite($this->socket, $frame);
    if (empty($result)) {
      $md = stream_get_meta_data($this->socket);
      if($md['timed_out']) throw new StomplerTimeoutConnectionException;
      throw new StomplerUnknownConnectionException;
    }
  }
  private function compileFrame($name, $headers, $body = null) {
    $result = mb_strtoupper($name) . static::EOL;
    foreach ($headers as $key => $value) {
      $result .= $key;
      if ($value !== false) $result .= ':' . $value;
      $result .= static::EOL;
    }
    if ($body) $result .= static::EOL . $body;
    $result .= static::EOL . static::NULL_OCTET;
    return $result;
  }
  private function detectNullOctet($string) {
    return strpos($string, static::NULL_OCTET) === (mb_strlen($string) - 2);
  }
  private function checkReceipt($receiptId, $frameName) {
    $frameName = mb_strtoupper($frameName);
    try {
      $receiptFrame = $this->readFrame('RECEIPT');
      if ($receiptFrame['headers']['receipt-id'] != $receiptId) {
        throw new StomplerException("Wrong receipt for {$frameName} frame (expected {$receiptFrame}, received {$receiptFrame['headers']['receipt-id']})");
      }
    }
    catch (StomplerUnexpectedFrameException $e) {
      throw new StomplerException("Could not receive receipt frame for {$frameName} frame (received {$e->getMessage()} frame)");
    }
  }
  private function generateRandom() {
    return md5(uniqid('', true));
  }
  public function __destruct() {
    if (empty($this->socket)) return;
    $this->connection = null;
    fclose($this->socket);
  }
}

解决了这个问题。如果您开始交易,就不需要收据头,因此必须修复官方STOMP文档(可能,这是Apollo相关的问题)。

您确定要发送stomp COMMIT帧吗?在相关事务提交之前,接收器不会获得事务处理的SEND帧。