Support max concurrent streams limit.

If max concurrent streams limit is reached, SYN_STREAM frames are not sent
and backed off. If other type of frame is waiting in the tx queue, it is
sent first. We introduced another priority queue for this purpose.
In this change we did not add code to send RST_STREAM when SYN_STREAM is
received but max concurrent stream is reached.
This commit is contained in:
Tatsuhiro Tsujikawa 2012-02-06 00:14:19 +09:00
parent d4c5f39cf9
commit 00bed87537
6 changed files with 294 additions and 43 deletions

View File

@ -99,6 +99,12 @@ typedef enum {
SPDYLAY_SETTINGS_INITIAL_WINDOW_SIZE = 7
} spdylay_settings_id;
/* Maximum ID of spdylay_settings_id. */
#define SPDYLAY_SETTINGS_MAX 7
/* Default maximum concurrent streams */
#define SPDYLAY_CONCURRENT_STREAMS_MAX 100
typedef enum {
SPDYLAY_OK = 0,
SPDYLAY_PROTOCOL_ERROR = 1,

View File

@ -109,6 +109,20 @@ static int spdylay_session_new(spdylay_session **session_ptr,
free(*session_ptr);
return r;
}
r = spdylay_pq_init(&(*session_ptr)->ob_ss_pq, spdylay_outbound_item_compar);
if(r != 0) {
spdylay_pq_free(&(*session_ptr)->ob_pq);
spdylay_map_free(&(*session_ptr)->streams);
spdylay_zlib_inflate_free(&(*session_ptr)->hd_inflater);
spdylay_zlib_deflate_free(&(*session_ptr)->hd_deflater);
free(*session_ptr);
return r;
}
memset((*session_ptr)->settings, 0, sizeof((*session_ptr)->settings));
(*session_ptr)->settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS] =
SPDYLAY_CONCURRENT_STREAMS_MAX;
(*session_ptr)->callbacks = *callbacks;
(*session_ptr)->user_data = user_data;
@ -156,7 +170,7 @@ static void spdylay_free_streams(key_type key, void *val)
free(val);
}
static void spdylay_outbound_item_free(spdylay_outbound_item *item)
void spdylay_outbound_item_free(spdylay_outbound_item *item)
{
if(item == NULL) {
return;
@ -196,18 +210,26 @@ static void spdylay_outbound_item_free(spdylay_outbound_item *item)
free(item->aux_data);
}
void spdylay_session_del(spdylay_session *session)
static void spdylay_session_ob_pq_free(spdylay_pq *pq)
{
spdylay_map_each(&session->streams, spdylay_free_streams);
spdylay_map_free(&session->streams);
while(!spdylay_pq_empty(&session->ob_pq)) {
spdylay_outbound_item *item = (spdylay_outbound_item*)
spdylay_pq_top(&session->ob_pq);
while(!spdylay_pq_empty(pq)) {
spdylay_outbound_item *item = (spdylay_outbound_item*)spdylay_pq_top(pq);
spdylay_outbound_item_free(item);
free(item);
spdylay_pq_pop(&session->ob_pq);
spdylay_pq_pop(pq);
}
spdylay_pq_free(&session->ob_pq);
spdylay_pq_free(pq);
}
void spdylay_session_del(spdylay_session *session)
{
if(session == NULL) {
return;
}
spdylay_map_each(&session->streams, spdylay_free_streams);
spdylay_map_free(&session->streams);
spdylay_session_ob_pq_free(&session->ob_pq);
spdylay_session_ob_pq_free(&session->ob_ss_pq);
spdylay_zlib_deflate_free(&session->hd_deflater);
spdylay_zlib_inflate_free(&session->hd_inflater);
free(session->iframe.buf);
@ -279,7 +301,11 @@ int spdylay_session_add_frame(spdylay_session *session,
break;
}
};
r = spdylay_pq_push(&session->ob_pq, item);
if(frame_type == SPDYLAY_SYN_STREAM) {
r = spdylay_pq_push(&session->ob_ss_pq, item);
} else {
r = spdylay_pq_push(&session->ob_pq, item);
}
if(r != 0) {
free(item);
return r;
@ -531,6 +557,86 @@ spdylay_outbound_item* spdylay_session_get_ob_pq_top
return (spdylay_outbound_item*)spdylay_pq_top(&session->ob_pq);
}
spdylay_outbound_item* spdylay_session_get_next_ob_item
(spdylay_session *session)
{
if(spdylay_pq_empty(&session->ob_pq)) {
if(spdylay_pq_empty(&session->ob_ss_pq)) {
return NULL;
} else {
/* Return item only when concurrent connection limit is not
reached */
if(session->settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS]
> spdylay_map_size(&session->streams)) {
return spdylay_pq_top(&session->ob_ss_pq);
} else {
return NULL;
}
}
} else {
if(spdylay_pq_empty(&session->ob_ss_pq)) {
return spdylay_pq_top(&session->ob_pq);
} else {
spdylay_outbound_item *item, *syn_stream_item;
item = spdylay_pq_top(&session->ob_pq);
syn_stream_item = spdylay_pq_top(&session->ob_ss_pq);
if(session->settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS]
<= spdylay_map_size(&session->streams) ||
item->pri < syn_stream_item->pri ||
(item->pri == syn_stream_item->pri &&
item->seq < syn_stream_item->seq)) {
return item;
} else {
return syn_stream_item;
}
}
}
}
spdylay_outbound_item* spdylay_session_pop_next_ob_item
(spdylay_session *session)
{
if(spdylay_pq_empty(&session->ob_pq)) {
if(spdylay_pq_empty(&session->ob_ss_pq)) {
return NULL;
} else {
/* Pop item only when concurrent connection limit is not
reached */
if(session->settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS]
> spdylay_map_size(&session->streams)) {
spdylay_outbound_item *item;
item = spdylay_pq_top(&session->ob_ss_pq);
spdylay_pq_pop(&session->ob_ss_pq);
return item;
} else {
return NULL;
}
}
} else {
if(spdylay_pq_empty(&session->ob_ss_pq)) {
spdylay_outbound_item *item;
item = spdylay_pq_top(&session->ob_pq);
spdylay_pq_pop(&session->ob_pq);
return item;
} else {
spdylay_outbound_item *item, *syn_stream_item;
item = spdylay_pq_top(&session->ob_pq);
syn_stream_item = spdylay_pq_top(&session->ob_ss_pq);
if(session->settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS]
<= spdylay_map_size(&session->streams) ||
item->pri < syn_stream_item->pri ||
(item->pri == syn_stream_item->pri &&
item->seq < syn_stream_item->seq)) {
spdylay_pq_pop(&session->ob_pq);
return item;
} else {
spdylay_pq_pop(&session->ob_ss_pq);
return syn_stream_item;
}
}
}
}
static int spdylay_session_after_frame_sent(spdylay_session *session)
{
/* TODO handle FIN flag. */
@ -645,31 +751,32 @@ static int spdylay_session_after_frame_sent(spdylay_session *session)
int r;
if(frame->data.flags & SPDYLAY_FLAG_FIN) {
spdylay_active_outbound_item_reset(&session->aob);
} else if(spdylay_pq_empty(&session->ob_pq) ||
session->aob.item->pri <=
spdylay_session_get_ob_pq_top(session)->pri) {
/* If priority of this stream is higher or equal to other stream
waiting at the top of the queue, we continue to send this
data. */
/* We assume that buffer has at least
SPDYLAY_DATA_FRAME_LENGTH. */
r = spdylay_session_pack_data_overwrite(session,
session->aob.framebuf,
SPDYLAY_DATA_FRAME_LENGTH,
&frame->data);
if(r < 0) {
spdylay_active_outbound_item_reset(&session->aob);
return r;
}
session->aob.framebufoff = 0;
} else {
r = spdylay_pq_push(&session->ob_pq, session->aob.item);
if(r == 0) {
session->aob.item = NULL;
spdylay_active_outbound_item_reset(&session->aob);
spdylay_outbound_item* item = spdylay_session_get_next_ob_item(session);
if(item == NULL || session->aob.item->pri <= item->pri) {
/* If priority of this stream is higher or equal to other stream
waiting at the top of the queue, we continue to send this
data. */
/* We assume that buffer has at least
SPDYLAY_DATA_FRAME_LENGTH. */
r = spdylay_session_pack_data_overwrite(session,
session->aob.framebuf,
SPDYLAY_DATA_FRAME_LENGTH,
&frame->data);
if(r < 0) {
spdylay_active_outbound_item_reset(&session->aob);
return r;
}
session->aob.framebufoff = 0;
} else {
spdylay_active_outbound_item_reset(&session->aob);
return r;
r = spdylay_pq_push(&session->ob_pq, session->aob.item);
if(r == 0) {
session->aob.item = NULL;
spdylay_active_outbound_item_reset(&session->aob);
} else {
spdylay_active_outbound_item_reset(&session->aob);
return r;
}
}
}
} else {
@ -681,15 +788,18 @@ static int spdylay_session_after_frame_sent(spdylay_session *session)
int spdylay_session_send(spdylay_session *session)
{
int r;
while(session->aob.item || !spdylay_pq_empty(&session->ob_pq)) {
while(1) {
const uint8_t *data;
size_t datalen;
ssize_t sentlen;
if(session->aob.item == NULL) {
spdylay_outbound_item *item = spdylay_pq_top(&session->ob_pq);
spdylay_outbound_item *item;
uint8_t *framebuf;
ssize_t framebuflen;
spdylay_pq_pop(&session->ob_pq);
item = spdylay_session_pop_next_ob_item(session);
if(item == NULL) {
break;
}
framebuflen = spdylay_session_prep_frame(session, item, &framebuf);
if(framebuflen < 0) {
/* TODO Call error callback? */
@ -1323,13 +1433,16 @@ int spdylay_session_want_write(spdylay_session *session)
{
/*
* Unless GOAWAY is sent or received, we want to write frames if
* there is pending ones. After GOAWAY is sent or received, we want
* to write frames if there is pending ones AND there are active
* frames.
* there is pending ones. If pending frame is SYN_STREAM and
* concurrent stream limit is reached, we don't want to write
* SYN_STREAM. After GOAWAY is sent or received, we want to write
* frames if there is pending ones AND there are active frames.
*/
return (session->aob.item != NULL || !spdylay_pq_empty(&session->ob_pq)) &&
(!session->goaway_flags ||
spdylay_map_size(&session->streams) > 0);
return (session->aob.item != NULL || !spdylay_pq_empty(&session->ob_pq) ||
(!spdylay_pq_empty(&session->ob_ss_pq) &&
session->settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS]
> spdylay_map_size(&session->streams))) &&
(!session->goaway_flags || spdylay_map_size(&session->streams) > 0);
}
int spdylay_session_add_ping(spdylay_session *session, uint32_t unique_id)

View File

@ -100,7 +100,10 @@ struct spdylay_session {
int64_t next_seq;
spdylay_map /* <spdylay_stream*> */ streams;
/* Queue for outbound frames other than SYN_STREAM */
spdylay_pq /* <spdylay_outbound_item*> */ ob_pq;
/* Queue for outbound SYN_STREAM frame */
spdylay_pq /* <spdylay_outbound_item*> */ ob_ss_pq;
spdylay_active_outbound_item aob;
@ -119,6 +122,10 @@ struct spdylay_session {
/* This is the value in GOAWAY frame sent by remote endpoint. */
int32_t last_good_stream_id;
/* Settings value store. We just use ID as index. The index = 0 is
unused. */
uint32_t settings[SPDYLAY_SETTINGS_MAX+1];
spdylay_session_callbacks callbacks;
void *user_data;
};
@ -293,4 +300,30 @@ uint32_t spdylay_session_get_next_unique_id(spdylay_session *session);
*/
spdylay_outbound_item* spdylay_session_get_ob_pq_top(spdylay_session *session);
/*
* Pops and returns next item to send. If there is no such item,
* returns NULL. This function takes into account max concurrent
* streams. That means if session->ob_pq is empty but
* session->ob_ss_pq has item and max concurrent streams is reached,
* then this function returns NULL.
*/
spdylay_outbound_item* spdylay_session_pop_next_ob_item
(spdylay_session *session);
/*
* Returns next item to send. If there is no such item, this function
* returns NULL. This function takes into account max concurrent
* streams. That means if session->ob_pq is empty but
* session->ob_ss_pq has item and max concurrent streams is reached,
* then this function returns NULL.
*/
spdylay_outbound_item* spdylay_session_get_next_ob_item
(spdylay_session *session);
/*
* Deallocates resource for |item|. If |item| is NULL, this function
* does nothing.
*/
void spdylay_outbound_item_free(spdylay_outbound_item *item);
#endif /* SPDYLAY_SESSION_H */

View File

@ -103,6 +103,10 @@ int main(int argc, char* argv[])
test_spdylay_session_is_my_stream_id) ||
!CU_add_test(pSuite, "session_send_rst_stream",
test_spdylay_session_send_rst_stream) ||
!CU_add_test(pSuite, "session_get_next_ob_item",
test_spdylay_session_get_next_ob_item) ||
!CU_add_test(pSuite, "session_pop_next_ob_item",
test_spdylay_session_pop_next_ob_item) ||
!CU_add_test(pSuite, "frame_unpack_nv", test_spdylay_frame_unpack_nv) ||
!CU_add_test(pSuite, "frame_count_nv_space",
test_spdylay_frame_count_nv_space) ||

View File

@ -209,7 +209,7 @@ void test_spdylay_session_add_frame()
CU_ASSERT(0 == spdylay_session_add_frame(session, SPDYLAY_SYN_STREAM, frame,
aux_data));
CU_ASSERT(0 == spdylay_pq_empty(&session->ob_pq));
CU_ASSERT(0 == spdylay_pq_empty(&session->ob_ss_pq));
CU_ASSERT(0 == spdylay_session_send(session));
CU_ASSERT(memcmp(hd_ans1, acc.buf, 4) == 0);
/* check stream id */
@ -790,3 +790,96 @@ void test_spdylay_session_send_rst_stream()
spdylay_session_del(session);
}
void test_spdylay_session_get_next_ob_item()
{
spdylay_session *session;
spdylay_session_callbacks callbacks;
const char *nv[] = { NULL };
memset(&callbacks, 0, sizeof(spdylay_session_callbacks));
callbacks.send_callback = null_send_callback;
spdylay_session_server_new(&session, &callbacks, NULL);
session->settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS] = 2;
CU_ASSERT(NULL == spdylay_session_get_next_ob_item(session));
spdylay_submit_ping(session);
CU_ASSERT(SPDYLAY_PING ==
spdylay_session_get_next_ob_item(session)->frame_type);
spdylay_submit_request(session, 0, nv, NULL, NULL);
CU_ASSERT(SPDYLAY_PING ==
spdylay_session_get_next_ob_item(session)->frame_type);
CU_ASSERT(0 == spdylay_session_send(session));
CU_ASSERT(NULL == spdylay_session_get_next_ob_item(session));
spdylay_session_open_stream(session, 1, SPDYLAY_FLAG_NONE,
3, SPDYLAY_STREAM_OPENING, NULL);
spdylay_submit_request(session, 0, nv, NULL, NULL);
CU_ASSERT(NULL == spdylay_session_get_next_ob_item(session));
spdylay_submit_response(session, 1, nv, NULL);
CU_ASSERT(SPDYLAY_SYN_REPLY ==
spdylay_session_get_next_ob_item(session)->frame_type);
session->settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS] = 3;
CU_ASSERT(SPDYLAY_SYN_STREAM ==
spdylay_session_get_next_ob_item(session)->frame_type);
spdylay_session_del(session);
}
void test_spdylay_session_pop_next_ob_item()
{
spdylay_session *session;
spdylay_session_callbacks callbacks;
const char *nv[] = { NULL };
spdylay_outbound_item *item;
memset(&callbacks, 0, sizeof(spdylay_session_callbacks));
callbacks.send_callback = null_send_callback;
spdylay_session_server_new(&session, &callbacks, NULL);
session->settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS] = 1;
CU_ASSERT(NULL == spdylay_session_pop_next_ob_item(session));
spdylay_submit_ping(session);
spdylay_submit_request(session, 0, nv, NULL, NULL);
item = spdylay_session_pop_next_ob_item(session);
CU_ASSERT(SPDYLAY_PING == item->frame_type);
spdylay_outbound_item_free(item);
free(item);
item = spdylay_session_pop_next_ob_item(session);
CU_ASSERT(SPDYLAY_SYN_STREAM == item->frame_type);
spdylay_outbound_item_free(item);
free(item);
CU_ASSERT(NULL == spdylay_session_pop_next_ob_item(session));
spdylay_session_open_stream(session, 1, SPDYLAY_FLAG_NONE,
3, SPDYLAY_STREAM_OPENING, NULL);
spdylay_submit_request(session, 0, nv, NULL, NULL);
spdylay_submit_response(session, 1, nv, NULL);
item = spdylay_session_pop_next_ob_item(session);
CU_ASSERT(SPDYLAY_SYN_REPLY == item->frame_type);
spdylay_outbound_item_free(item);
free(item);
CU_ASSERT(NULL == spdylay_session_pop_next_ob_item(session));
spdylay_submit_response(session, 1, nv, NULL);
session->settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS] = 2;
item = spdylay_session_pop_next_ob_item(session);
CU_ASSERT(SPDYLAY_SYN_STREAM == item->frame_type);
spdylay_outbound_item_free(item);
free(item);
spdylay_session_del(session);
}

View File

@ -43,5 +43,7 @@ void test_spdylay_session_on_data_received();
void test_spdylay_session_on_rst_received();
void test_spdylay_session_is_my_stream_id();
void test_spdylay_session_send_rst_stream();
void test_spdylay_session_get_next_ob_item();
void test_spdylay_session_pop_next_ob_item();
#endif // SPDYLAY_SESSION_TEST_H