mirror of
https://github.com/moparisthebest/SickRage
synced 2024-12-12 11:02:21 -05:00
271 lines
11 KiB
Python
271 lines
11 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
# ########################## Copyrights and license ############################
|
|
# #
|
|
# Copyright 2012 Vincent Jacques <vincent@vincent-jacques.net> #
|
|
# Copyright 2012 Zearin <zearin@gonk.net> #
|
|
# Copyright 2013 AKFish <akfish@gmail.com> #
|
|
# Copyright 2013 Vincent Jacques <vincent@vincent-jacques.net> #
|
|
# #
|
|
# This file is part of PyGithub. http://jacquev6.github.com/PyGithub/ #
|
|
# #
|
|
# PyGithub is free software: you can redistribute it and/or modify it under #
|
|
# the terms of the GNU Lesser General Public License as published by the Free #
|
|
# Software Foundation, either version 3 of the License, or (at your option) #
|
|
# any later version. #
|
|
# #
|
|
# PyGithub is distributed in the hope that it will be useful, but WITHOUT ANY #
|
|
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS #
|
|
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more #
|
|
# details. #
|
|
# #
|
|
# You should have received a copy of the GNU Lesser General Public License #
|
|
# along with PyGithub. If not, see <http://www.gnu.org/licenses/>. #
|
|
# #
|
|
# ##############################################################################
|
|
|
|
import os
|
|
import sys
|
|
import unittest
|
|
import httplib
|
|
import traceback
|
|
|
|
import github
|
|
|
|
atLeastPython26 = sys.hexversion >= 0x02060000
|
|
atLeastPython3 = sys.hexversion >= 0x03000000
|
|
atMostPython32 = sys.hexversion < 0x03030000
|
|
|
|
if atLeastPython26:
|
|
import json
|
|
else: # pragma no cover (Covered by all tests with Python 2.5)
|
|
import simplejson as json # pragma no cover (Covered by all tests with Python 2.5)
|
|
|
|
|
|
def readLine(file):
|
|
if atLeastPython3:
|
|
return file.readline().decode("utf-8").strip()
|
|
else:
|
|
return file.readline().strip()
|
|
|
|
|
|
class FakeHttpResponse:
|
|
def __init__(self, status, headers, output):
|
|
self.status = status
|
|
self.__headers = headers
|
|
self.__output = output
|
|
|
|
def getheaders(self):
|
|
return self.__headers
|
|
|
|
def read(self):
|
|
return self.__output
|
|
|
|
|
|
def fixAuthorizationHeader(headers):
|
|
if "Authorization" in headers:
|
|
if headers["Authorization"].endswith("ZmFrZV9sb2dpbjpmYWtlX3Bhc3N3b3Jk"):
|
|
# This special case is here to test the real Authorization header
|
|
# sent by PyGithub. It would have avoided issue https://github.com/jacquev6/PyGithub/issues/153
|
|
# because we would have seen that Python 3 was not generating the same
|
|
# header as Python 2
|
|
pass
|
|
elif headers["Authorization"].startswith("token "):
|
|
headers["Authorization"] = "token private_token_removed"
|
|
elif headers["Authorization"].startswith("Basic "):
|
|
headers["Authorization"] = "Basic login_and_password_removed"
|
|
|
|
|
|
class RecordingConnection: # pragma no cover (Class useful only when recording new tests, not used during automated tests)
|
|
def __init__(self, file, protocol, host, port, *args, **kwds):
|
|
self.__file = file
|
|
self.__protocol = protocol
|
|
self.__host = host
|
|
self.__port = str(port)
|
|
self.__cnx = self._realConnection(host, port, *args, **kwds)
|
|
|
|
def request(self, verb, url, input, headers):
|
|
print verb, url, input, headers,
|
|
self.__cnx.request(verb, url, input, headers)
|
|
fixAuthorizationHeader(headers)
|
|
self.__writeLine(self.__protocol)
|
|
self.__writeLine(verb)
|
|
self.__writeLine(self.__host)
|
|
self.__writeLine(self.__port)
|
|
self.__writeLine(url)
|
|
self.__writeLine(str(headers))
|
|
self.__writeLine(input.replace('\n', '').replace('\r', ''))
|
|
|
|
def getresponse(self):
|
|
res = self.__cnx.getresponse()
|
|
|
|
status = res.status
|
|
print "=>", status
|
|
headers = res.getheaders()
|
|
output = res.read()
|
|
|
|
self.__writeLine(str(status))
|
|
self.__writeLine(str(headers))
|
|
self.__writeLine(str(output))
|
|
|
|
return FakeHttpResponse(status, headers, output)
|
|
|
|
def close(self):
|
|
self.__writeLine("")
|
|
return self.__cnx.close()
|
|
|
|
def __writeLine(self, line):
|
|
self.__file.write(line + "\n")
|
|
|
|
|
|
class RecordingHttpConnection(RecordingConnection): # pragma no cover (Class useful only when recording new tests, not used during automated tests)
|
|
_realConnection = httplib.HTTPConnection
|
|
|
|
def __init__(self, file, *args, **kwds):
|
|
RecordingConnection.__init__(self, file, "http", *args, **kwds)
|
|
|
|
|
|
class RecordingHttpsConnection(RecordingConnection): # pragma no cover (Class useful only when recording new tests, not used during automated tests)
|
|
_realConnection = httplib.HTTPSConnection
|
|
|
|
def __init__(self, file, *args, **kwds):
|
|
RecordingConnection.__init__(self, file, "https", *args, **kwds)
|
|
|
|
|
|
class ReplayingConnection:
|
|
def __init__(self, testCase, file, protocol, host, port, *args, **kwds):
|
|
self.__testCase = testCase
|
|
self.__file = file
|
|
self.__protocol = protocol
|
|
self.__host = host
|
|
self.__port = str(port)
|
|
|
|
def request(self, verb, url, input, headers):
|
|
fixAuthorizationHeader(headers)
|
|
self.__testCase.assertEqual(self.__protocol, readLine(self.__file))
|
|
self.__testCase.assertEqual(verb, readLine(self.__file))
|
|
self.__testCase.assertEqual(self.__host, readLine(self.__file))
|
|
self.__testCase.assertEqual(self.__port, readLine(self.__file))
|
|
self.__testCase.assertEqual(self.__splitUrl(url), self.__splitUrl(readLine(self.__file)))
|
|
self.__testCase.assertEqual(headers, eval(readLine(self.__file)))
|
|
expectedInput = readLine(self.__file)
|
|
if input.startswith("{"):
|
|
self.__testCase.assertEqual(json.loads(input.replace('\n', '').replace('\r', '')), json.loads(expectedInput))
|
|
elif atMostPython32: # @todo Test in all cases, including Python 3.3
|
|
# In Python 3.3, dicts are not output in the same order as in Python 2.5 -> 3.2.
|
|
# So, form-data encoding is not deterministic and is difficult to test.
|
|
self.__testCase.assertEqual(input.replace('\n', '').replace('\r', ''), expectedInput)
|
|
|
|
def __splitUrl(self, url):
|
|
splitedUrl = url.split("?")
|
|
if len(splitedUrl) == 1:
|
|
return splitedUrl
|
|
self.__testCase.assertEqual(len(splitedUrl), 2)
|
|
base, qs = splitedUrl
|
|
return (base, sorted(qs.split("&")))
|
|
|
|
def getresponse(self):
|
|
status = int(readLine(self.__file))
|
|
headers = eval(readLine(self.__file))
|
|
output = readLine(self.__file)
|
|
|
|
return FakeHttpResponse(status, headers, output)
|
|
|
|
def close(self):
|
|
readLine(self.__file)
|
|
|
|
|
|
def ReplayingHttpConnection(testCase, file, *args, **kwds):
|
|
return ReplayingConnection(testCase, file, "http", *args, **kwds)
|
|
|
|
|
|
def ReplayingHttpsConnection(testCase, file, *args, **kwds):
|
|
return ReplayingConnection(testCase, file, "https", *args, **kwds)
|
|
|
|
|
|
class BasicTestCase(unittest.TestCase):
|
|
recordMode = False
|
|
|
|
def setUp(self):
|
|
unittest.TestCase.setUp(self)
|
|
self.__fileName = ""
|
|
self.__file = None
|
|
if self.recordMode: # pragma no cover (Branch useful only when recording new tests, not used during automated tests)
|
|
github.Requester.Requester.injectConnectionClasses(
|
|
lambda ignored, *args, **kwds: RecordingHttpConnection(self.__openFile("wb"), *args, **kwds),
|
|
lambda ignored, *args, **kwds: RecordingHttpsConnection(self.__openFile("wb"), *args, **kwds)
|
|
)
|
|
import GithubCredentials
|
|
self.login = GithubCredentials.login
|
|
self.password = GithubCredentials.password
|
|
self.oauth_token = GithubCredentials.oauth_token
|
|
# @todo Remove client_id and client_secret from ReplayData (as we already remove login, password and oauth_token)
|
|
# self.client_id = GithubCredentials.client_id
|
|
# self.client_secret = GithubCredentials.client_secret
|
|
else:
|
|
github.Requester.Requester.injectConnectionClasses(
|
|
lambda ignored, *args, **kwds: ReplayingHttpConnection(self, self.__openFile("rb"), *args, **kwds),
|
|
lambda ignored, *args, **kwds: ReplayingHttpsConnection(self, self.__openFile("rb"), *args, **kwds)
|
|
)
|
|
self.login = "login"
|
|
self.password = "password"
|
|
self.oauth_token = "oauth_token"
|
|
self.client_id = "client_id"
|
|
self.client_secret = "client_secret"
|
|
|
|
def tearDown(self):
|
|
unittest.TestCase.tearDown(self)
|
|
self.__closeReplayFileIfNeeded()
|
|
github.Requester.Requester.resetConnectionClasses()
|
|
|
|
def __openFile(self, mode):
|
|
for (_, _, functionName, _) in traceback.extract_stack():
|
|
if functionName.startswith("test") or functionName == "setUp" or functionName == "tearDown":
|
|
if functionName != "test": # because in class Hook(Framework.TestCase), method testTest calls Hook.test
|
|
fileName = os.path.join(os.path.dirname(__file__), "ReplayData", self.__class__.__name__ + "." + functionName + ".txt")
|
|
if fileName != self.__fileName:
|
|
self.__closeReplayFileIfNeeded()
|
|
self.__fileName = fileName
|
|
self.__file = open(self.__fileName, mode)
|
|
return self.__file
|
|
|
|
def __closeReplayFileIfNeeded(self):
|
|
if self.__file is not None:
|
|
if not self.recordMode: # pragma no branch (Branch useful only when recording new tests, not used during automated tests)
|
|
self.assertEqual(readLine(self.__file), "")
|
|
self.__file.close()
|
|
|
|
def assertListKeyEqual(self, elements, key, expectedKeys):
|
|
realKeys = [key(element) for element in elements]
|
|
self.assertEqual(realKeys, expectedKeys)
|
|
|
|
def assertListKeyBegin(self, elements, key, expectedKeys):
|
|
realKeys = [key(element) for element in elements[: len(expectedKeys)]]
|
|
self.assertEqual(realKeys, expectedKeys)
|
|
|
|
|
|
class TestCase(BasicTestCase):
|
|
def doCheckFrame(self, obj, frame):
|
|
if obj._headers == {} and frame is None:
|
|
return
|
|
if obj._headers is None and frame == {}:
|
|
return
|
|
self.assertEqual(obj._headers, frame[2])
|
|
|
|
def getFrameChecker(self):
|
|
return lambda requester, obj, frame: self.doCheckFrame(obj, frame)
|
|
|
|
def setUp(self):
|
|
BasicTestCase.setUp(self)
|
|
|
|
# Set up frame debugging
|
|
github.GithubObject.GithubObject.setCheckAfterInitFlag(True)
|
|
github.Requester.Requester.setDebugFlag(True)
|
|
github.Requester.Requester.setOnCheckMe(self.getFrameChecker())
|
|
|
|
self.g = github.Github(self.login, self.password)
|
|
|
|
|
|
def activateRecordMode(): # pragma no cover (Function useful only when recording new tests, not used during automated tests)
|
|
BasicTestCase.recordMode = True
|