Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

debounce new buildrequests events #6286

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions master/buildbot/data/buildrequests.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,12 @@ class EntityType(types.Entity):

@defer.inlineCallbacks
def generateEvent(self, brids, event):
events = []
for brid in brids:
# get the build and munge the result for the notification
br = yield self.master.data.get(('buildrequests', str(brid)))
events.append(br)
for br in events:
self.produceEvent(br, event)

@defer.inlineCallbacks
Expand Down
46 changes: 31 additions & 15 deletions master/buildbot/process/botmaster.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from buildbot.process.results import CANCELLED
from buildbot.process.results import RETRY
from buildbot.process.workerforbuilder import States
from buildbot.util import debounce
from buildbot.util import service


Expand Down Expand Up @@ -100,6 +101,7 @@ def __init__(self):
# a distributor for incoming build requests; see below
self.brd = BuildRequestDistributor(self)
self.brd.setServiceParent(self)
self._pending_builderids = set()

@defer.inlineCallbacks
def cleanShutdown(self, quickMode=False, stopReactor=True):
Expand Down Expand Up @@ -198,29 +200,38 @@ def getBuildernames(self):
def getBuilders(self):
return list(self.builders.values())

@defer.inlineCallbacks
def _buildrequest_added(self, key, msg):
self._pending_builderids.add(msg['builderid'])
self._flush_pending_builders()

# flush pending builders needs to be debounced, as per design the
# buildrequests events will arrive in burst.
# We debounce them to let the brd manage them as a whole
# without having to debounce the brd itself
@debounce.method(wait=.1, until_idle=True)
def _flush_pending_builders(self):
if not self._pending_builderids:
return
buildernames = []
for builderid in self._pending_builderids:
builder = self.getBuilderById(builderid)
if builder:
buildernames.append(builder.name)
self._pending_builderids.clear()
self.brd.maybeStartBuildsOn(buildernames)

def getBuilderById(self, builderid):
for builder in self.getBuilders():
if builderid == (yield builder.getBuilderId()):
return builder
return None
return self._builders_byid.get(builderid)

@defer.inlineCallbacks
def startService(self):
@defer.inlineCallbacks
def buildRequestAdded(key, msg):
builderid = msg['builderid']
builder = yield self.getBuilderById(builderid)
if builder is not None:
self.maybeStartBuildsForBuilder(builder.name)

# consume both 'new' and 'unclaimed' build requests
# consume both 'new' and 'unclaimed' build requests events
startConsuming = self.master.mq.startConsuming
self.buildrequest_consumer_new = yield startConsuming(
buildRequestAdded,
self._buildrequest_added,
('buildrequests', None, "new"))
self.buildrequest_consumer_unclaimed = yield startConsuming(
buildRequestAdded,
self._buildrequest_added,
('buildrequests', None, 'unclaimed'))
yield super().startService()

Expand Down Expand Up @@ -281,6 +292,9 @@ def reconfigServiceBuilders(self, new_config):
yield builder.setServiceParent(self)

self.builderNames = list(self.builders)
self._builders_byid = {}
for builder in self.builders.values():
self._builders_byid[(yield builder.getBuilderId())] = builder

yield self.master.data.updates.updateBuilderList(
self.master.masterid,
Expand All @@ -298,6 +312,8 @@ def stopService(self):
if self.buildrequest_consumer_unclaimed:
self.buildrequest_consumer_unclaimed.stopConsuming()
self.buildrequest_consumer_unclaimed = None
self._pending_builderids.clear()
self._flush_pending_builders.stop()
return super().stopService()

def maybeStartBuildsForBuilder(self, buildername):
Expand Down
8 changes: 8 additions & 0 deletions master/buildbot/test/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,11 @@
# https://github.com/pypa/setuptools/issues/2086
warnings.filterwarnings('ignore', ".*lib2to3 package is deprecated",
category=PendingDeprecationWarning)

# on python 3.9, this warning is generated by the stdlib
warnings.filterwarnings('ignore', ".*The loop argument is deprecated since Python",
category=DeprecationWarning)

# This warning is generated by the EC2 latent
warnings.filterwarnings('ignore', ".*stream argument is deprecated. Use stream parameter",
category=DeprecationWarning)
2 changes: 2 additions & 0 deletions master/buildbot/test/integration/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ def test_latent_max_builds(self):
'project': ''},
],
)
self.master.reactor.advance(1)

