Rewrite connection pooling

This commit is contained in:
Tatsuhiro Tsujikawa 2012-06-09 23:14:00 +09:00
parent adefcad530
commit 9d19e2bfe9
14 changed files with 435 additions and 253 deletions

View File

@ -80,6 +80,7 @@ shrpx_SOURCES = ${HELPER_OBJECTS} ${HELPER_HFILES} \
shrpx_https_upstream.cc shrpx_https_upstream.h \
shrpx_downstream_queue.cc shrpx_downstream_queue.h \
shrpx_downstream.cc shrpx_downstream.h \
shrpx_downstream_connection.cc shrpx_downstream_connection.h \
shrpx_log.cc shrpx_log.h \
shrpx_http.cc shrpx_http.h \
shrpx_io_control.cc shrpx_io_control.h \

View File

@ -36,4 +36,6 @@
#define DIE() \
assert(0);
#define SHRPX_READ_WARTER_MARK (64*1024)
#endif // SHRPX_H

View File

@ -28,6 +28,7 @@
#include "shrpx_spdy_upstream.h"
#include "shrpx_https_upstream.h"
#include "shrpx_config.h"
#include "shrpx_downstream_connection.h"
namespace shrpx {
@ -108,6 +109,7 @@ ClientHandler::ClientHandler(bufferevent *bev, SSL *ssl, const char *ipaddr)
should_close_after_write_(false)
{
bufferevent_enable(bev_, EV_READ | EV_WRITE);
bufferevent_setwatermark(bev_, EV_READ, 0, SHRPX_READ_WARTER_MARK);
set_upstream_timeouts(&get_config()->upstream_read_timeout,
&get_config()->upstream_write_timeout);
set_bev_cb(0, upstream_writecb, upstream_eventcb);
@ -126,6 +128,10 @@ ClientHandler::~ClientHandler()
shutdown(fd, SHUT_WR);
close(fd);
delete upstream_;
for(std::set<DownstreamConnection*>::iterator i = dconn_pool_.begin();
i != dconn_pool_.end(); ++i) {
delete *i;
}
if(ENABLE_LOG) {
LOG(INFO) << "Deleted";
}
@ -213,4 +219,39 @@ void ClientHandler::set_should_close_after_write(bool f)
should_close_after_write_ = f;
}
void ClientHandler::pool_downstream_connection(DownstreamConnection *dconn)
{
if(ENABLE_LOG) {
LOG(INFO) << "Pooling downstream connection " << dconn;
}
dconn_pool_.insert(dconn);
}
void ClientHandler::remove_downstream_connection(DownstreamConnection *dconn)
{
if(ENABLE_LOG) {
LOG(INFO) << "Removing downstream connection " << dconn
<< " from pool";
}
dconn_pool_.erase(dconn);
}
DownstreamConnection* ClientHandler::get_downstream_connection()
{
if(dconn_pool_.empty()) {
if(ENABLE_LOG) {
LOG(INFO) << "Downstream connection pool is empty. Create new one";
}
return new DownstreamConnection(this);
} else {
DownstreamConnection *dconn = *dconn_pool_.begin();
dconn_pool_.erase(dconn);
if(ENABLE_LOG) {
LOG(INFO) << "Reuse downstream connection " << dconn
<< " from pool";
}
return dconn;
}
}
} // namespace shrpx

View File

@ -27,12 +27,15 @@
#include "shrpx.h"
#include <set>
#include <event.h>
#include <openssl/ssl.h>
namespace shrpx {
class Upstream;
class DownstreamConnection;
class ClientHandler {
public:
@ -51,12 +54,18 @@ public:
bool get_should_close_after_write() const;
void set_should_close_after_write(bool f);
Upstream* get_upstream();
void pool_downstream_connection(DownstreamConnection *dconn);
void remove_downstream_connection(DownstreamConnection *dconn);
DownstreamConnection* get_downstream_connection();
private:
bufferevent *bev_;
SSL *ssl_;
Upstream *upstream_;
std::string ipaddr_;
bool should_close_after_write_;
std::set<DownstreamConnection*> dconn_pool_;
};
} // namespace shrpx

View File

