本文的完整代码在github.com/hdt3213/godis/redis/client
通常 TCP 客户端的通信模式都是阻塞式的: 客户端发送请求 -> 等待服务端响应 -> 发送下一个请求。因为需要等待网络传输数据,完成一次请求循环需要等待较多时间。
我们能否不等待服务端响应直接发送下一条请求呢?答案是肯定的。
TCP 作为全双工协议可以同时进行上行和下行通信,不必担心客户端和服务端同时发包会导致冲突。
p.s. 打电话的时候两个人同时讲话就会冲突听不清,只能轮流讲。这种通信方式称为半双工。广播只能由电台发送到收音机不能反向传输,这种方式称为单工。
我们为每一个 tcp 连接分配了一个 goroutine 可以保证先收到的请求先先回复。另一个方面,tcp 协议会保证数据流的有序性,同一个 tcp 连接上先发送的请求服务端先接收,先回复的响应客户端先收到。因此我们不必担心混淆响应所对应的请求。
这种在服务端未响应时客户端继续向服务端发送请求的模式称为 Pipeline 模式。因为减少等待网络传输的时间,Pipeline 模式可以极大的提高吞吐量,减少所需使用的 tcp 链接数。
pipeline 模式的 redis 客户端需要有两个后台协程程负责 tcp 通信,调用方通过 channel 向后台协程发送指令,并阻塞等待直到收到响应,这是一个典型的异步编程模式。
我们先来定义 client 的结构:
type Client struct { conn net.Conn // 与服务端的 tcp 连接 pendingReqs chan *Request // 等待发送的请求 waitingReqs chan *Request // 等待服务器响应的请求 ticker *time.Ticker // 用于触发心跳包的计时器 addr string ctx context.Context cancelFunc context.CancelFunc writing *sync.WaitGroup // 有请求正在处理不能立即停止,用于实现 graceful shutdown}type Request struct { id uint64 // 请求id args [][]byte // 上行参数 reply redis.Reply // 收到的返回值 heartbeat bool // 标记是否是心跳请求 waiting *wait.Wait // 调用协程发送请求后通过 waitgroup 等待请求异步处理完成 err error}
调用者将请求发送给后台协程,并通过 wait group 等待异步处理完成:
func (client *Client) Send(args [][]byte) redis.Reply {request := &request{args: args,heartbeat: false,waiting: &wait.Wait{},}request.waiting.Add(1)client.working.Add(1)defer client.working.Done()client.pendingReqs <- request // 请求入队timeout := request.waiting.WaitWithTimeout(maxWait) // 等待响应或者超时if timeout {return reply.MakeErrReply('server time out')}if request.err != nil {return reply.MakeErrReply('request failed')}return request.reply}
client 的核心部分是后台的读写协程。先从写协程开始:
// 写协程入口func (client *Client) handleWrite() {for req := range client.pendingReqs {client.doRequest(req)}}// 发送请求func (client *Client) doRequest(req *request) {if req == nil || len(req.args) == 0 {return} // 序列化请求re := reply.MakeMultiBulkReply(req.args)bytes := re.ToBytes()_, err := client.conn.Write(bytes)i := 0 // 失败重试for err != nil && i < 3 {err = client.handleConnectionError(err)if err == nil {_, err = client.conn.Write(bytes)}i++}if err == nil { // 发送成功等待服务器响应client.waitingReqs <- req} else {req.err = errreq.waiting.Done()}}
读协程是我们熟悉的协议解析器模板, 不熟悉的朋友可以到解析Redis Cluster原理了解更多。
// 收到服务端的响应func (client *Client) finishRequest(reply redis.Reply) {defer func() {if err := recover(); err != nil {debug.PrintStack()logger.Error(err)}}()request := <-client.waitingReqsif request == nil {return}request.reply = replyif request.waiting != nil {request.waiting.Done()}}// 读协程是个 RESP 协议解析器func (client *Client) handleRead() error {ch := parser.ParseStream(client.conn)for payload := range ch {if payload.Err != nil {client.finishRequest(reply.MakeErrReply(payload.Err.Error()))continue}client.finishRequest(payload.Data)}return nil}
最后编写 client 的构造器和启动异步协程的代码:
func MakeClient(addr string) (*Client, error) { conn, err := net.Dial('tcp', addr) if err != nil { return nil, err } ctx, cancel := context.WithCancel(context.Background()) return &Client{ addr: addr, conn: conn, sendingReqs: make(chan *Request, chanSize), waitingReqs: make(chan *Request, chanSize), ctx: ctx, cancelFunc: cancel, writing: &sync.WaitGroup{}, }, nil}func (client *Client) Start() { client.ticker = time.NewTicker(10 * time.Second) go client.handleWrite() go func() { err := client.handleRead() logger.Warn(err) }() go client.heartbeat()}
关闭 client 的时候记得等待请求完成:
func (client *Client) Close() { // 先阻止新请求进入队列 close(client.sendingReqs) // 等待处理中的请求完成 client.writing.Wait() // 释放资源 _ = client.conn.Close() // 关闭与服务端的连接,连接关闭后读协程会退出 client.cancelFunc() // 使用 context 关闭读协程 close(client.waitingReqs) // 关闭队列}
测试一下:
func TestClient(t *testing.T) { client, err := MakeClient('localhost:6379') if err != nil { t.Error(err) } client.Start() result = client.Send([][]byte{ []byte('SET'), []byte('a'), []byte('a'), }) if statusRet, ok := result.(*reply.StatusReply); ok { if statusRet.Status != 'OK' { t.Error('`set` failed, result: ' + statusRet.Status) } } result = client.Send([][]byte{ []byte('GET'), []byte('a'), }) if bulkRet, ok := result.(*reply.BulkReply); ok { if string(bulkRet.Arg) != 'a' { t.Error('`get` failed, result: ' + string(bulkRet.Arg)) } }}
Keep working, we will find a way out.This is Finley, welcome to join us.
到此这篇关于Golang 实现 Redis系列(六)如何实现 pipeline 模式的 redis 客户端的文章就介绍到这了,更多相关Golang实现pipeline模式的redis客户端内容请搜索优爱好网以前的文章或继续浏览下面的相关文章希望大家以后多多支持优爱好网!
相关文章:
1. 怎么让div+css兼容ie6ie7ie8ie9和FireFoxChrome等浏览器2. requestAnimationFrame使用示例详解3. 基于JavaScript实现图片裁剪功能4. React优雅的封装SvgIcon组件示例5. uniapp自定义验证码输入框并隐藏光标6. 详解JavaScript中原始数据类型Symbol的使用7. JavaScript深拷贝方法structuredClone使用8. uniapp 手机验证码输入框实现代码(随机数、倒计时、隐藏手机号码中间四位)可以直接使用9. 使用Node.js实现Clean Architecture方法示例详解10. Jquery使用原生AJAX方法请求数据