# The worker fails to substantiate.
controller.start_instance(True)
Expand Down Expand Up @@ -165,6 +166,7 @@ def test_local_worker_max_builds(self):
'project': ''},
],
)
self.master.reactor.advance(1)

self.assertEqual(len(started_builds), 1)

Expand Down
6 changes: 5 additions & 1 deletion master/buildbot/test/integration/test_worker_latent.py
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,8 @@ def test_worker_close_connection_while_building(self):
yield self.assertBuildResults(1, RETRY)

# Now check that the build requeued and finished with success
# after the debounce delay
self.reactor.advance(1)
yield controller.start_instance(True)

yield self.assertBuildResults(2, None)
Expand Down Expand Up @@ -1153,13 +1155,15 @@ def test_build_stop_with_retry_during_substantiation(self):

# Indicate that the worker can't start an instance.
yield controller.start_instance(False)
self.reactor.advance(1)

yield self.assertBuildResults(1, RETRY)
self.assertEqual(
set(brids),
{req['buildrequestid'] for req in unclaimed_build_requests}
)
yield controller.auto_stop(True)
yield controller.start_instance(False)
self.flushLoggedErrors(LatentWorkerFailedToSubstantiate)

@defer.inlineCallbacks
Expand Down Expand Up @@ -1475,7 +1479,7 @@ def test_1worker_does_not_stop_machine_machine_after_timeout_during_build(self):

# create build request while machine is still awake. It should not
# suspend regardless of how much time passes
self.reactor.advance(4.9)
self.reactor.advance(3.9)
self.assertEqual(machine_controller.machine.state,
MachineStates.STARTED)
yield self.create_build_request([builder_id])
Expand Down
24 changes: 24 additions & 0 deletions master/buildbot/test/unit/util/test_debounce.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ def __init__(self, reactor):

@debounce.method(wait=4.0, get_reactor=lambda self: self.reactor)
def maybe(self):
return self._maybe()

@debounce.method(wait=4.0, until_idle=True, get_reactor=lambda self: self.reactor)
def maybe_until_idle(self):
return self._maybe()

def _maybe(self):
assert not self.callDeferred
self.calls += 1
log.msg('debounced function called')
Expand Down Expand Up @@ -62,6 +69,8 @@ def scenario(self, events):
self.clock.advance(t - self.clock.seconds())
if e == 'maybe':
db.maybe()
elif e == 'maybe_until_idle':
db.maybe_until_idle()
elif e == 'called':
db.expCalls += 1
elif e == 'complete':
Expand Down Expand Up @@ -113,6 +122,21 @@ def test_coalesce_calls(self):
(1, 7.0, 'check'),
])

def test_coalesce_calls_until_idle(self):
"""Multiple calls are coalesced during 4 seconds, but the function
runs 4 seconds after the last call."""
self.scenario([
(1, 0.0, 'maybe_until_idle'),
(1, 1.0, 'maybe_until_idle'),
(1, 2.0, 'maybe_until_idle'),
(1, 3.0, 'maybe_until_idle'),
(1, 4.0, 'check'), # should not be called at that time
(1, 7.0, 'called'),
(1, 8.0, 'check'),
(1, 9.0, 'complete'),
(1, 10.0, 'check'),
])

