zmq_socket_monitor(3)
ØMQ Manual - ØMQ/3.2.6
Name
zmq_socket_monitor - register a monitoring callback
Synopsis
int zmq_socket_monitor (void *socket, char *addr, int events);
Description
The zmq_socket_monitor() function shall spawn a PAIR socket that publishes socket state changes (events) over the inproc:// transport to the endpoint given in addr. Each message carries a zmq_event_t struct. The recommended pattern is to connect a PAIR socket to that endpoint from another application thread and handle monitoring events there. The events argument is a bitmask (ZMQ_EVENT_ALL or any combination of the ZMQ_EVENT_* constants) selecting the events you are interested in.
// monitoring thread
static void *req_socket_monitor (void *ctx)
{
    zmq_event_t event;
    int rc;

    void *s = zmq_socket (ctx, ZMQ_PAIR);
    assert (s);

    rc = zmq_connect (s, "inproc://monitor.req");
    assert (rc == 0);
    while (true) {
        zmq_msg_t msg;
        zmq_msg_init (&msg);
        rc = zmq_recvmsg (s, &msg, 0);
        if (rc == -1 && zmq_errno() == ETERM) break;
        assert (rc != -1);
        memcpy (&event, zmq_msg_data (&msg), sizeof (event));
        switch (event.event) {
        case ZMQ_EVENT_CONNECTED:
            // handle socket connected event
            break;
        case ZMQ_EVENT_CLOSED:
            // handle socket closed event
            break;
        }
        // release the message before receiving the next one
        zmq_msg_close (&msg);
    }
    zmq_close (s);
    return NULL;
}

// register a monitor endpoint for all socket events
rc = zmq_socket_monitor (req, "inproc://monitor.req", ZMQ_EVENT_ALL);
assert (rc == 0);

// spawn a monitoring thread
rc = pthread_create (&threads [0], NULL, req_socket_monitor, ctx);
assert (rc == 0);
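The events bitmask can also select a subset of events instead of ZMQ_EVENT_ALL. The following is a minimal sketch of how such a mask might be composed and tested. The MON_* names and the two helper functions are hypothetical stand-ins so the fragment compiles without libzmq; the values assume each event is a distinct single bit, as the bitmask description implies. Real code would include <zmq.h> and use the ZMQ_EVENT_* constants directly.

```c
/* Hypothetical stand-ins for the ZMQ_EVENT_* flags (assumption: one bit per
   event). In real code, include <zmq.h> and use its constants instead. */
enum {
    MON_CONNECTED       = 1 << 0,
    MON_CONNECT_DELAYED = 1 << 1,
    MON_CONNECT_RETRIED = 1 << 2,
    MON_LISTENING       = 1 << 3,
    MON_BIND_FAILED     = 1 << 4,
    MON_ACCEPTED        = 1 << 5,
    MON_ACCEPT_FAILED   = 1 << 6,
    MON_CLOSED          = 1 << 7,
    MON_CLOSE_FAILED    = 1 << 8,
    MON_DISCONNECTED    = 1 << 9,
    MON_EVENT_ALL       = (1 << 10) - 1   /* every bit above */
};

/* Mask covering only the events a connecting (client side) socket emits. */
static int connect_side_mask (void)
{
    return MON_CONNECTED | MON_CONNECT_DELAYED
         | MON_CONNECT_RETRIED | MON_DISCONNECTED;
}

/* Returns 1 when the given event is selected by the mask, 0 otherwise. */
static int mask_selects (int mask, int event)
{
    return (mask & event) != 0;
}
```

With the real constants, the equivalent mask would be passed as the third argument of zmq_socket_monitor() in place of ZMQ_EVENT_ALL.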
Only connection oriented (tcp and ipc) transports are supported in this initial implementation.
Supported events are:
ZMQ_EVENT_CONNECTED: connection established
The ZMQ_EVENT_CONNECTED event triggers when a connection has been established to a remote peer. This can happen either synchronously or asynchronously.
Event metadata:
data.connected.addr // peer address
data.connected.fd // socket descriptor
ZMQ_EVENT_CONNECT_DELAYED: synchronous connect failed, it's being polled
The ZMQ_EVENT_CONNECT_DELAYED event triggers when an immediate connection attempt is delayed and its completion is being polled for.
Event metadata:
data.connect_delayed.addr // peer address
data.connect_delayed.err // errno value
ZMQ_EVENT_CONNECT_RETRIED: asynchronous connect / reconnection attempt
The ZMQ_EVENT_CONNECT_RETRIED event triggers when a connection attempt is being handled by the reconnect timer. The reconnect interval is recomputed for each attempt.
Event metadata:
data.connect_retried.addr // peer address
data.connect_retried.interval // computed reconnect interval
ZMQ_EVENT_LISTENING: socket bound to an address, ready to accept connections
The ZMQ_EVENT_LISTENING event triggers when a socket is successfully bound to an interface.
Event metadata:
data.listening.addr // listen address
data.listening.fd // socket descriptor
ZMQ_EVENT_BIND_FAILED: socket could not bind to an address
The ZMQ_EVENT_BIND_FAILED event triggers when a socket could not bind to a given interface.
Event metadata:
data.bind_failed.addr // listen address
data.bind_failed.err // errno value
ZMQ_EVENT_ACCEPTED: connection accepted to bound interface
The ZMQ_EVENT_ACCEPTED event triggers when a connection from a remote peer has been established with a socket's listen address.
Event metadata:
data.accepted.addr // listen address
data.accepted.fd // socket descriptor
ZMQ_EVENT_ACCEPT_FAILED: could not accept client connection
The ZMQ_EVENT_ACCEPT_FAILED event triggers when a connection attempt to a socket's bound address fails.
Event metadata:
data.accept_failed.addr // listen address
data.accept_failed.err // errno value
ZMQ_EVENT_CLOSED: connection closed
The ZMQ_EVENT_CLOSED event triggers when a connection's underlying descriptor has been closed.
Event metadata:
data.closed.addr // address
data.closed.fd // socket descriptor
ZMQ_EVENT_CLOSE_FAILED: connection couldn't be closed
The ZMQ_EVENT_CLOSE_FAILED event triggers when a descriptor could not be released back to the OS.
Event metadata:
data.close_failed.addr // address
data.close_failed.err // errno value
ZMQ_EVENT_DISCONNECTED: broken session
The ZMQ_EVENT_DISCONNECTED event triggers when the stream engine (tcp and ipc specific) detects a corrupted / broken session.
Event metadata:
data.disconnected.addr // address
data.disconnected.fd // socket descriptor
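Each event listed above carries an address plus exactly one further field: a socket descriptor, an errno value, or a reconnect interval. A monitoring thread therefore needs to know which union member is meaningful before reading it. The sketch below encodes that mapping as documented in this page. The MON_* constants, the payload_kind type, and monitor_payload_kind() are hypothetical names introduced for illustration (with one bit per event assumed); real code would switch on the ZMQ_EVENT_* constants from <zmq.h>.

```c
/* Hypothetical stand-ins for the ZMQ_EVENT_* flags (assumption: one bit per
   event). In real code, include <zmq.h> and use its constants instead. */
enum {
    MON_CONNECTED       = 1 << 0,
    MON_CONNECT_DELAYED = 1 << 1,
    MON_CONNECT_RETRIED = 1 << 2,
    MON_LISTENING       = 1 << 3,
    MON_BIND_FAILED     = 1 << 4,
    MON_ACCEPTED        = 1 << 5,
    MON_ACCEPT_FAILED   = 1 << 6,
    MON_CLOSED          = 1 << 7,
    MON_CLOSE_FAILED    = 1 << 8,
    MON_DISCONNECTED    = 1 << 9
};

/* Which field accompanies the address in the event metadata. */
typedef enum {
    PAYLOAD_UNKNOWN,   /* not one of the documented events */
    PAYLOAD_FD,        /* data.<event>.fd */
    PAYLOAD_ERR,       /* data.<event>.err */
    PAYLOAD_INTERVAL   /* data.connect_retried.interval */
} payload_kind;

/* Maps a single event flag to the kind of metadata documented for it. */
static payload_kind monitor_payload_kind (int event)
{
    switch (event) {
    case MON_CONNECTED:
    case MON_LISTENING:
    case MON_ACCEPTED:
    case MON_CLOSED:
    case MON_DISCONNECTED:
        return PAYLOAD_FD;
    case MON_CONNECT_DELAYED:
    case MON_BIND_FAILED:
    case MON_ACCEPT_FAILED:
    case MON_CLOSE_FAILED:
        return PAYLOAD_ERR;
    case MON_CONNECT_RETRIED:
        return PAYLOAD_INTERVAL;
    default:
        return PAYLOAD_UNKNOWN;
    }
}
```

A generic logger could call such a helper once per received event and then print either the descriptor, the error code, or the interval, rather than repeating a near-identical case for every event.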
Return value
The zmq_socket_monitor() function returns a value of 0 or greater if successful. Otherwise it returns -1 and sets errno to one of the values defined below.
Errors
- ETERM: The ØMQ context associated with the specified socket was terminated.
- EPROTONOSUPPORT: The requested transport protocol is not supported. Monitor sockets are required to use the inproc:// transport.
- EINVAL: The endpoint supplied is invalid.
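Because monitor endpoints must use the inproc:// transport, an application that builds the endpoint string at run time may want to check it before calling zmq_socket_monitor(). The helper below is a hypothetical pre-check, not part of libzmq, and is only a sketch: it tests the prefix and that a name follows it. The library still performs its own validation and may reject endpoints this check accepts.

```c
#include <string.h>

/* Hypothetical helper (not a libzmq function): returns 1 when addr starts
   with "inproc://" and is followed by a non-empty name, 0 otherwise. */
static int is_inproc_endpoint (const char *addr)
{
    static const char prefix [] = "inproc://";
    const size_t n = sizeof prefix - 1;   /* length without the trailing NUL */

    return addr != NULL
        && strncmp (addr, prefix, n) == 0
        && addr [n] != '\0';
}
```

A caller could use this to produce a clearer diagnostic up front while still checking the return value and errno of zmq_socket_monitor() itself.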
Example
Observing a REP socket's connection state
// REP socket monitor thread
static void *rep_socket_monitor (void *ctx)
{
    zmq_event_t event;
    int rc;

    void *s = zmq_socket (ctx, ZMQ_PAIR);
    assert (s);

    rc = zmq_connect (s, "inproc://monitor.rep");
    assert (rc == 0);
    while (true) {
        zmq_msg_t msg;
        zmq_msg_init (&msg);
        rc = zmq_recvmsg (s, &msg, 0);
        if (rc == -1 && zmq_errno() == ETERM) break;
        assert (rc != -1);
        memcpy (&event, zmq_msg_data (&msg), sizeof (event));
        switch (event.event) {
        case ZMQ_EVENT_LISTENING:
            printf ("listening socket descriptor %d\n", event.data.listening.fd);
            printf ("listening socket address %s\n", event.data.listening.addr);
            break;
        case ZMQ_EVENT_ACCEPTED:
            printf ("accepted socket descriptor %d\n", event.data.accepted.fd);
            printf ("accepted socket address %s\n", event.data.accepted.addr);
            break;
        case ZMQ_EVENT_CLOSE_FAILED:
            printf ("socket close failure error code %d\n", event.data.close_failed.err);
            printf ("socket address %s\n", event.data.close_failed.addr);
            break;
        case ZMQ_EVENT_CLOSED:
            printf ("closed socket descriptor %d\n", event.data.closed.fd);
            printf ("closed socket address %s\n", event.data.closed.addr);
            break;
        case ZMQ_EVENT_DISCONNECTED:
            printf ("disconnected socket descriptor %d\n", event.data.disconnected.fd);
            printf ("disconnected socket address %s\n", event.data.disconnected.addr);
            break;
        }
        zmq_msg_close (&msg);
    }
    zmq_close (s);
    return NULL;
}
// Declarations assumed by this fragment (the addr value is only an example)
void *rep;
int rc;
pthread_t threads [1];
const char *addr = "tcp://127.0.0.1:5560";

// Create the infrastructure
void *ctx = zmq_init (1);
assert (ctx);

// REP socket
rep = zmq_socket (ctx, ZMQ_REP);
assert (rep);

// REP socket monitor, all events
rc = zmq_socket_monitor (rep, "inproc://monitor.rep", ZMQ_EVENT_ALL);
assert (rc == 0);
rc = pthread_create (&threads [0], NULL, rep_socket_monitor, ctx);
assert (rc == 0);

rc = zmq_bind (rep, addr);
assert (rc == 0);

// Allow some time for event detection
zmq_sleep (1);

// Close the REP socket
rc = zmq_close (rep);
assert (rc == 0);

zmq_term (ctx);
See also
Authors
This ØMQ manual page was written by Lourens Naudé <lourens@methodmissing.com>