Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test failure: AttributeError: 'ChannelPromise' object has no attribute '__value__' resulting in timeout #665

Closed
MeggyCal opened this issue Jul 18, 2023 · 1 comment

Comments

@MeggyCal
Copy link

Summary:

  • Celery Version: 5.2.7
  • Celery-Beat Version: 2.5.0
  • Kombu version: 5.2.4

I am a packager in openSUSE. When I build your package and then run the testsuite, I get suspicious results which look like I hit celery/kombu#369.

Two tests fail for me: test_modeladmin_PeriodicTaskAdmin.test_run_task and test_modeladmin_PeriodicTaskAdmin.test_run_tasks, both with an identical stacktrace:

[  112s] =================================== FAILURES ===================================
[  112s] _______________ test_modeladmin_PeriodicTaskAdmin.test_run_task ________________
[  112s] 
[  112s] self = <promise: 0xffffaafe68b0>
[  112s] 
[  112s]     def __call__(self):
[  112s]         try:
[  112s] >           return self.__value__
[  112s] E           AttributeError: 'ChannelPromise' object has no attribute '__value__'
[  112s] 
[  112s] /usr/lib/python3.9/site-packages/kombu/utils/functional.py:30: AttributeError
[  112s] 
[  112s] During handling of the above exception, another exception occurred:
[  112s] 
[  112s] fun = <bound method Connection._connection_factory of <Connection: amqp://guest:**@127.0.0.1:5672// at 0xffffab0c4760>>
[  112s] catch = (<class 'amqp.exceptions.RecoverableConnectionError'>, <class 'OSError'>, <class 'OSError'>, <class 'OSError'>)
[  112s] args = [], kwargs = {}
[  112s] errback = <function Connection._ensure_connection.<locals>.on_error at 0xffffaaff49d0>
[  112s] max_retries = None, interval_start = 2, interval_step = 2, interval_max = 30
[  112s] callback = None, timeout = 4
[  112s] 
[  112s]     def retry_over_time(fun, catch, args=None, kwargs=None, errback=None,
[  112s]                         max_retries=None, interval_start=2, interval_step=2,
[  112s]                         interval_max=30, callback=None, timeout=None):
[  112s]         """Retry the function over and over until max retries is exceeded.
[  112s]     
[  112s]         For each retry we sleep a for a while before we try again, this interval
[  112s]         is increased for every retry until the max seconds is reached.
[  112s]     
[  112s]         Arguments:
[  112s]             fun (Callable): The function to try
[  112s]             catch (Tuple[BaseException]): Exceptions to catch, can be either
[  112s]                 tuple or a single exception class.
[  112s]     
[  112s]         Keyword Arguments:
[  112s]             args (Tuple): Positional arguments passed on to the function.
[  112s]             kwargs (Dict): Keyword arguments passed on to the function.
[  112s]             errback (Callable): Callback for when an exception in ``catch``
[  112s]                 is raised.  The callback must take three arguments:
[  112s]                 ``exc``, ``interval_range`` and ``retries``, where ``exc``
[  112s]                 is the exception instance, ``interval_range`` is an iterator
[  112s]                 which return the time in seconds to sleep next, and ``retries``
[  112s]                 is the number of previous retries.
[  112s]             max_retries (int): Maximum number of retries before we give up.
[  112s]                 If neither of this and timeout is set, we will retry forever.
[  112s]                 If one of this and timeout is reached, stop.
[  112s]             interval_start (float): How long (in seconds) we start sleeping
[  112s]                 between retries.
[  112s]             interval_step (float): By how much the interval is increased for
[  112s]                 each retry.
[  112s]             interval_max (float): Maximum number of seconds to sleep
[  112s]                 between retries.
[  112s]             timeout (int): Maximum seconds waiting before we give up.
[  112s]         """
[  112s]         kwargs = {} if not kwargs else kwargs
[  112s]         args = [] if not args else args
[  112s]         interval_range = fxrange(interval_start,
[  112s]                                  interval_max + interval_start,
[  112s]                                  interval_step, repeatlast=True)
[  112s]         end = time() + timeout if timeout else None
[  112s]         for retries in count():
[  112s]             try:
[  112s] >               return fun(*args, **kwargs)
[  112s] 
[  112s] /usr/lib/python3.9/site-packages/kombu/utils/functional.py:312: 
[  112s] _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
[  112s] 
[  112s] self = <Connection: amqp://guest:**@127.0.0.1:5672// at 0xffffab0c4760>
[  112s] 
[  112s]     def _connection_factory(self):
[  112s]         self.declared_entities.clear()
[  112s]         self._default_channel = None
[  112s] >       self._connection = self._establish_connection()
[  112s] 
[  112s] /usr/lib/python3.9/site-packages/kombu/connection.py:877: 
[  112s] _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
[  112s] 
[  112s] self = <Connection: amqp://guest:**@127.0.0.1:5672// at 0xffffab0c4760>
[  112s] 
[  112s]     def _establish_connection(self):
[  112s]         self._debug('establishing connection...')
[  112s] >       conn = self.transport.establish_connection()
[  112s] 
[  112s] /usr/lib/python3.9/site-packages/kombu/connection.py:812: 
[  112s] _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
[  112s] 
[  112s] self = <kombu.transport.pyamqp.Transport object at 0xffffab0a9700>
[  112s] 
[  112s]     def establish_connection(self):
[  112s]         """Establish connection to the AMQP broker."""
[  112s]         conninfo = self.client
[  112s]         for name, default_value in self.default_connection_params.items():
[  112s]             if not getattr(conninfo, name, None):
[  112s]                 setattr(conninfo, name, default_value)
[  112s]         if conninfo.hostname == 'localhost':
[  112s]             conninfo.hostname = '127.0.0.1'
[  112s]         # when server_hostname is None, use hostname from URI.
[  112s]         if isinstance(conninfo.ssl, dict) and \
[  112s]                 'server_hostname' in conninfo.ssl and \
[  112s]                 conninfo.ssl['server_hostname'] is None:
[  112s]             conninfo.ssl['server_hostname'] = conninfo.hostname
[  112s]         opts = dict({
[  112s]             'host': conninfo.host,
[  112s]             'userid': conninfo.userid,
[  112s]             'password': conninfo.password,
[  112s]             'login_method': conninfo.login_method,
[  112s]             'virtual_host': conninfo.virtual_host,
[  112s]             'insist': conninfo.insist,
[  112s]             'ssl': conninfo.ssl,
[  112s]             'connect_timeout': conninfo.connect_timeout,
[  112s]             'heartbeat': conninfo.heartbeat,
[  112s]         }, **conninfo.transport_options or {})
[  112s]         conn = self.Connection(**opts)
[  112s]         conn.client = self.client
[  112s] >       conn.connect()
[  112s] 
[  112s] /usr/lib/python3.9/site-packages/kombu/transport/pyamqp.py:201: 
[  112s] _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
[  112s] 
[  112s] self = <AMQP Connection: 127.0.0.1:5672// (disconnected) at 0xffffaaff9510>
[  112s] callback = None
[  112s] 
[  112s]     def connect(self, callback=None):
[  112s]         # Let the transport.py module setup the actual
[  112s]         # socket connection to the broker.
[  112s]         #
[  112s]         if self.connected:
[  112s]             return callback() if callback else None
[  112s]         try:
[  112s]             self.transport = self.Transport(
[  112s]                 self.host, self.connect_timeout, self.ssl,
[  112s]                 self.read_timeout, self.write_timeout,
[  112s]                 socket_settings=self.socket_settings,
[  112s]             )
[  112s] >           self.transport.connect()
[  112s] 
[  112s] /usr/lib/python3.9/site-packages/amqp/connection.py:323: 
[  112s] _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
[  112s] 
[  112s] self = <TCPTransport: (disconnected) at 0xffffaafdd840>
[  112s] 
[  112s]     def connect(self):
[  112s]         try:
[  112s]             # are we already connected?
[  112s]             if self.connected:
[  112s]                 return
[  112s] >           self._connect(self.host, self.port, self.connect_timeout)
[  112s] 
[  112s] /usr/lib/python3.9/site-packages/amqp/transport.py:129: 
[  112s] _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
[  112s] 
[  112s] self = <TCPTransport: (disconnected) at 0xffffaafdd840>, host = '127.0.0.1'
[  112s] port = 5672, timeout = 4
[  112s] 
[  112s]     def _connect(self, host, port, timeout):
[  112s]         entries = socket.getaddrinfo(
[  112s]             host, port, socket.AF_UNSPEC, socket.SOCK_STREAM, SOL_TCP,
[  112s]         )
[  112s]         for i, res in enumerate(entries):
[  112s]             af, socktype, proto, canonname, sa = res
[  112s]             try:
[  112s]                 self.sock = socket.socket(af, socktype, proto)
[  112s]                 try:
[  112s]                     set_cloexec(self.sock, True)
[  112s]                 except NotImplementedError:
[  112s]                     pass
[  112s]                 self.sock.settimeout(timeout)
[  112s] >               self.sock.connect(sa)
[  112s] E               ConnectionRefusedError: [Errno 111] Connection refused
[  112s] 
[  112s] /usr/lib/python3.9/site-packages/amqp/transport.py:184: ConnectionRefusedError
[  112s] 
[  112s] During handling of the above exception, another exception occurred:
[  112s] 
[  112s] self = <t.unit.test_schedulers.test_modeladmin_PeriodicTaskAdmin object at 0xffffab607610>
[  112s] 
[  112s]     @pytest.mark.timeout(5)
[  112s]     def test_run_task(self):
[  112s]         ma = PeriodicTaskAdmin(PeriodicTask, self.site)
[  112s]         self.request = self.patch_request(self.request_factory.get('/'))
[  112s] >       ma.run_tasks(self.request, PeriodicTask.objects.filter(id=self.m1.id))
[  112s] 
[  112s] t/unit/test_schedulers.py:835: 
[  112s] _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
[  112s] django_celery_beat/admin.py:235: in run_tasks
[  112s]     task_ids = [
[  112s] django_celery_beat/admin.py:239: in <listcomp>
[  112s]     else task.apply_async(args=args, kwargs=kwargs,
[  112s] /usr/lib/python3.9/site-packages/celery/app/task.py:575: in apply_async
[  112s]     return app.send_task(
[  112s] /usr/lib/python3.9/site-packages/celery/app/base.py:788: in send_task
[  112s]     amqp.send_task_message(P, name, message, **options)
[  112s] /usr/lib/python3.9/site-packages/celery/app/amqp.py:510: in send_task_message
[  112s]     ret = producer.publish(
[  112s] /usr/lib/python3.9/site-packages/kombu/messaging.py:177: in publish
[  112s]     return _publish(
[  112s] /usr/lib/python3.9/site-packages/kombu/connection.py:523: in _ensured
[  112s]     return fun(*args, **kwargs)
[  112s] /usr/lib/python3.9/site-packages/kombu/messaging.py:186: in _publish
[  112s]     channel = self.channel
[  112s] /usr/lib/python3.9/site-packages/kombu/messaging.py:209: in _get_channel
[  112s]     channel = self._channel = channel()
[  112s] /usr/lib/python3.9/site-packages/kombu/utils/functional.py:32: in __call__
[  112s]     value = self.__value__ = self.__contract__()
[  112s] /usr/lib/python3.9/site-packages/kombu/messaging.py:225: in <lambda>
[  112s]     channel = ChannelPromise(lambda: connection.default_channel)
[  112s] /usr/lib/python3.9/site-packages/kombu/connection.py:895: in default_channel
[  112s]     self._ensure_connection(**conn_opts)
[  112s] /usr/lib/python3.9/site-packages/kombu/connection.py:433: in _ensure_connection
[  112s]     return retry_over_time(
[  112s] _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
[  112s] 
[  112s] fun = <bound method Connection._connection_factory of <Connection: amqp://guest:**@127.0.0.1:5672// at 0xffffab0c4760>>
[  112s] catch = (<class 'amqp.exceptions.RecoverableConnectionError'>, <class 'OSError'>, <class 'OSError'>, <class 'OSError'>)
[  112s] args = [], kwargs = {}
[  112s] errback = <function Connection._ensure_connection.<locals>.on_error at 0xffffaaff49d0>
[  112s] max_retries = None, interval_start = 2, interval_step = 2, interval_max = 30
[  112s] callback = None, timeout = 4
[  112s] 
[  112s]     def retry_over_time(fun, catch, args=None, kwargs=None, errback=None,
[  112s]                         max_retries=None, interval_start=2, interval_step=2,
[  112s]                         interval_max=30, callback=None, timeout=None):
[  112s]         """Retry the function over and over until max retries is exceeded.
[  112s]     
[  112s]         For each retry we sleep a for a while before we try again, this interval
[  112s]         is increased for every retry until the max seconds is reached.
[  112s]     
[  112s]         Arguments:
[  112s]             fun (Callable): The function to try
[  112s]             catch (Tuple[BaseException]): Exceptions to catch, can be either
[  112s]                 tuple or a single exception class.
[  112s]     
[  112s]         Keyword Arguments:
[  112s]             args (Tuple): Positional arguments passed on to the function.
[  112s]             kwargs (Dict): Keyword arguments passed on to the function.
[  112s]             errback (Callable): Callback for when an exception in ``catch``
[  112s]                 is raised.  The callback must take three arguments:
[  112s]                 ``exc``, ``interval_range`` and ``retries``, where ``exc``
[  112s]                 is the exception instance, ``interval_range`` is an iterator
[  112s]                 which return the time in seconds to sleep next, and ``retries``
[  112s]                 is the number of previous retries.
[  112s]             max_retries (int): Maximum number of retries before we give up.
[  112s]                 If neither of this and timeout is set, we will retry forever.
[  112s]                 If one of this and timeout is reached, stop.
[  112s]             interval_start (float): How long (in seconds) we start sleeping
[  112s]                 between retries.
[  112s]             interval_step (float): By how much the interval is increased for
[  112s]                 each retry.
[  112s]             interval_max (float): Maximum number of seconds to sleep
[  112s]                 between retries.
[  112s]             timeout (int): Maximum seconds waiting before we give up.
[  112s]         """
[  112s]         kwargs = {} if not kwargs else kwargs
[  112s]         args = [] if not args else args
[  112s]         interval_range = fxrange(interval_start,
[  112s]                                  interval_max + interval_start,
[  112s]                                  interval_step, repeatlast=True)
[  112s]         end = time() + timeout if timeout else None
[  112s]         for retries in count():
[  112s]             try:
[  112s]                 return fun(*args, **kwargs)
[  112s]             except catch as exc:
[  112s]                 if max_retries is not None and retries >= max_retries:
[  112s]                     raise
[  112s]                 if end and time() > end:
[  112s]                     raise
[  112s]                 if callback:
[  112s]                     callback()
[  112s]                 tts = float(errback(exc, interval_range, retries) if errback
[  112s]                             else next(interval_range))
[  112s]                 if tts:
[  112s]                     for _ in range(int(tts)):
[  112s]                         if callback:
[  112s]                             callback()
[  112s] >                       sleep(1.0)
[  112s] E                       Failed: Timeout >5.0s
[  112s] 
[  112s] /usr/lib/python3.9/site-packages/kombu/utils/functional.py:326: Failed
[  112s] ------------------------------ Captured log call -------------------------------
[  112s] WARNING  kombu.connection:connection.py:629 No hostname was supplied. Reverting to default 'localhost'

But maybe I have something configured improperly, I also get few warnings like this:

[  112s] t/unit/test_schedulers.py:188
[  112s]   /home/abuild/rpmbuild/BUILD/django-celery-beat-2.5.0/t/unit/test_schedulers.py:188: PytestUnknownMarkWarning: Unknown pytest.mark.celery - is this a typo?  You can register custom marks to avoid this warning - for details, see https://docs.pytest.org/en/stable/how-to/mark.html
[  112s]     @pytest.mark.celery(timezone='Europe/Berlin')

It shouldn't happen since celery 4.4.0: celery/celery#5720

Could you please tell me what is wrong? My package lives here: https://build.opensuse.org/package/show/home:mcalabkova:branches:devel:languages:python/python-django-celery-beat

@auvipy
Copy link
Member

auvipy commented Nov 19, 2023

celery/kombu#369 (comment) should fix your issue. it is not a celery bug but config issue. let us know if otherwise

@auvipy auvipy closed this as completed Nov 19, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants