1
0
mirror of https://github.com/moparisthebest/SickRage synced 2025-01-07 11:58:01 -05:00

Added in optional multi-threading for all search queues, can be set from General->Advanced config section.

Fixed issue #417 and #335
This commit is contained in:
echel0n 2014-05-06 04:29:25 -07:00
parent 8402a98c43
commit fb71c6139e
18 changed files with 1395 additions and 132 deletions

View File

@ -324,6 +324,16 @@
</label>
</div>
<div class="field-pair">
<label class="nocheck clearfix">
<span class="component-title">Multi-Threading</span>
<input type="text" name="num_of_threads" id="num_of_threads" value="$sickbeard.NUM_OF_THREADS" size="5" />
</label>
<label class="nocheck clearfix">
<span class="component-title">&nbsp;</span>
<span class="component-desc">Number of threads to assign, the more threads the faster SickRage goes if your hardware can handle it! (eg. 1)</span>
</label>
</div>
<input type="submit" class="btn config_submitter" value="Save Changes" />
</fieldset>
</div><!-- /component-group4 //-->

View File

@ -0,0 +1,3 @@
from pkgutil import extend_path
__path__ = extend_path(__path__, __name__)

View File

@ -0,0 +1,23 @@
# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.
"""Execute computations asynchronously using threads or processes."""
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
from concurrent.futures._base import (FIRST_COMPLETED,
FIRST_EXCEPTION,
ALL_COMPLETED,
CancelledError,
TimeoutError,
Future,
Executor,
wait,
as_completed)
from concurrent.futures.thread import ThreadPoolExecutor
# Jython doesn't have multiprocessing
try:
from concurrent.futures.process import ProcessPoolExecutor
except ImportError:
pass

View File

@ -0,0 +1,577 @@
# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.
from __future__ import with_statement
import logging
import threading
import time
try:
from collections import namedtuple
except ImportError:
from concurrent.futures._compat import namedtuple
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
FIRST_COMPLETED = 'FIRST_COMPLETED'
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
ALL_COMPLETED = 'ALL_COMPLETED'
_AS_COMPLETED = '_AS_COMPLETED'
# Possible future states (for internal use by the futures package).
PENDING = 'PENDING'
RUNNING = 'RUNNING'
# The future was cancelled by the user...
CANCELLED = 'CANCELLED'
# ...and _Waiter.add_cancelled() was called by a worker.
CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
FINISHED = 'FINISHED'
_FUTURE_STATES = [
PENDING,
RUNNING,
CANCELLED,
CANCELLED_AND_NOTIFIED,
FINISHED
]
_STATE_TO_DESCRIPTION_MAP = {
PENDING: "pending",
RUNNING: "running",
CANCELLED: "cancelled",
CANCELLED_AND_NOTIFIED: "cancelled",
FINISHED: "finished"
}
# Logger for internal use by the futures package.
LOGGER = logging.getLogger("concurrent.futures")
class Error(Exception):
"""Base class for all future-related exceptions."""
pass
class CancelledError(Error):
"""The Future was cancelled."""
pass
class TimeoutError(Error):
"""The operation exceeded the given deadline."""
pass
class _Waiter(object):
"""Provides the event that wait() and as_completed() block on."""
def __init__(self):
self.event = threading.Event()
self.finished_futures = []
def add_result(self, future):
self.finished_futures.append(future)
def add_exception(self, future):
self.finished_futures.append(future)
def add_cancelled(self, future):
self.finished_futures.append(future)
class _AsCompletedWaiter(_Waiter):
"""Used by as_completed()."""
def __init__(self):
super(_AsCompletedWaiter, self).__init__()
self.lock = threading.Lock()
def add_result(self, future):
with self.lock:
super(_AsCompletedWaiter, self).add_result(future)
self.event.set()
def add_exception(self, future):
with self.lock:
super(_AsCompletedWaiter, self).add_exception(future)
self.event.set()
def add_cancelled(self, future):
with self.lock:
super(_AsCompletedWaiter, self).add_cancelled(future)
self.event.set()
class _FirstCompletedWaiter(_Waiter):
"""Used by wait(return_when=FIRST_COMPLETED)."""
def add_result(self, future):
super(_FirstCompletedWaiter, self).add_result(future)
self.event.set()
def add_exception(self, future):
super(_FirstCompletedWaiter, self).add_exception(future)
self.event.set()
def add_cancelled(self, future):
super(_FirstCompletedWaiter, self).add_cancelled(future)
self.event.set()
class _AllCompletedWaiter(_Waiter):
"""Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED)."""
def __init__(self, num_pending_calls, stop_on_exception):
self.num_pending_calls = num_pending_calls
self.stop_on_exception = stop_on_exception
self.lock = threading.Lock()
super(_AllCompletedWaiter, self).__init__()
def _decrement_pending_calls(self):
with self.lock:
self.num_pending_calls -= 1
if not self.num_pending_calls:
self.event.set()
def add_result(self, future):
super(_AllCompletedWaiter, self).add_result(future)
self._decrement_pending_calls()
def add_exception(self, future):
super(_AllCompletedWaiter, self).add_exception(future)
if self.stop_on_exception:
self.event.set()
else:
self._decrement_pending_calls()
def add_cancelled(self, future):
super(_AllCompletedWaiter, self).add_cancelled(future)
self._decrement_pending_calls()
class _AcquireFutures(object):
"""A context manager that does an ordered acquire of Future conditions."""
def __init__(self, futures):
self.futures = sorted(futures, key=id)
def __enter__(self):
for future in self.futures:
future._condition.acquire()
def __exit__(self, *args):
for future in self.futures:
future._condition.release()
def _create_and_install_waiters(fs, return_when):
if return_when == _AS_COMPLETED:
waiter = _AsCompletedWaiter()
elif return_when == FIRST_COMPLETED:
waiter = _FirstCompletedWaiter()
else:
pending_count = sum(
f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs)
if return_when == FIRST_EXCEPTION:
waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True)
elif return_when == ALL_COMPLETED:
waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False)
else:
raise ValueError("Invalid return condition: %r" % return_when)
for f in fs:
f._waiters.append(waiter)
return waiter
def as_completed(fs, timeout=None):
"""An iterator over the given futures that yields each as it completes.
Args:
fs: The sequence of Futures (possibly created by different Executors) to
iterate over.
timeout: The maximum number of seconds to wait. If None, then there
is no limit on the wait time.
Returns:
An iterator that yields the given Futures as they complete (finished or
cancelled).
Raises:
TimeoutError: If the entire result iterator could not be generated
before the given timeout.
"""
if timeout is not None:
end_time = timeout + time.time()
with _AcquireFutures(fs):
finished = set(
f for f in fs
if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
pending = set(fs) - finished
waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
try:
for future in finished:
yield future
while pending:
if timeout is None:
wait_timeout = None
else:
wait_timeout = end_time - time.time()
if wait_timeout < 0:
raise TimeoutError(
'%d (of %d) futures unfinished' % (
len(pending), len(fs)))
waiter.event.wait(wait_timeout)
with waiter.lock:
finished = waiter.finished_futures
waiter.finished_futures = []
waiter.event.clear()
for future in finished:
yield future
pending.remove(future)
finally:
for f in fs:
f._waiters.remove(waiter)
DoneAndNotDoneFutures = namedtuple(
'DoneAndNotDoneFutures', 'done not_done')
def wait(fs, timeout=None, return_when=ALL_COMPLETED):
"""Wait for the futures in the given sequence to complete.
Args:
fs: The sequence of Futures (possibly created by different Executors) to
wait upon.
timeout: The maximum number of seconds to wait. If None, then there
is no limit on the wait time.
return_when: Indicates when this function should return. The options
are:
FIRST_COMPLETED - Return when any future finishes or is
cancelled.
FIRST_EXCEPTION - Return when any future finishes by raising an
exception. If no future raises an exception
then it is equivalent to ALL_COMPLETED.
ALL_COMPLETED - Return when all futures finish or are cancelled.
Returns:
A named 2-tuple of sets. The first set, named 'done', contains the
futures that completed (is finished or cancelled) before the wait
completed. The second set, named 'not_done', contains uncompleted
futures.
"""
with _AcquireFutures(fs):
done = set(f for f in fs
if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
not_done = set(fs) - done
if (return_when == FIRST_COMPLETED) and done:
return DoneAndNotDoneFutures(done, not_done)
elif (return_when == FIRST_EXCEPTION) and done:
if any(f for f in done
if not f.cancelled() and f.exception() is not None):
return DoneAndNotDoneFutures(done, not_done)
if len(done) == len(fs):
return DoneAndNotDoneFutures(done, not_done)
waiter = _create_and_install_waiters(fs, return_when)
waiter.event.wait(timeout)
for f in fs:
f._waiters.remove(waiter)
done.update(waiter.finished_futures)
return DoneAndNotDoneFutures(done, set(fs) - done)
class Future(object):
"""Represents the result of an asynchronous computation."""
def __init__(self):
"""Initializes the future. Should not be called by clients."""
self._condition = threading.Condition()
self._state = PENDING
self._result = None
self._exception = None
self._waiters = []
self._done_callbacks = []
def _invoke_callbacks(self):
for callback in self._done_callbacks:
try:
callback(self)
except Exception:
LOGGER.exception('exception calling callback for %r', self)
def __repr__(self):
with self._condition:
if self._state == FINISHED:
if self._exception:
return '<Future at %s state=%s raised %s>' % (
hex(id(self)),
_STATE_TO_DESCRIPTION_MAP[self._state],
self._exception.__class__.__name__)
else:
return '<Future at %s state=%s returned %s>' % (
hex(id(self)),
_STATE_TO_DESCRIPTION_MAP[self._state],
self._result.__class__.__name__)
return '<Future at %s state=%s>' % (
hex(id(self)),
_STATE_TO_DESCRIPTION_MAP[self._state])
def cancel(self):
"""Cancel the future if possible.
Returns True if the future was cancelled, False otherwise. A future
cannot be cancelled if it is running or has already completed.
"""
with self._condition:
if self._state in [RUNNING, FINISHED]:
return False
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
return True
self._state = CANCELLED
self._condition.notify_all()
self._invoke_callbacks()
return True
def cancelled(self):
"""Return True if the future has cancelled."""
with self._condition:
return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]
def isAlive(self):
return self.running()
def running(self):
"""Return True if the future is currently executing."""
with self._condition:
return self._state == RUNNING
def done(self):
"""Return True of the future was cancelled or finished executing."""
with self._condition:
return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
def __get_result(self):
if self._exception:
raise self._exception
else:
return self._result
def add_done_callback(self, fn):
"""Attaches a callable that will be called when the future finishes.
Args:
fn: A callable that will be called with this future as its only
argument when the future completes or is cancelled. The callable
will always be called by a thread in the same process in which
it was added. If the future has already completed or been
cancelled then the callable will be called immediately. These
callables are called in the order that they were added.
"""
with self._condition:
if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
self._done_callbacks.append(fn)
return
fn(self)
def result(self, timeout=None):
"""Return the result of the call that the future represents.
Args:
timeout: The number of seconds to wait for the result if the future
isn't done. If None, then there is no limit on the wait time.
Returns:
The result of the call that the future represents.
Raises:
CancelledError: If the future was cancelled.
TimeoutError: If the future didn't finish executing before the given
timeout.
Exception: If the call raised then that exception will be raised.
"""
with self._condition:
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
elif self._state == FINISHED:
return self.__get_result()
self._condition.wait(timeout)
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
elif self._state == FINISHED:
return self.__get_result()
else:
raise TimeoutError()
def exception(self, timeout=None):
"""Return the exception raised by the call that the future represents.
Args:
timeout: The number of seconds to wait for the exception if the
future isn't done. If None, then there is no limit on the wait
time.
Returns:
The exception raised by the call that the future represents or None
if the call completed without raising.
Raises:
CancelledError: If the future was cancelled.
TimeoutError: If the future didn't finish executing before the given
timeout.
"""
with self._condition:
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
elif self._state == FINISHED:
return self._exception
self._condition.wait(timeout)
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
elif self._state == FINISHED:
return self._exception
else:
raise TimeoutError()
# The following methods should only be used by Executors and in tests.
def set_running_or_notify_cancel(self):
"""Mark the future as running or process any cancel notifications.
Should only be used by Executor implementations and unit tests.
If the future has been cancelled (cancel() was called and returned
True) then any threads waiting on the future completing (though calls
to as_completed() or wait()) are notified and False is returned.
If the future was not cancelled then it is put in the running state
(future calls to running() will return True) and True is returned.
This method should be called by Executor implementations before
executing the work associated with this future. If this method returns
False then the work should not be executed.
Returns:
False if the Future was cancelled, True otherwise.
Raises:
RuntimeError: if this method was already called or if set_result()
or set_exception() was called.
"""
with self._condition:
if self._state == CANCELLED:
self._state = CANCELLED_AND_NOTIFIED
for waiter in self._waiters:
waiter.add_cancelled(self)
# self._condition.notify_all() is not necessary because
# self.cancel() triggers a notification.
return False
elif self._state == PENDING:
self._state = RUNNING
return True
else:
LOGGER.critical('Future %s in unexpected state: %s',
id(self.future),
self.future._state)
raise RuntimeError('Future in unexpected state')
def set_result(self, result):
"""Sets the return value of work associated with the future.
Should only be used by Executor implementations and unit tests.
"""
with self._condition:
self._result = result
self._state = FINISHED
for waiter in self._waiters:
waiter.add_result(self)
self._condition.notify_all()
self._invoke_callbacks()
def set_exception(self, exception):
"""Sets the result of the future as being the given exception.
Should only be used by Executor implementations and unit tests.
"""
with self._condition:
self._exception = exception
self._state = FINISHED
for waiter in self._waiters:
waiter.add_exception(self)
self._condition.notify_all()
self._invoke_callbacks()
class Executor(object):
"""This is an abstract base class for concrete asynchronous executors."""
def submit(self, fn, *args, **kwargs):
"""Submits a callable to be executed with the given arguments.
Schedules the callable to be executed as fn(*args, **kwargs) and returns
a Future instance representing the execution of the callable.
Returns:
A Future representing the given call.
"""
raise NotImplementedError()
def map(self, fn, *iterables, **kwargs):
"""Returns a iterator equivalent to map(fn, iter).
Args:
fn: A callable that will take as many arguments as there are
passed iterables.
timeout: The maximum number of seconds to wait. If None, then there
is no limit on the wait time.
Returns:
An iterator equivalent to: map(func, *iterables) but the calls may
be evaluated out-of-order.
Raises:
TimeoutError: If the entire result iterator could not be generated
before the given timeout.
Exception: If fn(*args) raises for any values.
"""
timeout = kwargs.get('timeout')
if timeout is not None:
end_time = timeout + time.time()
fs = [self.submit(fn, *args) for args in zip(*iterables)]
try:
for future in fs:
if timeout is None:
yield future.result()
else:
yield future.result(end_time - time.time())
finally:
for future in fs:
future.cancel()
def shutdown(self, wait=True):
"""Clean-up the resources associated with the Executor.
It is safe to call this method several times. Otherwise, no other
methods can be called after this one.
Args:
wait: If True then shutdown will not return until all running
futures have finished executing and the resources used by the
executor have been reclaimed.
"""
pass
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.shutdown(wait=True)
return False

View File

@ -0,0 +1,101 @@
from keyword import iskeyword as _iskeyword
from operator import itemgetter as _itemgetter
import sys as _sys
def namedtuple(typename, field_names):
"""Returns a new subclass of tuple with named fields.
>>> Point = namedtuple('Point', 'x y')
>>> Point.__doc__ # docstring for the new class
'Point(x, y)'
>>> p = Point(11, y=22) # instantiate with positional args or keywords
>>> p[0] + p[1] # indexable like a plain tuple
33
>>> x, y = p # unpack like a regular tuple
>>> x, y
(11, 22)
>>> p.x + p.y # fields also accessable by name
33
>>> d = p._asdict() # convert to a dictionary
>>> d['x']
11
>>> Point(**d) # convert from a dictionary
Point(x=11, y=22)
>>> p._replace(x=100) # _replace() is like str.replace() but targets named fields
Point(x=100, y=22)
"""
# Parse and validate the field names. Validation serves two purposes,
# generating informative error messages and preventing template injection attacks.
if isinstance(field_names, basestring):
field_names = field_names.replace(',', ' ').split() # names separated by whitespace and/or commas
field_names = tuple(map(str, field_names))
for name in (typename,) + field_names:
if not all(c.isalnum() or c=='_' for c in name):
raise ValueError('Type names and field names can only contain alphanumeric characters and underscores: %r' % name)
if _iskeyword(name):
raise ValueError('Type names and field names cannot be a keyword: %r' % name)
if name[0].isdigit():
raise ValueError('Type names and field names cannot start with a number: %r' % name)
seen_names = set()
for name in field_names:
if name.startswith('_'):
raise ValueError('Field names cannot start with an underscore: %r' % name)
if name in seen_names:
raise ValueError('Encountered duplicate field name: %r' % name)
seen_names.add(name)
# Create and fill-in the class template
numfields = len(field_names)
argtxt = repr(field_names).replace("'", "")[1:-1] # tuple repr without parens or quotes
reprtxt = ', '.join('%s=%%r' % name for name in field_names)
dicttxt = ', '.join('%r: t[%d]' % (name, pos) for pos, name in enumerate(field_names))
template = '''class %(typename)s(tuple):
'%(typename)s(%(argtxt)s)' \n
__slots__ = () \n
_fields = %(field_names)r \n
def __new__(_cls, %(argtxt)s):
return _tuple.__new__(_cls, (%(argtxt)s)) \n
@classmethod
def _make(cls, iterable, new=tuple.__new__, len=len):
'Make a new %(typename)s object from a sequence or iterable'
result = new(cls, iterable)
if len(result) != %(numfields)d:
raise TypeError('Expected %(numfields)d arguments, got %%d' %% len(result))
return result \n
def __repr__(self):
return '%(typename)s(%(reprtxt)s)' %% self \n
def _asdict(t):
'Return a new dict which maps field names to their values'
return {%(dicttxt)s} \n
def _replace(_self, **kwds):
'Return a new %(typename)s object replacing specified fields with new values'
result = _self._make(map(kwds.pop, %(field_names)r, _self))
if kwds:
raise ValueError('Got unexpected field names: %%r' %% kwds.keys())
return result \n
def __getnewargs__(self):
return tuple(self) \n\n''' % locals()
for i, name in enumerate(field_names):
template += ' %s = _property(_itemgetter(%d))\n' % (name, i)
# Execute the template string in a temporary namespace and
# support tracing utilities by setting a value for frame.f_globals['__name__']
namespace = dict(_itemgetter=_itemgetter, __name__='namedtuple_%s' % typename,
_property=property, _tuple=tuple)
try:
exec(template, namespace)
except SyntaxError:
e = _sys.exc_info()[1]
raise SyntaxError(e.message + ':\n' + template)
result = namespace[typename]
# For pickling to work, the __module__ variable needs to be set to the frame
# where the named tuple is created. Bypass this step in enviroments where
# sys._getframe is not defined (Jython for example).
if hasattr(_sys, '_getframe'):
result.__module__ = _sys._getframe(1).f_globals.get('__name__', '__main__')
return result

View File

@ -0,0 +1,363 @@
# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.
"""Implements ProcessPoolExecutor.
The follow diagram and text describe the data-flow through the system:
|======================= In-process =====================|== Out-of-process ==|
+----------+ +----------+ +--------+ +-----------+ +---------+
| | => | Work Ids | => | | => | Call Q | => | |
| | +----------+ | | +-----------+ | |
| | | ... | | | | ... | | |
| | | 6 | | | | 5, call() | | |
| | | 7 | | | | ... | | |
| Process | | ... | | Local | +-----------+ | Process |
| Pool | +----------+ | Worker | | #1..n |
| Executor | | Thread | | |
| | +----------- + | | +-----------+ | |
| | <=> | Work Items | <=> | | <= | Result Q | <= | |
| | +------------+ | | +-----------+ | |
| | | 6: call() | | | | ... | | |
| | | future | | | | 4, result | | |
| | | ... | | | | 3, except | | |
+----------+ +------------+ +--------+ +-----------+ +---------+
Executor.submit() called:
- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
- adds the id of the _WorkItem to the "Work Ids" queue
Local worker thread:
- reads work ids from the "Work Ids" queue and looks up the corresponding
WorkItem from the "Work Items" dict: if the work item has been cancelled then
it is simply removed from the dict, otherwise it is repackaged as a
_CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
- reads _ResultItems from "Result Q", updates the future stored in the
"Work Items" dict and deletes the dict entry
Process #1..n:
- reads _CallItems from "Call Q", executes the calls, and puts the resulting
_ResultItems in "Request Q"
"""
from __future__ import with_statement
import atexit
import multiprocessing
import threading
import weakref
import sys
from concurrent.futures import _base
try:
import queue
except ImportError:
import Queue as queue
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
# Workers are created as daemon threads and processes. This is done to allow the
# interpreter to exit when there are still idle processes in a
# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
# allowing workers to die with the interpreter has two undesirable properties:
# - The workers would still be running during interpretor shutdown,
# meaning that they would fail in unpredictable ways.
# - The workers could be killed while evaluating a work item, which could
# be bad if the callable being evaluated has external side-effects e.g.
# writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads/processes finish.
_threads_queues = weakref.WeakKeyDictionary()
_shutdown = False
def _python_exit():
global _shutdown
_shutdown = True
items = list(_threads_queues.items())
for t, q in items:
q.put(None)
for t, q in items:
t.join()
# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
# (Futures in the call queue cannot be cancelled).
EXTRA_QUEUED_CALLS = 1
class _WorkItem(object):
def __init__(self, future, fn, args, kwargs):
self.future = future
self.fn = fn
self.args = args
self.kwargs = kwargs
class _ResultItem(object):
def __init__(self, work_id, exception=None, result=None):
self.work_id = work_id
self.exception = exception
self.result = result
class _CallItem(object):
def __init__(self, work_id, fn, args, kwargs):
self.work_id = work_id
self.fn = fn
self.args = args
self.kwargs = kwargs
def _process_worker(call_queue, result_queue):
"""Evaluates calls from call_queue and places the results in result_queue.
This worker is run in a separate process.
Args:
call_queue: A multiprocessing.Queue of _CallItems that will be read and
evaluated by the worker.
result_queue: A multiprocessing.Queue of _ResultItems that will written
to by the worker.
shutdown: A multiprocessing.Event that will be set as a signal to the
worker that it should exit when call_queue is empty.
"""
while True:
call_item = call_queue.get(block=True)
if call_item is None:
# Wake up queue management thread
result_queue.put(None)
return
try:
r = call_item.fn(*call_item.args, **call_item.kwargs)
except BaseException:
e = sys.exc_info()[1]
result_queue.put(_ResultItem(call_item.work_id,
exception=e))
else:
result_queue.put(_ResultItem(call_item.work_id,
result=r))
def _add_call_item_to_queue(pending_work_items,
work_ids,
call_queue):
"""Fills call_queue with _WorkItems from pending_work_items.
This function never blocks.
Args:
pending_work_items: A dict mapping work ids to _WorkItems e.g.
{5: <_WorkItem...>, 6: <_WorkItem...>, ...}
work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
are consumed and the corresponding _WorkItems from
pending_work_items are transformed into _CallItems and put in
call_queue.
call_queue: A multiprocessing.Queue that will be filled with _CallItems
derived from _WorkItems.
"""
while True:
if call_queue.full():
return
try:
work_id = work_ids.get(block=False)
except queue.Empty:
return
else:
work_item = pending_work_items[work_id]
if work_item.future.set_running_or_notify_cancel():
call_queue.put(_CallItem(work_id,
work_item.fn,
work_item.args,
work_item.kwargs),
block=True)
else:
del pending_work_items[work_id]
continue
def _queue_management_worker(executor_reference,
processes,
pending_work_items,
work_ids_queue,
call_queue,
result_queue):
"""Manages the communication between this process and the worker processes.
This function is run in a local thread.
Args:
executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
this thread. Used to determine if the ProcessPoolExecutor has been
garbage collected and that this function can exit.
process: A list of the multiprocessing.Process instances used as
workers.
pending_work_items: A dict mapping work ids to _WorkItems e.g.
{5: <_WorkItem...>, 6: <_WorkItem...>, ...}
work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
call_queue: A multiprocessing.Queue that will be filled with _CallItems
derived from _WorkItems for processing by the process workers.
result_queue: A multiprocessing.Queue of _ResultItems generated by the
process workers.
"""
nb_shutdown_processes = [0]
def shutdown_one_process():
"""Tell a worker to terminate, which will in turn wake us again"""
call_queue.put(None)
nb_shutdown_processes[0] += 1
while True:
_add_call_item_to_queue(pending_work_items,
work_ids_queue,
call_queue)
result_item = result_queue.get(block=True)
if result_item is not None:
work_item = pending_work_items[result_item.work_id]
del pending_work_items[result_item.work_id]
if result_item.exception:
work_item.future.set_exception(result_item.exception)
else:
work_item.future.set_result(result_item.result)
# Check whether we should start shutting down.
executor = executor_reference()
# No more work items can be added if:
# - The interpreter is shutting down OR
# - The executor that owns this worker has been collected OR
# - The executor that owns this worker has been shutdown.
if _shutdown or executor is None or executor._shutdown_thread:
# Since no new work items can be added, it is safe to shutdown
# this thread if there are no pending work items.
if not pending_work_items:
while nb_shutdown_processes[0] < len(processes):
shutdown_one_process()
# If .join() is not called on the created processes then
# some multiprocessing.Queue methods may deadlock on Mac OS
# X.
for p in processes:
p.join()
call_queue.close()
return
del executor
_system_limits_checked = False
_system_limited = None
def _check_system_limits():
global _system_limits_checked, _system_limited
if _system_limits_checked:
if _system_limited:
raise NotImplementedError(_system_limited)
_system_limits_checked = True
try:
import os
nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
except (AttributeError, ValueError):
# sysconf not available or setting not available
return
if nsems_max == -1:
# indetermine limit, assume that limit is determined
# by available memory only
return
if nsems_max >= 256:
# minimum number of semaphores available
# according to POSIX
return
_system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max
raise NotImplementedError(_system_limited)
class ProcessPoolExecutor(_base.Executor):
def __init__(self, max_workers=None):
"""Initializes a new ProcessPoolExecutor instance.
Args:
max_workers: The maximum number of processes that can be used to
execute the given calls. If None or not given then as many
worker processes will be created as the machine has processors.
"""
_check_system_limits()
if max_workers is None:
self._max_workers = multiprocessing.cpu_count()
else:
self._max_workers = max_workers
# Make the call queue slightly larger than the number of processes to
# prevent the worker processes from idling. But don't make it too big
# because futures in the call queue cannot be cancelled.
self._call_queue = multiprocessing.Queue(self._max_workers +
EXTRA_QUEUED_CALLS)
self._result_queue = multiprocessing.Queue()
self._work_ids = queue.Queue()
self._queue_management_thread = None
self._processes = set()
# Shutdown is a two-step process.
self._shutdown_thread = False
self._shutdown_lock = threading.Lock()
self._queue_count = 0
self._pending_work_items = {}
def _start_queue_management_thread(self):
# When the executor gets lost, the weakref callback will wake up
# the queue management thread.
def weakref_cb(_, q=self._result_queue):
q.put(None)
if self._queue_management_thread is None:
self._queue_management_thread = threading.Thread(
target=_queue_management_worker,
args=(weakref.ref(self, weakref_cb),
self._processes,
self._pending_work_items,
self._work_ids,
self._call_queue,
self._result_queue))
self._queue_management_thread.daemon = True
self._queue_management_thread.start()
_threads_queues[self._queue_management_thread] = self._result_queue
def _adjust_process_count(self):
for _ in range(len(self._processes), self._max_workers):
p = multiprocessing.Process(
target=_process_worker,
args=(self._call_queue,
self._result_queue))
p.start()
self._processes.add(p)
def submit(self, fn, *args, **kwargs):
with self._shutdown_lock:
if self._shutdown_thread:
raise RuntimeError('cannot schedule new futures after shutdown')
f = _base.Future()
w = _WorkItem(f, fn, args, kwargs)
self._pending_work_items[self._queue_count] = w
self._work_ids.put(self._queue_count)
self._queue_count += 1
# Wake up queue management thread
self._result_queue.put(None)
self._start_queue_management_thread()
self._adjust_process_count()
return f
submit.__doc__ = _base.Executor.submit.__doc__
def shutdown(self, wait=True):
with self._shutdown_lock:
self._shutdown_thread = True
if self._queue_management_thread:
# Wake up queue management thread
self._result_queue.put(None)
if wait:
self._queue_management_thread.join()
# To reduce the risk of openning too many files, remove references to
# objects that use file descriptors.
self._queue_management_thread = None
self._call_queue = None
self._result_queue = None
self._processes = None
shutdown.__doc__ = _base.Executor.shutdown.__doc__
atexit.register(_python_exit)

View File

@ -0,0 +1,141 @@
# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.
"""Implements ThreadPoolExecutor."""
from __future__ import with_statement
import atexit
import threading
import weakref
import sys
from concurrent.futures import _base
try:
import queue
except ImportError:
import Queue as queue
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
# Workers are created as daemon threads. This is done to allow the interpreter
# to exit when there are still idle threads in a ThreadPoolExecutor's thread
# pool (i.e. shutdown() was not called). However, allowing workers to die with
# the interpreter has two undesirable properties:
# - The workers would still be running during interpretor shutdown,
# meaning that they would fail in unpredictable ways.
# - The workers could be killed while evaluating a work item, which could
# be bad if the callable being evaluated has external side-effects e.g.
# writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads finish.
_threads_queues = weakref.WeakKeyDictionary()
_shutdown = False
def _python_exit():
global _shutdown
_shutdown = True
items = list(_threads_queues.items())
for t, q in items:
q.put(None)
for t, q in items:
t.join()
atexit.register(_python_exit)
class _WorkItem(object):
def __init__(self, future, fn, args, kwargs):
self.future = future
self.fn = fn
self.args = args
self.kwargs = kwargs
def run(self):
if not self.future.set_running_or_notify_cancel():
return
try:
result = self.fn(*self.args, **self.kwargs)
except BaseException:
e = sys.exc_info()[1]
self.future.set_exception(e)
else:
self.future.set_result(result)
def _worker(executor_reference, work_queue):
try:
while True:
work_item = work_queue.get(block=True)
if work_item is not None:
work_item.run()
continue
executor = executor_reference()
# Exit if:
# - The interpreter is shutting down OR
# - The executor that owns the worker has been collected OR
# - The executor that owns the worker has been shutdown.
if _shutdown or executor is None or executor._shutdown:
# Notice other workers
work_queue.put(None)
return
del executor
except BaseException:
_base.LOGGER.critical('Exception in worker', exc_info=True)
class ThreadPoolExecutor(_base.Executor):
def __init__(self, max_workers, name=None):
"""Initializes a new ThreadPoolExecutor instance.
Args:
max_workers: The maximum number of threads that can be used to
execute the given calls.
"""
self._max_workers = max_workers
self._work_queue = queue.Queue()
self._threads = set()
self._shutdown = False
self._shutdown_lock = threading.Lock()
self._name = name
def submit(self, fn, *args, **kwargs):
with self._shutdown_lock:
if self._shutdown:
raise RuntimeError('cannot schedule new futures after shutdown')
f = _base.Future()
w = _WorkItem(f, fn, args, kwargs)
self._work_queue.put(w)
self._adjust_thread_count()
return f
submit.__doc__ = _base.Executor.submit.__doc__
def _adjust_thread_count(self):
# When the executor gets lost, the weakref callback will wake up
# the worker threads.
def weakref_cb(_, q=self._work_queue):
q.put(None)
# TODO(bquinlan): Should avoid creating new threads if there are more
# idle threads than items in the work queue.
if len(self._threads) < self._max_workers:
t = threading.Thread(target=_worker,
args=(weakref.ref(self, weakref_cb),
self._work_queue),)
if self._name:
t.name = self._name
t.daemon = True
t.start()
self._threads.add(t)
_threads_queues[t] = self._work_queue
def shutdown(self, wait=True):
with self._shutdown_lock:
self._shutdown = True
self._work_queue.put(None)
if wait:
for t in self._threads:
t.join()
shutdown.__doc__ = _base.Executor.shutdown.__doc__

24
lib/futures/__init__.py Normal file
View File

@ -0,0 +1,24 @@
# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.
"""Execute computations asynchronously using threads or processes."""
import warnings
from concurrent.futures import (FIRST_COMPLETED,
FIRST_EXCEPTION,
ALL_COMPLETED,
CancelledError,
TimeoutError,
Future,
Executor,
wait,
as_completed,
ProcessPoolExecutor,
ThreadPoolExecutor)
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
warnings.warn('The futures package has been deprecated. '
'Use the concurrent.futures package instead.',
DeprecationWarning)

1
lib/futures/process.py Normal file
View File

@ -0,0 +1 @@
from concurrent.futures import ProcessPoolExecutor

1
lib/futures/thread.py Normal file
View File

@ -0,0 +1 @@
from concurrent.futures import ThreadPoolExecutor

View File

@ -134,6 +134,7 @@ ROOT_DIRS = None
UPDATE_SHOWS_ON_START = None
SORT_ARTICLE = None
DEBUG = False
NUM_OF_THREADS = None
USE_LISTVIEW = None
METADATA_XBMC = None
@ -502,7 +503,7 @@ def initialize(consoleLogging=True):
GUI_NAME, HOME_LAYOUT, HISTORY_LAYOUT, DISPLAY_SHOW_SPECIALS, COMING_EPS_LAYOUT, COMING_EPS_SORT, COMING_EPS_DISPLAY_PAUSED, COMING_EPS_MISSED_RANGE, DATE_PRESET, TIME_PRESET, TIME_PRESET_W_SECONDS, \
METADATA_WDTV, METADATA_TIVO, IGNORE_WORDS, CALENDAR_UNPROTECTED, CREATE_MISSING_SHOW_DIRS, \
ADD_SHOWS_WO_DIR, USE_SUBTITLES, SUBTITLES_LANGUAGES, SUBTITLES_DIR, SUBTITLES_SERVICES_LIST, SUBTITLES_SERVICES_ENABLED, SUBTITLES_HISTORY, SUBTITLES_FINDER_FREQUENCY, subtitlesFinderScheduler, \
USE_FAILED_DOWNLOADS, DELETE_FAILED, ANON_REDIRECT, LOCALHOST_IP, TMDB_API_KEY, DEBUG, PROXY_SETTING
USE_FAILED_DOWNLOADS, DELETE_FAILED, ANON_REDIRECT, LOCALHOST_IP, TMDB_API_KEY, DEBUG, PROXY_SETTING, NUM_OF_THREADS
if __INITIALIZED__:
return False
@ -569,6 +570,8 @@ def initialize(consoleLogging=True):
DEBUG = bool(check_setting_int(CFG, 'General', 'debug', 0))
NUM_OF_THREADS = check_setting_int(CFG, 'General', 'num_of_threads', 1)
ENABLE_HTTPS = bool(check_setting_int(CFG, 'General', 'enable_https', 0))
HTTPS_CERT = check_setting_str(CFG, 'General', 'https_cert', 'server.crt')
@ -1312,6 +1315,7 @@ def save_config():
new_config['General']['use_api'] = int(USE_API)
new_config['General']['api_key'] = API_KEY
new_config['General']['debug'] = int(DEBUG)
new_config['General']['num_of_threads'] = int(NUM_OF_THREADS)
new_config['General']['enable_https'] = int(ENABLE_HTTPS)
new_config['General']['https_cert'] = HTTPS_CERT
new_config['General']['https_key'] = HTTPS_KEY

View File

@ -234,6 +234,9 @@ class GenericProvider:
results = {}
searchItems = {}
if manualSearch:
self.cache.updateCache()
for epObj in episodes:
itemList = []

View File

@ -137,15 +137,15 @@ class PublicHDProvider(generic.TorrentProvider):
urllib.quote(unidecode(search_string)), ';'.join(self.categories[mode]))
logger.log(u"Search string: " + searchURL, logger.DEBUG)
html = self.getURL(searchURL)
if not html:
continue
#remove unneccecary <option> lines which are slowing down BeautifulSoup
optreg = re.compile(r'<option.*</option>')
html = os.linesep.join([s for s in html.splitlines() if not optreg.search(s)])
try:
soup = BeautifulSoup(html, features=["html5lib", "permissive"])
@ -313,16 +313,15 @@ class PublicHDCache(tvcache.TVCache):
logger.log(u"Clearing " + self.provider.name + " cache and updating with new information")
self._clearCache()
cl = []
ql = []
for result in rss_results:
item = (result[0], result[1])
ci = self._parseItem(item)
if ci is not None:
cl.append(ci)
ql.append(ci)
if len(cl) > 0:
myDB = self._getDB()
myDB.mass_action(cl)
myDB = self._getDB()
myDB.mass_action(ql)
def _parseItem(self, item):

