Контекст планирования для kube-scheduler


Структура планировщика

Планировщик — это структура для всего kube-scheduler, предоставляющая компоненты, необходимые для работы kube-scheduler.

type Scheduler struct {
    // Cache是一个抽象,会缓存pod的信息,作为scheduler进行查找,操作是基于Pod进行增加
    Cache internalcache.Cache
    // Extenders 算是调度框架中提供的调度插件,会影响kubernetes中的调度策略
    Extenders []framework.Extender

    // NextPod 作为一个函数提供,会阻塞获取下一个ke'diao'du
    NextPod func() *framework.QueuedPodInfo

    // Error is called if there is an error. It is passed the pod in
    // question, and the error
    Error func(*framework.QueuedPodInfo, error)

    // SchedulePod 尝试将给出的pod调度到Node。
    SchedulePod func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error)

    // 关闭scheduler的信号
    StopEverything <-chan struct{}

    // SchedulingQueue保存要调度的Pod
    SchedulingQueue internalqueue.SchedulingQueue

    // Profiles中是多个调度框架
    Profiles profile.Map
    client clientset.Interface
    nodeInfoSnapshot *internalcache.Snapshot
    percentageOfNodesToScore int32
    nextStartNodeIndex int
}
Войдите в полноэкранный режим Выход из полноэкранного режима

Поскольку два ядра реальной реализации, SchedulingQueue и scheduleOne будут проанализированы для обоих

SchedulingQueue

Узнав, как инициализируется kube-scheduler, необходимо проанализировать всю структуру и рабочий процесс kube-scheduler.

В Run запускается SchedulingQueue и scheduleOne, который структурно является планировщиком.

func (sched *Scheduler) Run(ctx context.Context) {
    sched.SchedulingQueue.Run()

    // We need to start scheduleOne loop in a dedicated goroutine,
    // because scheduleOne function hangs on getting the next item
    // from the SchedulingQueue.
    // If there are no new pods to schedule, it will be hanging there
    // and if done in this goroutine it will be blocking closing
    // SchedulingQueue, in effect causing a deadlock on shutdown.
    go wait.UntilWithContext(ctx, sched.scheduleOne, 0)

    <-ctx.Done()
    sched.SchedulingQueue.Close()
}

Войдите в полноэкранный режим Выход из полноэкранного режима

SchedulingQueue — это абстракция очереди, в которой хранятся капсулы, ожидающие планирования; этот интерфейс повторяет интерфейс кэша.

type SchedulingQueue interface {
    framework.PodNominator
    Add(pod *v1.Pod) error
    // Activate moves the given pods to activeQ iff they're in unschedulablePods or backoffQ.
    // The passed-in pods are originally compiled from plugins that want to activate Pods,
    // by injecting the pods through a reserved CycleState struct (PodsToActivate).
    Activate(pods map[string]*v1.Pod)
    // 将不可调度的Pod重入到队列中
    AddUnschedulableIfNotPresent(pod *framework.QueuedPodInfo, podSchedulingCycle int64) error
    // SchedulingCycle returns the current number of scheduling cycle which is
    // cached by scheduling queue. Normally, incrementing this number whenever
    // a pod is popped (e.g. called Pop()) is enough.
    SchedulingCycle() int64
    // Pop会弹出一个pod,并从head优先级队列中删除
    Pop() (*framework.QueuedPodInfo, error)
    Update(oldPod, newPod *v1.Pod) error
    Delete(pod *v1.Pod) error
    MoveAllToActiveOrBackoffQueue(event framework.ClusterEvent, preCheck PreEnqueueCheck)
    AssignedPodAdded(pod *v1.Pod)
    AssignedPodUpdated(pod *v1.Pod)
    PendingPods() []*v1.Pod
    // Close closes the SchedulingQueue so that the goroutine which is
    // waiting to pop items can exit gracefully.
    Close()
    // Run starts the goroutines managing the queue.
    Run()
}
Войдите в полноэкранный режим Выход из полноэкранного режима

PriorityQueue является реализацией SchedulingQueue, которая состоит из двух подвопросов и структуры данных, activeQ, backoffQ и unschedulablePods. unschedulablePods

type SchedulingQueue interface {
    framework.PodNominator
    Add(pod *v1.Pod) error
    // Activate moves the given pods to activeQ iff they're in unschedulablePods or backoffQ.
    // The passed-in pods are originally compiled from plugins that want to activate Pods,
    // by injecting the pods through a reserved CycleState struct (PodsToActivate).
    Activate(pods map[string]*v1.Pod)
    // AddUnschedulableIfNotPresent adds an unschedulable pod back to scheduling queue.
    // The podSchedulingCycle represents the current scheduling cycle number which can be
    // returned by calling SchedulingCycle().
    AddUnschedulableIfNotPresent(pod *framework.QueuedPodInfo, podSchedulingCycle int64) error
    // SchedulingCycle returns the current number of scheduling cycle which is
    // cached by scheduling queue. Normally, incrementing this number whenever
    // a pod is popped (e.g. called Pop()) is enough.
    SchedulingCycle() int64
    // Pop removes the head of the queue and returns it. It blocks if the
    // queue is empty and waits until a new item is added to the queue.
    Pop() (*framework.QueuedPodInfo, error)
    Update(oldPod, newPod *v1.Pod) error
    Delete(pod *v1.Pod) error
    MoveAllToActiveOrBackoffQueue(event framework.ClusterEvent, preCheck PreEnqueueCheck)
    AssignedPodAdded(pod *v1.Pod)
    AssignedPodUpdated(pod *v1.Pod)
    PendingPods() []*v1.Pod
    // Close closes the SchedulingQueue so that the goroutine which is
    // waiting to pop items can exit gracefully.
    Close()
    // Run starts the goroutines managing the queue.
    Run()
}
Войдите в полноэкранный режим Выход из полноэкранного режима

Вы можете видеть, что эта очередь инициализируется в New scheduler

podQueue := internalqueue.NewSchedulingQueue(
    // 实现pod对比的一个函数即less
    profiles[options.profiles[0].SchedulerName].QueueSortFunc(),
    informerFactory,
    internalqueue.WithPodInitialBackoffDuration(time.Duration(options.podInitialBackoffSeconds)*time.Second),
    internalqueue.WithPodMaxBackoffDuration(time.Duration(options.podMaxBackoffSeconds)*time.Second),
    internalqueue.WithPodNominator(nominator),
    internalqueue.WithClusterEventMap(clusterEventMap),
    internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration),
)
Войдите в полноэкранный режим Выход из полноэкранного режима

И NewSchedulingQueue инициализирует эту PriorityQueue

// NewSchedulingQueue initializes a priority queue as a new scheduling queue.
func NewSchedulingQueue(
    lessFn framework.LessFunc,
    informerFactory informers.SharedInformerFactory,
    opts ...Option) SchedulingQueue {
    return NewPriorityQueue(lessFn, informerFactory, opts...)
}

