Previously, processing a large batch of timeouts was O(n^2) in the number of

outstanding queries, and processing a DNS response packet was O(n) in the
number of outstanding queries. To speed things up in Google, we added a few circular,
doubly-linked lists of queries that are hash-bucketed based on
the attributes we care about, so most important operations are now O(1).

It might be that the number of buckets are higher than most people would need,
but on a quick calculation it should only be 100kB or so even on a 64-bit
system, so I've let it stay as-is.
This commit is contained in:
Steinar H. Gunderson 2007-09-29 18:18:47 +00:00
parent b01ab65225
commit 23f5d145ec
10 changed files with 326 additions and 84 deletions

View File

@ -14,6 +14,7 @@
*/
#include "setup.h"
#include <assert.h>
#include <stdlib.h>
#include "ares.h"
#include "ares_private.h"
@ -25,18 +26,33 @@
*/
void ares_cancel(ares_channel channel)
{
struct query *query, *next;
struct query *query;
struct list_node* list_head;
struct list_node* list_node;
int i;
for (query = channel->queries; query; query = next)
list_head = &(channel->all_queries);
for (list_node = list_head->next; list_node != list_head; )
{
next = query->next;
query = list_node->data;
list_node = list_node->next; /* since we're deleting the query */
query->callback(query->arg, ARES_ETIMEOUT, 0, NULL, 0);
free(query->tcpbuf);
free(query->server_info);
free(query);
ares__free_query(query);
}
channel->queries = NULL;
#ifndef NDEBUG
/* Freeing the query should remove it from all the lists in which it sits,
* so all query lists should be empty now.
*/
assert(ares__is_list_empty(&(channel->all_queries)));
for (i = 0; i < ARES_QID_TABLE_SIZE; i++)
{
assert(ares__is_list_empty(&(channel->queries_by_qid[i])));
}
for (i = 0; i < ARES_TIMEOUT_TABLE_SIZE; i++)
{
assert(ares__is_list_empty(&(channel->queries_by_timeout[i])));
}
#endif
if (!(channel->flags & ARES_FLAG_STAYOPEN))
{
if (channel->servers)

View File

@ -16,6 +16,7 @@
*/
#include "setup.h"
#include <assert.h>
#include <stdlib.h>
#include "ares.h"
#include "ares_private.h"
@ -37,13 +38,42 @@ void ares_destroy(ares_channel channel)
{
int i;
struct query *query;
struct list_node* list_head;
struct list_node* list_node;
list_head = &(channel->all_queries);
for (list_node = list_head->next; list_node != list_head; )
{
query = list_node->data;
list_node = list_node->next; /* since we're deleting the query */
query->callback(query->arg, ARES_EDESTRUCTION, 0, NULL, 0);
ares__free_query(query);
}
#ifndef NDEBUG
/* Freeing the query should remove it from all the lists in which it sits,
* so all query lists should be empty now.
*/
assert(ares__is_list_empty(&(channel->all_queries)));
for (i = 0; i < ARES_QID_TABLE_SIZE; i++)
{
assert(ares__is_list_empty(&(channel->queries_by_qid[i])));
}
for (i = 0; i < ARES_TIMEOUT_TABLE_SIZE; i++)
{
assert(ares__is_list_empty(&(channel->queries_by_timeout[i])));
}
#endif
if (!channel)
return;
if (channel->servers) {
for (i = 0; i < channel->nservers; i++)
ares__close_sockets(channel, &channel->servers[i]);
{
struct server_state *server = &channel->servers[i];
ares__close_sockets(channel, server);
assert(ares__is_list_empty(&(server->queries_to_server)));
}
free(channel->servers);
}
@ -59,16 +89,5 @@ void ares_destroy(ares_channel channel)
if (channel->lookups)
free(channel->lookups);
while (channel->queries) {
query = channel->queries;
channel->queries = query->next;
query->callback(query->arg, ARES_EDESTRUCTION, 0, NULL, 0);
if (query->tcpbuf)
free(query->tcpbuf);
if (query->server_info)
free(query->server_info);
free(query);
}
free(channel);
}

View File

@ -30,6 +30,9 @@ int ares_fds(ares_channel channel, fd_set *read_fds, fd_set *write_fds)
ares_socket_t nfds;
int i;
/* Are there any active queries? */
int active_queries = !ares__is_list_empty(&(channel->all_queries));
nfds = 0;
for (i = 0; i < channel->nservers; i++)
{
@ -37,7 +40,7 @@ int ares_fds(ares_channel channel, fd_set *read_fds, fd_set *write_fds)
/* We only need to register interest in UDP sockets if we have
* outstanding queries.
*/
if (channel->queries && server->udp_socket != ARES_SOCKET_BAD)
if (active_queries && server->udp_socket != ARES_SOCKET_BAD)
{
FD_SET(server->udp_socket, read_fds);
if (server->udp_socket >= nfds)

View File

@ -35,7 +35,7 @@ int ares_getsock(ares_channel channel,
ares_socket_t *socks = (ares_socket_t *)s;
/* No queries, no file descriptors. */
if (!channel->queries)
if (ares__is_list_empty(&(channel->all_queries)))
return 0;
for (i = 0;

View File

@ -107,6 +107,7 @@ int ares_init_options(ares_channel *channelptr, struct ares_options *options,
int i;
int status = ARES_SUCCESS;
struct server_state *server;
struct timeval tv;
#ifdef CURLDEBUG
const char *env = getenv("CARES_MEMDEBUG");
@ -140,13 +141,26 @@ int ares_init_options(ares_channel *channelptr, struct ares_options *options,
channel->nsort = -1;
channel->tcp_connection_generation = 0;
channel->lookups = NULL;
channel->queries = NULL;
channel->domains = NULL;
channel->sortlist = NULL;
channel->servers = NULL;
channel->sock_state_cb = NULL;
channel->sock_state_cb_data = NULL;
gettimeofday(&tv, NULL);
channel->last_timeout_processed = tv.tv_sec;
/* Initialize our lists of queries */
ares__init_list_head(&(channel->all_queries));
for (i = 0; i < ARES_QID_TABLE_SIZE; i++)
{
ares__init_list_head(&(channel->queries_by_qid[i]));
}
for (i = 0; i < ARES_TIMEOUT_TABLE_SIZE; i++)
{
ares__init_list_head(&(channel->queries_by_timeout[i]));
}
/* Initialize configuration by each of the four sources, from highest
* precedence to lowest.
*/
@ -209,13 +223,14 @@ int ares_init_options(ares_channel *channelptr, struct ares_options *options,
server->tcp_buffer = NULL;
server->qhead = NULL;
server->qtail = NULL;
ares__init_list_head(&(server->queries_to_server));
server->channel = channel;
server->is_broken = 0;
}
init_id_key(&channel->id_key, ARES_ID_KEY_LEN);
channel->next_id = ares__generate_new_id(&channel->id_key);
channel->queries = NULL;
*channelptr = channel;
return ARES_SUCCESS;

View File

@ -84,6 +84,15 @@
#include "ares_ipv6.h"
struct query;
/* Node definition for circular, doubly-linked list */
struct list_node {
struct list_node *prev;
struct list_node *next;
void* data;
};
struct send_request {
/* Remaining data to send */
const unsigned char *data;
@ -122,17 +131,35 @@ struct server_state {
* re-send. */
int tcp_connection_generation;
/* Circular, doubly-linked list of outstanding queries to this server */
struct list_node queries_to_server;
/* Link back to owning channel */
ares_channel channel;
/* Is this server broken? We mark connections as broken when a
* request that is queued for sending times out.
*/
int is_broken;
};
/* State to represent a DNS query */
struct query {
/* Query ID from qbuf, for faster lookup, and current timeout */
unsigned short qid;
time_t timeout;
/*
* Links for the doubly-linked lists in which we insert a query.
* These circular, doubly-linked lists that are hash-bucketed based
* the attributes we care about, help making most important
* operations O(1).
*/
struct list_node queries_by_qid; /* hopefully in same cache line as qid */
struct list_node queries_by_timeout;
struct list_node queries_to_server;
struct list_node all_queries;
/* Query buf with length at beginning, for TCP transmission */
unsigned char *tcpbuf;
int tcplen;
@ -149,9 +176,6 @@ struct query {
struct query_server_info *server_info; /* per-server state */
int using_tcp;
int error_status;
/* Next query in chain */
struct query *next;
int timeouts; /* number of timeouts we saw for this request */
};
@ -216,8 +240,18 @@ struct ares_channeldata {
/* Generation number to use for the next TCP socket open/close */
int tcp_connection_generation;
/* Active queries */
struct query *queries;
/* The time at which we last called process_timeouts() */
time_t last_timeout_processed;
/* Circular, doubly-linked list of queries, bucketed various ways.... */
/* All active queries in a single list: */
struct list_node all_queries;
/* Queries bucketed by qid, for quickly dispatching DNS responses: */
#define ARES_QID_TABLE_SIZE 2048
struct list_node queries_by_qid[ARES_QID_TABLE_SIZE];
/* Queries bucketed by timeout, for quickly handling timeouts: */
#define ARES_TIMEOUT_TABLE_SIZE 1024
struct list_node queries_by_timeout[ARES_TIMEOUT_TABLE_SIZE];
ares_sock_state_cb sock_state_cb;
void *sock_state_cb_data;
@ -228,8 +262,76 @@ void ares__send_query(ares_channel channel, struct query *query, time_t now);
void ares__close_sockets(ares_channel channel, struct server_state *server);
int ares__get_hostent(FILE *fp, int family, struct hostent **host);
int ares__read_line(FILE *fp, char **buf, int *bufsize);
void ares__free_query(struct query *query);
short ares__generate_new_id(rc4_key* key);
/* Routines for managing doubly-linked circular linked lists with a
* dummy head.
*/
/* Initialize a new head node */
static inline void ares__init_list_head(struct list_node* head) {
head->prev = head;
head->next = head;
head->data = NULL;
}
/* Initialize a list node */
static inline void ares__init_list_node(struct list_node* node, void* d) {
node->prev = NULL;
node->next = NULL;
node->data = d;
}
/* Returns true iff the given list is empty */
static inline int ares__is_list_empty(struct list_node* head) {
return ((head->next == head) && (head->prev == head));
}
/* Inserts new_node before old_node */
static inline void ares__insert_in_list(struct list_node* new_node,
struct list_node* old_node) {
new_node->next = old_node;
new_node->prev = old_node->prev;
old_node->prev->next = new_node;
old_node->prev = new_node;
}
/* Removes the node from the list it's in, if any */
static inline void ares__remove_from_list(struct list_node* node) {
if (node->next != NULL) {
node->prev->next = node->next;
node->next->prev = node->prev;
node->prev = NULL;
node->next = NULL;
}
}
/* Swap the contents of two lists */
static inline void ares__swap_lists(struct list_node* head_a,
struct list_node* head_b) {
int is_a_empty = ares__is_list_empty(head_a);
int is_b_empty = ares__is_list_empty(head_b);
struct list_node old_a = *head_a;
struct list_node old_b = *head_b;
if (is_a_empty) {
ares__init_list_head(head_b);
} else {
*head_b = old_a;
old_a.next->prev = head_b;
old_a.prev->next = head_b;
}
if (is_b_empty) {
ares__init_list_head(head_a);
} else {
*head_a = old_b;
old_b.next->prev = head_a;
old_b.prev->next = head_a;
}
}
#define ARES_SWAP_BYTE(a,b) \
{ unsigned char swapByte = *(a); *(a) = *(b); *(b) = swapByte; }

View File

@ -71,13 +71,13 @@ static void process_answer(ares_channel channel, unsigned char *abuf,
static void handle_error(ares_channel channel, int whichserver, time_t now);
static void skip_server(ares_channel channel, struct query *query,
int whichserver);
static struct query *next_server(ares_channel channel, struct query *query, time_t now);
static void next_server(ares_channel channel, struct query *query, time_t now);
static int configure_socket(int s, ares_channel channel);
static int open_tcp_socket(ares_channel channel, struct server_state *server);
static int open_udp_socket(ares_channel channel, struct server_state *server);
static int same_questions(const unsigned char *qbuf, int qlen,
const unsigned char *abuf, int alen);
static struct query *end_query(ares_channel channel, struct query *query, int status,
static void end_query(ares_channel channel, struct query *query, int status,
unsigned char *abuf, int alen);
/* Something interesting happened on the wire, or there was a timeout.
@ -416,18 +416,33 @@ static void read_udp_packets(ares_channel channel, fd_set *read_fds,
/* If any queries have timed out, note the timeout and move them on. */
static void process_timeouts(ares_channel channel, time_t now)
{
struct query *query, *next;
time_t t; /* the time of the timeouts we're processing */
struct query *query;
struct list_node* list_head;
struct list_node* list_node;
for (query = channel->queries; query; query = next)
/* Process all the timeouts that have fired since the last time we
* processed timeouts. If things are going well, then we'll have
* hundreds/thousands of queries that fall into future buckets, and
* only a handful of requests that fall into the "now" bucket, so
* this should be quite quick.
*/
for (t = channel->last_timeout_processed; t <= now; t++)
{
next = query->next;
if (query->timeout != 0 && now >= query->timeout)
list_head = &(channel->queries_by_timeout[t % ARES_TIMEOUT_TABLE_SIZE]);
for (list_node = list_head->next; list_node != list_head; )
{
query->error_status = ARES_ETIMEOUT;
++query->timeouts;
next = next_server(channel, query, now);
query = list_node->data;
list_node = list_node->next; /* in case the query gets deleted */
if (query->timeout != 0 && now >= query->timeout)
{
query->error_status = ARES_ETIMEOUT;
++query->timeouts;
next_server(channel, query, now);
}
}
}
}
channel->last_timeout_processed = now;
}
/* Handle an answer from a server. */
@ -437,6 +452,8 @@ static void process_answer(ares_channel channel, unsigned char *abuf,
int tc, rcode;
unsigned short id;
struct query *query;
struct list_node* list_head;
struct list_node* list_node;
/* If there's no room in the answer for a header, we can't do much
* with it. */
@ -448,11 +465,24 @@ static void process_answer(ares_channel channel, unsigned char *abuf,
tc = DNS_HEADER_TC(abuf);
rcode = DNS_HEADER_RCODE(abuf);
/* Find the query corresponding to this packet. */
for (query = channel->queries; query; query = query->next)
/* Find the query corresponding to this packet. The queries are
* hashed/bucketed by query id, so this lookup should be quick.
* Note that both the query id and the questions must be the same;
* when the query id wraps around we can have multiple outstanding
* queries with the same query id, so we need to check both the id and
* question.
*/
query = NULL;
list_head = &(channel->queries_by_qid[id % ARES_QID_TABLE_SIZE]);
for (list_node = list_head->next; list_node != list_head;
list_node = list_node->next)
{
if (query->qid == id)
break;
struct query *q = list_node->data;
if ((q->qid == id) && same_questions(q->qbuf, q->qlen, abuf, alen))
{
query = q;
break;
}
}
if (!query)
return;
@ -516,24 +546,37 @@ static void process_broken_connections(ares_channel channel, time_t now)
static void handle_error(ares_channel channel, int whichserver, time_t now)
{
struct query *query, *next;
struct server_state *server;
struct query *query;
struct list_node list_head;
struct list_node* list_node;
server = &channel->servers[whichserver];
/* Reset communications with this server. */
ares__close_sockets(channel, &channel->servers[whichserver]);
ares__close_sockets(channel, server);
/* Tell all queries talking to this server to move on and not try
* this server again.
* this server again. We steal the current list of queries that were
* in-flight to this server, since when we call next_server this can
* cause the queries to be re-sent to this server, which will
* re-insert these queries in that same server->queries_to_server
* list.
*/
for (query = channel->queries; query; query = next)
ares__init_list_head(&list_head);
ares__swap_lists(&list_head, &(server->queries_to_server));
for (list_node = list_head.next; list_node != &list_head; )
{
next = query->next;
if (query->server == whichserver)
{
skip_server(channel, query, whichserver);
next = next_server(channel, query, now);
}
query = list_node->data;
list_node = list_node->next; /* in case the query gets deleted */
assert(query->server == whichserver);
skip_server(channel, query, whichserver);
next_server(channel, query, now);
}
/* Each query should have removed itself from our temporary list as
* it re-sent itself or finished up...
*/
assert(ares__is_list_empty(&list_head));
}
static void skip_server(ares_channel channel, struct query *query,
@ -552,7 +595,7 @@ static void skip_server(ares_channel channel, struct query *query,
}
}
static struct query *next_server(ares_channel channel, struct query *query, time_t now)
static void next_server(ares_channel channel, struct query *query, time_t now)
{
/* Advance to the next server or try. */
query->server++;
@ -574,7 +617,7 @@ static struct query *next_server(ares_channel channel, struct query *query, time
server->tcp_connection_generation)))
{
ares__send_query(channel, query, now);
return (query->next);
return;
}
}
query->server = 0;
@ -586,7 +629,7 @@ static struct query *next_server(ares_channel channel, struct query *query, time
* tickle a bug that drops our request.
*/
}
return end_query(channel, query, query->error_status, NULL, 0);
end_query(channel, query, query->error_status, NULL, 0);
}
void ares__send_query(ares_channel channel, struct query *query, time_t now)
@ -659,6 +702,21 @@ void ares__send_query(ares_channel channel, struct query *query, time_t now)
query->timeout = now
+ ((query->try == 0) ? channel->timeout
: channel->timeout << query->try / channel->nservers);
/* Keep track of queries bucketed by timeout, so we can process
* timeout events quickly.
*/
ares__remove_from_list(&(query->queries_by_timeout));
ares__insert_in_list(
&(query->queries_by_timeout),
&(channel->queries_by_timeout[query->timeout %
ARES_TIMEOUT_TABLE_SIZE]));
/* Keep track of queries bucketed by server, so we can process server
* errors quickly.
*/
ares__remove_from_list(&(query->queries_to_server));
ares__insert_in_list(&(query->queries_to_server),
&(server->queries_to_server));
}
/*
@ -925,10 +983,9 @@ static int same_questions(const unsigned char *qbuf, int qlen,
return 1;
}
static struct query *end_query (ares_channel channel, struct query *query, int status,
unsigned char *abuf, int alen)
static void end_query (ares_channel channel, struct query *query, int status,
unsigned char *abuf, int alen)
{
struct query **q, *next;
int i;
/* First we check to see if this query ended while one of our send
@ -987,27 +1044,31 @@ static struct query *end_query (ares_channel channel, struct query *query, int s
/* Invoke the callback */
query->callback(query->arg, status, query->timeouts, abuf, alen);
for (q = &channel->queries; *q; q = &(*q)->next)
{
if (*q == query)
break;
}
*q = query->next;
if (*q)
next = (*q)->next;
else
next = NULL;
free(query->tcpbuf);
free(query->server_info);
free(query);
ares__free_query(query);
/* Simple cleanup policy: if no queries are remaining, close all
* network sockets unless STAYOPEN is set.
*/
if (!channel->queries && !(channel->flags & ARES_FLAG_STAYOPEN))
if (!(channel->flags & ARES_FLAG_STAYOPEN) &&
ares__is_list_empty(&(channel->all_queries)))
{
for (i = 0; i < channel->nservers; i++)
ares__close_sockets(channel, &channel->servers[i]);
}
return (next);
}
void ares__free_query(struct query *query)
{
/* Remove the query from all the lists in which it is linked */
ares__remove_from_list(&(query->queries_by_qid));
ares__remove_from_list(&(query->queries_by_timeout));
ares__remove_from_list(&(query->queries_to_server));
ares__remove_from_list(&(query->all_queries));
/* Zero out some important stuff, to help catch bugs */
query->callback = NULL;
query->arg = NULL;
/* Deallocate the memory associated with the query */
free(query->tcpbuf);
free(query->server_info);
free(query);
}

View File

@ -68,15 +68,19 @@ void ares__rc4(rc4_key* key, unsigned char *buffer_ptr, int buffer_len)
static struct query* find_query_by_id(ares_channel channel, int id)
{
unsigned short qid;
struct query* q;
struct list_node* list_head;
struct list_node* list_node;
DNS_HEADER_SET_QID(((unsigned char*)&qid), id);
/* Find the query corresponding to this packet. */
for (q = channel->queries; q; q = q->next)
{
if (q->qid == qid)
list_head = &(channel->queries_by_qid[qid % ARES_QID_TABLE_SIZE]);
for (list_node = list_head->next; list_node != list_head;
list_node = list_node->next)
{
struct query *q = list_node->data;
if (q->qid == qid)
return q;
}
}
return NULL;
}

View File

@ -102,9 +102,20 @@ void ares_send(ares_channel channel, const unsigned char *qbuf, int qlen,
query->error_status = ARES_ECONNREFUSED;
query->timeouts = 0;
/* Chain the query into this channel's query list. */
query->next = channel->queries;
channel->queries = query;
/* Initialize our list nodes. */
ares__init_list_node(&(query->queries_by_qid), query);
ares__init_list_node(&(query->queries_by_timeout), query);
ares__init_list_node(&(query->queries_to_server), query);
ares__init_list_node(&(query->all_queries), query);
/* Chain the query into the list of all queries. */
ares__insert_in_list(&(query->all_queries), &(channel->all_queries));
/* Keep track of queries bucketed by qid, so we can process DNS
* responses quickly.
*/
ares__insert_in_list(
&(query->queries_by_qid),
&(channel->queries_by_qid[query->qid % ARES_QID_TABLE_SIZE]));
/* Perform the first query action. */
time(&now);

View File

@ -26,23 +26,34 @@
#include "ares.h"
#include "ares_private.h"
/* WARNING: Beware that this is linear in the number of outstanding
* requests! You are probably far better off just calling ares_process()
* once per second, rather than calling ares_timeout() to figure out
* when to next call ares_process().
*/
struct timeval *ares_timeout(ares_channel channel, struct timeval *maxtv,
struct timeval *tvbuf)
{
struct query *query;
struct list_node* list_head;
struct list_node* list_node;
time_t now;
time_t offset, min_offset; /* these use time_t since some 32 bit systems
still use 64 bit time_t! (like VS2005) */
/* No queries, no timeout (and no fetch of the current time). */
if (!channel->queries)
if (ares__is_list_empty(&(channel->all_queries)))
return maxtv;
/* Find the minimum timeout for the current set of queries. */
time(&now);
min_offset = -1;
for (query = channel->queries; query; query = query->next)
list_head = &(channel->all_queries);
for (list_node = list_head->next; list_node != list_head;
list_node = list_node->next)
{
query = list_node->data;
if (query->timeout == 0)
continue;
offset = query->timeout - now;