Concurrent Programming in Go

Let’s implement an “ID service” that can be used simultaneously by multiple goroutines.

CounterService interface

CounterService interface
type CounterService interface {
    // Returns values in ascending order; it should be safe to call
    // getNext() concurrently from multiple goroutines without any
    // additional synchronization.
    getNext() uint64
}

Used by 1

And let’s implement it using each of the following four strategies:

Verify correctness (aside from “no synchronization”).

1. Hyphotheses about performance and bottlenecks

Expected performance ranking based on gut instinct

  1. No synchronization
  2. Atomic
  3. Mutex
  4. Goroutine

Expected performance ranking based on dissassembly output

There’s a couple of ways to view the assembly of a go program.

GOOS=linux GOARCH=amd64 go tool compile -S atomic.go | sed -n '/^main.nosyncAdd/,/^main/p'

main.nosyncAdd STEXT nosplit size=8 args=0x0 locals=0x0 funcid=0x0 align=0x0
        0x0000 00000 (atomic.go:10)     TEXT    main.nosyncAdd(SB), NOSPLIT|ABIInternal, $0-0
        0x0000 00000 (atomic.go:10)     FUNCDATA        $0, gclocals·g2BeySu+wFnoycgXfElmcg==(SB)
        0x0000 00000 (atomic.go:10)     FUNCDATA        $1, gclocals·g2BeySu+wFnoycgXfElmcg==(SB)
        0x0000 00000 (atomic.go:11)     INCQ    main.c0(SB)
        0x0007 00007 (atomic.go:12)     RET
        0x0000 48 ff 05 00 00 00 00 c3                          H.......
        rel 3+4 t=14 main.c0+0

If you build and want to use external tools like objdump then you might want to pass certain arguments to go build.

go build -gcflags='-N -l' atomic.go

-N builds with no optimizations and -l prevents inlining.

➜  concurrent-programming-in-go git:(main) ✗ objdump -d atomic | grep -a2 nosyncadd
    0000000000482a80 <main.nosyncadd>:
    482a80:       48 ff 05 b9 95 0d 00    incq   0xd95b9(%rip)        # 55c040 <main.c0>
    482a87:       c3                      ret

We’ll come back to this after we’ve written some code.

2. The CounterService interface and 4 different implementations

CounterService interface

The no synchronization and the atomic synchronization are simple and both are the same.

no synchronization struct
type Counter struct {
    id uint64
}

Used by 1

The atomic synchronization happens at the compiler/instruction level, so unlike the mutex example where we have to hold a reference to the mutex, for the atomic version we don’t need to hold a reference to any other data.

(Spoiler alert: it actually uses the x86_64 LOCK instruction prefix. This StackOverflow question has a good description.)

atomic synchronization struct
type AtomicCounter struct {
    id uint64
}

Used by 1

We’ll have to call Lock and Unlock around the critical code for this version so we need access to a mutex.

mutex synchronization struct
type MutexCounter struct {
    id uint64
    mu sync.Mutex
}

Used by 1

Now for the goroutine version.

id should technically be private here. The only way it should be possible to change it is through the getNext() function. But I’m going to ignore that now for the sake of simplicity.

The key to how these channels will avoid race conditions is that we’ll have a “constructor” that will spawn a single goroutine that will read from the ch channel, increment the id, then write the new value to the output channel.

The reason we need the output channel is that the updates to id happen asynchronously inside the goroutine that we span in our constructor. But the caller of getNext needs to know the value of id after that asynchronous update. The way we can block until the update is finished is by having the goroutine that updates the id subsequently push the id onto the output stream and getNext will return the value from that stream. This way, getNext blocks until it gets the updated value and thus the caller will get the updated value.

goroutine synchronization struct
type GoroutineCounter struct {
    ch chan bool
    out chan uint64
    id uint64
}

Used by 1

3. Testing and verification.

Before we look at the implementation, I want to thing about how we’ll test this.

A reminder of requirements:

Race detection is the easiest. I won’t bother putting that in a test suite. I’ll just run it from the command line. go run -race <file> or go test -race <package> Aside from the -race option, we should also be able to catch race detection by asserting the maximum value returne by getNext() matches the total number of calls across all goroutines.

For monotonically increasing, I’ll track the previous and current values as each thread iteratively calls getNext() and I’ll panic if it’s ever non-monotonic (diatonic?).

In addition to tracking previous and current, I’ll have a helper Max function to track the greatest value. Since the maximum value will be getting updated in a number of different threads, that itself will also have to be concurrency-safe.

concurrency_test.go

We’ll start with a generic test function that can be used to test any CounterService.

It will create NUM_THREADS goroutines and each of those will iterate NUM_ITER / NUM_THREADS times calling counterService.getNext() each iteration.

After each call we’ll ensure monotonicity by checking that the most recent value is greater than the previous value and we’ll atomically update the maximum value that any thread has seen.

Once all threads have finished, we’ll ensure the maximum value seen by any thread is equal to NUM_THREADS * NUM_ITER

generic CounterService test function
func test(counterService CounterService) uint64 {
    var wg sync.WaitGroup
    var max Max
    for i := 0; i < NUM_THREADS; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            var current, previous uint64 = 0, 0
            for j := 0; j < NUM_ITER / NUM_THREADS; j++ {
                @{get next value and ensure monotonicity}
                @{set the shared max to the max of all threads}
            }
        }()
    }
    wg.Wait()
    return max.v
}

Used by 1

This next struct, max, includes a mutex to make sure that we can concurrently update the maximum value seen across all threads.

struct to hold shared max across threads
type Max struct {
    v uint64
    mu sync.Mutex
}

Used by 1

get next value and ensure monotonicity
previous, current = current, counterService.getNext()
if current < previous {
    panic(fmt.Sprintf("%#v is non-monotonic", counterService))
}

Used by 1

set the shared max to the max of all threads
max.mu.Lock()
max.v = MaxUint64(current, max.v)
max.mu.Unlock()

Used by 1

I’ll add the actual complete test later. But for now, our concurrency_test.go file looks like this:

/concurrency/concurrency_test.go
package concurrency

import (
    "fmt"
    "sync"
    "testing"
)

var NUM_THREADS = 4
var NUM_ITER = int(1e6)

@{generic CounterService test function}

4. concurrency.go

concurrency.go will hold our concurrency package with the interfaces and implementations.

/concurrency/concurrency.go
package concurrency

import (
    "sync"
    "sync/atomic"
)

@{CounterService interface}
@{structs}
@{implementations}

The structs were already defined earlier.

structs
@{struct to hold shared max across threads}
@{no synchronization struct}
@{atomic synchronization struct}
@{mutex synchronization struct}
@{goroutine synchronization struct}

Used by 1

Implementations

implementations
@{no synchronization implementation}
@{atomic synchronization implementation}
@{mutex synchronization implementation}
@{goroutine synchronization implementation}

Used by 1

The implementations we’ll define now.

After each implementation, I’ll include the assembly so that we can make some better informed hypotheses about performance. The assembly produced by go tool compile -S concurrency/concurrency.go is noisy, so I’ll elide what I think is noise and just include the core operations of each function.

Self explanatory…

no synchronization implementation
func (c *Counter) getNext() uint64 {
    c.id += 1
    return c.id
}

Used by 1

No synchronization performance

Simple, but obviously wrong. I’m actually not even testing anything about this. Although, I should add some confirmation that this does indeed result in race conditions. I tested it by hand early on. I’m running low on time and probably won’t get around to adding it to this document.

<unlinkable>.(*Counter).getNext STEXT nosplit size=13 args=0x8 locals=0x0 funcid=0x0 align=0x0
        // ...
        0x0000 00000 (concurrency/concurrency.go:33)    MOVQ    (AX), CX
        0x0003 00003 (concurrency/concurrency.go:33)    INCQ    CX
        0x0006 00006 (concurrency/concurrency.go:33)    MOVQ    CX, (AX)
        0x0009 00009 (concurrency/concurrency.go:34)    MOVQ    CX, AX
        0x000c 00012 (concurrency/concurrency.go:34)    RET

Atomic

https://pkg.go.dev/sync/atomic#AddUint64

Sidenote: you’ll see a lot of uint64. I didn’t pick that type for any particular reason. It’s arbitrary.

atomic synchronization implementation
func (c *AtomicCounter) getNext() uint64 {
    return atomic.AddUint64(&c.id, 1)
}

Used by 1

Atomic performance

<unlinkable>.(*AtomicCounter).getNext STEXT nosplit size=15 args=0x8 locals=0x0 funcid=0x0 align=0x0
        // ...
        0x0000 00000 (concurrency/concurrency.go:37)    MOVL    $1, CX
        0x0005 00005 (concurrency/concurrency.go:37)    LOCK
        0x0006 00006 (concurrency/concurrency.go:37)    XADDQ   CX, (AX)
        0x000a 00010 (concurrency/concurrency.go:37)    LEAQ    1(CX), AX
        0x000e 00014 (concurrency/concurrency.go:37)    RET

Mutex implementation

mutex synchronization implementation
func (c *MutexCounter) getNext() uint64 {
    c.mu.Lock()
    c.id++
    v := c.id
    c.mu.Unlock()
    return v
}

Used by 1

Mutex performance

https://pkg.go.dev/sync#Mutex

Needless to say… we’re getting into a large number of assembly instructions at this point.

It looks like our original intuition was right so far.

