Simple Lobby System
Creating Game Lobbies and Matchmaking
Build a complete lobby system for your multiplayer games! Learn room creation, player matchmaking, ready states, game launching, and session management! 🎮🚪👥
Understanding Lobby Systems
🏛️ What is a Game Lobby?
A lobby is a virtual waiting room where players gather before starting a game session. Think of it as the multiplayer equivalent of a main menu!
Core Features:
- Room Creation: Players can create new game rooms
- Room Discovery: Browse and join existing rooms
- Player Management: Track who's in each room
- Ready System: Ensure all players are prepared
- Game Launch: Start the game once enough players have joined and every non-host player is ready
- Chat System: Pre-game communication
Complete Lobby Server Implementation
🖥️ Lobby Server with Rooms
import socket
import threading
import json
import time
import random
import string
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple
class PlayerState(Enum):
    """Where a connected player currently is in the lobby flow.

    The string values are what ``Player.to_dict`` puts in the ``state``
    field sent to clients.
    """

    LOBBY = "lobby"      # connected but not in any room
    IN_ROOM = "in_room"  # set by GameRoom.add_player
    # NOTE(review): READY is never assigned in the visible code; readiness
    # is tracked by the separate ``Player.is_ready`` flag.
    READY = "ready"
    IN_GAME = "in_game"  # set by GameRoom.start_game
    # Assigned by SpectatorSystem.add_spectator; without this member that
    # assignment raised AttributeError.
    SPECTATING = "spectating"
class RoomState(Enum):
    """Lifecycle phase of a game room.

    The string values are what ``GameRoom.to_dict`` puts in the ``state``
    field sent to clients.
    """

    WAITING = "waiting"          # initial state set by GameRoom.__init__
    STARTING = "starting"        # set by GameRoom.start_game
    IN_PROGRESS = "in_progress"  # set by the timer that start_game schedules
    # NOTE(review): FINISHED is never assigned in the visible code.
    FINISHED = "finished"
class Player:
    """Server-side record for one connected client.

    Keeps the client's socket and address together with the lobby
    bookkeeping (display name, state, current room, ready/host flags) and
    a small stats dictionary.
    """

    def __init__(self, socket: socket.socket, address: Tuple[str, int], player_id: int) -> None:
        """Create a player that starts in the lobby, outside any room.

        Args:
            socket: Connected client socket.
            address: Peer address as returned by ``accept()``.
            player_id: Numeric id; also used to build the default name.
        """
        # Connection details.
        self.socket = socket
        self.address = address

        # Identity: the display name is derived from the id until changed.
        self.id = player_id
        self.name = f"Player_{player_id}"

        # Lobby bookkeeping.
        self.state = PlayerState.LOBBY
        self.room_id = None
        self.is_ready = False
        self.is_host = False

        # Starting statistics for a fresh player.
        self.stats = {
            'wins': 0,
            'losses': 0,
            'games_played': 0,
            'rating': 1000,
        }

    def to_dict(self) -> Dict[str, Any]:
        """Return the dictionary view of this player that is sent to clients.

        The socket, address and room id are not included. ``stats`` is the
        player's own dictionary, not a copy.
        """
        snapshot = {'id': self.id, 'name': self.name}
        snapshot['state'] = self.state.value
        snapshot['is_ready'] = self.is_ready
        snapshot['is_host'] = self.is_host
        snapshot['stats'] = self.stats
        return snapshot
