From 33d35912a5f30ca563eb239db6605f254c15f3f2 Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Tue, 28 Aug 2012 00:58:45 +0900 Subject: [PATCH] python: add simple SPDY client function urlfetch() --- python/spdylay.pyx | 191 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 191 insertions(+) diff --git a/python/spdylay.pyx b/python/spdylay.pyx index b032e61..7aa2f7b 100644 --- a/python/spdylay.pyx +++ b/python/spdylay.pyx @@ -1481,6 +1481,197 @@ try: socketserver.ThreadingMixIn.process_request(self, request, client_address) + + # Simple SPDY client implementation. Since this implementation + # uses TLS NPN, Python 3.3.0 or later is required. + + from urllib.parse import urlsplit + + class BaseSPDYStreamHandler: + def __init__(self, uri, fetcher): + self.uri = uri + self.fetcher = fetcher + self.stream_id = None + + def on_header(self, nv): + pass + + def on_data(self, data): + pass + + def on_close(self, status_code): + pass + + class UrlFetchError(Exception): + pass + + class UrlFetcher: + def __init__(self, server_address, uris, StreamHandlerClass): + self.server_address = server_address + self.handlers = [StreamHandlerClass(uri, self) for uri in uris] + self.streams = {} + self.finished = [] + + self.ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + self.ctx.options = ssl.OP_ALL | ssl.OP_NO_SSLv2 | \ + ssl.OP_NO_COMPRESSION + self.ctx.set_npn_protocols(get_npn_protocols()) + + def send_cb(self, session, data): + return self.sock.send(data) + + def before_ctrl_send_cb(self, session, frame): + if frame.frame_type == SYN_STREAM: + handler = session.get_stream_user_data(frame.stream_id) + if handler: + handler.stream_id = frame.stream_id + self.streams[handler.stream_id] = handler + + def on_ctrl_recv_cb(self, session, frame): + if frame.frame_type == SYN_REPLY or frame.frame_type == HEADERS: + if frame.stream_id in self.streams: + handler = self.streams[frame.stream_id] + handler.on_header(frame.nv) + + def on_data_chunk_recv_cb(self, session, flags, stream_id, data): + if stream_id in self.streams: + handler = self.streams[stream_id] + handler.on_data(data) + + def on_stream_close_cb(self, session, stream_id, status_code): + if stream_id in self.streams: + handler = self.streams[stream_id] + handler.on_close(status_code) + del self.streams[stream_id] + self.finished.append(handler) + + def connect(self, server_address): + self.sock = None + for res in socket.getaddrinfo(server_address[0], server_address[1], + socket.AF_UNSPEC, + socket.SOCK_STREAM): + af, socktype, proto, canonname, sa = res + try: + self.sock = socket.socket(af, socktype, proto) + except OSError as msg: + self.sock = None + continue + try: + self.sock.connect(sa) + except OSError as msg: + self.sock.close() + self.sock = None + continue + break + else: + raise UrlFetchError('Could not connect to {}'\ + .format(server_address)) + + def tls_handshake(self): + self.sock = self.ctx.wrap_socket(self.sock, server_side=False, + do_handshake_on_connect=False) + self.sock.do_handshake() + + self.version = npn_get_version(self.sock.selected_npn_protocol()) + if self.version == 0: + raise UrlFetchError('NPN failed') + + def loop(self): + self.connect(self.server_address) + self.tls_handshake() + self.sock.setblocking(False) + + session = Session(CLIENT, + self.version, + send_cb=self.send_cb, + on_ctrl_recv_cb=self.on_ctrl_recv_cb, + on_data_chunk_recv_cb=self.on_data_chunk_recv_cb, + before_ctrl_send_cb=self.before_ctrl_send_cb, + on_stream_close_cb=self.on_stream_close_cb) + + session.submit_settings(\ + FLAG_SETTINGS_NONE, + [(SETTINGS_MAX_CONCURRENT_STREAMS, ID_FLAG_SETTINGS_NONE, 100)] + ) + + if self.server_address[1] == 443: + hostport = self.server_address[0] + else: + hostport = '{}:{}'.format(self.server_address[0], + self.server_address[1]) + + for handler in self.handlers: + res = urlsplit(handler.uri) + if res.path: + path = res.path + else: + path = '/' + if res.query: + path = '?'.join([path, res.query]) + + session.submit_request(0, + [(':method', 'GET'), + (':scheme', 'https'), + (':path', path), + (':version', 'HTTP/1.1'), + (':host', hostport), + ('accept', '*/*'), + ('user-agent', 'python-spdylay')], + stream_user_data=handler) + + while (session.want_read() or session.want_write()) \ + and not len(self.finished) == len(self.handlers): + want_read = want_write = False + try: + data = self.sock.recv(4096) + if data: + session.recv(data) + else: + break + except ssl.SSLWantReadError: + want_read = True + except ssl.SSLWantWriteError: + want_write = True + try: + session.send() + except ssl.SSLWantReadError: + want_read = True + except ssl.SSLWantWriteError: + want_write = True + + if want_read or want_write: + select.select([self.sock] if want_read else [], + [self.sock] if want_write else [], + []) + + def _urlfetch_session_one(uris, StreamHandlerClass): + res = urlsplit(uris[0]) + if res.scheme != 'https': + raise UrlFetchError('Unsupported scheme {}'.format(res.scheme)) + hostname = res.hostname + port = res.port if res.port else 443 + + f = UrlFetcher((hostname, port), uris, StreamHandlerClass) + f.loop() + + def urlfetch(uri_or_uris, StreamHandlerClass): + if isinstance(uri_or_uris, str): + _urlfetch_session_one([uri_or_uris], StreamHandlerClass) + else: + uris = [] + prev_addr = (None, None) + for uri in uri_or_uris: + res = urlsplit(uri) + port = res.port if res.port else 443 + if prev_addr != (res.hostname, port): + if uris: + _urlfetch_session_one(uris, StreamHandlerClass) + uris = [] + prev_addr = (res.hostname, port) + uris.append(uri) + if uris: + _urlfetch_session_one(uris, StreamHandlerClass) + except ImportError: # No server for 2.x because they lack TLS NPN. pass