From 00bed87537dfd3a8c5a55240d85ab33b90822199 Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Mon, 6 Feb 2012 00:14:19 +0900 Subject: [PATCH] Support max concurrent streams limit. If max concurrent streams limit is reached, SYN_STREAM frames are not sent and backed off. If other type of frame is waiting in the tx queue, it is sent first. We introduced another priority queue for this purpose. In this change we did not add code to send RST_STREAM when SYN_STREAM is received but max concurrent stream is reached. --- lib/includes/spdylay/spdylay.h | 6 + lib/spdylay_session.c | 197 ++++++++++++++++++++++++++------- lib/spdylay_session.h | 33 ++++++ tests/main.c | 4 + tests/spdylay_session_test.c | 95 +++++++++++++++- tests/spdylay_session_test.h | 2 + 6 files changed, 294 insertions(+), 43 deletions(-) diff --git a/lib/includes/spdylay/spdylay.h b/lib/includes/spdylay/spdylay.h index 5d37308..b5b03c7 100644 --- a/lib/includes/spdylay/spdylay.h +++ b/lib/includes/spdylay/spdylay.h @@ -99,6 +99,12 @@ typedef enum { SPDYLAY_SETTINGS_INITIAL_WINDOW_SIZE = 7 } spdylay_settings_id; +/* Maximum ID of spdylay_settings_id. */ +#define SPDYLAY_SETTINGS_MAX 7 + +/* Default maximum concurrent streams */ +#define SPDYLAY_CONCURRENT_STREAMS_MAX 100 + typedef enum { SPDYLAY_OK = 0, SPDYLAY_PROTOCOL_ERROR = 1, diff --git a/lib/spdylay_session.c b/lib/spdylay_session.c index 26f8768..57501fd 100644 --- a/lib/spdylay_session.c +++ b/lib/spdylay_session.c @@ -109,6 +109,20 @@ static int spdylay_session_new(spdylay_session **session_ptr, free(*session_ptr); return r; } + r = spdylay_pq_init(&(*session_ptr)->ob_ss_pq, spdylay_outbound_item_compar); + if(r != 0) { + spdylay_pq_free(&(*session_ptr)->ob_pq); + spdylay_map_free(&(*session_ptr)->streams); + spdylay_zlib_inflate_free(&(*session_ptr)->hd_inflater); + spdylay_zlib_deflate_free(&(*session_ptr)->hd_deflater); + free(*session_ptr); + return r; + } + + memset((*session_ptr)->settings, 0, sizeof((*session_ptr)->settings)); + (*session_ptr)->settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS] = + SPDYLAY_CONCURRENT_STREAMS_MAX; + (*session_ptr)->callbacks = *callbacks; (*session_ptr)->user_data = user_data; @@ -156,7 +170,7 @@ static void spdylay_free_streams(key_type key, void *val) free(val); } -static void spdylay_outbound_item_free(spdylay_outbound_item *item) +void spdylay_outbound_item_free(spdylay_outbound_item *item) { if(item == NULL) { return; @@ -196,18 +210,26 @@ static void spdylay_outbound_item_free(spdylay_outbound_item *item) free(item->aux_data); } -void spdylay_session_del(spdylay_session *session) +static void spdylay_session_ob_pq_free(spdylay_pq *pq) { - spdylay_map_each(&session->streams, spdylay_free_streams); - spdylay_map_free(&session->streams); - while(!spdylay_pq_empty(&session->ob_pq)) { - spdylay_outbound_item *item = (spdylay_outbound_item*) - spdylay_pq_top(&session->ob_pq); + while(!spdylay_pq_empty(pq)) { + spdylay_outbound_item *item = (spdylay_outbound_item*)spdylay_pq_top(pq); spdylay_outbound_item_free(item); free(item); - spdylay_pq_pop(&session->ob_pq); + spdylay_pq_pop(pq); } - spdylay_pq_free(&session->ob_pq); + spdylay_pq_free(pq); +} + +void spdylay_session_del(spdylay_session *session) +{ + if(session == NULL) { + return; + } + spdylay_map_each(&session->streams, spdylay_free_streams); + spdylay_map_free(&session->streams); + spdylay_session_ob_pq_free(&session->ob_pq); + spdylay_session_ob_pq_free(&session->ob_ss_pq); spdylay_zlib_deflate_free(&session->hd_deflater); spdylay_zlib_inflate_free(&session->hd_inflater); free(session->iframe.buf); @@ -279,7 +301,11 @@ int spdylay_session_add_frame(spdylay_session *session, break; } }; - r = spdylay_pq_push(&session->ob_pq, item); + if(frame_type == SPDYLAY_SYN_STREAM) { + r = spdylay_pq_push(&session->ob_ss_pq, item); + } else { + r = spdylay_pq_push(&session->ob_pq, item); + } if(r != 0) { free(item); return r; @@ -531,6 +557,86 @@ spdylay_outbound_item* spdylay_session_get_ob_pq_top return (spdylay_outbound_item*)spdylay_pq_top(&session->ob_pq); } +spdylay_outbound_item* spdylay_session_get_next_ob_item +(spdylay_session *session) +{ + if(spdylay_pq_empty(&session->ob_pq)) { + if(spdylay_pq_empty(&session->ob_ss_pq)) { + return NULL; + } else { + /* Return item only when concurrent connection limit is not + reached */ + if(session->settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS] + > spdylay_map_size(&session->streams)) { + return spdylay_pq_top(&session->ob_ss_pq); + } else { + return NULL; + } + } + } else { + if(spdylay_pq_empty(&session->ob_ss_pq)) { + return spdylay_pq_top(&session->ob_pq); + } else { + spdylay_outbound_item *item, *syn_stream_item; + item = spdylay_pq_top(&session->ob_pq); + syn_stream_item = spdylay_pq_top(&session->ob_ss_pq); + if(session->settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS] + <= spdylay_map_size(&session->streams) || + item->pri < syn_stream_item->pri || + (item->pri == syn_stream_item->pri && + item->seq < syn_stream_item->seq)) { + return item; + } else { + return syn_stream_item; + } + } + } +} + +spdylay_outbound_item* spdylay_session_pop_next_ob_item +(spdylay_session *session) +{ + if(spdylay_pq_empty(&session->ob_pq)) { + if(spdylay_pq_empty(&session->ob_ss_pq)) { + return NULL; + } else { + /* Pop item only when concurrent connection limit is not + reached */ + if(session->settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS] + > spdylay_map_size(&session->streams)) { + spdylay_outbound_item *item; + item = spdylay_pq_top(&session->ob_ss_pq); + spdylay_pq_pop(&session->ob_ss_pq); + return item; + } else { + return NULL; + } + } + } else { + if(spdylay_pq_empty(&session->ob_ss_pq)) { + spdylay_outbound_item *item; + item = spdylay_pq_top(&session->ob_pq); + spdylay_pq_pop(&session->ob_pq); + return item; + } else { + spdylay_outbound_item *item, *syn_stream_item; + item = spdylay_pq_top(&session->ob_pq); + syn_stream_item = spdylay_pq_top(&session->ob_ss_pq); + if(session->settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS] + <= spdylay_map_size(&session->streams) || + item->pri < syn_stream_item->pri || + (item->pri == syn_stream_item->pri && + item->seq < syn_stream_item->seq)) { + spdylay_pq_pop(&session->ob_pq); + return item; + } else { + spdylay_pq_pop(&session->ob_ss_pq); + return syn_stream_item; + } + } + } +} + static int spdylay_session_after_frame_sent(spdylay_session *session) { /* TODO handle FIN flag. */ @@ -645,31 +751,32 @@ static int spdylay_session_after_frame_sent(spdylay_session *session) int r; if(frame->data.flags & SPDYLAY_FLAG_FIN) { spdylay_active_outbound_item_reset(&session->aob); - } else if(spdylay_pq_empty(&session->ob_pq) || - session->aob.item->pri <= - spdylay_session_get_ob_pq_top(session)->pri) { - /* If priority of this stream is higher or equal to other stream - waiting at the top of the queue, we continue to send this - data. */ - /* We assume that buffer has at least - SPDYLAY_DATA_FRAME_LENGTH. */ - r = spdylay_session_pack_data_overwrite(session, - session->aob.framebuf, - SPDYLAY_DATA_FRAME_LENGTH, - &frame->data); - if(r < 0) { - spdylay_active_outbound_item_reset(&session->aob); - return r; - } - session->aob.framebufoff = 0; } else { - r = spdylay_pq_push(&session->ob_pq, session->aob.item); - if(r == 0) { - session->aob.item = NULL; - spdylay_active_outbound_item_reset(&session->aob); + spdylay_outbound_item* item = spdylay_session_get_next_ob_item(session); + if(item == NULL || session->aob.item->pri <= item->pri) { + /* If priority of this stream is higher or equal to other stream + waiting at the top of the queue, we continue to send this + data. */ + /* We assume that buffer has at least + SPDYLAY_DATA_FRAME_LENGTH. */ + r = spdylay_session_pack_data_overwrite(session, + session->aob.framebuf, + SPDYLAY_DATA_FRAME_LENGTH, + &frame->data); + if(r < 0) { + spdylay_active_outbound_item_reset(&session->aob); + return r; + } + session->aob.framebufoff = 0; } else { - spdylay_active_outbound_item_reset(&session->aob); - return r; + r = spdylay_pq_push(&session->ob_pq, session->aob.item); + if(r == 0) { + session->aob.item = NULL; + spdylay_active_outbound_item_reset(&session->aob); + } else { + spdylay_active_outbound_item_reset(&session->aob); + return r; + } } } } else { @@ -681,15 +788,18 @@ static int spdylay_session_after_frame_sent(spdylay_session *session) int spdylay_session_send(spdylay_session *session) { int r; - while(session->aob.item || !spdylay_pq_empty(&session->ob_pq)) { + while(1) { const uint8_t *data; size_t datalen; ssize_t sentlen; if(session->aob.item == NULL) { - spdylay_outbound_item *item = spdylay_pq_top(&session->ob_pq); + spdylay_outbound_item *item; uint8_t *framebuf; ssize_t framebuflen; - spdylay_pq_pop(&session->ob_pq); + item = spdylay_session_pop_next_ob_item(session); + if(item == NULL) { + break; + } framebuflen = spdylay_session_prep_frame(session, item, &framebuf); if(framebuflen < 0) { /* TODO Call error callback? */ @@ -1323,13 +1433,16 @@ int spdylay_session_want_write(spdylay_session *session) { /* * Unless GOAWAY is sent or received, we want to write frames if - * there is pending ones. After GOAWAY is sent or received, we want - * to write frames if there is pending ones AND there are active - * frames. + * there is pending ones. If pending frame is SYN_STREAM and + * concurrent stream limit is reached, we don't want to write + * SYN_STREAM. After GOAWAY is sent or received, we want to write + * frames if there is pending ones AND there are active frames. */ - return (session->aob.item != NULL || !spdylay_pq_empty(&session->ob_pq)) && - (!session->goaway_flags || - spdylay_map_size(&session->streams) > 0); + return (session->aob.item != NULL || !spdylay_pq_empty(&session->ob_pq) || + (!spdylay_pq_empty(&session->ob_ss_pq) && + session->settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS] + > spdylay_map_size(&session->streams))) && + (!session->goaway_flags || spdylay_map_size(&session->streams) > 0); } int spdylay_session_add_ping(spdylay_session *session, uint32_t unique_id) diff --git a/lib/spdylay_session.h b/lib/spdylay_session.h index 9b938bc..740374b 100644 --- a/lib/spdylay_session.h +++ b/lib/spdylay_session.h @@ -100,7 +100,10 @@ struct spdylay_session { int64_t next_seq; spdylay_map /* */ streams; + /* Queue for outbound frames other than SYN_STREAM */ spdylay_pq /* */ ob_pq; + /* Queue for outbound SYN_STREAM frame */ + spdylay_pq /* */ ob_ss_pq; spdylay_active_outbound_item aob; @@ -119,6 +122,10 @@ struct spdylay_session { /* This is the value in GOAWAY frame sent by remote endpoint. */ int32_t last_good_stream_id; + /* Settings value store. We just use ID as index. The index = 0 is + unused. */ + uint32_t settings[SPDYLAY_SETTINGS_MAX+1]; + spdylay_session_callbacks callbacks; void *user_data; }; @@ -293,4 +300,30 @@ uint32_t spdylay_session_get_next_unique_id(spdylay_session *session); */ spdylay_outbound_item* spdylay_session_get_ob_pq_top(spdylay_session *session); +/* + * Pops and returns next item to send. If there is no such item, + * returns NULL. This function takes into account max concurrent + * streams. That means if session->ob_pq is empty but + * session->ob_ss_pq has item and max concurrent streams is reached, + * then this function returns NULL. + */ +spdylay_outbound_item* spdylay_session_pop_next_ob_item +(spdylay_session *session); + +/* + * Returns next item to send. If there is no such item, this function + * returns NULL. This function takes into account max concurrent + * streams. That means if session->ob_pq is empty but + * session->ob_ss_pq has item and max concurrent streams is reached, + * then this function returns NULL. + */ +spdylay_outbound_item* spdylay_session_get_next_ob_item +(spdylay_session *session); + +/* + * Deallocates resource for |item|. If |item| is NULL, this function + * does nothing. + */ +void spdylay_outbound_item_free(spdylay_outbound_item *item); + #endif /* SPDYLAY_SESSION_H */ diff --git a/tests/main.c b/tests/main.c index 53d3232..006f24d 100644 --- a/tests/main.c +++ b/tests/main.c @@ -103,6 +103,10 @@ int main(int argc, char* argv[]) test_spdylay_session_is_my_stream_id) || !CU_add_test(pSuite, "session_send_rst_stream", test_spdylay_session_send_rst_stream) || + !CU_add_test(pSuite, "session_get_next_ob_item", + test_spdylay_session_get_next_ob_item) || + !CU_add_test(pSuite, "session_pop_next_ob_item", + test_spdylay_session_pop_next_ob_item) || !CU_add_test(pSuite, "frame_unpack_nv", test_spdylay_frame_unpack_nv) || !CU_add_test(pSuite, "frame_count_nv_space", test_spdylay_frame_count_nv_space) || diff --git a/tests/spdylay_session_test.c b/tests/spdylay_session_test.c index f4b4271..a67a900 100644 --- a/tests/spdylay_session_test.c +++ b/tests/spdylay_session_test.c @@ -209,7 +209,7 @@ void test_spdylay_session_add_frame() CU_ASSERT(0 == spdylay_session_add_frame(session, SPDYLAY_SYN_STREAM, frame, aux_data)); - CU_ASSERT(0 == spdylay_pq_empty(&session->ob_pq)); + CU_ASSERT(0 == spdylay_pq_empty(&session->ob_ss_pq)); CU_ASSERT(0 == spdylay_session_send(session)); CU_ASSERT(memcmp(hd_ans1, acc.buf, 4) == 0); /* check stream id */ @@ -790,3 +790,96 @@ void test_spdylay_session_send_rst_stream() spdylay_session_del(session); } + +void test_spdylay_session_get_next_ob_item() +{ + spdylay_session *session; + spdylay_session_callbacks callbacks; + const char *nv[] = { NULL }; + memset(&callbacks, 0, sizeof(spdylay_session_callbacks)); + callbacks.send_callback = null_send_callback; + + spdylay_session_server_new(&session, &callbacks, NULL); + session->settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS] = 2; + + CU_ASSERT(NULL == spdylay_session_get_next_ob_item(session)); + spdylay_submit_ping(session); + CU_ASSERT(SPDYLAY_PING == + spdylay_session_get_next_ob_item(session)->frame_type); + + spdylay_submit_request(session, 0, nv, NULL, NULL); + CU_ASSERT(SPDYLAY_PING == + spdylay_session_get_next_ob_item(session)->frame_type); + + CU_ASSERT(0 == spdylay_session_send(session)); + CU_ASSERT(NULL == spdylay_session_get_next_ob_item(session)); + + spdylay_session_open_stream(session, 1, SPDYLAY_FLAG_NONE, + 3, SPDYLAY_STREAM_OPENING, NULL); + + spdylay_submit_request(session, 0, nv, NULL, NULL); + CU_ASSERT(NULL == spdylay_session_get_next_ob_item(session)); + + spdylay_submit_response(session, 1, nv, NULL); + CU_ASSERT(SPDYLAY_SYN_REPLY == + spdylay_session_get_next_ob_item(session)->frame_type); + + session->settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS] = 3; + + CU_ASSERT(SPDYLAY_SYN_STREAM == + spdylay_session_get_next_ob_item(session)->frame_type); + + spdylay_session_del(session); +} + +void test_spdylay_session_pop_next_ob_item() +{ + spdylay_session *session; + spdylay_session_callbacks callbacks; + const char *nv[] = { NULL }; + spdylay_outbound_item *item; + memset(&callbacks, 0, sizeof(spdylay_session_callbacks)); + callbacks.send_callback = null_send_callback; + + spdylay_session_server_new(&session, &callbacks, NULL); + session->settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS] = 1; + + CU_ASSERT(NULL == spdylay_session_pop_next_ob_item(session)); + spdylay_submit_ping(session); + spdylay_submit_request(session, 0, nv, NULL, NULL); + + item = spdylay_session_pop_next_ob_item(session); + CU_ASSERT(SPDYLAY_PING == item->frame_type); + spdylay_outbound_item_free(item); + free(item); + + item = spdylay_session_pop_next_ob_item(session); + CU_ASSERT(SPDYLAY_SYN_STREAM == item->frame_type); + spdylay_outbound_item_free(item); + free(item); + + CU_ASSERT(NULL == spdylay_session_pop_next_ob_item(session)); + + spdylay_session_open_stream(session, 1, SPDYLAY_FLAG_NONE, + 3, SPDYLAY_STREAM_OPENING, NULL); + + spdylay_submit_request(session, 0, nv, NULL, NULL); + spdylay_submit_response(session, 1, nv, NULL); + + item = spdylay_session_pop_next_ob_item(session); + CU_ASSERT(SPDYLAY_SYN_REPLY == item->frame_type); + spdylay_outbound_item_free(item); + free(item); + + CU_ASSERT(NULL == spdylay_session_pop_next_ob_item(session)); + + spdylay_submit_response(session, 1, nv, NULL); + session->settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS] = 2; + + item = spdylay_session_pop_next_ob_item(session); + CU_ASSERT(SPDYLAY_SYN_STREAM == item->frame_type); + spdylay_outbound_item_free(item); + free(item); + + spdylay_session_del(session); +} diff --git a/tests/spdylay_session_test.h b/tests/spdylay_session_test.h index 7513452..66efd7b 100644 --- a/tests/spdylay_session_test.h +++ b/tests/spdylay_session_test.h @@ -43,5 +43,7 @@ void test_spdylay_session_on_data_received(); void test_spdylay_session_on_rst_received(); void test_spdylay_session_is_my_stream_id(); void test_spdylay_session_send_rst_stream(); +void test_spdylay_session_get_next_ob_item(); +void test_spdylay_session_pop_next_ob_item(); #endif // SPDYLAY_SESSION_TEST_H