基于第三方KV存储的 PHP 版本消息队列实现

消息队列使用非常广泛,在缓解高并发情况时有相当好的作用,本文提供 PHP 版本消息队列的实现

相比其它消息队列有以下优点:

1.采用 PHP 编写,使用方便,可根据需求自行修改,代码一目了然;

2.支持memcache,tokyocabinet,redis,google的kv缓存服务等KV存储;

3.无需要安装,可控性强。

本文旨在帮助大家了解队列(先进先出)的实现方法。实现如下:

<?php
/**
*
* 基于第三方KV存储的消息队列
*
* the last known user to change this file in the repository <$LastChangedBy: rainkid {1}gt;
* @author rainkid <raink.kid@gmail.com>
* @version $Id: 1532 2011-03-01 02:09:44Z rainkid $
* @package
*/
class Queue
{
private $_handler = null;
private $_maxqueue = 1000000;//消息队列最大存储量

/**
*
* 构造函数
* @param handler $handler
*/
public function __construct($handler){
$this->_handler = $handler;
}

/**
*
* 添加到消息队列
* @param String $input_name
* @param String $value
*/
public function put($input_name, $value){
$queue_putpos = $this->_now_putpos($input_name);
$key = $input_name . ":" . $queue_putpos;
if($queue_putpos){
return $this->_handler->set($key, $value, 0);
}
return false;
}

/**
*
* 从消息队列中取数据
* @param String $input_name
*/
public function get($input_name){
$queue_getpos = $this->_now_getpos($input_name);
if($queue_getpos == 0){
return false;
}
$key = $input_name . ":" . $queue_getpos;
$result = $this->_handler->get($key);
$this->_handler->delete($key);
return $result;
}

/**
*
* 查看消息队列中的某个点的消息
* @param String $input_name
* @param Int $pos
*/
public function view($input_name, $pos){
$queue_name = $input_name . ":" .$pos;
return $this->_handler->get($queue_name);
}

/**
*
* 查看消息队列状态
* @param String $input_name 消息队列名称
* @return
*/
public function status($input_name){
$maxqueue_num = $this->_read_maxqueue($input_name);
$queue_putpos = $this->_read_putpos($input_name);
$queue_getpos = $this->_read_getpos($input_name);

if($queue_putpos > $queue_getpos){
$unget = abs($queue_putpos - $queue_getpos);
}else if($queue_putpos < $queue_getpos){
$unget = abs($this->_maxqueue - $queue_getpos + $queue_putpos);
}else{
$unget = 0;
}
return array('maxqueue_num'=>$maxqueue_num, 'queue_putpos'=>$queue_putpos, 'queue_getpos'=>$queue_getpos, 'unget'=>$unget);
}

/**
*
* 重置消息队列
* @param String $input_name
*/
public function reset($input_name){
$this->_handler->delete("putpos:".$input_name);
$this->_handler->delete("getpos:".$input_name);
$this->_handler->delete("maxqueue:".$input_name);
}

/**
*
* 设置消息队列的存储数
* @param Int $maxqueue
*/
public function maxqueue($maxqueue){
$this->_maxqueue = intval( $maxqueue );
}

/**
*
* 获取当前入消息队列位置
* @param String $input_name 队列名称
*/
private function _now_putpos($input_name){
$maxqueue_num = $this->_read_maxqueue($input_name);
$queue_putpos = $this->_read_putpos($input_name);
$queue_getpos = $this->_read_getpos($input_name);

$queue_name = "putpos:".$input_name;

//队列写入位置点加1
$queue_putpos +=1;
//如果队列写入ID+1之后追上队列读取ID,则说明队列已满,返回0,拒绝继续写入
if ($queue_putpos == $queue_getpos) {
$queue_putpos = 0;
//如果队列写入ID大于最大队列数量,并且从未进行过出队列操作(=0)或进行过1次出队列操作(=1),返回0,拒绝继续写入
}else if($queue_getpos <=1 && $queue_putpos > $this->_maxqueue){
$queue_putpos = 0;
//如果队列写入ID大于最大队列数量,则重置队列写入位置点的值为1
}else if($queue_putpos > $this->_maxqueue){
$queue_putpos = 1;
$this->_handler->set($queue_name, $queue_putpos, 0);
//队列写入位置点加1后的值,回写入数据库
}else{
$this->_handler->set($queue_name, $queue_putpos, 0);
}
return $queue_putpos;
}

/**
*
* 获取当前出消息队列位置
* @param String $input_name
*/
private function _now_getpos($input_name){
$maxqueue_num = $this->_read_maxqueue($input_name);
$queue_putpos = $this->_read_putpos($input_name);
$queue_getpos = $this->_read_getpos($input_name);

$queue_name = "getpos:".$input_name;
//如果queue_get_value的值不存在,重置队列读取位置点为1
if($queue_getpos == 0 && $queue_putpos > 0){
$queue_getpos = 1;
$this->_handler->set($queue_name, $queue_getpos, 0);
//如果队列的读取值(出队列)小于队列的写入值(入队列) */
}else if($queue_getpos < $queue_putpos){
$queue_getpos+=1;
$this->_handler->set($queue_name, $queue_getpos, 0);
//如果队列的读取值(出队列)大于队列的写入值(入队列),并且队列的读取值(出队列)小于最大队列数量
}else if($queue_getpos > $queue_putpos && $queue_getpos < $this->_maxqueue){
$queue_getpos+=1;
$this->_handler->set($queue_name, $queue_getpos, 0);
//如果队列的读取值(出队列)大于队列的写入值(入队列),并且队列的读取值(出队列)等于最大队列数量
}else if($queue_getpos > $queue_putpos && $queue_getpos == $this->_maxqueue){
$queue_getpos = 1;
$this->_handler->set($queue_name, $queue_getpos, 0);
}else{
$queue_getpos = 0;
}
return $queue_getpos;
}

/**
*
* 获取入消息队列位置
* @param unknown_type $input_name
*/
private function _read_putpos($input_name){
$pos = 0;
$key = "putpos:" . $input_name;

$temp = $this->_handler->get($key);
if($temp){
$pos = $temp;
}
return $pos;
}

/**
*
* 获取出消息队列位置
* @param unknown_type $input_name
*/
private function _read_getpos($input_name){
$pos = 0;
$key = "getpos:" . $input_name;
$temp = $this->_handler->get($key);
if($temp){
$pos = $temp;
}
return $pos;
}

/**
*
* 获取消息写入最大存储量
* @param String $input_name
*/
private function _read_maxqueue($input_name){
$pos = 0;
$temp = $this->_handler->get("maxqueue:" . $input_name);
if($temp){
$pos = $temp;
}else{
$pos = $this->_maxqueue;
}
return $pos;
}
}
?>

