<?php
|
/**
|
* 数据库MQ
|
*
|
* - 队列存放于数据库表中,并支持分表
|
*
|
* @author dogstar <chanzonghuang@gmail.com> 20150516
|
*/
|
|
class Model_Task_TaskMq extends PhalApi_Model_NotORM {
|
|
protected function getTableName($id = NULL) {
|
$prefix = hexdec(substr(sha1($id), -1)) % 10;
|
return 'task_mq_' . $prefix;
|
}
|
|
public function add($service, $params = array()) {
|
$data = array(
|
'service' => $service,
|
'params' => json_encode($params),
|
'create_time' => time(),
|
);
|
|
$id = $this->insert($data, $service);
|
|
return $id > 0 ? TRUE : FALSE;
|
}
|
|
public function pop($service, $num = 1) {
|
$rows = $this->getORM($service)
|
->select('id, params')
|
->where('service', $service)
|
->order('id ASC')
|
->limit(0, $num)
|
->fetchAll();
|
|
if (empty($rows)) {
|
return array();
|
}
|
|
$ids = array();
|
foreach ($rows as $row) {
|
$ids[] = $row['id'];
|
}
|
|
$this->getORM($service)->where('id', $ids)->delete();
|
|
$rs = array();
|
foreach ($rows as $row) {
|
$params = json_decode($row['params'], TRUE);
|
if (!is_array($params)) {
|
$params = array();
|
}
|
|
$rs[] = $params;
|
}
|
|
return $rs;
|
}
|
}
|