Redis协议笔记
redis协议相当简单好理解。
redis协议支持类型:
- 简单字符串 Simple Strings
- 错误 Errors
- 整数 Integers
- 字符块 Bulk Strings
- 数组 Arrays
Simple Strings(+)
Simple Strings的呈现以+开始,以\r\n结尾,一个只含有'OK'的表示为+OK\r\n。因为\r\n是结束标记,所以Simple Strings的内容不能包含\r或\n字符。
Errors(-)
Errors 的呈现以-开始,以\r\n结尾,一条表示Error message错误信息的字符为-Error message\r\n
Integers(:)
Integers 的呈现以:开始,以\r\n结尾,比如:1000\r\n表示数字1000
Bulk Strings($)
Bulk Strings 以$开始,后面跟数据的字节长度并以\r\n结尾(这个长度不包括数据末尾的\r\n),下一行则是对应长度的数据,同样以\r\n结尾,比如$5\r\nabcde\r\n。它还有一个特殊用途:用长度为-1且没有数据部分的$-1\r\n表示Null值。
Arrays(*)
Arrays 头以 * 开始,后边接一个整数并以\r\n结尾,表示数组的元素个数(也就是数组长度,不是行数),之后依次接着各个元素,每个元素可以是上述任意一种数据类型。
发送方式
客户端将命令以Bulk Strings(字符块)数组的方式发送给服务端,比如:
SET xyz abcde
*3\r\n
$3\r\n
SET\r\n
$3\r\n
xyz\r\n
$5\r\n
abcde\r\n
成功或者失败:
+OK\r\n
-错误信息\r\n
除了上面的格式,Redis还支持内联命令(inline command):不做任何封装,直接把以空格分隔的命令文本加换行符发送给服务器。比如PING, EXISTS mykey等。使用telnet 127.0.0.1 6379连上默认的redis server, 敲入PING就可以返回结果+PONG
php实现redis客户端和服务端 string功能:
- 服务端
<?php
/**
 * Minimal multi-process, blocking server that speaks the Redis protocol (RESP).
 *
 * The constructor opens the listening socket; run() forks the worker
 * processes and respawns any that exit. Each worker accepts one connection
 * at a time, parses requests with parseRESP() and hands every parsed request
 * to the $onMessage callback as ($conn, $request).
 *
 * Workers are separate processes created with pcntl_fork(), so whatever a
 * callback stores in $redis_kv_data is only visible inside the worker that
 * stored it.
 */
class Xtgxiso_server
{
    /** @var resource|false Listening socket opened by the constructor. */
    private $socket = false;

    /** @var int Number of worker processes run() keeps alive. */
    private $process_num = 100;

    /**
     * @var array Worker PIDs known to the parent, keyed by PID. Left public
     *            because it used to be created as a dynamic property.
     */
    public $pids = array();

    /** @var array Key/value storage for use by the message callback. */
    public $redis_kv_data = array();

    /** @var callable|null Invoked as ($conn, $request) for every parsed request. */
    public $onMessage = null;

    /**
     * Open the listening TCP socket; terminates the script if that fails.
     *
     * @param string $host Address to bind to.
     * @param int    $port Port to listen on.
     */
    function __construct($host="0.0.0.0",$port=1215)
    {
        $this->socket = stream_socket_server("tcp://".$host.":".$port,$errno, $errstr);
        if (!$this->socket) die($errstr."--".$errno);
        echo "listen $host $port \r\n";
        ini_set("memory_limit", "128M");
    }

    /**
     * Read one RESP value from the stream.
     *
     * All lengths are handled in bytes (strlen/substr), because RESP bulk
     * lengths are byte counts and payloads may be arbitrary binary data.
     *
     * @param resource $conn Readable stream positioned at the start of a value.
     * @return array|string|null Array for "*", string for "$" and for
     *         "+", "-" and ":" lines (type byte removed), the whole line for
     *         anything else (an inline command such as "PING"), and null for
     *         a null bulk string or when nothing more can be read.
     * @throws Exception When the stream ends in the middle of a bulk string.
     */
    private function parseRESP($conn)
    {
        $line = fgets($conn);
        if ($line === '' || $line === false) {
            return null;
        }

        $type = $line[0];
        // Drop the line terminator only; tolerate a bare "\n" as well as "\r\n".
        $payload = preg_replace('/\r?\n\z/', '', (string) substr($line, 1));

        switch ($type) {
            case '*':
                $count = (int) $payload;
                $items = array();
                for ($i = 0; $i < $count; $i++) {
                    $items[] = $this->parseRESP($conn);
                }
                return $items;

            case '$':
                $length = (int) $payload;
                if ($length < 0) {
                    return null;
                }
                // Payload plus its trailing CRLF. fread() on a socket may
                // return fewer bytes than requested, so keep reading until
                // everything has arrived.
                $remaining = $length + 2;
                $data = '';
                while ($remaining > 0) {
                    $block = fread($conn, $remaining);
                    if ($block === false || $block === '') {
                        throw new Exception('RECEIVING');
                    }
                    $data .= $block;
                    $remaining -= strlen($block);
                }
                return (string) substr($data, 0, -2);

            case '+':
            case '-':
            case ':':
                return $payload;
        }

        // Not a RESP type marker: treat the line as an inline command and
        // keep its first byte.
        return $type . $payload;
    }

