Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Kubernetes resource limits & security context configs #1612

Open
wants to merge 35 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
65c5f8a
wip
rolf-moz Oct 3, 2023
a3fd818
fixes for cli
rolf-moz Oct 3, 2023
94c6947
fixes
rolf-moz Oct 3, 2023
da0bec9
fixes
rolf-moz Oct 3, 2023
8aa2878
cleanup
rolf-moz Oct 3, 2023
95840fa
json param
rolf-moz Oct 3, 2023
08ed550
fix
rolf-moz Oct 3, 2023
c65684d
use json parser for config
rolf-moz Oct 3, 2023
6349403
don't reparse json
rolf-moz Oct 3, 2023
133ea7e
hyphen
rolf-moz Oct 3, 2023
66d2346
revert json
rolf-moz Oct 3, 2023
9ca28ed
fix hanging ,
rolf-moz Oct 3, 2023
6046586
fixes
rolf-moz Oct 3, 2023
33ab6a4
import fixes
rolf-moz Oct 3, 2023
a9705cf
Merge branch 'Netflix:master' into metaflow-kube-security-settings
rolf-moz Oct 5, 2023
9824258
restored gpu support
rolf-moz Oct 10, 2023
21062d7
Converted resource_limits to separate fields
rolf-moz Oct 24, 2023
ca50e08
fix
rolf-moz Oct 24, 2023
6f8ef40
update
rolf-moz Oct 24, 2023
ca92120
click args
rolf-moz Oct 24, 2023
4e58664
cleanup
rolf-moz Oct 24, 2023
a8672e6
added some logging
rolf-moz Oct 24, 2023
91b7301
not sure why this workaround is needed
rolf-moz Oct 24, 2023
36f8f05
restore
rolf-moz Oct 24, 2023
b0f7994
fix
rolf-moz Oct 24, 2023
5af6fc4
removed logs
rolf-moz Oct 24, 2023
07591fd
add default
rolf-moz Oct 24, 2023
779adcc
fixed root cause
rolf-moz Oct 24, 2023
94e59f8
lint
rolf-moz Oct 24, 2023
ab34734
refactor for argo
rolf-moz Oct 26, 2023
79c1c00
fixes
rolf-moz Oct 26, 2023
73e6fcf
cleanup
rolf-moz Oct 26, 2023
0e4a972
moved misplaced out volume from kube to argo
rolf-moz Oct 27, 2023
f72d0fd
Merge remote-tracking branch 'upstream/master' into metaflow-resource…
rolf-moz Nov 24, 2023
40ed8c0
cleanup unused imports
rolf-moz Nov 24, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions metaflow/metaflow_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,9 @@
KUBERNETES_PERSISTENT_VOLUME_CLAIMS = from_conf(
"KUBERNETES_PERSISTENT_VOLUME_CLAIMS", ""
)
KUBERNETES_SECURITY_CONTEXT = from_conf("KUBERNETES_SECURITY_CONTEXT", {})
KUBERNETES_RESOURCE_LIMITS_MEMORY = from_conf("KUBERNETES_RESOURCE_LIMITS_MEMORY", None)
KUBERNETES_RESOURCE_LIMITS_CPU = from_conf("KUBERNETES_RESOURCE_LIMITS_CPU", None)
KUBERNETES_SECRETS = from_conf("KUBERNETES_SECRETS", "")
# Default labels for kubernetes pods
KUBERNETES_LABELS = from_conf("KUBERNETES_LABELS", "")
Expand Down
104 changes: 15 additions & 89 deletions metaflow/plugins/argo/argo_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
)

from .argo_client import ArgoClient
from ..kubernetes.kuberenetes_utils import make_kubernetes_container


