我们可以通过goroutine和channel机制非常方便地编写并发业务,但就和面向对象与设计模式的关系一样,是一种思想具体落实到行动方针的过程,在牛逼的战略,没有基本的战术指导,也只是空谈。
因此,第七章并发模式,并没有太多语法上的新东西,而是利用goroutine和channel介绍了三种并发模式,分别适用于三种不同的业务场景。
runner——给每个并发任务设置deadline,管理并发任务的生命周期
pool——利用有缓冲通道创建资源池,统一管理并发时的资源访问
work——利用无缓冲通道创建goroutine池,统一管理并发
runner 先假设一个场景需求,比如http服务的并发,我们要为每个来自客户端的请求创建一个临时的并发响应任务,但这个最好在某个规定的时间内完成响应,否则就强制它退出,这样可以很好地避免某些情况下,一些并发任务卡死的情况,同时可以很好地管理每个并发的生命周期。
runner就是为这样的场景应用而生的,runner可以理解为是一个运行管理器,所有的并发任务都要叫给它负责管理,它负责并发任务的启动、超时监控、强制中断等。
(由于我个人在阅读原著的时候是先讲runner的内部实现,再看实际应用,总感觉云里雾里的,觉得还是先通篇看一下如何运用runner,再来看其内部的实现,可能效果会好一点)
先来看runner的示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 func testRunner () { log.Println("Runner test starting work..." ) r := runner.New(3 * time.Second) for i := 0 ; i < 5 ; i++ { r.Add(func (id int ) { log.Printf("Processor - Task #%d.\n" , id) time.Sleep(time.Duration(id) * time.Second) log.Printf("Task #%d done.\n" , id) }) } if err := r.Start(); err != nil { switch err { case runner.ErrTimeout: log.Println("Terminating due to timeout." ) os.Exit(1 ) case runner.ErrInterrupt: log.Println("Terminating due to interrupt." ) os.Exit(2 ) } } log.Println("Process end." ) } 2019 /03 /03 14 :48 :41 Runner test starting work...2019 /03 /03 14 :48 :41 Processor - Task #2. 2019 /03 /03 14 :48 :41 Processor - Task #4. 2019 /03 /03 14 :48 :41 Processor - Task #3. 2019 /03 /03 14 :48 :41 Processor - Task #0. 2019 /03 /03 14 :48 :41 Task #0 done.2019 /03 /03 14 :48 :41 Processor - Task #1. 2019 /03 /03 14 :48 :42 Task #1 done.2019 /03 /03 14 :48 :43 Task #2 done.2019 /03 /03 14 :48 :44 Terminating due to timeout.^C2019/03 /03 14 :49 :36 Terminating due to interrupt.
可以看到,runner就是一个类型,需要用其创建对象后才能具体使用。而在外部,我们只需要定义好每个任务的函数,并简单的将它们添加到runner当中即可,剩下的全部交由runner自行管理。
现在再来看看runner类型是如何实现的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 type Runner struct { interrupt chan os.Signal complete chan error timeout <-chan time.Time tasks []func (id int ) } var ErrTimeout = errors.New("received timeout" )var ErrInterrupt = errors.New("received interrupt" )func New (d time.Duration) *Runner { return &Runner{ interrupt: make (chan os.Signal, 1 ), complete: make (chan error), timeout: time.After(d), } } func (r *Runner) Add (tasks ...func (int ) ) { r.tasks = append (r.tasks, tasks...) } func (r *Runner) Start () error { signal.Notify(r.interrupt, os.Interrupt) var wg sync.WaitGroup wg.Add(len (r.tasks)) for i, t := range r.tasks { go func (id int , task func (int ) ) { task(id) wg.Done() }(i, t) } go func () { wg.Wait() r.complete <- nil }() select { case err := <-r.complete: return err case <-r.timeout: return ErrTimeout case <-r.interrupt: signal.Stop(r.interrupt) return ErrInterrupt } }
由于只是原型演示,runner的内部实现不算复杂,只需要记住一个核心思想无缓冲通道在没有数据读写的时候,会被阻塞 。说千道万,runner就是利用了这个特性才得以在select语句中完成了:
并行接收系统的中断信号——interrupt通道。
并行接收定时器的超时信号——timeout通道。
pool 这里的pool是指资源池的意思,如果熟悉Java/C#中的“数据连接池”的概念,那这里的池大体就是这个意思了。
换而言之,在并发场景下,难免会遇到并发任务争夺临界资源的情况,还是以数据库访问为例:如果有1000个并发任务要去访问数据库,每个并发都需要完成建立连接——认证——查询——断开连接等操作,那不论是应用服务器还是数据库服务器,无疑都是巨大的负担。因此,通过创建10个数据库连接,并把这些“连接”当作资源放入“池”中,给所有的并发任务共享,每个并发在需要的时候从池中取出连接,完成查询后再放回池中,不仅能大幅降低CPU的负载,也能减少内存的开销。(但我个人觉得最爽的地方是,你的代码可以更专注地去query,而不必考虑connection本身😂)
同样,先来看看pool的运用过程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 const ( maxGoroutines = 25 pooledResource = 5 ) type dbConnection struct { ID int32 } func (dbConn *dbConnection) Close () error { log.Println("Close: Connection" , dbConn.ID) return nil } var idCounter int32 func createConnection () (io.Closer, error) { id := atomic.AddInt32(&idCounter, 1 ) log.Println("Create: New Connection" , id) return &dbConnection{id}, nil } func testPool () { var wg sync.WaitGroup wg.Add(maxGoroutines) p, err := pool.New(createConnection, pooledResource) if err != nil { log.Println(err) } for query := 0 ; query < maxGoroutines; query++ { go func (q int ) { performQueries(q, p) wg.Done() }(query) } wg.Wait() log.Println("Shutdown Program" ) p.Close() } func performQueries (query int , p *pool.Pool) { conn, err := p.Acquire() if err != nil { log.Println(err) return } defer p.Release(conn) time.Sleep(time.Duration(rand.Intn(1000 )) * time.Millisecond) log.Printf("QID[%d] CID[%d]\n" , query, conn.(*dbConnection).ID) }
(可能是我没学习到位,我个人觉得pool模式并不是特别容易掌握,思想是很好理解的,但牵扯太多接口实现、有/无缓冲通道的特性等内容,所以代码可能要再多消化几遍。)
再看看pool包的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 type Pool struct { m sync.Mutex resource chan io.Closer factory func () (io.Closer, error) closed bool } var ErrPoolClosed = errors.New("Pool has been closed" )func New (fn func () (io.Closer, error) , size uint ) (*Pool, error) { if size <= 0 { return nil , errors.New("size value too small" ) } return &Pool{ factory: fn, resource: make (chan io.Closer, size), }, nil } func (p *Pool) Acquire () (io.Closer, error) { select { case r, ok := <-p.resource: log.Println("Acquire:" , "Shared Resource" ) if !ok { return nil , ErrPoolClosed } return r, nil default : log.Println("Acquire:" , "New Resource" ) return p.factory() } } func (p *Pool) Release (r io.Closer) { p.m.Lock() defer p.m.Unlock() if p.closed { r.Close() return } select { case p.resource <- r: log.Println("Release:" , "In Queue" ) default : log.Println("Release:" , "Closing" ) r.Close() } } func (p *Pool) Close () { p.m.Lock() defer p.m.Unlock() if p.closed { return } p.closed = true close (p.resource) for r := range p.resource { r.Close() } }
pool资源池实现的核心思想是有缓冲通道读写时不会引起阻塞,select语句在通道内没有数据的情况下会自动执行default选项 。
work work模式就是创建一个goroutine池,管理池中所有的goroutine统一执行。但它有别于runner模式,runner其实是负责监控池中的每个并发任务的生命周期的,而work则是负责池中的每个并发任务的执行顺序,即任务队列。
这个模式的好处在于,可以很好地控制程序运行的负载,比如突发情况下,某台服务器的http请求一瞬间到达100万,如果为了响应所有请求也在一瞬起启动100万个响应任务,那估计服务器就冒烟了。所以最好的方式就是限制并发任务数量,比如每次最多启动1万个响应,剩下的排队慢慢来。
因此,work就是一个并发任务的队列池,还是先看看如何运用的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 var names = []string { "steve" , "bob" , "mary" , "therese" , "jason" , } type namePrinter struct { name string } func (m *namePrinter) Task () { log.Println(m.name) time.Sleep(time.Second) } func testWork () { p := work.New(2 ) var wg sync.WaitGroup wg.Add(100 * len (names)) for i := 0 ; i < 100 ; i++ { for _, name := range names { np := namePrinter{name: name} go func () { p.Run(&np) wg.Done() }() } } wg.Wait() p.Shutdown() }
如上代码,namePrinter实现了work包内规定的Task
接口之后,work的工作池就能够统一管理namePrinter对象了。这个namePrinter可以理解为某个业务的模拟,比如上面说的http响应任务(这里仅是简单地做个打印)。
而后,不论创建多少个namePrinter相关的goroutine(并发),都只需简单地将其丢到工作池中Run(p.Run并没有立刻启动任务,工作池会根据情况自行安排)。
最后在看看work包的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 type Worker interface { Task() } type Pool struct { work chan Worker wg sync.WaitGroup } func New (maxGoroutines int ) *Pool { p := Pool{ work: make (chan Worker), } p.wg.Add(maxGoroutines) for i := 0 ; i < maxGoroutines; i++ { go func () { for w := range p.work { w.Task() } p.wg.Done() }() } return &p } func (p *Pool) Run (w Worker) { p.work <- w } func (p *Pool) Shutdown () { close (p.work) p.wg.Wait() }
work的实现其实是非常简单的,核心思想是for-range循环时,无缓冲通道会阻塞 ,工作池是一个无缓冲通道,而每个for-range都相当于一个队列,当池中有数据是,所有的for-range都会争夺这个输入数据来处理,但如果某个队列本身已经在工作时,就没空再争夺通道内的数据。可以说是最简单有效的负载均衡。
小结一下
无缓冲通道在读写时会引起阻塞,可以用来控制程序生命周期
带default分支的select语句会尝试读写通道,而不会阻塞
可以利用无缓冲通道创建一个工作池,统一管理goroutine并发任务
可以利用有缓冲通道创建一个资源池,统一管理并发时的资源访问