def test_second_call_during_first(self):
"""If the debounced method is called after an execution has begun, then
a second execution will take place 4 seconds after the execution
Expand Down
2 changes: 2 additions & 0 deletions master/buildbot/test/util/integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ def create_build_request(self, builder_ids, properties=None):
],
properties=properties,
)
# run debounced calls
self.master.reactor.advance(1)
return ret

@defer.inlineCallbacks
Expand Down
21 changes: 14 additions & 7 deletions master/buildbot/util/debounce.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@

class Debouncer:
__slots__ = ['phase', 'timer', 'wait', 'function', 'stopped',
'completeDeferreds', 'get_reactor']
'completeDeferreds', 'get_reactor', 'until_idle']

def __init__(self, wait, function, get_reactor):
def __init__(self, wait, function, get_reactor, until_idle):
# time to wait
self.wait = wait
# zero-argument callable to invoke
Expand All @@ -45,6 +45,8 @@ def __init__(self, wait, function, get_reactor):
self.completeDeferreds = None
# for tests
self.get_reactor = get_reactor
# invoke after wait s of idle
self.until_idle = until_idle

def __call__(self):
if self.stopped:
Expand All @@ -53,9 +55,12 @@ def __call__(self):
if phase == PH_IDLE:
self.timer = self.get_reactor().callLater(self.wait, self.invoke)
self.phase = PH_WAITING
elif phase == PH_WAITING:
if self.until_idle:
self.timer.reset(self.wait)
elif phase == PH_RUNNING:
self.phase = PH_RUNNING_QUEUED
else: # phase == PH_WAITING or phase == PH_RUNNING_QUEUED:
else: # phase == PH_RUNNING_QUEUED:
pass

def __repr__(self):
Expand Down Expand Up @@ -95,18 +100,20 @@ def stop(self):

class _Descriptor:

def __init__(self, fn, wait, attrName, get_reactor):
def __init__(self, fn, wait, attrName, get_reactor, until_idle):
self.fn = fn
self.wait = wait
self.attrName = attrName
self.get_reactor = get_reactor
self.until_idle = until_idle

def __get__(self, instance, cls):
try:
db = getattr(instance, self.attrName)
except AttributeError:
db = Debouncer(self.wait, functools.partial(self.fn, instance),
functools.partial(self.get_reactor, instance))
functools.partial(self.get_reactor, instance),
self.until_idle)
setattr(instance, self.attrName, db)
return db

Expand All @@ -115,8 +122,8 @@ def _get_reactor_from_master(o):
return o.master.reactor


def method(wait, get_reactor=_get_reactor_from_master):
def method(wait, until_idle=False, get_reactor=_get_reactor_from_master):
def wrap(fn):
stateName = "__debounce_" + fn.__name__ + "__"
return _Descriptor(fn, wait, stateName, get_reactor)
return _Descriptor(fn, wait, stateName, get_reactor, until_idle)
return wrap
15 changes: 9 additions & 6 deletions master/docs/developer/utils.rst
Original file line number Diff line number Diff line change
Expand Up @@ -464,22 +464,25 @@ It's often necessary to perform some action in response to a particular type of
For example, steps need to update their status after updates arrive from the worker.
However, when many events arrive in quick succession, it's more efficient to only perform the action once, after the last event has occurred.

The ``debounce.method(wait)`` decorator is the tool for the job.
The ``debounce.method(wait, until_idle=False)`` decorator is the tool for the job.

.. py:function:: method(wait, get_reactor)
.. py:function:: method(wait, until_idle=False, get_reactor)

:param wait: time to wait before invoking, in seconds
:param until_idle: resets the timer on every call
:param get_reactor: A callable that takes the underlying instance and returns the reactor to use. Defaults to ``instance.master.reactor``.

Returns a decorator that debounces the underlying method.
The underlying method must take no arguments (except ``self``).

For each call to the decorated method, the underlying method will be invoked at least once within *wait* seconds (plus the time the method takes to execute).
Calls are "debounced" during that time, meaning that multiple calls to the decorated method will result in a single invocation.
Calls are "debounced", meaning that multiple calls to the decorated method will result in a single invocation.

.. note::
When `until_idle` is `True`, the underlying method will be called after *wait* seconds have elapsed since the last time the decorated method have been called.
In case of constant stream, it will never be called.

When `until_idle` is `False`, the underlying method will be called after *wait* seconds have elapsed since the first time the decorated method have been called.
In case of constant stream, it will called about once every *wait* seconds (plus the time the method takes to execute)

This functionality is similar to Underscore's ``debounce``, except that the Underscore method resets its timer on every call.

The decorated method is an instance of :py:class:`Debouncer`, allowing it to be started and stopped.
This is useful when the method is a part of a Buildbot service: call ``method.start()`` from ``startService`` and ``method.stop()`` from ``stopService``, handling its Deferred appropriately.
Expand Down