Part 8 - Synchronization problems (2)

Posted on Feb 7, 2023
(Last updated: May 26, 2024)

In this part we’ll cover the classical problems that occur when dealing with synchronization.

But within the message-passing paradigm, we don’t encounter the same problems as when using semaphores. Mutual exclusion is not an issue, since we never share any resource; the big problem today will be synchronization and coordination.

We’ll see that many of these problems can be solved with a server-client architecture.

Barriers

Let’s quickly recap what our barriers should be able to do:

-module(barrier).

% Initialize barrier for ‘Expected’ processes
init(Expected) ->
    % TODO

% Block at ‘Barrier’ until all processes have reached it
wait(Barrier) ->
    % TODO

Since we are talking about processes, it’s natural to have a process for the barrier itself. This process keeps track of which other processes have arrived at the barrier point.

When a new process arrives at the barrier, it sends an arrived message to the barrier process. When the list of all arrived processes is complete, the barrier process sends a continue message to everyone.

After notifying all other processes, the barrier process itself goes back to its initial state.

So we need to implement the barrier’s event loop as a server function:

barrier(Arrived, Expected, PidRefs)

Let’s implement this:

% event loop of barrier for ‘Expected’ processes
% Arrived: number of processes arrived so far
% PidRefs: list of {Pid, Ref} of processes arrived so far

% All processes arrived: notify all waiting processes
barrier(Arrived, Expected, PidRefs) when Arrived =:= Expected ->
    [To ! {continue, Ref} || {To, Ref} <- PidRefs],

    % Reset barrier
    barrier(0, Expected, []);

% Still waiting for some processes
barrier(Arrived, Expected, PidRefs) ->
    receive
        {arrived, From, Ref} ->
            % one more arrived: add {From, Ref} to PidRefs list:
            barrier(Arrived + 1, Expected, [{From, Ref}|PidRefs])
end.

Now for the wait function:

% Block at ‘Barrier’ until all processes have reached it
wait(Barrier) ->
    % Unique reference to match the reply against
    Ref = make_ref(),

    % Notify barrier of arrival
    Barrier ! {arrived, self(), Ref},

    % Wait for signal to continue
    receive {continue, Ref} -> through end.

And finally, the init function, which is simple:

% Initialize barrier for ‘Expected’ processes
init(Expected) ->
    spawn(fun () -> barrier(0, Expected, []) end).
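To see the barrier in action, here is a minimal sketch of a test driver. The module name barrier_demo and the function run/1 are hypothetical names made up for this example; init, wait and barrier are copied from above so the module compiles on its own. It starts all but one worker, checks that nobody gets through, and then lets the last worker arrive.

```erlang
-module(barrier_demo).
-export([run/1]).

% Hypothetical driver: returns {Early, Passed}, where Early tells whether
% any worker got through before the last one arrived, and Passed is the
% sorted list of worker ids that got through in the end.
run(N) ->
    Barrier = init(N),
    Parent = self(),
    Worker = fun (Id) ->
                 spawn(fun () -> wait(Barrier), Parent ! {passed, Id} end)
             end,

    % Start all but one worker: nobody should be able to pass yet
    [Worker(Id) || Id <- lists:seq(1, N - 1)],
    Early = receive {passed, _} -> true after 100 -> false end,

    % The last arrival releases everyone
    Worker(N),
    Passed = [receive {passed, W} -> W end || _ <- lists:seq(1, N)],
    {Early, lists:sort(Passed)}.

% --- barrier code from above ---
init(Expected) ->
    spawn(fun () -> barrier(0, Expected, []) end).

wait(Barrier) ->
    Ref = make_ref(),
    Barrier ! {arrived, self(), Ref},
    receive {continue, Ref} -> through end.

barrier(Arrived, Expected, PidRefs) when Arrived =:= Expected ->
    [To ! {continue, Ref} || {To, Ref} <- PidRefs],
    barrier(0, Expected, []);
barrier(Arrived, Expected, PidRefs) ->
    receive
        {arrived, From, Ref} ->
            barrier(Arrived + 1, Expected, [{From, Ref} | PidRefs])
    end.
```

Calling barrier_demo:run(3) should give {false, [1,2,3]}: no worker passes while only two have arrived, and all three pass once the third one shows up.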

Resource allocator

Let’s recap the problem: an allocator grants users exclusive access to a number of resources.

Users asynchronously request resources and release them back. The allocator ensures that each resource is accessed exclusively by a single user, and keeps track of the number of available resources.

So our module would look like:

-module(allocator).

% Register allocator with list of Resources
init(Resources) ->
    % TODO

% Get N resources from allocator
request(N) ->
    % TODO

% Release Resources to allocator
release(Resources) ->
    % TODO

The user would perform something like:

user() ->
    % How many resources are needed?
    N = howMany(),

    % Get resources from allocator
    Resources = allocator:request(N),

    % Do something with resources
    use(Resources),

    % Release resources
    allocator:release(Resources),

    user().

Again, in the message-passing world, using a server-client architecture often solves the problem.

We dedicate a process to the allocator, which keeps track of the list of available resources.

When a process requests some resources that are available, the allocator sends a granted message and removes those resources from the list.

When a process releases some resources, the allocator sends a released message and adds the resources back to the list.

If a request exceeds the availability, it simply stays in the allocator’s built-in mailbox. The allocator process will serve it as soon as it pattern-matches again (that is, once enough resources are available again).

allocator(Resources) ->
    % Count how many resources are available
    Available = length(Resources),
    receive

        % Serve requests if enough resources are available
        {request, From, Ref, N} when N =< Available ->

            % Granted ++ Remaining =:= Resources
            % length(Granted) =:= N
            {Granted, Remaining} = lists:split(N, Resources),

            % Send resources to requesting process
            From ! {granted, Ref, Granted},

            % Continue with Remaining resources
            allocator(Remaining);

        % Serve releases
        {release, From, Ref, Released} ->
            % Notify releasing process
            From ! {released, Ref},

            % Continue with previous and released resources
            allocator(Resources ++ Released)
    end.

The request and release functions:

% Get N resources from allocator; blocks if not available
request(N) ->
    Ref = make_ref(),
    allocator ! {request, self(), Ref, N},
    receive {granted, Ref, Granted} -> Granted end.

% Release Resources to allocator
release(Resources) ->
    Ref = make_ref(),
    allocator ! {release, self(), Ref, Resources},
    receive {released, Ref} -> released end.
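The init function is still a TODO above. Since request and release send to the registered name allocator, one possible sketch is to spawn the server and register it under that name. The module name allocator_demo and the run/0 driver are assumptions made for this example; the server loop and client functions are the ones from above. The driver also shows a request that is too large waiting in the mailbox until a release makes it servable.

```erlang
-module(allocator_demo).
-export([init/1, request/1, release/1, run/0]).

% Assumed implementation of init: spawn the server and register it
% under the name 'allocator' used by request/1 and release/1.
init(Resources) ->
    Pid = spawn(fun () -> allocator(Resources) end),
    register(allocator, Pid),
    Pid.

allocator(Resources) ->
    Available = length(Resources),
    receive
        {request, From, Ref, N} when N =< Available ->
            {Granted, Remaining} = lists:split(N, Resources),
            From ! {granted, Ref, Granted},
            allocator(Remaining);
        {release, From, Ref, Released} ->
            From ! {released, Ref},
            allocator(Resources ++ Released)
    end.

request(N) ->
    Ref = make_ref(),
    allocator ! {request, self(), Ref, N},
    receive {granted, Ref, Granted} -> Granted end.

release(Resources) ->
    Ref = make_ref(),
    allocator ! {release, self(), Ref, Resources},
    receive {released, Ref} -> released end.

% Hypothetical one-shot driver (the name stays registered afterwards,
% so it is meant to be run once per node).
run() ->
    init([a, b, c]),
    [a, b] = request(2),
    Parent = self(),

    % Only one resource is left, so this request for two has to wait
    spawn(fun () -> Parent ! {got, request(2)} end),
    Blocked = receive {got, _} -> false after 100 -> true end,

    % Releasing makes the pending request match the guard again
    released = release([a, b]),
    Got = receive {got, Rs} -> Rs end,
    {Blocked, Got}.
```

With the resources [a, b, c], run() should return {true, [c, a]}: the second request waits until the release, and then receives the first two elements of [c] ++ [a, b].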

Producer-consumer

Recap: implement a buffer such that:

  • Producers and consumers access the buffer atomically
  • Consumers block when the buffer is empty
  • Producers block when the buffer is full (bounded buffer variant)

-module(buffer).

% Initialize buffer with size Bound
init_buffer(Bound) ->
    % TODO

% Put Item in Buffer; Block if full
put(Buffer, Item) ->
    % TODO

% Get Item from Buffer; Block if empty
get(Buffer) ->
    % TODO

The producer and consumer:

producer(Buffer) ->
    Item = produce(),
    buffer:put(Buffer, Item),
    producer(Buffer).


consumer(Buffer) ->
    Item = buffer:get(Buffer),
    % Do something with Item

    consume(Item),
    consumer(Buffer).

At this point you can pretty much see the pattern that arises. The buffer is, once again, a server process with an event loop:

buffer(Content, Count, Bound) ->
    receive

    % Serve gets when buffer not empty
    {get, From, Ref} when Count > 0 ->
        % Match first item
        [First | Rest] = Content,

        % Send it out
        From ! {item, Ref, First},

        % Remove it from buffer
        buffer(Rest, Count-1, Bound);

    % Serve puts when buffer not full
    {put, From, Ref, Item} when Count < Bound ->

        % Send ack
        From ! {done, Ref},

        % Add item to end
        buffer(Content ++ [Item], Count + 1, Bound)
end.

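In this solution, both the bounded and the unbounded variant work, thanks to Erlang’s term order between numbers and atoms! Any number compares less than any atom, so if we pass an atom such as infinity as Bound, the guard Count < Bound is always true and puts never block.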

Now for get and put:

% Get item from ‘Buffer’; block if empty
get(Buffer) ->
    Ref = make_ref(),
    Buffer ! {get, self(), Ref},
    receive {item, Ref, Item} -> Item end.

% Put ‘Item’ in ‘Buffer’; block if full
put(Buffer, Item) ->
    Ref = make_ref(),
    Buffer ! {put, self(), Ref, Item},
    receive {done, Ref} -> done end.
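To try both variants, here is a small sketch. The module name buffer_demo, the implementation of init_buffer and the run/0 driver are assumptions made for this example; buffer, get and put are the ones from above. Note that get/1 and put/2 have the same names as auto-imported BIFs, so calling them without a module prefix needs the no_auto_import directive (the producer and consumer above avoid this by calling buffer:put and buffer:get).

```erlang
-module(buffer_demo).
-export([init_buffer/1, put/2, get/1, run/0]).

% get/1 and put/2 clash with auto-imported BIFs of the same name, so
% unqualified local calls need this directive.
-compile({no_auto_import, [get/1, put/2]}).

% Assumed implementation of init_buffer: start with an empty buffer
init_buffer(Bound) ->
    spawn(fun () -> buffer([], 0, Bound) end).

buffer(Content, Count, Bound) ->
    receive
        {get, From, Ref} when Count > 0 ->
            [First | Rest] = Content,
            From ! {item, Ref, First},
            buffer(Rest, Count - 1, Bound);
        {put, From, Ref, Item} when Count < Bound ->
            From ! {done, Ref},
            buffer(Content ++ [Item], Count + 1, Bound)
    end.

get(Buffer) ->
    Ref = make_ref(),
    Buffer ! {get, self(), Ref},
    receive {item, Ref, Item} -> Item end.

put(Buffer, Item) ->
    Ref = make_ref(),
    Buffer ! {put, self(), Ref, Item},
    receive {done, Ref} -> done end.

% Hypothetical driver
run() ->
    % Unbounded: Count < infinity holds for every number
    Unbounded = init_buffer(infinity),
    [put(Unbounded, I) || I <- lists:seq(1, 5)],
    Items = [get(Unbounded) || _ <- lists:seq(1, 5)],

    % Bounded with one slot: the second put has to wait for a get
    Bounded = init_buffer(1),
    done = put(Bounded, x),
    Parent = self(),
    spawn(fun () -> put(Bounded, y), Parent ! second_put_done end),
    Blocked = receive second_put_done -> false after 100 -> true end,
    x = get(Bounded),
    receive second_put_done -> ok end,
    {Items, Blocked}.
```

Here run() should return {[1,2,3,4,5], true}: the unbounded buffer hands the items back in FIFO order, and the second put on the one-slot buffer only completes after the get.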

Readers-writers

-module(board).

% Register board with Name
init(Name) ->
    % TODO

% Get read access to Board
begin_read(Board) ->
    % TODO

% Release read access to Board
end_read(Board) ->
    % TODO

% Get write access to Board
begin_write(Board) ->
    % TODO

% Release write access to Board
end_write(Board) ->
    % TODO

Our first naive server function would be:

% ‘Readers’ active readers and ‘Writers’ active writers
board_row(Readers, Writers) ->
    receive
        {begin_read, From, Ref} when Writers =:= 0 ->
            From ! {ok_to_read, Ref},
            board_row(Readers+1, Writers);

        {begin_write, From, Ref} when (Writers =:= 0) and (Readers =:= 0) ->
            From ! {ok_to_write, Ref},
            board_row(Readers, Writers+1);

        {end_read, From, Ref} ->
            From ! {ok, Ref},
            board_row(Readers-1, Writers);

        {end_write, From, Ref} ->
            From ! {ok, Ref},
            board_row(Readers, Writers-1)
    end.

Just like our naive solution using semaphores, this doesn’t prevent starvation, since this version prioritizes readers: a writer can wait forever while new readers keep arriving.

The solution based on two monitors is a possible approach here, but it’s quite cumbersome for a message-passing program.

We instead implement two macro states:

  • Empty - no readers or writers
  • Readers - some readers but no writers

The board starts in the empty state, then:

  • When the board is in state empty:
    • Read requests - served immediately, then the board switches to the readers state.
    • Write requests - served immediately and synchronously: the board waits until writing ends, then goes back to the empty state.
  • When the board is in state readers:
    • Read requests - served immediately, and the board stays in the readers state.
    • Write requests - served as soon as possible: the board waits until all reading ends, then serves the request and waits for the writer to finish. Back to the empty state.

For this we’ll need two server functions, empty_board and readers_board:

% Board with no readers and no writers
empty_board() ->
    receive

    % Serve read request
    {begin_read, From, Ref} ->

        % Notify reader
        From ! {ok_to_read, Ref},

        % Board has one reader
        readers_board(1);

    % Serve write request synchronously
    {begin_write, From, Ref} ->
        % Notify writer
        From ! {ok_to_write, Ref},

        % Wait for writer to finish
        receive
            {end_write, _From, _Ref} ->
                % Board is empty again
                empty_board()
        end
end.

% Board with no readers (and no writers)
readers_board(0) -> empty_board();

% Board with ‘Readers’ active readers
% (and no writers)
readers_board(Readers) ->
    receive

        % Serve write request
        {begin_write, From, Ref} ->
            % Wait until all ‘Readers’ have finished
            [receive {end_read, _From, _Ref} -> end_read end || _ <- lists:seq(1, Readers)],

            % Notify writer
            From ! {ok_to_write, Ref},

            % Wait for writer to finish
            receive
                {end_write, _From, _Ref} -> empty_board()
            end;

        % Serve read request
        {begin_read, From, Ref} ->
            % Notify reader
            From ! {ok_to_read, Ref},

            % Board has one more reader
            readers_board(Readers+1);

        % Serve end read
        {end_read, _From, _Ref} ->

            % Board has one less reader
            readers_board(Readers-1)
end.
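The client functions of the board module are still TODOs above. Here is one possible sketch of them, together with a condensed copy of the two server functions so that the module compiles on its own. The module name board_demo and the run/0 driver are hypothetical. One assumption to note: since the two-state server never replies to end_read and end_write, they are written here as fire-and-forget sends.

```erlang
-module(board_demo).
-export([begin_read/1, end_read/1, begin_write/1, end_write/1, run/0]).

% Client side: block until the board grants read access
begin_read(Board) ->
    Ref = make_ref(),
    Board ! {begin_read, self(), Ref},
    receive {ok_to_read, Ref} -> ok end.

% Assumption: no reply is expected, so just send and return
end_read(Board) ->
    Board ! {end_read, self(), make_ref()},
    ok.

% Client side: block until the board grants write access
begin_write(Board) ->
    Ref = make_ref(),
    Board ! {begin_write, self(), Ref},
    receive {ok_to_write, Ref} -> ok end.

end_write(Board) ->
    Board ! {end_write, self(), make_ref()},
    ok.

% Server side, condensed from the two server functions above
empty_board() ->
    receive
        {begin_read, From, Ref} ->
            From ! {ok_to_read, Ref},
            readers_board(1);
        {begin_write, From, Ref} ->
            From ! {ok_to_write, Ref},
            receive {end_write, _, _} -> empty_board() end
    end.

readers_board(0) -> empty_board();
readers_board(Readers) ->
    receive
        {begin_write, From, Ref} ->
            [receive {end_read, _, _} -> ok end || _ <- lists:seq(1, Readers)],
            From ! {ok_to_write, Ref},
            receive {end_write, _, _} -> empty_board() end;
        {begin_read, From, Ref} ->
            From ! {ok_to_read, Ref},
            readers_board(Readers + 1);
        {end_read, _, _} ->
            readers_board(Readers - 1)
    end.

% Hypothetical driver: a writer has to wait while a reader is active
run() ->
    Board = spawn(fun empty_board/0),
    ok = begin_read(Board),
    Parent = self(),
    spawn(fun () ->
              begin_write(Board),
              Parent ! writing,
              end_write(Board)
          end),
    WriterWaits = receive writing -> false after 100 -> true end,
    end_read(Board),
    receive writing -> ok end,
    WriterWaits.
```

Calling run() should return true: the writer is only let in after the single active reader has sent end_read.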

Dining Philosophers

-module(philosophers).

% Set up table of N philosophers
init(N) ->
    % TODO

% Philosopher picks up Fork
get_fork(Fork) ->
    % TODO

% Philosopher releases Fork
put_fork(Fork) ->
    % TODO

We could explore the solutions we saw based on locking and breaking symmetry, but there is a solution that fits the message-passing paradigm better.

We have a waiter (process) who supervises access to the table. So each philosopher asks for permission to sit at the table before picking up both forks.

So, as long as the waiter allows strictly fewer philosophers than the total number of forks to sit around the table, deadlock and starvation are avoided.

Waiter interface:

% Ask Waiter to be seated; may wait
sit(Waiter) ->
    % TODO

% Ask Waiter to leave
leave(Waiter) ->
    % TODO

Our server function:

waiter(Eating, Seats) ->
    receive

    % Serve as long as seats are available
    {sit, From, Ref} when Eating < Seats ->
        From ! {ok_to_sit, Ref},

        % One more eating
        waiter(Eating+1, Seats);

    % Can leave at any time
    {leave, From, Ref} ->
        From ! {ok_to_leave, Ref},

        % One less eating
        waiter(Eating-1, Seats)
end.

And sit and leave:

% ask Waiter to be seated; may wait
sit(Waiter) ->
    Ref = make_ref(),
    Waiter ! {sit, self(), Ref},
    receive {ok_to_sit, Ref} -> ok end.

% ask Waiter to leave
leave(Waiter) ->
    Ref = make_ref(),
    Waiter ! {leave, self(), Ref},
    receive {ok_to_leave, Ref} -> ok end.

Now, each fork is also a process, which keeps track of whether the fork is free or not.

Server function:

% Fork not held by anyone
fork() ->
    receive
        {get, From, Ref} ->
            From ! {ack, Ref},

            % Fork held
            fork(From)
end.

% a fork held by Owner
fork(Owner) ->
    receive
        {put, Owner, _Ref} ->
            % Fork not held
            fork()
end.

And the get and put functions for the forks:

% Pick up Fork; block until available
get_fork(Fork) ->
    Ref = make_ref(),
    Fork ! {get, self(), Ref},
    receive {ack, Ref} -> ack end.

% Put down Fork
put_fork(Fork) ->
    Ref = make_ref(),
    Fork ! {put, self(), Ref}.

And finally, the init function for the whole problem:

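% Record holding the two forks of a philosopher (assumed definition)
-record(forks, {left, right}).

% Set up table of ‘N’ philosophers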
init(N) ->
    % Spawn waiter process
    Waiter = spawn(fun () -> waiter(0, N-1) end),

    % [1, 2, ..., N]
    Ids = lists:seq(1,N),

    % Spawn fork processes
    Forks = [spawn(fun fork/0) || _ <- Ids],

    % Spawn philosopher processes
    [spawn(fun () ->
        Left = lists:nth(I, Forks),

        % 1-based indexes
        Right = lists:nth(1+(I rem N), Forks),
        philosopher(#forks{left=Left, right=Right}, Waiter)
    end) || I <- Ids].
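The philosopher function itself is not shown above. A minimal sketch of it could look like the following, assuming the record definition -record(forks, {left, right}) and assuming that a philosopher sits down, takes the left fork and then the right one, eats, puts both forks back and leaves. To make the sketch terminate, each philosopher here eats a fixed number of rounds instead of looping forever; the module name philosophers_demo, the run/2 driver and the extra Rounds argument are hypothetical.

```erlang
-module(philosophers_demo).
-export([run/2]).

-record(forks, {left, right}).

% Hypothetical driver: N >= 2 philosophers each eat Rounds times;
% returns the total number of meals once everyone is done.
run(N, Rounds) ->
    Parent = self(),
    Waiter = spawn(fun () -> waiter(0, N - 1) end),
    Ids = lists:seq(1, N),
    Forks = [spawn(fun fork/0) || _ <- Ids],
    [spawn(fun () ->
               Left = lists:nth(I, Forks),
               Right = lists:nth(1 + (I rem N), Forks),
               philosopher(#forks{left = Left, right = Right}, Waiter, Rounds),
               Parent ! {done, Rounds}
           end) || I <- Ids],
    lists:sum([receive {done, R} -> R end || _ <- Ids]).

% Bounded variant of the philosopher loop
philosopher(_Forks, _Waiter, 0) -> ok;
philosopher(Forks, Waiter, Rounds) ->
    sit(Waiter),
    get_fork(Forks#forks.left),
    get_fork(Forks#forks.right),
    % eating would happen here
    put_fork(Forks#forks.left),
    put_fork(Forks#forks.right),
    leave(Waiter),
    philosopher(Forks, Waiter, Rounds - 1).

% --- waiter and fork code from above ---
waiter(Eating, Seats) ->
    receive
        {sit, From, Ref} when Eating < Seats ->
            From ! {ok_to_sit, Ref},
            waiter(Eating + 1, Seats);
        {leave, From, Ref} ->
            From ! {ok_to_leave, Ref},
            waiter(Eating - 1, Seats)
    end.

sit(Waiter) ->
    Ref = make_ref(),
    Waiter ! {sit, self(), Ref},
    receive {ok_to_sit, Ref} -> ok end.

leave(Waiter) ->
    Ref = make_ref(),
    Waiter ! {leave, self(), Ref},
    receive {ok_to_leave, Ref} -> ok end.

fork() ->
    receive
        {get, From, Ref} ->
            From ! {ack, Ref},
            fork(From)
    end.

fork(Owner) ->
    receive
        {put, Owner, _Ref} -> fork()
    end.

get_fork(Fork) ->
    Ref = make_ref(),
    Fork ! {get, self(), Ref},
    receive {ack, Ref} -> ack end.

put_fork(Fork) ->
    Fork ! {put, self(), make_ref()}.
```

For example, philosophers_demo:run(5, 3) should return 15. Because the waiter seats at most N-1 philosophers, at least one seated philosopher can always pick up both forks, so the run finishes instead of deadlocking.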