ReplicaSet Controller 实现

ReplicationController 的代码已经与 ReplicaSet 合并了。

与 Deployment 类似,ReplicaSet 的主要处理函数为 syncHandler,实际执行的是 syncReplicaSet,为实际的 worker。相比 Deployment,RS 没有将 enqueueRS 也抽成变量。这是因为 RS 的业务逻辑比较少,功能较为单一,测试需要的 case 也不需要非常侵入逻辑。

type ReplicaSetController struct {
	// To allow injection of syncReplicaSet for testing.
	syncHandler func(rsKey string) error
}

实现原理

实线表示持有关系,虚线表示数据流向。

ReplicaSet Controller Ling Samuel
Pod Event
RS Event
Enqueue ReplicaSet
Dequeue ReplicaSet
ReplicaSet Controller
ReplicaSet Informer
Pod Informer
EventHandler
Workers
Queue

informer

ReplicaSet Controller 会监听 ReplicaSet 的增改删,将它们入到工作队列中。当然,删除回调额外对 expectation 做了一些工作,这会在 expectation 一节讲述。

此外它还监听了 Pod 的增改删,其行为与 Deployment 十分相似。


addPod (src):

  • 如果 Pod 的 DeletionTimestamp 不为空(Controller 重启时可能发生),转交 deletePod 处理。
  • 如果 Pod 的 Controller 是 RS,那么入工作队列。
  • 检查是否有 RS 能够收养该 Pod,将能够收养的 RS 入工作队列。
func (rsc *ReplicaSetController) addPod(obj interface{}) {
	pod := obj.(*v1.Pod)

	if pod.DeletionTimestamp != nil {
		rsc.deletePod(pod)
		return
	}

	if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
		rs := rsc.resolveControllerRef(pod.Namespace, controllerRef)
		rsKey, err := controller.KeyFunc(rs)
		rsc.expectations.CreationObserved(rsKey)
		rsc.queue.Add(rsKey)
		return
	}

	rss := rsc.getPodReplicaSets(pod)
	if len(rss) == 0 {
		return
	}
	for _, rs := range rss {
		rsc.enqueueRS(rs)
	}
}

updatePod (src):

  • 检查新旧 Pod 的资源版本,如果一致(可能由 resync 触发),跳过。
  • 检查新 Pod 的 DeletionTimestamp 是否非空。如果是,将新 Pod 移交给删除逻辑 deletePod
    • 由于 grace deletion,删除 Pod 时 Kubelet 可能会等待一段时间才会进行删除。但 RS 应该尽快启动新的 Pod,因此在 Update 阶段就检测删除行为。
    • 如果新旧 Pod 的 Labels 不一致了,同样将旧 Pod 移交给删除逻辑。
      • 不需要检查旧 Pod 的删除时间,因为新 Pod 已经被删除了,旧 Pod 也不该继续存在。且因为 Labels 变动,旧 Pod 应该是由其他 RS 管理的,因此需要额外触发一次删除,以清理旧 RS。
  • 检查 controller 是否变动。如果变动了,也应该同步旧 rs。
  • 检查新 Pod 的 controller 是否是 rs,将其入工作队列。
    • 由于 Pod 的 MinReadySeconds 的存在,Pod 的状态可能由 NotReady 转向 Ready。应该在 MinReadySeconds 后同步 RS 的 Available Replicas 状态。
  • 只有孤儿 Pod 能走到这一步。检查 Label 或 controller 是否变动了,检查是否有 rs 能够收养,将能够收养的 RS 入工作队列。
func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
	curPod := cur.(*v1.Pod)
	oldPod := old.(*v1.Pod)
	if curPod.ResourceVersion == oldPod.ResourceVersion {
		return
	}

	labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
	if curPod.DeletionTimestamp != nil {
		rsc.deletePod(curPod)
		if labelChanged {
			rsc.deletePod(oldPod)
		}
		return
	}

	curControllerRef := metav1.GetControllerOf(curPod)
	oldControllerRef := metav1.GetControllerOf(oldPod)
	controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
	if controllerRefChanged && oldControllerRef != nil {
		if rs := rsc.resolveControllerRef(oldPod.Namespace, oldControllerRef); rs != nil {
			rsc.enqueueRS(rs)
		}
	}

	if curControllerRef != nil {
		rs := rsc.resolveControllerRef(curPod.Namespace, curControllerRef)
		rsc.enqueueRS(rs)
		if !podutil.IsPodReady(oldPod) && podutil.IsPodReady(curPod) && rs.Spec.MinReadySeconds > 0 {
			rsc.enqueueRSAfter(rs, (time.Duration(rs.Spec.MinReadySeconds)*time.Second)+time.Second)
		}
		return
	}

	if labelChanged || controllerRefChanged {
		rss := rsc.getPodReplicaSets(curPod)
		for _, rs := range rss {
			rsc.enqueueRS(rs)
		}
	}
}

