system/channels_builtin

Source   Edit  

Channel support for threads.

Note: This is part of the system module. Do not import it directly. To activate thread support compile with the --threads:on command line switch.

Note: Channels are designed for the Thread type. They are unstable when used with spawn

Note: The current implementation of message passing does not work with cyclic data structures.

Note: Channels cannot be passed between threads. Use globals or pass them by ptr.

Example

The following is a simple example of two different ways to use channels: blocking and non-blocking.

# Be sure to compile with --threads:on.
# The channels and threads modules are part of system and should not be
# imported.
import std/os

# Channels can either be:
#  - declared at the module level, or
#  - passed to procedures by ptr (raw pointer) -- see note on safety.
#
# For simplicity, in this example a channel is declared at module scope.
# Channels are generic, and they include support for passing objects between
# threads.
# Note that objects passed through channels will be deeply copied.
var chan: Channel[string]

# This proc will be run in another thread using the threads module.
proc firstWorker() =
  chan.send("Hello World!")

# This is another proc to run in a background thread. This proc takes a while
# to send the message since it sleeps for 2 seconds (or 2000 milliseconds).
proc secondWorker() =
  sleep(2000)
  chan.send("Another message")

# Initialize the channel.
chan.open()

# Launch the worker.
var worker1: Thread[void]
createThread(worker1, firstWorker)

# Block until the message arrives, then print it out.
echo chan.recv() # "Hello World!"

# Wait for the thread to exit before moving on to the next example.
worker1.joinThread()

# Launch the other worker.
var worker2: Thread[void]
createThread(worker2, secondWorker)
# This time, use a non-blocking approach with tryRecv.
# Since the main thread is not blocked, it could be used to perform other
# useful work while it waits for data to arrive on the channel.
while true:
  let tried = chan.tryRecv()
  if tried.dataAvailable:
    echo tried.msg # "Another message"
    break
  
  echo "Pretend I'm doing useful work..."
  # For this example, sleep in order not to flood stdout with the above
  # message.
  sleep(400)

# Wait for the second thread to exit before cleaning up the channel.
worker2.joinThread()

# Clean up the channel.
chan.close()

Sample output

The program should output something similar to this, but keep in mind that exact results may vary in the real world:

Hello World!
Pretend I'm doing useful work...
Pretend I'm doing useful work...
Pretend I'm doing useful work...
Pretend I'm doing useful work...
Pretend I'm doing useful work...
Another message

Passing Channels Safely

Note that when passing objects to procedures on another thread by pointer (for example through a thread's argument), objects created using the default allocator will use thread-local, GC-managed memory. Thus it is generally safer to store channel objects in global variables (as in the above example), in which case they will use a process-wide (thread-safe) shared heap.

However, it is possible to manually allocate shared memory for channels using e.g. system.allocShared0 and pass these pointers through thread arguments:

proc worker(channel: ptr Channel[string]) =
  let greeting = channel[].recv()
  echo greeting

proc localChannelExample() =
  # Use allocShared0 to allocate some shared-heap memory and zero it.
  # The usual warnings about dealing with raw pointers apply. Exercise caution.
  var channel = cast[ptr Channel[string]](
    allocShared0(sizeof(Channel[string]))
  )
  channel[].open()
  # Create a thread which will receive the channel as an argument.
  var thread: Thread[ptr Channel[string]]
  createThread(thread, worker, channel)
  channel[].send("Hello from the main thread!")
  # Clean up resources.
  thread.joinThread()
  channel[].close()
  deallocShared(channel)

localChannelExample() # "Hello from the main thread!"

Types

Channel*[TMsg] {....gcsafe.} = RawChannel
a channel for thread communication Source   Edit  

Procs

proc close*[TMsg](c: var Channel[TMsg])
Closes a channel c and frees its associated resources. Source   Edit  
proc open*[TMsg](c: var Channel[TMsg]; maxItems: int = 0)

Opens a channel c for inter thread communication.

The send operation will block until number of unprocessed items is less than maxItems.

For unlimited queue set maxItems to 0.

Source   Edit  
proc peek*[TMsg](c: var Channel[TMsg]): int

Returns the current number of messages in the channel c.

Returns -1 if the channel has been closed.

Note: This is dangerous to use as it encourages races. It's much better to use tryRecv proc instead.

Source   Edit  
proc ready*[TMsg](c: var Channel[TMsg]): bool
Returns true if some thread is waiting on the channel c for new messages. Source   Edit  
proc recv*[TMsg](c: var Channel[TMsg]): TMsg

Receives a message from the channel c.

This blocks until a message has arrived! You may use peek proc to avoid the blocking.

Source   Edit  
proc send*[TMsg](c: var Channel[TMsg]; msg: sink TMsg) {.inline.}
Sends a message to a thread. msg is deeply copied. Source   Edit  
proc tryRecv*[TMsg](c: var Channel[TMsg]): tuple[dataAvailable: bool, msg: TMsg]

Tries to receive a message from the channel c, but this can fail for all sort of reasons, including contention.

If it fails, it returns (false, default(msg)) otherwise it returns (true, msg).

Source   Edit  
proc trySend*[TMsg](c: var Channel[TMsg]; msg: sink TMsg): bool {.inline.}

Tries to send a message to a thread.

msg is deeply copied. Doesn't block.

Returns false if the message was not sent because number of pending items in the channel exceeded maxItems.

Source   Edit