Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added the possibility of synchronous initialization #438

Open
wants to merge 6 commits into
base: master
Choose a base branch
from

Conversation

s0d3s
Copy link

@s0d3s s0d3s commented Aug 21, 2022

What do these changes do?

Prior to this commit, you could only initialize a queue in an asynchronous function. Now it is possible in synchronous, but you need to pass the event loop (even empty\not running) to the constructor

Just a new way to init janus. But gives more ways to handle exceptions and code manage.

(Minimum code to understand the changes)

Before

import asyncio
import janus

def async_f(async_q: janus.AsyncQueue[int]):
    ...
def sync_f(sync_q: janus.SyncQueue[int]):
    ...
async def main() -> None:
    loop = asyncio.get_running_loop()
    queue: janus.Queue[int] = janus.Queue()

    loop.run_in_executor(None, sync_f, queue.sync_q)
    await async_f(queue.async_q)
    ...

if __name__ == "__main__":
    asyncio.run(main())

Now

import asyncio
import janus

def async_f(async_q: janus.AsyncQueue[int]):
    ...
def sync_f(sync_q: janus.SyncQueue[int]):
    ...

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    queue: janus.Queue[int] = janus.Queue(loop=loop)
    loop.create_task(async_f(queue.async_q))
    loop.run_in_executor(None, sync_f, queue.sync_q)
    try:
         loop.run_forever()
    except KeyboardInterrupt:
         pass

Are there changes in behavior for the user?

There are no behavior changes for users.

Checklist

  • I think the code is well written
  • Unit tests for the changes exist
  • Documentation reflects the changes
  • Add a new news fragment into the CHANGES folder
    • name it <issue_id>.<type> (e.g. 588.bugfix)
    • if you don't have an issue_id change it to the pr id after creating the PR
    • ensure type is one of the following:
      • .feature: Signifying a new feature.
      • .bugfix: Signifying a bug fix.
      • .doc: Signifying a documentation improvement.
      • .removal: Signifying a deprecation or removal of public API.
      • .misc: A ticket has been closed, but it is not of interest to users.
    • Make sure to use full sentences with correct case and punctuation, for example: Fix issue with non-ascii contents in doctest text files.

Prior to this commit, you could only initialize a queue in an asynchronous function. Now it is possible in synchronous, but you need to pass the event loop (even empty\not running) to the constructor
@jettify jettify mentioned this pull request Mar 18, 2023
@ghost
Copy link

ghost commented Mar 22, 2023

@s0d3s The asyncio.Condition and Event in the Queue.init should also pass the self._loop. Otherwise, it will raise the following Error

RuntimeError: Task <Task pending name='Task-1' coro=<run_ace_server.<locals>.xxxx() running at server.py:21>> got Future <Future pending> attached to a different loop

@asvetlov
Copy link
Member

Calling asyncio.get_event_loop() from non-async code is soft-deprecated now.
Please provide a realistic usage example that doesn't contain the deprecated call.

@asvetlov
Copy link
Member

asvetlov commented Mar 22, 2023

I can imagine an alternative design with postponed initialization.
The tricky part is that: threading synchronization primitives can be created from any thread, but async counterparts should be called from running asyncio loop only.
There are two usage scenarios: async API is called first, and sync API is called first.

The first case is trivial: all structures are initialized as now but later in time.

The second case is more complex: sync init could instantiate only threading.Lock and threading.Condition instances leaving async counterparts untouched. In this case, the first async call, performed after the sync call, should instantiate asyncio.Lock / asyncio.Condition instances and put them in the proper state according to self._maxsize and len(self._queue) combination. _notify_async_full() and _notify_async_not_empty() should be called as well if needed.

Double-checked locking should be used b calling both sync and async inits to prevent racing.

The proposal is viable but not trivial.
It requires careful thinking of processing a semi-initialized queue state in a multithreaded environment.
It is an exciting challenge, and I will happily review a pull request from a champion.

