AI Core & Tasks
The ai_core app is the central nervous system of the Apptork Root System. It abstracts the chaotic world of third-party AI providers into a clean, unified, asynchronous interface.
The AI Pipeline
When a client requests a generation, the system follows a strict, asynchronous lifecycle:
- API Request & Deduction: The client hits the endpoint. The system validates the payload, calculates the cost via ProviderFeatureCost, atomically deducts the credits, and creates a Task record with a PENDING status.
- Celery Orchestration: The API immediately returns a 202 Accepted alongside a Task ID, offloading the heavy work to a Celery Worker via Redis.
- Provider Execution: The worker uses the BasePayloadTransformer to standardize the prompt and sends it to the AI vendor (e.g., Replicate).
- Storage & Webhooks: Upon completion, the worker downloads the generated asset, uploads it directly to MinIO (or AWS S3), links the secure URL to the Task, and marks it COMPLETED.
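The first two steps of this lifecycle can be sketched in plain Python. This is a framework-free illustration, not the ai_core implementation: FEATURE_COSTS, Account, Task, and submit_generation are hypothetical names, a threading lock stands in for whatever atomic database update the real system uses, and a list stands in for the Celery/Redis queue.

```python
import threading
import uuid
from dataclasses import dataclass, field

# Hypothetical stand-in for the ProviderFeatureCost table: task_type -> credit cost.
FEATURE_COSTS = {"video_gen": 5}


class InsufficientCredits(Exception):
    """Raised when the balance cannot cover the requested feature."""


@dataclass
class Account:
    credits: int
    _lock: threading.Lock = field(default_factory=threading.Lock, repr=False)

    def deduct(self, amount: int) -> None:
        # Check and decrement under one lock so concurrent requests cannot overspend.
        with self._lock:
            if self.credits < amount:
                raise InsufficientCredits(f"need {amount}, have {self.credits}")
            self.credits -= amount


@dataclass
class Task:
    task_type: str
    id: uuid.UUID = field(default_factory=uuid.uuid4)
    status: str = "PENDING"


def submit_generation(account: Account, task_type: str, queue: list) -> tuple[int, dict]:
    """Deduct credits, record a PENDING task, enqueue it, and answer with 202."""
    cost = FEATURE_COSTS[task_type]
    account.deduct(cost)
    task = Task(task_type=task_type)
    queue.append(str(task.id))  # stands in for handing the ID to a Celery worker
    return 202, {"task_id": str(task.id), "status": task.status}


account = Account(credits=12)
queue: list = []
status_code, body = submit_generation(account, "video_gen", queue)
```

The point of the sketch is the ordering: the balance check and the deduction happen as one indivisible step before the task is created, so a request that cannot pay never reaches the queue.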
Webhooks & SSE
Real-Time Updates without Polling
Long-polling is inefficient. Root System pushes updates directly to the client.
By listening to the Server-Sent Events (SSE) endpoint at /api/v1/ai/stream/, clients receive real-time streams of their task statuses (PROCESSING, COMPLETED, FAILED). This is handled natively by the Daphne ASGI server, which easily maintains thousands of concurrent SSE connections.
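On the client side, consuming the stream comes down to parsing the text/event-stream format. The sketch below is a minimal parser using only the standard library. It assumes, purely for illustration, that each event carries a JSON object with task_id and status fields; the actual payload shape of /api/v1/ai/stream/ is not specified here, and iter_sse_events is a hypothetical helper name.

```python
import json
from typing import Iterable, Iterator


def iter_sse_events(lines: Iterable[str]) -> Iterator[dict]:
    """Yield one dict per SSE event from an iterable of decoded text lines."""
    event_type, data_lines = "message", []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line terminates the current event.
            if data_lines:
                yield {"event": event_type, "data": json.loads("\n".join(data_lines))}
            event_type, data_lines = "message", []
        elif line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        else:
            name, _, value = line.partition(":")
            value = value[1:] if value.startswith(" ") else value
            if name == "event":
                event_type = value
            elif name == "data":
                data_lines.append(value)


# Assumed payload shape (hypothetical): {"task_id": ..., "status": ...}
sample_stream = [
    ": keep-alive\n",
    'data: {"task_id": "abc", "status": "PROCESSING"}\n',
    "\n",
    'data: {"task_id": "abc", "status": "COMPLETED"}\n',
    "\n",
]
statuses = [event["data"]["status"] for event in iter_sse_events(sample_stream)]
```

In a real client the lines would come from a streaming HTTP response rather than a list, but the parsing loop stays the same.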
Creating a New AI Feature
In Root System, adding a new feature (for example, a video generator) follows a fixed sequence of steps.
Define your expected input in the appropriate app (my_video_app/serializers.py).
Create an endpoint extending BaseGenerationViewSet.
class VideoGenerationView(BaseGenerationViewSet):
    create_serializer_class = VideoGenerationSerializer
    detail_serializer_class = GenerationTaskDetailSerializer
    task_type = "video_gen"  # (1)

    def dispatch_task(self, task):
        from .tasks import process_video_generation_task

        # Best practice: cast UUID to string for Celery
        celery_task = process_video_generation_task.delay(str(task.id))
        return celery_task.id

(1) This task_type maps directly to the ProviderFeatureCost in the database.
Extend BaseGenerationTask and implement _run_generation in my_video_app/tasks.py.
from ai_core.tasks import BaseGenerationTask
from celery import current_app


class ProcessVideoGenerationTask(BaseGenerationTask):
    def _run_generation(self, task_instance):
        # Your vendor-specific logic here (1)
        # e.g. replicate.run(...)
        # Then link the results using the process_and_link_results utility
        pass


process_video_generation_task = current_app.register_task(ProcessVideoGenerationTask())

(1) All status updates, retries, refunds, and error handling are managed automatically by the BaseGenerationTask base class.
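To make that division of labor concrete, here is a rough sketch of the template-method pattern such a base class could follow. It is not the actual BaseGenerationTask: SketchGenerationTask, its run method, max_attempts, and the dict-based task and account are all hypothetical, chosen only to show how a subclass can supply _run_generation while the base handles status changes, retries, and refunds.

```python
class SketchGenerationTask:
    """Hypothetical stand-in for BaseGenerationTask, for illustration only."""

    max_attempts = 2

    def run(self, task: dict, account: dict) -> dict:
        task["status"] = "PROCESSING"
        for attempt in range(1, self.max_attempts + 1):
            try:
                task["result"] = self._run_generation(task)
                task["status"] = "COMPLETED"
                return task
            except Exception as exc:
                # Remember the latest failure and fall through to the next attempt.
                task["error"] = f"attempt {attempt}: {exc}"
        # Every attempt failed: mark the task and give the credits back.
        task["status"] = "FAILED"
        account["credits"] += task["cost"]
        return task

    def _run_generation(self, task: dict) -> str:
        raise NotImplementedError


class FlakyVideoTask(SketchGenerationTask):
    """Fails once, then succeeds, to exercise the retry path."""

    def __init__(self) -> None:
        self.calls = 0

    def _run_generation(self, task: dict) -> str:
        self.calls += 1
        if self.calls == 1:
            raise RuntimeError("vendor timeout")
        return "https://example.invalid/video.mp4"


class BrokenVideoTask(SketchGenerationTask):
    """Always fails, to exercise the refund path."""

    def _run_generation(self, task: dict) -> str:
        raise RuntimeError("vendor rejected prompt")


account = {"credits": 0}
ok = FlakyVideoTask().run({"cost": 5, "status": "PENDING"}, account)
failed = BrokenVideoTask().run({"cost": 5, "status": "PENDING"}, account)
```

The subclass contains only vendor logic. Whether it succeeds on a retry or fails outright, the bookkeeping stays in one place, which is the property the base class is described as providing.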
Finally, head to the Django Admin, create a ProviderFeatureCost for video_gen, and assign a token cost. From then on, the system handles credit deductions, race conditions, and task tracking for the new feature.