首页 > 编程 > PHP > 正文

PHP和RabbitMQ实现消息队列的完整代码

2020-03-22 17:38:25
字体:
来源:转载
供稿:网友
本篇文章给大家带来的内容是关于PHP和RabbitMQ实现消息队列的完整代码,有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助。

先安装PHP对应的RabbitMQ,这里用的是 php_amqp 不同的扩展实现方式会有细微的差异.
php扩展地址: http://pecl.php.net/package/amqp
具体以网址为准 http://www.rabbitmq.com/getstarted.html

介绍

config.php 配置信息
BaseMQ.php MQ基类
ProductMQ.php 生产者类
ConsumerMQ.php 消费者类
Consumer2MQ.php 消费者2(可有多个)

config.php
 ?php return [ //配置 host = [ host = 127.0.0.1 , port = 5672 , login = guest , password = guest , vhost = / , //交换机 exchange = word , //路由 routes = [], ];
BaseMQ.php
 ?php * Created by PhpStorm. * User: pc * Date: 2018/12/13 * Time: 14:11 namespace MyObjSummary/rabbitMQ; /** Member * AMQPChannel * AMQPConnection * AMQPEnvelope * AMQPExchange * AMQPQueue * Class BaseMQ * @package MyObjSummary/rabbitMQ class BaseMQ /** MQ Channel * @var /AMQPChannel public $AMQPChannel ; /** MQ Link * @var /AMQPConnection public $AMQPConnection ; /** MQ Envelope * @var /AMQPEnvelope public $AMQPEnvelope ; /** MQ Exchange * @var /AMQPExchange public $AMQPExchange ; /** MQ Queue * @var /AMQPQueue public $AMQPQueue ; /** conf * @var public $conf ; /** exchange * @var public $exchange ; /** link * BaseMQ constructor. * @throws /AMQPConnectionException public function __construct() $conf = require config.php  if(!$conf) throw new /AMQPConnectionException( config error!  $this- conf = $conf[ host  $this- exchange = $conf[ exchange  $this- AMQPConnection = new /AMQPConnection($this- conf); if (!$this- AMQPConnection- connect()) throw new /AMQPConnectionException( Cannot connect to the broker!/n  * close link public function close() $this- AMQPConnection- disconnect(); /** Channel * @return /AMQPChannel * @throws /AMQPConnectionException public function channel() if(!$this- AMQPChannel) { $this- AMQPChannel = new /AMQPChannel($this- AMQPConnection); return $this- AMQPChannel; /** Exchange * @return /AMQPExchange * @throws /AMQPConnectionException * @throws /AMQPExchangeException public function exchange() if(!$this- AMQPExchange) { $this- AMQPExchange = new /AMQPExchange($this- channel()); $this- AMQPExchange- setName($this- exchange); return $this- AMQPExchange ; /** queue * @return /AMQPQueue * @throws /AMQPConnectionException * @throws /AMQPQueueException public function queue() if(!$this- AMQPQueue) { $this- AMQPQueue = new /AMQPQueue($this- channel()); return $this- AMQPQueue ; /** Envelope * @return /AMQPEnvelope public function envelope() if(!$this- AMQPEnvelope) { $this- AMQPEnvelope = new /AMQPEnvelope(); return $this- AMQPEnvelope; }

ProductMQ.php

 ?php //生产者 P namespace MyObjSummary/rabbitMQ; require BaseMQ.php  class ProductMQ extends BaseMQ private $routes = [ hello , word //路由key * ProductMQ constructor. * @throws /AMQPConnectionException public function __construct() parent::__construct(); /** 只控制发送成功 不接受消费者是否收到 * @throws /AMQPChannelException * @throws /AMQPConnectionException * @throws /AMQPExchangeException public function run() //频道 $channel = $this- channel(); //创建交换机对象 $ex = $this- exchange(); //消息内容 $message = product message .rand(1,99999); //开始事务 $channel- startTransaction(); $sendEd = true ; foreach ($this- routes as $route) { $sendEd = $ex- publish($message, $route) ; echo Send Message: .$sendEd. /n  if(!$sendEd) { $channel- rollbackTransaction(); $channel- commitTransaction(); //提交事务 $this- close(); die ; try{ (new ProductMQ())- run(); }catch (/Exception $exception){ var_dump($exception- getMessage()) ; }
ConsumerMQ.php
 ?php //消费者 C namespace MyObjSummary/rabbitMQ; require BaseMQ.php  class ConsumerMQ extends BaseMQ private $q_name = hello //队列名 private $route = hello //路由key * ConsumerMQ constructor. * @throws /AMQPConnectionException public function __construct() parent::__construct(); /** 接受消息 如果终止 重连时会有消息 * @throws /AMQPChannelException * @throws /AMQPConnectionException * @throws /AMQPExchangeException * @throws /AMQPQueueException public function run() //创建交换机 $ex = $this- exchange(); $ex- setType(AMQP_EX_TYPE_DIRECT); //direct类型 $ex- setFlags(AMQP_DURABLE); //持久化 //echo Exchange Status: .$ex- declare(). /n  //创建队列 $q = $this- queue(); //var_dump($q- declare());exit(); $q- setName($this- q_name); $q- setFlags(AMQP_DURABLE); //持久化 //echo Message Total: .$q- declareQueue(). /n  //绑定交换机与队列,并指定路由键 echo Queue Bind: .$q- bind($this- exchange, $this- route). /n  //阻塞模式接收消息 echo Message:/n  while(True){ $q- consume(function ($envelope,$queue){ $msg = $envelope- getBody(); echo $msg. /n //处理消息 $queue- ack($envelope- getDeliveryTag()); //手动发送ACK应答 //$q- consume( processMessage , AMQP_AUTOACK); //自动ACK应答 $this- close(); try{ (new ConsumerMQ)- run(); }catch (/Exception $exception){ var_dump($exception- getMessage()) ; }

以上就是PHP和RabbitMQ实现消息队列的完整代码的详细内容,PHP教程

郑重声明:本文版权归原作者所有,转载文章仅为传播更多信息之目的,如作者信息标记有误,请第一时间联系我们修改或删除,多谢。

发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表