並行処理について(orDoneチャンネル編)
以前、teeチャンネルの備忘録を書いた。 goroumaru41gou.hatenablog.com
その中でorDoneチャンネルを使っているので、今回はその備忘録とする。
今回も参考にしたのはこちら。
取り扱うもの
- orDoneチャンネルについて
テストコード
github.com/goroumaru/test-code
orDoneチャンネルとは?
例えば、上位構成からdoneチャンネルやcontextで停止指示がきたとき、select ~ caseでゴルーチンを抜ける処理を書くケースが多くて煩雑になる。かといってselect ~ caseしないと、ゴルーチンリークしてしまう。
そんなときにorDoneチャンネルを使うと、可読性が向上する。
後述するcontextを使ったほうがよい
// OrDone : func OrDone(done, c <-chan interface{}) <-chan interface{} { valStream := make(chan interface{}) go func() { defer close(valStream) for { select { case <-done: return case v, ok := <-c: if !ok { return } select { case valStream <- v: case <-done: // <-doneがないとdoneチャネルが送信されてきても、 // valStream <- vが送信されてくるまでブロックされ続けてしまう。 // ここで<-doneとなると、1つ上のネストにおける<-doneで抜けられる。 } } } }() return valStream }
なぜorDoneを利用するのか?
実行したい処理について、可読性があがる。
以降、利用有無でコードの可読性を見てみる。
orDoneを利用しないとき
メインとなる処理が埋もれて、わかりづらい・・・
// 比較部分のみ抜粋 loop: for { select{ case <- done: break loop case val,ok := <-myChannel: if !ok { return // またはforからbreakとか } fmt.Printf("valに対して何かするところ: %v\n", val) // ここがメインなのに埋もれてしまう } }
orDoneを利用したとき
// 比較部分のみ抜粋 for val := range orDoneChannel.OrDone(done, myChannel) { fmt.Printf("valに対して何かするところ: %v\n", val) // メインがわかり易くなった }
全体はこんな感じ・・・
func TestMain(t *testing.T) { // ここでやっていること // 定時実行した結果をmychannelとして渡す。 // このとき、orDoneを利用する。 // 時限処置によりタイムアウトして、すべての処理を終える。 // doneチャンネルをクローズするために、コンテキストでタイムアウトする ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() // 定時実行 tick := time.NewTicker(1 * time.Second) defer tick.Stop() // メインゴルーチンが先に終了してしまうので、子ゴルーチンを待たせる wg := sync.WaitGroup{} wg.Add(1) myChannel := make(chan interface{}) defer close(myChannel) done := make(chan interface{}) go func() { defer close(done) // gorutineから抜けるとき、doneチャンネルも閉じられる defer wg.Done() for { select { case <-ctx.Done(): // 時限でcontextがクローズする fmt.Println("context is closed!") return case <-tick.C: // 定時実行 myChannel <- "my channel!" } } }() // orDoneのゴルーチンは、doneチャンネルが送信される(閉じれれる)まで終了しない。 for val := range orDoneChannel.OrDone(done, myChannel) { fmt.Printf("valに対して何かするところ: %v\n", val) } // 子ゴルーチンを待機する wg.Wait() }
doneチャンネルの代わりにcontextを利用する
便利な標準パッケージcontextを使う。
// OrDoneCtx : doneの代わりにcontextを使う func OrDoneCtx(ctx context.Context, c <-chan interface{}) <-chan interface{} { valStream := make(chan interface{}) go func() { defer close(valStream) for { select { case <-ctx.Done(): return case v, ok := <-c: if !ok { return } select { case valStream <- v: case <-ctx.Done(): } } } }() return valStream }
func TestOrDoneCtx(t *testing.T) { // ここでやっていること // 定時実行した結果をmychannelとして渡す。 // このとき、orDoneを利用する。 // 時限処置によりタイムアウトして、すべての処理を終える。 // doneチャンネルをクローズするために、コンテキストでタイムアウトする ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() // 定時実行 tick := time.NewTicker(1 * time.Second) defer tick.Stop() // メインゴルーチンが先に終了してしまうので、子ゴルーチンを待たせる wg := sync.WaitGroup{} wg.Add(1) myChannel := make(chan interface{}) defer close(myChannel) go func() { defer wg.Done() for { select { case <-ctx.Done(): // 時限でcontextがクローズする fmt.Println("context is closed!") return case <-tick.C: // 定時実行 myChannel <- "my channel!" } } }() // doneからctxへ変更 childCtx, childCancel := context.WithCancel(ctx) defer childCancel() // contextがキャンセルされるまで終了しない。 for val := range orDoneChannel.OrDoneCtx(childCtx, myChannel) { fmt.Printf("valに対して何かするところ: %v\n", val) } // 子ゴルーチンを待機する wg.Wait() }