mirror of
https://github.com/moparisthebest/SickRage
synced 2024-11-11 03:45:01 -05:00
cec4ed573d
Switched out sqlite3 libs in favour of SQLAlchemy v0.9, will gradually migrate dialects and scheme to be fully SQLAlchemy compliant for using there ORM with sessions instead of direct. Fixed getEpisode function to stop making unrequired scene number conversions on already converted data thats available now from cache.
320 lines
12 KiB
Python
320 lines
12 KiB
Python
import os
|
|
import sys
|
|
|
|
from cherrypy._cpcompat import HTTPConnection, HTTPSConnection, ntob
|
|
from cherrypy._cpcompat import BytesIO
|
|
|
|
curdir = os.path.join(os.getcwd(), os.path.dirname(__file__))
|
|
has_space_filepath = os.path.join(curdir, 'static', 'has space.html')
|
|
bigfile_filepath = os.path.join(curdir, "static", "bigfile.log")
|
|
BIGFILE_SIZE = 1024 * 1024
|
|
|
|
import cherrypy
|
|
from cherrypy.lib import static
|
|
from cherrypy.test import helper
|
|
|
|
|
|
class StaticTest(helper.CPWebCase):
|
|
|
|
def setup_server():
|
|
if not os.path.exists(has_space_filepath):
|
|
open(has_space_filepath, 'wb').write(ntob('Hello, world\r\n'))
|
|
if not os.path.exists(bigfile_filepath):
|
|
open(bigfile_filepath, 'wb').write(ntob("x" * BIGFILE_SIZE))
|
|
|
|
class Root:
|
|
|
|
def bigfile(self):
|
|
from cherrypy.lib import static
|
|
self.f = static.serve_file(bigfile_filepath)
|
|
return self.f
|
|
bigfile.exposed = True
|
|
bigfile._cp_config = {'response.stream': True}
|
|
|
|
def tell(self):
|
|
if self.f.input.closed:
|
|
return ''
|
|
return repr(self.f.input.tell()).rstrip('L')
|
|
tell.exposed = True
|
|
|
|
def fileobj(self):
|
|
f = open(os.path.join(curdir, 'style.css'), 'rb')
|
|
return static.serve_fileobj(f, content_type='text/css')
|
|
fileobj.exposed = True
|
|
|
|
def bytesio(self):
|
|
f = BytesIO(ntob('Fee\nfie\nfo\nfum'))
|
|
return static.serve_fileobj(f, content_type='text/plain')
|
|
bytesio.exposed = True
|
|
|
|
class Static:
|
|
|
|
def index(self):
|
|
return 'You want the Baron? You can have the Baron!'
|
|
index.exposed = True
|
|
|
|
def dynamic(self):
|
|
return "This is a DYNAMIC page"
|
|
dynamic.exposed = True
|
|
|
|
root = Root()
|
|
root.static = Static()
|
|
|
|
rootconf = {
|
|
'/static': {
|
|
'tools.staticdir.on': True,
|
|
'tools.staticdir.dir': 'static',
|
|
'tools.staticdir.root': curdir,
|
|
},
|
|
'/style.css': {
|
|
'tools.staticfile.on': True,
|
|
'tools.staticfile.filename': os.path.join(curdir, 'style.css'),
|
|
},
|
|
'/docroot': {
|
|
'tools.staticdir.on': True,
|
|
'tools.staticdir.root': curdir,
|
|
'tools.staticdir.dir': 'static',
|
|
'tools.staticdir.index': 'index.html',
|
|
},
|
|
'/error': {
|
|
'tools.staticdir.on': True,
|
|
'request.show_tracebacks': True,
|
|
},
|
|
'/404test': {
|
|
'tools.staticdir.on': True,
|
|
'tools.staticdir.root': curdir,
|
|
'tools.staticdir.dir': 'static',
|
|
'error_page.404': error_page_404,
|
|
}
|
|
}
|
|
rootApp = cherrypy.Application(root)
|
|
rootApp.merge(rootconf)
|
|
|
|
test_app_conf = {
|
|
'/test': {
|
|
'tools.staticdir.index': 'index.html',
|
|
'tools.staticdir.on': True,
|
|
'tools.staticdir.root': curdir,
|
|
'tools.staticdir.dir': 'static',
|
|
},
|
|
}
|
|
testApp = cherrypy.Application(Static())
|
|
testApp.merge(test_app_conf)
|
|
|
|
vhost = cherrypy._cpwsgi.VirtualHost(rootApp, {'virt.net': testApp})
|
|
cherrypy.tree.graft(vhost)
|
|
setup_server = staticmethod(setup_server)
|
|
|
|
def teardown_server():
|
|
for f in (has_space_filepath, bigfile_filepath):
|
|
if os.path.exists(f):
|
|
try:
|
|
os.unlink(f)
|
|
except:
|
|
pass
|
|
teardown_server = staticmethod(teardown_server)
|
|
|
|
def testStatic(self):
|
|
self.getPage("/static/index.html")
|
|
self.assertStatus('200 OK')
|
|
self.assertHeader('Content-Type', 'text/html')
|
|
self.assertBody('Hello, world\r\n')
|
|
|
|
# Using a staticdir.root value in a subdir...
|
|
self.getPage("/docroot/index.html")
|
|
self.assertStatus('200 OK')
|
|
self.assertHeader('Content-Type', 'text/html')
|
|
self.assertBody('Hello, world\r\n')
|
|
|
|
# Check a filename with spaces in it
|
|
self.getPage("/static/has%20space.html")
|
|
self.assertStatus('200 OK')
|
|
self.assertHeader('Content-Type', 'text/html')
|
|
self.assertBody('Hello, world\r\n')
|
|
|
|
self.getPage("/style.css")
|
|
self.assertStatus('200 OK')
|
|
self.assertHeader('Content-Type', 'text/css')
|
|
# Note: The body should be exactly 'Dummy stylesheet\n', but
|
|
# unfortunately some tools such as WinZip sometimes turn \n
|
|
# into \r\n on Windows when extracting the CherryPy tarball so
|
|
# we just check the content
|
|
self.assertMatchesBody('^Dummy stylesheet')
|
|
|
|
def test_fallthrough(self):
|
|
# Test that NotFound will then try dynamic handlers (see [878]).
|
|
self.getPage("/static/dynamic")
|
|
self.assertBody("This is a DYNAMIC page")
|
|
|
|
# Check a directory via fall-through to dynamic handler.
|
|
self.getPage("/static/")
|
|
self.assertStatus('200 OK')
|
|
self.assertHeader('Content-Type', 'text/html;charset=utf-8')
|
|
self.assertBody('You want the Baron? You can have the Baron!')
|
|
|
|
def test_index(self):
|
|
# Check a directory via "staticdir.index".
|
|
self.getPage("/docroot/")
|
|
self.assertStatus('200 OK')
|
|
self.assertHeader('Content-Type', 'text/html')
|
|
self.assertBody('Hello, world\r\n')
|
|
# The same page should be returned even if redirected.
|
|
self.getPage("/docroot")
|
|
self.assertStatus(301)
|
|
self.assertHeader('Location', '%s/docroot/' % self.base())
|
|
self.assertMatchesBody("This resource .* <a href=(['\"])%s/docroot/\\1>"
|
|
"%s/docroot/</a>." % (self.base(), self.base()))
|
|
|
|
def test_config_errors(self):
|
|
# Check that we get an error if no .file or .dir
|
|
self.getPage("/error/thing.html")
|
|
self.assertErrorPage(500)
|
|
if sys.version_info >= (3, 3):
|
|
errmsg = ntob("TypeError: staticdir\(\) missing 2 "
|
|
"required positional arguments")
|
|
else:
|
|
errmsg = ntob("TypeError: staticdir\(\) takes at least 2 "
|
|
"(positional )?arguments \(0 given\)")
|
|
self.assertMatchesBody(errmsg)
|
|
|
|
def test_security(self):
|
|
# Test up-level security
|
|
self.getPage("/static/../../test/style.css")
|
|
self.assertStatus((400, 403))
|
|
|
|
def test_modif(self):
|
|
# Test modified-since on a reasonably-large file
|
|
self.getPage("/static/dirback.jpg")
|
|
self.assertStatus("200 OK")
|
|
lastmod = ""
|
|
for k, v in self.headers:
|
|
if k == 'Last-Modified':
|
|
lastmod = v
|
|
ims = ("If-Modified-Since", lastmod)
|
|
self.getPage("/static/dirback.jpg", headers=[ims])
|
|
self.assertStatus(304)
|
|
self.assertNoHeader("Content-Type")
|
|
self.assertNoHeader("Content-Length")
|
|
self.assertNoHeader("Content-Disposition")
|
|
self.assertBody("")
|
|
|
|
def test_755_vhost(self):
|
|
self.getPage("/test/", [('Host', 'virt.net')])
|
|
self.assertStatus(200)
|
|
self.getPage("/test", [('Host', 'virt.net')])
|
|
self.assertStatus(301)
|
|
self.assertHeader('Location', self.scheme + '://virt.net/test/')
|
|
|
|
def test_serve_fileobj(self):
|
|
self.getPage("/fileobj")
|
|
self.assertStatus('200 OK')
|
|
self.assertHeader('Content-Type', 'text/css;charset=utf-8')
|
|
self.assertMatchesBody('^Dummy stylesheet')
|
|
|
|
def test_serve_bytesio(self):
|
|
self.getPage("/bytesio")
|
|
self.assertStatus('200 OK')
|
|
self.assertHeader('Content-Type', 'text/plain;charset=utf-8')
|
|
self.assertHeader('Content-Length', 14)
|
|
self.assertMatchesBody('Fee\nfie\nfo\nfum')
|
|
|
|
def test_file_stream(self):
|
|
if cherrypy.server.protocol_version != "HTTP/1.1":
|
|
return self.skip()
|
|
|
|
self.PROTOCOL = "HTTP/1.1"
|
|
|
|
# Make an initial request
|
|
self.persistent = True
|
|
conn = self.HTTP_CONN
|
|
conn.putrequest("GET", "/bigfile", skip_host=True)
|
|
conn.putheader("Host", self.HOST)
|
|
conn.endheaders()
|
|
response = conn.response_class(conn.sock, method="GET")
|
|
response.begin()
|
|
self.assertEqual(response.status, 200)
|
|
|
|
body = ntob('')
|
|
remaining = BIGFILE_SIZE
|
|
while remaining > 0:
|
|
data = response.fp.read(65536)
|
|
if not data:
|
|
break
|
|
body += data
|
|
remaining -= len(data)
|
|
|
|
if self.scheme == "https":
|
|
newconn = HTTPSConnection
|
|
else:
|
|
newconn = HTTPConnection
|
|
s, h, b = helper.webtest.openURL(
|
|
ntob("/tell"), headers=[], host=self.HOST, port=self.PORT,
|
|
http_conn=newconn)
|
|
if not b:
|
|
# The file was closed on the server.
|
|
tell_position = BIGFILE_SIZE
|
|
else:
|
|
tell_position = int(b)
|
|
|
|
expected = len(body)
|
|
if tell_position >= BIGFILE_SIZE:
|
|
# We can't exactly control how much content the server asks
|
|
# for.
|
|
# Fudge it by only checking the first half of the reads.
|
|
if expected < (BIGFILE_SIZE / 2):
|
|
self.fail(
|
|
"The file should have advanced to position %r, but "
|
|
"has already advanced to the end of the file. It "
|
|
"may not be streamed as intended, or at the wrong "
|
|
"chunk size (64k)" % expected)
|
|
elif tell_position < expected:
|
|
self.fail(
|
|
"The file should have advanced to position %r, but has "
|
|
"only advanced to position %r. It may not be streamed "
|
|
"as intended, or at the wrong chunk size (65536)" %
|
|
(expected, tell_position))
|
|
|
|
if body != ntob("x" * BIGFILE_SIZE):
|
|
self.fail("Body != 'x' * %d. Got %r instead (%d bytes)." %
|
|
(BIGFILE_SIZE, body[:50], len(body)))
|
|
conn.close()
|
|
|
|
def test_file_stream_deadlock(self):
|
|
if cherrypy.server.protocol_version != "HTTP/1.1":
|
|
return self.skip()
|
|
|
|
self.PROTOCOL = "HTTP/1.1"
|
|
|
|
# Make an initial request but abort early.
|
|
self.persistent = True
|
|
conn = self.HTTP_CONN
|
|
conn.putrequest("GET", "/bigfile", skip_host=True)
|
|
conn.putheader("Host", self.HOST)
|
|
conn.endheaders()
|
|
response = conn.response_class(conn.sock, method="GET")
|
|
response.begin()
|
|
self.assertEqual(response.status, 200)
|
|
body = response.fp.read(65536)
|
|
if body != ntob("x" * len(body)):
|
|
self.fail("Body != 'x' * %d. Got %r instead (%d bytes)." %
|
|
(65536, body[:50], len(body)))
|
|
response.close()
|
|
conn.close()
|
|
|
|
# Make a second request, which should fetch the whole file.
|
|
self.persistent = False
|
|
self.getPage("/bigfile")
|
|
if self.body != ntob("x" * BIGFILE_SIZE):
|
|
self.fail("Body != 'x' * %d. Got %r instead (%d bytes)." %
|
|
(BIGFILE_SIZE, self.body[:50], len(body)))
|
|
|
|
def test_error_page_with_serve_file(self):
|
|
self.getPage("/404test/yunyeen")
|
|
self.assertStatus(404)
|
|
self.assertInBody("I couldn't find that thing")
|
|
|
|
def error_page_404(status, message, traceback, version):
|
|
import os.path
|
|
return static.serve_file(os.path.join(curdir, 'static', '404.html'),
|
|
content_type='text/html')
|