From 36cacebcd7d1ff2ee8487fd77ef416e350b210ce Mon Sep 17 00:00:00 2001 From: wangxiao1024 <10048976+wangxiao1024@user.noreply.gitee.com> Date: Tue, 23 Jul 2024 16:50:58 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E4=BA=86=E8=AF=A6=E7=BB=86?= =?UTF-8?q?=E4=B8=AD=E6=96=87=E6=B3=A8=E9=87=8A=E6=96=B9=E4=BE=BF=E5=88=9D?= =?UTF-8?q?=E5=AD=A6=E8=80=85=E5=AD=A6=E4=B9=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- interface/database/db.go | 44 ++++++------ interface/tcp/handler.go | 12 ++-- main.go | 31 ++++----- redis/connection/conn.go | 92 +++++++++++++------------ redis/parser/parser.go | 111 ++++++++++++++++++++++--------- redis/protocol/asserts/assert.go | 6 +- redis/protocol/consts.go | 3 + redis/protocol/errors.go | 2 + redis/server/server.go | 32 ++++----- tcp/server.go | 58 ++++++++-------- 10 files changed, 233 insertions(+), 158 deletions(-) diff --git a/interface/database/db.go b/interface/database/db.go index 9fc2d894..46f99b2a 100644 --- a/interface/database/db.go +++ b/interface/database/db.go @@ -7,38 +7,38 @@ import ( "github.com/hdt3213/rdb/core" ) -// CmdLine is alias for [][]byte, represents a command line +// CmdLine 是命令行的别名,表示一个命令行为一个二维字节切片 type CmdLine = [][]byte -// DB is the interface for redis style storage engine +// DB 是一个接口,为Redis风格的存储引擎定义了必要的方法 type DB interface { - Exec(client redis.Connection, cmdLine [][]byte) redis.Reply - AfterClientClose(c redis.Connection) - Close() - LoadRDB(dec *core.Decoder) error + Exec(client redis.Connection, cmdLine [][]byte) redis.Reply // 执行给定的命令行,并返回响应 + AfterClientClose(c redis.Connection) // 客户端关闭后的回调处理 + Close() // 关闭数据库连接 + LoadRDB(dec *core.Decoder) error // 从RDB解码器加载数据到数据库 } -// KeyEventCallback will be called back on key event, such as key inserted or deleted -// may be called concurrently +// KeyEventCallback 是键事件的回调函数类型,如键被插入或删除时调用 +// 可能会并发调用 type KeyEventCallback func(dbIndex int, key string, entity *DataEntity) -// DBEngine is the embedding storage engine exposing more methods for complex application +// DBEngine 是一个更高级的存储引擎接口,提供了更多的方法以支持复杂的应用场景 type DBEngine interface { DB - ExecWithLock(conn redis.Connection, cmdLine [][]byte) redis.Reply - ExecMulti(conn redis.Connection, watching map[string]uint32, cmdLines []CmdLine) redis.Reply - GetUndoLogs(dbIndex int, cmdLine [][]byte) []CmdLine - ForEach(dbIndex int, cb func(key string, data *DataEntity, expiration *time.Time) bool) - RWLocks(dbIndex int, writeKeys []string, readKeys []string) - RWUnLocks(dbIndex int, writeKeys []string, readKeys []string) - GetDBSize(dbIndex int) (int, int) - GetEntity(dbIndex int, key string) (*DataEntity, bool) - GetExpiration(dbIndex int, key string) *time.Time - SetKeyInsertedCallback(cb KeyEventCallback) - SetKeyDeletedCallback(cb KeyEventCallback) + ExecWithLock(conn redis.Connection, cmdLine [][]byte) redis.Reply // 在执行命令时加锁保护 + ExecMulti(conn redis.Connection, watching map[string]uint32, cmdLines []CmdLine) redis.Reply // 执行多个命令,支持事务 + GetUndoLogs(dbIndex int, cmdLine [][]byte) []CmdLine // 获取撤销日志 + ForEach(dbIndex int, cb func(key string, data *DataEntity, expiration *time.Time) bool) // 遍历数据库中的键 + RWLocks(dbIndex int, writeKeys []string, readKeys []string) // 读写锁定一组键 + RWUnLocks(dbIndex int, writeKeys []string, readKeys []string) // 解锁一组键 + GetDBSize(dbIndex int) (int, int) // 获取数据库大小 + GetEntity(dbIndex int, key string) (*DataEntity, bool) // 获取与键关联的数据实体 + GetExpiration(dbIndex int, key string) *time.Time // 获取键的过期时间 + SetKeyInsertedCallback(cb KeyEventCallback) // 设置键插入事件的回调 + SetKeyDeletedCallback(cb KeyEventCallback) // 设置键删除事件的回调 } -// DataEntity stores data bound to a key, including a string, list, hash, set and so on +// DataEntity 存储绑定到键的数据,包括字符串、列表、哈希、集合等 type DataEntity struct { - Data interface{} + Data interface{} // 存储实际的数据,数据类型可以是任何类型 } diff --git a/interface/tcp/handler.go b/interface/tcp/handler.go index 8c649bcb..f74a16c8 100644 --- a/interface/tcp/handler.go +++ b/interface/tcp/handler.go @@ -5,11 +5,15 @@ import ( "net" ) -// HandleFunc represents application handler function +// HandleFunc 定义了一个应用程序处理函数的类型 +// 这种类型的函数接收一个context和一个网络连接 +// ctx context.Context:上下文,用于控制子程序的生命周期 +// conn net.Conn:表示一个网络连接,用于读取和写入数据 type HandleFunc func(ctx context.Context, conn net.Conn) -// Handler represents application server over tcp +// Handler 接口代表一个基于TCP的应用服务器 +// 它定义了处理连接和关闭服务器的方法 type Handler interface { - Handle(ctx context.Context, conn net.Conn) - Close() error + Handle(ctx context.Context, conn net.Conn) // 处理接收到的网络连接 + Close() error // 关闭服务器,清理资源,如果有错误返回错误 } diff --git a/main.go b/main.go index c0b6b5ab..892a0114 100644 --- a/main.go +++ b/main.go @@ -20,40 +20,41 @@ var banner = ` ` var defaultProperties = &config.ServerProperties{ - Bind: "0.0.0.0", - Port: 6399, - AppendOnly: false, - AppendFilename: "", - MaxClients: 1000, - RunID: utils.RandString(40), + Bind: "0.0.0.0", // 默认绑定的IP地址 + Port: 6399, // 默认端口号 + AppendOnly: false, // 是否开启追加模式 + AppendFilename: "", // 追加模式的文件名 + MaxClients: 1000, // 最大客户端连接数 + RunID: utils.RandString(40), // 生成一个随机的运行ID } +// fileExists 检查文件是否存在且不是目录 func fileExists(filename string) bool { info, err := os.Stat(filename) return err == nil && !info.IsDir() } func main() { - print(banner) - logger.Setup(&logger.Settings{ + print(banner) // 打印启动标志 + logger.Setup(&logger.Settings{ // 设置日志文件配置 Path: "logs", Name: "godis", Ext: "log", TimeFormat: "2006-01-02", }) - configFilename := os.Getenv("CONFIG") + configFilename := os.Getenv("CONFIG") // 获取环境变量中的配置文件名 if configFilename == "" { - if fileExists("redis.conf") { - config.SetupConfig("redis.conf") + if fileExists("redis.conf") { // 检查默认配置文件是否存在 + config.SetupConfig("redis.conf") // 使用默认配置文件 } else { - config.Properties = defaultProperties + config.Properties = defaultProperties // 使用内置默认配置 } } else { - config.SetupConfig(configFilename) + config.SetupConfig(configFilename) // 使用环境变量指定的配置文件 } - err := tcp.ListenAndServeWithSignal(&tcp.Config{ + err := tcp.ListenAndServeWithSignal(&tcp.Config{ // 启动TCP服务器 Address: fmt.Sprintf("%s:%d", config.Properties.Bind, config.Properties.Port), - }, RedisServer.MakeHandler()) + }, RedisServer.MakeHandler()) //调用我们的RedisServer.MakeHandler if err != nil { logger.Error(err) } diff --git a/redis/connection/conn.go b/redis/connection/conn.go index 8e1755de..9dc40751 100644 --- a/redis/connection/conn.go +++ b/redis/connection/conn.go @@ -1,74 +1,79 @@ package connection import ( - "github.com/hdt3213/godis/lib/logger" - "github.com/hdt3213/godis/lib/sync/wait" "net" "sync" "time" + + "github.com/hdt3213/godis/lib/logger" + "github.com/hdt3213/godis/lib/sync/wait" ) const ( - // flagSlave means this a connection with slave + // flagSlave 表示这是一个从服务器的连接 flagSlave = uint64(1 << iota) - // flagSlave means this a connection with master + // flagMaster 表示这是一个主服务器的连接 flagMaster - // flagMulti means this connection is within a transaction + // flagMulti 表示这个连接正在执行事务 flagMulti ) -// Connection represents a connection with a redis-cli +// Connection 代表与一个Redis客户端的连接 type Connection struct { - conn net.Conn + conn net.Conn // 网络连接实例 - // wait until finish sending data, used for graceful shutdown + // sendingData 用于优雅关闭时等待数据发送完成 sendingData wait.Wait - // lock while server sending response + // mu 用于在发送响应时的互斥锁 mu sync.Mutex - flags uint64 + flags uint64 // 连接的标志位,如是否为主从连接、是否处于事务中等 - // subscribing channels + /// subs 保存订阅的频道 subs map[string]bool - // password may be changed by CONFIG command during runtime, so store the password + // password 可能会在运行时通过CONFIG命令被修改,因此需要存储密码 password string - // queued commands for `multi` - queue [][][]byte + // queue 保存在事务中排队的命令 + queue [][][]byte + // watching 保存被WATCH命令监视的键及其版本号 watching map[string]uint32 + // txErrors 保存事务中的错误信息 txErrors []error - // selected db + // selectedDB 表示当前选择的数据库索引 selectedDB int } +// 连接池,用于重用连接对象 var connPool = sync.Pool{ New: func() interface{} { return &Connection{} }, } -// RemoteAddr returns the remote network address +// RemoteAddr 返回远程连接的网络地址 func (c *Connection) RemoteAddr() string { return c.conn.RemoteAddr().String() } -// Close disconnect with the client +// Close 用于断开与客户端的连接 func (c *Connection) Close() error { - c.sendingData.WaitWithTimeout(10 * time.Second) - _ = c.conn.Close() + c.sendingData.WaitWithTimeout(10 * time.Second) // 等待正在发送的数据完成或超时 + _ = c.conn.Close() // 关闭底层网络连接 + // 清理连接相关的状态信息 c.subs = nil c.password = "" c.queue = nil c.watching = nil c.txErrors = nil c.selectedDB = 0 - connPool.Put(c) + connPool.Put(c) // 将连接对象放回池中 return nil } -// NewConn creates Connection instance +// NewConn 用于创建新的Connection实例 func NewConn(conn net.Conn) *Connection { c, ok := connPool.Get().(*Connection) if !ok { @@ -81,7 +86,7 @@ func NewConn(conn net.Conn) *Connection { return c } -// Write sends response to client over tcp connection +// Write 向客户端发送响应 func (c *Connection) Write(b []byte) (int, error) { if len(b) == 0 { return 0, nil @@ -101,7 +106,7 @@ func (c *Connection) Name() string { return "" } -// Subscribe add current connection into subscribers of the given channel +// Subscribe 将当前连接添加到指定频道的订阅者中 func (c *Connection) Subscribe(channel string) { c.mu.Lock() defer c.mu.Unlock() @@ -112,7 +117,7 @@ func (c *Connection) Subscribe(channel string) { c.subs[channel] = true } -// UnSubscribe removes current connection into subscribers of the given channel +// UnSubscribe 从指定频道的订阅者中移除当前连接 func (c *Connection) UnSubscribe(channel string) { c.mu.Lock() defer c.mu.Unlock() @@ -123,12 +128,12 @@ func (c *Connection) UnSubscribe(channel string) { delete(c.subs, channel) } -// SubsCount returns the number of subscribing channels +// SubsCount 返回当前连接订阅的频道数量 func (c *Connection) SubsCount() int { return len(c.subs) } -// GetChannels returns all subscribing channels +// GetChannels 返回当前连接订阅的所有频道 func (c *Connection) GetChannels() []string { if c.subs == nil { return make([]string, 0) @@ -142,58 +147,58 @@ func (c *Connection) GetChannels() []string { return channels } -// SetPassword stores password for authentication +// SetPassword 设置连接的密码,用于认证 func (c *Connection) SetPassword(password string) { c.password = password } -// GetPassword get password for authentication +// GetPassword 获取连接的密码 func (c *Connection) GetPassword() string { return c.password } -// InMultiState tells is connection in an uncommitted transaction +// InMultiState 检查连接是否处于事务状态 func (c *Connection) InMultiState() bool { return c.flags&flagMulti > 0 } -// SetMultiState sets transaction flag +// SetMultiState 设置连接的事务状态 func (c *Connection) SetMultiState(state bool) { - if !state { // reset data when cancel multi + if !state { // 如果取消事务,重置相关数据 c.watching = nil c.queue = nil - c.flags &= ^flagMulti // clean multi flag + c.flags &= ^flagMulti // 清除事务标志 return } - c.flags |= flagMulti + c.flags |= flagMulti // 设置事务标志 } -// GetQueuedCmdLine returns queued commands of current transaction +// GetQueuedCmdLine 返回事务中排队的命令 func (c *Connection) GetQueuedCmdLine() [][][]byte { return c.queue } -// EnqueueCmd enqueues command of current transaction +// EnqueueCmd 将命令添加到事务队列 func (c *Connection) EnqueueCmd(cmdLine [][]byte) { c.queue = append(c.queue, cmdLine) } -// AddTxError stores syntax error within transaction +// AddTxError 添加事务执行中的错误 func (c *Connection) AddTxError(err error) { c.txErrors = append(c.txErrors, err) } -// GetTxErrors returns syntax error within transaction +// GetTxErrors 获取事务中的错误 func (c *Connection) GetTxErrors() []error { return c.txErrors } -// ClearQueuedCmds clears queued commands of current transaction +// ClearQueuedCmds 清除事务中排队的命令 func (c *Connection) ClearQueuedCmds() { c.queue = nil } -// GetWatching returns watching keys and their version code when started watching +// GetWatching 返回被监视的键和它们的版本号 func (c *Connection) GetWatching() map[string]uint32 { if c.watching == nil { c.watching = make(map[string]uint32) @@ -201,28 +206,33 @@ func (c *Connection) GetWatching() map[string]uint32 { return c.watching } -// GetDBIndex returns selected db +// GetDBIndex 返回选定的数据库索引 func (c *Connection) GetDBIndex() int { return c.selectedDB } -// SelectDB selects a database +// SelectDB 选择一个数据库 func (c *Connection) SelectDB(dbNum int) { c.selectedDB = dbNum } +// SetSlave 设置连接为从服务器模式 func (c *Connection) SetSlave() { c.flags |= flagSlave } +// IsSlave 检查连接是否为从服务器模式 func (c *Connection) IsSlave() bool { return c.flags&flagSlave > 0 } +// SetMaster 设置连接为主服务器模式 + func (c *Connection) SetMaster() { c.flags |= flagMaster } +// IsMaster 检查连接是否为主服务器模式 func (c *Connection) IsMaster() bool { return c.flags&flagMaster > 0 } diff --git a/redis/parser/parser.go b/redis/parser/parser.go index 64110024..ed1828d3 100644 --- a/redis/parser/parser.go +++ b/redis/parser/parser.go @@ -14,41 +14,52 @@ import ( "github.com/hdt3213/godis/redis/protocol" ) -// Payload stores redis.Reply or error +// Payload 存储redis.Reply或错误,redis解析器里面解析完成存储的数据的数据结构。 type Payload struct { Data redis.Reply Err error } -// ParseStream reads data from io.Reader and send payloads through channel +/* + 有一个Reply接口,里面包含Tobytes函数,所有的不同类型的Reply结构体都实现了Tobytes函数 + 从而我们实现多态。每个不同类型的Reply结构体中有相关的字段记录了仅仅包含的字符串 + 如*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n + 存入到可以通过MakeMultiBulkReply 放入到 + type MultiBulkReply struct { + Args [][]byte + } + 被存储为[set][key][value]. + 如果我们调用每个不同Reply结构体类型的ToBytes函数,则会把他变成redis的resp格式。 +*/ +// ParseStream 从io.Reader读取数据并通过通道发送负载 func ParseStream(reader io.Reader) <-chan *Payload { - ch := make(chan *Payload) - go parse0(reader, ch) - return ch + ch := make(chan *Payload) // 创建通道 + go parse0(reader, ch) // 启动协程解析数据 + return ch // 返回通道 } -// ParseBytes reads data from []byte and return all replies +// ParseBytes 从[]byte读取数据并返回所有回复 func ParseBytes(data []byte) ([]redis.Reply, error) { - ch := make(chan *Payload) - reader := bytes.NewReader(data) - go parse0(reader, ch) - var results []redis.Reply - for payload := range ch { - if payload == nil { + ch := make(chan *Payload) // 创建通道 + reader := bytes.NewReader(data) // 创建bytes阅读器 + go parse0(reader, ch) // 启动协程解析数据 + var results []redis.Reply // 存储解析结果 + for payload := range ch { // 循环读取通道中的数据 + if payload == nil { // 检查负载是否为空 return nil, errors.New("no protocol") } - if payload.Err != nil { + if payload.Err != nil { // 检查是否有错误 if payload.Err == io.EOF { break } return nil, payload.Err } - results = append(results, payload.Data) + results = append(results, payload.Data) // 添加数据到结果集 } return results, nil } -// ParseOne reads data from []byte and return the first payload +// ParseOne 从[]byte读取数据并返回第一个回复 func ParseOne(data []byte) (redis.Reply, error) { ch := make(chan *Payload) reader := bytes.NewReader(data) @@ -60,58 +71,70 @@ func ParseOne(data []byte) (redis.Reply, error) { return payload.Data, payload.Err } +// parse0 函数解析来自io.Reader的数据,并将结果通过Payload结构发送到通道 +// rawReader io.Reader: 输入源,可以是网络连接、文件等 +// ch chan<- *Payload: 结果发送通道,Payload包含解析结果或错误信息 func parse0(rawReader io.Reader, ch chan<- *Payload) { defer func() { if err := recover(); err != nil { + // 如果发生panic,则捕获异常并记录错误和堆栈信息 logger.Error(err, string(debug.Stack())) } }() - reader := bufio.NewReader(rawReader) + reader := bufio.NewReader(rawReader) //创建一个缓冲读取器 for { - line, err := reader.ReadBytes('\n') + line, err := reader.ReadBytes('\n') // 逐行读取数据 + /* + 客户端可能发来的是单行数据,例如+OK\r\n + 或者多行数据$3\r\nset\r\n 字符串是两行数据,第一行是$3\r\n 第二行是set\r\n + */ if err != nil { - ch <- &Payload{Err: err} + ch <- &Payload{Err: err} // 读取错误处理,将错误发送到通道并关闭通道 close(ch) return } + // 处理每行数据 length := len(line) if length <= 2 || line[length-2] != '\r' { - // there are some empty lines within replication traffic, ignore this error - //protocolError(ch, "empty line") + // 如果读取的行长度小于等于2或者行的倒数第二个字符不是回车符,则忽略这行数据 continue } - line = bytes.TrimSuffix(line, []byte{'\r', '\n'}) - switch line[0] { - case '+': + line = bytes.TrimSuffix(line, []byte{'\r', '\n'}) // 移除行尾的回车换行符 + switch line[0] { // 根据行的首个字符判断数据类型,并进行相应的处理 + case '+': // 状态回复 content := string(line[1:]) ch <- &Payload{ - Data: protocol.MakeStatusReply(content), + Data: protocol.MakeStatusReply(content), // 创建状态回复并发送 } - if strings.HasPrefix(content, "FULLRESYNC") { + if strings.HasPrefix(content, "FULLRESYNC") { // 特定命令的额外处理逻辑 + // 如果内容以"FULLRESYNC"开始,处理RDB批量字符串 err = parseRDBBulkString(reader, ch) if err != nil { + // 如果处理过程中发生错误,发送错误信息并关闭通道 ch <- &Payload{Err: err} close(ch) return } } - case '-': + case '-': // 错误回复 ch <- &Payload{ - Data: protocol.MakeErrReply(string(line[1:])), + Data: protocol.MakeErrReply(string(line[1:])), // 创建错误回复并发送 } case ':': value, err := strconv.ParseInt(string(line[1:]), 10, 64) if err != nil { + // 如果整数解析失败,记录协议错误 protocolError(ch, "illegal number "+string(line[1:])) continue } ch <- &Payload{ - Data: protocol.MakeIntReply(value), + Data: protocol.MakeIntReply(value), // 创建整数回复并发送 } case '$': - err = parseBulkString(line, reader, ch) + err = parseBulkString(line, reader, ch) //因为第一行数据已经取走,所以后面的reader部分不包含line的东西 if err != nil { ch <- &Payload{Err: err} + // 如果解析批量字符串失败,发送错误信息并关闭通道 close(ch) return } @@ -122,7 +145,7 @@ func parse0(rawReader io.Reader, ch chan<- *Payload) { close(ch) return } - default: + default: // 默认情况,处理多批量回复 args := bytes.Split(line, []byte{' '}) ch <- &Payload{ Data: protocol.MakeMultiBulkReply(args), @@ -131,29 +154,41 @@ func parse0(rawReader io.Reader, ch chan<- *Payload) { } } +// parseBulkString 解析批量字符串回复。 +// header []byte: 接收到的以 '$' 开头的行,表示批量字符串的长度。 +// reader *bufio.Reader: 用于从连接中继续读取批量字符串的内容。 +// ch chan<- *Payload: 用于发送解析后的结果或错误信息的通道。 func parseBulkString(header []byte, reader *bufio.Reader, ch chan<- *Payload) error { + // 解析批量字符串长度 此时的header $3 strLen, err := strconv.ParseInt(string(header[1:]), 10, 64) if err != nil || strLen < -1 { + // 如果解析错误或字符串长度非法,则发送协议错误信息到通道 protocolError(ch, "illegal bulk string header: "+string(header)) return nil } else if strLen == -1 { + // 如果字符串长度为-1,表示这是一个空的批量字符串(即 "$-1\r\n") ch <- &Payload{ - Data: protocol.MakeNullBulkReply(), + Data: protocol.MakeNullBulkReply(), // 发送空批量字符串回复 } return nil } + // 分配足够的空间来存储字符串内容和结尾的CRLF body := make([]byte, strLen+2) + //从reader中读取指定长度的数据放到缓冲区中 _, err = io.ReadFull(reader, body) if err != nil { - return err + return err // 如果读取过程中出现错误,直接返回这个错误 } + // 将读取到的内容(去除最后的CRLF)发送到通道, + //所以最终存储在我们的Payload中的DATA的redis.Reply结构体中的Arg []byte。 + ch <- &Payload{ Data: protocol.MakeBulkReply(body[:len(body)-2]), } return nil } -// there is no CRLF between RDB and following AOF, therefore it needs to be treated differently +// parseRDBBulkString 处理RDB文件数据流中的批量字符串,因为RDB和AOF之间没有CRLF。 func parseRDBBulkString(reader *bufio.Reader, ch chan<- *Payload) error { header, err := reader.ReadBytes('\n') if err != nil { @@ -178,9 +213,12 @@ func parseRDBBulkString(reader *bufio.Reader, ch chan<- *Payload) error { return nil } +// parseArray 用于解析Redis协议中的数组数据。 +// header []byte: 包含数组元素数量的头部数据,如*3表示数组有三个元素。 func parseArray(header []byte, reader *bufio.Reader, ch chan<- *Payload) error { nStrs, err := strconv.ParseInt(string(header[1:]), 10, 64) if err != nil || nStrs < 0 { + // 如果解析出错或数组长度小于0,发送协议错误信息 protocolError(ch, "illegal array header "+string(header[1:])) return nil } else if nStrs == 0 { @@ -189,6 +227,7 @@ func parseArray(header []byte, reader *bufio.Reader, ch chan<- *Payload) error { } return nil } + // 初始化一个切片来存储数组中的元素,预分配nStrs长度的空间 lines := make([][]byte, 0, nStrs) for i := int64(0); i < nStrs; i++ { var line []byte @@ -197,17 +236,23 @@ func parseArray(header []byte, reader *bufio.Reader, ch chan<- *Payload) error { return err } length := len(line) + // 检查读取的行是否合法,长度至少为4,并且以'$'开头 if length < 4 || line[length-2] != '\r' || line[0] != '$' { + // 如果不符合批量字符串的要求,发送协议错误 protocolError(ch, "illegal bulk string header "+string(line)) break } + // 解析批量字符串的长度 strLen, err := strconv.ParseInt(string(line[1:length-2]), 10, 64) if err != nil || strLen < -1 { + // 如果长度解析失败,发送协议错误 protocolError(ch, "illegal bulk string length "+string(line)) break } else if strLen == -1 { + // 如果长度为-1,表示空的批量字符串 lines = append(lines, []byte{}) } else { + // 分配足够的空间来存储字符串内容和结尾的CRLF body := make([]byte, strLen+2) _, err := io.ReadFull(reader, body) if err != nil { diff --git a/redis/protocol/asserts/assert.go b/redis/protocol/asserts/assert.go index 617f9e38..aae9e5f1 100644 --- a/redis/protocol/asserts/assert.go +++ b/redis/protocol/asserts/assert.go @@ -2,13 +2,15 @@ package asserts import ( "fmt" + "runtime" + "testing" + "github.com/hdt3213/godis/interface/redis" "github.com/hdt3213/godis/lib/utils" "github.com/hdt3213/godis/redis/protocol" - "runtime" - "testing" ) +// 这个文件包含了用于测试的断言函数,用于验证Redis响应是否符合预期。以下为一些示例函数: // AssertIntReply checks if the given redis.Reply is the expected integer func AssertIntReply(t *testing.T, actual redis.Reply, expected int) { intResult, ok := actual.(*protocol.IntReply) diff --git a/redis/protocol/consts.go b/redis/protocol/consts.go index 6b45430f..467ca37d 100644 --- a/redis/protocol/consts.go +++ b/redis/protocol/consts.go @@ -2,9 +2,12 @@ package protocol import ( "bytes" + "github.com/hdt3213/godis/interface/redis" ) +//这个文件定义了一些固定的Redis响应类型,如OK、PONG以及其他响应格式。 + // PongReply is +PONG type PongReply struct{} diff --git a/redis/protocol/errors.go b/redis/protocol/errors.go index 4d5b2092..80b3099c 100644 --- a/redis/protocol/errors.go +++ b/redis/protocol/errors.go @@ -1,5 +1,7 @@ package protocol +// 这个文件定义了与错误处理相关的几种特殊的Redis响应类型。每种类型都具备将自身转换为Redis协议字节序列的能力。 + // UnknownErrReply represents UnknownErr type UnknownErrReply struct{} diff --git a/redis/server/server.go b/redis/server/server.go index 29dd10ba..b8571c1d 100644 --- a/redis/server/server.go +++ b/redis/server/server.go @@ -26,46 +26,48 @@ var ( unknownErrReplyBytes = []byte("-ERR unknown\r\n") ) -// Handler implements tcp.Handler and serves as a redis server +// Handler 实现 tcp.Handler 接口,作为 Redis 服务器使用 type Handler struct { - activeConn sync.Map // *client -> placeholder - db database.DB - closing atomic.Boolean // refusing new client and new request + activeConn sync.Map // 存储活动的客户端连接 + db database.DB // 数据库接口 + closing atomic.Boolean // 是否正在关闭服务的标志 } -// MakeHandler creates a Handler instance +// MakeHandler 创建一个 Handler 实例 func MakeHandler() *Handler { var db database.DB if config.Properties.ClusterEnable { - db = cluster.MakeCluster() + db = cluster.MakeCluster() // 配置为集群模式 } else { - db = database2.NewStandaloneServer() + db = database2.NewStandaloneServer() // 单机模式 } return &Handler{ db: db, } } +// closeClient 用于关闭客户端连接,并处理相关的清理工作 func (h *Handler) closeClient(client *connection.Connection) { - _ = client.Close() - h.db.AfterClientClose(client) - h.activeConn.Delete(client) + _ = client.Close() // 关闭连接 + h.db.AfterClientClose(client) // 处理客户端关闭后的数据库操作 + h.activeConn.Delete(client) // 从活动连接中移除 } -// Handle receives and executes redis commands +// Handle 接收并执行 Redis 命令 func (h *Handler) Handle(ctx context.Context, conn net.Conn) { if h.closing.Get() { - // closing handler refuse new connection + // 如果服务正在关闭,则拒绝新连接 _ = conn.Close() return } - client := connection.NewConn(conn) + client := connection.NewConn(conn) // 创建一个新的连接封装,此处的client指的是我们的一个tcp连接 h.activeConn.Store(client, struct{}{}) - ch := parser.ParseStream(conn) - for payload := range ch { + ch := parser.ParseStream(conn) // 解析连接流得到命令,ch是一个管道 + for payload := range ch { //循环的从管道中拿数据。 if payload.Err != nil { + //先判断我们的redis的解析器解析到的错误类型是什么样子的 if payload.Err == io.EOF || payload.Err == io.ErrUnexpectedEOF || strings.Contains(payload.Err.Error(), "use of closed network connection") { diff --git a/tcp/server.go b/tcp/server.go index b74de25d..a17f32c1 100644 --- a/tcp/server.go +++ b/tcp/server.go @@ -19,80 +19,86 @@ import ( "github.com/hdt3213/godis/lib/logger" ) -// Config stores tcp server properties +// Config 用于存储TCP服务器的配置属性 type Config struct { - Address string `yaml:"address"` - MaxConnect uint32 `yaml:"max-connect"` - Timeout time.Duration `yaml:"timeout"` + Address string `yaml:"address"` // 监听的服务器地址 + MaxConnect uint32 `yaml:"max-connect"` // 允许的最大连接数 + Timeout time.Duration `yaml:"timeout"` // 连接的超时时间 } -// ClientCounter Record the number of clients in the current Godis server +// ClientCounter 用于记录当前Godis服务器中的客户端数量,是一个原子计数器 var ClientCounter int32 -// ListenAndServeWithSignal binds port and handle requests, blocking until receive stop signal +// ListenAndServeWithSignal 在接收到信号时终止服务 +// cfg:服务器配置 +// handler:处理TCP连接的处理器接口 func ListenAndServeWithSignal(cfg *Config, handler tcp.Handler) error { - closeChan := make(chan struct{}) - sigCh := make(chan os.Signal) + closeChan := make(chan struct{}) // 用于通知服务器关闭 + sigCh := make(chan os.Signal) // 监听系统信号 signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT) go func() { sig := <-sigCh switch sig { case syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT: - closeChan <- struct{}{} + closeChan <- struct{}{} // 接收到停止信号时,发送关闭通知 } }() - listener, err := net.Listen("tcp", cfg.Address) + listener, err := net.Listen("tcp", cfg.Address) // 开始在指定地址监听TCP连接 if err != nil { return err } //cfg.Address = listener.Addr().String() - logger.Info(fmt.Sprintf("bind: %s, start listening...", cfg.Address)) - ListenAndServe(listener, handler, closeChan) + logger.Info(fmt.Sprintf("bind: %s, start listening...", cfg.Address)) // 记录日志,开始监听 + ListenAndServe(listener, handler, closeChan) // 调用函数处理监听和连接请求 return nil } -// ListenAndServe binds port and handle requests, blocking until close +// ListenAndServe 维持服务运行,直到被告知关闭 +// listener:监听器 +// handler:处理TCP连接的处理器接口 +// closeChan:接收关闭服务器的通知 func ListenAndServe(listener net.Listener, handler tcp.Handler, closeChan <-chan struct{}) { // listen signal errCh := make(chan error, 1) defer close(errCh) + //TCP 服务器的优雅关闭模式通常为: 先关闭listener阻止新连接进入,然后遍历所有连接逐个进行关闭 go func() { select { case <-closeChan: - logger.Info("get exit signal") + logger.Info("get exit signal") // 接收到退出信号的日志 case er := <-errCh: - logger.Info(fmt.Sprintf("accept error: %s", er.Error())) + logger.Info(fmt.Sprintf("accept error: %s", er.Error())) // 接收连接时出错的日志 } logger.Info("shutting down...") - _ = listener.Close() // listener.Accept() will return err immediately - _ = handler.Close() // close connections + _ = listener.Close() // 关闭监听器 + _ = handler.Close() // 关闭处理器中的所有连接 }() ctx := context.Background() var waitDone sync.WaitGroup for { - conn, err := listener.Accept() + conn, err := listener.Accept() // 接受新的连接 if err != nil { - // learn from net/http/serve.go#Serve() + // 根据HTTP服务的错误处理方式,处理临时的网络错误 if ne, ok := err.(net.Error); ok && ne.Timeout() { logger.Infof("accept occurs temporary error: %v, retry in 5ms", err) time.Sleep(5 * time.Millisecond) continue } - errCh <- err + errCh <- err // 将错误发送到错误通道 break } // handle - logger.Info("accept link") - ClientCounter++ + logger.Info("accept link") // 记录接受连接的日志 + ClientCounter++ // 客户端计数器增加 waitDone.Add(1) go func() { defer func() { - waitDone.Done() - atomic.AddInt32(&ClientCounter, -1) + waitDone.Done() // 减少等待组的计数 + atomic.AddInt32(&ClientCounter, -1) // 原子减少客户端计数器 }() - handler.Handle(ctx, conn) + handler.Handle(ctx, conn) // 使用传入的handler处理连接 }() } - waitDone.Wait() + waitDone.Wait() // 等待所有连接处理完成 }