mirror of
https://github.com/moparisthebest/curl
synced 2025-01-10 21:48:10 -05:00
multi: implement wait using winsock events
This avoids using a pair of TCP ports to provide wakeup functionality for every multi instance on Windows, where socketpair() is emulated using a TCP socket on loopback which could in turn lead to socket resource exhaustion. A previous version of this patch failed to account for how in WinSock, FD_WRITE is set only once when writing becomes possible and not again until after a send has failed due to the buffer filling. This contrasts to how FD_READ and FD_OOB continue to be set until the conditions they refer to no longer apply. This meant that if a user wrote some data to a socket, but not enough data to completely fill its send buffer, then waited on that socket to become writable, we'd erroneously stall until their configured timeout rather than returning immediately. This version of the patch addresses that issue by checking each socket we're waiting on to become writable with select() before the wait, and zeroing the timeout if it's already writable. Assisted-by: Marc Hörsken Reviewed-by: Marcel Raad Reviewed-by: Daniel Stenberg Tested-by: Gergely Nagy Tested-by: Rasmus Melchior Jacobsen Tested-by: Tomas Berger Replaces #5397 Reverts #5632 Closes #5634
This commit is contained in:
parent
17f58c8d98
commit
d2a7d7c185
160
lib/multi.c
160
lib/multi.c
@ -374,6 +374,11 @@ struct Curl_multi *Curl_multi_handle(int hashsize, /* socket hash */
|
|||||||
multi->max_concurrent_streams = 100;
|
multi->max_concurrent_streams = 100;
|
||||||
multi->ipv6_works = Curl_ipv6works(NULL);
|
multi->ipv6_works = Curl_ipv6works(NULL);
|
||||||
|
|
||||||
|
#ifdef USE_WINSOCK
|
||||||
|
multi->wsa_event = WSACreateEvent();
|
||||||
|
if(multi->wsa_event == WSA_INVALID_EVENT)
|
||||||
|
goto error;
|
||||||
|
#else
|
||||||
#ifdef ENABLE_WAKEUP
|
#ifdef ENABLE_WAKEUP
|
||||||
if(Curl_socketpair(AF_UNIX, SOCK_STREAM, 0, multi->wakeup_pair) < 0) {
|
if(Curl_socketpair(AF_UNIX, SOCK_STREAM, 0, multi->wakeup_pair) < 0) {
|
||||||
multi->wakeup_pair[0] = CURL_SOCKET_BAD;
|
multi->wakeup_pair[0] = CURL_SOCKET_BAD;
|
||||||
@ -386,6 +391,7 @@ struct Curl_multi *Curl_multi_handle(int hashsize, /* socket hash */
|
|||||||
multi->wakeup_pair[0] = CURL_SOCKET_BAD;
|
multi->wakeup_pair[0] = CURL_SOCKET_BAD;
|
||||||
multi->wakeup_pair[1] = CURL_SOCKET_BAD;
|
multi->wakeup_pair[1] = CURL_SOCKET_BAD;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
return multi;
|
return multi;
|
||||||
@ -1081,11 +1087,16 @@ static CURLMcode Curl_multi_wait(struct Curl_multi *multi,
|
|||||||
unsigned int i;
|
unsigned int i;
|
||||||
unsigned int nfds = 0;
|
unsigned int nfds = 0;
|
||||||
unsigned int curlfds;
|
unsigned int curlfds;
|
||||||
bool ufds_malloc = FALSE;
|
|
||||||
long timeout_internal;
|
long timeout_internal;
|
||||||
int retcode = 0;
|
int retcode = 0;
|
||||||
|
#ifndef USE_WINSOCK
|
||||||
struct pollfd a_few_on_stack[NUM_POLLS_ON_STACK];
|
struct pollfd a_few_on_stack[NUM_POLLS_ON_STACK];
|
||||||
struct pollfd *ufds = &a_few_on_stack[0];
|
struct pollfd *ufds = &a_few_on_stack[0];
|
||||||
|
bool ufds_malloc = FALSE;
|
||||||
|
#else
|
||||||
|
int already_writable = 0;
|
||||||
|
DEBUGASSERT(multi->wsa_event != WSA_INVALID_EVENT);
|
||||||
|
#endif
|
||||||
|
|
||||||
if(!GOOD_MULTI_HANDLE(multi))
|
if(!GOOD_MULTI_HANDLE(multi))
|
||||||
return CURLM_BAD_HANDLE;
|
return CURLM_BAD_HANDLE;
|
||||||
@ -1131,11 +1142,16 @@ static CURLMcode Curl_multi_wait(struct Curl_multi *multi,
|
|||||||
nfds += extra_nfds; /* add the externally provided ones */
|
nfds += extra_nfds; /* add the externally provided ones */
|
||||||
|
|
||||||
#ifdef ENABLE_WAKEUP
|
#ifdef ENABLE_WAKEUP
|
||||||
|
#ifdef USE_WINSOCK
|
||||||
|
if(use_wakeup) {
|
||||||
|
#else
|
||||||
if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) {
|
if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) {
|
||||||
|
#endif
|
||||||
++nfds;
|
++nfds;
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#ifndef USE_WINSOCK
|
||||||
if(nfds > NUM_POLLS_ON_STACK) {
|
if(nfds > NUM_POLLS_ON_STACK) {
|
||||||
/* 'nfds' is a 32 bit value and 'struct pollfd' is typically 8 bytes
|
/* 'nfds' is a 32 bit value and 'struct pollfd' is typically 8 bytes
|
||||||
big, so at 2^29 sockets this value might wrap. When a process gets
|
big, so at 2^29 sockets this value might wrap. When a process gets
|
||||||
@ -1146,7 +1162,9 @@ static CURLMcode Curl_multi_wait(struct Curl_multi *multi,
|
|||||||
return CURLM_OUT_OF_MEMORY;
|
return CURLM_OUT_OF_MEMORY;
|
||||||
ufds_malloc = TRUE;
|
ufds_malloc = TRUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
nfds = 0;
|
nfds = 0;
|
||||||
|
#endif
|
||||||
|
|
||||||
/* only do the second loop if we found descriptors in the first stage run
|
/* only do the second loop if we found descriptors in the first stage run
|
||||||
above */
|
above */
|
||||||
@ -1159,22 +1177,45 @@ static CURLMcode Curl_multi_wait(struct Curl_multi *multi,
|
|||||||
|
|
||||||
for(i = 0; i< MAX_SOCKSPEREASYHANDLE; i++) {
|
for(i = 0; i< MAX_SOCKSPEREASYHANDLE; i++) {
|
||||||
curl_socket_t s = CURL_SOCKET_BAD;
|
curl_socket_t s = CURL_SOCKET_BAD;
|
||||||
|
#ifdef USE_WINSOCK
|
||||||
|
long mask = 0;
|
||||||
|
#endif
|
||||||
if(bitmap & GETSOCK_READSOCK(i)) {
|
if(bitmap & GETSOCK_READSOCK(i)) {
|
||||||
|
#ifdef USE_WINSOCK
|
||||||
|
mask |= FD_READ;
|
||||||
|
#else
|
||||||
ufds[nfds].fd = sockbunch[i];
|
ufds[nfds].fd = sockbunch[i];
|
||||||
ufds[nfds].events = POLLIN;
|
ufds[nfds].events = POLLIN;
|
||||||
++nfds;
|
++nfds;
|
||||||
|
#endif
|
||||||
s = sockbunch[i];
|
s = sockbunch[i];
|
||||||
}
|
}
|
||||||
if(bitmap & GETSOCK_WRITESOCK(i)) {
|
if(bitmap & GETSOCK_WRITESOCK(i)) {
|
||||||
|
#ifdef USE_WINSOCK
|
||||||
|
struct timeval timeout;
|
||||||
|
fd_set writefds;
|
||||||
|
timeout.tv_sec = 0;
|
||||||
|
timeout.tv_usec = 0;
|
||||||
|
FD_ZERO(&writefds);
|
||||||
|
FD_SET(sockbunch[i], &writefds);
|
||||||
|
if(select((int)sockbunch[i] + 1, NULL, &writefds, NULL,
|
||||||
|
&timeout) == 1)
|
||||||
|
already_writable++;
|
||||||
|
mask |= FD_WRITE;
|
||||||
|
#else
|
||||||
ufds[nfds].fd = sockbunch[i];
|
ufds[nfds].fd = sockbunch[i];
|
||||||
ufds[nfds].events = POLLOUT;
|
ufds[nfds].events = POLLOUT;
|
||||||
++nfds;
|
++nfds;
|
||||||
|
#endif
|
||||||
s = sockbunch[i];
|
s = sockbunch[i];
|
||||||
}
|
}
|
||||||
if(s == CURL_SOCKET_BAD) {
|
if(s == CURL_SOCKET_BAD) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
#ifdef USE_WINSOCK
|
||||||
|
if(WSAEventSelect(s, multi->wsa_event, mask) != 0)
|
||||||
|
return CURLM_INTERNAL_ERROR;
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
data = data->next; /* check next handle */
|
data = data->next; /* check next handle */
|
||||||
@ -1183,6 +1224,30 @@ static CURLMcode Curl_multi_wait(struct Curl_multi *multi,
|
|||||||
|
|
||||||
/* Add external file descriptions from poll-like struct curl_waitfd */
|
/* Add external file descriptions from poll-like struct curl_waitfd */
|
||||||
for(i = 0; i < extra_nfds; i++) {
|
for(i = 0; i < extra_nfds; i++) {
|
||||||
|
#ifdef USE_WINSOCK
|
||||||
|
long events = 0;
|
||||||
|
extra_fds[i].revents = 0;
|
||||||
|
if(extra_fds[i].events & CURL_WAIT_POLLIN)
|
||||||
|
events |= FD_READ;
|
||||||
|
if(extra_fds[i].events & CURL_WAIT_POLLPRI)
|
||||||
|
events |= FD_OOB;
|
||||||
|
if(extra_fds[i].events & CURL_WAIT_POLLOUT) {
|
||||||
|
struct timeval timeout;
|
||||||
|
fd_set writefds;
|
||||||
|
timeout.tv_sec = 0;
|
||||||
|
timeout.tv_usec = 0;
|
||||||
|
FD_ZERO(&writefds);
|
||||||
|
FD_SET(extra_fds[i].fd, &writefds);
|
||||||
|
if(select((int)extra_fds[i].fd + 1, NULL, &writefds, NULL,
|
||||||
|
&timeout) == 1) {
|
||||||
|
extra_fds[i].revents = CURL_WAIT_POLLOUT;
|
||||||
|
already_writable++;
|
||||||
|
}
|
||||||
|
events |= FD_WRITE;
|
||||||
|
}
|
||||||
|
if(WSAEventSelect(extra_fds[i].fd, multi->wsa_event, events) != 0)
|
||||||
|
return CURLM_INTERNAL_ERROR;
|
||||||
|
#else
|
||||||
ufds[nfds].fd = extra_fds[i].fd;
|
ufds[nfds].fd = extra_fds[i].fd;
|
||||||
ufds[nfds].events = 0;
|
ufds[nfds].events = 0;
|
||||||
if(extra_fds[i].events & CURL_WAIT_POLLIN)
|
if(extra_fds[i].events & CURL_WAIT_POLLIN)
|
||||||
@ -1192,28 +1257,60 @@ static CURLMcode Curl_multi_wait(struct Curl_multi *multi,
|
|||||||
if(extra_fds[i].events & CURL_WAIT_POLLOUT)
|
if(extra_fds[i].events & CURL_WAIT_POLLOUT)
|
||||||
ufds[nfds].events |= POLLOUT;
|
ufds[nfds].events |= POLLOUT;
|
||||||
++nfds;
|
++nfds;
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef ENABLE_WAKEUP
|
#ifdef ENABLE_WAKEUP
|
||||||
|
#ifndef USE_WINSOCK
|
||||||
if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) {
|
if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) {
|
||||||
ufds[nfds].fd = multi->wakeup_pair[0];
|
ufds[nfds].fd = multi->wakeup_pair[0];
|
||||||
ufds[nfds].events = POLLIN;
|
ufds[nfds].events = POLLIN;
|
||||||
++nfds;
|
++nfds;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
if(nfds) {
|
if(nfds) {
|
||||||
int pollrc;
|
|
||||||
/* wait... */
|
/* wait... */
|
||||||
pollrc = Curl_poll(ufds, nfds, timeout_ms);
|
#ifdef USE_WINSOCK
|
||||||
|
if(already_writable > 0)
|
||||||
|
timeout_ms = 0;
|
||||||
|
WSAWaitForMultipleEvents(1, &multi->wsa_event, FALSE, timeout_ms, FALSE);
|
||||||
|
#else
|
||||||
|
int pollrc = Curl_poll(ufds, nfds, timeout_ms);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#ifdef USE_WINSOCK
|
||||||
|
/* With Winsock, we have to run this unconditionally to call
|
||||||
|
WSAEventSelect(fd, event, 0) on all the sockets */
|
||||||
|
{
|
||||||
|
retcode = 0;
|
||||||
|
#else
|
||||||
if(pollrc > 0) {
|
if(pollrc > 0) {
|
||||||
retcode = pollrc;
|
retcode = pollrc;
|
||||||
|
#endif
|
||||||
/* copy revents results from the poll to the curl_multi_wait poll
|
/* copy revents results from the poll to the curl_multi_wait poll
|
||||||
struct, the bit values of the actual underlying poll() implementation
|
struct, the bit values of the actual underlying poll() implementation
|
||||||
may not be the same as the ones in the public libcurl API! */
|
may not be the same as the ones in the public libcurl API! */
|
||||||
for(i = 0; i < extra_nfds; i++) {
|
for(i = 0; i < extra_nfds; i++) {
|
||||||
unsigned short mask = 0;
|
unsigned short mask = 0;
|
||||||
|
#ifdef USE_WINSOCK
|
||||||
|
WSANETWORKEVENTS events = {0};
|
||||||
|
mask = extra_fds[i].revents;
|
||||||
|
if(WSAEnumNetworkEvents(extra_fds[i].fd, multi->wsa_event,
|
||||||
|
&events) == 0) {
|
||||||
|
if(events.lNetworkEvents & FD_READ)
|
||||||
|
mask |= CURL_WAIT_POLLIN;
|
||||||
|
if(events.lNetworkEvents & FD_WRITE)
|
||||||
|
mask |= CURL_WAIT_POLLOUT;
|
||||||
|
if(events.lNetworkEvents & FD_OOB)
|
||||||
|
mask |= CURL_WAIT_POLLPRI;
|
||||||
|
|
||||||
|
if(events.lNetworkEvents != 0)
|
||||||
|
retcode++;
|
||||||
|
}
|
||||||
|
WSAEventSelect(extra_fds[i].fd, multi->wsa_event, 0);
|
||||||
|
#else
|
||||||
unsigned r = ufds[curlfds + i].revents;
|
unsigned r = ufds[curlfds + i].revents;
|
||||||
|
|
||||||
if(r & POLLIN)
|
if(r & POLLIN)
|
||||||
@ -1222,10 +1319,51 @@ static CURLMcode Curl_multi_wait(struct Curl_multi *multi,
|
|||||||
mask |= CURL_WAIT_POLLOUT;
|
mask |= CURL_WAIT_POLLOUT;
|
||||||
if(r & POLLPRI)
|
if(r & POLLPRI)
|
||||||
mask |= CURL_WAIT_POLLPRI;
|
mask |= CURL_WAIT_POLLPRI;
|
||||||
|
#endif
|
||||||
|
|
||||||
extra_fds[i].revents = mask;
|
extra_fds[i].revents = mask;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#ifdef USE_WINSOCK
|
||||||
|
/* Count up all our own sockets that had activity,
|
||||||
|
and remove them from the event. */
|
||||||
|
if(curlfds) {
|
||||||
|
data = multi->easyp;
|
||||||
|
while(data) {
|
||||||
|
bitmap = multi_getsock(data, sockbunch);
|
||||||
|
|
||||||
|
for(i = 0; i < MAX_SOCKSPEREASYHANDLE; i++) {
|
||||||
|
if(bitmap & (GETSOCK_READSOCK(i) | GETSOCK_WRITESOCK(i))) {
|
||||||
|
WSANETWORKEVENTS events = {0};
|
||||||
|
if(WSAEnumNetworkEvents(sockbunch[i], multi->wsa_event,
|
||||||
|
&events) == 0) {
|
||||||
|
if(events.lNetworkEvents != 0)
|
||||||
|
retcode++;
|
||||||
|
}
|
||||||
|
if(ret && !events.lNetworkEvents &&
|
||||||
|
(bitmap & GETSOCK_WRITESOCK(i))) {
|
||||||
|
struct timeval timeout;
|
||||||
|
fd_set writefds;
|
||||||
|
timeout.tv_sec = 0;
|
||||||
|
timeout.tv_usec = 0;
|
||||||
|
FD_ZERO(&writefds);
|
||||||
|
FD_SET(sockbunch[i], &writefds);
|
||||||
|
if(select((int)sockbunch[i] + 1, NULL, &writefds, NULL,
|
||||||
|
&timeout) == 1)
|
||||||
|
retcode++;
|
||||||
|
}
|
||||||
|
WSAEventSelect(sockbunch[i], multi->wsa_event, 0);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
data = data->next;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
WSAResetEvent(multi->wsa_event);
|
||||||
|
#else
|
||||||
#ifdef ENABLE_WAKEUP
|
#ifdef ENABLE_WAKEUP
|
||||||
if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) {
|
if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) {
|
||||||
if(ufds[curlfds + extra_nfds].revents & POLLIN) {
|
if(ufds[curlfds + extra_nfds].revents & POLLIN) {
|
||||||
@ -1238,10 +1376,8 @@ static CURLMcode Curl_multi_wait(struct Curl_multi *multi,
|
|||||||
when there is no more data, breaking the loop. */
|
when there is no more data, breaking the loop. */
|
||||||
nread = sread(multi->wakeup_pair[0], buf, sizeof(buf));
|
nread = sread(multi->wakeup_pair[0], buf, sizeof(buf));
|
||||||
if(nread <= 0) {
|
if(nread <= 0) {
|
||||||
#ifndef USE_WINSOCK
|
|
||||||
if(nread < 0 && EINTR == SOCKERRNO)
|
if(nread < 0 && EINTR == SOCKERRNO)
|
||||||
continue;
|
continue;
|
||||||
#endif
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1249,12 +1385,15 @@ static CURLMcode Curl_multi_wait(struct Curl_multi *multi,
|
|||||||
retcode--;
|
retcode--;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#ifndef USE_WINSOCK
|
||||||
if(ufds_malloc)
|
if(ufds_malloc)
|
||||||
free(ufds);
|
free(ufds);
|
||||||
|
#endif
|
||||||
if(ret)
|
if(ret)
|
||||||
*ret = retcode;
|
*ret = retcode;
|
||||||
if(!extrawait || nfds)
|
if(!extrawait || nfds)
|
||||||
@ -1309,6 +1448,10 @@ CURLMcode curl_multi_wakeup(struct Curl_multi *multi)
|
|||||||
return CURLM_BAD_HANDLE;
|
return CURLM_BAD_HANDLE;
|
||||||
|
|
||||||
#ifdef ENABLE_WAKEUP
|
#ifdef ENABLE_WAKEUP
|
||||||
|
#ifdef USE_WINSOCK
|
||||||
|
if(WSASetEvent(multi->wsa_event))
|
||||||
|
return CURLM_OK;
|
||||||
|
#else
|
||||||
/* the wakeup_pair variable is only written during init and cleanup,
|
/* the wakeup_pair variable is only written during init and cleanup,
|
||||||
making it safe to access from another thread after the init part
|
making it safe to access from another thread after the init part
|
||||||
and before cleanup */
|
and before cleanup */
|
||||||
@ -1341,6 +1484,7 @@ CURLMcode curl_multi_wakeup(struct Curl_multi *multi)
|
|||||||
return CURLM_OK;
|
return CURLM_OK;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
#endif
|
#endif
|
||||||
return CURLM_WAKEUP_FAILURE;
|
return CURLM_WAKEUP_FAILURE;
|
||||||
}
|
}
|
||||||
@ -2500,9 +2644,13 @@ CURLMcode curl_multi_cleanup(struct Curl_multi *multi)
|
|||||||
Curl_hash_destroy(&multi->hostcache);
|
Curl_hash_destroy(&multi->hostcache);
|
||||||
Curl_psl_destroy(&multi->psl);
|
Curl_psl_destroy(&multi->psl);
|
||||||
|
|
||||||
|
#ifdef USE_WINSOCK
|
||||||
|
WSACloseEvent(multi->wsa_event);
|
||||||
|
#else
|
||||||
#ifdef ENABLE_WAKEUP
|
#ifdef ENABLE_WAKEUP
|
||||||
sclose(multi->wakeup_pair[0]);
|
sclose(multi->wakeup_pair[0]);
|
||||||
sclose(multi->wakeup_pair[1]);
|
sclose(multi->wakeup_pair[1]);
|
||||||
|
#endif
|
||||||
#endif
|
#endif
|
||||||
free(multi);
|
free(multi);
|
||||||
|
|
||||||
|
@ -138,9 +138,13 @@ struct Curl_multi {
|
|||||||
previous callback */
|
previous callback */
|
||||||
unsigned int max_concurrent_streams;
|
unsigned int max_concurrent_streams;
|
||||||
|
|
||||||
|
#ifdef USE_WINSOCK
|
||||||
|
WSAEVENT wsa_event; /* winsock event used for waits */
|
||||||
|
#else
|
||||||
#ifdef ENABLE_WAKEUP
|
#ifdef ENABLE_WAKEUP
|
||||||
curl_socket_t wakeup_pair[2]; /* socketpair() used for wakeup
|
curl_socket_t wakeup_pair[2]; /* socketpair() used for wakeup
|
||||||
0 is used for read, 1 is used for write */
|
0 is used for read, 1 is used for write */
|
||||||
|
#endif
|
||||||
#endif
|
#endif
|
||||||
/* multiplexing wanted */
|
/* multiplexing wanted */
|
||||||
bool multiplexing;
|
bool multiplexing;
|
||||||
|
Loading…
Reference in New Issue
Block a user