测试消息队列(Memcache , Tokyotrant):

<?php
include_once 'Queue.php';

echo "<h2>基于第三方KV存储的消息队列</h2>";
echo "<font color='red'>注:传入队列存储句柄必须提供set, get ,delete方法</font>";
echo "<hr/>";
/*******Memcache*******/
echo "Memcache队列测试:<br />";
//实例化Memcache
$mc = new Memcache;
$mc->connect('127.0.0.1', 11211);
//实例化消息队列
$queue = new Queue($mc);
$name = "Memcache";
//写入消息队列
$queue->put($name, "Memcache消息队列测试--".rand(0,100));
//出消息队列
$temp = $queue->get($name);
var_dump($temp);
$temp = $queue->status($name);
var_dump($temp);
echo "<hr/>";

/*******Tokyotrant*******/
echo "Tokyotrant队列测试:<br />";
//实例化Tokyotran
$tt = new Memcache;
$tt->connect('127.0.0.1', 1978);
//实例化消息队列
$queue = new Queue($tt);
$name = "Tokyotrant";
//写入消息队列
$queue->put($name, "Tokyotrant消息队列测试--".rand(0,100));
//出消息队列
$temp = $queue->get($name);
var_dump($temp);
$temp = $queue->status($name);
var_dump($temp);
echo "<hr/>";

发表评论

电子邮件地址不会被公开。 必填项已用*标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据