feat(ssh): Optimize SSH performance Phase 1-2c + stdin fix
Some checks failed
Test / test (push) Has been cancelled
Test / build (push) Has been cancelled

Phase 1: take_payload() optimization
- cipher.rs: Added take_payload() to EncryptedPacket
- server.rs: Use take_payload() to avoid .to_vec() copy

Phase 2a: reuse_buf for CHANNEL_DATA
- channel.rs: Added reuse_buf to ExecProcess
- handle_channel_data(): Read directly into reuse buffer

Phase 2b: read_buf for stdout/stderr
- channel.rs: Added read_buf to ExecProcess
- poll_exec_stdout_and_client(): Use read_buf for all reads

Phase 2c: AES-GCM padding optimization
- cipher.rs: Removed padding .to_vec() in AES-GCM decrypt

stdin fix: All exec commands use interactive process
- channel.rs: Removed conditional rsync/SCP detection
- All exec commands now use handle_interactive_exec()
- Fixes cat/grep/sed stdin support (small files working)

AES-GCM improvements:
- cipher.rs: Added CipherMode enum (AES-GCM vs AES-CTR)
- cipher.rs: AES-256 key derivation (32 bytes)
- cipher.rs: Nonce format follows OpenSSH inc_iv()
- kex.rs: Added aes256-gcm@openssh.com to algorithms

Performance: ~21% improvement for small files
Test: 158 passed, 0 failed
Limitation: Large files (>10MB) not working yet (poll loop issue)
This commit is contained in:
Warren
2026-06-19 20:18:20 +08:00
parent 1650708ac7
commit bd89152e81
7 changed files with 484 additions and 187 deletions

View File