@ -31,6 +31,7 @@
#include "shrpx_config.h"
#include "shrpx_error.h"
#include "shrpx_http.h"
#include "shrpx_downstream_connection.h"
#include "util.h"
using namespace spdylay;
@ -39,10 +40,9 @@ namespace shrpx {
Downstream::Downstream(Upstream *upstream, int stream_id, int priority)
: upstream_(upstream),
bev_(0),
dconn_(0),
stream_id_(stream_id),
priority_(priority),
counter_(1),
ioctrl_(0),
request_state_(INITIAL),
request_major_(1),
@ -60,11 +60,6 @@ Downstream::Downstream(Upstream *upstream, int stream_id, int priority)
{
htparser_init(response_htp_, htp_type_response);
htparser_set_userdata(response_htp_, this);
event_base *evbase = upstream_->get_client_handler()->get_evbase();
bev_ = bufferevent_socket_new
(evbase, -1,
BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS);
ioctrl_.set_bev(bev_);
}
Downstream::~Downstream()
@ -76,53 +71,30 @@ Downstream::~Downstream()
// Passing NULL to evbuffer_free() causes segmentation fault.
evbuffer_free(response_body_buf_);
}
bufferevent_disable(bev_, EV_READ | EV_WRITE);
bufferevent_free(bev_);
if(dconn_) {
delete dconn_;
}
free(response_htp_);
if(ENABLE_LOG) {
LOG(INFO) << "Deleted";
}
}
int Downstream::get_counter() const
void Downstream::set_downstream_connection(DownstreamConnection *dconn)
{
return counter_;
}
void Downstream::reuse(int stream_id)
{
stream_id_ = stream_id;
++counter_;
request_state_ = INITIAL;
}
void Downstream::idle()
{
stream_id_ = -1;
priority_ = 0;
ioctrl_.force_resume_read();
request_state_ = IDLE;
request_method_.clear();
request_path_.clear();
request_major_ = 1;
request_minor_ = 1;
chunked_request_ = false;
request_connection_close_ = false;
request_headers_.clear();
response_state_ = INITIAL;
response_http_status_ = 0;
response_major_ = 1;
response_minor_ = 1;
chunked_response_ = false;
response_connection_close_ = false;
response_headers_.clear();
if(response_body_buf_) {
size_t len = evbuffer_get_length(response_body_buf_);
evbuffer_drain(response_body_buf_, len);
dconn_ = dconn;
if(dconn_) {
ioctrl_.set_bev(dconn_->get_bev());
} else {
ioctrl_.set_bev(0);
}
}
DownstreamConnection* Downstream::get_downstream_connection()
{
return dconn_;
}
void Downstream::pause_read(IOCtrlReason reason)
{
ioctrl_.pause_read(reason);
@ -218,28 +190,6 @@ int32_t Downstream::get_stream_id() const
return stream_id_;
}
int Downstream::start_connection()
{
bufferevent_setcb(bev_,
upstream_->get_downstream_readcb(),
upstream_->get_downstream_writecb(),
upstream_->get_downstream_eventcb(), this);
bufferevent_enable(bev_, EV_READ | EV_WRITE);
bufferevent_set_timeouts(bev_,
&get_config()->downstream_read_timeout,
&get_config()->downstream_write_timeout);
int rv = bufferevent_socket_connect
(bev_,
// TODO maybe not thread-safe?
const_cast<sockaddr*>(&get_config()->downstream_addr.sa),
get_config()->downstream_addrlen);
if(rv == 0) {
return 0;
} else {
return SHRPX_ERR_NETWORK;
}
}
void Downstream::set_request_state(int state)
{
request_state_ = state;
@ -286,6 +236,9 @@ int Downstream::push_request_headers()
via_value = (*i).second;
continue;
}
if(util::strieq((*i).first.c_str(), "host")) {
continue;
}
hdrs += (*i).first;
hdrs += ": ";
hdrs += (*i).second;
@ -318,7 +271,8 @@ int Downstream::push_request_headers()
if(ENABLE_LOG) {
LOG(INFO) << "Downstream request headers\n" << hdrs;
}
evbuffer *output = bufferevent_get_output(bev_);
bufferevent *bev = dconn_->get_bev();
evbuffer *output = bufferevent_get_output(bev);
evbuffer_add(output, hdrs.c_str(), hdrs.size());
return 0;
}
@ -329,13 +283,17 @@ int Downstream::push_upload_data_chunk(const uint8_t *data, size_t datalen)
// buffer using push_request_headers().
ssize_t res = 0;
int rv;
evbuffer *output = bufferevent_get_output(bev_);
bufferevent *bev = dconn_->get_bev();
evbuffer *output = bufferevent_get_output(bev);
if(chunked_request_) {
char chunk_size_hex[16];
rv = snprintf(chunk_size_hex, sizeof(chunk_size_hex), "%X\r\n",
static_cast<unsigned int>(datalen));
evbuffer_add(output, chunk_size_hex, rv);
res += rv;
rv = evbuffer_add(output, chunk_size_hex, rv);
if(rv == -1) {
return -1;
}
}
rv = evbuffer_add(output, data, datalen);
if(rv == -1) {
@ -348,7 +306,8 @@ int Downstream::push_upload_data_chunk(const uint8_t *data, size_t datalen)
int Downstream::end_upload_data()
{
if(chunked_request_) {
evbuffer *output = bufferevent_get_output(bev_);
bufferevent *bev = dconn_->get_bev();
evbuffer *output = bufferevent_get_output(bev);
evbuffer_add(output, "0\r\n\r\n", 5);
}
return 0;
@ -493,7 +452,8 @@ htparse_hooks htp_hooks = {
int Downstream::parse_http_response()
{
evbuffer *input = bufferevent_get_input(bev_);
bufferevent *bev = dconn_->get_bev();
evbuffer *input = bufferevent_get_input(bev);
unsigned char *mem = evbuffer_pullup(input, -1);
size_t nread = htparser_run(response_htp_, &htp_hooks,
reinterpret_cast<const char*>(mem),

View File

@ -44,6 +44,7 @@ extern "C" {
namespace shrpx {
class Upstream;
class DownstreamConnection;
typedef std::vector<std::pair<std::string, std::string> > Headers;
@ -51,16 +52,14 @@ class Downstream {
public:
Downstream(Upstream *upstream, int stream_id, int priority);
~Downstream();
int start_connection();
Upstream* get_upstream() const;
int32_t get_stream_id() const;
void set_priority(int pri);
void pause_read(IOCtrlReason reason);
bool resume_read(IOCtrlReason reason);
void force_resume_read();
void idle();
void reuse(int stream_id);
int get_counter() const;
void set_downstream_connection(DownstreamConnection *dconn);
DownstreamConnection* get_downstream_connection();
// downstream request API
const Headers& get_request_headers() const;
void add_request_header(const std::string& name, const std::string& value);
@ -106,10 +105,9 @@ public:
evbuffer* get_response_body_buf();
private:
Upstream *upstream_;
bufferevent *bev_;
DownstreamConnection *dconn_;
int32_t stream_id_;
int priority_;
int counter_;
IOControl ioctrl_;
int request_state_;
std::string request_method_;

View File

@ -0,0 +1,157 @@
/*
* Spdylay - SPDY Library
*
* Copyright (c) 2012 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include "shrpx_downstream_connection.h"
#include "shrpx_client_handler.h"
#include "shrpx_upstream.h"
#include "shrpx_downstream.h"
#include "shrpx_config.h"
#include "shrpx_error.h"
namespace shrpx {
DownstreamConnection::DownstreamConnection(ClientHandler *client_handler)
: client_handler_(client_handler),
bev_(0),
downstream_(0)
{
}
DownstreamConnection::~DownstreamConnection()
{
if(bev_) {
bufferevent_disable(bev_, EV_READ | EV_WRITE);
bufferevent_free(bev_);
}
// Downstream and DownstreamConnection may be deleted
// asynchronously.
if(downstream_) {
downstream_->set_downstream_connection(0);
}
}
int DownstreamConnection::attach_downstream(Downstream *downstream)
{
if(ENABLE_LOG) {
LOG(INFO) << "Attaching downstream connection " << this << " to "
<< "downstream " << downstream;
}
Upstream *upstream = downstream->get_upstream();
if(!bev_) {
event_base *evbase = client_handler_->get_evbase();
bev_ = bufferevent_socket_new
(evbase, -1,
BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS);
int rv = bufferevent_socket_connect
(bev_,
// TODO maybe not thread-safe?
const_cast<sockaddr*>(&get_config()->downstream_addr.sa),
get_config()->downstream_addrlen);
if(rv != 0) {
bufferevent_free(bev_);
bev_ = 0;
return SHRPX_ERR_NETWORK;
}
if(ENABLE_LOG) {
LOG(INFO) << "Connecting to downstream server " << this;
}
}
downstream->set_downstream_connection(this);
downstream_ = downstream;
bufferevent_setwatermark(bev_, EV_READ, 0, SHRPX_READ_WARTER_MARK);
bufferevent_enable(bev_, EV_READ | EV_WRITE);
bufferevent_setcb(bev_,
upstream->get_downstream_readcb(),
upstream->get_downstream_writecb(),
upstream->get_downstream_eventcb(), this);
bufferevent_set_timeouts(bev_,
&get_config()->downstream_read_timeout,
&get_config()->downstream_write_timeout);
return 0;
}
namespace {
// Gets called when DownstreamConnection is pooled in ClientHandler.
void idle_eventcb(bufferevent *bev, short events, void *arg)
{
DownstreamConnection *dconn = reinterpret_cast<DownstreamConnection*>(arg);
if(events & BEV_EVENT_CONNECTED) {
// Downstream was detached before connection established?
// This may be safe to be left.
if(ENABLE_LOG) {
LOG(INFO) << "Idle downstream connected?" << dconn;
}
return;
}
if(events & BEV_EVENT_EOF) {
if(ENABLE_LOG) {
LOG(INFO) << "Idle downstream connection EOF " << dconn;
}
} else if(events & BEV_EVENT_TIMEOUT) {
if(ENABLE_LOG) {
LOG(INFO) << "Idle downstream connection timeout " << dconn;
}
} else if(events & BEV_EVENT_ERROR) {
if(ENABLE_LOG) {
LOG(INFO) << "Idle downstream connection error " << dconn;
}
}
ClientHandler *client_handler = dconn->get_client_handler();
client_handler->remove_downstream_connection(dconn);
delete dconn;
}
} // namespace
void DownstreamConnection::detach_downstream(Downstream *downstream)
{
if(ENABLE_LOG) {
LOG(INFO) << "Detaching downstream connection " << this << " from "
<< "downstream " << downstream;
}
downstream->set_downstream_connection(0);
downstream_ = 0;
bufferevent_enable(bev_, EV_READ | EV_WRITE);
bufferevent_setcb(bev_, 0, 0, idle_eventcb, this);
client_handler_->pool_downstream_connection(this);
}
ClientHandler* DownstreamConnection::get_client_handler()
{
return client_handler_;
}
Downstream* DownstreamConnection::get_downstream()
{
return downstream_;
}
bufferevent* DownstreamConnection::get_bev()
{
return bev_;
}
} // namespace shrpx

View File

@ -0,0 +1,57 @@
/*
* Spdylay - SPDY Library
*
* Copyright (c) 2012 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SHRPX_DOWNSTREAM_CONNECTION_H
#define SHRPX_DOWNSTREAM_CONNECTION_H
#include "shrpx.h"
#include <event.h>
#include <event2/bufferevent.h>
namespace shrpx {
class ClientHandler;
class Downstream;
class DownstreamConnection {
public:
DownstreamConnection(ClientHandler *client_handler);
~DownstreamConnection();
int attach_downstream(Downstream *downstream);
void detach_downstream(Downstream *downstream);
bufferevent* get_bev();
int push_data(const void *data, size_t len);
ClientHandler* get_client_handler();
Downstream* get_downstream();
private:
ClientHandler *client_handler_;
bufferevent *bev_;
Downstream *downstream_;
};
} // namespace shrpx
#endif // SHRPX_DOWNSTREAM_CONNECTION_H

View File

@ -39,10 +39,6 @@ DownstreamQueue::~DownstreamQueue()
i != downstreams_.end(); ++i) {
delete (*i).second;
}
for(std::set<Downstream*>::iterator i = idle_downstreams_.begin();
i != idle_downstreams_.end(); ++i) {
delete *i;
}
}
void DownstreamQueue::add(Downstream *downstream)
@ -50,18 +46,9 @@ void DownstreamQueue::add(Downstream *downstream)
downstreams_[downstream->get_stream_id()] = downstream;
}
int DownstreamQueue::start(Downstream *downstream)
{
return downstream->start_connection();
}
void DownstreamQueue::remove(Downstream *downstream)
{
if(downstream->get_request_state() == Downstream::IDLE) {
idle_downstreams_.erase(downstream);
} else {
downstreams_.erase(downstream->get_stream_id());
}
downstreams_.erase(downstream->get_stream_id());
}
Downstream* DownstreamQueue::find(int32_t stream_id)
@ -74,24 +61,4 @@ Downstream* DownstreamQueue::find(int32_t stream_id)
}
}
Downstream* DownstreamQueue::reuse(int32_t stream_id)
{
if(idle_downstreams_.empty()) {
return 0;
}
Downstream* downstream = *idle_downstreams_.begin();
idle_downstreams_.erase(downstream);
downstream->reuse(stream_id);
add(downstream);
return downstream;
}
void DownstreamQueue::idle(Downstream *downstream)
{
assert(downstream->get_request_state() != Downstream::IDLE);
remove(downstream);
downstream->idle();
idle_downstreams_.insert(downstream);
}
} // namespace shrpx

View File

@ -30,7 +30,6 @@
#include <stdint.h>
#include <map>
#include <set>
namespace shrpx {
@ -41,14 +40,10 @@ public:
DownstreamQueue();
~DownstreamQueue();
void add(Downstream *downstream);
int start(Downstream *downstream);
void remove(Downstream *downstream);
Downstream* find(int32_t stream_id);
Downstream* reuse(int32_t stream_id);
void idle(Downstream *downstream);
private:
std::map<int32_t, Downstream*> downstreams_;
std::set<Downstream*> idle_downstreams_;
};
} // namespace shrpx

View File

@ -29,6 +29,7 @@
#include "shrpx_client_handler.h"
#include "shrpx_downstream.h"
#include "shrpx_downstream_connection.h"
#include "shrpx_http.h"
#include "shrpx_config.h"
#include "shrpx_error.h"
@ -39,7 +40,7 @@ using namespace spdylay;
namespace shrpx {
namespace {
const size_t SHRPX_HTTPS_UPSTREAM_OUTPUT_UPPER_THRES = 512*1024;
const size_t SHRPX_HTTPS_UPSTREAM_OUTPUT_UPPER_THRES = 64*1024;
const size_t SHRPX_HTTPS_MAX_HEADER_LENGTH = 64*1024;
} // namespace
@ -76,20 +77,8 @@ int htp_msg_begin(htparser *htp)
HttpsUpstream *upstream;
upstream = reinterpret_cast<HttpsUpstream*>(htparser_get_userdata(htp));
upstream->reset_current_header_length();
Downstream *downstream = upstream->get_top_downstream();
if(downstream) {
// Keep-Alived connection
if(ENABLE_LOG) {
LOG(INFO) << "Reusing downstream";
}
downstream->reuse(0);
} else {
if(ENABLE_LOG) {
LOG(INFO) << "Creating new downstream";
}
downstream = new Downstream(upstream, 0, 0);
upstream->add_downstream(downstream);
}
Downstream *downstream = new Downstream(upstream, 0, 0);
upstream->add_downstream(downstream);
return 0;
}
} // namespace
@ -169,18 +158,20 @@ int htp_hdrs_completecb(htparser *htp)
downstream->set_request_major(htparser_get_major(htp));
downstream->set_request_minor(htparser_get_minor(htp));
downstream->push_request_headers();
downstream->set_request_state(Downstream::HEADER_COMPLETE);
DownstreamConnection *dconn;
dconn = upstream->get_client_handler()->get_downstream_connection();
if(downstream->get_counter() == 1) {
int rv = downstream->start_connection();
if(rv != 0) {
LOG(ERROR) << "Upstream connection failed";
downstream->set_request_state(Downstream::CONNECT_FAIL);
return 1;
}
int rv = dconn->attach_downstream(downstream);
if(rv != 0) {
downstream->set_request_state(Downstream::CONNECT_FAIL);
downstream->set_downstream_connection(0);
delete dconn;
return 1;
} else {
downstream->push_request_headers();
downstream->set_request_state(Downstream::HEADER_COMPLETE);
return 0;
}
return 0;
}
} // namespace
@ -250,19 +241,29 @@ int HttpsUpstream::on_read()
htpparse_error htperr = htparser_get_error(htp_);
if(htperr == htparse_error_user) {
Downstream *downstream = get_top_downstream();
if(downstream &&
downstream->get_request_state() == Downstream::CONNECT_FAIL) {
if(downstream->get_request_state() == Downstream::CONNECT_FAIL) {
get_client_handler()->set_should_close_after_write(true);
error_reply(503);
} else if(current_header_length_ > SHRPX_HTTPS_MAX_HEADER_LENGTH) {
// Downstream gets deleted after response body is read.
} else {
assert(downstream->get_request_state() == Downstream::MSG_COMPLETE);
if(downstream->get_downstream_connection() == 0) {
// Error response already be sent
assert(downstream->get_response_state() == Downstream::MSG_COMPLETE);
pop_downstream();
delete downstream;
} else {
pause_read(SHRPX_MSG_BLOCK);
}
}
} else if(htperr == htparse_error_none) {
if(current_header_length_ > SHRPX_HTTPS_MAX_HEADER_LENGTH) {
LOG(WARNING) << "Request Header too long:" << current_header_length_
<< " bytes";
get_client_handler()->set_should_close_after_write(true);
error_reply(400);
} else {
pause_read(SHRPX_MSG_BLOCK);
}
} else if(htperr != htparse_error_none) {
} else {
if(ENABLE_LOG) {
LOG(INFO) << "Upstream http parse failure: "
<< htparser_get_strerror(htp_);
@ -273,6 +274,10 @@ int HttpsUpstream::on_read()
return 0;
}
namespace {
void https_downstream_readcb(bufferevent *bev, void *ptr);
} // namespace
int HttpsUpstream::on_write()
{
Downstream *downstream = get_top_downstream();
@ -309,28 +314,28 @@ void HttpsUpstream::resume_read(IOCtrlReason reason)
namespace {
void https_downstream_readcb(bufferevent *bev, void *ptr)
{
Downstream *downstream = reinterpret_cast<Downstream*>(ptr);
DownstreamConnection *dconn = reinterpret_cast<DownstreamConnection*>(ptr);
Downstream *downstream = dconn->get_downstream();
HttpsUpstream *upstream;
upstream = static_cast<HttpsUpstream*>(downstream->get_upstream());
if(downstream->get_request_state() == Downstream::IDLE) {
if(ENABLE_LOG) {
LOG(INFO) << "Delete idle downstream in https_downstream_readcb";
}
upstream->pop_downstream();
delete downstream;
return;
}
int rv = downstream->parse_http_response();
if(rv == 0) {
if(downstream->get_response_state() == Downstream::MSG_COMPLETE) {
assert(downstream == upstream->get_top_downstream());
if(downstream->get_response_connection_close()) {
// Connection close
downstream->set_downstream_connection(0);
delete dconn;
dconn = 0;
} else {
// Keep-alive
dconn->detach_downstream(downstream);
}
if(downstream->get_request_state() == Downstream::MSG_COMPLETE) {
upstream->pop_downstream();
delete downstream;
} else {
downstream->idle();
// Process next HTTP request
upstream->resume_read(SHRPX_MSG_BLOCK);
}
upstream->resume_read(SHRPX_MSG_BLOCK);
} else {
ClientHandler *handler = upstream->get_client_handler();
bufferevent *bev = handler->get_bev();
@ -341,13 +346,19 @@ void https_downstream_readcb(bufferevent *bev, void *ptr)
}
} else {
if(downstream->get_response_state() == Downstream::HEADER_COMPLETE) {
// We already sent HTTP response headers to upstream
// client. Just close the upstream connection.
delete upstream->get_client_handler();
} else {
// We did not sent any HTTP response, so sent error
// response. Cannot reuse downstream connection in this case.
upstream->error_reply(502);
assert(downstream == upstream->get_top_downstream());
upstream->pop_downstream();
delete downstream;
upstream->resume_read(SHRPX_MSG_BLOCK);
if(downstream->get_request_state() == Downstream::MSG_COMPLETE) {
upstream->pop_downstream();
delete downstream;
// Process next HTTP request
upstream->resume_read(SHRPX_MSG_BLOCK);
}
}
}
}
@ -362,18 +373,10 @@ void https_downstream_writecb(bufferevent *bev, void *ptr)
namespace {
void https_downstream_eventcb(bufferevent *bev, short events, void *ptr)
{
Downstream *downstream = reinterpret_cast<Downstream*>(ptr);
DownstreamConnection *dconn = reinterpret_cast<DownstreamConnection*>(ptr);
Downstream *downstream = dconn->get_downstream();
HttpsUpstream *upstream;
upstream = static_cast<HttpsUpstream*>(downstream->get_upstream());
if(downstream->get_request_state() == Downstream::IDLE) {
if(ENABLE_LOG) {
LOG(INFO) << "Delete idle downstream in https_downstream_eventcb";
}
upstream->pop_downstream();
delete downstream;
upstream->resume_read(SHRPX_MSG_BLOCK);
return;
}
if(events & BEV_EVENT_CONNECTED) {
if(ENABLE_LOG) {
LOG(INFO) << "Downstream connection established. downstream "
@ -400,9 +403,11 @@ void https_downstream_eventcb(bufferevent *bev, short events, void *ptr)
}
upstream->error_reply(502);
}
upstream->pop_downstream();
delete downstream;
upstream->resume_read(SHRPX_MSG_BLOCK);
if(downstream->get_request_state() == Downstream::MSG_COMPLETE) {
upstream->pop_downstream();
delete downstream;
upstream->resume_read(SHRPX_MSG_BLOCK);
}
} else if(events & (BEV_EVENT_ERROR | BEV_EVENT_TIMEOUT)) {
if(ENABLE_LOG) {
LOG(INFO) << "Downstream error/timeout. " << downstream;
@ -416,9 +421,11 @@ void https_downstream_eventcb(bufferevent *bev, short events, void *ptr)
}
upstream->error_reply(status);
}
upstream->pop_downstream();
delete downstream;
upstream->resume_read(SHRPX_MSG_BLOCK);
if(downstream->get_request_state() == Downstream::MSG_COMPLETE) {
upstream->pop_downstream();
delete downstream;
upstream->resume_read(SHRPX_MSG_BLOCK);
}
}
}
} // namespace
@ -439,6 +446,10 @@ void HttpsUpstream::error_reply(int status_code)
evbuffer *output = bufferevent_get_output(handler_->get_bev());
evbuffer_add(output, header.c_str(), header.size());
evbuffer_add(output, html.c_str(), html.size());
Downstream *downstream = get_top_downstream();
if(downstream) {
downstream->set_response_state(Downstream::MSG_COMPLETE);
}
}
bufferevent_data_cb HttpsUpstream::get_downstream_readcb()

View File

@ -44,14 +44,18 @@ void IOControl::set_bev(bufferevent *bev)
void IOControl::pause_read(IOCtrlReason reason)
{
rdbits_ |= reason;
bufferevent_disable(bev_, EV_READ);
if(bev_) {
bufferevent_disable(bev_, EV_READ);
}
}
bool IOControl::resume_read(IOCtrlReason reason)
{
rdbits_ &= ~reason;
if(rdbits_ == 0) {
bufferevent_enable(bev_, EV_READ);
if(bev_) {
bufferevent_enable(bev_, EV_READ);
}
return true;
} else {
return false;
@ -61,7 +65,9 @@ bool IOControl::resume_read(IOCtrlReason reason)
void IOControl::force_resume_read()
{
rdbits_ = 0;
bufferevent_enable(bev_, EV_READ);
if(bev_) {
bufferevent_enable(bev_, EV_READ);
}
}
} // namespace shrpx

View File

@ -29,6 +29,7 @@
#include "shrpx_client_handler.h"
#include "shrpx_downstream.h"
#include "shrpx_downstream_connection.h"
#include "shrpx_config.h"
#include "shrpx_http.h"
#include "util.h"
@ -38,7 +39,7 @@ using namespace spdylay;
namespace shrpx {
namespace {
const size_t SHRPX_SPDY_UPSTREAM_OUTPUT_UPPER_THRES = 512*1024;
const size_t SHRPX_SPDY_UPSTREAM_OUTPUT_UPPER_THRES = 64*1024;
} // namespace
namespace {
@ -101,7 +102,16 @@ void on_stream_close_callback
} else {
downstream->set_request_state(Downstream::STREAM_CLOSED);
if(downstream->get_response_state() == Downstream::MSG_COMPLETE) {
upstream->remove_or_idle_downstream(downstream);
// At this point, downstream response was read
if(!downstream->get_response_connection_close()) {
// Keep-alive
DownstreamConnection *dconn;
dconn = downstream->get_downstream_connection();
if(dconn) {
dconn->detach_downstream(downstream);
}
}
upstream->remove_downstream(downstream);
} else {
// At this point, downstream read may be paused. To reclaim
// file descriptor, enable read here and catch read
@ -126,22 +136,10 @@ void on_ctrl_recv_callback
<< frame->syn_stream.stream_id;
}
Downstream *downstream;
downstream = upstream->reuse_downstream(frame->syn_stream.stream_id);
if(downstream) {
if(ENABLE_LOG) {
LOG(INFO) << "Reusing downstream for stream_id="
<< frame->syn_stream.stream_id;
}
downstream->set_priority(frame->syn_stream.pri);
} else {
if(ENABLE_LOG) {
LOG(INFO) << "Creating new downstream for stream_id="
<< frame->syn_stream.stream_id;
}
downstream = new Downstream(upstream,
frame->syn_stream.stream_id,
frame->syn_stream.pri);
}
downstream = new Downstream(upstream,
frame->syn_stream.stream_id,
frame->syn_stream.pri);
upstream->add_downstream(downstream);
downstream->init_response_body_buf();
char **nv = frame->syn_stream.nv;
@ -164,6 +162,15 @@ void on_ctrl_recv_callback
LOG(INFO) << "Upstream spdy request headers:\n" << ss.str();
}
DownstreamConnection *dconn;
dconn = upstream->get_client_handler()->get_downstream_connection();
int rv = dconn->attach_downstream(downstream);
if(rv != 0) {
// If downstream connection fails, issue RST_STREAM.
upstream->rst_stream(downstream, SPDYLAY_INTERNAL_ERROR);
downstream->set_request_state(Downstream::CONNECT_FAIL);
return;
}
downstream->push_request_headers();
downstream->set_request_state(Downstream::HEADER_COMPLETE);
if(frame->syn_stream.hd.flags & SPDYLAY_CTRL_FLAG_FIN) {
@ -174,14 +181,6 @@ void on_ctrl_recv_callback
}
downstream->set_request_state(Downstream::MSG_COMPLETE);
}
if(downstream->get_counter() == 1) {
upstream->add_downstream(downstream);
if(upstream->start_downstream(downstream) != 0) {
// If downstream connection fails, issue RST_STREAM.
upstream->rst_stream(downstream, SPDYLAY_INTERNAL_ERROR);
downstream->set_request_state(Downstream::CONNECT_FAIL);
}
}
break;
}
default:
@ -297,20 +296,14 @@ void spdy_downstream_readcb(bufferevent *bev, void *ptr)
if(ENABLE_LOG) {
LOG(INFO) << "spdy_downstream_readcb";
}
Downstream *downstream = reinterpret_cast<Downstream*>(ptr);
DownstreamConnection *dconn = reinterpret_cast<DownstreamConnection*>(ptr);
Downstream *downstream = dconn->get_downstream();
SpdyUpstream *upstream;
upstream = static_cast<SpdyUpstream*>(downstream->get_upstream());
if(downstream->get_request_state() == Downstream::IDLE) {
if(ENABLE_LOG) {
LOG(INFO) << "Delete idle downstream in spdy_downstream_readcb";
}
upstream->remove_downstream(downstream);
delete downstream;
return;
}
// If upstream SPDY stream was closed, we just close downstream,
// because there is no consumer now.
if(downstream->get_request_state() == Downstream::STREAM_CLOSED) {
// If upstream SPDY stream was closed, we just close downstream,
// because there is no consumer now. Downstream connection is also
// closed in this case.
upstream->remove_downstream(downstream);
delete downstream;
return;
@ -326,6 +319,11 @@ void spdy_downstream_readcb(bufferevent *bev, void *ptr)
upstream->error_reply(downstream, 502);
}
downstream->set_response_state(Downstream::MSG_COMPLETE);
// Clearly, we have to close downstream connection on http parser
// failure.
downstream->set_downstream_connection(0);
delete dconn;
dconn = 0;
}
upstream->send();
}
@ -340,17 +338,10 @@ void spdy_downstream_writecb(bufferevent *bev, void *ptr)
namespace {
void spdy_downstream_eventcb(bufferevent *bev, short events, void *ptr)
{
Downstream *downstream = reinterpret_cast<Downstream*>(ptr);
DownstreamConnection *dconn = reinterpret_cast<DownstreamConnection*>(ptr);
Downstream *downstream = dconn->get_downstream();
SpdyUpstream *upstream;
upstream = static_cast<SpdyUpstream*>(downstream->get_upstream());
if(downstream->get_request_state() == Downstream::IDLE) {
if(ENABLE_LOG) {
LOG(INFO) << "Delete idle downstream in spdy_downstream_eventcb";
}
upstream->remove_downstream(downstream);
delete downstream;
return;
}
if(events & BEV_EVENT_CONNECTED) {
if(ENABLE_LOG) {
LOG(INFO) << "Downstream connection established. Downstream "
@ -362,11 +353,16 @@ void spdy_downstream_eventcb(bufferevent *bev, short events, void *ptr)
<< downstream->get_stream_id();
}
if(downstream->get_request_state() == Downstream::STREAM_CLOSED) {
//If stream was closed already, we don't need to send reply at
// If stream was closed already, we don't need to send reply at
// the first place. We can delete downstream.
upstream->remove_downstream(downstream);
delete downstream;
} else {
// Delete downstream connection. If we don't delete it here, it
// will be pooled in on_stream_close_callback.
downstream->set_downstream_connection(0);
delete dconn;
dconn = 0;
// downstream wil be deleted in on_stream_close_callback.
if(downstream->get_response_state() == Downstream::HEADER_COMPLETE) {
// Server may indicate the end of the request by EOF
@ -391,6 +387,11 @@ void spdy_downstream_eventcb(bufferevent *bev, short events, void *ptr)
upstream->remove_downstream(downstream);
delete downstream;
} else {
// Delete downstream connection. If we don't delete it here, it
// will be pooled in on_stream_close_callback.
downstream->set_downstream_connection(0);
delete dconn;
dconn = 0;
if(downstream->get_response_state() != Downstream::MSG_COMPLETE) {
if(downstream->get_response_state() == Downstream::HEADER_COMPLETE) {
upstream->rst_stream(downstream, SPDYLAY_INTERNAL_ERROR);
@ -499,11 +500,6 @@ void SpdyUpstream::add_downstream(Downstream *downstream)
downstream_queue_.add(downstream);
}
int SpdyUpstream::start_downstream(Downstream *downstream)
{
return downstream_queue_.start(downstream);
}
void SpdyUpstream::remove_downstream(Downstream *downstream)
{
downstream_queue_.remove(downstream);
@ -514,21 +510,6 @@ Downstream* SpdyUpstream::find_downstream(int32_t stream_id)
return downstream_queue_.find(stream_id);
}
Downstream* SpdyUpstream::reuse_downstream(int32_t stream_id)
{
return downstream_queue_.reuse(stream_id);
}
void SpdyUpstream::remove_or_idle_downstream(Downstream *downstream)
{
if(downstream->get_response_connection_close()) {
downstream_queue_.remove(downstream);
delete downstream;
} else {
downstream_queue_.idle(downstream);
}
}
spdylay_session* SpdyUpstream::get_spdy_session()
{
return session_;

View File

@ -50,10 +50,7 @@ public:
virtual bufferevent_event_cb get_downstream_eventcb();
void add_downstream(Downstream *downstream);
void remove_downstream(Downstream *downstream);
int start_downstream(Downstream *downstream);
Downstream* find_downstream(int32_t stream_id);
Downstream* reuse_downstream(int32_t stream_id);
void remove_or_idle_downstream(Downstream *downstream);
spdylay_session* get_spdy_session();