Startup

Kubelet Startup #

By convention, the kubelet's entry point lives in cmd/kubelet/kubelet.go, which is essentially a wrapper around pkg/kubelet/kubelet.go.

Kubelet startup:

sequenceDiagram
  participant main as cmd/kubelet/kubelet.go
  participant server as cmd/kubelet/app/server.go
  participant kubelet as pkg/kubelet/kubelet.go
  participant cfg as pkg/kubelet/config/config.go
  participant lw as ListAndWatch
  main-->>server: NewKubeletCommand()
  server-->>server: Run()<br/>RunKubelet()<br/>createAndInitKubelet()
  server-->>+kubelet: NewMainKubelet()
  kubelet-->>+cfg: NewPodConfig()
  cfg-->>+lw: NewSourceApiserver()
  lw-->>-cfg: Channel
  cfg-->>-kubelet: *PodConfig
  kubelet-->>-server: *Kubelet
  #server-->>kubelet: StartGarbageCollection
  server-->>server: startKubelet()
  server-->>kubelet: go Run()
  par loop Events from *PodConfig
    lw-->>kubelet: Events
    kubelet-->>kubelet: syncLoop
  end
  server-->>kubelet: go ListenAndServe()

The kubelet's core components (initialized in NewMainKubelet) include, but are not limited to:

  • kubelet
    • SharedInformer
    • Indexer
    • NodeLister
    • ServiceLister
    • ListWatch
    • ProbeManager
      • StatusManager
      • LivenessManager
      • StartupManager
    • PodManager
    • Runtime (NewKubeGenericRuntimeManager)
    • PLEG
    • ContainerGC
    • ImageGCManager
    • CertificateManager
    • TokenManager
    • PluginManager
    • VolumeManager
    • PodWorkers
    • PodKiller
    • EvictionManager
    • PodAdmitHandler
    • PodSyncLoopHandler (ActiveDeadlineHandler)
    • PodSyncHandler (ActiveDeadlineHandler)

Run #

The Run function starts the kubelet's core job: reacting to config updates.

// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
  kl.imageManager.Start()
  kl.serverCertificateManager.Start()
  kl.oomWatcher.Start(kl.nodeRef)
  kl.resourceAnalyzer.Start()
  go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
  go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
  go kl.fastStatusUpdateOnce()
  go kl.nodeLeaseController.Run(wait.NeverStop)
  go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
  kl.initNetworkUtil()
  // Start a goroutine responsible for killing pods (that are not properly
  // handled by pod workers).
  go wait.Until(kl.podKiller.PerformPodKillingWork, 1*time.Second, wait.NeverStop)
  // Start component sync loops.
  kl.statusManager.Start()
  kl.probeManager.Start()
  kl.runtimeClassManager.Start(wait.NeverStop)
  // Start the pod lifecycle event generator.
  kl.pleg.Start()
  
  kl.syncLoop(updates, kl)
}

The startup of the individual managers and the iptables setup can be skipped for now. The part to focus on is syncLoop, the kubelet's core function.

syncLoop #

The for loop at the end of syncLoop calls syncLoopIteration:

func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
  syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool

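The configCh parameter is the channel that delivers config updates; all subsequent config changes are read from it. syncCh and housekeepingCh deliver periodically triggered sync events.

In addition, the function reads events from livenessManager.

The handler parameter is the concrete implementation that operates on pods; here the value passed in is kl *Kubelet itself.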


// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
  kl.syncLoop(updates, kl)
}

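func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
  // Tickers that drive the periodic sync and housekeeping events.
  syncTicker := time.NewTicker(time.Second)
  defer syncTicker.Stop()
  housekeepingTicker := time.NewTicker(housekeepingPeriod)
  defer housekeepingTicker.Stop()
  plegCh := kl.pleg.Watch()
  for {
    if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {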
      break
    }
  }
}

func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
  syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
  select {
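  case u, open := <-configCh:
    // The update channel was closed: exit the sync loop.
    if !open {
      klog.Errorf("Update channel is closed. Exiting the sync loop.")
      return false
    }

    switch u.Op {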
    case kubetypes.ADD:
      // After restarting, kubelet will get all existing pods through
      // ADD as if they are new pods. These pods will then go through the
      // admission process and *may* be rejected. This can be resolved
      // once we have checkpointing.
      handler.HandlePodAdditions(u.Pods)
    case kubetypes.UPDATE:
      handler.HandlePodUpdates(u.Pods)
    case kubetypes.REMOVE:
      handler.HandlePodRemoves(u.Pods)
    case kubetypes.RECONCILE:
      handler.HandlePodReconcile(u.Pods)
    case kubetypes.DELETE:
      // DELETE is treated as a UPDATE because of graceful deletion.
      handler.HandlePodUpdates(u.Pods)
    default:
      klog.Errorf("Invalid event type received: %d.", u.Op)
    }

    kl.sourcesReady.AddSource(u.Source)
  }
  return true
}
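To make the shape of this loop easier to see, here is a minimal, self-contained sketch of the same pattern: a select over a config channel plus a tick channel, dispatching to a handler interface. The types op, podUpdate, syncHandler and recorder are simplified stand-ins invented for this sketch (pods are plain strings), not the real kubelet types.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// op and podUpdate are hypothetical, simplified stand-ins for
// kubetypes.PodOperation and kubetypes.PodUpdate.
type op int

const (
	opAdd op = iota
	opUpdate
	opRemove
)

type podUpdate struct {
	Op   op
	Pods []string
}

// syncHandler plays the role of SyncHandler, reduced to three callbacks.
type syncHandler interface {
	HandlePodAdditions(pods []string)
	HandlePodUpdates(pods []string)
	HandlePodRemoves(pods []string)
}

// recorder is a toy handler that only logs what it was asked to do.
type recorder struct{ log []string }

func (r *recorder) record(verb string, pods []string) {
	r.log = append(r.log, verb+" "+strings.Join(pods, ","))
}
func (r *recorder) HandlePodAdditions(pods []string) { r.record("add", pods) }
func (r *recorder) HandlePodUpdates(pods []string)   { r.record("update", pods) }
func (r *recorder) HandlePodRemoves(pods []string)   { r.record("remove", pods) }

// iterate handles exactly one event and reports whether the loop should continue.
func iterate(configCh <-chan podUpdate, h syncHandler, syncCh <-chan time.Time) bool {
	select {
	case u, open := <-configCh:
		if !open {
			return false // config channel closed: stop the loop
		}
		switch u.Op {
		case opAdd:
			h.HandlePodAdditions(u.Pods)
		case opUpdate:
			h.HandlePodUpdates(u.Pods)
		case opRemove:
			h.HandlePodRemoves(u.Pods)
		}
	case <-syncCh:
		// A periodic tick; a real loop would resync pods here.
	}
	return true
}

// run feeds a fixed list of updates through the loop and returns the handler log.
func run(updates []podUpdate) []string {
	configCh := make(chan podUpdate, len(updates))
	for _, u := range updates {
		configCh <- u
	}
	close(configCh)
	r := &recorder{}
	// A nil tick channel never becomes ready, which keeps this demo deterministic.
	for iterate(configCh, r, nil) {
	}
	return r.log
}

func main() {
	log := run([]podUpdate{
		{Op: opAdd, Pods: []string{"web"}},
		{Op: opRemove, Pods: []string{"web"}},
	})
	fmt.Println(strings.Join(log, "; ")) // prints "add web; remove web"
}
```

Returning a bool from each iteration, rather than looping inside the select, keeps the single-event logic easy to test on its own.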

SyncHandler (interface) #

In short, every event is handled by the implementation of the SyncHandler interface (here, *Kubelet). Take HandlePodUpdates as an example:

// HandlePodUpdates is the callback in the SyncHandler interface for pods
// being updated from a config source.
func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) {
  start := kl.clock.Now()
  for _, pod := range pods {
    kl.podManager.UpdatePod(pod)
    if kubetypes.IsMirrorPod(pod) {
      kl.handleMirrorPod(pod, start)
      continue
    }
    mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
    kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start)
  }
}

Except for HandlePodRemoves, the SyncHandler methods that Kubelet implements, whether for Add or Update, all end up calling dispatchWork, which hands the request to either statusManager.TerminatePod or podWorkers.UpdatePod.

// dispatchWork starts the asynchronous sync of the pod in a pod worker.
// If the pod has completed termination, dispatchWork will perform no action.
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
  // check whether we are ready to delete the pod from the API server (all status up to date)
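  containersTerminal, podWorkerTerminal := kl.podAndContainersAreTerminal(pod)
  if pod.DeletionTimestamp != nil && containersTerminal {
    kl.statusManager.TerminatePod(pod)
    return
  }

  // Optimization: skip the pod worker if no further changes to the pod are possible.
  if podWorkerTerminal {
    return
  }

  // Run the sync in an async worker.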
  kl.podWorkers.UpdatePod(&UpdatePodOptions{
    Pod:        pod,
    MirrorPod:  mirrorPod,
    UpdateType: syncType,
  })
}

As an aside, I'm not very familiar with the PodManager code, and I'm still in the dark about why this part of HandlePodAdditions is written the way it is.

// HandlePodAdditions is the callback in SyncHandler for pods being added from
// a config source.
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
  for _, pod := range pods {
    existingPods := kl.podManager.GetPods() // ? Why do we need to call GetPods again on every iteration?
    kl.podManager.AddPod(pod)
    // ... (rest of the loop body elided)
  }
}

StatusManager.TerminatePod #

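TerminatePod sends a request to the podStatusChannel channel.

StatusManager.Start keeps reading from this channel and calls syncPod to handle each request.

Note that this syncPod is not the same function as Kubelet.syncPod, because StatusManager only deals with status data. It eventually calls util.PatchPodStatus, which sends a Patch request through the client API. If the pod can be deleted, it also sends a Delete request.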

func (m *manager) TerminatePod(pod *v1.Pod) {
  m.podStatusesLock.Lock()
  defer m.podStatusesLock.Unlock()

  // ... (computation of status elided)
  m.updateStatusInternal(pod, status, true)
}

// updateStatusInternal updates the internal status cache, and queues an update to the api server if
// necessary. Returns whether an update was triggered.
// This method IS NOT THREAD SAFE and must be called from a locked function.
func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUpdate bool) bool {
  // ... (construction of newStatus from status elided)
  select {
  case m.podStatusChannel <- podStatusSyncRequest{pod.UID, newStatus}:
    return true
  default:
    return false
  }
}
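The select with a default branch in updateStatusInternal is Go's idiom for a non-blocking send: if the channel cannot accept the value right now, the function returns false instead of waiting. A minimal sketch of just that idiom follows; trySend is a hypothetical helper name, not something from the kubelet source.

```go
package main

import "fmt"

// trySend attempts to enqueue v without blocking and reports whether it succeeded.
func trySend(ch chan<- string, v string) bool {
	select {
	case ch <- v:
		return true
	default:
		// No receiver is ready and the buffer (if any) is full.
		return false
	}
}

func main() {
	ch := make(chan string, 1)
	fmt.Println(trySend(ch, "pod-a")) // true: the buffer has room
	fmt.Println(trySend(ch, "pod-b")) // false: the buffer is full
	<-ch                              // drain one element
	fmt.Println(trySend(ch, "pod-c")) // true again
}
```

The caller never stalls while holding the lock, at the cost of having to tolerate a false return.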

PodWorkers.UpdatePod #

PodWorkers.UpdatePod starts one managePodLoop goroutine per pod UID, which listens on the podUpdates <-chan UpdatePodOptions channel. It then passes the update to that podUpdates channel.

// Apply the new setting to the specified pod.
// If the options provide an OnCompleteFunc, the function is invoked if the update is accepted.
// Update requests are ignored if a kill pod request is pending.
func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
  pod := options.Pod
  uid := pod.UID
  var podUpdates chan UpdatePodOptions
  var exists bool

  p.podLock.Lock()
  defer p.podLock.Unlock()
  if podUpdates, exists = p.podUpdates[uid]; !exists {
    podUpdates = make(chan UpdatePodOptions, 1)
    p.podUpdates[uid] = podUpdates

    go func() {
      defer runtime.HandleCrash()
      p.managePodLoop(podUpdates) // Handler here
    }()
  }
  if !p.isWorking[pod.UID] {
    p.isWorking[pod.UID] = true
    podUpdates <- *options
  }
}

managePodLoop, in turn, calls syncPodFn:

func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
  var lastSyncTime time.Time
  for update := range podUpdates {
    err := func() error {
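      podUID := update.Pod.UID
      // Block until the pod cache holds a status newer than the last sync.
      status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
      if err != nil {
        return err
      }
      err = p.syncPodFn(syncPodOptions{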
        mirrorPod:      update.MirrorPod,
        pod:            update.Pod,
        podStatus:      status,
        killPodOptions: update.KillPodOptions,
        updateType:     update.UpdateType,
      })
      lastSyncTime = time.Now()
      return err
    }()
    // notify the call-back function if the operation succeeded or not
    if update.OnCompleteFunc != nil {
      update.OnCompleteFunc(err)
    }
  }
}
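The UpdatePod and managePodLoop pair boils down to "one goroutine per key, fed through its own channel". Below is a minimal sketch of that pattern under simplifying assumptions: keys and updates are plain strings, the types workers and the helper collect are invented for illustration, and a send simply blocks while a worker is busy (the real code tracks isWorking instead).

```go
package main

import (
	"fmt"
	"sync"
)

// workers runs one goroutine per key, so updates for the same key are
// processed in order while different keys proceed concurrently.
type workers struct {
	mu      sync.Mutex
	updates map[string]chan string
	wg      sync.WaitGroup
	syncFn  func(key, update string) // injected, much like syncPodFn
}

func newWorkers(syncFn func(key, update string)) *workers {
	return &workers{updates: make(map[string]chan string), syncFn: syncFn}
}

// Update lazily starts the worker for key, then hands it the update.
// It must not be called after or concurrently with Close.
func (w *workers) Update(key, update string) {
	w.mu.Lock()
	ch, ok := w.updates[key]
	if !ok {
		ch = make(chan string, 1)
		w.updates[key] = ch
		w.wg.Add(1)
		go func() {
			defer w.wg.Done()
			for u := range ch {
				w.syncFn(key, u)
			}
		}()
	}
	w.mu.Unlock()
	ch <- update // send outside the lock so a busy worker cannot block other keys
}

// Close stops all workers and waits until every queued update is processed.
func (w *workers) Close() {
	w.mu.Lock()
	for _, ch := range w.updates {
		close(ch)
	}
	w.mu.Unlock()
	w.wg.Wait()
}

// collect runs the given (key, update) events and returns what each key saw.
func collect(events [][2]string) map[string][]string {
	var mu sync.Mutex
	seen := make(map[string][]string)
	w := newWorkers(func(key, update string) {
		mu.Lock()
		defer mu.Unlock()
		seen[key] = append(seen[key], update)
	})
	for _, e := range events {
		w.Update(e[0], e[1])
	}
	w.Close()
	return seen
}

func main() {
	fmt.Println(collect([][2]string{{"pod-a", "v1"}, {"pod-b", "v1"}, {"pod-a", "v2"}}))
}
```

Because each key has exactly one consumer, per-pod ordering comes for free and no lock is needed inside the sync function for per-pod state.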

syncPodFn is passed in when the pod workers are created; it is actually Kubelet.syncPod.

// pkg/kubelet/kubelet.go
func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
  ...) (*Kubelet, error) {
  klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)
}

Kubelet.syncPod #

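The real processing logic lives in syncPod.

Kubelet.syncPod calls a whole series of functions: generating various events, updating the status, killing pods that should not be running, creating directories, mounting volumes, syncing secrets, calling the container runtime's SyncPod, and so on.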

configCh #

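Apart from the timer ticks, PLEG events, and liveness results, all events in syncLoopIteration come from configCh.

So where does configCh come from? Let's trace the call stack: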

// cmd/kubelet/app/server.go
func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
  // NewMainKubelet should have set up a pod source config if one didn't exist
  // when the builder was run. This is just a precaution.
  if kubeDeps.PodConfig == nil {
    return fmt.Errorf("failed to create kubelet, pod source config was nil")
  }
  podCfg := kubeDeps.PodConfig
  startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableCAdvisorJSONEndpoints, kubeServer.EnableServer)
}

func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableCAdvisorJSONEndpoints, enableServer bool) {
  go k.Run(podCfg.Updates())
  // ...
}

As the comment suggests, kubeDeps.PodConfig is actually initialized by the NewMainKubelet function in pkg:

// pkg/kubelet/kubelet.go
func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
  kubeDeps *Dependencies,
  ...) {
  if kubeDeps.PodConfig == nil {
    var err error
    kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName)
    if err != nil {
      return nil, err
    }
  }
}

// makePodSourceConfig creates a config.PodConfig from the given
// KubeletConfiguration or returns an error.
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName) (*config.PodConfig, error) {
  // source of all configuration
  cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)

  if kubeDeps.KubeClient != nil {
    config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, cfg.Channel(kubetypes.ApiserverSource))
  }
  return cfg, nil
}

Inside makePodSourceConfig, NewPodConfig creates the channel that is eventually consumed.

In practice, though, the channel that newSourceApiserverFromLW writes to is cfg.Channel(kubetypes.ApiserverSource), not PodConfig.Updates().

PodConfig #

Inside cfg.Channel(source), the function doing the real work is PodConfig.mux.Channel.

// pkg/kubelet/config/config.go
func (c *PodConfig) Channel(source string) chan<- interface{} {
  c.sourcesLock.Lock()
  defer c.sourcesLock.Unlock()
  c.sources.Insert(source)
  return c.mux.Channel(source)
}

mux.Channel creates one channel per source, listens on it, and calls merger.Merge to merge the changes from all channels.

// pkg/util/config/config.go
func (m *Mux) Channel(source string) chan interface{} {
  m.sourceLock.Lock()
  defer m.sourceLock.Unlock()
  channel, exists := m.sources[source]
  if exists {
    return channel
  }
  newChannel := make(chan interface{})
  m.sources[source] = newChannel
  go wait.Until(func() { m.listen(source, newChannel) }, 0, wait.NeverStop)
  return newChannel
}

func (m *Mux) listen(source string, listenChannel <-chan interface{}) {
  for update := range listenChannel {
    m.merger.Merge(source, update)
  }
}

When the PodConfig is created, mux.merger is created through newPodStorage. As the code shows, the updates passed to PodStorage and PodConfig.updates are the same variable.

// pkg/kubelet/config/config.go
// NewPodConfig creates an object that can merge many configuration sources into a stream
// of normalized updates to a pod configuration.
func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder) *PodConfig {
  updates := make(chan kubetypes.PodUpdate, 50)
  storage := newPodStorage(updates, mode, recorder)
  podConfig := &PodConfig{
    pods:    storage,
    mux:     config.NewMux(storage),
    updates: updates,
    sources: sets.String{},
  }
  return podConfig
}

PodStorage.Merge preserves the order in which messages are delivered and also filters out duplicate messages.
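The relationship between the per-source channels and the single updates channel is a classic fan-in. The sketch below reproduces only that wiring, as an illustration under stated assumptions: mux, merger, forward and demo are names invented here, updates are plain strings, and the forward merger does no ordering or de-duplication, unlike PodStorage.Merge.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// merger receives every update together with the name of its source.
type merger interface {
	Merge(source string, update interface{})
}

// mux hands out one input channel per source and funnels all of them into merger.
type mux struct {
	mu      sync.Mutex
	sources map[string]chan interface{}
	merger  merger
	wg      sync.WaitGroup
}

func newMux(m merger) *mux {
	return &mux{sources: make(map[string]chan interface{}), merger: m}
}

// Channel returns the channel for source, creating it and its listener on first use.
func (m *mux) Channel(source string) chan interface{} {
	m.mu.Lock()
	defer m.mu.Unlock()
	if ch, ok := m.sources[source]; ok {
		return ch
	}
	ch := make(chan interface{})
	m.sources[source] = ch
	m.wg.Add(1)
	go func() {
		defer m.wg.Done()
		for u := range ch {
			m.merger.Merge(source, u)
		}
	}()
	return ch
}

// forward is a trivial merger that writes everything to one shared output channel,
// the counterpart of the updates channel shared by PodStorage and PodConfig.
type forward struct{ out chan<- string }

func (f forward) Merge(source string, update interface{}) {
	f.out <- fmt.Sprintf("%s:%v", source, update)
}

// demo pushes one update through each of two sources and returns the merged output.
func demo() []string {
	out := make(chan string, 2)
	m := newMux(forward{out})
	api, file := m.Channel("api"), m.Channel("file")
	api <- "pod-a"
	file <- "pod-b"
	close(api)
	close(file)
	m.wg.Wait() // all listeners have finished merging
	close(out)
	var got []string
	for s := range out {
		got = append(got, s)
	}
	sort.Strings(got) // arrival order across sources is not deterministic
	return got
}

func main() {
	fmt.Println(demo())
}
```

Writers only ever see their own write-only channel, while the consumer reads a single merged stream, which is why the apiserver source and PodConfig.Updates() can be different channels and still be connected.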

ListAndWatch #

NewSourceApiserver relies on client-go's List and Watch mechanism to keep receiving messages. The details are left for a later discussion.

// pkg/kubelet/config/apiserver.go
// NewSourceApiserver creates a config source that watches and pulls from the apiserver.
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) {
  lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
  newSourceApiserverFromLW(lw, updates)
}

// newSourceApiserverFromLW holds creates a config source that watches and pulls from the apiserver.
func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
  send := func(objs []interface{}) {
    var pods []*v1.Pod
    for _, o := range objs {
      pods = append(pods, o.(*v1.Pod))
    }
    updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
  }
  r := cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0)
  go r.Run(wait.NeverStop)
}
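The send callback above always emits Op: kubetypes.SET with a full pod list. As far as I understand it, that is because the store built by cache.NewUndeltaStore pushes the complete current state on every change rather than a delta. The sketch below imitates that behaviour with a toy store; undeltaStore, newUndeltaStore and snapshots are names made up for this illustration and do not use client-go.

```go
package main

import (
	"fmt"
	"sort"
)

// undeltaStore is a toy store that, on every change, pushes the full
// current contents to push instead of reporting what changed.
type undeltaStore struct {
	items map[string]string
	push  func(all []string)
}

func newUndeltaStore(push func(all []string)) *undeltaStore {
	return &undeltaStore{items: make(map[string]string), push: push}
}

// Add inserts or replaces an item, then pushes the complete state.
func (s *undeltaStore) Add(key, val string) {
	s.items[key] = val
	s.flush()
}

// Delete removes an item, then pushes the complete state.
func (s *undeltaStore) Delete(key string) {
	delete(s.items, key)
	s.flush()
}

// flush sends a sorted snapshot of all values to the push callback.
func (s *undeltaStore) flush() {
	all := make([]string, 0, len(s.items))
	for _, v := range s.items {
		all = append(all, v)
	}
	sort.Strings(all) // map iteration order is random; sort for stable output
	s.push(all)
}

// snapshots records every pushed state for a short sequence of changes.
func snapshots() [][]string {
	var pushed [][]string
	s := newUndeltaStore(func(all []string) { pushed = append(pushed, all) })
	s.Add("default/a", "a")
	s.Add("default/b", "b")
	s.Delete("default/a")
	return pushed
}

func main() {
	fmt.Println(snapshots()) // prints "[[a] [a b] [b]]"
}
```

With full snapshots arriving from the source, working out which pods were added, updated, or removed is left to the consumer, which is the job of the merge step on the PodConfig side.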