zmq

Search:
Group by:

Nim ZeroMQ wrapper. This package contains the low level C wrappers as well as some higher level constructs.

The low-level C bindings can be found in zmq/bindings

The high-level Connections API can be found in zmq/connections

The high-level Polling API can be found in zmq/poller

The Async API can be found in zmq/asynczmq

Example:

import zmq
import zmq
import std/[asyncdispatch, asyncfutures]

proc client() {.async.} =
  ## Connects a REQ socket to the local server on port 5555, performs
  ## eleven "Hello" request/reply round trips, then asks the server to stop.
  var requester = zmq.connect("tcp://localhost:5555", REQ)
  # Register the cleanup right after acquisition so the socket is also
  # closed when a send or receive raises part-way through the loop.
  defer: close(requester)
  echo("Connecting...")
  for i in 0..10:
    echo("Sending hello... (" & $i & ")")
    send(requester, "Hello")
    # Awaiting the reply lets other async tasks run in the meantime.
    let reply = await receiveAsync(requester)
    echo("Received: ", reply)

  # Sentinel message: the server loop exits when it receives this.
  send(requester, "STOPSERVER")

proc server(): Future[int] {.async.} =
  ## Binds a REP socket on port 5555 and answers every request with
  ## "World" until the "STOPSERVER" message arrives.
  ##
  ## The future completes with the number of requests answered.
  var responder = zmq.listen("tcp://*:5555", REP)
  # Close the socket on every exit path, including a raising send/receive.
  defer: close(responder)
  while true:
    let request = await receiveAsync(responder)
    # The stop message is neither answered nor counted.
    if request == "STOPSERVER": break
    echo("Received: ", request)
    send(responder, "World")
    inc(result)

# Start the server, then launch the client without keeping its future.
let serverFuture = server()
asyncCheck client()
# Run the dispatcher until the server future completes.
let handled = waitFor serverFuture
echo "Server processed ", $handled, " requests"
Based on std/asyncdispatch, receiveAsync and sendAsync allow for asynchronous behaviour. When using asynchronous procs, be careful of the internal state of the ZMQ socket: some sockets (such as REP/REQ) cannot send two messages in a row without a receive in between (or vice versa).

Example:

import zmq
import std/asyncdispatch
import zmq

const N_TASK = 5

proc pusher(nTask: int): Future[void] {.async.} =
  ## Opens a listening PUSH socket and emits `nTask` messages named
  ## "task-1" through "task-<nTask>".
  var sock = listen("tcp://localhost:15556", PUSH)
  defer: close(sock)

  for taskNo in 1..nTask:
    # Unlike a plain `send`, awaiting `sendAsync` allows other
    # async tasks to run.
    await sendAsync(sock, "task-" & $taskNo)

proc puller(id: int): Future[void] {.async.} =
  ## Connects a PULL socket to the pusher's endpoint and receives
  ## `N_TASK` messages, echoing each one tagged with `id`.
  const connStr = "tcp://localhost:15556"
  var sock = connect(connStr, PULL)
  defer: sock.close()

  # The loop index is not needed; only the number of iterations matters.
  for _ in 1 .. N_TASK:
    let task = await sock.receiveAsync()
    echo "Pull socket n°", $id, " received ", task
    # Pause 100 ms between messages via the async sleep, not a blocking one.
    await sleepAsync(100)

when isMainModule:
  # Number of PULL consumers to start; ids run from 1 to this value.
  const pullerCount = 1

  asyncCheck pusher(N_TASK)
  for pullerId in 1 .. pullerCount:
    asyncCheck puller(pullerId)

  # Keep driving the dispatcher while any async operation is pending.
  while hasPendingOperations(): poll()
Find more examples in the examples and tests folders.

Exports

