This is now a working example using libevent and curl_multi_socket() for really

fast treatment of many simultaneous transfers
This commit is contained in:
Daniel Stenberg 2006-08-03 22:57:04 +00:00
parent 51f258d103
commit b9b06b00bf
1 changed files with 98 additions and 216 deletions

View File

@ -10,8 +10,7 @@
* Connect N connections. Z are idle, and X are active. Transfer as fast as
* possible.
*
* Run for a specific amount of time (10 secs for now). Output detailed timing
* information.
* Output detailed timing information.
*
* Uses libevent.
*
@ -50,16 +49,6 @@
when using asynch supported libcurl. */
#define IDLE_TIME 10
struct ourfdset {
/* __fds_bits is what the Linux glibc headers use when they declare the
fd_set struct so by using this we can actually avoid the typecase for the
FD_SET() macro usage but it would hardly be portable */
char __fds_bits[NCONNECTIONS/8];
};
#define FD2_ZERO(x) memset(x, 0, sizeof(struct ourfdset))
typedef struct ourfdset fd2_set;
struct globalinfo {
size_t dlcounter;
};
@ -73,6 +62,8 @@ struct connection {
char error[CURL_ERROR_SIZE];
};
/* this is the struct associated with each file descriptor libcurl tells us
it is dealing with */
struct fdinfo {
/* create a link list of fdinfo structs */
struct fdinfo *next;
@ -91,15 +82,64 @@ static struct fdinfo *allsocks;
static int running_handles;
/* we have the timerevent global so that when the final socket-based event is
done, we can remove the timerevent as well */
static struct event timerevent;
/* called from libevent on action on a particular socket ("event") */
static void eventcallback(int fd, short type, void *userp)
{
struct fdinfo *fdp = (struct fdinfo *)userp;
CURLMcode rc;
fprintf(stderr, "EVENT callback\n");
fprintf(stderr, "EVENT callback type %d\n", type);
/* tell libcurl to deal with the transfer associated with this socket */
curl_multi_socket(fdp->multi, fd, fdp->running_handles);
do {
rc = curl_multi_socket(fdp->multi, fd, fdp->running_handles);
} while (rc == CURLM_CALL_MULTI_PERFORM);
if(rc) {
fprintf(stderr, "curl_multi_socket() returned %d\n", (int)rc);
}
fprintf(stderr, "running_handles: %d\n", *fdp->running_handles);
if(!*fdp->running_handles) {
/* last transfer is complete, kill pending timeout */
fprintf(stderr, "last transfer done, kill timeout\n");
if(evtimer_pending(&timerevent, NULL))
evtimer_del(&timerevent);
}
}
/* called from libevent when our timer event expires */
static void timercallback(int fd, short type, void *userp)
{
(void)fd; /* not used for this */
(void)type; /* ignored in here */
CURLM *multi_handle = (CURLM *)userp;
long timeout_ms;
struct timeval timeout;
int running_handles;
CURLMcode rc;
fprintf(stderr, "EVENT timeout\n");
/* tell libcurl to deal with the transfer associated with this socket */
do {
rc = curl_multi_socket(multi_handle, CURL_SOCKET_TIMEOUT,
&running_handles);
} while (rc == CURLM_CALL_MULTI_PERFORM);
if(running_handles) {
/* Get the current timeout value from libcurl and set a new timeout */
curl_multi_timeout(multi_handle, &timeout_ms);
/* convert ms to timeval */
timeout.tv_sec = timeout_ms/1000;
timeout.tv_usec = (timeout_ms%1000)*1000;
evtimer_add(&timerevent, &timeout);
}
}
static void remsock(struct fdinfo *f)
@ -108,6 +148,9 @@ static void remsock(struct fdinfo *f)
/* did not find socket to remove! */
return;
if(f->evset)
event_del(&f->ev);
if(f->prev)
f->prev->next = f->next;
if(f->next)
@ -128,16 +171,22 @@ static void setsock(struct fdinfo *fdp, curl_socket_t s, CURL *easy,
/* first remove the existing event if the old setup was used */
event_del(&fdp->ev);
/* now use and add the current socket setup */
/* now use and add the current socket setup to libevent. The EV_PERSIST is
the key here as otherwise libevent will automatically remove the event
when it occurs the first time */
event_set(&fdp->ev, fdp->sockfd,
(action&CURL_POLL_IN?EV_READ:0)|
(action&CURL_POLL_OUT?EV_WRITE:0),
(action&CURL_POLL_OUT?EV_WRITE:0)| EV_PERSIST,
eventcallback, fdp);
fdp->evset=1;
fprintf(stderr, "event_add() for fd %d\n", s);
event_add(&fdp->ev, NULL); /* no timeout */
/* We don't use any socket-specific timeout but intead we use a single
global one. This is (mostly) because libcurl doesn't expose any
particular socket- based timeout value. */
event_add(&fdp->ev, NULL);
}
static void addsock(curl_socket_t s, CURL *easy, int action, CURLM *multi)
@ -162,55 +211,17 @@ static void addsock(curl_socket_t s, CURL *easy, int action, CURLM *multi)
curl_multi_assign(multi, s, fdp);
}
static void fdinfo2fdset(fd2_set *fdread, fd2_set *fdwrite, int *maxfd)
{
struct fdinfo *fdp = allsocks;
int writable=0;
FD2_ZERO(fdread);
FD2_ZERO(fdwrite);
*maxfd = 0;
#if 0
printf("Wait for: ");
#endif
while(fdp) {
if(fdp->action & CURL_POLL_IN) {
FD_SET(fdp->sockfd, (fd_set *)fdread);
}
if(fdp->action & CURL_POLL_OUT) {
FD_SET(fdp->sockfd, (fd_set *)fdwrite);
writable++;
}
#if 0
printf("%d (%s%s) ",
fdp->sockfd,
(fdp->action & CURL_POLL_IN)?"r":"",
(fdp->action & CURL_POLL_OUT)?"w":"");
#endif
if(fdp->sockfd > *maxfd)
*maxfd = fdp->sockfd;
fdp = fdp->next;
}
#if 0
if(writable)
printf("Check for %d writable sockets\n", writable);
#endif
}
/* on port 8999 we run a fork enabled sws that supports 'idle' and 'stream' */
#define PORT "8999"
#define HOST "192.168.1.13"
#define HOST "127.0.0.1"
#define URL_IDLE "http://" HOST ":" PORT "/1000"
#if 1
#define URL_ACTIVE "http://" HOST ":" PORT "/1001"
#else
#define URL_ACTIVE "http://localhost/"
#endif
static int socket_callback(CURL *easy, /* easy handle */
curl_socket_t s, /* socket */
@ -219,15 +230,24 @@ static int socket_callback(CURL *easy, /* easy handle */
void *socketp) /* socket pointer */
{
struct fdinfo *fdp = (struct fdinfo *)socketp;
char *whatstr[]={
"none",
"IN",
"OUT",
"INOUT",
"REMOVE"};
fprintf(stderr, "socket %d easy %p what %d\n", s, easy, what);
fprintf(stderr, "socket %d easy %p what %s\n", s, easy,
whatstr[what]);
if(what == CURL_POLL_REMOVE)
remsock(fdp);
else {
if(!fdp) {
/* not previously known, add it and set association */
printf("Add info for socket %d (%d)\n", s, what);
printf("Add info for socket %d %s%s\n", s,
what&CURL_POLL_IN?"READ":"",
what&CURL_POLL_OUT?"WRITE":"" );
addsock(s, easy, what, cbp);
}
else {
@ -250,135 +270,19 @@ writecallback(void *ptr, size_t size, size_t nmemb, void *data)
c->dlcounter += realsize;
c->global->dlcounter += realsize;
#if 1
printf("%02d: %d, total %d\n",
c->id, c->dlcounter, c->global->dlcounter);
#endif
return realsize;
}
/* return the diff between two timevals, in us */
static long tvdiff(struct timeval *newer, struct timeval *older)
{
return (newer->tv_sec-older->tv_sec)*1000000+
(newer->tv_usec-older->tv_usec);
}
/* store the start time of the program in this variable */
static struct timeval timer;
static void timer_start(void)
{
/* capture the time of the start moment */
gettimeofday(&timer, NULL);
}
static struct timeval cont; /* at this moment we continued */
int still_running; /* keep number of running handles */
struct conncount {
long time_us;
long laps;
long maxtime;
};
static struct timeval timerpause;
static void timer_pause(void)
{
/* capture the time of the pause moment */
gettimeofday(&timerpause, NULL);
/* If we have a previous continue (all times except the first), we can now
store the time for a whole "lap" */
if(cont.tv_sec) {
long lap;
lap = tvdiff(&timerpause, &cont);
}
}
static long paused; /* amount of us we have been pausing */
static void timer_continue(void)
{
/* Capture the time of the restored operation moment, now calculate how long
time we were paused and added that to the 'paused' variable.
*/
gettimeofday(&cont, NULL);
paused += tvdiff(&cont, &timerpause);
}
static long total; /* amount of us from start to stop */
static void timer_total(void)
{
struct timeval stop;
/* Capture the time of the operation stopped moment, now calculate how long
time we were running and how much of that pausing.
*/
gettimeofday(&stop, NULL);
total = tvdiff(&stop, &timer);
}
struct globalinfo info;
struct connection *conns;
long selects;
long timeouts;
long multi_socket;
long performalive;
long performselect;
long topselect;
int num_total;
int num_idle;
int num_active;
static void report(void)
{
int i;
long active = total - paused;
long numdl = 0;
for(i=0; i < num_total; i++) {
if(conns[i].dlcounter)
numdl++;
}
printf("Summary from %d simultanoues transfers (%d active)\n",
num_total, num_active);
printf("%d out of %d connections provided data\n", numdl, num_total);
printf("Total time: %ldus paused: %ldus curl_multi_socket(): %ldus\n",
total, paused, active);
printf("%d calls to select() "
"Average time: %dus\n",
selects, paused/selects);
printf(" Average number of readable connections per select() return: %d\n",
performselect/selects);
printf(" Max number of readable connections for a single select() "
"return: %d\n",
topselect);
printf("%ld calls to multi_socket(), "
"Average time: %ldus\n",
multi_socket, active/multi_socket);
printf("%ld select() timeouts\n", timeouts);
printf("Downloaded %ld bytes in %ld bytes/sec, %ld usec/byte\n",
info.dlcounter,
info.dlcounter/(total/1000000),
total/info.dlcounter);
}
int main(int argc, char **argv)
{
CURLM *multi_handle;
@ -387,11 +291,11 @@ int main(int argc, char **argv)
CURLMcode mcode = CURLM_OK;
int rc;
int i;
fd2_set fdsizecheck;
int selectmaxamount;
struct fdinfo *fdp;
char act;
long timeout_ms;
struct timeval timeout;
memset(&info, 0, sizeof(struct globalinfo));
@ -462,45 +366,25 @@ int main(int argc, char **argv)
curl_multi_setopt(multi_handle, CURLMOPT_SOCKETFUNCTION, socket_callback);
curl_multi_setopt(multi_handle, CURLMOPT_SOCKETDATA, multi_handle);
/* we start the action by calling *socket() right away */
/* we start the action by calling *socket_all() */
while(CURLM_CALL_MULTI_PERFORM == curl_multi_socket_all(multi_handle,
&running_handles));
/* event_dispatch() isn't good enough for us, since we need a global timeout
to occur after a given time of inactivity
*/
/* get the timeout value from libcurl */
/* Since we need a global timeout to occur after a given time of inactivity,
we add a single timeout-event. Get the timeout value from libcurl */
curl_multi_timeout(multi_handle, &timeout_ms);
/* convert ms to timeval */
timeout.tv_sec = timeout_ms/1000;
timeout.tv_usec = (timeout_ms%1000)*1000;
evtimer_set(&timerevent, timercallback, multi_handle);
evtimer_add(&timerevent, &timeout);
while(running_handles) {
struct timeval timeout;
/* event_dispatch() runs the event main loop. It ends when no events are
left to wait for. */
/* convert ms to timeval */
timeout.tv_sec = timeout_ms/1000;
timeout.tv_usec = (timeout_ms%1000)*1000;
event_dispatch();
event_loopexit(&timeout);
/* The event_loopexit() function may have taken a while and it may or may
not have invoked libcurl calls during that time. During those calls,
the timeout situation might very well have changed, so we check the
timeout time again to see if we really need to call curl_multi_socket()
at this point! */
/* get the timeout value from libcurl */
curl_multi_timeout(multi_handle, &timeout_ms);
if(timeout_ms <= 0) {
/* no time left */
curl_multi_socket(multi_handle, CURL_SOCKET_TIMEOUT, &running_handles);
/* and get the new timeout value again */
curl_multi_timeout(multi_handle, &timeout_ms);
}
}
if(still_running != num_total) {
{
/* something made connections fail, extract the reason and tell */
int msgs_left;
struct connection *cptr;
@ -508,10 +392,10 @@ int main(int argc, char **argv)
if (msg->msg == CURLMSG_DONE) {
curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, &cptr);
printf("%d => (%d) %s", cptr->id, msg->data.result, cptr->error);
printf("%d => (%d) %s\n",
cptr->id, msg->data.result, cptr->error);
}
}
}
curl_multi_cleanup(multi_handle);
@ -520,7 +404,5 @@ int main(int argc, char **argv)
for(i=0; i< num_total; i++)
curl_easy_cleanup(conns[i].e);
report();
return code;
}