Deployment Controller 实现

Deployment 通过控制 ReplicaSet,进而间接控制 Pod。

DeploymentController 主要的工作函数有二:

type DeploymentController struct {
	// To allow injection of syncDeployment for testing.
	syncHandler func(dKey string) error
	// used for unit testing
	enqueueDeployment func(deployment *apps.Deployment)
}

很常见的为了测试目的抽成的变量。

实际运行时 syncHandler 为 syncDeployment,是实际的 worker;enqueueDeployment 是 enqueue,作为 lister 回调里实际调用的函数,将更新信息不断入到工作队列 queue 中。

Run 启动数个 worker goroutine,从工作队列 queue 中不断取出 key 并处理。

这是一段非常经典的 Controller 逻辑。

实现原理

实线表示持有关系,虚线表示数据流向。

Deployment Controller Ling Samuel
Deployment Event
Pod Event
RS Event
Enqueue Deployment
Dequeue Deployment
Deployment Controller
Deployment Informer
ReplicaSet Informer
Pod Informer
EventHandler
Workers
Queue
Deployment Struct Ling Samuel
Deployment
ReplicaSet 1
ReplicaSet 2
Pod 1
Pod 2
Pod 1
Pod 2

informer

DeploymentController 除了监听 Deployment 本身的增改删然后触发同步 syncDeployment 以外,还会监听 ReplicaSet 的增删改与 Pod 的删除。


总体上,addReplicaSet (src) 有三个工作要做:

  • 检查 DeletionTimestamp,确保(重启时)rs 被正确删除。逻辑移交给 deleteReplicaSet
  • 检查 rs 是否有 controller,如有且该 controller 是 Deployment,入工作队列。
  • 只有孤儿 rs 能够进入这一步。检查是否有 deployment 可以收养(adopt)它。若有,入工作队列。
func (dc *DeploymentController) addReplicaSet(obj interface{}) {
	rs := obj.(*apps.ReplicaSet)
	if rs.DeletionTimestamp != nil {
		dc.deleteReplicaSet(rs)
		return
	}
	if controllerRef := metav1.GetControllerOf(rs); controllerRef != nil {
		d := dc.resolveControllerRef(rs.Namespace, controllerRef)
		dc.enqueueDeployment(d)
		return
	}

	ds := dc.getDeploymentsForReplicaSet(rs)
	for _, d := range ds {
		dc.enqueueDeployment(d)
	}
}

有关收养,可参考 Garbage Collector 一文。

Deployment 对 ReplicaSet 的收养行为是检查是否有 deployment 的 selector 能 match 到该 rs 的 labels。如果有多个匹配的 deployment,取第一个并记录一条日志。


updateReplicaSet (src):

  • 检查新旧 rs 资源版本是否一致(由于 resync),如果一致则跳过。
  • 检查 controller 是否变动。如果变动了,那么也应该同步旧 controller(若存在)。
  • 检查 rs 的 controller 是否是 Deployment,将其入工作队列。
  • 只有孤儿 rs 能够进入这一步。如果 rs 的 label 或 controller 变动了,检查是否有 deployment 可以收养(adopt)它。若有,入工作队列。
func (dc *DeploymentController) updateReplicaSet(old, cur interface{}) {
	curRS := cur.(*apps.ReplicaSet)
	oldRS := old.(*apps.ReplicaSet)
	if curRS.ResourceVersion == oldRS.ResourceVersion {
		return
	}

	curControllerRef := metav1.GetControllerOf(curRS)
	oldControllerRef := metav1.GetControllerOf(oldRS)
	controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
	if controllerRefChanged && oldControllerRef != nil {
		if d := dc.resolveControllerRef(oldRS.Namespace, oldControllerRef); d != nil {
			dc.enqueueDeployment(d)
		}
	}

	if curControllerRef != nil {
		d := dc.resolveControllerRef(curRS.Namespace, curControllerRef)
		dc.enqueueDeployment(d)
		return
	}

	labelChanged := !reflect.DeepEqual(curRS.Labels, oldRS.Labels)
	if labelChanged || controllerRefChanged {
		ds := dc.getDeploymentsForReplicaSet(curRS)
		for _, d := range ds {
			dc.enqueueDeployment(d)
		}
	}
}