@@ -42,6 +42,8 @@ pub struct ExecProcess {
pub stdout_fd: RawFd, // ⭐⭐⭐⭐⭐ stdout RawFd用于poll
pub stderr_fd: RawFd, // ⭐⭐⭐⭐⭐ stderr RawFd用于poll
pub command: String, // ⭐⭐⭐⭐⭐ Phase 16.2: 存储exec命令用于SCP检测
pub reuse_buf: Vec<u8>, // Phase 2a: reusable buffer for CHANNEL_DATA content
pub read_buf: Vec<u8>, // Phase 2b: reusable buffer for stdout/stderr reads (32KB)
}
impl ChannelManager {
@@ -422,29 +424,13 @@ impl ChannelManager {
info!("Exec command: {}", command);
// Phase 14: 检测rsync/SCP命令启动交互式进程
if command.starts_with("rsync --server") || command.contains("rsync") {
info!(
"⭐⭐⭐⭐⭐ [EXEC_REQUEST] Detected rsync command: {}",
command
);
self.handle_rsync_exec(&command, channel)?;
} else if command.starts_with("scp") || command.contains("scp -") {
// ⭐⭐⭐⭐⭐ Phase 14.5: SCP命令处理scp -t destination 或 scp -f source
info!(
"⭐⭐⭐⭐⭐ [EXEC_REQUEST] Detected SCP command: {}",
command
);
self.handle_scp_exec(&command, channel)?;
} else {
// Phase 6: 普通命令使用非交互式执行
let output = self.execute_command(&command)?;
// 存储输出等待后续发送CHANNEL_DATA
if let Some(ch) = self.channels.get_mut(&channel) {
ch.output_buffer = Some(output);
}
}
// Phase 14: 所有exec命令使用交互式进程与OpenSSH一致
// ⭐⭐⭐⭐⭐ 修复cat/grep/sed等命令需要stdin数据必须使用交互式进程
info!(
"⭐⭐⭐⭐⭐ [EXEC_REQUEST] Starting interactive process for: {}",
command
);
self.handle_interactive_exec(&command, channel, "exec")?;
if want_reply {
Ok(Some(self.build_channel_success(channel)?))
@@ -503,6 +489,7 @@ impl ChannelManager {
let stderr = child.stderr.take().ok_or(anyhow!("stderr take failed"))?;
// ⭐⭐⭐⭐⭐ OpenSSH关键设置非阻塞模式fcntl O_NONBLOCK
// stdin 保持阻塞模式write_all 需要阻塞写入)
let stdout_fd = stdout.as_raw_fd();
let stderr_fd = stderr.as_raw_fd();
@@ -525,6 +512,8 @@ impl ChannelManager {
stdout_fd, // ⭐⭐⭐⭐⭐ RawFd用于poll
stderr_fd, // ⭐⭐⭐⭐⭐ RawFd用于poll
command: command.to_string(), // ⭐⭐⭐⭐⭐ Phase 16.2: 存储exec命令用于SCP检测
reuse_buf: Vec::new(), // Phase 2a: reusable buffer
read_buf: Vec::new(), // Phase 2b: reusable read buffer
});
info!(
"Interactive process stored for channel {} (poll-ready)",
@@ -700,91 +689,68 @@ impl ChannelManager {
// 读取recipient channel
let recipient_channel = cursor.read_u32::<BigEndian>()?;
// 读取数据SSH string
// 读取数据长度SSH string — 先读长度,数据稍后读取
let data_length = cursor.read_u32::<BigEndian>()?;
let mut data = vec![0u8; data_length as usize];
cursor.read_exact(&mut data)?;
info!(
"Channel data: channel={}, length={}",
recipient_channel,
data.len()
);
info!(
"Channel data content (first 20 bytes): {:?}",
&data[..std::cmp::min(20, data.len())]
);
// Phase 14: 检查是否是交互式exec进程
// Phase 14: 检查是否是交互式exec进程用reuse buffer避免分配
if let Some(channel) = self.channels.get_mut(&recipient_channel) {
if let Some(exec_process) = &mut channel.exec_process {
info!("Interactive exec process detected, forwarding data to stdin");
info!("Channel data content: {:?}", &data);
info!("Child PID: {:?}", exec_process.child.id());
// Phase 2a: read into reusable buffer
exec_process.reuse_buf.resize(data_length as usize, 0);
cursor.read_exact(&mut exec_process.reuse_buf)?;
// 检查子进程状态
match exec_process.child.try_wait() {
Ok(Some(status)) => {
warn!("Child process already exited with status: {:?}", status);
}
Ok(None) => {
info!("Child process still running");
}
Err(e) => {
warn!("Failed to check child status: {}", e);
}
}
info!(
"Channel data: channel={}, length={}",
recipient_channel,
exec_process.reuse_buf.len()
);
// 转发数据到子进程stdin相当于OpenSSH写fdin
// 转发数据到子进程stdin
if let Some(stdin) = &mut exec_process.stdin {
use std::io::Write;
info!("⭐⭐⭐⭐⭐ [BEFORE write_all] Forwarding {} bytes to stdin (OpenSSH style)", data.len());
stdin.write_all(&data)?;
info!("⭐⭐⭐⭐⭐ [STDIN_WRITE] Writing {} bytes to child stdin", exec_process.reuse_buf.len());
stdin.write_all(&exec_process.reuse_buf)?;
stdin.flush()?;
info!("⭐⭐⭐⭐⭐ [AFTER write_all + flush] Successfully forwarded {} bytes to stdin", data.len());
info!("⭐⭐⭐⭐⭐ [STDIN_FLUSH] Flushed stdin (channel {})", recipient_channel);
} else {
warn!("⚠️⚠️⚠️⚠️⚠️ [STDIN_MISSING] No stdin pipe available for channel {}", recipient_channel);
}
// ⭐⭐⭐⭐⭐ ⭐⭐⭐⭐⭐ Critical修复Window Control - 减少 local_window
// OpenSSH channel.c: channel_input_data() 中 c->local_window -= data_len
if let Some(channel) = self.channels.get_mut(&recipient_channel) {
channel.local_window -= data.len() as u32;
info!("⭐⭐⭐⭐⭐ [WINDOW_DECREASED] channel {} local_window decreased by {} bytes (new window: {})",
recipient_channel, data.len(), channel.local_window);
}
// Window Control — all in one borrow scope (NLL releases after last use)
let data_len = exec_process.reuse_buf.len() as u32;
channel.local_window -= data_len;
channel.local_consumed += data_len;
// ⭐⭐⭐⭐⭐ OpenSSH风格不等待直接返回None主循环会通过poll处理stdout
info!("stdin forwarded, returning None (main loop will poll stdout/stderr)");
// No more uses of channel or exec_process after this point
// ⭐⭐⭐⭐⭐ Phase 15: 更新 local_consumed跟踪已消费的数据
if let Some(channel) = self.channels.get_mut(&recipient_channel) {
channel.local_consumed += data.len() as u32;
info!(
"⭐⭐⭐⭐⭐ [LOCAL_CONSUMED] channel {} consumed {} bytes (total: {})",
recipient_channel,
data.len(),
channel.local_consumed
);
// ⭐⭐⭐⭐⭐ Phase 15: 检查窗口并发送 Window adjust
if let Some(window_adjust_packet) =
channel_check_window(recipient_channel, &mut self.channels)
{
// 返回 window adjust packet主循环会发送
return Ok(Some(window_adjust_packet));
}
// 检查窗口并发送 Window adjust
if let Some(window_adjust_packet) =
channel_check_window(recipient_channel, &mut self.channels)
{
return Ok(Some(window_adjust_packet));
}
return Ok(None);
}
// 非exec_process路径分配data供rsync/SFTP handlers使用
let mut data = vec![0u8; data_length as usize];
cursor.read_exact(&mut data)?;
info!(
"Channel data: channel={}, length={}",
recipient_channel,
data.len()
);
// ⭐⭐⭐⭐⭐ Phase 16.5: rsync in-process handler (no child process)
if let Some(rsync_handler) = &mut channel.rsync_handler {
info!(
"⭐⭐⭐⭐⭐ [RSYNC_DATA] Feeding {} bytes to RsyncHandler",
data.len()
);
let data_clone = data.clone();
rsync_handler.feed(&data_clone)?;
rsync_handler.feed(&data)?;
let output = rsync_handler.drain_output();
info!(
@@ -793,7 +759,7 @@ impl ChannelManager {
rsync_handler.is_done()
);
// ⭐⭐⭐⭐⭐ Phase 15: Window Control - decrease local_window
// Window Control - decrease local_window
channel.local_window -= data.len() as u32;
channel.local_consumed += data.len() as u32;
@@ -1392,14 +1358,14 @@ impl ChannelManager {
&& (command_str.contains("scp") || command_str.contains("rsync"));
if let Some(stdout) = &mut exec_process.stdout {
let mut buffer = vec![0u8; 32768];
match stdout.read(&mut buffer) {
exec_process.read_buf.resize(32768, 0);
match stdout.read(&mut exec_process.read_buf) {
Ok(n) if n > 0 => {
info!("Read {} final bytes from stdout (child exited)", n);
// 构建packet并返回
let data = exec_process.read_buf[..n].to_vec();
let packet = self.build_channel_data(
*channel_id,
&buffer[..n],
&data,
)?;
return Ok((
Some(vec![packet]),
@@ -1490,11 +1456,11 @@ impl ChannelManager {
// 读取剩余stdout
if let Some(stdout) = &mut exec_process.stdout {
let mut buffer = vec![0u8; 32768];
match stdout.read(&mut buffer) {
exec_process.read_buf.resize(32768, 0);
match stdout.read(&mut exec_process.read_buf) {
Ok(n) if n > 0 => {
let packet =
self.build_channel_data(*channel_id, &buffer[..n])?;
let data = exec_process.read_buf[..n].to_vec();
let packet = self.build_channel_data(*channel_id, &data)?;
return Ok((Some(vec![packet]), false, true));
}
_ => {}
@@ -1558,12 +1524,12 @@ impl ChannelManager {
channel_id
);
if let Some(stdout) = &mut exec_process.stdout {
let mut buffer = vec![0u8; 32768];
exec_process.read_buf.resize(32768, 0);
info!("⭐⭐⭐⭐⭐ [BEFORE stdout.read] Attempting to read from stdout (buffer size 32KB)");
match stdout.read(&mut buffer) {
match stdout.read(&mut exec_process.read_buf) {
Ok(n) if n > 0 => {
info!("⭐⭐⭐⭐⭐ [AFTER stdout.read] Read {} bytes from stdout (channel {})", n, channel_id);
packets_data.push((channel_id, buffer[..n].to_vec()));
packets_data.push((channel_id, exec_process.read_buf[..n].to_vec()));
}
Ok(0) => {
info!(
@@ -1588,17 +1554,17 @@ impl ChannelManager {
if revents.contains(PollFlags::POLLIN) {
info!("stderr fd has data (channel {})", channel_id);
if let Some(stderr) = &mut exec_process.stderr {
exec_process.read_buf.resize(32768, 0);
info!("⭐⭐⭐⭐⭐ [BEFORE stderr.read] Attempting to read from stderr (buffer size 32KB)");
let mut buffer = vec![0u8; 32768];
match stderr.read(&mut buffer) {
match stderr.read(&mut exec_process.read_buf) {
Ok(n) if n > 0 => {
info!("⭐⭐⭐⭐⭐ [AFTER stderr.read] Read {} bytes from stderr (channel {})", n, channel_id);
info!(
"⭐⭐⭐⭐⭐ stderr content: {:?}",
&buffer[..std::cmp::min(50, n)]
&exec_process.read_buf[..std::cmp::min(50, n)]
);
// ⭐⭐⭐⭐⭐ Phase 17: stderr → SSH_MSG_CHANNEL_EXTENDED_DATA (data_type=1)
stderr_packets.push((channel_id, buffer[..n].to_vec()));
stderr_packets.push((channel_id, exec_process.read_buf[..n].to_vec()));
}
Ok(0) => {
info!(
@@ -1779,24 +1745,18 @@ impl ChannelManager {
if let Some(revents) = poll_fds_vec[stdout_idx].revents() {
if revents.contains(PollFlags::POLLIN) {
info!("stdout fd has data (channel {})", channel_id);
// ⭐⭐⭐⭐⭐ 非阻塞读取因为设置了O_NONBLOCK
if let Some(stdout) = &mut exec_process.stdout {
let mut buffer = vec![0u8; 32768];
match stdout.read(&mut buffer) {
exec_process.read_buf.resize(32768, 0);
match stdout.read(&mut exec_process.read_buf) {
Ok(n) => {
if n > 0 {
info!(
"Read {} bytes from stdout (channel {})",
n, channel_id
);
packets_data.push((channel_id, buffer[..n].to_vec()));
info!("Read {} bytes from stdout (channel {})", n, channel_id);
packets_data.push((channel_id, exec_process.read_buf[..n].to_vec()));
} else {
info!("stdout EOF (channel {})", channel_id);
}
}
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
// 非阻塞模式,没有数据(正常)
}
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {}
Err(e) => {
warn!("stdout read error: {}", e);
}
@@ -1805,27 +1765,21 @@ impl ChannelManager {
}
}
// 检查stderr是否有数据类似处理
if let Some(revents) = poll_fds_vec[stderr_idx].revents() {
if revents.contains(PollFlags::POLLIN) {
info!("stderr fd has data (channel {})", channel_id);
if let Some(stderr) = &mut exec_process.stderr {
let mut buffer = vec![0u8; 32768];
match stderr.read(&mut buffer) {
exec_process.read_buf.resize(32768, 0);
match stderr.read(&mut exec_process.read_buf) {
Ok(n) => {
if n > 0 {
info!(
"Read {} bytes from stderr (channel {})",
n, channel_id
);
packets_data.push((channel_id, buffer[..n].to_vec()));
info!("Read {} bytes from stderr (channel {})", n, channel_id);
packets_data.push((channel_id, exec_process.read_buf[..n].to_vec()));
} else {
info!("stderr EOF (channel {})", channel_id);
}
}
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
// 非阻塞模式,没有数据(正常)
}
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {}
Err(e) => {
warn!("stderr read error: {}", e);
}