Advanced Go Concurrency Patterns Sameer Ajmani Google http://profiles.google.com/ajmani @Sajma https://go.dev * Video This talk was presented at Google I/O in May 2013. .link https://www.youtube.com/watch?v=QDDwwePbDtw Watch the talk on YouTube * Get ready .image advconc/gopherswim.jpg 400 400 * Go supports concurrency In the language and runtime, not a library. This changes how you structure your programs. * Goroutines and Channels Goroutines are independently executing functions in the same address space. go f() go g(1, 2) Channels are typed values that allow goroutines to synchronize and exchange information. c := make(chan int) go func() { c <- 3 }() n := <-c For more on the basics, watch [[/talks/2012/concurrency.slide#1][Go Concurrency Patterns (Pike, 2012)]]. * Example: ping-pong .play advconc/pingpong/pingpong.go /STARTMAIN1/,/STOPMAIN1/ * Deadlock detection .play advconc/pingpongdeadlock/pingpongdeadlock.go /STARTMAIN1/,/STOPMAIN1/ * Panic dumps the stacks .play advconc/pingpongpanic/pingpongpanic.go /STARTMAIN1/,/STOPMAIN1/ * It's easy to go, but how to stop? Long-lived programs need to clean up. Let's look at how to write programs that handle communication, periodic events, and cancellation. The core is Go's `select` statement: like a `switch`, but the decision is made based on the ability to communicate. select { case xc <- x: // sent x on xc case y := <-yc: // received y from yc } * Example: feed reader My favorite feed reader disappeared. I need a new one. Why not write one? Where do we start? * Find an RSS client Searching [[https://pkg.go.dev][pkg.go.dev]] for *"rss"* turns up several hits, including one that provides: // Fetch fetches Items for uri and returns the time when the next // fetch should be attempted. On failure, Fetch returns an error. func Fetch(uri string) (items []Item, next time.Time, err error) type Item struct{ Title, Channel, GUID string // a subset of RSS fields } But I want a stream: <-chan Item And I want multiple subscriptions. * Here's what we have type Fetcher interface { Fetch() (items []Item, next time.Time, err error) } func Fetch(domain string) Fetcher {...} // fetches Items from domain * Here's what we want type Subscription interface { Updates() <-chan Item // stream of Items Close() error // shuts down the stream } func Subscribe(fetcher Fetcher) Subscription {...} // converts Fetches to a stream func Merge(subs ...Subscription) Subscription {...} // merges several streams * Example .play advconc/fakemain/fakemain.go /func main/,/^}/ * Subscribe `Subscribe` creates a new `Subscription` that repeatedly fetches items until `Close` is called. func Subscribe(fetcher Fetcher) Subscription { s := &sub{ fetcher: fetcher, updates: make(chan Item), // for Updates } go s.loop() return s } // sub implements the Subscription interface. type sub struct { fetcher Fetcher // fetches items updates chan Item // delivers items to the user } // loop fetches items using s.fetcher and sends them // on s.updates. loop exits when s.Close is called. func (s *sub) loop() {...} * Implementing Subscription To implement the `Subscription` interface, define `Updates` and `Close`. .code advconc/fakemain/fakemain.go /func.* Updates/,/^}/ func (s *sub) Close() error { // TODO: make loop exit // TODO: find out about any error return err } * What does loop do? - periodically call `Fetch` - send fetched items on the `Updates` channel - exit when `Close` is called, reporting any error * Naive Implementation # Not quite enough room for this; retry after format change: # .play advconc/naivemain/naivemain.go /naiveSub\) loop/,/^}/ # also on subsequent slides. .play advconc/naivemain/naivemain.go /STARTNAIVE /,/STOPNAIVE / .code advconc/naivemain/naivemain.go /naiveSub\) Close/,/^}/ * Bug 1: unsynchronized access to s.closed/s.err .code advconc/naivemain/naivemain.go /STARTNAIVE /,/STOPNAIVE / HLsync .code advconc/naivemain/naivemain.go /naiveSub\) Close/,/^}/ HLsync * Race Detector go run -race naivemain.go # original is 400x1500 .image advconc/race.png 150 562 .play advconc/naivemain/naivemain.go /STARTNAIVE /,/s.err/ HLsync .code advconc/naivemain/naivemain.go /naiveSub\) Close/,/^}/ HLsync * Bug 2: time.Sleep may keep loop running .code advconc/naivemain/naivemain.go /STARTNAIVE /,/STOPNAIVE / HLsleep * Bug 3: loop may block forever on s.updates .code advconc/naivemain/naivemain.go /STARTNAIVE /,/STOPNAIVE / HLsend * Solution Change the body of `loop` to a `select` with three cases: - `Close` was called - it's time to call `Fetch` - send an item on `s.updates` * Structure: for-select loop `loop` runs in its own goroutine. `select` lets `loop` avoid blocking indefinitely in any one state. func (s *sub) loop() { ... declare mutable state ... for { ... set up channels for cases ... select { case <-c1: ... read/write state ... case c2 <- x: ... read/write state ... case y := <-c3: ... read/write state ... } } } The cases interact via local state in `loop`. * Case 1: Close `Close` communicates with `loop` via `s.closing`. type sub struct { closing chan chan error } The service (`loop`) listens for requests on its channel (`s.closing`). The client (`Close`) sends a request on `s.closing`: _exit_and_reply_with_the_error_ In this case, the only thing in the request is the _reply_channel_. * Case 1: Close `Close` asks loop to exit and waits for a response. .code advconc/fakemain/fakemain.go /\*sub\) Close/,/^}/ HLchan `loop` handles `Close` by replying with the `Fetch` error and exiting. .code advconc/fakemain/fakemain.go /STARTCLOSEONLY /,/STOPCLOSEONLY / HLchan * Case 2: Fetch Schedule the next `Fetch` after some delay. .code advconc/fakemain/fakemain.go /STARTFETCHONLY /,/STOPFETCHONLY / * Case 3: Send Send the fetched items, one at a time. var pending []Item // appended by fetch; consumed by send for { select { case s.updates <- pending[0]: pending = pending[1:] } } Whoops. This crashes. .image advconc/gopherswrench.jpg 200 337 * Select and nil channels Sends and receives on nil channels block. Select never selects a blocking case. .play advconc/nilselect/nilselect.go /func main/,/^}/ * Case 3: Send (fixed) Enable send only when pending is non-empty. .code advconc/fakemain/fakemain.go /STARTSENDONLY /,/STOPSENDONLY / HLupdates * Select Put the three cases together: .code advconc/fakemain/fakemain.go /STARTSELECT /,/STOPSELECT / The cases interact via `err`, `next`, and `pending`. No locks, no condition variables, no callbacks. * Bugs fixed - Bug 1: unsynchronized access to `s.closed` and `s.err` - Bug 2: `time.Sleep` may keep loop running - Bug 3: `loop` may block forever sending on `s.updates` .code advconc/fakemain/fakemain.go /STARTSELECT /,/STOPSELECT / HLcases * We can improve loop further * Issue: Fetch may return duplicates .code advconc/fakemain/fakemain.go /STARTFETCHVARS /,/STOPFETCHVARS / HLfetch .code advconc/fakemain/fakemain.go /STARTFETCHCASE /,/STOPFETCHCASE / HLfetch * Fix: Filter items before adding to pending .code advconc/fakemain/fakemain.go /STARTSEEN /,/STOPSEEN / HLseen .code advconc/fakemain/fakemain.go /STARTDEDUPE /,/STOPDEDUPE / HLdupe * Issue: Pending queue grows without bound .code advconc/fakemain/fakemain.go /STARTDEDUPE /,/STOPDEDUPE / HLdupe * Fix: Disable fetch case when too much pending const maxPending = 10 .code advconc/fakemain/fakemain.go /STARTCAP /,/STOPCAP / HLcap Could instead drop older items from the head of `pending`. * Issue: Loop blocks on Fetch .code advconc/fakemain/fakemain.go /STARTDEDUPE /,/STOPDEDUPE / HLfetch * Fix: Run Fetch asynchronously Add a new `select` case for `fetchDone`. type fetchResult struct{ fetched []Item; next time.Time; err error } .code advconc/fakemain/fakemain.go /STARTFETCHDONE /,/STOPFETCHDONE / HLfetch .code advconc/fakemain/fakemain.go /STARTFETCHIF /,/STOPFETCHIF / HLfetch .code advconc/fakemain/fakemain.go /STARTFETCHASYNC /,/STOPFETCHASYNC / HLfetch * Implemented Subscribe Responsive. Cleans up. Easy to read and change. Three techniques: - `for-select` loop - service channel, reply channels (`chan`chan`error`) - `nil` channels in `select` cases More details online, including `Merge`. .image advconc/gopherhat.jpg 200 158 * Conclusion Concurrent programming can be tricky. Go makes it easier: - channels convey data, timer events, cancellation signals - goroutines serialize access to local mutable state - stack traces & deadlock detector - race detector .image advconc/race.png 200 750 * Links Go Concurrency Patterns (2012) .link /talks/2012/concurrency.slide go.dev/talks/2012/concurrency.slide Concurrency is not parallelism .link /s/concurrency-is-not-parallelism go.dev/s/concurrency-is-not-parallelism Share memory by communicating .link /doc/codewalk/sharemem go.dev/doc/codewalk/sharemem Go Tour (learn Go in your browser) .link /tour/ go.dev/tour