这一块的逻辑很大程度上与 add 相同。


deleteReplicaSet (src):

  • 检查 rs 是否有 controller,如有且该 controller 是 Deployment,入工作队列。
func (dc *DeploymentController) deleteReplicaSet(obj interface{}) {
	rs, ok := obj.(*apps.ReplicaSet)
	controllerRef := metav1.GetControllerOf(rs)
	if controllerRef == nil {
		// No controller should care about orphans being deleted.
		return
	}
	d := dc.resolveControllerRef(rs.Namespace, controllerRef)
	if d == nil {
		return
	}
	dc.enqueueDeployment(d)
}

虽然我没列出来,但注意这块代码有一段对于 DeletedFinalStateUnknown 的处理。当删除发生时,如果到 API server 的连接断开了,那么删除事件就会丢失。

这是一个比较容易忘记的问题,例如 NodeController 曾经就有相关的 Bug,ReplicaSet 也曾有过相关问题


deletePod:

  • 检查 Pod 的所有者是否是 rs,且该 rs 的所有者是否是 deployment。
    • 如果是,再检查该 deployment 的更新策略是否是 Recreate。
    • 如果是,再检查该 deployment 的 pod 数是否是 0。
    • 如果是,入工作队列。

对于 Pod 删除的检查完全是为了 Recreate 服务。当更新策略为 recreate 的 deployment 下没有存活 Pod 时,可以触发 deployment 同步。

Note:

注意,Deployment Controller 没有任何对 Pod 的直接操作,理论上不应该监听 Pod 事件。对 pod 副本数的保证是通过 ReplicaSet 完成的。Deployment Controller 可以认为是对 ReplicaSet 的一层封装。

虽然 Recreate 的实现监听了 Pod 的删除事件,但 Pod 的 scale down 是通过将 ReplicaSet 的 replicas 降至 0 完成的。

syncDeployment

syncDeployment 是 Deployment Controller 的核心同步逻辑。

  • 根据传进来的 key 获得 deployment 资源。
    • 如果没有 selector,更新 Status 后直接返回。
  • 获取 rs 和 pods。
  • 如果删除时间不为空,调用 syncStatusOnly,只同步 Status,然后结束。
  • 检查暂停状态并更新 condition,避免错误的 timeout。如果是暂停状态,调用 sync,然后结束。
  • 检查 rollback 状态,如果是,调用 rollback,然后结束。
  • 检查 scale 事件,如果是,调用 sync,然后结束。
  • 检查更新策略:
    • 如果是 Recreate,调用 rolloutRecreate,会等待所有 pod 都被删除后再创建新的副本。
    • 如果是 RollingUpdate,则调用 rolloutRolling,将会根据 maxSurge 和 maxUnavailable 进行逐步更新。
Note:

虽然 sync 的调用在前,但逻辑上它可能发生在 Rollout 之后,因此放在后面讲。

rolloutRecreate

重建策略的逻辑较为简单 (src)。

  • 找到所有活跃的 rs,将它们全部 scale 至 0。
    • 注意此处的 getAllReplicaSetsAndSyncRevision 第三个参数为 false,表示不自动创建新 rs。
  • 如果依旧有旧 pod 在运行(scale 尚未结束),什么也不做。
  • 如果没有新 rs,新建一个。
    • 将新建逻辑延迟到此,可避免在所有 rs 都被 scale 到 0 前就创建了新的 pod。
  • 将新 rs scale 到需要的 replica。
  • 如果 rollout 结束,调用 cleanupDeployment 清理所有旧的 replica。
