diff --git a/src/Makefile.am b/src/Makefile.am index 0524fda..244ba24 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -84,6 +84,7 @@ shrpx_SOURCES = ${HELPER_OBJECTS} ${HELPER_HFILES} \ shrpx_downstream_connection.cc shrpx_downstream_connection.h \ shrpx_http_downstream_connection.cc shrpx_http_downstream_connection.h \ shrpx_spdy_downstream_connection.cc shrpx_spdy_downstream_connection.h \ + shrpx_spdy_session.cc shrpx_spdy_session.h \ shrpx_log.cc shrpx_log.h \ shrpx_http.cc shrpx_http.h \ shrpx_io_control.cc shrpx_io_control.h \ diff --git a/src/shrpx.cc b/src/shrpx.cc index 82648dd..89451d2 100644 --- a/src/shrpx.cc +++ b/src/shrpx.cc @@ -252,6 +252,8 @@ int event_loop() if(get_config()->num_worker > 1) { listener_handler->create_worker_thread(get_config()->num_worker); + } else if(get_config()->client_mode) { + listener_handler->create_spdy_session(); } if(ENABLE_LOG) { diff --git a/src/shrpx_client_handler.cc b/src/shrpx_client_handler.cc index 2b88ac3..7bb14c5 100644 --- a/src/shrpx_client_handler.cc +++ b/src/shrpx_client_handler.cc @@ -118,11 +118,11 @@ ClientHandler::ClientHandler(bufferevent *bev, int fd, SSL *ssl, const char *ipaddr) : bev_(bev), fd_(fd), - ssl_client_ctx_(0), ssl_(ssl), upstream_(0), ipaddr_(ipaddr), - should_close_after_write_(false) + should_close_after_write_(false), + spdy_(0) { bufferevent_enable(bev_, EV_READ | EV_WRITE); bufferevent_setwatermark(bev_, EV_READ, 0, SHRPX_READ_WARTER_MARK); @@ -294,14 +294,14 @@ SSL* ClientHandler::get_ssl() const return ssl_; } -void ClientHandler::set_ssl_client_ctx(SSL_CTX *ssl_ctx) +void ClientHandler::set_spdy_session(SpdySession *spdy) { - ssl_client_ctx_ = ssl_ctx; + spdy_ = spdy; } -SSL_CTX* ClientHandler::get_ssl_client_ctx() const +SpdySession* ClientHandler::get_spdy_session() const { - return ssl_client_ctx_; + return spdy_; } } // namespace shrpx diff --git a/src/shrpx_client_handler.h b/src/shrpx_client_handler.h index f8b6333..0cb843b 100644 --- a/src/shrpx_client_handler.h +++ b/src/shrpx_client_handler.h @@ -36,6 +36,7 @@ namespace shrpx { class Upstream; class DownstreamConnection; +class SpdySession; class ClientHandler { public: @@ -60,19 +61,19 @@ public: DownstreamConnection* get_downstream_connection(); size_t get_pending_write_length(); SSL* get_ssl() const; - void set_ssl_client_ctx(SSL_CTX *ssl_ctx); - SSL_CTX* get_ssl_client_ctx() const; + void set_spdy_session(SpdySession *spdy); + SpdySession* get_spdy_session() const; private: bufferevent *bev_; int fd_; - // SSL_CTX for SSL object to connect backend SPDY server - SSL_CTX *ssl_client_ctx_; SSL *ssl_; Upstream *upstream_; std::string ipaddr_; bool should_close_after_write_; - std::set dconn_pool_; + // Shared SPDY session for each thread. NULL if not client mode. Not + // deleted by this object. + SpdySession *spdy_; }; } // namespace shrpx diff --git a/src/shrpx_downstream.cc b/src/shrpx_downstream.cc index 9bfdb11..147c424 100644 --- a/src/shrpx_downstream.cc +++ b/src/shrpx_downstream.cc @@ -42,7 +42,6 @@ Downstream::Downstream(Upstream *upstream, int stream_id, int priority) dconn_(0), stream_id_(stream_id), priority_(priority), - ioctrl_(0), downstream_stream_id_(-1), request_state_(INITIAL), request_major_(1), @@ -58,13 +57,9 @@ Downstream::Downstream(Upstream *upstream, int stream_id, int priority) chunked_response_(false), response_connection_close_(false), response_header_key_prev_(false), - response_htp_(new http_parser()), response_body_buf_(0), recv_window_size_(0) -{ - http_parser_init(response_htp_, HTTP_RESPONSE); - response_htp_->data = this; -} +{} Downstream::~Downstream() { @@ -78,7 +73,6 @@ Downstream::~Downstream() if(dconn_) { delete dconn_; } - delete response_htp_; if(ENABLE_LOG) { LOG(INFO) << "Deleted"; } @@ -87,11 +81,6 @@ Downstream::~Downstream() void Downstream::set_downstream_connection(DownstreamConnection *dconn) { dconn_ = dconn; - if(dconn_) { - ioctrl_.set_bev(dconn_->get_bev()); - } else { - ioctrl_.set_bev(0); - } } DownstreamConnection* Downstream::get_downstream_connection() @@ -101,17 +90,25 @@ DownstreamConnection* Downstream::get_downstream_connection() void Downstream::pause_read(IOCtrlReason reason) { - ioctrl_.pause_read(reason); + if(dconn_) { + dconn_->pause_read(reason); + } } bool Downstream::resume_read(IOCtrlReason reason) { - return ioctrl_.resume_read(reason); + if(dconn_) { + return dconn_->resume_read(reason); + } else { + return false; + } } void Downstream::force_resume_read() { - ioctrl_.force_resume_read(); + if(dconn_) { + dconn_->force_resume_read(); + } } namespace { @@ -285,9 +282,7 @@ bool Downstream::get_expect_100_continue() const bool Downstream::get_output_buffer_full() { if(dconn_) { - bufferevent *bev = dconn_->get_bev(); - evbuffer *output = bufferevent_get_output(bev); - return evbuffer_get_length(output) >= DOWNSTREAM_OUTPUT_UPPER_THRES; + return dconn_->get_output_buffer_full(); } else { return false; } @@ -414,116 +409,9 @@ void Downstream::set_response_connection_close(bool f) response_connection_close_ = f; } -namespace { -int htp_hdrs_completecb(http_parser *htp) +int Downstream::on_read() { - Downstream *downstream; - downstream = reinterpret_cast(htp->data); - downstream->set_response_http_status(htp->status_code); - downstream->set_response_major(htp->http_major); - downstream->set_response_minor(htp->http_minor); - downstream->set_response_connection_close(!http_should_keep_alive(htp)); - downstream->set_response_state(Downstream::HEADER_COMPLETE); - if(downstream->get_upstream()->on_downstream_header_complete(downstream) - != 0) { - return -1; - } - unsigned int status = downstream->get_response_http_status(); - // Ignore the response body. HEAD response may contain - // Content-Length or Transfer-Encoding: chunked. Some server send - // 304 status code with nonzero Content-Length, but without response - // body. See - // http://tools.ietf.org/html/draft-ietf-httpbis-p1-messaging-20#section-3.3 - return downstream->get_request_method() == "HEAD" || - (100 <= status && status <= 199) || status == 204 || - status == 304 ? 1 : 0; -} -} // namespace - -namespace { -int htp_hdr_keycb(http_parser *htp, const char *data, size_t len) -{ - Downstream *downstream; - downstream = reinterpret_cast(htp->data); - if(downstream->get_response_header_key_prev()) { - downstream->append_last_response_header_key(data, len); - } else { - downstream->add_response_header(std::string(data, len), ""); - } - return 0; -} -} // namespace - -namespace { -int htp_hdr_valcb(http_parser *htp, const char *data, size_t len) -{ - Downstream *downstream; - downstream = reinterpret_cast(htp->data); - if(downstream->get_response_header_key_prev()) { - downstream->set_last_response_header_value(std::string(data, len)); - } else { - downstream->append_last_response_header_value(data, len); - } - return 0; -} -} // namespace - -namespace { -int htp_bodycb(http_parser *htp, const char *data, size_t len) -{ - Downstream *downstream; - downstream = reinterpret_cast(htp->data); - - return downstream->get_upstream()->on_downstream_body - (downstream, reinterpret_cast(data), len); -} -} // namespace - -namespace { -int htp_msg_completecb(http_parser *htp) -{ - Downstream *downstream; - downstream = reinterpret_cast(htp->data); - - downstream->set_response_state(Downstream::MSG_COMPLETE); - return downstream->get_upstream()->on_downstream_body_complete(downstream); -} -} // namespace - -namespace { -http_parser_settings htp_hooks = { - 0, /*http_cb on_message_begin;*/ - 0, /*http_data_cb on_url;*/ - htp_hdr_keycb, /*http_data_cb on_header_field;*/ - htp_hdr_valcb, /*http_data_cb on_header_value;*/ - htp_hdrs_completecb, /*http_cb on_headers_complete;*/ - htp_bodycb, /*http_data_cb on_body;*/ - htp_msg_completecb /*http_cb on_message_complete;*/ -}; -} // namespace - -int Downstream::parse_http_response() -{ - bufferevent *bev = dconn_->get_bev(); - evbuffer *input = bufferevent_get_input(bev); - unsigned char *mem = evbuffer_pullup(input, -1); - - size_t nread = http_parser_execute(response_htp_, &htp_hooks, - reinterpret_cast(mem), - evbuffer_get_length(input)); - - evbuffer_drain(input, nread); - http_errno htperr = HTTP_PARSER_ERRNO(response_htp_); - if(htperr == HPE_OK) { - return 0; - } else { - if(ENABLE_LOG) { - LOG(INFO) << "Downstream HTTP parser failure: " - << "(" << http_errno_name(htperr) << ") " - << http_errno_description(htperr); - } - return SHRPX_ERR_HTTP_PARSE; - } + return dconn_->on_read(); } void Downstream::set_response_state(int state) diff --git a/src/shrpx_downstream.h b/src/shrpx_downstream.h index 74bc8a6..254a9d9 100644 --- a/src/shrpx_downstream.h +++ b/src/shrpx_downstream.h @@ -35,10 +35,6 @@ #include #include -extern "C" { -#include "http-parser/http_parser.h" -} - #include "shrpx_io_control.h" namespace shrpx { @@ -128,19 +124,21 @@ public: void set_chunked_response(bool f); bool get_response_connection_close() const; void set_response_connection_close(bool f); - int parse_http_response(); void set_response_state(int state); int get_response_state() const; int init_response_body_buf(); evbuffer* get_response_body_buf(); - static const size_t DOWNSTREAM_OUTPUT_UPPER_THRES = 64*1024; + // Call this method when there is incoming data in downstream + // connection. + int on_read(); + + static const size_t OUTPUT_UPPER_THRES = 64*1024; private: Upstream *upstream_; DownstreamConnection *dconn_; int32_t stream_id_; int priority_; - IOControl ioctrl_; // stream ID in backend connection int32_t downstream_stream_id_; @@ -163,7 +161,6 @@ private: bool response_connection_close_; Headers response_headers_; bool response_header_key_prev_; - http_parser *response_htp_; // This buffer is used to temporarily store downstream response // body. Spdylay reads data from this in the callback. evbuffer *response_body_buf_; diff --git a/src/shrpx_downstream_connection.cc b/src/shrpx_downstream_connection.cc index 36212e4..5aacd14 100644 --- a/src/shrpx_downstream_connection.cc +++ b/src/shrpx_downstream_connection.cc @@ -25,96 +25,17 @@ #include "shrpx_downstream_connection.h" #include "shrpx_client_handler.h" -#include "shrpx_upstream.h" #include "shrpx_downstream.h" -#include "shrpx_config.h" -#include "shrpx_error.h" namespace shrpx { DownstreamConnection::DownstreamConnection(ClientHandler *client_handler) : client_handler_(client_handler), - bev_(0), downstream_(0) {} DownstreamConnection::~DownstreamConnection() -{ - if(bev_) { - bufferevent_disable(bev_, EV_READ | EV_WRITE); - bufferevent_free(bev_); - } - // Downstream and DownstreamConnection may be deleted - // asynchronously. - if(downstream_) { - downstream_->set_downstream_connection(0); - } -} - -// When downstream request is issued, call this function to set read -// timeout. We don't know when the request is completely received by -// the downstream server. This function may be called before that -// happens. Overall it does not cause problem for most of the time. -// If the downstream server is too slow to recv/send, the connection -// will be dropped by read timeout. -void DownstreamConnection::start_waiting_response() -{ - if(bev_) { - bufferevent_set_timeouts(bev_, - &get_config()->downstream_read_timeout, - &get_config()->downstream_write_timeout); - } -} - -namespace { -// Gets called when DownstreamConnection is pooled in ClientHandler. -void idle_eventcb(bufferevent *bev, short events, void *arg) -{ - DownstreamConnection *dconn = reinterpret_cast(arg); - if(events & BEV_EVENT_CONNECTED) { - // Downstream was detached before connection established? - // This may be safe to be left. - if(ENABLE_LOG) { - LOG(INFO) << "Idle downstream connected?" << dconn; - } - return; - } - if(events & BEV_EVENT_EOF) { - if(ENABLE_LOG) { - LOG(INFO) << "Idle downstream connection EOF " << dconn; - } - } else if(events & BEV_EVENT_TIMEOUT) { - if(ENABLE_LOG) { - LOG(INFO) << "Idle downstream connection timeout " << dconn; - } - } else if(events & BEV_EVENT_ERROR) { - if(ENABLE_LOG) { - LOG(INFO) << "Idle downstream connection error " << dconn; - } - } - ClientHandler *client_handler = dconn->get_client_handler(); - client_handler->remove_downstream_connection(dconn); - delete dconn; -} -} // namespace - -void DownstreamConnection::detach_downstream(Downstream *downstream) -{ - if(ENABLE_LOG) { - LOG(INFO) << "Detaching downstream connection " << this << " from " - << "downstream " << downstream; - } - downstream->set_downstream_connection(0); - downstream_ = 0; - bufferevent_enable(bev_, EV_READ); - bufferevent_setcb(bev_, 0, 0, idle_eventcb, this); - // On idle state, just enable read timeout. Normally idle downstream - // connection will get EOF from the downstream server and closed. - bufferevent_set_timeouts(bev_, - &get_config()->downstream_idle_read_timeout, - &get_config()->downstream_write_timeout); - client_handler_->pool_downstream_connection(this); -} +{} ClientHandler* DownstreamConnection::get_client_handler() { @@ -126,9 +47,4 @@ Downstream* DownstreamConnection::get_downstream() return downstream_; } -bufferevent* DownstreamConnection::get_bev() -{ - return bev_; -} - } // namespace shrpx diff --git a/src/shrpx_downstream_connection.h b/src/shrpx_downstream_connection.h index ea3d2b0..30bf732 100644 --- a/src/shrpx_downstream_connection.h +++ b/src/shrpx_downstream_connection.h @@ -27,8 +27,7 @@ #include "shrpx.h" -#include -#include +#include "shrpx_io_control.h" namespace shrpx { @@ -40,21 +39,25 @@ public: DownstreamConnection(ClientHandler *client_handler); virtual ~DownstreamConnection(); virtual int attach_downstream(Downstream *downstream) = 0; - void detach_downstream(Downstream *downstream); - bufferevent* get_bev(); + virtual void detach_downstream(Downstream *downstream) = 0; virtual int push_request_headers() = 0; virtual int push_upload_data_chunk(const uint8_t *data, size_t datalen) = 0; virtual int end_upload_data() = 0; - virtual int on_connect() = 0; + virtual void pause_read(IOCtrlReason reason) = 0; + virtual bool resume_read(IOCtrlReason reason) = 0; + virtual void force_resume_read() = 0; + + virtual bool get_output_buffer_full() = 0; + + virtual int on_read() = 0; + virtual int on_write() = 0; ClientHandler* get_client_handler(); Downstream* get_downstream(); - void start_waiting_response(); protected: ClientHandler *client_handler_; - bufferevent *bev_; Downstream *downstream_; }; diff --git a/src/shrpx_http_downstream_connection.cc b/src/shrpx_http_downstream_connection.cc index 1508973..5140ab2 100644 --- a/src/shrpx_http_downstream_connection.cc +++ b/src/shrpx_http_downstream_connection.cc @@ -44,11 +44,25 @@ timeval max_timeout = { 86400, 0 }; HttpDownstreamConnection::HttpDownstreamConnection (ClientHandler *client_handler) - : DownstreamConnection(client_handler) + : DownstreamConnection(client_handler), + bev_(0), + ioctrl_(0), + response_htp_(new http_parser()) {} HttpDownstreamConnection::~HttpDownstreamConnection() -{} +{ + delete response_htp_; + if(bev_) { + bufferevent_disable(bev_, EV_READ | EV_WRITE); + bufferevent_free(bev_); + } + // Downstream and DownstreamConnection may be deleted + // asynchronously. + if(downstream_) { + downstream_->set_downstream_connection(0); + } +} int HttpDownstreamConnection::attach_downstream(Downstream *downstream) { @@ -78,6 +92,12 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream) } downstream->set_downstream_connection(this); downstream_ = downstream; + + ioctrl_.set_bev(bev_); + + http_parser_init(response_htp_, HTTP_RESPONSE); + response_htp_->data = downstream_; + bufferevent_setwatermark(bev_, EV_READ, 0, SHRPX_READ_WARTER_MARK); bufferevent_enable(bev_, EV_READ); bufferevent_setcb(bev_, @@ -172,7 +192,17 @@ int HttpDownstreamConnection::push_request_headers() if(rv != 0) { return -1; } - start_waiting_response(); + + // When downstream request is issued, set read timeout. We don't + // know when the request is completely received by the downstream + // server. This function may be called before that happens. Overall + // it does not cause problem for most of the time. If the + // downstream server is too slow to recv/send, the connection will + // be dropped by read timeout. + bufferevent_set_timeouts(bev_, + &get_config()->downstream_read_timeout, + &get_config()->downstream_write_timeout); + return 0; } @@ -223,7 +253,196 @@ int HttpDownstreamConnection::end_upload_data() return 0; } -int HttpDownstreamConnection::on_connect() +namespace { +// Gets called when DownstreamConnection is pooled in ClientHandler. +void idle_eventcb(bufferevent *bev, short events, void *arg) +{ + HttpDownstreamConnection *dconn; + dconn = reinterpret_cast(arg); + if(events & BEV_EVENT_CONNECTED) { + // Downstream was detached before connection established? + // This may be safe to be left. + if(ENABLE_LOG) { + LOG(INFO) << "Idle downstream connected?" << dconn; + } + return; + } + if(events & BEV_EVENT_EOF) { + if(ENABLE_LOG) { + LOG(INFO) << "Idle downstream connection EOF " << dconn; + } + } else if(events & BEV_EVENT_TIMEOUT) { + if(ENABLE_LOG) { + LOG(INFO) << "Idle downstream connection timeout " << dconn; + } + } else if(events & BEV_EVENT_ERROR) { + if(ENABLE_LOG) { + LOG(INFO) << "Idle downstream connection error " << dconn; + } + } + ClientHandler *client_handler = dconn->get_client_handler(); + client_handler->remove_downstream_connection(dconn); + delete dconn; +} +} // namespace + +void HttpDownstreamConnection::detach_downstream(Downstream *downstream) +{ + if(ENABLE_LOG) { + LOG(INFO) << "Detaching downstream connection " << this << " from " + << "downstream " << downstream; + } + downstream->set_downstream_connection(0); + downstream_ = 0; + ioctrl_.force_resume_read(); + bufferevent_enable(bev_, EV_READ); + bufferevent_setcb(bev_, 0, 0, idle_eventcb, this); + // On idle state, just enable read timeout. Normally idle downstream + // connection will get EOF from the downstream server and closed. + bufferevent_set_timeouts(bev_, + &get_config()->downstream_idle_read_timeout, + &get_config()->downstream_write_timeout); + client_handler_->pool_downstream_connection(this); +} + +bufferevent* HttpDownstreamConnection::get_bev() +{ + return bev_; +} + +void HttpDownstreamConnection::pause_read(IOCtrlReason reason) +{ + ioctrl_.pause_read(reason); +} + +bool HttpDownstreamConnection::resume_read(IOCtrlReason reason) +{ + return ioctrl_.resume_read(reason); +} + +void HttpDownstreamConnection::force_resume_read() +{ + ioctrl_.force_resume_read(); +} + +bool HttpDownstreamConnection::get_output_buffer_full() +{ + evbuffer *output = bufferevent_get_output(bev_); + return evbuffer_get_length(output) >= Downstream::OUTPUT_UPPER_THRES; +} + +namespace { +int htp_hdrs_completecb(http_parser *htp) +{ + Downstream *downstream; + downstream = reinterpret_cast(htp->data); + downstream->set_response_http_status(htp->status_code); + downstream->set_response_major(htp->http_major); + downstream->set_response_minor(htp->http_minor); + downstream->set_response_connection_close(!http_should_keep_alive(htp)); + downstream->set_response_state(Downstream::HEADER_COMPLETE); + if(downstream->get_upstream()->on_downstream_header_complete(downstream) + != 0) { + return -1; + } + unsigned int status = downstream->get_response_http_status(); + // Ignore the response body. HEAD response may contain + // Content-Length or Transfer-Encoding: chunked. Some server send + // 304 status code with nonzero Content-Length, but without response + // body. See + // http://tools.ietf.org/html/draft-ietf-httpbis-p1-messaging-20#section-3.3 + return downstream->get_request_method() == "HEAD" || + (100 <= status && status <= 199) || status == 204 || + status == 304 ? 1 : 0; +} +} // namespace + +namespace { +int htp_hdr_keycb(http_parser *htp, const char *data, size_t len) +{ + Downstream *downstream; + downstream = reinterpret_cast(htp->data); + if(downstream->get_response_header_key_prev()) { + downstream->append_last_response_header_key(data, len); + } else { + downstream->add_response_header(std::string(data, len), ""); + } + return 0; +} +} // namespace + +namespace { +int htp_hdr_valcb(http_parser *htp, const char *data, size_t len) +{ + Downstream *downstream; + downstream = reinterpret_cast(htp->data); + if(downstream->get_response_header_key_prev()) { + downstream->set_last_response_header_value(std::string(data, len)); + } else { + downstream->append_last_response_header_value(data, len); + } + return 0; +} +} // namespace + +namespace { +int htp_bodycb(http_parser *htp, const char *data, size_t len) +{ + Downstream *downstream; + downstream = reinterpret_cast(htp->data); + + return downstream->get_upstream()->on_downstream_body + (downstream, reinterpret_cast(data), len); +} +} // namespace + +namespace { +int htp_msg_completecb(http_parser *htp) +{ + Downstream *downstream; + downstream = reinterpret_cast(htp->data); + + downstream->set_response_state(Downstream::MSG_COMPLETE); + return downstream->get_upstream()->on_downstream_body_complete(downstream); +} +} // namespace + +namespace { +http_parser_settings htp_hooks = { + 0, /*http_cb on_message_begin;*/ + 0, /*http_data_cb on_url;*/ + htp_hdr_keycb, /*http_data_cb on_header_field;*/ + htp_hdr_valcb, /*http_data_cb on_header_value;*/ + htp_hdrs_completecb, /*http_cb on_headers_complete;*/ + htp_bodycb, /*http_data_cb on_body;*/ + htp_msg_completecb /*http_cb on_message_complete;*/ +}; +} // namespace + +int HttpDownstreamConnection::on_read() +{ + evbuffer *input = bufferevent_get_input(bev_); + unsigned char *mem = evbuffer_pullup(input, -1); + + size_t nread = http_parser_execute(response_htp_, &htp_hooks, + reinterpret_cast(mem), + evbuffer_get_length(input)); + + evbuffer_drain(input, nread); + http_errno htperr = HTTP_PARSER_ERRNO(response_htp_); + if(htperr == HPE_OK) { + return 0; + } else { + if(ENABLE_LOG) { + LOG(INFO) << "Downstream HTTP parser failure: " + << "(" << http_errno_name(htperr) << ") " + << http_errno_description(htperr); + } + return SHRPX_ERR_HTTP_PARSE; + } +} + +int HttpDownstreamConnection::on_write() { return 0; } diff --git a/src/shrpx_http_downstream_connection.h b/src/shrpx_http_downstream_connection.h index 285a866..f99bf80 100644 --- a/src/shrpx_http_downstream_connection.h +++ b/src/shrpx_http_downstream_connection.h @@ -27,7 +27,13 @@ #include "shrpx.h" +#include +#include + +#include "http-parser/http_parser.h" + #include "shrpx_downstream_connection.h" +#include "shrpx_io_control.h" namespace shrpx { @@ -36,12 +42,26 @@ public: HttpDownstreamConnection(ClientHandler *client_handler); virtual ~HttpDownstreamConnection(); virtual int attach_downstream(Downstream *downstream); + virtual void detach_downstream(Downstream *downstream); virtual int push_request_headers(); virtual int push_upload_data_chunk(const uint8_t *data, size_t datalen); virtual int end_upload_data(); - virtual int on_connect(); + virtual void pause_read(IOCtrlReason reason); + virtual bool resume_read(IOCtrlReason reason); + virtual void force_resume_read(); + + virtual bool get_output_buffer_full(); + + virtual int on_read(); + virtual int on_write(); + + bufferevent* get_bev(); +private: + bufferevent *bev_; + IOControl ioctrl_; + http_parser *response_htp_; }; } // namespace shrpx diff --git a/src/shrpx_https_upstream.cc b/src/shrpx_https_upstream.cc index 5b0c2f0..d6c3a50 100644 --- a/src/shrpx_https_upstream.cc +++ b/src/shrpx_https_upstream.cc @@ -317,13 +317,15 @@ void HttpsUpstream::pause_read(IOCtrlReason reason) ioctrl_.pause_read(reason); } -void HttpsUpstream::resume_read(IOCtrlReason reason) +int HttpsUpstream::resume_read(IOCtrlReason reason) { if(ioctrl_.resume_read(reason)) { // Process remaining data in input buffer here because these bytes // are not notified by readcb until new data arrive. http_parser_pause(htp_, 0); - on_read(); + return on_read(); + } else { + return 0; } } @@ -335,12 +337,10 @@ void https_downstream_readcb(bufferevent *bev, void *ptr) HttpsUpstream *upstream; upstream = static_cast(downstream->get_upstream()); int rv; - if(get_config()->client_mode) { - rv = reinterpret_cast(dconn)->on_read(); - } else { - rv = downstream->parse_http_response(); - } - if(rv == 0) { + rv = downstream->on_read(); + if(downstream->get_response_state() == Downstream::MSG_RESET) { + delete upstream->get_client_handler(); + } else if(rv == 0) { if(downstream->get_response_state() == Downstream::MSG_COMPLETE) { if(downstream->get_response_connection_close()) { // Connection close @@ -379,8 +379,6 @@ void https_downstream_readcb(bufferevent *bev, void *ptr) // We already sent HTTP response headers to upstream // client. Just close the upstream connection. delete upstream->get_client_handler(); - } else if(downstream->get_response_state() == Downstream::MSG_RESET) { - delete upstream->get_client_handler(); } else { // We did not sent any HTTP response, so sent error // response. Cannot reuse downstream connection in this case. @@ -406,14 +404,6 @@ void https_downstream_writecb(bufferevent *bev, void *ptr) HttpsUpstream *upstream; upstream = static_cast(downstream->get_upstream()); upstream->resume_read(SHRPX_NO_BUFFER); - if(get_config()->client_mode) { - int rv; - rv = reinterpret_cast(dconn)->on_write(); - if(rv != 0) { - delete upstream->get_client_handler(); - return; - } - } } } // namespace @@ -429,11 +419,6 @@ void https_downstream_eventcb(bufferevent *bev, short events, void *ptr) LOG(INFO) << "Downstream connection established. downstream " << downstream; } - if(dconn->on_connect() != 0) { - // TODO Return error status 502 - delete upstream->get_client_handler(); - return; - } } else if(events & BEV_EVENT_EOF) { if(ENABLE_LOG) { LOG(INFO) << "Downstream EOF. stream_id=" diff --git a/src/shrpx_https_upstream.h b/src/shrpx_https_upstream.h index 9bc50c4..bbea055 100644 --- a/src/shrpx_https_upstream.h +++ b/src/shrpx_https_upstream.h @@ -57,7 +57,7 @@ public: int error_reply(int status_code); virtual void pause_read(IOCtrlReason reason); - virtual void resume_read(IOCtrlReason reason); + virtual int resume_read(IOCtrlReason reason); virtual int on_downstream_header_complete(Downstream *downstream); virtual int on_downstream_body(Downstream *downstream, diff --git a/src/shrpx_listen_handler.cc b/src/shrpx_listen_handler.cc index b2f5e9c..3c38f87 100644 --- a/src/shrpx_listen_handler.cc +++ b/src/shrpx_listen_handler.cc @@ -36,6 +36,7 @@ #include "shrpx_ssl.h" #include "shrpx_worker.h" #include "shrpx_config.h" +#include "shrpx_spdy_session.h" namespace shrpx { @@ -45,7 +46,8 @@ ListenHandler::ListenHandler(event_base *evbase) ssl::create_ssl_client_context() : ssl::create_ssl_context()), worker_round_robin_cnt_(0), workers_(0), - num_worker_(0) + num_worker_(0), + spdy_(0) {} ListenHandler::~ListenHandler() @@ -93,8 +95,11 @@ int ListenHandler::accept_connection(evutil_socket_t fd, LOG(INFO) << " Accepted connection. fd=" << fd; } if(num_worker_ == 0) { - /*ClientHandler* client = */ - ssl::accept_ssl_connection(evbase_, ssl_ctx_, fd, addr, addrlen); + ClientHandler* client = + ssl::accept_ssl_connection(evbase_, ssl_ctx_, fd, addr, addrlen); + if(get_config()->client_mode) { + client->set_spdy_session(spdy_); + } } else { size_t idx = worker_round_robin_cnt_ % num_worker_; ++worker_round_robin_cnt_; @@ -114,4 +119,12 @@ event_base* ListenHandler::get_evbase() const return evbase_; } +int ListenHandler::create_spdy_session() +{ + int rv; + spdy_ = new SpdySession(evbase_, ssl_ctx_); + rv = spdy_->init_notification(); + return rv; +} + } // namespace shrpx diff --git a/src/shrpx_listen_handler.h b/src/shrpx_listen_handler.h index 1c9e066..80409bc 100644 --- a/src/shrpx_listen_handler.h +++ b/src/shrpx_listen_handler.h @@ -42,6 +42,8 @@ struct WorkerInfo { bufferevent *bev; }; +class SpdySession; + class ListenHandler { public: ListenHandler(event_base *evbase); @@ -49,14 +51,18 @@ public: int accept_connection(evutil_socket_t fd, sockaddr *addr, int addrlen); void create_worker_thread(size_t num); event_base* get_evbase() const; + int create_spdy_session(); private: event_base *evbase_; // In client-mode, this is for backend SPDY connection. Otherwise, - // frontend SSL/TLS connection. + // for frontend. SSL_CTX *ssl_ctx_; unsigned int worker_round_robin_cnt_; WorkerInfo *workers_; size_t num_worker_; + // Shared SPDY session. NULL if not client mode or + // multi-threaded. In multi-threaded case, see shrpx_worker.cc. + SpdySession *spdy_; }; } // namespace shrpx diff --git a/src/shrpx_spdy_downstream_connection.cc b/src/shrpx_spdy_downstream_connection.cc index d5e51dc..b6c78c4 100644 --- a/src/shrpx_spdy_downstream_connection.cc +++ b/src/shrpx_spdy_downstream_connection.cc @@ -30,12 +30,15 @@ #include +#include + #include "shrpx_client_handler.h" #include "shrpx_upstream.h" #include "shrpx_downstream.h" #include "shrpx_config.h" #include "shrpx_error.h" #include "shrpx_http.h" +#include "shrpx_spdy_session.h" #include "util.h" using namespace spdylay; @@ -45,36 +48,23 @@ namespace shrpx { SpdyDownstreamConnection::SpdyDownstreamConnection (ClientHandler *client_handler) : DownstreamConnection(client_handler), - ssl_(0), - session_(0), - request_body_buf_(0) + spdy_(client_handler->get_spdy_session()), + request_body_buf_(0), + sd_(0) {} SpdyDownstreamConnection::~SpdyDownstreamConnection() { - spdylay_session_del(session_); - int fd = -1; - if(ssl_) { - fd = SSL_get_fd(ssl_); - SSL_shutdown(ssl_); - } - if(bev_) { - // We want to deallocate bev_ between SSL_shutdown and - // SSL_free. This might not be necessary for recent libevent. - bufferevent_disable(bev_, EV_READ | EV_WRITE); - bufferevent_free(bev_); - bev_ = 0; - } - if(ssl_) { - SSL_free(ssl_); - } - if(fd != -1) { - shutdown(fd, SHUT_WR); - close(fd); - } if(request_body_buf_) { evbuffer_free(request_body_buf_); } + // TODO need RST_STREAM? + spdy_->remove_downstream_connection(this); + // Downstream and DownstreamConnection may be deleted + // asynchronously. + if(downstream_) { + downstream_->set_downstream_connection(0); + } } int SpdyDownstreamConnection::init_request_body_buf() @@ -105,47 +95,30 @@ int SpdyDownstreamConnection::attach_downstream(Downstream *downstream) if(init_request_body_buf() == -1) { return -1; } - Upstream *upstream = downstream->get_upstream(); - if(!bev_) { - event_base *evbase = client_handler_->get_evbase(); - ssl_ = SSL_new(client_handler_->get_ssl_client_ctx()); - if(!ssl_) { - LOG(ERROR) << "SSL_new() failed: " - << ERR_error_string(ERR_get_error(), NULL); - return -1; - } - bev_ = bufferevent_openssl_socket_new(evbase, -1, ssl_, - BUFFEREVENT_SSL_CONNECTING, - BEV_OPT_DEFER_CALLBACKS); - int rv = bufferevent_socket_connect - (bev_, - // TODO maybe not thread-safe? - const_cast(&get_config()->downstream_addr.sa), - get_config()->downstream_addrlen); - if(rv != 0) { - bufferevent_free(bev_); - bev_ = 0; - return SHRPX_ERR_NETWORK; - } - if(ENABLE_LOG) { - LOG(INFO) << "Connecting to downstream server " << this; - } + spdy_->add_downstream_connection(this); + if(spdy_->get_state() == SpdySession::DISCONNECTED) { + spdy_->notify(); } downstream->set_downstream_connection(this); downstream_ = downstream; - bufferevent_setwatermark(bev_, EV_READ, 0, SHRPX_READ_WARTER_MARK); - bufferevent_enable(bev_, EV_READ); - bufferevent_setcb(bev_, - upstream->get_downstream_readcb(), - upstream->get_downstream_writecb(), - upstream->get_downstream_eventcb(), this); - bufferevent_set_timeouts(bev_, - &get_config()->downstream_read_timeout, - &get_config()->downstream_write_timeout); return 0; } +void SpdyDownstreamConnection::detach_downstream(Downstream *downstream) +{ + if(ENABLE_LOG) { + LOG(INFO) << "Detaching spdy downstream connection " << this << " from " + << "downstream " << downstream; + } + downstream->set_downstream_connection(0); + downstream_ = 0; + + // TODO do something to SpdySession? RST_STREAM? + + client_handler_->pool_downstream_connection(this); +} + namespace { ssize_t spdy_data_read_callback(spdylay_session *session, int32_t stream_id, @@ -154,6 +127,12 @@ ssize_t spdy_data_read_callback(spdylay_session *session, spdylay_data_source *source, void *user_data) { + StreamData *sd; + sd = reinterpret_cast + (spdylay_session_get_stream_user_data(session, stream_id)); + if(!sd || !sd->dconn) { + return SPDYLAY_ERR_DEFERRED; + } SpdyDownstreamConnection *dconn; dconn = reinterpret_cast(source->ptr); Downstream *downstream = dconn->get_downstream(); @@ -163,13 +142,25 @@ ssize_t spdy_data_read_callback(spdylay_session *session, return SPDYLAY_ERR_DEFERRED; } evbuffer *body = dconn->get_request_body_buf(); - int nread = evbuffer_remove(body, buf, length); - if(nread == 0 && - downstream->get_request_state() == Downstream::MSG_COMPLETE) { - *eof = 1; - } - if(nread == 0 && *eof != 1) { - return SPDYLAY_ERR_DEFERRED; + int nread = 0; + for(;;) { + nread = evbuffer_remove(body, buf, length); + if(nread == 0) { + if(downstream->get_request_state() == Downstream::MSG_COMPLETE) { + *eof = 1; + break; + } else { + if(downstream->get_upstream()->resume_read(SHRPX_NO_BUFFER) == -1) { + // In this case, downstream may be deleted. + return SPDYLAY_ERR_DEFERRED; + } + if(evbuffer_get_length(body) == 0) { + return SPDYLAY_ERR_DEFERRED; + } + } + } else { + break; + } } return nread; } @@ -188,10 +179,9 @@ void copy_url_component(std::string& dest, http_parser_url *u, int field, int SpdyDownstreamConnection::push_request_headers() { int rv; - if(!session_) { - // If the connection to the backend has not been established, - // session_ is not initialized. This function will be called again - // just after SSL/TLS handshake is done. + if(spdy_->get_state() != SpdySession::CONNECTED) { + // The SPDY session to the backend has not been established. This + // function will be called again just after it is established. return 0; } if(!downstream_) { @@ -318,16 +308,17 @@ int SpdyDownstreamConnection::push_request_headers() spdylay_data_provider data_prd; data_prd.source.ptr = this; data_prd.read_callback = spdy_data_read_callback; - rv = spdylay_submit_request(session_, 0, nv, &data_prd, 0); + rv = spdy_->submit_request(this, 0, nv, &data_prd); } else { - rv = spdylay_submit_request(session_, 0, nv, 0, 0); + rv = spdy_->submit_request(this, 0, nv, 0); } delete [] nv; if(rv != 0) { LOG(FATAL) << "spdylay_submit_request() failed"; return -1; } - return send(); + spdy_->notify(); + return 0; } int SpdyDownstreamConnection::push_upload_data_chunk(const uint8_t *data, @@ -339,413 +330,36 @@ int SpdyDownstreamConnection::push_upload_data_chunk(const uint8_t *data, return -1; } if(downstream_->get_downstream_stream_id() != -1) { - spdylay_session_resume_data(session_, - downstream_->get_downstream_stream_id()); - rv = send(); + rv = spdy_->resume_data(this); if(rv != 0) { return -1; } - } - size_t bodylen = evbuffer_get_length(request_body_buf_); - if(bodylen > Downstream::DOWNSTREAM_OUTPUT_UPPER_THRES) { - downstream_->get_upstream()->pause_read(SHRPX_NO_BUFFER); + spdy_->notify(); } return 0; } int SpdyDownstreamConnection::end_upload_data() { + int rv; if(downstream_->get_downstream_stream_id() != -1) { - spdylay_session_resume_data(session_, - downstream_->get_downstream_stream_id()); - return send(); - } else { - return 0; - } -} - -namespace { -ssize_t send_callback(spdylay_session *session, - const uint8_t *data, size_t len, int flags, - void *user_data) -{ - int rv; - SpdyDownstreamConnection *dconn; - dconn = reinterpret_cast(user_data); - - bufferevent *bev = dconn->get_bev(); - evbuffer *output = bufferevent_get_output(bev); - // Check buffer length and return WOULDBLOCK if it is large enough. - if(evbuffer_get_length(output) > Downstream::DOWNSTREAM_OUTPUT_UPPER_THRES) { - return SPDYLAY_ERR_WOULDBLOCK; - } - - rv = evbuffer_add(output, data, len); - if(rv == -1) { - LOG(FATAL) << "evbuffer_add() failed"; - return SPDYLAY_ERR_CALLBACK_FAILURE; - } else { - return len; - } -} -} // namespace - -namespace { -ssize_t recv_callback(spdylay_session *session, - uint8_t *data, size_t len, int flags, void *user_data) -{ - SpdyDownstreamConnection *dconn; - dconn = reinterpret_cast(user_data); - - bufferevent *bev = dconn->get_bev(); - evbuffer *input = bufferevent_get_input(bev); - int nread = evbuffer_remove(input, data, len); - if(nread == -1) { - return SPDYLAY_ERR_CALLBACK_FAILURE; - } else if(nread == 0) { - return SPDYLAY_ERR_WOULDBLOCK; - } else { - return nread; - } -} -} // namespace - -namespace { -void on_stream_close_callback -(spdylay_session *session, int32_t stream_id, spdylay_status_code status_code, - void *user_data) -{ - int rv; - if(ENABLE_LOG) { - LOG(INFO) << "Downstream spdy Stream " << stream_id << " is being closed"; - } - SpdyDownstreamConnection *dconn; - dconn = reinterpret_cast(user_data); - Downstream *downstream = dconn->get_downstream(); - if(!downstream || downstream->get_downstream_stream_id() != stream_id) { - // We might get this close callback when pushed streams are - // closed. - return; - } - downstream->set_response_state(Downstream::MSG_COMPLETE); - rv = downstream->get_upstream()->on_downstream_body_complete(downstream); - if(rv != 0) { - downstream->set_response_state(Downstream::MSG_RESET); - return; - } -} -} // namespace - -namespace { -void on_ctrl_recv_callback -(spdylay_session *session, spdylay_frame_type type, spdylay_frame *frame, - void *user_data) -{ - int rv; - SpdyDownstreamConnection *dconn; - dconn = reinterpret_cast(user_data); - Downstream *downstream = dconn->get_downstream(); - switch(type) { - case SPDYLAY_SYN_STREAM: - if(ENABLE_LOG) { - LOG(INFO) << "Downstream spdy received upstream SYN_STREAM stream_id=" - << frame->syn_stream.stream_id; - } - // We just respond pushed stream with RST_STREAM. - spdylay_submit_rst_stream(session, frame->syn_stream.stream_id, - SPDYLAY_REFUSED_STREAM); - break; - case SPDYLAY_RST_STREAM: - if(downstream && - downstream->get_downstream_stream_id() == frame->rst_stream.stream_id) { - // If we got RST_STREAM, just flag MSG_RESET to indicate - // upstream connection must be terminated. - downstream->set_response_state(Downstream::MSG_RESET); - } - break; - case SPDYLAY_SYN_REPLY: { - if(!downstream || - downstream->get_downstream_stream_id() != frame->syn_reply.stream_id) { - break; - } - char **nv = frame->syn_reply.nv; - const char *status = 0; - const char *version = 0; - const char *content_length = 0; - for(size_t i = 0; nv[i]; i += 2) { - if(strcmp(nv[i], ":status") == 0) { - unsigned int code = strtoul(nv[i+1], 0, 10); - downstream->set_response_http_status(code); - status = nv[i+1]; - } else if(strcmp(nv[i], ":version") == 0) { - // We assume for now that most version is HTTP/1.1 from - // SPDY. So just check if it is HTTP/1.0 and then set response - // minor as so. - downstream->set_response_major(1); - if(util::strieq(nv[i+1], "HTTP/1.0")) { - downstream->set_response_minor(0); - } else { - downstream->set_response_minor(1); - } - version = nv[i+1]; - } else if(nv[i][0] != ':') { - if(strcmp(nv[i], "content-length") == 0) { - content_length = nv[i+1]; - } - downstream->add_response_header(nv[i], nv[i+1]); - } - } - if(!status || !version) { - spdylay_submit_rst_stream(session, frame->syn_reply.stream_id, - SPDYLAY_PROTOCOL_ERROR); - downstream->set_response_state(Downstream::MSG_RESET); - return; - } - - if(!content_length && downstream->get_request_method() != "HEAD" && - downstream->get_request_method() != "CONNECT") { - unsigned int status; - status = downstream->get_response_http_status(); - if(!((100 <= status && status <= 199) || status == 204 || - status == 304)) { - // In SPDY, we are supporsed not to receive - // transfer-encoding. - downstream->add_response_header("transfer-encoding", "chunked"); - } - } - - if(ENABLE_LOG) { - std::stringstream ss; - for(size_t i = 0; nv[i]; i += 2) { - ss << nv[i] << ": " << nv[i+1] << "\n"; - } - LOG(INFO) << "Downstream spdy response headers id=" - << frame->syn_reply.stream_id - << "\n" << ss.str(); - } - - Upstream *upstream = downstream->get_upstream(); - downstream->set_response_state(Downstream::HEADER_COMPLETE); - rv = upstream->on_downstream_header_complete(downstream); + rv = spdy_->resume_data(this); if(rv != 0) { - spdylay_submit_rst_stream(session, frame->syn_reply.stream_id, - SPDYLAY_PROTOCOL_ERROR); - downstream->set_response_state(Downstream::MSG_RESET); - return; + return -1; } - break; + spdy_->notify(); } - default: - break; - } -} -} // namespace - -namespace { -void on_data_chunk_recv_callback(spdylay_session *session, - uint8_t flags, int32_t stream_id, - const uint8_t *data, size_t len, - void *user_data) -{ - int rv; - SpdyDownstreamConnection *dconn; - dconn = reinterpret_cast(user_data); - Downstream *downstream = dconn->get_downstream(); - if(!downstream || downstream->get_downstream_stream_id() != stream_id) { - return; - } - // TODO No manual flow control at the moment. - Upstream *upstream = downstream->get_upstream(); - rv = upstream->on_downstream_body(downstream, data, len); - if(rv != 0) { - spdylay_submit_rst_stream(session, stream_id, SPDYLAY_INTERNAL_ERROR); - downstream->set_response_state(Downstream::MSG_RESET); - } -} -} // namespace - -namespace { -void before_ctrl_send_callback(spdylay_session *session, - spdylay_frame_type type, - spdylay_frame *frame, - void *user_data) -{ - if(type == SPDYLAY_SYN_STREAM) { - SpdyDownstreamConnection *dconn; - dconn = reinterpret_cast(user_data); - Downstream *downstream = dconn->get_downstream(); - if(downstream) { - downstream->set_downstream_stream_id(frame->syn_stream.stream_id); - } - } -} -} // namespace - -namespace { -void on_ctrl_not_send_callback(spdylay_session *session, - spdylay_frame_type type, - spdylay_frame *frame, - int error_code, void *user_data) -{ - LOG(WARNING) << "Failed to send control frame type=" << type << ", " - << "error_code=" << error_code << ":" - << spdylay_strerror(error_code); - if(type == SPDYLAY_SYN_STREAM) { - // To avoid stream hanging around, flag Downstream::MSG_RESET and - // terminate the upstream and downstream connections. - SpdyDownstreamConnection *dconn; - dconn = reinterpret_cast(user_data); - Downstream *downstream = dconn->get_downstream(); - int32_t stream_id = frame->syn_stream.stream_id; - if(!downstream || downstream->get_downstream_stream_id() != stream_id) { - return; - } - downstream->set_response_state(Downstream::MSG_RESET); - } -} -} // namespace - -namespace { -void on_ctrl_recv_parse_error_callback(spdylay_session *session, - spdylay_frame_type type, - const uint8_t *head, size_t headlen, - const uint8_t *payload, - size_t payloadlen, int error_code, - void *user_data) -{ - if(ENABLE_LOG) { - LOG(INFO) << "Failed to parse received control frame. type=" << type - << ", error_code=" << error_code << ":" - << spdylay_strerror(error_code); - } -} -} // namespace - -namespace { -void on_unknown_ctrl_recv_callback(spdylay_session *session, - const uint8_t *head, size_t headlen, - const uint8_t *payload, size_t payloadlen, - void *user_data) -{ - if(ENABLE_LOG) { - LOG(INFO) << "Received unknown control frame."; - } -} -} // namespace - -int SpdyDownstreamConnection::on_connect() -{ - int rv; - const unsigned char *next_proto = 0; - unsigned int next_proto_len; - SSL_get0_next_proto_negotiated(ssl_, &next_proto, &next_proto_len); - - if(ENABLE_LOG) { - std::string proto(next_proto, next_proto+next_proto_len); - LOG(INFO) << "Downstream negotiated next protocol: " << proto; - } - uint16_t version = spdylay_npn_get_version(next_proto, next_proto_len); - if(!version) { - return -1; - } - spdylay_session_callbacks callbacks; - memset(&callbacks, 0, sizeof(callbacks)); - callbacks.send_callback = send_callback; - callbacks.recv_callback = recv_callback; - callbacks.on_stream_close_callback = on_stream_close_callback; - callbacks.on_ctrl_recv_callback = on_ctrl_recv_callback; - callbacks.on_data_chunk_recv_callback = on_data_chunk_recv_callback; - callbacks.before_ctrl_send_callback = before_ctrl_send_callback; - callbacks.on_ctrl_not_send_callback = on_ctrl_not_send_callback; - callbacks.on_ctrl_recv_parse_error_callback = - on_ctrl_recv_parse_error_callback; - callbacks.on_unknown_ctrl_recv_callback = on_unknown_ctrl_recv_callback; - - rv = spdylay_session_client_new(&session_, version, &callbacks, this); - if(rv != 0) { - return -1; - } - - // TODO Send initial window size when manual flow control is - // implemented. - spdylay_settings_entry entry[1]; - entry[0].settings_id = SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS; - entry[0].value = get_config()->spdy_max_concurrent_streams; - entry[0].flags = SPDYLAY_ID_FLAG_SETTINGS_NONE; - rv = spdylay_submit_settings - (session_, SPDYLAY_FLAG_SETTINGS_NONE, - entry, sizeof(entry)/sizeof(spdylay_settings_entry)); - if(rv != 0) { - return -1; - } - rv = send(); - if(rv != 0) { - return -1; - } - - // We may have pending request - push_request_headers(); - return 0; } int SpdyDownstreamConnection::on_read() { - int rv = 0; - if((rv = spdylay_session_recv(session_)) < 0) { - if(rv != SPDYLAY_ERR_EOF) { - LOG(ERROR) << "spdylay_session_recv() returned error: " - << spdylay_strerror(rv); - } - } else if((rv = spdylay_session_send(session_)) < 0) { - LOG(ERROR) << "spdylay_session_send() returned error: " - << spdylay_strerror(rv); - } - // if(rv == 0) { - // if(spdylay_session_want_read(session_) == 0 && - // spdylay_session_want_write(session_) == 0) { - // if(ENABLE_LOG) { - // LOG(INFO) << "No more read/write for this SPDY session"; - // } - // rv = -1; - // } - // } - - if(rv == SPDYLAY_ERR_EOF) { - if(downstream_) { - downstream_->set_response_connection_close(true); - } - rv = 0; - } else if(rv != 0) { - if(downstream_) { - downstream_->set_response_state(Downstream::MSG_RESET); - } - } - return rv; + return 0; } int SpdyDownstreamConnection::on_write() { - return send(); -} - -int SpdyDownstreamConnection::send() -{ - int rv = 0; - if((rv = spdylay_session_send(session_)) < 0) { - LOG(ERROR) << "spdylay_session_send() returned error: " - << spdylay_strerror(rv); - } - // if(rv == 0) { - // if(spdylay_session_want_read(session_) == 0 && - // spdylay_session_want_write(session_) == 0) { - // if(ENABLE_LOG) { - // LOG(INFO) << "No more read/write for this SPDY session"; - // } - // rv = -1; - // } - // } - return rv; + return 0; } evbuffer* SpdyDownstreamConnection::get_request_body_buf() const @@ -753,4 +367,33 @@ evbuffer* SpdyDownstreamConnection::get_request_body_buf() const return request_body_buf_; } +void SpdyDownstreamConnection::attach_stream_data(StreamData *sd) +{ + assert(sd_ == 0 && sd->dconn == 0); + sd_ = sd; + sd_->dconn = this; +} + +StreamData* SpdyDownstreamConnection::detach_stream_data() +{ + if(sd_) { + StreamData *sd = sd_; + sd_ = 0; + sd->dconn = 0; + return sd; + } else { + return 0; + } +} + +bool SpdyDownstreamConnection::get_output_buffer_full() +{ + if(request_body_buf_) { + return + evbuffer_get_length(request_body_buf_) >= Downstream::OUTPUT_UPPER_THRES; + } else { + return false; + } +} + } // namespace shrpx diff --git a/src/shrpx_spdy_downstream_connection.h b/src/shrpx_spdy_downstream_connection.h index 374c10c..450bd08 100644 --- a/src/shrpx_spdy_downstream_connection.h +++ b/src/shrpx_spdy_downstream_connection.h @@ -35,28 +35,39 @@ namespace shrpx { +struct StreamData; +class SpdySession; + class SpdyDownstreamConnection : public DownstreamConnection { public: SpdyDownstreamConnection(ClientHandler *client_handler); virtual ~SpdyDownstreamConnection(); virtual int attach_downstream(Downstream *downstream); + virtual void detach_downstream(Downstream *downstream); virtual int push_request_headers(); virtual int push_upload_data_chunk(const uint8_t *data, size_t datalen); virtual int end_upload_data(); - virtual int on_connect(); + virtual void pause_read(IOCtrlReason reason) {} + virtual bool resume_read(IOCtrlReason reason) { return true; } + virtual void force_resume_read() {} - int on_read(); - int on_write(); + virtual bool get_output_buffer_full(); + + virtual int on_read(); + virtual int on_write(); int send(); int init_request_body_buf(); evbuffer* get_request_body_buf() const; + + void attach_stream_data(StreamData *sd); + StreamData* detach_stream_data(); private: - SSL *ssl_; - spdylay_session *session_; + SpdySession *spdy_; evbuffer *request_body_buf_; + StreamData *sd_; }; } // namespace shrpx diff --git a/src/shrpx_spdy_session.cc b/src/shrpx_spdy_session.cc new file mode 100644 index 0000000..8a650a6 --- /dev/null +++ b/src/shrpx_spdy_session.cc @@ -0,0 +1,822 @@ +/* + * Spdylay - SPDY Library + * + * Copyright (c) 2012 Tatsuhiro Tsujikawa + * + * Permission is hereby granted, free of charge, to any person obtaining + * a copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +#include "shrpx_spdy_session.h" + +#include +#include + +#include + +#include + +#include "shrpx_upstream.h" +#include "shrpx_downstream.h" +#include "shrpx_config.h" +#include "shrpx_error.h" +#include "shrpx_spdy_downstream_connection.h" +#include "shrpx_client_handler.h" +#include "util.h" + +using namespace spdylay; + +namespace shrpx { + +SpdySession::SpdySession(event_base *evbase, SSL_CTX *ssl_ctx) + : evbase_(evbase), + ssl_ctx_(ssl_ctx), + ssl_(0), + session_(0), + bev_(0), + state_(DISCONNECTED), + notified_(false), + wrbev_(0), + rdbev_(0) +{} + +SpdySession::~SpdySession() +{ + disconnect(); +} + +int SpdySession::disconnect() +{ + if(ENABLE_LOG) { + LOG(INFO) << "Downstream spdy session disconnecting " << this; + } + spdylay_session_del(session_); + session_ = 0; + + int fd = -1; + if(ssl_) { + fd = SSL_get_fd(ssl_); + SSL_shutdown(ssl_); + } + if(bev_) { + bufferevent_disable(bev_, EV_READ | EV_WRITE); + bufferevent_free(bev_); + bev_ = 0; + } + if(ssl_) { + SSL_free(ssl_); + } + ssl_ = 0; + + if(fd != -1) { + shutdown(fd, SHUT_WR); + close(fd); + } + + notified_ = false; + state_ = DISCONNECTED; + + // Delete all client handler associated to Downstream. When deleting + // SpdyDownstreamConnection, it calls this object's + // remove_downstream_connection(). So first dump them in vector and + // iterate and delete them. + std::vector vec(dconns_.begin(), dconns_.end()); + for(size_t i = 0; i < vec.size(); ++i) { + remove_downstream_connection(vec[i]); + delete vec[i]->get_client_handler(); + } + dconns_.clear(); + for(std::set::iterator i = streams_.begin(), + eoi = streams_.end(); i != eoi; ++i) { + delete *i; + } + streams_.clear(); + return 0; +} + +namespace { +void notify_readcb(bufferevent *bev, void *arg) +{ + int rv; + SpdySession *spdy = reinterpret_cast(arg); + spdy->clear_notify(); + switch(spdy->get_state()) { + case SpdySession::DISCONNECTED: + rv = spdy->initiate_connection(); + if(rv != 0) { + LOG(FATAL) << "Downstream spdy could not initiate connection " << spdy; + DIE(); + } + break; + case SpdySession::CONNECTED: + rv = spdy->send(); + if(rv != 0) { + spdy->disconnect(); + } + break; + } +} +} // namespace + +namespace { +void notify_eventcb(bufferevent *bev, short events, void *arg) +{ + // TODO should DIE()? + if(events & BEV_EVENT_EOF) { + LOG(ERROR) << "Connection to main thread lost: eof"; + } + if(events & BEV_EVENT_ERROR) { + LOG(ERROR) << "Connection to main thread lost: network error"; + } +} +} // namespace + +int SpdySession::init_notification() +{ + int rv; + int sockpair[2]; + rv = socketpair(AF_UNIX, SOCK_STREAM, 0, sockpair); + if(rv == -1) { + LOG(FATAL) << "socketpair() failed: " << strerror(errno); + return -1; + } + wrbev_ = bufferevent_socket_new(evbase_, sockpair[0], + BEV_OPT_CLOSE_ON_FREE| + BEV_OPT_DEFER_CALLBACKS); + if(!wrbev_) { + LOG(FATAL) << "bufferevent_socket_new() failed"; + for(int i = 0; i < 2; ++i) { + close(sockpair[i]); + } + return -1; + } + rdbev_ = bufferevent_socket_new(evbase_, sockpair[1], + BEV_OPT_CLOSE_ON_FREE| + BEV_OPT_DEFER_CALLBACKS); + if(!rdbev_) { + LOG(FATAL) << "bufferevent_socket_new() failed"; + close(sockpair[1]); + return -1; + } + bufferevent_enable(rdbev_, EV_READ); + bufferevent_setcb(rdbev_, notify_readcb, 0, notify_eventcb, this); + return 0; +} + +namespace { +void readcb(bufferevent *bev, void *ptr) +{ + int rv; + SpdySession *spdy = reinterpret_cast(ptr); + rv = spdy->on_read(); + if(rv != 0) { + spdy->disconnect(); + } +} +} // namespace + +namespace { +void writecb(bufferevent *bev, void *ptr) +{ + int rv; + SpdySession *spdy = reinterpret_cast(ptr); + rv = spdy->on_write(); + if(rv != 0) { + spdy->disconnect(); + } +} +} // namespace + +namespace { +void eventcb(bufferevent *bev, short events, void *ptr) +{ + SpdySession *spdy = reinterpret_cast(ptr); + if(events & BEV_EVENT_CONNECTED) { + if(ENABLE_LOG) { + LOG(INFO) << "Downstream spdy connection established. " << spdy; + } + spdy->connected(); + if(spdy->on_connect() != 0) { + spdy->disconnect(); + return; + } + } else if(events & BEV_EVENT_EOF) { + if(ENABLE_LOG) { + LOG(INFO) << "Downstream spdy EOF. " << spdy; + } + spdy->disconnect(); + } else if(events & (BEV_EVENT_ERROR | BEV_EVENT_TIMEOUT)) { + if(ENABLE_LOG) { + LOG(INFO) << "Downstream spdy error/timeout. " << spdy; + } + spdy->disconnect(); + } +} +} // namespace + +int SpdySession::initiate_connection() +{ + int rv; + assert(state_ == DISCONNECTED); + if(ENABLE_LOG) { + LOG(INFO) << "Downstream spdy initiating connection " << this; + } + + ssl_ = SSL_new(ssl_ctx_); + if(!ssl_) { + LOG(ERROR) << "SSL_new() failed: " + << ERR_error_string(ERR_get_error(), NULL); + return -1; + } + bev_ = bufferevent_openssl_socket_new(evbase_, -1, ssl_, + BUFFEREVENT_SSL_CONNECTING, + BEV_OPT_DEFER_CALLBACKS); + rv = bufferevent_socket_connect + (bev_, + // TODO maybe not thread-safe? + const_cast(&get_config()->downstream_addr.sa), + get_config()->downstream_addrlen); + if(rv != 0) { + bufferevent_free(bev_); + bev_ = 0; + return SHRPX_ERR_NETWORK; + } + if(ENABLE_LOG) { + LOG(INFO) << "Connecting to downstream " << this; + } + + bufferevent_setwatermark(bev_, EV_READ, 0, SHRPX_READ_WARTER_MARK); + bufferevent_enable(bev_, EV_READ); + bufferevent_setcb(bev_, readcb, writecb, eventcb, this); + // No timeout for SPDY session + + state_ = CONNECTING; + return 0; +} + +void SpdySession::connected() +{ + state_ = CONNECTED; +} + +void SpdySession::add_downstream_connection(SpdyDownstreamConnection *dconn) +{ + dconns_.insert(dconn); +} + +void SpdySession::remove_downstream_connection(SpdyDownstreamConnection *dconn) +{ + dconns_.erase(dconn); + dconn->detach_stream_data(); +} + +void SpdySession::remove_stream_data(StreamData *sd) +{ + streams_.erase(sd); + if(sd->dconn) { + sd->dconn->detach_stream_data(); + } + delete sd; +} + +int SpdySession::submit_request(SpdyDownstreamConnection *dconn, + uint8_t pri, const char **nv, + const spdylay_data_provider *data_prd) +{ + assert(state_ == CONNECTED); + StreamData *sd = new StreamData(); + int rv = spdylay_submit_request(session_, pri, nv, data_prd, sd); + if(rv == 0) { + dconn->attach_stream_data(sd); + streams_.insert(sd); + } else { + LOG(FATAL) << "spdylay_submit_request() failed: " + << spdylay_strerror(rv); + delete sd; + return -1; + } + return 0; +} + +int SpdySession::submit_rst_stream(SpdyDownstreamConnection *docnn, + int32_t stream_id, uint32_t status_code) +{ + assert(state_ == CONNECTED); + int rv = spdylay_submit_rst_stream(session_, stream_id, status_code); + if(rv != 0) { + LOG(FATAL) << "spdylay_submit_rst_stream() failed: " + << spdylay_strerror(rv); + return -1; + } + return 0; +} + +int SpdySession::resume_data(SpdyDownstreamConnection *dconn) +{ + assert(state_ == CONNECTED); + Downstream *downstream = dconn->get_downstream(); + int rv = spdylay_session_resume_data(session_, + downstream->get_downstream_stream_id()); + switch(rv) { + case 0: + case SPDYLAY_ERR_INVALID_ARGUMENT: + return 0; + default: + LOG(FATAL) << "spdylay_resume_session() failed: " + << spdylay_strerror(rv); + return -1; + } +} + +namespace { +void call_downstream_readcb(SpdySession *spdy, Downstream *downstream) +{ + Upstream *upstream = downstream->get_upstream(); + if(upstream) { + (upstream->get_downstream_readcb()) + (spdy->get_bev(), + downstream->get_downstream_connection()); + } +} +} // namespace + +namespace { +ssize_t send_callback(spdylay_session *session, + const uint8_t *data, size_t len, int flags, + void *user_data) +{ + int rv; + SpdySession *spdy = reinterpret_cast(user_data); + + bufferevent *bev = spdy->get_bev(); + evbuffer *output = bufferevent_get_output(bev); + // Check buffer length and return WOULDBLOCK if it is large enough. + if(evbuffer_get_length(output) > Downstream::OUTPUT_UPPER_THRES) { + return SPDYLAY_ERR_WOULDBLOCK; + } + + rv = evbuffer_add(output, data, len); + if(rv == -1) { + LOG(FATAL) << "evbuffer_add() failed"; + return SPDYLAY_ERR_CALLBACK_FAILURE; + } else { + return len; + } +} +} // namespace + +namespace { +ssize_t recv_callback(spdylay_session *session, + uint8_t *data, size_t len, int flags, void *user_data) +{ + SpdySession *spdy = reinterpret_cast(user_data); + + bufferevent *bev = spdy->get_bev(); + evbuffer *input = bufferevent_get_input(bev); + int nread = evbuffer_remove(input, data, len); + if(nread == -1) { + return SPDYLAY_ERR_CALLBACK_FAILURE; + } else if(nread == 0) { + return SPDYLAY_ERR_WOULDBLOCK; + } else { + return nread; + } +} +} // namespace + +namespace { +void on_stream_close_callback +(spdylay_session *session, int32_t stream_id, spdylay_status_code status_code, + void *user_data) +{ + int rv; + if(ENABLE_LOG) { + LOG(INFO) << "Downstream spdy Stream " << stream_id << " is being closed"; + } + SpdySession *spdy = reinterpret_cast(user_data); + StreamData *sd; + sd = reinterpret_cast + (spdylay_session_get_stream_user_data(session, stream_id)); + if(sd == 0) { + // We might get this close callback when pushed streams are + // closed. + return; + } + SpdyDownstreamConnection* dconn = sd->dconn; + if(dconn) { + Downstream *downstream = dconn->get_downstream(); + if(downstream && downstream->get_downstream_stream_id() == stream_id) { + Upstream *upstream = downstream->get_upstream(); + if(status_code == SPDYLAY_OK) { + downstream->set_response_state(Downstream::MSG_COMPLETE); + rv = upstream->on_downstream_body_complete(downstream); + if(rv != 0) { + downstream->set_response_state(Downstream::MSG_RESET); + } + } else { + downstream->set_response_state(Downstream::MSG_RESET); + } + call_downstream_readcb(spdy, downstream); + // dconn may be deleted + } + } + // The life time of StreamData ends here + spdy->remove_stream_data(sd); +} +} // namespace + +namespace { +void on_ctrl_recv_callback +(spdylay_session *session, spdylay_frame_type type, spdylay_frame *frame, + void *user_data) +{ + int rv; + SpdySession *spdy = reinterpret_cast(user_data); + StreamData *sd; + Downstream *downstream; + switch(type) { + case SPDYLAY_SYN_STREAM: + if(ENABLE_LOG) { + LOG(INFO) << "Downstream spdy received upstream SYN_STREAM stream_id=" + << frame->syn_stream.stream_id; + } + // We just respond pushed stream with RST_STREAM. + spdylay_submit_rst_stream(session, frame->syn_stream.stream_id, + SPDYLAY_REFUSED_STREAM); + break; + case SPDYLAY_RST_STREAM: + sd = reinterpret_cast + (spdylay_session_get_stream_user_data(session, + frame->rst_stream.stream_id)); + if(sd && sd->dconn) { + downstream = sd->dconn->get_downstream(); + if(downstream && + downstream->get_downstream_stream_id() == + frame->rst_stream.stream_id) { + // If we got RST_STREAM, just flag MSG_RESET to indicate + // upstream connection must be terminated. + downstream->set_response_state(Downstream::MSG_RESET); + call_downstream_readcb(spdy, downstream); + } + } + break; + case SPDYLAY_SYN_REPLY: { + sd = reinterpret_cast + (spdylay_session_get_stream_user_data(session, + frame->syn_reply.stream_id)); + if(!sd || !sd->dconn) { + spdylay_submit_rst_stream(session, frame->syn_reply.stream_id, + SPDYLAY_INTERNAL_ERROR); + break; + } + downstream = sd->dconn->get_downstream(); + if(!downstream || + downstream->get_downstream_stream_id() != frame->syn_reply.stream_id) { + spdylay_submit_rst_stream(session, frame->syn_reply.stream_id, + SPDYLAY_INTERNAL_ERROR); + break; + } + char **nv = frame->syn_reply.nv; + const char *status = 0; + const char *version = 0; + const char *content_length = 0; + for(size_t i = 0; nv[i]; i += 2) { + if(strcmp(nv[i], ":status") == 0) { + unsigned int code = strtoul(nv[i+1], 0, 10); + downstream->set_response_http_status(code); + status = nv[i+1]; + } else if(strcmp(nv[i], ":version") == 0) { + // We assume for now that most version is HTTP/1.1 from + // SPDY. So just check if it is HTTP/1.0 and then set response + // minor as so. + downstream->set_response_major(1); + if(util::strieq(nv[i+1], "HTTP/1.0")) { + downstream->set_response_minor(0); + } else { + downstream->set_response_minor(1); + } + version = nv[i+1]; + } else if(nv[i][0] != ':') { + if(strcmp(nv[i], "content-length") == 0) { + content_length = nv[i+1]; + } + downstream->add_response_header(nv[i], nv[i+1]); + } + } + if(!status || !version) { + spdylay_submit_rst_stream(session, frame->syn_reply.stream_id, + SPDYLAY_PROTOCOL_ERROR); + downstream->set_response_state(Downstream::MSG_RESET); + call_downstream_readcb(spdy, downstream); + return; + } + + if(!content_length && downstream->get_request_method() != "HEAD" && + downstream->get_request_method() != "CONNECT") { + unsigned int status; + status = downstream->get_response_http_status(); + if(!((100 <= status && status <= 199) || status == 204 || + status == 304)) { + // In SPDY, we are supporsed not to receive + // transfer-encoding. + downstream->add_response_header("transfer-encoding", "chunked"); + } + } + + if(ENABLE_LOG) { + std::stringstream ss; + for(size_t i = 0; nv[i]; i += 2) { + ss << nv[i] << ": " << nv[i+1] << "\n"; + } + LOG(INFO) << "Downstream spdy response headers id=" + << frame->syn_reply.stream_id + << "\n" << ss.str(); + } + + Upstream *upstream = downstream->get_upstream(); + downstream->set_response_state(Downstream::HEADER_COMPLETE); + rv = upstream->on_downstream_header_complete(downstream); + if(rv != 0) { + spdylay_submit_rst_stream(session, frame->syn_reply.stream_id, + SPDYLAY_PROTOCOL_ERROR); + downstream->set_response_state(Downstream::MSG_RESET); + } + call_downstream_readcb(spdy, downstream); + break; + } + default: + break; + } +} +} // namespace + +namespace { +void on_data_chunk_recv_callback(spdylay_session *session, + uint8_t flags, int32_t stream_id, + const uint8_t *data, size_t len, + void *user_data) +{ + int rv; + SpdySession *spdy = reinterpret_cast(user_data); + StreamData *sd; + sd = reinterpret_cast + (spdylay_session_get_stream_user_data(session, stream_id)); + if(!sd || !sd->dconn) { + spdylay_submit_rst_stream(session, stream_id, SPDYLAY_INTERNAL_ERROR); + return; + } + Downstream *downstream = sd->dconn->get_downstream(); + if(!downstream || downstream->get_downstream_stream_id() != stream_id) { + spdylay_submit_rst_stream(session, stream_id, SPDYLAY_INTERNAL_ERROR); + return; + } + // TODO No manual flow control at the moment. + Upstream *upstream = downstream->get_upstream(); + rv = upstream->on_downstream_body(downstream, data, len); + if(rv != 0) { + spdylay_submit_rst_stream(session, stream_id, SPDYLAY_INTERNAL_ERROR); + downstream->set_response_state(Downstream::MSG_RESET); + } + call_downstream_readcb(spdy, downstream); +} +} // namespace + +namespace { +void before_ctrl_send_callback(spdylay_session *session, + spdylay_frame_type type, + spdylay_frame *frame, + void *user_data) +{ + if(type == SPDYLAY_SYN_STREAM) { + StreamData *sd; + sd = reinterpret_cast + (spdylay_session_get_stream_user_data(session, + frame->syn_stream.stream_id)); + if(!sd || !sd->dconn) { + spdylay_submit_rst_stream(session, frame->syn_stream.stream_id, + SPDYLAY_CANCEL); + return; + } + Downstream *downstream = sd->dconn->get_downstream(); + if(downstream) { + downstream->set_downstream_stream_id(frame->syn_stream.stream_id); + } else { + spdylay_submit_rst_stream(session, frame->syn_stream.stream_id, + SPDYLAY_CANCEL); + } + } +} +} // namespace + +namespace { +void on_ctrl_not_send_callback(spdylay_session *session, + spdylay_frame_type type, + spdylay_frame *frame, + int error_code, void *user_data) +{ + LOG(WARNING) << "Failed to send control frame type=" << type << ", " + << "error_code=" << error_code << ":" + << spdylay_strerror(error_code); + if(type == SPDYLAY_SYN_STREAM) { + // To avoid stream hanging around, flag Downstream::MSG_RESET and + // terminate the upstream and downstream connections. + SpdySession *spdy = reinterpret_cast(user_data); + StreamData *sd; + sd = reinterpret_cast + (spdylay_session_get_stream_user_data(session, + frame->syn_stream.stream_id)); + if(!sd) { + return; + } + if(sd->dconn) { + Downstream *downstream = sd->dconn->get_downstream(); + if(!downstream || + downstream->get_downstream_stream_id() != + frame->syn_stream.stream_id) { + return; + } + downstream->set_response_state(Downstream::MSG_RESET); + call_downstream_readcb(spdy, downstream); + } + spdy->remove_stream_data(sd); + } +} +} // namespace + +namespace { +void on_ctrl_recv_parse_error_callback(spdylay_session *session, + spdylay_frame_type type, + const uint8_t *head, size_t headlen, + const uint8_t *payload, + size_t payloadlen, int error_code, + void *user_data) +{ + if(ENABLE_LOG) { + LOG(INFO) << "Failed to parse received control frame. type=" << type + << ", error_code=" << error_code << ":" + << spdylay_strerror(error_code); + } +} +} // namespace + +namespace { +void on_unknown_ctrl_recv_callback(spdylay_session *session, + const uint8_t *head, size_t headlen, + const uint8_t *payload, size_t payloadlen, + void *user_data) +{ + if(ENABLE_LOG) { + LOG(INFO) << "Received unknown control frame."; + } +} +} // namespace + +int SpdySession::on_connect() +{ + int rv; + const unsigned char *next_proto = 0; + unsigned int next_proto_len; + SSL_get0_next_proto_negotiated(ssl_, &next_proto, &next_proto_len); + + if(ENABLE_LOG) { + std::string proto(next_proto, next_proto+next_proto_len); + LOG(INFO) << "Downstream negotiated next protocol: " << proto; + } + uint16_t version = spdylay_npn_get_version(next_proto, next_proto_len); + if(!version) { + return -1; + } + spdylay_session_callbacks callbacks; + memset(&callbacks, 0, sizeof(callbacks)); + callbacks.send_callback = send_callback; + callbacks.recv_callback = recv_callback; + callbacks.on_stream_close_callback = on_stream_close_callback; + callbacks.on_ctrl_recv_callback = on_ctrl_recv_callback; + callbacks.on_data_chunk_recv_callback = on_data_chunk_recv_callback; + callbacks.before_ctrl_send_callback = before_ctrl_send_callback; + callbacks.on_ctrl_not_send_callback = on_ctrl_not_send_callback; + callbacks.on_ctrl_recv_parse_error_callback = + on_ctrl_recv_parse_error_callback; + callbacks.on_unknown_ctrl_recv_callback = on_unknown_ctrl_recv_callback; + + rv = spdylay_session_client_new(&session_, version, &callbacks, this); + if(rv != 0) { + return -1; + } + + // TODO Send initial window size when manual flow control is + // implemented. + spdylay_settings_entry entry[1]; + entry[0].settings_id = SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS; + entry[0].value = get_config()->spdy_max_concurrent_streams; + entry[0].flags = SPDYLAY_ID_FLAG_SETTINGS_NONE; + rv = spdylay_submit_settings + (session_, SPDYLAY_FLAG_SETTINGS_NONE, + entry, sizeof(entry)/sizeof(spdylay_settings_entry)); + if(rv != 0) { + return -1; + } + rv = send(); + if(rv != 0) { + return -1; + } + + // submit pending request + for(std::set::iterator i = dconns_.begin(), + eoi = dconns_.end(); i != eoi; ++i) { + if((*i)->push_request_headers() != 0) { + return -1; + } + } + return 0; +} + +int SpdySession::on_read() +{ + int rv = 0; + if((rv = spdylay_session_recv(session_)) < 0) { + if(rv != SPDYLAY_ERR_EOF) { + LOG(ERROR) << "spdylay_session_recv() returned error: " + << spdylay_strerror(rv); + } + } else if((rv = spdylay_session_send(session_)) < 0) { + LOG(ERROR) << "spdylay_session_send() returned error: " + << spdylay_strerror(rv); + } + // if(rv == 0) { + // if(spdylay_session_want_read(session_) == 0 && + // spdylay_session_want_write(session_) == 0) { + // if(ENABLE_LOG) { + // LOG(INFO) << "No more read/write for this SPDY session"; + // } + // rv = -1; + // } + // } + return rv; +} + +int SpdySession::on_write() +{ + return send(); +} + +int SpdySession::send() +{ + int rv = 0; + if((rv = spdylay_session_send(session_)) < 0) { + LOG(ERROR) << "spdylay_session_send() returned error: " + << spdylay_strerror(rv); + } + // if(rv == 0) { + // if(spdylay_session_want_read(session_) == 0 && + // spdylay_session_want_write(session_) == 0) { + // if(ENABLE_LOG) { + // LOG(INFO) << "No more read/write for this SPDY session"; + // } + // rv = -1; + // } + // } + return rv; +} + +void SpdySession::clear_notify() +{ + evbuffer *input = bufferevent_get_output(rdbev_); + evbuffer_drain(input, evbuffer_get_length(input)); + notified_ = false; +} + +void SpdySession::notify() +{ + if(!notified_) { + bufferevent_write(wrbev_, "1", 1); + notified_ = true; + } +} + +bufferevent* SpdySession::get_bev() const +{ + return bev_; +} + +int SpdySession::get_state() const +{ + return state_; +} + +} // namespace shrpx diff --git a/src/shrpx_spdy_session.h b/src/shrpx_spdy_session.h new file mode 100644 index 0000000..83a4ef9 --- /dev/null +++ b/src/shrpx_spdy_session.h @@ -0,0 +1,106 @@ +/* + * Spdylay - SPDY Library + * + * Copyright (c) 2012 Tatsuhiro Tsujikawa + * + * Permission is hereby granted, free of charge, to any person obtaining + * a copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +#ifndef SHRPX_SPDY_SESSION_H +#define SHRPX_SPDY_SESSION_H + +#include "shrpx.h" + +#include + +#include + +#include +#include + +#include + +namespace shrpx { + +class SpdyDownstreamConnection; + +struct StreamData { + SpdyDownstreamConnection *dconn; +}; + +class SpdySession { +public: + SpdySession(event_base *evbase, SSL_CTX *ssl_ctx); + ~SpdySession(); + + int init_notification(); + + int disconnect(); + int initiate_connection(); + void connected(); + + void add_downstream_connection(SpdyDownstreamConnection *dconn); + void remove_downstream_connection(SpdyDownstreamConnection *dconn); + + void remove_stream_data(StreamData *sd); + + int submit_request(SpdyDownstreamConnection *dconn, + uint8_t pri, const char **nv, + const spdylay_data_provider *data_prd); + + int submit_rst_stream(SpdyDownstreamConnection *docnn, + int32_t stream_id, uint32_t status_code); + + int resume_data(SpdyDownstreamConnection *dconn); + + int on_connect(); + + int on_read(); + int on_write(); + int send(); + + void clear_notify(); + void notify(); + + bufferevent* get_bev() const; + + int get_state() const; + + enum { + DISCONNECTED, + CONNECTING, + CONNECTED + }; +private: + event_base *evbase_; + SSL_CTX *ssl_ctx_; + SSL *ssl_; + spdylay_session *session_; + bufferevent *bev_; + std::set dconns_; + std::set streams_; + int state_; + bool notified_; + bufferevent *wrbev_; + bufferevent *rdbev_; +}; + +} // namespace shrpx + +#endif // SHRPX_SPDY_SESSION_H diff --git a/src/shrpx_spdy_upstream.cc b/src/shrpx_spdy_upstream.cc index c1a158b..1919670 100644 --- a/src/shrpx_spdy_upstream.cc +++ b/src/shrpx_spdy_upstream.cc @@ -460,7 +460,7 @@ void spdy_downstream_readcb(bufferevent *bev, void *ptr) delete downstream; return; } - int rv = downstream->parse_http_response(); + int rv = downstream->on_read(); if(rv != 0) { if(ENABLE_LOG) { LOG(INFO) << "Downstream HTTP parser failure"; @@ -846,7 +846,9 @@ int32_t SpdyUpstream::get_initial_window_size() const void SpdyUpstream::pause_read(IOCtrlReason reason) {} -void SpdyUpstream::resume_read(IOCtrlReason reason) -{} +int SpdyUpstream::resume_read(IOCtrlReason reason) +{ + return 0; +} } // namespace shrpx diff --git a/src/shrpx_spdy_upstream.h b/src/shrpx_spdy_upstream.h index 1c0e108..5f67396 100644 --- a/src/shrpx_spdy_upstream.h +++ b/src/shrpx_spdy_upstream.h @@ -59,7 +59,7 @@ public: int error_reply(Downstream *downstream, int status_code); virtual void pause_read(IOCtrlReason reason); - virtual void resume_read(IOCtrlReason reason); + virtual int resume_read(IOCtrlReason reason); virtual int on_downstream_header_complete(Downstream *downstream); virtual int on_downstream_body(Downstream *downstream, diff --git a/src/shrpx_ssl.cc b/src/shrpx_ssl.cc index 4bb8cbc..89f58d3 100644 --- a/src/shrpx_ssl.cc +++ b/src/shrpx_ssl.cc @@ -221,9 +221,6 @@ ClientHandler* accept_ssl_connection(event_base *evbase, SSL_CTX *ssl_ctx, BUFFEREVENT_SSL_ACCEPTING, BEV_OPT_DEFER_CALLBACKS); } ClientHandler *client_handler = new ClientHandler(bev, fd, ssl, host); - if(get_config()->client_mode) { - client_handler->set_ssl_client_ctx(ssl_ctx); - } return client_handler; } else { LOG(ERROR) << "getnameinfo() failed: " << gai_strerror(rv); diff --git a/src/shrpx_thread_event_receiver.cc b/src/shrpx_thread_event_receiver.cc index 16a74ce..dbcd692 100644 --- a/src/shrpx_thread_event_receiver.cc +++ b/src/shrpx_thread_event_receiver.cc @@ -29,11 +29,13 @@ #include "shrpx_ssl.h" #include "shrpx_log.h" #include "shrpx_client_handler.h" +#include "shrpx_spdy_session.h" namespace shrpx { -ThreadEventReceiver::ThreadEventReceiver(SSL_CTX *ssl_ctx) - : ssl_ctx_(ssl_ctx) +ThreadEventReceiver::ThreadEventReceiver(SSL_CTX *ssl_ctx, SpdySession *spdy) + : ssl_ctx_(ssl_ctx), + spdy_(spdy) {} ThreadEventReceiver::~ThreadEventReceiver() @@ -56,6 +58,7 @@ void ThreadEventReceiver::on_read(bufferevent *bev) &wev.client_addr.sa, wev.client_addrlen); if(client_handler) { + client_handler->set_spdy_session(spdy_); if(ENABLE_LOG) { LOG(INFO) << "ClientHandler " << client_handler << " created"; } diff --git a/src/shrpx_thread_event_receiver.h b/src/shrpx_thread_event_receiver.h index 02fc6ee..c76ad33 100644 --- a/src/shrpx_thread_event_receiver.h +++ b/src/shrpx_thread_event_receiver.h @@ -35,6 +35,8 @@ namespace shrpx { +class SpdySession; + struct WorkerEvent { evutil_socket_t client_fd; sockaddr_union client_addr; @@ -43,11 +45,14 @@ struct WorkerEvent { class ThreadEventReceiver { public: - ThreadEventReceiver(SSL_CTX *ssl_ctx); + ThreadEventReceiver(SSL_CTX *ssl_ctx, SpdySession *spdy); ~ThreadEventReceiver(); void on_read(bufferevent *bev); private: SSL_CTX *ssl_ctx_; + // Shared SPDY session for each thread. NULL if not client mode. Not + // deleted by this object. + SpdySession *spdy_; }; } // namespace shrpx diff --git a/src/shrpx_upstream.h b/src/shrpx_upstream.h index 8e77660..c4e4cf8 100644 --- a/src/shrpx_upstream.h +++ b/src/shrpx_upstream.h @@ -53,7 +53,7 @@ public: virtual int on_downstream_body_complete(Downstream *downstream) = 0; virtual void pause_read(IOCtrlReason reason) = 0; - virtual void resume_read(IOCtrlReason reason) = 0; + virtual int resume_read(IOCtrlReason reason) = 0; }; } // namespace shrpx diff --git a/src/shrpx_worker.cc b/src/shrpx_worker.cc index b2d4bf9..f9d12da 100644 --- a/src/shrpx_worker.cc +++ b/src/shrpx_worker.cc @@ -33,6 +33,7 @@ #include "shrpx_ssl.h" #include "shrpx_thread_event_receiver.h" #include "shrpx_log.h" +#include "shrpx_spdy_session.h" namespace shrpx { @@ -72,7 +73,14 @@ void Worker::run() event_base *evbase = event_base_new(); bufferevent *bev = bufferevent_socket_new(evbase, fd_, BEV_OPT_DEFER_CALLBACKS); - ThreadEventReceiver *receiver = new ThreadEventReceiver(ssl_ctx_); + SpdySession *spdy = 0; + if(get_config()->client_mode) { + spdy = new SpdySession(evbase, ssl_ctx_); + if(spdy->init_notification() == -1) { + DIE(); + } + } + ThreadEventReceiver *receiver = new ThreadEventReceiver(ssl_ctx_, spdy); bufferevent_enable(bev, EV_READ); bufferevent_setcb(bev, readcb, 0, eventcb, receiver);