はじめてのGo―シンプルな言語仕様、型システム、並行処理

第5章並行プログラミング―ゴルーチンとチャネルを使いこなす

本章では、ゴルーチンやチャネル、syncパッケージを用いて、並行処理を行う方法について解説します。

並行プログラミングの基本

複数の処理を効率良く行うために、Goは言語自体が並行処理に必要な機能をサポートしています。特に本章で扱うゴルーチンやチャネルの機能などは、Goで並行処理プログラミングをするうえで必要不可欠な知識であり、これらを適切に使うことで、マルチコアが一般的になった近年のマシンリソースを最大限に引き出す、パフォーマンスの良いプログラムを作成できるようになります。

本節では、ゴルーチンやチャネルを用いた並行処理の考え方と、それらと合わせてよく使うsyncパッケージの使い方などについて解説します。

ゴルーチン

Goには、ゴルーチンGoroutineという軽量スレッドのしくみがあります。ここまで行っていたmain()関数も、1つのゴルーチンの中で実行されています。go構文を用いて、任意の関数を別のゴルーチンとして起動することで、処理を並行して走らせることができます。

ここではHTTPへのアクセス処理を用いて、ゴルーチンの使い方とチャネルによるメッセージのやりとりの方法を見てみましょう。

ゴルーチンを使わない場合

たとえば、3つのWebサイトにアクセスし、そのステータスコードを表示するプログラムを考えます。

package main

import (
    "fmt"
    "log"
    "net/http"
)

func main() {
    urls := []string{
        "http://example.com",
        "http://example.net",
        "http://example.org",
    }
    for _, url := range urls {
        res, err := http.Get(url)
        if err != nil {
            log.Fatal(err)
        }
        defer res.Body.Close()
        fmt.Println(url, res.Status)
    }
}

http.Get()は同期処理であるため、この方法では別々のサイトにリクエストしているにもかかわらず、前のレスポンスが返らないと次のリクエストに進むことができません図1⁠。

図1 同期処理
図1 同期処理

しかし、それぞれのリクエストは互いに依存がなく独立しているため、順番に実行する必要はありません。こうした場合、ゴルーチンを用いることで3つのリクエストを並行して行うことができます。

ゴルーチンを使った場合

先ほどのプログラムを、リクエストが別々のゴルーチンで行われるように修正してみます。実際にリクエストを発行している部分を無名関数化し、関数の前にgoというキーワードを加えると、それだけで関数の実行が別のゴルーチンで行われるようになります。

func main() {
    urls := []string{
        "http://example.com",
        "http://example.net",
        "http://example.org",
    }
    for _, url := range urls {
        // 取得処理をゴルーチンで実行する
        go func(url string) {
            res, err := http.Get(url)
            if err != nil {
                log.Fatal(err)
            }
            defer res.Body.Close()
            fmt.Println(url, res.Status)
        }(url)
    }
    // main()が終わらないように待ち合わせる
    time.Sleep(time.Second)
}