// rolloutRecreate implements the logic for recreating a replica set.
func (dc *DeploymentController) rolloutRecreate(d *apps.Deployment, rsList []*apps.ReplicaSet, podMap map[types.UID][]*v1.Pod) error {
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false)
	allRSs := append(oldRSs, newRS)
	activeOldRSs := controller.FilterActiveReplicaSets(oldRSs)

	// scale down old replica sets.
	scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(activeOldRSs, d)
	if scaledDown {
		return dc.syncRolloutStatus(allRSs, newRS, d)
	}
	if oldPodsRunning(newRS, oldRSs, podMap) {
		return dc.syncRolloutStatus(allRSs, newRS, d)
	}

	if newRS == nil {
		newRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(d, rsList, true)
		allRSs = append(oldRSs, newRS)
	}

	if _, err := dc.scaleUpNewReplicaSetForRecreate(newRS, d); err != nil {
		return err
	}

	if util.DeploymentComplete(d, &d.Status) {
		if err := dc.cleanupDeployment(oldRSs, d); err != nil {
			return err
		}
	}

	// Sync deployment status.
	return dc.syncRolloutStatus(allRSs, newRS, d)
}
Rollout Recreate: Before Ling Samuel
Deployment
Active RS
Old RS
Active Pod 1
Active Pod 2
Rollout Recreate: Scale Down Ling Samuel
Deployment
Active RS (ScaledDown)
Old RS
Rollout Recreate: After Ling Samuel
Deployment
New RS
Old RS (Prev Active RS)
Old RS
Active Pod 1
Active Pod 2

rolloutRolling

滚动更新是更加复杂、更加常用的策略。

  • 获取所有 rs。
  • 将新 rs scale up。如果成功 scale up 了,那么结束。
  • 将旧 rs scale down。如果成功 scale down 了,那么结束。
  • 如果 rollout 结束,调用 cleanupDeployment 清理所有旧的 replica。
// rolloutRolling implements the logic for rolling a new replica set.
func (dc *DeploymentController) rolloutRolling(d *apps.Deployment, rsList []*apps.ReplicaSet) error {
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, true)
	allRSs := append(oldRSs, newRS)

	scaledUp, err := dc.reconcileNewReplicaSet(allRSs, newRS, d)
	if scaledUp {
		return dc.syncRolloutStatus(allRSs, newRS, d)
	}

	// Scale down, if we can.
	scaledDown, err := dc.reconcileOldReplicaSets(allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, d)
	if scaledDown {
		return dc.syncRolloutStatus(allRSs, newRS, d)
	}

	if deploymentutil.DeploymentComplete(d, &d.Status) {
		if err := dc.cleanupDeployment(oldRSs, d); err != nil {
			return err
		}
	}

	return dc.syncRolloutStatus(allRSs, newRS, d)
}

这个函数的逻辑看似简单清晰,但背后有着不少的设计。

为了保证滚动更新时的可用性,scale down 并不是与 scale up 同时发生的。可以看到如果能够 scale up 时,是不会进入 scale down 的逻辑的。

滚动更新的实质行为是根据 maxSurge 和 maxUnavailable 参数,反复“迭代”、交替进行,直到旧 rs scale down 至 0,新 rs scale up 至 Spec replica。


如果仅仅是 scale down,那么自然可以一步到位(src):

func (dc *DeploymentController) reconcileNewReplicaSet(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) {
	if *(newRS.Spec.Replicas) == *(deployment.Spec.Replicas) {
		return false, nil
	}
	if *(newRS.Spec.Replicas) > *(deployment.Spec.Replicas) {
		scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, *(deployment.Spec.Replicas), deployment)
		return scaled, err
	}
	// ...

主要的问题在于 scale up 时,不能一步到位,而是需要根据 maxSurge 参数避免一次性创建太多 pod,渐进式地提升新 rs 的 replica,直到符合 Spec:

	newReplicasCount, err := deploymentutil.NewRSNewReplicas(deployment, allRSs, newRS)
	scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, newReplicasCount, deployment)
	return scaled, err
}

函数 NewRSNewReplicas (src) 就是用来计算 replica 取何值的。

使用期望数量 replica + 可容忍的额外数量 maxSurge可以求出新旧 pod 共存的最大数量 maxTotalPods。如果当前的 pod 数量超过了,那么就返回 rs 的当前 replica 数,使得外层的 scale up 函数返回 false,进而走到 scale down 逻辑。

