Added ability to postpone DATA frames for asynchronous I/O.

This commit is contained in:
Tatsuhiro Tsujikawa 2012-02-19 23:42:25 +09:00
parent 4f28698572
commit 301eb29cd4
11 changed files with 302 additions and 78 deletions

View File

@ -34,13 +34,13 @@ lib_LTLIBRARIES = libspdylay.la
OBJECTS = spdylay_pq.c spdylay_map.c spdylay_queue.c \
spdylay_buffer.c spdylay_frame.c spdylay_zlib.c \
spdylay_session.c spdylay_helper.c spdylay_stream.c spdylay_npn.c \
spdylay_submit.c
spdylay_submit.c spdylay_outbound_item.c
HFILES = spdylay_pq.h spdylay_int.h spdylay_map.h spdylay_queue.h \
spdylay_buffer.h spdylay_frame.h spdylay_zlib.h \
spdylay_session.h spdylay_helper.h spdylay_stream.h spdylay_int.h \
spdylay_npn.h \
spdylay_submit.h
spdylay_submit.h spdylay_outbound_item.h
libspdylay_la_SOURCES = $(HFILES) $(OBJECTS)
libspdylay_la_LDFLAGS = -no-undefined \

View File

@ -48,7 +48,7 @@ typedef enum {
SPDYLAY_ERR_PROTO = -505,
SPDYLAY_ERR_INVALID_FRAME = -506,
SPDYLAY_ERR_EOF = -507,
SPDYLAY_ERR_DEFERRED = -508,
/* The errors < SPDYLAY_ERR_FATAL mean that the library is under
unexpected condition that it cannot process any further data
reliably (e.g., out of memory). */
@ -185,7 +185,13 @@ typedef union {
* |source|. The implementation of this function must read at most
* |length| bytes of data from |source| (or possibly other places) and
* store them in |buf| and return number of data stored in |buf|. If
* EOF is reached, set |*eof| to 1. In case of error, return
* EOF is reached, set |*eof| to 1. If the application wants to
* postpone DATA frames, (e.g., asynchronous I/O, or reading data
* blocks for long time), it is achieved by returning
* SPDYLAY_ERR_DEFERRED without reading any data in this invocation.
* The library removes DATA frame from outgoing queue temporarily. To
* move back deferred DATA frame to outgoing queue, call
* spdylay_session_resume_data(). In case of error, return
* SPDYLAY_ERR_CALLBACK_FAILURE, which leads to session failure.
*/
typedef ssize_t (*spdylay_data_source_read_callback)
@ -544,6 +550,13 @@ int spdylay_submit_goaway(spdylay_session *session);
int spdylay_select_next_protocol(unsigned char **out, unsigned char *outlen,
const unsigned char *in, unsigned int inlen);
/*
* Put back previously deferred DATA frame in the stream |stream_id|
* to outbound queue. This function returns 0 if it succeeds, or
* negative error code.
*/
int spdylay_session_resume_data(spdylay_session *session, int32_t stream_id);
#ifdef __cplusplus
}
#endif

View File

@ -0,0 +1,65 @@
/*
* Spdylay - SPDY Library
*
* Copyright (c) 2012 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include "spdylay_outbound_item.h"
void spdylay_outbound_item_free(spdylay_outbound_item *item)
{
if(item == NULL) {
return;
}
switch(item->frame_type) {
case SPDYLAY_SYN_STREAM:
spdylay_frame_syn_stream_free(&item->frame->syn_stream);
free(((spdylay_syn_stream_aux_data*)item->aux_data)->data_prd);
break;
case SPDYLAY_SYN_REPLY:
spdylay_frame_syn_reply_free(&item->frame->syn_reply);
break;
case SPDYLAY_RST_STREAM:
spdylay_frame_rst_stream_free(&item->frame->rst_stream);
break;
case SPDYLAY_SETTINGS:
spdylay_frame_settings_free(&item->frame->settings);
break;
case SPDYLAY_NOOP:
/* We don't have any public API to add NOOP, so here is
unreachable. */
abort();
case SPDYLAY_PING:
spdylay_frame_ping_free(&item->frame->ping);
break;
case SPDYLAY_GOAWAY:
spdylay_frame_goaway_free(&item->frame->goaway);
break;
case SPDYLAY_HEADERS:
spdylay_frame_headers_free(&item->frame->headers);
break;
case SPDYLAY_DATA:
spdylay_frame_data_free(&item->frame->data);
break;
}
free(item->frame);
free(item->aux_data);
}

