Plan 9とGo言語のブログ

主にPlan 9やGo言語の日々気づいたことを書きます。

http.RoundTripperでHTTPクライアントを拡張する

この記事はQiitaで公開されていました

GoでHTTPリクエストを行いたい場合、一般的には net/httphttp.Gethttp.Postを使うことになると思います。もしくは少し複雑なリクエストする場合、http.NewRequestを使うかもしれません。

上記どの方法を使うにしても、これらのリクエストはhttp.Client.Transportを通して、最終的にhttp.RoundTripper.RoundTripが呼び出されることになりますが、http.Client.Transporthttp.RoundTripper インターフェイスを満たしていれば良いので、ユーザが自由に置き換えることが可能です。http.RoundTripper の実装については、過去や今年のアドベントカレンダーに記事があるのでそちらを参照ください。

http.RoundTripper の具体的な話としては、例えば golang.org/x/oauth2 パッケージでは、アクセストークンの有効期限が切れた場合はリフレッシュトークンを使って自動更新する http.RoundTripper が組み込まれた http.Client を使います。こうした仕組みにより、ユーザ側のコードでOAuth2のヘッダを追加したりトークンを更新したりといった面倒な処理を書かなくてもうまく動くようになっています。

これと同様に、HTTPリクエストの事前処理または事後処理があるのなら、自作の http.RoundTripper を実装してみるといいんじゃないかなと思います。ただし自作する場合にとても重要なリソースの問題があって、http.Transportは内部でHTTP Keep-Aliveなどを行っているため、特に理由が無い場合は http.DefaultTransport を使い回すことが推奨されます。それでも、どうしても http.Transport を作らなければならない理由があるなら、全てのリクエストが終わった後で必ずhttp.Transport.CloseIdleConnectionsを呼びましょう。これを呼ばないとコネクションやゴルーチンなど色々なものがリークします。

リソース管理についてはGoのnet/httpクライアントで大量のリクエストを高速に行う方法にも少し書きました。

作ってみて便利だったもの

ここからは、今まで作った http.RoundTripper の実装で、個人的に便利だったものの紹介です。

リクエストとレスポンスを出力するもの

通過するリクエストとレスポンスのヘッダとボディを出力する http.RoundTripper です。デバッグする時だけ差し込めばいいのでとても楽でした。

type DumpTransport struct {
    Transport   http.RoundTripper
    Output      io.Writer
}

func (t *DumpTransport) RoundTrip(req *http.Request) (*http.Response, error) {
    w := t.Output
    if w == nil {
        w = os.Stdout
    }

    b, err := httputil.DumpRequest(req, true)
    if err != nil {
        return nil, err
    }
    if _, err := w.Write(b); err != nil {
        return nil, err
    }
    resp, err := t.Transport.RoundTrip(req)
    if err != nil {
        return nil, err
    }
    b, err = httputil.DumpResponse(resp, true)
    if err != nil {
        return nil, err
    }
    if _, err := w.Write(b); err != nil {
        return nil, err
    }
    return resp, nil
}

指数バックオフでリトライするもの

context で期限が来るまで指数バックオフする http.RoundTripper です。単純に増加するだけならそれほどでもないんですが、Retry-After ヘッダがあればそこまで待たなければならないなど地味に面倒だったりします。

type RetriableTransport struct {
    Transport http.RoundTripper
}

var retriableStatuses = map[int]struct{}{
    http.StatusTooManyRequests:     struct{}{},
    http.StatusInternalServerError: struct{}{},
    http.StatusServiceUnavailable:  struct{}{},
    http.StatusGatewayTimeout:      struct{}{},
}

func (p *RetriableTransport) RoundTrip(req *http.Request) (*http.Response, error) {
    ctx := req.Context()
    var w backoff.Backoff
    for {
        resp, err := p.Transport.RoundTrip(req)
        if err != nil {
            if !isTemporary(err) {
                return nil, err
            }
            w.Wait(ctx)
            continue
        }
        if _, ok := retriableStatuses[resp.StatusCode]; !ok {
            return resp, nil
        }

        if d := ParseRetryAfter(resp, time.Now()); d > 0 {
            w.SetNext(d)
        }
        io.Copy(ioutil.Discard, resp.Body)
        resp.Body.Close()
        resp = nil
        if err := w.Wait(ctx); err != nil {
            return nil, err
        }
    }
}

type temporaryer interface {
    Temporary() bool
}

func isTemporary(err error) bool {
    e, ok := err.(temporaryer)
    return ok && e.Temporary()
}

一定期間内のリクエスト数を制限するもの

一定時間のリクエスト数、例えば1秒で最大100回までに制限する http.RoundTripper です。

type RateLimitTransport struct {
    Transport http.RoundTripper
    Interval  time.Duration
    Limit     int

    l    *rate.Limiter
    once sync.Once
}

func (t *RateLimitTransport) interval() time.Duration {
    return t.Interval / time.Duration(t.Limit)
}

func (t *RateLimitTransport) RoundTrip(req *http.Request) (*http.Response, error) {
    t.once.Do(func() {
        n := rate.Every(t.interval())
        t.l = rate.NewLimiter(n, t.Limit)
    })
    ctx := req.Context()
    if err := t.l.Wait(ctx); err != nil {
        return nil, err
    }
    return t.Transport.RoundTrip(req)
}

rategolang.org/x/time/rate を使いました。

全てのリクエストが終わったことを保証するもの

通過したリクエスト全てが終わるまで待つ http.RoundTripper です。上で紹介したリトライやレートリミットと組み合わせると、待ち時間も含めて終わるまで待ちます。

type ClosableTransport struct {
    Transport http.RoundTripper

    wg     sync.WaitGroup
    closed int32
}

func (p *ClosableTransport) RoundTrip(req *http.Request) (*http.Response, error) {
    p.wg.Add(1)
    defer p.wg.Done()
    if v := atomic.LoadInt32(&p.closed); v != 0 {
        return nil, errors.New("closed")
    }
    return p.Transport.RoundTrip(req)
}

func (p *ClosableTransport) Close() error {
    atomic.StoreInt32(&p.closed, 1)
    p.wg.Wait()
    return nil
}

おわりに

いかがだったでしょうか。ひとつひとつは簡単な実装ですが、例えば

  • 1分あたりのリクエスト数を300に制限する
  • リトライが必要なら指数バックオフでリトライする
  • リトライ分も含めて全て終わってからリソースを解放する
  • デバッグオプション付きの場合だけリクエストとレスポンスを出力する

という処理を愚直に実装すると、リトライやレートリミット解除待ちのリクエストをどう扱うのかなど地味に面倒なんですが、組み合わせで対応できるので個人的には便利だなと思っています。

ここで紹介したもの以外にもgithub.com/lufia/httpclientutilで公開しているので、興味があれば眺めてみてください。

また、似たようなパッケージもいくつかあります。サーバサイドのMiddlewareと区別して、http.RoundTripper実装のことをTripperwareと呼んでいるものもありますね。

http.RoundTripper 実装のヒントになれば幸いです。