Event Systems
Creating Decoupled Communication Between Systems
Event systems enable game components to communicate without direct dependencies! They create loose coupling between systems, making your code more maintainable, testable, and flexible. Let's master this powerful pattern! 📡🎮
Understanding Event Systems
📻 The Radio Broadcast Analogy
Think of event systems like radio broadcasting:
- Events: Radio signals being broadcast
- Publishers: Radio stations sending signals
- Subscribers: Radio receivers tuned to specific frequencies
- Event Bus: The airwaves carrying all signals
- Decoupling: Stations don't need to know who's listening
Interactive Event System Visualizer
Watch events flow through the system in real-time!
System Subscriptions:
Events Emitted: 0 | Queue Size: 0
Basic Event System Implementation
import time
from dataclasses import dataclass
from enum import Enum, auto
from queue import PriorityQueue, Queue
from typing import Any, Callable, Dict, List, Optional
class EventType(Enum):
    """Enumeration of game event types.

    New members are appended at the end so the auto() values of the
    existing members do not change.
    """

    # Game Events
    GAME_START = auto()
    GAME_PAUSE = auto()
    GAME_RESUME = auto()
    GAME_OVER = auto()

    # Player Events
    PLAYER_SPAWN = auto()
    PLAYER_DEATH = auto()
    PLAYER_JUMP = auto()
    PLAYER_SHOOT = auto()

    # Combat Events
    DAMAGE_DEALT = auto()
    DAMAGE_TAKEN = auto()
    ENEMY_KILLED = auto()

    # Item Events
    ITEM_COLLECTED = auto()
    ITEM_USED = auto()
    ITEM_DROPPED = auto()

    # Level Events
    LEVEL_START = auto()
    LEVEL_COMPLETE = auto()
    CHECKPOINT_REACHED = auto()

    # UI Events
    BUTTON_CLICKED = auto()
    MENU_OPENED = auto()
    MENU_CLOSED = auto()

    # System Events (emitted by AchievementSystem and SaveSystem)
    ACHIEVEMENT_UNLOCKED = auto()
    GAME_SAVED = auto()

    # Generic event type used by EventChannel.broadcast
    CUSTOM = auto()
@dataclass
class Event:
    """Base class for all events.

    ``data`` and ``timestamp`` may be omitted (or passed as None); they are
    filled in by ``__post_init__`` so every constructed event ends up with a
    dict payload and a creation time.
    """

    event_type: EventType
    # None is only a placeholder: replaced by a fresh dict per instance below,
    # which avoids sharing one mutable default between events.
    data: Optional[Dict[str, Any]] = None
    # None is only a placeholder: replaced by time.time() below.
    timestamp: Optional[float] = None
    priority: int = 5  # Default priority (1 = highest, 10 = lowest)

    def __post_init__(self) -> None:
        """Fill in the creation time and an empty payload when not supplied."""
        if self.timestamp is None:
            self.timestamp = time.time()
        if self.data is None:
            self.data = {}
class EventBus:
    """Central event distribution system.

    Listeners register per EventType. ``emit`` puts the event on the queue
    and drains the queue right away unless a drain is already running, in
    which case the running drain picks the new event up. ``emit_immediate``
    skips the queue entirely.
    """

    def __init__(self):
        self.listeners: Dict[EventType, List[Callable]] = {}
        self.event_queue: Queue = Queue()
        # True while process_events is draining; stops emit() from starting
        # a nested drain when a handler emits another event.
        self.processing = False

    @staticmethod
    def _callback_name(callback: Callable) -> str:
        """Return a printable name, also for callables lacking ``__name__``
        (for example functools.partial objects)."""
        return getattr(callback, "__name__", repr(callback))

    def subscribe(self, event_type: EventType, callback: Callable):
        """Subscribe to an event type; a callback is registered at most once."""
        registered = self.listeners.setdefault(event_type, [])
        if callback not in registered:
            registered.append(callback)
            print(f"Subscribed {self._callback_name(callback)} to {event_type.name}")

    def unsubscribe(self, event_type: EventType, callback: Callable):
        """Unsubscribe from an event type; unknown callbacks are ignored."""
        registered = self.listeners.get(event_type)
        if registered is not None and callback in registered:
            registered.remove(callback)
            print(f"Unsubscribed {self._callback_name(callback)} from {event_type.name}")

    def emit(self, event: Event):
        """Emit an event to all subscribers"""
        # Add to queue for processing
        self.event_queue.put(event)
        # Process immediately if not already processing
        if not self.processing:
            self.process_events()

    def emit_immediate(self, event: Event):
        """Emit an event immediately, bypassing the queue"""
        self._dispatch_event(event)

    def process_events(self):
        """Process all queued events.

        A call made while a drain is already in progress returns at once;
        the active drain will reach any newly queued events.
        """
        if self.processing:
            return
        self.processing = True
        try:
            while not self.event_queue.empty():
                self._dispatch_event(self.event_queue.get())
        finally:
            # Reset even if something other than Exception escapes a handler,
            # otherwise emit() would never drain the queue again.
            self.processing = False

    def _dispatch_event(self, event: Event):
        """Dispatch event to all listeners.

        An exception raised by one listener is printed and does not stop
        delivery to the remaining listeners.
        """
        # Iterate over a snapshot so a handler may subscribe/unsubscribe
        # during dispatch without other listeners being skipped.
        for callback in list(self.listeners.get(event.event_type, ())):
            try:
                callback(event)
            except Exception as e:
                print(f"Error in event handler {self._callback_name(callback)}: {e}")

    def clear(self):
        """Clear all listeners and queued events"""
        self.listeners.clear()
        # Drain in place rather than replacing the queue object, so a
        # subclass that installed a different queue type keeps it.
        while not self.event_queue.empty():
            self.event_queue.get()
