首页
下载
文档
问答社区
视频
捐赠
源代码
AI 助理
赞助商
CRMEB
Apipost
腾讯云
微擎
禅道
51Talk
商业产品
Swoole AI 智能文档翻译器
Swoole-Compiler PHP 代码加密器
CRMEB 新零售社交电商系统
登录
注册
全部
提问
分享
讨论
建议
公告
开发框架
CodeGalaxy
发表新帖
请大神来帮我看下 我使用 swoole 群发数据的代码是否正确
### 问题描述 就是我想用swoole 做个批量群发 的服务器。 功能就是 我向微信的API 发送模板消息 给所以粉丝。 ### Swoole版本,PHP版本,以及操作系统版本信息 swoole4.x版本 php 7.4 linux 4核 8G内存 10M带宽 ### 相关代码 ```php <?php require './communication.php'; //这个是封装的CURL方法 $serv = new Swoole\Server('0.0.0.0', 9555); //设置异步任务的工作进程数量 $serv->set([ //配置此参数后将会启用 task 功能。所以 Server 务必要注册 onTask、onFinish 2 个事件回调函数。如果没有注册,服务器程序将无法启动。 'worker_num'=>4, 'task_worker_num' => 10, 'task_enable_coroutine'=>true, 'enable_coroutine'=>true, ]); $serv->PDO = new PDO('mysql:host=60.0.0.1;dbname=name;charset=utf8', 'username', 'ttNxMyj4scFneyES'); //此回调函数在worker进程中执行 $serv->on('Receive', function($serv, $fd, $reactor_id, $data) { //投递异步任务 $account = json_decode($data,true); $page = $account['page']; $pdo = $serv->PDO; $pageindex = max(intval($page), 1); $pagesize = $account['pagesize']; $sql = 'SELECT openid FROM `ims_mc_mapping_fans` WHERE `uniacid`=:uniacid and `follow`=:follow ORDER BY fanid desc LIMIT '.(($pageindex -1) * $pagesize).','. $pagesize; $statement = $pdo->prepare($sql); $statement->execute(array(':uniacid'=>$account['uniacid'],':follow'=>'1')); $swoole = $statement->fetchall(); $statement = null; $pdo = null; foreach($swoole as $key=>$val){ unset($swoole[$key][0]); } $account['fans'] = $swoole; $task_id = $serv->task($account); $serv->send($fd,'SUCCESS'); echo "已接收worker:id={$task_id}\n"; }); //处理异步任务(此回调函数在task进程中执行) $serv->on('Task', function ($serv,Swoole\Server\Task $task) { $account = $task->data; $page = $account['page']; $swoolecount= count($account['fans']); $result = array(); $return_arr['no'] = array(); $return_arr['ok'] = array(); if($swoolecount>0){ $chan = new chan($swoolecount); $chan->datar= array( 'getAccessToken'=>$account['getAccessToken'], 'template_id'=>$account['template_id'], 'url'=>$account['url'], 'postdata'=>$account['postdata'] ); foreach($account['fans'] as $key=>$val){ $chan->val = $val; go(function () use ($chan) { $valdata = $chan->val; $obj_data= $chan->datar; $token = $obj_data['getAccessToken']; $datar = array(); $datar['touser'] = $valdata['openid']; $datar['template_id'] = trim($obj_data['template_id']); $datar['url'] = trim($obj_data['url']); $datar['topcolor'] = trim('#FF683F'); $datar['data'] = $obj_data['postdata']; $datar = json_encode($datar); $post_url = "https://api.weixin.qq.com/cgi-bin/message/template/send?access_token={$token}"; $response = ihttp_request($post_url, $datar); $result = @json_decode($response['content'], true); if($result['errmsg']=='ok'){ $as['ok'] = $valdata['openid']; $chan->push($as); }else{ $as['no'] = $valdata['openid']; $chan->push($as); } }); $chan->val = null; }; for ($i = 0; $i < $swoolecount; $i++){ $result[]= $chan->pop(); } foreach($result as $k=>$v){ if($v['ok']!=''){ $return_arr['ok'][] = $v['ok']; } if($v['no']!=''){ $return_arr['no'][] = $v['no']; } } } $result = count($return_arr['ok']); $error = count($return_arr['no']); echo "处理异步任务[id={$task_id}]-数据数量:{$swoolecount}-当前处理第{$page}页-成功发送{$result}条-失败{$error}条".PHP_EOL; //返回任务执行的结果 $query['result'] = '10000'; $query['messages'] = '执行完成'; $query['pagesize'] = $account['pagesize']; $query['tid'] = $account['tid']; $query['page'] = $account['page']; $query['success'] = $swoolecount;//查询数据 $query['result'] = $result; //执行数据 $query['error'] = $error; //执行数据 $task->finish($query); }); //处理异步任务的结果(此回调函数在worker进程中执行) $serv->on('Finish', function ($serv, $task_id, $data) { $pdo = $serv->PDO; if($data['success']<=0 && $data['result']<=0){ $data['state'] = '2'; $tmplmsg_s = "update ims_gicai_asgr_tmplmsg set click=click+{$data['result']},error=error+{$data['error']},page=page-1,now_worker=now_worker-1,state={$data['state']} where id=:id"; }else{ $tmplmsg_s = "update ims_gicai_asgr_tmplmsg set click=click+{$data['result']},error=error+{$data['error']},now_worker=now_worker-1 where id=:id"; } $statement = $pdo->prepare($tmplmsg_s); $statement->execute(array(':id'=>$data['tid'])); if($statement){ $affect_row = $statement->rowCount(); echo "AsyncTask[{$task_id}] Finish: ok".PHP_EOL; }else{ echo "AsyncTask[{$task_id}] Finish: no".PHP_EOL; } $statement = null; $pdo = null; }); $serv->on('Close', function ($serv, $fd) { echo "Client: Close.\n"; }); $serv->start(); ``` ### 你期待的结果是什么?实际看到的错误信息又是什么? 我现在的业务是 通过PHP TCP 向这个服务器 发送数据 ,然后服务器 接收到参数后 开始查询数据库的粉丝 拿到 第一页 100条数据 开始 布置Task任务 ,然后 通过foreach 循环 创建GO协程 每个GO里面是 curl 向服务器提交模板消息,然后回调 统计出成功的 返回 Finish 里 然后修改下 SQL 指定的表 。这个过程 都跑通了,但是CUP 大概在 60-90%徘徊 内存一点没消耗,大概 1个小时能发 不到 6万条。 我想肯定有其他好办法,我请教各位大神 能帮我优化下吗 。
发布于2年前 · 19 次浏览 · 来自
提问
高手话不多
### 问题描述 就是我想用swoole 做个批量群发 的服务器。 功能就是 我向微信的API 发送模板消息 给所以粉丝。 ### Swoole版本,PHP版本,以及操作系统版本信息 swoole4.x版本 php 7.4 linux 4核 8G内存 10M带宽 ### 相关代码 ```php <?php require './communication.php'; //这个是封装的CURL方法 $serv = new Swoole\Server('0.0.0.0', 9555); //设置异步任务的工作进程数量 $serv->set([ //配置此参数后将会启用 task 功能。所以 Server 务必要注册 onTask、onFinish 2 个事件回调函数。如果没有注册,服务器程序将无法启动。 'worker_num'=>4, 'task_worker_num' => 10, 'task_enable_coroutine'=>true, 'enable_coroutine'=>true, ]); $serv->PDO = new PDO('mysql:host=60.0.0.1;dbname=name;charset=utf8', 'username', 'ttNxMyj4scFneyES'); //此回调函数在worker进程中执行 $serv->on('Receive', function($serv, $fd, $reactor_id, $data) { //投递异步任务 $account = json_decode($data,true); $page = $account['page']; $pdo = $serv->PDO; $pageindex = max(intval($page), 1); $pagesize = $account['pagesize']; $sql = 'SELECT openid FROM `ims_mc_mapping_fans` WHERE `uniacid`=:uniacid and `follow`=:follow ORDER BY fanid desc LIMIT '.(($pageindex -1) * $pagesize).','. $pagesize; $statement = $pdo->prepare($sql); $statement->execute(array(':uniacid'=>$account['uniacid'],':follow'=>'1')); $swoole = $statement->fetchall(); $statement = null; $pdo = null; foreach($swoole as $key=>$val){ unset($swoole[$key][0]); } $account['fans'] = $swoole; $task_id = $serv->task($account); $serv->send($fd,'SUCCESS'); echo "已接收worker:id={$task_id}\n"; }); //处理异步任务(此回调函数在task进程中执行) $serv->on('Task', function ($serv,Swoole\Server\Task $task) { $account = $task->data; $page = $account['page']; $swoolecount= count($account['fans']); $result = array(); $return_arr['no'] = array(); $return_arr['ok'] = array(); if($swoolecount>0){ $chan = new chan($swoolecount); $chan->datar= array( 'getAccessToken'=>$account['getAccessToken'], 'template_id'=>$account['template_id'], 'url'=>$account['url'], 'postdata'=>$account['postdata'] ); foreach($account['fans'] as $key=>$val){ $chan->val = $val; go(function () use ($chan) { $valdata = $chan->val; $obj_data= $chan->datar; $token = $obj_data['getAccessToken']; $datar = array(); $datar['touser'] = $valdata['openid']; $datar['template_id'] = trim($obj_data['template_id']); $datar['url'] = trim($obj_data['url']); $datar['topcolor'] = trim('#FF683F'); $datar['data'] = $obj_data['postdata']; $datar = json_encode($datar); $post_url = "https://api.weixin.qq.com/cgi-bin/message/template/send?access_token={$token}"; $response = ihttp_request($post_url, $datar); $result = @json_decode($response['content'], true); if($result['errmsg']=='ok'){ $as['ok'] = $valdata['openid']; $chan->push($as); }else{ $as['no'] = $valdata['openid']; $chan->push($as); } }); $chan->val = null; }; for ($i = 0; $i < $swoolecount; $i++){ $result[]= $chan->pop(); } foreach($result as $k=>$v){ if($v['ok']!=''){ $return_arr['ok'][] = $v['ok']; } if($v['no']!=''){ $return_arr['no'][] = $v['no']; } } } $result = count($return_arr['ok']); $error = count($return_arr['no']); echo "处理异步任务[id={$task_id}]-数据数量:{$swoolecount}-当前处理第{$page}页-成功发送{$result}条-失败{$error}条".PHP_EOL; //返回任务执行的结果 $query['result'] = '10000'; $query['messages'] = '执行完成'; $query['pagesize'] = $account['pagesize']; $query['tid'] = $account['tid']; $query['page'] = $account['page']; $query['success'] = $swoolecount;//查询数据 $query['result'] = $result; //执行数据 $query['error'] = $error; //执行数据 $task->finish($query); }); //处理异步任务的结果(此回调函数在worker进程中执行) $serv->on('Finish', function ($serv, $task_id, $data) { $pdo = $serv->PDO; if($data['success']<=0 && $data['result']<=0){ $data['state'] = '2'; $tmplmsg_s = "update ims_gicai_asgr_tmplmsg set click=click+{$data['result']},error=error+{$data['error']},page=page-1,now_worker=now_worker-1,state={$data['state']} where id=:id"; }else{ $tmplmsg_s = "update ims_gicai_asgr_tmplmsg set click=click+{$data['result']},error=error+{$data['error']},now_worker=now_worker-1 where id=:id"; } $statement = $pdo->prepare($tmplmsg_s); $statement->execute(array(':id'=>$data['tid'])); if($statement){ $affect_row = $statement->rowCount(); echo "AsyncTask[{$task_id}] Finish: ok".PHP_EOL; }else{ echo "AsyncTask[{$task_id}] Finish: no".PHP_EOL; } $statement = null; $pdo = null; }); $serv->on('Close', function ($serv, $fd) { echo "Client: Close.\n"; }); $serv->start(); ``` ### 你期待的结果是什么?实际看到的错误信息又是什么? 我现在的业务是 通过PHP TCP 向这个服务器 发送数据 ,然后服务器 接收到参数后 开始查询数据库的粉丝 拿到 第一页 100条数据 开始 布置Task任务 ,然后 通过foreach 循环 创建GO协程 每个GO里面是 curl 向服务器提交模板消息,然后回调 统计出成功的 返回 Finish 里 然后修改下 SQL 指定的表 。这个过程 都跑通了,但是CUP 大概在 60-90%徘徊 内存一点没消耗,大概 1个小时能发 不到 6万条。 我想肯定有其他好办法,我请教各位大神 能帮我优化下吗 。
赞
0
分享
收藏
提问
分享
讨论
建议
公告
开发框架
CodeGalaxy
评论
2022-11-21
Rango
说明你的程序主要消耗是在`CPU`运算,协程解决的是`IO`的问题。建议开启多个`Task`进程,利用多核
赞
0
回复
微信公众号
热门内容
暂无回复的问答
- CodeGalaxy K3s 轻量集群节点之间如何实现负载均衡
- 关于openssl CURL WARNING swSSL_connect: SSL_connect(fd=69) failed. Error: error:141A318A:SSL routines:tls_process_ske_dhe:dh key too small[1|394]
- 多个模型如何进行事务异常回退?
- websocket开启wss报错
- 协程tcp服务器如何使用多进程?recv()方法接收信息,打印出来的pid一直是同一个。没用使用到多进程啊。