diff --git a/lib/concurrent/__init__.py b/lib/concurrent/__init__.py new file mode 100644 index 00000000..b36383a6 --- /dev/null +++ b/lib/concurrent/__init__.py @@ -0,0 +1,3 @@ +from pkgutil import extend_path + +__path__ = extend_path(__path__, __name__) diff --git a/lib/concurrent/futures/__init__.py b/lib/concurrent/futures/__init__.py new file mode 100644 index 00000000..fef52819 --- /dev/null +++ b/lib/concurrent/futures/__init__.py @@ -0,0 +1,23 @@ +# Copyright 2009 Brian Quinlan. All Rights Reserved. +# Licensed to PSF under a Contributor Agreement. + +"""Execute computations asynchronously using threads or processes.""" + +__author__ = 'Brian Quinlan (brian@sweetapp.com)' + +from concurrent.futures._base import (FIRST_COMPLETED, + FIRST_EXCEPTION, + ALL_COMPLETED, + CancelledError, + TimeoutError, + Future, + Executor, + wait, + as_completed) +from concurrent.futures.thread import ThreadPoolExecutor + +# Jython doesn't have multiprocessing +try: + from concurrent.futures.process import ProcessPoolExecutor +except ImportError: + pass diff --git a/lib/concurrent/futures/_base.py b/lib/concurrent/futures/_base.py new file mode 100644 index 00000000..6f0c0f3b --- /dev/null +++ b/lib/concurrent/futures/_base.py @@ -0,0 +1,605 @@ +# Copyright 2009 Brian Quinlan. All Rights Reserved. +# Licensed to PSF under a Contributor Agreement. + +from __future__ import with_statement +import logging +import threading +import time + +from concurrent.futures._compat import reraise + +try: + from collections import namedtuple +except ImportError: + from concurrent.futures._compat import namedtuple + +__author__ = 'Brian Quinlan (brian@sweetapp.com)' + +FIRST_COMPLETED = 'FIRST_COMPLETED' +FIRST_EXCEPTION = 'FIRST_EXCEPTION' +ALL_COMPLETED = 'ALL_COMPLETED' +_AS_COMPLETED = '_AS_COMPLETED' + +# Possible future states (for internal use by the futures package). +PENDING = 'PENDING' +RUNNING = 'RUNNING' +# The future was cancelled by the user... +CANCELLED = 'CANCELLED' +# ...and _Waiter.add_cancelled() was called by a worker. +CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED' +FINISHED = 'FINISHED' + +_FUTURE_STATES = [ + PENDING, + RUNNING, + CANCELLED, + CANCELLED_AND_NOTIFIED, + FINISHED +] + +_STATE_TO_DESCRIPTION_MAP = { + PENDING: "pending", + RUNNING: "running", + CANCELLED: "cancelled", + CANCELLED_AND_NOTIFIED: "cancelled", + FINISHED: "finished" +} + +# Logger for internal use by the futures package. +LOGGER = logging.getLogger("concurrent.futures") + +class Error(Exception): + """Base class for all future-related exceptions.""" + pass + +class CancelledError(Error): + """The Future was cancelled.""" + pass + +class TimeoutError(Error): + """The operation exceeded the given deadline.""" + pass + +class _Waiter(object): + """Provides the event that wait() and as_completed() block on.""" + def __init__(self): + self.event = threading.Event() + self.finished_futures = [] + + def add_result(self, future): + self.finished_futures.append(future) + + def add_exception(self, future): + self.finished_futures.append(future) + + def add_cancelled(self, future): + self.finished_futures.append(future) + +class _AsCompletedWaiter(_Waiter): + """Used by as_completed().""" + + def __init__(self): + super(_AsCompletedWaiter, self).__init__() + self.lock = threading.Lock() + + def add_result(self, future): + with self.lock: + super(_AsCompletedWaiter, self).add_result(future) + self.event.set() + + def add_exception(self, future): + with self.lock: + super(_AsCompletedWaiter, self).add_exception(future) + self.event.set() + + def add_cancelled(self, future): + with self.lock: + super(_AsCompletedWaiter, self).add_cancelled(future) + self.event.set() + +class _FirstCompletedWaiter(_Waiter): + """Used by wait(return_when=FIRST_COMPLETED).""" + + def add_result(self, future): + super(_FirstCompletedWaiter, self).add_result(future) + self.event.set() + + def add_exception(self, future): + super(_FirstCompletedWaiter, self).add_exception(future) + self.event.set() + + def add_cancelled(self, future): + super(_FirstCompletedWaiter, self).add_cancelled(future) + self.event.set() + +class _AllCompletedWaiter(_Waiter): + """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED).""" + + def __init__(self, num_pending_calls, stop_on_exception): + self.num_pending_calls = num_pending_calls + self.stop_on_exception = stop_on_exception + self.lock = threading.Lock() + super(_AllCompletedWaiter, self).__init__() + + def _decrement_pending_calls(self): + with self.lock: + self.num_pending_calls -= 1 + if not self.num_pending_calls: + self.event.set() + + def add_result(self, future): + super(_AllCompletedWaiter, self).add_result(future) + self._decrement_pending_calls() + + def add_exception(self, future): + super(_AllCompletedWaiter, self).add_exception(future) + if self.stop_on_exception: + self.event.set() + else: + self._decrement_pending_calls() + + def add_cancelled(self, future): + super(_AllCompletedWaiter, self).add_cancelled(future) + self._decrement_pending_calls() + +class _AcquireFutures(object): + """A context manager that does an ordered acquire of Future conditions.""" + + def __init__(self, futures): + self.futures = sorted(futures, key=id) + + def __enter__(self): + for future in self.futures: + future._condition.acquire() + + def __exit__(self, *args): + for future in self.futures: + future._condition.release() + +def _create_and_install_waiters(fs, return_when): + if return_when == _AS_COMPLETED: + waiter = _AsCompletedWaiter() + elif return_when == FIRST_COMPLETED: + waiter = _FirstCompletedWaiter() + else: + pending_count = sum( + f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs) + + if return_when == FIRST_EXCEPTION: + waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True) + elif return_when == ALL_COMPLETED: + waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False) + else: + raise ValueError("Invalid return condition: %r" % return_when) + + for f in fs: + f._waiters.append(waiter) + + return waiter + +def as_completed(fs, timeout=None): + """An iterator over the given futures that yields each as it completes. + + Args: + fs: The sequence of Futures (possibly created by different Executors) to + iterate over. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + + Returns: + An iterator that yields the given Futures as they complete (finished or + cancelled). + + Raises: + TimeoutError: If the entire result iterator could not be generated + before the given timeout. + """ + if timeout is not None: + end_time = timeout + time.time() + + with _AcquireFutures(fs): + finished = set( + f for f in fs + if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) + pending = set(fs) - finished + waiter = _create_and_install_waiters(fs, _AS_COMPLETED) + + try: + for future in finished: + yield future + + while pending: + if timeout is None: + wait_timeout = None + else: + wait_timeout = end_time - time.time() + if wait_timeout < 0: + raise TimeoutError( + '%d (of %d) futures unfinished' % ( + len(pending), len(fs))) + + waiter.event.wait(wait_timeout) + + with waiter.lock: + finished = waiter.finished_futures + waiter.finished_futures = [] + waiter.event.clear() + + for future in finished: + yield future + pending.remove(future) + + finally: + for f in fs: + f._waiters.remove(waiter) + +DoneAndNotDoneFutures = namedtuple( + 'DoneAndNotDoneFutures', 'done not_done') +def wait(fs, timeout=None, return_when=ALL_COMPLETED): + """Wait for the futures in the given sequence to complete. + + Args: + fs: The sequence of Futures (possibly created by different Executors) to + wait upon. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + return_when: Indicates when this function should return. The options + are: + + FIRST_COMPLETED - Return when any future finishes or is + cancelled. + FIRST_EXCEPTION - Return when any future finishes by raising an + exception. If no future raises an exception + then it is equivalent to ALL_COMPLETED. + ALL_COMPLETED - Return when all futures finish or are cancelled. + + Returns: + A named 2-tuple of sets. The first set, named 'done', contains the + futures that completed (is finished or cancelled) before the wait + completed. The second set, named 'not_done', contains uncompleted + futures. + """ + with _AcquireFutures(fs): + done = set(f for f in fs + if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) + not_done = set(fs) - done + + if (return_when == FIRST_COMPLETED) and done: + return DoneAndNotDoneFutures(done, not_done) + elif (return_when == FIRST_EXCEPTION) and done: + if any(f for f in done + if not f.cancelled() and f.exception() is not None): + return DoneAndNotDoneFutures(done, not_done) + + if len(done) == len(fs): + return DoneAndNotDoneFutures(done, not_done) + + waiter = _create_and_install_waiters(fs, return_when) + + waiter.event.wait(timeout) + for f in fs: + f._waiters.remove(waiter) + + done.update(waiter.finished_futures) + return DoneAndNotDoneFutures(done, set(fs) - done) + +class Future(object): + """Represents the result of an asynchronous computation.""" + + def __init__(self): + """Initializes the future. Should not be called by clients.""" + self._condition = threading.Condition() + self._state = PENDING + self._result = None + self._exception = None + self._traceback = None + self._waiters = [] + self._done_callbacks = [] + + def _invoke_callbacks(self): + for callback in self._done_callbacks: + try: + callback(self) + except Exception: + LOGGER.exception('exception calling callback for %r', self) + + def __repr__(self): + with self._condition: + if self._state == FINISHED: + if self._exception: + return '' % ( + hex(id(self)), + _STATE_TO_DESCRIPTION_MAP[self._state], + self._exception.__class__.__name__) + else: + return '' % ( + hex(id(self)), + _STATE_TO_DESCRIPTION_MAP[self._state], + self._result.__class__.__name__) + return '' % ( + hex(id(self)), + _STATE_TO_DESCRIPTION_MAP[self._state]) + + def cancel(self): + """Cancel the future if possible. + + Returns True if the future was cancelled, False otherwise. A future + cannot be cancelled if it is running or has already completed. + """ + with self._condition: + if self._state in [RUNNING, FINISHED]: + return False + + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + return True + + self._state = CANCELLED + self._condition.notify_all() + + self._invoke_callbacks() + return True + + def cancelled(self): + """Return True if the future has cancelled.""" + with self._condition: + return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED] + + def running(self): + """Return True if the future is currently executing.""" + with self._condition: + return self._state == RUNNING + + def done(self): + """Return True of the future was cancelled or finished executing.""" + with self._condition: + return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] + + def __get_result(self): + if self._exception: + reraise(self._exception, self._traceback) + else: + return self._result + + def add_done_callback(self, fn): + """Attaches a callable that will be called when the future finishes. + + Args: + fn: A callable that will be called with this future as its only + argument when the future completes or is cancelled. The callable + will always be called by a thread in the same process in which + it was added. If the future has already completed or been + cancelled then the callable will be called immediately. These + callables are called in the order that they were added. + """ + with self._condition: + if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]: + self._done_callbacks.append(fn) + return + fn(self) + + def result(self, timeout=None): + """Return the result of the call that the future represents. + + Args: + timeout: The number of seconds to wait for the result if the future + isn't done. If None, then there is no limit on the wait time. + + Returns: + The result of the call that the future represents. + + Raises: + CancelledError: If the future was cancelled. + TimeoutError: If the future didn't finish executing before the given + timeout. + Exception: If the call raised then that exception will be raised. + """ + with self._condition: + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() + elif self._state == FINISHED: + return self.__get_result() + + self._condition.wait(timeout) + + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() + elif self._state == FINISHED: + return self.__get_result() + else: + raise TimeoutError() + + def exception_info(self, timeout=None): + """Return a tuple of (exception, traceback) raised by the call that the + future represents. + + Args: + timeout: The number of seconds to wait for the exception if the + future isn't done. If None, then there is no limit on the wait + time. + + Returns: + The exception raised by the call that the future represents or None + if the call completed without raising. + + Raises: + CancelledError: If the future was cancelled. + TimeoutError: If the future didn't finish executing before the given + timeout. + """ + with self._condition: + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() + elif self._state == FINISHED: + return self._exception, self._traceback + + self._condition.wait(timeout) + + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() + elif self._state == FINISHED: + return self._exception, self._traceback + else: + raise TimeoutError() + + def exception(self, timeout=None): + """Return the exception raised by the call that the future represents. + + Args: + timeout: The number of seconds to wait for the exception if the + future isn't done. If None, then there is no limit on the wait + time. + + Returns: + The exception raised by the call that the future represents or None + if the call completed without raising. + + Raises: + CancelledError: If the future was cancelled. + TimeoutError: If the future didn't finish executing before the given + timeout. + """ + return self.exception_info(timeout)[0] + + # The following methods should only be used by Executors and in tests. + def set_running_or_notify_cancel(self): + """Mark the future as running or process any cancel notifications. + + Should only be used by Executor implementations and unit tests. + + If the future has been cancelled (cancel() was called and returned + True) then any threads waiting on the future completing (though calls + to as_completed() or wait()) are notified and False is returned. + + If the future was not cancelled then it is put in the running state + (future calls to running() will return True) and True is returned. + + This method should be called by Executor implementations before + executing the work associated with this future. If this method returns + False then the work should not be executed. + + Returns: + False if the Future was cancelled, True otherwise. + + Raises: + RuntimeError: if this method was already called or if set_result() + or set_exception() was called. + """ + with self._condition: + if self._state == CANCELLED: + self._state = CANCELLED_AND_NOTIFIED + for waiter in self._waiters: + waiter.add_cancelled(self) + # self._condition.notify_all() is not necessary because + # self.cancel() triggers a notification. + return False + elif self._state == PENDING: + self._state = RUNNING + return True + else: + LOGGER.critical('Future %s in unexpected state: %s', + id(self.future), + self.future._state) + raise RuntimeError('Future in unexpected state') + + def set_result(self, result): + """Sets the return value of work associated with the future. + + Should only be used by Executor implementations and unit tests. + """ + with self._condition: + self._result = result + self._state = FINISHED + for waiter in self._waiters: + waiter.add_result(self) + self._condition.notify_all() + self._invoke_callbacks() + + def set_exception_info(self, exception, traceback): + """Sets the result of the future as being the given exception + and traceback. + + Should only be used by Executor implementations and unit tests. + """ + with self._condition: + self._exception = exception + self._traceback = traceback + self._state = FINISHED + for waiter in self._waiters: + waiter.add_exception(self) + self._condition.notify_all() + self._invoke_callbacks() + + def set_exception(self, exception): + """Sets the result of the future as being the given exception. + + Should only be used by Executor implementations and unit tests. + """ + self.set_exception_info(exception, None) + +class Executor(object): + """This is an abstract base class for concrete asynchronous executors.""" + + def submit(self, fn, *args, **kwargs): + """Submits a callable to be executed with the given arguments. + + Schedules the callable to be executed as fn(*args, **kwargs) and returns + a Future instance representing the execution of the callable. + + Returns: + A Future representing the given call. + """ + raise NotImplementedError() + + def map(self, fn, *iterables, **kwargs): + """Returns a iterator equivalent to map(fn, iter). + + Args: + fn: A callable that will take as many arguments as there are + passed iterables. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + + Returns: + An iterator equivalent to: map(func, *iterables) but the calls may + be evaluated out-of-order. + + Raises: + TimeoutError: If the entire result iterator could not be generated + before the given timeout. + Exception: If fn(*args) raises for any values. + """ + timeout = kwargs.get('timeout') + if timeout is not None: + end_time = timeout + time.time() + + fs = [self.submit(fn, *args) for args in zip(*iterables)] + + try: + for future in fs: + if timeout is None: + yield future.result() + else: + yield future.result(end_time - time.time()) + finally: + for future in fs: + future.cancel() + + def shutdown(self, wait=True): + """Clean-up the resources associated with the Executor. + + It is safe to call this method several times. Otherwise, no other + methods can be called after this one. + + Args: + wait: If True then shutdown will not return until all running + futures have finished executing and the resources used by the + executor have been reclaimed. + """ + pass + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.shutdown(wait=True) + return False diff --git a/lib/concurrent/futures/_compat.py b/lib/concurrent/futures/_compat.py new file mode 100644 index 00000000..e77cf0e5 --- /dev/null +++ b/lib/concurrent/futures/_compat.py @@ -0,0 +1,111 @@ +from keyword import iskeyword as _iskeyword +from operator import itemgetter as _itemgetter +import sys as _sys + + +def namedtuple(typename, field_names): + """Returns a new subclass of tuple with named fields. + + >>> Point = namedtuple('Point', 'x y') + >>> Point.__doc__ # docstring for the new class + 'Point(x, y)' + >>> p = Point(11, y=22) # instantiate with positional args or keywords + >>> p[0] + p[1] # indexable like a plain tuple + 33 + >>> x, y = p # unpack like a regular tuple + >>> x, y + (11, 22) + >>> p.x + p.y # fields also accessable by name + 33 + >>> d = p._asdict() # convert to a dictionary + >>> d['x'] + 11 + >>> Point(**d) # convert from a dictionary + Point(x=11, y=22) + >>> p._replace(x=100) # _replace() is like str.replace() but targets named fields + Point(x=100, y=22) + + """ + + # Parse and validate the field names. Validation serves two purposes, + # generating informative error messages and preventing template injection attacks. + if isinstance(field_names, basestring): + field_names = field_names.replace(',', ' ').split() # names separated by whitespace and/or commas + field_names = tuple(map(str, field_names)) + for name in (typename,) + field_names: + if not all(c.isalnum() or c=='_' for c in name): + raise ValueError('Type names and field names can only contain alphanumeric characters and underscores: %r' % name) + if _iskeyword(name): + raise ValueError('Type names and field names cannot be a keyword: %r' % name) + if name[0].isdigit(): + raise ValueError('Type names and field names cannot start with a number: %r' % name) + seen_names = set() + for name in field_names: + if name.startswith('_'): + raise ValueError('Field names cannot start with an underscore: %r' % name) + if name in seen_names: + raise ValueError('Encountered duplicate field name: %r' % name) + seen_names.add(name) + + # Create and fill-in the class template + numfields = len(field_names) + argtxt = repr(field_names).replace("'", "")[1:-1] # tuple repr without parens or quotes + reprtxt = ', '.join('%s=%%r' % name for name in field_names) + dicttxt = ', '.join('%r: t[%d]' % (name, pos) for pos, name in enumerate(field_names)) + template = '''class %(typename)s(tuple): + '%(typename)s(%(argtxt)s)' \n + __slots__ = () \n + _fields = %(field_names)r \n + def __new__(_cls, %(argtxt)s): + return _tuple.__new__(_cls, (%(argtxt)s)) \n + @classmethod + def _make(cls, iterable, new=tuple.__new__, len=len): + 'Make a new %(typename)s object from a sequence or iterable' + result = new(cls, iterable) + if len(result) != %(numfields)d: + raise TypeError('Expected %(numfields)d arguments, got %%d' %% len(result)) + return result \n + def __repr__(self): + return '%(typename)s(%(reprtxt)s)' %% self \n + def _asdict(t): + 'Return a new dict which maps field names to their values' + return {%(dicttxt)s} \n + def _replace(_self, **kwds): + 'Return a new %(typename)s object replacing specified fields with new values' + result = _self._make(map(kwds.pop, %(field_names)r, _self)) + if kwds: + raise ValueError('Got unexpected field names: %%r' %% kwds.keys()) + return result \n + def __getnewargs__(self): + return tuple(self) \n\n''' % locals() + for i, name in enumerate(field_names): + template += ' %s = _property(_itemgetter(%d))\n' % (name, i) + + # Execute the template string in a temporary namespace and + # support tracing utilities by setting a value for frame.f_globals['__name__'] + namespace = dict(_itemgetter=_itemgetter, __name__='namedtuple_%s' % typename, + _property=property, _tuple=tuple) + try: + exec(template, namespace) + except SyntaxError: + e = _sys.exc_info()[1] + raise SyntaxError(e.message + ':\n' + template) + result = namespace[typename] + + # For pickling to work, the __module__ variable needs to be set to the frame + # where the named tuple is created. Bypass this step in enviroments where + # sys._getframe is not defined (Jython for example). + if hasattr(_sys, '_getframe'): + result.__module__ = _sys._getframe(1).f_globals.get('__name__', '__main__') + + return result + + +if _sys.version_info[0] < 3: + def reraise(exc, traceback): + locals_ = {'exc_type': type(exc), 'exc_value': exc, 'traceback': traceback} + exec('raise exc_type, exc_value, traceback', {}, locals_) +else: + def reraise(exc, traceback): + # Tracebacks are embedded in exceptions in Python 3 + raise exc diff --git a/lib/concurrent/futures/process.py b/lib/concurrent/futures/process.py new file mode 100644 index 00000000..98684f8e --- /dev/null +++ b/lib/concurrent/futures/process.py @@ -0,0 +1,363 @@ +# Copyright 2009 Brian Quinlan. All Rights Reserved. +# Licensed to PSF under a Contributor Agreement. + +"""Implements ProcessPoolExecutor. + +The follow diagram and text describe the data-flow through the system: + +|======================= In-process =====================|== Out-of-process ==| + ++----------+ +----------+ +--------+ +-----------+ +---------+ +| | => | Work Ids | => | | => | Call Q | => | | +| | +----------+ | | +-----------+ | | +| | | ... | | | | ... | | | +| | | 6 | | | | 5, call() | | | +| | | 7 | | | | ... | | | +| Process | | ... | | Local | +-----------+ | Process | +| Pool | +----------+ | Worker | | #1..n | +| Executor | | Thread | | | +| | +----------- + | | +-----------+ | | +| | <=> | Work Items | <=> | | <= | Result Q | <= | | +| | +------------+ | | +-----------+ | | +| | | 6: call() | | | | ... | | | +| | | future | | | | 4, result | | | +| | | ... | | | | 3, except | | | ++----------+ +------------+ +--------+ +-----------+ +---------+ + +Executor.submit() called: +- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict +- adds the id of the _WorkItem to the "Work Ids" queue + +Local worker thread: +- reads work ids from the "Work Ids" queue and looks up the corresponding + WorkItem from the "Work Items" dict: if the work item has been cancelled then + it is simply removed from the dict, otherwise it is repackaged as a + _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q" + until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because + calls placed in the "Call Q" can no longer be cancelled with Future.cancel(). +- reads _ResultItems from "Result Q", updates the future stored in the + "Work Items" dict and deletes the dict entry + +Process #1..n: +- reads _CallItems from "Call Q", executes the calls, and puts the resulting + _ResultItems in "Request Q" +""" + +from __future__ import with_statement +import atexit +import multiprocessing +import threading +import weakref +import sys + +from concurrent.futures import _base + +try: + import queue +except ImportError: + import Queue as queue + +__author__ = 'Brian Quinlan (brian@sweetapp.com)' + +# Workers are created as daemon threads and processes. This is done to allow the +# interpreter to exit when there are still idle processes in a +# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However, +# allowing workers to die with the interpreter has two undesirable properties: +# - The workers would still be running during interpretor shutdown, +# meaning that they would fail in unpredictable ways. +# - The workers could be killed while evaluating a work item, which could +# be bad if the callable being evaluated has external side-effects e.g. +# writing to a file. +# +# To work around this problem, an exit handler is installed which tells the +# workers to exit when their work queues are empty and then waits until the +# threads/processes finish. + +_threads_queues = weakref.WeakKeyDictionary() +_shutdown = False + +def _python_exit(): + global _shutdown + _shutdown = True + items = list(_threads_queues.items()) + for t, q in items: + q.put(None) + for t, q in items: + t.join() + +# Controls how many more calls than processes will be queued in the call queue. +# A smaller number will mean that processes spend more time idle waiting for +# work while a larger number will make Future.cancel() succeed less frequently +# (Futures in the call queue cannot be cancelled). +EXTRA_QUEUED_CALLS = 1 + +class _WorkItem(object): + def __init__(self, future, fn, args, kwargs): + self.future = future + self.fn = fn + self.args = args + self.kwargs = kwargs + +class _ResultItem(object): + def __init__(self, work_id, exception=None, result=None): + self.work_id = work_id + self.exception = exception + self.result = result + +class _CallItem(object): + def __init__(self, work_id, fn, args, kwargs): + self.work_id = work_id + self.fn = fn + self.args = args + self.kwargs = kwargs + +def _process_worker(call_queue, result_queue): + """Evaluates calls from call_queue and places the results in result_queue. + + This worker is run in a separate process. + + Args: + call_queue: A multiprocessing.Queue of _CallItems that will be read and + evaluated by the worker. + result_queue: A multiprocessing.Queue of _ResultItems that will written + to by the worker. + shutdown: A multiprocessing.Event that will be set as a signal to the + worker that it should exit when call_queue is empty. + """ + while True: + call_item = call_queue.get(block=True) + if call_item is None: + # Wake up queue management thread + result_queue.put(None) + return + try: + r = call_item.fn(*call_item.args, **call_item.kwargs) + except BaseException: + e = sys.exc_info()[1] + result_queue.put(_ResultItem(call_item.work_id, + exception=e)) + else: + result_queue.put(_ResultItem(call_item.work_id, + result=r)) + +def _add_call_item_to_queue(pending_work_items, + work_ids, + call_queue): + """Fills call_queue with _WorkItems from pending_work_items. + + This function never blocks. + + Args: + pending_work_items: A dict mapping work ids to _WorkItems e.g. + {5: <_WorkItem...>, 6: <_WorkItem...>, ...} + work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids + are consumed and the corresponding _WorkItems from + pending_work_items are transformed into _CallItems and put in + call_queue. + call_queue: A multiprocessing.Queue that will be filled with _CallItems + derived from _WorkItems. + """ + while True: + if call_queue.full(): + return + try: + work_id = work_ids.get(block=False) + except queue.Empty: + return + else: + work_item = pending_work_items[work_id] + + if work_item.future.set_running_or_notify_cancel(): + call_queue.put(_CallItem(work_id, + work_item.fn, + work_item.args, + work_item.kwargs), + block=True) + else: + del pending_work_items[work_id] + continue + +def _queue_management_worker(executor_reference, + processes, + pending_work_items, + work_ids_queue, + call_queue, + result_queue): + """Manages the communication between this process and the worker processes. + + This function is run in a local thread. + + Args: + executor_reference: A weakref.ref to the ProcessPoolExecutor that owns + this thread. Used to determine if the ProcessPoolExecutor has been + garbage collected and that this function can exit. + process: A list of the multiprocessing.Process instances used as + workers. + pending_work_items: A dict mapping work ids to _WorkItems e.g. + {5: <_WorkItem...>, 6: <_WorkItem...>, ...} + work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]). + call_queue: A multiprocessing.Queue that will be filled with _CallItems + derived from _WorkItems for processing by the process workers. + result_queue: A multiprocessing.Queue of _ResultItems generated by the + process workers. + """ + nb_shutdown_processes = [0] + def shutdown_one_process(): + """Tell a worker to terminate, which will in turn wake us again""" + call_queue.put(None) + nb_shutdown_processes[0] += 1 + while True: + _add_call_item_to_queue(pending_work_items, + work_ids_queue, + call_queue) + + result_item = result_queue.get(block=True) + if result_item is not None: + work_item = pending_work_items[result_item.work_id] + del pending_work_items[result_item.work_id] + + if result_item.exception: + work_item.future.set_exception(result_item.exception) + else: + work_item.future.set_result(result_item.result) + # Check whether we should start shutting down. + executor = executor_reference() + # No more work items can be added if: + # - The interpreter is shutting down OR + # - The executor that owns this worker has been collected OR + # - The executor that owns this worker has been shutdown. + if _shutdown or executor is None or executor._shutdown_thread: + # Since no new work items can be added, it is safe to shutdown + # this thread if there are no pending work items. + if not pending_work_items: + while nb_shutdown_processes[0] < len(processes): + shutdown_one_process() + # If .join() is not called on the created processes then + # some multiprocessing.Queue methods may deadlock on Mac OS + # X. + for p in processes: + p.join() + call_queue.close() + return + del executor + +_system_limits_checked = False +_system_limited = None +def _check_system_limits(): + global _system_limits_checked, _system_limited + if _system_limits_checked: + if _system_limited: + raise NotImplementedError(_system_limited) + _system_limits_checked = True + try: + import os + nsems_max = os.sysconf("SC_SEM_NSEMS_MAX") + except (AttributeError, ValueError): + # sysconf not available or setting not available + return + if nsems_max == -1: + # indetermine limit, assume that limit is determined + # by available memory only + return + if nsems_max >= 256: + # minimum number of semaphores available + # according to POSIX + return + _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max + raise NotImplementedError(_system_limited) + +class ProcessPoolExecutor(_base.Executor): + def __init__(self, max_workers=None): + """Initializes a new ProcessPoolExecutor instance. + + Args: + max_workers: The maximum number of processes that can be used to + execute the given calls. If None or not given then as many + worker processes will be created as the machine has processors. + """ + _check_system_limits() + + if max_workers is None: + self._max_workers = multiprocessing.cpu_count() + else: + self._max_workers = max_workers + + # Make the call queue slightly larger than the number of processes to + # prevent the worker processes from idling. But don't make it too big + # because futures in the call queue cannot be cancelled. + self._call_queue = multiprocessing.Queue(self._max_workers + + EXTRA_QUEUED_CALLS) + self._result_queue = multiprocessing.Queue() + self._work_ids = queue.Queue() + self._queue_management_thread = None + self._processes = set() + + # Shutdown is a two-step process. + self._shutdown_thread = False + self._shutdown_lock = threading.Lock() + self._queue_count = 0 + self._pending_work_items = {} + + def _start_queue_management_thread(self): + # When the executor gets lost, the weakref callback will wake up + # the queue management thread. + def weakref_cb(_, q=self._result_queue): + q.put(None) + if self._queue_management_thread is None: + self._queue_management_thread = threading.Thread( + target=_queue_management_worker, + args=(weakref.ref(self, weakref_cb), + self._processes, + self._pending_work_items, + self._work_ids, + self._call_queue, + self._result_queue)) + self._queue_management_thread.daemon = True + self._queue_management_thread.start() + _threads_queues[self._queue_management_thread] = self._result_queue + + def _adjust_process_count(self): + for _ in range(len(self._processes), self._max_workers): + p = multiprocessing.Process( + target=_process_worker, + args=(self._call_queue, + self._result_queue)) + p.start() + self._processes.add(p) + + def submit(self, fn, *args, **kwargs): + with self._shutdown_lock: + if self._shutdown_thread: + raise RuntimeError('cannot schedule new futures after shutdown') + + f = _base.Future() + w = _WorkItem(f, fn, args, kwargs) + + self._pending_work_items[self._queue_count] = w + self._work_ids.put(self._queue_count) + self._queue_count += 1 + # Wake up queue management thread + self._result_queue.put(None) + + self._start_queue_management_thread() + self._adjust_process_count() + return f + submit.__doc__ = _base.Executor.submit.__doc__ + + def shutdown(self, wait=True): + with self._shutdown_lock: + self._shutdown_thread = True + if self._queue_management_thread: + # Wake up queue management thread + self._result_queue.put(None) + if wait: + self._queue_management_thread.join() + # To reduce the risk of openning too many files, remove references to + # objects that use file descriptors. + self._queue_management_thread = None + self._call_queue = None + self._result_queue = None + self._processes = None + shutdown.__doc__ = _base.Executor.shutdown.__doc__ + +atexit.register(_python_exit) diff --git a/lib/concurrent/futures/thread.py b/lib/concurrent/futures/thread.py new file mode 100644 index 00000000..930d1673 --- /dev/null +++ b/lib/concurrent/futures/thread.py @@ -0,0 +1,138 @@ +# Copyright 2009 Brian Quinlan. All Rights Reserved. +# Licensed to PSF under a Contributor Agreement. + +"""Implements ThreadPoolExecutor.""" + +from __future__ import with_statement +import atexit +import threading +import weakref +import sys + +from concurrent.futures import _base + +try: + import queue +except ImportError: + import Queue as queue + +__author__ = 'Brian Quinlan (brian@sweetapp.com)' + +# Workers are created as daemon threads. This is done to allow the interpreter +# to exit when there are still idle threads in a ThreadPoolExecutor's thread +# pool (i.e. shutdown() was not called). However, allowing workers to die with +# the interpreter has two undesirable properties: +# - The workers would still be running during interpretor shutdown, +# meaning that they would fail in unpredictable ways. +# - The workers could be killed while evaluating a work item, which could +# be bad if the callable being evaluated has external side-effects e.g. +# writing to a file. +# +# To work around this problem, an exit handler is installed which tells the +# workers to exit when their work queues are empty and then waits until the +# threads finish. + +_threads_queues = weakref.WeakKeyDictionary() +_shutdown = False + +def _python_exit(): + global _shutdown + _shutdown = True + items = list(_threads_queues.items()) + for t, q in items: + q.put(None) + for t, q in items: + t.join() + +atexit.register(_python_exit) + +class _WorkItem(object): + def __init__(self, future, fn, args, kwargs): + self.future = future + self.fn = fn + self.args = args + self.kwargs = kwargs + + def run(self): + if not self.future.set_running_or_notify_cancel(): + return + + try: + result = self.fn(*self.args, **self.kwargs) + except BaseException: + e, tb = sys.exc_info()[1:] + self.future.set_exception_info(e, tb) + else: + self.future.set_result(result) + +def _worker(executor_reference, work_queue): + try: + while True: + work_item = work_queue.get(block=True) + if work_item is not None: + work_item.run() + continue + executor = executor_reference() + # Exit if: + # - The interpreter is shutting down OR + # - The executor that owns the worker has been collected OR + # - The executor that owns the worker has been shutdown. + if _shutdown or executor is None or executor._shutdown: + # Notice other workers + work_queue.put(None) + return + del executor + except BaseException: + _base.LOGGER.critical('Exception in worker', exc_info=True) + +class ThreadPoolExecutor(_base.Executor): + def __init__(self, max_workers): + """Initializes a new ThreadPoolExecutor instance. + + Args: + max_workers: The maximum number of threads that can be used to + execute the given calls. + """ + self._max_workers = max_workers + self._work_queue = queue.Queue() + self._threads = set() + self._shutdown = False + self._shutdown_lock = threading.Lock() + + def submit(self, fn, *args, **kwargs): + with self._shutdown_lock: + if self._shutdown: + raise RuntimeError('cannot schedule new futures after shutdown') + + f = _base.Future() + w = _WorkItem(f, fn, args, kwargs) + + self._work_queue.put(w) + self._adjust_thread_count() + return f + submit.__doc__ = _base.Executor.submit.__doc__ + + def _adjust_thread_count(self): + # When the executor gets lost, the weakref callback will wake up + # the worker threads. + def weakref_cb(_, q=self._work_queue): + q.put(None) + # TODO(bquinlan): Should avoid creating new threads if there are more + # idle threads than items in the work queue. + if len(self._threads) < self._max_workers: + t = threading.Thread(target=_worker, + args=(weakref.ref(self, weakref_cb), + self._work_queue)) + t.daemon = True + t.start() + self._threads.add(t) + _threads_queues[t] = self._work_queue + + def shutdown(self, wait=True): + with self._shutdown_lock: + self._shutdown = True + self._work_queue.put(None) + if wait: + for t in self._threads: + t.join() + shutdown.__doc__ = _base.Executor.shutdown.__doc__ diff --git a/lib/futures/__init__.py b/lib/futures/__init__.py new file mode 100644 index 00000000..8f8b2348 --- /dev/null +++ b/lib/futures/__init__.py @@ -0,0 +1,24 @@ +# Copyright 2009 Brian Quinlan. All Rights Reserved. +# Licensed to PSF under a Contributor Agreement. + +"""Execute computations asynchronously using threads or processes.""" + +import warnings + +from concurrent.futures import (FIRST_COMPLETED, + FIRST_EXCEPTION, + ALL_COMPLETED, + CancelledError, + TimeoutError, + Future, + Executor, + wait, + as_completed, + ProcessPoolExecutor, + ThreadPoolExecutor) + +__author__ = 'Brian Quinlan (brian@sweetapp.com)' + +warnings.warn('The futures package has been deprecated. ' + 'Use the concurrent.futures package instead.', + DeprecationWarning) diff --git a/lib/futures/process.py b/lib/futures/process.py new file mode 100644 index 00000000..e9d37b16 --- /dev/null +++ b/lib/futures/process.py @@ -0,0 +1 @@ +from concurrent.futures import ProcessPoolExecutor diff --git a/lib/futures/thread.py b/lib/futures/thread.py new file mode 100644 index 00000000..f6bd05de --- /dev/null +++ b/lib/futures/thread.py @@ -0,0 +1 @@ +from concurrent.futures import ThreadPoolExecutor diff --git a/sickbeard/__init__.py b/sickbeard/__init__.py index 0c351d6a..8d433b5b 100755 --- a/sickbeard/__init__.py +++ b/sickbeard/__init__.py @@ -1844,9 +1844,9 @@ def launchBrowser(startPort=None): if not startPort: startPort = WEB_PORT if ENABLE_HTTPS: - browserURL = 'https://localhost:%d%s' % (startPort, WEB_ROOT) + browserURL = 'https://localhost:%d/' % (startPort) else: - browserURL = 'http://localhost:%d%s' % (startPort, WEB_ROOT) + browserURL = 'http://localhost:%d/' % (startPort) try: webbrowser.open(browserURL, 2, 1) except: diff --git a/sickbeard/encodingKludge.py b/sickbeard/encodingKludge.py index 18500f61..313bada1 100644 --- a/sickbeard/encodingKludge.py +++ b/sickbeard/encodingKludge.py @@ -68,7 +68,10 @@ def ss(x): try: return u_x.encode(sickbeard.SYS_ENCODING, 'replace') except: - return u_x.encode('utf-8', 'replace') + try: + return u_x.encode('utf-8', 'replace') + except: + return x def fixListEncodings(x): if not isinstance(x, (list, tuple)): diff --git a/sickbeard/ui.py b/sickbeard/ui.py index 44348b69..43044c6c 100644 --- a/sickbeard/ui.py +++ b/sickbeard/ui.py @@ -30,9 +30,6 @@ class Notifications(object): self._messages = [] self._errors = [] - def __del__(self): - pass - def message(self, title, message=''): """ Add a regular notification to the queue @@ -92,9 +89,6 @@ class Notification(object): else: self._timeout = datetime.timedelta(minutes=1) - def __del__(self): - pass - def is_new(self, remote_ip='127.0.0.1'): """ Returns True if the notification hasn't been displayed to the current client (aka IP address). diff --git a/sickbeard/webserve.py b/sickbeard/webserve.py index ef51c71d..a0487551 100644 --- a/sickbeard/webserve.py +++ b/sickbeard/webserve.py @@ -17,6 +17,7 @@ # along with SickRage. If not, see . from __future__ import with_statement +import inspect import threading import traceback @@ -54,7 +55,6 @@ from sickbeard.scene_numbering import get_scene_numbering, set_scene_numbering, get_xem_numbering_for_show, get_scene_absolute_numbering_for_show, get_xem_absolute_numbering_for_show, \ get_scene_absolute_numbering - from lib.dateutil import tz from lib.unrar2 import RarFile @@ -77,40 +77,15 @@ from Cheetah.Template import Template from functools import wraps from tornado.routes import route -from tornado.web import RequestHandler, authenticated, asynchronous +from tornado.web import RequestHandler, HTTPError, authenticated, addslash, removeslash, asynchronous +from tornado.gen import coroutine, Task +from tornado.concurrent import run_on_executor +from concurrent.futures import ThreadPoolExecutor from bug_tracker import BugTracker route_locks = {} -def run_async(func): - @wraps(func) - def async_func(*args, **kwargs): - func_hl = threading.Thread(target = func, args = args, kwargs = kwargs) - func_hl.start() - - return async_func - -@run_async -def run_handler(route, kwargs, callback = None): - try: - res = route(**kwargs) - callback(res, route) - except: - logger.log('Failed doing api request "%s": %s' % (route, traceback.format_exc()), logger.ERROR) - callback({'success': False, 'error': 'Failed returning results'}, route) - -def page_not_found(rh): - index_url = sickbeard.WEB_ROOT - url = rh.request.uri[len(index_url):] - - if url[:3] != 'api': - r = index_url + url.lstrip('/') - rh.redirect(r) - else: - rh.set_status(404) - rh.write('Wrong API key used') - class PageTemplate(Template): def __init__(self, rh, *args, **kwargs): kwargs['file'] = os.path.join(sickbeard.PROG_DIR, "gui/" + sickbeard.GUI_NAME + "/interfaces/default/", kwargs['file']) @@ -123,6 +98,7 @@ class PageTemplate(Template): self.sbHandleReverseProxy = sickbeard.HANDLE_REVERSE_PROXY self.sbThemeName = sickbeard.THEME_NAME self.sbLogin = rh.get_current_user() + self.sbURI = rh.request.uri.strip('/') if rh.request.headers['Host'][0] == '[': self.sbHost = re.match("^\[.*\]", rh.request.headers['Host'], re.X | re.M | re.S).group(0) @@ -163,6 +139,44 @@ class BaseHandler(RequestHandler): def __init__(self, *args, **kwargs): super(BaseHandler, self).__init__(*args, **kwargs) + def write_error(self, status_code, **kwargs): + # handle 404 http errors + if status_code == 404: + index_url = sickbeard.WEB_ROOT + url = self.request.uri.replace(index_url, '') + + if url[:3] != 'api': + r = index_url + url + self.redirect(r) + else: + self.write('Wrong API key used') + return + + if self.settings.get("debug") and "exc_info" in kwargs: + exc_info = kwargs["exc_info"] + trace_info = ''.join(["%s
" % line for line in traceback.format_exception(*exc_info)]) + request_info = ''.join(["%s: %s
" % (k, self.request.__dict__[k] ) for k in + self.request.__dict__.keys()]) + error = exc_info[1] + + self.set_header('Content-Type', 'text/html') + self.finish(""" + %s + +

