From 21e165f1f8d311caa8d7255feb45e99233cb1333 Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Fri, 27 Jan 2012 01:17:40 +0900 Subject: [PATCH] Added spdylay_reply_submit() and DATA frame handling after SYN_REPLY. --- lib/includes/spdylay/spdylay.h | 41 ++++++- lib/spdylay_frame.c | 41 ++++++- lib/spdylay_frame.h | 13 +++ lib/spdylay_session.c | 206 ++++++++++++++++++++++++++++++--- lib/spdylay_session.h | 21 ++++ tests/main.c | 1 + tests/spdylay_session_test.c | 49 ++++++++ tests/spdylay_session_test.h | 1 + 8 files changed, 346 insertions(+), 27 deletions(-) diff --git a/lib/includes/spdylay/spdylay.h b/lib/includes/spdylay/spdylay.h index 946b4e6..b29b498 100644 --- a/lib/includes/spdylay/spdylay.h +++ b/lib/includes/spdylay/spdylay.h @@ -32,6 +32,9 @@ extern "C" { #include #include +struct spdylay_session; +typedef struct spdylay_session spdylay_session; + typedef enum { SPDYLAY_ERR_INVALID_ARGUMENT = -501, SPDYLAY_ERR_ZLIB = -502, @@ -60,6 +63,7 @@ typedef enum { SPDYLAY_NOOP = 5, SPDYLAY_PING = 6, SPDYLAY_GOAWAY = 7, + SPDYLAY_DATA = 100, } spdylay_frame_type; typedef enum { @@ -106,15 +110,33 @@ typedef struct { uint32_t status_code; } spdylay_rst_stream; +typedef union { + int fd; + void *ptr; +} spdylay_data_source; + +typedef ssize_t (*spdylay_data_source_read_callback) +(spdylay_session *session, uint8_t *buf, size_t length, int *eof, + spdylay_data_source *source, void *user_data); + +typedef struct { + spdylay_data_source source; + spdylay_data_source_read_callback read_callback; +} spdylay_data_provider; + +typedef struct { + int32_t stream_id; + uint8_t flags; + spdylay_data_provider data_prd; +} spdylay_data; + typedef union { spdylay_syn_stream syn_stream; spdylay_syn_reply syn_reply; spdylay_rst_stream rst_stream; + spdylay_data data; } spdylay_frame; -struct spdylay_session; -typedef struct spdylay_session spdylay_session; - typedef ssize_t (*spdylay_send_callback) (spdylay_session *session, const uint8_t *data, size_t length, int flags, void *user_data); @@ -163,6 +185,19 @@ int spdylay_session_want_write(spdylay_session *session); int spdylay_req_submit(spdylay_session *session, const char *path); +/* + * Submits SYN_REPLY frame against stream |stream_id|. |nv| must + * include "status" and "version" key. "status" must be status code + * (e.g., "200" or "200 OK"). "version" is HTTP response version + * (e.g., "HTTP/1.1"). This function creates copies of all name/value + * pairs in |nv|. If |data_prd| is not NULL, it provides data which + * will be sent in subsequent DATA frames. If |data_prd| is NULL, + * SYN_REPLY will have FLAG_FIN. + */ +int spdylay_reply_submit(spdylay_session *session, + int32_t stream_id, const char **nv, + spdylay_data_provider *data_prd); + #ifdef __cplusplus } #endif diff --git a/lib/spdylay_frame.c b/lib/spdylay_frame.c index 2216877..5e155d1 100644 --- a/lib/spdylay_frame.c +++ b/lib/spdylay_frame.c @@ -313,12 +313,32 @@ int spdylay_frame_is_ctrl_frame(uint8_t first_byte) void spdylay_frame_nv_free(char **nv) { int i; - for(i = 0; nv[i]; i += 2) { + for(i = 0; nv[i]; ++i) { free(nv[i]); - free(nv[i+1]); } } +char** spdylay_frame_nv_copy(const char **nv) +{ + int n, i; + char **nnv; + for(n = 0;nv[n]; ++n); + nnv = malloc((n+1)*sizeof(char*)); + if(nnv == NULL) { + return NULL; + } + for(i = 0; i < n; ++i) { + nnv[i] = strdup(nv[i]); + if(nnv[i] == NULL) { + spdylay_frame_nv_free(nnv[i]); + free(nnv); + return NULL; + } + } + nnv[n] = NULL; + return nnv; +} + void spdylay_frame_syn_stream_init(spdylay_syn_stream *frame, uint8_t flags, int32_t stream_id, int32_t assoc_stream_id, uint8_t pri, char **nv) @@ -371,12 +391,23 @@ void spdylay_frame_rst_stream_init(spdylay_rst_stream *frame, void spdylay_frame_rst_stream_free(spdylay_rst_stream *frame) {} +void spdylay_frame_data_init(spdylay_data *frame, int32_t stream_id, + spdylay_data_provider *data_prd) +{ + memset(frame, 0, sizeof(spdylay_data)); + frame->stream_id = stream_id; + frame->data_prd = *data_prd; +} + +void spdylay_frame_data_free(spdylay_data *frame) +{} + ssize_t spdylay_frame_pack_syn_stream(uint8_t **buf_ptr, spdylay_syn_stream *frame, spdylay_zlib *deflater) { uint8_t *framebuf = NULL; - size_t framelen; + ssize_t framelen; framelen = spdylay_frame_alloc_pack_nv(&framebuf, frame->nv, 18, deflater); if(framelen < 0) { return framelen; @@ -417,7 +448,7 @@ ssize_t spdylay_frame_pack_syn_reply(uint8_t **buf_ptr, spdylay_zlib *deflater) { uint8_t *framebuf = NULL; - size_t framelen; + ssize_t framelen; framelen = spdylay_frame_alloc_pack_nv(&framebuf, frame->nv, 14, deflater); if(framelen < 0) { return framelen; @@ -447,7 +478,7 @@ ssize_t spdylay_frame_pack_rst_stream(uint8_t **buf_ptr, spdylay_rst_stream *frame) { uint8_t *framebuf; - size_t framelen = 16; + ssize_t framelen = 16; framebuf = malloc(framelen); if(framebuf == NULL) { return SPDYLAY_ERR_NOMEM; diff --git a/lib/spdylay_frame.h b/lib/spdylay_frame.h index f97ec52..e56a0f2 100644 --- a/lib/spdylay_frame.h +++ b/lib/spdylay_frame.h @@ -37,6 +37,8 @@ #define SPDYLAY_LENGTH_MASK 0xffffff #define SPDYLAY_VERSION_MASK 0x7fff +#define SPDYLAY_DATA_FRAME_LENGTH 4096 + /* * Packs SYN_STREAM frame |frame| in wire frame format and store it in * |*buf_ptr|. This function allocates enough memory to store given @@ -141,6 +143,11 @@ void spdylay_frame_rst_stream_init(spdylay_rst_stream *frame, void spdylay_frame_rst_stream_free(spdylay_rst_stream *frame); +void spdylay_frame_data_init(spdylay_data *frame, int32_t stream_id, + spdylay_data_provider *data_prd); + +void spdylay_frame_data_free(spdylay_data *frame); + /* * Returns 1 if the first byte of this frame indicates it is a control * frame. @@ -152,4 +159,10 @@ int spdylay_frame_is_ctrl_frame(uint8_t first_byte); */ void spdylay_frame_nv_free(char **nv); +/* + * Makes a deep copy of |nv| and returns the copy. This function + * returns the pointer to the copy if it succeeds, or NULL. + */ +char** spdylay_frame_nv_copy(const char **nv); + #endif /* SPDYLAY_FRAME_H */ diff --git a/lib/spdylay_session.c b/lib/spdylay_session.c index 0ff6240..33f20b6 100644 --- a/lib/spdylay_session.c +++ b/lib/spdylay_session.c @@ -32,6 +32,21 @@ #include "spdylay_helper.h" +/* + * Returns non-zero value if |stream_id| is initiated by local host. + * Otherwrise returns 0. + */ +static int spdylay_session_is_my_stream_id(spdylay_session *session, + int32_t stream_id) +{ + int r; + if(stream_id == 0) { + return 0; + } + r = stream_id % 2; + return (session->server && r == 0) || r == 1; +} + spdylay_stream* spdylay_session_get_stream(spdylay_session *session, int32_t stream_id) { @@ -113,6 +128,12 @@ static void spdylay_outbound_item_free(spdylay_outbound_item *item) case SPDYLAY_SYN_REPLY: spdylay_frame_syn_reply_free(&item->frame->syn_reply); break; + case SPDYLAY_RST_STREAM: + spdylay_frame_rst_stream_free(&item->frame->rst_stream); + break; + case SPDYLAY_DATA: + spdylay_frame_data_free(&item->frame->data); + break; } free(item->frame); } @@ -170,6 +191,14 @@ int spdylay_session_add_frame(spdylay_session *session, } break; } + case SPDYLAY_DATA: { + spdylay_stream *stream = spdylay_session_get_stream + (session, frame->data.stream_id); + if(stream) { + item->pri = stream->pri; + } + break; + } }; r = spdylay_pq_push(&session->ob_pq, item); if(r != 0) { @@ -232,6 +261,7 @@ ssize_t spdylay_session_prep_frame(spdylay_session *session, spdylay_outbound_item *item, uint8_t **framebuf_ptr) { + /* TODO Get or validate stream ID here */ uint8_t *framebuf; ssize_t framebuflen; int r; @@ -264,6 +294,14 @@ ssize_t spdylay_session_prep_frame(spdylay_session *session, } break; } + case SPDYLAY_DATA: { + framebuflen = spdylay_session_pack_data(session, &framebuf, + &item->frame->data); + if(framebuflen < 0) { + return framebuflen; + } + break; + } default: framebuflen = SPDYLAY_ERR_INVALID_ARGUMENT; } @@ -280,10 +318,18 @@ static void spdylay_active_outbound_item_reset memset(aob, 0, sizeof(spdylay_active_outbound_item)); } +static spdylay_outbound_item* spdylay_session_get_ob_pq_top +(spdylay_session *session) +{ + return (spdylay_outbound_item*)spdylay_pq_top(&session->ob_pq); +} + static int spdylay_session_after_frame_sent(spdylay_session *session) { + /* TODO handle FIN flag. */ spdylay_frame *frame = session->aob.item->frame; - switch(session->aob.item->frame_type) { + spdylay_frame_type type = session->aob.item->frame_type; + switch(type) { case SPDYLAY_SYN_STREAM: { spdylay_stream *stream = spdylay_session_get_stream(session, frame->syn_stream.stream_id); @@ -302,14 +348,55 @@ static int spdylay_session_after_frame_sent(spdylay_session *session) } case SPDYLAY_RST_STREAM: spdylay_session_close_stream(session, frame->rst_stream.stream_id); + break; + case SPDYLAY_DATA: + if((frame->data.flags & SPDYLAY_FLAG_FIN) && + !spdylay_session_is_my_stream_id(session, frame->data.stream_id)) { + /* We send all data requested by peer, so close the stream. */ + spdylay_session_close_stream(session, frame->data.stream_id); + } + break; }; - /* TODO If frame is data frame, we need to sent all chunk of - data.*/ - spdylay_active_outbound_item_reset(&session->aob); + if(type == SPDYLAY_DATA) { + int r; + if(frame->data.flags & SPDYLAY_FLAG_FIN) { + spdylay_active_outbound_item_reset(&session->aob); + } else if(spdylay_pq_empty(&session->ob_pq) || + session->aob.item->pri <= + spdylay_session_get_ob_pq_top(session)->pri) { + /* If priority of this stream is higher or equal to other stream + waiting at the top of the queue, we continue to send this + data. */ + /* We assume that buffer has at least + SPDYLAY_DATA_FRAME_LENGTH. */ + r = spdylay_session_pack_data_overwrite(session, + session->aob.framebuf, + SPDYLAY_DATA_FRAME_LENGTH, + &frame->data); + if(r < 0) { + spdylay_active_outbound_item_reset(&session->aob); + return r; + } + session->aob.framebufoff = 0; + } else { + r = spdylay_pq_push(&session->ob_pq, session->aob.item); + if(r == 0) { + session->aob.item = NULL; + spdylay_active_outbound_item_reset(&session->aob); + } else { + spdylay_active_outbound_item_reset(&session->aob); + return r; + } + } + } else { + spdylay_active_outbound_item_reset(&session->aob); + } + return 0; } int spdylay_session_send(spdylay_session *session) { + int r; printf("session_send\n"); while(session->aob.item || !spdylay_pq_empty(&session->ob_pq)) { const uint8_t *data; @@ -320,7 +407,6 @@ int spdylay_session_send(spdylay_session *session) uint8_t *framebuf; ssize_t framebuflen; spdylay_pq_pop(&session->ob_pq); - /* TODO Get or validate stream id here */ framebuflen = spdylay_session_prep_frame(session, item, &framebuf); if(framebuflen < 0) { /* TODO Call error callback? */ @@ -331,6 +417,7 @@ int spdylay_session_send(spdylay_session *session) session->aob.item = item; session->aob.framebuf = framebuf; session->aob.framebuflen = framebuflen; + /* TODO Call before_send callback */ } data = session->aob.framebuf + session->aob.framebufoff; datalen = session->aob.framebuflen - session->aob.framebufoff; @@ -343,11 +430,13 @@ int spdylay_session_send(spdylay_session *session) return sentlen; } } else { - printf("sent %d bytes\n", sentlen); session->aob.framebufoff += sentlen; if(session->aob.framebufoff == session->aob.framebuflen) { /* Frame has completely sent */ - spdylay_session_after_frame_sent(session); + r = spdylay_session_after_frame_sent(session); + if(r < 0) { + return r; + } } else { /* partial write */ break; @@ -437,17 +526,6 @@ static int spdylay_session_is_new_peer_stream_id(spdylay_session *session, } } -static int spdylay_session_is_my_stream_id(spdylay_session *session, - int32_t stream_id) -{ - int r; - if(stream_id == 0) { - return 0; - } - r = stream_id % 2; - return (session->server && r == 0) || r == 1; -} - /* * Validates SYN_STREAM frame |frame|. This function returns 0 if it * succeeds, or -1. @@ -683,6 +761,53 @@ int spdylay_session_want_write(spdylay_session *session) return session->aob.item != NULL || !spdylay_pq_empty(&session->ob_pq); } +int spdylay_reply_submit(spdylay_session *session, + int32_t stream_id, const char **nv, + spdylay_data_provider *data_prd) +{ + int r; + spdylay_frame *frame; + spdylay_frame *data_frame = NULL; + char **nv_copy; + uint8_t flags = 0; + frame = malloc(sizeof(spdylay_frame)); + if(frame == NULL) { + return SPDYLAY_ERR_NOMEM; + } + nv_copy = spdylay_frame_nv_copy(nv); + if(nv_copy == NULL) { + free(frame); + return SPDYLAY_ERR_NOMEM; + } + if(data_prd == NULL) { + flags |= SPDYLAY_FLAG_FIN; + } + spdylay_frame_syn_reply_init(&frame->syn_reply, flags, stream_id, + nv_copy); + r = spdylay_session_add_frame(session, SPDYLAY_SYN_REPLY, frame); + if(r != 0) { + spdylay_frame_syn_reply_free(&frame->syn_reply); + free(frame); + return r; + } + if(data_prd != NULL) { + /* TODO If error is not FATAL, we should add RST_STREAM frame to + cancel this stream. */ + data_frame = malloc(sizeof(spdylay_frame)); + if(data_frame == NULL) { + return SPDYLAY_ERR_NOMEM; + } + spdylay_frame_data_init(&data_frame->data, stream_id, data_prd); + r = spdylay_session_add_frame(session, SPDYLAY_DATA, data_frame); + if(r != 0) { + spdylay_frame_data_free(&data_frame->data); + free(data_frame); + return r; + } + } + return 0; +} + int spdylay_req_submit(spdylay_session *session, const char *path) { int r; @@ -702,5 +827,48 @@ int spdylay_req_submit(spdylay_session *session, const char *path) spdylay_frame_syn_stream_init(&frame->syn_stream, SPDYLAY_FLAG_FIN, 0, 0, 0, nv); r = spdylay_session_add_frame(session, SPDYLAY_SYN_STREAM, frame); - assert(r == 0); + return r; +} + +ssize_t spdylay_session_pack_data(spdylay_session *session, + uint8_t **buf_ptr, spdylay_data *frame) +{ + uint8_t *framebuf; + ssize_t framelen = SPDYLAY_DATA_FRAME_LENGTH; + framebuf = malloc(framelen); + if(framebuf == NULL) { + return SPDYLAY_ERR_NOMEM; + } + framelen = spdylay_session_pack_data_overwrite(session, framebuf, framelen, + frame); + if(framelen < 0) { + free(framebuf); + } + *buf_ptr = framebuf; + return framelen; +} + +ssize_t spdylay_session_pack_data_overwrite(spdylay_session *session, + uint8_t *buf, size_t len, + spdylay_data *frame) +{ + ssize_t r; + int eof = 0; + uint8_t flags = 0; + r = frame->data_prd.read_callback + (session, buf+8, len-8, &eof, &frame->data_prd.source, session->user_data); + if(r < 0) { + return r; + } else if(len < r) { + return SPDYLAY_ERR_CALLBACK_FAILURE; + } + memset(buf, 0, len); + spdylay_put_uint32be(&buf[0], frame->stream_id); + spdylay_put_uint32be(&buf[4], 8+r); + if(eof) { + flags |= SPDYLAY_FLAG_FIN; + } + buf[4] = flags; + frame->flags = flags; + return r+8; } diff --git a/lib/spdylay_session.h b/lib/spdylay_session.h index 164dc4d..0188a3a 100644 --- a/lib/spdylay_session.h +++ b/lib/spdylay_session.h @@ -147,4 +147,25 @@ int spdylay_session_on_syn_reply_received(spdylay_session *session, spdylay_stream* spdylay_session_get_stream(spdylay_session *session, int32_t stream_id); +/* + * Packs DATA frame |frame| in wire frame format and store it in + * |*buf_ptr|. This function always allocates + * 8+SPDYLAY_DATA_CHUNK_LENGTH bytes. It packs header in first 8 + * bytes. Remaining bytes are filled using frame->data_prd. This + * function returns the size of packed frame if it succeeds, or + * negative error code. + */ +ssize_t spdylay_session_pack_data(spdylay_session *session, + uint8_t **buf_ptr, spdylay_data *frame); + +/* + * Packs DATA frame |frame| in wire frame format and store it in + * |buf|. |len| must be greater than or equal to 8. This function + * returns the sizeof packed frame if it succeeds, or negative error + * code. + */ +ssize_t spdylay_session_pack_data_overwrite(spdylay_session *session, + uint8_t *buf, size_t len, + spdylay_data *frame); + #endif /* SPDYLAY_SESSION_H */ diff --git a/tests/main.c b/tests/main.c index 17bc8ef..ebdf19d 100644 --- a/tests/main.c +++ b/tests/main.c @@ -79,6 +79,7 @@ int main() test_spdylay_session_send_syn_stream) || !CU_add_test(pSuite, "session_send_syn_reply", test_spdylay_session_send_syn_reply) || + !CU_add_test(pSuite, "reply_submit", test_spdylay_reply_submit) || !CU_add_test(pSuite, "frame_unpack_nv", test_spdylay_frame_unpack_nv) || !CU_add_test(pSuite, "frame_count_nv_space", test_spdylay_frame_count_nv_space)) { diff --git a/tests/spdylay_session_test.c b/tests/spdylay_session_test.c index 719c3c2..a74c671 100644 --- a/tests/spdylay_session_test.c +++ b/tests/spdylay_session_test.c @@ -50,6 +50,7 @@ typedef struct { accumulator *acc; scripted_data_feed *df; int valid, invalid; + size_t data_source_length; } my_user_data; static void scripted_data_feed_init(scripted_data_feed *df, @@ -114,6 +115,24 @@ static void on_invalid_ctrl_recv_callback(spdylay_session *session, ++ud->invalid; } +static ssize_t fixed_length_data_source_read_callback +(spdylay_session *session, uint8_t *buf, size_t len, int *eof, + spdylay_data_source *source, void *user_data) +{ + my_user_data *ud = (my_user_data*)user_data; + size_t wlen; + if(len < ud->data_source_length) { + wlen = len; + } else { + wlen = ud->data_source_length; + } + ud->data_source_length -= wlen; + if(ud->data_source_length == 0) { + *eof = 1; + } + return wlen; +} + static char** dup_nv(const char **src) { int i; @@ -368,3 +387,33 @@ void test_spdylay_session_send_syn_reply() spdylay_session_del(session); } + +void test_spdylay_reply_submit() +{ + spdylay_session *session; + spdylay_session_callbacks callbacks = { + null_send_callback, + NULL, + NULL, + NULL + }; + const char *nv[] = { NULL }; + spdylay_stream *stream; + int32_t stream_id = 2; + spdylay_data_source source; + spdylay_data_provider data_prd = { + source, + fixed_length_data_source_read_callback + }; + my_user_data ud; + + ud.data_source_length = 64*1024; + + CU_ASSERT(0 == spdylay_session_client_new(&session, &callbacks, &ud)); + spdylay_session_open_stream(session, stream_id, SPDYLAY_FLAG_NONE, 3, + SPDYLAY_STREAM_OPENING); + CU_ASSERT(0 == spdylay_reply_submit(session, stream_id, nv, &data_prd)); + CU_ASSERT(0 == spdylay_session_send(session)); + spdylay_session_del(session); +} + diff --git a/tests/spdylay_session_test.h b/tests/spdylay_session_test.h index 3dcd0a1..19158ac 100644 --- a/tests/spdylay_session_test.h +++ b/tests/spdylay_session_test.h @@ -32,5 +32,6 @@ void test_spdylay_session_on_syn_stream_received(); void test_spdylay_session_on_syn_reply_received(); void test_spdylay_session_send_syn_stream(); void test_spdylay_session_send_syn_reply(); +void test_spdylay_reply_submit(); #endif // SPDYLAY_SESSION_TEST_H