Integration Recipes¶
Plug Relier into the Python web framework you're already using. The reliability
guarantees are identical across all of them; the only things that change are
the dispatch method (apush for async, push for sync) and where you put the
exception handler.
TL;DR: import your task, call .apush(...) in async handlers or .push(...) in sync ones, catch AdmissionRejectedError, and convert it to HTTP 429. That's the entire integration.
FastAPI (async, primary integration)¶
Relier is designed for FastAPI. The producer-side code is naturally async, the trace context is propagated end-to-end, and admission rejections map cleanly onto FastAPI's exception handlers.
Project layout¶
my_app/
├── main.py # FastAPI app + routes
├── tasks.py # @rl_task definitions
└── .env # RELIER_REDIS_URL=...
tasks.py¶
from relier.tasks.decorator import rl_task


@rl_task(
    queue="default",
    idempotent=True,
    idempotency_ttl=3600,
    soft_timeout=25,
    hard_timeout=30,
)
async def send_invoice(invoice_id: str) -> dict:
    # Anything you'd do in a normal async function works here.
    # charge_card and email_invoice stand for your application's own coroutines.
    await charge_card(invoice_id)
    await email_invoice(invoice_id)
    return {"charged": True, "invoice_id": invoice_id}
main.py¶
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

from relier.core.exceptions import AdmissionRejectedError
from tasks import send_invoice

app = FastAPI()


@app.exception_handler(AdmissionRejectedError)
async def admission_handler(_: Request, exc: AdmissionRejectedError) -> JSONResponse:
    # Map cluster-at-capacity into a standard 429 with Retry-After.
    return JSONResponse(
        status_code=429,
        headers={"Retry-After": str(exc.retry_after)},
        content={"detail": "Service at capacity, retry later."},
    )


@app.post("/invoices/{invoice_id}/send")
async def dispatch_invoice(invoice_id: str) -> dict:
    await send_invoice.apush(invoice_id)
    return {"status": "queued"}
Running it¶
In three terminals (or via make dev for a Docker stack):
# 1. The web app
uvicorn main:app --reload
# 2. The Celery worker
celery -A relier.tasks.app worker -l info -Q high_priority,default,low_priority,re-queue
# 3. The Phoenix resurrector
rl run-resurrector
Trace continuity¶
When you await task.apush(...) from inside a request, Relier injects the
current OpenTelemetry trace context into the task envelope. The worker extracts
it and the task span becomes a child of your HTTP span. This works
automatically when RELIER_OTEL_ENABLED=true.
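The inject-then-extract flow can be pictured with a stdlib-only sketch. This is not Relier's or OpenTelemetry's code: the envelope layout and the helper names (`inject_trace`, `extract_trace`, `child_span`) are hypothetical. The only fixed detail is the W3C `traceparent` value format (`version-traceid-spanid-flags`), which OpenTelemetry's default propagator uses.

```python
import secrets
from typing import Optional


def new_traceparent() -> str:
    """Build a W3C traceparent value for the current (HTTP) span."""
    trace_id = secrets.token_hex(16)  # 32 hex characters
    span_id = secrets.token_hex(8)    # 16 hex characters
    return f"00-{trace_id}-{span_id}-01"


def inject_trace(envelope: dict, traceparent: str) -> dict:
    """Producer side: copy the current context into the envelope headers."""
    headers = dict(envelope.get("headers", {}))
    headers["traceparent"] = traceparent
    return {**envelope, "headers": headers}


def extract_trace(envelope: dict) -> Optional[dict]:
    """Worker side: recover the parent trace and span ids, if present."""
    value = envelope.get("headers", {}).get("traceparent")
    if value is None:
        return None
    parts = value.split("-")
    if len(parts) != 4:
        return None
    _version, trace_id, parent_span_id, _flags = parts
    return {"trace_id": trace_id, "parent_span_id": parent_span_id}


def child_span(parent: dict) -> dict:
    """Start the task span: same trace id, fresh span id, parent recorded."""
    return {
        "trace_id": parent["trace_id"],
        "span_id": secrets.token_hex(8),
        "parent_span_id": parent["parent_span_id"],
    }


# Request handler side: the HTTP span's context goes into the envelope.
http_span = new_traceparent()
envelope = inject_trace({"task": "send_invoice", "args": ["inv_1"]}, http_span)

# Worker side: the task span joins the same trace as a child.
parent = extract_trace(envelope)
task_span = child_span(parent)
```

The task span shares the HTTP span's trace id and records the HTTP span id as its parent, which is what lets a tracing backend draw the two as one continuous trace.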
Flask (sync)¶
Flask doesn't own an event loop, so use push. It bridges into asyncio
internally without making you await anything.
tasks.py¶
from relier.tasks.decorator import rl_task


@rl_task(idempotent=True)
async def send_invoice(invoice_id: str) -> dict:
    # The task itself is still async; Relier runs it on the worker's loop.
    result = await charge_card(invoice_id)
    return result
app.py¶
from flask import Flask, Response, jsonify

from relier.core.exceptions import AdmissionRejectedError
from tasks import send_invoice

app = Flask(__name__)


@app.errorhandler(AdmissionRejectedError)
def admission_handler(exc: AdmissionRejectedError) -> Response:
    # Map cluster-at-capacity into a standard 429 with Retry-After.
    response = jsonify(detail="Service at capacity, retry later.")
    response.status_code = 429
    response.headers["Retry-After"] = str(exc.retry_after)
    return response


@app.post("/invoices/<invoice_id>/send")
def dispatch_invoice(invoice_id: str) -> tuple:
    send_invoice.push(invoice_id)  # sync dispatch, no await
    return jsonify(status="queued"), 202
Why push and not apush¶
push is a thin sync wrapper around apush. Inside a Flask handler (a sync
function), there is no running event loop, so push calls
asyncio.run(apush(...)) for you. The dispatch finishes in a couple of
milliseconds.
If you're inside a Celery worker (whose persistent event loop is running),
push instead schedules the dispatch onto that loop via
run_coroutine_threadsafe. You don't have to think about which case applies;
push figures it out.
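The two cases described above can be sketched with the standard library alone. This is an illustration of the bridging pattern, not Relier's source: `_worker_loop` and the stand-in `apush` are hypothetical, and the sketch blocks on the result, which is only safe when the caller is on a different thread from the loop.

```python
import asyncio
import threading
from typing import Optional

# Hypothetical handle to a persistent loop; stays None in a plain web process.
_worker_loop: Optional[asyncio.AbstractEventLoop] = None


async def apush(payload: str) -> str:
    """Stand-in for the async dispatch path."""
    await asyncio.sleep(0)
    return f"queued:{payload}"


def push(payload: str) -> str:
    """Sync wrapper: choose the bridging strategy from the loop state."""
    loop = _worker_loop
    if loop is not None and loop.is_running():
        # A persistent loop is running in another thread: hand it the
        # coroutine and wait for the result from this thread.
        future = asyncio.run_coroutine_threadsafe(apush(payload), loop)
        return future.result(timeout=5)
    # No running loop: create one just for this call.
    return asyncio.run(apush(payload))


# Case 1: plain sync caller (a Flask handler, a script), no loop running.
first = push("inv_1")

# Case 2: a persistent loop running in a background thread, as in a worker.
_worker_loop = asyncio.new_event_loop()
started = threading.Event()
_worker_loop.call_soon(started.set)  # fires once run_forever() begins
thread = threading.Thread(target=_worker_loop.run_forever, daemon=True)
thread.start()
started.wait()
second = push("inv_2")

# Shut the demo loop down cleanly.
_worker_loop.call_soon_threadsafe(_worker_loop.stop)
thread.join()
_worker_loop.close()
```

Both calls return the same value to the caller; only the route the coroutine takes to an event loop differs.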
Django (sync views)¶
Same pattern as Flask. Use push in sync views; use apush in async def
views (Django running under ASGI).
Sync view¶
# views.py
from django.http import HttpRequest, JsonResponse
from django.views.decorators.http import require_POST

from relier.core.exceptions import AdmissionRejectedError
from .tasks import send_invoice


@require_POST
def dispatch_invoice(request: HttpRequest, invoice_id: str) -> JsonResponse:
    try:
        send_invoice.push(invoice_id)
    except AdmissionRejectedError as exc:
        response = JsonResponse(
            {"detail": "Service at capacity, retry later."},
            status=429,
        )
        response["Retry-After"] = str(exc.retry_after)
        return response
    return JsonResponse({"status": "queued"}, status=202)
Async view (Django 4.1+)¶
# views.py (same imports as the sync view)
async def dispatch_invoice(request: HttpRequest, invoice_id: str) -> JsonResponse:
    try:
        await send_invoice.apush(invoice_id)
    except AdmissionRejectedError as exc:
        response = JsonResponse(
            {"detail": "Service at capacity, retry later."},
            status=429,
        )
        response["Retry-After"] = str(exc.retry_after)
        return response
    return JsonResponse({"status": "queued"}, status=202)
Django management commands¶
Management commands are sync entry points. Use push:
# management/commands/replay_invoices.py
from typing import Any

from django.core.management.base import BaseCommand

from myapp.tasks import send_invoice


class Command(BaseCommand):
    def handle(self, *args: Any, **opts: Any) -> None:
        # queryset is a placeholder for whichever invoices you want to replay.
        for invoice in queryset:
            send_invoice.push(invoice.id)
        self.stdout.write(f"Queued {queryset.count()} invoices.")
Starlette / async frameworks generally¶
Anything that runs on an asyncio event loop uses apush. The recipe is
identical to FastAPI's:
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route

from relier.core.exceptions import AdmissionRejectedError
from tasks import send_invoice


async def dispatch_invoice(request: Request) -> JSONResponse:
    invoice_id = request.path_params["invoice_id"]
    try:
        await send_invoice.apush(invoice_id)
    except AdmissionRejectedError as exc:
        return JSONResponse(
            {"detail": "at capacity"},
            status_code=429,
            headers={"Retry-After": str(exc.retry_after)},
        )
    return JSONResponse({"status": "queued"})


app = Starlette(routes=[
    Route("/invoices/{invoice_id}/send", dispatch_invoice, methods=["POST"]),
])
Scripts, cron jobs, CLI tools¶
Anywhere you can run a Python interpreter, you can dispatch a Relier task. Use
push for sync, asyncio.run(apush(...)) for async-style scripts.
# replay.py, sync script
from tasks import send_invoice

for invoice_id in load_invoice_ids():
    send_invoice.push(invoice_id)

# replay.py, async script
import asyncio

from tasks import send_invoice


async def main() -> None:
    for invoice_id in load_invoice_ids():
        await send_invoice.apush(invoice_id)


asyncio.run(main())
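A script has no HTTP client to hand a 429 to, so a bulk replay may want to wait and retry when admission is rejected. The following is a minimal sketch under stated assumptions: `push_with_backoff` is a hypothetical helper, the exception class is a local stand-in for `relier.core.exceptions.AdmissionRejectedError`, and `retry_after` is assumed to be a number of seconds, as its use in the Retry-After header suggests.

```python
import time
from typing import Callable


class AdmissionRejectedError(Exception):
    """Local stand-in for relier.core.exceptions.AdmissionRejectedError."""

    def __init__(self, retry_after: float) -> None:
        super().__init__(f"retry after {retry_after}s")
        self.retry_after = retry_after


def push_with_backoff(
    push: Callable[[str], object],
    invoice_id: str,
    max_attempts: int = 5,
    sleep: Callable[[float], object] = time.sleep,
) -> int:
    """Call push, waiting retry_after between rejections.

    Returns the number of attempts used; re-raises the last rejection
    once max_attempts is exhausted.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    attempt = 0
    while True:
        attempt += 1
        try:
            push(invoice_id)
            return attempt
        except AdmissionRejectedError as exc:
            if attempt >= max_attempts:
                raise
            sleep(exc.retry_after)


# Demo with a fake push that is rejected twice before succeeding.
pending_rejections = [0.01, 0.01]


def flaky_push(invoice_id: str) -> None:
    if pending_rejections:
        raise AdmissionRejectedError(pending_rejections.pop(0))


slept: list = []
attempts = push_with_backoff(flaky_push, "inv_1", sleep=slept.append)
```

In a real script you would pass `send_invoice.push` as the first argument and import the exception from Relier instead of defining it locally.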
Dispatching from inside a task¶
Sometimes a task needs to enqueue another task. Both apush and push work
from inside a Celery worker. push is convenient because the task body is
often easier to read without await on dispatch:
@rl_task()
async def import_user(user_id: str) -> None:
    profile = await fetch_profile(user_id)
    await store(profile)
    # Fan out follow-up work, fire-and-forget.
    send_welcome_email.push(user_id)  # sync convenience
    await regenerate_avatar.apush(user_id)  # equivalent
Inside the worker, both call paths schedule onto the worker's persistent event loop. There is no risk of nested-loop trouble.
What about .delay() and .apply_async()?¶
@rl_task is built on Celery's shared_task, so these methods technically
work, but they bypass Relier's admission control and skip the signed envelope.
Tasks dispatched via .delay() enter the queue as plain Celery payloads and
the worker treats them as legacy unsigned messages.
The only intentional caller of .delay() in this codebase is rl bench,
which uses it as the baseline for comparing dispatch overhead.
In application code, always use apush or push.
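To make the difference between an enveloped and a plain payload concrete, here is a hedged sketch of how a worker might tell them apart. Relier's actual envelope format and signing scheme are not described in this page; the HMAC-SHA256 scheme, the `SECRET` value, and the helper names below are assumptions chosen for illustration.

```python
import hashlib
import hmac
import json

SECRET = b"demo-secret"  # hypothetical shared key, for illustration only


def _canonical(body: object) -> bytes:
    """Serialize deterministically so both sides sign the same bytes."""
    return json.dumps(body, sort_keys=True, separators=(",", ":")).encode()


def sign_envelope(body: dict, secret: bytes = SECRET) -> dict:
    """Producer side (the push/apush path in this sketch)."""
    signature = hmac.new(secret, _canonical(body), hashlib.sha256).hexdigest()
    return {"body": body, "signature": signature}


def classify(message: dict, secret: bytes = SECRET) -> str:
    """Worker side: return 'signed', 'legacy-unsigned', or 'tampered'."""
    if "signature" not in message:
        # Plain payload, e.g. one that arrived via .delay().
        return "legacy-unsigned"
    expected = hmac.new(
        secret, _canonical(message.get("body")), hashlib.sha256
    ).hexdigest()
    if hmac.compare_digest(expected, message["signature"]):
        return "signed"
    return "tampered"


via_push = sign_envelope({"task": "send_invoice", "args": ["inv_1"]})
via_delay = {"task": "send_invoice", "args": ["inv_1"]}
altered = {**via_push, "body": {"task": "send_invoice", "args": ["inv_2"]}}

results = [classify(via_push), classify(via_delay), classify(altered)]
```

Here `results` is `["signed", "legacy-unsigned", "tampered"]`: the plain payload carries no signature at all, so the worker can only fall back to treating it as a legacy message.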
Quickly checking it works¶
After wiring an integration, three commands tell you the reliability stack is live:
# 1. Connectivity
rl doctor
# 2. Config sanity
rl config validate
# 3. End-to-end: dispatch from your app, then watch the task run
rl tasks inflight --follow
If the task appears in rl tasks inflight and disappears when it completes,
the integration works. If it goes into rl dlq list instead, see
Troubleshooting.