mqtt: add new experimental protocol

Closes #5173
This commit is contained in:
Bjorn Stenberg 2020-04-14 11:19:12 +02:00 committed by Daniel Stenberg
parent 8909865191
commit 2522903b79
No known key found for this signature in database
GPG Key ID: 5CC908FDB71E12C2
18 changed files with 766 additions and 57 deletions

View File

@ -172,6 +172,8 @@ option(CURL_DISABLE_SMTP "to disable SMTP" OFF)
mark_as_advanced(CURL_DISABLE_SMTP)
option(CURL_DISABLE_GOPHER "to disable Gopher" OFF)
mark_as_advanced(CURL_DISABLE_GOPHER)
option(CURL_ENABLE_MQTT "to enable MQTT" OFF)
mark_as_advanced(CURL_ENABLE_MQTT)
if(HTTP_ONLY)
set(CURL_DISABLE_FTP ON)
@ -1307,6 +1309,7 @@ _add_if("SCP" USE_LIBSSH2)
_add_if("SFTP" USE_LIBSSH2)
_add_if("RTSP" NOT CURL_DISABLE_RTSP)
_add_if("RTMP" USE_LIBRTMP)
_add_if("MQTT" CURL_ENABLE_MQTT)
if(_items)
list(SORT _items)
endif()

View File

@ -637,6 +637,22 @@ AC_HELP_STRING([--disable-gopher],[Disable Gopher support]),
AC_MSG_RESULT(yes)
)
AC_MSG_CHECKING([whether to support mqtt])
AC_ARG_ENABLE(mqtt,
AC_HELP_STRING([--enable-mqtt],[Enable MQTT support])
AC_HELP_STRING([--disable-mqtt],[Disable MQTT support]),
[ case "$enableval" in
no)
AC_MSG_RESULT(no)
;;
*) AC_MSG_RESULT(yes)
experimental="$experimental MQTT"
AC_DEFINE(CURL_ENABLE_MQTT, 1, [to enable MQTT])
AC_SUBST(CURL_ENABLE_MQTT, [1])
;;
esac ],
AC_MSG_RESULT(no)
)
dnl **********************************************************************
dnl Check for built-in manual
@ -4843,6 +4859,9 @@ fi
if test "x$CURL_DISABLE_GOPHER" != "x1"; then
SUPPORT_PROTOCOLS="$SUPPORT_PROTOCOLS GOPHER"
fi
if test "x$CURL_ENABLE_MQTT" = "x1"; then
SUPPORT_PROTOCOLS="$SUPPORT_PROTOCOLS MQTT"
fi
if test "x$CURL_DISABLE_POP3" != "x1"; then
SUPPORT_PROTOCOLS="$SUPPORT_PROTOCOLS POP3"
if test "x$SSL_ENABLED" = "x1"; then

View File

@ -20,3 +20,4 @@ Experimental support in curl means:
- HTTP/3 support and options
- alt-svc support and options
- MQTT

View File

@ -180,6 +180,9 @@ IMAPS (*1)
- explicit "STARTTLS" usage to "upgrade" plain imap:// connections to use SSL
- via http-proxy
MQTT
- Subscribe to and publish topics using url scheme mqtt://broker/topic
FOOTNOTES
=========

61
docs/MQTT.md Normal file
View File

@ -0,0 +1,61 @@
# MQTT in curl
## Experimental!
MQTT support in curl is considered **EXPERIMENTAL** until further notice. It
needs to be enabled at build-time. See below.
After the initial merge, further development and tweaking of the MQTT support
in curl will happen in the master branch using pull-requests, just like
ordinary changes.
Experimental support for MQTT means that we **do not guarantee** that the
current protocol functionality will remain or remain this way going forward.
There are no API or ABI promises for experimental features as for regular curl
features.
Do not ship anything with this enabled.
## Build
./configure --enable-mqtt
## Usage
A plain "GET" subscribes to the topic and prints all published messages.
Doing a "POST" publishes the post data to the topic and exits.
Example subscribe:
curl mqtt://host/home/bedroom/temp
Example publish:
curl -d 80 mqtt://host/home/bedroom/dimmer
## What does curl deliver as a response to a subscribe
It outputs two bytes topic length (MSB | LSB), the topic followed by the
payload.
## Caveats
Remaining limitations:
- No username support
- Only QoS level 0 is implemented for publish
- No way to set retain flag for publish
- No username/password support
- No TLS (mqtts) support
- Naive EAGAIN handling won't handle split messages
## Work
1. Write a mqtt server for the test suite
2. Create a few tests verifying the existing mqtt functionality
3. Work on fixing some of the worst limitations - with accompanying tests
4. Consider replacing the client-side MQTT code with wolfMQTT
## Credits
The initial MQTT patch was authored by Björn Stenberg. This work is built upon
that patch and has been expanded since.

View File