如果当前的 pod 数量不足时,使用最大数量 maxTotalPods - 当前数量 currentPodCount,可以得到本次迭代最多 scale up 的副本数。由于 maxSurge 可能非常大,还需要取这个结果与 Spec replica 中更小的一个。

func NewRSNewReplicas(deployment *apps.Deployment, allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet) (int32, error) {
	switch deployment.Spec.Strategy.Type {
	case apps.RollingUpdateDeploymentStrategyType:
		// Check if we can scale up.
		maxSurge, err := intstrutil.GetScaledValueFromIntOrPercent(deployment.Spec.Strategy.RollingUpdate.MaxSurge, int(*(deployment.Spec.Replicas)), true)
		
		currentPodCount := GetReplicaCountForReplicaSets(allRSs)
		maxTotalPods := *(deployment.Spec.Replicas) + int32(maxSurge)
		if currentPodCount >= maxTotalPods {
			return *(newRS.Spec.Replicas), nil
		}
		scaleUpCount := maxTotalPods - currentPodCount
		scaleUpCount = int32(integer.IntMin(int(scaleUpCount), int(*(deployment.Spec.Replicas)-*(newRS.Spec.Replicas))))
		return *(newRS.Spec.Replicas) + scaleUpCount, nil
	// ...
	}
}

在 scale down 时,也不能一次性将旧 replica scale 到 0。因此 reconcileOldReplicaSets 函数也需要根据 maxUnavailable 参数避免一次性移除太多 pod。

使用期望数量 replica - 最小不可用数量 maxUnavailable可求出至少需要有多少存活的 pod 数量 minAvailable

此时注意,新 rs 不一定所有的 pod 都健康或 ready。我们实际使用的计算公式应该是 可缩减数量 maxScaledDown = 当前可用数量 - 最小可用数量 minAvailable。其中当前可用数量 = 所有数量 allPodsCount - 新 rs 不可用数量 newRSUnavailablePodCount。如果在新 rs 没有全部 ready 的情况下,仅考虑 maxUnavailable,简单地直接用会得到一个较大的 scale down 值,导致可用性不符合设计。

根据新 rs 的 replica - availableReplicas 可以求出新 rs 有多少 pod 不可用。根据上述公式即可求出本次迭代最多可以杀死几个 pod。如果小于等于零,说明可容忍的缩容程度已经到达限制了,应该将控制权移交回扩容,函数返回 false。

