<?php
// Declares a fanout exchange named "exchange1" on the broker.

// No host or credentials are passed, so the extension's default
// connection settings apply.
$connection = new AMQPConnection();
$connection->connect();

// All exchange operations go through a channel opened on the connection.
$channel = new AMQPChannel($connection);

$exchange = new AMQPExchange($channel);
$exchange->setName('exchange1');
$exchange->setType(AMQP_EX_TYPE_FANOUT);

// declareExchange() is the supported name; declare() is a deprecated alias.
$exchange->declareExchange();
队列建立
<?php
// Declares a queue named "queue1" on the broker.

// No host or credentials are passed, so the extension's default
// connection settings apply.
$connection = new AMQPConnection();
$connection->connect();

// All queue operations go through a channel opened on the connection.
$channel = new AMQPChannel($connection);

$queue = new AMQPQueue($channel);
$queue->setName('queue1');

// declareQueue() is the supported name; declare() is a deprecated alias.
$queue->declareQueue();
队列绑定
<?php
// Declares the queue "queue1" and binds it to the exchange "exchange1"
// using the routing key "routekey".

// No host or credentials are passed, so the extension's default
// connection settings apply.
$connection = new AMQPConnection();
$connection->connect();

$channel = new AMQPChannel($connection);

$queue = new AMQPQueue($channel);
$queue->setName('queue1');

// declareQueue() is the supported name; declare() is a deprecated alias.
// The queue is declared before binding so the bind target is known to exist.
$queue->declareQueue();

$queue->bind('exchange1', 'routekey');
消息发送
<?php
// Publishes 2,000,000 numbered messages ("message 0" ... "message 1999999")
// to the fanout exchange "exchange1" with the routing key "routekey".

// No host or credentials are passed, so the extension's default
// connection settings apply.
$connection = new AMQPConnection();
$connection->connect();

$channel = new AMQPChannel($connection);

$exchange = new AMQPExchange($channel);
// The exchange name matches the one the queue-binding and consumer scripts
// bind "queue1" to; publishing to any other exchange would never reach that
// queue.
$exchange->setName('exchange1');
$exchange->setType(AMQP_EX_TYPE_FANOUT);

// declareExchange() is the supported name; declare() is a deprecated alias.
$exchange->declareExchange();

for ($i = 0; $i < 2000000; $i++) {
    $exchange->publish("message $i", "routekey");
}
消息接收
<?php
// Consumes messages from "queue1" (bound to "exchange1" with the routing key
// "routekey") and prints each message body on its own line.

// No host or credentials are passed, so the extension's default
// connection settings apply.
$connection = new AMQPConnection();
$connection->connect();

$channel = new AMQPChannel($connection);

$queue = new AMQPQueue($channel);
$queue->setName('queue1');

// declareQueue() is the supported name; declare() is a deprecated alias.
$queue->declareQueue();

$queue->bind('exchange1', 'routekey');

// The outer loop re-enters consume() should it ever return.
// NOTE(review): consume() is documented as blocking until the callback
// returns false, which this callback never does -- confirm whether the
// loop is still needed.
while (true) {
    $queue->consume(
        function ($envelope, $queue) {
            echo $envelope->getBody(), PHP_EOL;
        },
        // Auto-acknowledge mode: no explicit ack() call is made per message.
        AMQP_AUTOACK
    );
}