// NewPriorityQueue creates a PriorityQueue object.
func NewPriorityQueue(
    lessFn framework.LessFunc,
    informerFactory informers.SharedInformerFactory,
    opts ...Option,
) *PriorityQueue {
    options := defaultPriorityQueueOptions
    for _, opt := range opts {
        opt(&options)
    }
    // 这个就是 less函数,作为打分的一部分
    comp := func(podInfo1, podInfo2 interface{}) bool {
        pInfo1 := podInfo1.(*framework.QueuedPodInfo)
        pInfo2 := podInfo2.(*framework.QueuedPodInfo)
        return lessFn(pInfo1, pInfo2)
    }

    if options.podNominator == nil {
        options.podNominator = NewPodNominator(informerFactory.Core().V1().Pods().Lister())
    }

    pq := &PriorityQueue{
        PodNominator:                      options.podNominator,
        clock:                             options.clock,
        stop:                              make(chan struct{}),
        podInitialBackoffDuration:         options.podInitialBackoffDuration,
        podMaxBackoffDuration:             options.podMaxBackoffDuration,
        podMaxInUnschedulablePodsDuration: options.podMaxInUnschedulablePodsDuration,
        activeQ:                           heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
        unschedulablePods:                 newUnschedulablePods(metrics.NewUnschedulablePodsRecorder()),
        moveRequestCycle:                  -1,
        clusterEventMap:                   options.clusterEventMap,
    }
    pq.cond.L = &pq.lock
    pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
    pq.nsLister = informerFactory.Core().V1().Namespaces().Lister()

    return pq
}
Войдите в полноэкранный режим Выход из полноэкранного режима

Как только вы поняли структуру очереди, вам нужно знать, где работают входящие и исходящие очереди. Во время инициализации регистрируется addEventHandlerFuncs. В этот момент внедряются три функции действия, что является концепцией в контроллере; и в AddFunc можно увидеть, что очередь введена.

Инъекция происходит в информер Pod, а функция addPodToSchedulingQueue является записью в стеке

Handler: cache.ResourceEventHandlerFuncs{
    AddFunc:    sched.addPodToSchedulingQueue,
    UpdateFunc: sched.updatePodInSchedulingQueue,
    DeleteFunc: sched.deletePodFromSchedulingQueue,
},

func (sched *Scheduler) addPodToSchedulingQueue(obj interface{}) {
    pod := obj.(*v1.Pod)
    klog.V(3).InfoS("Add event for unscheduled pod", "pod", klog.KObj(pod))
    if err := sched.SchedulingQueue.Add(pod); err != nil {
        utilruntime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err))
    }
}
Войдите в полноэкранный режим Выход из полноэкранного режима

Реализацией этой SchedulingQueue является PriorityQueue, а действие в Add предназначено для activeQ

func (p *PriorityQueue) Add(pod *v1.Pod) error {
    p.lock.Lock()
    defer p.lock.Unlock()
    // 格式化入栈数据,包含podinfo,里会包含v1.Pod
    // 初始化的时间,创建的时间,以及不能被调度时的记录其plugin的名称
    pInfo := p.newQueuedPodInfo(pod)
    // 入栈
    if err := p.activeQ.Add(pInfo); err != nil {
        klog.ErrorS(err, "Error adding pod to the active queue", "pod", klog.KObj(pod))
        return err
    }
    if p.unschedulablePods.get(pod) != nil {
        klog.ErrorS(nil, "Error: pod is already in the unschedulable queue", "pod", klog.KObj(pod))
        p.unschedulablePods.delete(pod)
    }
    // Delete pod from backoffQ if it is backing off
    if err := p.podBackoffQ.Delete(pInfo); err == nil {
        klog.ErrorS(nil, "Error: pod is already in the podBackoff queue", "pod", klog.KObj(pod))
    }
    metrics.SchedulerQueueIncomingPods.WithLabelValues("active", PodAdd).Inc()
    p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil)
    p.cond.Broadcast()

    return nil
}
Войдите в полноэкранный режим Выход из полноэкранного режима

Когда вы смотрите на структуру планировщика выше, вы можете увидеть, что есть nextPod. nextPod — это стручок, который выводится из очереди, и он передается в MakeNextPodFunc, когда планировщик находится на месте, то есть это nextpod

func MakeNextPodFunc(queue SchedulingQueue) func() *framework.QueuedPodInfo {
    return func() *framework.QueuedPodInfo {
        podInfo, err := queue.Pop()
        if err == nil {
            klog.V(4).InfoS("About to try and schedule pod", "pod", klog.KObj(podInfo.Pod))
            for plugin := range podInfo.UnschedulablePlugins {
                metrics.UnschedulableReason(plugin, podInfo.Pod.Spec.SchedulerName).Dec()
            }
            return podInfo
        }
        klog.ErrorS(err, "Error while retrieving next pod from scheduling queue")
        return nil
    }
}
Войдите в полноэкранный режим Выход из полноэкранного режима

Этот queue.Pop() соответствует PriorityQueue‘s Pop(), который будет использоваться здесь как потребляющий конец активнойQ

func (p *PriorityQueue) Pop() (*framework.QueuedPodInfo, error) {
   p.lock.Lock()
   defer p.lock.Unlock()
   for p.activeQ.Len() == 0 {
      // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
      // When Close() is called, the p.closed is set and the condition is broadcast,
      // which causes this loop to continue and return from the Pop().
      if p.closed {
         return nil, fmt.Errorf(queueClosed)
      }
      p.cond.Wait()
   }
   obj, err := p.activeQ.Pop()
   if err != nil {
      return nil, err
   }
   pInfo := obj.(*framework.QueuedPodInfo)
   pInfo.Attempts++
   p.schedulingCycle++
   return pInfo, nil
}
Войдите в полноэкранный режим Выход из полноэкранного режима

Как вы можете видеть в разделе записи выше, scheduleOne и scheduler, scheduleOne должен потреблять Pod, он вызовет NextPod, NextPod — это MakeNextPodFunc, переданный при инициализации, который вернется к соответствующему Pop для выполнения потребления.

schedulerOne — это процесс составления расписания для Pod.

