From 8f1c49e75c78876cea701b27ee4822dd5df31817 Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Wed, 6 Jun 2012 01:26:04 +0900 Subject: [PATCH] Added multi thread support --- examples/Makefile.am | 3 + examples/shrpx.cc | 136 +++-------------------- examples/shrpx.h | 2 + examples/shrpx_client_handler.cc | 17 +-- examples/shrpx_config.cc | 3 +- examples/shrpx_config.h | 1 + examples/shrpx_https_upstream.cc | 27 ++--- examples/shrpx_listen_handler.cc | 80 ++++++++++---- examples/shrpx_listen_handler.h | 14 ++- examples/shrpx_spdy_upstream.cc | 31 +++--- examples/shrpx_ssl.cc | 141 ++++++++++++++++++++++++ examples/shrpx_ssl.h | 51 +++++++++ examples/shrpx_thread_event_receiver.cc | 69 ++++++++++++ examples/shrpx_thread_event_receiver.h | 55 +++++++++ examples/shrpx_worker.cc | 93 ++++++++++++++++ examples/shrpx_worker.h | 50 +++++++++ 16 files changed, 592 insertions(+), 181 deletions(-) create mode 100644 examples/shrpx_ssl.cc create mode 100644 examples/shrpx_ssl.h create mode 100644 examples/shrpx_thread_event_receiver.cc create mode 100644 examples/shrpx_thread_event_receiver.h create mode 100644 examples/shrpx_worker.cc create mode 100644 examples/shrpx_worker.h diff --git a/examples/Makefile.am b/examples/Makefile.am index d62e8d6..3b304be 100644 --- a/examples/Makefile.am +++ b/examples/Makefile.am @@ -78,6 +78,9 @@ shrpx_SOURCES = ${HELPER_OBJECTS} ${HELPER_HFILES} \ shrpx_log.cc shrpx_log.h \ shrpx_http.cc shrpx_http.h \ shrpx_io_control.cc shrpx_io_control.h \ + shrpx_ssl.cc shrpx_ssl.h \ + shrpx_thread_event_receiver.cc shrpx_thread_event_receiver.h \ + shrpx_worker.cc shrpx_worker.h \ htparse/htparse.c htparse/htparse.h noinst_PROGRAMS = spdycli diff --git a/examples/shrpx.cc b/examples/shrpx.cc index 3200f73..f01a39f 100644 --- a/examples/shrpx.cc +++ b/examples/shrpx.cc @@ -30,9 +30,7 @@ #include #include - #include -#include #include #include @@ -46,82 +44,6 @@ namespace shrpx { -namespace { -std::pair next_proto; -unsigned char proto_list[23]; -} // namespace - -namespace { -int next_proto_cb(SSL *s, const unsigned char **data, unsigned int *len, - void *arg) -{ - std::pair *next_proto = - reinterpret_cast* >(arg); - *data = next_proto->first; - *len = next_proto->second; - return SSL_TLSEXT_ERR_OK; -} -} // namespace - -namespace { -int verify_callback(int preverify_ok, X509_STORE_CTX *ctx) -{ - // We don't verify the client certificate. Just request it for the - // testing purpose. - return 1; -} -} // namespace - -namespace { -SSL_CTX* create_ssl_ctx() -{ - // TODO lock function - SSL_CTX *ssl_ctx; - ssl_ctx = SSL_CTX_new(SSLv23_server_method()); - if(!ssl_ctx) { - std::cerr << ERR_error_string(ERR_get_error(), 0) << std::endl; - return NULL; - } - SSL_CTX_set_options(ssl_ctx, - SSL_OP_ALL | SSL_OP_NO_SSLv2 | SSL_OP_NO_COMPRESSION); - SSL_CTX_set_mode(ssl_ctx, SSL_MODE_AUTO_RETRY); - SSL_CTX_set_mode(ssl_ctx, SSL_MODE_RELEASE_BUFFERS); - if(SSL_CTX_use_PrivateKey_file(ssl_ctx, - get_config()->private_key_file, - SSL_FILETYPE_PEM) != 1) { - std::cerr << "SSL_CTX_use_PrivateKey_file failed." << std::endl; - return NULL; - } - if(SSL_CTX_use_certificate_file(ssl_ctx, get_config()->cert_file, - SSL_FILETYPE_PEM) != 1) { - std::cerr << "SSL_CTX_use_certificate_file failed." << std::endl; - return NULL; - } - if(SSL_CTX_check_private_key(ssl_ctx) != 1) { - std::cerr << "SSL_CTX_check_private_key failed." << std::endl; - return NULL; - } - if(get_config()->verify_client) { - SSL_CTX_set_verify(ssl_ctx, - SSL_VERIFY_PEER | SSL_VERIFY_CLIENT_ONCE | - SSL_VERIFY_FAIL_IF_NO_PEER_CERT, - verify_callback); - } - // We speaks "http/1.1", "spdy/2" and "spdy/3". - proto_list[0] = 6; - memcpy(&proto_list[1], "spdy/3", 6); - proto_list[7] = 6; - memcpy(&proto_list[8], "spdy/2", 6); - proto_list[14] = 8; - memcpy(&proto_list[15], "http/1.1", 8); - - next_proto.first = proto_list; - next_proto.second = sizeof(proto_list); - SSL_CTX_set_next_protos_advertised_cb(ssl_ctx, next_proto_cb, &next_proto); - return ssl_ctx; -} -} // namespace - namespace { void ssl_acceptcb(evconnlistener *listener, int fd, sockaddr *addr, int addrlen, void *arg) @@ -144,45 +66,20 @@ int cache_downstream_host_address() #ifdef AI_ADDRCONFIG hints.ai_flags |= AI_ADDRCONFIG; #endif // AI_ADDRCONFIG - addrinfo *res, *rp; + addrinfo *res; rv = getaddrinfo(get_config()->downstream_host, service, &hints, &res); if(rv != 0) { - LOG(ERROR) << "getaddrinfo: " << gai_strerror(rv); - return -1; - } - for(rp = res; rp; rp = rp->ai_next) { - int fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); - if(fd == -1) { - continue; - } - rv = connect(fd, rp->ai_addr, rp->ai_addrlen); - close(fd); - if(rv == -1) { - continue; - } - break; - } - if(rp == 0 && res) { - LOG(INFO) << "Using first returned address for downstream " - << get_config()->downstream_host - << ", port " - << get_config()->downstream_port; - rp = res; - } - if(rp != 0) { - memcpy(&mod_config()->downstream_addr, rp->ai_addr, rp->ai_addrlen); - mod_config()->downstream_addrlen = rp->ai_addrlen; + LOG(FATAL) << "Unable to get downstream address: " << gai_strerror(rv); + DIE(); } + LOG(INFO) << "Using first returned address for downstream " + << get_config()->downstream_host + << ", port " + << get_config()->downstream_port; + memcpy(&mod_config()->downstream_addr, res->ai_addr, res->ai_addrlen); + mod_config()->downstream_addrlen = res->ai_addrlen; freeaddrinfo(res); - if(rp == 0) { - LOG(ERROR) << "No usable address found for downstream " - << get_config()->downstream_host - << ", port " - << get_config()->downstream_port; - return -1; - } else { - return 0; - } + return 0; } } // namespace @@ -245,12 +142,11 @@ evconnlistener* create_evlistener(ListenHandler *handler) namespace { int event_loop() { - SSL_CTX *ssl_ctx = create_ssl_ctx(); - if(ssl_ctx == NULL) { - return -1; - } event_base *evbase = event_base_new(); - ListenHandler *listener_handler = new ListenHandler(evbase, ssl_ctx); + ListenHandler *listener_handler = new ListenHandler(evbase); + if(get_config()->num_worker > 1) { + listener_handler->create_worker_thread(get_config()->num_worker); + } evconnlistener *evlistener = create_evlistener(listener_handler); if(evlistener == NULL) { return -1; @@ -261,7 +157,6 @@ int event_loop() event_base_loop(evbase, 0); evconnlistener_free(evlistener); - SSL_CTX_free(ssl_ctx); return 0; } } // namespace @@ -311,6 +206,9 @@ int main(int argc, char **argv) if(cache_downstream_host_address() == -1) { exit(EXIT_FAILURE); } + + mod_config()->num_worker = 4; + event_loop(); return 0; } diff --git a/examples/shrpx.h b/examples/shrpx.h index 2258b43..50e458e 100644 --- a/examples/shrpx.h +++ b/examples/shrpx.h @@ -29,6 +29,8 @@ # include #endif // HAVE_CONFIG_H +#include + #include "shrpx_log.h" #define DIE() \ diff --git a/examples/shrpx_client_handler.cc b/examples/shrpx_client_handler.cc index 4139c3a..b62716e 100644 --- a/examples/shrpx_client_handler.cc +++ b/examples/shrpx_client_handler.cc @@ -49,6 +49,7 @@ namespace { void upstream_writecb(bufferevent *bev, void *arg) { ClientHandler *handler = reinterpret_cast(arg); + // We actually depend on write low-warter mark == 0. if(handler->get_should_close_after_write()) { delete handler; } else { @@ -65,19 +66,19 @@ void upstream_eventcb(bufferevent *bev, short events, void *arg) bool finish = false; if(events & BEV_EVENT_EOF) { if(ENABLE_LOG) { - LOG(INFO) << " SSL/TLS handshake EOF"; + LOG(INFO) << "Upstream handshake EOF"; } finish = true; } if(events & BEV_EVENT_ERROR) { if(ENABLE_LOG) { - LOG(INFO) << " SSL/TLS network error"; + LOG(INFO) << "Upstream network error"; } finish = true; } if(events & BEV_EVENT_TIMEOUT) { if(ENABLE_LOG) { - LOG(INFO) << "SPDY upstream SSL/TLS time out"; + LOG(INFO) << "Upstream time out"; } finish = true; } @@ -86,8 +87,7 @@ void upstream_eventcb(bufferevent *bev, short events, void *arg) } else { if(events & BEV_EVENT_CONNECTED) { if(ENABLE_LOG) { - LOG(INFO) << "Connected Handler " - << handler; + LOG(INFO) << "Upstream connected. handler " << handler; } handler->set_bev_cb(upstream_readcb, upstream_writecb, upstream_eventcb); handler->validate_next_proto(); @@ -167,7 +167,7 @@ int ClientHandler::validate_next_proto() if(next_proto) { std::string proto(next_proto, next_proto+next_proto_len); if(ENABLE_LOG) { - LOG(INFO) << " The negotiated next protocol: " << proto; + LOG(INFO) << "Upstream negotiated next protocol: " << proto; } uint16_t version = spdylay_npn_get_version(next_proto, next_proto_len); if(version) { @@ -177,9 +177,12 @@ int ClientHandler::validate_next_proto() } } else { if(ENABLE_LOG) { - LOG(INFO) << " No proto negotiated"; + LOG(INFO) << "No proto negotiated."; } } + if(ENABLE_LOG) { + LOG(INFO) << "Use HTTP/1.1"; + } HttpsUpstream *https_upstream = new HttpsUpstream(this); upstream_ = https_upstream; return 0; diff --git a/examples/shrpx_config.cc b/examples/shrpx_config.cc index 234f7eb..1d801f7 100644 --- a/examples/shrpx_config.cc +++ b/examples/shrpx_config.cc @@ -38,7 +38,8 @@ Config::Config() downstream_host(0), downstream_port(0), downstream_hostport(0), - downstream_addrlen(0) + downstream_addrlen(0), + num_worker(0) {} namespace { diff --git a/examples/shrpx_config.h b/examples/shrpx_config.h index 7063081..22e3899 100644 --- a/examples/shrpx_config.h +++ b/examples/shrpx_config.h @@ -64,6 +64,7 @@ struct Config { timeval spdy_upstream_write_timeout; timeval downstream_read_timeout; timeval downstream_write_timeout; + size_t num_worker; Config(); }; diff --git a/examples/shrpx_https_upstream.cc b/examples/shrpx_https_upstream.cc index 707b82b..276b81a 100644 --- a/examples/shrpx_https_upstream.cc +++ b/examples/shrpx_https_upstream.cc @@ -71,7 +71,7 @@ namespace { int htp_msg_begin(htparser *htp) { if(ENABLE_LOG) { - LOG(INFO) << ":: request start"; + LOG(INFO) << "Upstream https request start"; } HttpsUpstream *upstream; upstream = reinterpret_cast(htparser_get_userdata(htp)); @@ -108,7 +108,7 @@ namespace { int htp_hdrs_begincb(htparser *htp) { if(ENABLE_LOG) { - LOG(INFO) << ":: request headers start"; + LOG(INFO) << "Upstream https request headers start"; } HttpsUpstream *upstream; upstream = reinterpret_cast(htparser_get_userdata(htp)); @@ -148,7 +148,7 @@ namespace { int htp_hdrs_completecb(htparser *htp) { if(ENABLE_LOG) { - LOG(INFO) << ":: request headers complete"; + LOG(INFO) << "Upstream https request headers complete"; } HttpsUpstream *upstream; upstream = reinterpret_cast(htparser_get_userdata(htp)); @@ -178,7 +178,7 @@ namespace { int htp_msg_completecb(htparser *htp) { if(ENABLE_LOG) { - LOG(INFO) << ":: request complete"; + LOG(INFO) << "Upstream https request complete"; } HttpsUpstream *upstream; upstream = reinterpret_cast(htparser_get_userdata(htp)); @@ -237,7 +237,7 @@ int HttpsUpstream::on_read() } } else if(htperr != htparse_error_none) { if(ENABLE_LOG) { - LOG(INFO) << " http parse failure: " + LOG(INFO) << "Upstream http parse failure: " << htparser_get_strerror(htp_); } get_client_handler()->set_should_close_after_write(true); @@ -328,18 +328,19 @@ void https_downstream_eventcb(bufferevent *bev, short events, void *ptr) upstream = static_cast(downstream->get_upstream()); if(events & BEV_EVENT_CONNECTED) { if(ENABLE_LOG) { - LOG(INFO) << " Connection established. " << downstream; + LOG(INFO) << "Downstream connection established. downstream " + << downstream; } } if(events & BEV_EVENT_EOF) { if(ENABLE_LOG) { - LOG(INFO) << " EOF stream_id=" + LOG(INFO) << "Downstream EOF. stream_id=" << downstream->get_stream_id(); } if(downstream->get_response_state() == Downstream::HEADER_COMPLETE) { // Server may indicate the end of the request by EOF if(ENABLE_LOG) { - LOG(INFO) << " Assuming content-length is 0 byte"; + LOG(INFO) << "Assuming downstream content-length is 0 byte"; } upstream->on_downstream_body_complete(downstream); //downstream->set_response_state(Downstream::MSG_COMPLETE); @@ -348,7 +349,7 @@ void https_downstream_eventcb(bufferevent *bev, short events, void *ptr) } else { // error if(ENABLE_LOG) { - LOG(INFO) << " Treated as error"; + LOG(INFO) << "Treated as downstream error"; } upstream->error_reply(502); } @@ -357,7 +358,7 @@ void https_downstream_eventcb(bufferevent *bev, short events, void *ptr) upstream->resume_read(SHRPX_MSG_BLOCK); } else if(events & (BEV_EVENT_ERROR | BEV_EVENT_TIMEOUT)) { if(ENABLE_LOG) { - LOG(INFO) << " error/timeout. " << downstream; + LOG(INFO) << "Downstream error/timeout. " << downstream; } if(downstream->get_response_state() == Downstream::INITIAL) { int status; @@ -439,7 +440,7 @@ Downstream* HttpsUpstream::get_last_downstream() int HttpsUpstream::on_downstream_header_complete(Downstream *downstream) { if(ENABLE_LOG) { - LOG(INFO) << " on_downstream_header_complete"; + LOG(INFO) << "Downstream on_downstream_header_complete"; } std::string hdrs = "HTTP/1.1 "; hdrs += http::get_status_string(downstream->get_response_http_status()); @@ -467,7 +468,7 @@ int HttpsUpstream::on_downstream_header_complete(Downstream *downstream) } hdrs += "\r\n"; if(ENABLE_LOG) { - LOG(INFO) << ":: Response headers\n" << hdrs; + LOG(INFO) << "Upstream https response headers\n" << hdrs; } evbuffer *output = bufferevent_get_output(handler_->get_bev()); evbuffer_add(output, hdrs.c_str(), hdrs.size()); @@ -496,7 +497,7 @@ int HttpsUpstream::on_downstream_body_complete(Downstream *downstream) evbuffer_add(output, "0\r\n\r\n", 5); } if(ENABLE_LOG) { - LOG(INFO) << " on_downstream_body_complete"; + LOG(INFO) << "Downstream on_downstream_body_complete"; } if(downstream->get_request_connection_close()) { ClientHandler *handler = downstream->get_upstream()->get_client_handler(); diff --git a/examples/shrpx_listen_handler.cc b/examples/shrpx_listen_handler.cc index 8be62a1..685321e 100644 --- a/examples/shrpx_listen_handler.cc +++ b/examples/shrpx_listen_handler.cc @@ -24,48 +24,82 @@ */ #include "shrpx_listen_handler.h" +#include + +#include + #include #include "shrpx_client_handler.h" +#include "shrpx_thread_event_receiver.h" +#include "shrpx_ssl.h" +#include "shrpx_worker.h" namespace shrpx { -ListenHandler::ListenHandler(event_base *evbase, SSL_CTX *ssl_ctx) +ListenHandler::ListenHandler(event_base *evbase) : evbase_(evbase), - ssl_ctx_(ssl_ctx) + ssl_ctx_(ssl::create_ssl_context()), + worker_round_robin_cnt_(0), + workers_(0), + num_worker_(0) {} ListenHandler::~ListenHandler() {} +void ListenHandler::create_worker_thread(size_t num) +{ + workers_ = new WorkerInfo[num]; + num_worker_ = 0; + for(size_t i = 0; i < num; ++i) { + int rv; + pthread_t thread; + pthread_attr_t attr; + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + WorkerInfo *info = &workers_[num_worker_]; + rv = socketpair(AF_UNIX, SOCK_STREAM, 0, info->sv); + if(rv == -1) { + LOG(ERROR) << "socketpair() failed: " << strerror(errno); + continue; + } + rv = pthread_create(&thread, &attr, start_threaded_worker, &info->sv[1]); + if(rv != 0) { + LOG(ERROR) << "pthread_create() failed: " << strerror(rv); + for(size_t j = 0; j < 2; ++j) { + close(info->sv[j]); + } + continue; + } + bufferevent *bev = bufferevent_socket_new(evbase_, info->sv[0], + BEV_OPT_DEFER_CALLBACKS); + info->bev = bev; + if(ENABLE_LOG) { + LOG(INFO) << "Created thread#" << num_worker_; + } + ++num_worker_; + } +} + int ListenHandler::accept_connection(evutil_socket_t fd, sockaddr *addr, int addrlen) { if(ENABLE_LOG) { LOG(INFO) << " Accepted connection. fd=" << fd; } - char host[NI_MAXHOST]; - int rv; - rv = getnameinfo(addr, addrlen, host, sizeof(host), 0, 0, NI_NUMERICHOST); - if(rv == 0) { - SSL *ssl = SSL_new(ssl_ctx_); - bufferevent *bev = bufferevent_openssl_socket_new - (evbase_, fd, ssl, - BUFFEREVENT_SSL_ACCEPTING, - BEV_OPT_DEFER_CALLBACKS); - if(bev == NULL) { - if(ENABLE_LOG) { - LOG(ERROR) << " bufferevent_openssl_socket_new failed"; - } - close(fd); - } else { - /*ClientHandler *client_handler =*/ new ClientHandler(bev, ssl, host); - } + if(num_worker_ == 0) { + /*ClientHandler* client = */ + ssl::accept_ssl_connection(evbase_, ssl_ctx_, fd, addr, addrlen); } else { - if(ENABLE_LOG) { - LOG(INFO) << " getnameinfo failed"; - } - close(fd); + size_t idx = worker_round_robin_cnt_ % num_worker_; + ++worker_round_robin_cnt_; + WorkerEvent wev; + wev.client_fd = fd; + memcpy(&wev.client_addr, addr, addrlen); + wev.client_addrlen = addrlen; + evbuffer *output = bufferevent_get_output(workers_[idx].bev); + evbuffer_add(output, &wev, sizeof(wev)); } return 0; } diff --git a/examples/shrpx_listen_handler.h b/examples/shrpx_listen_handler.h index 4b3e31f..ccae81c 100644 --- a/examples/shrpx_listen_handler.h +++ b/examples/shrpx_listen_handler.h @@ -31,19 +31,29 @@ #include #include + #include namespace shrpx { +struct WorkerInfo { + int sv[2]; + bufferevent *bev; +}; + class ListenHandler { public: - ListenHandler(event_base *evbase, SSL_CTX *ssl_ctx); + ListenHandler(event_base *evbase); ~ListenHandler(); int accept_connection(evutil_socket_t fd, sockaddr *addr, int addrlen); + void create_worker_thread(size_t num); event_base* get_evbase() const; private: event_base *evbase_; - SSL_CTX *ssl_ctx_; + SSL_CTX *ssl_ctx_; + unsigned int worker_round_robin_cnt_; + WorkerInfo *workers_; + size_t num_worker_; }; } // namespace shrpx diff --git a/examples/shrpx_spdy_upstream.cc b/examples/shrpx_spdy_upstream.cc index 7f5dd3c..8cc39d9 100644 --- a/examples/shrpx_spdy_upstream.cc +++ b/examples/shrpx_spdy_upstream.cc @@ -90,8 +90,7 @@ void on_stream_close_callback void *user_data) { if(ENABLE_LOG) { - LOG(INFO) << ":: Stream " << stream_id - << " is being closed"; + LOG(INFO) << "Upstream spdy Stream " << stream_id << " is being closed"; } SpdyUpstream *upstream = reinterpret_cast(user_data); Downstream *downstream = upstream->get_downstream_queue()->find(stream_id); @@ -114,7 +113,7 @@ void on_ctrl_recv_callback switch(type) { case SPDYLAY_SYN_STREAM: { if(ENABLE_LOG) { - LOG(INFO) << ":: Received upstream SYN_STREAM stream_id=" + LOG(INFO) << "Upstream spdy received upstream SYN_STREAM stream_id=" << frame->syn_stream.stream_id; } Downstream *downstream = new Downstream(upstream, @@ -139,14 +138,14 @@ void on_ctrl_recv_callback for(size_t i = 0; nv[i]; i += 2) { ss << nv[i] << ": " << nv[i+1] << "\n"; } - LOG(INFO) << ":: Request headers:\n" << ss.str(); + LOG(INFO) << "Upstream spdy request headers:\n" << ss.str(); } downstream->push_request_headers(); downstream->set_request_state(Downstream::HEADER_COMPLETE); if(frame->syn_stream.hd.flags & SPDYLAY_CTRL_FLAG_FIN) { if(ENABLE_LOG) { - LOG(INFO) << ":: " + LOG(INFO) << "Upstream spdy " << "Setting Downstream::MSG_COMPLETE for Downstream " << downstream; } @@ -169,7 +168,7 @@ void on_data_chunk_recv_callback(spdylay_session *session, void *user_data) { if(ENABLE_LOG) { - LOG(INFO) << ":: Received upstream DATA data stream_id=" + LOG(INFO) << "Upstream spdy received upstream DATA data stream_id=" << stream_id; } SpdyUpstream *upstream = reinterpret_cast(user_data); @@ -178,8 +177,8 @@ void on_data_chunk_recv_callback(spdylay_session *session, downstream->push_upload_data_chunk(data, len); if(flags & SPDYLAY_DATA_FLAG_FIN) { if(ENABLE_LOG) { - LOG(INFO) << ":: " - << "Setting Downstream::MSG_COMPLETE for Downstream " + LOG(INFO) << "Upstream spdy " + << "setting Downstream::MSG_COMPLETE for Downstream " << downstream; } downstream->set_request_state(Downstream::MSG_COMPLETE); @@ -309,13 +308,13 @@ void spdy_downstream_eventcb(bufferevent *bev, short events, void *ptr) upstream = static_cast(downstream->get_upstream()); if(events & BEV_EVENT_CONNECTED) { if(ENABLE_LOG) { - LOG(INFO) << " Connection established. Downstream " + LOG(INFO) << "Downstream connection established. Downstream " << downstream; } } if(events & BEV_EVENT_EOF) { if(ENABLE_LOG) { - LOG(INFO) << " EOF stream_id=" + LOG(INFO) << "Downstream EOF stream_id=" << downstream->get_stream_id(); } if(downstream->get_request_state() == Downstream::STREAM_CLOSED) { @@ -328,7 +327,7 @@ void spdy_downstream_eventcb(bufferevent *bev, short events, void *ptr) if(downstream->get_response_state() == Downstream::HEADER_COMPLETE) { // Server may indicate the end of the request by EOF if(ENABLE_LOG) { - LOG(INFO) << " Assuming content-length is 0 byte"; + LOG(INFO) << "Assuming downstream content-length is 0 byte"; } downstream->set_response_state(Downstream::MSG_COMPLETE); upstream->on_downstream_body_complete(downstream); @@ -342,7 +341,7 @@ void spdy_downstream_eventcb(bufferevent *bev, short events, void *ptr) } } else if(events & (BEV_EVENT_ERROR | BEV_EVENT_TIMEOUT)) { if(ENABLE_LOG) { - LOG(INFO) << " error/timeout. Downstream " << downstream; + LOG(INFO) << "Downstream error/timeout. Downstream " << downstream; } if(downstream->get_request_state() == Downstream::STREAM_CLOSED) { upstream->remove_downstream(downstream); @@ -474,7 +473,7 @@ spdylay_session* SpdyUpstream::get_spdy_session() int SpdyUpstream::on_downstream_header_complete(Downstream *downstream) { if(ENABLE_LOG) { - LOG(INFO) << " on_downstream_header_complete"; + LOG(INFO) << "Downstream on_downstream_header_complete"; } size_t nheader = downstream->get_response_headers().size(); const char **nv = new const char*[nheader * 2 + 4 + 1]; @@ -504,7 +503,7 @@ int SpdyUpstream::on_downstream_header_complete(Downstream *downstream) for(size_t i = 0; nv[i]; i += 2) { ss << nv[i] << ": " << nv[i+1] << "\n"; } - LOG(INFO) << ":: Response headers\n" << ss.str(); + LOG(INFO) << "Upstream spdy response headers\n" << ss.str(); } spdylay_data_provider data_prd; data_prd.source.ptr = downstream; @@ -520,7 +519,7 @@ int SpdyUpstream::on_downstream_body(Downstream *downstream, const uint8_t *data, size_t len) { if(ENABLE_LOG) { - LOG(INFO) << " on_downstream_body"; + LOG(INFO) << "Downstream on_downstream_body"; } evbuffer *body = downstream->get_response_body_buf(); evbuffer_add(body, data, len); @@ -537,7 +536,7 @@ int SpdyUpstream::on_downstream_body(Downstream *downstream, int SpdyUpstream::on_downstream_body_complete(Downstream *downstream) { if(ENABLE_LOG) { - LOG(INFO) << " on_downstream_body_complete"; + LOG(INFO) << "Downstream on_downstream_body_complete"; } spdylay_session_resume_data(session_, downstream->get_stream_id()); return 0; diff --git a/examples/shrpx_ssl.cc b/examples/shrpx_ssl.cc new file mode 100644 index 0000000..a278daa --- /dev/null +++ b/examples/shrpx_ssl.cc @@ -0,0 +1,141 @@ +/* + * Spdylay - SPDY Library + * + * Copyright (c) 2012 Tatsuhiro Tsujikawa + * + * Permission is hereby granted, free of charge, to any person obtaining + * a copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +#include "shrpx_ssl.h" + +#include +#include + +#include +#include + +#include "shrpx_log.h" +#include "shrpx_client_handler.h" +#include "shrpx_config.h" + +namespace shrpx { + +namespace ssl { + +namespace { +std::pair next_proto; +unsigned char proto_list[23]; +} // namespace + +namespace { +int next_proto_cb(SSL *s, const unsigned char **data, unsigned int *len, + void *arg) +{ + std::pair *next_proto = + reinterpret_cast* >(arg); + *data = next_proto->first; + *len = next_proto->second; + return SSL_TLSEXT_ERR_OK; +} +} // namespace + +namespace { +int verify_callback(int preverify_ok, X509_STORE_CTX *ctx) +{ + // We don't verify the client certificate. Just request it for the + // testing purpose. + return 1; +} +} // namespace + +SSL_CTX* create_ssl_context() +{ + SSL_CTX *ssl_ctx; + ssl_ctx = SSL_CTX_new(SSLv23_server_method()); + if(!ssl_ctx) { + LOG(FATAL) << ERR_error_string(ERR_get_error(), 0); + DIE(); + } + SSL_CTX_set_options(ssl_ctx, + SSL_OP_ALL | SSL_OP_NO_SSLv2 | SSL_OP_NO_COMPRESSION); + SSL_CTX_set_mode(ssl_ctx, SSL_MODE_AUTO_RETRY); + SSL_CTX_set_mode(ssl_ctx, SSL_MODE_RELEASE_BUFFERS); + if(SSL_CTX_use_PrivateKey_file(ssl_ctx, + get_config()->private_key_file, + SSL_FILETYPE_PEM) != 1) { + LOG(FATAL) << "SSL_CTX_use_PrivateKey_file failed."; + DIE(); + } + if(SSL_CTX_use_certificate_file(ssl_ctx, get_config()->cert_file, + SSL_FILETYPE_PEM) != 1) { + LOG(FATAL) << "SSL_CTX_use_certificate_file failed."; + DIE(); + } + if(SSL_CTX_check_private_key(ssl_ctx) != 1) { + LOG(FATAL) << "SSL_CTX_check_private_key failed."; + DIE(); + } + if(get_config()->verify_client) { + SSL_CTX_set_verify(ssl_ctx, + SSL_VERIFY_PEER | SSL_VERIFY_CLIENT_ONCE | + SSL_VERIFY_FAIL_IF_NO_PEER_CERT, + verify_callback); + } + // We speaks "http/1.1", "spdy/2" and "spdy/3". + proto_list[0] = 6; + memcpy(&proto_list[1], "spdy/3", 6); + proto_list[7] = 6; + memcpy(&proto_list[8], "spdy/2", 6); + proto_list[14] = 8; + memcpy(&proto_list[15], "http/1.1", 8); + + next_proto.first = proto_list; + next_proto.second = sizeof(proto_list); + SSL_CTX_set_next_protos_advertised_cb(ssl_ctx, next_proto_cb, &next_proto); + return ssl_ctx; +} + +ClientHandler* accept_ssl_connection(event_base *evbase, SSL_CTX *ssl_ctx, + evutil_socket_t fd, + sockaddr *addr, int addrlen) +{ + char host[NI_MAXHOST]; + int rv; + rv = getnameinfo(addr, addrlen, host, sizeof(host), 0, 0, NI_NUMERICHOST); + if(rv == 0) { + SSL *ssl = SSL_new(ssl_ctx); + if(!ssl) { + LOG(ERROR) << "SSL_new() failed"; + return 0; + } + bufferevent *bev = bufferevent_openssl_socket_new + (evbase, fd, ssl, + BUFFEREVENT_SSL_ACCEPTING, BEV_OPT_DEFER_CALLBACKS); + + ClientHandler *client_handler = new ClientHandler(bev, ssl, host); + return client_handler; + } else { + LOG(ERROR) << "getnameinfo() failed: " << gai_strerror(rv); + return 0; + } +} + +} // namespace ssl + +} // namespace shrpx diff --git a/examples/shrpx_ssl.h b/examples/shrpx_ssl.h new file mode 100644 index 0000000..1f3e362 --- /dev/null +++ b/examples/shrpx_ssl.h @@ -0,0 +1,51 @@ +/* + * Spdylay - SPDY Library + * + * Copyright (c) 2012 Tatsuhiro Tsujikawa + * + * Permission is hereby granted, free of charge, to any person obtaining + * a copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +#ifndef SHRPX_SSL_H +#define SHRPX_SSL_H + +#include "shrpx.h" + +#include +#include + +#include + +namespace shrpx { + +class ClientHandler; + +namespace ssl { + +SSL_CTX* create_ssl_context(); + +ClientHandler* accept_ssl_connection(event_base *evbase, SSL_CTX *ssl_ctx, + evutil_socket_t fd, + sockaddr *addr, int addrlen); + +} // namespace ssl + +} // namespace shrpx + +#endif // SHRPX_SSL_H diff --git a/examples/shrpx_thread_event_receiver.cc b/examples/shrpx_thread_event_receiver.cc new file mode 100644 index 0000000..009589d --- /dev/null +++ b/examples/shrpx_thread_event_receiver.cc @@ -0,0 +1,69 @@ +/* + * Spdylay - SPDY Library + * + * Copyright (c) 2012 Tatsuhiro Tsujikawa + * + * Permission is hereby granted, free of charge, to any person obtaining + * a copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +#include "shrpx_thread_event_receiver.h" + +#include "shrpx_ssl.h" +#include "shrpx_log.h" +#include "shrpx_client_handler.h" + +namespace shrpx { + +ThreadEventReceiver::ThreadEventReceiver(SSL_CTX *ssl_ctx) + : ssl_ctx_(ssl_ctx) +{} + +ThreadEventReceiver::~ThreadEventReceiver() +{} + +void ThreadEventReceiver::on_read(bufferevent *bev) +{ + evbuffer *input = bufferevent_get_input(bev); + while(evbuffer_get_length(input) >= sizeof(WorkerEvent)) { + WorkerEvent wev; + evbuffer_remove(input, &wev, sizeof(WorkerEvent)); + if(ENABLE_LOG) { + LOG(INFO) << "WorkerEvent: client_fd=" << wev.client_fd + << ", addrlen=" << wev.client_addrlen; + } + event_base *evbase = bufferevent_get_base(bev); + ClientHandler *client_handler; + client_handler = ssl::accept_ssl_connection(evbase, ssl_ctx_, + wev.client_fd, + &wev.client_addr.sa, + wev.client_addrlen); + if(client_handler) { + if(ENABLE_LOG) { + LOG(INFO) << "ClientHandler " << client_handler << " created"; + } + } else { + if(ENABLE_LOG) { + LOG(ERROR) << "ClientHandler creation failed"; + } + close(wev.client_fd); + } + } +} + +} // namespace shrpx diff --git a/examples/shrpx_thread_event_receiver.h b/examples/shrpx_thread_event_receiver.h new file mode 100644 index 0000000..70116ac --- /dev/null +++ b/examples/shrpx_thread_event_receiver.h @@ -0,0 +1,55 @@ +/* + * Spdylay - SPDY Library + * + * Copyright (c) 2012 Tatsuhiro Tsujikawa + * + * Permission is hereby granted, free of charge, to any person obtaining + * a copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +#ifndef SHRPX_THREAD_EVENT_RECEIVER_H +#define SHRPX_THREAD_EVENT_RECEIVER_H + +#include "shrpx.h" + +#include + +#include + +#include "shrpx_config.h" + +namespace shrpx { + +struct WorkerEvent { + evutil_socket_t client_fd; + sockaddr_union client_addr; + size_t client_addrlen; +}; + +class ThreadEventReceiver { +public: + ThreadEventReceiver(SSL_CTX *ssl_ctx); + ~ThreadEventReceiver(); + void on_read(bufferevent *bev); +private: + SSL_CTX *ssl_ctx_; +}; + +} // namespace shrpx + +#endif // SHRPX_THREAD_EVENT_RECEIVER_H diff --git a/examples/shrpx_worker.cc b/examples/shrpx_worker.cc new file mode 100644 index 0000000..c0543aa --- /dev/null +++ b/examples/shrpx_worker.cc @@ -0,0 +1,93 @@ +/* + * Spdylay - SPDY Library + * + * Copyright (c) 2012 Tatsuhiro Tsujikawa + * + * Permission is hereby granted, free of charge, to any person obtaining + * a copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +#include "shrpx_worker.h" + +#include +#include + +#include +#include + +#include "shrpx_ssl.h" +#include "shrpx_thread_event_receiver.h" +#include "shrpx_log.h" + +namespace shrpx { + +Worker::Worker(int fd) + : fd_(fd), + ssl_ctx_(ssl::create_ssl_context()) +{} + +Worker::~Worker() +{ + SSL_CTX_free(ssl_ctx_); + shutdown(fd_, SHUT_WR); + close(fd_); +} + +namespace { +void readcb(bufferevent *bev, void *arg) +{ + ThreadEventReceiver *receiver = reinterpret_cast(arg); + receiver->on_read(bev); +} +} // namespace + +namespace { +void eventcb(bufferevent *bev, short events, void *arg) +{ + if(events & BEV_EVENT_EOF) { + LOG(ERROR) << "Connection to main thread lost: eof"; + } + if(events & BEV_EVENT_ERROR) { + LOG(ERROR) << "Connection to main thread lost: network error"; + } +} +} // namespace + +void Worker::run() +{ + event_base *evbase = event_base_new(); + bufferevent *bev = bufferevent_socket_new(evbase, fd_, + BEV_OPT_DEFER_CALLBACKS); + ThreadEventReceiver *receiver = new ThreadEventReceiver(ssl_ctx_); + bufferevent_enable(bev, EV_READ); + bufferevent_setcb(bev, readcb, 0, eventcb, receiver); + + event_base_loop(evbase, 0); + + delete receiver; +} + +void* start_threaded_worker(void *arg) +{ + int fd = *reinterpret_cast(arg); + Worker worker(fd); + worker.run(); + return 0; +} + +} // namespace shrpx diff --git a/examples/shrpx_worker.h b/examples/shrpx_worker.h new file mode 100644 index 0000000..2a03065 --- /dev/null +++ b/examples/shrpx_worker.h @@ -0,0 +1,50 @@ +/* + * Spdylay - SPDY Library + * + * Copyright (c) 2012 Tatsuhiro Tsujikawa + * + * Permission is hereby granted, free of charge, to any person obtaining + * a copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +#ifndef SHRPX_WORKER_H +#define SHRPX_WORKER_H + +#include "shrpx.h" + +#include +#include + +namespace shrpx { + +class Worker { +public: + Worker(int fd); + ~Worker(); + void run(); +private: + // Channel to the main thread + int fd_; + SSL_CTX *ssl_ctx_; +}; + +void* start_threaded_worker(void *arg); + +} // namespace shrpx + +#endif // SHRPX_WORKER_H