source

함수 호출의 타임아웃

itover 2022. 10. 18. 23:18
반응형

함수 호출의 타임아웃

Python의 함수를 호출합니다.이 함수는 정지하여 스크립트를 강제로 재시작할 수 있습니다.

함수 호출 방법 또는 5초 이상 걸릴 경우 스크립트에 의해 취소되고 다른 작업이 수행되도록 함수는 무엇으로 묶어야 합니까?

UNIX 상에서 동작하고 있는 경우는, 시그널 패키지를 사용할 수 있습니다.

In [1]: import signal

# Register an handler for the timeout
In [2]: def handler(signum, frame):
   ...:     print("Forever is over!")
   ...:     raise Exception("end of time")
   ...: 

# This function *may* run for an indetermined time...
In [3]: def loop_forever():
   ...:     import time
   ...:     while 1:
   ...:         print("sec")
   ...:         time.sleep(1)
   ...:         
   ...:         

# Register the signal function handler
In [4]: signal.signal(signal.SIGALRM, handler)
Out[4]: 0

# Define a timeout for your function
In [5]: signal.alarm(10)
Out[5]: 0

In [6]: try:
   ...:     loop_forever()
   ...: except Exception, exc: 
   ...:     print(exc)
   ....: 
sec
sec
sec
sec
sec
sec
sec
sec
Forever is over!
end of time

# Cancel the timer if the function returned before timeout
# (ok, mine won't but yours maybe will :)
In [7]: signal.alarm(0)
Out[7]: 0

후 '10초'signal.alarm(10)핸들러가 호출됩니다.이것에 의해, 통상의 Python 코드에서 대행 수신할 수 있는 예외가 발생합니다.

이 모듈은 스레드와 잘 어울리지 않습니다(그렇다면 누가 잘 작동합니까?).

타임아웃이 발생했을 때 예외를 발생시키므로, 예를 들어 다음과 같은 기능 중 하나가 함수 내부에서 검출되어 무시될 수 있습니다.

def loop_forever():
    while 1:
        print('sec')
        try:
            time.sleep(10)
        except:
            continue

하시면 됩니다.multiprocessing.Process★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★

코드

import multiprocessing
import time

# bar
def bar():
    for i in range(100):
        print "Tick"
        time.sleep(1)

if __name__ == '__main__':
    # Start bar as a process
    p = multiprocessing.Process(target=bar)
    p.start()

    # Wait for 10 seconds or until process finishes
    p.join(10)

    # If thread is still active
    if p.is_alive():
        print "running... let's kill it..."

        # Terminate - may not work if process is stuck for good
        p.terminate()
        # OR Kill - will work for sure, no chance for process to finish nicely however
        # p.kill()

        p.join()

함수를 호출하려면 어떻게 해야 하나요, 아니면 어떤 형태로 묶어야 하나요? 5초 이상 걸리면 스크립트가 취소하도록 해야 하나요?

저는 이 질문/문제를 해결하는 요지를 데코레이터와 함께 올렸습니다.threading.Timer여기 내역이 있습니다.

호환성 가져오기 및 설정

Python 2와 3으로 테스트되었습니다.Unix/Linux 및 Windows에서도 동작합니다.

우선 수입품.Python 버전에 관계없이 코드 일관성을 유지하려고 합니다.

from __future__ import print_function
import sys
import threading
from time import sleep
try:
    import thread
except ImportError:
    import _thread as thread

버전 독립 코드 사용:

try:
    range, _print = xrange, print
    def print(*args, **kwargs): 
        flush = kwargs.pop('flush', False)
        _print(*args, **kwargs)
        if flush:
            kwargs.get('file', sys.stdout).flush()            
except NameError:
    pass

이제 표준 라이브러리에서 기능을 Import했습니다.

exit_after

다음, '', '끝나다', '끝나다' 합니다.main()위위스스스 :

def quit_function(fn_name):
    # print to stderr, unbuffered in Python 2.
    print('{0} took too long'.format(fn_name), file=sys.stderr)
    sys.stderr.flush() # Python 3 stderr is likely buffered.
    thread.interrupt_main() # raises KeyboardInterrupt

그리고 여기 데코레이터가 있습니다.

def exit_after(s):
    '''
    use as decorator to exit process if 
    function takes longer than s seconds
    '''
    def outer(fn):
        def inner(*args, **kwargs):
            timer = threading.Timer(s, quit_function, args=[fn.__name__])
            timer.start()
            try:
                result = fn(*args, **kwargs)
            finally:
                timer.cancel()
            return result
        return inner
    return outer

사용.

5초 후 종료에 대한 질문에 직접 답변하는 사용법은 다음과 같습니다.

@exit_after(5)
def countdown(n):
    print('countdown started', flush=True)
    for i in range(n, -1, -1):
        print(i, end=', ', flush=True)
        sleep(1)
    print('countdown finished')

데모:

>>> countdown(3)
countdown started
3, 2, 1, 0, countdown finished
>>> countdown(10)
countdown started
10, 9, 8, 7, 6, countdown took too long
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 6, in countdown
KeyboardInterrupt

두 번째 함수 호출이 종료되지 않고 프로세스가 트레이스 백과 함께 종료됩니다.

KeyboardInterrupt

Windows의 Python 2에서는 다음과 같은 키보드 인터럽트로 인해 sleep이 항상 중단되는 것은 아닙니다.

@exit_after(1)
def sleep10():
    sleep(10)
    print('slept 10 seconds')

>>> sleep10()
sleep10 took too long         # Note that it hangs here about 9 more seconds
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 3, in sleep10
KeyboardInterrupt

, 「 」, 「 」, 「 」, 「 」를 한, 로 동작하고 있는 .PyErr_CheckSignals(), Cython, Python 및 Keyboard인터럽트 무시

어떤 경우에도 1초 이상 sleeve 하는 것은 피하고 싶습니다.프로세서 타임으로 보면 1초 이상입니다.

함수 호출 방법 또는 5초 이상 걸릴 경우 스크립트에 의해 취소되고 다른 작업이 수행되도록 함수는 무엇으로 묶어야 합니까?

키보드를 잡고 다른 작업을 하려면 키보드를 잡으면 됩니다.방해하다.

>>> try:
...     countdown(10)
... except KeyboardInterrupt:
...     print('do something else')
... 
countdown started
10, 9, 8, 7, 6, countdown took too long
do something else

다른 제안서를 가지고 있습니다.그것은 순수한 함수(스레딩 제안서와 같은 API를 사용)로, 정상적으로 동작하고 있는 것 같습니다(이 스레드 제안서에 근거하고 있습니다).

def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
    import signal

    class TimeoutError(Exception):
        pass

    def handler(signum, frame):
        raise TimeoutError()

    # set the timeout handler
    signal.signal(signal.SIGALRM, handler) 
    signal.alarm(timeout_duration)
    try:
        result = func(*args, **kwargs)
    except TimeoutError as exc:
        result = default
    finally:
        signal.alarm(0)

    return result

유닛 테스트에서 타임아웃콜을 검색할 때 이 스레드를 찾았습니다.답변이나 서드파티 패키지에서 간단한 것을 찾을 수 없었기 때문에 아래의 데코레이터는 코드에 바로 넣을 수 있습니다.

import multiprocessing.pool
import functools

def timeout(max_timeout):
    """Timeout decorator, parameter in seconds."""
    def timeout_decorator(item):
        """Wrap the original function."""
        @functools.wraps(item)
        def func_wrapper(*args, **kwargs):
            """Closure for function."""
            pool = multiprocessing.pool.ThreadPool(processes=1)
            async_result = pool.apply_async(item, args, kwargs)
            # raises a TimeoutError if execution exceeds max_timeout
            return async_result.get(max_timeout)
        return func_wrapper
    return timeout_decorator

테스트 또는 원하는 기능의 타임아웃은 다음과 같이 간단합니다.

@timeout(5.0)  # if execution takes longer than 5 seconds, raise a TimeoutError
def test_base_regression(self):
    ...

stopit잘 처리하는 것 .pypi는 타임아웃을 잘 처리하는 것 같습니다.

는 음에들이 에 들어요.@stopit.threading_timeoutable데코레이터를 추가하다timeout파라미터는 장식된 함수에 대한 것으로, 예상대로 기능합니다.

pypi에서 확인하세요.https://pypi.python.org/pypi/stopit

저는 wrapt_timeout_decorator의 저자입니다.