func (sched *Scheduler) scheduleOne(ctx context.Context) {
    podInfo := sched.NextPod()
    // pod could be nil when schedulerQueue is closed
    if podInfo == nil || podInfo.Pod == nil {
        return
    }
    pod := podInfo.Pod
    fwk, err := sched.frameworkForPod(pod)
    if err != nil {
        // This shouldn't happen, because we only accept for scheduling the pods
        // which specify a scheduler name that matches one of the profiles.
        klog.ErrorS(err, "Error occurred")
        return
    }
    if sched.skipPodSchedule(fwk, pod) {
        return
    }
...
Войдите в полноэкранный режим Выход из полноэкранного режима

Контекст планирования

Рисунок 1: Контекст планирования бутона

Источник: https://kubernetes.io/docs/concepts/scheduling-eviction/scheduling-framework

После того как структура планировщика понятна, ниже анализируется процесс планирования контекста. Посмотрите, как работают точки расширения. На этом этапе снова необходимо обратиться к диаграмме контекста планирования на сайте.

Система планирования [2]

Фреймворк планирования (scheduling framework SF ) — это подключаемая архитектура для планировщика, разработанная kubernetes. SF разрабатывает планировщик как API в стиле Plugin, который реализует некоторые политики планирования, упомянутые в предыдущей главе, как < #code>Plugin.

В SF определен ряд точек расширения (extension points EPs), и планировщики, реализованные как Plugins, регистрируются в одной или нескольких EPs, другими словами, во время выполнения этих EPs, если они зарегистрированы в более чем одной EP, они будут вызываться в более чем одной EP.

Каждый планировщик делится на две фазы, цикл планирования (Scheduling Cycel) и цикл связывания (Binding Cycle).

  • SC представлен как выбор узла для Pod; SC выполняется последовательно.
  • BC представляется как применение результатов решения SC к кластеру; BC может выполняться одновременно.

Комбинация цикла планирования и цикла связывания известна как Контекст планирования (Scheduling Context), а приведенная ниже диаграмма показывает рабочий процесс Контекста планирования

Примечание: Если в результате принятия решения не окажется узлов, доступных для результата планирования Pod, или возникнет внутренняя ошибка, SC или BC прерывается, и Pod повторно ставится в очередь для повторной попытки.

Точка расширения [3]

Точки расширения (Extension points) — это каждый расширяемый API в контексте планирования, как показано на диаграмме [Рисунок 1]. Где Filter эквивалентен Predicate, а Scoring эквивалентен Priority.

Для этапа составления расписания будут пройдены следующие точки расширения.

  • Sort: Этот плагин предоставляет функциональность сортировки для сортировки Pods, которые ожидают своей очереди планирования. Одновременно может быть включена только одна сортировка очереди.

  • preFilter: Этот плагин используется для предварительной обработки или проверки информации о Pod или кластере перед фильтрацией. Планирование будет прекращено здесь

  • filter: Этот плагин является эквивалентом Predicates в контексте планирования и используется для исключения узлов, которые не могут запускать Pods. filter будет вызываться в настроенном порядке. Если фильтр помечает узел как недоступный, то Pod помечается как незапланированный (т.е. он не будет выполняться в дальнейшем).

  • postFilter: Если для стручка не найден FN, плагин вызывается в настроенном порядке. Если какой-либо плагин postFilter пометит Pod как планируемый, остальные плагины не будут вызваны. То есть, этот шаг не будет выполняться после успешного filter.

  • preScore: может использоваться для выполнения работы перед оценкой (уведомление о точках расширения).

  • score: этот плагин предоставляет оценку для каждого узла, который проходит этап filter. Затем планировщик выберет узел с наибольшей суммой взвешенных баллов.

  • reserve: Поскольку события привязки происходят асинхронно, этот плагин предназначен для того, чтобы избежать планирования новых Pods, когда они привязаны к Node до Node, заставляя Node использовать больше ресурсов, чем доступно. Если в последующей фазе происходит ошибка или сбой, то запускается откат UnReserve (уведомляемая точка расширения). Это состояние также используется как последнее состояние в цикле планирования, либо успешное для postBind, либо неудачное для запуска UnReserve.

  • permit: Этот плагин блокирует или задерживает привязку Pods, обычно этот шаг делает три вещи.

  • preBind: Этот плагин используется для выполнения необходимой предварительной работы перед связыванием Pods. Например, preBind может предоставить сетевой том и смонтировать его на целевом узле. Если какой-либо подключаемый модуль на этом этапе возвращает ошибку, Pod будет deny и помещен в очередь планирования.

  • bind: После завершения всех preBind, плагин будет использоваться для привязки Pod к Node, и плагины, связывающие этот шаг, будут вызываться по порядку. Если событие обрабатывает один плагин, то все остальные плагины игнорируются.

  • postBind: Этот плагин вызывается после привязки Pod и может использоваться для очистки связанных ресурсов (информативных точек расширения).

  • multiPoint: Это поле только для конфигурации, которое позволяет включать или отключать плагины для всех применимых точек расширения одновременно.

Реализация контекста планирования в коде планировщика — scheduleOne, и вот взгляд на этот контекст планирования

Сортировать

Плагин Sort предоставляет функциональность сортировки для сортировки ожидающих капсул в очереди планирования. Одновременно для сортировки может быть включена только одна очередь.

После ввода scheduleOne, NextPod получает Pod из очереди в activeQ, а затем frameworkForPod выполняет действие скоринга Первая точка расширения контекста планирования sort.

func (sched *Scheduler) scheduleOne(ctx context.Context) {
    podInfo := sched.NextPod()
    // pod could be nil when schedulerQueue is closed
    if podInfo == nil || podInfo.Pod == nil {
        return
    }
    pod := podInfo.Pod
    fwk, err := sched.frameworkForPod(pod)
...

func (sched *Scheduler) frameworkForPod(pod *v1.Pod) (framework.Framework, error) {
    // 获取指定的profile
    fwk, ok := sched.Profiles[pod.Spec.SchedulerName]
    if !ok {
        return nil, fmt.Errorf("profile not found for scheduler name %q", pod.Spec.SchedulerName)
    }
    return fwk, nil
}
Войдите в полноэкранный режим Выход из полноэкранного режима

Вспомните, как эта функция сортировки инициализируется в планировщике New

podQueue := internalqueue.NewSchedulingQueue(
    profiles[options.profiles[0].SchedulerName].QueueSortFunc(),
    informerFactory,
    internalqueue.WithPodInitialBackoffDuration(time.Duration(options.podInitialBackoffSeconds)*time.Second),
    internalqueue.WithPodMaxBackoffDuration(time.Duration(options.podMaxBackoffSeconds)*time.Second),
    internalqueue.WithPodNominator(nominator),
    internalqueue.WithClusterEventMap(clusterEventMap),
    internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration),
)
Войдите в полноэкранный режим Выход из полноэкранного режима

preFilter

preFilter используется в качестве первой точки расширения для предварительной обработки или проверки информации о Pod или кластере перед фильтрацией. Здесь планирование прекращается

func (sched *Scheduler) scheduleOne(ctx context.Context) {
    podInfo := sched.NextPod()
    // pod could be nil when schedulerQueue is closed
    if podInfo == nil || podInfo.Pod == nil {
        return
    }
    pod := podInfo.Pod
    fwk, err := sched.frameworkForPod(pod)
    if err != nil {
        // This shouldn't happen, because we only accept for scheduling the pods
        // which specify a scheduler name that matches one of the profiles.
        klog.ErrorS(err, "Error occurred")
        return
    }
    if sched.skipPodSchedule(fwk, pod) {
        return
    }

    klog.V(3).InfoS("Attempting to schedule pod", "pod", klog.KObj(pod))

    // Synchronously attempt to find a fit for the pod.
    start := time.Now()
    state := framework.NewCycleState()
    state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent)
    // Initialize an empty podsToActivate struct, which will be filled up by plugins or stay empty.
    podsToActivate := framework.NewPodsToActivate()
    state.Write(framework.PodsToActivateKey, podsToActivate)

    schedulingCycleCtx, cancel := context.WithCancel(ctx)
    defer cancel()
    // 这里将进入prefilter
    scheduleResult, err := sched.SchedulePod(schedulingCycleCtx, fwk, state, pod)
Войдите в полноэкранный режим Выход из полноэкранного режима

schedulePod пытается запланировать данный pod на один из узлов в списке узлов. В случае успеха он вернет имя узла.

func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
    trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
    defer trace.LogIfLong(100 * time.Millisecond)
    // 用于将cache更新为当前内容
    if err := sched.Cache.UpdateSnapshot(sched.nodeInfoSnapshot); err != nil {
        return result, err
    }
    trace.Step("Snapshotting scheduler cache and node infos done")

    if sched.nodeInfoSnapshot.NumNodes() == 0 {
        return result, ErrNoNodesAvailable
    }
    // 找到一个合适的pod时,会执行扩展点
    feasibleNodes, diagnosis, err := sched.findNodesThatFitPod(ctx, fwk, state, pod)

    ...
Войдите в полноэкранный режим Выход из полноэкранного режима

findNodesThatFitPod выполнит соответствующий плагин фильтрации для поиска наиболее подходящего Узла, включая примечания, и имя метода можно увидеть, здесь запущенный плагин 😁😁😁😁, позже будет анализировать содержание алгоритма, только для обучения рабочего процесса.

func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.Diagnosis, error) {
    diagnosis := framework.Diagnosis{
        NodeToStatusMap:      make(framework.NodeToStatusMap),
        UnschedulablePlugins: sets.NewString(),
    }

    // Run "prefilter" plugins.
    preRes, s := fwk.RunPreFilterPlugins(ctx, state, pod)
    allNodes, err := sched.nodeInfoSnapshot.NodeInfos().List()
    if err != nil {
        return nil, diagnosis, err
    }
    if !s.IsSuccess() {
        if !s.IsUnschedulable() {
            return nil, diagnosis, s.AsError()
        }
        // All nodes will have the same status. Some non trivial refactoring is
        // needed to avoid this copy.
        for _, n := range allNodes {
            diagnosis.NodeToStatusMap[n.Node().Name] = s
        }
        // Status satisfying IsUnschedulable() gets injected into diagnosis.UnschedulablePlugins.
        if s.FailedPlugin() != "" {
            diagnosis.UnschedulablePlugins.Insert(s.FailedPlugin())
        }
        return nil, diagnosis, nil
    }

    // "NominatedNodeName" can potentially be set in a previous scheduling cycle as a result of preemption.
    // This node is likely the only candidate that will fit the pod, and hence we try it first before iterating over all nodes.
    if len(pod.Status.NominatedNodeName) > 0 {
        feasibleNodes, err := sched.evaluateNominatedNode(ctx, pod, fwk, state, diagnosis)
        if err != nil {
            klog.ErrorS(err, "Evaluation failed on nominated node", "pod", klog.KObj(pod), "node", pod.Status.NominatedNodeName)
        }
        // Nominated node passes all the filters, scheduler is good to assign this node to the pod.
        if len(feasibleNodes) != 0 {
            return feasibleNodes, diagnosis, nil
        }
    }

    nodes := allNodes
    if !preRes.AllNodes() {
        nodes = make([]*framework.NodeInfo, 0, len(preRes.NodeNames))
        for n := range preRes.NodeNames {
            nInfo, err := sched.nodeInfoSnapshot.NodeInfos().Get(n)
            if err != nil {
                return nil, diagnosis, err
            }
            nodes = append(nodes, nInfo)
        }
    }
    feasibleNodes, err := sched.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, nodes)
    if err != nil {
        return nil, diagnosis, err
    }

    feasibleNodes, err = findNodesThatPassExtenders(sched.Extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)
    if err != nil {
        return nil, diagnosis, err
    }
    return feasibleNodes, diagnosis, nil
}
Войдите в полноэкранный режим Выход из полноэкранного режима

фильтр

Плагин filter является эквивалентом Predicates в контексте планирования, и используется для исключения узлов, которые не могут запускать Pods. filter вызывается в настроенном порядке. Если фильтр помечает узел как недоступный, то Pod помечается как незапланированный (т.е. он не будет выполняться в дальнейшем).

Для целей кода фильтр по-прежнему находится в функции findNodesThatFitPod, findNodesThatPassFilters, которая получает FN, т.е. узел линии, и этот процесс является точкой расширения фильтра.

func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.Diagnosis, error) {
    ...

    feasibleNodes, err := sched.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, nodes)
    if err != nil {
        return nil, diagnosis, err
    }

    feasibleNodes, err = findNodesThatPassExtenders(sched.Extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)
    if err != nil {
        return nil, diagnosis, err
    }
    return feasibleNodes, diagnosis, nil
}
Войдите в полноэкранный режим Выход из полноэкранного режима

Постфильтр

Если для стручка не найдено FN, плагин будет вызван в настроенном порядке. Если какой-либо плагин postFilter отмечает pod как планируемый, остальные плагины вызываться не будут. То есть, этот шаг не будет выполнен после успешного filter, поэтому давайте проверим здесь 😊.

В scheduleOne, когда запускаемый нами SchedulePod завершается (успешно или неудачно), возвращается ошибка, и postfilter будет выбирать выполнять или нет в соответствии с этой ошибкой, в соответствии с официальным заявлением.

scheduleResult, err := sched.SchedulePod(schedulingCycleCtx, fwk, state, pod)
    if err != nil {
        // SchedulePod() may have failed because the pod would not fit on any host, so we try to
        // preempt, with the expectation that the next time the pod is tried for scheduling it
        // will fit due to the preemption. It is also possible that a different pod will schedule
        // into the resources that were preempted, but this is harmless.
        var nominatingInfo *framework.NominatingInfo
        if fitError, ok := err.(*framework.FitError); ok {
            if !fwk.HasPostFilterPlugins() {
                klog.V(3).InfoS("No PostFilter plugins are registered, so no preemption will be performed")
            } else {
                // Run PostFilter plugins to try to make the pod schedulable in a future scheduling cycle.
                result, status := fwk.RunPostFilterPlugins(ctx, state, pod, fitError.Diagnosis.NodeToStatusMap)
                if status.Code() == framework.Error {
                    klog.ErrorS(nil, "Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", status)
                } else {
                    fitError.Diagnosis.PostFilterMsg = status.Message()
                    klog.V(5).InfoS("Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", status)
                }
                if result != nil {
                    nominatingInfo = result.NominatingInfo
                }
            }
            // Pod did not fit anywhere, so it is counted as a failure. If preemption
            // succeeds, the pod should get counted as a success the next time we try to
            // schedule it. (hopefully)
            metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
        } else if err == ErrNoNodesAvailable {
            nominatingInfo = clearNominatedNode
            // No nodes available is counted as unschedulable rather than an error.
            metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
        } else {
            nominatingInfo = clearNominatedNode
            klog.ErrorS(err, "Error selecting node for pod", "pod", klog.KObj(pod))
            metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
        }
        sched.handleSchedulingFailure(ctx, fwk, podInfo, err, v1.PodReasonUnschedulable, nominatingInfo)
        return
    }
Войдите в полноэкранный режим Выход из полноэкранного режима

PreScore, Score

Может использоваться для выполнения работы preScore, как расширение уведомления, которое свяжет плагин preScore для продолжения работы непосредственно после фильтра, вместо возврата, и если любой из этих плагинов настроен на возврат отказа, Pod будет отклонен.


func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
    trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
    defer trace.LogIfLong(100 * time.Millisecond)

    if err := sched.Cache.UpdateSnapshot(sched.nodeInfoSnapshot); err != nil {
        return result, err
    }
    trace.Step("Snapshotting scheduler cache and node infos done")

    if sched.nodeInfoSnapshot.NumNodes() == 0 {
        return result, ErrNoNodesAvailable
    }

    feasibleNodes, diagnosis, err := sched.findNodesThatFitPod(ctx, fwk, state, pod)
    if err != nil {
        return result, err
    }
    trace.Step("Computing predicates done")

    if len(feasibleNodes) == 0 {
        return result, &framework.FitError{
            Pod:         pod,
            NumAllNodes: sched.nodeInfoSnapshot.NumNodes(),
            Diagnosis:   diagnosis,
        }
    }

    // When only one node after predicate, just use it.
    if len(feasibleNodes) == 1 {
        return ScheduleResult{
            SuggestedHost:  feasibleNodes[0].Name,
            EvaluatedNodes: 1 + len(diagnosis.NodeToStatusMap),
            FeasibleNodes:  1,
        }, nil
    }
    // 这里会完成prescore,score
    priorityList, err := prioritizeNodes(ctx, sched.Extenders, fwk, state, pod, feasibleNodes)
    if err != nil {
        return result, err
    }

    host, err := selectHost(priorityList)
    trace.Step("Prioritizing done")

    return ScheduleResult{
        SuggestedHost:  host,
        EvaluatedNodes: len(feasibleNodes) + len(diagnosis.NodeToStatusMap),
        FeasibleNodes:  len(feasibleNodes),
    }, err
}
Войдите в полноэкранный режим Выход из полноэкранного режима

priorityNodes оценивает Узел с помощью настроенных плагинов и возвращает оценку для каждого Узла, суммирует результаты оценки каждого плагина, чтобы получить оценку Узла, и, наконец, получает взвешенную общую оценку для Узла.

func prioritizeNodes(
    ctx context.Context,
    extenders []framework.Extender,
    fwk framework.Framework,
    state *framework.CycleState,
    pod *v1.Pod,
    nodes []*v1.Node,
) (framework.NodeScoreList, error) {
    // If no priority configs are provided, then all nodes will have a score of one.
    // This is required to generate the priority list in the required format
    if len(extenders) == 0 && !fwk.HasScorePlugins() {
        result := make(framework.NodeScoreList, 0, len(nodes))
        for i := range nodes {
            result = append(result, framework.NodeScore{
                Name:  nodes[i].Name,
                Score: 1,
            })
        }
        return result, nil
    }

    // Run PreScore plugins.
    preScoreStatus := fwk.RunPreScorePlugins(ctx, state, pod, nodes)
    if !preScoreStatus.IsSuccess() {
        return nil, preScoreStatus.AsError()
    }

    // Run the Score plugins.
    scoresMap, scoreStatus := fwk.RunScorePlugins(ctx, state, pod, nodes)
    if !scoreStatus.IsSuccess() {
        return nil, scoreStatus.AsError()
    }

    // Additional details logged at level 10 if enabled.
    klogV := klog.V(10)
    if klogV.Enabled() {
        for plugin, nodeScoreList := range scoresMap {
            for _, nodeScore := range nodeScoreList {
                klogV.InfoS("Plugin scored node for pod", "pod", klog.KObj(pod), "plugin", plugin, "node", nodeScore.Name, "score", nodeScore.Score)
            }
        }
    }

    // Summarize all scores.
    result := make(framework.NodeScoreList, 0, len(nodes))

    for i := range nodes {
        result = append(result, framework.NodeScore{Name: nodes[i].Name, Score: 0})
        for j := range scoresMap {
            result[i].Score += scoresMap[j][i].Score
        }
    }

    if len(extenders) != 0 && nodes != nil {
        var mu sync.Mutex
        var wg sync.WaitGroup
        combinedScores := make(map[string]int64, len(nodes))
        for i := range extenders {
            if !extenders[i].IsInterested(pod) {
                continue
            }
            wg.Add(1)
            go func(extIndex int) {
                metrics.SchedulerGoroutines.WithLabelValues(metrics.PrioritizingExtender).Inc()
                defer func() {
                    metrics.SchedulerGoroutines.WithLabelValues(metrics.PrioritizingExtender).Dec()
                    wg.Done()
                }()
                prioritizedList, weight, err := extenders[extIndex].Prioritize(pod, nodes)
                if err != nil {
                    // Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
                    klog.V(5).InfoS("Failed to run extender's priority function. No score given by this extender.", "error", err, "pod", klog.KObj(pod), "extender", extenders[extIndex].Name())
                    return
                }
                mu.Lock()
                for i := range *prioritizedList {
                    host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
                    if klogV.Enabled() {
                        klogV.InfoS("Extender scored node for pod", "pod", klog.KObj(pod), "extender", extenders[extIndex].Name(), "node", host, "score", score)
                    }
                    combinedScores[host] += score * weight
                }
                mu.Unlock()
            }(i)
        }
        // wait for all go routines to finish
        wg.Wait()
        for i := range result {
            // MaxExtenderPriority may diverge from the max priority used in the scheduler and defined by MaxNodeScore,
            // therefore we need to scale the score returned by extenders to the score range used by the scheduler.
            result[i].Score += combinedScores[result[i].Name] * (framework.MaxNodeScore / extenderv1.MaxExtenderPriority)
        }
    }

    if klogV.Enabled() {
        for i := range result {
            klogV.InfoS("Calculated node's final score for pod", "pod", klog.KObj(pod), "node", result[i].Name, "score", result[i].Score)
        }
    }
    return result, nil
}
Войдите в полноэкранный режим Выход из полноэкранного режима

Резерв