# Global event bus instance (module-level EventBus created at import time)
event_bus = EventBus()
Advanced Event Patterns
# Priority Queue Event System
class PriorityEventBus(EventBus):
    """Event bus with priority queue processing.

    Lower ``priority`` numbers are dispatched first; among equal priorities
    the earlier ``timestamp`` wins, and a per-bus sequence number breaks any
    remaining tie in emit order.
    """

    def __init__(self):
        super().__init__()
        self.event_queue = PriorityQueue()
        # Monotonic counter placed ahead of the event in each queue entry so
        # two entries never compare equal up to the Event itself (Event
        # defines no ordering, so that comparison would raise TypeError).
        self._sequence = 0

    def emit(self, event: Event):
        """Emit event with priority"""
        self._sequence += 1
        # Entries are (priority, timestamp, sequence, event) tuples
        self.event_queue.put(
            (event.priority, event.timestamp, self._sequence, event)
        )
        if not self.processing:
            self.process_events()

    def process_events(self):
        """Process events in priority order.

        A call made while a drain is already in progress returns at once.
        """
        if self.processing:
            return
        self.processing = True
        try:
            while not self.event_queue.empty():
                # The event is always the last element of the entry.
                event = self.event_queue.get()[-1]
                self._dispatch_event(event)
        finally:
            self.processing = False
# Event Manager with filtering
class EventManager:
    """Event management with optional per-subscription filtering and a
    bounded history of emitted events."""

    def __init__(self):
        self.event_bus = EventBus()
        self.event_filters: Dict[str, Callable] = {}
        self.event_history: List[Event] = []
        self.max_history = 100

    def subscribe_with_filter(self, event_type: EventType,
                              callback: Callable,
                              filter_func: Optional[Callable] = None) -> Callable:
        """Subscribe with optional filtering.

        Returns the callable that was actually registered on the bus: the
        callback itself, or the filtering wrapper when ``filter_func`` is
        given. Pass that return value to ``event_bus.unsubscribe`` to remove
        the subscription again.
        """
        if not filter_func:
            self.event_bus.subscribe(event_type, callback)
            return callback

        def filtered_callback(event):
            # Forward only the events the filter accepts.
            if filter_func(event):
                callback(event)

        self.event_bus.subscribe(event_type, filtered_callback)
        return filtered_callback

    def emit_and_record(self, event: Event):
        """Emit event and keep history"""
        self.event_history.append(event)
        # Drop the oldest entries; computing the excess also copes with
        # max_history having been lowered (or set to 0) after construction.
        excess = len(self.event_history) - self.max_history
        if excess > 0:
            del self.event_history[:excess]
        self.event_bus.emit(event)

    def get_recent_events(self, event_type: Optional[EventType] = None,
                          count: int = 10) -> List[Event]:
        """Get up to ``count`` most recent events, oldest first.

        When ``event_type`` is given only events of that type are considered.
        A ``count`` of zero or less yields an empty list.
        """
        if count <= 0:
            # A slice of [-0:] would return the whole list, not nothing.
            return []
        if event_type is not None:
            matching = [e for e in self.event_history
                        if e.event_type == event_type]
            return matching[-count:]
        return self.event_history[-count:]
# Event Channel System
class EventChannel:
    """A named list of subscribers that can be switched on and off."""

    def __init__(self, name: str):
        self.name = name
        self.subscribers: List[Callable] = []
        self.enabled = True

    def subscribe(self, callback: Callable):
        """Add a callback to the channel unless it is already present."""
        if callback in self.subscribers:
            return
        self.subscribers.append(callback)

    def unsubscribe(self, callback: Callable):
        """Remove a callback from the channel; unknown callbacks are ignored."""
        if callback not in self.subscribers:
            return
        self.subscribers.remove(callback)

    def broadcast(self, data: Any):
        """Wrap ``data`` in a CUSTOM event and hand it to every subscriber.

        Does nothing while the channel is disabled. An exception from one
        subscriber is printed and the remaining subscribers still run.
        """
        if not self.enabled:
            return

        payload = {'channel': self.name, 'payload': data}
        message = Event(event_type=EventType.CUSTOM, data=payload)

        for listener in self.subscribers:
            try:
                listener(message)
            except Exception as e:
                print(f"Error in channel {self.name}: {e}")

    def toggle(self, enabled: bool):
        """Set whether broadcasts are delivered."""
        self.enabled = enabled
