Redis协议笔记

redis协议相当简单好理解。

redis协议支持类型:

  • 正确 Simple Strings
  • 错误 Errors
  • 整数 Integers
  • 字符块 Bulk Strings
  • 数组 Arrays

Simple Strings(+)
Simple Strings的呈现以+开始,以\r\n结尾,一个只含有'OK'的表示为+OK\r\n。所以简单的字符串不能包含连续的\r\n字符。

Errors(-)
Errors 的呈现以-开始,以\r\n结尾,一条表示Error message错误信息的字符为-Error message\r\n

Integers(:)
Integers 的呈现以:开始,以\r\n结尾,比如:1000\r\n表示数字1000

Bulk Strings($)
Bulk Strings 数据头同时表示下一行数据长度,不包括换行符长度\r\n,$后面则是对应的长度的数据。它还有一个特殊用途,用长度为-1的空字符表示NUll值

Arrays(*)
Arrays 头以 * 开始,后边接一个Integer类型,表示消息体总共有多少行,不包括当前行,*后面是具体的行数,也就是数组长度,之后接着各自的数据类型。

发送方式

客户端将命令用块字符的方式发送给服务端,比如:

SET xyz abcde
*3\r\n
$3\r\n
SET\r\n
$3\r\n
xyz\r\n
$5\r\n
abcde\r\n

成功或者失败:

+OK\r\n
-错误信息\r\n

如果是内建的命令操作,可以直接发送命令给服务器。比如PING, EXISTS mykey等.使用telnet 127.0.0.1 6379连上默认的redis server, 敲入PING就可以返回结果+PONG

php实现redis客户端和服务端 string功能:

  • 服务端
<?php
/**
 * 多进程阻塞式
 */
class Xtgxiso_server
{
    private $socket = false;
    private $process_num = 100;
    public $redis_kv_data = array();
    public $onMessage = null;
 
    function __construct($host="0.0.0.0",$port=1215)
    {
        $this->socket = stream_socket_server("tcp://".$host.":".$port,$errno, $errstr);
        if (!$this->socket) die($errstr."--".$errno);
        echo "listen $host $port \r\n";
        ini_set("memory_limit", "128M");
    }
 
    private function parseRESP(&$conn){
        $line = fgets($conn);
        if($line === '' || $line === false)
        {
            return null;
        }
        $type = $line[0];
        $line = mb_substr($line,1,-2);
        switch ( $type ){
            case "*":
                $count = (int) $line;
                $data = array();
                for ($i = 1; $i <= $count; $i++) {
                    $data[] = $this->parseRESP($conn);
                }
                return $data;
            case "$":
                if ($line == '-1') {
                    return null;
                }
                $length = $line + 2;
                $data = '';
                while ($length > 0) {
                    $block = fread($conn, $length);
                    if ($length !== strlen($block)) {
                        throw new Exception('RECEIVING');
                    }
                    $data .= $block;
                    $length -= mb_strlen($block);
                }
                return mb_substr($data, 0, -2);
        }
        return $line;
    }
 
    private function start_worker_process(){
        $pid = pcntl_fork();
        switch ($pid) {
            case -1:
                echo "fork error : {$i} \r\n";
                exit;
            case 0:
                while ( 1 ) {
                    echo  "waiting...\n";
                    $conn = stream_socket_accept($this->socket, -1);
                    if ( !$conn ){
                        continue;
                    }
                    //"*3\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$7\r\nmyvalue\r\n"
                    while(1){
                        $arr = $this->parseRESP($conn);
                        if ( is_array($arr) ) {
                            if ($this->onMessage) {
                                call_user_func($this->onMessage, $conn, $arr);
                            }
                        }else if ( $arr ){
                            if ($this->onMessage) {
                                call_user_func($this->onMessage, $conn, $arr);
                            }
                        }else{
                            fclose($conn);
                            break;
                        }
                    }
                }
            default:
                $this->pids[$pid] = $pid;
                break;
        }
    }
 
