pobox

External buffer processes to protect against mailbox overflow


License
MIT

PO Box

High throughput Erlang applications often get bitten by the fact that Erlang mailboxes are unbounded and will keep accepting messages until the node runs out of memory.

In most cases, this problem can be solved by imposing a rate limit on the producers, and it is recommended to explore this idea before looking at this library.

When it is impossible to rate-limit the messages coming to a process and your optimization efforts remain fruitless, you need to start shedding load by dropping messages.

PO Box can help by shedding the load for you, and making sure you won't run out of memory.

The Principles

PO Box is a library that implements a buffer process. Erlang processes will receive their messages locally (at home), and may become overloaded because they have to both deal with their mailbox and day-to-day tasks:

         messages
            |
            V
+-----[Pid or Name]-----+
|      |         |      |
|      | mailbox |      |
|      +---------+      |
|       |               |
|    receive            |
+-----------------------+

A PO Box process is a separate process that you route your messages through. It implements a buffer (see Types of buffer for details) that does nothing but churn through messages and drop them for you when the buffer is full.

Depending on how you use the API, the PO Box can tell you it received new data, so you can then ask for the data, or you can tell it to send the data to you directly, without notification:

                                                  messages
                                                     |
                                                     V
+---------[Pid]---------+                +--------[POBox]--------+
|                       |<-- got mail ---|      |         |      |
|                       |                |      | mailbox |      |
|   <important stuff>   |--- send it! -->|      +---------+      |
|                       |                |       |               |
|                       |<---<messages>--|<---buffer             |
+-----------------------+                +-----------------------+

To be more detailed, a PO Box is a state machine with an owner process (which it receives messages for), and it has 3 states:

  • Active
  • Notify
  • Passive

The passive state basically does nothing but accumulate messages in the buffer and drop them when necessary.

The notify state is enabled by the user by calling the PO Box. Its sole task is to verify if there is any message in the buffer. If there is, it will respond to the PO Box's owner with a {mail, BoxPid, new_data} message sent directly to the pid. If there is no message in the buffer, the process will wait in the notify state until it gets one. As soon as the notification is sent, it reverts back to the passive state.

The active state is the only one that can send actual messages to the owner process. The user can call the PO Box to set it active, and if there are any messages in the buffer, all the messages it contains get sent as a list to the owner. If there are no messages, the PO Box waits until there is one to send it. After forwarding the messages, the PO Box reverts to the passive state.

The FSM can be illustrated in crappy ASCII as:

         ,---->[passive]------(user makes active)----->[active]
         |         | ^                                  |  ^  |
         |         | '---(sends message to user)--<-----'  |  |
         |  (user makes notify)                            |  |
         |         |                                       |  |
(user is notified) |                                       |  |
         |         V                                       |  |
         '-----[notify]---------(user makes active)--------'  |
                     ^----------(user makes notify)<----------'
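The arrows of the diagram above can also be read as a pure transition function. This is only a model for reasoning about the states, not the library's implementation: the module name and the event atoms (make_active, make_notify, notified, mail_sent, post) are hypothetical labels for the arrows.

```erlang
-module(pobox_fsm_sketch).
-export([next/2]).

%% next(State, Event) -> NewState
%% One clause per arrow in the diagram. Event names are made up here.
next(passive, make_active) -> active;   % user makes active
next(passive, make_notify) -> notify;   % user makes notify
next(notify,  notified)    -> passive;  % {mail, BoxPid, new_data} was sent
next(notify,  make_active) -> active;   % user makes active
next(active,  mail_sent)   -> passive;  % buffered messages were forwarded
next(active,  make_notify) -> notify;   % user makes notify
%% A post only fills the buffer in this model. The notification or
%% delivery it may trigger is represented by notified / mail_sent.
next(State,   post)        -> State.
```

Any pair not drawn in the diagram fails with function_clause, which makes it easy to spot a transition the model does not cover.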

Types of buffer

Currently, there are three types of built-in buffers supported: queues, stacks, and keep_old queues. You can also provide your own buffer implementation using the pobox_buf behaviour. See samples/pobox_queue_buf.erl for an example implementation.

Queues will keep messages in order, and drop oldest messages to make place for new ones. If you have a buffer of size 3 and receive messages a, b, c, d, e in that order, the buffer will contain messages [c,d,e].

keep_old queues will keep messages in order, but block newer messages from entering, favoring keeping old messages instead. If you have a buffer of size 3 and receive messages a, b, c, d, e in that order, the buffer will contain messages [a,b,c].

Stacks will not guarantee any message ordering, and will drop the top of the stack first to make place for new messages. For the same messages, the stack buffer should contain the messages [e,b,a].
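To make the three drop policies concrete, here is a small model that uses plain lists. It is a sketch for illustration only and says nothing about how the built-in buffers are actually stored; the module and function names are hypothetical, and it assumes a maximum size of at least 1.

```erlang
-module(pobox_buffer_sketch).
-export([fill/3]).

%% fill(Type, MaxSize, Messages) -> buffer contents once all Messages arrived.
%% Assumes MaxSize >= 1.
fill(Type, Max, Msgs) ->
    lists:foldl(fun(Msg, Buf) -> insert(Type, Max, Msg, Buf) end, [], Msgs).

%% queue: append at the end; when full, drop the oldest message first.
insert(queue, Max, Msg, Buf) when length(Buf) >= Max ->
    tl(Buf) ++ [Msg];
insert(queue, _Max, Msg, Buf) ->
    Buf ++ [Msg];
%% keep_old: append at the end; when full, refuse the new message.
insert(keep_old, Max, _Msg, Buf) when length(Buf) >= Max ->
    Buf;
insert(keep_old, _Max, Msg, Buf) ->
    Buf ++ [Msg];
%% stack: push on top; when full, drop the current top first.
insert(stack, Max, Msg, Buf) when length(Buf) >= Max ->
    [Msg | tl(Buf)];
insert(stack, _Max, Msg, Buf) ->
    [Msg | Buf].
```

With a size of 3 and the messages [a,b,c,d,e], fill/3 returns [c,d,e] for queue, [a,b,c] for keep_old and [e,b,a] for stack, matching the descriptions above.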

To choose between a queue and a stack buffer, you should consider the following criteria:

  • Do you need messages in order? Choose one of the queues.
  • Do you need the latest messages coming in to be kept, or the oldest ones? Pick queue to keep the latest, and keep_old to keep the oldest.
  • Do you need low latency? Then choose a stack. Stacks will give you many messages with low latency and a few with high latency. Queues will give you a higher overall latency, but less variance over time.

More buffer types could be supported in the future, if people require them.

How to build it

./rebar compile

How to run tests

./rebar compile ct

How to use it

Start a buffer with any of the following:

start_link(OwnerPid, MaxSize, BufferType)
start_link(OwnerPid, MaxSize, BufferType, InitialState)
start_link(Name, OwnerPid, MaxSize, BufferType)
start_link(Name, OwnerPid, MaxSize, BufferType, InitialState)
start_link(#{
    name => Name,
    owner => OwnerPid,
    max => MaxSize, %% mandatory
    type => BufferType,
    initial_state => InitialState,
    heir => HeirPid,
    heir_data => HeirData
})
start_link(Name, #{
    owner => OwnerPid,
    max => MaxSize, %% mandatory
    type => BufferType,
    initial_state => InitialState,
    heir => Heir,
    heir_data => HeirData
})

Where:

  • Name is any name a regular gen_statem process can accept (including {via,...} tuples)
  • OwnerPid is the pid of the PO Box owner. It's the only one that can communicate with it in terms of setting state and reading messages. The OwnerPid can be either a pid or an atom. The PO Box will set up a link directly between itself and OwnerPid, and won't trap exits. If you're using named processes (atoms) and want to have the PO Box survive them individually, you should unlink the processes manually. This also means that processes that terminate normally won't kill the PO Box.
  • MaxSize is the maximum number of messages in a buffer. Note that this is a mandatory property.
  • BufferType can be either queue, stack or keep_old and specifies which type is going to be used. You can also provide your buffer module using {mod, Module}.
  • InitialState can be either passive or notify. The default value is set to notify. Having the buffer passive is desirable when you start it during an asynchronous init and do not want to receive notifications right away.
  • Heir is the name or pid of a process that will take over the PO Box if the owner dies. You can use a locally registered name (an atom), or {global, Name} and {via, Module, Name} if you wish. If Heir is a name, it is resolved as soon as the owner dies. A message will be sent to the heir to notify it of the transfer, and the PO Box will be put into the passive state. The message has the form {pobox_transfer, BoxPid, PreviousOwnerPid, HeirData, Reason}.
  • HeirData is data that should be sent as part of the pobox_transfer message to the heir when the owner dies.
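As a sketch of the map form, the helper below builds an option map from the keys listed above. The module name, the function and every concrete value (1000, keep_old, the heir_data tuple) are made up for illustration, and leaving out the name key assumes that only max is required, as the comments in the signatures suggest.

```erlang
-module(pobox_opts_sketch).
-export([opts/2]).

%% Builds an option map meant for pobox:start_link/1.
%% All concrete values below are illustrative assumptions.
opts(Owner, Heir) ->
    #{owner => Owner,
      max => 1000,                 % mandatory: at most 1000 buffered messages
      type => keep_old,            % refuse new messages once the buffer is full
      initial_state => passive,    % stay quiet until the owner asks
      heir => Heir,                % takes over the PO Box if Owner dies
      heir_data => {inherited_from, Owner}}.
```

It would then be used as {ok, Box} = pobox:start_link(pobox_opts_sketch:opts(self(), HeirPid)), where HeirPid is whatever process should inherit the box.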

The buffer can be made active by calling:

pobox:active(BoxPid, FilterFun, FilterState)

The FilterFun is a function that will take messages one by one along with custom state and can return:

  • {{ok, Message}, NewState}: the message will be sent.
  • {drop, NewState}: the message will be dropped.
  • skip: the message is left in the buffer and whatever was filtered so far gets sent.

A function that would blindly forward all messages could be written as:

fun(Msg, _) -> {{ok,Msg},nostate} end

A function that would limit binary messages by size could be written as:

fun(Msg, Allowed) ->
    case Allowed - byte_size(Msg) of
        N when N < 0 -> skip;
        N -> {{ok, Msg}, N}
    end
end

Or you could drop messages that are empty binaries by doing:

fun(<<>>, State) -> {drop, State};
   (Msg, State) -> {{ok,Msg}, State}
end.

The resulting message sent will be:

{mail, BoxPid, Messages, MessageCount, MessageDropCount}
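To see how the three return values interact, the following sketch applies a filter function to a plain list standing in for the buffer. It is a model written for this README, not the code pobox runs: the module name is hypothetical, and the drop counter here only counts messages dropped by the filter, whereas the MessageDropCount reported by a real PO Box also reflects messages lost to a full buffer (see the example session).

```erlang
-module(pobox_filter_sketch).
-export([run/3]).

%% run(FilterFun, FilterState, Buffer) ->
%%     {Sent, SentCount, FilterDropped, Remaining}
run(Fun, State, Buffer) ->
    run(Fun, State, Buffer, [], 0, 0).

run(_Fun, _State, [], Sent, Count, Dropped) ->
    {lists:reverse(Sent), Count, Dropped, []};
run(Fun, State, [Msg | Rest] = Buffer, Sent, Count, Dropped) ->
    case Fun(Msg, State) of
        {{ok, Out}, NewState} ->
            %% Message accepted: it will be part of the mail.
            run(Fun, NewState, Rest, [Out | Sent], Count + 1, Dropped);
        {drop, NewState} ->
            %% Message discarded: neither sent nor kept.
            run(Fun, NewState, Rest, Sent, Count, Dropped + 1);
        skip ->
            %% Msg stays in the buffer; ship what was accepted so far.
            {lists:reverse(Sent), Count, Dropped, Buffer}
    end.
```

Running the size-limiting filter above with an allowance of 5 bytes over [<<"abc">>, <<"de">>, <<"f">>] accepts the first two binaries and leaves <<"f">> in the buffer for the next call.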

Finally, the PO Box can be forced to notify by calling:

pobox:notify(BoxPid)

Which is objectively much simpler.

Messages can be sent to a PO Box by calling pobox:post(BoxPid, Msg) or sending a message directly to the process as BoxPid ! {post, Msg}.

The ownership of the PO Box can be transferred to another process by calling:

pobox:give_away(BoxPid, DestPid, DestData, Timeout)

or

pobox:give_away(BoxPid, DestPid, Timeout)

which is equivalent to:

pobox:give_away(BoxPid, DestPid, undefined, Timeout)

The call should return true on success and false on failure. Note that you can only call this from within the owner process; otherwise the call always fails. If DestData is not provided, it will be sent as undefined in the pobox_transfer message.

The destination process should receive a message of the following form:

{pobox_transfer, BoxPid, PreviousOwnerPid, DestData | undefined, give_away}
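A receiving process can tell the two kinds of transfer apart by the last element of the tuple. The sketch below only classifies the message; the module name and the returned tuples are hypothetical, and what the new owner does next (for example calling pobox:notify/1 to start getting notifications) is left to the application.

```erlang
-module(pobox_transfer_sketch).
-export([describe/1]).

%% describe(Msg) -> a tagged description of a pobox_transfer message.
%% Explicit hand-over through pobox:give_away/3,4:
describe({pobox_transfer, Box, PrevOwner, DestData, give_away}) ->
    {given, Box, PrevOwner, DestData};
%% Any other reason: the previous owner died and we were its heir.
describe({pobox_transfer, Box, PrevOwner, HeirData, Reason}) ->
    {inherited, Box, PrevOwner, HeirData, Reason};
%% Everything else is not a transfer.
describe(_Other) ->
    ignored.
```

In a real owner this would sit inside a receive or a gen_server handle_info/2 clause rather than a standalone function.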

Example Session

First start a PO Box for the current process:

1> {ok, Box} = pobox:start_link(self(), 10, queue).
{ok,<0.39.0>}

We'll also define a spammer function that will just keep mailing a bunch of messages:

2> Spam = fun(F,N) -> pobox:post(Box,N), F(F,N+1) end.
#Fun<erl_eval.12.17052888>

Because we're in the shell, the function takes itself as an argument so it can both remain anonymous and loop. Each message is an increasing integer.

We can start the process and wait for a while:

3> Spammer = spawn(fun() -> Spam(Spam,0) end).
<0.42.0>

Let's see if we have anything in our PO box:

4> flush().
Shell got {mail, <0.39.0>, new_data}
ok

Yes! Let's get that content:

5> pobox:active(Box, fun(X,ok) -> {{ok,X},ok} end, ok).
ok
6> flush().
Shell got {mail,<0.39.0>,
                [778918,778919,778920,778921,778922,778923,778924,778925,
                 778926,778927],
                10,778918}
ok

So we have 10 messages with sequential IDs (we used a queue buffer), and the process kindly dropped over 700,000 messages for us, keeping our node's memory safe.

The spammer is still going and our PO Box is in passive mode. Let's cut to the chase and go directly to the active state:

7> pobox:active(Box, fun(X,ok) -> {{ok,X},ok} end, ok).
ok
8> flush().
Shell got {mail,<0.39.0>,
                [1026883,1026884,1026885,1026886,1026887,1026888,1026889,
                 1026890,1026891,1026892],
                10,247955}
ok

Nice. We can go back to notification mode too:

9> pobox:notify(Box).
ok
10> flush().
Shell got {mail, <0.39.0>, new_data}
ok

And keep going on and on and on.

Notes

  • Be careful to have a lightweight filter function if you expect constant overload from messages that keep coming very very fast. While the buffer filters out whatever messages you have, the new ones keep accumulating in the PO Box's own mailbox!
  • It is possible for a process to have multiple PO Boxes, although coordinating the multiple state machines together may get tricky.
  • The library is a generalization of ideas designed and implemented in logplex by Geoff Cant (@archaelus). Props to him.
  • Using a keep_old buffer with a filter function that selects one message at a time would be equivalent to a naive bounded mailbox similar to what plenty of users asked for before. Tricking the filter function into forwarding the message (self() ! Msg) while dropping it allows selective receives on bounded mailboxes.
  • When using post_sync/3, keep in mind that full doesn't mean your message will be dropped unless you are using the keep_old buffer type or a custom buffer that behaves the same way as keep_old.
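The bounded mailbox trick from the notes could look like the sketch below. It assumes the filter function is evaluated inside the PO Box process (which the first note suggests), so the owner's pid is captured when the fun is built instead of calling self() inside it. The module name and the ready / done filter states are hypothetical.

```erlang
-module(pobox_forward_sketch).
-export([one_at_a_time/1]).

%% Returns a filter fun that forwards exactly one raw message to Owner
%% and leaves every other message in the buffer.
one_at_a_time(Owner) ->
    fun(Msg, ready) ->
            Owner ! Msg,      % lands in Owner's mailbox as a plain message
            {drop, done};     % from the PO Box's view, the message is gone
       (_Msg, done) ->
            skip              % keep the rest buffered for the next call
    end.
```

The owner would pass it as pobox:active(Box, pobox_forward_sketch:one_at_a_time(self()), ready) and can then use a selective receive on the forwarded message. It still has to handle whatever mail tuple the PO Box sends for that call.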

Contributing

Accepted contributions need to be non-aesthetic, and provide some new functionality, fix abstractions, improve performance or semantics, and so on.

All changes received must be tested and not break existing tests.

Changes to currently untested functionality should ideally first provide a separate commit that shows the current behaviour working with the new tests (or some of the new tests, if you expand on the functionality), and then your own feature (and additional tests if required) in its own commit so we can verify nothing breaks in unpredictable ways.

Tests are written using Common Test. PropEr tests will be accepted, because they objectively rule. Ideally, you will wrap your PropEr tests in a Common Test suite so we can run everything with one command.

If you need help, feel free to ask for it in issues or pull requests. These rules are strict, but we're nice people!

Roadmap

This is more a wishlist than a roadmap, in no particular order:

  • Provide default filter functions in a new module

Changelog

  • 1.2.0: added heir and give_away functionality / fixed keep_old buffer size tracking
  • 1.1.0: added pobox_buf behaviour to add custom buffer implementations
  • 1.0.4: move to gen_statem implementation to avoid OTP 21 compile errors and OTP 20 warnings
  • 1.0.3: fix typespecs to generate fewer errors
  • 1.0.2: explicitly specify registered to be [] for relx compatibility, switch to rebar3
  • 1.0.1: fixing bug where manually dropped messages (with the active filter) would result in wrong size values and crashes for queues.
  • 1.0.0: A PO Box links itself to the process that it receives data for.
  • 0.2.0: Added PO Box's pid in the new_data message so a process can own more than one PO Box. Changed internal queue and stack size monitoring to be O(1) in all cases.
  • 0.1.1: adding keep_old queue, which blocks new messages from entering a filled queue.
  • 0.1.0: initial commit

Authors / Thanks

  • Fred Hebert / @ferd: library generalization and current implementation
  • Geoff Cant / @archaelus: design, original implementation
  • Jean-Samuel Bédard / @jsbed: adaptation to gen_statem behaviour
  • Eric des Courtis / @edescourtis: added pobox_buf behaviour & heir/give_away functionality