diff --git a/3rdparty/cppzmq/zmq.hpp b/3rdparty/cppzmq/zmq.hpp index 69fde093..c72708b6 100644 --- a/3rdparty/cppzmq/zmq.hpp +++ b/3rdparty/cppzmq/zmq.hpp @@ -25,6 +25,20 @@ #ifndef __ZMQ_HPP_INCLUDED__ #define __ZMQ_HPP_INCLUDED__ +#if (__cplusplus >= 201103L) + #define ZMQ_CPP11 + #define ZMQ_NOTHROW noexcept + #define ZMQ_EXPLICIT explicit +#elif (defined(_MSC_VER) && (_MSC_VER >= 1900)) + #define ZMQ_CPP11 + #define ZMQ_NOTHROW noexcept + #define ZMQ_EXPLICIT explicit +#else + #define ZMQ_CPP03 + #define ZMQ_NOTHROW + #define ZMQ_EXPLICIT +#endif + #include #include @@ -32,6 +46,14 @@ #include #include #include +#include +#include + +#ifdef ZMQ_CPP11 +#include +#include +#include +#endif // Detect whether the compiler supports C++11 rvalue references. #if (defined(__GNUC__) && (__GNUC__ > 4 || \ @@ -49,6 +71,9 @@ #else #define ZMQ_DELETED_FUNCTION #endif +#elif defined(_MSC_VER) && (_MSC_VER >= 1900) + #define ZMQ_HAS_RVALUE_REFS + #define ZMQ_DELETED_FUNCTION = delete #elif defined(_MSC_VER) && (_MSC_VER >= 1600) #define ZMQ_HAS_RVALUE_REFS #define ZMQ_DELETED_FUNCTION @@ -62,8 +87,19 @@ #if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 1, 0) #define ZMQ_HAS_PROXY_STEERABLE +/* Socket event data */ +typedef struct { + uint16_t event; // id of the event as bitfield + int32_t value ; // value is either error code, fd or reconnect interval +} zmq_event_t; +#endif + +// Avoid using deprecated message receive function when possible +#if ZMQ_VERSION < ZMQ_MAKE_VERSION(3, 2, 0) +# define zmq_msg_recv(msg, socket, flags) zmq_recvmsg(socket, msg, flags) #endif + // In order to prevent unused variable warnings when building in non-debug // mode use this macro to make assertions. #ifndef NDEBUG @@ -83,12 +119,17 @@ namespace zmq public: error_t () : errnum (zmq_errno ()) {} - - virtual const char *what () const throw () +#ifdef ZMQ_CPP11 + virtual const char *what () const noexcept { return zmq_strerror (errnum); } - +#else + virtual const char *what() const throw () + { + return zmq_strerror(errnum); + } +#endif int num () const { return errnum; @@ -99,21 +140,45 @@ namespace zmq int errnum; }; - inline int poll (zmq_pollitem_t *items_, int nitems_, long timeout_ = -1) + inline int poll (zmq_pollitem_t const* items_, size_t nitems_, long timeout_ = -1) { - int rc = zmq_poll (items_, nitems_, timeout_); + int rc = zmq_poll (const_cast(items_), static_cast(nitems_), timeout_); if (rc < 0) throw error_t (); return rc; } + inline int poll(zmq_pollitem_t const* items, size_t nitems) + { + return poll(items, nitems, -1); + } + + #ifdef ZMQ_CPP11 + inline int poll(zmq_pollitem_t const* items, size_t nitems, std::chrono::milliseconds timeout) + { + return poll(items, nitems, static_cast(timeout.count())); + } + + inline int poll(std::vector const& items, std::chrono::milliseconds timeout) + { + return poll(items.data(), items.size(), static_cast(timeout.count())); + } + + inline int poll(std::vector const& items, long timeout_ = -1) + { + return poll(items.data(), items.size(), timeout_); + } + #endif + + + inline void proxy (void *frontend, void *backend, void *capture) { int rc = zmq_proxy (frontend, backend, capture); if (rc != 0) throw error_t (); } - + #ifdef ZMQ_HAS_PROXY_STEERABLE inline void proxy_steerable (void *frontend, void *backend, void *capture, void *control) { @@ -122,12 +187,21 @@ namespace zmq throw error_t (); } #endif - + inline void version (int *major_, int *minor_, int *patch_) { zmq_version (major_, minor_, patch_); } + #ifdef ZMQ_CPP11 + inline std::tuple version() + { + std::tuple v; + zmq_version(&std::get<0>(v), &std::get<1>(v), &std::get<2>(v) ); + return v; + } + #endif + class message_t { friend class socket_t; @@ -148,6 +222,32 @@ namespace zmq throw error_t (); } + template message_t(I first, I last): + msg() + { + typedef typename std::iterator_traits::difference_type size_type; + typedef typename std::iterator_traits::value_type value_t; + + size_type const size_ = std::distance(first, last)*sizeof(value_t); + int const rc = zmq_msg_init_size (&msg, size_); + if (rc != 0) + throw error_t (); + value_t* dest = data(); + while (first != last) + { + *dest = *first; + ++dest; ++first; + } + } + + inline message_t (const void *data_, size_t size_) + { + int rc = zmq_msg_init_size (&msg, size_); + if (rc != 0) + throw error_t (); + memcpy(data(), data_, size_); + } + inline message_t (void *data_, size_t size_, free_fn *ffn_, void *hint_ = NULL) { @@ -157,21 +257,21 @@ namespace zmq } #ifdef ZMQ_HAS_RVALUE_REFS - inline message_t (message_t &&rhs) : msg (rhs.msg) + inline message_t (message_t &&rhs): msg (rhs.msg) { int rc = zmq_msg_init (&rhs.msg); if (rc != 0) throw error_t (); } - inline message_t &operator = (message_t &&rhs) + inline message_t &operator = (message_t &&rhs) ZMQ_NOTHROW { std::swap (msg, rhs.msg); return *this; } #endif - inline ~message_t () + inline ~message_t () ZMQ_NOTHROW { int rc = zmq_msg_close (&msg); ZMQ_ASSERT (rc == 0); @@ -197,6 +297,17 @@ namespace zmq throw error_t (); } + inline void rebuild (const void *data_, size_t size_) + { + int rc = zmq_msg_close (&msg); + if (rc != 0) + throw error_t (); + rc = zmq_msg_init_size (&msg, size_); + if (rc != 0) + throw error_t (); + memcpy(data(), data_, size_); + } + inline void rebuild (void *data_, size_t size_, free_fn *ffn_, void *hint_ = NULL) { @@ -208,50 +319,78 @@ namespace zmq throw error_t (); } - inline void move (message_t *msg_) + inline void move (message_t const *msg_) { - int rc = zmq_msg_move (&msg, &(msg_->msg)); + int rc = zmq_msg_move (&msg, const_cast(&(msg_->msg))); if (rc != 0) throw error_t (); } - inline void copy (message_t *msg_) + inline void copy (message_t const *msg_) { - int rc = zmq_msg_copy (&msg, &(msg_->msg)); + int rc = zmq_msg_copy (&msg, const_cast(&(msg_->msg))); if (rc != 0) throw error_t (); } - inline bool more () + inline bool more () const ZMQ_NOTHROW { - int rc = zmq_msg_more (&msg); + int rc = zmq_msg_more (const_cast(&msg) ); return rc != 0; } - inline void *data () + inline void *data () ZMQ_NOTHROW { return zmq_msg_data (&msg); } - inline const void* data () const + inline const void* data () const ZMQ_NOTHROW { return zmq_msg_data (const_cast(&msg)); } - inline size_t size () const + inline size_t size () const ZMQ_NOTHROW { return zmq_msg_size (const_cast(&msg)); } - private: + template T* data() ZMQ_NOTHROW + { + return static_cast( data() ); + } + template T const* data() const ZMQ_NOTHROW + { + return static_cast( data() ); + } + + inline bool equal(const message_t* other) const ZMQ_NOTHROW + { + if (size() != other->size()) + return false; + std::string a(data(), size()); + std::string b(other->data(), other->size()); + return a == b; + } + +#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 1, 0) + inline const char* gets(const char *property_) + { + const char* value = zmq_msg_gets (&msg, property_); + if (value == NULL) + throw error_t (); + return value; + } +#endif + + private: // The underlying message zmq_msg_t msg; // Disable implicit message copying, so that users won't use shared // messages (less efficient) without being aware of the fact. - message_t (const message_t&); - void operator = (const message_t&); + message_t (const message_t&) ZMQ_DELETED_FUNCTION; + void operator = (const message_t&) ZMQ_DELETED_FUNCTION; }; class context_t @@ -281,26 +420,27 @@ namespace zmq } #ifdef ZMQ_HAS_RVALUE_REFS - inline context_t (context_t &&rhs) : ptr (rhs.ptr) + inline context_t (context_t &&rhs) ZMQ_NOTHROW : ptr (rhs.ptr) { rhs.ptr = NULL; } - inline context_t &operator = (context_t &&rhs) + inline context_t &operator = (context_t &&rhs) ZMQ_NOTHROW { std::swap (ptr, rhs.ptr); return *this; } #endif - inline ~context_t () + inline ~context_t () ZMQ_NOTHROW { close(); } - inline void close() + inline void close() ZMQ_NOTHROW { if (ptr == NULL) return; + int rc = zmq_ctx_destroy (ptr); ZMQ_ASSERT (rc == 0); ptr = NULL; @@ -309,55 +449,90 @@ namespace zmq // Be careful with this, it's probably only useful for // using the C api together with an existing C++ api. // Normally you should never need to use this. - inline operator void* () + inline ZMQ_EXPLICIT operator void* () ZMQ_NOTHROW { return ptr; } + inline ZMQ_EXPLICIT operator void const* () const ZMQ_NOTHROW + { + return ptr; + } private: void *ptr; - context_t (const context_t&); - void operator = (const context_t&); + context_t (const context_t&) ZMQ_DELETED_FUNCTION; + void operator = (const context_t&) ZMQ_DELETED_FUNCTION; + }; + + #ifdef ZMQ_CPP11 + enum class socket_type: int + { + req = ZMQ_REQ, + rep = ZMQ_REP, + dealer = ZMQ_DEALER, + router = ZMQ_ROUTER, + pub = ZMQ_PUB, + sub = ZMQ_SUB, + xpub = ZMQ_XPUB, + xsub = ZMQ_XSUB, + push = ZMQ_PUSH, + pull = ZMQ_PULL, +#if ZMQ_VERSION_MAJOR < 4 + pair = ZMQ_PAIR +#else + pair = ZMQ_PAIR, + stream = ZMQ_STREAM +#endif }; + #endif class socket_t { friend class monitor_t; + friend class poller_t; public: + inline socket_t(context_t& context_, int type_) + { + init(context_, type_); + } - inline socket_t (context_t &context_, int type_) + #ifdef ZMQ_CPP11 + inline socket_t(context_t& context_, socket_type type_) { - ctxptr = context_.ptr; - ptr = zmq_socket (context_.ptr, type_); - if (ptr == NULL) - throw error_t (); + init(context_, static_cast(type_)); } + #endif #ifdef ZMQ_HAS_RVALUE_REFS - inline socket_t(socket_t&& rhs) : ptr(rhs.ptr) + inline socket_t(socket_t&& rhs) ZMQ_NOTHROW : ptr(rhs.ptr) { rhs.ptr = NULL; } - inline socket_t& operator=(socket_t&& rhs) + inline socket_t& operator=(socket_t&& rhs) ZMQ_NOTHROW { std::swap(ptr, rhs.ptr); return *this; } #endif - inline ~socket_t () + inline ~socket_t () ZMQ_NOTHROW { close(); } - inline operator void* () + inline operator void* () ZMQ_NOTHROW { return ptr; } - inline void close() + inline operator void const* () const ZMQ_NOTHROW + { + return ptr; + } + + inline void close() ZMQ_NOTHROW { if(ptr == NULL) // already closed @@ -367,6 +542,11 @@ namespace zmq ptr = 0 ; } + template void setsockopt(int option_, T const& optval) + { + setsockopt(option_, &optval, sizeof(T) ); + } + inline void setsockopt (int option_, const void *optval_, size_t optvallen_) { @@ -376,13 +556,26 @@ namespace zmq } inline void getsockopt (int option_, void *optval_, - size_t *optvallen_) + size_t *optvallen_) const { int rc = zmq_getsockopt (ptr, option_, optval_, optvallen_); if (rc != 0) throw error_t (); } - + + template T getsockopt(int option_) const + { + T optval; + size_t optlen = sizeof(T); + getsockopt(option_, &optval, &optlen ); + return optval; + } + + inline void bind(std::string const& addr) + { + bind(addr.c_str()); + } + inline void bind (const char *addr_) { int rc = zmq_bind (ptr, addr_); @@ -390,6 +583,11 @@ namespace zmq throw error_t (); } + inline void unbind(std::string const& addr) + { + unbind(addr.c_str()); + } + inline void unbind (const char *addr_) { int rc = zmq_unbind (ptr, addr_); @@ -397,6 +595,11 @@ namespace zmq throw error_t (); } + inline void connect(std::string const& addr) + { + connect(addr.c_str()); + } + inline void connect (const char *addr_) { int rc = zmq_connect (ptr, addr_); @@ -404,6 +607,11 @@ namespace zmq throw error_t (); } + inline void disconnect(std::string const& addr) + { + disconnect(addr.c_str()); + } + inline void disconnect (const char *addr_) { int rc = zmq_disconnect (ptr, addr_); @@ -411,11 +619,11 @@ namespace zmq throw error_t (); } - inline bool connected() + inline bool connected() const ZMQ_NOTHROW { return(ptr != NULL); } - + inline size_t send (const void *buf_, size_t len_, int flags_ = 0) { int nbytes = zmq_send (ptr, buf_, len_, flags_); @@ -436,6 +644,19 @@ namespace zmq throw error_t (); } + template bool send(I first, I last, int flags_=0) + { + zmq::message_t msg(first, last); + return send(msg, flags_); + } + +#ifdef ZMQ_HAS_RVALUE_REFS + inline bool send (message_t &&msg_, int flags_ = 0) + { + return send(msg_, flags_); + } +#endif + inline size_t recv (void *buf_, size_t len_, int flags_ = 0) { int nbytes = zmq_recv (ptr, buf_, len_, flags_); @@ -455,8 +676,16 @@ namespace zmq return false; throw error_t (); } - + private: + inline void init(context_t& context_, int type_) + { + ctxptr = context_.ptr; + ptr = zmq_socket (context_.ptr, type_ ); + if (ptr == NULL) + throw error_t (); + } + void *ptr; void *ctxptr; @@ -470,6 +699,11 @@ namespace zmq monitor_t() : socketPtr(NULL) {} virtual ~monitor_t() {} + void monitor(socket_t &socket, std::string const& addr, int events = ZMQ_EVENT_ALL) + { + monitor(socket, addr.c_str(), events); + } + void monitor(socket_t &socket, const char *addr_, int events = ZMQ_EVENT_ALL) { int rc = zmq_socket_monitor(socket.ptr, addr_, events); @@ -482,22 +716,30 @@ namespace zmq rc = zmq_connect (s, addr_); assert (rc == 0); - + on_monitor_started(); - + while (true) { zmq_msg_t eventMsg; zmq_msg_init (&eventMsg); - rc = zmq_recvmsg (s, &eventMsg, 0); + rc = zmq_msg_recv (&eventMsg, s, 0); if (rc == -1 && zmq_errno() == ETERM) break; assert (rc != -1); - zmq_event_t* event = static_cast(zmq_msg_data (&eventMsg)); - +#if ZMQ_VERSION_MAJOR >= 4 + const char* data = static_cast(zmq_msg_data(&eventMsg)); + zmq_event_t msgEvent; + memcpy(&msgEvent.event, data, sizeof(uint16_t)); data += sizeof(uint16_t); + memcpy(&msgEvent.value, data, sizeof(int32_t)); + zmq_event_t* event = &msgEvent; +#else + zmq_event_t* event = static_cast(zmq_msg_data(&eventMsg)); +#endif + #ifdef ZMQ_NEW_MONITOR_EVENT_LAYOUT zmq_msg_t addrMsg; zmq_msg_init (&addrMsg); - rc = zmq_recvmsg (s, &addrMsg, 0); + rc = zmq_msg_recv (&addrMsg, s, 0); if (rc == -1 && zmq_errno() == ETERM) break; assert (rc != -1); @@ -545,6 +787,14 @@ namespace zmq case ZMQ_EVENT_DISCONNECTED: on_event_disconnected(*event, address.c_str()); break; +#ifdef ZMQ_BUILD_DRAFT_API + case ZMQ_EVENT_HANDSHAKE_FAILED: + on_event_handshake_failed(*event, address.c_str()); + break; + case ZMQ_EVENT_HANDSHAKE_SUCCEED: + on_event_handshake_succeed(*event, address.c_str()); + break; +#endif default: on_event_unknown(*event, address.c_str()); break; @@ -573,10 +823,69 @@ namespace zmq virtual void on_event_closed(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } virtual void on_event_close_failed(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } virtual void on_event_disconnected(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } + virtual void on_event_handshake_failed(const zmq_event_t &event_, const char* addr_) { (void) event_; (void) addr_; } + virtual void on_event_handshake_succeed(const zmq_event_t &event_, const char* addr_) { (void) event_; (void) addr_; } virtual void on_event_unknown(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } private: void* socketPtr; }; + +#if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER) + class poller_t + { + public: + poller_t () : poller_ptr (zmq_poller_new ()) + { + if (!poller_ptr) + throw error_t (); + } + + ~poller_t () + { + zmq_poller_destroy (&poller_ptr); + } + + bool add (zmq::socket_t &socket, short events, std::function &handler) + { + if (0 == zmq_poller_add (poller_ptr, socket.ptr, handler ? &handler : NULL, events)) { + poller_events.emplace_back (zmq_poller_event_t ()); + return true; + } + return false; + } + + bool remove (zmq::socket_t &socket) + { + if (0 == zmq_poller_remove (poller_ptr, socket.ptr)) { + poller_events.pop_back (); + return true; + } + return false; + } + + bool wait (std::chrono::milliseconds timeout) + { + int rc = zmq_poller_wait_all (poller_ptr, poller_events.data (), poller_events.size (), static_cast(timeout.count ())); + if (rc >= 0) { + std::for_each (poller_events.begin (), poller_events.begin () + rc, [](zmq_poller_event_t& event) { + if (event.user_data != NULL) + (*reinterpret_cast*> (event.user_data)) (); + }); + return true; + } + + if (zmq_errno () == ETIMEDOUT) + return false; + + throw error_t (); + } + + private: + void *poller_ptr; + std::vector poller_events; + }; +#endif // defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER) + } #endif diff --git a/include/nzmqt/impl.hpp b/include/nzmqt/impl.hpp index 9d5d4ed5..a66377cd 100644 --- a/include/nzmqt/impl.hpp +++ b/include/nzmqt/impl.hpp @@ -33,6 +33,7 @@ #include #include #include +#include #if defined(NZMQT_LIB) // #pragma message("nzmqt is built as library") @@ -75,11 +76,6 @@ NZMQT_INLINE void ZMQMessage::move(ZMQMessage* msg_) super::move(static_cast(msg_)); } -NZMQT_INLINE void ZMQMessage::copy(ZMQMessage* msg_) -{ - super::copy(msg_); -} - NZMQT_INLINE void ZMQMessage::clone(ZMQMessage* msg_) { rebuild(msg_->size()); @@ -88,7 +84,7 @@ NZMQT_INLINE void ZMQMessage::clone(ZMQMessage* msg_) NZMQT_INLINE QByteArray ZMQMessage::toByteArray() { - return QByteArray((const char *)data(), size()); + return size() <= INT_MAX ? QByteArray(data(), int(size())) : QByteArray(); } @@ -98,7 +94,7 @@ NZMQT_INLINE QByteArray ZMQMessage::toByteArray() */ NZMQT_INLINE ZMQSocket::ZMQSocket(ZMQContext* context_, Type type_) - : qsuper(0) + : qsuper(nullptr) , zmqsuper(*context_, type_) , m_context(context_) { @@ -116,7 +112,7 @@ NZMQT_INLINE void ZMQSocket::close() if (m_context) { m_context->unregisterSocket(this); - m_context = 0; + m_context = nullptr; } zmqsuper::close(); } @@ -136,49 +132,49 @@ NZMQT_INLINE void ZMQSocket::setOption(Option optName_, const QByteArray& bytes_ setOption(optName_, bytes_.constData(), bytes_.size()); } -NZMQT_INLINE void ZMQSocket::setOption(Option optName_, qint32 value_) +NZMQT_INLINE void ZMQSocket::getOption(Option option_, void *optval_, size_t *optvallen_) const { - setOption(optName_, &value_, sizeof(value_)); + const_cast(this)->getsockopt(option_, optval_, optvallen_); } -NZMQT_INLINE void ZMQSocket::setOption(Option optName_, quint32 value_) +NZMQT_INLINE void ZMQSocket::bindTo(const QString& addr_) { - setOption(optName_, &value_, sizeof(value_)); + bind(addr_.toLocal8Bit()); } -NZMQT_INLINE void ZMQSocket::setOption(Option optName_, qint64 value_) +NZMQT_INLINE void ZMQSocket::bindTo(const char *addr_) { - setOption(optName_, &value_, sizeof(value_)); + bind(addr_); } -NZMQT_INLINE void ZMQSocket::setOption(Option optName_, quint64 value_) +NZMQT_INLINE void ZMQSocket::unbindFrom(const QString& addr_) { - setOption(optName_, &value_, sizeof(value_)); + unbind(addr_.toLocal8Bit()); } -NZMQT_INLINE void ZMQSocket::getOption(Option option_, void *optval_, size_t *optvallen_) const +NZMQT_INLINE void ZMQSocket::unbindFrom(const char *addr_) { - const_cast(this)->getsockopt(option_, optval_, optvallen_); + unbind(addr_); } -NZMQT_INLINE void ZMQSocket::bindTo(const QString& addr_) +NZMQT_INLINE void ZMQSocket::connectTo(const QString& addr_) { - bind(addr_.toLocal8Bit()); + zmqsuper::connect(addr_.toLocal8Bit()); } -NZMQT_INLINE void ZMQSocket::bindTo(const char *addr_) +NZMQT_INLINE void ZMQSocket::connectTo(const char* addr_) { - bind(addr_); + zmqsuper::connect(addr_); } -NZMQT_INLINE void ZMQSocket::connectTo(const QString& addr_) +NZMQT_INLINE void ZMQSocket::disconnectFrom(const QString& addr_) { - zmqsuper::connect(addr_.toLocal8Bit()); + zmqsuper::disconnect(addr_.toLocal8Bit()); } -NZMQT_INLINE void ZMQSocket::connectTo(const char* addr_) +NZMQT_INLINE void ZMQSocket::disconnectFrom(const char* addr_) { - zmqsuper::connect(addr_); + zmqsuper::disconnect(addr_); } NZMQT_INLINE bool ZMQSocket::sendMessage(ZMQMessage& msg_, SendFlags flags_) @@ -211,12 +207,12 @@ NZMQT_INLINE bool ZMQSocket::receiveMessage(ZMQMessage* msg_, ReceiveFlags flags return recv(msg_, flags_); } -NZMQT_INLINE QList ZMQSocket::receiveMessage() +NZMQT_INLINE QList ZMQSocket::receiveMessage(ReceiveFlags flags_) { QList parts; ZMQMessage msg; - while (receiveMessage(&msg)) + while (receiveMessage(&msg, flags_)) { parts += msg.toByteArray(); msg.rebuild(); @@ -228,24 +224,24 @@ NZMQT_INLINE QList ZMQSocket::receiveMessage() return parts; } -NZMQT_INLINE QList< QList > ZMQSocket::receiveMessages() +NZMQT_INLINE QList< QList > ZMQSocket::receiveMessages(ReceiveFlags flags_) { QList< QList > ret; - QList parts = receiveMessage(); + QList parts = receiveMessage(flags_); while (!parts.isEmpty()) { - ret += parts; + ret += std::move(parts); - parts = receiveMessage(); + parts = receiveMessage(flags_); } return ret; } -NZMQT_INLINE qint32 ZMQSocket::fileDescriptor() const +NZMQT_INLINE qintptr ZMQSocket::fileDescriptor() const { - qint32 value; + qintptr value; size_t size = sizeof(value); getOption(OPT_FD, &value, &size); return value; @@ -289,7 +285,7 @@ NZMQT_INLINE QByteArray ZMQSocket::identity() const char idbuf[256]; size_t size = sizeof(idbuf); getOption(OPT_IDENTITY, idbuf, &size); - return QByteArray(idbuf, size); + return QByteArray(idbuf, int(size)); } NZMQT_INLINE void ZMQSocket::setLinger(int msec_) @@ -335,7 +331,20 @@ NZMQT_INLINE void ZMQSocket::unsubscribeFrom(const QByteArray& filter_) setOption(OPT_UNSUBSCRIBE, filter_); } +NZMQT_INLINE void ZMQSocket::setSendHighWaterMark(int value_) +{ + setOption(OPT_SNDHWM, value_); +} + +NZMQT_INLINE void ZMQSocket::setReceiveHighWaterMark(int value_) +{ + setOption(OPT_RCVHWM, value_); +} +NZMQT_INLINE bool ZMQSocket::isConnected() +{ + return const_cast(this)->connected(); +} /* * ZMQContext @@ -350,9 +359,9 @@ NZMQT_INLINE ZMQContext::ZMQContext(QObject* parent_, int io_threads_) NZMQT_INLINE ZMQContext::~ZMQContext() { // qDebug() << Q_FUNC_INFO << "Sockets:" << m_sockets; - foreach (ZMQSocket* socket, m_sockets) + for(ZMQSocket* socket : registeredSockets()) { - socket->m_context = 0; + socket->m_context = nullptr; // As stated by 0MQ, close() must ONLY be called from the thread // owning the socket. So we use 'invokeMethod' which (hopefully) // results in a 'close' call from within the socket's thread. @@ -375,15 +384,13 @@ NZMQT_INLINE void ZMQContext::registerSocket(ZMQSocket* socket_) NZMQT_INLINE void ZMQContext::unregisterSocket(ZMQSocket* socket_) { - Sockets::iterator soIt = m_sockets.begin(); - while (soIt != m_sockets.end()) + for(Sockets::iterator soIt = m_sockets.begin(); soIt != m_sockets.end(); ++soIt) { if (*soIt == socket_) { m_sockets.erase(soIt); break; } - ++soIt; } } @@ -421,12 +428,6 @@ NZMQT_INLINE PollingZMQSocket::PollingZMQSocket(PollingZMQContext* context_, Typ { } -NZMQT_INLINE void PollingZMQSocket::onMessageReceived(const QList& message) -{ - emit messageReceived(message); -} - - /* * PollingZMQContext @@ -454,7 +455,7 @@ NZMQT_INLINE int PollingZMQContext::getInterval() const NZMQT_INLINE void PollingZMQContext::start() { m_stopped = false; - QTimer::singleShot(0, this, SLOT(run())); + QTimer::singleShot(0, this, &PollingZMQContext::run); } NZMQT_INLINE void PollingZMQContext::stop() @@ -483,7 +484,7 @@ NZMQT_INLINE void PollingZMQContext::run() } if (!m_stopped) - QTimer::singleShot(m_interval, this, SLOT(run())); + QTimer::singleShot(m_interval, this, &PollingZMQContext::run); } NZMQT_INLINE void PollingZMQContext::poll(long timeout_) @@ -508,8 +509,8 @@ NZMQT_INLINE void PollingZMQContext::poll(long timeout_) if (poIt->revents & ZMQSocket::EVT_POLLIN) { PollingZMQSocket* socket = static_cast(*soIt); - QList message = socket->receiveMessage(); - socket->onMessageReceived(message); + QList && message = socket->receiveMessage(); + socket->messageReceived(std::move(message)); i++; } ++soIt; @@ -563,48 +564,70 @@ NZMQT_INLINE void PollingZMQContext::unregisterSocket(ZMQSocket* socket_) NZMQT_INLINE SocketNotifierZMQSocket::SocketNotifierZMQSocket(ZMQContext* context_, Type type_) : super(context_, type_) , socketNotifyRead_(0) -// , socketNotifyWrite_(0) + , socketNotifyWrite_(0) { - int fd = fileDescriptor(); + qintptr fd = fileDescriptor(); socketNotifyRead_ = new QSocketNotifier(fd, QSocketNotifier::Read, this); - QObject::connect(socketNotifyRead_, SIGNAL(activated(int)), this, SLOT(socketReadActivity())); + QObject::connect(socketNotifyRead_, &QSocketNotifier::activated, this, &SocketNotifierZMQSocket::socketReadActivity); -// socketNotifyWrite_ = new QSocketNotifier(fd, QSocketNotifier::Write, this); -// socketNotifyWrite_->setEnabled(false); -// QObject::connect(socketNotifyWrite_, SIGNAL(activated(int)), this, SLOT(socketWriteActivity())); + socketNotifyWrite_ = new QSocketNotifier(fd, QSocketNotifier::Write, this); + QObject::connect(socketNotifyWrite_, &QSocketNotifier::activated, this, &SocketNotifierZMQSocket::socketWriteActivity); } -//NZMQT_INLINE bool SocketNotifierZMQSocket::sendMessage(const QByteArray& bytes_, SendFlags flags_) -//{ -// bool result = super::sendMessage(bytes_, flags_); -// -// if (!result) -// socketNotifyWrite_->setEnabled(true); -// -// return result; -//} +NZMQT_INLINE SocketNotifierZMQSocket::~SocketNotifierZMQSocket() +{ + close(); +} + +NZMQT_INLINE void SocketNotifierZMQSocket::close() +{ + socketNotifyRead_->deleteLater(); + socketNotifyWrite_->deleteLater(); + super::close(); +} NZMQT_INLINE void SocketNotifierZMQSocket::socketReadActivity() { socketNotifyRead_->setEnabled(false); - while(events() & EVT_POLLIN) + try { - QList message = receiveMessage(); - emit messageReceived(message); + while(isConnected() && (events() & EVT_POLLIN)) + { + const QList & message = receiveMessage(); + emit messageReceived(message); + } + } + catch (const ZMQException& ex) + { + qWarning("Exception during read: %s", ex.what()); + emit notifierError(ex.num(), ex.what()); } socketNotifyRead_->setEnabled(true); } -//NZMQT_INLINE void SocketNotifierZMQSocket::socketWriteActivity() -//{ -// if(events() == 0) -// { -// socketNotifyWrite_->setEnabled(false); -// } -//} +NZMQT_INLINE void SocketNotifierZMQSocket::socketWriteActivity() +{ + socketNotifyWrite_->setEnabled(false); + + try + { + while (isConnected() && (events() & EVT_POLLIN)) + { + const QList & message = receiveMessage(); + emit messageReceived(message); + } + } + catch (const ZMQException& ex) + { + qWarning("Exception during write: %s", ex.what()); + emit notifierError(ex.num(), ex.what()); + } + + socketNotifyWrite_->setEnabled(true); +} @@ -632,7 +655,10 @@ NZMQT_INLINE bool SocketNotifierZMQContext::isStopped() const NZMQT_INLINE SocketNotifierZMQSocket* SocketNotifierZMQContext::createSocketInternal(ZMQSocket::Type type_) { - return new SocketNotifierZMQSocket(this, type_); + SocketNotifierZMQSocket *socket = new SocketNotifierZMQSocket(this, type_); + connect(socket, &SocketNotifierZMQSocket::notifierError, + this, &SocketNotifierZMQContext::notifierError); + return socket; } } diff --git a/include/nzmqt/nzmqt.hpp b/include/nzmqt/nzmqt.hpp index 52b7c843..e2493036 100755 --- a/include/nzmqt/nzmqt.hpp +++ b/include/nzmqt/nzmqt.hpp @@ -40,6 +40,8 @@ #include #include +#include + // Define default context implementation to be used. #ifndef NZMQT_DEFAULT_ZMQCONTEXT_IMPLEMENTATION #define NZMQT_DEFAULT_ZMQCONTEXT_IMPLEMENTATION PollingZMQContext @@ -88,7 +90,7 @@ namespace nzmqt void move(ZMQMessage* msg_); - void copy(ZMQMessage* msg_); + using super::copy; using super::more; @@ -144,13 +146,13 @@ namespace nzmqt enum SendFlag { SND_MORE = ZMQ_SNDMORE, - SND_NOBLOCK = ZMQ_DONTWAIT + SND_DONTWAIT = ZMQ_DONTWAIT }; Q_DECLARE_FLAGS(SendFlags, SendFlag) enum ReceiveFlag { - RCV_NOBLOCK = ZMQ_DONTWAIT + RCV_DONTWAIT = ZMQ_DONTWAIT }; Q_DECLARE_FLAGS(ReceiveFlags, ReceiveFlag) @@ -161,10 +163,20 @@ namespace nzmqt OPT_RCVMORE = ZMQ_RCVMORE, OPT_FD = ZMQ_FD, OPT_EVENTS = ZMQ_EVENTS, + OPT_MAXMSGSIZE = ZMQ_MAXMSGSIZE, // Set only. OPT_SUBSCRIBE = ZMQ_SUBSCRIBE, OPT_UNSUBSCRIBE = ZMQ_UNSUBSCRIBE, +#ifdef ZMQ_IMMEDIATE + OPT_IMMEDIATE = ZMQ_IMMEDIATE, +#endif +#ifdef ZMQ_REQ_CORRELATE + OPT_REQ_CORRELATE = ZMQ_REQ_CORRELATE, +#endif +#ifdef ZMQ_REQ_RELAXED + OPT_REQ_RELAXED = ZMQ_REQ_RELAXED, +#endif // Get and set. OPT_AFFINITY = ZMQ_AFFINITY, @@ -176,7 +188,20 @@ namespace nzmqt OPT_LINGER = ZMQ_LINGER, OPT_RECONNECT_IVL = ZMQ_RECONNECT_IVL, OPT_RECONNECT_IVL_MAX = ZMQ_RECONNECT_IVL_MAX, - OPT_BACKLOG = ZMQ_BACKLOG + OPT_BACKLOG = ZMQ_BACKLOG, + OPT_SNDHWM = ZMQ_SNDHWM, + OPT_RCVHWM = ZMQ_RCVHWM, + OPT_SNDTIMEO = ZMQ_SNDTIMEO, + OPT_RCVTIMEO = ZMQ_RCVTIMEO, +#ifdef ZMQ_IPV6 + OPT_IPV6 = ZMQ_IPV6, +#endif +#ifdef ZMQ_CONFLATE + OPT_CONFLATE = ZMQ_CONFLATE, +#endif +#ifdef ZMQ_TOS + OPT_TOS = ZMQ_TOS, +#endif }; ~ZMQSocket(); @@ -189,13 +214,11 @@ namespace nzmqt void setOption(Option optName_, const QByteArray& bytes_); - void setOption(Option optName_, qint32 value_); - - void setOption(Option optName_, quint32 value_); - - void setOption(Option optName_, qint64 value_); - - void setOption(Option optName_, quint64 value_); + template::value>::type> + void setOption(Option optName_, INT_T value_) + { + setOption(optName_, &value_, sizeof(value_)); + } void getOption(Option option_, void *optval_, size_t *optvallen_) const; @@ -203,29 +226,37 @@ namespace nzmqt void bindTo(const char *addr_); + void unbindFrom(const QString& addr_); + + void unbindFrom(const char *addr_); + void connectTo(const QString& addr_); void connectTo(const char* addr_); - bool sendMessage(ZMQMessage& msg_, SendFlags flags_ = SND_NOBLOCK); + void disconnectFrom(const QString& addr_); + + void disconnectFrom(const char* addr_); + + bool sendMessage(ZMQMessage& msg_, SendFlags flags_ = SND_DONTWAIT); // Receives a message or a message part. - bool receiveMessage(ZMQMessage* msg_, ReceiveFlags flags_ = RCV_NOBLOCK); + bool receiveMessage(ZMQMessage* msg_, ReceiveFlags flags_ = RCV_DONTWAIT); // Receives a message. // The message is represented as a list of byte arrays representing // a message's parts. If the message is not a multi-part message the // list will only contain one array. - QList receiveMessage(); + QList receiveMessage(ReceiveFlags flags_ = RCV_DONTWAIT); // Receives all messages currently available. // Each message is represented as a list of byte arrays representing the messages // and their parts in case of multi-part messages. If a message isn't a multi-part // message the corresponding byte array list will only contain one element. // Note that this method won't work with REQ-REP protocol. - QList< QList > receiveMessages(); + QList< QList > receiveMessages(ReceiveFlags flags_ = RCV_DONTWAIT); - qint32 fileDescriptor() const; + qintptr fileDescriptor() const; Events events() const; @@ -257,6 +288,12 @@ namespace nzmqt void unsubscribeFrom(const QByteArray& filter_); + void setSendHighWaterMark(int value_); + + void setReceiveHighWaterMark(int value_); + + bool isConnected(); + signals: void messageReceived(const QList&); @@ -264,12 +301,12 @@ namespace nzmqt void close(); // Send the given bytes as a single-part message. - bool sendMessage(const QByteArray& bytes_, nzmqt::ZMQSocket::SendFlags flags_ = SND_NOBLOCK); + bool sendMessage(const QByteArray& bytes_, nzmqt::ZMQSocket::SendFlags flags_ = SND_DONTWAIT); // Interprets the provided list of byte arrays as a multi-part message // and sends them accordingly. // If an empty list is provided this method doesn't do anything and returns trua. - bool sendMessage(const QList& msg_, nzmqt::ZMQSocket::SendFlags flags_ = SND_NOBLOCK); + bool sendMessage(const QList& msg_, nzmqt::ZMQSocket::SendFlags flags_ = SND_DONTWAIT); protected: @@ -296,7 +333,7 @@ namespace nzmqt friend class ZMQSocket; public: - ZMQContext(QObject* parent_ = 0, int io_threads_ = NZMQT_DEFAULT_IOTHREADS); + ZMQContext(QObject* parent_ = nullptr, int io_threads_ = NZMQT_DEFAULT_IOTHREADS); // Deleting children is necessary, because otherwise the children are deleted after the context // which results in a blocking state. So we delete the children before the zmq::context_t @@ -311,7 +348,7 @@ namespace nzmqt // ownership later on). Make sure, however, that the socket's parent // belongs to the same thread as the socket instance itself (as it is required // by Qt). Otherwise, you will encounter strange errors. - ZMQSocket* createSocket(ZMQSocket::Type type_, QObject* parent_ = 0); + ZMQSocket* createSocket(ZMQSocket::Type type_, QObject* parent_ = nullptr); // Start watching for incoming messages. virtual void start() = 0; @@ -378,10 +415,6 @@ namespace nzmqt protected: PollingZMQSocket(PollingZMQContext* context_, Type type_); - - // This method is called by the socket's context object in order - // to signal a new received message. - void onMessageReceived(const QList& message); }; class NZMQT_API PollingZMQContext : public ZMQContext, public QRunnable @@ -391,7 +424,7 @@ namespace nzmqt typedef ZMQContext super; public: - PollingZMQContext(QObject* parent_ = 0, int io_threads_ = NZMQT_DEFAULT_IOTHREADS); + PollingZMQContext(QObject* parent_ = nullptr, int io_threads_ = NZMQT_DEFAULT_IOTHREADS); // Sets the polling interval. // Note that the interval does not denote the time the zmq::poll() function will @@ -402,13 +435,13 @@ namespace nzmqt int getInterval() const; // Starts the polling process by scheduling a call to the 'run()' method into Qt's event loop. - void start(); + void start() override; // Stops the polling process in the sense that no further 'run()' calls will be scheduled into // Qt's event loop. - void stop(); + void stop() override; - bool isStopped() const; + bool isStopped() const override; public slots: // If the polling process is not stopped (by a previous call to the 'stop()' method) this @@ -429,13 +462,13 @@ namespace nzmqt void pollError(int errorNum, const QString& errorMsg); protected: - PollingZMQSocket* createSocketInternal(ZMQSocket::Type type_); + PollingZMQSocket* createSocketInternal(ZMQSocket::Type type_) override; // Add the given socket to list list of poll-items. - void registerSocket(ZMQSocket* socket_); + void registerSocket(ZMQSocket* socket_) override; // Remove the given socket object from the list of poll-items. - void unregisterSocket(ZMQSocket* socket_); + void unregisterSocket(ZMQSocket* socket_) override; private: typedef QVector PollItems; @@ -457,22 +490,27 @@ namespace nzmqt typedef ZMQSocket super; -// public: -// using super::sendMessage; + public: + using super::close; -// bool sendMessage(const QByteArray& bytes_, SendFlags flags_ = SND_NOBLOCK); + void close(); + + signals: + // This signal will be emitted by the socket notifier callback if a call + // to the events() method results in an exception. + void notifierError(int errorNum, const QString& errorMsg); protected: SocketNotifierZMQSocket(ZMQContext* context_, Type type_); + ~SocketNotifierZMQSocket(); protected slots: void socketReadActivity(); - -// void socketWriteActivity(); + void socketWriteActivity(); private: QSocketNotifier *socketNotifyRead_; -// QSocketNotifier *socketNotifyWrite_; + QSocketNotifier *socketNotifyWrite_; }; class NZMQT_API SocketNotifierZMQContext : public ZMQContext @@ -482,19 +520,24 @@ namespace nzmqt typedef ZMQContext super; public: - SocketNotifierZMQContext(QObject* parent_ = 0, int io_threads_ = NZMQT_DEFAULT_IOTHREADS); + SocketNotifierZMQContext(QObject* parent_ = nullptr, int io_threads_ = NZMQT_DEFAULT_IOTHREADS); + + void start() override; - void start(); + void stop() override; - void stop(); + bool isStopped() const override; - bool isStopped() const; + signals: + // This signal will be emitted by the socket notifier callback if a call + // to the events() method results in an exception. + void notifierError(int errorNum, const QString& errorMsg); protected: SocketNotifierZMQSocket* createSocketInternal(ZMQSocket::Type type_); }; - NZMQT_API inline ZMQContext* createDefaultContext(QObject* parent_ = 0, int io_threads_ = NZMQT_DEFAULT_IOTHREADS) + NZMQT_API inline ZMQContext* createDefaultContext(QObject* parent_ = nullptr, int io_threads_ = NZMQT_DEFAULT_IOTHREADS) { return new NZMQT_DEFAULT_ZMQCONTEXT_IMPLEMENTATION(parent_, io_threads_); } diff --git a/nzmqt.pri b/nzmqt.pri new file mode 100644 index 00000000..42546455 --- /dev/null +++ b/nzmqt.pri @@ -0,0 +1,18 @@ +# Include this file into your project to build and link +# the nzmqt library into you application + +# This define will "move" nzmqt class method +# implementations to nzmqt.cpp file. +DEFINES += NZMQT_LIB + +SOURCES += \ + $$PWD/src/nzmqt/nzmqt.cpp + +HEADERS += \ + $$PWD/include/nzmqt/global.hpp \ + $$PWD/include/nzmqt/nzmqt.hpp \ + $$PWD/include/nzmqt/impl.hpp + +INCLUDEPATH += \ + $$PWD/include \ + $$PWD/3rdparty/cppzmq \ No newline at end of file