| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294 |
- <?php
-
- namespace com;
-
- use think\Cache;
-
- /**
- * 伪队列,(用于导入导出轮询时排队)
- *
- * @author Ymob
- * @date 2019-09-30
- */
- class PseudoQueue
- {
-
- /**
- * 队列名称
- *
- * @var string
- */
- protected $name = '';
-
- /**
- * 队列最大数量,默认0,不限数量
- *
- * @var int
- */
- protected $max = 0;
-
- /**
- * 最大可执行数
- *
- * @var int
- */
- protected $max_exec = 0;
-
- /**
- * 队列任务请求最大间隔时间,单位秒 默认600秒
- *
- * *考虑网络波动等原因不建议小于 10 秒*
- *
- * @var int
- */
- protected $overtime = 600;
-
- /**
- * 队列
- *
- * @var array
- */
- protected $queue = [];
-
- /**
- * 当前任务ID
- *
- * @var string
- */
- public $task_id = '';
-
- /**
- * 错误信息
- *
- * @var string
- */
- public $error = '';
-
- /**
- * 构造函数
- *
- * @param string $name 队列名称
- * @param int $max_exec 最大可执行数
- */
- public function __construct($name, $max_exec)
- {
- $this->name = 'QUEUE_' . $name;
- $this->max_exec = $max_exec;
- $this->queue = Cache::get($this->name) ?: [];
- }
-
- /**
- * 设置当前任务ID
- *
- * @param string $task_id 队列索引
- * @return bool
- */
- public function setTaskId($task_id)
- {
- $index = $this->getIndex($task_id);
- if ($index === false) {
- return false;
- }
-
- $this->task_id = $task_id;
- return true;
- }
-
- /**
- * 设置队列最大长度
- *
- * @param int $max
- * @return void
- */
- public function setMax(int $max)
- {
- $this->max = $max;
- }
-
- /**
- * 队列最大长度
- *
- * @return int
- */
- public function getMax()
- {
- return $this->max;
- }
-
- /**
- * 设置队列任务请求最大间隔时间 (单位:秒s)
- *
- * @param int $overtime
- * @return void
- */
- public function setOvertime(int $overtime)
- {
- $this->overtime = $overtime;
- }
-
- /**
- * 读取队列任务请求最大间隔时间
- *
- * @return int
- */
- public function getOvertime()
- {
- return $this->overtime;
- }
-
- /**
- * 生成任务ID,并加入队列
- *
- * @return string
- */
- public function makeTaskId()
- {
- do {
- $task_id = md5(time() . rand(100, 999));
- } while (in_array($task_id, array_column($this->queue, 'task_id')));
-
- if (!$this->enqueue($task_id)) {
- return false;
- }
-
- $this->task_id = $task_id;
- return $task_id;
- }
-
- /**
- * 入队
- *
- * @param string $$this->queue 任务ID
- * @return bool
- */
- public function enqueue($task_id)
- {
- if ($this->max > 0) {
- if (count($this->queue) >= $this->max) {
- $this->error = '队列长度达到上限';
- return false;
- }
- }
- $this->queue[] = [
- 'task_id' => $task_id,
- // 上次处理时间
- 'last_processing_time' => time(),
- ];
- Cache::set($this->name, $this->queue);
- return true;
- }
-
- /**
- * 出队
- *
- * @param string $task_id 任务ID,默认当前任务ID
- * @return bool
- */
- public function dequeue($task_id = null)
- {
- $task_id = $task_id ?: $this->task_id;
- $index = $this->getIndex($task_id);
- if ($index === false) {
- return false;
- }
-
- unset($this->queue[$index]);
- $this->queue = array_values($this->queue);
- Cache::set($this->name, $this->queue);
- return true;
- }
-
- /**
- * 当前任务是否可执行
- *
- * @param string $task_id 任务ID,默认当前任务ID
- * @return bool
- */
- public function canExec($task_id = null)
- {
- $task_id = $task_id ?: $this->task_id;
-
- $index = $this->getIndex($task_id);
- if ($index === false) {
- return false;
- }
-
- $res = false;
-
- for ($i = 0; $i < $this->max_exec; $i++) {
- if (!isset($this->queue[$i])) {
- break;
- }
- // 判断任务处理是否超时
- $temp_time = $this->queue[$i]['last_processing_time'] + $this->overtime;
- if (time() > $temp_time) {
- $this->dequeue($this->queue[$i]['task_id']);
- $i--;
- $index--;
- continue;
- }
-
- if ($index == $i) {
- $res = true;
- $this->queue[$i]['last_processing_time'] = time();
- Cache::set($this->name, $this->queue);
- break;
- }
- }
-
- if (!$res) {
- $this->error = '服务器繁忙,排队中...' . ($index - $this->max_exec);
- }
-
- return $res;
- }
-
- /**
- * 获取 [当前] 任务所在队列索引
- *
- * @param string $task_id 任务ID,默认当前任务ID
- * @return mixed bool | int
- */
- public function getIndex($task_id = null)
- {
- $task_id = $task_id ?: $this->task_id;
-
- $index = array_search($task_id, array_column($this->queue, 'task_id'));
- if ($index === false) {
- $this->error = '队列中不存在该任务';
- }
-
- return $index;
- }
-
- /**
- *
- */
- /**
- * 任务缓存数据
- *
- * @param string $key 缓存键
- * @param mixed $val 缓存值,不传改参数为获取值
- * @return mixed|void
- * @author Ymob
- * @datetime 2019-10-22 13:52:19
- */
- public function cache($key, $val = null)
- {
- if (!$this->task_id) {
- $this->error = '任务ID不存在';
- return false;
- }
-
- $index = $this->getIndex($this->task_id);
-
- if ($val === null) {
- return $this->queue[$index]['cache'][$key];
- } else {
- $this->queue[$index]['cache'][$key] = $val;
- Cache::set($this->name, $this->queue);
- }
-
- }
- }
|