Having the number of server and client streams be limited separately

using SETTINGS_MAX_CONCURRENT_STREAMS
This commit is contained in:
Tatsuhiro Tsujikawa 2012-05-08 00:59:26 +09:00
parent 67eca8d078
commit 02e4440e4a
3 changed files with 76 additions and 27 deletions

View File

@ -33,18 +33,27 @@
#include "spdylay_net.h"
/*
* Returns non-zero if the number of opened streams is larger than or
* equal to SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS value.
* Returns non-zero if the number of outgoing opened streams is larger
* than or equal to
* remote_settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS].
*/
static int spdylay_session_get_max_concurrent_streams_reached
static int spdylay_session_is_outgoing_concurrent_streams_max
(spdylay_session *session)
{
uint32_t local_max, remote_max;
local_max = session->local_settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS];
remote_max =
session->remote_settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS];
return spdylay_min(local_max, remote_max)
<= spdylay_map_size(&session->streams);
return session->remote_settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS]
<= session->num_outgoing_streams;
}
/*
* Returns non-zero if the number of incoming opened streams is larger
* than or equal to
* local_settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS].
*/
static int spdylay_session_is_incoming_concurrent_streams_max
(spdylay_session *session)
{
return session->local_settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS]
<= session->num_incoming_streams;
}
/*
@ -451,6 +460,11 @@ spdylay_stream* spdylay_session_open_stream(spdylay_session *session,
free(stream);
stream = NULL;
}
if(spdylay_session_is_my_stream_id(session, stream_id)) {
++session->num_outgoing_streams;
} else {
++session->num_incoming_streams;
}
return stream;
}
@ -465,6 +479,11 @@ int spdylay_session_close_stream(spdylay_session *session, int32_t stream_id,
status_code,
session->user_data);
}
if(spdylay_session_is_my_stream_id(session, stream_id)) {
--session->num_outgoing_streams;
} else {
--session->num_incoming_streams;
}
spdylay_map_erase(&session->streams, stream_id);
spdylay_stream_free(stream);
free(stream);
@ -1128,7 +1147,7 @@ spdylay_outbound_item* spdylay_session_get_next_ob_item
} else {
/* Return item only when concurrent connection limit is not
reached */
if(spdylay_session_get_max_concurrent_streams_reached(session)) {
if(spdylay_session_is_outgoing_concurrent_streams_max(session)) {
return NULL;
} else {
return spdylay_pq_top(&session->ob_ss_pq);
@ -1141,7 +1160,7 @@ spdylay_outbound_item* spdylay_session_get_next_ob_item
spdylay_outbound_item *item, *syn_stream_item;
item = spdylay_pq_top(&session->ob_pq);
syn_stream_item = spdylay_pq_top(&session->ob_ss_pq);
if(spdylay_session_get_max_concurrent_streams_reached(session) ||
if(spdylay_session_is_outgoing_concurrent_streams_max(session) ||
item->pri < syn_stream_item->pri ||
(item->pri == syn_stream_item->pri &&
item->seq < syn_stream_item->seq)) {
@ -1162,7 +1181,7 @@ spdylay_outbound_item* spdylay_session_pop_next_ob_item
} else {
/* Pop item only when concurrent connection limit is not
reached */
if(spdylay_session_get_max_concurrent_streams_reached(session)) {
if(spdylay_session_is_outgoing_concurrent_streams_max(session)) {
return NULL;
} else {
spdylay_outbound_item *item;
@ -1181,7 +1200,7 @@ spdylay_outbound_item* spdylay_session_pop_next_ob_item
spdylay_outbound_item *item, *syn_stream_item;
item = spdylay_pq_top(&session->ob_pq);
syn_stream_item = spdylay_pq_top(&session->ob_ss_pq);
if(spdylay_session_get_max_concurrent_streams_reached(session) ||
if(spdylay_session_is_outgoing_concurrent_streams_max(session) ||
item->pri < syn_stream_item->pri ||
(item->pri == syn_stream_item->pri &&
item->seq < syn_stream_item->seq)) {
@ -1558,8 +1577,8 @@ static int spdylay_session_check_version(spdylay_session *session,
}
/*
* Validates SYN_STREAM frame |frame|. This function returns 0 if it
* succeeds, or non-zero spdylay_status_code.
* Validates received SYN_STREAM frame |frame|. This function returns
* 0 if it succeeds, or non-zero spdylay_status_code.
*/
static int spdylay_session_validate_syn_stream(spdylay_session *session,
spdylay_syn_stream *frame)
@ -1586,7 +1605,7 @@ static int spdylay_session_validate_syn_stream(spdylay_session *session,
return SPDYLAY_PROTOCOL_ERROR;
}
}
if(spdylay_session_get_max_concurrent_streams_reached(session)) {
if(spdylay_session_is_incoming_concurrent_streams_max(session)) {
/* spdy/2 spec does not clearly say what to do when max concurrent
streams number is reached. The mod_spdy sends
SPDYLAY_REFUSED_STREAM and we think it is reasonable. So we
@ -2432,7 +2451,7 @@ int spdylay_session_want_write(spdylay_session *session)
*/
return (session->aob.item != NULL || !spdylay_pq_empty(&session->ob_pq) ||
(!spdylay_pq_empty(&session->ob_ss_pq) &&
!spdylay_session_get_max_concurrent_streams_reached(session))) &&
!spdylay_session_is_outgoing_concurrent_streams_max(session))) &&
(!session->goaway_flags || spdylay_map_size(&session->streams) > 0);
}

View File

@ -133,6 +133,13 @@ struct spdylay_session {
int64_t next_seq;
spdylay_map /* <spdylay_stream*> */ streams;
/* The number of outgoing streams. This will be capped by
remote_settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS]. */
size_t num_outgoing_streams;
/* The number of incoming streams. This will be capped by
local_settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS]. */
size_t num_incoming_streams;
/* Queue for outbound frames other than SYN_STREAM */
spdylay_pq /* <spdylay_outbound_item*> */ ob_pq;
/* Queue for outbound SYN_STREAM frame */

View File

@ -457,6 +457,20 @@ void test_spdylay_session_on_syn_stream_received(void)
spdylay_frame_syn_stream_free(&frame.syn_stream);
/* More than max concurrent streams leads REFUSED_STREAM */
session->local_settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS] = 1;
spdylay_frame_syn_stream_init(&frame.syn_stream, SPDYLAY_PROTO_SPDY2,
SPDYLAY_CTRL_FLAG_NONE,
5, 0, 3, dup_nv(nv));
user_data.invalid_ctrl_recv_cb_called = 0;
CU_ASSERT(0 == spdylay_session_on_syn_stream_received(session, &frame));
CU_ASSERT(1 == user_data.invalid_ctrl_recv_cb_called);
spdylay_frame_syn_stream_free(&frame.syn_stream);
session->local_settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS] =
SPDYLAY_INITIAL_MAX_CONCURRENT_STREAMS;
/* Stream ID less than previouly received SYN_STREAM leads session
error */
spdylay_frame_syn_stream_init(&frame.syn_stream, SPDYLAY_PROTO_SPDY2,
@ -468,6 +482,7 @@ void test_spdylay_session_on_syn_stream_received(void)
CU_ASSERT(session->goaway_flags & SPDYLAY_GOAWAY_FAIL_ON_SEND);
spdylay_frame_syn_stream_free(&frame.syn_stream);
spdylay_session_del(session);
}
@ -1236,7 +1251,7 @@ void test_spdylay_session_get_next_ob_item(void)
callbacks.send_callback = null_send_callback;
spdylay_session_server_new(&session, SPDYLAY_PROTO_SPDY2, &callbacks, NULL);
session->local_settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS] = 2;
session->remote_settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS] = 2;
CU_ASSERT(NULL == spdylay_session_get_next_ob_item(session));
spdylay_submit_ping(session);
@ -1250,17 +1265,20 @@ void test_spdylay_session_get_next_ob_item(void)
CU_ASSERT(0 == spdylay_session_send(session));
CU_ASSERT(NULL == spdylay_session_get_next_ob_item(session));
/* Incoming stream does not affect the number of outgoing max
concurrent streams. */
spdylay_session_open_stream(session, 1, SPDYLAY_CTRL_FLAG_NONE,
3, SPDYLAY_STREAM_OPENING, NULL);
spdylay_submit_request(session, 0, nv, NULL, NULL);
CU_ASSERT(SPDYLAY_SYN_STREAM ==
OB_CTRL_TYPE(spdylay_session_get_next_ob_item(session)));
CU_ASSERT(0 == spdylay_session_send(session));
spdylay_submit_request(session, 0, nv, NULL, NULL);
CU_ASSERT(NULL == spdylay_session_get_next_ob_item(session));
spdylay_submit_response(session, 1, nv, NULL);
CU_ASSERT(SPDYLAY_SYN_REPLY ==
OB_CTRL_TYPE(spdylay_session_get_next_ob_item(session)));
session->local_settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS] = 3;
session->remote_settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS] = 3;
CU_ASSERT(SPDYLAY_SYN_STREAM ==
OB_CTRL_TYPE(spdylay_session_get_next_ob_item(session)));
@ -1278,11 +1296,11 @@ void test_spdylay_session_pop_next_ob_item(void)
callbacks.send_callback = null_send_callback;
spdylay_session_server_new(&session, SPDYLAY_PROTO_SPDY2, &callbacks, NULL);
session->local_settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS] = 1;
session->remote_settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS] = 1;
CU_ASSERT(NULL == spdylay_session_pop_next_ob_item(session));
spdylay_submit_ping(session);
spdylay_submit_request(session, 0, nv, NULL, NULL);
spdylay_submit_request(session, 1, nv, NULL, NULL);
item = spdylay_session_pop_next_ob_item(session);
CU_ASSERT(SPDYLAY_PING == OB_CTRL_TYPE(item));
@ -1296,9 +1314,15 @@ void test_spdylay_session_pop_next_ob_item(void)
CU_ASSERT(NULL == spdylay_session_pop_next_ob_item(session));
/* Incoming stream does not affect the number of outgoing max
concurrent streams. */
spdylay_session_open_stream(session, 1, SPDYLAY_CTRL_FLAG_NONE,
3, SPDYLAY_STREAM_OPENING, NULL);
/* In-flight outgoing stream */
spdylay_session_open_stream(session, 4, SPDYLAY_CTRL_FLAG_NONE,
3, SPDYLAY_STREAM_OPENING, NULL);
spdylay_submit_request(session, 0, nv, NULL, NULL);
spdylay_submit_response(session, 1, nv, NULL);
@ -1309,8 +1333,7 @@ void test_spdylay_session_pop_next_ob_item(void)
CU_ASSERT(NULL == spdylay_session_pop_next_ob_item(session));
spdylay_submit_response(session, 1, nv, NULL);
session->local_settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS] = 2;
session->remote_settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS] = 2;
item = spdylay_session_pop_next_ob_item(session);
CU_ASSERT(SPDYLAY_SYN_STREAM == OB_CTRL_TYPE(item));