View File

@ -0,0 +1,54 @@
/*
* Spdylay - SPDY Library
*
* Copyright (c) 2012 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SPDYLAY_OUTBOUND_ITEM_H
#define SPDYLAY_OUTBOUND_ITEM_H
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif /* HAVE_CONFIG_H */
#include <spdylay/spdylay.h>
#include "spdylay_frame.h"
typedef struct {
spdylay_data_provider *data_prd;
void *stream_user_data;
} spdylay_syn_stream_aux_data;
typedef struct {
spdylay_frame_type frame_type;
spdylay_frame *frame;
void *aux_data;
int pri;
int64_t seq;
} spdylay_outbound_item;
/*
* Deallocates resource for |item|. If |item| is NULL, this function
* does nothing.
*/
void spdylay_outbound_item_free(spdylay_outbound_item *item);
#endif /* SPDYLAY_OUTBOUND_ITEM_H */

View File

@ -206,46 +206,6 @@ static void spdylay_free_streams(key_type key, void *val)
free(val);
}
void spdylay_outbound_item_free(spdylay_outbound_item *item)
{
if(item == NULL) {
return;
}
switch(item->frame_type) {
case SPDYLAY_SYN_STREAM:
spdylay_frame_syn_stream_free(&item->frame->syn_stream);
free(((spdylay_syn_stream_aux_data*)item->aux_data)->data_prd);
break;
case SPDYLAY_SYN_REPLY:
spdylay_frame_syn_reply_free(&item->frame->syn_reply);
break;
case SPDYLAY_RST_STREAM:
spdylay_frame_rst_stream_free(&item->frame->rst_stream);
break;
case SPDYLAY_SETTINGS:
spdylay_frame_settings_free(&item->frame->settings);
break;
case SPDYLAY_NOOP:
/* We don't have any public API to add NOOP, so here is
unreachable. */
abort();
case SPDYLAY_PING:
spdylay_frame_ping_free(&item->frame->ping);
break;
case SPDYLAY_GOAWAY:
spdylay_frame_goaway_free(&item->frame->goaway);
break;
case SPDYLAY_HEADERS:
spdylay_frame_headers_free(&item->frame->headers);
break;
case SPDYLAY_DATA:
spdylay_frame_data_free(&item->frame->data);
break;
}
free(item->frame);
free(item->aux_data);
}
static void spdylay_session_ob_pq_free(spdylay_pq *pq)
{
while(!spdylay_pq_empty(pq)) {
@ -257,6 +217,15 @@ static void spdylay_session_ob_pq_free(spdylay_pq *pq)
spdylay_pq_free(pq);
}
static void spdylay_active_outbound_item_reset
(spdylay_active_outbound_item *aob)
{
spdylay_outbound_item_free(aob->item);
free(aob->item);
aob->item = NULL;
aob->framebuflen = aob->framebufoff = 0;
}
void spdylay_session_del(spdylay_session *session)
{
if(session == NULL) {
@ -268,6 +237,7 @@ void spdylay_session_del(spdylay_session *session)
spdylay_session_ob_pq_free(&session->ob_ss_pq);
spdylay_zlib_deflate_free(&session->hd_deflater);
spdylay_zlib_inflate_free(&session->hd_inflater);
spdylay_active_outbound_item_reset(&session->aob);
free(session->aob.framebuf);
free(session->nvbuf);
spdylay_buffer_free(&session->inflatebuf);
@ -464,6 +434,12 @@ static int spdylay_session_is_data_allowed(spdylay_session *session,
if(stream == NULL) {
return 0;
}
if(stream->deferred_data != NULL) {
/* stream->deferred_data != NULL means previously queued DATA
frame has not been sent. We don't allow new DATA frame is sent
in this case. */
return 0;
}
if(spdylay_session_is_my_stream_id(session, stream_id)) {
/* If stream->state is SPDYLAY_STREAM_CLOSING, RST_STREAM was
queued but not yet sent. In this case, we won't send DATA
@ -589,7 +565,14 @@ ssize_t spdylay_session_prep_frame(spdylay_session *session,
&session->aob.framebuf,
&session->aob.framebufmax,
&item->frame->data);
if(framebuflen < 0) {
if(framebuflen == SPDYLAY_ERR_DEFERRED) {
spdylay_stream *stream = spdylay_session_get_stream
(session, item->frame->data.stream_id);
/* Assuming stream is not NULL */
assert(stream);
spdylay_stream_defer_data(stream, item);
return SPDYLAY_ERR_DEFERRED;
} else if(framebuflen < 0) {
return framebuflen;
}
break;
@ -600,15 +583,6 @@ ssize_t spdylay_session_prep_frame(spdylay_session *session,
return framebuflen;
}
static void spdylay_active_outbound_item_reset
(spdylay_active_outbound_item *aob)
{
spdylay_outbound_item_free(aob->item);
free(aob->item);
aob->item = NULL;
aob->framebuflen = aob->framebufoff = 0;
}
spdylay_outbound_item* spdylay_session_get_ob_pq_top
(spdylay_session *session)
{
@ -824,12 +798,21 @@ static int spdylay_session_after_frame_sent(spdylay_session *session)
session->aob.framebuf,
SPDYLAY_DATA_FRAME_LENGTH,
&frame->data);
if(r < 0) {
if(r == SPDYLAY_ERR_DEFERRED) {
spdylay_stream *stream =
spdylay_session_get_stream(session, frame->data.stream_id);
/* Assuming stream is not NULL */
assert(stream);
spdylay_stream_defer_data(stream, session->aob.item);
session->aob.item = NULL;
spdylay_active_outbound_item_reset(&session->aob);
} else if(r < 0) {
spdylay_active_outbound_item_reset(&session->aob);
return r;
} else {
session->aob.framebuflen = r;
session->aob.framebufoff = 0;
}
session->aob.framebuflen = r;
session->aob.framebufoff = 0;
} else {
r = spdylay_pq_push(&session->ob_pq, session->aob.item);
if(r == 0) {
@ -862,14 +845,16 @@ int spdylay_session_send(spdylay_session *session)
break;
}
framebuflen = spdylay_session_prep_frame(session, item);
if(framebuflen < 0) {
if(framebuflen == SPDYLAY_ERR_DEFERRED) {
continue;
} else if(framebuflen < 0) {
/* TODO Call error callback? */
spdylay_outbound_item_free(item);
free(item);
if(framebuflen <= SPDYLAY_ERR_FATAL) {
return framebuflen;
} else {
continue;;
continue;
}
}
session->aob.item = item;
@ -1746,3 +1731,18 @@ void* spdylay_session_get_stream_user_data(spdylay_session *session,
return NULL;
}
}
int spdylay_session_resume_data(spdylay_session *session, int32_t stream_id)
{
int r;
spdylay_stream *stream;
stream = spdylay_session_get_stream(session, stream_id);
if(stream == NULL || stream->deferred_data == NULL) {
return SPDYLAY_ERR_INVALID_ARGUMENT;
}
r = spdylay_pq_push(&session->ob_pq, stream->deferred_data);
if(r == 0) {
spdylay_stream_detach_deferred_data(stream);
}
return r;
}

View File

@ -36,14 +36,7 @@
#include "spdylay_zlib.h"
#include "spdylay_stream.h"
#include "spdylay_buffer.h"
typedef struct {
spdylay_frame_type frame_type;
spdylay_frame *frame;
void *aux_data;
int pri;
int64_t seq;
} spdylay_outbound_item;
#include "spdylay_outbound_item.h"
typedef struct {
spdylay_outbound_item *item;
@ -161,11 +154,6 @@ struct spdylay_session {
void *user_data;
};
typedef struct {
spdylay_data_provider *data_prd;
void *stream_user_data;
} spdylay_syn_stream_aux_data;
/* TODO stream timeout etc */
/*
@ -352,10 +340,4 @@ spdylay_outbound_item* spdylay_session_pop_next_ob_item
spdylay_outbound_item* spdylay_session_get_next_ob_item
(spdylay_session *session);
/*
* Deallocates resource for |item|. If |item| is NULL, this function
* does nothing.
*/
void spdylay_outbound_item_free(spdylay_outbound_item *item);
#endif /* SPDYLAY_SESSION_H */

View File

@ -24,6 +24,8 @@
*/
#include "spdylay_stream.h"
#include <assert.h>
void spdylay_stream_init(spdylay_stream *stream, int32_t stream_id,
uint8_t flags, uint8_t pri,
spdylay_stream_state initial_state,
@ -38,11 +40,14 @@ void spdylay_stream_init(spdylay_stream *stream, int32_t stream_id,
stream->pushed_streams_length = 0;
stream->pushed_streams_capacity = 0;
stream->stream_user_data = stream_user_data;
stream->deferred_data = NULL;
}
void spdylay_stream_free(spdylay_stream *stream)
{
free(stream->pushed_streams);
spdylay_outbound_item_free(stream->deferred_data);
free(stream->deferred_data);
}
void spdylay_stream_shutdown(spdylay_stream *stream, spdylay_shut_flag flag)
@ -66,3 +71,15 @@ int spdylay_stream_add_pushed_stream(spdylay_stream *stream, int32_t stream_id)
stream->pushed_streams[stream->pushed_streams_length++] = stream_id;
return 0;
}
void spdylay_stream_defer_data(spdylay_stream *stream,
spdylay_outbound_item *data)
{
assert(stream->deferred_data == NULL);
stream->deferred_data = data;
}
void spdylay_stream_detach_deferred_data(spdylay_stream *stream)
{
stream->deferred_data = NULL;
}

View File

@ -30,6 +30,7 @@
#endif /* HAVE_CONFIG_H */
#include <spdylay/spdylay.h>
#include "spdylay_outbound_item.h"
/*
* If local peer is stream initiator:
@ -87,6 +88,8 @@ typedef struct {
size_t pushed_streams_capacity;
/* The arbitrary data provided by user for this stream. */
void *stream_user_data;
/* Deferred DATA frame */
spdylay_outbound_item *deferred_data;
} spdylay_stream;
void spdylay_stream_init(spdylay_stream *stream, int32_t stream_id,
@ -115,4 +118,17 @@ void spdylay_stream_shutdown(spdylay_stream *stream, spdylay_shut_flag flag);
*/
int spdylay_stream_add_pushed_stream(spdylay_stream *stream, int32_t stream_id);
/*
* Defer DATA frame |data|. We won't call this function in the
* situation where stream->deferred_data != NULL.
*/
void spdylay_stream_defer_data(spdylay_stream *stream,
spdylay_outbound_item *data);
/*
* Detaches deferred data from this stream. This function does not
* free deferred data.
*/
void spdylay_stream_detach_deferred_data(spdylay_stream *stream);
#endif /* SPDYLAY_STREAM */

View File

@ -121,6 +121,8 @@ int main(int argc, char* argv[])
test_spdylay_session_stream_close_on_syn_stream) ||
!CU_add_test(pSuite, "session_recv_invalid_frame",
test_spdylay_session_recv_invalid_frame) ||
!CU_add_test(pSuite, "session_defer_data",
test_spdylay_session_defer_data) ||
!CU_add_test(pSuite, "frame_unpack_nv", test_spdylay_frame_unpack_nv) ||
!CU_add_test(pSuite, "frame_count_nv_space",
test_spdylay_frame_count_nv_space) ||

View File

@ -1272,3 +1272,77 @@ void test_spdylay_session_recv_invalid_frame()
spdylay_session_del(session);
}
static ssize_t defer_data_source_read_callback
(spdylay_session *session, uint8_t *buf, size_t len, int *eof,
spdylay_data_source *source, void *user_data)
{
return SPDYLAY_ERR_DEFERRED;
}
void test_spdylay_session_defer_data()
{
spdylay_session *session;
spdylay_session_callbacks callbacks;
const char *nv[] = { NULL };
my_user_data ud;
spdylay_data_provider data_prd;
spdylay_outbound_item *item;
memset(&callbacks, 0, sizeof(spdylay_session_callbacks));
callbacks.on_ctrl_send_callback = on_ctrl_send_callback;
callbacks.send_callback = block_count_send_callback;
data_prd.read_callback = defer_data_source_read_callback;
ud.ctrl_send_cb_called = 0;
ud.data_source_length = 16*1024;
spdylay_session_server_new(&session, &callbacks, &ud);
spdylay_session_open_stream(session, 1, SPDYLAY_FLAG_NONE, 3,
SPDYLAY_STREAM_OPENING, NULL);
spdylay_submit_response(session, 1, nv, &data_prd);
ud.block_count = 1;
/* Sends SYN_REPLY */
CU_ASSERT(0 == spdylay_session_send(session));
CU_ASSERT(SPDYLAY_SYN_REPLY == ud.sent_frame_type);
/* No data is read */
CU_ASSERT(ud.data_source_length == 16*1024);
ud.block_count = 1;
spdylay_submit_ping(session);
/* Sends PING */
CU_ASSERT(0 == spdylay_session_send(session));
CU_ASSERT(SPDYLAY_PING == ud.sent_frame_type);
/* Resume deferred DATA */
CU_ASSERT(0 == spdylay_session_resume_data(session, 1));
item = spdylay_session_get_ob_pq_top(session);
item->frame->data.data_prd.read_callback =
fixed_length_data_source_read_callback;
ud.block_count = 1;
/* Reads 2 4KiB blocks */
CU_ASSERT(0 == spdylay_session_send(session));
CU_ASSERT(ud.data_source_length == 8*1024);
/* Deferred again */
item->frame->data.data_prd.read_callback = defer_data_source_read_callback;
/* This is needed since 4KiB block is already read and waiting to be
sent. No read_callback invocation. */
ud.block_count = 1;
CU_ASSERT(0 == spdylay_session_send(session));
CU_ASSERT(ud.data_source_length == 8*1024);
/* Resume deferred DATA */
CU_ASSERT(0 == spdylay_session_resume_data(session, 1));
item = spdylay_session_get_ob_pq_top(session);
item->frame->data.data_prd.read_callback =
fixed_length_data_source_read_callback;
ud.block_count = 1;
/* Reads 2 4KiB blocks */
CU_ASSERT(0 == spdylay_session_send(session));
CU_ASSERT(ud.data_source_length == 0);
spdylay_session_del(session);
}

View File

@ -52,5 +52,6 @@ void test_spdylay_session_data_backoff_by_high_pri_frame();
void test_spdylay_session_stop_data_with_rst_stream();
void test_spdylay_session_stream_close_on_syn_stream();
void test_spdylay_session_recv_invalid_frame();
void test_spdylay_session_defer_data();
#endif // SPDYLAY_SESSION_TEST_H