@s0d3s
Copy link
Author

s0d3s commented Mar 26, 2023

It's not such a challenge, but it's accepted😉 Wait for the news soon.

s0d3s and others added 3 commits March 27, 2023 00:48
… the Queue.

The behavior of all functions after initialization has not changed. Also the duplicated code from `_notify_(a)sync_not_(empty/full)` was moved to a separate `_notify_(a)sync_condition` function. As a result, the bug that `_notify_sync_not_empty` did not add a handler to `_pending` was also fixed.

Prior to full initialization, some `Queue` attributes are replaced with dummies. It is worth noting that `async_q` is replaced with an instance of the `PreInitDummyAsyncQueue` class before `Queue` is fully initialized. Although after full initialization, `Queue.async_q` is replaced by the desired object, it is worth remembering that the reference to `PreInitDummyAsyncQueue` obj could remain with the user. However, this is not a problem since after initialization, the dummy starts working as a proxy.
@s0d3s
Copy link
Author

s0d3s commented Mar 30, 2023

I set myself the main task - to introduce new functionality while not changing the behavior of the existing one. The decisions made to achieve this may seem controversial (for a short introduction, see the commit comment). Nevertheless, all tasks were completed.

Now the following is possible:

  • Using janus in ONLY synchronous mode
import janus

if __name__ == '__main__':
   items_count = 10
   queue = janus.Queue(maxsize=items_count, init_async_part=False)

   for i in range(items_count):
       queue.sync_q.put(i)

   while not queue.sync_q.empty():
       print(queue.sync_q.get(), end=" ")
  • Possibility of post initialization of asynchronous parts
    • direct
    import threading
    import asyncio
    import janus
    
    if __name__ == '__main__':
        items_count = 10
        queue = janus.Queue(maxsize=items_count, init_async_part=False)
    
        all_done = threading.Event()
    
        def wait_init_the_get(sync_q: janus.SyncQueue):
            # Also now you can wait for init using `full_init` event
            # sync_q._parent.full_init.wait()
            all_done.wait()
    
            while not sync_q.empty():
                print(sync_q.get(), end=" ")
                sync_q.task_done()
    
        async def init_async_then_put(queue_: janus.Queue):
            queue_.trigger_async_initialization()
    
            for i in range(items_count):
                await queue_.async_q.put(i)
    
            all_done.set()
    
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    
        loop.create_task(init_async_then_put(queue))
        loop.run_until_complete(loop.run_in_executor(None, wait_init_the_get, queue.sync_q))
        loop.close()
    • via proxy
    import threading
    import asyncio
    import janus
    
    from typing import Union
    
    if __name__ == '__main__':
        items_count = 10
        queue = janus.Queue(maxsize=items_count, init_async_part=False)
        all_done = threading.Event()
    
        def wait_init_the_get(sync_q: janus.SyncQueue):
            all_done.wait()
    
            while not sync_q.empty():
                print(sync_q.get(), end=" ")
                sync_q.task_done()
    
        async def init_async_then_put(async_q: Union[janus.AsyncQueue, janus.PreInitDummyAsyncQueue]):
    
            for i in range(items_count):
                # asynchronous parts will be automatically initialized
                # the first time the "asynchronous queue" attributes are accessed
                # (in fact, until the moment of full initialization,
                #  this will be a stub object not real `async_q`)
                await async_q.put(i)
    
            all_done.set()
    
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    
        loop.create_task(init_async_then_put(queue.async_q))
        loop.run_until_complete(loop.run_in_executor(None, wait_init_the_get, queue.sync_q))
        loop.close()

P.S. @asvetlov I took note of your comments, but found no use for them.
P.S.S. After re-reading it again, it seemed to me that you maybe meant the possibility of initializing only the asynchronous part (without the synchronous one), but I don’t see the point in this, since the synchronous part cannot “in any way” interfere with the asynchronous one. Probably I misunderstood you =/

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants