# retryable.py
# Simple wrapper for retrying an async function a few times.
import logging
from typing import Callable, Awaitable, Any
async def retryable(func: Callable[[], Awaitable[Any]], *, name: str, retries: int = 3) -> Any:
    """Await ``func()``, retrying until it succeeds or the attempts run out.

    Each attempt is announced on stdout. A failed attempt is reported on
    stdout and logged with its traceback; the exception itself is swallowed
    so the next attempt can run.

    Parameters:
        func: An async callable that takes no arguments.
        name: Label used in the printed and logged messages to identify
            the operation.
        retries: The maximum number of attempts (default is 3). With a
            value below 1 ``func`` is never called.

    Returns:
        The result of the first successful ``func()`` call, or ``None`` if
        every attempt raised (or no attempt was made). Note that a
        legitimate ``None`` result is indistinguishable from failure.

    Only ``Exception`` subclasses trigger a retry; ``BaseException``-only
    errors such as ``KeyboardInterrupt`` propagate to the caller.
    """
    # Look the logger up locally so the failure path does not depend on a
    # module-level ``logger`` global being defined.
    log = logging.getLogger(__name__)

    for attempt in range(1, retries + 1):
        print(f"Running {name} (attempt {attempt})")
        try:
            # Keep the try body minimal: only the awaited call is retried.
            result = await func()
        except Exception as e:
            print(f"Attempt {attempt} failed for {name}: {e}")
            log.exception("Error in %s on attempt %s", name, attempt)
            if attempt == retries:
                print(f"Failed {name} after {retries} attempts")
        else:
            # Success path: stop retrying and hand back the result.
            print(f"Completed {name} on attempt {attempt}")
            return result

    # All attempts failed, or retries < 1 so none were made.
    return None