Kubelet Startup #
Following the usual convention, the kubelet's entry point lives in cmd/kubelet/kubelet.go, which is essentially a thin wrapper around pkg/kubelet/kubelet.go.
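For reference, the wrapper itself is tiny; roughly (abridged from cmd/kubelet/kubelet.go around this version, details vary slightly across releases):
// cmd/kubelet/kubelet.go (abridged)
func main() {
	rand.Seed(time.Now().UnixNano())

	command := app.NewKubeletCommand()
	logs.InitLogs()
	defer logs.FlushLogs()

	// all of the real work happens behind cmd/kubelet/app/server.go
	if err := command.Execute(); err != nil {
		os.Exit(1)
	}
}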
Kubelet startup:
sequenceDiagram
participant main as cmd/kubelet/kubelet.go
participant server as cmd/kubelet/app/server.go
participant kubelet as pkg/kubelet/kubelet.go
participant cfg as pkg/kubelet/config/config.go
participant lw as ListAndWatch
main-->>server: NewKubeletCommand()
server-->>server: Run()
server-->>server: RunKubelet()
server-->>server: createAndInitKubelet()
server-->>+kubelet: NewMainKubelet()
kubelet-->>+cfg: NewPodConfig()
cfg-->>+lw: NewSourceApiserver()
lw-->>-cfg: Channel
cfg-->>-kubelet: *PodConfig
kubelet-->>-server: *Kubelet
%%server-->>kubelet: StartGarbageCollection
server-->>server: startKubelet()
server-->>kubelet: go Run()
loop Events from *PodConfig
lw-->>kubelet: Events
kubelet-->>kubelet: syncLoop
end
server-->>kubelet: go ListenAndServe()
The kubelet's core components (initialized in NewMainKubelet) include, but are not limited to, the following; a rough sketch of the corresponding struct fields follows the list:
- kubelet
- SharedInformer
- Indexer
- NodeLister
- ServiceLister
- ListWatch
- ProbeManager
- StatusManager
- LivenessManager
- StartupManager
- PodManager
- Runtime (NewKubeGenericRuntimeManager)
- PLEG
- ContainerGC
- ImageGCManager
- CertificateManager
- TokenManager
- PluginManager
- VolumeManager
- PodWorkers
- PodKiller
- EvictionManager
- PodAdmitHandler
- PodSyncLoopHandler (ActiveDeadlineHandler)
- PodSyncHandler (ActiveDeadlineHandler)
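A rough, heavily abridged sketch of how some of these show up as fields on the Kubelet struct (names taken from pkg/kubelet/kubelet.go around this version; most fields are omitted):
// pkg/kubelet/kubelet.go (abridged sketch)
type Kubelet struct {
	podManager       kubepod.Manager                 // desired pods from all config sources
	podWorkers       PodWorkers                      // per-pod sync goroutines
	statusManager    status.Manager                  // pushes pod status to the apiserver
	probeManager     prober.Manager                  // liveness/readiness/startup probes
	livenessManager  proberesults.Manager
	containerRuntime kubecontainer.Runtime           // kubeGenericRuntimeManager
	pleg             pleg.PodLifecycleEventGenerator // pod lifecycle event generator
	containerGC      kubecontainer.ContainerGC
	imageManager     images.ImageGCManager
	volumeManager    volumemanager.VolumeManager
	evictionManager  eviction.Manager
	// ... and many more
}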
Run #
The Run function starts the kubelet's core work: reacting to configuration updates.
// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
kl.imageManager.Start()
kl.serverCertificateManager.Start()
kl.oomWatcher.Start(kl.nodeRef)
kl.resourceAnalyzer.Start()
go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
go kl.fastStatusUpdateOnce()
go kl.nodeLeaseController.Run(wait.NeverStop)
go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
kl.initNetworkUtil()
// Start a goroutine responsible for killing pods (that are not properly
// handled by pod workers).
go wait.Until(kl.podKiller.PerformPodKillingWork, 1*time.Second, wait.NeverStop)
// Start component sync loops.
kl.statusManager.Start()
kl.probeManager.Start()
kl.runtimeClassManager.Start(wait.NeverStop)
// Start the pod lifecycle event generator.
kl.pleg.Start()
kl.syncLoop(updates, kl)
}
The startup of the individual managers and the iptables setup can be skipped for now. The key piece is the syncLoop function, the heart of the kubelet.
syncLoop #
In the for loop at the end of syncLoop, syncLoopIteration is invoked:
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool
The configCh parameter is the channel that receives configuration updates; all subsequent configuration changes are read from it. syncCh and housekeepingCh deliver periodically triggered sync events.
The function also reads events from the livenessManager.
The handler parameter is the concrete implementation that operates on pods; here it is kl *Kubelet itself.
// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
kl.syncLoop(updates, kl)
}
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
for {
if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
break
}
}
}
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
select {
case u, open := <-configCh:
switch u.Op {
case kubetypes.ADD:
// After restarting, kubelet will get all existing pods through
// ADD as if they are new pods. These pods will then go through the
// admission process and *may* be rejected. This can be resolved
// once we have checkpointing.
handler.HandlePodAdditions(u.Pods)
case kubetypes.UPDATE:
handler.HandlePodUpdates(u.Pods)
case kubetypes.REMOVE:
handler.HandlePodRemoves(u.Pods)
case kubetypes.RECONCILE:
handler.HandlePodReconcile(u.Pods)
case kubetypes.DELETE:
// DELETE is treated as a UPDATE because of graceful deletion.
handler.HandlePodUpdates(u.Pods)
default:
klog.Errorf("Invalid event type received: %d.", u.Op)
}
kl.sourcesReady.AddSource(u.Source)
}
return true
}
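For context, the syncCh, housekeepingCh and plegCh arguments are created at the top of syncLoop, roughly like this (abridged; the exact periods are defined elsewhere in the package):
// pkg/kubelet/kubelet.go, syncLoop (abridged)
syncTicker := time.NewTicker(time.Second)
defer syncTicker.Stop()
housekeepingTicker := time.NewTicker(housekeepingPeriod)
defer housekeepingTicker.Stop()
plegCh := kl.pleg.Watch()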
SyncHandler (interface) #
In short, every event is handled by an implementation of the SyncHandler interface, which here is *Kubelet itself. Taking HandlePodUpdates as an example:
// HandlePodUpdates is the callback in the SyncHandler interface for pods
// being updated from a config source.
func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) {
start := kl.clock.Now()
for _, pod := range pods {
kl.podManager.UpdatePod(pod)
if kubetypes.IsMirrorPod(pod) {
kl.handleMirrorPod(pod, start)
continue
}
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start)
}
}
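For reference, the full interface that *Kubelet implements looks roughly like this (pkg/kubelet/kubelet.go; the exact method set may differ slightly between versions):
// SyncHandler is an interface implemented by Kubelet, for testability
type SyncHandler interface {
	HandlePodAdditions(pods []*v1.Pod)
	HandlePodUpdates(pods []*v1.Pod)
	HandlePodRemoves(pods []*v1.Pod)
	HandlePodReconcile(pods []*v1.Pod)
	HandlePodSyncs(pods []*v1.Pod)
	HandlePodCleanups() error
}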
With the exception of HandlePodRemoves, Kubelet's SyncHandler implementations, whether handling an Add or an Update, all eventually call dispatchWork, which dispatches the request to either statusManager.TerminatePod or podWorkers.UpdatePod.
// dispatchWork starts the asynchronous sync of the pod in a pod worker.
// If the pod has completed termination, dispatchWork will perform no action.
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
// check whether we are ready to delete the pod from the API server (all status up to date)
containersTerminal, podWorkerTerminal := kl.podAndContainersAreTerminal(pod)
if pod.DeletionTimestamp != nil && containersTerminal {
kl.statusManager.TerminatePod(pod)
return
}
// Run the sync in an async worker.
kl.podWorkers.UpdatePod(&UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: syncType,
})
}
As an aside, I'm not very familiar with the PodManager code, and I'm still in the dark about why this piece of HandlePodAdditions is written the way it is.
// HandlePodAdditions is the callback in SyncHandler for pods being added from
// a config source.
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
for _, pod := range pods {
existingPods := kl.podManager.GetPods() // ? Why we need repeat GetPods in iteration?
kl.podManager.AddPod(pod)
}
}
StatusManager.TerminatePod #
TerminatePod sends the request to the podStatusChannel channel.
StatusManager.Start keeps reading from this channel and calls syncPod to process each request.
Note that this syncPod is not the same function as Kubelet.syncPod: the StatusManager only deals with status-related data. It eventually calls util.PatchPodStatus, which sends a Patch request through the client API, and if the pod can be deleted it also sends a Delete request.
func (m *manager) TerminatePod(pod *v1.Pod) {
	m.podStatusesLock.Lock()
	defer m.podStatusesLock.Unlock()
	// ... build `status`: a copy of the cached status with every
	// container status marked as terminated ...
	m.updateStatusInternal(pod, status, true)
}
// updateStatusInternal updates the internal status cache, and queues an update to the api server if
// necessary. Returns whether an update was triggered.
// This method IS NOT THREAD SAFE and must be called from a locked function.
func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUpdate bool) bool {
	// ... normalize and cache the status as `newStatus`, bailing out early
	// if nothing actually changed ...
	select {
	case m.podStatusChannel <- podStatusSyncRequest{pod.UID, newStatus}:
		return true
	default:
		return false
	}
}
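The consuming side, sketched very roughly (abridged from StatusManager.Start in pkg/kubelet/status/status_manager.go; logging, the nil-client check and channel draining are omitted):
// pkg/kubelet/status/status_manager.go (abridged sketch)
func (m *manager) Start() {
	syncTicker := time.Tick(syncPeriod)
	// syncPod and syncBatch share the same goroutine to avoid sync races
	go wait.Forever(func() {
		for {
			select {
			case syncRequest := <-m.podStatusChannel:
				// single-pod path: ends in util.PatchPodStatus (and possibly a Delete)
				m.syncPod(syncRequest.podUID, syncRequest.status)
			case <-syncTicker:
				// periodic full reconciliation of all cached statuses
				m.syncBatch()
			}
		}
	}, 0)
}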
PodWorkers.UpdatePod #
For each pod UID, PodWorkers.UpdatePod starts a managePodLoop goroutine that listens on the podUpdates <-chan UpdatePodOptions channel, and then passes the update into that podUpdates channel.
// Apply the new setting to the specified pod.
// If the options provide an OnCompleteFunc, the function is invoked if the update is accepted.
// Update requests are ignored if a kill pod request is pending.
func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
pod := options.Pod
uid := pod.UID
var podUpdates chan UpdatePodOptions
var exists bool
p.podLock.Lock()
defer p.podLock.Unlock()
if podUpdates, exists = p.podUpdates[uid]; !exists {
podUpdates = make(chan UpdatePodOptions, 1)
p.podUpdates[uid] = podUpdates
go func() {
defer runtime.HandleCrash()
p.managePodLoop(podUpdates) // Handler here
}()
}
if !p.isWorking[pod.UID] {
p.isWorking[pod.UID] = true
podUpdates <- *options
}
}
Inside managePodLoop, it in turn calls syncPodFn.
func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
	var lastSyncTime time.Time
	for update := range podUpdates {
		err := func() error {
			podUID := update.Pod.UID
			// wait until the pod cache is newer than the previous sync
			status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
			if err != nil {
				return err
			}
			err = p.syncPodFn(syncPodOptions{
				mirrorPod:      update.MirrorPod,
				pod:            update.Pod,
				podStatus:      status,
				killPodOptions: update.KillPodOptions,
				updateType:     update.UpdateType,
			})
			lastSyncTime = time.Now()
			return err
		}()
		// notify the call-back function if the operation succeeded or not
		if update.OnCompleteFunc != nil {
			update.OnCompleteFunc(err)
		}
	}
}
syncPodFn is passed in when the pod workers are created, and is in fact Kubelet.syncPod.
// pkg/kubelet/kubelet.go
func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
...) (*Kubelet, error) {
klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)
}
Kubelet.syncPod #
The real processing logic lives in syncPod. Kubelet.syncPod calls a whole series of functions: generating various events, updating the Status, killing pods that should not be running, creating directories, mounting volumes, syncing secrets, calling the container runtime's SyncPod, and so on.
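A very rough skeleton of that sequence, heavily abridged from Kubelet.syncPod (most branches, mirror-pod and cgroup handling, and all error handling are left out):
// pkg/kubelet/kubelet.go, syncPod (abridged sketch)
func (kl *Kubelet) syncPod(o syncPodOptions) error {
	pod := o.pod

	// compute the v1.PodStatus from the runtime status and report it
	apiPodStatus := kl.generateAPIPodStatus(pod, o.podStatus)
	kl.statusManager.SetPodStatus(pod, apiPodStatus)

	// kill pods that should no longer be running
	if runnable := kl.canRunPod(pod); !runnable.Admit {
		kl.killPod(pod, nil, o.podStatus, nil)
		return fmt.Errorf("pod cannot be run: %s", runnable.Message)
	}

	// prepare the environment: data dirs, volumes, image pull secrets
	kl.makePodDataDirs(pod)
	kl.volumeManager.WaitForAttachAndMount(pod)
	pullSecrets := kl.getPullSecretsForPod(pod)

	// hand the pod over to the container runtime (kubeGenericRuntimeManager)
	result := kl.containerRuntime.SyncPod(pod, o.podStatus, pullSecrets, kl.backOff)
	return result.Error()
}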
configCh #
Apart from the periodic ticks and the PLEG events, all of syncLoopIteration's events come from configCh.
So where does configCh come from? Tracing back up the call stack:
// kubelet/app/server.go
func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
// NewMainKubelet should have set up a pod source config if one didn't exist
// when the builder was run. This is just a precaution.
if kubeDeps.PodConfig == nil {
return fmt.Errorf("failed to create kubelet, pod source config was nil")
}
podCfg := kubeDeps.PodConfig
startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableCAdvisorJSONEndpoints, kubeServer.EnableServer)
}
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableCAdvisorJSONEndpoints, enableServer bool) {
go k.Run(podCfg.Updates())
As the comment suggests, kubeDeps.PodConfig is actually initialized by the NewMainKubelet function in pkg:
// pkg/kubelet/kubelet.go
func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
kubeDeps *Dependencies,
...) {
if kubeDeps.PodConfig == nil {
var err error
kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName)
if err != nil {
return nil, err
}
}
}
// makePodSourceConfig creates a config.PodConfig from the given
// KubeletConfiguration or returns an error.
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName) (*config.PodConfig, error) {
// source of all configuration
cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)
if kubeDeps.KubeClient != nil {
config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, cfg.Channel(kubetypes.ApiserverSource))
}
return cfg, nil
}
In makePodSourceConfig, NewPodConfig creates the channel that is ultimately consumed. Note, however, that in newSourceApiserverFromLW the channel actually written to is cfg.Channel(kubetypes.ApiserverSource), not PodConfig.Updates().
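The two ends are tied together inside PodConfig itself: the per-source channel feeds the mux, while Updates() simply exposes the merged output channel (pkg/kubelet/config/config.go):
// Updates returns a channel of updates to the configuration, properly denormalized.
func (c *PodConfig) Updates() <-chan kubetypes.PodUpdate {
	return c.updates
}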
PodConfig #
Inside cfg.Channel(source), the real work is done by PodConfig.mux.Channel:
// pkg/kubelet/config/config.go
func (c *PodConfig) Channel(source string) chan<- interface{} {
c.sourcesLock.Lock()
defer c.sourcesLock.Unlock()
c.sources.Insert(source)
return c.mux.Channel(source)
}
mux.Channel creates a channel for each source, listens for changes on it, and calls merger.Merge to merge the changes from all sources.
// pkg/util/config/config.go
func (m *Mux) Channel(source string) chan interface{} {
m.sourceLock.Lock()
defer m.sourceLock.Unlock()
channel, exists := m.sources[source]
if exists {
return channel
}
newChannel := make(chan interface{})
m.sources[source] = newChannel
go wait.Until(func() { m.listen(source, newChannel) }, 0, wait.NeverStop)
return newChannel
}
func (m *Mux) listen(source string, listenChannel <-chan interface{}) {
for update := range listenChannel {
m.merger.Merge(source, update)
}
}
The mux.merger is created via newPodStorage when the PodConfig is constructed. Note that the updates channel handed to the PodStorage and PodConfig.updates are the same variable.
// pkg/kubelet/config/config.go
// NewPodConfig creates an object that can merge many configuration sources into a stream
// of normalized updates to a pod configuration.
func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder) *PodConfig {
updates := make(chan kubetypes.PodUpdate, 50)
storage := newPodStorage(updates, mode, recorder)
podConfig := &PodConfig{
pods: storage,
mux: config.NewMux(storage),
updates: updates,
sources: sets.String{},
}
return podConfig
}
PodStorage.Merge guarantees the order in which messages are delivered and also filters out duplicate messages.
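A rough sketch of that merge path in the incremental-notification mode used by the kubelet (heavily abridged from podStorage.Merge; the first-seen handling and the other notification modes are omitted):
// pkg/kubelet/config/config.go (abridged sketch)
func (s *podStorage) Merge(source string, change interface{}) error {
	s.updateLock.Lock()
	defer s.updateLock.Unlock()

	// normalize the raw change into typed batches, dropping anything
	// that did not actually change compared to the cached state
	adds, updates, deletes, removes, reconciles, restores := s.merge(source, change)

	// deliver on the same channel that PodConfig.Updates() returns, in a fixed order
	if len(removes.Pods) > 0 {
		s.updates <- *removes
	}
	if len(adds.Pods) > 0 {
		s.updates <- *adds
	}
	if len(updates.Pods) > 0 {
		s.updates <- *updates
	}
	if len(deletes.Pods) > 0 {
		s.updates <- *deletes
	}
	if len(restores.Pods) > 0 {
		s.updates <- *restores
	}
	if len(reconciles.Pods) > 0 {
		s.updates <- *reconciles
	}
	return nil
}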
ListAndWatch #
NewSourceApiserver relies on client-go's List-and-Watch mechanism to receive messages continuously. More on that later.
// pkg/kubelet/config/apiserver.go
// NewSourceApiserver creates a config source that watches and pulls from the apiserver.
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) {
lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
newSourceApiserverFromLW(lw, updates)
}
// newSourceApiserverFromLW holds creates a config source that watches and pulls from the apiserver.
func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
send := func(objs []interface{}) {
var pods []*v1.Pod
for _, o := range objs {
pods = append(pods, o.(*v1.Pod))
}
updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
}
r := cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0)
go r.Run(wait.NeverStop)
}