ZMQ_PAIR, ENOBUFS, ZMQ_STREAM, ZMQ_NULL, EHOSTUNREACH, ZMQ_EVENT_LISTENING, ECONNRESET, msg_more, ZMQ_SUBSCRIBE, ZMQ_TCP_KEEPALIVE_CNT, ZMQ_TCP_ACCEPT_FILTER, send, ZMQ_IPC_FILTER_PID, ZMQ_FD, ZMQ_TCP_KEEPALIVE_INTVL, ZMQ_XPUB_MANUAL, ZMQ_RECONNECT_IVL, ZMQ_POLLOUT, ZContextOptions, ZMQ_SHARED, ZMQ_IPC_FILTER_GID, ZMQ_PLAIN_PASSWORD, ZMQ_CURVE_SERVER, ZMQ_VERSION_MINOR, ZMQ_MAKE_VERSION, EADDRNOTAVAIL, ZMQ_EVENT_CLOSED, ZMQ_SNDMORE, ZMQ_THREAD_SAFE, ZMQ_MAXMSGSIZE, ZMQ_SOCKET_LIMIT, ZContext, ZMQ_RCVMORE, ZMQ_DELAY_ATTACH_ON_CONNECT, ZMQ_VERSION_PATCH, ZMQ_XPUB_WELCOME_MSG, ZMQ_BACKLOG, msg_copy, ZMQ_IPV4ONLY, ZMQ_LAST_ENDPOINT, ZMQ_IO_THREADS, ZMQ_RCVBUF, connect, ZMQ_UNSUBSCRIBE, ZMQ_POLLITEMS_DFLT, EPROTONOSUPPORT, ZMQ_IMMEDIATE, ZMQ_CURVE_PUBLICKEY, ZMQ_EVENT_CONNECT_DELAYED, socket, ZMQ_POLLERR, ZMQ_THREAD_PRIORITY, send_const, ZMQ_IPC_FILTER_UID, ZMQ_MECHANISM, ZMQ_XREP, ZMQ_RCVTIMEO, ZMQ_ROUTER_MANDATORY, ZMQ_SOCKS_PROXY, ZMQ_EVENT_ACCEPT_FAILED, unbind, ZMQ_ROUTER_BEHAVIOR, msg_size, z85_decode, ZMQ_ZAP_DOMAIN, ZMQ_POLLPRI, errno, msg_send, ZMQ_XPUB_VERBOSE_UNSUBSCRIBE, ZMQ_ROUTER_HANDOVER, ZMQ_LINGER, ZMQ_IO_THREADS_DFLT, ZMQ_EVENT_CONNECTED, ZMQ_THREAD_SCHED_POLICY_DFLT, ZMQ_CURVE, ctx_term, poll, zmqdll, ZMQ_REQ_CORRELATE, ctx_new, EMTHREAD, ZMQ_DEALER, bindAddr, ZMQ_INVERT_MATCHING, ZMQ_SNDTIMEO, ZMQ_PULL, z85_encode, ZPollItem, setsockopt, msg_move, ZMQ_XPUB, ZMQ_CONNECT_RID, ZMQ_BLOCKY, ZMQ_MAX_SOCKETS, getsockopt, ZMQ_TCP_KEEPALIVE_IDLE, ZMQ_REP, ENETRESET, EINPROGRESS, proxy_steerable, disconnect, ZMQ_POLLIN, ZMQ_CURVE_SERVERKEY, ZMsg, ZMQ_GSSAPI_PRINCIPAL, ZMQ_PLAIN_SERVER, ZSocket, ZMQ_CURVE_SECRETKEY, term, ZMQ_HEARTBEAT_TIMEOUT, ZMQ_EVENT_ALL, ZMQ_FAIL_UNROUTABLE, ZMQ_RCVHWM, ZMQ_SRCFD, ctx_get, ZMQ_TCP_KEEPALIVE, recvmsg, proxy, ZMQ_EVENT_CLOSE_FAILED, ZMQ_MAX_SOCKETS_DFLT, strerror, ZMQ_GSSAPI, socket_monitor, init, ZMsgOptions, ctx_destroy, ZMQ_TCP_RETRANSMIT_TIMEOUT, msg_init, ZMQ_ROUTER, ZMQ_EVENT_ACCEPTED, ZMQ_HEARTBEAT_IVL, EAFNOSUPPORT, 
ZMQ_XSUB, sendmsg, ECONNABORTED, ZSockOptions, ZMQ_GSSAPI_PLAINTEXT, ZMQ_VERSION_MAJOR, ZMQ_IDENTITY, ZMQ_RATE, ZMQ_DONTWAIT, zmq_event_t, ZMQ_REQ_RELAXED, ZMQ_EAGAIN, ENOTSUP, ENETDOWN, version, ZMQ_XPUB_NODROP, ZMQ_HEARTBEAT_TTL, recv, ZMQ_PROBE_ROUTER, ETIMEDOUT, ZMQ_SUB, ZMQ_EVENT_MONITOR_STOPPED, msg_get, ETERM, ZSocketType, ZMQ_TYPE, msg_init, ZMQ_MORE, ZMQ_STREAM_NOTIFY, msg_recv, ZMQ_SERVER, ZMQ_REQ, ENETUNREACH, msg_init, ENOTCONN, ZMQ_GSSAPI_SERVICE_PRINCIPAL, ZMQ_SNDHWM, ZMQ_VERSION, ENOCOMPATPROTO, ZMQ_XPUB_VERBOSE, msg_set, ZMQ_SNDBUF, ZMQ_THREAD_PRIORITY_DFLT, ZMQ_CONFLATE, ctx_shutdown, ZMQ_AFFINITY, ZMQ_PUB, ZMQ_CONNECT_TIMEOUT, ZMQ_THREAD_SCHED_POLICY, EFSM, ZMQ_EVENT_DISCONNECTED, close, msg_data, ZSendRecvOptions, ZMQ_EVENT_CONNECT_RETRIED, ZMQ_CLIENT, ZMQ_PLAIN_USERNAME, ZMQ_MULTICAST_HOPS, ZMQ_EVENT_BIND_FAILED, ZMQ_ROUTER_RAW, ctx_set, ZMQ_HANDSHAKE_IVL, ZMQ_GSSAPI_SERVER, ZMQ_RECONNECT_IVL_MAX, EMSGSIZE, ZMQ_PUSH, ZMQ_EVENTS, ZMQ_NOBLOCK, ENOTSOCK, ECONNREFUSED, EADDRINUSE, ZMQ_PLAIN, ZMQ_TOS, ZMQ_RECOVERY_IVL, ZMQ_XREQ, msg_close, receiveAll, reconnect, send, receiveAll, tryReceive, ZConnectionImpl, close, getsockopt, =copy, connect, reconnect, sendAll, listen, connect, sendAll, receive, send, ZmqError, proxy, terminate, disconnect, waitForReceive, setsockopt, =sink, zmqError, receive, waitForReceive, proxy, ZConnection, tryReceive, getsockopt, newZContext, listen, newZContext, setsockopt, bindAddr, newZContext, unbind, len, register, events, initZPoller, events, poll, initZPoller, ZPoller, poll, initZPoller, initZPoller, register, =destroy, [], AsyncZPollCB, len, initZPoller, register, sendAsync, pollAsync, AsyncZPoller, register, receiveAsync, initZPoller, register, =destroy