この記事はQiitaで公開されていました
ゴルーチンで並行実行中させた処理の中で、エラーが発生した場合にどう扱うかは割と面倒です。
package main import ( "io/ioutil" "log" "net/http" "os" ) func main() { c := make(chan []byte) go func() { res, err := http.Get("http://lufia.org/") if err != nil { log.Fatal(err) } defer res.Body.Close() body, err := ioutil.ReadAll(res.Body) if err != nil { log.Fatal(err) } c <- body }() os.Stderr.Write(<-c) }
実行してすぐ終えるプログラムなら log.Fatal()
等で済むかもしれませんが、サービスとして長期稼働を続けるプログラムの場合は、簡単に終了されても困ります。
これが全てではありませんが、個人的によく使う方法をまとめました。
値を取得して終わるパターンの非同期処理
時間のかかる処理を1回だけ行って、チャネルを通して結果を受け取るパターンのエラーハンドリング例。実行するごとにゴルーチンが生成されるような実装です。
エラーが発生するコードと、エラーを処理するコードが近い場合は、interface{}
を使って複数の型を返し、受信した側で分岐させる方法が簡単です。
package main import ( "io/ioutil" "log" "net/http" "os" ) func main() { c := make(chan interface{}) go func() { res, err := http.Get("http://example.com/") if err != nil { c <- err return } defer res.Body.Close() body, err := ioutil.ReadAll(res.Body) if err != nil { c <- err return } c <- body }() v := <-c switch t := v.(type) { case error: log.Fatal(t) case []byte: os.Stdout.Write(t) } }
この書き方は手軽ですが、エラー発生とエラー処理を記述した場所が離れてしまうと、どんな値がチャネルを通過するのか把握できなくなってきます。上記のgo文が別のファイルに実装されていた場合はどうでしょうか?このswitch文を見ただけでは、なぜ []byte
と error
だけ処理しているのか、他の型になることはないのか、を確認することが難しくなります。
そうした場合は、チャネルを2つ使って、正常な場合とエラーの場合で 口を分けてしまう方法を使うことが多いです。
package main import ( "io/ioutil" "log" "net/http" "os" ) func FetchURL() (c chan []byte, errc chan error) { c = make(chan []byte) errc = make(chan error) go func() { res, err := http.Get("http://example.com/") if err != nil { errc <- err return } defer res.Body.Close() body, err := ioutil.ReadAll(res.Body) if err != nil { errc <- err return } c <- body }() return c, errc } func main() { c, errc := FetchURL() select { case err := <-errc: log.Fatal(err) case body := <-c: os.Stdout.Write(body) } }
この方法であれば、どんな値がチャネルから取得できるのかは明白です。ほとんどは後者を使いますが、影響が局所的で面倒な場合は前者を使うこともあります。
常に動作し続けるパターンの非同期処理
一定期間、繰り返し動作をするパターンのゴルーチンについて。こういった書き方をする場合です。
func Start() { c := make(chan int) go func(){ for v := range c { ... } }() return c }
話を単純にするため、プログラム全体で(例えばIDのような)一意な値を生成する実装を考えます。
一意であることを保証するための方法はいくつかあります。重複が発生しないように、ハッシュ関数を選ぶのもいいでしょうけれど、ここはエラーの話なので過去に発行した値を map
に残して重複判定を行うとします。重複した場合はエラーを返して、リトライさせることとしましょう。
どこに実行結果を返すか、ですが、呼び出し元が複数のゴルーチンに別れることがあるため、結果を返すチャネルが1つしかない場合は、複数のゴルーチンが1つのチャネルから同時に値を待つ形となってしまいます。ゴルーチンの実行順は保証されていないので、スケジュールによっては結果を受信する順番が変わってしまいます。
実行順が変わっても対応するゴルーチンへ値を返すことを保証するため、リクエスト毎に結果を受信するためのチャネルを用意します。一意な値を生成する側は、リクエストの値を更新呼び出し元へ返す値をRequest構造体にセットして完了を送信するか、エラーの場合はその値を完了として送信します。で、呼び出し元のプログラムでは完了が受信されるまで待ちます。
package main import ( "crypto/sha1" "errors" "fmt" "sync" "time" ) type Request struct { UserID string SessID string Done chan error } type Sessions struct { // 使われているセッション sess map[string]struct{} // リクエストを受け付けるチャネル Creation chan *Request } func (s *Sessions) Run() { for req := range s.Creation { key := fmt.Sprintf("%s%d", req.UserID, time.Now().Unix()) hash := sha1.Sum([]byte(key)) sessID := fmt.Sprintf("%x", hash) if _, existed := s.sess[sessID]; existed { req.Done <- errors.New("session existed") continue } s.sess[sessID] = struct{}{} req.SessID = sessID close(req.Done) } } var sessions *Sessions func init() { sessions = &Sessions{ sess: make(map[string]struct{}), Creation: make(chan *Request, 10), } go sessions.Run() } func NewSession(user string) (req *Request) { req = &Request{ UserID: user, Done: make(chan error, 1), } sessions.Creation <- req return req } func main() { users := []string{ "user1", "user2", "user2", "user2", "user3", } var wg sync.WaitGroup for _, user := range users { req := NewSession(user) wg.Add(1) go func(req *Request) { defer wg.Done() if err := <-req.Done; err != nil { fmt.Printf("user `%s`: %v\n", req.UserID, err) return } fmt.Printf("user `%s`: session ID = %s\n", req.UserID, req.SessID) }(req) } wg.Wait() }
余談ですが、Goの map
はゴルーチンセーフではありません。必ず1つのゴルーチンだけがmapを操作するように注意が必要になります。上記のコードでは、常に Sessions.Run
が map
を操作するので、複数のゴルーチンから呼ばれた場合でも、競合状態は発生しません。
sync.Mutex
を使ってもいいのですが、map
を操作するゴルーチンを1つだけ実行して、常にそのゴルーチン経由で値の生成を行う方が、どういった順番で更新されているのか・誰がロックを持つのか把握しやすくなるためお勧めです。