goroumaru41gou

遊んでいる中でインプットした内容をアウトプットする場

並行処理について(orDoneチャンネル編)

以前、teeチャンネルの備忘録を書いた。 goroumaru41gou.hatenablog.com

その中でorDoneチャンネルを使っているので、今回はその備忘録とする。

今回も参考にしたのはこちら

取り扱うもの

  • orDoneチャンネルについて

テストコード

github.com/goroumaru/test-code

orDoneチャンネルとは?

例えば、上位構成からdoneチャンネルやcontextで停止指示がきたとき、select ~ caseでゴルーチンを抜ける処理を書くケースが多くて煩雑になる。かといってselect ~ caseしないと、ゴルーチンリークしてしまう。

そんなときにorDoneチャンネルを使うと、可読性が向上する。

後述するcontextを使ったほうがよい

// OrDone :
func OrDone(done, c <-chan interface{}) <-chan interface{} {
    valStream := make(chan interface{})

    go func() {
        defer close(valStream)
        for {
            select {
            case <-done:
                return
            case v, ok := <-c:
                if !ok {
                    return
                }
                select {
                case valStream <- v:
                case <-done:
                    // <-doneがないとdoneチャネルが送信されてきても、
                    // valStream <- vが送信されてくるまでブロックされ続けてしまう。
                    // ここで<-doneとなると、1つ上のネストにおける<-doneで抜けられる。
                }
            }
        }
    }()
    return valStream
}

なぜorDoneを利用するのか?

実行したい処理について、可読性があがる。

以降、利用有無でコードの可読性を見てみる。

orDoneを利用しないとき

メインとなる処理が埋もれて、わかりづらい・・・

// 比較部分のみ抜粋
loop:
for {
    select{
    case <- done:
        break loop
    case val,ok := <-myChannel:
        if !ok {
            return // またはforからbreakとか
        }
        fmt.Printf("valに対して何かするところ: %v\n", val)
     // ここがメインなのに埋もれてしまう
    }
}

orDoneを利用したとき

// 比較部分のみ抜粋
for val := range orDoneChannel.OrDone(done, myChannel) {
    fmt.Printf("valに対して何かするところ: %v\n", val) 
   // メインがわかり易くなった
}

全体はこんな感じ・・・

func TestMain(t *testing.T) {
    // ここでやっていること
    // 定時実行した結果をmychannelとして渡す。
    // このとき、orDoneを利用する。
    // 時限処置によりタイムアウトして、すべての処理を終える。

    // doneチャンネルをクローズするために、コンテキストでタイムアウトする
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()

    // 定時実行
    tick := time.NewTicker(1 * time.Second)
    defer tick.Stop()

    // メインゴルーチンが先に終了してしまうので、子ゴルーチンを待たせる
    wg := sync.WaitGroup{}
    wg.Add(1)
    myChannel := make(chan interface{})
    defer close(myChannel)

    done := make(chan interface{})
    go func() {
        defer close(done) // gorutineから抜けるとき、doneチャンネルも閉じられる
        defer wg.Done()
        for {
            select {
            case <-ctx.Done(): // 時限でcontextがクローズする
                fmt.Println("context is closed!")
                return
            case <-tick.C: // 定時実行
                myChannel <- "my channel!"
            }
        }
    }()

    // orDoneのゴルーチンは、doneチャンネルが送信される(閉じれれる)まで終了しない。
    for val := range orDoneChannel.OrDone(done, myChannel) {
        fmt.Printf("valに対して何かするところ: %v\n", val)
    }

    // 子ゴルーチンを待機する
    wg.Wait()
}

doneチャンネルの代わりにcontextを利用する

便利な標準パッケージcontextを使う。

// OrDoneCtx : doneの代わりにcontextを使う
func OrDoneCtx(ctx context.Context, c <-chan interface{}) <-chan interface{} {
    valStream := make(chan interface{})

    go func() {
        defer close(valStream)
        for {
            select {
            case <-ctx.Done():
                return
            case v, ok := <-c:
                if !ok {
                    return
                }
                select {
                case valStream <- v:
                case <-ctx.Done():
                }
            }
        }
    }()
    return valStream
}
func TestOrDoneCtx(t *testing.T) {

    // ここでやっていること
    // 定時実行した結果をmychannelとして渡す。
    // このとき、orDoneを利用する。
    // 時限処置によりタイムアウトして、すべての処理を終える。

    // doneチャンネルをクローズするために、コンテキストでタイムアウトする
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()

    // 定時実行
    tick := time.NewTicker(1 * time.Second)
    defer tick.Stop()

    // メインゴルーチンが先に終了してしまうので、子ゴルーチンを待たせる
    wg := sync.WaitGroup{}
    wg.Add(1)
    myChannel := make(chan interface{})
    defer close(myChannel)

    go func() {
        defer wg.Done()
        for {
            select {
            case <-ctx.Done(): // 時限でcontextがクローズする
                fmt.Println("context is closed!")
                return
            case <-tick.C: // 定時実行
                myChannel <- "my channel!"
            }
        }
    }()

    // doneからctxへ変更
    childCtx, childCancel := context.WithCancel(ctx)
    defer childCancel()

    // contextがキャンセルされるまで終了しない。
    for val := range orDoneChannel.OrDoneCtx(childCtx, myChannel) {
        fmt.Printf("valに対して何かするところ: %v\n", val)
    }

    // 子ゴルーチンを待機する
    wg.Wait()
}

参考