読者です 読者をやめる 読者になる 読者になる

Carpe Diem

備忘録。https://github.com/jun06t

goroutine、channelの扱い方

概要

goroutineやchannelを扱う上で気になった点を箇条書きしました。まとまってはいないのでご注意ください。 簡単のためchannelでの受信側をreader、送信側をwriterとして書いていきます。

環境

  • Go 1.6.2

実装時に注意するポイント

  1. readerとwriterは別goroutineにする
  2. writerとそのchannelのcloseは同じ関数内で定義した方が良い
  3. errorで関数を終了させる場合、下流で走っているgoroutineが適切に終了する仕組みをいれる

特に3つ目は色んなコードを見てますが考慮されてないことが多いです。

all goroutines are asleepがでる

A. main関数内でchannelでブロックされたときに出る

func main() {
    i := make(chan int)
    <-i
}

初めてgoroutineを扱う時によく起きます。実装時に注意するポイントの1つめが守られてない時によく起きます。

closeしたchannelに対してどういった操作だとpanicになるか

NG. closeしたchannelにwriteしたらpanic

close(ch)
ch <- 10 // panic

NG. 2回closeしたらpanic

close(ch)
close(ch) // panic

OK. closeしたchannelをreadするのはOK

close(ch)
h := <-ch // zero値が生成される

並列数を上げるにはbufferを増やせばいいのか?

No. bufferを持たせても処理自体はシリアル。goroutine自体を増やす。

bufferはqueueサイズが変わるだけで、rangeやselectでreadするときはシリアルな動きです。

buffer付きchannelをcloseするとどうなるか

A. bufferに入った分が読み出される

func main() {
    ch := make(chan int, 2)
    ch <- 1
    ch <- 2
    close(ch)

    for n := range ch {
        fmt.Println(n)
    }
    fmt.Println("done")
}

結果

1
2
done

つまり、チャンネルを閉じた後、bufferに残っていた場合、そのreaderのgoroutineは処理を続ける

func main() {
    in := gen()

    go reader(in)
    time.Sleep(time.Second * 3)
    close(in)
    fmt.Println("called close")

    time.Sleep(time.Second * 10)
    fmt.Println("done")

}

func gen() chan int {
    elm := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
    in := make(chan int, len(elm))
    for _, v := range elm {
        in <- v
    }

    return in
}

func reader(in chan int) {
    for i := range in {
        fmt.Println(i)
        time.Sleep(time.Second * 1)
    }
}

結果

1
2
3
called close
4
5
6
7
8
9
10
done

なので無駄な処理をなくすために、明示的にdoneチャンネルなどを用いて処理を完了させる仕組みが必要。

closeしたらselect内のチャンネルは反応するのか

A. する

func main() {
    done := make(chan struct{})
    go sq(done)

    close(done)
    time.Sleep(time.Second * 2)
    fmt.Println("Hello, playground")
}

func sq(done <-chan struct{}) {
    for {
        select {
        case <-done:
            fmt.Println("done called")
            return
        }
    }
}

結果

done called
Hello, playground

closeしたチャンネルに対しreadすると、zero値を生成して受信します。
これはいくらでも受信できるので、returnなどで抜けないとずっとそのzero値を受け取り続けるため注意。

fan-outとは

1producer、複数consumer型

in := producer()

w1 := worker(in)
w2 := worker(in)

writerのchannelを返り値で渡すコードが多いのはなぜか

A. closeしやすくするため

func hoge() {
    in := producer(1, 2, 3, 4, 5)

    go consumer(in)

    return
}

func producer(nums ...int) <-chan int {
    in := make(chan int, len(nums))
    go func() {
        for _, n := range nums {
            in <- n
        }
        close(in)
    }()
    return in
}

func consumer(in <-chan int) {
    for i := range in {
        fmt.Println(i)
    }
}

こんな感じでchannelを返り値で渡すケースをよく見かけます。
これはwriterがこの関数の中にいるのですが、処理が完了した時にこの中でcloseできるようにしているためです。
先に挙げたように

  • closeしたchannelにwriteはpanic
  • 2回closeはpanic

という問題がありますが、writerと別のところでchannelを定義&closeしてしまうと、これが起きやすくなります。なのでwriterのスコープでcloseした方が問題が少ないです。
そういうわけで実装時に注意するポイントの2番目に書いています。

channelはqueueのように扱えるので、以下のように橋渡しした形も作れますが、

func hoge() {
    in := make(chan int)
    go producer(in)
    go consumer(in)

    time.Sleep(time.Second * 3)
    fmt.Println("done")
}

func producer(in chan<- int) {
    for i := 0; i < 10; i++ {
        in <- i
    }
}

func consumer(in <-chan int) {
    for i := range in {
        fmt.Println(i)
    }
}

しかしこれだとhoge関数内でエラーが起きた時、producerやconsumerの処理を止められません。
inがcloseできればconsumerの方は止められますが、hoge()内でdefer close(in)とかしてしまうとproducerでwriteしてpanicになる可能性があります。

closeの必要性って?してる人としてない人がいる

リソースの無駄遣いを防げるので必要な箇所ではcloseすべき

前述のコードでもしhoge関数内でエラーが起きた時、closeするようなロジックにしてあれば

  1. inがcloseされる
  2. consumerのrangeから抜ける
  3. consumerの処理が止まる

