From 2522903b792ac5a802f780df60dc4647c58e2477 Mon Sep 17 00:00:00 2001 From: Bjorn Stenberg Date: Tue, 14 Apr 2020 11:19:12 +0200 Subject: [PATCH] mqtt: add new experimental protocol Closes #5173 --- CMakeLists.txt | 3 + configure.ac | 19 + docs/EXPERIMENTAL.md | 1 + docs/FEATURES | 3 + docs/MQTT.md | 61 +++ docs/Makefile.am | 1 + docs/cmdline-opts/data.d | 2 +- docs/cmdline-opts/page-header | 4 +- docs/libcurl/opts/CURLINFO_PROTOCOL.3 | 2 +- docs/libcurl/symbols-in-versions | 1 + include/curl/curl.h | 1 + lib/Makefile.inc | 101 +++-- lib/curl_config.h.cmake | 3 + lib/mqtt.c | 561 ++++++++++++++++++++++++++ lib/mqtt.h | 49 +++ lib/url.c | 5 + lib/urldata.h | 3 + lib/version.c | 3 + 18 files changed, 766 insertions(+), 57 deletions(-) create mode 100644 docs/MQTT.md create mode 100644 lib/mqtt.c create mode 100644 lib/mqtt.h diff --git a/CMakeLists.txt b/CMakeLists.txt index a410d4955..b8061d14d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -172,6 +172,8 @@ option(CURL_DISABLE_SMTP "to disable SMTP" OFF) mark_as_advanced(CURL_DISABLE_SMTP) option(CURL_DISABLE_GOPHER "to disable Gopher" OFF) mark_as_advanced(CURL_DISABLE_GOPHER) +option(CURL_ENABLE_MQTT "to enable MQTT" OFF) +mark_as_advanced(CURL_ENABLE_MQTT) if(HTTP_ONLY) set(CURL_DISABLE_FTP ON) @@ -1307,6 +1309,7 @@ _add_if("SCP" USE_LIBSSH2) _add_if("SFTP" USE_LIBSSH2) _add_if("RTSP" NOT CURL_DISABLE_RTSP) _add_if("RTMP" USE_LIBRTMP) +_add_if("MQTT" CURL_ENABLE_MQTT) if(_items) list(SORT _items) endif() diff --git a/configure.ac b/configure.ac index 0479099e8..0213b1cc9 100755 --- a/configure.ac +++ b/configure.ac @@ -637,6 +637,22 @@ AC_HELP_STRING([--disable-gopher],[Disable Gopher support]), AC_MSG_RESULT(yes) ) +AC_MSG_CHECKING([whether to support mqtt]) +AC_ARG_ENABLE(mqtt, +AC_HELP_STRING([--enable-mqtt],[Enable MQTT support]) +AC_HELP_STRING([--disable-mqtt],[Disable MQTT support]), +[ case "$enableval" in + no) + AC_MSG_RESULT(no) + ;; + *) AC_MSG_RESULT(yes) + experimental="$experimental MQTT" + AC_DEFINE(CURL_ENABLE_MQTT, 1, [to enable MQTT]) + AC_SUBST(CURL_ENABLE_MQTT, [1]) + ;; + esac ], + AC_MSG_RESULT(no) +) dnl ********************************************************************** dnl Check for built-in manual @@ -4843,6 +4859,9 @@ fi if test "x$CURL_DISABLE_GOPHER" != "x1"; then SUPPORT_PROTOCOLS="$SUPPORT_PROTOCOLS GOPHER" fi +if test "x$CURL_ENABLE_MQTT" = "x1"; then + SUPPORT_PROTOCOLS="$SUPPORT_PROTOCOLS MQTT" +fi if test "x$CURL_DISABLE_POP3" != "x1"; then SUPPORT_PROTOCOLS="$SUPPORT_PROTOCOLS POP3" if test "x$SSL_ENABLED" = "x1"; then diff --git a/docs/EXPERIMENTAL.md b/docs/EXPERIMENTAL.md index 6c33bcf53..34974fba8 100644 --- a/docs/EXPERIMENTAL.md +++ b/docs/EXPERIMENTAL.md @@ -20,3 +20,4 @@ Experimental support in curl means: - HTTP/3 support and options - alt-svc support and options + - MQTT diff --git a/docs/FEATURES b/docs/FEATURES index 68d38fc27..1d23fccbf 100644 --- a/docs/FEATURES +++ b/docs/FEATURES @@ -180,6 +180,9 @@ IMAPS (*1) - explicit "STARTTLS" usage to "upgrade" plain imap:// connections to use SSL - via http-proxy +MQTT + - Subscribe to and publish topics using url scheme mqtt://broker/topic + FOOTNOTES ========= diff --git a/docs/MQTT.md b/docs/MQTT.md new file mode 100644 index 000000000..1c7b678aa --- /dev/null +++ b/docs/MQTT.md @@ -0,0 +1,61 @@ +# MQTT in curl + +## Experimental! + +MQTT support in curl is considered **EXPERIMENTAL** until further notice. It +needs to be enabled at build-time. See below. + +After the initial merge, further development and tweaking of the MQTT support +in curl will happen in the master branch using pull-requests, just like +ordinary changes. + +Experimental support for MQTT means that we **do not guarantee** that the +current protocol functionality will remain or remain this way going forward. +There are no API or ABI promises for experimental features as for regular curl +features. + +Do not ship anything with this enabled. + +## Build + + ./configure --enable-mqtt + +## Usage + +A plain "GET" subscribes to the topic and prints all published messages. +Doing a "POST" publishes the post data to the topic and exits. + +Example subscribe: + + curl mqtt://host/home/bedroom/temp + +Example publish: + + curl -d 80 mqtt://host/home/bedroom/dimmer + +## What does curl deliver as a response to a subscribe + +It outputs two bytes topic length (MSB | LSB), the topic followed by the +payload. + +## Caveats + +Remaining limitations: + - No username support + - Only QoS level 0 is implemented for publish + - No way to set retain flag for publish + - No username/password support + - No TLS (mqtts) support + - Naive EAGAIN handling won't handle split messages + +## Work + +1. Write a mqtt server for the test suite +2. Create a few tests verifying the existing mqtt functionality +3. Work on fixing some of the worst limitations - with accompanying tests +4. Consider replacing the client-side MQTT code with wolfMQTT + +## Credits + +The initial MQTT patch was authored by Björn Stenberg. This work is built upon +that patch and has been expanded since. diff --git a/docs/Makefile.am b/docs/Makefile.am index 7acce0b04..6ead27b79 100644 --- a/docs/Makefile.am +++ b/docs/Makefile.am @@ -71,6 +71,7 @@ EXTRA_DIST = \ KNOWN_BUGS \ LICENSE-MIXING.md \ MAIL-ETIQUETTE \ + MQTT.md \ PARALLEL-TRANSFERS.md \ README.md \ RELEASE-PROCEDURE.md \ diff --git a/docs/cmdline-opts/data.d b/docs/cmdline-opts/data.d index 8b5200d34..280d38bc0 100644 --- a/docs/cmdline-opts/data.d +++ b/docs/cmdline-opts/data.d @@ -2,7 +2,7 @@ Long: data Short: d Arg: Help: HTTP POST data -Protocols: HTTP +Protocols: HTTP MQTT See-also: data-binary data-urlencode data-raw Mutexed: form head upload-file --- diff --git a/docs/cmdline-opts/page-header b/docs/cmdline-opts/page-header index 3f0b1c33e..60c3b07fe 100644 --- a/docs/cmdline-opts/page-header +++ b/docs/cmdline-opts/page-header @@ -31,8 +31,8 @@ curl \- transfer a URL .B curl is a tool to transfer data from or to a server, using one of the supported protocols (DICT, FILE, FTP, FTPS, GOPHER, HTTP, HTTPS, IMAP, IMAPS, LDAP, -LDAPS, POP3, POP3S, RTMP, RTSP, SCP, SFTP, SMB, SMBS, SMTP, SMTPS, TELNET -and TFTP). The command is designed to work without user interaction. +LDAPS, MQTT, POP3, POP3S, RTMP, RTSP, SCP, SFTP, SMB, SMBS, SMTP, SMTPS, +TELNET and TFTP). The command is designed to work without user interaction. curl offers a busload of useful tricks like proxy support, user authentication, FTP upload, HTTP post, SSL connections, cookies, file transfer diff --git a/docs/libcurl/opts/CURLINFO_PROTOCOL.3 b/docs/libcurl/opts/CURLINFO_PROTOCOL.3 index 2f5a3edfc..5825669c4 100644 --- a/docs/libcurl/opts/CURLINFO_PROTOCOL.3 +++ b/docs/libcurl/opts/CURLINFO_PROTOCOL.3 @@ -38,7 +38,7 @@ CURLPROTO_IMAPS, CURLPROTO_LDAP, CURLPROTO_LDAPS, CURLPROTO_POP3, CURLPROTO_POP3S, CURLPROTO_RTMP, CURLPROTO_RTMPE, CURLPROTO_RTMPS, CURLPROTO_RTMPT, CURLPROTO_RTMPTE, CURLPROTO_RTMPTS, CURLPROTO_RTSP, CURLPROTO_SCP, CURLPROTO_SFTP, CURLPROTO_SMB, CURLPROTO_SMBS, CURLPROTO_SMTP, -CURLPROTO_SMTPS, CURLPROTO_TELNET, CURLPROTO_TFTP +CURLPROTO_SMTPS, CURLPROTO_TELNET, CURLPROTO_TFTP, CURLPROTO_MQTT .SH PROTOCOLS All .SH EXAMPLE diff --git a/docs/libcurl/symbols-in-versions b/docs/libcurl/symbols-in-versions index c2ed53880..3b340ed8e 100644 --- a/docs/libcurl/symbols-in-versions +++ b/docs/libcurl/symbols-in-versions @@ -676,6 +676,7 @@ CURLPROTO_IMAP 7.20.0 CURLPROTO_IMAPS 7.20.0 CURLPROTO_LDAP 7.19.4 CURLPROTO_LDAPS 7.19.4 +CURLPROTO_MQTT 7.71.0 CURLPROTO_POP3 7.20.0 CURLPROTO_POP3S 7.20.0 CURLPROTO_RTMP 7.21.0 diff --git a/include/curl/curl.h b/include/curl/curl.h index 17f07b09f..00156a8b4 100644 --- a/include/curl/curl.h +++ b/include/curl/curl.h @@ -937,6 +937,7 @@ typedef enum { #define CURLPROTO_GOPHER (1<<25) #define CURLPROTO_SMB (1<<26) #define CURLPROTO_SMBS (1<<27) +#define CURLPROTO_MQTT (1<<28) #define CURLPROTO_ALL (~0) /* enable everything */ /* long may be 32 or 64 bits, but we should never depend on anything else diff --git a/lib/Makefile.inc b/lib/Makefile.inc index 46ded90bb..e3cf41891 100644 --- a/lib/Makefile.inc +++ b/lib/Makefile.inc @@ -20,71 +20,66 @@ # ########################################################################### -LIB_VAUTH_CFILES = vauth/vauth.c vauth/cleartext.c vauth/cram.c \ - vauth/digest.c vauth/digest_sspi.c vauth/krb5_gssapi.c \ - vauth/krb5_sspi.c vauth/ntlm.c vauth/ntlm_sspi.c vauth/oauth2.c \ - vauth/spnego_gssapi.c vauth/spnego_sspi.c +LIB_VAUTH_CFILES = vauth/cleartext.c vauth/cram.c vauth/digest.c \ + vauth/digest_sspi.c vauth/krb5_gssapi.c vauth/krb5_sspi.c vauth/ntlm.c \ + vauth/ntlm_sspi.c vauth/oauth2.c vauth/spnego_gssapi.c vauth/spnego_sspi.c \ + vauth/vauth.c -LIB_VAUTH_HFILES = vauth/vauth.h vauth/digest.h vauth/ntlm.h +LIB_VAUTH_HFILES = vauth/digest.h vauth/ntlm.h vauth/vauth.h -LIB_VTLS_CFILES = vtls/openssl.c vtls/gtls.c vtls/vtls.c vtls/nss.c \ - vtls/mbedtls_threadlock.c vtls/wolfssl.c vtls/schannel.c \ - vtls/schannel_verify.c vtls/sectransp.c vtls/gskit.c vtls/mbedtls.c \ - vtls/mesalink.c vtls/bearssl.c +LIB_VTLS_CFILES = vtls/bearssl.c vtls/gskit.c vtls/gtls.c vtls/mbedtls.c \ + vtls/mbedtls_threadlock.c vtls/mesalink.c vtls/nss.c vtls/openssl.c \ + vtls/schannel.c vtls/schannel_verify.c vtls/sectransp.c vtls/vtls.c \ + vtls/wolfssl.c -LIB_VTLS_HFILES = vtls/openssl.h vtls/vtls.h vtls/gtls.h vtls/nssg.h \ - vtls/mbedtls_threadlock.h vtls/wolfssl.h vtls/schannel.h \ - vtls/sectransp.h vtls/gskit.h vtls/mbedtls.h vtls/mesalink.h \ - vtls/bearssl.h +LIB_VTLS_HFILES = vtls/bearssl.h vtls/gskit.h vtls/gtls.h vtls/mbedtls.h \ + vtls/mbedtls_threadlock.h vtls/mesalink.h vtls/nssg.h vtls/openssl.h \ + vtls/schannel.h vtls/sectransp.h vtls/vtls.h vtls/wolfssl.h LIB_VQUIC_CFILES = vquic/ngtcp2.c vquic/quiche.c LIB_VQUIC_HFILES = vquic/ngtcp2.h vquic/quiche.h -LIB_VSSH_CFILES = vssh/libssh2.c vssh/libssh.c vssh/wolfssh.c +LIB_VSSH_CFILES = vssh/libssh.c vssh/libssh2.c vssh/wolfssh.c LIB_VSSH_HFILES = vssh/ssh.h -LIB_CFILES = file.c timeval.c base64.c hostip.c progress.c formdata.c \ - cookie.c http.c sendf.c ftp.c url.c dict.c if2ip.c speedcheck.c \ - ldap.c version.c getenv.c escape.c mprintf.c telnet.c netrc.c \ - getinfo.c transfer.c strcase.c easy.c security.c curl_fnmatch.c \ - fileinfo.c ftplistparser.c wildcard.c krb5.c memdebug.c http_chunks.c \ - strtok.c connect.c llist.c hash.c multi.c content_encoding.c share.c \ - http_digest.c md4.c md5.c http_negotiate.c inet_pton.c strtoofft.c \ - strerror.c amigaos.c hostasyn.c hostip4.c hostip6.c hostsyn.c \ - inet_ntop.c parsedate.c select.c tftp.c splay.c strdup.c socks.c \ - curl_addrinfo.c socks_gssapi.c socks_sspi.c \ - curl_sspi.c slist.c nonblock.c curl_memrchr.c imap.c pop3.c smtp.c \ - pingpong.c rtsp.c curl_threads.c warnless.c hmac.c curl_rtmp.c \ - openldap.c curl_gethostname.c gopher.c idn_win32.c \ - http_proxy.c non-ascii.c asyn-ares.c asyn-thread.c curl_gssapi.c \ - http_ntlm.c curl_ntlm_wb.c curl_ntlm_core.c curl_sasl.c rand.c \ - curl_multibyte.c hostcheck.c conncache.c dotdot.c \ - x509asn1.c http2.c smb.c curl_endian.c curl_des.c system_win32.c \ - mime.c sha256.c setopt.c curl_path.c curl_ctype.c curl_range.c psl.c \ - doh.c urlapi.c curl_get_line.c altsvc.c socketpair.c rename.c +LIB_CFILES = altsvc.c amigaos.c asyn-ares.c asyn-thread.c base64.c \ + conncache.c connect.c content_encoding.c cookie.c curl_addrinfo.c \ + curl_ctype.c curl_des.c curl_endian.c curl_fnmatch.c curl_get_line.c \ + curl_gethostname.c curl_gssapi.c curl_memrchr.c curl_multibyte.c \ + curl_ntlm_core.c curl_ntlm_wb.c curl_path.c curl_range.c curl_rtmp.c \ + curl_sasl.c curl_sspi.c curl_threads.c dict.c dotdot.c easy.c escape.c \ + file.c fileinfo.c formdata.c ftp.c url.c ftplistparser.c getenv.c getinfo.c \ + gopher.c hash.c hmac.c hostasyn.c hostcheck.c hostip.c hostip4.c hostip6.c \ + hostsyn.c http.c http2.c http_chunks.c http_digest.c http_negotiate.c \ + http_ntlm.c http_proxy.c idn_win32.c if2ip.c imap.c inet_ntop.c inet_pton.c \ + krb5.c ldap.c llist.c md4.c md5.c memdebug.c mime.c mprintf.c mqtt.c \ + multi.c netrc.c non-ascii.c nonblock.c openldap.c parsedate.c pingpong.c \ + pop3.c progress.c psl.c doh.c rand.c rename.c rtsp.c security.c select.c \ + sendf.c setopt.c sha256.c share.c slist.c smb.c smtp.c socketpair.c socks.c \ + socks_gssapi.c socks_sspi.c speedcheck.c splay.c strcase.c strdup.c \ + strerror.c strtok.c strtoofft.c system_win32.c telnet.c tftp.c timeval.c \ + transfer.c urlapi.c version.c warnless.c wildcard.c x509asn1.c -LIB_HFILES = arpa_telnet.h netrc.h file.h timeval.h hostip.h progress.h \ - formdata.h cookie.h http.h sendf.h ftp.h url.h dict.h if2ip.h \ - speedcheck.h urldata.h curl_ldap.h escape.h telnet.h getinfo.h \ - strcase.h curl_sec.h memdebug.h http_chunks.h curl_fnmatch.h \ - wildcard.h fileinfo.h ftplistparser.h strtok.h connect.h llist.h \ - hash.h content_encoding.h share.h curl_md4.h curl_md5.h http_digest.h \ - http_negotiate.h inet_pton.h amigaos.h strtoofft.h strerror.h \ - inet_ntop.h curlx.h curl_memory.h curl_setup.h transfer.h select.h \ - easyif.h multiif.h parsedate.h tftp.h sockaddr.h splay.h strdup.h \ - socks.h curl_base64.h curl_addrinfo.h curl_sspi.h \ - slist.h nonblock.h curl_memrchr.h imap.h pop3.h smtp.h pingpong.h \ - rtsp.h curl_threads.h warnless.h curl_hmac.h curl_rtmp.h \ - curl_gethostname.h gopher.h http_proxy.h non-ascii.h asyn.h \ - http_ntlm.h curl_gssapi.h curl_ntlm_wb.h curl_ntlm_core.h \ - curl_sasl.h curl_multibyte.h hostcheck.h conncache.h \ - curl_setup_once.h multihandle.h setup-vms.h dotdot.h \ - x509asn1.h http2.h sigpipe.h smb.h curl_endian.h curl_des.h \ - curl_printf.h system_win32.h rand.h mime.h curl_sha256.h setopt.h \ - curl_path.h curl_ctype.h curl_range.h psl.h doh.h urlapi-int.h \ - curl_get_line.h altsvc.h quic.h socketpair.h rename.h +LIB_HFILES = altsvc.h amigaos.h arpa_telnet.h asyn.h conncache.h connect.h \ + content_encoding.h cookie.h curl_addrinfo.h curl_base64.h curl_ctype.h \ + curl_des.h curl_endian.h curl_fnmatch.h curl_get_line.h curl_gethostname.h \ + curl_gssapi.h curl_hmac.h curl_ldap.h curl_md4.h curl_md5.h curl_memory.h \ + curl_memrchr.h curl_multibyte.h curl_ntlm_core.h curl_ntlm_wb.h curl_path.h \ + curl_printf.h curl_range.h curl_rtmp.h curl_sasl.h curl_sec.h curl_setup.h \ + curl_setup_once.h curl_sha256.h curl_sspi.h curl_threads.h curlx.h dict.h \ + dotdot.h easyif.h escape.h file.h fileinfo.h formdata.h ftp.h url.h \ + ftplistparser.h getinfo.h gopher.h hash.h hostcheck.h hostip.h http.h \ + http2.h http_chunks.h http_digest.h http_negotiate.h http_ntlm.h \ + http_proxy.h if2ip.h imap.h inet_ntop.h inet_pton.h llist.h memdebug.h \ + mime.h mqtt.h multihandle.h multiif.h netrc.h non-ascii.h nonblock.h \ + parsedate.h pingpong.h pop3.h progress.h psl.h doh.h quic.h rand.h rename.h \ + rtsp.h select.h sendf.h setopt.h setup-vms.h share.h sigpipe.h slist.h \ + smb.h smtp.h sockaddr.h socketpair.h socks.h speedcheck.h splay.h strcase.h \ + strdup.h strerror.h strtok.h strtoofft.h system_win32.h telnet.h tftp.h \ + timeval.h transfer.h urlapi-int.h urldata.h warnless.h wildcard.h \ + x509asn1.h LIB_RCFILES = libcurl.rc diff --git a/lib/curl_config.h.cmake b/lib/curl_config.h.cmake index 24b693eec..57a86e50a 100644 --- a/lib/curl_config.h.cmake +++ b/lib/curl_config.h.cmake @@ -63,6 +63,9 @@ /* to disable LDAPS */ #cmakedefine CURL_DISABLE_LDAPS 1 +/* to enable MQTT */ +#undef CURL_ENABLE_MQTT + /* to disable POP3 */ #cmakedefine CURL_DISABLE_POP3 1 diff --git a/lib/mqtt.c b/lib/mqtt.c new file mode 100644 index 000000000..3e244694d --- /dev/null +++ b/lib/mqtt.c @@ -0,0 +1,561 @@ +/*************************************************************************** + * _ _ ____ _ + * Project ___| | | | _ \| | + * / __| | | | |_) | | + * | (__| |_| | _ <| |___ + * \___|\___/|_| \_\_____| + * + * Copyright (C) 2020, Daniel Stenberg, , et al. + * Copyright (C) 2019, Björn Stenberg, + * + * This software is licensed as described in the file COPYING, which + * you should have received as part of this distribution. The terms + * are also available at https://curl.haxx.se/docs/copyright.html. + * + * You may opt to use, copy, modify, merge, publish, distribute and/or sell + * copies of the Software, and permit persons to whom the Software is + * furnished to do so, under the terms of the COPYING file. + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY + * KIND, either express or implied. + * + ***************************************************************************/ + +#include "curl_setup.h" + +#ifdef CURL_ENABLE_MQTT + +#include "urldata.h" +#include +#include "transfer.h" +#include "sendf.h" +#include "progress.h" +#include "mqtt.h" +#include "select.h" +#include "strdup.h" +#include "url.h" +#include "escape.h" +#include "warnless.h" +#include "curl_printf.h" +#include "curl_memory.h" +#include "multiif.h" +#include "rand.h" + +/* The last #include file should be: */ +#include "memdebug.h" + +#define MQTT_MSG_CONNECT 0x10 +#define MQTT_MSG_CONNACK 0x20 +#define MQTT_MSG_PUBLISH 0x30 +#define MQTT_MSG_SUBSCRIBE 0x82 +#define MQTT_MSG_SUBACK 0x90 +#define MQTT_MSG_DISCONNECT 0xe0 + +#define MQTT_CONNACK_LEN 4 +#define MQTT_SUBACK_LEN 5 +#define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */ + +/* + * Forward declarations. + */ + +static CURLcode mqtt_do(struct connectdata *conn, bool *done); +static CURLcode mqtt_doing(struct connectdata *conn, bool *done); +static int mqtt_getsock(struct connectdata *conn, curl_socket_t *sock); +static CURLcode mqtt_setup_conn(struct connectdata *conn); + +/* + * MQTT protocol handler. + */ + +const struct Curl_handler Curl_handler_mqtt = { + "MQTT", /* scheme */ + mqtt_setup_conn, /* setup_connection */ + mqtt_do, /* do_it */ + ZERO_NULL, /* done */ + ZERO_NULL, /* do_more */ + ZERO_NULL, /* connect_it */ + ZERO_NULL, /* connecting */ + mqtt_doing, /* doing */ + ZERO_NULL, /* proto_getsock */ + mqtt_getsock, /* doing_getsock */ + ZERO_NULL, /* domore_getsock */ + ZERO_NULL, /* perform_getsock */ + ZERO_NULL, /* disconnect */ + ZERO_NULL, /* readwrite */ + ZERO_NULL, /* connection_check */ + PORT_MQTT, /* defport */ + CURLPROTO_MQTT, /* protocol */ + PROTOPT_NONE /* flags */ +}; + +static CURLcode mqtt_setup_conn(struct connectdata *conn) +{ + /* allocate the HTTP-specific struct for the Curl_easy, only to survive + during this request */ + struct MQTT *mq; + struct Curl_easy *data = conn->data; + DEBUGASSERT(data->req.protop == NULL); + + mq = calloc(1, sizeof(struct MQTT)); + if(!mq) + return CURLE_OUT_OF_MEMORY; + data->req.protop = mq; + return CURLE_OK; +} + +static CURLcode mqtt_send(struct connectdata *conn, + char *buf, size_t len) +{ + CURLcode result = CURLE_OK; + curl_socket_t sockfd = conn->sock[FIRSTSOCKET]; + struct Curl_easy *data = conn->data; + struct MQTT *mq = data->req.protop; + ssize_t n; + result = Curl_write(conn, sockfd, buf, len, &n); + if(!result && data->set.verbose) + Curl_debug(data, CURLINFO_HEADER_OUT, buf, (size_t)n); + if(len != (size_t)n) { + size_t nsend = len - n; + char *sendleftovers = Curl_memdup(&buf[n], nsend); + if(!sendleftovers) + return CURLE_OUT_OF_MEMORY; + mq->sendleftovers = sendleftovers; + mq->nsend = nsend; + } + return result; +} + +/* Generic function called by the multi interface to figure out what socket(s) + to wait for and for what actions during the DOING and PROTOCONNECT + states */ +static int mqtt_getsock(struct connectdata *conn, + curl_socket_t *sock) +{ + sock[0] = conn->sock[FIRSTSOCKET]; + return GETSOCK_READSOCK(FIRSTSOCKET); +} + +static CURLcode mqtt_connect(struct connectdata *conn) +{ + CURLcode result = CURLE_OK; + const size_t client_id_offset = 14; + const size_t packetlen = client_id_offset + MQTT_CLIENTID_LEN; + char client_id[MQTT_CLIENTID_LEN + 1] = "curl"; + const size_t curl_len = strlen("curl"); + char packet[32] = { + MQTT_MSG_CONNECT, /* packet type */ + 0x00, /* remaining length */ + 0x00, 0x04, /* protocol length */ + 'M','Q','T','T', /* protocol name */ + 0x04, /* protocol level */ + 0x02, /* CONNECT flag: CleanSession */ + 0x00, 0x3c, /* keep-alive 0 = disabled */ + 0x00, 0x00 /* payload1 length */ + }; + packet[1] = (packetlen - 2) & 0x7f; + packet[client_id_offset - 1] = MQTT_CLIENTID_LEN; + + result = Curl_rand_hex(conn->data, (unsigned char *)&client_id[curl_len], + MQTT_CLIENTID_LEN - curl_len + 1); + memcpy(&packet[client_id_offset], client_id, MQTT_CLIENTID_LEN); + infof(conn->data, "Using client id '%s'\n", client_id); + if(!result) + result = mqtt_send(conn, packet, packetlen); + return result; +} + +static CURLcode mqtt_disconnect(struct connectdata *conn) +{ + CURLcode result = CURLE_OK; + result = mqtt_send(conn, (char *)"\xe0\x00", 2); + return result; +} + +static CURLcode mqtt_verify_connack(struct connectdata *conn) +{ + CURLcode result; + curl_socket_t sockfd = conn->sock[FIRSTSOCKET]; + unsigned char readbuf[MQTT_CONNACK_LEN]; + ssize_t nread; + struct Curl_easy *data = conn->data; + + result = Curl_read(conn, sockfd, (char *)readbuf, MQTT_CONNACK_LEN, &nread); + if(result) + goto fail; + + if(data->set.verbose) + Curl_debug(data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread); + + /* fixme */ + if(nread < MQTT_CONNACK_LEN) { + result = CURLE_WEIRD_SERVER_REPLY; + goto fail; + } + + /* verify CONNACK */ + if(readbuf[0] != MQTT_MSG_CONNACK || + readbuf[1] != 0x02 || + readbuf[2] != 0x00 || + readbuf[3] != 0x00) { + failf(data, "Expected %02x%02x%02x%02x but got %02x%02x%02x%02x", + MQTT_MSG_CONNACK, 0x02, 0x00, 0x00, + readbuf[0], readbuf[1], readbuf[2], readbuf[3]); + result = CURLE_WEIRD_SERVER_REPLY; + } + +fail: + return result; +} + +static CURLcode mqtt_get_topic(struct connectdata *conn, + char **topic, size_t *topiclen) +{ + CURLcode result = CURLE_OK; + char *path = conn->data->state.up.path; + + if(strlen(path) > 1) { + result = Curl_urldecode(conn->data, path + 1, 0, topic, topiclen, FALSE); + } + else { + failf(conn->data, "Error: No topic specified."); + result = CURLE_URL_MALFORMAT; + } + return result; +} + + +static int mqtt_encode_len(char *buf, size_t len) +{ + unsigned char encoded; + int i; + + for(i = 0; (len > 0) && (i<4); i++) { + encoded = len % 0x80; + len /= 0x80; + if(len) + encoded |= 0x80; + buf[i] = encoded; + } + + return i; +} + +static CURLcode mqtt_subscribe(struct connectdata *conn) +{ + CURLcode result = CURLE_OK; + char *topic = NULL; + size_t topiclen; + unsigned char *packet = NULL; + size_t packetlen; + char encodedsize[4]; + size_t n; + + result = mqtt_get_topic(conn, &topic, &topiclen); + if(result) + goto fail; + + conn->proto.mqtt.packetid++; + + packetlen = topiclen + 5; /* packetid + topic (has a two byte length field) + + 2 bytes topic length + QoS byte */ + n = mqtt_encode_len((char *)encodedsize, packetlen); + packetlen += n + 1; /* add one for the control packet type byte */ + + packet = malloc(packetlen); + if(!packet) { + result = CURLE_OUT_OF_MEMORY; + goto fail; + } + + packet[0] = MQTT_MSG_SUBSCRIBE; + memcpy(&packet[1], encodedsize, n); + packet[1 + n] = (conn->proto.mqtt.packetid >> 8) & 0xff; + packet[2 + n] = conn->proto.mqtt.packetid & 0xff; + packet[3 + n] = (topiclen >> 8) & 0xff; + packet[4 + n ] = topiclen & 0xff; + memcpy(&packet[5 + n], topic, topiclen); + packet[5 + n + topiclen] = 0; /* QoS zero */ + + result = mqtt_send(conn, (char *)packet, packetlen); + +fail: + free(topic); + free(packet); + return result; +} + +static CURLcode mqtt_verify_suback(struct connectdata *conn) +{ + CURLcode result; + curl_socket_t sockfd = conn->sock[FIRSTSOCKET]; + unsigned char readbuf[MQTT_SUBACK_LEN]; + ssize_t nread; + struct mqtt_conn *mqtt = &conn->proto.mqtt; + + result = Curl_read(conn, sockfd, (char *)readbuf, MQTT_SUBACK_LEN, &nread); + if(result) + goto fail; + + if(conn->data->set.verbose) + Curl_debug(conn->data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread); + + /* fixme */ + if(nread < MQTT_SUBACK_LEN) { + result = CURLE_WEIRD_SERVER_REPLY; + goto fail; + } + + /* verify SUBACK */ + if(readbuf[0] != MQTT_MSG_SUBACK || + readbuf[1] != 0x03 || + readbuf[2] != ((mqtt->packetid >> 8) & 0xff) || + readbuf[3] != (mqtt->packetid & 0xff) || + readbuf[4] != 0x00) + result = CURLE_WEIRD_SERVER_REPLY; + +fail: + return result; +} + +static CURLcode mqtt_publish(struct connectdata *conn) +{ + CURLcode result; + char *payload = conn->data->set.postfields; + size_t payloadlen = (size_t)conn->data->set.postfieldsize; + char *topic = NULL; + size_t topiclen; + unsigned char *pkt = NULL; + size_t i = 0; + size_t remaininglength; + size_t encodelen; + char encodedbytes[4]; + + result = mqtt_get_topic(conn, &topic, &topiclen); + if(result) + goto fail; + + remaininglength = payloadlen + 2 + topiclen; + encodelen = mqtt_encode_len(encodedbytes, remaininglength); + + /* add the control byte and the encoded remaining length */ + pkt = malloc(remaininglength + 1 + encodelen); + if(!pkt) { + result = CURLE_OUT_OF_MEMORY; + goto fail; + } + + /* assemble packet */ + pkt[i++] = MQTT_MSG_PUBLISH; + memcpy(&pkt[i], encodedbytes, encodelen); + i += encodelen; + pkt[i++] = (topiclen >> 8) & 0xff; + pkt[i++] = (topiclen & 0xff); + memcpy(&pkt[i], topic, topiclen); + i += topiclen; + memcpy(&pkt[i], payload, payloadlen); + i += payloadlen; + result = mqtt_send(conn, (char *)pkt, i); + +fail: + free(pkt); + free(topic); + return result; +} + +static size_t mqtt_decode_len(unsigned char *buf, + size_t buflen, size_t *lenbytes) +{ + size_t len = 0; + size_t mult = 1; + size_t i; + unsigned char encoded = 128; + + for(i = 0; (i < buflen) && (encoded & 128); i++) { + encoded = buf[i]; + len += (encoded & 127) * mult; + mult *= 128; + } + + *lenbytes = i; + + return len; +} + +/* for the publish packet */ +#define MQTT_HEADER_LEN 5 /* max 5 bytes */ + +static CURLcode mqtt_read_publish(struct connectdata *conn, + bool *done) +{ + CURLcode result; + curl_socket_t sockfd = conn->sock[FIRSTSOCKET]; + ssize_t nread; + struct Curl_easy *data = conn->data; + unsigned char *pkt = (unsigned char *)data->state.buffer; + size_t remlen, lenbytes; + struct mqtt_conn *mqtt = &conn->proto.mqtt; + struct MQTT *mq = data->req.protop; + + switch(mqtt->state) { + case MQTT_SUBWAIT: + /* Read the initial byte and the entire Remaining Length field + in this state */ + result = Curl_read(conn, sockfd, (char *)&pkt[mq->npacket], 1, &nread); + if(result) + goto end; + if(data->set.verbose) + Curl_debug(data, CURLINFO_HEADER_IN, (char *)&pkt[mq->npacket], 1); + /* we are expecting a PUBLISH message */ + if(!mq->npacket && ((pkt[0] & 0xf0) != MQTT_MSG_PUBLISH)) { + if(pkt[0] == MQTT_MSG_DISCONNECT) { + infof(data, "Got DISCONNECT\n"); + *done = TRUE; + goto end; + } + result = CURLE_WEIRD_SERVER_REPLY; + goto end; + } + else if((mq->npacket >= 1) && !(pkt[mq->npacket] & 0x80)) + /* as long as the high bit is set in the length byte, we read one more + byte, then get the remainder of the PUBLISH */ + mqtt->state = MQTT_SUB_REMAIN; + mq->npacket++; + if(mqtt->state == MQTT_SUBWAIT) + return result; + + /* -- switched state -- */ + + /* remember the first byte */ + mq->firstbyte = pkt[0]; + + remlen = mqtt_decode_len(&pkt[1], 4, &lenbytes); + + infof(data, "Remaining length: %zd bytes\n", remlen); + Curl_pgrsSetDownloadSize(data, remlen); + data->req.bytecount = 0; + data->req.size = remlen; + mq->npacket = remlen; /* get this many bytes */ + /* FALLTHROUGH */ + case MQTT_SUB_REMAIN: { + /* read rest of packet, but no more. Cap to buffer size */ + struct SingleRequest *k = &data->req; + size_t rest = mq->npacket; + if(rest > (size_t)data->set.buffer_size) + rest = (size_t)data->set.buffer_size; + result = Curl_read(conn, sockfd, (char *)pkt, rest, &nread); + if(result) { + if(CURLE_AGAIN == result) { + infof(data, "EEEE AAAAGAIN\n"); + } + goto end; + } + if(data->set.verbose) + Curl_debug(data, CURLINFO_DATA_IN, (char *)pkt, (size_t)nread); + + mq->npacket -= nread; + k->bytecount += nread; + Curl_pgrsSetDownloadCounter(data, k->bytecount); + + /* if QoS is set, message contains packet id */ + + result = Curl_client_write(conn, CLIENTWRITE_BODY, (char *)pkt, nread); + if(result) + goto end; + + if(!mq->npacket) + /* no more PUBLISH payload, back to subscribe wait state */ + mqtt->state = MQTT_SUBWAIT; + break; + } + default: + DEBUGASSERT(NULL); /* illegal state */ + result = CURLE_WEIRD_SERVER_REPLY; + goto end; + } + end: + return result; +} + +static CURLcode mqtt_do(struct connectdata *conn, bool *done) +{ + CURLcode result = CURLE_OK; + struct Curl_easy *data = conn->data; + struct mqtt_conn *mqtt = &conn->proto.mqtt; + + *done = FALSE; /* unconditionally */ + + result = mqtt_connect(conn); + if(result) { + failf(data, "Error %d sending MQTT CONN request", result); + return result; + } + mqtt->state = MQTT_CONNACK; + return CURLE_OK; +} + +static CURLcode mqtt_doing(struct connectdata *conn, bool *done) +{ + CURLcode result = CURLE_OK; + struct mqtt_conn *mqtt = &conn->proto.mqtt; + struct Curl_easy *data = conn->data; + struct MQTT *mq = data->req.protop; + + *done = FALSE; + + if(mq->nsend) { + /* send the remainder of an outgoing packet */ + char *ptr = mq->sendleftovers; + result = mqtt_send(conn, mq->sendleftovers, mq->nsend); + free(ptr); + if(result) + return result; + } + + switch(mqtt->state) { + case MQTT_CONNACK: + result = mqtt_verify_connack(conn); + if(result) + break; + + if(conn->data->set.httpreq == HTTPREQ_POST) { + result = mqtt_publish(conn); + if(!result) { + result = mqtt_disconnect(conn); + *done = TRUE; + } + } + else { + result = mqtt_subscribe(conn); + if(!result) + mqtt->state = MQTT_SUBACK; + } + break; + + case MQTT_SUBACK: + result = mqtt_verify_suback(conn); + if(result) + break; + + mqtt->state = MQTT_SUBWAIT; + break; + + case MQTT_SUBWAIT: + case MQTT_SUB_REMAIN: + result = mqtt_read_publish(conn, done); + if(result) + break; + break; + + default: + failf(conn->data, "State not handled yet"); + *done = TRUE; + break; + } + + if(result == CURLE_AGAIN) + result = CURLE_OK; + return result; +} + +#endif /* CURL_ENABLE_MQTT */ diff --git a/lib/mqtt.h b/lib/mqtt.h new file mode 100644 index 000000000..b5e447be5 --- /dev/null +++ b/lib/mqtt.h @@ -0,0 +1,49 @@ +#ifndef HEADER_CURL_MQTT_H +#define HEADER_CURL_MQTT_H +/*************************************************************************** + * _ _ ____ _ + * Project ___| | | | _ \| | + * / __| | | | |_) | | + * | (__| |_| | _ <| |___ + * \___|\___/|_| \_\_____| + * + * Copyright (C) 2019 - 2020, Björn Stenberg, + * + * This software is licensed as described in the file COPYING, which + * you should have received as part of this distribution. The terms + * are also available at https://curl.haxx.se/docs/copyright.html. + * + * You may opt to use, copy, modify, merge, publish, distribute and/or sell + * copies of the Software, and permit persons to whom the Software is + * furnished to do so, under the terms of the COPYING file. + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY + * KIND, either express or implied. + * + ***************************************************************************/ + +#ifdef CURL_ENABLE_MQTT +extern const struct Curl_handler Curl_handler_mqtt; +#endif + +struct mqtt_conn { + enum { + MQTT_CONNACK, + MQTT_SUBACK, + MQTT_SUBWAIT, /* wait for subscribe response */ + MQTT_SUB_REMAIN /* wait for the remainder of the subscribe response */ + } state; + unsigned int packetid; +}; + +/* protocol-specific transfer-related data */ +struct MQTT { + char *sendleftovers; + size_t nsend; /* size of sendleftovers */ + + /* when receving a PUBLISH */ + size_t npacket; /* byte counter */ + unsigned char firstbyte; +}; + +#endif /* HEADER_CURL_MQTT_H */ diff --git a/lib/url.c b/lib/url.c index 4c62b50ec..03c274438 100644 --- a/lib/url.c +++ b/lib/url.c @@ -114,6 +114,7 @@ bool curl_win32_idn_to_ascii(const char *in, char **out); #include "http_ntlm.h" #include "curl_rtmp.h" #include "gopher.h" +#include "mqtt.h" #include "http_proxy.h" #include "conncache.h" #include "multihandle.h" @@ -232,6 +233,10 @@ static const struct Curl_handler * const protocols[] = { &Curl_handler_gopher, #endif +#ifdef CURL_ENABLE_MQTT + &Curl_handler_mqtt, +#endif + #ifdef USE_LIBRTMP &Curl_handler_rtmp, &Curl_handler_rtmpt, diff --git a/lib/urldata.h b/lib/urldata.h index 2a36c1147..6e426a29f 100644 --- a/lib/urldata.h +++ b/lib/urldata.h @@ -49,6 +49,7 @@ #define PORT_RTMPT PORT_HTTP #define PORT_RTMPS PORT_HTTPS #define PORT_GOPHER 70 +#define PORT_MQTT 1883 #define DICT_MATCH "/MATCH:" #define DICT_MATCH2 "/M:" @@ -128,6 +129,7 @@ typedef ssize_t (Curl_recv)(struct connectdata *conn, /* connection data */ #include "http.h" #include "rtsp.h" #include "smb.h" +#include "mqtt.h" #include "wildcard.h" #include "multihandle.h" #include "quic.h" @@ -1081,6 +1083,7 @@ struct connectdata { struct smb_conn smbc; void *rtmp; struct ldapconninfo *ldapc; + struct mqtt_conn mqtt; } proto; int cselect_bits; /* bitmask of socket events */ diff --git a/lib/version.c b/lib/version.c index 4d7c2d0a3..47204e881 100644 --- a/lib/version.c +++ b/lib/version.c @@ -271,6 +271,9 @@ static const char * const protocols[] = { "ldaps", #endif #endif +#ifdef CURL_ENABLE_MQTT + "mqtt", +#endif #ifndef CURL_DISABLE_POP3 "pop3", #endif