class ChannelManager:
    """Manages multiple event channels, keyed by name."""

    def __init__(self):
        self.channels: Dict[str, EventChannel] = {}

    def create_channel(self, name: str) -> EventChannel:
        """Return the channel called ``name``, creating it on first use."""
        if name not in self.channels:
            self.channels[name] = EventChannel(name)
        return self.channels[name]

    def get_channel(self, name: str) -> Optional[EventChannel]:
        """Return the existing channel called ``name``, or None if there is none."""
        return self.channels.get(name)

    def remove_channel(self, name: str):
        """Remove a channel; removing an unknown name is a no-op."""
        self.channels.pop(name, None)
Game Systems Using Events
# Achievement System using events
class AchievementSystem:
    """Tracks achievements through events.

    Counts ENEMY_KILLED and ITEM_COLLECTED events, checks LEVEL_COMPLETE
    times, and emits ACHIEVEMENT_UNLOCKED on the bus when a requirement
    from the achievement table is met.
    """

    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        self.achievements = {}
        self.unlocked = set()
        self.progress = {}

        # Build the table before subscribing so no handler can ever run
        # against an empty table.
        self.setup_achievements()

        # Subscribe to relevant events
        self.event_bus.subscribe(EventType.ENEMY_KILLED, self.on_enemy_killed)
        self.event_bus.subscribe(EventType.ITEM_COLLECTED, self.on_item_collected)
        self.event_bus.subscribe(EventType.LEVEL_COMPLETE, self.on_level_complete)

    def setup_achievements(self):
        """Define achievements.

        ``type`` names the progress counter (or measured value) that
        ``requirement`` is compared against.
        """
        self.achievements = {
            'first_kill': {
                'name': 'First Blood',
                'description': 'Defeat your first enemy',
                'requirement': 1,
                'type': 'enemy_kills',
            },
            'collector': {
                'name': 'Collector',
                'description': 'Collect 100 items',
                'requirement': 100,
                'type': 'items_collected',
            },
            'speedrun': {
                'name': 'Speed Runner',
                'description': 'Complete level in under 60 seconds',
                'requirement': 60,
                'type': 'level_time',
            },
        }

    def _bump(self, counter: str) -> None:
        """Increase a progress counter by one, starting it at zero."""
        self.progress[counter] = self.progress.get(counter, 0) + 1

    def on_enemy_killed(self, event: Event):
        """Handle enemy killed events"""
        self._bump('enemy_kills')
        self.check_achievement('first_kill')

    def on_item_collected(self, event: Event):
        """Handle item collection events"""
        self._bump('items_collected')
        self.check_achievement('collector')

    def on_level_complete(self, event: Event):
        """Handle level completion events.

        Unlocks 'speedrun' when the event carries a ``completion_time``
        below the requirement stored in the achievement table.
        """
        completion_time = event.data.get('completion_time')
        speedrun = self.achievements.get('speedrun')
        if completion_time is None or speedrun is None:
            return
        # Read the limit from the table instead of repeating the constant.
        if completion_time < speedrun['requirement']:
            self.unlock_achievement('speedrun')

    def check_achievement(self, achievement_id: str):
        """Unlock a counter-based achievement once its counter reaches the
        requirement; already unlocked or unknown ids are ignored."""
        if achievement_id in self.unlocked:
            return
        achievement = self.achievements.get(achievement_id)
        if not achievement:
            return
        current = self.progress.get(achievement['type'])
        if current is not None and current >= achievement['requirement']:
            self.unlock_achievement(achievement_id)

    def unlock_achievement(self, achievement_id: str):
        """Unlock an achievement and announce it on the bus.

        Does nothing for ids that are already unlocked or not in the table.
        """
        if achievement_id in self.unlocked:
            return
        # Look the entry up first: an unknown id must not be recorded as
        # unlocked.
        achievement = self.achievements.get(achievement_id)
        if achievement is None:
            return
        self.unlocked.add(achievement_id)

        # Emit achievement unlocked event
        self.event_bus.emit(Event(
            event_type=EventType.ACHIEVEMENT_UNLOCKED,
            data={
                'achievement_id': achievement_id,
                'name': achievement['name'],
                'description': achievement['description'],
            }
        ))
        print(f"Achievement Unlocked: {achievement['name']}")
