1
0
mirror of https://github.com/moparisthebest/SickRage synced 2025-01-05 10:58:01 -05:00
SickRage/lib/imdb/parser/sql/__init__.py

1596 lines
65 KiB
Python
Raw Normal View History

"""
parser.sql package (imdb package).
This package provides the IMDbSqlAccessSystem class used to access
IMDb's data through a SQL database. Every database supported by
the SQLObject _AND_ SQLAlchemy Object Relational Managers is available.
the imdb.IMDb function will return an instance of this class when
called with the 'accessSystem' argument set to "sql", "database" or "db".
Copyright 2005-2012 Davide Alberani <da@erlug.linux.it>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""
# FIXME: this whole module was written in a veeery short amount of time.
# The code should be commented, rewritten and cleaned. :-)
import re
import logging
from difflib import SequenceMatcher
from codecs import lookup
from imdb import IMDbBase
from imdb.utils import normalizeName, normalizeTitle, build_title, \
build_name, analyze_name, analyze_title, \
canonicalTitle, canonicalName, re_titleRef, \
build_company_name, re_episodes, _unicodeArticles, \
analyze_company_name, re_year_index, re_nameRef
from imdb.Person import Person
from imdb.Movie import Movie
from imdb.Company import Company
from imdb._exceptions import IMDbDataAccessError, IMDbError
# Logger for miscellaneous functions.
_aux_logger = logging.getLogger('imdbpy.parser.sql.aux')
# =============================
# Things that once upon a time were in imdb.parser.common.locsql.
def titleVariations(title, fromPtdf=0):
"""Build title variations useful for searches; if fromPtdf is true,
the input is assumed to be in the plain text data files format."""
if fromPtdf: title1 = u''
else: title1 = title
title2 = title3 = u''
if fromPtdf or re_year_index.search(title):
# If it appears to have a (year[/imdbIndex]) indication,
# assume that a long imdb canonical name was provided.
titldict = analyze_title(title, canonical=1)
# title1: the canonical name.
title1 = titldict['title']
if titldict['kind'] != 'episode':
# title3: the long imdb canonical name.
if fromPtdf: title3 = title
else: title3 = build_title(titldict, canonical=1, ptdf=1)
else:
title1 = normalizeTitle(title1)
title3 = build_title(titldict, canonical=1, ptdf=1)
else:
# Just a title.
# title1: the canonical title.
title1 = canonicalTitle(title)
title3 = u''
# title2 is title1 without the article, or title1 unchanged.
if title1:
title2 = title1
t2s = title2.split(u', ')
if t2s[-1].lower() in _unicodeArticles:
title2 = u', '.join(t2s[:-1])
_aux_logger.debug('title variations: 1:[%s] 2:[%s] 3:[%s]',
title1, title2, title3)
return title1, title2, title3
re_nameIndex = re.compile(r'\(([IVXLCDM]+)\)')
def nameVariations(name, fromPtdf=0):
"""Build name variations useful for searches; if fromPtdf is true,
the input is assumed to be in the plain text data files format."""
name1 = name2 = name3 = u''
if fromPtdf or re_nameIndex.search(name):
# We've a name with an (imdbIndex)
namedict = analyze_name(name, canonical=1)
# name1 is the name in the canonical format.
name1 = namedict['name']
# name3 is the canonical name with the imdbIndex.
if fromPtdf:
if namedict.has_key('imdbIndex'):
name3 = name
else:
name3 = build_name(namedict, canonical=1)
else:
# name1 is the name in the canonical format.
name1 = canonicalName(name)
name3 = u''
# name2 is the name in the normal format, if it differs from name1.
name2 = normalizeName(name1)
if name1 == name2: name2 = u''
_aux_logger.debug('name variations: 1:[%s] 2:[%s] 3:[%s]',
name1, name2, name3)
return name1, name2, name3
try:
from cutils import ratcliff as _ratcliff
def ratcliff(s1, s2, sm):
"""Return the Ratcliff-Obershelp value between the two strings,
using the C implementation."""
return _ratcliff(s1.encode('latin_1', 'replace'),
s2.encode('latin_1', 'replace'))
except ImportError:
_aux_logger.warn('Unable to import the cutils.ratcliff function.'
' Searching names and titles using the "sql"'
' data access system will be slower.')
def ratcliff(s1, s2, sm):
"""Ratcliff-Obershelp similarity."""
STRING_MAXLENDIFFER = 0.7
s1len = len(s1)
s2len = len(s2)
if s1len < s2len:
threshold = float(s1len) / s2len
else:
threshold = float(s2len) / s1len
if threshold < STRING_MAXLENDIFFER:
return 0.0
sm.set_seq2(s2.lower())
return sm.ratio()
def merge_roles(mop):
"""Merge multiple roles."""
new_list = []
for m in mop:
if m in new_list:
keep_this = new_list[new_list.index(m)]
if not isinstance(keep_this.currentRole, list):
keep_this.currentRole = [keep_this.currentRole]
keep_this.currentRole.append(m.currentRole)
else:
new_list.append(m)
return new_list
def scan_names(name_list, name1, name2, name3, results=0, ro_thresold=None,
_scan_character=False):
"""Scan a list of names, searching for best matches against
the given variations."""
if ro_thresold is not None: RO_THRESHOLD = ro_thresold
else: RO_THRESHOLD = 0.6
sm1 = SequenceMatcher()
sm2 = SequenceMatcher()
sm3 = SequenceMatcher()
sm1.set_seq1(name1.lower())
if name2: sm2.set_seq1(name2.lower())
if name3: sm3.set_seq1(name3.lower())
resd = {}
for i, n_data in name_list:
nil = n_data['name']
# XXX: on Symbian, here we get a str; not sure this is the
# right place to fix it.
if isinstance(nil, str):
nil = unicode(nil, 'latin1', 'ignore')
# Distance with the canonical name.
ratios = [ratcliff(name1, nil, sm1) + 0.05]
namesurname = u''
if not _scan_character:
nils = nil.split(', ', 1)
surname = nils[0]
if len(nils) == 2: namesurname = '%s %s' % (nils[1], surname)
else:
nils = nil.split(' ', 1)
surname = nils[-1]
namesurname = nil
if surname != nil:
# Distance with the "Surname" in the database.
ratios.append(ratcliff(name1, surname, sm1))
if not _scan_character:
ratios.append(ratcliff(name1, namesurname, sm1))
if name2:
ratios.append(ratcliff(name2, surname, sm2))
# Distance with the "Name Surname" in the database.
if namesurname:
ratios.append(ratcliff(name2, namesurname, sm2))
if name3:
# Distance with the long imdb canonical name.
ratios.append(ratcliff(name3,
build_name(n_data, canonical=1), sm3) + 0.1)
ratio = max(ratios)
if ratio >= RO_THRESHOLD:
if resd.has_key(i):
if ratio > resd[i][0]: resd[i] = (ratio, (i, n_data))
else: resd[i] = (ratio, (i, n_data))
res = resd.values()
res.sort()
res.reverse()
if results > 0: res[:] = res[:results]
return res
def scan_titles(titles_list, title1, title2, title3, results=0,
searchingEpisode=0, onlyEpisodes=0, ro_thresold=None):
"""Scan a list of titles, searching for best matches against
the given variations."""
if ro_thresold is not None: RO_THRESHOLD = ro_thresold
else: RO_THRESHOLD = 0.6
sm1 = SequenceMatcher()
sm2 = SequenceMatcher()
sm3 = SequenceMatcher()
sm1.set_seq1(title1.lower())
sm2.set_seq2(title2.lower())
if title3:
sm3.set_seq1(title3.lower())
if title3[-1] == '}': searchingEpisode = 1
hasArt = 0
if title2 != title1: hasArt = 1
resd = {}
for i, t_data in titles_list:
if onlyEpisodes:
if t_data.get('kind') != 'episode':
continue
til = t_data['title']
if til[-1] == ')':
dateIdx = til.rfind('(')
if dateIdx != -1:
til = til[:dateIdx].rstrip()
if not til:
continue
ratio = ratcliff(title1, til, sm1)
if ratio >= RO_THRESHOLD:
resd[i] = (ratio, (i, t_data))
continue
if searchingEpisode:
if t_data.get('kind') != 'episode': continue
elif t_data.get('kind') == 'episode': continue
til = t_data['title']
# XXX: on Symbian, here we get a str; not sure this is the
# right place to fix it.
if isinstance(til, str):
til = unicode(til, 'latin1', 'ignore')
# Distance with the canonical title (with or without article).
# titleS -> titleR
# titleS, the -> titleR, the
if not searchingEpisode:
til = canonicalTitle(til)
ratios = [ratcliff(title1, til, sm1) + 0.05]
# til2 is til without the article, if present.
til2 = til
tils = til2.split(', ')
matchHasArt = 0
if tils[-1].lower() in _unicodeArticles:
til2 = ', '.join(tils[:-1])
matchHasArt = 1
if hasArt and not matchHasArt:
# titleS[, the] -> titleR
ratios.append(ratcliff(title2, til, sm2))
elif matchHasArt and not hasArt:
# titleS -> titleR[, the]
ratios.append(ratcliff(title1, til2, sm1))
else:
ratios = [0.0]
if title3:
# Distance with the long imdb canonical title.
ratios.append(ratcliff(title3,
build_title(t_data, canonical=1, ptdf=1), sm3) + 0.1)
ratio = max(ratios)
if ratio >= RO_THRESHOLD:
if resd.has_key(i):
if ratio > resd[i][0]:
resd[i] = (ratio, (i, t_data))
else: resd[i] = (ratio, (i, t_data))
res = resd.values()
res.sort()
res.reverse()
if results > 0: res[:] = res[:results]
return res
def scan_company_names(name_list, name1, results=0, ro_thresold=None):
"""Scan a list of company names, searching for best matches against
the given name. Notice that this function takes a list of
strings, and not a list of dictionaries."""
if ro_thresold is not None: RO_THRESHOLD = ro_thresold
else: RO_THRESHOLD = 0.6
sm1 = SequenceMatcher()
sm1.set_seq1(name1.lower())
resd = {}
withoutCountry = not name1.endswith(']')
for i, n in name_list:
# XXX: on Symbian, here we get a str; not sure this is the
# right place to fix it.
if isinstance(n, str):
n = unicode(n, 'latin1', 'ignore')
o_name = n
var = 0.0
if withoutCountry and n.endswith(']'):
cidx = n.rfind('[')
if cidx != -1:
n = n[:cidx].rstrip()
var = -0.05
# Distance with the company name.
ratio = ratcliff(name1, n, sm1) + var
if ratio >= RO_THRESHOLD:
if resd.has_key(i):
if ratio > resd[i][0]: resd[i] = (ratio,
(i, analyze_company_name(o_name)))
else:
resd[i] = (ratio, (i, analyze_company_name(o_name)))
res = resd.values()
res.sort()
res.reverse()
if results > 0: res[:] = res[:results]
return res
try:
from cutils import soundex
except ImportError:
_aux_logger.warn('Unable to import the cutils.soundex function.'
' Searches of movie titles and person names will be'
' a bit slower.')
_translate = dict(B='1', C='2', D='3', F='1', G='2', J='2', K='2', L='4',
M='5', N='5', P='1', Q='2', R='6', S='2', T='3', V='1',
X='2', Z='2')
_translateget = _translate.get
_re_non_ascii = re.compile(r'^[^a-z]*', re.I)
SOUNDEX_LEN = 5
def soundex(s):
"""Return the soundex code for the given string."""
# Maximum length of the soundex code.
s = _re_non_ascii.sub('', s)
if not s: return None
s = s.upper()
soundCode = s[0]
for c in s[1:]:
cw = _translateget(c, '0')
if cw != '0' and soundCode[-1] != cw:
soundCode += cw
return soundCode[:SOUNDEX_LEN] or None
def _sortKeywords(keyword, kwds):
"""Sort a list of keywords, based on the searched one."""
sm = SequenceMatcher()
sm.set_seq1(keyword.lower())
ratios = [(ratcliff(keyword, k, sm), k) for k in kwds]
checkContained = False
if len(keyword) > 4:
checkContained = True
for idx, data in enumerate(ratios):
ratio, key = data
if key.startswith(keyword):
ratios[idx] = (ratio+0.5, key)
elif checkContained and keyword in key:
ratios[idx] = (ratio+0.3, key)
ratios.sort()
ratios.reverse()
return [r[1] for r in ratios]
def filterSimilarKeywords(keyword, kwdsIterator):
"""Return a sorted list of keywords similar to the one given."""
seenDict = {}
kwdSndx = soundex(keyword.encode('ascii', 'ignore'))
matches = []
matchesappend = matches.append
checkContained = False
if len(keyword) > 4:
checkContained = True
for movieID, key in kwdsIterator:
if key in seenDict:
continue
seenDict[key] = None
if checkContained and keyword in key:
matchesappend(key)
continue
if kwdSndx == soundex(key.encode('ascii', 'ignore')):
matchesappend(key)
return _sortKeywords(keyword, matches)
# =============================
_litlist = ['screenplay/teleplay', 'novel', 'adaption', 'book',
'production process protocol', 'interviews',
'printed media reviews', 'essays', 'other literature']
_litd = dict([(x, ('literature', x)) for x in _litlist])
_buslist = ['budget', 'weekend gross', 'gross', 'opening weekend', 'rentals',
'admissions', 'filming dates', 'production dates', 'studios',
'copyright holder']
_busd = dict([(x, ('business', x)) for x in _buslist])
def _reGroupDict(d, newgr):
"""Regroup keys in the d dictionary in subdictionaries, based on
the scheme in the newgr dictionary.
E.g.: in the newgr, an entry 'LD label': ('laserdisc', 'label')
tells the _reGroupDict() function to take the entry with
label 'LD label' (as received from the sql database)
and put it in the subsection (another dictionary) named
'laserdisc', using the key 'label'."""
r = {}
newgrks = newgr.keys()
for k, v in d.items():
if k in newgrks:
r.setdefault(newgr[k][0], {})[newgr[k][1]] = v
# A not-so-clearer version:
##r.setdefault(newgr[k][0], {})
##r[newgr[k][0]][newgr[k][1]] = v
else: r[k] = v
return r
def _groupListBy(l, index):
"""Regroup items in a list in a list of lists, grouped by
the value at the given index."""
tmpd = {}
for item in l:
tmpd.setdefault(item[index], []).append(item)
res = tmpd.values()
return res
def sub_dict(d, keys):
"""Return the subdictionary of 'd', with just the keys listed in 'keys'."""
return dict([(k, d[k]) for k in keys if k in d])
def get_movie_data(movieID, kindDict, fromAka=0, _table=None):
"""Return a dictionary containing data about the given movieID;
if fromAka is true, the AkaTitle table is searched; _table is
reserved for the imdbpy2sql.py script."""
if _table is not None:
Table = _table
else:
if not fromAka: Table = Title
else: Table = AkaTitle
try:
m = Table.get(movieID)
except Exception, e:
_aux_logger.warn('Unable to fetch information for movieID %s: %s', movieID, e)
mdict = {}
return mdict
mdict = {'title': m.title, 'kind': kindDict[m.kindID],
'year': m.productionYear, 'imdbIndex': m.imdbIndex,
'season': m.seasonNr, 'episode': m.episodeNr}
if not fromAka:
if m.seriesYears is not None:
mdict['series years'] = unicode(m.seriesYears)
if mdict['imdbIndex'] is None: del mdict['imdbIndex']
if mdict['year'] is None: del mdict['year']
else:
try:
mdict['year'] = int(mdict['year'])
except (TypeError, ValueError):
del mdict['year']
if mdict['season'] is None: del mdict['season']
else:
try: mdict['season'] = int(mdict['season'])
except: pass
if mdict['episode'] is None: del mdict['episode']
else:
try: mdict['episode'] = int(mdict['episode'])
except: pass
episodeOfID = m.episodeOfID
if episodeOfID is not None:
ser_dict = get_movie_data(episodeOfID, kindDict, fromAka)
mdict['episode of'] = Movie(data=ser_dict, movieID=episodeOfID,
accessSystem='sql')
if fromAka:
ser_note = AkaTitle.get(episodeOfID).note
if ser_note:
mdict['episode of'].notes = ser_note
return mdict
def _iterKeywords(results):
"""Iterate over (key.id, key.keyword) columns of a selection of
the Keyword table."""
for key in results:
yield key.id, key.keyword
def getSingleInfo(table, movieID, infoType, notAList=False):
"""Return a dictionary in the form {infoType: infoListOrString},
retrieving a single set of information about a given movie, from
the specified table."""
infoTypeID = InfoType.select(InfoType.q.info == infoType)
if infoTypeID.count() == 0:
return {}
res = table.select(AND(table.q.movieID == movieID,
table.q.infoTypeID == infoTypeID[0].id))
retList = []
for r in res:
info = r.info
note = r.note
if note:
info += u'::%s' % note
retList.append(info)
if not retList:
return {}
if not notAList: return {infoType: retList}
else: return {infoType: retList[0]}
def _cmpTop(a, b, what='top 250 rank'):
"""Compare function used to sort top 250/bottom 10 rank."""
av = int(a[1].get(what))
bv = int(b[1].get(what))
if av == bv:
return 0
return (-1, 1)[av > bv]
def _cmpBottom(a, b):
"""Compare function used to sort top 250/bottom 10 rank."""
return _cmpTop(a, b, what='bottom 10 rank')
class IMDbSqlAccessSystem(IMDbBase):
"""The class used to access IMDb's data through a SQL database."""
accessSystem = 'sql'
_sql_logger = logging.getLogger('imdbpy.parser.sql')
def __init__(self, uri, adultSearch=1, useORM=None, *arguments, **keywords):
"""Initialize the access system."""
IMDbBase.__init__(self, *arguments, **keywords)
if useORM is None:
useORM = ('sqlobject', 'sqlalchemy')
if not isinstance(useORM, (tuple, list)):
if ',' in useORM:
useORM = useORM.split(',')
else:
useORM = [useORM]
self.useORM = useORM
nrMods = len(useORM)
_gotError = False
DB_TABLES = []
for idx, mod in enumerate(useORM):
mod = mod.strip().lower()
try:
if mod == 'sqlalchemy':
from alchemyadapter import getDBTables, NotFoundError, \
setConnection, AND, OR, IN, \
ISNULL, CONTAINSSTRING, toUTF8
elif mod == 'sqlobject':
from objectadapter import getDBTables, NotFoundError, \
setConnection, AND, OR, IN, \
ISNULL, CONTAINSSTRING, toUTF8
else:
self._sql_logger.warn('unknown module "%s"' % mod)
continue
self._sql_logger.info('using %s ORM', mod)
# XXX: look ma'... black magic! It's used to make
# TableClasses and some functions accessible
# through the whole module.
for k, v in [('NotFoundError', NotFoundError),
('AND', AND), ('OR', OR), ('IN', IN),
('ISNULL', ISNULL),
('CONTAINSSTRING', CONTAINSSTRING)]:
globals()[k] = v
self.toUTF8 = toUTF8
DB_TABLES = getDBTables(uri)
for t in DB_TABLES:
globals()[t._imdbpyName] = t
if _gotError:
self._sql_logger.warn('falling back to "%s"' % mod)
break
except ImportError, e:
if idx+1 >= nrMods:
raise IMDbError('unable to use any ORM in %s: %s' % (
str(useORM), str(e)))
else:
self._sql_logger.warn('unable to use "%s": %s' % (mod,
str(e)))
_gotError = True
continue
else:
raise IMDbError('unable to use any ORM in %s' % str(useORM))
# Set the connection to the database.
self._sql_logger.debug('connecting to %s', uri)
try:
self._connection = setConnection(uri, DB_TABLES)
except AssertionError, e:
raise IMDbDataAccessError( \
'unable to connect to the database server; ' + \
'complete message: "%s"' % str(e))
self.Error = self._connection.module.Error
# Maps some IDs to the corresponding strings.
self._kind = {}
self._kindRev = {}
self._sql_logger.debug('reading constants from the database')
try:
for kt in KindType.select():
self._kind[kt.id] = kt.kind
self._kindRev[str(kt.kind)] = kt.id
except self.Error:
# NOTE: you can also get the error, but - at least with
# MySQL - it also contains the password, and I don't
# like the idea to print it out.
raise IMDbDataAccessError( \
'unable to connect to the database server')
self._role = {}
for rl in RoleType.select():
self._role[rl.id] = str(rl.role)
self._info = {}
self._infoRev = {}
for inf in InfoType.select():
self._info[inf.id] = str(inf.info)
self._infoRev[str(inf.info)] = inf.id
self._compType = {}
for cType in CompanyType.select():
self._compType[cType.id] = cType.kind
info = [(it.id, it.info) for it in InfoType.select()]
self._compcast = {}
for cc in CompCastType.select():
self._compcast[cc.id] = str(cc.kind)
self._link = {}
for lt in LinkType.select():
self._link[lt.id] = str(lt.link)
self._moviesubs = {}
# Build self._moviesubs, a dictionary used to rearrange
# the data structure for a movie object.
for vid, vinfo in info:
if not vinfo.startswith('LD '): continue
self._moviesubs[vinfo] = ('laserdisc', vinfo[3:])
self._moviesubs.update(_litd)
self._moviesubs.update(_busd)
self.do_adult_search(adultSearch)
def _findRefs(self, o, trefs, nrefs):
"""Find titles or names references in strings."""
if isinstance(o, (unicode, str)):
for title in re_titleRef.findall(o):
a_title = analyze_title(title, canonical=0)
rtitle = build_title(a_title, ptdf=1)
if trefs.has_key(rtitle): continue
movieID = self._getTitleID(rtitle)
if movieID is None:
movieID = self._getTitleID(title)
if movieID is None:
continue
m = Movie(title=rtitle, movieID=movieID,
accessSystem=self.accessSystem)
trefs[rtitle] = m
rtitle2 = canonicalTitle(a_title.get('title', u''))
if rtitle2 and rtitle2 != rtitle and rtitle2 != title:
trefs[rtitle2] = m
if title != rtitle:
trefs[title] = m
for name in re_nameRef.findall(o):
a_name = analyze_name(name, canonical=1)
rname = build_name(a_name, canonical=1)
if nrefs.has_key(rname): continue
personID = self._getNameID(rname)
if personID is None:
personID = self._getNameID(name)
if personID is None: continue
p = Person(name=rname, personID=personID,
accessSystem=self.accessSystem)
nrefs[rname] = p
rname2 = normalizeName(a_name.get('name', u''))
if rname2 and rname2 != rname:
nrefs[rname2] = p
if name != rname and name != rname2:
nrefs[name] = p
elif isinstance(o, (list, tuple)):
for item in o:
self._findRefs(item, trefs, nrefs)
elif isinstance(o, dict):
for value in o.values():
self._findRefs(value, trefs, nrefs)
return (trefs, nrefs)
def _extractRefs(self, o):
"""Scan for titles or names references in strings."""
trefs = {}
nrefs = {}
try:
return self._findRefs(o, trefs, nrefs)
except RuntimeError, e:
# Symbian/python 2.2 has a poor regexp implementation.
import warnings
warnings.warn('RuntimeError in '
"imdb.parser.sql.IMDbSqlAccessSystem; "
"if it's not a recursion limit exceeded and we're not "
"running in a Symbian environment, it's a bug:\n%s" % e)
return (trefs, nrefs)
def _changeAKAencoding(self, akanotes, akatitle):
"""Return akatitle in the correct charset, as specified in
the akanotes field; if akatitle doesn't need to be modified,
return None."""
oti = akanotes.find('(original ')
if oti == -1: return None
ote = akanotes[oti+10:].find(' title)')
if ote != -1:
cs_info = akanotes[oti+10:oti+10+ote].lower().split()
for e in cs_info:
# excludes some strings that clearly are not encoding.
if e in ('script', '', 'cyrillic', 'greek'): continue
if e.startswith('iso-') and e.find('latin') != -1:
e = e[4:].replace('-', '')
try:
lookup(e)
lat1 = akatitle.encode('latin_1', 'replace')
return unicode(lat1, e, 'replace')
except (LookupError, ValueError, TypeError):
continue
return None
def _buildNULLCondition(self, col, val):
"""Build a comparison for columns where values can be NULL."""
if val is None:
return ISNULL(col)
else:
if isinstance(val, (int, long)):
return col == val
else:
return col == self.toUTF8(val)
def _getTitleID(self, title):
"""Given a long imdb canonical title, returns a movieID or
None if not found."""
td = analyze_title(title)
condition = None
if td['kind'] == 'episode':
epof = td['episode of']
seriesID = [s.id for s in Title.select(
AND(Title.q.title == self.toUTF8(epof['title']),
self._buildNULLCondition(Title.q.imdbIndex,
epof.get('imdbIndex')),
Title.q.kindID == self._kindRev[epof['kind']],
self._buildNULLCondition(Title.q.productionYear,
epof.get('year'))))]
if seriesID:
condition = AND(IN(Title.q.episodeOfID, seriesID),
Title.q.title == self.toUTF8(td['title']),
self._buildNULLCondition(Title.q.imdbIndex,
td.get('imdbIndex')),
Title.q.kindID == self._kindRev[td['kind']],
self._buildNULLCondition(Title.q.productionYear,
td.get('year')))
if condition is None:
condition = AND(Title.q.title == self.toUTF8(td['title']),
self._buildNULLCondition(Title.q.imdbIndex,
td.get('imdbIndex')),
Title.q.kindID == self._kindRev[td['kind']],
self._buildNULLCondition(Title.q.productionYear,
td.get('year')))
res = Title.select(condition)
try:
if res.count() != 1:
return None
except (UnicodeDecodeError, TypeError):
return None
return res[0].id
def _getNameID(self, name):
"""Given a long imdb canonical name, returns a personID or
None if not found."""
nd = analyze_name(name)
res = Name.select(AND(Name.q.name == self.toUTF8(nd['name']),
self._buildNULLCondition(Name.q.imdbIndex,
nd.get('imdbIndex'))))
try:
c = res.count()
if res.count() != 1:
return None
except (UnicodeDecodeError, TypeError):
return None
return res[0].id
def _normalize_movieID(self, movieID):
"""Normalize the given movieID."""
try:
return int(movieID)
except (ValueError, OverflowError):
raise IMDbError('movieID "%s" can\'t be converted to integer' % \
movieID)
def _normalize_personID(self, personID):
"""Normalize the given personID."""
try:
return int(personID)
except (ValueError, OverflowError):
raise IMDbError('personID "%s" can\'t be converted to integer' % \
personID)
def _normalize_characterID(self, characterID):
"""Normalize the given characterID."""
try:
return int(characterID)
except (ValueError, OverflowError):
raise IMDbError('characterID "%s" can\'t be converted to integer' \
% characterID)
def _normalize_companyID(self, companyID):
"""Normalize the given companyID."""
try:
return int(companyID)
except (ValueError, OverflowError):
raise IMDbError('companyID "%s" can\'t be converted to integer' \
% companyID)
def get_imdbMovieID(self, movieID):
"""Translate a movieID in an imdbID.
If not in the database, try an Exact Primary Title search on IMDb;
return None if it's unable to get the imdbID.
"""
try: movie = Title.get(movieID)
except NotFoundError: return None
imdbID = movie.imdbID
if imdbID is not None: return '%07d' % imdbID
m_dict = get_movie_data(movie.id, self._kind)
titline = build_title(m_dict, ptdf=0)
imdbID = self.title2imdbID(titline, m_dict['kind'])
# If the imdbID was retrieved from the web and was not in the
# database, update the database (ignoring errors, because it's
# possibile that the current user has not update privileges).
# There're times when I think I'm a genius; this one of
# those times... <g>
if imdbID is not None and not isinstance(imdbID, list):
try: movie.imdbID = int(imdbID)
except: pass
return imdbID
def get_imdbPersonID(self, personID):
"""Translate a personID in an imdbID.
If not in the database, try an Exact Primary Name search on IMDb;
return None if it's unable to get the imdbID.
"""
try: person = Name.get(personID)
except NotFoundError: return None
imdbID = person.imdbID
if imdbID is not None: return '%07d' % imdbID
n_dict = {'name': person.name, 'imdbIndex': person.imdbIndex}
namline = build_name(n_dict, canonical=False)
imdbID = self.name2imdbID(namline)
if imdbID is not None and not isinstance(imdbID, list):
try: person.imdbID = int(imdbID)
except: pass
return imdbID
def get_imdbCharacterID(self, characterID):
"""Translate a characterID in an imdbID.
If not in the database, try an Exact Primary Name search on IMDb;
return None if it's unable to get the imdbID.
"""
try: character = CharName.get(characterID)
except NotFoundError: return None
imdbID = character.imdbID
if imdbID is not None: return '%07d' % imdbID
n_dict = {'name': character.name, 'imdbIndex': character.imdbIndex}
namline = build_name(n_dict, canonical=False)
imdbID = self.character2imdbID(namline)
if imdbID is not None and not isinstance(imdbID, list):
try: character.imdbID = int(imdbID)
except: pass
return imdbID
def get_imdbCompanyID(self, companyID):
"""Translate a companyID in an imdbID.
If not in the database, try an Exact Primary Name search on IMDb;
return None if it's unable to get the imdbID.
"""
try: company = CompanyName.get(companyID)
except NotFoundError: return None
imdbID = company.imdbID
if imdbID is not None: return '%07d' % imdbID
n_dict = {'name': company.name, 'country': company.countryCode}
namline = build_company_name(n_dict)
imdbID = self.company2imdbID(namline)
if imdbID is not None and not isinstance(imdbID, list):
try: company.imdbID = int(imdbID)
except: pass
return imdbID
def do_adult_search(self, doAdult):
"""If set to 0 or False, movies in the Adult category are not
episodeOf = title_dict.get('episode of')
shown in the results of a search."""
self.doAdult = doAdult
def _search_movie(self, title, results, _episodes=False):
title = title.strip()
if not title: return []
title_dict = analyze_title(title, canonical=1)
s_title = title_dict['title']
if not s_title: return []
episodeOf = title_dict.get('episode of')
if episodeOf:
_episodes = False
s_title_split = s_title.split(', ')
if len(s_title_split) > 1 and \
s_title_split[-1].lower() in _unicodeArticles:
s_title_rebuilt = ', '.join(s_title_split[:-1])
if s_title_rebuilt:
s_title = s_title_rebuilt
#if not episodeOf:
# if not _episodes:
# s_title_split = s_title.split(', ')
# if len(s_title_split) > 1 and \
# s_title_split[-1].lower() in _articles:
# s_title_rebuilt = ', '.join(s_title_split[:-1])
# if s_title_rebuilt:
# s_title = s_title_rebuilt
#else:
# _episodes = False
if isinstance(s_title, unicode):
s_title = s_title.encode('ascii', 'ignore')
soundexCode = soundex(s_title)
# XXX: improve the search restricting the kindID if the
# "kind" of the input differs from "movie"?
condition = conditionAka = None
if _episodes:
condition = AND(Title.q.phoneticCode == soundexCode,
Title.q.kindID == self._kindRev['episode'])
conditionAka = AND(AkaTitle.q.phoneticCode == soundexCode,
AkaTitle.q.kindID == self._kindRev['episode'])
elif title_dict['kind'] == 'episode' and episodeOf is not None:
# set canonical=0 ? Should not make much difference.
series_title = build_title(episodeOf, canonical=1)
# XXX: is it safe to get "results" results?
# Too many? Too few?
serRes = results
if serRes < 3 or serRes > 10:
serRes = 10
searchSeries = self._search_movie(series_title, serRes)
seriesIDs = [result[0] for result in searchSeries]
if seriesIDs:
condition = AND(Title.q.phoneticCode == soundexCode,
IN(Title.q.episodeOfID, seriesIDs),
Title.q.kindID == self._kindRev['episode'])
conditionAka = AND(AkaTitle.q.phoneticCode == soundexCode,
IN(AkaTitle.q.episodeOfID, seriesIDs),
AkaTitle.q.kindID == self._kindRev['episode'])
else:
# XXX: bad situation: we have found no matching series;
# try searching everything (both episodes and
# non-episodes) for the title.
condition = AND(Title.q.phoneticCode == soundexCode,
IN(Title.q.episodeOfID, seriesIDs))
conditionAka = AND(AkaTitle.q.phoneticCode == soundexCode,
IN(AkaTitle.q.episodeOfID, seriesIDs))
if condition is None:
# XXX: excludes episodes?
condition = AND(Title.q.kindID != self._kindRev['episode'],
Title.q.phoneticCode == soundexCode)
conditionAka = AND(AkaTitle.q.kindID != self._kindRev['episode'],
AkaTitle.q.phoneticCode == soundexCode)
# Up to 3 variations of the title are searched, plus the
# long imdb canonical title, if provided.
if not _episodes:
title1, title2, title3 = titleVariations(title)
else:
title1 = title
title2 = ''
title3 = ''
try:
qr = [(q.id, get_movie_data(q.id, self._kind))
for q in Title.select(condition)]
q2 = [(q.movieID, get_movie_data(q.id, self._kind, fromAka=1))
for q in AkaTitle.select(conditionAka)]
qr += q2
except NotFoundError, e:
raise IMDbDataAccessError( \
'unable to search the database: "%s"' % str(e))
resultsST = results * 3
res = scan_titles(qr, title1, title2, title3, resultsST,
searchingEpisode=episodeOf is not None,
onlyEpisodes=_episodes,
ro_thresold=0.0)
res[:] = [x[1] for x in res]
if res and not self.doAdult:
mids = [x[0] for x in res]
genreID = self._infoRev['genres']
adultlist = [al.movieID for al
in MovieInfo.select(
AND(MovieInfo.q.infoTypeID == genreID,
MovieInfo.q.info == 'Adult',
IN(MovieInfo.q.movieID, mids)))]
res[:] = [x for x in res if x[0] not in adultlist]
new_res = []
# XXX: can there be duplicates?
for r in res:
if r not in q2:
new_res.append(r)
continue
mdict = r[1]
aka_title = build_title(mdict, ptdf=1)
orig_dict = get_movie_data(r[0], self._kind)
orig_title = build_title(orig_dict, ptdf=1)
if aka_title == orig_title:
new_res.append(r)
continue
orig_dict['akas'] = [aka_title]
new_res.append((r[0], orig_dict))
if results > 0: new_res[:] = new_res[:results]
return new_res
def _search_episode(self, title, results):
return self._search_movie(title, results, _episodes=True)
def get_movie_main(self, movieID):
# Every movie information is retrieved from here.
infosets = self.get_movie_infoset()
try:
res = get_movie_data(movieID, self._kind)
except NotFoundError, e:
raise IMDbDataAccessError( \
'unable to get movieID "%s": "%s"' % (movieID, str(e)))
if not res:
raise IMDbDataAccessError('unable to get movieID "%s"' % movieID)
# Collect cast information.
castdata = [[cd.personID, cd.personRoleID, cd.note, cd.nrOrder,
self._role[cd.roleID]]
for cd in CastInfo.select(CastInfo.q.movieID == movieID)]
for p in castdata:
person = Name.get(p[0])
p += [person.name, person.imdbIndex]
if p[4] in ('actor', 'actress'):
p[4] = 'cast'
# Regroup by role/duty (cast, writer, director, ...)
castdata[:] = _groupListBy(castdata, 4)
for group in castdata:
duty = group[0][4]
for pdata in group:
curRole = pdata[1]
curRoleID = None
if curRole is not None:
robj = CharName.get(curRole)
curRole = robj.name
curRoleID = robj.id
p = Person(personID=pdata[0], name=pdata[5],
currentRole=curRole or u'',
roleID=curRoleID,
notes=pdata[2] or u'',
accessSystem='sql')
if pdata[6]: p['imdbIndex'] = pdata[6]
p.billingPos = pdata[3]
res.setdefault(duty, []).append(p)
if duty == 'cast':
res[duty] = merge_roles(res[duty])
res[duty].sort()
# Info about the movie.
minfo = [(self._info[m.infoTypeID], m.info, m.note)
for m in MovieInfo.select(MovieInfo.q.movieID == movieID)]
minfo += [(self._info[m.infoTypeID], m.info, m.note)
for m in MovieInfoIdx.select(MovieInfoIdx.q.movieID == movieID)]
minfo += [('keywords', Keyword.get(m.keywordID).keyword, None)
for m in MovieKeyword.select(MovieKeyword.q.movieID == movieID)]
minfo = _groupListBy(minfo, 0)
for group in minfo:
sect = group[0][0]
for mdata in group:
data = mdata[1]
if mdata[2]: data += '::%s' % mdata[2]
res.setdefault(sect, []).append(data)
# Companies info about a movie.
cinfo = [(self._compType[m.companyTypeID], m.companyID, m.note) for m
in MovieCompanies.select(MovieCompanies.q.movieID == movieID)]
cinfo = _groupListBy(cinfo, 0)
for group in cinfo:
sect = group[0][0]
for mdata in group:
cDb = CompanyName.get(mdata[1])
cDbTxt = cDb.name
if cDb.countryCode:
cDbTxt += ' %s' % cDb.countryCode
company = Company(name=cDbTxt,
companyID=mdata[1],
notes=mdata[2] or u'',
accessSystem=self.accessSystem)
res.setdefault(sect, []).append(company)
# AKA titles.
akat = [(get_movie_data(at.id, self._kind, fromAka=1), at.note)
for at in AkaTitle.select(AkaTitle.q.movieID == movieID)]
if akat:
res['akas'] = []
for td, note in akat:
nt = build_title(td, ptdf=1)
if note:
net = self._changeAKAencoding(note, nt)
if net is not None: nt = net
nt += '::%s' % note
if nt not in res['akas']: res['akas'].append(nt)
# Complete cast/crew.
compcast = [(self._compcast[cc.subjectID], self._compcast[cc.statusID])
for cc in CompleteCast.select(CompleteCast.q.movieID == movieID)]
if compcast:
for entry in compcast:
val = unicode(entry[1])
res[u'complete %s' % entry[0]] = val
# Movie connections.
mlinks = [[ml.linkedMovieID, self._link[ml.linkTypeID]]
for ml in MovieLink.select(MovieLink.q.movieID == movieID)]
if mlinks:
for ml in mlinks:
lmovieData = get_movie_data(ml[0], self._kind)
if lmovieData:
m = Movie(movieID=ml[0], data=lmovieData, accessSystem='sql')
ml[0] = m
res['connections'] = {}
mlinks[:] = _groupListBy(mlinks, 1)
for group in mlinks:
lt = group[0][1]
res['connections'][lt] = [i[0] for i in group]
# Episodes.
episodes = {}
eps_list = list(Title.select(Title.q.episodeOfID == movieID))
eps_list.sort()
if eps_list:
ps_data = {'title': res['title'], 'kind': res['kind'],
'year': res.get('year'),
'imdbIndex': res.get('imdbIndex')}
parentSeries = Movie(movieID=movieID, data=ps_data,
accessSystem='sql')
for episode in eps_list:
episodeID = episode.id
episode_data = get_movie_data(episodeID, self._kind)
m = Movie(movieID=episodeID, data=episode_data,
accessSystem='sql')
m['episode of'] = parentSeries
season = episode_data.get('season', 'UNKNOWN')
if season not in episodes: episodes[season] = {}
ep_number = episode_data.get('episode')
if ep_number is None:
ep_number = max((episodes[season].keys() or [0])) + 1
episodes[season][ep_number] = m
res['episodes'] = episodes
res['number of episodes'] = sum([len(x) for x in episodes.values()])
res['number of seasons'] = len(episodes.keys())
# Regroup laserdisc information.
res = _reGroupDict(res, self._moviesubs)
# Do some transformation to preserve consistency with other
# data access systems.
if 'quotes' in res:
for idx, quote in enumerate(res['quotes']):
res['quotes'][idx] = quote.split('::')
if 'runtimes' in res and len(res['runtimes']) > 0:
rt = res['runtimes'][0]
episodes = re_episodes.findall(rt)
if episodes:
res['runtimes'][0] = re_episodes.sub('', rt)
if res['runtimes'][0][-2:] == '::':
res['runtimes'][0] = res['runtimes'][0][:-2]
if 'votes' in res:
res['votes'] = int(res['votes'][0])
if 'rating' in res:
res['rating'] = float(res['rating'][0])
if 'votes distribution' in res:
res['votes distribution'] = res['votes distribution'][0]
if 'mpaa' in res:
res['mpaa'] = res['mpaa'][0]
if 'top 250 rank' in res:
try: res['top 250 rank'] = int(res['top 250 rank'])
except: pass
if 'bottom 10 rank' in res:
try: res['bottom 100 rank'] = int(res['bottom 10 rank'])
except: pass
del res['bottom 10 rank']
for old, new in [('guest', 'guests'), ('trademarks', 'trade-mark'),
('articles', 'article'), ('pictorials', 'pictorial'),
('magazine-covers', 'magazine-cover-photo')]:
if old in res:
res[new] = res[old]
del res[old]
trefs,nrefs = {}, {}
trefs,nrefs = self._extractRefs(sub_dict(res,Movie.keys_tomodify_list))
return {'data': res, 'titlesRefs': trefs, 'namesRefs': nrefs,
'info sets': infosets}
# Just to know what kind of information are available.
get_movie_alternate_versions = get_movie_main
get_movie_business = get_movie_main
get_movie_connections = get_movie_main
get_movie_crazy_credits = get_movie_main
get_movie_goofs = get_movie_main
get_movie_keywords = get_movie_main
get_movie_literature = get_movie_main
get_movie_locations = get_movie_main
get_movie_plot = get_movie_main
get_movie_quotes = get_movie_main
get_movie_release_dates = get_movie_main
get_movie_soundtrack = get_movie_main
get_movie_taglines = get_movie_main
get_movie_technical = get_movie_main
get_movie_trivia = get_movie_main
get_movie_vote_details = get_movie_main
get_movie_episodes = get_movie_main
def _search_person(self, name, results):
name = name.strip()
if not name: return []
s_name = analyze_name(name)['name']
if not s_name: return []
if isinstance(s_name, unicode):
s_name = s_name.encode('ascii', 'ignore')
soundexCode = soundex(s_name)
name1, name2, name3 = nameVariations(name)
# If the soundex is None, compare only with the first
# phoneticCode column.
if soundexCode is not None:
condition = IN(soundexCode, [Name.q.namePcodeCf,
Name.q.namePcodeNf,
Name.q.surnamePcode])
conditionAka = IN(soundexCode, [AkaName.q.namePcodeCf,
AkaName.q.namePcodeNf,
AkaName.q.surnamePcode])
else:
condition = ISNULL(Name.q.namePcodeCf)
conditionAka = ISNULL(AkaName.q.namePcodeCf)
try:
qr = [(q.id, {'name': q.name, 'imdbIndex': q.imdbIndex})
for q in Name.select(condition)]
q2 = [(q.personID, {'name': q.name, 'imdbIndex': q.imdbIndex})
for q in AkaName.select(conditionAka)]
qr += q2
except NotFoundError, e:
raise IMDbDataAccessError( \
'unable to search the database: "%s"' % str(e))
res = scan_names(qr, name1, name2, name3, results)
res[:] = [x[1] for x in res]
# Purge empty imdbIndex.
returnl = []
for x in res:
tmpd = x[1]
if tmpd['imdbIndex'] is None:
del tmpd['imdbIndex']
returnl.append((x[0], tmpd))
new_res = []
# XXX: can there be duplicates?
for r in returnl:
if r not in q2:
new_res.append(r)
continue
pdict = r[1]
aka_name = build_name(pdict, canonical=1)
p = Name.get(r[0])
orig_dict = {'name': p.name, 'imdbIndex': p.imdbIndex}
if orig_dict['imdbIndex'] is None:
del orig_dict['imdbIndex']
orig_name = build_name(orig_dict, canonical=1)
if aka_name == orig_name:
new_res.append(r)
continue
orig_dict['akas'] = [aka_name]
new_res.append((r[0], orig_dict))
if results > 0: new_res[:] = new_res[:results]
return new_res
def get_person_main(self, personID):
# Every person information is retrieved from here.
infosets = self.get_person_infoset()
try:
p = Name.get(personID)
except NotFoundError, e:
raise IMDbDataAccessError( \
'unable to get personID "%s": "%s"' % (personID, str(e)))
res = {'name': p.name, 'imdbIndex': p.imdbIndex}
if res['imdbIndex'] is None: del res['imdbIndex']
if not res:
raise IMDbDataAccessError('unable to get personID "%s"' % personID)
# Collect cast information.
castdata = [(cd.movieID, cd.personRoleID, cd.note,
self._role[cd.roleID],
get_movie_data(cd.movieID, self._kind))
for cd in CastInfo.select(CastInfo.q.personID == personID)]
# Regroup by role/duty (cast, writer, director, ...)
castdata[:] = _groupListBy(castdata, 3)
episodes = {}
seenDuties = []
for group in castdata:
for mdata in group:
duty = orig_duty = group[0][3]
if duty not in seenDuties: seenDuties.append(orig_duty)
note = mdata[2] or u''
if 'episode of' in mdata[4]:
duty = 'episodes'
if orig_duty not in ('actor', 'actress'):
if note: note = ' %s' % note
note = '[%s]%s' % (orig_duty, note)
curRole = mdata[1]
curRoleID = None
if curRole is not None:
robj = CharName.get(curRole)
curRole = robj.name
curRoleID = robj.id
m = Movie(movieID=mdata[0], data=mdata[4],
currentRole=curRole or u'',
roleID=curRoleID,
notes=note, accessSystem='sql')
if duty != 'episodes':
res.setdefault(duty, []).append(m)
else:
episodes.setdefault(m['episode of'], []).append(m)
if episodes:
for k in episodes:
episodes[k].sort()
episodes[k].reverse()
res['episodes'] = episodes
for duty in seenDuties:
if duty in res:
if duty in ('actor', 'actress', 'himself', 'herself',
'themselves'):
res[duty] = merge_roles(res[duty])
res[duty].sort()
# Info about the person.
pinfo = [(self._info[pi.infoTypeID], pi.info, pi.note)
for pi in PersonInfo.select(PersonInfo.q.personID == personID)]
# Regroup by duty.
pinfo = _groupListBy(pinfo, 0)
for group in pinfo:
sect = group[0][0]
for pdata in group:
data = pdata[1]
if pdata[2]: data += '::%s' % pdata[2]
res.setdefault(sect, []).append(data)
# AKA names.
akan = [(an.name, an.imdbIndex)
for an in AkaName.select(AkaName.q.personID == personID)]
if akan:
res['akas'] = []
for n in akan:
nd = {'name': n[0]}
if n[1]: nd['imdbIndex'] = n[1]
nt = build_name(nd, canonical=1)
res['akas'].append(nt)
# Do some transformation to preserve consistency with other
# data access systems.
for key in ('birth date', 'birth notes', 'death date', 'death notes',
'birth name', 'height'):
if key in res:
res[key] = res[key][0]
if 'guest' in res:
res['notable tv guest appearances'] = res['guest']
del res['guest']
miscnames = res.get('nick names', [])
if 'birth name' in res: miscnames.append(res['birth name'])
if 'akas' in res:
for mname in miscnames:
if mname in res['akas']: res['akas'].remove(mname)
if not res['akas']: del res['akas']
trefs,nrefs = self._extractRefs(sub_dict(res,Person.keys_tomodify_list))
return {'data': res, 'titlesRefs': trefs, 'namesRefs': nrefs,
'info sets': infosets}
# Just to know what kind of information are available.
get_person_filmography = get_person_main
get_person_biography = get_person_main
get_person_other_works = get_person_main
get_person_episodes = get_person_main
def _search_character(self, name, results):
name = name.strip()
if not name: return []
s_name = analyze_name(name)['name']
if not s_name: return []
if isinstance(s_name, unicode):
s_name = s_name.encode('ascii', 'ignore')
s_name = normalizeName(s_name)
soundexCode = soundex(s_name)
surname = s_name.split(' ')[-1]
surnameSoundex = soundex(surname)
name2 = ''
soundexName2 = None
nsplit = s_name.split()
if len(nsplit) > 1:
name2 = '%s %s' % (nsplit[-1], ' '.join(nsplit[:-1]))
if s_name == name2:
name2 = ''
else:
soundexName2 = soundex(name2)
# If the soundex is None, compare only with the first
# phoneticCode column.
if soundexCode is not None:
if soundexName2 is not None:
condition = OR(surnameSoundex == CharName.q.surnamePcode,
IN(CharName.q.namePcodeNf, [soundexCode,
soundexName2]),
IN(CharName.q.surnamePcode, [soundexCode,
soundexName2]))
else:
condition = OR(surnameSoundex == CharName.q.surnamePcode,
IN(soundexCode, [CharName.q.namePcodeNf,
CharName.q.surnamePcode]))
else:
condition = ISNULL(Name.q.namePcodeNf)
try:
qr = [(q.id, {'name': q.name, 'imdbIndex': q.imdbIndex})
for q in CharName.select(condition)]
except NotFoundError, e:
raise IMDbDataAccessError( \
'unable to search the database: "%s"' % str(e))
res = scan_names(qr, s_name, name2, '', results,
_scan_character=True)
res[:] = [x[1] for x in res]
# Purge empty imdbIndex.
returnl = []
for x in res:
tmpd = x[1]
if tmpd['imdbIndex'] is None:
del tmpd['imdbIndex']
returnl.append((x[0], tmpd))
return returnl
def get_character_main(self, characterID, results=1000):
# Every character information is retrieved from here.
infosets = self.get_character_infoset()
try:
c = CharName.get(characterID)
except NotFoundError, e:
raise IMDbDataAccessError( \
'unable to get characterID "%s": "%s"' % (characterID, e))
res = {'name': c.name, 'imdbIndex': c.imdbIndex}
if res['imdbIndex'] is None: del res['imdbIndex']
if not res:
raise IMDbDataAccessError('unable to get characterID "%s"' % \
characterID)
# Collect filmography information.
items = CastInfo.select(CastInfo.q.personRoleID == characterID)
if results > 0:
items = items[:results]
filmodata = [(cd.movieID, cd.personID, cd.note,
get_movie_data(cd.movieID, self._kind)) for cd in items
if self._role[cd.roleID] in ('actor', 'actress')]
fdata = []
for f in filmodata:
curRole = None
curRoleID = f[1]
note = f[2] or u''
if curRoleID is not None:
robj = Name.get(curRoleID)
curRole = robj.name
m = Movie(movieID=f[0], data=f[3],
currentRole=curRole or u'',
roleID=curRoleID, roleIsPerson=True,
notes=note, accessSystem='sql')
fdata.append(m)
fdata = merge_roles(fdata)
fdata.sort()
if fdata:
res['filmography'] = fdata
return {'data': res, 'info sets': infosets}
get_character_filmography = get_character_main
get_character_biography = get_character_main
def _search_company(self, name, results):
name = name.strip()
if not name: return []
if isinstance(name, unicode):
name = name.encode('ascii', 'ignore')
soundexCode = soundex(name)
# If the soundex is None, compare only with the first
# phoneticCode column.
if soundexCode is None:
condition = ISNULL(CompanyName.q.namePcodeNf)
else:
if name.endswith(']'):
condition = CompanyName.q.namePcodeSf == soundexCode
else:
condition = CompanyName.q.namePcodeNf == soundexCode
try:
qr = [(q.id, {'name': q.name, 'country': q.countryCode})
for q in CompanyName.select(condition)]
except NotFoundError, e:
raise IMDbDataAccessError( \
'unable to search the database: "%s"' % str(e))
qr[:] = [(x[0], build_company_name(x[1])) for x in qr]
res = scan_company_names(qr, name, results)
res[:] = [x[1] for x in res]
# Purge empty country keys.
returnl = []
for x in res:
tmpd = x[1]
country = tmpd.get('country')
if country is None and 'country' in tmpd:
del tmpd['country']
returnl.append((x[0], tmpd))
return returnl
def get_company_main(self, companyID, results=0):
# Every company information is retrieved from here.
infosets = self.get_company_infoset()
try:
c = CompanyName.get(companyID)
except NotFoundError, e:
raise IMDbDataAccessError( \
'unable to get companyID "%s": "%s"' % (companyID, e))
res = {'name': c.name, 'country': c.countryCode}
if res['country'] is None: del res['country']
if not res:
raise IMDbDataAccessError('unable to get companyID "%s"' % \
companyID)
# Collect filmography information.
items = MovieCompanies.select(MovieCompanies.q.companyID == companyID)
if results > 0:
items = items[:results]
filmodata = [(cd.movieID, cd.companyID,
self._compType[cd.companyTypeID], cd.note,
get_movie_data(cd.movieID, self._kind)) for cd in items]
filmodata = _groupListBy(filmodata, 2)
for group in filmodata:
ctype = group[0][2]
for movieID, companyID, ctype, note, movieData in group:
movie = Movie(data=movieData, movieID=movieID,
notes=note or u'', accessSystem=self.accessSystem)
res.setdefault(ctype, []).append(movie)
res.get(ctype, []).sort()
return {'data': res, 'info sets': infosets}
def _search_keyword(self, keyword, results):
constr = OR(Keyword.q.phoneticCode ==
soundex(keyword.encode('ascii', 'ignore')),
CONTAINSSTRING(Keyword.q.keyword, self.toUTF8(keyword)))
return filterSimilarKeywords(keyword,
_iterKeywords(Keyword.select(constr)))[:results]
def _get_keyword(self, keyword, results):
keyID = Keyword.select(Keyword.q.keyword == keyword)
if keyID.count() == 0:
return []
keyID = keyID[0].id
movies = MovieKeyword.select(MovieKeyword.q.keywordID ==
keyID)[:results]
return [(m.movieID, get_movie_data(m.movieID, self._kind))
for m in movies]
def _get_top_bottom_movies(self, kind):
if kind == 'top':
kind = 'top 250 rank'
elif kind == 'bottom':
# Not a refuse: the plain text data files contains only
# the bottom 10 movies.
kind = 'bottom 10 rank'
else:
return []
infoID = InfoType.select(InfoType.q.info == kind)
if infoID.count() == 0:
return []
infoID = infoID[0].id
movies = MovieInfoIdx.select(MovieInfoIdx.q.infoTypeID == infoID)
ml = []
for m in movies:
minfo = get_movie_data(m.movieID, self._kind)
for k in kind, 'votes', 'rating', 'votes distribution':
valueDict = getSingleInfo(MovieInfoIdx, m.movieID,
k, notAList=True)
if k in (kind, 'votes') and k in valueDict:
valueDict[k] = int(valueDict[k])
elif k == 'rating' and k in valueDict:
valueDict[k] = float(valueDict[k])
minfo.update(valueDict)
ml.append((m.movieID, minfo))
sorter = (_cmpBottom, _cmpTop)[kind == 'top 250 rank']
ml.sort(sorter)
return ml
def __del__(self):
"""Ensure that the connection is closed."""
if not hasattr(self, '_connection'): return
self._sql_logger.debug('closing connection to the database')
self._connection.close()