    public function run(){
        for($i = 1; $i <= $this->process_num; $i++){
            $this->start_worker_process();
        }
 
        while(1){
            foreach ($this->pids as $i => $pid) {
                if($pid) {
                    $res = pcntl_waitpid($pid, $status,WNOHANG);
 
                    if ( $res == -1 || $res > 0 ){
                        $this->start_worker_process();
                        unset($this->pids[$pid]);
                    }
                }
            }
            sleep(1);
        }
    }
 
}
$server =  new Xtgxiso_server();
$server->onMessage = function($conn,$info) use($server){
    if ( is_array($info) ){
        if ( $info["0"] == "SET" ) {
            $key = $info[1];
            $val = $info[2];
            $server->redis_kv_data[$key] = $val;
            fwrite($conn, "+OK\r\n");
        }else if ( $info["0"] == "GET" ){
            $key = $info[1];
            fwrite($conn, "$".strlen($server->redis_kv_data[$key])."\r\n".$server->redis_kv_data[$key]."\r\n");
        }else{
            fwrite($conn,"+OK\r\n");
        }
    }else{
        fwrite($conn,"+OK\r\n");
    }
};
$server->run();

通过如下命令来测试PHP实现的性能:

redis-benchmark -h 127.0.0.1 -p 1215 -t set -n 80000 -q
  • 客户端
<?php
namespace xtgxiso;
class Redis {
    private $redis_socket = false;
    private $cmd = '';
    public function __construct($host='127.0.0.1',$port=6379,$timeout = 3) {
        $this->redis_socket = stream_socket_client("tcp://".$host.":".$port, $errno, $errstr,  $timeout);
        if ( !$this->redis_socket) {
            throw new Exception("{$errno} - {$errstr}");
        }
    }
    public function __destruct() {
        fclose($this->redis_socket);
    }
    public function __call($name, $args) {
        $crlf = "\r\n";
        array_unshift($args,$name);
        $command = '*' . count($args) . $crlf;
        foreach ($args as $arg) {
            $command .= '$' . strlen($arg) . $crlf . $arg . $crlf;
        }
        $fwrite = fwrite($this->redis_socket,$command);
        if ($fwrite === FALSE || $fwrite <= 0) {
            throw new Exception('Failed to write entire command to stream');
        }
        return $this->readResponse();
    }
    private function readResponse() {
        $reply = trim(fgets($this->redis_socket, 1024));
        switch (substr($reply, 0, 1)) {
            case '-':
                throw new Exception(trim(substr($reply, 4)));
                break;
            case '+':
                $response = substr(trim($reply), 1);
                if ($response === 'OK') {
                    $response = TRUE;
                }
                break;
            case '$':
                $response = NULL;
                if ($reply == '$-1') {
                    break;
                }
                $read = 0;
                $size = intval(substr($reply, 1));
                if ($size > 0) {
                    do {
                        $block_size = ($size - $read) > 1024 ? 1024 : ($size - $read);
                        $r = fread($this->redis_socket, $block_size);
                        if ($r === FALSE) {
                            throw new Exception('Failed to read response from stream');
                        } else {
                            $read += strlen($r);
                            $response .= $r;
                        }
                    } while ($read < $size);
                }
                fread($this->redis_socket, 2); /* discard crlf */
                break;
            /* Multi-bulk reply */
            case '*':
                $count = intval(substr($reply, 1));
                if ($count == '-1') {
                    return NULL;
                }
                $response = array();
                for ($i = 0; $i < $count; $i++) {
                    $response[] = $this->readResponse();
                }
                break;
            /* Integer reply */
            case ':':
                $response = intval(substr(trim($reply), 1));
                break;
            default:
                throw new RedisException("Unknown response: {$reply}");
                break;
        }
        return $response;
    }
}
/*
$redis = new Client_test();
var_dump($redis->auth("123456"));
var_dump($redis->set("a",'b'));
var_dump($redis->get("a"));
*/

参考:
http://redis.cn/topics/protocol.html
http://www.01happy.com/php-redis-server/
http://www.01happy.com/php-redis-client/

标签: none

已有 2 条评论

  1. jackwu jackwu

    请问下,Typecho 更多内容的,插件是什么?

    1. jackwu jackwu

      错了 是 “- 阅读剩余部分 -”

添加新评论