diff --git a/examples/shrpx.cc b/examples/shrpx.cc index bbc2922..cb8a0ee 100644 --- a/examples/shrpx.cc +++ b/examples/shrpx.cc @@ -343,6 +343,7 @@ void print_help(std::ostream& out) << " Default: WARNING\n" << " -D, --daemon Run in a background. If -D is used, the\n" << " current working directory is changed to '/'.\n" + << " -s, --spdy-proxy SSL/SPDY proxy mode.\n" << " -h, --help Print this help.\n" << std::endl; } @@ -367,11 +368,12 @@ int main(int argc, char **argv) {"spdy-max-concurrent-streams", required_argument, 0, 'c' }, {"log-level", required_argument, 0, 'L' }, {"daemon", no_argument, 0, 'D' }, + {"spdy-proxy", no_argument, 0, 's' }, {"help", no_argument, 0, 'h' }, {0, 0, 0, 0 } }; int option_index = 0; - int c = getopt_long(argc, argv, "DL:b:c:f:n:h", long_options, + int c = getopt_long(argc, argv, "DL:sb:c:f:n:h", long_options, &option_index); if(c == -1) { break; @@ -413,6 +415,9 @@ int main(int argc, char **argv) case 'c': mod_config()->spdy_max_concurrent_streams = strtol(optarg, 0, 10); break; + case 's': + mod_config()->spdy_proxy = true; + break; case '?': exit(EXIT_FAILURE); default: diff --git a/examples/shrpx_client_handler.cc b/examples/shrpx_client_handler.cc index 535cc46..fa6bd94 100644 --- a/examples/shrpx_client_handler.cc +++ b/examples/shrpx_client_handler.cc @@ -254,4 +254,10 @@ DownstreamConnection* ClientHandler::get_downstream_connection() } } +size_t ClientHandler::get_pending_write_length() +{ + evbuffer *output = bufferevent_get_output(bev_); + return evbuffer_get_length(output); +} + } // namespace shrpx diff --git a/examples/shrpx_client_handler.h b/examples/shrpx_client_handler.h index 424d3b9..5875597 100644 --- a/examples/shrpx_client_handler.h +++ b/examples/shrpx_client_handler.h @@ -58,6 +58,7 @@ public: void pool_downstream_connection(DownstreamConnection *dconn); void remove_downstream_connection(DownstreamConnection *dconn); DownstreamConnection* get_downstream_connection(); + size_t get_pending_write_length(); private: bufferevent *bev_; SSL *ssl_; diff --git a/examples/shrpx_config.cc b/examples/shrpx_config.cc index 4c741fc..0a6bbcd 100644 --- a/examples/shrpx_config.cc +++ b/examples/shrpx_config.cc @@ -40,7 +40,8 @@ Config::Config() downstream_hostport(0), downstream_addrlen(0), num_worker(0), - spdy_max_concurrent_streams(0) + spdy_max_concurrent_streams(0), + spdy_proxy(false) {} namespace { diff --git a/examples/shrpx_config.h b/examples/shrpx_config.h index 97157ad..7d8f9fb 100644 --- a/examples/shrpx_config.h +++ b/examples/shrpx_config.h @@ -67,6 +67,7 @@ struct Config { timeval downstream_idle_read_timeout; size_t num_worker; size_t spdy_max_concurrent_streams; + bool spdy_proxy; Config(); }; diff --git a/examples/shrpx_downstream.cc b/examples/shrpx_downstream.cc index ecd7158..512a496 100644 --- a/examples/shrpx_downstream.cc +++ b/examples/shrpx_downstream.cc @@ -166,7 +166,7 @@ void Downstream::set_last_request_header_value(const std::string& value) item.second = value; check_transfer_encoding_chunked(&chunked_request_, item); check_expect_100_continue(&request_expect_100_continue_, item); - check_connection_close(&request_connection_close_, item); + //check_connection_close(&request_connection_close_, item); } void Downstream::set_request_method(const std::string& method) @@ -174,11 +174,21 @@ void Downstream::set_request_method(const std::string& method) request_method_ = method; } +const std::string& Downstream::get_request_method() const +{ + return request_method_; +} + void Downstream::set_request_path(const std::string& path) { request_path_ = path; } +const std::string& Downstream::get_request_path() const +{ + return request_path_; +} + void Downstream::set_request_major(int major) { request_major_ = major; @@ -264,14 +274,10 @@ int Downstream::push_request_headers() hdrs += request_path_; hdrs += " "; hdrs += "HTTP/1.1\r\n"; - hdrs += "Host: "; - hdrs += get_config()->downstream_hostport; - hdrs += "\r\n"; std::string via_value; for(Headers::const_iterator i = request_headers_.begin(); i != request_headers_.end(); ++i) { if(util::strieq((*i).first.c_str(), "X-Forwarded-Proto") || - util::strieq((*i).first.c_str(), "host") || util::strieq((*i).first.c_str(), "keep-alive") || util::strieq((*i).first.c_str(), "connection") || util::strieq((*i).first.c_str(), "proxy-connection")) { @@ -298,13 +304,20 @@ int Downstream::push_request_headers() if(request_connection_close_) { hdrs += "Connection: close\r\n"; } - if(!xff_found) { - hdrs += "X-Forwarded-For: "; - hdrs += upstream_->get_client_handler()->get_ipaddr(); + if(request_method_ != "CONNECT") { + if(!xff_found) { + hdrs += "X-Forwarded-For: "; + hdrs += upstream_->get_client_handler()->get_ipaddr(); + hdrs += "\r\n"; + } + hdrs += "X-Forwarded-Proto: "; + if(util::istartsWith(request_path_, "http:")) { + hdrs += "http"; + } else { + hdrs += "https"; + } hdrs += "\r\n"; } - hdrs += "X-Forwarded-Proto: https\r\n"; - hdrs += "Via: "; hdrs += via_value; if(!via_value.empty()) { @@ -388,7 +401,7 @@ void Downstream::set_last_response_header_value(const std::string& value) Headers::value_type &item = response_headers_.back(); item.second = value; check_transfer_encoding_chunked(&chunked_response_, item); - check_connection_close(&response_connection_close_, item); + //check_connection_close(&response_connection_close_, item); } unsigned int Downstream::get_response_http_status() const @@ -421,6 +434,11 @@ int Downstream::get_response_minor() const return response_minor_; } +int Downstream::get_response_version() const +{ + return response_major_*100+response_minor_; +} + bool Downstream::get_chunked_response() const { return chunked_response_; @@ -431,6 +449,11 @@ bool Downstream::get_response_connection_close() const return response_connection_close_; } +void Downstream::set_response_connection_close(bool f) +{ + response_connection_close_ = f; +} + namespace { int htp_hdrs_completecb(htparser *htp) { @@ -439,6 +462,7 @@ int htp_hdrs_completecb(htparser *htp) downstream->set_response_http_status(htparser_get_status(htp)); downstream->set_response_major(htparser_get_major(htp)); downstream->set_response_minor(htparser_get_minor(htp)); + downstream->set_response_connection_close(!htparser_should_keep_alive(htp)); downstream->set_response_state(Downstream::HEADER_COMPLETE); downstream->get_upstream()->on_downstream_header_complete(downstream); return 0; @@ -470,6 +494,7 @@ int htp_bodycb(htparser *htp, const char *data, size_t len) { Downstream *downstream; downstream = reinterpret_cast(htparser_get_userdata(htp)); + downstream->get_upstream()->on_downstream_body (downstream, reinterpret_cast(data), len); return 0; @@ -481,6 +506,12 @@ int htp_body_completecb(htparser *htp) { Downstream *downstream; downstream = reinterpret_cast(htparser_get_userdata(htp)); + + if(downstream->tunnel_established()) { + // For tunneling, we remove timeouts. + downstream->get_downstream_connection()->remove_timeouts(); + } + downstream->set_response_state(Downstream::MSG_COMPLETE); downstream->get_upstream()->on_downstream_body_complete(downstream); return 0; @@ -515,9 +546,11 @@ int Downstream::parse_http_response() bufferevent *bev = dconn_->get_bev(); evbuffer *input = bufferevent_get_input(bev); unsigned char *mem = evbuffer_pullup(input, -1); + size_t nread = htparser_run(response_htp_, &htp_hooks, reinterpret_cast(mem), evbuffer_get_length(input)); + evbuffer_drain(input, nread); if(htparser_get_error(response_htp_) == htparse_error_none) { return 0; @@ -587,4 +620,9 @@ void Downstream::set_recv_window_size(int32_t new_size) recv_window_size_ = new_size; } +bool Downstream::tunnel_established() const +{ + return request_method_ == "CONNECT" && response_http_status_ == 200; +} + } // namespace shrpx diff --git a/examples/shrpx_downstream.h b/examples/shrpx_downstream.h index fb1605e..436f50a 100644 --- a/examples/shrpx_downstream.h +++ b/examples/shrpx_downstream.h @@ -66,12 +66,16 @@ public: int32_t get_recv_window_size() const; void inc_recv_window_size(int32_t amount); void set_recv_window_size(int32_t new_size); + // Returns true if tunnel connection has been established. + bool tunnel_established() const; // downstream request API const Headers& get_request_headers() const; void add_request_header(const std::string& name, const std::string& value); void set_last_request_header_value(const std::string& value); void set_request_method(const std::string& method); + const std::string& get_request_method() const; void set_request_path(const std::string& path); + const std::string& get_request_path() const; void set_request_major(int major); void set_request_minor(int minor); int get_request_major() const; @@ -103,8 +107,10 @@ public: void set_response_minor(int minor); int get_response_major() const; int get_response_minor() const; + int get_response_version() const; bool get_chunked_response() const; bool get_response_connection_close() const; + void set_response_connection_close(bool f); int parse_http_response(); void set_response_state(int state); int get_response_state() const; diff --git a/examples/shrpx_downstream_connection.cc b/examples/shrpx_downstream_connection.cc index ca0c675..7e22e68 100644 --- a/examples/shrpx_downstream_connection.cc +++ b/examples/shrpx_downstream_connection.cc @@ -110,6 +110,13 @@ void DownstreamConnection::start_waiting_response() } } +void DownstreamConnection::remove_timeouts() +{ + if(bev_) { + bufferevent_set_timeouts(bev_, 0, 0); + } +} + namespace { // Gets called when DownstreamConnection is pooled in ClientHandler. void idle_eventcb(bufferevent *bev, short events, void *arg) diff --git a/examples/shrpx_downstream_connection.h b/examples/shrpx_downstream_connection.h index 76a66c6..beb4b9d 100644 --- a/examples/shrpx_downstream_connection.h +++ b/examples/shrpx_downstream_connection.h @@ -47,6 +47,7 @@ public: ClientHandler* get_client_handler(); Downstream* get_downstream(); void start_waiting_response(); + void remove_timeouts(); private: ClientHandler *client_handler_; bufferevent *bev_; diff --git a/examples/shrpx_https_upstream.cc b/examples/shrpx_https_upstream.cc index b037c64..b982027 100644 --- a/examples/shrpx_https_upstream.cc +++ b/examples/shrpx_https_upstream.cc @@ -71,11 +71,11 @@ void HttpsUpstream::reset_current_header_length() namespace { int htp_msg_begin(htparser *htp) { - if(ENABLE_LOG) { - LOG(INFO) << "Upstream https request start"; - } HttpsUpstream *upstream; upstream = reinterpret_cast(htparser_get_userdata(htp)); + if(ENABLE_LOG) { + LOG(INFO) << "Upstream https request start " << upstream; + } upstream->reset_current_header_length(); Downstream *downstream = new Downstream(upstream, 0, 0); upstream->add_downstream(downstream); @@ -111,14 +111,6 @@ int htp_hdrs_begincb(htparser *htp) if(ENABLE_LOG) { LOG(INFO) << "Upstream https request headers start"; } - HttpsUpstream *upstream; - upstream = reinterpret_cast(htparser_get_userdata(htp)); - Downstream *downstream = upstream->get_last_downstream(); - - int version = htparser_get_major(htp)*100 + htparser_get_minor(htp); - if(version < 101) { - downstream->set_request_connection_close(true); - } return 0; } } // namespace @@ -148,16 +140,18 @@ int htp_hdr_valcb(htparser *htp, const char *data, size_t len) namespace { int htp_hdrs_completecb(htparser *htp) { - if(ENABLE_LOG) { - LOG(INFO) << "Upstream https request headers complete"; - } HttpsUpstream *upstream; upstream = reinterpret_cast(htparser_get_userdata(htp)); + if(ENABLE_LOG) { + LOG(INFO) << "Upstream https request headers complete " << upstream; + } Downstream *downstream = upstream->get_last_downstream(); downstream->set_request_major(htparser_get_major(htp)); downstream->set_request_minor(htparser_get_minor(htp)); + downstream->set_request_connection_close(!htparser_should_keep_alive(htp)); + DownstreamConnection *dconn; dconn = upstream->get_client_handler()->get_downstream_connection(); @@ -238,7 +232,9 @@ int HttpsUpstream::on_read() { bufferevent *bev = handler_->get_bev(); evbuffer *input = bufferevent_get_input(bev); + unsigned char *mem = evbuffer_pullup(input, -1); + int nread = htparser_run(htp_, &htp_hooks, reinterpret_cast(mem), evbuffer_get_length(input)); @@ -347,10 +343,20 @@ void https_downstream_readcb(bufferevent *bev, void *ptr) dconn->detach_downstream(downstream); } if(downstream->get_request_state() == Downstream::MSG_COMPLETE) { - upstream->pop_downstream(); - delete downstream; - // Process next HTTP request - upstream->resume_read(SHRPX_MSG_BLOCK); + ClientHandler *handler = upstream->get_client_handler(); + if(handler->get_should_close_after_write() && + handler->get_pending_write_length() == 0) { + // If all upstream response body has already written out to + // the peer, we cannot use writecb for ClientHandler. In + // this case, we just delete handler here. + delete handler; + return; + } else { + upstream->pop_downstream(); + delete downstream; + // Process next HTTP request + upstream->resume_read(SHRPX_MSG_BLOCK); + } } } else { ClientHandler *handler = upstream->get_client_handler(); @@ -411,10 +417,20 @@ void https_downstream_eventcb(bufferevent *bev, short events, void *ptr) if(downstream->get_response_state() == Downstream::HEADER_COMPLETE) { // Server may indicate the end of the request by EOF if(ENABLE_LOG) { - LOG(INFO) << "Assuming downstream content-length is 0 byte"; + LOG(INFO) << "Downstream body was ended by EOF"; } upstream->on_downstream_body_complete(downstream); - //downstream->set_response_state(Downstream::MSG_COMPLETE); + downstream->set_response_state(Downstream::MSG_COMPLETE); + + ClientHandler *handler = upstream->get_client_handler(); + if(handler->get_should_close_after_write() && + handler->get_pending_write_length() == 0) { + // If all upstream response body has already written out to + // the peer, we cannot use writecb for ClientHandler. In this + // case, we just delete handler here. + delete handler; + return; + } } else if(downstream->get_response_state() == Downstream::MSG_COMPLETE) { // Nothing to do } else { @@ -522,8 +538,11 @@ int HttpsUpstream::on_downstream_header_complete(Downstream *downstream) LOG(INFO) << "Downstream on_downstream_header_complete"; } std::string via_value; - std::string location; - std::string hdrs = "HTTP/1.1 "; + char temp[16]; + snprintf(temp, sizeof(temp), "HTTP/%d.%d ", + downstream->get_response_major(), + downstream->get_response_minor()); + std::string hdrs = temp; hdrs += http::get_status_string(downstream->get_response_http_status()); hdrs += "\r\n"; for(Headers::const_iterator i = downstream->get_response_headers().begin(); @@ -534,8 +553,6 @@ int HttpsUpstream::on_downstream_header_complete(Downstream *downstream) // These are ignored } else if(util::strieq((*i).first.c_str(), "via")) { via_value = (*i).second; - } else if(util::strieq((*i).first.c_str(), "location")) { - location = (*i).second; } else { hdrs += (*i).first; hdrs += ": "; @@ -543,17 +560,17 @@ int HttpsUpstream::on_downstream_header_complete(Downstream *downstream) hdrs += "\r\n"; } } - if(!location.empty()) { - hdrs += "Location: "; - hdrs += http::modify_location_header_value(location); - hdrs += "\r\n"; - } - if(get_client_handler()->get_should_close_after_write()) { - hdrs += "Connection: close\r\n"; - } else if(downstream->get_request_major() == 1 && - downstream->get_request_minor() == 0) { - hdrs += "Connection: Keep-Alive\r\n"; + + if(downstream->get_response_version() < 101) { + if(!downstream->get_response_connection_close()) { + hdrs += "Connection: Keep-Alive\r\n"; + } + } else { + if(downstream->get_response_connection_close()) { + hdrs += "Connection: close\r\n"; + } } + hdrs += "Via: "; hdrs += via_value; if(!via_value.empty()) { @@ -598,8 +615,8 @@ int HttpsUpstream::on_downstream_body_complete(Downstream *downstream) if(ENABLE_LOG) { LOG(INFO) << "Downstream on_downstream_body_complete"; } - if(downstream->get_request_connection_close()) { - ClientHandler *handler = downstream->get_upstream()->get_client_handler(); + if(downstream->get_response_connection_close()) { + ClientHandler *handler = get_client_handler(); handler->set_should_close_after_write(true); } return 0; diff --git a/examples/shrpx_spdy_upstream.cc b/examples/shrpx_spdy_upstream.cc index ab974c7..a626ea5 100644 --- a/examples/shrpx_spdy_upstream.cc +++ b/examples/shrpx_spdy_upstream.cc @@ -103,7 +103,8 @@ void on_stream_close_callback downstream->set_request_state(Downstream::STREAM_CLOSED); if(downstream->get_response_state() == Downstream::MSG_COMPLETE) { // At this point, downstream response was read - if(!downstream->get_response_connection_close()) { + if(!downstream->tunnel_established() && + !downstream->get_response_connection_close()) { // Keep-alive DownstreamConnection *dconn; dconn = downstream->get_downstream_connection(); @@ -146,15 +147,39 @@ void on_ctrl_recv_callback downstream->init_response_body_buf(); char **nv = frame->syn_stream.nv; + const char *path = 0; + const char *scheme = 0; + const char *host = 0; + const char *method = 0; for(size_t i = 0; nv[i]; i += 2) { if(strcmp(nv[i], ":path") == 0) { - downstream->set_request_path(nv[i+1]); + path = nv[i+1]; + } else if(strcmp(nv[i], ":scheme") == 0) { + scheme = nv[i+1]; } else if(strcmp(nv[i], ":method") == 0) { + method = nv[i+1]; downstream->set_request_method(nv[i+1]); + } else if(strcmp(nv[i], ":host") == 0) { + host = nv[i+1]; } else if(nv[i][0] != ':') { downstream->add_request_header(nv[i], nv[i+1]); } } + if(!path || !host || !method) { + upstream->rst_stream(downstream, SPDYLAY_INTERNAL_ERROR); + return; + } + if(get_config()->spdy_proxy && scheme) { + std::string reqpath = scheme; + reqpath += "://"; + reqpath += host; + reqpath += path; + downstream->set_request_path(reqpath); + } else { + downstream->set_request_path(path); + } + + downstream->add_request_header("host", host); downstream->add_request_header("X-Forwarded-Spdy", "true"); if(ENABLE_LOG) { @@ -411,11 +436,15 @@ void spdy_downstream_eventcb(bufferevent *bev, short events, void *ptr) if(downstream->get_response_state() == Downstream::HEADER_COMPLETE) { // Server may indicate the end of the request by EOF if(ENABLE_LOG) { - LOG(INFO) << "Assuming downstream content-length is 0 byte"; + LOG(INFO) << "Downstream body was ended by EOF"; } downstream->set_response_state(Downstream::MSG_COMPLETE); upstream->on_downstream_body_complete(downstream); - } else if(downstream->get_response_state() != Downstream::MSG_COMPLETE) { + upstream->send(); + } else if(downstream->get_response_state() == Downstream::MSG_COMPLETE) { + // For SSL tunneling? + upstream->rst_stream(downstream, SPDYLAY_INTERNAL_ERROR); + } else { // If stream was not closed, then we set MSG_COMPLETE and let // on_stream_close_callback delete downstream. upstream->error_reply(downstream, 502); @@ -437,7 +466,10 @@ void spdy_downstream_eventcb(bufferevent *bev, short events, void *ptr) downstream->set_downstream_connection(0); delete dconn; dconn = 0; - if(downstream->get_response_state() != Downstream::MSG_COMPLETE) { + if(downstream->get_response_state() == Downstream::MSG_COMPLETE) { + // For SSL tunneling + upstream->rst_stream(downstream, SPDYLAY_INTERNAL_ERROR); + } else { if(downstream->get_response_state() == Downstream::HEADER_COMPLETE) { upstream->rst_stream(downstream, SPDYLAY_INTERNAL_ERROR); } else { @@ -499,7 +531,9 @@ ssize_t spdy_data_read_callback(spdylay_session *session, evbuffer *body = downstream->get_response_body_buf(); assert(body); int nread = evbuffer_remove(body, buf, length); - if(nread == 0 && + // For tunneling, DATA stream is endless + if(!downstream->tunnel_established() && + nread == 0 && downstream->get_response_state() == Downstream::MSG_COMPLETE) { *eof = 1; } @@ -590,7 +624,6 @@ int SpdyUpstream::on_downstream_header_complete(Downstream *downstream) const char **nv = new const char*[nheader * 2 + 6 + 1]; size_t hdidx = 0; std::string via_value; - std::string location; nv[hdidx++] = ":status"; nv[hdidx++] = http::get_status_string(downstream->get_response_http_status()); nv[hdidx++] = ":version"; @@ -604,20 +637,11 @@ int SpdyUpstream::on_downstream_header_complete(Downstream *downstream) // These are ignored } else if(util::strieq((*i).first.c_str(), "via")) { via_value = (*i).second; - } else if(util::strieq((*i).first.c_str(), "location")) { - location = (*i).second; } else { nv[hdidx++] = (*i).first.c_str(); nv[hdidx++] = (*i).second.c_str(); } } - if(!location.empty()) { - nv[hdidx++] = "location"; - // Assign location to store the result. Otherwise we lose the - // return value. - location = http::modify_location_header_value(location); - nv[hdidx++] = location.c_str(); - } if(!via_value.empty()) { via_value += ", "; } @@ -631,7 +655,9 @@ int SpdyUpstream::on_downstream_header_complete(Downstream *downstream) for(size_t i = 0; nv[i]; i += 2) { ss << nv[i] << ": " << nv[i+1] << "\n"; } - LOG(INFO) << "Upstream spdy response headers\n" << ss.str(); + LOG(INFO) << "Upstream spdy response headers id=" + << downstream->get_stream_id() << "\n" + << ss.str(); } spdylay_data_provider data_prd; data_prd.source.ptr = downstream; diff --git a/examples/shrpx_ssl.cc b/examples/shrpx_ssl.cc index b1196c6..68954f2 100644 --- a/examples/shrpx_ssl.cc +++ b/examples/shrpx_ssl.cc @@ -104,7 +104,7 @@ SSL_CTX* create_ssl_context() SSL_VERIFY_FAIL_IF_NO_PEER_CERT, verify_callback); } - // We speaks "http/1.1", "spdy/2" and "spdy/3". + // We speak "http/1.1", "spdy/2" and "spdy/3". proto_list[0] = 6; memcpy(&proto_list[1], "spdy/3", 6); proto_list[7] = 6;