deletePod (src):

  • 检查 controller 是否是 rs,若是,入工作队列。
func (rsc *ReplicaSetController) deletePod(obj interface{}) {
	pod, ok := obj.(*v1.Pod)
	controllerRef := metav1.GetControllerOf(pod)
	rs := rsc.resolveControllerRef(pod.Namespace, controllerRef)
	rsKey, err := controller.KeyFunc(rs)
	rsc.queue.Add(rsKey)
}

删除逻辑同样有对于 DeletedFinalStateUnknown 的处理,此处依然省略。

syncReplicaSet

syncReplicaSet 是实际的工作函数 (src)。

  • 取出 ReplicaSet 对象。
  • 检查缓存是否需要更新。
  • 取出所有 Pod(而不进行任何过滤),过滤掉不活跃(Phase 为 Succeeded 或 Failed,或 DeletionTimestamp 不为空)的 Pod。
    • 取出的 Pod 中可能包含并不符合当前 RS Selector 的 Pods。这是为了能够清理归属于当前 RS,但由于 Selector 变动,需要被释放的 Pod。
    • 由于 Pod OwnerReference 的 RS 依旧存在(只是 Selector 变动了),Garbage Collector 无法回收这部分 Pod。
    • 这部分逻辑交由 NewPodControllerRefManager (src) 完成,是一个通用的 adopt/release 实现。
  • 尝试收养 (claimPods)。这会返回一个符合 Selector 并进行了收养后的 Pod 列表。
  • 如果需要同步,执行 manageReplicas
  • 更新 Status。
    • 由于 ReplicaSet 的 Status 可能与创建/删除 Pod 时的错误有关,因此参数中有上一步的返回值。
  • 若 MinReadySeconds > 0,且 ReadyReplicas 或 AvailableReplicas 未达到最大值 (Spec.Replicas),在延时后重入工作队列。
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)

	rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
	selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)

	allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
	filteredPods := controller.FilterActivePods(allPods)

	filteredPods, err = rsc.claimPods(rs, selector, filteredPods)

	var manageReplicasErr error
	if rsNeedsSync && rs.DeletionTimestamp == nil {
		manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
	}

	rs = rs.DeepCopy()
	newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)
	updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)

	if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
		updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
		updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
		rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
	}
	return manageReplicasErr
}
Note:

本地 TTL 缓存 expectations 稍后会提到。

manageReplicas

实际进行扩缩容的工作函数。扩缩容可由数量差决定 (src):

func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
	diff := len(filteredPods) - int(*(rs.Spec.Replicas))

已有 Pod 更多时,diff > 0,需要缩容;反之则需要扩容。

在操作时,一次同步最多操作 500 个 Pod。这是一个固定写死的参数 (BurstReplicas = 500)。

	if diff < 0 {
		diff *= -1
		if diff > rsc.burstReplicas {
			diff = rsc.burstReplicas
		}
		// ...
	} else if diff > 0 {
		if diff > rsc.burstReplicas {
			diff = rsc.burstReplicas
		}
		// ...
	}

扩容

和 Deployment 不同。虽然 ReplicaSet 没有 MaxSurge/MaxUnavailable 参数,它依旧不能并发地创建所有所需的 Pod。

主要原因在于有许多情况下,Pod 的创建会因同一种原因失败(例如资源不足)。如果并发地创建所有的 Pod,可能会给 API Server 带来不必要的负载。

ReplicaSet 和一些其他的资源(例如 Job),使用称为 slowStart 的机制来创建 Pod。它会首先创建 1 个 Pod,然后每次加倍。任意一次创建不是全部成功,则直接退出,不再创建剩余 Pod。

注意,此处的创建函数有一个特判。当错误是 NS 被删除时,为了避免 RS 每次同步都重复操作,它会忽略掉这个报错,假装创建成功了。虽然可以提前终止整个创建流程,但目前来看这个行为不是一个很严重的性能问题。

	if diff < 0 {
		// ...
		successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
			err := rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
			if err != nil {
				if errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
					return nil
				}
			}
			return err
		})
		return err
	}

缩容

缩容要更加复杂一点。

在删除 Pod 时,它会首先删除在生命周期早期的 Pod。具体来说,它会按此顺序排序 Pod:

  • Unscheduled < Scheduled
  • Pending < Unknown < Running
  • Not Ready < Ready
  • 节点上相关 Pod 更多 < 节点上相关 Pod 更少
  • Running 时间更短 < Running 时间更长
  • Restart 次数更多 < Restart 次数更少
  • 创建时间更近 < 创建时间更早

这个顺序可以保证删除 Pod 时,尽量避免删除存活可用的 Pod,也能避免调度等流程重新被执行。

