Back

M找工作

M找工作

从本地队列中找goroutine

// Get g from local runnable queue.
// If inheritTime is true, gp should inherit the remaining time in the
// current time slice. Otherwise, it should start a new time slice.
// Executed only by the owner P.
// 从本地可运行队列中获取 g。 如果inheritTime为true,gp应该继承当前时间片的剩余时间。 否则,它应该开始一个新的时间片。 仅由所有者 P 执行。
func runqget(_p_ *p) (gp *g, inheritTime bool) {
	// If there's a runnext, it's the next G to run.
    // 如果有 runnext,它就是下一个要运行的 G。
	next := _p_.runnext
	// If the runnext is non-0 and the CAS fails, it could only have been stolen by another P,
	// because other Ps can race to set runnext to 0, but only the current P can set it to non-0.
	// Hence, there's no need to retry this CAS if it falls.
    // 如果runnext不为空
    // 用原子操作获取 runnext,并将其值修改为 0,也就是空。这里用到原子操作的原因是防止在这个过程中,有其他线程过来“偷工作”,导致并发修改 runnext 成员。
	if next != 0 && _p_.runnext.cas(next, 0) {
        // 返回next所指向的G,且需要继承时间片
		return next.ptr(), true
	}
    // 在尝试获取runnext失败后,尝试从本地队列中返回队列头的goroutine
	for {
        // 获取队列头,原子操作
		h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
		// 获取队列尾,不使用原子操作是因为不用担心其他线程同时修改,(偷工作只会修改队列头)
        t := _p_.runqtail
		if t == h {
            // 队列头和尾相等,说明队列为空,找不到g
			return nil, false
		}
        // 获取队列头的g,算出队列头指向的goroutine
		gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()
        // CAS原子操作尝试修改队列头,防止这中间被其他线程因为偷工作而修改
		if atomic.CasRel(&_p_.runqhead, h, h+1) { // cas-release, commits consume
			return gp, false
		}
	}
}

从全局队列中找goroutine

// Check the global runnable queue once in a while to ensure fairness.
// Otherwise two goroutines can completely occupy the local runqueue
// by constantly respawning each other.
// 为了公平,每调用schedule调度函数61次,就要从全局可运行goroutine队列中获取
// 从全局队列中获取需要上锁
if _p_.schedtick%61 == 0 && sched.runqsize > 0 {
    lock(&sched.lock)
    // 从全局队列中最大获取1个goroutine
    gp = globrunqget(_p_, 1)
    unlock(&sched.lock)
    if gp != nil {
        return gp, false, false
    }
}

从全局队列中获取goroutine:

首先根据全局队列的可运行 goroutine 长度和 P 的总数,来计算一个数值,表示每个 P 可平均分到的 goroutine 数量。

然后根据函数参数中的 max 以及 P 本地队列的长度来决定把多少全局队列中的 goroutine 转移到 P 本地。

最后,for 循环挨个把全局队列中 n-1 个 goroutine 转移到本地,并且返回最开始获取到的队列头所指向的 goroutine,毕竟它最需要得到运行的机会。

// Try get a batch of G's from the global runnable queue.
// sched.lock must be held.
// 尝试从全局队列中获取一批goroutine
func globrunqget(_p_ *p, max int32) *g {
	assertLockHeld(&sched.lock)
	// 如果队列为空,返回nil
	if sched.runqsize == 0 {
		return nil
	}
	// 根据p的数量,平分全局运行队列中的goroutines
	n := sched.runqsize/gomaxprocs + 1
	if n > sched.runqsize {
		n = sched.runqsize // 如果 gomaxprocs 为 1
	}
    // 修正"偷"的数量
	if max > 0 && n > max {
		n = max
	}
    // 最多只能"偷"本地工作队列一半的数据
	if n > int32(len(_p_.runq))/2 {
		n = int32(len(_p_.runq)) / 2
	}
	// 更新全局可运行队列的长度
	sched.runqsize -= n
    // 从队列头中获取g
	gp := sched.runq.pop()
	n--
    // 批量将全局队列中的g转移到本地队列
	for ; n > 0; n-- {
		gp1 := sched.runq.pop()
        // 将gp1放入p本地,使全局队列得到更多的执行机会
		runqput(_p_, gp1, false)
	}
    // 返回最开始获取到的队列头所指向的 goroutine
	return gp
}

从其他P中窃取

// 从其他地方找goroutine来执行
func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
	_g_ := getg()

	// The conditions here and in handoffp must agree: if
	// findrunnable would return a G to run, handoffp must start
	// an M.