class ArgoWorkflowsException(MetaflowException):
Expand Down Expand Up @@ -1428,96 +1429,21 @@ def _container_templates(self):
# resources attributes in particular where the keys maybe user
# defined.
to_camelcase(
kubernetes_sdk.V1Container(
name=self._sanitize(node.name),
command=cmds,
env=[
kubernetes_sdk.V1EnvVar(name=k, value=str(v))
for k, v in env.items()
]
# Add environment variables for book-keeping.
# https://argoproj.github.io/argo-workflows/fields/#fields_155
+ [
kubernetes_sdk.V1EnvVar(
name=k,
value_from=kubernetes_sdk.V1EnvVarSource(
field_ref=kubernetes_sdk.V1ObjectFieldSelector(
field_path=str(v)
)
),
)
for k, v in {
"METAFLOW_KUBERNETES_POD_NAMESPACE": "metadata.namespace",
"METAFLOW_KUBERNETES_POD_NAME": "metadata.name",
"METAFLOW_KUBERNETES_POD_ID": "metadata.uid",
"METAFLOW_KUBERNETES_SERVICE_ACCOUNT_NAME": "spec.serviceAccountName",
"METAFLOW_KUBERNETES_NODE_IP": "status.hostIP",
}.items()
],
image=resources["image"],
image_pull_policy=resources["image_pull_policy"],
resources=kubernetes_sdk.V1ResourceRequirements(
requests={
"cpu": str(resources["cpu"]),
"memory": "%sM" % str(resources["memory"]),
"ephemeral-storage": "%sM" % str(resources["disk"]),
},
limits={
"%s.com/gpu".lower()
% resources["gpu_vendor"]: str(resources["gpu"])
for k in [0]
if resources["gpu"] is not None
},
),
# Configure secrets
env_from=[
kubernetes_sdk.V1EnvFromSource(
secret_ref=kubernetes_sdk.V1SecretEnvSource(
name=str(k),
# optional=True
)
)
for k in list(
[]
if not resources.get("secrets")
else [resources.get("secrets")]
if isinstance(resources.get("secrets"), str)
else resources.get("secrets")
)
+ KUBERNETES_SECRETS.split(",")
+ ARGO_WORKFLOWS_KUBERNETES_SECRETS.split(",")
if k
],
volume_mounts=[
make_kubernetes_container(
kubernetes_sdk,
self._sanitize(node.name),
cmds,
{
**resources,
# Assign a volume mount to pass state to the next task.
kubernetes_sdk.V1VolumeMount(
name="out", mount_path="/mnt/out"
)
]
# Support tmpfs.
+ (
[
kubernetes_sdk.V1VolumeMount(
name="tmpfs-ephemeral-volume",
mount_path=tmpfs_path,
)
]
if tmpfs_enabled
else []
)
# Support persistent volume claims.
+ (
[
kubernetes_sdk.V1VolumeMount(
name=claim, mount_path=path
)
for claim, path in resources.get(
"persistent_volume_claims"
).items()
]
if resources.get("persistent_volume_claims") is not None
else []
),
"persistent_volume_claims": {
**(resources.get("persistent_volume_claims") or {}),
"out": "/mnt/out",
},
},
env,
additional_secrets=KUBERNETES_SECRETS.split(",")
+ ARGO_WORKFLOWS_KUBERNETES_SECRETS.split(","),
).to_dict()
)
)
Expand Down
120 changes: 120 additions & 0 deletions metaflow/plugins/kubernetes/kuberenetes_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import json
from metaflow.tracing import inject_tracing_vars


def compute_resource_limits(args):
    """Build the ``limits`` dict for a Kubernetes ``V1ResourceRequirements``.

    Reads the optional keys ``resource_limits_memory`` (megabytes),
    ``resource_limits_cpu`` and ``gpu``/``gpu_vendor`` from *args* and maps
    them to the corresponding Kubernetes resource-limit names. Unset keys
    (missing, ``None`` or falsy) are omitted so Kubernetes applies no limit
    for that resource.
    """
    limits_dict = dict()
    if args.get("resource_limits_memory"):
        # Kubernetes expects a quantity suffix; metaflow memory is in MB.
        limits_dict["memory"] = "%sM" % str(args["resource_limits_memory"])
    if args.get("resource_limits_cpu"):
        # str() for consistency with the other limit values.
        limits_dict["cpu"] = str(args["resource_limits_cpu"])
    if args.get("gpu") is not None:
        # Lowercase the formatted string so an uppercase vendor (e.g.
        # "NVIDIA") yields "nvidia.com/gpu". The previous
        # `"%s.com/gpu".lower() % vendor` applied lower() to the
        # already-lowercase template, leaving the vendor untouched.
        limits_dict[("%s.com/gpu" % args["gpu_vendor"]).lower()] = str(args["gpu"])
    return limits_dict


def get_list_from_untyped(possible_list):
    """Normalize *possible_list* into a list.

    Falsy values (``None``, ``""``, ``[]``, ...) become ``[]``; a plain
    string becomes a one-element list; any other value is returned as-is.
    """
    if not possible_list:
        return []
    if isinstance(possible_list, str):
        return [possible_list]
    return possible_list


def compute_tempfs_enabled(args):
    """Return ``True`` when a tmpfs volume should be mounted for the task.

    tmpfs is enabled either explicitly (``use_tmpfs``) or implicitly by
    requesting a size (``tmpfs_size``). The original expression
    ``use_tmpfs or (tmpfs_size and not use_tmpfs)`` reduces to this simple
    disjunction; ``bool()`` normalizes the result so callers always get a
    proper boolean instead of ``None`` or a raw size value.
    """
    return bool(args["use_tmpfs"] or args["tmpfs_size"])