View File

@ -20,6 +20,7 @@ from __future__ import with_statement
import os
import re
import threading
import traceback
import datetime
@ -170,57 +171,51 @@ def snatchEpisode(result, endStatus=SNATCHED):
return True
def searchForNeededEpisodes():
logger.log(u"Searching all providers for any needed episodes")
def searchForNeededEpisodes(curProvider):
threading.currentThread().name = curProvider.name
logger.log(u"Searching all providers for any needed episodes")
foundResults = {}
didSearch = False
# ask all providers for any episodes it finds
for curProvider in providers.sortedProviderList():
try:
curFoundResults = curProvider.searchRSS()
except exceptions.AuthException, e:
logger.log(u"Authentication error: " + ex(e), logger.ERROR)
return
except Exception, e:
logger.log(u"Error while searching " + curProvider.name + ", skipping: " + ex(e), logger.ERROR)
logger.log(traceback.format_exc(), logger.DEBUG)
return
if not curProvider.isActive():
didSearch = True
# pick a single result for each episode, respecting existing results
for curEp in curFoundResults:
if curEp.show.paused:
logger.log(u"Show " + curEp.show.name + " is paused, ignoring all RSS items for " + curEp.prettyName(),
logger.DEBUG)
continue
try:
curFoundResults = curProvider.searchRSS()
except exceptions.AuthException, e:
logger.log(u"Authentication error: " + ex(e), logger.ERROR)
continue
except Exception, e:
logger.log(u"Error while searching " + curProvider.name + ", skipping: " + ex(e), logger.ERROR)
logger.log(traceback.format_exc(), logger.DEBUG)
# find the best result for the current episode
bestResult = None
for curResult in curFoundResults[curEp]:
if not bestResult or bestResult.quality < curResult.quality:
bestResult = curResult
bestResult = pickBestResult(curFoundResults[curEp], curEp.show)
# if all results were rejected move on to the next episode
if not bestResult:
logger.log(u"All found results for " + curEp.prettyName() + " were rejected.", logger.DEBUG)
continue
didSearch = True
# if it's already in the list (from another provider) and the newly found quality is no better then skip it
if curEp in foundResults and bestResult.quality <= foundResults[curEp].quality:
continue
# pick a single result for each episode, respecting existing results
for curEp in curFoundResults:
if curEp.show.paused:
logger.log(u"Show " + curEp.show.name + " is paused, ignoring all RSS items for " + curEp.prettyName(),
logger.DEBUG)
continue
# find the best result for the current episode
bestResult = None
for curResult in curFoundResults[curEp]:
if not bestResult or bestResult.quality < curResult.quality:
bestResult = curResult
bestResult = pickBestResult(curFoundResults[curEp], curEp.show)
# if all results were rejected move on to the next episode
if not bestResult:
logger.log(u"All found results for " + curEp.prettyName() + " were rejected.", logger.DEBUG)
continue
# if it's already in the list (from another provider) and the newly found quality is no better then skip it
if curEp in foundResults and bestResult.quality <= foundResults[curEp].quality:
continue
foundResults[curEp] = bestResult
foundResults[curEp] = bestResult
if not didSearch:
logger.log(u"No NZB/Torrent providers found or enabled in the sickbeard config. Please check your settings.",
@ -362,36 +357,35 @@ def filterSearchResults(show, results):
return foundResults
def searchProviders(show, season, episodes, seasonSearch=False, manualSearch=False):
def searchProviders(show, season, episodes, curProvider, seasonSearch=False, manualSearch=False):
threading.currentThread().name = curProvider.name
logger.log(u"Searching for stuff we need from " + show.name + " season " + str(season))
foundResults = {}
didSearch = False
if manualSearch:
curProvider.cache.updateCache()
for curProvider in providers.sortedProviderList():
if not curProvider.isActive():
continue
# convert indexer numbering to scene numbering for searches
map(lambda x: x.convertToSceneNumbering, episodes)
if manualSearch:
curProvider.cache.updateCache()
try:
curResults = curProvider.findSearchResults(show, season, episodes, seasonSearch, manualSearch)
except exceptions.AuthException, e:
logger.log(u"Authentication error: " + ex(e), logger.ERROR)
return
except Exception, e:
logger.log(u"Error while searching " + curProvider.name + ", skipping: " + ex(e), logger.ERROR)
logger.log(traceback.format_exc(), logger.DEBUG)
return
try:
curResults = curProvider.findSearchResults(show, season, episodes, seasonSearch, manualSearch)
except exceptions.AuthException, e:
logger.log(u"Authentication error: " + ex(e), logger.ERROR)
continue
except Exception, e:
logger.log(u"Error while searching " + curProvider.name + ", skipping: " + ex(e), logger.ERROR)
logger.log(traceback.format_exc(), logger.DEBUG)
continue
# finished searching this provider successfully
didSearch = True
# finished searching this provider successfully
didSearch = True
curResults = filterSearchResults(show, curResults)
if len(curResults):
foundResults.update(curResults)
logger.log(u"Provider search results: " + str(foundResults), logger.DEBUG)
curResults = filterSearchResults(show, curResults)
if len(curResults):
foundResults.update(curResults)
logger.log(u"Provider search results: " + str(foundResults), logger.DEBUG)
if not didSearch:
logger.log(u"No NZB/Torrent providers found or enabled in the sickbeard config. Please check your settings.",

View File

@ -33,4 +33,4 @@ class CurrentSearcher():
def run(self):
search_queue_item = search_queue.RSSSearchQueueItem()
sickbeard.searchQueueScheduler.action.add_item(search_queue_item) #@UndefinedVariable
sickbeard.searchQueueScheduler.action.add_item(search_queue_item)

View File

@ -19,6 +19,8 @@
from __future__ import with_statement
import datetime
from threading import Thread
import threading
import time
import sickbeard
@ -27,6 +29,8 @@ from sickbeard import generic_queue
from sickbeard import search, failed_history, history
from sickbeard import ui
from lib.concurrent import futures
BACKLOG_SEARCH = 10
RSS_SEARCH = 20
FAILED_SEARCH = 30
@ -90,20 +94,19 @@ class ManualSearchQueueItem(generic_queue.QueueItem):
def execute(self):
generic_queue.QueueItem.execute(self)
with futures.ThreadPoolExecutor(sickbeard.NUM_OF_THREADS) as executor:
foundResults = list(executor.map(self.process, [x for x in sickbeard.providers.sortedProviderList() if x.isActive()]))
# convert indexer numbering to scene numbering for searches
(self.ep_obj.scene_season, self.ep_obj.scene_episode) = sickbeard.scene_numbering.get_scene_numbering(
self.ep_obj.show.indexerid, self.ep_obj.show.indexer, self.ep_obj.season, self.ep_obj.episode)
logger.log("Beginning manual search for " + self.ep_obj.prettyName() + ' as ' + self.ep_obj.prettySceneName())
foundResults = search.searchProviders(self.ep_obj.show, self.ep_obj.season, [self.ep_obj], manualSearch=True)
result = False
if not foundResults:
ui.notifications.message('No downloads were found',
if self.ep_obj.show.air_by_date:
ui.notifications.message('No downloads were found ...',
"Couldn't find a download for <i>%s</i>" % self.ep_obj.prettyABName())
logger.log(u"Unable to find a download for " + self.ep_obj.prettyABDName())
else:
ui.notifications.message('No downloads were found ...',
"Couldn't find a download for <i>%s</i>" % self.ep_obj.prettyName())
logger.log(u"Unable to find a download for " + self.ep_obj.prettyName())
logger.log(u"Unable to find a download for " + self.ep_obj.prettyName())
self.success = result
else:
@ -115,12 +118,23 @@ class ManualSearchQueueItem(generic_queue.QueueItem):
providerModule = foundResult.provider
if not result:
ui.notifications.error('Error while attempting to snatch ' + foundResult.name + ', check your logs')
ui.notifications.error(
'Error while attempting to snatch ' + foundResult.name + ', check your logs')
elif providerModule == None:
ui.notifications.error('Provider is configured incorrectly, unable to download')
self.success = result
self.finish()
def process(self, curProvider):
if self.ep_obj.show.air_by_date:
logger.log("Beginning manual search for " + self.ep_obj.prettyABDName())
else:
logger.log("Beginning manual search for " + self.ep_obj.prettyName())
return search.searchProviders(self.ep_obj.show, self.ep_obj.season, self.ep_obj, curProvider, True, False)
def finish(self):
# don't let this linger if something goes wrong
if self.success == None:
@ -134,21 +148,21 @@ class RSSSearchQueueItem(generic_queue.QueueItem):
def execute(self):
generic_queue.QueueItem.execute(self)
with futures.ThreadPoolExecutor(sickbeard.NUM_OF_THREADS) as executor:
foundResults = list(executor.map(self.process, [x for x in sickbeard.providers.sortedProviderList() if x.isActive()]))
for curResult in foundResults:
if curResult:
search.snatchEpisode(curResult)
#time.sleep(2)
generic_queue.QueueItem.finish(self)
def process(self, curProvider):
self._changeMissingEpisodes()
logger.log(u"Beginning search for new episodes on RSS feeds and in cache")
foundResults = search.searchForNeededEpisodes()
if not len(foundResults):
logger.log(u"No needed episodes found on the RSS feeds")
else:
for curResult in foundResults:
search.snatchEpisode(curResult)
time.sleep(2)
generic_queue.QueueItem.finish(self)
return search.searchForNeededEpisodes(curProvider)
def _changeMissingEpisodes(self):
@ -218,35 +232,24 @@ class BacklogQueueItem(generic_queue.QueueItem):
self.wantedEpisodes = self._need_any_episodes(statusResults, bestQualities)
def execute(self):
generic_queue.QueueItem.execute(self)
with futures.ThreadPoolExecutor(sickbeard.NUM_OF_THREADS) as executor:
foundResults = sum(list(executor.map(self.process, [x for x in sickbeard.providers.sortedProviderList() if x.isActive()])))
for curResult in foundResults if foundResults else logger.log(
u"Backlog search found nothing to snatch ..."):
search.snatchEpisode(curResult)
self.finish()
def process(self, curProvider):
# check if we want to search for season packs instead of just season/episode
seasonSearch = False
seasonEps = self.show.getAllEpisodes(self.segment)
if len(seasonEps) == len(self.wantedEpisodes):
seasonSearch = True
# convert indexer numbering to scene numbering for searches
for i, epObj in enumerate(self.wantedEpisodes):
(self.wantedEpisodes[i].scene_season,
self.wantedEpisodes[i].scene_episode) = sickbeard.scene_numbering.get_scene_numbering(self.show.indexerid,
self.show.indexer,
epObj.season,
epObj.episode)
logger.log(
"Beginning backlog search for " + self.wantedEpisodes[i].prettyName() + ' as ' + self.wantedEpisodes[
i].prettySceneName())
# search for our wanted items and return the results
results = search.searchProviders(self.show, self.segment, self.wantedEpisodes, seasonSearch=seasonSearch)
# download whatever we find
for curResult in results:
search.snatchEpisode(curResult)
time.sleep(5)
self.finish()
return search.searchProviders(self.show, self.segment, self.wantedEpisodes, curProvider, False, seasonSearch)
def _need_any_episodes(self, statusResults, bestQualities):
wantedEpisodes = []
@ -284,16 +287,25 @@ class FailedQueueItem(generic_queue.QueueItem):
def execute(self):
generic_queue.QueueItem.execute(self)
with futures.ThreadPoolExecutor(sickbeard.NUM_OF_THREADS) as executor:
foundResults = list(executor.map(self.process, [x for x in sickbeard.providers.sortedProviderList() if x.isActive()]))
# download whatever we find
for curResult in foundResults:
self.success = search.snatchEpisode(curResult)
time.sleep(5)
self.finish()
def process(self, curProvider):
episodes = []
for i, epObj in enumerate(episodes):
# convert indexer numbering to scene numbering for searches
(episodes[i].scene_season, self.episodes[i].scene_episode) = sickbeard.scene_numbering.get_scene_numbering(
self.show.indexerid, self.show.indexer, epObj.season, epObj.episode)
logger.log(
"Beginning failed download search for " + epObj.prettyName() + ' as ' + epObj.prettySceneName())
if epObj.show.air_by_date:
logger.log("Beginning manual search for " + epObj.prettyABDName())
else:
logger.log(
"Beginning failed download search for " + epObj.prettyName())
(release, provider) = failed_history.findRelease(self.show, epObj.season, epObj.episode)
if release:
@ -305,12 +317,4 @@ class FailedQueueItem(generic_queue.QueueItem):
failed_history.revertEpisode(self.show, epObj.season, epObj.episode)
episodes.append(epObj)
# get search results
results = search.searchProviders(self.show, episodes[0].season, episodes)
# download whatever we find
for curResult in results:
self.success = search.snatchEpisode(curResult)
time.sleep(5)
self.finish()
return search.searchProviders(self.show, self.episodes[0].season, self.episodes, curProvider, False, False)

View File

@ -2136,3 +2136,15 @@ class TVEpisode(object):
self.saveToDB()
for relEp in self.relatedEps:
relEp.saveToDB()
def convertToSceneNumbering(self):
(self.scene_season, self.scene_episode) = sickbeard.scene_numbering.get_scene_numbering(self.show.indexerid,
self.show.indexer,
self.season,
self.episode)
def convertToIndexerNumbering(self):
(self.season, self.episode) = sickbeard.scene_numbering.get_indexer_numbering(self.show.indexerid,
self.show.indexer,
self.scene_season,
self.scene_episode)

View File

@ -984,7 +984,8 @@ class ConfigGeneral:
update_shows_on_start=None, update_frequency=None, launch_browser=None, web_username=None, use_api=None, api_key=None,
web_password=None, version_notify=None, enable_https=None, https_cert=None, https_key=None,
handle_reverse_proxy=None, sort_article=None, auto_update=None, proxy_setting=None,
anon_redirect=None, git_path=None, calendar_unprotected=None, date_preset=None, time_preset=None, indexer_default=None):
anon_redirect=None, git_path=None, calendar_unprotected=None, date_preset=None, time_preset=None, indexer_default=None,
num_of_threads=None):
results = []
@ -1039,6 +1040,8 @@ class ConfigGeneral:
sickbeard.HANDLE_REVERSE_PROXY = config.checkbox_to_value(handle_reverse_proxy)
sickbeard.NUM_OF_THREADS = config.to_int(num_of_threads)
sickbeard.save_config()
if len(results) > 0: