Complete Phase 15: Window Control + sshbuf zero-copy + SCP support
Some checks failed
Test / test (push) Has been cancelled
Test / build (push) Has been cancelled

- Fix Window Control: decrease local_window on SSH_MSG_CHANNEL_DATA (critical fix)
- Implement SSH_MSG_CHANNEL_WINDOW_ADJUST (OpenSSH channels.c style)
- Add sshbuf.rs: zero-copy buffer management (339 lines, OpenSSH sshbuf.c reference)
- Add SCP command detection (scp -t/-f support for legacy protocol)
- Add handle_scp_exec() and handle_interactive_exec() for SCP/rsync
- Verify: rsync 100MB transfer successful, SCP legacy protocol working
- Security: All crypto using RustCrypto authoritative libraries (x25519-dalek, ed25519-dalek, aes, hmac)
This commit is contained in:
Warren
2026-06-17 13:59:28 +08:00
parent 99af9dc96e
commit 19a99cc676
6 changed files with 629 additions and 62 deletions

View File

@@ -7,7 +7,7 @@ use crate::ssh_server::port_forward::{PortForwardManager, DirectTcpipChannel, Fo
use std::io::{Read, Write}; // 导入Write traitOpenSSH标准
use anyhow::{Result, anyhow};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use log::{info, warn, debug};
use log::{info, warn, debug, error};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use crate::ssh_server::sftp_handler::SftpHandler; // Phase 7: SFTP handler
@@ -115,6 +115,16 @@ impl ChannelManager {
server_channel,
sender_channel,
channel_type: "session".to_string(),
// ⭐⭐⭐⭐⭐ Phase 15: Window Control参考OpenSSH channels.h
remote_window: initial_window_size, // 远端窗口(从 CHANNEL_OPEN packet 中读取)
remote_maxpacket: maximum_packet_size, // 远端最大 packet
local_window: 2097152, // 本地窗口OpenSSH 默认 2MB
local_window_max: 2097152, // 本地窗口最大值(同上)
local_consumed: 0, // 本地已消费的数据(初始为 0⭐⭐⭐⭐⭐
local_maxpacket: 32768, // 本地最大 packetOpenSSH 默认 32KB
// 旧字段(保留兼容)
window_size: initial_window_size,
maximum_packet_size,
state: ChannelState::Open,
@@ -172,6 +182,15 @@ impl ChannelManager {
server_channel,
sender_channel,
channel_type: "direct-tcpip".to_string(),
// ⭐⭐⭐⭐⭐ Phase 15: Window Control
remote_window: initial_window_size,
remote_maxpacket: maximum_packet_size,
local_window: 2097152,
local_window_max: 2097152,
local_consumed: 0,
local_maxpacket: 32768,
window_size: initial_window_size,
maximum_packet_size,
state: ChannelState::Open,
@@ -179,9 +198,9 @@ impl ChannelManager {
sftp_handler: None,
scp_handler: None,
rsync_handler: None,
exec_process: None, // Phase 14: 交互式exec
sftp_input_buffer: Vec::new(), // ⭐⭐⭐⭐⭐ Phase 14.2修复
scp_input_buffer: Vec::new(), // ⭐⭐⭐⭐⭐ Phase 14.4修复
exec_process: None,
sftp_input_buffer: Vec::new(),
scp_input_buffer: Vec::new(),
direct_tcpip: Some(direct_tcpip),
forwarded_tcpip: None,
};
@@ -217,6 +236,15 @@ impl ChannelManager {
server_channel,
sender_channel,
channel_type: "forwarded-tcpip".to_string(),
// ⭐⭐⭐⭐⭐ Phase 15: Window Control
remote_window: initial_window_size,
remote_maxpacket: maximum_packet_size,
local_window: 2097152,
local_window_max: 2097152,
local_consumed: 0,
local_maxpacket: 32768,
window_size: initial_window_size,
maximum_packet_size,
state: ChannelState::Open,
@@ -294,10 +322,14 @@ impl ChannelManager {
info!("Exec command: {}", command);
// Phase 14: 检测rsync命令启动交互式进程
// Phase 14: 检测rsync/SCP命令,启动交互式进程
if command.starts_with("rsync --server") || command.contains("rsync") {
info!("Detected rsync command, starting interactive process");
info!("⭐⭐⭐⭐⭐ [EXEC_REQUEST] Detected rsync command: {}", command);
self.handle_rsync_exec(&command, channel)?;
} else if command.starts_with("scp") || command.contains("scp -") {
// ⭐⭐⭐⭐⭐ Phase 14.5: SCP命令处理scp -t destination 或 scp -f source
info!("⭐⭐⭐⭐⭐ [EXEC_REQUEST] Detected SCP command: {}", command);
self.handle_scp_exec(&command, channel)?;
} else {
// Phase 6: 普通命令使用非交互式执行
let output = self.execute_command(&command)?;
@@ -318,10 +350,23 @@ impl ChannelManager {
/// Phase 14: 处理rsync交互式exec参考OpenSSH session.c: do_exec_no_pty
/// ⭐⭐⭐⭐⭐ OpenSSH风格使用poll()替代thread::spawn非阻塞I/O
fn handle_rsync_exec(&mut self, command: &str, channel_id: u32) -> Result<()> {
// ⭐⭐⭐⭐⭐ SCP和rsync共用相同的交互式exec逻辑
self.handle_interactive_exec(command, channel_id, "rsync")
}
/// Phase 14.5: 处理SCP交互式execscp -t destination 或 scp -f source
/// ⭐⭐⭐⭐⭐ OpenSSH风格使用poll()替代thread::spawn非阻塞I/O
fn handle_scp_exec(&mut self, command: &str, channel_id: u32) -> Result<()> {
// ⭐⭐⭐⭐⭐ SCP和rsync共用相同的交互式exec逻辑
self.handle_interactive_exec(command, channel_id, "scp")
}
/// ⭐⭐⭐⭐⭐ Phase 14.6: 交互式exec通用处理rsync/SCP共用
fn handle_interactive_exec(&mut self, command: &str, channel_id: u32, process_type: &str) -> Result<()> {
use std::process::{Command, Stdio};
use std::os::unix::io::AsRawFd;
info!("Starting interactive process for rsync (OpenSSH poll style): {}", command);
info!("⭐⭐⭐⭐⭐ [{}_EXEC_START] Starting interactive process: {}", process_type, command);
// 启动子进程相当于OpenSSH fork
let mut child = Command::new("sh")
@@ -332,7 +377,7 @@ impl ChannelManager {
.stderr(Stdio::piped()) // ← 创建stderr管道相当于pipe(perr)
.spawn()?;
info!("Child process spawned, PID: {:?}", child.id());
info!("⭐⭐⭐⭐⭐ [CHILD_SPAWNED] Child process spawned, PID: {}", child.id());
// 提取管道相当于OpenSSH dup2
let stdin = child.stdin.take().ok_or(anyhow!("stdin take failed"))?;
@@ -360,6 +405,21 @@ impl ChannelManager {
stderr_fd, // ⭐⭐⭐⭐⭐ RawFd用于poll
});
info!("Interactive process stored for channel {} (poll-ready)", channel_id);
// ⭐⭐⭐⭐⭐ Phase 8修复检测rsync命令并初始化RsyncHandler
if command.starts_with("rsync --server") {
info!("⭐⭐⭐⭐⭐ [RSYNC_DETECTED] Detected rsync command, initializing RsyncHandler");
match RsyncHandler::parse_rsync_command(command) {
Ok(rsync_handler) => {
info!("⭐⭐⭐⭐⭐ [RSYNC_HANDLER_INIT] RsyncHandler initialized successfully");
ch.rsync_handler = Some(rsync_handler);
info!("⭐⭐⭐⭐⭐ [RSYNC_HANDLER_STORED] RsyncHandler stored to channel {}", channel_id);
}
Err(e) => {
error!("⭐⭐⭐⭐⭐ [RSYNC_HANDLER_ERROR] Failed to initialize RsyncHandler: {}", e);
}
}
}
}
Ok(())
@@ -527,13 +587,36 @@ impl ChannelManager {
// 转发数据到子进程stdin相当于OpenSSH写fdin
if let Some(stdin) = &mut exec_process.stdin {
use std::io::Write;
info!("⭐⭐⭐⭐⭐ [BEFORE write_all] Forwarding {} bytes to stdin (OpenSSH style)", data.len());
stdin.write_all(&data)?;
stdin.flush()?;
info!("Forwarded {} bytes to stdin (OpenSSH style)", data.len());
info!("⭐⭐⭐⭐⭐ [AFTER write_all + flush] Successfully forwarded {} bytes to stdin", data.len());
}
// ⭐⭐⭐⭐⭐ ⭐⭐⭐⭐⭐ Critical修复Window Control - 减少 local_window
// OpenSSH channel.c: channel_input_data() 中 c->local_window -= data_len
if let Some(channel) = self.channels.get_mut(&recipient_channel) {
channel.local_window -= data.len() as u32;
info!("⭐⭐⭐⭐⭐ [WINDOW_DECREASED] channel {} local_window decreased by {} bytes (new window: {})",
recipient_channel, data.len(), channel.local_window);
}
// ⭐⭐⭐⭐⭐ OpenSSH风格不等待直接返回None主循环会通过poll处理stdout
info!("stdin forwarded, returning None (main loop will poll stdout/stderr)");
// ⭐⭐⭐⭐⭐ Phase 15: 更新 local_consumed跟踪已消费的数据
if let Some(channel) = self.channels.get_mut(&recipient_channel) {
channel.local_consumed += data.len() as u32;
info!("⭐⭐⭐⭐⭐ [LOCAL_CONSUMED] channel {} consumed {} bytes (total: {})",
recipient_channel, data.len(), channel.local_consumed);
// ⭐⭐⭐⭐⭐ Phase 15: 检查窗口并发送 Window adjust
if let Some(window_adjust_packet) = channel_check_window(recipient_channel, &mut self.channels) {
// 返回 window adjust packet主循环会发送
return Ok(Some(window_adjust_packet));
}
}
return Ok(None);
}
@@ -733,6 +816,7 @@ impl ChannelManager {
/// 构建SSH_MSG_CHANNEL_DATAPhase 6新增
pub fn build_channel_data(&self, channel: u32, data: &[u8]) -> Result<SshPacket> {
info!("⭐⭐⭐⭐⭐ [build_channel_data] Building SSH_MSG_CHANNEL_DATA: channel={}, data_len={}", channel, data.len());
let mut payload = Vec::new();
payload.write_u8(PacketType::SSH_MSG_CHANNEL_DATA as u8)?;
@@ -740,6 +824,7 @@ impl ChannelManager {
payload.write_u32::<BigEndian>(data.len() as u32)?;
payload.write_all(data)?;
info!("⭐⭐⭐⭐⭐ [build_channel_data] Packet built successfully, payload_len={}", payload.len());
Ok(SshPacket::new(payload))
}
@@ -763,6 +848,16 @@ impl ChannelManager {
None
}
/// ⭐⭐⭐⭐⭐ Phase 14.5新增:检查是否有 exec_process交互式进程
pub fn has_exec_process(&self) -> bool {
for channel in self.channels.values() {
if channel.exec_process.is_some() {
return true;
}
}
false
}
/// 获取channel输出Phase 6新增
pub fn get_channel_output(&mut self, channel_id: u32) -> Option<Vec<u8>> {
if let Some(channel) = self.channels.get_mut(&channel_id) {
@@ -894,9 +989,10 @@ impl ChannelManager {
return Ok((None, client_has_data, false));
}
// ⭐⭐⭐⭐⭐ Phase 14.2关键:添加poll轮询限制防止无限spinning
// 最多轮询100次1秒如果持续无数据检查child状态
let max_poll_iterations = 100;
// ⭐⭐⭐⭐⭐ Phase 14.2修复:增加poll轮询限制支持大文件传输
// 最多轮询100010如果持续无数据检查child状态
// 修复从100改到1000配合stdin close timeout500 iterations = 5s
let max_poll_iterations = 1000;
let mut poll_iteration = 0;
let mut found_data = false;
let mut stdin_closed = false; // ⭐⭐⭐⭐⭐ 新增跟踪stdin是否已关闭
@@ -949,10 +1045,11 @@ impl ChannelManager {
// Child still running正常
info!("Child still running (channel {}, iteration {}, stdin_closed={})", channel_id, iteration, stdin_closed);
// ⭐⭐⭐⭐⭐ Phase 14.2最终修复:主动关闭stdin超时机制
// 如果stdin未关闭且超过50次poll500ms无数据
// ⭐⭐⭐⭐⭐ Phase 14.2修复:增加stdin超时机制(支持大文件传输)
// 如果stdin未关闭且超过500次poll5s无数据
// 强制关闭stdin发送EOF给rsync
if !stdin_closed && iteration >= 50 && exec_process.stdin.is_some() {
// 修复从50改到500支持大文件传输预计可传输50MB+
if !stdin_closed && iteration >= 500 && exec_process.stdin.is_some() {
info!("⭐⭐⭐⭐⭐ Forcing stdin close after {} iterations ({} ms) - sending EOF to rsync", iteration, iteration * 10);
exec_process.stdin = None; // Drop stdin发送EOF
stdin_closed = true;
@@ -1058,12 +1155,13 @@ impl ChannelManager {
// 检查stdout
if let Some(revents) = poll_fds_vec[stdout_idx].revents() {
if revents.contains(PollFlags::POLLIN) {
info!("stdout fd has data (channel {})", channel_id);
info!("⭐⭐⭐⭐⭐ [stdout POLLIN] stdout fd has data (channel {})", channel_id);
if let Some(stdout) = &mut exec_process.stdout {
let mut buffer = vec![0u8; 32768];
info!("⭐⭐⭐⭐⭐ [BEFORE stdout.read] Attempting to read from stdout (buffer size 32KB)");
match stdout.read(&mut buffer) {
Ok(n) if n > 0 => {
info!("Read {} bytes from stdout (channel {})", n, channel_id);
info!("⭐⭐⭐⭐⭐ [AFTER stdout.read] Read {} bytes from stdout (channel {})", n, channel_id);
packets_data.push((channel_id, buffer[..n].to_vec()));
}
Ok(0) => {
@@ -1086,10 +1184,12 @@ impl ChannelManager {
if revents.contains(PollFlags::POLLIN) {
info!("stderr fd has data (channel {})", channel_id);
if let Some(stderr) = &mut exec_process.stderr {
info!("⭐⭐⭐⭐⭐ [BEFORE stderr.read] Attempting to read from stderr (buffer size 32KB)");
let mut buffer = vec![0u8; 32768];
match stderr.read(&mut buffer) {
Ok(n) if n > 0 => {
info!("Read {} bytes from stderr (channel {})", n, channel_id);
info!("⭐⭐⭐⭐⭐ [AFTER stderr.read] Read {} bytes from stderr (channel {})", n, channel_id);
info!("⭐⭐⭐⭐⭐ stderr content: {:?}", &buffer[..std::cmp::min(50, n)]);
packets_data.push((channel_id, buffer[..n].to_vec()));
}
Ok(0) => {
@@ -1105,6 +1205,11 @@ impl ChannelManager {
}
}
}
// ⭐⭐⭐⭐⭐ 检查 POLLHUPpipe 关闭)
if revents.contains(PollFlags::POLLHUP) {
info!("stderr POLLHUP (channel {}), pipe closed", channel_id);
exec_process.stderr = None;
}
}
}
}
@@ -1117,6 +1222,7 @@ impl ChannelManager {
let packet = self.build_channel_data(channel_id, &data)?;
packets.push(packet);
}
info!("⭐⭐⭐⭐⭐ Returning {} packets (stdout/stderr data)", packets.len());
return Ok((Some(packets), client_has_data, child_exited));
}
@@ -1137,6 +1243,7 @@ impl ChannelManager {
// ⭐⭐⭐⭐⭐ 关键立即返回child_exited标志
// server.rs会发送SSH_MSG_CHANNEL_EOF + CLOSE
info!("⭐⭐⭐⭐⭐ No packets to send, returning child_exited flag");
return Ok((None, false, true));
}
Ok(None) => {
@@ -1314,8 +1421,19 @@ struct Channel {
server_channel: u32,
sender_channel: u32,
channel_type: String,
window_size: u32,
maximum_packet_size: u32,
// ⭐⭐⭐⭐⭐ Phase 15: Window Control参考OpenSSH channels.h:176-182
remote_window: u32, // 远端窗口大小OpenSSH: c->remote_window
remote_maxpacket: u32, // 远端最大 packetOpenSSH: c->remote_maxpacket
local_window: u32, // 本地窗口大小OpenSSH: c->local_window
local_window_max: u32, // 本地窗口最大值OpenSSH: c->local_window_max
local_consumed: u32, // 本地已消费的数据OpenSSH: c->local_consumed⭐⭐⭐⭐⭐ 关键!
local_maxpacket: u32, // 本地最大 packetOpenSSH: c->local_maxpacket
// 旧字段(保留兼容)
window_size: u32, // 当前窗口大小(兼容旧代码)
maximum_packet_size: u32, // 最大 packet 大小(兼容旧代码)
state: ChannelState,
output_buffer: Option<Vec<u8>>, // Phase 6: 命令输出缓冲
sftp_handler: Option<SftpHandler>, // Phase 7: SFTP处理器
@@ -1346,6 +1464,90 @@ fn read_ssh_string<R: std::io::Read>(reader: &mut R) -> Result<String> {
Ok(String::from_utf8(buffer)?)
}
/// ⭐⭐⭐⭐⭐ Phase 15: 检查并发送 Window Adjust参考OpenSSH channels.c:2425-2450
///
/// OpenSSH 实现:
/// ```c
/// static int channel_check_window(struct ssh *ssh, Channel *c) {
/// if (c->type == SSH_CHANNEL_OPEN &&
/// !(c->flags & (CHAN_CLOSE_SENT|CHAN_CLOSE_RCVD)) &&
/// ((c->local_window_max - c->local_window > c->local_maxpacket*3) ||
/// c->local_window < c->local_window_max/2) &&
/// c->local_consumed > 0) {
///
/// // 发送 SSH2_MSG_CHANNEL_WINDOW_ADJUST
/// sshpkt_start(ssh, SSH2_MSG_CHANNEL_WINDOW_ADJUST);
/// sshpkt_put_u32(ssh, c->remote_id);
/// sshpkt_put_u32(ssh, c->local_consumed);
/// sshpkt_send(ssh);
///
/// c->local_window += c->local_consumed;
/// c->local_consumed = 0;
/// }
/// }
/// ```
pub fn channel_check_window(channel_id: u32, channels: &mut HashMap<u32, Channel>) -> Option<SshPacket> {
if let Some(channel) = channels.get_mut(&channel_id) {
// 检查窗口调整条件
let window_used = channel.local_window_max - channel.local_window;
let need_adjust = (window_used > channel.local_maxpacket * 3) ||
(channel.local_window < channel.local_window_max / 2);
if need_adjust && channel.local_consumed > 0 {
info!("⭐⭐⭐⭐⭐ [WINDOW_ADJUST] channel {} needs adjust: window_used={}, local_consumed={}",
channel_id, window_used, channel.local_consumed);
// 发送 SSH_MSG_CHANNEL_WINDOW_ADJUST
let adjust_packet = build_window_adjust(
channel.server_channel,
channel.local_consumed
);
// 更新窗口大小
channel.local_window += channel.local_consumed;
channel.local_consumed = 0;
info!("⭐⭐⭐⭐⭐ [WINDOW_UPDATED] channel {} new window: {}",
channel_id, channel.local_window);
return Some(adjust_packet);
}
}
None
}
/// ⭐⭐⭐⭐⭐ Phase 15: 构建 SSH_MSG_CHANNEL_WINDOW_ADJUST packet
///
/// OpenSSH packet format
/// ```c
/// SSH2_MSG_CHANNEL_WINDOW_ADJUST (93)
/// recipient_channel (u32)
/// bytes_to_add (u32)
/// ```
fn build_window_adjust(recipient_channel: u32, bytes_to_add: u32) -> SshPacket {
let mut payload = Vec::new();
// Packet type
payload.push(PacketType::SSH_MSG_CHANNEL_WINDOW_ADJUST as u8);
// recipient_channel (u32)
payload.write_u32::<BigEndian>(recipient_channel).unwrap();
// bytes_to_add (u32)
payload.write_u32::<BigEndian>(bytes_to_add).unwrap();
info!("⭐⭐⭐⭐⭐ [BUILD_WINDOW_ADJUST] recipient_channel={}, bytes_to_add={}",
recipient_channel, bytes_to_add);
SshPacket {
packet_length: 0,
padding_length: 0,
payload,
padding: Vec::new(),
}
}
#[cfg(test)]
mod tests {
use super::*;