func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*apps.ReplicaSet, oldRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) {
	oldPodsCount := deploymentutil.GetReplicaCountForReplicaSets(oldRSs)
	if oldPodsCount == 0 {
		return false, nil
	}

	allPodsCount := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
	maxUnavailable := deploymentutil.MaxUnavailable(*deployment)
	minAvailable := *(deployment.Spec.Replicas) - maxUnavailable
	newRSUnavailablePodCount := *(newRS.Spec.Replicas) - newRS.Status.AvailableReplicas
	maxScaledDown := allPodsCount - minAvailable - newRSUnavailablePodCount
	if maxScaledDown <= 0 {
		return false, nil
	}
	// ...

到目前为止,在计算可缩容数量时,并没有考虑到旧 rs 持有的 pod 可能不健康的情形。

为了保证可用性,我们必须保证实际可用的 pod 数量不少于 minAvailable。具体的可用性保证在 scaleDownOldReplicaSetsForRollingUpdate 函数 (src) 中:

func (dc *DeploymentController) scaleDownOldReplicaSetsForRollingUpdate(allRSs []*apps.ReplicaSet, oldRSs []*apps.ReplicaSet, deployment *apps.Deployment) (int32, error) {
	maxUnavailable := deploymentutil.MaxUnavailable(*deployment)
	minAvailable := *(deployment.Spec.Replicas) - maxUnavailable

	availablePodCount := deploymentutil.GetAvailableReplicaCountForReplicaSets(allRSs)
	if availablePodCount <= minAvailable {
		return 0, nil
	}
	// ...

当 available pod 数目不多于最小可用性要求时,scale down 不会发生。在很早期的版本中,这导致了潜在的问题。

优先考虑 scale down 不健康的 rs 有两个好处:

  1. (在当时的版本)rs 删除 pod 时会有如下优先级:not-ready < ready, unscheduled < scheduled, and pending < running。这使得缩容不健康的 rs 并不会降低可靠性(因为它本身就不健康)。现在的 rs 删除 pod 考虑的更多,包括会尽量让 pod 分散在不同的节点上,等。

  2. 缩容健康的 rs 会导致可用数量检查不通过。如果现有的不健康 rs 始终无法恢复,那么缩容就停止了。

因此需要首先调用 cleanupUnhealthyReplicas 清理不健康的 rs。

func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*apps.ReplicaSet, oldRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) {
	// ...
	oldRSs, cleanupCount, err := dc.cleanupUnhealthyReplicas(oldRSs, deployment, maxScaledDown)

	// Scale down old replica sets, need check maxUnavailable to ensure we can scale down
	allRSs = append(oldRSs, newRS)
	scaledDownCount, err := dc.scaleDownOldReplicaSetsForRollingUpdate(allRSs, oldRSs, deployment)

	totalScaledDown := cleanupCount + scaledDownCount
	return totalScaledDown > 0, nil
}

这两个缩容函数的行为模式与上述公式类似,不再赘述。区别在于,cleanupUnhealthyReplicas 会将不健康的 rs 缩容到 replicas == availableReplicas,而 scaleDownOldReplicaSetsForRollingUpdate 会直接尝试缩到尽可能小的值,比如 0。

Note:

scaleDownOldReplicaSetsForRollingUpdate 函数中有一个很奇怪的判断。

func (dc *DeploymentController) scaleDownOldReplicaSetsForRollingUpdate(allRSs []*apps.ReplicaSet, oldRSs []*apps.ReplicaSet, deployment *apps.Deployment) (int32, error) {
	maxUnavailable := deploymentutil.MaxUnavailable(*deployment)
	minAvailable := *(deployment.Spec.Replicas) - maxUnavailable
	availablePodCount := deploymentutil.GetAvailableReplicaCountForReplicaSets(allRSs)
	if availablePodCount <= minAvailable {
		return 0, nil
	}

	sort.Sort(controller.ReplicaSetsByCreationTimestamp(oldRSs))
	
	// 已缩容的 replica
	totalScaledDown := int32(0)
	// 最多缩容几个 replica。由于上面的可用性判断,totalScaleDownCount > 0
	totalScaleDownCount := availablePodCount - minAvailable
	for _, targetRS := range oldRSs {
		if totalScaledDown >= totalScaleDownCount {
			// 缩容够了,退出
			break
		}
		if *(targetRS.Spec.Replicas) == 0 {
			continue
		}
		// 缩容的数量。如果还能缩容的数量 (totalScaleDownCount - totalScaledDown) 比该 rs 的全部 replica 都多,那么直接缩到 0
		scaleDownCount := int32(integer.IntMin(int(*(targetRS.Spec.Replicas)), int(totalScaleDownCount-totalScaledDown)))
		newReplicasCount := *(targetRS.Spec.Replicas) - scaleDownCount
		// 神秘的判断。如果 A = B - C 且 A > B,那么说明 C 是负数。而 C = Min(B, totalScaleDownCount - totalScaledDown)
		// 但是由于 totalScaledDown >= totalScaleDownCount 时直接退出了循环,所以此处的 totalScaleDownCount - totalScaledDown > 0
		// 来源:https://github.com/kubernetes/kubernetes/pull/22828
		// 这个 PR 认为是 cleanup 时缩容了 rs,但是没有返回更新后的 oldRS。乍一看感觉好像很有道理,但问题是 cleanup 并不是异步的。
		// 假该 rs 在 cleanup 前有 3/5 个 replica,被 cleanup 缩容成了 3/3(这里一定会缩容到极限,而不会缩到比如 3/4。因为如果缩容名额不足了,那么就不会走到这个循环),然后修改后的值没有返回。
		// 因此这个函数里还是认为该 rs 是 3/5,那么,很显然这个计算逻辑依旧保证了 newReplicasCount 最小是 0,而不是负数。
		// 按理来说这个情形是不应该发生的。除非有别的地方并发地修改了 oldRSs 里的指针。但我好像没找到可疑的地方,不太懂。
		if newReplicasCount > *(targetRS.Spec.Replicas) {
			return 0, fmt.Errorf("when scaling down old RS, got invalid request to scale down %s/%s %d -> %d", targetRS.Namespace, targetRS.Name, *(targetRS.Spec.Replicas), newReplicasCount)
		}
		_, _, err := dc.scaleReplicaSetAndRecordEvent(targetRS, newReplicasCount, deployment)
		if err != nil {
			return totalScaledDown, err
		}

		totalScaledDown += scaleDownCount
	}

	return totalScaledDown, nil
}

rollback

每个 Deployment 都有 Revision 概念。保留的最大 Revision 数由 Spec 中的 RevisionHistoryLimit 决定。

前文可以发现,Deployment 对 rs 的操作一般是 scale 到 0,而不是直接删除。这是因为 Deployment 的历史版本是通过保留一个 Replicas 为 0、有着旧版 PodTemplate 的 rs 来实现的。对于 rs 的删除逻辑,在 cleanupDeployment 中处理。

前文已经数次提到 cleanupDeployment 这个函数 (src),它的作用就是清理超过保留数的 ReplicaSet。

  • RevisionHistoryLimit == math.MaxInt32,代表保留所有副本,于是什么也不做。
  • 排除没有 DeletionTimestamp 的存活 rs。
  • 按 Revision 排序。如果没有 Revision 或 Revision 相同,按 CreationTimestamp 排序。Revision 储存在注解 deployment.kubernetes.io/revision 中。
  • 删除超出 RevisionHistoryLimit 的 rs。
func (dc *DeploymentController) cleanupDeployment(oldRSs []*apps.ReplicaSet, deployment *apps.Deployment) error {
	if !deploymentutil.HasRevisionHistoryLimit(deployment) {
		return nil
	}

	aliveFilter := func(rs *apps.ReplicaSet) bool {
		return rs != nil && rs.ObjectMeta.DeletionTimestamp == nil
	}
	cleanableRSes := controller.FilterReplicaSets(oldRSs, aliveFilter)

	diff := int32(len(cleanableRSes)) - *deployment.Spec.RevisionHistoryLimit
	if diff <= 0 {
		return nil
	}

	sort.Sort(deploymentutil.ReplicaSetsByRevision(cleanableRSes))

	for i := int32(0); i < diff; i++ {
		rs := cleanableRSes[i]
		// Avoid delete replica set with non-zero replica counts
		if rs.Status.Replicas != 0 || *(rs.Spec.Replicas) != 0 || rs.Generation > rs.Status.ObservedGeneration || rs.DeletionTimestamp != nil {
			continue
		}
		if err := dc.client.AppsV1().ReplicaSets(rs.Namespace).Delete(context.TODO(), rs.Name, metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) {
			return err
		}
	}

	return nil
}

回滚是在 Kubectl 中调用的 (src)。Deployment Controller 收到 Rollback 请求后 (src):

  • ToRevision == 0,从现存 rs 中寻找次新的 Revision。如果没有找到,放弃这次 Rollback。
  • 寻找现存 rs 中匹配 ToRevision 的 rs。如果找到,那么复制该 rs 的 PodTemplate 到 deployment 上,并将非 metadata 的注解复制到 deployment 上。
    • metadata 注解包括 Revision、RevisionHistory 等,详见此
  • 没有找到,放弃这次 Rollback。

无论是找到了还是没有找到,都会通过调用 updateDeploymentAndClearRollbackTo 发起一次 Update 请求。回滚操作分了两步,第一步更新注解,服务端会解析这个注解,然后再发起真正的回滚请求(依旧是一次更新操作)。当然,即使没有找到对应的 revision,也会发送一次更新请求删掉这个 Rollback 注解。

func (dc *DeploymentController) rollback(d *apps.Deployment, rsList []*apps.ReplicaSet) error {
	newRS, allOldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, true)

	allRSs := append(allOldRSs, newRS)
	rollbackTo := getRollbackTo(d)
	if rollbackTo.Revision == 0 {
		if rollbackTo.Revision = deploymentutil.LastRevision(allRSs); rollbackTo.Revision == 0 {
			return dc.updateDeploymentAndClearRollbackTo(d)
		}
	}
	for _, rs := range allRSs {
		v, err := deploymentutil.Revision(rs)
		if v == rollbackTo.Revision {
			performedRollback, err := dc.rollbackToTemplate(d, rs)
			return err
		}
	}
	return dc.updateDeploymentAndClearRollbackTo(d)
}

