kubelet代码梳理及流程
在 Kubernetes 集群中,在每个 Node 节点上都会启动一个 kubelet 服务进程。该进程用于处理 Master 节点下发到本节点的任务,管理 Pod 及 Pod 中的容器。每个 Kubelet 进程会在 APIServer 上注册节点自身信息,定期向 Master 节点汇报节点资源的使用情况,并通过 cAdvise 监控容器和节点资源。
资源同步方式:
kubelet有几种方式获取自身Node上所需要运行的Pod清单。我们使用通过API Server监听etcd目录,同步Pod列表的方式。
kubelet通过API Server Client使用WatchAndList的方式监听etcd中/registry/nodes/${当前节点名称}和/registry/pods的目录,将获取的信息同步到本地缓存中。
kubelet监听etcd,执行对Pod的操作,对容器的操作则是通过Docker Client执行,例如启动删除容器等。
创建pod的前期准备工作
syncLoop为控制主函数,在 for 循环中一直调用 syncLoopIteration。
syncLoopIteration 这个方法就会对多个管道进行遍历,发现任何一个管道有消息就交给 handler 去处理。
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
select {
case u, open := <-configCh:
if !open {
glog.Errorf("Update channel is closed. Exiting the sync loop.")
return false
}
switch u.Op {
case kubetypes.ADD:
...
case kubetypes.UPDATE:
...
case kubetypes.REMOVE:
...
case kubetypes.RECONCILE:
...
case kubetypes.DELETE:
...
case kubetypes.RESTORE:
...
case kubetypes.SET:
...
}
...
case e := <-plegCh:
...
case <-syncCh:
...
case update := <-kl.livenessManager.Updates():
...
case <-housekeepingCh:
...
}
return true
}
HandlePodAdditon处理新建pod事件。
dispatchwork把某个对 Pod 的操作(创建/更新/删除)下发给 podWorkers。
备注:probeManager add pod:如果定义了健康检查,启动goroutine进行健康检查。
podWorkers 子模块主要的作用就是处理针对每一个的 Pod 的更新事件,比如 Pod 的创建,删除,更新。
podWorkers 采取的基本思路是:为每一个 Pod 都单独创建一个 goroutine 和更新事件的 channel,goroutine 会阻塞式的等待 channel 中的事件,并且对获取的事件进行处理。而 podWorkers 对象自身则主要负责对更新事件进行下发(我理解这里完成第一次下发,后续监听其他的event)。
func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
pod := options.Pod
uid := pod.UID
var podUpdates chan UpdatePodOptions
var exists bool
p.podLock.Lock()
defer p.podLock.Unlock()
// 如果当前 pod 还没有启动过 goroutine ,则启动 goroutine,并且创建 channel
if podUpdates, exists = p.podUpdates[uid]; !exists {
// 创建 channel
podUpdates = make(chan UpdatePodOptions, 1)
p.podUpdates[uid] = podUpdates
// 启动 goroutine
go func() {
defer runtime.HandleCrash()
p.managePodLoop(podUpdates)
}()
}
// 下发更新事件
if !p.isWorking[pod.UID] {
p.isWorking[pod.UID] = true
// 写入刚刚创建的channel
podUpdates <- *options
} else {
update, found := p.lastUndeliveredWorkUpdate[pod.UID]
if !found || update.UpdateType != kubetypes.SyncPodKill {
p.lastUndeliveredWorkUpdate[pod.UID] = *options
}
}
}
managePodLoop 调用 syncPodFn 方法去同步 pod,syncPodFn 实际上就是kubelet.syncPod
kubelet.podWorkers = &fakePodWorkers{
syncPodFn: kubelet.syncPod,
cache: kubelet.podCache,
t: t,
}
SyncPod是创建容器前的准备工作:
在这个方法中,主要完成以下几件事情:
- 如果是删除 pod,立即执行并返回
- 同步 podStatus 到 kubelet.statusManager
- 检查 pod 是否能运行在本节点,主要是权限检查(是否能使用主机网络模式,是否可以以 privileged 权限运行等)。如果没有权限,就删除本地旧的 pod 并返回错误信息
- 创建 containerManagar 对象,并且创建 pod level cgroup,更新 Qos level cgroup
- 如果是 static Pod,就创建或者更新对应的 mirrorPod
- 创建 pod 的数据目录,存放 volume 和 plugin 信息,如果定义了 pv,等待所有的 volume mount 完成(volumeManager 会在后台做这些事情),如果有 image secrets,去 apiserver 获取对应的 secrets 数据
- 然后调用 kubelet.volumeManager 组件,等待它将 pod 所需要的所有外挂的 volume 都准备好(此处可进行需求一的开发,生成默认nginx.conf)。
- 调用 container runtime 的 SyncPod 方法,去实现真正的容器创建逻辑。
func (kl *Kubelet) syncPod(o syncPodOptions) error {
// pull out the required options
// 该pod存放着要创建的所有信息,详见数据结构调研
pod := o.pod
mirrorPod := o.mirrorPod
podStatus := o.podStatus
updateType := o.updateType
// 是否为 删除 pod
if updateType == kubetypes.SyncPodKill {
...
}
...
// 检查 pod 是否能运行在本节点
runnable := kl.canRunPod(pod)
if !runnable.Admit {
...
}
// 更新 pod 状态
kl.statusManager.SetPodStatus(pod, apiPodStatus)
// 如果 pod 非 running 状态则直接 kill 掉
if !runnable.Admit || pod.DeletionTimestamp != nil || apiPodStatus.Phase == v1.PodFailed {
...
}
// 加载网络插件
if rs := kl.runtimeState.networkErrors(); len(rs) != 0 && !kubecontainer.IsHostNetworkPod(pod) {
...
}
pcm := kl.containerManager.NewPodContainerManager()
if !kl.podIsTerminated(pod) {
...
// 创建并更新 pod 的 cgroups
if !(podKilled && pod.Spec.RestartPolicy == v1.RestartPolicyNever) {
if !pcm.Exists(pod) {
...
}
}
}
// 为 static pod 创建对应的 mirror pod
if kubepod.IsStaticPod(pod) {
...
}
// 创建数据目录
if err := kl.makePodDataDirs(pod); err != nil {
...
}
// 挂载 volume
if !kl.podIsTerminated(pod) {
if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
...
}
}
// 获取 secret 信息
pullSecrets := kl.getPullSecretsForPod(pod)
// 调用 containerRuntime 的 SyncPod 方法开始创建容器
result := kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
kl.reasonCache.Update(pod.UID, result)
if err := result.Error(); err != nil {
...
}
return nil
}
在该函数确认container创建成功返回之前,可以进行neutron上报。
创建和启动容器
containerRuntime(pkg/kubelet/kuberuntime)子模块的 SyncPod 函数才是真正完成 pod 内容器实体的创建。
syncPod 主要执行以下几个操作:
计算 sandbox 和 container 是否发生变化
创建 sandbox 容器
启动 init 容器
启动业务容器
最终由 startContainer 完成容器的启动。
主要有以下几个步骤:
拉取镜像
生成业务容器的配置信息
调用 docker api 创建容器
启动容器
执行 post start hook
func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, container *v1.Container, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, containerType kubecontainer.ContainerType) (string, error) {
// 1、检查业务镜像是否存在,不存在则到 Docker Registry 或是 Private Registry 拉取镜像。
imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets)
if err != nil {
...
}
ref, err := kubecontainer.GenerateContainerRef(pod, container)
if err != nil {
...
}
// 设置 RestartCount
restartCount := 0
containerStatus := podStatus.FindContainerStatusByName(container.Name)
if containerStatus != nil {
restartCount = containerStatus.RestartCount + 1
}
// 2、生成业务容器的配置信息
containerConfig, cleanupAction, err := m.generateContainerConfig(container, pod, restartCount, podIP, imageRef, containerType)
if cleanupAction != nil {
defer cleanupAction()
}
...
// 3、通过 client.CreateContainer 调用 docker api 创建业务容器
containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
if err != nil {
...
}
err = m.internalLifecycle.PreStartContainer(pod, container, containerID)
if err != nil {
...
}
...
// 3、启动业务容器
err = m.runtimeService.StartContainer(containerID)
if err != nil {
...
}
containerMeta := containerConfig.GetMetadata()
sandboxMeta := podSandboxConfig.GetMetadata()
legacySymlink := legacyLogSymlink(containerID, containerMeta.Name, sandboxMeta.Name,
sandboxMeta.Namespace)
containerLog := filepath.Join(podSandboxConfig.LogDirectory, containerConfig.LogPath)
if _, err := m.osInterface.Stat(containerLog); !os.IsNotExist(err) {
if err := m.osInterface.Symlink(containerLog, legacySymlink); err != nil {
glog.Errorf("Failed to create legacy symbolic link %q to container %q log %q: %v",
legacySymlink, containerID, containerLog, err)
}
}
// 4、执行 post start hook
if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
kubeContainerID := kubecontainer.ContainerID{
Type: m.runtimeName,
ID: containerID,
}
// runner.Run 这个方法的主要作用就是在业务容器起来的时候,
// 首先会执行一个 container hook(PostStart 和 PreStop),做一些预处理工作。
// 只有 container hook 执行成功才会运行具体的业务服务,否则容器异常。
msg, handlerErr := m.runner.Run(kubeContainerID, pod, container, container.Lifecycle.PostStart)
if handlerErr != nil {
...
}
}
return "", nil
}