连接池

// factory opens a new connection for the pool, or returns the error that
// prevented it from doing so.
type factory func() (net.Conn, error)

// ping checks a pooled connection before it is handed out; a non-nil error
// makes the pool close the connection instead of returning it.
type ping func(conn net.Conn) error

// pool hands out net.Conn values, keeping returned connections in a buffered
// idle queue and parking callers in a wait queue when the limit is reached.
type pool struct {
   // idlesCons buffers connections that are currently idle.
   idlesCons   chan *idleConn
   // reqQueue holds callers waiting for a connection, oldest first.
   reqQueue    []conReq
   // maxCnt is the limit that cnt is compared against before a new
   // connection is created.
   maxCnt      int
   // cnt is incremented when get creates a connection and decremented when
   // put discards one.
   cnt         int
   // initCnt is not assigned by any code visible in this file.
   // NOTE(review): presumably the number of connections opened at
   // construction time — confirm.
   initCnt     int
   // maxIdleTime is how long a connection may sit in idlesCons before get
   // treats it as expired and closes it.
   maxIdleTime time.Duration
   // factory creates new connections on demand.
   factory     factory
   // lock guards reqQueue and cnt.
   lock        sync.Mutex
   // ping is the check run on an idle connection before it is reused.
   ping        ping
}

// idleConn is an entry of the idle queue: a connection plus the time it was
// placed there.
type idleConn struct {
   // c is the pooled connection.
   c           net.Conn
   // lastRequest is set to time.Now() when the entry is created; get adds
   // maxIdleTime to it to decide whether the connection has expired.
   lastRequest time.Time
}

// conReq represents one caller blocked in get, waiting for a connection.
type conReq struct {
   // c receives the connection that put hands to this waiter; get creates
   // it with a buffer of one.
   c chan net.Conn
}

var (
   // initCntError is returned by NewPool when initCnt is greater than
   // maxCnt. The message reads "initial connection count is greater than the
   // maximum connection count".
   initCntError = errors.New("初始连接数大于最大连接数")
)

// NewPool builds a connection pool that allows at most maxCnt connections and
// eagerly opens initCnt of them through factory.
//
// maxIdleTime is how long a connection may stay idle before the pool treats
// it as expired. factory is stored on the pool and used later to open
// connections on demand.
//
// It returns initCntError when initCnt is greater than maxCnt, or the error
// from factory when one of the initial connections cannot be opened; in the
// latter case every connection opened so far is closed before returning.
//
// The default health check installed on the pool writes the single byte "1"
// to the connection and reports the write error.
func NewPool(maxCnt, initCnt int, maxIdleTime time.Duration, factory factory) (*pool, error) {
   if initCnt > maxCnt {
      return nil, initCntError
   }
   var pool = &pool{
      maxCnt:      maxCnt,
      initCnt:     initCnt,
      maxIdleTime: maxIdleTime,
      factory:     factory,
   }
   // The idle queue never needs to hold more than maxCnt connections. Its
   // capacity must be a connection count, not the idle duration.
   pool.idlesCons = make(chan *idleConn, maxCnt)
   for i := 0; i < initCnt; i++ {
      con, err := factory()
      if err != nil {
         // Do not leak the connections that were opened before the failure.
         close(pool.idlesCons)
         for ic := range pool.idlesCons {
            _ = ic.c.Close() // best-effort cleanup; the factory error is what matters
         }
         return nil, err
      }
      pool.idlesCons <- &idleConn{
         c:           con,
         lastRequest: time.Now(),
      }
      // Count only connections that really exist so the limit stays accurate.
      pool.cnt++
   }
   pool.ping = func(conn net.Conn) error {
      _, err := conn.Write([]byte("1"))
      return err
   }
   return pool, nil
}

// get returns a usable connection, in this order of preference:
//
//  1. an idle connection that has not exceeded maxIdleTime and passes ping;
//  2. a new connection from factory, if fewer than maxCnt exist;
//  3. otherwise the caller joins the wait queue until a connection is handed
//     over or ctx is done.
//
// It returns ctx.Err() when ctx is already done on entry or is cancelled
// while waiting, and the factory error when a new connection cannot be
// opened. If cancellation races with a hand-over that has already been
// claimed for this caller, the connection is returned (with a nil error)
// rather than dropped, so the caller must release it as usual.
func (p *pool) get(ctx context.Context) (net.Conn, error) {
   select {
   case <-ctx.Done():
      return nil, ctx.Err()
   default:
   }
   for {
      // The idle queue is polled while holding the lock so that, as long as
      // the releasing side parks connections under the same lock, none can
      // slip into the idle queue between this check and joining the wait
      // queue below.
      p.lock.Lock()
      select {
      case ic := <-p.idlesCons:
         p.lock.Unlock()
         expired := ic.lastRequest.Add(p.maxIdleTime).Before(time.Now())
         if !expired && p.ping(ic.c) == nil {
            return ic.c, nil
         }
         // The connection is unusable: close it and give its slot back so
         // the limit does not shrink permanently.
         _ = ic.c.Close() // best-effort; the connection is discarded either way
         p.lock.Lock()
         p.cnt--
         p.lock.Unlock()
         continue
      default:
      }

      // The lock is still held here.
      if p.cnt >= p.maxCnt {
         conRe := conReq{c: make(chan net.Conn, 1)}
         p.reqQueue = append(p.reqQueue, conRe)
         p.lock.Unlock()
         select {
         case con := <-conRe.c:
            return con, nil
         case <-ctx.Done():
         }

         // Cancelled: withdraw the request so that a later release does not
         // hand a connection to a caller that is no longer listening.
         p.lock.Lock()
         for i := range p.reqQueue {
            if p.reqQueue[i].c == conRe.c {
               p.reqQueue = append(p.reqQueue[:i], p.reqQueue[i+1:]...)
               p.lock.Unlock()
               return nil, ctx.Err()
            }
         }
         p.lock.Unlock()
         // The request was already dequeued, so a connection has been or is
         // about to be sent on the buffered channel. Take it instead of
         // leaking it.
         return <-conRe.c, nil
      }

      // Reserve a slot, then dial without holding the lock so that a slow
      // factory does not stall other callers.
      p.cnt++
      p.lock.Unlock()

      conn, err := p.factory()
      if err != nil {
         p.lock.Lock()
         p.cnt--
         p.lock.Unlock()
         return nil, err
      }
      return conn, nil
   }
}

// put gives con back to the pool.
//
// If a caller is waiting in the request queue, the oldest one receives con
// directly. Otherwise con is parked in the idle queue, stamped with the
// current time. When the idle queue is full, con is closed and the connection
// count is decremented.
//
// Releasing never blocks, so ctx is not consulted: a cancelled context must
// not cause a healthy connection to be dropped without being parked or
// closed. The parameter is kept so existing callers keep compiling. The
// returned error is always nil.
func (p *pool) put(ctx context.Context, con net.Conn) error {
   p.lock.Lock()
   if len(p.reqQueue) > 0 {
      req := p.reqQueue[0]
      // Clear the slot so the backing array does not keep the channel alive.
      p.reqQueue[0] = conReq{}
      p.reqQueue = p.reqQueue[1:]
      p.lock.Unlock()
      // Each request channel is buffered and receives at most one
      // connection, so this send does not block.
      req.c <- con
      return nil
   }

   ic := &idleConn{
      c:           con,
      lastRequest: time.Now(),
   }
   // Park the connection while still holding the lock, so a caller that
   // checks the idle queue under the same lock cannot miss it.
   select {
   case p.idlesCons <- ic:
      p.lock.Unlock()
      return nil
   default:
   }

   // No room in the idle queue: retire the connection and free its slot.
   p.cnt--
   p.lock.Unlock()
   _ = con.Close() // best-effort; the connection is discarded either way
   return nil
}
Written on January 27, 2024