<unlinkable>.(*MutexCounter).getNext STEXT size=149 args=0x8 locals=0x30 funcid=0x0 align=0x0
        // ...
        0x0014 00020 (concurrency/concurrency.go:40)    MOVQ    AX, CX
        0x0017 00023 (concurrency/concurrency.go:40)    XORL    AX, AX
        0x0019 00025 (concurrency/concurrency.go:40)    MOVL    $1, DX
        0x001e 00030 (concurrency/concurrency.go:40)    LOCK
        0x001f 00031 (concurrency/concurrency.go:40)    CMPXCHGL        DX, 8(CX)
        0x0023 00035 (concurrency/concurrency.go:40)    SETEQ   DL
        0x0026 00038 (concurrency/concurrency.go:40)    LEAQ    8(CX), AX
        0x002a 00042 ($GOROOT/src/sync/mutex.go:83)     TESTB   DL, DL
        0x002c 00044 ($GOROOT/src/sync/mutex.go:83)     JNE     71
        0x002e 00046 (concurrency/concurrency.go:39)    MOVQ    CX, <unlinkable>..autotmp_9+32(SP)
        0x0033 00051 (concurrency/concurrency.go:40)    MOVQ    AX, sync.m+24(SP)
        0x0038 00056 ($GOROOT/src/sync/mutex.go:90)     PCDATA  $1, $1
        0x0038 00056 ($GOROOT/src/sync/mutex.go:90)     CALL    sync.(*Mutex).lockSlow(SB)
        0x003d 00061 ($GOROOT/src/sync/mutex.go:223)    MOVQ    sync.m+24(SP), AX
        0x0042 00066 (concurrency/concurrency.go:41)    MOVQ    <unlinkable>..autotmp_9+32(SP), CX
        0x0047 00071 (concurrency/concurrency.go:41)    MOVQ    (CX), DX
        0x004a 00074 (concurrency/concurrency.go:41)    INCQ    DX
        0x004d 00077 (concurrency/concurrency.go:41)    MOVQ    DX, (CX)
        0x0050 00080 (concurrency/concurrency.go:43)    XCHGL   AX, AX
        0x0051 00081 ($GOROOT/src/sync/mutex.go:219)    MOVL    $-1, BX
        0x0056 00086 ($GOROOT/src/sync/mutex.go:219)    LOCK
        0x0057 00087 ($GOROOT/src/sync/mutex.go:219)    XADDL   BX, 8(CX)
        0x005b 00091 ($GOROOT/src/sync/mutex.go:219)    DECL    BX
        0x005d 00093 ($GOROOT/src/sync/mutex.go:219)    NOP
        0x0060 00096 ($GOROOT/src/sync/mutex.go:220)    TESTL   BX, BX
        0x0062 00098 ($GOROOT/src/sync/mutex.go:220)    JEQ     115
        0x0064 00100 (concurrency/concurrency.go:41)    MOVQ    DX, <unlinkable>..autotmp_10+16(SP)
        0x0069 00105 ($GOROOT/src/sync/mutex.go:223)    PCDATA  $1, $2
        0x0069 00105 ($GOROOT/src/sync/mutex.go:223)    CALL    sync.(*Mutex).unlockSlow(SB)
        0x006e 00110 (concurrency/concurrency.go:44)    MOVQ    <unlinkable>..autotmp_10+16(SP), DX
        0x0073 00115 (concurrency/concurrency.go:44)    MOVQ    DX, AX
        0x0076 00118 (concurrency/concurrency.go:44)    MOVQ    40(SP), BP
        0x007b 00123 (concurrency/concurrency.go:44)    ADDQ    $48, SP
        0x007f 00127 (concurrency/concurrency.go:44)    NOP
        0x0080 00128 (concurrency/concurrency.go:44)    RET

Goroutine implementation

This one needs an initializer since we have channels in a struct.

I went with the default channel buffer size of 0. That means every send will block until another goroutine receives. This is fine because I’m sending and receiving in back-to-back expressions in getNext().

goroutine synchronization implementation
func NewGoroutineCounter() *GoroutineCounter {
    var c GoroutineCounter
    c.ch = make(chan bool)
    c.out = make(chan uint64)
    go func() {
        for _ = range c.ch {
            c.id++
            c.out <- c.id
        }
    }()
    return &c
}

func (c *GoroutineCounter) getNext() uint64 {
    c.ch <- true
    return <- c.out
}

Used by 1

Goroutine performance