@ -71,6 +71,7 @@ EXTRA_DIST = \
KNOWN_BUGS \
LICENSE-MIXING.md \
MAIL-ETIQUETTE \
MQTT.md \
PARALLEL-TRANSFERS.md \
README.md \
RELEASE-PROCEDURE.md \

View File

@ -2,7 +2,7 @@ Long: data
Short: d
Arg: <data>
Help: HTTP POST data
Protocols: HTTP
Protocols: HTTP MQTT
See-also: data-binary data-urlencode data-raw
Mutexed: form head upload-file
---

View File

@ -31,8 +31,8 @@ curl \- transfer a URL
.B curl
is a tool to transfer data from or to a server, using one of the supported
protocols (DICT, FILE, FTP, FTPS, GOPHER, HTTP, HTTPS, IMAP, IMAPS, LDAP,
LDAPS, POP3, POP3S, RTMP, RTSP, SCP, SFTP, SMB, SMBS, SMTP, SMTPS, TELNET
and TFTP). The command is designed to work without user interaction.
LDAPS, MQTT, POP3, POP3S, RTMP, RTSP, SCP, SFTP, SMB, SMBS, SMTP, SMTPS,
TELNET and TFTP). The command is designed to work without user interaction.
curl offers a busload of useful tricks like proxy support, user
authentication, FTP upload, HTTP post, SSL connections, cookies, file transfer

View File

@ -38,7 +38,7 @@ CURLPROTO_IMAPS, CURLPROTO_LDAP, CURLPROTO_LDAPS, CURLPROTO_POP3,
CURLPROTO_POP3S, CURLPROTO_RTMP, CURLPROTO_RTMPE, CURLPROTO_RTMPS,
CURLPROTO_RTMPT, CURLPROTO_RTMPTE, CURLPROTO_RTMPTS, CURLPROTO_RTSP,
CURLPROTO_SCP, CURLPROTO_SFTP, CURLPROTO_SMB, CURLPROTO_SMBS, CURLPROTO_SMTP,
CURLPROTO_SMTPS, CURLPROTO_TELNET, CURLPROTO_TFTP
CURLPROTO_SMTPS, CURLPROTO_TELNET, CURLPROTO_TFTP, CURLPROTO_MQTT
.SH PROTOCOLS
All
.SH EXAMPLE

View File

@ -676,6 +676,7 @@ CURLPROTO_IMAP 7.20.0
CURLPROTO_IMAPS 7.20.0
CURLPROTO_LDAP 7.19.4
CURLPROTO_LDAPS 7.19.4
CURLPROTO_MQTT 7.71.0
CURLPROTO_POP3 7.20.0
CURLPROTO_POP3S 7.20.0
CURLPROTO_RTMP 7.21.0

View File