# Audio System using events
class AudioSystem:
    """Reacts to gameplay events on the bus by playing a matching sound."""

    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        self.sounds = {}
        self.music_volume = 0.7
        self.sfx_volume = 0.8

        # Event type -> handler, registered in this order.
        wiring = (
            (EventType.PLAYER_JUMP, self.on_player_jump),
            (EventType.PLAYER_SHOOT, self.on_player_shoot),
            (EventType.ENEMY_KILLED, self.on_enemy_killed),
            (EventType.ITEM_COLLECTED, self.on_item_collected),
            (EventType.LEVEL_COMPLETE, self.on_level_complete),
        )
        for kind, handler in wiring:
            self.event_bus.subscribe(kind, handler)

    def on_player_jump(self, event: Event):
        """Jump effect at the SFX volume."""
        self.play_sound('jump', self.sfx_volume)

    def on_player_shoot(self, event: Event):
        """Shoot effect at the SFX volume."""
        self.play_sound('shoot', self.sfx_volume)

    def on_enemy_killed(self, event: Event):
        """Death effect chosen by the event's 'enemy_type' ('default' if absent)."""
        variant = event.data.get('enemy_type', 'default')
        self.play_sound(f'enemy_death_{variant}', self.sfx_volume)

    def on_item_collected(self, event: Event):
        """Pickup effect chosen by the event's 'item_type' ('default' if absent)."""
        variant = event.data.get('item_type', 'default')
        self.play_sound(f'pickup_{variant}', self.sfx_volume)

    def on_level_complete(self, event: Event):
        """Victory cue at the music volume."""
        self.play_sound('victory', self.music_volume)

    def play_sound(self, sound_name: str, volume: float):
        """Stand-in for real playback: only prints what would be played."""
        print(f"Playing sound: {sound_name} at volume {volume}")
# Save System using events
class SaveSystem:
    """Keeps a save-data dict up to date from bus events and saves on
    checkpoints, level completion, and (time-gated) item pickups."""

    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        self.save_data = {}
        self.auto_save_enabled = True
        self.last_save_time = time.time()
        self.save_interval = 60  # seconds between item-triggered auto-saves

        # Events that feed or trigger a save, registered in this order.
        for kind, handler in (
            (EventType.CHECKPOINT_REACHED, self.on_checkpoint),
            (EventType.LEVEL_COMPLETE, self.on_level_complete),
            (EventType.ITEM_COLLECTED, self.on_item_collected),
        ):
            self.event_bus.subscribe(kind, handler)

    def on_checkpoint(self, event: Event):
        """Record the checkpoint id from the event and save."""
        self.save_data['last_checkpoint'] = event.data.get('checkpoint_id')
        self.save_game()

    def on_level_complete(self, event: Event):
        """Append the finished level id to 'completed_levels' and save."""
        finished = event.data.get('level_id')
        self.save_data.setdefault('completed_levels', []).append(finished)
        self.save_game()

    def on_item_collected(self, event: Event):
        """Append the item id to 'inventory'; save only if auto-save is on
        and more than ``save_interval`` seconds passed since the last save."""
        picked_up = event.data.get('item_id')
        self.save_data.setdefault('inventory', []).append(picked_up)

        if not self.auto_save_enabled:
            return
        elapsed = time.time() - self.last_save_time
        if elapsed > self.save_interval:
            self.save_game()

    def save_game(self):
        """Stamp the save data, print it, and emit GAME_SAVED on the bus."""
        self.save_data['timestamp'] = time.time()
        print(f"Game saved: {self.save_data}")
        self.last_save_time = time.time()

        saved = Event(
            event_type=EventType.GAME_SAVED,
            data={'save_data': self.save_data},
        )
        self.event_bus.emit(saved)
