Skip to main content

Event Systems

Creating Decoupled Communication Between Systems

Event systems enable game components to communicate without direct dependencies! They create loose coupling between systems, making your code more maintainable, testable, and flexible. Let's master this powerful pattern! 📡🎮

Understanding Event Systems

📻 The Radio Broadcast Analogy

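Think of event systems like radio broadcasting: a station (the publisher) transmits on a frequency (the event type) without knowing who is listening, and any number of radios (subscribers) can tune in or out without the station changing anything.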

graph TD A["Event System"] --> B["Event Bus"] B --> C["Event Queue"] B --> D["Listeners Map"] B --> E["Event Dispatcher"] F["Publisher 1"] --> |"Emit Event"| B G["Publisher 2"] --> |"Emit Event"| B B --> |"Notify"| H["Subscriber 1"] B --> |"Notify"| I["Subscriber 2"] B --> |"Notify"| J["Subscriber 3"] K["Event Types"] --> L["Game Events"] K --> M["Input Events"] K --> N["System Events"] K --> O["UI Events"]

Interactive Event System Visualizer

Watch events flow through the system in real-time!

System Subscriptions:

Events Emitted: 0 | Queue Size: 0

Basic Event System Implementation

from typing import Dict, List, Callable, Any
from dataclasses import dataclass
from enum import Enum, auto
import time
from queue import Queue, PriorityQueue

class EventType(Enum):
    """Enumeration of game event types.

    New members are appended at the end so the auto() values of the
    existing members stay the same.
    """
    # Game Events
    GAME_START = auto()
    GAME_PAUSE = auto()
    GAME_RESUME = auto()
    GAME_OVER = auto()

    # Player Events
    PLAYER_SPAWN = auto()
    PLAYER_DEATH = auto()
    PLAYER_JUMP = auto()
    PLAYER_SHOOT = auto()

    # Combat Events
    DAMAGE_DEALT = auto()
    DAMAGE_TAKEN = auto()
    ENEMY_KILLED = auto()

    # Item Events
    ITEM_COLLECTED = auto()
    ITEM_USED = auto()
    ITEM_DROPPED = auto()

    # Level Events
    LEVEL_START = auto()
    LEVEL_COMPLETE = auto()
    CHECKPOINT_REACHED = auto()

    # UI Events
    BUTTON_CLICKED = auto()
    MENU_OPENED = auto()
    MENU_CLOSED = auto()

    # System Events: referenced by EventChannel.broadcast,
    # AchievementSystem.unlock_achievement and SaveSystem.save_game.
    CUSTOM = auto()
    ACHIEVEMENT_UNLOCKED = auto()
    GAME_SAVED = auto()

@dataclass
class Event:
    """A single message carried by the event bus.

    Attributes:
        event_type: The kind of event being announced.
        data: Free-form payload; a fresh empty dict is used when omitted.
        timestamp: Creation time; taken from time.time() when omitted.
        priority: Dispatch priority (1 = highest, 10 = lowest).
    """
    event_type: EventType
    data: Dict[str, Any] = None
    timestamp: float = None
    priority: int = 5

    def __post_init__(self):
        # None is the "not supplied" sentinel for both optional fields, so
        # every event ends up with its own dict and a concrete timestamp.
        self.data = {} if self.data is None else self.data
        self.timestamp = time.time() if self.timestamp is None else self.timestamp

class EventBus:
    """Central event distribution system.

    Listeners register per event type. Emitted events go through a FIFO
    queue that is drained right away unless a drain is already running, in
    which case the running drain picks the new event up.
    """
    def __init__(self):
        self.listeners: Dict[EventType, List[Callable]] = {}
        self.event_queue: Queue = Queue()
        self.processing = False

    @staticmethod
    def _callback_name(callback: Callable) -> str:
        """Return a printable name for any callable.

        functools.partial objects and callable instances have no __name__,
        so fall back to repr() instead of raising AttributeError.
        """
        return getattr(callback, "__name__", None) or repr(callback)

    def subscribe(self, event_type: EventType, callback: Callable):
        """Subscribe to an event type (duplicate registrations are ignored)"""
        callbacks = self.listeners.setdefault(event_type, [])
        if callback not in callbacks:
            callbacks.append(callback)
            print(f"Subscribed {self._callback_name(callback)} to {event_type.name}")

    def unsubscribe(self, event_type: EventType, callback: Callable):
        """Unsubscribe from an event type (unknown callbacks are ignored)"""
        callbacks = self.listeners.get(event_type)
        if callbacks and callback in callbacks:
            callbacks.remove(callback)
            print(f"Unsubscribed {self._callback_name(callback)} from {event_type.name}")

    def emit(self, event: Event):
        """Queue an event and drain the queue unless a drain is in progress"""
        self.event_queue.put(event)

        # Events emitted by a listener during dispatch stay queued and are
        # handled by the drain loop that is already running.
        if not self.processing:
            self.process_events()

    def emit_immediate(self, event: Event):
        """Emit an event immediately, bypassing the queue"""
        self._dispatch_event(event)

    def process_events(self):
        """Process all queued events"""
        if self.processing:
            # A listener called us mid-drain; the outer loop will finish the job.
            return

        self.processing = True
        try:
            while not self.event_queue.empty():
                self._dispatch_event(self.event_queue.get())
        finally:
            # Reset even if something other than Exception escapes a handler,
            # otherwise every later emit() would queue forever.
            self.processing = False

    def _dispatch_event(self, event: Event):
        """Dispatch event to all listeners, isolating each listener's errors"""
        # Iterate over a snapshot: a listener that subscribes or unsubscribes
        # during dispatch must not cause other listeners to be skipped.
        for callback in list(self.listeners.get(event.event_type, ())):
            try:
                callback(event)
            except Exception as e:
                print(f"Error in event handler {self._callback_name(callback)}: {e}")

    def clear(self):
        """Clear all listeners and queued events"""
        self.listeners.clear()
        while not self.event_queue.empty():
            self.event_queue.get()

# Global event bus instance
event_bus = EventBus()

Advanced Event Patterns

# Priority Queue Event System
class PriorityEventBus(EventBus):
    """Event bus with priority queue processing.

    Events with a lower priority number are dispatched first; ties are
    broken by timestamp and then by emit order.
    """
    def __init__(self):
        super().__init__()
        self.event_queue = PriorityQueue()
        # Monotonic emit counter. It is the last tie-breaker in the queue
        # entry, so two entries never compare equal up to the Event itself
        # (Event defines no ordering and would raise TypeError).
        self._sequence = 0

    def emit(self, event: Event):
        """Emit event with priority"""
        self._sequence += 1
        # Entry layout: (priority, timestamp, sequence, event)
        self.event_queue.put(
            (event.priority, event.timestamp, self._sequence, event)
        )

        if not self.processing:
            self.process_events()

    def process_events(self):
        """Process events in priority order"""
        if self.processing:
            # Already draining; the running loop will pick up queued events.
            return

        self.processing = True
        try:
            while not self.event_queue.empty():
                *_, event = self.event_queue.get()
                self._dispatch_event(event)
        finally:
            # Always release the flag so later emits are not queued forever.
            self.processing = False

# Event Manager with filtering
class EventManager:
    """Event management with filtered subscriptions and a bounded history"""
    def __init__(self):
        self.event_bus = EventBus()
        self.event_filters: Dict[str, Callable] = {}
        self.event_history: List[Event] = []
        self.max_history = 100

    def subscribe_with_filter(self, event_type: EventType,
                            callback: Callable,
                            filter_func: Callable = None) -> Callable:
        """Subscribe with optional filtering.

        Returns the callable that was actually registered on the bus (the
        filtering wrapper when filter_func is given, otherwise callback), so
        the caller can pass it to event_bus.unsubscribe later.
        """
        if filter_func is None:
            self.event_bus.subscribe(event_type, callback)
            return callback

        def filtered_callback(event):
            # Forward only the events accepted by the filter
            if filter_func(event):
                callback(event)

        self.event_bus.subscribe(event_type, filtered_callback)
        return filtered_callback

    def emit_and_record(self, event: Event):
        """Emit event and keep history"""
        self.event_history.append(event)
        # Trim everything over the cap in one step; this also honours a
        # max_history that was lowered after events were recorded.
        overflow = len(self.event_history) - self.max_history
        if overflow > 0:
            del self.event_history[:overflow]

        self.event_bus.emit(event)

    def get_recent_events(self, event_type: EventType = None,
                          count: int = 10) -> List[Event]:
        """Get up to count most recent events, optionally of one type.

        A count of zero or less yields an empty list.
        """
        if count <= 0:
            # history[-0:] would be the whole list, not "no events"
            return []

        if event_type is not None:
            history = [e for e in self.event_history
                       if e.event_type == event_type]
        else:
            history = self.event_history
        return history[-count:]

# Event Channel System
class EventChannel:
    """Named channel that broadcasts payloads to its own subscriber list"""
    def __init__(self, name: str):
        self.name = name
        self.subscribers: List[Callable] = []
        self.enabled = True

    def subscribe(self, callback: Callable):
        """Subscribe to this channel (duplicates are ignored)"""
        if callback not in self.subscribers:
            self.subscribers.append(callback)

    def unsubscribe(self, callback: Callable):
        """Unsubscribe from this channel (unknown callbacks are ignored)"""
        if callback in self.subscribers:
            self.subscribers.remove(callback)

    def broadcast(self, data: Any):
        """Broadcast to all subscribers.

        Does nothing while the channel is disabled. Each subscriber receives
        one Event whose data holds the channel name and the payload.
        """
        if not self.enabled:
            return

        # NOTE: relies on EventType having a CUSTOM member.
        event = Event(
            event_type=EventType.CUSTOM,
            data={'channel': self.name, 'payload': data}
        )

        # Iterate over a snapshot so a subscriber that unsubscribes (or adds
        # another subscriber) during the broadcast cannot cause skips.
        for callback in list(self.subscribers):
            try:
                callback(event)
            except Exception as e:
                print(f"Error in channel {self.name}: {e}")

    def toggle(self, enabled: bool):
        """Enable/disable channel"""
        self.enabled = enabled

class ChannelManager:
    """Registry of event channels keyed by name"""
    def __init__(self):
        self.channels: Dict[str, EventChannel] = {}

    def create_channel(self, name: str) -> EventChannel:
        """Return the channel called name, creating it on first request"""
        if name in self.channels:
            return self.channels[name]

        fresh = EventChannel(name)
        self.channels[name] = fresh
        return fresh

    def get_channel(self, name: str) -> EventChannel:
        """Look up a channel; the result is None when it does not exist"""
        return self.channels[name] if name in self.channels else None

    def remove_channel(self, name: str):
        """Forget a channel; unknown names are ignored"""
        self.channels.pop(name, None)

Game Systems Using Events

# Achievement System using events
class AchievementSystem:
    """Tracks achievements through events.

    Counters live in self.progress, definitions in self.achievements and
    the ids already awarded in self.unlocked.
    """
    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        self.achievements = {}
        self.unlocked = set()
        self.progress = {}

        # Subscribe to relevant events
        self.event_bus.subscribe(EventType.ENEMY_KILLED, self.on_enemy_killed)
        self.event_bus.subscribe(EventType.ITEM_COLLECTED, self.on_item_collected)
        self.event_bus.subscribe(EventType.LEVEL_COMPLETE, self.on_level_complete)

        self.setup_achievements()

    def setup_achievements(self):
        """Define achievements"""
        self.achievements = {
            'first_kill': {
                'name': 'First Blood',
                'description': 'Defeat your first enemy',
                'requirement': 1,
                'type': 'enemy_kills'
            },
            'collector': {
                'name': 'Collector',
                'description': 'Collect 100 items',
                'requirement': 100,
                'type': 'items_collected'
            },
            'speedrun': {
                'name': 'Speed Runner',
                'description': 'Complete level in under 60 seconds',
                'requirement': 60,
                'type': 'level_time'
            }
        }

    def on_enemy_killed(self, event: Event):
        """Count the kill and re-check the kill-based achievement"""
        self.progress['enemy_kills'] = self.progress.get('enemy_kills', 0) + 1
        self.check_achievement('first_kill')

    def on_item_collected(self, event: Event):
        """Count the pickup and re-check the collection achievement"""
        self.progress['items_collected'] = self.progress.get('items_collected', 0) + 1
        self.check_achievement('collector')

    def on_level_complete(self, event: Event):
        """Unlock the speedrun achievement when the level was fast enough.

        The time limit is read from the 'speedrun' definition rather than
        being hard-coded, so editing the definition changes the rule.
        """
        completion_time = event.data.get('completion_time')
        if completion_time is None:
            return

        time_limit = self.achievements['speedrun']['requirement']
        if completion_time < time_limit:
            self.unlock_achievement('speedrun')

    def check_achievement(self, achievement_id: str):
        """Unlock a counter-based achievement once its requirement is met"""
        if achievement_id in self.unlocked:
            return

        achievement = self.achievements.get(achievement_id)
        if not achievement:
            return

        current = self.progress.get(achievement['type'])
        if current is not None and current >= achievement['requirement']:
            self.unlock_achievement(achievement_id)

    def unlock_achievement(self, achievement_id: str):
        """Unlock an achievement and announce it on the bus.

        Already-unlocked ids are ignored.

        Raises:
            KeyError: if achievement_id has no definition.
        """
        if achievement_id in self.unlocked:
            return

        # Look the definition up first so an unknown id raises before any
        # state changes (it must not end up recorded as unlocked).
        achievement = self.achievements[achievement_id]
        self.unlocked.add(achievement_id)

        # Emit achievement unlocked event
        self.event_bus.emit(Event(
            event_type=EventType.ACHIEVEMENT_UNLOCKED,
            data={
                'achievement_id': achievement_id,
                'name': achievement['name'],
                'description': achievement['description']
            }
        ))

        print(f"Achievement Unlocked: {achievement['name']}")

# Audio System using events
class AudioSystem:
    """Turns gameplay events into sound playback requests"""
    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        self.sounds = {}
        self.music_volume = 0.7
        self.sfx_volume = 0.8

        # One (event type, handler) row per sound-producing event
        wiring = (
            (EventType.PLAYER_JUMP, self.on_player_jump),
            (EventType.PLAYER_SHOOT, self.on_player_shoot),
            (EventType.ENEMY_KILLED, self.on_enemy_killed),
            (EventType.ITEM_COLLECTED, self.on_item_collected),
            (EventType.LEVEL_COMPLETE, self.on_level_complete),
        )
        for kind, handler in wiring:
            self.event_bus.subscribe(kind, handler)

    def on_player_jump(self, event: Event):
        """Jump effect at the SFX volume"""
        self.play_sound('jump', self.sfx_volume)

    def on_player_shoot(self, event: Event):
        """Shot effect at the SFX volume"""
        self.play_sound('shoot', self.sfx_volume)

    def on_enemy_killed(self, event: Event):
        """Death effect chosen by the event's enemy_type ('default' if absent)"""
        variant = event.data.get('enemy_type', 'default')
        sound_name = f'enemy_death_{variant}'
        self.play_sound(sound_name, self.sfx_volume)

    def on_item_collected(self, event: Event):
        """Pickup effect chosen by the event's item_type ('default' if absent)"""
        variant = event.data.get('item_type', 'default')
        sound_name = f'pickup_{variant}'
        self.play_sound(sound_name, self.sfx_volume)

    def on_level_complete(self, event: Event):
        """Victory cue at the music volume"""
        self.play_sound('victory', self.music_volume)

    def play_sound(self, sound_name: str, volume: float):
        """Report the requested sound; placeholder for real audio playback"""
        print(f"Playing sound: {sound_name} at volume {volume}")

# Save System using events
class SaveSystem:
    """Keeps save data current from events and saves at key moments"""
    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        self.save_data = {}
        self.auto_save_enabled = True
        self.last_save_time = time.time()
        self.save_interval = 60  # seconds between item-triggered auto-saves

        # Events that update the save data
        wiring = (
            (EventType.CHECKPOINT_REACHED, self.on_checkpoint),
            (EventType.LEVEL_COMPLETE, self.on_level_complete),
            (EventType.ITEM_COLLECTED, self.on_item_collected),
        )
        for kind, handler in wiring:
            self.event_bus.subscribe(kind, handler)

    def on_checkpoint(self, event: Event):
        """Remember the checkpoint and save straight away"""
        self.save_data['last_checkpoint'] = event.data.get('checkpoint_id')
        self.save_game()

    def on_level_complete(self, event: Event):
        """Append the finished level to the list and save straight away"""
        finished = self.save_data.setdefault('completed_levels', [])
        finished.append(event.data.get('level_id'))
        self.save_game()

    def on_item_collected(self, event: Event):
        """Add the item to the inventory; save only if the interval elapsed"""
        inventory = self.save_data.setdefault('inventory', [])
        inventory.append(event.data.get('item_id'))

        if not self.auto_save_enabled:
            return

        elapsed = time.time() - self.last_save_time
        if elapsed > self.save_interval:
            self.save_game()

    def save_game(self):
        """Stamp and print the save data, then announce the save on the bus"""
        self.save_data['timestamp'] = time.time()
        print(f"Game saved: {self.save_data}")
        self.last_save_time = time.time()

        # Listeners receive the live save_data dict, not a copy
        saved = Event(
            event_type=EventType.GAME_SAVED,
            data={'save_data': self.save_data}
        )
        self.event_bus.emit(saved)

Complete Event-Driven Game Example

import pygame
from typing import List

class EventDrivenGame:
    """Pygame loop wired to a PriorityEventBus.

    The game constructs its subscriber systems but keeps no references to
    them: each one registers its own callbacks on the bus in __init__, so the
    game reaches them only by emitting events. Adding another subscriber
    needs no change to how the game emits.

    Note on timing: the bus's emit() drains the queue straight away whenever
    no drain is already running, so events emitted from input handling are
    dispatched before emit() returns. The process_events() call at the top of
    each frame flushes anything that may still be queued.
    """
    def __init__(self) -> None:
        pygame.init()
        self.screen = pygame.display.set_mode((600, 400))
        pygame.display.set_caption("Event-Driven Game")
        self.clock = pygame.time.Clock()
        self.event_bus = PriorityEventBus()
        # Subscribers wire themselves into the bus; Game does not retain references:
        AchievementSystem(self.event_bus)
        AudioSystem(self.event_bus)
        SaveSystem(self.event_bus)
        self.player = Player(300, 200, self.event_bus)
        self.score = 0
        self.running = True
        self.event_bus.emit(Event(event_type=EventType.GAME_START, priority=1))

    def handle_input(self, key: int) -> None:
        """Translate pygame KEYDOWN events into game events on the bus."""
        if key == pygame.K_SPACE:
            self.event_bus.emit(Event(
                event_type=EventType.PLAYER_JUMP,
                data={'pos': (self.player.x, self.player.y)}
            ))
        elif key == pygame.K_x:
            # emit() dispatches before returning when the bus is idle, so the
            # listeners have already run by the time the score is updated.
            self.event_bus.emit(Event(
                event_type=EventType.ENEMY_KILLED,
                data={'enemy_type': 'goblin', 'points': 10}
            ))
            self.score += 10
        elif key == pygame.K_h:
            # take_damage emits DAMAGE_TAKEN and, when health reaches 0,
            # PLAYER_DEATH at priority=2.
            self.player.take_damage(25)

    def run(self) -> None:
        """Main loop: flush the bus, handle input, draw, cap at 60 FPS.

        pygame is shut down when the loop ends, including when it ends
        because of an exception.
        """
        try:
            while self.running:
                # Frame-start flush of anything still queued on the bus
                self.event_bus.process_events()
                for ev in pygame.event.get():
                    if ev.type == pygame.QUIT:
                        self.running = False
                    elif ev.type == pygame.KEYDOWN:
                        self.handle_input(ev.key)
                self.screen.fill((20, 20, 30))
                pygame.display.flip()
                self.clock.tick(60)
        finally:
            # Release the window and pygame subsystems that __init__ started
            pygame.quit()

class Player:
    """Player that reports damage and death through the event bus.

    PLAYER_DEATH is emitted at priority=2 so it sorts ahead of
    default-priority events that are waiting in a priority queue.
    """
    def __init__(self, x: int, y: int, event_bus: EventBus) -> None:
        self.x = x
        self.y = y
        self.event_bus = event_bus
        self.health = 100
        self.max_health = 100

    def take_damage(self, amount: int) -> None:
        """Reduce health (never below 0) and emit the matching events.

        DAMAGE_TAKEN is emitted on every call. PLAYER_DEATH is emitted only
        on the hit that takes health from above 0 down to 0, so hitting a
        player who is already dead does not announce the death again.
        """
        old_health = self.health
        self.health = max(0, self.health - amount)
        self.event_bus.emit(Event(
            event_type=EventType.DAMAGE_TAKEN,
            data={'old_health': old_health, 'new_health': self.health, 'damage': amount}
        ))
        if old_health > 0 and self.health <= 0:
            # priority=2 keeps death ahead of routine queued events at dispatch time.
            self.event_bus.emit(Event(
                event_type=EventType.PLAYER_DEATH,
                priority=2
            ))

# Start the demo only when this file is run as a script, not when imported.
if __name__ == "__main__":
    EventDrivenGame().run()

Best Practices

⚡ Event System Tips

Common Patterns

🎮 Event System Patterns

Key Takeaways

🏋️‍♂️ Practice Exercise

🏋️‍♂️ Exercise 1: Event Bus + Four Decoupled Subscribers + Queued/Immediate/Resilient Dispatch in One Pygame Window

Objective: Build an ~85-line single-process pygame demo that exercises the three pillar event-system patterns from this lesson — decoupled publish-subscribe topology where emitters share only the event-type identifier with subscribers (zero direct references between publisher and any listener), queued emit() via a queue + process_events() drain at the top of every frame vs synchronous emit_immediate() as orthogonal dispatch modes (queued = batched at safe boundary; immediate = listener reaction lands before publisher's next line), and per-callback try/except in _dispatch as the resilience guarantee that keeps remaining listeners running when one throws — in one runnable pygame window where all three pillars are visible on screen. An EventBus class holds a listeners dict mapping EventType → list of callbacks and a queue list; subscribe(event_type, cb) appends; emit(event_type, data) queues an Event onto the list; emit_immediate(event_type, data) bypasses the queue and dispatches synchronously the same call frame; process_events() drains the queue at the top of every frame; _dispatch(event) walks listeners[event.event_type] and wraps each cb(event) in try/except so a buggy listener can't break dispatch to remaining listeners (the bus catches the exception, records it on last_error for HUD display, and continues iterating). Four orthogonal subscriber systems register at startup with ZERO references to the publisher: AudioSystem subscribes to PLAYER_HIT + ENEMY_KILLED (prints '[Audio] hit' / '[Audio] death'); HUDSystem subscribes to PLAYER_HIT + SCORE_CHANGED (mutates per-frame strings player_hp and score); ParticleSystem subscribes to ENEMY_KILLED (spawns 12 yellow particles per death decaying over ~25 frames); AchievementSystem subscribes to ENEMY_KILLED + SCORE_CHANGED (increments lifetime_kills and tracks max_score). 
Keys: 1 = bus.emit(PLAYER_HIT) queued, dispatched at top of NEXT frame so HUD updates one frame later (queue depth visibly goes 0 → 1 the frame the key is pressed and back to 0 after next-frame drain); 2 = bus.emit_immediate(ENEMY_KILLED, ...) synchronous, particles spawn the SAME frame and AchievementSystem.lifetime_kills increments same frame (queue depth never ticks); 3 = bus.emit(SCORE_CHANGED, ...) queued; F = toggle 'fault inject' in HUDSystem so its on_score handler raises ValueError('injected') when fired — the EventBus's try/except wrapper catches it, prints '[Bus] error in HUDSystem.on_score: injected', AchievementSystem still receives the SCORE_CHANGED event and updates max_score (visible proof that try/except isolates per-listener failures so partial bus failure doesn't become full bus failure). The publisher (a thin Game object that emits via the bus) holds ZERO attribute references to AudioSystem / HUDSystem / ParticleSystem / AchievementSystem after construction — adding a fifth subscriber later requires zero edits to the publisher. HUD shows: queue depth (rises by 1 on each queued emit, drops to 0 at top of next frame after process_events drains), last-event-type, player_hp, score, lifetime_kills, max_score, particle count, fault-inject state, last-error message — every signal feeding the bus is on screen so the abstract pub-sub / queued-vs-immediate / try-except shapes become concrete numbers per frame. Closes the architecture module's decoupling-via-indirection arc started at chat-54 M2 architecture_state_machines (decouple control flow over time via explicit transition table) + chat-56 M1 architecture_component_systems (decouple behavior over scope via component-filter) + chat-57 M1 architecture_event_systems here (decouple cross-system communication via centralized message bus) — three orthogonal axes of decoupling, each replacing a different form of imperative tight-coupling. 
Couples directly to chat-44 M5 fixed-timestep determinism (queued events as deferred work consumed at safe boundaries, the same accumulator-then-integrate shape applied to event dispatch instead of physics integration), chat-46 M2 platformer_tilemap is_solid-True-OOB invariant (try/except wrapper as fail-safe-defaults at the bus boundary — the same Saltzer & Schroeder principle applied to listener-error handling), chat-47 M2 level_design tile-palette JSON externalization (data-driven event-type identifier as the contract between emitter and listener, the same externalize-the-contract pattern), chat-54 M2 architecture_state_machines explicit transition table (events as transition triggers — the FSM consumes events to drive state changes; in production architectures a state machine subscribes to relevant events from the bus rather than reading them from coupled callers), and chat-56 M1 architecture_component_systems composable system-filter (event-bus is the natural cross-system communication layer that pairs with component-filter's intra-system entity selection — systems pick which entities to operate on via component-filter and communicate among themselves via the event bus). Continues the architecture module advancing 3/5 → 4/5 partial at chat-57 M1; recommended chat-58 = architecture_save_load to close architecture 5/5 = 8th complete Phase-8 module.

Instructions:

  1. Define EventType as an enum with PLAYER_HIT / ENEMY_KILLED / SCORE_CHANGED; define an Event dataclass with event_type and data:dict (default empty).
  2. Build EventBus with listeners dict, queue list, last_error string; subscribe(event_type, cb), emit(event_type, data=None) (appends to queue), emit_immediate(event_type, data=None) (calls _dispatch directly), process_events() (drains queue), _dispatch(event) wrapping each callback in try/except so one listener's exception can't kill remaining dispatch.
  3. Implement AudioSystem / HUDSystem / ParticleSystem / AchievementSystem each with __init__(bus) that calls bus.subscribe(...) for the event types it cares about; the publisher (Game) never imports or references these classes after construction (only the bus).
  4. Wire keys 1/2/3 to emit / emit_immediate / emit; F to toggle HUDSystem.fault; call bus.process_events() at the TOP of every frame so queued events from the previous frame land before this frame's input/render — queue depth drops to 0 at this drain, then rises to 1 on the first queued emit of this frame, visible on the HUD.
  5. Render HUD lines for queue depth, last-event-type, player_hp, score, lifetime_kills, max_score, particle count, fault-inject state, and last-error message; observe (a) emit_immediate (key 2) updates HUD same frame while emit (keys 1+3) updates next frame; (b) fault-inject (F + key 3) shows the HUDSystem error message while AchievementSystem still increments max_score; (c) the Game object has zero direct references to any subscriber system after construction.
💡 Hint

The decoupling test: after construction, the publisher (Game) should have ZERO attribute references to AudioSystem / HUDSystem / ParticleSystem / AchievementSystem — they wire themselves into the bus in their own __init__ and then operate purely through the bus, which is the publish-subscribe topology made concrete. The queued-vs-immediate test: queue depth on the HUD goes 0 → 1 the frame you press key 1 or 3 and back to 0 on the NEXT frame's process_events drain; key 2 (emit_immediate) never increments the queue depth at all because it bypasses the queue and dispatches synchronously. The try-except test: with fault-inject ON, pressing 3 queues SCORE_CHANGED; on the next drain HUDSystem.on_score raises ValueError, the bus catches it, records the message in last_error (and prints it), and AchievementSystem.max_score still updates — proving the remaining-subscribers-still-run guarantee is real, not theoretical.

✅ Example Solution
import pygame, sys
from enum import Enum, auto
from dataclasses import dataclass, field

class EventType(Enum):
    """Event identifiers shared by emitters and listeners in the demo."""
    PLAYER_HIT = auto()
    ENEMY_KILLED = auto()
    SCORE_CHANGED = auto()

@dataclass
class Event:
    """Message passed through the demo bus: a type tag plus a payload dict."""
    # Identifier that listeners subscribe to
    event_type: EventType
    # default_factory gives every event its own dict instead of a shared one
    data: dict = field(default_factory=dict)

class EventBus:
    """Minimal publish-subscribe bus with queued and immediate dispatch.

    listeners maps an event type to its callbacks, queue holds events
    waiting for the next process_events() call, and last_error keeps the
    message of the most recent listener failure.
    """
    def __init__(self):
        self.listeners = {}
        self.queue = []
        self.last_error = ''

    def subscribe(self, et, cb):
        """Register cb to be called with every event of type et."""
        self.listeners.setdefault(et, []).append(cb)

    def emit(self, et, data=None):
        """Queue an event; it is dispatched by the next process_events()."""
        self.queue.append(Event(et, data or {}))

    def emit_immediate(self, et, data=None):
        """Dispatch an event right now, bypassing the queue."""
        self._dispatch(Event(et, data or {}))

    def process_events(self):
        """Dispatch every event that was queued before this call.

        The queue is swapped for a fresh list first, so events queued by
        listeners during this drain wait for the next call. That keeps one
        drain bounded and avoids the O(n) cost of pop(0) per event.
        """
        pending, self.queue = self.queue, []
        for ev in pending:
            self._dispatch(ev)

    def _dispatch(self, ev):
        """Call each listener for ev, isolating listener failures."""
        # Snapshot the list so listeners may subscribe during dispatch
        for cb in list(self.listeners.get(ev.event_type, ())):
            try:
                cb(ev)
            except Exception as e:
                # Not every callable has __qualname__ (functools.partial, for
                # one); a lookup error here would escape the handler and stop
                # the remaining listeners.
                name = getattr(cb, '__qualname__', None) or repr(cb)
                self.last_error = f'{name}: {e}'
                print(f'[Bus] error in {name}: {e}')

class AudioSystem:
    """Console stand-in for audio: prints a tag for hits and deaths."""
    def __init__(self, bus):
        wiring = (
            (EventType.PLAYER_HIT, self.on_hit),
            (EventType.ENEMY_KILLED, self.on_kill),
        )
        for kind, handler in wiring:
            bus.subscribe(kind, handler)

    def on_hit(self, e):
        """Report a player hit."""
        print('[Audio] hit')

    def on_kill(self, e):
        """Report an enemy death."""
        print('[Audio] death')

class HUDSystem:
    """Holds the hp and score shown on screen, plus a fault-injection switch."""
    def __init__(self, bus):
        self.player_hp = 100
        self.score = 0
        self.fault = False
        bus.subscribe(EventType.PLAYER_HIT, self.on_hit)
        bus.subscribe(EventType.SCORE_CHANGED, self.on_score)

    def on_hit(self, e):
        """Take 10 hp off, never dropping below zero."""
        remaining = self.player_hp - 10
        self.player_hp = remaining if remaining > 0 else 0

    def on_score(self, e):
        """Adopt the event's score, or raise while fault injection is on."""
        if self.fault:
            raise ValueError('injected')
        if 'score' in e.data:
            self.score = e.data['score']

class ParticleSystem:
    """Spawns a burst of particles per enemy death and ages them each frame.

    Each particle is a mutable [x, y, frames_left] list.
    """
    def __init__(self, bus):
        self.particles = []
        bus.subscribe(EventType.ENEMY_KILLED, self.on_kill)

    def on_kill(self, e):
        """Add 12 particles at the event's 'pos' (default (400, 240))."""
        x, y = e.data.get('pos', (400, 240))
        self.particles.extend([x, y, 25] for _ in range(12))

    def update(self):
        """Move every particle down 2 px, age it, and drop the expired ones."""
        survivors = []
        for particle in self.particles:
            particle[1] += 2
            particle[2] -= 1
            if particle[2] > 0:
                survivors.append(particle)
        self.particles = survivors

class AchievementSystem:
    """Tracks lifetime kills and the highest score reported so far."""
    def __init__(self, bus):
        self.lifetime_kills = 0
        self.max_score = 0
        bus.subscribe(EventType.ENEMY_KILLED, self.on_kill)
        bus.subscribe(EventType.SCORE_CHANGED, self.on_score)

    def on_kill(self, e):
        """Count one more kill."""
        self.lifetime_kills = self.lifetime_kills + 1

    def on_score(self, e):
        """Raise max_score when the event reports a higher score."""
        reported = e.data.get('score', 0)
        if reported > self.max_score:
            self.max_score = reported

# Demo entry point: one window, one bus, four subscribers, and a HUD that
# shows the state each subscriber derives from the events it receives.
pygame.init()
screen = pygame.display.set_mode((800, 480))
clock = pygame.time.Clock()
font = pygame.font.SysFont('monospace', 14)
bus = EventBus()
# Subscribers register themselves on the bus in their constructors. The
# hud / part / ach names are kept only so the code below can read their
# state for the HUD, update particles and flip the fault flag; every event
# reaches the subscribers through the bus.
AudioSystem(bus); hud = HUDSystem(bus); part = ParticleSystem(bus); ach = AchievementSystem(bus)
last_event = '-'; score_counter = 0
while True:
    bus.process_events()  # drain queued events from prior frame at TOP
    for ev in pygame.event.get():
        if ev.type == pygame.QUIT:
            pygame.quit(); sys.exit()
        if ev.type == pygame.KEYDOWN:
            if ev.key == pygame.K_1:
                # Queued: stays in bus.queue until the drain at the top of the next frame
                bus.emit(EventType.PLAYER_HIT); last_event = 'PLAYER_HIT (queued)'
            elif ev.key == pygame.K_2:
                # Immediate: listeners run inside this call; the queue is not used
                bus.emit_immediate(EventType.ENEMY_KILLED, {'pos': (400, 240)}); last_event = 'ENEMY_KILLED (immediate)'
            elif ev.key == pygame.K_3:
                # Queued score change carrying the new running total
                score_counter += 100; bus.emit(EventType.SCORE_CHANGED, {'score': score_counter}); last_event = 'SCORE_CHANGED (queued)'
            elif ev.key == pygame.K_f:
                # Toggle fault injection: HUDSystem.on_score raises while this is True
                hud.fault = not hud.fault
    part.update()
    screen.fill((20, 20, 30))
    for p in part.particles:
        # Radius shrinks with the particle's remaining lifetime (p[2]), minimum 1 px
        pygame.draw.circle(screen, (255, 230, 80), (int(p[0]), int(p[1])), max(1, p[2] // 5))
    # HUD text, one entry per rendered line
    lines = [
        f'queue={len(bus.queue)}  last={last_event}',
        f'hp={hud.player_hp}  score={hud.score}',
        f'kills={ach.lifetime_kills}  max_score={ach.max_score}',
        f'particles={len(part.particles)}  fault={hud.fault}',
        f'err={bus.last_error[:60]}',
        '1=PLAYER_HIT(q)  2=ENEMY_KILLED(imm)  3=SCORE(q)  F=fault',
    ]
    for i, line in enumerate(lines):
        screen.blit(font.render(line, True, (220, 220, 220)), (10, 10 + i * 18))
    pygame.display.flip()
    clock.tick(60)  # cap the loop at 60 frames per second

🎯 Quick Quiz

Question 1: In the lesson's EventBus, the subscribe / emit / _dispatch_event trio implements publish-subscribe by having listeners register only for an EventType identifier, while emitters call event_bus.emit(Event(event_type=ENEMY_KILLED, ...)) with no reference to who's listening. Why is this design's zero-references-between-emitter-and-listener property — rather than performance — the load-bearing claim?

Question 2: The lesson distinguishes EventBus.emit(event) (which calls self.event_queue.put(event) and processes via process_events()) from EventBus.emit_immediate(event) (which calls self._dispatch_event(event) directly, bypassing the queue). Why does the lesson keep both methods rather than collapsing to one?

Question 3: The lesson's _dispatch_event wraps each callback in try/except with print(f'Error in event handler {callback.__name__}: {e}') rather than letting the exception propagate up through the dispatch loop. Why is this wrap correctness, not just convenience?

What's Next?

Now that you understand event systems, next we'll explore save/load systems - how to persist and restore game state!