この記事はQiitaで公開されていました
特定の処理をゴルーチンで並列実行したいけれど、サーバの負荷等を考慮して、同時実行数の上限を設定したい話です。元ネタの記事では、チャネルやsync.Pool
を使って実現していて、すでに十分シンプルなのですが、x/sync/semaphoreを使う方法も便利だったので紹介します。
見た目はほぼ、チャネルを使った実装と同じですが、s.Acquire(ctx, n)
はnの値で重みをつけることができます。なので、Aという処理が動いているときは他の処理を行わない、けれどBなら3個まで同時に動いても良い、といった対応をチャネルで行うと面倒ですが、semaphore.Weighted
なら重みを変更するだけで実現できるので便利だと思いました。
元ネタ
- チャンネルを使って、決まった数のリソースをgoroutine間で共有するパターン - 詩と創作・思索のひろば
- Big Sky :: 簡単に goroutine の実行個数を制限する方法
- channelとsync.Poolを使ってgoroutineの同時実行数を制御する
実装例
以下の例は、同時実行数が3つに制限された状態でdoSomething(u)
を並列実行します。サンプルコード自体はmattnさんのものをほぼそのまま流用しました。
全ての処理完了を待つためにsync.WaitGroup
を使っていますが、semaphore.Weighted
には全く関係ありません。
2024-04-10追記
sync.WaitGroup
を使わなくても、最後にs.Acquire(Limit)
すると完了を待てるので、そっちのほうがシンプルですね。
package main import ( "context" "fmt" "sync" "time" "golang.org/x/sync/semaphore" ) func doSomething(u string) { fmt.Println(u) time.Sleep(2 * time.Second) } const ( Limit = 3 // 同時実行数の上限 Weight = 1 // 1処理あたりの実行コスト ) func main() { urls := []string{ "http://www.example.com", "http://www.example.net", "http://www.example.net/foo", "http://www.example.net/bar", "http://www.example.net/baz", } s := semaphore.NewWeighted(Limit) var w sync.WaitGroup for _, u := range urls { w.Add(1) s.Acquire(context.Background(), Weight) go func(u string) { doSomething(u) s.Release(Weight) w.Done() }(u) } w.Wait() }
説明
まずは、semaphore.NewWeighted(lim)
でlim個のリソースをもつsemaphore.Weighted
を作成します。s.Acquire(ctx, n)
は、全体のリソース(lim)からn個消費しますが、Weighted
のリソースが足りない場合は、s.Acquire(ctx, n)
の呼び出しは他のゴルーチンからs.Release(n)
されるまでブロックします。そのため、同時にlim個以上の処理が動くことはありません。
処理が終わった後s.Release(n)
を使うと、n個のリソースをWeighted
へ戻します。ブロックしていたs.Acquire(ctx, n)
があれば、ブロックが解除されて続きの処理を行います。
また、s.TryAcquire(n)
というメソッドも用意されていて、こちらはブロックしません。代わりに、リソースが取得できたらtrue
を返します。