39#include <condition_variable>
56 using ptr_t = std::shared_ptr<token>;
73 using guard = std::lock_guard<std::mutex>;
75 using unique_lock = std::unique_lock<std::mutex>;
78 mutable std::mutex lock_;
80 mutable std::condition_variable cond_;
93 MQTTAsync_token msgId_;
113 std::unique_ptr<connect_response> connRsp_;
115 std::unique_ptr<subscribe_response> subRsp_;
117 std::unique_ptr<unsubscribe_response> unsubRsp_;
137 void set_message_id(MQTTAsync_token msgId) {
150 static void on_success(
void* tokObj, MQTTAsync_successData* rsp);
151 static void on_success5(
void* tokObj, MQTTAsync_successData5* rsp);
161 static void on_failure(
void* tokObj, MQTTAsync_failureData* rsp);
162 static void on_failure5(
void* tokObj, MQTTAsync_failureData5* rsp);
169 static void on_connected(
void* tokObj,
char* );
174 void on_success(MQTTAsync_successData* rsp);
175 void on_success5(MQTTAsync_successData5* rsp);
180 void on_failure(MQTTAsync_failureData* rsp);
181 void on_failure5(MQTTAsync_failureData5* rsp);
187 void check_ret()
const {
189 throw exception(rc_, reasonCode_, errMsg_);
199 :
token(typ, cli, MQTTAsync_token(0)) {}
271 return std::make_shared<token>(typ, cli);
284 return std::make_shared<token>(typ, cli, userContext, cb);
293 return std::make_shared<token>(typ, cli,
topic);
307 return std::make_shared<token>(typ, cli,
topic, userContext, cb);
316 return std::make_shared<token>(typ, cli, topics);
330 return std::make_shared<token>(typ, cli, topics, userContext, cb);
357 static_assert(
sizeof(msgId_) <=
sizeof(
int),
"MQTTAsync_token must fit into int");
395 listener_ = &listener;
404 userContext_ = userContext;
447 return wait_for(std::chrono::milliseconds(timeout));
455 template <
class Rep,
class Period>
456 bool wait_for(
const std::chrono::duration<Rep, Period>& relTime) {
457 unique_lock g(lock_);
458 if (!cond_.wait_for(g, std::chrono::milliseconds(relTime),
459 [
this]{return complete_;}))
470 template <
class Clock,
class Duration>
471 bool wait_until(
const std::chrono::time_point<Clock, Duration>& absTime) {
472 unique_lock g(lock_);
473 if (!cond_.wait_until(g, absTime, [
this]{return complete_;}))
Definition async_client.h:108
Definition connect_options.h:49
Definition server_response.h:75
Definition response_options.h:204
Definition disconnect_options.h:40
Definition exception.h:47
Definition iaction_listener.h:49
Definition iasync_client.h:59
Definition response_options.h:35
Definition string_collection.h:43
token(Type typ, iasync_client &cli, void *userContext, iaction_listener &cb)
Definition token.h:209
static ptr_t create(Type typ, iasync_client &cli, const string &topic)
Definition token.h:292
static ptr_t create(Type typ, iasync_client &cli)
Definition token.h:270
virtual int get_message_id() const
Definition token.h:356
virtual void * get_user_context() const
Definition token.h:373
virtual int get_return_code() const
Definition token.h:388
token(Type typ, iasync_client &cli, const string &topic, void *userContext, iaction_listener &cb)
Definition token.h:230
static ptr_t create(Type typ, iasync_client &cli, const string &topic, void *userContext, iaction_listener &cb)
Definition token.h:305
Type get_type() const
Definition token.h:337
ReasonCode get_reason_code() const
Definition token.h:422
std::shared_ptr< token > ptr_t
Definition token.h:56
unsubscribe_response get_unsubscribe_response() const
static ptr_t create(Type typ, iasync_client &cli, const_string_collection_ptr topics)
Definition token.h:315
subscribe_response get_subscribe_response() const
std::shared_ptr< const token > const_ptr_t
Definition token.h:58
virtual bool wait_for(long timeout)
Definition token.h:446
virtual const_string_collection_ptr get_topics() const
Definition token.h:366
virtual bool is_complete() const
Definition token.h:381
virtual void set_user_context(void *userContext)
Definition token.h:402
token(Type typ, iasync_client &cli)
Definition token.h:198
Type
Definition token.h:63
@ SUBSCRIBE
Definition token.h:65
@ CONNECT
Definition token.h:64
@ UNSUBSCRIBE
Definition token.h:67
@ PUBLISH
Definition token.h:66
@ DISCONNECT
Definition token.h:68
token(Type typ, iasync_client &cli, MQTTAsync_token tok)
bool wait_until(const std::chrono::time_point< Clock, Duration > &absTime)
Definition token.h:471
token(Type typ, iasync_client &cli, const_string_collection_ptr topics)
connect_response get_connect_response() const
virtual iaction_listener * get_action_callback() const
Definition token.h:342
virtual iasync_client * get_client() const
Definition token.h:351
virtual bool try_wait()
Definition token.h:433
virtual ~token()
Definition token.h:263
std::weak_ptr< token > weak_ptr_t
Definition token.h:60
token(Type typ, iasync_client &cli, const string &topic)
Definition token.h:218
void set_num_expected(size_t n)
Definition token.h:411
token(Type typ, iasync_client &cli, const_string_collection_ptr topics, void *userContext, iaction_listener &cb)
virtual void set_action_callback(iaction_listener &listener)
Definition token.h:393
friend class mock_async_client
Definition token.h:121
static ptr_t create(Type typ, iasync_client &cli, const_string_collection_ptr topics, void *userContext, iaction_listener &cb)
Definition token.h:328
static ptr_t create(Type typ, iasync_client &cli, void *userContext, iaction_listener &cb)
Definition token.h:282
bool wait_for(const std::chrono::duration< Rep, Period > &relTime)
Definition token.h:456
Definition server_response.h:177
Definition async_client.h:49
ReasonCode
Definition types.h:57
@ GRANTED_QOS_2
Definition types.h:62
token::ptr_t token_ptr
Definition token.h:506
string_collection::const_ptr_t const_string_collection_ptr
Definition string_collection.h:234
token::const_ptr_t const_token_ptr
Definition token.h:509
Definition server_response.h:123