<unlinkable>.NewGoroutineCounter STEXT size=170 args=0x0 locals=0x20 funcid=0x0 align=0x0
        // ...
        0x0020 00032 (concurrency/concurrency.go:47)    CALL    runtime.newobject(SB)
        0x0025 00037 (concurrency/concurrency.go:47)    MOVQ    AX, <unlinkable>.&c+16(SP)
        0x002a 00042 (concurrency/concurrency.go:48)    XORL    BX, BX
        0x002c 00044 (concurrency/concurrency.go:48)    LEAQ    type.chan bool(SB), AX
        0x0033 00051 (concurrency/concurrency.go:48)    PCDATA  $1, $1
        0x0033 00051 (concurrency/concurrency.go:48)    CALL    runtime.makechan(SB)
        0x0038 00056 (concurrency/concurrency.go:48)    PCDATA  $0, $-2
        0x0038 00056 (concurrency/concurrency.go:48)    CMPL    runtime.writeBarrier(SB), $0
        0x003f 00063 (concurrency/concurrency.go:48)    NOP
        0x0040 00064 (concurrency/concurrency.go:48)    JNE     76
        0x0042 00066 (concurrency/concurrency.go:48)    MOVQ    <unlinkable>.&c+16(SP), CX
        0x0047 00071 (concurrency/concurrency.go:48)    MOVQ    AX, (CX)
        0x004a 00074 (concurrency/concurrency.go:48)    JMP     86
        0x004c 00076 (concurrency/concurrency.go:48)    MOVQ    <unlinkable>.&c+16(SP), DI
        0x0051 00081 (concurrency/concurrency.go:48)    CALL    runtime.gcWriteBarrier(SB)
        0x0056 00086 (concurrency/concurrency.go:49)    PCDATA  $0, $-1
        0x0056 00086 (concurrency/concurrency.go:49)    LEAQ    type.chan uint64(SB), AX
        0x005d 00093 (concurrency/concurrency.go:49)    XORL    BX, BX
        0x005f 00095 (concurrency/concurrency.go:49)    NOP
        0x0060 00096 (concurrency/concurrency.go:49)    CALL    runtime.makechan(SB)
        0x0065 00101 (concurrency/concurrency.go:49)    PCDATA  $0, $-2
        0x0065 00101 (concurrency/concurrency.go:49)    CMPL    runtime.writeBarrier(SB), $0
        0x006c 00108 (concurrency/concurrency.go:49)    JNE     121
        0x006e 00110 (concurrency/concurrency.go:49)    MOVQ    <unlinkable>.&c+16(SP), CX
        0x0073 00115 (concurrency/concurrency.go:49)    MOVQ    AX, 8(CX)
        0x0077 00119 (concurrency/concurrency.go:49)    JMP     144
        0x0079 00121 (concurrency/concurrency.go:49)    MOVQ    <unlinkable>.&c+16(SP), DI
        0x007e 00126 (concurrency/concurrency.go:49)    LEAQ    8(DI), CX
        0x0082 00130 (concurrency/concurrency.go:47)    MOVQ    DI, DX
        0x0085 00133 (concurrency/concurrency.go:49)    MOVQ    CX, DI
        0x0088 00136 (concurrency/concurrency.go:49)    CALL    runtime.gcWriteBarrier(SB)
        0x008d 00141 (concurrency/concurrency.go:50)    MOVQ    DX, CX
        0x0090 00144 (concurrency/concurrency.go:50)    PCDATA  $0, $-1
        0x0090 00144 (concurrency/concurrency.go:50)    MOVQ    CX, AX
        0x0093 00147 (concurrency/concurrency.go:50)    MOVQ    24(SP), BP
        0x0098 00152 (concurrency/concurrency.go:50)    ADDQ    $32, SP
        0x009c 00156 (concurrency/concurrency.go:50)    RET

5. Testing each version

/concurrency/concurrency_test.go +=
func TestAtomicSynchronization(t *testing.T) {
    var atomicCounter AtomicCounter
    maxId := test(&atomicCounter)
    if (int(maxId) != NUM_ITER) {
        t.Error("Wrong max id", maxId, NUM_ITER)
    }
}
/concurrency/concurrency_test.go +=
func TestMutexSynchronization(t *testing.T) {
    var mutexCounter MutexCounter
    maxId := test(&mutexCounter)
    if (int(maxId) != NUM_ITER) {
        t.Error("Wrong max id", maxId, NUM_ITER)
    }
}
/concurrency/concurrency_test.go +=
func TestGoroutineSynchronization(t *testing.T) {
    goroutineCounter := NewGoroutineCounter()
    maxId := test(goroutineCounter)
    t.Log(maxId)
    if (int(maxId) != NUM_ITER) {
        t.Error("Wrong max id", maxId, NUM_ITER)
    }
}
/concurrency/concurrency_test.go +=
func MaxUint64(m uint64, n uint64) uint64 {
    if m > n {
        return m
    } else {
        return n
    }
}

6. Results

results
➜  concurrent-programming-in-go git:(main) ✗ go clean -testcache
➜  concurrent-programming-in-go git:(main) ✗ go test -race ./concurrency
ok      owoga.com/concurrency/concurrency       2.844s