Periodic: run functions at intervals
The Periodic supervisor manages a dynamic set of tasks. Each of these tasks is run repeatedly at a per-task specified interval.
A task is represented as a function. It receives a single parameter, its current state. When complete, this function can return
- { :ok, new_state } to have itself rescheduled with a (potentially) updated state.
- { :change_interval, new_interval, new_state } to reschedule itself with a new state, but updating the interval between schedules.
- { :stop, :normal } to exit gracefully.
- any other return value will be treated as an error.
All intervals are specified in milliseconds.
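As a minimal sketch of these return values, here is a hypothetical callback whose state is simply the number of runs remaining. The module name Countdown, the function name tick and the specific numbers are assumptions made up for illustration; only the shapes of the returned tuples come from the description above.

```elixir
defmodule Countdown do
  # Hypothetical callback: the state is the number of runs still to go.

  # Nothing left to do: ask to exit gracefully.
  def tick(0), do: {:stop, :normal}

  # At five runs remaining, slow down to one run per second (1_000 ms).
  def tick(5), do: {:change_interval, 1_000, 4}

  # Otherwise keep the current interval and pass on the updated state.
  def tick(n) when is_integer(n) and n > 0, do: {:ok, n - 1}
end
```

Assuming the Periodic.repeat API described later in this document, such a callback could be scheduled with something like Periodic.repeat({ Countdown, :tick }, 500, state: 10).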
What does it look like?
mix.exs:
deps: { :periodic, ">= 0.0.0" },
application.ex
child_spec = [
Periodic,
MyApp,
. . .
]
First a silly example:
defmodule Silly do
  use GenServer

  def callback(state = [{ label, count }]) do
    IO.inspect state
    { :ok, [ { label, count + 100 }]}
  end

  def start_link(_) do
    GenServer.start_link(__MODULE__, nil)
  end

  def init(_) do
    Periodic.repeat({ __MODULE__, :callback }, 500, state: [ one: 1 ])
    Periodic.repeat({ __MODULE__, :callback }, 300, state: [ two: 2 ], offset: 100)
    { :ok, nil }
  end
end
The calls to Periodic.repeat will cause the callback function to be called in two different sequences: in the first it will be called every 500ms, and in the second every 300ms, starting 100ms later. Each sequence of calls will maintain its own state.
This will output:
Compiling 1 file (.ex)
[one: 1]
[two: 2]
[two: 102]
[one: 101]
[two: 202]
[one: 201]
[two: 302]
[two: 402]
. . .
As something more complex, here's a GenServer that fetches data from two feeds. The first is fetched every 30 seconds, and the second every 60 seconds.
defmodule Fetcher do
  use GenServer

  def start_link(_) do
    GenServer.start_link(__MODULE__, nil)
  end

  def init(_) do
    { :ok, _ } = Periodic.repeat({ __MODULE__, :fetch }, 30_000,
                                 state: %{ feed: Stocks, respond_to: self() })
    { :ok, _ } = Periodic.repeat({ __MODULE__, :fetch }, 60_000,
                                 state: %{ feed: Bonds, respond_to: self() }, offset: 15_000)
    { :ok, %{} }
  end

  # this function is run by the two task runners created in `init/1`. They
  # fetch data from the feed whose name is in the state, and then send the
  # result back to the original server
  def fetch(task) do
    data = task.feed.fetch()
    Fetcher.handle_data(task.respond_to, task.feed, data)
    # return the unchanged task state so the function is rescheduled
    { :ok, task }
  end

  # and this function forwards the feed response on to the server
  def handle_data(worker_pid, feed, data) do
    GenServer.cast(worker_pid, { :incoming, feed, data })
  end

  def handle_cast({ :incoming, Stocks, data }, state) do
    ## ...
  end

  def handle_cast({ :incoming, Bonds, data }, state) do
    ## ...
  end
end
Notes:
- In the real world you'd likely split this into multiple modules.
- The parameters to the first call to Periodic.repeat say: run Fetcher.fetch every 30s, passing it a map containing the name of a feed and the pid to send the data to.
- The second call to Periodic.repeat sets up a second schedule. This happens to call the same function, but every 60s. It also offsets the time of these calls (starting with the first) by 15s. This means the timeline for calls to the function will be:

  time from start   call
  +0s               fetch{feed: Stocks, ...}
  +15s              fetch{feed: Bonds, ...}
  +30s              fetch{feed: Stocks, ...}
  +60s              fetch{feed: Stocks, ...}
  +75s              fetch{feed: Bonds, ...}
  +90s              fetch{feed: Stocks, ...}
  +120s             fetch{feed: Stocks, ...}
  +135s             fetch{feed: Bonds, ...}
  . . .

- The fetch function gets data for the appropriate feed, and then calls back into the original module, passing the pid of the GenServer, the name of the feed and the data.
- The handle_data function it calls just forwards the request on to the GenServer. (Technically the call to GenServer.cast could have been made directly in the fetch function, but in our mythical real world, it's likely the periodically run functions would be decoupled from the GenServer.)
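The timeline in the notes above can be reproduced with a little arithmetic. The sketch below is not part of Periodic; the module TimelineSketch and its functions are hypothetical, and it assumes that the first call of a schedule happens at its offset and that later calls follow exactly one interval apart.

```elixir
defmodule TimelineSketch do
  # Times (in ms from start) at which one schedule fires, up to `until`.
  # Assumes the first call happens at `offset`, then every `interval` ms.
  def call_times(interval, offset, until) do
    offset
    |> Stream.iterate(&(&1 + interval))
    |> Enum.take_while(&(&1 <= until))
  end

  # Merges several {label, interval, offset} schedules into one list of
  # {time, label} pairs, ordered by time.
  def merged(schedules, until) do
    schedules
    |> Enum.flat_map(fn {label, interval, offset} ->
      for t <- call_times(interval, offset, until), do: {t, label}
    end)
    |> Enum.sort()
  end
end
```

Calling TimelineSketch.merged([{Stocks, 30_000, 0}, {Bonds, 60_000, 15_000}], 135_000) yields the calls at 0s, 15s, 30s, 60s, 75s, 90s, 120s and 135s listed in the notes.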
The API
To cause a function to be invoked repeatedly every so many milliseconds, use:
{ :ok, pid } = Periodic.repeat(func_spec, interval, options \\ [])
- func_spec may be an anonymous function of arity 1, a 2-tuple containing the name of a module and the name of a function, or just the name of the module (in which case the function is assumed to be named run/1).
- The interval specifies the number of milliseconds between executions of the function. Periodic makes some attempt to minimize drift of this timing, but you should treat the value as approximate: you'll see some spreading of the interval timing of perhaps a millisecond on some iterations.
- The options list may contain:
  - state: term
    The initial state that is passed as a parameter when the function is first executed.
  - name: name
    A name for the task. This can be used subsequently to terminate it.
  - offset: ms
    An offset (in milliseconds) to be applied before the first execution of the function. This can be used to stagger executions of multiple sets of periodic functions if their intervals would otherwise cause them to execute at the same time.
- You can remove a previously added periodic function with Periodic.stop_task(pid), where pid is the value returned by repeat/3.
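To make the three accepted forms of func_spec concrete, here is a rough sketch of how each could be turned into a function of one argument. This is not Periodic's implementation; FuncSpecSketch, to_fun and the Doubler module are hypothetical names used only for illustration.

```elixir
defmodule FuncSpecSketch do
  # An anonymous function of arity 1 is used as-is.
  def to_fun(fun) when is_function(fun, 1), do: fun

  # A { module, function_name } tuple calls that function with the state.
  def to_fun({mod, name}) when is_atom(mod) and is_atom(name) do
    fn state -> apply(mod, name, [state]) end
  end

  # A bare module name is assumed to export run/1.
  def to_fun(mod) when is_atom(mod) do
    fn state -> apply(mod, :run, [state]) end
  end
end

defmodule Doubler do
  # Hypothetical callbacks that follow the { :ok, new_state } convention.
  def run(n), do: {:ok, n * 2}
  def triple(n), do: {:ok, n * 3}
end
```

With these definitions, FuncSpecSketch.to_fun(Doubler), FuncSpecSketch.to_fun({Doubler, :triple}) and FuncSpecSketch.to_fun(fn n -> {:ok, n + 1} end) all return functions that can be called with the current state.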
The Callback Function
You write functions that Periodic will call. These will have the spec:
@typep state :: term()
@typep periodic_callback_return ::
  { :ok, state() } |
  { :change_interval, new_interval::integer(), state() } |
  { :stop, :normal } |
  other :: any()
@spec periodic_callback(state :: state()) :: periodic_callback_return()
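One way to read this spec is as a small decision table: each return shape tells the runner what to do next. The sketch below models that table as a pure function. It is an assumption about how such a runner might behave, not Periodic's code, and the names ReturnSketch and next_action, along with the :reschedule, :stop and :error tags, are hypothetical.

```elixir
defmodule ReturnSketch do
  # Hypothetical helper: maps a callback's return value to the next action.

  # Keep the current interval, carry the new state forward.
  def next_action({:ok, state}, interval), do: {:reschedule, interval, state}

  # Switch to the new interval for subsequent runs.
  def next_action({:change_interval, new_interval, state}, _interval)
      when is_integer(new_interval),
      do: {:reschedule, new_interval, state}

  # Graceful exit.
  def next_action({:stop, :normal}, _interval), do: :stop

  # Anything else is treated as an error.
  def next_action(other, _interval), do: {:error, other}
end
```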
Runtime Characteristics
- Periodic is a DynamicSupervisor which should be started by one of your application's own supervisors.
- Each call to Periodic.repeat creates a new worker process. This worker spends most of its time waiting for the interval timer to trigger, at which point it invokes the function you passed it, then resets the timer.
- If a function takes more time to execute than the interval time, then the next call to that function will happen immediately, and all subsequent calls to it will be timeshifted by the overrun.
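The overrun rule can be illustrated with a small model of the timer arithmetic. This is a sketch under assumptions, not Periodic's actual scheduling code: OverrunSketch and next_call are hypothetical names, and times are plain integers in milliseconds rather than real clock readings.

```elixir
defmodule OverrunSketch do
  # `due` is when the call that just finished was scheduled to start,
  # `now` is when it finished.
  # Returns { delay_before_next_call, time_the_next_call_is_due }.
  def next_call(due, interval, now) do
    next_due = due + interval

    if now <= next_due do
      # Finished in time: wait out the remainder of the interval, so the
      # time spent in the function does not push the schedule back.
      {next_due - now, next_due}
    else
      # Overran: run again immediately. Later calls are measured from
      # `now`, so they shift by the amount of the overrun.
      {0, now}
    end
  end
end
```

For a 500ms interval, a call that was due at 0 and finishes at 300 waits another 200ms. If it instead finishes at 700, the next call starts at once, and the call after that becomes due at 1200 rather than 1000.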
See license.md for copyright and licensing information.