Complete Event-Driven Game Example
import pygame
from typing import List
class EventDrivenGame:
    """Pygame loop wired to a PriorityEventBus.

    The game keeps no references to the subscriber systems: each one
    registers its own bound handlers on the bus in its ``__init__``, and the
    bus's listener lists are what keep those objects alive. Adding another
    subscriber needs no change to the input handling or the main loop.

    Dispatch timing: ``PriorityEventBus.emit`` drains the queue right away
    whenever the bus is idle, so an event emitted from ``handle_input``
    reaches its listeners before ``emit`` returns. An event only waits in
    the queue (and only then is ordered by priority) when it is emitted from
    inside a handler while a drain is already running.
    """

    def __init__(self) -> None:
        pygame.init()
        self.screen = pygame.display.set_mode((600, 400))
        pygame.display.set_caption("Event-Driven Game")
        self.clock = pygame.time.Clock()
        self.event_bus = PriorityEventBus()
        # Subscribers wire themselves into the bus; Game does not retain references:
        AchievementSystem(self.event_bus)
        AudioSystem(self.event_bus)
        SaveSystem(self.event_bus)
        self.player = Player(300, 200, self.event_bus)
        self.score = 0
        self.running = True
        self.event_bus.emit(Event(event_type=EventType.GAME_START, priority=1))

    def handle_input(self, key: int) -> None:
        """Translate pygame KEYDOWN events into game events on the bus.

        SPACE emits PLAYER_JUMP, X emits ENEMY_KILLED and adds its points to
        the score, H applies 25 damage to the player.
        """
        if key == pygame.K_SPACE:
            self.event_bus.emit(Event(
                event_type=EventType.PLAYER_JUMP,
                data={'pos': (self.player.x, self.player.y)}
            ))
        elif key == pygame.K_x:
            # One value feeds both the event payload and the score so the
            # two cannot drift apart.
            points = 10
            self.event_bus.emit(Event(
                event_type=EventType.ENEMY_KILLED,
                data={'enemy_type': 'goblin', 'points': points}
            ))
            self.score += points
        elif key == pygame.K_h:
            # Player.take_damage emits DAMAGE_TAKEN and, at zero health,
            # PLAYER_DEATH.
            self.player.take_damage(25)

    def run(self) -> None:
        """Main loop: drain the bus, handle input, draw, cap at 60 FPS.

        The ``process_events`` call at the top of the frame is a safety net:
        with ``emit`` draining on its own, the queue is normally already
        empty here. ``pygame.quit()`` runs when the loop ends, including on
        an exception.
        """
        try:
            while self.running:
                self.event_bus.process_events()
                for ev in pygame.event.get():
                    if ev.type == pygame.QUIT:
                        self.running = False
                    elif ev.type == pygame.KEYDOWN:
                        self.handle_input(ev.key)
                self.screen.fill((20, 20, 30))
                pygame.display.flip()
                self.clock.tick(60)
        finally:
            # Release the display and other pygame modules.
            pygame.quit()
class Player:
    """Player that reports damage and death on the event bus.

    PLAYER_DEATH is emitted with priority=2. The priority only has an effect
    when the event has to wait in a PriorityEventBus queue, i.e. when it is
    emitted while another dispatch is in progress.
    """

    def __init__(self, x: int, y: int, event_bus: EventBus) -> None:
        self.x = x
        self.y = y
        self.event_bus = event_bus
        self.health = 100
        self.max_health = 100

    def take_damage(self, amount: int) -> None:
        """Apply damage, emit DAMAGE_TAKEN, and emit PLAYER_DEATH when health
        reaches zero.

        Calls made after the player is already dead are ignored, so
        PLAYER_DEATH is emitted once. Health is kept within
        0..max_health even for a negative ``amount``.
        """
        if self.health <= 0:
            # Already dead: do not report further damage or another death.
            return

        old_health = self.health
        self.health = min(self.max_health, max(0, self.health - amount))
        self.event_bus.emit(Event(
            event_type=EventType.DAMAGE_TAKEN,
            data={'old_health': old_health, 'new_health': self.health, 'damage': amount}
        ))
        if self.health <= 0:
            self.event_bus.emit(Event(
                event_type=EventType.PLAYER_DEATH,
                priority=2
            ))
# Start the game only when this file is executed as a script.
if __name__ == "__main__":
    game = EventDrivenGame()
    game.run()