Резерв используется для того, чтобы избежать отправки стручков в новые стручки до того, как они будут привязаны к узлу, что заставит узел использовать больше ресурсов, чем доступно, поскольку событие привязки происходит асинхронно. Если в последующей фазе происходит ошибка или сбой, запускается откат UnReserve (точка расширения уведомления). Это состояние также используется как последнее состояние в цикле планирования, либо успешное для postBind, либо неудачное для запуска UnReserve.

// Run the Reserve method of reserve plugins.
if sts := fwk.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() { // 当处理不成功时
    metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
    // 触发 un-reserve 来清理相关Pod的状态
    fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
        klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
    }
    sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, sts.AsError(), SchedulerError, clearNominatedNode)
    return
}
Войдите в полноэкранный режим Выход из полноэкранного режима

разрешение

Плагин Permit может блокировать или задерживать привязку Pod

    // Run "permit" plugins.
    runPermitStatus := fwk.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    if !runPermitStatus.IsWait() && !runPermitStatus.IsSuccess() {
        var reason string
        if runPermitStatus.IsUnschedulable() {
            metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
            reason = v1.PodReasonUnschedulable
        } else {
            metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
            reason = SchedulerError
        }
        // 只要其中一个插件返回的状态不是 success 或者 wait
        fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
        // 从cache中忘掉pod
        if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
            klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
        }
        sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, runPermitStatus.AsError(), reason, clearNominatedNode)
        return
    }

Войдите в полноэкранный режим Выход из полноэкранного режима

Цикл связывания

После выбора FN происходит гипотетическое связывание и обновление кэша, затем выполняется реальное связывание, т.е. цикл связывания.

func (sched *Scheduler) scheduleOne(ctx context.Context) {
    ...
    ...
    // binding cycle 是一个异步的操作,这里表现就是go协程
    go func() {
        bindingCycleCtx, cancel := context.WithCancel(ctx)
        defer cancel()
        metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Inc()
        defer metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Dec()
        // 运行WaitOnPermit插件,如果失败则,unReserve回滚
        waitOnPermitStatus := fwk.WaitOnPermit(bindingCycleCtx, assumedPod)
        if !waitOnPermitStatus.IsSuccess() {
            var reason string
            if waitOnPermitStatus.IsUnschedulable() {
                metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
                reason = v1.PodReasonUnschedulable
            } else {
                metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
                reason = SchedulerError
            }
            // trigger un-reserve plugins to clean up state associated with the reserved Pod
            fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
            if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
                klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
            } else {
                // "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
                // as the assumed Pod had occupied a certain amount of resources in scheduler cache.
                // TODO(#103853): de-duplicate the logic.
                // Avoid moving the assumed Pod itself as it's always Unschedulable.
                // It's intentional to "defer" this operation; otherwise MoveAllToActiveOrBackoffQueue() would
                // update `q.moveRequest` and thus move the assumed pod to backoffQ anyways.
                defer sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, func(pod *v1.Pod) bool {
                    return assumedPod.UID != pod.UID
                })
            }
            sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, clearNominatedNode)
            return
        }

    // 运行Prebind 插件
        preBindStatus := fwk.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
        if !preBindStatus.IsSuccess() {
            metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
            // trigger un-reserve plugins to clean up state associated with the reserved Pod
            fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
            if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
                klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
            } else {
                // "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
                // as the assumed Pod had occupied a certain amount of resources in scheduler cache.
                // TODO(#103853): de-duplicate the logic.
                sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
            }
            sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, preBindStatus.AsError(), SchedulerError, clearNominatedNode)
            return
        }
        // bind是真正的绑定操作
        err := sched.bind(bindingCycleCtx, fwk, assumedPod, scheduleResult.SuggestedHost, state)
        if err != nil {
            metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
            // 如果失败了就触发 un-reserve plugins 
            fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
            if err := sched.Cache.ForgetPod(assumedPod); err != nil {
                klog.ErrorS(err, "scheduler cache ForgetPod failed")
            } else {
                // "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
                // as the assumed Pod had occupied a certain amount of resources in scheduler cache.
                // TODO(#103853): de-duplicate the logic.
                sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
            }
            sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, fmt.Errorf("binding rejected: %w", err), SchedulerError, clearNominatedNode)
            return
        }
        // Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
        klog.V(2).InfoS("Successfully bound pod to node", "pod", klog.KObj(pod), "node", scheduleResult.SuggestedHost, "evaluatedNodes", scheduleResult.EvaluatedNodes, "feasibleNodes", scheduleResult.FeasibleNodes)
        metrics.PodScheduled(fwk.ProfileName(), metrics.SinceInSeconds(start))
        metrics.PodSchedulingAttempts.Observe(float64(podInfo.Attempts))
        metrics.PodSchedulingDuration.WithLabelValues(getAttemptsLabel(podInfo)).Observe(metrics.SinceInSeconds(podInfo.InitialAttemptTimestamp))

        // 运行 "postbind" 插件
        // 是通知性的扩展点,该插件在绑定 Pod 后调用,可用于清理相关资源()。
        fwk.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)

        // At the end of a successful binding cycle, move up Pods if needed.
        if len(podsToActivate.Map) != 0 {
            sched.SchedulingQueue.Activate(podsToActivate.Map)
            // Unlike the logic in scheduling cycle, we don't bother deleting the entries
            // as `podsToActivate.Map` is no longer consumed.
        }
    }()
}
Войдите в полноэкранный режим Выход из полноэкранного режима

Процесс отказа в контексте планирования

Выше приведены обычные запросы, ниже приведен анализ того, как повторно выполняются неудачные запросы, а свойства планировщика, связанные с обработкой неудач, относятся к backoffQ и unschedulablePods в структуре планировщика выше. код>

backoffQ и unschedulablePods инициализируются при инициализации планировщика, и

func NewPriorityQueue(
    lessFn framework.LessFunc,
    informerFactory informers.SharedInformerFactory,
    opts ...Option,
) *PriorityQueue {
    options := defaultPriorityQueueOptions
    for _, opt := range opts {
        opt(&options)
    }

    comp := func(podInfo1, podInfo2 interface{}) bool {
        pInfo1 := podInfo1.(*framework.QueuedPodInfo)
        pInfo2 := podInfo2.(*framework.QueuedPodInfo)
        return lessFn(pInfo1, pInfo2)
    }

    if options.podNominator == nil {
        options.podNominator = NewPodNominator(informerFactory.Core().V1().Pods().Lister())
    }

    pq := &PriorityQueue{
        PodNominator:                      options.podNominator,
        clock:                             options.clock,
        stop:                              make(chan struct{}),
        podInitialBackoffDuration:         options.podInitialBackoffDuration,
        podMaxBackoffDuration:             options.podMaxBackoffDuration,
        podMaxInUnschedulablePodsDuration: options.podMaxInUnschedulablePodsDuration,
        activeQ:                           heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
        unschedulablePods:                 newUnschedulablePods(metrics.NewUnschedulablePodsRecorder()),
        moveRequestCycle:                  -1,
        clusterEventMap:                   options.clusterEventMap,
    }
    pq.cond.L = &pq.lock
    // 初始化backoffQ
    // NewWithRecorder作为一个可选的 metricRecorder 的 Heap 对象。
    // podInfoKeyFunc是一个函数,返回错误与字符串
    // pq.podsCompareBackoffCompleted 比较两个pod的回退时间,如果第一个在第二个之前为true,
    // 反之 false
    pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
    pq.nsLister = informerFactory.Core().V1().Namespaces().Lister()

    return pq
}
Войдите в полноэкранный режим Выход из полноэкранного режима

