Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions kombu/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,11 @@ def declaration_cached(entity, channel):
return entity in channel.connection.client.declared_entities


def maybe_declare(entity, channel=None, retry=False, **retry_policy):
async def maybe_declare(entity, channel=None, retry=False, **retry_policy):
"""Declare entity (cached)."""
if retry:
return _imaybe_declare(entity, channel, **retry_policy)
return _maybe_declare(entity, channel)
return await _imaybe_declare(entity, channel, **retry_policy)
return await _maybe_declare(entity, channel)


def _ensure_channel_is_bound(entity, channel):
Expand All @@ -135,7 +135,7 @@ def _ensure_channel_is_bound(entity, channel):
return entity


def _maybe_declare(entity, channel):
async def _maybe_declare(entity, channel):
# _maybe_declare sets name on original for autogen queues
orig = entity

Expand All @@ -156,21 +156,21 @@ def _maybe_declare(entity, channel):

if not channel.connection:
raise RecoverableConnectionError('channel disconnected')
entity.declare(channel=channel)
await entity.declare(channel=channel)
if declared is not None and ident:
declared.add(ident)
if orig is not None:
orig.name = entity.name
return True


def _imaybe_declare(entity, channel, **retry_policy):
async def _imaybe_declare(entity, channel, **retry_policy):
_ensure_channel_is_bound(entity, channel)

if not entity.channel.connection:
raise RecoverableConnectionError('channel disconnected')

return entity.channel.connection.client.ensure(
return await entity.channel.connection.client.ensure(
entity, _maybe_declare, **retry_policy)(entity, channel)


Expand Down