    /**
     * Serve a single client connection until it ends, then close it.
     *
     * @param resource $conn Accepted client stream.
     */
    private function serve_connection($conn)
    {
        try {
            while (true) {
                $request = $this->parseRESP($conn);
                // Only null ends the connection; "" and "0" are valid payloads.
                if ($request === null) {
                    break;
                }
                if ($this->onMessage) {
                    call_user_func($this->onMessage, $conn, $request);
                }
            }
        } catch (Exception $e) {
            // A broken request should end this connection, not the worker.
            echo "connection error : " . $e->getMessage() . " \r\n";
        }
        fclose($conn);
    }

    /**
     * Fork one worker. The child loops forever accepting connections; the
     * parent records the child's PID and returns.
     */
    private function start_worker_process()
    {
        $pid = pcntl_fork();

        if ($pid === -1) {
            echo "fork error : " . pcntl_strerror(pcntl_get_last_error()) . " \r\n";
            exit(1);
        }

        if ($pid === 0) {
            // Child process: this loop never exits.
            while (true) {
                echo "waiting...\n";
                $conn = stream_socket_accept($this->socket, -1);
                if (!$conn) {
                    continue;
                }
                //"*3\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$7\r\nmyvalue\r\n"
                $this->serve_connection($conn);
            }
        }

        // Parent process.
        $this->pids[$pid] = $pid;
    }

    /**
     * Start the workers, then poll once per second and replace any worker
     * that has exited. Never returns.
     */
    public function run()
    {
        for ($i = 1; $i <= $this->process_num; $i++) {
            $this->start_worker_process();
        }

        while (true) {
            foreach ($this->pids as $pid) {
                if (!$pid) {
                    continue;
                }
                $res = pcntl_waitpid($pid, $status, WNOHANG);
                if ($res == -1 || $res > 0) {
                    // Forget the dead PID before forking so a replacement
                    // that happens to get the same PID is not dropped.
                    unset($this->pids[$pid]);
                    $this->start_worker_process();
                }
            }
            sleep(1);
        }
    }
}
// Wire a small in-memory SET/GET handler into the server and start serving.
$server = new Xtgxiso_server();

/**
 * Handle one parsed request and write the RESP reply to the client.
 *
 * SET stores a value, GET returns it (or a null bulk string when the key is
 * absent). Every other request, including non-array ones, is acknowledged
 * with +OK.
 */
$server->onMessage = function ($conn, $info) use ($server) {
    if (!is_array($info) || !isset($info[0]) || !is_string($info[0])) {
        fwrite($conn, "+OK\r\n");
        return;
    }

    // Redis command names are case-insensitive.
    $command = strtoupper($info[0]);
    $key = (isset($info[1]) && is_string($info[1])) ? $info[1] : null;
    $val = (isset($info[2]) && is_string($info[2])) ? $info[2] : null;

    if ($command === 'SET') {
        if ($key === null || $val === null) {
            fwrite($conn, "-ERR wrong number of arguments for 'set' command\r\n");
            return;
        }
        $server->redis_kv_data[$key] = $val;
        fwrite($conn, "+OK\r\n");
        return;
    }

    if ($command === 'GET') {
        if ($key === null) {
            fwrite($conn, "-ERR wrong number of arguments for 'get' command\r\n");
            return;
        }
        if (!isset($server->redis_kv_data[$key])) {
            // Missing key: reply with a null bulk string, not an empty one.
            fwrite($conn, '$-1' . "\r\n");
            return;
        }
        $stored = $server->redis_kv_data[$key];
        fwrite($conn, '$' . strlen($stored) . "\r\n" . $stored . "\r\n");
        return;
    }

    fwrite($conn, "+OK\r\n");
};

$server->run();
通过如下命令来测试PHP实现的性能:
redis-benchmark -h 127.0.0.1 -p 1215 -t set -n 80000 -q
- 客户端
<?php
namespace xtgxiso;
/**
 * Minimal Redis client speaking RESP over a TCP stream.
 *
 * Any method call is sent as a command whose first element is the method
 * name, e.g. $redis->set('a', 'b') sends the array ["set", "a", "b"], and
 * the decoded reply is returned.
 */
