diff --git a/timeout_decorator/timeout_decorator.py b/timeout_decorator/timeout_decorator.py index f05da0f..c8b01fe 100644 --- a/timeout_decorator/timeout_decorator.py +++ b/timeout_decorator/timeout_decorator.py @@ -94,24 +94,24 @@ def new_function(*args, **kwargs): return decorate -def _target(queue, function, *args, **kwargs): +def _target(child_conn, function, *args, **kwargs): """Run a function with arguments and return output via a queue. - This is a helper function for the Process created in _Timeout. It runs the function with positional arguments and keyword arguments and then returns the function's output by way of a queue. If an exception gets raised, it is returned to _Timeout to be raised by the value property. """ try: - queue.put((True, function(*args, **kwargs))) + child_conn.send((True, function(*args, **kwargs))) except: - queue.put((False, sys.exc_info()[1])) + child_conn.send((False, sys.exc_info()[1])) + finally: + child_conn.close() class _Timeout(object): """Wrap a function and add a timeout (limit) attribute to it. - Instances of this class are automatically generated by the add_timeout function defined above. Wrapping a function allows asynchronous calls to be made and termination of execution after a timeout has passed. @@ -125,49 +125,42 @@ def __init__(self, function, timeout_exception, exception_message, limit): self.__exception_message = exception_message self.__name__ = function.__name__ self.__doc__ = function.__doc__ - self.__timeout = time.time() - self.__process = multiprocessing.Process() - self.__queue = multiprocessing.Queue() + self.__process = None + self.__parent_conn = None + self.__child_conn = None def __call__(self, *args, **kwargs): """Execute the embedded function object asynchronously. - The function given to the constructor is transparently called and requires that "ready" be intermittently polled. If and when it is True, the "value" property may then be checked for returned data. """ - self.__limit = kwargs.pop('timeout', self.__limit) - self.__queue = multiprocessing.Queue(1) - args = (self.__queue, self.__function) + args - self.__process = multiprocessing.Process(target=_target, - args=args, - kwargs=kwargs) + self.__parent_conn, self.__child_conn = multiprocessing.Pipe(duplex=False) + + args = (self.__child_conn, self.__function) + args + self.__process = multiprocessing.Process(target=_target, args=args, kwargs=kwargs) self.__process.daemon = True self.__process.start() - self.__timeout = self.__limit + time.time() - while not self.ready: - time.sleep(0.01) - return self.value + if self.__parent_conn.poll(self.__limit): + return self.value + else: + self.cancel() def cancel(self): """Terminate any possible execution of the embedded function.""" + self.__parent_conn.close() if self.__process.is_alive(): self.__process.terminate() - _raise_exception(self.__timeout_exception, self.__exception_message) - @property - def ready(self): - """Read-only property indicating status of "value" property.""" - if self.__timeout < time.time(): - self.cancel() - return self.__queue.full() and not self.__queue.empty() - @property def value(self): - """Read-only property containing data returned from function.""" - if self.ready is True: - flag, load = self.__queue.get() - if flag: - return load - raise load + flag, load = self.__parent_conn.recv() + self.__parent_conn.close() + # when self.__parent_conn.recv() exits, maybe __process is still alive, + # then it might zombie the process. so join it explicitly + self.__process.join(1) + + if flag: + return load + raise load