と言った感じで不要なリソースを使わなくて済みます。

以下はcloseしないケースのコードです。

func main() {
    err := hoge()
    fmt.Println(err)
    time.Sleep(time.Second * 5)
    fmt.Println("done")
}

func hoge() error {
    in := producer(1, 2, 3, 4, 5)

    go consumer(in)

    time.Sleep(time.Second * 3)
    return fmt.Errorf("error occurred")
}

func producer(nums ...int) <-chan int {
    in := make(chan int, len(nums))
    go func() {
        defer close(in)
        for _, n := range nums {
            time.Sleep(time.Second * 1)
            in <- n
        }
    }()
    return in
}

func consumer(in <-chan int) {
    for i := range in {
        fmt.Println(i)
    }
}

結果

1
2
error occurred
3
4
5
done

見ての通り、errorが起きて関数が終了した後もproducerは動き続け、consumerもそれに続いて動いています。
error処理で関数が完了したので大丈夫、と思う人が多いですが、goroutineは動き続けます。

一方error時にちゃんとcloseしたらどうなるでしょう。

func main() {
    err := hoge()
    fmt.Println(err)
    time.Sleep(time.Second*5)
    fmt.Println("done")
}

func hoge() error {
    done := make(chan struct{})
    defer close(done)
    in := producer(done, 1, 2, 3, 4, 5)

    go consumer(in)

    time.Sleep(time.Second * 3)
    return fmt.Errorf("error occurred")
}

func producer(done <-chan struct{}, nums ...int) <-chan int {
    in := make(chan int, len(nums))
    go func() {
        defer close(in)
        for _, n := range nums {
            time.Sleep(time.Second * 1)

            select {
            case <-done:
                return
            case in <- n:
            }
        }
    }()
    return in
}

func consumer(in <-chan int) {
    for i := range in {
        fmt.Println(i)
    }
}

結果

1
2
error occurred
3
done

処理中の3はchannelに入ってしまいましたが、それ以降は処理がきちんと止まってくれます。
なのでリソースの無駄遣いを防げます。

実装時に注意するポイントの3つめですね。

selectのcaseの実行順は?

A. 複数のcaseが実行可能であるときはランダムで選ばれる

If one or more of the communications can proceed, a single one that can proceed is chosen via a uniform pseudo-random selection. Otherwise, if there is a default case, that case is chosen. If there is no default case, the "select" statement blocks until at least one of the communications can proceed.

The Go Programming Language Specification - The Go Programming Language

bufferって必要?なくてもreaderとwriterの関係がしっかりしてればブロックされないので大丈夫では?

複数のworkerがいる時は必要なケースもある

例えば以下の様な3つのconsumerが動いていて、途中でerrorで関数が終了したケースを考えます。

func main() {
    err := hoge()
    fmt.Println(err)
    time.Sleep(time.Second * 5)
    fmt.Println("done")
}

func hoge() error {
    concurrency := 3
    var wg sync.WaitGroup
    errc := make(chan error)
    done := make(chan struct{})

    wg.Add(concurrency)
    for i := 0; i < concurrency; i++ {
        go func() {
            defer wg.Done()
            consumer(errc)
        }()
    }

    go func() {
        wg.Wait()
        close(done)
    }()

    for {
        select {
        case e := <-errc:
            return e
        case <-done:
            return nil
        }
    }
}

func consumer(errc chan<- error) {
    errc <- fmt.Errorf("error")
    fmt.Println("error occurred")
}

結果

error occurred
error
done

3つのworkerがいるのでerror occurredも3回出るとおもいきや1つしかでませんでした。
これはerrcのbufferが無いため、hoge関数が終了した時点でreaderがいなくなり、残りのconsumerがブロックされてしまったからです。
このgoroutineはずっとこの状態でいるため、リソースリークと言えます。

改善案

errcにworker数分bufferを与えます。

   errc := make(chan error, concurrency)

結果

error occurred
error
error occurred
error occurred
done

bufferがあることでブロックがされず、残りのconsumerも瞬時に処理を完了できたため、リソースリークせずに済みました。

channelのrangeはいつまで続くのか

A. そのchannelがcloseされるまで

普通のforループだと要素が空になると抜けちゃう気がしますが、channelの場合はcloseしない限りそこでブロックされます。

func main() {
    in := producer()
    go consumer(in)
    time.Sleep(time.Second * 10)
    fmt.Println("done")
}

func producer() <-chan int {
    in := make(chan int, 5)
    go func() {
        for i := 0; i < 5; i++ {
            in <- i
            time.Sleep(time.Second * 1)
        }
        fmt.Println("finish writing")

        time.Sleep(time.Second * 3)
        close(in)
    }()

    return in
}

func consumer(in <-chan int) {
    for i := range in {
        fmt.Println(i)
    }
    fmt.Println("in closed")
}

結果

0
1
2
3
4
finish writing
in closed
done

上流でエラーによって関数が終了した。下流のgoroutineを止める方法は?

A. doneチャンネルを用意する & 十分なbufferを持たせる

closeの必要性って?で書いてますが、エラー時に終了シグナルを送るような仕組みを導入しておけば処理を停止できます。
またbufferって必要?でも書いてあるように、ブロックされる可能性がある箇所に関してはbufferを十分に保たせておく必要があります。

ソース