mirror of
https://github.com/moparisthebest/curl
synced 2025-01-11 05:58:01 -05:00
mqtt: improve the state machine
To handle PUBLISH before SUBACK and more. Updated the existing tests and added three new ones. Reported-by: Christoph Krey Bug: https://curl.haxx.se/mail/lib-2020-04/0021.html Closes #5246
This commit is contained in:
parent
d1a2816b41
commit
5811beba39
172
lib/mqtt.c
172
lib/mqtt.c
@ -51,8 +51,8 @@
|
|||||||
#define MQTT_MSG_SUBACK 0x90
|
#define MQTT_MSG_SUBACK 0x90
|
||||||
#define MQTT_MSG_DISCONNECT 0xe0
|
#define MQTT_MSG_DISCONNECT 0xe0
|
||||||
|
|
||||||
#define MQTT_CONNACK_LEN 4
|
#define MQTT_CONNACK_LEN 2
|
||||||
#define MQTT_SUBACK_LEN 5
|
#define MQTT_SUBACK_LEN 3
|
||||||
#define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */
|
#define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -194,13 +194,9 @@ static CURLcode mqtt_verify_connack(struct connectdata *conn)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* verify CONNACK */
|
/* verify CONNACK */
|
||||||
if(readbuf[0] != MQTT_MSG_CONNACK ||
|
if(readbuf[0] != 0x00 || readbuf[1] != 0x00) {
|
||||||
readbuf[1] != 0x02 ||
|
failf(data, "Expected %02x%02x but got %02x%02x",
|
||||||
readbuf[2] != 0x00 ||
|
0x00, 0x00, readbuf[0], readbuf[1]);
|
||||||
readbuf[3] != 0x00) {
|
|
||||||
failf(data, "Expected %02x%02x%02x%02x but got %02x%02x%02x%02x",
|
|
||||||
MQTT_MSG_CONNACK, 0x02, 0x00, 0x00,
|
|
||||||
readbuf[0], readbuf[1], readbuf[2], readbuf[3]);
|
|
||||||
result = CURLE_WEIRD_SERVER_REPLY;
|
result = CURLE_WEIRD_SERVER_REPLY;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -285,6 +281,9 @@ fail:
|
|||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Called when the first byte was already read.
|
||||||
|
*/
|
||||||
static CURLcode mqtt_verify_suback(struct connectdata *conn)
|
static CURLcode mqtt_verify_suback(struct connectdata *conn)
|
||||||
{
|
{
|
||||||
CURLcode result;
|
CURLcode result;
|
||||||
@ -307,11 +306,9 @@ static CURLcode mqtt_verify_suback(struct connectdata *conn)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* verify SUBACK */
|
/* verify SUBACK */
|
||||||
if(readbuf[0] != MQTT_MSG_SUBACK ||
|
if(readbuf[0] != ((mqtt->packetid >> 8) & 0xff) ||
|
||||||
readbuf[1] != 0x03 ||
|
readbuf[1] != (mqtt->packetid & 0xff) ||
|
||||||
readbuf[2] != ((mqtt->packetid >> 8) & 0xff) ||
|
readbuf[2] != 0x00)
|
||||||
readbuf[3] != (mqtt->packetid & 0xff) ||
|
|
||||||
readbuf[4] != 0x00)
|
|
||||||
result = CURLE_WEIRD_SERVER_REPLY;
|
result = CURLE_WEIRD_SERVER_REPLY;
|
||||||
|
|
||||||
fail:
|
fail:
|
||||||
@ -377,67 +374,97 @@ static size_t mqtt_decode_len(unsigned char *buf,
|
|||||||
mult *= 128;
|
mult *= 128;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if(lenbytes)
|
||||||
*lenbytes = i;
|
*lenbytes = i;
|
||||||
|
|
||||||
return len;
|
return len;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#ifdef CURLDEBUG
|
||||||
|
static const char *statenames[]={
|
||||||
|
"MQTT_FIRST",
|
||||||
|
"MQTT_REMAINING_LENGTH",
|
||||||
|
"MQTT_CONNACK",
|
||||||
|
"MQTT_SUBACK",
|
||||||
|
"MQTT_SUBACK_COMING",
|
||||||
|
"MQTT_PUBWAIT",
|
||||||
|
"MQTT_PUB_REMAIN"
|
||||||
|
};
|
||||||
|
#endif
|
||||||
|
|
||||||
|
/* The only way to change state */
|
||||||
|
static void mqstate(struct connectdata *conn,
|
||||||
|
enum mqttstate state,
|
||||||
|
enum mqttstate nextstate) /* used if state == FIRST */
|
||||||
|
{
|
||||||
|
struct mqtt_conn *mqtt = &conn->proto.mqtt;
|
||||||
|
#ifdef CURLDEBUG
|
||||||
|
infof(conn->data, "%s (from %s) (next is %s)\n",
|
||||||
|
statenames[state],
|
||||||
|
statenames[mqtt->state],
|
||||||
|
(state == MQTT_FIRST)? statenames[nextstate] : "");
|
||||||
|
#endif
|
||||||
|
mqtt->state = state;
|
||||||
|
if(state == MQTT_FIRST)
|
||||||
|
mqtt->nextstate = nextstate;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/* for the publish packet */
|
/* for the publish packet */
|
||||||
#define MQTT_HEADER_LEN 5 /* max 5 bytes */
|
#define MQTT_HEADER_LEN 5 /* max 5 bytes */
|
||||||
|
|
||||||
static CURLcode mqtt_read_publish(struct connectdata *conn,
|
static CURLcode mqtt_read_publish(struct connectdata *conn,
|
||||||
bool *done)
|
bool *done)
|
||||||
{
|
{
|
||||||
CURLcode result;
|
CURLcode result = CURLE_OK;
|
||||||
curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
|
curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
|
||||||
ssize_t nread;
|
ssize_t nread;
|
||||||
struct Curl_easy *data = conn->data;
|
struct Curl_easy *data = conn->data;
|
||||||
unsigned char *pkt = (unsigned char *)data->state.buffer;
|
unsigned char *pkt = (unsigned char *)data->state.buffer;
|
||||||
size_t remlen, lenbytes;
|
size_t remlen;
|
||||||
struct mqtt_conn *mqtt = &conn->proto.mqtt;
|
struct mqtt_conn *mqtt = &conn->proto.mqtt;
|
||||||
struct MQTT *mq = data->req.protop;
|
struct MQTT *mq = data->req.protop;
|
||||||
|
unsigned char packet;
|
||||||
|
|
||||||
switch(mqtt->state) {
|
switch(mqtt->state) {
|
||||||
case MQTT_SUBWAIT:
|
MQTT_SUBACK_COMING:
|
||||||
/* Read the initial byte and the entire Remaining Length field
|
case MQTT_SUBACK_COMING:
|
||||||
in this state */
|
result = mqtt_verify_suback(conn);
|
||||||
result = Curl_read(conn, sockfd, (char *)&pkt[mq->npacket], 1, &nread);
|
|
||||||
if(result)
|
if(result)
|
||||||
goto end;
|
break;
|
||||||
if(data->set.verbose)
|
|
||||||
Curl_debug(data, CURLINFO_HEADER_IN, (char *)&pkt[mq->npacket], 1);
|
mqstate(conn, MQTT_FIRST, MQTT_PUBWAIT);
|
||||||
/* we are expecting a PUBLISH message */
|
break;
|
||||||
if(!mq->npacket && ((pkt[0] & 0xf0) != MQTT_MSG_PUBLISH)) {
|
|
||||||
if(pkt[0] == MQTT_MSG_DISCONNECT) {
|
case MQTT_SUBACK:
|
||||||
|
case MQTT_PUBWAIT:
|
||||||
|
/* we are expecting PUBLISH or SUBACK */
|
||||||
|
packet = mq->firstbyte & 0xf0;
|
||||||
|
if(packet == MQTT_MSG_PUBLISH)
|
||||||
|
mqstate(conn, MQTT_PUB_REMAIN, MQTT_NOSTATE);
|
||||||
|
else if(packet == MQTT_MSG_SUBACK) {
|
||||||
|
mqstate(conn, MQTT_SUBACK_COMING, MQTT_NOSTATE);
|
||||||
|
goto MQTT_SUBACK_COMING;
|
||||||
|
}
|
||||||
|
else if(packet == MQTT_MSG_DISCONNECT) {
|
||||||
infof(data, "Got DISCONNECT\n");
|
infof(data, "Got DISCONNECT\n");
|
||||||
*done = TRUE;
|
*done = TRUE;
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
else {
|
||||||
result = CURLE_WEIRD_SERVER_REPLY;
|
result = CURLE_WEIRD_SERVER_REPLY;
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
else if((mq->npacket >= 1) && !(pkt[mq->npacket] & 0x80))
|
|
||||||
/* as long as the high bit is set in the length byte, we read one more
|
|
||||||
byte, then get the remainder of the PUBLISH */
|
|
||||||
mqtt->state = MQTT_SUB_REMAIN;
|
|
||||||
mq->npacket++;
|
|
||||||
if(mqtt->state == MQTT_SUBWAIT)
|
|
||||||
return result;
|
|
||||||
|
|
||||||
/* -- switched state -- */
|
/* -- switched state -- */
|
||||||
|
remlen = mq->remaining_length;
|
||||||
/* remember the first byte */
|
|
||||||
mq->firstbyte = pkt[0];
|
|
||||||
|
|
||||||
remlen = mqtt_decode_len(&pkt[1], 4, &lenbytes);
|
|
||||||
|
|
||||||
infof(data, "Remaining length: %zd bytes\n", remlen);
|
infof(data, "Remaining length: %zd bytes\n", remlen);
|
||||||
Curl_pgrsSetDownloadSize(data, remlen);
|
Curl_pgrsSetDownloadSize(data, remlen);
|
||||||
data->req.bytecount = 0;
|
data->req.bytecount = 0;
|
||||||
data->req.size = remlen;
|
data->req.size = remlen;
|
||||||
mq->npacket = remlen; /* get this many bytes */
|
mq->npacket = remlen; /* get this many bytes */
|
||||||
/* FALLTHROUGH */
|
/* FALLTHROUGH */
|
||||||
case MQTT_SUB_REMAIN: {
|
case MQTT_PUB_REMAIN: {
|
||||||
/* read rest of packet, but no more. Cap to buffer size */
|
/* read rest of packet, but no more. Cap to buffer size */
|
||||||
struct SingleRequest *k = &data->req;
|
struct SingleRequest *k = &data->req;
|
||||||
size_t rest = mq->npacket;
|
size_t rest = mq->npacket;
|
||||||
@ -450,6 +477,11 @@ static CURLcode mqtt_read_publish(struct connectdata *conn,
|
|||||||
}
|
}
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
if(!nread) {
|
||||||
|
infof(data, "server disconnected\n");
|
||||||
|
result = CURLE_PARTIAL_FILE;
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
if(data->set.verbose)
|
if(data->set.verbose)
|
||||||
Curl_debug(data, CURLINFO_DATA_IN, (char *)pkt, (size_t)nread);
|
Curl_debug(data, CURLINFO_DATA_IN, (char *)pkt, (size_t)nread);
|
||||||
|
|
||||||
@ -465,7 +497,7 @@ static CURLcode mqtt_read_publish(struct connectdata *conn,
|
|||||||
|
|
||||||
if(!mq->npacket)
|
if(!mq->npacket)
|
||||||
/* no more PUBLISH payload, back to subscribe wait state */
|
/* no more PUBLISH payload, back to subscribe wait state */
|
||||||
mqtt->state = MQTT_SUBWAIT;
|
mqstate(conn, MQTT_FIRST, MQTT_PUBWAIT);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
@ -481,7 +513,6 @@ static CURLcode mqtt_do(struct connectdata *conn, bool *done)
|
|||||||
{
|
{
|
||||||
CURLcode result = CURLE_OK;
|
CURLcode result = CURLE_OK;
|
||||||
struct Curl_easy *data = conn->data;
|
struct Curl_easy *data = conn->data;
|
||||||
struct mqtt_conn *mqtt = &conn->proto.mqtt;
|
|
||||||
|
|
||||||
*done = FALSE; /* unconditionally */
|
*done = FALSE; /* unconditionally */
|
||||||
|
|
||||||
@ -490,7 +521,7 @@ static CURLcode mqtt_do(struct connectdata *conn, bool *done)
|
|||||||
failf(data, "Error %d sending MQTT CONN request", result);
|
failf(data, "Error %d sending MQTT CONN request", result);
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
mqtt->state = MQTT_CONNACK;
|
mqstate(conn, MQTT_FIRST, MQTT_CONNACK);
|
||||||
return CURLE_OK;
|
return CURLE_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -500,6 +531,10 @@ static CURLcode mqtt_doing(struct connectdata *conn, bool *done)
|
|||||||
struct mqtt_conn *mqtt = &conn->proto.mqtt;
|
struct mqtt_conn *mqtt = &conn->proto.mqtt;
|
||||||
struct Curl_easy *data = conn->data;
|
struct Curl_easy *data = conn->data;
|
||||||
struct MQTT *mq = data->req.protop;
|
struct MQTT *mq = data->req.protop;
|
||||||
|
ssize_t nread;
|
||||||
|
curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
|
||||||
|
unsigned char *pkt = (unsigned char *)data->state.buffer;
|
||||||
|
unsigned char byte;
|
||||||
|
|
||||||
*done = FALSE;
|
*done = FALSE;
|
||||||
|
|
||||||
@ -512,7 +547,41 @@ static CURLcode mqtt_doing(struct connectdata *conn, bool *done)
|
|||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
infof(data, "mqtt_doing: state [%d]\n", (int) mqtt->state);
|
||||||
switch(mqtt->state) {
|
switch(mqtt->state) {
|
||||||
|
case MQTT_FIRST:
|
||||||
|
/* Read the initial byte only */
|
||||||
|
result = Curl_read(conn, sockfd, (char *)&mq->firstbyte, 1, &nread);
|
||||||
|
if(result)
|
||||||
|
break;
|
||||||
|
if(data->set.verbose)
|
||||||
|
Curl_debug(data, CURLINFO_HEADER_IN, (char *)&mq->firstbyte, 1);
|
||||||
|
/* remember the first byte */
|
||||||
|
mq->npacket = 0;
|
||||||
|
mqstate(conn, MQTT_REMAINING_LENGTH, MQTT_NOSTATE);
|
||||||
|
/* FALLTHROUGH */
|
||||||
|
case MQTT_REMAINING_LENGTH:
|
||||||
|
do {
|
||||||
|
result = Curl_read(conn, sockfd, (char *)&byte, 1, &nread);
|
||||||
|
if(result)
|
||||||
|
break;
|
||||||
|
if(data->set.verbose)
|
||||||
|
Curl_debug(data, CURLINFO_HEADER_IN, (char *)&byte, 1);
|
||||||
|
pkt[mq->npacket++] = byte;
|
||||||
|
} while((byte & 0x80) && (mq->npacket < 4));
|
||||||
|
mq->remaining_length = mqtt_decode_len(&pkt[0], mq->npacket, NULL);
|
||||||
|
mq->npacket = 0;
|
||||||
|
if(mq->remaining_length) {
|
||||||
|
mqstate(conn, mqtt->nextstate, MQTT_NOSTATE);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
mqstate(conn, MQTT_FIRST, MQTT_FIRST);
|
||||||
|
|
||||||
|
if(mq->firstbyte == MQTT_MSG_DISCONNECT) {
|
||||||
|
infof(data, "Got DISCONNECT\n");
|
||||||
|
*done = TRUE;
|
||||||
|
}
|
||||||
|
break;
|
||||||
case MQTT_CONNACK:
|
case MQTT_CONNACK:
|
||||||
result = mqtt_verify_connack(conn);
|
result = mqtt_verify_connack(conn);
|
||||||
if(result)
|
if(result)
|
||||||
@ -524,24 +593,19 @@ static CURLcode mqtt_doing(struct connectdata *conn, bool *done)
|
|||||||
result = mqtt_disconnect(conn);
|
result = mqtt_disconnect(conn);
|
||||||
*done = TRUE;
|
*done = TRUE;
|
||||||
}
|
}
|
||||||
|
mqtt->nextstate = MQTT_FIRST;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
result = mqtt_subscribe(conn);
|
result = mqtt_subscribe(conn);
|
||||||
if(!result)
|
if(!result) {
|
||||||
mqtt->state = MQTT_SUBACK;
|
mqstate(conn, MQTT_FIRST, MQTT_SUBACK);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case MQTT_SUBACK:
|
case MQTT_SUBACK:
|
||||||
result = mqtt_verify_suback(conn);
|
case MQTT_PUBWAIT:
|
||||||
if(result)
|
case MQTT_PUB_REMAIN:
|
||||||
break;
|
|
||||||
|
|
||||||
mqtt->state = MQTT_SUBWAIT;
|
|
||||||
break;
|
|
||||||
|
|
||||||
case MQTT_SUBWAIT:
|
|
||||||
case MQTT_SUB_REMAIN:
|
|
||||||
result = mqtt_read_publish(conn, done);
|
result = mqtt_read_publish(conn, done);
|
||||||
if(result)
|
if(result)
|
||||||
break;
|
break;
|
||||||
|
24
lib/mqtt.h
24
lib/mqtt.h
@ -26,13 +26,22 @@
|
|||||||
extern const struct Curl_handler Curl_handler_mqtt;
|
extern const struct Curl_handler Curl_handler_mqtt;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
enum mqttstate {
|
||||||
|
MQTT_FIRST, /* 0 */
|
||||||
|
MQTT_REMAINING_LENGTH, /* 1 */
|
||||||
|
MQTT_CONNACK, /* 2 */
|
||||||
|
MQTT_SUBACK, /* 3 */
|
||||||
|
MQTT_SUBACK_COMING, /* 4 - the SUBACK remainder */
|
||||||
|
MQTT_PUBWAIT, /* 5 - wait for publish */
|
||||||
|
MQTT_PUB_REMAIN, /* 6 - wait for the remainder of the publish */
|
||||||
|
|
||||||
|
MQTT_NOSTATE = 99 /* never an actual state */
|
||||||
|
};
|
||||||
|
|
||||||
struct mqtt_conn {
|
struct mqtt_conn {
|
||||||
enum {
|
enum mqttstate state;
|
||||||
MQTT_CONNACK,
|
enum mqttstate nextstate; /* switch to this after remaining length is
|
||||||
MQTT_SUBACK,
|
done */
|
||||||
MQTT_SUBWAIT, /* wait for subscribe response */
|
|
||||||
MQTT_SUB_REMAIN /* wait for the remainder of the subscribe response */
|
|
||||||
} state;
|
|
||||||
unsigned int packetid;
|
unsigned int packetid;
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -41,9 +50,10 @@ struct MQTT {
|
|||||||
char *sendleftovers;
|
char *sendleftovers;
|
||||||
size_t nsend; /* size of sendleftovers */
|
size_t nsend; /* size of sendleftovers */
|
||||||
|
|
||||||
/* when receving a PUBLISH */
|
/* when receving */
|
||||||
size_t npacket; /* byte counter */
|
size_t npacket; /* byte counter */
|
||||||
unsigned char firstbyte;
|
unsigned char firstbyte;
|
||||||
|
size_t remaining_length;
|
||||||
};
|
};
|
||||||
|
|
||||||
#endif /* HEADER_CURL_MQTT_H */
|
#endif /* HEADER_CURL_MQTT_H */
|
||||||
|
@ -139,7 +139,7 @@ test1160 test1161 test1162 test1163 test1164 test1165 test1166 test1167 \
|
|||||||
\
|
\
|
||||||
test1170 test1171 test1172 test1173 test1174 test1175 test1176 test1177 \
|
test1170 test1171 test1172 test1173 test1174 test1175 test1176 test1177 \
|
||||||
\
|
\
|
||||||
test1190 test1191 test1192 test1193 \
|
test1190 test1191 test1192 test1193 test1194 test1195 test1196 \
|
||||||
\
|
\
|
||||||
test1200 test1201 test1202 test1203 test1204 test1205 test1206 test1207 \
|
test1200 test1201 test1202 test1203 test1204 test1205 test1206 test1207 \
|
||||||
test1208 test1209 test1210 test1211 test1212 test1213 test1214 test1215 \
|
test1208 test1209 test1210 test1211 test1212 test1213 test1214 test1215 \
|
||||||
|
@ -46,7 +46,7 @@ s/^(.* 00044d5154540402003c000c6375726c).*/$1/
|
|||||||
</strippart>
|
</strippart>
|
||||||
<protocol>
|
<protocol>
|
||||||
client CONNECT 18 00044d5154540402003c000c6375726c
|
client CONNECT 18 00044d5154540402003c000c6375726c
|
||||||
server CONACK 2 20020000
|
server CONNACK 2 20020000
|
||||||
client SUBSCRIBE 9 000100043131393000
|
client SUBSCRIBE 9 000100043131393000
|
||||||
server SUBACK 3 9003000100
|
server SUBACK 3 9003000100
|
||||||
server PUBLISH c 300c00043131393068656c6c6f0a
|
server PUBLISH c 300c00043131393068656c6c6f0a
|
||||||
|
@ -42,7 +42,7 @@ s/^(.* 00044d5154540402003c000c6375726c).*/$1/
|
|||||||
</strippart>
|
</strippart>
|
||||||
<protocol>
|
<protocol>
|
||||||
client CONNECT 18 00044d5154540402003c000c6375726c
|
client CONNECT 18 00044d5154540402003c000c6375726c
|
||||||
server CONACK 2 20020000
|
server CONNACK 2 20020000
|
||||||
client PUBLISH f 000431313931736f6d657468696e67
|
client PUBLISH f 000431313931736f6d657468696e67
|
||||||
client DISCONNECT 0 e000
|
client DISCONNECT 0 e000
|
||||||
</protocol>
|
</protocol>
|
||||||
|
@ -46,7 +46,7 @@ s/^(.* 00044d5154540402003c000c6375726c).*/$1/
|
|||||||
</strippart>
|
</strippart>
|
||||||
<protocol>
|
<protocol>
|
||||||
client CONNECT 18 00044d5154540402003c000c6375726c
|
client CONNECT 18 00044d5154540402003c000c6375726c
|
||||||
server CONACK 2 20020000
|
server CONNACK 2 20020000
|
||||||
client SUBSCRIBE 80af3131393200
|
client SUBSCRIBE 80af3131393200
|
||||||
server SUBACK 3 9003000100
|
server SUBACK 3 9003000100
|
||||||
server PUBLISH 80d 308df3131393268656c6c6f0a
|
server PUBLISH 80d 308df3131393268656c6c6f0a
|
||||||
|
@ -64,7 +64,7 @@ s/^(.* 00044d5154540402003c000c6375726c).*/$1/
|
|||||||
</strippart>
|
</strippart>
|
||||||
<protocol>
|
<protocol>
|
||||||
client CONNECT 18 00044d5154540402003c000c6375726c
|
client CONNECT 18 00044d5154540402003c000c6375726c
|
||||||
server CONACK 2 20020000
|
server CONNACK 2 20020000
|
||||||
client PUBLISH 7c
|
client PUBLISH 7c
|
||||||
client DISCONNECT 0 e000
|
client DISCONNECT 0 e000
|
||||||
</protocol>
|
</protocol>
|
||||||
|
59
tests/data/test1194
Normal file
59
tests/data/test1194
Normal file
@ -0,0 +1,59 @@
|
|||||||
|
<testcase>
|
||||||
|
<info>
|
||||||
|
<keywords>
|
||||||
|
MQTT
|
||||||
|
MQTT SUBSCRIBE
|
||||||
|
</keywords>
|
||||||
|
</info>
|
||||||
|
|
||||||
|
#
|
||||||
|
# Server-side
|
||||||
|
<reply>
|
||||||
|
<data nocheck="yes">
|
||||||
|
hello
|
||||||
|
</data>
|
||||||
|
<datacheck hex="yes">
|
||||||
|
00 04 31 31 39 30 68 65 6c 6c 6f 5b 4c 46 5d 0a
|
||||||
|
</datacheck>
|
||||||
|
<servercmd>
|
||||||
|
PUBLISH-before-SUBACK TRUE
|
||||||
|
</servercmd>
|
||||||
|
</reply>
|
||||||
|
|
||||||
|
#
|
||||||
|
# Client-side
|
||||||
|
<client>
|
||||||
|
<features>
|
||||||
|
mqtt
|
||||||
|
</features>
|
||||||
|
<server>
|
||||||
|
mqtt
|
||||||
|
</server>
|
||||||
|
<name>
|
||||||
|
MQTT SUBSCRIBE with PUBLISH befoire SUBACK
|
||||||
|
</name>
|
||||||
|
<command option="binary-trace">
|
||||||
|
mqtt://%HOSTIP:%MQTTPORT/1194
|
||||||
|
</command>
|
||||||
|
</client>
|
||||||
|
|
||||||
|
#
|
||||||
|
# Verify data after the test has been "shot"
|
||||||
|
<verify>
|
||||||
|
# These are hexadecimal protocol dumps from the client
|
||||||
|
#
|
||||||
|
# Strip out the random part of the client id from the CONNECT message
|
||||||
|
# before comparison
|
||||||
|
<strippart>
|
||||||
|
s/^(.* 00044d5154540402003c000c6375726c).*/$1/
|
||||||
|
</strippart>
|
||||||
|
<protocol>
|
||||||
|
client CONNECT 18 00044d5154540402003c000c6375726c
|
||||||
|
server CONNACK 2 20020000
|
||||||
|
client SUBSCRIBE 9 000100043131393400
|
||||||
|
server PUBLISH c 300c00043131393468656c6c6f0a
|
||||||
|
server SUBACK 3 9003000100
|
||||||
|
server DISCONNECT 0 e000
|
||||||
|
</protocol>
|
||||||
|
</verify>
|
||||||
|
</testcase>
|
63
tests/data/test1195
Normal file
63
tests/data/test1195
Normal file
@ -0,0 +1,63 @@
|
|||||||
|
<testcase>
|
||||||
|
<info>
|
||||||
|
<keywords>
|
||||||
|
MQTT
|
||||||
|
MQTT SUBSCRIBE
|
||||||
|
</keywords>
|
||||||
|
</info>
|
||||||
|
|
||||||
|
#
|
||||||
|
# Server-side
|
||||||
|
<reply>
|
||||||
|
<data nocheck="yes">
|
||||||
|
hello
|
||||||
|
</data>
|
||||||
|
<datacheck hex="yes">
|
||||||
|
00 04 31 31 39 30 68 65 6c 6c 6f 5b 4c 46 5d 0a
|
||||||
|
</datacheck>
|
||||||
|
<servercmd>
|
||||||
|
PUBLISH-before-SUBACK TRUE
|
||||||
|
short-PUBLISH TRUE
|
||||||
|
</servercmd>
|
||||||
|
</reply>
|
||||||
|
|
||||||
|
#
|
||||||
|
# Client-side
|
||||||
|
<client>
|
||||||
|
<features>
|
||||||
|
mqtt
|
||||||
|
</features>
|
||||||
|
<server>
|
||||||
|
mqtt
|
||||||
|
</server>
|
||||||
|
<name>
|
||||||
|
MQTT SUBSCRIBE with short PUBLISH
|
||||||
|
</name>
|
||||||
|
<command option="binary-trace">
|
||||||
|
mqtt://%HOSTIP:%MQTTPORT/1195
|
||||||
|
</command>
|
||||||
|
</client>
|
||||||
|
|
||||||
|
#
|
||||||
|
# Verify data after the test has been "shot"
|
||||||
|
<verify>
|
||||||
|
# These are hexadecimal protocol dumps from the client
|
||||||
|
#
|
||||||
|
# Strip out the random part of the client id from the CONNECT message
|
||||||
|
# before comparison
|
||||||
|
<strippart>
|
||||||
|
s/^(.* 00044d5154540402003c000c6375726c).*/$1/
|
||||||
|
</strippart>
|
||||||
|
<protocol>
|
||||||
|
client CONNECT 18 00044d5154540402003c000c6375726c
|
||||||
|
server CONNACK 2 20020000
|
||||||
|
client SUBSCRIBE 9 000100043131393500
|
||||||
|
server PUBLISH c 300c00043131393568656c6c
|
||||||
|
</protocol>
|
||||||
|
|
||||||
|
# 18 is CURLE_PARTIAL_FILE
|
||||||
|
<errorcode>
|
||||||
|
18
|
||||||
|
</errorcode>
|
||||||
|
</verify>
|
||||||
|
</testcase>
|
62
tests/data/test1196
Normal file
62
tests/data/test1196
Normal file
@ -0,0 +1,62 @@
|
|||||||
|
<testcase>
|
||||||
|
<info>
|
||||||
|
<keywords>
|
||||||
|
MQTT
|
||||||
|
MQTT SUBSCRIBE
|
||||||
|
</keywords>
|
||||||
|
</info>
|
||||||
|
|
||||||
|
#
|
||||||
|
# Server-side
|
||||||
|
<reply>
|
||||||
|
<data nocheck="yes">
|
||||||
|
hello
|
||||||
|
</data>
|
||||||
|
<datacheck hex="yes">
|
||||||
|
00 04 31 31 39 30 68 65 6c 6c 6f 5b 4c 46 5d 0a
|
||||||
|
</datacheck>
|
||||||
|
|
||||||
|
# error 1 - "Connection Refused, unacceptable protocol version"
|
||||||
|
<servercmd>
|
||||||
|
error-CONNACK 1
|
||||||
|
</servercmd>
|
||||||
|
</reply>
|
||||||
|
|
||||||
|
#
|
||||||
|
# Client-side
|
||||||
|
<client>
|
||||||
|
<features>
|
||||||
|
mqtt
|
||||||
|
</features>
|
||||||
|
<server>
|
||||||
|
mqtt
|
||||||
|
</server>
|
||||||
|
<name>
|
||||||
|
MQTT with error in CONNACK
|
||||||
|
</name>
|
||||||
|
<command option="binary-trace">
|
||||||
|
mqtt://%HOSTIP:%MQTTPORT/1196
|
||||||
|
</command>
|
||||||
|
</client>
|
||||||
|
|
||||||
|
#
|
||||||
|
# Verify data after the test has been "shot"
|
||||||
|
<verify>
|
||||||
|
# These are hexadecimal protocol dumps from the client
|
||||||
|
#
|
||||||
|
# Strip out the random part of the client id from the CONNECT message
|
||||||
|
# before comparison
|
||||||
|
<strippart>
|
||||||
|
s/^(.* 00044d5154540402003c000c6375726c).*/$1/
|
||||||
|
</strippart>
|
||||||
|
<protocol>
|
||||||
|
client CONNECT 18 00044d5154540402003c000c6375726c
|
||||||
|
server CONNACK 2 20020001
|
||||||
|
</protocol>
|
||||||
|
|
||||||
|
# 8 is CURLE_WEIRD_SERVER_REPLY
|
||||||
|
<errorcode>
|
||||||
|
8
|
||||||
|
</errorcode>
|
||||||
|
</verify>
|
||||||
|
</testcase>
|
@ -104,6 +104,10 @@
|
|||||||
struct configurable {
|
struct configurable {
|
||||||
unsigned char version; /* initial version byte in the request must match
|
unsigned char version; /* initial version byte in the request must match
|
||||||
this */
|
this */
|
||||||
|
bool publish_before_suback;
|
||||||
|
bool short_publish;
|
||||||
|
unsigned char error_connack;
|
||||||
|
int testnum;
|
||||||
};
|
};
|
||||||
|
|
||||||
#define REQUEST_DUMP "log/server.input"
|
#define REQUEST_DUMP "log/server.input"
|
||||||
@ -124,6 +128,10 @@ static void resetdefaults(void)
|
|||||||
{
|
{
|
||||||
logmsg("Reset to defaults");
|
logmsg("Reset to defaults");
|
||||||
config.version = CONFIG_VERSION;
|
config.version = CONFIG_VERSION;
|
||||||
|
config.publish_before_suback = FALSE;
|
||||||
|
config.short_publish = FALSE;
|
||||||
|
config.error_connack = 0;
|
||||||
|
config.testnum = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static unsigned char byteval(char *value)
|
static unsigned char byteval(char *value)
|
||||||
@ -147,10 +155,29 @@ static void getconfig(void)
|
|||||||
config.version = byteval(value);
|
config.version = byteval(value);
|
||||||
logmsg("version [%d] set", config.version);
|
logmsg("version [%d] set", config.version);
|
||||||
}
|
}
|
||||||
|
else if(!strcmp(key, "PUBLISH-before-SUBACK")) {
|
||||||
|
logmsg("PUBLISH-before-SUBACK set");
|
||||||
|
config.publish_before_suback = TRUE;
|
||||||
|
}
|
||||||
|
else if(!strcmp(key, "short-PUBLISH")) {
|
||||||
|
logmsg("short-PUBLISH set");
|
||||||
|
config.short_publish = TRUE;
|
||||||
|
}
|
||||||
|
else if(!strcmp(key, "error-CONNACK")) {
|
||||||
|
config.error_connack = byteval(value);
|
||||||
|
logmsg("error-CONNACK = %d", config.error_connack);
|
||||||
|
}
|
||||||
|
else if(!strcmp(key, "Testnum")) {
|
||||||
|
config.testnum = atoi(value);
|
||||||
|
logmsg("testnum = %d", config.testnum);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
fclose(fp);
|
fclose(fp);
|
||||||
}
|
}
|
||||||
|
else {
|
||||||
|
logmsg("No config file '%s' to read", configfile);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void loghex(unsigned char *buffer, ssize_t len)
|
static void loghex(unsigned char *buffer, ssize_t len)
|
||||||
@ -209,11 +236,17 @@ static int connack(FILE *dump, curl_socket_t fd)
|
|||||||
MQTT_MSG_CONNACK, 0x02,
|
MQTT_MSG_CONNACK, 0x02,
|
||||||
0x00, 0x00
|
0x00, 0x00
|
||||||
};
|
};
|
||||||
ssize_t rc = swrite(fd, (char *)packet, sizeof(packet));
|
ssize_t rc;
|
||||||
if(rc == sizeof(packet)) {
|
|
||||||
logmsg("WROTE %d bytes [CONACK]", rc);
|
packet[3] = config.error_connack;
|
||||||
|
|
||||||
|
rc = swrite(fd, (char *)packet, sizeof(packet));
|
||||||
|
if(rc > 0) {
|
||||||
|
logmsg("WROTE %d bytes [CONNACK]", rc);
|
||||||
loghex(packet, rc);
|
loghex(packet, rc);
|
||||||
logprotocol(FROM_SERVER, "CONACK", 2, dump, packet, sizeof(packet));
|
logprotocol(FROM_SERVER, "CONNACK", 2, dump, packet, sizeof(packet));
|
||||||
|
}
|
||||||
|
if(rc == sizeof(packet)) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
return 1;
|
return 1;
|
||||||
@ -360,6 +393,7 @@ static int publish(FILE *dump,
|
|||||||
size_t payloadindex;
|
size_t payloadindex;
|
||||||
ssize_t remaininglength = topiclen + 2 + payloadlen;
|
ssize_t remaininglength = topiclen + 2 + payloadlen;
|
||||||
ssize_t packetlen;
|
ssize_t packetlen;
|
||||||
|
ssize_t sendamount;
|
||||||
ssize_t rc;
|
ssize_t rc;
|
||||||
char rembuffer[4];
|
char rembuffer[4];
|
||||||
int encodedlen;
|
int encodedlen;
|
||||||
@ -385,13 +419,18 @@ static int publish(FILE *dump,
|
|||||||
payloadindex = 3 + topiclen + encodedlen;
|
payloadindex = 3 + topiclen + encodedlen;
|
||||||
memcpy(&packet[payloadindex], payload, payloadlen);
|
memcpy(&packet[payloadindex], payload, payloadlen);
|
||||||
|
|
||||||
rc = swrite(fd, (char *)packet, packetlen);
|
sendamount = packetlen;
|
||||||
if(rc == packetlen) {
|
if(config.short_publish)
|
||||||
|
sendamount -= 2;
|
||||||
|
|
||||||
|
rc = swrite(fd, (char *)packet, sendamount);
|
||||||
|
if(rc > 0) {
|
||||||
logmsg("WROTE %d bytes [PUBLISH]", rc);
|
logmsg("WROTE %d bytes [PUBLISH]", rc);
|
||||||
loghex(packet, rc);
|
loghex(packet, rc);
|
||||||
logprotocol(FROM_SERVER, "PUBLISH", remaininglength, dump, packet, rc);
|
logprotocol(FROM_SERVER, "PUBLISH", remaininglength, dump, packet, rc);
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
if(rc == packetlen)
|
||||||
|
return 0;
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -459,6 +498,11 @@ static curl_socket_t mqttit(curl_socket_t fd)
|
|||||||
|
|
||||||
getconfig();
|
getconfig();
|
||||||
|
|
||||||
|
testno = config.testnum;
|
||||||
|
|
||||||
|
if(testno)
|
||||||
|
logmsg("Found test number %ld", testno);
|
||||||
|
|
||||||
do {
|
do {
|
||||||
/* get the fixed header */
|
/* get the fixed header */
|
||||||
rc = fixedheader(fd, &byte, &remaining_length, &bytes);
|
rc = fixedheader(fd, &byte, &remaining_length, &bytes);
|
||||||
@ -506,8 +550,10 @@ static curl_socket_t mqttit(curl_socket_t fd)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if(byte == MQTT_MSG_SUBSCRIBE) {
|
else if(byte == MQTT_MSG_SUBSCRIBE) {
|
||||||
char *testnop;
|
FILE *stream;
|
||||||
|
int error;
|
||||||
|
char *data;
|
||||||
|
size_t datalen;
|
||||||
logprotocol(FROM_CLIENT, "SUBSCRIBE", remaining_length,
|
logprotocol(FROM_CLIENT, "SUBSCRIBE", remaining_length,
|
||||||
dump, buffer, rc);
|
dump, buffer, rc);
|
||||||
logmsg("Incoming SUBSCRIBE");
|
logmsg("Incoming SUBSCRIBE");
|
||||||
@ -533,26 +579,25 @@ static curl_socket_t mqttit(curl_socket_t fd)
|
|||||||
/* there's a QoS byte (two bits) after the topic */
|
/* there's a QoS byte (two bits) after the topic */
|
||||||
|
|
||||||
logmsg("SUBSCRIBE to '%s' [%d]", topic, packet_id);
|
logmsg("SUBSCRIBE to '%s' [%d]", topic, packet_id);
|
||||||
|
stream = test2fopen(testno);
|
||||||
|
error = getpart(&data, &datalen, "reply", "data", stream);
|
||||||
|
if(!error) {
|
||||||
|
if(!config.publish_before_suback) {
|
||||||
if(suback(dump, fd, packet_id)) {
|
if(suback(dump, fd, packet_id)) {
|
||||||
logmsg("failed sending SUBACK");
|
logmsg("failed sending SUBACK");
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
testnop = strrchr(topic, '/');
|
}
|
||||||
if(!testnop)
|
if(publish(dump, fd, packet_id, topic, data, datalen)) {
|
||||||
testnop = topic;
|
logmsg("PUBLISH failed");
|
||||||
else
|
goto end;
|
||||||
testnop++; /* pass the slash */
|
}
|
||||||
testno = strtol(testnop, NULL, 10);
|
if(config.publish_before_suback) {
|
||||||
if(testno) {
|
if(suback(dump, fd, packet_id)) {
|
||||||
FILE *stream;
|
logmsg("failed sending SUBACK");
|
||||||
int error;
|
goto end;
|
||||||
char *data;
|
}
|
||||||
size_t datalen;
|
}
|
||||||
logmsg("Found test number %ld", testno);
|
|
||||||
stream = test2fopen(testno);
|
|
||||||
error = getpart(&data, &datalen, "reply", "data", stream);
|
|
||||||
if(!error)
|
|
||||||
publish(dump, fd, packet_id, topic, data, datalen);
|
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
char *def = (char *)"this is random payload yes yes it is";
|
char *def = (char *)"this is random payload yes yes it is";
|
||||||
|
Loading…
Reference in New Issue
Block a user