# AgentWorkflowEngine

> Engine for executing workflow-based agent training and evaluation

The `AgentWorkflowEngine` manages workflow execution with built-in retry logic, episode logging, and parallel task processing.

<Note>
  The Workflow path is the older of the two agent-authoring paths. For new
  code, see the [AgentFlow & Evaluator](/core-concepts/agentflow-evaluator)
  protocol — it doesn't need a custom engine wrapper. The
  [`cookbooks/`](https://github.com/rllm-org/rllm/tree/main/cookbooks) directory
  has seven worked examples.
</Note>

## AgentWorkflowEngine

```python theme={null}
from rllm.engine import AgentWorkflowEngine
```

### Constructor

```python theme={null}
def __init__(
    self,
    workflow_cls: type[Workflow],
    workflow_args: dict,
    rollout_engine: RolloutEngine,
    config: dict | None = None,
    n_parallel_tasks: int = 128,
    retry_limit: int = 3,
    raise_on_error: bool = True,
    episode_logger: EpisodeLogger | None = None,
    **kwargs,
)
```

<ParamField path="workflow_cls" type="type[Workflow]">
  Workflow class to instantiate for each task.
</ParamField>

<ParamField path="workflow_args" type="dict">
  Arguments to pass to workflow instances.
</ParamField>

<ParamField path="rollout_engine" type="RolloutEngine">
  Engine for model inference and rollout.
</ParamField>

<ParamField path="config" type="dict | None">
  Optional configuration object for training.
</ParamField>

<ParamField path="n_parallel_tasks" type="int" default="128">
  Number of parallel workflow instances to maintain.
</ParamField>

<ParamField path="retry_limit" type="int" default="3">
  Maximum number of retry attempts for failed tasks.
</ParamField>

<ParamField path="raise_on_error" type="bool" default="True">
  Whether to raise exceptions on permanent failures.
</ParamField>

<ParamField path="episode_logger" type="EpisodeLogger | None">
  Optional logger for saving episode data to files.
</ParamField>

### Methods

#### initialize\_pool

Create the pool of `n_parallel_tasks` workflow instances. The examples on this page call it once, before the first `execute_batch`.

```python theme={null}
await engine.initialize_pool()
```
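One way to picture the pool is as a queue of pre-built workflow instances sized by `n_parallel_tasks`: each task borrows an instance, runs, and hands it back, so no more than `n_parallel_tasks` tasks run at once. The sketch below assumes that design; `DummyWorkflow` and `WorkflowPool` are hypothetical stand-ins, not rLLM classes.

```python
import asyncio


class DummyWorkflow:
    """Hypothetical stand-in for a Workflow subclass."""

    async def run(self, task: dict) -> str:
        await asyncio.sleep(0.01)  # simulate model/environment latency
        return f"done: {task['question']}"


class WorkflowPool:
    """Hypothetical pool: a fixed set of workflow instances shared by tasks."""

    def __init__(self, workflow_cls, workflow_args: dict, n_parallel_tasks: int):
        self.workflow_cls = workflow_cls
        self.workflow_args = workflow_args
        self.n_parallel_tasks = n_parallel_tasks
        self.queue: asyncio.Queue = asyncio.Queue()
        self.active = 0
        self.peak_active = 0

    async def initialize_pool(self) -> None:
        # Build every instance up front; the number of instances caps concurrency.
        for _ in range(self.n_parallel_tasks):
            await self.queue.put(self.workflow_cls(**self.workflow_args))

    async def run_task(self, task: dict) -> str:
        workflow = await self.queue.get()  # wait for a free instance
        self.active += 1
        self.peak_active = max(self.peak_active, self.active)
        try:
            return await workflow.run(task)
        finally:
            self.active -= 1
            await self.queue.put(workflow)  # return the instance to the pool


async def demo() -> tuple[list[str], int]:
    pool = WorkflowPool(DummyWorkflow, {}, n_parallel_tasks=2)
    await pool.initialize_pool()
    tasks = [{"question": f"q{i}"} for i in range(5)]
    results = await asyncio.gather(*(pool.run_task(t) for t in tasks))
    return results, pool.peak_active


results, peak = asyncio.run(demo())
print(results)
print("peak concurrency:", peak)
```

Five tasks share two instances here, so at most two run at the same time while the rest wait on the queue.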

#### set\_training\_step

Set the current training step, mode, and epoch that episode logging uses to organize saved episodes.

```python theme={null}
engine.set_training_step(step=100, mode="train", epoch=0)
```

<ParamField path="step" type="int">
  Current training step number.
</ParamField>

<ParamField path="mode" type="str" default="train">
  Mode identifier: "train" or "val".
</ParamField>

<ParamField path="epoch" type="int" default="0">
  Current epoch number.
</ParamField>

#### process\_task\_with\_retry

Run one rollout of a single task, retrying it when the episode ends with a retryable termination reason (as described under Retry Logic).

```python theme={null}
task_id, rollout_idx, episode = await engine.process_task_with_retry(
    task=task_data,
    task_id="task_123",
    rollout_idx=0
)
```

<ParamField path="task" type="dict">
  Task dictionary containing the task specification.
</ParamField>

<ParamField path="task_id" type="str">
  Unique identifier for the task.
</ParamField>

<ParamField path="rollout_idx" type="int">
  Index of this rollout among the rollouts generated for the task.
</ParamField>

<ResponseField name="task_id" type="str">
  The task ID.
</ResponseField>

<ResponseField name="rollout_idx" type="int">
  The rollout index.
</ResponseField>

<ResponseField name="episode" type="Episode">
  Completed episode.
</ResponseField>

#### execute\_batch

Execute a batch of tasks with automatic retry and error handling.

```python theme={null}
episodes = await engine.execute_batch(
    tasks=task_list,
    num_rollouts_per_task=4
)
```

<ParamField path="tasks" type="list[dict]">
  List of task dictionaries.
</ParamField>

<ParamField path="num_rollouts_per_task" type="int" default="1">
  Number of rollouts to generate per task.
</ParamField>

<ResponseField name="episodes" type="list[Episode]">
  List of completed episodes.
</ResponseField>
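A batch call can be thought of as fanning out into one `process_task_with_retry`-style call per `(task, rollout_idx)` pair and then regrouping the results. The sketch below assumes task IDs are simply list positions and that results are reordered by task and rollout index; `process_one` is a hypothetical stand-in that returns the same `(task_id, rollout_idx, episode)` tuple shape, and the `"task_id:rollout_idx"` uid format is illustrative only.

```python
import asyncio


async def process_one(task: dict, task_id: str, rollout_idx: int):
    """Hypothetical stand-in for process_task_with_retry."""
    await asyncio.sleep(0)  # yield to the event loop, as a real rollout would
    episode = {"task": task, "uid": f"{task_id}:{rollout_idx}"}
    return task_id, rollout_idx, episode


async def execute_batch(tasks: list[dict], num_rollouts_per_task: int = 1) -> list[dict]:
    # One coroutine per (task, rollout) pair; task IDs are just list positions here.
    coros = [
        process_one(task, str(i), r)
        for i, task in enumerate(tasks)
        for r in range(num_rollouts_per_task)
    ]
    # Collect in completion order, which need not match submission order.
    results = []
    for next_done in asyncio.as_completed(coros):
        results.append(await next_done)
    # Restore a stable order: grouped by task, then by rollout index.
    results.sort(key=lambda item: (int(item[0]), item[1]))
    return [episode for _, _, episode in results]


tasks = [{"question": "What is 2+2?"}, {"question": "Solve x + 5 = 10"}]
episodes = asyncio.run(execute_batch(tasks, num_rollouts_per_task=2))
print([e["uid"] for e in episodes])  # ['0:0', '0:1', '1:0', '1:1']
```

Two tasks with two rollouts each produce four episodes, with both rollouts of a task kept next to each other.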

***

## Retry Logic

The engine automatically retries tasks based on termination reason:

* **Retryable**: `TIMEOUT`, `ERROR`, `MAX_PROMPT_LENGTH_EXCEEDED`, `MAX_RESPONSE_LENGTH_EXCEEDED`
* **Non-retryable**: `ENV_DONE`, `MAX_TURNS_EXCEEDED`, `UNKNOWN`

Tasks are retried up to `retry_limit` times before failing permanently. On a permanent failure, the engine raises an exception if `raise_on_error` is `True`.
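The rule above can be sketched as a loop over attempts. This is a minimal illustration, not rLLM's code: it assumes `retry_limit` counts total attempts, and it uses a local `Reason` enum and a caller-supplied `run_once` function in place of the library's termination-reason type and workflow call.

```python
from enum import Enum, auto


class Reason(Enum):
    """Local stand-in for the termination reasons listed above."""

    ENV_DONE = auto()
    MAX_TURNS_EXCEEDED = auto()
    UNKNOWN = auto()
    TIMEOUT = auto()
    ERROR = auto()
    MAX_PROMPT_LENGTH_EXCEEDED = auto()
    MAX_RESPONSE_LENGTH_EXCEEDED = auto()


RETRYABLE = {
    Reason.TIMEOUT,
    Reason.ERROR,
    Reason.MAX_PROMPT_LENGTH_EXCEEDED,
    Reason.MAX_RESPONSE_LENGTH_EXCEEDED,
}


def run_with_retry(run_once, retry_limit: int = 3, raise_on_error: bool = True):
    """Call run_once() until it returns a non-retryable reason.

    Assumes retry_limit counts total attempts. Returns (attempts_used, reason).
    """
    if retry_limit < 1:
        raise ValueError("retry_limit must be at least 1")
    for attempt in range(1, retry_limit + 1):
        reason = run_once()
        if reason not in RETRYABLE:
            return attempt, reason  # finished, or a reason that is never retried
    # Every attempt ended with a retryable reason: treat as a permanent failure.
    if raise_on_error:
        raise RuntimeError(f"failed after {retry_limit} attempts: {reason.name}")
    return retry_limit, reason


outcomes = iter([Reason.TIMEOUT, Reason.ERROR, Reason.ENV_DONE])
attempts, reason = run_with_retry(lambda: next(outcomes))
print(attempts, reason.name)  # 3 ENV_DONE
```

With `raise_on_error=False`, the sketch returns the last retryable reason instead of raising, so the caller can record the failed episode.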

***

## Example: Simple Workflow Execution

```python theme={null}
import asyncio
from concurrent.futures import ThreadPoolExecutor
from rllm.engine import AgentWorkflowEngine
from rllm.engine.rollout import OpenAIEngine
from rllm.workflows import SimpleWorkflow
from rllm.rewards import math_reward_fn

# Create rollout engine
rollout_engine = OpenAIEngine(
    base_url="http://localhost:4000/v1",
    api_key="EMPTY",
    model="deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B"
)

# Create workflow engine
engine = AgentWorkflowEngine(
    workflow_cls=SimpleWorkflow,
    workflow_args={
        "reward_function": math_reward_fn,
        "executor": ThreadPoolExecutor(max_workers=128)
    },
    rollout_engine=rollout_engine,
    n_parallel_tasks=64,
    retry_limit=3
)

# Define tasks
tasks = [
    {"question": "What is 2+2?", "answer": "4"},
    {"question": "Solve x + 5 = 10", "answer": "x = 5"}
]

async def main():
    # Initialize pool
    await engine.initialize_pool()
    
    # Execute batch
    episodes = await engine.execute_batch(
        tasks=tasks,
        num_rollouts_per_task=2
    )
    
    # Print results
    for episode in episodes:
        print(f"Task: {episode.task}")
        print(f"Correct: {episode.is_correct}")
        print(f"Trajectories: {len(episode.trajectories)}")
        print(f"Metrics: {episode.metrics}")

asyncio.run(main())
```
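With `num_rollouts_per_task=2`, each question yields two episodes. A minimal sketch for summarizing them per question, assuming only the `task` and `is_correct` attributes the example already prints; `FakeEpisode` is a hypothetical stand-in, and its correctness flags are made up so the snippet runs without a model server.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class FakeEpisode:
    """Hypothetical stand-in exposing the two Episode fields used here."""

    task: dict
    is_correct: bool


def accuracy_by_question(episodes) -> dict[str, float]:
    # Group rollouts by question text, then average correctness within each group.
    flags_by_question = defaultdict(list)
    for ep in episodes:
        flags_by_question[ep.task["question"]].append(ep.is_correct)
    return {q: sum(flags) / len(flags) for q, flags in flags_by_question.items()}


episodes = [
    FakeEpisode({"question": "What is 2+2?", "answer": "4"}, True),
    FakeEpisode({"question": "What is 2+2?", "answer": "4"}, True),
    FakeEpisode({"question": "Solve x + 5 = 10", "answer": "x = 5"}, True),
    FakeEpisode({"question": "Solve x + 5 = 10", "answer": "x = 5"}, False),
]
print(accuracy_by_question(episodes))
# {'What is 2+2?': 1.0, 'Solve x + 5 = 10': 0.5}
```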

***

## Example: Multi-Turn Workflow

```python theme={null}
import asyncio
from concurrent.futures import ThreadPoolExecutor
from rllm.engine import AgentWorkflowEngine
from rllm.engine.rollout import OpenAIEngine
from rllm.workflows import MultiTurnWorkflow
from rllm.rewards import math_reward_fn

# MyAgent and MyEnv are placeholders: substitute your own BaseAgent / BaseEnv
# subclasses (the legacy ToolAgent / ToolEnvironment classes have been removed).

rollout_engine = OpenAIEngine(
    base_url="http://localhost:8000/v1",
    api_key="EMPTY",
    model="Qwen/Qwen3-4B"
)

engine = AgentWorkflowEngine(
    workflow_cls=MultiTurnWorkflow,
    workflow_args={
        "agent_cls": MyAgent,
        "env_cls": MyEnv,
        "agent_args": {...},  # placeholder: your agent's constructor arguments
        "env_args": {"reward_fn": math_reward_fn},
        "max_steps": 5,
        "executor": ThreadPoolExecutor(max_workers=128)
    },
    rollout_engine=rollout_engine,
    n_parallel_tasks=32,
    retry_limit=2
)

tasks = [
    {"question": "Find the population of Tokyo", "answer": "37.4 million"}
]

async def main():
    await engine.initialize_pool()
    episodes = await engine.execute_batch(tasks, num_rollouts_per_task=4)
    
    for episode in episodes:
        for traj in episode.trajectories:
            print(f"Agent: {traj.name}")
            print(f"Steps: {len(traj.steps)}")
            print(f"Reward: {traj.reward}")

asyncio.run(main())
```

***

## Episode Logging

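To save episodes to disk during training, pass an `EpisodeLogger` as `episode_logger`, then call `set_training_step` so each episode is filed under the current mode, epoch, and step: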

```python theme={null}
from rllm.utils import EpisodeLogger

logger = EpisodeLogger(output_dir="./episodes")

engine = AgentWorkflowEngine(
    workflow_cls=MyWorkflow,
    workflow_args=workflow_args,
    rollout_engine=rollout_engine,
    episode_logger=logger
)

# Set training context
engine.set_training_step(step=100, mode="train", epoch=0)

# Episodes will be saved to ./episodes/train/epoch_0/step_100/
```
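The directory in the comment above suggests a `<output_dir>/<mode>/epoch_<epoch>/step_<step>/` layout. The hypothetical helper below reproduces that layout, for example to locate logged episodes after a run; it assumes the pattern holds in general and is not part of `EpisodeLogger`.

```python
from pathlib import Path


def episode_dir(output_dir: str, mode: str, epoch: int, step: int) -> Path:
    """Hypothetical: build the directory layout the comment above describes."""
    if mode not in ("train", "val"):
        raise ValueError(f"unexpected mode: {mode!r}")
    return Path(output_dir) / mode / f"epoch_{epoch}" / f"step_{step}"


print(episode_dir("./episodes", "train", epoch=0, step=100).as_posix())
# episodes/train/epoch_0/step_100
```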
