Home
Document
Q&A
Video
Donate
Source Code
Code-Galaxy 云原生平台
Business
Swoole Tracker
Swoole Compiler
Login
Register
全部
提问
分享
讨论
建议
公告
开发框架
发表新帖
Swoole 协程如何使用rabbitmq 多信道?
```php Co\run(function () { Co::set(['hook_flags' => SWOOLE_HOOK_FILE]); $connection = new AMQPStreamConnection(env('RABBITMQ_HOST') , env('RABBITMQ_PORT') , env('RABBITMQ_LOGIN') , env('RABBITMQ_PASSWORD') , env('RABBITMQ_VHOST') , false , 'AMQPLAIN' , null , 'en_US' , 3 , 30 , null , true , 15); if (!$connection->isConnected()) { throw new \Exception('Connecte Fail.'); } $wg = new WaitGroup(); for ($i = 1 ; $i <= 4 ; $i++) { $wg->add(1); Co::create(function () use ($connection,$i,$wg) { stdDump($i); $channel = $connection->channel(); defer(function () use ($channel,$wg) { $channel->close(); $wg->done(); }); $exChanegName = env('RABBITMQ_EXCHANGE'); $channel->exchange_declare($exChanegName , 'fanout' , false , true , false); $queue_name = env('RABBITMQ_QUEUE' , "serve_toc_fs"); $channel->queue_declare($queue_name , false , true , false , false , false , new AMQPTable([ 'x-message-ttl' => 3000 , 'durable' => true , 'consumer_no_ack' => true , 'timeout' => 0 , 'prefetch_count' => 0 , // 限流 'persistent' => true , // required if you want to listen forever ])); $channel->queue_bind($queue_name , $exChanegName); $callback = function ($msg)use ($i) { stdDump("{$i}->",$msg->body); Co::sleep(1); return; }; $channel->basic_consume($queue_name , "TAG_{$i}" , false , true , false , false , $callback); while ($channel->is_consuming()) { $channel->wait(); } }); } $wg->wait(); }); ``` 查看 Connections Channels 有4个 ,2个闲置的 未绑定到的队列 查看rabbitmq Queue Consumers 只有 2个 channel
发布于1年前 · 28 次浏览 · 来自
提问
Alonexy
```php Co\run(function () { Co::set(['hook_flags' => SWOOLE_HOOK_FILE]); $connection = new AMQPStreamConnection(env('RABBITMQ_HOST') , env('RABBITMQ_PORT') , env('RABBITMQ_LOGIN') , env('RABBITMQ_PASSWORD') , env('RABBITMQ_VHOST') , false , 'AMQPLAIN' , null , 'en_US' , 3 , 30 , null , true , 15); if (!$connection->isConnected()) { throw new \Exception('Connecte Fail.'); } $wg = new WaitGroup(); for ($i = 1 ; $i <= 4 ; $i++) { $wg->add(1); Co::create(function () use ($connection,$i,$wg) { stdDump($i); $channel = $connection->channel(); defer(function () use ($channel,$wg) { $channel->close(); $wg->done(); }); $exChanegName = env('RABBITMQ_EXCHANGE'); $channel->exchange_declare($exChanegName , 'fanout' , false , true , false); $queue_name = env('RABBITMQ_QUEUE' , "serve_toc_fs"); $channel->queue_declare($queue_name , false , true , false , false , false , new AMQPTable([ 'x-message-ttl' => 3000 , 'durable' => true , 'consumer_no_ack' => true , 'timeout' => 0 , 'prefetch_count' => 0 , // 限流 'persistent' => true , // required if you want to listen forever ])); $channel->queue_bind($queue_name , $exChanegName); $callback = function ($msg)use ($i) { stdDump("{$i}->",$msg->body); Co::sleep(1); return; }; $channel->basic_consume($queue_name , "TAG_{$i}" , false , true , false , false , $callback); while ($channel->is_consuming()) { $channel->wait(); } }); } $wg->wait(); }); ``` 查看 Connections Channels 有4个 ,2个闲置的 未绑定到的队列 查看rabbitmq Queue Consumers 只有 2个 channel
赞
0
分享
收藏
提问
分享
讨论
建议
公告
开发框架
评论
2021-06-21
李铭昕
hyperf/amqp 2.2 版本支持了多路复用,你可以看看具体的代码实现
赞
1
回复
2021-06-19
新
新用户(手机注册)
Co::create 里面创建连接
赞
0
回复
微信公众号
热门内容
- 关于不同进程共用链接的疑问
- 关于sendfile发送文件的一些疑问,还请知道的大佬赐教
- 请问哪位大佬知道我这是什么原因?
- tcp连接4次挥手的疑问
- 开启enableCoroutine发送https请求会报错abnormal exit, status=0, signal=10
作者其它话题
- Swoole Client 自定义协议规则
- Swoole Server 运行一段时间产生段错误
暂无回复的问答
- 关于openssl CURL WARNING swSSL_connect: SSL_connect(fd=69) failed. Error: error:141A318A:SSL routines:tls_process_ske_dhe:dh key too small[1|394]
- 请问那个一键协程化的代码是放外面还是set里面
- 多个模型如何进行事务异常回退?
- websocket开启wss报错
- 协程tcp服务器如何使用多进程?recv()方法接收信息,打印出来的pid一直是同一个。没用使用到多进程啊。