parser.sql package (imdb package).
This package provides the IMDbSqlAccessSystem class used to access
IMDb's data through a SQL database. Every database supported by
the SQLObject _AND_ SQLAlchemy Object Relational Managers is available.
the imdb.IMDb function will return an instance of this class when
called with the 'accessSystem' argument set to "sql", "database" or "db".
# FIXME: this whole module was written in a veeery short amount of time.
# The code should be commented, rewritten and cleaned. :-)
import re
import logging
from difflib import SequenceMatcher
from codecs import lookup
from imdb import IMDbBase
from imdb.utils import normalizeName, normalizeTitle, build_title, \
build_name, analyze_name, analyze_title, \
canonicalTitle, canonicalName, re_titleRef, \
build_company_name, re_episodes, _unicodeArticles, \
analyze_company_name, re_year_index, re_nameRef
from imdb.Person import Person
from imdb.Movie import Movie
from imdb.Company import Company
from imdb._exceptions import IMDbDataAccessError, IMDbError
# Logger for miscellaneous functions.
_aux_logger = logging.getLogger('imdbpy.parser.sql.aux')
# =============================
# Things that once upon a time were in imdb.parser.common.locsql.
def titleVariations(title, fromPtdf=0):
"""Build title variations useful for searches; if fromPtdf is true,
the input is assumed to be in the plain text data files format."""
if fromPtdf: title1 = u''
else: title1 = title
title2 = title3 = u''
if fromPtdf or re_year_index.search(title):
# If it appears to have a (year[/imdbIndex]) indication,
# assume that a long imdb canonical name was provided.
titldict = analyze_title(title, canonical=1)
# title1: the canonical name.
title1 = titldict['title']
if titldict['kind'] != 'episode':
# title3: the long imdb canonical name.
if fromPtdf: title3 = title
else: title3 = build_title(titldict, canonical=1, ptdf=1)
title1 = normalizeTitle(title1)
title3 = build_title(titldict, canonical=1, ptdf=1)
# Just a title.
# title1: the canonical title.
title1 = canonicalTitle(title)
title3 = u''
# title2 is title1 without the article, or title1 unchanged.
if title1:
title2 = title1
t2s = title2.split(u', ')
if t2s[-1].lower() in _unicodeArticles:
title2 = u', '.join(t2s[:-1])
_aux_logger.debug('title variations: 1:[%s] 2:[%s] 3:[%s]',
title1, title2, title3)
return title1, title2, title3
re_nameIndex = re.compile(r'\(([IVXLCDM]+)\)')
def nameVariations(name, fromPtdf=0):
"""Build name variations useful for searches; if fromPtdf is true,
the input is assumed to be in the plain text data files format."""
name1 = name2 = name3 = u''
if fromPtdf or re_nameIndex.search(name):
# We've a name with an (imdbIndex)
namedict = analyze_name(name, canonical=1)
# name1 is the name in the canonical format.
name1 = namedict['name']
# name3 is the canonical name with the imdbIndex.
if fromPtdf:
if namedict.has_key('imdbIndex'):
name3 = name
name3 = build_name(namedict, canonical=1)
# name1 is the name in the canonical format.
name1 = canonicalName(name)
name3 = u''
# name2 is the name in the normal format, if it differs from name1.
name2 = normalizeName(name1)
if name1 == name2: name2 = u''
_aux_logger.debug('name variations: 1:[%s] 2:[%s] 3:[%s]',
name1, name2, name3)
return name1, name2, name3
from cutils import ratcliff as _ratcliff
def ratcliff(s1, s2, sm):
"""Return the Ratcliff-Obershelp value between the two strings,
using the C implementation."""
return _ratcliff(s1.encode('latin_1', 'replace'),
s2.encode('latin_1', 'replace'))
except ImportError:
_aux_logger.warn('Unable to import the cutils.ratcliff function.'
' Searching names and titles using the "sql"'
' data access system will be slower.')
def ratcliff(s1, s2, sm):
"""Ratcliff-Obershelp similarity."""
s1len = len(s1)
s2len = len(s2)
if s1len < s2len:
threshold = float(s1len) / s2len
threshold = float(s2len) / s1len
return 0.0
return sm.ratio()
def merge_roles(mop):
"""Merge multiple roles."""
new_list = []
for m in mop:
if m in new_list:
keep_this = new_list[new_list.index(m)]
if not isinstance(keep_this.currentRole, list):
keep_this.currentRole = [keep_this.currentRole]
return new_list
def scan_names(name_list, name1, name2, name3, results=0, ro_thresold=None,
"""Scan a list of names, searching for best matches against
the given variations."""
if ro_thresold is not None: RO_THRESHOLD = ro_thresold
else: RO_THRESHOLD = 0.6
sm1 = SequenceMatcher()
sm2 = SequenceMatcher()
sm3 = SequenceMatcher()
if name2: sm2.set_seq1(name2.lower())
if name3: sm3.set_seq1(name3.lower())
resd = {}
for i, n_data in name_list:
nil = n_data['name']
# XXX: on Symbian, here we get a str; not sure this is the
# right place to fix it.
if isinstance(nil, str):
nil = unicode(nil, 'latin1', 'ignore')
# Distance with the canonical name.
ratios = [ratcliff(name1, nil, sm1) + 0.05]
namesurname = u''
if not _scan_character:
nils = nil.split(', ', 1)
surname = nils[0]
if len(nils) == 2: namesurname = '%s %s' % (nils[1], surname)
nils = nil.split(' ', 1)
surname = nils[-1]
namesurname = nil
if surname != nil:
# Distance with the "Surname" in the database.
ratios.append(ratcliff(name1, surname, sm1))
if not _scan_character:
ratios.append(ratcliff(name1, namesurname, sm1))
if name2:
ratios.append(ratcliff(name2, surname, sm2))
# Distance with the "Name Surname" in the database.
if namesurname:
ratios.append(ratcliff(name2, namesurname, sm2))
if name3:
# Distance with the long imdb canonical name.
build_name(n_data, canonical=1), sm3) + 0.1)
ratio = max(ratios)
if ratio >= RO_THRESHOLD:
if resd.has_key(i):
if ratio > resd[i][0]: resd[i] = (ratio, (i, n_data))
else: resd[i] = (ratio, (i, n_data))
res = resd.values()
if results > 0: res[:] = res[:results]
return res
def scan_titles(titles_list, title1, title2, title3, results=0,
searchingEpisode=0, onlyEpisodes=0, ro_thresold=None):
"""Scan a list of titles, searching for best matches against
the given variations."""
if ro_thresold is not None: RO_THRESHOLD = ro_thresold
else: RO_THRESHOLD = 0.6
sm1 = SequenceMatcher()
sm2 = SequenceMatcher()
sm3 = SequenceMatcher()
if title3:
if title3[-1] == '}': searchingEpisode = 1
hasArt = 0
if title2 != title1: hasArt = 1
resd = {}
for i, t_data in titles_list:
if onlyEpisodes:
if t_data.get('kind') != 'episode':
til = t_data['title']
if til[-1] == ')':
dateIdx = til.rfind('(')
if dateIdx != -1:
til = til[:dateIdx].rstrip()
if not til:
ratio = ratcliff(title1, til, sm1)
if ratio >= RO_THRESHOLD:
resd[i] = (ratio, (i, t_data))
if searchingEpisode:
if t_data.get('kind') != 'episode': continue
elif t_data.get('kind') == 'episode': continue
til = t_data['title']
# XXX: on Symbian, here we get a str; not sure this is the
# right place to fix it.
if isinstance(til, str):
til = unicode(til, 'latin1', 'ignore')
# Distance with the canonical title (with or without article).
# titleS -> titleR
# titleS, the -> titleR, the
if not searchingEpisode:
til = canonicalTitle(til)
ratios = [ratcliff(title1, til, sm1) + 0.05]
# til2 is til without the article, if present.
til2 = til
tils = til2.split(', ')
matchHasArt = 0
if tils[-1].lower() in _unicodeArticles:
til2 = ', '.join(tils[:-1])
matchHasArt = 1
if hasArt and not matchHasArt:
# titleS[, the] -> titleR
ratios.append(ratcliff(title2, til, sm2))
elif matchHasArt and not hasArt:
# titleS -> titleR[, the]
ratios.append(ratcliff(title1, til2, sm1))
ratios = [0.0]
if title3:
# Distance with the long imdb canonical title.
build_title(t_data, canonical=1, ptdf=1), sm3) + 0.1)
ratio = max(ratios)
if ratio >= RO_THRESHOLD:
if resd.has_key(i):
if ratio > resd[i][0]:
resd[i] = (ratio, (i, t_data))
else: resd[i] = (ratio, (i, t_data))
res = resd.values()
if results > 0: res[:] = res[:results]
return res
def scan_company_names(name_list, name1, results=0, ro_thresold=None):
"""Scan a list of company names, searching for best matches against
the given name. Notice that this function takes a list of
strings, and not a list of dictionaries."""
if ro_thresold is not None: RO_THRESHOLD = ro_thresold
else: RO_THRESHOLD = 0.6
sm1 = SequenceMatcher()
resd = {}
withoutCountry = not name1.endswith(']')
for i, n in name_list:
# XXX: on Symbian, here we get a str; not sure this is the
# right place to fix it.
if isinstance(n, str):
n = unicode(n, 'latin1', 'ignore')
o_name = n
var = 0.0
if withoutCountry and n.endswith(']'):
cidx = n.rfind('[')
if cidx != -1:
n = n[:cidx].rstrip()
var = -0.05
# Distance with the company name.
ratio = ratcliff(name1, n, sm1) + var
if ratio >= RO_THRESHOLD:
if resd.has_key(i):
if ratio > resd[i][0]: resd[i] = (ratio,
(i, analyze_company_name(o_name)))
resd[i] = (ratio, (i, analyze_company_name(o_name)))
res = resd.values()
if results > 0: res[:] = res[:results]
return res
from cutils import soundex
except ImportError:
_aux_logger.warn('Unable to import the cutils.soundex function.'
' Searches of movie titles and person names will be'
' a bit slower.')
_translate = dict(B='1', C='2', D='3', F='1', G='2', J='2', K='2', L='4',
M='5', N='5', P='1', Q='2', R='6', S='2', T='3', V='1',
X='2', Z='2')
_translateget = _translate.get
_re_non_ascii = re.compile(r'^[^a-z]*', re.I)
def soundex(s):
"""Return the soundex code for the given string."""
# Maximum length of the soundex code.
s = _re_non_ascii.sub('', s)
if not s: return None
s = s.upper()
soundCode = s[0]
for c in s[1:]:
cw = _translateget(c, '0')
if cw != '0' and soundCode[-1] != cw:
soundCode += cw
return soundCode[:SOUNDEX_LEN] or None
def _sortKeywords(keyword, kwds):
"""Sort a list of keywords, based on the searched one."""
sm = SequenceMatcher()
ratios = [(ratcliff(keyword, k, sm), k) for k in kwds]
checkContained = False
if len(keyword) > 4:
checkContained = True
for idx, data in enumerate(ratios):
ratio, key = data
if key.startswith(keyword):
ratios[idx] = (ratio+0.5, key)
elif checkContained and keyword in key:
ratios[idx] = (ratio+0.3, key)
return [r[1] for r in ratios]
def filterSimilarKeywords(keyword, kwdsIterator):
"""Return a sorted list of keywords similar to the one given."""
seenDict = {}
kwdSndx = soundex(keyword.encode('ascii', 'ignore'))
matches = []
matchesappend = matches.append
checkContained = False
if len(keyword) > 4:
checkContained = True
for movieID, key in kwdsIterator:
if key in seenDict:
seenDict[key] = None
if checkContained and keyword in key:
if kwdSndx == soundex(key.encode('ascii', 'ignore')):
return _sortKeywords(keyword, matches)
# =============================
_litlist = ['screenplay/teleplay', 'novel', 'adaption', 'book',
'production process protocol', 'interviews',
'printed media reviews', 'essays', 'other literature']
_litd = dict([(x, ('literature', x)) for x in _litlist])
_buslist = ['budget', 'weekend gross', 'gross', 'opening weekend', 'rentals',
'admissions', 'filming dates', 'production dates', 'studios',
'copyright holder']
_busd = dict([(x, ('business', x)) for x in _buslist])
def _reGroupDict(d, newgr):
"""Regroup keys in the d dictionary in subdictionaries, based on
the scheme in the newgr dictionary.
E.g.: in the newgr, an entry 'LD label': ('laserdisc', 'label')
tells the _reGroupDict() function to take the entry with
label 'LD label' (as received from the sql database)
and put it in the subsection (another dictionary) named
'laserdisc', using the key 'label'."""
r = {}
newgrks = newgr.keys()
for k, v in d.items():
if k in newgrks:
r.setdefault(newgr[k][0], {})[newgr[k][1]] = v
# A not-so-clearer version:
##r.setdefault(newgr[k][0], {})
##r[newgr[k][0]][newgr[k][1]] = v
else: r[k] = v
return r
def _groupListBy(l, index):
"""Regroup items in a list in a list of lists, grouped by
the value at the given index."""
tmpd = {}
for item in l:
tmpd.setdefault(item[index], []).append(item)
res = tmpd.values()
return res
def sub_dict(d, keys):
"""Return the subdictionary of 'd', with just the keys listed in 'keys'."""
return dict([(k, d[k]) for k in keys if k in d])
def get_movie_data(movieID, kindDict, fromAka=0, _table=None):
"""Return a dictionary containing data about the given movieID;
if fromAka is true, the AkaTitle table is searched; _table is
reserved for the imdbpy2sql.py script."""
if _table is not None:
Table = _table
if not fromAka: Table = Title
else: Table = AkaTitle
m = Table.get(movieID)
mdict = {'title': m.title, 'kind': kindDict[m.kindID],
'year': m.productionYear, 'imdbIndex': m.imdbIndex,
'season': m.seasonNr, 'episode': m.episodeNr}
if not fromAka:
if m.seriesYears is not None:
mdict['series years'] = unicode(m.seriesYears)
if mdict['imdbIndex'] is None: del mdict['imdbIndex']
if mdict['year'] is None: del mdict['year']
mdict['year'] = int(mdict['year'])
except (TypeError, ValueError):
del mdict['year']
if mdict['season'] is None: del mdict['season']
try: mdict['season'] = int(mdict['season'])
except: pass
if mdict['episode'] is None: del mdict['episode']
try: mdict['episode'] = int(mdict['episode'])
except: pass
episodeOfID = m.episodeOfID
if episodeOfID is not None:
ser_dict = get_movie_data(episodeOfID, kindDict, fromAka)
mdict['episode of'] = Movie(data=ser_dict, movieID=episodeOfID,
if fromAka:
ser_note = AkaTitle.get(episodeOfID).note
if ser_note:
mdict['episode of'].notes = ser_note
return mdict
def _iterKeywords(results):
"""Iterate over (key.id, key.keyword) columns of a selection of
the Keyword table."""
for key in results:
yield key.id, key.keyword
def getSingleInfo(table, movieID, infoType, notAList=False):
"""Return a dictionary in the form {infoType: infoListOrString},
retrieving a single set of information about a given movie, from
the specified table."""
infoTypeID = InfoType.select(InfoType.q.info == infoType)
if infoTypeID.count() == 0:
return {}
res = table.select(AND(table.q.movieID == movieID,
table.q.infoTypeID == infoTypeID[0].id))
retList = []
for r in res:
info = r.info
note = r.note
if note:
info += u'::%s' % note
if not retList:
return {}
if not notAList: return {infoType: retList}
else: return {infoType: retList[0]}
def _cmpTop(a, b, what='top 250 rank'):
"""Compare function used to sort top 250/bottom 10 rank."""
av = int(a[1].get(what))
bv = int(b[1].get(what))
if av == bv:
return 0
return (-1, 1)[av > bv]
def _cmpBottom(a, b):
"""Compare function used to sort top 250/bottom 10 rank."""
return _cmpTop(a, b, what='bottom 10 rank')
class IMDbSqlAccessSystem(IMDbBase):
"""The class used to access IMDb's data through a SQL database."""
accessSystem = 'sql'
_sql_logger = logging.getLogger('imdbpy.parser.sql')
def __init__(self, uri, adultSearch=1, useORM=None, *arguments, **keywords):
"""Initialize the access system."""
IMDbBase.__init__(self, *arguments, **keywords)
if useORM is None:
useORM = ('sqlobject', 'sqlalchemy')
if not isinstance(useORM, (tuple, list)):
if ',' in useORM:
useORM = useORM.split(',')
useORM = [useORM]
self.useORM = useORM
nrMods = len(useORM)
_gotError = False
for idx, mod in enumerate(useORM):
mod = mod.strip().lower()
if mod == 'sqlalchemy':
from alchemyadapter import getDBTables, NotFoundError, \
setConnection, AND, OR, IN, \
elif mod == 'sqlobject':
from objectadapter import getDBTables, NotFoundError, \
setConnection, AND, OR, IN, \
self._sql_logger.warn('unknown module "%s"' % mod)
self._sql_logger.info('using %s ORM', mod)
# XXX: look ma'... black magic! It's used to make
# TableClasses and some functions accessible
# through the whole module.
for k, v in [('NotFoundError', NotFoundError),
('AND', AND), ('OR', OR), ('IN', IN),
globals()[k] = v
self.toUTF8 = toUTF8
DB_TABLES = getDBTables(uri)
for t in DB_TABLES:
globals()[t._imdbpyName] = t
if _gotError:
self._sql_logger.warn('falling back to "%s"' % mod)
except ImportError, e:
if idx+1 >= nrMods:
raise IMDbError('unable to use any ORM in %s: %s' % (
str(useORM), str(e)))
self._sql_logger.warn('unable to use "%s": %s' % (mod,
_gotError = True
raise IMDbError('unable to use any ORM in %s' % str(useORM))
# Set the connection to the database.
self._sql_logger.debug('connecting to %s', uri)
self._connection = setConnection(uri, DB_TABLES)
except AssertionError, e:
raise IMDbDataAccessError( \
'unable to connect to the database server; ' + \
'complete message: "%s"' % str(e))
self.Error = self._connection.module.Error
# Maps some IDs to the corresponding strings.
self._kind = {}
self._kindRev = {}
self._sql_logger.debug('reading constants from the database')
for kt in KindType.select():
self._kind[kt.id] = kt.kind
self._kindRev[str(kt.kind)] = kt.id
except self.Error:
# NOTE: you can also get the error, but - at least with
# MySQL - it also contains the password, and I don't
# like the idea to print it out.
raise IMDbDataAccessError( \
'unable to connect to the database server')
self._role = {}
for rl in RoleType.select():
self._role[rl.id] = str(rl.role)
self._info = {}
self._infoRev = {}
for inf in InfoType.select():
self._info[inf.id] = str(inf.info)
self._infoRev[str(inf.info)] = inf.id
self._compType = {}
for cType in CompanyType.select():
self._compType[cType.id] = cType.kind
info = [(it.id, it.info) for it in InfoType.select()]
self._compcast = {}
for cc in CompCastType.select():
self._compcast[cc.id] = str(cc.kind)
self._link = {}
for lt in LinkType.select():
self._link[lt.id] = str(lt.link)
self._moviesubs = {}
# Build self._moviesubs, a dictionary used to rearrange
# the data structure for a movie object.
for vid, vinfo in info:
if not vinfo.startswith('LD '): continue
self._moviesubs[vinfo] = ('laserdisc', vinfo[3:])
def _findRefs(self, o, trefs, nrefs):
"""Find titles or names references in strings."""
if isinstance(o, (unicode, str)):
for title in re_titleRef.findall(o):
a_title = analyze_title(title, canonical=0)
rtitle = build_title(a_title, ptdf=1)
if trefs.has_key(rtitle): continue
movieID = self._getTitleID(rtitle)
if movieID is None:
movieID = self._getTitleID(title)
if movieID is None:
m = Movie(title=rtitle, movieID=movieID,
trefs[rtitle] = m
rtitle2 = canonicalTitle(a_title.get('title', u''))
if rtitle2 and rtitle2 != rtitle and rtitle2 != title:
trefs[rtitle2] = m
if title != rtitle:
trefs[title] = m
for name in re_nameRef.findall(o):
a_name = analyze_name(name, canonical=1)
rname = build_name(a_name, canonical=1)
if nrefs.has_key(rname): continue
personID = self._getNameID(rname)
if personID is None:
personID = self._getNameID(name)
if personID is None: continue
p = Person(name=rname, personID=personID,
nrefs[rname] = p
rname2 = normalizeName(a_name.get('name', u''))
if rname2 and rname2 != rname:
nrefs[rname2] = p
if name != rname and name != rname2:
nrefs[name] = p
elif isinstance(o, (list, tuple)):
for item in o:
self._findRefs(item, trefs, nrefs)
elif isinstance(o, dict):
for value in o.values():
self._findRefs(value, trefs, nrefs)
return (trefs, nrefs)
def _extractRefs(self, o):
"""Scan for titles or names references in strings."""
trefs = {}
nrefs = {}
return self._findRefs(o, trefs, nrefs)
except RuntimeError, e:
# Symbian/python 2.2 has a poor regexp implementation.
import warnings
warnings.warn('RuntimeError in '
"imdb.parser.sql.IMDbSqlAccessSystem; "
"if it's not a recursion limit exceeded and we're not "
"running in a Symbian environment, it's a bug:\n%s" % e)
return (trefs, nrefs)
def _changeAKAencoding(self, akanotes, akatitle):
"""Return akatitle in the correct charset, as specified in
the akanotes field; if akatitle doesn't need to be modified,
return None."""
oti = akanotes.find('(original ')
if oti == -1: return None
ote = akanotes[oti+10:].find(' title)')
if ote != -1:
cs_info = akanotes[oti+10:oti+10+ote].lower().split()
for e in cs_info:
# excludes some strings that clearly are not encoding.
if e in ('script', '', 'cyrillic', 'greek'): continue
if e.startswith('iso-') and e.find('latin') != -1:
e = e[4:].replace('-', '')
lat1 = akatitle.encode('latin_1', 'replace')
return unicode(lat1, e, 'replace')
except (LookupError, ValueError, TypeError):
return None
def _buildNULLCondition(self, col, val):
"""Build a comparison for columns where values can be NULL."""
if val is None:
return ISNULL(col)
if isinstance(val, (int, long)):
return col == val
return col == self.toUTF8(val)
def _getTitleID(self, title):
"""Given a long imdb canonical title, returns a movieID or
None if not found."""
td = analyze_title(title)
condition = None
if td['kind'] == 'episode':
epof = td['episode of']
seriesID = [s.id for s in Title.select(
AND(Title.q.title == self.toUTF8(epof['title']),
Title.q.kindID == self._kindRev[epof['kind']],
if seriesID:
condition = AND(IN(Title.q.episodeOfID, seriesID),
Title.q.title == self.toUTF8(td['title']),
Title.q.kindID == self._kindRev[td['kind']],
if condition is None:
condition = AND(Title.q.title == self.toUTF8(td['title']),
Title.q.kindID == self._kindRev[td['kind']],
res = Title.select(condition)
if res.count() != 1:
return None
except (UnicodeDecodeError, TypeError):
return None
return res[0].id
def _getNameID(self, name):
"""Given a long imdb canonical name, returns a personID or
None if not found."""
nd = analyze_name(name)
res = Name.select(AND(Name.q.name == self.toUTF8(nd['name']),
c = res.count()
if res.count() != 1:
return None
except (UnicodeDecodeError, TypeError):
return None
return res[0].id
def _normalize_movieID(self, movieID):
"""Normalize the given movieID."""
return int(movieID)
except (ValueError, OverflowError):
raise IMDbError('movieID "%s" can\'t be converted to integer' % \
def _normalize_personID(self, personID):
"""Normalize the given personID."""
return int(personID)
except (ValueError, OverflowError):
raise IMDbError('personID "%s" can\'t be converted to integer' % \
def _normalize_characterID(self, characterID):
"""Normalize the given characterID."""
return int(characterID)
except (ValueError, OverflowError):
raise IMDbError('characterID "%s" can\'t be converted to integer' \
% characterID)
def _normalize_companyID(self, companyID):
"""Normalize the given companyID."""
return int(companyID)
except (ValueError, OverflowError):
raise IMDbError('companyID "%s" can\'t be converted to integer' \
% companyID)
def get_imdbMovieID(self, movieID):
"""Translate a movieID in an imdbID.
If not in the database, try an Exact Primary Title search on IMDb;
return None if it's unable to get the imdbID.
try: movie = Title.get(movieID)
except NotFoundError: return None
imdbID = movie.imdbID
if imdbID is not None: return '%07d' % imdbID
m_dict = get_movie_data(movie.id, self._kind)
titline = build_title(m_dict, ptdf=1)
imdbID = self.title2imdbID(titline)
# If the imdbID was retrieved from the web and was not in the
# database, update the database (ignoring errors, because it's
# possibile that the current user has not update privileges).
# There're times when I think I'm a genius; this one of
# those times... <g>
if imdbID is not None:
try: movie.imdbID = int(imdbID)
except: pass
return imdbID
def get_imdbPersonID(self, personID):
"""Translate a personID in an imdbID.
If not in the database, try an Exact Primary Name search on IMDb;
return None if it's unable to get the imdbID.
try: person = Name.get(personID)
except NotFoundError: return None
imdbID = person.imdbID
if imdbID is not None: return '%07d' % imdbID
n_dict = {'name': person.name, 'imdbIndex': person.imdbIndex}
namline = build_name(n_dict, canonical=1)
imdbID = self.name2imdbID(namline)
if imdbID is not None:
try: person.imdbID = int(imdbID)
except: pass
return imdbID
def get_imdbCharacterID(self, characterID):
"""Translate a characterID in an imdbID.
If not in the database, try an Exact Primary Name search on IMDb;
return None if it's unable to get the imdbID.
try: character = CharName.get(characterID)
except NotFoundError: return None
imdbID = character.imdbID
if imdbID is not None: return '%07d' % imdbID
n_dict = {'name': character.name, 'imdbIndex': character.imdbIndex}
namline = build_name(n_dict, canonical=1)
imdbID = self.character2imdbID(namline)
if imdbID is not None:
try: character.imdbID = int(imdbID)
except: pass
return imdbID
def get_imdbCompanyID(self, companyID):
"""Translate a companyID in an imdbID.
If not in the database, try an Exact Primary Name search on IMDb;
return None if it's unable to get the imdbID.
try: company = CompanyName.get(companyID)
except NotFoundError: return None
imdbID = company.imdbID
if imdbID is not None: return '%07d' % imdbID
n_dict = {'name': company.name, 'country': company.countryCode}
namline = build_company_name(n_dict)
imdbID = self.company2imdbID(namline)
if imdbID is not None:
try: company.imdbID = int(imdbID)
except: pass
return imdbID
def do_adult_search(self, doAdult):
"""If set to 0 or False, movies in the Adult category are not
episodeOf = title_dict.get('episode of')
shown in the results of a search."""
self.doAdult = doAdult
def _search_movie(self, title, results, _episodes=False):
title = title.strip()
if not title: return []
title_dict = analyze_title(title, canonical=1)
s_title = title_dict['title']
if not s_title: return []
episodeOf = title_dict.get('episode of')
if episodeOf:
_episodes = False
s_title_split = s_title.split(', ')
if len(s_title_split) > 1 and \
s_title_split[-1].lower() in _unicodeArticles:
s_title_rebuilt = ', '.join(s_title_split[:-1])
if s_title_rebuilt:
s_title = s_title_rebuilt
#if not episodeOf:
# if not _episodes:
# s_title_split = s_title.split(', ')
# if len(s_title_split) > 1 and \
# s_title_split[-1].lower() in _articles:
# s_title_rebuilt = ', '.join(s_title_split[:-1])
# if s_title_rebuilt:
# s_title = s_title_rebuilt
# _episodes = False
if isinstance(s_title, unicode):
s_title = s_title.encode('ascii', 'ignore')
soundexCode = soundex(s_title)
# XXX: improve the search restricting the kindID if the
# "kind" of the input differs from "movie"?
condition = conditionAka = None
if _episodes:
condition = AND(Title.q.phoneticCode == soundexCode,
Title.q.kindID == self._kindRev['episode'])
conditionAka = AND(AkaTitle.q.phoneticCode == soundexCode,
AkaTitle.q.kindID == self._kindRev['episode'])
elif title_dict['kind'] == 'episode' and episodeOf is not None:
# set canonical=0 ? Should not make much difference.
series_title = build_title(episodeOf, canonical=1)
# XXX: is it safe to get "results" results?
# Too many? Too few?
serRes = results
if serRes < 3 or serRes > 10:
serRes = 10
searchSeries = self._search_movie(series_title, serRes)
seriesIDs = [result[0] for result in searchSeries]
if seriesIDs:
condition = AND(Title.q.phoneticCode == soundexCode,
IN(Title.q.episodeOfID, seriesIDs),
Title.q.kindID == self._kindRev['episode'])
conditionAka = AND(AkaTitle.q.phoneticCode == soundexCode,
IN(AkaTitle.q.episodeOfID, seriesIDs),
AkaTitle.q.kindID == self._kindRev['episode'])
# XXX: bad situation: we have found no matching series;
# try searching everything (both episodes and
# non-episodes) for the title.
condition = AND(Title.q.phoneticCode == soundexCode,
IN(Title.q.episodeOfID, seriesIDs))
conditionAka = AND(AkaTitle.q.phoneticCode == soundexCode,
IN(AkaTitle.q.episodeOfID, seriesIDs))
if condition is None:
# XXX: excludes episodes?
condition = AND(Title.q.kindID != self._kindRev['episode'],
Title.q.phoneticCode == soundexCode)
conditionAka = AND(AkaTitle.q.kindID != self._kindRev['episode'],
AkaTitle.q.phoneticCode == soundexCode)
# Up to 3 variations of the title are searched, plus the
# long imdb canonical title, if provided.
if not _episodes:
title1, title2, title3 = titleVariations(title)
title1 = title
title2 = ''
title3 = ''
qr = [(q.id, get_movie_data(q.id, self._kind))
for q in Title.select(condition)]
q2 = [(q.movieID, get_movie_data(q.id, self._kind, fromAka=1))
for q in AkaTitle.select(conditionAka)]
qr += q2
except NotFoundError, e:
raise IMDbDataAccessError( \
'unable to search the database: "%s"' % str(e))
resultsST = results * 3
res = scan_titles(qr, title1, title2, title3, resultsST,
searchingEpisode=episodeOf is not None,
res[:] = [x[1] for x in res]
if res and not self.doAdult:
mids = [x[0] for x in res]
genreID = self._infoRev['genres']
adultlist = [al.movieID for al
in MovieInfo.select(
AND(MovieInfo.q.infoTypeID == genreID,
MovieInfo.q.info == 'Adult',
IN(MovieInfo.q.movieID, mids)))]
res[:] = [x for x in res if x[0] not in adultlist]
new_res = []
# XXX: can there be duplicates?
for r in res:
if r not in q2:
mdict = r[1]
aka_title = build_title(mdict, ptdf=1)
orig_dict = get_movie_data(r[0], self._kind)
orig_title = build_title(orig_dict, ptdf=1)
if aka_title == orig_title:
orig_dict['akas'] = [aka_title]
new_res.append((r[0], orig_dict))
if results > 0: new_res[:] = new_res[:results]
return new_res
def _search_episode(self, title, results):
return self._search_movie(title, results, _episodes=True)
def get_movie_main(self, movieID):
# Every movie information is retrieved from here.
infosets = self.get_movie_infoset()
res = get_movie_data(movieID, self._kind)
except NotFoundError, e:
raise IMDbDataAccessError( \
'unable to get movieID "%s": "%s"' % (movieID, str(e)))
if not res:
raise IMDbDataAccessError('unable to get movieID "%s"' % movieID)
# Collect cast information.
castdata = [[cd.personID, cd.personRoleID, cd.note, cd.nrOrder,
for cd in CastInfo.select(CastInfo.q.movieID == movieID)]
for p in castdata:
person = Name.get(p[0])
p += [person.name, person.imdbIndex]
if p[4] in ('actor', 'actress'):
p[4] = 'cast'
# Regroup by role/duty (cast, writer, director, ...)
castdata[:] = _groupListBy(castdata, 4)
for group in castdata:
duty = group[0][4]
for pdata in group:
curRole = pdata[1]
curRoleID = None
if curRole is not None:
robj = CharName.get(curRole)
curRole = robj.name
curRoleID = robj.id
p = Person(personID=pdata[0], name=pdata[5],
currentRole=curRole or u'',
notes=pdata[2] or u'',
if pdata[6]: p['imdbIndex'] = pdata[6]
p.billingPos = pdata[3]
res.setdefault(duty, []).append(p)
if duty == 'cast':
res[duty] = merge_roles(res[duty])
# Info about the movie.
minfo = [(self._info[m.infoTypeID], m.info, m.note)
for m in MovieInfo.select(MovieInfo.q.movieID == movieID)]
minfo += [(self._info[m.infoTypeID], m.info, m.note)
for m in MovieInfoIdx.select(MovieInfoIdx.q.movieID == movieID)]
minfo += [('keywords', Keyword.get(m.keywordID).keyword, None)
for m in MovieKeyword.select(MovieKeyword.q.movieID == movieID)]
minfo = _groupListBy(minfo, 0)
for group in minfo:
sect = group[0][0]
for mdata in group:
data = mdata[1]
if mdata[2]: data += '::%s' % mdata[2]
res.setdefault(sect, []).append(data)
# Companies info about a movie.
cinfo = [(self._compType[m.companyTypeID], m.companyID, m.note) for m
in MovieCompanies.select(MovieCompanies.q.movieID == movieID)]
cinfo = _groupListBy(cinfo, 0)
for group in cinfo:
sect = group[0][0]
for mdata in group:
cDb = CompanyName.get(mdata[1])
cDbTxt = cDb.name
if cDb.countryCode:
cDbTxt += ' %s' % cDb.countryCode
company = Company(name=cDbTxt,
notes=mdata[2] or u'',
res.setdefault(sect, []).append(company)
# AKA titles.
akat = [(get_movie_data(at.id, self._kind, fromAka=1), at.note)
for at in AkaTitle.select(AkaTitle.q.movieID == movieID)]
if akat:
res['akas'] = []
for td, note in akat:
nt = build_title(td, ptdf=1)
if note:
net = self._changeAKAencoding(note, nt)
if net is not None: nt = net
nt += '::%s' % note
if nt not in res['akas']: res['akas'].append(nt)
# Complete cast/crew.
compcast = [(self._compcast[cc.subjectID], self._compcast[cc.statusID])
for cc in CompleteCast.select(CompleteCast.q.movieID == movieID)]
if compcast:
for entry in compcast:
val = unicode(entry[1])
res[u'complete %s' % entry[0]] = val
# Movie connections.
mlinks = [[ml.linkedMovieID, self._link[ml.linkTypeID]]
for ml in MovieLink.select(MovieLink.q.movieID == movieID)]
if mlinks:
for ml in mlinks:
lmovieData = get_movie_data(ml[0], self._kind)
m = Movie(movieID=ml[0], data=lmovieData, accessSystem='sql')
ml[0] = m
res['connections'] = {}
mlinks[:] = _groupListBy(mlinks, 1)
for group in mlinks:
lt = group[0][1]
res['connections'][lt] = [i[0] for i in group]
# Episodes.
episodes = {}
eps_list = list(Title.select(Title.q.episodeOfID == movieID))
if eps_list:
ps_data = {'title': res['title'], 'kind': res['kind'],
'year': res.get('year'),
'imdbIndex': res.get('imdbIndex')}
parentSeries = Movie(movieID=movieID, data=ps_data,
for episode in eps_list:
episodeID = episode.id
episode_data = get_movie_data(episodeID, self._kind)
m = Movie(movieID=episodeID, data=episode_data,
m['episode of'] = parentSeries
season = episode_data.get('season', 'UNKNOWN')
if season not in episodes: episodes[season] = {}
ep_number = episode_data.get('episode')
if ep_number is None:
ep_number = max((episodes[season].keys() or [0])) + 1
episodes[season][ep_number] = m
res['episodes'] = episodes
res['number of episodes'] = sum([len(x) for x in episodes.values()])
res['number of seasons'] = len(episodes.keys())
# Regroup laserdisc information.
res = _reGroupDict(res, self._moviesubs)
# Do some transformation to preserve consistency with other
# data access systems.
if 'quotes' in res:
for idx, quote in enumerate(res['quotes']):
res['quotes'][idx] = quote.split('::')
if 'runtimes' in res and len(res['runtimes']) > 0:
rt = res['runtimes'][0]
episodes = re_episodes.findall(rt)
if episodes:
res['runtimes'][0] = re_episodes.sub('', rt)
if res['runtimes'][0][-2:] == '::':
res['runtimes'][0] = res['runtimes'][0][:-2]
if 'votes' in res:
res['votes'] = int(res['votes'][0])
if 'rating' in res:
res['rating'] = float(res['rating'][0])
if 'votes distribution' in res:
res['votes distribution'] = res['votes distribution'][0]
if 'mpaa' in res:
res['mpaa'] = res['mpaa'][0]
if 'top 250 rank' in res:
try: res['top 250 rank'] = int(res['top 250 rank'])
except: pass
if 'bottom 10 rank' in res:
try: res['bottom 100 rank'] = int(res['bottom 10 rank'])
except: pass
del res['bottom 10 rank']
for old, new in [('guest', 'guests'), ('trademarks', 'trade-mark'),
('articles', 'article'), ('pictorials', 'pictorial'),
('magazine-covers', 'magazine-cover-photo')]:
if old in res:
res[new] = res[old]
del res[old]
trefs,nrefs = {}, {}
trefs,nrefs = self._extractRefs(sub_dict(res,Movie.keys_tomodify_list))
return {'data': res, 'titlesRefs': trefs, 'namesRefs': nrefs,
'info sets': infosets}
# Just to know what kind of information are available.
get_movie_alternate_versions = get_movie_main
get_movie_business = get_movie_main
get_movie_connections = get_movie_main
get_movie_crazy_credits = get_movie_main
get_movie_goofs = get_movie_main
get_movie_keywords = get_movie_main
get_movie_literature = get_movie_main
get_movie_locations = get_movie_main
get_movie_plot = get_movie_main
get_movie_quotes = get_movie_main
get_movie_release_dates = get_movie_main
get_movie_soundtrack = get_movie_main
get_movie_taglines = get_movie_main
get_movie_technical = get_movie_main
get_movie_trivia = get_movie_main
get_movie_vote_details = get_movie_main
get_movie_episodes = get_movie_main
def _search_person(self, name, results):
name = name.strip()
if not name: return []
s_name = analyze_name(name)['name']
if not s_name: return []
if isinstance(s_name, unicode):
s_name = s_name.encode('ascii', 'ignore')
soundexCode = soundex(s_name)
name1, name2, name3 = nameVariations(name)
# If the soundex is None, compare only with the first
# phoneticCode column.
if soundexCode is not None:
condition = IN(soundexCode, [Name.q.namePcodeCf,
conditionAka = IN(soundexCode, [AkaName.q.namePcodeCf,
condition = ISNULL(Name.q.namePcodeCf)
conditionAka = ISNULL(AkaName.q.namePcodeCf)
qr = [(q.id, {'name': q.name, 'imdbIndex': q.imdbIndex})
for q in Name.select(condition)]
q2 = [(q.personID, {'name': q.name, 'imdbIndex': q.imdbIndex})
for q in AkaName.select(conditionAka)]
qr += q2
except NotFoundError, e:
raise IMDbDataAccessError( \
'unable to search the database: "%s"' % str(e))
res = scan_names(qr, name1, name2, name3, results)
res[:] = [x[1] for x in res]
# Purge empty imdbIndex.
returnl = []
for x in res:
tmpd = x[1]
if tmpd['imdbIndex'] is None:
del tmpd['imdbIndex']
returnl.append((x[0], tmpd))
new_res = []
# XXX: can there be duplicates?
for r in returnl:
if r not in q2:
pdict = r[1]
aka_name = build_name(pdict, canonical=1)
p = Name.get(r[0])
orig_dict = {'name': p.name, 'imdbIndex': p.imdbIndex}
if orig_dict['imdbIndex'] is None:
del orig_dict['imdbIndex']
orig_name = build_name(orig_dict, canonical=1)
if aka_name == orig_name:
orig_dict['akas'] = [aka_name]
new_res.append((r[0], orig_dict))
if results > 0: new_res[:] = new_res[:results]
return new_res
def get_person_main(self, personID):
# Every person information is retrieved from here.
infosets = self.get_person_infoset()
p = Name.get(personID)
except NotFoundError, e:
raise IMDbDataAccessError( \
'unable to get personID "%s": "%s"' % (personID, str(e)))
res = {'name': p.name, 'imdbIndex': p.imdbIndex}
if res['imdbIndex'] is None: del res['imdbIndex']
if not res:
raise IMDbDataAccessError('unable to get personID "%s"' % personID)
# Collect cast information.
castdata = [(cd.movieID, cd.personRoleID, cd.note,
get_movie_data(cd.movieID, self._kind))
for cd in CastInfo.select(CastInfo.q.personID == personID)]
# Regroup by role/duty (cast, writer, director, ...)
castdata[:] = _groupListBy(castdata, 3)
episodes = {}
seenDuties = []
for group in castdata:
for mdata in group:
duty = orig_duty = group[0][3]
if duty not in seenDuties: seenDuties.append(orig_duty)
note = mdata[2] or u''
if 'episode of' in mdata[4]:
duty = 'episodes'
if orig_duty not in ('actor', 'actress'):
if note: note = ' %s' % note
note = '[%s]%s' % (orig_duty, note)
curRole = mdata[1]
curRoleID = None
if curRole is not None:
robj = CharName.get(curRole)
curRole = robj.name
curRoleID = robj.id
m = Movie(movieID=mdata[0], data=mdata[4],
currentRole=curRole or u'',
notes=note, accessSystem='sql')
if duty != 'episodes':
res.setdefault(duty, []).append(m)
episodes.setdefault(m['episode of'], []).append(m)
if episodes:
for k in episodes:
res['episodes'] = episodes
for duty in seenDuties:
if duty in res:
if duty in ('actor', 'actress', 'himself', 'herself',
res[duty] = merge_roles(res[duty])
# Info about the person.
pinfo = [(self._info[pi.infoTypeID], pi.info, pi.note)
for pi in PersonInfo.select(PersonInfo.q.personID == personID)]
# Regroup by duty.
pinfo = _groupListBy(pinfo, 0)
for group in pinfo:
sect = group[0][0]
for pdata in group:
data = pdata[1]
if pdata[2]: data += '::%s' % pdata[2]
res.setdefault(sect, []).append(data)
# AKA names.
akan = [(an.name, an.imdbIndex)
for an in AkaName.select(AkaName.q.personID == personID)]
if akan:
res['akas'] = []
for n in akan:
nd = {'name': n[0]}
if n[1]: nd['imdbIndex'] = n[1]
nt = build_name(nd, canonical=1)
# Do some transformation to preserve consistency with other
# data access systems.
for key in ('birth date', 'birth notes', 'death date', 'death notes',
'birth name', 'height'):
if key in res:
res[key] = res[key][0]
if 'guest' in res:
res['notable tv guest appearances'] = res['guest']
del res['guest']
miscnames = res.get('nick names', [])
if 'birth name' in res: miscnames.append(res['birth name'])
if 'akas' in res:
for mname in miscnames:
if mname in res['akas']: res['akas'].remove(mname)
if not res['akas']: del res['akas']
trefs,nrefs = self._extractRefs(sub_dict(res,Person.keys_tomodify_list))
return {'data': res, 'titlesRefs': trefs, 'namesRefs': nrefs,
'info sets': infosets}
# Just to know what kind of information are available.
get_person_filmography = get_person_main
get_person_biography = get_person_main
get_person_other_works = get_person_main
get_person_episodes = get_person_main
def _search_character(self, name, results):
name = name.strip()
if not name: return []
s_name = analyze_name(name)['name']
if not s_name: return []
if isinstance(s_name, unicode):
s_name = s_name.encode('ascii', 'ignore')
s_name = normalizeName(s_name)
soundexCode = soundex(s_name)
surname = s_name.split(' ')[-1]
surnameSoundex = soundex(surname)
name2 = ''
soundexName2 = None
nsplit = s_name.split()
if len(nsplit) > 1:
name2 = '%s %s' % (nsplit[-1], ' '.join(nsplit[:-1]))
if s_name == name2:
name2 = ''
soundexName2 = soundex(name2)
# If the soundex is None, compare only with the first
# phoneticCode column.
if soundexCode is not None:
if soundexName2 is not None:
condition = OR(surnameSoundex == CharName.q.surnamePcode,
IN(CharName.q.namePcodeNf, [soundexCode,
IN(CharName.q.surnamePcode, [soundexCode,
condition = OR(surnameSoundex == CharName.q.surnamePcode,
IN(soundexCode, [CharName.q.namePcodeNf,
condition = ISNULL(Name.q.namePcodeNf)
qr = [(q.id, {'name': q.name, 'imdbIndex': q.imdbIndex})
for q in CharName.select(condition)]
except NotFoundError, e:
raise IMDbDataAccessError( \
'unable to search the database: "%s"' % str(e))
res = scan_names(qr, s_name, name2, '', results,
res[:] = [x[1] for x in res]
# Purge empty imdbIndex.
returnl = []
for x in res:
tmpd = x[1]
if tmpd['imdbIndex'] is None:
del tmpd['imdbIndex']
returnl.append((x[0], tmpd))
return returnl
def get_character_main(self, characterID, results=1000):
# Every character information is retrieved from here.
infosets = self.get_character_infoset()
c = CharName.get(characterID)
except NotFoundError, e:
raise IMDbDataAccessError( \
'unable to get characterID "%s": "%s"' % (characterID, e))
res = {'name': c.name, 'imdbIndex': c.imdbIndex}
if res['imdbIndex'] is None: del res['imdbIndex']
if not res:
raise IMDbDataAccessError('unable to get characterID "%s"' % \
# Collect filmography information.
items = CastInfo.select(CastInfo.q.personRoleID == characterID)
if results > 0:
items = items[:results]
filmodata = [(cd.movieID, cd.personID, cd.note,
get_movie_data(cd.movieID, self._kind)) for cd in items
if self._role[cd.roleID] in ('actor', 'actress')]
fdata = []
for f in filmodata:
curRole = None
curRoleID = f[1]
note = f[2] or u''
if curRoleID is not None:
robj = Name.get(curRoleID)
curRole = robj.name
m = Movie(movieID=f[0], data=f[3],
currentRole=curRole or u'',
roleID=curRoleID, roleIsPerson=True,
notes=note, accessSystem='sql')
fdata = merge_roles(fdata)
if fdata:
res['filmography'] = fdata
return {'data': res, 'info sets': infosets}
get_character_filmography = get_character_main
get_character_biography = get_character_main
def _search_company(self, name, results):
name = name.strip()
if not name: return []
if isinstance(name, unicode):
name = name.encode('ascii', 'ignore')
soundexCode = soundex(name)
# If the soundex is None, compare only with the first
# phoneticCode column.
if soundexCode is None:
condition = ISNULL(CompanyName.q.namePcodeNf)
if name.endswith(']'):
condition = CompanyName.q.namePcodeSf == soundexCode
condition = CompanyName.q.namePcodeNf == soundexCode
qr = [(q.id, {'name': q.name, 'country': q.countryCode})
for q in CompanyName.select(condition)]
except NotFoundError, e:
raise IMDbDataAccessError( \
'unable to search the database: "%s"' % str(e))
qr[:] = [(x[0], build_company_name(x[1])) for x in qr]
res = scan_company_names(qr, name, results)
res[:] = [x[1] for x in res]
# Purge empty country keys.
returnl = []
for x in res:
tmpd = x[1]
country = tmpd.get('country')
if country is None and 'country' in tmpd:
del tmpd['country']
returnl.append((x[0], tmpd))
return returnl
def get_company_main(self, companyID, results=0):
# Every company information is retrieved from here.
infosets = self.get_company_infoset()
c = CompanyName.get(companyID)
except NotFoundError, e:
raise IMDbDataAccessError( \
'unable to get companyID "%s": "%s"' % (companyID, e))
res = {'name': c.name, 'country': c.countryCode}
if res['country'] is None: del res['country']
if not res:
raise IMDbDataAccessError('unable to get companyID "%s"' % \
# Collect filmography information.
items = MovieCompanies.select(MovieCompanies.q.companyID == companyID)
if results > 0:
items = items[:results]
filmodata = [(cd.movieID, cd.companyID,
self._compType[cd.companyTypeID], cd.note,
get_movie_data(cd.movieID, self._kind)) for cd in items]
filmodata = _groupListBy(filmodata, 2)
for group in filmodata:
ctype = group[0][2]
for movieID, companyID, ctype, note, movieData in group:
movie = Movie(data=movieData, movieID=movieID,
notes=note or u'', accessSystem=self.accessSystem)
res.setdefault(ctype, []).append(movie)
res.get(ctype, []).sort()
return {'data': res, 'info sets': infosets}
def _search_keyword(self, keyword, results):
constr = OR(Keyword.q.phoneticCode ==
soundex(keyword.encode('ascii', 'ignore')),
CONTAINSSTRING(Keyword.q.keyword, self.toUTF8(keyword)))
return filterSimilarKeywords(keyword,
def _get_keyword(self, keyword, results):
keyID = Keyword.select(Keyword.q.keyword == keyword)
if keyID.count() == 0:
return []
keyID = keyID[0].id
movies = MovieKeyword.select(MovieKeyword.q.keywordID ==
return [(m.movieID, get_movie_data(m.movieID, self._kind))
for m in movies]
def _get_top_bottom_movies(self, kind):
if kind == 'top':
kind = 'top 250 rank'
elif kind == 'bottom':
# Not a refuse: the plain text data files contains only
# the bottom 10 movies.
kind = 'bottom 10 rank'
return []
infoID = InfoType.select(InfoType.q.info == kind)
if infoID.count() == 0:
return []
infoID = infoID[0].id
movies = MovieInfoIdx.select(MovieInfoIdx.q.infoTypeID == infoID)
ml = []
for m in movies:
minfo = get_movie_data(m.movieID, self._kind)
for k in kind, 'votes', 'rating', 'votes distribution':
valueDict = getSingleInfo(MovieInfoIdx, m.movieID,
k, notAList=True)
if k in (kind, 'votes') and k in valueDict:
valueDict[k] = int(valueDict[k])
elif k == 'rating' and k in valueDict:
valueDict[k] = float(valueDict[k])
ml.append((m.movieID, minfo))
sorter = (_cmpBottom, _cmpTop)[kind == 'top 250 rank']
return ml
def __del__(self):
"""Ensure that the connection is closed."""
if not hasattr(self, '_connection'): return
self._sql_logger.debug('closing connection to the database')