Skip to content

Commit

Permalink
fix: imageprepull does not report when node name does not exist
Browse files Browse the repository at this point in the history
Signed-off-by: joohwan <[email protected]>
  • Loading branch information
Piwriw committed May 10, 2024
1 parent fed3fcd commit 1c3a213
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

jsonpatch "github.com/evanphx/json-patch"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apimachineryType "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -323,6 +324,19 @@ func (ndc *ImagePrePullController) imagePrePullJobUpdated(pullJob *v1alpha1.Imag
}
}

// ValidateNode returns the nodes accepted for an image pre-pull task along with
// the second list (failNodes) produced by BaseController.ValidateNode.
//
// The task message payload must be a commontypes.ImagePrePullJobRequest; if it
// is not, an error is logged and both results are nil. The request contents
// themselves are not inspected here.
func (ndc *ImagePrePullController) ValidateNode(taskMessage util.TaskMessage) ([]*corev1.Node, []*corev1.Node) {
	candidates, failed := ndc.BaseController.ValidateNode(taskMessage)

	// Only the payload's type matters; the converted value is discarded.
	if _, isPrePull := taskMessage.Msg.(commontypes.ImagePrePullJobRequest); !isPrePull {
		klog.Errorf("convert message to commontypes.ImagePrePullJobRequest failed")
		return nil, nil
	}

	// Copy into a fresh slice instead of returning the base controller's
	// slice directly; an empty candidate list yields a nil slice.
	accepted := append([]*corev1.Node(nil), candidates...)
	return accepted, failed
}
func checkUpdateNode(old, new *v1alpha1.ImagePrePullJob) *v1alpha1.TaskStatus {
if len(old.Status.Status) == 0 {
return nil
Expand Down
17 changes: 8 additions & 9 deletions cloud/pkg/taskmanager/manager/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,17 +208,16 @@ func initExecutor(message util.TaskMessage) (*Executor, error) {
return nil, err
}
if len(nodeStatus) == 0 {
nodeList := controller.ValidateNode(message)
if len(nodeList) == 0 {
return nil, fmt.Errorf("no node need to be upgrade")
successNodes, failNodes := controller.ValidateNode(message)
nodeStatus = make([]v1alpha1.TaskStatus, 0, len(message.Name))
for _, node := range failNodes {
nodeStatus = append(nodeStatus, v1alpha1.TaskStatus{NodeName: node.Name, State: api.TaskFailed})
}
nodeStatus = make([]v1alpha1.TaskStatus, len(nodeList))
for i, node := range nodeList {
nodeStatus[i] = v1alpha1.TaskStatus{NodeName: node.Name}
for _, node := range successNodes {
nodeStatus = append(nodeStatus, v1alpha1.TaskStatus{NodeName: node.Name})
}
err = controller.UpdateNodeStatus(message.Name, nodeStatus)
if err != nil {
return nil, err
if err := controller.UpdateNodeStatus(message.Name, nodeStatus); err != nil {
return nil, fmt.Errorf("failed to update node status,err:%s", err.Error())
}
}
e := &Executor{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,20 +135,20 @@ func (ndc *NodeUpgradeController) ReportTaskStatus(taskID string, event fsm.Even
return taskFSM.CurrentState()
}

// ValidateNode picks, from the nodes targeted by taskMessage, those that still
// need to be upgraded to the requested version.
//
// NOTE(review): this listing is a diff rendering, so each replaced
// single-result line appears directly before the two-result line that
// supersedes it; the description below follows the two-result form.
//
// The candidate nodes and failNodes both come from
// BaseController.ValidateNode. A candidate is kept only when needUpgrade
// reports true for req.Version. If the payload is not a
// commontypes.NodeUpgradeJobRequest, an error is logged and nil is returned
// for both results, so failNodes is not propagated in that case.
func (ndc *NodeUpgradeController) ValidateNode(taskMessage util.TaskMessage) []v1.Node {
var validateNodes []v1.Node
nodes := ndc.BaseController.ValidateNode(taskMessage)
func (ndc *NodeUpgradeController) ValidateNode(taskMessage util.TaskMessage) ([]*v1.Node, []*v1.Node) {
var validateNodes []*v1.Node
nodes, failNodes := ndc.BaseController.ValidateNode(taskMessage)
req, ok := taskMessage.Msg.(commontypes.NodeUpgradeJobRequest)
if !ok {
klog.Errorf("convert message to commontypes.NodeUpgradeJobRequest failed")
return nil
return nil, nil
}
for _, node := range nodes {
if needUpgrade(node, req.Version) {
// node is a pointer in the two-result form, so it is dereferenced for needUpgrade.
if needUpgrade(*node, req.Version) {
validateNodes = append(validateNodes, node)
}
}
return validateNodes
return validateNodes, failNodes
}

func (ndc *NodeUpgradeController) StageCompleted(taskID string, state api.State) bool {
Expand Down
31 changes: 18 additions & 13 deletions cloud/pkg/taskmanager/util/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"

v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sinformer "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
Expand All @@ -38,7 +39,7 @@ type Controller interface {
Start() error
ReportNodeStatus(string, string, fsm.Event) (api.State, error)
ReportTaskStatus(string, fsm.Event) (api.State, error)
ValidateNode(util.TaskMessage) []v1.Node
ValidateNode(util.TaskMessage) ([]*v1.Node, []*v1.Node)
GetNodeStatus(string) ([]v1alpha1.TaskStatus, error)
UpdateNodeStatus(string, []v1alpha1.TaskStatus) error
StageCompleted(taskID string, state api.State) bool
Expand All @@ -65,12 +66,12 @@ func (bc *BaseController) StageCompleted(taskID string, state api.State) bool {
return false
}

func (bc *BaseController) ValidateNode(taskMessage util.TaskMessage) []v1.Node {
var validateNodes []v1.Node
nodes, err := bc.getNodeList(taskMessage.NodeNames, taskMessage.LabelSelector)
func (bc *BaseController) ValidateNode(taskMessage util.TaskMessage) ([]*v1.Node, []*v1.Node) {
var validateNodes []*v1.Node
nodes, nodesNonExist, err := bc.getNodeList(taskMessage.NodeNames, taskMessage.LabelSelector)
if err != nil {
klog.Warningf("get node list error: %s", err.Error())
return nil
return nil, nil
}
for _, node := range nodes {
if !util.IsEdgeNode(node) {
Expand All @@ -81,9 +82,9 @@ func (bc *BaseController) ValidateNode(taskMessage util.TaskMessage) []v1.Node {
if !ready {
continue
}
validateNodes = append(validateNodes, *node)
validateNodes = append(validateNodes, node)
}
return validateNodes
return validateNodes, nodesNonExist
}

func (bc *BaseController) GetNodeStatus(name string) ([]v1alpha1.TaskStatus, error) {
Expand Down Expand Up @@ -141,29 +142,33 @@ func GetController(name string) (Controller, error) {
return controller, nil
}

// getNodeList resolves the nodes a task refers to through the informer's node
// lister.
//
// NOTE(review): this listing is a diff rendering, so each replaced two-result
// line appears directly before the three-result line that supersedes it; the
// description below follows the three-result form.
//
// When nodeNames is non-empty, every name is looked up individually. A name
// the lister reports as not found is recorded in the second result as a
// placeholder Node carrying only that name; any other lookup error aborts the
// call. Otherwise, when labelSelector is non-nil, it is converted to a
// selector and used to list nodes. If neither is supplied, both node lists
// stay nil. The first result holds the nodes that were found.
func (bc *BaseController) getNodeList(nodeNames []string, labelSelector *metav1.LabelSelector) ([]*v1.Node, error) {
func (bc *BaseController) getNodeList(nodeNames []string, labelSelector *metav1.LabelSelector) ([]*v1.Node, []*v1.Node, error) {
var nodesToUpgrade []*v1.Node

var nodesNonExist []*v1.Node
if len(nodeNames) != 0 {
for _, name := range nodeNames {
node, err := bc.Informer.Core().V1().Nodes().Lister().Get(name)
// A missing node is not fatal: remember it by name and keep going.
if apierrors.IsNotFound(err) {
nodesNonExist = append(nodesNonExist, &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}})
continue
}
if err != nil {
return nil, fmt.Errorf("failed to get node with name %s: %v", name, err)
return nil, nil, fmt.Errorf("failed to get node with name %s: %v", name, err)
}
nodesToUpgrade = append(nodesToUpgrade, node)
}
} else if labelSelector != nil {
// Explicit node names take precedence; the selector is used only when no names are given.
selector, err := metav1.LabelSelectorAsSelector(labelSelector)
if err != nil {
return nil, fmt.Errorf("labelSelector(%s) is not valid: %v", labelSelector, err)
return nil, nil, fmt.Errorf("labelSelector(%s) is not valid: %v", labelSelector, err)
}

nodes, err := bc.Informer.Core().V1().Nodes().Lister().List(selector)
if err != nil {
return nil, fmt.Errorf("failed to get nodes with label %s: %v", selector.String(), err)
return nil, nil, fmt.Errorf("failed to get nodes with label %s: %v", selector.String(), err)
}
nodesToUpgrade = nodes
}

return nodesToUpgrade, nil
return nodesToUpgrade, nodesNonExist, nil
}

0 comments on commit 1c3a213

Please sign in to comment.