Dmitry Kurochkin worked a lot on improving the HTTP Pipelining support that

previously had a number of flaws, perhaps most notably when an application
fired up N transfers at once as then they wouldn't pipeline at all that
nicely as anyone would think... Test case 530 was also updated to take the
improved functionality into account.
This commit is contained in:
Daniel Stenberg 2008-01-16 12:24:00 +00:00
parent ed6466d176
commit b3de497d83
11 changed files with 273 additions and 121 deletions

View File

@ -7,6 +7,12 @@
Changelog
Daniel S (16 Jan 2008)
- Dmitry Kurochkin worked a lot on improving the HTTP Pipelining support that
previously had a number of flaws, perhaps most notably when an application
fired up N transfers at once as then they wouldn't pipeline at all that
nicely as anyone would think... Test case 530 was also updated to take the
improved functionality into account.
- Calls to Curl_failf() are not supposed to provide a trailing newline as the
function itself adds that. Fixed on 50 or something strings!

View File

@ -54,6 +54,7 @@ This release includes the following bugfixes:
o range support for file:// transfers
o libcurl hang with huge POST request and request-body read from callback
o removed extra newlines from many error messages
o improved pipelining
This release includes the following known bugs:

View File

@ -1,11 +1,4 @@
To be addressed before 7.18.0 (planned release: January 2008)
=============================
Less likely to go in 7.18.0
===========================
112 - pipelining patch(es) from Dmitry Kurochkin (outstanding work left probably
no longer targeted for this release)
http://curl.haxx.se/mail/lib-2007-12/0252.html
118 -

View File

@ -5,7 +5,7 @@
* | (__| |_| | _ <| |___
* \___|\___/|_| \_\_____|
*
* Copyright (C) 1998 - 2007, Daniel Stenberg, <daniel@haxx.se>, et al.
* Copyright (C) 1998 - 2008, Daniel Stenberg, <daniel@haxx.se>, et al.
*
* This software is licensed as described in the file COPYING, which
* you should have received as part of this distribution. The terms
@ -136,3 +136,52 @@ Curl_llist_count(struct curl_llist *list)
{
return list->size;
}
int Curl_llist_move(struct curl_llist *list, struct curl_llist_element *e,
struct curl_llist *to_list, struct curl_llist_element *to_e)
{
/* Remove element from list */
if(e == NULL || list->size == 0)
return 0;
if(e == list->head) {
list->head = e->next;
if(list->head == NULL)
list->tail = NULL;
else
e->next->prev = NULL;
}
else {
e->prev->next = e->next;
if(!e->next)
list->tail = e->prev;
else
e->next->prev = e->prev;
}
--list->size;
/* Add element to to_list after to_e */
if(to_list->size == 0) {
to_list->head = e;
to_list->head->prev = NULL;
to_list->head->next = NULL;
to_list->tail = e;
}
else {
e->next = to_e->next;
e->prev = to_e;
if(to_e->next) {
to_e->next->prev = e;
}
else {
to_list->tail = e;
}
to_e->next = e;
}
++to_list->size;
return 1;
}

View File

@ -7,7 +7,7 @@
* | (__| |_| | _ <| |___
* \___|\___/|_| \_\_____|
*
* Copyright (C) 1998 - 2005, Daniel Stenberg, <daniel@haxx.se>, et al.
* Copyright (C) 1998 - 2008, Daniel Stenberg, <daniel@haxx.se>, et al.
*
* This software is licensed as described in the file COPYING, which
* you should have received as part of this distribution. The terms
@ -56,5 +56,7 @@ int Curl_llist_remove_next(struct curl_llist *, struct curl_llist_element *,
void *);
size_t Curl_llist_count(struct curl_llist *);
void Curl_llist_destroy(struct curl_llist *, void *);
int Curl_llist_move(struct curl_llist *, struct curl_llist_element *,
struct curl_llist *, struct curl_llist_element *);
#endif

View File

@ -5,7 +5,7 @@
* | (__| |_| | _ <| |___
* \___|\___/|_| \_\_____|
*
* Copyright (C) 1998 - 2007, Daniel Stenberg, <daniel@haxx.se>, et al.
* Copyright (C) 1998 - 2008, Daniel Stenberg, <daniel@haxx.se>, et al.
*
* This software is licensed as described in the file COPYING, which
* you should have received as part of this distribution. The terms
@ -190,6 +190,14 @@ static void add_closure(struct Curl_multi *multi,
struct SessionHandle *data);
static int update_timer(struct Curl_multi *multi);
static CURLcode addHandleToSendOrPendPipeline(struct SessionHandle *handle,
struct connectdata *conn);
static int checkPendPipeline(struct connectdata *conn);
static int moveHandleFromSendToRecvPipeline(struct SessionHandle *habdle,
struct connectdata *conn);
static bool isHandleAtHead(struct SessionHandle *handle,
struct curl_llist *pipeline);
#ifdef CURLDEBUG
static const char * const statename[]={
"INIT",
@ -932,28 +940,32 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi,
&async, &protocol_connect);
if(CURLE_OK == easy->result) {
/* Add this handle to the send pipeline */
easy->result = Curl_addHandleToPipeline(easy->easy_handle,
easy->easy_conn->send_pipe);
/* Add this handle to the send or pend pipeline */
easy->result = addHandleToSendOrPendPipeline(easy->easy_handle,
easy->easy_conn);
if(CURLE_OK == easy->result) {
if(async)
/* We're now waiting for an asynchronous name lookup */
multistate(easy, CURLM_STATE_WAITRESOLVE);
if (easy->easy_handle->state.is_in_pipeline)
multistate(easy, CURLM_STATE_WAITDO);
else {
/* after the connect has been sent off, go WAITCONNECT unless the
protocol connect is already done and we can go directly to
WAITDO! */
result = CURLM_CALL_MULTI_PERFORM;
if(protocol_connect)
multistate(easy, CURLM_STATE_WAITDO);
if(async)
/* We're now waiting for an asynchronous name lookup */
multistate(easy, CURLM_STATE_WAITRESOLVE);
else {
/* after the connect has been sent off, go WAITCONNECT unless the
protocol connect is already done and we can go directly to
WAITDO! */
result = CURLM_CALL_MULTI_PERFORM;
if(protocol_connect)
multistate(easy, CURLM_STATE_WAITDO);
else {
#ifndef CURL_DISABLE_HTTP
if(easy->easy_conn->bits.tunnel_connecting)
multistate(easy, CURLM_STATE_WAITPROXYCONNECT);
else
if(easy->easy_conn->bits.tunnel_connecting)
multistate(easy, CURLM_STATE_WAITPROXYCONNECT);
else
#endif
multistate(easy, CURLM_STATE_WAITCONNECT);
multistate(easy, CURLM_STATE_WAITCONNECT);
}
}
}
}
@ -1077,12 +1089,12 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi,
easy->easy_conn->connectindex,
easy->easy_conn->send_pipe->size,
easy->easy_conn->writechannel_inuse,
Curl_isHandleAtHead(easy->easy_handle,
easy->easy_conn->send_pipe));
isHandleAtHead(easy->easy_handle,
easy->easy_conn->send_pipe));
#endif
if(!easy->easy_conn->writechannel_inuse &&
Curl_isHandleAtHead(easy->easy_handle,
easy->easy_conn->send_pipe)) {
isHandleAtHead(easy->easy_handle,
easy->easy_conn->send_pipe)) {
/* Grab the channel */
easy->easy_conn->writechannel_inuse = TRUE;
multistate(easy, CURLM_STATE_DO);
@ -1190,12 +1202,10 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi,
break;
case CURLM_STATE_DO_DONE:
/* Remove ourselves from the send pipeline */
Curl_removeHandleFromPipeline(easy->easy_handle,
easy->easy_conn->send_pipe);
/* Add ourselves to the recv pipeline */
easy->result = Curl_addHandleToPipeline(easy->easy_handle,
easy->easy_conn->recv_pipe);
/* Move ourselves from the send to recv pipeline */
moveHandleFromSendToRecvPipeline(easy->easy_handle, easy->easy_conn);
/* Check if we can move pending requests to send pipe */
checkPendPipeline(easy->easy_conn);
multistate(easy, CURLM_STATE_WAITPERFORM);
result = CURLM_CALL_MULTI_PERFORM;
break;
@ -1206,13 +1216,13 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi,
easy->easy_conn->connectindex,
easy->easy_conn->recv_pipe->size,
easy->easy_conn->readchannel_inuse,
Curl_isHandleAtHead(easy->easy_handle,
easy->easy_conn->recv_pipe));
isHandleAtHead(easy->easy_handle,
easy->easy_conn->recv_pipe));
#endif
/* Wait for our turn to PERFORM */
if(!easy->easy_conn->readchannel_inuse &&
Curl_isHandleAtHead(easy->easy_handle,
easy->easy_conn->recv_pipe)) {
isHandleAtHead(easy->easy_handle,
easy->easy_conn->recv_pipe)) {
/* Grab the channel */
easy->easy_conn->readchannel_inuse = TRUE;
multistate(easy, CURLM_STATE_PERFORM);
@ -1292,6 +1302,8 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi,
if(easy->easy_handle->req.newurl || retry) {
Curl_removeHandleFromPipeline(easy->easy_handle,
easy->easy_conn->recv_pipe);
/* Check if we can move pending requests to send pipe */
checkPendPipeline(easy->easy_conn);
if(!retry) {
/* if the URL is a follow-location and not just a retried request
then figure out the URL here */
@ -1323,6 +1335,8 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi,
/* Remove ourselves from the receive pipeline */
Curl_removeHandleFromPipeline(easy->easy_handle,
easy->easy_conn->recv_pipe);
/* Check if we can move pending requests to send pipe */
checkPendPipeline(easy->easy_conn);
easy->easy_handle->state.is_in_pipeline = FALSE;
if(easy->easy_conn->bits.stream_was_rewound) {
@ -1390,6 +1404,8 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi,
easy->easy_conn->send_pipe);
Curl_removeHandleFromPipeline(easy->easy_handle,
easy->easy_conn->recv_pipe);
/* Check if we can move pending requests to send pipe */
checkPendPipeline(easy->easy_conn);
}
if(disconnect_conn) {
@ -1952,6 +1968,77 @@ static int update_timer(struct Curl_multi *multi)
return multi->timer_cb((CURLM*)multi, timeout_ms, multi->timer_userp);
}
static CURLcode addHandleToSendOrPendPipeline(struct SessionHandle *handle,
struct connectdata *conn)
{
size_t pipeLen = conn->send_pipe->size + conn->recv_pipe->size;
struct curl_llist *pipeline;
if(!Curl_isPipeliningEnabled(handle) ||
pipeLen == 0)
pipeline = conn->send_pipe;
else {
if(conn->server_supports_pipelining &&
pipeLen < MAX_PIPELINE_LENGTH)
pipeline = conn->send_pipe;
else
pipeline = conn->pend_pipe;
}
return Curl_addHandleToPipeline(handle, pipeline);
}
static int checkPendPipeline(struct connectdata *conn)
{
int result = 0;
if (conn->server_supports_pipelining) {
size_t pipeLen = conn->send_pipe->size + conn->recv_pipe->size;
struct curl_llist_element *curr = conn->pend_pipe->head;
while(pipeLen < MAX_PIPELINE_LENGTH && curr) {
Curl_llist_move(conn->pend_pipe, curr,
conn->send_pipe, conn->send_pipe->tail);
Curl_pgrsTime(curr->ptr, TIMER_CONNECT);
++result; /* count how many handles we moved */
curr = conn->pend_pipe->head;
++pipeLen;
}
if (result > 0)
conn->now = Curl_tvnow();
}
return result;
}
static int moveHandleFromSendToRecvPipeline(struct SessionHandle *handle,
struct connectdata *conn)
{
struct curl_llist_element *curr;
curr = conn->send_pipe->head;
while(curr) {
if(curr->ptr == handle) {
Curl_llist_move(conn->send_pipe, curr,
conn->recv_pipe, conn->recv_pipe->tail);
return 1; /* we moved a handle */
}
curr = curr->next;
}
return 0;
}
static bool isHandleAtHead(struct SessionHandle *handle,
struct curl_llist *pipeline)
{
struct curl_llist_element *curr = pipeline->head;
if(curr)
return (bool)(curr->ptr == handle);
return FALSE;
}
/* given a number of milliseconds from now to use to set the 'act before
this'-time for the transfer, to be extracted by curl_multi_timeout() */
void Curl_expire(struct SessionHandle *data, long milli)

View File

@ -830,6 +830,15 @@ CURLcode Curl_readwrite(struct connectdata *conn,
infof(data, "HTTP 1.0, assume close after body\n");
conn->bits.close = TRUE;
}
else if(k->httpversion >= 11 &&
!conn->bits.close) {
/* If HTTP version is >= 1.1 and connection is persistent
server supports pipelining. */
DEBUGF(infof(data,
"HTTP 1.1 or later with persistent connection, "
"pipelining supported\n"));
conn->server_supports_pipelining = TRUE;
}
switch(k->httpcode) {
case 204:

152
lib/url.c
View File

@ -159,7 +159,6 @@ static bool ConnectionExists(struct SessionHandle *data,
static long ConnectionStore(struct SessionHandle *data,
struct connectdata *conn);
static bool IsPipeliningPossible(const struct SessionHandle *handle);
static bool IsPipeliningEnabled(const struct SessionHandle *handle);
static void conn_free(struct connectdata *conn);
static void signalPipeClose(struct curl_llist *pipeline);
@ -176,8 +175,6 @@ static void flush_cookies(struct SessionHandle *data, int cleanup);
#define verboseconnect(x) do { } while (0)
#endif
#define MAX_PIPELINE_LENGTH 5
#ifndef USE_ARES
/* not for ares builds */
@ -425,6 +422,16 @@ CURLcode Curl_close(struct SessionHandle *data)
}
}
}
pipeline = connptr->pend_pipe;
if(pipeline) {
for (curr = pipeline->head; curr; curr=curr->next) {
if(data == (struct SessionHandle *) curr->ptr) {
fprintf(stderr,
"MAJOR problem we %p are still in pend pipe for %p done %d\n",
data, connptr, connptr->bits.done);
}
}
}
}
}
#endif
@ -2105,6 +2112,7 @@ static void conn_free(struct connectdata *conn)
Curl_llist_destroy(conn->send_pipe, NULL);
Curl_llist_destroy(conn->recv_pipe, NULL);
Curl_llist_destroy(conn->pend_pipe, NULL);
/* possible left-overs from the async name resolvers */
#if defined(USE_ARES)
@ -2188,9 +2196,10 @@ CURLcode Curl_disconnect(struct connectdata *conn)
Curl_ssl_close(conn, FIRSTSOCKET);
/* Indicate to all handles on the pipe that we're dead */
if(IsPipeliningEnabled(data)) {
if(Curl_isPipeliningEnabled(data)) {
signalPipeClose(conn->send_pipe);
signalPipeClose(conn->recv_pipe);
signalPipeClose(conn->pend_pipe);
}
conn_free(conn);
@ -2228,7 +2237,7 @@ static bool IsPipeliningPossible(const struct SessionHandle *handle)
return FALSE;
}
static bool IsPipeliningEnabled(const struct SessionHandle *handle)
bool Curl_isPipeliningEnabled(const struct SessionHandle *handle)
{
if(handle->multi && Curl_multi_canPipeline(handle->multi))
return TRUE;
@ -2251,9 +2260,8 @@ CURLcode Curl_addHandleToPipeline(struct SessionHandle *data,
return CURLE_OK;
}
int Curl_removeHandleFromPipeline(struct SessionHandle *handle,
struct curl_llist *pipeline)
struct curl_llist *pipeline)
{
struct curl_llist_element *curr;
@ -2283,17 +2291,6 @@ static void Curl_printPipeline(struct curl_llist *pipeline)
}
#endif
bool Curl_isHandleAtHead(struct SessionHandle *handle,
struct curl_llist *pipeline)
{
struct curl_llist_element *curr = pipeline->head;
if(curr) {
return (bool)(curr->ptr == handle);
}
return FALSE;
}
static struct SessionHandle* gethandleathead(struct curl_llist *pipeline)
{
struct curl_llist_element *curr = pipeline->head;
@ -2369,43 +2366,6 @@ ConnectionExists(struct SessionHandle *data,
from the multi */
}
if(pipeLen > 0 && !canPipeline) {
/* can only happen within multi handles, and means that another easy
handle is using this connection */
continue;
}
#ifdef CURLRES_ASYNCH
/* ip_addr_str is NULL only if the resolving of the name hasn't completed
yet and until then we don't re-use this connection */
if(!check->ip_addr_str) {
infof(data,
"Connection #%ld hasn't finished name resolve, can't reuse\n",
check->connectindex);
continue;
}
#endif
if((check->sock[FIRSTSOCKET] == CURL_SOCKET_BAD) || check->bits.close) {
/* Don't pick a connection that hasn't connected yet or that is going to
get closed. */
infof(data, "Connection #%ld isn't open enough, can't reuse\n",
check->connectindex);
#ifdef CURLDEBUG
if(check->recv_pipe->size > 0) {
infof(data, "BAD! Unconnected #%ld has a non-empty recv pipeline!\n",
check->connectindex);
}
#endif
continue;
}
if(pipeLen >= MAX_PIPELINE_LENGTH) {
infof(data, "Connection #%ld has its pipeline full, can't reuse\n",
check->connectindex);
continue;
}
if(canPipeline) {
/* Make sure the pipe has only GET requests */
struct SessionHandle* sh = gethandleathead(check->send_pipe);
@ -2418,6 +2378,45 @@ ConnectionExists(struct SessionHandle *data,
if(!IsPipeliningPossible(rh))
continue;
}
#ifdef CURLDEBUG
if(pipeLen > MAX_PIPELINE_LENGTH) {
infof(data, "BAD! Connection #%ld has too big pipeline!\n",
check->connectindex);
}
#endif
}
else {
if(pipeLen > 0) {
/* can only happen within multi handles, and means that another easy
handle is using this connection */
continue;
}
#ifdef CURLRES_ASYNCH
/* ip_addr_str is NULL only if the resolving of the name hasn't completed
yet and until then we don't re-use this connection */
if(!check->ip_addr_str) {
infof(data,
"Connection #%ld hasn't finished name resolve, can't reuse\n",
check->connectindex);
continue;
}
#endif
if((check->sock[FIRSTSOCKET] == CURL_SOCKET_BAD) || check->bits.close) {
/* Don't pick a connection that hasn't connected yet or that is going to
get closed. */
infof(data, "Connection #%ld isn't open enough, can't reuse\n",
check->connectindex);
#ifdef CURLDEBUG
if(check->recv_pipe->size > 0) {
infof(data, "BAD! Unconnected #%ld has a non-empty recv pipeline!\n",
check->connectindex);
}
#endif
continue;
}
}
if((needle->protocol&PROT_SSL) != (check->protocol&PROT_SSL))
@ -2478,7 +2477,7 @@ ConnectionExists(struct SessionHandle *data,
}
if(match) {
if(!IsPipeliningEnabled(data)) {
if(!Curl_isPipeliningEnabled(data)) {
/* The check for a dead socket makes sense only in the
non-pipelining case */
bool dead = SocketIsDead(check->sock[FIRSTSOCKET]);
@ -2561,7 +2560,7 @@ static void
ConnectionDone(struct connectdata *conn)
{
conn->inuse = FALSE;
if(!conn->send_pipe && !conn->recv_pipe)
if(!conn->send_pipe && !conn->recv_pipe && !conn->pend_pipe)
conn->is_in_pipeline = FALSE;
}
@ -3555,7 +3554,8 @@ static CURLcode CreateConnection(struct SessionHandle *data,
/* Initialize the pipeline lists */
conn->send_pipe = Curl_llist_alloc((curl_llist_dtor) llist_dtor);
conn->recv_pipe = Curl_llist_alloc((curl_llist_dtor) llist_dtor);
if(!conn->send_pipe || !conn->recv_pipe)
conn->pend_pipe = Curl_llist_alloc((curl_llist_dtor) llist_dtor);
if(!conn->send_pipe || !conn->recv_pipe || !conn->pend_pipe)
return CURLE_OUT_OF_MEMORY;
/* This initing continues below, see the comment "Continue connectdata
@ -4019,6 +4019,7 @@ static CURLcode CreateConnection(struct SessionHandle *data,
Curl_safefree(old_conn->proxypasswd);
Curl_llist_destroy(old_conn->send_pipe, NULL);
Curl_llist_destroy(old_conn->recv_pipe, NULL);
Curl_llist_destroy(old_conn->pend_pipe, NULL);
Curl_safefree(old_conn->master_buffer);
free(old_conn); /* we don't need this anymore */
@ -4353,26 +4354,24 @@ CURLcode Curl_connect(struct SessionHandle *data,
if(CURLE_OK == code) {
/* no error */
if(dns || !*asyncp)
/* If an address is available it means that we already have the name
resolved, OR it isn't async. if this is a re-used connection 'dns'
will be NULL here. Continue connecting from here */
code = SetupConnection(*in_connect, dns, protocol_done);
/* else
response will be received and treated async wise */
}
if(CURLE_OK != code) {
/* We're not allowed to return failure with memory left allocated
in the connectdata struct, free those here */
if(*in_connect) {
Curl_disconnect(*in_connect); /* close the connection */
*in_connect = NULL; /* return a NULL */
}
}
else {
if((*in_connect)->is_in_pipeline)
data->state.is_in_pipeline = TRUE;
else {
if(dns || !*asyncp)
/* If an address is available it means that we already have the name
resolved, OR it isn't async. if this is a re-used connection 'dns'
will be NULL here. Continue connecting from here */
code = SetupConnection(*in_connect, dns, protocol_done);
/* else
response will be received and treated async wise */
}
}
if(CURLE_OK != code && *in_connect) {
/* We're not allowed to return failure with memory left allocated
in the connectdata struct, free those here */
Curl_disconnect(*in_connect); /* close the connection */
*in_connect = NULL; /* return a NULL */
}
return code;
@ -4426,6 +4425,7 @@ CURLcode Curl_done(struct connectdata **connp,
if(Curl_removeHandleFromPipeline(data, conn->send_pipe) &&
conn->writechannel_inuse)
conn->writechannel_inuse = FALSE;
Curl_removeHandleFromPipeline(data, conn->pend_pipe);
/* Cleanup possible redirect junk */
if(data->req.newurl) {

View File

@ -7,7 +7,7 @@
* | (__| |_| | _ <| |___
* \___|\___/|_| \_\_____|
*
* Copyright (C) 1998 - 2007, Daniel Stenberg, <daniel@haxx.se>, et al.
* Copyright (C) 1998 - 2008, Daniel Stenberg, <daniel@haxx.se>, et al.
*
* This software is licensed as described in the file COPYING, which
* you should have received as part of this distribution. The terms
@ -64,12 +64,11 @@ int Curl_doing_getsock(struct connectdata *conn,
curl_socket_t *socks,
int numsocks);
bool Curl_isPipeliningEnabled(const struct SessionHandle *handle);
CURLcode Curl_addHandleToPipeline(struct SessionHandle *handle,
struct curl_llist *pipeline);
int Curl_removeHandleFromPipeline(struct SessionHandle *handle,
struct curl_llist *pipeline);
bool Curl_isHandleAtHead(struct SessionHandle *handle,
struct curl_llist *pipeline);
void Curl_close_connections(struct SessionHandle *data);

View File

@ -952,11 +952,16 @@ struct connectdata {
bool writechannel_inuse; /* whether the write channel is in use by an easy
handle */
bool is_in_pipeline; /* TRUE if this connection is in a pipeline */
bool server_supports_pipelining; /* TRUE if server supports pipelining,
set after first response */
struct curl_llist *send_pipe; /* List of handles waiting to
send on this pipeline */
struct curl_llist *recv_pipe; /* List of handles waiting to read
their responses on this pipeline */
struct curl_llist *pend_pipe; /* List of pending handles on
this pipeline */
#define MAX_PIPELINE_LENGTH 5
char* master_buffer; /* The master buffer allocated on-demand;
used for pipelining. */

View File

@ -8,7 +8,8 @@ Pipelining
# Server-side
<reply>
<servercmd>
pipe: 4
pipe: 1
pipe: 3
</servercmd>
<data>
HTTP/1.1 200 OK