mirror of
https://github.com/moparisthebest/spdylay
synced 2024-11-16 06:25:04 -05:00
340 lines
12 KiB
Python
340 lines
12 KiB
Python
#!/usr/bin/env python
|
|
|
|
import unittest
|
|
import io
|
|
import collections
|
|
|
|
import spdylay
|
|
|
|
class BufferList:
    """FIFO queue of readable byte streams that are drained in arrival order."""

    def __init__(self):
        # A deque gives O(1) removal of the exhausted stream at the front.
        self.buffers = collections.deque()

    def add_buffer(self, bytebuf):
        """Queue *bytebuf* (an object with a ``read`` method) at the back."""
        self.buffers.append(bytebuf)

    def get_bytes(self, length):
        """Return the first truthy ``read(length)`` result from the queue.

        Streams are consulted front to back.  A stream whose ``read`` yields
        a falsy value is treated as exhausted and removed from the queue.
        ``None`` is returned once no streams are left.
        """
        pending = self.buffers
        while len(pending) > 0:
            chunk = pending[0].read(length)
            if not chunk:
                # Front stream has nothing more to give; move on to the next.
                pending.popleft()
                continue
            return chunk
        return None
|
|
|
|
class IOBridge:
    """Pairs the object a peer reads from with the object it writes to."""

    def __init__(self, inputs, outputs):
        # inputs: where received bytes are fetched from.
        # outputs: where bytes being sent are stored.
        self.inputs, self.outputs = inputs, outputs
|
|
|
|
class Streams:
    """Per-session state object: the I/O bridge plus everything received."""

    def __init__(self, iob):
        self.iob = iob
        # Starts empty; nothing in this class fills it.
        self.streams = dict()
        # Frames recorded in arrival order.
        self.recv_frames = list()
        # Accumulates received data bytes.
        self.recv_data = io.BytesIO()
|
|
|
|
def recv_cb(session, length):
    """Fetch incoming bytes for *session* from its bridge's input side.

    Delegates to ``session.user_data.iob.inputs.get_bytes(length)`` and
    returns whatever that call returns.
    """
    source = session.user_data.iob.inputs
    return source.get_bytes(length)
|
|
|
|
def send_cb(session, data):
    """Store *data* on the output side of *session*'s bridge.

    The bytes are wrapped in a fresh ``io.BytesIO`` and handed to
    ``session.user_data.iob.outputs.add_buffer``.  Returns ``len(data)``.
    """
    sink = session.user_data.iob.outputs
    sink.add_buffer(io.BytesIO(data))
    return len(data)
|
|
|
|
def read_cb(session, stream_id, length, source):
    """Data-provider callback: return ``source.read(length)``.

    *session* and *stream_id* are accepted but not used.
    """
    chunk = source.read(length)
    return chunk
|
|
|
|
def on_data_chunk_recv_cb(session, flags, stream_id, data):
    """Append the received *data* to ``session.user_data.recv_data``.

    *flags* and *stream_id* are accepted but not used.
    """
    received = session.user_data.recv_data
    received.write(data)
|
|
|
|
def on_ctrl_recv_cb(session, frame):
    """Record *frame* at the end of ``session.user_data.recv_frames``."""
    log = session.user_data.recv_frames
    log.append(frame)
