Plan 9とGo言語のブログ

主にPlan 9やGo言語の日々気づいたことを書きます。

ゴルーチンで発生したエラーの返し方

この記事はQiitaで公開されていました

ゴルーチンで並行実行中させた処理の中で、エラーが発生した場合にどう扱うかは割と面倒です。

package main

import (
    "io/ioutil"
    "log"
    "net/http"
    "os"
)

func main() {
    c := make(chan []byte)
    go func() {
        res, err := http.Get("http://lufia.org/")
        if err != nil {
            log.Fatal(err)
        }
        defer res.Body.Close()
        body, err := ioutil.ReadAll(res.Body)
        if err != nil {
            log.Fatal(err)
        }
        c <- body
    }()
    os.Stderr.Write(<-c)
}

実行してすぐ終えるプログラムなら log.Fatal() 等で済むかもしれませんが、サービスとして長期稼働を続けるプログラムの場合は、簡単に終了されても困ります。

これが全てではありませんが、個人的によく使う方法をまとめました。

値を取得して終わるパターンの非同期処理

時間のかかる処理を1回だけ行って、チャネルを通して結果を受け取るパターンのエラーハンドリング例。実行するごとにゴルーチンが生成されるような実装です。

エラーが発生するコードと、エラーを処理するコードが近い場合は、interface{} を使って複数の型を返し、受信した側で分岐させる方法が簡単です。

package main

import (
    "io/ioutil"
    "log"
    "net/http"
    "os"
)

func main() {
    c := make(chan interface{})
    go func() {
        res, err := http.Get("http://example.com/")
        if err != nil {
            c <- err
            return
        }
        defer res.Body.Close()
        body, err := ioutil.ReadAll(res.Body)
        if err != nil {
            c <- err
            return
        }
        c <- body
    }()
    v := <-c
    switch t := v.(type) {
    case error:
        log.Fatal(t)
    case []byte:
        os.Stdout.Write(t)
    }
}

この書き方は手軽ですが、エラー発生とエラー処理を記述した場所が離れてしまうと、どんな値がチャネルを通過するのか把握できなくなってきます。上記のgo文が別のファイルに実装されていた場合はどうでしょうか?このswitch文を見ただけでは、なぜ []byteerror だけ処理しているのか、他の型になることはないのか、を確認することが難しくなります。

そうした場合は、チャネルを2つ使って、正常な場合とエラーの場合で 口を分けてしまう方法を使うことが多いです。

package main

import (
    "io/ioutil"
    "log"
    "net/http"
    "os"
)

func FetchURL() (c chan []byte, errc chan error) {
    c = make(chan []byte)
    errc = make(chan error)
    go func() {
        res, err := http.Get("http://example.com/")
        if err != nil {
            errc <- err
            return
        }
        defer res.Body.Close()
        body, err := ioutil.ReadAll(res.Body)
        if err != nil {
            errc <- err
            return
        }
        c <- body
    }()
    return c, errc
}

func main() {
    c, errc := FetchURL()
    select {
    case err := <-errc:
        log.Fatal(err)
    case body := <-c:
        os.Stdout.Write(body)
    }
}

この方法であれば、どんな値がチャネルから取得できるのかは明白です。ほとんどは後者を使いますが、影響が局所的で面倒な場合は前者を使うこともあります。

常に動作し続けるパターンの非同期処理

一定期間、繰り返し動作をするパターンのゴルーチンについて。こういった書き方をする場合です。

func Start() {
    c := make(chan int)
    go func(){
        for v := range c {
            ...
        }
    }()
    return c
}

話を単純にするため、プログラム全体で(例えばIDのような)一意な値を生成する実装を考えます。

一意であることを保証するための方法はいくつかあります。重複が発生しないように、ハッシュ関数を選ぶのもいいでしょうけれど、ここはエラーの話なので過去に発行した値を map に残して重複判定を行うとします。重複した場合はエラーを返して、リトライさせることとしましょう。

どこに実行結果を返すか、ですが、呼び出し元が複数のゴルーチンに別れることがあるため、結果を返すチャネルが1つしかない場合は、複数のゴルーチンが1つのチャネルから同時に値を待つ形となってしまいます。ゴルーチンの実行順は保証されていないので、スケジュールによっては結果を受信する順番が変わってしまいます。

実行順が変わっても対応するゴルーチンへ値を返すことを保証するため、リクエスト毎に結果を受信するためのチャネルを用意します。一意な値を生成する側は、リクエストの値を更新呼び出し元へ返す値をRequest構造体にセットして完了を送信するか、エラーの場合はその値を完了として送信します。で、呼び出し元のプログラムでは完了が受信されるまで待ちます。

package main

import (
    "crypto/sha1"
    "errors"
    "fmt"
    "sync"
    "time"
)

type Request struct {
    UserID string

    SessID string
    Done   chan error
}

type Sessions struct {
    // 使われているセッション
    sess map[string]struct{}

    // リクエストを受け付けるチャネル
    Creation chan *Request
}

func (s *Sessions) Run() {
    for req := range s.Creation {
        key := fmt.Sprintf("%s%d", req.UserID, time.Now().Unix())
        hash := sha1.Sum([]byte(key))
        sessID := fmt.Sprintf("%x", hash)
        if _, existed := s.sess[sessID]; existed {
            req.Done <- errors.New("session existed")
            continue
        }
        s.sess[sessID] = struct{}{}
        req.SessID = sessID
        close(req.Done)
    }
}

var sessions *Sessions

func init() {
    sessions = &Sessions{
        sess:     make(map[string]struct{}),
        Creation: make(chan *Request, 10),
    }
    go sessions.Run()
}

func NewSession(user string) (req *Request) {
    req = &Request{
        UserID: user,
        Done:   make(chan error, 1),
    }
    sessions.Creation <- req
    return req
}

func main() {
    users := []string{
        "user1",
        "user2",
        "user2",
        "user2",
        "user3",
    }
    var wg sync.WaitGroup
    for _, user := range users {
        req := NewSession(user)
        wg.Add(1)
        go func(req *Request) {
            defer wg.Done()
            if err := <-req.Done; err != nil {
                fmt.Printf("user `%s`: %v\n", req.UserID, err)
                return
            }
            fmt.Printf("user `%s`: session ID = %s\n", req.UserID, req.SessID)
        }(req)
    }
    wg.Wait()
}

余談ですが、Goの map はゴルーチンセーフではありません。必ず1つのゴルーチンだけがmapを操作するように注意が必要になります。上記のコードでは、常に Sessions.Runmap を操作するので、複数のゴルーチンから呼ばれた場合でも、競合状態は発生しません。

sync.Mutex を使ってもいいのですが、mapを操作するゴルーチンを1つだけ実行して、常にそのゴルーチン経由で値の生成を行う方が、どういった順番で更新されているのか・誰がロックを持つのか把握しやすくなるためお勧めです。