type g struct {
    // Stack parameters.
    // stack describes the actual stack memory: [stack.lo, stack.hi).
    // stackguard0 is the stack pointer compared in the Go stack growth prologue.
    // It is stack.lo+StackGuard normally, but can be StackPreempt to trigger a preemption.
    // stackguard1 is the stack pointer compared in the C stack growth prologue.
    // It is stack.lo+StackGuard on g0 and gsignal stacks.
    // It is ~0 on other goroutine stacks, to trigger a call to morestackc (and crash).
    // The stack used by this goroutine.
    stack stack // offset known to runtime/cgo
    // The next two fields are used for stack-overflow checks and automatic stack
    // growth; preemptive scheduling also uses stackguard0.
    stackguard0 uintptr // offset known to liblink
    stackguard1 uintptr // offset known to liblink

    _panic *_panic // innermost panic - offset known to liblink
    _defer *_defer // innermost defer

    // The worker thread (m) that is currently executing this goroutine.
    m *m // current m; offset known to arm liblink
    // Saved scheduling context, mainly the values of a few registers.
    sched gobuf
    syscallsp uintptr // if status==Gsyscall, syscallsp = sched.sp to use during gc
    syscallpc uintptr // if status==Gsyscall, syscallpc = sched.pc to use during gc
    stktopsp uintptr // expected sp at top of stack, to check in traceback
    param unsafe.Pointer // passed parameter on wakeup
    atomicstatus uint32
    stackLock uint32 // sigprof/scang lock; TODO: fold in to atomicstatus
    goid int64
    // schedlink points to the next g in the global run queue;
    // all g's in the global run queue form a linked list.
    schedlink guintptr

    waitsince int64 // approx time when the g become blocked
    waitreason waitReason // if status==Gwaiting
    // Preemption flag: set to true when this goroutine should be preempted.
    preempt bool // preemption signal, duplicates stackguard0 = stackpreempt
    paniconfault bool // panic (instead of crash) on unexpected fault address
    preemptscan bool // preempted g does scan for gc
    gcscandone bool // g has scanned stack; protected by _Gscan bit in status
    gcscanvalid bool // false at start of gc cycle, true if G has not run since last scan; TODO: remove?
    throwsplit bool // must not split stack
    raceignore int8 // ignore race detection events
    sysblocktraced bool // StartTrace has emitted EvGoInSyscall about this goroutine
    sysexitticks int64 // cputicks when syscall has returned (for tracing)
    traceseq uint64 // trace event sequencer
    tracelastp puintptr // last P emitted an event for this goroutine
    lockedm muintptr
    sig uint32
    writebuf []byte
    sigcode0 uintptr
    sigcode1 uintptr
    sigpc uintptr
    gopc uintptr // pc of go statement that created this goroutine
    ancestors *[]ancestorInfo // ancestor information goroutine(s) that created this goroutine (only used if debug.tracebackancestors)
    startpc uintptr // pc of goroutine function
    racectx uintptr
    waiting *sudog // sudog structures this g is waiting on (that have a valid elem ptr); in lock order
    cgoCtxt []uintptr // cgo traceback context
    labels unsafe.Pointer // profiler labels
    timer *timer // cached timer for time.Sleep
    selectDone uint32 // are we participating in a select and did someone win the race?

    // Per-G GC state

    // gcAssistBytes is this G's GC assist credit in terms of
    // bytes allocated. If this is positive, then the G has credit
    // to allocate gcAssistBytes bytes without assisting. If this
    // is negative, then the G must correct this by performing
    // scan work. We track this in bytes to make it fast to update
    // and check for debt in the malloc hot path. The assist ratio
    // determines how this corresponds to scan work debt.
    gcAssistBytes int64
}
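Two of these fields do most of the work during preemption: stackguard0 normally holds stack.lo+StackGuard, and the prologue the compiler inserts at the start of most functions compares the stack pointer against it; when the runtime wants to preempt the goroutine it overwrites stackguard0 with the stackPreempt sentinel, so the very next prologue check fails and control enters the runtime. The following is a minimal toy model of that decision only; toyG, checkPrologue, the guard size and the sentinel value are all invented here, and the real check is compiler-generated assembly, not Go:

package main

import "fmt"

// Toy model (not the runtime's real code): a goroutine descriptor with a
// stack range and a stackguard0 field, mirroring g.stack and g.stackguard0.
const stackPreempt = ^uintptr(0) // illustrative sentinel: no real stack pointer can reach it

type toyG struct {
    stackLo, stackHi uintptr
    stackguard0      uintptr
}

// checkPrologue mimics what the stack-growth prologue decides: compare the
// stack pointer against stackguard0 and either keep running, grow the stack,
// or enter the scheduler because preemption was requested.
func checkPrologue(g *toyG, sp uintptr) string {
    if sp >= g.stackguard0 {
        return "keep running"
    }
    if g.stackguard0 == stackPreempt {
        return "call into the scheduler (preemption requested)"
    }
    return "grow the stack"
}

func main() {
    const stackGuard = 880 // illustrative guard size, not the runtime's constant
    g := &toyG{stackLo: 0x1000, stackHi: 0x3000}
    g.stackguard0 = g.stackLo + stackGuard

    fmt.Println(checkPrologue(g, 0x2000)) // plenty of stack left
    fmt.Println(checkPrologue(g, 0x1100)) // close to stack.lo: grow

    g.stackguard0 = stackPreempt          // what the runtime does to request preemption
    fmt.Println(checkPrologue(g, 0x2000)) // now every prologue check traps into the runtime
}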
type p struct {
    id int32
    status uint32 // one of pidle/prunning/...
    link puintptr
    schedtick uint32 // incremented on every scheduler call
    syscalltick uint32 // incremented on every system call
    sysmontick sysmontick // last tick observed by sysmon
    m muintptr // back-link to associated m (nil if idle)
    mcache *mcache
    raceprocctx uintptr

    deferpool [5][]*_defer // pool of available defer structs of different sizes (see panic.go)
    deferpoolbuf [5][32]*_defer

    // Cache of goroutine ids, amortizes accesses to runtime·sched.goidgen.
    goidcache uint64
    goidcacheend uint64

    // Queue of runnable goroutines. Accessed without lock.
    // The local run queue of goroutines for this P.
    runqhead uint32 // head of the queue
    runqtail uint32 // tail of the queue
    runq [256]guintptr // a circular queue implemented with an array
    // runnext, if non-nil, is a runnable G that was ready'd by
    // the current G and should be run next instead of what's in
    // runq if there's time remaining in the running G's time
    // slice. It will inherit the time left in the current time
    // slice. If a set of goroutines is locked in a
    // communicate-and-wait pattern, this schedules that set as a
    // unit and eliminates the (potentially large) scheduling
    // latency that otherwise arises from adding the ready'd
    // goroutines to the end of the run queue.
    runnext guintptr

    // Available G's (status == Gdead)
    gFree struct {
        gList
        n int32
    }

    sudogcache []*sudog
    sudogbuf [128]*sudog

    tracebuf traceBufPtr

    // traceSweep indicates the sweep events should be traced.
    // This is used to defer the sweep start event until a span
    // has actually been swept.
    traceSweep bool
    // traceSwept and traceReclaimed track the number of bytes
    // swept and reclaimed by sweeping in the current sweep loop.
    traceSwept, traceReclaimed uintptr

    palloc persistentAlloc // per-P to avoid mutex

    _ uint32 // Alignment for atomic fields below

    // Per-P GC state
    gcAssistTime int64 // Nanoseconds in assistAlloc
    gcFractionalMarkTime int64 // Nanoseconds in fractional mark worker (atomic)
    gcBgMarkWorker guintptr // (atomic)
    gcMarkWorkerMode gcMarkWorkerMode

    // gcMarkWorkerStartTime is the nanotime() at which this mark
    // worker started.
    gcMarkWorkerStartTime int64

    // gcw is this P's GC work buffer cache. The work buffer is
    // filled by write barriers, drained by mutator assists, and
    // disposed on certain GC state transitions.
    gcw gcWork

    // wbBuf is this P's GC write barrier buffer.
    //
    // TODO: Consider caching this in the running G.
    wbBuf wbBuf

    runSafePointFn uint32 // if 1, run sched.safePointFn at next safe point

    ......
}
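runqhead, runqtail and runq form a fixed-size ring: both indices only ever increase and every access is taken modulo len(runq), while runnext bypasses the ring entirely. Below is a stripped-down sketch of that behaviour, assuming a queue of length 4, plain ints in place of guintptr, and no handling of the overflow-to-global-queue case; all of those simplifications (and the names toyP, runqput, runqget in this form) are mine, not the runtime's:

package main

import "fmt"

// toyP sketches how a P's local run queue indices cooperate.
type toyP struct {
    runqhead uint32
    runqtail uint32
    runq     [4]int // stands in for [256]guintptr; ints stand in for g's
    runnext  int    // 0 means "empty"; stands in for the runnext guintptr
}

// runqput appends g at the tail (ignoring the full case, which the runtime
// handles by spilling half of the queue to the global run queue).
func (p *toyP) runqput(g int, next bool) {
    if next {
        // A newly ready'd g can displace runnext; the old runnext goes to the tail.
        old := p.runnext
        p.runnext = g
        if old == 0 {
            return
        }
        g = old
    }
    p.runq[p.runqtail%uint32(len(p.runq))] = g
    p.runqtail++
}

// runqget prefers runnext, then pops from the head of the circular queue.
func (p *toyP) runqget() (int, bool) {
    if g := p.runnext; g != 0 {
        p.runnext = 0
        return g, true // true ~ inheritTime: runs in the remaining time slice
    }
    if p.runqhead == p.runqtail {
        return 0, false // queue empty
    }
    g := p.runq[p.runqhead%uint32(len(p.runq))]
    p.runqhead++
    return g, false
}

func main() {
    p := &toyP{}
    p.runqput(1, false)
    p.runqput(2, false)
    p.runqput(3, true) // g 3 becomes runnext
    for {
        g, inherit := p.runqget()
        if g == 0 {
            break
        }
        fmt.Println("run g", g, "inheritTime:", inherit)
    }
}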
type schedt struct {
    // accessed atomically. keep at top to ensure alignment on 32-bit systems.
    goidgen uint64
    lastpoll uint64

    lock mutex

    // When increasing nmidle, nmidlelocked, nmsys, or nmfreed, be
    // sure to call checkdead().

    // A linked list of idle worker threads.
    midle muintptr // idle m's waiting for work
    // The number of idle worker threads.
    nmidle int32 // number of idle m's waiting for work
    nmidlelocked int32 // number of locked m's waiting for work
    mnext int64 // number of m's that have been created and next M ID
    // At most maxmcount worker threads can be created.
    maxmcount int32 // maximum number of m's allowed (or die)
    nmsys int32 // number of system m's not counted for deadlock
    nmfreed int64 // cumulative number of freed m's

    ngsys uint32 // number of system goroutines; updated atomically

    // A linked list of idle p structs.
    pidle puintptr // idle p's
    // The number of idle p structs.
    npidle uint32
    nmspinning uint32 // See "Worker thread parking/unparking" comment in proc.go.

    // Global runnable queue.
    // The global run queue of goroutines.
    runq gQueue
    runqsize int32

    ......

    // Global cache of dead G's.
    // gFree is a list of the g structs of all goroutines that have exited.
    // It caches g structs so that creating a goroutine does not have to
    // allocate a fresh one every time.
    gFree struct {
        lock mutex
        stack gList // Gs with stacks
        noStack gList // Gs without stacks
        n int32
    }
    ......
}
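Several of these counters can be watched while a program runs: setting GODEBUG=schedtrace=1000 makes the runtime print a SCHED line roughly once per second that includes, among other counters, gomaxprocs, the number of idle Ps and threads, the global run queue length (sched.runqsize) and the per-P local run queue lengths. The program below is only an arbitrary load generator to make those numbers move; the goroutine count and loop bound are made up:

package main

import "sync"

// Run with:
//
//     GODEBUG=schedtrace=1000 go run .
//
// In each SCHED line, runqueue is the global run queue length, idleprocs
// corresponds to sched.npidle, and the bracketed numbers are the per-P
// local run queue lengths (runqtail - runqhead).
func main() {
    var wg sync.WaitGroup
    for i := 0; i < 200; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            x := 0
            for j := 0; j < 100000000; j++ { // arbitrary busy work
                x += j
            }
            _ = x
        }()
    }
    wg.Wait()
}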
// One round of scheduling: find a runnable g and run it. It has no return value.
func schedule() {
    // Get the current g.
    // getg returns the pointer to the current g.
    _g_ := getg()

    if _g_.m.locks != 0 {
        throw("schedule: holding locks")
    }

    if _g_.m.lockedg != 0 {
        stoplockedm()
        execute(_g_.m.lockedg.ptr(), false) // Never returns.
    }

    // We should not schedule away from a g that is executing a cgo call,
    // since the cgo call is using the m's g0 stack.
    if _g_.m.incgo {
        throw("schedule: in cgo")
    }

top:
    if sched.gcwaiting != 0 {
        gcstopm()
        goto top
    }
    if _g_.m.p.ptr().runSafePointFn != 0 {
        runSafePointFn()
    }

    var gp *g
    var inheritTime bool

    // Normal goroutines will check for need to wakeP in ready,
    // but GCworkers and tracereaders will not, so the check must
    // be done here instead.
    tryWakeP := false
    if trace.enabled || trace.shutdown {
        gp = traceReader()
        if gp != nil {
            casgstatus(gp, _Gwaiting, _Grunnable)
            traceGoUnpark(gp, 0)
            tryWakeP = true
        }
    }
    if gp == nil && gcBlackenEnabled != 0 {
        gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
        tryWakeP = tryWakeP || gp != nil
    }
    // If gp is still nil:
    if gp == nil {
        // Check the global runnable queue once in a while to ensure fairness.
        // Otherwise two goroutines can completely occupy the local runqueue
        // by constantly respawning each other.
        // Every worker thread takes a goroutine from the global run queue on
        // 1 out of every 61 scheduler ticks, and only when the global run
        // queue is not empty (its length is greater than 0).
        if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
            // sched is a variable of type schedt, defined in runtime/runtime2.go.
            // Lock the shared global run queue.
            lock(&sched.lock)
            // Take one runnable g from the global run queue.
            gp = globrunqget(_g_.m.p.ptr(), 1)
            // Unlock.
            unlock(&sched.lock)
        }
    }
    // If no g was obtained, try the local run queue next.
    if gp == nil {
        gp, inheritTime = runqget(_g_.m.p.ptr())
        if gp != nil && _g_.m.spinning {
            throw("schedule: spinning with local work")
        }
    }
    // If neither the local run queue nor the global run queue produced a
    // runnable goroutine, call findrunnable to steal from other worker
    // threads' run queues. If stealing fails as well, the current worker
    // thread blocks; findrunnable only returns once it has obtained a
    // runnable goroutine. findrunnable uses the label top internally to spin.
    if gp == nil {
        gp, inheritTime = findrunnable() // blocks until work is available
    }

    // This thread is going to run a goroutine and is not spinning anymore,
    // so if it was marked as spinning we need to reset it now and potentially
    // start a new spinning M.
    if _g_.m.spinning {
        resetspinning()
    }

    if sched.disable.user && !schedEnabled(gp) {
        // Scheduling of this goroutine is disabled. Put it on
        // the list of pending runnable goroutines for when we
        // re-enable user scheduling and look again.
        lock(&sched.lock)
        if schedEnabled(gp) {
            // Something re-enabled scheduling while we
            // were acquiring the lock.
            unlock(&sched.lock)
        } else {
            sched.disable.runnable.pushBack(gp)
            sched.disable.n++
            unlock(&sched.lock)
            goto top
        }
    }
    // If about to schedule a not-normal goroutine (a GCworker or tracereader),
    // wake a P if there is one.
    if tryWakeP {
        if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 {
            wakep()
        }
    }
    if gp.lockedm != 0 {
        // Hands off own p to the locked m,
        // then blocks waiting for a new p.
        startlockedm(gp)
        goto top
    }

    // Run the chosen goroutine on the current worker thread.
    execute(gp, inheritTime)
}
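Putting the lookup order together: the global queue on every 61st tick, then the local queue, then findrunnable. The toy selection loop below (toySched and pickNext are invented names; locking, GC workers and trace readers are ignored, and the tick is incremented at pick time rather than in execute) shows why the 1/61 check matters: with a local queue that never drains, the goroutine sitting in the global queue still gets picked on tick 61:

package main

import "fmt"

// toySched is an invented, simplified stand-in for one P plus the global queue.
type toySched struct {
    schedtick uint32
    globalQ   []string // stands in for sched.runq
    localQ    []string // stands in for the P's runq
}

func (s *toySched) pickNext() string {
    s.schedtick++
    // Every 61st tick, look at the global queue first so that goroutines
    // parked there are not starved by a permanently busy local queue.
    if s.schedtick%61 == 0 && len(s.globalQ) > 0 {
        g := s.globalQ[0]
        s.globalQ = s.globalQ[1:]
        return g
    }
    if len(s.localQ) > 0 {
        g := s.localQ[0]
        s.localQ = s.localQ[1:]
        return g
    }
    if len(s.globalQ) > 0 { // roughly what findrunnable would try next
        g := s.globalQ[0]
        s.globalQ = s.globalQ[1:]
        return g
    }
    return "" // findrunnable would go on to steal, poll the network, or park
}

func main() {
    s := &toySched{globalQ: []string{"g-global"}}
    // Keep the local queue permanently non-empty, as two goroutines
    // respawning each other would do.
    for i := 0; i < 122; i++ {
        s.localQ = append(s.localQ, fmt.Sprintf("g-local-%d", i))
        fmt.Printf("tick %3d -> %s\n", s.schedtick+1, s.pickNext())
    }
}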
// The definition of globrunqget: grab a batch of goroutines from the global
// run queue and move them to _p_'s local run queue. sched.lock must be held.
func globrunqget(_p_ *p, max int32) *g {
    if sched.runqsize == 0 {
        return nil
    }

    n := sched.runqsize/gomaxprocs + 1
    // This guards against cases such as sched.runqsize = 2 with gomaxprocs = 1,
    // where n would be 3 and exceed sched.runqsize.
    if n > sched.runqsize {
        n = sched.runqsize
    }
    // The number of g's taken is also bounded by the max argument ...
    if max > 0 && n > max {
        n = max
    }
    // ... and by half the capacity of the local run queue.
    if n > int32(len(_p_.runq))/2 {
        n = int32(len(_p_.runq)) / 2
    }

    sched.runqsize -= n

    // Pop the g that will be returned directly off the global run queue.
    gp := sched.runq.pop()
    n--
    for ; n > 0; n-- {
        gp1 := sched.runq.pop()
        // Put the other g's taken from the global queue into the local run queue.
        runqput(_p_, gp1, false)
    }
    return gp
}
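The batch size therefore works out to min(runqsize/gomaxprocs+1, runqsize, max if max > 0, len(runq)/2). The small helper below (globrunqBatch is an invented name) reproduces just that arithmetic so the corner cases are easy to verify:

package main

import "fmt"

// globrunqBatch reproduces only the batch-size arithmetic of globrunqget.
func globrunqBatch(runqsize, gomaxprocs, max, localCap int32) int32 {
    n := runqsize/gomaxprocs + 1
    if n > runqsize {
        n = runqsize // e.g. runqsize=2, gomaxprocs=1 gives n=3, clamp to 2
    }
    if max > 0 && n > max {
        n = max // the caller may ask for at most max goroutines
    }
    if n > localCap/2 {
        n = localCap / 2 // never fill more than half of the local queue
    }
    return n
}

func main() {
    // The corner case from the comment above: 2/1+1 = 3 would exceed the queue.
    fmt.Println(globrunqBatch(2, 1, 0, 256)) // 2
    // schedule() passes max=1, so it takes exactly one g.
    fmt.Println(globrunqBatch(100, 4, 1, 256)) // 1
    // findrunnable passes max=0 (no limit), so only the other caps apply.
    fmt.Println(globrunqBatch(1000, 4, 0, 256)) // 128
}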
// The definition of findrunnable.
// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
    _g_ := getg()

    // The conditions here and in handoffp must agree: if
    // findrunnable would return a G to run, handoffp must start
    // an M.

    // Code further down jumps back to top when nothing runnable was found
    // (for example in other Ps' local run queues) and the search starts over.
top:
    _p_ := _g_.m.p.ptr()
    if sched.gcwaiting != 0 {
        gcstopm()
        goto top
    }
    if _p_.runSafePointFn != 0 {
        runSafePointFn()
    }
    if fingwait && fingwake {
        if gp := wakefing(); gp != nil {
            ready(gp, 0, true)
        }
    }
    if *cgo_yield != nil {
        asmcgocall(*cgo_yield, nil)
    }
    // Before stealing from other Ps' local run queues, first try the local
    // run queue; if that yields nothing, try the global run queue.
    if gp, inheritTime := runqget(_p_); gp != nil {
        return gp, inheritTime
    }
    // global runq
    if sched.runqsize != 0 {
        lock(&sched.lock)
        gp := globrunqget(_p_, 0)
        unlock(&sched.lock)
        if gp != nil {
            return gp, false
        }
    }

    // Poll network.
    // This netpoll is only an optimization before we resort to stealing.
    // We can safely skip it if there are no waiters or a thread is blocked
    // in netpoll already. If there is any kind of logical race with that
    // blocked thread (e.g. it has already returned from netpoll, but does
    // not set lastpoll yet), this thread will do blocking netpoll below
    // anyway.
    if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
        if list := netpoll(false); !list.empty() { // non-blocking
            gp := list.pop()
            injectglist(&list)
            casgstatus(gp, _Gwaiting, _Grunnable)
            if trace.enabled {
                traceGoUnpark(gp, 0)
            }
            return gp, false
        }
    }
    // Steal work from other P's.
    // If no g was obtained from either the local or the global run queue,
    // try to steal from other Ps' local run queues.
    procs := uint32(gomaxprocs)
    if atomic.Load(&sched.npidle) == procs-1 {
        // Either GOMAXPROCS=1 or everybody, except for us, is idle already.
        // New work can appear from returning syscall/cgocall, network or timers.
        // Neither of that submits to local run queues, so no point in stealing.
        goto stop
    }
    // If number of spinning M's >= number of busy P's, block.
    // This is necessary to prevent excessive CPU consumption
    // when GOMAXPROCS>>1 but the program parallelism is low.
    if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
        goto stop
    }
    if !_g_.m.spinning {
        _g_.m.spinning = true
        atomic.Xadd(&sched.nmspinning, 1)
    }
    for i := 0; i < 4; i++ {
        for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
            if sched.gcwaiting != 0 {
                goto top
            }
            stealRunNextG := i > 2 // first look for ready queues with more than 1 g
            if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
                return gp, false
            }
        }
    }
stop:
    // We have nothing to do. If we're in the GC mark phase, can
    // safely scan and blacken objects, and have work to do, run
    // idle-time marking rather than give up the P.
    if gcBlackenEnabled != 0 && _p_.gcBgMarkWorker != 0 && gcMarkWorkAvailable(_p_) {
        _p_.gcMarkWorkerMode = gcMarkWorkerIdleMode
        gp := _p_.gcBgMarkWorker.ptr()
        casgstatus(gp, _Gwaiting, _Grunnable)
        if trace.enabled {
            traceGoUnpark(gp, 0)
        }
        return gp, false
    }

    // wasm only:
    // If a callback returned and no other goroutine is awake,
    // then pause execution until a callback was triggered.
    if beforeIdle() {
        // At least one goroutine got woken.
        goto top
    }

    // Before we drop our P, make a snapshot of the allp slice,
    // which can change underfoot once we no longer block
    // safe-points. We don't need to snapshot the contents because
    // everything up to cap(allp) is immutable.
    allpSnapshot := allp

    // return P and block
    lock(&sched.lock)
    if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 {
        unlock(&sched.lock)
        goto top
    }
    if sched.runqsize != 0 {
        gp := globrunqget(_p_, 0)
        unlock(&sched.lock)
        return gp, false
    }
    if releasep() != _p_ {
        throw("findrunnable: wrong p")
    }
    pidleput(_p_)
    unlock(&sched.lock)

    // Delicate dance: thread transitions from spinning to non-spinning state,
    // potentially concurrently with submission of new goroutines. We must
    // drop nmspinning first and then check all per-P queues again (with
    // #StoreLoad memory barrier in between). If we do it the other way around,
    // another thread can submit a goroutine after we've checked all run queues
    // but before we drop nmspinning; as the result nobody will unpark a thread
    // to run the goroutine.
    // If we discover new work below, we need to restore m.spinning as a signal
    // for resetspinning to unpark a new worker thread (because there can be more
    // than one starving goroutine). However, if after discovering new work
    // we also observe no idle Ps, it is OK to just park the current thread:
    // the system is fully loaded so no spinning threads are required.
    // Also see "Worker thread parking/unparking" comment at the top of the file.
    wasSpinning := _g_.m.spinning
    if _g_.m.spinning {
        _g_.m.spinning = false
        if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
            throw("findrunnable: negative nmspinning")
        }
    }

    // check all runqueues once again
    for _, _p_ := range allpSnapshot {
        if !runqempty(_p_) {
            lock(&sched.lock)
            _p_ = pidleget()
            unlock(&sched.lock)
            if _p_ != nil {
                acquirep(_p_)
                if wasSpinning {
                    _g_.m.spinning = true
                    atomic.Xadd(&sched.nmspinning, 1)
                }
                goto top
            }
            break
        }
    }

    // Check for idle-priority GC work again.
    if gcBlackenEnabled != 0 && gcMarkWorkAvailable(nil) {
        lock(&sched.lock)
        _p_ = pidleget()
        if _p_ != nil && _p_.gcBgMarkWorker == 0 {
            pidleput(_p_)
            _p_ = nil
        }
        unlock(&sched.lock)
        if _p_ != nil {
            acquirep(_p_)
            if wasSpinning {
                _g_.m.spinning = true
                atomic.Xadd(&sched.nmspinning, 1)
            }
            // Go back to idle GC check.
            goto stop
        }
    }

    // poll network
    if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
        if _g_.m.p != 0 {
            throw("findrunnable: netpoll with p")
        }
        if _g_.m.spinning {
            throw("findrunnable: netpoll with spinning")
        }
        list := netpoll(true) // block until new work is available
        atomic.Store64(&sched.lastpoll, uint64(nanotime()))
        if !list.empty() {
            lock(&sched.lock)
            _p_ = pidleget()
            unlock(&sched.lock)
            if _p_ != nil {
                acquirep(_p_)
                gp := list.pop()
                injectglist(&list)
                casgstatus(gp, _Gwaiting, _Grunnable)
                if trace.enabled {
                    traceGoUnpark(gp, 0)
                }
                return gp, false
            }
            injectglist(&list)
        }
    }
    stopm()
    goto top
}
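The stealing pass above delegates the real work to runqsteal, which moves roughly half of a victim P's local queue to the thief and, on the last iteration (stealRunNextG), may even take the victim's runnext. The sketch below captures only that "take half, run one, keep the rest" idea; toyQueue, stealHalf and the slice-based queue are invented here, and none of the runtime's atomic head/tail handling is modelled:

package main

import "fmt"

// toyQueue is a stand-in for a P's local run queue (a slice instead of the
// fixed ring, strings instead of g's).
type toyQueue struct {
    gs      []string
    runnext string
}

// stealHalf mimics the effect of a successful steal: take about half of the
// victim's runnable goroutines, return one to run immediately, and keep the
// rest in the thief's own queue.
func stealHalf(thief, victim *toyQueue, stealRunNext bool) string {
    n := (len(victim.gs) + 1) / 2
    if n == 0 {
        if stealRunNext && victim.runnext != "" {
            // Last resort (i > 2 in the loop above): take the victim's runnext.
            g := victim.runnext
            victim.runnext = ""
            return g
        }
        return ""
    }
    stolen := victim.gs[:n]
    victim.gs = victim.gs[n:]
    g := stolen[len(stolen)-1]
    thief.gs = append(thief.gs, stolen[:len(stolen)-1]...)
    return g
}

func main() {
    victim := &toyQueue{gs: []string{"g1", "g2", "g3", "g4", "g5"}, runnext: "g6"}
    thief := &toyQueue{}
    fmt.Println("run now:", stealHalf(thief, victim, false)) // g3
    fmt.Println("thief queue:", thief.gs)                    // [g1 g2]
    fmt.Println("victim queue:", victim.gs)                  // [g4 g5]

    empty := &toyQueue{runnext: "gNext"}
    fmt.Println("steal runnext:", stealHalf(thief, empty, true)) // gNext
}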