여기에 제시된 솔루션의 대부분은 언뜻 보면 Linux에서 제대로 동작합니다.이것은 fork()와 signals()가 있기 때문입니다만, Windows 에서는 상황이 조금 다릅니다.또한 Linux의 서브스레드에 관해서는 Signals를 사용할 수 없습니다.

Windows에서 프로세스를 생성하기 위해서는 선택 가능해야 하며 많은 장식된 함수나 클래스 메서드는 선택 불가능해야 합니다.

따라서 dill이나 multiprocess(피클이나 multiprocessing이 아님)와 같은 더 나은 피클러를 사용해야 합니다.이 때문에 ProcessPoolExecutor(또는 기능이 제한된 경우에만)를 사용할 수 없습니다.

타임아웃 자체에 대해서는 타임아웃의 의미를 정의할 필요가 있습니다.Windows 에서는 프로세스를 생성하는 데 상당한(확정할 수 없는) 시간이 걸리기 때문입니다.이것은 짧은 타임아웃에서는 까다로울 수 있습니다.프로세스의 산란에는 약 0.5초(간단히!!!)가 소요된다고 가정합니다.0.2초의 타임아웃을 지정하면 어떻게 됩니까?0.5 + 0.2초 후에 기능이 타임아웃됩니까?(0.2초 동안 메서드를 실행).또는, 착신측 프로세스가 0.2초 후에 타임 아웃 되는가(이 경우, 장식된 함수는 생성되지 않기 때문에 항상 타임 아웃이 됩니다).

또한 중첩된 장식기는 고약할 수 있으며 하위 스레드에서 신호를 사용할 수 없습니다.진정한 범용 크로스 플랫폼 데코레이터를 만들고 싶다면 이 모든 것을 고려(테스트)해야 합니다.

그 외의 문제는, 예외를 발신자에게 되돌리는 것 외에, 로그의 문제도 있습니다(장식 기능에 사용되고 있는 경우는, 다른 프로세스의 파일에의 로그는 서포트되고 있지 않습니다).

모든 엣지 케이스를 커버하려고 했습니다.패키지 wrapt_timeout_decorator를 조사하거나 적어도 여기서 사용되는 unittests에서 영감을 얻은 Your own 솔루션을 테스트할 수 있습니다.

@Alexis Egermont - 아쉽게도 코멘트가 부족합니다.다른 사람이 당신에게 통지할 수 있을지도 모릅니다.당신의 수입 문제는 해결했다고 생각합니다.

여러 가지 제안이 있지만 동시에 사용하는 것은 없습니다.가장 읽기 쉬운 방법이라고 생각합니다.

from concurrent.futures import ProcessPoolExecutor

# Warning: this does not terminate function if timeout
def timeout_five(fnc, *args, **kwargs):
    with ProcessPoolExecutor() as p:
        f = p.submit(fnc, *args, **kwargs)
        return f.result(timeout=5)

읽기 쉽고 유지보수가 매우 간단합니다.

풀을 만들고 단일 프로세스를 전송한 후 TimeoutError가 발생할 때까지 최대 5초 동안 기다립니다.이 에러는 필요에 따라 검출하여 처리할 수 있습니다.

python 3.2+ 네이티브로 2.7로 역리포트(pip install future).

은, 「스스로의 교환」을 치환하는 할 수 .ProcessPoolExecutorThreadPoolExecutor.

타임아웃 시 프로세스를 종료하고 싶다면 Pebble을 검토하는 것이 좋습니다.

@piro를 기반으로 하여 답변을 확장하면 컨텍스트 매니저를 구축할 수 있습니다.이를 통해 실행 성공 후 알람 신호를 비활성화하는 매우 읽기 쉬운 코드를 사용할 수 있습니다(set signal.alarm(0)).

from contextlib import contextmanager
import signal
import time

@contextmanager
def timeout(duration):
    def timeout_handler(signum, frame):
        raise TimeoutError(f'block timedout after {duration} seconds')
    signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(duration)
    try:
        yield
    finally:
        signal.alarm(0)

def sleeper(duration):
    time.sleep(duration)
    print('finished')

사용 예:

In [19]: with timeout(2):
    ...:     sleeper(1)
    ...:     
finished

In [20]: with timeout(2):
    ...:     sleeper(3)
    ...:         
---------------------------------------------------------------------------
Exception                                 Traceback (most recent call last)
<ipython-input-20-66c78858116f> in <module>()
      1 with timeout(2):
