This article takes a deep dive into a simplified Raft consensus algorithm implemented in Rust with Tokio, walking through the source block by block to explain how it works and why it is designed this way. It is aimed at developers learning distributed systems, async programming, and Rust concurrency.
(An AI-written post — don't ask, I'm a bit tired today.)
This project implements the core mechanisms of the Raft algorithm. It builds on:
- the Tokio async runtime
- `Arc<Mutex<Node>>` for shared node state
- `StdRng` for async-safe random number generation
- `serde` for RPC message serialization

```rust
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum Role { Follower, Candidate, Leader }

struct Node {
    id: String,
    addr: SocketAddr,
    peers: Vec<SocketAddr>,
    current_term: u64,
    voted_for: Option<String>,
    role: Role,
    votes_received: usize,
}
```
The node state is shared through `Arc<Mutex<Node>>`, which keeps concurrent access from multiple async tasks safe.
```rust
#[derive(Debug, Clone, Serialize, Deserialize)]
enum Rpc {
    RequestVote { term: u64, candidate_id: String },
    RequestVoteResponse { term: u64, vote_granted: bool },
    AppendEntries { term: u64, leader_id: String },
    AppendEntriesResponse { term: u64, success: bool },
}
```
- `RequestVote` / `RequestVoteResponse` drive leader election
- `AppendEntries` / `AppendEntriesResponse` carry the Leader's heartbeats
- `serde` serializes everything to JSON, which keeps network transport simple

Log entries are simplified away here: this demo does not implement log replication.
```rust
async fn handle_connection(mut stream: TcpStream, node: SharedNode)
```

reads length-prefixed frames, deserializes each with `serde_json`, and hands the message to `process_rpc` for the actual logic:

```rust
async fn process_rpc(rpc: Rpc, stream: &mut TcpStream, node: SharedNode)
```
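The framing read by `handle_connection` (a 4-byte big-endian length, then the JSON payload) can be sketched with two hypothetical helpers, `frame` and `unframe`, which are not part of the project:

```rust
// Prepend a 4-byte big-endian length to a payload.
fn frame(payload: &[u8]) -> Vec<u8> {
    let mut out = (payload.len() as u32).to_be_bytes().to_vec();
    out.extend_from_slice(payload);
    out
}

// Recover the payload, or None if the buffer is too short.
fn unframe(buf: &[u8]) -> Option<&[u8]> {
    let len_bytes: [u8; 4] = buf.get(..4)?.try_into().ok()?;
    let len = u32::from_be_bytes(len_bytes) as usize;
    buf.get(4..4 + len)
}

fn main() {
    let framed = frame(b"{\"term\":1}");
    assert_eq!(&framed[..4], &[0, 0, 0, 10]); // 10-byte payload, big-endian
    assert_eq!(unframe(&framed), Some(&b"{\"term\":1}"[..]));
    println!("framed {} bytes", framed.len());
}
```

Length-prefixing matters because TCP is a byte stream with no message boundaries; without the prefix, one `read` could return half a JSON document or two concatenated ones.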
- `RequestVote`: checks the term and whether this node has already voted, then returns its decision
- `RequestVoteResponse`: the Candidate tallies votes and becomes Leader once it holds a majority
- `AppendEntries`: a Follower receiving a heartbeat resets its election state
- `AppendEntriesResponse`: a Leader could use this to update `matchIndex` (not implemented in this demo)

Note that every access to node state goes through `node.lock().await`, which keeps the state consistent across tasks.
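The majority check used when tallying votes counts the whole cluster (peers plus self) and requires strictly more than half of it, as in this small sketch:

```rust
// Majority quorum as computed in process_rpc: cluster size is peers + self.
fn quorum(peer_count: usize) -> usize {
    (peer_count + 1) / 2 + 1
}

fn main() {
    assert_eq!(quorum(2), 2); // 3-node cluster: 2 votes elect a leader
    assert_eq!(quorum(4), 3); // 5-node cluster: 3 votes
    assert_eq!(quorum(3), 3); // 4-node cluster: still 3 (majority of 4)
    println!("quorum checks passed");
}
```

Integer division makes the formula work for both odd and even cluster sizes: `total / 2 + 1` is always the smallest strict majority.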
```rust
async fn start_election(node: SharedNode)
```

switches the node to Candidate and uses `StdRng` to add random jitter, so that peers do not all campaign at the same instant. `StdRng` is used instead of `thread_rng` because the handle returned by `thread_rng` is not `Send`, which makes async blocks passed to `tokio::spawn` fail to compile.
```rust
async fn leader_heartbeat_loop(node: SharedNode)
```

periodically sends `AppendEntries` messages. This ensures Followers keep sensing a live Leader, which suppresses unnecessary elections.
```rust
async fn election_timeout_loop(node: SharedNode)
```

uses `StdRng` for async-safe randomness. The core idea: Raft relies on randomized election timeouts to avoid split votes.
```rust
#[tokio::main]
async fn main() -> anyhow::Result<()> { ... }
```

1. Parses command-line arguments (listen address + peer list)
2. Creates the shared node state `Arc<Mutex<Node>>`
3. Spawns three core async tasks:
   - `listener_task`: accepts TCP connections
   - `leader_heartbeat_loop`: Leader heartbeats
   - `election_timeout_loop`: election timeouts
4. The main loop prints the node's status every 2 seconds

`tokio::spawn` runs these tasks concurrently, yielding a non-blocking network node.
Cargo.toml

```toml
[package]
name = "raft"
version = "0.1.0"
edition = "2024"

[dependencies]
tokio = { version = "1.34", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
rand = "0.8"
uuid = { version = "1.4", features = ["v4"] }
log = "0.4"
env_logger = "0.10"
anyhow = "1.0.100"
```

main.rs

```rust
use serde::{Deserialize, Serialize};
use std::{env, net::SocketAddr, sync::Arc, time::Duration};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::{TcpListener, TcpStream},
sync::Mutex,
time,
};
use rand::{rngs::StdRng, Rng, SeedableRng};
use uuid::Uuid;
use log::{info, warn, debug};
#[derive(Debug, Clone, Serialize, Deserialize)]
enum Rpc {
RequestVote {
term: u64,
candidate_id: String,
},
RequestVoteResponse {
term: u64,
vote_granted: bool,
},
AppendEntries {
term: u64,
leader_id: String,
// simplified: no log entries in this demo
},
AppendEntriesResponse {
term: u64,
success: bool,
},
}
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum Role {
Follower,
Candidate,
Leader,
}
use std::time::Instant;

struct Node {
    id: String,
    addr: SocketAddr,
    peers: Vec<SocketAddr>,
    // volatile state
    current_term: u64,
    voted_for: Option<String>,
    role: Role,
    votes_received: usize,
    // timestamp of the last valid heartbeat or granted vote; checked by
    // election_timeout_loop before starting an election
    last_heartbeat: Instant,
}

type SharedNode = Arc<Mutex<Node>>;

impl Node {
    fn new(id: String, addr: SocketAddr, peers: Vec<SocketAddr>) -> Self {
        Self {
            id,
            addr,
            peers,
            current_term: 0,
            voted_for: None,
            role: Role::Follower,
            votes_received: 0,
            last_heartbeat: Instant::now(),
        }
    }
}

async fn handle_connection(mut stream: TcpStream, node: SharedNode) {
    let mut buf = Vec::new();
    loop {
        // length-prefixed framing: 4-byte big-endian length, then JSON payload
        let mut len_buf = [0u8; 4];
        if let Err(e) = stream.read_exact(&mut len_buf).await {
            debug!("read_exact len failed: {}", e);
            break;
        }
        let len = u32::from_be_bytes(len_buf) as usize;
        buf.resize(len, 0);
        if let Err(e) = stream.read_exact(&mut buf).await {
            debug!("read_exact payload failed: {}", e);
            break;
        }
        let rpc: Rpc = match serde_json::from_slice(&buf) {
            Ok(r) => r,
            Err(e) => {
                warn!("failed to deserialize rpc: {}", e);
                continue;
            }
        };
        process_rpc(rpc, &mut stream, node.clone()).await;
    }
}

async fn process_rpc(rpc: Rpc, stream: &mut TcpStream, node: SharedNode) {
    match rpc {
        Rpc::RequestVote { term, candidate_id } => {
            let mut n = node.lock().await;
            if term > n.current_term {
                n.current_term = term;
                n.voted_for = None;
                n.role = Role::Follower;
            }
            let vote_granted = term >= n.current_term
                && (n.voted_for.is_none() || n.voted_for.as_deref() == Some(candidate_id.as_str()));
            if vote_granted {
                n.voted_for = Some(candidate_id.clone());
                // granting a vote also defers our own election timeout
                n.last_heartbeat = Instant::now();
            }
            debug!("{}: RequestVote from {} term {} -> grant: {}", n.id, candidate_id, term, vote_granted);
            let resp = Rpc::RequestVoteResponse {
                term: n.current_term,
                vote_granted,
            };
            let _ = send_rpc_direct(stream, &resp).await;
        }
        Rpc::RequestVoteResponse { term, vote_granted } => {
            let mut n = node.lock().await;
            if term > n.current_term {
                n.current_term = term;
                n.role = Role::Follower;
                n.voted_for = None;
                n.votes_received = 0;
            } else if n.role == Role::Candidate && vote_granted {
                n.votes_received += 1;
                // majority of the full cluster (peers + self)
                let quorum = (n.peers.len() + 1) / 2 + 1;
                info!("{}: got vote (total {})", n.id, n.votes_received);
                if n.votes_received >= quorum {
                    info!("{}: becomes Leader in term {}", n.id, n.current_term);
                    n.role = Role::Leader;
                }
            }
        }
        Rpc::AppendEntries { term, leader_id } => {
            let mut n = node.lock().await;
            if term >= n.current_term {
                n.current_term = term;
                n.role = Role::Follower;
                n.voted_for = Some(leader_id.clone());
                // a valid heartbeat resets the election timer
                n.last_heartbeat = Instant::now();
                debug!("{}: received heartbeat from leader {} term {}", n.id, leader_id, term);
                let resp = Rpc::AppendEntriesResponse {
                    term: n.current_term,
                    success: true,
                };
                let _ = send_rpc_direct(stream, &resp).await;
            } else {
                let resp = Rpc::AppendEntriesResponse {
                    term: n.current_term,
                    success: false,
                };
                let _ = send_rpc_direct(stream, &resp).await;
            }
        }
        Rpc::AppendEntriesResponse { term, success } => {
            let mut n = node.lock().await;
            if term > n.current_term {
                n.current_term = term;
                n.role = Role::Follower;
                n.voted_for = None;
                n.votes_received = 0;
            } else {
                // a real leader would use `success` to update matchIndex; omitted here
                debug!("{}: AppendEntriesResponse success={}", n.id, success);
            }
        }
    }
}

// Connect to a peer, send one RPC, then read and process the single reply.
// The reply comes back on this outbound connection (the listener only sees
// inbound connections), so it must be read here — otherwise every vote and
// heartbeat response would be silently dropped.
async fn send_rpc(addr: &SocketAddr, rpc: &Rpc, node: SharedNode) {
    let mut stream = match TcpStream::connect(addr).await {
        Ok(s) => s,
        Err(e) => {
            debug!("connect failed to {}: {}", addr, e);
            return;
        }
    };
    if let Err(e) = send_rpc_direct(&mut stream, rpc).await {
        debug!("send_rpc write failed: {}", e);
        return;
    }
    // read the length-prefixed response frame
    let mut len_buf = [0u8; 4];
    if stream.read_exact(&mut len_buf).await.is_err() {
        return;
    }
    let len = u32::from_be_bytes(len_buf) as usize;
    let mut buf = vec![0u8; len];
    if stream.read_exact(&mut buf).await.is_err() {
        return;
    }
    match serde_json::from_slice::<Rpc>(&buf) {
        Ok(resp) => process_rpc(resp, &mut stream, node).await,
        Err(e) => warn!("failed to deserialize response: {}", e),
    }
}

async fn send_rpc_direct(stream: &mut TcpStream, rpc: &Rpc) -> Result<(), std::io::Error> {
    // serializing our own enum cannot fail
    let payload = serde_json::to_vec(rpc).expect("rpc serialization");
    let len = (payload.len() as u32).to_be_bytes();
    stream.write_all(&len).await?;
    stream.write_all(&payload).await?;
    Ok(())
}

async fn listener_task(node: SharedNode) -> anyhow::Result<()> {
    let (addr, id) = {
        let n = node.lock().await;
        (n.addr, n.id.clone())
    };
    let listener = TcpListener::bind(addr).await?;
    info!("{}: listening on {}", id, addr);
    loop {
        let (socket, _peer) = listener.accept().await?;
        let node_cl = node.clone();
        tokio::spawn(async move {
            handle_connection(socket, node_cl).await;
        });
    }
}

async fn start_election(node: SharedNode) {
    // StdRng is Send, unlike the handle returned by thread_rng(), so it can
    // be held across the .await points below
    let mut rng = StdRng::from_entropy();
    let mut n = node.lock().await;
    n.role = Role::Candidate;
    n.current_term += 1;
    n.voted_for = Some(n.id.clone());
    n.votes_received = 1; // vote for self
    let term = n.current_term;
    let candidate_id = n.id.clone();
    let peers = n.peers.clone();
    info!("{}: starting election for term {}", n.id, term);
    drop(n); // release the lock before doing network I/O
    for p in peers {
        let rpc = Rpc::RequestVote {
            term,
            candidate_id: candidate_id.clone(),
        };
        let node_cl = node.clone();
        tokio::spawn(async move {
            // the vote response is read and counted inside send_rpc
            send_rpc(&p, &rpc, node_cl).await;
        });
        // small jitter between requests to avoid a burst of connections
        let ms: u64 = rng.gen_range(5..25);
        time::sleep(Duration::from_millis(ms)).await;
    }
}

async fn leader_heartbeat_loop(node: SharedNode) {
    loop {
        let is_leader = node.lock().await.role == Role::Leader;
        if !is_leader {
            // only the leader sends heartbeats; check again shortly
            time::sleep(Duration::from_millis(50)).await;
            continue;
        }
        let (peers, term, leader_id) = {
            let n = node.lock().await;
            (n.peers.clone(), n.current_term, n.id.clone())
        };
        for p in peers {
            let rpc = Rpc::AppendEntries {
                term,
                leader_id: leader_id.clone(),
            };
            let node_cl = node.clone();
            tokio::spawn(async move {
                send_rpc(&p, &rpc, node_cl).await;
            });
        }
        time::sleep(Duration::from_millis(150)).await; // heartbeat interval
    }
}

async fn election_timeout_loop(node: SharedNode) {
    // StdRng is Send, so it is safe to hold across the .await points below
    let mut rng = StdRng::from_entropy();
    loop {
        // randomized timeouts are what let Raft avoid repeated split votes
        let timeout = Duration::from_millis(rng.gen_range(300..500));
        time::sleep(timeout).await;
        let should_campaign = {
            let n = node.lock().await;
            // a Leader never campaigns; a Follower campaigns only if no
            // heartbeat (or vote grant) arrived during the whole window
            n.role != Role::Leader && n.last_heartbeat.elapsed() >= timeout
        };
        if should_campaign {
            start_election(node.clone()).await;
        }
    }
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
env_logger::init();
let args: Vec<String> = env::args().collect();
// expect: <bin> <listen_addr> <peer1> <peer2> ...
if args.len() < 2 {
println!("Usage: {} <listen_addr> [peer_addr...]", args[0]);
println!("Example: cargo run -- 127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002");
return Ok(());
}
let listen_addr: SocketAddr = args[1].parse()?;
let mut peers = Vec::new();
for a in args.iter().skip(2) {
let sa: SocketAddr = a.parse()?;
peers.push(sa);
}
let id = Uuid::new_v4().to_string();
let node = Arc::new(Mutex::new(Node::new(id.clone(), listen_addr, peers)));
info!("Node id: {}", id);
// start listener
let node_cl = node.clone();
tokio::spawn(async move {
if let Err(e) = listener_task(node_cl).await {
panic!("listener failed: {}", e);
}
});
// heartbeat loop (leaders only actually send)
let node_cl = node.clone();
tokio::spawn(async move {
leader_heartbeat_loop(node_cl).await;
});
// election timeout loop
let node_cl = node.clone();
tokio::spawn(async move {
election_timeout_loop(node_cl).await;
});
// keep main alive and periodically print status
loop {
{
let n = node.lock().await;
info!("Status: id={} role={:?} term={} voted_for={:?}",
n.id, n.role, n.current_term, n.voted_for);
}
time::sleep(Duration::from_secs(2)).await;
}
}
```


Author: MapleCity
Link:
License: unless otherwise stated, all posts on this blog are released under the CC BY-NC-SA license. Please credit the source when reposting!