|
|
|
|
class SpdylayTests(unittest.TestCase):
    """Client/server round-trip tests for the spdylay session API.

    A client session and a server session are wired back to back through
    in-memory buffer queues, so whatever one side sends is what the other
    side receives.
    """

    def setUp(self):
        # Each peer's output queue is the other peer's input queue.
        from_client = BufferList()
        from_server = BufferList()

        self.client_streams = Streams(IOBridge(from_server, from_client))
        self.server_streams = Streams(IOBridge(from_client, from_server))

        # Both sessions use the same set of module-level callbacks.
        callbacks = dict(
            recv_cb=recv_cb,
            send_cb=send_cb,
            on_ctrl_recv_cb=on_ctrl_recv_cb,
            on_data_chunk_recv_cb=on_data_chunk_recv_cb,
        )
        self.client_session = spdylay.Session(
            spdylay.CLIENT,
            spdylay.PROTO_SPDY3,
            user_data=self.client_streams,
            **callbacks)
        self.server_session = spdylay.Session(
            spdylay.SERVER,
            spdylay.PROTO_SPDY3,
            user_data=self.server_streams,
            **callbacks)

    def _client_to_server(self):
        # Client sends its pending output, then the server reads it.
        self.client_session.send()
        self.server_session.recv()

    def _server_to_client(self):
        # Server sends its pending output, then the client reads it.
        self.server_session.send()
        self.client_session.recv()

    def _only_frame(self, streams):
        # Assert exactly one control frame was recorded and return it.
        self.assertEqual(1, len(streams.recv_frames))
        return streams.recv_frames[0]

    def test_submit_request_and_response(self):
        # POST with a body from the client, answered by a response with a body.
        request_body = spdylay.DataProvider(io.BytesIO(b'Hello World'),
                                            read_cb)
        self.client_session.submit_request(
            0,
            [(b':method', b'POST')],
            data_prd=request_body,
            stream_user_data=request_body)
        self._client_to_server()

        syn_stream = self._only_frame(self.server_streams)
        self.assertEqual(spdylay.SYN_STREAM, syn_stream.frame_type)
        self.assertEqual(1, syn_stream.stream_id)
        self.assertEqual(0, syn_stream.assoc_stream_id)
        self.assertEqual(0, syn_stream.pri)
        self.assertEqual((b':method', b'POST'), syn_stream.nv[0])

        self.assertEqual(b'Hello World',
                         self.server_streams.recv_data.getvalue())

        # The provider was also registered as the stream's user data.
        self.assertEqual(request_body,
                         self.client_session.get_stream_user_data(1))

        response_body = spdylay.DataProvider(io.BytesIO(b'Foo the bar'),
                                             read_cb)
        self.server_session.submit_response(
            1,
            [(b':status', b'200 OK')],
            data_prd=response_body)
        self._server_to_client()

        syn_reply = self._only_frame(self.client_streams)
        self.assertEqual(spdylay.SYN_REPLY, syn_reply.frame_type)
        self.assertEqual(1, syn_reply.stream_id)
        self.assertEqual((b':status', b'200 OK'), syn_reply.nv[0])

        self.assertEqual(b'Foo the bar',
                         self.client_streams.recv_data.getvalue())

    def test_submit_syn_stream_and_syn_stream(self):
        # SYN_STREAM from the client followed by SYN_REPLY from the server.
        self.client_session.submit_syn_stream(
            spdylay.CTRL_FLAG_FIN, 0, 2, [(b':path', b'/')], None)
        self._client_to_server()

        syn_stream = self._only_frame(self.server_streams)
        self.assertEqual(spdylay.SYN_STREAM, syn_stream.frame_type)
        self.assertEqual(1, syn_stream.stream_id)
        self.assertEqual(0, syn_stream.assoc_stream_id)
        self.assertEqual(2, syn_stream.pri)
        self.assertEqual((b':path', b'/'), syn_stream.nv[0])

        self.server_session.submit_syn_reply(
            spdylay.CTRL_FLAG_FIN, 1, [(b':version', b'HTTP/1.1')])
        self._server_to_client()

        syn_reply = self._only_frame(self.client_streams)
        self.assertEqual(spdylay.SYN_REPLY, syn_reply.frame_type)
        self.assertEqual(1, syn_reply.stream_id)
        self.assertEqual((b':version', b'HTTP/1.1'), syn_reply.nv[0])

    def test_submit_rst_stream(self):
        # Server resets the stream the client has just opened.
        self.client_session.submit_syn_stream(
            spdylay.CTRL_FLAG_FIN, 0, 2, [(b':path', b'/')], None)
        self._client_to_server()

        self.server_session.submit_rst_stream(1, spdylay.PROTOCOL_ERROR)
        self._server_to_client()

        rst = self._only_frame(self.client_streams)
        self.assertEqual(spdylay.RST_STREAM, rst.frame_type)
        self.assertEqual(1, rst.stream_id)
        self.assertEqual(spdylay.PROTOCOL_ERROR, rst.status_code)

    def test_submit_goaway(self):
        # GOAWAY submitted by the client is seen by the server.
        self.client_session.submit_goaway(spdylay.GOAWAY_PROTOCOL_ERROR)
        self._client_to_server()

        goaway = self._only_frame(self.server_streams)
        self.assertEqual(spdylay.GOAWAY, goaway.frame_type)
        self.assertEqual(spdylay.GOAWAY_PROTOCOL_ERROR, goaway.status_code)

    def test_resume_data(self):
        # No stream has been opened, so resuming stream 1 must be rejected.
        with self.assertRaises(spdylay.InvalidArgumentError):
            self.client_session.resume_data(1)

    def test_get_pri_lowest(self):
        self.assertEqual(7, self.client_session.get_pri_lowest())

    def test_fail_session(self):
        # fail_session is expected to produce a GOAWAY and end all I/O.
        self.client_session.fail_session(spdylay.GOAWAY_PROTOCOL_ERROR)
        self._client_to_server()

        goaway = self._only_frame(self.server_streams)
        self.assertEqual(spdylay.GOAWAY, goaway.frame_type)
        self.assertEqual(spdylay.GOAWAY_PROTOCOL_ERROR, goaway.status_code)

        self.assertFalse(self.client_session.want_read())
        self.assertFalse(self.client_session.want_write())

    def test_deferred_data(self):
        # The provider first answers ERR_DEFERRED; the test expects no data
        # at the server until the callback is swapped and data is resumed.
        def deferred_read_cb(session, stream_id, length, source):
            return spdylay.ERR_DEFERRED

        data_prd = spdylay.DataProvider(io.BytesIO(b'Hello World'),
                                        deferred_read_cb)
        self.client_session.submit_request(
            0,
            [(b':method', b'POST')],
            data_prd=data_prd,
            stream_user_data=data_prd)
        self._client_to_server()

        syn_stream = self._only_frame(self.server_streams)
        self.assertEqual(spdylay.SYN_STREAM, syn_stream.frame_type)
        self.assertEqual(1, syn_stream.stream_id)
        self.assertEqual(0, syn_stream.assoc_stream_id)
        self.assertEqual(0, syn_stream.pri)
        self.assertEqual((b':method', b'POST'), syn_stream.nv[0])

        self.assertEqual(b'', self.server_streams.recv_data.getvalue())

        # Switch to the real reader before resuming the stream.
        data_prd.read_cb = read_cb

        self.client_session.resume_data(1)

        self._client_to_server()

        self.assertEqual(b'Hello World',
                         self.server_streams.recv_data.getvalue())

    def test_recv_cb_eof(self):
        # EOFError raised inside recv_cb must surface from recv().
        def eof_recv_cb(session, length):
            raise spdylay.EOFError()

        self.client_session = spdylay.Session(
            spdylay.CLIENT,
            spdylay.PROTO_SPDY3,
            user_data=self.client_streams,
            recv_cb=eof_recv_cb)

        with self.assertRaises(spdylay.EOFError):
            self.client_session.recv()

    def test_recv_cb_callback_failure(self):
        # CallbackFailureError raised inside recv_cb must surface from recv().
        def cbfail_recv_cb(session, length):
            raise spdylay.CallbackFailureError()

        self.client_session = spdylay.Session(
            spdylay.CLIENT,
            spdylay.PROTO_SPDY3,
            user_data=self.client_streams,
            recv_cb=cbfail_recv_cb)

        with self.assertRaises(spdylay.CallbackFailureError):
            self.client_session.recv()

    def test_send_cb_callback_failure(self):
        # CallbackFailureError raised inside send_cb must surface from send().
        def cbfail_send_cb(session, data):
            raise spdylay.CallbackFailureError()

        self.client_session = spdylay.Session(
            spdylay.CLIENT,
            spdylay.PROTO_SPDY3,
            user_data=self.client_streams,
            send_cb=cbfail_send_cb)

        # Queue a frame so that send() has something to pass to send_cb.
        self.client_session.submit_goaway(spdylay.GOAWAY_OK)

        with self.assertRaises(spdylay.CallbackFailureError):
            self.client_session.send()

    def test_submit_data(self):
        # Open a stream without FIN, then push a body with submit_data.
        self.client_session.submit_syn_stream(
            spdylay.CTRL_FLAG_NONE, 0, 2, [(b':path', b'/')], None)
        self._client_to_server()

        syn_stream = self._only_frame(self.server_streams)
        self.assertEqual(spdylay.SYN_STREAM, syn_stream.frame_type)
        self.assertEqual(1, syn_stream.stream_id)

        data_prd = spdylay.DataProvider(io.BytesIO(b'Hello World'), read_cb)
        self.client_session.submit_data(1, spdylay.DATA_FLAG_FIN, data_prd)
        self._client_to_server()

        self.assertEqual(b'Hello World',
                         self.server_streams.recv_data.getvalue())

    def test_submit_headers(self):
        # Open a stream without FIN, then send an extra HEADERS frame on it.
        self.client_session.submit_syn_stream(
            spdylay.CTRL_FLAG_NONE, 0, 2, [(b':path', b'/')], None)
        self._client_to_server()

        syn_stream = self._only_frame(self.server_streams)
        self.assertEqual(spdylay.SYN_STREAM, syn_stream.frame_type)
        self.assertEqual(1, syn_stream.stream_id)

        self.client_session.submit_headers(
            spdylay.CTRL_FLAG_FIN, 1, [(b':host', b'localhost')])
        self._client_to_server()

        # The HEADERS frame is the second control frame the server recorded.
        self.assertEqual(2, len(self.server_streams.recv_frames))
        headers = self.server_streams.recv_frames[1]
        self.assertEqual(spdylay.HEADERS, headers.frame_type)
        self.assertEqual(1, headers.stream_id)
        self.assertEqual((b':host', b'localhost'), headers.nv[0])

    def test_submit_ping(self):
        # PING submitted by the client is seen by the server.
        self.client_session.submit_ping()
        self._client_to_server()

        ping = self._only_frame(self.server_streams)
        self.assertEqual(spdylay.PING, ping.frame_type)
        self.assertEqual(1, ping.unique_id)

    def test_submit_window_update(self):
        # Server sends WINDOW_UPDATE for the stream the client opened.
        self.client_session.submit_syn_stream(
            spdylay.CTRL_FLAG_NONE, 0, 2, [(b':path', b'/')], None)
        self._client_to_server()

        syn_stream = self._only_frame(self.server_streams)
        self.assertEqual(spdylay.SYN_STREAM, syn_stream.frame_type)
        self.assertEqual(1, syn_stream.stream_id)

        self.server_session.submit_window_update(1, 4096)
        self._server_to_client()

        update = self._only_frame(self.client_streams)
        self.assertEqual(spdylay.WINDOW_UPDATE, update.frame_type)
        self.assertEqual(1, update.stream_id)
        self.assertEqual(4096, update.delta_window_size)
|
|
|
|
# Run the test suite when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()
|