----> 2     sleeper(3)
      3 

<ipython-input-7-a75b966bf7ac> in sleeper(t)
      1 def sleeper(t):
----> 2     time.sleep(t)
      3     print('finished')
      4 

<ipython-input-18-533b9e684466> in timeout_handler(signum, frame)
      2 def timeout(duration):
      3     def timeout_handler(signum, frame):
----> 4         raise Exception(f'block timedout after {duration} seconds')
      5     signal.signal(signal.SIGALRM, timeout_handler)
      6     signal.alarm(duration)

Exception: block timedout after 2 seconds

사용하기 쉽고 신뢰성 높은 PyPi 프로젝트 타임아웃 조정기(https://pypi.org/project/timeout-decorator/)

설치:

pip install timeout-decorator

사용방법:

import time
import timeout_decorator

@timeout_decorator.timeout(5)
def mytest():
    print "Start"
    for i in range(1,10):
        time.sleep(1)
        print "%d seconds have passed" % i

if __name__ == '__main__':
    mytest()

timeout-decorator하지 않음,하지 않음, Windows가 Windows를 지원하지 않음signal그냥...

윈도 시스템에서 타임아웃 디코레이터를 사용하면 다음 메시지가 나타납니다.

AttributeError: module 'signal' has no attribute 'SIGALRM'

분들은 '쓰자'를 쓰자는 .use_signals=False나한테는 안 통했어

작성자 @bitranox는 다음 패키지를 만들었습니다.

pip install https://github.com/bitranox/wrapt-timeout-decorator/archive/master.zip

코드 샘플:

import time
from wrapt_timeout_decorator import *

@timeout(5)
def mytest(message):
    print(message)
    for i in range(1,10):
        time.sleep(1)
        print('{} seconds have passed'.format(i))

def main():
    mytest('starting')


if __name__ == '__main__':
    main()

다음 예외가 있습니다.

TimeoutError: Function mytest timed out after 5 seconds

하이라이트

  • 시간 초과 시 경고에 대한 사용 예외 발생 - 쉽게 수정할 수 있음
  • 크로스 플랫폼: Windows 및 Mac OS X
  • 호환성:Python 3.6+ (나도 python 2.7에서 테스트했는데 작은 구문 조정으로 작동합니다)

패럴렐 맵에 대한 자세한 설명 및 확장에 대해서는http://https://flipdazed.github.io/blog/quant%20dev/parallel-functions-with-timeouts 를 참조해 주세요.

최소한의 예

>>> @killer_call(timeout=4)
... def bar(x):
...        import time
...        time.sleep(x)
...        return x
>>> bar(10)
Traceback (most recent call last):
  ...
__main__.TimeoutError: function 'bar' timed out after 4s

그리고 역시

>>> bar(2)
2

풀코드

import multiprocessing as mp
import multiprocessing.queues as mpq
import functools
import dill

from typing import Tuple, Callable, Dict, Optional, Iterable, List, Any

class TimeoutError(Exception):

    def __init__(self, func: Callable, timeout: int):
        self.t = timeout
        self.fname = func.__name__

    def __str__(self):
            return f"function '{self.fname}' timed out after {self.t}s"


def _lemmiwinks(func: Callable, args: Tuple, kwargs: Dict[str, Any], q: mp.Queue):
    """lemmiwinks crawls into the unknown"""
    q.put(dill.loads(func)(*args, **kwargs))


def killer_call(func: Callable = None, timeout: int = 10) -> Callable:
    """
    Single function call with a timeout

    Args:
        func: the function
        timeout: The timeout in seconds
    """

    if not isinstance(timeout, int):
        raise ValueError(f'timeout needs to be an int. Got: {timeout}')

    if func is None:
        return functools.partial(killer_call, timeout=timeout)

    @functools.wraps(killer_call)
    def _inners(*args, **kwargs) -> Any:
        q_worker = mp.Queue()
        proc = mp.Process(target=_lemmiwinks, args=(dill.dumps(func), args, kwargs, q_worker))
        proc.start()
        try:
            return q_worker.get(timeout=timeout)
        except mpq.Empty:
            raise TimeoutError(func, timeout)
        finally:
            try:
                proc.terminate()
            except:
                pass
    return _inners

if __name__ == '__main__':
    @killer_call(timeout=4)
    def bar(x):
        import time
        time.sleep(x)
        return x

    print(bar(2))
    bar(10)

메모들

다음 방법 때문에 함수 내부로 가져와야 합니다.dill작동하다.

이는 또한 이러한 기능이 다음 제품과 호환되지 않을 수 있음을 의미합니다.doctest타겟 기능 내에 Import가 있는 경우.에 문제가 생깁니다.__import__찾을 수 없습니다.

신호도 똑같이 사용할 수 있습니다.아래의 예가 당신에게 도움이 될 것 같습니다.스레드에 비해 매우 심플합니다.

import signal

def timeout(signum, frame):
    raise myException

#this is an infinite loop, never ending under normal circumstances
def main():
    print 'Starting Main ',
    while 1:
        print 'in main ',

#SIGALRM is only usable on a unix platform
signal.signal(signal.SIGALRM, timeout)

#change 5 to however many seconds you need
signal.alarm(5)

try:
    main()
except myException:
    print "whoops"

@piro의 답변을 바탕으로 누구에게나 도움이 될 수 있도록 기능 데코레이터를 만들었습니다.

import time
import signal
from functools import wraps


def timeout(timeout_secs: int):
    def wrapper(func):
        @wraps(func)
        def time_limited(*args, **kwargs):
            # Register an handler for the timeout
            def handler(signum, frame):
                raise Exception(f"Timeout for function '{func.__name__}'")

            # Register the signal function handler
            signal.signal(signal.SIGALRM, handler)

            # Define a timeout for your function
            signal.alarm(timeout_secs)

            result = None
            try:
                result = func(*args, **kwargs)
            except Exception as exc:
                raise exc
            finally:
                # disable the signal alarm
                signal.alarm(0)

            return result

        return time_limited

    return wrapper

와 함수의 래퍼 사용20 seconds타임아웃은 다음과 같습니다.

    @timeout(20)
    def my_slow_or_never_ending_function(name):
        while True:
            time.sleep(1)
            print(f"Yet another second passed {name}...")

    try:
        results = my_slow_or_never_ending_function("Yooo!")
    except Exception as e:
        print(f"ERROR: {e}")

asyncio를 사용하는 다른 솔루션:

실행 중인 메인코드의 타임아웃뿐만 아니라 백그라운드태스크를 취소하려면 메인스레드로부터의 명시적인 통신을 통해 태스크의 코드를 취소하도록 요구해야 합니다(스레드화 등).이벤트()

import asyncio
import functools
import multiprocessing
from concurrent.futures.thread import ThreadPoolExecutor


class SingletonTimeOut:
    pool = None

    @classmethod
    def run(cls, to_run: functools.partial, timeout: float):
        pool = cls.get_pool()
        loop = cls.get_loop()
        try:
            task = loop.run_in_executor(pool, to_run)
            return loop.run_until_complete(asyncio.wait_for(task, timeout=timeout))
        except asyncio.TimeoutError as e:
            error_type = type(e).__name__ #TODO
            raise e

    @classmethod
    def get_pool(cls):
        if cls.pool is None:
            cls.pool = ThreadPoolExecutor(multiprocessing.cpu_count())
        return cls.pool

    @classmethod
    def get_loop(cls):
        try:
            return asyncio.get_event_loop()
        except RuntimeError:
            asyncio.set_event_loop(asyncio.new_event_loop())
            # print("NEW LOOP" + str(threading.current_thread().ident))
            return asyncio.get_event_loop()

# ---------------

TIME_OUT = float('0.2')  # seconds

def toto(input_items,nb_predictions):
    return 1

to_run = functools.partial(toto,
                           input_items=1,
                           nb_predictions="a")

results = SingletonTimeOut.run(to_run, TIME_OUT)

#!/usr/bin/python2
import sys, subprocess, threading
proc = subprocess.Popen(sys.argv[2:])
timer = threading.Timer(float(sys.argv[1]), proc.terminate)
timer.start()
proc.wait()
timer.cancel()
exit(proc.returncode)

SIGALARM에서는 할 수 없는 네스트 가능한 시간 인터럽트가 필요했습니다.sleep(스레드 기반 접근 방식으로는 불가능)http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/에서 코드를 복사하고 가볍게 수정했습니다.

코드 자체:

#!/usr/bin/python

# lightly modified version of http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/


"""alarm.py: Permits multiple SIGALRM events to be queued.

Uses a `heapq` to store the objects to be called when an alarm signal is
raised, so that the next alarm is always at the top of the heap.
"""

import heapq
import signal
from time import time

__version__ = '$Revision: 2539 $'.split()[1]

alarmlist = []

__new_alarm = lambda t, f, a, k: (t + time(), f, a, k)
__next_alarm = lambda: int(round(alarmlist[0][0] - time())) if alarmlist else None
__set_alarm = lambda: signal.alarm(max(__next_alarm(), 1))


class TimeoutError(Exception):
    def __init__(self, message, id_=None):
        self.message = message
        self.id_ = id_


class Timeout:
    ''' id_ allows for nested timeouts. '''
    def __init__(self, id_=None, seconds=1, error_message='Timeout'):
        self.seconds = seconds
        self.error_message = error_message
        self.id_ = id_
    def handle_timeout(self):
        raise TimeoutError(self.error_message, self.id_)
    def __enter__(self):
        self.this_alarm = alarm(self.seconds, self.handle_timeout)
    def __exit__(self, type, value, traceback):
        try:
            cancel(self.this_alarm) 
        except ValueError:
            pass


def __clear_alarm():
    """Clear an existing alarm.

    If the alarm signal was set to a callable other than our own, queue the
    previous alarm settings.
    """
    oldsec = signal.alarm(0)
    oldfunc = signal.signal(signal.SIGALRM, __alarm_handler)
    if oldsec > 0 and oldfunc != __alarm_handler:
        heapq.heappush(alarmlist, (__new_alarm(oldsec, oldfunc, [], {})))


def __alarm_handler(*zargs):
    """Handle an alarm by calling any due heap entries and resetting the alarm.

    Note that multiple heap entries might get called, especially if calling an
    entry takes a lot of time.
    """
    try:
        nextt = __next_alarm()
        while nextt is not None and nextt <= 0:
            (tm, func, args, keys) = heapq.heappop(alarmlist)
            func(*args, **keys)
            nextt = __next_alarm()
    finally:
        if alarmlist: __set_alarm()


def alarm(sec, func, *args, **keys):
    """Set an alarm.

    When the alarm is raised in `sec` seconds, the handler will call `func`,
    passing `args` and `keys`. Return the heap entry (which is just a big
    tuple), so that it can be cancelled by calling `cancel()`.
    """
    __clear_alarm()
    try:
        newalarm = __new_alarm(sec, func, args, keys)
        heapq.heappush(alarmlist, newalarm)
        return newalarm
    finally:
        __set_alarm()


def cancel(alarm):
    """Cancel an alarm by passing the heap entry returned by `alarm()`.

    It is an error to try to cancel an alarm which has already occurred.
    """
    __clear_alarm()
    try:
        alarmlist.remove(alarm)
        heapq.heapify(alarmlist)
    finally:
        if alarmlist: __set_alarm()

및 사용 예:

import alarm
from time import sleep

try:
    with alarm.Timeout(id_='a', seconds=5):
        try:
            with alarm.Timeout(id_='b', seconds=2):
                sleep(3)
        except alarm.TimeoutError as e:
            print 'raised', e.id_
        sleep(30)
except alarm.TimeoutError as e:
    print 'raised', e.id_
else:
    print 'nope.'

같은 문제에 직면해 있습니다만, 서브 스레드에 대한 작업이 필요하기 때문에 신호가 작동하지 않기 때문에 python 패키지를 작성했습니다.이 문제를 해결하기 위한 타임아웃 타이머, 컨텍스트 또는 데코레이터로 사용하기 위한 지원, 신호 또는 서브 스레드 모듈을 사용하여 타임아웃 인터럽트를 트리거합니다.

from timeout_timer import timeout, TimeoutInterrupt

class TimeoutInterruptNested(TimeoutInterrupt):
    pass

def test_timeout_nested_loop_both_timeout(timer="thread"):
    cnt = 0
    try:
        with timeout(5, timer=timer):
            try:
                with timeout(2, timer=timer, exception=TimeoutInterruptNested):
                    sleep(2)
            except TimeoutInterruptNested:
                cnt += 1
            time.sleep(10)
    except TimeoutInterrupt:
        cnt += 1
    assert cnt == 2

상세보기 : https://github.com/dozysun/timeout-timer

여기에 제시된 스레드 기반 솔루션에 대한 약간의 개선점을 나타냅니다.

다음 코드는 예외를 지원합니다.

def runFunctionCatchExceptions(func, *args, **kwargs):
    try:
        result = func(*args, **kwargs)
    except Exception, message:
        return ["exception", message]

    return ["RESULT", result]


def runFunctionWithTimeout(func, args=(), kwargs={}, timeout_duration=10, default=None):
    import threading
    class InterruptableThread(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            self.result = default
        def run(self):
            self.result = runFunctionCatchExceptions(func, *args, **kwargs)
    it = InterruptableThread()
    it.start()
    it.join(timeout_duration)
    if it.isAlive():
        return default

    if it.result[0] == "exception":
        raise it.result[1]

    return it.result[1]

5초의 타임아웃으로 호출:

result = timeout(remote_calculate, (myarg,), timeout_duration=5)

POSIX 버전은 이전 답변의 많은 부분을 결합하여 다음과 같은 기능을 제공합니다.

  1. 실행을 차단하는 하위 프로세스입니다.
  2. 클래스 멤버 함수의 타임아웃 함수 사용.
  3. 종료까지의 시간에는 엄격한 요건이 있습니다.

다음은 코드와 몇 가지 테스트 케이스입니다.

import threading
import signal
import os
import time

class TerminateExecution(Exception):
    """
    Exception to indicate that execution has exceeded the preset running time.
    """


def quit_function(pid):
    # Killing all subprocesses
    os.setpgrp()
    os.killpg(0, signal.SIGTERM)

    # Killing the main thread
    os.kill(pid, signal.SIGTERM)


def handle_term(signum, frame):
    raise TerminateExecution()


def invoke_with_timeout(timeout, fn, *args, **kwargs):
    # Setting a sigterm handler and initiating a timer
    old_handler = signal.signal(signal.SIGTERM, handle_term)
    timer = threading.Timer(timeout, quit_function, args=[os.getpid()])
    terminate = False

    # Executing the function
    timer.start()
    try:
        result = fn(*args, **kwargs)
    except TerminateExecution:
        terminate = True
    finally:
        # Restoring original handler and cancel timer
        signal.signal(signal.SIGTERM, old_handler)
        timer.cancel()

    if terminate:
        raise BaseException("xxx")

    return result

### Test cases
def countdown(n):
    print('countdown started', flush=True)
    for i in range(n, -1, -1):
        print(i, end=', ', flush=True)
        time.sleep(1)
    print('countdown finished')
    return 1337


def really_long_function():
    time.sleep(10)


def really_long_function2():
    os.system("sleep 787")


# Checking that we can run a function as expected.
assert invoke_with_timeout(3, countdown, 1) == 1337

# Testing various scenarios
t1 = time.time()
try:
    print(invoke_with_timeout(1, countdown, 3))
    assert(False)
except BaseException:
    assert(time.time() - t1 < 1.1)
    print("All good", time.time() - t1)

t1 = time.time()
try:
    print(invoke_with_timeout(1, really_long_function2))
    assert(False)
except BaseException:
    assert(time.time() - t1 < 1.1)
    print("All good", time.time() - t1)


t1 = time.time()
try:
    print(invoke_with_timeout(1, really_long_function))
    assert(False)
except BaseException:
    assert(time.time() - t1 < 1.1)
    print("All good", time.time() - t1)

# Checking that classes are referenced and not
# copied (as would be the case with multiprocessing)


class X:
    def __init__(self):
        self.value = 0

    def set(self, v):
        self.value = v


x = X()
invoke_with_timeout(2, x.set, 9)
assert x.value == 9

다음으로 타임아웃을 사용하여1개의 메서드를 실행하고 성공한 경우 값을 취득하는 간단한 예를 나타냅니다.

import multiprocessing
import time

ret = {"foo": False}


def worker(queue):
    """worker function"""

    ret = queue.get()

    time.sleep(1)

    ret["foo"] = True
    queue.put(ret)


if __name__ == "__main__":
    queue = multiprocessing.Queue()
    queue.put(ret)

    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()
    p.join(timeout=10)

    if p.exitcode is None:
        print("The worker timed out.")
    else:
        print(f"The worker completed and returned: {queue.get()}")

언급URL : https://stackoverflow.com/questions/492519/timeout-on-a-function-call

반응형