连接池
// Function types through which connection handling is plugged into the pool.
type (
	// factory opens a new connection, or reports why it could not.
	factory func() (net.Conn, error)

	// ping probes conn; the pool treats a non-nil error as "connection unusable".
	ping func(conn net.Conn) error
)
// pool is a bounded pool of net.Conn values. Connections that are not checked
// out wait in a buffered channel; callers that find the pool at its limit are
// parked in reqQueue until a connection is handed back through put.
type pool struct {
	// idlesCons buffers the connections that are currently idle.
	idlesCons chan *idleConn
	// reqQueue holds, oldest first, the callers blocked in get; put serves the head.
	reqQueue []conReq
	// maxCnt is the limit get compares cnt against before opening a new connection.
	maxCnt int
	// cnt is the pool's running count of open connections.
	cnt int
	// initCnt is not read by any visible code; presumably the initial
	// connection count given to NewPool — TODO confirm.
	initCnt int
	// maxIdleTime is how long a connection may sit idle before get discards it.
	maxIdleTime time.Duration
	// factory opens a new connection when get needs one.
	factory factory
	// lock is taken around accesses to reqQueue and cnt.
	lock sync.Mutex
	// ping probes an idle connection before get hands it out; on a non-nil
	// error get closes the connection and tries the next one.
	ping ping
}
// idleConn pairs a pooled connection with the time it was parked in the idle
// channel.
type idleConn struct {
	// c is the underlying connection.
	c net.Conn
	// lastRequest is set to time.Now() when the connection is parked; get
	// compares it with maxIdleTime to detect connections that sat idle too long.
	lastRequest time.Time
}
// conReq represents one caller waiting in get for a connection.
type conReq struct {
	// c delivers the connection that put hands to this waiter.
	c chan net.Conn
}
// initCntError is returned by NewPool when the requested initial connection
// count exceeds the maximum. The message is Chinese for "initial connection
// count is greater than the maximum connection count".
var initCntError = errors.New("初始连接数大于最大连接数")
// NewPool builds a pool that keeps at most maxCnt connections and eagerly
// opens initCnt of them through factory.
//
// maxIdleTime is how long a connection may sit idle before get discards it.
// factory is stored and used later whenever the pool has to open another
// connection. The default health check writes the single byte "1" to the
// connection, so the peer will observe that byte.
//
// It returns initCntError when initCnt exceeds maxCnt. If factory fails while
// the initial connections are being opened, the ones opened so far are closed
// and the factory's error is returned.
func NewPool(maxCnt, initCnt int, maxIdleTime time.Duration, factory factory) (*pool, error) {
	if initCnt > maxCnt {
		return nil, initCntError
	}

	// The idle buffer has to hold at most every pooled connection at once, so
	// it is sized by the connection limit (not by the idle timeout, which is a
	// nanosecond count). make panics on a negative size, hence the clamp.
	idleCap := maxCnt
	if idleCap < 0 {
		idleCap = 0
	}

	p := &pool{
		idlesCons:   make(chan *idleConn, idleCap),
		maxCnt:      maxCnt,
		initCnt:     initCnt,
		maxIdleTime: maxIdleTime,
		factory:     factory,
		ping: func(conn net.Conn) error {
			_, err := conn.Write([]byte("1"))
			return err
		},
	}

	for i := 0; i < initCnt; i++ {
		con, err := factory()
		if err != nil {
			// Nobody else can see the pool yet; close what was opened so a
			// failed construction does not leak connections.
			for len(p.idlesCons) > 0 {
				(<-p.idlesCons).c.Close()
			}
			return nil, err
		}
		p.idlesCons <- &idleConn{
			c:           con,
			lastRequest: time.Now(),
		}
		// Pre-opened connections count against maxCnt like any other.
		p.cnt++
	}
	return p, nil
}
// get returns a connection from the pool, blocking until one is available or
// ctx is done.
//
// Order of preference:
//  1. an idle connection that has not exceeded maxIdleTime and passes ping;
//  2. a new connection from factory while fewer than maxCnt are open;
//  3. otherwise the caller queues up and receives the next connection that is
//     handed to put.
//
// It returns ctx.Err() if ctx ends first, the factory's error if opening a
// connection fails, and an error if no factory is configured. A queued caller
// is only woken by put: if capacity frees up some other way (for example
// another caller's open attempt fails) it keeps waiting for the next put or
// for ctx.
func (p *pool) get(ctx context.Context) (net.Conn, error) {
	for {
		// Checked on every pass so that a run of stale connections cannot keep
		// a cancelled caller looping.
		if err := ctx.Err(); err != nil {
			return nil, err
		}

		select {
		case idle := <-p.idlesCons:
			stale := idle.lastRequest.Add(p.maxIdleTime).Before(time.Now())
			if !stale && p.ping(idle.c) == nil {
				return idle.c, nil
			}
			// Unusable: close it (best effort) and release its slot so the
			// pool can open a replacement instead of shrinking for good.
			idle.c.Close()
			p.lock.Lock()
			if p.cnt > 0 {
				p.cnt--
			}
			p.lock.Unlock()
			continue
		default:
		}

		p.lock.Lock()
		if p.cnt < p.maxCnt {
			if p.factory == nil {
				p.lock.Unlock()
				return nil, errors.New("pool: no connection factory configured")
			}
			// Reserve the slot first, then open the connection without the
			// lock held so a slow dial does not stall every other caller.
			p.cnt++
			p.lock.Unlock()
			conn, err := p.factory()
			if err != nil {
				p.lock.Lock()
				p.cnt--
				p.lock.Unlock()
				return nil, err
			}
			return conn, nil
		}
		if len(p.idlesCons) > 0 {
			// A connection was parked between the receive attempt above and
			// taking the lock; go round again rather than queue behind it.
			p.lock.Unlock()
			continue
		}

		// At the limit with nothing idle: wait for put to hand one over. The
		// one-element buffer lets put deliver without blocking on this caller.
		req := conReq{c: make(chan net.Conn, 1)}
		p.reqQueue = append(p.reqQueue, req)
		p.lock.Unlock()

		select {
		case conn := <-req.c:
			return conn, nil
		case <-ctx.Done():
		}

		// ctx ended while queued. Withdraw the request so put cannot hand a
		// connection to a caller that is no longer listening.
		p.lock.Lock()
		withdrawn := false
		for i := range p.reqQueue {
			if p.reqQueue[i].c == req.c {
				p.reqQueue = append(p.reqQueue[:i], p.reqQueue[i+1:]...)
				withdrawn = true
				break
			}
		}
		p.lock.Unlock()
		if withdrawn {
			return nil, ctx.Err()
		}
		// put already took the request off the queue, so a connection is on
		// its way to req.c. Accept it: walking away would leak it.
		return <-req.c, nil
	}
}
// put hands con back to the pool. It never blocks.
//
// If a caller is waiting in get, the oldest waiter receives con directly.
// Otherwise con is parked as idle, stamped with the current time. If the idle
// buffer is full, con is closed and the open-connection count is reduced.
//
// ctx is kept for signature compatibility and is not consulted: nothing here
// waits, and bailing out on a cancelled ctx would drop con without parking or
// closing it. The returned error is always nil.
func (p *pool) put(ctx context.Context, con net.Conn) error {
	p.lock.Lock()
	if len(p.reqQueue) > 0 {
		req := p.reqQueue[0]
		// Clear the slot so the backing array does not keep the channel alive.
		p.reqQueue[0] = conReq{}
		p.reqQueue = p.reqQueue[1:]
		p.lock.Unlock()
		// get creates each waiter channel with a buffer of one, so this send
		// completes even if the waiter has not reached its receive yet.
		req.c <- con
		return nil
	}

	// Parked while the lock is still held, so a caller that is deciding under
	// the same lock whether to queue cannot miss this connection.
	select {
	case p.idlesCons <- &idleConn{c: con, lastRequest: time.Now()}:
		p.lock.Unlock()
		return nil
	default:
	}

	// No room to keep it: give the slot back and close the surplus connection.
	p.cnt--
	p.lock.Unlock()
	// Best effort: the connection is being discarded, so a Close error is ignored.
	con.Close()
	return nil
}
Written on January 27, 2024
