Skip to content

AI Core & Tasks

The ai_core app is the central nervous system of the Apptork Root System. It abstracts the chaotic world of third-party AI providers into a clean, unified, asynchronous interface.


The AI Pipeline

When a client requests a generation, the system follows a strict, asynchronous lifecycle:

  1. API Request & Deduction: The client hits the endpoint. The system validates the payload, calculates the cost via ProviderFeatureCost, atomically deducts the credits, and creates a Task record with a PENDING status.
  2. Celery Orchestration: The API immediately returns a 202 Accepted alongside a Task ID, offloading the heavy work to a Celery Worker via Redis.
  3. Provider Execution: The worker utilizes the BasePayloadTransformer to standardize the prompt and sends it to the AI vendor (e.g., Replicate).
  4. Storage & Webhooks: Upon completion, the worker downloads the generated asset, uploads it directly to MinIO (or AWS S3), links the secure URL to the Task, and marks it COMPLETED.

Webhooks & SSE

Real-Time Updates without Polling

Long-polling is inefficient. Root System pushes updates directly to the client.

By listening to the Server-Sent Events (SSE) endpoint at /api/v1/ai/stream/, clients receive real-time streams of their task statuses (PROCESSING, COMPLETED, FAILED). This is handled natively by the Daphne ASGI server, which easily maintains thousands of concurrent SSE connections.


Creating a New AI Feature

Root System makes adding a new feature (like a novel Video Generator) incredibly structured.

Define your expected input in the appropriate app (my_video_app/serializers.py).

serializers.py
class VideoGenerationSerializer(serializers.Serializer):
    prompt = serializers.CharField(max_length=1000)
    aspect_ratio = serializers.ChoiceField(choices=["16:9", "9:16"])

Create an endpoint extending BaseGenerationViewSet.

views.py
class VideoGenerationView(BaseGenerationViewSet):
    create_serializer_class = VideoGenerationSerializer
    detail_serializer_class = GenerationTaskDetailSerializer
    task_type = "video_gen" # (1)!

    def dispatch_task(self, task):
        # Best practice: cast UUID to string for Celery
        from .tasks import process_video_generation_task
        celery_task = process_video_generation_task.delay(str(task.id))
        return celery_task.id

  1. This task_type maps directly to the ProviderFeatureCost in the database!

Extend BaseGenerationTask and implement _run_generation in my_video_app/tasks.py.

tasks.py
from ai_core.tasks import BaseGenerationTask
from celery import current_app

class ProcessVideoGenerationTask(BaseGenerationTask):
    def _run_generation(self, task_instance):
        # Your vendor-specific logic here (1)!
        # e.g. replicate.run(...)
        # Then link the results using process_and_link_results utility
        pass

process_video_generation_task = current_app.register_task(ProcessVideoGenerationTask())

  1. All status updates, retries, refunds, and error handling are managed automatically by the BaseGenerationTask base class.

Head to the Django Admin, create a ProviderFeatureCost for video_gen, assign a token cost, and the system instantly handles all credit deductions, race-conditions, and task tracking for your new feature!