top:
    // 省略了一部分代码
	....

	// local runq
    // 从本地队列中获取
	if gp, inheritTime := runqget(_p_); gp != nil {
		return gp, inheritTime, false
	}

	// global runq
    // 从全局队列的头部获取一个g
	if sched.runqsize != 0 {
		lock(&sched.lock)
		gp := globrunqget(_p_, 0)
		unlock(&sched.lock)
		if gp != nil {
			return gp, false, false
		}
	}
	// 省略代码
	............

	// Spinning Ms: steal work from other Ps.
	procs := uint32(gomaxprocs)
   
	if _g_.m.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) {
		if !_g_.m.spinning {
            // 设置为自旋状态
			_g_.m.spinning = true
            // 自旋状态加1
			atomic.Xadd(&sched.nmspinning, 1)
		}
		// 窃取工作,从其他p的本地队列中窃取goroutine
		gp, inheritTime, tnow, w, newWork := stealWork(now)
		now = tnow
		if gp != nil {
			// Successfully stole.
			return gp, inheritTime, false
		}
		if newWork {
			// There may be new timer or GC work; restart to
			// discover.
			goto top
		}
		if w != 0 && (pollUntil == 0 || w < pollUntil) {
			// Earlier timer to wait for.
			pollUntil = w
		}
	}

	// 省略代码
    .......

	// return P and block
	lock(&sched.lock)
	if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 {
		unlock(&sched.lock)
		goto top
	}
	if sched.runqsize != 0 {
		gp := globrunqget(_p_, 0)
		unlock(&sched.lock)
		return gp, false, false
	}
    // // 当前工作线程解除与 p 之间的绑定,准备去休眠
	if releasep() != _p_ {
		throw("findrunnable: wrong p")
	}
    // 把p放入空闲队列
	now = pidleput(_p_, now)
	unlock(&sched.lock)

	wasSpinning := _g_.m.spinning
	if _g_.m.spinning {
         // M即将休眠不在处于自旋状态
		_g_.m.spinning = false
		if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
			throw("findrunnable: negative nmspinning")
		}

		// Check all runqueues once again.
        // 再次检查所有运行队列
		_p_ = checkRunqsNoP(allpSnapshot, idlepMaskSnapshot)
		if _p_ != nil {
			acquirep(_p_)
			_g_.m.spinning = true
			atomic.Xadd(&sched.nmspinning, 1)
			goto top
		}
		// 省略代码
		.....
	}
	// 省略代码
	.....
    // 休眠
	stopm()
	goto top
}
// 窃取工作,从其他p的本地队列中窃取goroutine
// 
func stealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWork bool) {
	pp := getg().m.p.ptr()

	ranTimer := false
	// 会尝试4次扫遍所有的G
	const stealTries = 4
	for i := 0; i < stealTries; i++ {
		stealTimersOrRunNextG := i == stealTries-1
		// 第二层循环,不是每次按照固定的顺序去遍历所有的G
        // 遍历所有的g,每次遍历开始时随机选择一个位置,直到所有p遍历完
		for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
			if sched.gcwaiting != 0 {
				// GC work may be available.
				return nil, false, now, pollUntil, true
			}
            // 随机获取一个p
			p2 := allp[enum.position()]
			if pp == p2 {
				continue
			}

			// Steal timers from p2. This call to checkTimers is the only place
			// where we might hold a lock on a different P's timers. We do this
			// once on the last pass before checking runnext because stealing
			// from the other P's runnext should be the last resort, so if there
			// are timers to steal do that first.
			//
			// We only check timers on one of the stealing iterations because
			// the time stored in now doesn't change in this loop and checking
			// the timers for each P more than once with the same value of now
			// is probably a waste of time.
			//
			// timerpMask tells us whether the P may have timers at all. If it
			// can't, no need to check at all.
			if stealTimersOrRunNextG && timerpMask.read(enum.position()) {
				tnow, w, ran := checkTimers(p2, now)
				now = tnow
				if w != 0 && (pollUntil == 0 || w < pollUntil) {
					pollUntil = w
				}
				if ran {
					// Running the timers may have
					// made an arbitrary number of G's
					// ready and added them to this P's
					// local run queue. That invalidates
					// the assumption of runqsteal
					// that it always has room to add
					// stolen G's. So check now if there
					// is a local G to run.
					if gp, inheritTime := runqget(pp); gp != nil {
						return gp, inheritTime, now, pollUntil, ranTimer
					}
					ranTimer = true
				}
			}

			// Don't bother to attempt to steal if p2 is idle.
            // 确定好准备偷的对象后
			if !idlepMask.read(enum.position()) {
                // 调用runqsteal执行
				if gp := runqsteal(pp, p2, stealTimersOrRunNextG); gp != nil {
					return gp, false, now, pollUntil, ranTimer
				}
			}
		}
	}

	// No goroutines found to steal. Regardless, running a timer may have
	// made some goroutine ready that we missed. Indicate the next timer to
	// wait for.
	return nil, false, now, pollUntil, ranTimer
}