func (dc *DeploymentController) rollbackToTemplate(d *apps.Deployment, rs *apps.ReplicaSet) (bool, error) {
	performedRollback := false
	if !deploymentutil.EqualIgnoreHash(&d.Spec.Template, &rs.Spec.Template) {
		deploymentutil.SetFromReplicaSetTemplate(d, rs.Spec.Template)
		deploymentutil.SetDeploymentAnnotationsTo(d, rs)
		performedRollback = true
	}

	return performedRollback, dc.updateDeploymentAndClearRollbackTo(d)
}

updateDeploymentAndClearRollbackTo 函数则清除了 rollback 注解,避免注解残留 (src):

func (dc *DeploymentController) updateDeploymentAndClearRollbackTo(d *apps.Deployment) error {
	setRollbackTo(d, nil)
	_, err := dc.client.AppsV1().Deployments(d.Namespace).Update(context.TODO(), d, metav1.UpdateOptions{})
	return err
}

我们从 FindNewReplicaSet 的实现中可以发现,更新时会优先复用已有的、同 PodTemplate 的 rs。而在回滚这个功能上,是必定能够找到已有的 rs 的。因此回滚时会重新 scale up 已存在的 rs,而非重新创建一个 rs。

pause/resume

暂停和恢复功能,主要是用于临时停止滚动更新,用到的场景不是很多。

暂停的 Deployment 不响应任何操作,以便于我们修复损坏的 Deployment。

和 Rollback 一样,pause/resume 也是在 Kubectl 端实现的 (src),原理是更新 Spec.Paused 的值。Resume 同理。

暂停 Deployment 的具体实现与 Scaling 一致,因此一同放在 sync 一节中讲述。

sync

syncDeployment 中,检查是否是 scale 事件的函数 isScalingEvent 比对活跃 rs 和 deployment 的 replicas 字段,若不一致则判定为一次扩缩容事件。

sync 即是处理暂停或扩缩容事件的函数。注意,这并不是一个常规的 Rollout 流程。常规的 Rollout 流程在上面进行,不应该走到这里。

在同步时,会有“新 rs”与“旧 rs”的概念,通过 pod template hash 来区分 (src)。

“新 rs”指的是 PodTemplate 与目标 PodTemplate 一致的 rs,它会尽量复用已存在的 rs。

func FindNewReplicaSet(deployment *apps.Deployment, rsList []*apps.ReplicaSet) *apps.ReplicaSet {
	sort.Sort(controller.ReplicaSetsByCreationTimestamp(rsList))
	for i := range rsList {
		if EqualIgnoreHash(&rsList[i].Spec.Template, &deployment.Spec.Template) {
			return rsList[i]
		}
	}
	// new ReplicaSet does not exist.
	return nil
}

sync (src) 进行以下操作:

  • 更新 rs 的 revision number,如果存在新 rs,找到它。
  • 进行 scale。
  • 如果发现 deployment 处于暂停状态,检查是否在进行 rollback。如果不是,那么会清理旧的 rs。
  • 更新 Status。
