Part 6 & 7 - Message-Passing Concurrency

Posted on Feb 4, 2023
(Last updated: May 26, 2024)

So far we have looked at threads which share memory. The so-called shared memory model, but in this part we’ll cover distributed memory model, specifically, message-passing concurrency.

For this we’ll cover and use the programming language, Erlang!

So let’s quickly have a crash course over Erlang!

What is Erlang?

Erlang is a functional programming language with message-passing features. The message-passing part is concurrent and implements the actor model, where Erlang processes are actors.

This post won’t cover functional languages in general, so I hope you remember good ‘ol Haskell.

Let’s look at a little code snippet from Erlang:

-module(power).
-export([power/2]).

power(X, 0) -> 1;
power(X, N) -> X * power(X, N - 1).

Then if we enter the erl shell by erl and doing:

c(power).

power:power(2, 3).

We see the output 8!

Types in Erlang

Erlang has 8 primitives types and 5 compound types:

  • Primitives
    • Integers
      • Arbitrary sized integers with the usual operations
    • Atoms
    • Floats
      • 64-bit floating point numbers
    • References
      • Globally unique symbols
    • Binaries
      • Sequence of bytes
    • Pids
      • Process identifiers
    • Ports
      • Used for communication
    • Funs
      • Function closures
  • Compound types
    • Tuples
      • Fixed size containers
    • Lists
      • Dynamically sized containers
    • Maps
      • Key-value tables
    • Strings
      • Syntactic sugar for sequence of characters
    • Records
      • Syntactic sugar to access tuples elements by name

As you can see for example, there is no boolean type for true and false. Instead, the atoms true and false are used for this.

One interesting design choice I have encountered in Erlang is the relational operators.

== is numerical equal, /= is numerical not equal. Compared to =:= and =/= which are the “normal” equal we associate with.

Order between types

In Erlang, types have a distinct order relation:

number < atom < reference < funs < port < pid < tuple < map < list

Which means that for example: 3 < true is true.

Tuples

Tuples are written like this in Erlang:

{ }
{10, 12, 98}
{8.88, false, aToM}
{10, {-1, true}}

They are also 1-indexed.

Lists

[ ]
[10, 12, 98]
[8.88, false, aToM]
[10, [-1, true]]

They have the following nice operators as well:

% A copy of L with H added as head element
[H | L]

% Get head element
hd(L)

% Get tail element
tl(L)

% Concatenation and deletion
L1 ++ L2
L1 -- L2

Records

Records are ordered sequences with a fixed number of elements, where each element has an atom as name

  • Records are just syntactic sugar for tuples where positions are named
-record(person, { name="add name", age })

Multiple expressions

The , operator in Erlang makes it so we can combine expressions, for example:

% Evalutes 3 < 0 and returns 2
3 < 0, 2.

% Evaluates 3 + true and fails (because of 3 + true)
3 + true, 2.

% Binds 10 to R, binds 3.14 to Pi, returns 62.8...
R = 10, Pi = 3.14, 2*Pi*R.

Modules

In our power example you may have noticed the module and export keywords.

-module(foo).
-export([bar/1, baz/0]).

bar(X) -> X.
baz()  -> 0.

As you can the, f/n, declares the function name and number of parameters (or arity n, with patterns).

Guards

This is something we should be familiar with from Haskell:

can_drive(Name, Age) when Age >= 18 -> Name ++ " can drive";
can_drive(Name, _) -> Name ++ " cannot drive".

Erlang is very similar to Haskell in its clauses, gaurds, pattern matchings. Since it’s functional as well, we can pass around functions as values, but also remember higher-order functions etc etc.

This is mostly so, I remember the Erlang syntax itself, and not a post on functional languages as I said :).

Erlang’s principles

Erlang has some principles that it follows, let’s take a look:

  • Processes are strongly isolated.
  • Process creation and destruction is a lightweight operation.
  • Message passing is the only way for processes to interact with each other.
  • Processes have unique names.
  • If you know the name of a process, you can send it a message to it.
  • Processes share no resources.
  • Error handling is non-local.
  • Processes do what they are supposed to do or fail.

Let’s quickly compare the different concurrent models:

  • Shared memory
    • Synchronize by writing to and reading from shared memory.
  • Message passing
    • Synchronize by exchanging messages

The actor model

Erlang’s message-passing model is based on the actor model:

  • Actors are just abstractions of processes
  • No shared state or resources between actors (processes)
  • Actors (processes) communicate by exchanging messages - asynchronous messages passing at that.

Let’s clearly define the actors and messages:

Each actor is identified by their address, we can think of this as their unique ID.

An actor can:

  • send messages to other actors via their addresses/IDs.
  • change its behavior, what it computes/how it reacts to messages.
  • create new actors