class Redis {
    /** @var resource|false Connection opened by the constructor. */
    private $redis_socket = false;

    /**
     * Connect to the server.
     *
     * @param string $host    Server address.
     * @param int    $port    Server port.
     * @param int    $timeout Connect timeout in seconds.
     * @throws \Exception When the connection cannot be established.
     */
    public function __construct($host='127.0.0.1',$port=6379,$timeout = 3) {
        $this->redis_socket = stream_socket_client("tcp://".$host.":".$port, $errno, $errstr, $timeout);
        if ( !$this->redis_socket) {
            // Fully qualified: inside a namespace a bare "Exception" would
            // resolve to a class that does not exist.
            throw new \Exception("{$errno} - {$errstr}");
        }
    }

    /** Close the connection if it is still open. */
    public function __destruct() {
        if (is_resource($this->redis_socket)) {
            fclose($this->redis_socket);
        }
    }

    /**
     * Send $name with $args as a RESP array of bulk strings and return the reply.
     *
     * @param string $name Command name.
     * @param array  $args Command arguments.
     * @return mixed Decoded reply, see readResponse().
     * @throws \Exception On write failure, read failure or an error reply.
     */
    public function __call($name, $args) {
        $crlf = "\r\n";
        array_unshift($args, $name);

        $command = '*' . count($args) . $crlf;
        foreach ($args as $arg) {
            $arg = (string) $arg;
            $command .= '$' . strlen($arg) . $crlf . $arg . $crlf;
        }

        // fwrite() may send only part of the buffer; loop until it is all out.
        $total = strlen($command);
        $sent = 0;
        while ($sent < $total) {
            $written = fwrite($this->redis_socket, substr($command, $sent));
            if ($written === false || $written <= 0) {
                throw new \Exception('Failed to write entire command to stream');
            }
            $sent += $written;
        }

        return $this->readResponse();
    }

    /**
     * Read exactly $length bytes from the connection.
     *
     * @param int $length Number of bytes to read.
     * @return string The bytes read.
     * @throws \Exception When the stream fails or ends early.
     */
    private function readBytes($length) {
        $buffer = '';
        while (strlen($buffer) < $length) {
            $chunk = fread($this->redis_socket, $length - strlen($buffer));
            // An empty read means EOF or timeout; without this check the
            // loop would never terminate.
            if ($chunk === false || $chunk === '') {
                throw new \Exception('Failed to read response from stream');
            }
            $buffer .= $chunk;
        }
        return $buffer;
    }

    /**
     * Read and decode one reply.
     *
     * @return mixed true for "+OK", string for other simple strings and for
     *         bulk strings ('' for an empty one), null for a null bulk
     *         string or null array, int for ":" replies, array for "*".
     * @throws \Exception For "-" error replies, unknown reply types and
     *         read failures.
     */
    private function readResponse() {
        $line = fgets($this->redis_socket);
        if ($line === false) {
            throw new \Exception('Failed to read response from stream');
        }

        $reply = trim($line);
        $type = ($reply === '') ? '' : $reply[0];
        $payload = (string) substr($reply, 1);

        switch ($type) {
            case '-':
                // Drop a leading "ERR" as before, but keep any other error
                // prefix (WRONGTYPE, NOAUTH, ...) intact.
                if (strncmp($payload, 'ERR', 3) === 0) {
                    $payload = (string) substr($payload, 3);
                }
                throw new \Exception(trim($payload));

            case '+':
                return ($payload === 'OK') ? TRUE : $payload;

            case '$':
                $size = intval($payload);
                if ($size < 0) {
                    return NULL;
                }
                $response = ($size > 0) ? $this->readBytes($size) : '';
                $this->readBytes(2); /* discard crlf */
                return $response;

            /* Multi-bulk reply */
            case '*':
                $count = intval($payload);
                if ($count < 0) {
                    return NULL;
                }
                $response = array();
                for ($i = 0; $i < $count; $i++) {
                    $response[] = $this->readResponse();
                }
                return $response;

            /* Integer reply */
            case ':':
                return intval($payload);
        }

        throw new \Exception("Unknown response: {$reply}");
    }
}
/*
$redis = new Redis();
var_dump($redis->auth("123456"));
var_dump($redis->set("a",'b'));
var_dump($redis->get("a"));
*/
参考:
http://redis.cn/topics/protocol.html
http://www.01happy.com/php-redis-server/
http://www.01happy.com/php-redis-client/