除了生命周期规则外,注意“节点上相关 Pod”(此处称为 Rank)这一规则。

由于 ReplicaSet 被设计成可能被其他控制器管理的资源,因此可能有多个 RS 管理着一组相似的 Pod(比如 Deployment)。

Related Pods Ling Samuel
Parent Owner
ReplicaSet 1
ReplicaSet 2
Pod 1
Pod 2
Pod 3
Pod 4
Node 1
Node 2

以上图为例,Pod 1~3 均分布在 Node 1 上,而 Pod 4 分布在 Node 2 上。且管理这些 Pod 的 ReplicaSet 拥有共同的 Owner。这时候,Pod 1~3 的 Rank 为 3,而 Pod 4 则为 1。

可以发现,如果保持相关 Pod 的 Rank 尽可能低,那么会拥有更好的可用性(Pod 更加分散了)。因此会在生命周期规则之后优先删除高 Rank 的 Pod。

	if diff > 0 {
		// ...
		relatedPods, err := rsc.getIndirectlyRelatedPods(rs)
		podsToDelete := getPodsToDelete(filteredPods, relatedPods, diff)

		var wg sync.WaitGroup
		wg.Add(diff)
		for _, pod := range podsToDelete {
			go func(targetPod *v1.Pod) {
				defer wg.Done()
				if err := rsc.podControl.DeletePod(rs.Namespace, targetPod.Name, rs); err != nil {
				}
			}(pod)
		}
		wg.Wait()
	}
}

func getPodsToDelete(filteredPods, relatedPods []*v1.Pod, diff int) []*v1.Pod {
	if diff < len(filteredPods) {
		podsWithRanks := getPodsRankedByRelatedPodsOnSameNode(filteredPods, relatedPods)
		sort.Sort(podsWithRanks)
	}
	return filteredPods[:diff]
}

由于 ReplicaSet 的删除工作是单纯的缩容(而不是 RollingUpdate),因此删除是可以完全并发完成的。

expectations

ControllerExpectations 是一种控制器用来追踪期望状态和现有状态的缓存工具。ControlleeExpectations 表征被控制器追踪的资源应有的期望状态。

以增加资源为例,当一个控制器管理的资源应该被增加时(例如扩容 RS),将提高 (raise) expectations,表征还有多少资源应当被创建;当确实观察 (observe) 到这个资源增加了,降低 (lower) expectations,表明一定数量的资源预期被满足 (fullfill) 了。

实现上来讲,expectations 是一组原子的 int 值,目前包含 add/del 两组,分别代表增加和删除。


以 ReplicaSet 为例,讲解 K8s 控制器是如何利用 expectations 来缓存期望状态的。

实际对 RS 进行扩缩容的函数为 manageReplicas

在扩容时,它首先通过 ExpectCreations 增加期望值,设该值为 N。在观察到成功创建了 N0 个 Pod 后,它将期望值降低至 N - (N - N0) == N0。

这是因为控制器(理论上)永远不会收到创建失败的 Pod 的通知(无法观察到资源变更)。

		// ...
		rsc.expectations.ExpectCreations(rsKey, diff)
		successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {})
		// ...
		if skippedPods := diff - successfulCreations; skippedPods > 0 {
			for i := 0; i < skippedPods; i++ {
				rsc.expectations.CreationObserved(rsKey)
			}
		}

addPod 的 Informer 回调中,也调用了 CreationObserved,来降低期望值。

func (rsc *ReplicaSetController) addPod(obj interface{}) {
	pod := obj.(*v1.Pod)
	if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
		// ...
		rsc.expectations.CreationObserved(rsKey)
		// ...
		return
	}

在缩容时,同样的,首先通过 ExpectDeletions 增加期望值,然后在创建失败时降低期望值,因为(理论上)永远不会观察到删除失败的资源变动。

		// ...
		rsc.expectations.ExpectDeletions(rsKey, getPodKeys(podsToDelete))
		// ...
		for _, pod := range podsToDelete {
			go func(targetPod *v1.Pod) {
				if err := rsc.podControl.DeletePod(rs.Namespace, targetPod.Name, rs); err != nil {
					rsc.expectations.DeletionObserved(rsKey, podKey)
				}
			}(pod)
		}

同样在 deletePod 回调中,也调用了 DeletionObserved

func (rsc *ReplicaSetController) deletePod(obj interface{}) {
	pod, ok := obj.(*v1.Pod)
	// ...
	rsc.expectations.DeletionObserved(rsKey, controller.PodKey(pod))
}

现在,假设一切运行正常,没有创建失败也没有删除失败,那么 expectations 的值将在一定时间(所有创建和删除都结束后)等于 0。

也就是期望被满足了,syncReplicaSet 中的 SatisfiedExpectations 将返回 true。

func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
	// ...
	rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
}

func (r *ControllerExpectations) SatisfiedExpectations(controllerKey string) bool {
	if exp, exists, err := r.GetExpectations(controllerKey); exists {
		if exp.Fulfilled() {
			return true
Note:

如果没有获取到缓存,说明可能是第一次进行同步,也返回 true,触发同步。

当 expectations 超时(默认五分钟)时,可能是由于创建或删除迟迟没有发生,也进行下一次同步,避免控制器停留在异常状态中。

当然,在过滤存活 Pod 阶段不会再次计算 DeletionTimestamp 不为空的 Pod,避免重复删除。

倘若上一次超时的删除操作甚至没有能改变理应被删除的 Pod 的 DeletionTimestamp,那这个 Pod 无论过多久都不会再被删除,再次考虑删除它则是合理的。

期望被满足意味着上一次的同步逻辑 (manageReplicas) 已经结束了,应当进行下一次同步。


sync
Fulfilled?
Alive Pods
Diff
Reset expectations

现在考虑一些其他状态。例如,虽然一切运行正常,但用户手动删除了一个已经被管理的 Pod,这时删除逻辑依旧被触发了。这可能导致当前的期望值变成 -1。

当然我们观察 ControlleeExpectationsFulfilled 函数实现可知,期望值是负数也被认为是期望被满足,依旧可以进行正常的同步行为。

考虑另一个比较极端的情况。该删除恰好使期望提前变成 0,且恰好进行了一次同步,触发了下一轮的 manageReplicas。这时候会发生什么?

在这个情况下,实际被删除的 Pod 真正被删除发生在计算期望是否满足之后。

假设它在计算现有 Pod 时已经被删除了,那么什么也不会影响。因为只要在计算现有 Pod 前删除结束了,那么获得的存活 Pod 信息是正确的,旧的缓存状态如何已经不关键了。

如果这个删除发生在 manageReplicas 重置期望值之后(而不论这个删除究竟拖延了几轮 manageReplicas),当删除发生时,本质上会变成上一种情形:在下一轮 syncReplicaSet 时,期望值被减到了负数。

如果这个删除发生在计算现有 Pod 之后、重置期望值之前,这会使得 manageReplicas 误以为存活 Pod 更多了一个,会造成下一轮的同步额外增或删一个 Pod。但这会在下下轮的同步中被修复。

因此,这个机制可以说具有足够的健壮性。


当然,这是比较简单的情形。一个显著的特例是对 DeletionTimestamp 的更新行为。它实际上预示着一次删除行为,逻辑会转交给 deletePod 处理。此外如果新旧 Pod 的 Label 变化了,它还会一同 inform 旧 Pod 被删除了(即使新旧 Pod 是同一个)。

如果仅仅是对数字进行增减,这里会不可避免地造成重复删除的问题。

因此,RS 实际上使用的是 UIDTracking ControllerExpectations。它特殊在期望删除时不是简单地加减 1,而是根据对象的 UID 来处理,避免重复“观察”到同一个对象的删除行为。

其实,在创建 Pod 时也可以追踪 UID。但是问题在于删除时 UID 是已知的,而创建时是未知的。且 UIDTracking ControllerExpectations 实现中,存储是带锁的。由于创建行为是并发的,若要跟踪创建 Pod 时的 UID,会导致 UIDTracking 内部发生激烈的锁冲突。


不过,UID Tracking 会导致删除有一个特别的情形。

假设某个 Pod 预期被删除,但在删除真正发生前,用户改变了它的 Labels,使之不应再被 RS 管理,且在 syncReplicaSet 时被 claimPods 释放。那么该 Pod 的删除将不再能进入到 DeletionObserved 这一段中。因此本次同步会一直等待到 expectations 超时才会触发下一轮。


缓存 Expectation 的机制保证了 slow start 行为不会因为多次同步而被破坏。同时也保证了一个 RS 同时只有一个创建、删除流程进行,避免了并发时需要加锁或频繁计算集群当前状态和期望状态造成的资源浪费。

总结

ReplicaSet Controller 的实现虽然比较直接,但也能发现逐渐的演进过程。例如,最初的 ReplicaSet 缩容没有考虑 Rank 而只考虑 Pod 生命周期;创建 Pod 时也不会忽略 NS 正在被删除的错误(会产生大量日志);没有 slow start,等等。

从 ReplicaSet 的演进中可以发现,一个看似简单的功能,随着系统的复杂化,也可以演变出很多高级的特性。完成一个功能,与这个功能好用而完善,还有一段相当的距离。

在平时的使用中,一般是不会直接使用 ReplicaSet 的。但如果需要实现定制化的控制器,就可能需要依赖 ReplicaSet 来实现。