この記事はQiitaで公開されていました
例えば、APIの結果を使った処理を行う場合、ユーザとしてはなるべく早く処理をしたいけど、早すぎるとサーバの処理が追いつかないケースはよくあると思います。
そういう時、APIを実行した後でtime.After
等を使って少し待ってから、必要な結果が揃うまで繰り返すというのが簡単な解決方法ですが、APIの実行時間とtime.After
で待つ時間の両方が実行時間に掛かってきますし、「1秒間に100回まで」というケースはなかなか対応が難しいので、x/time/rate
を使ってみました。
制限しない実装
比較のために、何も制限をしない実装を用意しました。10万件のデータをチャネルに送り続けるだけのコードです。
package main import ( "fmt" ) const N = 100000 func main() { c := make(chan int, 1000) go func() { for i := 0; i < N; i++ { c <- i } close(c) }() for n := range c { fmt.Println(n) } }
このバージョンは、手元のマシンでは0.5秒程度で終わります。
制限を入れた実装
次に、制限を入れたバージョンです。x/time/rate
を使って、1秒間に1万件までを上限としました。
package main import ( "fmt" "log" "time" "golang.org/x/net/context" "golang.org/x/time/rate" ) const ( N = 100000 // 全体数 M = 10000 // 1秒あたりの処理制限 ) func main() { log.SetFlags(0) c := make(chan int, 1000) go func() { ctx := context.Background() n := rate.Every(time.Second/M) l := rate.NewLimiter(n, M) for i := 0; i < N; i++ { if err := l.Wait(ctx); err != nil { log.Fatalln(err) } c <- i } close(c) }() for n := range c { fmt.Println(n) } }
このバージョンは手元では9秒程度が必要です。1秒に1万件の速度で10万件のデータを処理するので当然ですね。
説明
x/time/rate
パッケージは、時間の経過に従って処理を許可する個数分のトークンが溜まっていき、処理を行う前にトークンを消費して実行可能かを判断します。1秒に1万件処理できるトークンを発行するためには、100マイクロ秒ごとに1つトークンが発行される必要があります。
コードに落とすと、基本となるのは3行で、
n := rate.Every(time.Second/M) l := rate.NewLimiter(n, M) err := l.Wait(ctx)
1秒間に1万件までを制限とするので、まずrate.Every(time.Second/M)
で100マイクロ秒ごとに1つトークンが生成されるようにしています。次にn
を使ってトークンを発行するrate.Limiter
を生成しますが、rate.NewLimiter(n, M)
のM
は何かというと、トークンが消費されない場合でも貯まり続けないように上限を設定しています。際限なく溜まっていたトークンまとめて処理されると制限をする意味がないですしね。最後にl.Wait(ctx)
で1つトークンを消費します。この時点でトークンが1つも貯まっていなければ貯まるまでブロックします。
この例では使っていませんが、Wait
が必要以上に待機しないように、ctx
を使ってタイムアウトやキャンセルができます。
その他
rate.Limiter
にはWait
の他に、WaitN
, Allow
, AllowN
などのメソッドが用意されています。WaitN
はトークンが複数個貯まるまでブロックするメソッドで、1つずつ処理することが向かない場合に便利です。
また、Allow
やAllowN
はその時点でトークンが貯まっているかどうかを返します。true
が返った場合はトークンが消費されています。Reserve
はどれだけ待てば必要なだけトークンが貯まるのかを求めます。