Elemental | Documentation
Task Queue

The TaskQueue object holds the tasks assigned to the agent. It keeps track of those tasks and their statuses, and it provides the agent with context about them.

TaskQueue is meant to be populated with tasks created by the planning agent. The planning agent decomposes the user's original instruction into simpler tasks, each of which contributes to the final solution. The tasks are then assigned to the agent for execution.

class TaskQueue(BaseModel):
    _tasks: Dict[str, Task] = {}
    _status: Status = Status.BLOCKED
  • create_task_queue - Create a task queue from a list of tasks. Each entry in the list is a dictionary holding the JSON representation of one task. The method creates a Task object from each entry and adds it to the task queue.
    def create_task_queue(
            self,
            tasks: List[Dict[str, Any]],
            original_instruction: str,
            keep_ids: bool = False,
        ) -> Status
    
  • mark_tasks_with_results_as_done - Mark all tasks with results as DONE.
    def mark_tasks_with_results_as_done(self) -> None
    
  • update_tasks_statuses - Update the status of the tasks in the task queue. If a task has all its dependencies done, set the status of the task to READY.
    def update_tasks_statuses(self) -> None
    
  • add_context_to_all_tasks - Add information to the context of all tasks in the task queue.
    def add_context_to_all_tasks(self, identifier: str, information: str) -> None
    
  • fix_context_for_all_tasks - Fix the context of all tasks in the task queue. This method assumes that the context has not been properly set and results of dependent tasks are not available.
    def fix_context_for_all_tasks(self) -> None
    
  • update_task - Update the status of a task in the task queue. This method is used to update the status of a task when the task is completed or when the task is blocked.
    def update_task(
            self, 
            task_id: str, 
            status: Status, 
            result: Optional[str] = None
        ) -> None
    
  • get_task - Get a task from the task queue by task ID.
    def get_task(self, task_id: str) -> Task
    
  • get_all_tasks - Get all tasks from the task queue.
    def get_all_tasks(self) -> Dict[str, Task]
    
  • remove_task - Remove a task from the task queue by task ID.
    def remove_task(self, task_id: str) -> None
    
  • status - Get the status of the task queue. The status of the task queue can be READY, BLOCKED, DONE, FAILED or IN_PROGRESS.
    def status(self) -> Status
    
  • get_next_ready_tasks - Get the next ready task from the task queue. A task is ready to be executed when all of its dependencies are completed and the task is not blocked. This method returns the next ready task together with the status of the queue.
    def get_next_ready_tasks(self) -> Tuple[Task, Status]
    
  • print_tasks - Print the tasks in the task queue. This method is intended for debugging.
    def print_tasks(self) -> None
    
  • revise_task_queue - Revise the task queue based on the revised plan. The revised plan follows the same structure as the original plan.
    def revise_task_queue(
            self, 
            revised_plan: List[Dict[str, Any]], 
            original_instruction: str
        ) -> None
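
The dependency rule described for update_tasks_statuses (a task becomes READY once all of its dependencies are DONE) can be sketched in plain Python. This is a minimal illustration, not Elemental's implementation: MiniTask and promote_ready_tasks are hypothetical names introduced here, and the sketch assumes that only BLOCKED tasks are ever promoted.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List


class Status(Enum):
    READY = "ready"
    BLOCKED = "blocked"
    DONE = "done"


@dataclass
class MiniTask:
    """Hypothetical stand-in for Task, reduced to the fields used here."""
    id: str
    dependencies: List[str] = field(default_factory=list)
    status: Status = Status.BLOCKED


def promote_ready_tasks(tasks: Dict[str, MiniTask]) -> None:
    """Set BLOCKED tasks to READY once every dependency is DONE."""
    for task in tasks.values():
        if task.status is not Status.BLOCKED:
            continue  # assumption: only blocked tasks can be promoted
        if all(tasks[dep].status is Status.DONE for dep in task.dependencies):
            task.status = Status.READY


tasks = {
    "1": MiniTask("1"),
    "2": MiniTask("2", dependencies=["1"]),
    "3": MiniTask("3", dependencies=["1", "2"]),
}

promote_ready_tasks(tasks)       # task 1 has no dependencies, so it becomes READY
first_pass = {t.id: t.status.value for t in tasks.values()}

tasks["1"].status = Status.DONE  # pretend task 1 has finished
promote_ready_tasks(tasks)       # task 2 unblocks; task 3 still waits on task 2
second_pass = {t.id: t.status.value for t in tasks.values()}
```

In this sketch a task with an empty dependency list is promoted on the first pass, and a task is never promoted while any dependency is merely READY rather than DONE.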
    

Status

The Status enum represents the status of a task and of the task queue as a whole. A status can be READY, BLOCKED, DONE, FAILED or IN_PROGRESS. For the task queue, the status indicates whether the queue is ready to be executed or is blocked.

class Status(Enum):
    """
    Enum class for the status of a task. 
    The status can be READY, BLOCKED,
    DONE, FAILED or IN_PROGRESS.
    """

    READY = "ready"
    BLOCKED = "blocked"
    DONE = "done"
    FAILED = "failed"
    IN_PROGRESS = "in_progress"

    def __str__(self) -> str:
        """
        Return the string representation of the status.
        """
        return self.value
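
Because __str__ returns the member's value, a status converts directly to its lowercase string, which is convenient for logging and serialization. The short sketch below repeats the enum so that it runs on its own and shows the conversion in both directions using only the standard library.

```python
from enum import Enum


class Status(Enum):
    READY = "ready"
    BLOCKED = "blocked"
    DONE = "done"
    FAILED = "failed"
    IN_PROGRESS = "in_progress"

    def __str__(self) -> str:
        return self.value


# str() uses the __str__ override, so the result is the plain value
as_text = str(Status.IN_PROGRESS)

# Calling the enum with a value looks up the matching member
parsed = Status("done")
```

Without the __str__ override, str(Status.IN_PROGRESS) would give "Status.IN_PROGRESS" instead of "in_progress".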

Task

The Task class is used to represent a task in the task queue. A task has an ID, a description, a result, a status, dependencies, and context. The status of a task can be READY, BLOCKED, DONE, FAILED or IN_PROGRESS. Dependencies are the IDs of the tasks that this task depends on. Context is a dictionary to store the results of the tasks that this task depends on.

class Task(BaseModel):

    id: str
    description: str
    result: str = ""
    status: Status = Status.BLOCKED
    dependencies: List[str]
    context: Dict[str, str] = {}
    origin: str = ""

    def add_to_context(
            self, 
            identifier: str, 
            information: str
        ) -> None:
        ...
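
The body of add_to_context is elided above, so the following is only a sketch of one plausible behavior: storing the information in the context dictionary under the given identifier. SketchTask is a hypothetical dataclass stand-in, not the Elemental Task model, and using the dependency's task ID as the identifier is likewise an assumption.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class SketchTask:
    """Hypothetical stand-in for Task; not the Elemental class."""
    id: str
    description: str
    dependencies: List[str] = field(default_factory=list)
    result: str = ""
    context: Dict[str, str] = field(default_factory=dict)

    def add_to_context(self, identifier: str, information: str) -> None:
        # Assumed behavior: keep the information under the identifier
        self.context[identifier] = information


first = SketchTask(id="1", description="Collect data", result="42 rows")
second = SketchTask(id="2", description="Summarise data", dependencies=["1"])

# Hand the finished dependency's result to the task that depends on it
if first.id in second.dependencies:
    second.add_to_context(first.id, first.result)
```

After this runs, the second task's context holds the first task's result, which is the kind of information a dependent task would need when it is executed.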

Example of Using the Task Queue

Below is an example of how to use the task queue. The example creates a task queue with three tasks. The first task is READY and the other two are BLOCKED; the third task omits the status field and so takes the default, BLOCKED. The second task depends on the first, and the third depends on the first two. The example then retrieves the next ready task, updates its status to DONE with a result, and prints the tasks in the task queue.

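from typing import Any, Dict, List

# TaskQueue and Status come from the Elemental package, and `logger` is any
# configured logger; their import paths are not shown in this section.

INSTRUCTION = "Test instruction"
task_queue = TaskQueue()

logger.info("Creating a task queue with three tasks")

planned_tasks: List[Dict[str, Any]] = [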
    {
        "id": "1",
        "description": "Task 1",
        "result": "",
        "status": Status.READY,
        "dependencies": [],
        "context": {},
    },
    {
        "id": "2",
        "description": "Task 2",
        "result": "",
        "status": Status.BLOCKED,
        "dependencies": ["1"],
        "context": {},
    },
    {
        "id": "3",
        "description": "Task 3",
        "result": "",
        "dependencies": ["1", "2"],
        "context": {},
    },
]

task_queue.create_task_queue(planned_tasks, INSTRUCTION)
task_queue.print_tasks()

logger.info("Updating the status of the first task to DONE")
next_task, queue_status = task_queue.get_next_ready_tasks()
current_id = next_task.id
logger.info(f"Next ready task: {next_task}")
task_queue.update_task(current_id, Status.DONE, "Task 1 result")
task_queue.print_tasks()