Skip to content

Commit

Permalink
airflow 2.x generic pipeline mods. See also #3166. Changed processor …
Browse files Browse the repository at this point in the history
…airflow and jinja2 template airflow to comply with Airflow 2.x. Added new location of KubernetesPodOperator library in Airflow 2.x to test Pipeline for processor airflow. Added cpu and memory limits fields in airflow 2.x fashion as well.

Signed-off-by: Sven Thoms <[email protected]>
  • Loading branch information
shalberd committed Jan 12, 2024
1 parent e90d364 commit da12cc2
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 31 deletions.
25 changes: 20 additions & 5 deletions elyra/pipeline/airflow/processor_airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ def _cc_pipeline(self, pipeline: Pipeline, pipeline_name: str, pipeline_instance
"cpu_limit": operation.cpu_limit,
"memory_limit": operation.memory_limit,
"gpu_limit": operation.gpu,
"gpu_vendor": operation.gpu_vendor,
"operator_source": operation.filename,
}

Expand Down Expand Up @@ -598,13 +599,23 @@ def render_volumes(self, elyra_properties: Dict[str, ElyraProperty]) -> str:
str_to_render = ""
for v in elyra_properties.get(pipeline_constants.MOUNTED_VOLUMES, []):
str_to_render += f"""
Volume(name="{v.pvc_name}", configs={{"persistentVolumeClaim": {{"claimName": "{v.pvc_name}"}}}}),"""
k8s.V1Volume(
name="{v.pvc_name}",
persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(
claim_name="{v.pvc_name}",
),
),"""
# set custom shared memory size
shm = elyra_properties.get(pipeline_constants.KUBERNETES_SHARED_MEM_SIZE)
if shm is not None and shm.size:
config = f"""configs={{"emptyDir": {{"medium": "Memory", "sizeLimit": "{shm.size}{shm.units}"}}}}"""
str_to_render += f"""
Volume(name="shm", {config}),"""
k8s.V1Volume(
name="shm",
empty_dir=k8s.V1EmptyDirVolumeSource(
medium="Memory",
size_limit="{shm.size}{shm.units}",
),
),"""
return dedent(str_to_render)

def render_mounts(self, elyra_properties: Dict[str, ElyraProperty]) -> str:
Expand All @@ -615,8 +626,12 @@ def render_mounts(self, elyra_properties: Dict[str, ElyraProperty]) -> str:
str_to_render = ""
for v in elyra_properties.get(pipeline_constants.MOUNTED_VOLUMES, []):
str_to_render += f"""
VolumeMount(name="{v.pvc_name}", mount_path="{v.path}",
sub_path="{v.sub_path}", read_only={v.read_only}),"""
k8s.V1VolumeMount(
name="{v.pvc_name}",
mount_path="{v.path}",
sub_path="{v.sub_path}",
read_only={v.read_only},
),"""
return dedent(str_to_render)

def render_secrets(self, elyra_properties: Dict[str, ElyraProperty], cos_secret: Optional[str]) -> str:
Expand Down
53 changes: 28 additions & 25 deletions elyra/templates/airflow/airflow_template.jinja2
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
from airflow import DAG
from airflow.utils.dates import days_ago
import pendulum

args = {
'project_id' : '{{ pipeline_name }}',
}

dag = DAG(
'{{ pipeline_name }}',
dag_id='{{ pipeline_name }}',
default_args=args,
schedule_interval='@once',
start_date=days_ago(1),
schedule='@once',
start_date=pendulum.today('UTC').add(days=-1),
description="""
{{ pipeline_description|replace("\"\"\"", "\\\"\\\"\\\"") }}
""",
Expand All @@ -23,9 +23,8 @@ dag = DAG(
{% endfor %}
{% else %}
from airflow.kubernetes.secret import Secret
from airflow.contrib.kubernetes.volume import Volume
from airflow.contrib.kubernetes.volume_mount import VolumeMount
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from kubernetes.client import models as k8s
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
{% endif %}

{% if operation.operator_source %}# Operator source: {{ operation.operator_source }}{% endif %}
Expand All @@ -48,23 +47,27 @@ op_{{ operation.id|regex_replace }} = KubernetesPodOperator(name='{{ operation.n
task_id='{{ operation.notebook|regex_replace }}',
env_vars={{ operation.pipeline_envs }},
{% if operation.cpu_request or operation.mem_request or operation.cpu_limit or operation.memory_limit or operation.gpu_limit %}
resources = {
{% if operation.cpu_request %}
'request_cpu': '{{ operation.cpu_request }}',
{% endif %}
{% if operation.mem_request %}
'request_memory': '{{ operation.mem_request }}G',
{% endif %}
{% if operation.cpu_limit %}
'limit_cpu': '{{ operation.cpu_limit }}',
{% endif %}
{% if operation.memory_limit %}
'limit_memory': '{{ operation.memory_limit }}G',
{% endif %}
{% if operation.gpu_limit %}
'limit_gpu': '{{ operation.gpu_limit }}',
{% endif %}
},
container_resources=k8s.V1ResourceRequirements(
requests={
{% if operation.cpu_request %}
'cpu': '{{ operation.cpu_request }}',
{% endif %}
{% if operation.mem_request %}
'memory': '{{ operation.mem_request }}G',
{% endif %}
},
limits={
{% if operation.cpu_limit %}
'cpu': '{{ operation.cpu_limit }}',
{% endif %}
{% if operation.memory_limit %}
'memory': '{{ operation.memory_limit }}G',
{% endif %}
{% if operation.gpu_limit %}
'{{ operation.gpu_vendor }}': '{{ operation.gpu_limit }}',
{% endif %}
}
),
{% endif %}
volumes=[{{ processor.render_volumes(operation.elyra_props) }}],
volume_mounts=[{{ processor.render_mounts(operation.elyra_props) }}],
Expand All @@ -73,7 +76,7 @@ op_{{ operation.id|regex_replace }} = KubernetesPodOperator(name='{{ operation.n
labels={{ processor.render_labels(operation.elyra_props) }},
tolerations=[{{ processor.render_tolerations(operation.elyra_props) }}],
in_cluster={{ in_cluster }},
config_file="{{ kube_config_path }}",
config_file={% if kube_config_path is string %}"{{ kube_config_path }}"{% else %}{{ kube_config_path }}{% endif %},
{% endif %}
dag=dag)
{% if operation.image_pull_policy %}
Expand Down
2 changes: 1 addition & 1 deletion elyra/tests/pipeline/airflow/test_processor_airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ def test_create_file(monkeypatch, processor, parsed_pipeline, parsed_ordered_dic
with open(response) as f:
file_as_lines = f.read().splitlines()

assert "from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator" in file_as_lines
assert "from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator" in file_as_lines

# Check DAG project name
for i in range(len(file_as_lines)):
Expand Down

0 comments on commit da12cc2

Please sign in to comment.