"""A generally useful event scheduler class. | |
Each instance of this class manages its own queue. | |
No multi-threading is implied; you are supposed to hack that | |
yourself, or use a single instance per application. | |
Each instance is parametrized with two functions, one that is | |
supposed to return the current time, one that is supposed to | |
implement a delay. You can implement real-time scheduling by | |
substituting time and sleep from built-in module time, or you can | |
implement simulated time by writing your own functions. This can | |
also be used to integrate scheduling with STDWIN events; the delay | |
function is allowed to modify the queue. Time can be expressed as | |
integers or floating point numbers, as long as it is consistent. | |
Events are specified by tuples (time, priority, action, argument). | |
As in UNIX, lower priority numbers mean higher priority; in this | |
way the queue can be maintained as a priority queue. Execution of the | |
event means calling the action function, passing it the argument | |
sequence in "argument" (remember that in Python, multiple function | |
arguments are be packed in a sequence). | |
The action function may be an instance method so it | |
has another way to reference private data (besides global variables). | |
""" | |
# XXX The timefunc and delayfunc should have been defined as methods | |
# XXX so you can define new kinds of schedulers using subclassing | |
# XXX instead of having to define a module or class just to hold | |
# XXX the global state of your particular time and delay functions. | |
import heapq | |
from collections import namedtuple | |
__all__ = ["scheduler"] | |
Event = namedtuple('Event', 'time, priority, action, argument') | |
class scheduler: | |
def __init__(self, timefunc, delayfunc): | |
"""Initialize a new instance, passing the time and delay | |
functions""" | |
self._queue = [] | |
self.timefunc = timefunc | |
self.delayfunc = delayfunc | |
def enterabs(self, time, priority, action, argument): | |
"""Enter a new event in the queue at an absolute time. | |
Returns an ID for the event which can be used to remove it, | |
if necessary. | |
""" | |
event = Event(time, priority, action, argument) | |
heapq.heappush(self._queue, event) | |
return event # The ID | |
def enter(self, delay, priority, action, argument): | |
"""A variant that specifies the time as a relative time. | |
This is actually the more commonly used interface. | |
""" | |
time = self.timefunc() + delay | |
return self.enterabs(time, priority, action, argument) | |
def cancel(self, event): | |
"""Remove an event from the queue. | |
This must be presented the ID as returned by enter(). | |
If the event is not in the queue, this raises ValueError. | |
""" | |
self._queue.remove(event) | |
heapq.heapify(self._queue) | |
def empty(self): | |
"""Check whether the queue is empty.""" | |
return not self._queue | |
def run(self): | |
"""Execute events until the queue is empty. | |
When there is a positive delay until the first event, the | |
delay function is called and the event is left in the queue; | |
otherwise, the event is removed from the queue and executed | |
(its action function is called, passing it the argument). If | |
the delay function returns prematurely, it is simply | |
restarted. | |
It is legal for both the delay function and the action | |
function to to modify the queue or to raise an exception; | |
exceptions are not caught but the scheduler's state remains | |
well-defined so run() may be called again. | |
A questionable hack is added to allow other threads to run: | |
just after an event is executed, a delay of 0 is executed, to | |
avoid monopolizing the CPU when other threads are also | |
runnable. | |
""" | |
# localize variable access to minimize overhead | |
# and to improve thread safety | |
q = self._queue | |
delayfunc = self.delayfunc | |
timefunc = self.timefunc | |
pop = heapq.heappop | |
while q: | |
time, priority, action, argument = checked_event = q[0] | |
now = timefunc() | |
if now < time: | |
delayfunc(time - now) | |
else: | |
event = pop(q) | |
# Verify that the event was not removed or altered | |
# by another thread after we last looked at q[0]. | |
if event is checked_event: | |
action(*argument) | |
delayfunc(0) # Let other threads run | |
else: | |
heapq.heappush(q, event) | |
@property | |
def queue(self): | |
"""An ordered list of upcoming events. | |
Events are named tuples with fields for: | |
time, priority, action, arguments | |
""" | |
# Use heapq to sort the queue rather than using 'sorted(self._queue)'. | |
# With heapq, two events scheduled at the same time will show in | |
# the actual order they would be retrieved. | |
events = self._queue[:] | |
return map(heapq.heappop, [events]*len(events)) |