首页
下载
文档
问答社区
视频
捐赠
源代码
AI 助理
赞助商
CRMEB
Apipost
腾讯云
微擎
禅道
51Talk
商业产品
Swoole AI 智能文档翻译器
Swoole-Compiler PHP 代码加密器
CRMEB 新零售社交电商系统
登录
注册
全部
提问
分享
讨论
建议
公告
开发框架
CodeGalaxy
发表新帖
基于Hyperf开发的任务调度系统.支持任务投递,DAG任务编排(多个任务使用同一个事务).
https://github.com/Hyperf-Glory/Task-Schedule # Task-Schedule Hyperf开发的任务调度系统. 基于 Hyperf + Nsq 的一个异步队列库.支持投递任务,DAG任务编排.多个任务使用同一个事务。 ## 特性 - 默认 Nsq 驱动 - 秒级延时任务 - 自定义重试次数和时间 - 自定义错误回调 - 支持任务执行中间件 - 自定义队列快照事件 - 弹性多进程消费 - 协程支持 - 漂亮的仪表盘 - 任务编排协程安全的单连接模式(事务保持、多路复用等条件下,有时必须使用一个连接) - dag任务编排 ## 环境 - PHP 7.4+ - Swoole 4.6+ - Redis 5.0+ (redis 驱动) - Nsq 1.2.0 ## TODO - 分布式支持 ## 案例 1.投递任务 ```php use App\Model\Task; use App\Job\SimpleJob; use App\Kernel\Nsq\Queue; class Example{ /** * @desc 测试job队列功能 */ public function queue() : void { $task = Task::find(1); $job = new SimpleJob($task); $queue = new Queue('queue'); $queue->push($job); } } ``` 2.任务编排协程安全的单连接模式(事务保持、多路复用等条件下,有时必须使用一个连接) ```php use App\Kernel\Concurrent\ConcurrentMySQLPattern; use App\Dag\Task\Task1; use App\Dag\Task\Task2; use App\Dag\Task\Task3; class Example{ public function conCurrentMySQL() : void { $dsn = 'DSN'; $user = 'USER'; $password = 'PWD'; try { $pdo = new \PDO($dsn, $user, $password); $pdo->setAttribute(\PDO::ATTR_EMULATE_PREPARES, true); $c = new ConcurrentMySQLPattern($pdo, $this->logger); $c->beginTransaction(); $dag = new \Hyperf\Dag\Dag(); $a = \Hyperf\Dag\Vertex::make(function () use ($c) { $task = new Task1(); return $task->Run($c); }, 'a'); $b = \Hyperf\Dag\Vertex::make(function ($results) use ($c) { $task = new Task2(); return $task->Run($c); }, 'b'); $d = \Hyperf\Dag\Vertex::make(function ($results) use ($c, $a, $b) { if ($results[$a->key] && $results[$b->key]) { return $c->commit(); } return $c->rollback(); }, 'd'); $e = \Hyperf\Dag\Vertex::make(function ($results) use ($c) { $c->close(); }, 'e'); $results = $dag ->addVertex($a) ->addVertex($b) ->addVertex($d) ->addVertex($e) ->addEdge($a, $b) ->addEdge($b, $d) ->addEdge($d, $e) ->run(); } catch (\PDOException $exception) { echo 'Connection failed: ' . $exception->getMessage(); } } } ``` ## 仪表盘 https://github.com/Hyperf-Glory/Task-Schedule/raw/main/img.png 参考:https://github.com/Littlesqx/aint-queue/
发布于3年前 · 13 次浏览 · 来自
分享
komorebi
https://github.com/Hyperf-Glory/Task-Schedule # Task-Schedule Hyperf开发的任务调度系统. 基于 Hyperf + Nsq 的一个异步队列库.支持投递任务,DAG任务编排.多个任务使用同一个事务。 ## 特性 - 默认 Nsq 驱动 - 秒级延时任务 - 自定义重试次数和时间 - 自定义错误回调 - 支持任务执行中间件 - 自定义队列快照事件 - 弹性多进程消费 - 协程支持 - 漂亮的仪表盘 - 任务编排协程安全的单连接模式(事务保持、多路复用等条件下,有时必须使用一个连接) - dag任务编排 ## 环境 - PHP 7.4+ - Swoole 4.6+ - Redis 5.0+ (redis 驱动) - Nsq 1.2.0 ## TODO - 分布式支持 ## 案例 1.投递任务 ```php use App\Model\Task; use App\Job\SimpleJob; use App\Kernel\Nsq\Queue; class Example{ /** * @desc 测试job队列功能 */ public function queue() : void { $task = Task::find(1); $job = new SimpleJob($task); $queue = new Queue('queue'); $queue->push($job); } } ``` 2.任务编排协程安全的单连接模式(事务保持、多路复用等条件下,有时必须使用一个连接) ```php use App\Kernel\Concurrent\ConcurrentMySQLPattern; use App\Dag\Task\Task1; use App\Dag\Task\Task2; use App\Dag\Task\Task3; class Example{ public function conCurrentMySQL() : void { $dsn = 'DSN'; $user = 'USER'; $password = 'PWD'; try { $pdo = new \PDO($dsn, $user, $password); $pdo->setAttribute(\PDO::ATTR_EMULATE_PREPARES, true); $c = new ConcurrentMySQLPattern($pdo, $this->logger); $c->beginTransaction(); $dag = new \Hyperf\Dag\Dag(); $a = \Hyperf\Dag\Vertex::make(function () use ($c) { $task = new Task1(); return $task->Run($c); }, 'a'); $b = \Hyperf\Dag\Vertex::make(function ($results) use ($c) { $task = new Task2(); return $task->Run($c); }, 'b'); $d = \Hyperf\Dag\Vertex::make(function ($results) use ($c, $a, $b) { if ($results[$a->key] && $results[$b->key]) { return $c->commit(); } return $c->rollback(); }, 'd'); $e = \Hyperf\Dag\Vertex::make(function ($results) use ($c) { $c->close(); }, 'e'); $results = $dag ->addVertex($a) ->addVertex($b) ->addVertex($d) ->addVertex($e) ->addEdge($a, $b) ->addEdge($b, $d) ->addEdge($d, $e) ->run(); } catch (\PDOException $exception) { echo 'Connection failed: ' . $exception->getMessage(); } } } ``` ## 仪表盘 https://github.com/Hyperf-Glory/Task-Schedule/raw/main/img.png 参考:https://github.com/Littlesqx/aint-queue/
赞
0
分享
收藏
提问
分享
讨论
建议
公告
开发框架
CodeGalaxy
评论
还没有评论!
微信公众号
热门内容
暂无回复的问答
- CodeGalaxy K3s 轻量集群节点之间如何实现负载均衡
- 关于openssl CURL WARNING swSSL_connect: SSL_connect(fd=69) failed. Error: error:141A318A:SSL routines:tls_process_ske_dhe:dh key too small[1|394]
- 多个模型如何进行事务异常回退?
- websocket开启wss报错
- 协程tcp服务器如何使用多进程?recv()方法接收信息,打印出来的pid一直是同一个。没用使用到多进程啊。