def make_kubernetes_container(
    client, name, commands, args, envs, additional_secrets=None
):
    """Construct the ``V1Container`` shared by the Kubernetes and Argo paths.

    Parameters
    ----------
    client :
        The ``kubernetes.client`` module (passed in so callers control which
        SDK instance is used).
    name : str
        Container name.
    commands : list
        Command strings for the container entrypoint.
    args : dict
        Task attributes: image, cpu/memory/disk requests, gpu settings,
        ``security_context``, secrets, tmpfs and persistent-volume-claim
        settings.
    envs : dict
        Environment variables to set on the container.
    additional_secrets : list, optional
        Extra secret names (beyond ``args["secrets"]``) exposed to the
        container via ``envFrom``.
    """
    from kubernetes.client import V1SecurityContext, ApiClient

    # Avoid the mutable-default-argument pitfall; use a fresh list per call.
    if additional_secrets is None:
        additional_secrets = []

    class KubernetesClientDataObj(object):
        # Minimal stand-in for an HTTP response object: ApiClient.deserialize
        # only reads ``.data``, so this lets us reuse the SDK's JSON -> model
        # deserializer for user-supplied dicts (here, the security context).
        def __init__(self, data_dict, class_name):
            self.data = json.dumps(data_dict) if data_dict is not None else None
            self.class_name = class_name

        def get_deserialized_object(self):
            if self.data is not None:
                return ApiClient().deserialize(self, self.class_name)
            else:
                return None

    security_context = KubernetesClientDataObj(
        args["security_context"], V1SecurityContext
    ).get_deserialized_object()

    # tmpfs variables
    tmpfs_enabled = compute_tempfs_enabled(args)

    return client.V1Container(
        name=name,
        command=commands,
        env=[client.V1EnvVar(name=k, value=str(v)) for k, v in envs.items()]
        # And some downward API magic. Add (key, value)
        # pairs below to make pod metadata available
        # within Kubernetes container.
        + [
            client.V1EnvVar(
                name=k,
                value_from=client.V1EnvVarSource(
                    field_ref=client.V1ObjectFieldSelector(field_path=str(v))
                ),
            )
            for k, v in {
                "METAFLOW_KUBERNETES_POD_NAMESPACE": "metadata.namespace",
                "METAFLOW_KUBERNETES_POD_NAME": "metadata.name",
                "METAFLOW_KUBERNETES_POD_ID": "metadata.uid",
                "METAFLOW_KUBERNETES_SERVICE_ACCOUNT_NAME": "spec.serviceAccountName",
                "METAFLOW_KUBERNETES_NODE_IP": "status.hostIP",
            }.items()
        ]
        # Propagate tracing context (if any) into the container.
        + [
            client.V1EnvVar(name=k, value=str(v))
            for k, v in inject_tracing_vars({}).items()
        ],
        # Expose each configured secret's keys as environment variables.
        env_from=[
            client.V1EnvFromSource(
                secret_ref=client.V1SecretEnvSource(
                    name=str(k),
                    # optional=True
                )
            )
            for k in get_list_from_untyped(args.get("secrets")) + additional_secrets
            if k
        ],
        image=args["image"],
        security_context=security_context,
        image_pull_policy=args["image_pull_policy"],
        resources=client.V1ResourceRequirements(
            requests={
                "cpu": str(args["cpu"]),
                "memory": "%sM" % str(args["memory"]),
                "ephemeral-storage": "%sM" % str(args["disk"]),
            },
            limits=compute_resource_limits(args),
        ),
        volume_mounts=(
            # Support tmpfs.
            [
                client.V1VolumeMount(
                    mount_path=args.get("tmpfs_path"),
                    name="tmpfs-ephemeral-volume",
                )
            ]
            if tmpfs_enabled
            else []
        )
        # Support persistent volume claims.
        + (
            [
                client.V1VolumeMount(mount_path=path, name=claim)
                for claim, path in args["persistent_volume_claims"].items()
            ]
            if args["persistent_volume_claims"] is not None
            else []
        ),
    )
