1
0
mirror of https://github.com/moparisthebest/SickRage synced 2025-01-07 03:48:02 -05:00
SickRage/lib/feedcache/test_server.py

242 lines
7.4 KiB
Python
Raw Normal View History

#!/usr/bin/env python
#
# Copyright 2007 Doug Hellmann.
#
#
# All Rights Reserved
#
# Permission to use, copy, modify, and distribute this software and
# its documentation for any purpose and without fee is hereby
# granted, provided that the above copyright notice appear in all
# copies and that both that copyright notice and this permission
# notice appear in supporting documentation, and that the name of Doug
# Hellmann not be used in advertising or publicity pertaining to
# distribution of the software without specific, written prior
# permission.
#
# DOUG HELLMANN DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
# NO EVENT SHALL DOUG HELLMANN BE LIABLE FOR ANY SPECIAL, INDIRECT OR
# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
#
"""Simple HTTP server for testing the feed cache.
"""
__module_id__ = "$Id$"
#
# Import system modules
#
import BaseHTTPServer
import logging
import md5
import threading
import time
import unittest
import urllib
#
# Import local modules
#
#
# Module
#
logger = logging.getLogger('feedcache.test_server')
def make_etag(data):
"""Given a string containing data to be returned to the client,
compute an ETag value for the data.
"""
_md5 = md5.new()
_md5.update(data)
return _md5.hexdigest()
class TestHTTPHandler(BaseHTTPServer.BaseHTTPRequestHandler):
"HTTP request handler which serves the same feed data every time."
FEED_DATA = """<?xml version="1.0" encoding="utf-8"?>
<feed xmlns="http://www.w3.org/2005/Atom" xml:lang="en-us">
<title>CacheTest test data</title>
<link href="http://localhost/feedcache/" rel="alternate"></link>
<link href="http://localhost/feedcache/atom/" rel="self"></link>
<id>http://localhost/feedcache/</id>
<updated>2006-10-14T11:00:36Z</updated>
<entry>
<title>single test entry</title>
<link href="http://www.example.com/" rel="alternate"></link>
<updated>2006-10-14T11:00:36Z</updated>
<author>
<name>author goes here</name>
<email>authoremail@example.com</email>
</author>
<id>http://www.example.com/</id>
<summary type="html">description goes here</summary>
<link length="100" href="http://www.example.com/enclosure" type="text/html" rel="enclosure">
</link>
</entry>
</feed>"""
# The data does not change, so save the ETag and modified times
# as class attributes.
ETAG = make_etag(FEED_DATA)
# Calculated using email.utils.formatdate(usegmt=True)
MODIFIED_TIME = 'Sun, 08 Apr 2012 20:16:48 GMT'
def do_GET(self):
"Handle GET requests."
logger.debug('GET %s', self.path)
if self.path == '/shutdown':
# Shortcut to handle stopping the server
logger.debug('Stopping server')
self.server.stop()
self.send_response(200)
else:
# Record the request for tests that count them
self.server.requests.append(self.path)
# Process the request
logger.debug('pre-defined response code: %d', self.server.response)
handler_method_name = 'do_GET_%d' % self.server.response
handler_method = getattr(self, handler_method_name)
handler_method()
return
def do_GET_3xx(self):
"Handle redirects"
if self.path.endswith('/redirected'):
logger.debug('already redirected')
# We have already redirected, so return the data.
return self.do_GET_200()
new_path = self.server.new_path
logger.debug('redirecting to %s', new_path)
self.send_response(self.server.response)
self.send_header('Location', new_path)
return
do_GET_301 = do_GET_3xx
do_GET_302 = do_GET_3xx
do_GET_303 = do_GET_3xx
do_GET_307 = do_GET_3xx
def do_GET_200(self):
logger.debug('Etag: %s' % self.ETAG)
logger.debug('Last-Modified: %s' % self.MODIFIED_TIME)
incoming_etag = self.headers.get('If-None-Match', None)
logger.debug('Incoming ETag: "%s"' % incoming_etag)
incoming_modified = self.headers.get('If-Modified-Since', None)
logger.debug('Incoming If-Modified-Since: %s' % incoming_modified)
send_data = True
# Does the client have the same version of the data we have?
if self.server.apply_modified_headers:
if incoming_etag == self.ETAG:
logger.debug('Response 304, etag')
self.send_response(304)
send_data = False
elif incoming_modified == self.MODIFIED_TIME:
logger.debug('Response 304, modified time')
self.send_response(304)
send_data = False
# Now optionally send the data, if the client needs it
if send_data:
logger.debug('Response 200')
self.send_response(200)
self.send_header('Content-Type', 'application/atom+xml')
logger.debug('Outgoing Etag: %s' % self.ETAG)
self.send_header('ETag', self.ETAG)
logger.debug('Outgoing modified time: %s' % self.MODIFIED_TIME)
self.send_header('Last-Modified', self.MODIFIED_TIME)
self.end_headers()
logger.debug('Sending data')
self.wfile.write(self.FEED_DATA)
return
class TestHTTPServer(BaseHTTPServer.HTTPServer):
"""HTTP Server which counts the number of requests made
and can stop based on client instructions.
"""
def __init__(self, applyModifiedHeaders=True, handler=TestHTTPHandler):
self.apply_modified_headers = applyModifiedHeaders
self.keep_serving = True
self.requests = []
self.setResponse(200)
BaseHTTPServer.HTTPServer.__init__(self, ('', 9999), handler)
return
def setResponse(self, newResponse, newPath=None):
"""Sets the response code to use for future requests, and a new
path to be used as a redirect target, if necessary.
"""
self.response = newResponse
self.new_path = newPath
return
def getNumRequests(self):
"Return the number of requests which have been made on the server."
return len(self.requests)
def stop(self):
"Stop serving requests, after the next request."
self.keep_serving = False
return
def serve_forever(self):
"Main loop for server"
while self.keep_serving:
self.handle_request()
logger.debug('exiting')
return
class HTTPTestBase(unittest.TestCase):
"Base class for tests that use a TestHTTPServer"
TEST_URL = 'http://localhost:9999/'
CACHE_TTL = 0
def setUp(self):
self.server = self.getServer()
self.server_thread = threading.Thread(target=self.server.serve_forever)
# set daemon flag so the tests don't hang if cleanup fails
self.server_thread.setDaemon(True)
self.server_thread.start()
return
def getServer(self):
"Return a web server for the test."
s = TestHTTPServer()
s.setResponse(200)
return s
def tearDown(self):
# Stop the server thread
urllib.urlretrieve(self.TEST_URL + 'shutdown')
time.sleep(1)
self.server.server_close()
self.server_thread.join()
return