PahoMqttCpp
MQTT C++ Client for POSIX and Windows
Loading...
Searching...
No Matches
async_client.h
Go to the documentation of this file.
1
16
17/*******************************************************************************
18 * Copyright (c) 2013-2024 Frank Pagliughi <fpagliughi@mindspring.com>
19 *
20 * All rights reserved. This program and the accompanying materials
21 * are made available under the terms of the Eclipse Public License v2.0
22 * and Eclipse Distribution License v1.0 which accompany this distribution.
23 *
24 * The Eclipse Public License is available at
25 * http://www.eclipse.org/legal/epl-v20.html
26 * and the Eclipse Distribution License is available at
27 * http://www.eclipse.org/org/documents/edl-v10.php.
28 *
29 * Contributors:
30 * Frank Pagliughi - initial implementation and documentation
31 * Frank Pagliughi - MQTT v5 support
32 *******************************************************************************/
33
34#ifndef __mqtt_async_client_h
35#define __mqtt_async_client_h
36
37#include <functional>
38#include <list>
39#include <memory>
40#include <stdexcept>
41#include <tuple>
42#include <vector>
43
44#include "MQTTAsync.h"
45#include "mqtt/callback.h"
46#include "mqtt/create_options.h"
47#include "mqtt/delivery_token.h"
48#include "mqtt/event.h"
49#include "mqtt/exception.h"
51#include "mqtt/iasync_client.h"
53#include "mqtt/message.h"
54#include "mqtt/properties.h"
56#include "mqtt/thread_queue.h"
57#include "mqtt/token.h"
58#include "mqtt/types.h"
59
60namespace mqtt {
61
62// OBSOLETE: The legacy constants that lacked the "PAHO_MQTTPP_" prefix
63// clashed with #define's from other libraries and will be removed at the
64// next major version upgrade.
65
66#if defined(PAHO_MQTTPP_VERSIONS)
68const uint32_t PAHO_MQTTPP_VERSION = 0x01050000;
70const string PAHO_MQTTPP_VERSION_STR("Paho MQTT C++ (mqttpp) v. 1.5.0");
72const string PAHO_MQTTPP_COPYRIGHT("Copyright (c) 2013-2024 Frank Pagliughi");
73#else
75const uint32_t VERSION = 0x01050000;
77const string VERSION_STR("Paho MQTT C++ (mqttpp) v. 1.5.0");
79const string COPYRIGHT("Copyright (c) 2013-2024 Frank Pagliughi");
80#endif
81
83
120class async_client : public virtual iasync_client
121{
122public:
124 using ptr_t = std::shared_ptr<async_client>;
126 using consumer_queue_type = std::unique_ptr<thread_queue<event>>;
127
129 using message_handler = std::function<void(const_message_ptr)>;
131 using connection_handler = std::function<void(const string& cause)>;
133 using disconnected_handler = std::function<void(const properties&, ReasonCode)>;
135 using update_connection_handler = std::function<bool(connect_data&)>;
136
137private:
139 using guard = std::unique_lock<std::mutex>;
141 using unique_lock = std::unique_lock<std::mutex>;
142
144 mutable std::mutex lock_;
146 MQTTAsync cli_;
148 const create_options createOpts_;
150 int mqttVersion_;
152 std::unique_ptr<MQTTClient_persistence> persist_{};
154 callback* userCallback_{};
156 connection_handler connHandler_;
158 connection_handler connLostHandler_;
160 disconnected_handler disconnectedHandler_;
162 update_connection_handler updateConnectionHandler_;
164 message_handler msgHandler_;
166 connect_options connOpts_;
168 token_ptr connTok_;
170 std::list<token_ptr> pendingTokens_;
172 std::list<delivery_token_ptr> pendingDeliveryTokens_;
175
177 static void on_connected(void* context, char* cause);
178 static void on_connection_lost(void* context, char* cause);
179 static void on_disconnected(
180 void* context, MQTTProperties* cprops, MQTTReasonCodes reasonCode
181 );
182 static int on_message_arrived(
183 void* context, char* topicName, int topicLen, MQTTAsync_message* msg
184 );
185 static void on_delivery_complete(void* context, MQTTAsync_token tok);
186 static int on_update_connection(void* context, MQTTAsync_connectData* cdata);
187
189 friend class token;
190 virtual void add_token(token_ptr tok);
191 virtual void add_token(delivery_token_ptr tok);
192 virtual void remove_token(token* tok) override;
193 virtual void remove_token(token_ptr tok) { remove_token(tok.get()); }
194 void remove_token(delivery_token_ptr tok) { remove_token(tok.get()); }
195
197 async_client() = delete;
198 async_client(const async_client&) = delete;
199 async_client& operator=(const async_client&) = delete;
200
202 static void check_ret(int rc) {
203 if (rc != MQTTASYNC_SUCCESS)
204 throw exception(rc);
205 }
213 void create();
214
215public:
226 explicit async_client(const string& serverURI, const string& clientId = string{})
227 : async_client(serverURI, clientId, NO_PERSISTENCE) {}
241 const string& serverURI, const string& clientId, const persistence_type& persistence
242 )
243 : createOpts_{serverURI, clientId, persistence} {
244 create();
245 }
260 const string& serverURI, const string& clientId, int maxBufferedMessages,
261 const persistence_type& persistence = persistence_type{}
262 )
263 : createOpts_{serverURI, clientId, maxBufferedMessages, persistence} {
264 create();
265 }
279 const string& serverURI, const string& clientId, const create_options& opts,
280 const persistence_type& persistence
281 )
282 : createOpts_{serverURI, clientId, opts, persistence} {
283 create();
284 }
293 async_client(const create_options& opts) : createOpts_{opts} { create(); }
297 ~async_client() override;
304 void set_callback(callback& cb) override;
310 void disable_callbacks() override;
347 token_ptr connect() override;
371 token_ptr connect(connect_options options, void* userContext, iaction_listener& cb)
372 override;
384 token_ptr connect(void* userContext, iaction_listener& cb) override {
385 return connect(connect_options{}, userContext, cb);
386 }
418 token_ptr disconnect(int timeout) override {
419 return disconnect(disconnect_options(timeout));
420 }
431 template <class Rep, class Period>
432 token_ptr disconnect(const std::chrono::duration<Rep, Period>& timeout) {
433 // TODO: check range
434 return disconnect((int)to_milliseconds_count(timeout));
435 }
450 token_ptr disconnect(int timeout, void* userContext, iaction_listener& cb) override;
465 template <class Rep, class Period>
467 const std::chrono::duration<Rep, Period>& timeout, void* userContext,
469 ) {
470 // TODO: check range
471 return disconnect((int)to_milliseconds_count(timeout), userContext, cb);
472 }
484 token_ptr disconnect(void* userContext, iaction_listener& cb) override {
485 return disconnect(0L, userContext, cb);
486 }
496 std::vector<delivery_token_ptr> get_pending_delivery_tokens() const override;
501 string get_client_id() const override { return createOpts_.get_client_id(); }
506 string get_server_uri() const override { return createOpts_.get_server_uri(); }
516 int mqtt_version() const noexcept { return mqttVersion_; }
523 guard g(lock_);
524 return connOpts_;
525 }
530 bool is_connected() const override { return to_bool(MQTTAsync_isConnected(cli_)); }
544 string_ref topic, const void* payload, size_t n, int qos, bool retained,
545 const properties &props=properties()
546 ) override;
555 delivery_token_ptr publish(string_ref topic, const void* payload, size_t n) override {
556 return publish(
557 std::move(topic), payload, n, message::DFLT_QOS, message::DFLT_RETAINED
558 );
559 }
571 delivery_token_ptr publish(string_ref topic, binary_ref payload, int qos, bool retained,
572 const properties &props=properties()
573 )
574 override;
583 return publish(
584 std::move(topic), std::move(payload), message::DFLT_QOS, message::DFLT_RETAINED
585 );
586 }
603 string_ref topic, const void* payload, size_t n, int qos, bool retained,
604 void* userContext, iaction_listener& cb
605 ) override;
627 override;
639 const string& topicFilter, int qos,
640 const subscribe_options& opts = subscribe_options(),
641 const properties& props = properties()
642 ) override;
661 const string& topicFilter, int qos, void* userContext, iaction_listener& cb,
662 const subscribe_options& opts = subscribe_options(),
663 const properties& props = properties()
664 ) override;
680 const_string_collection_ptr topicFilters, const qos_collection& qos,
681 const std::vector<subscribe_options>& opts = std::vector<subscribe_options>(),
682 const properties& props = properties()
683 ) override;
702 const_string_collection_ptr topicFilters, const qos_collection& qos,
703 void* userContext, iaction_listener& cb,
704 const std::vector<subscribe_options>& opts = std::vector<subscribe_options>(),
705 const properties& props = properties()
706 ) override;
715 token_ptr unsubscribe(const string& topicFilter, const properties& props = properties())
716 override;
727 const_string_collection_ptr topicFilters, const properties& props = properties()
728 ) override;
743 const_string_collection_ptr topicFilters, void* userContext, iaction_listener& cb,
744 const properties& props = properties()
745 ) override;
759 const string& topicFilter, void* userContext, iaction_listener& cb,
760 const properties& props = properties()
761 ) override;
781 void start_consuming() override;
790 void stop_consuming() override;
794 void clear_consumer() override {
795 if (que_) que_->clear();
796 }
804 bool consumer_closed() noexcept override {
805 return !que_ || que_->closed();
806 }
814 bool consumer_done() noexcept override {
815 return !que_ || que_->done();
816 }
825 std::size_t consumer_queue_size() const override {
826 return (que_) ? que_->size() : 0;
827 }
834 event consume_event() override;
841 bool try_consume_event(event* evt) override;
849 template <typename Rep, class Period>
851 event* evt, const std::chrono::duration<Rep, Period>& relTime
852 ) {
853 if (!que_)
854 throw mqtt::exception(-1, "Consumer not started");
855
856 try {
857 return que_->try_get_for(evt, relTime);
858 }
859 catch (queue_closed&) {
860 *evt = event{shutdown_event{}};
861 return true;
862 }
863 }
870 template <typename Rep, class Period>
871 event try_consume_event_for(const std::chrono::duration<Rep, Period>& relTime) {
872 event evt;
873 try {
874 que_->try_get_for(&evt, relTime);
875 }
876 catch (queue_closed&) {
877 evt = event{shutdown_event{}};
878 }
879 return evt;
880 }
888 template <class Clock, class Duration>
890 event* evt, const std::chrono::time_point<Clock, Duration>& absTime
891 ) {
892 if (!que_)
893 throw mqtt::exception(-1, "Consumer not started");
894
895 try {
896 return que_->try_get_until(evt, absTime);
897 }
898 catch (queue_closed&) {
899 *evt = event{shutdown_event{}};
900 return true;
901 }
902 }
909 template <class Clock, class Duration>
910 event try_consume_event_until(const std::chrono::time_point<Clock, Duration>& absTime
911 ) {
912 event evt;
913 try {
914 que_->try_get_until(&evt, absTime);
915 }
916 catch (queue_closed&) {
917 evt = event{shutdown_event{}};
918 }
919 return evt;
920 }
942 template <typename Rep, class Period>
944 const_message_ptr* msg, const std::chrono::duration<Rep, Period>& relTime
945 ) {
946 if (!que_)
947 throw mqtt::exception(-1, "Consumer not started");
948
949 event evt;
950
951 while (true) {
952 if (!try_consume_event_for(&evt, relTime))
953 return false;
954
955 if (const auto* pval = evt.get_message_if()) {
956 *msg = std::move(*pval);
957 break;
958 }
959
960 if (evt.is_any_disconnect()) {
961 *msg = const_message_ptr{};
962 break;
963 }
964 }
965 return true;
966 }
973 template <typename Rep, class Period>
975 const std::chrono::duration<Rep, Period>& relTime
976 ) {
978 this->try_consume_message_for(&msg, relTime);
979 return msg;
980 }
988 template <class Clock, class Duration>
990 const_message_ptr* msg, const std::chrono::time_point<Clock, Duration>& absTime
991 ) {
992 if (!que_)
993 throw mqtt::exception(-1, "Consumer not started");
994
995 event evt;
996
997 while (true) {
998 if (!try_consume_event_until(&evt, absTime))
999 return false;
1000
1001 if (const auto* pval = evt.get_message_if()) {
1002 *msg = std::move(*pval);
1003 break;
1004 }
1005
1006 if (!evt.is_any_disconnect()) {
1007 *msg = const_message_ptr{};
1008 break;
1009 }
1010 }
1011
1012 return true;
1013 }
1019 template <class Clock, class Duration>
1021 const std::chrono::time_point<Clock, Duration>& absTime
1022 ) {
1024 this->try_consume_message_until(&msg, absTime);
1025 return msg;
1026 }
1027};
1028
1031
1033} // namespace mqtt
1034
1035#endif // __mqtt_async_client_h
Definition async_client.h:121
void set_connection_lost_handler(connection_handler cb)
bool try_consume_event_until(event *evt, const std::chrono::time_point< Clock, Duration > &absTime)
Definition async_client.h:889
delivery_token_ptr publish(const_message_ptr msg, void *userContext, iaction_listener &cb) override
bool consumer_done() noexcept override
Definition async_client.h:814
async_client(const string &serverURI, const string &clientId, const persistence_type &persistence)
Definition async_client.h:240
std::unique_ptr< thread_queue< event > > consumer_queue_type
Definition async_client.h:126
void set_disconnected_handler(disconnected_handler cb)
const_message_ptr try_consume_message_until(const std::chrono::time_point< Clock, Duration > &absTime)
Definition async_client.h:1020
token_ptr subscribe(const string &topicFilter, int qos, const subscribe_options &opts=subscribe_options(), const properties &props=properties()) override
token_ptr disconnect(disconnect_options opts) override
bool try_consume_message(const_message_ptr *msg) override
token_ptr disconnect() override
Definition async_client.h:399
delivery_token_ptr publish(string_ref topic, const void *payload, size_t n, int qos, bool retained, void *userContext, iaction_listener &cb) override
delivery_token_ptr publish(string_ref topic, const void *payload, size_t n) override
Definition async_client.h:555
delivery_token_ptr publish(string_ref topic, binary_ref payload, int qos, bool retained, const properties &props=properties()) override
token_ptr subscribe(const string &topicFilter, int qos, void *userContext, iaction_listener &cb, const subscribe_options &opts=subscribe_options(), const properties &props=properties()) override
std::shared_ptr< async_client > ptr_t
Definition async_client.h:124
bool consumer_closed() noexcept override
Definition async_client.h:804
void stop_consuming() override
connect_options get_connect_options() const
Definition async_client.h:522
bool try_consume_message_for(const_message_ptr *msg, const std::chrono::duration< Rep, Period > &relTime)
Definition async_client.h:943
token_ptr disconnect(int timeout) override
Definition async_client.h:418
token_ptr subscribe(const_string_collection_ptr topicFilters, const qos_collection &qos, const std::vector< subscribe_options > &opts=std::vector< subscribe_options >(), const properties &props=properties()) override
std::function< bool(connect_data &)> update_connection_handler
Definition async_client.h:135
void set_message_callback(message_handler cb)
token_ptr disconnect(const std::chrono::duration< Rep, Period > &timeout, void *userContext, iaction_listener &cb)
Definition async_client.h:466
token_ptr connect(connect_options options, void *userContext, iaction_listener &cb) override
delivery_token_ptr publish(string_ref topic, const void *payload, size_t n, int qos, bool retained, const properties &props=properties()) override
std::function< void(const properties &, ReasonCode)> disconnected_handler
Definition async_client.h:133
std::function< void(const_message_ptr)> message_handler
Definition async_client.h:129
std::size_t consumer_queue_size() const override
Definition async_client.h:825
token_ptr unsubscribe(const string &topicFilter, const properties &props=properties()) override
delivery_token_ptr publish(string_ref topic, binary_ref payload) override
Definition async_client.h:582
string get_client_id() const override
Definition async_client.h:501
async_client(const string &serverURI, const string &clientId, int maxBufferedMessages, const persistence_type &persistence=persistence_type{})
Definition async_client.h:259
void set_callback(callback &cb) override
bool try_consume_message_until(const_message_ptr *msg, const std::chrono::time_point< Clock, Duration > &absTime)
Definition async_client.h:989
token_ptr subscribe(const_string_collection_ptr topicFilters, const qos_collection &qos, void *userContext, iaction_listener &cb, const std::vector< subscribe_options > &opts=std::vector< subscribe_options >(), const properties &props=properties()) override
const_message_ptr consume_message() override
token_ptr reconnect() override
const_message_ptr try_consume_message_for(const std::chrono::duration< Rep, Period > &relTime)
Definition async_client.h:974
void set_connected_handler(connection_handler cb)
bool try_consume_event_for(event *evt, const std::chrono::duration< Rep, Period > &relTime)
Definition async_client.h:850
async_client(const create_options &opts)
Definition async_client.h:293
token_ptr disconnect(int timeout, void *userContext, iaction_listener &cb) override
std::vector< delivery_token_ptr > get_pending_delivery_tokens() const override
token_ptr unsubscribe(const_string_collection_ptr topicFilters, const properties &props=properties()) override
token_ptr unsubscribe(const_string_collection_ptr topicFilters, void *userContext, iaction_listener &cb, const properties &props=properties()) override
void clear_consumer() override
Definition async_client.h:794
async_client(const string &serverURI, const string &clientId, const create_options &opts, const persistence_type &persistence)
Definition async_client.h:278
bool try_consume_event(event *evt) override
int mqtt_version() const noexcept
Definition async_client.h:516
token_ptr unsubscribe(const string &topicFilter, void *userContext, iaction_listener &cb, const properties &props=properties()) override
event try_consume_event_until(const std::chrono::time_point< Clock, Duration > &absTime)
Definition async_client.h:910
event consume_event() override
string get_server_uri() const override
Definition async_client.h:506
~async_client() override
token_ptr disconnect(const std::chrono::duration< Rep, Period > &timeout)
Definition async_client.h:432
async_client(const string &serverURI, const string &clientId=string{})
Definition async_client.h:226
delivery_token_ptr get_pending_delivery_token(int msgID) const override
token_ptr disconnect(void *userContext, iaction_listener &cb) override
Definition async_client.h:484
token_ptr connect(void *userContext, iaction_listener &cb) override
Definition async_client.h:384
void disable_callbacks() override
void set_update_connection_handler(update_connection_handler cb)
token_ptr connect() override
bool is_connected() const override
Definition async_client.h:530
event try_consume_event_for(const std::chrono::duration< Rep, Period > &relTime)
Definition async_client.h:871
delivery_token_ptr publish(const_message_ptr msg) override
void start_consuming() override
token_ptr connect(connect_options options) override
std::function< void(const string &cause)> connection_handler
Definition async_client.h:131
Definition callback.h:43
Definition connect_options.h:571
Definition connect_options.h:50
Definition create_options.h:60
const string & get_server_uri() const noexcept
Definition create_options.h:199
const string & get_client_id() const noexcept
Definition create_options.h:209
Definition disconnect_options.h:41
Definition event.h:85
Definition exception.h:48
Definition iaction_listener.h:50
Definition iasync_client.h:60
std::vector< int > qos_collection
Definition iasync_client.h:66
static constexpr bool DFLT_RETAINED
Definition message.h:62
static constexpr int DFLT_QOS
Definition message.h:60
Definition properties.h:293
Definition thread_queue.h:43
Definition subscribe_options.h:49
Definition token.h:54
Definition topic.h:45
Definition async_client.h:60
ReasonCode
Definition reason_code.h:39
constexpr no_persistence NO_PERSISTENCE
Definition create_options.h:43
string_collection::const_ptr_t const_string_collection_ptr
Definition string_collection.h:259
bool to_bool(int n)
Definition types.h:107
token::ptr_t token_ptr
Definition token.h:513
const uint32_t VERSION
Definition async_client.h:75
message::const_ptr_t const_message_ptr
Definition message.h:378
const string VERSION_STR("Paho MQTT C++ (mqttpp) v. 1.5.0")
const string COPYRIGHT("Copyright (c) 2013-2024 Frank Pagliughi")
long to_milliseconds_count(const std::chrono::duration< Rep, Period > &dur)
Definition types.h:95
delivery_token::ptr_t delivery_token_ptr
Definition delivery_token.h:127
async_client::ptr_t async_client_ptr
Definition async_client.h:1030
std::variant< no_persistence, string, iclient_persistence * > persistence_type
Definition create_options.h:52
Definition event.h:57