Skip to content

Commit

Permalink
feature(main): join nodes for k3s (labring#3944)
Browse files Browse the repository at this point in the history
  • Loading branch information
cuisongliu committed Sep 25, 2023
1 parent fa091a8 commit dd906fd
Show file tree
Hide file tree
Showing 9 changed files with 119 additions and 46 deletions.
46 changes: 46 additions & 0 deletions pkg/runtime/k3s/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,14 @@ import (
"context"
"fmt"
"path/filepath"
"strings"

"github.com/pkg/errors"

"golang.org/x/sync/errgroup"

"github.com/labring/sealos/pkg/utils/iputils"

"github.com/labring/sealos/pkg/constants"
"github.com/labring/sealos/pkg/utils/file"
"github.com/labring/sealos/pkg/utils/logger"
Expand All @@ -36,6 +41,9 @@ func (k *K3s) initMaster0() error {
k.generateAndSendInitConfig,
func() error { return k.enableK3sService(master0) },
k.pullKubeConfigFromMaster0,
func() error {
return k.remoteUtil.HostsAdd(master0, iputils.GetHostIP(master0), constants.DefaultAPIServerDomain)
},
func() error { return k.copyKubeConfigFileToNodes(k.cluster.GetMaster0IPAndPort()) },
)
}
Expand Down Expand Up @@ -95,6 +103,9 @@ func (k *K3s) joinMaster(master string) error {
return k.execer.Copy(master, filepath.Join(k.pathResolver.EtcPath(), defaultJoinMastersFilename), defaultK3sConfigPath)
},
func() error { return k.enableK3sService(master) },
func() error {
return k.remoteUtil.HostsAdd(master, iputils.GetHostIP(master), constants.DefaultAPIServerDomain)
},
func() error { return k.copyKubeConfigFileToNodes(master) },
)
}
Expand All @@ -111,8 +122,35 @@ func (k *K3s) joinNodes(nodes []string) error {
return nil
}

func (k *K3s) getAPIServerPort() int {
src := filepath.Join(k.pathResolver.EtcPath(), defaultInitFilename)
if file.IsExist(src) {
cfg := &Config{}
if err := yaml.UnmarshalFile(src, cfg); err == nil {
return cfg.HTTPSPort
}
}
return constants.DefaultAPIServerPort
}

func (k *K3s) getMasterIPListAndHTTPSPort() []string {
apiPort := k.getAPIServerPort()
masters := make([]string, 0)
for _, master := range k.cluster.GetMasterIPList() {
masters = append(masters, fmt.Sprintf("%s:%d", master, apiPort))
}
return masters
}

func (k *K3s) getVipAndPort() string {
return fmt.Sprintf("%s:%d", k.cluster.GetVIP(), k.getAPIServerPort())
}

