
Add a new ExternalAPITaskSensor to monitor external DAGs via Airflow REST API #39463

Open
cherrera20 opened this issue May 7, 2024 · 6 comments
Labels: kind:feature (Feature Requests), needs-triage (label for new issues that we didn't triage yet), pending-response

@cherrera20
Description

I'm proposing to add a new ExternalAPITaskSensor to the Apache Airflow project. This sensor leverages the Airflow REST API to monitor the status of an external DAG or task. It can be used to check the completion of another DAG or task before proceeding with the execution of the current task, providing better orchestration across DAGs.

Use case/motivation

The new sensor provides the ability to monitor the status of external DAGs using the Airflow REST API, which is particularly useful in a distributed environment where multiple Airflow instances are running.

Related issues

No response

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!


@cherrera20 added the kind:feature and needs-triage labels on May 7, 2024

boring-cyborg bot commented May 7, 2024

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

@RNHTTR
Collaborator

RNHTTR commented May 7, 2024

Could you use the HTTP Sensor?

@rawwar removed the needs-triage label on May 8, 2024
@cherrera20
Author

cherrera20 commented May 8, 2024

Hi there! Thanks for your response and please correct me if I'm wrong about the HttpSensor implementation.

The idea behind the ExternalAPITaskSensor is to address specific needs that go beyond what the HttpSensor can offer. While the HttpSensor provides a basic mechanism to poll an API endpoint, it has no notion of external tasks and DAGs as Airflow concepts.

The ExternalAPITaskSensor would retain the context of external tasks and DAGs, providing a seamless way to monitor specific DAG runs or tasks in another Airflow instance via the API. It would leverage attributes like external_dag_id, execution_date, allowed_states, skipped_states, and failed_states, which are unique to external DAGs and tasks.

Here's why I think this sensor is necessary:

  • State Awareness and Exception Management: It would handle different states (like success, skipped, and failed) and raise exceptions (e.g., AirflowFailException, AirflowSkipException) when a specific state is reached. This ensures that task dependencies are managed correctly.

  • Advanced Error Handling: It would differentiate between transient network errors (retries with a threshold) and API-related issues, providing more robust error handling.

  • Deferrable Execution: The sensor would be able to defer execution to a custom WorkflowTrigger.

In summary, while the HttpSensor is great for basic API polling, it lacks the features needed to monitor external tasks and DAGs properly. The ExternalAPITaskSensor would encapsulate this logic, making it a valuable tool for workflows that rely on task dependencies across multiple DAGs or external Airflow instances (the same concept as the ExternalTaskSensor).

I hope this explanation makes the purpose clear. Let me know if you have any questions or suggestions!

@rawwar added the needs-triage label and removed the pending-response label on May 8, 2024
@RNHTTR
Collaborator

RNHTTR commented May 8, 2024

> State Awareness and Exception Management: It would handle different states (like success, skipped, and failed) and raise exceptions (e.g., AirflowFailException, AirflowSkipException) when a specific state is reached. This ensures that task dependencies are managed correctly.

Can't that be achieved with the HttpSensor's response_check parameter? It lets you pass the HttpSensor a function that evaluates the response for some condition.

So, if you query the taskInstances endpoint of the Airflow REST API, you can configure your response_check accordingly.

You could then use existing TI states to determine how to respond to a given state.
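To make that concrete, here is a minimal sketch of a response_check-style callable. All names here (check_external_task, the state sets, the default task_id) are illustrative, not an existing API; the function assumes the response body follows the taskInstances payload shape of the stable REST API, and it raises a plain RuntimeError where a real sensor would use an Airflow exception:

```python
# Illustrative sketch of a response_check callable for HttpSensor.
# Assumes the response is from /dags/{dag_id}/dagRuns/{run_id}/taskInstances.
ALLOWED_STATES = {"success"}
FAILED_STATES = {"failed", "upstream_failed"}

def check_external_task(response, task_id="task_a"):
    """Return True once the watched task reaches an allowed state,
    False to keep poking, and raise when it has failed."""
    payload = response.json()
    for ti in payload.get("task_instances", []):
        if ti.get("task_id") != task_id:
            continue
        state = ti.get("state")
        if state in FAILED_STATES:
            # A real sensor would raise AirflowFailException here.
            raise RuntimeError(f"external task {task_id!r} is in state {state!r}")
        return state in ALLOWED_STATES
    return False  # task instance not visible yet; keep poking
```

In a DAG this would be passed as `response_check=check_external_task` to an HttpSensor pointed at that endpoint.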

> Advanced Error Handling: It would differentiate between transient network errors (retries with a threshold) and API-related issues, providing more robust error handling.

I believe this can also be done with HttpSensor parameters (for example tcp_keep_alive_count) and normal Airflow retries.

> Deferrable Execution: The sensor would be able to defer execution to a custom WorkflowTrigger.

The HTTPSensor already supports deferrable execution.

@Taragolis
Contributor

IMHO, technically it could be added as a separate provider with access through apache-airflow-client, but like every new provider it should come through the Accepting new community providers process. So I would recommend starting a discussion on the Dev List.


@cherrera20
Author

cherrera20 commented May 9, 2024

OK, thanks. Just to illustrate what the sensor would look like, here's a code snippet of a hypothetical call to it. If it makes sense to you, we can move on :-)

```python
wait_for_single_task = ExternalAPITaskSensor(
    task_id="wait_for_single_task",
    external_dag_id="secondary_dag",
    external_task_id="task_a",
    allowed_states=["success"],
    failed_states=["failed"],
    skipped_states=["skipped"],
    http_conn_id="http_default",
    execution_date="{{ execution_date }}",
    deferrable=True,
)

wait_for_multiple_tasks = ExternalAPITaskSensor(
    task_id="wait_for_multiple_tasks",
    external_dag_id="secondary_dag",
    external_task_ids=["task_b", "task_c"],
    allowed_states=["success"],
    failed_states=["failed"],
    skipped_states=["skipped"],
    http_conn_id="http_default",
    execution_date="{{ execution_date }}",
    deferrable=True,
)

wait_for_task_group = ExternalAPITaskSensor(
    task_id="wait_for_task_group",
    external_dag_id="secondary_dag",
    external_task_group_id="group_1",
    allowed_states=["success"],
    failed_states=["failed"],
    skipped_states=["skipped"],
    http_conn_id="http_default",
    execution_date="{{ execution_date }}",
    deferrable=True,
)
```

I'm not sure this can be addressed with the HttpSensor, because we need more than one API call: /api/v1/dags/{self.external_dag_id}/dagRuns to find the runs matching the given states, and /api/v1/dags/{self.external_dag_id}/dagRuns/{dag_run_id}/taskInstances to monitor the external task IDs.
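A rough sketch of that two-call flow, to show why a single endpoint poll isn't enough. Everything here is hypothetical: poke_external_tasks is an invented name, and get_json stands in for an authenticated HTTP client against the external Airflow instance; the payload field names (dag_runs, dag_run_id, task_instances) follow the stable REST API, but this is an illustration, not a working sensor:

```python
def poke_external_tasks(get_json, external_dag_id, external_task_ids,
                        allowed_states=("success",),
                        failed_states=("failed",)):
    """Two-call polling sketch. `get_json(path)` is any callable that
    performs an authenticated GET and returns the decoded JSON body.
    Returns True when every watched task is in an allowed state,
    False to keep poking, and raises if any of them has failed."""
    # Call 1: find the DAG run to watch (here simply the latest one).
    runs = get_json(f"/api/v1/dags/{external_dag_id}/dagRuns")
    dag_runs = runs.get("dag_runs", [])
    if not dag_runs:
        return False  # no run yet; keep poking
    dag_run_id = dag_runs[-1]["dag_run_id"]

    # Call 2: inspect the task instances of that run.
    tis = get_json(
        f"/api/v1/dags/{external_dag_id}/dagRuns/{dag_run_id}/taskInstances"
    )
    states = {ti["task_id"]: ti.get("state")
              for ti in tis.get("task_instances", [])}

    for task_id in external_task_ids:
        state = states.get(task_id)
        if state in failed_states:
            # A real sensor would raise AirflowFailException here.
            raise RuntimeError(f"task {task_id!r} failed in {external_dag_id!r}")
        if state not in allowed_states:
            return False  # still running or not scheduled; keep poking
    return True
```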

I look forward to your response. Thank you all. @Taragolis @rawwar @RNHTTR
