mirror of
https://github.com/moparisthebest/SickRage
synced 2024-11-14 05:15:12 -05:00
145 lines
5.1 KiB
Python
145 lines
5.1 KiB
Python
"""Bridges between the `asyncio` module and Tornado IOLoop.
|
|
|
|
This is a work in progress and interfaces are subject to change.
|
|
|
|
To test:
|
|
python3.4 -m tornado.test.runtests --ioloop=tornado.platform.asyncio.AsyncIOLoop
|
|
python3.4 -m tornado.test.runtests --ioloop=tornado.platform.asyncio.AsyncIOMainLoop
|
|
(the tests log a few warnings with AsyncIOMainLoop because they leave some
|
|
unfinished callbacks on the event loop that fail when it resumes)
|
|
"""
|
|
|
|
from __future__ import absolute_import, division, print_function, with_statement
|
|
import datetime
|
|
import functools
|
|
|
|
# _Timeout is used for its timedelta_to_seconds method for py26 compatibility.
|
|
from tornado.ioloop import IOLoop, _Timeout
|
|
from tornado import stack_context
|
|
|
|
try:
|
|
# Import the real asyncio module for py33+ first. Older versions of the
|
|
# trollius backport also use this name.
|
|
import asyncio
|
|
except ImportError as e:
|
|
# Asyncio itself isn't available; see if trollius is (backport to py26+).
|
|
try:
|
|
import trollius as asyncio
|
|
except ImportError:
|
|
# Re-raise the original asyncio error, not the trollius one.
|
|
raise e
|
|
|
|
class BaseAsyncIOLoop(IOLoop):
    """Tornado ``IOLoop`` that delegates its work to an asyncio event loop.

    Subclasses decide which asyncio (or trollius) loop is wrapped, and
    whether this object owns it, by passing the loop and ``close_loop`` to
    `initialize`.
    """

    def initialize(self, asyncio_loop, close_loop=False):
        """Store the wrapped loop and reset the handler bookkeeping.

        :arg asyncio_loop: the event loop that every operation is
            forwarded to.
        :arg bool close_loop: if true, `close` also closes ``asyncio_loop``.
        """
        self.asyncio_loop = asyncio_loop
        self.close_loop = close_loop
        # Scheduled on the loop rather than invoked directly, so
        # make_current runs when the asyncio loop processes its callbacks.
        self.asyncio_loop.call_soon(self.make_current)
        # Maps fd to (fileobj, handler function) pair (as in IOLoop.add_handler)
        self.handlers = {}
        # Set of fds listening for reads/writes
        self.readers = set()
        self.writers = set()
        # Set by close(); add_callback refuses new work once this is true.
        self.closing = False

    def close(self, all_fds=False):
        """Unregister every handler and, if owned, close the asyncio loop.

        :arg bool all_fds: if true, each registered file object is also
            passed to ``close_fd``.
        """
        self.closing = True
        # Iterate over a snapshot: remove_handler mutates self.handlers.
        for fd in list(self.handlers):
            fileobj, _ = self.handlers[fd]
            self.remove_handler(fd)
            if all_fds:
                self.close_fd(fileobj)
        if self.close_loop:
            self.asyncio_loop.close()

    def add_handler(self, fd, handler, events):
        """Register ``handler`` for ``events`` on ``fd``.

        :arg fd: a value accepted by ``split_fd``.
        :arg handler: callable invoked as ``handler(fileobj, events)``.
        :arg events: bitmask tested against ``IOLoop.READ`` and
            ``IOLoop.WRITE``.
        :raises ValueError: if a handler is already registered for ``fd``.
        """
        fd, fileobj = self.split_fd(fd)
        if fd in self.handlers:
            raise ValueError("fd %s added twice" % fd)
        self.handlers[fd] = (fileobj, stack_context.wrap(handler))
        if events & IOLoop.READ:
            self.asyncio_loop.add_reader(
                fd, self._handle_events, fd, IOLoop.READ)
            self.readers.add(fd)
        if events & IOLoop.WRITE:
            self.asyncio_loop.add_writer(
                fd, self._handle_events, fd, IOLoop.WRITE)
            self.writers.add(fd)

    def update_handler(self, fd, events):
        """Change the set of events listened for on ``fd``.

        Reader and writer registrations on the asyncio loop are added or
        removed so that they match the READ/WRITE bits of ``events``.
        """
        fd, _ = self.split_fd(fd)
        if events & IOLoop.READ:
            if fd not in self.readers:
                self.asyncio_loop.add_reader(
                    fd, self._handle_events, fd, IOLoop.READ)
                self.readers.add(fd)
        else:
            if fd in self.readers:
                self.asyncio_loop.remove_reader(fd)
                self.readers.remove(fd)
        if events & IOLoop.WRITE:
            if fd not in self.writers:
                self.asyncio_loop.add_writer(
                    fd, self._handle_events, fd, IOLoop.WRITE)
                self.writers.add(fd)
        else:
            if fd in self.writers:
                self.asyncio_loop.remove_writer(fd)
                self.writers.remove(fd)

    def remove_handler(self, fd):
        """Stop listening on ``fd``; a no-op if ``fd`` is not registered."""
        fd, _ = self.split_fd(fd)
        if fd not in self.handlers:
            return
        if fd in self.readers:
            self.asyncio_loop.remove_reader(fd)
            self.readers.remove(fd)
        if fd in self.writers:
            self.asyncio_loop.remove_writer(fd)
            self.writers.remove(fd)
        del self.handlers[fd]

    def _handle_events(self, fd, events):
        """Dispatch a readiness notification to the handler stored for ``fd``."""
        fileobj, handler_func = self.handlers[fd]
        handler_func(fileobj, events)

    def start(self):
        """Set up logging, then run the asyncio loop until it is stopped."""
        self._setup_logging()
        self.asyncio_loop.run_forever()

    def stop(self):
        """Ask the asyncio loop to stop running."""
        self.asyncio_loop.stop()

    def add_timeout(self, deadline, callback):
        """Schedule ``callback`` to run at ``deadline``.

        :arg deadline: either an ``int``/``float`` absolute time on the same
            clock as ``self.time()``, or a ``datetime.timedelta`` relative
            to now.
        :arg callback: callable run through ``_run_callback``.
        :returns: the handle returned by ``asyncio_loop.call_later``; pass it
            to `remove_timeout` to cancel.
        :raises TypeError: if ``deadline`` is of any other type.
        """
        if isinstance(deadline, (int, float)):
            # The asyncio loop is given a relative delay, so convert the
            # absolute deadline and never schedule into the past.
            delay = max(deadline - self.time(), 0)
        elif isinstance(deadline, datetime.timedelta):
            delay = _Timeout.timedelta_to_seconds(deadline)
        else:
            # Interpolate the value into the message; wrapping it in a
            # 1-tuple keeps the formatting correct for tuple deadlines too.
            raise TypeError("Unsupported deadline %r" % (deadline,))
        return self.asyncio_loop.call_later(delay, self._run_callback,
                                            stack_context.wrap(callback))

    def remove_timeout(self, timeout):
        """Cancel a pending timeout given the handle from `add_timeout`."""
        timeout.cancel()

    def add_callback(self, callback, *args, **kwargs):
        """Schedule ``callback(*args, **kwargs)`` on the asyncio loop.

        The callback is handed over with ``call_soon_threadsafe``.

        :raises RuntimeError: if `close` has already been called.
        """
        if self.closing:
            raise RuntimeError("IOLoop is closing")
        self.asyncio_loop.call_soon_threadsafe(
            self._run_callback,
            functools.partial(stack_context.wrap(callback), *args, **kwargs))

    # Callbacks added from a signal handler use the same implementation.
    add_callback_from_signal = add_callback
|
|
|
|
|
|
class AsyncIOMainLoop(BaseAsyncIOLoop):
    """IOLoop bound to the loop returned by ``asyncio.get_event_loop()``.

    The loop is passed on with ``close_loop=False``, so it is not closed
    when this IOLoop is closed.
    """

    def initialize(self):
        """Wrap the current asyncio event loop without taking ownership."""
        parent_initialize = super(AsyncIOMainLoop, self).initialize
        parent_initialize(asyncio.get_event_loop(), close_loop=False)
|
|
|
|
|
|
class AsyncIOLoop(BaseAsyncIOLoop):
    """IOLoop backed by a freshly created asyncio event loop.

    The loop is passed on with ``close_loop=True``, so it is closed together
    with this IOLoop.
    """

    def initialize(self):
        """Create a new asyncio event loop and take ownership of it."""
        parent_initialize = super(AsyncIOLoop, self).initialize
        parent_initialize(asyncio.new_event_loop(), close_loop=True)
|