// rsync协议实现(Phase 8) // 参考rsync源码和协议规范 use anyhow::{Result, anyhow}; use log::{info, warn, debug}; use std::path::{Path, PathBuf}; use std::fs::{self, File}; use std::io::{Read, Write, BufReader, BufWriter, BufRead}; use std::os::unix::fs::PermissionsExt; // 导入PermissionsExt trait(Unix标准) // 导入BufRead trait(OpenSSH标准) /// rsync Handler(参考rsync源码) pub struct RsyncHandler { root_dir: PathBuf, protocol_version: u32, server_mode: bool, sender_mode: bool, } impl RsyncHandler { pub fn new(root_dir: PathBuf) -> Self { Self { root_dir, protocol_version: 30, // rsync protocol version 30 server_mode: false, sender_mode: false, } } /// 解析rsync命令(参考rsync源码) pub fn parse_rsync_command(command: &str) -> Result { let parts: Vec<&str> = command.split_whitespace().collect(); if parts.len() < 2 || parts[0] != "rsync" { return Err(anyhow!("Invalid rsync command: {}", command)); } let mut handler = RsyncHandler::new(PathBuf::from("/tmp")); for part in &parts[1..] { match part { &"--server" => handler.server_mode = true, &"--sender" => handler.sender_mode = true, path if !path.starts_with('-') && !path.starts_with('.') => { handler.root_dir = PathBuf::from(path); } _ => debug!("rsync flag: {}", part), } } Ok(handler) } /// 处理rsync传输(参考rsync源码) pub fn handle_rsync(&mut self, channel: &mut dyn ReadWrite) -> Result<()> { info!("rsync handler: server={}, sender={}, root={}", self.server_mode, self.sender_mode, self.root_dir.display()); // 使用display()(Rust标准) if !self.server_mode { return Err(anyhow!("rsync --server mode required")); } // rsync协议版本协商 self.negotiate_protocol(channel)?; if self.sender_mode { // rsync --server --sender模式(发送文件列表) self.handle_sender_mode(channel)?; } else { // rsync --server模式(接收文件) self.handle_receiver_mode(channel)?; } Ok(()) } /// rsync协议版本协商(参考rsync源码) fn negotiate_protocol(&mut self, channel: &mut dyn ReadWrite) -> Result<()> { debug!("rsync protocol negotiation"); // rsync协议握手:@RSYNCD: 30 let handshake = "@RSYNCD: 30\n"; channel.write_all(handshake.as_bytes())?; channel.flush()?; // 读取客户端协议版本 let mut response = String::new(); let mut reader = BufReader::new(channel); reader.read_line(&mut response)?; if !response.starts_with("@RSYNCD: ") { return Err(anyhow!("Invalid rsync handshake: {}", response)); } let client_version: u32 = response.trim_start_matches("@RSYNCD: ") .trim() .parse()?; info!("rsync client version: {}", client_version); // 选择最低版本 self.protocol_version = std::cmp::min(client_version, 30); Ok(()) } /// rsync --server --sender模式(发送文件列表) fn handle_sender_mode(&self, channel: &mut dyn ReadWrite) -> Result<()> { info!("rsync sender mode: sending file list"); // 发送模块列表(简化:仅发送root_dir) let module_list = format!("{}\n", self.root_dir.display()); channel.write_all(module_list.as_bytes())?; channel.flush()?; // 等待客户端选择模块 let mut response = String::new(); let mut reader = BufReader::new(&mut *channel); // 重新借用(Rust标准) reader.read_line(&mut response)?; debug!("rsync module selected: {}", response.trim()); // 发送文件列表 self.send_file_list(channel)?; // 发送文件内容(简化:完整传输,不实现增量传输) self.send_files(channel)?; Ok(()) } /// rsync --server模式(接收文件) fn handle_receiver_mode(&mut self, channel: &mut dyn ReadWrite) -> Result<()> { info!("rsync receiver mode: receiving files"); // 接收模块列表请求 let mut response = String::new(); let mut reader = BufReader::new(&mut *channel); // 重新借用(Rust标准) reader.read_line(&mut response)?; debug!("rsync module request: {}", response.trim()); // 发送模块列表 let module_list = format!("{}\n", self.root_dir.display()); channel.write_all(module_list.as_bytes())?; channel.flush()?; // 接收文件列表 self.receive_file_list(channel)?; // 接收文件内容 self.receive_files(channel)?; Ok(()) } /// 发送文件列表(参考rsync源码) fn send_file_list(&self, channel: &mut dyn ReadWrite) -> Result<()> { debug!("rsync sending file list"); let full_path = self.resolve_path(&self.root_dir.to_string_lossy())?; if full_path.is_file() { self.send_file_entry(channel, &full_path)?; } else if full_path.is_dir() { for entry in fs::read_dir(&full_path)? { let entry = entry?; self.send_file_entry(channel, &entry.path())?; } } // 发送文件列表结束标记 channel.write_all(&[0])?; channel.flush()?; Ok(()) } /// 发送文件条目(参考rsync源码) fn send_file_entry(&self, channel: &mut dyn ReadWrite, path: &Path) -> Result<()> { let metadata = fs::metadata(path)?; let size = metadata.len(); let mode = metadata.permissions().mode(); let filename = path.file_name().unwrap().to_string_lossy(); // rsync文件条目格式:mode size filename // 简化实现:仅发送基本信息 let entry = format!("{} {} {}\n", mode, size, filename); channel.write_all(entry.as_bytes())?; debug!("rsync file entry: {} ({} bytes)", filename, size); Ok(()) } /// 接收文件列表(参考rsync源码) fn receive_file_list(&self, channel: &mut dyn ReadWrite) -> Result<()> { debug!("rsync receiving file list"); let mut reader = BufReader::new(channel); let mut line = String::new(); while reader.read_line(&mut line)? > 0 { if line.trim().is_empty() { break; // 文件列表结束 } let parts: Vec<&str> = line.trim().split_whitespace().collect(); if parts.len() >= 3 { let mode: u32 = parts[0].parse()?; let size: u64 = parts[1].parse()?; let filename = parts[2]; debug!("rsync file entry received: {} ({} bytes)", filename, size); } line.clear(); } Ok(()) } /// 发送文件(参考rsync源码) fn send_files(&self, channel: &mut dyn ReadWrite) -> Result<()> { info!("rsync sending files"); let full_path = self.resolve_path(&self.root_dir.to_string_lossy())?; if full_path.is_file() { self.send_file_content(channel, &full_path)?; } else if full_path.is_dir() { for entry in fs::read_dir(&full_path)? { let entry = entry?; if entry.path().is_file() { self.send_file_content(channel, &entry.path())?; } } } // 发送结束标记 channel.write_all(&[0])?; channel.flush()?; Ok(()) } /// 发送文件内容(参考rsync源码) fn send_file_content(&self, channel: &mut dyn ReadWrite, path: &Path) -> Result<()> { let metadata = fs::metadata(path)?; let size = metadata.len(); let filename = path.file_name().unwrap().to_string_lossy(); debug!("rsync sending file content: {} ({} bytes)", filename, size); // rsync文件内容格式:size data checksum // 简化实现:发送文件大小 + 文件内容 let size_bytes = size.to_be_bytes(); channel.write_all(&size_bytes)?; // 发送文件内容 let file = File::open(path)?; let mut reader = BufReader::new(file); let mut buffer = vec![0u8; 8192]; while let Ok(n) = reader.read(&mut buffer) { if n == 0 { break; } channel.write_all(&buffer[..n])?; } channel.flush()?; info!("rsync file sent: {} ({} bytes)", filename, size); Ok(()) } /// 接收文件(参考rsync源码) fn receive_files(&self, channel: &mut dyn ReadWrite) -> Result<()> { info!("rsync receiving files"); let mut reader = BufReader::new(channel); while true { // 读取文件大小(8字节) let mut size_bytes = [0u8; 8]; match reader.read_exact(&mut size_bytes) { Ok(_) => { let size = u64::from_be_bytes(size_bytes); if size == 0 { break; // 结束标记 } // 简化:使用默认文件名 let filename = "received_file.txt"; let full_path = self.resolve_path(filename)?; // 接收文件内容 let file = File::create(&full_path)?; let mut writer = BufWriter::new(file); let mut buffer = vec![0u8; 8192]; let mut remaining = size; while remaining > 0 { let to_read = std::cmp::min(buffer.len() as u64, remaining) as usize; let n = reader.read(&mut buffer[..to_read])?; if n == 0 { break; } writer.write_all(&buffer[..n])?; remaining -= n as u64; } writer.flush()?; info!("rsync file received: {} ({} bytes)", filename, size); } Err(_) => break, // EOF } } Ok(()) } /// 路径解析(安全性检查) fn resolve_path(&self, path: &str) -> Result { let full_path = self.root_dir.join(path); let canonical_path = full_path.canonicalize() .map_err(|e| anyhow!("Path resolution error: {}", e))?; if !canonical_path.starts_with(&self.root_dir.canonicalize()?) { return Err(anyhow!("Path traversal attempt detected")); } Ok(canonical_path) } } /// Read + Write trait组合(用于Channel) pub trait ReadWrite: Read + Write {} impl ReadWrite for T {} #[cfg(test)] mod tests { use super::*; #[test] fn test_rsync_command_parse() { let handler = RsyncHandler::parse_rsync_command("rsync --server --sender .").unwrap(); assert!(handler.server_mode); assert!(handler.sender_mode); } #[test] fn test_rsync_server_parse() { let handler = RsyncHandler::parse_rsync_command("rsync --server .").unwrap(); assert!(handler.server_mode); assert!(!handler.sender_mode); } #[test] fn test_rsync_protocol_version() { let handler = RsyncHandler::new(PathBuf::from("/tmp")); assert_eq!(handler.protocol_version, 30); } }