Error

+

%s

+

Traceback

+

%s

+

Request Info

+

%s

+ + """ % (error, error, + trace_info, request_info)) + + def redirect(self, url, permanent=False, status=None): + url = sickbeard.WEB_ROOT + url if sickbeard.WEB_ROOT not in url else url + super(BaseHandler, self).redirect(url, permanent, status) + def get_current_user(self, *args, **kwargs): if not isinstance(self, UI) and sickbeard.WEB_USERNAME and sickbeard.WEB_PASSWORD: return self.get_secure_cookie('user') @@ -211,6 +225,81 @@ class BaseHandler(RequestHandler): else: return False + +class WebHandler(BaseHandler): + executor = ThreadPoolExecutor(10) + + @coroutine + @asynchronous + @authenticated + def get(self, route, *args, **kwargs): + try: + route = route.strip('/') or 'index' + method = getattr(self, route) + + self.async_worker(method, self.async_done) + except: + raise HTTPError(404) + + @run_on_executor + def async_worker(self, method, callback): + result = None + + # get params + kwargs = self.request.arguments + for arg, value in kwargs.items(): + if len(value) == 1: + kwargs[arg] = value[0] + + # get result + try: + result = ek.ss(method(**kwargs)).encode('utf-8', 'xmlcharrefreplace') + except: + result = method(**kwargs) + finally: + callback(result) + + def async_done(self, result): + # finish result + self.write(result) + self.finish() + + # link post to get + post = get + +class LoginHandler(BaseHandler): + def get(self, *args, **kwargs): + + if self.get_current_user(): + self.redirect('/home/') + else: + t = PageTemplate(rh=self, file="login.tmpl") + self.write(ek.ss(t).encode('utf-8', 'xmlcharrefreplace')) + + def post(self, *args, **kwargs): + + api_key = None + + username = sickbeard.WEB_USERNAME + password = sickbeard.WEB_PASSWORD + + if (self.get_argument('username') == username or not username) and ( + self.get_argument('password') == password or not password): + api_key = sickbeard.API_KEY + + if api_key: + remember_me = int(self.get_argument('remember_me', default=0) or 0) + self.set_secure_cookie('user', api_key, expires_days=30 if remember_me > 0 else None) + + self.redirect('/home/') + + +class LogoutHandler(BaseHandler): + def get(self, *args, **kwargs): + self.clear_cookie("user") + self.redirect('/login/') + + class KeyHandler(RequestHandler): def get(self, *args, **kwargs): api_key = None @@ -231,119 +320,6 @@ class KeyHandler(RequestHandler): logger.log('Failed doing key request: %s' % (traceback.format_exc()), logger.ERROR) self.write({'success': False, 'error': 'Failed returning results'}) -class WebHandler(BaseHandler): - def write_error(self, status_code, **kwargs): - if status_code == 404: - self.redirect(sickbeard.WEB_ROOT + '/home/') - return - elif self.settings.get("debug") and "exc_info" in kwargs: - exc_info = kwargs["exc_info"] - trace_info = ''.join(["%s
" % line for line in traceback.format_exception(*exc_info)]) - request_info = ''.join(["%s: %s
" % (k, self.request.__dict__[k] ) for k in - self.request.__dict__.keys()]) - error = exc_info[1] - - self.set_header('Content-Type', 'text/html') - self.finish(""" - %s - -

