ConsumerMixin.consume should yield on socket.timeout to help integration in continuations #433

Open
alexgarel opened this issue Nov 28, 2014 · 3 comments


tl;dr:
ConsumerMixin can't be used in a framework using continuations.
Could kombu add an option for consume to yield in such cases? This would make integration lighter.

We use kombu to poll events from AMQP in a Tornado websocket. For this I need a generator which can be called as a continuation on every IOLoop iteration and which does not block when there are no events.

Our first attempt was to use the ConsumerMixin this way:

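from kombu import Connection, Exchange, Queue
from kombu.mixins import ConsumerMixin
from tornado import ioloop

# `settings` comes from our own project and is not shown here


class BrokerClient(ConsumerMixin):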

    def __init__(self):
        self.connection = Connection(getattr(settings, 'BROKER_URL', ''))
        self.queue = Queue(
            getattr(settings, 'WEBSOCKET_QUEUE', 'websocket'),
            Exchange(getattr(settings, 'WEBSOCKET_EXCHANGE', 'websocket')))
        # use tornado io_loop
        io_loop = ioloop.IOLoop.instance()
        pc = ioloop.PeriodicCallback(self._handle_loop, 1)
        self.events_poller = self.events_poller_generator()
        pc.start()

    def events_poller_generator(self):
        while True:
            try:
                if self.restart_limit.can_consume(1):
                    for _ in self.consume(timeout=1):
                        yield
            except self.connection.connection_errors:
                print('Connection to broker lost. Trying to re-establish the connection...')

    def _handle_loop(self):
        """Poll event during tornado io loop
        """
        # just go ahead
        next(self.events_poller)

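The problem here is that whenever the timeout expires, consume re-raises socket.timeout and leaves its consumer_context, so the next call to consume creates and opens a new connection each time.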
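
To illustrate the point above, here is a toy sketch that does not use kombu at all. The names `fake_consumer_context`, `consume_with_timeout`, `poller` and `opened` are made up for this example, and it catches `socket.timeout` directly rather than `connection_errors`. It only shows the mechanism: when a generator raises inside its `with` block, the context is torn down, so every restart of the outer loop sets up a new one.

```python
import socket
from contextlib import contextmanager

opened = []  # hypothetical log: one "open"/"close" pair per simulated connection


@contextmanager
def fake_consumer_context():
    """Stand-in for ConsumerMixin.consumer_context; this is not kombu code."""
    opened.append("open")
    try:
        yield
    finally:
        opened.append("close")


def consume_with_timeout():
    """Mimics consume(timeout=...) on an idle queue: the timeout is re-raised."""
    with fake_consumer_context():
        raise socket.timeout("no event within timeout")
        yield  # unreachable; only makes this function a generator


def poller(restarts):
    """Outer loop in the style of events_poller_generator, bounded for the demo."""
    for _ in range(restarts):
        try:
            for _ in consume_with_timeout():
                yield
        except socket.timeout:
            pass  # the loop starts over and a new context is entered


list(poller(3))
open_count = opened.count("open")  # one simulated connection per restart
```

Each pass through the outer loop enters and leaves the context once, which is the per-timeout connection churn described above.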

Then we tried removing the timeout. The problem then is that the call blocks until an event arrives.

The consume method of ConsumerMixin is:

def consume(self, limit=None, timeout=None, safety_interval=1, **kwargs):
    elapsed = 0
    with self.consumer_context(**kwargs) as (conn, channel, consumers):
        for i in limit and range(limit) or count():
            if self.should_stop:
                break
            self.on_iteration()
            try:
                conn.drain_events(timeout=safety_interval)
            except socket.timeout:
                conn.heartbeat_check()
                elapsed += safety_interval
                if timeout and elapsed >= timeout:
                    raise
            except socket.error:
                if not self.should_stop:
                    raise
            else:
                yield
                elapsed = 0
    debug('consume exiting')

If the consume method yielded on socket.timeout (after conn.heartbeat_check()), the first attempt would work.
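
A minimal sketch of what such a variant could look like, assuming an already set-up connection with consumers registered. The function name `consume_yielding_on_timeout`, the `should_stop` callable and the `FakeConnection` class are hypothetical and not part of kombu; only `drain_events(timeout=...)` and `heartbeat_check()` are taken from the consume method quoted above. The consumer_context setup and the socket.error handling are left out to keep it short.

```python
import socket
from itertools import count


def consume_yielding_on_timeout(conn, limit=None, safety_interval=1,
                                should_stop=lambda: False):
    """Sketch: like the drain loop in consume, but it also yields when idle.

    Yields True after an event was drained and False after a socket.timeout,
    so the caller regains control at least every `safety_interval` seconds.
    """
    for _ in (range(limit) if limit else count()):
        if should_stop():
            break
        try:
            conn.drain_events(timeout=safety_interval)
        except socket.timeout:
            conn.heartbeat_check()
            yield False  # idle tick: nothing arrived, hand control back
        else:
            yield True  # an event was handled by the consumer callbacks


class FakeConnection:
    """Hypothetical stand-in for a kombu Connection, for demonstration only."""

    def __init__(self, pattern):
        self.pattern = list(pattern)  # True = event ready, False = idle
        self.heartbeats = 0

    def drain_events(self, timeout=None):
        if not self.pattern.pop(0):
            raise socket.timeout()

    def heartbeat_check(self):
        self.heartbeats += 1


conn = FakeConnection([False, True, False])
ticks = list(consume_yielding_on_timeout(conn, limit=3))
```

With a generator like this, the outer poller keeps a single connection open and simply calls next() once per callback, whether or not an event arrived.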

Could kombu add an option for consume to yield in such cases? This would make integration lighter.

For now we have to re-implement this method. In the end we didn't use ConsumerMixin at all and went with this instead:

def events_poller_generator(self, safety_interval=1):
    """Iterator to poll events; this is our continuation.

    This is handy because we want to be able to re-establish
    the connection when it is lost.

    Also note that we do not use ConsumerMixin,
    because its consume method does not yield on socket.timeout;
    it only yields when there is an incoming event.
    On the other hand, if we use the timeout parameter,
    it will make a new connection to the service each time,
    which is far too much overhead.
    """
    while True:
        try:
            # connect
            conn = self.connection.clone()
            conn.ensure_connection(self.on_connection_error, None)
            # define consumer
            consumer = Consumer(
                conn.default_channel,
                queues=[self.queue],
                callbacks=[self.process_task],
                on_decode_error=self.on_decode_error)
            # register
            consumer.consume()
            while True:
                try:
                    # poll events
                    conn.drain_events(timeout=safety_interval)
                except socket.timeout:
                    conn.heartbeat_check()
                yield
        except self.connection.connection_errors as exc:
            # log the caught exception; connection_errors itself is a
            # tuple of exception classes and breaks '%' formatting
            gen_log.warning('Connection to broker lost... %s', exc)

def _handle_loop(self):
    """Poll event during tornado io loop
    """
    # just go ahead
    next(self.events_poller)

avinassh commented Feb 7, 2016

Any update on this?

I'm trying to figure out how to use Kombu asynchronously in a Tornado IOLoop.

@alexgarel

@avinassh the code above is for Tornado, with a class like this:

class BrokerClient(object):

    def __init__(self):
        self.connection = Connection("<broker_url>")
        # use tornado io_loop
        pc = ioloop.PeriodicCallback(self._handle_loop, 1)
        self.events_poller = self.events_poller_generator()
        pc.start()
        httpserver.HTTPServer(application).listen(options.port)

Add the methods above (events_poller_generator and _handle_loop) to it. Then, to start it:

io_loop = ioloop.IOLoop.instance()
BrokerClient()
io_loop.start()

Note: this code is stripped down from a larger program, so I may have missed something, but it should give you a good starting point.


avinassh commented Feb 7, 2016

Thanks @alexgarel! I will try that code and report back here :D

@ask ask added this to the v4.1 milestone Jul 8, 2016
@auvipy auvipy modified the milestones: v4.1, 5.0 Jan 13, 2018
@auvipy auvipy modified the milestones: 5.0, 4.7 May 5, 2020
@auvipy auvipy modified the milestones: 5.1.0, 6.0 Sep 9, 2021