StatefulSet Controller 实现

State in Kubernetes

在讲解 StatefulSet 的实现前,我们需要先看一看 Kubernetes 是如何跟踪有状态对象的。

ControllerRevision

对于有状态的数据,K8s 使用称为 ControllerRevision (src)、且其中存储的数据(Data 字段)不可变的对象来存储状态数据。Revision 对象虽然不能更新,但可以删除。

目前 K8s 内部的 DaemonSet 和 StatefulSet 使用了这个对象:

type ControllerRevision struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`
	// Data is the serialized representation of the state.
	Data runtime.RawExtension `json:"data,omitempty" protobuf:"bytes,2,opt,name=data"`
	// Revision indicates the revision of the state represented by Data.
	Revision int64 `json:"revision" protobuf:"varint,3,opt,name=revision"`
}

其中,Data 字段存储了序列化的状态,一旦生成就不可变;Revision 字段表示 Data 中数据的版本号,这个字段是可变的。Data 中存储的内容则取决于使用者如何使用。

对 Controller Revision 的管理功能位于 controller_revision.go

在集群内部,Controller Revision 与其他 API 对象并无区别。

实现原理

在 StatefulSet 的实现中,StatefulSet 直接持有的对象为 Pod 和 Controller Revision。

StatefulSet Struct Ling Samuel
StatefulSet
Revision 1
Revision 2
Pod 1
Pod 2

基本上 Informer 监听自身资源和 Pod 增删改之类的逻辑和 ReplicaSet/Deployment 类似,除非有特殊逻辑,否则不再赘述。

sync

直接快进到同步函数。

同步前,sync 会调用 adoptOrphanRevisions 来收养符合 StatefulSet Selectors 的孤儿 Revisions:

func (ssc *StatefulSetController) adoptOrphanRevisions(set *apps.StatefulSet) error {
	revisions, err := ssc.control.ListRevisions(set)
	orphanRevisions := make([]*apps.ControllerRevision, 0)
	for i := range revisions {
		if metav1.GetControllerOf(revisions[i]) == nil {
			orphanRevisions = append(orphanRevisions, revisions[i])
		}
	}
	if len(orphanRevisions) > 0 {
		canAdoptErr := ssc.canAdoptFunc(set)()
		if canAdoptErr != nil {
			return fmt.Errorf("can't adopt ControllerRevisions: %v", canAdoptErr)
		}
		return ssc.control.AdoptOrphanRevisions(set, orphanRevisions)
	}
	return nil
}

其中 canAdoptFunc 是实时的 DeletionTimestamp 存在性检测,没有具体的业务逻辑。


随后,类似其他控制器,调用 getPodsForStatefulSet 获取所有可以被该 StatefulSet 管理的 Pod,同时也处理了收养、释放逻辑。

func (ssc *StatefulSetController) getPodsForStatefulSet(set *apps.StatefulSet, selector labels.Selector) ([]*v1.Pod, error) {
	pods, err := ssc.podLister.Pods(set.Namespace).List(labels.Everything())

	filter := func(pod *v1.Pod) bool {
		return isMemberOf(set, pod)
	}

	cm := controller.NewPodControllerRefManager(ssc.podControl, set, selector, controllerKind, ssc.canAdoptFunc(set))
	return cm.ClaimPods(pods, filter)
}

注意 filter 函数,内部调用了 isMemberOf,该函数检测 Pod 的名字是否符合 StatefulSet 所管理的特征。

被 StatefulSet 所管理的 Pod,其名字需要是 StatefulSet 的名字 (parentName) 跟上一个序列号 (ordinal) (src):

func getPodName(set *apps.StatefulSet, ordinal int) string {
	return fmt.Sprintf("%s-%d", set.Name, ordinal)
}

此处的 isMemberOf 函数就检测了该 Pod 是否符合 parentName。


在获取了需要被同步的 StatefulSet 及其需要管理的 Pod 列表后,调用 syncStatefulSet 将逻辑转发到 StatefulSetControlInterfaceUpdateStatefulSet 中。

出于测试目的,一部分 StatefulSet 的核心逻辑被抽成了 StatefulSetControlInterface,其中就包括同步被 StatefulSet 及其管理的 Pod 的部分。

StatefulSet Control

默认的 StatefulSet Control 实现,其默认的 Pod 同步策略有一个非常重要的假设:按 Pod 序号升序依次扩容,且所有 Pod 都需要健康,才能创建下一个 Pod;缩容时,则是按序号的降序依次停止。

当然该实现也提供了 burst 策略,可以放宽这个限制。我们稍后会提到这个设计的取舍。


对外暴露的函数 UpdateStatefulSet 将执行以下逻辑 (src):

  • 获取该 ss 的所有 revisions,并排序。
  • 根据 revisions 和 pods,调用 performUpdate 进行更新处理。
  • 限制 revisions 数量,使之符合 Spec.RevisionHistoryLimit。

这一块的逻辑相对简单,我们重点关注实际的更新函数。


函数 performUpdate 主要有三个关键步骤 (src):

  • 调用 getStatefulSetRevisions,计算 ss 的当前 revision,并获得需要达到的 update revision。
  • 根据 update revision,调用 updateStatefulSet 来实际应用 update revision。
  • 调用 updateStatefulSetStatus,更新 ss 的状态。

getStatefulSetRevisions

Note:

由于 revision 在这里有三个不同的含义,提前说明其表述差异:

  • revision/revision 对象:小写或明确有指代对象。指代 Controller Revision 这个 API 对象。
  • Revision:首字母大写。由于 Controller Revision 没有 Spec 字段,所以其持有的 Revision 成员没有一个合适的前缀明确表示其为成员变量。因此下文会使用首字母大写的 Revision 表示 Controller Revision 对象持有的 .Revision 字段。
  • revision number:计算出来的 Revision 具体值。

本函数计算并创建/更新一个 update revision 对象。

获得计算出来的 update revision 之后,会从历史 revisions 中查找是否有相同的 revision。

  • 历史版本中有相同的 revision:
    • 如果该 revision 恰好是最新的 revision,那么直接使用这个,并且不递增 revision(这表明此次同步没有变动,依旧在处理上一次未完成的变更)。
    • 如果相同 revision 并非最新的一个,将会把这个 revision 的 .Revision 字段更新为递增之后的最新 revision number。
  • 历史版本中没有相同的 revision,则直接创建一个新的 revision 对象。

在 StatefulSet 的实现里,revision 对象实际持有的数据被称为 patch,通过 getPatch (src) 函数计算得到。该函数虽然语义上生成的是 strategy patch,似乎应该是增量的。但为了避免复杂性,实际上生成的是 spec.template 字段的全量复制,合并策略是 replace

生成 revision 对象时,除了会记录 patch 和 revision number 外,同时也会复制 StatefulSet 的所有 Labels 以及 Annotations,此外还有一个记录哈希冲突的 collision count。

updateStatefulSet

这个接近 300 行函数是 StatefulSet 业务逻辑的核心实现。

该函数会按如下步骤执行:

  • 以 Pod 序号 (ordinal) 为顺序,找出需要保留或更新的 Pods,放入 replicas slice(长度等于 .Spec.Replicas)中;超出的需要被删除的 Pods 则放入 condemned
    • 这一步还会根据 Pod 的 Revision 匹配当前或 update revision 来更新 Status 中的相应数据。
    • 注意之前的 filter 函数筛查 Pod 列表时,没有处理 Ordinal 不是数字的情形,因此此处还忽略了那些不能解析的非法 Pod。
  • 用新版的 Pod 对象填满 replicas 数组。创建 Pod 的函数为 newVersionedStatefulSetPod
  • 计算 replicas 和 condemned slice 中 Unhealthy 的 Pod 数量(打日志用),并找到第一个 Unhealthy 的 Pod。
  • 如果 SS 的 DeletionTimestamp 不为空,退出。
    • 这一步可以提前。在曾经的旧代码中,更新 SS 的状态也是在函数内完成的,因此这一步需要在更新状态之后。 不过在新版的代码中,状态更新被剥离到外部了,因此这一步之所以这么靠后,可能是重构时的遗留。

数据准备阶段到此结束。在这一步中,生成了全新的 Status,划分出了需要保留和删除的 Pod 列表,并且找到了第一个不健康的 Pod。

现在,我们需要根据 replicas slice 里存储的目标 Pod,进行扩缩容,同时执行 Pod 的健康检查。

Note:

这一阶段填充的 Pod,取决于 Partition 与 UpdateStrategy,其 PodTemplate 可能为 current revision 而非 update revision。

后文的 UpdateStrategy 一节,将更详尽地阐述相关行为。

扩容

在执行扩容前,需先获取更新 Pod 管理策略。当 PodManagementPolicyOrderedReady 时,SS 将依序检测 Pod 的状态,一次只更新一个 Pod,且只有当上一个 Pod Ready 且 Running(或者缩容情况下的 Terminated)时,才会处理下一个 Pod。当策略为 Parallel 时,则会并发地更新。

Note:

OrderedReady 策略保证了 Pod 是按顺序可用的。

但由于目前实现的问题,如果 SS 在更新后的状态是错误的而永远无法 Ready & Running,例如引用了不存在的镜像,那么将会在第一个发生错误的 Pod 处卡死。并且因为该 Pod 的状态并未被 SS 控制器 handle,这个损坏的 Pod 也不再会被移除,即使 PodTemplate 更新成了正确的版本。

当检测到 Pod 状态为 Failed 时,将该 Pod 删除,并更新 Status。然后创建一个新的 Pod 对象,替换掉 replicas slice 里的旧 Pod。

	for i := range replicas {
		if isFailed(replicas[i]) {
			if err := ssc.podControl.DeleteStatefulPod(set, replicas[i]); err != nil {
				return &status, err
			}
			if getPodRevision(replicas[i]) == currentRevision.Name {
				status.CurrentReplicas--
			}
			if getPodRevision(replicas[i]) == updateRevision.Name {
				status.UpdatedReplicas--
			}
			status.Replicas--
			replicas[i] = newVersionedStatefulSetPod(...)
		}

接着检测 Pod 的状态是否为空。这可能是数据准备阶段填充的新 Pod,也可能是上一步中重建的 Pod。若 Pod 的状态为空,则发起创建请求。同样地,需要更新 status。

由于 OrderedReady 策略要求一次只更新一个 Pod,因此如果这一步中发起了一个创建请求,那么本次同步就提前结束,直到下次同步时再处理后续的 Pod。

		if !isCreated(replicas[i]) {
			if err := ssc.podControl.CreateStatefulPod(set, replicas[i]); err != nil {
				return &status, err
			}
			status.Replicas++
			if getPodRevision(replicas[i]) == currentRevision.Name {
				status.CurrentReplicas++
			}
			if getPodRevision(replicas[i]) == updateRevision.Name {
				status.UpdatedReplicas++
			}
			if monotonic {
				return &status, nil
			}
			continue
		}

接下来考虑其他 OrderedReady 策略下的其他 Pod 状态。如果 Pod 的 DeletionTimestamp 不为空,那么说明该 Pod 需要被删除,而删除尚未完成,因此依旧需要等待,提前终止本轮同步。若 Pod 的状态不为 Ready 以及 Running,则说明该 Pod 依旧在启动中,因此需要等待,提前终止本轮同步。

		if isTerminating(replicas[i]) && monotonic {
			return &status, nil
		}
		if !isRunningAndReady(replicas[i]) && monotonic {
			return &status, nil
		}

到这一步的 Pod 都处于健康状态。函数会检查它的各种元数据(Pod 名、NS 和 Labels 里记录的 statefulset.kubernetes.io/pod-name)和卷挂载信息是否符合 SS。如果符合就处理下一个 Pod,不符合就通过 StatefulSet Pod Control 的 UpdateStatefulPod 函数更新该 Pod 的元数据和卷挂载信息。

当上述所有检测与行为,保证了当前 Pod 处于健康状态。并且还创建了所有的新增 Pod 和 Failed Pod,更新了挂载信息,保证了元数据的一致。

		if identityMatches(set, replicas[i]) && storageMatches(set, replicas[i]) {
			continue
		}
		// Make a deep copy so we don't mutate the shared cache
		replica := replicas[i].DeepCopy()
		if err := ssc.podControl.UpdateStatefulPod(updateSet, replica); err != nil {
			return &status, err
		}
	}

当策略为 Parallel 时,上述所有的提前终止行为 (return) 变成处理下一个 Pod (continue)。

但是,这一步实际上尚未对现有 Pod 进行额外操作。

因此我们在这一步结束时,实际上有一组全新的、符合要求的新 Pod,和一组持有旧的 PodTemplate,但有新 Volumes 的 Pod。

缩容

在这一步中,我们能保证 [0, Replicas) 数量的 Pod 已经完全健康了。因此,可以开始删除不需要的 Pod 了。

考虑策略为 OrderedReady 时的情形。当 Pod 处于 Terminated 时,提前终止本轮,以等待 Pod 被完全删除。

	for target := len(condemned) - 1; target >= 0; target-- {
		if isTerminating(condemned[target]) {
			if monotonic {
				return &status, nil
			}
			continue
		}

随后进行一个 Sanity Check。如果当前处理的 Pod 不健康,那么检查第一个不健康的 Pod 是否等于当前 Pod。由于上面的扩容策略中不会更新缓存(调用 UpdateStatefulPod 前进行了一次 deep clone),这理论上似乎并不会发生。

		if !isRunningAndReady(condemned[target]) && monotonic && condemned[target] != firstUnhealthyPod {
			return &status, nil
		}

上述检测都通过后,可以安全删除当前处理的 Pod,然后提前终止本轮。当然,也要注意更新 Status。

		if err := ssc.podControl.DeleteStatefulPod(set, condemned[target]); err != nil {
			return &status, err
		}
		if getPodRevision(condemned[target]) == currentRevision.Name {
			status.CurrentReplicas--
		}
		if getPodRevision(condemned[target]) == updateRevision.Name {
			status.UpdatedReplicas--
		}
		if monotonic {
			return &status, nil
		}
	}

当策略为 Parallel 时,行为与扩容阶段一致,所有的提前终止行为改为处理下一个 Pod。

滚动更新

现在我们的扩缩容已经进行完毕了。接下来的工作是清理旧的 Pod,以便将它们更新为最新的 PodTemplate。

注意,如果 UpdateStrategy(注意更新策略和前文提到的 Pod 管理策略的差异)为 OnDelete,那么 SS 的同步将终止。该策略期待用户手动删除 Pod,从而在下一次同步时,在扩容阶段填充新的 Pod(以便响应 PodTemplate 的变动)。

清理旧 Pod 然后更新至新 Pod 的行为与缩容类似,是逆序进行的。

Note:

注意,Rolling Update 不受 PodManagementPolicy 控制,也没有类似 MaxSurge 或 MaxUnavailable 等参数。每一轮的删除都仅仅只会删除一个 Pod。

当 Pod 的 revision number 不是最新且 DeletionTimestamp 为空时,该 Pod 将会删除,并更新 SS 的状态。

当 Pod 的 revision number 匹配最新但 Pod 并未 Ready & Running 时,将提前终止本轮,以等待 Pod 就绪。

分区更新 (Partition)

有时候,可能需要只升级一小部分 Pod 来进行测试、灰度发布等工作。对于 Deployment 等无状态资源来说,只需要新创建一个即可。但是由于 SS 绑定了 PV,新创建的 SS 不能继承原有 PV 的数据,可能不符合需求。因此,Kubernetes 提供了 Partition 功能。

如果声明了 partition,当 StatefulSet 的 .spec.template 被更新时,所有序号大于等于该分区序号的 Pod 都会被更新。所有序号小于该分区序号的 Pod 都不会被更新,并且,即使他们被删除也会依据之前的版本进行重建。

Partition 的实现非常讨巧 (src):

func newVersionedStatefulSetPod(currentSet, updateSet *apps.StatefulSet, currentRevision, updateRevision string, ordinal int) *v1.Pod {
	if currentSet.Spec.UpdateStrategy.Type == apps.RollingUpdateStatefulSetStrategyType &&
		(currentSet.Spec.UpdateStrategy.RollingUpdate == nil && ordinal < int(currentSet.Status.CurrentReplicas)) ||
		(currentSet.Spec.UpdateStrategy.RollingUpdate != nil && ordinal < int(*currentSet.Spec.UpdateStrategy.RollingUpdate.Partition)) {
		pod := newStatefulSetPod(currentSet, ordinal)
		setPodRevision(pod, currentRevision)
		return pod
	}
	pod := newStatefulSetPod(updateSet, ordinal)
	setPodRevision(pod, updateRevision)
	return pod
}

设我们现在更新策略为 RollingUpdate,Replica 数为 R,Paritiaon 为 P。那么,在创建新 Pod 时,Ordinal 位于 [0, P) 的 Pod 会使用 current revision,而位于 [P, R) 的则会使用 update revision。这样就保证了无论是新建的 Pod 还是重建的 Pod,都能满足 partition 的需要。

同时也要注意计算时的 Replicas 与 Partition 的值使用的是 current set 而不是 update set。

updateStatefulSetStatus

这一步比较简单。它检查了 Rolling Update 情况下的相关状态是否达到了要求,然后相应地将 Status 的 CurrentRevision 更新为新的 Revision。

注意,它仅仅在 Rolling Update 策略下,状态符合预期时才会更新 CurrentRevision。

这是因为,在 UpdateStrategy 为 OnDelete 的情况下,没有分区功能,所有的扩容、重建直接使用最新的 SS 里的 PodTemplate 就可以了,不需要存储 Revision。

实际上,若 Partition == 0,那么 Rolling Update 对于 Revision 的使用也仅仅只是追踪滚动更新状态罢了。而 OnDelete 时,一切删除都是用户手动进行的,自然也无需追踪了。

StatefulSet Pod Control

提供 PVC 创建,以及有序 Pod 创建与删除,Pod 校验等额外功能的封装。

TODO