123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. <?php
  2. namespace com;
  3. use think\Cache;
  4. /**
  5. * 伪队列,(用于导入导出轮询时排队)
  6. *
  7. * @author Ymob
  8. * @date 2019-09-30
  9. */
  10. class PseudoQueue
  11. {
  12. /**
  13. * 队列名称
  14. *
  15. * @var string
  16. */
  17. protected $name = '';
  18. /**
  19. * 队列最大数量,默认0,不限数量
  20. *
  21. * @var int
  22. */
  23. protected $max = 0;
  24. /**
  25. * 最大可执行数
  26. *
  27. * @var int
  28. */
  29. protected $max_exec = 0;
  30. /**
  31. * 队列任务请求最大间隔时间,单位秒 默认600秒
  32. *
  33. * *考虑网络波动等原因不建议小于 10 秒*
  34. *
  35. * @var int
  36. */
  37. protected $overtime = 600;
  38. /**
  39. * 队列
  40. *
  41. * @var array
  42. */
  43. protected $queue = [];
  44. /**
  45. * 当前任务ID
  46. *
  47. * @var string
  48. */
  49. public $task_id = '';
  50. /**
  51. * 错误信息
  52. *
  53. * @var string
  54. */
  55. public $error = '';
  56. /**
  57. * 构造函数
  58. *
  59. * @param string $name 队列名称
  60. * @param int $max_exec 最大可执行数
  61. */
  62. public function __construct($name, $max_exec)
  63. {
  64. $this->name = 'QUEUE_' . $name;
  65. $this->max_exec = $max_exec;
  66. $this->queue = Cache::get($this->name) ?: [];
  67. }
  68. /**
  69. * 设置当前任务ID
  70. *
  71. * @param string $task_id 队列索引
  72. * @return bool
  73. */
  74. public function setTaskId($task_id)
  75. {
  76. $index = $this->getIndex($task_id);
  77. if ($index === false) {
  78. return false;
  79. }
  80. $this->task_id = $task_id;
  81. return true;
  82. }
  83. /**
  84. * 设置队列最大长度
  85. *
  86. * @param int $max
  87. * @return void
  88. */
  89. public function setMax(int $max)
  90. {
  91. $this->max = $max;
  92. }
  93. /**
  94. * 队列最大长度
  95. *
  96. * @return int
  97. */
  98. public function getMax()
  99. {
  100. return $this->max;
  101. }
  102. /**
  103. * 设置队列任务请求最大间隔时间 (单位:秒s)
  104. *
  105. * @param int $overtime
  106. * @return void
  107. */
  108. public function setOvertime(int $overtime)
  109. {
  110. $this->overtime = $overtime;
  111. }
  112. /**
  113. * 读取队列任务请求最大间隔时间
  114. *
  115. * @return int
  116. */
  117. public function getOvertime()
  118. {
  119. return $this->overtime;
  120. }
  121. /**
  122. * 生成任务ID,并加入队列
  123. *
  124. * @return string
  125. */
  126. public function makeTaskId()
  127. {
  128. do {
  129. $task_id = md5(time() . rand(100, 999));
  130. } while (in_array($task_id, array_column($this->queue, 'task_id')));
  131. if (!$this->enqueue($task_id)) {
  132. return false;
  133. }
  134. $this->task_id = $task_id;
  135. return $task_id;
  136. }
  137. /**
  138. * 入队
  139. *
  140. * @param string $$this->queue 任务ID
  141. * @return bool
  142. */
  143. public function enqueue($task_id)
  144. {
  145. if ($this->max > 0) {
  146. if (count($this->queue) >= $this->max) {
  147. $this->error = '队列长度达到上限';
  148. return false;
  149. }
  150. }
  151. $this->queue[] = [
  152. 'task_id' => $task_id,
  153. // 上次处理时间
  154. 'last_processing_time' => time(),
  155. ];
  156. Cache::set($this->name, $this->queue);
  157. return true;
  158. }
  159. /**
  160. * 出队
  161. *
  162. * @param string $task_id 任务ID,默认当前任务ID
  163. * @return bool
  164. */
  165. public function dequeue($task_id = null)
  166. {
  167. $task_id = $task_id ?: $this->task_id;
  168. $index = $this->getIndex($task_id);
  169. if ($index === false) {
  170. return false;
  171. }
  172. unset($this->queue[$index]);
  173. $this->queue = array_values($this->queue);
  174. Cache::set($this->name, $this->queue);
  175. return true;
  176. }
  177. /**
  178. * 当前任务是否可执行
  179. *
  180. * @param string $task_id 任务ID,默认当前任务ID
  181. * @return bool
  182. */
  183. public function canExec($task_id = null)
  184. {
  185. $task_id = $task_id ?: $this->task_id;
  186. $index = $this->getIndex($task_id);
  187. if ($index === false) {
  188. return false;
  189. }
  190. $res = false;
  191. for ($i = 0; $i < $this->max_exec; $i++) {
  192. if (!isset($this->queue[$i])) {
  193. break;
  194. }
  195. // 判断任务处理是否超时
  196. $temp_time = $this->queue[$i]['last_processing_time'] + $this->overtime;
  197. if (time() > $temp_time) {
  198. $this->dequeue($this->queue[$i]['task_id']);
  199. $i--;
  200. $index--;
  201. continue;
  202. }
  203. if ($index == $i) {
  204. $res = true;
  205. $this->queue[$i]['last_processing_time'] = time();
  206. Cache::set($this->name, $this->queue);
  207. break;
  208. }
  209. }
  210. if (!$res) {
  211. $this->error = '服务器繁忙,排队中...' . ($index - $this->max_exec);
  212. }
  213. return $res;
  214. }
  215. /**
  216. * 获取 [当前] 任务所在队列索引
  217. *
  218. * @param string $task_id 任务ID,默认当前任务ID
  219. * @return mixed bool | int
  220. */
  221. public function getIndex($task_id = null)
  222. {
  223. $task_id = $task_id ?: $this->task_id;
  224. $index = array_search($task_id, array_column($this->queue, 'task_id'));
  225. if ($index === false) {
  226. $this->error = '队列中不存在该任务';
  227. }
  228. return $index;
  229. }
  230. /**
  231. *
  232. */
  233. /**
  234. * 任务缓存数据
  235. *
  236. * @param string $key 缓存键
  237. * @param mixed $val 缓存值,不传改参数为获取值
  238. * @return mixed|void
  239. * @author Ymob
  240. * @datetime 2019-10-22 13:52:19
  241. */
  242. public function cache($key, $val = null)
  243. {
  244. if (!$this->task_id) {
  245. $this->error = '任务ID不存在';
  246. return false;
  247. }
  248. $index = $this->getIndex($this->task_id);
  249. if ($val === null) {
  250. return $this->queue[$index]['cache'][$key];
  251. } else {
  252. $this->queue[$index]['cache'][$key] = $val;
  253. Cache::set($this->name, $this->queue);
  254. }
  255. }
  256. }