
DAGs are able to see historical dataset events when created new #39456

Open
2 tasks done
tosheer opened this issue May 7, 2024 · 3 comments · May be fixed by #39603
Labels: area:datasets, kind:bug

Comments


tosheer commented May 7, 2024

Apache Airflow version

2.9.1

If "Other Airflow 2 version" selected, which one?

No response

What happened?

If a new dataset-triggered DAG is created for a dataset that already exists (that is, the dataset already has dataset events), the DAG sees every dataset event for that dataset, starting from the very first one.

What you think should happen instead?

If a new dataset-triggered DAG is created for a dataset that already has dataset events, the DAG should only see the dataset events created after the DAG was added.
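The expected behavior amounts to a selection rule for each consumer run. The sketch below is a simplified model under stated assumptions, not Airflow's implementation: it assumes every event has a timestamp, that each run consumes events newer than the DAG's previous dataset-triggered run, and that when there is no previous run the lower bound falls back to the time the DAG was added. `Event`, `events_for_run`, and `dag_added_at` are hypothetical names, not Airflow APIs.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import List, Optional


@dataclass(frozen=True)
class Event:
    """Hypothetical stand-in for a dataset event (only the fields this sketch needs)."""

    id: int
    timestamp: datetime


def events_for_run(
    events: List[Event],
    run_time: datetime,
    previous_run_time: Optional[datetime],
    dag_added_at: datetime,
) -> List[Event]:
    """Select the events a consumer run should see under the expected behavior.

    Assumption: the lower bound is the previous dataset-triggered run when one
    exists; otherwise it is the time the DAG was added, so a brand-new DAG
    never sees events that predate it.
    """
    lower = previous_run_time if previous_run_time is not None else dag_added_at
    return [e for e in events if lower < e.timestamp <= run_time]


def ts(hour: int) -> datetime:
    """Helper: a UTC timestamp on 2024-05-09 at the given hour."""
    return datetime(2024, 5, 9, hour, tzinfo=timezone.utc)


# Three events exist before the consumer DAG is added at 04:00; one more arrives at 05:00.
all_events = [Event(1, ts(1)), Event(2, ts(2)), Event(3, ts(3)), Event(4, ts(5))]
first_run = events_for_run(all_events, run_time=ts(6), previous_run_time=None, dag_added_at=ts(4))
print([e.id for e in first_run])  # [4]
```

Under this rule, historical events 1 to 3 are excluded from the new DAG's first run. The behavior reported in this issue resembles applying no lower bound at all when there is no previous run, which in this model would return all four events.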

How to reproduce

  • Create a DAG that produces events for a dataset (a producer DAG) and enable it.
  • Run the producer DAG multiple times.
  • Create a new consumer DAG that is triggered by the same dataset, with catchup=False.
  • Enable the new consumer DAG.
  • The new DAG runs as soon as it is enabled, and it sees the very first event for the dataset.
  • In its next scheduled run, the DAG receives all of the events except the first one, which it has already consumed.

import pendulum

from airflow.datasets import Dataset
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator

# [START dataset_def]
tosheer_dag1_dataset = Dataset("s3://tosheer-dag1/output_1.txt", extra={"partition": "bye"})


with DAG(
    dag_id="tosheer_dataset_produces_1",
    catchup=False,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule="@daily",
    tags=["produces", "dataset-scheduled"],
) as dag1:
    # [START task_outlet]
    BashOperator(outlets=[tosheer_dag1_dataset], task_id="producing_task_1", bash_command="sleep 5")
    # [END task_outlet]

After the producer has run several times, add the following consumer DAG to the same file:

# [START dag_dep]
with DAG(
    dag_id="tosheer_dataset_consumes_1",
    catchup=False,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=[tosheer_dag1_dataset],
    tags=["consumes", "dataset-scheduled"],
) as dag3:
    # [END dag_dep]
    BashOperator(
        task_id="consuming_1",
        bash_command='echo "ti_key={{ triggering_dataset_events }}"',
    )

Operating System

Debian GNU/Linux 12 (bookworm)

Versions of Apache Airflow Providers

No response

Deployment

Docker-Compose

Deployment details

Plain vanilla Airflow.

Anything else?

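This issue is related to an earlier issue, #38826. The fix for that issue only covered DAGs that were disabled and then re-enabled, or deleted and then recreated; it did not cover a DAG that is newly created for an existing dataset.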

Are you willing to submit PR?

  • Yes, I am willing to submit a PR!

Code of Conduct

tosheer added the area:core, kind:bug, and needs-triage labels on May 7, 2024
RNHTTR added the Can't Reproduce, pending-response, and area:datasets labels and removed the needs-triage label on May 7, 2024
Collaborator

RNHTTR commented May 7, 2024

I'm not able to reproduce this. I think the context object in the post-execute function is incorrect: context["dataset_events"]["test-cluster/test-schema/test-table"] should (I think) be context["triggering_dataset_events"]["test-cluster/test-schema/test-table"]. Even with that change, it comes back as an empty list.

Author

tosheer commented May 9, 2024

[Screenshots: 2024-05-09 at 11:54:49 PM and 2024-05-09 at 11:53:15 PM]

@RNHTTR I have updated the DAG definition in the issue; I was reproducing the issue locally with a simpler DAG definition than the one originally posted. Can you please try again and see whether you can reproduce the issue? For more details, see the attached screenshots.

Author

tosheer commented May 9, 2024

Logs

[2024-05-09, 18:24:03 UTC] {local_task_job_runner.py:120} ▶ Pre task execution logs
[2024-05-09, 18:24:04 UTC] {subprocess.py:63} INFO - Tmp dir root location: /tmp
[2024-05-09, 18:24:04 UTC] {subprocess.py:75} INFO - Running command: ['/usr/bin/bash', '-c', 'echo "ti_key=defaultdict(<class \'list\'>, {\'s3://tosheer-dag1/output_1.txt\': [DatasetEvent(id=1, dataset_id=11, extra={}, source_task_id=\'producing_task_1\', source_dag_id=\'tosheer_dataset_produces_1\', source_run_id=\'scheduled__2024-05-08T00:00:00+00:00\', source_map_index=-1), DatasetEvent(id=2, dataset_id=11, extra={}, source_task_id=\'producing_task_1\', source_dag_id=\'tosheer_dataset_produces_1\', source_run_id=\'manual__2024-05-09T18:18:06.483814+00:00\', source_map_index=-1), DatasetEvent(id=3, dataset_id=11, extra={}, source_task_id=\'producing_task_1\', source_dag_id=\'tosheer_dataset_produces_1\', source_run_id=\'manual__2024-05-09T18:18:27.256324+00:00\', source_map_index=-1), DatasetEvent(id=4, dataset_id=11, extra={}, source_task_id=\'producing_task_1\', source_dag_id=\'tosheer_dataset_produces_1\', source_run_id=\'manual__2024-05-09T18:23:54.937647+00:00\', source_map_index=-1)]})"']
[2024-05-09, 18:24:04 UTC] {subprocess.py:86} INFO - Output:
[2024-05-09, 18:24:04 UTC] {subprocess.py:93} INFO - ti_key=defaultdict(<class 'list'>, {'s3://tosheer-dag1/output_1.txt': [DatasetEvent(id=1, dataset_id=11, extra={}, source_task_id='producing_task_1', source_dag_id='tosheer_dataset_produces_1', source_run_id='scheduled__2024-05-08T00:00:00+00:00', source_map_index=-1), DatasetEvent(id=2, dataset_id=11, extra={}, source_task_id='producing_task_1', source_dag_id='tosheer_dataset_produces_1', source_run_id='manual__2024-05-09T18:18:06.483814+00:00', source_map_index=-1), DatasetEvent(id=3, dataset_id=11, extra={}, source_task_id='producing_task_1', source_dag_id='tosheer_dataset_produces_1', source_run_id='manual__2024-05-09T18:18:27.256324+00:00', source_map_index=-1), DatasetEvent(id=4, dataset_id=11, extra={}, source_task_id='producing_task_1', source_dag_id='tosheer_dataset_produces_1', source_run_id='manual__2024-05-09T18:23:54.937647+00:00', source_map_index=-1)]})
[2024-05-09, 18:24:04 UTC] {subprocess.py:97} INFO - Command exited with return code 0
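The logged value shows the structure the consumer receives: a `defaultdict(list)` keyed by dataset URI, here holding all four events (ids 1 to 4) that the producer emitted before the consumer's first run. The sketch below rebuilds that shape with a hypothetical `DatasetEventStub` class (not Airflow's `DatasetEvent`) using the ids and run ids from the log. It also illustrates one possible, unconfirmed explanation for the empty list mentioned earlier in the thread: indexing a `defaultdict(list)` with a key that is not a triggering dataset URI returns `[]` instead of raising `KeyError`.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class DatasetEventStub:
    """Hypothetical stub with a few fields seen in the log; not Airflow's DatasetEvent."""

    id: int
    source_dag_id: str
    source_run_id: str


URI = "s3://tosheer-dag1/output_1.txt"

# Rebuild the shape printed in the log: a defaultdict(list) keyed by dataset URI.
triggering_dataset_events = defaultdict(list)
for event_id, run_id in [
    (1, "scheduled__2024-05-08T00:00:00+00:00"),
    (2, "manual__2024-05-09T18:18:06.483814+00:00"),
    (3, "manual__2024-05-09T18:18:27.256324+00:00"),
    (4, "manual__2024-05-09T18:23:54.937647+00:00"),
]:
    triggering_dataset_events[URI].append(
        DatasetEventStub(event_id, "tosheer_dataset_produces_1", run_id)
    )

# The consumer's first run received every event the producer had already emitted.
received_ids = [e.id for e in triggering_dataset_events[URI]]
print(received_ids)  # [1, 2, 3, 4]

# A defaultdict(list) silently returns [] for a key that matches no triggering
# dataset URI, rather than raising KeyError.
print(triggering_dataset_events["test-cluster/test-schema/test-table"])  # []
```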

RNHTTR removed the Can't Reproduce, pending-response, and area:core labels on May 10, 2024
tosheer linked a pull request on May 14, 2024 that will close this issue