Error

-

%s

-

Traceback

-

%s

-

Request Info

-

%s

- - """ % (error, error, - trace_info, request_info)) - - @asynchronous - @authenticated - def get(self, route, *args, **kwargs): - route = route.strip('/') - - try: - route = getattr(self, route) - except: - route = getattr(self, 'index') - - # acquire route lock - route_locks[route] = threading.Lock() - route_locks[route].acquire() - - try: - - # Sanitize argument lists: - kwargs = self.request.arguments - for arg, value in kwargs.items(): - if len(value) == 1: - kwargs[arg] = value[0] - - run_handler(route, kwargs, callback=self.taskFinished) - except: - route_locks[route].release() - page_not_found(self) - - def taskFinished(self, result, route): - try: - # encode results - try:result = ek.ss(result).encode('utf-8', 'xmlcharrefreplace') if result else None - except:pass - - # ignore empty results - if result: - # Check JSONP callback - jsonp_callback = self.get_argument('callback_func', default=None) - - if jsonp_callback: - self.write(str(jsonp_callback) + '(' + json.dumps(result) + ')') - self.set_header("Content-Type", "text/javascript") - self.finish() - else: - self.write(result) - self.finish() - except UnicodeDecodeError: - logger.log('Failed proper encode: %s' % traceback.format_exc(), logger.ERROR) - except: - logger.log("Failed doing web request '%s': %s" % (route, traceback.format_exc()), logger.ERROR) - try:self.finish({'success': False, 'error': 'Failed returning results'}) - except:pass - - # release route lock - route_locks[route].release() - - # link post to get - post = get - -class LoginHandler(BaseHandler): - def get(self, *args, **kwargs): - - if self.get_current_user(): - self.redirect('%shome/' % sickbeard.WEB_ROOT) - else: - t = PageTemplate(rh=self, file="login.tmpl") - self.write(ek.ss(t).encode('utf-8', 'xmlcharrefreplace')) - - def post(self, *args, **kwargs): - - api_key = None - - username = sickbeard.WEB_USERNAME - password = sickbeard.WEB_PASSWORD - - if (self.get_argument('username') == username or not username) and (self.get_argument('password') == password or not password): - api_key = sickbeard.API_KEY - - if api_key: - remember_me = int(self.get_argument('remember_me', default=0) or 0) - self.set_secure_cookie('user', api_key, expires_days=30 if remember_me > 0 else None) - - self.redirect('%shome/' % sickbeard.WEB_ROOT) - -class LogoutHandler(BaseHandler): - def get(self, *args, **kwargs): - self.clear_cookie("user") - self.redirect('%slogin/' % sickbeard.WEB_ROOT) @route('(.*)(/?)') class WebRoot(WebHandler): @@ -639,6 +615,7 @@ class WebRoot(WebHandler): return ical + @route('/ui/(.*)(/?)') class UI(WebRoot): def add_message(self, *args, **kwargs): @@ -658,6 +635,7 @@ class UI(WebRoot): return json.dumps(messages) + @route('/browser/(.*)(/?)') class WebFileBrowser(WebRoot): def index(self, path='', includeFiles=False, *args, **kwargs): @@ -671,6 +649,7 @@ class WebFileBrowser(WebRoot): return json.dumps(paths) + @route('/home/(.*)(/?)') class Home(WebRoot): def HomeMenu(self, *args, **kwargs): @@ -1980,6 +1959,8 @@ class Home(WebRoot): return json.dumps({'result': 'success'}) else: return json.dumps({'result': 'failure'}) + + @route('/home/postprocess/(.*)(/?)') class HomePostProcess(Home): def index(self, *args, **kwargs): @@ -2017,6 +1998,7 @@ class HomePostProcess(Home): result = result.replace("\n", "
\n") return self._genericMessage("Postprocessing results", result) + @route('/home/addShows/(.*)(/?)') class NewHomeAddShows(Home): def index(self, *args, **kwargs): @@ -2517,6 +2499,7 @@ class NewHomeAddShows(Home): # for the remaining shows we need to prompt for each one, so forward this on to the newShow page return self.newShow(dirs_only[0], dirs_only[1:]) + @route('/manage/(.*)(/?)') class Manage(WebRoot): def ManageMenu(self, *args, **kwargs): @@ -2645,7 +2628,7 @@ class Manage(WebRoot): to_change[cur_indexer_id] = all_eps WebRoot.Home.setStatus(cur_indexer_id, '|'.join(to_change[cur_indexer_id]), - newStatus, direct=True) + newStatus, direct=True) self.redirect('/manage/episodeStatuses/') @@ -3021,13 +3004,13 @@ class Manage(WebRoot): exceptions_list = [] curErrors += WebRoot.Home.editShow(curShow, new_show_dir, anyQualities, - bestQualities, exceptions_list, - archive_firstmatch=new_archive_firstmatch, - flatten_folders=new_flatten_folders, - paused=new_paused, sports=new_sports, - subtitles=new_subtitles, anime=new_anime, - scene=new_scene, air_by_date=new_air_by_date, - directCall=True) + bestQualities, exceptions_list, + archive_firstmatch=new_archive_firstmatch, + flatten_folders=new_flatten_folders, + paused=new_paused, sports=new_sports, + subtitles=new_subtitles, anime=new_anime, + scene=new_scene, air_by_date=new_air_by_date, + directCall=True) if curErrors: logger.log(u"Errors: " + str(curErrors), logger.ERROR) @@ -3212,6 +3195,7 @@ class Manage(WebRoot): return t + @route('/manage/manageSearches/(.*)(/?)') class ManageSearches(Manage): def index(self, *args, **kwargs): @@ -3274,6 +3258,7 @@ class ManageSearches(Manage): self.redirect("/manage/manageSearches/") + @route('/history/(.*)(/?)') class History(WebRoot): def index(self, limit=100): @@ -3365,6 +3350,7 @@ class History(WebRoot): ui.notifications.message('Removed history entries greater than 30 days old') self.redirect("/history/") + @route('/config/(.*)(/?)') class Config(WebRoot): def ConfigMenu(self, *args, **kwargs): @@ -3387,9 +3373,9 @@ class Config(WebRoot): return t + @route('/config/general/(.*)(/?)') class ConfigGeneral(Config): - def index(self, *args, **kwargs): t = PageTemplate(rh=self, file="config_general.tmpl") t.submenu = self.ConfigMenu() @@ -3569,6 +3555,7 @@ class ConfigBackupRestore(Config): return finalResult + @route('/config/search/(.*)(/?)') class ConfigSearch(Config): def index(self, *args, **kwargs): @@ -3661,7 +3648,8 @@ class ConfigSearch(Config): self.redirect("/config/search/") -@route('/config/postProcessing(.*)(/?)') + +@route('/config/postProcessing/(.*)(/?)') class ConfigPostProcessing(Config): def index(self, *args, **kwargs): @@ -3860,6 +3848,7 @@ class ConfigPostProcessing(Config): logger.log(u'Rar Not Supported: ' + ex(e), logger.ERROR) return 'not supported' + @route('/config/providers/(.*)(/?)') class ConfigProviders(Config): def index(self, *args, **kwargs): @@ -4298,6 +4287,7 @@ class ConfigProviders(Config): self.redirect("/config/providers/") + @route('/config/notifications/(.*)(/?)') class ConfigNotifications(Config): def index(self, *args, **kwargs): @@ -4507,6 +4497,7 @@ class ConfigNotifications(Config): self.redirect("/config/notifications/") + @route('/config/subtitles/(.*)(/?)') class ConfigSubtitles(Config): def index(self, *args, **kwargs): @@ -4570,6 +4561,7 @@ class ConfigSubtitles(Config): self.redirect("/config/subtitles/") + @route('/config/anime/(.*)(/?)') class ConfigAnime(Config): def index(self, *args, **kwargs): @@ -4602,6 +4594,7 @@ class ConfigAnime(Config): self.redirect("/config/anime/") + @route('/errorlogs/(.*)(/?)') class ErrorLogs(WebRoot): def ErrorLogsMenu(self, *args, **kwargs): diff --git a/sickbeard/webserveInit.py b/sickbeard/webserveInit.py index 8d68084d..fdd34327 100644 --- a/sickbeard/webserveInit.py +++ b/sickbeard/webserveInit.py @@ -12,27 +12,23 @@ from tornado.httpserver import HTTPServer from tornado.ioloop import IOLoop from tornado.routes import route - class MultiStaticFileHandler(StaticFileHandler): - def initialize(self, paths, default_filename=None): + def initialize(self, paths): self.paths = paths - self.default_filename = default_filename + + def set_extra_headers(self, path): + self.set_header("Cache-control", "no-store, no-cache, must-revalidate, max-age=0") def get(self, path, include_body=True): - for p in self.paths: + for staticPath in self.paths: try: - # Initialize the Static file with a path - super(MultiStaticFileHandler, self).initialize(p) - # Try to get the file - return super(MultiStaticFileHandler, self).get(path) - except HTTPError as exc: - # File not found, carry on - if exc.status_code == 404: + super(MultiStaticFileHandler, self).initialize(staticPath) + return super(MultiStaticFileHandler, self).get(path.strip('/')).result() + except HTTPError as e: + if e.status_code == 404: continue raise - - # Oops file not found anywhere! - raise HTTPError(404) + self.set_status(404) class SRWebServer(threading.Thread): @@ -61,8 +57,8 @@ class SRWebServer(threading.Thread): self.video_root = None # web root - self.options['web_root'] = ('/' + self.options['web_root'].lstrip('/')).strip('/') - sickbeard.WEB_ROOT = self.options['web_root'] + if self.options['web_root']: + sickbeard.WEB_ROOT = self.options['web_root'] = ('/' + self.options['web_root'].lstrip('/').strip('/')) # api root if not sickbeard.API_KEY: @@ -95,36 +91,37 @@ class SRWebServer(threading.Thread): gzip=True, xheaders=sickbeard.HANDLE_REVERSE_PROXY, cookie_secret='61oETzKXQAGaYdkL5gEmGeJJFuYh7EQnp2XdTP1o/Vo=', - login_url='%slogin/' % self.options['web_root'], + login_url=r'%s/login/' % self.options['web_root'], ) # Main Handlers - self.app.add_handlers(".*$", [ + self.app.add_handlers('.*$', [ (r'%s(/?)' % self.options['api_root'], ApiHandler), (r'%s/getkey(/?)' % self.options['web_root'], KeyHandler), (r'%s/api/builder' % self.options['web_root'], RedirectHandler, {"url": self.options['web_root'] + '/apibuilder/'}), (r'%s/login(/?)' % self.options['web_root'], LoginHandler), (r'%s/logout(/?)' % self.options['web_root'], LogoutHandler), - ] + route.get_routes()) + (r'/', RedirectHandler, {"url": self.options['web_root'] + '/home/'}), + ] + route.get_routes(self.options['web_root'])) # Static Path Handlers self.app.add_handlers(".*$", [ (r'%s/(favicon\.ico)' % self.options['web_root'], MultiStaticFileHandler, {'paths': [os.path.join(self.options['data_root'], 'images/ico/favicon.ico')]}), - (r'%s/%s/(.*)(/?)' % (self.options['web_root'], 'images'), MultiStaticFileHandler, + (r'%s/%s(.*)(/?)' % (self.options['web_root'], 'images'), MultiStaticFileHandler, {'paths': [os.path.join(self.options['data_root'], 'images'), os.path.join(sickbeard.CACHE_DIR, 'images')]}), - (r'%s/%s/(.*)(/?)' % (self.options['web_root'], 'css'), MultiStaticFileHandler, + (r'%s/%s(.*)(/?)' % (self.options['web_root'], 'css'), MultiStaticFileHandler, {'paths': [os.path.join(self.options['data_root'], 'css')]}), - (r'%s/%s/(.*)(/?)' % (self.options['web_root'], 'js'), MultiStaticFileHandler, + (r'%s/%s(.*)(/?)' % (self.options['web_root'], 'js'), MultiStaticFileHandler, {'paths': [os.path.join(self.options['data_root'], 'js')]}), ]) # Static Videos Path if self.video_root: self.app.add_handlers(".*$", [ - (r'%s%s/(.*)' % (self.options['web_root'], 'videos'), MultiStaticFileHandler, + (r'%s/%s/(.*)' % (self.options['web_root'], 'videos'), MultiStaticFileHandler, {'paths': [self.video_root]}), ]) diff --git a/tornado/routes.py b/tornado/routes.py index 3d19c88d..a7042aa0 100644 --- a/tornado/routes.py +++ b/tornado/routes.py @@ -1,5 +1,9 @@ +import inspect +import os import tornado.web +route_list = [] + class route(object): _routes = [] @@ -10,13 +14,14 @@ class route(object): def __call__(self, _handler): """gets called when we class decorate""" name = self.name and self.name or _handler.__name__ - self._routes.append(tornado.web.url(self._uri, _handler, name=name)) + self._routes.append((self._uri, _handler, name)) return _handler @classmethod - def get_routes(self): + def get_routes(self, webroot=''): self._routes.reverse() - return self._routes + routes = [tornado.web.url(webroot + _uri, _handler, name=name) for _uri, _handler, name, in self._routes] + return routes def route_redirect(from_, to, name=None): route._routes.append(tornado.web.url(from_, tornado.web.RedirectHandler, dict(url=to), name=name)) \ No newline at end of file