# Workflows

> Workflow orchestration for agent execution and training

Workflows orchestrate the interaction between an agent, an environment, and a rollout engine to execute multi-step tasks.

<Note>
  Workflows are the **older** of the two agent-authoring paths in rLLM. New
  agents should be authored as **AgentFlows** — see the
  [Cookbooks](/cookbooks/overview) tutorial and
  [`cookbooks/`](https://github.com/rllm-org/rllm/tree/main/cookbooks). Workflows
  remain supported for the use cases where you want explicit `BaseAgent` +
  `BaseEnv` separation; the seven cookbooks ship as examples of the
  AgentFlow alternative.
</Note>

## Workflow

Abstract base class for all workflows.

```python theme={null}
from rllm.workflows import Workflow
```

### Constructor

```python theme={null}
def __init__(
    self,
    rollout_engine: RolloutEngine,
    executor: ThreadPoolExecutor,
    timeout: float = 1e6,
    gamma: float = 0.0,
    reward_bonus_coeff: float = 0.0,
    **kwargs
)
```

<ParamField path="rollout_engine" type="RolloutEngine">
  The rollout engine for model inference.
</ParamField>

<ParamField path="executor" type="ThreadPoolExecutor">
  Thread pool executor for async operations.
</ParamField>

<ParamField path="timeout" type="float" default="1e6">
  Timeout for workflow execution in seconds.
</ParamField>

<ParamField path="gamma" type="float" default="0.0">
  Discount factor for reward computation. When set above 0, Monte Carlo returns are computed with this discount.
</ParamField>

<ParamField path="reward_bonus_coeff" type="float" default="0.0">
  Coefficient for reward shaping based on reward deltas.
</ParamField>
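
To make `gamma` and `reward_bonus_coeff` concrete, here is a minimal sketch of the textbook forms of both ideas: discounted Monte Carlo returns and delta-based reward shaping. The function names are hypothetical, and the exact formulas rLLM applies are not stated on this page, so treat this as an illustration under those assumptions rather than the library's implementation.

```python
def shape_rewards(rewards, bonus_coeff):
    """Assumed shaping rule: add bonus_coeff * (r_t - r_{t-1}) to each step after the first."""
    shaped = list(rewards)
    for t in range(1, len(rewards)):
        shaped[t] += bonus_coeff * (rewards[t] - rewards[t - 1])
    return shaped


def discounted_returns(rewards, gamma):
    """Textbook Monte Carlo return: G_t = r_t + gamma * G_{t+1}, computed back to front."""
    returns = [0.0] * len(rewards)
    running = 0.0
    for t in reversed(range(len(rewards))):
        running = rewards[t] + gamma * running
        returns[t] = running
    return returns


rewards = [0.0, 0.0, 1.0]
print(discounted_returns(rewards, gamma=0.5))   # [0.25, 0.5, 1.0]
print(shape_rewards(rewards, bonus_coeff=0.5))  # [0.0, 0.0, 1.5]
```

With `gamma=0.0` the returns equal the raw per-step rewards, which matches the default of leaving rewards untouched.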

### Methods

#### run

Execute the workflow on a single task. **Must be implemented by subclasses.**

```python theme={null}
async def run(self, task: dict, uid: str, **kwargs) -> Episode | None
```

<ParamField path="task" type="dict">
  The task to execute.
</ParamField>

<ParamField path="uid" type="str">
  Unique identifier for the task.
</ParamField>

<ResponseField name="episode" type="Episode | None">
  The generated episode.
</ResponseField>

#### run\_with\_termination\_handling

Wrapper around `run()` that handles termination events, errors, and timeouts.

```python theme={null}
episode = await workflow.run_with_termination_handling(task, uid)
```
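
The general shape of such a wrapper can be sketched with `asyncio.wait_for`. Everything below is a stand-in written for illustration: `Reason`, `Termination`, and `run_guarded` are hypothetical names, and the real method may classify outcomes differently.

```python
import asyncio
from enum import Enum


class Reason(Enum):  # hypothetical stand-in for TerminationReason
    ENV_DONE = "env_done"
    TIMEOUT = "timeout"
    ERROR = "error"


class Termination(Exception):  # hypothetical stand-in for TerminationEvent
    def __init__(self, reason):
        super().__init__(reason.value)
        self.reason = reason


async def run_guarded(run, timeout):
    """Await run() and report why it stopped instead of letting it raise."""
    try:
        result = await asyncio.wait_for(run(), timeout=timeout)
        return result, None  # finished normally, no termination reason
    except Termination as event:
        return None, event.reason
    except asyncio.TimeoutError:
        return None, Reason.TIMEOUT
    except Exception:
        return None, Reason.ERROR


async def slow():
    await asyncio.sleep(1.0)


async def finishes():
    raise Termination(Reason.ENV_DONE)


async def broken():
    raise ValueError("bad task")


async def main():
    outcomes = []
    for run in (slow, finishes, broken):
        _, reason = await run_guarded(run, timeout=0.01)
        outcomes.append(reason)
    return outcomes


reasons = asyncio.run(main())
print(reasons)
```

The point of the pattern is that callers always get a classified outcome back, so one failing or slow task does not abort a batch of rollouts.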

#### commit

Commit a trajectory for training.

```python theme={null}
workflow.commit(
    name="solver",
    agent=my_agent,
    reset=True
)
```

<ParamField path="name" type="str | None">
  Name for the trajectory.
</ParamField>

<ParamField path="agent" type="BaseAgent | None">
  Agent whose trajectory to commit.
</ParamField>

<ParamField path="trajectory" type="Trajectory | None">
  Trajectory to commit directly (alternative to agent).
</ParamField>

<ParamField path="reset" type="bool" default="False">
  Whether to reset the agent after committing.
</ParamField>

#### collect\_trajectories

Collect all trajectories, both committed ones and those held by agent instances, into a single episode.

```python theme={null}
episode = workflow.collect_trajectories()
```

<ResponseField name="episode" type="Episode">
  Episode containing all trajectories.
</ResponseField>

#### reset

Reset the workflow for a new task.

```python theme={null}
workflow.reset(task=new_task, uid="task_123:0")
```

<ParamField path="task" type="dict | None">
  The task to reset to.
</ParamField>

<ParamField path="uid" type="str | None">
  Unique identifier for the task.
</ParamField>

#### postprocess\_episode

Post-process episode after completion (compute rewards, metrics, etc.).

```python theme={null}
episode = workflow.postprocess_episode(
    episode,
    termination_reason=TerminationReason.ENV_DONE
)
```

***

## SimpleWorkflow

Simplified workflow for single-agent, single-turn tasks.

```python theme={null}
from rllm.workflows import SimpleWorkflow
from rllm.rewards import math_reward_fn

workflow = SimpleWorkflow(
    rollout_engine=engine,
    reward_function=math_reward_fn,
    executor=executor
)
```

### Constructor

```python theme={null}
def __init__(
    self,
    rollout_engine: RolloutEngine,
    reward_function: RewardFunction,
    **kwargs
)
```

<ParamField path="rollout_engine" type="RolloutEngine">
  Engine for model inference.
</ParamField>

<ParamField path="reward_function" type="RewardFunction">
  Function to compute rewards from task and action.
</ParamField>

### Methods

#### run

Execute the workflow:

```python theme={null}
episode = await workflow.run(
    task={"question": "What is 2+2?", "answer": "4"},
    uid="task_0:0"
)
```

The workflow automatically:

1. Extracts messages from task (supports `question`, `prompt`, `problem`, or `messages` keys)
2. Gets model response
3. Computes reward
4. Creates trajectory with step
5. Returns episode
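
Step 1 can be pictured as a small lookup over the supported keys. The helper below is a hypothetical sketch, not the library's code: the function name and the order in which keys are checked are assumptions.

```python
def task_to_messages(task):
    """Build a chat message list from a task dict (illustrative only)."""
    # A ready-made message list is passed through unchanged.
    if "messages" in task:
        return list(task["messages"])
    # Otherwise wrap the first matching text field as a single user turn.
    for key in ("question", "prompt", "problem"):
        if key in task:
            return [{"role": "user", "content": task[key]}]
    raise ValueError("task needs one of: messages, question, prompt, problem")


print(task_to_messages({"question": "What is 2+2?", "answer": "4"}))
# [{'role': 'user', 'content': 'What is 2+2?'}]
```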

***

## MultiTurnWorkflow

Workflow for multi-step agent-environment interactions.

```python theme={null}
from rllm.workflows import MultiTurnWorkflow
# Substitute your own BaseAgent subclass + BaseEnv subclass here.
# (The legacy ToolAgent / ToolEnvironment classes have been removed —
# port to an AgentFlow cookbook instead, see /core-concepts/agentflow-evaluator.)

workflow = MultiTurnWorkflow(
    agent_cls=MyAgent,
    env_cls=MyEnv,
    agent_args={...},
    env_args={"reward_fn": reward_fn},
    max_steps=5,
    rollout_engine=engine,
    executor=executor
)
```

### Constructor

```python theme={null}
def __init__(
    self,
    agent_cls: type | str,
    env_cls: type | str,
    agent_args: dict | None = None,
    env_args: dict | None = None,
    max_steps: int = 5,
    **kwargs
)
```

<ParamField path="agent_cls" type="type | str">
  Agent class (a `BaseAgent` subclass) or string identifier registered in `env_agent_mappings`.
</ParamField>

<ParamField path="env_cls" type="type | str">
  Environment class or string identifier.
</ParamField>

<ParamField path="agent_args" type="dict | None">
  Arguments to pass to agent constructor.
</ParamField>

<ParamField path="env_args" type="dict | None">
  Arguments to pass to environment constructor.
</ParamField>

<ParamField path="max_steps" type="int" default="5">
  Maximum number of steps before termination.
</ParamField>

### Methods

#### run

Execute the multi-step workflow:

```python theme={null}
episode = await workflow.run(task=task_data, uid="task_0:0")
```

The workflow:

1. Resets environment with task
2. Updates agent with initial observation
3. For each step:
   * Gets model response
   * Updates agent with response
   * Steps environment with action
   * Updates agent with new observation and reward
4. Terminates on `done=True` or max steps reached
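
The loop above can be sketched without rLLM installed. `EchoAgent`, `CountdownEnv`, `fake_model`, and `rollout` are hypothetical stand-ins with simplified method signatures, and reporting `"max_turns_exceeded"` when the step budget runs out is an assumption based on the `TerminationReason` values.

```python
import asyncio


class EchoAgent:  # hypothetical stand-in for a BaseAgent subclass
    def __init__(self):
        self.history = []

    def update_from_env(self, observation, reward, done):
        self.history.append(("env", observation, reward, done))

    def update_from_model(self, response):
        self.history.append(("model", response))
        return response  # in this toy, the response doubles as the action


class CountdownEnv:  # hypothetical stand-in for a BaseEnv subclass
    def __init__(self, start):
        self.remaining = start

    def reset(self):
        return f"{self.remaining} left"

    def step(self, action):
        self.remaining -= 1
        done = self.remaining == 0
        reward = 1.0 if done else 0.0
        return f"{self.remaining} left", reward, done


async def fake_model(observation):
    # Stand-in for a rollout engine call.
    return f"ack: {observation}"


async def rollout(agent, env, max_steps):
    observation = env.reset()
    agent.update_from_env(observation, reward=0.0, done=False)
    for step in range(1, max_steps + 1):
        response = await fake_model(observation)
        action = agent.update_from_model(response)
        observation, reward, done = env.step(action)
        agent.update_from_env(observation, reward, done)
        if done:
            return "env_done", step
    return "max_turns_exceeded", max_steps


print(asyncio.run(rollout(EchoAgent(), CountdownEnv(start=2), max_steps=5)))
# ('env_done', 2)
print(asyncio.run(rollout(EchoAgent(), CountdownEnv(start=9), max_steps=5)))
# ('max_turns_exceeded', 5)
```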

***

## TerminationReason

Enum for workflow termination reasons.

```python theme={null}
from rllm.workflows import TerminationReason

class TerminationReason(Enum):
    MAX_PROMPT_LENGTH_EXCEEDED = "max_prompt_length_exceeded"
    MAX_RESPONSE_LENGTH_EXCEEDED = "max_response_length_exceeded"
    ENV_DONE = "env_done"
    MAX_TURNS_EXCEEDED = "max_turns_exceeded"
    TIMEOUT = "timeout"
    UNKNOWN = "unknown"
    ERROR = "error"
```
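
Because each member's value is a plain string, standard `enum` behavior is enough to parse logged reasons back into members and tally them. The snippet redefines the enum locally so that it runs without rLLM installed; the `logged` list is made-up sample data.

```python
from collections import Counter
from enum import Enum


class TerminationReason(Enum):  # local copy of the listing above
    MAX_PROMPT_LENGTH_EXCEEDED = "max_prompt_length_exceeded"
    MAX_RESPONSE_LENGTH_EXCEEDED = "max_response_length_exceeded"
    ENV_DONE = "env_done"
    MAX_TURNS_EXCEEDED = "max_turns_exceeded"
    TIMEOUT = "timeout"
    UNKNOWN = "unknown"
    ERROR = "error"


# Look up members by value, then count how often each reason occurred.
logged = ["env_done", "timeout", "env_done"]
reasons = [TerminationReason(value) for value in logged]
tally = Counter(reason.name for reason in reasons)
print(tally)
```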

***

## Example: Custom Workflow

```python theme={null}
from rllm.workflows import Workflow
from rllm.agents import Episode, Trajectory, Step, Action
from rllm.workflows import TerminationEvent, TerminationReason

class SolverJudgeWorkflow(Workflow):
    def __init__(self, rollout_engine, **kwargs):
        super().__init__(rollout_engine, **kwargs)
        # MyAgent is a placeholder: substitute your own BaseAgent subclass.
        self.solver = MyAgent()
        self.judge = MyAgent()
    
    async def run(self, task, uid, **kwargs):
        # Reset for new task
        self.reset(task, uid)
        
        # Solver generates solution
        solver_messages = [{"role": "user", "content": task["question"]}]
        solver_output = await self.rollout_engine.get_model_response(
            solver_messages,
            application_id=uid
        )
        
        # Create solver step
        solver_step = Step.from_model_output(
            solver_output,
            messages=solver_messages,
            action=Action(solver_output.content)
        )
        self.solver.trajectory.steps.append(solver_step)
        
        # Judge evaluates solution
        judge_messages = [
            {"role": "user", "content": f"Evaluate: {solver_output.content}"}
        ]
        judge_output = await self.rollout_engine.get_model_response(
            judge_messages,
            application_id=uid
        )
        
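        # Compute rewards. Match on the leading word so that "incorrect"
        # (which contains "correct") does not count as a pass.
        verdict = judge_output.content.strip().lower()
        solver_reward = 1.0 if verdict.startswith("correct") else 0.0
        solver_step.reward = solver_reward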
        
        # Commit trajectories
        self.commit(name="solver", agent=self.solver)
        
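        # Signal completion by raising rather than returning an episode;
        # run_with_termination_handling() handles the termination event.
        raise TerminationEvent(TerminationReason.ENV_DONE)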
```
