Let’s implement an “ID service” that can be used simultaneously by multiple goroutines.
type CounterService interface {
	// Returns values in ascending order; it should be safe to call
	// getNext() concurrently from multiple goroutines without any
	// additional synchronization.
	getNext() uint64
}
And let’s implement it using each of the following four strategies:

- no synchronization (as a baseline)
- atomic operations
- a mutex
- a goroutine communicating over channels

We’ll verify correctness (aside from the “no synchronization” version):

- values returned by getNext() are monotonically increasing
- the maximum value returned by getNext() matches the total number of calls across all goroutines

There are a couple of ways to view the assembly of a Go program.
GOOS=linux GOARCH=amd64 go tool compile -S atomic.go | sed -n '/^main.nosyncAdd/,/^main/p'
main.nosyncAdd STEXT nosplit size=8 args=0x0 locals=0x0 funcid=0x0 align=0x0
0x0000 00000 (atomic.go:10) TEXT main.nosyncAdd(SB), NOSPLIT|ABIInternal, $0-0
0x0000 00000 (atomic.go:10) FUNCDATA $0, gclocals·g2BeySu+wFnoycgXfElmcg==(SB)
0x0000 00000 (atomic.go:10) FUNCDATA $1, gclocals·g2BeySu+wFnoycgXfElmcg==(SB)
0x0000 00000 (atomic.go:11) INCQ main.c0(SB)
0x0007 00007 (atomic.go:12) RET
0x0000 48 ff 05 00 00 00 00 c3 H.......
rel 3+4 t=14 main.c0+0
If you build a binary and want to inspect it with external tools like objdump, you might want to pass certain arguments to go build.
go build -gcflags='-N -l' atomic.go
-N builds with no optimizations and -l prevents inlining.
➜ concurrent-programming-in-go git:(main) ✗ objdump -d atomic | grep -a2 nosyncadd
0000000000482a80 <main.nosyncadd>:
482a80: 48 ff 05 b9 95 0d 00 incq 0xd95b9(%rip) # 55c040 <main.c0>
482a87: c3 ret
We’ll come back to this after we’ve written some code.
The no-synchronization and the atomic versions are both simple, and their structs are identical.
The atomic synchronization happens at the compiler/instruction level, so unlike the mutex example, where we have to hold a reference to the mutex, the atomic version doesn’t need a reference to any other data.
(Spoiler alert: it actually uses the x86_64 LOCK instruction prefix. This StackOverflow question has a good description.)
We’ll have to call Lock and Unlock around the critical section for this version, so we need access to a mutex.
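The struct chunks themselves aren’t shown until later, but judging from the getNext() implementations below, a minimal sketch would look like this (my reconstruction, not the literate chunks verbatim):

// Sketch of the counter structs, inferred from the getNext()
// implementations below; the actual @{structs} chunks may differ.
type Counter struct {
	id uint64 // no synchronization at all
}

type AtomicCounter struct {
	id uint64 // only ever touched via sync/atomic
}

type MutexCounter struct {
	mu sync.Mutex // guards id
	id uint64
}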
Now for the goroutine version.
id should technically be private here.
The only way to change it should be through the getNext() function.
But I’m going to ignore that for now for the sake of simplicity.
The key to how these channels will avoid race conditions is that we’ll have a “constructor” that will spawn a single goroutine that will read from the ch channel, increment the id, then write the new value to the output channel.
The reason we need the output channel is that the updates to id happen asynchronously, inside the goroutine that we spawn in our constructor.
But the caller of getNext needs to know the value of id after that asynchronous update.
The way we can block until the update is finished is by having the goroutine that updates the id subsequently push the id onto the output stream and getNext will return the value from that stream.
This way, getNext blocks until it gets the updated value and thus the caller will get the updated value.
type GoroutineCounter struct {
	ch  chan bool
	out chan uint64
	id  uint64
}
Before we look at the implementation, I want to think about how we’ll test this.
A reminder of requirements:
- values returned by getNext() are monotonically increasing
- the maximum value returned by getNext() matches the total number of calls across all goroutines

Race detection is the easiest.
I won’t bother putting that in a test suite.
I’ll just run it from the command line: go run -race <file> or go test -race <package>
Aside from the -race option, we should also be able to detect races by asserting that the maximum value returned by getNext() matches the total number of calls across all goroutines.
For monotonically increasing, I’ll track the previous and current values as each thread iteratively calls getNext() and I’ll panic if it’s ever non-monotonic (diatonic?).
In addition to tracking previous and current, I’ll have a helper Max struct to track the greatest value. Since the maximum value will be updated from a number of different threads, that struct itself will also have to be concurrency-safe.
We’ll start with a generic test function that can be used to test any CounterService.
It will create NUM_THREADS goroutines and each of those will iterate NUM_ITER / NUM_THREADS times calling counterService.getNext() each iteration.
After each call we’ll ensure monotonicity by checking that the most recent value is greater than the previous value and we’ll atomically update the maximum value that any thread has seen.
Once all threads have finished, we’ll ensure the maximum value seen by any thread is equal to NUM_ITER (NUM_THREADS goroutines times NUM_ITER / NUM_THREADS calls each).
func test(counterService CounterService) uint64 {
	var wg sync.WaitGroup
	var max Max
	for i := 0; i < NUM_THREADS; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			var current, previous uint64 = 0, 0
			for j := 0; j < NUM_ITER/NUM_THREADS; j++ {
				@{get next value and ensure monotonicity}
				@{set the shared max to the max of all threads}
			}
		}()
	}
	wg.Wait()
	return max.v
}
This next struct, Max, includes a mutex to make sure that we can concurrently update the maximum value seen across all threads.
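Its chunk body doesn’t appear inline here, so here is a minimal sketch inferred from how max is used in test():

// Sketch of the shared-max holder, inferred from its use in test():
// mu guards v, the largest id any goroutine has observed.
type Max struct {
	mu sync.Mutex
	v  uint64
}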
previous, current = current, counterService.getNext()
if current < previous {
	panic(fmt.Sprintf("%#v is non-monotonic", counterService))
}
max.mu.Lock()
max.v = MaxUint64(current, max.v)
max.mu.Unlock()
I’ll add the actual complete test later.
But for now, our concurrency_test.go file looks like this:
package concurrency

import (
	"fmt"
	"sync"
	"testing"
)

var NUM_THREADS = 4
var NUM_ITER = int(1e6)

@{generic CounterService test function}
concurrency.go will hold our concurrency package with the interfaces and implementations.
package concurrency

import (
	"sync"
	"sync/atomic"
)

@{CounterService interface}
@{structs}
@{implementations}
The structs were already defined earlier.
@{struct to hold shared max across threads}
@{no synchronization struct}
@{atomic synchronization struct}
@{mutex synchronization struct}
@{goroutine synchronization struct}
@{no synchronization implementation}
@{atomic synchronization implementation}
@{mutex synchronization implementation}
@{goroutine synchronization implementation}
Now we’ll define the implementations.
After each implementation, I’ll include the assembly so that we can make some better informed hypotheses about performance.
The assembly produced by go tool compile -S concurrency/concurrency.go is noisy, so I’ll elide what I think is noise and just include the core operations of each function.
Self-explanatory…
func (c *Counter) getNext() uint64 {
	c.id += 1
	return c.id
}
Simple, but obviously wrong. I’m actually not testing anything about this, although I should add some confirmation that it does indeed result in race conditions. I tested it by hand early on; I’m running low on time and probably won’t get around to adding it to this document.
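For what it’s worth, here’s a sketch of what that confirmation might look like (hypothetical; not part of the original test file). It runs the unsynchronized Counter through the same harness:

// Hypothetical test, not in the original: with no synchronization,
// this will typically lose updates (maxId < NUM_ITER), may trip the
// monotonicity panic in the harness, and will fail under go test -race.
func TestNoSynchronization(t *testing.T) {
	t.Skip("illustrative only: remove this line to watch it fail")
	var counter Counter
	maxId := test(&counter)
	if int(maxId) != NUM_ITER {
		t.Log("lost updates:", maxId, "<", NUM_ITER)
	}
}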
<unlinkable>.(*Counter).getNext STEXT nosplit size=13 args=0x8 locals=0x0 funcid=0x0 align=0x0
// ...
0x0000 00000 (concurrency/concurrency.go:33) MOVQ (AX), CX
0x0003 00003 (concurrency/concurrency.go:33) INCQ CX
0x0006 00006 (concurrency/concurrency.go:33) MOVQ CX, (AX)
0x0009 00009 (concurrency/concurrency.go:34) MOVQ CX, AX
0x000c 00012 (concurrency/concurrency.go:34) RET
https://pkg.go.dev/sync/atomic#AddUint64
Sidenote: you’ll see a lot of uint64. I didn’t pick that type for any particular reason. It’s arbitrary.
func (c *AtomicCounter) getNext() uint64 {
	return atomic.AddUint64(&c.id, 1)
}
<unlinkable>.(*AtomicCounter).getNext STEXT nosplit size=15 args=0x8 locals=0x0 funcid=0x0 align=0x0
// ...
0x0000 00000 (concurrency/concurrency.go:37) MOVL $1, CX
0x0005 00005 (concurrency/concurrency.go:37) LOCK
0x0006 00006 (concurrency/concurrency.go:37) XADDQ CX, (AX)
0x000a 00010 (concurrency/concurrency.go:37) LEAQ 1(CX), AX
0x000e 00014 (concurrency/concurrency.go:37) RET
func (c *MutexCounter) getNext() uint64 {
	c.mu.Lock()
	c.id++
	v := c.id
	c.mu.Unlock()
	return v
}
Needless to say… we’re getting into a large number of assembly instructions at this point.
It looks like our original intuition was right so far.
<unlinkable>.(*MutexCounter).getNext STEXT size=149 args=0x8 locals=0x30 funcid=0x0 align=0x0
// ...
0x0014 00020 (concurrency/concurrency.go:40) MOVQ AX, CX
0x0017 00023 (concurrency/concurrency.go:40) XORL AX, AX
0x0019 00025 (concurrency/concurrency.go:40) MOVL $1, DX
0x001e 00030 (concurrency/concurrency.go:40) LOCK
0x001f 00031 (concurrency/concurrency.go:40) CMPXCHGL DX, 8(CX)
0x0023 00035 (concurrency/concurrency.go:40) SETEQ DL
0x0026 00038 (concurrency/concurrency.go:40) LEAQ 8(CX), AX
0x002a 00042 ($GOROOT/src/sync/mutex.go:83) TESTB DL, DL
0x002c 00044 ($GOROOT/src/sync/mutex.go:83) JNE 71
0x002e 00046 (concurrency/concurrency.go:39) MOVQ CX, <unlinkable>..autotmp_9+32(SP)
0x0033 00051 (concurrency/concurrency.go:40) MOVQ AX, sync.m+24(SP)
0x0038 00056 ($GOROOT/src/sync/mutex.go:90) PCDATA $1, $1
0x0038 00056 ($GOROOT/src/sync/mutex.go:90) CALL sync.(*Mutex).lockSlow(SB)
0x003d 00061 ($GOROOT/src/sync/mutex.go:223) MOVQ sync.m+24(SP), AX
0x0042 00066 (concurrency/concurrency.go:41) MOVQ <unlinkable>..autotmp_9+32(SP), CX
0x0047 00071 (concurrency/concurrency.go:41) MOVQ (CX), DX
0x004a 00074 (concurrency/concurrency.go:41) INCQ DX
0x004d 00077 (concurrency/concurrency.go:41) MOVQ DX, (CX)
0x0050 00080 (concurrency/concurrency.go:43) XCHGL AX, AX
0x0051 00081 ($GOROOT/src/sync/mutex.go:219) MOVL $-1, BX
0x0056 00086 ($GOROOT/src/sync/mutex.go:219) LOCK
0x0057 00087 ($GOROOT/src/sync/mutex.go:219) XADDL BX, 8(CX)
0x005b 00091 ($GOROOT/src/sync/mutex.go:219) DECL BX
0x005d 00093 ($GOROOT/src/sync/mutex.go:219) NOP
0x0060 00096 ($GOROOT/src/sync/mutex.go:220) TESTL BX, BX
0x0062 00098 ($GOROOT/src/sync/mutex.go:220) JEQ 115
0x0064 00100 (concurrency/concurrency.go:41) MOVQ DX, <unlinkable>..autotmp_10+16(SP)
0x0069 00105 ($GOROOT/src/sync/mutex.go:223) PCDATA $1, $2
0x0069 00105 ($GOROOT/src/sync/mutex.go:223) CALL sync.(*Mutex).unlockSlow(SB)
0x006e 00110 (concurrency/concurrency.go:44) MOVQ <unlinkable>..autotmp_10+16(SP), DX
0x0073 00115 (concurrency/concurrency.go:44) MOVQ DX, AX
0x0076 00118 (concurrency/concurrency.go:44) MOVQ 40(SP), BP
0x007b 00123 (concurrency/concurrency.go:44) ADDQ $48, SP
0x007f 00127 (concurrency/concurrency.go:44) NOP
0x0080 00128 (concurrency/concurrency.go:44) RET
This one needs an initializer, since the channels in the struct have to be created with make.
I went with the default buffer size of 0, i.e., unbuffered channels.
That means every send blocks until another goroutine receives.
This is fine because I’m sending and receiving in back-to-back statements in getNext().
func NewGoroutineCounter() *GoroutineCounter {
	var c GoroutineCounter
	c.ch = make(chan bool)
	c.out = make(chan uint64)
	// A single goroutine owns id: each receive on ch triggers exactly
	// one increment, and the new value is sent back on out.
	go func() {
		for range c.ch {
			c.id++
			c.out <- c.id
		}
	}()
	return &c
}

func (c *GoroutineCounter) getNext() uint64 {
	// Send a request, then block until the updater goroutine
	// replies with the freshly incremented id.
	c.ch <- true
	return <-c.out
}
<unlinkable>.NewGoroutineCounter STEXT size=170 args=0x0 locals=0x20 funcid=0x0 align=0x0
// ...
0x0020 00032 (concurrency/concurrency.go:47) CALL runtime.newobject(SB)
0x0025 00037 (concurrency/concurrency.go:47) MOVQ AX, <unlinkable>.&c+16(SP)
0x002a 00042 (concurrency/concurrency.go:48) XORL BX, BX
0x002c 00044 (concurrency/concurrency.go:48) LEAQ type.chan bool(SB), AX
0x0033 00051 (concurrency/concurrency.go:48) PCDATA $1, $1
0x0033 00051 (concurrency/concurrency.go:48) CALL runtime.makechan(SB)
0x0038 00056 (concurrency/concurrency.go:48) PCDATA $0, $-2
0x0038 00056 (concurrency/concurrency.go:48) CMPL runtime.writeBarrier(SB), $0
0x003f 00063 (concurrency/concurrency.go:48) NOP
0x0040 00064 (concurrency/concurrency.go:48) JNE 76
0x0042 00066 (concurrency/concurrency.go:48) MOVQ <unlinkable>.&c+16(SP), CX
0x0047 00071 (concurrency/concurrency.go:48) MOVQ AX, (CX)
0x004a 00074 (concurrency/concurrency.go:48) JMP 86
0x004c 00076 (concurrency/concurrency.go:48) MOVQ <unlinkable>.&c+16(SP), DI
0x0051 00081 (concurrency/concurrency.go:48) CALL runtime.gcWriteBarrier(SB)
0x0056 00086 (concurrency/concurrency.go:49) PCDATA $0, $-1
0x0056 00086 (concurrency/concurrency.go:49) LEAQ type.chan uint64(SB), AX
0x005d 00093 (concurrency/concurrency.go:49) XORL BX, BX
0x005f 00095 (concurrency/concurrency.go:49) NOP
0x0060 00096 (concurrency/concurrency.go:49) CALL runtime.makechan(SB)
0x0065 00101 (concurrency/concurrency.go:49) PCDATA $0, $-2
0x0065 00101 (concurrency/concurrency.go:49) CMPL runtime.writeBarrier(SB), $0
0x006c 00108 (concurrency/concurrency.go:49) JNE 121
0x006e 00110 (concurrency/concurrency.go:49) MOVQ <unlinkable>.&c+16(SP), CX
0x0073 00115 (concurrency/concurrency.go:49) MOVQ AX, 8(CX)
0x0077 00119 (concurrency/concurrency.go:49) JMP 144
0x0079 00121 (concurrency/concurrency.go:49) MOVQ <unlinkable>.&c+16(SP), DI
0x007e 00126 (concurrency/concurrency.go:49) LEAQ 8(DI), CX
0x0082 00130 (concurrency/concurrency.go:47) MOVQ DI, DX
0x0085 00133 (concurrency/concurrency.go:49) MOVQ CX, DI
0x0088 00136 (concurrency/concurrency.go:49) CALL runtime.gcWriteBarrier(SB)
0x008d 00141 (concurrency/concurrency.go:50) MOVQ DX, CX
0x0090 00144 (concurrency/concurrency.go:50) PCDATA $0, $-1
0x0090 00144 (concurrency/concurrency.go:50) MOVQ CX, AX
0x0093 00147 (concurrency/concurrency.go:50) MOVQ 24(SP), BP
0x0098 00152 (concurrency/concurrency.go:50) ADDQ $32, SP
0x009c 00156 (concurrency/concurrency.go:50) RET
func TestAtomicSynchronization(t *testing.T) {
	var atomicCounter AtomicCounter
	maxId := test(&atomicCounter)
	if int(maxId) != NUM_ITER {
		t.Error("Wrong max id", maxId, NUM_ITER)
	}
}

func TestMutexSynchronization(t *testing.T) {
	var mutexCounter MutexCounter
	maxId := test(&mutexCounter)
	if int(maxId) != NUM_ITER {
		t.Error("Wrong max id", maxId, NUM_ITER)
	}
}

func TestGoroutineSynchronization(t *testing.T) {
	goroutineCounter := NewGoroutineCounter()
	maxId := test(goroutineCounter)
	t.Log(maxId)
	if int(maxId) != NUM_ITER {
		t.Error("Wrong max id", maxId, NUM_ITER)
	}
}
func MaxUint64(m uint64, n uint64) uint64 {
	if m > n {
		return m
	}
	return n
}
➜ concurrent-programming-in-go git:(main) ✗ go clean -testcache
➜ concurrent-programming-in-go git:(main) ✗ go test -race ./concurrency
ok owoga.com/concurrency/concurrency 2.844s
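If you want to turn those assembly-based hypotheses about performance into numbers, a minimal benchmark sketch could look like the following (my addition, reusing this package’s types; not from the original):

// Hypothetical benchmarks: hammer each synchronized counter from
// GOMAXPROCS goroutines via testing.B's RunParallel.
func BenchmarkAtomicCounter(b *testing.B) {
	var c AtomicCounter
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			c.getNext()
		}
	})
}

func BenchmarkMutexCounter(b *testing.B) {
	var c MutexCounter
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			c.getNext()
		}
	})
}

func BenchmarkGoroutineCounter(b *testing.B) {
	c := NewGoroutineCounter()
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			c.getNext()
		}
	})
}

Run them with go test -bench=. ./concurrency to compare the three approaches under contention.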