@ -937,6 +937,7 @@ typedef enum {
#define CURLPROTO_GOPHER (1<<25)
#define CURLPROTO_SMB (1<<26)
#define CURLPROTO_SMBS (1<<27)
#define CURLPROTO_MQTT (1<<28)
#define CURLPROTO_ALL (~0) /* enable everything */
/* long may be 32 or 64 bits, but we should never depend on anything else

View File

@ -20,71 +20,66 @@
#
###########################################################################
LIB_VAUTH_CFILES = vauth/vauth.c vauth/cleartext.c vauth/cram.c \
vauth/digest.c vauth/digest_sspi.c vauth/krb5_gssapi.c \
vauth/krb5_sspi.c vauth/ntlm.c vauth/ntlm_sspi.c vauth/oauth2.c \
vauth/spnego_gssapi.c vauth/spnego_sspi.c
LIB_VAUTH_CFILES = vauth/cleartext.c vauth/cram.c vauth/digest.c \
vauth/digest_sspi.c vauth/krb5_gssapi.c vauth/krb5_sspi.c vauth/ntlm.c \
vauth/ntlm_sspi.c vauth/oauth2.c vauth/spnego_gssapi.c vauth/spnego_sspi.c \
vauth/vauth.c
LIB_VAUTH_HFILES = vauth/vauth.h vauth/digest.h vauth/ntlm.h
LIB_VAUTH_HFILES = vauth/digest.h vauth/ntlm.h vauth/vauth.h
LIB_VTLS_CFILES = vtls/openssl.c vtls/gtls.c vtls/vtls.c vtls/nss.c \
vtls/mbedtls_threadlock.c vtls/wolfssl.c vtls/schannel.c \
vtls/schannel_verify.c vtls/sectransp.c vtls/gskit.c vtls/mbedtls.c \
vtls/mesalink.c vtls/bearssl.c
LIB_VTLS_CFILES = vtls/bearssl.c vtls/gskit.c vtls/gtls.c vtls/mbedtls.c \
vtls/mbedtls_threadlock.c vtls/mesalink.c vtls/nss.c vtls/openssl.c \
vtls/schannel.c vtls/schannel_verify.c vtls/sectransp.c vtls/vtls.c \
vtls/wolfssl.c
LIB_VTLS_HFILES = vtls/openssl.h vtls/vtls.h vtls/gtls.h vtls/nssg.h \
vtls/mbedtls_threadlock.h vtls/wolfssl.h vtls/schannel.h \
vtls/sectransp.h vtls/gskit.h vtls/mbedtls.h vtls/mesalink.h \
vtls/bearssl.h
LIB_VTLS_HFILES = vtls/bearssl.h vtls/gskit.h vtls/gtls.h vtls/mbedtls.h \
vtls/mbedtls_threadlock.h vtls/mesalink.h vtls/nssg.h vtls/openssl.h \
vtls/schannel.h vtls/sectransp.h vtls/vtls.h vtls/wolfssl.h
LIB_VQUIC_CFILES = vquic/ngtcp2.c vquic/quiche.c
LIB_VQUIC_HFILES = vquic/ngtcp2.h vquic/quiche.h
LIB_VSSH_CFILES = vssh/libssh2.c vssh/libssh.c vssh/wolfssh.c
LIB_VSSH_CFILES = vssh/libssh.c vssh/libssh2.c vssh/wolfssh.c
LIB_VSSH_HFILES = vssh/ssh.h
LIB_CFILES = file.c timeval.c base64.c hostip.c progress.c formdata.c \
cookie.c http.c sendf.c ftp.c url.c dict.c if2ip.c speedcheck.c \
ldap.c version.c getenv.c escape.c mprintf.c telnet.c netrc.c \
getinfo.c transfer.c strcase.c easy.c security.c curl_fnmatch.c \
fileinfo.c ftplistparser.c wildcard.c krb5.c memdebug.c http_chunks.c \
strtok.c connect.c llist.c hash.c multi.c content_encoding.c share.c \
http_digest.c md4.c md5.c http_negotiate.c inet_pton.c strtoofft.c \
strerror.c amigaos.c hostasyn.c hostip4.c hostip6.c hostsyn.c \
inet_ntop.c parsedate.c select.c tftp.c splay.c strdup.c socks.c \
curl_addrinfo.c socks_gssapi.c socks_sspi.c \
curl_sspi.c slist.c nonblock.c curl_memrchr.c imap.c pop3.c smtp.c \
pingpong.c rtsp.c curl_threads.c warnless.c hmac.c curl_rtmp.c \
openldap.c curl_gethostname.c gopher.c idn_win32.c \
http_proxy.c non-ascii.c asyn-ares.c asyn-thread.c curl_gssapi.c \
http_ntlm.c curl_ntlm_wb.c curl_ntlm_core.c curl_sasl.c rand.c \
curl_multibyte.c hostcheck.c conncache.c dotdot.c \
x509asn1.c http2.c smb.c curl_endian.c curl_des.c system_win32.c \
mime.c sha256.c setopt.c curl_path.c curl_ctype.c curl_range.c psl.c \
doh.c urlapi.c curl_get_line.c altsvc.c socketpair.c rename.c
LIB_CFILES = altsvc.c amigaos.c asyn-ares.c asyn-thread.c base64.c \
conncache.c connect.c content_encoding.c cookie.c curl_addrinfo.c \
curl_ctype.c curl_des.c curl_endian.c curl_fnmatch.c curl_get_line.c \
curl_gethostname.c curl_gssapi.c curl_memrchr.c curl_multibyte.c \
curl_ntlm_core.c curl_ntlm_wb.c curl_path.c curl_range.c curl_rtmp.c \
curl_sasl.c curl_sspi.c curl_threads.c dict.c dotdot.c easy.c escape.c \
file.c fileinfo.c formdata.c ftp.c url.c ftplistparser.c getenv.c getinfo.c \
gopher.c hash.c hmac.c hostasyn.c hostcheck.c hostip.c hostip4.c hostip6.c \
hostsyn.c http.c http2.c http_chunks.c http_digest.c http_negotiate.c \
http_ntlm.c http_proxy.c idn_win32.c if2ip.c imap.c inet_ntop.c inet_pton.c \
krb5.c ldap.c llist.c md4.c md5.c memdebug.c mime.c mprintf.c mqtt.c \
multi.c netrc.c non-ascii.c nonblock.c openldap.c parsedate.c pingpong.c \
pop3.c progress.c psl.c doh.c rand.c rename.c rtsp.c security.c select.c \
sendf.c setopt.c sha256.c share.c slist.c smb.c smtp.c socketpair.c socks.c \
socks_gssapi.c socks_sspi.c speedcheck.c splay.c strcase.c strdup.c \
strerror.c strtok.c strtoofft.c system_win32.c telnet.c tftp.c timeval.c \
transfer.c urlapi.c version.c warnless.c wildcard.c x509asn1.c
LIB_HFILES = arpa_telnet.h netrc.h file.h timeval.h hostip.h progress.h \
formdata.h cookie.h http.h sendf.h ftp.h url.h dict.h if2ip.h \
speedcheck.h urldata.h curl_ldap.h escape.h telnet.h getinfo.h \
strcase.h curl_sec.h memdebug.h http_chunks.h curl_fnmatch.h \
wildcard.h fileinfo.h ftplistparser.h strtok.h connect.h llist.h \
hash.h content_encoding.h share.h curl_md4.h curl_md5.h http_digest.h \
http_negotiate.h inet_pton.h amigaos.h strtoofft.h strerror.h \
inet_ntop.h curlx.h curl_memory.h curl_setup.h transfer.h select.h \
easyif.h multiif.h parsedate.h tftp.h sockaddr.h splay.h strdup.h \
socks.h curl_base64.h curl_addrinfo.h curl_sspi.h \
slist.h nonblock.h curl_memrchr.h imap.h pop3.h smtp.h pingpong.h \
rtsp.h curl_threads.h warnless.h curl_hmac.h curl_rtmp.h \
curl_gethostname.h gopher.h http_proxy.h non-ascii.h asyn.h \
http_ntlm.h curl_gssapi.h curl_ntlm_wb.h curl_ntlm_core.h \
curl_sasl.h curl_multibyte.h hostcheck.h conncache.h \
curl_setup_once.h multihandle.h setup-vms.h dotdot.h \
x509asn1.h http2.h sigpipe.h smb.h curl_endian.h curl_des.h \
curl_printf.h system_win32.h rand.h mime.h curl_sha256.h setopt.h \
curl_path.h curl_ctype.h curl_range.h psl.h doh.h urlapi-int.h \
curl_get_line.h altsvc.h quic.h socketpair.h rename.h
LIB_HFILES = altsvc.h amigaos.h arpa_telnet.h asyn.h conncache.h connect.h \
content_encoding.h cookie.h curl_addrinfo.h curl_base64.h curl_ctype.h \
curl_des.h curl_endian.h curl_fnmatch.h curl_get_line.h curl_gethostname.h \
curl_gssapi.h curl_hmac.h curl_ldap.h curl_md4.h curl_md5.h curl_memory.h \
curl_memrchr.h curl_multibyte.h curl_ntlm_core.h curl_ntlm_wb.h curl_path.h \
curl_printf.h curl_range.h curl_rtmp.h curl_sasl.h curl_sec.h curl_setup.h \
curl_setup_once.h curl_sha256.h curl_sspi.h curl_threads.h curlx.h dict.h \
dotdot.h easyif.h escape.h file.h fileinfo.h formdata.h ftp.h url.h \
ftplistparser.h getinfo.h gopher.h hash.h hostcheck.h hostip.h http.h \
http2.h http_chunks.h http_digest.h http_negotiate.h http_ntlm.h \
http_proxy.h if2ip.h imap.h inet_ntop.h inet_pton.h llist.h memdebug.h \
mime.h mqtt.h multihandle.h multiif.h netrc.h non-ascii.h nonblock.h \
parsedate.h pingpong.h pop3.h progress.h psl.h doh.h quic.h rand.h rename.h \
rtsp.h select.h sendf.h setopt.h setup-vms.h share.h sigpipe.h slist.h \
smb.h smtp.h sockaddr.h socketpair.h socks.h speedcheck.h splay.h strcase.h \
strdup.h strerror.h strtok.h strtoofft.h system_win32.h telnet.h tftp.h \
timeval.h transfer.h urlapi-int.h urldata.h warnless.h wildcard.h \
x509asn1.h
LIB_RCFILES = libcurl.rc

View File

@ -63,6 +63,9 @@
/* to disable LDAPS */
#cmakedefine CURL_DISABLE_LDAPS 1
/* to enable MQTT */
#undef CURL_ENABLE_MQTT
/* to disable POP3 */
#cmakedefine CURL_DISABLE_POP3 1

561
lib/mqtt.c Normal file
View File

@ -0,0 +1,561 @@
/***************************************************************************
* _ _ ____ _
* Project ___| | | | _ \| |
* / __| | | | |_) | |
* | (__| |_| | _ <| |___
* \___|\___/|_| \_\_____|
*
* Copyright (C) 2020, Daniel Stenberg, <daniel@haxx.se>, et al.
* Copyright (C) 2019, Björn Stenberg, <bjorn@haxx.se>
*
* This software is licensed as described in the file COPYING, which
* you should have received as part of this distribution. The terms
* are also available at https://curl.haxx.se/docs/copyright.html.
*
* You may opt to use, copy, modify, merge, publish, distribute and/or sell
* copies of the Software, and permit persons to whom the Software is
* furnished to do so, under the terms of the COPYING file.
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
* KIND, either express or implied.
*
***************************************************************************/
#include "curl_setup.h"
#ifdef CURL_ENABLE_MQTT
#include "urldata.h"
#include <curl/curl.h>
#include "transfer.h"
#include "sendf.h"
#include "progress.h"
#include "mqtt.h"
#include "select.h"
#include "strdup.h"
#include "url.h"
#include "escape.h"
#include "warnless.h"
#include "curl_printf.h"
#include "curl_memory.h"
#include "multiif.h"
#include "rand.h"
/* The last #include file should be: */
#include "memdebug.h"
#define MQTT_MSG_CONNECT 0x10
#define MQTT_MSG_CONNACK 0x20
#define MQTT_MSG_PUBLISH 0x30
#define MQTT_MSG_SUBSCRIBE 0x82
#define MQTT_MSG_SUBACK 0x90
#define MQTT_MSG_DISCONNECT 0xe0
#define MQTT_CONNACK_LEN 4
#define MQTT_SUBACK_LEN 5
#define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */
/*
* Forward declarations.
*/
static CURLcode mqtt_do(struct connectdata *conn, bool *done);
static CURLcode mqtt_doing(struct connectdata *conn, bool *done);
static int mqtt_getsock(struct connectdata *conn, curl_socket_t *sock);
static CURLcode mqtt_setup_conn(struct connectdata *conn);
/*
* MQTT protocol handler.
*/
const struct Curl_handler Curl_handler_mqtt = {
"MQTT", /* scheme */
mqtt_setup_conn, /* setup_connection */
mqtt_do, /* do_it */
ZERO_NULL, /* done */
ZERO_NULL, /* do_more */
ZERO_NULL, /* connect_it */
ZERO_NULL, /* connecting */
mqtt_doing, /* doing */
ZERO_NULL, /* proto_getsock */
mqtt_getsock, /* doing_getsock */
ZERO_NULL, /* domore_getsock */
ZERO_NULL, /* perform_getsock */
ZERO_NULL, /* disconnect */
ZERO_NULL, /* readwrite */
ZERO_NULL, /* connection_check */
PORT_MQTT, /* defport */
CURLPROTO_MQTT, /* protocol */
PROTOPT_NONE /* flags */
};
static CURLcode mqtt_setup_conn(struct connectdata *conn)
{
/* allocate the HTTP-specific struct for the Curl_easy, only to survive
during this request */
struct MQTT *mq;
struct Curl_easy *data = conn->data;
DEBUGASSERT(data->req.protop == NULL);
mq = calloc(1, sizeof(struct MQTT));
if(!mq)
return CURLE_OUT_OF_MEMORY;
data->req.protop = mq;
return CURLE_OK;
}
static CURLcode mqtt_send(struct connectdata *conn,
char *buf, size_t len)
{
CURLcode result = CURLE_OK;
curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
struct Curl_easy *data = conn->data;
struct MQTT *mq = data->req.protop;
ssize_t n;
result = Curl_write(conn, sockfd, buf, len, &n);
if(!result && data->set.verbose)
Curl_debug(data, CURLINFO_HEADER_OUT, buf, (size_t)n);
if(len != (size_t)n) {
size_t nsend = len - n;
char *sendleftovers = Curl_memdup(&buf[n], nsend);
if(!sendleftovers)
return CURLE_OUT_OF_MEMORY;
mq->sendleftovers = sendleftovers;
mq->nsend = nsend;
}
return result;
}
/* Generic function called by the multi interface to figure out what socket(s)
to wait for and for what actions during the DOING and PROTOCONNECT
states */
static int mqtt_getsock(struct connectdata *conn,
curl_socket_t *sock)
{
sock[0] = conn->sock[FIRSTSOCKET];
return GETSOCK_READSOCK(FIRSTSOCKET);
}
static CURLcode mqtt_connect(struct connectdata *conn)
{
CURLcode result = CURLE_OK;
const size_t client_id_offset = 14;
const size_t packetlen = client_id_offset + MQTT_CLIENTID_LEN;
char client_id[MQTT_CLIENTID_LEN + 1] = "curl";
const size_t curl_len = strlen("curl");
char packet[32] = {
MQTT_MSG_CONNECT, /* packet type */
0x00, /* remaining length */
0x00, 0x04, /* protocol length */
'M','Q','T','T', /* protocol name */
0x04, /* protocol level */
0x02, /* CONNECT flag: CleanSession */
0x00, 0x3c, /* keep-alive 0 = disabled */
0x00, 0x00 /* payload1 length */
};
packet[1] = (packetlen - 2) & 0x7f;
packet[client_id_offset - 1] = MQTT_CLIENTID_LEN;
result = Curl_rand_hex(conn->data, (unsigned char *)&client_id[curl_len],
MQTT_CLIENTID_LEN - curl_len + 1);
memcpy(&packet[client_id_offset], client_id, MQTT_CLIENTID_LEN);
infof(conn->data, "Using client id '%s'\n", client_id);
if(!result)
result = mqtt_send(conn, packet, packetlen);
return result;
}
static CURLcode mqtt_disconnect(struct connectdata *conn)
{
CURLcode result = CURLE_OK;
result = mqtt_send(conn, (char *)"\xe0\x00", 2);
return result;
}
static CURLcode mqtt_verify_connack(struct connectdata *conn)
{
CURLcode result;
curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
unsigned char readbuf[MQTT_CONNACK_LEN];
ssize_t nread;
struct Curl_easy *data = conn->data;
result = Curl_read(conn, sockfd, (char *)readbuf, MQTT_CONNACK_LEN, &nread);
if(result)
goto fail;
if(data->set.verbose)
Curl_debug(data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread);
/* fixme */
if(nread < MQTT_CONNACK_LEN) {
result = CURLE_WEIRD_SERVER_REPLY;
goto fail;
}
/* verify CONNACK */
if(readbuf[0] != MQTT_MSG_CONNACK ||
readbuf[1] != 0x02 ||
readbuf[2] != 0x00 ||
readbuf[3] != 0x00) {
failf(data, "Expected %02x%02x%02x%02x but got %02x%02x%02x%02x",
MQTT_MSG_CONNACK, 0x02, 0x00, 0x00,
readbuf[0], readbuf[1], readbuf[2], readbuf[3]);
result = CURLE_WEIRD_SERVER_REPLY;
}
fail:
return result;
}
static CURLcode mqtt_get_topic(struct connectdata *conn,
char **topic, size_t *topiclen)
{
CURLcode result = CURLE_OK;
char *path = conn->data->state.up.path;
if(strlen(path) > 1) {
result = Curl_urldecode(conn->data, path + 1, 0, topic, topiclen, FALSE);
}
else {
failf(conn->data, "Error: No topic specified.");
result = CURLE_URL_MALFORMAT;
}
return result;
}
static int mqtt_encode_len(char *buf, size_t len)
{
unsigned char encoded;
int i;
for(i = 0; (len > 0) && (i<4); i++) {
encoded = len % 0x80;
len /= 0x80;
if(len)
encoded |= 0x80;
buf[i] = encoded;
}
return i;
}
static CURLcode mqtt_subscribe(struct connectdata *conn)
{
CURLcode result = CURLE_OK;
char *topic = NULL;
size_t topiclen;
unsigned char *packet = NULL;
size_t packetlen;
char encodedsize[4];
size_t n;
result = mqtt_get_topic(conn, &topic, &topiclen);
if(result)
goto fail;
conn->proto.mqtt.packetid++;
packetlen = topiclen + 5; /* packetid + topic (has a two byte length field)
+ 2 bytes topic length + QoS byte */
n = mqtt_encode_len((char *)encodedsize, packetlen);
packetlen += n + 1; /* add one for the control packet type byte */
packet = malloc(packetlen);
if(!packet) {
result = CURLE_OUT_OF_MEMORY;
goto fail;
}
packet[0] = MQTT_MSG_SUBSCRIBE;
memcpy(&packet[1], encodedsize, n);
packet[1 + n] = (conn->proto.mqtt.packetid >> 8) & 0xff;
packet[2 + n] = conn->proto.mqtt.packetid & 0xff;
packet[3 + n] = (topiclen >> 8) & 0xff;
packet[4 + n ] = topiclen & 0xff;
memcpy(&packet[5 + n], topic, topiclen);
packet[5 + n + topiclen] = 0; /* QoS zero */
result = mqtt_send(conn, (char *)packet, packetlen);
fail:
free(topic);
free(packet);
return result;
}
static CURLcode mqtt_verify_suback(struct connectdata *conn)
{
CURLcode result;
curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
unsigned char readbuf[MQTT_SUBACK_LEN];
ssize_t nread;
struct mqtt_conn *mqtt = &conn->proto.mqtt;
result = Curl_read(conn, sockfd, (char *)readbuf, MQTT_SUBACK_LEN, &nread);
if(result)
goto fail;
if(conn->data->set.verbose)
Curl_debug(conn->data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread);
/* fixme */
if(nread < MQTT_SUBACK_LEN) {
result = CURLE_WEIRD_SERVER_REPLY;
goto fail;
}
/* verify SUBACK */
if(readbuf[0] != MQTT_MSG_SUBACK ||
readbuf[1] != 0x03 ||
readbuf[2] != ((mqtt->packetid >> 8) & 0xff) ||
readbuf[3] != (mqtt->packetid & 0xff) ||
readbuf[4] != 0x00)
result = CURLE_WEIRD_SERVER_REPLY;
fail:
return result;
}
static CURLcode mqtt_publish(struct connectdata *conn)
{
CURLcode result;
char *payload = conn->data->set.postfields;
size_t payloadlen = (size_t)conn->data->set.postfieldsize;
char *topic = NULL;
size_t topiclen;
unsigned char *pkt = NULL;
size_t i = 0;
size_t remaininglength;
size_t encodelen;
char encodedbytes[4];
result = mqtt_get_topic(conn, &topic, &topiclen);
if(result)
goto fail;
remaininglength = payloadlen + 2 + topiclen;
encodelen = mqtt_encode_len(encodedbytes, remaininglength);
/* add the control byte and the encoded remaining length */
pkt = malloc(remaininglength + 1 + encodelen);
if(!pkt) {
result = CURLE_OUT_OF_MEMORY;
goto fail;
}
/* assemble packet */
pkt[i++] = MQTT_MSG_PUBLISH;
memcpy(&pkt[i], encodedbytes, encodelen);
i += encodelen;
pkt[i++] = (topiclen >> 8) & 0xff;
pkt[i++] = (topiclen & 0xff);
memcpy(&pkt[i], topic, topiclen);
i += topiclen;
memcpy(&pkt[i], payload, payloadlen);
i += payloadlen;
result = mqtt_send(conn, (char *)pkt, i);
fail:
free(pkt);
free(topic);
return result;
}
static size_t mqtt_decode_len(unsigned char *buf,
size_t buflen, size_t *lenbytes)
{
size_t len = 0;
size_t mult = 1;
size_t i;
unsigned char encoded = 128;
for(i = 0; (i < buflen) && (encoded & 128); i++) {
encoded = buf[i];
len += (encoded & 127) * mult;
mult *= 128;
}
*lenbytes = i;
return len;
}
/* for the publish packet */
#define MQTT_HEADER_LEN 5 /* max 5 bytes */
static CURLcode mqtt_read_publish(struct connectdata *conn,
bool *done)
{
CURLcode result;
curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
ssize_t nread;
struct Curl_easy *data = conn->data;
unsigned char *pkt = (unsigned char *)data->state.buffer;
size_t remlen, lenbytes;
struct mqtt_conn *mqtt = &conn->proto.mqtt;
struct MQTT *mq = data->req.protop;
switch(mqtt->state) {
case MQTT_SUBWAIT:
/* Read the initial byte and the entire Remaining Length field
in this state */
result = Curl_read(conn, sockfd, (char *)&pkt[mq->npacket], 1, &nread);
if(result)
goto end;
if(data->set.verbose)
Curl_debug(data, CURLINFO_HEADER_IN, (char *)&pkt[mq->npacket], 1);
/* we are expecting a PUBLISH message */
if(!mq->npacket && ((pkt[0] & 0xf0) != MQTT_MSG_PUBLISH)) {
if(pkt[0] == MQTT_MSG_DISCONNECT) {
infof(data, "Got DISCONNECT\n");
*done = TRUE;
goto end;
}
result = CURLE_WEIRD_SERVER_REPLY;
goto end;
}
else if((mq->npacket >= 1) && !(pkt[mq->npacket] & 0x80))
/* as long as the high bit is set in the length byte, we read one more
byte, then get the remainder of the PUBLISH */
mqtt->state = MQTT_SUB_REMAIN;
mq->npacket++;
if(mqtt->state == MQTT_SUBWAIT)
return result;
/* -- switched state -- */
/* remember the first byte */
mq->firstbyte = pkt[0];
remlen = mqtt_decode_len(&pkt[1], 4, &lenbytes);
infof(data, "Remaining length: %zd bytes\n", remlen);
Curl_pgrsSetDownloadSize(data, remlen);
data->req.bytecount = 0;
data->req.size = remlen;
mq->npacket = remlen; /* get this many bytes */
/* FALLTHROUGH */
case MQTT_SUB_REMAIN: {
/* read rest of packet, but no more. Cap to buffer size */
struct SingleRequest *k = &data->req;
size_t rest = mq->npacket;
if(rest > (size_t)data->set.buffer_size)
rest = (size_t)data->set.buffer_size;
result = Curl_read(conn, sockfd, (char *)pkt, rest, &nread);
if(result) {
if(CURLE_AGAIN == result) {
infof(data, "EEEE AAAAGAIN\n");
}
goto end;
}
if(data->set.verbose)
Curl_debug(data, CURLINFO_DATA_IN, (char *)pkt, (size_t)nread);
mq->npacket -= nread;
k->bytecount += nread;
Curl_pgrsSetDownloadCounter(data, k->bytecount);
/* if QoS is set, message contains packet id */
result = Curl_client_write(conn, CLIENTWRITE_BODY, (char *)pkt, nread);
if(result)
goto end;
if(!mq->npacket)
/* no more PUBLISH payload, back to subscribe wait state */
mqtt->state = MQTT_SUBWAIT;
break;
}
default:
DEBUGASSERT(NULL); /* illegal state */
result = CURLE_WEIRD_SERVER_REPLY;
goto end;
}
end:
return result;
}
static CURLcode mqtt_do(struct connectdata *conn, bool *done)
{
CURLcode result = CURLE_OK;
struct Curl_easy *data = conn->data;
struct mqtt_conn *mqtt = &conn->proto.mqtt;
*done = FALSE; /* unconditionally */
result = mqtt_connect(conn);
if(result) {
failf(data, "Error %d sending MQTT CONN request", result);
return result;
}
mqtt->state = MQTT_CONNACK;
return CURLE_OK;
}
static CURLcode mqtt_doing(struct connectdata *conn, bool *done)
{
CURLcode result = CURLE_OK;
struct mqtt_conn *mqtt = &conn->proto.mqtt;
struct Curl_easy *data = conn->data;
struct MQTT *mq = data->req.protop;
*done = FALSE;
if(mq->nsend) {
/* send the remainder of an outgoing packet */
char *ptr = mq->sendleftovers;
result = mqtt_send(conn, mq->sendleftovers, mq->nsend);
free(ptr);
if(result)
return result;
}
switch(mqtt->state) {
case MQTT_CONNACK:
result = mqtt_verify_connack(conn);
if(result)
break;
if(conn->data->set.httpreq == HTTPREQ_POST) {
result = mqtt_publish(conn);
if(!result) {
result = mqtt_disconnect(conn);
*done = TRUE;
}
}
else {
result = mqtt_subscribe(conn);
if(!result)
mqtt->state = MQTT_SUBACK;
}
break;
case MQTT_SUBACK:
result = mqtt_verify_suback(conn);
if(result)
break;
mqtt->state = MQTT_SUBWAIT;
break;
case MQTT_SUBWAIT:
case MQTT_SUB_REMAIN:
result = mqtt_read_publish(conn, done);
if(result)
break;
break;
default:
failf(conn->data, "State not handled yet");
*done = TRUE;
break;
}
if(result == CURLE_AGAIN)
result = CURLE_OK;
return result;
}
#endif /* CURL_ENABLE_MQTT */

49
lib/mqtt.h Normal file
View File

@ -0,0 +1,49 @@
#ifndef HEADER_CURL_MQTT_H
#define HEADER_CURL_MQTT_H
/***************************************************************************
* _ _ ____ _
* Project ___| | | | _ \| |
* / __| | | | |_) | |
* | (__| |_| | _ <| |___
* \___|\___/|_| \_\_____|
*
* Copyright (C) 2019 - 2020, Björn Stenberg, <bjorn@haxx.se>
*
* This software is licensed as described in the file COPYING, which
* you should have received as part of this distribution. The terms
* are also available at https://curl.haxx.se/docs/copyright.html.
*
* You may opt to use, copy, modify, merge, publish, distribute and/or sell
* copies of the Software, and permit persons to whom the Software is
* furnished to do so, under the terms of the COPYING file.
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
* KIND, either express or implied.
*
***************************************************************************/
#ifdef CURL_ENABLE_MQTT
extern const struct Curl_handler Curl_handler_mqtt;
#endif
struct mqtt_conn {
enum {
MQTT_CONNACK,
MQTT_SUBACK,
MQTT_SUBWAIT, /* wait for subscribe response */
MQTT_SUB_REMAIN /* wait for the remainder of the subscribe response */
} state;
unsigned int packetid;
};
/* protocol-specific transfer-related data */
struct MQTT {
char *sendleftovers;
size_t nsend; /* size of sendleftovers */
/* when receving a PUBLISH */
size_t npacket; /* byte counter */
unsigned char firstbyte;
};
#endif /* HEADER_CURL_MQTT_H */

View File

@ -114,6 +114,7 @@ bool curl_win32_idn_to_ascii(const char *in, char **out);
#include "http_ntlm.h"
#include "curl_rtmp.h"
#include "gopher.h"
#include "mqtt.h"
#include "http_proxy.h"
#include "conncache.h"
#include "multihandle.h"
@ -232,6 +233,10 @@ static const struct Curl_handler * const protocols[] = {
&Curl_handler_gopher,
#endif
#ifdef CURL_ENABLE_MQTT
&Curl_handler_mqtt,
#endif
#ifdef USE_LIBRTMP
&Curl_handler_rtmp,
&Curl_handler_rtmpt,

View File

@ -49,6 +49,7 @@
#define PORT_RTMPT PORT_HTTP
#define PORT_RTMPS PORT_HTTPS
#define PORT_GOPHER 70
#define PORT_MQTT 1883
#define DICT_MATCH "/MATCH:"
#define DICT_MATCH2 "/M:"
@ -128,6 +129,7 @@ typedef ssize_t (Curl_recv)(struct connectdata *conn, /* connection data */
#include "http.h"
#include "rtsp.h"
#include "smb.h"
#include "mqtt.h"
#include "wildcard.h"
#include "multihandle.h"
#include "quic.h"
@ -1081,6 +1083,7 @@ struct connectdata {
struct smb_conn smbc;
void *rtmp;
struct ldapconninfo *ldapc;
struct mqtt_conn mqtt;
} proto;
int cselect_bits; /* bitmask of socket events */

View File

@ -271,6 +271,9 @@ static const char * const protocols[] = {
"ldaps",
#endif
#endif
#ifdef CURL_ENABLE_MQTT
"mqtt",
#endif
#ifndef CURL_DISABLE_POP3
"pop3",
#endif