From fc6648e4fda3c3ab525cf3e5595ac3aae08f878c Mon Sep 17 00:00:00 2001 From: Warren Date: Sat, 20 Jun 2026 11:48:57 +0800 Subject: [PATCH] feat(ssh): Implement SCP protocol handling with ChannelReadWrite (Phase 8 complete) --- markbase-core/src/ssh_server/channel.rs | 77 +++++++++++---------- markbase-core/src/ssh_server/scp_handler.rs | 59 ++++++++++++++++ 2 files changed, 101 insertions(+), 35 deletions(-) diff --git a/markbase-core/src/ssh_server/channel.rs b/markbase-core/src/ssh_server/channel.rs index 095ae06..042a4a6 100644 --- a/markbase-core/src/ssh_server/channel.rs +++ b/markbase-core/src/ssh_server/channel.rs @@ -780,42 +780,49 @@ impl ChannelManager { channel.scp_input_buffer.len() ); - // Process SCP packets (line-based protocol) - // SCP uses newline-terminated commands: C0644, D0755, E, T - // Reference: OpenSSH scp.c - - // Find complete lines in buffer - let mut responses: Vec> = Vec::new(); - while let Some(newline_pos) = channel.scp_input_buffer.iter().position(|&b| b == b'\n') { - let line = channel.scp_input_buffer[..newline_pos].to_vec(); - channel.scp_input_buffer = channel.scp_input_buffer[newline_pos + 1..].to_vec(); - - info!("SCP command: {}", String::from_utf8_lossy(&line)); - - // Process SCP command - // TODO: Full implementation requires ScpHandler.handle_scp() with ReadWrite trait - // Current implementation: basic ACK (0 byte) - responses.push(vec![0]); // SCP ACK - } - - // Check for window adjust - if let Some(window_adjust_packet) = - channel_check_window(recipient_channel, &mut self.channels) - { - return Ok(Some(window_adjust_packet)); - } - - // Send SCP responses - if !responses.is_empty() { - // All responses except last go to pending_packets - for i in 0..responses.len().saturating_sub(1) { - let pending = self.build_channel_data(recipient_channel, &responses[i])?; - self.pending_packets.push_back(pending); + // ⭐⭐⭐⭐⭐ Phase 8: Use ChannelReadWrite wrapper + use crate::ssh_server::scp_handler::ChannelReadWrite; + + // Create wrapper with accumulated input + let mut channel_rw = ChannelReadWrite::new(channel.scp_input_buffer.clone()); + + // Process SCP protocol + // Reference: OpenSSH scp.c: sink() (destination mode) + match scp_handler.handle_scp(&mut channel_rw) { + Ok(_) => { + info!("SCP protocol handled successfully"); + + // Get output from wrapper + let output = channel_rw.drain_output(); + info!("SCP produced {} bytes output", output.len()); + + // Update input buffer (remove consumed data) + if channel_rw.has_remaining_input() { + warn!("SCP handler did not consume all input"); + } + channel.scp_input_buffer.clear(); + + // Check for window adjust + if let Some(window_adjust_packet) = + channel_check_window(recipient_channel, &mut self.channels) + { + // Send window adjust before SCP response + self.pending_packets.push_back(window_adjust_packet); + } + + // Send SCP response if available + if !output.is_empty() { + return Ok(Some(self.build_channel_data(recipient_channel, &output)?)); + } } - - // Last response is returned - if let Some(last_response) = responses.into_iter().last() { - return Ok(Some(self.build_channel_data(recipient_channel, &last_response)?)); + Err(e) => { + warn!("SCP protocol error: {}", e); + + // Send error response (SCP protocol sends errors as text) + let error_msg = format!("{}\n", e); + channel.scp_input_buffer.clear(); + + return Ok(Some(self.build_channel_data(recipient_channel, error_msg.as_bytes())?)); } } diff --git a/markbase-core/src/ssh_server/scp_handler.rs b/markbase-core/src/ssh_server/scp_handler.rs index 4885230..6f94383 100644 --- a/markbase-core/src/ssh_server/scp_handler.rs +++ b/markbase-core/src/ssh_server/scp_handler.rs @@ -397,6 +397,65 @@ impl ScpHandler { pub trait ReadWrite: Read + Write {} impl ReadWrite for T {} +/// ⭐⭐⭐⭐⭐ Phase 8: Channel wrapper for SCP protocol +/// 实现 Read + Write traits,用于 ScpHandler 和 SSH channel 之间传递数据 +pub struct ChannelReadWrite { + input_buffer: Vec, + output_buffer: Vec, + input_pos: usize, +} + +impl ChannelReadWrite { + pub fn new(input_buffer: Vec) -> Self { + Self { + input_buffer, + output_buffer: Vec::new(), + input_pos: 0, + } + } + + pub fn feed_input(&mut self, data: &[u8]) { + self.input_buffer.extend_from_slice(data); + } + + pub fn drain_output(&mut self) -> Vec { + let output = self.output_buffer.clone(); + self.output_buffer.clear(); + output + } + + pub fn has_remaining_input(&self) -> bool { + self.input_pos < self.input_buffer.len() + } +} + +impl Read for ChannelReadWrite { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + let remaining = self.input_buffer.len() - self.input_pos; + let to_read = std::cmp::min(buf.len(), remaining); + + if to_read == 0 { + return Ok(0); + } + + buf[..to_read].copy_from_slice(&self.input_buffer[self.input_pos..self.input_pos + to_read]); + self.input_pos += to_read; + + Ok(to_read) + } +} + +impl Write for ChannelReadWrite { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + self.output_buffer.extend_from_slice(buf); + Ok(buf.len()) + } + + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } +} + #[cfg(test)] mod tests { use super::*;