class GameRoom:
    """One game room: its players, its settings and its lifecycle state."""

    def __init__(self, room_id: str, host_id: int, settings: Optional[Dict[str, Any]] = None) -> None:
        """Create an empty room in the WAITING state.

        Args:
            room_id: Identifier of the room.
            host_id: Id of the player expected to host the room.
            settings: Optional overrides; any key that is not supplied
                keeps its value from ``default_settings``.
        """
        self.id = room_id
        self.host_id = host_id
        self.players = {}  # {player_id: Player}
        self.state = RoomState.WAITING
        # Callers routinely pass only a few keys (a name, a player limit);
        # merging keeps keys such as 'min_players' and 'is_private' present
        # so later lookups do not raise KeyError.
        self.settings = self._with_defaults(settings)
        self.created_at = time.time()
        self.game_data = {}

    def default_settings(self) -> Dict[str, Any]:
        """Return a fresh dictionary with the default room settings."""
        return {
            'name': f"Room {self.id}",
            'max_players': 4,
            'min_players': 2,
            'game_mode': 'classic',
            'map': 'default',
            'time_limit': 300,  # 5 minutes
            'is_private': False,
            'password': None,
        }

    def _with_defaults(self, settings: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """Return the default settings overlaid with *settings* (may be None)."""
        merged = self.default_settings()
        if settings:
            merged.update(settings)
        return merged

    def add_player(self, player: "Player") -> bool:
        """Add *player* to the room.

        Returns:
            False when the room already holds ``max_players`` players,
            True otherwise. On success the player's room id, state and
            ready flag are updated; the first player to enter becomes host.
        """
        if len(self.players) >= self.settings['max_players']:
            return False

        self.players[player.id] = player
        player.room_id = self.id
        player.state = PlayerState.IN_ROOM
        player.is_ready = False

        # First player becomes host
        if len(self.players) == 1:
            player.is_host = True
            self.host_id = player.id
        return True

    def remove_player(self, player_id: int) -> bool:
        """Remove the player with *player_id* and reset their lobby state.

        If the leaving player was host and others remain, the next player
        in insertion order becomes host.

        Returns:
            True if the player was in the room, False otherwise.
        """
        player = self.players.pop(player_id, None)
        if player is None:
            return False

        # Transfer host if needed
        if player.is_host and self.players:
            new_host = next(iter(self.players.values()))
            new_host.is_host = True
            self.host_id = new_host.id

        # Reset player state
        player.room_id = None
        player.state = PlayerState.LOBBY
        player.is_ready = False
        player.is_host = False
        return True

    def set_player_ready(self, player_id: int, ready: bool) -> bool:
        """Set a player's ready flag; return False if they are not in the room."""
        player = self.players.get(player_id)
        if player is None:
            return False
        player.is_ready = ready
        return True

    def can_start(self) -> bool:
        """Return True when there are enough players and all guests are ready.

        The host does not need to be marked ready.
        """
        if len(self.players) < self.settings['min_players']:
            return False
        return all(p.is_host or p.is_ready for p in self.players.values())

    def start_game(self) -> None:
        """Move the room to STARTING, then to IN_PROGRESS three seconds later.

        Marks every player as IN_GAME and initialises ``game_data`` with
        the start time, the participating player ids and zeroed scores.
        """
        self.state = RoomState.STARTING
        for player in self.players.values():
            player.state = PlayerState.IN_GAME

        player_ids = list(self.players)
        self.game_data = {
            'start_time': time.time(),
            'players': player_ids,
            'scores': {pid: 0 for pid in player_ids},
        }

        # After a delay, mark as in progress
        threading.Timer(3.0, self._set_in_progress).start()

    def _set_in_progress(self) -> None:
        """Timer callback that marks the room as IN_PROGRESS."""
        self.state = RoomState.IN_PROGRESS

    def to_dict(self) -> Dict[str, Any]:
        """Return the dictionary view of the room that is sent to clients."""
        return {
            'id': self.id,
            'host_id': self.host_id,
            'state': self.state.value,
            'players': [p.to_dict() for p in self.players.values()],
            'settings': self.settings,
            'player_count': len(self.players),
            'can_start': self.can_start(),
        }
class LobbyServer:
def __init__(self, host: str = '0.0.0.0', port: int = 5555) -> None:
self.host = host
self.port = port
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.running = False
# Data structures
self.players = {} # {socket: Player}
self.rooms = {} # {room_id: GameRoom}
self.player_id_counter = 0
# Matchmaking queue
self.matchmaking_queue = []
def start(self) -> None:
"""Start the lobby server"""
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.bind((self.host, self.port))
self.socket.listen(10)
self.running = True
print(f"Lobby Server started on {self.host}:{self.port}")
# Start matchmaking thread
matchmaking_thread = threading.Thread(target=self.matchmaking_loop)
matchmaking_thread.daemon = True
matchmaking_thread.start()
# Accept connections
while self.running:
try:
client_socket, address = self.socket.accept()
print(f"New connection from {address}")
# Create player
self.player_id_counter += 1
player = Player(client_socket, address, self.player_id_counter)
self.players[client_socket] = player
# Handle in new thread
client_thread = threading.Thread(
target=self.handle_client,
args=(player,)
)
client_thread.daemon = True
client_thread.start()
# Send welcome message
self.send_to_player(player, {
'type': 'welcome',
'player_id': player.id,
'message': 'Welcome to the lobby!'
})
except Exception as e:
print(f"Error accepting connection: {e}")
def handle_client(self, player: Player) -> None:
"""Handle individual client messages"""
while self.running:
try:
data = player.socket.recv(4096).decode('utf-8')
if not data:
break
# Parse and process message
message = json.loads(data)
self.process_message(player, message)
except ConnectionResetError:
print(f"Player {player.id} disconnected")
break
except json.JSONDecodeError:
print(f"Invalid JSON from player {player.id}")
except Exception as e:
print(f"Error handling player {player.id}: {e}")
break
# Clean up on disconnect
self.remove_player(player)
def process_message(self, player: Player, message: Dict[str, Any]) -> None:
"""Process player message"""
msg_type = message.get('type')
if msg_type == 'set_name':
self.handle_set_name(player, message)
elif msg_type == 'get_rooms':
self.handle_get_rooms(player)
elif msg_type == 'create_room':
self.handle_create_room(player, message)
elif msg_type == 'join_room':
self.handle_join_room(player, message)
elif msg_type == 'leave_room':
self.handle_leave_room(player)
elif msg_type == 'set_ready':
self.handle_set_ready(player, message)
elif msg_type == 'start_game':
self.handle_start_game(player)
elif msg_type == 'room_chat':
self.handle_room_chat(player, message)
elif msg_type == 'quick_match':
self.handle_quick_match(player)
elif msg_type == 'update_room_settings':
self.handle_update_room_settings(player, message)
def handle_set_name(self, player: Player, message: Dict[str, Any]) -> None:
"""Handle name change"""
name = message.get('name', '').strip()[:20] # Max 20 chars
if name:
player.name = name
self.send_to_player(player, {
'type': 'name_changed',
'name': name
})
# Notify room if in one
if player.room_id:
self.broadcast_room_update(player.room_id)
def handle_get_rooms(self, player: Player) -> None:
"""Send list of available rooms"""
room_list = []
for room in self.rooms.values():
if room.state == RoomState.WAITING and not room.settings['is_private']:
room_list.append({
'id': room.id,
'name': room.settings['name'],
'players': len(room.players),
'max_players': room.settings['max_players'],
'game_mode': room.settings['game_mode'],
'host_name': self.get_player_by_id(room.host_id).name
})
self.send_to_player(player, {
'type': 'room_list',
'rooms': room_list
})
def handle_create_room(self, player: Player, message: Dict[str, Any]) -> None:
"""Handle room creation"""
if player.room_id:
self.send_error(player, "Already in a room")
return
# Generate room ID
room_id = self.generate_room_id()
# Get settings
settings = message.get('settings', {})
# Create room
room = GameRoom(room_id, player.id, settings)
room.add_player(player)
self.rooms[room_id] = room
# Send confirmation
self.send_to_player(player, {
'type': 'room_created',
'room': room.to_dict()
})
print(f"Room {room_id} created by player {player.id}")
def handle_join_room(self, player: Player, message: Dict[str, Any]) -> None:
"""Handle joining a room"""
if player.room_id:
self.send_error(player, "Already in a room")
return
room_id = message.get('room_id')
password = message.get('password')
if room_id not in self.rooms:
self.send_error(player, "Room not found")
return
room = self.rooms[room_id]
# Check password if private
if room.settings['is_private'] and room.settings['password'] != password:
self.send_error(player, "Invalid password")
return
# Try to add player
if room.add_player(player):
# Notify all players in room
self.broadcast_room_update(room_id)
# Send room info to joining player
self.send_to_player(player, {
'type': 'room_joined',
'room': room.to_dict()
})
# Notify others
self.broadcast_to_room(room_id, {
'type': 'player_joined',
'player': player.to_dict()
}, exclude=player)
print(f"Player {player.id} joined room {room_id}")
else:
self.send_error(player, "Room is full")
def handle_leave_room(self, player: Player) -> None:
"""Handle leaving a room"""
if not player.room_id:
self.send_error(player, "Not in a room")
return
room_id = player.room_id
room = self.rooms.get(room_id)
if room:
# Notify others before removing
self.broadcast_to_room(room_id, {
'type': 'player_left',
'player_id': player.id
}, exclude=player)
# Remove player
room.remove_player(player.id)
# Delete room if empty
if len(room.players) == 0:
del self.rooms[room_id]
print(f"Room {room_id} deleted (empty)")
else:
# Update remaining players
self.broadcast_room_update(room_id)
# Confirm to player
self.send_to_player(player, {
'type': 'room_left'
})
def handle_set_ready(self, player: Player, message: Dict[str, Any]) -> None:
"""Handle ready state change"""
if not player.room_id:
self.send_error(player, "Not in a room")
return
room = self.rooms.get(player.room_id)
if room:
ready = message.get('ready', False)
room.set_player_ready(player.id, ready)
# Notify all players
self.broadcast_room_update(player.room_id)
def handle_start_game(self, player: Player) -> None:
"""Handle game start request"""
if not player.room_id or not player.is_host:
self.send_error(player, "Not authorized to start game")
return
room = self.rooms.get(player.room_id)
if room and room.can_start():
room.start_game()
# Notify all players
self.broadcast_to_room(player.room_id, {
'type': 'game_starting',
'countdown': 3,
'game_data': room.game_data
})
print(f"Game starting in room {room.id}")
else:
self.send_error(player, "Cannot start game (check player count and ready states)")
def handle_room_chat(self, player: Player, message: Dict[str, Any]) -> None:
"""Handle room chat message"""
if not player.room_id:
return
chat_message = message.get('message', '').strip()[:200] # Max 200 chars
if chat_message:
self.broadcast_to_room(player.room_id, {
'type': 'room_chat',
'player_id': player.id,
'player_name': player.name,
'message': chat_message,
'timestamp': time.time()
})
def handle_quick_match(self, player: Player) -> None:
"""Add player to matchmaking queue"""
if player.room_id:
self.send_error(player, "Already in a room")
return
if player not in self.matchmaking_queue:
self.matchmaking_queue.append(player)
self.send_to_player(player, {
'type': 'matchmaking_started',
'queue_position': len(self.matchmaking_queue)
})
print(f"Player {player.id} entered matchmaking")
def handle_update_room_settings(self, player: Player, message: Dict[str, Any]) -> None:
"""Update room settings (host only)"""
if not player.room_id or not player.is_host:
self.send_error(player, "Not authorized")
return
room = self.rooms.get(player.room_id)
if room:
new_settings = message.get('settings', {})
# Update allowed settings
allowed_keys = ['name', 'max_players', 'game_mode', 'map', 'time_limit']
for key in allowed_keys:
if key in new_settings:
room.settings[key] = new_settings[key]
# Notify all players
self.broadcast_room_update(player.room_id)
def matchmaking_loop(self) -> None:
"""Background thread for matchmaking"""
while self.running:
time.sleep(2) # Check every 2 seconds
if len(self.matchmaking_queue) >= 2:
# Create match with first 2-4 players
match_size = min(4, len(self.matchmaking_queue))
players_to_match = self.matchmaking_queue[:match_size]
self.matchmaking_queue = self.matchmaking_queue[match_size:]
# Create room
room_id = self.generate_room_id()
host = players_to_match[0]
settings = {
'name': f"Quick Match {room_id}",
'max_players': 4,
'min_players': 2,
'game_mode': 'quick_match',
'map': 'random'
}
room = GameRoom(room_id, host.id, settings)
# Add all players
for player in players_to_match:
room.add_player(player)
self.rooms[room_id] = room
# Notify all matched players
for player in players_to_match:
self.send_to_player(player, {
'type': 'match_found',
'room': room.to_dict()
})
print(f"Quick match created: Room {room_id} with {match_size} players")
def broadcast_room_update(self, room_id: str) -> None:
"""Send room update to all players in room"""
room = self.rooms.get(room_id)
if room:
self.broadcast_to_room(room_id, {
'type': 'room_update',
'room': room.to_dict()
})
def broadcast_to_room(self, room_id: str, message: Dict[str, Any], exclude: Optional[Player] = None) -> None:
"""Broadcast message to all players in a room"""
room = self.rooms.get(room_id)
if room:
for player in room.players.values():
if player != exclude:
self.send_to_player(player, message)
def send_to_player(self, player: Player, message: Dict[str, Any]) -> None:
"""Send message to specific player"""
try:
json_msg = json.dumps(message)
player.socket.send(json_msg.encode('utf-8'))
except Exception as e:
print(f"Error sending to player {player.id}: {e}")
def send_error(self, player: Player, error_message: str) -> None:
"""Send error message to player"""
self.send_to_player(player, {
'type': 'error',
'message': error_message
})
def get_player_by_id(self, player_id: int) -> Optional[Player]:
"""Get player object by ID"""
for player in self.players.values():
if player.id == player_id:
return player
return None
def generate_room_id(self) -> str:
"""Generate unique room ID"""
while True:
room_id = ''.join(random.choices(string.ascii_uppercase + string.digits, k=6))
if room_id not in self.rooms:
return room_id
def remove_player(self, player: Player) -> None:
"""Remove player from server"""
# Leave room if in one
if player.room_id:
self.handle_leave_room(player)
# Remove from matchmaking
if player in self.matchmaking_queue:
self.matchmaking_queue.remove(player)
# Remove from players list
if player.socket in self.players:
del self.players[player.socket]
# Close socket
try:
player.socket.close()
except:
pass
print(f"Player {player.id} removed from server")
def stop(self) -> None:
"""Stop the server"""
self.running = False
self.socket.close()
# Script entry point: run the lobby server until interrupted with Ctrl+C.
if __name__ == "__main__":
    lobby_server = LobbyServer()
    try:
        # Blocks here, accepting clients.
        lobby_server.start()
    except KeyboardInterrupt:
        print("\nShutting down server...")
        lobby_server.stop()
Lobby Client Implementation
🎮 Game Client with Lobby UI
import pygame
import socket
import threading
import json
import time
class LobbyClient:
    """Pygame lobby client: main menu, room browser, room view and chat.

    Network messages are JSON objects exchanged over a TCP socket; a
    background thread receives them and updates the state that the main
    loop draws.
    """

    # Number of chat/status lines kept in memory.
    MAX_CHAT_LINES = 10

    def __init__(self) -> None:
        """Initialise pygame, the window, fonts, client state and UI layout."""
        pygame.init()
        self.screen = pygame.display.set_mode((1024, 768))
        pygame.display.set_caption("Game Lobby")
        self.clock = pygame.time.Clock()
        self.font = pygame.font.Font(None, 24)
        self.title_font = pygame.font.Font(None, 48)
        # Network
        self.socket = None
        self.connected = False
        self.player_id = None
        self.player_name = "Player"
        # State
        self.current_screen = "main_menu"  # main_menu, lobby, room, game
        self.current_room = None
        self.room_list = []
        self.chat_messages = []
        self.is_ready = False
        # UI Elements
        self.buttons = {}
        self.input_fields = {}
        self.selected_room = None
        self.setup_ui()

    def setup_ui(self) -> None:
        """Create the button rectangles and text input fields."""
        # Main Menu buttons
        self.buttons['connect'] = pygame.Rect(400, 300, 200, 50)
        self.buttons['quit'] = pygame.Rect(400, 400, 200, 50)
        # Lobby buttons
        self.buttons['create_room'] = pygame.Rect(50, 100, 150, 40)
        self.buttons['join_room'] = pygame.Rect(220, 100, 150, 40)
        self.buttons['quick_match'] = pygame.Rect(390, 100, 150, 40)
        self.buttons['refresh'] = pygame.Rect(560, 100, 150, 40)
        # Room buttons
        self.buttons['ready'] = pygame.Rect(50, 600, 150, 50)
        self.buttons['start_game'] = pygame.Rect(220, 600, 150, 50)
        self.buttons['leave_room'] = pygame.Rect(390, 600, 150, 50)
        # Input fields
        self.input_fields['player_name'] = {
            'rect': pygame.Rect(400, 200, 200, 30),
            'text': self.player_name,
            'active': False
        }
        self.input_fields['chat'] = {
            'rect': pygame.Rect(50, 550, 400, 30),
            'text': '',
            'active': False
        }

    def connect_to_server(self, host: str = 'localhost', port: int = 5555) -> bool:
        """Connect to the lobby server and switch to the lobby screen.

        Returns:
            True on success, False if the connection could not be made
            (the error is printed).
        """
        sock = None
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.connect((host, port))
            self.socket = sock
            self.connected = True
            # Start receive thread
            receive_thread = threading.Thread(target=self.receive_loop, daemon=True)
            receive_thread.start()
            # Set player name
            self.send_message({
                'type': 'set_name',
                'name': self.player_name
            })
            self.current_screen = "lobby"
            return True
        except Exception as e:
            print(f"Failed to connect: {e}")
            # Do not leave a half-open socket behind after a failed attempt.
            self.connected = False
            if sock is not None:
                try:
                    sock.close()
                except OSError:
                    pass
            return False

    @staticmethod
    def _extract_messages(buffer: str, max_pending: int = 65536) -> tuple:
        """Split *buffer* into the complete JSON object messages it holds.

        TCP is a byte stream, so one ``recv`` can deliver several messages
        at once or only part of one.

        Args:
            buffer: Previously unparsed text plus the newly received text.
            max_pending: Size above which an unparsable remainder is
                dropped instead of kept.

        Returns:
            ``(messages, remainder, had_invalid)``: the decoded objects in
            arrival order, the trailing text to keep for the next call,
            and whether any input was discarded as invalid.
        """
        decoder = json.JSONDecoder()
        messages = []
        had_invalid = False
        buffer = buffer.lstrip()
        while buffer:
            try:
                value, end = decoder.raw_decode(buffer)
            except json.JSONDecodeError:
                # A truncated object may be completed by the next recv();
                # anything else is treated as garbage and dropped.
                if not buffer.startswith('{') or len(buffer) > max_pending:
                    had_invalid = True
                    buffer = ''
                break
            buffer = buffer[end:].lstrip()
            if isinstance(value, dict):
                messages.append(value)
            else:
                had_invalid = True
        return messages, buffer, had_invalid

    def receive_loop(self) -> None:
        """Receive server messages until the connection ends, then disconnect.

        Runs in the background thread started by ``connect_to_server``.
        """
        pending = ""
        while self.connected:
            try:
                data = self.socket.recv(4096).decode('utf-8')
                if not data:
                    break
                # The server often sends messages back to back (for example
                # a room update followed by the join confirmation), so one
                # chunk may hold several of them.
                messages, pending, had_invalid = self._extract_messages(pending + data)
                if had_invalid:
                    print("Receive error: invalid data from server")
                for message in messages:
                    self.handle_server_message(message)
            except Exception as e:
                print(f"Receive error: {e}")
                break
        self.disconnect()

    def _add_chat(self, line: str) -> None:
        """Append a line to the chat log, keeping only the newest entries."""
        self.chat_messages.append(line)
        overflow = len(self.chat_messages) - self.MAX_CHAT_LINES
        if overflow > 0:
            del self.chat_messages[:overflow]

    def _enter_room(self, room: dict, chat_lines: list) -> None:
        """Switch to the room screen showing *room* with a fresh chat log."""
        self.current_room = room
        self.current_screen = "room"
        self.chat_messages = chat_lines
        # The server clears the ready flag whenever a player enters a room.
        self.is_ready = False

    def handle_server_message(self, message: dict) -> None:
        """Update client state according to one message from the server."""
        msg_type = message.get('type')
        if msg_type == 'welcome':
            self.player_id = message.get('player_id')
            print(f"Connected as Player {self.player_id}")
        elif msg_type == 'room_list':
            self.room_list = message.get('rooms', [])
        elif msg_type in ('room_created', 'room_joined'):
            self._enter_room(message.get('room'), [])
        elif msg_type == 'room_update':
            self.current_room = message.get('room')
        elif msg_type == 'room_left':
            self.current_room = None
            self.current_screen = "lobby"
            self.is_ready = False
            self.request_room_list()
        elif msg_type == 'room_chat':
            self._add_chat(f"[{message.get('player_name')}]: {message.get('message')}")
        elif msg_type == 'player_joined':
            player = message.get('player') or {}
            self._add_chat(f"{player.get('name', 'A player')} joined the room")
        elif msg_type == 'player_left':
            self._add_chat(f"Player {message.get('player_id')} left")
        elif msg_type == 'game_starting':
            self.current_screen = "game"
            self._add_chat("Game starting in 3 seconds!")
        elif msg_type == 'match_found':
            self._enter_room(message.get('room'), ["Match found! Get ready!"])
        elif msg_type == 'error':
            print(f"Error: {message.get('message')}")
            self._add_chat(f"Error: {message.get('message')}")

    def send_message(self, message: dict) -> None:
        """Send *message* to the server as JSON; does nothing when offline."""
        if not (self.connected and self.socket):
            return
        try:
            json_msg = json.dumps(message)
            # sendall: send() may transmit only part of the message.
            self.socket.sendall(json_msg.encode('utf-8'))
        except Exception as e:
            print(f"Send error: {e}")

    def request_room_list(self) -> None:
        """Ask the server for the list of rooms."""
        self.send_message({'type': 'get_rooms'})

    def create_room(self) -> None:
        """Ask the server to create a room named after this player."""
        self.send_message({
            'type': 'create_room',
            'settings': {
                'name': f"{self.player_name}'s Room",
                'max_players': 4,
                'game_mode': 'classic'
            }
        })

    def join_room(self, room_id: str) -> None:
        """Ask the server to put this player into *room_id*."""
        self.send_message({
            'type': 'join_room',
            'room_id': room_id
        })

    def leave_room(self) -> None:
        """Ask the server to take this player out of the current room."""
        self.send_message({'type': 'leave_room'})

    def toggle_ready(self) -> None:
        """Flip the local ready flag and report it to the server."""
        self.is_ready = not self.is_ready
        self.send_message({
            'type': 'set_ready',
            'ready': self.is_ready
        })

    def start_game(self) -> None:
        """Request game start (the server accepts this from the host only)."""
        self.send_message({'type': 'start_game'})

    def quick_match(self) -> None:
        """Enter the quick-match queue."""
        self.send_message({'type': 'quick_match'})
        self._add_chat("Searching for match...")

    def send_chat(self, message: str) -> None:
        """Send a chat line to the room; blank text is ignored."""
        if message.strip():
            self.send_message({
                'type': 'room_chat',
                'message': message
            })

    def disconnect(self) -> None:
        """Close the connection and return to the main menu."""
        self.connected = False
        if self.socket:
            try:
                self.socket.close()
            except OSError:
                pass  # already closed
        self.current_screen = "main_menu"

    def draw_main_menu(self) -> None:
        """Draw the main menu: title, name field, connect and quit buttons."""
        self.screen.fill((30, 30, 40))
        # Title
        title = self.title_font.render("Game Lobby", True, (255, 255, 255))
        self.screen.blit(title, title.get_rect(center=(512, 100)))
        # Name input
        self.draw_input_field('player_name', "Enter Name:")
        # Buttons
        self.draw_button('connect', "Connect to Server", (0, 150, 255))
        self.draw_button('quit', "Quit", (255, 50, 50))

    def draw_lobby(self) -> None:
        """Draw the lobby: action buttons and the list of available rooms."""
        self.screen.fill((40, 40, 50))
        # Title
        title = self.font.render(f"Lobby - {self.player_name}", True, (255, 255, 255))
        self.screen.blit(title, (50, 30))
        # Buttons
        self.draw_button('create_room', "Create Room", (0, 200, 100))
        self.draw_button('join_room', "Join Selected", (0, 150, 200))
        self.draw_button('quick_match', "Quick Match", (200, 150, 0))
        self.draw_button('refresh', "Refresh", (150, 150, 150))
        # Room list
        y = 160
        self.screen.blit(self.font.render("Available Rooms:", True, (255, 255, 255)), (50, y))
        y += 30
        mouse_pos = pygame.mouse.get_pos()
        for room in self.room_list:
            color = (100, 200, 100) if room == self.selected_room else (200, 200, 200)
            room_text = f"{room['name']} - {room['players']}/{room['max_players']} - Host: {room['host_name']}"
            text = self.font.render(room_text, True, color)
            # Clickable area; must line up with the rows in handle_click.
            text_rect = pygame.Rect(50, y, 700, 25)
            if text_rect.collidepoint(mouse_pos):
                pygame.draw.rect(self.screen, (80, 80, 100), text_rect)
            self.screen.blit(text, (50, y))
            y += 30

    def draw_room(self) -> None:
        """Draw the room view: players, chat log, chat input and buttons."""
        self.screen.fill((45, 45, 55))
        room = self.current_room
        if not room:
            return
        # Room title
        title = self.title_font.render(room['settings']['name'], True, (255, 255, 255))
        self.screen.blit(title, (50, 30))
        # Room info
        info_text = f"Room ID: {room['id']} | Mode: {room['settings']['game_mode']}"
        self.screen.blit(self.font.render(info_text, True, (200, 200, 200)), (50, 90))
        # Players
        y = 140
        self.screen.blit(self.font.render("Players:", True, (255, 255, 255)), (50, y))
        y += 30
        for player in room['players']:
            status = "🟢" if player['is_ready'] else "🔴"
            host_tag = " [HOST]" if player['is_host'] else ""
            player_text = f"{status} {player['name']}{host_tag}"
            # Highlight this client's own entry.
            color = (255, 215, 0) if player['id'] == self.player_id else (200, 200, 200)
            self.screen.blit(self.font.render(player_text, True, color), (70, y))
            y += 30
        # Chat area
        chat_y = 350
        pygame.draw.rect(self.screen, (30, 30, 40), (50, chat_y, 700, 180))
        self.screen.blit(self.font.render("Chat:", True, (255, 255, 255)), (60, chat_y + 5))
        for i, msg in enumerate(self.chat_messages[-8:]):
            self.screen.blit(self.font.render(msg, True, (200, 200, 200)),
                             (60, chat_y + 30 + i * 20))
        # Chat input
        self.draw_input_field('chat', "Type message...")
        # Buttons
        ready_text = "Not Ready" if self.is_ready else "Ready"
        ready_color = (200, 100, 0) if self.is_ready else (0, 200, 100)
        self.draw_button('ready', ready_text, ready_color)
        # Show start button only for host
        am_host = any(
            p['id'] == self.player_id and p['is_host'] for p in room['players']
        )
        if am_host:
            can_start = room.get('can_start', False)
            start_color = (0, 255, 0) if can_start else (100, 100, 100)
            self.draw_button('start_game', "Start Game", start_color)
        self.draw_button('leave_room', "Leave Room", (200, 50, 50))

    def draw_game(self) -> None:
        """Draw the placeholder game screen."""
        self.screen.fill((20, 50, 20))
        # Game starting message
        text = self.title_font.render("Game Starting!", True, (255, 255, 255))
        self.screen.blit(text, text.get_rect(center=(512, 384)))
        # This is where the actual game would start
        # You would transition to your game scene here

    def draw_button(self, button_id: str, text: str, color: tuple) -> None:
        """Draw the named button; it is brightened while hovered."""
        rect = self.buttons.get(button_id)
        if rect is None:
            return
        # Hover effect
        if rect.collidepoint(pygame.mouse.get_pos()):
            color = tuple(min(255, c + 30) for c in color)
        pygame.draw.rect(self.screen, color, rect)
        pygame.draw.rect(self.screen, (255, 255, 255), rect, 2)
        text_surface = self.font.render(text, True, (255, 255, 255))
        self.screen.blit(text_surface, text_surface.get_rect(center=rect.center))

    def draw_input_field(self, field_id: str, label: str) -> None:
        """Draw the named input field with its label, text and cursor."""
        field = self.input_fields.get(field_id)
        if field is None:
            return
        rect = field['rect']
        # Label
        self.screen.blit(self.font.render(label, True, (200, 200, 200)),
                         (rect.x, rect.y - 25))
        # Field background
        color = (100, 100, 120) if field['active'] else (60, 60, 70)
        pygame.draw.rect(self.screen, color, rect)
        pygame.draw.rect(self.screen, (200, 200, 200), rect, 2)
        # Text
        text_surface = self.font.render(field['text'], True, (255, 255, 255))
        self.screen.blit(text_surface, (rect.x + 5, rect.y + 5))
        # Cursor
        if field['active']:
            cursor_x = rect.x + 5 + text_surface.get_width()
            pygame.draw.line(self.screen, (255, 255, 255),
                             (cursor_x, rect.y + 5),
                             (cursor_x, rect.y + 25), 2)

    def _focus_input_at(self, pos: tuple) -> None:
        """Activate the input field under *pos* and deactivate the others."""
        for field in self.input_fields.values():
            field['active'] = field['rect'].collidepoint(pos)

    def handle_click(self, pos: tuple) -> bool:
        """Handle a mouse click at *pos* on the current screen.

        Returns:
            False when the user clicked Quit, True otherwise.
        """
        if self.current_screen == "main_menu":
            if self.buttons['connect'].collidepoint(pos):
                self.player_name = self.input_fields['player_name']['text']
                self.connect_to_server()
            elif self.buttons['quit'].collidepoint(pos):
                return False
            # Check input field
            self._focus_input_at(pos)
        elif self.current_screen == "lobby":
            if self.buttons['create_room'].collidepoint(pos):
                self.create_room()
            elif self.buttons['join_room'].collidepoint(pos):
                if self.selected_room:
                    self.join_room(self.selected_room['id'])
            elif self.buttons['quick_match'].collidepoint(pos):
                self.quick_match()
            elif self.buttons['refresh'].collidepoint(pos):
                self.request_room_list()
            # Check room selection; rows start at y=190, 30 pixels apart,
            # matching draw_lobby.
            y = 190
            for room in self.room_list:
                if pygame.Rect(50, y, 700, 25).collidepoint(pos):
                    self.selected_room = room
                y += 30
        elif self.current_screen == "room":
            if self.buttons['ready'].collidepoint(pos):
                self.toggle_ready()
            elif self.buttons['start_game'].collidepoint(pos):
                self.start_game()
            elif self.buttons['leave_room'].collidepoint(pos):
                self.leave_room()
            # Check input field
            self._focus_input_at(pos)
        return True

    def handle_key(self, event: "pygame.event.Event") -> None:
        """Apply a key press to whichever input field is active.

        Enter submits the chat field (in a room) and deactivates the
        field, Backspace deletes the last character, and printable
        characters are appended.
        """
        for field_id, field in self.input_fields.items():
            if not field['active']:
                continue
            if event.key == pygame.K_RETURN:
                if field_id == 'chat' and self.current_screen == "room":
                    self.send_chat(field['text'])
                    field['text'] = ''
                field['active'] = False
            elif event.key == pygame.K_BACKSPACE:
                field['text'] = field['text'][:-1]
            elif event.unicode.isprintable():
                # Skip control characters such as Tab or Escape.
                field['text'] += event.unicode

    def run(self) -> None:
        """Run the event/draw loop at 60 FPS until the user quits."""
        screens = {
            "main_menu": self.draw_main_menu,
            "lobby": self.draw_lobby,
            "room": self.draw_room,
            "game": self.draw_game,
        }
        running = True
        while running:
            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    running = False
                elif event.type == pygame.MOUSEBUTTONDOWN:
                    if not self.handle_click(event.pos):
                        running = False
                elif event.type == pygame.KEYDOWN:
                    self.handle_key(event)
            # Draw current screen
            draw = screens.get(self.current_screen)
            if draw is not None:
                draw()
            pygame.display.flip()
            self.clock.tick(60)
        # Clean up
        if self.connected:
            self.disconnect()
        pygame.quit()
# Script entry point: open the lobby window and run until the user quits.
if __name__ == "__main__":
    lobby_client = LobbyClient()
    lobby_client.run()
Room Management Features
🚪 Advanced Room Features
# Additional features for room management
from typing import Optional
class RoomManager:
    """Extended room management: templates, room search and team balancing."""

    def __init__(self) -> None:
        """Start with no rooms and the predefined templates loaded."""
        self.rooms: dict = {}  # {room_id: GameRoom}
        self.room_templates: dict = self.load_templates()

    def load_templates(self) -> dict:
        """Return the predefined room templates keyed by template name."""
        return {
            'competitive': {
                'max_players': 2,
                'min_players': 2,
                'game_mode': 'ranked',
                'time_limit': 600,
                'allow_spectators': True,
                'skill_based_matchmaking': True
            },
            'casual': {
                'max_players': 8,
                'min_players': 2,
                'game_mode': 'casual',
                'time_limit': 300,
                'allow_spectators': False,
                'skill_based_matchmaking': False
            },
            'tournament': {
                'max_players': 16,
                'min_players': 16,
                'game_mode': 'tournament',
                'time_limit': 900,
                'allow_spectators': True,
                'bracket_type': 'single_elimination'
            }
        }

    def generate_room_id(self) -> str:
        """Return a 6-character uppercase/digit id not used by a managed room."""
        alphabet = string.ascii_uppercase + string.digits
        while True:
            room_id = ''.join(random.choices(alphabet, k=6))
            if room_id not in self.rooms:
                return room_id

    def create_room_from_template(self, template_name: str, host_id: int) -> Optional["GameRoom"]:
        """Create a room from a named template and register it.

        The template values are layered over the room's default settings,
        so keys the templates do not define (such as the room name) keep
        their defaults.

        Returns:
            The new room, or None if *template_name* is unknown.
        """
        template = self.room_templates.get(template_name)
        if template is None:
            return None

        room_id = self.generate_room_id()
        room = GameRoom(room_id, host_id)
        room.settings.update(template)
        # Register the room so find_suitable_room can see it and the id
        # stays reserved.
        self.rooms[room_id] = room
        return room

    def find_suitable_room(self, player: "Player", criteria: dict) -> Optional["GameRoom"]:
        """Return the fullest waiting room with space that meets *criteria*.

        Args:
            player: The player looking for a room; their rating is used
                when ``skill_range`` is given.
            criteria: Optional ``game_mode`` (must equal the room's mode)
                and ``skill_range`` (maximum allowed distance between the
                player's rating and the room's average rating).

        Returns:
            The matching room with the most players (the earliest such
            room on ties), or None when nothing matches.
        """
        wanted_mode = criteria.get('game_mode')
        skill_range = criteria.get('skill_range')

        suitable_rooms = []
        for room in self.rooms.values():
            if room.state != RoomState.WAITING:
                continue
            if wanted_mode and room.settings['game_mode'] != wanted_mode:
                continue
            if skill_range:
                avg_skill = self.calculate_room_skill(room)
                if abs(avg_skill - player.stats['rating']) > skill_range:
                    continue
            if len(room.players) < room.settings['max_players']:
                suitable_rooms.append(room)

        # Fullest room first; max() keeps the earliest room on ties.
        return max(suitable_rooms, key=lambda r: len(r.players), default=None)

    def calculate_room_skill(self, room: "GameRoom") -> float:
        """Return the average rating of the room's players (1000 if empty)."""
        if not room.players:
            return 1000.0
        total_rating = sum(p.stats['rating'] for p in room.players.values())
        return total_rating / len(room.players)

    def balance_teams(self, room: "GameRoom") -> dict:
        """Split the room's players into two teams by rating.

        Players are ranked by rating, highest first, and assigned in the
        repeating pattern A, B, B, A.

        Returns:
            ``{'team_a': [...], 'team_b': [...]}`` holding player objects.
        """
        ranked = sorted(room.players.values(), key=lambda p: p.stats['rating'], reverse=True)
        team_a = []
        team_b = []
        # Snake draft for balance
        for position, player in enumerate(ranked):
            if position % 4 in (0, 3):
                team_a.append(player)
            else:
                team_b.append(player)
        return {'team_a': team_a, 'team_b': team_b}
class SpectatorSystem:
    """Track the spectators watching one room and relay messages to them."""

    def __init__(self, room: "GameRoom") -> None:
        """Create an empty spectator list for *room* (at most 10 spectators)."""
        self.room = room
        self.spectators: dict = {}  # {player_id: Player}
        self.max_spectators = 10

    def add_spectator(self, player: "Player") -> bool:
        """Register *player* as a spectator and mark them as spectating.

        Returns:
            True if the player is now (or already was) a spectator, False
            if the spectator limit has been reached.
        """
        if player.id in self.spectators:
            # Already watching; do not count them against the limit again.
            return True
        if len(self.spectators) >= self.max_spectators:
            return False
        self.spectators[player.id] = player
        player.state = PlayerState.SPECTATING
        return True

    def remove_spectator(self, player_id: int) -> bool:
        """Remove a spectator and put them back in the lobby state.

        Returns:
            True if the player was a spectator, False otherwise.
        """
        player = self.spectators.pop(player_id, None)
        if player is None:
            return False
        # Same reset that leaving a room applies; otherwise the player
        # would stay marked as spectating.
        player.state = PlayerState.LOBBY
        return True

    def broadcast_to_spectators(self, message: dict) -> None:
        """Send *message* as JSON to every spectator.

        A failure to reach one spectator is printed and does not stop
        delivery to the others.
        """
        payload = json.dumps(message).encode('utf-8')
        for spectator in list(self.spectators.values()):
            try:
                spectator.socket.sendall(payload)
            except OSError as e:
                print(f"Error sending to spectator {spectator.id}: {e}")
Matchmaking Algorithms
🎯 Smart Matchmaking
class Matchmaker:
    """Skill-based matchmaking queue.

    Players wait in ``queue``; ``find_matches`` groups compatible players
    (up to four per match) and records each group in ``matches``.
    """

    def __init__(self) -> None:
        """Start with an empty queue and no matches."""
        self.queue: list = []    # entries: {'player', 'queue_time', 'search_range'}
        self.matches: list = []  # match dicts built by create_match
        self._match_counter = 0  # makes match ids unique within this instance

    def add_to_queue(self, player: "Player") -> None:
        """Add *player* to the queue; a player already queued is ignored."""
        if any(entry['player'] is player for entry in self.queue):
            return
        self.queue.append({
            'player': player,
            'queue_time': time.time(),
            'search_range': 100  # Initial skill range
        })

    def find_matches(self) -> None:
        """Create as many matches as the current queue allows.

        Players are considered longest-waiting first. A player's accepted
        rating difference starts at 100 and grows by 10 per second waited.
        A player with no compatible partner is skipped rather than holding
        up everyone queued behind them.
        """
        # Sort by wait time (longest first)
        self.queue.sort(key=lambda entry: entry['queue_time'])

        index = 0
        while index < len(self.queue) - 1:
            base_player = self.queue[index]

            # Expand search range over time
            wait_time = time.time() - base_player['queue_time']
            base_player['search_range'] = 100 + (wait_time * 10)  # +10 rating per second

            # Entries before ``index`` already failed to match this player
            # (compatibility is symmetric), so only later ones are checked.
            compatible = [
                other for other in self.queue[index + 1:]
                if self.are_compatible(base_player, other)
            ]
            if not compatible:
                index += 1
                continue

            # Base player plus up to three others.
            match_players = [base_player] + compatible[:3]

            # Remove by identity: entries are dicts, and equality could
            # match a different entry with equal contents.
            matched_ids = {id(entry) for entry in match_players}
            self.queue = [e for e in self.queue if id(e) not in matched_ids]

            self.create_match(match_players)
            # ``index`` now points at the next unmatched entry.

    def are_compatible(self, player1: dict, player2: dict) -> bool:
        """Return True if the two queue entries may be matched.

        The rating difference must not exceed the larger of the two
        entries' search ranges.
        """
        p1_rating = player1['player'].stats['rating']
        p2_rating = player2['player'].stats['rating']
        skill_diff = abs(p1_rating - p2_rating)
        max_diff = max(player1['search_range'], player2['search_range'])
        return skill_diff <= max_diff

    def create_match(self, players: list) -> dict:
        """Record and return a match built from the given queue entries.

        Args:
            players: Non-empty list of queue entries.

        Returns:
            Dict with the match ``id``, the ``players`` objects, their
            ``avg_rating`` and the ``created_at`` timestamp.
        """
        members = [entry['player'] for entry in players]
        match = {
            'id': self.generate_match_id(),
            'players': members,
            'avg_rating': sum(p.stats['rating'] for p in members) / len(members),
            'created_at': time.time()
        }
        self.matches.append(match)
        return match

    def generate_match_id(self) -> str:
        """Return a match id that is unique within this matchmaker.

        A millisecond timestamp alone repeats when several matches are
        created in the same pass, so a running counter is appended.
        """
        self._match_counter += 1
        return f"match_{int(time.time() * 1000)}_{self._match_counter}"
Best Practices
✨ Lobby System Best Practices
- Room Codes: Use short, memorable codes for private rooms
- Ready System: Require explicit ready confirmation
- Host Migration: Automatically transfer host if they leave
- Idle Timeout: Remove inactive players
- Room Persistence: Keep rooms alive briefly after disconnect
- Skill Matching: Balance teams for fair games
- Queue Times: Expand search criteria over time
- Chat Moderation: Filter inappropriate content
- Spectator Mode: Allow watching ongoing games
- Rejoin System: Let players reconnect to games
Security Considerations
🔒 Lobby Security
- Room Passwords: Hash passwords, never store plain text
- Rate Limiting: Prevent spam room creation
- Input Validation: Sanitize all user input
- Permission Checks: Verify host privileges
- Anti-Cheat: Validate game results server-side
- Privacy: Don't expose player IPs
- Report System: Allow reporting bad behavior
Key Takeaways
- 🏛️ Lobbies are essential for multiplayer game organization
- 🚪 Room management includes creation, joining, and settings
- ✅ Ready systems ensure all players are prepared
- 👑 Host migration keeps games running smoothly
- 🎯 Matchmaking should balance skill and wait time
- 💬 Pre-game chat builds community
- 🔄 Handle disconnections gracefully
- 📊 Track player statistics for better matchmaking
🏋️‍♂️ Practice Exercise
🏋️‍♂️ Exercise 1: Lobby State Machine + Ready-Up Gate + Host Migration in One Pygame Window
Objective: Build a single-process pygame demo (~85 lines) that exercises three pillar lobby patterns from this lesson in one runnable program: the explicit PlayerState enum driving each player's UI cell color (LOBBY → IN_ROOM → READY → IN_GAME), the host-excluded can_start() ready gate (every non-host player must be ready while the host is implicitly ready — preventing the deadlock where the host both needs to tick is_ready AND press start), and host migration on host-leave (the next player in the dict inherits the crown rather than collapsing the room). Four simulated players (Alice / Bob / Carol / Dave) are wired to keys 1/2/3/4 for ready toggle, L to remove the current host (triggers migration), S to start (gated on can_start()), R to reset.
Instructions:
- Define `PlayerState` as an Enum with values LOBBY, IN_ROOM, READY, IN_GAME. Define a `Player` dataclass with `id`, `name`, `state`, `is_ready`, `is_host`.
- Define a `Room` class holding `players` (dict id → Player), `host_id`, `MIN_PLAYERS=2`, `MAX_PLAYERS=4`. Implement `can_start()`: return False if `len(players) < MIN_PLAYERS`, return False if any non-host player has `is_ready=False`, otherwise True — the host is implicitly skipped.
- Implement `remove_player(player_id)`: del the player from the dict; if the removed player was the host AND the dict is non-empty, transfer host to `next(iter(self.players.values()))` by setting that player's `is_host=True` and updating `self.host_id`.
- Pre-populate four players on launch with Alice as host. Keys `1`/`2`/`3`/`4` toggle `is_ready` for the corresponding player AND walk that player's state IN_ROOM ↔ READY in lockstep. Key `L` removes the current host (triggering migration). Key `S` calls `start_game()` guarded by `can_start()` — if True, walk every player's state to IN_GAME and run a 3-second countdown via `dt`-based subtraction. Key `R` resets to a fresh four-player lobby.
- Render four player cells colored by state (gray=LOBBY, blue=IN_ROOM, green=READY, gold=IN_GAME) with a [HOST] tag on the host. The HUD shows the live `can_start()` boolean, the room state, and any countdown remaining. Verify by pressing `L` mid-lobby: the [HOST] tag jumps to the next player and the lobby keeps running rather than collapsing.
💡 Hint
The host migration is three short statements: `new_host = next(iter(self.players.values()))`, then `new_host.is_host = True`, then `self.host_id = new_host.id`. Order matters — del the leaving player from the dict FIRST, then check that the dict is non-empty before iterating. The ready gate's host-exclusion is one conditional: `if not player.is_host and not player.is_ready: return False`. Changing the `and` to `or`, or dropping the `not player.is_host` guard, recreates the deadlock in which the room can never start.
✅ Example Solution
import pygame
from dataclasses import dataclass
from enum import Enum
class PlayerState(Enum):
    """Where a player is in the lobby flow; the demo colors cells by it."""
    LOBBY = 'lobby'
    IN_ROOM = 'in_room'
    READY = 'ready'
    IN_GAME = 'in_game'


@dataclass
class Player:
    """One simulated lobby member."""
    id: int
    name: str
    state: PlayerState = PlayerState.IN_ROOM
    is_ready: bool = False
    is_host: bool = False


class Room:
    """A single lobby room: membership, host, ready gate and start countdown.

    ``state`` moves 'WAITING' -> 'STARTING' -> 'IN_PROGRESS'. While
    'STARTING', ``countdown`` holds the seconds left before launch; the
    launch is cancelled (back to 'WAITING') if the ready gate stops holding.
    """

    MIN_PLAYERS = 2
    MAX_PLAYERS = 4
    COUNTDOWN_SECONDS = 3.0

    def __init__(self) -> None:
        self.players: dict = {}  # {player_id: Player}
        self.host_id = None      # id of the current host, None for an empty room
        self.state: str = 'WAITING'
        self.countdown: float = 0.0

    def add_player(self, p: Player) -> bool:
        """Add ``p`` to the room; the first player to join becomes host.

        Returns False when the room already holds MAX_PLAYERS.
        """
        if len(self.players) >= self.MAX_PLAYERS:
            return False
        self.players[p.id] = p
        if self.host_id is None:
            p.is_host = True
            self.host_id = p.id
        return True

    def remove_player(self, pid: int) -> None:
        """Remove player ``pid``; if they were host, hand the role on.

        The next player in the dict inherits the host role so the room
        keeps running. An emptied room has no host.
        """
        leaving = self.players.pop(pid, None)
        if leaving is None:
            return
        if not self.players:
            self.host_id = None
        elif leaving.is_host:
            new_host = next(iter(self.players.values()))
            new_host.is_host = True
            self.host_id = new_host.id

    def toggle_ready(self, pid: int) -> None:
        """Flip a non-host player's ready flag and matching state.

        The host is implicitly ready and is ignored here. Toggles are also
        ignored once the game is in progress, so IN_GAME is not overwritten.
        """
        if self.state == 'IN_PROGRESS':
            return
        p = self.players.get(pid)
        if p is None or p.is_host:
            return
        p.is_ready = not p.is_ready
        p.state = PlayerState.READY if p.is_ready else PlayerState.IN_ROOM

    def can_start(self) -> bool:
        """Return True when enough players are present and all non-hosts are ready."""
        if len(self.players) < self.MIN_PLAYERS:
            return False
        return all(p.is_host or p.is_ready for p in self.players.values())

    def start_game(self) -> bool:
        """Begin the launch countdown; return whether it was started.

        Only a 'WAITING' room that passes ``can_start()`` may start, so a
        repeated request cannot restart the countdown or a running game.
        """
        if self.state != 'WAITING' or not self.can_start():
            return False
        self.state = 'STARTING'
        self.countdown = self.COUNTDOWN_SECONDS
        return True

    def update(self, dt: float) -> None:
        """Advance the countdown by ``dt`` seconds and launch when it expires."""
        if self.state != 'STARTING':
            return
        if not self.can_start():
            # Someone un-readied or left during the countdown: cancel the
            # launch instead of starting a game that fails the ready gate.
            self.state = 'WAITING'
            self.countdown = 0.0
            return
        # Clamp at zero so the HUD never shows a negative countdown.
        self.countdown = max(0.0, self.countdown - dt)
        if self.countdown <= 0:
            self.state = 'IN_PROGRESS'
            for p in self.players.values():
                p.state = PlayerState.IN_GAME
def reset(room: Room) -> None:
    """Return ``room`` to a fresh four-player lobby.

    Clears the room, then re-adds Alice, Bob, Carol and Dave with ids 1-4;
    Alice joins first and so becomes host.
    """
    room.players.clear()
    room.host_id, room.state, room.countdown = None, 'WAITING', 0.0
    for player_id, name in enumerate(('Alice', 'Bob', 'Carol', 'Dave'), start=1):
        room.add_player(Player(player_id, name))
pygame.init()
screen = pygame.display.set_mode((640, 360))
clock = pygame.time.Clock()
font = pygame.font.SysFont(None, 22)

# Fill color of a player cell for each lobby state.
COLORS = {
    PlayerState.LOBBY: (96, 96, 96),
    PlayerState.IN_ROOM: (60, 110, 200),
    PlayerState.READY: (60, 180, 80),
    PlayerState.IN_GAME: (220, 180, 40),
}
# Number keys mapped to the id of the player whose ready flag they toggle.
READY_KEYS = {pygame.K_1: 1, pygame.K_2: 2, pygame.K_3: 3, pygame.K_4: 4}


def draw_text(text, color, pos):
    """Render one line of text onto the screen at ``pos``."""
    screen.blit(font.render(text, True, color), pos)


room = Room()
reset(room)

running = True
while running:
    dt = clock.tick(60) / 1000.0  # seconds elapsed since the previous frame

    # --- input ---
    for ev in pygame.event.get():
        if ev.type == pygame.QUIT:
            running = False
            continue
        if ev.type != pygame.KEYDOWN:
            continue
        key = ev.key
        if key in READY_KEYS:
            room.toggle_ready(READY_KEYS[key])
        elif key == pygame.K_l and room.host_id is not None:
            # Removing the current host triggers host migration.
            room.remove_player(room.host_id)
        elif key == pygame.K_s:
            room.start_game()
        elif key == pygame.K_r:
            reset(room)

    # --- simulation ---
    room.update(dt)

    # --- rendering ---
    screen.fill((22, 24, 28))
    ordered = sorted(room.players.values(), key=lambda member: member.id)
    for slot, member in enumerate(ordered):
        cell = pygame.Rect(20 + slot * 150, 60, 140, 100)
        pygame.draw.rect(screen, COLORS[member.state], cell, border_radius=8)
        host_tag = ' [HOST]' if member.is_host else ''
        draw_text(f"{member.name}{host_tag}", (255, 255, 255), (cell.x + 8, cell.y + 8))
        draw_text(member.state.value, (240, 240, 240), (cell.x + 8, cell.y + 36))
        draw_text(f"ready={member.is_ready}", (240, 240, 240), (cell.x + 8, cell.y + 64))

    hud_lines = (
        f"Room: {room.state} can_start: {room.can_start()} countdown: {room.countdown:.1f}",
        "Keys: 1/2/3/4=toggle ready L=remove host S=start R=reset",
    )
    for row, text in enumerate(hud_lines):
        draw_text(text, (220, 220, 220), (20, 200 + row * 24))

    pygame.display.flip()

pygame.quit()
🎯 Quick Quiz
Question 1: In the lesson's can_start() method, why does the loop for player in self.players.values(): if not player.is_host and not player.is_ready: return False skip the host's ready state when checking whether the game can start?
Question 2: When a player marked is_host=True leaves the lobby, why does the lesson's remove_player() method transfer the host role to next(iter(self.players.values())) instead of just deleting the room and disconnecting all remaining players?
Question 3: In the lesson's Matchmaker.find_matches(), why does the code expand search_range = 100 + (wait_time * 10) per second of queue time rather than keeping the search range fixed at the initial value?
What's Next?
Congratulations! You've completed the Multiplayer Basics section. You now have all the tools to create networked multiplayer games!