API Reference
Relier exposes one decorator (@rl_task) and a handful of supporting types.
Everything else is internal.
The library is async-first: the orchestration layer is built on asyncio,
and every Relier worker process runs a persistent event loop that lives for the
worker's lifetime. Your tasks can be `async def` or plain `def`; Relier
bridges them either way. The producer-side API (`apush` / `push`) is the same
shape regardless of which web framework you dispatch from.
Built for FastAPI, works with anything
Relier is designed for asyncio-native web frameworks (FastAPI, Starlette, async Django), but `push` exists specifically so that sync code paths (Flask, classic Django views, CLIs, management commands, scripts running inside a Celery task) can dispatch reliably without owning an event loop. The reliability guarantees are identical whichever of `apush` or `push` you call.
@rl_task
The main decorator. Wraps any async (or sync) function with Relier's full reliability stack.
```python
from relier.tasks.decorator import rl_task

@rl_task(
    queue="default",
    idempotent=False,
    idempotency_ttl=3600,
    soft_timeout=None,
    hard_timeout=None,
    on_soft_timeout=None,
    name=None,  # optional stable override, see below
)
async def my_task(arg1: str, arg2: int = 0) -> dict:
    ...
```
Parameters
queue
Type: str Default: "default"
The Celery queue this task is routed to. Three public queues are pre-configured:
| Queue | Intended use |
|---|---|
| `"high_priority"` | User-facing, latency-sensitive tasks |
| `"default"` | Standard background work |
| `"low_priority"` | Batch jobs, non-urgent processing |
There is also an internal queue, `re-queue`, that the Phoenix resurrector
uses to inject resurrected tasks. Decorating a task with `queue="re-queue"`
raises `ValueError`: only Relier itself may publish there. The internal
queue exists so you can dedicate a worker pool to recovery traffic (see
the `worker-recovery` service in `docker-compose.yml`) without sharing capacity
with normal work.
Workers consume queues based on their -Q flag:
```bash
# Consume normal traffic only
celery -A relier.tasks.app worker -Q high_priority,default,low_priority

# Dedicate this pool to Phoenix recovery
celery -A relier.tasks.app worker -Q re-queue
```
idempotent
Type: bool Default: False
When True, Relier atomically prevents duplicate execution. If the same task
with the same arguments is submitted while one is already running, or has
already completed, the second submission either retries (in-flight) or
returns the cached result (completed) without re-executing.
The idempotency key is derived by hashing the task name + arguments. For
non-deterministic arguments (random IDs, timestamps, large blobs), use
idempotency_lock for explicit control over the key.
```python
@rl_task(idempotent=True, idempotency_ttl=3600)
async def send_invoice(invoice_id: str) -> dict:
    ...
```
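The text only says the key is a hash of the task name plus arguments. A minimal sketch of one deterministic way to build such a key, assuming SHA-256 over canonical JSON (`derive_idempotency_key` is hypothetical, not Relier's actual implementation), shows why argument order in kwargs does not matter while any changed value does:

```python
import hashlib
import json
from typing import Any


def derive_idempotency_key(task_name: str, args: tuple, kwargs: dict[str, Any]) -> str:
    """Hypothetical sketch: hash the task name plus canonicalised arguments.

    sort_keys=True makes kwargs order irrelevant, so f(a=1, b=2) and
    f(b=2, a=1) produce the same key.
    """
    canonical = json.dumps(
        {"task": task_name, "args": list(args), "kwargs": kwargs},
        sort_keys=True,
        separators=(",", ":"),
        default=str,  # fall back to str() for non-JSON types such as UUIDs
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


a = derive_idempotency_key("myapp.send_invoice", ("inv-42",), {"region": "eu", "retry": False})
b = derive_idempotency_key("myapp.send_invoice", ("inv-42",), {"retry": False, "region": "eu"})
c = derive_idempotency_key("myapp.send_invoice", ("inv-43",), {"region": "eu", "retry": False})
print(a == b, a == c)  # True False
```

Any argument that differs between deliveries (a timestamp, a random request ID) changes the hash, which is exactly the case `idempotency_lock` is meant for.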
idempotency_ttl
Type: int (seconds) Default: 3600
How long to cache the result of a completed idempotent task. After this TTL expires, the same arguments can trigger a fresh execution.
Constraint: idempotency_ttl must be greater than RELIER_IDEMPOTENCY_INFLIGHT_TTL
(default 120 s), which itself must be greater than hard_timeout. Relier
validates this at decoration time and raises ValueError if violated. The
reason: if the in-flight sentinel expires before the task hard-times-out,
another worker could claim the same key and run a duplicate.
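Taken together with the `soft_timeout` rule below, the decoration-time checks form a strict chain: `soft_timeout < hard_timeout < RELIER_IDEMPOTENCY_INFLIGHT_TTL < idempotency_ttl`. A minimal sketch of that ordering check, assuming a standalone helper (`validate_timing` is hypothetical; Relier performs the equivalent validation inside the decorator and its messages may differ):

```python
from typing import Optional


def validate_timing(
    idempotency_ttl: int,
    inflight_ttl: int = 120,
    hard_timeout: Optional[int] = None,
    soft_timeout: Optional[int] = None,
) -> None:
    """Hypothetical sketch of the ordering rules, raising ValueError on violation.

    Required chain: soft_timeout < hard_timeout < inflight_ttl < idempotency_ttl.
    """
    if soft_timeout is not None:
        if hard_timeout is None:
            raise ValueError("soft_timeout requires hard_timeout")
        if soft_timeout >= hard_timeout:
            raise ValueError("hard_timeout must be strictly greater than soft_timeout")
    if hard_timeout is not None and inflight_ttl <= hard_timeout:
        # The in-flight sentinel would expire while the task may still be running,
        # letting a second worker claim the same key.
        raise ValueError("RELIER_IDEMPOTENCY_INFLIGHT_TTL must exceed hard_timeout")
    if idempotency_ttl <= inflight_ttl:
        raise ValueError("idempotency_ttl must exceed RELIER_IDEMPOTENCY_INFLIGHT_TTL")


validate_timing(idempotency_ttl=3600, hard_timeout=30, soft_timeout=25)  # passes
try:
    validate_timing(idempotency_ttl=3600, hard_timeout=300)  # 120 <= 300
except ValueError as exc:
    print(exc)  # RELIER_IDEMPOTENCY_INFLIGHT_TTL must exceed hard_timeout
```

With the chain as stated, the default 120 s in-flight TTL caps `hard_timeout` below 120 s unless you raise `RELIER_IDEMPOTENCY_INFLIGHT_TTL`.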
soft_timeout
Type: int | None (seconds) Default: None
After this many seconds, the `on_soft_timeout` hook is called. The task is
not cancelled: it keeps running until either it completes or
`hard_timeout` fires. Use the soft window to checkpoint progress and release
locks.
If soft_timeout is set, hard_timeout must also be set and must be strictly
greater. Relier validates this at decoration time.
```python
@rl_task(soft_timeout=25, hard_timeout=30, on_soft_timeout=save_state)
async def long_running_job(job_id: str) -> dict:
    ...
```
hard_timeout
Type: int | None (seconds) Default: None
After this many seconds, the task coroutine is cancelled unconditionally via
asyncio cancellation. The task is quarantined to the DLQ with
`reason=TimeoutError`.
If only `hard_timeout` is set (no `soft_timeout`), there is no cleanup window:
the task is cancelled as soon as the limit expires.
Always set hard_timeout: the default is not "no timeout"
When `hard_timeout=None`, Relier's async bridge applies a 300-second
fallback timeout on the thread-to-loop handoff. After 300 s the bridge
raises `TimeoutError` and Celery marks the task failed, but the underlying
coroutine is only asked to cancel: it stops at its next `await`
checkpoint, not immediately.
Always set hard_timeout to match your task's expected worst-case
duration plus a safety margin. This replaces the bridge fallback with
asyncio's own cancellation machinery, which fires cleanly inside the
event loop.
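The difference between the bridge fallback and a real cancellation can be reproduced with the standard library. A minimal sketch, assuming the bridge is built on `asyncio.run_coroutine_threadsafe` (the background loop thread, `bridge` and `slow_task` are hypothetical stand-ins, and the 300 s fallback is shortened to 0.2 s): the caller's wait times out first, and `future.cancel()` only requests cancellation, which the loop later delivers at the coroutine's pending `await`.

```python
import asyncio
import concurrent.futures
import threading
import time

# A persistent event loop on a background thread, standing in for the worker loop.
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

events: list[str] = []


async def slow_task() -> str:
    try:
        await asyncio.sleep(10)  # cancellation can only be delivered at an await
        return "done"
    except asyncio.CancelledError:
        events.append("cancelled at await")
        raise


def bridge(coro, timeout: float):
    """Hand a coroutine to the loop thread and wait for it with a fallback timeout."""
    future = asyncio.run_coroutine_threadsafe(coro, loop)
    try:
        return future.result(timeout=timeout)
    except concurrent.futures.TimeoutError:
        future.cancel()  # only *requests* cancellation; the loop delivers it later
        raise


try:
    bridge(slow_task(), timeout=0.2)  # the fallback described above is 300 s
except concurrent.futures.TimeoutError:
    events.append("caller timed out")

time.sleep(0.2)  # give the loop time to deliver the cancellation
loop.call_soon_threadsafe(loop.stop)
print(sorted(events))  # ['caller timed out', 'cancelled at await']
```

A coroutine that runs a long stretch of synchronous code between awaits would keep running until it reaches its next `await`, which is why an explicit `hard_timeout` is the cleaner path.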
Timeouts only apply to async tasks
The two-tier timeout machinery uses asyncio cancellation. Decorating a
plain def function with soft_timeout= or hard_timeout= raises
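`ValueError` at decoration time. If you need timeouts on sync code, refactor
it to `async def` and wrap the blocking calls inside it with `asyncio.to_thread()`. Cancellation then stops the awaiting coroutine but not the thread: the blocking call keeps running in the background until it returns.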
on_soft_timeout
Type: Callable[[TaskContext], Awaitable[None]] | None Default: None
An async function called when the soft timeout fires. Receives the same
TaskContext instance the task body is using, so any
ctx.metadata updates made by the task body before the soft threshold fires
are visible in the hook. The hook has `hard_timeout - soft_timeout` seconds to
complete before the hard timeout fires.
```python
from relier.tasks.context import TaskContext

async def checkpoint_progress(ctx: TaskContext) -> None:
    # ctx.metadata is live: it reflects what the task body wrote before the
    # soft timeout fired.
    last = ctx.metadata.get("last_cursor")
    if last is not None:
        await ctx.set_partial({"last_cursor": last})
    # Release any external locks (release_lock stands for your own helper).
    await release_lock(ctx.args[0])

@rl_task(soft_timeout=25, hard_timeout=30, on_soft_timeout=checkpoint_progress)
async def process_large_batch(batch_id: str) -> dict:
    ...
```
The hook runs on the same event loop as the task, so it must be async def
and non-blocking.
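The two-tier mechanism can be modelled with plain asyncio. A minimal sketch under simplifying assumptions (`SimpleContext`, `run_with_timeouts`, `slow_body` and `save_checkpoint` are hypothetical, not Relier's internals): the body keeps running past the soft limit, the hook receives the same context object and therefore sees live `metadata`, and `asyncio.wait_for` cancels the body at the hard limit.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Optional


@dataclass
class SimpleContext:
    """Hypothetical stand-in for TaskContext: only the fields this demo needs."""
    metadata: dict[str, Any] = field(default_factory=dict)
    checkpoint: Optional[dict[str, Any]] = None


Hook = Callable[[SimpleContext], Awaitable[None]]


async def run_with_timeouts(body: Callable[[SimpleContext], Awaitable[Any]],
                            ctx: SimpleContext, soft: float, hard: float,
                            on_soft_timeout: Optional[Hook] = None) -> Any:
    async def soft_timer() -> None:
        await asyncio.sleep(soft)
        if on_soft_timeout is not None:
            await on_soft_timeout(ctx)  # same ctx object, so it sees live metadata

    timer = asyncio.create_task(soft_timer())  # the body is NOT cancelled at `soft`
    try:
        # wait_for cancels the body and raises TimeoutError once `hard` elapses.
        return await asyncio.wait_for(body(ctx), timeout=hard)
    finally:
        timer.cancel()  # body finished (or was cancelled): drop a pending hook


async def slow_body(ctx: SimpleContext) -> str:
    for i in range(100):                 # at least 1 s of work, longer than `hard`
        ctx.metadata["last_cursor"] = i  # in-memory progress only
        await asyncio.sleep(0.01)
    return "done"


async def save_checkpoint(ctx: SimpleContext) -> None:
    ctx.checkpoint = {"last_cursor": ctx.metadata.get("last_cursor")}


ctx = SimpleContext()
try:
    asyncio.run(run_with_timeouts(slow_body, ctx, soft=0.2, hard=0.4,
                                  on_soft_timeout=save_checkpoint))
except asyncio.TimeoutError:
    print("hard timeout; checkpoint saved by the soft hook:", ctx.checkpoint)
```

If the hook itself overruns `hard - soft`, the `finally` block cancels it along with the body, mirroring the hook's limited window described above.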
name
Type: str | None Default: None
An explicit, stable task name that overrides the auto-derived
`"{module}.{function_name}"` default.
When `name` is not set, the task is registered under its dotted import path, such as `myapp.tasks.process_order`. This works fine until you rename the function or move
it to a different module, at which point the task name changes silently,
breaking:
- `SchemaRegistry.register_migration("old.name", ...)` registrations
- Phoenix heartbeat keys for in-flight tasks
- Any external reference to the task by name (DLQ entries, monitoring dashboards)
Set name= to a stable identifier that won't change with refactoring:
```python
@rl_task(name="myapp.process_order", queue="default")
async def process_order(order_id: str, region: str = "global") -> dict:
    ...
```
The `name` value must also be used in `register_migration`:
```python
@SchemaRegistry.register_migration("myapp.process_order", from_version=1)
def migrate_v1_to_v2(args: tuple, kwargs: dict) -> tuple[tuple, dict]:
    kwargs.setdefault("region", "global")
    return args, kwargs
```
Convention
Use "<app_name>.<task_function_name>" — it's stable, human-readable, and unique within a project. Avoid module paths (myapp.tasks.payments.process_order) since those encode your directory structure and force renames when you reorganise.
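Why a refactor silently renames a task: the default name is computed from where the function currently lives. A minimal sketch, assuming the default is formatted exactly as `"{module}.{function_name}"` (`task_name_for` is a hypothetical helper, not part of Relier), with the module move simulated by reassigning `__module__`:

```python
from typing import Callable, Optional


def task_name_for(fn: Callable, name: Optional[str] = None) -> str:
    """Return the explicit name if given, else "{module}.{function_name}"."""
    if name is not None:
        return name
    return f"{fn.__module__}.{fn.__name__}"


def process_order(order_id: str) -> dict:
    return {"order_id": order_id}


# Simulate moving the function to another module during a refactor.
process_order.__module__ = "myapp.tasks"
print(task_name_for(process_order))                              # myapp.tasks.process_order
process_order.__module__ = "myapp.tasks.payments"
print(task_name_for(process_order))                              # myapp.tasks.payments.process_order
print(task_name_for(process_order, name="myapp.process_order"))  # myapp.process_order
```

Only the last call is immune to the move, which is the whole point of pinning `name=`.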
Dispatch methods (apush, push, delay)
Every function decorated with @rl_task is bound to a per-task Celery Task
subclass that adds two Relier-aware dispatch methods on top of Celery's native
.delay() / .apply_async().
```python
await my_task.apush(*args, **kwargs)  # async, preferred
my_task.push(*args, **kwargs)         # sync wrapper over apush
my_task.delay(*args, **kwargs)        # raw Celery dispatch, do not use
```
The distinction matters. apush and push are the only dispatch paths that
run admission control, wrap the payload in Relier's signed envelope, and inject
OpenTelemetry context. Choose based on whether the caller is async or sync.
await task.apush(*args, **kwargs) → AsyncResult
Use this in FastAPI, Starlette, async Django, or any other asyncio code.
```python
# FastAPI route
@app.post("/invoices/{invoice_id}/send")
async def dispatch_invoice(invoice_id: str):
    await send_invoice.apush(invoice_id)
    return {"status": "queued"}
```
What `apush` does, in order:
- Admission check: atomic Lua `INCR` against `rl:admission:celery-dispatch`. Raises `AdmissionRejectedError` (with `retry_after`) if the cluster window is full.
- Envelope wrap: generates a UUID4 `task_id` and builds the schema envelope (`schema_version`, `task_id`, `payload`, `sha256` checksum, `enqueued_at`).
- OTel context injection: the current trace context goes into `envelope._otel_context` so the worker can continue the distributed trace.
- Dispatch: `celery_app.send_task(name, args=(envelope,), queue=queue, task_id=task_id)`. Returns the Celery `AsyncResult`.
It does not wait for the task to finish: like Celery's own `.delay()`, it is
fire-and-forget.
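The envelope-wrap step and its worker-side counterpart can be pictured as follows. A minimal sketch, assuming the checksum is SHA-256 over canonical JSON of the payload and the fields sit in a flat dict (`build_envelope`, `verify_envelope` and `_checksum` are hypothetical; Relier's real envelope layout may differ and also carries `_otel_context`):

```python
import hashlib
import json
import time
import uuid
from typing import Any


def _checksum(payload: Any) -> str:
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def build_envelope(args: tuple, kwargs: dict, schema_version: int = 1) -> dict:
    """Producer side: wrap the call arguments with identity and integrity data."""
    payload = {"args": list(args), "kwargs": kwargs}
    return {
        "schema_version": schema_version,
        "task_id": str(uuid.uuid4()),
        "payload": payload,
        "sha256": _checksum(payload),
        "enqueued_at": time.time(),
    }


def verify_envelope(envelope: dict) -> dict:
    """Worker side: refuse the payload if it changed after it was wrapped."""
    if _checksum(envelope["payload"]) != envelope["sha256"]:
        raise ValueError("checksum mismatch")  # Relier surfaces this as PayloadIntegrityError
    return envelope["payload"]


envelope = build_envelope(("inv-42",), {"region": "eu"})
print(verify_envelope(envelope))  # {'args': ['inv-42'], 'kwargs': {'region': 'eu'}}

envelope["payload"]["kwargs"]["region"] = "us"  # simulate corruption in transit
try:
    verify_envelope(envelope)
except ValueError as exc:
    print(exc)  # checksum mismatch
```

Because `schema_version` travels with the payload, the worker can also decide whether migrations must run before unpacking the arguments.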
Calling apush without await silently does nothing
apush returns a coroutine. If you call it without await, the coroutine
is created but never executed — no admission check runs, no envelope is
built, and no task reaches the broker. The task is silently dropped.
Python will eventually emit RuntimeWarning: coroutine 'apush' was never awaited
when the garbage collector cleans up, but by then the dispatch window has passed.
```python
# Wrong: the task is never dispatched
send_invoice.apush(invoice_id)

# Correct
await send_invoice.apush(invoice_id)
```
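The failure mode is ordinary Python coroutine semantics and can be seen with any coroutine function. A minimal sketch with a hypothetical stand-in for `apush` (`fake_apush` just records each dispatch): the un-awaited call creates a coroutine object whose body never runs.

```python
import asyncio

dispatched: list[str] = []


async def fake_apush(invoice_id: str) -> None:
    """Hypothetical stand-in for send_invoice.apush: records the dispatch."""
    dispatched.append(invoice_id)


async def handler() -> None:
    fake_apush("inv-1")        # wrong: coroutine object created, body never runs
    await fake_apush("inv-2")  # correct: body runs


asyncio.run(handler())
print(dispatched)  # ['inv-2']
```

Running this also prints Python's `RuntimeWarning: coroutine 'fake_apush' was never awaited` to stderr, the same late signal described above.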
If you are in a sync context (script, Django view, Flask route), use `push`
instead; it handles the event-loop bridging for you.
task.push(*args, **kwargs) → AsyncResult
Use this in Flask, classic (sync) Django, management commands, scripts, or inside a Celery task that wants to enqueue another task.
```python
# Flask route
@app.route("/invoices/<invoice_id>/send", methods=["POST"])
def dispatch_invoice(invoice_id):
    send_invoice.push(invoice_id)
    return {"status": "queued"}, 202

# Django view (sync)
def dispatch_invoice(request, invoice_id):
    send_invoice.push(invoice_id)
    return JsonResponse({"status": "queued"}, status=202)
```
`push` is a thin synchronous wrapper around `apush`:
- If called inside a Celery worker (whose persistent event loop is already running), `push` schedules the coroutine via `asyncio.run_coroutine_threadsafe(self.apush(...), worker_loop)` and waits up to 5 s for the dispatch to be acknowledged by the broker.
- If called outside Celery (e.g., from a Flask or Django request handler), `push` falls back to `asyncio.run(self.apush(...))`.

Either way, the reliability semantics are identical to `apush`. The only
difference is that you don't have to `await`.
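The two branches of `push` can be sketched with the standard library. This illustrates the technique under stated assumptions rather than reproducing Relier's code: `push_sync` and `fake_apush` are hypothetical, and the worker's persistent loop is simulated by a loop running on a background thread.

```python
import asyncio
import threading
from typing import Any, Callable, Coroutine, Optional


def push_sync(
    make_coro: Callable[[], Coroutine[Any, Any, Any]],
    worker_loop: Optional[asyncio.AbstractEventLoop] = None,
    timeout: float = 5.0,
) -> Any:
    """Run an async dispatch from synchronous code (hypothetical sketch).

    - Worker case: a persistent loop is already running in another thread, so
      schedule the coroutine there and block for at most `timeout` seconds.
    - Plain sync case: no loop to borrow, so run a temporary one with asyncio.run().
    """
    if worker_loop is not None and worker_loop.is_running():
        future = asyncio.run_coroutine_threadsafe(make_coro(), worker_loop)
        return future.result(timeout=timeout)
    return asyncio.run(make_coro())


async def fake_apush(task_id: str) -> str:
    await asyncio.sleep(0)  # stand-in for admission check + broker publish
    return f"queued:{task_id}"


# Outside Celery (e.g. a Flask view): no running loop, so asyncio.run() is used.
print(push_sync(lambda: fake_apush("a")))  # queued:a

# Inside a worker: simulate the persistent loop on a background thread.
loop = asyncio.new_event_loop()
ready = threading.Event()
loop.call_soon(ready.set)  # runs once run_forever() has started
threading.Thread(target=loop.run_forever, daemon=True).start()
ready.wait()
print(push_sync(lambda: fake_apush("b"), worker_loop=loop))  # queued:b
loop.call_soon_threadsafe(loop.stop)
```

Blocking on `future.result()` from the loop's own thread would deadlock, so the first branch only works when the synchronous caller runs on a different thread from the loop it schedules onto.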
task.delay(*args, **kwargs) / task.apply_async(...): do not use
These are Celery's native dispatch interfaces. They are available because
`@rl_task` is, underneath, a Celery `shared_task`. They bypass Relier's
admission control and schema envelope, so:
- Admission control does not reject dispatches made via `.delay()`, even when the cluster is overloaded.
- The worker won't see a signed envelope, so checksum verification is skipped and schema migration cannot run. Tasks enqueued by `.delay()` are processed as legacy payloads.
- The OTel producer span is not injected.
There is one place where .delay() is correct: the rl bench command uses it
as the baseline in a like-for-like comparison against .apush(). That is the
only intended user of .delay(). In application code, always use apush or
push.
| If you are here… | Use this |
|---|---|
| Async route or coroutine | `await task.apush(...)` |
| Sync route, script, management command | `task.push(...)` |
| Existing Celery code you're migrating | Replace `.delay()` with `.push()` |
| `.delay()` in new code | Don't. Use `push`. |
Handling AdmissionRejectedError
```python
from fastapi import HTTPException
from relier.core.exceptions import AdmissionRejectedError

try:
    await my_task.apush(payload)
except AdmissionRejectedError as exc:
    raise HTTPException(
        status_code=429,
        headers={"Retry-After": str(exc.retry_after)},
        detail="Service at capacity.",
    )
```
`exc.retry_after` is the number of seconds until the admission window resets;
pass it to the client in the standard HTTP `Retry-After` header.
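The admission check that produces `retry_after` is described earlier as an atomic Lua `INCR` against a shared Redis key. A minimal in-process sketch of the same fixed-window idea, assuming a per-window limit and window length (`FixedWindowAdmission`, `AdmissionRejected`, `limit` and `window_s` are hypothetical; Relier keeps the counter in Redis so the limit applies cluster-wide), shows where the number comes from:

```python
import math
import time
from typing import Callable, Optional


class AdmissionRejected(Exception):
    def __init__(self, retry_after: int) -> None:
        super().__init__(f"at capacity, retry after {retry_after}s")
        self.retry_after = retry_after


class FixedWindowAdmission:
    """Allow at most `limit` dispatches per `window_s`-second window."""

    def __init__(self, limit: int, window_s: int,
                 clock: Callable[[], float] = time.time) -> None:
        self.limit = limit
        self.window_s = window_s
        self.clock = clock
        self._window_start: Optional[float] = None
        self._count = 0

    def admit(self) -> None:
        now = self.clock()
        window_start = now - (now % self.window_s)
        if window_start != self._window_start:  # new window: reset the counter
            self._window_start = window_start
            self._count = 0
        self._count += 1  # in Relier this increment happens atomically in Lua
        if self._count > self.limit:
            # Seconds left until the current window ends, rounded up.
            raise AdmissionRejected(math.ceil(window_start + self.window_s - now))


now = [103.5]  # fake clock so the example is deterministic
gate = FixedWindowAdmission(limit=2, window_s=10, clock=lambda: now[0])
gate.admit()
gate.admit()
try:
    gate.admit()
except AdmissionRejected as exc:
    print(exc.retry_after)  # 7
```

Rounding up keeps clients from retrying a fraction of a second before the window actually resets.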
TaskContext
A dataclass passed to cleanup hooks and accessible from inside any @rl_task
function. Use it to read task identity, save checkpoints, or read a checkpoint
that a previous incarnation wrote.
Accessing the context inside a task
Two ways. Pick whichever fits the call site:
```python
# 1. Inject via kwarg: Relier detects the `ctx` parameter and fills it in.
from relier.tasks.context import TaskContext

@rl_task()
async def my_task(item_id: str, ctx: TaskContext = None) -> dict:
    resume_at = (ctx.partial_result or {}).get("cursor", 0)
    ctx.metadata["last_id"] = item_id
    return {"id": item_id}

# 2. Read from the contextvar proxy: no parameter needed.
from relier.tasks.context import task_context

@rl_task()
async def my_task(item_id: str) -> dict:
    resume_at = (task_context.partial_result or {}).get("cursor", 0)
    task_context.metadata["last_id"] = item_id
    return {"id": item_id}
```
When to use the proxy (task_context)
The proxy is most useful when you need to reach the context from a helper function that the task calls, without threading ctx through every intermediate signature:
```python
from relier.tasks.context import task_context

# handle, fetch_pages and checkpoint_on_timeout stand for your own functions.

async def save_progress(cursor: str) -> None:
    """Called from deep inside the task; no ctx parameter needed."""
    task_context.metadata["cursor"] = cursor
    await task_context.set_partial({"cursor": cursor})

async def process_page(items: list) -> None:
    for item in items:
        await handle(item)
    await save_progress(items[-1]["id"])

@rl_task(soft_timeout=55, hard_timeout=60, on_soft_timeout=checkpoint_on_timeout)
async def process_feed(feed_id: str) -> dict:
    async for page in fetch_pages(feed_id):
        await process_page(page)
    return {"status": "done"}
```
Without the proxy, save_progress and process_page would both need ctx as a parameter even though they don't logically own it. The proxy keeps helper functions clean.
The proxy is bound per task execution via Python's contextvars — each concurrent task has its own isolated view. Reads in one task never bleed into another.
The proxy raises LookupError if accessed outside a running @rl_task function, which catches accidental reads in tests or scripts early.
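The isolation and the `LookupError` both come straight from Python's `contextvars`. A minimal sketch, assuming a simplified context type (`Ctx`, `_current`, `TaskContextProxy` and `run_task` are hypothetical; Relier's real proxy exposes the full `TaskContext`): each asyncio task runs in a copy of the current context, so a value bound inside one task is invisible to its siblings, and reading with nothing bound raises `LookupError`.

```python
import asyncio
import contextvars
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Ctx:
    task_id: str
    metadata: dict[str, Any] = field(default_factory=dict)


_current: contextvars.ContextVar[Ctx] = contextvars.ContextVar("current_task_ctx")


class TaskContextProxy:
    """Forward attribute access to whichever Ctx is bound in the current context."""

    def __getattr__(self, name: str) -> Any:
        return getattr(_current.get(), name)  # ContextVar.get() raises LookupError if unset


task_context = TaskContextProxy()


async def save_progress(cursor: int) -> None:
    # A helper deep in the call stack: no ctx parameter needed.
    task_context.metadata["cursor"] = cursor


async def run_task(task_id: str, n: int) -> dict[str, Any]:
    _current.set(Ctx(task_id))  # bound only inside this task's context copy
    for i in range(n):
        await save_progress(i)
        await asyncio.sleep(0)  # let the other task interleave
    return {"task_id": task_context.task_id, "cursor": task_context.metadata["cursor"]}


async def main() -> list[dict[str, Any]]:
    return await asyncio.gather(run_task("a", 3), run_task("b", 5))


print(asyncio.run(main()))  # [{'task_id': 'a', 'cursor': 2}, {'task_id': 'b', 'cursor': 4}]
try:
    task_context.task_id
except LookupError:
    print("no task bound here")
```

The two tasks interleave on one loop, yet each sees only its own `Ctx`.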
When to use injection (ctx: TaskContext = None)
Prefer injection when the function is the task and the parameter makes the interface explicit — it's immediately visible in the signature that the function depends on the context. Both approaches give you the same object; it's a readability choice.
Fields
| Field | Type | Description |
|---|---|---|
| `task_id` | `str` | Celery task UUID. Stable across resurrections. |
| `task_name` | `str` | Fully qualified task name (`module.func`). |
| `args` | `tuple` | Positional arguments as originally passed. |
| `kwargs` | `dict` | Keyword arguments as originally passed. |
| `worker_id` | `str` | Hostname of the executing worker (`celery@…`). |
| `partial_result` | `Any` | Checkpoint from a previous incarnation. `None` on first run. |
| `metadata` | `dict` | Arbitrary metadata, extensible. |
| `started_at` | `float` | Unix timestamp when execution began. |
await ctx.set_partial(data: Any) -> None
Save a checkpoint. Whatever you pass here becomes ctx.partial_result on the
next resurrection, so the resurrected incarnation can resume from where the
previous one left off.
How does the checkpoint know where the task got to?
It doesn't; you tell it. Relier has no way to introspect "the task was on line 47 of your loop", because it does not operate at that level. The checkpoint is just an arbitrary JSON-serialisable blob you choose to persist; what goes inside is your application's responsibility.
There are two patterns for getting progress out of the task body. Pick the one that fits your workload.
Pattern A: Checkpoint inline (recommended for incremental work)
The task body itself calls ctx.set_partial after each unit of work. The
soft-timeout hook is unnecessary because the latest progress is already in
Redis when the worker dies.
```python
from relier.tasks.context import TaskContext

@rl_task(hard_timeout=60)
async def process_items(cursor: str = "start", ctx: TaskContext = None) -> dict:
    start = (ctx.partial_result or {}).get("last_cursor", cursor)
    async for item in fetch_items(start):
        await process(item)
        # Persist progress after each item. The resurrected task picks up here.
        await ctx.set_partial({"last_cursor": item.id})
    return {"status": "done"}
```
Trade-offs
- ✅ Simplest mental model: no hook, no shared state.
- ✅ Fine-grained recovery: you lose at most one item's work.
- ✅ Works without `soft_timeout` / `on_soft_timeout`.
- ⚠️ One Redis write per item. Cheap, but not free; if your loop iterates millions of items per second, throttle the writes (for example, save only every N items).
Pattern B: Track progress in ctx.metadata, save in the soft hook
The task body writes progress into ctx.metadata (an in-memory dict on the
context object, no Redis writes). The soft-timeout hook reads
ctx.metadata and persists a final checkpoint just before the hard timeout
fires.
```python
from relier.tasks.context import TaskContext

async def save_checkpoint(ctx: TaskContext) -> None:
    # Reads whatever the task body recorded into ctx.metadata.
    last = ctx.metadata.get("last_cursor")
    if last is not None:
        await ctx.set_partial({"last_cursor": last})

@rl_task(soft_timeout=55, hard_timeout=60, on_soft_timeout=save_checkpoint)
async def process_items(cursor: str = "start", ctx: TaskContext = None) -> dict:
    start = (ctx.partial_result or {}).get("last_cursor", cursor)
    async for item in fetch_items(start):
        await process(item)
        # In-memory only, no Redis write per item.
        ctx.metadata["last_cursor"] = item.id
    return {"status": "done"}
```
Trade-offs
- ✅ Zero Redis writes during normal execution: only one write, at the soft-timeout boundary.
- ✅ Best for very hot loops where per-item checkpointing would dominate the work.
- ⚠️ Only protects you against a soft-timeout-triggered handoff. A SIGKILL or OOM kill bypasses the soft hook entirely, and the task is resurrected with whatever `set_partial` last persisted (often nothing). For real crash resilience, combine the two: write to `ctx.metadata` per item and also call `set_partial` periodically (every N items, or every M seconds).
Pattern C: Hybrid (periodic durable checkpoints + final rescue)
The conservative production default: cheap inline progress with periodic durable saves.
```python
@rl_task(soft_timeout=55, hard_timeout=60, on_soft_timeout=save_checkpoint)
async def process_items(cursor: str = "start", ctx: TaskContext = None) -> dict:
    start = (ctx.partial_result or {}).get("last_cursor", cursor)
    count = 0
    async for item in fetch_items(start):
        await process(item)
        ctx.metadata["last_cursor"] = item.id
        count += 1
        # Durable checkpoint every 100 items.
        if count % 100 == 0:
            await ctx.set_partial({"last_cursor": item.id})
    return {"status": "done", "processed": count}
```
This combines crash resilience (you re-do at most 100 items after a SIGKILL) with low write overhead (one Redis hit per 100 items, not per item).
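The patterns above suggest throttling durable saves to every N items or every M seconds. A minimal sketch of that decision, assuming a monotonic clock (`CheckpointThrottle` is a hypothetical helper you would consult before calling `ctx.set_partial`, not part of Relier):

```python
import time
from typing import Callable


class CheckpointThrottle:
    """Say when to persist: after `every_n` items or `every_s` seconds, whichever comes first."""

    def __init__(self, every_n: int = 100, every_s: float = 5.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.every_n = every_n
        self.every_s = every_s
        self.clock = clock
        self._since_save = 0
        self._last_save = clock()

    def should_save(self) -> bool:
        """Call once per processed item; True means 'write a checkpoint now'."""
        self._since_save += 1
        now = self.clock()
        if self._since_save >= self.every_n or now - self._last_save >= self.every_s:
            self._since_save = 0
            self._last_save = now
            return True
        return False


# Inside a task loop this would read (sketch):
#     ctx.metadata["last_cursor"] = item.id
#     if throttle.should_save():
#         await ctx.set_partial({"last_cursor": item.id})

t = [0.0]  # fake clock so the example is deterministic
throttle = CheckpointThrottle(every_n=3, every_s=10.0, clock=lambda: t[0])
saves = [i for i in range(7) if throttle.should_save()]
print(saves)                    # [2, 5]
t[0] = 11.0
print(throttle.should_save())   # True: 10 s have passed since the last save
```

The time bound matters for slow loops: with only an item count, a task that processes one item per minute would go 100 minutes between durable checkpoints.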
Size limits: read this before checkpointing anything bigger than a JSON document
Redis is Relier's coordination plane. It runs with `maxmemory-policy noeviction`,
replicates writes to every replica, and appends to the AOF. A careless
checkpoint (an embedding vector, an image, a model snapshot) serialised
straight into Redis bloats memory, floods replication, and can take the
entire cluster down.
To prevent that, every checkpoint is routed through `CheckpointStore`, a size-aware store:
| Checkpoint size | What happens | Configured by |
|---|---|---|
| ≤ `RELIER_CHECKPOINT_MAX_INLINE_BYTES` (default 256 KB) | Stored inline in the `partial_result` field of the `rl:phoenix:{task_id}` hash. Fast, no external dependency. | Always on |
| > 256 KB with `RELIER_CHECKPOINT_BACKEND=filesystem` | Gzipped and written to `RELIER_CHECKPOINT_DIR/{task_id}.ckpt`. Only a tiny reference envelope (`{ref, bytes, codec}`) stays in Redis. | Filesystem backend |
| > 256 KB with `RELIER_CHECKPOINT_BACKEND=inline` (default) | Rejected: raises `CheckpointTooLargeError`. | Default behaviour |
The default is to reject oversized checkpoints, deliberately. Silently bloating Redis is worse than a loud error: the loud error tells you that you need to either (a) shrink the checkpoint, or (b) configure a blob backend.
```bash
# Production: enable filesystem spillover with a shared volume.
RELIER_CHECKPOINT_BACKEND=filesystem
RELIER_CHECKPOINT_DIR=/var/lib/relier/checkpoints
```
The filesystem backend MUST be shared storage: every worker and the
resurrector must see the same path, because a resurrected task is replayed on
a different process than the one that wrote its checkpoint. Use a mounted
volume (Docker volume, NFS, EFS), not node-local disk. The bundled
docker-compose.prod.yml does this with the redis_checkpoints named volume.
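The routing rules in the size table can be summarised in code. A minimal sketch, assuming JSON serialisation, a 1024-byte KB, and a synchronous filesystem write (`store_checkpoint` and its return shape are hypothetical; only the 256 KB default, the gzip-to-`{task_id}.ckpt` layout, the `{ref, bytes, codec}` reference and the `RuntimeError` base class come from the text above):

```python
import gzip
import json
import os
import tempfile
from typing import Any, Optional

MAX_INLINE_BYTES = 256 * 1024  # default RELIER_CHECKPOINT_MAX_INLINE_BYTES (KB assumed = 1024 B)


class CheckpointTooLargeError(RuntimeError):
    """Local stand-in for relier.core.checkpoint.CheckpointTooLargeError."""


def store_checkpoint(task_id: str, data: Any, backend: str = "inline",
                     checkpoint_dir: Optional[str] = None,
                     max_inline: int = MAX_INLINE_BYTES) -> dict:
    """Return what would be written to the Redis hash field `partial_result`."""
    raw = json.dumps(data).encode("utf-8")
    if len(raw) <= max_inline:
        return {"inline": raw.decode("utf-8")}
    if backend != "filesystem" or checkpoint_dir is None:
        raise CheckpointTooLargeError(f"{len(raw)} bytes exceeds inline limit of {max_inline}")
    path = os.path.join(checkpoint_dir, f"{task_id}.ckpt")
    with gzip.open(path, "wb") as fh:
        fh.write(raw)
    # Only a small reference stays in Redis; the blob lives on shared storage.
    return {"ref": path, "bytes": len(raw), "codec": "gzip"}


print(store_checkpoint("t1", {"last_cursor": 42}))  # {'inline': '{"last_cursor": 42}'}

big = {"blob": "x" * (300 * 1024)}
try:
    store_checkpoint("t2", big)  # default inline backend: rejected
except CheckpointTooLargeError as exc:
    print("rejected:", exc)

with tempfile.TemporaryDirectory() as shared_dir:  # stands in for the shared volume
    ref = store_checkpoint("t2", big, backend="filesystem", checkpoint_dir=shared_dir)
    print(ref["codec"], ref["bytes"])
```

Rejecting by default keeps the failure loud at the call site instead of surfacing later as Redis memory pressure.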
CheckpointTooLargeError is a subclass of RuntimeError. Catch it if you
have a fallback (e.g., write the bulky state to S3 yourself and checkpoint just
the S3 key):
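```python
from relier.core.checkpoint import CheckpointTooLargeError
from relier.tasks.context import TaskContext

async def save_progress(ctx: TaskContext) -> None:
    # huge_state and upload_to_s3 stand for your own state and upload helper.
    try:
        await ctx.set_partial(huge_state)
    except CheckpointTooLargeError:
        s3_key = await upload_to_s3(huge_state)
        await ctx.set_partial({"s3_key": s3_key})
```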
Lifecycle of a checkpoint
- Reclaimed immediately when `PhoenixRegistry.complete(task_id)` runs (task finished, including DLQ quarantine).
- Reclaimed when the DLQ is purged via `rl dlq purge --confirm`.
- Orphaned blobs (whose owning Phoenix hash TTL-expired without cleanup) are swept by `CheckpointStore.sweep()` after 24 h + 1 h, driven by the resurrection loop.
idempotency_lock
A context manager for manual idempotency control. Use this when you need a
custom key or conditional logic that the auto-keyed idempotent=True flag
can't express.
Async only: sync tasks use idempotent=True
`idempotency_lock` is an async context manager and requires `async with`.
Using it with a plain `with` statement raises an error.
If you have a sync task that needs idempotency, use the decorator flag instead; it works for both sync and async task functions:
```python
@rl_task(queue="default", idempotent=True, idempotency_ttl=86400)
def my_sync_task(event_id: str) -> dict:
    ...
```
idempotency_lock is only for cases where you need a custom key that the auto-keyed idempotent=True cannot derive (e.g. a webhook event_id that is more stable than the full argument hash). Those cases are inherently async.
```python
from relier.core.idempotency import idempotency_lock

@rl_task()
async def handle_webhook(event_id: str, event_type: str, payload: dict) -> dict:
    # Use event_id as the idempotency key, not a hash of all args.
    async with idempotency_lock(key=f"webhook:{event_id}", ttl=86400) as lock:
        if lock.already_executed:
            return lock.cached_result
        result = await process_event(event_type, payload)
        lock.set_result(result)  # sync; committed automatically on exit
        return result
```
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| `key` | `str` | required | Idempotency key. Must be unique per logical operation. |
| `ttl` | `int` | required | Cache TTL in seconds. |
Execution contract
idempotency_lock is a 3-step protocol, not just a duplicate check. Every step is required:
```python
async with idempotency_lock(key=f"webhook:{event_id}", ttl=86400) as lock:
    # Step 1: check whether this was already done.
    if lock.already_executed:
        return lock.cached_result
    # Step 2: execute the work.
    result = await process_event(payload)
    # Step 3: stage the result (sync, no await).
    lock.set_result(result)
    return result
# Step 3 is committed automatically when the context exits cleanly.
```
What happens on each exit path:
| Exit | `set_result()` called | Outcome |
|---|---|---|
| Clean (`return`) | Yes | Commits the value; future duplicates get `cached_result=value` |
| Clean (`return`) | No | Commits a `None` sentinel and logs a warning; future duplicates are blocked with `cached_result=None` |
| Exception | Either | Releases the lock so the task can retry |
The key guarantee: even if you forget set_result(), the task will not run again on the next duplicate call. The context manager always commits on clean exit. The only case where the task can retry is an exception, which is intentional.
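For fire-and-forget tasks with no meaningful return value, call `lock.set_result(None)` explicitly before leaving the block, so the `None` commit is intentional rather than a forgotten call.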
Releasing a committed lock
Once a clean exit commits a result (or a `None` sentinel), the key is permanent until its TTL expires. Future duplicate calls will always get `already_executed=True`.
If you need to allow the task to run again (for example, you caught a bug after a `None` sentinel was committed and want to force a fresh execution), delete the corresponding Redis key directly.
The rl:idem: prefix is Relier's internal namespace for idempotency keys. After deletion, the next call will see no existing entry and claim a fresh lock.
Warning
Only delete the key after verifying that the original operation did not partially complete. Deleting the key on a task that already charged a payment, sent an email, or wrote to a database will not undo those side effects — it only allows the task to attempt them again.
Lock object
| Attribute / Method | Type | Description |
|---|---|---|
| `lock.already_executed` | `bool` | `True` if a cached result exists for this key |
| `lock.cached_result` | `Any` | Deserialised cached result (only valid when `already_executed` is `True`) |
| `lock.set_result(value)` | n/a | Stage the result to be committed on clean context exit. Sync, no `await`. Call with `None` for fire-and-forget tasks. |
| `lock._record_result(value)` | n/a | Internal, do not call directly. Used by the `@rl_task` decorator to write results to Redis immediately. Use `set_result()` in your own code. |
Choosing a key
The key must be stable across retries and re-deliveries — it is the field your system will use to recognise "this is the same logical operation I already ran."
Use stable, externally-assigned identifiers:
```python
# Good: event_id is assigned by the sender and never changes across retries
async with idempotency_lock(key=f"webhook:{event_id}", ttl=86400) as lock: ...

# Good: payment_id is assigned by Stripe, stable across retries
async with idempotency_lock(key=f"charge:{payment_id}", ttl=3600) as lock: ...

# Good: compound key when the same resource can have multiple distinct operations
async with idempotency_lock(key=f"order:{order_id}:refund", ttl=86400) as lock: ...
```
Avoid fields that change between retries:
```python
# Bad: timestamp is different on every retry
async with idempotency_lock(key=f"event:{received_at}", ttl=86400) as lock: ...

# Bad: full payload hash changes if any field in the payload differs between deliveries
async with idempotency_lock(key=hashlib.md5(str(payload).encode()).hexdigest(), ...) as lock: ...
```
Always prefix with an operation name to avoid collisions between different task types that happen to share the same ID space:
```python
# Risky: a payment and a webhook could share the same UUID
async with idempotency_lock(key=event_id, ttl=86400) as lock: ...

# Safe: namespaced
async with idempotency_lock(key=f"webhook.payment_received:{event_id}", ttl=86400) as lock: ...
```
When to use idempotent=True instead:
If all your task arguments are stable across retries (no timestamps, no random IDs, no mutable payload fields), @rl_task(idempotent=True) is simpler — it derives the key automatically from a hash of the task name and arguments. Use idempotency_lock only when you need to choose which specific field is the stable identifier.
SchemaRegistry
Registers payload migrations so workers running new code can safely process
payloads enqueued by older versions. Primarily used alongside @rl_task(name=...).
Migration workflow
When you make a breaking change to a task's argument signature (renaming a required arg, adding a required arg, removing one), you need to tell Relier how to upgrade old payloads that are already in the queue.
The workflow is always two steps in order:
Step 1: bump `CURRENT_VERSION` (for this example, `SchemaRegistry.CURRENT_VERSION = 2`) so new payloads are tagged at the new version
and old payloads enter the migration loop when picked up.
Step 2: register the migration that transforms v1 → v2 payloads:
```python
@SchemaRegistry.register_migration("myapp.process_order", from_version=1)
def migrate_v1_to_v2(args: tuple, kwargs: dict) -> tuple[tuple, dict]:
    # Old signature: process_order(order_id)
    # New signature: process_order(order_id, region="global")
    kwargs.setdefault("region", "global")
    return args, kwargs
```
Step order matters — bump CURRENT_VERSION first
If you register a migration without bumping CURRENT_VERSION, the migration
loop runs as while 1 < 1: — it never fires. Relier will log a warning at
registration time:
```text
Schema migration registered but will never run: from_version=1 >= CURRENT_VERSION=1
for task 'myapp.process_order'. Bump SchemaRegistry.CURRENT_VERSION to 2 to activate.
```
If you see this warning, set SchemaRegistry.CURRENT_VERSION = <from_version + 1>
before the @register_migration decorator.
SchemaRegistry.register_migration(task_name, from_version)
Registers a migration callable for a specific task and source version.
| Argument | Type | Description |
|---|---|---|
| `task_name` | `str` | The task's stable name; must match `@rl_task(name=...)`. |
| `from_version` | `int` | The envelope version this migration upgrades from. |
The decorated function receives (args: tuple, kwargs: dict) and must return
(args, kwargs) with the payload transformed for the new signature.
```python
@SchemaRegistry.register_migration("myapp.process_order", from_version=1)
def migrate_v1_to_v2(args: tuple, kwargs: dict) -> tuple[tuple, dict]:
    kwargs.setdefault("region", "global")
    return args, kwargs
```
Migrations are applied sequentially. If CURRENT_VERSION is 3 and a payload
is at version 1, the chain runs: v1→v2 then v2→v3. Register one handler
per version step.
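The sequential chain, and the reason a migration with `from_version >= CURRENT_VERSION` never fires, fit in a few lines. A minimal model, assuming migrations are kept in a dict keyed by `(task_name, from_version)` (`MiniSchemaRegistry` and the v2→v3 `currency` migration are hypothetical and omit logging, checksums and error wrapping):

```python
from typing import Callable

Migration = Callable[[tuple, dict], tuple[tuple, dict]]


class MiniSchemaRegistry:
    CURRENT_VERSION = 1
    _migrations: dict[tuple[str, int], Migration] = {}

    @classmethod
    def register_migration(cls, task_name: str, from_version: int):
        def decorator(fn: Migration) -> Migration:
            cls._migrations[(task_name, from_version)] = fn
            return fn
        return decorator

    @classmethod
    def upgrade(cls, task_name: str, version: int, args: tuple, kwargs: dict) -> tuple[tuple, dict]:
        # If version == CURRENT_VERSION the loop body never runs ("while 1 < 1").
        while version < cls.CURRENT_VERSION:
            fn = cls._migrations.get((task_name, version))
            if fn is not None:  # a missing step is skipped (Relier logs a WARNING)
                args, kwargs = fn(args, kwargs)
            version += 1
        return args, kwargs

    @classmethod
    def unreachable(cls) -> list[tuple[str, int]]:
        """Migrations a validate()-style check would flag: from_version >= CURRENT_VERSION."""
        return [key for key in cls._migrations if key[1] >= cls.CURRENT_VERSION]


MiniSchemaRegistry.CURRENT_VERSION = 3


@MiniSchemaRegistry.register_migration("myapp.process_order", from_version=1)
def v1_to_v2(args: tuple, kwargs: dict) -> tuple[tuple, dict]:
    kwargs.setdefault("region", "global")
    return args, kwargs


@MiniSchemaRegistry.register_migration("myapp.process_order", from_version=2)
def v2_to_v3(args: tuple, kwargs: dict) -> tuple[tuple, dict]:
    kwargs.setdefault("currency", "USD")
    return args, kwargs


print(MiniSchemaRegistry.upgrade("myapp.process_order", 1, ("ord-1",), {}))
# (('ord-1',), {'region': 'global', 'currency': 'USD'})
print(MiniSchemaRegistry.unreachable())  # []

MiniSchemaRegistry.CURRENT_VERSION = 2  # forgot to bump for the v2 -> v3 step
print(MiniSchemaRegistry.unreachable())  # [('myapp.process_order', 2)]
```

A v1 payload passes through both handlers in order, while a v2 payload only runs the second one.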
SchemaRegistry.validate()
Call this once at worker startup, after all application modules are imported
and CURRENT_VERSION is in its final state. Relier calls it automatically
inside init_worker_process(), so you only need to call it explicitly if you
have a custom worker bootstrap that doesn't use relier.tasks.app.
```python
# Only needed if you have a custom worker setup.
from relier.core.schema import SchemaRegistry

SchemaRegistry.validate()
```
For each migration where from_version >= CURRENT_VERSION, it logs CRITICAL
with the task name and the exact version to bump to:
```text
CRITICAL relier.core.schema — Misconfigured migration will never run:
task 'myapp.process_order' has from_version=1 but CURRENT_VERSION=1.
Bump SchemaRegistry.CURRENT_VERSION to 2 before this worker
starts processing tasks.
```
The worker is not prevented from starting — CRITICAL is chosen so the
problem is immediately visible in logs and will page on-call in any standard
log alerting setup.
Structured logs emitted
| Event | Level | When |
|---|---|---|
| `Applying task payload migration.` | `INFO` | Migration ran and payload was transformed |
| `Task payload migration failed.` | `ERROR` | Migration function raised an exception |
| `No schema migration registered for task ...` | `WARNING` | Version loop step has no handler (task may not need one) |
| `Schema migration registered but will never run ...` | `WARNING` | `from_version >= CURRENT_VERSION` at registration time |
| `Misconfigured migration will never run ...` | `CRITICAL` | `validate()` found an unreachable migration at worker startup |
All log lines include task_name, from_version, and to_version (or current_version) in the structured extra dict.
DeadLetterQueue
Direct access to DLQ operations. For most operational tasks, prefer the
`rl dlq` CLI commands; they wrap the same calls.
await DeadLetterQueue.list_tasks(limit=None)
Returns all quarantined tasks sorted by quarantine time (newest first). Each
entry is a dict:
| Field | Description |
|---|---|
| `task_id` | Task UUID |
| `task_name` | Fully qualified task name |
| `queue` | Original target queue |
| `args` | Original positional arguments |
| `kwargs` | Original keyword arguments |
| `partial_result` | Last checkpoint before quarantine (if any) |
| `reason` | Error type that triggered quarantine |
| `resurrections` | Number of resurrection attempts |
| `quarantined_at` | ISO-8601 timestamp |
await DeadLetterQueue.inspect(task_id)
Returns the full quarantine envelope for one task, or None if not found.
await DeadLetterQueue.release(task_id) → bool
Un-quarantines a task and re-submits it to its original queue. Returns True
on success, False if not found. Resurrection count is preserved so a
task can't bypass max_resurrections by being repeatedly released.
await DeadLetterQueue.purge() → int
Permanently deletes all DLQ entries. Returns the deleted count. Also releases any checkpoint blobs the quarantined tasks owned.
SLOMetrics
Read SLO data programmatically.
```python
from relier.core.slo import SLOMetrics

# Current burn rate for a window
rate = await SLOMetrics.get_burn_rate(window="1h", target_slo=0.999)

# All windows at once
report = await SLOMetrics.get_report()
# {"1h": 0.42, "6h": 0.38, "3d": 0.21}
```
Valid windows: "1h", "6h", "3d".
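In common SRE usage, burn rate is the observed error rate divided by the error budget, `1 - target_slo`: 1.0 means the budget is being consumed exactly on schedule, and values below 1.0 mean headroom. The text does not spell out Relier's formula, so treat this sketch as the conventional definition rather than `SLOMetrics` internals (`burn_rate` is a hypothetical helper):

```python
def burn_rate(failures: int, total: int, target_slo: float = 0.999) -> float:
    """Error rate over a window divided by the error budget (1 - target_slo)."""
    if not 0.0 < target_slo < 1.0:
        raise ValueError("target_slo must be between 0 and 1 (exclusive)")
    if total == 0:
        return 0.0  # no traffic in the window, nothing burned
    error_rate = failures / total
    return error_rate / (1.0 - target_slo)


# 42 failures out of 100,000 tasks against a 99.9% target:
# an error rate of 0.042% against a 0.1% budget, i.e. a burn rate of about 0.42.
print(round(burn_rate(42, 100_000, 0.999), 2))  # 0.42
```

Comparing the same ratio over short and long windows (such as `"1h"` against `"3d"`) separates a sudden spike from a slow leak.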
await SLOMetrics.record_event(status) → None
Manually record a task outcome. Relier calls this automatically; only use it for custom integrations.
- `status="success"`: task completed successfully
- `status="failure"`: task failed or was quarantined
Exceptions
```python
from relier.core.exceptions import (
    RelierError,               # base class
    IdempotencyInFlightError,  # another worker holds the idempotency key
    AdmissionRejectedError,    # cluster at capacity (check exc.retry_after)
    PayloadIntegrityError,     # checksum mismatch on received payload
    SchemaMigrationError,      # migration function raised
    HardTimeoutError,          # task exceeded hard_timeout
)
```
`CheckpointTooLargeError` (imported from `relier.core.checkpoint`) is raised by
`ctx.set_partial(...)` when the checkpoint exceeds the inline limit and no
filesystem backend is configured. It subclasses `RuntimeError`, not `RelierError`.
AdmissionRejectedError
```python
try:
    await my_task.apush(arg)
except AdmissionRejectedError as exc:
    print(exc.retry_after)  # seconds until the admission window resets
```
IdempotencyInFlightError
This is caught and handled internally by `@rl_task`: it triggers a Celery
retry with a 5-second backoff. You should not need to handle it yourself unless
you're using `idempotency_lock` manually.
Settings
```python
from relier.config import get_settings

settings = get_settings()
print(settings.heartbeat_ttl)      # 10
print(settings.max_resurrections)  # 5
```
All settings are read-only after process startup (the `Settings` model is
frozen). To change a value, update `.env` and restart the workers.
See Configuration for the full settings reference.