1
0
mirror of https://github.com/moparisthebest/curl synced 2024-11-17 06:55:02 -05:00

multi: multiplexing improvements

Fixes #3436
Closes #3448

 Problem 1

After LOTS of scratching my head, I eventually realized that even when doing
10 uploads in parallel, sometimes the socket callback to the application that
tells it what to wait for on the socket, looked like it would reflect the
status of just the single transfer that just changed state.

Digging into the code revealed that this was indeed the truth. When multiple
transfers are using the same connection, the application did not correctly get
the *combined* flags for all transfers which then could make it switch to READ
(only) when in fact most transfers wanted to get told when the socket was
WRITEABLE.

 Problem 1b

A separate but related regression had also been introduced by me when I
cleared connection/transfer association better a while ago, as now the logic
couldn't find the connection and see if that was marked as used by more
transfers and then it would also prematurely remove the socket from the socket
hash table even in times other transfers were still using it!

 Fix 1

Make sure that each socket stored in the socket hash has a "combined" action
field of what to ask the application to wait for, that is potentially the ORed
action of multiple parallel transfers. And remove that socket hash entry only
if there are no transfers left using it.

 Problem 2

The socket hash entry stored an association to a single transfer using that
socket - and when curl_multi_socket_action() was called to tell libcurl about
activities on that specific socket only that transfer was "handled".

This was WRONG, as a single socket/connection can be used by numerous parallel
transfers and not necessarily a single one.

 Fix 2

We now store a list of handles in the socket hashtable entry and when libcurl
is told there's traffic for a particular socket, it now iterates over all
known transfers using that single socket.
This commit is contained in:
Daniel Stenberg 2019-01-08 14:24:15 +01:00
parent 5f5b5afcb7
commit 4c35574bb7
No known key found for this signature in database
GPG Key ID: 5CC908FDB71E12C2
2 changed files with 133 additions and 101 deletions

View File

