From dd906fd08a440b7afa62e6971267d0c4062d2d2c Mon Sep 17 00:00:00 2001 From: cuisongliu Date: Mon, 25 Sep 2023 20:15:04 +0800 Subject: [PATCH] feature(main): join nodes for k3s (#3944) --- pkg/runtime/k3s/bootstrap.go | 46 ++++++++++++++++++++++++ pkg/runtime/k3s/config.go | 13 +------ pkg/runtime/k3s/consts.go | 2 +- pkg/runtime/k3s/k3s.go | 30 ++++++++++++++-- pkg/runtime/k3s/lifecycle.go | 30 +++++++++++++--- pkg/runtime/kubernetes/runtime_getter.go | 2 +- pkg/ssh/remote.go | 6 ++-- pkg/template/template_test.go | 34 +++++++----------- test/e2e/k3s_basic_test.go | 2 +- 9 files changed, 119 insertions(+), 46 deletions(-) diff --git a/pkg/runtime/k3s/bootstrap.go b/pkg/runtime/k3s/bootstrap.go index 9dd2ffebd8a..21ee5819be9 100644 --- a/pkg/runtime/k3s/bootstrap.go +++ b/pkg/runtime/k3s/bootstrap.go @@ -18,9 +18,14 @@ import ( "context" "fmt" "path/filepath" + "strings" + + "github.com/pkg/errors" "golang.org/x/sync/errgroup" + "github.com/labring/sealos/pkg/utils/iputils" + "github.com/labring/sealos/pkg/constants" "github.com/labring/sealos/pkg/utils/file" "github.com/labring/sealos/pkg/utils/logger" @@ -36,6 +41,9 @@ func (k *K3s) initMaster0() error { k.generateAndSendInitConfig, func() error { return k.enableK3sService(master0) }, k.pullKubeConfigFromMaster0, + func() error { + return k.remoteUtil.HostsAdd(master0, iputils.GetHostIP(master0), constants.DefaultAPIServerDomain) + }, func() error { return k.copyKubeConfigFileToNodes(k.cluster.GetMaster0IPAndPort()) }, ) } @@ -95,6 +103,9 @@ func (k *K3s) joinMaster(master string) error { return k.execer.Copy(master, filepath.Join(k.pathResolver.EtcPath(), defaultJoinMastersFilename), defaultK3sConfigPath) }, func() error { return k.enableK3sService(master) }, + func() error { + return k.remoteUtil.HostsAdd(master, iputils.GetHostIP(master), constants.DefaultAPIServerDomain) + }, func() error { return k.copyKubeConfigFileToNodes(master) }, ) } @@ -111,8 +122,35 @@ func (k *K3s) joinNodes(nodes []string) error { return nil } +func (k *K3s) getAPIServerPort() int { + src := filepath.Join(k.pathResolver.EtcPath(), defaultInitFilename) + if file.IsExist(src) { + cfg := &Config{} + if err := yaml.UnmarshalFile(src, cfg); err == nil { + return cfg.HTTPSPort + } + } + return constants.DefaultAPIServerPort +} + +func (k *K3s) getMasterIPListAndHTTPSPort() []string { + apiPort := k.getAPIServerPort() + masters := make([]string, 0) + for _, master := range k.cluster.GetMasterIPList() { + masters = append(masters, fmt.Sprintf("%s:%d", master, apiPort)) + } + return masters +} + +func (k *K3s) getVipAndPort() string { + return fmt.Sprintf("%s:%d", k.cluster.GetVIP(), k.getAPIServerPort()) +} + func (k *K3s) joinNode(node string) error { return k.runPipelines(fmt.Sprintf("join node %s", node), + func() error { + return k.remoteUtil.IPVS(node, k.getVipAndPort(), k.getMasterIPListAndHTTPSPort()) + }, func() error { return k.generateAndSendTokenFiles(node, "agent-token") }, func() error { return k.execer.Copy(node, filepath.Join(k.pathResolver.EtcPath(), defaultJoinNodesFilename), defaultK3sConfigPath) @@ -193,6 +231,14 @@ func (k *K3s) pullKubeConfigFromMaster0() error { func (k *K3s) copyKubeConfigFileToNodes(hosts ...string) error { src := k.pathResolver.AdminFile() + data, err := file.ReadAll(src) + if err != nil { + return errors.WithMessage(err, "read admin.config file failed") + } + newData := strings.ReplaceAll(string(data), "https://0.0.0.0", fmt.Sprintf("https://%s", constants.DefaultAPIServerDomain)) + if err = file.WriteFile(src, []byte(newData)); err != nil { + return errors.WithMessage(err, "write admin.config file failed") + } eg, _ := errgroup.WithContext(context.Background()) for _, node := range hosts { node := node diff --git a/pkg/runtime/k3s/config.go b/pkg/runtime/k3s/config.go index 4438fe56ade..c61628aa595 100644 --- a/pkg/runtime/k3s/config.go +++ b/pkg/runtime/k3s/config.go @@ -37,9 +37,8 @@ var defaultMergeOpts = []func(*mergo.Config){ } func defaultingConfig(c *Config) *Config { - //TODO update kube config c.BindAddress = "0.0.0.0" - c.HTTPSPort = 6443 + c.HTTPSPort = constants.DefaultAPIServerPort c.ClusterCIDR = []string{"10.42.0.0/16"} c.ServiceCIDR = []string{"10.96.0.0/16"} c.ClusterDomain = constants.DefaultDNSDomain @@ -173,16 +172,6 @@ func (k *K3s) getInitConfig(callbacks ...callback) (*Config, error) { return cfg, nil } -//lint:ignore U1000 Ignore unused function temporarily for debugging -func (c *Config) getContainerRuntimeEndpoint() string { - if c.AgentConfig.Docker { - return "unix:///run/k3s/cri-dockerd/cri-dockerd.sock" - } else if len(c.AgentConfig.ContainerRuntimeEndpoint) == 0 { - return "unix:///run/k3s/containerd/containerd.sock" - } - return c.AgentConfig.ContainerRuntimeEndpoint -} - // ParseConfig return nil if data structure is not matched func ParseConfig(data []byte) (*Config, error) { var cfg Config diff --git a/pkg/runtime/k3s/consts.go b/pkg/runtime/k3s/consts.go index a18eb13444f..2d7bc20f54d 100644 --- a/pkg/runtime/k3s/consts.go +++ b/pkg/runtime/k3s/consts.go @@ -25,7 +25,7 @@ const ( defaultInitFilename = "k3s-init.yaml" defaultJoinMastersFilename = "k3s-join-master.yaml" defaultJoinNodesFilename = "k3s-join-node.yaml" - defaultPodManifestPath = "pod-manifests" + k3sEtcStaticPod = "/var/lib/rancher/k3s/agent/pod-manifests" ) const ( diff --git a/pkg/runtime/k3s/k3s.go b/pkg/runtime/k3s/k3s.go index cd6a4d11913..225c19b069b 100644 --- a/pkg/runtime/k3s/k3s.go +++ b/pkg/runtime/k3s/k3s.go @@ -15,8 +15,14 @@ package k3s import ( + "context" "fmt" + "golang.org/x/sync/errgroup" + + "github.com/labring/sealos/pkg/utils/iputils" + "github.com/labring/sealos/pkg/utils/strings" + "github.com/labring/sealos/pkg/constants" "github.com/labring/sealos/pkg/env" "github.com/labring/sealos/pkg/exec" @@ -115,9 +121,27 @@ func (k *K3s) GetRawConfig() ([]byte, error) { return yaml.MarshalConfigs(cluster, cfg) } -func (k *K3s) SyncNodeIPVS(_, _ []string) error { - logger.Error("not yet implemented, skip for testing") - return nil +func (k *K3s) SyncNodeIPVS(mastersIPList, nodeIPList []string) error { + apiPort := k.getAPIServerPort() + mastersIPList = strings.RemoveDuplicate(mastersIPList) + masters := make([]string, 0) + for _, master := range mastersIPList { + masters = append(masters, fmt.Sprintf("%s:%d", iputils.GetHostIP(master), apiPort)) + } + image := k.cluster.GetLvscareImage() + eg, _ := errgroup.WithContext(context.Background()) + for _, node := range nodeIPList { + node := node + eg.Go(func() error { + logger.Info("start to sync lvscare static pod to node: %s master: %+v", node, masters) + err := k.remoteUtil.StaticPod(node, k.getVipAndPort(), constants.LvsCareStaticPodName, image, masters, k3sEtcStaticPod, "--health-status", "401") + if err != nil { + return fmt.Errorf("update lvscare static pod failed %s %v", node, err) + } + return nil + }) + } + return eg.Wait() } func (k *K3s) runPipelines(phase string, pipelines ...func() error) error { diff --git a/pkg/runtime/k3s/lifecycle.go b/pkg/runtime/k3s/lifecycle.go index c23313d546b..c580e5bbd71 100644 --- a/pkg/runtime/k3s/lifecycle.go +++ b/pkg/runtime/k3s/lifecycle.go @@ -18,6 +18,10 @@ import ( "context" "fmt" + "github.com/labring/sealos/pkg/utils/iputils" + + "github.com/labring/sealos/pkg/utils/strings" + "golang.org/x/exp/slices" "github.com/labring/sealos/pkg/utils/logger" @@ -48,15 +52,33 @@ func (k *K3s) resetNode(host string) error { } if slices.Contains(k.cluster.GetNodeIPList(), host) { vipAndPort := fmt.Sprintf("%s:%d", k.cluster.GetVIP(), k.config.APIServerPort) - ipvscleanErr := k.remoteUtil.IPVSClean(host, vipAndPort) - if ipvscleanErr != nil { - logger.Error("failed to clean node route and ipvs failed, %v", ipvscleanErr) + ipvsclearErr := k.remoteUtil.IPVSClean(host, vipAndPort) + if ipvsclearErr != nil { + logger.Error("failed to clear ipvs rules for node %s: %v", host, ipvsclearErr) } } return nil } // TODO: remove from API -func (k *K3s) deleteNode(_ string) error { +func (k *K3s) deleteNode(node string) error { + //remove master + masterIPs := strings.RemoveFromSlice(k.cluster.GetMasterIPList(), node) + if len(masterIPs) > 0 { + // TODO: do we need draining first? + if err := k.removeNode(node); err != nil { + logger.Warn(fmt.Errorf("delete nodes %s failed %v", node, err)) + } + } return nil } + +func (k *K3s) removeNode(ip string) error { + logger.Info("start to remove node from k3s %s", ip) + nodeName, err := k.execer.CmdToString(k.cluster.GetMaster0IPAndPort(), fmt.Sprintf("kubectl get nodes -o wide | awk '$6==\"%s\" {print $1}'", iputils.GetHostIP(ip)), "") + if err != nil { + return fmt.Errorf("cannot get node with ip address %s: %v", ip, err) + } + logger.Debug("found node name is %s, we will delete it", nodeName) + return k.execer.CmdAsync(k.cluster.GetMaster0IPAndPort(), fmt.Sprintf("kubectl delete node %s --ignore-not-found=true", nodeName)) +} diff --git a/pkg/runtime/kubernetes/runtime_getter.go b/pkg/runtime/kubernetes/runtime_getter.go index 92edb4185f4..2bc428d5cae 100644 --- a/pkg/runtime/kubernetes/runtime_getter.go +++ b/pkg/runtime/kubernetes/runtime_getter.go @@ -112,7 +112,7 @@ func (k *KubeadmRuntime) syncNodeIPVSYaml(masterIPs, nodesIPs []string) error { func (k *KubeadmRuntime) execIPVSPod(ip string, masters []string) error { image := k.cluster.GetLvscareImage() - return k.remoteUtil.StaticPod(ip, k.getVipAndPort(), constants.LvsCareStaticPodName, image, masters) + return k.remoteUtil.StaticPod(ip, k.getVipAndPort(), constants.LvsCareStaticPodName, image, masters, kubernetesEtcStaticPod) } func (k *KubeadmRuntime) execToken(ip, certificateKey string) (string, error) { diff --git a/pkg/ssh/remote.go b/pkg/ssh/remote.go index e191ffabdf5..b455f80a404 100644 --- a/pkg/ssh/remote.go +++ b/pkg/ssh/remote.go @@ -86,13 +86,15 @@ func (s *Remote) IPVSClean(ip, vip string) error { return s.executeRemoteUtilSubcommand(ip, out) } -func (s *Remote) StaticPod(ip, vip, name, image string, masters []string) error { - staticPodIPVSTemplate := `static-pod lvscare --name {{.name}} --vip {{.vip}} --image {{.image}} {{range $h := .masters}} --masters {{$h}}{{end}}` +func (s *Remote) StaticPod(ip, vip, name, image string, masters []string, path string, options ...string) error { + staticPodIPVSTemplate := `static-pod lvscare --path {{.path}} --name {{.name}} --vip {{.vip}} --image {{.image}} {{range $h := .masters}} --masters {{$h}} {{end}} {{range $o := .options}} --options {{$o}} {{end}}` data := map[string]interface{}{ "vip": vip, "image": image, "masters": masters, "name": name, + "path": path, + "options": options, } out, err := template.RenderTemplate("lvscare", staticPodIPVSTemplate, data) if err != nil { diff --git a/pkg/template/template_test.go b/pkg/template/template_test.go index f898596a73d..c8738148f8d 100644 --- a/pkg/template/template_test.go +++ b/pkg/template/template_test.go @@ -15,32 +15,22 @@ package template import ( - "bytes" - "fmt" "testing" ) func TestTemplateSemverCompare(t *testing.T) { - v, b, e := TryParse(` -version: {{if (semverCompare "^1.26.0" (default "" .ENV)) }}v1{{ else }}v1alpha2{{ end }} -`) - if e != nil { - t.Errorf("parse err: %v", e) + staticPodIPVSTemplate := `static-pod lvscare --path {{.path}} --name {{.name}} --vip {{.vip}} --image {{.image}} {{range $h := .masters}} --masters {{$h}} {{end}} {{range $o := .options}} --options {{$o}} {{end}} ` + data := map[string]interface{}{ + "vip": "127.0.0.1", + "image": "test", + "masters": []string{"127.0.0.2"}, + "name": "lvscare", + "path": "/etc/kubernetes", + "options": nil, } - if !b { - t.Errorf("parse failed: %v", b) + out, err := RenderTemplate("lvscare", staticPodIPVSTemplate, data) + if err != nil { + t.Errorf("%+v", err) } - - out := bytes.NewBuffer(nil) - execErr := v.Execute(out, map[string]interface{}{ - // comment out this to test true return - // "ENV": "v1.26.1", - // comment out this to test false return - "ENV": "v1.25.10", - }) - if execErr != nil { - t.Errorf("template exec err: %v", execErr) - } - - fmt.Println(out) + t.Log(out) } diff --git a/test/e2e/k3s_basic_test.go b/test/e2e/k3s_basic_test.go index 9cadd2ebbad..b0176dda821 100644 --- a/test/e2e/k3s_basic_test.go +++ b/test/e2e/k3s_basic_test.go @@ -48,7 +48,7 @@ var _ = Describe("E2E_sealos_k3s_basic_test", func() { BeforeEach(func() { By("build rootfs") dFile := config.RootfsDockerfile{ - BaseImage: "labring/k3s:latest", + BaseImage: "labring/k3s:v1.25-latest", Copys: []string{"sealctl opt/"}, } tmpdir, err := dFile.Write()