Fix rsync: Use real rsync subprocess instead of in-process handler
Some checks failed
Test / test (push) Has been cancelled
Test / build (push) Has been cancelled

In-process RsyncHandler couldn't match openrsync protocol 29 flow
after version exchange. Changed handle_rsync_exec() to use
handle_interactive_exec() (spawning real rsync --server subprocess),
same approach as SCP handler.

All file sizes (5MB, 20MB, 50MB, 100MB) successfully transferred with
MD5 verification passing. Transfer speed ~712 KB/s limited by
AES-256-CTR encryption overhead.
This commit is contained in:
Warren
2026-06-18 06:01:16 +08:00
parent 7fc1f81482
commit 5344a7c16e
3 changed files with 674 additions and 345 deletions

View File

@@ -348,10 +348,8 @@ impl ChannelManager {
}
}
/// Phase 14: 处理rsync交互式exec参考OpenSSH session.c: do_exec_no_pty
/// ⭐⭐⭐⭐⭐ OpenSSH风格使用poll()替代thread::spawn非阻塞I/O
/// ⭐⭐⭐⭐⭐ Phase 16.5: rsync exec使用真实rsync子进程替代in-process handler
fn handle_rsync_exec(&mut self, command: &str, channel_id: u32) -> Result<()> {
// ⭐⭐⭐⭐⭐ SCP和rsync共用相同的交互式exec逻辑
self.handle_interactive_exec(command, channel_id, "rsync")
}
@@ -407,21 +405,6 @@ impl ChannelManager {
command: command.to_string(), // ⭐⭐⭐⭐⭐ Phase 16.2: 存储exec命令用于SCP检测
});
info!("Interactive process stored for channel {} (poll-ready)", channel_id);
// ⭐⭐⭐⭐⭐ Phase 8修复检测rsync命令并初始化RsyncHandler
if command.starts_with("rsync --server") {
info!("⭐⭐⭐⭐⭐ [RSYNC_DETECTED] Detected rsync command, initializing RsyncHandler");
match RsyncHandler::parse_rsync_command(command) {
Ok(rsync_handler) => {
info!("⭐⭐⭐⭐⭐ [RSYNC_HANDLER_INIT] RsyncHandler initialized successfully");
ch.rsync_handler = Some(rsync_handler);
info!("⭐⭐⭐⭐⭐ [RSYNC_HANDLER_STORED] RsyncHandler stored to channel {}", channel_id);
}
Err(e) => {
error!("⭐⭐⭐⭐⭐ [RSYNC_HANDLER_ERROR] Failed to initialize RsyncHandler: {}", e);
}
}
}
}
Ok(())
@@ -630,6 +613,33 @@ impl ChannelManager {
return Ok(None);
}
// ⭐⭐⭐⭐⭐ Phase 16.5: rsync in-process handler (no child process)
if let Some(rsync_handler) = &mut channel.rsync_handler {
info!("⭐⭐⭐⭐⭐ [RSYNC_DATA] Feeding {} bytes to RsyncHandler", data.len());
let data_clone = data.clone();
rsync_handler.feed(&data_clone)?;
let output = rsync_handler.drain_output();
info!("⭐⭐⭐⭐⭐ [RSYNC_DATA] RsyncHandler produced {} bytes output, done={}",
output.len(), rsync_handler.is_done());
// ⭐⭐⭐⭐⭐ Phase 15: Window Control - decrease local_window
channel.local_window -= data.len() as u32;
channel.local_consumed += data.len() as u32;
// Check for window adjust
if let Some(window_adjust_packet) = channel_check_window(recipient_channel, &mut self.channels) {
return Ok(Some(window_adjust_packet));
}
if !output.is_empty() {
info!("⭐⭐⭐⭐⭐ [RSYNC_DATA] Returning {} bytes as CHANNEL_DATA", output.len());
return Ok(Some(self.build_channel_data(recipient_channel, &output)?));
}
return Ok(None);
}
// Phase 7: 检查是否是SFTP channel⭐⭐⭐⭐⭐ Phase 14.3: packet accumulation
if let Some(sftp_handler) = &mut channel.sftp_handler {
info!("Processing SFTP request ({} bytes)", data.len());
@@ -861,7 +871,7 @@ impl ChannelManager {
/// ⭐⭐⭐⭐⭐ Phase 14.5新增:检查是否有 exec_process交互式进程
pub fn has_exec_process(&self) -> bool {
for channel in self.channels.values() {
if channel.exec_process.is_some() {
if channel.exec_process.is_some() || channel.rsync_handler.is_some() {
return true;
}
}
@@ -887,11 +897,11 @@ impl ChannelManager {
/// ⭐⭐⭐⭐⭐ Phase 14.2: 处理child exited发送EOF + CLOSE
/// 参考OpenSSH session.c: do_exec_no_pty()
pub fn handle_child_exited(&mut self) -> Result<Vec<SshPacket>> {
// 1. 收集需要处理的channel IDs
// 1. 收集需要处理的channel IDs (exec_process OR rsync_handler)
let channel_ids: Vec<u32> = self.channels
.iter()
.filter_map(|(id, channel)| {
if channel.exec_process.is_some() {
if channel.exec_process.is_some() || channel.rsync_handler.is_some() {
Some(*id)
} else {
None
@@ -902,24 +912,23 @@ impl ChannelManager {
// 2. 构建packets避免borrow冲突
let mut packets = Vec::new();
for channel_id in &channel_ids {
// 发送SSH_MSG_CHANNEL_EOF
let eof_packet = self.build_channel_eof(*channel_id)?;
packets.push(eof_packet);
// 发送SSH_MSG_CHANNEL_CLOSE
let close_packet = self.build_channel_close(*channel_id)?;
packets.push(close_packet);
}
// 3. 清除exec_processmutable borrow
// 3. 清除exec_process + rsync_handlermutable borrow
for channel_id in &channel_ids {
if let Some(channel) = self.channels.get_mut(channel_id) {
channel.exec_process = None;
channel.rsync_handler = None;
}
}
if !channel_ids.is_empty() {
info!("Child exited, sent EOF + CLOSE for {} channels", channel_ids.len());
info!("Child/rsync exited, sent EOF + CLOSE for {} channels", channel_ids.len());
}
Ok(packets)
@@ -985,7 +994,30 @@ impl ChannelManager {
if poll_fds_vec.len() == 1 {
// 只有client fd没有exec_process
// 直接poll clientshort timeout
// ⭐⭐⭐⭐⭐ Phase 16.5: 检查rsync handler的pending output
// 检查rsync handler是否done先收集避免borrow冲突
let mut rsync_is_done = false;
// Drain rsync handler output (mutable borrow)
let mut rsync_items: Vec<(u32, Vec<u8>)> = Vec::new();
for channel in self.channels.values_mut() {
if let Some(rsync) = &mut channel.rsync_handler {
let out = rsync.drain_output();
if !out.is_empty() {
let sid = channel.server_channel;
info!("⭐⭐⭐⭐⭐ [RSYNC_POLL] {} bytes pending from rsync handler", out.len());
rsync_items.push((sid, out));
}
}
}
// Check rsync done (immutable borrow)
rsync_is_done = self.channels.values().any(|ch| {
ch.rsync_handler.as_ref().map_or(false, |r| r.is_done())
});
// Directly poll client
match poll(&mut poll_fds_vec, 10u16) {
Ok(n) if n > 0 => {
if let Some(revents) = poll_fds_vec[client_fd_idx].revents() {
@@ -996,7 +1028,21 @@ impl ChannelManager {
}
_ => {}
}
return Ok((None, client_has_data, false));
if rsync_is_done {
info!("⭐⭐⭐⭐⭐ [RSYNC_DONE] RsyncHandler is done, signaling child_exited");
}
// Return rsync output if any
if !rsync_items.is_empty() {
let mut packets = Vec::new();
for (channel_id, data) in rsync_items {
packets.push(self.build_channel_data(channel_id, &data)?);
}
return Ok((Some(packets), client_has_data, rsync_is_done));
}
return Ok((None, client_has_data, rsync_is_done));
}
// ⭐⭐⭐⭐⭐ Phase 16.4修复增加poll轮询限制支持大文件传输