@ -189,14 +189,17 @@ static void mstate(struct Curl_easy *data, CURLMstate state
#endif #endif
/* /*
* We add one of these structs to the sockhash for a particular socket * We add one of these structs to the sockhash for each socket
*/ */
struct Curl_sh_entry { struct Curl_sh_entry {
struct Curl_easy *easy; struct curl_llist list; /* list of easy handles using this socket */
int action; /* what action READ/WRITE this socket waits for */ unsigned int action; /* what combined action READ/WRITE this socket waits
curl_socket_t socket; /* mainly to ease debugging */ for */
void *socketp; /* settable by users with curl_multi_assign() */ void *socketp; /* settable by users with curl_multi_assign() */
unsigned int users; /* number of transfers using this */
unsigned int readers; /* this many transfers want to read */
unsigned int writers; /* this many transfers want to write */
}; };
/* bits for 'action' having no bits means this socket is not expecting any /* bits for 'action' having no bits means this socket is not expecting any
action */ action */
@ -215,8 +218,7 @@ static struct Curl_sh_entry *sh_getentry(struct curl_hash *sh,
/* make sure this socket is present in the hash for this handle */ /* make sure this socket is present in the hash for this handle */
static struct Curl_sh_entry *sh_addentry(struct curl_hash *sh, static struct Curl_sh_entry *sh_addentry(struct curl_hash *sh,
curl_socket_t s, curl_socket_t s)
struct Curl_easy *data)
{ {
struct Curl_sh_entry *there = sh_getentry(sh, s); struct Curl_sh_entry *there = sh_getentry(sh, s);
struct Curl_sh_entry *check; struct Curl_sh_entry *check;
@ -230,8 +232,7 @@ static struct Curl_sh_entry *sh_addentry(struct curl_hash *sh,
if(!check) if(!check)
return NULL; /* major failure */ return NULL; /* major failure */
check->easy = data; Curl_llist_init(&check->list, NULL);
check->socket = s;
/* make/add new hash entry */ /* make/add new hash entry */
if(!Curl_hash_add(sh, (char *)&s, sizeof(curl_socket_t), check)) { if(!Curl_hash_add(sh, (char *)&s, sizeof(curl_socket_t), check)) {
@ -2354,6 +2355,9 @@ static CURLMcode singlesocket(struct Curl_multi *multi,
curl_socket_t s; curl_socket_t s;
int num; int num;
unsigned int curraction; unsigned int curraction;
int actions[MAX_SOCKSPEREASYHANDLE];
unsigned int comboaction;
bool sincebefore = FALSE;
for(i = 0; i< MAX_SOCKSPEREASYHANDLE; i++) for(i = 0; i< MAX_SOCKSPEREASYHANDLE; i++)
socks[i] = CURL_SOCKET_BAD; socks[i] = CURL_SOCKET_BAD;
@ -2370,7 +2374,8 @@ static CURLMcode singlesocket(struct Curl_multi *multi,
for(i = 0; (i< MAX_SOCKSPEREASYHANDLE) && for(i = 0; (i< MAX_SOCKSPEREASYHANDLE) &&
(curraction & (GETSOCK_READSOCK(i) | GETSOCK_WRITESOCK(i))); (curraction & (GETSOCK_READSOCK(i) | GETSOCK_WRITESOCK(i)));
i++) { i++) {
int action = CURL_POLL_NONE; unsigned int action = CURL_POLL_NONE;
unsigned int prevaction = 0;
s = socks[i]; s = socks[i];
@ -2382,29 +2387,72 @@ static CURLMcode singlesocket(struct Curl_multi *multi,
if(curraction & GETSOCK_WRITESOCK(i)) if(curraction & GETSOCK_WRITESOCK(i))
action |= CURL_POLL_OUT; action |= CURL_POLL_OUT;
actions[i] = action;
comboaction = action;
if(entry) { if(entry) {
/* yeps, already present so check if it has the same action set */ /* check if new for this transfer */
if(entry->action == action) for(i = 0; i< data->numsocks; i++) {
/* same, continue */ if(s == data->sockets[i]) {
continue; prevaction = data->actions[i];
sincebefore = TRUE;
break;
}
}
} }
else { else {
/* this is a socket we didn't have before, add it! */ /* this is a socket we didn't have before, add it to the hash! */
entry = sh_addentry(&multi->sockhash, s, data); entry = sh_addentry(&multi->sockhash, s);
if(!entry) if(!entry)
/* fatal */ /* fatal */
return CURLM_OUT_OF_MEMORY; return CURLM_OUT_OF_MEMORY;
} }
if(sincebefore && (prevaction != action)) {
/* Socket was used already, but different action now */
if(prevaction & CURL_POLL_IN)
entry->readers--;
if(prevaction & CURL_POLL_OUT)
entry->writers--;
if(action & CURL_POLL_IN)
entry->readers++;
if(action & CURL_POLL_OUT)
entry->writers++;
}
else if(!sincebefore) {
/* a new user */
entry->users++;
if(action & CURL_POLL_IN)
entry->readers++;
if(action & CURL_POLL_OUT)
entry->writers++;
/* add 'data' to the list of handles using this socket! */
Curl_llist_insert_next(&entry->list, entry->list.tail,
data, &data->sh_queue);
}
comboaction = (entry->writers? CURL_POLL_OUT : 0) |
(entry->readers ? CURL_POLL_IN : 0);
#if 0
infof(data, "--- Comboaction: %u readers %u writers\n",
entry->readers, entry->writers);
#endif
/* check if it has the same action set */
if(entry->action == comboaction)
/* same, continue */
continue;
/* we know (entry != NULL) at this point, see the logic above */ /* we know (entry != NULL) at this point, see the logic above */
if(multi->socket_cb) if(multi->socket_cb)
multi->socket_cb(data, multi->socket_cb(data,
s, s,
action, comboaction,
multi->socket_userp, multi->socket_userp,
entry->socketp); entry->socketp);
entry->action = action; /* store the current action state */ entry->action = comboaction; /* store the current action state */
} }
num = i; /* number of sockets */ num = i; /* number of sockets */
@ -2413,73 +2461,45 @@ static CURLMcode singlesocket(struct Curl_multi *multi,
make sure to detect sockets that are removed */ make sure to detect sockets that are removed */
for(i = 0; i< data->numsocks; i++) { for(i = 0; i< data->numsocks; i++) {
int j; int j;
bool stillused = FALSE;
s = data->sockets[i]; s = data->sockets[i];
for(j = 0; j<num; j++) { for(j = 0; j < num; j++) {
if(s == socks[j]) { if(s == socks[j]) {
/* this is still supervised */ /* this is still supervised */
s = CURL_SOCKET_BAD; stillused = TRUE;
break; break;
} }
} }
if(stillused)
continue;
entry = sh_getentry(&multi->sockhash, s); entry = sh_getentry(&multi->sockhash, s);
/* if this is NULL here, the socket has been closed and notified so
already by Curl_multi_closed() */
if(entry) { if(entry) {
/* this socket has been removed. Tell the app to remove it */ int oldactions = data->actions[i];
bool remove_sock_from_hash = TRUE; /* this socket has been removed. Decrease user count */
entry->users--;
/* check if the socket to be removed serves a connection which has if(oldactions & CURL_POLL_OUT)
other easy-s in a pipeline. In this case the socket should not be entry->writers--;
removed. */ if(oldactions & CURL_POLL_IN)
struct connectdata *easy_conn = data->easy_conn; entry->readers--;
if(easy_conn) { if(!entry->users) {
if(easy_conn->recv_pipe.size > 1) {
/* the handle should not be removed from the pipe yet */
remove_sock_from_hash = FALSE;
/* Update the sockhash entry to instead point to the next in line
for the recv_pipe, or the first (in case this particular easy
isn't already) */
if(entry->easy == data) {
if(Curl_recvpipe_head(data, easy_conn))
entry->easy = easy_conn->recv_pipe.head->next->ptr;
else
entry->easy = easy_conn->recv_pipe.head->ptr;
}
}
if(easy_conn->send_pipe.size > 1) {
/* the handle should not be removed from the pipe yet */
remove_sock_from_hash = FALSE;
/* Update the sockhash entry to instead point to the next in line
for the send_pipe, or the first (in case this particular easy
isn't already) */
if(entry->easy == data) {
if(Curl_sendpipe_head(data, easy_conn))
entry->easy = easy_conn->send_pipe.head->next->ptr;
else
entry->easy = easy_conn->send_pipe.head->ptr;
}
}
/* Don't worry about overwriting recv_pipe head with send_pipe_head,
when action will be asked on the socket (see multi_socket()), the
head of the correct pipe will be taken according to the
action. */
}
if(remove_sock_from_hash) {
/* in this case 'entry' is always non-NULL */
if(multi->socket_cb) if(multi->socket_cb)
multi->socket_cb(data, multi->socket_cb(data, s, CURL_POLL_REMOVE,
s,
CURL_POLL_REMOVE,
multi->socket_userp, multi->socket_userp,
entry->socketp); entry->socketp);
sh_delentry(&multi->sockhash, s); sh_delentry(&multi->sockhash, s);
} }
} /* if sockhash entry existed */ else {
/* remove this transfer as a user of this socket */
Curl_llist_remove(&entry->list, &data->sh_queue, NULL);
}
}
} /* for loop over numsocks */ } /* for loop over numsocks */
memcpy(data->sockets, socks, num*sizeof(curl_socket_t)); memcpy(data->sockets, socks, num*sizeof(curl_socket_t));
memcpy(data->actions, actions, num*sizeof(int));
data->numsocks = num; data->numsocks = num;
return CURLM_OK; return CURLM_OK;
} }
@ -2619,46 +2639,52 @@ static CURLMcode multi_socket(struct Curl_multi *multi,
and just move on. */ and just move on. */
; ;
else { else {
struct curl_llist *list = &entry->list;
struct curl_llist_element *e;
SIGPIPE_VARIABLE(pipe_st); SIGPIPE_VARIABLE(pipe_st);
data = entry->easy; /* the socket can be shared by many transfers, iterate */
for(e = list->head; e; e = e->next) {
data = (struct Curl_easy *)e->ptr;
if(data->magic != CURLEASY_MAGIC_NUMBER) if(data->magic != CURLEASY_MAGIC_NUMBER)
/* bad bad bad bad bad bad bad */ /* bad bad bad bad bad bad bad */
return CURLM_INTERNAL_ERROR; return CURLM_INTERNAL_ERROR;
/* If the pipeline is enabled, take the handle which is in the head of /* If the pipeline is enabled, take the handle which is in the head of
the pipeline. If we should write into the socket, take the send_pipe the pipeline. If we should write into the socket, take the
head. If we should read from the socket, take the recv_pipe head. */ send_pipe head. If we should read from the socket, take the
if(data->easy_conn) { recv_pipe head. */
if((ev_bitmask & CURL_POLL_OUT) && if(data->easy_conn) {
data->easy_conn->send_pipe.head) if((ev_bitmask & CURL_POLL_OUT) &&
data = data->easy_conn->send_pipe.head->ptr; data->easy_conn->send_pipe.head)
else if((ev_bitmask & CURL_POLL_IN) && data = data->easy_conn->send_pipe.head->ptr;
data->easy_conn->recv_pipe.head) else if((ev_bitmask & CURL_POLL_IN) &&
data = data->easy_conn->recv_pipe.head->ptr; data->easy_conn->recv_pipe.head)
} data = data->easy_conn->recv_pipe.head->ptr;
}
if(data->easy_conn && if(data->easy_conn &&
!(data->easy_conn->handler->flags & PROTOPT_DIRLOCK)) !(data->easy_conn->handler->flags & PROTOPT_DIRLOCK))
/* set socket event bitmask if they're not locked */ /* set socket event bitmask if they're not locked */
data->easy_conn->cselect_bits = ev_bitmask; data->easy_conn->cselect_bits = ev_bitmask;
sigpipe_ignore(data, &pipe_st); sigpipe_ignore(data, &pipe_st);
result = multi_runsingle(multi, now, data); result = multi_runsingle(multi, now, data);
sigpipe_restore(&pipe_st); sigpipe_restore(&pipe_st);
if(data->easy_conn && if(data->easy_conn &&
!(data->easy_conn->handler->flags & PROTOPT_DIRLOCK)) !(data->easy_conn->handler->flags & PROTOPT_DIRLOCK))
/* clear the bitmask only if not locked */ /* clear the bitmask only if not locked */
data->easy_conn->cselect_bits = 0; data->easy_conn->cselect_bits = 0;
if(CURLM_OK >= result) { if(CURLM_OK >= result) {
/* get the socket(s) and check if the state has been changed since /* get the socket(s) and check if the state has been changed since
last */ last */
result = singlesocket(multi, data); result = singlesocket(multi, data);
if(result) if(result)
return result; return result;
}
} }
/* Now we fall-through and do the timer-based stuff, since we don't want /* Now we fall-through and do the timer-based stuff, since we don't want
@ -3002,6 +3028,9 @@ void Curl_expire(struct Curl_easy *data, time_t milli, expire_id id)
DEBUGASSERT(id < EXPIRE_LAST); DEBUGASSERT(id < EXPIRE_LAST);
infof(data, "Expire in %ld ms for %x (transfer %p)\n",
(long)milli, id, data);
set = Curl_now(); set = Curl_now();
set.tv_sec += milli/1000; set.tv_sec += milli/1000;
set.tv_usec += (unsigned int)(milli%1000)*1000; set.tv_usec += (unsigned int)(milli%1000)*1000;
@ -3093,7 +3122,7 @@ void Curl_expire_clear(struct Curl_easy *data)
} }
#ifdef DEBUGBUILD #ifdef DEBUGBUILD
infof(data, "Expire cleared\n"); infof(data, "Expire cleared (transfer %p)\n", data);
#endif #endif
nowp->tv_sec = 0; nowp->tv_sec = 0;
nowp->tv_usec = 0; nowp->tv_usec = 0;

View File

@ -7,7 +7,7 @@
* | (__| |_| | _ <| |___ * | (__| |_| | _ <| |___
* \___|\___/|_| \_\_____| * \___|\___/|_| \_\_____|
* *
* Copyright (C) 1998 - 2018, Daniel Stenberg, <daniel@haxx.se>, et al. * Copyright (C) 1998 - 2019, Daniel Stenberg, <daniel@haxx.se>, et al.
* *
* This software is licensed as described in the file COPYING, which * This software is licensed as described in the file COPYING, which
* you should have received as part of this distribution. The terms * you should have received as part of this distribution. The terms
@ -1780,6 +1780,7 @@ struct Curl_easy {
struct connectdata *easy_conn; /* the "unit's" connection */ struct connectdata *easy_conn; /* the "unit's" connection */
struct curl_llist_element connect_queue; struct curl_llist_element connect_queue;
struct curl_llist_element pipeline_queue; struct curl_llist_element pipeline_queue;
struct curl_llist_element sh_queue; /* list per Curl_sh_entry */
CURLMstate mstate; /* the handle's state */ CURLMstate mstate; /* the handle's state */
CURLcode result; /* previous result */ CURLcode result; /* previous result */
@ -1791,6 +1792,8 @@ struct Curl_easy {
the state etc are also kept. This array is mostly used to detect when a the state etc are also kept. This array is mostly used to detect when a
socket is to be removed from the hash. See singlesocket(). */ socket is to be removed from the hash. See singlesocket(). */
curl_socket_t sockets[MAX_SOCKSPEREASYHANDLE]; curl_socket_t sockets[MAX_SOCKSPEREASYHANDLE];
int actions[MAX_SOCKSPEREASYHANDLE]; /* action for each socket in
sockets[] */
int numsocks; int numsocks;
struct Names dns; struct Names dns;