Для двух функций, созданных при инициализации backoffQ, getBackoffTime и calculateBackoffDuration

// getBackoffTime returns the time that podInfo completes backoff
func (p *PriorityQueue) getBackoffTime(podInfo *framework.QueuedPodInfo) time.Time {
    duration := p.calculateBackoffDuration(podInfo)
    backoffTime := podInfo.Timestamp.Add(duration)
    return backoffTime
}

// calculateBackoffDuration is a helper function for calculating the backoffDuration
// based on the number of attempts the pod has made.
func (p *PriorityQueue) calculateBackoffDuration(podInfo *framework.QueuedPodInfo) time.Duration {
    duration := p.podInitialBackoffDuration
    for i := 1; i < podInfo.Attempts; i++ {
        // Use subtraction instead of addition or multiplication to avoid overflow.
        if duration > p.podMaxBackoffDuration-duration {
            return p.podMaxBackoffDuration
        }
        duration += duration
    }
    return duration
}
Войдите в полноэкранный режим Выход из полноэкранного режима

Для всего процесса ошибки неисправности планировщик инициализации регистрирует функцию Error, которая используется для обработки не планируемых Pods, фактически зарегистрированной функцией является MakeDefaultErrorFunc, которая будет вызываться как функция Error.

sched := newScheduler(
    schedulerCache,
    extenders,
    internalqueue.MakeNextPodFunc(podQueue),
    MakeDefaultErrorFunc(client, podLister, podQueue, schedulerCache),
    stopEverything,
    podQueue,
    profiles,
    client,
    snapshot,
    options.percentageOfNodesToScore,
)
Войдите в полноэкранный режим Выход из полноэкранного режима

Как вы можете видеть в цикле планирования, scheduleOne, функция handleSchedulingFailure вызывается для каждой операции точки расширения, которая терпит неудачу, и эта функция использует зарегистрированную функцию Error для обработки Pod

func (sched *Scheduler) scheduleOne(ctx context.Context) {
    ...
    defer cancel()
    scheduleResult, err := sched.SchedulePod(schedulingCycleCtx, fwk, state, pod)
    if err != nil {

        var nominatingInfo *framework.NominatingInfo
        if fitError, ok := err.(*framework.FitError); ok {
            if !fwk.HasPostFilterPlugins() {
                klog.V(3).InfoS("No PostFilter plugins are registered, so no preemption will be performed")
            } else {

                result, status := fwk.RunPostFilterPlugins(ctx, state, pod, fitError.Diagnosis.NodeToStatusMap)
                if status.Code() == framework.Error {
                    klog.ErrorS(nil, "Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", status)
                } else {
                    fitError.Diagnosis.PostFilterMsg = status.Message()
                    klog.V(5).InfoS("Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", status)
                }
                if result != nil {
                    nominatingInfo = result.NominatingInfo
                }
            }

            metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
        } else if err == ErrNoNodesAvailable {
            nominatingInfo = clearNominatedNode
            // No nodes available is counted as unschedulable rather than an error.
            metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
        } else {
            nominatingInfo = clearNominatedNode
            klog.ErrorS(err, "Error selecting node for pod", "pod", klog.KObj(pod))
            metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
        }
        // 处理不可调度Pod
        sched.handleSchedulingFailure(ctx, fwk, podInfo, err, v1.PodReasonUnschedulable, nominatingInfo)
        return
    }

Войдите в полноэкранный режим Выход из полноэкранного режима

Это происходит в зарегистрированной функции ошибки MakeDefaultErrorFunc.

func MakeDefaultErrorFunc(client clientset.Interface, podLister corelisters.PodLister, podQueue internalqueue.SchedulingQueue, schedulerCache internalcache.Cache) func(*framework.QueuedPodInfo, error) {
    return func(podInfo *framework.QueuedPodInfo, err error) {
        pod := podInfo.Pod
        if err == ErrNoNodesAvailable {
            klog.V(2).InfoS("Unable to schedule pod; no nodes are registered to the cluster; waiting", "pod", klog.KObj(pod))
        } else if fitError, ok := err.(*framework.FitError); ok {
            // Inject UnschedulablePlugins to PodInfo, which will be used later for moving Pods between queues efficiently.
            podInfo.UnschedulablePlugins = fitError.Diagnosis.UnschedulablePlugins
            klog.V(2).InfoS("Unable to schedule pod; no fit; waiting", "pod", klog.KObj(pod), "err", err)
        } else if apierrors.IsNotFound(err) {
            klog.V(2).InfoS("Unable to schedule pod, possibly due to node not found; waiting", "pod", klog.KObj(pod), "err", err)
            if errStatus, ok := err.(apierrors.APIStatus); ok && errStatus.Status().Details.Kind == "node" {
                nodeName := errStatus.Status().Details.Name
                // when node is not found, We do not remove the node right away. Trying again to get
                // the node and if the node is still not found, then remove it from the scheduler cache.
                _, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
                if err != nil && apierrors.IsNotFound(err) {
                    node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}}
                    if err := schedulerCache.RemoveNode(&node); err != nil {
                        klog.V(4).InfoS("Node is not found; failed to remove it from the cache", "node", node.Name)
                    }
                }
            }
        } else {
            klog.ErrorS(err, "Error scheduling pod; retrying", "pod", klog.KObj(pod))
        }

        // Check if the Pod exists in informer cache.
        cachedPod, err := podLister.Pods(pod.Namespace).Get(pod.Name)
        if err != nil {
            klog.InfoS("Pod doesn't exist in informer cache", "pod", klog.KObj(pod), "err", err)
            return
        }

        // In the case of extender, the pod may have been bound successfully, but timed out returning its response to the scheduler.
        // It could result in the live version to carry .spec.nodeName, and that's inconsistent with the internal-queued version.
        if len(cachedPod.Spec.NodeName) != 0 {
            klog.InfoS("Pod has been assigned to node. Abort adding it back to queue.", "pod", klog.KObj(pod), "node", cachedPod.Spec.NodeName)
            return
        }

        // As <cachedPod> is from SharedInformer, we need to do a DeepCopy() here.
        podInfo.PodInfo = framework.NewPodInfo(cachedPod.DeepCopy())
        // 添加到unschedulable队列中
        if err := podQueue.AddUnschedulableIfNotPresent(podInfo, podQueue.SchedulingCycle()); err != nil {
            klog.ErrorS(err, "Error occurred")
        }
    }
}
Войдите в полноэкранный режим Выход из полноэкранного режима

