加入收藏 | 设为首页 | 会员中心 | 我要投稿 北几岛 (https://www.beijidao.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 大数据 > 正文

RabbitMQ 入门教程(PHP版) 第五部分:通过主题进行消息分发(Topi

发布时间:2021-08-28 03:44:08 所属栏目:大数据 来源: https://www.jb51.cc
导读:对于 Message 的? routing_key ?字符串格式是有限制的:以点号"."分割的字符表,如 PHP.laravel ,并且长度不能超过 255 个字节。 对于? routing_key ?而言,有两个特殊字符: * :代表任意单词 # :代表0个或多个单词 ? ? Topic Exchange 与其他 Exchange

对于 Message 的?routing_key?字符串格式是有限制的:以点号"."分割的字符表,如PHP.laravel,并且长度不能超过 255 个字节。

对于?routing_key?而言,有两个特殊字符:

  • *:代表任意单词
  • #:代表0个或多个单词

?

?

Topic Exchange 与其他 Exchange 的转化:

  • routing_key?是?#,会接收所有 Message,此时等同于 Fanout Exchange;
  • routing_key?不包含?#?或?*,则等同于 Direct Exchange

整合代码

emit_log_topic.PHP

<?PHP

/**
 * 发送消息
 */

$exchangeName = 'topic_logs';
$topic = empty($argv[1]) ? 'anonymous.info' : $argv[1]; // 主题
$message = empty($argv[2]) ? 'Hello World!' : $argv[2];

// 建立TCP连接
$connection = new AMQPConnection([
    'host' => 'localhost',
    'port' => '5672',
    'vhost' => '/',
    'login' => 'guest',
    'password' => 'guest'
]);
$connection->connect() or die("Cannot connect to broker!n");

$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_TOPIC);
$exchange->declareExchange();

$exchange->publish($message, $topic);
echo "Message is sent: " . $message . "n";
$connection->disconnect();

receive_logs_topic.PHP

<?PHP

/**
 * 接收消息
 */

$exchangeName = 'topic_logs';
$topic = $argv[1];

// 建立TCP连接
$connection = new AMQPConnection([
    'host' => 'localhost',
    'port' => '5672',
    'vhost' => '/',
    'login' => 'guest',
    'password' => 'guest'
]);
$connection->connect() or die("Cannot connect to broker!n");

$channel = new AMQPChannel($connection);

$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_TOPIC);
$exchange->declareExchange();

$queue = new AMQPQueue($channel);
$queue->setFlags(AMQP_EXCLUSIVE);
$queue->declareQueue();
$queue->bind($exchangeName, $topic);

echo "Waiting for logs...n";
while (TRUE) {
    $queue->consume('processLogs');
}

$connection->disconnect();

function processLogs($envelope, $queue) {
    $logs = $envelope->getBody();
    var_dump("Received: " . $logs);
    $queue->ack($envelope->getDeliveryTag()); // 手动发送ACK应答
}

先运行脚本:

PHP receive_logs_topic.PHP *.laravel

然后再运行另外一个脚本:

PHP emit_log_topic.PHP PHP.laravel

?

效果展示:

?

(编辑:北几岛)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读