2025-09-20
Technical


This article takes a close look at a simplified Raft consensus algorithm implemented in Rust with Tokio, walking through the source code block by block to explain how it works and why it is designed this way. It is aimed at developers learning distributed systems, async programming, and Rust concurrency.

This article was written with AI; don't ask, I'm just a bit tired today.


Table of Contents

  1. Project Overview
  2. Data Structures and State
  3. RPC Message Definitions
  4. Node Connections and Message Handling
  5. Election Logic
  6. Leader Heartbeats
  7. Election Timeout Detection
  8. Main Function and Node Startup
  9. Summary and Optimization Ideas

Project Overview

This project implements the core mechanisms of the Raft algorithm:

  • The three roles: Follower / Candidate / Leader
  • Leader election and heartbeats
  • Asynchronous TCP communication between nodes (JSON serialization)
  • A simplified log: only election and heartbeats, no log replication

It builds on:

  • The Tokio async runtime
  • Arc<Mutex<Node>> for shared node state
  • StdRng for random number generation that is safe in async tasks
  • serde for RPC message serialization

Data Structures and State

```rust
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum Role {
    Follower,
    Candidate,
    Leader,
}

struct Node {
    id: String,
    addr: SocketAddr,
    peers: Vec<SocketAddr>,
    current_term: u64,
    voted_for: Option<String>,
    role: Role,
    votes_received: usize,
}
```

Key Points

  • Node holds each node's state: its network address, the peer list, and the core Raft state.
  • current_term: the current term number
  • voted_for: whom this node voted for in the current term
  • role: the node's role
  • votes_received: the number of votes a Candidate has collected

The node state is shared via Arc<Mutex<Node>>, which keeps concurrent access from multiple async tasks safe.
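
As a minimal sketch (assuming the Node struct above is in scope), the shared handle is just a type alias plus a short-lived lock inside each task; the helper name bump_term below is only for illustration:

```rust
use std::sync::Arc;
use tokio::sync::Mutex;

type SharedNode = Arc<Mutex<Node>>;

// Every task clones the Arc and locks only for the duration of the state change.
async fn bump_term(node: SharedNode) {
    let mut n = node.lock().await; // tokio's async Mutex, so waiting does not block the thread
    n.current_term += 1;
}
```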


RPC Message Definitions

```rust
#[derive(Debug, Clone, Serialize, Deserialize)]
enum Rpc {
    RequestVote { term: u64, candidate_id: String },
    RequestVoteResponse { term: u64, vote_granted: bool },
    AppendEntries { term: u64, leader_id: String },
    AppendEntriesResponse { term: u64, success: bool },
}
```

How It Works

  • RequestVote / RequestVoteResponse drive elections
  • AppendEntries / AppendEntriesResponse carry Leader heartbeats
  • serde serializes the messages to JSON, which makes them easy to send over the network

Log entries are simplified away here; there is no log replication.
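
For example, a RequestVote message round-trips through serde_json like this (a small sketch reusing the Rpc enum above; the demo_wire_format name is illustrative):

```rust
// Serialize an Rpc to JSON bytes for the wire, then decode it back.
fn demo_wire_format() -> serde_json::Result<()> {
    let rpc = Rpc::RequestVote { term: 3, candidate_id: "node-a".to_string() };
    let bytes = serde_json::to_vec(&rpc)?;
    // serde's default enum representation is externally tagged, e.g.
    // {"RequestVote":{"term":3,"candidate_id":"node-a"}}
    let _decoded: Rpc = serde_json::from_slice(&bytes)?;
    Ok(())
}
```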


Node Connections and Message Handling

```rust
async fn handle_connection(mut stream: TcpStream, node: SharedNode)
```

Key Implementation Details

  • The TCP connection is read in a loop
  • Each message is prefixed with 4 bytes giving its length (see the framing sketch after this list)
  • The payload is read and deserialized with serde_json
  • process_rpc is called to handle the actual logic
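
A condensed sketch of that framing, matching the to_be_bytes / from_be_bytes calls in the full code (the write_frame / read_frame helper names are just for illustration):

```rust
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

// Write one frame: 4-byte big-endian length, then the JSON payload.
async fn write_frame(stream: &mut TcpStream, payload: &[u8]) -> std::io::Result<()> {
    stream.write_all(&(payload.len() as u32).to_be_bytes()).await?;
    stream.write_all(payload).await
}

// Read one frame: the length first, then exactly that many payload bytes.
async fn read_frame(stream: &mut TcpStream) -> std::io::Result<Vec<u8>> {
    let mut len_buf = [0u8; 4];
    stream.read_exact(&mut len_buf).await?;
    let mut payload = vec![0u8; u32::from_be_bytes(len_buf) as usize];
    stream.read_exact(&mut payload).await?;
    Ok(payload)
}
```
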
```rust
async fn process_rpc(rpc: Rpc, stream: &mut TcpStream, node: SharedNode)
```

Core Logic

  • RequestVote: check the term and whether this node has already voted, then return the vote decision
  • RequestVoteResponse: the Candidate updates its vote count and becomes Leader once it reaches a majority
  • AppendEntries: a Follower receives the heartbeat and resets its election state
  • AppendEntriesResponse: a Leader could use this to update matchIndex (not implemented in this demo)

Note: every access to the node state goes through await node.lock(), which keeps the state consistent.
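
The majority check that turns a Candidate into a Leader boils down to this fragment of process_rpc (peers does not include the node itself, hence the + 1):

```rust
// Cluster size = peers + self; a strict majority of that is the quorum.
let quorum = (n.peers.len() + 1) / 2 + 1; // 3 nodes -> 2 votes, 5 nodes -> 3 votes
if n.role == Role::Candidate && vote_granted {
    n.votes_received += 1;
    if n.votes_received >= quorum {
        n.role = Role::Leader;
    }
}
```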


Election Logic

```rust
async fn start_election(node: SharedNode)
```

Implementation Details

  1. Switch the node's role to Candidate
  2. Increment the current term and vote for itself
  3. Build RequestVote RPCs and send them to all peers
  4. Use StdRng to add random jitter so that nodes don't all start elections at the same moment

StdRng is used instead of thread_rng because thread_rng is not Send, which causes a compile error inside an async block passed to tokio::spawn.
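
A minimal sketch of the point (the spawn_with_jitter name is illustrative): the future handed to tokio::spawn must be Send, and a locally created StdRng keeps it that way, whereas holding a thread_rng() handle across the .await would not compile:

```rust
use rand::{rngs::StdRng, Rng, SeedableRng};
use std::time::Duration;

async fn spawn_with_jitter() {
    tokio::spawn(async move {
        // StdRng is Send, so this future can be moved to another worker thread.
        let mut rng = StdRng::from_entropy();
        let jitter_ms: u64 = rng.gen_range(5..25); // same range start_election uses
        tokio::time::sleep(Duration::from_millis(jitter_ms)).await;
        // ... send the RequestVote here ...
    });
}
```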


Leader Heartbeats

```rust
async fn leader_heartbeat_loop(node: SharedNode)
```

  • Only the Leader sends heartbeats
  • An AppendEntries message goes to every peer
  • The heartbeat interval is 150 ms
  • Each send runs in its own spawned task, so concurrent sends never block the main loop

This lets Followers sense the Leader's presence in time and prevents conflicting elections.
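
Condensed from leader_heartbeat_loop, the fan-out is one tokio::spawn per peer, so a slow peer never delays the others (the variables are the ones gathered under the lock in the full listing):

```rust
for peer in peers {
    let rpc = Rpc::AppendEntries { term, leader_id: leader_id.clone() };
    // Each heartbeat goes out on its own task; the loop never waits for a send.
    tokio::spawn(async move {
        send_rpc(&peer, &rpc).await;
    });
}
time::sleep(Duration::from_millis(150)).await; // heartbeat interval
```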


Election Timeout Detection

```rust
async fn election_timeout_loop(node: SharedNode)
```

  • Each node keeps a randomized election timeout (300-500 ms)
  • When the timeout expires, a new election round is started
  • The round is skipped while the node itself is Leader (a follower-side reset on heartbeat is only noted as a TODO in the code)
  • StdRng supplies async-safe (Send) random numbers

The core idea: Raft uses randomized election timeouts to avoid split votes.
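
Condensed from election_timeout_loop, each window is drawn fresh and polled in 50 ms steps; only a node that is already Leader skips the election:

```rust
let timeout_ms: u64 = rng.gen_range(300..500); // a new random window every iteration
let mut elapsed = 0u64;
while elapsed < timeout_ms {
    time::sleep(Duration::from_millis(50)).await;
    elapsed += 50;
    if node.lock().await.role == Role::Leader {
        break; // this node is the Leader, no election needed
    }
}
```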


Main Function and Node Startup

```rust
#[tokio::main]
async fn main() -> anyhow::Result<()> { ... }
```

  • Parse command-line arguments (listen address + peer list)
  • Create the shared node state Arc<Mutex<Node>>
  • Spawn three core async tasks:
    1. listener_task: accepts TCP connections
    2. leader_heartbeat_loop: Leader heartbeats
    3. election_timeout_loop: election timeouts
  • The main loop prints the node status every 2 seconds

tokio::spawn runs these async tasks concurrently, giving a high-performance, non-blocking network node.
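
Condensed from main, the wiring is three tokio::spawn calls over clones of the shared Arc:

```rust
// Each background loop owns its own clone of the shared node handle.
let node_cl = node.clone();
tokio::spawn(async move {
    if let Err(e) = listener_task(node_cl).await {
        panic!("listener failed: {}", e);
    }
});

let node_cl = node.clone();
tokio::spawn(async move { leader_heartbeat_loop(node_cl).await });

let node_cl = node.clone();
tokio::spawn(async move { election_timeout_loop(node_cl).await });
```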


Summary and Optimization Ideas

Strengths

  • Pure Rust + Tokio, thread-safe and fast
  • A simplified Raft with the core election logic intact
  • Async TCP RPC, well suited to distributed-systems experiments

Limitations

  • No log replication, persistence, or snapshots
  • No handling of network partitions or crash recovery
  • Log and RPC reliability are not addressed

Possible Extensions

  1. Add log entry replication to complete the Raft algorithm (see the sketch after this list)
  2. Persist node state to disk
  3. Handle network partitions and recovery
  4. Improve the RPC layer, for example with asynchronous batched sends
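
For the first item, AppendEntries would grow a log payload. A purely hypothetical sketch (the LogEntry type and the extra fields below are not part of the demo; they follow the field names used in the Raft paper):

```rust
use serde::{Deserialize, Serialize};

// Hypothetical extension: real log entries plus the consistency-check fields
// (prev_log_index / prev_log_term) and the leader's commit index.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct LogEntry {
    term: u64,
    command: String, // placeholder for a state-machine command
}

#[derive(Debug, Clone, Serialize, Deserialize)]
enum RpcExt {
    AppendEntries {
        term: u64,
        leader_id: String,
        prev_log_index: u64,
        prev_log_term: u64,
        entries: Vec<LogEntry>,
        leader_commit: u64,
    },
    // ... the other variants stay as they are ...
}
```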

Full Code

Cargo.toml

```toml
[package]
name = "raft"
version = "0.1.0"
edition = "2024"

[dependencies]
tokio = { version = "1.34", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
rand = "0.8"
uuid = { version = "1.4", features = ["v4"] }
log = "0.4"
env_logger = "0.10"
anyhow = "1.0.100"
```

main.rs

```rust
use serde::{Deserialize, Serialize};
use std::{env, net::SocketAddr, sync::Arc, time::Duration};
use tokio::{
    io::{AsyncReadExt, AsyncWriteExt},
    net::{TcpListener, TcpStream},
    sync::Mutex,
    time,
};
use rand::{rngs::StdRng, Rng, SeedableRng};
use uuid::Uuid;
use log::{debug, info, warn};

#[derive(Debug, Clone, Serialize, Deserialize)]
enum Rpc {
    RequestVote {
        term: u64,
        candidate_id: String,
    },
    RequestVoteResponse {
        term: u64,
        vote_granted: bool,
    },
    AppendEntries {
        term: u64,
        leader_id: String,
        // simplified: no log entries in this demo
    },
    AppendEntriesResponse {
        term: u64,
        success: bool,
    },
}

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum Role {
    Follower,
    Candidate,
    Leader,
}

struct Node {
    id: String,
    addr: SocketAddr,
    peers: Vec<SocketAddr>,
    // volatile state
    current_term: u64,
    voted_for: Option<String>,
    role: Role,
    votes_received: usize,
}

type SharedNode = Arc<Mutex<Node>>;

impl Node {
    fn new(id: String, addr: SocketAddr, peers: Vec<SocketAddr>) -> Self {
        Self {
            id,
            addr,
            peers,
            current_term: 0,
            voted_for: None,
            role: Role::Follower,
            votes_received: 0,
        }
    }
}

async fn handle_connection(mut stream: TcpStream, node: SharedNode) {
    let mut buf = Vec::new();
    loop {
        let mut len_buf = [0u8; 4];
        if let Err(e) = stream.read_exact(&mut len_buf).await {
            debug!("read_exact len failed: {}", e);
            break;
        }
        let len = u32::from_be_bytes(len_buf) as usize;
        buf.resize(len, 0);
        if let Err(e) = stream.read_exact(&mut buf).await {
            debug!("read_exact payload failed: {}", e);
            break;
        }
        let rpc: Rpc = match serde_json::from_slice(&buf) {
            Ok(r) => r,
            Err(e) => {
                warn!("failed deserialize rpc: {}", e);
                continue;
            }
        };
        process_rpc(rpc, &mut stream, node.clone()).await;
    }
}

async fn process_rpc(rpc: Rpc, stream: &mut TcpStream, node: SharedNode) {
    match rpc {
        Rpc::RequestVote { term, candidate_id } => {
            let mut n = node.lock().await;
            if term > n.current_term {
                n.current_term = term;
                n.voted_for = None;
                n.role = Role::Follower;
            }
            let vote_granted = if (n.voted_for.is_none()
                || n.voted_for.as_deref() == Some(&candidate_id))
                && term >= n.current_term
            {
                n.voted_for = Some(candidate_id.clone());
                true
            } else {
                false
            };
            debug!(
                "{}: RequestVote from {} term {} -> grant: {}",
                n.id, candidate_id, term, vote_granted
            );
            let resp = Rpc::RequestVoteResponse {
                term: n.current_term,
                vote_granted,
            };
            let _ = send_rpc_direct(stream, &resp).await;
        }
        Rpc::RequestVoteResponse { term, vote_granted } => {
            let mut n = node.lock().await;
            if term > n.current_term {
                n.current_term = term;
                n.role = Role::Follower;
                n.voted_for = None;
                n.votes_received = 0;
            } else {
                if n.role == Role::Candidate && vote_granted {
                    n.votes_received += 1;
                    let quorum = (n.peers.len() + 1) / 2 + 1;
                    info!("{}: got vote (total {})", n.id, n.votes_received);
                    if n.votes_received >= quorum {
                        info!("{}: becomes Leader in term {}", n.id, n.current_term);
                        n.role = Role::Leader;
                        // when becoming leader, reset leader-specific state if any
                    }
                }
            }
        }
        Rpc::AppendEntries { term, leader_id } => {
            let mut n = node.lock().await;
            if term >= n.current_term {
                n.current_term = term;
                n.role = Role::Follower;
                n.voted_for = Some(leader_id.clone());
                // reset election timer would happen in the main loop
                debug!(
                    "{}: received heartbeat from leader {} term {}",
                    n.id, leader_id, term
                );
                let resp = Rpc::AppendEntriesResponse {
                    term: n.current_term,
                    success: true,
                };
                let _ = send_rpc_direct(stream, &resp).await;
            } else {
                let resp = Rpc::AppendEntriesResponse {
                    term: n.current_term,
                    success: false,
                };
                let _ = send_rpc_direct(stream, &resp).await;
            }
        }
        Rpc::AppendEntriesResponse { term, success } => {
            let mut n = node.lock().await;
            if term > n.current_term {
                n.current_term = term;
                n.role = Role::Follower;
                n.voted_for = None;
                n.votes_received = 0;
            } else {
                // leader could use success to update matchIndex; omitted in this simplified demo
                debug!("{}: AppendEntriesResponse success={}", n.id, success);
            }
        }
    }
}

async fn send_rpc(addr: &SocketAddr, rpc: &Rpc) {
    match TcpStream::connect(addr).await {
        Ok(mut s) => {
            if let Err(e) = send_rpc_direct(&mut s, rpc).await {
                debug!("send_rpc write failed: {}", e);
            }
        }
        Err(e) => {
            debug!("connect failed to {}: {}", addr, e);
        }
    }
}

async fn send_rpc_direct(stream: &mut TcpStream, rpc: &Rpc) -> Result<(), std::io::Error> {
    let payload = serde_json::to_vec(rpc).unwrap();
    let len = (payload.len() as u32).to_be_bytes();
    stream.write_all(&len).await?;
    stream.write_all(&payload).await?;
    Ok(())
}

async fn listener_task(node: SharedNode) -> anyhow::Result<()> {
    let addr;
    {
        let n = node.lock().await;
        addr = n.addr;
    }
    let listener = TcpListener::bind(addr).await?;
    info!("{}: listening on {}", node.lock().await.id, addr);
    loop {
        let (socket, _peer) = listener.accept().await?;
        let node_cl = node.clone();
        tokio::spawn(async move {
            handle_connection(socket, node_cl).await;
        });
    }
}

async fn start_election(node: SharedNode) {
    let mut rng = StdRng::from_entropy(); // <- StdRng is Send
    let mut n = node.lock().await;
    n.role = Role::Candidate;
    n.current_term += 1;
    n.voted_for = Some(n.id.clone());
    n.votes_received = 1; // vote for self
    let term = n.current_term;
    let candidate_id = n.id.clone();
    let peers = n.peers.clone();
    info!("{}: starting election for term {}", n.id, term);
    drop(n);
    for p in peers {
        let rpc = Rpc::RequestVote {
            term,
            candidate_id: candidate_id.clone(),
        };
        let node_cl = node.clone();
        tokio::spawn(async move {
            send_rpc(&p, &rpc).await;
            // responses handled by listener; keep ownership to satisfy move
            let _ = node_cl;
        });
        // small jitter to avoid bursts (now using StdRng)
        let ms: u64 = rng.gen_range(5..25);
        time::sleep(Duration::from_millis(ms)).await;
    }
}

async fn leader_heartbeat_loop(node: SharedNode) {
    loop {
        {
            let n = node.lock().await;
            if n.role != Role::Leader {
                // only leader sends heartbeats here
                drop(n);
                time::sleep(Duration::from_millis(50)).await;
                continue;
            }
        }
        let _n_clone = node.clone();
        let (peers, term, leader_id) = {
            let n = node.lock().await;
            (n.peers.clone(), n.current_term, n.id.clone())
        };
        for p in peers {
            let rpc = Rpc::AppendEntries {
                term,
                leader_id: leader_id.clone(),
            };
            let paddr = p;
            tokio::spawn(async move {
                send_rpc(&paddr, &rpc).await;
            });
        }
        time::sleep(Duration::from_millis(150)).await; // heartbeat interval
    }
}

async fn election_timeout_loop(node: SharedNode) {
    // StdRng is Send + Sync, so it can safely cross threads
    let mut rng = StdRng::from_entropy();
    loop {
        // pick a random election timeout
        let timeout: u64 = rng.gen_range(300..500);
        let mut elapsed = 0u64;
        let step = 50u64;
        let mut reset = false;
        while elapsed < timeout {
            time::sleep(Duration::from_millis(step)).await;
            elapsed += step;
            let n = node.lock().await;
            if n.role == Role::Leader {
                reset = true;
                break;
            }
        }
        if reset {
            continue;
        }
        {
            let n = node.lock().await;
            if n.role != Role::Leader {
                drop(n);
                start_election(node.clone()).await;
            }
        }
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    env_logger::init();
    let args: Vec<String> = env::args().collect();
    // expect: <bin> <listen_addr> <peer1> <peer2> ...
    if args.len() < 2 {
        println!("Usage: {} <listen_addr> [peer_addr...]", args[0]);
        println!("Example: cargo run -- 127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002");
        return Ok(());
    }
    let listen_addr: SocketAddr = args[1].parse()?;
    let mut peers = Vec::new();
    for a in args.iter().skip(2) {
        let sa: SocketAddr = a.parse()?;
        peers.push(sa);
    }
    let id = Uuid::new_v4().to_string();
    let node = Arc::new(Mutex::new(Node::new(id.clone(), listen_addr, peers)));
    info!("Node id: {}", id);

    // start listener
    let node_cl = node.clone();
    tokio::spawn(async move {
        if let Err(e) = listener_task(node_cl).await {
            panic!("listener failed: {}", e);
        }
    });

    // heartbeat loop (leaders only actually send)
    let node_cl = node.clone();
    tokio::spawn(async move {
        leader_heartbeat_loop(node_cl).await;
    });

    // election timeout loop
    let node_cl = node.clone();
    tokio::spawn(async move {
        election_timeout_loop(node_cl).await;
    });

    // keep main alive and periodically print status
    loop {
        {
            let n = node.lock().await;
            info!(
                "Status: id={} role={:?} term={} voted_for={:?}",
                n.id, n.role, n.current_term, n.voted_for
            );
        }
        time::sleep(Duration::from_secs(2)).await;
    }
}
```
