threading/channels

Source   Edit  

This module works only with one of --mm:arc / --mm:atomicArc / --mm:orc compilation flags.

Warning: This module is experimental and its interface may change.

This module implements multi-producer multi-consumer channels - a concurrency primitive with a high-level interface intended for communication and synchronization between threads. It allows sending and receiving typed, isolated data, enabling safe and efficient concurrency.

The Chan type represents a generic fixed-size channel object that internally manages the underlying resources and synchronization. It has to be initialized using the newChan proc. Sending and receiving operations are provided by the blocking send and recv procs, and non-blocking trySend and tryRecv procs. Send operations add messages to the channel, receiving operations remove them.

See also:

The following is a simple example of two different ways to use channels: blocking and non-blocking.

Example: cmd: --threads:on --gc:orc

import threading/channels
import std/os

# In this example a channel is declared at module scope.
# Channels are generic, and they include support for passing objects between
# threads.
# Note that isolated data passed through channels is moved around.
var chan = newChan[string]()

block example_blocking:
  # This proc will be run in another thread.
  proc basicWorker() =
    chan.send("Hello World!")

  # Launch the worker.
  var worker: Thread[void]
  createThread(worker, basicWorker)

  # Block until the message arrives, then print it out.
  var dest = ""
  dest = chan.recv()
  assert dest == "Hello World!"

  # Wait for the thread to exit before moving on to the next example.
  worker.joinThread()

block example_non_blocking:
  # This is another proc to run in a background thread. This proc takes a while
  # to send the message since it first sleeps for some time.
  proc slowWorker(delay: Natural) =
    # `delay` is a period in milliseconds
    sleep(delay)
    chan.send("Another message")

  # Launch the worker with a delay set to 2 seconds (2000 ms).
  var worker: Thread[Natural]
  createThread(worker, slowWorker, 2000)

  # This time, use a non-blocking approach with tryRecv.
  # Since the main thread is not blocked, it could be used to perform other
  # useful work while it waits for data to arrive on the channel.
  var messages: seq[string]
  while true:
    var msg = ""
    if chan.tryRecv(msg):
      messages.add msg # "Another message"
      break
    messages.add "Pretend I'm doing useful work..."
    # For this example, sleep in order not to flood the sequence with too many
    # "pretend" messages.
    sleep(400)

  # Wait for the second thread to exit before cleaning up the channel.
  worker.joinThread()

  # Thread exits right after receiving the message
  assert messages[^1] == "Another message"
  # At least one non-successful attempt to receive the message had to occur.
  assert messages.len >= 2

Types

Chan[T] = object
Typed channel Source   Edit  

Procs

proc `=copy`[T](dest: var Chan[T]; src: Chan[T])
Shares Channel by reference counting. Source   Edit  
proc `=destroy`[T](c: Chan[T]) {....raises: [].}
Source   Edit  
proc `=dup`[T](src: Chan[T]): Chan[T]
Source   Edit  
proc `=wasMoved`[T](x: var Chan[T])
Source   Edit  
proc newChan[T](elements: Positive = 30): Chan[T]

An initialization procedure, necessary for acquiring resources and initializing internal state of the channel.

elements is the capacity of the channel and thus how many messages it can hold before it refuses to accept any further messages.

Source   Edit  
proc peek[T](c: Chan[T]): int {.inline.}
Returns an estimation of the current number of messages held by the channel. Source   Edit  
proc recv[T](c: Chan[T]): T {.inline.}
Receives a message from the channel. A version of recv that returns the message. Source   Edit  
proc recv[T](c: Chan[T]; dst: var T) {.inline.}

Receives a message from the channel c and fill dst with its value.

This blocks the receiving thread until a message was successfully received.

If the channel does not contain any messages this will block the thread until a message get sent to the channel.

Source   Edit  
proc recvIso[T](c: Chan[T]): Isolated[T] {.inline.}
Receives a message from the channel. A version of recv that returns the message and isolates it. Source   Edit  
proc send[T](c: Chan[T]; src: sink Isolated[T]) {.inline.}

Sends the message src to the channel c. This blocks the sending thread until src was successfully sent.

The memory of src is moved, not copied.

If the channel is already full with messages this will block the thread until messages from the channel are removed.

Source   Edit  
proc tryRecv[T](c: Chan[T]; dst: var T): bool {.inline.}

Tries to receive a message from the channel c and fill dst with its value.

Doesn't block waiting for messages in the channel to become available. Instead returns after an attempt to receive a message was made.

Warning: In high-concurrency situations, consider using an exponential backoff strategy to reduce contention and improve the success rate of operations.

Returns false and does not change dist if no message was received.

Source   Edit  
proc trySend[T](c: Chan[T]; src: sink Isolated[T]): bool {.inline.}

Tries to send the message src to the channel c.

The memory of src will be moved if possible. Doesn't block waiting for space in the channel to become available. Instead returns after an attempt to send a message was made.

Warning: In high-concurrency situations, consider using an exponential backoff strategy to reduce contention and improve the success rate of operations.

Returns false if the message was not sent because the number of pending messages in the channel exceeded its capacity.

Source   Edit  
proc tryTake[T](c: Chan[T]; src: var Isolated[T]): bool {.inline.}

Tries to send the message src to the channel c.

The memory of src is moved directly. Be careful not to reuse src afterwards. This proc is suitable when src cannot be copied.

Doesn't block waiting for space in the channel to become available. Instead returns after an attempt to send a message was made.

Warning: In high-concurrency situations, consider using an exponential backoff strategy to reduce contention and improve the success rate of operations.

Returns false if the message was not sent because the number of pending messages in the channel exceeded its capacity.

Source   Edit  

Templates

template send[T](c: Chan[T]; src: T)
Helper template for send. Source   Edit  
template trySend[T](c: Chan[T]; src: T): bool
Helper template for trySend.
Warning: For repeated sends of the same value, consider using the tryTake proc with a pre-isolated value to avoid unnecessary copying.
Source   Edit