Asyncio retry utility for Python 3.7+
$ pip install aioretry
A conda-forge recipe is also available, so you can also use
conda install -c conda-forge aioretry
import asyncio
from aioretry import (
retry,
# Tuple[bool, Union[int, float]]
RetryPolicyStrategy,
RetryInfo
)
# This example shows the usage with python typings
def retry_policy(info: RetryInfo) -> RetryPolicyStrategy:
"""
- It will always retry until succeeded: abandon = False
- If fails for the first time, it will retry immediately,
- If it fails again,
aioretry will perform a 100ms delay before the second retry,
200ms delay before the 3rd retry,
the 4th retry immediately,
100ms delay before the 5th retry,
etc...
"""
return False, (info.fails - 1) % 3 * 0.1
@retry(retry_policy)
async def connect_to_server():
# connec to server
...
asyncio.run(connect_to_server())
We could also use retry
as a decorator for instance method
class Client:
@retry(retry_policy)
async def connect(self):
await self._connect()
asyncio.run(Client().connect())
retry_policy
could be the method name of the class if retry
is used as a decorator for instance method.
class ClientWithConfigurableRetryPolicy(Client):
def __init__(self, max_retries: int = 3):
self._max_retries = max_retries
def _retry_policy(self, info: RetryInfo) -> RetryPolicyStrategy:
return info.fails > self._max_retries, info.fails * 0.1
# Then aioretry will use `self._retry_policy` as the retry policy.
# And by using a str as the parameter `retry_policy`,
# the decorator must be used for instance methods
@retry('_retry_policy')
async def connect(self):
await self._connect()
asyncio.run(ClientWithConfigurableRetryPolicy(10).connect())
We could also register an before_retry
callback which will be executed after every failure of the target function if the corresponding retry is not abandoned.
class ClientTrackableFailures(ClientWithConfigurableRetryPolicy):
# `before_retry` could either be a sync function or an async function
async def _before_retry(self, info: RetryInfo) -> None:
await self._send_failure_log(info.exception, info.fails)
@retry(
retry_policy='_retry_policy',
# Similar to `retry_policy`,
# `before_retry` could either be a Callable or a str
before_retry='_before_retry'
)
async def connect(self):
await self._connect()
def retry_policy(info: RetryInfo) -> RetryPolicyStrategy:
if isinstance(info.exception, (KeyError, ValueError)):
# If it raises a KeyError or a ValueError, it will not retry.
return True, 0
# Otherwise, retry immediately
return False, 0
@retry(retry_policy)
async def foo():
# do something that might raise KeyError, ValueError or RuntimeError
...
-
fn
Callable[[...], Awaitable]
the function to be wrapped. The function should be an async function or normal function returns an awaitable. -
retry_policy
Union[str, RetryPolicy]
-
before_retry?
Optional[Union[str, Callable[[RetryInfo], Optional[Awaitable]]]]
If specified,before_retry
is called after each failure offn
and before the corresponding retry. If the retry is abandoned,before_retry
will not be executed.
Returns a wrapped function which accepts the same arguments as fn
and returns an Awaitable
.
RetryPolicyStrategy = Tuple[bool, int | float]
RetryPolicy = Callable[[RetryInfo], RetryPolicyStrategy | Awaitable[RetryPolicyStrategy]]
Retry policy is used to determine what to do next after the fn
fails to do some certain thing.
rt = retry_policy(info)
abandon, delay = (
# Since 6.2.0, retry_policy could also return an `Awaitable`
await rt
if inspect.isawaitable(rt)
else rt
)
-
info
RetryInfo
-
info.fails
int
is the counter number of how many times functionfn
performs as a failure. Iffn
fails for the first time, thenfails
will be1
. -
info.exception
Exception
is the exception thatfn
raised. -
info.since
float
is the fractional time seconds generated bytime.monotonic()
when the first failure happens.
-
info.fails
- If
abandon
isTrue
, then aioretry will give up the retry and raise the exception directly, otherwise aioretry will sleepdelay
seconds (asyncio.sleep(delay)
) before the next retry.
def retry_policy(info: RetryInfo):
if isinstance(info.exception, KeyError):
# Just raise exceptions of type KeyError
return True, 0
return False, info.fails * 0.1
from aioretry import (
# The type of retry_policy function
RetryPolicy,
# The type of the return value of retry_policy function
RetryPolicyStrategy,
# The type of before_retry function
BeforeRetry,
RetryInfo
)
Since 5.0.0
, aioretry introduces RetryInfo
as the only parameter of retry_policy
or before_retry
2.x
def retry_policy(fails: int):
"""A policy that gives no chances to retry
"""
return True, 0.1 * fails
5.x
def retry_policy(info: RetryInfo):
return True, 0.1 * info.fails
3.x
def before_retry(e: Exception, fails: int):
...
5.x
# Change the sequence of the parameters
def before_retry(info: RetryInfo):
info.exception
info.fails
...
Since 5.0.0
, both retry_policy
and before_retry
have only one parameter of type RetryInfo
respectively.
Since 6.0.0
, RetryInfo::since
is a float
value which is generated by time.monotonic()
and is better for measuring intervals than datetime
, while in 5.x
RetryInfo::since
is a datetime
# 5.x
from datetime import datetime
def retry_policy(info: RetryInfo) -> RetryPolicyStrategy:
print('error occurred', (datetime.now() - info.since).total_seconds(), 'seconds ago')
...
# 6.x
import time
def retry_policy(info: RetryInfo) -> RetryPolicyStrategy:
print('error occurred', time.monotonic() - info.since, 'seconds ago')
...