Best Practices
⚡ Event System Tips
- Event Naming: Use clear, descriptive event names
- Data Contracts: Document expected event data structures
- Avoid Cycles: Prevent events from triggering circular dependencies
- Priority Levels: Use priorities for critical events
- Error Handling: Always catch exceptions in event handlers
- Memory Management: Clean up listeners when objects are destroyed
- Event Batching: Process multiple events together for efficiency
- Debug Logging: Log event flow for debugging
Common Patterns
🎮 Event System Patterns
- Command Pattern: Events as commands with undo/redo
- Event Sourcing: Store all events as game history
- Event Aggregation: Combine related events
- Wildcard Subscriptions: Subscribe to event patterns
- Event Replay: Replay events for debugging or replay systems
- Deferred Events: Schedule events for future processing
Key Takeaways
- 📡 Events enable decoupled communication between systems
- 🔄 Publishers don't need to know about subscribers
- 📨 Events can be immediate or queued for processing
- ⚡ Priority systems handle critical events first
- 📝 Event history enables replay and debugging
- 🎯 Filters allow selective event handling
- 🔧 Channels organize related events
🏋️‍♂️ Practice Exercise
🏋️‍♂️ Exercise 1: Event Bus + Four Decoupled Subscribers + Queued/Immediate/Resilient Dispatch in One Pygame Window
Objective: Build an ~85-line single-process pygame demo that exercises the three pillar event-system patterns from this lesson — decoupled publish-subscribe topology where emitters share only the event-type identifier with subscribers (zero direct references between publisher and any listener), queued emit() via a queue + process_events() drain at the top of every frame vs synchronous emit_immediate() as orthogonal dispatch modes (queued = batched at safe boundary; immediate = listener reaction lands before publisher's next line), and per-callback try/except in _dispatch as the resilience guarantee that keeps remaining listeners running when one throws — in one runnable pygame window where all three pillars are visible on screen. An EventBus class holds a listeners dict mapping EventType → list of callbacks and a queue list; subscribe(event_type, cb) appends; emit(event_type, data) queues an Event onto the list; emit_immediate(event_type, data) bypasses the queue and dispatches synchronously the same call frame; process_events() drains the queue at the top of every frame; _dispatch(event) walks listeners[event.event_type] and wraps each cb(event) in try/except so a buggy listener can't break dispatch to remaining listeners (the bus catches the exception, records it on last_error for HUD display, and continues iterating). Four orthogonal subscriber systems register at startup with ZERO references to the publisher: AudioSystem subscribes to PLAYER_HIT + ENEMY_KILLED (prints '[Audio] hit' / '[Audio] death'); HUDSystem subscribes to PLAYER_HIT + SCORE_CHANGED (mutates per-frame strings player_hp and score); ParticleSystem subscribes to ENEMY_KILLED (spawns 12 yellow particles per death decaying over ~25 frames); AchievementSystem subscribes to ENEMY_KILLED + SCORE_CHANGED (increments lifetime_kills and tracks max_score). 
Keys: 1 = bus.emit(PLAYER_HIT) queued, dispatched at top of NEXT frame so HUD updates one frame later (queue depth visibly goes 0 → 1 the frame the key is pressed and back to 0 after next-frame drain); 2 = bus.emit_immediate(ENEMY_KILLED, ...) synchronous, particles spawn the SAME frame and AchievementSystem.lifetime_kills increments same frame (queue depth never ticks); 3 = bus.emit(SCORE_CHANGED, ...) queued; F = toggle 'fault inject' in HUDSystem so its on_score handler raises ValueError('injected') when fired — the EventBus's try/except wrapper catches it, prints '[Bus] error in HUDSystem.on_score: injected', AchievementSystem still receives the SCORE_CHANGED event and updates max_score (visible proof that try/except isolates per-listener failures so partial bus failure doesn't become full bus failure). The publisher (a thin Game object that emits via the bus) holds ZERO attribute references to AudioSystem / HUDSystem / ParticleSystem / AchievementSystem after construction — adding a fifth subscriber later requires zero edits to the publisher. HUD shows: queue depth (rises by 1 on each queued emit, drops to 0 at top of next frame after process_events drains), last-event-type, player_hp, score, lifetime_kills, max_score, particle count, fault-inject state, last-error message — every signal feeding the bus is on screen so the abstract pub-sub / queued-vs-immediate / try-except shapes become concrete numbers per frame. Closes the architecture module's decoupling-via-indirection arc started at chat-54 M2 architecture_state_machines (decouple control flow over time via explicit transition table) + chat-56 M1 architecture_component_systems (decouple behavior over scope via component-filter) + chat-57 M1 architecture_event_systems here (decouple cross-system communication via centralized message bus) — three orthogonal axes of decoupling, each replacing a different form of imperative tight-coupling. 
Couples directly to chat-44 M5 fixed-timestep determinism (queued events as deferred work consumed at safe boundaries, the same accumulator-then-integrate shape applied to event dispatch instead of physics integration), chat-46 M2 platformer_tilemap is_solid-True-OOB invariant (try/except wrapper as fail-safe-defaults at the bus boundary — the same Saltzer & Schroeder principle applied to listener-error handling), chat-47 M2 level_design tile-palette JSON externalization (data-driven event-type identifier as the contract between emitter and listener, the same externalize-the-contract pattern), chat-54 M2 architecture_state_machines explicit transition table (events as transition triggers — the FSM consumes events to drive state changes; in production architectures a state machine subscribes to relevant events from the bus rather than reading them from coupled callers), and chat-56 M1 architecture_component_systems composable system-filter (event-bus is the natural cross-system communication layer that pairs with component-filter's intra-system entity selection — systems pick which entities to operate on via component-filter and communicate among themselves via the event bus). Continues the architecture module advancing 3/5 → 4/5 partial at chat-57 M1; recommended chat-58 = architecture_save_load to close architecture 5/5 = 8th complete Phase-8 module.
Instructions:
- Define `EventType` as an enum with PLAYER_HIT / ENEMY_KILLED / SCORE_CHANGED; define an `Event` dataclass with event_type and data: dict (default empty).
- Build `EventBus` with a `listeners` dict, a `queue` list, and a `last_error` string; `subscribe(event_type, cb)`, `emit(event_type, data=None)` (appends to queue), `emit_immediate(event_type, data=None)` (calls _dispatch directly), `process_events()` (drains queue), and `_dispatch(event)` wrapping each callback in try/except so one listener's exception can't kill remaining dispatch.
- Implement AudioSystem / HUDSystem / ParticleSystem / AchievementSystem each with __init__(bus) that calls bus.subscribe(...) for the event types it cares about; the publisher (Game) never imports or references these classes after construction (only the bus).
- Wire keys 1/2/3 to emit / emit_immediate / emit; F to toggle HUDSystem.fault; call `bus.process_events()` at the TOP of every frame so queued events from the previous frame land before this frame's input/render — queue depth drops to 0 at this drain, then rises to 1 on the first queued emit of this frame, visible on the HUD.
- Render HUD lines for queue depth, last-event-type, player_hp, score, lifetime_kills, max_score, particle count, fault-inject state, and last-error message; observe (a) emit_immediate (key 2) updates HUD same frame while emit (keys 1+3) updates next frame; (b) fault-inject (F + key 3) shows the HUDSystem error message while AchievementSystem still increments max_score; (c) the Game object has zero direct references to any subscriber system after construction.
💡 Hint
The decoupling test: after construction, the publisher (Game) should have ZERO attribute references to AudioSystem / HUDSystem / ParticleSystem / AchievementSystem — they wire themselves into the bus in their own __init__ and then operate purely through the bus, which is the publish-subscribe topology made concrete. The queued-vs-immediate test: queue depth on the HUD goes 0 → 1 the frame you press key 1 or 3 and back to 0 on the NEXT frame's process_events drain; key 2 (emit_immediate) never increments the queue depth at all because it bypasses the queue and dispatches synchronously. The try-except test: with fault-inject ON, pressing 3 fires HUDSystem.on_score which raises ValueError, the bus prints the error onto last_error, and AchievementSystem.max_score still updates — proving the remaining-subscribers-still-run guarantee is real, not theoretical.
✅ Example Solution
import pygame, sys
from enum import Enum, auto
from dataclasses import dataclass, field
class EventType(Enum):
    """Event identifiers shared by emitters and listeners in the exercise."""

    PLAYER_HIT = auto()
    ENEMY_KILLED = auto()
    SCORE_CHANGED = auto()
@dataclass
class Event:
    """A message passed through the bus: an event type plus a data dict."""
    event_type: EventType  # selects which listener list the bus dispatches to
    data: dict = field(default_factory=dict)  # fresh empty dict per event when omitted
class EventBus:
    """Minimal bus for the exercise: listener lists per event type, a plain
    list as the queue, and the text of the most recent listener error."""

    def __init__(self):
        self.listeners = {}
        self.queue = []
        self.last_error = ''

    def subscribe(self, et, cb):
        """Append ``cb`` to the listener list for ``et`` (no de-duplication)."""
        if et not in self.listeners:
            self.listeners[et] = []
        self.listeners[et].append(cb)

    def emit(self, et, data=None):
        """Queue an event; it is delivered by the next process_events() call."""
        payload = data or {}
        self.queue.append(Event(et, payload))

    def emit_immediate(self, et, data=None):
        """Deliver an event to its listeners right now, bypassing the queue."""
        payload = data or {}
        self._dispatch(Event(et, payload))

    def process_events(self):
        """Deliver queued events oldest-first until the queue is empty."""
        while len(self.queue) > 0:
            pending = self.queue.pop(0)
            self._dispatch(pending)

    def _dispatch(self, ev):
        """Call every listener for ``ev``; a failing listener is recorded in
        ``last_error`` and printed, and the remaining listeners still run."""
        handlers = self.listeners.get(ev.event_type)
        if not handlers:
            return
        for cb in handlers:
            try:
                cb(ev)
            except Exception as e:
                self.last_error = f'{cb.__qualname__}: {e}'
                print(f'[Bus] error in {cb.__qualname__}: {e}')
class AudioSystem:
    """Prints an audio cue for player hits and enemy deaths."""

    def __init__(self, bus):
        for kind, handler in (
            (EventType.PLAYER_HIT, self.on_hit),
            (EventType.ENEMY_KILLED, self.on_kill),
        ):
            bus.subscribe(kind, handler)

    def on_hit(self, e):
        """Cue for PLAYER_HIT."""
        print('[Audio] hit')

    def on_kill(self, e):
        """Cue for ENEMY_KILLED."""
        print('[Audio] death')
class HUDSystem:
    """Holds the values shown on the HUD and updates them from bus events.

    While ``fault`` is True the score handler raises on purpose.
    """

    def __init__(self, bus):
        self.player_hp = 100
        self.score = 0
        self.fault = False
        bus.subscribe(EventType.PLAYER_HIT, self.on_hit)
        bus.subscribe(EventType.SCORE_CHANGED, self.on_score)

    def on_hit(self, e):
        """Take 10 hp off, never going below zero."""
        remaining = self.player_hp - 10
        self.player_hp = remaining if remaining > 0 else 0

    def on_score(self, e):
        """Copy the event's 'score' (keeping the old value if absent), or
        raise ValueError when fault injection is switched on."""
        if self.fault:
            raise ValueError('injected')
        self.score = e.data.get('score', self.score)
class ParticleSystem:
    """Spawns short-lived particles on ENEMY_KILLED.

    Each particle is a list ``[x, y, life]``.
    """

    def __init__(self, bus):
        self.particles = []
        bus.subscribe(EventType.ENEMY_KILLED, self.on_kill)

    def on_kill(self, e):
        """Add 12 particles with life 25 at the event's 'pos'
        ((400, 240) if absent)."""
        x, y = e.data.get('pos', (400, 240))
        self.particles.extend([x, y, 25] for _ in range(12))

    def update(self):
        """Move every particle down by 2, age it by 1, and keep only the
        ones with life left."""
        alive = []
        for particle in self.particles:
            particle[1] += 2
            particle[2] -= 1
            if particle[2] > 0:
                alive.append(particle)
        self.particles = alive
class AchievementSystem:
    """Keeps a lifetime kill count and the highest score seen."""

    def __init__(self, bus):
        self.lifetime_kills = 0
        self.max_score = 0
        bus.subscribe(EventType.ENEMY_KILLED, self.on_kill)
        bus.subscribe(EventType.SCORE_CHANGED, self.on_score)

    def on_kill(self, e):
        """Count one more kill."""
        self.lifetime_kills += 1

    def on_score(self, e):
        """Raise max_score when the event's 'score' (0 if absent) beats it."""
        reported = e.data.get('score', 0)
        if reported > self.max_score:
            self.max_score = reported
# Window, frame clock and HUD font.
pygame.init()
screen = pygame.display.set_mode((800, 480))
clock = pygame.time.Clock()
font = pygame.font.SysFont('monospace', 14)
bus = EventBus()
# Subscribers register themselves on the bus in __init__. The hud / part / ach
# names are kept only for the fault toggle, the particle update and the HUD
# rendering below; every event reaches them through the bus.
AudioSystem(bus); hud = HUDSystem(bus); part = ParticleSystem(bus); ach = AchievementSystem(bus)
last_event = '-'; score_counter = 0
while True:
    bus.process_events()  # drain queued events from prior frame at TOP
    for ev in pygame.event.get():
        if ev.type == pygame.QUIT:
            pygame.quit(); sys.exit()
        if ev.type == pygame.KEYDOWN:
            if ev.key == pygame.K_1:
                # Queued: listeners run at the next process_events() call.
                bus.emit(EventType.PLAYER_HIT); last_event = 'PLAYER_HIT (queued)'
            elif ev.key == pygame.K_2:
                # Immediate: listeners run before this line returns.
                bus.emit_immediate(EventType.ENEMY_KILLED, {'pos': (400, 240)}); last_event = 'ENEMY_KILLED (immediate)'
            elif ev.key == pygame.K_3:
                # Queued, carrying the new running total.
                score_counter += 100; bus.emit(EventType.SCORE_CHANGED, {'score': score_counter}); last_event = 'SCORE_CHANGED (queued)'
            elif ev.key == pygame.K_f:
                # Toggle fault injection in HUDSystem.on_score.
                hud.fault = not hud.fault
    part.update()
    screen.fill((20, 20, 30))
    for p in part.particles:
        # Radius shrinks with remaining life (p[2]) but never below 1 pixel.
        pygame.draw.circle(screen, (255, 230, 80), (int(p[0]), int(p[1])), max(1, p[2] // 5))
    # HUD text, one entry per rendered line; the queue length is read after
    # this frame's input, so it shows events still waiting for the next drain.
    lines = [
        f'queue={len(bus.queue)} last={last_event}',
        f'hp={hud.player_hp} score={hud.score}',
        f'kills={ach.lifetime_kills} max_score={ach.max_score}',
        f'particles={len(part.particles)} fault={hud.fault}',
        f'err={bus.last_error[:60]}',
        '1=PLAYER_HIT(q) 2=ENEMY_KILLED(imm) 3=SCORE(q) F=fault',
    ]
    for i, line in enumerate(lines):
        screen.blit(font.render(line, True, (220, 220, 220)), (10, 10 + i * 18))
    pygame.display.flip()
    clock.tick(60)  # cap the loop at 60 frames per second
🎯 Quick Quiz
Question 1: In the lesson's EventBus, the subscribe / emit / _dispatch_event trio implements publish-subscribe by having listeners register only for an EventType identifier, while emitters call event_bus.emit(Event(event_type=ENEMY_KILLED, ...)) with no reference to who's listening. Why is this design's zero-references-between-emitter-and-listener property — rather than performance — the load-bearing claim?
Question 2: The lesson distinguishes EventBus.emit(event) (which calls self.event_queue.put(event) and processes via process_events()) from EventBus.emit_immediate(event) (which calls self._dispatch_event(event) directly, bypassing the queue). Why does the lesson keep both methods rather than collapsing to one?
Question 3: The lesson's _dispatch_event wraps each callback in try/except with print(f'Error in event handler {callback.__name__}: {e}') rather than letting the exception propagate up through the dispatch loop. Why is this wrap correctness, not just convenience?
What's Next?
Now that you understand event systems, next we'll explore save/load systems - how to persist and restore game state!