From 1c3a213990bbc30d867d8f359a36429f3fb6d5ca Mon Sep 17 00:00:00 2001 From: joohwan Date: Fri, 10 May 2024 15:55:13 +0800 Subject: [PATCH] fix:imageprepull not report when nodename is not exist Signed-off-by: joohwan --- .../image_prepull_controller.go | 14 +++++++++ cloud/pkg/taskmanager/manager/executor.go | 17 +++++----- .../node_upgrade_controller.go | 12 +++---- .../taskmanager/util/controller/controller.go | 31 +++++++++++-------- 4 files changed, 46 insertions(+), 28 deletions(-) diff --git a/cloud/pkg/taskmanager/imageprepullcontroller/image_prepull_controller.go b/cloud/pkg/taskmanager/imageprepullcontroller/image_prepull_controller.go index 31dc525aac9..568d310b444 100644 --- a/cloud/pkg/taskmanager/imageprepullcontroller/image_prepull_controller.go +++ b/cloud/pkg/taskmanager/imageprepullcontroller/image_prepull_controller.go @@ -26,6 +26,7 @@ import ( "time" jsonpatch "github.com/evanphx/json-patch" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" apimachineryType "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" @@ -323,6 +324,19 @@ func (ndc *ImagePrePullController) imagePrePullJobUpdated(pullJob *v1alpha1.Imag } } +func (ndc *ImagePrePullController) ValidateNode(taskMessage util.TaskMessage) ([]*corev1.Node, []*corev1.Node) { + var validateNodes []*corev1.Node + nodes, failNodes := ndc.BaseController.ValidateNode(taskMessage) + _, ok := taskMessage.Msg.(commontypes.ImagePrePullJobRequest) + if !ok { + klog.Errorf("convert message to commontypes.ImagePrePullJobRequest failed") + return nil, nil + } + + validateNodes = append(validateNodes, nodes...) + + return validateNodes, failNodes +} func checkUpdateNode(old, new *v1alpha1.ImagePrePullJob) *v1alpha1.TaskStatus { if len(old.Status.Status) == 0 { return nil diff --git a/cloud/pkg/taskmanager/manager/executor.go b/cloud/pkg/taskmanager/manager/executor.go index d3a7aec4555..7c5c6bcb0f3 100644 --- a/cloud/pkg/taskmanager/manager/executor.go +++ b/cloud/pkg/taskmanager/manager/executor.go @@ -208,17 +208,16 @@ func initExecutor(message util.TaskMessage) (*Executor, error) { return nil, err } if len(nodeStatus) == 0 { - nodeList := controller.ValidateNode(message) - if len(nodeList) == 0 { - return nil, fmt.Errorf("no node need to be upgrade") + successNodes, failNodes := controller.ValidateNode(message) + nodeStatus = make([]v1alpha1.TaskStatus, 0, len(message.Name)) + for _, node := range failNodes { + nodeStatus = append(nodeStatus, v1alpha1.TaskStatus{NodeName: node.Name, State: api.TaskFailed}) } - nodeStatus = make([]v1alpha1.TaskStatus, len(nodeList)) - for i, node := range nodeList { - nodeStatus[i] = v1alpha1.TaskStatus{NodeName: node.Name} + for _, node := range successNodes { + nodeStatus = append(nodeStatus, v1alpha1.TaskStatus{NodeName: node.Name}) } - err = controller.UpdateNodeStatus(message.Name, nodeStatus) - if err != nil { - return nil, err + if err := controller.UpdateNodeStatus(message.Name, nodeStatus); err != nil { + return nil, fmt.Errorf("failed to update node status,err:%s", err.Error()) } } e := &Executor{ diff --git a/cloud/pkg/taskmanager/nodeupgradecontroller/node_upgrade_controller.go b/cloud/pkg/taskmanager/nodeupgradecontroller/node_upgrade_controller.go index 649b95cbb5a..df35ff6ea16 100644 --- a/cloud/pkg/taskmanager/nodeupgradecontroller/node_upgrade_controller.go +++ b/cloud/pkg/taskmanager/nodeupgradecontroller/node_upgrade_controller.go @@ -135,20 +135,20 @@ func (ndc *NodeUpgradeController) ReportTaskStatus(taskID string, event fsm.Even return taskFSM.CurrentState() } -func (ndc *NodeUpgradeController) ValidateNode(taskMessage util.TaskMessage) []v1.Node { - var validateNodes []v1.Node - nodes := ndc.BaseController.ValidateNode(taskMessage) +func (ndc *NodeUpgradeController) ValidateNode(taskMessage util.TaskMessage) ([]*v1.Node, []*v1.Node) { + var validateNodes []*v1.Node + nodes, failNodes := ndc.BaseController.ValidateNode(taskMessage) req, ok := taskMessage.Msg.(commontypes.NodeUpgradeJobRequest) if !ok { klog.Errorf("convert message to commontypes.NodeUpgradeJobRequest failed") - return nil + return nil, nil } for _, node := range nodes { - if needUpgrade(node, req.Version) { + if needUpgrade(*node, req.Version) { validateNodes = append(validateNodes, node) } } - return validateNodes + return validateNodes, failNodes } func (ndc *NodeUpgradeController) StageCompleted(taskID string, state api.State) bool { diff --git a/cloud/pkg/taskmanager/util/controller/controller.go b/cloud/pkg/taskmanager/util/controller/controller.go index 78259d3abf1..eb33abfc5f0 100644 --- a/cloud/pkg/taskmanager/util/controller/controller.go +++ b/cloud/pkg/taskmanager/util/controller/controller.go @@ -20,6 +20,7 @@ import ( "fmt" v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" k8sinformer "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" @@ -38,7 +39,7 @@ type Controller interface { Start() error ReportNodeStatus(string, string, fsm.Event) (api.State, error) ReportTaskStatus(string, fsm.Event) (api.State, error) - ValidateNode(util.TaskMessage) []v1.Node + ValidateNode(util.TaskMessage) ([]*v1.Node, []*v1.Node) GetNodeStatus(string) ([]v1alpha1.TaskStatus, error) UpdateNodeStatus(string, []v1alpha1.TaskStatus) error StageCompleted(taskID string, state api.State) bool @@ -65,12 +66,12 @@ func (bc *BaseController) StageCompleted(taskID string, state api.State) bool { return false } -func (bc *BaseController) ValidateNode(taskMessage util.TaskMessage) []v1.Node { - var validateNodes []v1.Node - nodes, err := bc.getNodeList(taskMessage.NodeNames, taskMessage.LabelSelector) +func (bc *BaseController) ValidateNode(taskMessage util.TaskMessage) ([]*v1.Node, []*v1.Node) { + var validateNodes []*v1.Node + nodes, nodesNonExist, err := bc.getNodeList(taskMessage.NodeNames, taskMessage.LabelSelector) if err != nil { klog.Warningf("get node list error: %s", err.Error()) - return nil + return nil, nil } for _, node := range nodes { if !util.IsEdgeNode(node) { @@ -81,9 +82,9 @@ func (bc *BaseController) ValidateNode(taskMessage util.TaskMessage) []v1.Node { if !ready { continue } - validateNodes = append(validateNodes, *node) + validateNodes = append(validateNodes, node) } - return validateNodes + return validateNodes, nodesNonExist } func (bc *BaseController) GetNodeStatus(name string) ([]v1alpha1.TaskStatus, error) { @@ -141,29 +142,33 @@ func GetController(name string) (Controller, error) { return controller, nil } -func (bc *BaseController) getNodeList(nodeNames []string, labelSelector *metav1.LabelSelector) ([]*v1.Node, error) { +func (bc *BaseController) getNodeList(nodeNames []string, labelSelector *metav1.LabelSelector) ([]*v1.Node, []*v1.Node, error) { var nodesToUpgrade []*v1.Node - + var nodesNonExist []*v1.Node if len(nodeNames) != 0 { for _, name := range nodeNames { node, err := bc.Informer.Core().V1().Nodes().Lister().Get(name) + if apierrors.IsNotFound(err) { + nodesNonExist = append(nodesNonExist, &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}}) + continue + } if err != nil { - return nil, fmt.Errorf("failed to get node with name %s: %v", name, err) + return nil, nil, fmt.Errorf("failed to get node with name %s: %v", name, err) } nodesToUpgrade = append(nodesToUpgrade, node) } } else if labelSelector != nil { selector, err := metav1.LabelSelectorAsSelector(labelSelector) if err != nil { - return nil, fmt.Errorf("labelSelector(%s) is not valid: %v", labelSelector, err) + return nil, nil, fmt.Errorf("labelSelector(%s) is not valid: %v", labelSelector, err) } nodes, err := bc.Informer.Core().V1().Nodes().Lister().List(selector) if err != nil { - return nil, fmt.Errorf("failed to get nodes with label %s: %v", selector.String(), err) + return nil, nil, fmt.Errorf("failed to get nodes with label %s: %v", selector.String(), err) } nodesToUpgrade = nodes } - return nodesToUpgrade, nil + return nodesToUpgrade, nodesNonExist, nil }