runqsteal:

// 从 p2 的本地可运行队列中窃取一半元素并放入 p 的本地可运行队列中。 返回被盗元素之一(如果失败则返回 nil)。
// 从p2中偷走一半的工作放入到_p_的本地
func runqsteal(_p_, p2 *p, stealRunNextG bool) *g {
    // 队尾
	t := _p_.runqtail
    // 从p2中偷取工作放入到_p_.runq队尾
	n := runqgrab(p2, &_p_.runq, t, stealRunNextG)
	if n == 0 {
		return nil
	}
	n--
    // 找到最后一个g准备返回
	gp := _p_.runq[(t+n)%uint32(len(_p_.runq))].ptr()
	if n == 0 {
		return gp
	}
    // 队列头
	h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers
    // 判断是否偷太多
	if t-h+n >= uint32(len(_p_.runq)) {
		throw("runqsteal: runq overflow")
	}
    // 更新队尾,将偷来的工作加入队列
	atomic.StoreRel(&_p_.runqtail, t+n) // store-release, makes the item available for consumption
	return gp
}

runqgrab:

// 从p2中偷取工作放入到_p_.runq队尾
n := runqgrab(p2, &_p_.runq, t, stealRunNextG)
// Grabs a batch of goroutines from _p_'s runnable queue into batch.
// Batch is a ring buffer starting at batchHead.
// Returns number of grabbed goroutines.
// Can be executed by any P.
// 从_p_中获取可运行的 goroutine 放到batch数组里
// batch是一个环,起始于batchead
// 返回偷的数量,返回的goroutine可被任何p执行
func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
	for {
        // 队列头
		h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
		// 队列尾
        t := atomic.LoadAcq(&_p_.runqtail) // load-acquire, synchronize with the producer
		//g的数量
        n := t - h
        // 取一半
		n = n - n/2
		if n == 0 {
			if stealRunNextG {
				// Try to steal from _p_.runnext.
                // 从runnext中偷,没有人性
				if next := _p_.runnext; next != 0 {
					if _p_.status == _Prunning {
				 // 这里是为了防止 _p_ 执行当前 g,并且马上就要阻塞,所以会马上执行 runnext,
                    // 这个时候偷就没必要了,因为让 g 在 P 之间"游走"不太划算,
                    // 就不偷了,给他们一个机会。
                    // channel 一次同步的的接收发送需要 50ns 左右,因此 3us 差不多给了他们 50 次机会了,做得还是不错的
						if GOOS != "windows" && GOOS != "openbsd" && GOOS != "netbsd" {
							usleep(3)
						} else {
							// On some platforms system timer granularity is
							// 1-15ms, which is way too much for this
							// optimization. So just yield.
							osyield()
						}
					}
					if !_p_.runnext.cas(next, 0) {
						continue
					}
                    // 真的偷走了next
					batch[batchHead%uint32(len(batch))] = next
                    // 返回偷的数量,只有1个
					return 1
				}
			}
			return 0
		}
       // 如果 n 这时变得太大了,重新来一遍了,不能偷的太多,做得太过分了
		if n > uint32(len(_p_.runq)/2) { // read inconsistent h and t
			continue
		}
        // 将G放入到batch中
		for i := uint32(0); i < n; i++ {
			g := _p_.runq[(h+i)%uint32(len(_p_.runq))]
			batch[(batchHead+i)%uint32(len(batch))] = g
		}
        // 工作被偷走了,更新一下队列头指针
		if atomic.CasRel(&_p_.runqhead, h, h+n) { // cas-release, commits consume
			return n
		}
	}
}

releasep

解出P与M的关联

// 解除 p 与 m 的关联
func releasep() *p {
	_g_ := getg()
	// .... 省略的代码
	_p_ := _g_.m.p.ptr()
	// .... 省略的代码
    // 清空一些字段
	_g_.m.p = 0
	_p_.m = 0
	_p_.status = _Pidle
	return _p_
}
Built with Hugo
Theme Stack designed by Jimmy
© Licensed Under CC BY-NC-SA 4.0