agent with events #852

Open
wants to merge 17 commits into
base: main
Necr0x0Der
Collaborator

No description provided.

@Necr0x0Der Necr0x0Der marked this pull request as draft February 20, 2025 08:47
@Necr0x0Der Necr0x0Der marked this pull request as ready for review February 27, 2025 10:01
@Necr0x0Der Necr0x0Der requested a review from besSveta March 6, 2025 10:51
Contributor

@astroseger astroseger left a comment

Review from the point of view of the multi-threading implementation:

I.
If I understood correctly, a new thread is created inside
AgentObject.__metta_call__
and inside
EventObject.start

Does it mean that a separate thread is created for each event (and for each __metta_call__ of the object)? If yes, then it might be relatively expensive for some particular use cases (on my machine, creating and destroying 1000 threads costs ~0.05 s, plus some memory footprint). However, I'm not ready to propose an alternative right now.
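For anyone who wants to reproduce a number like this on their own machine, here is a minimal sketch of such a measurement. The function name measure_thread_churn is made up for illustration, and the result depends heavily on the OS and Python version, so treat it as a rough estimate only.

```python
import threading
import time


def measure_thread_churn(n_threads=1000):
    """Return the seconds spent creating, starting and joining no-op threads."""
    start = time.perf_counter()
    for _ in range(n_threads):
        # Each iteration pays the full cost of one thread's lifetime.
        t = threading.Thread(target=lambda: None)
        t.start()
        t.join()
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"1000 threads: {measure_thread_churn(1000):.3f} s")
```

This measures threads created one after another; many threads alive at the same time would add the memory footprint mentioned above.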

II.
In EventAgent.event_processor, if self.events is empty, the thread will consume 100% CPU by running

while True:

If I understood correctly, the following version should work, but it will not exit after self.running is set to False, because the thread stays blocked in get. Note, however, that the current code (the code from the PR, not what is proposed here) does not guarantee this either if event_processor can be called from different threads (two threads may both pass the self.events.empty check while only one of them consumes an item with get).

    def event_processor(self, *args):
        # `*args` received on `start`
        while self.running:
            # Blocks until an event arrives, so an empty queue costs no CPU
            (event_id, func, args) = self.events.get()
            with self.lock:
                # Wrapping into ValueAtom if arg is not an atom yet
                resp = self._metta.evaluate_atom(E(func,
                    *[a if isinstance(a, Atom) else ValueAtom(a) for a in args]))
                for r in resp:
                    self.outputs.put(r)

(We don't need self.lock at all if self._metta.evaluate_atom is thread-safe and we don't care about the order of results inside self.outputs.)
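To illustrate the check-then-get race mentioned above with the standard library only: a non-blocking get_nowait combined with handling queue.Empty does the check and the removal in one step, so a thread can never pass the check and then block forever. This is a sketch with made-up names (drain, run_drain_demo), not code from the PR.

```python
import queue
import threading


def drain(events, handled):
    """Consume items until the queue is empty, without a separate empty() check."""
    while True:
        try:
            # Raises queue.Empty instead of blocking if another thread
            # took the last item in the meantime.
            item = events.get_nowait()
        except queue.Empty:
            return
        handled.append(item)


def run_drain_demo(n_items=100, n_workers=4):
    events = queue.Queue()
    for i in range(n_items):
        events.put(i)
    handled = []
    workers = [threading.Thread(target=drain, args=(events, handled))
               for _ in range(n_workers)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return sorted(handled), events.empty()
```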

If we want self.events.get() to terminate gracefully (without running in daemon mode, for example) immediately after self.running is set to False, it gets tricky. Possible solutions (for the proposed version of the code):

  1. Use a timeout in get
  2. Use a "sentinel" object (put a termination object in the queue), but if there are many threads, we should put as many sentinel objects as there are threads
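A minimal sketch of the first option, using only the standard queue module. The TimeoutWorker class and the (func, args) event format are stand-ins for illustration and are not the EventAgent from the PR; the 0.1 s timeout is an arbitrary choice that bounds how long the thread takes to notice that running became False.

```python
import queue
import threading


class TimeoutWorker:
    """Hypothetical stand-in for an agent with an event loop."""

    def __init__(self):
        self.events = queue.Queue()
        self.outputs = queue.Queue()
        self.running = True

    def event_processor(self):
        while self.running:
            try:
                # Wake up at least every 0.1 s to re-check self.running.
                func, args = self.events.get(timeout=0.1)
            except queue.Empty:
                continue
            self.outputs.put(func(*args))

    def stop(self):
        self.running = False


def run_timeout_demo():
    worker = TimeoutWorker()
    thread = threading.Thread(target=worker.event_processor)
    thread.start()
    worker.events.put((pow, (2, 10)))
    result = worker.outputs.get(timeout=5)
    worker.stop()
    thread.join(timeout=5)
    return result, thread.is_alive()
```

The trade-off is a periodic wake-up even when there is nothing to do, which is still far cheaper than a busy loop.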

@astroseger
Contributor

An addition to my previous comment:
This comment is only relevant if we need event_processor to terminate gracefully after self.running is set to False.

In Python 3.13, queue has shutdown functionality. So, together with setting self.running to False, we could simply shut down the queue and handle the ShutDown exception. As I mentioned in my previous comment, before Python 3.13 the standard solution seems to be the sentinel object trick. By the way, I was wrong that we need as many sentinel objects as there are threads, since a thread can simply put the sentinel object back into the queue.
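Both variants can be sketched with the standard library. The names _STOP, processor, processor_shutdown and run_with_sentinel are made up for illustration, and the (func, args) event format is an assumption rather than the format used in the PR. The second function relies on queue.Queue.shutdown and queue.ShutDown, which exist only in Python 3.13 and later.

```python
import queue
import threading

_STOP = object()  # hypothetical sentinel; any unique object works


def processor(events, outputs):
    """Sentinel variant: works on any Python 3 version."""
    while True:
        item = events.get()
        if item is _STOP:
            # Put the sentinel back so the other workers see it too.
            events.put(_STOP)
            return
        func, args = item
        outputs.put(func(*args))


def processor_shutdown(events, outputs):
    """Shutdown variant: requires Python 3.13+."""
    while True:
        try:
            func, args = events.get()
        except queue.ShutDown:
            # Raised once the queue is shut down and has no items left.
            return
        outputs.put(func(*args))


def run_with_sentinel(n_workers=3):
    events, outputs = queue.Queue(), queue.Queue()
    workers = [threading.Thread(target=processor, args=(events, outputs))
               for _ in range(n_workers)]
    for w in workers:
        w.start()
    for i in range(5):
        events.put((pow, (i, 2)))
    events.put(_STOP)  # a single sentinel stops all workers
    for w in workers:
        w.join(timeout=5)
    results = []
    while not outputs.empty():
        results.append(outputs.get())
    return sorted(results), [w.is_alive() for w in workers]
```

Note that with the sentinel variant one sentinel stays in the queue after all workers have exited, so the queue should not be reused without removing it.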

@Necr0x0Der
Collaborator Author

@astroseger,
I. Well, yeah.... There are different scenarios, though:
I.1) The agent is not a daemon and doesn't have an event loop. Its methods are called directly, and start is not called. In this case, __metta_call__ returns a stream, which is typically consumed right away, and there is no real need to create a thread for it. However, someone may still want to not consume the stream immediately.
I.2) The agent is a daemon. In this case, the stream for the method called within __metta_call__ is not returned directly, and the method is evaluated in the background in a separate thread, which is the intended behavior. There is no start method, though, unless it is an EventAgent, so no double work on stream creation is done.
I.3) The agent is an EventAgent and has an event loop created by start. In this case, it should not typically be a daemon. Initially, I created it as a daemon by default, but it instantly became obvious that there is no need for this. In this case, events are handled asynchronously, while other methods are evaluated immediately when called directly, unless the caller doesn't consume the stream they create right away.
I.4) The agent is both an EventAgent and a daemon. Here, a thread will be created for the event processor as well as for each __metta_call__. I don't see use cases for this atm. But if there are, this behavior should be intentional.
Concluding, it may make sense to create a stream in __metta_call__ only for daemons. But I'm not precisely sure. The current approach was implemented when there was no EventAgent, and wrapping all calls into a stream might be redundant now. But this way is more flexible: it is up to the caller whether to wait for the stream to finish or not. OTOH, there is indeed an overhead, which might be unnecessary (but it's not really the case that a separate thread is created for each event).

4 participants