首页
下载
文档
问答社区
视频
捐赠
源代码
AI 助理
赞助商
CRMEB
Apipost
腾讯云
微擎
禅道
51Talk
商业产品
Swoole AI 智能文档翻译器
Swoole-Compiler PHP 代码加密器
CRMEB 新零售社交电商系统
登录
注册
全部
提问
分享
讨论
建议
公告
开发框架
CodeGalaxy
发表新帖
Swoole 协程如何使用rabbitmq 多信道?
```php Co\run(function () { Co::set(['hook_flags' => SWOOLE_HOOK_FILE]); $connection = new AMQPStreamConnection(env('RABBITMQ_HOST') , env('RABBITMQ_PORT') , env('RABBITMQ_LOGIN') , env('RABBITMQ_PASSWORD') , env('RABBITMQ_VHOST') , false , 'AMQPLAIN' , null , 'en_US' , 3 , 30 , null , true , 15); if (!$connection->isConnected()) { throw new \Exception('Connecte Fail.'); } $wg = new WaitGroup(); for ($i = 1 ; $i <= 4 ; $i++) { $wg->add(1); Co::create(function () use ($connection,$i,$wg) { stdDump($i); $channel = $connection->channel(); defer(function () use ($channel,$wg) { $channel->close(); $wg->done(); }); $exChanegName = env('RABBITMQ_EXCHANGE'); $channel->exchange_declare($exChanegName , 'fanout' , false , true , false); $queue_name = env('RABBITMQ_QUEUE' , "serve_toc_fs"); $channel->queue_declare($queue_name , false , true , false , false , false , new AMQPTable([ 'x-message-ttl' => 3000 , 'durable' => true , 'consumer_no_ack' => true , 'timeout' => 0 , 'prefetch_count' => 0 , // 限流 'persistent' => true , // required if you want to listen forever ])); $channel->queue_bind($queue_name , $exChanegName); $callback = function ($msg)use ($i) { stdDump("{$i}->",$msg->body); Co::sleep(1); return; }; $channel->basic_consume($queue_name , "TAG_{$i}" , false , true , false , false , $callback); while ($channel->is_consuming()) { $channel->wait(); } }); } $wg->wait(); }); ``` 查看 Connections Channels 有4个 ,2个闲置的 未绑定到的队列 查看rabbitmq Queue Consumers 只有 2个 channel
发布于3年前 · 33 次浏览 · 来自
提问
Alonexy
```php Co\run(function () { Co::set(['hook_flags' => SWOOLE_HOOK_FILE]); $connection = new AMQPStreamConnection(env('RABBITMQ_HOST') , env('RABBITMQ_PORT') , env('RABBITMQ_LOGIN') , env('RABBITMQ_PASSWORD') , env('RABBITMQ_VHOST') , false , 'AMQPLAIN' , null , 'en_US' , 3 , 30 , null , true , 15); if (!$connection->isConnected()) { throw new \Exception('Connecte Fail.'); } $wg = new WaitGroup(); for ($i = 1 ; $i <= 4 ; $i++) { $wg->add(1); Co::create(function () use ($connection,$i,$wg) { stdDump($i); $channel = $connection->channel(); defer(function () use ($channel,$wg) { $channel->close(); $wg->done(); }); $exChanegName = env('RABBITMQ_EXCHANGE'); $channel->exchange_declare($exChanegName , 'fanout' , false , true , false); $queue_name = env('RABBITMQ_QUEUE' , "serve_toc_fs"); $channel->queue_declare($queue_name , false , true , false , false , false , new AMQPTable([ 'x-message-ttl' => 3000 , 'durable' => true , 'consumer_no_ack' => true , 'timeout' => 0 , 'prefetch_count' => 0 , // 限流 'persistent' => true , // required if you want to listen forever ])); $channel->queue_bind($queue_name , $exChanegName); $callback = function ($msg)use ($i) { stdDump("{$i}->",$msg->body); Co::sleep(1); return; }; $channel->basic_consume($queue_name , "TAG_{$i}" , false , true , false , false , $callback); while ($channel->is_consuming()) { $channel->wait(); } }); } $wg->wait(); }); ``` 查看 Connections Channels 有4个 ,2个闲置的 未绑定到的队列 查看rabbitmq Queue Consumers 只有 2个 channel
赞
0
分享
收藏
提问
分享
讨论
建议
公告
开发框架
CodeGalaxy
评论
2021-06-22
李铭昕
hyperf/amqp 2.2 版本支持了多路复用,你可以看看具体的代码实现
赞
1
回复
2021-06-20
新
新用户(手机注册)
Co::create 里面创建连接
赞
0
回复
微信公众号
热门内容
作者其它话题
- Swoole Client 自定义协议规则
- Swoole Server 运行一段时间产生段错误
暂无回复的问答
- CodeGalaxy K3s 轻量集群节点之间如何实现负载均衡
- 关于openssl CURL WARNING swSSL_connect: SSL_connect(fd=69) failed. Error: error:141A318A:SSL routines:tls_process_ske_dhe:dh key too small[1|394]
- 多个模型如何进行事务异常回退?
- websocket开启wss报错
- 协程tcp服务器如何使用多进程?recv()方法接收信息,打印出来的pid一直是同一个。没用使用到多进程啊。