ここではmain()が実行されたときに内部で3つのゴルーチンを起動していますが、起動が終わってゴルーチン内で処理が行われている間もmain()は先に進んでしまうため、待ち合わせのためにtime.Sleep()を呼んで1秒間main()を止めています(より適した処理方法は次に紹介します⁠⁠。

このプログラムの実行は図2のようなイメージです。各ゴルーチンが並行してリクエストを発行しているため、結果はレスポンスが早い順に出力されます。

図2 Sleep()によるゴルーチンの待ち合わせ
図2 Sleep()によるゴルーチンの待ち合わせ

sync.WaitGroup

先ほどの例ではtime.Sleep()main()を1秒間待たせていましたが、実際に待ちたいのはhttp.Get()を行っているすべてのゴルーチンの終了です。

起動したすべてのゴルーチンの終了を待ち合わせるにはsync.WaitGroupが利用できます。sync.WaitGroupは、Add()でカウントを増やしDone()でカウントを減らし、Wait()でカウントがゼロになるまで待ち合わせます。

func main() {
    wait := new(sync.WaitGroup)
    urls := []string{
        "http://example.com",
        "http://example.net",
        "http://example.org",
    }
    for _, url := range urls {
        // waitGroupに追加
        wait.Add(1)
        // 取得処理をゴルーチンで実行する
        go func(url string) {
            res, err := http.Get(url)
            if err != nil {
                log.Fatal(err)
            }
            defer res.Body.Close()
            fmt.Println(url, res.Status)
            // waitGroupから削除
            wait.Done()
        }(url)
    }
    // 待ち合わせ
    wait.Wait()
}

チャネル

複数のゴルーチン間でデータをやりとりしたい場合、組込みのチャネルchannelという機能を用いることで、メッセージパッシング(情報をメッセージとして送受信する)によってデータを送受信できます。チャネルはmake()関数に型を指定して生成することで、その型のデータの書き込みと読み出しができます。

// stringを扱うチャネルを生成
ch := make(chan string)

// チャネルにstringを書き込む
ch <- "a"

// チャネルからstringを読み出す
message := <- ch

今回の場合は、ゴルーチン内で取得したステータスコードをチャネルに書き込み、それをmain()のゴルーチンで読み出すことで、ゴルーチン間でデータを受け渡すことができます。

func main() {
    urls := []string{
        "http://example.com",
        "http://example.net",
        "http://example.org",
    }
    statusChan := make(chan string)
    for _, url := range urls {
        // 取得処理をゴルーチンで実行する
        go func(url string) {
            res, err := http.Get(url)
            if err != nil {
                log.Fatal(err)
            }
            defer res.Body.Close()
            statusChan <- res.Status
        }(url)
    }
    for i := 0; i < len(urls); i++ {
        fmt.Println(<-statusChan)
    }
}

ゴルーチンの中でstatusChanに値が書き込まれるまで、main()の中では値を読み出すことができません。この場合、main()内ではstatusChanの読み出しが3回完了するまで処理がブロックされるため、waitGroupのような待ち合わせ処理は必要ありません。

これにより、HTTPリクエストを並行して発行し、早く取得されたステータスから順に受け取ることができます図3⁠。

図3 チャネルによる値の受け渡し
図3 チャネルによる値の受け渡し

チャネルを返すパターン

先ほどはmain() 内の匿名関数でHTTPのGETを実行していましたが、この処理をgetStatus()という別の関数にし、関数が内部で生成したチャネルを返すように実装してみます。

func getStatus(urls []string) <-chan string {
    // 関数でチャネルを生成
    statusChan := make(chan string)
    for _, url := range urls {
        go func(url string) {
            res, err := http.Get(url)
            if err != nil {
                log.Fatal(err)
            }
            defer res.Body.Close()
            statusChan <- res.Status
        }(url)
    }
    return statusChan // チャネルを返す
}

func main() {
    urls := []string{
        "http://example.com",
        "http://example.net",
        "http://example.org",
    }
    statusChan := getStatus(urls)

    for i := 0; i < len(urls); i++ {
        fmt.Println(<-statusChan)
    }
}

まず、getStatus()内で結果を渡すためのstatus Chanを生成します。次に非同期に行う処理を匿名関数にし、リクエストをそれぞれ別のゴルーチンで実行します。関数自体はstatusChanを返して終了し、起動されたゴルーチンが内部でstatusChanに結果を書き込んでいきます。

main()は、関数を呼び出すと同時に結果を受信するチャネルを受け取り、それをfor文内で読み出します。これにより、main()側が非常にスッキリと記述でき、ロジックの大半はgetStatus()に隠蔽(いんぺい)できました。

また、このときgetStatus()main()がチャネルに値を書き込むことを想定していません。こうした場合は、getStatus()の戻り値を<-chan stringと読み出し専用のチャネルにすることで、main()がこのチャネルに値を書き込むことを型レベルで防ぐことができます。

このパターンはチャネルを用いる場合によく使うので、覚えておくとよいでしょう。

select文を用いたイベント制御

複数のチャネルに対する読み出しや書き込みを同時に制御するためにはselect文を用います。select文はfor文と組み合わせて使う場合が多くあります。

case

複数の操作をselect文のcaseに指定しておくと、いずれかのcaseの操作が実行されたときに、該当する処理が実行されます。どれか1つcaseが実行されない限りは、select文はブロックします。

ch1 := make(chan string)
ch2 := make(chan string)
for {
    select {
    case c1 := <-ch1:
        // ch1からデータを読み出したときに実行される
    case c2 := <-ch2:
        // ch2からデータを読み出したときに実行される
    case ch2 <- "c":
        // ch2にデータを書き込んだときに実行される
    }
}

default

caseの最後にdefaultを記述すると、実行するcaseがなかった場合はdefaultが実行されます。defaultの実行が終わるとselect文の処理が終わるため、select文がブロックされなくなります。

ch1 := make(chan string)
ch2 := make(chan string)
for {
    select {
    case c1 := <-ch1:
        // ch1からデータを読み出したときに実行される
    case c2 := <-ch2:
        // ch2からデータを読み出したときに実行される
    case ch2 <- "c":
        // ch2にデータを書き込んだときに実行される
    default:
        // caseが実行されなかった場合に実行される
    }
}

タイムアウト

for/select文とbreakを用いて実装する代表的な例はタイムアウト処理です。

func main() {
    // 1秒後に値が読み出せるチャネル
    timeout := time.After(time.Second)
    urls := []string{
        "http://example.com",
        "http://example.net",
        "http://example.org",
    }
    statusChan := getStatus(urls)

LOOP: // ラベル名は任意
    for {
        select {
        case status := <-statusChan:
            fmt.Println(status) // 受信したデータを表示
        case <-timeout:
            break LOOP // このfor/selectを抜ける
        }
    }
}

timeパケージにあるtime.After()関数は、時間を指定するとその時間後にデータを書き込むチャネルを返す関数です。このチャネルの読み出しをselect文に登録することで、タイムアウト処理を実現できます。

先ほどのstatusChanの読み出しを無限forループ内のselect文で受け取るようにします。ステータスを受け取った場合はそれが表示されますが、1秒後にtimeoutから値が読み出せると、そこでfor/select文を抜けて、HTTPリクエストがすべて終わっている/いないにかかわらず、プログラムを終わらせることができます。

注意点として、caseの中でbreakを呼ぶと、select文は抜けられますが、その外側のfor文は抜けられません。そこでfor文にラベルを付け、breakでそのラベルを指定することで、caseからfor/select文を抜けることができます。returnで関数ごと抜けることもできますが、ラベルのbreakもよく使うパターンなので覚えておくとよいでしょう。

チャネルバッファ

make()でチャネルを生成するときに、バッファを指定できます。バッファとは、そのチャネルが内部に保持できるデータの数です。

バッファなしチャネル

バッファを指定せずにmake()で生成したチャネルは、内部に値を保持しておくことができません。

次の場合はmain()内でチャネルの読み出す側に先に到達しますが、ゴルーチン内で値が書き込まれるまでそこで1秒間ブロックします。

func main() {
    ch := make(chan string)
    go func() {
        time.Sleep(time.Second)
        ch <- "a" // 1秒後にデータを書き込む
    }()
    <-ch // 1秒後にデータが書き込まれるまでブロック
}

逆に次の場合は、main()内でチャネルに書き込む側に先に到達しますが、ゴルーチン内でその値が読み出されるまでそこで1秒間ブロックします。

func main() {
    ch := make(chan string)
    go func() {
        time.Sleep(time.Second)
        <-ch // 1秒後にデータを読み出す
    }()
    ch <- "a" // 1秒後にデータが読み出されるまでブロック
}

このことを利用してバッファゼロのチャネルをゴルーチン間の同期制御に使うこともできますが、ブロックしないほうが都合の良い場合もあります。

バッファ付きチャネル

チャネルのバッファサイズはmake()の引数で指定します。たとえば次のようにバッファを3にして生成したチャネルは、同時に3つまでは値を内部に保持できます。そのため、3つまでの書き込みはブロックしません。しかし、4つ目の書き込みは、先にチャネルから値が読み出されないかぎりブロックします。

func main() {
    ch := make(chan string, 3)
    go func() {
        time.Sleep(time.Second)
        <-ch // 1秒後にデータを読み出す
    }()
    ch <- "a" // ブロックしない
    ch <- "b" // ブロックしない
    ch <- "c" // ブロックしない
    ch <- "d" // 1秒後にデータが読み出されるまでブロック
}

このように、バッファ付きのチャネルはメッセージキューのような挙動になります。

たとえば先ほどのstatus取得の例では、いずれのゴルーチンもstatusChanに値を書き込むことで終了するのですが、もしstatusChanからデータを受け取るmain()側の処理が非常に遅かった場合、ゴルーチンはステータスの取得が終わっているのに、書き込みでブロックして閉じることができません。この場合は、statusChanにバッファを付けることで、main()側の処理が遅くても、ゴルーチンはチャネルに値を書き込んで終了することができ、メモリへの負荷を下げることができます。

func getStatus(urls []string) <-chan string {
    // バッファをURLの数(3)に
    statusChan := make(chan string, len(urls))
    for _, url := range urls {
        go func(url string) {
            res, err := http.Get(url)
            if err != nil {
                log.Fatal(err)
            }
            defer res.Body.Close()
            // main()の読み出しが遅くても
            // 3つのゴルーチンはすぐに終わる
            statusChan <- res.Status
        }(url)
    }
    return statusChan
}

ゴルーチンの同時起動数制御

getStatus()にURLが複数渡ってきた場合に、ここまでの実装ではURLの数だけゴルーチンを起動していました。しかし、これではURLの数が多かった場合にゴルーチンが起動し過ぎてしまい、メモリを圧迫する可能性があります。

ここではlimitというバッファ付きのチャネルを用いて、このチャネルに値を書き込める場合はゴルーチンを起動し、ゴルーチンが終わったらlimitから値を読み出すことで、ゴルーチンの同時起動数を制御してみます。この場合、チャネルのバッファにあるデータの数が重要であり、データ自体には意味がないため、サイズがゼロの構造体を用います。

var empty struct{} // サイズがゼロの構造体

func getStatus(urls []string) <-chan string {
    statusChan := make(chan string, 3)
    // バッファを5に指定して生成
    limit := make(chan struct{}, 5)
    go func() {
        for _, url := range urls {
            select {
            case limit <- empty:
                // limitに書き込みが可能な場合は取得処理を実施
                go func(url string) {
                    // このゴルーチンは同時に5つしか起動しない
                    res, err := http.Get(url)
                    if err != nil {
                        log.Fatal(err)
                    }
                    statusChan <- res.Status
                    // 終わったら1つ読み出して空きを作る
                    <-limit
                }(url)
            }
        }
    }()
    return statusChan
}

特集のまとめ

本特集では、Goの文法や型の扱い、代表的なパッケージの扱い、並行処理の考え方などについて、基本的な部分を網羅的に解説しました。解説は基本的に初心者向けのものとなっており、扱いきれなかったものも多々ありますが、今回の特集が、読者のみなさんが書きたいプログラムの作成に進んでいくうえでの足がかりとなれば幸いです。

おすすめ記事

記事・ニュース一覧