Behavior Trees
Modular AI with Behavior Trees
Behavior trees provide modular, reusable, and maintainable AI! Learn composite nodes, leaf nodes, decorators, and how to build complex AI behaviors that are easy to understand and debug! ๐ณ๐ค๐ง
Understanding Behavior Trees
๐ณ The Decision Tree Analogy
Think of behavior trees like a flowchart for making decisions:
- Root: The starting point of all decisions
- Branches: Different paths based on conditions
- Leaves: Actual actions to perform
- Selector: Try options until one works (OR)
- Sequence: Do all steps in order (AND)
- Decorator: Modify behavior (repeat, invert)
Interactive Behavior Tree Demo
Watch the AI make decisions using behavior trees! Control the environment to see different behaviors!
Spawn Agents:
Agents: 0 | Resources: 0 | Enemies: 0 | Buildings: 0
Behavior Tree Implementation
from enum import Enum
from typing import Any, Callable, List, Optional
import random
class NodeState(Enum):
"""Node execution states"""
SUCCESS = "success"
FAILURE = "failure"
RUNNING = "running"
class BehaviorNode:
"""Base class for behavior tree nodes"""
def __init__(self, name: str = "Node") -> None:
self.name: str = name
self.parent: Optional['BehaviorNode'] = None
self.children: List['BehaviorNode'] = []
def add_child(self, child: 'BehaviorNode') -> 'BehaviorNode':
"""Add child node"""
child.parent = self
self.children.append(child)
return self
def tick(self, agent: Any, blackboard: dict) -> NodeState:
"""Execute node logic"""
return NodeState.SUCCESS
def reset(self) -> None:
"""Reset node state"""
for child in self.children:
child.reset()
# Composite Nodes
class Selector(BehaviorNode):
"""Try children until one succeeds (OR)"""
def __init__(self, name: str = "Selector") -> None:
super().__init__(name)
self.current_child: int = 0
def tick(self, agent: Any, blackboard: dict) -> NodeState:
for i in range(self.current_child, len(self.children)):
child = self.children[i]
result = child.tick(agent, blackboard)
if result == NodeState.SUCCESS:
self.current_child = 0
return NodeState.SUCCESS
elif result == NodeState.RUNNING:
self.current_child = i
return NodeState.RUNNING
self.current_child = 0
return NodeState.FAILURE
def reset(self) -> None:
super().reset()
self.current_child = 0
class Sequence(BehaviorNode):
"""Execute children in order (AND)"""
def __init__(self, name: str = "Sequence") -> None:
super().__init__(name)
self.current_child: int = 0
def tick(self, agent: Any, blackboard: dict) -> NodeState:
for i in range(self.current_child, len(self.children)):
child = self.children[i]
result = child.tick(agent, blackboard)
if result == NodeState.FAILURE:
self.current_child = 0
return NodeState.FAILURE
elif result == NodeState.RUNNING:
self.current_child = i
return NodeState.RUNNING
self.current_child = 0
return NodeState.SUCCESS
def reset(self) -> None:
super().reset()
self.current_child = 0
class Parallel(BehaviorNode):
"""Execute children simultaneously"""
def __init__(self, success_threshold: int = -1,
failure_threshold: int = -1,
name: str = "Parallel") -> None:
super().__init__(name)
self.success_threshold: int = success_threshold
self.failure_threshold: int = failure_threshold
def tick(self, agent: Any, blackboard: dict) -> NodeState:
success_count = 0
failure_count = 0
running_count = 0
for child in self.children:
result = child.tick(agent, blackboard)
if result == NodeState.SUCCESS:
success_count += 1
elif result == NodeState.FAILURE:
failure_count += 1
else:
running_count += 1
# Check thresholds
if self.success_threshold > 0 and success_count >= self.success_threshold:
return NodeState.SUCCESS
if self.failure_threshold > 0 and failure_count >= self.failure_threshold:
return NodeState.FAILURE
if running_count > 0:
return NodeState.RUNNING
return NodeState.SUCCESS if success_count > 0 else NodeState.FAILURE
class RandomSelector(BehaviorNode):
"""Select random child to execute"""
def __init__(self, name: str = "RandomSelector") -> None:
super().__init__(name)
def tick(self, agent: Any, blackboard: dict) -> NodeState:
if not self.children:
return NodeState.FAILURE
child = random.choice(self.children)
return child.tick(agent, blackboard)
# Decorator Nodes
class Decorator(BehaviorNode):
"""Base decorator node"""
def __init__(self, child: Optional[BehaviorNode] = None,
name: str = "Decorator") -> None:
super().__init__(name)
if child:
self.add_child(child)
class Inverter(Decorator):
"""Invert child result"""
def __init__(self, child: Optional[BehaviorNode] = None) -> None:
super().__init__(child, "Inverter")
def tick(self, agent: Any, blackboard: dict) -> NodeState:
if not self.children:
return NodeState.FAILURE
result = self.children[0].tick(agent, blackboard)
if result == NodeState.SUCCESS:
return NodeState.FAILURE
elif result == NodeState.FAILURE:
return NodeState.SUCCESS
else:
return NodeState.RUNNING
class Repeater(Decorator):
"""Repeat child execution"""
def __init__(self, child: Optional[BehaviorNode] = None,
times: int = -1) -> None:
super().__init__(child, f"Repeater({times})")
self.times: int = times
self.count: int = 0
def tick(self, agent: Any, blackboard: dict) -> NodeState:
if not self.children:
return NodeState.FAILURE
while self.times < 0 or self.count < self.times:
result = self.children[0].tick(agent, blackboard)
if result == NodeState.SUCCESS:
self.count += 1
if self.times > 0 and self.count >= self.times:
self.count = 0
return NodeState.SUCCESS
elif result == NodeState.FAILURE:
self.count = 0
return NodeState.FAILURE
else:
return NodeState.RUNNING
return NodeState.SUCCESS
class Cooldown(Decorator):
"""Add cooldown to child execution"""
def __init__(self, child: Optional[BehaviorNode] = None,
cooldown_time: float = 1.0) -> None:
super().__init__(child, f"Cooldown({cooldown_time}s)")
self.cooldown_time: float = cooldown_time
self.last_execution: float = 0
def tick(self, agent: Any, blackboard: dict) -> NodeState:
import time
current_time = time.time()
if current_time - self.last_execution < self.cooldown_time:
return NodeState.FAILURE
if not self.children:
return NodeState.FAILURE
result = self.children[0].tick(agent, blackboard)
if result == NodeState.SUCCESS:
self.last_execution = current_time
return result
# Leaf Nodes
class Condition(BehaviorNode):
"""Check condition"""
def __init__(self, condition_func: Callable, name: str = "Condition") -> None:
super().__init__(name)
self.condition_func: Callable = condition_func
def tick(self, agent: Any, blackboard: dict) -> NodeState:
if self.condition_func(agent, blackboard):
return NodeState.SUCCESS
return NodeState.FAILURE
class Action(BehaviorNode):
"""Execute action"""
def __init__(self, action_func: Callable, name: str = "Action") -> None:
super().__init__(name)
self.action_func: Callable = action_func
def tick(self, agent: Any, blackboard: dict) -> NodeState:
return self.action_func(agent, blackboard)
# Blackboard for sharing data
class Blackboard:
"""Shared memory for behavior tree"""
def __init__(self) -> None:
self.data: dict[str, Any] = {}
def get(self, key: str, default: Any = None) -> Any:
return self.data.get(key, default)
def set(self, key: str, value: Any) -> None:
self.data[key] = value
def has(self, key: str) -> bool:
return key in self.data
def remove(self, key: str) -> None:
if key in self.data:
del self.data[key]
# Behavior Tree Manager
class BehaviorTree:
"""Manage behavior tree execution"""
def __init__(self, root: BehaviorNode) -> None:
self.root: BehaviorNode = root
self.blackboard: Blackboard = Blackboard()
def tick(self, agent: Any) -> NodeState:
"""Execute behavior tree"""
return self.root.tick(agent, self.blackboard)
def reset(self) -> None:
"""Reset tree state"""
self.root.reset()
Best Practices
โก Behavior Tree Tips
- Modular Design: Create reusable subtrees
- Blackboard Pattern: Share data between nodes
- Priority Order: Most important behaviors first in selectors
- Fail Fast: Check conditions early in sequences
- Decorators: Add complexity without changing base behaviors
- Visual Debugging: Show active nodes during development
- Performance: Limit tree depth and complexity
- Testing: Unit test individual nodes
Key Takeaways
- ๐ณ Behavior trees provide modular AI architecture
- ๐ Composite nodes control execution flow
- ๐ Leaf nodes perform actions and checks
- ๐จ Decorators modify behavior without changes
- ๐ Blackboards enable data sharing
- ๐ง Easy to debug and maintain
- โป๏ธ Highly reusable components
- ๐ Visual representation aids understanding
๐๏ธโโ๏ธ Practice Exercise
๐๏ธโโ๏ธ Exercise 1: Worker Behavior Tree with Selector Priority + RUNNING-State Stickiness + Decorator Cooldown in One Pygame Window
Objective: Build a runnable pygame window in roughly 85 lines that shows three orthogonal behavior-tree disciplines visible per frame on a 24×15 cell grid (each cell 32px) with a 320px tree-visualization sidebar. The agent runs a Worker BT with a Selector root over three prioritized children: a Combat Sequence (EnemyNear? → Flee), a Gather Sequence (ResourceExists? → MoveToResource → Pickup → MoveHome → Deposit), and an Idle catch-all action. Press E to spawn an enemy near the agent — the root’s current_child snaps from 1 (gather) back to 0 (combat) on the next tick because the Selector is tried top-down and Combat’s EnemyNear? gate now returns SUCCESS, so Flee runs and whatever gather step was running gets abandoned (priority interrupt). Press R to spawn a resource — once Combat fails (no enemy) the Selector falls through to Gather and the gather Sequence’s current_child visibly progresses 0→1→2→3→4 across many frames as MoveToResource and MoveHome each return RUNNING for ~120 ticks before SUCCESS, demonstrating that the index persists across ticks rather than resetting to 0 every frame (RUNNING-state stickiness). Press D to toggle a Cooldown(2s) decorator wrapping Flee — the same Flee action gets a 2-second cooldown without any source change to the action itself, and the HUD shows the cooldown timer counting down independently (decorator pattern). Right-side sidebar renders the live tree with each node colored by last tick result (gray=IDLE, green=SUCCESS, red=FAILURE, yellow=RUNNING) and an amber outline on the currently-RUNNING leaf. HUD shows tick count, root result, root.cc, gather.cc, carry Y/N, score, cd remaining ms, and cooldown ON/OFF flag — three orthogonal BT disciplines visible per frame as concrete numbers and colors.
Instructions:
- Initialize pygame with a 768×480 window plus a 320px tree-visualization sidebar (1088×480 total); create an Agent at cell (4, 7) with attributes
pos,home,carrying=False,score=0, and a reference to the world. - Define a
NodeStateenum (SUCCESS / FAILURE / RUNNING) and aNodebase withname+last(for visualization coloring) + a defaulttick(agent)returning SUCCESS. - Build
SelectorandSequenceas composite subclasses of aCompositemixin that initializesself.children = []+self.cc = 0in__init__. Ontick(), iterate children starting fromself.cc;Selectorreturns SUCCESS-on-any-child-SUCCESS (OR-of-children short-circuit),Sequencereturns FAILURE-on-any-child-FAILURE (AND-of-children short-circuit). On RUNNING return, setself.cc = iso the next tick re-enters at the same index. On terminal SUCCESS or FAILURE, resetself.cc = 0so the next outer cycle starts fresh. - Build leaf nodes: a
Condwrapping a predicate (returns SUCCESS if true, FAILURE otherwise) and anActwrapping a function returning a NodeState (so multi-tick actions can return RUNNING). Conditions:EnemyNear?= enemies exist and nearest within 150px;ResExists?= resources list non-empty. - Build action functions:
flee(a)moves 4 px/tick away from nearest enemy and returns RUNNING until distance from home exceeds 25 px (then SUCCESS);move_to(a, target)moves 4 px/tick toward target, returns RUNNING until within 8 px (then SUCCESS);pickup,deposit, andidleare one-tick actions that return SUCCESS or FAILURE based on world state. - Build a
Cooldown(child, secs)decorator: returns FAILURE iftime.time() - self.t0 < self.secs, otherwise ticks the wrapped child and timestampsself.t0 = time.time()on a SUCCESS return. The wrapped child’s source code is never touched — that’s the decorator pattern’s whole point. - Compose the worker tree in a
build_tree()function that returns the root and a reference to the gather Sequence (so the HUD can readgather.cc):root = Selector('Worker'); combat = Sequence('Combat'); combat.add(Cond(EnemyNear)); combat.add(cd if cooldown_on else flee_act); gather = Sequence('Gather'); gather.add(Cond(ResExists)); gather.add(Act(move_to_res)); gather.add(Act(pickup)); gather.add(Act(move_home)); gather.add(Act(deposit)); root.add(combat); root.add(gather); root.add(Act(idle)). The child-list order IS the agent’s priority policy. - Main loop: key E spawns an enemy 80 px from agent; key R appends a random-position resource; key D toggles
cooldown_onand rebuilds the tree (combat sequence now contains the Cooldown wrapper instead of bare Flee, or vice versa); key Z resets agent to home and clears world; SPACE pauses. Each unpaused frame, callroot.tick(agent)and let each node record itslaststate for color rendering. - Render: agent as blue 12-px circle, home as a 24-px brown square at home cell, enemies as red 10-px circles, resources as gold 8-px circles. On the right sidebar, render the live tree (root at top, two-deep layout) with each node as a 140×22 colored rectangle (gray IDLE, green SUCCESS, red FAILURE, yellow RUNNING) and an amber 3-px outline on the currently-RUNNING leaf node.
- HUD line at top of game area:
tick=N | root=RESULT | root.cc=I | gather.cc=J | carry=Y/N | score=K | cd=Lms | cdON=B. Watchinggather.ccclimb 0→1→2→3→4 across many frames (instead of resetting to 0) is the RUNNING-state stickiness in action;root.ccsnapping from 1 (gather) back to 0 (combat) on E-press is the priority interrupt in action;cd=NNNmsticking down after a successful flee is the Cooldown decorator working without touching Flee’s source.
๐ก Hint
The single most important detail is the self.cc = i assignment on RUNNING return AND the self.cc = 0 reset on terminal SUCCESS/FAILURE in BOTH Selector and Sequence — both composites need the persistence for symmetric reasons. Without self.cc = i on RUNNING, a multi-tick MoveToResource would re-evaluate from index 0 each tick and the gather sequence would never reach Pickup. Without the reset to 0 on terminal results, the next gather cycle would start partway through a stale sequence. For the priority interrupt to work, place the Selector’s cc = 0 reset on the SUCCESS return path (not on RUNNING) so the next tick’s outer walk starts from index 0 and re-checks Combat’s EnemyNear? gate before falling through to Gather — if you reset on RUNNING too, you’d lose the in-flight Flee progress. For the Cooldown decorator, store last_success on the decorator instance (not on the wrapped child); rebuild the combat sequence in-place when D toggles rather than mutating the existing Flee node, so the source code of Flee never changes when the decorator goes on or off — that’s the decorator pattern’s whole point. The child-list order in add_child() calls is the priority encoding; add_child() uses list.append() which preserves insertion order, so the order you call combat.add(...), gather.add(...), then root.add(combat); root.add(gather); root.add(idle) IS the runtime priority of those three behaviors.
โ Example Solution
import pygame, sys, time, math, random
from enum import Enum
from typing import Any, Callable, Optional
pygame.init()
W, H, CELL, SIDEBAR = 768, 480, 32, 320
screen = pygame.display.set_mode((W + SIDEBAR, H))
font = pygame.font.SysFont('Consolas', 14)
clock = pygame.time.Clock()
class S(Enum):
SUCCESS, FAILURE, RUNNING = 'S', 'F', 'R'
def dist(a: Any, b: Any) -> float: return math.hypot(a[0]-b[0], a[1]-b[1])
class Node:
def __init__(self, name: str) -> None:
self.name: str = name
self.last: Optional[S] = None
def tick(self, a: Any) -> S: self.last = S.SUCCESS; return S.SUCCESS
class Composite(Node):
def __init__(self, name: str) -> None: super().__init__(name); self.children: list[Node] = []; self.cc: int = 0
def add(self, c: Node) -> 'Composite': self.children.append(c); return self
class Selector(Composite):
def tick(self, a: Any) -> S:
for i in range(self.cc, len(self.children)):
r = self.children[i].tick(a)
if r == S.SUCCESS: self.cc = 0; self.last = r; return r
if r == S.RUNNING: self.cc = i; self.last = r; return r
self.cc = 0; self.last = S.FAILURE; return S.FAILURE
class Sequence(Composite):
def tick(self, a: Any) -> S:
for i in range(self.cc, len(self.children)):
r = self.children[i].tick(a)
if r == S.FAILURE: self.cc = 0; self.last = r; return r
if r == S.RUNNING: self.cc = i; self.last = r; return r
self.cc = 0; self.last = S.SUCCESS; return S.SUCCESS
class Cond(Node):
def __init__(self, name: str, fn: Callable) -> None: super().__init__(name); self.fn: Callable = fn
def tick(self, a: Any) -> S: r = S.SUCCESS if self.fn(a) else S.FAILURE; self.last = r; return r
class Act(Node):
def __init__(self, name: str, fn: Callable) -> None: super().__init__(name); self.fn: Callable = fn
def tick(self, a: Any) -> S: r = self.fn(a); self.last = r; return r
class Cooldown(Node):
def __init__(self, child: Node, secs: float) -> None:
super().__init__(f'Cd({secs}s)')
self.child: Node = child
self.secs: float = secs
self.t0: float = 0
def tick(self, a: Any) -> S:
if time.time() - self.t0 < self.secs: self.last = S.FAILURE; return S.FAILURE
r = self.child.tick(a)
if r == S.SUCCESS: self.t0 = time.time()
self.last = r; return r
def remaining_ms(self) -> int: return max(0, int((self.secs - (time.time() - self.t0)) * 1000))
class World: pass
world = World(); world.enemies = []; world.resources = []
agent = type('Ag', (), {})(); agent.pos = [4*CELL, 7*CELL]; agent.home = (4*CELL, 7*CELL)
agent.carrying = False; agent.score = 0; agent.world = world
def flee(a: Any) -> S:
if not a.world.enemies: return S.FAILURE
e = a.world.enemies[0]; dx, dy = a.pos[0]-e[0], a.pos[1]-e[1]; d = math.hypot(dx, dy) or 1
a.pos[0] += dx/d * 4; a.pos[1] += dy/d * 4
return S.SUCCESS if dist(a.pos, a.home) > 60 else S.RUNNING
def move_to(a: Any, t: Any) -> S:
dx, dy = t[0]-a.pos[0], t[1]-a.pos[1]; d = math.hypot(dx, dy)
if d < 8: return S.SUCCESS
a.pos[0] += dx/d * 4; a.pos[1] += dy/d * 4; return S.RUNNING
def move_to_res(a: Any) -> S: return move_to(a, a.world.resources[0]) if a.world.resources else S.FAILURE
def move_home(a: Any) -> S: return move_to(a, a.home)
def pickup(a: Any) -> S:
if a.world.resources: a.world.resources.pop(0); a.carrying = True; return S.SUCCESS
return S.FAILURE
def deposit(a: Any) -> S:
if a.carrying and dist(a.pos, a.home) < 24: a.carrying = False; a.score += 1; return S.SUCCESS
return S.FAILURE
flee_act = Act('Flee', flee); cd = Cooldown(flee_act, 2.0); cooldown_on = False
def build_tree() -> tuple[Selector, Sequence]:
root = Selector('Worker'); combat = Sequence('Combat')
combat.add(Cond('EnemyNear?', lambda a: bool(a.world.enemies) and dist(a.pos, a.world.enemies[0]) < 150))
combat.add(cd if cooldown_on else flee_act)
gather = Sequence('Gather')
gather.add(Cond('ResExists?', lambda a: bool(a.world.resources)))
gather.add(Act('MoveRes', move_to_res)); gather.add(Act('Pickup', pickup))
gather.add(Act('MoveHome', move_home)); gather.add(Act('Deposit', deposit))
root.add(combat); root.add(gather); root.add(Act('Idle', lambda a: S.SUCCESS))
return root, gather
root, gather = build_tree(); tick_n = 0; paused = False
COLOR = {S.SUCCESS:(76,175,80), S.FAILURE:(244,67,54), S.RUNNING:(255,193,7), None:(158,158,158)}
def draw_tree(node: Node, x: int, y: int) -> None:
pygame.draw.rect(screen, COLOR[node.last], (x, y, 140, 22))
if node.last == S.RUNNING and not getattr(node, 'children', None):
pygame.draw.rect(screen, (255,140,0), (x, y, 140, 22), 3)
screen.blit(font.render(node.name, True, (0,0,0)), (x+4, y+3))
if hasattr(node, 'children'):
for i, c in enumerate(node.children): draw_tree(c, x+18, y+30+i*28)
while True:
for ev in pygame.event.get():
if ev.type == pygame.QUIT: pygame.quit(); sys.exit()
if ev.type == pygame.KEYDOWN:
if ev.key == pygame.K_e: world.enemies = [(agent.pos[0]+80, agent.pos[1])]
if ev.key == pygame.K_r: world.resources.append((random.randint(50, W-50), random.randint(50, H-50)))
if ev.key == pygame.K_d: cooldown_on = not cooldown_on; root, gather = build_tree()
if ev.key == pygame.K_z: agent.pos = list(agent.home); world.enemies = []; world.resources = []
if ev.key == pygame.K_SPACE: paused = not paused
if not paused: root.tick(agent); tick_n += 1
screen.fill((44,62,80))
pygame.draw.rect(screen, (121,85,72), (agent.home[0]-12, agent.home[1]-12, 24, 24))
for e in world.enemies: pygame.draw.circle(screen, (244,67,54), (int(e[0]), int(e[1])), 10)
for r in world.resources: pygame.draw.circle(screen, (255,215,0), (int(r[0]), int(r[1])), 8)
pygame.draw.circle(screen, (33,150,243), (int(agent.pos[0]), int(agent.pos[1])), 12)
hud = f'tick={tick_n} root={root.last.value if root.last else "-"} root.cc={root.cc} gather.cc={gather.cc} carry={"Y" if agent.carrying else "N"} score={agent.score} cd={cd.remaining_ms()}ms cdON={cooldown_on}'
screen.blit(font.render(hud, True, (255,255,255)), (10, 10))
pygame.draw.rect(screen, (30,30,30), (W, 0, SIDEBAR, H)); draw_tree(root, W+10, 30)
pygame.display.flip(); clock.tick(60)
๐ฏ Quick Quiz
Question 1: A behavior tree’s Selector and Sequence composite nodes both walk their children top-down each tick, but they short-circuit on different result kinds. Which statement most accurately captures the asymmetry?
Question 2: Both Selector and Sequence store a self.cc (current_child) index that gets set to i when child i returns RUNNING and reset to 0 on terminal SUCCESS or FAILURE. The lesson’s gather Sequence is [ResExists?, MoveToResource, Pickup, MoveHome, Deposit] and a 60Hz tick runs MoveToResource for ~120 frames before it returns SUCCESS. What does self.cc persistence across ticks accomplish?
Question 3: The lesson’s Worker tree composes Selector(root) with children [Combat Sequence, Gather Sequence, Idle] in that exact order. Pressing E in the demo spawns an enemy near the agent, and on the next tick the root’s cc snaps from 1 (gather) back to 0 (combat), interrupting whatever gather step was running. What does this child-list ordering encode, and why does the order matter?
What's Next?
Now that you've mastered behavior trees, next we'll explore flocking and swarm behaviors for creating realistic group AI!