597 lines
24 KiB
C
597 lines
24 KiB
C
//
|
|
// Copyright 2023 NanoMQ Team, Inc. <jaylin@emqx.io>
|
|
//
|
|
// This software is supplied under the terms of the MIT License, a
|
|
// copy of which should be located in the distribution where this
|
|
// file was obtained (LICENSE.txt). A copy of the license may also be
|
|
// found online at https://opensource.org/licenses/MIT.
|
|
//
|
|
|
|
// This file is for the MQTT client implementation.
|
|
// Note that while there are some similarities, MQTT is sufficiently
|
|
// different enough from SP that many of the APIs cannot be easily
|
|
// shared.
|
|
//
|
|
// At this time there is no server provided by NNG itself, although
|
|
// the nanomq project provides such a server (and is based on NNG.)
|
|
//
|
|
// About our semantics:
|
|
//
|
|
// 1. MQTT client sockets have a single implicit dialer, and cannot
|
|
// support creation of additional dialers or listeners.
|
|
// 2. MQTT client sockets do support contexts; each context will
|
|
// maintain its own subscriptions, and the socket will keep a
|
|
// per-socket list of them and manage the full list.
|
|
// 3. Send sends PUBLISH messages.
|
|
// 4. Receive is used to receive published data from the server.
|
|
// 5. Most of the MQTT specific "features" are as options on the socket,
|
|
// dialer, or even the message. (For example message topics are set
|
|
// as options on the message.)
|
|
// 6. Pipe events can be used to detect connect/disconnect events.
|
|
// 7. Any QoS details such as retransmit, etc. are handled under the hood.
|
|
// This includes packet IDs.
|
|
// 8. PING and keep-alive is handled under the hood.
|
|
// 9. For publish actions, a separate method is used (not send/receive).
|
|
|
|
#ifndef NNG_MQTT_CLIENT_H
|
|
#define NNG_MQTT_CLIENT_H
|
|
|
|
#include <nng/nng.h>
|
|
#include <stdbool.h>
|
|
#include <stdint.h>
|
|
|
|
#ifdef __cplusplus
|
|
extern "C" {
|
|
#endif
|
|
|
|
#define MQTT_PROTOCOL_NAME_v31 "MQIsdp"
|
|
#define MQTT_PROTOCOL_VERSION_v31 3
|
|
#define MQTT_PROTOCOL_NAME "MQTT"
|
|
#define MQTT_PROTOCOL_VERSION_v311 4
|
|
#define MQTT_PROTOCOL_VERSION_v5 5
|
|
|
|
// NNG_OPT_MQTT_EXPIRES is a 32-bit integer representing the expiration in
|
|
// seconds. This can be applied to a message.
|
|
// (TODO: What about session expiry?)
|
|
#define NNG_OPT_MQTT_EXPIRES "expires"
|
|
|
|
#define NNG_OPT_MQTT_CONNMSG "mqtt-connect-msg"
|
|
|
|
#define NNG_OPT_MQTT_CONNECT_PROPERTY "mqtt-connack-property"
|
|
|
|
#define NNG_OPT_MQTT_CONNECT_REASON "mqtt-connack-reason"
|
|
|
|
#define NNG_OPT_MQTT_RECONNECT_BACKOFF_MAX "mqtt-reconnect-backoff-max"
|
|
|
|
#define NNG_OPT_MQTT_DISCONNECT_PROPERTY "mqtt-disconnect-property"
|
|
|
|
#define NNG_OPT_MQTT_RETRY_INTERVAL "mqtt-client-retry-interval"
|
|
|
|
#define NNG_OPT_MQTT_RETRY_WAIT_TIME "mqtt-client-retry-wait-time"
|
|
|
|
#define NNG_OPT_MQTT_DISCONNECT_REASON "mqtt-disconnect-reason"
|
|
|
|
#define NNG_OPT_MQTT_SQLITE "mqtt-sqlite-option"
|
|
|
|
#define NNG_OPT_MQTT_ENABLE_SCRAM "mqtt-scram-option"
|
|
|
|
// NNG_OPT_MQTT_QOS is a byte (only lower two bits significant) representing
|
|
// the quality of service. At this time, only level zero is supported.
|
|
// TODO: level 1 and level 2 QoS
|
|
#define NNG_OPT_MQTT_QOS "qos"
|
|
|
|
// NNG_OPT_MQTT_RETAIN indicates that the message should be retained on
|
|
// the server as the single retained message for the associated topic.
|
|
// This is a boolean.
|
|
#define NNG_OPT_MQTT_RETAIN "retain"
|
|
|
|
// NNG_OPT_MQTT_DUP indicates that the message is a duplicate. This can
|
|
// only be returned on a message -- this client will add this flag if
|
|
// sending a duplicate message (QoS 1 and 2 only).
|
|
#define NNG_OPT_MQTT_DUP "dup"
|
|
|
|
// NNG_OPT_MQTT_TOPIC is the message topic. It is encoded as an "Encoded
|
|
// UTF-8 string" (uint16 length followed by UTF-8 data). At the API, it
|
|
// is just a UTF-8 string (C style, with a terminating byte.) Note that
|
|
// we do not support embedded NUL bytes in our UTF-8 strings. Every
|
|
// MQTT published message must have a topic.
|
|
#define NNG_OPT_MQTT_TOPIC "topic"
|
|
|
|
// NNG_OPT_MQTT_REASON is a reason that can be conveyed with a message.
|
|
// It is a UTF-8 string.
|
|
#define NNG_OPT_MQTT_REASON "reason"
|
|
|
|
// NNG_OPT_MQTT_USER_PROPS is an array of user properties. These are
|
|
// send with the message, and used for application specific purposes.
|
|
// The properties are of the type nng_mqtt_user_props_t.
|
|
#define NNG_OPT_MQTT_USER_PROPS "user-props"
|
|
|
|
// NNG_OPT_MQTT_PAYLOAD_FORMAT is the format of the payload for a message.
|
|
// It can be 0, indicating binary data, or 1, indicating UTF-8.
|
|
#define NNG_OPT_MQTT_PAYLOAD_FORMAT "mqtt-payload-format"
|
|
|
|
// NNG_OPT_MQTT_CONTENT_TYPE is the mime type as UTF-8 for PUBLISH
|
|
// or Will messages.
|
|
#define NNG_OPT_MQTT_CONTENT_TYPE "content-type"
|
|
|
|
// The following options are reserved for MQTT v5.0 request/reply support.
|
|
#define NNG_OPT_MQTT_RESPONSE_TOPIC "response-topic"
|
|
#define NNG_OPT_MQTT_CORRELATION_DATA "correlation-data"
|
|
|
|
// NNG_OPT_MQTT_CLIENT_ID is the UTF-8 string corresponding to the client
|
|
// identification. We automatically generate an initial value fo this,
|
|
// which is the UUID.
|
|
// TODO: Should applications be permitted to change this?
|
|
#define NNG_OPT_MQTT_CLIENT_ID "client-id" // UTF-8 string
|
|
|
|
#define NNG_OPT_MQTT_WILL_DELAY "will-delay"
|
|
|
|
// NNG_OPT_MQTT_RECEIVE_MAX is used with QoS 1 or 2 (not implemented),
|
|
// and indicates the level of concurrent receives it is willing to
|
|
// process. (TODO: Implementation note: we will need to preallocate a complete
|
|
// state machine (aio, plus any state) for each value of this > 0.
|
|
// It's not clear whether this should be tunable.) This is read-only
|
|
// property on the socket, and records the value given from the server.
|
|
// It will be 64K if the server did not indicate a specific value.
|
|
#define NNG_OPT_MQTT_RECEIVE_MAX "mqtt-receive-max"
|
|
|
|
// NNG_OPT_MQTT_SESSION_EXPIRES is an nng_duration.
|
|
// If set to NNG_DURATION_ZERO, then the session will expire automatically
|
|
// when the connection is closed.
|
|
// If it set to NNG_DURATION_INFINITE, the session never expires.
|
|
// Otherwise it will be a whole number of seconds indicating the session
|
|
// expiry interval.
|
|
#define NNG_OPT_MQTT_SESSION_EXPIRES "session-expires"
|
|
|
|
#define NNG_OPT_MQTT_TOPIC_ALIAS_MAX "alias-max"
|
|
#define NNG_OPT_MQTT_TOPIC_ALIAS "topic-alias"
|
|
#define NNG_OPT_MQTT_MAX_QOS "max-qos"
|
|
|
|
// NNG_MAX_RECV_LMQ and NNG_MAX_SEND_LMQ define the length of waiting queue
|
|
// they are the length of nni_lmq, please be ware it affects the memory usage
|
|
// significantly while having heavy throughput
|
|
#define NNG_MAX_RECV_LMQ 64
|
|
#define NNG_MAX_SEND_LMQ 64
|
|
#define NNG_TRAN_MAX_LMQ_SIZE 128
|
|
|
|
// NNG_TLS_xxx options can be set on the client as well.
|
|
// E.g. NNG_OPT_TLS_CA_CERT, etc.
|
|
|
|
// TBD: Extended authentication. I think we should skip it -- everyone
|
|
// should just use TLS if they need security.
|
|
|
|
// NNG_OPT_MQTT_KEEP_ALIVE is set on the client, and can be retrieved
|
|
// by the client. This is an nng_duration but will always be zero or
|
|
// a whole number of seconds less than 65536. If setting the value,
|
|
// it must be set before the client connects. When retrieved, the
|
|
// server's value will be returned (if it is different from what we
|
|
// requested.) If we reconnect, we will try again with the configured
|
|
// value rather than the value that we got from the server last time.
|
|
#define NNG_OPT_MQTT_KEEP_ALIVE "mqtt-keep-alive"
|
|
|
|
// NNG_OPT_MQTT_MAX_PACKET_SIZE is the maximum packet size that can
|
|
// be used. It needs to be set before the client dials.
|
|
#define NNG_OPT_MQTT_MAX_PACKET_SIZE "mqtt-max-packet-size"
|
|
#define NNG_OPT_MQTT_USERNAME "username"
|
|
#define NNG_OPT_MQTT_PASSWORD "password"
|
|
|
|
// Message handling. Note that topic aliases are handled by the library
|
|
// automatically on behalf of the consumer.
|
|
|
|
typedef enum {
|
|
nng_mqtt_msg_format_binary = 0,
|
|
nng_mqtt_msg_format_utf8 = 1,
|
|
} nng_mqtt_msg_format_t;
|
|
|
|
/* Message types & flags */
|
|
#define CMD_UNKNOWN 0x00
|
|
#define CMD_CONNECT 0x10
|
|
#define CMD_CONNACK 0x20
|
|
#define CMD_PUBLISH 0x30 // indicates PUBLISH packet & MQTTV4 pub packet
|
|
#define CMD_PUBLISH_V5 0x31 // this is the flag for differing MQTTV5 from V4 V3
|
|
#define CMD_PUBACK 0x40
|
|
#define CMD_PUBREC 0x50
|
|
#define CMD_PUBREL 0x60
|
|
#define CMD_PUBCOMP 0x70
|
|
#define CMD_SUBSCRIBE 0x80
|
|
#define CMD_SUBACK 0x90
|
|
#define CMD_UNSUBSCRIBE 0xA0
|
|
#define CMD_UNSUBACK 0xB0
|
|
#define CMD_PINGREQ 0xC0
|
|
#define CMD_PINGRESP 0xD0
|
|
#define CMD_DISCONNECT 0xE0
|
|
#define CMD_AUTH_V5 0xF0
|
|
#define CMD_DISCONNECT_EV 0xE2
|
|
#define CMD_LASTWILL 0XE3
|
|
|
|
typedef enum {
|
|
NNG_MQTT_CONNECT = 0x01,
|
|
NNG_MQTT_CONNACK = 0x02,
|
|
NNG_MQTT_PUBLISH = 0x03,
|
|
NNG_MQTT_PUBACK = 0x04,
|
|
NNG_MQTT_PUBREC = 0x05,
|
|
NNG_MQTT_PUBREL = 0x06,
|
|
NNG_MQTT_PUBCOMP = 0x07,
|
|
NNG_MQTT_SUBSCRIBE = 0x08,
|
|
NNG_MQTT_SUBACK = 0x09,
|
|
NNG_MQTT_UNSUBSCRIBE = 0x0A,
|
|
NNG_MQTT_UNSUBACK = 0x0B,
|
|
NNG_MQTT_PINGREQ = 0x0C,
|
|
NNG_MQTT_PINGRESP = 0x0D,
|
|
NNG_MQTT_DISCONNECT = 0x0E,
|
|
NNG_MQTT_AUTH = 0x0F
|
|
} nng_mqtt_packet_type;
|
|
|
|
//TODO Enum is not fitting here
|
|
typedef enum {
|
|
SUCCESS = 0,
|
|
NORMAL_DISCONNECTION = 0,
|
|
GRANTED_QOS_0 = 0,
|
|
GRANTED_QOS_1 = 1,
|
|
GRANTED_QOS_2 = 2,
|
|
DISCONNECT_WITH_WILL_MESSAGE = 4,
|
|
NO_MATCHING_SUBSCRIBERS = 16,
|
|
NO_SUBSCRIPTION_EXISTED = 17,
|
|
CONTINUE_AUTHENTICATION = 24,
|
|
RE_AUTHENTICATE = 25,
|
|
UNSPECIFIED_ERROR = 128,
|
|
MALFORMED_PACKET = 129,
|
|
PROTOCOL_ERROR = 130,
|
|
IMPLEMENTATION_SPECIFIC_ERROR = 131,
|
|
UNSUPPORTED_PROTOCOL_VERSION = 132,
|
|
CLIENT_IDENTIFIER_NOT_VALID = 133,
|
|
BAD_USER_NAME_OR_PASSWORD = 134,
|
|
NOT_AUTHORIZED = 135,
|
|
SERVER_UNAVAILABLE = 136,
|
|
SERVER_BUSY = 137,
|
|
BANNED = 138,
|
|
SERVER_SHUTTING_DOWN = 139,
|
|
BAD_AUTHENTICATION_METHOD = 140,
|
|
KEEP_ALIVE_TIMEOUT = 141,
|
|
SESSION_TAKEN_OVER = 142,
|
|
TOPIC_FILTER_INVALID = 143,
|
|
TOPIC_NAME_INVALID = 144,
|
|
PACKET_IDENTIFIER_IN_USE = 145,
|
|
PACKET_IDENTIFIER_NOT_FOUND = 146,
|
|
RECEIVE_MAXIMUM_EXCEEDED = 147,
|
|
TOPIC_ALIAS_INVALID = 148,
|
|
PACKET_TOO_LARGE = 149,
|
|
MESSAGE_RATE_TOO_HIGH = 150,
|
|
QUOTA_EXCEEDED = 151,
|
|
ADMINISTRATIVE_ACTION = 152,
|
|
PAYLOAD_FORMAT_INVALID = 153,
|
|
RETAIN_NOT_SUPPORTED = 154,
|
|
QOS_NOT_SUPPORTED = 155,
|
|
USE_ANOTHER_SERVER = 156,
|
|
SERVER_MOVED = 157,
|
|
SHARED_SUBSCRIPTIONS_NOT_SUPPORTED = 158,
|
|
CONNECTION_RATE_EXCEEDED = 159,
|
|
MAXIMUM_CONNECT_TIME = 160,
|
|
SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED = 161,
|
|
WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED = 162
|
|
|
|
} reason_code;
|
|
|
|
typedef enum {
|
|
PAYLOAD_FORMAT_INDICATOR = 1,
|
|
MESSAGE_EXPIRY_INTERVAL = 2,
|
|
CONTENT_TYPE = 3,
|
|
RESPONSE_TOPIC = 8,
|
|
CORRELATION_DATA = 9,
|
|
SUBSCRIPTION_IDENTIFIER = 11,
|
|
SESSION_EXPIRY_INTERVAL = 17,
|
|
ASSIGNED_CLIENT_IDENTIFIER = 18,
|
|
SERVER_KEEP_ALIVE = 19,
|
|
AUTHENTICATION_METHOD = 21,
|
|
AUTHENTICATION_DATA = 22,
|
|
REQUEST_PROBLEM_INFORMATION = 23,
|
|
WILL_DELAY_INTERVAL = 24,
|
|
REQUEST_RESPONSE_INFORMATION = 25,
|
|
RESPONSE_INFORMATION = 26,
|
|
SERVER_REFERENCE = 28,
|
|
REASON_STRING = 31,
|
|
RECEIVE_MAXIMUM = 33,
|
|
TOPIC_ALIAS_MAXIMUM = 34,
|
|
TOPIC_ALIAS = 35,
|
|
PUBLISH_MAXIMUM_QOS = 36,
|
|
RETAIN_AVAILABLE = 37,
|
|
USER_PROPERTY = 38,
|
|
MAXIMUM_PACKET_SIZE = 39,
|
|
WILDCARD_SUBSCRIPTION_AVAILABLE = 40,
|
|
SUBSCRIPTION_IDENTIFIER_AVAILABLE = 41,
|
|
SHARED_SUBSCRIPTION_AVAILABLE = 42
|
|
} properties_type;
|
|
|
|
struct mqtt_buf_t {
|
|
uint32_t length;
|
|
uint8_t *buf;
|
|
};
|
|
|
|
typedef struct mqtt_buf_t mqtt_buf;
|
|
typedef struct mqtt_buf_t nng_mqtt_buffer;
|
|
typedef struct mqtt_buf_t nng_mqtt_topic;
|
|
|
|
struct mqtt_kv_t {
|
|
mqtt_buf key;
|
|
mqtt_buf value;
|
|
};
|
|
typedef struct mqtt_kv_t mqtt_kv;
|
|
|
|
typedef struct mqtt_topic_qos_t {
|
|
nng_mqtt_topic topic;
|
|
uint8_t qos;
|
|
uint8_t nolocal;
|
|
uint8_t rap;
|
|
uint8_t retain_handling;
|
|
} mqtt_topic_qos;
|
|
|
|
typedef struct mqtt_topic_qos_t nng_mqtt_topic_qos;
|
|
|
|
extern uint16_t nni_msg_get_pub_pid(nng_msg *m);
|
|
struct mqtt_string {
|
|
char * body;
|
|
uint32_t len;
|
|
};
|
|
typedef struct mqtt_string mqtt_string;
|
|
|
|
struct mqtt_string_node {
|
|
struct mqtt_string_node *next;
|
|
mqtt_string * it;
|
|
};
|
|
typedef struct mqtt_string_node mqtt_string_node;
|
|
|
|
struct mqtt_binary {
|
|
uint8_t *body;
|
|
uint32_t len;
|
|
};
|
|
typedef struct mqtt_binary mqtt_binary;
|
|
|
|
struct mqtt_str_pair {
|
|
char * key; // key
|
|
uint32_t len_key;
|
|
char * val; // value
|
|
uint32_t len_val;
|
|
};
|
|
typedef struct mqtt_str_pair mqtt_str_pair;
|
|
|
|
union Property_type {
|
|
uint8_t u8;
|
|
uint16_t u16;
|
|
uint32_t u32;
|
|
uint32_t varint;
|
|
mqtt_buf binary;
|
|
mqtt_buf str;
|
|
mqtt_kv strpair;
|
|
};
|
|
|
|
typedef enum {
|
|
U8,
|
|
U16,
|
|
U32,
|
|
VARINT,
|
|
BINARY,
|
|
STR,
|
|
STR_PAIR,
|
|
UNKNOWN
|
|
} property_type_enum;
|
|
|
|
struct property_data {
|
|
property_type_enum p_type;
|
|
union Property_type p_value;
|
|
bool is_copy;
|
|
};
|
|
|
|
typedef struct property_data property_data;
|
|
|
|
struct property {
|
|
uint8_t id;
|
|
property_data data;
|
|
struct property *next;
|
|
};
|
|
typedef struct property property;
|
|
|
|
NNG_DECL int nng_mqtt_msg_alloc(nng_msg **, size_t);
|
|
NNG_DECL int nng_mqtt_msg_proto_data_alloc(nng_msg *);
|
|
NNG_DECL void nng_mqtt_msg_proto_data_free(nng_msg *);
|
|
NNG_DECL int nng_mqtt_msg_encode(nng_msg *);
|
|
NNG_DECL int nng_mqtt_msg_decode(nng_msg *);
|
|
NNG_DECL int nng_mqttv5_msg_encode(nng_msg *);
|
|
NNG_DECL int nng_mqttv5_msg_decode(nng_msg *);
|
|
NNG_DECL int nng_mqtt_msg_validate(nng_msg *, uint8_t);
|
|
NNG_DECL void nng_mqtt_msg_set_packet_type(nng_msg *, nng_mqtt_packet_type);
|
|
NNG_DECL nng_mqtt_packet_type nng_mqtt_msg_get_packet_type(nng_msg *);
|
|
NNG_DECL void nng_mqtt_msg_set_bridge_bool(nng_msg *msg, bool bridged);
|
|
NNG_DECL bool nng_mqtt_msg_get_bridge_bool(nng_msg *msg);
|
|
NNG_DECL void nng_mqtt_msg_set_sub_retain_bool(nng_msg *msg, bool retain);
|
|
NNG_DECL bool nng_mqtt_msg_get_sub_retain_bool(nng_msg *msg);
|
|
NNG_DECL void nng_mqtt_msg_set_connect_proto_version(nng_msg *, uint8_t);
|
|
NNG_DECL void nng_mqtt_msg_set_connect_keep_alive(nng_msg *, uint16_t);
|
|
NNG_DECL void nng_mqtt_msg_set_connect_client_id(nng_msg *, const char *);
|
|
NNG_DECL void nng_mqtt_msg_set_connect_user_name(nng_msg *, const char *);
|
|
NNG_DECL void nng_mqtt_msg_set_connect_password(nng_msg *, const char *);
|
|
NNG_DECL void nng_mqtt_msg_set_connect_clean_session(nng_msg *, bool);
|
|
NNG_DECL void nng_mqtt_msg_set_connect_will_topic(nng_msg *, const char *);
|
|
NNG_DECL void nng_mqtt_msg_set_connect_will_msg(
|
|
nng_msg *, uint8_t *, uint32_t);
|
|
NNG_DECL void nng_mqtt_msg_set_connect_will_retain(nng_msg *, bool);
|
|
NNG_DECL void nng_mqtt_msg_set_connect_will_qos(nng_msg *, uint8_t);
|
|
NNG_DECL void nng_mqtt_msg_set_connect_property(nng_msg *, property *);
|
|
NNG_DECL property *nng_mqtt_msg_get_connect_will_property(nng_msg *);
|
|
NNG_DECL void nng_mqtt_msg_set_connect_will_property(nng_msg *, property *);
|
|
NNG_DECL const char *nng_mqtt_msg_get_connect_user_name(nng_msg *);
|
|
NNG_DECL const char *nng_mqtt_msg_get_connect_password(nng_msg *);
|
|
NNG_DECL bool nng_mqtt_msg_get_connect_clean_session(nng_msg *);
|
|
NNG_DECL uint8_t nng_mqtt_msg_get_connect_proto_version(nng_msg *);
|
|
NNG_DECL uint16_t nng_mqtt_msg_get_connect_keep_alive(nng_msg *);
|
|
NNG_DECL const char *nng_mqtt_msg_get_connect_client_id(nng_msg *);
|
|
NNG_DECL const char *nng_mqtt_msg_get_connect_will_topic(nng_msg *);
|
|
NNG_DECL uint8_t *nng_mqtt_msg_get_connect_will_msg(nng_msg *, uint32_t *);
|
|
NNG_DECL bool nng_mqtt_msg_get_connect_will_retain(nng_msg *);
|
|
NNG_DECL uint8_t nng_mqtt_msg_get_connect_will_qos(nng_msg *);
|
|
NNG_DECL property *nng_mqtt_msg_get_connect_property(nng_msg *);
|
|
|
|
NNG_DECL void nng_mqtt_msg_set_connack_return_code(nng_msg *, uint8_t);
|
|
NNG_DECL void nng_mqtt_msg_set_connack_flags(nng_msg *, uint8_t);
|
|
NNG_DECL void nng_mqtt_msg_set_connack_property(nng_msg *, property *);
|
|
NNG_DECL uint8_t nng_mqtt_msg_get_connack_return_code(nng_msg *);
|
|
NNG_DECL uint8_t nng_mqtt_msg_get_connack_flags(nng_msg *);
|
|
NNG_DECL property *nng_mqtt_msg_get_connack_property(nng_msg *);
|
|
|
|
NNG_DECL void nng_mqtt_msg_set_publish_qos(nng_msg *, uint8_t);
|
|
NNG_DECL uint8_t nng_mqtt_msg_get_publish_qos(nng_msg *);
|
|
NNG_DECL void nng_mqtt_msg_set_publish_retain(nng_msg *, bool);
|
|
NNG_DECL bool nng_mqtt_msg_get_publish_retain(nng_msg *);
|
|
NNG_DECL void nng_mqtt_msg_set_publish_dup(nng_msg *, bool);
|
|
NNG_DECL bool nng_mqtt_msg_get_publish_dup(nng_msg *);
|
|
NNG_DECL int nng_mqtt_msg_set_publish_topic(nng_msg *, const char *);
|
|
NNG_DECL int nng_mqtt_msg_set_publish_topic_len(nng_msg *msg, uint32_t len);
|
|
|
|
NNG_DECL const char *nng_mqtt_msg_get_publish_topic(nng_msg *, uint32_t *);
|
|
NNG_DECL int nng_mqtt_msg_set_publish_payload(nng_msg *, uint8_t *, uint32_t);
|
|
NNG_DECL uint8_t *nng_mqtt_msg_get_publish_payload(nng_msg *, uint32_t *);
|
|
NNG_DECL property *nng_mqtt_msg_get_publish_property(nng_msg *);
|
|
NNG_DECL void nng_mqtt_msg_set_publish_property(nng_msg *, property *);
|
|
|
|
NNG_DECL uint16_t nng_mqtt_msg_get_puback_packet_id(nng_msg *);
|
|
NNG_DECL property *nng_mqtt_msg_get_puback_property(nng_msg *);
|
|
NNG_DECL void nng_mqtt_msg_set_puback_property(nng_msg *, property *);
|
|
|
|
NNG_DECL uint16_t nng_mqtt_msg_get_pubrec_packet_id(nng_msg *);
|
|
NNG_DECL property *nng_mqtt_msg_get_pubrec_property(nng_msg *);
|
|
NNG_DECL void nng_mqtt_msg_set_pubrec_property(nng_msg *, property *);
|
|
|
|
NNG_DECL uint16_t nng_mqtt_msg_get_pubrel_packet_id(nng_msg *);
|
|
NNG_DECL property *nng_mqtt_msg_get_pubrel_property(nng_msg *);
|
|
NNG_DECL void nng_mqtt_msg_set_pubrel_property(nng_msg *, property *);
|
|
|
|
NNG_DECL uint16_t nng_mqtt_msg_get_pubcomp_packet_id(nng_msg *);
|
|
NNG_DECL property *nng_mqtt_msg_get_pubcomp_property(nng_msg *);
|
|
NNG_DECL void nng_mqtt_msg_set_pubcomp_property(nng_msg *, property *);
|
|
|
|
NNG_DECL nng_mqtt_topic_qos *nng_mqtt_msg_get_subscribe_topics(
|
|
nng_msg *, uint32_t *);
|
|
NNG_DECL void nng_mqtt_msg_set_subscribe_topics(
|
|
nng_msg *, nng_mqtt_topic_qos *, uint32_t);
|
|
NNG_DECL property *nng_mqtt_msg_get_subscribe_property(nng_msg *);
|
|
NNG_DECL void nng_mqtt_msg_set_subscribe_property(nng_msg *, property *);
|
|
|
|
NNG_DECL void nng_mqtt_msg_set_suback_return_codes(
|
|
nng_msg *, uint8_t *, uint32_t);
|
|
NNG_DECL uint8_t *nng_mqtt_msg_get_suback_return_codes(nng_msg *, uint32_t *);
|
|
NNG_DECL property *nng_mqtt_msg_get_suback_property(nng_msg *);
|
|
NNG_DECL void nng_mqtt_msg_set_suback_property(nng_msg *, property *);
|
|
|
|
NNG_DECL void nng_mqtt_msg_set_unsubscribe_topics(
|
|
nng_msg *, nng_mqtt_topic *, uint32_t);
|
|
NNG_DECL nng_mqtt_topic *nng_mqtt_msg_get_unsubscribe_topics(
|
|
nng_msg *, uint32_t *);
|
|
NNG_DECL property *nng_mqtt_msg_get_unsubscribe_property(nng_msg *);
|
|
NNG_DECL void nng_mqtt_msg_set_unsubscribe_property(nng_msg *, property *);
|
|
|
|
NNG_DECL void nng_mqtt_msg_set_unsuback_return_codes(
|
|
nng_msg *, uint8_t *, uint32_t);
|
|
NNG_DECL uint8_t *nng_mqtt_msg_get_unsuback_return_codes(nng_msg *, uint32_t *);
|
|
NNG_DECL property *nng_mqtt_msg_get_unsuback_property(nng_msg *);
|
|
NNG_DECL void nng_mqtt_msg_set_unsuback_property(nng_msg *, property *);
|
|
|
|
NNG_DECL property *nng_mqtt_msg_get_disconnect_property(nng_msg *);
|
|
NNG_DECL void nng_mqtt_msg_set_disconnect_property(nng_msg *, property *);
|
|
|
|
NNG_DECL nng_mqtt_topic *nng_mqtt_topic_array_create(size_t);
|
|
NNG_DECL void nng_mqtt_topic_array_set(nng_mqtt_topic *, size_t, const char *);
|
|
NNG_DECL void nng_mqtt_topic_array_free(nng_mqtt_topic *, size_t);
|
|
NNG_DECL nng_mqtt_topic_qos *nng_mqtt_topic_qos_array_create(size_t);
|
|
NNG_DECL void nng_mqtt_topic_qos_array_set(nng_mqtt_topic_qos *, size_t,
|
|
const char *, uint8_t, uint8_t, uint8_t, uint8_t);
|
|
NNG_DECL void nng_mqtt_topic_qos_array_free(nng_mqtt_topic_qos *, size_t);
|
|
NNG_DECL int nng_mqtt_set_connect_cb(nng_socket, nng_pipe_cb, void *);
|
|
NNG_DECL int nng_mqtt_set_disconnect_cb(nng_socket, nng_pipe_cb, void *);
|
|
NNG_DECL void nng_mqtt_msg_dump(nng_msg *, uint8_t *, uint32_t, bool);
|
|
|
|
NNG_DECL conn_param *nng_get_conn_param_from_msg(nng_msg *);
|
|
NNG_DECL void nng_msg_proto_set_property(nng_msg *msg, void *p);
|
|
NNG_DECL void nng_mqtt_msg_set_disconnect_reason_code(nng_msg *msg, uint8_t reason_code);
|
|
|
|
NNG_DECL uint32_t get_mqtt_properties_len(property *prop);
|
|
NNG_DECL int mqtt_property_free(property *prop);
|
|
NNG_DECL void mqtt_property_foreach(property *prop, void (*cb)(property *));
|
|
NNG_DECL int mqtt_property_dup(property **dup, const property *src);
|
|
NNG_DECL property *mqtt_property_pub_by_will(property *will_prop);
|
|
NNG_DECL int mqtt_property_value_copy(property *dst, const property *src);
|
|
|
|
NNG_DECL property *mqtt_property_alloc(void);
|
|
NNG_DECL property *mqtt_property_set_value_u8(uint8_t prop_id, uint8_t value);
|
|
NNG_DECL property *mqtt_property_set_value_u16(uint8_t prop_id, uint16_t value);
|
|
NNG_DECL property *mqtt_property_set_value_u32(uint8_t prop_id, uint32_t value);
|
|
NNG_DECL property *mqtt_property_set_value_varint(uint8_t prop_id, uint32_t value);
|
|
NNG_DECL property *mqtt_property_set_value_binary(uint8_t prop_id, uint8_t *value, uint32_t len, bool copy_value);
|
|
NNG_DECL property *mqtt_property_set_value_str( uint8_t prop_id, const char *value, uint32_t len, bool copy_value);
|
|
NNG_DECL property *mqtt_property_set_value_strpair(uint8_t prop_id, const char *key, uint32_t key_len, const char *value, uint32_t value_len, bool copy_value);
|
|
|
|
NNG_DECL property_type_enum mqtt_property_get_value_type(uint8_t prop_id);
|
|
NNG_DECL property_data *mqtt_property_get_value(property *prop, uint8_t prop_id);
|
|
NNG_DECL void mqtt_property_append(property *prop_list, property *last);
|
|
|
|
// Note that MQTT sockets can be connected to at most a single server.
|
|
// Creating the client does not connect it.
|
|
NNG_DECL int nng_mqtt_client_open(nng_socket *);
|
|
NNG_DECL int nng_mqttv5_client_open(nng_socket *);
|
|
|
|
// Note that there is a single implicit dialer for the client,
|
|
// and options may be set on the socket to configure dial options.
|
|
// Those options should be set before doing nng_dial().
|
|
|
|
// close done via nng_close().
|
|
|
|
// Question: session resumption. Should we resume sessions under the hood
|
|
// as part of reconnection, or do we want to expose this to the API user?
|
|
// My inclination is not to expose.
|
|
|
|
// nng_dial or nng_dialer_create can be used, but this protocol only
|
|
// allows a single dialer to be created on the socket.
|
|
|
|
// Subscriptions are normally run synchronously from the view of the
|
|
// caller. Because there is a round-trip message involved, we use
|
|
// a separate method instead of merely relying upon socket options.
|
|
// TODO: shared subscriptions. Subscription options (retain, QoS)
|
|
typedef struct nng_mqtt_client nng_mqtt_client;
|
|
typedef void(nng_mqtt_send_cb)(nng_mqtt_client *client, nng_msg *msg, void *);
|
|
struct nng_mqtt_client{
|
|
nng_socket sock;
|
|
nng_aio *send_aio;
|
|
nng_aio *recv_aio;
|
|
void *msgq;
|
|
void *obj; // user defined callback obj
|
|
bool async;
|
|
|
|
nng_mqtt_send_cb *cb;
|
|
};
|
|
|
|
|
|
NNG_DECL nng_mqtt_client *nng_mqtt_client_alloc(nng_socket, nng_mqtt_send_cb, bool);
|
|
NNG_DECL void nng_mqtt_client_free(nng_mqtt_client*, bool);
|
|
NNG_DECL int nng_mqtt_subscribe(nng_socket, nng_mqtt_topic_qos *, uint32_t, property *);
|
|
NNG_DECL int nng_mqtt_subscribe_async(nng_mqtt_client *, nng_mqtt_topic_qos *, size_t, property *);
|
|
NNG_DECL int nng_mqtt_unsubscribe(nng_socket, nng_mqtt_topic *, size_t, property *);
|
|
NNG_DECL int nng_mqtt_unsubscribe_async(nng_mqtt_client *, nng_mqtt_topic *, size_t, property *);
|
|
// as with other ctx based methods, we use the aio form exclusively
|
|
NNG_DECL int nng_mqtt_ctx_subscribe(nng_ctx *, const char *, nng_aio *, ...);
|
|
NNG_DECL int nng_mqtt_disconnect(nng_socket *, uint8_t, property *);
|
|
|
|
typedef struct nng_mqtt_sqlite_option nng_mqtt_sqlite_option;
|
|
|
|
#if defined(NNG_SUPP_SQLITE)
|
|
NNG_DECL int nng_mqtt_alloc_sqlite_opt(nng_mqtt_sqlite_option **);
|
|
NNG_DECL int nng_mqtt_free_sqlite_opt(nng_mqtt_sqlite_option *);
|
|
NNG_DECL void nng_mqtt_set_sqlite_conf(
|
|
nng_mqtt_sqlite_option *sqlite, void *config);
|
|
NNG_DECL void nng_mqtt_sqlite_db_init(nng_mqtt_sqlite_option *, const char *);
|
|
NNG_DECL void nng_mqtt_sqlite_db_fini(nng_mqtt_sqlite_option *);
|
|
#endif
|
|
|
|
#ifdef __cplusplus
|
|
}
|
|
#endif
|
|
|
|
#endif // NNG_MQTT_CLIENT_H
|