Go如何创建Grpc链接池

其他教程   发布日期:2023年08月02日   浏览次数:449

这篇“Go如何创建Grpc链接池”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“Go如何创建Grpc链接池”文章吧。

常规用法

  1. gRPC
四种基本使用
  • 请求响应模式

  • 客户端数据流模式

  • 服务端数据流模式

  • 双向流模式

常见的

  1. gRPC
调用写法
  1. func main(){
  2. //... some code
  3. // 链接grpc服务
  4. conn , err := grpc.Dial(":8000",grpc.WithInsecure)
  5. if err != nil {
  6. //...log
  7. }
  8. defer conn.Close()
  9. //...some code

存在的问题:面临高并发的情况,性能问题很容易就会出现,例如我们在做性能测试的时候,就会发现,打一会性能测试,客户端请求服务端的时候就会报错:

rpc error: code = Unavailable desc = all SubConns are in TransientFailure, latest connection error: connection error: desc = "transport: Error while dialing dial tcp xxx:xxx: connect: connection refused

实际去查看问题的时候,很明显,这是 gRPC 的连接数被打满了,很多连接都还未完全释放。[#本文来源:janrs.com#]

  1. gRPC
的通信本质上也是
  1. TCP
的连接,那么一次连接就需要三次握手,和四次挥手,每一次建立连接和释放连接的时候,都需要走这么一个过程,如果我们频繁的建立和释放连接,这对于资源和性能其实都是一个大大的浪费。

在服务端,

  1. gRPC
服务端的链接管理不用我们操心,但是
  1. gRPC
客户端的链接管理非常有必要关心,要实现复用客户端的连接。

创建链接池

创建链接池需要考虑的问题:

  • 连接池是否支持扩缩容

  • 空闲的连接是否支持超时自行关闭,是否支持保活

  • 池子满的时候,处理的策略是什么样的

创建链接池接口

  1. type Pool interface {
  2. // 获取一个新的连接 , 当关闭连接的时候,会将该连接放入到池子中
  3. Get() (Conn, error)
  4. // 关闭连接池,自然连接池子中的连接也不再可用
  5. Close() error
  6. //[#本文来源:janrs.com#]
  7. Status() string
  8. }

实现链接池接口

创建链接池代码

  1. func New(address string, option Options) (Pool, error) {
  2. if address == "" {
  3. return nil, errors.New("invalid address settings")
  4. }
  5. if option.Dial == nil {
  6. return nil, errors.New("invalid dial settings")
  7. }
  8. if option.MaxIdle <= 0 || option.MaxActive <= 0 || option.MaxIdle > option.MaxActive {
  9. return nil, errors.New("invalid maximum settings")
  10. }
  11. if option.MaxConcurrentStreams <= 0 {
  12. return nil, errors.New("invalid maximun settings")
  13. }
  14. p := &pool{
  15. index: 0,
  16. current: int32(option.MaxIdle),
  17. ref: 0,
  18. opt: option,
  19. conns: make([]*conn, option.MaxActive),
  20. address: address,
  21. closed: 0,
  22. }
  23. for i := 0; i < p.opt.MaxIdle; i++ {
  24. c, err := p.opt.Dial(address)
  25. if err != nil {
  26. p.Close()
  27. return nil, fmt.Errorf("dial is not able to fill the pool: %s", err)
  28. }
  29. p.conns[i] = p.wrapConn(c, false)
  30. }
  31. log.Printf("new pool success: %v
  32. ", p.Status())
  33. return p, nil
  34. }

关于以上的代码,需要特别注意每一个连接的建立也是在

  1. New
里面完成的,[#本文来源:janrs.com#]只要有
  1. 1
个连接未建立成功,那么咱们的连接池就算是建立失败,咱们会调用
  1. p.Close()
将之前建立好的连接全部释放掉。

关闭链接池代码

  1. // 关闭连接池
  2. func (p *pool) Close() error {
  3. atomic.StoreInt32(&p.closed, 1)
  4. atomic.StoreUint32(&p.index, 0)
  5. atomic.StoreInt32(&p.current, 0)
  6. atomic.StoreInt32(&p.ref, 0)
  7. p.deleteFrom(0)
  8. log.Printf("[janrs.com]close pool success: %v
  9. ", p.Status())
  10. return nil
  11. }

从具体位置删除链接池代码

  1. // 清除从 指定位置开始到 MaxActive 之间的连接
  2. func (p *pool) deleteFrom(begin int) {
  3. for i := begin; i < p.opt.MaxActive; i++ {
  4. p.reset(i)
  5. }
  6. }

销毁具体的链接代码

  1. // 清除具体的连接
  2. func (p *pool) reset(index int) {
  3. conn := p.conns[index]
  4. if conn == nil {
  5. return
  6. }
  7. conn.reset()
  8. p.conns[index] = nil
  9. }

关闭链接

代码

  1. func (c *conn) reset() error {
  2. cc := c.cc
  3. c.cc = nil
  4. c.once = false
  5. // 本文博客来源:janrs.com
  6. if cc != nil {
  7. return cc.Close()
  8. }
  9. return nil
  10. }
  11. func (c *conn) Close() error {
  12. c.pool.decrRef()
  13. if c.once {
  14. return c.reset()
  15. }
  16. return nil
  17. }

在使用连接池通过

  1. pool.Get()
拿到具体的连接句柄
  1. conn
之后,会使用
  1. conn.Close()
关闭连接,实际上也是会走到上述的
  1. Close()
实现的位置,但是并未指定当然也没有权限显示的指定将
  1. once
置位为
  1. false
,也就是对于调用者来说,是关闭了连接,对于连接池来说,实际上是将连接归还到连接池中。

扩缩容

关键代码

  1. func (p *pool) Get() (Conn, error) {
  2. // the first selected from the created connections
  3. nextRef := p.incrRef()
  4. p.RLock()
  5. current := atomic.LoadInt32(&p.current)
  6. p.RUnlock()
  7. if current == 0 {
  8. return nil, ErrClosed
  9. }
  10. if nextRef <= current*int32(p.opt.MaxConcurrentStreams) {
  11. next := atomic.AddUint32(&p.index, 1) % uint32(current)
  12. return p.conns[next], nil
  13. }
  14. // 本文博客来源:janrs.com
  15. // the number connection of pool is reach to max active
  16. if current == int32(p.opt.MaxActive) {
  17. // the second if reuse is true, select from pool's connections
  18. if p.opt.Reuse {
  19. next := atomic.AddUint32(&p.index, 1) % uint32(current)
  20. return p.conns[next], nil
  21. }
  22. // the third create one-time connection
  23. c, err := p.opt.Dial(p.address)
  24. return p.wrapConn(c, true), err
  25. }
  26. // the fourth create new connections given back to pool
  27. p.Lock()
  28. current = atomic.LoadInt32(&p.current)
  29. if current < int32(p.opt.MaxActive) && nextRef > current*int32(p.opt.MaxConcurrentStreams) {
  30. // 2 times the incremental or the remain incremental ##janrs.com
  31. increment := current
  32. if current+increment > int32(p.opt.MaxActive) {
  33. increment = int32(p.opt.MaxActive) - current
  34. }
  35. var i int32
  36. var err error
  37. for i = 0; i < increment; i++ {
  38. c, er := p.opt.Dial(p.address)
  39. if er != nil {
  40. err = er
  41. break
  42. }
  43. p.reset(int(current + i))
  44. p.conns[current+i] = p.wrapConn(c, false)
  45. }
  46. // 本文博客来源:janrs.com
  47. current += i
  48. log.Printf("#janrs.com#grow pool: %d ---> %d, increment: %d, maxActive: %d
  49. ",
  50. p.current, current, increment, p.opt.MaxActive)
  51. atomic.StoreInt32(&p.current, current)
  52. if err != nil {
  53. p.Unlock()
  54. return nil, err
  55. }
  56. }
  57. p.Unlock()
  58. next := atomic.AddUint32(&p.index, 1) % uint32(current)
  59. return p.conns[next], nil
  60. }

  1. Get
代码逻辑
  • 先增加连接的引用计数,如果在设定

    1. current*int32(p.opt.MaxConcurrentStreams)
    范围内,那么直接取连接进行使用即可。
  • 若当前的连接数达到了最大活跃的连接数,那么就看我们新建池子的时候传递的

    1. option
    中的
    1. reuse
    参数是否是
    1. true
    ,若是复用,则随机取出连接池中的任意连接提供使用,如果不复用,则新建一个连接。
  • 其余的情况,就需要我们进行

    1. 2
    倍或者
    1. 1
    倍的数量对连接池进行扩容了。

也可以在

  1. Get
的实现上进行缩容,具体的缩容策略可以根据实际情况来定,例如当引用计数
  1. nextRef
只有当前活跃连接数的
  1. 10%
的时候(这只是一个例子),就可以考虑缩容了。

以上就是Go如何创建Grpc链接池的详细内容,更多关于Go如何创建Grpc链接池的资料请关注九品源码其它相关文章!