Source code for kombu.transport.virtual.scheduling
"""
kombu.transport.virtual.scheduling
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Consumer utilities.
"""
from __future__ import absolute_import
from itertools import count
[docs]class FairCycle(object):
"""Consume from a set of resources, where each resource gets
an equal chance to be consumed from."""
def __init__(self, fun, resources, predicate=Exception):
self.fun = fun
self.resources = resources
self.predicate = predicate
self.pos = 0
def _next(self):
while 1:
try:
resource = self.resources[self.pos]
self.pos += 1
return resource
except IndexError:
self.pos = 0
if not self.resources:
raise self.predicate()
[docs] def get(self, **kwargs):
for tried in count(0): # for infinity
resource = self._next()
try:
return self.fun(resource, **kwargs), resource
except self.predicate:
if tried >= len(self.resources) - 1:
raise
def __repr__(self):
return '<FairCycle: {self.pos}/{size} {self.resources}>'.format(
self=self, size=len(self.resources))