6 changes: 6 additions & 0 deletions metaflow/plugins/kubernetes/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ def create_job(
persistent_volume_claims=None,
tolerations=None,
labels=None,
security_context=None,
resource_limits_memory=None,
resource_limits_cpu=None,
):
if env is None:
env = {}
Expand Down Expand Up @@ -213,6 +216,9 @@ def create_job(
tmpfs_size=tmpfs_size,
tmpfs_path=tmpfs_path,
persistent_volume_claims=persistent_volume_claims,
security_context=security_context,
resource_limits_memory=resource_limits_memory,
resource_limits_cpu=resource_limits_cpu,
)
.environment_variable("METAFLOW_CODE_SHA", code_package_sha)
.environment_variable("METAFLOW_CODE_URL", code_package_url)
Expand Down
15 changes: 15 additions & 0 deletions metaflow/plugins/kubernetes/kubernetes_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,15 @@ def kubernetes():
@click.option("--memory", help="Memory requirement for Kubernetes pod.")
@click.option("--gpu", help="GPU requirement for Kubernetes pod.")
@click.option("--gpu-vendor", help="GPU vendor requirement for Kubernetes pod.")
@click.option("--resource-limits-memory", help="Memory limits for Kubernetes pod.")
@click.option("--resource-limits-cpu", help="CPU limits for Kubernetes pod.")
@click.option(
"--security-context",
default=None,
type=JSONTypeClass(),
multiple=False,
help="Security context Kubernetes pod.",
)
@click.option("--run-id", help="Passed to the top-level 'step'.")
@click.option("--task-id", help="Passed to the top-level 'step'.")
@click.option("--input-paths", help="Passed to the top-level 'step'.")
Expand Down Expand Up @@ -132,6 +141,9 @@ def step(
run_time_limit=None,
persistent_volume_claims=None,
tolerations=None,
security_context=None,
resource_limits_memory=None,
resource_limits_cpu=None,
**kwargs
):
def echo(msg, stream="stderr", job_id=None, **kwargs):
Expand Down Expand Up @@ -245,6 +257,9 @@ def _sync_metadata():
env=env,
persistent_volume_claims=persistent_volume_claims,
tolerations=tolerations,
security_context=security_context,
resource_limits_memory=resource_limits_memory,
resource_limits_cpu=resource_limits_cpu,
)
except Exception as e:
traceback.print_exc(chain=False)
Expand Down
26 changes: 25 additions & 1 deletion metaflow/plugins/kubernetes/kubernetes_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
KUBERNETES_PERSISTENT_VOLUME_CLAIMS,
KUBERNETES_TOLERATIONS,
KUBERNETES_SERVICE_ACCOUNT,
KUBERNETES_SECURITY_CONTEXT,
KUBERNETES_RESOURCE_LIMITS_MEMORY,
KUBERNETES_RESOURCE_LIMITS_CPU,
)
from metaflow.plugins.resources_decorator import ResourcesDecorator
from metaflow.plugins.timeout_decorator import get_run_time_limit_for_task
Expand Down Expand Up @@ -108,6 +111,9 @@ class KubernetesDecorator(StepDecorator):
"tmpfs_size": None,
"tmpfs_path": "/metaflow_temp",
"persistent_volume_claims": None, # e.g., {"pvc-name": "/mnt/vol", "another-pvc": "/mnt/vol2"}
"security_context": None,
"resource_limits_memory": None,
"resource_limits_cpu": None,
}
package_url = None
package_sha = None
Expand All @@ -122,6 +128,20 @@ def __init__(self, attributes=None, statically_defined=False):
self.attributes["service_account"] = KUBERNETES_SERVICE_ACCOUNT
if not self.attributes["gpu_vendor"]:
self.attributes["gpu_vendor"] = KUBERNETES_GPU_VENDOR
if not self.attributes["security_context"] and KUBERNETES_SECURITY_CONTEXT:
self.attributes["security_context"] = KUBERNETES_SECURITY_CONTEXT
if (
not self.attributes["resource_limits_memory"]
and KUBERNETES_RESOURCE_LIMITS_MEMORY
):
self.attributes[
"resource_limits_memory"
] = KUBERNETES_RESOURCE_LIMITS_MEMORY
if (
not self.attributes["resource_limits_cpu"]
and KUBERNETES_RESOURCE_LIMITS_CPU
):
self.attributes["resource_limits_cpu"] = KUBERNETES_RESOURCE_LIMITS_CPU
if not self.attributes["node_selector"] and KUBERNETES_NODE_SELECTOR:
self.attributes["node_selector"] = KUBERNETES_NODE_SELECTOR
if not self.attributes["tolerations"] and KUBERNETES_TOLERATIONS:
Expand Down Expand Up @@ -339,7 +359,11 @@ def runtime_step_cli(
"=".join([key, str(val)]) if val else key
for key, val in v.items()
]
elif k in ["tolerations", "persistent_volume_claims"]:
elif k in [
"tolerations",
"persistent_volume_claims",
"security_context",
]:
cli_args.command_options[k] = json.dumps(v)
else:
cli_args.command_options[k] = v
Expand Down