func (k *K3s) joinNode(node string) error {
return k.runPipelines(fmt.Sprintf("join node %s", node),
func() error {
return k.remoteUtil.IPVS(node, k.getVipAndPort(), k.getMasterIPListAndHTTPSPort())
},
func() error { return k.generateAndSendTokenFiles(node, "agent-token") },
func() error {
return k.execer.Copy(node, filepath.Join(k.pathResolver.EtcPath(), defaultJoinNodesFilename), defaultK3sConfigPath)
Expand Down Expand Up @@ -193,6 +231,14 @@ func (k *K3s) pullKubeConfigFromMaster0() error {

func (k *K3s) copyKubeConfigFileToNodes(hosts ...string) error {
src := k.pathResolver.AdminFile()
data, err := file.ReadAll(src)
if err != nil {
return errors.WithMessage(err, "read admin.config file failed")
}
newData := strings.ReplaceAll(string(data), "https://0.0.0.0", fmt.Sprintf("https://%s", constants.DefaultAPIServerDomain))
if err = file.WriteFile(src, []byte(newData)); err != nil {
return errors.WithMessage(err, "write admin.config file failed")
}
eg, _ := errgroup.WithContext(context.Background())
for _, node := range hosts {
node := node
Expand Down
13 changes: 1 addition & 12 deletions pkg/runtime/k3s/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,8 @@ var defaultMergeOpts = []func(*mergo.Config){
}

func defaultingConfig(c *Config) *Config {
//TODO update kube config
c.BindAddress = "0.0.0.0"
c.HTTPSPort = 6443
c.HTTPSPort = constants.DefaultAPIServerPort
c.ClusterCIDR = []string{"10.42.0.0/16"}
c.ServiceCIDR = []string{"10.96.0.0/16"}
c.ClusterDomain = constants.DefaultDNSDomain
Expand Down Expand Up @@ -173,16 +172,6 @@ func (k *K3s) getInitConfig(callbacks ...callback) (*Config, error) {
return cfg, nil
}

//lint:ignore U1000 Ignore unused function temporarily for debugging
func (c *Config) getContainerRuntimeEndpoint() string {
if c.AgentConfig.Docker {
return "unix:///run/k3s/cri-dockerd/cri-dockerd.sock"
} else if len(c.AgentConfig.ContainerRuntimeEndpoint) == 0 {
return "unix:///run/k3s/containerd/containerd.sock"
}
return c.AgentConfig.ContainerRuntimeEndpoint
}

// ParseConfig return nil if data structure is not matched
func ParseConfig(data []byte) (*Config, error) {
var cfg Config
Expand Down
2 changes: 1 addition & 1 deletion pkg/runtime/k3s/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ const (
defaultInitFilename = "k3s-init.yaml"
defaultJoinMastersFilename = "k3s-join-master.yaml"
defaultJoinNodesFilename = "k3s-join-node.yaml"
defaultPodManifestPath = "pod-manifests"
k3sEtcStaticPod = "/var/lib/rancher/k3s/agent/pod-manifests"
)

const (
Expand Down
30 changes: 27 additions & 3 deletions pkg/runtime/k3s/k3s.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,14 @@
package k3s

import (
"context"
"fmt"

"golang.org/x/sync/errgroup"

"github.com/labring/sealos/pkg/utils/iputils"
"github.com/labring/sealos/pkg/utils/strings"

"github.com/labring/sealos/pkg/constants"
"github.com/labring/sealos/pkg/env"
"github.com/labring/sealos/pkg/exec"
Expand Down Expand Up @@ -115,9 +121,27 @@ func (k *K3s) GetRawConfig() ([]byte, error) {
return yaml.MarshalConfigs(cluster, cfg)
}

func (k *K3s) SyncNodeIPVS(_, _ []string) error {
logger.Error("not yet implemented, skip for testing")
return nil
func (k *K3s) SyncNodeIPVS(mastersIPList, nodeIPList []string) error {
apiPort := k.getAPIServerPort()
mastersIPList = strings.RemoveDuplicate(mastersIPList)
masters := make([]string, 0)
for _, master := range mastersIPList {
masters = append(masters, fmt.Sprintf("%s:%d", iputils.GetHostIP(master), apiPort))
}
image := k.cluster.GetLvscareImage()
eg, _ := errgroup.WithContext(context.Background())
for _, node := range nodeIPList {
node := node
eg.Go(func() error {
logger.Info("start to sync lvscare static pod to node: %s master: %+v", node, masters)
err := k.remoteUtil.StaticPod(node, k.getVipAndPort(), constants.LvsCareStaticPodName, image, masters, k3sEtcStaticPod, "--health-status", "401")
if err != nil {
return fmt.Errorf("update lvscare static pod failed %s %v", node, err)
}
return nil
})
}
return eg.Wait()
}

func (k *K3s) runPipelines(phase string, pipelines ...func() error) error {
Expand Down
30 changes: 26 additions & 4 deletions pkg/runtime/k3s/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ import (
"context"
"fmt"

"github.com/labring/sealos/pkg/utils/iputils"

"github.com/labring/sealos/pkg/utils/strings"

"golang.org/x/exp/slices"

"github.com/labring/sealos/pkg/utils/logger"
Expand Down Expand Up @@ -48,15 +52,33 @@ func (k *K3s) resetNode(host string) error {
}
if slices.Contains(k.cluster.GetNodeIPList(), host) {
vipAndPort := fmt.Sprintf("%s:%d", k.cluster.GetVIP(), k.config.APIServerPort)
ipvscleanErr := k.remoteUtil.IPVSClean(host, vipAndPort)
if ipvscleanErr != nil {
logger.Error("failed to clean node route and ipvs failed, %v", ipvscleanErr)
ipvsclearErr := k.remoteUtil.IPVSClean(host, vipAndPort)
if ipvsclearErr != nil {
logger.Error("failed to clear ipvs rules for node %s: %v", host, ipvsclearErr)
}
}
return nil
}

// TODO: remove from API
func (k *K3s) deleteNode(_ string) error {
func (k *K3s) deleteNode(node string) error {
//remove master
masterIPs := strings.RemoveFromSlice(k.cluster.GetMasterIPList(), node)
if len(masterIPs) > 0 {
// TODO: do we need draining first?
if err := k.removeNode(node); err != nil {
logger.Warn(fmt.Errorf("delete nodes %s failed %v", node, err))
}
}
return nil
}

func (k *K3s) removeNode(ip string) error {
logger.Info("start to remove node from k3s %s", ip)
nodeName, err := k.execer.CmdToString(k.cluster.GetMaster0IPAndPort(), fmt.Sprintf("kubectl get nodes -o wide | awk '$6==\"%s\" {print $1}'", iputils.GetHostIP(ip)), "")
if err != nil {
return fmt.Errorf("cannot get node with ip address %s: %v", ip, err)
}
logger.Debug("found node name is %s, we will delete it", nodeName)
return k.execer.CmdAsync(k.cluster.GetMaster0IPAndPort(), fmt.Sprintf("kubectl delete node %s --ignore-not-found=true", nodeName))
}
2 changes: 1 addition & 1 deletion pkg/runtime/kubernetes/runtime_getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (k *KubeadmRuntime) syncNodeIPVSYaml(masterIPs, nodesIPs []string) error {

func (k *KubeadmRuntime) execIPVSPod(ip string, masters []string) error {
image := k.cluster.GetLvscareImage()
return k.remoteUtil.StaticPod(ip, k.getVipAndPort(), constants.LvsCareStaticPodName, image, masters)
return k.remoteUtil.StaticPod(ip, k.getVipAndPort(), constants.LvsCareStaticPodName, image, masters, kubernetesEtcStaticPod)
}

func (k *KubeadmRuntime) execToken(ip, certificateKey string) (string, error) {
Expand Down
6 changes: 4 additions & 2 deletions pkg/ssh/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,15 @@ func (s *Remote) IPVSClean(ip, vip string) error {
return s.executeRemoteUtilSubcommand(ip, out)
}

func (s *Remote) StaticPod(ip, vip, name, image string, masters []string) error {
staticPodIPVSTemplate := `static-pod lvscare --name {{.name}} --vip {{.vip}} --image {{.image}} {{range $h := .masters}} --masters {{$h}}{{end}}`
func (s *Remote) StaticPod(ip, vip, name, image string, masters []string, path string, options ...string) error {
staticPodIPVSTemplate := `static-pod lvscare --path {{.path}} --name {{.name}} --vip {{.vip}} --image {{.image}} {{range $h := .masters}} --masters {{$h}} {{end}} {{range $o := .options}} --options {{$o}} {{end}}`
data := map[string]interface{}{
"vip": vip,
"image": image,
"masters": masters,
"name": name,
"path": path,
"options": options,
}
out, err := template.RenderTemplate("lvscare", staticPodIPVSTemplate, data)
if err != nil {
Expand Down
34 changes: 12 additions & 22 deletions pkg/template/template_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,22 @@
package template

import (
"bytes"
"fmt"
"testing"
)

func TestTemplateSemverCompare(t *testing.T) {
v, b, e := TryParse(`
version: {{if (semverCompare "^1.26.0" (default "" .ENV)) }}v1{{ else }}v1alpha2{{ end }}
`)
if e != nil {
t.Errorf("parse err: %v", e)
staticPodIPVSTemplate := `static-pod lvscare --path {{.path}} --name {{.name}} --vip {{.vip}} --image {{.image}} {{range $h := .masters}} --masters {{$h}} {{end}} {{range $o := .options}} --options {{$o}} {{end}} `
data := map[string]interface{}{
"vip": "127.0.0.1",
"image": "test",
"masters": []string{"127.0.0.2"},
"name": "lvscare",
"path": "/etc/kubernetes",
"options": nil,
}
if !b {
t.Errorf("parse failed: %v", b)
out, err := RenderTemplate("lvscare", staticPodIPVSTemplate, data)
if err != nil {
t.Errorf("%+v", err)
}

out := bytes.NewBuffer(nil)
execErr := v.Execute(out, map[string]interface{}{
// comment out this to test true return
// "ENV": "v1.26.1",
// comment out this to test false return
"ENV": "v1.25.10",
})
if execErr != nil {
t.Errorf("template exec err: %v", execErr)
}

fmt.Println(out)
t.Log(out)
}
2 changes: 1 addition & 1 deletion test/e2e/k3s_basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ var _ = Describe("E2E_sealos_k3s_basic_test", func() {
BeforeEach(func() {
By("build rootfs")
dFile := config.RootfsDockerfile{
BaseImage: "labring/k3s:latest",
BaseImage: "labring/k3s:v1.25-latest",
Copys: []string{"sealctl opt/"},
}
tmpdir, err := dFile.Write()
Expand Down

0 comments on commit dd906fd

Please sign in to comment.