Architecture¶
This page is for engineers who want to understand how Relier works under the hood. If you just want to use it, the Quickstart and Core Concepts are enough. Come back here when you're debugging something deep or evaluating whether Relier fits your constraints.
The central tension: FastAPI is async, Celery is not¶
FastAPI is built on asyncio. Your route handlers are async def, your database calls are await, your entire stack is event-loop-native.
Celery is built on prefork workers: separate OS processes, each running plain synchronous Python with no event loop. When Celery calls your task function, it calls it synchronously. If your task is async def, that call just returns a coroutine object, and Celery has no idea what to do with it.
Relier's entire async bridge exists to solve this mismatch cleanly.
The Async/Celery Bridge¶
The wrong approach: asyncio.run() per task¶
The naive fix is to call asyncio.run(my_async_task()) inside each task. This works, but it creates and destroys a new event loop and a new Redis connection pool for every single task execution. At any real throughput, this exhausts connection limits and adds significant per-task overhead.
Relier's approach: one persistent event loop per worker¶
Each Celery worker process gets one event loop, created once at startup, reused forever.
Here's the exact boot sequence:
Worker process spawns (via Celery's prefork)
│
├─ init_worker_process() fires (Celery signal hook in app.py)
│ ├─ asyncio.new_event_loop() — fresh loop, not inherited from parent
│ ├─ daemon thread started: runs loop.run_forever() in the background
│ ├─ loop stored in module-level worker_loop variable
│ ├─ Redis pool pre-warmed: get_relier_redis() called immediately
│ │ (first task pays zero connection cost)
│ ├─ Worker presence heartbeat loop started (updates rl:presence:{worker_id} every 5s)
│ └─ GracefulShutdownHandler installed (SIGTERM/SIGINT → drain)
│
└─ Worker is ready to accept tasks
How task execution flows across the thread boundary¶
When Celery hands a task to the worker, it calls the sync wrapper in the Celery thread. That wrapper schedules the async orchestration onto the event loop thread:
sequenceDiagram
participant CT as Celery Thread
participant EL as Event Loop Thread
CT->>EL: run_coroutine_threadsafe(_orchestrate(envelope))
Note over CT: blocks on future.result(timeout=hard+10)
Note over EL: schema validation + migration
Note over EL: idempotency check (Lua)
Note over EL: Phoenix registration
Note over EL: await func(*args, **kwargs)
Note over EL: record result + SLO metrics
Note over EL: cleanup (heartbeat, inflight)
EL-->>CT: future resolves — result returned to Celery
This is safe even though asyncio objects are generally not thread-safe: asyncio.run_coroutine_threadsafe is explicitly designed for this cross-thread scheduling pattern. The event loop thread owns all async state; the Celery thread only hands work to it and waits.
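The hand-off can be sketched with the standard library alone. This is a minimal illustration rather than Relier's actual code: worker_loop mirrors the module-level variable named in the boot sequence, while _run_loop, sync_wrapper, and the body of _orchestrate are hypothetical stand-ins.

```python
import asyncio
import threading

# Stand-in for the module-level worker_loop created once per worker process.
worker_loop = asyncio.new_event_loop()


def _run_loop(loop: asyncio.AbstractEventLoop) -> None:
    # Bind the loop to this thread and run it until it is stopped.
    asyncio.set_event_loop(loop)
    loop.run_forever()


# Daemon thread, so it never blocks interpreter shutdown.
threading.Thread(target=_run_loop, args=(worker_loop,), daemon=True).start()


async def _orchestrate(x: int) -> int:
    # Placeholder for validation, idempotency check, task execution, cleanup.
    await asyncio.sleep(0.01)
    return x * 2


def sync_wrapper(x: int, hard_timeout: float = 1.0) -> int:
    # Runs in the synchronous (Celery-style) thread.
    future = asyncio.run_coroutine_threadsafe(_orchestrate(x), worker_loop)
    # Block this thread until the coroutine finishes on the loop thread.
    return future.result(timeout=hard_timeout + 10)
```

Calling sync_wrapper(21) from any ordinary thread runs the coroutine on the loop thread and returns its result to the caller.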
Background loops on the worker event loop¶
Several tasks run concurrently on the worker's event loop in the background:
| Loop | Purpose | Interval |
|---|---|---|
| _presence_loop | Refreshes rl:presence:{worker_id} | heartbeat_ttl / 2 |
| PhoenixRegistry._refresh_loop | Extends rl:hb:{task_id} per active task | heartbeat_ttl / 2 |
| _cleanup_dead_workers | Removes stale worker IDs from rl:workers | 60s |
These loops coexist on the same event loop as your task execution. They're all async def coroutines scheduled via asyncio.create_task.
Redis Key Schema¶
Everything Relier does is in Redis under the rl: prefix. Here's the full map:
Task lifecycle keys¶
| Key | Type | TTL | Purpose |
|---|---|---|---|
| rl:hb:{task_id} | String | heartbeat_ttl (10s) | Heartbeat; expiry means the worker is dead |
| rl:phoenix:{task_id} | Hash | 24h | Full task state: payload, worker_id, registered_at, partial_result, checksum |
| rl:resurrections:{task_id} | String | None | How many times this task has been resurrected |
| rl:lock:resurrect:{task_id} | String | 30s | Distributed lock: prevents two resurrectors from re-queuing the same task |
| rl:lease:{task_id} | String | 180s | Fence token lease; only the holder may commit this incarnation |
| rl:fence:{task_id} | String | 600s | Current incarnation ID; stale workers are rejected on commit |
Worker tracking keys¶
| Key | Type | TTL | Purpose |
|---|---|---|---|
| rl:workers | SortedSet | None | All workers, scored by last-seen timestamp |
| rl:presence:{worker_id} | String | heartbeat_ttl (10s) | Ephemeral worker liveness key |
| rl:inflight:{worker_id} | SortedSet | None | Task IDs actively running on this worker, scored by start time |
Reliability keys¶
| Key | Type | TTL | Purpose |
|---|---|---|---|
| rl:idem:{name}:{hash} | String | idempotency_ttl | Idempotency cache: IN_FLIGHT sentinel or serialised result |
| rl:dlq | Hash | None | DLQ: maps task_id → full quarantine envelope |
| rl:monitoring | Hash | None | Recently resurrected tasks being watched |
| rl:phoenix:expiry_index | SortedSet | None | Task IDs scored by heartbeat expiry timestamp, for an efficient resurrection scan |
Metrics keys¶
| Key | Type | TTL | Purpose |
|---|---|---|---|
| rl:m:global:{status} | String | None | Global all-time counter (status: success, failed, resurrected) |
| rl:m:w:{worker_id}:{status} | String | 24h | Per-worker session counter |
| rl:slo:{status}:{bucket} | String | ~3d | SLO event counter per 60s time bucket |
| rl:admission:{resource_key} | String | admission_window (10s) | Admission control counter for the current window |
| rl:task_durations | List | None | Recent task durations (capped at 1000 entries) for percentile queries |
Why Redis-only?¶
Every reliability operation has an O(1) or O(log N) Redis equivalent. Heartbeats are SET EX. Inflight tracking is ZADD/ZREM. Idempotency is a Lua SET NX. There's no write that requires a relational join.
The entire state needed to survive a worker crash fits in Redis. Adding Postgres to the hot path would add round-trip latency on every task without improving the reliability guarantee.
The Full Execution Lifecycle¶
Here's what happens from the moment you call await my_task.apush(arg) to the moment the result is cached:
Producer side (your FastAPI handler)¶
await my_task.apush("some_arg")
│
├─ 1. Admission check (Lua INCR, < 1ms)
│ KEYS[1] = rl:admission:global
│ ARGV[1] = limit (5000), ARGV[2] = window (10s)
│ Returns: (is_admitted, current_count, retry_after)
│ On rejection: raises AdmissionRejectedError
│
├─ 2. Schema wrapping
│ Generates task_id (UUID4)
│ Builds envelope: {schema_version, task_id, payload, checksum, enqueued_at}
│ Checksum = sha256(json.dumps({"args": [...], "kwargs": {...}}, sort_keys=True))
│ Injects OTEL context carriers (for distributed trace continuation)
│
└─ 3. Dispatch
celery_app.send_task(task_name, args=(envelope,), queue=queue, task_id=task_id)
→ Redis LPUSH {queue} {serialised_envelope}
Worker side (Celery + Relier)¶
Celery pops envelope from queue
│
├─ wrapper(self, envelope) called in Celery thread
│
├─ 1. Async bridge
│ future = asyncio.run_coroutine_threadsafe(_orchestrate(envelope), worker_loop)
│ future.result(timeout=hard_timeout + 10) ← blocks Celery thread
│
└─ _orchestrate(envelope) runs on event loop thread:
│
├─ 2. OTEL context restore
│ Extract context from envelope._otel_context
│ Continue distributed trace from producer span
│
├─ 3. Schema validation + migration
│ Pydantic validates envelope structure
│ sha256(payload) verified against envelope.checksum
│ If mismatch: PayloadIntegrityError → DLQ quarantine
│ Sequential migrations applied if schema_version < current
│
├─ 4. Idempotency check (if idempotent=True)
│ idem_key = sha256(task_name + args + kwargs)[:16]
│ Lua ACQUIRE_LUA:
│ → CLAIMED: proceed (SET rl:idem:{key} "rl:inflight:{uuid}" NX EX 120)
│ → IN_FLIGHT: raise IdempotencyInFlightError → retry in 5s
│ → COMPLETED: return cached result immediately, done
│
├─ 5. Phoenix registration
│ redis.hset(rl:phoenix:{task_id}, {payload, worker_id, registered_at})
│ redis.setex(rl:hb:{task_id}, heartbeat_ttl, "1")
│ redis.zadd(rl:phoenix:expiry_index, {task_id: now + heartbeat_ttl})
│ asyncio.create_task(_refresh_loop(task_id)) ← background heartbeat refresh
│
├─ 6. Lease + fence validation (resurrection path only)
│ kwargs may contain _fence_token, _lease_key, _fence_key
│ Lua VALIDATE_LUA: verify this incarnation still owns the lease
│ If rejected: stale resurrected worker, return without executing
│
├─ 7. Inflight tracking
│ redis.zadd(rl:inflight:{worker_id}, {task_id: now})
│
├─ 8. Checkpoint resolution (resumed task)
│ If checkpoint in kwargs: CheckpointStore.resolve(checkpoint)
│ Deserialise + decompress, inject as ctx.partial_result
│
├─ 9. TaskContext creation
│ ctx = TaskContext(task_id, task_name, args, kwargs, worker_id, partial_result)
│ Stored in contextvars (accessible via task_context proxy)
│
├─ 10. Task execution
│ If soft_timeout or hard_timeout:
│ asyncio.wait({task_coro, soft_watcher, hard_watcher},
│ return_when=FIRST_COMPLETED)
│ Soft watcher: await asyncio.sleep(soft) → call on_soft_timeout(ctx)
│ Hard watcher: await asyncio.sleep(hard) → task_coro.cancel()
│ Else:
│ result = await func(*args, **kwargs)
│
├─ 11. Post-execution (success path)
│ Lua COMMIT_CHECK_LUA: verify fence still valid (not superseded by newer resurrection)
│ Lua CLEANUP_LUA: release lease
│ If idempotent: SET rl:idem:{key} json.dumps(result) EX idempotency_ttl
│ SLOMetrics.record_event("success")
│ redis.incr(rl:m:global:success), redis.incr(rl:m:w:{worker_id}:success)
│
└─ 12. Cleanup (finally block — always runs)
Cancel _refresh_loop
PhoenixRegistry.complete(task_id):
redis.delete(rl:hb:{task_id})
redis.delete(rl:phoenix:{task_id})
redis.delete(rl:resurrections:{task_id})
redis.zrem(rl:phoenix:expiry_index, task_id)
redis.zrem(rl:inflight:{worker_id}, task_id)
task_duration_ms histogram recorded
redis.lpush(rl:task_durations, duration) ← for p95 queries
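Step 10 is the subtlest part of the flow, so here is a hedged sketch of how the soft and hard watchers could be combined with asyncio.wait. The function name run_with_timeouts and its signature are assumptions made for illustration; only the watcher behaviour and the HardTimeoutError name come from the description on this page.

```python
import asyncio


class HardTimeoutError(Exception):
    """Raised when the hard deadline cancels the task."""


async def run_with_timeouts(coro, soft: float, hard: float, on_soft_timeout):
    task = asyncio.ensure_future(coro)
    hard_fired = False

    async def soft_watcher():
        await asyncio.sleep(soft)
        on_soft_timeout()  # advisory only: the task keeps running

    async def hard_watcher():
        nonlocal hard_fired
        await asyncio.sleep(hard)
        hard_fired = True
        task.cancel()

    pending = {
        task,
        asyncio.ensure_future(soft_watcher()),
        asyncio.ensure_future(hard_watcher()),
    }
    try:
        # A watcher finishing first also wakes wait(), so loop until the
        # task itself is done (finished, failed, or cancelled).
        while not task.done():
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
        if task.cancelled() and hard_fired:
            raise HardTimeoutError(f"no result within {hard}s")
        return task.result()
    finally:
        # Stop whichever watchers are still sleeping.
        for leftover in pending:
            leftover.cancel()
```

The loop matters: with FIRST_COMPLETED, the soft watcher returning is enough to wake the wait call, so a single wait would hand control back before the task has produced a result.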
Error paths¶
PayloadIntegrityError (checksum mismatch)
→ DLQ.quarantine(task_id, reason="PayloadIntegrityError")
→ No retry — a corrupted payload will always be corrupted
IdempotencyInFlightError (concurrent duplicate)
→ self.retry(countdown=5, max_retries=10)
→ Celery backs off and retries; eventually IN_FLIGHT expires and it claims
HardTimeoutError / asyncio.CancelledError (hard timeout)
→ SLOMetrics.record_event("failure")
→ DLQ.quarantine(task_id, reason="TimeoutError")
Any other exception
→ SLOMetrics.record_event("failure")
→ DLQ.quarantine(task_id, reason=type(exc).__name__)
→ Exception re-raised (Celery sees the failure)
The Resurrection Lifecycle in Detail¶
The resurrector is a long-running async process, not a scheduled job. It runs continuously:
async def resurrection_loop():
    while True:
        await _scan_and_resurrect()
        await asyncio.sleep(resurrection_check_interval)  # default: 2s
Scan step¶
redis.zrangebyscore(rl:phoenix:expiry_index, 0, now)
→ Returns task IDs whose heartbeat deadline has passed
For each task_id:
→ Check: exists(rl:hb:{task_id})
→ If heartbeat alive: worker is fine, update score in expiry_index, skip
→ If heartbeat expired:
Load: hgetall(rl:phoenix:{task_id})
If payload missing: remove from index (task completed cleanly), skip
Acquire: SET rl:lock:resurrect:{task_id} NX EX 30
If lock fails: another resurrector has it, skip
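The decision logic of the scan step can be sketched without Redis. In this illustration plain dicts and sets stand in for the sorted set, heartbeat keys, Phoenix hashes, and locks; the function name scan is hypothetical, and refreshing a live task's score to now + heartbeat_ttl is an assumption, since the page only says the score is updated.

```python
def scan(expiry_index: dict, heartbeats: set, phoenix: dict, locks: set,
         now: float, heartbeat_ttl: float = 10.0) -> list:
    """Return the task IDs to resurrect, mutating the stand-in stores."""
    to_resurrect = []
    # Equivalent of ZRANGEBYSCORE index 0 now: entries whose deadline passed.
    due = [t for t, deadline in expiry_index.items() if deadline <= now]
    for task_id in due:
        if task_id in heartbeats:
            # Heartbeat key still exists: worker is fine, push the deadline out.
            expiry_index[task_id] = now + heartbeat_ttl
            continue
        if not phoenix.get(task_id, {}).get("payload"):
            # No payload left: the task completed cleanly, drop the index entry.
            del expiry_index[task_id]
            continue
        if task_id in locks:
            # Another resurrector already holds the lock.
            continue
        locks.add(task_id)  # stands in for SET rl:lock:resurrect:{task_id} NX EX 30
        to_resurrect.append(task_id)
    return to_resurrect
```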
Resurrection step¶
Check resurrection count:
→ GET rl:resurrections:{task_id}
→ If count >= max_resurrections:
DLQ.quarantine(task_id, reason="max_resurrections_exceeded")
Skip
Run Lua RESURRECT_LUA (atomic):
→ Generate fence_token = uuid4().hex
→ SET rl:lease:{task_id} fence_token EX 180
→ SET rl:fence:{task_id} fence_token EX 600
→ Returns fence metadata
Enrich payload:
→ Add _fence_token, _lease_key, _fence_key to kwargs
Dispatch to broker:
→ celery_app.send_task(task_name, args=(envelope,), queue="re-queue", task_id=task_id)
→ "re-queue" is a dedicated recovery queue — isolated from normal traffic
AFTER broker confirms receipt (not before):
→ INCR rl:resurrections:{task_id}
→ HSET rl:monitoring task_id "0"
→ ZREM rl:phoenix:expiry_index task_id
The order matters: resurrection count is only incremented after the broker has confirmed the task was received. A network partition between the resurrector and broker won't inflate the count.
Fence tokens: why they exist¶
Consider this scenario:
- Worker A is running task_123. Worker A becomes slow (GC pause, swap, network issue).
- Worker A's heartbeat expires.
- Resurrector detects the expired heartbeat and dispatches task_123 to Worker B with fence_token="abc".
- Worker B runs task_123 and completes. Result is committed. Fence token "abc" is released.
- Worker A wakes up from its GC pause and tries to commit its result for task_123.
Without fence tokens, Worker A's stale result would overwrite Worker B's correct commit. With fence tokens:
- Step 4: Worker B's commit validates that rl:fence:{task_id} == "abc", it does, commit proceeds.
- Step 5: Worker A tries to commit, validates rl:fence:{task_id}, it's gone (TTL or deleted). Commit rejected. Worker A's stale result is silently discarded.
Admission Control Internals¶
The admission Lua script runs atomically in Redis, no Python involved in the decision:
local current = redis.call('INCR', KEYS[1])
if current == 1 then
    redis.call('EXPIRE', KEYS[1], ARGV[2]) -- set window TTL on first request
end
local limit = tonumber(ARGV[1])
if current > limit then
    return {0, current, redis.call('TTL', KEYS[1])} -- rejected
end
return {1, current, 0} -- admitted
- KEYS[1] = rl:admission:{resource_key} (e.g., rl:admission:global)
- ARGV[1] = request limit (e.g., 5000)
- ARGV[2] = window duration in seconds (e.g., 10)
The returned TTL becomes the Retry-After value in the HTTP 429 response. The client knows exactly when to retry.
The script is loaded via SCRIPT LOAD on first use and executed via EVALSHA, so the script body is not resent over the network on hot paths; each call sends only the SHA-1 digest, keys, and arguments.
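To make the fixed-window behaviour easier to reason about, here is an in-memory Python mirror of the same logic. It is a sketch for illustration only: FixedWindowCounter and its injectable clock are hypothetical, and the retry-after value is rounded up, which may differ slightly from what Redis TTL reports.

```python
import math
import time


class FixedWindowCounter:
    """In-memory stand-in for the INCR + EXPIRE admission script."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._windows = {}  # key -> (count, expires_at)

    def admit(self, key: str, limit: int, window: float):
        now = self._clock()
        count, expires_at = self._windows.get(key, (0, 0.0))
        if count == 0 or now >= expires_at:
            # First request of a new window: this is where EXPIRE is set.
            count, expires_at = 0, now + window
        count += 1
        self._windows[key] = (count, expires_at)
        if count > limit:
            retry_after = max(0, math.ceil(expires_at - now))
            return (False, count, retry_after)  # rejected
        return (True, count, 0)  # admitted
```

Note that rejected requests still increment the counter, exactly as in the Lua version, so the returned count reflects total attempts in the window rather than admitted requests.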
Idempotency Lua Scripts¶
Two scripts handle the idempotency lifecycle:
ACQUIRE_LUA (claim or detect)¶
local val = redis.call('GET', KEYS[1])
if not val then
    -- Not seen before: claim it
    redis.call('SET', KEYS[1], ARGV[1], 'EX', ARGV[2])
    return {0, false} -- 0 = claimed, proceed
end
local is_inflight = string.find(val, 'rl:inflight:', 1, true)
if is_inflight then
    return {1, val} -- 1 = in-flight, retry
end
return {1, val} -- 1 = completed, val = cached result JSON
Callers distinguish IN_FLIGHT from COMPLETED by checking whether val starts with rl:inflight:.
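A caller-side sketch of that distinction might look like the following. The names IdemState and classify are hypothetical, and the reply shape (a two-element list whose second element is None for a claim, and either str or bytes otherwise) is an assumption about how the client library surfaces the script's return value.

```python
import json
from enum import Enum

INFLIGHT_PREFIX = "rl:inflight:"


class IdemState(Enum):
    CLAIMED = "claimed"
    IN_FLIGHT = "in_flight"
    COMPLETED = "completed"


def classify(reply):
    """Map an ACQUIRE_LUA-style reply to (state, cached_result)."""
    code, val = reply[0], reply[1]
    if code == 0:
        return IdemState.CLAIMED, None
    if isinstance(val, bytes):
        # Clients without response decoding hand back raw bytes.
        val = val.decode()
    if val.startswith(INFLIGHT_PREFIX):
        return IdemState.IN_FLIGHT, None
    # Anything else is treated as the serialised result of a finished run.
    return IdemState.COMPLETED, json.loads(val)
```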
RELEASE_LUA (compare-and-delete)¶
Used on exception exit from idempotency_lock to release the in-flight sentinel. The compare-and-delete prevents a stale worker from clearing a lock owned by a newer execution attempt.
IdempotencyLock context manager lifecycle¶
idempotency_lock() implements the same three-state machine as the @rl_task(idempotent=True) decorator path, but the commit step runs automatically in __aexit__ rather than being called explicitly by the task function.
idempotency_lock(key, ttl)
│
├─ __aenter__
│ ACQUIRE_LUA
│ → CLAIMED : key = rl:inflight:{uuid} EX inflight_ttl
│ return IdempotencyResult(already_executed=False)
│ → IN_FLIGHT: raise IdempotencyInFlightError
│ → COMPLETED: return IdempotencyResult(already_executed=True,
│ cached_result=<value>)
│
├─ task body runs
│ lock.set_result(value) ← sync; stages value in _pending_result
│
└─ __aexit__
Exception path:
RELEASE_LUA (compare-and-delete — removes in-flight sentinel)
→ key deleted, task can be retried
Clean-exit path, set_result() was called:
SET rl:idem:{key} json.dumps(value) EX ttl
→ key = committed result, future duplicates get cached_result=value
Clean-exit path, set_result() NOT called:
SET rl:idem:{key} json.dumps(null) EX ttl + log WARNING
→ key = None sentinel, future duplicates blocked,
cached_result=None
The critical design constraint: in-flight sentinels use a short, bounded TTL (RELIER_IDEMPOTENCY_INFLIGHT_TTL, default 120 s) so a worker that is killed mid-execution cannot permanently block future duplicate calls for the full result TTL. The committed result is written with the result TTL (ttl argument) in a separate SET call.
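The three-state lifecycle can be illustrated with an in-memory async context manager. This is a sketch under stated assumptions, not the real idempotency_lock: InMemoryIdempotencyLock is a hypothetical class, a dict replaces Redis, and TTLs are left out entirely.

```python
import json
import logging
import uuid

logger = logging.getLogger("relier.sketch")
INFLIGHT_PREFIX = "rl:inflight:"


class IdempotencyInFlightError(Exception):
    pass


class InMemoryIdempotencyLock:
    def __init__(self, store: dict, key: str):
        self._store = store
        self._key = key
        self._sentinel = INFLIGHT_PREFIX + uuid.uuid4().hex
        self._pending_result = None
        self._has_result = False
        self.already_executed = False
        self.cached_result = None

    def set_result(self, value) -> None:
        # Sync: only stages the value; the write happens in __aexit__.
        self._pending_result = value
        self._has_result = True

    async def __aenter__(self):
        current = self._store.get(self._key)
        if current is None:
            self._store[self._key] = self._sentinel  # CLAIMED
        elif current.startswith(INFLIGHT_PREFIX):
            raise IdempotencyInFlightError(self._key)  # IN_FLIGHT
        else:
            self.already_executed = True  # COMPLETED
            self.cached_result = json.loads(current)
        return self

    async def __aexit__(self, exc_type, exc, tb):
        if self.already_executed:
            return False
        if exc_type is not None:
            # Compare-and-delete: only clear the sentinel this attempt wrote.
            if self._store.get(self._key) == self._sentinel:
                del self._store[self._key]
            return False
        if not self._has_result:
            logger.warning("set_result() was never called; caching None")
        # Clean exit: replace the sentinel with the committed result.
        self._store[self._key] = json.dumps(self._pending_result)
        return False
```

Because the exception path compares against its own sentinel before deleting, an attempt whose sentinel has already been replaced by a newer claim leaves that newer claim untouched.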
COMMIT_CHECK_LUA (verify fence on success)¶
local fence_key = KEYS[1]
local fence_token = ARGV[1]
local current_fence = redis.call('GET', fence_key)
if not current_fence then
    return 0 -- fence expired: safe to commit (no competing incarnation)
end
if current_fence ~= fence_token then
    return -1 -- stale: a newer incarnation has taken over, reject this commit
end
return 1 -- valid: this incarnation still owns the task
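The three return codes can be exercised with a small Python mirror of the script. This is only an illustration: a dict stands in for Redis, and may_commit is a hypothetical helper showing one way a caller could interpret the codes as the script's comments describe them.

```python
def commit_check(fences: dict, fence_key: str, fence_token: str) -> int:
    """Mirror of the script's return codes, with a dict in place of Redis."""
    current = fences.get(fence_key)
    if current is None:
        return 0  # fence key absent (expired or never set)
    if current != fence_token:
        return -1  # a different incarnation holds the fence
    return 1  # this incarnation still owns the task


def may_commit(code: int) -> bool:
    # Only an explicit mismatch (-1) blocks the commit.
    return code >= 0
```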
SLO Burn Rate Calculation¶
Relier records task outcomes into 60-second time buckets:
Task completes → SLOMetrics.record_event("success" or "failure")
→ bucket = floor(unix_timestamp / 60) * 60
→ INCR rl:slo:{status}:{bucket}
→ EXPIRE rl:slo:{status}:{bucket} RETENTION_SECONDS
To calculate burn rate for a window (e.g., 1 hour = 3600 seconds):
end = floor(now / 60) * 60
start = end - 3600 + 60
buckets = [start, start+60, start+120, ..., end]
successes = MGET rl:slo:success:{b} for b in buckets → sum non-null
failures = MGET rl:slo:failure:{b} for b in buckets → sum non-null
actual_error_rate = failures / (successes + failures)
allowed_error_rate = 1.0 - 0.999 = 0.001 ← 99.9% SLO target
burn_rate = actual_error_rate / allowed_error_rate
Each MGET fetches all buckets for a window in a single Redis round-trip. The bucket count is bounded (3600 / 60 = 60 keys for the 1h window, 259200 / 60 = 4320 for the 3d window) so queries are fast and memory usage is O(window / bucket_size), not O(task_count).
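The arithmetic above translates directly into Python. The sketch below covers only the bucket enumeration and the ratio, with the Redis reads left out; bucket_starts and burn_rate are hypothetical names, and returning 0.0 for a window with no events is an assumption rather than documented behaviour.

```python
BUCKET_SECONDS = 60


def bucket_starts(now: float, window_seconds: int) -> list:
    """Bucket timestamps covering the window, oldest first."""
    end = int(now // BUCKET_SECONDS) * BUCKET_SECONDS
    start = end - window_seconds + BUCKET_SECONDS
    return list(range(start, end + 1, BUCKET_SECONDS))


def burn_rate(successes: int, failures: int, slo_target: float = 0.999) -> float:
    total = successes + failures
    if total == 0:
        return 0.0  # assumption: an empty window burns no error budget
    actual_error_rate = failures / total
    allowed_error_rate = 1.0 - slo_target
    return actual_error_rate / allowed_error_rate
```

For example, 10 failures out of 1000 events is a 1% error rate, which against a 99.9% target gives a burn rate of roughly 10: the error budget is being spent about ten times faster than the SLO allows.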
Graceful Shutdown Internals¶
SIGTERM received
│
├─ Signal handler (registered in init_worker_process)
│ asyncio.run_coroutine_threadsafe(shutdown_handler.drain(), worker_loop)
│
└─ drain() runs on event loop thread:
│
├─ Set draining = True
│ (second SIGTERM is ignored — no double shutdown)
│
├─ Cancel queue consumers:
│ celery_app.control.cancel_consumer(queue_name)
│ (worker stops accepting new tasks from the broker)
│
├─ Wait loop (up to graceful_shutdown_timeout seconds):
│ while active_tasks and elapsed < timeout:
│ await asyncio.sleep(0.5)
│ elapsed += 0.5
│
├─ If tasks finished in time:
│ shutdown_type = "clean"
│
└─ If timeout expired (tasks still running):
handoff_remaining():
for task_id in active_tasks:
redis.delete(rl:hb:{task_id})
← heartbeat deleted → resurrector picks it up within 35s
shutdown_type = "forced"
shutdowns_total counter incremented with shutdown_type label
shutdown_duration_s histogram recorded
Worker process exits
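The wait-then-handoff core of drain() can be sketched as a small coroutine. This is illustrative only: the signature, the handoff callback, and the poll parameter are assumptions, and consumer cancellation and metrics are omitted.

```python
import asyncio


async def drain(active_tasks: set, timeout: float, handoff, poll: float = 0.5) -> str:
    """Wait for in-flight tasks, then hand off whatever is still running."""
    elapsed = 0.0
    while active_tasks and elapsed < timeout:
        await asyncio.sleep(poll)
        elapsed += poll
    if not active_tasks:
        return "clean"
    for task_id in list(active_tasks):
        # In the flow above this deletes rl:hb:{task_id} so that the
        # resurrector picks the task up.
        handoff(task_id)
    return "forced"
```

Here active_tasks is expected to shrink as tasks finish elsewhere on the same event loop, which is why the loop yields with asyncio.sleep instead of blocking.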
Checkpoint Storage¶
For tasks that support partial progress (via ctx.set_partial()), Relier stores the checkpoint in Redis or, above a size threshold, on the filesystem:
- Small checkpoints (< checkpoint_max_inline_bytes, default 256KB): stored directly in the rl:phoenix:{task_id} hash field partial_result.
- Large checkpoints: written to the filesystem at RELIER_CHECKPOINT_DIR/{task_id}, and the hash stores a reference envelope {"backend": "filesystem", "path": "..."}.
When a resurrected task is picked up, the checkpoint is resolved and injected as ctx.partial_result before your function is called.
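The inline-versus-filesystem split can be sketched as a pair of functions. Everything here is a hypothetical illustration: store_checkpoint and resolve_checkpoint are invented names, compression is omitted, and detecting a reference envelope by attempting to parse the stored value as JSON is an assumption, not necessarily how CheckpointStore.resolve works.

```python
import json
import os


def store_checkpoint(task_id: str, blob: bytes, checkpoint_dir: str,
                     max_inline_bytes: int = 256 * 1024) -> bytes:
    """Return the value to place in the partial_result hash field."""
    if len(blob) < max_inline_bytes:
        return blob  # small: stored inline
    path = os.path.join(checkpoint_dir, task_id)
    with open(path, "wb") as fh:
        fh.write(blob)
    # Large: the hash only keeps a reference envelope.
    return json.dumps({"backend": "filesystem", "path": path}).encode()


def resolve_checkpoint(stored: bytes) -> bytes:
    """Follow a filesystem reference, or return inline data unchanged."""
    try:
        ref = json.loads(stored)
    except ValueError:
        return stored  # not JSON, so it must be inline data
    if isinstance(ref, dict) and ref.get("backend") == "filesystem":
        with open(ref["path"], "rb") as fh:
            return fh.read()
    return stored
```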
Envelope Integrity (Checksums)¶
Every payload carries a SHA-256 checksum computed at enqueue time:
import hashlib
import json

payload = {"args": list(args), "kwargs": kwargs}
checksum_input = json.dumps(payload, sort_keys=True, ensure_ascii=True)
checksum = "sha256:" + hashlib.sha256(checksum_input.encode()).hexdigest()
At execution time, the worker recomputes the checksum and compares. If they don't match:
- PayloadIntegrityError is raised
- The task goes directly to DLQ (no retry, a corrupted payload will always be corrupted)
This catches serialisation bugs, storage corruption, and any tampering that alters the payload without also recomputing the checksum.
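The worker-side comparison can be sketched as follows. The checksum recipe is the one shown above; compute_checksum and verify_envelope are hypothetical helper names, and the envelope is reduced to the two fields the check needs.

```python
import hashlib
import json


class PayloadIntegrityError(Exception):
    pass


def compute_checksum(args, kwargs) -> str:
    payload = {"args": list(args), "kwargs": kwargs}
    # sort_keys makes the digest independent of dict insertion order.
    checksum_input = json.dumps(payload, sort_keys=True, ensure_ascii=True)
    return "sha256:" + hashlib.sha256(checksum_input.encode()).hexdigest()


def verify_envelope(envelope: dict) -> None:
    """Recompute the checksum and raise if it does not match the envelope."""
    payload = envelope["payload"]
    expected = compute_checksum(payload["args"], payload["kwargs"])
    if expected != envelope["checksum"]:
        raise PayloadIntegrityError(envelope.get("task_id", "<unknown>"))
```

Canonical serialisation is what makes the check reliable: the same logical payload must produce byte-identical JSON on the producer and on the worker, otherwise valid payloads would be quarantined.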