http2: setup the new pushed stream properly

This commit is contained in:
Daniel Stenberg 2015-06-01 14:20:57 +02:00
parent ea7134ac87
commit feea9263e9
6 changed files with 127 additions and 38 deletions

View File

@ -164,6 +164,7 @@ CURLcode Curl_http_setup_conn(struct connectdata *conn)
conn->data->req.protop = http; conn->data->req.protop = http;
Curl_http2_setup_conn(conn); Curl_http2_setup_conn(conn);
Curl_http2_setup_req(conn->data);
return CURLE_OK; return CURLE_OK;
} }

View File

@ -176,6 +176,7 @@ struct HTTP {
const uint8_t *upload_mem; /* points to a buffer to read from */ const uint8_t *upload_mem; /* points to a buffer to read from */
size_t upload_len; /* size of the buffer 'upload_mem' points to */ size_t upload_len; /* size of the buffer 'upload_mem' points to */
curl_off_t upload_left; /* number of bytes left to upload */ curl_off_t upload_left; /* number of bytes left to upload */
Curl_send_buffer *push_recvbuf; /* store incoming push headers */
#endif #endif
}; };

View File

@ -95,12 +95,9 @@ static CURLcode http2_disconnect(struct connectdata *conn,
} }
/* called from Curl_http_setup_conn */ /* called from Curl_http_setup_conn */
void Curl_http2_setup_conn(struct connectdata *conn) void Curl_http2_setup_req(struct SessionHandle *data)
{ {
struct HTTP *http = conn->data->req.protop; struct HTTP *http = data->req.protop;
conn->proto.httpc.settings.max_concurrent_streams =
DEFAULT_MAX_CONCURRENT_STREAMS;
http->nread_header_recvbuf = 0; http->nread_header_recvbuf = 0;
http->bodystarted = FALSE; http->bodystarted = FALSE;
@ -109,13 +106,18 @@ void Curl_http2_setup_conn(struct connectdata *conn)
http->pauselen = 0; http->pauselen = 0;
http->error_code = NGHTTP2_NO_ERROR; http->error_code = NGHTTP2_NO_ERROR;
http->closed = FALSE; http->closed = FALSE;
http->mem = data->state.buffer;
/* where to store incoming data for this stream and how big the buffer is */
http->mem = conn->data->state.buffer;
http->len = BUFSIZE; http->len = BUFSIZE;
http->memlen = 0; http->memlen = 0;
} }
/* called from Curl_http_setup_conn */
void Curl_http2_setup_conn(struct connectdata *conn)
{
conn->proto.httpc.settings.max_concurrent_streams =
DEFAULT_MAX_CONCURRENT_STREAMS;
}
/* /*
* HTTP2 handler interface. This isn't added to the general list of protocols * HTTP2 handler interface. This isn't added to the general list of protocols
* but will be used at run-time when the protocol is dynamically switched from * but will be used at run-time when the protocol is dynamically switched from
@ -228,46 +230,98 @@ struct curl_headerpair *curl_pushheader_bynum(struct curl_pushheaders *h,
return NULL; return NULL;
} }
static CURL *duphandle(struct SessionHandle *data)
{
struct SessionHandle *second = curl_easy_duphandle(data);
if(second) {
/* setup the request struct */
struct HTTP *http = calloc(1, sizeof(struct HTTP));
if(!http) {
(void)Curl_close(second);
second = NULL;
}
else {
second->req.protop = http;
http->header_recvbuf = Curl_add_buffer_init();
if(!http->header_recvbuf) {
free(http);
(void)Curl_close(second);
second = NULL;
}
else
Curl_http2_setup_req(second);
}
}
return second;
}
static int push_promise(struct SessionHandle *data, static int push_promise(struct SessionHandle *data,
struct connectdata *conn,
const nghttp2_push_promise *frame) const nghttp2_push_promise *frame)
{ {
int rv; int rv;
DEBUGF(infof(data, "PUSH_PROMISE received, stream %u!\n",
frame->promised_stream_id));
if(data->multi->push_cb) { if(data->multi->push_cb) {
struct HTTP *stream;
struct curl_pushheaders heads;
CURLMcode rc;
struct http_conn *httpc;
/* clone the parent */ /* clone the parent */
CURL *newhandle = curl_easy_duphandle(data); CURL *newhandle = duphandle(data);
if(!newhandle) { if(!newhandle) {
infof(data, "failed to duplicate handle\n"); infof(data, "failed to duplicate handle\n");
rv = 1; /* FAIL HARD */ rv = 1; /* FAIL HARD */
goto fail;
} }
else {
struct curl_pushheaders heads;
heads.data = data; heads.data = data;
heads.frame = frame; heads.frame = frame;
/* ask the application */ /* ask the application */
DEBUGF(infof(data, "Got PUSH_PROMISE, ask application!\n")); DEBUGF(infof(data, "Got PUSH_PROMISE, ask application!\n"));
stream = data->req.protop;
#ifdef CURLDEBUG
fprintf(stderr, "PUSHHDR %s\n", stream->push_recvbuf->buffer);
#endif
rv = data->multi->push_cb(data, newhandle, rv = data->multi->push_cb(data, newhandle,
frame->nvlen, &heads, frame->nvlen, &heads,
data->multi->push_userp); data->multi->push_userp);
if(rv) if(rv) {
/* denied, kill off the new handle again */ /* denied, kill off the new handle again */
(void)Curl_close(newhandle); (void)Curl_close(newhandle);
else { goto fail;
/* approved, add to the multi handle */ }
CURLMcode rc = curl_multi_add_handle(data->multi, newhandle);
/* approved, add to the multi handle and immediately switch to PERFORM
state with the given connection !*/
rc = Curl_multi_add_perform(data->multi, newhandle, conn);
if(rc) { if(rc) {
infof(data, "failed to add handle to multi\n"); infof(data, "failed to add handle to multi\n");
Curl_close(newhandle); Curl_close(newhandle);
rv = 1; rv = 1;
goto fail;
}
httpc = &conn->proto.httpc;
/* put the newhandle in the hash with the stream id as key */
if(!Curl_hash_add(&httpc->streamsh,
(size_t *)&frame->promised_stream_id,
sizeof(frame->promised_stream_id), newhandle)) {
failf(conn->data, "Couldn't add stream to hash!");
rv = 1;
} }
else else
rv = 0; rv = 0;
} }
}
}
else { else {
DEBUGF(infof(data, "Got PUSH_PROMISE, ignore it!\n")); DEBUGF(infof(data, "Got PUSH_PROMISE, ignore it!\n"));
rv = 1; rv = 1;
} }
fail:
return rv; return rv;
} }
@ -358,7 +412,7 @@ static int on_frame_recv(nghttp2_session *session, const nghttp2_frame *frame,
Curl_expire(data_s, 1); Curl_expire(data_s, 1);
break; break;
case NGHTTP2_PUSH_PROMISE: case NGHTTP2_PUSH_PROMISE:
rv = push_promise(data_s, &frame->push_promise); rv = push_promise(data_s, conn, &frame->push_promise);
if(rv) { /* deny! */ if(rv) { /* deny! */
rv = nghttp2_submit_rst_stream(session, NGHTTP2_FLAG_NONE, rv = nghttp2_submit_rst_stream(session, NGHTTP2_FLAG_NONE,
frame->push_promise.promised_stream_id, frame->push_promise.promised_stream_id,
@ -591,11 +645,6 @@ static int on_header(nghttp2_session *session, const nghttp2_frame *frame,
(void)frame; (void)frame;
(void)flags; (void)flags;
/* Ignore PUSH_PROMISE for now */
if(frame->hd.type != NGHTTP2_HEADERS) {
return 0;
}
DEBUGASSERT(stream_id); /* should never be a zero stream ID here */ DEBUGASSERT(stream_id); /* should never be a zero stream ID here */
/* get the stream from the hash based on Stream ID */ /* get the stream from the hash based on Stream ID */
@ -615,6 +664,21 @@ static int on_header(nghttp2_session *session, const nghttp2_frame *frame,
consequence is handled in on_frame_recv(). */ consequence is handled in on_frame_recv(). */
return 0; return 0;
/* Store received PUSH_PROMISE headers to be used when the subsequent
PUSH_PROMISE callback comes */
if(frame->hd.type == NGHTTP2_PUSH_PROMISE) {
fprintf(stderr, "*** PUSH_PROMISE headers on stream %u for %u\n",
stream_id,
frame->push_promise.promised_stream_id);
if(!stream->push_recvbuf)
stream->push_recvbuf = Curl_add_buffer_init();
Curl_add_buffer(stream->push_recvbuf, name, namelen);
Curl_add_buffer(stream->push_recvbuf, ":", 1);
Curl_add_buffer(stream->push_recvbuf, value, valuelen);
Curl_add_buffer(stream->push_recvbuf, "\r\n", 2);
return 0;
}
if(namelen == sizeof(":status") - 1 && if(namelen == sizeof(":status") - 1 &&
memcmp(":status", name, namelen) == 0) { memcmp(":status", name, namelen) == 0) {
/* nghttp2 guarantees :status is received first and only once, and /* nghttp2 guarantees :status is received first and only once, and

View File

@ -46,6 +46,7 @@ CURLcode Curl_http2_switched(struct connectdata *conn,
const char *data, size_t nread); const char *data, size_t nread);
/* called from Curl_http_setup_conn */ /* called from Curl_http_setup_conn */
void Curl_http2_setup_conn(struct connectdata *conn); void Curl_http2_setup_conn(struct connectdata *conn);
void Curl_http2_setup_req(struct SessionHandle *data);
#else /* USE_NGHTTP2 */ #else /* USE_NGHTTP2 */
#define Curl_http2_init(x) CURLE_UNSUPPORTED_PROTOCOL #define Curl_http2_init(x) CURLE_UNSUPPORTED_PROTOCOL
#define Curl_http2_send_request(x) CURLE_UNSUPPORTED_PROTOCOL #define Curl_http2_send_request(x) CURLE_UNSUPPORTED_PROTOCOL
@ -53,6 +54,7 @@ void Curl_http2_setup_conn(struct connectdata *conn);
#define Curl_http2_setup(x) CURLE_UNSUPPORTED_PROTOCOL #define Curl_http2_setup(x) CURLE_UNSUPPORTED_PROTOCOL
#define Curl_http2_switched(x,y,z) CURLE_UNSUPPORTED_PROTOCOL #define Curl_http2_switched(x,y,z) CURLE_UNSUPPORTED_PROTOCOL
#define Curl_http2_setup_conn(x) #define Curl_http2_setup_conn(x)
#define Curl_http2_setup_req(x)
#endif #endif
#endif /* HEADER_CURL_HTTP2_H */ #endif /* HEADER_CURL_HTTP2_H */

View File

@ -950,6 +950,21 @@ static bool multi_ischanged(struct Curl_multi *multi, bool clear)
return retval; return retval;
} }
CURLMcode Curl_multi_add_perform(struct Curl_multi *multi,
struct SessionHandle *data,
struct connectdata *conn)
{
CURLMcode rc;
rc = curl_multi_add_handle(multi, data);
if(!rc) {
/* take this handle to the perform state right away */
multistate(data, CURLM_STATE_PERFORM);
data->easy_conn = conn;
}
return rc;
}
static CURLMcode multi_runsingle(struct Curl_multi *multi, static CURLMcode multi_runsingle(struct Curl_multi *multi,
struct timeval now, struct timeval now,
struct SessionHandle *data) struct SessionHandle *data)

View File

@ -88,4 +88,10 @@ void Curl_multi_connchanged(struct Curl_multi *multi);
void Curl_multi_closed(struct connectdata *conn, curl_socket_t s); void Curl_multi_closed(struct connectdata *conn, curl_socket_t s);
/*
* Add a handle and move it into PERFORM state at once. For pushed streams.
*/
CURLMcode Curl_multi_add_perform(struct Curl_multi *multi,
struct SessionHandle *data,
struct connectdata *conn);
#endif /* HEADER_CURL_MULTIIF_H */ #endif /* HEADER_CURL_MULTIIF_H */