
Goroutines are one of Go's signature features: lightweight, user-space threads. So how are they scheduled? This article walks through goroutine scheduling.

1. Scheduling models

There are three mainstream threading models today: the kernel-level threading model, the user-level threading model, and the two-level threading model (some documents call it the hybrid threading model). The fundamental difference between them lies in the mapping between threads and kernel scheduling entities.

1.1 Kernel-level threading model

Each user thread is bound to its own kernel scheduling entity (a 1:1 relationship). Creating, terminating, and switching threads are all based on system calls provided by the kernel; in other words, scheduling is done entirely by the operating system.

Because every user thread is backed by its own kernel thread, this model makes full use of multi-core resources, but every context switch has to cross into the kernel, so switches are relatively expensive.

1.2 User-level threading model

All user threads are multiplexed onto a single kernel thread (an N:1 relationship), and scheduling happens entirely in user space. Context switches are cheap because they never enter the kernel, but multi-core resources cannot be exploited, and a single blocking system call stalls every thread.

1.3 Two-level threading model

Each of the two models above has its strengths and weaknesses. The two-level (M:N) model is a trade-off between them: it makes full use of kernel resources while keeping context-switch overhead as low as possible. This is the model goroutines adopt. Here M:N means that M goroutines run on top of N operating system threads: the kernel schedules the N OS threads, and those N threads schedule and run the M goroutines.
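As a quick illustration of the M:N idea (this is a plain user-level demo of the programming model, not scheduler internals; the numbers are arbitrary), the program below multiplexes 10,000 goroutines onto at most 2 Ps:

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

func main() {
	runtime.GOMAXPROCS(2) // cap the number of Ps, and thus concurrently running threads

	var wg sync.WaitGroup
	for i := 0; i < 10000; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			_ = n * n // trivial work; the runtime multiplexes these M goroutines onto N threads
		}(i)
	}
	wg.Wait()
	fmt.Println("10000 goroutines done on GOMAXPROCS =", runtime.GOMAXPROCS(0))
}
```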

2. The MPG model

2.1 Overview

As the previous section explained, goroutines are scheduled with the M:N model. So how does that scheduling actually happen? "Scheduling goroutines" refers to the process by which program code, following some algorithm, picks a suitable goroutine at a suitable moment and puts it on a CPU to run. The code responsible for this is called the goroutine scheduler.

From this description we know the scheduler picks a suitable goroutine to run. But where does it find candidate goroutines? And during a switch, where is the state saved? Keep these two questions in mind as we go on.

Goroutine scheduling boils down to saving and modifying CPU register values in order to switch threads/goroutines. (Aside: a thread running on a CPU essentially keeps its working state in the CPU registers. The essence of a context switch is to save the old thread's register state into memory, then load the new thread's previously saved register values from memory back into the CPU registers, which resumes the new thread.)

It always comes back to the same principle: to schedule goroutines, there must be an object that stores the CPU register values along with the goroutine's other state. That object (or data structure) is Go's g struct, the G in the MPG model. Each instance of the struct represents one goroutine, and collectively the instances hold the state of all goroutines. The scheduler works through g objects: when a goroutine is switched off the CPU, the scheduler saves the register values into fields of its g; when the goroutine is scheduled to run again, the scheduler restores those saved values back into the CPU registers.
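The register snapshot lives in the g's sched field, whose type is gobuf. For reference, here is an abridged gobuf from runtime/runtime2.go (same era as the listings below; the exact field set varies across Go versions):

```go
type gobuf struct {
	// sp, pc and g are the essentials: the saved stack pointer, the saved
	// program counter, and a back-pointer to the g this context belongs to.
	sp   uintptr
	pc   uintptr
	g    guintptr
	ctxt unsafe.Pointer // saved closure context
	ret  sys.Uintreg    // saved return-value register
	lr   uintptr        // link register, on architectures that have one
	bp   uintptr        // frame base pointer, when kept
}
```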

So G is the abstraction of a goroutine. Is this abstraction alone enough to implement scheduling? Clearly not. Remember our question: when the scheduler goes to pick a suitable goroutine, where does it look? We need a container holding all goroutines (or at least the runnable ones) so the scheduler can find runnable goroutines. For that, the Go scheduler introduces the schedt struct.

The schedt struct both records the scheduler's own state and owns a run queue of goroutines. Every Go program has exactly one scheduler, so every Go program also has exactly one schedt instance. That instance is defined in the source as a shared global variable, which every worker thread can access, along with the run queue it owns. That is why this run queue is called the global run queue.

This gives us a preliminary answer: G is the abstraction of a goroutine and holds all of a goroutine's state. Every Go program has one shared global run queue holding runnable goroutines (in other words, the global queue holds Gs in the runnable state).

Speaking of a global run queue: is there a local run queue as well? If you guessed so, well spotted. Each worker thread does have its own private local run queue of goroutines. Why keep local queues when a global one exists? Put simply: a shared queue implies a lock, and locking hurts performance. Giving each worker thread a private local run queue sidesteps the lock most of the time. A worker thread checks the global run queue first only about 1/61 of the time (this keeps scheduling fair); otherwise it takes a g from its own local run queue. In the scheduler source, the local run queue lives inside the p struct, and every worker thread that is running Go code is bound to one p instance.
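If you want to watch these queues in action, the runtime's schedtrace debug flag prints scheduler state periodically. Below is a sketch of how to use it; the sample output line in the comments is reconstructed from memory and varies by Go version:

```go
// Run with: GODEBUG=schedtrace=1000 go run main.go
// Once per second the runtime prints a line roughly of the form:
//   SCHED 1009ms: gomaxprocs=4 idleprocs=0 threads=8 spinningthreads=1
//         idlethreads=2 runqueue=3 [2 0 1 4]
// "runqueue" is the global run queue length; the bracketed numbers are
// the lengths of each P's local run queue.
package main

import (
	"runtime"
	"time"
)

func main() {
	for i := 0; i < 100; i++ {
		go func() {
			for {
				runtime.Gosched() // keep rescheduling so the queues stay busy
			}
		}()
	}
	time.Sleep(5 * time.Second)
}
```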

With P and G in the MPG model gradually coming into view, what on earth is M? Well, did you notice the "worker thread" we kept mentioning above?

In the Go scheduler source, the m struct represents a worker thread. Each worker thread has exactly one m instance bound to it. Besides recording state such as the thread's stack bounds, the goroutine currently being executed, and whether the thread is idle, the m struct also maintains, via a pointer, its binding to a p instance. So, straight to the point:

The MPG model:

  • M: an M is directly bound to one kernel thread.
  • P: the context an M needs to run Go code; a logical processor for user-level code.
  • G: the abstraction of a goroutine.

2.2 A look at the source

We've now covered M, P, and G in broad strokes. But what exactly are they? Let's look at the source, in runtime/runtime2.go.

2.2.1 The g struct

The g struct represents a goroutine and holds all of its information: the stack, a gobuf, and assorted other state. Since g's get placed on scheduling queues, each g also carries a pointer to the next g (the schedlink field below). Here is the definition.

```go
type g struct {
	// Stack parameters.
	// stack describes the actual stack memory: [stack.lo, stack.hi).
	// stackguard0 is the stack pointer compared in the Go stack growth prologue.
	// It is stack.lo+StackGuard normally, but can be StackPreempt to trigger a preemption.
	// stackguard1 is the stack pointer compared in the C stack growth prologue.
	// It is stack.lo+StackGuard on g0 and gsignal stacks.
	// It is ~0 on other goroutine stacks, to trigger a call to morestackc (and crash).

	// The stack used by this goroutine.
	stack stack // offset known to runtime/cgo

	// The next two fields are used for stack-overflow checks and automatic
	// stack growth/shrinking; stackguard0 is also used for preemption.
	stackguard0 uintptr // offset known to liblink
	stackguard1 uintptr // offset known to liblink

	_panic *_panic // innermost panic - offset known to liblink
	_defer *_defer // innermost defer

	// The worker thread currently executing this goroutine.
	m *m // current m; offset known to arm liblink

	// Saved scheduling context: essentially a few register values.
	sched gobuf

	syscallsp    uintptr        // if status==Gsyscall, syscallsp = sched.sp to use during gc
	syscallpc    uintptr        // if status==Gsyscall, syscallpc = sched.pc to use during gc
	stktopsp     uintptr        // expected sp at top of stack, to check in traceback
	param        unsafe.Pointer // passed parameter on wakeup
	atomicstatus uint32
	stackLock    uint32 // sigprof/scang lock; TODO: fold in to atomicstatus
	goid         int64

	// schedlink points to the next g in the global run queue;
	// the g's on the global run queue form a linked list.
	schedlink guintptr

	waitsince  int64      // approx time when the g become blocked
	waitreason waitReason // if status==Gwaiting

	// Preemption flag: set to true when this g should be preempted.
	preempt bool // preemption signal, duplicates stackguard0 = stackpreempt

	paniconfault   bool     // panic (instead of crash) on unexpected fault address
	preemptscan    bool     // preempted g does scan for gc
	gcscandone     bool     // g has scanned stack; protected by _Gscan bit in status
	gcscanvalid    bool     // false at start of gc cycle, true if G has not run since last scan; TODO: remove?
	throwsplit     bool     // must not split stack
	raceignore     int8     // ignore race detection events
	sysblocktraced bool     // StartTrace has emitted EvGoInSyscall about this goroutine
	sysexitticks   int64    // cputicks when syscall has returned (for tracing)
	traceseq       uint64   // trace event sequencer
	tracelastp     puintptr // last P emitted an event for this goroutine
	lockedm        muintptr
	sig            uint32
	writebuf       []byte
	sigcode0       uintptr
	sigcode1       uintptr
	sigpc          uintptr
	gopc           uintptr         // pc of go statement that created this goroutine
	ancestors      *[]ancestorInfo // ancestor information goroutine(s) that created this goroutine (only used if debug.tracebackancestors)
	startpc        uintptr         // pc of goroutine function
	racectx        uintptr
	waiting        *sudog         // sudog structures this g is waiting on (that have a valid elem ptr); in lock order
	cgoCtxt        []uintptr      // cgo traceback context
	labels         unsafe.Pointer // profiler labels
	timer          *timer         // cached timer for time.Sleep
	selectDone     uint32         // are we participating in a select and did someone win the race?

	// Per-G GC state

	// gcAssistBytes is this G's GC assist credit in terms of
	// bytes allocated. If this is positive, then the G has credit
	// to allocate gcAssistBytes bytes without assisting. If this
	// is negative, then the G must correct this by performing
	// scan work. We track this in bytes to make it fast to update
	// and check for debt in the malloc hot path. The assist ratio
	// determines how this corresponds to scan work debt.
	gcAssistBytes int64
}
```

The source above embeds a field of type stack. What is that? The stack struct records the stack a goroutine uses: its low and high address bounds.

```go
// Stack describes a Go execution stack.
// The bounds of the stack are exactly [lo, hi),
// with no implicit data structures on either side.

// Records the start and end of the stack used by a goroutine.
type stack struct {
	lo uintptr // low address; the stack grows down toward lo
	hi uintptr // high address; the base of the stack
}
```

2.2.2 The m struct

The m struct represents a worker thread. It holds the stack the m itself uses, the goroutine currently running, the p bound to this m, and more.

```go
type m struct {
	// g0 records the stack used by the worker thread itself; scheduler code
	// runs on this stack. User goroutine code runs on the goroutine's own
	// stack, so scheduling involves a stack switch.
	g0 *g // goroutine with scheduling stack

	// Binds this m object to its worker thread via thread-local storage.
	tls      [6]uintptr // thread-local storage (for x86 extern register)
	mstartfn func()

	// The goroutine (g object) this worker thread is currently running.
	curg *g // current running goroutine

	// The p bound to this worker thread.
	p     puintptr // attached p for executing go code (nil if not executing go code)
	nextp puintptr
	oldp  puintptr // the p that was attached before executing a syscall

	// spinning state: this worker thread is trying to steal goroutines
	// from other worker threads' local run queues.
	spinning bool // m is out of work and is actively looking for work
	blocked  bool // m is blocked on a note

	// When there is no goroutine to run, the worker thread sleeps on this
	// park member; other threads wake it up through the same note.
	park note

	// Link in the list of all worker threads.
	alllink   *m // on allm
	schedlink muintptr

	// On Linux, thread is the operating-system thread ID.
	thread   uintptr // thread handle
	freelink *m      // on sched.freem

	// ......
}
```

2.2.3 The p struct

The p struct holds the resources a worker thread needs in order to execute Go code: the local run queue of goroutines, caches used for memory allocation, and so on.

```go
type p struct {
	id          int32
	status      uint32 // one of pidle/prunning/...
	link        puintptr
	schedtick   uint32     // incremented on every scheduler call
	syscalltick uint32     // incremented on every system call
	sysmontick  sysmontick // last tick observed by sysmon
	m           muintptr   // back-link to associated m (nil if idle)
	mcache      *mcache
	raceprocctx uintptr

	deferpool    [5][]*_defer // pool of available defer structs of different sizes (see panic.go)
	deferpoolbuf [5][32]*_defer

	// Cache of goroutine ids, amortizes accesses to runtime·sched.goidgen.
	goidcache    uint64
	goidcacheend uint64

	// Queue of runnable goroutines. Accessed without lock.
	// The local goroutine run queue.
	runqhead uint32        // queue head
	runqtail uint32        // queue tail
	runq     [256]guintptr // circular queue backed by an array
	// runnext, if non-nil, is a runnable G that was ready'd by
	// the current G and should be run next instead of what's in
	// runq if there's time remaining in the running G's time
	// slice. It will inherit the time left in the current time
	// slice. If a set of goroutines is locked in a
	// communicate-and-wait pattern, this schedules that set as a
	// unit and eliminates the (potentially large) scheduling
	// latency that otherwise arises from adding the ready'd
	// goroutines to the end of the run queue.
	runnext guintptr

	// Available G's (status == Gdead)
	gFree struct {
		gList
		n int32
	}

	sudogcache []*sudog
	sudogbuf   [128]*sudog

	tracebuf traceBufPtr

	// traceSweep indicates the sweep events should be traced.
	// This is used to defer the sweep start event until a span
	// has actually been swept.
	traceSweep bool
	// traceSwept and traceReclaimed track the number of bytes
	// swept and reclaimed by sweeping in the current sweep loop.
	traceSwept, traceReclaimed uintptr

	palloc persistentAlloc // per-P to avoid mutex

	_ uint32 // Alignment for atomic fields below

	// Per-P GC state
	gcAssistTime         int64    // Nanoseconds in assistAlloc
	gcFractionalMarkTime int64    // Nanoseconds in fractional mark worker (atomic)
	gcBgMarkWorker       guintptr // (atomic)
	gcMarkWorkerMode     gcMarkWorkerMode

	// gcMarkWorkerStartTime is the nanotime() at which this mark
	// worker started.
	gcMarkWorkerStartTime int64

	// gcw is this P's GC work buffer cache. The work buffer is
	// filled by write barriers, drained by mutator assists, and
	// disposed on certain GC state transitions.
	gcw gcWork

	// wbBuf is this P's GC write barrier buffer.
	//
	// TODO: Consider caching this in the running G.
	wbBuf wbBuf

	runSafePointFn uint32 // if 1, run sched.safePointFn at next safe point

	pad cpu.CacheLinePad
}
```

2.2.4 The schedt struct

The schedt struct holds the scheduler's state, including the global run queue of goroutines.

```go
type schedt struct {
	// accessed atomically. keep at top to ensure alignment on 32-bit systems.
	goidgen  uint64
	lastpoll uint64

	lock mutex

	// When increasing nmidle, nmidlelocked, nmsys, or nmfreed, be
	// sure to call checkdead().

	// Linked list of idle worker threads.
	midle muintptr // idle m's waiting for work
	// Number of idle worker threads.
	nmidle       int32 // number of idle m's waiting for work
	nmidlelocked int32 // number of locked m's waiting for work
	mnext        int64 // number of m's that have been created and next M ID
	// At most maxmcount worker threads may be created.
	maxmcount int32 // maximum number of m's allowed (or die)
	nmsys     int32 // number of system m's not counted for deadlock
	nmfreed   int64 // cumulative number of freed m's

	ngsys uint32 // number of system goroutines; updated atomically

	// Linked list of idle p objects.
	pidle puintptr // idle p's
	// Number of idle p objects.
	npidle     uint32
	nmspinning uint32 // See "Worker thread parking/unparking" comment in proc.go.

	// Global runnable queue.
	runq     gQueue
	runqsize int32

	// ......

	// Global cache of dead G's.
	// gFree is a list of the g objects of goroutines that have exited.
	// It caches g objects so that creating a goroutine does not always
	// require a fresh allocation.
	gFree struct {
		lock    mutex
		stack   gList // Gs with stacks
		noStack gList // Gs without stacks
		n       int32
	}

	// ......
}
```

2.2.5 Related global variables

```go
allgs []*g // all g's
allm  *m   // linked list of all m's, including m0 below
allp  []*p // all p's; len(allp) == gomaxprocs, the configured number of P's

ncpu       int32 // number of CPU cores, initialized by runtime code at startup
gomaxprocs int32 // maximum number of p's; defaults to ncpu but can be changed via GOMAXPROCS

sched schedt // the scheduler object, recording the scheduler's state

m0 m // the process's main thread
g0 g // m0's g0, i.e. m0.g0 = &g0
```
When the program initializes, every variable starts at its type's zero value, so right after startup allgs, allm, and allp contain no g, m, or p at all.
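The gomaxprocs variable above is the value read and written by the standard library's runtime.GOMAXPROCS; a quick runnable check:

```go
package main

import (
	"fmt"
	"runtime"
)

func main() {
	// GOMAXPROCS(0) only queries the current setting (== len(allp)).
	fmt.Println("NumCPU:    ", runtime.NumCPU())      // ncpu
	fmt.Println("GOMAXPROCS:", runtime.GOMAXPROCS(0)) // gomaxprocs, defaults to NumCPU
}
```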

3. Scheduling strategy

That was a big pile of source. So how do these structs cooperate to do scheduling? Since we're talking about scheduling strategy, let's see what actually happens inside the scheduling function.


```go
// One round of scheduling: find a runnable goroutine and execute it.
// This function never returns.
func schedule() {
	// getg returns the pointer to the current g.
	_g_ := getg()

	if _g_.m.locks != 0 {
		throw("schedule: holding locks")
	}

	if _g_.m.lockedg != 0 {
		stoplockedm()
		execute(_g_.m.lockedg.ptr(), false) // Never returns.
	}

	// We should not schedule away from a g that is executing a cgo call,
	// since the cgo call is using the m's g0 stack.
	if _g_.m.incgo {
		throw("schedule: in cgo")
	}

top:
	if sched.gcwaiting != 0 {
		gcstopm()
		goto top
	}
	if _g_.m.p.ptr().runSafePointFn != 0 {
		runSafePointFn()
	}

	var gp *g
	var inheritTime bool

	// Normal goroutines will check for need to wakeP in ready,
	// but GCworkers and tracereaders will not, so the check must
	// be done here instead.
	tryWakeP := false
	if trace.enabled || trace.shutdown {
		gp = traceReader()
		if gp != nil {
			casgstatus(gp, _Gwaiting, _Grunnable)
			traceGoUnpark(gp, 0)
			tryWakeP = true
		}
	}
	if gp == nil && gcBlackenEnabled != 0 {
		gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
		tryWakeP = tryWakeP || gp != nil
	}

	// If gp is still nil:
	if gp == nil {
		// Check the global runnable queue once in a while to ensure fairness.
		// Otherwise two goroutines can completely occupy the local runqueue
		// by constantly respawning each other.

		// Every 61st scheduler tick, a worker thread takes a goroutine
		// from the global run queue, provided that queue is non-empty.
		if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
			// sched is the schedt instance, a variable defined in runtime/runtime2.go.
			// The global run queue is shared, so it must be locked.
			lock(&sched.lock)

			// Take one runnable g from the global run queue.
			gp = globrunqget(_g_.m.p.ptr(), 1)

			unlock(&sched.lock)
		}
	}

	// Still no g: try the local run queue.
	if gp == nil {
		gp, inheritTime = runqget(_g_.m.p.ptr())
		if gp != nil && _g_.m.spinning {
			throw("schedule: spinning with local work")
		}
	}

	// If neither the local nor the global run queue yielded a goroutine,
	// call findrunnable to steal from other worker threads' run queues.
	// If stealing fails too, the current worker thread blocks; findrunnable
	// only returns once it has obtained a runnable goroutine.
	// findrunnable uses the label "top" internally to retry (spin).
	if gp == nil {
		gp, inheritTime = findrunnable() // blocks until work is available
	}

	// This thread is going to run a goroutine and is not spinning anymore,
	// so if it was marked as spinning we need to reset it now and potentially
	// start a new spinning M.
	if _g_.m.spinning {
		resetspinning()
	}

	if sched.disable.user && !schedEnabled(gp) {
		// Scheduling of this goroutine is disabled. Put it on
		// the list of pending runnable goroutines for when we
		// re-enable user scheduling and look again.
		lock(&sched.lock)
		if schedEnabled(gp) {
			// Something re-enabled scheduling while we
			// were acquiring the lock.
			unlock(&sched.lock)
		} else {
			sched.disable.runnable.pushBack(gp)
			sched.disable.n++
			unlock(&sched.lock)
			goto top
		}
	}

	// If about to schedule a not-normal goroutine (a GCworker or tracereader),
	// wake a P if there is one.
	if tryWakeP {
		if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 {
			wakep()
		}
	}
	if gp.lockedm != 0 {
		// Hands off own p to the locked m,
		// then blocks waiting for a new p.
		startlockedm(gp)
		goto top
	}

	execute(gp, inheritTime)
}
```


globrunqget is defined as follows:

```go
// globrunqget takes a batch of goroutines off the global run queue.
func globrunqget(_p_ *p, max int32) *g {
	// This function is not thread-safe: callers must hold sched.lock.
	// If the global run queue is empty, return nil.
	if sched.runqsize == 0 {
		return nil
	}

	// Spread the g's on the global run queue evenly across the p's.
	// gomaxprocs is set via runtime.GOMAXPROCS().
	// Why the +1? Quite simply: if sched.runqsize < gomaxprocs, then
	// sched.runqsize/gomaxprocs == 0, and without the +1 those g's
	// would never get picked up at all.
	n := sched.runqsize/gomaxprocs + 1

	// Guard against cases like sched.runqsize=2, gomaxprocs=1,
	// where n would be 3, exceeding sched.runqsize.
	if n > sched.runqsize {
		n = sched.runqsize
	}

	// The caller's max parameter bounds how many g's we take.
	if max > 0 && n > max {
		n = max
	}
	if n > int32(len(_p_.runq))/2 {
		n = int32(len(_p_.runq)) / 2
	}

	sched.runqsize -= n

	// Pop the runnable g's off the global run queue.
	gp := sched.runq.pop()
	n--
	for ; n > 0; n-- {
		gp1 := sched.runq.pop()
		// Put the g's taken from the global queue onto the local queue.
		runqput(_p_, gp1, false)
	}
	return gp
}
```
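To make the batch-size arithmetic concrete, here is the same computation extracted into a standalone function and run on a few hypothetical inputs (the numbers are my own illustration, not from the source):

```go
package main

import "fmt"

// batch mirrors globrunqget's batch-size computation.
func batch(runqsize, gomaxprocs, max, lenRunq int32) int32 {
	n := runqsize/gomaxprocs + 1
	if n > runqsize {
		n = runqsize
	}
	if max > 0 && n > max {
		n = max
	}
	if n > lenRunq/2 {
		n = lenRunq / 2
	}
	return n
}

func main() {
	fmt.Println(batch(100, 4, 0, 256)) // findrunnable's call (max=0): 26
	fmt.Println(batch(100, 4, 1, 256)) // schedule's fairness call (max=1): 1
	fmt.Println(batch(2, 1, 0, 256))   // the n > runqsize guard: 2, not 3
}
```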


And findrunnable:

```go
// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
	_g_ := getg()

	// The conditions here and in handoffp must agree: if
	// findrunnable would return a G to run, handoffp must start
	// an M.

	// If no g is found in the other p's local run queues either,
	// control jumps back to this label.
top:
	_p_ := _g_.m.p.ptr()
	if sched.gcwaiting != 0 {
		gcstopm()
		goto top
	}
	if _p_.runSafePointFn != 0 {
		runSafePointFn()
	}
	if fingwait && fingwake {
		if gp := wakefing(); gp != nil {
			ready(gp, 0, true)
		}
	}
	if *cgo_yield != nil {
		asmcgocall(*cgo_yield, nil)
	}

	// Before stealing from other p's local run queues,
	// check our own local run queue once more; failing that,
	// check the global run queue.
	if gp, inheritTime := runqget(_p_); gp != nil {
		return gp, inheritTime
	}

	// global runq
	if sched.runqsize != 0 {
		lock(&sched.lock)
		gp := globrunqget(_p_, 0)
		unlock(&sched.lock)
		if gp != nil {
			return gp, false
		}
	}

	// Poll network.
	// This netpoll is only an optimization before we resort to stealing.
	// We can safely skip it if there are no waiters or a thread is blocked
	// in netpoll already. If there is any kind of logical race with that
	// blocked thread (e.g. it has already returned from netpoll, but does
	// not set lastpoll yet), this thread will do blocking netpoll below
	// anyway.
	if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
		if list := netpoll(false); !list.empty() { // non-blocking
			gp := list.pop()
			injectglist(&list)
			casgstatus(gp, _Gwaiting, _Grunnable)
			if trace.enabled {
				traceGoUnpark(gp, 0)
			}
			return gp, false
		}
	}

	// Steal work from other P's.
	// Nothing was found in the local or global queues,
	// so steal from other p's local run queues.
	procs := uint32(gomaxprocs)
	if atomic.Load(&sched.npidle) == procs-1 {
		// Either GOMAXPROCS=1 or everybody, except for us, is idle already.
		// New work can appear from returning syscall/cgocall, network or timers.
		// Neither of that submits to local run queues, so no point in stealing.
		goto stop
	}
	// If number of spinning M's >= number of busy P's, block.
	// This is necessary to prevent excessive CPU consumption
	// when GOMAXPROCS>>1 but the program parallelism is low.
	if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
		goto stop
	}
	if !_g_.m.spinning {
		_g_.m.spinning = true
		atomic.Xadd(&sched.nmspinning, 1)
	}
	for i := 0; i < 4; i++ {
		for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
			if sched.gcwaiting != 0 {
				goto top
			}
			stealRunNextG := i > 2 // first look for ready queues with more than 1 g
			if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
				return gp, false
			}
		}
	}

stop:

	// We have nothing to do. If we're in the GC mark phase, can
	// safely scan and blacken objects, and have work to do, run
	// idle-time marking rather than give up the P.
	if gcBlackenEnabled != 0 && _p_.gcBgMarkWorker != 0 && gcMarkWorkAvailable(_p_) {
		_p_.gcMarkWorkerMode = gcMarkWorkerIdleMode
		gp := _p_.gcBgMarkWorker.ptr()
		casgstatus(gp, _Gwaiting, _Grunnable)
		if trace.enabled {
			traceGoUnpark(gp, 0)
		}
		return gp, false
	}

	// wasm only:
	// If a callback returned and no other goroutine is awake,
	// then pause execution until a callback was triggered.
	if beforeIdle() {
		// At least one goroutine got woken.
		goto top
	}

	// Before we drop our P, make a snapshot of the allp slice,
	// which can change underfoot once we no longer block
	// safe-points. We don't need to snapshot the contents because
	// everything up to cap(allp) is immutable.
	allpSnapshot := allp

	// return P and block
	lock(&sched.lock)
	if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 {
		unlock(&sched.lock)
		goto top
	}
	if sched.runqsize != 0 {
		gp := globrunqget(_p_, 0)
		unlock(&sched.lock)
		return gp, false
	}
	if releasep() != _p_ {
		throw("findrunnable: wrong p")
	}
	pidleput(_p_)
	unlock(&sched.lock)

	// Delicate dance: thread transitions from spinning to non-spinning state,
	// potentially concurrently with submission of new goroutines. We must
	// drop nmspinning first and then check all per-P queues again (with
	// #StoreLoad memory barrier in between). If we do it the other way around,
	// another thread can submit a goroutine after we've checked all run queues
	// but before we drop nmspinning; as the result nobody will unpark a thread
	// to run the goroutine.
	// If we discover new work below, we need to restore m.spinning as a signal
	// for resetspinning to unpark a new worker thread (because there can be more
	// than one starving goroutine). However, if after discovering new work
	// we also observe no idle Ps, it is OK to just park the current thread:
	// the system is fully loaded so no spinning threads are required.
	// Also see "Worker thread parking/unparking" comment at the top of the file.
	wasSpinning := _g_.m.spinning
	if _g_.m.spinning {
		_g_.m.spinning = false
		if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
			throw("findrunnable: negative nmspinning")
		}
	}

	// check all runqueues once again
	for _, _p_ := range allpSnapshot {
		if !runqempty(_p_) {
			lock(&sched.lock)
			_p_ = pidleget()
			unlock(&sched.lock)
			if _p_ != nil {
				acquirep(_p_)
				if wasSpinning {
					_g_.m.spinning = true
					atomic.Xadd(&sched.nmspinning, 1)
				}
				goto top
			}
			break
		}
	}

	// Check for idle-priority GC work again.
	if gcBlackenEnabled != 0 && gcMarkWorkAvailable(nil) {
		lock(&sched.lock)
		_p_ = pidleget()
		if _p_ != nil && _p_.gcBgMarkWorker == 0 {
			pidleput(_p_)
			_p_ = nil
		}
		unlock(&sched.lock)
		if _p_ != nil {
			acquirep(_p_)
			if wasSpinning {
				_g_.m.spinning = true
				atomic.Xadd(&sched.nmspinning, 1)
			}
			// Go back to idle GC check.
			goto stop
		}
	}

	// poll network
	if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
		if _g_.m.p != 0 {
			throw("findrunnable: netpoll with p")
		}
		if _g_.m.spinning {
			throw("findrunnable: netpoll with spinning")
		}
		list := netpoll(true) // block until new work is available
		atomic.Store64(&sched.lastpoll, uint64(nanotime()))
		if !list.empty() {
			lock(&sched.lock)
			_p_ = pidleget()
			unlock(&sched.lock)
			if _p_ != nil {
				acquirep(_p_)
				gp := list.pop()
				injectglist(&list)
				casgstatus(gp, _Gwaiting, _Grunnable)
				if trace.enabled {
					traceGoUnpark(gp, 0)
				}
				return gp, false
			}
			injectglist(&list)
		}
	}
	stopm()
	goto top
}
```

That was the source analysis. To recap schedule(), the function that drives goroutine scheduling (see the condensed sketch after this list):

    1. When gp is nil, schedule() first tries the global run queue, but only on roughly 1 in 61 scheduler ticks (schedtick%61 == 0) and only if the global run queue is non-empty; in that case it takes one runnable g from the global queue.
    2. On the other ticks (schedtick%61 != 0), or if nothing was obtained from the global queue, it takes a g from the p's local run queue.
    3. If neither the global nor the local run queue yields a g, it calls findrunnable(). findrunnable() first re-checks the local and global queues; failing that, it tries to steal g's from other P's. If it still finds nothing, findrunnable() spins/parks until work appears, which means schedule() effectively blocks there.
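Condensed into pseudocode (a deliberate simplification of the real logic above; the helper names are made up, and GC workers, trace readers, and locked g's are omitted):

```go
// A hedged restatement of schedule()'s search order; not real runtime code.
func scheduleSimplified(p *P) *G {
	// 1. Fairness check: every 61st tick, look at the global queue first.
	if p.schedtick%61 == 0 {
		if g := globalRunqGet(); g != nil {
			return g
		}
	}
	// 2. The P's local run queue (the lock-free fast path).
	if g := localRunqGet(p); g != nil {
		return g
	}
	// 3. findrunnable: local again, then global, then netpoll, then steal
	//    from other P's; otherwise park this worker thread until work appears.
	return findRunnableBlocking(p)
}
```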