func (dc *DeploymentController) sync(d *apps.Deployment, rsList []*apps.ReplicaSet) error {
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false)
	if err := dc.scale(d, newRS, oldRSs); err != nil {
		return err
	}

	if d.Spec.Paused && getRollbackTo(d) == nil {
		if err := dc.cleanupDeployment(oldRSs, d); err != nil {
			return err
		}
	}

	allRSs := append(oldRSs, newRS)
	return dc.syncDeploymentStatus(allRSs, newRS, d)
}

scale 函数中,首先通过 FindActiveOrLatest 函数,尝试是否能获得单个 rs 进行直接操作(有且只有一个活跃 rs,或者全部 rs 都不活跃)。如果 (src):

  • 没有活跃 rs(活跃 rs 指的是 replica 不为 0 的 rs)。
    • 有“新 rs”,则返回新 rs(Active)。
    • 返回旧 rs 中尽可能新的 rs(Latest)。
  • 有且恰好有一个活跃的 rs,则将其返回(Active)。
  • 有多于一个活跃的 fs,返回空。多于一个的活跃 rs 说明 deployment 正在滚动更新中,不应该直接操作。
func FindActiveOrLatest(newRS *apps.ReplicaSet, oldRSs []*apps.ReplicaSet) *apps.ReplicaSet {
	if newRS == nil && len(oldRSs) == 0 {
		return nil
	}

	sort.Sort(sort.Reverse(controller.ReplicaSetsByCreationTimestamp(oldRSs)))
	allRSs := controller.FilterActiveReplicaSets(append(oldRSs, newRS))

	switch len(allRSs) {
	case 0:
		if newRS != nil {
			return newRS
		}
		return oldRSs[0]
	case 1:
		return allRSs[0]
	default:
		return nil
	}
}

如果得到了一个活跃或尽可能新的 rs,那么直接对这个 rs 进行 scale:

func (dc *DeploymentController) scale(deployment *apps.Deployment, newRS *apps.ReplicaSet, oldRSs []*apps.ReplicaSet) error {
	if activeOrLatest := deploymentutil.FindActiveOrLatest(newRS, oldRSs); activeOrLatest != nil {
		if *(activeOrLatest.Spec.Replicas) == *(deployment.Spec.Replicas) {
			return nil
		}
		_, _, err := dc.scaleReplicaSetAndRecordEvent(activeOrLatest, *(deployment.Spec.Replicas), deployment)
		return err
	}
	// ...

如果发现新 rs 已经饱和(replica 达到期望状态)了,那么将旧 rs scale 成 0:

	if deploymentutil.IsSaturated(deployment, newRS) {
		for _, old := range controller.FilterActiveReplicaSets(oldRSs) {
			if _, _, err := dc.scaleReplicaSetAndRecordEvent(old, 0, deployment); err != nil {
				return err
			}
		}
		return nil
	}
	// ...

但是,如果 deployment 处于滚动更新中,有可能存在多个活跃的 rs,且均未饱和,滚动更新的逻辑依旧在继续。

滚动更新时发生的扩缩容事件,应该均等地分布在所有的 rs 上。当前的实现,是从新到旧扩容,从旧到新缩容。

这一部分的扩缩容应该按比例进行。公式为:可扩容的额外副本数 / 当前副本数。展开来即:(Spec.Replicas + MaxSurge - Status.Replicas) / Status.Replicas,这个比例最后将乘上 rs 的 Spec.Replicas,再经过一些转换(最小值取 1、不超过期望值等)便可得到每个 rs 的 replicas 变化。

得到更新后的 rs 列表后,再调用 Update 函数将更新提交至服务器,一次扩缩容便完成了。

总结

Deployment Controller 的实现从概念上讲,还是比较简单、清晰的。

但是,概念简单离实现正确之间往往相距非常遥远。Deployment Controller 的实现考虑了很多比较细节和边界的情况(例如 revision number、generation、hash collision 等),本文并没有仔细阐述。

如果要实现自己的控制器,参考 Deployment Controller 的实现,或直接复用 Deployment,会是比较好的选择。