А вот и AddUnschedulableIfNotPresent, который также является реальным действием для backoffQ и unschedulablePods.

Функция AddUnschedulableIfNotPresent вставляет незапланированную капсулу в очередь, если она еще не находится в очереди. Обычно PriorityQueue помещает незапланированные капсулы в unschedulablePods. Однако, если недавно был запрос на перемещение, капсула будет помещена в podBackoffQ.

func (p *PriorityQueue) AddUnschedulableIfNotPresent(pInfo *framework.QueuedPodInfo, podSchedulingCycle int64) error {
    p.lock.Lock()
    defer p.lock.Unlock()
    pod := pInfo.Pod
    // 如果已经存在则不添加
    if p.unschedulablePods.get(pod) != nil {
        return fmt.Errorf("Pod %v is already present in unschedulable queue", klog.KObj(pod))
    }
    // 检查是否在activeQ中
    if _, exists, _ := p.activeQ.Get(pInfo); exists {
        return fmt.Errorf("Pod %v is already present in the active queue", klog.KObj(pod))
    }
    // 检查是否在podBackoffQ中
    if _, exists, _ := p.podBackoffQ.Get(pInfo); exists {
        return fmt.Errorf("Pod %v is already present in the backoff queue", klog.KObj(pod))
    }

    // 在重新添加时,会刷新 Pod时间为最新操作的时间
    pInfo.Timestamp = p.clock.Now()

    for plugin := range pInfo.UnschedulablePlugins {
        metrics.UnschedulableReason(plugin, pInfo.Pod.Spec.SchedulerName).Inc()
    }
    // 如果接受到move request那么则放入BackoffQ
    if p.moveRequestCycle >= podSchedulingCycle {
        if err := p.podBackoffQ.Add(pInfo); err != nil {
            return fmt.Errorf("error adding pod %v to the backoff queue: %v", pod.Name, err)
        }
        metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", ScheduleAttemptFailure).Inc()
    } else {
        // 否则将放入到 unschedulablePods
        p.unschedulablePods.addOrUpdate(pInfo)
        metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc()

    }

    p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil)
    return nil
}
Войдите в полноэкранный режим Выход из полноэкранного режима

Когда запускается планировщик, две очереди включаются асинхронно с двумя циклами для работы с очередью. Это показано в Run()

func (p *PriorityQueue) Run() {
    go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
    go wait.Until(p.flushUnschedulablePodsLeftover, 30*time.Second, p.stop)
}
Войдите в полноэкранный режим Выход из полноэкранного режима

Вы можете видеть, что flushBackoffQCompleted реализован как BackoffQ; а flushUnschedulablePodsLeftover реализован как UnschedulablePods.

flushBackoffQCompleted используется для перемещения всех завершенных капсул отката из backoffQ в activeQ.

func (p *PriorityQueue) flushBackoffQCompleted() {
    p.lock.Lock()
    defer p.lock.Unlock()
    broadcast := false
    for { // 这就是heap实现的方法,窥视下,但不弹出
        rawPodInfo := p.podBackoffQ.Peek()
        if rawPodInfo == nil {
            break
        }
        pod := rawPodInfo.(*framework.QueuedPodInfo).Pod
        boTime := p.getBackoffTime(rawPodInfo.(*framework.QueuedPodInfo))
        if boTime.After(p.clock.Now()) {
            break
        }
        _, err := p.podBackoffQ.Pop() // 弹出一个
        if err != nil {
            klog.ErrorS(err, "Unable to pop pod from backoff queue despite backoff completion", "pod", klog.KObj(pod))
            break
        }
        p.activeQ.Add(rawPodInfo) // 放入到活动队列中
        metrics.SchedulerQueueIncomingPods.WithLabelValues("active", BackoffComplete).Inc()
        broadcast = true
    }

    if broadcast {
        p.cond.Broadcast()
    }
}
Войдите в полноэкранный режим Выход из полноэкранного режима

Функция flushUnschedulablePodsLeftover используется для перемещения капсулы, которая хранилась в unschedulablePods дольше, чем podMaxInUnschedulablePodsDuration значение backoffQ или activeQ.

podMaxInUnschedulablePodsDuration будет передаваться в зависимости от конфигурации, если не передается, т.е. используется Deprecated, то это будет 5 минут.

func NewOptions() *Options {
    o := &Options{
        SecureServing:  apiserveroptions.NewSecureServingOptions().WithLoopback(),
        Authentication: apiserveroptions.NewDelegatingAuthenticationOptions(),
        Authorization:  apiserveroptions.NewDelegatingAuthorizationOptions(),
        Deprecated: &DeprecatedOptions{
            PodMaxInUnschedulablePodsDuration: 5 * time.Minute,
        },
Войдите в полноэкранный режим Выход из полноэкранного режима

Для flushUnschedulablePodsLeftover это сравнение по времени, а затем добавление в соответствующую очередь

func (p *PriorityQueue) flushUnschedulablePodsLeftover() {
    p.lock.Lock()
    defer p.lock.Unlock()

    var podsToMove []*framework.QueuedPodInfo
    currentTime := p.clock.Now()
    for _, pInfo := range p.unschedulablePods.podInfoMap {
        lastScheduleTime := pInfo.Timestamp
        if currentTime.Sub(lastScheduleTime) > p.podMaxInUnschedulablePodsDuration {
            podsToMove = append(podsToMove, pInfo)
        }
    }

    if len(podsToMove) > 0 {
        p.movePodsToActiveOrBackoffQueue(podsToMove, UnschedulableTimeout)
    }
}
Войдите в полноэкранный режим Выход из полноэкранного режима

Обобщение контекстного потока планирования

  • При создании планировщика выполняются следующие действия.
    • подготовить кэш, информер, очередь, функции обработки ошибок и т.д.
    • Добавление функций событий, которые будут прослушивать ресурсы (например, Pods) и запускать соответствующую функцию события, когда происходит изменение, это входящий activeQ.
  • Когда сборка будет завершена, она будет запущена, и запустится SchedulingQueue, который используется как не планируемая очередь
    • BackoffQ
    • UnschedulablePods
    • Очередь без расписания будет добавлена к activeQ на основе Стручков в очереди, которые регулярно потребляются при регистрации
  • Запустить цикл scheduleOne, который представляет собой выполнение всех расширений в контексте планирования и является потребляющим концом activeQ.
    • Выполнить каждую точку расширения, и если возникает ошибка, функция Error MakeDefaultErrorFunc добавляет ее в непланируемую очередь
    • Вернемся к потребляющей части недиспетчерской очереди

Ссылка

[1] kubernetes scheduler ext. расширитель планировщика kubernetes
[2] система планирования
[3] Точки расширения Точки расширения

Оцените статью
devanswers.ru
Добавить комментарий