
BigQuery task decorated functions failing in Airflow 2.9.1 #39541

Open
nathadfield opened this issue May 10, 2024 · 7 comments
Labels: area:providers, good first issue, kind:bug, provider:google


@nathadfield
Collaborator

Apache Airflow version

2.9.1

If "Other Airflow 2 version" selected, which one?

No response

What happened?

After upgrading to Airflow 2.9.1, @task-decorated functions that use BigQueryHook no longer submit jobs successfully; instead they fail with an error such as the following:

google.api_core.exceptions.NotFound: 404 GET https://bigquery.googleapis.com/bigquery/v2/projects/my-project/jobs/airflow_1715330189073773_48af4db3b105631bb26f6855063ccef0?location=EU&prettyPrint=false: Not found: Job my-project:EU.airflow_1715330189073773_48af4db3b105631bb26f6855063ccef0

I have reproduced this issue against Airflow 2.9.1 and against main. It does not seem to be related to the Google provider, because both 10.17 and 10.16 produce this error.

With Airflow 2.8.4, neither of these Google provider versions produces this error.

What you think should happen instead?

No response

How to reproduce

The following simple DAG reproduces the issue using Breeze. The bq_test task (assigned to bq_hook_test) fails, whereas bq_insert_job_test, which uses BigQueryInsertJobOperator with the same configuration, succeeds.

breeze --python 3.10 --backend postgres start-airflow --forward-credentials ${HOME}/.config/gcloud

from datetime import datetime
from airflow import models

from airflow.decorators import task
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

with models.DAG(
    dag_id='bq_hook_test',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    schedule='0 0 * * *',
) as dag:

    configuration={
        'query': {
            'query': 'SELECT 1;',
            'useLegacySql': False
        },
    }

    @task
    def bq_test():
        hook = BigQueryHook()
        hook.insert_job(
            location='EU',
            configuration=configuration,
        )


    bq_hook_test = bq_test()

    test = BigQueryInsertJobOperator(
        task_id='bq_insert_job_test',
        location='EU',
        configuration=configuration,
    )

We authenticate using Google Application Default Credentials, with the following environment variables:

GOOGLE_CLOUD_PROJECT=my-project
AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT=google-cloud-platform://

Operating System

n/a

Versions of Apache Airflow Providers

apache-airflow-providers-google==10.17.0

Deployment

Astronomer

Deployment details

No response

Anything else?

Full exception log:

[2024-05-10, 08:36:29 UTC] {taskinstance.py:2910} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/airflow/airflow/models/taskinstance.py", line 478, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
  File "/opt/airflow/airflow/models/taskinstance.py", line 441, in _execute_callable
    return ExecutionCallableRunner(
  File "/opt/airflow/airflow/utils/operator_helpers.py", line 250, in run
    return self.func(*args, **kwargs)
  File "/opt/airflow/airflow/models/baseoperator.py", line 405, in wrapper
    return func(self, *args, **kwargs)
  File "/opt/airflow/airflow/decorators/base.py", line 265, in execute
    return_value = super().execute(context)
  File "/opt/airflow/airflow/models/baseoperator.py", line 405, in wrapper
    return func(self, *args, **kwargs)
  File "/opt/airflow/airflow/operators/python.py", line 238, in execute
    return_value = self.execute_callable()
  File "/opt/airflow/airflow/operators/python.py", line 256, in execute_callable
    return runner.run(*self.op_args, **self.op_kwargs)
  File "/opt/airflow/airflow/utils/operator_helpers.py", line 250, in run
    return self.func(*args, **kwargs)
  File "/files/dags/test.py", line 17, in bq_test
    hook.insert_job(
  File "/opt/airflow/airflow/providers/google/common/hooks/base_google.py", line 524, in inner_wrapper
    return func(self, *args, **kwargs)
  File "/opt/airflow/airflow/providers/google/cloud/hooks/bigquery.py", line 1681, in insert_job
    job_api_repr.result(timeout=timeout, retry=retry)
  File "/usr/local/lib/python3.10/site-packages/google/cloud/bigquery/job/query.py", line 1626, in result
    while not is_job_done():
  File "/usr/local/lib/python3.10/site-packages/google/cloud/bigquery/job/query.py", line 1551, in is_job_done
    if self.done(retry=retry, timeout=timeout):
  File "/usr/local/lib/python3.10/site-packages/google/cloud/bigquery/job/base.py", line 938, in done
    self.reload(retry=retry, timeout=timeout)
  File "/usr/local/lib/python3.10/site-packages/google/cloud/bigquery/job/base.py", line 828, in reload
    api_response = client._call_api(
  File "/usr/local/lib/python3.10/site-packages/google/cloud/bigquery/client.py", line 831, in _call_api
    return call()
  File "/usr/local/lib/python3.10/site-packages/google/api_core/retry/retry_unary.py", line 293, in retry_wrapped_func
    return retry_target(
  File "/usr/local/lib/python3.10/site-packages/google/api_core/retry/retry_unary.py", line 153, in retry_target
    _retry_error_helper(
  File "/usr/local/lib/python3.10/site-packages/google/api_core/retry/retry_base.py", line 212, in _retry_error_helper
    raise final_exc from source_exc
  File "/usr/local/lib/python3.10/site-packages/google/api_core/retry/retry_unary.py", line 144, in retry_target
    result = target()
  File "/usr/local/lib/python3.10/site-packages/google/cloud/_http/__init__.py", line 494, in api_request
    raise exceptions.from_http_response(response)

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!


nathadfield added the kind:bug, area:core, and needs-triage labels on May 10, 2024
@nathadfield
Collaborator Author

Could this be something to do with updates to some of the google-api libraries between Airflow 2.8.4 and 2.9.1?

2.8.4

google-api-core==2.17.1
google-api-python-client==2.122.0
google-auth==2.28.2

2.9.1

google-api-core==2.19.0
google-api-python-client==2.127.0
google-auth==2.29.0

Although I'm confused why this would affect @task decorated functions?

@potiuk
Member

potiuk commented May 10, 2024

> Although I'm confused why this would affect @task decorated functions?

You are using BQHook, so it does not matter whether the task is decorated.

> Could this be something to do with updates to some of the google-api libraries between Airflow 2.8.4 and 2.9.1?

Can you downgrade the libraries and check if it fixes the issue?

@VladaZakharova - you might want to take a look at this one

@nathadfield
Collaborator Author

@potiuk Yes, I guess my confusion is more about why the hook gives the error in this context while the operator does not, especially as it doesn't seem to be caused by the provider. I downgraded the libraries, but the issue persists.

@pankajastro
Member

Looks like this is related to google-cloud-bigquery; downgrading it to google-cloud-bigquery<3.21.0 fixes it for me.
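A minimal sketch for checking which side of the reported boundary an environment is on. It encodes only what this comment states (google-cloud-bigquery<3.21.0 works); whether later releases are also affected is not established in this thread. `parse_release`, `may_be_affected`, and `check_installed` are hypothetical helper names, and the version parsing is deliberately naive rather than full PEP 440.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Tuple

# Boundary reported in this thread: pinning google-cloud-bigquery<3.21.0
# avoided the 404. This only flags ">= 3.21.0"; it says nothing about
# whether a later release fixed the behavior.
REPORTED_BAD_FLOOR = (3, 21, 0)


def parse_release(ver: str) -> Tuple[int, int, int]:
    """Turn '3.21.0' into (3, 21, 0). Naive: pre-release suffixes are ignored."""
    parts = []
    for piece in ver.split(".")[:3]:
        digits = ""
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        parts.append(int(digits) if digits else 0)
    while len(parts) < 3:  # pad '3.22' to (3, 22, 0)
        parts.append(0)
    return (parts[0], parts[1], parts[2])


def may_be_affected(ver: str) -> bool:
    """True if `ver` is at or above the version reported to trigger the 404."""
    return parse_release(ver) >= REPORTED_BAD_FLOOR


def check_installed() -> str:
    """Describe the installed google-cloud-bigquery relative to the boundary."""
    try:
        installed = version("google-cloud-bigquery")
    except PackageNotFoundError:
        return "google-cloud-bigquery is not installed"
    if may_be_affected(installed):
        return f"google-cloud-bigquery {installed}: in the range reported to fail (>=3.21.0)"
    return f"google-cloud-bigquery {installed}: below 3.21.0"


if __name__ == "__main__":
    print(check_installed())
```

Run it with the same interpreter as the Airflow worker (for example inside the Breeze container) so it reports the library version your tasks actually import.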

rawwar removed the needs-triage label on May 13, 2024
@nathadfield
Collaborator Author

@pankajastro Interesting. Downgrading does work, but I'm curious to know what the issue is and why the hook method gives the error but the operator does not.

@pankajastro
Member

pankajastro commented May 13, 2024

It is because of the way we pass the nowait param to the insert_job method. When you call the hook from @task, nowait is False (your code does not pass it), but when you use the operator it is True.

The error comes from this branch of insert_job:

        else:
            # Start the job and wait for it to complete and get the result.
            job_api_repr.result(timeout=timeout, retry=retry)
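To make the nowait explanation concrete, here is a toy model in plain Python. It is not the provider's or the client library's code, and `ToyJob`, `toy_insert_job`, and `run` are hypothetical names. Three details are assumptions: the `if nowait:` branch only submits the job, the operator waits on the job afterwards, and the `result_starts_job` flag encodes the unconfirmed hypothesis that google-cloud-bigquery releases before 3.21.0 submitted a not-yet-started job from `result()`, whereas 3.21.0 only polls it. Under that hypothesis, polling a job that was never submitted would produce the 404 seen in the traceback.

```python
class JobNotFound(Exception):
    """Hypothetical stand-in for google.api_core.exceptions.NotFound (404)."""


class ToyJob:
    """Hypothetical job object; NOT google.cloud.bigquery.QueryJob."""

    def __init__(self, result_starts_job: bool) -> None:
        self.submitted = False
        # ASSUMPTION: True models older client behavior, False models 3.21.0.
        self.result_starts_job = result_starts_job

    def begin(self) -> None:
        # Submit the job to the (imaginary) service.
        self.submitted = True

    def result(self) -> str:
        if not self.submitted:
            if self.result_starts_job:
                self.begin()
            else:
                # Polling a job that was never submitted -> 404.
                raise JobNotFound("404 Not found: Job")
        return "DONE"


def toy_insert_job(job: ToyJob, nowait: bool) -> ToyJob:
    """Same shape as the insert_job branch quoted above (if-branch assumed)."""
    if nowait:
        job.begin()  # submit only; the caller is expected to wait later
    else:
        job.result()  # start the job and wait for it
    return job


def run(path: str, result_starts_job: bool) -> str:
    """Simulate the operator path or the @task hook path."""
    job = ToyJob(result_starts_job)
    try:
        if path == "operator":
            # Operator passes nowait=True, then (assumed) waits on the job.
            toy_insert_job(job, nowait=True)
            return job.result()
        # Hook called from @task: nowait stays False.
        toy_insert_job(job, nowait=False)
        return "DONE"
    except JobNotFound as exc:
        return f"FAILED: {exc}"


if __name__ == "__main__":
    for path in ("operator", "task"):
        for starts in (True, False):
            print(path, "result_starts_job=" + str(starts), run(path, starts))
```

Under that assumption only the @task path with `result_starts_job=False` fails, which matches the behavior reported in this issue; the operator path never relies on `result()` to submit the job.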

I have drafted a PR to pin the library version until we fix it: #39583

pankajastro added the provider:google label and removed the area:core label on May 13, 2024
@nathadfield
Collaborator Author

I see. Thanks for the info.