A message includes:

  • A recipient, the identifier address/ID
  • Content, arbitrary information necessary for the actors

Sending and receiving messages

A process (actor) in Erlang is created by calling spawn, it is identified by its PID (Process identifier) and executes its behavioral function, f, (which is passed as a parameter into spawn). When the function terminates the process ends.

Let’s take a look at the spawn function:

spawn(M, F, Args)

When creating a process, the newly created process runs the function F in module M with arguments Args.

Let’s look at an example:

-module(procs).

print_sum(X,Y) ->
    io:format("~p~n", [X+Y]).

compute_sum(X,Y) -> X + Y.

then in the shell we just do:

spawn(procs, print_sum, [3, 4]).

And we should see the result 7.

Messages are as simple, typically a message is the result of evaluting an expression:

Pid ! Message

This sends the evaluation, T, of Message to the process with PID, Pid, and returns T as a result.

Note that the, !, or “bang” operator is right-associative

Mailboxes

Every process has a mailbox, this is how it becomes asynchronous, a process never needs to wait to read a message, we only need to check our mailbox.

To check a process with Pid, Pid, we can use:

% How many elements are in the mailbox
process_info(Pid, message_queue_len)

% List of messages in the mailbox (oldest to newest)
process_info(Pid, messages)

% Empty the current process's mailbox
flush()

For example:

% Send 'hello' twice to self
self() ! self() ! hello.

% send 'world' to self
self() ! world.

process_info(self(), message).

> {messages, [hello, hello, world]}

To receive a message, we can use the receive expression:

receive
    P1 when C1 -> E1;
        .
        .
        .
    PN when CN -> EN
end

Evaluating the recieve expression, selects the oldest message in our mailbox, that matches the pattern. If we don’t find any match we block until a suitable message arrives.

For example, a simple echo function:

echo() ->
    recieve Msg -> io:format("Receive: ~p~n", [Msg]) end.
    % To make this permanent we just do this:
    echo().

Note that Erlang’s runtime only provides weak guarantees of message delivery order. So it’s not guaranteed that the messages are received in the same order as sent. This is in the case when sending to multiple different processes, if we are only sending to a single process, the order will be the same.

Stateful processes

Processes can only operate on the arguments of the function they run, thus we need to store the state information using arguments.

These values gets updates by the recursive calls used to make a process permanently running.

An example makes this clear, let’s implement our beloved counter program:

base_counter(N) ->
recieve {From, Command} -> case Command of
    increment -> base_counter(N + 1);

    count     -> From ! {self(), N},
                 base_counter(N);

    % Unrecongnized command
    U         -> io:format("? ~p~n", [U])

Clients and servers

The client/server architecture is widely used, so let’s go!

  1. A server is available to serve requests from any clients.

  2. An arbitrary number of clients send commands to the server and wait for the server’s response.

Servers

A server is a process that:

  • Responds to a fixed number of commands
  • Runs indefinitely, serving an arbitrary number of requests, until it recieves a shutdown command.
  • Can serve an arbitrary number of clients

Each command is a message of the form:

{Command, From, Ref, Arg1, ..., ArgN}

  • Command is the command’s name.
  • From is the PID of the client.
  • Ref is a unique identifier of the request.
  • Arg1, ..., ArgN are arguments to the command.

Math server

Let’s make a simple math server:

% Interface for the server/commands

start(): start a math server, and return the server's PID

factorial(S, M): compute the factorial of M on server with PID, S

status(S): return the number of requests server so far on the server with PID, S

stop(S): shutdown server with PID, S

The main/event loop would be:

loop(N) ->

    receive% ‘factorial’ command:
    {factorial, From, Ref, M} ->
        From ! {response, Ref, compute_factorial(M)},

        % increment request number
        loop(N+1);

    % ‘status’ command:
    {status, From, Ref} ->
        From ! {response, Ref, N},

        % don’t increment request number
        loop(N);

    % ‘stop’ command:
    {stop, _From, _Ref} -> ok
end.

The starting and stopping:

% start a server, return server’s pid
start() ->
    spawn(fun () -> loop(0) end).

% shutdown ‘Server’
stop(Server) ->
    % Ref is not needed
    Server ! {stop, self(), 0},
    ok.

Factorial and status:

% compute factorial(M) on ‘Server’:
factorial(Server, M) ->
    % unique reference number
    Ref = make_ref(),

    % send request to server:
    Server ! {factorial, self(), Ref, M},

    % wait for response, and return it:
    receive {response, Ref, Result} -> Result end.

% return number of requests served so far by ‘Server’:
status(Server) ->
    % unique reference number
    Ref = make_ref(),

    % send request to server:
    Server ! {status, self(), Ref},

    % wait for response, and return it:
    receive {response, Ref, Result} -> Result end.