系统设计实战:高并发系统的架构设计原则
系统设计实战:高并发系统的架构设计原则
设计高并发系统需要考虑多个维度。本文将分享系统设计的核心原则和实战经验。
系统设计原则
1. 可扩展性(Scalability)
水平扩展 vs 垂直扩展
- 水平扩展:增加服务器数量(推荐)
- 垂直扩展:提升单机性能(有限)
无状态设计
# ❌ 有状态:会话存储在服务器
class SessionStore:
sessions = {} # 存储在内存中
# ✅ 无状态:使用外部存储
class StatelessService:
def __init__(self, redis_client):
self.redis = redis_client
def get_session(self, session_id):
return self.redis.get(f"session:{session_id}")
2. 可用性(Availability)
冗余设计
用户请求
↓
┌──────────┐
│ 负载均衡器 │
└──────────┘
↓
┌──────┬──────┬──────┐
│服务1 │服务2 │服务3 │ (多副本)
└──────┴──────┴──────┘
故障转移
class FailoverService:
def __init__(self, primary, secondary):
self.primary = primary
self.secondary = secondary
def call(self, request):
try:
return self.primary.handle(request)
except Exception:
# 自动故障转移
return self.secondary.handle(request)
3. 一致性(Consistency)
CAP定理
- Consistency(一致性)
- Availability(可用性)
- Partition tolerance(分区容错性)
只能同时满足两个。
最终一致性
# 使用消息队列实现最终一致性
def update_user_profile(user_id, data):
# 1. 更新主数据库
db.update(user_id, data)
# 2. 发送消息到队列
queue.publish('user.updated', {
'user_id': user_id,
'data': data
})
# 3. 异步更新缓存和其他服务
# (通过消息队列消费者)
核心组件设计
负载均衡
算法选择
# 轮询(Round Robin)
def round_robin(servers):
current = 0
while True:
yield servers[current]
current = (current + 1) % len(servers)
# 加权轮询(Weighted Round Robin)
def weighted_round_robin(servers_with_weights):
total_weight = sum(w for _, w in servers_with_weights)
current_weight = 0
while True:
current_weight = (current_weight + 1) % total_weight
cumulative = 0
for server, weight in servers_with_weights:
cumulative += weight
if current_weight < cumulative:
yield server
break
# 最少连接(Least Connections)
def least_connections(servers):
return min(servers, key=lambda s: s.active_connections)
缓存策略
缓存层级
L1: 本地缓存 (内存) - 最快,容量小
↓
L2: 分布式缓存 (Redis) - 快,容量中等
↓
L3: 数据库 - 慢,容量大
缓存模式
# Cache-Aside模式
def get_user(user_id):
# 1. 先查缓存
user = cache.get(f"user:{user_id}")
if user:
return user
# 2. 缓存未命中,查数据库
user = db.get_user(user_id)
# 3. 写入缓存
cache.set(f"user:{user_id}", user, ttl=3600)
return user
# Write-Through模式
def update_user(user_id, data):
# 1. 更新数据库
db.update_user(user_id, data)
# 2. 更新缓存
cache.set(f"user:{user_id}", data)
return data
数据库设计
读写分离
class DatabaseRouter:
def __init__(self):
self.master = MasterDB()
self.slaves = [SlaveDB1(), SlaveDB2()]
def read(self, query):
# 从从库读取
slave = random.choice(self.slaves)
return slave.execute(query)
def write(self, query):
# 写入主库
return self.master.execute(query)
分库分表
def shard_key(user_id):
"""根据用户ID计算分片"""
return user_id % 10 # 10个分片
def get_user(user_id):
shard = shard_key(user_id)
db = get_shard_db(shard)
return db.get_user(user_id)
高并发处理
限流策略
from collections import deque
import time
class RateLimiter:
def __init__(self, max_requests, window_seconds):
self.max_requests = max_requests
self.window_seconds = window_seconds
self.requests = deque()
def is_allowed(self):
now = time.time()
# 移除过期请求
while self.requests and self.requests[0] < now - self.window_seconds:
self.requests.popleft()
# 检查是否超过限制
if len(self.requests) >= self.max_requests:
return False
# 记录当前请求
self.requests.append(now)
return True
异步处理
import asyncio
from queue import Queue
class AsyncProcessor:
def __init__(self, workers=10):
self.queue = Queue()
self.workers = workers
async def process_task(self, task):
# 异步处理任务
result = await async_operation(task)
return result
async def worker(self):
while True:
task = await self.queue.get()
if task is None:
break
await self.process_task(task)
self.queue.task_done()
async def start(self):
# 启动工作线程
workers = [self.worker() for _ in range(self.workers)]
await asyncio.gather(*workers)
消息队列
生产者-消费者模式
import redis
import json
class MessageQueue:
def __init__(self, redis_client):
self.redis = redis_client
self.queue_name = "task_queue"
def produce(self, task):
"""生产者:发送任务"""
self.redis.lpush(
self.queue_name,
json.dumps(task)
)
def consume(self):
"""消费者:处理任务"""
while True:
# 阻塞式获取任务
task_json = self.redis.brpop(self.queue_name, timeout=1)
if task_json:
task = json.loads(task_json[1])
self.process_task(task)
def process_task(self, task):
# 处理任务逻辑
pass
监控和告警
关键指标
class MetricsCollector:
def __init__(self):
self.metrics = {
'request_count': 0,
'error_count': 0,
'response_time': [],
}
def record_request(self, duration, success):
self.metrics['request_count'] += 1
self.metrics['response_time'].append(duration)
if not success:
self.metrics['error_count'] += 1
# 计算P99延迟
if len(self.metrics['response_time']) > 100:
sorted_times = sorted(self.metrics['response_time'])
p99_index = int(len(sorted_times) * 0.99)
p99_latency = sorted_times[p99_index]
# 告警
if p99_latency > 1000: # 1秒
self.alert("P99 latency too high")
设计模式
断路器模式
class CircuitBreaker:
def __init__(self, failure_threshold=5, timeout=60):
self.failure_threshold = failure_threshold
self.timeout = timeout
self.failure_count = 0
self.state = 'CLOSED' # CLOSED, OPEN, HALF_OPEN
self.last_failure_time = None
def call(self, func, *args, **kwargs):
if self.state == 'OPEN':
if time.time() - self.last_failure_time > self.timeout:
self.state = 'HALF_OPEN'
else:
raise Exception("Circuit breaker is OPEN")
try:
result = func(*args, **kwargs)
if self.state == 'HALF_OPEN':
self.state = 'CLOSED'
self.failure_count = 0
return result
except Exception as e:
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = 'OPEN'
raise e
最佳实践
- 设计可扩展的架构:支持水平扩展
- 实现冗余和故障转移:提高可用性
- 合理使用缓存:减少数据库压力
- 异步处理:提升吞吐量
- 监控和告警:及时发现问题
总结
系统设计是一个综合性的工程,需要平衡性能、可用性、一致性等多个方面。通过合理应用这些原则和模式,可以构建稳定、高效的高并发系统。
希望这些经验对正在设计系统的开发者有所帮助!