// SSH Window Size管理(Phase 13.6) // 参考RFC 4254 Section 5.2: Window Size Adjustment use anyhow::{Result, anyhow}; use log::{info, warn, debug}; use std::sync::{Arc, Mutex}; use byteorder::{BigEndian, WriteBytesExt}; use crate::ssh_server::packet::PacketType; /// Window Size管理器(Phase 13.6) pub struct WindowManager { initial_window_size: u32, // RFC 4254: 2MB默认 current_window_size: Arc>, max_packet_size: u32, // RFC 4254: 32KB默认 consumed_bytes: Arc>, // 已消耗bytes统计 } impl WindowManager { /// 创建Window Size管理器(Phase 13.6) pub fn new(initial_window_size: u32, max_packet_size: u32) -> Self { Self { initial_window_size, current_window_size: Arc::new(Mutex::new(initial_window_size)), max_packet_size, consumed_bytes: Arc::new(Mutex::new(0)), } } /// RFC 4254默认window size(2MB) pub fn rfc_default() -> Self { Self::new(2097152, 32768) // 2MB window, 32KB packet } /// 检查window size是否足够(Phase 13.6) pub fn check_window_available(&self, data_size: u32) -> bool { let window = self.current_window_size.lock().unwrap(); let available = *window >= data_size; if !available { warn!("Window size insufficient: need {}, have {}", data_size, *window); } available } /// 消耗window size(Phase 13.6:发送数据后) pub fn consume_window(&self, data_size: u32) -> Result<()> { let mut window = self.current_window_size.lock().unwrap(); if *window < data_size { return Err(anyhow!("Window size insufficient: need {}, have {}", data_size, *window)); } *window -= data_size; // 统计已消耗bytes let mut consumed = self.consumed_bytes.lock().unwrap(); *consumed += data_size; info!("Window size consumed: {} bytes, remaining {}, total consumed {}", data_size, *window, *consumed); Ok(()) } /// 调整window size(Phase 13.6:收到SSH_MSG_CHANNEL_WINDOW_ADJUST) pub fn adjust_window(&self, bytes_to_add: u32) { let mut window = self.current_window_size.lock().unwrap(); *window += bytes_to_add; info!("Window size adjusted: added {} bytes, total {}", bytes_to_add, *window); } /// 构建SSH_MSG_CHANNEL_WINDOW_ADJUST packet(Phase 13.6) pub fn build_window_adjust_packet(channel_id: u32, bytes_to_add: u32) -> Result> { let mut packet = Vec::new(); // Packet type: SSH_MSG_CHANNEL_WINDOW_ADJUST (type 93) packet.write_u8(PacketType::SSH_MSG_CHANNEL_WINDOW_ADJUST as u8)?; // Recipient channel ID packet.write_u32::(channel_id)?; // Bytes to add packet.write_u32::(bytes_to_add)?; info!("Built SSH_MSG_CHANNEL_WINDOW_ADJUST for channel {}: +{} bytes", channel_id, bytes_to_add); Ok(packet) } /// 获取当前window size(Phase 13.6) pub fn get_current_window(&self) -> u32 { *self.current_window_size.lock().unwrap() } /// 获取已消耗bytes(Phase 13.6) pub fn get_consumed_bytes(&self) -> u32 { *self.consumed_bytes.lock().unwrap() } /// 重置window size(Phase 13.6:channel重置) pub fn reset_window(&self) { let mut window = self.current_window_size.lock().unwrap(); *window = self.initial_window_size; let mut consumed = self.consumed_bytes.lock().unwrap(); *consumed = 0; info!("Window size reset to initial: {}", self.initial_window_size); } } /// Channel生命周期管理(Phase 13.7) pub struct ChannelLifecycle { channel_id: u32, eof_sent: bool, close_received: bool, } impl ChannelLifecycle { /// 创建Channel生命周期管理器(Phase 13.7) pub fn new(channel_id: u32) -> Self { Self { channel_id, eof_sent: false, close_received: false, } } /// 构建SSH_MSG_CHANNEL_EOF packet(Phase 13.7) pub fn build_eof_packet(channel_id: u32) -> Result> { let mut packet = Vec::new(); // Packet type: SSH_MSG_CHANNEL_EOF (type 96) packet.write_u8(PacketType::SSH_MSG_CHANNEL_EOF as u8)?; // Recipient channel ID packet.write_u32::(channel_id)?; info!("Built SSH_MSG_CHANNEL_EOF for channel {}", channel_id); Ok(packet) } /// 构建SSH_MSG_CHANNEL_CLOSE packet(Phase 13.7) pub fn build_close_packet(channel_id: u32) -> Result> { let mut packet = Vec::new(); // Packet type: SSH_MSG_CHANNEL_CLOSE (type 97) packet.write_u8(PacketType::SSH_MSG_CHANNEL_CLOSE as u8)?; // Recipient channel ID packet.write_u32::(channel_id)?; info!("Built SSH_MSG_CHANNEL_CLOSE for channel {}", channel_id); Ok(packet) } /// 标记EOF已发送(Phase 13.7) pub fn mark_eof_sent(&mut self) { self.eof_sent = true; info!("Channel {} EOF marked as sent", self.channel_id); } /// 标记CLOSE已接收(Phase 13.7) pub fn mark_close_received(&mut self) { self.close_received = true; info!("Channel {} CLOSE marked as received", self.channel_id); } /// 检查是否可以清理channel(Phase 13.7) pub fn can_cleanup(&self) -> bool { self.eof_sent && self.close_received } /// 清理channel资源(Phase 13.7) pub fn cleanup_channel(&self) -> Result<()> { info!("Cleaning up channel {} resources", self.channel_id); // Phase 13.7: 实际清理逻辑需要在ChannelManager中实现 // - 移除channel记录 // - 关闭TCP连接 // - 清理监听器(如果是forwarded-tcpip) info!("Channel {} cleanup completed", self.channel_id); Ok(()) } } #[cfg(test)] mod tests { use super::*; #[test] fn test_window_manager_creation() { let manager = WindowManager::rfc_default(); assert_eq!(manager.get_current_window(), 2097152); assert_eq!(manager.max_packet_size, 32768); } #[test] fn test_window_consumption() { let manager = WindowManager::rfc_default(); // 消耗1000 bytes manager.consume_window(1000).unwrap(); assert_eq!(manager.get_current_window(), 2097152 - 1000); assert_eq!(manager.get_consumed_bytes(), 1000); } #[test] fn test_window_adjustment() { let manager = WindowManager::rfc_default(); // 消耗1000 bytes manager.consume_window(1000).unwrap(); // 调整500 bytes manager.adjust_window(500); assert_eq!(manager.get_current_window(), 2097152 - 1000 + 500); } #[test] fn test_build_eof_packet() { let packet = ChannelLifecycle::build_eof_packet(1).unwrap(); assert_eq!(packet[0], PacketType::SSH_MSG_CHANNEL_EOF as u8); } #[test] fn test_build_close_packet() { let packet = ChannelLifecycle::build_close_packet(1).unwrap(); assert_eq!(packet[0], PacketType::SSH_MSG_CHANNEL_CLOSE as u8); } }