Concurrency

Google I/O 2012 - Go Concurrency Patterns by Rob Pike

Introduction

Concurrency is the composition of independently executing computations.

Concurrency is a way to structure software, particularly as a way to write clean code that interacts well with the real world.

Concurrency is not parallelism. If you have only one processor, your program can still be concurrent, but it cannot be parallel.
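To make the distinction concrete, here is a minimal sketch that restricts the Go runtime to one processor with runtime.GOMAXPROCS(1). The function name runOnOneProcessor is made up for this illustration. The goroutines are still concurrent (independently executing and all allowed to finish), but at most one of them runs at any instant.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// runOnOneProcessor (hypothetical helper) limits the runtime to one
// processor, starts n goroutines, and returns how many of them completed.
func runOnOneProcessor(n int) int {
	prev := runtime.GOMAXPROCS(1) // at most one goroutine executes at a time
	defer runtime.GOMAXPROCS(prev)

	var wg sync.WaitGroup
	results := make(chan int, n) // buffered so no goroutine blocks on send
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			results <- id
		}(i)
	}
	wg.Wait()
	close(results)

	count := 0
	for range results {
		count++
	}
	return count
}

func main() {
	fmt.Println("goroutines finished:", runOnOneProcessor(3))
}
```

The program is structured concurrently either way; whether it also runs in parallel depends only on how many processors are available.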

History

Concurrency features are rooted in a long history, reaching back to Hoare's CSP in 1978 and even Dijkstra's guarded commands (1975).

Compared to Erlang: Erlang's model is also close to CSP, but Erlang communicates with a process by name rather than over a channel. A rough analogy: Erlang is like writing to a file by name (the process), while Go is like writing to a file descriptor (the channel).

Example

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func main() {
    go boring("boring!")
    fmt.Println("I'm listening.")
    time.Sleep(2 * time.Second)
    fmt.Println("You're boring; I'm leaving.")
}

func boring(msg string) {
    for i := 0; ; i++ {
        fmt.Println(msg, i)
        time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
    }
}

Goroutines

What is a goroutine? It's an independently executing function, launched by a go statement.

It has its own call stack, which grows and shrinks as required.

It's very cheap. It's practical to have thousands, even hundreds of thousands of goroutines.

It is not a thread.

There might be only one thread in a program with thousands of goroutines.

Instead, goroutines are multiplexed dynamically onto threads as needed to keep all the goroutines running.

But if you think of it as a very cheap thread, you won't be far off.
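As a rough illustration of how cheap goroutines are, the sketch below starts 100,000 of them and waits for all to finish. The helper name spawnAndCount and the choice of sync.WaitGroup plus sync/atomic are assumptions made for this example, not something from the talk.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// spawnAndCount (hypothetical helper) starts n goroutines, each of which
// atomically increments a shared counter, and returns the final count.
func spawnAndCount(n int) int64 {
	var (
		wg    sync.WaitGroup
		count int64
	)
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			atomic.AddInt64(&count, 1)
		}()
	}
	wg.Wait() // block until every goroutine has called Done
	return atomic.LoadInt64(&count)
}

func main() {
	fmt.Println(spawnAndCount(100000)) // prints 100000
}
```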

Channels

A channel in Go provides a connection between two goroutines, allowing them to communicate.


// Declaring and initializing
var c chan int
c = make(chan int)
// or
c := make(chan int)
// Sending on a channel
c <- 1
// Receiving from a channel.
// The "arrow" indicates the direction of data flow.
value = <-c

Using channels

func main() {
    c := make(chan string)
    go boring("boring!", c)
    for i := 0; i < 5; i++ {
        fmt.Printf("You say: %q\n", <-c)
    }
    fmt.Println("You're boring; I'm leaving.")
}

func boring(msg string, c chan string) {
    for i := 0; ; i++ {
        c <- fmt.Sprintf("%s %d", msg, i)
        time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
    }
}

When the main function executes <-c, it will wait for a value to be sent. (blocking)

Similarly, when the boring function executes c <- value, it waits for a receiver to be ready. (blocking)

A sender and receiver must both be ready to play their part in the communication. Otherwise we wait until they are.

Thus channels both communicate and synchronize.
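A small sketch of the synchronizing role: main uses an unbuffered channel purely to wait for a goroutine to finish. The names runWorker and done are invented for this example. Because the receive cannot complete before the send, the worker's write to the slice is guaranteed to be visible to main afterwards.

```go
package main

import "fmt"

// runWorker (hypothetical helper) starts a goroutine, waits for it via an
// unbuffered channel, and returns the order in which events were recorded.
func runWorker() []string {
	var events []string
	done := make(chan bool) // unbuffered: send and receive rendezvous

	go func() {
		events = append(events, "worker finished")
		done <- true // blocks until main is ready to receive
	}()

	<-done // blocks until the worker sends
	events = append(events, "main resumed")
	return events
}

func main() {
	fmt.Println(runWorker()) // prints [worker finished main resumed]
}
```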

Note for experts: Go channels can also be created with a buffer.

Buffering removes synchronization!

Buffering makes them more like Erlang's mailboxes.
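To show what buffering changes, the sketch below sends on a channel with capacity 2 and no receiver at all. The helper name fillBuffer is hypothetical. The first two sends return immediately; a select with a default clause is used to detect that a third send would block.

```go
package main

import "fmt"

// fillBuffer (hypothetical helper) sends on a buffered channel with no
// receiver present. It returns the number of queued values and whether a
// further send would have blocked.
func fillBuffer() (queued int, full bool) {
	c := make(chan int, 2) // buffer holds two values

	c <- 1 // does not block: buffer has room
	c <- 2 // does not block: buffer is now full

	select {
	case c <- 3:
		full = false
	default:
		full = true // no room and no receiver, so the send cannot proceed
	}
	return len(c), full
}

func main() {
	queued, full := fillBuffer()
	fmt.Println(queued, full) // prints 2 true
}
```

With an unbuffered channel the very first send would have blocked forever here, since nobody is receiving.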

Don't communicate by sharing memory, share memory by communicating.

Patterns

Generator: function that returns a channel

func main() {
    c := boring("boring!")
    for i := 0; i < 5; i++ {
        fmt.Printf("You say: %q\n", <-c)
    }
    fmt.Println("You're boring; I'm leaving.")
}

func boring(msg string) <-chan string { // Returns receive-only channel of strings.
    c := make(chan string)
    go func() {
        for i := 0; ; i++ {
            c <- fmt.Sprintf("%s %d", msg, i)
            time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
        }
    }()
    return c // return the channel to the caller.
}

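This can be used to construct services: each call to boring returns a channel that acts as a handle on an independent instance of the service.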

func main() {
    joe := boring("Joe")
    ann := boring("Ann")
    for i := 0; i < 5; i++ {
        fmt.Println(<-joe)
        fmt.Println(<-ann)
    }
    fmt.Println("You're both boring; I'm leaving.")
}

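However, in the code above joe always goes before ann because of synchronization: main blocks on <-joe first, so ann has to wait even when she is ready. A fan-in function lets whoever is ready talk: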


func fanIn(input1, input2 <-chan string) <-chan string {
    c := make(chan string)
    go func() { for { c <- <-input1 } } ()
    go func() { for { c <- <-input2 } } ()
    return c
}

func main() {
    c := fanIn(boring("Joe"), boring("Ann"))
    for i := 0; i < 10; i++ {
        fmt.Println(<-c)
    }
    fmt.Println("You're both boring; I'm leaving.")
}

This time, an extra channel aggregates (fans in) all the messages, so main receives them in whatever order they become ready.

type MessageR struct {
	str string
	wait chan bool // the receiver sends on this channel to let the sender continue
}

func boringR(msg string) <-chan MessageR { // Returns receive-only channel of MessageR values.
	c := make(chan MessageR)
	waitForIt := make(chan bool)
	go func() {
		for i := 0; ; i++ {
			c <- MessageR{ fmt.Sprintf("%s %d", msg, i), waitForIt}
			time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
			<-waitForIt
		}
	}()
	return c // return the channel to the caller.
}

func fanInR(input1, input2 <-chan MessageR) <-chan MessageR {
	c := make(chan MessageR)
	go func() { for { c <- <-input1 } } ()
	go func() { for { c <- <-input2 } } ()
	return c
}

func main() {
	c := fanInR(boringR("Joe"), boringR("Ann"))
	for i := 0; i < 5; i++ {
		msg1 := <-c; fmt.Println(msg1.str)
		msg2 := <-c; fmt.Println(msg2.str)
		msg1.wait <- true
		msg2.wait <- true
	}
	fmt.Println("You're both boring; I'm leaving.")
}

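Embedding a channel in each message lets the receiver control the sender: after sending, each sender blocks on its wait channel until the receiver signals it, which restores the lockstep sequence.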

Select

The select statement provides another way to handle multiple channels. It's like a switch, but each case is a communication:

* All channels are evaluated.
* Selection blocks until one communication can proceed, which then does.
* If multiple can proceed, select chooses pseudo-randomly.
* A default clause, if present, executes immediately if no channel is ready.

select {
case v1 := <-c1:
    fmt.Printf("received %v from c1\n", v1)
case v2 := <-c2:
    fmt.Printf("received %v from c2\n", v2)
case c3 <- 23:
    fmt.Printf("sent %v to c3\n", 23)
default:
    fmt.Printf("no one was ready to communicate\n")
}

Fan-in again, rewritten with select so that only one goroutine is needed:

func fanIn(input1, input2 <-chan string) <-chan string {
    c := make(chan string)
    go func() {
        for {
            select {
            case s := <-input1: c <- s
            case s := <-input2: c <- s
            }
        }
    }()
    return c
}
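The default clause from the list above is what makes a non-blocking receive possible. Here is a minimal sketch; the helper name tryReceive is made up for this example, and a buffered channel is used only so that a value can be made ready without starting a second goroutine.

```go
package main

import "fmt"

// tryReceive (hypothetical helper) returns a value from c if one is ready,
// and reports false immediately instead of blocking when none is.
func tryReceive(c <-chan string) (string, bool) {
	select {
	case s := <-c:
		return s, true
	default:
		return "", false // no sender ready: do not wait
	}
}

func main() {
	c := make(chan string, 1)

	_, ok := tryReceive(c)
	fmt.Println(ok) // prints false: nothing has been sent yet

	c <- "hello"
	s, ok := tryReceive(c)
	fmt.Println(s, ok) // prints hello true
}
```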

The time.After function returns a channel on which a receive blocks for the specified duration. After the interval, the channel delivers the current time, once. Here it gives each message its own one-second timeout:

func main() {
    c := boring("Joe")
    for {
        select {
        case s := <-c:
            fmt.Println(s)
        case <- time.After(1 * time.Second):
            fmt.Println("You are too slow.")
            return
        }
    }
}

To time out the whole loop instead of each message, create the timer once, outside the loop:

func main() {
    c := boring("Joe")
    timeout := time.After(5 * time.Second)
    for {
        select {
        case s := <-c:
            fmt.Println(s)
        case <-timeout:
            fmt.Println("You talk too much.")
            return
        }
    }
}