diff --git a/pkg/apis/acid.zalan.do/v1/postgresql_type.go b/pkg/apis/acid.zalan.do/v1/postgresql_type.go index edce944f1..7b80c4ad2 100644 --- a/pkg/apis/acid.zalan.do/v1/postgresql_type.go +++ b/pkg/apis/acid.zalan.do/v1/postgresql_type.go @@ -91,6 +91,8 @@ type PostgresSpec struct { // deprecated json tags InitContainersOld []v1.Container `json:"init_containers,omitempty"` PodPriorityClassNameOld string `json:"pod_priority_class_name,omitempty"` + + TopologySpreadConstraints []v1.TopologySpreadConstraint `json:"topologySpreadConstraints,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index b00d4b6b7..5495e9e53 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -453,6 +453,11 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa needsRollUpdate = true reasons = append(reasons, "new statefulset's pod affinity does not match the current one") } + if !reflect.DeepEqual(c.Statefulset.Spec.Template.Spec.TopologySpreadConstraints, statefulSet.Spec.Template.Spec.TopologySpreadConstraints) { + needsReplace = true + needsRollUpdate = true + reasons = append(reasons, "new statefulset's pod topologySpreadConstraints does not match the current one") + } if len(c.Statefulset.Spec.Template.Spec.Tolerations) != len(statefulSet.Spec.Template.Spec.Tolerations) { needsReplace = true needsRollUpdate = true diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index b60c4ecb3..a1b6490ec 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -578,6 +578,36 @@ func generatePodAntiAffinity(podAffinityTerm v1.PodAffinityTerm, preferredDuring return podAntiAffinity } +func generateTopologySpreadConstraints(labels labels.Set, topologySpreadConstraintObjs []v1.TopologySpreadConstraint) []v1.TopologySpreadConstraint { + var topologySpreadConstraints []v1.TopologySpreadConstraint + var nodeAffinityPolicy *v1.NodeInclusionPolicy + var nodeTaintsPolicy *v1.NodeInclusionPolicy + for _, topologySpreadConstraintObj := range topologySpreadConstraintObjs { + if topologySpreadConstraintObj.NodeAffinityPolicy != nil { + nodeAffinityPolicy = (*v1.NodeInclusionPolicy)(topologySpreadConstraintObj.NodeAffinityPolicy) + } + if topologySpreadConstraintObj.NodeTaintsPolicy != nil { + nodeTaintsPolicy = (*v1.NodeInclusionPolicy)(topologySpreadConstraintObj.NodeTaintsPolicy) + } + topologySpreadConstraint := v1.TopologySpreadConstraint{ + MaxSkew: topologySpreadConstraintObj.MaxSkew, + TopologyKey: topologySpreadConstraintObj.TopologyKey, + WhenUnsatisfiable: v1.UnsatisfiableConstraintAction(topologySpreadConstraintObj.WhenUnsatisfiable), + LabelSelector: &metav1.LabelSelector{ + MatchLabels: labels, + }, + MinDomains: topologySpreadConstraintObj.MinDomains, + NodeAffinityPolicy: nodeAffinityPolicy, + NodeTaintsPolicy: nodeTaintsPolicy, + MatchLabelKeys: topologySpreadConstraintObj.MatchLabelKeys, + } + topologySpreadConstraints = append(topologySpreadConstraints, topologySpreadConstraint) + nodeAffinityPolicy = nil + nodeTaintsPolicy = nil + } + return topologySpreadConstraints +} + func tolerations(tolerationsSpec *[]v1.Toleration, podToleration map[string]string) []v1.Toleration { // allow to override tolerations by postgresql manifest if len(*tolerationsSpec) > 0 { @@ -791,6 +821,7 @@ func (c *Cluster) generatePodTemplate( podAntiAffinity bool, podAntiAffinityTopologyKey string, podAntiAffinityPreferredDuringScheduling bool, + topologySpreadConstraints []v1.TopologySpreadConstraint, additionalSecretMount string, additionalSecretMountPath string, additionalVolumes []acidv1.AdditionalVolume, @@ -846,6 +877,10 @@ func (c *Cluster) generatePodTemplate( podSpec.PriorityClassName = priorityClassName } + if len(topologySpreadConstraints) > 0 { + podSpec.TopologySpreadConstraints = generateTopologySpreadConstraints(labels, topologySpreadConstraints) + } + if sharePgSocketWithSidecars != nil && *sharePgSocketWithSidecars { addVarRunVolume(&podSpec) } @@ -1447,6 +1482,7 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef c.OpConfig.EnablePodAntiAffinity, c.OpConfig.PodAntiAffinityTopologyKey, c.OpConfig.PodAntiAffinityPreferredDuringScheduling, + spec.TopologySpreadConstraints, c.OpConfig.AdditionalSecretMount, c.OpConfig.AdditionalSecretMountPath, additionalVolumes) @@ -2282,6 +2318,7 @@ func (c *Cluster) generateLogicalBackupJob() (*batchv1.CronJob, error) { false, "", false, + []v1.TopologySpreadConstraint{}, c.OpConfig.AdditionalSecretMount, c.OpConfig.AdditionalSecretMountPath, []acidv1.AdditionalVolume{}); err != nil {